ACTIVEMQ JMS PUBLISH/SUBSCRIBE范例

ACTIVEMQ JMS PUBLISH/SUBSCRIBE实例

--学习笔记,转自http://www.cnblogs.com/phoebus0501/archive/2011/02/25/1965276.html

 

一、MessagePublisher

package jms.activemq.myexample;
 
import java.util.Date;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
 
public class MessagePublisher implements Runnable {
    private String url;
    private String user;
    private String password;
    private String topicName;
 
    public MessagePublisher(String topicName, String url, String user,
            String password) {
        this.url = url;
        this.user = user;
        this.password = password;
        this.topicName = topicName;
    }
 
    @Override
    public void run() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                user, password, url);
        Session session = null;
        MessageProducer sendPublisher;
        Connection connection = null;
 
        int messageCount = 0;
        try {
            connection = connectionFactory.createConnection();
 
            connection.start();
             
            //创建Topic
            //Topic topic = new ActiveMQTopic(this.topicName);
            session = connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(this.topicName);
            sendPublisher = session.createProducer(topic);
            while (true) {
                String text = new Date() + "现在发送是第" + messageCount + "条消息";
 
                sendPublisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                sendPublisher.send(session.createTextMessage(text));
 
                if ((++messageCount) == 3) {
                    // 发够十条消息退出
                    break;
                }
                Thread.sleep(1000);
            }
 
            //sendPublisher.close();
 
            //connection.close();
             
            System.out.println("发布消息线程结束!!!!!!!!!!!!!!!!!!!!!!!!");
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
 
    public String getUrl() {
        return url;
    }
 
    public void setUrl(String url) {
        this.url = url;
    }
 
    public String getUser() {
        return user;
    }
 
    public void setUser(String user) {
        this.user = user;
    }
 
    public String getPassword() {
        return password;
    }
 
    public void setPassword(String password) {
        this.password = password;
    }
 
    public String getTopic() {
        return topicName;
    }
 
}

 

二、MessageSubscriber

package jms.activemq.myexample;
 
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
 
import org.apache.activemq.ActiveMQConnectionFactory;
 
public class MessageSubscriber implements Runnable {
    private String url;
    private String user;
    private String password;
    private String topicName;
 
    public MessageSubscriber(String topicName, String url, String user,
            String password) {
        this.url = url;
        this.user = user;
        this.password = password;
        this.topicName = topicName;
    }
 
    @Override
    public void run() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                user, password, url);
        Session session = null;
        MessageConsumer subscriber;
        Connection connection = null;
        try {
            connection = connectionFactory.createConnection();
 
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
            // 创建Topic
            Topic topic = session.createTopic(this.topicName);
 
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
 
            subscriber = session.createConsumer(topic);
            subscriber.setMessageListener(new TextListener());
            connection.start();
 
            System.out.println(Thread.currentThread().getName() + "开启");
 
            // connection.close();
        } catch (JMSException e) {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e1) {
                    // TODO Auto-generated catch block
                    e1.printStackTrace();
                }
            }
        }
    }
 
    public String getUrl() {
        return url;
    }
 
    public void setUrl(String url) {
        this.url = url;
    }
 
    public String getUser() {
        return user;
    }
 
    public void setUser(String user) {
        this.user = user;
    }
 
    public String getPassword() {
        return password;
    }
 
    public void setPassword(String password) {
        this.password = password;
    }
 
    public String getTopic() {
        return topicName;
    }
}

 

三、TextListener

package jms.activemq.myexample;
 
 
import javax.jms.*;
 
/**
 * Text消息监听
 * 
 * @author XXXX<br> * 
 */
public class TextListener implements MessageListener {
 
    /**
     * Casts the message to a TextMessage and displays its text.
     * 
     * @param message
     *            the incoming message
     */
    public void onMessage(Message message) {
        TextMessage msg = null;
 
        try {
            if (message instanceof TextMessage) {
                msg = (TextMessage) message;
                System.out.println("Reading message: " + msg.getText());
            } else {
                System.out.println("Message of wrong type: "
                        + message.getClass().getName());
            }
        } catch (JMSException e) {
            System.out.println("JMSException in onMessage(): " + e.toString());
        } catch (Throwable t) {
            System.out.println("Exception in onMessage():" + t.getMessage());
        }
    }
}

 

四、MyActiveMQDemo

package jms.activemq.myexample;
 
import javax.jms.JMSException;
 
public class MyActiveMQDemo {
    public static void main(String[] args) throws InterruptedException, JMSException {
        String url = "tcp://localhost:61616";
        String user = null;
        String password = null;
        String query = "MyQueueA";
        String topic = "TestTopic";
         
         
//      new Thread(new MessageSender(query,url,user,password), "Name-Sender").start();
//      new Thread(new MessageReceiver(query,url,user,password), "Name-Receiver1").start();
//      new Thread(new MessageReceiver(query,url,user,password), "Name-Receiver2").start();
//      new Thread(new MessageReceiver(query,url,user,password), "Name-Receiver3").start();
//      new Thread(new MessageReceiver(query,url,user,password), "Name-Receiver4").start();
//      new Thread(new MessageReceiver(query,url,user,password), "Name-Receiver5").start();
         
        new Thread(new MessageSubscriber(topic,url,user,password), "Name-Subscriber1").start();
        new Thread(new MessageSubscriber(topic,url,user,password), "Name-Subscriber2").start();
        new Thread(new MessageSubscriber(topic,url,user,password), "Name-Subscriber3").start();
        new Thread(new MessageSubscriber(topic,url,user,password), "Name-Subscriber4").start();
        new Thread(new MessageSubscriber(topic,url,user,password), "Name-Subscriber5").start();
        Thread.sleep(5000);
        new Thread(new MessagePublisher(topic,url,user,password), "Name-Publisher").start();
        //new TopicPublisher().run(); 
    }
}