ActiveMQ详解

ActiveMQ详解

Apache ActiveMQ介绍

ActiveMQ是一个开源的,实现了JMS1.1规范的面向消息(MOM)中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。ActiveMQ使用Apache提供的授权,任何人都可以对其实现代码进行修改。编写客户端支持的语言包括java,C/C++, .NET, Perl, PHP, Python, Ruby等。 ActiveMQ的设计目标是提供标准的,面向消息的,能够跨越多语言和多系统的应用集成消息通信中间件。ActiveMQ实现了JMS标准并提供了很多附加的特性。这些附加的特性包括: 1. JMX管理(java Management Extensions,即java管理扩展) 2. 主从管理(master/salve,这是集群模式的一种,主要体现在可靠性方面,当主中介(代理)出现故障,那么从代理会替代主代理的位置,不至于使消息系统瘫痪) 3. 消息组通信(同一组的消息,仅会提交给一个客户进行处理) 4. 有序消息管理(确保消息能够按照发送的次序被接受者接收) 5. 消息优先级(优先级高的消息先被投递和处理) 6. 订阅消息的延迟接收(订阅消息在发布时,如果订阅者没有开启连接,那么当订阅者开启连接时,消息中介将会向其提交之前的,其未处理的消息) 7. 接收者处理过慢(可以使用动态负载平衡,将多数消息提交到处理快的接收者,这主要是对PTP消息所说) 8. 虚拟接收者(降低与中介的连接数目) 9. 成熟的消息持久化技术(部分消息需要持久化到数据库或文件系统中,当中介崩溃时,信息不会丢失) 10. 支持游标操作(可以处理大消息) 11. 支持消息的转换 12. 通过使用Apache的Camel可以支持EIP 13. 使用镜像队列的形式轻松的对消息队列进行监控等。

使用MQ的场景

