RocketMQ装配与demo
服务器设置
1. 安装64位JDK;
2. 设置Linux文件系统为Ext4
3.开启9876,10911防火墙端口
源码编码
1. 安装Maven 2. 下载RocketMQ源码,下载地址:http://github.com/alibaba/RocketMQ.git/trunk,进入到源码解压目录下运行install.bat或DOS命令行切换到解压目录运行: mvn -Dmaven.test.skip=true clean package install assembly:assembly -U,编译成功后,在target目录下会有alibaba-rocketmq-3.1.1.tar.gz,该压缩包就是安装包。
3. 安装
将alibaba-rocketmq-3.1.1.tar.gz上传到linux服务器,解压:tar -zxvf
alibaba-rocketmq-3.1.1.tar.gz设置执行权限chmod +x ./alibaba-rocketmq/bin/*
4. 运行
配置采用双Master,双Slave,异步复制的配置方式,共需要4台服务器做硬件支持。 a. 修改配置
(1)创建目录
mkdir /home/rocket/alibaba-rocketmq/logs #创建日志目录
mkdir -p /home/rocket/alibaba-rocketmq/data/store/commitlog #创建数据存储目录
更改日志目录
cd /home/rocket/alibaba-rocketmq/conf
(2)修改A主配置
vi ./conf/2m-2s-async/broker-a.properties:
1. # brokerClusterName=DefaultCluster
2. brokerName=broker-a
3. brokerId=0
4. namesrvAddr=192.168.1.119:9876;192.168.1.120:9876;192.168.1.121:9876;1
92.168.1.122:9876
5. defaultTopicQueueNums=4
6. autoCreateTopicEnable=true
7. autoCreateSubscriptionGroup=true
8. listenPort=10911
9. deleteWhen=04
10. fileReservedTime=120
11. mapedFileSizeCommitLog=1073741824
12. mapedFileSizeConsumeQueue=50000000
13. destroyMapedFileIntervalForcibly=120000
14. redeleteHangedFileInterval=120000
15. diskMaxUsedSpaceRatio=88
16. storePathRootDir=/usr/framework/rocketmq/datas
17. storePathCommitLog=/usr/framework/rocketmq/logs
18. maxMessageSize=65536
19. flushCommitLogLeastPages=4
20. flushConsumeQueueLeastPages=2
21. flushCommitLogThoroughInterval=10000
22. flushConsumeQueueThoroughInterval=60000
23. checkTransactionMessageEnable=false
24. sendMessageThreadPoolNums=128
25. pullMessageThreadPoolNums=128
26. brokerRole=SYNC_MASTER
27. flushDiskType=ASYNC_FLUSH
(3)修改A从配置
vi ./conf/2m-2s-async/broker-a-s.properties:
1. # brokerClusterName=DefaultCluster
2. brokerName=broker-a
3. brokerId=1
4. namesrvAddr=192.168.1.119:9876;192.168.1.120:9876;192.168.1.121:9876;1
92.168.1.122:9876
5. defaultTopicQueueNums=4
6. autoCreateTopicEnable=true
7. autoCreateSubscriptionGroup=true
8. listenPort=10911
9. deleteWhen=04
10. fileReservedTime=120
11. mapedFileSizeCommitLog=1073741824
12. mapedFileSizeConsumeQueue=50000000
13. destroyMapedFileIntervalForcibly=120000
14. redeleteHangedFileInterval=120000
15. diskMaxUsedSpaceRatio=88
16. storePathRootDir=/usr/framework/rocketmq/datas
17. storePathCommitLog=/usr/framework/rocketmq/logs
18. maxMessageSize=65536
19. flushCommitLogLeastPages=4
20. flushConsumeQueueLeastPages=2
21. flushCommitLogThoroughInterval=10000
22. flushConsumeQueueThoroughInterval=60000
23. checkTransactionMessageEnable=false
24. sendMessageThreadPoolNums=128
25. pullMessageThreadPoolNums=128
26. brokerRole=SLAVE
27. flushDiskType=ASYNC_FLUSH
(4)修改B主配置
vi ./conf/2m-2s-async/broker-b.properties:
28. # brokerClusterName=DefaultCluster
29. brokerName=broker-b
30. brokerId=0
31. namesrvAddr=192.168.1.119:9876;192.168.1.120:9876;192.168.1.121:9876;1
92.168.1.122:9876
32. defaultTopicQueueNums=4
33. autoCreateTopicEnable=true
34. autoCreateSubscriptionGroup=true
35. listenPort=10911
36. deleteWhen=04
37. fileReservedTime=120
38. mapedFileSizeCommitLog=1073741824
39. mapedFileSizeConsumeQueue=50000000
40. destroyMapedFileIntervalForcibly=120000
41. redeleteHangedFileInterval=120000
42. diskMaxUsedSpaceRatio=88
43. storePathRootDir=/usr/framework/rocketmq/datas
44. storePathCommitLog=/usr/framework/rocketmq/logs
45. maxMessageSize=65536
46. flushCommitLogLeastPages=4
47. flushConsumeQueueLeastPages=2
48. flushCommitLogThoroughInterval=10000
49. flushConsumeQueueThoroughInterval=60000
50. checkTransactionMessageEnable=false
51. sendMessageThreadPoolNums=128
52. pullMessageThreadPoolNums=128
53. brokerRole=SYNC_MASTER
54. flushDiskType=ASYNC_FLUSH (5)修改B从配置
vi ./conf/2m-2s-async/broker-b-s.properties:
28. # brokerClusterName=DefaultCluster
29. brokerName=broker-b
30. brokerId=1
31. namesrvAddr=192.168.1.119:9876;192.168.1.120:9876;192.168.1.121:9876;1
92.168.1.122:9876
32. defaultTopicQueueNums=4
33. autoCreateTopicEnable=true
34. autoCreateSubscriptionGroup=true
35. listenPort=10911
36. deleteWhen=04
37. fileReservedTime=120
38. mapedFileSizeCommitLog=1073741824
39. mapedFileSizeConsumeQueue=50000000
40. destroyMapedFileIntervalForcibly=120000
41. redeleteHangedFileInterval=120000
42. diskMaxUsedSpaceRatio=88
43. storePathRootDir=/usr/framework/rocketmq/datas
44. storePathCommitLog=/usr/framework/rocketmq/logs
45. maxMessageSize=65536
46. flushCommitLogLeastPages=4
47. flushConsumeQueueLeastPages=2
demo:
Producer类
package com.lvxc.study.tech.rmq;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) {
DefaultMQProducer producer = new DefaultMQProducer("Producer");
//nameserver服务,多个以;分开
producer.setNamesrvAddr("192.168.133.128:9876");
try{
producer.start();
Message msg = new Message("PushTopic","push","1","Just for test.".getBytes());
SendResult result = producer.send(msg);
System.out.println("id:"+result.getMsgId()+" result:" +result.getSendStatus());;
msg = new Message("PushTopic","push","2","Just for test.".getBytes());
result = producer.send(msg);
System.out.println("id:"+result.getMsgId()+" result:" +result.getSendStatus());;
msg = new Message("PullTopic","pull","1","Just for test.".getBytes());
result = producer.send(msg);
System.out.println("id:"+result.getMsgId()+" result:" +result.getSendStatus());;
}catch(Exception e){
e.printStackTrace();
}finally {
producer.shutdown();
}
}
}
Consumer类
package com.lvxc.study.tech.rmq;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;
public class Consumer {
public static void main(String[] args) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer");
consumer.setNamesrvAddr("192.168.133.128:9876");
try {
//订阅PushTopic下Tag为push的消息
consumer.subscribe("PushTopic", "push");
//程序第一次启动从消息队列头取数据
consumer.setConsumeFromWhere(
ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(
new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> list,
ConsumeConcurrentlyContext Context) {
Message msg = list.get(0);
System.out.println(msg.toString());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
);
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}