ActiveMQ的p2p模式与发布订阅模式
1.消息中间件:采用异步通讯防止,支持点对点以及发布订阅模式,可以解决高并发问题
传统调用接口,可能发生阻塞,重复提交,超时等等问题,可以利用消息中间件发送异步通讯请求
点对点:生产者 消息队列 消费者
发布订阅:生产者 主题 消费者1 消费者N
2.windows安装ActiveMQ
2.1 解压,进入apache-activemq-5.11.1inwin64
2.2 启动,双击activeMQ.bat脚本启动,启动窗口不要关闭,可以设置后台启动
2.3 启动完成后,如果发送消息或者消费消息通过61616端口进行 后台查看信息通过8161端口查看
2.4 进入后台登陆:默认用户名和密码都是admin
P2P (点对点)
P2P
- P2P模式图
2. 涉及到的概念
- 消息队列(Queue)
- 发送者(Sender)
- 接收者(Receiver)
- 每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。
3. P2P的特点
- 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)
- 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列
- 接收者在成功接收消息之后需向队列应答成功
如果你希望发送的每个消息都应该被成功处理的话,那么你需要P2P模式。
应用场景
A用户与B用户发送消息
案例:
生产者:
public static void main(String[] args) throws JMSException { //步骤一:创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616"); //步骤二:创建连接 Connection connection = activeMQConnectionFactory.createConnection(); //步骤三:启动连接 connection.start(); //步骤四:获取会话工厂 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //步骤五:创建队列 Queue queue = session.createQueue("wdksoft_queue"); //创建消息生产者 MessageProducer producer = session.createProducer(queue); //消息持久化 producer.setDeliveryMode(2); //模拟消息 TextMessage textMessage = session.createTextMessage("hello activeMQ"); //发送消息 producer.send(textMessage); System.out.println("生产者生产消息完毕~"); //回收资源 session.close(); connection.close(); }
消费者:
public static void main(String[] args) throws JMSException { //步骤一:创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); //步骤二:创建连接 Connection connection = activeMQConnectionFactory.createConnection(); //步骤三:开启连接 connection.start(); //创建会话对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //获取到接受消息的队列 Queue queue = session.createQueue("wdksoft_queue"); //创建消费者 MessageConsumer consumer = session.createConsumer(queue); while(true){ //获取消息 TextMessage message = (TextMessage)consumer.receive(); if(message!=null){ System.out.println("消费者获取消息:"+message.getText()); }else{ break; } } //回收资源 session.close(); connection.close(); }
在没有消费者服务启动时,生产者生产的消息会被暂存到消息队列中,等到有消费者服务启动时,就从消息队列中取走消息
点对点的方式,生产者生产的消息只能被一个消费者获取
当有多个消费者存在时,只有一个消费者会收到消息
如果想一个生产者发布了消息,多个消费者都能接收到,需要用到另一种模式,发布与订阅模式
Pub/Sub (发布与订阅)
Pub/Sub模式图
涉及到的概念
主题(Topic)
发布者(Publisher)
订阅者(Subscriber)
客户端将消息发送到主题。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。
Pub/Sub的特点
每个消息可以有多个消费者
发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。
为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
如果你希望发送的消息可以不被做任何处理、或者被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型
消息的消费
在JMS中,消息的产生和消息是异步的。对于消费来说,JMS的消息者可以通过两种方式来消费消息。
○ 同步
订阅者或接收者调用receive方法来接收消息,receive方法在能够接收到消息之前(或超时之前)将一直阻塞
○ 异步
订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。
应用场景:
用户注册、订单修改库存、日志存储
画图演示
案例:
消费者:
public static void main(String[] args) throws JMSException { //步骤一:创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); //步骤二:创建连接 Connection connection = activeMQConnectionFactory.createConnection(); //步骤三:开启连接 connection.start(); //创建会话对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //获取到接受消息的队列 Topic topic = session.createTopic("wdksoft_topic"); //创建消费者 MessageConsumer consumer = session.createConsumer(topic); while(true){ //获取消息 TextMessage message = (TextMessage)consumer.receive(); if(message!=null){ System.out.println("消费者获取消息:"+message.getText()); }else{ break; } } //回收资源 session.close(); connection.close(); }
生产者:
public static void main(String[] args) throws JMSException { //步骤一:创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616"); //步骤二:创建连接 Connection connection = activeMQConnectionFactory.createConnection(); //步骤三:启动连接 connection.start(); //步骤四:获取会话工厂 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //步骤五:创建主题 Topic topic = session.createTopic("wdksoft_topic"); //创建消息生产者 MessageProducer producer = session.createProducer(null); //消息持久化 producer.setDeliveryMode(2); //模拟消息 TextMessage textMessage = session.createTextMessage("hello activeMQ pub"); //发送消息 producer.send(topic,textMessage); System.out.println("生产者生产消息完毕~"); //回收资源 session.close(); connection.close(); }
使用发布与订阅模式,必须先启动消费者,确定了有哪些订阅,生产者在向这些消费者发布消息