发布者从出站适配器确认后如何将 basicAck 发送到入站适配器
我们有一个接收事件通知的入站通道适配器.消费者标准的复杂性限制了我们使用简单的路由键来分发消息的能力,因此应用程序使用拆分器通过直接交换将该消息发送到感兴趣的订阅者队列.
We have an inbound channel adapter that receives notifications of an event. The complexity of the consumer's criteria restrict our ability to use a simple routing key to distribute the messages, so the application uses a splitter to send that message to interested subscriber's queues via a direct exchange.
我们想在我们的出站通道适配器上使用发布者确认来确保交付到客户端队列.我们要等待发布者确认ack
原始消息,如果没有收到发布者确认或者如果ack==false
我们想要nack原始消息来自入站通道适配器的消息.
We want to use publisher confirms on our outbound channel adapter the ensure delivery to the client queues. We want to wait for the publisher confirm to ack
the original message, and if a publisher confirm fails to be received or if the ack==false
we want to nack the original message that came from the inbound channel adapter.
我认为这将在 Rabbit 模板的 confirm-callback
中完成,但我不确定如何完成.(或者如果有可能的话)
I assume this will be done in the confirm-callback
from the Rabbit Template but I am not sure how to accomplish this. (Or if it is even possible)
<rabbit:connection-factory id="rabbitConnectionFactory"
host="${amqpHost}"
username="${amqpUsername}"
password="${amqpPassword}"
virtual-host="${amqpVirtualHost}"
publisher-confirms="true" />
<rabbit:template id="rabbitTemplate"
connection-factory="rabbitConnectionFactory"
confirm-callback="PublisherConfirms" />
<int-amqp:inbound-channel-adapter channel="notificationsFromRabbit"
queue-names="#{'${productNotificationQueue}' + '${queueSuffix}'}"
connection-factory="rabbitConnectionFactory"
mapped-request-headers="*"
message-converter="productNotificationMessageConverter" />
<int:chain input-channel="notificationsFromRabbit" output-channel="notificationsToClients">
<int:service-activator ref="NotificationRouter"
method="addRecipientsHeaders" />
<int:splitter ref="NotificationRouter"
method="groupMessages" />
<int:object-to-json-transformer />
</int:chain>
<int-amqp:outbound-channel-adapter channel="notificationsToClients"
amqp-template="rabbitTemplate"
exchange-name="${servicesClientsExchange}"
routing-key=""
mapped-request-headers="*" />
目前,我们通过将 Channel 和 Delivery 标签作为参数传递来acking
groupMessages
方法中的消息.但是,如果代理从不发送 return
或以 ack=false
返回,那么从入站通道适配器 nack
消息为时已晚.
At the moment we are acking
the messages in the groupMessages
method by passing the Channel and Delivery tag as paramters. But, if the broker never sends a return
or returns with ack=false
then it is too late to nack
the message from the inbound channel adapter.
我是否需要一个 bean 来保留 Map<Channel, Long>
的通道和交付标签以在 confirm-callback
中访问,或者是否有一些其他方式?
Am I going to need a bean that keeps a Map<Channel, Long>
of the channel and delivery tags to access in the confirm-callback
or is there some other way?
在我收到发布者确认时,来自入站通道适配器的通道是否会关闭?
Is the channel from the inbound channel adapter going to be closed by the time I receive a publisher confirm?
只要你暂停消费者线程直到收到所有的 acks/nacks,你就可以为所欲为.
As long as you suspend the consumer thread until all the acks/nacks have been received, you can do what you want.
如果您将 notificationsFromRabbit
设为发布订阅频道,您可以在暂停线程的地方添加另一个订阅者(服务激活器);等待所有 acks/nacks 并采取您想要的操作.
If you make notificationsFromRabbit
a publish-subscribe channel you can add another subscriber (service-activator) where you suspend the thread; wait for all the acks/nacks and take the action you desire.
您也可以使用 Spring 集成为您管理确认,它会将它们作为来自出站适配器的消息发出(而不是自己使用回调).
You can also use Spring Integration to manage the acks for you and it will emit them as messages from the outbound adapter (rather than using a callback yourself).
编辑 2:
然后,您可以在关联数据中使用拆分器的序列大小/序列号标头,从而在收到所有 ack 时释放消费者.
You could then use the splitter's sequence size/sequence number headers in your correlation data, enabling the release of the consumer when all the acks are received.
编辑 3:
这样的东西应该可以工作......
Something like this should work...
在出站适配器上,设置 confirm-correlation-expression="#this"
(整个出站消息).
On the outbound adapter, set confirm-correlation-expression="#this"
(the whole outbound message).
有两种方法的类
private final Map<String, BlockingQueue<Boolean> suspenders;
public void suspend(Message<?> original) {
BlockingQueue<Boolean> bq = new LinkedBlockingQueue();
String key = someKeyFromOriginal(original);
suspenders.put(key, bq);
Boolean result = bq.poll(// some timeout);
try {
if (result == null) {
// timed out
}
else if (!result) {
// throw some exception to nack the message
}
}
finally {
suspenders.remove(key);
}
}
public void ackNack(Message<Message<?>> ackNak) {
Message<?> theOutbound = ackNak.payload;
BlockingQueue<Boolean> bq = suspenders.get(someKeyFromOriginal(theOutbound));
if (bq == null) // late ack/nack; ignore
else {
// check the ack/nack header
// if nack, bq.put(false)
// else, use another map field, to
// keep track of ack count Vs sequenceSize header in
// theOutbound; when all acks received, bq.put(true);
}
}
在第一种方法中暂停消费者线程;将 acks/nacks 从出站适配器路由到第二种方法.
Suspend the consumer thread in the first method; route the acks/nacks from the outbound adapter to the second method.
警告:这没有经过测试,只是我的头顶;但应该很接近.
Caveat: This is not tested, just off the top of my head; but it should be pretty close.