RocketMQ入门(3)拉取消息 RocketMQ入门(3)拉取消息
转自:http://www.changeself.net/archives/rocketmq入门(3)拉取消息.html
RocketMQ不止可以直接推送消息,在消费端注册监听器进行监听,还可以由消费端决定自己去拉取数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
/**
* PullConsumer,订阅消息
*/
public
class
PullConsumer
{
//Java缓存
private
static
final
Map<MessageQueue,
Long>
offseTable
=
new
HashMap<MessageQueue,
Long>();
public
static
void
main(String[]
args)
throws
MQClientException
{
DefaultMQPullConsumer
consumer
=
new
DefaultMQPullConsumer("PullConsumerGroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
//拉取订阅主题的队列,默认队列大小是4
Set<MessageQueue>
mqs
=
consumer.fetchSubscribeMessageQueues("TopicTestMapBody");
for
(MessageQueue
mq
:
mqs)
{
System.out.println("Consume
from the queue: "
+
mq);
SINGLE_MQ:while(true){
try
{
PullResult
pullResult
=
consumer.pullBlockIfNotFound(mq,
null,
getMessageQueueOffset(mq),
32);
List<MessageExt>
list=pullResult.getMsgFoundList();
if(list!=null&&list.size()<100){
for(MessageExt
msg:list){
System.out.println(SerializableInterface.deserialize(msg.getBody()));
}
}
System.out.println(pullResult.getNextBeginOffset());
putMessageQueueOffset(mq,
pullResult.getNextBeginOffset());
switch
(pullResult.getPullStatus())
{
case
FOUND:
//
TODO
break;
case
NO_MATCHED_MSG:
break;
case
NO_NEW_MSG:
break
SINGLE_MQ;
case
OFFSET_ILLEGAL:
break;
default:
break;
}
}
catch
(Exception
e)
{
e.printStackTrace();
}
}
}
consumer.shutdown();
}
private
static
void
putMessageQueueOffset(MessageQueue
mq,
long
offset)
{
offseTable.put(mq,
offset);
}
private
static
long
getMessageQueueOffset(MessageQueue
mq)
{
Long
offset
=
offseTable.get(mq);
if
(offset
!=
null){
System.out.println(offset);
return
offset;
}
return
0;
}
|
刚开始的没有细看PullResult对象,以为拉取到的结果没有MessageExt对象还跑到群里面问别人,犯2了
特别要注意 静态变量offsetTable的作用,拉取的是按照从offset(理解为下标)位置开始拉取,拉取N条,offsetTable记录下次拉取的offset位置
相关推荐
- RocketMQ入门(3)拉取消息 RocketMQ入门(3)拉取消息
- RocketMQ中PullConsumer的消息拉取源码分析
- Rocketmq源码解读之消息拉取
- RocketMQ入门(三)拉取消息
- RocketMQ从入门到放弃 单机搭建 下载控制台 生产者 指定任意队列个数 消费者 MessageListenerConcurrently和MessageListenerOrderly的区别 消息发送到哪个队列? RocketMq提供的队列选择器 三种发送模式 发送超时设置 消费者设置拉取消息数量 MessageListenerOrderly的返回值 MessageListenerConcurrently的返回值 集群消费 集群消费策略 自定义消费策略 广播消费 两个消费者组,模拟广播消费 同一个消费者组监听不同的Topic会发生什么? 集群搭建双master,没有salve 集群搭建双master,双salve salve节点只能读,不能写 实现顺序消息 消费者如果有异常,不会抛出!!! 延迟消息 消息去重 保证消息的绝对不丢失 单机搭建 下载控制台 生产者 指定任意队列个数 消费者 MessageListenerConcurrently和MessageListenerOrderly的区别 消
- RocketMQ基础:MQ简介,环境搭建,RocketMQ消息 知识点梳理 课堂讲义 1 概述 2.MQ 的作用 3.MQ优缺点分析 4.常见产品 5. RocketMQ入门 6.消息发送
- apollo客户端的长轮询机制的原理 工作原理 Apollo为什么用长轮询而不是长连接? Apollo 3 — 定时/长轮询拉取配置的设计 通过spring提供的DeferredResult实现长轮询服务端推送消息 设置springboot自带tomcat的最大连接数和最大并发数
- IM消息送达保证机制实现(二):保证离线消息的可靠投递 1、前言 2、学习交流 3、IM消息送达保证系列文章 4、消息接收方不在线时的典型消息发送流程 5、典型离线消息表的设计以及拉取离线消息的过程 6、上述流程存在的问题以及优化方案 7、消息接收方一次拉取大量离线消息导致速度慢、卡顿的解决方法 8、优化离线消息的拉取过程,保证离线消息不会丢失 9、进一步优化,解决重复拉取离线消息的问题 10、进一步优化,降低离线拉取ACK带来的额外与服务器的交互次数 11、本文小结 12、IM技术资料分类
- RocketMQ常用命令 1.2. 详细命令
- RocketMQ入门(1) RocketMQ入门(1) RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:能够保证严格的消息顺序