RocketMQ从入门到放弃 单机搭建 下载控制台 生产者 指定任意队列个数 消费者 MessageListenerConcurrently和MessageListenerOrderly的区别 消息发送到哪个队列? RocketMq提供的队列选择器 三种发送模式 发送超时设置 消费者设置拉取消息数量 MessageListenerOrderly的返回值 MessageListenerConcurrently的返回值 集群消费 集群消费策略 自定义消费策略 广播消费 两个消费者组,模拟广播消费 同一个消费者组监听不同的Topic会发生什么? 集群搭建双master,没有salve 集群搭建双master,双salve salve节点只能读,不能写 实现顺序消息 消费者如果有异常,不会抛出!!! 延迟消息 消息去重 保证消息的绝对不丢失 单机搭建 下载控制台 生产者 指定任意队列个数 消费者 MessageListenerConcurrently和MessageListenerOrderly的区别 消

下载控制台

生产者

指定任意队列个数

消费者

MessageListenerConcurrently和MessageListenerOrderly的区别

消息发送到哪个队列?

RocketMq提供的队列选择器

三种发送模式

发送超时设置

消费者设置拉取消息数量

MessageListenerOrderly的返回值

MessageListenerConcurrently的返回值

集群消费

集群消费策略

自定义消费策略

广播消费

两个消费者组,模拟广播消费

同一个消费者组监听不同的Topic会发生什么?

集群搭建双master,没有salve

集群搭建双master,双salve

salve节点只能读,不能写

实现顺序消息

消费者如果有异常,不会抛出!!!

延迟消息

消息去重

保证消息的绝对不丢失

单机搭建

http://rocketmq.apache.org/ 官网下载zip包。unzip 命令解压。

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

 RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

 RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

修改 bin 目录下的 runserver和runbroker (元空间

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

 RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

./mqnamesrv    ./mqbroker -n localhost:9876 autoCreateTopicEnable=true 注意:需要先启动nameServer。启动broker时后缀参数必须带上,否则无法找到对应的nameServer,无法自动创建Topic

下载控制台

https://github.com/apache/rocketmq-externals 从GitHup下载源代码。找到rocketmq-console,先编辑一下rocketmq-console里面的application.properties文件,将项目使用的rocketmq.config.namesrvAddr配置上去。

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

使用maven打包这个项目。进入项目目录使用命令。mvn clean package -Dmaven.test.skip=true

可能时间有点长,等待结束后找到target目录找到jar,java -jar xxx.jar 启动。

浏览器输入 http://localhost:8080 即可进入。

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

生产者

导入jar包我这里没有使用Springboot整合,但其实使用的API是同一套。

      <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.3.2</version>
        </dependency>
View Code
package com.dfsn.cloud.consumer.mq.producer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.UUID;

public class m1 {
    public static void main(String[] args)throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("groupA");

        producer.setNamesrvAddr("10.0.98.76:9876");

        producer.start();

        Message message = new Message("topic1","tag1", UUID.randomUUID().toString(),"我是消息".getBytes());

        SendResult send = producer.send(message);

        System.out.println(send);

        producer.shutdown();
    }
}
View Code

以上代码设置nameserver的地址,然后启动。接着创建一个消息对象。参数分别为 Topic tag key 消息体。发送消息后可以得到一个返回体。从返回体中可以查看消息是否发送成功,最后关闭。

SendResult [sendStatus=SEND_OK, msgId=0A061F2B352018B4AAC245884DC30000, offsetMsgId=0A00624C00002A9F00000000000001D8, messageQueue=MessageQueue [topic=topic1, brokerName=sztpilp3appv02t, queueId=1], queueOffset=0]
View Code

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

指定任意队列个数

package com.dfsn.cloud.consumer.mq.producer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.UUID;

public class m1 {
    public static void main(String[] args)throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("groupA");

        producer.setNamesrvAddr("10.0.98.76:9876");

        producer.setDefaultTopicQueueNums(8);

        producer.start();

        Message message = new Message("topic2","tag2", UUID.randomUUID().toString(),"我是消息".getBytes());

        SendResult send = producer.send(message);

        System.out.println(send);

        producer.shutdown();
    }
}
View Code

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

消费者

package com.dfsn.cloud.consumer.mq.producer;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.UUID;

public class m1 {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("groupA");

        producer.setNamesrvAddr("10.0.98.76:9876");

        producer.setDefaultTopicQueueNums(4);

        producer.start();

        for (int i = 0; i < 30; i++) {
            String m = ("我是消息" + i);
            Message message = new Message("topic2", "tag1", UUID.randomUUID().toString(), m.getBytes());
            SendResult send = producer.send(message);
            System.out.println(m + "====" + send);
        }

        producer.shutdown();
    }

}
View Code
package com.dfsn.cloud.consumer.mq.producer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.UUID;

public class m2 {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("groupB");
        consumer.setNamesrvAddr("10.0.98.76:9876");
        consumer.subscribe("topic2", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt messageExt : msgs) {
                    System.out.println("消息:" + new String(messageExt.getBody()) + "--队列ID:" + messageExt.getQueueId() + "--消息ID:" + messageExt.getMsgId());
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });


        consumer.start();
    }


}
View Code

消费者同样的要设置nameServer的地址,同时要设置监听的Topic和tag,*代表任意类型的tag。最后我们需要给出一个处理消息的Listener。

该Listener的可选对象有两个。MessageListenerConcurrently或者MessageListenerOrderly。

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

diffTotal是未消费的消息数量。

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

MessageListenerConcurrently和MessageListenerOrderly的区别

package com.datang.study.elk.mq;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class Provider {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("pg5");
        producer.setNamesrvAddr("192.168.31.78:9876");

        producer.setDefaultTopicQueueNums(1);

        producer.start();
        for (int i = 0; i < 10; i++) {

            String s = ("消息" + i);

            Message message = new Message("topic5", "tag", s.getBytes());

            producer.send(message);

            System.out.println("发送消息返回结果:" + s);
        }
        producer.shutdown();
    }
}
View Code
package com.datang.study.elk.mq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg5");
        consumer.setNamesrvAddr("192.168.31.78:9876");

        consumer.subscribe("topic5", "*");

        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                MessageQueue messageQueue = consumeOrderlyContext.getMessageQueue();
                int queueId = messageQueue.getQueueId();
                for (MessageExt messageExt : list) {
                    System.out.println(new String(messageExt.getBody()) + "--队列id:" + queueId);
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });


        consumer.start();
    }
}
View Code
package com.datang.study.elk.mq;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class Provider2 {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("pg6");
        producer.setNamesrvAddr("192.168.31.78:9876");

        producer.setDefaultTopicQueueNums(1);

        producer.start();
        for (int i = 0; i < 10; i++) {

            String s = ("消息" + i);

            Message message = new Message("topic6", "tag", s.getBytes());

            producer.send(message);

            System.out.println("发送消息返回结果:" + s);
        }
        producer.shutdown();
    }
}
View Code
package com.datang.study.elk.mq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;

