Files
wnmj/Servers/服务器组件/网站网关服务器/IOThreads.cpp

216 lines
5.3 KiB
C++
Raw Normal View History

2026-02-13 14:34:15 +08:00
#include "StdAfx.h"
#include "IOCPServer.h"
#include "IOThreads.h"
//<2F><><EFBFBD><EFBFBD><ECBAAF>
CIOWorkerThread::CIOWorkerThread(void)
{
m_hCompletionPort = NULL;
m_pIOCPServer = NULL;
}
//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
CIOWorkerThread::~CIOWorkerThread(void)
{
}
//<2F><><EFBFBD>ú<EFBFBD><C3BA><EFBFBD>
bool CIOWorkerThread::InitThread(HANDLE hCompletionPort, CIOCPServer* pIOCPServer)
{
ASSERT(hCompletionPort != NULL);
m_hCompletionPort = hCompletionPort;
m_pIOCPServer = pIOCPServer;
return true;
}
//<2F><><EFBFBD>к<EFBFBD><D0BA><EFBFBD>
bool CIOWorkerThread::OnEventThreadRun()
{
//Ч<><D0A7><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
ASSERT(m_hCompletionPort != NULL);
SOCKET socket = NULL;
DWORD BytesTransferred = 0;
stOverlappedBase* pSOverlapped = NULL;
BOOL bSuccess = GetQueuedCompletionStatus(m_hCompletionPort, &BytesTransferred, (PULONG_PTR)&socket, (LPOVERLAPPED*)&pSOverlapped, INFINITE);
if ( !bSuccess )
{
DWORD dwLastError = GetLastError();
// 64<36><34><EFBFBD><EFBFBD><EFBFBD>ű<EFBFBD>ʾ<><D6B8><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ٿ<EFBFBD><D9BF><EFBFBD>"<22><><EFBFBD>ͻ<EFBFBD><CDBB><EFBFBD><EFBFBD><EFBFBD>˳<EFBFBD><CBB3><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
if ( dwLastError != 64 ) {
TRACE1("GetQueuedCompletionStatus <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>´<EFBFBD><C2B4><EFBFBD><EFBFBD><EFBFBD> %d\n", GetLastError());
}
}
// HasOverlappedIoCompleted
if (pSOverlapped==NULL&&BytesTransferred==0) {
TRACE("<EFBFBD>˳<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>\n");
return false;
}
if (BytesTransferred == 0) {
TRACE("<EFBFBD><EFBFBD><EFBFBD>ܵ<EFBFBD>0<EFBFBD><EFBFBD><EFBFBD>ֽڵ<EFBFBD><EFBFBD><EFBFBD><EFBFBD>ݣ<EFBFBD><EFBFBD>ͻ<EFBFBD><EFBFBD>˿<EFBFBD><EFBFBD>ܶϿ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>\n");
WORD nClient = pSOverlapped->nClient;
delete pSOverlapped;
pSOverlapped=NULL;
m_pIOCPServer->Disconnect(nClient);
return true;
}
if (pSOverlapped == NULL) {
TRACE("pSOverlapped == NULL\n");
return true;
}
if (pSOverlapped->IoMode == IoSend) {
// <20><><EFBFBD><EFBFBD><EFBFBD>¼<EFBFBD>
stSendOverlapped* pSendOverlapped = (stSendOverlapped*)pSOverlapped;
WORD nClient = pSendOverlapped->nClient;
if (pSendOverlapped->dwSentBytes + BytesTransferred < pSendOverlapped->dwTotalBytes) {
TRACE("========<3D><><EFBFBD>ַ<EFBFBD><D6B7><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ݲ<EFBFBD><DDB2><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>========\n");
pSendOverlapped->dwSentBytes += BytesTransferred;
pSendOverlapped->WsaBuf.buf = pSendOverlapped->pBuffer + pSendOverlapped->dwSentBytes;
pSendOverlapped->WsaBuf.len = pSendOverlapped->dwTotalBytes - pSendOverlapped->dwSentBytes;
// <20><><EFBFBD><EFBFBD><EFBFBD>¼<EFBFBD>
DWORD SendBytes = 0;
if ( WSASend(socket, &pSendOverlapped->WsaBuf, 1, &SendBytes, 0,
(LPOVERLAPPED)pSendOverlapped, NULL) == SOCKET_ERROR )
{
delete[] pSendOverlapped->pBuffer;
pSendOverlapped->pBuffer=NULL;
delete pSendOverlapped;
pSendOverlapped=NULL;
int iError = WSAGetLastError();
if ( iError != ERROR_IO_PENDING )
{
TRACE1("IOCPServer::ServerWorkerThread - WSASend() <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>´<EFBFBD><C2B4><EFBFBD><EFBFBD><EFBFBD> %d\n", iError);
// <20><EFBFBD><ECB3A3><EFBFBD><EFBFBD><EFBFBD>¼<EFBFBD>
m_pIOCPServer->OnError(nClient, iError);
// <20>ر<EFBFBD>SOCKET
m_pIOCPServer->Disconnect(nClient);
return 0;
}
}
}
else
{
m_pIOCPServer->OnSend(nClient, pSendOverlapped->pBuffer, pSendOverlapped->dwTotalBytes);
delete[] pSendOverlapped->pBuffer;
pSendOverlapped->pBuffer=NULL;
delete pSendOverlapped;
pSendOverlapped=NULL;
}
}
else if (pSOverlapped->IoMode == IoRecv)
{
// <20><><EFBFBD><EFBFBD><EFBFBD>¼<EFBFBD>
stRecvOverlapped* pRecvOverlapped = (stRecvOverlapped*)pSOverlapped;
WORD nClient = pRecvOverlapped->nClient;
char* pBuf = pRecvOverlapped->buffer;
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>¼<EFBFBD>
m_pIOCPServer->OnReceive(nClient, pBuf, BytesTransferred);
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>յ<EFBFBD><D5B5><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ݰ<EFBFBD>
CTCPSocket* tcpSocket = m_pIOCPServer->GetTCPSocket(nClient);
ASSERT(tcpSocket!=NULL);
bool success = false;
try
{
success = tcpSocket->processRecvData(pBuf, BytesTransferred);
}
catch(...)
{
ASSERT(0);
TRACE("<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ݴ<EFBFBD><EFBFBD><EFBFBD><EFBFBD>쳣!");
}
delete pRecvOverlapped;
pRecvOverlapped=NULL;
// <20><><EFBFBD>ͽ<EFBFBD><CDBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
if (success) m_pIOCPServer->PostRecv(nClient, socket);
}
return true;
}
//////////////////////////////////////////////////////////////////////////
//<2F><><EFBFBD><EFBFBD><ECBAAF>
CIOAcceptThread::CIOAcceptThread(void)
{
m_hCompletionPort = NULL;
m_pIOCPServer = NULL;
m_hListenSocket = INVALID_SOCKET;
}
//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
CIOAcceptThread::~CIOAcceptThread(void)
{
}
//<2F><><EFBFBD>ú<EFBFBD><C3BA><EFBFBD>
bool CIOAcceptThread::InitThread(HANDLE hCompletionPort, SOCKET hListenSocket, CIOCPServer* pIOCPServer)
{
ASSERT(hCompletionPort != NULL);
ASSERT(pIOCPServer != NULL);
ASSERT(hListenSocket != INVALID_SOCKET);
m_hListenSocket = hListenSocket;
m_hCompletionPort = hCompletionPort;
m_pIOCPServer = pIOCPServer;
return true;
}
//<2F><><EFBFBD>к<EFBFBD><D0BA><EFBFBD>
bool CIOAcceptThread::OnEventThreadRun()
{
//Ч<><D0A7><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
ASSERT(m_hCompletionPort != NULL);
ASSERT(m_pIOCPServer != NULL);
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
SOCKADDR_IN saClient;
int iClientSize = sizeof(saClient);
SOCKET AcceptSocket = WSAAccept(m_hListenSocket, (SOCKADDR *)&saClient, &iClientSize, NULL, NULL);
if ( AcceptSocket == SOCKET_ERROR )
{
// <20><EFBFBD><ECB3A3><EFBFBD><EFBFBD><EFBFBD>¼<EFBFBD>
m_pIOCPServer->OnError(INVALID_ID, WSAGetLastError());
return false;
}
CTCPSocket* tcpSocket = m_pIOCPServer->CreateTCPSocket(AcceptSocket, saClient.sin_addr.S_un.S_addr);
WORD nClientId = m_pIOCPServer->InserTCPSocket(tcpSocket);
if (nClientId == INVALID_ID) {
TRACE("<EFBFBD><EFBFBD><EFBFBD><EFBFBD>TCPSocket<EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʧ<EFBFBD>ܣ<EFBFBD><EFBFBD><EFBFBD>ȷ<EFBFBD>϶<EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ƿ<EFBFBD>Ϊ<EFBFBD><EFBFBD>\n");
// <20>ر<EFBFBD>Socket
if (closesocket(AcceptSocket) == SOCKET_ERROR) {
ErrorExit(_T("closesocket"));
return false;
}
return true;
}
// <20><><EFBFBD><EFBFBD><EFBFBD>¼<EFBFBD>
m_pIOCPServer->OnAccept(nClientId);
// <20><><EFBFBD><EFBFBD>ǰ<EFBFBD><C7B0><EFBFBD>ӽ<EFBFBD><D3BD><EFBFBD><EFBFBD>Ŀͻ<C4BF><CDBB><EFBFBD>SOCKET<45><54><EFBFBD><EFBFBD>IO<49><4F><EFBFBD>ɶ˿<C9B6>
HANDLE handle = CreateIoCompletionPort((HANDLE)AcceptSocket, m_hCompletionPort, AcceptSocket, 0);
if ( handle == NULL )
{
ErrorExit(_T("CreateIoCompletionPort"));
return false;
}
// <20><><EFBFBD><EFBFBD>һ<EFBFBD><D2BB><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
m_pIOCPServer->PostRecv(nClientId, AcceptSocket);
return true;
}