EasyNetQ笔记-Publish/Subscribe模式(发布订阅) 介绍 消息发布(Publish) 消息订阅(Subscribe) 非泛型发布订阅

asyNetQ支持的最简单的消息模式是发布/订阅.这个模式是一个极好的方法用来解耦消息提供者和消费者。消息发布者只要简单的对世界说,“这里有些事发生” 或者 “我现在有一个信息”。它不关心有没有人监听,或者接收者是谁,或者接收者在那里。我们能够添加和移除特定类型的消息的订阅者,不需发布者做任何的重新配置。我们也能够有多个发布者发布相同的消息,添加和删除发布者也不用其他的发布者或者订阅者做任何重新配置。
假如你开始去发布消息,而没有任何订阅者曾经定义此消息,那么这个消息就简单的消失了。
一个EasyNetQ订阅者订阅一种消息类型(消息类为.NET 类型)。通过调用Subcribe方法一旦对一个类型设置了订阅,一个持久化队列就会在RabbitMQ broker上被创建,这个类型的任何消息都会被发送到这个队列上。订阅者无论什么时候连接上,RabbitMQ都将会将消息从队列中发送给订阅者。

消息发布(Publish)

  EasyNetQ支持最简单的消息模式是发布和订阅。发布消息后,任意消费者可以订阅该消息,也可以多个消费者订阅。并且不需要额外配置。首先,如上文中需要先创建一个IBus对象,然后,在创建一个可序列化的.NET对象。调用Publish方法即可。

var _bus = RabbitHutch.CreateBus("host=xxxxxx;port=5672;virtualHost=my_vhost;username=admin;password=admin;timeout=30;publisherConfirms=true");//连接字符串末尾不要加";"
var message = new MyMessage { Text = "Hello Rabbit" };
            for (int i = 0; i < 10; i++)
            {
                _bus.Publish<MyMessage>(message);
            }

  警告,Publish只顾发送消息到队列,但是不管有没有消费端订阅,所以,发布之后,如果没有消费者,该消息将不会被消费甚至丢失。

消息订阅(Subscribe)

  EasyNetQ提供了消息订阅,当调用Subscribe方法时候,EasyNetQ会创建一个用于接收消息的队列,不过与消息发布不同的是,消息订阅增加了一个参数,subscribe_id.代码如下:

_bus.Subscribe<MyMessage>("subscribe_id", myMessage => {
                lbxMessage1.Invoke(new Action(() => { lbxMessage1.Items.Add(myMessage.Text); }));
            });

  第一个参数是订阅id,另外一个是delegate参数,用于处理接收到的消息。
这里要注意的是,subscribe_id参数很重要,假如开发者用同一个subscribeid订阅了同一种消息类型两次或者多次,RabbitMQ会以轮训的方式给每个订阅的队列发送消息。接收到之后,其他队列就接收不到该消息。
如果用不同的subscribeid订阅同一种消息类型,那么生成的每一个队列都会收到该消息。
  举个例子:出库发货,我们有五个商品仓库,每个仓库的商品都是一样的,假如来了一堆订单,那么我们需要五个仓库共同工作,分别处理订单。而同样,总仓库需要知道总出货量,正常情况下,可以用每个仓库的出货量相加即可。不过如果我们在总仓库也监听商品订单消息,那么,每次来订单,总仓库也都会收到一份,那么可以作相应的统计了。

//不同的subscribe_id将会创建两个队列,消息同时发给两个队列

        private void btn_subscribe11_Click(object sender, EventArgs e)
        {
            _bus.Subscribe<MyMessage>("subscribe_id_1", myMessage => {
                lbxMessage1.Invoke(new Action(() => { lbxMessage1.Items.Add(myMessage.Text); }));
            });
        }

        private void btnSubscribe22_Click(object sender, EventArgs e)
        {
            _bus.Subscribe<MyMessage>("subscribe_id_2", myMessage => {
                lbxMessage2.Invoke(new Action(() => { lbxMessage2.Items.Add(myMessage.Text); }));
            });
        }

  需要注意的是,在收到消息处理消息时候,不要占用太多的时间,会影响消息的处理效率,所以,遇到占用长时间的处理方法,最好用异步处理。代码如下:

bus.SubscribeAsync<MyMessage>("subscribe_async_test", message => 
    new WebClient().DownloadStringTask(new Uri("http://localhost:1338/?timeout=500"))
        .ContinueWith(task => 
            Console.WriteLine("Received: '{0}', Downloaded: '{1}'", 
                message.Text, 
                task.Result)));

非泛型发布订阅

如果想在项目运行期间发布订阅消息,EasyNetQ提供了非泛型的发布订阅
加using

using EasyNetQ.NonGeneric;

提供如下非泛型的发布订阅方法:

//订阅
public static IDisposable Subscribe(
    this IBus bus,
    Type messageType,
    string subscriptionId,
    Action<object> onMessage,
    Action<ISubscriptionConfiguration> configure)    
public static IDisposable SubscribeAsync(    
    this IBus bus,     
    Type messageType,     
    string subscriptionId,     
    Func<object, Task> onMessage,     
    Action<ISubscriptionConfiguration> configure)
 
//发布
 public static void Publish(
    this IBus bus, 
    Type messageType, 
    object message, 
    string topic)
 public static Task PublishAsync(
    this IBus bus, 
    Type messageType, 
    object message, 
    string topic)

  取消订阅,可以用如下方法:

var subscriptionResult = bus.Subscribe<MyMessage>("sub_id", MyHandler);
subscriptionResult.Dispose();
//或者直接IBus.Dispose();