线程池兑现接收消息,任务分发器按照CommandId分发给不同处理器处理。(之前不够完善,现在是修改、测试通过了)

线程池实现接收消息,任务分发器按照CommandId分发给不同处理器处理。(之前不够完善,现在是修改、测试通过了)

按照文件排序:

1. ThreadPoolBase.h

/********************************************************************
File name:     ThreadPoolBase.h
Author:   lieyingshengbao  
Version:        1.0 
Date:           2012-11-16
Description:    本文件提供线程池的基类,工程中由接收线程去继承
Others:     本类型使用C++定义 
Function List:  请参见正文
History:        修改历史记录列表,每条修改记录应包括修改日期、修改者及修改内容简述
1. Date:
Author:
Modification:
2. ...
*********************************************************************/
#ifndef _THREAD_POOL_BASE_H_
#define _THREAD_POOL_BASE_H_
#include "Public.h"
#include <afxmt.h>                           //CCriticalSection
#include <process.h>                       //_beginthreadex
#include <queue>
#include <vector>
using std::vector;
using std::queue;

template<typename T>
class CThreadPoolBase
{
public:
 CThreadPoolBase(int nNum):m_nWantedThreadNum(nNum),m_nCurrentThreadNum(0)
 {
  m_bTerminate = false;
  //m_bLock = true;
 }
 virtual ~CThreadPoolBase(){};

 //获取CCriticalSection
 CCriticalSection &GetCriticalSection()
 {
  return m_CriticalSection;
 }

 //向任务队列中添加任务
 void AddTask(T* pTask)
 {
  GetCriticalSection().Lock();
  m_TaskQueue.push(pTask);             //添加任务
  m_Event.SetEvent();                          //发信号
  GetCriticalSection().Unlock();
 }

 //创建线程池
 int Run()
 {
  HANDLE hHandle = NULL;
  while (m_nCurrentThreadNum < m_nWantedThreadNum)
  {
   hHandle = (HANDLE)_beginthreadex(NULL,
                    0,
                    CThreadPoolBase::ThreadProc,
                    (LPVOID)this,
                    0,
                    NULL);
   if (hHandle == NULL)
   {
    return Failed;
   }
   else
   {
    m_nCurrentThreadNum++;
    m_vecThreadHandle.push_back(hHandle);      //m_vecThreadHandle会有自己的存储空间
    //AfxMessageBox(_T("one create"));
   }
  }

  return Success;
 }

 //结束线程池
 void TerminateThreadPool()
 {
  m_bTerminate =true;             //结束标志置为true,然后等待线程池退出
  for (m_ItThreadHandle=m_vecThreadHandle.begin(); m_ItThreadHandle!=m_vecThreadHandle.end(); ++m_ItThreadHandle)
  {
   HANDLE hTempHandle = *m_ItThreadHandle;
   if (NULL != hTempHandle)
   {
    WaitForSingleObject(hTempHandle, INFINITE);
    CloseHandle(hTempHandle);
    hTempHandle = NULL;
    //AfxMessageBox(_T("one close"));
   }
  }

  m_nCurrentThreadNum = 0;
  m_vecThreadHandle.clear();
 }

protected:
 virtual void ProcessFun(T* pTask) = 0;                   //供派生类重写,具体处理任务过程
private:
 static unsigned __stdcall ThreadProc(LPVOID param)
 {
  CThreadPoolBase *pThis = (CThreadPoolBase*)param;
  while (!pThis->m_bTerminate)
  {
   T *pTask = NULL;
   pThis->GetCriticalSection().Lock();
   if (pThis->m_TaskQueue.empty())
   {
    pThis->GetCriticalSection().Unlock();
    if (WAIT_TIMEOUT == WaitForSingleObject(pThis->m_Event.m_hObject, 1))           //WAIT_TIMEOUT等待时间到了,但是没有收到信号
    {
     continue;
    }
   }
   else
   {
    pTask = pThis->m_TaskQueue.front();
    pThis->m_TaskQueue.pop();
    pThis->GetCriticalSection().Unlock();
    pThis->ProcessFun(pTask);                          //实际调用派生类的处理方法
    //Sleep(5000000);
   }
  }

  return Success;
 }
protected:
 bool m_bTerminate;                   //线程结束标志,方便终结线程池
private:
 queue<T*> m_TaskQueue;         //任务队列
 int m_nWantedThreadNum;       //期望的线程数
 int m_nCurrentThreadNum;
 CCriticalSection m_CriticalSection;
 CEvent m_Event;
 //bool m_bLock;                             //是否已加锁
 vector<HANDLE> m_vecThreadHandle;
 vector<HANDLE>::iterator m_ItThreadHandle;
};
#endif

 

