在线程池中与boost :: function用法相关的崩溃

问题描述:

我想使用pthread在C ++中实现线程池。我想封装与线程管理相关的逻辑在一个对象,这是获取这些线程的所有权。这意味着每当这个对象被销毁,线程必须被停止和清理。

I am trying to implement thread pool in C++ using pthread. I want to encapsulate logic related to threads management in one object which is taking ownership of these threads. That means whenever this object is destroyed, threads must be stopped and cleaned up.

我一直在测试我的代码,结果是当我销毁WorkerThreadManager对象,同时有boost :: function调用。从GDB中查看代码和回溯。我真的不明白为什么会发生,据我知道boost :: function是可复制的,所以一旦我从队列中获得它的副本,我可以pop()它,甚至破坏整个队列(我认为一些小测试),然后调用函数的副本。

I've been testing my code and it turns out that I get segmentation fault when I destroy WorkerThreadManager object while there is boost::function called. See the code and backtrace from GDB. I don't really understand why it happens, as far as I know boost::function is copyable, so once I get a copy of it from the queue, I can pop() it and even destroy whole queue (I prooved that in some small test) and then call the function's copy.

WorkerThreadManager.h:

WorkerThreadManager.h:

#include "WorkerThreadManagerInterface.h"
#include "utils/mutex.h"
#include <queue>
#include <semaphore.h>

#include <iostream>

class WorkerThreadManager : public WorkerThreadManagerInterface
{
    public:
        WorkerThreadManager(unsigned threadsNumber = 5);
        virtual ~WorkerThreadManager();

        virtual void    PushTask(thread_function_t A_threadFun, result_function_t A_resultFun);
        void    SignalResults();

    private:
        static void*    WorkerThread(void* A_data);

        void    PushResult(int A_result, result_function_t A_resultFun);

        typedef boost::function<void ()> signal_function_t;

        struct worker_thread_data_t
        {
            worker_thread_data_t(thread_function_t A_threadFun, result_function_t A_resultFun) :
                threadFun(A_threadFun), resultFun(A_resultFun) {}
            worker_thread_data_t() {}

            thread_function_t       threadFun;
            result_function_t       resultFun;
        };


        const unsigned                      m_threadsNumber;
        pthread_t*                          m_pthreads;

        utils::Mutex                        m_tasksMutex;
        sem_t                               m_tasksSem;
        std::queue<worker_thread_data_t>    m_tasks;

        utils::Mutex                        m_resultsMutex;
        std::queue<signal_function_t>       m_results;
};

WorkerThreadManager.cpp:

WorkerThreadManager.cpp:

#include "WorkerThreadManager.h"
#include "gateway_log.h"
#include <pthread.h>

/**
 * @brief Creates semaphore and starts threads.
 */
WorkerThreadManager::WorkerThreadManager(unsigned threadsNumber) : m_threadsNumber(threadsNumber)
{
    if ( sem_init(&m_tasksSem, 0, 0) )
    {
        std::stringstream ss;
        ss << "Semaphore could not be initialized: " << errno << " - " << strerror(errno);
        LOG_FATAL(ss);
        throw std::runtime_error(ss.str());
    }

    m_pthreads = new pthread_t[m_threadsNumber];
    for (unsigned i = 0; i < m_threadsNumber; ++i)
    {
        int rc = pthread_create(&m_pthreads[i], NULL, WorkerThreadManager::WorkerThread, (void*) this );
        if(rc)
        {
            std::stringstream ss;
            ss << "Pthread could not be started: " << errno << " - " << strerror(errno);
            LOG_FATAL(ss.str());

            if ( sem_destroy(&m_tasksSem) )
                LOG_ERROR("Semaphore could not be destroyed: " << errno << " - " << strerror(errno));

            delete [] m_pthreads;

            throw std::runtime_error(ss.str());
        }
        else
        {
            LOG_DEBUG("Worker thread started " << m_pthreads[i]);

            if(pthread_detach(m_pthreads[i]))
                LOG_WARN("Failed to detach worker thread");
        }
    }
}

/**
 * @brief Cancels all threads, destroys semaphore
 */
