C#并行编程中的Parallel.Invoke

一、基础知识

     并行编程:并行编程是指软件开发的代码,它能在同一时间执行多个计算任务,提高执行效率和性能一种编程方式,属于多线程编程范畴。所以我们在设计过程中一般会将很多任务划分成若干个互相独立子任务,这些任务不考虑互相的依赖和顺序。这样我们就可以使用很好的使用并行编程。但是我们都知道多核处理器的并行设计使用共享内存,如果没有考虑并发问题,就会有很多异常和达不到我们预期的效果。不过还好NET Framework4.0引入了Task Parallel Library(TPL)实现了基于任务设计而不用处理重复复杂的线程的并行开发框架。它支持数据并行,任务并行与流水线。核心主要是Task,但是一般简单的并行我们可以利用Parallel提供的静态类如下三个方法。

         Parallel.Invoke  对给定任务实现并行开发

         Parallel.For  对固定数目的任务提供循环迭代并行开发

         parallel.Foreach 对固定数目的任务提供循环迭代并行开发

注意:所有的并行开发不是简单的以为只要将For或者Foreach换成Parallel.For与Parallel.Foreach这样简单。

PS:从简单的Invoke开始逐步深入探讨并行开发的主要知识点,也对自己学习过程中的积累做个总结,其中参考了博客园中的其他优秀博文

二、Parallel.Invoke在并行中的使用

      首先我们来看看它的两个重载方法:

  public static void Invoke(params Action[] actions);  
  public static void Invoke(ParallelOptions parallelOptions, params Action[] actions);

 Invoke主要接受params的委托actions,比如我们要同时执行三个任务,我们可以这样利用    

  方式一
       Parallel.Invoke(() => Task1(), () => Task2(), () => Task3());
  方式二
       Parallel.Invoke(Task1, Task2, Task3);
  方式三
       Parallel.Invoke(
                () =>
                {
                    Task1();
                },
                Task2,
                delegate () { Task3(); console.write('do someting!');});

这样Invoke就简单实现了Task1,Task2,Task3的并行开发。下面我们用实例来说明他们的执行规则。以及两个重载方法的使用。

三 、Demo

     1、 Demo 1:

public class ParallelInvoke
    {
        /// <summary>
        /// Invoke方式一 action
        /// </summary>
        public  void Client1()
        {
            Stopwatch stopWatch = new Stopwatch();
           
            Console.WriteLine("主线程:{0}线程ID : {1};开始", "Client1", Thread.CurrentThread.ManagedThreadId);
            stopWatch.Start();
            Parallel.Invoke(() => Task1("task1"), () => Task2("task2"), () => Task3("task3"));
            stopWatch.Stop();
            Console.WriteLine("主线程:{0}线程ID : {1};结束,共用时{2}ms", "Client1", Thread.CurrentThread.ManagedThreadId, stopWatch.ElapsedMilliseconds);
        }

        private void Task1(string data)
        {
            Thread.Sleep(5000);
            Console.WriteLine("任务名:{0}线程ID : {1}", data, Thread.CurrentThread.ManagedThreadId);
        }

        private void Task2(string data)
        {
            Console.WriteLine("任务名:{0}线程ID : {1}", data, Thread.CurrentThread.ManagedThreadId);
        }

        private void Task3(string data)
        {
            Console.WriteLine("任务名:{0}线程ID : {1}", data, Thread.CurrentThread.ManagedThreadId);
        }
}

执行运行后结果:

C#并行编程中的Parallel.Invoke

我们看到Invoke 执行Task三个方法主要有以下几个特点:

      1、没有固定的顺序,每个Task可能是不同的线程去执行,也可能是相同的;

      2、主线程必须等Invoke中的所有方法执行完成后返回才继续向下执行;这样对我们以后设计并行的时候,要考虑每个Task任务尽可能差不多,如果相差很大,比如一个时间非常长,其他都比较短,这样一个线程可能会影响整个任务的性能。这点非常重要

      3、这个非常简单就实现了并行,不用我们考虑线程问题。主要Framework已经为我们控制好线程池的问题。

