Boost 线程学习笔记

Bolg转载自: http://www.cnblogs.com/lvdongjie/p/4447193.html

一: 创建线程

#include <iostream>
#include <boost/thread/thread.hpp>
#include <boost/thread/xtime.hpp>

struct MyThreadFunc 
{
    void operator( )() 
    {
        // Do something long-running...
    }
} threadFun;

int main() 
{

    boost::thread myThread(threadFun); // Create a thread that starts running threadFun
    boost::thread::yield();           // Give up the main thread's timeslice,so the child thread can get some work done.

    // Go do some other work...

    myThread.join();                  // The current (i.e., main) thread will wait for myThread to finish before it returns

}

创建线程
boost::thread t(threadFun);
当一个thread执行完成时,这个子线程就会消失。注意这个线程对象不会消失,它仍然是一个还处在它的生存期的C++对象。同理,当对一个堆上的线程对象的指针调用delete时候,线程对象被
销毁,操作系统的线程并不能保证就消失。

放弃时间片
boost::thread::yield() 或者 this_thread::sleep(posix_time::seconds(3))
当前线程放弃余下的时间片。

等待一个线程
myThread.join();
调用这个方法的线程进入wait状态,直到myThread代表的线程完成为止。如果它不结束的话,join方法就不会返回。join是一个等待子线程结束的最好的方法。如果主程序不调用join方法而直接
结束,它的子线程有可能没有执行完成,但是所有的子线程也随之退出。不调用join方法,主线程就不会等待它的子线程。

线程组
如果你需要创建几个线程,考虑使用一个线程组对象thread_group来组织它们。一个thread_group对象可以使用多种方法管理线程。首先,可以使用一个指向动态创建的线程对象的指针作为参数来调用
add_thread方法,将这个线程加入线程组。也可以直接使用线程组类的create_thread方法,可不先创建线程而直接把线程加入到线程组中。
当线程组对象的析构函数被调用时,它将删除(delete)所有这些通过add_thread方法加入的线程指针。所以,只能将堆上的线程对象指针通过add_thread方法加入线程组。remove_thread方法从线程组
删除某个线程的指针,但是我们仍需负责把线程本身内存释放掉。线程组对象的成员方法join_all方法等待线程组中所有线程结束,才返回。

boost::thread_group grp;
boost::thread *p = new boost::thread(threadFun);
grp.add_thread(p);
//do something...
grp.remove_thread(p);
 
grp.create_thread(threadFun);
grp.create_thread(threadFun);   //Now there are two threads in grp
 
grp.join_all();                 //Wait for all threads to finish

二、使资源是线程安全的

保证同一时刻多个线程不会同时修改同一个共享资源,那么这个程序是线程安全的,或者是串行化访问资源的。可以使用mutex类来控制线程的并发问题。

#include <iostream>
#include <boost/thread/thread.hpp>
#include <string>

// A simple queue class; don't do this, use std::queue
template<typename T>
class Queue 
{
public:
    Queue() {}
    ~Queue() {}

    void enqueue(const T& x)
    {
        // Lock the mutex for this queue
        boost::mutex::scoped_lock lock(mutex_);
        list_.push_back(x);
        // A scoped_lock is automatically destroyed (and thus unlocked)
        // when it goes out of scope
    }

    T dequeue() 
    {
        boost::mutex::scoped_lock lock(mutex_);

        if (list_.empty())
            throw "empty!";     // This leaves the current scope, so the
        T tmp = list_.front(); // lock is released
        list_.pop_front();
        return(tmp);
    } // Again: when scope ends, mutex_ is unlocked

private:
    std::list<T> list_;
    boost::mutex mutex_;
};

Queue<std::string> queueOfStrings;

void sendSomething() 
{
    std::string s;
    for (int i = 0; i < 10; ++i) 
    {
        queueOfStrings.enqueue("Cyrus");
    }
}

void recvSomething() {
    std::string s;

    for (int i = 0; i < 10; ++i) 
    {
        try 
        {
            s = queueOfStrings.dequeue();
        }
        catch (...) {}
    }
}

int main() 
{
    boost::thread thr1(sendSomething);
    boost::thread thr2(recvSomething);

    thr1.join();
    thr2.join();
}

mutex对象本身并不知道它代表什么,它仅仅是被多个消费者线程使用的资源访问的锁定解锁标志。在某个时刻,只有一个线程可以锁定这个mutex对象,这就阻止了同一时刻有多个线程并发
访问共享资源。一个mutex就是一个简单的信号机制。给mutex加解锁有多种策略,最简单的是使用scoped_lock类,它使用一个mutex参数来构造,并一直锁定这个mutex直到对象被销毁。如果
这个正在被构造的mutex已经被别的线程锁定的话,当前线程就会进入wait状态,直到这个锁被解开。

