NET Core 2.0利用MassTransit集成RabbitMQ

NET Core 2.0利用MassTransit集成RabbitMQ

NET Core 2.0利用MassTransit集成RabbitMQ

https://www.cnblogs.com/Andre/p/9579764.html

在ASP.NET Core上利用MassTransit来集成使用RabbitMQ真的很简单,代码也很简洁。近期因为项目需要,我便在这基础上再次进行了封装,抽成了公共方法,使得使用RabbitMQ的调用变得更方便简洁。那么,就让咱们来瞧瞧其魅力所在吧。

MassTransit

先看看MassTransit是个什么宝贝(MassTransit官网的简介):

MassTransit是一个免费的开源轻量级消息总线,用于使用.NET框架创建分布式应用程序。MassTransit在现有的*消息传输上提供了一系列广泛的功能,从而以开发人员友好的方式使用基于消息的会话模式异步连接服务。基于消息的通信是实现面向服务的体系结构的可靠且可扩展的方式。

通俗描述:

MassTransit就是一套基于消息服务的高级封装类库,下游可联接RabbitMQ、Redis、MongoDb等服务。

github官网:https://github.com/MassTransit/MassTransit

RabbitMQ

RabbitMQ是成熟的MQ队列服务,是由 Erlang 语言开发的 AMQP 的开源实现。关于介绍RabbitMQ的中文资料也很多,有需要可以自行查找。我这里贴出其官网与下载安装的链接,如下:

官网:http://www.rabbitmq.com

下载与安装:http://www.rabbitmq.com/download.html

实现代码

通过上面的介绍,咱们已对MassTransit与RabbitMQ有了初步了解,那么现在来看看如何在ASP.NET Core上优雅的使用RabbitMQ吧。

1、创建一个名为“RabbitMQHelp.cs”公共类,用于封装操作RabbitMQ的公共方法,并通过Nuget来管理并引用“MassTransit”与“MassTransit.RabbitMQ”类库。

2、“RabbitMQHelp.cs”公共类主要对外封装两个静态方法,其代码如下:

复制代码
1 using MassTransit;
2 using MassTransit.RabbitMqTransport;
3 using System;
4 using System.Collections.Generic;
5 using System.Text;
6 using System.Threading.Tasks;
7
8 namespace Lezhima.Comm
9 {
10 ///


11 /// RabbitMQ公共操作类,基于MassTransit库
12 ///

13 public class RabbitMQHelp
14 {
15 #region 交换器
16
17 ///
18 /// 操作日志交换器
19 /// 同时需在RabbitMQ的管理后台创建同名交换器
20 ///

21 public static readonly string actionLogExchange = "Lezhima.ActionLogExchange";
22
23
24 #endregion
25
26
27 #region 声明变量
28
29 ///
30 /// MQ联接地址,建议放到配置文件
31 ///

32 private static readonly string mqUrl = "rabbitmq://192.168.1.181/";
33
34 ///
35 /// MQ联接账号,建议放到配置文件
36 ///

37 private static readonly string mqUser = "admin";
38
39 ///
40 /// MQ联接密码,建议放到配置文件
41 ///

42 private static readonly string mqPwd = "admin";
43
44 #endregion
45
46 ///
47 /// 创建连接对象
48 /// 不对外公开
49 ///

50 private static IBusControl CreateBus(Action<IRabbitMqBusFactoryConfigurator, IRabbitMqHost> registrationAction = null)
51 {
52 //通过MassTransit创建MQ联接工厂
53 return Bus.Factory.CreateUsingRabbitMq(cfg =>
54 {
55 var host = cfg.Host(new Uri(mqUrl), hst =>
56 {
57 hst.Username(mqUser);
58 hst.Password(mqPwd);
59 });
60 registrationAction?.Invoke(cfg, host);
61 });
62 }
63
64
65 ///
66 /// MQ生产者
67 /// 这里使用fanout的交换类型
68 ///

69 ///
70 public async static Task PushMessage(string exchange, object obj)
71 {
72 var bus = CreateBus();
73 var sendToUri = new Uri($"{mqUrl}{exchange}");
74 var endPoint = await bus.GetSendEndpoint(sendToUri);
75 await endPoint.Send(obj);
76 }
77
78 ///
79 /// MQ消费者
80 /// 这里使用fanout的交换类型
81 /// consumer必需是实现IConsumer接口的类实例
82 ///

83 ///
84 public static void ReceiveMessage(string exchange, object consumer)
85 {
86 var bus = CreateBus((cfg, host) =>
87 {
88 //从指定的消息队列获取消息 通过consumer来实现消息接收
89 cfg.ReceiveEndpoint(host, exchange, e =>
90 {
91 e.Instance(consumer);
92 });
93 });
94 bus.Start();
95 }
96 }
97 }
98
复制代码

