여기서부턴 좀 더 제대로 흐름을 이해할 필요가 있는 것 같다.
아마 앞으로를 위해서도 중요할 것이다.
그런데 어떻게 정리해야 깔끔할지도 모르겠다.
그만큼 난이도가 올라간 것 같다.
1. 클래스 작성
일단 다시 Completion Port의 동작 과정에 대해 살펴보자.
- CP 핸들을 만든다
- CP 핸들에 소켓을 등록한다
- IO 함수를 건다
- GetQueuedCompletionStatus()가 감지
- 일을 처리하고 다시 IO 함수를 건다
였다.
CreateIoCompletionPort() 함수가 2가지 일을 한방에 처리하는 것은 정말 인상적이었다.
이제 이걸 하나의 작은 라이브러리의 형태로 구현해 보자.
1-1. IocpCore
이번 메모의 주제에 맞는 핵심 클래스이다.
// IocpCore.h
class IocpObject
{
public:
virtual HANDLE GetHandle() abstract;
virtual void Dispatch(class IocpEvent* iocpEvent, int32 numOfBytes = 0) abstract;
};
class IocpCore
{
public:
IocpCore();
~IocpCore();
HANDLE GetHandle() { return _iocpHandle; }
bool Register(class IocpObject* iocpObject);
bool Dispatch(uint32 timeoutMs = INFINITE);
private:
HANDLE _iocpHandle;
};
// TEMP
extern IocpCore GIocpCore;
핸들을 갖고 올 함수,
등록할 함수와 감지할 함수를 선언한다.
임시로 extern으로 전역변수를 선언한다. 여기서만 사용할 예정이다.
IocpObject는 나중에 써먹을 것이다.
일단 인터페이스만 만들어 둔 추상 클래스의 형태로 둔다.
이제 이들을 정의한다.
# IocpCore.cpp
#include "pch.h"
#include "IocpCore.h"
#include "IocpEvent.h"
IocpCore::IocpCore()
{
// CP 핸들 생성
_iocpHandle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
ASSERT_CRASH(_iocpHandle != INVALID_HANDLE_VALUE);
}
IocpCore::~IocpCore()
{
::CloseHandle(_iocpHandle);
}
bool IocpCore::Register(IocpObject* iocpObject)
{
// Iocp Object의 핸들의 소켓을 넘겨준다
return ::CreateIoCompletionPort(iocpObject->GetHandle(), _iocpHandle, reinterpret_cast<ULONG_PTR>(iocpObject), 0);
}
bool IocpCore::Dispatch(uint32 timeoutMs)
{
DWORD numOfBytes = 0;
IocpObject* iocpObject = nullptr;
IocpEvent* iocpEvent = nullptr;
if (::GetQueuedCompletionStatus(_iocpHandle, OUT &numOfBytes, OUT reinterpret_cast<PULONG_PTR>(&iocpObject), OUT reinterpret_cast<LPOVERLAPPED*>(&iocpEvent), timeoutMs))
{
// 문제가 없다면 Dispatch
iocpObject->Dispatch(iocpEvent, numOfBytes);
}
else
{
int32 errCode = ::WSAGetLastError();
switch (errCode)
{
// Timeout이라면 에러는 아님
case WAIT_TIMEOUT:
return false;
default:
// TODO : 로그 찍기
iocpObject->Dispatch(iocpEvent, numOfBytes);
break;
}
}
return true;
}
생성자에서 먼저 CP 핸들을 만들어 준다.
Register는 각 IocpObject에서 호출해 줄 것이므로 이를 인자로 받아서 등록해 줄 수 있게 한다.ㅇㅇ
Dispatch()에선 GetQueuedCompletionStatus()를 호출한다.
Timeout은 문제가 되는 에러는 아니므로 일단 넘어간다.
이외의 다양한 에러코드에 대한 처리가 필요하지만 여기선 생략하고,
default 케이스의 경우에도 IocpObject의 Dispatch()를 호출해 주도록 한다.
IocpEvent와 IocpObject가 다시 등장하는데, 먼저 이 클래스들을 작성해 보자.
1-2. IocpEvent
Connect, Accept, Recv, Send 등의 IO를 이벤트로 관리할 것이다.
먼저 아래와 같이 열거형 클래스와 OVERLAPPED를 상속받는 클래스를 작성해 준다.
Sessions도 차후에 필요하니, 일단은 전방선언만 해준다.
// IocpEvent.h
#pragma once
class Session;
enum class EventType : uint8
{
Connect,
Accept,
Recv,
Send
};
// 이전에 만들었던 OverlappedEx 구조체와 같은 역할을 한다고 볼 수 있다
class IocpEvent : public OVERLAPPED
{
public:
IocpEvent(EventType type);
void Init();
EventType GetType() { return _type; }
protected:
EventType _type;
};
IocpEvent는 OVERLAPPED를 상속받는다.
OverlappedEx와 같은 역할을 한다고 생각하면 된다.
그 구조는 아래와 같았다.
struct OverlappedEx
{
WSAOVERLAPPED overlapped = {};
int32 type = 0; // read, write, accept, connect ...
};
여하튼 OVERLAPPED를 상속받았기 때문에 얘를 캐스팅 해서 인자로 넘겨줄 수 있는 것이다.
그리고 일단은 Accept에 대해서만 테스트 할 것이기 때문에 아래와 같은 형태로 작성해 둔다.
class ConnectEvent : public IocpEvent
{
public:
ConnectEvent() : IocpEvent(EventType::Connect) { }
};
class AcceptEvent : public IocpEvent
{
public:
AcceptEvent() : IocpEvent(EventType::Accept) { }
void SetSession(Session* session) { _session = session; }
Session* GetSession() { return _session; }
private:
Session* _session = nullptr;
};
class RecvEvent : public IocpEvent
{
public:
RecvEvent() : IocpEvent(EventType::Recv) { }
};
class SendEvent : public IocpEvent
{
public:
SendEvent() : IocpEvent(EventType::Send) { }
};
IocpEvent 클래스의 초기화 함수를 정의해 보자.
// IocpEvent.cpp
#include "pch.h"
#include "IocpEvent.h"
IocpEvent::IocpEvent(EventType type) : _type(type)
{
Init();
}
void IocpEvent::Init()
{
OVERLAPPED::hEvent = 0;
OVERLAPPED::Internal = 0;
OVERLAPPED::InternalHigh = 0;
OVERLAPPED::Offset = 0;
OVERLAPPED::OffsetHigh = 0;
}
생성자에서 받은 타입을 변수에 넘겨주고 Init()을 호출한다.
Init()에서 본 기억이 있는 변수들을 0으로 밀어주고 있다.
hEvent를 제외하곤 OVERLAPPED에서 OS가 관여하는 부분이었다.
1-3. IocpObject
이 클래스는 아까 인터페이스만 만들어 두었다.
그렇다면 얘는 직접 구현하는 것이 아닌 상속을 통해 파생 클래스를 만드는 놈이라는 것을 알 수 있다.
그 파생 클래스로 Listener라는 클래스를 작성한다.
// Listener.h
#pragma once
#include "IocpCore.h"
#include "NetAddress.h"
class AcceptEvent;
class Listener : public IocpObject
{
public:
Listener() = default;
~Listener();
public:
/* 외부에서 사용 */
bool StartAccept(NetAddress netAddress);
void CloseSocket();
public:
/* 인터페이스 구현 */
virtual HANDLE GetHandle() override;
virtual void Dispatch(class IocpEvent* iocpEvent, int32 numOfBytes = 0) override;
private:
/* 수신 관련 */
void RegisterAccept(AcceptEvent* acceptEvent);
void ProcessAccept(AcceptEvent* acceptEvent);
protected:
SOCKET _socket = INVALID_SOCKET;
Vector<AcceptEvent*> _acceptEvents;
};
Listener 클래스는 IocpObject를 상속받는다.
여기선 Accept에 관한 일들을 처리할 것이다.
Accept를 위한 소켓 생성부터 AcceptEx 함수 걸기까지 수행해 줄 함수와
그 소켓을 정리할 함수를 선언한다.
물론 순수 가상 함수를 받아왔기 때문에 무조건 구현해 주어야만 한다.
그리고 내부에서 사용할 실질적으로 걸어주고 처리해 줄 함수들을 선언한다.
변수로는 소켓과 AcceptEvent를 담을 벡터를 둔다.
AcceptEvent 클래스에 대해선 일단 전방선언한다.
생성자는 default로 뒀으니 소멸자를 구현한다.
// Listener.cpp
Listener::~Listener()
{
SocketUtils::Close(_socket);
// Listner가 소멸까지 가는 경우는 거의 없지만
// 그래도 처리한다
for (AcceptEvent* acceptEvent : _acceptEvents)
{
// TODO
xdelete(acceptEvent);
}
}
Listener 클래스가 소멸될 경우는 사실상 없다시피 하겠지만, 그래도 만들어 둔다.
StartAccept()는 아래와 같이 정의한다.
bool Listener::StartAccept(NetAddress netAddress)
{
_socket = SocketUtils::CreateSocket();
if (_socket == INVALID_SOCKET)
return false;
if (GIocpCore.Register(this) == false)
return false;
if (SocketUtils::SetReuseAddress(_socket, true) == false)
return false;
if (SocketUtils::SetLinger(_socket, 0, 0) == false)
return false;
if (SocketUtils::Bind(_socket, netAddress) == false)
return false;
if (SocketUtils::Listen(_socket) == false)
return false;
// 한번만 걸었을 때동접이 몰리면 처리가 안될 수 있으므로
// 여유분을 만들어 둔다
const int32 acceptCount = 1;
for (int32 i = 0; i < acceptCount; i++)
{
AcceptEvent* acceptEvent = xnew<AcceptEvent>();
_acceptEvents.push_back(acceptEvent);
RegisterAccept(acceptEvent);
}
return false;
}
NetAddress를 받아서 순차적으로 소켓을 만들고 필요한 작업을 진행해 나간다.
아래쪽에 acceptEvent를 여러개 만들 수 있게끔 한 부분이 보일 텐데,
이는 혹시 모를 연결 처리에 실패에 대비하기 위함이다.
계산대는 하나인데 계산하려는 손님이 500명이면, 혼자 다 처리하기 힘들 것이고,
미처 처리하지 못하는 손님이 생길 수도 있다.
이를 위해 계산대의 수를 늘린 것과 같다고 생각하면 된다.
이제 인터페이스를 구현해 보자.
HANDLE Listener::GetHandle()
{
return reinterpret_cast<HANDLE>(_socket);
}
void Listener::Dispatch(IocpEvent* iocpEvent, int32 numOfBytes)
{
ASSERT_CRASH(iocpEvent->GetType() == EventType::Accept);
AcceptEvent* acceptEvent = static_cast<AcceptEvent*>(iocpEvent);
ProcessAccept(acceptEvent);
}
핸들은 소켓을 핸들로 캐스팅해서 던져주면 된다.
Dispatch는 결과적으로 ProcessAccept()를 호출하게 된다.
RegisterAccept는 아래와 같이 정의한다.
void Listener::RegisterAccept(AcceptEvent* acceptEvent)
{
Session* session = xnew<Session>();
acceptEvent->Init();
acceptEvent->SetSession(session);
DWORD bytesReceived = 0;
// Local/RemoteAddressLength를 저렇게 입력한 것은
// 아래 AcceptEx()에 대한 공식 문서를 따름
// https://learn.microsoft.com/ko-kr/windows/win32/api/mswsock/nf-mswsock-acceptex
// IocpEvent는 OVERLLAPED를 상속받으므로 static_cast로 형변환
if (false == SocketUtils::AcceptEx(_socket, session->GetSocket(), session->_recvBuffer, 0, sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, OUT & bytesReceived, static_cast<LPOVERLAPPED>(acceptEvent)))
{
const int32 errorCode = ::WSAGetLastError();
if (errorCode != WSA_IO_PENDING)
{
// Pending이 아닌데 그냥 끝나버리면 RegisterAccept를 걸어줄 게 없다
// 일단 다시 Accept 걸어준다
RegisterAccept(acceptEvent);
}
}
}
Session 구조체에 대해선, 이 클래스 작성이 끝난 뒤에 기술한다.
여기선 AcceptEx를 실행해 걸어주는 동작을 수행한다.
AcceptEx의 인자 중엔 저렇게 Local/RemoteAddressLength를 받는 인자가 있는데,
여기는 공식 문서에 따라 sizeof(SOCKADDR_IN)에 16을 더한 값을 사용한다.
이렇게 함수를 걸었고 여기서 만약 에러가 났다고 하자.
그 내용이 Pending이 아니라면 진짜 문제가 생긴 것이다.
하지만 문제가 생겼다고 해서 다시 Accept를 등록해 주지 않으면 안 된다.
등록하지 않으면 앞으로 Accept를 할 수 없게 된다.
ProcessAceept는 아래와 같이 정의한다.
void Listener::ProcessAccept(AcceptEvent* acceptEvent)
{
Session* session = acceptEvent->GetSession();
if (false == SocketUtils::SetUpdateAcceptSocket(session->GetSocket(), _socket))
{
RegisterAccept(acceptEvent);
return;
}
SOCKADDR_IN sockAddress;
int32 sizeOfSockAddr = sizeof(sockAddress);
if (SOCKET_ERROR == ::getpeername(session->GetSocket(), OUT reinterpret_cast<SOCKADDR*>(&sockAddress), &sizeOfSockAddr))
{
RegisterAccept(acceptEvent);
return;
}
session->SetNetAddress(NetAddress(sockAddress));
cout << "Client Connected!" << endl;
// TODO
// AcceptEvent를 한번 만들어서 계속 재사용한다!
RegisterAccept(acceptEvent);
}
SetUpdateAcceptSocket()은 ListenSocket의 특성을 그대로 ClientSocket에 적용하는 함수였다.
거기서 문제가 생기면 Accept를 등록해 주고 리턴한다.
여기서 getpeername이라는 것이 나오는데, 이름대로 세션의 정보를 얻어오는 함수다.
역시 문제가 생기면 Accept를 등록해 주고 리턴한다.
세션에 주소 정보를 저장하고 정말 클라이언트와 연결이 됐음을 콘솔 창에 알린다.
그리고 Accept를 등록하여 다음 연결을 받을 수 있게 한다.
1-4. Session
이전엔 세션을 구조체로 아래와 같이 정의했었다.
struct Session
{
SOCKET socket = INVALID_SOCKET;
char recvBuffer[BUFSIZE] = {};
int32 recvBytes = 0;
};
이런 형태로는 다양한 요구에 대응하지 못하므로,
IocpObject를 상속받아 하나의 클래스로 아래와 같은 형태로 만든다.
// Session.h
#pragma once
#include "IocpCore.h"
#include "IocpEvent.h"
#include "NetAddress.h"
class Session : public IocpObject
{
public:
Session();
virtual ~Session();
public:
/* 정보 관련 */
void SetNetAddress(NetAddress address) { _netAddress = address; }
NetAddress GetAddress() { return _netAddress; }
SOCKET GetSocket() { return _socket; }
public:
/* 인터페이스 구현 */
virtual HANDLE GetHandle() override;
virtual void Dispatch(class IocpEvent* iocpEvent, int32 numOfBytes = 0) override;
public:
// TEMP
// 인자에서 요구해서 임시로
char _recvBuffer[1000];
private:
SOCKET _socket = INVALID_SOCKET;
NetAddress _netAddress = {};
Atomic<bool> _connected = false;
};
주소를 설정하는 함수, 주소나 소켓을 가져오는 함수를 만든다.
수신버퍼를 저렇게 두지 않을 것이지만, 당장은 저렇게 임시로 선언해 둔다.
변수로는 세션의 소켓, 주소, 연결 여부를 가질 것이다.
// Session.cpp
#include "pch.h"
#include "Session.h"
#include "SocketUtils.h"
Session::Session()
{
_socket = SocketUtils::CreateSocket();
}
Session::~Session()
{
SocketUtils::Close(_socket);
}
HANDLE Session::GetHandle()
{
return reinterpret_cast<HANDLE>(_socket);
}
void Session::Dispatch(IocpEvent* iocpEvent, int32 numOfBytes)
{
// 접속 테스트만 할 것이기 때문에 비워둠
// TODO
}
세션 클래스는 생성되면서 소켓을 만든다. 당연히 소멸되면 소켓을 닫는다.
Dispatch() 함수엔 원래 다른 처리가 들어가지만, 여기선 접속에 관한 것만 테스트할 것이므로 일단 비워둔다.
2. 서버 작성
일단 어느 정도 클래스와 필요한 함수의 작성이 완료됐으니 서버를 작성해 접속 테스트를 해 보자.
리스너 소켓을 만들고 Accept까지 해줄 함수를 이미 만들어 놨으니 그 함수 한방이면 될 것 같다.
// GameServer.cpp
int main()
{
Listener listener;
listener.StartAccept(NetAddress(L"127.0.0.1", 7777)); // L"" -> Wide Character
// 워커 스레드...
}
이거면 AcceptEx를 걸어주는 것까지 한방에 해결된다.
걸어줬으면 낚아줄 놈도 있어야 한다.
Dispatch()를 호출해 줄 워커 스레드를 5개 정도만 만들어 주자.
for (int32 i = 0; i < 5; i++)
{
GThreadManager->Launch([=]()
{
while (true)
{
GIocpCore.Dispatch();
}
});
}
GThreadManager->Join();
IocpCore의 Dispatch() 함수가 뭘 하는지 다시 보자.
bool IocpCore::Dispatch(uint32 timeoutMs)
{
DWORD numOfBytes = 0;
IocpObject* iocpObject = nullptr;
IocpEvent* iocpEvent = nullptr;
if (::GetQueuedCompletionStatus(_iocpHandle, OUT &numOfBytes, OUT reinterpret_cast<PULONG_PTR>(&iocpObject), OUT reinterpret_cast<LPOVERLAPPED*>(&iocpEvent), timeoutMs))
{
// 문제가 없다면 Dispatch
iocpObject->Dispatch(iocpEvent, numOfBytes);
}
else
{
int32 errCode = ::WSAGetLastError();
switch (errCode)
{
// Timeout이라면 에러는 아님
case WAIT_TIMEOUT:
return false;
default:
// TODO : 로그 찍기
iocpObject->Dispatch(iocpEvent, numOfBytes);
break;
}
}
return true;
}
GetQueuedCompletionStatus()를 호출해 감지하고 감지가 되면,
해당 IocpObject의 Dispatch()를 호출해(여기선 Listener의 Dispatch()) 일을 처리하게 한다.
모든 게 정상이라면 정상적으로 Accept 되어 클라이언트와의 연결에 성공할 것이다.
3. 실행
잘 연결됐고 잘 보낸다.
클라이언트는 이벤트 기반 Overlapped로 통신하고 있다.
4. 생각할 점
일단 지금은 Accept에 대해서만 기능을 구현했는데, 모델의 흐름에 대해 이해가 없으면,
다음에 이루어질 Recv나 Send를 제대로 구현하고 이해할 수 없을 것이다.
흐름을 다시 생각해 보자.
- StartAccept로 Accept를 걸어준다 ( 낚싯대를 드리운다 )
- 메인에서 워커 스레드들을 생성해 GetQueuedCompletionStatus()를 호출하여 감지한다 ( 입질을 기다림 )
- 만약 여기서 문제가 생겼다면 다시 Accept를 걸어준다 ( 다시 문제를 고쳐서 낚싯대를 드리운다 )
- 문제가 없다면 ProcessAccept()로 넘어가 실제 연결을 처리한다 ( 입질이 와서 낚아 올렸다 )
- 다른 클라이언트의 연결을 받아야 하므로 Accept()를 다시 걸어준다 ( 또 낚기 위해선 낚싯대를 드리워야 한다)
이번에 Accept 하는 부분은 이런 흐름으로 흘러간다고 생각한다.
여하튼 CP의 이런 흐름에 대해 계속 의식할 필요가 있을 것이다.
아직까진 DirectX를 공부할 때 보단 재밌는 것 같다.
이대로 적성을 붙일 수 있으면 좋으련만...
'Study > C++ & C#' 카테고리의 다른 글
[C++] Session (0) | 2023.06.24 |
---|---|
[C++] Service (0) | 2023.06.23 |
[C++] Socket util 클래스 작성 (0) | 2023.06.20 |
[C++] Completion Port Model (0) | 2023.06.17 |
[C++] Overlapped Model (0) | 2023.06.16 |