linux下C语言实现多线程通信—环形缓冲区,可用于生产者(producer)/消费者(consumer)【转】

转自:http://blog.chinaunix.net/uid-28458801-id-4262445.html

操作系统:ubuntu10.04

前言:
    在嵌入式开发中,只要平台带有操作系统,在其上开发产品应用基本都需要用到多线程。
    使用多线程是为了尽可能地提高并发度和运行效率,因此线程之间的通信就成了问题的核心。
    根据当前产品的需要,这里使用环形缓冲区来解决线程间的通信问题。

一,环形缓冲区的实现
    1,cbuf.h

点击(此处)折叠或打开

  • #ifndef __CBUF_H__
  • #define __CBUF_H__

  • #ifdef __cplusplus
  • extern "C" {
  • #endif

  • /* Define to prevent recursive inclusion
  • -------------------------------------*/
  • #include "types.h"
  • #include "thread.h"


  • typedef    struct _cbuf
  • {
  •     int32_t        size;            /* number of items currently stored in the buffer */
  •     int32_t        next_in;        /* index in data[] where the next item will be stored */
  •     int32_t        next_out;        /* index in data[] of the next item to be taken out */
  •     int32_t        capacity;        /* total number of items this buffer can hold */
  •     mutex_t        mutex;            /* Lock the structure */
  •     cond_t        not_full;        /* Full -> not full condition */
  •     cond_t        not_empty;        /* Empty -> not empty condition */
  •     void        *data[CBUF_MAX];/* the item pointers held by the buffer */
  • }cbuf_t;


  • /* Initialize the ring buffer (mutex, condition variables, counters) */
  • extern    int32_t        cbuf_init(cbuf_t *c);

  • /* Destroy the ring buffer's condition variables and mutex */
  • extern    void        cbuf_destroy(cbuf_t    *c);

  • /* Push one item; waits while the buffer is full */
  • extern    int32_t        cbuf_enqueue(cbuf_t *c,void *data);

  • /* Take one item out; waits while the buffer is empty */
  • extern    void*        cbuf_dequeue(cbuf_t *c);


  • /* Check whether the buffer is full */
  • extern    bool        cbuf_full(cbuf_t    *c);

  • /* Check whether the buffer is empty */
  • extern    bool        cbuf_empty(cbuf_t *c);

  • /* Get the total number of items the buffer can hold */
  • extern    int32_t        cbuf_capacity(cbuf_t *c);


  • #ifdef __cplusplus
  • }
  • #endif

  • #endif
  • /* END OF FILE
  • ---------------------------------------------------------------*/

    2,cbuf.c

    点击(此处)折叠或打开

  • #include "cbuf.h"



  • /*
  •  * Initialize the ring buffer.
  •  *
  •  * Creates the mutex, then the not_full and not_empty condition variables,
  •  * and finally resets size/next_in/next_out and sets capacity to CBUF_MAX.
  •  * When one step fails, the primitives created by the earlier steps are
  •  * destroyed again and that step's error code is returned.
  •  *
  •  * Returns OPER_OK on success.
  •  */
  • int32_t        cbuf_init(cbuf_t *c)
  • {
  •     int32_t    status = mutex_init(&c->mutex);

  •     if(status != OPER_OK)
  •     {
  • #ifdef DEBUG_CBUF
  •         debug("cbuf init fail ! mutex init fail ! ");
  • #endif
  •         goto out;
  •     }

  •     status = cond_init(&c->not_full);
  •     if(status != OPER_OK)
  •     {
  • #ifdef DEBUG_CBUF
  •         debug("cbuf init fail ! cond not full init fail ! ");
  • #endif
  •         goto undo_mutex;
  •     }

  •     status = cond_init(&c->not_empty);
  •     if(status != OPER_OK)
  •     {
  • #ifdef DEBUG_CBUF
  •         debug("cbuf init fail ! cond not empty init fail ! ");
  • #endif
  •         goto undo_not_full;
  •     }

  •     /* All primitives exist: start with an empty buffer. */
  •     c->capacity    = CBUF_MAX;
  •     c->next_out    = 0;
  •     c->next_in    = 0;
  •     c->size        = 0;

  • #ifdef DEBUG_CBUF
  •     debug("cbuf init success ! ");
  • #endif

  •     return status;

  •     /* Unwind in reverse order of creation. */
  • undo_not_full:
  •     cond_destroy(&c->not_full);
  • undo_mutex:
  •     mutex_destroy(&c->mutex);
  • out:
  •     return status;
  • }


  • /*
  •  * Destroy the ring buffer.
  •  *
  •  * Releases the two condition variables first and the mutex last.
  •  * The results of the destroy calls are not checked.
  •  */
  • void    cbuf_destroy(cbuf_t *c)
  • {
  •     cond_t    *const not_empty = &c->not_empty;
  •     cond_t    *const not_full  = &c->not_full;

  •     cond_destroy(not_empty);
  •     cond_destroy(not_full);
  •     mutex_destroy(&c->mutex);

  • #ifdef DEBUG_CBUF
  •     debug("cbuf destroy success ");
  • #endif
  • }



  • /*
  •  * Push one item into the ring buffer.
  •  *
  •  * Takes the buffer mutex and waits on not_full while the buffer is full,
  •  * then stores `data` at next_in, advances next_in (wrapping at capacity),
  •  * increments size and signals not_empty for a waiting consumer.
  •  *
  •  * Returns OPER_OK on success, or the error code reported by mutex_lock()
  •  * or cond_wait(); in both error cases nothing has been stored.
  •  */
  • int32_t        cbuf_enqueue(cbuf_t *c,void *data)
  • {
  •     int32_t    ret = OPER_OK;

  •     if((ret = mutex_lock(&c->mutex)) != OPER_OK)    return ret;

  •     /*
  •      * Wait while the buffer is full.
  •      */
  •     while(cbuf_full(c))
  •     {
  • #ifdef DEBUG_CBUF
  •         debug("cbuf is full !!! ");
  • #endif
  •         if((ret = cond_wait(&c->not_full,&c->mutex)) != OPER_OK)
  •         {
  •             /*
  •              * A failing wait would otherwise make this loop spin forever
  •              * on a buffer that stays full. POSIX reports these errors
  •              * before the mutex state is modified, so we still own the
  •              * mutex here: release it and hand the error to the caller.
  •              */
  •             mutex_unlock(&c->mutex);
  •             return ret;
  •         }
  •     }

  •     c->data[c->next_in] = data;
  •     c->next_in = (c->next_in + 1) % c->capacity;
  •     c->size++;

  •     mutex_unlock(&c->mutex);

  •     /*
  •      * Let a waiting consumer know there is data.
  •      */
  •     cond_signal(&c->not_empty);

  • #ifdef DEBUG_CBUF
  •     debug("enqueue ");
  • #endif

  •     return ret;
  • }



  • /*
  •  * Take one item out of the ring buffer.
  •  *
  •  * Takes the buffer mutex and waits on not_empty while the buffer is empty,
  •  * then reads the pointer at next_out, advances next_out (wrapping at
  •  * capacity), decrements size and signals not_full for a waiting producer.
  •  *
  •  * Returns the stored pointer, or NULL when mutex_lock() or cond_wait()
  •  * fails. A NULL that was itself enqueued cannot be told apart from an
  •  * error by the caller.
  •  */
  • void*        cbuf_dequeue(cbuf_t *c)
  • {
  •     void     *data     = NULL;

  •     if(mutex_lock(&c->mutex) != OPER_OK)    return NULL;

  •     /*
  •      * Wait while there is nothing in the buffer
  •      */
  •     while(cbuf_empty(c))
  •     {
  • #ifdef DEBUG_CBUF
  •         debug("cbuf is empty!!! ");
  • #endif
  •         if(cond_wait(&c->not_empty,&c->mutex) != OPER_OK)
  •         {
  •             /*
  •              * A failing wait would otherwise make this loop spin forever
  •              * on a buffer that stays empty. POSIX reports these errors
  •              * before the mutex state is modified, so we still own the
  •              * mutex here: release it and report failure.
  •              */
  •             mutex_unlock(&c->mutex);
  •             return NULL;
  •         }
  •     }

  •     data = c->data[c->next_out];
  •     c->next_out = (c->next_out + 1) % c->capacity;
  •     c->size--;

  •     mutex_unlock(&c->mutex);

  •     /*
  •      * Let a waiting producer know there is room:
  •      * one item was taken out, so the next item can be stored.
  •      */
  •     cond_signal(&c->not_full);

  • #ifdef DEBUG_CBUF
  •     debug("dequeue ");
  • #endif

  •     return data;
  • }


  • /*
  •  * Report whether the buffer is full, i.e. size equals capacity.
  •  * This function does not take c->mutex itself.
  •  */
  • bool        cbuf_full(cbuf_t    *c)
  • {
  •     const int32_t    stored = c->size;
  •     const int32_t    limit  = c->capacity;

  •     return (stored == limit);
  • }

  • /*
  •  * Report whether the buffer is empty, i.e. size is zero.
  •  * This function does not take c->mutex itself.
  •  */
  • bool        cbuf_empty(cbuf_t *c)
  • {
  •     const int32_t    stored = c->size;

  •     return (0 == stored);
  • }

  • /*
  •  * Return the total number of items the buffer can hold
  •  * (the capacity field, set to CBUF_MAX by cbuf_init()).
  •  */
  • int32_t        cbuf_capacity(cbuf_t *c)
  • {
  •     const int32_t    limit = c->capacity;

  •     return limit;
  • }


