ActiveMQ 简单介绍以及安装 ActiveMQ简单的HelloWorld实例

一直知道这个jms,却没有机会用到。今天玩玩!

为什么要学习,使用JMS

   在JAVA中,如果两个应用程序之间对各自都不了解,甚至这两个程序可能部署在不同的大洲上,那么它们之间如何发送消息呢?举个例子,一个应用程序A部署在印度,另一个应用程序部署在美国,然后每当A触发某件事后,B想从A获取一些更新信息。当然,也有可能不止一个B对A的更新信息感兴趣,可能会有N个类似B的应用程序想从A中获取更新的信息。在这种情况下,JAVA提供了最佳的解决方案-JMS,完美解决了上面讨论的问题。JMS同样适用于基于事件的应用程序,如聊天服务,它需要一种发布事件机制向所有与服务器连接的客户端发送消息。JMS与RMI不同,发送消息的时候,接收者不需要在线。服务器发送了消息,然后就不管了;等到客户端上线的时候,能保证接收到服务器发送的消息。这是一个很强大的解决方案,能处理当今世界很多普遍问题。

JMS有什么优势

1、异步:JMS天生就是异步的,客户端获取消息的时候,不需要主动发送请求,消息会自动发送给可用的客户端。

2、可靠:JMS保证消息只会递送一次。大家都遇到过重复创建消息问题,而JMS能帮你避免该问题,只是避免而不是杜绝,所以在一些糟糕的环境下还是有可能会出现重复。

JMS消息传送模型

在JMS API出现之前,大部分产品使用“点对点”和“发布/订阅”中的任一方式来进行消息通讯。JMS定义了这两种消息发送模型的规范,它们相互独立。任何JMS的提供者可以实现其中的一种或两种模型,这是它们自己的选择。JMS规范提供了通用接口保证我们基于JMS API编写的程序适用于任何一种模型。让我们更加详细的看下这两种消息传送模型;

下载ActiveMQ,动手玩玩

官方网站:http://activemq.apache.org/ 

运行ActiveMQ服务

ActiveMQ 简单介绍以及安装
ActiveMQ简单的HelloWorld实例

   从它的目录来说,还是很简单的: 

    • bin存放的是脚本文件
    • conf存放的是基本配置文件
    • data存放的是日志文件
    • docs存放的是说明文档
    • examples存放的是简单的实例
    • lib存放的是activemq所需jar包
    • webapps用于存放项目的目录

 启动ActiveMQ 
  我们了解activemq的基本目录,下面我们运行一下activemq服务,双击bin目录下的activemq.bat脚本文件或运行自己电脑版本下的activemq.bat。 注:
ActiveMQ默认使用的TCP连接端口是61616, 通过查看该端口的信息可以测试ActiveMQ是否成功启动 netstat -an|find “61616”

输入:http://127.0.0.1:8161/admin/queues.jsp  (用户名和密码都是admin)

ActiveMQ 简单介绍以及安装
ActiveMQ简单的HelloWorld实例

   至此,服务端启动完毕。停止服务器,只需要按着Ctrl+Shift+C,之后输入y即可。

JMS编程模型

(1) ConnectionFactory

  创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。可以通过JNDI来查找ConnectionFactory对象。

(2) Destination

  Destination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。

所以,Destination实际上就是两种类型的对象:Queue、Topic可以通过JNDI来查找Destination。

(3) Connection

  Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。

(4) Session

  Session是我们操作消息的接口。可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当我们需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。

(5) 消息的生产者

  消息生产者由Session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。

(6) 消息消费者

  消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。

(7) MessageListener

  消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一种MessageListener。

1点对点模式 - 编写生产者

 1 package com.xiexy.project.test.activemq;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.ConnectionFactory;
 5 import javax.jms.Destination;
 6 import javax.jms.JMSException;
 7 import javax.jms.MessageProducer;
 8 import javax.jms.Session;
 9 import javax.jms.TextMessage;