public class Consumer2 {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg6");
        consumer.setNamesrvAddr("192.168.31.78:9876");

        consumer.subscribe("topic6", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                MessageQueue messageQueue = context.getMessageQueue();
                int queueId = messageQueue.getQueueId();
                for (MessageExt messageExt : msgs) {
                    System.out.println(new String(messageExt.getBody()) + "--队列id:" + queueId);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
    }
}
View Code

上边4个代码片段两个为一组。在只有一个消息队列的情况下,注意!是一个消息队列的情况。MessageListenerOrderly是有序的消费,而MessageListenerOrderly

是无序的消费。这里的有序和无序说的是同一个队列的先进先出。

 RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

 RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

消息发送到哪个队列?

默认的一个Topic有四个队列。那一条新的消息会投放到哪一个队列呢?

package com.dfsn.cloud.consumer.mq3;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;
import java.util.UUID;

public class m1 {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("pg1");

        producer.setNamesrvAddr("10.0.98.76:9876");

        producer.start();

        Message message = new Message("t1", "tag", UUID.randomUUID().toString(), "我是消息".getBytes());

        SendResult send = producer.send(message, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                return null;
            }
        }, null);

        producer.shutdown();
    }
}
View Code

send()的第二个参数接收一个队列选择器,实现MessageQueueSelector接口即可。该接口需要覆盖select()方法,该方法的List类型参数是当前

所有队列的集合。我们可以根据一定的策略选择其中一个返回,那么当前消息必然会投放到这个队列。

package com.dfsn.cloud.consumer.mq3;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;
import java.util.UUID;

public class m1 {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("pg1");

        producer.setNamesrvAddr("10.0.98.76:9876");

        producer.start();

        Message message = new Message("t1", "tag", UUID.randomUUID().toString(), "我是消息".getBytes());

        for (int i = 0; i < 10; i++) {
            SendResult send = producer.send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    System.out.println("总共有" + mqs.size() + "个队列");
                    //将消息全部投递到最后一个队列
                    return mqs.get(mqs.size() - 1);
                }
            }, null);
        }

        producer.shutdown();
    }
}
View Code
package com.dfsn.cloud.consumer.mq3;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class m2 {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg1");
        consumer.setNamesrvAddr("10.0.98.76:9876");
        consumer.subscribe("t1", "tag");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt messageExt : msgs) {
                    System.out.println("消息:" + new String(messageExt.getBody()) + "--队列ID:" + messageExt.getQueueId());
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
    }


}
View Code

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

 RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

以上代码片段表示总共有4个队列,编号从0开始,显示消息全部都投递到了编号为3的队列,也就是最后一个队列。

RocketMq提供的队列选择器。

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

图上第一个是我们上边自己创建的匿名对象可以不管。

SelectMessageQueueByRandom 随机返回队列。这个最简单,队列的总数随机一个就可以了。

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

SelectMessageQueueByHash 算出arg的哈希值在获取绝对值。然后模于队列的总数。一个数模于另一个数,必然小于这个数。所以

最大值一定也是队列size-1。但是这个arg是个啥,还要再看源码。

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

SelectMessageQueueByMachineRoom 计算算法,这个算法没有实现。

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

如果我们不指定任何的投递策略,默认的RocketMQ采用随机递增取模算法。

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

三种发送模式

RocketMQ发送消息时可以指定三种发送模式,SYNC,ASYNC,ONEWAY。

package com.dfsn.cloud.consumer.mq5;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.UUID;

public class m1 {
    public static void main(String[] args) throws Exception {

        DefaultMQProducer producer = new DefaultMQProducer("pg4");

        producer.setNamesrvAddr("10.0.98.76:9876");

        producer.start();

        Message message = new Message("t4", "tag", UUID.randomUUID().toString(), "我是消息".getBytes());

        producer.send(message);
        
        producer.shutdown();
    }
}
View Code

使用send(Message msg)代表SYNC,从源码可以看出,该模式的特点是,如果发送失败则会重复尝试三次发送。会有发送结果返回。

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

 RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

send(Message msg,SendCallback sendCallback) 代表ASYNC,此方法接收SendCallback对象,异步的接收发送结果。这个方法只会做一次失败重试发送。

package com.dfsn.cloud.consumer.mq6;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.UUID;

public class m1 {
    public static void main(String[] args) throws Exception {

        DefaultMQProducer producer = new DefaultMQProducer("pg5");

        producer.setNamesrvAddr("10.0.98.76:9876");

        producer.start();

        Message message = new Message("t5", "tag", UUID.randomUUID().toString(), "我是消息".getBytes());

        producer.send(message, new SendCallback(){
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
            }

            @Override
            public void onException(Throwable e) {
                System.out.println(e);
            }
        } );


    }
}
View Code

sendOneway(Message msg);代表ONEWAY。这种发送方式失败也是只会重试一次,但无论最终没有返回值。也就是说发送端无法知晓消息是否发送成功。

package com.dfsn.cloud.consumer.mq7;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.UUID;

public class m1 {
    public static void main(String[] args) throws Exception {

        DefaultMQProducer producer = new DefaultMQProducer("pg6");

        producer.setNamesrvAddr("10.0.98.76:9876");

        producer.start();

        Message message = new Message("t6", "tag", UUID.randomUUID().toString(), "我是消息".getBytes());

        producer.sendOneway(message);

        producer.shutdown();
    }
}
View Code

发送超时设置

如果服务器网络环境不好可以修改消息发送超时时间默认是3000毫秒。

package com.dfsn.cloud.consumer.t7;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.UUID;

