boost:asio 联接管理5

boost::asio 连接管理5

重新看一下优雅的退出,还是有些不完善。因为当收到退出信号的时候,直接关闭了io_service,而不是关闭所有连接再退出。其实应该按照顺序做三件事情:

1.拒绝新的TCP连接

2.关闭所有已有的连接

3.关闭io_service


1很简单,只需要调用一次acceptor_.close()即可。

2.每个Connection类应该提供一个CloseSocket方法,内部调用socket.close(). 同时需要一个地方集中管理所有的Connection对象。因此可以设计一个Connections类,里面包含了很多shared_ptr<Connection>。

3.最后再调用io_service的close方法。

现在代码进行重构,有点复杂。而且由于涉及到两个类互相指向对方,只好分开文件编写。


首先,修改一下Connection类,提供了一个CloseSocket方法,另外就是读取一个字符,如果是'a',则继续读取,如果读取的不是a,则关闭连接。

头文件:

#ifndef CONNECTION_H
#define	CONNECTION_H

#include <vector>
#include <boost/asio.hpp>
#include <boost/enable_shared_from_this.hpp>

using namespace boost;
using namespace boost::asio;
using ip::tcp;
using namespace std;
using boost::system::error_code;

class Connections;
       
class Connection: public boost::enable_shared_from_this<Connection> {
public:
    Connection(io_service& s, Connections& cons);
    
    ~Connection();
    
    void StartWork();
    
    void CloseSocket();
    
    void AfterReadChar(error_code const& ec);
    
public:
    tcp::socket socket;
    
private:
    vector<char> read_buffer_;
    Connections& cons_;
};


#endif	/* CONNECTION_H */

实现代码:

#include "Connection.h"
#include <boost/bind.hpp>
#include "Connections.h"

Connection::Connection(io_service& s, Connections& cons)
: socket(s), read_buffer_(1, 0), cons_(cons) {

}

Connection::~Connection() {
    cout << "~Connection" << endl;
}

void Connection::StartWork() {
    cout << "the new connection object is starting now." << endl;
    async_read(socket, buffer(read_buffer_),
            boost::bind(&Connection::AfterReadChar, shared_from_this(), _1));
}

void Connection::CloseSocket() {
    cout << "closing the socket" << endl;
    socket.shutdown(tcp::socket::shutdown_both);
    socket.close();
}

void Connection::AfterReadChar(error_code const& ec) {
    if (ec) {
        cout << ec.message() << endl;
        return;
    }

    char x = read_buffer_[0];
    if (x == 'a') {
        cout << "correct data received" << endl;
        async_read(socket, buffer(read_buffer_),
                boost::bind(&Connection::AfterReadChar, shared_from_this(), _1));
    } else {
        cout << "wrong data received, char is:" << (int) x << endl;
        CloseSocket();
        cons_.Remove(shared_from_this());
    }
}


Connection内部保存了Connections的引用。在读取到错误字符时,会关闭socket,然后从connections中把自己删除。

现在看看Connections的代码,内部用set保存所有连接,因为我的目标是至少支持几千个连接。vector显然不合适,以tree为结构的set还能顶一顶。

头文件:

#ifndef CONNECTIONS_H
#define	CONNECTIONS_H

#include <set>
#include <algorithm>
#include <boost/shared_array.hpp>

using namespace std;
using namespace boost;

#include "Connection.h"

class Connections {
public:
    void Add(shared_ptr<Connection>& con);
    
    void Remove(shared_ptr<Connection> con);
    
    void CloseAll();
    
private:
    set<shared_ptr<Connection> > values_;
};

实现代码:

#include "Connections.h"
#include <boost/bind.hpp>

void Connections::Add(shared_ptr<Connection>& con) {
    values_.insert(con);
    cout << "the number of connections is: " << values_.size() << endl;
}

void Connections::Remove(shared_ptr<Connection> con) {
    values_.erase(con);
}

void Connections::CloseAll() {
    for_each(values_.begin(), values_.end(), boost::bind(&Connection::CloseSocket, _1));
    values_.clear();
}


Connections提供了添加,删除和关闭所有连接的方法,Server对象将使用Add和CloseAll,Remove是Connection对象使用的。

现在看一下Server以及main函数的代码:

#include <cstdlib>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <iostream>
#include <vector>
#include <algorithm>
#include "Connection.h"
#include "Connections.h"

using namespace boost;
using namespace boost::asio;
using ip::tcp;
using namespace std;
using boost::system::error_code;

class Server {
public:

    Server(io_service & s, tcp::endpoint const& listen_endpoint)
    : io_(s), signals_(s), acceptor_(s, listen_endpoint) {
        signals_.add(SIGINT);
        signals_.add(SIGTERM);
#if defined(SIGQUIT)
        signals_.add(SIGQUIT);
#endif
        signals_.async_wait(boost::bind(&Server::Stop, this));
        shared_ptr<Connection> c(new Connection(io_, cons_));
        
        acceptor_.async_accept(c->socket,
                boost::bind(&Server::AfterAccept, this, c, _1));
    }

    void Run() {
        io_.run();
    }

    void AfterAccept(shared_ptr<Connection>& c, error_code const& ec) {
        // Check whether the server was stopped by a signal before this completion
        // handler had a chance to run.
        if (!acceptor_.is_open()) {
            return;
        }
        
        if (!ec) {
            cons_.Add(c);
            c->StartWork();
            shared_ptr<Connection> c2(new Connection(io_, cons_));
            acceptor_.async_accept(c2->socket,
                    boost::bind(&Server::AfterAccept, this, c2, _1));
        }
    }

private:

    void Stop() {
        cout << "stopping" << endl;
        acceptor_.close();
        cons_.CloseAll();
        io_.stop();
    }

private:
    io_service& io_;
    boost::asio::signal_set signals_;
    tcp::acceptor acceptor_;
    Connections cons_;
};

int main(int argc, char** argv) {
    io_service s;

    tcp::endpoint listen_endpoint(tcp::v4(), 8888);

    Server server(s, listen_endpoint);
    server.Run();
    return 0;
}


连接一旦接受,就会将新的连接加到connections中。 当接到关闭信号时,将关闭所有连接。


下一节将用一个newLISP程序测试,证明程序是可靠的。