WorkerThreadManager::~WorkerThreadManager()
{
    LOG_DEBUG("~WorkerThreadManager()");

    for(unsigned i = 0; i < m_threadsNumber; ++i)
    {
        if ( pthread_cancel(m_pthreads[i]) )
            LOG_ERROR("Worker thread cancellation failed");
    }

    if ( sem_destroy(&m_tasksSem) )
        LOG_ERROR("Semaphore could not be destroyed: " << errno << " - " << strerror(errno));

    delete [] m_pthreads;
}

/**
 * @brief Adds new task to queue, so worker threads can
 * @param A_threadFun function which will be executed by thread
 * @param A_resultFun function which will be enqueued for calling with return value of A_threadFun as parameter
 *          after worker thread executes A_threadFun.
 */
void WorkerThreadManager::PushTask(thread_function_t A_threadFun, result_function_t A_resultFun)
{
    utils::ScopedLock mutex(m_tasksMutex);

    worker_thread_data_t    data(A_threadFun, A_resultFun);
    m_tasks.push( data );
    sem_post(&m_tasksSem);
    LOG_DEBUG("Task for worker threads has been added to queue");
}

/**
 * @brief   Executes result functions (if there are any) to give feedback 
 *  to classes which requested task execution in worker thread.
 */
void WorkerThreadManager::SignalResults()
{
    while(true)
    {
        signal_function_t signal;
        {
            utils::ScopedLock mutex(m_resultsMutex);
            if(m_results.size())
            {
                signal = m_results.front();
                m_results.pop();
            }
            else
                return;
        }

        signal();
    }
}

/**
 * @brief Enqueues result of function executed in worker thread.
 * @param A_result return value of function executed in worker thread
 * @param A_resultFun function which will be enqueued for calling with A_result as a parameter.
 */
void WorkerThreadManager::PushResult(int A_result, result_function_t A_resultFun)
{
    utils::ScopedLock mutex(m_resultsMutex);

    signal_function_t signal = boost::bind(A_resultFun, A_result);
    m_results.push( signal );
}


/**
 * @brief   worker thread body
 * @param A_data pointer to WorkerThreadManager instance
 */
void* WorkerThreadManager::WorkerThread(void* A_data)
{
    WorkerThreadManager* manager = reinterpret_cast<WorkerThreadManager*>(A_data);
    LOG_DEBUG("Starting worker thread loop");
    while (1)
    {
        if ( -1 == sem_wait(&manager->m_tasksSem) && errno == EINTR )
        {
            LOG_DEBUG("sem_wait interrupted with signal");
            continue;
        }
        LOG_DEBUG("WorkerThread:::::: about to call lock mutex");

        worker_thread_data_t data;
        {
            utils::ScopedLock mutex(manager->m_tasksMutex);
            data = manager->m_tasks.front();
            manager->m_results.pop();
        }

        LOG_DEBUG("WorkerThread:::::: about to call resultFun");
        int result  = data.threadFun();
        LOG_DEBUG("WorkerThread:::::: after call resultFun");
        pthread_testcancel();

        manager->PushResult(result, data.resultFun);
    }

    return NULL;
}

main.cpp:

#include "gateway_log.h"
#include "WorkerThreadManager.h"
#include <memory>

class A {
public:
    int Fun() { LOG_DEBUG("Fun before sleep"); sleep(8); LOG_DEBUG("Fun after sleep");return 0; }
    void Result(int a) { LOG_DEBUG("Result: " << a); }
};


int main()
{
    sd::auto_ptr<WorkerThreadManager> workerThreadManager = new WorkerThreadManager;
    A a;
    workerThreadManager->PushTask(boost::bind(&A::Fun, &a), boost::bind(&A::Result, &a, _1));
    sleep(3);
    LOG_DEBUG("deleting workerThreadManager");
    workerThreadManager.reset();                    // <<<--- CRASH
    LOG_DEBUG("deleted workerThreadManager");
    sleep(10);
    LOG_DEBUG("after sleep");    

    return 0;
}

GDB:

(gdb) bt
#0  0xb7ad33a0 in ?? () from /lib/i386-linux-gnu/libc.so.6
#1  0x0807d3a7 in boost::function0<void>::clear (this=0x858db48) at /home/marcin/intel_build/boost_1_42_0/boost/function/function_template.hpp:856
#2  0x0807d17b in boost::function0<void>::~function0 (this=0x858db48, __in_chrg=<optimized out>) at /home/marcin/intel_build/boost_1_42_0/boost/function/function_template.hpp:752
#3  0x0807cec5 in boost::function<void()>::~function(void) (this=0x858db48, __in_chrg=<optimized out>) at /home/marcin/intel_build/boost_1_42_0/boost/function/function_template.hpp:1043
#4  0x0807ced8 in std::_Destroy<boost::function<void ()> >(boost::function<void ()>*) (__pointer=0x858db48) at /usr/include/c++/4.6/bits/stl_construct.h:94
#5  0x0807c868 in std::_Destroy_aux<false>::__destroy<boost::function<void ()>*>(boost::function<void ()>*, boost::function<void ()>*) (__first=0x858db48, __last=0x858d928) at /usr/include/c++/4.6/bits/stl_construct.h:104
#6  0x0807bd05 in std::_Destroy<boost::function<void ()>*>(boost::function<void ()>*, boost::function<void ()>*) (__first=0x858d938, __last=0x858d928) at /usr/include/c++/4.6/bits/stl_construct.h:127
#7  0x0807af23 in std::_Destroy<boost::function<void ()>*, boost::function<void ()> >(boost::function<void ()>*, boost::function<void ()>*, std::allocator<boost::function<void ()> >&) (__first=0x858d938, __last=0x858d928)
    at /usr/include/c++/4.6/bits/stl_construct.h:153
#8  0x0807a037 in std::deque<boost::function<void ()>, std::allocator<boost::function<void ()> > >::_M_destroy_data_aux(std::_Deque_iterator<boost::function<void ()>, boost::function<void ()>&, boost::function<void ()>*>, std::_Deque_iterator<boost::function<void ()>, boost::function<void ()>&, boost::function<void ()>*>) (this=0x858beec, __first=..., __last=...) at /usr/include/c++/4.6/bits/deque.tcc:795
#9  0x08076153 in std::deque<boost::function<void ()>, std::allocator<boost::function<void ()> > >::_M_destroy_data(std::_Deque_iterator<boost::function<void ()>, boost::function<void ()>&, boost::function<void ()>*>, std::_Deque_iterator<boost::function<void ()>, boost::function<void ()>&, boost::function<void ()>*>, std::allocator<boost::function<void ()> > const&) (this=0x858beec, __first=..., __last=...) at /usr/include/c++/4.6/bits/stl_deque.h:1816
#10 0x08073411 in std::deque<boost::function<void()>, std::allocator<boost::function<void()> > >::~deque(void) (this=0x858beec, __in_chrg=<optimized out>) at /usr/include/c++/4.6/bits/stl_deque.h:898
#11 0x0806a355 in std::queue<boost::function<void()>, std::deque<boost::function<void()>, std::allocator<boost::function<void()> > > >::~queue(void) (this=0x858beec, __in_chrg=<optimized out>)
    at /usr/include/c++/4.6/bits/stl_queue.h:92
#12 0x0815a054 in WorkerThreadManager::~WorkerThreadManager (this=0x858be98, __in_chrg=<optimized out>) at WorkerThreadManager.cpp:42
#13 0x0815a1e3 in WorkerThreadManager::~WorkerThreadManager (this=0x858be98, __in_chrg=<optimized out>) at WorkerThreadManager.cpp:56
#14 0x080c6c51 in std::auto_ptr<WorkerThreadManager>::reset (this=0x85463e4, __p=0x0) at /usr/include/c++/4.6/backward/auto_ptr.h:244
#15 0x080604a9 in main ()



我真的很感激任何帮助。 p>

I would really appreciate any help.

我发现了一个错误,这是微不足道的耻辱我:(

I found a bug, it was trivial - shame on me :(

在函数 void * WorkerThreadManager :: WorkerThread(void * A_data)我弹出 m_results $ c> m_tasks :

In function void* WorkerThreadManager::WorkerThread(void* A_data) I popped m_results queue instead of m_tasks as I had intended:

worker_thread_data_t data;
{
    utils::ScopedLock mutex(manager->m_tasksMutex);
    data = manager->m_tasks.front();
    manager->m_results.pop();
}

反正我不明白为什么会导致崩溃这么晚 - 的队列。

Anyway I don't really understand why it caused crash so late - in destructor of the queue.