感想之言

感触之言

1.主题:消息中间件 activeMQ的源码分析 之 开篇
以前对JMS尤其是activeMQ不了解,一看到什么地方需要使用消息中间件,就比较反感。主要原因是感觉JMS的实现都比较复杂,怕在真实使用过程中出现什么问题时会比较被动。所以,我们基本上是自己写类似的消息中间件,当然功能非常简单。但其实我们自己写出来的中间件,随着功能的不断增加、人员和时间的种种问题,导致最终我们自己做出来的所谓消息中间件越来越不能维护。在吸取了一次一次这种重复发明"*"的事情中,我们觉得也许一开始就采用成熟开源的产品也许是条更好的方式。

2. 以前对JMS尤其是activeMQ不了解,一看到什么地方需要使用消息中间件,就比较反感。主要原因是感觉JMS的实现都比较复杂,怕在真实使用过程中出现什么问题时会比较被动。所以,我们基本上是自己写类似的消息中间件,当然功能非常简单。但其实我们自己写出来的中间件,随着功能的不断增加、人员和时间的种种问题,导致最终我们自己做出来的所谓消息中间件越来越不能维护。在吸取了一次一次这种重复发明"*"的事情中,我们觉得也许一开始就采用成熟开源的产品也许是条更好的方式。
    感觉到现在或将来我们对JMS的使用会更加深入,为了适应这种的需要,作为软件研发人员,需要对在我们工作中占有重要地位的开源产品有源代码级别的熟悉,尤其对我们这些英语不太好的,因为目前用的多的开源产品基本上是以英语作为基础的,那么我们想要提交一个bug或讨论一个什么用法的时候,就比较被动,而且眼巴巴的等着其他人来解决,自己一点都
使不上劲的感觉真的很不舒服。
    我们选择activeMQ来分析它的核心架构、源代码,就是希望能尽量少的发生上面的情况,尤其在我们分析activeMQ的过程中,发现其源代码中确实存在不少小问题。消息中间件在许多项目或产品中占有非常重要的地位,虽然我们目前还不是activeMQ的代码提交人员,但希望通过对activeMQ的源码分析这种方式,同样为使用activeMQ的同行们提供一点帮助,也算是间接为开源做点事情。
    在后面的一系列文章中,我们将主要从如下几个方面来分析:   
   一、activeMQ的核心线程的功能和生命周期
   二、消息存储的kaha实现的分析
   三、消息队列(Queue)实现的分析

   四、activeMQ的领域模型
   五、activeMQ中TCP通讯机制
   六、activeMQ的Cluster
   七、activeMQ中内存使用和管理
    作为开篇,首先我们非常尊重activeMQ的所有committer,它是个不错的软件作品,我们的分析是基于5.1版本的代码,就象任何事情一样,尤其是软件产品它的成熟是需要较长时间的过程,我们也会把分析中发现的5.1版本的bug于大家分享,下面我就以一个小bug作为整个activeMQ分析的开篇。
    为了表述的方便,我们把这个bug叫做bug_1,为了讲清楚该bug,首先我会把相关的背景做一个介绍:
    消息指针(Message cursor)是activeMQ里一个非常重要的核心类,它是提供某种优化消息存储的方法。消息中间件的实现一般都是当消息消费者准备好消费消息的时候,它会从持久化
存储中一批一批的读取消息,并发送给消费者。消息指针维护着下一批待读取消息的相关位置信息。
    消息指针在对不同的消息消费者时,它的内部处理机制也不一样:
    1.当消费者跟得上消息生产者的时候,是快消费者。那这种时候Message cursor的内部过程如下图所示:

    2.当消费者慢于消息生产者的时候,是慢消费者。那这种时候Message cursor的内部过程如下图所示: 

    上面两种情况是能自动调整的,当一个消费者从快变成慢或从慢变成快的时候,Message cursor应该做自动的调整,在5.1里面这种自动调整有点小bug,它只能从快变成慢,反之则不行。具体bug原因应该是疏忽写错了,代码在类AbstractStoreCursor中的public final synchronized void remove()方法中的if (size==0 && isStarted() && cacheEnabled)这一行,只用把cacheEnabled改为useCache就可以了。(该bug已经被后续版本所修复) 

 

3.apache-activemq-5.4.2-bin.zip

 

 

4.