2. Public.h

/********************************************************************
File name:     Public.h
Author:   lieyingshengbao  
Version:        1.0 
Date:           2012-11-16
Description:    本文件提供公共定义
Others:     本类型使用C++定义 
Function List:  请参见正文
History:        修改历史记录列表,每条修改记录应包括修改日期、修改者及修改内容简述
1. Date:
Author:
Modification:
2. ...
*********************************************************************/
#ifndef _PUBLIC_H_
#define _PUBLIC_H_

//可作为返回值:成功与失败
enum Symbol
{
 Success,
 Failed
};

const int MSGBLOCKLEN = 1024;          //CMsg传送的每块消息大小
const int MSGBODYLEN = 512;              //消息体长度

//上线消息的CommandId
const int ONLINEREQ = 0x00000001;
const int ONLINERSP = 0x80000001;

typedef struct tagCommonHead
{
 u_int uMsgLength;
 u_int uMsgCommandId;
}CommonHead_S;

typedef struct tagONLINEMsg
{
 CommonHead_S Common;
 BYTE cArry[MSGBLOCKLEN];
}ONLINEMsg_S;

//注意:typedef struct tag若没有分号,则会出现好多未知错误。


#endif

3. MsgQueue.h

/********************************************************************
File name:     MsgQueue.h
Author:   lieyingshengbao  
Version:        1.0 
Date:           2012-11-16
Description:    本文件提供定义整个工程唯一的消息队列,供接收和处理时公用
Others:     本类型使用C++定义 
Function List:  请参见正文
History:        修改历史记录列表,每条修改记录应包括修改日期、修改者及修改内容简述
1. Date:
Author:
Modification:
2. ...
*********************************************************************/
#ifndef MSG_QUEUE_H_
#define MSG_QUEUE_H_
#include "Public.h"
#include <afxmt.h>
#include <deque>
using std::deque;

class CMsg
{
public:
 SOCKET m_Sock;
 BYTE MsgBuffer[MSGBLOCKLEN];        //传输消息块大小
};

typedef deque<CMsg*> MSGQUEUE;       //定义别名

class CMsgManager
{
public:
 CMsgManager()
 {
  m_bLock = true;
 }
 //获取冲突域
 CCriticalSection *GetCriticalSection()
 {
  return &m_CriticalSection;
 }

 //加入消息队列
 void InsertMsg(CMsg *pMsg)
 {
  GetCriticalSection()->Lock();
  m_MsgQueue.push_back(pMsg);
  m_Event.SetEvent();
  GetCriticalSection()->Unlock();
 }

 //获取消息
 CMsg *GetMsg()
 {
  CMsg *pMsg = NULL;
  GetCriticalSection()->Lock();
  if (m_MsgQueue.empty())
  {
   m_bLock = false;
   GetCriticalSection()->Unlock();
   WaitForSingleObject(m_Event.m_hObject, 1);    //INFINITE
  }

  if (!m_bLock)
  {
   GetCriticalSection()->Lock();
  }
  
  if (!m_MsgQueue.empty())           //消息队列不空
  {
   pMsg = m_MsgQueue.front();
   m_MsgQueue.pop_front();
  }
  GetCriticalSection()->Unlock();
  return pMsg;
 }

private:
 MSGQUEUE m_MsgQueue;            //整个工程中唯一的消息队列
 CCriticalSection m_CriticalSection;
 CEvent m_Event;
 bool m_bLock;                              //是否锁定
};

#endif

4. RecvMsg.h

