activemq in action 读书笔记(五) - 网络连接器
网络连接器
网络连接器是两个broker之间的通道,让他们可以互相交互.一个网络连接器默认是单向的称作转发桥.但是ActiveMq提供一种双工连接器能保证用一个通道接送发送信息.
下图是一个包含两种工作方式的网络代理例子
网络连接器的配置通过以下节点来配置
< networkConnectors >
< networkConnector name="default-nc" uri="multicast://default" />
</ networkConnectors >
在深入理解之前,我们需要先理解一个概念 discovery(挖掘).它是一个可以侦测远程broker的进程.Client和broker都需要发现远程的broker,以便生成网络代理.
静态网络连接器
使用静态化配置网络连接需要通过配置一系列可用的broker URL,首先你得知道所有的URL,协议使用一种复合的URL(url中带着url).一个复合url中包含多个broker地址。
例如:static:(uri1,uri2,uri3,...)?key=value,具体的xml配置如下
<networkConnectors> <networkConnector name="local network" uri="static://(tcp://remotehost1:61616,tcp://remotehost2:61616)"/> </networkConnectors>
以下图片是两个broker在静态网络之间交换数据的例子.消费者附着在brokerB上,生产者附着在brokerA上,生产者生产的消息会被brokerA转发到brokerB上供消费者消费.
让我们来配置这个例子,首先你需要启动两个broker.我们先启动brokerB.brokerB的配置如下.只需修改activemq.xml中transportConnectors节点配置
<transportConnectors> <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> <transportConnector name="openwire" uri="tcp://localhost:61617" /> </transportConnectors>
然后再来配置brokerA.brokerA配置如下.
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="BrokerA" dataDirectory="${activemq.base}/data"> ...... <transportConnectors> <transportConnector name="openwire" uri="tcp://localhost:61616" /> </transportConnectors> <networkConnectors> <networkConnector uri="static:(tcp://localhost:61617)" /> </networkConnectors> .... </broker>
启动brokerA,会发现报错,因为brokerB启动的时候会启动jetty,brokerA启动的时候也会启动jetty,端口冲突了.只需要改变jetty.xml中的端口8186为8162即可.
这时使用三个java类来进行测试.一个生产者,两个消费者.两个消费者分别附着于不同的broker端口.会发现两个消费者都接收到了生产者的消息.
public class SimpleTopicProducer {
private static String brokerURL = "tcp://localhost:61616";
private static ConnectionFactory factory= null;
private static Connection connection= null;
private static Session session = null;
private static Destination destination = null;
private static MessageProducer mp = null;
public static void main(String[] args) throws JMSException {
try{
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createTopic("topic");
mp = session.createProducer(destination);
//发送10条信息到topic
for(int i = 0 ; i < 10 ;i++){
Message message = session.createTextMessage("hah");
mp.send(message);
}
}finally{
mp.close();
session.close();
connection.close();
}
}
}
/**
* 异步接受消息
* @author cfzhou
*
*/
public class SimpleTopicResumerAsync {
private static String brokerURL = "tcp://localhost:61616";
private static ConnectionFactory factory= null;
private static Connection connection= null;
private static Session session = null;
public static void main(String[] args) throws Exception {
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
new Thread(new ResumerThread()).start();
}
private static class ResumerThread implements Runnable{
@Override
public void run() {
for(int i = 0 ;i < 3;i++){
try {
Destination destination = session.createTopic("topic");
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
System.out.println(message);
}
});
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
public class SimpleTopicResumerAsync2 {
private static String brokerURL = "tcp://localhost:61617";
private static ConnectionFactory factory= null;
private static Connection connection= null;
private static Session session = null;
public static void main(String[] args) throws Exception {
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
new Thread(new ResumerThread()).start();
}
private static class ResumerThread implements Runnable{
@Override
public void run() {
for(int i = 0 ;i < 3;i++){
try {
Destination destination = session.createTopic("topic");
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
System.out.println(message);
}
});
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
试想一种情况.有一个本地的broker,有许多远程的客户端需要连接这个broker.这样会引起很多不必要的网络开销.为了最小化连接,我们可以在每个远程主机上启动一个broker,然后配置远程broker和本地broker的静态网络连接.这样不仅最小化了远程和本地之间的连接数量,还可以让远程的客户端更有效的操作,消除网络中的长连接意味着更少的出错,以及客户端更少的等待时间.
故障转移协议
之前的例子,一个client都是只连接到一个broker上的.如果这个broker出现故障,client要么失效要么连接到另外一个功能相同的broker上去.
有两种方法可以实现故障转移
1.提供一个可用的broker的列表,用于故障转移传输协议.例如:failover:(uri1,...,uriN)?key=value或者failover:uri1,...,uriN
2.动态的发现可用的broker.
这种协议使用一种随机算法选择连接器,如果连接失败,会自动选择其它的url连接.默认的配置实现了延迟重连的逻辑,意味着在10ms后进行第一次重连,之后的重连间隔的时间是前一次的2倍,知道重连时间等于30000ms.
消费者:
package com.zcf.activemq.failoverexample; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.log4j.Logger; public class Consumer { private static Logger logger = Logger.getLogger(Consumer.class); private static String brokerURL = "failover:(tcp://localhost:61617,tcp://localhost:61616)";// private static String brokerURL = private static transient ConnectionFactory factory; private transient Connection connection; private transient Session session; private String jobs[] = new String[] { "suspend", "delete" }; public Consumer() throws JMSException { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } public void close() throws JMSException { if (connection != null) { connection.close(); } } public static void main(String[] args) throws JMSException, InterruptedException { Consumer consumer = new Consumer(); for (String job : consumer.jobs) { Destination destination = consumer.getSession().createQueue("JOBS." + job); MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination); messageConsumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { try { //do something here System.out.println( ((ObjectMessage)message).getObject()); logger.info(((ObjectMessage)message).getObject()); } catch (Exception e) { e.printStackTrace(); } } }); } } public Session getSession() { return session; } }
生产者:
package com.zcf.activemq.failoverexample; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; public class Publisher { private static String brokerURL = "failover:(tcp://localhost:61617,tcp://localhost:61616)"; private static transient ConnectionFactory factory; private transient Connection connection; private transient Session session; private transient MessageProducer producer; private static int count = 10; private static int total; private static int id = 1000000; private String jobs[] = new String[]{"suspend", "delete"}; public Publisher() throws JMSException { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(null); } public void close() throws JMSException { if (connection != null) { connection.close(); } } public static void main(String[] args) throws JMSException { Publisher publisher = new Publisher(); while (total < 1) { for (int i = 0; i < count; i++) { publisher.sendMessage(); } total += count; System.out.println("Published '" + count + "' of '" + total + "' job messages"); try { Thread.sleep(1000); } catch (InterruptedException x) { } } publisher.close(); } public void sendMessage() throws JMSException { int idx = 0; while (true) { idx = (int)Math.round(jobs.length * Math.random()); if (idx < jobs.length) { break; } } String job = jobs[idx]; Destination destination = session.createQueue("JOBS." + job); Message message = session.createObjectMessage(id++); System.out.println("Sending: id: " + ((ObjectMessage)message).getObject() + " on queue: " + destination); producer.send(destination, message); } }
log4j配置
log4j.rootLogger= debug, stdout log4j.logger.org.apache.activemq=debug ### \u8F93\u51FA\u5230\u63A7\u5236\u53F0 ### log4j.appender.stdout = org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target = System.out log4j.appender.stdout.layout = org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern =%d{yyyy-MM-dd HH:mm:ss}%m%n
在没启动Activemq之前,启动Consumer.java,会发现一直提示连接失败(61616,61617),每次连接时间都会翻倍
启动两个broker之后,会提示2016-11-24 17:56:24Successfully connected to tcp://localhost:61616.当我们关闭61616的broker.会提示 Successfully reconnected to tcp://localhost:61617。
默认情况下,如果client与broker直接的connection断开,则client会新起一个线程,不断的从url参数中获取一个url来重试连接。这个机制对于在容器(spring)中使用的connection木有问题。
但是对于简单实现的一个独立运行client,一般重连一次就会出现进程退出的bug.
程序退出不再重连的原因在于重连的线程是daemon的,连接出错以后,其他线程都退出了,这个线程也随即被销毁掉了。
官方修复过一次,在 ActiveMQ Connection Executor 上设置了daemon=false,但是这个线程不一定被创建出来。所以bug依然在。
修复的方法很简单:
FailoverTransport.java 的132 行
reconnectTaskFactory = new TaskRunnerFactory();
reconnectTaskFactory.setDaemon(false); // to set daemon=false by kimmking
reconnectTaskFactory.init();
把重连的线程设置成daemon=false就成。然后再按照上面的步骤来执行,发现多次重启broker,都是可以自动重连的.即使只有一个broker,也可以使用重连机制.
动态网络连接器(用于经常会增加或者删除broker的情况)
ip广播是一种通过ip网络就可以很容易的把数据从源到一组对它感兴趣的接受者的网络机制.ip广播的一个基本概念是所谓的组地址(例如数据源和接受者的IP地址在224.0.0.0和之间的239.255.255.255).源把这些地址作为数据的目的地,接受者使用它来表达对这个组的数据的兴趣.
当ip广播被配置,ActiveMq brokers使用广播协议来公布他们的服务并且定位其它broker服务的地址用来创建代理网络.客户端使用广播来定位brokers以及建议和它们的连接.广播形式的URL如下;
multicast://ipadaddress:port?key=value
具体的配置如下:
<broker xmlns="http://activemq.apache.org/schema/ core" brokerName="multicast" dataDirectory="${activemq.base}/data"> ... <networkConnectors> <networkConnector name="default-nc" uri="multicast://default"/> </networkConnectors> <transportConnectors> <transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/> </transportConnectors> ... </broker>
在上面的例子中,使用群组名称"default"来替代具体的IP地址.上面的配置代码片段中有两个地方比较重要.首先,transport connector的discoveryUri属性用于暴露这个传输连接器的URI到名称为default的群组中.所有的希望查找可用代理的客户端都可以使用这个代理。
network connector的uri属性用于查找可用的代理并与之建立代理网络.这样配置后,代理就像客户端一样,使用多点传送协议来查找其他代理。
,移除discoveryUri属性,客户端就无法通过多点协议扫描到代理,
用多点传送协议的一个缺点是侦测是自动的.如果你不想把某个代理添加到群组中,你必须十分小心的设置。
自动侦测协议
这种传输连接器是配置在客户端的.它和故障重连很相似,唯一不同的地方是它使用广播去发现可用的broker并随机选择其中之一.发现协议的URL如下:
discovery:(multicast://default)
点对点协议(PEER PROTOCOL)
它是VM连接器的一个超集,它会建立一个对等的内嵌网络代理
点对点协议的URI语法如下:
peer://peergroup/brokerName?key=value (peer://group1 )
group1代表组名,内嵌代理会与组名是group1的代理之间建立网络通信
FanOut连接器
Fanout是一种通信器群组,用于使得客户端可以同时连接到多个代理并对这些代理进行相同的操作.
Fanout协议的URI语法如下: fanout:(fanoutURI)?key=value
fanoutURI值可以使用静态的URI或者多点传送URI.参考下面的示例: fanout:(static:(tcp://host1:61616,tcp://host2:61616,tcp://host3:61616)) 客户端将尝试连接掉3个使用静态协议配置的静态代理.
使用动态效果 fanout:(multicast://default) 此协议的目的是发送消息到多个代理.其次,如果你使用的代理属于同一个代理网络,那么指定的消息消费者可能会接收到重复的消息.因此,通常情况下,fanout协议仅用于发布消息到多个相 互之间没有连接在一起的代理.即多个代理之间的独立的。
各个协议的总结