ThreadPool做并发程序

参考文章:www.cnblogs.com/xugang/archive/2008/03/23/1118584.html

             http://www.cnblogs.com/SkySoot/archive/2012/04/01/2429259.html

在多线程的程序中,经常会出现两种情况:
   1. 应用程序中线程把大部分的时间花费在等待状态,等待某个事件发生,然后给予响应。这一般使用 ThreadPool(线程池)来解决。 
   2. 线程平时都处于休眠状态,只是周期性地被唤醒。这一般使用 Timer(定时器)来解决。

ThreadPool 类提供一个由系统维护的线程池(可以看作一个线程的容器),该容器需要 Windows 2000 以上系统支持,因为其中某些方法调用了只有高版本的Windows 才有的 API 函数。

将线程安放在线程池里,需使用 ThreadPool.QueueUserWorkItem() 方法,该方法的原型如下:
    // 将一个线程放进线程池,该线程的 Start() 方法将调用 WaitCallback 代理对象代表的函数
    public static bool QueueUserWorkItem(WaitCallback);
    // 重载的方法如下,参数 object 将传递给 WaitCallback 所代表的方法
    public static bool QueueUserWorkItem(WaitCallback, object);
注意:
    ThreadPool 类是一个静态类,你不能也不必要生成它的对象。而且一旦使用该方法在线程池中添加了一个项目,那么该项目将是无法取消的。这里你无需自己建立线程,只需把 你要做的工作写成函数,然后作为参数传递给ThreadPool.QueueUserWorkItem()方法就行了,传递的方法就是依靠 WaitCallback 代理对象,而线程的建立、管理、运行等工作都是由系统自动完成的,你无须考虑那些复杂的细节问题。

ThreadPool 的用法:
    首先程序创建了一个 ManualResetEvent 对象,该对象就像一个信号灯,可以利用它的信号来通知其它线程。本例中,当线程池中所有线程工作都完成以后,ManualResetEvent 对象将被设置为有信号,从而通知主线程继续运行。
ManualResetEvent 对象有几个重要的方法:
    初始化该对象时,用户可以指定其默认的状态(有信号/无信号);
    在初始化以后,该对象将保持原来的状态不变,直到它的 Reset() 或者 Set() 方法被调用:
    Reset():
        将其设置为无信号状态;
    Set():
        将其设置为有信号状态。
    WaitOne():
        使当前线程挂起,直到 ManualResetEvent 对象处于有信号状态,此时该线程将被激活。然后,程序将向线程池中添加工作项,这些以函数形式提供的工作项被系统用来初始化自动建立的线程。当所有的线程 都运行完了以后,ManualResetEvent.Set() 方法被调用,因为调用了 ManualResetEvent.WaitOne() 方法而处在等待状态的主线程将接收到这个信号,于是它接着往下执行,完成后边的工作。

下面是我自己做的例子:

比较浅显的

public class ThreadPoolManualResetEventDemo
    {
        public static void Main() 
        {
            Console.WriteLine("测试");
            //把信号量设置成无信号状态的
            ManualResetEvent eventX = new ManualResetEvent(false);            
            int maxCount = 100;
            SomeState somstate = new SomeState(maxCount);
            for (int i = 0; i < maxCount; i++)
            {
                ThreadPool.QueueUserWorkItem(new WaitCallback(ShowMsg), new ThreadData() { Index = i, WaitHandle = eventX, Somestate = somstate });
            }
            Console.WriteLine("等待线程执行结束");
            eventX.WaitOne(Timeout.Infinite, true);
            Console.WriteLine("结束了");
            Console.ReadKey();
        }
        public static void ShowMsg(object obj) {
            ThreadData td = (ThreadData)obj;
            Console.WriteLine("测试信息"+obj);
            Thread.Sleep(5000);
            if (Interlocked.Decrement(ref td.Somestate.maxCount) == 0)
            {
                td.WaitHandle.Set();
            }
        }
    }
    class ThreadData
    {
        public int Index { get; set; }
        public object Data { get; set; }
        public ManualResetEvent WaitHandle { get; set; }
        public SomeState Somestate { set; get; }
    }

    public class SomeState
    {
        public int maxCount;
        public SomeState(int count)
        {
            this.maxCount = count;
        }
    }

