RocketMQ基础:MQ简介,环境搭建,RocketMQ消息 知识点梳理 课堂讲义 1 概述 2.MQ 的作用 3.MQ优缺点分析 4.常见产品 5. RocketMQ入门 6.消息发送

RocketMQ基础:MQ简介,环境搭建,RocketMQ消息
知识点梳理
课堂讲义
1 概述
2.MQ 的作用
3.MQ优缺点分析
4.常见产品
5. RocketMQ入门
6.消息发送

课堂讲义


1 概述

MQ(Message Queue)消息队列,是一种用来保存消息数据的队列

队列:数据结构的一种,特征为 “先进先出”

MQ的优势

RocketMQ基础:MQ简介,环境搭建,RocketMQ消息
知识点梳理
课堂讲义
1 概述
2.MQ 的作用
3.MQ优缺点分析
4.常见产品
5. RocketMQ入门
6.消息发送

RocketMQ基础:MQ简介,环境搭建,RocketMQ消息
知识点梳理
课堂讲义
1 概述
2.MQ 的作用
3.MQ优缺点分析
4.常见产品
5. RocketMQ入门
6.消息发送

2.MQ 的作用

  • 应用解耦(异步发送消息 )

    RocketMQ基础:MQ简介,环境搭建,RocketMQ消息
知识点梳理
课堂讲义
1 概述
2.MQ 的作用
3.MQ优缺点分析
4.常见产品
5. RocketMQ入门
6.消息发送

  • 快速应用变更维护

    RocketMQ基础:MQ简介,环境搭建,RocketMQ消息
知识点梳理
课堂讲义
1 概述
2.MQ 的作用
3.MQ优缺点分析
4.常见产品
5. RocketMQ入门
6.消息发送

  • 流量削锋,比如双十一秒杀商品,如果所有秒杀请求都发送到MySQL数据库,并发太大会导致宕机;因此选择先将秒杀请求做成消息存储到MQ,然后处理订单的服务器B再去消费消息进行订单处理

    RocketMQ基础:MQ简介,环境搭建,RocketMQ消息
知识点梳理
课堂讲义
1 概述
2.MQ 的作用
3.MQ优缺点分析
4.常见产品
5. RocketMQ入门
6.消息发送

3.MQ优缺点分析

优点(作用):

  • 应用解耦

  • 快速应用变更维护

  • 流量削锋

缺点:

  • 系统可用性降低:使用集群解决

  • 系统复杂度提高:程序员提升水平解决

  • 异步消息机制

    • 消息顺序性

    • 消息丢失

    • 消息一致性

    • 消息重复消费

4.常见产品

ActiveMQ:java语言实现,万级数据吞吐量,处理速度ms级,主从架构,成熟度高
RabbitMQ :erlang语言实现,万级数据吞吐量,处理速度us级,主从架构,
RocketMQ :java语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能强大,扩展性强
kafka :scala语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能较少,应用于大数据领域较多

RocketMQ是阿里开源的一款非常优秀中间件产品,脱胎于阿里的另一款队列技术MetaQ,后捐赠给Apache基金会作为一款孵化技术,仅仅经历了一年多的时间就成为Apache基金会的*项目。并且它现在已经在阿里内部被广泛的应用,并且经受住了多次双十一的这种极致场景的压力(2017年的双十一,RocketMQ流转的消息量达到了万亿级,峰值TPS达到5600万)

 

5. RocketMQ入门

5.1 基础概念

1 消息(Message)

消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。

2 主题(Topic)

消息主题,通过 Topic 对不同的业务消息进行分类。

3 标签(Tag)

消息标签,用来进一步区分某个 Topic 下的消息分类

RocketMQ基础:MQ简介,环境搭建,RocketMQ消息
知识点梳理
课堂讲义
1 概述
2.MQ 的作用
3.MQ优缺点分析
4.常见产品
5. RocketMQ入门
6.消息发送

Topic 与 Tag 都是业务上用来归类的标识,区分在于 Topic 是一级分类,而 Tag 可以理解为是二级分类。

比如淘宝订单消息和物流消息使用不同的 Topic 进行区分

而同样是订单消息:电器类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分

