如何使用java和spring 3.0从JMS主题(而不是队列)并发处理多个消息?

如何使用java和spring 3.0从JMS主题(而不是队列)并发处理多个消息?

问题描述:

请注意,我想要多个消息侦听器处理来自主题的并发消息。此外,我希望每个消息监听器都以事务方式进行操作,这样在给定消息监听器中的处理失败将导致监听器的消息保留在主题上。

Note that I'd like multiple message listeners to handle successive messages from the topic concurrently. In addition I'd like each message listener to operate transactionally so that a processing failure in a given message listener would result in that listener's message remaining on the topic.

弹簧DefaultMessageListenerContainer似乎仅支持JMS队列的并发。

The spring DefaultMessageListenerContainer seems to support concurrency for JMS queues only.

我需要实例化多个DefaultMessageListenerContainers吗?

Do I need to instantiate multiple DefaultMessageListenerContainers?

沿垂直轴流动:

ListenerA reads msg 1        ListenerB reads msg 2        ListenerC reads msg 3
ListenerA reads msg 4        ListenerB reads msg 5        ListenerC reads msg 6
ListenerA reads msg 7        ListenerB reads msg 8        ListenerC reads msg 9
ListenerA reads msg 10       ListenerB reads msg 11       ListenerC reads msg 12
...

更新:

UPDATE:
Thanks for your feedback @T.Rob and @skaffman.

我最后做的是创建多个 DefaultMessageListenerContainers code> concurrency = 1 ,然后在消息侦听器中放置逻辑,以便只有一个线程处理给定的消息id。

What I ended up doing is creating multiple DefaultMessageListenerContainers with concurrency=1 and then putting logic in the message listener so that only one thread would process a given message id.

您不需要多个 DefaultMessageListenerContainer 实例,但是您需要配置 DefaultMessageListenerContainer 为并发,使用 concurrentConsumers 属性

You don't want multiple DefaultMessageListenerContainer instances, no, but you do need to configure the DefaultMessageListenerContainer to be concurrent, using the concurrentConsumers property:


指定要创建的并发
个消费者的数量。默认值为1.

Specify the number of concurrent consumers to create. Default is 1.

为此
设置更高的值将增加运行时计划的并发
消费者的标准
级别:这是
有效地是在任何给定时间计划的
的并发消费者的最小数量。这是一个
静态设置;对于动态缩放,
考虑指定
maxConcurrentConsumers设置

Specifying a higher value for this setting will increase the standard level of scheduled concurrent consumers at runtime: This is effectively the minimum number of concurrent consumers which will be scheduled at any given time. This is a static setting; for dynamic scaling, consider specifying the "maxConcurrentConsumers" setting instead.

提高并发
消费者的数量是推荐的,以便
缩放从队列进入的消息
的消耗。然而,注意
任何订单保证丢失
一旦多个消费者是
注册。一般来说,对于少量队列,请保持1
的消费者。

Raising the number of concurrent consumers is recommendable in order to scale the consumption of messages coming in from a queue. However, note that any ordering guarantees are lost once multiple consumers are registered. In general, stick with 1 consumer for low-volume queues.

但是,底部有很大的警告: / p>

However, there's big warning at the bottom:


不要提高主题的并发用户数
这将导致同一消息的同时
消费,其中
几乎是不可取的。

Do not raise the number of concurrent consumers for a topic. This would lead to concurrent consumption of the same message, which is hardly ever desirable.

这很有趣,当你想到它是有道理的。如果你有多个 DefaultMessageListenerContainer 实例,也会发生同样的情况。

This is interesting, and makes sense when you think about it. The same would occur if you had multiple DefaultMessageListenerContainer instances.

我想也许你需要重新思考你的设计,我不知道我会建议什么。 pub / sub消息的并发消耗看起来是一个完全合理的事情,但如何避免同时向所有消费者传递相同的消息?

I think perhaps you need to rethink your design, although I'm not sure what I'd suggest. Concurrent consumption of pub/sub messages seems like a perfectly reasonable thing to do, but how to avoid getting the same message delivered to all of your consumers at the same time?