memcache研究札记 之 socket接入与工作线程分发Model

memcache研究笔记 之 socket接入与工作线程分发Model
// Server side
// Event base for the main (accepting) thread; assigned in main() and
// driven by the final event_base_loop() call there.
struct event_base* main_base;  
// Greeting written to every accepted connection by conn_new().
static const char MESSAGE[] ="Hello, World!\n";  
// Per-worker-thread state. One instance per worker lives in the global
// `threads` array; main() fills every field before starting the thread.
typedef struct{
  // Position of this worker in `threads`; printed when it is notified.
  int index;
  // Read end of the notification pipe; watched by notify_event.
  int notify_recv_fd;
  // Write end of the notification pipe; conn_new() writes one byte here.
  int notify_send_fd;
  // Event base owned by this worker and looped in worker_libevent().
  struct event_base *base;
  // Persistent read event on notify_recv_fd, handled by thread_libevent_proc().
  struct event notify_event; 
  // Placeholder for a per-thread queue of new connections (not implemented):
  //int conn_queue *new_conn_queue;
}LIBEVENT_THREAD;
// Array of worker descriptors, allocated in main() with `nthreads` entries.
static LIBEVENT_THREAD *threads;
// Number of worker threads to start.
static int nthreads = 4;
  
void conn_new(const int sfd, const short event, void *arg)  
{  
    //cout<<"accept handle"<<endl;  
    struct sockaddr_in addr;  
    socklen_t addrlen = sizeof(addr);  
    int fd = accept(sfd, (struct sockaddr *) &addr, &addrlen); //处理连接  
    struct bufferevent* buf_ev;  
    buf_ev = bufferevent_new(fd, NULL, NULL, NULL, NULL);  
    buf_ev->wm_read.high = 4096;  
    //cout<<"event write"<<endl;  
    bufferevent_write(buf_ev, MESSAGE, strlen(MESSAGE));  
    //cq_push
    char buf[1];
    buf[0] = 'c';
    int index = rand()%nthreads;
    if (write(threads[index].notify_send_fd, buf, 1) != 1)
      printf("write pipe failed ... ... \r\n");
}  

static void *worker_libevent(void *arg)
{
   LIBEVENT_THREAD *me = (LIBEVENT_THREAD *)arg; 
   event_base_loop(me->base, 0);
   return NULL;
}
/*
 * Starts one thread running func(arg) with default attributes.
 *
 * func - thread entry point
 * arg  - opaque pointer passed to func
 *
 * Prints a message and terminates the process with exit(1) if the thread
 * cannot be created.
 *
 * NOTE(review): the thread handle is discarded, so the thread is neither
 * joined nor detached by this function.
 */
static void create_worker(void *(*func)(void *), void *arg)
{
  pthread_t thread;
  pthread_attr_t attr;
  int ret;

  if ((ret = pthread_attr_init(&attr)) != 0)
  {
    printf("pthread create failed \r\n");
    exit(1);
  }

  ret = pthread_create(&thread, &attr, func, arg);
  // The attribute object is only needed during creation; release it on
  // both the success and the failure path.
  pthread_attr_destroy(&attr);

  if (ret != 0)
  {
    printf("pthread create failed \r\n");
    exit(1);
  }
}
static void thread_libevent_proc(int fd, short int, void *arg)
{
  LIBEVENT_THREAD *me = (LIBEVENT_THREAD*)arg;
  char buf[1];
  memset(buf, 0, 1);
  if (read(fd, buf, 1) != 1)
  {
    //cq_pop
    printf("read from pipe failed... \r\n");
    exit(1);
  }
  printf("New client come, %c thread:%d is working. \r\n", buf[0], me->index);
}
int main()  
{  
    cout<<"Server start !"<<endl;  
    //start workthreads 
    threads = (LIBEVENT_THREAD *)calloc(nthreads, sizeof(LIBEVENT_THREAD));
    //error check
    for (int i=0; i<nthreads; ++i)
    {
      //thread data prepare
      threads[i].index = i;
      int fds[2];
      if (pipe(fds) != 0){
        printf("pipe failed !");
        exit(1);
      }
      threads[i].notify_recv_fd = fds[0];
      threads[i].notify_send_fd = fds[1];
      threads[i].base = event_init();
      LIBEVENT_THREAD *me = &threads[i];
      event_set(&me->notify_event, me->notify_recv_fd, EV_READ|EV_PERSIST, thread_libevent_proc, me);
      event_base_set(me->base, &me->notify_event);
      if (event_add(&me->notify_event, 0) == -1){
        printf("event_add failed \r\n");
        exit(1);
      }
      create_worker(worker_libevent, &threads[i]); 
    }   

    main_base = event_init();  
    if(main_base)  
        cout<<"init event ok!"<<endl;  
  
    int sListen;  
  
    // Create listening socket  
    sListen = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);  
  
    struct sockaddr_in server_addr;  
    bzero(&server_addr,sizeof(struct sockaddr_in));  
    server_addr.sin_family=AF_INET;  
    server_addr.sin_addr.s_addr=htonl(INADDR_ANY);  
    int portnumber = 8080;  
    server_addr.sin_port = htons(portnumber);  
  
    if(bind(sListen,(struct sockaddr *)(&server_addr),sizeof(struct sockaddr))==-1)  
    {  
        cout<<"error!"<<endl;  
        return -1;  
    }  
  
    // Listen  
    ::listen(sListen, 3);  
    cout<<"Server is listening!\n"<<endl;  
  
    int flags = ::fcntl(sListen, F_GETFL);  
    flags |= O_NONBLOCK;  
    fcntl(sListen, F_SETFL, flags);  
  
    struct event ev;  
    event_set(&ev, sListen, EV_READ | EV_PERSIST, conn_new, (void *)&ev);  
  
    event_add(&ev, NULL);  
  
    event_base_loop(main_base, 0);  

    cout<<"over!"<<endl;  

}  


$ ./test 
Server start !
init event ok!
Server is listening!
New client come, c thread:3 is working. 
New client come, c thread:2 is working. 
New client come, c thread:1 is working. 
New client come, c thread:3 is working. 
New client come, c thread:1 is working. 
New client come, c thread:3 is working. 
New client come, c thread:2 is working. 
New client come, c thread:0 is working. 

后续计划采用负载均衡算法,将新连接分发到相对空闲的工作线程(worker thread)。