public class m1 {
    public static void main(String[] args) throws Exception {

        DefaultMQProducer producer = new DefaultMQProducer("pg7");

        producer.setNamesrvAddr("10.0.98.76:9876");
        
        producer.setSendMsgTimeout(3000);

        producer.start();

        Message message = new Message("t7", "tag", UUID.randomUUID().toString(), "我是消息".getBytes());

        SendResult send = producer.send(message);

        System.out.println(send);

        producer.shutdown();
    }
}
View Code

消费者设置拉取消息数量

消费者需要从队列中拉取未消费的消息,然后消费监听会消费消息。在Listener中有List<MessageExt>该参数是一个集合。

但默认的,消费者一次拉取一条消息,也就是这个集合中只有一个对象。

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

package com.dfsn.cloud.consumer.t12;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class m2 {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg12");
        consumer.setNamesrvAddr("10.0.98.76:9876");
        consumer.subscribe("t12", "tag");

        consumer.setConsumeMessageBatchMaxSize(100);

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

                System.out.println("--------------=====================-----------------");

                for (MessageExt messageExt : msgs) {

                    String msg = new String(messageExt.getBody());

                    System.out.println("消息:" + msg + "--队列ID:" + messageExt.getQueueId());

                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

            }
        });

        consumer.start();
    }


}
View Code

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

生产者生产了30个消息。 从结果看,前几次没有拉取到消息,但最后一次确实一次把消息全部拉取了。

MessageListenerOrderly的返回值

messageListenerOrderly的返回值有两个,SUCCESS SUSPEND_CURRENT_QUEUE_A_MOMENT 如果成功则返回SUCCESS该消息会标记消费成功。

如果失败则会标记SUSPEND_CURRENT_QUEUE_A_MOMENT。但经过测试,如果返回该值,这条消息会一直重试消费,没有次数限制。但我们

可以设置每次重试的间隔时间。DefaultMQPushConsumer.setSuspendCurrentQueueTimeMillis()和ConsumeOrderlyContext.setSuspendCurrentQueueTimeMillis()都有相同的效果。

这里一定要注意如果一条消息被重试消费,那该条消息所在的队列的后续消息,则不会被消费。RocketMQ队列里的消息始终是先进先出!

下图的队列跟代码不是同一个,但是代码是一样的。可以看出,当一个消息重试,队列中所有的消息都会阻塞。

package com.dfsn.cloud.consumer.t15;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class m2 {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg15");
        consumer.setNamesrvAddr("10.0.98.76:9876");
        consumer.subscribe("t15", "tag");
        //失败情况下,会该设置为重试间隔
        consumer.setSuspendCurrentQueueTimeMillis(1000);

        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                MessageExt messageExt = msgs.get(0);

                String msg = new String(messageExt.getBody());

                System.out.println("消息:" + msg + "--队列ID:" + messageExt.getQueueId());

                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
        });

        consumer.start();
    }


}
View Code

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

MessageListenerConcurrently的返回值

成功情况下返回 CONSUME_SUCCESS 失败情况返回 RECONSUME_LATER 同样的 RECONSUME_LATER 也会重试发送。

需要注意的是:RECONSUME_LATER 默认的重试16次,但这16次其中是有间隔的。1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

每一次重试都会递增时间,如果可以通过 ConsumeConcurrentlyContext.setDelayLevelWhenNextConsume(3) 设置重试间隔。注意这个3指的是 10s

最重要的是,在同一个队列中如果有重试的消息,不会影响其他消息的消费。因为失败的消息也算成功消费了。只不过会进入另一个队列,这个队列

是RocketMQ根据消费者组创建的死信队列。

package com.dfsn.cloud.consumer.t20;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.UUID;

public class m1 {
    public static void main(String[] args) throws Exception {

        DefaultMQProducer producer = new DefaultMQProducer("pg20");

        producer.setNamesrvAddr("10.0.98.76:9876");


        producer.setDefaultTopicQueueNums(1);

        producer.start();


        Message message = new Message("t20", "tag", UUID.randomUUID().toString(), "1".getBytes());

        SendResult send = producer.send(message);

        System.out.println(send);


        producer.shutdown();
    }
}
View Code
package com.dfsn.cloud.consumer.t20;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class m2 {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg20");
        consumer.setNamesrvAddr("10.0.98.76:9876");
        consumer.subscribe("t20", "tag");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

                //1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
                context.setDelayLevelWhenNextConsume(1);

                MessageExt messageExt = msgs.get(0);

                String msg = new String(messageExt.getBody());

                System.out.println("消息:" + msg + "--队列ID:" + messageExt.getQueueId());

                if (msg.equals("05531")) {
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                } else {
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            }
        });

        consumer.start();
    }


}
View Code

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

 RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失 

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

集群消费

多个消费者,在同一个group组中。Topic队列会按照策略分配组内的消费者。各个消费者只消费各自队列的消息。

package com.dfsn.cloud.consumer.t22;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.UUID;

public class m1 {
    public static void main(String[] args) throws Exception {

        DefaultMQProducer producer = new DefaultMQProducer("pg22");

        producer.setNamesrvAddr("10.0.98.76:9876");


        producer.start();

        for (int i = 0; i < 10; i++) {

            Message message = new Message("t22", "tag", UUID.randomUUID().toString(), ("消息" + i).getBytes());

            SendResult send = producer.send(message);

            System.out.println(send);

        }

        producer.shutdown();
    }
}
View Code
package com.dfsn.cloud.consumer.t22;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class m2 {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg22");
        consumer.setNamesrvAddr("10.0.98.76:9876");
        consumer.subscribe("t22", "tag");

        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                MessageExt messageExt = msgs.get(0);

                String msg = new String(messageExt.getBody());

                System.out.println("消息:" + msg + "--队列ID:" + messageExt.getQueueId());

                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
    }


}
View Code
package com.dfsn.cloud.consumer.t22;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class m3 {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg22");
        consumer.setNamesrvAddr("10.0.98.76:9876");
        consumer.subscribe("t22", "tag");

        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                MessageExt messageExt = msgs.get(0);

                String msg = new String(messageExt.getBody());

                System.out.println("消息:" + msg + "--队列ID:" + messageExt.getQueueId());

                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
    }


}
View Code

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

集群消费策略

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

在创建消费者时,可以通过 DefaultMQPushConsumer.setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy) 给出一个负载均衡消策略。

上图为RocketMQ已经实现的算法。当然我们也可以自己实现接口,自定义消费策略。

AllocateMessageQueueAveragely 平均分配算法 默认使用的算法。(PS:这个算法我没看懂。)