4 消息生产者(Producer)

负责生产消息,由业务系统负责生产。一个消息生产者会把业务应用系统里产生的消息发送到消息服务器

5 消息消费者(Consumer)

负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

6 代理服务器(Broker Server)

消息中转角色,负责存储消息、转发消息。负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

7 名字服务(Name Server)

名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。

8 拉取式消费(Pull Consumer)

Consumer消费的一种类型,应用主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。

9 推动式消费(Push Consumer)

Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。

RocketMQ基础:MQ简介,环境搭建,RocketMQ消息
知识点梳理
课堂讲义
1 概述
2.MQ 的作用
3.MQ优缺点分析
4.常见产品
5. RocketMQ入门
6.消息发送

5.2 JDK

1)解压 jdk
tar -zxvf jdk-8u171-linux-x64.tar.gz
2)配置环境变量
>vim /etc/profile
export JAVA_HOME=/opt/jdk1.8.0_171
export PATH=$PATH:${JAVA_HOME}/bin
3)重新加载配置
>source /etc/profile
>java -version

错误解决

如果安装完毕 jdk 后  java -version 看到的是 openjdk(需要删除)
因为 操作系统默认已经安装了 opendjdk,
# 查看
rpm -qa | grep java
# 删除(把上一个命令看到的所有的jdk文件 用 如下命令删除)
rpm -e --nodeps java-1.8.0-openjdk-1.8.0.232.b09-0.el7_7.x86_64
rpm -e --nodeps java-1.8.0-openjdk-headless-1.8.0.232.b09-0.el7_7.x86_64
rpm -e --nodeps java-1.7.0-openjdk-headless-1.7.0.241-2.6.20.0.el7_7.x86_64
rmp -e --nodeps java-1.7.0-openjdk-1.7.0.241-2.6.20.0.el7_7.x86_64
rpm -e --nodeps java-1.7.0-openjdk-1.7.0.241-2.6.20.0.el7_7.x86_64

RocketMQ基础:MQ简介,环境搭建,RocketMQ消息
知识点梳理
课堂讲义
1 概述
2.MQ 的作用
3.MQ优缺点分析
4.常见产品
5. RocketMQ入门
6.消息发送

5.3 RocketMQ

# 解压
unzip rocketmq-all-4.5.2-bin-release.zip
# 修改目录名称
mv rocketmq-all-4.5.2-bin-release rocketmq
cd bin
# 调整启动内存为256m或128m
runserver.sh
runbroker.sh
tools.sh

RocketMQ基础:MQ简介,环境搭建,RocketMQ消息
知识点梳理
课堂讲义
1 概述
2.MQ 的作用
3.MQ优缺点分析
4.常见产品
5. RocketMQ入门
6.消息发送

conf/broker.conf

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUS
# 解决和docker 冲突的
brokerIP1=192.168.52.128
namesrvAddr=192.168.52.128:9876

5.3.1 启动

cd /opt/rocketmq/bin
​
#启动nameserv
sh mqnamesrv
#nameserv默认端口:9876
​
#在虚拟机中后台启动
nohup sh mqnamesrv &
​
# 启动mq  服务  -n 指定 nameserv 的地址(bin)
sh mqbroker -n localhost:9876  -c ../conf/broker.conf
#broker默认端口:10911
​
#在虚拟机中后台启动
nohup sh mqbroker -n localhost:9876  -c ../conf/broker.conf &
#关闭
systemctl stop firewalld.service 
#禁用开机启动
systemctl disable firewalld.service 

5.3.2 测试

测试RocketMQ是否启动成功:

export NAMESRV_ADDR=localhost:9876
​
#切换到bin 目录
cd /opt/rocketmq/bin
​
#执行测试
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer

6.消息发送

分解消费发送详细步骤:

1.谁来发?

2.发给谁?

3.怎么发?

4.发什么?

5.发的结果是什么?

6.打扫战场

6.1 消息发送

单生产者单消费者(OneToOne)

  1. 环境搭建

    <dependency>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-client</artifactId>
         <version>4.5.2</version>
    </dependency>
  2. 发送消息,消息格式如下所示:

RocketMQ基础:MQ简介,环境搭建,RocketMQ消息
知识点梳理
课堂讲义
1 概述
2.MQ 的作用
3.MQ优缺点分析
4.常见产品
5. RocketMQ入门
6.消息发送

public static void main(String[] args) throws Exception {
    //1.创建一个发送消息的对象Producer
    DefaultMQProducer producer = new DefaultMQProducer("group1");
    //2.设定发送的命名服务器地址
    producer.setNamesrvAddr("192.168.52.128:9876");
    //3.1启动发送的服务
    producer.start();
    //4.创建要发送的消息对象,指定topic,指定内容body
    Message msg = new Message("topic1","hello rocketmq".getBytes("UTF-8"));
    //4.2发送消息
    SendResult result = producer.send(msg);
    System.out.println("返回结果:"+result);
    //5.关闭连接
    producer.shutdown();
}

注意:关闭服务器防火墙

#关闭
systemctl stop firewalld.service 
#禁用开机启动
systemctl disable firewalld.service 
//发送多个消息 
for (int i = 1; i <= 10; i++) {
    Message msg = new Message("topic1",("生产者2: hello rocketmq "+i).getBytes("UTF-8"));
    SendResult result = producer.send(msg);
    System.out.println("返回结果:"+result);
}

3.消费者

public static void main(String[] args) throws Exception {
        //1.创建一个接收消息的对象Consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.设定接收的命名服务器地址
        consumer.setNamesrvAddr("192.168.52.128:9876");
        //3.设置接收消息对应的topic,对应的sub标签为任意*
        consumer.subscribe("topic1","*");
        //3.开启监听,用于接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //遍历消息
                for(MessageExt msg : list){
                    System.out.println("消息:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //4.启动接收消息的服务
        consumer.start();// 开启多线程 监控消息,持续运行
        System.out.println("接收消息服务已开启运行");
    }

 

6.2 广播模式(了解)

广播模式的现象
1) 如果 生产者先发送消息, 后启动消费者, 消息只能被消费一次
2) 如果多个消费者先启动(广播模式),后发消息,才有广播的效果
结论:
必须先启动消费者再启动发送者才有广播的效果

发送者

同上

消费者

