Files
wnmj/Servers/服务器组件/内核引擎/QueueService.cpp

229 lines
4.7 KiB
C++
Raw Normal View History

2026-02-13 14:34:15 +08:00
#include "StdAfx.h"
#include "QueueService.h"
//////////////////////////////////////////////////////////////////////////
//<2F><><EFBFBD><EFBFBD><ECBAAF>
CQueueServiceThread::CQueueServiceThread(void)
{
m_hCompletionPort = NULL;
memset(m_cbBuffer, 0, sizeof(m_cbBuffer));
}
//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
CQueueServiceThread::~CQueueServiceThread(void)
{
}
//<2F><><EFBFBD>ú<EFBFBD><C3BA><EFBFBD>
bool CQueueServiceThread::InitThread(HANDLE hCompletionPort)
{
//Ч<><D0A7><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
ASSERT(IsRuning() == false);
ASSERT(m_hCompletionPort == NULL);
//<2F><><EFBFBD>ñ<EFBFBD><C3B1><EFBFBD>
m_hCompletionPort = hCompletionPort;
memset(m_cbBuffer, 0, sizeof(m_cbBuffer));
return true;
}
//ȡ<><C8A1><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
bool CQueueServiceThread::UnInitThread()
{
//Ч<><D0A7><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
ASSERT(IsRuning() == false);
//<2F><><EFBFBD>ñ<EFBFBD><C3B1><EFBFBD>
m_hCompletionPort = NULL;
memset(m_cbBuffer, 0, sizeof(m_cbBuffer));
return true;
}
//<2F><><EFBFBD>к<EFBFBD><D0BA><EFBFBD>
bool CQueueServiceThread::OnEventThreadRun()
{
//Ч<><D0A7><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
ASSERT(m_hCompletionPort != NULL);
//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
DWORD dwThancferred = 0;
OVERLAPPED * pOverLapped = NULL;
CQueueService * pQueueService = NULL;
//<2F>ȴ<EFBFBD><C8B4><EFBFBD><EFBFBD>ɶ˿<C9B6>
if (GetQueuedCompletionStatus(m_hCompletionPort, &dwThancferred, (PULONG_PTR)&pQueueService, &pOverLapped, INFINITE))
{
//<2F>ж<EFBFBD><D0B6>˳<EFBFBD>
if (pQueueService == NULL) return false;
//<2F><>ȡ<EFBFBD><C8A1><EFBFBD><EFBFBD>
tagDataHead DataHead;
bool bSuccess = pQueueService->GetData(DataHead, m_cbBuffer, sizeof(m_cbBuffer));
ASSERT(bSuccess == true);
//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
if (bSuccess == true)
pQueueService->OnQueueServiceThread(DataHead, m_cbBuffer, DataHead.wDataSize);
return true;
}
return false;
}
//////////////////////////////////////////////////////////////////////////
//<2F><><EFBFBD><EFBFBD><ECBAAF>
CQueueService::CQueueService(void)
{
m_bService = false;
m_hCompletionPort = NULL;
m_pIQueueServiceSink = NULL;
}
//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
CQueueService::~CQueueService(void)
{
//ֹͣ<CDA3><D6B9><EFBFBD><EFBFBD>
ConcludeService();
return;
}
//<2F>ӿڲ<D3BF>ѯ
void * CQueueService::QueryInterface(const IID & Guid, DWORD dwQueryVer)
{
QUERYINTERFACE(IQueueService, Guid, dwQueryVer);
QUERYINTERFACE_IUNKNOWNEX(IQueueService, Guid, dwQueryVer);
return NULL;
}
//<2F><><EFBFBD>ýӿ<C3BD>
bool CQueueService::SetQueueServiceSink(IUnknownEx * pIUnknownEx)
{
ASSERT(pIUnknownEx != NULL);
m_pIQueueServiceSink = QUERY_OBJECT_PTR_INTERFACE(pIUnknownEx, IQueueServiceSink);
ASSERT(m_pIQueueServiceSink != NULL);
return (m_pIQueueServiceSink != NULL);
}
//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ϣ
bool CQueueService::GetBurthenInfo(tagBurthenInfo & BurthenInfo)
{
CWHDataLocker lock(m_CriticalSection);//
m_DataQueue.GetBurthenInfo(BurthenInfo);
return true;
}
//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
bool CQueueService::AddToQueue(WORD wIdentifier, void * const pBuffer, WORD wDataSize)
{
CWHDataLocker lock(m_CriticalSection);//
m_DataQueue.InsertData(wIdentifier, pBuffer, wDataSize);
PostQueuedCompletionStatus(m_hCompletionPort, wDataSize, (ULONG_PTR)this, NULL);
return true;
}
//<2F><>ʼ<EFBFBD><CABC><EFBFBD><EFBFBD>
bool CQueueService::StartService()
{
//Ч<><D0A7><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
ASSERT(m_bService == false);
ASSERT(m_hCompletionPort == NULL);
ASSERT(m_pIQueueServiceSink != NULL);
//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ɶ˿<C9B6>
m_hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1);
if (m_hCompletionPort == NULL) throw TEXT("<EFBFBD><EFBFBD><EFBFBD>ж<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ɶ˿ڴ<EFBFBD><EFBFBD><EFBFBD>ʧ<EFBFBD><EFBFBD>");
//<2F><><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD>
if (m_QueueServiceThread.InitThread(m_hCompletionPort) == false) throw TEXT("<EFBFBD><EFBFBD><EFBFBD>ж<EFBFBD><EFBFBD><EFBFBD><EFBFBD>̳߳<EFBFBD>ʼ<EFBFBD><EFBFBD>ʧ<EFBFBD><EFBFBD>");
if (m_QueueServiceThread.StartThread() == false) throw TEXT("<EFBFBD><EFBFBD><EFBFBD>ж<EFBFBD><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʧ<EFBFBD><EFBFBD>");
// SetThreadPriority(m_QueueServiceThread.GetThreadHandle(), REALTIME_PRIORITY_CLASS);
//<2F><><EFBFBD>ò<EFBFBD><C3B2><EFBFBD>
m_bService = true;
return true;
}
//ֹͣ<CDA3><D6B9><EFBFBD><EFBFBD>
bool CQueueService::ConcludeService()
{
if ( m_bService )
{
//<2F>ȴ<EFBFBD><C8B4><EFBFBD><EFBFBD>ݰ<EFBFBD><DDB0><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
tagBurthenInfo bi;
CWHDataLocker lock(m_CriticalSection, false);
while(true)
{
lock.Lock();
m_DataQueue.GetBurthenInfo(bi);
lock.UnLock();
if (0==bi.dwDataPacketCount)
{
break;
}
else
{
Sleep(30);
}
}
//<2F><><EFBFBD>ñ<EFBFBD><C3B1><EFBFBD>
m_bService = false;
}
//ֹͣ<CDA3>߳<EFBFBD>
if (m_hCompletionPort != NULL)
{
PostQueuedCompletionStatus(m_hCompletionPort, 0, NULL, NULL);
}
m_QueueServiceThread.ConcludeThread(INFINITE);
m_QueueServiceThread.UnInitThread();
//<2F>ر<EFBFBD><D8B1><EFBFBD><EFBFBD>ɶ˿<C9B6>
if (m_hCompletionPort != NULL)
{
CloseHandle(m_hCompletionPort);
m_hCompletionPort = NULL;
}
//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
m_DataQueue.RemoveData(false);
m_pIQueueServiceSink = NULL;
return true;
}
//<2F>߳̾<DFB3><CCBE><EFBFBD>
HANDLE CQueueService::GetThreadHandle()
{
return m_QueueServiceThread.GetThreadHandle();
}
//<2F><>ȡ<EFBFBD><C8A1><EFBFBD><EFBFBD>
bool CQueueService::GetData(tagDataHead & DataHead, void * pBuffer, WORD wBufferSize)
{
CWHDataLocker lock(m_CriticalSection);//
return m_DataQueue.DistillData(DataHead, pBuffer, wBufferSize);
}
//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ϣ
void CQueueService::OnQueueServiceThread(const tagDataHead & DataHead, void * pBuffer, WORD wDataSize)
{
VERIFY_RETURN_VOID(m_pIQueueServiceSink != NULL);
try
{
m_pIQueueServiceSink->OnQueueServiceSink(DataHead.wIdentifier, pBuffer, DataHead.wDataSize);
}
catch (...) {}
return;
}