10 
11 import org.apache.activemq.ActiveMQConnection;
12 import org.apache.activemq.ActiveMQConnectionFactory;
13 
14 
15 /**
16  * 消息的生产者(发送者) 
17  * 点对点模式
18  * 一条消息只能被一个人接收
19  * 发布者发布消息时,订阅者不需要在线(异步)
20  * 后面才上线,也能收到消息
21  *
22  */
23 public class JMSProducer {
24 
25     //默认连接用户名
26     private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
27     //默认连接密码
28     private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
29     //默认连接地址
30     private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
31     //发送的消息数量
32     private static final int SENDNUM = 10;
33 
34     public static void main(String[] args) {
35         //连接工厂
36         ConnectionFactory connectionFactory;
37         //连接
38         Connection connection = null;
39         //会话 接受或者发送消息的线程
40         Session session;
41         //消息的目的地
42         Destination destination;
43         //消息生产者
44         MessageProducer messageProducer;
45         //实例化连接工厂
46         connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);
47 
48         try {
49             //通过连接工厂获取连接
50             connection = connectionFactory.createConnection();
51             //启动连接
52             connection.start();
53             //创建session
54             session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
55             //创建一个名称为HelloWorld的消息队列
56             destination = session.createQueue("HelloWorld@2");
57             //创建消息生产者
58             messageProducer = session.createProducer(destination);
59             //发送消息
60             sendMessage(session, messageProducer);
61 
62             session.commit();
63 
64         } catch (Exception e) {
65             e.printStackTrace();
66         }finally{
67             if(connection != null){
68                 try {
69                     connection.close();
70                 } catch (JMSException e) {
71                     e.printStackTrace();
72                 }
73             }
74         }
75 
76     }
77     /**
78      * 发送消息
79      * @param session
80      * @param messageProducer  消息生产者
81      * @throws Exception
82      */
83     public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
84         for (int i = 0; i < JMSProducer.SENDNUM; i++) {
85             //创建一条文本消息 
86             TextMessage message = session.createTextMessage("ActiveMQ 发送消息" +i);
87             System.out.println("发送消息:Activemq 发送消息" + i);
88             //通过消息生产者发出消息 
89             messageProducer.send(message);
90         }
91 
92     }
93 }

1点对点模式 - 编写消费者

 1 package com.xiexy.project.test.activemq;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.ConnectionFactory;
 5 import javax.jms.Destination;
 6 import javax.jms.JMSException;
 7 import javax.jms.MessageConsumer;
 8 import javax.jms.Session;
 9 import javax.jms.TextMessage;
10 
11 import org.apache.activemq.ActiveMQConnection;
12 import org.apache.activemq.ActiveMQConnectionFactory;
13 
14 
15 /**
16  * 消息的消费者(接受者)
17  * 点对点模式
18  * 一条消息只能被一个人接收
19  * 发布者发布消息时,订阅者不需要在线(异步)
20  * 后面才上线,也能收到消息
21  *
22  */
23 public class JMSConsumer {
24 
25     private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
26     private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
27     //private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址
28     private static final String BROKEURL = "tcp://localhost:61616";
29 
30     public static void main(String[] args) {
31         ConnectionFactory connectionFactory;//连接工厂
32         Connection connection = null;//连接
33 
34         Session session;//会话 接受或者发送消息的线程
35         Destination destination;//消息的目的地
36 
37         MessageConsumer messageConsumer;//消息的消费者
38     
39         //实例化连接工厂
40         connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
41 
42         try {
43             //通过连接工厂获取连接
44             connection = connectionFactory.createConnection();
45             //启动连接
46             connection.start();
47             //创建session
48             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
49             //创建一个连接HelloWorld的消息队列
50             destination = session.createQueue("HelloWorld@2");
51             //创建消息消费者
52             messageConsumer = session.createConsumer(destination);
53 
54             while (true) {
55                 //receive(连接队列时间)
56                 TextMessage textMessage = (TextMessage) messageConsumer.receive(10000);
57                 if(textMessage != null){
58                     System.out.println("收到的消息:" + textMessage.getText());
59                 }else {
60                     System.out.println("暂时没有消息~");
61                     break;
62                 }
63             }
64 
65         } catch (JMSException e) {
66             e.printStackTrace();
67         }finally{
68             try {
69                 if(connection != null) {
70                     connection.close();
71                 }
72             } catch (JMSException e) {
73                 e.printStackTrace();
74             }
75         }
76 
77     }
78 }

2订阅模式 - 编写生产者

 1 package com.xiexy.project.test.activemq;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.ConnectionFactory;
 5 import javax.jms.DeliveryMode;
 6 import javax.jms.Destination;
 7 import javax.jms.JMSException;
 8 import javax.jms.MessageProducer;
 9 import javax.jms.Session;