二,辅助文件
        为了提高程序的可移植性,对线程相关的接口(互斥锁,条件变量,线程创建)进行了封装。
        1,thread.h

    点击(此处)折叠或打开

  • #ifndef __THREAD_H__
  • #define __THREAD_H__

  • #ifdef __cplusplus
  • extern "C" {
  • #endif

  • /* Define to prevent recursive inclusion
  • -------------------------------------*/
  • #include "types.h"





  • typedef    struct _mutex    /* wraps a pthread mutex; operated on through mutex_init/destroy/lock/unlock */
  • {
  •     pthread_mutex_t        mutex;
  • }mutex_t;


  • typedef    struct _cond    /* wraps a pthread condition variable; operated on through cond_init/destroy/signal/wait */
  • {
  •     pthread_cond_t        cond;
  • }cond_t;


  • typedef    pthread_t        tid_t;    /* thread identifier */
  • typedef    pthread_attr_t    attr_t;    /* thread attribute object */
  • typedef    void*    (* thread_fun_t)(void*);    /* function pointer with the pthread start-routine signature */


  • typedef    struct _thread    /* descriptor taken by thread_create() */
  • {
  •     tid_t            tid;    /* pthread identifier; presumably filled in by thread_create() - TODO confirm */
  •     cond_t            *cv;    /* NOTE(review): purpose not visible in this file - confirm against thread_create() */
  •     int32_t            state;    /* NOTE(review): set of valid values is not visible in this file */
  •     int32_t            stack_size;    /* presumably the requested stack size; unit not visible here - TODO confirm */
  •     attr_t         attr;
  •     thread_fun_t    fun;    /* presumably the thread entry function - TODO confirm */
  • }thread_t;



  • /* mutex wrappers: each returns a negative error code when the underlying pthread call fails */
  • extern    int32_t        mutex_init(mutex_t    *m);
  • extern    int32_t        mutex_destroy(mutex_t    *m);
  • extern    int32_t        mutex_lock(mutex_t    *m);
  • extern    int32_t        mutex_unlock(mutex_t    *m);


  • /* condition-variable wrappers: each returns a negative error code when the underlying pthread call fails */
  • extern    int32_t        cond_init(cond_t    *c);
  • extern    int32_t        cond_destroy(cond_t    *c);
  • extern    int32_t        cond_signal(cond_t *c);
  • extern    int32_t        cond_wait(cond_t    *c,mutex_t *m);



  • /* thread */
  • /* Thread creation, attribute setup, etc. are all wrapped inside this call */
  • extern    int32_t        thread_create(thread_t *t);
  • //extern    int32_t        thread_init(thread_t    *t);

  • #define    thread_join(t, p)     pthread_join(t, p)
  • #define    thread_self()        pthread_self()
  • #define    thread_sigmask        pthread_sigmask


  • #ifdef __cplusplus
  • }
  • #endif

  • #endif
  • /* END OF FILE
  • ---------------------------------------------------------------*/

        2,thread.c

    点击(此处)折叠或打开

  • #include "thread.h"




  • /* mutex */
  • /*
  •  * Initialize the wrapped pthread mutex with default attributes.
  •  *
  •  * Returns the pthread_mutex_init() result (0) on success and
  •  * -THREAD_MUTEX_INIT_ERROR when that call reports any failure.
  •  */
  • int32_t        mutex_init(mutex_t    *m)
  • {
  •     int32_t        rc = pthread_mutex_init(&m->mutex, NULL);

  •     if(rc == 0)
  •         return rc;

  •     return -THREAD_MUTEX_INIT_ERROR;
  • }


  • /*
  •  * Destroy the wrapped pthread mutex.
  •  *
  •  * Returns the pthread_mutex_destroy() result (0) on success and
  •  * -MUTEX_DESTROY_ERROR when that call reports any failure.
  •  */
  • int32_t        mutex_destroy(mutex_t    *m)
  • {
  •     int32_t        rc = pthread_mutex_destroy(&m->mutex);

  •     if(rc == 0)
  •         return rc;

  •     return -MUTEX_DESTROY_ERROR;
  • }



  • /*
  •  * Lock the wrapped pthread mutex.
  •  *
  •  * Returns the pthread_mutex_lock() result (0) on success and
  •  * -THREAD_MUTEX_LOCK_ERROR when that call reports any failure.
  •  */
  • int32_t        mutex_lock(mutex_t    *m)
  • {
  •     int32_t        rc = pthread_mutex_lock(&m->mutex);

  •     if(rc == 0)
  •         return rc;

  •     return -THREAD_MUTEX_LOCK_ERROR;
  • }



  • /*
  •  * Unlock the wrapped pthread mutex.
  •  *
  •  * Returns the pthread_mutex_unlock() result (0) on success and
  •  * -THREAD_MUTEX_UNLOCK_ERROR when that call reports any failure.
  •  */
  • int32_t        mutex_unlock(mutex_t    *m)
  • {
  •     int32_t        rc = pthread_mutex_unlock(&m->mutex);

  •     if(rc == 0)
  •         return rc;

  •     return -THREAD_MUTEX_UNLOCK_ERROR;
  • }






  • /* cond */
  • /*
  •  * Initialize the wrapped pthread condition variable with default attributes.
  •  *
  •  * Returns the pthread_cond_init() result (0) on success and
  •  * -THREAD_COND_INIT_ERROR when that call reports any failure.
  •  */
  • int32_t        cond_init(cond_t    *c)
  • {
  •     int32_t        rc = pthread_cond_init(&c->cond, NULL);

  •     if(rc == 0)
  •         return rc;

  •     return -THREAD_COND_INIT_ERROR;
  • }



  • /*
  •  * Destroy the wrapped pthread condition variable.
  •  *
  •  * Returns the pthread_cond_destroy() result (0) on success and
  •  * -COND_DESTROY_ERROR when that call reports any failure.
  •  */
  • int32_t        cond_destroy(cond_t    *c)
  • {
  •     int32_t        rc = pthread_cond_destroy(&c->cond);

  •     if(rc == 0)
  •         return rc;

  •     return -COND_DESTROY_ERROR;
  • }



  • /*
  •  * Signal the wrapped pthread condition variable.
  •  *
  •  * Returns the pthread_cond_signal() result (0) on success and
  •  * -COND_SIGNAL_ERROR when that call reports any failure.
  •  */
  • int32_t        cond_signal(cond_t *c)
  • {
  •     int32_t        rc = pthread_cond_signal(&c->cond);

  •     if(rc == 0)
  •         return rc;

  •     return -COND_SIGNAL_ERROR;
  • }




  • /*
  •  * Wait on the wrapped condition variable `c` using the wrapped mutex `m`.
  •  *
  •  * Returns the pthread_cond_wait() result (0) on success and
  •  * -COND_WAIT_ERROR when that call reports any failure.
  •  */
  • int32_t        cond_wait(cond_t    *c,mutex_t *m)
  • {
  •     int32_t        rc = pthread_cond_wait(&c->cond, &m->mutex);

  •     if(rc == 0)
  •         return rc;

  •     return -COND_WAIT_ERROR;
  • }


