RabbitMQ 一、数据丢失的三个场景 二、消息发送端:confirm机制(生产者-->MQ server) 三、消息中间件端:消息持久化(exchange 和 queue 的持久化) 四、消息消费端:ACK事务机制(队列-->消费者) 五、补充方案1:设置集群镜像模式 六、补充方案2:消息补偿机制 参考文献

一条消息从生产者发送到消费者消费的过程:

RabbitMQ
一、数据丢失的三个场景
二、消息发送端:confirm机制(生产者-->MQ server)
三、消息中间件端:消息持久化(exchange 和 queue 的持久化)
四、消息消费端:ACK事务机制(队列-->消费者)
五、补充方案1:设置集群镜像模式
六、补充方案2:消息补偿机制
参考文献

可以看出,一条消息整个过程要经历两次的网络传输:

  • 从生产者发送到RabbitMQ服务器,从RabbitMQ服务器发送到消费者
  • 在消费者未消费前存储在队列(Queue)中

所以可以知道,有三个场景下是会发生消息丢失的

  1. 生产者发送消息到RabbitMQ服务器过程中,RabbitMQ服务器如果宕机停止服务,消息会丢失。
  2. 存储在队列中,如果队列没有对消息持久化,RabbitMQ服务器宕机重启会丢失数据。
  3. 消费者从RabbitMQ服务器获取队列中存储的数据消费,但是消费者程序出错或者宕机而没有正确消费,导致数据丢失。

针对以上三种场景,RabbitMQ提供了三种解决的方式,分别是confirm机制,消息持久化,ACK事务机制。

RabbitMQ
一、数据丢失的三个场景
二、消息发送端:confirm机制(生产者-->MQ server)
三、消息中间件端:消息持久化(exchange 和 queue 的持久化)
四、消息消费端:ACK事务机制(队列-->消费者)
五、补充方案1:设置集群镜像模式
六、补充方案2:消息补偿机制
参考文献

二、消息发送端:confirm机制(生产者-->MQ server)

生产者发送到RabbitMQ Server时,有可能因为网络问题导致投递失败,从而丢失数据。我们可以使用confirm模式防止数据丢失。工作流程是怎么样的呢,看以下图解:

  • 第一步,一条消息从生产者发送到RabbitMQ,首先会发送到Exchange,对应回调函数confirm()
  • 第二步,从Exchange路由分配到Queue中,对应回调函数则是returnedMessage()

注意:

  • exchange无论是否收到消息,都会回调confirm()函数
  • 只有当exchange无法路由到queue,才会调用returnedMessage()

RabbitMQ
一、数据丢失的三个场景
二、消息发送端:confirm机制(生产者-->MQ server)
三、消息中间件端:消息持久化(exchange 和 queue 的持久化)
四、消息消费端:ACK事务机制(队列-->消费者)
五、补充方案1:设置集群镜像模式
六、补充方案2:消息补偿机制
参考文献

代码实现

代码怎么实现呢,请看演示:

首先在application.yml配置文件中加上如下配置:

spring:
  rabbitmq:
    publisher-confirms: true
    publisher-returns: true
    template:
      mandatory: true

//publisher-confirms:设置为true时。当消息投递到Exchange后,会回调confirm()方法进行通知生产者
//publisher-returns:设置为true时。当消息匹配到Queue并且失败时,会通过回调returnedMessage()方法返回消息
//spring.rabbitmq.template.mandatory: 设置为true时。指定消息在没有被队列接收时会通过回调returnedMessage()方法退回。

有个小细节,publisher-returns和mandatory如果都设置的话,优先级是以mandatory优先。可以看源码:

RabbitMQ
一、数据丢失的三个场景
二、消息发送端:confirm机制(生产者-->MQ server)
三、消息中间件端:消息持久化(exchange 和 queue 的持久化)
四、消息消费端:ACK事务机制(队列-->消费者)
五、补充方案1:设置集群镜像模式
六、补充方案2:消息补偿机制
参考文献

接着我们需要定义回调方法:

