RocketMQ入门、安装、详解、*配置

version 2.0 【更新于 2020.03.20】

  • 本次更新重点,主从同步!!!(很多小伙伴出现主从不同步的问题)
  • 主从同步关注点(详见配置说明):
    • 1. brokerRole
    • 2. brokerIP2
    • 3. 重启程序顺序
  • 官方压缩包由tar.gz改为zip
  • bin目录下多了一个文件夹,从而grep命令参数多了--exclude-dir
  • 注意 nameserver 和 master 以及 slave 的区别
  • 注意重启方式,不要使用kill -9
  • nameserver集群、broker(单master/单slave;多master/多slave;单master/多slave)

下载&安装

下载地址http://rocketmq.apache.org/docs/quick-start/

解压 tar -zxvf rocketmq-all-4.4.0-bin-release.tar.gz

解压 unzip rocketmq-all-4.7.0-bin-release.zip【version 2.0】 

进入HOME目录 cd rocketmq-all-4.4.0-bin-release

开启端口 9876/9876 10909/10912 验证 netstat -nltp

修改配置

启动脚本内存配置

cd bin

grep "Xmx" *

grep 'Xmx' * --exclude-dir="*" 【version 2.0】

grep 'MaxDirectMemorySize' * --exclude-dir="*" 【version 2.0】

vim runbroker.sh runserver.sh vim tools.sh

 JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g"

改为 JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=1g"

Broker配置 【master】

vim conf/broker.conf

brokerClusterName = DefaultCluster
brokerName = broker-a
# 0 表示master,大于0 表示slave
brokerId = 0
# 服务节点/注册中心
namesrvAddr=注册中心IP:9876
# 多服务节点/注册中心,使用分号【;】
#namesrvAddr=注册中心IP:9876;注册中心IP:9876;注册中心IP:9876
# Broker服务地址
brokerIP1=当前机器IP
# BrokerHAIP地址,供slave同步消息的地址
brokerIP2=当前机器IP
# 删除时间【时】此处表示凌晨4点
deleteWhen = 04
# 数据存储时间【时】 此处表示48小时
fileReservedTime = 48
# SYNC_MASTER(同步双写) 、ASYNC_MASTER(异步复制) 、SLAVE
brokerRole = SYNC_MASTER
# SYNC_FLUSH(同步刷盘) 和ASYNC_FLUSH(异步刷盘),写磁盘
flushDiskType = SYNC_FLUSH
# 是否自动创建topic 线上改为false 测试true
autoCreateTopicEnable=true

Broker配置 【slave】

vim conf/broker.conf

brokerClusterName = DefaultCluster
brokerName = broker-a
# 0 表示master,大于0 表示slave
brokerId = 1
# 服务节点/注册中心
namesrvAddr=注册中心IP:9876
# 多服务节点/注册中心,使用分号【;】
#namesrvAddr=注册中心IP:9876;注册中心IP:9876;注册中心IP:9876
# Broker服务地址
brokerIP1=当前机器IP
# BrokerHAIP地址,供slave同步消息的地址
brokerIP2=当前机器IP
# 删除时间【时】此处表示凌晨4点
deleteWhen = 04
# 数据存储时间【时】 此处表示48小时
fileReservedTime = 48
# SYNC_MASTER(同步双写) 、ASYNC_MASTER(异步复制) 、SLAVE
brokerRole = SLAVE
# SYNC_FLUSH(同步刷盘) 和ASYNC_FLUSH(异步刷盘),写磁盘
flushDiskType = SYNC_FLUSH
# 是否自动创建topic 线上改为false 测试true
autoCreateTopicEnable=true

启动

  1. 【NamesrvStartup】启动namesrv nohup bin/mqnamesrv -n 注册中心IP:9876 > mqnamesrv.log 2>&1 &

  2. 检查端口监听是否为0.0.0.0:9876/注册中心IP:9876 命令 netstat -anpt | grep 9876

  3. 【BrokerStartup】启动master节点 nohup sh bin/mqbroker -n 注册中心IP:9876 -c conf/broker.conf > broker.log 2>&1 &

  4. 【BrokerStartup】启动slave节点 nohup sh bin/mqbroker -n 注册中心IP:9876 -c conf/broker.conf > broker.log 2>&1 &

  5. 查看是否注册成功(集群信息) bin/mqadmin clusterList -n 注册中心IP:9876

