rabbitMq(2)之“Work”模式

rabbitMq(2)之“Work”模式

1.架构图

  rabbitMq(2)之“Work”模式

 模式简介

  • 一个消息生产者P,一个消息存储队列Q,多个消息消费者C
  • Work模型能够较好的解决资源密集型场景的问题,不需要像Hello World那样孤注一掷的等唯一的消费者消费完
  • 多个消费者,多管齐下,更加高效的并行处理消息

2.实践应用

2.1  生产者

package com.rabbitmq.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.utils.ConnectUtil;

public class Producer_Work {
    private static String QUEUE_NAME = "test_queue_work";
    public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub
        //获得连接
        Connection con =  ConnectUtil.getConnection();
        //获得通道
        Channel channel = con.createChannel();
        //声明创建队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //同一时刻只给消费者发送一条
//        channel.basicQos(1);
        //消息内容
        int i = 0;
        while(i<50){
            String message = "hello "+i;
            //发送消息队列
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            //发送成功,打印发送信息
            System.out.println("生产者发送消息是:"+message);
            i++;
            Thread.sleep(i*10);
        }
        //关闭通道和连接
        channel.close();
        con.close();
    }

}

2.2 消费者 

消费者1

package com.rabbitmq.consumer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.utils.ConnectUtil;

public class Work_Consumer1 {
    private static String QUEUE_NAME = "test_queue_work";
    public static void main(String[] argv) throws Exception{
        //获得连接
        Connection con =  ConnectUtil.getConnection();
        //获得通道
        Channel channel = con.createChannel();
        //声明创建队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //同一时刻只给消费者发送一条
        //channel.basicQos(1);
        //创建消息者
        QueueingConsumer consumer = new QueueingConsumer(channel); 
        //发送消息队列
        channel.basicConsume(QUEUE_NAME, true, consumer);
        while(true){
            Delivery delivery= consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("g 消费者接受的消息是:"+message);
            Thread.sleep(10);
            //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

消费者2

package com.rabbitmq.consumer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.utils.ConnectUtil;

public class Work_Consumer2 {
    private static String QUEUE_NAME = "test_queue_work";
    public static void main(String[] argv) throws Exception{
        //获得连接
        Connection con =  ConnectUtil.getConnection();
        //获得通道
        Channel channel = con.createChannel();
        //声明创建队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //创建消息者
        //同一时刻只给消费者发送一条
        //channel.basicQos(1);
        QueueingConsumer consumer = new QueueingConsumer(channel); 
        //发送消息队列(true:自动确认,false:手动确认)
        channel.basicConsume(QUEUE_NAME, false, consumer);
        while(true){
            Delivery delivery= consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("g 消费者接受的消息是:"+message);
            Thread.sleep(1000);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

3.测试结果

  3.1测试结果:(默认是公平接受)

  1. 消费者1和消费者2获取到的消息内容是不同的,同一个消息只能被一个消费者获取。
  2. 消费者1和消费者2获取到的消息的数量是相同的,一个是奇数一个是偶数。
  3. 其实,这样是不合理的,应该是消费者1(sleep时间短,消费消息能力高)要比消费者2(sleep时间长,消费消息能力低)获取到的消息多才对。

  其实,只需要设置在同一时刻只有一个接受者就可以达到目的了(非公平模式)

  channel.basicQos(1);

  在两个消费者中添加以上两个代码即可。

  3.2修改后测试结果:

  消费者1比消费者2获取的消息更多。

  3.3 消息的确认模式

  消费者从队列中获取消息,服务端如何知道消息已经被消费呢?

  模式1:自动确认 (消费者1)

  只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费。

  模式2:手动确认(消费者2)

  消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。

4.总结

  • 启动两个消费者,他们消费同一个队列就可以实现Worker模式了,也就是多个消费者同时消费一个消息队列中的消息。这样就提高了消息队列的使用效率。 
  • 不再是一个消费者,需要注意公平和非公平模式的区分。
  • 其实还有一点需要注意,那就消费确认,如果不是手动设置了手动确认,而消费者使用后没有确认,队列即使没有收到确认反馈也就不可消费了。