SendBuffer는 내용이 많아서 과정을 분할해 올린다.
먼저 이번엔 SendBuffer 자체의 구현에 대해 다룬다.
1. SendBuffer 클래스 작성
RecvBuffer처럼 클래스를 만들어서 관리한다.
#pragma once
// 송신 버퍼도 WSASend 중에 없어지거나 하면 안되기 때문에
// 레퍼런스 카운팅이 이루어져야 한다
class SendBuffer : enable_shared_from_this<SendBuffer>
{
public:
SendBuffer(int32 bufferSize);
~SendBuffer();
BYTE* Buffer() { return _buffer.data(); }
int32 WriteSize() { return _writeSize; }
int32 Capacity() { return static_cast<int32>(_buffer.size()); }
void CopyData(void* data, int32 len);
private:
Vector<BYTE> _buffer;
int32 _writeSize = 0;
};
먼저 이전과 동일하게 버퍼 자체는 벡터의 형태로 만들었다.
생성자에서 크기를 받아 해당 벡터를 우리가 사용할 형태로 크기를 잡아준다.
그리고 enable_shared_from_this 클래스를 상속받은 이유로는 주석에도 쓰여 있지만,
WSASend()가 걸려있는 와중에 버퍼가 사라져 버리면 안되기 때문에 레퍼런스 카운팅을 해 주기 위해서이다.
버퍼의 첫번째 포인터를 리턴해줄 함수,
데이터의 크기를 리턴해줄 함수,
버퍼의 용량을 리턴해줄 함수를 정의한다.
#include "pch.h"
#include "SendBuffer.h"
SendBuffer::SendBuffer(int32 bufferSize)
{
_buffer.resize(bufferSize);
}
SendBuffer::~SendBuffer()
{
}
void SendBuffer::CopyData(void* data, int32 len)
{
ASSERT_CRASH(Capacity() >= len);
::memcpy(_buffer.data(), data, len);
_writeSize = len;
}
CopyData()는 직접 데이터와 길이를 받아서 버퍼에 memcpy()를 통해 복사한다.
나중에 테스트 할 때 이 함수가 사용될 것이다.
2. 수정 사항 반영
먼저 새로 작성한 SendBuffer가 정상 동작하는지 확인해 보자.
기존에 임시 버퍼로 사용하던 부분들을 새로 작성한 클래스로 대체한다.
SendBuffer도 SendBufferRef를 선언하여 관리한다.
class SendEvent : public IocpEvent
{
public:
SendEvent() : IocpEvent(EventType::Send) { }
Vector<SendBufferRef> sendBuffers;
};
Event에선 BYTE형 벡터였지만 SendBuffer의 포인터를 갖고 있는 벡터로 재정의한다.
한 곳에 버퍼들을 모아두어 한방에 Send 하기 편해진다.
Session 클래스도 이와 관련해 수정하거나 추가항 사항들이 있다.
// ...
public:
/* 외부에서 사용 */
void Send(SendBufferRef sendBuffer); // SendBuffer 클래스를 받게 수정
bool Connect();
void Disconnect(const WCHAR* cause);
// ...
private:
/* 전송 관련 */
// ...
void RegisterSend();
// ...
void ProcessSend(int32 numOfBytes);
// ...
private:
USE_LOCK;
/* 수신 관련 */
RecvBuffer _recvBuffer;
/* 송신 관련 */
// 이전엔 매번 복사가 일어났는데, 이젠 복사 없이 관리가 가능함
Queue<SendBufferRef> _sendQueue;
Atomic<bool> _sendRegistered = false;
private:
// ...
SendEvent _sendEvent; // sendEvent를 재사용 할 것이므로 관련 된 부분 수정
};
먼저 BYTE형 버퍼를 받던 부분은 새로 만든 버퍼 클래스를 인자로 받도록 했다.
sendEvent는 재사용 할 것이므로 기존 SendEvent를 받던 부분들도 이를 받지 않게 수정한다.
그리고 새로이 sendQueue와 sendRegistered를 추가했다.
큐에 버퍼들을 몰아넣어 일괄로 전송하는 데 있어 편리함을 챙겼다.
sendRegistered는 Atomic으로 선언하여 현재 WSASend()가 걸려있는지에 대한 여부를 갖고 있을 것이다.
void Session::Send(SendBufferRef sendBuffer)
{
// 현재 RegisterSend가 걸리지 않은 상태라면, 걸어준다
WRITE_LOCK;
_sendQueue.push(sendBuffer);
// 지금은 LOCK을 걸어두었기 때문에 굳이 exchange()를 사용하지 않아도 된다
// 하지만 나중에 다른 방식을 사용하게 될 수도 있으니
// Atomic을 따라 exchange()를 사용한다
if (_sendRegistered.exchange(true) == false)
RegisterSend();
}
이벤트를 계속 재사용 할 것이기 때문에 동적 생성하는 부분은 사라졌다.
그리고 버퍼 포인터를 큐에 Push 하기 때문에, memcpy()로 인한 복사 비용도 사라졌다.
지금까지 문제가 없고, Send가 걸린 상태가 아니라면 sendRegisterd를 true로 바꾸고 Send를 걸어준다.
void Session::RegisterSend()
{
if (IsConnected() == false)
return;
_sendEvent.Init();
_sendEvent.owner = shared_from_this(); // ADD_REF
// 보낼 데이터를 sendEvent에 등록
{
WRITE_LOCK;
int32 writeSize = 0;
while (_sendQueue.empty() == false)
{
SendBufferRef sendBuffer = _sendQueue.front();
// writeSize를 추적을 해서 너무 많은 데이터를 보내지 않게 관리
// SendBuffer도 사이즈에 한계가 있으므로 필요
writeSize += sendBuffer->WriteSize();
// TODO : 예외 체크
_sendQueue.pop();
_sendEvent.sendBuffers.push_back(sendBuffer);
}
}
// Scatter-Gather (흩어져 있는 데이터들을 모아서 한 방에 보낸다)
Vector<WSABUF> wsaBufs;
wsaBufs.reserve(_sendEvent.sendBuffers.size());
for (SendBufferRef sendBuffer : _sendEvent.sendBuffers)
{
WSABUF wsaBuf;
wsaBuf.buf = reinterpret_cast<char*>(sendBuffer->Buffer());
wsaBuf.len = static_cast<LONG>(sendBuffer->WriteSize());
wsaBufs.push_back(wsaBuf);
}
DWORD numOfBytes = 0;
if (SOCKET_ERROR == ::WSASend(_socket, wsaBufs.data(), static_cast<DWORD>(wsaBufs.size()), OUT &numOfBytes, 0, &_sendEvent, nullptr))
{
int32 errorCode = ::WSAGetLastError();
if (errorCode != WSA_IO_PENDING)
{
HandleError(errorCode);
_sendEvent.owner = nullptr; // RELEASE_REF
_sendEvent.sendBuffers.clear(); // RELEASE_REF
_sendRegistered.store(false); // 기존 값을 확인 할 필요가 없으므로 exchange()가 아니라 store()로
}
}
}
여긴 좀 추가한 것들이 많다.
먼저 이벤트를 초기화 해 주고 owner에 포인터를 넘겨줘 사라지지 않도록 한다.
Send()에서 락을 걸어놨는데 왜 또 락을 거냐 싶을 수 있다.
기존의 처리 방식이 바뀔 수 있으므로 그 상황을 대비해 락을 이중으로 걸어둔 것이다.
구현을 변경했을 때, 저 부분에 락이 필요함에도 불구 미처 락을 걸지 못했을 때 좋은 보험이 되어줄 것이다.
다시 돌아와서, 큐가 비어있지 않다면 큐의 머리를 이벤트에서 관리하는 버퍼에 옮겨준다.
계속 writeSize를 추적하는데, 이는 버퍼의 용량을 초과하는 데이터가 들어오지 못하게 감시하는 역할을 한다.
만약 초과하는 데이터가 들어오려고 하면 문제상황으로 판단 하고 이에 필요한 예외처리가 필요하다.
이벤트 버퍼에 모든 버퍼가 이동됐다면 WSABUF 벡터에 그 내용물들을 옮긴다.
그리고 WSASend()에 data()로 첫 포인터를 넘겨주고 wsaBuf의 사이즈를 넘겨줘 한방에 저 버퍼들을 처리할 수 있게 했다.
문제가 생기면 메모리 누수를 방지하기 위해 포인터들을 다 밀어주고 sendRegistered를 false로 변경한다.
void Session::ProcessSend(int32 numOfBytes)
{
_sendEvent.owner = nullptr; // RELEASE_REF
_sendEvent.sendBuffers.clear(); // RELEASE_REF, 버퍼를 사용했으까 밀어줌
if (numOfBytes == 0)
{
Disconnect(L"Send 0");
return;
}
// 컨텐츠 코드에서 재정의
OnSend(numOfBytes);
WRITE_LOCK;
if (_sendQueue.empty())
_sendRegistered.store(false);
else
RegisterSend();
}
sendEvent에 관한 부분들을 고쳐준다.
락을 걸어준 후, 큐가 비었다면 sendRegistered를 false로 변경하고,
큐가 비어있지 않다면 Send를 걸어줘 나머지 버퍼를 처리할 수 있도록 한다.
ProcessSend()에 변경점이 있으므로 Dispatch()의 내용도 그에 맞게 수정해 준다.
세션도 수정이 완료됐으므로 일단 동작을 살펴보자.
GameServer에 임시로 만들어 둔 GameSession 클래스를 수정한다.
int32 GameSession::OnRecv(BYTE* buffer, int32 len)
{
// Echo
cout << "OnRecv Len = " << len << endl;
SendBufferRef sendBuffer = MakeShared<SendBuffer>(4096);
sendBuffer->CopyData(buffer, len);
Send(sendBuffer);
return len;
}
OnRecv의 내용을 SendBuffer 클래스를 사용하게 고쳤다.
인자로 받은 버퍼를 CopyData()를 사용해 버퍼에 넘겨주고 그 내용을 Send()로 걸어준다.
이렇게 빌드해 테스트 하면 동작 자체는 잘 이루어질 것이다.
3. 세션 매니저
어쨌든 서버엔 많은 클라이언트가 붙을 것이다.
특히 MMORPG의 경우 하나의 현상에 대한 결과를 주변에 있는 모든 플레이어에게 전달해야 한다.
동접이 1000명이고 모두 같은 필드에 있다고 가정하자.
플레이어 A가 몹 B에 대해 공격을 수행했고 이로 인해 몹이 데미지를 입었다.
A가 공격을 수행했고 B가 데미지를 입었다는 사실을 주변에 있는 999명에게도 알려주어야 한다.
그럼 그 999명에 대해 일일이 해당하는 세션을 찾고, 패킷을 담아 Send를 호출하여 전송하면 매우 부하가 심할 것이다.
이러한 Broadcasting이 필요한 상황에 대비하여 세션을 통으로 갖고 관리할 세션 매니저를 만든다.
서비스에서 이미 세션을 관리하고 있지만, 그것은 데이터 통신을 위함이고,
세션 매니저는 컨텐츠 단에서 사용하기 위한 기능들을 위해 필요하다.
#pragma once
class GameSession;
using GameSessionRef = shared_ptr<GameSession>;
class GameSessionManager
{
public:
void Add(GameSessionRef session);
void Remove(GameSessionRef session);
void Broadcast(SendBufferRef sendBuffer);
private:
USE_LOCK;
Set<GameSessionRef> _sessions;
};
extern GameSessionManager GSessionManager;
지금 가장 핵심이 되는 기능은 Broadcast()라고 할 수 있다.
SendBuffer 클래스에 대해 레퍼런스 카운팅을 하고자 한 이유이기도 하다.
세션 매니저는 전역 변수로 설정해 편하게 쓸 수 있도록 했다.
여기서 이렇게 하지 않고 서버의 Main()에 생성하여 사용해도 문제는 없다.
#include "pch.h"
#include "GameSessionManager.h"
#include "GameSession.h"
GameSessionManager GSessionManager;
void GameSessionManager::Add(GameSessionRef session)
{
WRITE_LOCK;
_sessions.insert(session);
}
void GameSessionManager::Remove(GameSessionRef session)
{
WRITE_LOCK;
_sessions.erase(session);
}
void GameSessionManager::Broadcast(SendBufferRef sendBuffer)
{
WRITE_LOCK;
for (GameSessionRef session : _sessions)
{
session->Send(sendBuffer);
}
}
Add와 Remove는 이름 대로 세션을 모아두는 Set에 넣고 빼고에 관한 동작만을 수행한다.
Broadcast()는 모든 세션에 대해서 해당 버퍼를 보내는 동작을 수행한다.
GameSession도 이제 GameServer 코드에서 분리해 사용한다.
#pragma once
#include "Session.h"
class GameSession : public Session
{
public:
~GameSession()
{
cout << "~GameSession" << endl;
}
virtual void OnConnected() override;
virtual void OnDisconnected() override;
virtual int32 OnRecv(BYTE* buffer, int32 len) override;
virtual void OnSend(int32 len) override;
};
여기까지 내용은 동일하다.
#include "pch.h"
#include "GameSession.h"
#include "GameSessionManager.h"
void GameSession::OnConnected()
{
GSessionManager.Add(static_pointer_cast<GameSession>(shared_from_this()));
}
void GameSession::OnDisconnected()
{
GSessionManager.Remove(static_pointer_cast<GameSession>(shared_from_this()));
}
int32 GameSession::OnRecv(BYTE* buffer, int32 len)
{
// Echo
cout << "OnRecv Len = " << len << endl;
SendBufferRef sendBuffer = MakeShared<SendBuffer>(4096);
sendBuffer->CopyData(buffer, len);
for (int i = 0; i < 5; i++)
GSessionManager.Broadcast(sendBuffer);
return len;
}
void GameSession::OnSend(int32 len)
{
cout << "OnSend Len = " << len << endl;
}
세션이 연결되거나 끊기거나 할 때 세션 매니저에 대한 처리를 한다.
데이터를 받았을 때, 버퍼에 인자로 받은 데이터를 넣고 Broadcast 하게 했다.
5번 반복하게 했으니 SendBuffer엔 5개의 버퍼가 쌓이고 WSASend()에서 알아서 한방에 보내줄 것이다.
실제로 Broadcasting이 잘 이루어지는지 확인해 보기 전에 클라이언트 코드도 수정해 줘야 한다.
4. 클라이언트 수정
사실 위에 GameSession에서 했던 것과 크게 다른 것이 없다.
char sendData[] = "Hello World";
void OnConnected() override
{
cout << "Connected To Server" << endl;
SendBufferRef sendBuffer = MakeShared<SendBuffer>(4096);
sendBuffer->CopyData(sendData, sizeof(sendData));
Send(sendBuffer);
}
int32 OnRecv(BYTE* buffer, int32 len) override
{
cout << "OnRecv Len = " << len << endl;
this_thread::sleep_for(1s);
SendBufferRef sendBuffer = MakeShared<SendBuffer>(4096);
sendBuffer->CopyData(sendData, sizeof(sendData));
Send(sendBuffer);
return len;
}
int main()
{
this_thread::sleep_for(1s);
ClientServiceRef service = MakeShared<ClientService>(
NetAddress(L"127.0.0.1", 7777),
MakeShared<IocpCore>(),
MakeShared<ServerSession>, // TODO : SessionManager 등
5); // 5를 넣었으므로 5개의 클라이언트가 있는 상황이 된다
// ...
}
똑같이 데이터를 받아서 버퍼에 카피하고 보낸다.
차이점이 있다면, 클라이언트 측에선 연결이 완료됐을 때 바로 Send를 걸어준다는 점이다.
그리고 Main에서 서비스를 만들 때 마지막 인자로 5를 넣어서 5개의 클라이언트를 시뮬레이션한다.
이제 진짜로 결과를 보자.
5. 실행
줄 바꿈이 제대로 안 돼서 정신사나워 보이지만,
서버 측 OnSend Len을 보면 제대로 묶어서 보내고 있다는 것을 확인할 수 있다.
메모리 곡선이 우상향 곡선만 그리는 것이 아니라 안정되어 있기 때문에,
버퍼도 정상적으로 소멸하고 있음을 알 수 있다.
쉽지 않네 이거.
'Study > C++ & C#' 카테고리의 다른 글
[C++] Packet Session (0) | 2023.07.04 |
---|---|
[C++] SendBuffer Pooling (0) | 2023.06.30 |
[C++] RecvBuffer (0) | 2023.06.28 |
[C++] Session (0) | 2023.06.24 |
[C++] Service (0) | 2023.06.23 |