trinitycore 魔兽服务器源码分析(三) 多线程相关

trinitycore 魔兽服务器源码分析(三) 多线程相关

先看LockedQueue.h

// Declaration sketch only: the class body is elided ("......") in this excerpt.
// StorageType defaults to std::deque<T>.
template <class T, typename StorageType = std::deque<T> >
class LockedQueue{......}

一个带锁的多线程可安全访问的类,容器默认使用std::deque

常规代码 push进T类型的元素 pop出T类型的元素

使用锁定 保证线程安全 

相比C++11之前的繁琐做法 现在加锁可以使用 std::lock_guard

当使用std::lock_guard<std::mutex> lock(_lock)后 代码进入加锁模式

当变量lock离开作用域(生命周期结束)时 会自动解锁

代码如下

  1 /*
  2  * Copyright (C) 2008-2017 TrinityCore <http://www.trinitycore.org/>
  3  * Copyright (C) 2005-2008 MaNGOS <http://getmangos.com/>
  4  *
  5  * This program is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License as published by the
  7  * Free Software Foundation; either version 2 of the License, or (at your
  8  * option) any later version.
  9  *
 10  * This program is distributed in the hope that it will be useful, but WITHOUT
 11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 12  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 13  * more details.
 14  *
 15  * You should have received a copy of the GNU General Public License along
 16  * with this program. If not, see <http://www.gnu.org/licenses/>.
 17  */
 18 
 19 #ifndef LOCKEDQUEUE_H
 20 #define LOCKEDQUEUE_H
 21 
 22 #include <deque>
 23 #include <mutex>
 24 
 25 template <class T, typename StorageType = std::deque<T> >
 26 class LockedQueue
 27 {
 28     //! Lock access to the queue.
 29     std::mutex _lock;
 30 
 31     //! Storage backing the queue.
 32     StorageType _queue;
 33 
 34     //! Cancellation flag.
 35     volatile bool _canceled;
 36 
 37 public:
 38 
 39     //! Create a LockedQueue.
 40     LockedQueue()
 41         : _canceled(false)
 42     {
 43     }
 44 
 45     //! Destroy a LockedQueue.
 46     virtual ~LockedQueue()
 47     {
 48     }
 49 
 50     //! Adds an item to the queue.
 51     void add(const T& item)
 52     {
 53         lock();
 54 
 55         _queue.push_back(item);
 56 
 57         unlock();
 58     }
 59 
 60     //! Adds items back to front of the queue
 61     template<class Iterator>
 62     void readd(Iterator begin, Iterator end)
 63     {
 64         std::lock_guard<std::mutex> lock(_lock);
 65         _queue.insert(_queue.begin(), begin, end);
 66     }
 67 
 68     //! Gets the next result in the queue, if any.
 69     bool next(T& result)
 70     {
 71         std::lock_guard<std::mutex> lock(_lock);
 72 
 73         if (_queue.empty())
 74             return false;
 75 
 76         result = _queue.front();
 77         _queue.pop_front();
 78 
 79         return true;
 80     }
 81 
 82     template<class Checker>
 83     bool next(T& result, Checker& check)
 84     {
 85         std::lock_guard<std::mutex> lock(_lock);
 86 
 87         if (_queue.empty())
 88             return false;
 89 
 90         result = _queue.front();
 91         if (!check.Process(result))
 92             return false;
 93 
 94         _queue.pop_front();
 95         return true;
 96     }
 97 
 98     //! Peeks at the top of the queue. Check if the queue is empty before calling! Remember to unlock after use if autoUnlock == false.
 99     T& peek(bool autoUnlock = false)
100     {
101         lock();
102 
103         T& result = _queue.front();
104 
105         if (autoUnlock)
106             unlock();
107 
108         return result;
109     }
110 
111     //! Cancels the queue.
112     void cancel()
113     {
114         std::lock_guard<std::mutex> lock(_lock);
115 
116         _canceled = true;
117     }
118 
119     //! Checks if the queue is cancelled.
120     bool cancelled()
121     {
122         std::lock_guard<std::mutex> lock(_lock);
123         return _canceled;
124     }
125 
126     //! Locks the queue for access.
127     void lock()
128     {
129         this->_lock.lock();
130     }
131 
132     //! Unlocks the queue.
133     void unlock()
134     {
135         this->_lock.unlock();
136     }
137 
138     ///! Calls pop_front of the queue
139     void pop_front()
140     {
141         std::lock_guard<std::mutex> lock(_lock);
142         _queue.pop_front();
143     }
144 
145     ///! Checks if we're empty or not with locks held
146     bool empty()
147     {
148         std::lock_guard<std::mutex> lock(_lock);
149         return _queue.empty();
150     }
151 };
152 #endif
View Code

//=========================================================

MPSCQueue.h

这是一个操作原子类型的元素指针的链表队列

// Sketch of the queue's data members only; Node is the queue's private list-node type.
template<typename T>
class MPSCQueue
{
std::atomic<Node*> _head; // swapped atomically by Enqueue when a node is appended
std::atomic<Node*> _tail; // loaded and advanced by Dequeue
}