不要使用kill -9!!!

  1. 停止 slave bin/mqshutdown broker

  2. 停止 master bin/mqshutdown broker

  3. 停止 namesrv bin/mqshutdown namesrv

常用命令

  • cd rocketmq-all-4.4.0-bin-release/
  • 注册中心机器上

---集群相关

查询集群信息 bin/mqadmin clusterList -n localhost:9876

打印Broker配置 bin/mqbroker -m -n localhost:9876

更新Broker配置 bin/mqadmin updateBrokerConfig -c DefaultCluster -k autoCreateTopicEnable -v false -n localhost:9876

查看Broker统计信息 bin/mqadmin brokerstatus –n localhost:9876 –b locahost:10909

---订阅组相关

创建订阅组 bin/mqadmin updateSubGroup -n localhost:9876 -c ClusterName -g GroupName

列出消费组 bin/mqadmin consumerProgress -n localhost:9876

查看消费组IP bin/mqadmin consumerStatus -g GroupName -n localhost:9876

查看消费组数据堆积 bin/mqadmin consumerProgress -n localhost:9876 -g GroupName

删除订阅组 bin/mqadmin deleteSubGroup -n localhost:9876 -c ClusterName -g GroupName

---Topic相关

创建Topic bin/mqadmin updateTopic -c ClusterName -n localhost:9876 -t TopicName

Topic列表 bin/mqadmin topicList -n localhost:9876

发送Topic消息测试 bin/mqadmin checkMsgSendRT -n localhost:9876 -t TopicName -s 1024

打印Topic消息 bin/mqadmin printMsg -n localhost:9876 -t TopicName

Topic详情统计 bin/mqadmin topicstatus -n localhost:9876 -t TopicName

获取Topic的cluster bin/mqadmin topicClusterList -n localhost:9876 -t TopicName

删除Topic bin/mqadmin deleteTopic -n localhost:9876 -t TopicName -c ClusterName

查看Topic路由 bin/mqadmin topicRoute -n localhost:9876 -t TopicName

查看Topic状态 bin/mqadmin topicStatus -n localhost:9876 -t TopicName

根据ID查询消息 bin/mqadmin queryMsgById -i msgId -n localhost:9876

根据偏移量查询消息 bin/mqadmin queryMsgByOffset -b BrokerName -i 3 -n localhost:9876 -o 299 -t TopicName

broker配置说明

基础配置

配置 描述 默认值 例子
namesrvAddr nameServer地址,如果nameserver是多台集群的话,用分号分割 namesrvAddr=10.1.219.75:9876;10.1.219.76:9876
brokerClusterName 所属集群名字。Cluster 的地址,如果集群机器数比较多,可以分成多个Cluster ,每个Cluster 供一个业务群使用 brokerClusterName=rocketmq-cluster
brokerName Broker 的名称, Master 和Slave 通过使用相同的Broker 名称来表明相互关系,以说明某个Slave 是哪个Master 的Slave   brokerName=broker-a
brokerId 一个Master Barker 可以有多个Slave, 0 表示Master ,大于0 表示不同的 Slave 的ID   brokerId=0
fileReservedTime 在磁盘上保存消息的时长,单位是小时,自动删除超时的消息   fileReservedTime=48
deleteWhen 与fileReservedTim巳参数呼应,表明在几点做消息删除动作,默认值04 表示凌晨4 点   deleteWhen=04
brokerRole brokerRole 有3 种: SYNCMASTER(同步双写) 、ASYNCMASTER(异步复制) 、SLAVE 。关键词SYNC 和ASYNC 表示Master 和Slave 之间同步消息的机制, SYNC 的意思是当Slave 和Master 消息同步完成后,再返回发送成功的状态   brokerRole=SYNC_MASTER
flushDiskType flushDiskType 表示刷盘策略,分为SYNCFLUSH 和ASYNCFLUSH两种,分别代表同步刷盘和异步刷盘。同步刷盘情况下,消息真正写人磁盘后再返回成功状态;异步刷盘情况下,消息写人page_cache 后就返回成功状态   flushDiskType=ASYNC_FLUSH
listenPort Broker 监听的端口号,如果一台机器上启动了多个Broker , 则要设置不同的端口号,避免冲突   listenPort=10911
storePathRootDir 存储消息以及一些配置信息的根目录   storePathRootDir=/app/custom/data/rocketmq/store-a

