RabbitMQ在.NetCore中的基础应用 RabbitMQ Management HTTP API

一、Rabbit MQ 简介

        RabbitMQ是一个接收、转发消息的消息中间件。你可以把它想象成一个邮局:当你把邮件放假邮箱后时,你能够确保邮递员最终会将你的邮件送到收件人手里。在这个比喻中,RabbitMQ是一个邮箱、一个邮局、一个邮递员。RabbitMQ与邮局最大的不同是它接收、存储和转发二进制数据块--消息,而不是处理纸质信件。

RabbitMQ与其他消息中间件通常使用的一些名词:

  • Producing:指发送,一个发送消息的程序叫做Producer

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

  • Queue:在RabbitMQ中,类似与邮局中的邮箱,用于存放二进制数据块。虽然消息经过RabbitMQ和你的应用程序,但是它们只能存在queue里面。queue本质上是一个大的消息缓冲区,只受主机的内存与硬盘的限制。多个Producer可以发送消息至一个queue,多个Consumer可以从一个queue中接收消息。如下图所示:

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

  • Consuming:至接收消息,一个等待接收消息的程序叫Consumer

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

Producer、COnsumer、Broker没有必要安装在同一台主机上;实际上,在大多应用程序中都是分开安装。一个应用程序可以同时是Producer和Consumer。

RabbitMQ使用了AMQP网络通信协议。可以点击深入了解RabbitMQ的工作原理。

二、Rabbit MQ 安装

安装RabbitMQ服务端(当前最新v 3.8.5下载地址 )注意:安装前先安装Erlang(建议opt 23.0)

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

如遇上图错误,是因为ERLANG_HOME路径配置有误,修改环境变量ERLANG_HOME=erlang安装目录中Bin的上一级:

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

这个时候运行服务端,如果提示RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API则百度安装一下即可。

注意:MSVCR120.dll要下载与系统对应的版本,不然会很难受。

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

三、RabbitMQ示例--"HELLO WORLD"

下面我们以发送、接收"Hello World"的示例,先简单了解RabbitMQ的开发(以C#代码为示例):

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

首先创建两个控制台应用程序:RabbitMQ.Producer(消息生产者)、RabbitMQ.Consumer(消息消费者)

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

添加nuget引用-->>RabbitMQ.Client

RabbitMQ.Producer.Program.cs

using RabbitMQ.Client;
using System;
using System.Text;
using System.Timers;

namespace RabbitMQ.Producer
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "hello",
                                    durable: false,
                                    exclusive: false,
                                    autoDelete: false,
                                    arguments: null);

                string message = "Hello World!";
                var body = Encoding.UTF8.GetBytes(message);
                Timer timer = new Timer();
                timer.Elapsed += new ElapsedEventHandler((object sender, ElapsedEventArgs e) => {
                    channel.BasicPublish(exchange: "",
                                        routingKey: "hello",
                                        basicProperties: null,
                                        body: body);
                    Console.WriteLine($"[{DateTime.Now}][Producer Test] Sent {message}");
                });
                timer.Interval = 1000;
                timer.Enabled = true;
                timer.Start();

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