//1.创建一个接收消息的对象Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
System.out.println(consumer.getInstanceName());
//consumer.setInstanceName("instance01");
//2.设定接收的命名服务器地址
consumer.setNamesrvAddr("192.168.52.128:9876");
//3.设置接收消息对应的topic,对应的sub标签为任意*
consumer.subscribe("topic1","*");
​
//设置当前消费者的消费模式(默认模式:负载均衡)
// consumer.setMessageModel(MessageModel.CLUSTERING);
//设置当前消费者的消费模式为广播模式:所有客户端接收的消息都是一样的
consumer.setMessageModel(MessageModel.BROADCASTING);
​
//3.开启监听,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        //遍历消息
        for(MessageExt msg : list){
            //                  System.out.println("收到消息:"+msg);
            System.out.println("消费者1:"+new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
​
//4.启动接收消息的服务
consumer.start();
System.out.println("接收消息服务已开启运行");

同时启动多个消费者

RocketMQ基础:MQ简介,环境搭建,RocketMQ消息
知识点梳理
课堂讲义
1 概述
2.MQ 的作用
3.MQ优缺点分析
4.常见产品
5. RocketMQ入门
6.消息发送

6.3 三种消息类型

  • 同步消息,默认消息类型

    特征:即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功)

    RocketMQ基础:MQ简介,环境搭建,RocketMQ消息
知识点梳理
课堂讲义
1 概述
2.MQ 的作用
3.MQ优缺点分析
4.常见产品
5. RocketMQ入门
6.消息发送

    发送者

    //直接调用send(),默认即为同步消息
    SendResult result = producer.send(msg);

    消费者:接受topic2的消息

    consumer.subscribe("topic2","*");
  • 异步消息

    特征:即时性较弱,但需要有回执的消息,例如订单中的某些信息

    RocketMQ基础:MQ简介,环境搭建,RocketMQ消息
知识点梳理
课堂讲义
1 概述
2.MQ 的作用
3.MQ优缺点分析
4.常见产品
5. RocketMQ入门
6.消息发送

    //异步消息发送
    Message msg = new Message("topic2",
            ("异步消息:hello rocketmq " + i).getBytes("UTF-8"));
    producer.send(msg, new SendCallback() {
        //表示成功返回结果
        public void onSuccess(SendResult sendResult) {
            System.out.println("成功: " + sendResult);
        }
    ​
        //表示发送消息失败
        public void onException(Throwable t) {
            System.out.println("失败: "+ t.getMessage());
        }
    });
    ​
    //添加一个休眠操作,确保异步消息返回后能够输出
    TimeUnit.SECONDS.sleep(10);
    //producer.shutdown();
  • 单向消息 特征:不需要有回执的消息,例如日志类消息

     

    producer.sendOneway(msg);

完整代码如下:

public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("group1");
    producer.setNamesrvAddr("192.168.52.128:9876");
    producer.start();
    for (int i = 1; i <= 5; i++) {
        //同步消息发送
        Message msg = new Message("topic2",
                ("同步消息:hello rocketmq " + i).getBytes("UTF-8"));
        SendResult result = producer.send(msg);
        System.out.println("返回结果:" + result);
​
        //异步消息发送
        //Message msg = new Message("topic2",
        //        ("异步消息:hello rocketmq " + i).getBytes("UTF-8"));
        //producer.send(msg, new SendCallback() {
        //    //表示成功返回结果
        //    public void onSuccess(SendResult sendResult) {
        //        System.out.println(sendResult);
        //    }
        //
        //    //表示发送消息失败
        //    public void onException(Throwable t) {
        //        System.out.println(t);
        //    }
        //});
//单向消息
        //Message msg = new Message("topic2", ("单向消息:hello rocketmq " + i).getBytes(
        //        "UTF-8"));
        //producer.sendOneway(msg);
    }
    //添加一个休眠操作,确保异步消息返回后能够输出
    TimeUnit.SECONDS.sleep(10);
​
    producer.shutdown();
}

6.4 延时消息

延时消息是指生产者发送消息后,不能立刻被消费者消费,需要等待指定的时间后才可以被消费

应用场景 
在电商系统中,订单创建后,会有一个等待用户支付的时间窗口,一般为30分钟,30分钟后consumer收到这条订单消息,然后程序去订单表中检查当前这条订单的支付状态,如果是未支付的状态,则自动清理掉,这样就不需要使用定时任务的方式去处理了

延时消息格式:

Message msg = new Message("topic3",
        ("延时消息:hello rocketmq "+i).getBytes("UTF-8"));
//设置当前消息的延时效果
msg.setDelayTimeLevel(3);
SendResult result = producer.send(msg);
System.out.println("返回结果:"+result);

目前支持的延迟消息时间

  • 秒级:1,5,10,30

  • 分级:1~10,20,30

  • 时级:1,2

  • 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

    分别代表延迟level1-level18

消费者:

public static void main(String[] args) throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
    consumer.setNamesrvAddr("192.168.52.128:9876");
    consumer.subscribe("topic3", "*");
​
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
                                                        ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            for (MessageExt msg : list) {
                System.out.println("消息:" + new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.out.println("接收消息服务已开启运行");
}

6.5 批量消息

应用场景
12306的批量买票功能:要求同时买两张连坐票,需要同时发送两条购票信息

发送批量消息

//创建一个集合保存多个消息
List<Message> msgList = new ArrayList<Message>();
Message msg1 = new Message("topic5",("批量消息:hello rocketmq "+1).getBytes("UTF-8"));
Message msg2 = new Message("topic5",("批量消息:hello rocketmq "+2).getBytes("UTF-8"));
Message msg3 = new Message("topic5",("批量消息:hello rocketmq "+3).getBytes("UTF-8"));
msgList.add(msg1);
msgList.add(msg2);
msgList.add(msg3);

//发送批量消息(每次发送的消息总量不得超过4M)
//消息的总长度包含4个信息:topic,body,消息的属性,日志(20字节)
SendResult send = producer.send(msgList);

注意:

  • 消息内容总长度不超过4M

  • 消息内容总长度包含如下: topic(字符串字节数) body (字节数组长度) 消息追加的属性(key与value对应字符串字节数) 日志(固定20字节)

6.6 Tag过滤消息

应用场景
订单超时时间:虚拟商品30分钟,实物商品24小时;
处理超时订单需要两个消费者,一个只消费tag=虚拟的消息,另一个消费tag=实物的消息

生产者

public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.52.128:9876");
        producer.start();

        //创建消息的时候除了制定topic,还可以指定tag
        Message msg = new Message("topic6","tag2",("消息过滤按照tag:hello rocketmq 2").getBytes("UTF-8"));

        SendResult send = producer.send(msg);
        System.out.println(send);

        producer.shutdown();
    }

