이거에 대해 얘기하기 전에 스마트 포인터 사이클 이슈를 해결해 보자.
1. 사이클 문제
아래와 같이 클라이언트가 종료돼도 GameSession
의 소멸자가 호출되지 않는 메모리 릭 현상이 발생한다.
지금은 문제라고 부르기도 뭐한 수준이지만, 만약 이런 현상이 계속 라이브에서 발생한다고 생각해 보자.
예상치 못한 메모리 릭으로 인해 서버가 제 기능을 하지 못하게 되는 현상이 생기지 않을까?
사실 우리는 이미 사이클 문제를 인지하고 있었다.
#pragma once
class Player
{
public:
uint64 playerId = 0;
string name;
Protocol::PlayerType type = Protocol::PLAYER_TYPE_NONE;
GameSessionRef ownerSession; // Cycle
};
이렇게 주석도 달아놨고....
void GameSession::OnDisconnected()
{
GSessionManager.Remove(static_pointer_cast<GameSession>(shared_from_this()));
}
// ...
#pragma once
#include "JobQueue.h"
class Room : public JobQueue
{
public:
void Enter(PlayerRef player);
void Leave(PlayerRef player);
void Broadcast(SendBufferRef sendBuffer);
private:
map<uint64, PlayerRef> _players;
};
extern shared_ptr<Room> GRoom;
위와 같이 PlayerRef
를 사용하는 곳은 많은데 따로 이를 해제하는 부분은 만들어 주지 않아서 사이클 문제가 생겼다.
따라서 GameSession
에서 해당 정보들을 들고 있다가 해제해 주는 부분을 만들어 줘야 한다.
public:
Vector<PlayerRef> _players;
PlayerRef _currentPlayer;
weak_ptr<class Room> _room;
이런 느낌으로 현재 어떤 플레이어로 접속했는지, 어떤 Room
에 들어가 있는지 알고 있게 한다.
위크 포인터 말고 쉐어드 포인터를 사용해도 되지만, 그에 맞는 처리를 해 주어야 한다.
그렇다면 패킷 핸들러의 해당 부분을 아래와 같이 수정할 수 있다.
// 얘를
PlayerRef player = gameSession->_players[index];
// 이렇게
gameSession->_currentPlayer = gameSession->_players[index];
gameSession->_room = GRoom;
그리고 GameSession
의 OnDisconnected()
에 아래와 같이 처리 부분을 추가한다.
#include "Room.h"
// ...
void GameSession::OnDisconnected()
{
GSessionManager.Remove(static_pointer_cast<GameSession>(shared_from_this()));
if (_currentPlayer)
{
if (auto room = _room.lock())
room->DoAsync(&Room::Leave, _currentPlayer);
}
_currentPlayer = nullptr;
_players.clear();
}
- 현재
player
가null
인지 판단한다. null
이 아니라면_room
에 락을 걸어 쉐어드 포인터로 만든다.
- 만약 문제가 생긴다면room
은null
이 될 것이다.- 현재 플레이어를 룸에서 내보낸다.
_currentplayer
를nullptr
로 민다._players
를 초기화한다.
위와 같은 흐름으로 사이클을 방지할 수 있다.
다시 빌드해서 중단점을 걸어보면,
이제 정상적으로 소멸자가 호출되어 사이클 문제가 해결됐음을 알 수 있다.
2. 다시 Job으로
MMORPG에는 스킬에 쿨타임이 있다.
쿨타임 체크는 클라이언트뿐만 아니라 서버에서도 이루어져야 한다.,
아래와 같은 코드를 예로 생각해 보자.
int64 coolTime = 2500;
int64 end = GetTickCount64() + coolTime;
while(true)
{
if (GetTickCount64() >= end)
break;
}
무한하게 루프를 돌면서 쿨타임이 다 됐는지 체크하는 것은 간단한 솔루션이지만,
결코 효율적이라고는 할 수 없다. 수십 ~ 수백만 개의 오브젝트가 각자의 쿨타임을 가질 수 있는데,
저만큼 계속 의미 없는 루프가 돈다면 매우 끔찍한 일이 될지도 모른다.
따라서 무작정 루프를 돌리는 것이 아닌 언제 끝내달라고 예약을 거는 식의 시스템이 필요하다.
3. 클래스 작성
#pragma once
struct JobData
{
JobData(weak_ptr<JobQueue> owner, JobRef job) : owner(owner), job(job)
{
}
weak_ptr<JobQueue> owner;
JobRef job;
};
struct TimerItem
{
bool operator<(const TimerItem& other) const
{
return executeTick > other.executeTick;
}
uint64 executeTick = 0;
JobData* jobData = nullptr;
};
class JobTimer
{
public:
void Reserve(uint64 tickAfter, weak_ptr<JobQueue> owner, JobRef job);
void Distribute(uint64 now);
void Clear();
private:
USE_LOCK;
PriorityQueue<TimerItem> _items;
Atomic<bool> _distributing = false;
};
이 클래스는 전역으로 사용될 예정이다.
- JobData
-Job
에 관한 정보를 갖고 있는 구조체
- 이Job
이 들어가 있는JobQueue
를owner
로
- 해당Job
의 쉐어드 포인터를job
으로 - TimerItem
- 틱 비교를 위한 오퍼레이터 오버로딩
- 끝나는 틱을executeTick
에 저장
- JobData의 포인터를 갖고 있음
- 생포인터를 갖고 있는 이유는 우선순위 큐 안에서 순서가 바뀌는 것에 대한 대비도 있지만,
- 계속 복사가 이루어지면 스마트 포인터의 카운트를 건드리게 돼 문제가 생길 수 있기 때문.
- 우선순위 큐는 기본이less
이기 때문에 오퍼레이터의 틱 비교를>
로 한 것이다.
- 반대라면 틱이 큰 순서대로 나오게 될 것이다. - JobTimer
- 예약을 걸어줄Reserver()
함수
- JobQueue에 예약된 잡을 뿌려줄Distribute()
함수
- 많이 쓰이진 않겠지만 큐를 초기화할Clear()
함수
- 우선순위 큐를 이용해 틱 시간에 따라 아이템들을 정렬할_items
- 분배하고 있는지 확인하기 위한Atomic bool
변수_distribute
함수들의 구현은 아래와 같다.
#include "pch.h"
#include "JobTimer.h"
#include "JobQueue.h"
void JobTimer::Reserve(uint64 tickAfter, weak_ptr<JobQueue> owner, JobRef job)
{
const uint64 executeTick = ::GetTickCount64() + tickAfter;
JobData* jobData = ObjectPool<JobData>::Pop(owner, job);
WRITE_LOCK;
_items.push(TimerItem{ executeTick, jobData });
}
void JobTimer::Distribute(uint64 now)
{
// 한 번에 하나의 스레드만 통과
// 다중 스레드 처리를 하게 되면 매우 낮은 확률로 문제가 생길 수 있는데,
// 각 잡이 다른 스레드에 분배되어 뒤에 넣은 잡이 먼저 처리되는 경우가 생길 수 있다.
// 따라서 한개 스레드만 작업하게끔 하여 위 문제를 방지한다.
if (_distributing.exchange(true) == true)
return;
// 락을 덜 잡기 위해 임시 벡터에 시간 다 된 잡들만 저장
Vector<TimerItem> items;
{
WRITE_LOCK;
while (_items.empty() == false)
{
const TimerItem& timerItem = _items.top();
if (now < timerItem.executeTick)
break;
items.push_back(timerItem);
_items.pop();
}
}
for (TimerItem& item : items)
{
if (JobQueueRef owner = item.jobData->owner.lock())
owner->Push(item.jobData->job);
// 소멸시키기 위함
ObjectPool<JobData>::Push(item.jobData);
}
// 끝났으면 풀어준다
_distributing.store(false);
}
void JobTimer::Clear()
{
WRITE_LOCK;
while (_items.empty() == false)
{
const TimerItem& timerItem = _items.top();
ObjectPool<JobData>::Push(timerItem.jobData);
_items.pop();
}
}
A. Reserve()
이 함수는 실제로 예약을 거는 함수다.
전체적으로 락이 걸리는 것이 아니라 push 하는
부분에만 락이 걸린다.
윗동네는 별로 중요하지 않고, push 하는
부분이 중요하기 때문에 락을 건다.
B. Distribute()
중요한 것은 한 번에 하나의 스레드만 받아들이게 하는 것이다.
만약 1, 2, 3의 잡이 endTick
이 작은 순서대로 들어갔다고 가정하자.
여기서 다중 스레드 처리를 하기 위해 잡을 나눠서 처리하는데,
틱 시간이 더 긴 쪽이 오히려 더 빨리 끝날 수 있는 약간의 가능성이 존재한다.
만약 처리 순서가 중요했다면 큰 문제가 생길 수 있는 부분이다.
이런 연유로 인해 여기선 단일 스레드가 일을 하게 했다.
그리고 임시 벡터를 만들어 큐를 순회하며 시간이 된 잡들을 끌어와 저장한다.
마지막으로 임시 벡터에 있는 데이터들을 JobQueue
에 Push 하고
마친다.
C. Clear()
단순히 큐를 밀기만 한다.
거의 사용되지 않을 듯.
4. 수정 사항
JobQueue
의 Push()
가 인자로 pushOnly
라는 bool
변수를 받게 하자.
만약 이 변수가 true
면 오직 분배만을 위한 Job으로 간주해 큐에 넣기만 하게 된다.
public:
void Push(JobRef job, bool pushOnly = false);
void JobQueue::Push(JobRef job, bool pushOnly)
{
const int32 prevCount = _jobCount.fetch_add(1);
_jobs.Push(job); // WRITE_LOCK
// 첫번째 Job을 넣은 쓰레드가 실행까지 담당
if (prevCount == 0)
{
// 이미 실행중인 JobQueue가 없으면 실행
// 배분하려는 의도라면 처음 넣은 것이라도 큐에 저장만 함
if (LCurrentJobQueue == nullptr && pushOnly == false)
{
Execute();
}
else
{
// ...
}
}
}
DoAsync
는 지연 시간을 받을 수 없으니 시간을 받기 위한 다른 DoTimer
를 추가한다.
#include "JobTimer.h"
// ...
void DoTimer(uint64 tickAfter, CallbackType&& callback)
{
JobRef job = ObjectPool<Job>::MakeShared(std::move(callback));
GJobTimer->Reserve(tickAfter, shared_from_this(), job);
}
template<typename T, typename Ret, typename... Args>
void DoTimer(uint64 tickAfter, Ret(T::* memFunc)(Args...), Args... args)
{
shared_ptr<T> owner = static_pointer_cast<T>(shared_from_this());
JobRef job = ObjectPool<Job>::MakeShared(owner, memFunc, std::forward<Args>(args)...);
GJobTimer->Reserve(tickAfter, shared_from_this(), job);
}
tickAfter
를 받아 Reserve()
를 호출한다.
DoAsync
와 마찬가지로 2가지 버전을 준비한다.
그리고 분배를 위한 함수가 필요하다.
이 분배 작업 또한 한 스레드가 담당하는 것이 아니라 여러 스레드가 담당하도록 한다.
현재 정책은 모든 스레드가 만능 일꾼이 되는 것이기 때문에 각 스레드에 사용할 수 있는 분배용 함수를 작성한다.
ThreadManager
클래스와 아래와 같은 함수를 추가한다.
void ThreadManager::DistributeReservedJobs()
{
const uint64 now = ::GetTickCount64();
GJobTimer->Distribute(now);
}
현재의 틱을 받아서 Distribute()
에 넘겨주기만 하면 된다.
최종적으로 서버의 DoWorkerJob()
의 형태는 아래와 같다.
void DoWorkerJob(ServerServiceRef& service)
{
while (true)
{
LEndTickCount = ::GetTickCount64() + WORKER_TICK;
// 네트워크 입출력 처리 -> 인게임 로직까지 (패킷 핸들러에 의해)
service->GetIocpCore()->Dispatch(10);
// 예약된 일감 처리
ThreadManager::DistributeReservedJobs();
// 글로벌 큐
ThreadManager::DoGlobalQueueWork();
}
}
네트워크 처리 -> 예약된 일감 분배 -> 글로벌 큐 처리를 모든 스레드가 무한히 반복하게 될 것이다.
일감을 예약하는 부분은 아래와 같다.
GRoom->DoTimer(1000, [] { cout << "Hello 1000" << endl; });
GRoom->DoTimer(2000, [] { cout << "Hello 2000" << endl; });
GRoom->DoTimer(3000, [] { cout << "Hello 3000" << endl; });
1초마다 총 3번 문자열이 출력될 것이다.
5. 결과
순서대로 잘 나온다.
복습이 정말 많이 필요할 것 같은 부분이다.
'Study > C++ & C#' 카테고리의 다른 글
[C++ / DB] DBBind (0) | 2023.08.29 |
---|---|
[C++ / DB] DBConnection (0) | 2023.08.29 |
[C++] JobQueue (3 / 3) (0) | 2023.08.27 |
[C++] JobQueue (2 / 3) (0) | 2023.08.19 |
[C++] JobQueue (1 / 3) (0) | 2023.08.16 |