activemq in action 读书笔记(三) -创建一个简单的JMS应用
activemq in action 读书笔记(3) -创建一个简单的JMS应用
创建一个简单的JMS应用
1.1 使用 JMS API 创建 JMS 应用的步骤
1.需要一个jms连接工厂
2.使用工厂创建jms连接
3.开启连接
4.通过连接创建session
5.取得一个destination
6.创建一个生产者
7.创建一个消费者
8.发送或接收消息
9.关闭所有资源.
PS: ConnectionFactory、Connection、Destination 支持并发访问；Session、MessageProducer、MessageConsumer 不支持并发访问。
1.2一个简单的生产者和消费者(点对点队列模式queue)例子
生产者:
/**
 * Minimal point-to-point producer: connects to the broker at
 * {@code tcp://localhost:61616}, sends a single text message ("hah") to the
 * queue "zcf", and then releases the connection.
 */
public class SimpleProducer {
    private static String brokerURL = "tcp://localhost:61616";
    private static ConnectionFactory factory = null;
    private static Connection connection = null;
    private static Session session = null;
    private static Destination destination = null;
    private static MessageProducer mp = null;

    /**
     * Sends one text message to the queue "zcf".
     *
     * @param args unused
     * @throws Exception if connecting to the broker or sending the message fails
     */
    public static void main(String[] args) throws Exception {
        factory = new ActiveMQConnectionFactory(brokerURL);
        connection = factory.createConnection();
        try {
            // Non-transacted session with automatic acknowledgement.
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("zcf");
            mp = session.createProducer(destination);
            Message message = session.createTextMessage("hah");
            mp.send(message);
        } finally {
            // Step 9 of the checklist: release resources. Closing the connection
            // also closes the session and producer created from it.
            connection.close();
        }
    }
}
消费者:
/**
 * Minimal synchronous point-to-point consumer: connects to the broker at
 * {@code tcp://localhost:61616}, blocks until one message arrives on the
 * queue "zcf", prints it, and then releases the connection.
 */
public class SimpleResumer {
    private static String brokerURL = "tcp://localhost:61616";
    private static ConnectionFactory factory = null;
    private static Connection connection = null;
    private static Session session = null;

    /**
     * Receives and prints a single message from the queue "zcf".
     *
     * @param args unused
     * @throws Exception if connecting to the broker or receiving the message fails
     */
    public static void main(String[] args) throws Exception {
        factory = new ActiveMQConnectionFactory(brokerURL);
        connection = factory.createConnection();
        try {
            // Message delivery only begins once the connection is started.
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("zcf");
            MessageConsumer consumer = session.createConsumer(destination);

            // Blocks until a message is available.
            Message message = consumer.receive();
            if (message instanceof TextMessage) {
                System.out.println(((TextMessage) message).getText());
            } else {
                // Not a text message (or no message): print it as-is instead of
                // failing with a ClassCastException.
                System.out.println(message);
            }
        } finally {
            // Closing the connection also closes the session and consumer.
            connection.close();
        }
    }
}
消费者异步消费:
public class SimpleResumerAsync { private static String brokerURL = "tcp://localhost:61616"; private static ConnectionFactory factory= null; private static Connection connection= null; private static Session session = null; public static void main(String[] args) throws Exception { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("zcf"); MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { System.out.println(message); } }); } }
1.3一个简单的发布订阅模式(一对多非持久化)例子
package com.zcf.activemq.simpleexample; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * 异步接受消息 * @author cfzhou * */ public class SimpleTopicResumerAsync { private static String brokerURL = "tcp://localhost:61616"; private static ConnectionFactory factory= null; private static Connection connection= null; private static Session session = null; public static void main(String[] args) throws Exception { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); new Thread(new ResumerThread()).start(); } private static class ResumerThread implements Runnable{ @Override public void run() { for(int i = 0 ;i < 3;i++){ try { Destination destination = session.createTopic("topic"); MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { System.out.println(message); } }); } catch (JMSException e) { e.printStackTrace(); } } } } }
package com.zcf.activemq.simpleexample; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * 异步接受消息 * @author cfzhou * */ public class SimpleQueueResumerAsync { private static String brokerURL = "tcp://localhost:61616"; private static ConnectionFactory factory= null; private static Connection connection= null; private static Session session = null; public static void main(String[] args) throws Exception { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("zcf"); MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { System.out.println(message); } }); } }