在线程之间传递数据的最佳实践是什么?队列,消息还是其他?

问题描述:

我得到了需要在不同阶段处理的各种类型的传感器数据.根据我所读的内容,最有效的方法是将任务拆分为多个线程.每个都将处理后的数据放入下一个线程的入口队列中.所以基本上是管道.

I got sensor data of various types that needs to be processed at different stages. From what I have read around, the most efficent way is to split the tasks into threads. Each puts the processed data into the entry queue of the next thread. So basically, a pipeline.

数据可能很大(几Mb),因此需要将其复制到传感器缓冲区之外,然后传递给将对其进行修改并传递的线程.

The data can be quite large (a few Mbs) so it needs to be copied out of the sensor buffer and then passed along to the threads that will modify it and pass it along.

我有兴趣了解进行传球的最佳方法.我读到,如果我在线程之间进行消息发布,则可以分配数据并将指针传递给其他线程,以便接收线程可以负责取消分配数据.我不太确定这对于流数据如何工作,即确保线程按顺序处理消息(我想我可以添加时间检查吗?).同样,我应该使用哪种数据结构来实现呢?我想我还是需要使用锁吗?

I am interested in understanding the best way to do the passing. I read that, if I do message posting between threads, I could allocate the data and pass the pointer to the other threads so the receiving thread can take care of de-allocating it. I am not quite sure, how this would work for streaming data, that is to make sure that the threads process the messages in order (I guess I could add a time check?). Also what data structure should I use for such an implementation? I presume I would need to use locks anyway?

拥有同步队列会更有效吗?

Would it be more efficient to have synchronised queues?

让我知道其他解决方案是否更好.计算需要实时进行,因此我需要这样才能真正高效.如果有人链接到通过线程管道传递的数据的良好示例,我将对它非常感兴趣.

Let me know if other solutions are better. The computations need to happen in real-time so I need this to be really efficient. If anyone has links to good examples of data being passed through a pipeline of threads I would be very interested in looking at it.

注意事项:没有增强功能或其他库.使用Pthreads.我需要使实现尽可能接近标准库.最终将在各种平台上使用(我尚不知道).

Caveats: No boost or other libraries. Using Pthreads. I need to keep the implementation as close to the standard libraries as possible. This will eventually be used on various platforms (which I don't know yet).

最近我不得不做类似的事情.我使用了输入/输出队列的方法.我认为这是最好,最快的方法.这是我的线程安全并发队列版本.我在我的项目中有三个工作线程对同一缓冲区等依次进行大量计算.每个线程使用输入队列中的弹出窗口并推送输出队列.所以我有这个wpop等待队列中的下一个可用缓冲区.希望对您有用.

I had to do something similar recently. I used the approach of input/output queue. I think is the best and fast method to use. This is my version of a thread safe concurrent queue. I have in my project three working threads doing lots of calculation in sequence to the same buffer etc. Each thread use the pop from the input queue and push the output queue. So I have this wpop that wait the next buffer available in the queue. I hope can be usefull for you.

/*
    Thread_safe queue with conditional variable
*/

template<typename dataType>
class CConcurrentQueue
{
private:
    /// Queue
    std::queue<dataType> m_queue;       
    /// Mutex to controll multiple access
    std::mutex m_mutex;                 
    /// Conditional variable used to fire event
    std::condition_variable m_cv;       
    /// Atomic variable used to terminate immediately wpop and wtpop functions
    std::atomic<bool> m_forceExit = false;  

public:
    /// <summary> Add a new element in the queue. </summary>
    /// <param name="data"> New element. </param>
    void push ( dataType const& data )
    {
        m_forceExit.store ( false );
        std::unique_lock<std::mutex> lk ( m_mutex );
        m_queue.push ( data );
        lk.unlock ();
        m_cv.notify_one ();
    }
    /// <summary> Check queue empty. </summary>
    /// <returns> True if the queue is empty. </returns>
    bool isEmpty () const
    {
        std::unique_lock<std::mutex> lk ( m_mutex );
        return m_queue.empty ();
    }
    /// <summary> Pop element from queue. </summary>
    /// <param name="popped_value"> [in,out] Element. </param>
    /// <returns> false if the queue is empty. </returns>
    bool pop ( dataType& popped_value )
    {
        std::unique_lock<std::mutex> lk ( m_mutex );
        if ( m_queue.empty () )
        {
            return false;
        }
        else
        {
            popped_value = m_queue.front ();
            m_queue.pop ();
            return true;
        }
    }
    /// <summary> Wait and pop an element in the queue. </summary>
    /// <param name="popped_value"> [in,out] Element. </param>
    ///  <returns> False for forced exit. </returns>
    bool wpop ( dataType& popped_value )
    {
        std::unique_lock<std::mutex> lk ( m_mutex );
        m_cv.wait ( lk, [&]()->bool{ return !m_queue.empty () || m_forceExit.load(); } );
        if ( m_forceExit.load() ) return false;
        popped_value = m_queue.front ();
        m_queue.pop ();
        return true;
    }
    /// <summary> Timed wait and pop an element in the queue. </summary>
    /// <param name="popped_value"> [in,out] Element. </param>
    /// <param name="milliseconds"> [in] Wait time. </param>
    ///  <returns> False for timeout or forced exit. </returns>
    bool wtpop ( dataType& popped_value , long milliseconds = 1000)
    {
        std::unique_lock<std::mutex> lk ( m_mutex );
        m_cv.wait_for ( lk, std::chrono::milliseconds ( milliseconds  ), [&]()->bool{ return !m_queue.empty () || m_forceExit.load(); } );
        if ( m_forceExit.load() ) return false;
        if ( m_queue.empty () ) return false;
        popped_value = m_queue.front ();
        m_queue.pop ();
        return true;
    }
    /// <summary> Queue size. </summary>    
    int size ()
    {   
        std::unique_lock<std::mutex> lk ( m_mutex );
        return static_cast< int >( m_queue.size () );
    }
    /// <summary> Free the queue and force stop. </summary>
    void clear ()
    { 
        m_forceExit.store( true );
        std::unique_lock<std::mutex> lk ( m_mutex );
        while ( !m_queue.empty () )
        {
            delete m_queue.front ();
            m_queue.pop ();
        }
    }
    /// <summary> Check queue in forced exit state. </summary>
    bool isExit () const
    {
        return m_forceExit.load();
    }

};