3)        远程过程调用中间件(RPC

4)        消息中间件(MOM

 

5.消息中间件可以即支持同步方式,又支持异步方式。异步中间件比同步中间件具有更强的容错性,在系统故障时可以保证消息的正常传输。异步中间件技术又分为两类:广播方式和发布/订阅方式。由于发布/订阅方式可以指定哪种类型的用户可以接受哪种类型的消息,更加有针对性,事实上已成为异步中间件的非正式标准。

 

6.JMS是sun公司的消息中间件,RMI是sun公司的对象中间件。

7.ActiveMQ入门:

1、环境:
Windows XP
apache-activemq-5.2.0-bin.zip
 
2、安装
解压缩到apache-activemq-5.2.0-bin.zip到一个目录,比如C:\apache-activemq-5.2.0
 
3、配置
配置就在C:\apache-activemq-5.2.0\conf目录下三个文件
activemq.xml
credentials.properties
log4j.properties
 
4、启动ActiveMQ
运行C:\apache-activemq-5.2.0\bin\activemq.bat
5、测试
ActiveMQ默认使用的TCP连接端口是61616, 通过查看该端口的信息可以测试ActiveMQ是否成功启动 netstat -an|find "61616"

C:\Documents and Settings\Administrator>netstat -an|find "61616"
    TCP        0.0.0.0:61616                    0.0.0.0:0                            LISTENING
6、监控
ActiveMQ5.0版本默认启动时,启动了内置的jetty服务器,提供一个demo应用和用于监控ActiveMQ的admin应用。
admin:http://127.0.0.1:8161/admin/
demo:http://127.0.0.1:8161/demo/

下面是ActiveMQ5.2的一个最简单例子!
环境还是apache-activemq-5.2.0-bin.zip,需要注意的是,开发时候,要将apache-activemq- 5.2.0-bin.zip解压缩后里面的activemq-all-5.2.0.jar包加入到classpath下面,这个包包含了所有jms接口 api的实现。

Java代码 感想之言
  1. import org.apache.activemq.ActiveMQConnection;   
  2. import org.apache.activemq.ActiveMQConnectionFactory;   
  3.   
  4. import javax.jms.*;   
  5.   
  6. /**  
  7. * 消息的生产者(发送者)  
  8. *  
  9. */  
  10. public class JmsSender {   
  11.         public static void main(String[] args) throws JMSException {   
  12.                 // ConnectionFactory :连接工厂,JMS 用它创建连接   
  13.                 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(   
  14.                                 ActiveMQConnection.DEFAULT_USER,   
  15.                                 ActiveMQConnection.DEFAULT_PASSWORD,   
  16.                                 "tcp://192.168.14.117:61616");   
  17.                 //JMS 客户端到JMS Provider 的连接   
  18.                 Connection connection = connectionFactory.createConnection();   
  19.                 connection.start();   
  20.                 // Session: 一个发送或接收消息的线程   
  21.                 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);   
  22.                 // Destination :消息的目的地;消息发送给谁.   
  23.                 // 获取session注意参数值my-queue是Query的名字   
  24.                 Destination destination = session.createQueue("my-queue");   
  25.                 // MessageProducer:消息生产者   
  26.                 MessageProducer producer = session.createProducer(destination);   
  27.                 //设置不持久化   
  28.                 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);   
  29.                 //发送一条消息   
  30.                 sendMsg(session, producer);   
  31.                 session.commit();   
  32.                 connection.close();   
  33.         }   
  34.   
  35.         /**  
  36.          * 在指定的会话上,通过指定的消息生产者发出一条消息  
  37.          *  
  38.          * @param session    消息会话  
  39.          * @param producer 消息生产者  
  40.          */  
  41.         public static void sendMsg(Session session, MessageProducer producer) throws JMSException {   
  42.                 //创建一条文本消息   
  43.                 TextMessage message = session.createTextMessage("Hello ActiveMQ!");   
  44.                 //通过消息生产者发出消息   
  45.                 producer.send(message);   
  46.                 System.out.println("");   
  47.         }   
  48. }  
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * 消息的生产者(发送者) * */ public class JmsSender { public static void main(String[] args) throws JMSException { // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://192.168.14.117:61616"); //JMS 客户端到JMS Provider 的连接 Connection connection = connectionFactory.createConnection(); connection.start(); // Session: 一个发送或接收消息的线程 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // Destination :消息的目的地;消息发送给谁. // 获取session注意参数值my-queue是Query的名字 Destination destination = session.createQueue("my-queue"); // MessageProducer:消息生产者 MessageProducer producer = session.createProducer(destination); //设置不持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //发送一条消息 sendMsg(session, producer); session.commit(); connection.close(); } /** * 在指定的会话上,通过指定的消息生产者发出一条消息 * * @param session 消息会话 * @param producer 消息生产者 */ public static void sendMsg(Session session, MessageProducer producer) throws JMSException { //创建一条文本消息 TextMessage message = session.createTextMessage("Hello ActiveMQ!"); //通过消息生产者发出消息 producer.send(message); System.out.println(""); } }

 
Java代码 感想之言
  1. import org.apache.activemq.ActiveMQConnection;   
  2. import org.apache.activemq.ActiveMQConnectionFactory;   
  3.   
  4. import javax.jms.*;   
  5.   
  6. /**  
  7. * 消息的消费者(接受者)  
  8. *  
  9. */  
  10. public class JmsReceiver {   
  11.         public static void main(String[] args) throws JMSException {   
  12.                 // ConnectionFactory :连接工厂,JMS 用它创建连接   
  13.                 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(   
  14.                                 ActiveMQConnection.DEFAULT_USER,   
  15.                                 ActiveMQConnection.DEFAULT_PASSWORD,   
  16.                                 "tcp://192.168.14.117:61616");   
  17.                 //JMS 客户端到JMS Provider 的连接   
  18.                 Connection connection = connectionFactory.createConnection();   
  19.                 connection.start();   
  20.                 // Session: 一个发送或接收消息的线程   
  21.                 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);   
  22.                 // Destination :消息的目的地;消息发送给谁.   
  23.                 // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置   
  24.                 Destination destination = session.createQueue("my-queue");   
  25.                 // 消费者,消息接收者   
  26.                 MessageConsumer consumer = session.createConsumer(destination);   
  27.                 while (true) {   
  28.                         TextMessage message = (TextMessage) consumer.receive(1000);   
  29.                         if (null != message)   
  30.                                 System.out.println("收到消息:" + message.getText());   
  31.                         else  
  32.                                 break;   
  33.                 }   
  34.                 session.close();   
  35.                 connection.close();   
  36.         }   
  37. }  
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * 消息的消费者(接受者) * */ public class JmsReceiver { public static void main(String[] args) throws JMSException { // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://192.168.14.117:61616"); //JMS 客户端到JMS Provider 的连接 Connection connection = connectionFactory.createConnection(); connection.start(); // Session: 一个发送或接收消息的线程 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // Destination :消息的目的地;消息发送给谁. // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 Destination destination = session.createQueue("my-queue"); // 消费者,消息接收者 MessageConsumer consumer = session.createConsumer(destination); while (true) { TextMessage message = (TextMessage) consumer.receive(1000); if (null != message) System.out.println("收到消息:" + message.getText()); else break; } session.close(); connection.close(); } }

 
启动ActiveMQ,然后开始执行:
先运行发送者,连续运行了三次,最后一次控制台输出:


Process finished with exit code 0
 
后运行接受者,输出结果:
收到消息Hello ActiveMQ!
收到消息Hello ActiveMQ!
收到消息Hello ActiveMQ!

Process finished with exit code 0
 
注意:
其中的端口61616是ActiveMQ默认的配置,在activemq.xml中,

Xml代码 感想之言
  1. <!-- The transport connectors ActiveMQ will listen to -->  
  2.              <transportConnectors>  
  3.                      <transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>  
  4.                      <transportConnector name="ssl" uri="ssl://localhost:61617"/>  
  5.                      <transportConnector name="stomp" uri="stomp://localhost:61613"/>  
  6.                      <transportConnector name="xmpp" uri="xmpp://localhost:61222"/>  
  7.              </transportConnectors>   
<!-- The transport connectors ActiveMQ will listen to --> <transportConnectors> <transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/> <transportConnector name="ssl" uri="ssl://localhost:61617"/> <transportConnector name="stomp" uri="stomp://localhost:61613"/> <transportConnector name="xmpp" uri="xmpp://localhost:61222"/> </transportConnectors>

 
,建议不要改动,都用这个端口多好,就像ftp都用21端口,也没错。
 
 
这是官方的HelloWorld例子,不过看着不顺眼:
http://activemq.apache.org/hello-world.html

8.建议单步调试跟踪源码