/********************************************************************
File name:     RecvMsg.h
Author:   lieyingshengbao  
Version:        1.0 
Date:           2012-11-16
Description:    本文件提供接收消息类,从ThreadPoolBase.h基类继承
Others:     本类型使用C++定义 
Function List:  请参见正文
History:        修改历史记录列表,每条修改记录应包括修改日期、修改者及修改内容简述
1. Date:
Author:
Modification:
2. ...
*********************************************************************/
#ifndef RECV_MSG_H_
#define RECV_MSG_H_
#include "MsgQueue.h"
#include "Public.h"
#include "ThreadPoolBase.h"
#include <WinSock2.h>

class CRecvMsg : public CThreadPoolBase<SOCKET>
{
public:
 CRecvMsg(int nThreadNum, CMsgManager *pMsgQueue):CThreadPoolBase(nThreadNum),m_pMsgQueue(pMsgQueue)
 {
  //nThreadNum为创建线程数目
 }

 //重写基类的虚方法,此处负责接收消息
 void ProcessFun(SOCKET* pSock) ;

 ////获取消息的CommandId
 //u_int GetCommandId(BYTE MsgBuffer[]);

 //获取消息长度
 u_int GetMsgLen(BYTE MsgBuffer[]);

 //将接收的消息加入工程中唯一的消息队列中
 void AddMsg(CMsg *pMsg);

private:
 CMsgManager *m_pMsgQueue;
};

#endif

 

5. RecvMsg.cpp

#include "stdafx.h"
#include "RecvMsg.h"

/*************************************************************************
* Function name      :  GetMsgLen
* description  :  获取接收到消息的头部中消息总长度
* input        :  无
* output     :  无
* return      :  返回消息长度
*************************************************************************/
u_int CRecvMsg::GetMsgLen(BYTE MsgBuffer[])
{
 CommonHead_S *pHead = (CommonHead_S*)MsgBuffer;
 u_int uMsgLen = ntohl(pHead->uMsgLength);                        //注意字节序的转换
 return uMsgLen;
}


/*************************************************************************
* Function name      :  ProcessFun
* description  :  重写基类中的方法,具体负责接收消息的过程
* input        :  无
* output     :  无
* return      :  Success, Failed
*************************************************************************/
void CRecvMsg::ProcessFun(SOCKET* pSock)
{
 fd_set set;
 timeval tmOut = {0, 200};
 BYTE *pBuffer = new BYTE[MSGBLOCKLEN];
 while (!m_bTerminate)
 {
  FD_ZERO(&set);
  FD_SET(*pSock, &set);
  int nRet = select(0, &set, NULL, NULL, &tmOut);
  if ( SOCKET_ERROR == nRet )
  {
   break;
  }
  if ( 0 == nRet )           //超时继续循环、方便检测外部终止线程的条件
  {
   continue;
  }

  int nRecvHeadLen = 0;
  int nRecvHeadTotalLen = 0;
  memset(pBuffer, 0, MSGBLOCKLEN);
  while (1)          //接收头消息
  {
   nRecvHeadLen = recv(*pSock, (char*)pBuffer + nRecvHeadTotalLen, sizeof(CommonHead_S) - nRecvHeadTotalLen, 0);
   if (nRecvHeadLen == SOCKET_ERROR || nRecvHeadLen == 0)
   {
    AfxMessageBox(_T("Client disconnect or error happen when recv Msg Head"));
    return;
   }
   nRecvHeadTotalLen += nRecvHeadLen;
   if (nRecvHeadTotalLen < sizeof(CommonHead_S))
   {
    Sleep(200);
    continue;
   }
   if (nRecvHeadTotalLen >= sizeof(CommonHead_S))
   {
    break;
   }
  }

  u_int nMsgLen = GetMsgLen(pBuffer);
  if (nMsgLen > nRecvHeadTotalLen)            //继续接收Body体
  {
   int nRecvBodyLen = 0;
   int nRecvBodyTotalLen = 0;
   int nRemainLen = nMsgLen - nRecvHeadTotalLen;
   while (1)
   {
    nRecvBodyLen = recv(*pSock, (char*)pBuffer + nRecvHeadTotalLen + nRecvBodyTotalLen, nRemainLen - nRecvBodyTotalLen, 0);
    if (nRecvBodyLen == SOCKET_ERROR || nRecvBodyLen == 0)
    {
     AfxMessageBox(_T("Client disconnect or error happen when recv Msg Body"));
     return;
    }

    nRecvBodyTotalLen += nRecvBodyLen;
    if (nRecvBodyTotalLen < nRemainLen)
    {
     Sleep(200);
     continue;
    }
    if (nRecvBodyTotalLen >= nRemainLen)
    {
     break;
    }
   }

   //接收完消息
   CMsg *pMsg = new CMsg;
   pMsg->m_Sock = *pSock;
   memset(pMsg->MsgBuffer, 0, MSGBLOCKLEN);
   memcpy(pMsg->MsgBuffer, pBuffer, MSGBLOCKLEN);
   AddMsg(pMsg);                                                               //向工程中唯一的消息队列中添加消息
  }
  else
  {
   continue;
  }
 }

 if (pBuffer)
 {
  delete [] pBuffer;
  pBuffer = NULL;
 }

 AfxMessageBox(_T("Recv Cycle exit!"));           //跳出接收循环

 //关闭套接字,删掉资源
 closesocket(*pSock);
 if (pSock)
 {
  delete pSock;
  pSock = NULL;
 }

 return;
}


