.NET Core 使用RabbitMQ RabbitMQ简介 RabbitMQ安装 RabbitMQ常用命令 .NET Core 使用RabbitMQ RabbitMQ消费失败的处理 使用RabbitMQ的Exchange

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

RabbitMQ提供了可靠的消息机制、跟踪机制和灵活的消息路由,支持消息集群和分布式部署。适用于排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、CQRS等应用场景。

RabbitMQ安装

RabbitMQ安装,网上已经有许多教程了,这里简单介绍一下在CentOS下安装RabbitMQ。使用的版本为3.6.12最新版。

1.首先安装erlang

rpm -Uvh https://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm

2.然后安装socat

yum install socat

3.最后安装RabbitMQ

rpm -Uvh https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.12/rabbitmq-server-3.6.12-1.el7.noarch.rpm

RabbitMQ常用命令

启用Web控制台

rabbitmq-plugins enable rabbitmq_management

开启服务

systemctl start rabbitmq-server.service

停止服务

systemctl stop rabbitmq-server.service

查看服务状态

systemctl status rabbitmq-server.service

查看RabbitMQ状态

rabbitmqctl status

添加用户赋予管理员权限

rabbitmqctl  add_user  username  password
rabbitmqctl  set_user_tags  username  administrator

查看用户列表

rabbitmqctl list_users

删除用户

rabbitmqctl delete_user username

修改用户密码

rabbitmqctl oldPassword Username newPassword

访问Web控制台

http://服务器ip:15672/ 注意配置防火墙,默认用户名密码都是guest,若新建用户一定要记得配置权限。

.NET Core 使用RabbitMQ
RabbitMQ简介
RabbitMQ安装
RabbitMQ常用命令
.NET Core 使用RabbitMQ
RabbitMQ消费失败的处理
使用RabbitMQ的Exchange

.NET Core 使用RabbitMQ

通过nuget安装:https://www.nuget.org/packages/RabbitMQ.Client/

定义生产者
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
    UserName = "admin",//用户名
    Password = "admin",//密码
    HostName = "192.168.157.130"//rabbitmq ip
};

//创建连接
var connection = factory.CreateConnection();
//创建通道
var channel = connection.CreateModel();
//声明一个队列
channel.QueueDeclare("hello", false, false, false, null);

Console.WriteLine("
RabbitMQ连接成功,请输入消息,输入exit退出!");

string input;
do
{
    input = Console.ReadLine();

    var sendBytes = Encoding.UTF8.GetBytes(input);
    //发布消息
    channel.BasicPublish("", "hello", null, sendBytes);

} while (input.Trim().ToLower()!="exit");
channel.Close();
connection.Close();
定义消费者
  //创建连接工厂
            ConnectionFactory factory = new ConnectionFactory
            {
                UserName = "admin",//用户名
                Password = "admin",//密码
                HostName = "192.168.157.130"//rabbitmq ip
            };

            //创建连接
            var connection = factory.CreateConnection();
            //创建通道
            var channel = connection.CreateModel();

            //事件基本消费者
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

            //接收到消息事件
            consumer.Received += (ch, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body);
                Console.WriteLine($"收到消息: {message}");
                //确认该消息已被消费
                channel.BasicAck(ea.DeliveryTag, false);
            };
            //启动消费者 设置为手动应答消息
            channel.BasicConsume("hello", false, consumer);
            Console.WriteLine("消费者已启动");
            Console.ReadKey();
            channel.Dispose();
            connection.Close();
运行

.NET Core 使用RabbitMQ
RabbitMQ简介
RabbitMQ安装
RabbitMQ常用命令
.NET Core 使用RabbitMQ
RabbitMQ消费失败的处理
使用RabbitMQ的Exchange

启动了一个生产者,两个消费者,可以看见两个消费者都能收到消息,消息投递到哪个消费者是由RabbitMQ决定的。

RabbitMQ消费失败的处理

RabbitMQ采用消息应答机制,即消费者收到一个消息之后,需要发送一个应答,然后RabbitMQ才会将这个消息从队列中删除,如果消费者在消费过程中出现异常,断开连接切没有发送应答,那么RabbitMQ会将这个消息重新投递。

修改一下消费者的代码:

//接收到消息事件
consumer.Received += (ch, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body);

    Console.WriteLine($"收到消息: {message}");

    Console.WriteLine($"收到该消息[{ea.DeliveryTag}] 延迟10s发送回执");
    Thread.Sleep(10000);
    //确认该消息已被消费
    channel.BasicAck(ea.DeliveryTag, false);
    Console.WriteLine($"已发送回执[{ea.DeliveryTag}]");
};

演示:

.NET Core 使用RabbitMQ
RabbitMQ简介
RabbitMQ安装
RabbitMQ常用命令
.NET Core 使用RabbitMQ
RabbitMQ消费失败的处理
使用RabbitMQ的Exchange

从图中可以看出,设置了消息应答延迟10s,如果在这10s中,该消费者断开了连接,那么消息会被RabbitMQ重新投递。

使用RabbitMQ的Exchange

前面我们可以看到生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的情况是,生产者将消息发送到Exchange(交换器),由Exchange将消息路由到一个或多个Queue中(或者丢弃)

.NET Core 使用RabbitMQ
RabbitMQ简介
RabbitMQ安装
RabbitMQ常用命令
.NET Core 使用RabbitMQ
RabbitMQ消费失败的处理
使用RabbitMQ的Exchange

AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储。同理,消费者也是如此。Exchange 就类似于一个交换机,转发各个消息分发到相应的队列中。

RabbitMQ提供了四种Exchange模式:direct,fanout,topic,header 。但是 header模式在实际使用中较少,所以这里只介绍前三种模式。

Exchange不是消费者关心的,所以消费者的代码完全不用变,用上面的消费者就行了。
由于避免文章过长,影响阅读,所以只贴了部分代码,但是demo里面是完整可运行的,详细代码请查看demo。

Direct Exchange

.NET Core 使用RabbitMQ
RabbitMQ简介
RabbitMQ安装
RabbitMQ常用命令
.NET Core 使用RabbitMQ
RabbitMQ消费失败的处理
使用RabbitMQ的Exchange

所有发送到Direct Exchange的消息被转发到具有指定RouteKey的Queue。

Direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。

 1 //创建连接
 2 var connection = factory.CreateConnection();
 3 //创建通道
 4 var channel = connection.CreateModel();
 5 
 6 //定义一个Direct类型交换机
 7 channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);
 8 
 9 //定义一个队列
10 channel.QueueDeclare(queueName, false, false, false, null);
11 
12 //将队列绑定到交换机
13 channel.QueueBind(queueName, exchangeName, routeKey, null);

运行:

.NET Core 使用RabbitMQ
RabbitMQ简介
RabbitMQ安装
RabbitMQ常用命令
.NET Core 使用RabbitMQ
RabbitMQ消费失败的处理
使用RabbitMQ的Exchange

Fanout Exchange

.NET Core 使用RabbitMQ
RabbitMQ简介
RabbitMQ安装
RabbitMQ常用命令
.NET Core 使用RabbitMQ
RabbitMQ消费失败的处理
使用RabbitMQ的Exchange

所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。

Fanout Exchange 不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。

所以,Fanout Exchange 转发消息是最快的。