记结论:设队列为【0,1,2,3】消费者【a,b,c】则【a:0,1】【b:2】【c:3】。设队列为【0,1,2】消费者【a,b,c,d】则【a:0】【b:1】【c:2】【d:】。设队列为【0,1,2】消费者为【a,b,c】则【a:0】【b:0】【c:0】

/**
 * Average Hashing queue algorithm
 * 队列分配策略 - 平均分配
 * 如果 队列数 和 消费者数量 相除有余数时,余数按照顺序"1"个"1"个分配消费者。
 * 例如,5个队列,3个消费者时,分配如下:
 * - 消费者0:[0, 1] 2个队列
 * - 消费者1:[2, 3] 2个队列
 * - 消费者2:[4, 4] 1个队列
 *
 * 代码块 (mod > 0 && index < mod) 判断即在处理相除有余数的情况。
 */
public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {

    private final Logger log = ClientLogger.getLog();

    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
        // 校验参数是否正确
        if (currentCID == null || currentCID.length() < 1) {
            throw new IllegalArgumentException("currentCID is empty");
        }
        if (mqAll == null || mqAll.isEmpty()) {
            throw new IllegalArgumentException("mqAll is null or mqAll empty");
        }
        if (cidAll == null || cidAll.isEmpty()) {
            throw new IllegalArgumentException("cidAll is null or cidAll empty");
        }

        List<MessageQueue> result = new ArrayList<>();
        if (!cidAll.contains(currentCID)) {
            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                consumerGroup,
                currentCID,
                cidAll);
            return result;
        }
        // 平均分配
        int index = cidAll.indexOf(currentCID); // 第几个consumer。
        int mod = mqAll.size() % cidAll.size(); // 余数,即多少消息队列无法平均分配。

        //队列总数 <= 消费者总数时,分配当前消费者1个队列
        //不能均分 &&  当前消费者序号(从0开始) < 余下的队列数 ,分配当前消费者 mqAll / cidAll +1 个队列
        //不能均分 &&  当前消费者序号(从0开始) >= 余下的队列数 ,分配当前消费者 mqAll / cidAll 个队列
        int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());

        int startIndex = (mod > 0 && index < mod) ? index * averageSize
            : index * averageSize + mod; // 有余数的情况下,[0, mod) 平分余数,即每consumer多分配一个节点;第index开始,跳过前mod余数。
        int range = Math.min(averageSize, mqAll.size() - startIndex); // 分配队列数量。之所以要Math.min()的原因是,mqAll.size() <= cidAll.size(),部分consumer分配不到消费队列。
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }

    @Override
    public String getName() {
        return "AVG";
    }
}
View Code

AllocateMessageQueueAveragelyByCircle 环形平均分配算法,这个好算,就是转圈分配。

记结论:设队列为【0,1,2,3】消费者【a,b,c】则【a:0,3】【b:1】【c:2】。设队列为【0,1,2】消费者【a,b,c,d】则【a:0】【b:1】【c:2】【d:】。。设队列为【0,1,2】消费者为【a,b,c】则【a:0】【b:0】【c:0】

/**
 * Cycle average Hashing queue algorithm
 * 队列分配策略 - 环状分配
 */
public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {
    private final Logger log = ClientLogger.getLog();

    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        // 校验参数是否正确
        if (currentCID == null || currentCID.length() < 1) {
            throw new IllegalArgumentException("currentCID is empty");
        }
        if (mqAll == null || mqAll.isEmpty()) {
            throw new IllegalArgumentException("mqAll is null or mqAll empty");
        }
        if (cidAll == null || cidAll.isEmpty()) {
            throw new IllegalArgumentException("cidAll is null or cidAll empty");
        }

        List<MessageQueue> result = new ArrayList<MessageQueue>();
        if (!cidAll.contains(currentCID)) {
            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                consumerGroup,
                currentCID,
                cidAll);
            return result;
        }

        // 环状分配
        int index = cidAll.indexOf(currentCID);
        for (int i = index; i < mqAll.size(); i++) {
            if (i % cidAll.size() == index) {
                result.add(mqAll.get(i));
            }
        }
        return result;
    }

    @Override
    public String getName() {
        return "AVG_BY_CIRCLE";
    }
}
View Code

自定义消费策略

以下代码片段定义了两个消费者,第一个固定的消费第一个队列的消息,第二个固定消费第二个队列的消息。这里建议大家,不要自定义消费策略,同一个消费组

也不要设置不同的消费策略,否则可能会出现有的队列没有没有消费,有的队列被多个消费者监听消费。

package com.dfsn.cloud.consumer.t24;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.UUID;

public class m1 {
    public static void main(String[] args) throws Exception {

        DefaultMQProducer producer = new DefaultMQProducer("pg24");

        producer.setNamesrvAddr("10.0.98.76:9876");

        producer.setDefaultTopicQueueNums(2);

        producer.start();

        for (int i = 0; i < 10; i++) {

            Message message = new Message("t24", "tag", UUID.randomUUID().toString(), ("消息" + i).getBytes());

            SendResult send = producer.send(message);

            System.out.println(send);

        }

        producer.shutdown();
    }
}
View Code
package com.dfsn.cloud.consumer.t24;

import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.ArrayList;
import java.util.List;

public class m3 {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg24");
        consumer.setNamesrvAddr("10.0.98.76:9876");
        consumer.subscribe("t24", "tag");
        consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueStrategy(){

            @Override
            public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
                ArrayList arrayList = new ArrayList();
                arrayList.add(mqAll.get(0));
                return arrayList;
            }

            @Override
            public String getName() {
                return "a";
            }
        });

        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                MessageExt messageExt = msgs.get(0);

                String msg = new String(messageExt.getBody());

                System.out.println("消息:" + msg + "--队列ID:" + messageExt.getQueueId());

                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
    }


}
View Code
package com.dfsn.cloud.consumer.t24;

import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.ArrayList;
import java.util.List;

public class m2 {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg24");
        consumer.setNamesrvAddr("10.0.98.76:9876");
        consumer.subscribe("t24", "tag");
        consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueStrategy(){

            @Override
            public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
                ArrayList arrayList = new ArrayList();
                arrayList.add(mqAll.get(1));
                return arrayList;
            }

            @Override
            public String getName() {
                return "a";
            }
        });
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                MessageExt messageExt = msgs.get(0);

                String msg = new String(messageExt.getBody());

                System.out.println("消息:" + msg + "--队列ID:" + messageExt.getQueueId());

                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
    }


}
View Code

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

