QX项目实战-8.ActiveMQ的Queue讯息和Topic消息

QX项目实战-8.ActiveMQ的Queue消息和Topic消息

       项目使用ActiveMQ传递消息,保证大量数据信息的同步任务。对于这种基础架构层面的东西必须进行测试和熟悉。前几天已经下载安装和测试了ActiveMQ服务器[1],对该服务有了直观认识,下面我们深入了解下ActiveMQ到底能完成什么样的功能。

       首先测试Queue消息的生产者消费者问题,在cmd下进入ActiveMQ的example目录,运行ant consumer命令,编译执行消费者程序,消费者运行起来后,在等待传递2000条消息。在另外的cmd下也进入example目录,执行antproducer指令,编译执行生产者程序,生产者程序执行发送消息的命令,给ActiveMQ服务器发送了2000条消息。这时消息由消费者接收到。随后消费者和生产者先后完成、关闭程序。需要注意的是执行ant编译指令,必须安装apache-ant自动构建程序,参考[2-4]。

       Queue消息是点对点模式消息传播机制,一条消息仅能被一个消费者收到,如果未处理,将会一直保存着。多个消费者的情况下,消费者会自动实现负载均衡。

再来测试Topic消息,Topic消息是发布者/订阅者模式,这种模式下,发布者的消息发布到服务器,订阅者接收到该消息的一份拷贝。发布者/订阅者模式有非持久订阅和持久订阅两种。本例中首先运行订阅者,在example目录下执行anttopic-listener命令执行topic消息消费者,提示接受消息。再在example下执行anttopic-publisher命令执行Topic消息生产者。生产的消息十轮分发给订阅者。

       下面分析Queue消息的生产者ProducerTool.java代码:

       ProducerTool代码实现了Thread线程类,其中的Run方法定义如下:

// Create the connection.
           ActiveMQConnectionFactory connectionFactory = newActiveMQConnectionFactory(user, password, url);
            connection =connectionFactory.createConnection();
           connection.start();
 
            // Create the session
            Session session =connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
            if (topic) {
                destination =session.createTopic(subject);
            } else {
                destination =session.createQueue(subject);
            }
 
            // Create theproducer.
            MessageProducerproducer = session.createProducer(destination);
            if (persistent) {
               producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            } else {
               producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            }
            if (timeToLive !=0) {
               producer.setTimeToLive(timeToLive);
            }
 
            // Start sendingmessages
            sendLoop(session,producer);

       消费者Consumer.java代码实现了消费者类,这个类继承自Threads类,实现了MessageListener, ExceptionListener接口。实现的功能是接受ActiveMQ传来的消息并处理,其run代码为:

ActiveMQConnectionFactory connectionFactory= new ActiveMQConnectionFactory(user, password, url);
            Connectionconnection = connectionFactory.createConnection();
            if (durable&& clientId != null && clientId.length() > 0 &&!"null".equals(clientId)) {
               connection.setClientID(clientId);
            }
           connection.setExceptionListener(this);
            connection.start();
 
            session =connection.createSession(transacted, ackMode);
            if (topic) {
                destination =session.createTopic(subject);
            } else {
                destination =session.createQueue(subject);
            }
 
            replyProducer =session.createProducer(null);
           replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 
            MessageConsumerconsumer = null;
            if (durable&& topic) {
                consumer =session.createDurableSubscriber((Topic) destination, consumerName);
            } else {
                consumer =session.createConsumer(destination);
            }
 
            if (maxiumMessages> 0) {
               consumeMessagesAndClose(connection, session, consumer);
            } else {
                if(receiveTimeOut == 0) {
                   consumer.setMessageListener(this);
                } else {
                   consumeMessagesAndClose(connection, session, consumer, receiveTimeOut);
                }
            }

参考

1.      QX项目实战-7.ActiveMQ的安装与测试

2.      Ant系统构建工具的使用

3.      使用Subversionant结合Subversion进行项目构建

4.      Eclipse下用Ant运行JUnit