为了演示效果,定义了两个队列,分别为hello1,hello2,每个队列都拥有一个消费者。

 1 static void Main(string[] args)
 2 {
 3     string exchangeName = "TestFanoutChange";
 4     string queueName1 = "hello1";
 5     string queueName2 = "hello2";
 6     string routeKey = "";
 7 
 8     //创建连接工厂
 9     ConnectionFactory factory = new ConnectionFactory
10     {
11         UserName = "admin",//用户名
12         Password = "admin",//密码
13         HostName = "192.168.2.6"//rabbitmq ip
14     };
15 
16     //创建连接
17     var connection = factory.CreateConnection();
18     //创建通道
19     var channel = connection.CreateModel();
20 
21     //定义一个Direct类型交换机
22     channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null);
23 
24     //定义队列1
25     channel.QueueDeclare(queueName1, false, false, false, null);
26     //定义队列2
27     channel.QueueDeclare(queueName2, false, false, false, null);
28 
29     //将队列绑定到交换机
30     channel.QueueBind(queueName1, exchangeName, routeKey, null);
31     channel.QueueBind(queueName2, exchangeName, routeKey, null);
32 
33     //生成两个队列的消费者
34     ConsumerGenerator(queueName1);
35     ConsumerGenerator(queueName2);
36 
37 
38     Console.WriteLine($"
RabbitMQ连接成功,

请输入消息,输入exit退出!");
39 
40     string input;
41     do
42     {
43         input = Console.ReadLine();
44 
45         var sendBytes = Encoding.UTF8.GetBytes(input);
46         //发布消息
47         channel.BasicPublish(exchangeName, routeKey, null, sendBytes);
48 
49     } while (input.Trim().ToLower() != "exit");
50     channel.Close();
51     connection.Close();
52 }
53 
54 /// <summary>
55 /// 根据队列名称生成消费者
56 /// </summary>
57 /// <param name="queueName"></param>
58 static void ConsumerGenerator(string queueName)
59 {
60     //创建连接工厂
61     ConnectionFactory factory = new ConnectionFactory
62     {
63         UserName = "admin",//用户名
64         Password = "admin",//密码
65         HostName = "192.168.157.130"//rabbitmq ip
66     };
67 
68     //创建连接
69     var connection = factory.CreateConnection();
70     //创建通道
71     var channel = connection.CreateModel();
72 
73     //事件基本消费者
74     EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
75 
76     //接收到消息事件
77     consumer.Received += (ch, ea) =>
78     {
79         var message = Encoding.UTF8.GetString(ea.Body);
80 
81         Console.WriteLine($"Queue:{queueName}收到消息: {message}");
82         //确认该消息已被消费
83         channel.BasicAck(ea.DeliveryTag, false);
84     };
85     //启动消费者 设置为手动应答消息
86     channel.BasicConsume(queueName, false, consumer);
87     Console.WriteLine($"Queue:{queueName},消费者已启动");
88 }

运行:

.NET Core 使用RabbitMQ
RabbitMQ简介
RabbitMQ安装
RabbitMQ常用命令
.NET Core 使用RabbitMQ
RabbitMQ消费失败的处理
使用RabbitMQ的Exchange

Topic Exchange

.NET Core 使用RabbitMQ
RabbitMQ简介
RabbitMQ安装
RabbitMQ常用命令
.NET Core 使用RabbitMQ
RabbitMQ消费失败的处理
使用RabbitMQ的Exchange

所有发送到Topic Exchange的消息被转发到能和Topic匹配的Queue上,

Exchange 将路由进行模糊匹配。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“XiaoChen.#”能够匹配到“XiaoChen.pets.cat”,但是“XiaoChen.*” 只会匹配到“XiaoChen.money”。

所以,Topic Exchange 使用非常灵活。

 1 string exchangeName = "TestTopicChange";
 2 string queueName = "hello";
 3 string routeKey = "TestRouteKey.*";
 4 
 5 //创建连接工厂
 6 ConnectionFactory factory = new ConnectionFactory
 7 {
 8     UserName = "admin",//用户名
 9     Password = "admin",//密码
10     HostName = "192.168.2.6"//rabbitmq ip
11 };
12 
13 //创建连接
14 var connection = factory.CreateConnection();
15 //创建通道
16 var channel = connection.CreateModel();
17 
18 //定义一个Direct类型交换机
19 channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, false, false, null);
20 
21 //定义队列1
22 channel.QueueDeclare(queueName, false, false, false, null);
23 
24 //将队列绑定到交换机
25 channel.QueueBind(queueName, exchangeName, routeKey, null);
26 
27 
28 
29 Console.WriteLine($"
RabbitMQ连接成功,

请输入消息,输入exit退出!");
30 
31 string input;
32 do
33 {
34     input = Console.ReadLine();
35 
36     var sendBytes = Encoding.UTF8.GetBytes(input);
37     //发布消息
38     channel.BasicPublish(exchangeName, "TestRouteKey.one", null, sendBytes);
39 
40 } while (input.Trim().ToLower() != "exit");
41 channel.Close();
42 connection.Close();
运行

.NET Core 使用RabbitMQ
RabbitMQ简介
RabbitMQ安装
RabbitMQ常用命令
.NET Core 使用RabbitMQ
RabbitMQ消费失败的处理
使用RabbitMQ的Exchange