/*************************************************************************
* Function name      :  AddMsg
* description  :  向消息队列中添加消息
* input        :  无
* output     :  无
* return      :  无
*************************************************************************/
void CRecvMsg::AddMsg(CMsg *pMsg)
{
 m_pMsgQueue->InsertMsg(pMsg);

//  if (pMsg)                //此处绝对不能删除,在处理完消息时再删除
//  {
//   delete pMsg;
//   pMsg = NULL;
//  }

 return;
}

 

6. Accepter.h

/********************************************************************
File name:     Accepter.h
Author:   lieyingshengbao  
Version:        1.0 
Date:           2012-11-16
Description:    本文件提供启动监听线程,创建接收线程池,Accept后驱动接收线程池
Others:     本类型使用C++定义 
Function List:  请参见正文
History:        修改历史记录列表,每条修改记录应包括修改日期、修改者及修改内容简述
1. Date:
Author:
Modification:
2. ...
*********************************************************************/
#ifndef ACCEPTER_H_
#define ACCEPTER_H_

#include "RecvMsg.h"

class CAccepter
{
public:
 CAccepter(int nThreadNum, CMsgManager *pMsgQueue)
  :m_RecvMsg(nThreadNum, pMsgQueue),
  m_ListenPort(12345),
  m_bTerminate(false)
 {
  //nThreadNum为创建的线程数,pMsgQueue为工程中唯一的消息队列
 }

 //创建监听线程并在线程中创建接收线程池
 int Run();

 int InitSock();

private:
 static unsigned __stdcall ProcFun(LPVOID param);
public:
 bool m_bTerminate;             //线程终止
private:
 CRecvMsg m_RecvMsg;
 SOCKET m_ListenSocket;
 SOCKET m_ConnSock;
 short m_ListenPort;
};

#endif

 

7. Accepter.cpp

#include "stdafx.h"
#include "Accepter.h"

/*************************************************************************
* Function name      :  Run
* description  :  用于启动监听线程,创建接收线程池,驱动接收线程池
* input        :  无
* output     :  无
* return      :  Success, Failed
*************************************************************************/
int CAccepter::Run()
{
 m_RecvMsg.Run();                                //创建接收线程池
 HANDLE hHandle = (HANDLE)_beginthreadex(NULL,
                      NULL,
                      CAccepter::ProcFun,
                      (LPVOID)this,
                      NULL,
                      NULL);
 if (NULL == hHandle)
 {
  AfxMessageBox(_T("Failed create Listen thread!"));
  return Failed;
 }
 return Success;
}

/*************************************************************************
* Function name      :  InitSock
* description  :  用于初始化Socket
* input        :  无
* output     :  无
* return      :  Success、Failed
*************************************************************************/
int CAccepter::InitSock()
{
 WSADATA wsaData;

 int nResult = WSAStartup( MAKEWORD(2,2), &wsaData );
 if ( nResult != NO_ERROR )
  return Failed;
 return Success;
}