10 import javax.jms.TextMessage;
11 
12 import org.apache.activemq.ActiveMQConnection;
13 import org.apache.activemq.ActiveMQConnectionFactory;
14 
15 
16 /**
17  * 消息的生产者(发送者) 
18  * 订阅模式
19  * 发布者发布消息时,订阅者必须在线才能收到消息(同步)
20  * 后面才上线,也收不到消息
21  */
22 public class JMSTopicProducer {
23 
24     //默认连接用户名
25     private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
26     //默认连接密码
27     private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
28     //默认连接地址
29     private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
30     //发送的消息数量
31     private static final int SENDNUM = 10;
32 
33     public static void main(String[] args) {
34         //连接工厂
35         ConnectionFactory connectionFactory;
36         //连接
37         Connection connection = null;
38         //会话 接受或者发送消息的线程
39         Session session;
40         //消息的目的地
41         Destination destination;
42         //消息生产者
43         MessageProducer messageProducer;
44         //实例化连接工厂
45         System.out.println("MQ-user:  " + JMSTopicProducer.USERNAME);
46         System.out.println("MQ-pwds:  " + JMSTopicProducer.PASSWORD);
47         System.out.println("MQ-burl:  " + JMSTopicProducer.BROKEURL);
48         connectionFactory = new ActiveMQConnectionFactory(JMSTopicProducer.USERNAME, JMSTopicProducer.PASSWORD, JMSTopicProducer.BROKEURL);
49 
50         try {
51             //通过连接工厂获取连接
52             connection = connectionFactory.createConnection();
53             //启动连接
54             connection.start();
55             //创建session
56             session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
57             //创建一个名称为HelloTopic的主题订阅
58             destination = session.createTopic("HelloTopic");
59             //创建消息生产者
60             messageProducer = session.createProducer(destination);
61             
62             System.out.println("DeliveryMode.NON_PERSISTENT: " + DeliveryMode.NON_PERSISTENT);
63             //发送模式  DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久
64             messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
65             
66             //发送消息
67             sendMessage(session, messageProducer);
68             session.commit();
69 
70         } catch (Exception e) {
71             e.printStackTrace();
72         }finally{
73             if(connection != null){
74                 try {
75                     connection.close();
76                 } catch (JMSException e) {
77                     e.printStackTrace();
78                 }
79             }
80         }
81 
82     }
83     /**
84      * 发送消息
85      * @param session
86      * @param messageProducer  消息生产者
87      * @throws Exception
88      */
89     public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
90         for (int i = 0; i < JMSTopicProducer.SENDNUM; i++) {
91             //创建一条文本消息 
92             TextMessage message = session.createTextMessage("ActiveMQ 发送消息" +i);
93             System.out.println("发送消息:Activemq 发送消息" + i);
94             //通过消息生产者发出消息 
95             messageProducer.send(message);
96         }
97 
98     }
99 }

2订阅模式 - 编写消费者

 1 package com.xiexy.project.test.activemq;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.ConnectionFactory;
 5 import javax.jms.Destination;
 6 import javax.jms.JMSException;
 7 import javax.jms.MessageConsumer;
 8 import javax.jms.Session;
 9 import javax.jms.TextMessage;
10 
11 import org.apache.activemq.ActiveMQConnection;
12 import org.apache.activemq.ActiveMQConnectionFactory;
13 
14 
15 /**
16  * 消息的消费者(接受者)
17  * 订阅模式
18  * 发布者发布消息时,订阅者必须在线才能收到消息(同步)
19  * 后面才上线,也收不到消息
20  */
21 public class JMSTopicConsumer {
22 
23     private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
24     private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
25     private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址
26     //private static final String BROKEURL = "tcp://localhost:61616";
27 
28     public static void main(String[] args) {
29         ConnectionFactory connectionFactory;//连接工厂
30         Connection connection = null;//连接
31 
32         Session session;//会话 接受或者发送消息的线程
33         Destination destination;//消息的目的地
34 
35         MessageConsumer messageConsumer;//消息的消费者
36         
37         System.out.println("MQ-user   " + JMSTopicConsumer.USERNAME);
38         System.out.println("MQ-pwds   " + JMSTopicConsumer.PASSWORD);
39         System.out.println("MQ-burl   " + JMSTopicConsumer.BROKEURL);
40         //实例化连接工厂
41         connectionFactory = new ActiveMQConnectionFactory(JMSTopicConsumer.USERNAME, JMSTopicConsumer.PASSWORD, JMSTopicConsumer.BROKEURL);
42 
43         try {
44             //通过连接工厂获取连接
45             connection = connectionFactory.createConnection();
46             //启动连接
47             connection.start();
48             //创建session
49             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
50             //创建一个名称为HelloTopic的主题订阅
51             destination = session.createTopic("HelloTopic");
52             //创建消息消费者
53             messageConsumer = session.createConsumer(destination);
54 
55             while (true) {
56                 TextMessage textMessage = (TextMessage) messageConsumer.receive();
57                 if(textMessage != null){
58                     System.out.println("收到的消息:" + textMessage.getText());
59                 }else {
60                     System.out.println("暂时没有消息~");
61                     break;
62                 }
63             }
64 
65         } catch (JMSException e) {
66             e.printStackTrace();
67         }finally{
68             try {
69                 if(connection != null) {
70                     connection.close();
71                 }
72             } catch (JMSException e) {
73                 e.printStackTrace();
74             }
75         }
76 
77     }
78 }