广播消费

默认的RocketMQ使用集群消费,也就是同一个组的消费者,不会重复消费消息。但RocketMQ也支持广播消费。(无效,不知道为啥)

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

package com.dfsn.cloud.consumer.t29;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.UUID;

public class m1 {
    public static void main(String[] args) throws Exception {

        DefaultMQProducer producer = new DefaultMQProducer("pg29");

        producer.setNamesrvAddr("10.0.98.76:9876");

        producer.start();

        for (int i = 0; i < 10; i++) {

            Message message = new Message("t29", "tag", UUID.randomUUID().toString(), ("消息" + i).getBytes());

            SendResult send = producer.send(message);

            System.out.println(send);

        }

        producer.shutdown();
    }
}
View Code
package com.dfsn.cloud.consumer.t29;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;

public class m2 {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg29");
        consumer.setNamesrvAddr("10.0.98.76:9876");
        consumer.subscribe("t29", "tag");
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                MessageExt messageExt = msgs.get(0);

                String msg = new String(messageExt.getBody());

                System.out.println("消息:" + msg + "--队列ID:" + messageExt.getQueueId());

                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
    }


}
View Code
package com.dfsn.cloud.consumer.t29;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;

public class m3 {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg29");
        consumer.setNamesrvAddr("10.0.98.76:9876");
        consumer.subscribe("t29", "tag");
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                MessageExt messageExt = msgs.get(0);

                String msg = new String(messageExt.getBody());

                System.out.println("消息:" + msg + "--队列ID:" + messageExt.getQueueId());

                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
    }


}
View Code

两个消费者组,模拟广播消费

两个消费者组同时订阅同一个Topic,这种方式下,每一个消费者组同样可以有多个消费者负载队列。下面代码示例

AB两个组分别有一个消费者,则每个消费者分配四个队列。从这个结果也可以得出,同一个队列里的消息,不会因为

在一个组被消费就没有了,消息的消费机制完全存在于消费者,队列只负责存储,至于消息是否消费成功,和下次消费

的位点偏移量于队列本身无关。

package com.dfsn.cloud.consumer.t31;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.UUID;

public class m1 {
    public static void main(String[] args) throws Exception {

        DefaultMQProducer producer = new DefaultMQProducer("pg31");

        producer.setNamesrvAddr("10.0.98.76:9876");

        producer.start();

        for (int i = 0; i < 10; i++) {

            Message message = new Message("t31", "tag", UUID.randomUUID().toString(), ("消息" + i).getBytes());

            SendResult send = producer.send(message);

            System.out.println(send);

        }

        producer.shutdown();
    }
}
View Code
package com.dfsn.cloud.consumer.t31;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;

public class m2 {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg31-a");
        consumer.setNamesrvAddr("10.0.98.76:9876");
        consumer.subscribe("t31", "tag");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                MessageExt messageExt = msgs.get(0);

                String msg = new String(messageExt.getBody());

                System.out.println("消息:" + msg + "--队列ID:" + messageExt.getQueueId());

                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
    }


}
View Code
package com.dfsn.cloud.consumer.t31;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;

public class m3 {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg31-b");
        consumer.setNamesrvAddr("10.0.98.76:9876");
        consumer.subscribe("t31", "tag");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                MessageExt messageExt = msgs.get(0);

                String msg = new String(messageExt.getBody());

                System.out.println("消息:" + msg + "--队列ID:" + messageExt.getQueueId());

                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
    }


}
View Code

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

同一个消费者组监听不同的Topic会发生什么?

以下代码片段有两个主题1topic,2topic。一个消费者组gc1,但是改组有两个消费者,分别订阅1topic,2topic。最终执行结果发现。1topic和2topic的消息并没有完全被消费。

出现这样的原因不奇怪。gc1组内有两个消费者,其中一个订阅了1topic,发现自己所在组内还有其他消费者,所以根据平均分配算法,它只分到了0,1两个队列,剩下的分给了

另一个消费者,但另一个消费者并没有订阅1topic所以就出现了部分消息没有被消费。

package com.dfsn.cloud.consumer.f1;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.UUID;

public class p1 {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("gp1");

        producer.setNamesrvAddr("10.0.98.76:9876");

        producer.start();

        for (int i = 0; i < 10; i++) {
            Message message = new Message("1topic", "tag", UUID.randomUUID().toString(), ("1topic" + i).getBytes());
            SendResult send = producer.send(message);
            System.out.println(send);
        }

        producer.shutdown();
    }

}
View Code
package com.dfsn.cloud.consumer.f1;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.UUID;

public class p2 {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("gp2");

        producer.setNamesrvAddr("10.0.98.76:9876");

        producer.start();

        for (int i = 0; i < 10; i++) {
            Message message = new Message("2topic", "tag", UUID.randomUUID().toString(), ("2topic" + i).getBytes());
            SendResult send = producer.send(message);
            System.out.println(send);
        }

        producer.shutdown();
    }

}
View Code
package com.dfsn.cloud.consumer.f1;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class c1 {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("gc1");
        consumer.setNamesrvAddr("10.0.98.76:9876");
        consumer.subscribe("1topic", "tag");
        consumer.registerMessageListener(new MessageListenerOrderly() {

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt messageExt : msgs) {
                    System.out.println("消息:" + new String(messageExt.getBody()) + "--队列ID:" + messageExt.getQueueId());
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });


        consumer.start();
    }


}
View Code
package com.dfsn.cloud.consumer.f1;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class c2 {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("gc1");
        consumer.setNamesrvAddr("10.0.98.76:9876");
        consumer.subscribe("2topic", "tag");
        consumer.registerMessageListener(new MessageListenerOrderly() {

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt messageExt : msgs) {
                    System.out.println("消息:" + new String(messageExt.getBody()) + "--队列ID:" + messageExt.getQueueId());
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });


        consumer.start();
    }


}
View Code

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

 RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

集群搭建双master,没有salve

顾名思义,我需要搭建两个rocketmq,并且两个都是master,salve。这意味着两个master之间负载均衡,但是没有备份节点。

和单机搭建不同的就是需要在配置文件配置。你可以自己创建一个文件,或者直接修改 config/broker.conf 这里贴出来

A,B两个master的配置。

