开源消息中间件ActiveMQ回顾(一):客户端实现与JMS规范
前一段时间工作中经常使用到Apache ActiveMQ用作消息传输。今天在公司不是很忙,于是又深入研究了一下,总结一下分享出来。
首先是基于ActiveMQ的Java客户端实现例子。
接口定义:
/**
 * Small messaging facade exposing one queue channel and one topic channel.
 *
 * <p>The contract below is described in terms of what the ActiveMQ
 * implementation shown in this article does; other implementations may differ.
 */
public interface MQService {

    /**
     * Starts message delivery. The ActiveMQ implementation starts its queue
     * connection and its topic connection, logging (not throwing) on failure.
     */
    void start();

    /**
     * Sends {@code text} as a text message to the configured queue.
     *
     * @param text the message payload
     * @throws JMSException if the provider fails to create or send the message
     */
    void sendQueueMessage(String text) throws JMSException;

    /**
     * Publishes {@code text} as a text message to the configured topic.
     *
     * @param text the message payload
     * @throws JMSException if the provider fails to create or publish the message
     */
    void publishTopicMessage(String text) throws JMSException;

    /**
     * Releases the messaging resources held by this service.
     */
    void destroy();
}
实现类(部分代码折行了,大家将就着看吧):
public class ActiveMQServiceImpl implements MQService { private static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQServiceImpl.class); private static MQService instance = null; private InitialContext initialContext; private QueueConnectionFactory queueConnectionFactory; private QueueConnection queueConnection; private QueueSession queueSession; private Queue queue; private QueueSender queueSender; private QueueReceiver queueReceiver; private TopicConnectionFactory topicConnectionFactory; private TopicConnection topicConnection; private TopicSession topicSession; private Topic topic; private TopicPublisher topicPublisher; private TopicSubscriber topicSubscriber; protected ActiveMQServiceImpl() { try { initialContext = new InitialContext(getInitalContextTable()); initQueue(); initTopic(); LOGGER.info("AMQ init complete!"); } catch (Exception e) { LOGGER.error("failed to connect mq:",e); } } private static Hashtable<String,String> getInitalContextTable(){ Hashtable<String,String> table=new Hashtable<String,String>(); table.put("java.naming.factory.initial", (String)ConfigProject.AMQ_CONFIG.getString("java.naming.factory.initial")); table.put("java.naming.provider.url", (String)ConfigProject.AMQ_CONFIG.getString("java.naming.provider.url")); return table; } public static MQService getInstance(){ if(instance==null){ instance = new ActiveMQServiceImpl(); return instance; } return instance; } protected void initQueue() throws Exception{ LOGGER.info("initQueue begin..."); //获取queueConnectionFactory,通过javax.jms.ConnectionFactory. 
queueConnectionFactory = (QueueConnectionFactory) initialContext.lookup("ConnectionFactory"); //创建queueConnection if (ConfigProject.AMQ_CONFIG.getProperty("java.naming.security.principal")!=null) { queueConnection = queueConnectionFactory.createQueueConnection( (String)ConfigProject.AMQ_CONFIG.getProperty("java.naming.security.principal"), (String)ConfigProject.AMQ_CONFIG.getProperty("java.naming.security.credentials") ); } else { queueConnection = queueConnectionFactory.createQueueConnection(); } //设置queueConnection的clientId持久化消息 String clientID = "queue@"; try { clientID += InetAddress.getLocalHost(); } catch (Exception e) { clientID += UUID.randomUUID().toString(); } queueConnection.setClientID(clientID); //创建queueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); //创建queue queue = queueSession.createQueue((String)ConfigProject.AMQ_CONFIG.getProperty("test.notification.queue")); //创建queueSender发送消息 queueSender = queueSession.createSender(queue); //设置消息是否持久化:如果消费端没有接收到消息,设置持久化后即使AMQ服务器重启,消费端启动后也会收到消息. //如果设置非持久化,消息只保存在AMQ服务器内存中,一旦服务器重启,消费端将收不到消息. 
queueSender.setDeliveryMode(DeliveryMode.PERSISTENT); //创建queueReceiver接收消息 queueReceiver = queueSession.createReceiver(queue); queueReceiver.setMessageListener(new QueueReceiveListener()); LOGGER.info("initQueue end..."); } protected void initTopic() throws Exception{ LOGGER.info("initTopic begin..."); //获取topicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) initialContext.lookup("ConnectionFactory"); //创建topicConnection if (ConfigProject.AMQ_CONFIG.getProperty("java.naming.security.principal") != null) { topicConnection = topicConnectionFactory.createTopicConnection( (String)ConfigProject.AMQ_CONFIG.getProperty("java.naming.security.principal"), (String)ConfigProject.AMQ_CONFIG.getProperty("java.naming.security.credentials")); } else { topicConnection = topicConnectionFactory.createTopicConnection(); } //设置clientId String clientID = "topic@"; try { clientID += InetAddress.getLocalHost(); } catch (Exception e) { clientID += UUID.randomUUID().toString(); } topicConnection.setClientID(clientID); //创建topicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); //创建topic topic = topicSession.createTopic((String)ConfigProject.AMQ_CONFIG.getProperty("test.notification.topic")); //创建发布者 topicPublisher = topicSession.createPublisher(topic); topicPublisher.setDeliveryMode(DeliveryMode.PERSISTENT); //创建消费者 topicSubscriber = topicSession.createSubscriber(topic); topicSubscriber.setMessageListener(new TopicReciverListener()); LOGGER.info("initTopic end..."); } public void start() { if(queueConnection!=null) { try { queueConnection.start(); } catch (JMSException e) { LOGGER.error("failed to start queue connection!",e); } } if(topicConnection!=null) { try { topicConnection.start(); } catch (JMSException e) { LOGGER.error("failed to start topic connection!",e); } } } public void sendQueueMessage(String text) throws JMSException { queueSender.send(queueSession.createTextMessage(text)); } public void 
publishTopicMessage(String text) throws JMSException { topicPublisher.publish(topicSession.createTextMessage(text)); } public void destroy() { try { this.queueSender.close(); this.queueReceiver.close(); this.queueSession.close(); this.queueConnection.close(); this.topicPublisher.close(); this.topicSubscriber.close(); this.topicSession.close(); this.topicConnection.close(); this.initialContext.close(); } catch (NamingException e) { LOGGER.warn("failed to shutdown mq", e); } catch (Exception e) { LOGGER.warn("failed to shutdown mq", e); } } }
上面程序中:
ConfigProject是个配置工具类,读取properties中的AMQ配置信息。
通过JNDI初始化InitialContext时,一定要正确配置ActiveMQ的InitialContextFactory实现类和broker的连接URL:
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=failover://(nio://1x.xx.xx.x3:61616,nio://1x.xx.xx.x4:61616)?randomize=false
日志用的是logback+slf4j这2个开源组件。
QueueReceiveListener是队列消费端的监听器,TopicReciverListener是主题订阅者的监听器。
QueueReceiveListener.java:
/**
 * Message listener for the queue consumer.
 *
 * <p>If the listener needs to reply to the messages it receives, it can be
 * implemented as an inner class of the service instead.
 */
