Linux下简单的多线程编程--线程池的实现 talk is cheap show me the code

/*  写在前面的话:

    今天刚“开原”,选择了一篇关于线程池的文件与大家分享,希望能对您学习有所帮助,也希望能与大家共同学习!

    选择在这个特殊的时候注册并发文章也是有一些我个人特殊的意义的,看我的id(西游小学生.45)就知道了,哈哈。在这里也很感谢博客园的员工,刚发申请两分钟就同意了。

*/

  最近由于要写一个类似于QQ的程序,所以想到要用到多线程。既然要用多线程,那何不写一个线程池?于是上网搜了搜多线程的代码,发现大多都不是很完善,或者有些小bug。所以,在这里贴出一个完整的,经过我多重测试的,先贴上一个线程池的简单实现过程(固定大小的线程池),稍后和网络编程的结合过些天会再贴出来。这里引用Linux的一句话:“

”。所以我的博客大部分都会给大家贴代码的,谢谢。

首先,不得不提的就是线程池的好处:

  简单来说,如果调用一个线程分为:

         t1创建线程

         t2完成作业

         t3销毁线程

  那么如果我们有100个作业要完成,总时间T=100*(t1+t2+t3)。但如果我们提前申请一个10个线程的固定大小线程池,那么完成作业的总时间为申请10个线程和销毁10个线程以及100个作业的时间,及T=10*t1+100*t2+10*t3。所以,线程池最大的优势就是节省了线程申请以及销毁的时间!

  下面是具体的代码实现:

  

首先,一般他会有两个数据结构,一个任务队列,一个线程池结构体
typedef struct tpool_work {                   任务队列
    void*               (*routine)(int);            任务函数,该任务的处理函数 
    int                arg;                                 传入任务函数的参数 
    struct tpool_work   *next;                   任务队列的下一个节点 
}tpool_work_t;
 
typedef struct tpool {                             线程池
    int             shutdown;                         线程池是否销毁 
    int             max_thr_num;                   最大线程数 
    pthread_t       *thr_id;                         线程ID数组 
    tpool_work_t    *queue_head;           任务队列的头结点 
    pthread_mutex_t queue_lock;          互斥锁          
    pthread_cond_t  queue_ready;         条件变量
}tpool_t;

第一步:创建线程池:申请线程池结构体(该结构体是一个全局变量,在这里给他初始化,即申请空间malloc),初始化变量(锁,条件变量),申请线程id数组,创建线程pthread_create,让线程执行pthead_run函数,(使用条件变量前会加锁,条件变量wait时会自动解锁) run函数中会使用pthread_cond_wait等待条件变量被唤醒
第二步:增加工作:申请任务节点,malloc,并初始化,将参数传过来的任务函数,函数参数赋值给任务节点结构体,然后将任务挂到全局变量线程池的任务队列上,(此操作需要加锁再解锁)并使用pthread_cond_signal,给条件变量发信号
第三部:run函数接收到信号后会继续执行,此时获取到全局变量线程池任务队列头结点,取得任务节点执行对应函数即可,执行完while(1)回到条件变量的pthread_cond_wait处

/*************************************************************************
> File Name: tpool.h
> Author: 
> Mail: 
> Created Time: 2015年04月01日 星期三 17时34分00秒
************************************************************************/

#ifndef THREAD_POOL_H__
#define THREAD_POOL_H__

#include <pthread.h>

/* 要执行的任务链表 */
typedef struct tpool_work {
void* (*routine)(int); /* 任务函数 */
int arg; /* 传入任务函数的参数 */
struct tpool_work *next; 
}tpool_work_t;

typedef struct tpool {
int shutdown; /* 线程池是否销毁 */
int max_thr_num; /* 最大线程数 */
pthread_t *thr_id; /* 线程ID数组 */
tpool_work_t *queue_head; /* 线程链表 */
pthread_mutex_t queue_lock; 
pthread_cond_t queue_ready; 
}tpool_t;

/*
* @brief 创建线程池 
* @param max_thr_num 最大线程数
* @return 0: 成功 其他: 失败 
*/
int
tpool_create(int max_thr_num);

/*
* @brief 销毁线程池 
*/
void
tpool_destroy();

/*
* @brief 向线程池中添加任务
* @param routine 任务函数指针
* @param arg 任务函数参数
* @return 0: 成功 其他:失败 
*/
int
tpool_add_work(void*(*routine)(int), int arg);

#endif

/*************************************************************************
> File Name: func.c
> Author: 
> Mail: 
> Created Time: 2015年04月01日 星期三 17时35分56秒
************************************************************************/

#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <stdio.h>
#include "tpool.h"

static tpool_t *tpool = NULL;

void 
print_work(tpool_work_t *head) //输出任务列队当前工作
{
tpool_work_t *work= head;
printf(" 当前任务列队里还有的任务是:(任务的参数是) ");
while(work){
printf("%d ",work->arg);
work = work->next;
}
printf(" ");
}

