rocketMQ 事务消息

producer

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction

客户端的流程
1. 客户端同步发送事务 half 消息
2. 收到 broker 响应,则执行本地事务
3. 根据本地事务状态,决定 commit 或 rollback 消息
4. 发送 commit 或 rollback 给 broker
5. 等待 broker 发送 check 消息

第 1 步是同步发送,客户端需要等到消息的 commitLog offset 和 queue offset
第 3 步 commit 或 rollback 时,会传 commitLog offset 和 queue offset
第 4 步是 oneway,即不需要结果

broker

涉及 3 个 topic:
RMQ_SYS_TRANS_HALF_TOPIC
half topic,producer 发送的 half 消息存储在 RMQ_SYS_TRANS_HALF_TOPIC,这个 topic 只有一个 queue

RMQ_SYS_TRANS_OP_HALF_TOPIC
op topic,producer commit 或 rollback 消息时,broker 往 RMQ_SYS_TRANS_OP_HALF_TOPIC 写入一条消息,内容是 half 消息的 queue offset

真实 topic
producer commit 消息时,broker 把 half 消息写入真实 topic

理想情况:
1. producer 发送 half 消息
2. broker 把消息放入 half topic,返回响应给 producer
3. producer 执行本地事务成功,取出响应中的 half 消息的 commitLog offset 和 queue offset,commit 消息(oneway)
4. broker 从 commitLog 中查询消息,并把消息写入真实 topic
5. broker 写入一条 op 消息,op 消息的内容是 half 消息的 queue offset

broker 会一直去 check 消息

org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService#onWaitEnd

当客户端执行本地事务,既不 commit 也不 rollback。对于这种消息,broker 就需要发送 check 给 producer。

check 的基本思想就是:
broker 遍历 half 消息,如果当前 half 消息没有对应的 op 消息,而且超过了过期时间,需要 check
check 的过程是:
broker 把 half 消息再写入 half topic,带上这条消息的 commitLog offset 和 queue offset,向 producer 发送 check,producer 检查本地事务状态,决定 commit 还是 rollback

check 超过次数限制,或者消息太旧了,会被 skip,不再 check
过期时间可以指定,如果指定了,则当时间没有超时,也会把这条消息重新放入 half topic
如果当前消息没到过期时间,重新放入 half topic