boost:asio 联接管理5
boost::asio 连接管理5
重新看一下优雅的退出,还是有些不完善。因为当收到退出信号的时候,直接关闭了io_service,而不是关闭所有连接再退出。其实应该按照顺序做三件事情:
1.拒绝新的TCP连接
2.关闭所有已有的连接
3.关闭io_service
1很简单,只需要调用一次acceptor_.close()即可。
2.每个Connection类应该提供一个CloseSocket方法,内部调用socket.close(). 同时需要一个地方集中管理所有的Connection对象。因此可以设计一个Connections类,里面包含了很多shared_ptr<Connection>。
3.最后再调用io_service的close方法。
现在代码进行重构,有点复杂。而且由于涉及到两个类互相指向对方,只好分开文件编写。
首先,修改一下Connection类,提供了一个CloseSocket方法,另外就是读取一个字符,如果是'a',则继续读取,如果读取的不是a,则关闭连接。
头文件:
#ifndef CONNECTION_H #define CONNECTION_H #include <vector> #include <boost/asio.hpp> #include <boost/enable_shared_from_this.hpp> using namespace boost; using namespace boost::asio; using ip::tcp; using namespace std; using boost::system::error_code; class Connections; class Connection: public boost::enable_shared_from_this<Connection> { public: Connection(io_service& s, Connections& cons); ~Connection(); void StartWork(); void CloseSocket(); void AfterReadChar(error_code const& ec); public: tcp::socket socket; private: vector<char> read_buffer_; Connections& cons_; }; #endif /* CONNECTION_H */
实现代码:
#include "Connection.h" #include <boost/bind.hpp> #include "Connections.h" Connection::Connection(io_service& s, Connections& cons) : socket(s), read_buffer_(1, 0), cons_(cons) { } Connection::~Connection() { cout << "~Connection" << endl; } void Connection::StartWork() { cout << "the new connection object is starting now." << endl; async_read(socket, buffer(read_buffer_), boost::bind(&Connection::AfterReadChar, shared_from_this(), _1)); } void Connection::CloseSocket() { cout << "closing the socket" << endl; socket.shutdown(tcp::socket::shutdown_both); socket.close(); } void Connection::AfterReadChar(error_code const& ec) { if (ec) { cout << ec.message() << endl; return; } char x = read_buffer_[0]; if (x == 'a') { cout << "correct data received" << endl; async_read(socket, buffer(read_buffer_), boost::bind(&Connection::AfterReadChar, shared_from_this(), _1)); } else { cout << "wrong data received, char is:" << (int) x << endl; CloseSocket(); cons_.Remove(shared_from_this()); } }
Connection内部保存了Connections的引用。在读取到错误字符时,会关闭socket,然后从connections中把自己删除。
现在看看Connections的代码,内部用set保存所有连接,因为我的目标是至少支持几千个连接。vector显然不合适,以tree为结构的set还能顶一顶。
头文件:
#ifndef CONNECTIONS_H #define CONNECTIONS_H #include <set> #include <algorithm> #include <boost/shared_array.hpp> using namespace std; using namespace boost; #include "Connection.h" class Connections { public: void Add(shared_ptr<Connection>& con); void Remove(shared_ptr<Connection> con); void CloseAll(); private: set<shared_ptr<Connection> > values_; };
实现代码:
#include "Connections.h" #include <boost/bind.hpp> void Connections::Add(shared_ptr<Connection>& con) { values_.insert(con); cout << "the number of connections is: " << values_.size() << endl; } void Connections::Remove(shared_ptr<Connection> con) { values_.erase(con); } void Connections::CloseAll() { for_each(values_.begin(), values_.end(), boost::bind(&Connection::CloseSocket, _1)); values_.clear(); }
现在看一下Server以及main函数的代码:
#include <cstdlib> #include <boost/asio.hpp> #include <boost/bind.hpp> #include <iostream> #include <vector> #include <algorithm> #include "Connection.h" #include "Connections.h" using namespace boost; using namespace boost::asio; using ip::tcp; using namespace std; using boost::system::error_code; class Server { public: Server(io_service & s, tcp::endpoint const& listen_endpoint) : io_(s), signals_(s), acceptor_(s, listen_endpoint) { signals_.add(SIGINT); signals_.add(SIGTERM); #if defined(SIGQUIT) signals_.add(SIGQUIT); #endif signals_.async_wait(boost::bind(&Server::Stop, this)); shared_ptr<Connection> c(new Connection(io_, cons_)); acceptor_.async_accept(c->socket, boost::bind(&Server::AfterAccept, this, c, _1)); } void Run() { io_.run(); } void AfterAccept(shared_ptr<Connection>& c, error_code const& ec) { // Check whether the server was stopped by a signal before this completion // handler had a chance to run. if (!acceptor_.is_open()) { return; } if (!ec) { cons_.Add(c); c->StartWork(); shared_ptr<Connection> c2(new Connection(io_, cons_)); acceptor_.async_accept(c2->socket, boost::bind(&Server::AfterAccept, this, c2, _1)); } } private: void Stop() { cout << "stopping" << endl; acceptor_.close(); cons_.CloseAll(); io_.stop(); } private: io_service& io_; boost::asio::signal_set signals_; tcp::acceptor acceptor_; Connections cons_; }; int main(int argc, char** argv) { io_service s; tcp::endpoint listen_endpoint(tcp::v4(), 8888); Server server(s, listen_endpoint); server.Run(); return 0; }
下一节将用一个newLISP程序测试,证明程序是可靠的。