进阶配置

配置 描述 默认值 例子
autoCreateTopicEnable 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭   autoCreateTopicEnable=true
defaultTopicQueueNums 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数。   defaultTopicQueueNums=4
autoCreateSubscriptionGroup 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭   autoCreateTopicEnable=true
mapedFileSizeCommitLog commitLog每个文件的大小,默认1G 1G mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue ConsumeQueue每个文件默认存30W条,根据业务情况调整   mapedFileSizeConsumeQueue=300000

存储配置

配置 描述 默认值 例子
storePathRootDir 存储消息以及一些配置信息的根目录   storePathRootDir=/app/custom/data/rocketmq/store-a
storePathCommitLog commitLog 存储路径   storePathCommitLog=/data/rocketmq/store/commitlog
storePathConsumeQueue 消费队列存储路径存储路径   storePathConsumeQueue=/data/rocketmq/store/consumequeue
storePathIndex 消息索引存储路径   storePathIndex=/data/rocketmq/store/index
storeCheckpoint checkpoint 文件存储路径   storeCheckpoint=/data/rocketmq/store/checkpoint
abortFile abort 文件存储路径   abortFile=/data/rocketmq/store/abort

JAVA示例

  1. pom.xml

<dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>4.5.0</version>
            <exclusions>
                <exclusion>
                    <groupId>io.netty</groupId>
                    <artifactId>netty-tcnative</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.10</version>
        </dependency>
    </dependencies>

  2. Producer 生产者

public class Producer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new
                DefaultMQProducer("GroupName");
        // Specify name server addresses.
        producer.setNamesrvAddr("IP:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicName" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " +
                            i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}

  3. Consumer 消费者

public class Consumer {
    public static void main(String[] args) {
        String topicName = "TopicName";
        DefaultMQPushConsumer consumer =
                new DefaultMQPushConsumer("GroupName");
        consumer.setNamesrvAddr("IP:9876");
        try {
            consumer.subscribe(topicName, "*");
            /**
             * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
             * 如果非第一次启动,那么按照上次消费的位置继续消费
             */
            consumer.setConsumeFromWhere(
                    ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            /**
             * 如果是顺序消息,这边的监听就要使用MessageListenerOrderly监听
             * 并且,返回结果也要使用ConsumeOrderlyStatus
             */
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    //设置自动提交,如果不设置自动提交就算返回SUCCESS,消费者关闭重启 还是会重复消费的
                    context.setAutoCommit(true);
                    try {
                        for (MessageExt msg : msgs) {
                            String recString = null;
                            try {
                                recString = new String(msg.getBody(), "UTF-8");
                            } catch (UnsupportedEncodingException e) {
                                e.printStackTrace();
                            }
                            System.out.println(recString);
                    } catch (Exception e) {
                        e.printStackTrace();
                        //如果出现异常,消费失败,挂起消费队列一会会,稍后继续消费
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                    //消费成功
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

  4. 打包成jar的插件[可选]

<build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <!--jdk 版本-->
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!--全限定名-->
                                    <mainClass>com.package.Consumer</mainClass>
                                </transformer>
                            </transformers>
                            <artifactSet> </artifactSet>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
RocketMQ入门、安装、详解、*配置 个人微信,有什么建议、意见或补充,欢迎及时沟通!!!(添加时注明“博客园”,谢谢)