trinitycore 魔兽服务器源码分析(一) 网络

trinitycore 魔兽服务器源码分析(一) 网络

trinitycore是游戏服务器的开源代码 许多玩家使用魔兽的数据来进行测试 ,使用它来假设魔兽私服。

官方网址  https://www.trinitycore.org/

类似的还有mangos 和 kbengine 不过mangos使用庞大的ACE网络框架

kbengine使用自写网络库  两者均使用了多语言进行开发

作为trinitycore 主要使用c++。代码比较好读,就开启本篇这个代码阅读的坑

代码要求具备c++11的function shared_ptr 指针的相关知识

以及了解阅读过boost asio网络库的文档和示例代码的相关知识

先从网络看起

大致看了下 trinitycore使用boost asio作为网络库

大致就是这么几个代码

trinitycore 魔兽服务器源码分析(一) 网络

class AsyncAcceptor 顾名思义 是异步accept

构造函数 简单的初始化类中变量 无他

AsyncAcceptor(boost::asio::io_service& ioService, std::string const& bindIp, uint16 port) :
_acceptor(ioService), _endpoint(boost::asio::ip::address::from_string(bindIp), port),
_socket(ioService), _closed(false), _socketFactory(std::bind(&AsyncAcceptor::DefeaultSocketFactory, this))
{
}

  void AsyncAcceptWithCallback() 函数

1 使用_socketFactory产生socket指针复制给类变量

tcp::socket* socket;
std::tie(socket, threadIndex) = _socketFactory();

2 将初始化时传入的函数指针AcceptCallback在ACCEPT时候调用

 typedef void(*AcceptCallback)(tcp::socket&& newSocket, uint32 threadIndex);

 acceptCallback(std::move(*socket), threadIndex);

并再次进入 AsyncAcceptWithCallback函数等待下次accept;

if (!_closed)
this->AsyncAcceptWithCallback<acceptCallback>();

bool Bind()函数将类中accpter与指定的协议绑定bind 并进行监听listen

template<class T>
void AsyncAcceptor::AsyncAccept()函数

不设置回调函数 而是将accept的socket转化为 类型T的指针 并调用T->start();

