Files
2026-02-13 14:34:15 +08:00

216 lines
5.3 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#include "StdAfx.h"
#include "IOCPServer.h"
#include "IOThreads.h"
//构造函数
CIOWorkerThread::CIOWorkerThread(void)
{
m_hCompletionPort = NULL;
m_pIOCPServer = NULL;
}
//析构函数
CIOWorkerThread::~CIOWorkerThread(void)
{
}
//配置函数
bool CIOWorkerThread::InitThread(HANDLE hCompletionPort, CIOCPServer* pIOCPServer)
{
ASSERT(hCompletionPort != NULL);
m_hCompletionPort = hCompletionPort;
m_pIOCPServer = pIOCPServer;
return true;
}
//运行函数
bool CIOWorkerThread::OnEventThreadRun()
{
//效验参数
ASSERT(m_hCompletionPort != NULL);
SOCKET socket = NULL;
DWORD BytesTransferred = 0;
stOverlappedBase* pSOverlapped = NULL;
BOOL bSuccess = GetQueuedCompletionStatus(m_hCompletionPort, &BytesTransferred, (PULONG_PTR)&socket, (LPOVERLAPPED*)&pSOverlapped, INFINITE);
if ( !bSuccess )
{
DWORD dwLastError = GetLastError();
// 64错误号表示"指定的网络名不再可用",客户端异常退出会产生这个错误号
if ( dwLastError != 64 ) {
TRACE1("GetQueuedCompletionStatus 发生了如下错误: %d\n", GetLastError());
}
}
// HasOverlappedIoCompleted
if (pSOverlapped==NULL&&BytesTransferred==0) {
TRACE("退出程序\n");
return false;
}
if (BytesTransferred == 0) {
TRACE("接受到0个字节的数据客户端可能断开了连接\n");
WORD nClient = pSOverlapped->nClient;
delete pSOverlapped;
pSOverlapped=NULL;
m_pIOCPServer->Disconnect(nClient);
return true;
}
if (pSOverlapped == NULL) {
TRACE("pSOverlapped == NULL\n");
return true;
}
if (pSOverlapped->IoMode == IoSend) {
// 发送事件
stSendOverlapped* pSendOverlapped = (stSendOverlapped*)pSOverlapped;
WORD nClient = pSendOverlapped->nClient;
if (pSendOverlapped->dwSentBytes + BytesTransferred < pSendOverlapped->dwTotalBytes) {
TRACE("========出现发送数据不完整的情况!!!========\n");
pSendOverlapped->dwSentBytes += BytesTransferred;
pSendOverlapped->WsaBuf.buf = pSendOverlapped->pBuffer + pSendOverlapped->dwSentBytes;
pSendOverlapped->WsaBuf.len = pSendOverlapped->dwTotalBytes - pSendOverlapped->dwSentBytes;
// 发送事件
DWORD SendBytes = 0;
if ( WSASend(socket, &pSendOverlapped->WsaBuf, 1, &SendBytes, 0,
(LPOVERLAPPED)pSendOverlapped, NULL) == SOCKET_ERROR )
{
delete[] pSendOverlapped->pBuffer;
pSendOverlapped->pBuffer=NULL;
delete pSendOverlapped;
pSendOverlapped=NULL;
int iError = WSAGetLastError();
if ( iError != ERROR_IO_PENDING )
{
TRACE1("IOCPServer::ServerWorkerThread - WSASend() 发生了如下错误: %d\n", iError);
// 异常错误事件
m_pIOCPServer->OnError(nClient, iError);
// 关闭SOCKET
m_pIOCPServer->Disconnect(nClient);
return 0;
}
}
}
else
{
m_pIOCPServer->OnSend(nClient, pSendOverlapped->pBuffer, pSendOverlapped->dwTotalBytes);
delete[] pSendOverlapped->pBuffer;
pSendOverlapped->pBuffer=NULL;
delete pSendOverlapped;
pSendOverlapped=NULL;
}
}
else if (pSOverlapped->IoMode == IoRecv)
{
// 接收事件
stRecvOverlapped* pRecvOverlapped = (stRecvOverlapped*)pSOverlapped;
WORD nClient = pRecvOverlapped->nClient;
char* pBuf = pRecvOverlapped->buffer;
// 触发接收事件
m_pIOCPServer->OnReceive(nClient, pBuf, BytesTransferred);
// 处理接收到的数据包
CTCPSocket* tcpSocket = m_pIOCPServer->GetTCPSocket(nClient);
ASSERT(tcpSocket!=NULL);
bool success = false;
try
{
success = tcpSocket->processRecvData(pBuf, BytesTransferred);
}
catch(...)
{
ASSERT(0);
TRACE("接收数据处理异常!");
}
delete pRecvOverlapped;
pRecvOverlapped=NULL;
// 发送接收请求
if (success) m_pIOCPServer->PostRecv(nClient, socket);
}
return true;
}
//////////////////////////////////////////////////////////////////////////
//构造函数
CIOAcceptThread::CIOAcceptThread(void)
{
m_hCompletionPort = NULL;
m_pIOCPServer = NULL;
m_hListenSocket = INVALID_SOCKET;
}
//析构函数
CIOAcceptThread::~CIOAcceptThread(void)
{
}
//配置函数
bool CIOAcceptThread::InitThread(HANDLE hCompletionPort, SOCKET hListenSocket, CIOCPServer* pIOCPServer)
{
ASSERT(hCompletionPort != NULL);
ASSERT(pIOCPServer != NULL);
ASSERT(hListenSocket != INVALID_SOCKET);
m_hListenSocket = hListenSocket;
m_hCompletionPort = hCompletionPort;
m_pIOCPServer = pIOCPServer;
return true;
}
//运行函数
bool CIOAcceptThread::OnEventThreadRun()
{
//效验参数
ASSERT(m_hCompletionPort != NULL);
ASSERT(m_pIOCPServer != NULL);
// 处理连接请求
SOCKADDR_IN saClient;
int iClientSize = sizeof(saClient);
SOCKET AcceptSocket = WSAAccept(m_hListenSocket, (SOCKADDR *)&saClient, &iClientSize, NULL, NULL);
if ( AcceptSocket == SOCKET_ERROR )
{
// 异常错误事件
m_pIOCPServer->OnError(INVALID_ID, WSAGetLastError());
return false;
}
CTCPSocket* tcpSocket = m_pIOCPServer->CreateTCPSocket(AcceptSocket, saClient.sin_addr.S_un.S_addr);
WORD nClientId = m_pIOCPServer->InserTCPSocket(tcpSocket);
if (nClientId == INVALID_ID) {
TRACE("插入TCPSocket对象失败请确认对象是否为空\n");
// 关闭Socket
if (closesocket(AcceptSocket) == SOCKET_ERROR) {
ErrorExit(_T("closesocket"));
return false;
}
return true;
}
// 连接事件
m_pIOCPServer->OnAccept(nClientId);
// 给当前连接进来的客户端SOCKET创建IO完成端口
HANDLE handle = CreateIoCompletionPort((HANDLE)AcceptSocket, m_hCompletionPort, AcceptSocket, 0);
if ( handle == NULL )
{
ErrorExit(_T("CreateIoCompletionPort"));
return false;
}
// 发送一个接收请求
m_pIOCPServer->PostRecv(nClientId, AcceptSocket);
return true;
}