activeMq的发布/订阅式的主题方式demo

创建一个maven项目,先导入相关依赖,对应的pom文件为:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.ssc</groupId>
    <artifactId>activemq_demo</artifactId>
    <version>1.0-SNAPSHOT</version>
<dependencies>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.16.1</version>
    </dependency>
    <!-- 日志相关依赖 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.6.1</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.6.1</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.16</version>
    </dependency>
</dependencies>
<build>
    <resources>
        <resource>
            <directory>src/main/java</directory>
            <includes>
                <include>**/*.xml</include>
            </includes>
        </resource>
    </resources>
</build>
</project>

创建一个消息生产者

 1 package com.ssc.demo2;
 2 
 3 import org.apache.activemq.ActiveMQConnectionFactory;
 4 
 5 import javax.jms.*;
 6 
 7 public class Producer {
 8     private static final String BROKER_URL="tcp://127.0.0.1:61616";
 9     public static void main(String[] args) throws JMSException {
10         //连接工厂
11         ConnectionFactory connectionFactory=null;
12         //连接实例
13        Connection connection=null;
14        //收发的线程实例
15         Session session=null;
16         //消息发送目标地址
17         Destination destination=null;
18         try{
19             //实例化连接工厂
20             connectionFactory=new ActiveMQConnectionFactory("ssc", "ssc", BROKER_URL);
21             //获取连接实例
22             connection=connectionFactory.createConnection();
23             //启动连接
24             connection.start();
25             //创建接收或发送的线程实例
26             session =connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
27             //创建队列
28             destination=session.createTopic("test.topic");
29             //创建消息发布者
30             MessageProducer messageProducer=session.createProducer(destination);
31             //创建TestMessage消息
32             TextMessage message=session.createTextMessage("你好测试,这是发布的消息");
33             messageProducer.send(message);
34             session.commit();
35         }catch (Exception e){
36             e.printStackTrace();
37         }finally {
38             if(session!=null){
39                 session.close();
40             }
41             if(connection!=null){
42                 connection.close();
43             }
44         }
45     }
46 }

查看activitemq控制台,控制台显示如下:

activeMq的发布/订阅式的主题方式demo

 创建两个订阅者,分别是:

 1 package com.ssc.demo2;
 2 
 3 import org.apache.activemq.ActiveMQConnectionFactory;
 4 
 5 import javax.jms.*;
 6 
 7 public class Customer01 {
 8     private static final String BROKER_URL="tcp://127.0.0.1:61616";
 9    public static void main(String[] args){
10        //连接工厂
11        ConnectionFactory connectionFactory=null;
12        //连接实例
13        Connection connection=null;
14        //收发的线程信息
15        Session session=null;
16        //消息发送目标地址
17        Destination destination=null;
18        try{
19            //创建连接工厂
20            connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,ActiveMQConnectionFactory.DEFAULT_PASSWORD, BROKER_URL);
21            //获取连接实例
22            connection=connectionFactory.createConnection();
23            //启动连接
24            connection.start();
25            //创建会话
26            session =connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
27            //创建队列(返回一个消息目的地)
28            destination=session.createTopic("test.topic");
29            //创建消息订阅者
30            MessageConsumer consumer=session.createConsumer(destination);
31            //消息发布者添加监听器
32            consumer.setMessageListener(new MessageListener() {
33                public void onMessage(Message message) {
34                    try {
35                        System.out.println("订阅者01接收到的消息"+((TextMessage)message).getText());
36                    }catch (JMSException e){
37                        e.printStackTrace();
38                    }
39 
40                }
41            });
42        } catch (JMSException e) {
43            e.printStackTrace();
44        }
45    }
46 }
运行第一个消费者:

activeMq的发布/订阅式的主题方式demo

 创建并启动第二个消费者:

 1 package com.ssc.demo2;
 2 
 3 import org.apache.activemq.ActiveMQConnectionFactory;
 4 
 5 import javax.jms.*;
 6 
 7 public class Customer02 {
 8     private static final String BROKER_URL="tcp://127.0.0.1:61616";
 9     public static void main(String[] args){
10         //连接工厂
11         ConnectionFactory connectionFactory=null;
12         //连接实例
13         Connection connection=null;
14         //收发的线程信息
15         Session session=null;
16         //消息发送目标地址
17         Destination destination=null;
18         try{
19             //创建连接工厂
20             connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,ActiveMQConnectionFactory.DEFAULT_PASSWORD, BROKER_URL);
21             //获取连接实例
22             connection=connectionFactory.createConnection();
23             //启动连接
24             connection.start();
25             //创建会话
26             session =connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
27             //创建队列(返回一个消息目的地)
28             destination=session.createTopic("test1.topic");
29             //创建消息订阅者
30             MessageConsumer consumer=session.createConsumer(destination);
31             //消息发布者添加监听器
32             consumer.setMessageListener(new MessageListener() {
33                 public void onMessage(Message message) {
34                     try {
35                         System.out.println("订阅者02接收到的消息"+((TextMessage)message).getText());
36                     }catch (JMSException e){
37                         e.printStackTrace();
38                     }
39 
40                 }
41             });
42         } catch (JMSException e) {
43             e.printStackTrace();
44         }
45     }
46 }

再次查看activemq控制台

activeMq的发布/订阅式的主题方式demo