十、ActiveMQ多节点集群 一、异步投递 二、迟投递和定时投递 三、消息消费的重试机制 四、死信队列 五、消息不被重复消费,幂等性
消息队列如何保证高可用?
基于zookeeper和levelDB搭建activeMQ集群,集群仅提供主备方式的高可用集群功能,避免单点故障。
1.1、异步投递概述
ActiveMQ支持同步,异步两种发送的模式将消息发送到broker,模式的选择对发送延时有巨大的影响。producer能达到怎样的产出率(产出率=发送数据总量/时间)主要受发送延时的影响,使用异步发送可以显著的提高发送性能。
ActiveMQ默认使用异步发送的模式:除非明确指定使用同步发送的方式或者在未使用事物的前提下发送持久的消息,这两种情况都是同步发送的。
如果你没有使用事物且发送的是持久化的消息,每一次发送都是同步发送的且阻塞producer直到broker返回一个确认,这两种情况都是同步发送的。
如果你没有使用事务,且发送的是持久化的消息,每一次发送都是同步发送的且会阻塞producer直到broker返回一个确认,表示消息已经被安全的持久化到磁盘,确认机制提供了消息安全的保证,但同时会阻塞客户端带来了很大的延时。
很多高性能的应该,允许在失败的情况下有少量的数据丢失,如果你的应用满足这个特点,你可以使用异步发送来提高生产率,即使发送的是持久化的消息。
异步发送
它可以最大化producer端的发送效率。我们通常在发送消息量比较密集的情况下使用异步发送,他可以很大的提升Producer性能;不过也带来了额外的问题。
就是需要消耗较多的Client端内存同时也会导致broker端性能消耗增加。
此外它不能有效的确保消息的发送成功,在useAsyncSend=true的情况下客户端需要容忍消息丢失的可能。
自我理解:此处的异步是指生产者和broker之间发送消息的异步。不是指生产者和消费者之间异步。
官网介绍:http://activemq.apache.org/async-sends
说明:对于一个Slow Consumer,使用同步发送消息可能出成Producer堵塞等情况,慢消费者适合使用异步发送。(这句话我认为有误)
总结:
① 异步发送可以让生产者发的更快。
② 如果异步投递不需要保证消息是否发送成功,发送者的效率会有所提高。如果异步投递还需要保证消息是否成功发送,并采用了回调的方式,发送者的效率提高不多,这种就有些鸡肋。
1.2、代码实现
官网上3中代码实现:
public class Jms_TX_Producer { // 方式1。3种方式任选一种 private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626?jms.useAsyncSend=true"; private static final String ACTIVEMQ_QUEUE_NAME = "Async"; public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 方式2 activeMQConnectionFactory.setUseAsyncSend(true); Connection connection = activeMQConnectionFactory.createConnection(); // 方式3 ((ActiveMQConnection)connection).setUseAsyncSend(true); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME); MessageProducer producer = session.createProducer(queue); try { for (int i = 0; i < 3; i++) { TextMessage textMessage = session.createTextMessage("tx msg--" + i); producer.send(textMessage); } System.out.println("消息发送完成"); } catch (Exception e) { e.printStackTrace(); } finally { producer.close(); session.close(); connection.close(); } } }
1.3、异步发送如何确认发送成功
异步发送丢消息的场景是:生产者设置useAsyncSend=true,使用producer.send(msg)持续发送消息。由于消息不阻塞,生产者会认为所有send的消息均被成功发送至MQ。如果MQ突然宕机,此时生产者端内存中尚未被发送至MQ的消息都会丢失。
所以,正确的异步发送方法是需要接受回调的。
同步发送和异步发送的区别就在此,同步发送等send不阻塞了就表示一定发送成功了。
异步发送需要接收回执并由客户端再判断一次是否发送成功。
public class Jms_TX_Producer { private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626"; private static final String ACTIVEMQ_QUEUE_NAME = "Async"; public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); activeMQConnectionFactory.setUseAsyncSend(true); Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME); ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer)session.createProducer(queue); try { for (int i = 0; i < 3; i++) { TextMessage textMessage = session.createTextMessage("tx msg--" + i); textMessage.setJMSMessageID(UUID.randomUUID().toString()+"orderAtguigu"); final String msgId = textMessage.getJMSMessageID(); activeMQMessageProducer.send(textMessage, new AsyncCallback() { public void onSuccess() { System.out.println("成功发送消息Id:"+msgId); } public void onException(JMSException e) { System.out.println("失败发送消息Id:"+msgId); } }); } System.out.println("消息发送完成"); } catch (Exception e) { e.printStackTrace(); } finally { activeMQMessageProducer.close(); session.close(); connection.close(); } } }
控制台观察发送消息的信息:
二、迟投递和定时投递
官网文档:http://activemq.apache.org/delay-and-schedule-message-delivery.html
四大属性
Property name | type | description |
---|---|---|
AMQ_SCHEDULED_DELAY | long | 延迟投递的时间 |
AMQ_SCHEDULED_PERIOD | long | 重复投递的时间间隔 |
AMQ_SCHEDULED_REPEAT | int | 重复投递的次数 |
AMQ_SCHEDULED_CRON | String | Corn 表达式 |
2.2、修改配置文件并重启
要将activemq.xml中配置schedulerSupport属性设为true
</bean> <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true" > <destinationPolicy>
之后重启activemq
java代码里面封装的辅助消息类型:ScheduleMessage。
public class Jms_TX_Producer { private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626"; private static final String ACTIVEMQ_QUEUE_NAME = "Schedule01"; public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME); MessageProducer messageProducer = session.createProducer(queue); long delay = 10*1000; long period = 5*1000; int repeat = 3 ; try { for (int i = 0; i < 3; i++) { TextMessage textMessage = session.createTextMessage("tx msg--" + i); // 延迟的时间 textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); // 重复投递的时间间隔 textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period); // 重复投递的次数 textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat); // 此处的意思:该条消息,等待10秒,之后每5秒发送一次,重复发送3次。 messageProducer.send(textMessage); } System.out.println("消息发送完成"); } catch (Exception e) { e.printStackTrace(); } finally { messageProducer.close(); session.close(); connection.close(); } } }
- 消费者代码
public class Jms_TX_Consumer { private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626"; private static final String ACTIVEMQ_QUEUE_NAME = "Schedule01"; public static void main(String[] args) throws JMSException, IOException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME); MessageConsumer messageConsumer = session.createConsumer(queue); messageConsumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { if (message instanceof TextMessage) { try { TextMessage textMessage = (TextMessage) message; System.out.println("***消费者接收到的消息: " + textMessage.getText()); textMessage.acknowledge(); } catch (Exception e) { System.out.println("出现异常,消费失败,放弃消费"); } } } }); System.in.read(); messageConsumer.close(); session.close(); connection.close(); } }
三、消息消费的重试机制
3.1、概述
官网文档:http://activemq.apache.org/redelivery-policy
是什么: 消费者收到消息,之后出现异常了,没有告诉broker确认收到该消息,broker会尝试再将该消息发送给消费者。尝试n次,如果消费者还是没有确认收到该消息,那么该消息将被放到死信队列重,之后broker不会再将该消息发送给消费者。
3.2、体哪些情况会引发消息重发
① Client用了transactions且再session中调用了rollback
② Client用了transactions且再调用commit之前关闭或者没有commit
③ Client再CLIENT_ACKNOWLEDGE的传递模式下,session中调用了recover
3.3、请说说消息重发时间间隔和重发次数
间隔:1 次数:6 每秒发6次
3.4、有毒消息Poison ACK
一个消息被redelivedred超过默认的最大重发次数(默认6次)时,消费的回个MQ发一个“poison ack”表示这个消息有毒,告诉broker不要再发了。这个时候broker会把这个消息放到DLQ(私信队列)。
3.5、属性说明
-
-
maximumRedeliveries:最大重传次数,达到最大重传次数后抛异常,为-1时不限制次数,为0时表示不进行重传,默认是6
-
maximumRedeliveryDelay:最大传送延迟,只在useExponentialBackOff为True时有效,假设首次重连间隔 为10ms,倍数为2,那么第二重连时间间隔为20ms,第三次重连时间间隔为40ms。当重连时间间隔的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔,默认为-1。
-
initalRedeliveryDelay:初始重发延迟时间,默认1000L
-
redeliveryDelay:重发延迟时间,当initalRedeliveryDelay=0时生效,默认1000L。
-
useCollisionAvoidance:启用防止冲突的功能,默认是false。
-
useExponentialBackOff:启用指数倍数递增的方式增加延迟时间,默认false
-
3.6、代码验证
生产者。发送3条数据。代码省略.....
消费者。开启事务,却没有commit。重启消费者,前6次都能收到消息,到第7次,不会再收到消息。代码
public class Jms_TX_Consumer { private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626"; private static final String ACTIVEMQ_QUEUE_NAME = "dead01"; public static void main(String[] args) throws JMSException, IOException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME); MessageConsumer messageConsumer = session.createConsumer(queue); messageConsumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { System.out.println("***消费者接收到的消息: " + textMessage.getText()); //session.commit(); }catch (Exception e){ e.printStackTrace(); } } } }); //关闭资源 System.in.read(); messageConsumer.close(); session.close(); connection.close(); } }
activemq管理后台。多了一个名为ActiveMQ.DLQ队列,里面多了3条消息。
3.7、代码修改默认参数
- 消费者
public class Jms_TX_Consumer { private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626"; private static final String ACTIVEMQ_QUEUE_NAME = "dead01"; public static void main(String[] args) throws JMSException, IOException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 修改默认参数,设置消息消费重试3次 RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); redeliveryPolicy.setMaximumRedeliveries(3); activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy); Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME); MessageConsumer messageConsumer = session.createConsumer(queue); messageConsumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { System.out.println("***消费者接收到的消息: " + textMessage.getText()); //session.commit(); }catch (Exception e){ e.printStackTrace(); } } } }); System.in.read(); messageConsumer.close(); session.close(); connection.close(); } }
3.8、整合spring
四、死信队列
4.1、概述
官网文档: http://activemq.apache.org/redelivery-policy 死信队列:异常消息规避处理的集合,主要处理失败的消息。
ActiveMQ中引入了“死信队列”(Dead Letter Queue)的概念,即一条消息再被重发了多次后(默认为重发6次redeliveryCount==6),将会被ActiveMQ移入“死信队列”。开发人员可以在这个Queue中查看处理出错的信息,进行人工干预。
-
-
核心业务队列,就是比如上图专门用来让订单系统发送订单消息的,然后另外一个死信队列就是用来处理异常情况的。
-
假如第三方物流系统故障了此时无法请求,那么仓储系统每次消费到一个订单消息,尝试通知发货和配送都会遇到对方的接口报错,此时仓储系统系统就可以把这条消息拒接访问或者标志位处理失败,一旦标志这条消息处理失败之后,MQ就会把这条消息转入提前设置好的一个死队列中。
-
4.2、死信队列的配置(一般采用默认)
4.存放非持久消息到死信队列中
五、消息不被重复消费,幂等性
如何保证消息不被重复消费呢?幕等性问题你谈谈
-
之间我们学习Web框架阶段的,方式表单重复提交
-
表单 = message
-
网络延迟传输中,会造成进行MQ重试中,在重试的过程中,可能会造成重复消费。
-
如果消息是做数据库的插入操作,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
-
如果上面两种情况还不行,准备一个第三服务方来消费记录。以redis为列,给消息分配一个全局id,只要消费过该消息,将<id,message>以k-v形式写入redis,那消费者开始消费前,先去redis中查询有没有消费记录即可。
-