下面的是我们项目里面用的一个并发,原理也是线程池和ManualResetEvent来控制的。一个同事封装的

public class Parallel
    {
        class ThreadData
        {
            public int Index { get; set; }
            public object Data { get; set; }
            public CountdownEvent WaitHandle { get; set; }
        }
        /// <summary>
        /// default timeout 60 minutes.
        /// </summary>
        static TimeSpan defaultTimeout = TimeSpan.FromMinutes(60);
        static void Log(int item, Exception ex)
        {
            Debug.WriteLine(string.Format("error position {0}:{1}", item, ex.ToString()));
        }
        static void Log<T>(T item, Exception ex)
        {
            Debug.WriteLine(string.Format("error item {0}:{1}", JsonConvert.SerializeObject(item), ex.ToString()));
        }
        public static void For(int len, Action<int> action)
        {
            For(len, action, defaultTimeout);
        }
        public static void For(int len, Action<int> action, TimeSpan timeout)
        {
            For(len, action, Log, timeout);
        }
        public static void For(int len, Action<int> action, Action<int, Exception> handleError)
        {
            For(len, action, handleError, defaultTimeout);
        }
        public static void For(int len, Action<int> action, Action<int, Exception> handleError, TimeSpan timeout)
        {
            AggregateException errors = new AggregateException();
            using (CountdownEvent evt = new CountdownEvent(len))
            {
                for (int i = 0; i < len; i++)
                {
                    ThreadPool.QueueUserWorkItem(obj =>
                    {
                        var shared = obj as Parallel.ThreadData;
                        try
                        {
                            action(shared.Index);
                        }
                        catch (Exception ex)
                        {
                            handleError(shared.Index, ex);
                            errors.AddException(shared.Index, ex);
                        }
                        finally
                        {
                            shared.WaitHandle.Signal();
                        }

                    }, new ThreadData { Index = i, WaitHandle = evt });

                }
                evt.Wait(timeout);
            }
            if (errors.Count > 0) throw errors;
        }
        public static void ForEach<T>(IEnumerable<T> source, Action<T> action)
        {
            ForEach<T>(source, action, defaultTimeout);
        }
        public static void ForEach<T>(IEnumerable<T> source, Action<T> action, TimeSpan timeout)
        {
            ForEach<T>(source, action, Log, timeout);
        }
        public static void ForEach<T>(IEnumerable<T> source, Action<T> action, Action<T, Exception> handleError)
        {
            ForEach<T>(source, action, handleError, defaultTimeout);
        }
        public static void ForEach<T>(IEnumerable<T> source, Action<T> action, Action<T, Exception> handleError, TimeSpan timeout)
        {
            AggregateException errors = new AggregateException();
            using (CountdownEvent evt = new CountdownEvent(source.Count()))
            {
                int i = 0;
                foreach (var item in source)
                {
                    Interlocked.Increment(ref i);
                    ThreadPool.QueueUserWorkItem(obj =>
                    {
                        var shared = obj as Parallel.ThreadData;

                        try
                        {
                            action(shared.Data.AsType<T>());
                        }
                        catch (Exception ex)
                        {
                            handleError(shared.Data.AsType<T>(), ex);
                            errors.AddException(shared.Index, ex);
                        }
                        finally
                        {
                            shared.WaitHandle.Signal();
                        }

                    }, new ThreadData { Index = i, Data = item, WaitHandle = evt });
                }

                evt.Wait(timeout);
            }
            if (errors.Count > 0) throw errors;
        }
        public static ConcurrentDictionary<int, TResult> ForEach<T, TResult>(IEnumerable<T> source, Func<T, TResult> func)
        {
            return ForEach<T, TResult>(source, func, defaultTimeout);
        }
        public static ConcurrentDictionary<int, TResult> ForEach<T, TResult>(IEnumerable<T> source, Func<T, TResult> func, TimeSpan timeout)
        {
            return ForEach<T, TResult>(source, func, Log, defaultTimeout);
        }
        public static ConcurrentDictionary<int, TResult> ForEach<T, TResult>(IEnumerable<T> source, Func<T, TResult> func, Action<T, Exception> handleError)
        {
            return ForEach<T, TResult>(source, func, handleError, defaultTimeout);
        }
        public static ConcurrentDictionary<int, TResult> ForEach<T, TResult>(IEnumerable<T> source, Func<T, TResult> func, Action<T, Exception> handleError, TimeSpan timeout)
        {
            ConcurrentDictionary<int, TResult> dict = new ConcurrentDictionary<int, TResult>();
            AggregateException errors = new AggregateException();
            using (CountdownEvent evt = new CountdownEvent(source.Count()))
            {
                int i = 0;
                foreach (var item in source)
                {
                    Interlocked.Increment(ref i);
                    ThreadPool.QueueUserWorkItem(obj =>
                    {
                        var shared = obj as Parallel.ThreadData;
                        try
                        {
                            dict[shared.Index] = func(shared.Data.AsType<T>());
                        }
                        catch (Exception ex)
                        {
                            handleError(shared.Data.AsType<T>(), ex);
                            errors.AddException(shared.Index, ex);
                        }
                        finally
                        {
                            shared.WaitHandle.Signal();
                        }

                    }, new ThreadData { Index = i, Data = item, WaitHandle = evt });
                }
                evt.Wait(timeout);
            }
            if (errors.Count > 0) throw errors;
            return dict;
        }
    }

 public class CountdownEvent : IDisposable
    {
        private readonly ManualResetEvent done;
        private readonly int total;
        private long current;

        public CountdownEvent(int total)
        {
            this.total = total;
            current = total;
            done = new ManualResetEvent(false);
        }

        public void Signal()
        {
            if (Interlocked.Decrement(ref current) == 0)
            {
                done.Set();
            }
        }

        public void Wait()
        {
            done.WaitOne();
        }
        public bool Wait(TimeSpan timeout)
        {
            return done.WaitOne(timeout);
        }
        public void Dispose()
        {
            ((IDisposable)done).Dispose();
        }
    }