ps:如果其中有一个异常怎么办? 带做这个问题修改了增加了一个Task4.

    2、 Demo2  

  public class ParallelInvoke
    {
        /// <summary>
        /// Invoke方式一 action
        /// </summary>
        public  void Client1()
        {
            Stopwatch stopWatch = new Stopwatch();
           
            Console.WriteLine("主线程:{0}线程ID : {1};开始", "Client1", Thread.CurrentThread.ManagedThreadId);
            stopWatch.Start();

            try
            {
                Parallel.Invoke(() => Task1("task1"), () => Task2("task2"), () => Task3("task3"), delegate () { throw new Exception("我这里发送了异常"); });
            }
            catch (AggregateException ae)
            {
                foreach (var ex in ae.InnerExceptions)
                    Console.WriteLine(ex.Message);
            }
          
            stopWatch.Stop();
            Console.WriteLine("主线程:{0}线程ID : {1};结束,共用时{2}ms", "Client1", Thread.CurrentThread.ManagedThreadId, stopWatch.ElapsedMilliseconds);
        }
   }

主要看 delegate() { throw new Exception("我这里发送了异常");} 增加了这个委托Task3. 然后我们看结果:

C#并行编程中的Parallel.Invoke

 这里我们发现即使有异常程序也会完成执行,而且不会影响其他Task的执行。

  3、demo3 重载方法ParallelOptions 的使用。

   理解ParallelOptions建议大家讲的非常详细。

   主要理解两个参数:

     CancellationToken     控制线程的取消
     MaxDegreeOfParallelism  设置最大的线程数,有时候可能会跑遍所有的内核,为了提高其他应用程序的稳定性,就要限制参与的内核

     下面从代码上看效果如何?

 public class ParallelInvoke
    {
     // 定义CancellationTokenSource 控制取消
        readonly CancellationTokenSource _cts = new CancellationTokenSource();

        /// <summary>
        /// Invoke方式一 action
        /// </summary>
        public  void Client1()
        {
             Console.WriteLine("主线程:{0}线程ID : {1};开始{2}", "Client3", Thread.CurrentThread.ManagedThreadId, DateTime.Now);
            
            var po = new ParallelOptions
            {
                CancellationToken = _cts.Token, // 控制线程取消
                MaxDegreeOfParallelism = 3  // 设置最大的线程数3,仔细观察线程ID变化
            };
           
            Parallel.Invoke(po, () => Task1("task1"), ()=>Task5(po), Task6);
            
            Console.WriteLine("主线程:{0}线程ID : {1};结束{2}", "Client3", Thread.CurrentThread.ManagedThreadId, DateTime.Now);
        }

        private void Task1(string data)
        {
            Thread.Sleep(5000);
            Console.WriteLine("任务名:{0}线程ID : {1}", data, Thread.CurrentThread.ManagedThreadId);
        }

// 打印数字 private void Task5(ParallelOptions po) { Console.WriteLine("进入Task5线程ID : {0}", Thread.CurrentThread.ManagedThreadId); int i = 0; while (i < 100) { // 判断是否已经取消 if (po.CancellationToken.IsCancellationRequested) { Console.WriteLine("已经被取消。"); return; } Thread.Sleep(100); Console.Write(i + " "); Interlocked.Increment(ref i); } } /// <summary> /// 10秒后取消 /// </summary> private void Task6() { Console.WriteLine("进入取消任务,Task6线程ID : {0}", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(1000 * 10); _cts.Cancel(); Console.WriteLine("发起取消请求..........."); } }

执行结果:

C#并行编程中的Parallel.Invoke

 从程序结果我们看到以下特点:

            1、程序在执行过程中线程数码不超过3个。

            2、CancellationTokenSource/CancellationToken控制任务的取消。

四、总结

      Parallel.Invoke 的使用过程中我们要注意以下特点:

      1、没有特定的顺序,Invoke中的方法全部执行完才返回,但是即使有异常在执行过程中也同样会完成,他只是一个很简单的并行处理方法,特点就是简单,不需要我们考虑线程的问题。

      2、如果在设计Invoke中有个需要很长时间,这样会影响整个Invoke的效率和性能,这个我们在设计每个task时候必须去考虑的。

      3、Invoke 参数是委托方法。

      4、当然Invoke在每次调用都有开销的,不一定并行一定比串行好,要根据实际情况,内核环境多次测试调优才可以。

      5、异常处理比较复杂。