Node就是一个链表的类型  当我们使用exchange交换Node的指针时 是多线程安全的

而std::memory_order_acq_rel 之类的参数 是原子操作的内存序(memory order)参数

可参考

http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue

https://www.zhihu.com/question/24301047

源码

 1 /*
 2  * Copyright (C) 2008-2017 TrinityCore <http://www.trinitycore.org/>
 3  *
 4  * This program is free software; you can redistribute it and/or modify it
 5  * under the terms of the GNU General Public License as published by the
 6  * Free Software Foundation; either version 2 of the License, or (at your
 7  * option) any later version.
 8  *
 9  * This program is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12  * more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program. If not, see <http://www.gnu.org/licenses/>.
16  */
17 
18 #ifndef MPSCQueue_h__
19 #define MPSCQueue_h__
20 
21 #include <atomic>
22 #include <utility>
23 
24 // C++ implementation of Dmitry Vyukov's lock free MPSC queue
25 // http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
// Node-based multi-producer / single-consumer queue of T* pointers.
// The queue always contains one stub node; _tail points at the stub and
// the first real element is stub->Next.
// Per the MPSC design, Enqueue may be called by several producers, while
// Dequeue is intended for a single consumer thread.
template<typename T>
class MPSCQueue
{
public:
    // Creates an empty queue: head and tail both point at a fresh stub node
    // whose Data and Next are null (see Node's member initializers).
    MPSCQueue() : _head(new Node()), _tail(_head.load(std::memory_order_relaxed))
    {
    }

    // Drains every remaining node and frees the final stub.
    // NOTE(review): the T* payloads still queued are NOT deleted here, only
    // the nodes that carry them. If the queue is meant to own its payloads
    // this leaks them; confirm the intended ownership before changing it.
    ~MPSCQueue()
    {
        T* output;
        while (this->Dequeue(output))
            ;

        Node* front = _head.load(std::memory_order_relaxed);
        delete front;
    }

    // Appends input to the queue.
    void Enqueue(T* input)
    {
        Node* node = new Node(input);
        // Publish the new node as head first, then link the previous head to
        // it. Between these two steps the consumer simply sees no successor.
        Node* prevHead = _head.exchange(node, std::memory_order_acq_rel);
        prevHead->Next.store(node, std::memory_order_release);
    }

    // Pops the oldest element into result.
    // Returns false, leaving result untouched, when no linked element is
    // available.
    bool Dequeue(T*& result)
    {
        Node* tail = _tail.load(std::memory_order_relaxed);
        Node* next = tail->Next.load(std::memory_order_acquire);
        if (!next)
            return false;

        result = next->Data;
        // The node that carried the result becomes the new stub; the old
        // stub is no longer reachable and can be freed.
        _tail.store(next, std::memory_order_release);
        delete tail;
        return true;
    }

private:
    struct Node
    {
        Node() = default;
        explicit Node(T* data) : Data(data) { }

        // Both members are initialized here so that a default-constructed
        // stub node never holds indeterminate values.
        T* Data = nullptr;
        std::atomic<Node*> Next{nullptr};
    };

    std::atomic<Node*> _head;
    std::atomic<Node*> _tail;

    MPSCQueue(MPSCQueue const&) = delete;
    MPSCQueue& operator=(MPSCQueue const&) = delete;
};
82 
83 #endif // MPSCQueue_h__
View Code

//=========================================================================

使用多线程安全的队列 设计出一个生产消费者队列

// Sketch of the data members only; the member functions are walked through below.
template <typename T>
class ProducerConsumerQueue
{

private:
std::mutex _queueLock;        // lock held while operating on the queue
std::queue<T> _queue;        // queue that stores the elements
std::condition_variable _condition;   // condition variable
std::atomic<bool> _shutdown;     // atomic flag marking whether this queue has been shut down

}

//构造函数

ProducerConsumerQueue<T>() : _shutdown(false) { }

// 加锁情况下  生产者push元素进队列 并通过条件变量提示消费者进行处理

// Copies value onto the back of the queue under the lock and wakes one waiter.
void Push(const T& value)
{
    std::lock_guard<std::mutex> lock(_queueLock);
    // value is a const reference, so std::move would still copy; push it directly.
    _queue.push(value);

    _condition.notify_one();
}

//加锁模式下 判断队列是否为空

// Reports whether the queue currently holds no elements; the check runs under _queueLock.
bool Empty()
{
    std::lock_guard<std::mutex> guard(_queueLock);
    const bool hasNoElements = _queue.empty();
    return hasNoElements;
}

//加锁模式下 队列不空且没有shutdown关闭标记时 进行元素弹出操作

// Non-blocking pop: copies the front element into value and removes it.
// Returns false, leaving value untouched, when the queue is empty or shut down.
bool Pop(T& value)
{
    std::lock_guard<std::mutex> guard(_queueLock);

    const bool canTake = !_queue.empty() && !_shutdown;
    if (canTake)
    {
        value = _queue.front();
        _queue.pop();
    }

    return canTake;
}