像COM、CORBA、DCE和EJB等应用技术使用RPC(Remote Procedural Calls,即远程过程调用)属于紧耦合技术。使用RPC,一个应用程序调用另一个应用程序,调用者必须阻塞,直到被调用者执行结束返回结果信息为止。 ![](http://images2017.cnblogs.com/blog/575312/201709/575312-20170907101916257-125661489.png)

下图给出一种松耦合的方式,进行架构设计:应用程序1向消息中介(MOM)发送一条消息,很可能一段时间之后,应用程序2调用MOM来收取消息。任何一个应用程序都不知道对方是否存在也不需要阻塞等待。这种通信方式大大缩减了维护开销,因为对于一个应用程序的修改,会对其他应用程序影响极小。
ActiveMQ详解

ActiveMQ就是采用了上面提到的松耦合方式,因此,我们经常说应用程序发送消息仅仅是触发后即忘却。应用程序将消息发送给ActiveMQ而并不关心什么时间以何种方式消息投递给接收者。同样的消息接收者也不会关心消息来源于哪里和消息是怎样投递给ActiveMQ的。因此下图可以很好地表示出amq(broker,服务端,消息传递提供者)和客户端(client,消息收发者)之间的模型建立。
ActiveMQ详解

MQ的安装

Step1 运行ActiveMQ至少需要JavaSE 1.5的支持,在使用ActiveMQ之前,需要先下载和安装Java 运行环境。 Step2 下载ActiveMQ,可以从官方网站(http://activemq.apache.org/download.html.)上免费下载最新版本的ActiveMQ。 Step3 运行 ActiveMQ:./bin/activemq。通过这条命令就会启动ActiveMQ的代理,并启动了一些常用的连接器供客户端连接使用,主要包括TCP,SSL,STOMP和XMPP。当看到如下页面,说明mq已成功启动。 ![](http://images2017.cnblogs.com/blog/575312/201709/575312-20170907105821522-459846238.png)

Step4 http://localhost:8161/可进入网页控制台(监控、操作broker)。默认用户名和密码都为admin。
ActiveMQ详解

收发消息的简单实现

```java // 生产者 public class JMSProducer {

private static final String USERNAME = "admin";

private static final String PASSWORD = "123";

private static final String BROKEURL = "failover://tcp://localhost:61616";

private static final int SENDNUM = 10;

public static void main(String[] args) {

ConnectionFactory connectionFactory;

Connection connection = null;

Session session;

Destination destination;

MessageProducer messageProducer;

connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);

try {

connection = connectionFactory.createConnection();

connection.start();

session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

destination = session.createQueue("hyr_event");

messageProducer = session.createProducer(destination);

// messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//不用设置持久,默认的

// messageProducer.setTimeToLive(10000);

sendMessage(session, messageProducer);//发消息头带有可筛选属性的消息

session.commit();

session.close();

} catch (Exception e) {

e.printStackTrace();

}

}

public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {

int i = 0;

String type1 = "car";

String type2 = "bike";

String color1 = "white";

String color2 = "black";

String color3 = "red";

for (i = 0; i < SENDNUM; i++) {

TextMessage message = session.createTextMessage();

if(i < 5){

message.setStringProperty("TYPE", type1);

  // message.setStringProperty("TYPE", type2);

message.setStringProperty("COLOR", color1);

message.setStringProperty("COLOR", color2);

}

if(i >= 5){

message.setStringProperty("TYPE", type1);

message.setStringProperty("TYPE", type2);

message.setStringProperty("COLOR", color1);

message.setStringProperty("COLOR", color2);

}

message.setText("发送的第" + (i + 1) + "条消息:");

System.out.println("发送的第" + (i + 1) + "条消息");

//通过消息生产者发出消息

messageProducer.send(message);

 //  sleep(1000);
                }
        }

}


```java
// 消费者
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
  
public class JMSConsumer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名 
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码 
    private static final String BROKEURL = "failover://tcp://10.63.240.216:61616"; // 默认的连接地址
 
    public static void main(String[] args) {
        ConnectionFactory connectionFactory; // 连接工厂 
        Connection connection = null; // 连接 
        Session session; // 会话 接受或者发送消息的线程 
        Destination destination; // 消息的目的地 
        MessageConsumer messageConsumer;// 消息消费者 
 
        // 实例化连接工厂 
        connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
 
        try {
            connection = connectionFactory.createConnection();// 通过工厂获取连接 
            connection.start();// 启动连接 
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 第一个参数为是否开启事务
            destination = session.createQueue("HELLO_ActiveMQ");// 创建消息队列
            String selector = "SYMBOL = 'A'";
            messageConsumer = session.createConsumer(destination,selector);// 创建消息消费者
       //   MessageConsumer  messageConsumer1 = session.createConsumer(destination,selector);// 创建消息消费者
          //  messageConsumer = session.createConsumer(destination);
 
            /*
             * 实际应用中,不会这么用,会注册一个监听
             */
            while (true) {
                TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
                if (textMessage != null) {
                    System.out.println("收到的消息:" + textMessage.getText());
                } else {
                 //   session.commit();
                   connection.close();
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

ActiveMQ内部实现

![](http://images2017.cnblogs.com/blog/575312/201709/575312-20170907135625022-788833581.png) 生产者进程向activeMQ所在进程发送消息和消费者消费消息的过程如上图所示,消息传递的路径经过了核心领域模型,具体步骤如下: 步骤1:生产者通过向activeMQ为它建立好的TransportConnection发送消息给activeMQ。 步骤2:TransportConnection对象找到RegionBroker。 步骤3:RegionBroker根据消息的类型找到对应的消息区域(Region)。 步骤4:该Region在它自己里面找到相应的消息目的地。 步骤5、6:该目的地首先根据需要进行持久化操作,并使用待发送消息指针对象。 步骤7:当有合适的消息消费者、订阅者来到时,目的地会找到这些消费者。 步骤8、9:通过该消费者对应的TransportConnection,消息从activeMQ中出来,发给相应的消费者进程。

queue和topic

在JMS中,topic实现的是发布订阅的语义。您发布消息时,它将发送给所有感兴趣的订阅者——因此,对于许多订阅者来说,0到许多订阅者将收到消息的副本。值得注意的是只有在代理接收到消息的时候拥有一个活跃订阅的订阅者将获得消息的副本。queue实现的是负载均衡的语义。一条消息只被一个消费者接收,如果在发送消息时没有可用的用户,则将该消息保留,直到可以接收该消息的用户可用为止。如果消费者收到一条消息,并且在关闭之前不承认它(not ack),那么消息将被重新发送到另一个消费者。队列可以让许多消费者在可用的消费者之间平衡消息负载。

消息持久化

JMS规范支持两种类型的消息传递:持久性和非持久性。持久性消息传递必须将持久性属性记录到稳定存储中,非持久性只是进行最大努力的传递信息,不用记录。ActiveMQ支持这两种消息传递,也可配置为支持消息恢复,介于两者之间的状态的消息被存在内存中。ActiveMQ很好的支持了消息的持久性(Persistence)。消息持久性对于可靠消息传递来说应该是一种比较好的方法,有了消息持久化,即使发送者和接受者不是同时在线或者消息中心在发送者发送消息后宕机了,在消息中心重新启动后仍然可以将消息发送出去,如果把这种持久化和 ReliableMessaging结合起来应该是很好的保证了消息的可靠传送。消息持久性的原理很简单,就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等,然后试图将消息发送给接收者,发送成功则将消息从存储中删除,失败则继续尝试。消息中心启动以后首先要检查制定的存储位置,如果有未发送成功的消息,则需要把消息发送出去。 对于ActiveMQ,消息的持久化是很简单的,仅仅通过配置信息就可以实现。非持久性消息通常被用在发送通知和实时数据。当性能要求是第一位,确认消息交付在第二位时应该选用非持久性消息。

  • 配置文件
    ActiveMQ详解

  • 消息怎样存放在AMQ中
    了解消息存储有助于我们进行配置,并了解在发送持久性消息时broker发生了什么。queue和topic存储消息是不同的,因为有些存储的优化可以被用在topics上而对queues毫无意义。
    队列存储概念直截了当,消息先进先出,如下图:
    ActiveMQ详解

某时一条消息被调度到单个消费者。只有当消息被消费并答复时才会从代理消息仓库中删除。
对于一个主题的持久订阅者,每个消费者获取一条消息的拷贝。为了节省存储空间,只有消息的一个拷贝被代理存储。存储中的一个持久订阅者对象维护一个指向它的下一个存储消息的指针并分派它的一个拷贝到它的消费者,如下图:
ActiveMQ详解

消息仓库以这种方式实现因为每一个持有订阅者可能以不同的速度消费消息,或者同一时间它们可能不都在运行。同时,因为每个消息可能潜在的有多个消费者,一条消息不能被仓库删除在直到它被成功地传递到每个对它感兴趣的持久订阅者。

kahadb原理

KahaDB消息仓库(Amq默认的持久化方式)是所有提供消息存储实现中最快的。它的速度是由于组合了包含数据日志文件的快速的事务性日报,高度优化的消息ID索引,和内存内消息缓存。下图提供了一张上层KahaDB消息仓库图。

ActiveMQ详解

Data logs:作为消息日报,它包含了存储在一定长度的数据文件的一个消息轮环日志和命令(例如事务性范围和消息删除)。如果当前使用的文件已达到最大长度,会创建一个新的数据文件。在数据文件中的所有消息被参考计算,所以一旦那个数据文件中的每个消息不再被需要,消息文件能被删除或存档。在数据日志中,消息只被附加到当前数据文件的尾部,所以存储很快。
cache:如果消息有活动的消费者,则缓存(cache)临时保存消息。如果有活动的消费者,当消息被安排存储的时候同时被分派出去了,如果消息及时回应,那它们不必存到磁盘中。
BTree 索引:在由消息ID索引的数据日志中BTree 索引保存这些数据的说明。这些索引为队列维护先进先出数据结构,为主题消息维护持续订阅者指针。
Redo日志:仅用作当amq没有正常关闭时保证btree索引的完整性。

了解了每一块的含义,现在讲一下每一块到底可以做什么:

checkPoint:在内存(cache)中的那部分B-Tree是Metadata Cache,磁盘上的叫Metadata Store,它对应于文件db.data。显而易见的,通过将索引缓存到内存中,可以加快查询的速度,这个同步过程就称为:check point。
消息的恢复和B-tree重建:有了B-Tree,Broker(消息服务器)可以快速地重启恢复,因为它是消息的索引,根据它就能恢复出每条消息的location。如果Metadata Store被损坏,则只能扫描整个Data Logs来重建B树了,这个过程是很复杂且缓慢的。 
消息载体:Data Logs以日志形式存储消息,它是生产者生产的数据的真正载体,对应于文件 db-*.log,默认是32MB。

kahadb在磁盘上的目录结构和上层设计图是保持一致的:
ActiveMQ详解

db-编号 :数据日志文件产生,达到预定大小编号自动加1;销毁:当没有任何引用指向该数据日志文件中的消息,那么该日志文件被删除或归档
 归档目录:在启用归档功能时才产生(默认关闭),如果关闭消息不再使用时日志文件会被删除
db.data :保存 数据日志文件中消息的持久化btree索引
db.redo: 重做日志,用于kahaDb从一次非正常关闭后重启时Btree索引的恢复

最关键的6个配置

1. indexWriteBatchSize 默认值1000,当Metadata Cache中更新的索引到达了1000时,才同步到磁盘上的Metadata Store中。不是每次更新都写磁盘,而是批量更新写磁盘,比较写磁盘的代价是很大的。 2. indexCacheSize 默认值10000,(number of index pages cached in memory),在内存中最多分配多个页面来缓存index。缓存的index越多,命中的概率就越大,检索的效率就越高。 3. journalMaxFileLength 默认值32MB,当存储的消息达到32MB时,新建一个新文件来保存消息。这个配置对生产者或消息者的速率有影响。比如,生产者速率很快而消费者速率很慢时,将它配置得大一点比较好。 4. enableJournalDiskSyncs 默认值true,默认采用同步写磁盘,即消息先存储到磁盘中再向Producer返回ACK 5. cleanupInterval 默认值30000ms,当消息被消息者成功消费之后,Broker就可以将消息删除了。 6.checkpointInterval 默认值5s,每隔5s将内存中的Index(Metadata Cache)更新到磁盘的Index文件中(Metadata Store)

总结

MQ使用中,还是碰到过很多问题。如,MQ使用中意外重启或崩掉,MQ发送方可以重连,MQ接收方必须重启进程才可以重连。解决方案:新建一个断链重连Listen接口,需要重连的接收方注册监听器(实现重新注册MQ),服务端启动一个Monitor线程(守护)轮询MQ服务器,发现断链,调用所有监听器。