/* 工作者线程函数, 从任务链表中取出任务并执行 */
static void* 
thread_routine(void *arg)
{
tpool_work_t *work; //任务链表

while(1) {
/* 如果线程池没有被销毁且没有任务要执行,则等待 */
pthread_mutex_lock(&tpool->queue_lock); //锁住线程池
while(!tpool->queue_head && !tpool->shutdown) {
pthread_cond_wait(&tpool->queue_ready, &tpool->queue_lock);
}
if (tpool->shutdown) { //0没有注销
pthread_mutex_unlock(&tpool->queue_lock);
pthread_exit(NULL);
}
sleep(3);
// print_work(tpool->queue_head);
//取出任务链表头的一个工作
work = tpool->queue_head;
tpool->queue_head = tpool->queue_head->next;
pthread_mutex_unlock(&tpool->queue_lock);

work->routine(work->arg);
free(work);
}

return NULL; 
}

/*
* 创建线程池 
*/
int
tpool_create(int max_thr_num)
{
int i;

tpool = calloc(1, sizeof(tpool_t)); //线程池tpool
if (!tpool) {
printf("%s: calloc failed ", __FUNCTION__);
exit(1);
}

/* 初始化 */
tpool->max_thr_num = max_thr_num; //初始化最大线程池数
tpool->shutdown = 0; //线程池注销设为0未注
tpool->queue_head = NULL; //线程池链表
if (pthread_mutex_init(&tpool->queue_lock, NULL) !=0) { //初始化线程锁
printf("%s: pthread_mutex_init failed, errno:%d, error:%s ",
__FUNCTION__, errno, strerror(errno));
exit(1);
}
if (pthread_cond_init(&tpool->queue_ready, NULL) !=0 ) { //初始化条件变量
printf("%s: pthread_cond_init failed, errno:%d, error:%s ", 
__FUNCTION__, errno, strerror(errno));
exit(1);
}

/* 创建工作者线程 */
tpool->thr_id = calloc(max_thr_num, sizeof(pthread_t));//申请线程ID数组
if (!tpool->thr_id) {
printf("%s: calloc failed ", __FUNCTION__);
exit(1);
}
for (i = 0; i < max_thr_num; ++i) {
if (pthread_create(&tpool->thr_id[i], NULL, thread_routine, NULL) != 0){
printf("%s:pthread_create failed, errno:%d, error:%s ", __FUNCTION__, 
errno, strerror(errno));
exit(1);
}



return 0;
}

/* 销毁线程池 */
void
tpool_destroy()
{
int i;
tpool_work_t *member;

if (tpool->shutdown) {
return;
}

tpool->shutdown = 1;
/* 通知所有正在等待的线程 */
pthread_mutex_lock(&tpool->queue_lock);
pthread_cond_broadcast(&tpool->queue_ready);
pthread_mutex_unlock(&tpool->queue_lock);
for (i = 0; i < tpool->max_thr_num; ++i) {
pthread_join(tpool->thr_id[i], NULL);
}
free(tpool->thr_id);


while(tpool->queue_head) {
member = tpool->queue_head;
tpool->queue_head = tpool->queue_head->next;
free(member);
}

pthread_mutex_destroy(&tpool->queue_lock); 
pthread_cond_destroy(&tpool->queue_ready);

free(tpool); 
}

/* 向线程池添加任务 */
int
tpool_add_work(void*(*routine)(int), int arg)
{
tpool_work_t *work, *member;

if (!routine){
printf("%s:Invalid argument ", __FUNCTION__);
return -1;
}

work = malloc(sizeof(tpool_work_t)); //申请一个任务结点
if (!work) { //申请失败
printf("%s:malloc failed ", __FUNCTION__);
return -1;
}
work->routine = routine; //线程执行函数设为main中传进来的参数
work->arg = arg; //线程执行函数的参数设置为main中传进来的参数
work->next = NULL; 

pthread_mutex_lock(&tpool->queue_lock); //锁住线程池锁 
member = tpool->queue_head; 
if (!member) { //找到线程链表中最后一个结点,并该工作放置于线程链表尾
tpool->queue_head = work;
} else {
while(member->next) {
member = member->next;
}
member->next = work;
}
/* 通知工作者线程,有新任务添加 */
pthread_cond_signal(&tpool->queue_ready);
pthread_mutex_unlock(&tpool->queue_lock);

return 0; 
}


void *func(int arg)
{
printf("thread %d threadID is: %d ",arg,(int)pthread_self());
print_work(tpool->queue_head);
sleep(3);
return NULL;
}

int
main(int arg, char **argv)
{
if (tpool_create(5) != 0) {
printf("tpool_create failed ");
exit(1);
}
int i;
for (i = 0; i < 10; ++i) {
tpool_add_work(func, i);
// usleep(1);
}
sleep(10); //如果sleep无或时间太短,注意主线程可能先于其他线程完成导致部分作业未完成
tpool_destroy();
return 0;
}