三、读写锁/boost 1.5.9没有此文件

mutex有一个美中不足,它不区分读和写。线程如果只是进行读操作,mutex强制线程串行化访问资源,效率低。而且这种操作不需要排他性访问。基于这个原因,Boost线程库提供了read_write_mutex。

#include <iostream>
#include <boost/thread/thread.hpp>
#include <boost/thread/read_write_mutex.hpp>
#include <string>

template<typename T>
class Queue 
{
public:
    Queue() :  // Use a read/write mutex and give writers priority
        rwMutex_(boost::read_write_scheduling_policy::writer_priority){}
    ~Queue() {}

    void enqueue(const T& x) 
    {
        // Use a r/w lock since enqueue updates the state
        boost::read_write_mutex::scoped_write_lock writeLock(rwMutex_);
        list_.push_back(x);
    }

    T dequeue() 
    {
        // Again, use a write lock
        boost::read_write_mutex::scoped_write_lock writeLock(rwMutex_);

        if (list_.empty())
            throw "empty!";
        T tmp = list_.front();
        list_.pop_front();
        return(tmp);
    }

    T getFront() {
        // This is a read-only operation, so you only need a read lock
        boost::read_write_mutex::scoped_read_lock readLock(rwMutex_);
        if (list_.empty())
            throw "empty!";
        return(list_.front());
    }

private:
    std::list<T> list_;
    boost::read_write_mutex rwMutex_;
};

Queue<std::string> queueOfStrings;

void sendSomething() 
{
    std::string s;

    for (int i = 0; i < 10; ++i) 
    {
        queueOfStrings.enqueue("Cyrus");
    }
}

void checkTheFront() 
{
    std::string s;

    for (int i = 0; i < 10; ++i) 
    {
        try 
        { 
            s = queueOfStrings.getFront(); 
        }
        catch (...) {}
    }
}

int main() 
{

    boost::thread thr1(sendSomething);
    boost::thread_group grp;

    grp.create_thread(checkTheFront);
    grp.create_thread(checkTheFront);
    grp.create_thread(checkTheFront);
    grp.create_thread(checkTheFront);

    thr1.join();
    grp.join_all();
}

注意Queue的构造函数中队读写锁rwMutex的初始化。同一时刻,可能有多个读写线程要锁定一个read_write_mutex,而这些锁的调度策略依赖于构造这个mutex时选定的调度策略。Boost库中提供了四种调度策略:

1)reader_priority:等待读锁的线程优先于等待写锁的线程
2)writer_priority:等待写锁的线程优先于等待读锁的线程
3)alternating_single_read:在读锁和写锁之间交替
4)alternating_many_reads:在读锁和写锁之间交替,这个策略将在两个写锁之间使得所有的在这个queue上挂起的读锁都被允许。

 选择使用哪种策略要慎重,因为使用前两种的话可能会导致某些锁始终不能成功,出现饿死的现象。

死锁、饿死和竞态条件
1)死锁,是涉及至少2个线程和2个资源的情况。线程A和B,资源X和Y。A锁定了X,而B锁定了Y。此时A和B有彼此想要对方的资源,死锁就出现了。
死锁的预防有两种方法。一种是,通过小心的按照一定的顺序对不同的mutex来加锁。另一种是,使用Boost提供的try_mutex互斥量和scoped_try_lock。或者使用时间锁。
scoped_try_lock对try_mutex加锁时,可能成功,也可能失败,但不会阻塞。时间锁则有一个超时时间。

bool dequeue(T& x)
{
    boost::try_mutex::scope_try_lock lock(tryMutex_);
    if(!lock.locked())
        return false;
    else{
        if (list_.empty())
            throw "empty!";
        x = list_.front();
        list_.pop_front();
        return true;
    }
}
private:
    boost::try_mutex tryMutex_;

2)饿死,如果你正在使用write_priority策略,并且你有很多创建写锁的线程,那么读锁的线程就可能饿死。
3)竞态条件

if(q.getFront() == "Cyrus")
{
   str = q.dequeue();
   //....
}

这个代码在单线程环境中工作很好,因为q在第一行和第二行代码之间不会被修改。多线程环境中则会出现问题。此为竞态条件。解决的方法是为Queue添加一个成员函数dequeueIfEquals,在函数执行过程中始终锁定互斥量。

四、从一个线程中给另一个线程发送通知