/*************************************************************************
* Function name      :  ProcFun
* description  :  用于初始化Socket
* input        :  无
* output     :  无
* return      :  Success、Failed
*************************************************************************/
unsigned __stdcall CAccepter::ProcFun(LPVOID param)
{
 fd_set fdListen;
 struct timeval timeout = {0, 200};
 CAccepter *pThis = (CAccepter*)param;
 if (Success == pThis->InitSock())            //初始化套接字成功
 {
  pThis->m_ListenSocket = socket( AF_INET, SOCK_STREAM, 0);
  if (SOCKET_ERROR == pThis->m_ListenSocket)
  {
   AfxMessageBox(_T("Failed Create Socket"));
   return Failed;
  }

  //设置socket选项,重用地址
  int nReuseFlag = 1;
  int ret = setsockopt(pThis->m_ListenSocket, SOL_SOCKET, SO_REUSEADDR, (char*)&nReuseFlag, sizeof(int));

  //设置网络地址属性
  SOCKADDR_IN addr;
  memset(&addr, 0, sizeof(SOCKADDR_IN));
  addr.sin_family = AF_INET;
  addr.sin_port = htons(pThis->m_ListenPort);
  addr.sin_addr.s_addr = htonl(INADDR_ANY);

  //绑定地址
  ret = bind(pThis->m_ListenSocket, (struct sockaddr*)(&addr), sizeof(addr));
  if (SOCKET_ERROR == ret)
  {
   AfxMessageBox(_T("Failed Bind socket"));
   return Failed;
  }
  //监听端口
  ret = listen(pThis->m_ListenSocket, SOMAXCONN);
  if (SOCKET_ERROR == ret)
  {
   AfxMessageBox(_T("Failed Listen"));
   return Failed;
  }

  //客户端地址信息
  SOCKADDR_IN addrClient;
  int nLen = sizeof(addrClient);

  while (!pThis->m_bTerminate)               //检测外部终止条件
  {
   if (SOCKET_ERROR == pThis->m_ListenSocket || INVALID_SOCKET == pThis->m_ListenSocket)
   {
    AfxMessageBox(_T("Listen sock error!"));
    break;
   }

   FD_ZERO(&fdListen);
   FD_SET(pThis->m_ListenSocket, &fdListen);
   int nRet = select(0, &fdListen, NULL, NULL, &timeout);
   if ( SOCKET_ERROR == nRet )
   {
    AfxMessageBox(_T("select error"));
    break;
   }
   if ( 0 == nRet )             //超时,重新循环
   {
    continue;
   }

   pThis->m_ConnSock = accept(pThis->m_ListenSocket, (struct sockaddr*)(&addrClient), &nLen);
   if ( pThis->m_ConnSock == INVALID_SOCKET || SOCKET_ERROR == pThis->m_ConnSock)
   {
    AfxMessageBox(_T("Connect Socket Error"));
    continue;
   }

   SOCKET *pSock = new SOCKET;
   *pSock = pThis->m_ConnSock;
   pThis->m_RecvMsg.AddTask(pSock);           //驱动接收线程池运行。pSock在接收线程池删掉
  }
 }

 pThis->m_RecvMsg.TerminateThreadPool();      //终止接收线程池
 Sleep(200);
 AfxMessageBox(_T("Listen Thread exit!"));

 return Success;
}

 

8. MsgDealBase.h

/********************************************************************
File name:     MsgDealBase.h
Author:   lieyingshengbao  
Version:        1.0 
Date:           2012-11-16
Description:    本文件提供处理消息的基类
Others:     本类型使用C++定义 
Function List:  请参见正文
History:        修改历史记录列表,每条修改记录应包括修改日期、修改者及修改内容简述
1. Date:
Author:
Modification:
2. ...
*********************************************************************/
#ifndef MSG_DEAL_BASE_H_
#define MSG_DEAL_BASE_H_
#include "Public.h"
#include "MsgQueue.h"
#include <process.h>
#include <vector>
using std::vector;

class CMsgDealBase
{
public:
 virtual void GetCommandId(vector<u_int> &vec) = 0;
 virtual void ProcessFun(CMsg *pMsg) = 0;                //供派生类重写,具体处理过程

 void OnMessage(CMsg *pMsg);                               //起线程处理
private:
 static unsigned __stdcall ThreadDeal(LPVOID pParam);
};


class CProgram
{
public:
 CMsg *pMsg;
 CMsgDealBase *pThis;            //基类指针
};
#endif

 

9. MsgDealBase.cpp