3、“RabbitMQHelp.cs”公共类已经有了MQ“生产者”与“消费者”两个对外的静态公共方法,其中“生产者”方法可以在业务代码中直接调用,可传递JSON、对象等类型的参数向指定的交换器发送数据。而“消费者”方法是从指定交换器中进行接收绑定,但接收到的数据处理功能则交给了“consumer”类(因为在实际项目中,不同的数据有不同的业务处理逻辑,所以这里我们直接就通过IConsumer接口交给具体的实现类去做了)。那么,下面我们再来看看消费者里传递进来的“consumer”类的代码吧:

复制代码
1 using MassTransit;
2 using System;
3 using System.Collections.Generic;
4 using System.Text;
5 using System.Threading.Tasks;
6
7 namespace Lezhima.Storage.Consumer
8 {
9 ///


10 /// 从MQ接收并处理数据
11 /// 实现MassTransit的IConsumer接口
12 ///

13 public class LogConsumer : IConsumer
14 {
15 ///
16 /// 实现Consume方法
17 /// 接收并处理数据
18 ///

19 ///
20 ///
21 public Task Consume(ConsumeContext context)
22 {
23 return Task.Run(async () =>
24 {
25 //获取接收到的对象
26 var amsg = context.Message;
27 Console.WriteLine($"Recevied By Consumer:{amsg}");
28 Console.WriteLine($"Recevied By Consumer:{amsg.ActionLogId}");
29 });
30 }
31 }
32 }
33
复制代码

调用代码

1、生产者调用代码如下:

复制代码
1 ///


2 /// 测试MQ生产者
3 ///

4 ///
5 [HttpGet]
6 public async Task AddMessageTest()
7 {
8 //声明一个实体对象
9 var model = new ActionLog();
10 model.ActionLogId = Guid.NewGuid();
11 model.CreateTime = DateTime.Now;
12 model.UpdateTime = DateTime.Now;
13 //调用MQ
14 await RabbitMQHelp.PushMessage(RabbitMQHelp.actionLogExchange, model);
15
16 return new MobiResult(1000, "操作成功");
17 }
复制代码

2、消费者调用代码如下:

复制代码
1 using Lezhima.Storage.Consumer;
2 using Microsoft.Extensions.Configuration;
3 using System;
4 using System.IO;
5
6 namespace Lezhima.Storage
7 {
8 class Program
9 {
10 static void Main(string[] args)
11 {
12 var conf = new ConfigurationBuilder()
13 .SetBasePath(Directory.GetCurrentDirectory())
14 .AddJsonFile("appsettings.json", true, true)
15 .Build();
16
17 //调用接收者
18 RabbitMQHelp.ReceiveMessage(RabbitMQHelp.actionLogExchange,
19 new LogConsumer()
20 );
21
22 Console.ReadLine();
23 }
24 }
25 }
26
复制代码

总结

1、基于MassTransit库使得我们使用RabbitMQ变得更简洁、方便。而基于再次封装后,生产者与消费者将不需要关注具体的业务,也跟业务代码解耦了,更能适应项目的需要。

2、RabbitMQ的交换器需在其管理后台自行创建,而这里使用的fanout类型是因为其发送速度最快,且能满足我的项目需要,各位可视自身情况选用不同的类型。fanout类型不会存储消息,必需要消费者绑定交换器后才会发送给消费者。