有的时候仅仅依靠锁住共享资源来使用它是不够的。有时候共享资源只有某些状态的时候才能够使用。比方说,某个线程如果要从堆栈中读取数据,那么如果栈中没有数据就必须等待数据被压栈。
这种情况下的同步使用互斥体是不够的。另一种同步的方式--条件变量,就可以使用在这种情况下。
条件变量的使用总是和互斥体及共享资源联系在一起的。线程首先锁住互斥体,然后检验共享资源的状态是否处于可使用的状态。如果不是,那么线程就要等待条件变量。要指向这样的操作就必
须在等待的时候将互斥体解锁,以便其他线程可以访问共享资源并改变其状态。它还得保证从等到得线程返回时互斥体是被上锁得。当另一个线程改变了共享资源的状态时,它就要通知正在等待
条件变量得线程,并将之返回等待的线程。

或者当需要线程等待某个事物时,可以创建一个condition对象,然后通过这个对象来通知那些等待的线程。

#include <iostream>
#include <boost/thread/thread.hpp>
#include <boost/thread/condition.hpp>
#include <boost/thread/mutex.hpp>
#include <list>
#include <string>

class Request { /*...*/ };

// A simple job queue class; don't do this, use std::queue
template<typename T>
class JobQueue 
{
public:
    JobQueue() {}
    ~JobQueue() {}

    void submitJob(const T& x)
    {
        boost::mutex::scoped_lock lock(mutex_);
        list_.push_back(x);
        workToBeDone_.notify_one();
    }

    T getJob() 
    {
        boost::mutex::scoped_lock lock(mutex_);

        workToBeDone_.wait(lock); // Wait until this condition is
        // satisfied, then lock the mutex
        T tmp = list_.front();
        list_.pop_front();
        return(tmp);
    }

private:
    std::list<T> list_;
    boost::mutex mutex_;
    boost::condition workToBeDone_;
};

JobQueue<Request> myJobQueue;

void boss() 
{
    for (;;) 
    {
        // Get the request from somewhere
        Request req;
        myJobQueue.submitJob(req);
    }
}

void worker()
{
    for (;;) 
    {
        Request r(myJobQueue.getJob());
        // Do something with the job...
    }
}

int main() 
{
    boost::thread thr1(boss);
    boost::thread thr2(worker);
    boost::thread thr3(worker);

    thr1.join();
    thr2.join();
    thr3.join();
}

boost::mutex::scoped_lock lock(mutex_)
workToBeDone_.wait(lock)
这两行代码,第一行锁定这个mutex对象。第二行代码解开这个mutex上的锁,然后进行等待或者休眠,直到它的条件得到了满足。这个mutex互斥对象的解锁让其他的线程能够使用这个mutex对象,它们中
的某个需要设置这个等待条件,之后通知另外的线程。notify_all函数,通知那些所有正在等待某个条件变为真的线程,那些线程随后进入运行状态。wait方法做两件事情:它一直等待直到有人在它正等
待的condition上调用notify_one或notify_all,然后它就试图锁定相关的mutex。当调用的是notify_all时,尽管多个等待的线程都尽量去获得下一个锁,但谁将获得依赖于这个mutex的类型和使用的优先
策略。一个condition对象能让消费者线程休眠,因此在还没有碰到一个condition时处理器可以去处理别的事情。例如一个web服务器使用一个工作线程池来处理进来的请求。当没有需求进来时,让这些子
线程处于等待状态比让它们循环的查询或者睡眠然后偶尔唤醒来检查这个队列,要好很多。

五、只初始化一次共享资源

如果一个引用程序要产生唯一的全局的对象,由于实例化顺序的问题,某个函数会被调用来返回一个静态的对象,它必须保证第一次被调用时就产生这个静态的对象。这里的问题就
是如果多个线程同时调用了这个函数,那么这个静态对象的构造函数就会被调用多次,这样错误产生了。

解决这个问题的方法就是所谓的“一次实现”(once routine)。“一次实现”在一个应用程序只能执行一次。如果多个线程想同时执行这个操作,那么真正执行的只有一个,而其他线程必须等这个操作结束。为了保证它只被执行一次,这个routine由另一个函数间接的调用,而这个函数传给它一个指针以及一个标志着这个routine是否已经被调用的特殊标志。这个标志是以静态的方式初始化的,这也就保证了它在编译期间就被初始化而不是运行时。因此也就没有多个线程同时将它初始化的问题了。Boost线程库提供了boost::call_once来支持“一次实现”,并且定义了一个标志boost::once_flag及一个初始化这个标志的宏BOOST_ONCE_INIT。

#include <iostream>
#include <boost/thread/thread.hpp>
#include <boost/thread/once.hpp>

// Some sort of connection class that should only be initialized once
struct Conn 
{
    static void init() { ++i_; }
    static boost::once_flag init_;
    static int i_;
    // ...
};