RabbitMQ.Consumer.Program.cs

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace RabbitMQ.Consumer
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "hello",
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine($"[{DateTime.Now}][Consumer Test] Received {message}");
                };
                channel.BasicConsume(queue: "hello",
                                     autoAck: true,
                                     consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

执行效果,每隔一秒发送一次消息:

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

四、队列

任务队列最大的目的是为了避免立即处理一下大资源任务(比如一些文件数据)而不得不等待它完成后才能继续处理后续消息。取而代之的是我们计划稍后处理这些大资源的任务。我们将任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行该作业。当运行多个工作进程时,任务将在他们之间共享。

Round-robin dispatching-消息分发机制

上面的代码示例简单的展现了RabbitMQ的运行方式,下面的代码将体现队列的工作过程。以及一些代码中的参数配置。

  • 利用Threading.Sleep来假装处理大资源任务,修改Consumer中的代码,并修改Producer中的代码,增加消息随机数

RabbitMQ.Producer.Program.cs

using RabbitMQ.Client;
using System;
using System.Text;
using System.Timers;

namespace RabbitMQ.Producer
{
    class Program
    {
        static void Main(string[] args)
        {
            Random rd = new Random();
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "hello",
                                    durable: false,
                                    exclusive: false,
                                    autoDelete: false,
                                    arguments: null);

                Timer timer = new Timer();
                timer.Elapsed += new ElapsedEventHandler((object sender, ElapsedEventArgs e) => {
                    string message = "Hello World!" + rd.Next(1000, 10000);
                    var body = Encoding.UTF8.GetBytes(message);
                    channel.BasicPublish(exchange: "",
                                        routingKey: "hello",
                                        basicProperties: null,
                                        body: body);
                    Console.WriteLine($"[{DateTime.Now}][Producer Test] Sent {message}");
                });
                timer.Interval = 1000;
                timer.Enabled = true;
                timer.Start();

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

RabbitMQ.Consumer.Program.cs

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;

namespace RabbitMQ.Consumer
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "hello",
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine($"[{DateTime.Now}][Consumer Test] Received {message}");
                    Thread.Sleep(3000);
                };
                channel.BasicConsume(queue: "hello",
                                     autoAck: true,
                                     consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code
  • 运行一个Producer和两个(或多个)Consumer,观察两个Consumer中接收到的消息

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

        观察发现,Producer一共发送了14条消息,两个Consumer分别收到了7条,这个就是RabbitMQ round-robin消息分发机制,当接收消息处理需要耗费大量时间时(如业务逻辑复杂、处理文件流信息等),可以同时开启多个Consumer去同时接收消息并处理,这样就可以并行处理堆积在队列中的消息。

Message acknowledgment-消息确认机制

        解决了因消费消息时间过长导致队列中消息堆积过多的问题之后,你可能会有这样的疑问,当Consumer接收到一条消息之后,任务处理到一半Consumer挂掉之后,这条消息按照RabbitMQ的机制,一旦发送给Consumer就会被标记为删除,而且接下来发送到这个Consumer的消息都会丢失,但这并不是你想要的结果。当一个Consumer挂掉后,应该把消息转发给另一个Consumer。

        为了解决上述问题,确保消息用于不会丢失,RabbitMQ提供了消息确认机制(message acknowledgments)。在Consumer成功接收并处理一笔消息后,会返回一个确认信息给RabbitMQ,告诉他指定的消息已经成功接收并处理,可以进行删除。如果Consumer因挂掉了(通道关闭、连接关闭、网络连接中断等)而未返回确认信息,RabbitMQ会理解消息没有完全被处理并把它重新排入队列之中。如果同时有另一个Consumer在线,它会快速的把消息重新发送给另一个Consumer。通过这种方式,即使Consumer偶然挂掉,也不会有消息丢失。

Consumer程序中,需要调整部分代码,当消息处理完成后,程序手动返回一个确认给RabbitMQ。

RabbitMQ.Consumer.Program.cs

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;

namespace RabbitMQ.Consumer
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "hello",
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine($"[{DateTime.Now}][Consumer Test] Received {message}");
                    Thread.Sleep(3000);
                    channel.BasicAck(ea.DeliveryTag, false);// Consumer返回消息处理完成确认信息
                    Console.WriteLine("完成确认信息返回...");
                };
                channel.BasicConsume(queue: "hello",
                                     autoAck: false,        // Consumer返回消息处理完成确认信息
                                     consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

        发布完Consumer程序后,打开两个Consumer,在其中一个接收到消息的时候,立即Ctrl+C,停止其运行,观察消息是否转发到另一个Consumer。

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

        在上动图中,左边的Comsumer在接收到消息9856时,我们立马终止其操作,然后发现右边的Consumer接收到了消息9856,并进行了处理确认。

        在使用BasicAck方法时,要确认接收消息与返回确认消息的channel是同一个,不然会抛出channel-level protocol exception的错误。而且要注意,当确定返回确认消息给RabbitMQ时(即设置 autoAck = false),一定不要忘记BasicAck方法,不然后果很严重。RabbitMQ一直接收不到确认消息,就会一直转发消息(随机转发),当你的Consumer全部关闭后,RabbitMQ将会消耗掉越来越多的内存,因为它无法释放未确认的消息。为了调试这种错误,你可以使用 rabbitmqctl

在Windows中,执行下面命令,可以查询未确认消息的数量

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

Message durability-消息持久机制

我们已经了解了,如何在Consumer挂掉的情况下确保消息任务不会丢失,但如果RabbitMQ服务器挂掉了,消息任务仍然会丢失。

当RabbitMQ服务器关闭会崩溃是,它将丢失队列及消息,除非你告诉它不要这么做。为了确保队列及消息不会丢失,有两件事需要我们去完成:我们需要同时给队列和消息打上持久的标记。

首先,我们需要确保队列在RabbitMQ节点重启队列中存在,对此可以作一下声明:

channel.QueueDeclare(queue: "hello",
                     durable: true,// 声明durable = true
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);

上面的声明看起来没啥毛病,但是真正运行起来并没有任何效果。那是因为我们已经定义过了队列 "hello",且durable = false。RabbitMQ不允许重新用不同的参数去定义一个已经存在的队列,所以我们需要重新声明一个队列,例如 queue_task,同时设置消息的持久性(Persistent = true).

RabbitMQ.Producer.Program.cs

using RabbitMQ.Client;
using System;
using System.Text;
using System.Timers;

namespace RabbitMQ.Producer
{
    class Program
    {
        static void Main(string[] args)
        {
            Random rd = new Random();
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;       // 设置消息持久性
                channel.QueueDeclare(queue: "queue_task",
                                     durable: true,  // 设置队列持久性
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
                Timer timer = new Timer();
                timer.Elapsed += new ElapsedEventHandler((object sender, ElapsedEventArgs e) => {
                    string message = "Hello World!" + " -- " + rd.Next(1000, 10000);
                    var body = Encoding.UTF8.GetBytes(message);
                    channel.BasicPublish(exchange: "",
                                        routingKey: "queue_task",
                                        basicProperties: properties,// 设置消息持久性
                                        body: body);
                    Console.WriteLine($"[{DateTime.Now}][Producer Test] Sent {message}");
                });
                timer.Interval = 1000;
                timer.Enabled = true;
                timer.Start();

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

RabbitMQ.Consumer.Program.cs

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;

namespace RabbitMQ.Consumer
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "queue_task",
                                     durable: true,  // 设置队列持久性
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine($"[{DateTime.Now}][Consumer Test] Received {message}");
                    Thread.Sleep(3000);
                    channel.BasicAck(ea.DeliveryTag, false);// Consumer返回消息处理完成确认信息
                    Console.WriteLine("完成确认信息返回...");
                };
                channel.BasicConsume(queue: "queue_task",
                                     autoAck: false,        // Consumer返回消息处理完成确认信息
                                     consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code 

        调整完之后编译发布后,执行Producer,在执行过程中关闭RabbitMQ服务后再开启,执行Consumer观察是否可以接收到消息。你会发现重启RabbitMQ服务后,Consumer会正常接收到消息。

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

        由上图所示,当关闭RabbitMQ后,Producer不再发送消息,打开RabbitMQ,Producer继续发送消息,Consumer从第一条消息4204开始接收处理。(中间报错是因为服务器反应有点慢)

注意:设置消息Persistent的属性并能够完全确保消息不会丢失。尽管告诉了RabbitMQ需要保存消息至硬盘,但是仍然会有一个RabbitMQ接收消息后还未保存的时间点。而且RabbitMQ不能同时保存所有的消息至硬盘,有时往往只会写入内存之中。Persistent虽然不够健壮,但是一般的队列任务都是没问题的,如果需要更加安全的消息保护机制,可以点击了解

Fair Dispatch-公平分发机制

        2013年的某个夏天,小明与小刚相约去搬砖,有两种类型运砖车,A类车的砖头搬完需要2个小时,B类车的砖头搬完需要3个小时,工头小华和小明是表兄弟,所以一直让小明负责A类车,这个时候问题就来了,小明一直搬A类车的砖,所以经常出现,小刚在累死累活的时候,小明在放空。这个时候被老板看见了,重新分配了一下,小刚当工头,小明和小华搬砖。当小明搬完A类车时,如果小华正在搬砖,这个时候不管来的是A类车还是B类车,小明都需要接着搬,要保证工作饱和。

        上面提到的 round-robin dispatching是RabbitMQ默认的消息分发机制:两个Consumer,奇数消息发送给A,偶数消息发送给B。看似没啥问题,但仔细一品,如果奇数类消息处理时间很长而偶数类消息处理时间很短,那么Consumer A总是忙个不停,而Consumer B却总是很闲,但是RabbitMQ并不知道这些,还是继续平均分发消息。

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

        发生上述问题的原因是RabbitMQ只会分发进入队列中的消息,它不会考虑每台Consumer还剩多少未确认(消息正在处理,并未返回确认)的消息,只会盲目的把第几条消息分发给第几台Consumer。为了解决这个问题,可以在Consumer程序中使用BasicQos方法,设置prefetchCount=1。这个会告诉RabbitMQ在同一时间不能发送超过一条消息给Consumer,换句话说,只有在Consumer处理完上一条消息并返回确认后才可以分发消息给这个Consumer。取而代之,会把消息发送给另一台空闲的Consumer。

 channel.BasicQos(0, 1, false);                                                                                                                     

注意:这样的设置需要注意一个问题,如果多个Consumer都很忙的时候,那么消息会一直堆在Queue中,你需要考虑到Queue的容量,多增加几台Consumer或者采用其他一些策略。

代码整合

RabbitMQ.Producer.Program.cs

using RabbitMQ.Client;
using System;
using System.Text;
using System.Timers;

namespace RabbitMQ.Producer
{
    class Program
    {
        static void Main(string[] args)
        {
            Random rd = new Random();
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;       // 设置消息持久性
                channel.QueueDeclare(queue: "queue_task",
                                     durable: true,  // 设置队列持久性
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
                Timer timer = new Timer();
                timer.Elapsed += new ElapsedEventHandler((object sender, ElapsedEventArgs e) => {
                    string message = "Hello World!" + " -- " + rd.Next(1000, 10000);
                    var body = Encoding.UTF8.GetBytes(message);
                    channel.BasicPublish(exchange: "",
                                        routingKey: "queue_task",
                                        basicProperties: properties,// 设置消息持久性
                                        body: body);
                    Console.WriteLine($"[{DateTime.Now}][Producer Test] Sent {message}");
                });
                timer.Interval = 1000;
                timer.Enabled = true;
                timer.Start();
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

RabbitMQ.Consumer.Program.cs

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;

namespace RabbitMQ.Consumer
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "queue_task",
                                     durable: true,  // 设置队列持久性
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
                // 设置每次最多可以接收多少条消息,prefetchCount=1时,表明只有消息处理完成且返回确认后方可接收新消息
                channel.BasicQos(0, 1, false);
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine($"[{DateTime.Now}][Consumer Test] Received {message}");
                    Thread.Sleep(3000);
                    // Consumer返回消息处理完成确认信息
                    channel.BasicAck(ea.DeliveryTag, false);
                    Console.WriteLine("完成确认信息返回...");
                };
                channel.BasicConsume(queue: "queue_task",
                                     autoAck: false,        // Consumer返回消息处理完成确认信息
                                     consumer: consumer);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

在Producer程序中,我们设置了队列及消息的持久性,在Consumer程序中,我们设置了消息确认机制及消息公平分发机制。

如果想多了解channel的属性,即IModel和IBasicProperties可以点击RabbitMQ .NET client API reference online进行深入学习。

五、发布与订阅

        在上个章节,我们创建了一个工作队列,他主要的职责就是接收、分发消息,每一条消息准确的分发给某一台Consumer。有时,这种分发方式并不满足于业务需求,例如泰康的客户信息更改,需要保证所有系统的客户信息一致,这时由客户中心分发更新后的客户信息,多个系统进行接收并同步更新。这一章节,我们将会讨论如果将一条消息同事分发给多个Consumer。这中模式被称为“发布/订阅”。

        在这个章节中,我们会创建一个简单的日志系统。首先建一个LogProducer程序用于发送日志消息,再建两个LogConsumer程序:LogConsumerWrite与LogConsumerPrint,前者用于把=将日志写入硬盘,后者用于将日志打印出来。在这个日志系统中,所有运行的LogConsumer程序都会接收到LogProducer发送的所有日志消息。本质上,就是LogProducer发送日志消息给所有运行的LogConsumer。

Exchanges-交换机

让我们重新回顾一下上几个章节中的名词:

  • Producer是一个发送消息的应用程序;
  • Queue是一个存储消息的缓冲区;
  • Consumer是一个接收消息的应用程序;

        在上几章类容,表面上我们看到的是Queue去发送与接收消息。下面介绍一下RabbitMQ中的完整消息模型。这种消息模型的核心思想是Producer从来不直接发送消息给Queue。事实上,通常情况下Producer甚至不知道消息是否发送至Queue。取而代之,Producer只能发送消息给交换机。

        交换机是一个非常简单的东西。它从一边接受来自Producer发送的消息,另一边把消息塞进Queue。但交换机必须得完全明白他接收的消息是干嘛用的。这条消息是加入指定的队列还是加入多个队列?或者这条消息是否丢弃?这些规则由交换机类型去定义。

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

交换机类型分为:direct,topic,headers,fanout。这次的日志系统以fanout为例,字面理解就是广播所有他接收到的消息给他所知道的队列。

我们可以使用以下命令行去获取服务器上交换机的列表:

sudo rabbitmqctl list_exchanges

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

        你会发现有很多amq.*的交换机,这些是默认创建的,暂时用不到。其中第一个名称为空的交换机,其实就是上面我们演示的Hellow World默认创建的交换机,因为Producer不能直接发送消息给Queue,必须通过交换机进行消息处理。

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
                     routingKey: "hello",
                     basicProperties: null,
                     body: body);

        上面的代码是之前的例子,exchange: "",表示这是一个默认的或无名的交换机。routingKey: "hello",表示消息将会发送到指定的队列-hello。

Temporary queues-临时队列

        还记得上面我们定义的hello和queue_task两个队列吗?定义个队列的名称是非常重要的,我们需要指出在同一队列中的Consumer。当你需要在Producer和Consumer中分享一个队列时,队列的名称就格外的重要,而且队列不能重复申明。但是这样就不符合接下来的日志系统的需求了,我们希望所有队列都能够接收到日志消息,而且接收到的是最新的消息。为了满足日志系统的要求,需要做下面两件事:

  • 第一,无论何时连接RabbitMQ我们都需要一个新的、空的队列。因此我们创建队列时需要一个随机的名称,最好系统可以挑选一个随机的名称给我们。
  • 第二,一旦Consumer断开联机,Queue就应该自动删除

在RabbitMQ.Client中提供了一个无参申明队列的方法,我们可以创建一个非持久、独特的、自动删除、自动命名的队列。

var queueName = channel.QueueDeclare().QueueName;

Bindings-绑定

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

了解如何创建交换机和队列后。我们现在需要告诉交换机去发送消息给队列,两者之间的关系叫做 binding。

 channel.QueueBind(queue: queueName, exchange: "logs", routingKey: ""); 

代码整合

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

        下面是示例代码,Producer程序看起来与之前的没啥不同。最大的不同是我们给exchange定义了一个名称,并清空了routingKey,因为在fanout交换机中,它是可以忽略的。

RabbitMQ.Exchange.Producer.Program.cs:

using RabbitMQ.Client;
using System;
using System.Text;
using System.Timers;

namespace RabbitMQ.Exchange.Producer
{
    class Program
    {
        static void Main(string[] args)
        {
            Random rd = new Random();
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
                Timer timer = new Timer();
                timer.Elapsed += new ElapsedEventHandler((object sender, ElapsedEventArgs e) =>
                {
                    string message = "Log Infoooo!" + " -- " + rd.Next(1000, 10000);
                    var body = Encoding.UTF8.GetBytes(message);
                    channel.BasicPublish(exchange: "logs",
                                        routingKey: "",
                                        basicProperties: null,
                                        body: body);
                    Console.WriteLine($"[{DateTime.Now}][Producer Test] Sent {message}");
                });
                timer.Interval = 1000;
                timer.Enabled = true;
                timer.Start();
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

RabbitMQ.Exchange.Consumer_LogDisk.Program.cs

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.IO;
using System.Text;

namespace RabbitMQ.Exchange.Consumer_LogDisk
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
                var queueName = channel.QueueDeclare().QueueName;
                channel.QueueBind(queue: queueName,
                                  exchange: "logs",
                                  routingKey: "");
                Console.WriteLine(" [*] Waiting for logs.");
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    using(var file = File.CreateText("loginfo.txt"))
                    {
                        file.Write(message);
                    }
                    // 记录消息
                    Console.WriteLine($"[{DateTime.Now}][Consumer Test] Save {message}");
                };
                channel.BasicConsume(queue: queueName,
                                     autoAck: true,       
                                     consumer: consumer);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

RabbitMQ.Exchange.Consumer_LogPrint.Program.cs

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace RabbitMQ.Exchange.Consumer_LogPrint
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
                var queueName = channel.QueueDeclare().QueueName;
                channel.QueueBind(queue: queueName,
                                  exchange: "logs",
                                  routingKey: "");
                Console.WriteLine(" [*] Waiting for logs.");
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    // 打印消息
                    Console.WriteLine($"[{DateTime.Now}][Consumer Test] Print {message}");
                };
                channel.BasicConsume(queue: queueName,
                                     autoAck: true,
                                     consumer: consumer);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

        需要注意的是,如果没有队列与交换机绑定,那么消息将会丢失。但在日志系统中,这个没啥问题,如果没有Consumer接收消息,可以将其丢弃,如果需要确保消息不能丢失的业务请慎用。

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

        在上面动图中,先查询Queue列表信息,打开Consumer_Save,再查询Queue列表,发现多了一个amq-gen-随机字符串的Queue。打开Producer开始发送消息,再打开Consumer_Print,再查询Queue列表,又多了一个。这个时候发现Consumer_Print接收的消息是最新发送的消息,Producer发送完消息就立马丢弃消息。最后关闭Consumer,再查询Queue列表,发现创建的随机Queue自动销毁。

六、路由

        第五章的内容,用了一个日志系统的例子简单了解了如果同时广播消息个多个Consumer。在这章内容中,我们将添加一个特性去实现Consumer只订阅消息中的一部分。例如,我们只需要将严重的错误消息记录在硬盘中,其他所有消息依然都需要打印在控制台上。

Bindings

上几个例子,我们已经了解了如何创建Bindings:

 channel.QueueBind(queue: queueName, exchange: "logs", routingKey: ""); 

Binding是交换机与队列之间的关联。通俗来说,队列对来自交换机的消息感兴趣。Bindings可以传递一个额外的routingKey参数。为了避免与BasicPublish方法中的参数混淆,我们可以叫他 binding key。

 channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: "black"); 

Binding的意义取决于交换机的类型,扇形交换机完全忽略了他的价值。

Direct exchange

        上面日志系统的示例将发送所有日志给所有Consumer。我们对它进行扩展,可以基于日志的严重程度去过滤日志消息。例如,我们想只要接收到严重错误日志是,才会记录至硬盘,不会浪费硬盘空间去记录哪些信息、警告啥的。

       为了实现上述需求,我们将用direct交换机去替代fanout交换机。隐藏在direct交换机后的路由算法程序是比较简单的,消息中的routing key与队列中的binding key匹配时,该消息将会被塞进该队列中。

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

        上图所示,direct交换机与两个Queue绑定,routing key=organge月Q1绑定,routing key=black、green与Q2绑定,C1与Q1绑定,只接收organge消息,C2与Q2绑定,可以接收到black、green消息。

Multiple bindings

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

多重绑定,多个Queue绑定同一个路由key是完全没有问题的。在上上张图中,我们把通过key black将X与Q2绑定一起,这张图通过key black 将X 与 Q1、Q2绑定在一起,实现了类似于fanout交换机的功能,广播消息给所有的队列。

Emitting logs

我们用direct交换机来改进日志分发系统,将日志严重等级作为routing key,将direct exchage与Queue绑定一起。

首先,创建一个交换机

 channel.ExchangeDeclare(exchange: "direct_logs", type: "direct"); 

发送消息

var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs",
                     routingKey: severity,
                     basicProperties: null,
                     body: body);

为了简化程序,我们假设日志严重等级为:info、warning、error

Subscribing--订阅

var queueName = channel.QueueDeclare().QueueName;

foreach(var severity in args)
{
    channel.QueueBind(queue: queueName,
                      exchange: "direct_logs",
                      routingKey: severity);
}

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

示例代码如下:

RabbitMQ.DirectExchange.Producer.Program.cs

using RabbitMQ.Client;
using System;
using System.Text;
using System.Timers;

namespace RabbitMQ.DirectExchange.Producer
{
    class Program
    {
        private static readonly string[] LogLevel = new string[] { "Info", "Debug", "Warn","Error"};
        static void Main(string[] args)
        {
            Random rd = new Random();
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs_direct", type: ExchangeType.Direct);
                Timer timer = new Timer();
                timer.Elapsed += new ElapsedEventHandler((object sender, ElapsedEventArgs e) =>
                {
                    string loglevel = LogLevel[rd.Next(0, 4)];
                    string message = $"[{loglevel}] -- Log Infoooo!";
                    var body = Encoding.UTF8.GetBytes(message);
                    channel.BasicPublish(exchange: "logs_direct",
                                        routingKey: loglevel,
                                        basicProperties: null,
                                        body: body);
                    Console.WriteLine($"[{DateTime.Now}][Producer Test] Sent {message}");
                });
                timer.Interval = 1000;
                timer.Enabled = true;
                timer.Start();
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

RabbitMQ.DirectExchange.Consumer_LogDisk.Program.cs

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.IO;
using System.Text;

namespace RabbitMQ.DirectExchange.Consumer_LogDisk
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs_direct", type: ExchangeType.Direct);
                var queueName = channel.QueueDeclare().QueueName;
                channel.QueueBind(queue: queueName,
                                  exchange: "logs_direct",
                                  routingKey: "Error");// 只接收Error级别的消息,并写入硬盘
                Console.WriteLine(" [*] Waiting for logs.");
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    using (var file = File.CreateText("loginfo.txt"))
                    {
                        file.Write(message);
                    }
                    // 记录消息
                    Console.WriteLine($"[{DateTime.Now}][Consumer Test] Save {message}");
                };
                channel.BasicConsume(queue: queueName,
                                     autoAck: true,
                                     consumer: consumer);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

RabbitMQ.DirectExchange.Consumer_LogPrint.Program.cs

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace RabbitMQ.DirectExchange.Consumer_LogPrint
{
    class Program
    {
        private static readonly string[] LogLevel = new string[] { "Info", "Debug", "Warn", "Error" };

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs_direct", type: ExchangeType.Direct);
                var queueName = channel.QueueDeclare().QueueName;
                foreach (var item in LogLevel)
                {
                    channel.QueueBind(queue: queueName,
                                  exchange: "logs_direct",
                                  routingKey: item);// 接收所有级别的消息,并打印出来
                }
                Console.WriteLine(" [*] Waiting for logs.");
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine($"[{DateTime.Now}][Consumer Test] Save {message}");
                };
                channel.BasicConsume(queue: queueName,
                                     autoAck: true,
                                     consumer: consumer);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

执行上面的代码:

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

两个队列,一个只接收Error类型的日志写入硬盘,另一个接收所有类型的日志,打印在控制台。

七、主题

        上一章节我们优化了日志记录系统。我们用direct exchange取代了fanout exchange(只能进行虚拟广播)实现了选择性接收日志消息。但是它仍然有一定的局限性-他不能基于多个条件进行路由选择。

        在我们的日志系统,我们可能不仅仅基于日志严重程度去进行订阅消息,还需要根据日志的来源进行分发日志消息。比如不同系统发布的日志需要进行不同的操作。例如,我们只想监听CRM所有的日志信息,Portal的只监听错误日志信息。为了实现上述需求,我们需要了解更加复杂的交换机 - topic exchange。

Topic exchange

        发送给topic交换机的消息的routing-key有特殊的格式要求,必需是以 点 分隔的字符串。字符串内容可以随便定义,但通常都是以消息的特性去定义。例如泰康项目中,客户中心MQ消息:q.tkzj-ykcrm.khzx1。routing-key可以定义的很长,最长为255个字节。binding-key必须也要用同样一种格式。topic交换机的逻辑与direct交换机类似--使用特定的routing-key发送的消息将被传送到使用匹配binding-key的所有队列之中。下面是binding-key的两种重要的匹配符:

  • *(星号)可以匹配一个单词。
  • #(井号)可以匹配0个或多个单词。

如下图所示:

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

        如上图所示,我们将要发送描述动物的消息。routing-key的格式为 <speed>.<color>.<species>。我们创建三个绑定,Q1的bingding-key为*.orange.* 。Q2的bingding-key为 *.*.rabbit 与l azy.#。

总结一下,Q1将接收到所有颜色为橙色的动物消息。Q2将接收到所有兔子类和迟钝的动物消息。

routing-key为 quick.orange.rabbit 的消息,将会同时发送给Q1与Q2。routing-key为 lazy.orange.elephant 也会同时发送给Q1与Q2。routing-key为 quick.roange.fox的消息,将只会发送给Q1。routing-key为 lazy.brown.fox的消息只会发送给Q2。

如果我们将routing-key设置为 quick.orange.male.rabbit 将匹配不到任何绑定,该消息将会丢失。

代码整合

其实出去名称与binding-key不同,topic交换机与direct交换机非常类似,如果binding-key中除去 * 与 #,那topic与direct就完全一样了。

下面我们将用topic交换机去改进日志系统。我们假设日志消息的routing-key有两个词 <facility>.<severity>。

RabbitMQ.DirectExchange.Producer.Program.cs:

using RabbitMQ.Client;
using System;
using System.Text;
using System.Timers;

namespace RabbitMQ.DirectExchange.Producer
{
    class Program
    {
        private static readonly string[] Source = new string[] { "Crm", "Portal", "Interface" };
        private static readonly string[] LogLevel = new string[] { "Info", "Debug", "Warn", "Error" };
        static void Main(string[] args)
        {
            Random rd = new Random();
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(
                    exchange: "logs_topic",
                    type: ExchangeType.Topic
                    );
                Timer timer = new Timer();
                timer.Elapsed += new ElapsedEventHandler((object sender, ElapsedEventArgs e) =>
                {
                    string source = Source[rd.Next(0, 3)];
                    string loglevel = LogLevel[rd.Next(0, 4)];
                    string message = $"[{source}][{loglevel}] -- Log Infoooo!";
                    var body = Encoding.UTF8.GetBytes(message);
                    channel.BasicPublish(exchange: "logs_topic",
                                        // routingKey根据排列组合可以有12种
                                        routingKey: string.Concat(source, '.', loglevel),
                                        basicProperties: null,
                                        body: body);
                    Console.WriteLine($"[{DateTime.Now}][Producer Test] Sent {message}");
                });
                timer.Interval = 1000;
                timer.Enabled = true;
                timer.Start();
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

RabbitMQ.TopicExchange.Consumer_LogPrint.Program.cs:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace RabbitMQ.TopicExchange.Consumer_LogPrint
{
    class Program
    {
        private static readonly string[] Source = new string[] { "Crm", "Portal", "Interface" };
        private static readonly string[] LogLevel = new string[] { "Info", "Debug", "Warn", "Error" };

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs_topic", type: ExchangeType.Topic);
                var queueName = channel.QueueDeclare().QueueName;
                // 打印Crm所有消息
                channel.QueueBind(queue: queueName, exchange: "logs_topic", routingKey: "Crm.*");
                // 打印Portal、接口错误消息
                channel.QueueBind(queue: queueName, exchange: "logs_topic", routingKey: "*.Error");
                Console.WriteLine(" [*] Waiting for logs.");
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine($"[{DateTime.Now}][Consumer Test] Save {message}");
                };
                channel.BasicConsume(queue: queueName,
                                     autoAck: true,
                                     consumer: consumer);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

RabbitMQ.TopicExchange.Consumer_LogDisk.Program.cs:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.IO;
using System.Text;

namespace RabbitMQ.TopicExchange.Consumer_LogDisk
{
    class Program
    {
        private static readonly string[] Source = new string[] { "Crm", "Portal", "Interface" };
        private static readonly string[] LogLevel = new string[] { "Info", "Debug", "Warn", "Error" };
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs_topic", type: ExchangeType.Topic);
                var queueName = channel.QueueDeclare().QueueName;
                // 存储所有系统的错误消息
                channel.QueueBind(queue: queueName, exchange: "logs_topic", routingKey: "#.Error");
                Console.WriteLine(" [*] Waiting for logs.");
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    using (var file = File.CreateText("loginfo.txt"))
                    {
                        file.Write(message);
                    }
                    // 记录消息
                    Console.WriteLine($"[{DateTime.Now}][Consumer Test] Save {message}");
                };
                channel.BasicConsume(queue: queueName,
                                     autoAck: true,
                                     consumer: consumer);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

        上述动图,我们可以看出控制台打印出所有Crm系统所有的消息以及Portal与接口的错误消息;写入硬盘的Customer接收到了所有系统的错误消息。

        上面几章节内容的示例都是以每个新概念编写的简单的代码,例如direct、topic交换机的代码中省略了连接管理、错误处理、连接恢复、并发性的处理方式,谨记在上生产前应考虑周全。

八、Publisher Confirms--发布者确认

        发布者确认是RabbitMQ的一种去提高发布消息可靠性的扩展。当在通道上启用发布者确认时,客户端发布的消息将由代理异步确认,这意味着它们已在服务器端得到处理。在这一章节中我们将使用发布者确认机制去保证发布的消息能够安全的到达Consumer。这将涉及到几种发布者确认的策略以及他们的优缺点。

Enabling Publisher Confirms on a Channel--在Channel上开启发布者确认

发布者确认机制是AMQP协议的一种扩展,所以他不会默认开启。使用ConfirmSelect方法,启用channel上的发布者确认机制:

 var channel = connection.CreateModel(); channel.ConfirmSelect(); 

开启发布者确认机制的方法只使用一次,不需要每条发送的消息都使用。

策略一:单独发布消息确认

让我们从使用confirms发布消息的最简单方法开始,即发布消息并同步等待消息的确认:

while (ThereAreMessagesToPublish())
{
    byte[] body = ...;
    BasicProperties properties = ...;
    channel.BasicPublish(exchange, queue, properties, body);
    // uses a 5 second timeout
    channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));
}

        在上面的示例中,我们像往常一样发布消息,并使用WaitForConfirmsOrDie方法等待消息的确认。消息确认后,该方法立即返回。如果消息在超时时间内没有得到确认,或者消息是nack-ed(这意味着代理由于某种原因无法处理它),那么该方法将抛出一个异常。异常的处理通常包括记录错误消息和/或重试发送消息。

        这种确认方式十分直接了当,但也有一个缺点:他在等待的同时阻止了后续消息的发布,大大降低了消息发布效率。这种方法不适用于一秒钟可以发几百条消息的情景。但是如果消息发布频率低的话,这种方法还是足够好的。

策略二:批量发布消息确认

为了优化上面的示例,我们可以按批次去确认消息发布情况。下面以100条一验证:

var batchSize = 100;
var outstandingMessageCount = 0;
while (ThereAreMessagesToPublish())
{
    byte[] body = ...;
    BasicProperties properties = ...;
    channel.BasicPublish(exchange, queue, properties, body);
    outstandingMessageCount++;
    if (outstandingMessageCount == batchSize)
    {
        channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));
        outstandingMessageCount = 0;
    }
}
if (outstandingMessageCount > 0)
{
    channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));
}

        与等待单个消息的确认相比,等待一批消息被确认大大提高了吞吐量(对于远程RabbitMQ节点,可以达到20-30倍)。但有一个缺点是,我们不知道在失败的情况下到底出了什么问题,所以我们可能必须在内存中保留一整批数据,以便记录有意义的内容或重新发布消息。而且这个解决方案仍然是同步的,所以它阻止了消息的发布。

策略三:异步处理发布消息确认

代理以异步方式确认已发布的消息,只需在客户端上注册一个回调即可收到这些确认的通知:

var channel = connection.CreateModel();
channel.ConfirmSelect();
channel.BasicAcks += (sender, ea) =>
{
  // code when message is confirmed
};
channel.BasicNacks += (sender, ea) =>
{
  //code when message is nack-ed
};

有两个回调:一个用于确认消息,一个用于nack-ed消息(代理可以认为丢失的消息)。两个回调都有一个对应的EventArgs参数(ea),其中包含:

  • delivery tag:识别已确认或未经确认的报文的序列号。我们将很快看到如何将它与发布的消息关联起来。
  • multiple:这是一个布尔值。如果为false,则只有一条消息被确认/nack-ed;如果为true,则序列号较低或相等的所有消息都被确认/nack-ed。

在发布消息前,可以通过Channel中的NextPublishSeqNo去获取消息的序列号:

 var sequenceNumber = channel.NextPublishSeqNo; channel.BasicPublish(exchange, queue, properties, body); 

用Dictionary将消息去序列号关联起来。我们假设发布的是字符串因为他们容易转换成字节数组。:

var outstandingConfirms = new ConcurrentDictionary<ulong, string>();
// ... code for confirm callbacks will come later
var body = "...";
outstandingConfirms.TryAdd(channel.NextPublishSeqNo, body);
channel.BasicPublish(exchange, queue, properties, Encoding.UTF8.GetBytes(body));

发布代码现在使用字典跟踪出站消息。当确认到达时,我们需要清理此字典,并在消息被取消时记录警告:

var outstandingConfirms = new ConcurrentDictionary<ulong, string>();

void cleanOutstandingConfirms(ulong sequenceNumber, bool multiple)
{
    if (multiple)
    {
        var confirmed = outstandingConfirms.Where(k => k.Key <= sequenceNumber);
        foreach (var entry in confirmed)
        {
            outstandingConfirms.TryRemove(entry.Key, out _);
        }
    }
    else
    {
        outstandingConfirms.TryRemove(sequenceNumber, out _);
    }
}

channel.BasicAcks += (sender, ea) => cleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
channel.BasicNacks += (sender, ea) =>
{
    outstandingConfirms.TryGetValue(ea.DeliveryTag, out string body);
    Console.WriteLine($"Message with body {body} has been nack-ed. Sequence number: {ea.DeliveryTag}, multiple: {ea.Multiple}");
    cleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
};
// ... publishing code

        上述示例包含一个回调,当确认到达时,该回调将清理字典。注意这个回调同时处理单个和多个确认。当确认到达时使用此回调(Channel.BasicAcks)。nack-ed消息的回调将检索消息正文并发出警告。然后,它重新使用先前的回调来清除字典中未完成的确认(无论消息是确认的还是nack的,它们在字典中的对应条目都必须被删除)。

总结一下,异步发布确认通常需要一下几个步骤:

  • 提供一个序列号与消息关联起来:例如使用 ConcurrentDictionary
  • 在Channel上注册一个确认监听事件,当发布的消息到达时,无论是ack或者nack-ed,都会观察到并作出合适的操作,例如记录nack-ed消息或者重新发布nack-ed消息,同时清理ConsucurrentDictionary中序列号与消息的关联。
  • 在消息发布之前追踪序列号。

代码整合

using RabbitMQ.Client;
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Text;
using System.Linq;
using System.Threading;

class PublisherConfirms
{
    private const int MESSAGE_COUNT = 50_000;

        public static void Main()
        {
            PublishMessagesIndividually();
            PublishMessagesInBatch();
            HandlePublishConfirmsAsynchronously();
        }

        private static IConnection CreateConnection()
        {
            var factory = new ConnectionFactory { HostName = "localhost" };
            return factory.CreateConnection();
        }

        private static void PublishMessagesIndividually()
        {
            using (var connection = CreateConnection())
            using (var channel = connection.CreateModel())
            {
                var queueName = channel.QueueDeclare().QueueName;
                channel.ConfirmSelect();

                var timer = new Stopwatch();
                timer.Start();
                for (int i = 0; i < MESSAGE_COUNT; i++)
                {
                    var body = Encoding.UTF8.GetBytes(i.ToString());
                    channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
                    channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));
                }
                timer.Stop();
                Console.WriteLine($"Published {MESSAGE_COUNT:N0} messages individually in {timer.ElapsedMilliseconds:N0} ms");
            }
        }

        private static void PublishMessagesInBatch()
        {
            using (var connection = CreateConnection())
            using (var channel = connection.CreateModel())
            {
                var queueName = channel.QueueDeclare().QueueName;
                channel.ConfirmSelect();

                var batchSize = 100;
                var outstandingMessageCount = 0;
                var timer = new Stopwatch();
                timer.Start();
                for (int i = 0; i < MESSAGE_COUNT; i++)
                {
                    var body = Encoding.UTF8.GetBytes(i.ToString());
                    channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
                    outstandingMessageCount++;

                    if (outstandingMessageCount == batchSize)
                    {
                        channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));
                        outstandingMessageCount = 0;
                    }
                }

                if (outstandingMessageCount > 0)
                    channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));

                timer.Stop();
                Console.WriteLine($"Published {MESSAGE_COUNT:N0} messages in batch in {timer.ElapsedMilliseconds:N0} ms");
            }
        }

        private static void HandlePublishConfirmsAsynchronously()
        {
            using (var connection = CreateConnection())
            using (var channel = connection.CreateModel())
            {
                var queueName = channel.QueueDeclare().QueueName;
                channel.ConfirmSelect();

                var outstandingConfirms = new ConcurrentDictionary<ulong, string>();

                void cleanOutstandingConfirms(ulong sequenceNumber, bool multiple)
                {
                    if (multiple)
                    {
                        var confirmed = outstandingConfirms.Where(k => k.Key <= sequenceNumber);
                        foreach (var entry in confirmed)
                            outstandingConfirms.TryRemove(entry.Key, out _);
                    }
                    else
                        outstandingConfirms.TryRemove(sequenceNumber, out _);
                }

                channel.BasicAcks += (sender, ea) => cleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
                channel.BasicNacks += (sender, ea) =>
                {
                    outstandingConfirms.TryGetValue(ea.DeliveryTag, out string body);
                    Console.WriteLine($"Message with body {body} has been nack-ed. Sequence number: {ea.DeliveryTag}, multiple: {ea.Multiple}");
                    cleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
                };

                var timer = new Stopwatch();
                timer.Start();
                for (int i = 0; i < MESSAGE_COUNT; i++)
                {
                    var body = i.ToString();
                    outstandingConfirms.TryAdd(channel.NextPublishSeqNo, i.ToString());
                    channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: Encoding.UTF8.GetBytes(body));
                }

                if (!WaitUntil(60, () => outstandingConfirms.IsEmpty))
                    throw new Exception("All messages could not be confirmed in 60 seconds");

                timer.Stop();
                Console.WriteLine($"Published {MESSAGE_COUNT:N0} messages and handled confirm asynchronously {timer.ElapsedMilliseconds:N0} ms");
            }
        }

        private static bool WaitUntil(int numberOfSeconds, Func<bool> condition)
        {
            int waited = 0;
            while(!condition() && waited < numberOfSeconds * 1000)
            {
                Thread.Sleep(100);
                waited += 100;
            }

            return condition();
        }
}
View Code

九、扩展

UI管理工具

该工具是包含在RabbitMQ中插件中的一种,使用前需要开启:

 rabbitmq-plugins enable rabbitmq_management 

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

开启后访问 http://localhost:15672/

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

这个时候需要用命令行去创建一个管理员账号:

# 创建用户
rabbitmqctl add_user <account> <password>
# 给用户一个管理员角色
rabbitmqctl set_user_tags <account> administrator

权限管理:

Tag Capabilities
(None) 没有权限使用管理插件
management 用户通过消息传递协议可以进行任何查找:
  • 列出所有他们可以通过AMQP登录的虚拟机
  • 查看他们虚拟机中所有的Queue、Exchange、Binding
  • 查看和关闭他们自己的Channel、Connection
  • 查看涵盖其所有虚拟主机的“全局”统计信息,包括其中其他用户的活动
policymaker 在management权限的基础上增加:
  • 查看、创建、删除他们可以通过AMQP登录的虚拟机的策略与参数
monitoring 在management权限的基础上增加:
  • 列出所有虚拟主机,包括无法使用消息传递协议访问的虚拟主机
  • 查看其它用户的Connection、Channel
  • 查看节点级数据,如内存使用和集群
  • 查看所有虚拟机的全局信息
administrator 在policymaker与monitoring权限的基础上增加:
  • 查看、创建、删除虚拟机
  • 查看、创建、删除用户
  • 查看、创建、删除权限
  • 关闭其它用户的连接

至于UI管理的运用,可以自己动手试一试。

配置文件

由于RabbitMQ安装包中不包含配置文件,我们可以从官方下载---点击下载 ,下载后命名为 rabbitmq.conf。

将配置文件放入下图位置:

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

修改配置文件后需要重新启动RabbitMQ节点。

控制命令行

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

有兴趣的可以自行查看他们的功能,例如rabbitmqctl -help,会列出其所有的命令行:

rabbitmqctl add_user <account> <password>   增加用户

rabbitmqctl set_user_tags <account> <tag> 赋角色

rabbitmqctl delete_user <account> 删除用户

rabbitmqctl list_exchanges 列出所有交换机

......

RabbitMQ UI 站点还提供了API的信息 可以访问 http://localhost:15672/api/index.html 里面包含了 api 的信息及其应用示例。

现在我们简单的了解一个发送消息的接口:

注释掉之前RabbitMQ.Exchange.Producer.Program.cs中自动发消息的代码,由于我们增加了用户,所以可以通过账号密码进行连接(注意:账号区分大小写!!!)。

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API同时运行,Producer、Consumer,打开Manager UI,观察变化:

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP APIRabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

点击 queue_task 进去队列详情,可以发送消息,(测试、补发 使用):

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

准备开始调用接口,安装postman工具,获取接口授权,授权方式是HTTP基本身份验证:

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

输入参数点击发送

RabbitMQ在.NetCore中的基础应用
RabbitMQ Management HTTP API

        虽然API中有发布消息的接口,但RabbitMQ官方不建议这种发布消息的方式,对于高频率的消息推送,每次都创建一条TCP连接,对性能有不小的影响。当然除此之外还有很多其他的API,大家有兴趣可以去看一哈。

文档参考 www.rabbitmq.com