JMS学习笔记 (4)Spring与ActiveMQ结合
JMS学习笔记 (四)Spring与ActiveMQ结合
一 实验环境
1.jdk1.6
2.spring 2.5
3.apache-activemq-5.10.0
二 点对点消息模型的收发消息示例
负责发送消息的ProductService具体代码
负责接收消息的ConsumerService具体代码
在spring的配置文件中加入如下配置信息
运行ConsumerService 中的main方法效果如下


点对点的消息模型,除了上面的接收消息方法外Spring JMS提供了消息监听的模式代码和配置如下
在spring 的配置文件中配置监听容器以及侦听者
运行后的效果如下

二 主题消息模型的收发消息示例
负责发布主题(消息发布者)TopicProvider代码如下
负责接收已订阅的消息(消息监听者)
在spring 的配置中加入以下代码
运行TopicProvider中的main方法,效果如下

一 实验环境
1.jdk1.6
2.spring 2.5
3.apache-activemq-5.10.0
二 点对点消息模型的收发消息示例
负责发送消息的ProductService具体代码
package com.testactivemq; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; public class ProductService { private JmsTemplate jmsTemplate; /**** * 向默认队列发送信息,默认队列即是注入jmsTemplate中的消息队列 * @param msg * @return */ public void sendMessage(final String msg) { String destination = jmsTemplate.getDefaultDestination().toString(); System.out.println("向队列" +destination+ "发送了消息: " + msg); jmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { TextMessage tx=session.createTextMessage(); tx.setText(msg); return tx; } }); } /*** * 向指定队列发送消息 * @param destination * @param msg */ public void sendMessage(String destination,final String msg) { System.out.println("向队列" +destination+ "发送了消息:" + msg); jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { TextMessage tx=session.createTextMessage(); tx.setText(msg); return tx; } }); } public JmsTemplate getJmsTemplate() { return jmsTemplate; } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } }
负责接收消息的ConsumerService具体代码
package com.testactivemq; import javax.jms.JMSException; import javax.jms.TextMessage; import org.springframework.context.ApplicationContext; import org.springframework.context.support.FileSystemXmlApplicationContext; import org.springframework.jms.core.JmsTemplate; public class ConsumerService { private JmsTemplate jmsTemplate; /**** * 接收默认的消息 */ public void receive() { TextMessage tm = (TextMessage)jmsTemplate.receive(); String destinationName=jmsTemplate.getDefaultDestination().toString(); try { System.out.println("从队列" + destinationName + "收到了消息: "+tm.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /*** * 接收指定消息队列 * @param destination */ public void receive(String destination) { TextMessage tm = (TextMessage) jmsTemplate.receive(destination); try { System.out.println("从队列" + destination + "收到了消息:"+tm.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public static void main(String[] args) { //读取spring配置文件的路径 ApplicationContext ac = new FileSystemXmlApplicationContext("F:\\workerspace\\MyTest\\WEB-INF\\classes\\applicationContext.xml"); ProductService ps= (ProductService) ac.getBean("productService"); ConsumerService cs1=(ConsumerService) ac.getBean("consumerService"); ps.sendMessage("哈哈"); cs1.receive(); //以下是从queue2队列发送和读取消息 ps.sendMessage("queue2", "萌萌达"); cs1.receive("queue2"); } public JmsTemplate getJmsTemplate() { return jmsTemplate; } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } }
在spring的配置文件中加入如下配置信息
<!-- 配置JMS连接工厂 --> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="failover:(tcp://localhost:61616)" /> </bean> <!-- 定义消息队列(Queue) --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 设置消息队列的名字 --> <constructor-arg> <value>queue1</value> </constructor-arg> </bean> <!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="defaultDestination" ref="queueDestination" /> <property name="receiveTimeout" value="10000" /> </bean> <!-- 定义消息队列(Queue) --> <bean id="queueDestination2" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 设置消息队列的名字 --> <constructor-arg> <value>queue2</value> </constructor-arg> </bean> <!--queue消息生产者 --> <bean id="productService" class="com.testactivemq.ProductService"> <property name="jmsTemplate" ref="jmsTemplate"></property> </bean> <!--queue消息消费者 --> <bean id="consumerService" class="com.testactivemq.ConsumerService"> <property name="jmsTemplate" ref="jmsTemplate"></property> </bean>
运行ConsumerService 中的main方法效果如下
点对点的消息模型,除了上面的接收消息方法外Spring JMS提供了消息监听的模式代码和配置如下
package com.testactivemq; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class QueueMessageListener implements MessageListener { //当收到消息时,自动调用该方法,运行后会一直停留在进程当中。 public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { String destination=tm.getJMSDestination().toString(); System.out.println("收到["+destination+"]文本消息:" + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
在spring 的配置文件中配置监听容器以及侦听者
<!-- 配置消息队列监听者(Queue)--> <bean id="queueMessageListener" class="com.testactivemq.QueueMessageListener" /> <!-- 消息监听容器(Queue),配置连接工厂,监听的队列是queue2,监听器是前面定义的监听器 --> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination2" /> <property name="messageListener" ref="queueMessageListener" /> </bean>
public static void main(String[] args) { ApplicationContext ac = new FileSystemXmlApplicationContext("F:\\workerspace\\MyTest\\WEB-INF\\classes\\applicationContext.xml"); ProductService ps= (ProductService) ac.getBean("productService"); ConsumerService cs1=(ConsumerService) ac.getBean("consumerService"); //以下是从queue2队列发送消息,读取消息在QueueMessageListener 中的onMessage方法自动执行 ps.sendMessage("queue2", "萌萌达"); }
运行后的效果如下
二 主题消息模型的收发消息示例
负责发布主题(消息发布者)TopicProvider代码如下
package com.testactivemq; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.context.ApplicationContext; import org.springframework.context.support.FileSystemXmlApplicationContext; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; public class TopicProvider { private JmsTemplate topicJmsTemplate; /** * 向指定的topic发布消息 * * @param topic * @param msg */ public void publish(final String topic, final String msg) { topicJmsTemplate.send(topic, new MessageCreator() { public Message createMessage(Session session) throws JMSException { System.out.println("主题 是[" + topic + "],发布消息内容为: " + msg); return session.createTextMessage(msg); } }); } public static void main(String[] args) { ApplicationContext ac = new FileSystemXmlApplicationContext("F:\\workerspace\\MyTest\\WEB-INF\\classes\\applicationContext.xml"); TopicProvider tp= (TopicProvider) ac.getBean("topicProvider"); //test_topic为题的名称是主题的识别标志 tp.publish("test_topic", "么么达"); } public void setTopicJmsTemplate(JmsTemplate topicJmsTemplate) { this.topicJmsTemplate = topicJmsTemplate; } }
负责接收已订阅的消息(消息监听者)
package com.testactivemq; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** *和队列监听的代码一样。 */ public class TopicMessageListener implements MessageListener { public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { String destination=tm.getJMSDestination().toString(); System.out.println("收到["+destination+"],文本消息:" + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
在spring 的配置中加入以下代码
<!-- 定义消息主题(Topic) --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg> <value>test_topic</value> </constructor-arg> </bean> <!-- 配置JMS模板(Topic),pubSubDomain="true"--> <bean id="topicJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="defaultDestination" ref="topicDestination" /> <property name="pubSubDomain" value="true" /> <property name="receiveTimeout" value="10000" /> </bean> <!--topic消息发布者 --> <bean id="topicProvider" class="com.testactivemq.TopicProvider"> <property name="topicJmsTemplate" ref="topicJmsTemplate"></property> </bean> <!-- 消息主题监听者 和 主题监听容器 ,主题监听容器可以配置多个,即多个订阅者 --> <!-- 消息主题监听者(Topic) --> <bean id="topicMessageListener" class="com.testactivemq.TopicMessageListener" /> <!-- 主题监听容器 (Topic),订阅者1 --> <bean id="topicJmsContainer1" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="topicDestination" /> <property name="messageListener" ref="topicMessageListener" /> </bean> <!-- 主题监听容器 (Topic),订阅者2 --> <bean id="topicJmsContainer2" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="topicDestination" /> <property name="messageListener" ref="topicMessageListener" /> </bean>
运行TopicProvider中的main方法,效果如下