activitemq的点对点模型的队列方式demo

创建一个由maven项目对应的pom文件为:

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0"
 3          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 5     <modelVersion>4.0.0</modelVersion>
 6 
 7     <groupId>org.ssc</groupId>
 8     <artifactId>activemq_demo</artifactId>
 9     <version>1.0-SNAPSHOT</version>
10 <dependencies>
11     <dependency>
12         <groupId>org.apache.activemq</groupId>
13         <artifactId>activemq-all</artifactId>
14         <version>5.16.1</version>
15     </dependency>
16     <!-- 日志相关依赖 -->
17     <dependency>
18         <groupId>org.slf4j</groupId>
19         <artifactId>slf4j-api</artifactId>
20         <version>1.6.1</version>
21     </dependency>
22     <dependency>
23         <groupId>org.slf4j</groupId>
24         <artifactId>slf4j-log4j12</artifactId>
25         <version>1.6.1</version>
26     </dependency>
27     <dependency>
28         <groupId>log4j</groupId>
29         <artifactId>log4j</artifactId>
30         <version>1.2.16</version>
31     </dependency>
32 </dependencies>
33 <build>
34     <resources>
35         <resource>
36             <directory>src/main/java</directory>
37             <includes>
38                 <include>**/*.xml</include>
39             </includes>
40         </resource>
41     </resources>
42 </build>
43 </project>

服务发送者

 1 package com.ssc.demo1;
 2 
 3 import org.apache.activemq.ActiveMQConnectionFactory;
 4 
 5 import javax.jms.*;
 6 
 7 public class Sender {
 8     //发送次数
 9     private static final int SEND_NUM=5;
10     //tcp地址
11     private static final String BROKER_url="tcp://127.0.0.1:61616";
12     public static final String DESTINATION = "test1.mq.queue";
13 
14     public static void main(String[] args) throws Exception {
15         Session session=null;
16         ConnectionFactory connectionFactory=null;
17         Connection connection=null;
18         try {
19             //第一步建立connectionFactory连接工厂,需要填入用户名,密码,以及要连接的地址
20          connectionFactory=new ActiveMQConnectionFactory("ssc","ssc",BROKER_url );
21        //第二步根据connectionFactory工厂对象给我们创建一个Connection连接,并且调用Connection的start方法开启连接
22         connection=connectionFactory.createConnection();
23         connection.start();
24         //第三步通过Connection对象创建session会话(上下文环境对象),
25         session=connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
26         //创建一个消息队列
27         Destination destination=session.createQueue(DESTINATION);
28         //创建消息制作者
29         MessageProducer messageProducer=session.createProducer(destination);
30         //创建持久化模式
31         messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
32         sendMessage(session,messageProducer);
33         session.commit();
34 
35         }catch (Exception e){
36             throw e;
37         }finally {
38             //关闭释放资源
39             if (session!=null){
40                 session.close();
41             }
42             if(connection!=null){
43                 connection.close();
44             }
45         }
46     }
47     public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{
48         for (int i=0;i<SEND_NUM;i++){
49             String message="发送消息第"+(i+1)+"条";
50             TextMessage text=session.createTextMessage(message);
51             System.out.println(message);
52             messageProducer.send(text);
53         }
54     }
55 }

运行服务发送者后,查看activemq控制台,会发现,等待处理的数据是5条,进入队列的数据也是5条,此时该消息未被消费者消费

activitemq的点对点模型的队列方式demo

服务发送者

 1 package com.ssc.demo1;
 2 
 3 import org.apache.activemq.ActiveMQConnectionFactory;
 4 
 5 import javax.jms.*;
 6 
 7 public class Consumer{
 8     private static final String BROKER_URL="tcp://127.0.0.1:61616";
 9     public static void main(String[] args){
10         //连接工厂
11         ConnectionFactory connectionFactory=null;
12         //连接实例
13         Connection connection=null;
14         //收发线程
15         Session session=null;
16         //消息发送目标地址
17         Destination destination=null;
18         try {
19             //实例化连接工厂
20             connectionFactory=new ActiveMQConnectionFactory("ssc", "ssc", BROKER_URL);
21             //获取连接实例
22             connection=connectionFactory.createConnection();
23             //启动连接
24             connection.start();
25             //创建接收和发送的线程实例
26             session=connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
27             //创建一个消息队列
28             destination=session.createQueue("test1.mq.queue");
29             //创建消息消费者
30             MessageConsumer messageConsumer=session.createConsumer(destination);
31             //创建消息监听
32             messageConsumer.setMessageListener(new MQListener());
33         } catch (JMSException e) {
34             e.printStackTrace();
35         }
36     }
37 }

消息监听器

 1 package com.ssc.demo1;
 2 
 3 import javax.jms.JMSException;
 4 import javax.jms.Message;
 5 import javax.jms.MessageListener;
 6 import javax.jms.TextMessage;
 7 
 8 public class MQListener implements MessageListener {
 9 
10     public void onMessage(Message message) {
11         try{
12             System.out.println(((TextMessage)message).getText());
13         }catch (JMSException e){
14             e.printStackTrace();
15         }
16     }
17 }

运行消费者,后查看activemq控制台,会发现这5条消息会被消费

activitemq的点对点模型的队列方式demo