#include "stdafx.h"
#include "MsgDealBase.h"

/*************************************************************************
* Function name      :  OnMessage
* description  :  用于启动线程,具体处理消息
* input        :  无
* output     :  无
* return      :  Success, Failed
*************************************************************************/
void CMsgDealBase::OnMessage(CMsg *pMsg)
{
 CProgram *pProgram = new CProgram;
 pProgram->pMsg = pMsg;
 pProgram->pThis = this;                           //当前对象为派生类
 HANDLE hHandle = (HANDLE)_beginthreadex(NULL,
                      NULL,
                      CMsgDealBase::ThreadDeal,
                      (LPVOID)pProgram,
                      NULL,
                      NULL);
 return;
}


/*************************************************************************
* Function name      :  ThreadDeal
* description  :  线程内,多态调用派生类的处理方法
* input        :  无
* output     :  无
* return      :  Success, Failed
*************************************************************************/
unsigned __stdcall CMsgDealBase::ThreadDeal(LPVOID pParam)
{
 CProgram *pTemp = (CProgram *)pParam;
 pTemp->pThis->ProcessFun(pTemp->pMsg);          //调用派生类的方法处理

 ////在派生类ProcessFun中删除
 //if (pTemp->pMsg)
 //{
 // delete pTemp->pMsg;
 // pTemp->pMsg = NULL;
 //}

 if (pTemp)
 {
  delete pTemp;
  pTemp = NULL;
 }

 return Success;
}

 

10. DealOnlineMsg.h         ---消息处理的派生类,继承自8.9

/********************************************************************
File name:     DealOnlineMsg.h
Author:      lieyingshengbao  
Version:        1.0 
Date:              2012-11-16
Description:    本文件提供处理上线消息的功能,继承CMsgDealBase类
Others:     本类型使用C++定义 
Function List:  请参见正文
History:        修改历史记录列表,每条修改记录应包括修改日期、修改者及修改内容简述
1. Date:
Author:
Modification:
2. ...
*********************************************************************/
#include "Public.h"
#include "MsgQueue.h"
#include "MsgDealBase.h"

class CDealOnlineMsg : public CMsgDealBase
{
public:
 void GetCommandId(vector<u_int> &vec)                              //重写获取CommandId方法
 {
  vec.push_back(ONLINEREQ);
 }

 void ProcessFun(CMsg *pMsg);                                                  //重写基类方法,具体处理消息。同时删掉pMsg。否则内存泄露
};

 

11. DealOnlineMsg.cpp

#include "stdafx.h"
#include "DealOnlineMsg.h"


/*************************************************************************
* Function name      :  ProcessFun
* description  :  派生类中具体处理消息
* input        :  无
* output     :  无
* return      :  Success, Failed
*************************************************************************/
void CDealOnlineMsg::ProcessFun(CMsg *pMsg)
{
 //具体处理消息
 AfxMessageBox(_T("Deal Msg"));               //用于测试是否走到此处

 if (pMsg)    //处理完成后删除结构
 {
  delete pMsg;
  pMsg = NULL;
 }
 return;
}

 

12. Dispatcher.h                 ------消息分发器

/********************************************************************
File name:     Dispatcher.h
Author:      lieyingshengbao  
Version:        1.0 
Date:              2012-11-16
Description:    本文件提供消息分发器的功能,即根据不同的消息Id,调用相应的对象处理
Others:     本类型使用C++定义 
Function List:  请参见正文
History:        修改历史记录列表,每条修改记录应包括修改日期、修改者及修改内容简述
1. Date:
Author:
Modification:
2. ...
*********************************************************************/
#include "Public.h"
#include "MsgQueue.h"
#include "MsgDealBase.h"
#include <process.h>
#include <map>
using std::map;

class CDispatcher
{
public:
 CDispatcher(CMsgManager *pMsgQueue):m_pMsgQueue(pMsgQueue),m_bTerminate(false)
 {

 }
 void Register(CMsgDealBase *pParam);      //注册功能。将消息Id和对应的处理对象建立关系

 u_int GetCommandId(CMsg *pMsg);          //获取消息的CommandId