#集群IP
namesrvAddr=10.0.98.76:9876;10.0.98.76:9877
#集群名称
brokerClusterName=testbroker
#主机名称
brokerName=broker1
#0是master非0是slave
brokerId=0
#同步模式,指的是同步给salve ASYNC_FLUSH异步,SYNC_FLUSH同步
brokerRole=ASYNC_MASTER
#刷盘模式 ASYNC_FLUSH异步,SYNC_FLUSH同步
flushDiskType=ASYNC_FLUSH
#允许自动创建topic
autoCreateTopicEnable=true
#Broker 对外服务的监听端口,10911为默认值
listenPort=10911

#存储路径    
storePathRootDir=/tpsys/elk/rocketmq/rocketmq1/data
#commitLog存储路径 
storePathCommitLog=/tpsys/elk/rocketmq/rocketmq1/data/commitlog
#消费队列存储路径    
storePathConsumeQueue=/tpsys/elk/rocketmq/rocketmq1/data/consumequeue
#消息索引存储路径
storePathIndex=/tpsys/elk/rocketmq/rocketmq1/data/index
#checkpoint 文件存储路径
storeCheckPoint=/tpsys/elk/rocketmq/rocketmq1/data/checkpoint
#abort 文件存储路径
abortFile=/tpsys/elk/rocketmq/rocketmq1/data/abort

 
View Code
#集群IP
namesrvAddr=10.0.98.76:9876;10.0.98.76:9877
#集群名称
brokerClusterName=testbroker
#主机名称
brokerName=broker2
#0是master非0是slave
brokerId=0
#同步模式,指的是同步给salve ASYNC_FLUSH异步,SYNC_FLUSH同步
brokerRole=ASYNC_MASTER
#刷盘模式 ASYNC_FLUSH异步,SYNC_FLUSH同步
flushDiskType=ASYNC_FLUSH
#允许自动创建topic
autoCreateTopicEnable=true
#Broker 对外服务的监听端口,10911为默认值
listenPort=10811

#存储路径    
storePathRootDir=/tpsys/elk/rocketmq/rocketmq2/data
#commitLog存储路径 
storePathCommitLog=/tpsys/elk/rocketmq/rocketmq2/data/commitlog
#消费队列存储路径    
storePathConsumeQueue=/tpsys/elk/rocketmq/rocketmq2/data/consumequeue
#消息索引存储路径
storePathIndex=/tpsys/elk/rocketmq/rocketmq2/data/index
#checkpoint 文件存储路径
storeCheckPoint=/tpsys/elk/rocketmq/rocketmq2/data/checkpoint
#abort 文件存储路径
abortFile=/tpsys/elk/rocketmq/rocketmq2/data/abort

 
View Code

因为我是在同一个服务器搭建的集群,所以在启动 namesrv必须指定一个配置文件,配置文件中指定了namesrv的端口

listenPort=9877

启动 namesrv ./mqnamesrv -c config/namesrv.properties

启动 broker ./mqbroker -c config/broker.conf

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

以下是代码片段,可以看出一个topic其实是创建了8个队列,默认一个master创建4个。

package com.dfsn.cloud.consumer.f1;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;
import java.util.UUID;

public class p1 {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("pg1");

        producer.setNamesrvAddr("10.0.98.76:9876;10.0.98.76:9877");

        producer.start();

        for (int i = 0; i < 100; i++) {
            Message message = new Message("topic1", "tag", UUID.randomUUID().toString(), ("消息" + i).getBytes());
            SendResult send = producer.send(message);
            System.out.println(send);
        }

        producer.shutdown();
    }

}
View Code
package com.dfsn.cloud.consumer.f1;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class c1 {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg1");
        consumer.setNamesrvAddr("10.0.98.76:9876;10.0.98.76:9877");
        consumer.subscribe("topic1", "tag");
        consumer.registerMessageListener(new MessageListenerOrderly() {

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt messageExt : msgs) {
                    System.out.println("消息:" + new String(messageExt.getBody()) + "--队列ID:" + messageExt.getQueueId());
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });


        consumer.start();
    }


}
View Code

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

集群搭建双master,双salve

以上集群只有两个master,虽然可以做到负载均衡,但如果其中一个master挂掉了,那存在于该master里的队列也就不可用了

会造成消息丢失。所以保险的方案是给每个master搭配一个salve。salve的配置只有少许不同,下边直接贴出来。

#集群IP
namesrvAddr=10.0.98.76:9876;10.0.98.76:9877
#集群名称
brokerClusterName=testbroker
#主机名称
brokerName=broker1
#0是master非0是slave
brokerId=1
#同步模式,指的是同步给salve ASYNC_FLUSH异步,SYNC_FLUSH同步
brokerRole=SLAVE
#刷盘模式 ASYNC_FLUSH异步,SYNC_FLUSH同步
flushDiskType=ASYNC_FLUSH
#允许自动创建topic
autoCreateTopicEnable=true
#Broker 对外服务的监听端口,10911为默认值
listenPort=10711

#存储路径    
storePathRootDir=/tpsys/elk/rocketmq/rocketmq3/data
#commitLog存储路径 
storePathCommitLog=/tpsys/elk/rocketmq/rocketmq3/data/commitlog
#消费队列存储路径    
storePathConsumeQueue=/tpsys/elk/rocketmq/rocketmq3/data/consumequeue
#消息索引存储路径
storePathIndex=/tpsys/elk/rocketmq/rocketmq3/data/index
#checkpoint 文件存储路径
storeCheckPoint=/tpsys/elk/rocketmq/rocketmq3/data/checkpoint
#abort 文件存储路径
abortFile=/tpsys/elk/rocketmq/rocketmq3/data/abort

 
View Code
#集群IP
namesrvAddr=10.0.98.76:9876;10.0.98.76:9877
#集群名称
brokerClusterName=testbroker
#主机名称
brokerName=broker2
#0是master非0是slave
brokerId=1
#同步模式,指的是同步给salve ASYNC_FLUSH异步,SYNC_FLUSH同步
brokerRole=SLAVE
#刷盘模式 ASYNC_FLUSH异步,SYNC_FLUSH同步
flushDiskType=ASYNC_FLUSH
#允许自动创建topic
autoCreateTopicEnable=true
#Broker 对外服务的监听端口,10911为默认值
listenPort=10611

