【RabbitMQ消息中间件】7.订阅模式

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/u013517797/article/details/79506575上一篇我们了解了RabbitMQ的消息的确认模式,本篇我们继续讲解RabbitMQ的五大队列模式之一的“订阅模式”。在实际开发中,通常会遇到以下需一个生产者,多个消费者,同一个消息被多个消费者获取。
 
package cn.jack.rabbitmq.ps;
import cn.jack.rabbitmq.connection.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
 
public class Send {
	
    private final static String EXCHANGE_NAME="test_exchange_fanout";
    
    public static void main(String[] args) throws Exception {
		//获取到连接以及mq通道
    	Connection connection = ConnectionUtil.getConnection();
    	//从连接中创建通道
    	Channel channel = connection.createChannel();
    	
    	//声明Exchange
    	channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    	
    	//消息内容
    	String message = "Hello World!";
    	channel.basicPublish(EXCHANGE_NAME, "",null, message.getBytes());
    	System.out.println("[product] Send '"+ message +"'");
    	
    	//关闭通道和连接
    	channel.close();
    	connection.close();
	}
}

  

与之前不同的是,之前的生产者是声明(创建)队列:
 
//声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
而该生产者不再声明队列,即不再与队列直接连接,而是换为声明交换机:
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

  

我们这里先不编写消费者,直接运行一下生产者:
【RabbitMQ消息中间件】7.订阅模式
在RabbitMQ的管理工具的“Exchange”模块下,我们可以看到刚刚创建的交换机:
【RabbitMQ消息中间件】7.订阅模式

此时还没有队列绑定到交换机,那么此时消息到了哪里了呢?可以这么说,消息“丢失”了。因为当我们尝试将信息发送到一个没有绑定队列的交换机时,由于交换机无法及时将信息推送至消息队列,而交换机本身没有存储信息的能力,则会导致信息丢失。所以这里我们要注意。

下面我们创建第一个消费者类:
【RabbitMQ消息中间件】7.订阅模式
 

package cn.jack.rabbitmq.ps;
import cn.jack.rabbitmq.connection.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
 
public class Recv1 {
 
    private final static String QUEUE_NAME = "test_queue_ps_1";//队列名称
    
    private final static String EXCHANGE_NAME="test_exchange_fanout";//交换机名称
 
    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
 
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        //绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
 
        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);
 
        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);
 
        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [consumer1] Received '" + message + "'");
            //休眠10ms
            Thread.sleep(10);
            // 返回确认状态
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

  

这里我们为消费者声明了一个队列:
 

// 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);

  

然后将该队列绑定到之前生产者绑定的交换机上:
 
//绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

  

这样就可以获取生产者通过交换机推送的信息了。
运行一下消费者,再运行一下生产者,可以发现消费者成功获取了信息:
【RabbitMQ消息中间件】7.订阅模式
再创建一个Recv2的消费者类,与Recv1代码相同,除了定义的队列名称不同即可:
private final static String QUEUE_NAME = "test_queue_ps_2";//队列名称
获取消息后的控制台打印记得更换名称为“consumer2”:
System.out.println(" [consumer2] Received '" + message + "'");
http://blog.csdn.net/acmman/article/details/79506575