Dotnet Core下的Channel

var channel = Channel.CreateUnbounded<string>();

var producer1 = new Producer<string>(channel.Writer);
var consumer1 = new Consumer<string>(channel.Reader);

 internal class Producer<T>
    {
        private readonly ChannelWriter<T> _writer;

        public Producer(ChannelWriter<T> writer)
        {
            _writer = writer; 
        }

        public async Task BeginProducing(T t)
        {
            await _writer.WriteAsync(t);
        }
    }

    internal class Consumer<T>
    {
        private readonly ChannelReader<T> _reader;
      
        public Consumer(ChannelReader<T> reader)
        {
            _reader = reader;
            
        }

        public async Task ConsumeData()
        {
         
            while (await _reader.WaitToReadAsync())
            {
                if (_reader.TryRead(out var timeString))
                {
                  
                }
            }
        }
    }

 

单一生产者/单一消费者

这个例子中,创建了一个生产者和一个消费者。两者的任务都是并发启动的。

里面的延时,是用来模拟工作负载的。

 var channel = Channel.CreateUnbounded<string>();

            var producer1 = new Producer<string>(channel.Writer);
            var consumer1 = new Consumer<string>(channel.Reader);

            Task consumerTask1 = consumer1.ConsumeData(); // begin consuming
            Task producerTask1 = producer1.BeginProducing("sss"); // begin producing

            await producerTask1.ContinueWith(_ => channel.Writer.Complete());

            await consumerTask1;

  

多个生产者/单一消费者

这个例子中有两个生产者。通常在应用中有多个生产者时,我们需要确保生产与单个消费者所能处理的消息数量大致相当,这样能更好地利用服务器资源。

 var channel = Channel.CreateUnbounded<string>();

            var producer1 = new Producer<string>(channel.Writer);
            var producer2 = new Producer<string>(channel.Writer);
            var consumer1 = new Consumer<string>(channel.Reader);

            Task consumerTask1 = consumer1.ConsumeData(); // begin consuming

            Task producerTask1 = producer1.BeginProducing("1111");

            await Task.Delay(500); // stagger the producers

            Task producerTask2 = producer2.BeginProducing("1111");

            await Task.WhenAll(producerTask1, producerTask2)
                .ContinueWith(_ => channel.Writer.Complete());

            await consumerTask1;

  

单一生产者/多个消费者

这个其实是应用中最常见的情况,就是产生消息很快,但处理工作相关较慢,而且工作也更密集。这种情况,实际应用中我们可以通过扩大消费者数量来满足生产的需求。

var channel = Channel.CreateUnbounded<string>();

            var producer1 = new Producer<string>(channel.Writer);
            var consumer1 = new Consumer<string>(channel.Reader);
            var consumer2 = new Consumer<string>(channel.Reader);
            var consumer3 = new Consumer<string>(channel.Reader);

            Task consumerTask1 = consumer1.ConsumeData(); // begin consuming
            Task consumerTask2 = consumer2.ConsumeData(); // begin consuming
            Task consumerTask3 = consumer3.ConsumeData(); // begin consuming

            Task producerTask1 = producer1.BeginProducing("<string");

            await producerTask1.ContinueWith(_ => channel.Writer.Complete());

            await Task.WhenAll(consumerTask1, consumerTask2, consumerTask3);

 多生产者/多消费(高IO)

var channel = Channel.CreateUnbounded<string>();

            var producer1 = new Producer<string>(channel.Writer);
            var producer2 = new Producer<string>(channel.Writer);
            var consumer1 = new Consumer<string>(channel.Reader);
            var consumer2 = new Consumer<string>(channel.Reader);
            var consumer3 = new Consumer<string>(channel.Reader);

            Task consumerTask1 = consumer1.ConsumeData(); // begin consuming
            Task consumerTask2 = consumer2.ConsumeData(); // begin consuming
            Task consumerTask3 = consumer3.ConsumeData(); // begin consuming

            Task producerTask1 = producer1.BeginProducing("ssss");
            Task producerTask2 = producer2.BeginProducing("11111");


            await Task.WhenAll(producerTask1, producerTask2)
              .ContinueWith(_ => channel.Writer.Complete());

            await Task.WhenAll(consumerTask1, consumerTask2, consumerTask3);