int Conn::i_ = 0;
boost::once_flag Conn::init_ = BOOST_ONCE_INIT;

void worker() 
{
    boost::call_once(Conn::init, Conn::init_);
    // Do the real work...
}

Conn c;  // You probably don't want to use a global, so see the
// next Recipe

int main() 
{

    boost::thread_group grp;

    for (int i = 0; i < 100; ++i)
        grp.create_thread(worker);

    grp.join_all();

    std::cout << c.i_ << '
'; // c.i_ = 1
}

一个共享资源不得不在某个地方被初始化,并且你希望第一次使用这个资源的线程来完成初始化工作。一个once_flag类型和call_once函数能够保证多个线程不会重复的初始化同一个对象。
首先,必须使用BOOST_ONCE_INIT宏来初始化这个once_flag对象。boost::once_flag Conn::init_ = BOOST_ONCE_INIT; 之后调用call_once函数,boost::call_once(Conn::init, Conn::init_);
第一个形参是希望被执行一次的初始化函数的地址。

六、给线程函数传递一个参数

#include <iostream>
#include <string>
#include <functional>
#include <boost/thread/thread.hpp>

// A typedef to make the declarations below easier to read
typedef void(*WorkerFunPtr)(const std::string&);

template<typename FunT,   // The type of the function being called
    typename ParamT> // The type of its parameter
struct Adapter 
{
    Adapter(FunT f, ParamT& p) : // Construct this adapter and set the
        f_(f), p_(&p) {}          // members to the function and its arg

    void operator( )() { // This just calls the function with its arg
        f_(*p_);
    }
private:
    FunT    f_;
    ParamT* p_;  // Use the parameter's address to avoid extra copying
};

void worker(const std::string& s) 
{
    std::cout << s << '
';
}

int main() {

    std::string s1 = "This is the first thread!";
    std::string s2 = "This is the second thread!";

    boost::thread thr1(Adapter<WorkerFunPtr, std::string>(worker, s1));
    boost::thread thr2(Adapter<WorkerFunPtr, std::string>(worker, s2));

    thr1.join();
    thr2.join();
}

使用这个函数适配器类模板,你就可以给线程函数传递参数了。如果你需要传递多个参数,仅需要在这个适配器中增加另一个类型和成员变量。或者使用bind方式直接传递参数

七、线程局部存储

大多数函数都不是可重入的。这也就是说在某一个线程已经调用了一个函数时,如果你再调用同一个函数,那么这样是不安全的。一个不可重入的函数通过连续的调用来保存静态变量或者是返回
一个指向静态数据的指针。 举例来说,std::strtok就是不可重入的,因为它使用静态变量来保存要被分割成符号的字符串。
有两种方法可以让不可重用的函数变成可重用的函数。第一种方法就是改变接口,用指针或引用代替原先使用静态数据的地方。比方说,POSIX定义了strok_r,std::strtok中的一个可重入的变量,
它用一个额外的char**参数来代替静态数据。这种方法很简单,而且提供了可能的最佳效果。但是这样必须改变公共接口,也就意味着必须改代码。另一种方法不用改变公有接口,而是用本地存储
线程(thread local storage)来代替静态数据(有时也被成为特殊线程存储,thread-specific storage)。
Boost线程库提供了智能指针boost::thread_specific_ptr来访问本地存储线程。每一个线程第一次使用这个智能指针的实例时,它的初值是NULL,所以必须要先检查这个它的只是否为空,并且为
它赋值。Boost线程库保证本地存储线程中保存的数据会在线程结束后被清除。

List5是一个使用boost::thread_specific_ptr的简单例子。其中创建了两个线程来初始化本地存储线程,并有10次循环,每一次都会增加智能指针指向的值,并将其输出到std::cout上(由于
std::cout是一个共享资源,所以通过互斥体进行同步)。main线程等待这两个线程结束后就退出。从这个例子输出可以明白的看出每个线程都处理属于自己的数据实例,尽管它们都是使用同一
个boost::thread_specific_ptr。

#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/tss.hpp>
#include <iostream>

boost::mutex io_mutex;
boost::thread_specific_ptr<int> ptr;

struct count
{
    count(int id) : id(id) { }

    void operator()()
    {
        if (ptr.get() == 0)
            ptr.reset(new int(0));

        for (int i = 0; i < 10; ++i)
        {
            (*ptr)++;
            boost::mutex::scoped_lock
                lock(io_mutex);
            std::cout << id << ": "
                << *ptr << std::endl;
        }
    }

    int id;
};
int main(int argc, char* argv[])
{
    boost::thread thrd1(count(1));
    boost::thread thrd2(count(2));
    thrd1.join();
    thrd2.join();

    getchar();
    return 0;
}