QX项目实战-8.ActiveMQ的Queue讯息和Topic消息
项目使用ActiveMQ传递消息,保证大量数据信息的同步任务。对于这种基础架构层面的东西必须进行测试和熟悉。前几天已经下载安装和测试了ActiveMQ服务器[1],对该服务有了直观认识,下面我们深入了解下ActiveMQ到底能完成什么样的功能。
首先测试Queue消息的生产者消费者问题,在cmd下进入ActiveMQ的example目录,运行ant consumer命令,编译执行消费者程序,消费者运行起来后,在等待传递2000条消息。在另外的cmd下也进入example目录,执行antproducer指令,编译执行生产者程序,生产者程序执行发送消息的命令,给ActiveMQ服务器发送了2000条消息。这时消息由消费者接收到。随后消费者和生产者先后完成、关闭程序。需要注意的是执行ant编译指令,必须安装apache-ant自动构建程序,参考[2-4]。
Queue消息是点对点模式消息传播机制,一条消息仅能被一个消费者收到,如果未处理,将会一直保存着。多个消费者的情况下,消费者会自动实现负载均衡。
再来测试Topic消息,Topic消息是发布者/订阅者模式,这种模式下,发布者的消息发布到服务器,订阅者接收到该消息的一份拷贝。发布者/订阅者模式有非持久订阅和持久订阅两种。本例中首先运行订阅者,在example目录下执行anttopic-listener命令执行topic消息消费者,提示接受消息。再在example下执行anttopic-publisher命令执行Topic消息生产者。生产的消息十轮分发给订阅者。
下面分析Queue消息的生产者ProducerTool.java代码:
ProducerTool代码实现了Thread线程类,其中的Run方法定义如下:
// Create the connection. ActiveMQConnectionFactory connectionFactory = newActiveMQConnectionFactory(user, password, url); connection =connectionFactory.createConnection(); connection.start(); // Create the session Session session =connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); if (topic) { destination =session.createTopic(subject); } else { destination =session.createQueue(subject); } // Create theproducer. MessageProducerproducer = session.createProducer(destination); if (persistent) { producer.setDeliveryMode(DeliveryMode.PERSISTENT); } else { producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } if (timeToLive !=0) { producer.setTimeToLive(timeToLive); } // Start sendingmessages sendLoop(session,producer);
消费者Consumer.java代码实现了消费者类,这个类继承自Threads类,实现了MessageListener, ExceptionListener接口。实现的功能是接受ActiveMQ传来的消息并处理,其run代码为:
ActiveMQConnectionFactory connectionFactory= new ActiveMQConnectionFactory(user, password, url); Connectionconnection = connectionFactory.createConnection(); if (durable&& clientId != null && clientId.length() > 0 &&!"null".equals(clientId)) { connection.setClientID(clientId); } connection.setExceptionListener(this); connection.start(); session =connection.createSession(transacted, ackMode); if (topic) { destination =session.createTopic(subject); } else { destination =session.createQueue(subject); } replyProducer =session.createProducer(null); replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); MessageConsumerconsumer = null; if (durable&& topic) { consumer =session.createDurableSubscriber((Topic) destination, consumerName); } else { consumer =session.createConsumer(destination); } if (maxiumMessages> 0) { consumeMessagesAndClose(connection, session, consumer); } else { if(receiveTimeOut == 0) { consumer.setMessageListener(this); } else { consumeMessagesAndClose(connection, session, consumer, receiveTimeOut); } }
参考
1. QX项目实战-7.ActiveMQ的安装与测试
2. Ant系统构建工具的使用
3. 使用Subversionant结合Subversion进行项目构建
4. Eclipse下用Ant运行JUnit