并且再次进入AsyncAccept()等待下次accept

  1 /*
  2  * Copyright (C) 2008-2017 TrinityCore <http://www.trinitycore.org/>
  3  *
  4  * This program is free software; you can redistribute it and/or modify it
  5  * under the terms of the GNU General Public License as published by the
  6  * Free Software Foundation; either version 2 of the License, or (at your
  7  * option) any later version.
  8  *
  9  * This program is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 12  * more details.
 13  *
 14  * You should have received a copy of the GNU General Public License along
 15  * with this program. If not, see <http://www.gnu.org/licenses/>.
 16  */
 17 
 18 #ifndef __ASYNCACCEPT_H_
 19 #define __ASYNCACCEPT_H_
 20 
 21 #include "Log.h"
 22 #include <boost/asio/ip/tcp.hpp>
 23 #include <boost/asio/ip/address.hpp>
 24 #include <functional>
 25 #include <atomic>
 26 
 27 using boost::asio::ip::tcp;
 28 
 29 class AsyncAcceptor
 30 {
 31 public:
 32     typedef void(*AcceptCallback)(tcp::socket&& newSocket, uint32 threadIndex);
 33 
 34     AsyncAcceptor(boost::asio::io_service& ioService, std::string const& bindIp, uint16 port) :
 35         _acceptor(ioService), _endpoint(boost::asio::ip::address::from_string(bindIp), port),
 36         _socket(ioService), _closed(false), _socketFactory(std::bind(&AsyncAcceptor::DefeaultSocketFactory, this))
 37     {
 38     }
 39 
 40     template<class T>
 41     void AsyncAccept();
 42 
 43     template<AcceptCallback acceptCallback>
 44     void AsyncAcceptWithCallback()
 45     {
 46         tcp::socket* socket;
 47         uint32 threadIndex;
 48         std::tie(socket, threadIndex) = _socketFactory();
 49         _acceptor.async_accept(*socket, [this, socket, threadIndex](boost::system::error_code error)
 50         {
 51             if (!error)
 52             {
 53                 try
 54                 {
 55                     socket->non_blocking(true);
 56 
 57                     acceptCallback(std::move(*socket), threadIndex);
 58                 }
 59                 catch (boost::system::system_error const& err)
 60                 {
 61                     TC_LOG_INFO("network", "Failed to initialize client's socket %s", err.what());
 62                 }
 63             }
 64 
 65             if (!_closed)
 66                 this->AsyncAcceptWithCallback<acceptCallback>();
 67         });
 68     }
 69 
 70     bool Bind()
 71     {
 72         boost::system::error_code errorCode;
 73         _acceptor.open(_endpoint.protocol(), errorCode);
 74         if (errorCode)
 75         {
 76             TC_LOG_INFO("network", "Failed to open acceptor %s", errorCode.message().c_str());
 77             return false;
 78         }
 79 
 80         _acceptor.bind(_endpoint, errorCode);
 81         if (errorCode)
 82         {
 83             TC_LOG_INFO("network", "Could not bind to %s:%u %s", _endpoint.address().to_string().c_str(), _endpoint.port(), errorCode.message().c_str());
 84             return false;
 85         }
 86 
 87         _acceptor.listen(boost::asio::socket_base::max_connections, errorCode);
 88         if (errorCode)
 89         {
 90             TC_LOG_INFO("network", "Failed to start listening on %s:%u %s", _endpoint.address().to_string().c_str(), _endpoint.port(), errorCode.message().c_str());
 91             return false;
 92         }
 93 
 94         return true;
 95     }
 96 
 97     void Close()
 98     {
 99         if (_closed.exchange(true))
100             return;
101 
102         boost::system::error_code err;
103         _acceptor.close(err);
104     }
105 
106     void SetSocketFactory(std::function<std::pair<tcp::socket*, uint32>()> func) { _socketFactory = func; }
107 
108 private:
109     std::pair<tcp::socket*, uint32> DefeaultSocketFactory() { return std::make_pair(&_socket, 0); }
110 
111     tcp::acceptor _acceptor;
112     tcp::endpoint _endpoint;
113     tcp::socket _socket;
114     std::atomic<bool> _closed;
115     std::function<std::pair<tcp::socket*, uint32>()> _socketFactory;
116 };
117 
118 template<class T>
119 void AsyncAcceptor::AsyncAccept()
120 {
121     _acceptor.async_accept(_socket, [this](boost::system::error_code error)
122     {
123         if (!error)
124         {
125             try
126             {
127                 // this-> is required here to fix an segmentation fault in gcc 4.7.2 - reason is lambdas in a templated class
128                 std::make_shared<T>(std::move(this->_socket))->Start();
129             }
130             catch (boost::system::system_error const& err)
131             {
132                 TC_LOG_INFO("network", "Failed to retrieve client's remote address %s", err.what());
133             }
134         }
135 
136         // lets slap some more this-> on this so we can fix this bug with gcc 4.7.2 throwing internals in yo face
137         if (!_closed)
138             this->AsyncAccept<T>();
139     });
140 }
141 
142 #endif /* __ASYNCACCEPT_H_ */
View Code

//============================================================================

template<class SocketType>
class NetworkThread

类使用多线程执行socket的管理

那么类的成员变量及作用如后


typedef std::vector<std::shared_ptr<SocketType>> SocketContainer;  //socket容器进行socket指针的存储管理

std::atomic<int32> _connections;    //原子计数 多线程下记录连接数目
std::atomic<bool> _stopped;   //原子bool型flag 标记此线程是否停止

std::thread* _thread;      //线程指针

SocketContainer _sockets;   //socket容器进行socket指针的存储管理

std::mutex _newSocketsLock;  //多线程互斥变量
SocketContainer _newSockets;  //另一个socket容器

//boost 设置常规变量

boost::asio::io_service _io_service;
tcp::socket _acceptSocket;
boost::asio::deadline_timer _updateTimer;

从  void Run()函数入手

 void Run()的功能如下

定时异步执行  Update函数 ,运行_io_service

_updateTimer.expires_from_now(boost::posix_time::milliseconds(10));
_updateTimer.async_wait(std::bind(&NetworkThread<SocketType>::Update, this));
_io_service.run();

void Update()函数

在加锁情况下 将_newSockets容器socket指针添加到_sockets ==》  AddNewSockets();