消费者

*代表任意tag

"tag1 || tag2" 代表两个 tag 那个都行

//接收消息的时候,除了制定topic,还可以指定接收的tag,*代表任意tag
consumer.subscribe("topic6","tag1 || tag2");
//consumer.subscribe("topic6","*");

6.7 属性过滤消息(了解)

也可以叫做属性过滤/语法过滤/SQL过滤

生产者

//为消息添加属性
msg.putUserProperty("vip","1");
msg.putUserProperty("age","20");

消费者

//使用消息选择器来过滤对应的属性,语法格式为类SQL语法
consumer.subscribe("topic7", MessageSelector.bySql("age >= 18"));
注意:SQL过滤需要依赖服务器的功能支持,在broker配置文件中添加对应的功能项,并开启对应功能

conf/broker.conf

enablePropertyFilter=true

启动服务器

sh mqbroker -n localhost:9876 -c ../conf/broker.conf

6.8 顺序消息

6.8.1 错乱的消息顺序

默认情况下,MQ 开启了多个队列, 同时发送多个消息的的话,发送给那个队列是不确定的,同时消息的消费者读取消息,每读取一个消息开启一个线程,也不能保证消息的顺序性

RocketMQ基础:MQ简介,环境搭建,RocketMQ消息
知识点梳理
课堂讲义
1 概述
2.MQ 的作用
3.MQ优缺点分析
4.常见产品
5. RocketMQ入门
6.消息发送

6.8.2 顺序消息实现

应用场景
秒杀系统,先发送下单消息的用户,应该优先下单成功;此时就必须保证先发过来的消息,先被消费插入到订单数据库

想要保证消息的有序性需要满足两点:

  • 相同ID的消息发送到同一个队列中

  • 同一个队列中的消息被同一个线程消费

实现方式如下:

  • 发送者

   public static void main(String[] args) throws Exception {
       DefaultMQProducer producer = new DefaultMQProducer("group1");
       producer.setNamesrvAddr("192.168.52.128:9876");
       producer.start();

       //创建要执行的业务队列
       List<Order> orderList = new ArrayList<Order>();

       Order order11 = new Order();
       order11.setId("a");
       order11.setMsg("主单-1");
       orderList.add(order11);

       Order order12 = new Order();
       order12.setId("a");
       order12.setMsg("子单-2");
       orderList.add(order12);

       Order order13 = new Order();
       order13.setId("a");
       order13.setMsg("支付-3");
       orderList.add(order13);

       Order order14 = new Order();
       order14.setId("a");
       order14.setMsg("推送-4");
       orderList.add(order14);

       Order order21 = new Order();
       order21.setId("b");
       order21.setMsg("主单-1");
       orderList.add(order21);

       Order order22 = new Order();
       order22.setId("b");
       order22.setMsg("子单-2");
       orderList.add(order22);

       Order order31 = new Order();
       order31.setId("c");
       order31.setMsg("主单-1");
       orderList.add(order31);

       Order order32 = new Order();
       order32.setId("c");
       order32.setMsg("子单-2");
       orderList.add(order32);

       Order order33 = new Order();
       order33.setId("c");
       order33.setMsg("支付-3");
       orderList.add(order33);

       //设置消息进入到指定的消息队列中
       for(final Order order : orderList){
           Message msg = new Message("orderTopic",order.toString().getBytes());
           //发送时要指定对应的消息队列选择器
           SendResult result = producer.send(