지금까지의 코드에선 하나의 스레드에 일감이 몰리는 상황이 발생할 수 있었다.
목표는 이 일감을 최대한 분배해 한 스레드만 괴롭히는 현상을 해소하는 것이다.
여기선 JobQueue
를 보관할 GlobalQueue
를 만들어 이를 뿌리는 식으로 만들어 본다.
1. 클래스 작성
기본적인 기능만 수행할 것이기 때문에 복잡한 일은 없다.
// GlobalQueue.cpp
#pragma once
class GlobalQueue
{
public:
GlobalQueue();
~GlobalQueue();
void Push(JobQueueRef jobQueue);
JobQueueRef Pop();
private:
LockQueue<JobQueueRef> _jobQueues;
};
// GlobalQueue.h
#include "pch.h"
#include "GlobalQueue.h"
void GlobalQueue::Push(JobQueueRef jobQueue)
{
_jobQueues.Push(jobQueue);
}
JobQueueRef GlobalQueue::Pop()
{
return _jobQueues.Pop();
}
JobQueueRef
를 만들러 가는 김에 매크로를 활용해 기존 코드를 수정했다.
#define USING_SHARED_PTR(name) using name##Ref = std::shared_ptr<class name>;
USING_SHARED_PTR(IocpCore);
USING_SHARED_PTR(IocpObject);
USING_SHARED_PTR(Session);
USING_SHARED_PTR(PacketSession);
USING_SHARED_PTR(Listener);
USING_SHARED_PTR(ServerService);
USING_SHARED_PTR(ClientService);
USING_SHARED_PTR(SendBuffer);
USING_SHARED_PTR(SendBufferChunk);
USING_SHARED_PTR(Job);
USING_SHARED_PTR(JobQueue);
##
을 활용해 Ref 앞에 이름을 붙이도록 했다.
다시 돌아와서, 중요한 것은
JobQueue 자체를 갖고 있다가 필요할 때마다 넣고 빼면서 여러 스레드가 일을 처리할 수 있게 하는 개념이다.
전역 사용을 위해 몇가지를 추가한다.
// CoreGlobal.h
#pragma once
extern class ThreadManager* GThreadManager;
extern class Memory* GMemory;
extern class SendBufferManager* GSendBufferManager;
extern class GlobalQueue* GGlobalQueue;
// ...
// CoreGlobal.cpp
#include "pch.h"
// ...
#include "GlobalQueue.h"
ThreadManager* GThreadManager = nullptr;
Memory* GMemory = nullptr;
SendBufferManager* GSendBufferManager = nullptr;
GlobalQueue* GGlobalQueue = nullptr;
DeadLockProfiler* GDeadLockProfiler = nullptr;
class CoreGlobal
{
public:
CoreGlobal()
{
GThreadManager = new ThreadManager();
GMemory = new Memory();
GSendBufferManager = new SendBufferManager();
GGlobalQueue = new GlobalQueue();
GDeadLockProfiler = new DeadLockProfiler();
SocketUtils::Init();
}
~CoreGlobal()
{
delete GThreadManager;
delete GMemory;
delete GSendBufferManager;
delete GGlobalQueue;
delete GDeadLockProfiler;
SocketUtils::Clear();
}
} GCoreGlobal;
그리고 TLS 영역에 현재 스레드가 JobQueue
를 물고 있는지 확인하기 위한 포인터 변수를 추가한다.
단순히 큐를 물고 있는지 확인만 하는 용도이기 때문에 스마트 포인터가 아닌 일반 포인터로 주소만 갖고 있게 한다.
// CoreTLS.h
#pragma once
#include <stack>
extern thread_local uint32 LThreadId;
extern thread_local uint64 LEndTickCount;
extern thread_local std::stack<int32> LLockStack;
extern thread_local SendBufferChunkRef LSendBufferChunk;
extern thread_local class JobQueue* LCurrentJobQueue;
// CoreTLS.cpp
#include "pch.h"
#include "CoreTLS.h"
thread_local uint32 LThreadId = 0;
thread_local uint64 LEndTickCount = 0;
thread_local std::stack<int32> LLockStack;
thread_local SendBufferChunkRef LSendBufferChunk;
thread_local JobQueue* LCurrentJobQueue = nullptr;
2. 수정 사항 반영
이제 새로 만든 클래스를 활용하기 위해 기존 코드를 수정해 보자.
JobQueue
의 Push()
와 Execute()
를 먼저 고칠 것이다.
void JobQueue::Push(JobRef&& job)
{
const int32 prevCount = _jobCount.fetch_add(1);
_jobs.Push(job); // WRITE_LOCK
// 첫번째 Job을 넣은 쓰레드가 실행까지 담당
if (prevCount == 0)
{
// 이미 실행중인 JobQueue가 없으면 실행
if (LCurrentJobQueue == nullptr)
{
Execute();
}
else
{
// 여유 있는 다른 쓰레드가 실행하도록 GlobalQueue에 넘긴다
GGlobalQueue->Push(shared_from_this());
}
}
}
실행중인 큐가 없다면 실행하고, 있다면 큐를 글로벌 큐에 통으로 넘겨준다.
누군가는 저걸 잡아서 실행하게 될 것이다.
이제 넣어둔 큐를 어떻게 꺼내서 처리할 것인지에 대한 고민이 생긴다.
일단 다음과 같은 방법을 생각해 볼 수 있다.
ThreadManager
클래스에 아래와 같은 함수를 추가한다.
// ...
#include "GlobalQueue.h"
void ThreadManager::DoGlobalQueueWork()
{
while (true)
{
uint64 now = ::GetTickCount64();
if (now > LEndTickCount)
break;
JobQueueRef jobQueue = GGlobalQueue->Pop();
if (jobQueue == nullptr)
break;
jobQueue->Execute();
}
}
특정 조건을 만족하지 않는 한 계속 돌며 일을 처리하는 함수이다.
제한 틱이 되지 않았다면 큐를 꺼내 처리하는 것을 계속 반복한다.
이 함수를 호출할 누군가도 필요하게 되는데, 서버로 가서 생각해 보자.
서버는 계속 코어를 확인하며 일감을 기다린다.
Dispatch()
를 통해 일감을 잡으면 대체로 패킷 핸들러 등의 게임 로직까지 실행되게 되는 구조다.
다시 말하자면 네트워크 처리를 함과 동시에 그 스레드가 게임 로직까지 담당하게 되므로,
실제 라이브 서버에선 문제가 될 소지가 있다는 것이다.
따라서 서버에 아래와 같은 함수를 추가한다.
enum
{
WORKER_TICK = 64
};
void DoWorkerJob(ServerServiceRef& service)
{
while (true)
{
LEndTickCount = ::GetTickCount64() + WORKER_TICK;
// 네트워크 입출력 처리 -> 인게임 로직까지 (패킷 핸들러에 의해)
service->GetIocpCore()->Dispatch(10);
// 글로벌 큐
ThreadManager::DoGlobalQueueWork();
}
}
겸사겸사 틱도 설정해 준다.
그리고 Dispatch()
에 10ms
의 타임아웃을 주기로 했다.
타임아웃을 주지 않은 상태에서 일감이 없다면, 빠져나오는 것이 아닌 아무 일도 하지 않는 상태가 된다.
따라서 10ms 동안 일이 없다면 빠져나오게 만든다.
빠져나왔다면 DoGlobalQueueWork()
를 호출하게 한다.
이는 네트워크는 물론 다른 일도 처리할 수 있는 만능 일꾼이 하나 있는 것과 같다.
네트워크 따로 내부 잡 따로 처리하는 방법도 가능하긴 하지만 여기선 만능 일꾼을 활용해 보자.
여하튼 중요한 것은 스레드를 어떻게 활용하느냐이다.
Dispatch()
를 통해 모든 것을 처리하게 할 수도 있고, 위처럼 용도를 구분 지을 수도 있는 것이다.
그렇다면 이제 서버의 구동 시 호출할 부분은 아래와 같이 Dispatch()
가 아니라 DoWorkerJob()
이 된다.
for (int32 i = 0; i < 5; i++)
{
GThreadManager->Launch([&service]()
{
DoWorkerJob(service);
});
}
// Main Thread
DoWorkerJob(service);
메인 스레드도 지금은 같은 걸 돌리게 하자.
일단 이렇게 하나의 스레드만 괴롭히는 현상은 해결했으니
일감이 너무 몰렸을 때를 대비해야 한다.
아래 Execute()
함수를 보자.
틱을 사용해 어느정도 해결할 수 있다.
void JobQueue::Execute()
{
LCurrentJobQueue = this;
while (true)
{
Vector<JobRef> jobs;
_jobs.PopAll(OUT jobs);
const int32 jobCount = static_cast<int32>(jobs.size());
for (int32 i = 0; i < jobCount; i++)
jobs[i]->Execute();
// 남은 일감이 0개라면 종료
if (_jobCount.fetch_sub(jobCount) == jobCount)
{
LCurrentJobQueue = nullptr;
return;
}
const uint64 now = ::GetTickCount64();
if (now >= LEndTickCount)
{
LCurrentJobQueue = nullptr;
// 여유 있는 다른 쓰레드가 실행하도록 GlobalQueue에 넘긴다
GGlobalQueue->Push(shared_from_this());
break;
}
}
}
Execute()
를 호출하면 즉시 현재 큐의 포인터를 설정해 준다.
그리고 틱을 설정해서 일정 틱 이상이 되도록 일이 끝나지 않았다면 다시 GlobalQueue
에 넣어준다.
- 호출
- 모든 일을 처리했다면 종료
- 만약 끝나지 않았는데 틱이 한계에 다다랐다면,
- 포인터를 밀어주고 남은 일감들을 다시 글로벌 큐에 넣고 종료한다.
위와 같은 흐름으로 너무 일감이 많을 때 다른 쪽으로 넘겨 여유를 확보할 수 있게 됐다.
3. 실행 결과
역시 눈으로 변화점을 알 수는 없다.
4. 생각해 볼 점
JobQueue
를 처리하는 데 있어서 특징정인 부분은 여러 개의 스레드가 동시에 처리하는 것이 아닌,
하나의 스레드가 잡아서 처리하다가 다시 다른 단일 스레드에 넘기는 식의 디자인이었다.
멀티 스레드로 처리하면 더 효율적인 처리가 가능한 것이 아닌가 라는 의문이 드는데,
역시 큐라는 자료구조를 사용하는 이유를 생각해 보면 답이 어느 정도 유추되는 것 같다.
큐는 순서가 중요한 자료구조이다. 들어온 순서대로 처리가 되어야 한다.
잡도 그럴 수 있다. 모든 잡이 독립적으로 움직인다고 생각할 수는 없다.
먼저 들어간 잡의 결과를 통해 뒤에 있는 잡이 활용해야 하는 경우의 수도 생길 수 있다.
무분별한 멀티 스레딩은 큰 문제를 불러올 수 있지 않을까.
'Study > C++ & C#' 카테고리의 다른 글
[C++ / DB] DBConnection (0) | 2023.08.29 |
---|---|
[C++] JobTimer (0) | 2023.08.27 |
[C++] JobQueue (2 / 3) (0) | 2023.08.19 |
[C++] JobQueue (1 / 3) (0) | 2023.08.16 |
[C++/C#] C# 채팅 클라이언트 간보기 (0) | 2023.08.01 |