memcache研究札记 之 socket接入与工作线程分发Model
memcache研究笔记 之 socket接入与工作线程分发Model
// Server side
// Event base of the main (accepting) thread; created and looped in main().
struct event_base* main_base;
// Greeting that conn_new() queues for every accepted connection.
static const char MESSAGE[] ="Hello, World!\n";
// Per-worker-thread state; main() fills one of these for each worker.
typedef struct{
// Position of this worker in the threads array (printed by thread_libevent_proc).
int index;
// Read end of the notification pipe; watched by notify_event.
int notify_recv_fd;
// Write end of the notification pipe; conn_new() writes one byte here to wake the worker.
int notify_send_fd;
// Event base owned by this worker; worker_libevent() runs its loop.
struct event_base *base;
// Persistent read event on notify_recv_fd, dispatched to thread_libevent_proc.
struct event notify_event;
// Disabled placeholder, presumably for a per-thread queue of new connections - TODO confirm.
//int conn_queue *new_conn_queue;
}LIBEVENT_THREAD;
// Array of nthreads worker descriptors, allocated in main().
static LIBEVENT_THREAD *threads;
// Number of worker threads to start.
static int nthreads = 4;
// Accept callback for the listening socket, dispatched by the main event loop.
//
// Accepts one pending connection, queues the greeting MESSAGE on a new
// bufferevent for it, then wakes one randomly chosen worker by writing a
// single byte to that worker's notification pipe.
//
//   sfd   - listening socket that became readable
//   event - libevent event flags (unused)
//   arg   - user pointer registered with event_set (unused)
//
// On any failure a message is printed and the callback returns; the server
// keeps running.
void conn_new(const int sfd, const short event, void *arg)
{
    (void)event;
    (void)arg;

    struct sockaddr_in addr;
    socklen_t addrlen = sizeof(addr);
    int fd = accept(sfd, (struct sockaddr *)&addr, &addrlen);
    if (fd < 0) {
        // Nothing was accepted, so there is no descriptor to wrap below.
        printf("accept failed ... ... \r\n");
        return;
    }

    struct bufferevent *buf_ev = bufferevent_new(fd, NULL, NULL, NULL, NULL);
    if (buf_ev == NULL) {
        printf("bufferevent_new failed ... ... \r\n");
        close(fd);
        return;
    }
    buf_ev->wm_read.high = 4096;

    if (bufferevent_write(buf_ev, MESSAGE, strlen(MESSAGE)) != 0)
        printf("bufferevent_write failed ... ... \r\n");

    // Only a wake-up byte is sent; the accepted fd itself is not handed to
    // the worker yet (the original marks this spot with "cq_push").
    char buf[1];
    buf[0] = 'c';
    int index = rand() % nthreads;
    if (write(threads[index].notify_send_fd, buf, 1) != 1)
        printf("write pipe failed ... ... \r\n");
}
static void *worker_libevent(void *arg)
{
LIBEVENT_THREAD *me = (LIBEVENT_THREAD *)arg;
event_base_loop(me->base, 0);
return NULL;
}
// Starts one thread running func(arg) with default attributes.
//
//   func - thread entry point
//   arg  - opaque pointer passed to func
//
// The thread handle is not kept, so the thread cannot be joined later.
// Prints a message and exits the process with status 1 if the thread cannot
// be created.
static void create_worker(void *(*func)(void *), void *arg)
{
    pthread_t thread;
    pthread_attr_t attr;

    if (pthread_attr_init(&attr) != 0) {
        printf("pthread create failed \r\n");
        exit(1);
    }

    int ret = pthread_create(&thread, &attr, func, arg);

    // The attribute object is only needed for the create call itself;
    // release it on both the success and the failure path.
    pthread_attr_destroy(&attr);

    if (ret != 0) {
        printf("pthread create failed \r\n");
        exit(1);
    }
}
static void thread_libevent_proc(int fd, short int, void *arg)
{
LIBEVENT_THREAD *me = (LIBEVENT_THREAD*)arg;
char buf[1];
memset(buf, 0, 1);
if (read(fd, buf, 1) != 1)
{
//cq_pop
printf("read from pipe failed... \r\n");
exit(1);
}
printf("New client come, %c thread:%d is working. \r\n", buf[0], me->index);
}
// Demo entry point.
//
// Starts nthreads worker threads, each with its own event base and a
// notification pipe, then listens on TCP port 8080 and runs the main event
// loop, which dispatches incoming connections to conn_new.
//
// Returns -1 if the main event base or the listening socket cannot be set
// up, 0 when the main loop ends; exits with status 1 if worker setup fails.
int main()
{
    cout<<"Server start !"<<endl;

    // Start the worker threads.
    threads = (LIBEVENT_THREAD *)calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (threads == NULL) {
        printf("calloc failed \r\n");
        exit(1);
    }
    for (int i = 0; i < nthreads; ++i) {
        LIBEVENT_THREAD *me = &threads[i];
        me->index = i;

        // Pipe the main thread uses to wake this worker:
        // fds[0] is watched by the worker, fds[1] is written by conn_new.
        int fds[2];
        if (pipe(fds) != 0) {
            printf("pipe failed !");
            exit(1);
        }
        me->notify_recv_fd = fds[0];
        me->notify_send_fd = fds[1];

        me->base = event_init();
        if (me->base == NULL) {
            printf("event_init failed \r\n");
            exit(1);
        }

        event_set(&me->notify_event, me->notify_recv_fd, EV_READ|EV_PERSIST, thread_libevent_proc, me);
        event_base_set(me->base, &me->notify_event);
        if (event_add(&me->notify_event, 0) == -1) {
            printf("event_add failed \r\n");
            exit(1);
        }
        create_worker(worker_libevent, me);
    }

    main_base = event_init();
    if (main_base == NULL) {
        cout<<"error!"<<endl;
        return -1;
    }
    cout<<"init event ok!"<<endl;

    // Create the listening socket.
    int sListen = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (sListen == -1) {
        cout<<"error!"<<endl;
        return -1;
    }

    struct sockaddr_in server_addr;
    bzero(&server_addr, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    int portnumber = 8080;
    server_addr.sin_port = htons(portnumber);
    if (bind(sListen, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {
        cout<<"error!"<<endl;
        return -1;
    }

    // Listen
    if (::listen(sListen, 3) == -1) {
        cout<<"error!"<<endl;
        return -1;
    }
    cout<<"Server is listening!\n"<<endl;

    // Make the listener non-blocking before handing it to the event loop.
    int flags = ::fcntl(sListen, F_GETFL);
    if (flags == -1 || ::fcntl(sListen, F_SETFL, flags | O_NONBLOCK) == -1) {
        cout<<"error!"<<endl;
        return -1;
    }

    struct event ev;
    event_set(&ev, sListen, EV_READ | EV_PERSIST, conn_new, (void *)&ev);
    // Bind the event to main_base explicitly instead of relying on the
    // base most recently returned by event_init().
    event_base_set(main_base, &ev);
    if (event_add(&ev, NULL) == -1) {
        cout<<"error!"<<endl;
        return -1;
    }

    event_base_loop(main_base, 0);
    cout<<"over!"<<endl;
    return 0;
}
// Server side
// Event base of the main (accepting) thread; created and looped in main().
struct event_base* main_base;
// Greeting that conn_new() queues for every accepted connection.
static const char MESSAGE[] ="Hello, World!\n";
// Per-worker-thread state; main() fills one of these for each worker.
typedef struct{
// Position of this worker in the threads array (printed by thread_libevent_proc).
int index;
// Read end of the notification pipe; watched by notify_event.
int notify_recv_fd;
// Write end of the notification pipe; conn_new() writes one byte here to wake the worker.
int notify_send_fd;
// Event base owned by this worker; worker_libevent() runs its loop.
struct event_base *base;
// Persistent read event on notify_recv_fd, dispatched to thread_libevent_proc.
struct event notify_event;
// Disabled placeholder, presumably for a per-thread queue of new connections - TODO confirm.
//int conn_queue *new_conn_queue;
}LIBEVENT_THREAD;
// Array of nthreads worker descriptors, allocated in main().
static LIBEVENT_THREAD *threads;
// Number of worker threads to start.
static int nthreads = 4;
// Accept callback for the listening socket, dispatched by the main event loop.
//
// Accepts one pending connection, queues the greeting MESSAGE on a new
// bufferevent for it, then wakes one randomly chosen worker by writing a
// single byte to that worker's notification pipe.
//
//   sfd   - listening socket that became readable
//   event - libevent event flags (unused)
//   arg   - user pointer registered with event_set (unused)
//
// On any failure a message is printed and the callback returns; the server
// keeps running.
void conn_new(const int sfd, const short event, void *arg)
{
    (void)event;
    (void)arg;

    struct sockaddr_in addr;
    socklen_t addrlen = sizeof(addr);
    int fd = accept(sfd, (struct sockaddr *)&addr, &addrlen);
    if (fd < 0) {
        // Nothing was accepted, so there is no descriptor to wrap below.
        printf("accept failed ... ... \r\n");
        return;
    }

    struct bufferevent *buf_ev = bufferevent_new(fd, NULL, NULL, NULL, NULL);
    if (buf_ev == NULL) {
        printf("bufferevent_new failed ... ... \r\n");
        close(fd);
        return;
    }
    buf_ev->wm_read.high = 4096;

    if (bufferevent_write(buf_ev, MESSAGE, strlen(MESSAGE)) != 0)
        printf("bufferevent_write failed ... ... \r\n");

    // Only a wake-up byte is sent; the accepted fd itself is not handed to
    // the worker yet (the original marks this spot with "cq_push").
    char buf[1];
    buf[0] = 'c';
    int index = rand() % nthreads;
    if (write(threads[index].notify_send_fd, buf, 1) != 1)
        printf("write pipe failed ... ... \r\n");
}
static void *worker_libevent(void *arg)
{
LIBEVENT_THREAD *me = (LIBEVENT_THREAD *)arg;
event_base_loop(me->base, 0);
return NULL;
}
// Starts one thread running func(arg) with default attributes.
//
//   func - thread entry point
//   arg  - opaque pointer passed to func
//
// The thread handle is not kept, so the thread cannot be joined later.
// Prints a message and exits the process with status 1 if the thread cannot
// be created.
static void create_worker(void *(*func)(void *), void *arg)
{
    pthread_t thread;
    pthread_attr_t attr;

    if (pthread_attr_init(&attr) != 0) {
        printf("pthread create failed \r\n");
        exit(1);
    }

    int ret = pthread_create(&thread, &attr, func, arg);

    // The attribute object is only needed for the create call itself;
    // release it on both the success and the failure path.
    pthread_attr_destroy(&attr);

    if (ret != 0) {
        printf("pthread create failed \r\n");
        exit(1);
    }
}
static void thread_libevent_proc(int fd, short int, void *arg)
{
LIBEVENT_THREAD *me = (LIBEVENT_THREAD*)arg;
char buf[1];
memset(buf, 0, 1);
if (read(fd, buf, 1) != 1)
{
//cq_pop
printf("read from pipe failed... \r\n");
exit(1);
}
printf("New client come, %c thread:%d is working. \r\n", buf[0], me->index);
}
// Demo entry point.
//
// Starts nthreads worker threads, each with its own event base and a
// notification pipe, then listens on TCP port 8080 and runs the main event
// loop, which dispatches incoming connections to conn_new.
//
// Returns -1 if the main event base or the listening socket cannot be set
// up, 0 when the main loop ends; exits with status 1 if worker setup fails.
int main()
{
    cout<<"Server start !"<<endl;

    // Start the worker threads.
    threads = (LIBEVENT_THREAD *)calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (threads == NULL) {
        printf("calloc failed \r\n");
        exit(1);
    }
    for (int i = 0; i < nthreads; ++i) {
        LIBEVENT_THREAD *me = &threads[i];
        me->index = i;

        // Pipe the main thread uses to wake this worker:
        // fds[0] is watched by the worker, fds[1] is written by conn_new.
        int fds[2];
        if (pipe(fds) != 0) {
            printf("pipe failed !");
            exit(1);
        }
        me->notify_recv_fd = fds[0];
        me->notify_send_fd = fds[1];

        me->base = event_init();
        if (me->base == NULL) {
            printf("event_init failed \r\n");
            exit(1);
        }

        event_set(&me->notify_event, me->notify_recv_fd, EV_READ|EV_PERSIST, thread_libevent_proc, me);
        event_base_set(me->base, &me->notify_event);
        if (event_add(&me->notify_event, 0) == -1) {
            printf("event_add failed \r\n");
            exit(1);
        }
        create_worker(worker_libevent, me);
    }

    main_base = event_init();
    if (main_base == NULL) {
        cout<<"error!"<<endl;
        return -1;
    }
    cout<<"init event ok!"<<endl;

    // Create the listening socket.
    int sListen = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (sListen == -1) {
        cout<<"error!"<<endl;
        return -1;
    }

    struct sockaddr_in server_addr;
    bzero(&server_addr, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    int portnumber = 8080;
    server_addr.sin_port = htons(portnumber);
    if (bind(sListen, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {
        cout<<"error!"<<endl;
        return -1;
    }

    // Listen
    if (::listen(sListen, 3) == -1) {
        cout<<"error!"<<endl;
        return -1;
    }
    cout<<"Server is listening!\n"<<endl;

    // Make the listener non-blocking before handing it to the event loop.
    int flags = ::fcntl(sListen, F_GETFL);
    if (flags == -1 || ::fcntl(sListen, F_SETFL, flags | O_NONBLOCK) == -1) {
        cout<<"error!"<<endl;
        return -1;
    }

    struct event ev;
    event_set(&ev, sListen, EV_READ | EV_PERSIST, conn_new, (void *)&ev);
    // Bind the event to main_base explicitly instead of relying on the
    // base most recently returned by event_init().
    event_base_set(main_base, &ev);
    if (event_add(&ev, NULL) == -1) {
        cout<<"error!"<<endl;
        return -1;
    }

    event_base_loop(main_base, 0);
    cout<<"over!"<<endl;
    return 0;
}
$ ./test
Server start !
init event ok!
Server is listening!
New client come, c thread:3 is working.
New client come, c thread:2 is working.
New client come, c thread:1 is working.
New client come, c thread:3 is working.
New client come, c thread:1 is working.
New client come, c thread:3 is working.
New client come, c thread:2 is working.
New client come, c thread:0 is working.
后续计划:用负载均衡算法取代目前的随机选择(rand()%nthreads),把新连接分发给比较空闲的工作线程(worker thread)。