[解决]可以/我应该使用线程来从方法中产生结果吗?

问题描述:





我有一些代码可以从db消息队列中获取任务列表。并产生结果。这种运行速度很慢所以我想多线程。我怎样才能最好地实现这个的多线程版本:



Hi,

I have some code that gets a list of tasks from a db message queue. and yields results. This is running quite slowly so I want to multi-thread it. How can I best implement a multi-threaded version of this:

private IEnumerable<Data> GetData()
{
  while (!_cancel)
  {
     foreach (var dueMessage in F.MessageFactory.GetDueMessages())
     {
        var item = ProcessMessage(dueMessage);
        if (item != null)
        yield return item;
    }
  }
}



每个 F.MessageFactory.GetDueMessages()可能有零到几百条消息在等待(在某些情况下,许多进程同时被安排)



理想情况下我希望每个 item 在准备就绪后立即生成,但消息(目前)仅在 ProcessMessage(dueMessage)中设置完成。



这意味着我必须为每个使用WaitHandle,而循环或者设置消息以便更早完成在这个过程中,就像 foreach 循环命中它一样。编辑:例如,只要 F。 MessageFactory.GetDueMessages()返回列表。



所以,我的问题是:我怎样才能安全地多线程这个方法,同时让你产生结果很快?





谢谢大家^ _ ^

Andy


In each F.MessageFactory.GetDueMessages() there may be zero to hundreds of messages waiting (many processes are scheduled at the same time in some cases)

Ideally I would like each item to be yielded as soon as it is ready, but the message is (currently) only set to complete within ProcessMessage(dueMessage).

This means that I would either have to use a WaitHandle for each while loop or set the message to complete earlier in the process, such as as soon as the foreach loop hits it. such as as soon as F.MessageFactory.GetDueMessages() returns the list.

So, My question is: How can I safely multi-thread this method while yielding the results as soon as possible?


Thanks guys ^_^
Andy

编辑:我去的解决方案包括调整。



如果你按照评论,我的问题是有一次进程在我的ProcessMessage方法中占用所有时间。在我等待它做的时候我几乎无能为力,所以async并没有太大的改进。



我选择了Reactive Extension解决方案相反。



注意 - 我需要返回一个返回Enumberable 的方法。

The solution I went for including tweaks.

If you follow the comments, my issue is that there is once process that takes all the time in my ProcessMessage method. There is barely anything I can do while I'm waiting for it to do it's stuff so async was not much of an improvement.

I went for a Reactive Extension solution instead.

Note - I need to return a method that returns Enumberable.
        private void GetFlighData()
{
    while (!_cancel)
    {
        Parallel.ForEach(F.MessageFactory.GetDueMessages(), dueMessage =>
        {

            var item = ProcessMessage(dueMessage);
            if (item != null)
            {
                //Event instead of yield
                OnGetFlightData(item);
            }
        });
    }
}

private delegate void GetAFlightDataHandler(FlightData flightData);

private event GetAFlightDataHandler FlightData;

private void OnGetFlightData(FlightData flightData)
{
    if (FlightData != null)
        FlightData(flightData);
}

BackgroundWorker _worker = new BackgroundWorker();

//This is my property that returns a method.  It no longer returns GetFlightData but creates the method from observable.FromEvent
public override GetFlighDataMethod GetFlighDataMethod
{
    get
    {
        //Start the queue
        _worker.DoWork += delegate { GetFlighData(); };
        _worker.RunWorkerAsync();

        // create the method
        return () =>
        {
            var flightDataReceived = Observable.FromEvent<GetAFlightDataHandler, FlightData>(handler =>
            {
                GetAFlightDataHandler fdHandler = (e) =>
                {
                    handler(e);
                };
                return fdHandler;
            },
                fdHandler => FlightData += fdHandler,
                fdHandler => FlightData -= fdHandler
                );

            return flightDataReceived.ToEnumerable();
        };
    }
}