RabbitMQ/AMQP:单个队列,同一条消息的多个使用者?
我一般才开始使用RabbitMQ和AMQP.
I am just starting to use RabbitMQ and AMQP in general.
- 我有一条消息队列
- 我有多个消费者,我想用相同的消息来做不同的事情.
- I have a queue of messages
- I have multiple consumers, which I would like to do different things with the same message.
RabbitMQ的大多数文档似乎都集中在循环上,即单个消息由单个使用者使用,而负载则在每个使用者之间分散.这确实是我目睹的行为.
Most of the RabbitMQ documentation seems to be focused on round-robin, ie where a single message is consumed by a single consumer, with the load being spread between each consumer. This is indeed the behavior I witness.
一个例子:生产者只有一个队列,每2秒发送一次消息:
An example: the producer has a single queue, and send messages every 2 sec:
var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;
connection.on('ready', function () {
var sendMessage = function(connection, queue_name, payload) {
var encoded_payload = JSON.stringify(payload);
connection.publish(queue_name, encoded_payload);
}
setInterval( function() {
var test_message = 'TEST '+count
sendMessage(connection, "my_queue_name", test_message)
count += 1;
}, 2000)
})
这是一个消费者:
var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
connection.queue("my_queue_name", function(queue){
queue.bind('#');
queue.subscribe(function (message) {
var encoded_payload = unescape(message.data)
var payload = JSON.parse(encoded_payload)
console.log('Recieved a message:')
console.log(payload)
})
})
})
如果我两次启动使用者,我可以看到每个使用者正在以循环方式使用备用消息.例如,我将在一个终端中看到消息1、3、5,在另一个终端中看到消息2、4、6 .
If I start the consumer twice, I can see that each consumer is consuming alternate messages in round-robin behavior. Eg, I'll see messages 1, 3, 5 in one terminal, 2, 4, 6 in the other.
我的问题是:
-
我可以让每个消费者收到相同的消息吗?即,两个消费者都收到消息1、2、3、4、5、6? AMQP/RabbitMQ所说的是什么?通常如何配置?
Can I have each consumer receive the same messages? Ie, both consumers get message 1, 2, 3, 4, 5, 6? What is this called in AMQP/RabbitMQ speak? How is it normally configured?
这是常见的吗?我是否应该让消息交换将消息路由到两个单独的队列中,并由一个使用者使用?
Is this commonly done? Should I just have the exchange route the message into two separate queues, with a single consumer, instead?
我可以让每个消费者收到相同的消息吗?即,两个消费者都收到消息1、2、3、4、5、6? AMQP/RabbitMQ所说的是什么?通常如何配置?
否,如果使用者在同一队列中,则不会.摘自RabbitMQ的 AMQP概念指南:
No, not if the consumers are on the same queue. From RabbitMQ's AMQP Concepts guide:
重要的是要了解,在AMQP 0-9-1中,消息在使用者之间是负载均衡的.
it is important to understand that, in AMQP 0-9-1, messages are load balanced between consumers.
这似乎暗示队列中的循环行为是给定的,并且不可配置.即,为了使多个使用者处理相同的消息ID,需要单独的队列.
This seems to imply that round-robin behavior within a queue is a given, and not configurable. Ie, separate queues are required in order to have the same message ID be handled by multiple consumers.
这通常是吗?我是否应该让消息交换将消息路由到两个单独的队列中,并由一个使用者使用?
不是,不是单个队列/多个使用者,每个使用者都处理相同的消息ID.通过交换将消息路由到两个单独的队列中确实更好.
No it's not, single queue/multiple consumers with each each consumer handling the same message ID isn't possible. Having the exchange route the message onto into two separate queues is indeed better.
由于我不需要太复杂的路由,因此扇出交换可以很好地解决这一问题.由于node-amqp具有默认交换"的概念,允许您直接将消息发布到连接中,因此我没有过多地关注Exchange,但是大多数AMQP消息都发布到了特定的交换中.
As I don't require too complex routing, a fanout exchange will handle this nicely. I didn't focus too much on Exchanges earlier as node-amqp has the concept of a 'default exchange' allowing you to publish messages to a connection directly, however most AMQP messages are published to a specific exchange.
这是我的扇出交流,同时发送和接收:
Here's my fanout exchange, both sending and receiving:
var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;
connection.on('ready', function () {
connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) {
var sendMessage = function(exchange, payload) {
console.log('about to publish')
var encoded_payload = JSON.stringify(payload);
exchange.publish('', encoded_payload, {})
}
// Recieve messages
connection.queue("my_queue_name", function(queue){
console.log('Created queue')
queue.bind(exchange, '');
queue.subscribe(function (message) {
console.log('subscribed to queue')
var encoded_payload = unescape(message.data)
var payload = JSON.parse(encoded_payload)
console.log('Recieved a message:')
console.log(payload)
})
})
setInterval( function() {
var test_message = 'TEST '+count
sendMessage(exchange, test_message)
count += 1;
}, 2000)
})
})