ACE_Select_Reactor 二 —— 服务器网关
ACE中的流包装提供面向连接的通信。流数据传输包装类包括ACE_SOCK_Stream和ACE_LSOCK_Stream,他们分别包装TCP/IP和UNIX域Socket协议数据传输功能。连接建立类包括针对TCP/IP的ACE_SOCK_Connector和ACE_SOCK_Acceptor,以及针对UNIX域Socket的ACE_LSOCK_Connector和ACE_LSOCK_Acceptor。
class Server { public: Server (int port): server_addr_(port),peer_acceptor_(server_addr_) { data_buf_= new char[SIZE_BUF]; } int handle_connection() { // Read data from client int byte_count=0; if( (byte_count=new_stream_.recv (data_buf_, SIZE_DATA, 0))==-1) ACE_ERROR ((LM_ERROR, "%p\n", "Error in recv")); else { data_buf_[byte_count]=0; ACE_DEBUG((LM_DEBUG,"Server received %s \n",data_buf_)); } // Close new endpoint if (new_stream_.close () == -1) ACE_ERROR ((LM_ERROR, "%p\n", "close")); return 0; } int accept_connections () { if (peer_acceptor_.get_local_addr (server_addr_) == -1) ACE_ERROR_RETURN ((LM_ERROR,"%p\n","Error in get_local_addr"),1); ACE_DEBUG ((LM_DEBUG,"Starting server at port %d\n", server_addr_.get_port_number ())); while(1) { ACE_Time_Value timeout (ACE_DEFAULT_TIMEOUT); if (peer_acceptor_.accept (new_stream_, &client_addr_, &timeout)== -1) { ACE_ERROR ((LM_ERROR, "%p\n", "accept")); continue; } else { ACE_DEBUG((LM_DEBUG, "Connection established with remote %s:%d\n", client_addr_.get_host_name(),client_addr_.get_port_number())); //Handle the connection handle_connection(); } } } private: char *data_buf_; ACE_INET_Addr server_addr_; ACE_INET_Addr client_addr_; ACE_SOCK_Acceptor peer_acceptor_; ACE_SOCK_Stream new_stream_; };
在上面的Server类中,创建了一个被动服务器,侦听到来的客户端连接,在连接建立之后,服务器接收来自客户端的数据,然后关闭链接。
Server类包含的accept_connection()方法使用接收器来将连接接受“进”ACE_SOCK_Stream new_stream_。该操作完成的基本流程是:调用接收器上的accept(),并将流作为参数传入其中。一旦连接已经建立进流中,流的包装方法send()和recv()就可以用来在新建立的链路上发送和接收数据,还有一个空的ACE_INET_Addr也被传入接收器的accept()方法,并在其中被设定为发起连接的远程机器地址。
在连接建立后,服务器调用handle_connection()方法,它开始从客户端那里收取一个预先知道的单词,然后将流关闭。连接关闭通过调用流上的close()方法来完成,该方法会释放所有的Socket资源并终止连接。
http://acme-ltt.iteye.com/blog/1455556中提到的ACE_Select_Reactor,在static ACE_THR_FUNC_RETURN controller (void *arg)函数中,调用上述的Server类,搭建基于ACE_Select_Reactor的Socket服务器网关。
客户端程序:
#include "ace/SOCK_Connector.h" #include "ace/INET_Addr.h" #include "ace/OS.h" #include "ace/Log_Msg.h" #include <stdlib.h> #include "text.h" #include "ace/Thread_Manager.h" #include <iostream> #define SIZE_BUF 128 static const ACE_Time_Value TIME_INTERVAL(0, 1000000); class Client { public: Client(char *hostname, int port):remote_addr_(port,hostname) { } int connect_to_server() { ACE_DEBUG ((LM_DEBUG, "(%P|%t) Starting connect to %s:%d\n", remote_addr_.get_host_name(),remote_addr_.get_port_number())); if (connector_.connect (client_stream_, remote_addr_) == -1) ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t) %p\n","connection failed"),-1); else ACE_DEBUG ((LM_DEBUG,"(%P|%t) connected to %s\n", remote_addr_.get_host_name ())); return 0; } int send_to_server() { iovec iov[3]; iov[0].iov_base = (void *)"get"; iov[0].iov_len = 3; iov[1].iov_base = getdata()/* some data */; iov[1].iov_len = strlen(getdata()); iov[2].iov_base = (void *)"end."; iov[2].iov_len = 4; if (client_stream_.sendv_n (iov,3) == -1) { ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t) %p\n","send_n"),0); //break; exit(0); } close(); } int close() { if (client_stream_.close () == -1) ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t) %p\n","close"),-1); else return 0; } private: ACE_SOCK_Stream client_stream_; ACE_INET_Addr remote_addr_; ACE_SOCK_Connector connector_; char *data_buf_; }; int main (int argc, char *argv[]) { if(argc<3) { ACE_DEBUG((LM_DEBUG,”Usage %s <hostname> <port_number>\n”, argv[0])); ACE_OS::exit(1); } Client client(argv[1],ACE_OS::atoi(argv[2])); client.connect_to_server(); client.send_to_server(); }
客户端由单个Client类表示。Client含有connect_to_server()和send_to_server()方法。
connect_to_server()方法使用类型为ACE_SOCK_Connector的连接器来主动地建立连接。连接的设置通过调用连接器上的connect()方法来完成:传入的参数为想要连接的机器的远程地址,以及用于在其中建立连接的空ACE_SOCK_Stream 。远程机器在运行时参数中指定。一旦connect()方法成功返回,通过使用ACE_SOCK_Stream封装类中的send()和recv()方法族,流就可以用于在新建立的链路上发送和接收数据。
在该代码中,一旦连接建立好,send_to_server()方法就会被调用,将一个iovec类型的数组,用sendv_n()方法,发送到服务器上。