#存储路径    
storePathRootDir=/tpsys/elk/rocketmq/rocketmq4/data
#commitLog存储路径 
storePathCommitLog=/tpsys/elk/rocketmq/rocketmq4/data/commitlog
#消费队列存储路径    
storePathConsumeQueue=/tpsys/elk/rocketmq/rocketmq4/data/consumequeue
#消息索引存储路径
storePathIndex=/tpsys/elk/rocketmq/rocketmq4/data/index
#checkpoint 文件存储路径
storeCheckPoint=/tpsys/elk/rocketmq/rocketmq4/data/checkpoint
#abort 文件存储路径
abortFile=/tpsys/elk/rocketmq/rocketmq4/data/abort

 
View Code

这里需要注意brokerName,你这个salve是哪个master的备机就要和master的名称相同。brokerId就不用指定0了,brokerRole因为是slave不需要同步数据。

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

salve节点只能读不能写

当有新的消息产生后,master节点会把消息同步到salve节点备份,但是当master节点宕机后,salve不会顶替成为master,它只能用来消费已有的信息

不能接收新产生的信息,也就是只读。

生产100条消息两个broker都有,此时关闭其中一个broker master 只保留salve

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

消费情况

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

重启master,消费记录已经同步

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

再次测试关闭master2,生产消息,可以看出消息全都投递到了broker1中,但是不影响消费。

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

 RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

实现顺序消息

思考一个场景,在生产环境中。我们有多个生产者,多个消费者构建的集群环境。当前有个业务需求

在一个商城系统中。有创建订单,支付订单,支付完毕,商品出库四个步骤。每个步骤完成后都会发送

消息到队列,以处理对应的记录操作。

package com.dfsn.cloud.consumer.f6;

import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.UUID;

public class p1 {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("pg7");

        producer.setNamesrvAddr("10.0.98.76:9876;10.0.98.76:9877");

        producer.start();


        for (int i = 0; i < 3; i++) {
            Order order = new Order();
            order.setOrderId(i + 1);
            order.setGoodsName("商品" + i);
            order.setLink("创建订单");

            String json = JSONObject.toJSONString(order);

            Message message = new Message("topic7", "tag", UUID.randomUUID().toString(), json.getBytes());
            SendResult send = producer.send(message);
            System.out.println(send);
        }

        for (int i = 0; i < 3; i++) {
            Order order = new Order();
            order.setOrderId(i + 1);
            order.setGoodsName("商品" + i);
            order.setLink("支付订单");

            String json = JSONObject.toJSONString(order);

            Message message = new Message("topic7", "tag", UUID.randomUUID().toString(), json.getBytes());
            SendResult send = producer.send(message);
            System.out.println(send);
        }

        for (int i = 0; i < 3; i++) {
            Order order = new Order();
            order.setOrderId(i + 1);
            order.setGoodsName("商品" + i);
            order.setLink("支付完毕");

            String json = JSONObject.toJSONString(order);

            Message message = new Message("topic7", "tag", UUID.randomUUID().toString(), json.getBytes());
            SendResult send = producer.send(message);
            System.out.println(send);
        }

        for (int i = 0; i < 3; i++) {
            Order order = new Order();
            order.setOrderId(i + 1);
            order.setGoodsName("商品" + i);
            order.setLink("商品出库");

            String json = JSONObject.toJSONString(order);

            Message message = new Message("topic7", "tag", UUID.randomUUID().toString(), json.getBytes());
            SendResult send = producer.send(message);
            System.out.println(send);
        }

        producer.shutdown();
    }

}
View Code
package com.dfsn.cloud.consumer.f6;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;

public class c1 {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg7");
        consumer.setNamesrvAddr("10.0.98.76:9876;10.0.98.76:9877");
        consumer.subscribe("topic7", "tag");
        consumer.registerMessageListener(new MessageListenerOrderly() {

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt messageExt : msgs) {
                    LocalDateTime localDateTime = LocalDateTime.now();
                    DateTimeFormatter dtf2 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:SSS");

                    String strDate2 = dtf2.format(localDateTime);

                    System.out.println(strDate2 + "消费者1---消息:" + new String(messageExt.getBody()) + "--队列ID:" + messageExt.getQueueId());
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });


        consumer.start();
    }


}
View Code

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

以上打印结果,只看商品1,执行顺序是:支付完毕,创建订单,支付订单,商品出口。显然是有问题的。

思考问题的出现。当前消费者只有一个,那分配给它的队列就是全部。而订单有四个步骤,每个步骤投递到的队列都不同。这样就

造成了消息消费顺序错乱。解决方案是:1 同一个订单投递到同一个队列。2 消费者必须使用 MessageListenerOrderly 因为它可以

保证同一个队列中的消费顺序是先进先出的。消息投递到一个队列可以使用订单的id%队列数量,同一个id的%出来的结果肯定是一样的。

package com.dfsn.cloud.consumer.f6;

import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;
import java.util.UUID;

public class p1 {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("pg7");

        producer.setNamesrvAddr("10.0.98.76:9876;10.0.98.76:9877");

        producer.start();


        MessageQueueSelector messageQueueSelector = new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                System.out.println(mqs.size());
                String json = new String(msg.getBody());
                Order order = JSONObject.parseObject(json, Order.class);
                return mqs.get(order.getOrderId()%mqs.size());
            }
        };

        for (int i = 0; i < 3; i++) {
            Order order = new Order();
            order.setOrderId(i + 1);
            order.setGoodsName("商品" + i);
            order.setLink("创建订单");

            String json = JSONObject.toJSONString(order);

            Message message = new Message("topic7", "tag", UUID.randomUUID().toString(), json.getBytes());
            SendResult send = producer.send(message,messageQueueSelector ,null);
            System.out.println(send);
        }

        for (int i = 0; i < 3; i++) {
            Order order = new Order();
            order.setOrderId(i + 1);
            order.setGoodsName("商品" + i);
            order.setLink("支付订单");

            String json = JSONObject.toJSONString(order);

            Message message = new Message("topic7", "tag", UUID.randomUUID().toString(), json.getBytes());
            SendResult send = producer.send(message,messageQueueSelector ,null);
            System.out.println(send);
        }

        for (int i = 0; i < 3; i++) {
            Order order = new Order();
            order.setOrderId(i + 1);
            order.setGoodsName("商品" + i);
            order.setLink("支付完毕");

            String json = JSONObject.toJSONString(order);

            Message message = new Message("topic7", "tag", UUID.randomUUID().toString(), json.getBytes());
            SendResult send = producer.send(message,messageQueueSelector ,null);
            System.out.println(send);
        }

        for (int i = 0; i < 3; i++) {
            Order order = new Order();
            order.setOrderId(i + 1);
            order.setGoodsName("商品" + i);
            order.setLink("商品出库");

            String json = JSONObject.toJSONString(order);

            Message message = new Message("topic7", "tag", UUID.randomUUID().toString(), json.getBytes());
            SendResult send = producer.send(message,messageQueueSelector ,null);
            System.out.println(send);
        }

        producer.shutdown();
    }

}
View Code
package com.dfsn.cloud.consumer.f6;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;