三,测试
        1,测试代码

    点击(此处)折叠或打开

  • /*
  •  * cbuf begin
  •  */
  • #define        OVER    (-1)    /* NOTE(review): not referenced by the test code shown here */

  • static        cbuf_t    cmd;    /* ring buffer shared by both producers and the consumer */
  • static        int        line_1[200];    /* values produced by producer_1; the buffer stores pointers into this array */
  • static        int        line_2[200];    /* values produced by producer_2; the buffer stores pointers into this array */
  • //static        int        temp = 0;

  • static        bool    line1_finish = false;    /* set by producer_1 after its last enqueue; NOTE(review): shared between threads without synchronization */
  • static        bool    line2_finish = false;    /* set by producer_2 after its last enqueue; NOTE(review): shared between threads without synchronization */

  • /*
  •  * Producer thread 1.
  •  *
  •  * Fills line_1[] with 1000, 1001, ... and enqueues a pointer to each slot
  •  * into `cmd`, sleeping one second whenever the index is a multiple of 9.
  •  * Sets line1_finish once every slot has been enqueued.
  •  * The thread argument is not used; always returns NULL.
  •  */
  • void*    producer_1(void *data)
  • {
  •     const int32_t    total = (int32_t)(sizeof(line_1) / sizeof(line_1[0]));
  •     int32_t    idx;

  •     for(idx = 0; idx < total; idx++)
  •     {
  •         int    *slot = &line_1[idx];

  •         *slot = idx+1000;
  •         cbuf_enqueue(&cmd, slot);

  •         if((idx % 9) == 0)
  •             sleep(1);
  •     }

  •     line1_finish = true;

  •     return NULL;
  • }

  • /*
  •  * Producer thread 2.
  •  *
  •  * Fills line_2[] with 20000, 20001, ... and enqueues a pointer to each slot
  •  * into `cmd`, sleeping one second whenever the index is a multiple of 9.
  •  * Sets line2_finish once every slot has been enqueued.
  •  * The thread argument is not used; always returns NULL.
  •  */
  • void*    producer_2(void *data)
  • {
  •     const int32_t    total = (int32_t)(sizeof(line_2) / sizeof(line_2[0]));
  •     int32_t    idx;

  •     for(idx = 0; idx < total; idx++)
  •     {
  •         int    *slot = &line_2[idx];

  •         *slot = idx+20000;
  •         cbuf_enqueue(&cmd, slot);

  •         if((idx % 9) == 0)
  •             sleep(1);
  •     }

  •     line2_finish = true;

  •     return NULL;
  • }


  • void*    consumer(void *data)
  • {
  •     int32_t        *ptr = NULL;

  •     while(1)
  •     {
  •         ptr = cbuf_dequeue(&cmd);
  •         printf("%d ",*ptr);

  •         if(cbuf_empty(&cmd) && line2_finish && line1_finish)
  •         {
  •             printf("quit ");
  •             break;
  •         }
  •     }

  •     return NULL;
  • }


  • void    test_cbuf_oper(void)
  • {
  •     pthread_t    l_1;
  •     pthread_t    l_2;
  •     pthread_t    c;
  •     
  •     cbuf_init(&cmd);

  •     pthread_create(&l_1,NULL,producer_1,0);
  •     pthread_create(&l_2,NULL,producer_2,0);
  •     pthread_create(&c,NULL,consumer,0);

  •     pthread_join(l_1,NULL);
  •     pthread_join(l_2,NULL);
  •     pthread_join(c,NULL);

  •     cbuf_destroy(&cmd);
  • }


  • /*
  •  * Entry point of the ring-buffer test; it only runs test_cbuf_oper().
  •  */
  • void    test_cbuf(void)
  • {
  •     test_cbuf_oper();
  •     return;
  • }


  • /*
  •  * cbuf end
  •  */

        2,测试结果
    linux下C语言实现多线程通信—环形缓冲区,可用于生产者(producer)/消费者(consumer)【转】


    四,参考文件
    1,《bareos-master》源码
    2,《nginx》源码