//消费者调用 加锁模式下 队列中没有元素时阻塞等待 直到有元素可取或队列被关闭

// Blocking pop: waits on the condition variable until an element is queued or
// the queue is shut down. On shutdown it returns without modifying value.
void WaitAndPop(T& value)
{
    std::unique_lock<std::mutex> guard(_queueLock);

    // we could be using .wait(lock, predicate) overload here but it is broken
    // https://connect.microsoft.com/VisualStudio/feedback/details/1098841
    for (;;)
    {
        const bool mustWait = _queue.empty() && !_shutdown;
        if (!mustWait)
            break;

        _condition.wait(guard);
    }

    if (!_queue.empty() && !_shutdown)
    {
        value = _queue.front();
        _queue.pop();
    }
}

// Shuts the queue down: every queued element is handed to DeleteQueuedObject
// and removed, the shutdown flag is set, and all waiting threads are woken.
void Cancel()
{
    std::unique_lock<std::mutex> guard(_queueLock);

    for (; !_queue.empty(); _queue.pop())
        DeleteQueuedObject(_queue.front());

    _shutdown = true;

    _condition.notify_all();
}

//c++ 模板 template的小技巧 SFINAE (substitution-failure-is-not-an-error) 原则
//根据传入的模板参数是否是指针 而适配到不同的处理函数中 如果是指针类型则在删除时 delete指针
//否则适配到另一个函数 删除时什么都不做

// Overload selected when the element type is a pointer: deletes the pointee.
template<typename E = T>
typename std::enable_if<std::is_pointer<E>::value>::type
DeleteQueuedObject(E& ptr)
{
    delete ptr;
}

// Overload selected for non-pointer element types: nothing to release.
template<typename E = T>
typename std::enable_if<!std::is_pointer<E>::value>::type
DeleteQueuedObject(E const& /*packet*/)
{
}

  1 /*
  2  * Copyright (C) 2008-2017 TrinityCore <http://www.trinitycore.org/>
  3  *
  4  * This program is free software; you can redistribute it and/or modify it
  5  * under the terms of the GNU General Public License as published by the
  6  * Free Software Foundation; either version 2 of the License, or (at your
  7  * option) any later version.
  8  *
  9  * This program is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 12  * more details.
 13  *
 14  * You should have received a copy of the GNU General Public License along
 15  * with this program. If not, see <http://www.gnu.org/licenses/>.
 16  */
 17 
 18 #ifndef _PCQ_H
 19 #define _PCQ_H
 20 
 21 #include <condition_variable>
 22 #include <mutex>
 23 #include <queue>
 24 #include <atomic>
 25 #include <type_traits>
 26 
 27 template <typename T>
 28 class ProducerConsumerQueue
 29 {
 30 private:
 31     std::mutex _queueLock;
 32     std::queue<T> _queue;
 33     std::condition_variable _condition;
 34     std::atomic<bool> _shutdown;
 35 
 36 public:
 37 
 38     ProducerConsumerQueue<T>() : _shutdown(false) { }
 39 
 40     void Push(const T& value)
 41     {
 42         std::lock_guard<std::mutex> lock(_queueLock);
 43         _queue.push(std::move(value));
 44 
 45         _condition.notify_one();
 46     }
 47 
 48     bool Empty()
 49     {
 50         std::lock_guard<std::mutex> lock(_queueLock);
 51 
 52         return _queue.empty();
 53     }
 54 
 55     bool Pop(T& value)
 56     {
 57         std::lock_guard<std::mutex> lock(_queueLock);
 58 
 59         if (_queue.empty() || _shutdown)
 60             return false;
 61 
 62         value = _queue.front();
 63 
 64         _queue.pop();
 65 
 66         return true;
 67     }
 68 
 69     void WaitAndPop(T& value)
 70     {
 71         std::unique_lock<std::mutex> lock(_queueLock);
 72 
 73         // we could be using .wait(lock, predicate) overload here but it is broken
 74         // https://connect.microsoft.com/VisualStudio/feedback/details/1098841
 75         while (_queue.empty() && !_shutdown)
 76             _condition.wait(lock);
 77 
 78         if (_queue.empty() || _shutdown)
 79             return;
 80 
 81         value = _queue.front();
 82 
 83         _queue.pop();
 84     }
 85 
 86     void Cancel()
 87     {
 88         std::unique_lock<std::mutex> lock(_queueLock);
 89 
 90         while (!_queue.empty())
 91         {
 92             T& value = _queue.front();
 93 
 94             DeleteQueuedObject(value);
 95 
 96             _queue.pop();
 97         }
 98 
 99         _shutdown = true;
100 
101         _condition.notify_all();
102     }
103 
104 private:
105     template<typename E = T>
106     typename std::enable_if<std::is_pointer<E>::value>::type DeleteQueuedObject(E& obj) { delete obj; }
107 
108     template<typename E = T>
109     typename std::enable_if<!std::is_pointer<E>::value>::type DeleteQueuedObject(E const& /*packet*/) { }
110 };
111 
112 #endif
View Code

相关推荐