public class ParallelException : Exception
    {
        public int Index { get; private set; }
        public ParallelException(int itemIndex) { Index = itemIndex; }
        public ParallelException(int itemIndex, string message) : base(message) { Index = itemIndex; }
        public override string Message
        {
            get
            {
                return string.Format("the specified item index: {0}, occured :{1}", Index, base.Message);
            }
        }
    }

    public class AggregateException : Exception
    {
        List<ParallelException> inners = new List<ParallelException>();
        public AggregateException() : base() { }
        public AggregateException(string message) : base(message) { }
        public ReadOnlyCollection<ParallelException> Exceptions
        {
            get
            {
                lock (inners)
                {
                    return inners.AsReadOnly();
                }

            }
        }
        public void AddException(int itemIndex, string message)
        {
            lock (inners)
            {
                inners.Add(new ParallelException(itemIndex, message));
            }
        }
        public void AddException(int itemIndex, Exception ex)
        {
            AddException(itemIndex, ex.ToString());
        }
        public int Count
        {
            get
            {
                lock (inners)
                {
                    return inners.Count;
                }
            }
        }
        public override string Message
        {
            get
            {
                StringBuilder sb = new StringBuilder(base.Message);
                foreach (var ex in Exceptions)
                    sb.AppendLine(ex.ToString());
                return sb.ToString();
            }
        }
    }

语言组织能力太弱了,就做下记录,以后用了有个参考。