@Component
public class RabbitmqConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    private Logger logger = LoggerFactory.getLogger(RabbitmqConfirmCallback.class);

    /**
     * 监听消息是否到达Exchange
     *
     * @param correlationData 包含消息的唯一标识的对象
     * @param ack             true 标识 ack,false 标识 nack
     * @param cause           nack 投递失败的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            logger.info("消息投递成功~消息Id:{}", correlationData.getId());
        } else {
            logger.error("消息投递失败,Id:{},错误提示:{}", correlationData.getId(), cause);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        logger.info("消息没有路由到队列,获得返回的消息");
        Map map = byteToObject(message.getBody(), Map.class);
        logger.info("message body: {}", map == null ? "" : map.toString());
        logger.info("replyCode: {}", replyCode);
        logger.info("replyText: {}", replyText);
        logger.info("exchange: {}", exchange);
        logger.info("routingKey: {}", exchange);
        logger.info("------------> end <------------");
    }

    @SuppressWarnings("unchecked")
    private <T> T byteToObject(byte[] bytes, Class<T> clazz) {
        T t;
        try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
             ObjectInputStream ois = new ObjectInputStream(bis)) {
            t = (T) ois.readObject();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
        return t;
    }
}

我这里就简单地打印回调方法返回的消息,在实际项目中,可以把返回的消息存储到日志表中,使用定时任务进行进一步的处理。

我这里是使用RabbitTemplate进行发送,所以在Service层的RabbitTemplate需要设置一下:

@Service
public class RabbitMQServiceImpl implements RabbitMQService {
 @Resource
    private RabbitmqConfirmCallback rabbitmqConfirmCallback;

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        //指定 ConfirmCallback
        rabbitTemplate.setConfirmCallback(rabbitmqConfirmCallback);
        //指定 ReturnCallback
        rabbitTemplate.setReturnCallback(rabbitmqConfirmCallback);
    }
    
    @Override
    public String sendMsg(String msg) throws Exception {
        Map<String, Object> message = getMessage(msg);
        try {
            CorrelationData correlationData = (CorrelationData) message.remove("correlationData");
            rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, message, correlationData);
            return "ok";
        } catch (Exception e) {
            e.printStackTrace();
            return "error";
        }
    }
    
 private Map<String, Object> getMessage(String msg) {
        String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
        CorrelationData correlationData = new CorrelationData(msgId);
        String sendTime = sdf.format(new Date());
        Map<String, Object> map = new HashMap<>();
        map.put("msgId", msgId);
        map.put("sendTime", sendTime);
        map.put("msg", msg);
        map.put("correlationData", correlationData);
        return map;
    }
}

大功告成!接下来我们进行测试,发送一条消息,我们可以控制台:

RabbitMQ
一、数据丢失的三个场景
二、消息发送端:confirm机制(生产者-->MQ server)
三、消息中间件端:消息持久化(exchange 和 queue 的持久化)
四、消息消费端:ACK事务机制(队列-->消费者)
五、补充方案1:设置集群镜像模式
六、补充方案2:消息补偿机制
参考文献

假设发送一条信息没有路由匹配到队列,可以看到如下信息:

RabbitMQ
一、数据丢失的三个场景
二、消息发送端:confirm机制(生产者-->MQ server)
三、消息中间件端:消息持久化(exchange 和 queue 的持久化)
四、消息消费端:ACK事务机制(队列-->消费者)
五、补充方案1:设置集群镜像模式
六、补充方案2:消息补偿机制
参考文献

这就是confirm模式。它的作用是为了保障生产者投递消息到RabbitMQ不会出现消息丢失。

三、消息中间件端:消息持久化(exchange 和 queue 的持久化)

RabbitMQ是支持消息持久化的,消息持久化需要设置:Exchange为持久化和Queue持久化,这样当消息发送到RabbitMQ服务器时,消息就会持久化。

3.1 exchange的持久化

首先看Exchange交换机的类图:

RabbitMQ
一、数据丢失的三个场景
二、消息发送端:confirm机制(生产者-->MQ server)
三、消息中间件端:消息持久化(exchange 和 queue 的持久化)
四、消息消费端:ACK事务机制(队列-->消费者)
五、补充方案1:设置集群镜像模式
六、补充方案2:消息补偿机制
参考文献

看这个类图其实是要说明上一篇文章介绍的四种交换机都是AbstractExchange抽象类的子类,所以根据java的特性,创建子类的实例会先调用父类的构造器,父类也就是AbstractExchange的构造器是怎么样的呢?

RabbitMQ
一、数据丢失的三个场景
二、消息发送端:confirm机制(生产者-->MQ server)
三、消息中间件端:消息持久化(exchange 和 queue 的持久化)
四、消息消费端:ACK事务机制(队列-->消费者)
五、补充方案1:设置集群镜像模式
六、补充方案2:消息补偿机制
参考文献

从上面的注释可以看到durable参数表示是否持久化。默认是持久化(true)。创建持久化的Exchange可以这样写:

@Bean
    public DirectExchange rabbitmqDemoDirectExchange() {
        //Direct交换机
        return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false);
    }

3.2 queue的持久化

接着是Queue队列,我们先看看Queue的构造器是怎么样的:

RabbitMQ
一、数据丢失的三个场景
二、消息发送端:confirm机制(生产者-->MQ server)
三、消息中间件端:消息持久化(exchange 和 queue 的持久化)
四、消息消费端:ACK事务机制(队列-->消费者)
五、补充方案1:设置集群镜像模式
六、补充方案2:消息补偿机制
参考文献

也是通过durable参数设置是否持久化,默认是true。所以创建时可以不指定:

 @Bean
    public Queue fanoutExchangeQueueA() {
     //只需要指定名称,默认是持久化的
        return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A);
    }

3.3 怎么证明持久化成功

怎么证明是已经持久化了呢,实际上可以找到对应的文件:

RabbitMQ
一、数据丢失的三个场景
二、消息发送端:confirm机制(生产者-->MQ server)
三、消息中间件端:消息持久化(exchange 和 queue 的持久化)
四、消息消费端:ACK事务机制(队列-->消费者)
五、补充方案1:设置集群镜像模式
六、补充方案2:消息补偿机制
参考文献

找到对应磁盘中的目录:

RabbitMQ
一、数据丢失的三个场景
二、消息发送端:confirm机制(生产者-->MQ server)
三、消息中间件端:消息持久化(exchange 和 queue 的持久化)
四、消息消费端:ACK事务机制(队列-->消费者)
五、补充方案1:设置集群镜像模式
六、补充方案2:消息补偿机制
参考文献

消息持久化可以防止消息在RabbitMQ Server中不会因为宕机重启而丢失。

四、消息消费端:ACK事务机制(队列-->消费者)

原本:消费者从队列中获取到消息后,会直接确认签收,假设消费者宕机或者程序出现异常,数据没有正常消费,这种情况就会出现数据丢失。

修改:所以关键在于把自动签收改成手动签收,正常消费则返回确认签收,如果出现异常,则返回拒绝签收重回队列。

RabbitMQ
一、数据丢失的三个场景
二、消息发送端:confirm机制(生产者-->MQ server)
三、消息中间件端:消息持久化(exchange 和 queue 的持久化)
四、消息消费端:ACK事务机制(队列-->消费者)
五、补充方案1:设置集群镜像模式
六、补充方案2:消息补偿机制
参考文献

代码实现

首先在消费者的application.yml文件中设置事务提交为manual手动模式:

spring:
  rabbitmq:
    listener:
      simple:
  acknowledge-mode: manual # 手动ack模式
        concurrency: 1 # 最少消费者数量
        max-concurrency: 10 # 最大消费者数量

然后编写消费者的监听器:

@Component
public class RabbitDemoConsumer {

    enum Action {
        //处理成功
        SUCCESS,
        //可以重试的错误,消息重回队列
        RETRY,
        //无需重试的错误,拒绝消息,并从队列中删除
        REJECT
    }

    @RabbitHandler
    @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))
    public void process(String msg, Message message, Channel channel) {
        long tag = message.getMessageProperties().getDeliveryTag();
        Action action = Action.SUCCESS;
        try {
            System.out.println("消费者RabbitDemoConsumer从RabbitMQ服务端消费消息:" + msg);
            if ("bad".equals(msg)) {
                throw new IllegalArgumentException("测试:抛出可重回队列的异常");
            }
            if ("error".equals(msg)) {
                throw new Exception("测试:抛出无需重回队列的异常");
            }
        } catch (IllegalArgumentException e1) {
            e1.printStackTrace();
            //根据异常的类型判断,设置action是可重试的,还是无需重试的
            action = Action.RETRY;
        } catch (Exception e2) {
            //打印异常
            e2.printStackTrace();
            //根据异常的类型判断,设置action是可重试的,还是无需重试的
            action = Action.REJECT;
        } finally {
            try {
                if (action == Action.SUCCESS) {
                    //multiple 表示是否批量处理。true表示批量ack处理小于tag的所有消息。false则处理当前消息
                    channel.basicAck(tag, false);
                } else if (action == Action.RETRY) {
                    //Nack,拒绝策略,消息重回队列
                    channel.basicNack(tag, false, true);
                } else {
                    //Nack,拒绝策略,并且从队列中删除
                    channel.basicNack(tag, false, false);
                }
                channel.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

解释一下上面的代码:

  • 如果没有异常,则手动确认回复RabbitMQ服务端basicAck(消费成功)。
  • 如果抛出某些可以重回队列的异常,我们就回复basicNack并且设置重回队列。
  • 如果是抛出不可重回队列的异常,就回复basicNack并且设置从RabbitMQ的队列中删除。

接下来进行测试,发送一条普通的消息"hello":

RabbitMQ
一、数据丢失的三个场景
二、消息发送端:confirm机制(生产者-->MQ server)
三、消息中间件端:消息持久化(exchange 和 queue 的持久化)
四、消息消费端:ACK事务机制(队列-->消费者)
五、补充方案1:设置集群镜像模式
六、补充方案2:消息补偿机制
参考文献

解释一下ack返回的三个方法的意思。

①成功确认

void basicAck(long deliveryTag, boolean multiple) throws IOException;

消费者成功处理后调用此方法对消息进行确认。

  • deliveryTag:该消息的index
  • multiple:是否批量.。true:将一次性ack所有小于deliveryTag的消息。

②失败确认

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
  • deliveryTag:该消息的index。
  • multiple:是否批量。true:将一次性拒绝所有小于deliveryTag的消息。
  • requeue:被拒绝的是否重新入队列。

③失败确认

void basicReject(long deliveryTag, boolean requeue) throws IOException;
  • deliveryTag:该消息的index。
  • requeue:被拒绝的是否重新入队列。

basicNack()和basicReject()的区别在于:basicNack()可以批量拒绝,basicReject()一次只能拒接一条消息。

五、补充方案1:设置集群镜像模式

我们先来介绍下RabbitMQ三种部署模式:

1)单节点模式:最简单的情况,非集群模式,节点挂了,消息就不能用了。业务可能瘫痪,只能等待。
2)普通模式:默认的集群模式,某个节点挂了,该节点上的消息不能用,有影响的业务瘫痪,只能等待节点恢复重启可用(必须持久化消息情况下)。
3)镜像模式:把需要的队列做成镜像队列(互为镜像的是队列,并非节点),一个消息会存在于多个节点的队列中,属于RabbitMQ的HA方案(高可用方案)

下面介绍下镜像模式的三种高可用策略模式:

1)同步至所有的broker节点
2)同步最多N个机器
3)只同步至符合指定名称的nodes

命令处理HA策略模版:rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

1)为每个以“rock.wechat”开头的队列设置所有节点的镜像,并且设置为自动同步模式
rabbitmqctl set_policy ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
rabbitmqctl set_policy -p rock ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

2)为每个以“rock.wechat.”开头的队列设置两个节点的镜像,并且设置为自动同步模式
rabbitmqctl set_policy -p rock ha-exacly "^rock.wechat"
'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

3)为每个以“node.”开头的队列分配指定的节点做镜像
rabbitmqctl set_policy ha-nodes "^nodes."
'{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'

但是:HA 镜像队列有一个很大的缺点就是:系统的吞吐量会有所下降

六、补充方案2:消息补偿机制

我们通过之前的方案 , 基本上已经能够保证消息投递成功了 ! 为什么还要消息补偿机制呢? 难道消息还会丢失,没错,系统是在一个复杂的环境,不要想的太简单了,虽然以上的三种方案,基本可以保证消息的高可用不丢失的问题。但是事无完全:

比如:持久化的消息,保存到硬盘过程中,当前队列节点挂了,存储节点硬盘又坏了,消息丢了,怎么办?

产线网络环境太复杂,所以不知数太多,所以要做消息补偿机制 !

消息补偿机制需要建立在业务数据库和MQ数据库的基础之上 , 当我们发送消息时 , 需要同时将消息数据保存在数据库中, 两者的状态必须记录。 然后通过业务数据库和MQ数据库的对比检查消费是否成功,不成功,进行消息补偿措施,重新发送消息处理。RabbitMQ
一、数据丢失的三个场景
二、消息发送端:confirm机制(生产者-->MQ server)
三、消息中间件端:消息持久化(exchange 和 queue 的持久化)
四、消息消费端:ACK事务机制(队列-->消费者)
五、补充方案1:设置集群镜像模式
六、补充方案2:消息补偿机制
参考文献


RabbitMQ
一、数据丢失的三个场景
二、消息发送端:confirm机制(生产者-->MQ server)
三、消息中间件端:消息持久化(exchange 和 queue 的持久化)
四、消息消费端:ACK事务机制(队列-->消费者)
五、补充方案1:设置集群镜像模式
六、补充方案2:消息补偿机制
参考文献

参考文献

https://zhuanlan.zhihu.com/p/166426241 

https://www.cnblogs.com/flyrock/p/8859203.html

https://juejin.cn/post/6866647684682350600