public class QueueReceiveListener implements MessageListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(QueueReceiveListener.class);

    /**
     * Logs the body of every {@code TextMessage} received; other message types
     * are ignored. Failures are logged rather than propagated.
     *
     * @param message the delivered message
     */
    public void onMessage(Message message) {
        LOGGER.info("QueueReceiveListener onMessage begin...");
        try {
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                LOGGER.info("textMessage:{}", textMessage.getText());
            }
        } catch (Exception e) {
            // Report through the logger so the failure reaches the configured
            // appenders instead of only stderr.
            LOGGER.error("QueueReceiveListener failed to handle message", e);
        }
        LOGGER.info("QueueReceiveListener onMessage end...");
    }
}
TopicReciverListener.java:
/**
 * Message listener for the topic subscriber.
 */
public class TopicReciverListener implements MessageListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(TopicReciverListener.class);

    /**
     * Logs the body of every {@code TextMessage} received; other message types
     * are ignored. Failures are logged rather than propagated.
     *
     * @param message the delivered message
     */
    public void onMessage(Message message) {
        LOGGER.info("TopicReciverListener onMessage begin...");
        try {
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                LOGGER.info("textMessage:{}", textMessage.getText());
            }
        } catch (Exception e) {
            // Report through the logger so the failure reaches the configured
            // appenders instead of only stderr.
            LOGGER.error("TopicReciverListener failed to handle message", e);
        }
        LOGGER.info("TopicReciverListener onMessage end...");
    }
}
同样,这两个监听类也可以通过内部类的形式写在实现类内部,例如需要回复收到的消息时。此处因为没有回复消息的功能,所以就把它们各自单独写成了一个类。
最后,定义一个工具类方便调用:
public class MQServiceUtils { //此处同样可以使用IoC框架注入 private static MQService mqService = ActiveMQServiceImpl.getInstance(); public static void sendMessage(String message){ try{ mqService.sendQueueMessage(message); }catch(Exception e){ e.printStackTrace(); } } public static void publishMessage(String message){ try{ mqService.publishTopicMessage(message); }catch(Exception e){ e.printStackTrace(); } } }
更新中...