RabbitMQ example: multiple threads, channels and queues

Problem description:

I just read RabbitMQ's Java API docs, and found them very informative and straightforward. The example for how to set up a simple Channel for publishing/consuming is very easy to follow and understand. But it's a very simple/basic example, and it left me with an important question: how can I set up one or more Channels to publish/consume to and from multiple queues?

Let's say I have a RabbitMQ server with 3 queues on it: logging, security_events and customer_orders. So we'd either need a single Channel to have the ability to publish/consume to all 3 queues, or more likely, have 3 separate Channels, each dedicated to a single queue.

On top of this, RabbitMQ's best practices dictate that we set up 1 Channel per consumer thread. For this example, let's say security_events is fine with only 1 consumer thread, but logging and customer_orders both need 5 threads to handle the volume. So, if I understand correctly, does that mean we need:

  • 1 Channel and 1 consumer thread for publishing/consuming to and from security_events; and
  • 5 Channels and 5 consumer threads for publishing/consuming to and from logging; and
  • 5 Channels and 5 consumer threads for publishing/consuming to and from customer_orders?

If my understanding is misguided here, please begin by correcting me. Either way, could some battle-weary RabbitMQ veteran help me "connect the dots" with a decent code example for setting up publishers/consumers that meet my requirements here? Thanks in advance!

I think there are several issues with your initial understanding. Frankly, I'm a bit surprised to see the following: "both need 5 threads to handle the volume". How did you identify that you need that exact number? Do you have any guarantee that 5 threads will be enough?

RabbitMQ is well tuned and time-tested, so it is all about proper design and efficient message processing.

Let's try to review the problem and find a proper solution. BTW, the message queue itself will not guarantee that you have a really good solution. You have to understand what you are doing and also do some additional testing.

As you probably know, there are many possible layouts:

[Figure: possible producer/consumer layouts]

I will use layout B as the simplest way to illustrate the one-producer/N-consumers problem, since you are so worried about throughput. BTW, as you might expect, RabbitMQ behaves quite well (source). Pay attention to prefetchCount; I'll address it later.
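Layout B (one producer, N competing consumers on one queue) can be tried out without a broker. The sketch below is an in-process analogy, not RabbitMQ code: a java.util.concurrent.BlockingQueue stands in for the RabbitMQ queue, each consumer is a plain thread, and the class name CompetingConsumers and the "poison pill" stop message are assumptions made for illustration. As with consumers sharing a RabbitMQ queue, each message is taken by exactly one consumer.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class CompetingConsumers {
    // Stop signal for consumer threads (an assumption of this sketch, not a RabbitMQ feature)
    private static final String POISON = "__stop__";

    // One producer and `consumers` competing consumer threads on one in-memory queue.
    // Returns how many messages each consumer handled.
    static int[] run(int messages, int consumers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        int[] handled = new int[consumers];
        Thread[] threads = new Thread[consumers];
        for (int c = 0; c < consumers; c++) {
            final int id = c;
            threads[c] = new Thread(() -> {
                try {
                    while (true) {
                        String msg = queue.take();   // each message goes to exactly one consumer
                        if (msg.equals(POISON)) {
                            return;
                        }
                        handled[id]++;               // "process" the message
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[c].start();
        }
        try {
            for (int i = 0; i < messages; i++) {
                queue.put("msg-" + i);               // the single producer
            }
            for (int c = 0; c < consumers; c++) {
                queue.put(POISON);                   // one stop signal per consumer
            }
            for (Thread t : threads) {
                t.join();                            // join makes handled[] visible to this thread
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }

    public static void main(String[] args) {
        int[] handled = run(1000, 5);
        int total = 0;
        for (int h : handled) {
            total += h;
        }
        System.out.println("total handled: " + total);
    }
}
```

The total is always 1000, while the split between the five consumers varies from run to run; that uneven, self-balancing split is the point of the competing-consumers layout.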

So message processing logic is likely the right place to make sure you'll have enough throughput. Naturally, you could spawn a new thread every time you need to process a message, but eventually such an approach will kill your system. Basically, the more threads you have, the bigger the latency you'll get: the serial part of the work does not shrink, and the threads start competing for CPU time and locks (you can check Amdahl's law if you want).

[Figure: illustration of Amdahl's law]
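Amdahl's law makes the diminishing returns explicit: if a fraction p of the work can run in parallel, the best possible speedup with n threads is 1 / ((1 - p) + p / n). A minimal Java sketch of that formula (the class name Amdahl and the choice p = 0.9 are just for illustration):

```java
class Amdahl {
    // Upper bound on speedup with n workers when a fraction p of the work is parallelizable
    static double speedup(double p, int n) {
        return 1.0 / ((1.0 - p) + p / n);
    }

    public static void main(String[] args) {
        // With 90% parallelizable work the speedup can never exceed 1 / 0.1 = 10x
        for (int n : new int[] {1, 2, 5, 10, 100}) {
            System.out.printf("p=0.90, n=%3d -> speedup %.2f%n", n, speedup(0.9, n));
        }
    }
}
```

With p = 0.9 the bound is about 5.26x at 10 threads and only about 9.17x at 100 threads. The formula only caps the gain; contention and context switching between many threads can make real numbers worse.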

Tip #1: Be careful with threads; use thread pools (details)

A thread pool can be described as a collection of Runnable objects (work queue) and a collection of running threads. These threads are constantly running and are checking the work queue for new work. If there is new work to be done, they execute this Runnable. The thread pool class itself provides a method, e.g. execute(Runnable r), to add a new Runnable object to the work queue.

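import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Main {
  private static final int NTHREADS = 10;

  // Example task: sums the numbers below countUntil (stands in for real work)
  static class MyRunnable implements Runnable {
    private final long countUntil;
    MyRunnable(long countUntil) { this.countUntil = countUntil; }

    @Override
    public void run() {
      long sum = 0;
      for (long i = 1; i < countUntil; i++) {
        sum += i;
      }
      System.out.println(sum);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(NTHREADS);
    for (int i = 0; i < 500; i++) {
      Runnable worker = new MyRunnable(10000000L + i);
      executor.execute(worker);
    }
    // Accept no new tasks, but finish all tasks already in the queue
    executor.shutdown();
    // Wait until all tasks have finished (or the timeout expires)
    executor.awaitTermination(1, TimeUnit.HOURS);
    System.out.println("Finished all threads");
  }
}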

Tip #2: Be careful with message processing overhead

I would say this is an obvious optimization technique. You are likely to send small, easy-to-process messages. The whole approach is about smaller messages being continuously sent and processed. Big messages will eventually play a bad joke on you, so it is better to avoid them.

So it is better to send tiny pieces of information, but what about processing? There is an overhead every time you submit a job. Batch processing can be very helpful when the incoming message rate is high.

For example, let's say we have simple message processing logic and we do not want to pay the per-task threading overhead every time a message is processed. To optimize that, a very simple CompositeRunnable can be introduced:

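import java.util.LinkedList;
import java.util.Queue;

// Bundles several small tasks into one Runnable, so the executor overhead is paid
// once per batch. Not thread-safe: add() everything before submitting it.
class CompositeRunnable implements Runnable {

    protected Queue<Runnable> queue = new LinkedList<>();

    public void add(Runnable a) {
        queue.add(a);
    }

    @Override
    public void run() {
        for (Runnable r : queue) {
            r.run();
        }
    }
}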

Or do the same in a slightly different way, by collecting messages to be processed:

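import java.util.LinkedList;
import java.util.Queue;

// Same idea, but collects the messages themselves and handles them in one run() call
class CompositeMessageWorker<T> implements Runnable {

    protected Queue<T> queue = new LinkedList<>();

    public void add(T message) {
        queue.add(message);
    }

    @Override
    public void run() {
        for (T message : queue) {
            // process a message
        }
    }
}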

This way you can process messages more efficiently.
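To show how such a composite worker fits together with the thread pool from Tip #1, here is a self-contained sketch that groups incoming messages into fixed-size batches and submits one task per batch, so the executor overhead is paid once per batch instead of once per message. BatchingDemo, BatchTask and the batch size of 100 are hypothetical; a real consumer would also need to flush a partial batch on a timer, which is omitted here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BatchingDemo {
    // Hypothetical batch task: one run() call processes a whole list of messages
    static final class BatchTask implements Runnable {
        private final List<String> batch;
        private final AtomicInteger processed;

        BatchTask(List<String> batch, AtomicInteger processed) {
            this.batch = batch;
            this.processed = processed;
        }

        @Override
        public void run() {
            for (String message : batch) {
                // real message handling would go here
                processed.incrementAndGet();
            }
        }
    }

    // Submits messages in batches of batchSize; returns {tasksSubmitted, messagesProcessed}
    static int[] run(List<String> messages, int batchSize, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger processed = new AtomicInteger();
        int tasks = 0;
        List<String> current = new ArrayList<>();
        for (String m : messages) {
            current.add(m);
            if (current.size() == batchSize) {
                pool.execute(new BatchTask(current, processed));
                tasks++;
                current = new ArrayList<>();   // the submitted batch is never modified again
            }
        }
        if (!current.isEmpty()) {              // flush the last, partially filled batch
            pool.execute(new BatchTask(current, processed));
            tasks++;
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {tasks, processed.get()};
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 1050; i++) {
            messages.add("msg-" + i);
        }
        int[] result = run(messages, 100, 4);
        System.out.println(result[0] + " tasks, " + result[1] + " messages");
    }
}
```

With 1050 messages and a batch size of 100, the pool receives 11 tasks (10 full batches plus one partial one) instead of 1050.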

Tip #3: Optimize message processing

Even though you now know you can process messages in parallel (Tip #1) and reduce processing overhead (Tip #2), you still have to do everything fast. Redundant processing steps, heavy loops and so on might affect performance a lot. Please see this interesting case study:

Improving message throughput tenfold by choosing the right XML parser.
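As a small illustration of a redundant per-message step in Java (not taken from the case study above): String.matches compiles its regular expression on every call, so validating every incoming message that way repeats the same work each time. Compiling the Pattern once and reusing it avoids that. The ORD-nnnnn order-id format and the MessageValidator class are hypothetical.

```java
import java.util.regex.Pattern;

class MessageValidator {
    // Compiled once; Pattern instances are immutable and safe to share between threads
    private static final Pattern ORDER_ID = Pattern.compile("ORD-\\d{5}");

    // Reuses the precompiled pattern for every message
    static boolean isValid(String orderId) {
        return ORDER_ID.matcher(orderId).matches();
    }

    // Same result, but String.matches recompiles the regex on every call
    static boolean isValidSlow(String orderId) {
        return orderId.matches("ORD-\\d{5}");
    }

    public static void main(String[] args) {
        System.out.println(isValid("ORD-12345"));
        System.out.println(isValid("ORD-12"));
    }
}
```

Only the Pattern is shared; a Matcher is not thread-safe, which is why a new one is created per call.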

Tip #4: Connection and channel management

  • Starting a new channel on an existing connection involves one network round trip - starting a new connection takes several.
  • Each connection uses a file descriptor on the server. Channels don't.
  • Publishing a large message on one channel will block a connection while it goes out. Other than that, the multiplexing is fairly transparent.
  • Connections which are publishing can get blocked if the server is overloaded - it's a good idea to separate publishing and consuming connections
  • Be prepared to handle message bursts

(source)
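Applied to the queues from the question, these points plus the "one channel per consumer thread" rule suggest one consuming connection carrying one channel per consumer thread, and a separate publishing connection. The sketch below only computes that layout and makes no broker calls. The class names, the connection labels and the single publisher thread are assumptions; with the real Java client, each planned entry would roughly correspond to one connection.createChannel() call (and, for consumers, one Worker like the one in the complete consumer example).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ChannelPlan {
    // Hypothetical description of one channel: its connection, its role and its queue
    static final class PlannedChannel {
        final String connection, role, queue;

        PlannedChannel(String connection, String role, String queue) {
            this.connection = connection;
            this.role = role;
            this.queue = queue;
        }

        @Override
        public String toString() {
            return connection + " -> " + role + " on " + queue;
        }
    }

    // One consumer channel per consumer thread on the consuming connection, plus
    // one channel per publishing thread on a separate publishing connection
    static List<PlannedChannel> plan(Map<String, Integer> consumerThreadsPerQueue, int publisherThreads) {
        List<PlannedChannel> channels = new ArrayList<>();
        for (Map.Entry<String, Integer> e : consumerThreadsPerQueue.entrySet()) {
            for (int i = 0; i < e.getValue(); i++) {
                channels.add(new PlannedChannel("consuming-connection", "consumer", e.getKey()));
            }
        }
        for (int i = 0; i < publisherThreads; i++) {
            // A publishing channel is not tied to one queue: it publishes via exchanges/routing keys
            channels.add(new PlannedChannel("publishing-connection", "publisher", "(any)"));
        }
        return channels;
    }

    public static void main(String[] args) {
        Map<String, Integer> consumers = new LinkedHashMap<>();
        consumers.put("security_events", 1);
        consumers.put("logging", 5);
        consumers.put("customer_orders", 5);
        List<PlannedChannel> plan = plan(consumers, 1);
        plan.forEach(System.out::println);
        System.out.println(plan.size() + " channels on 2 connections");
    }
}
```

For the question's numbers this yields 11 consumer channels (1 + 5 + 5) plus 1 publisher channel: 12 channels on just 2 connections, since channels are cheap and connections are not.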

Please note that all of these tips work well together. Feel free to let me know if you need additional details.

Complete consumer example (source)

Please note the following:

  • channel.basicQos(prefetch) - As you saw earlier, prefetchCount can be very useful:

This command allows a consumer to choose a prefetch window that specifies the amount of unacknowledged messages it is prepared to receive. By setting the prefetch count to a non-zero value, the broker will not deliver any messages to the consumer that would breach that limit. To move the window forwards, the consumer has to acknowledge the receipt of a message (or a group of messages).

  • ExecutorService threadExecutor - you can pass in a properly configured executor service.
  • Example:

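    import java.io.IOException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.atomic.AtomicInteger;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;

    // Nested inside your application class, as in the original example
    static class Worker extends DefaultConsumer {
    
        final Channel channel;
        final String queue;
        final AtomicInteger processed = new AtomicInteger();
        final ExecutorService executorService;
    
        public Worker(int prefetch, ExecutorService threadExecutor,
                      Channel c, String q) throws IOException {
            super(c);
            channel = c;
            queue = q;
            // Set the executor before consuming: deliveries may arrive as soon as basicConsume returns
            executorService = threadExecutor;
            channel.basicQos(prefetch);
            // autoAck = false: each message is acknowledged explicitly after processing
            channel.basicConsume(queue, false, this);
        }
    
        @Override
        public void handleDelivery(String consumerTag,
                                   Envelope envelope,
                                   AMQP.BasicProperties properties,
                                   byte[] body) throws IOException {
            long deliveryTag = envelope.getDeliveryTag();
            // Hand the message to the pool so the client's delivery thread is not blocked
            Runnable task = () -> {
                try {
                    // process `body` here
                    processed.incrementAndGet();
                    channel.basicAck(deliveryTag, false);   // frees one slot in the prefetch window
                } catch (IOException e) {
                    e.printStackTrace();
                }
            };
            executorService.submit(task);
        }
    }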
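To see what the prefetch window described above guarantees, here is a broker-free model of it. It is an analogy, not the RabbitMQ client's implementation: a java.util.concurrent.Semaphore with prefetch permits plays the role of the broker's window, delivering a message takes a permit, and acknowledging it returns the permit. The class name PrefetchWindow and its parameters are illustrative assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PrefetchWindow {
    // Models basicQos(prefetch): at most `prefetch` delivered-but-unacked messages at a time.
    // Returns the largest number of unacked messages observed during the run.
    static int run(int messages, int prefetch, int workerThreads) {
        Semaphore window = new Semaphore(prefetch);      // free slots in the prefetch window
        AtomicInteger unacked = new AtomicInteger();
        AtomicInteger maxUnacked = new AtomicInteger();
        ExecutorService workers = Executors.newFixedThreadPool(workerThreads);
        try {
            for (int i = 0; i < messages; i++) {
                window.acquire();                        // the "broker" waits while the window is full
                int now = unacked.incrementAndGet();     // delivered, not yet acknowledged
                maxUnacked.accumulateAndGet(now, Math::max);
                workers.execute(() -> {
                    // process the message, then acknowledge it
                    unacked.decrementAndGet();
                    window.release();                    // the ack moves the window forward
                });
            }
            workers.shutdown();
            workers.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            workers.shutdown();                          // no-op if already shut down
        }
        return maxUnacked.get();
    }

    public static void main(String[] args) {
        System.out.println("max unacked with prefetch 10: " + run(10_000, 10, 4));
    }
}
```

However many worker threads are used, the reported maximum never exceeds prefetch; with a prefetch of 1 it is exactly 1. The same mechanism explains why a prefetch count smaller than the pool size leaves some worker threads idle.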
    

    You can also check the following:

    • Solution Architecting Using Queues?
    • Some queuing theory: throughput, latency and bandwidth
    • A quick message queue benchmark: ActiveMQ, RabbitMQ, HornetQ, QPID, Apollo…