public class c1 {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg7");
        consumer.setNamesrvAddr("10.0.98.76:9876;10.0.98.76:9877");
        consumer.subscribe("topic7", "tag");
        consumer.registerMessageListener(new MessageListenerOrderly() {

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt messageExt : msgs) {
                    LocalDateTime localDateTime = LocalDateTime.now();
                    DateTimeFormatter dtf2 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:SSS");

                    String strDate2 = dtf2.format(localDateTime);

                    System.out.println(strDate2 + "消费者1---消息:" + new String(messageExt.getBody()) + "--队列ID:" + messageExt.getQueueId());
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });


        consumer.start();
    }


}
View Code

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

消费者如果有异常,不会抛出!!!

如果消费者是 MessageListenerOrderly 内部抛出异常,该消息会一直尝试重试。

如果消息者是 MessageListenerConcurrently 内部抛出异常,该消息算被消费,而且死信队列中不会有该消息。

package com.dfsn.cloud.consumer.f1;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;

public class c1 {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg8");
        consumer.setNamesrvAddr("10.0.98.76:9876;10.0.98.76:9877");
        consumer.subscribe("topic8", "tag");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                MessageExt messageExt = msgs.get(0);

                System.out.println("--消息:" + new String(messageExt.getBody()) + "--队列ID:" + messageExt.getQueueId());

                int i = 1 / 0;

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }

        });


        consumer.start();
    }


}
View Code

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

package com.dfsn.cloud.consumer.f1;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class c2 {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg8");
        consumer.setNamesrvAddr("10.0.98.76:9876;10.0.98.76:9877");
        consumer.subscribe("topic8", "tag");
        consumer.registerMessageListener(new MessageListenerOrderly() {

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {

                MessageExt messageExt = msgs.get(0);

                System.out.println("--消息:" + new String(messageExt.getBody()) + "--队列ID:" + messageExt.getQueueId());

                int i = 1 / 0;


                return ConsumeOrderlyStatus.SUCCESS;
            }


        });


        consumer.start();
    }


}
View Code

RocketMQ从入门到放弃
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读,不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失
单机搭建
下载控制台
生产者
指定任意队列个数
消费者
MessageListenerConcurrently和MessageListenerOrderly的区别
消息发送到哪个队列?
RocketMq提供的队列选择器。
三种发送模式
发送超时设置
消费者设置拉取消息数量
MessageListenerOrderly的返回值
MessageListenerConcurrently的返回值
集群消费
集群消费策略
自定义消费策略
广播消费
两个消费者组,模拟广播消费
同一个消费者组监听不同的Topic会发生什么?
集群搭建双master,没有salve
集群搭建双master,双salve
salve节点只能读不能写
实现顺序消息
消费者如果有异常,不会抛出!!!
延迟消息
消息去重
保证消息的绝对不丢失

延迟消息

在某些业务场景下,消息发送后,需要延迟一定时间消费。例如订单已经生成,库存也减了

但如果用户迟迟不肯结算,就要算订单过期,库存也要加回来。要实现这种效果很简单,生产者

代码加配置就好。默认的延迟级别分别是 :1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 

如果需要自定义,可以在broker配置文件中新增属性:messageDelayLevel

package com.datang.study.elk.mq;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Provider {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("pg1");
        producer.setNamesrvAddr("192.168.31.77:9876;192.168.31.77:9877");

        producer.start();

        Message message = new Message("topic1", "tag", "我是延迟消息".getBytes());

        message.setDelayTimeLevel(2);

        SendResult send = producer.send(message);
        System.out.println(send);


        producer.shutdown();
    }
}
View Code

消息去重

思考一下,什么情况下。消息会重复?还记得消息生产者有重发机制吗?如果消息发送

模式是同步,默认失败重发三次。如果因为网络抖动,消息已经发送出去了,但是因为

延迟被判定为发送失败,再次重发,此时就会出现消息重复。消息重复分为单机消息重复

和集群消息重复。

如果消费者只有一个,最简单的办法,在消息生成时指定唯一的key或者根据消息内容的唯一

性。直接到mysql中查,如果查不到则表明是不重复的,并把该消息存起来,如果下次发现有

则表示重复,就要舍掉这条消息,不做处理。当然,放到redis或者其他db也是可以的。

如果消费者有两个或者两个以上,这个方法就不好使了。例如现在有4个队列,1,2被A消费这

订阅,3,4被B消费者订阅。两条重复的消息刚好在1,3中,则A,B两个队列并发去mysql查,肯定

都是查不到的,那么该消息重复吗?这是有问题的,所以这里要保证同时只有一个队列能查,需要

用到分布式锁。简单说就是A去查询时,B不能查,实现分布式锁的方法有很多,这里给一个之前的

博客有兴趣的可以看一下 https://www.cnblogs.com/zumengjie/p/12187669.html

保证消息的绝对不丢失

什么情况下,消息会丢失?消息发出去,并且得到了发送成功,但是落盘时broker挂了(硬盘烧了),也没有

同步到salve。如何应对呢?在broker的配置中有一下两个,刷盘指定是broker接收到了消息,是否是落盘后

在反馈成功?如果要保证master宕机,重启后消息不丢失,就要选择同步刷盘。只有刷盘后,才会返回成功。

但是此时依然有风险,如果刷盘了,但是没有同步salve,硬盘烧了,那也凉凉。所以可以再指定同步更新

salve。这样就万无一失了,除非salve硬盘也烧了。一般来说,保证消息不丢失,设置成同步刷盘,异步更新

#刷盘模式 ASYNC_FLUSH异步,SYNC_FLUSH同步
flushDiskType=ASYNC_FLUSH

#同步模式,指的是同步给salve ASYNC_FLUSH异步,SYNC_FLUSH同步
brokerRole=ASYNC_MASTER