RocketMQ入门(三)拉取消息
RocketMQ入门(3)拉取消息
RocketMQ不止可以直接推送消息,在消费端注册监听器进行监听,还可以由消费端决定自己去拉取数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
/**
* PullConsumer,订阅消息
*/
publicclassPullConsumer{
//Java缓存
privatestaticfinalMap<MessageQueue,Long>offseTable=newHashMap<MessageQueue,Long>();
publicstaticvoidmain(String[]args)throwsMQClientException{
DefaultMQPullConsumer consumer=newDefaultMQPullConsumer("PullConsumerGroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
//拉取订阅主题的队列,默认队列大小是4
Set<MessageQueue>mqs=consumer.fetchSubscribeMessageQueues("TopicTestMapBody");
for(MessageQueue mq:mqs){
System.out.println("Consume from the queue: "+mq);
SINGLE_MQ:while(true){
try{
PullResult pullResult=
consumer.pullBlockIfNotFound(mq,null,getMessageQueueOffset(mq),32);
List<MessageExt>list=pullResult.getMsgFoundList();
if(list!=null&&list.size()<100){
for(MessageExt msg:list){
System.out.println(SerializableInterface.deserialize(msg.getBody()));
}
}
System.out.println(pullResult.getNextBeginOffset());
putMessageQueueOffset(mq,pullResult.getNextBeginOffset());
switch(pullResult.getPullStatus()){
caseFOUND:
// TODO
break;
caseNO_MATCHED_MSG:
break;
caseNO_NEW_MSG:
breakSINGLE_MQ;
caseOFFSET_ILLEGAL:
break;
default:
break;
}
}
catch(Exceptione){
e.printStackTrace();
}
}
}
consumer.shutdown();
}
privatestaticvoidputMessageQueueOffset(MessageQueue mq,longoffset){
offseTable.put(mq,offset);
}
privatestaticlonggetMessageQueueOffset(MessageQueue mq){
Longoffset=offseTable.get(mq);
if(offset!=null){
System.out.println(offset);
returnoffset;
}
return0;
}
|
刚开始的没有细看PullResult对象,以为拉取到的结果没有MessageExt对象还跑到群里面问别人,犯2了
特别要注意 静态变量offsetTable的作用,拉取的是按照从offset(理解为下标)位置开始拉取,拉取N条,offsetTable记录下次拉取的offset位置
http://www.changeself.net/archives/rocketmq%E5%85%A5%E9%97%A8%EF%BC%883%EF%BC%89%E6%8B%89%E5%8F%96%E6%B6%88%E6%81%AF.html
相关推荐
- RocketMQ入门(2)最佳实践 RocketMQ入门(2)最佳实践 一、服务端安装部署 二、编写客户端 三、Consumer最佳实践 四、Producer最佳实践
- RocketMQ入门(3)拉取消息 RocketMQ入门(3)拉取消息
- RocketMQ入门(1) RocketMQ入门(1) RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:能够保证严格的消息顺序
- asp.net c# 通过消息队列处理高并发请求(以抢小米手机为例) 演示一下现象 第一阶段,利用线程锁简单粗暴 第二阶段,拉消息队列,通过生产者,消费者的模式 第三阶段 反转生产者消费者的角色,把可售产品提前放到队列里,然后让提交的订单来消费队列里的内容
- git 删除远端分支,本地新创建分支推到远程或者 拉取远程分支并创建本地分支 一、本地新创建分支推到远程 二、git拉取远程分支并创建本地分支 三、删除远程分支 四、修改分支名称
- RocketMQ之六:RocketMQ消息存储 三、RocketMQ存储优化技术
- 网站拉取QQ登录及第三方登录,详解...
- SignalR技术 Asp.net SignalR快速入门 一、前言 二、Asp.net SignalR 是个什么东东 三、使用Asp.net SignalR在Web端实现广播消息 四、在桌面程序中如何使用Asp.net SignalR 五、总结
- 消息队列中间件(三)Kafka 入门指南
- 版本控制git之三-多人协作 变基 推送 拉取 删除远程分支 版本控制git之三-多人协作
- pushlet 之 无法传送中文解决方法
- glibc安装后gnome下界面变成中英混合了。该怎么处理