mqtt学习笔记 epoll + 多线程实现并发网络连接处理

mosquitto monitoring idea (monit):
# monit: watch the broker through its pidfile and (re)start it with the init
# script when the process is gone. mosquitto only writes this file if
# pid_file is set in mosquitto.conf — TODO confirm the paths match.
check process mosquitto with pidfile /var/run/mosquitto.pid
  start program = "/etc/init.d/mosquitto start"
  stop program = "/etc/init.d/mosquitto stop"
 

mosquitto 是一个 MQTT 服务器(broker)。目前只在公司的移动互联网服务器环境上部署过,用得比较少,下面记录一下安装过程:


# Build mosquitto 1.0.3 from source and install it under /home/mosquitto.
wget http://mosquitto.org/files/source/mosquitto-1.0.3.tar.gz
tar -zxvf mosquitto-1.0.3.tar.gz
cd mosquitto-1.0.3
# WITH_TLS_PSK=no disables TLS-PSK support. Use it when the build fails with
# "undefined reference to `SSL_CTX_set_psk_server_callback'", i.e. the
# installed OpenSSL does not provide the PSK functions.
make WITH_TLS_PSK=no
make install prefix=/home/mosquitto
# Move the config files from /etc/mosquitto into the install tree.
mkdir -p /home/mosquitto/etc
mv /etc/mosquitto/* /home/mosquitto/etc/
# Drop debug symbols to shrink the binaries and libraries.
strip /home/mosquitto/bin/*
strip /home/mosquitto/sbin/*
strip /home/mosquitto/lib/*

# Let the dynamic linker find the libraries in /home/mosquitto/lib.
# A drop-in file can be re-run safely, unlike appending to /etc/ld.so.conf
# (assumes /etc/ld.so.conf includes ld.so.conf.d/*.conf, the RHEL/CentOS
# and Debian default).
echo "/home/mosquitto/lib/" > /etc/ld.so.conf.d/mosquitto.conf
ldconfig

修改 /home/mosquitto/etc/mosquitto.conf,设置运行用户:
user nobody
(其它参数目前使用默认值)

启动服务(如有服务相关错误,检查/home/mosquitto/mosquitto.log,端口默认1883)

# Start the broker as a daemon (-d) with the relocated config file; output it
# writes to this shell's stdout/stderr is captured in mosquitto.log.
# NOTE(review): once -d detaches, later broker messages may no longer reach
# this file — consider configuring logging in mosquitto.conf instead; verify.
/home/mosquitto/sbin/mosquitto  -d -c /home/mosquitto/etc/mosquitto.conf > /home/mosquitto/mosquitto.log 2>&1



终端测试:
客户端(订阅 test 主题):
mosquitto_sub -h SERVERIP -t test
在服务器端执行:
/home/mosquitto/bin/mosquitto_pub -t test -m "123456"
客户端会成功收到 "123456"

查看 MQTT broker 的运行状况(订阅 $SYS 系统主题)
# Single quotes stop the shell from expanding $SYS (to an empty string)
# before mosquitto_sub sees the topic.
mosquitto_sub -v -t '$SYS/#'
# Or narrow it down to a single statistic, e.g. the number of active clients:
mosquitto_sub -v -t '$SYS/broker/clients/active'
Mosquitto pub/sub代码分析
优化:
对于后续的提高优化的地方,简单记录几点:
 
发送数据用writev
poll -> epoll,用以支持更高的并发;
改为单线程版本,降低锁开销,目前锁开销还是非常大的。目测可以改为单进程版本,类似redis,精心维护的话应该能达到不错的效果;
网络数据读写使用一次尽量多读的方式,避免多次进入系统调用;
内存操作优化。不free,留着下次用;
考虑使用 spawn-fcgi 的形式,或者内置一次启动多个实例监听同一个端口(例如 Linux 3.9+ 的 SO_REUSEPORT),这样能更好地利用多核,达到更高的性能;
 
 
(MaxProcessMemory - JVMMemory - ReservedOsMemory) / (ThreadStackSize) = Number of threads
MaxProcessMemory 指的是一个进程的最大内存
JVMMemory         JVM内存
ReservedOsMemory  保留的操作系统内存
ThreadStackSize      线程栈的大小

在java语言里, 当你创建一个线程的时候,虚拟机会在JVM内存创建一个Thread对象同时创建一个操作系统线程,而这个系统线程的内存用的不是JVMMemory,而是系统中剩下的内存(MaxProcessMemory - JVMMemory - ReservedOsMemory)。 
 
 
JAVA使用EPoll来进行NIO处理的方法
JDK 6.0 以及JDK 5.0 update 9 的 nio支持epoll (仅限 Linux 系统 ),对并发idle connection会有大幅度的性能提升,这就是很多网络服务器应用程序需要的。
 
启用的方法如下:
 
-Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.EPollSelectorProvider
 
  • epoll 支持的 FD 上限是最大可以打开文件的数目,
这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左右。系统级上限可以 cat /proc/sys/fs/file-max 查看;单个进程的上限则由 ulimit -n(RLIMIT_NOFILE)决定。

 

执行 mosquitto -c /etc/mosquitto/mosquitto.conf -d 即可开启服务
 
压力测试: 系统参数修改
# Max user processes: the stress script below starts one mosquitto_sub
# process per connection (18000 by default). For a non-root user a limit of
# 12000 would make fork() fail before the target is reached.
ulimit -u 20000
# Max open files per process. Run this in the shell that starts the broker:
# the broker needs one descriptor per client connection.
# (Raising above the hard limit requires root or /etc/security/limits.conf.)
ulimit -n 20000
 
测试最大连接数
 
#!/bin/bash
# Stress test: open many idle MQTT subscriber connections to find the
# broker's connection ceiling.
#
# Usage: script [count] [topic]
#   count  number of mosquitto_sub clients to start (default 18000)
#   topic  topic every client subscribes to (default hello/world)
#
# Each client runs in the background with debug output (-d) and a 900 s
# keepalive (-k 900). The script returns right after launching them and
# leaves them running; stop them later with e.g. "pkill mosquitto_sub".
# Every client is its own process, so for a non-root user "ulimit -u" must
# be above count.
set -u

count=${1:-18000}
topic=${2:-hello/world}

if [[ ! $count =~ ^[0-9]+$ ]]; then
  printf 'count must be a non-negative integer, got: %s\n' "$count" >&2
  exit 2
fi
# Force base 10 so inputs with leading zeros (e.g. "08") are not read as octal.
count=$((10#$count))

for (( c = 1; c <= count; c++ )); do
  mosquitto_sub -d -t "$topic" -k 900 &
done
 
# netstat -na | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}' 
 
# Clean up after a test run.
# Ask first (SIGTERM), then SIGKILL whatever is still alive. pkill -f matches
# the full command line like "ps -ef | grep", but never matches itself, so no
# "grep -v grep" is needed. Note that -f mosquitto hits the broker, every
# mosquitto_sub/mosquitto_pub client and any other command line containing
# the word (an editor on mosquitto.conf, or a script whose name contains it).
pkill -f mosquitto
pkill -f report
sleep 2
pkill -9 -f mosquitto
pkill -9 -f report

# To kill only processes whose name is exactly "mosquitto" (the broker):
# killall -9 mosquitto

# List every thread (H) with its TID and CPU time, sorted by %CPU (ascending).
ps H -eo user,pid,ppid,tid,time,%cpu,cmd --sort=%cpu

查看进程/线程打开的文件描述符和 I/O 信息
# Open file descriptors of the broker, seen from one of its threads
# (15938 = broker PID, 15942 = one of its thread/LWP ids, as in the pstack
# output below).
ls -l /proc/15938/task/15942/fd
# Per-thread I/O counters (bytes read/written, read/write syscall counts).
cat /proc/15938/task/15942/io
# All files and sockets held by processes whose name starts with "mosquitto".
lsof -c mosquitto

[root@wwwu fd]# pstack 15938
Thread 5 (Thread 0x7f280e377700 (LWP 15939)):
#0  0x0000003b15ae9163 in epoll_wait () from /lib64/libc.so.6
#1  0x000000000040c1d0 in mosquitto_main_loop_4_client ()
#2  0x0000003b15e079d1 in start_thread () from /lib64/libpthread.so.0
#3  0x0000003b15ae8b6d in clone () from /lib64/libc.so.6
Thread 4 (Thread 0x7f280d976700 (LWP 15940)):
#0  0x000000000040a464 in mqtt3_db_message_timeout_check_epoll ()
#1  0x000000000040bc3b in mosquitto_main_loop_4_client_all ()
#2  0x0000003b15e079d1 in start_thread () from /lib64/libpthread.so.0
#3  0x0000003b15ae8b6d in clone () from /lib64/libc.so.6
Thread 3 (Thread 0x7f280cf75700 (LWP 15941)):
#0  0x0000003b15ae9163 in epoll_wait () from /lib64/libc.so.6
#1  0x000000000040c1d0 in mosquitto_main_loop_4_client ()
#2  0x0000003b15e079d1 in start_thread () from /lib64/libpthread.so.0
#3  0x0000003b15ae8b6d in clone () from /lib64/libc.so.6
Thread 2 (Thread 0x7f280c574700 (LWP 15942)):
#0  0x000000000040a464 in mqtt3_db_message_timeout_check_epoll ()
#1  0x000000000040bc3b in mosquitto_main_loop_4_client_all ()
#2  0x0000003b15e079d1 in start_thread () from /lib64/libpthread.so.0
#3  0x0000003b15ae8b6d in clone () from /lib64/libc.so.6
Thread 1 (Thread 0x7f280e59c7c0 (LWP 15938)):
#0  0x0000003b15ae9163 in epoll_wait () from /lib64/libc.so.6
#1  0x000000000040c7c5 in mosquitto_main_loop_4_epoll ()
#2  0x000000000040414b in main ()
  • 实际测试
Test case 1
 
1 10934 max connections with QoS 0
[root@ ~]# netstat -na | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}' 
CLOSE_WAIT 1
ESTABLISHED 10934
LISTEN 12
 
脚本输出
Client mosqsub/13526-wwwu.mqtt sending CONNECT
Client mosqsub/13526-wwwu.mqtt received CONNACK
Client mosqsub/13526-wwwu.mqtt sending SUBSCRIBE (Mid: 1, Topic: hello/world, QoS: 0)
Client mosqsub/13526-wwwu.mqtt received SUBACK
Subscribed (mid: 1): 0
Client mosqsub/13523-wwwu.mqtt sending CONNECT
Client mosqsub/13523-wwwu.mqtt received CONNACK
Client mosqsub/13523-wwwu.mqtt sending SUBSCRIBE (Mid: 1, Topic: hello/world, QoS: 0)
Client mosqsub/13523-wwwu.mqtt received SUBACK
Subscribed (mid: 1): 0
 
 
Test Case 2:
该测试只是验证能打开的最大连接数,意义不太大
max files:
 ulimit -n 15000
 
[root@wwwu test]# netstat -na|grep ESTAB|grep 1883|wc -l
15088
 
CPU 为 2 核,mosquitto 占用 100%(总共 200%),从 top 看是跑满了一个核(Cpu0 空闲 0%,Cpu1 几乎全部空闲);目前没有 publish 数据,所以内存使用量很少(约 3.9%),
还可以继续提升
vmstat 
 
[root@wwwu test]# vmstat 3
procs -----------memory---------- ---swap-- -----io---- --system-- -----cpu-----
 r  b   swpd   free   buff  cache   si   so    bi    bo   in   cs us sy id wa st
 1  0  24080 1865596 233880 266228    1    3     2     4    2   11  0  0 99  0  0
 1  0  24080 1865588 233880 266228    0    0     0     0 1030   30 17 33 50  0  0
 
[root@wwwu test]# top
 
top - 03:17:46 up 13 days, 10 min,  4 users,  load average: 1.09, 0.83, 0.37
Tasks: 164 total,   2 running, 162 sleeping,   0 stopped,   0 zombie
Cpu0  : 31.2%us, 68.8%sy,  0.0%ni,  0.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu1  :  0.0%us,  0.3%sy,  0.0%ni, 99.7%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Mem:   2957536k total,  1091948k used,  1865588k free,   233880k buffers
Swap:  3096568k total,    24080k used,  3072488k free,   266228k cached
 
  PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND                                                                
 6709 root      20   0  150m 113m  784 R 100.0  3.9  15:16.63 mosquitto   
 
发现问题
 

no route to host;

telnet: Unable to connect to remote host: No route to host

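觉得甚是诧异,估计是虚拟机的防火墙(iptables)规则挡住了连接,把虚拟机中的防火墙规则清空后,连接恢复正常。注意 iptables -F 会清空所有规则,正式环境中应只放行 MQTT 端口,例如 iptables -I INPUT -p tcp --dport 1883 -j ACCEPT。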

[zhoulei@localhost ~]$ sudo iptables -F

 

mosquitto无法启动

netstat -an|grep 1883 没有任何记录(服务没有在监听端口)。进一步检查发现持久化文件 mosquitto.db 已经增大到 445M,把该文件删除后,

可以正常启动(注意:删除 mosquitto.db 会丢失其中持久化的消息和订阅信息)

 

2014/05/07 结果

------------------------------------------------------------------------------------------------

【subscriber】

 

开6000个subscribe active 连接,脚本如下

java  -Xms150M -Xmx350M -Djava.ext.dirs=./ -cp /usr/Paul  MQTT31PerfHarness -su -nt 6000 -ss 10 -sc BasicStats -rl 0 -wp true -wc 50 -wi 60 -wt 90 -id 1 -qos 2 -ka 600 -cs false -tc mqtt.Subscriber -d TOPICSINGEL -db 1 -dx 20000 -dn 1 -iu tcp://9.119.154.107:1883


kswapd0 进程频繁出现在 top 中:物理内存几乎耗尽(free 仅约 58M),系统在大量换页(swap 已用约 800M,iowait 34.5%)


top - 13:16:13 up 31 min,  4 users,  load average: 617.83, 187.04, 65.29

Tasks: 161 total,   1 running, 160 sleeping,   0 stopped,   0 zombie

Cpu(s):  2.9%us, 58.7%sy,  0.0%ni,  0.0%id, 34.5%wa,  0.0%hi,  3.9%si,  0.0%st

Mem:   2957536k total,  2899268k used,    58268k free,      984k buffers

Swap:  3096568k total,   805564k used,  2291004k free,    14656k cached


  PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND                                    2538 root      20   0 25.2g 1.7g 4420 S 122.1 60.3   5:27.12 java    


[root@wwwu ~]# free -m

             total       used       free     shared    buffers     cached

 

Mem:          2888       2810         77          0          0         15

 

id=1,rate=996.20,threads=6000

id=1,rate=1007.30,threads=6000

之后死机

publish 只有700M可用内存

[root@oc8050533176 Local]# top


top - 17:13:51 up  3:44,  2 users,  load average: 0.00, 0.00, 0.05

Tasks: 124 total,   1 running, 123 sleeping,   0 stopped,   0 zombie

Cpu(s): 14.7%us, 15.2%sy,  0.0%ni, 64.6%id,  0.0%wa,  0.0%hi,  5.4%si,  0.0%st

Mem:   2011348k total,  1736976k used,   274372k free,    23976k buffers

Swap:  8388600k total,        0k used,  8388600k free,   803732k cached


  PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND                                     

 1946 root      20   0 3362m 591m 7292 S 66.7 30.1   3:22.69 java    

最多2500个publish active 连接, throughput 3242 msgs/second, 

id=5,rate=3084.80,threads=2000

id=5,rate=2324.60,threads=2000

id=5,rate=3176.30,threads=2000

id=5,rate=3091.40,threads=2000


[mosquitto]

connection 后的top

[root@wwwu mosquitto]# top -b

top - 05:50:55 up  2:00,  4 users,  load average: 1.00, 0.81, 0.74

Tasks: 161 total,   2 running, 159 sleeping,   0 stopped,   0 zombie

Cpu(s): 31.9%us,  7.4%sy,  0.0%ni, 54.8%id,  3.2%wa,  0.0%hi,  2.6%si,  0.0%st

Mem:   2957536k total,  2518020k used,   439516k free,    31976k buffers

Swap:  3096568k total,        0k used,  3096568k free,   209064k cached


  PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND                                     

 2509 root      20   0 1264m 1.2g  740 R 77.9 42.5  93:35.86 mosquitto

 

 [root@wwwu ~]# netstat -an|grep ESTABLISH|grep 1883|wc -l

8000


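遇到问题:大量连接停留在 FIN_WAIT1 状态(298 个)。从下面的 netstat 输出可以看到,这些连接的 Send-Q 中仍有 2 万~4 万字节数据未被对端确认——服务器端(1883)已经关闭了连接,但 FIN 只能排在这些积压数据之后发出;对端(订阅者)很可能接收太慢或已不再读取,连接就一直卡在 FIN_WAIT1。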

[root@wwwu ~]# netstat -na | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}' 

CLOSE_WAIT 1

FIN_WAIT1 298

ESTABLISHED 2372

 

LISTEN 12


tcp        0  21848 9.119.154.107:1883          9.119.154.160:55851(sub)         FIN_WAIT1   

tcp        0  37393 9.119.154.107:1883          9.119.154.160:57275         FIN_WAIT1   

tcp        0      0 9.119.154.107:1883          9.119.154.160:56913         FIN_WAIT2   

 

tcp        0  40545 9.119.154.107:1883          9.119.154.160:55864         FIN_WAIT1   


netstat显示的连接状态有几种WAIT: FIN_WAIT_1,FIN_WAIT_2,CLOSE_WAIT和TIME_WAIT. 他们的含义要从TCP的连接中断过程说起


Server              Client

  -------- FIN -------->

  <------- ACK ---------

  <------- FIN ---------

  -------- ACK -------->

假设服务器主动关闭连接(Active Close)


服务器首先向客户机发送FIN包,然后服务器进入FIN_WAIT_1状态。

客户机收到 FIN 后向服务器回复 ACK,确认 FIN 已收到,客户机进入 CLOSE_WAIT 状态。

服务器收到来自客户机的 ACK 后,进入 FIN_WAIT_2 状态

现在客户机进入被动关闭(“passive close”)状态,客户机操作系统等待他上面的应用程序关闭连接。一旦连接被关闭,客户端会发送FIN包到服务器

当服务器收到客户机的 FIN 包后,会向客户机回复 ACK 确认,然后进入著名的 TIME_WAIT 状态

 

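TIME_WAIT 状态有两个作用:一是如果服务器发出的最后一个 ACK 丢失,客户机会重发 FIN,服务器仍能再次回复 ACK;二是连接关闭前网络中可能还有延迟到达的旧数据包(包的到达没有先后顺序保证),等待一段时间让它们在网络中消亡,避免被之后复用同一地址/端口四元组的新连接误收。这个状态将保持 2*MSL 的时间,这里的 MSL 指的是一个 TCP 包在网络中存在的最长时间。RFC 793 建议 MSL 为 2 分钟,即 2*MSL=240 秒;Linux 内核实际使用固定的 60 秒 TIME_WAIT 时长。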

-------------------------------------------------------------------------------------


10000 sub connections 

mosquitto 2G 内存 32% CPU

Tasks: 161 total,   1 running, 160 sleeping,   0 stopped,   0 zombie

Cpu0  :  3.7%us, 11.6%sy,  0.0%ni, 84.7%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st

Cpu1  :  4.3%us, 13.6%sy,  0.0%ni, 82.1%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st

Mem:   2957536k total,  2202836k used,   754700k free,    34916k buffers

Swap:  3096568k total,        0k used,  3096568k free,    48164k cached


  PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND                                                                

 

 2509 root      20   0 1264m 1.2g  740 S 32.6 42.5 148:35.98 mosquitto   



-----------------boss worker multithread model---------------------------

/* Shared queue of accepted client sockets: main() enqueues, worker threads
   dequeue. main() also passes QUEUE_SIZE as the listen() backlog. */
int queue[QUEUE_SIZE];

This is the main thread. It uses a queue (defined elsewhere) with the operations enqueue,

 dequeue, empty, etc. When the server accepts a connection,

 it enqueues the socket that the incoming connection is on.

 The worker threads, started at the beginning, sleep on a condition variable until a job

 has been added; each one then dequeues a socket and, on that already-connected socket,

 reads/parses/writes the incoming http request.


int main(int argc, char* argv[])

{

int hSocket, hServerSocket;  

struct hostent* pHostInfo;  

struct sockaddr_in Address;

int nAddressSize = sizeof(struct sockaddr_in);

int nHostPort;

int numThreads;

int i;


init(&head,&tail);


/

hServerSocket=socket(AF_INET,SOCK_STREAM,0);


if(hServerSocket == SOCKET_ERROR)

{

    printf(" Could not make a socket ");

    return 0;

}


 

Address.sin_addr.s_addr = INADDR_ANY;

Address.sin_port = htons(nHostPort);

Address.sin_family = AF_INET;


printf(" Binding to port %d ",nHostPort);


 

if(bind(hServerSocket,(struct sockaddr*)&Address,sizeof(Address)) == SOCKET_ERROR) {

    printf(" Could not connect to host ");

    return 0;

}

 

getsockname(hServerSocket, (struct sockaddr *) &Address,(socklen_t *)&nAddressSize);


printf("Opened socket as fd (%d) on port (%d) for stream i/o ",hServerSocket, ntohs(Address.sin_port));


printf("Server

      sin_family        = %d

      sin_addr.s_addr   = %d

      sin_port          = %d "

      , Address.sin_family

      , Address.sin_addr.s_addr

      , ntohs(Address.sin_port)

    );

//Up to this point is boring server set up stuff. I need help below this.

/

if(listen(hServerSocket,QUEUE_SIZE) == SOCKET_ERROR) {

    printf(" Could not listen ");

    return 0;

}


while(1) {


    pthread_mutex_lock(&mtx);

    printf(" Waiting for a connection");


    while(!empty(head,tail)) {

        pthread_cond_wait (&cond2, &mtx);

    }


   

    hSocket = accept(hServerSocket,(struct sockaddr*)&Address,(socklen_t *)&nAddressSize);


    printf(" Got a connection");


    enqueue(queue,&tail,hSocket);


    pthread_mutex_unlock(&mtx);

    pthread_cond_signal(&cond);     // wake worker thread

}

}



void *worker(void *threadarg) {


while(true)

{


pthread_mutex_lock(&mtx);


while(empty(head,tail)) {

    pthread_cond_wait(&cond, &mtx);

}

int hSocket = dequeue(queue,&head);


unsigned nSendAmount, nRecvAmount;

char line[BUFFER_SIZE];


nRecvAmount = read(hSocket,line,sizeof line);

printf(" Received %s from client ",line);



/

if(close(hSocket) == SOCKET_ERROR) {

    printf(" Could not close socket ");

    return 0;

}



pthread_mutex_unlock(&mtx);

pthread_cond_signal(&cond);


 

}


  • epoll与select、poll区别

1、相比于select与poll,epoll最大的好处在于它不会随着监听fd数目的增长而降低效率。内核中的select与poll的实现是采用轮询来处理的,轮询的fd数目越多,自然耗时越多。 
2、epoll的实现是基于回调的,如果fd有期望的事件发生就通过回调函数将其加入epoll就绪队列中,也就是说它只关心“活跃”的fd,与fd数目无关。 
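3、内核 / 用户空间内存拷贝问题:select/poll 每次调用都要把整个 fd 集合从用户空间拷贝到内核,返回时再拷贝回来;epoll 通过 epoll_ctl 把 fd 一次性注册到内核,epoll_wait 只把就绪的事件拷贝到用户空间。(常见的“epoll 使用共享内存/mmap”说法并不准确,epoll_wait 返回事件时同样是拷贝,只是拷贝量只与就绪 fd 数有关。)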
4、epoll不仅会告诉应用程序有I/0 事件到来,还会告诉应用程序相关的信息,这些信息是应用程序填充的,因此根据这些信息应用程序就能直接定位到事件,而不必遍历整个fd集合。

epoll 的EPOLLLT (水平触发,默认)和 EPOLLET(边沿触发)模式的区别

1、EPOLLLT:完全靠kernel epoll驱动,应用程序只需要处理从epoll_wait返回的fds,这些fds我们认为它们处于就绪状态。此时epoll可以认为是更快速的poll。

2、EPOLLET:边沿触发模式下,epoll_wait 只在 fd 的状态发生变化(例如有新数据到达)时通知一次。fd 仍然留在 epoll 的监听集合中,并不会被移除;但如果应用程序没有把数据读完,在下一次有新数据到达之前不会再收到通知。因此 ET 模式必须配合非阻塞 fd,并循环读写直到返回 EAGAIN。ET 模式减少了 epoll_wait 对同一个 fd 的重复通知,在大并发的系统中更有优势,但是对程序员的要求也更高,因为有可能会出现数据读取不完整的问题,举例如下:

假设现在对方发送了2k的数据,而我们先读取了1k,然后这时调用了epoll_wait。如果是边沿触发,由于之后没有新数据到达,fd 的状态不再变化,epoll_wait 不会再报告这个 fd,很可能会一直阻塞,剩下尚未读取的1k数据被忽略;与此同时对方还在等待我们发送一个回复(应用层确认),双方就僵住了。如果是电平触发,那么epoll_wait 还会检测到可读事件而返回,我们可以继续读取剩下的1k 数据。

 
  • man epoll example
 
           #define MAX_EVENTS 10
           struct epoll_event ev, events[MAX_EVENTS];
           int listen_sock, conn_sock, nfds, epollfd;
 
           
 
           epollfd = epoll_create(10);
           if (epollfd == -1) {
               perror("epoll_create");
               exit(EXIT_FAILURE);
           }
 
           ev.events = EPOLLIN;
           ev.data.fd = listen_sock;
           if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == -1) {
               perror("epoll_ctl: listen_sock");
               exit(EXIT_FAILURE);
           }
 
           for (;;) {
               nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
               if (nfds == -1) {
                   perror("epoll_pwait");
                   exit(EXIT_FAILURE);
               }
 
               for (n = 0; n < nfds; ++n) {
                   if (events[n].data.fd == listen_sock) {
                       conn_sock = accept(listen_sock,
                                       (struct sockaddr *) &local, &addrlen);
                       if (conn_sock == -1) {
                           perror("accept");
                           exit(EXIT_FAILURE);
                       }
                       setnonblocking(conn_sock);
                       ev.events = EPOLLIN | EPOLLET;
                       ev.data.fd = conn_sock;
                       if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock,
                                   &ev) == -1) {
                           perror("epoll_ctl: conn_sock");
                           exit(EXIT_FAILURE);
                       }
                   } else {
                       do_use_fd(events[n].data.fd);
                   }
               }
           }