 int MsgLoop();                                               //启动线程,循环获取消息
private:
 static unsigned __stdcall ThreadGetMsg(LPVOID Param);
public:
 bool m_bTerminate;
private:
 map<u_int, CMsgDealBase*> m_mapPatch;
 CMsgManager *m_pMsgQueue;
};

 

13. Dispatcher.cpp

#include "stdafx.h"
#include "Dispatcher.h"

/*************************************************************************
* Function name      :  MsgLoop
* description  :  创建线程
* input        :  无
* output     :  无
* return      :  Success, Failed
*************************************************************************/
int CDispatcher::MsgLoop()
{
 HANDLE hHandle = (HANDLE)_beginthreadex(NULL,
                      NULL,
                      CDispatcher::ThreadGetMsg,
                      (LPVOID)this,
                      NULL,
                      NULL);
 if (NULL == hHandle)
 {
  return Failed;
 }

 return Success;
}


/*************************************************************************
* Function name      :  GetCommandId
* description  :  获取消息的CommandId,方便提取对应的对象
* input        :  无
* output     :  无
* return      :  Success, Failed
*************************************************************************/
u_int CDispatcher::GetCommandId(CMsg* pMsg)
{
 CommonHead_S* pCommon = (CommonHead_S*)pMsg->MsgBuffer;
 u_int uCommandId = ntohl(pCommon->uMsgCommandId);                         //字节序转换
 return uCommandId;
}


/*************************************************************************
* Function name      :  MsgLoop
* description  :  不断从消息队列中获取消息
* input        :  无
* output     :  无
* return      :  Success, Failed
*************************************************************************/
unsigned __stdcall CDispatcher::ThreadGetMsg(LPVOID Param)
{
 CDispatcher *pThis = (CDispatcher*)Param;
 while (!pThis->m_bTerminate)
 {
  CMsg *pMsg = pThis->m_pMsgQueue->GetMsg();         //循环取消息,没有消息,则等待一段时间继续循环取消息
  if (NULL != pMsg)
  {
   u_int uTempCommandId = pThis->GetCommandId(pMsg);
   map<u_int, CMsgDealBase*>::iterator itCmds;
   for (itCmds=pThis->m_mapPatch.begin(); itCmds!=pThis->m_mapPatch.end(); ++itCmds)
   {
    if (uTempCommandId == itCmds->first)
    {
     itCmds->second->OnMessage(pMsg);                     //根据Id,调用派生类对象处理消息
     break;
    }
   }
  }
 }
 AfxMessageBox(_T("GetMsg Thread exit!"));

 return Success;
}

/*************************************************************************
* Function name      :  Register
* description  :  注册功能
* input        :  无
* output     :  无
* return      :  Success, Failed
*************************************************************************/
void CDispatcher::Register(CMsgDealBase *pParam)
{
 vector<u_int> vecTemp;
 pParam->GetCommandId(vecTemp);          //调用派生类的获取CommandId的方法

 for (vector<u_int>::size_type i=0; i!=vecTemp.size(); ++i)
 {
  u_int nTemp = vecTemp[i];
  m_mapPatch.insert(map<u_int, CMsgDealBase*>::value_type(nTemp, pParam));
 }
 return;
}

 

14. 调用时候:

.h中声明private对象

CAccepter m_Accepter;
CDispatcher m_Dispatcher;
CMsgManager m_UniqMsgQueue;           //唯一的消息队列
CDealOnlineMsg m_DealOnlineMsg;       //具体处理上线消息的对象

.cpp中:(此处在一个窗口中调用)

初始化:

CChatServerDlg::CChatServerDlg(CWnd* pParent /*=NULL*/)
 : CDialog(CChatServerDlg::IDD, pParent),
 m_Accepter(5, &m_UniqMsgQueue),                           //初始化列表方式,取工程中唯一的消息队列的地址传递
 m_Dispatcher(&m_UniqMsgQueue)
{
 m_hIcon = AfxGetApp()->LoadIcon(IDR_MAINFRAME);
}

 

调用时候:

m_Dispatcher.Register(&m_DealOnlineMsg);
m_Accepter.Run();
m_Dispatcher.MsgLoop();

终止条件:

m_Accepter.m_bTerminate = true;              //终止接收部分所有线程
m_Dispatcher.m_bTerminate = true;           //终止处理消息部分所有线程