并移除那些update失败的socket==》_sockets.erase(。。。。。。

再次定时异步调用update

函数作用应该是定时清除无效socket 具体效果还需要在后继代码中看看 template<typename SocketType> 中SocketType的update函数的具体作用

整个类代码如下

  1 /*
  2  * Copyright (C) 2008-2017 TrinityCore <http://www.trinitycore.org/>
  3  *
  4  * This program is free software; you can redistribute it and/or modify it
  5  * under the terms of the GNU General Public License as published by the
  6  * Free Software Foundation; either version 2 of the License, or (at your
  7  * option) any later version.
  8  *
  9  * This program is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 12  * more details.
 13  *
 14  * You should have received a copy of the GNU General Public License along
 15  * with this program. If not, see <http://www.gnu.org/licenses/>.
 16  */
 17 
 18 #ifndef NetworkThread_h__
 19 #define NetworkThread_h__
 20 
 21 #include "Define.h"
 22 #include "Errors.h"
 23 #include "Log.h"
 24 #include "Timer.h"
 25 #include <boost/asio/ip/tcp.hpp>
 26 #include <boost/asio/deadline_timer.hpp>
 27 #include <atomic>
 28 #include <chrono>
 29 #include <memory>
 30 #include <mutex>
 31 #include <set>
 32 #include <thread>
 33 
 34 using boost::asio::ip::tcp;
 35 
 36 template<class SocketType>
 37 class NetworkThread
 38 {
 39 public:
 40     NetworkThread() : _connections(0), _stopped(false), _thread(nullptr),
 41         _acceptSocket(_io_service), _updateTimer(_io_service)
 42     {
 43     }
 44 
 45     virtual ~NetworkThread()
 46     {
 47         Stop();
 48         if (_thread)
 49         {
 50             Wait();
 51             delete _thread;
 52         }
 53     }
 54 
 55     void Stop()
 56     {
 57         _stopped = true;
 58         _io_service.stop();
 59     }
 60 
 61     bool Start()
 62     {
 63         if (_thread)
 64             return false;
 65 
 66         _thread = new std::thread(&NetworkThread::Run, this);
 67         return true;
 68     }
 69 
 70     void Wait()
 71     {
 72         ASSERT(_thread);
 73 
 74         _thread->join();
 75         delete _thread;
 76         _thread = nullptr;
 77     }
 78 
 79     int32 GetConnectionCount() const
 80     {
 81         return _connections;
 82     }
 83 
 84     virtual void AddSocket(std::shared_ptr<SocketType> sock)
 85     {
 86         std::lock_guard<std::mutex> lock(_newSocketsLock);
 87 
 88         ++_connections;
 89         _newSockets.push_back(sock);
 90         SocketAdded(sock);
 91     }
 92 
 93     tcp::socket* GetSocketForAccept() { return &_acceptSocket; }
 94 
 95 protected:
 96     virtual void SocketAdded(std::shared_ptr<SocketType> /*sock*/) { }
 97     virtual void SocketRemoved(std::shared_ptr<SocketType> /*sock*/) { }
 98 
 99     void AddNewSockets()
100     {
101         std::lock_guard<std::mutex> lock(_newSocketsLock);
102 
103         if (_newSockets.empty())
104             return;
105 
106         for (std::shared_ptr<SocketType> sock : _newSockets)
107         {
108             if (!sock->IsOpen())
109             {
110                 SocketRemoved(sock);
111                 --_connections;
112             }
113             else
114                 _sockets.push_back(sock);
115         }
116 
117         _newSockets.clear();
118     }
119 
120     void Run()
121     {
122         TC_LOG_DEBUG("misc", "Network Thread Starting");
123 
124         _updateTimer.expires_from_now(boost::posix_time::milliseconds(10));
125         _updateTimer.async_wait(std::bind(&NetworkThread<SocketType>::Update, this));
126         _io_service.run();
127 
128         TC_LOG_DEBUG("misc", "Network Thread exits");
129         _newSockets.clear();
130         _sockets.clear();
131     }
132 
133     void Update()
134     {
135         if (_stopped)
136             return;
137 
138         _updateTimer.expires_from_now(boost::posix_time::milliseconds(10));
139         _updateTimer.async_wait(std::bind(&NetworkThread<SocketType>::Update, this));
140 
141         AddNewSockets();
142 
143         _sockets.erase(std::remove_if(_sockets.begin(), _sockets.end(), [this](std::shared_ptr<SocketType> sock)
144         {
145             if (!sock->Update())
146             {
147                 if (sock->IsOpen())
148                     sock->CloseSocket();
149 
150                 this->SocketRemoved(sock);
151 
152                 --this->_connections;
153                 return true;
154             }
155 
156             return false;
157         }), _sockets.end());
158     }
159 
160 private:
161     typedef std::vector<std::shared_ptr<SocketType>> SocketContainer;
162 
163     std::atomic<int32> _connections;
164     std::atomic<bool> _stopped;
165 
166     std::thread* _thread;
167 
168     SocketContainer _sockets;
169 
170     std::mutex _newSocketsLock;
171     SocketContainer _newSockets;
172 
173     boost::asio::io_service _io_service;
174     tcp::socket _acceptSocket;
175     boost::asio::deadline_timer _updateTimer;
176 };
177 
178 #endif // NetworkThread_h__
View Code