Study/C++ & C#

[C++] JobQueue (3 / 3)

BVM 2023. 8. 27. 03:36

지금까지의 코드에선 하나의 스레드에 일감이 몰리는 상황이 발생할 수 있었다.

목표는 이 일감을 최대한 분배해 한 스레드만 괴롭히는 현상을 해소하는 것이다.

여기선 JobQueue를 보관할 GlobalQueue를 만들어 이를 뿌리는 식으로 만들어 본다.

 

1. 클래스 작성

기본적인 기능만 수행할 것이기 때문에 복잡한 일은 없다.

// GlobalQueue.cpp
#pragma once

class GlobalQueue
{
public:
	GlobalQueue();
	~GlobalQueue();

	void					Push(JobQueueRef jobQueue);
	JobQueueRef				Pop();

private:
	LockQueue<JobQueueRef> _jobQueues;
};



// GlobalQueue.h
#include "pch.h"
#include "GlobalQueue.h"

void GlobalQueue::Push(JobQueueRef jobQueue)
{
	_jobQueues.Push(jobQueue);
}

JobQueueRef GlobalQueue::Pop()
{
	return _jobQueues.Pop();
}

 

JobQueueRef를 만들러 가는 김에 매크로를 활용해 기존 코드를 수정했다.

#define USING_SHARED_PTR(name)	using name##Ref = std::shared_ptr<class name>;

USING_SHARED_PTR(IocpCore);
USING_SHARED_PTR(IocpObject);
USING_SHARED_PTR(Session);
USING_SHARED_PTR(PacketSession);
USING_SHARED_PTR(Listener);
USING_SHARED_PTR(ServerService);
USING_SHARED_PTR(ClientService);
USING_SHARED_PTR(SendBuffer);
USING_SHARED_PTR(SendBufferChunk);
USING_SHARED_PTR(Job);
USING_SHARED_PTR(JobQueue);

##을 활용해 Ref 앞에 이름을 붙이도록 했다.

 

다시 돌아와서, 중요한 것은

JobQueue 자체를 갖고 있다가 필요할 때마다 넣고 빼면서 여러 스레드가 일을 처리할 수 있게 하는 개념이다.

 

전역 사용을 위해 몇가지를 추가한다.

// CoreGlobal.h

#pragma once

extern class ThreadManager*		GThreadManager;
extern class Memory*			GMemory;
extern class SendBufferManager* 	GSendBufferManager;
extern class GlobalQueue*		GGlobalQueue;

// ...


// CoreGlobal.cpp
#include "pch.h"
// ...
#include "GlobalQueue.h"

ThreadManager*		GThreadManager = nullptr;
Memory*			GMemory = nullptr;
SendBufferManager*	GSendBufferManager = nullptr;
GlobalQueue*		GGlobalQueue = nullptr;

DeadLockProfiler*	GDeadLockProfiler = nullptr;

class CoreGlobal
{
public:
	CoreGlobal()
	{
		GThreadManager = new ThreadManager();
		GMemory = new Memory();
		GSendBufferManager = new SendBufferManager();
		GGlobalQueue = new GlobalQueue();
		GDeadLockProfiler = new DeadLockProfiler();
		SocketUtils::Init();
	}

	~CoreGlobal()
	{
		delete GThreadManager;
		delete GMemory;
		delete GSendBufferManager;
		delete GGlobalQueue;
		delete GDeadLockProfiler;
		SocketUtils::Clear();
	}
} GCoreGlobal;

 

그리고 TLS 영역에 현재 스레드가 JobQueue를 물고 있는지 확인하기 위한 포인터 변수를 추가한다.

단순히 큐를 물고 있는지 확인만 하는 용도이기 때문에 스마트 포인터가 아닌 일반 포인터로 주소만 갖고 있게 한다.

// CoreTLS.h
#pragma once
#include <stack>

extern thread_local uint32			LThreadId;
extern thread_local uint64			LEndTickCount;

extern thread_local std::stack<int32>		LLockStack;
extern thread_local SendBufferChunkRef		LSendBufferChunk;
extern thread_local class JobQueue*		LCurrentJobQueue;


// CoreTLS.cpp
#include "pch.h"
#include "CoreTLS.h"

thread_local uint32				LThreadId = 0;
thread_local uint64				LEndTickCount = 0;
thread_local std::stack<int32>			LLockStack;
thread_local SendBufferChunkRef			LSendBufferChunk;
thread_local JobQueue*				LCurrentJobQueue = nullptr;

 

 

2. 수정 사항 반영

이제 새로 만든 클래스를 활용하기 위해 기존 코드를 수정해 보자.

JobQueuePush()Execute()를 먼저 고칠 것이다.

 

void JobQueue::Push(JobRef&& job)
{
	const int32 prevCount = _jobCount.fetch_add(1);
	_jobs.Push(job); // WRITE_LOCK

	// 첫번째 Job을 넣은 쓰레드가 실행까지 담당
	if (prevCount == 0)
	{
		// 이미 실행중인 JobQueue가 없으면 실행
		if (LCurrentJobQueue == nullptr)
		{
			Execute();
		}
		else
		{
			// 여유 있는 다른 쓰레드가 실행하도록 GlobalQueue에 넘긴다
			GGlobalQueue->Push(shared_from_this());
		}
	}
}

실행중인 큐가 없다면 실행하고, 있다면 큐를 글로벌 큐에 통으로 넘겨준다.

누군가는 저걸 잡아서 실행하게 될 것이다.

이제 넣어둔 큐를 어떻게 꺼내서 처리할 것인지에 대한 고민이 생긴다.

일단 다음과 같은 방법을 생각해 볼 수 있다.

 

ThreadManager 클래스에 아래와 같은 함수를 추가한다.

// ...
#include "GlobalQueue.h"

void ThreadManager::DoGlobalQueueWork()
{
	while (true)
	{
		uint64 now = ::GetTickCount64();
		if (now > LEndTickCount)
			break;

		JobQueueRef jobQueue = GGlobalQueue->Pop();
		if (jobQueue == nullptr)
			break;

		jobQueue->Execute();
	}
}

특정 조건을 만족하지 않는 한 계속 돌며 일을 처리하는 함수이다.

제한 틱이 되지 않았다면 큐를 꺼내 처리하는 것을 계속 반복한다.

이 함수를 호출할 누군가도 필요하게 되는데, 서버로 가서 생각해 보자.

 

서버는 계속 코어를 확인하며 일감을 기다린다.

Dispatch()를 통해 일감을 잡으면 대체로 패킷 핸들러 등의 게임 로직까지 실행되게 되는 구조다.

다시 말하자면 네트워크 처리를 함과 동시에 그 스레드가 게임 로직까지 담당하게 되므로,

실제 라이브 서버에선 문제가 될 소지가 있다는 것이다.

 

따라서 서버에 아래와 같은 함수를 추가한다.

enum
{
	WORKER_TICK = 64
};

void DoWorkerJob(ServerServiceRef& service)
{
	while (true)
	{
		LEndTickCount = ::GetTickCount64() + WORKER_TICK;

		// 네트워크 입출력 처리 -> 인게임 로직까지 (패킷 핸들러에 의해)
		service->GetIocpCore()->Dispatch(10);

		// 글로벌 큐
		ThreadManager::DoGlobalQueueWork();
	}
}

겸사겸사 틱도 설정해 준다.

그리고 Dispatch()10ms의 타임아웃을 주기로 했다.

타임아웃을 주지 않은 상태에서 일감이 없다면, 빠져나오는 것이 아닌 아무 일도 하지 않는 상태가 된다.

따라서 10ms 동안 일이 없다면 빠져나오게 만든다.

빠져나왔다면 DoGlobalQueueWork()를 호출하게 한다.

 

이는 네트워크는 물론 다른 일도 처리할 수 있는 만능 일꾼이 하나 있는 것과 같다.

네트워크 따로 내부 잡 따로 처리하는 방법도 가능하긴 하지만 여기선 만능 일꾼을 활용해 보자.

 

여하튼 중요한 것은 스레드를 어떻게 활용하느냐이다.

Dispatch()를 통해 모든 것을 처리하게 할 수도 있고, 위처럼 용도를 구분 지을 수도 있는 것이다.

 

그렇다면 이제 서버의 구동 시 호출할 부분은 아래와 같이 Dispatch()가 아니라 DoWorkerJob()이 된다.

for (int32 i = 0; i < 5; i++)
{
	GThreadManager->Launch([&service]()
		{
			DoWorkerJob(service);
		});
}

// Main Thread
DoWorkerJob(service);

메인 스레드도 지금은 같은 걸 돌리게 하자.

 

 

일단 이렇게 하나의 스레드만 괴롭히는 현상은 해결했으니

일감이 너무 몰렸을 때를 대비해야 한다.

 

아래 Execute() 함수를 보자.

틱을 사용해 어느정도 해결할 수 있다.

void JobQueue::Execute()
{
	LCurrentJobQueue = this;

	while (true)
	{
		Vector<JobRef> jobs;
		_jobs.PopAll(OUT jobs);

		const int32 jobCount = static_cast<int32>(jobs.size());
		for (int32 i = 0; i < jobCount; i++)
			jobs[i]->Execute();

		// 남은 일감이 0개라면 종료
		if (_jobCount.fetch_sub(jobCount) == jobCount)
		{
			LCurrentJobQueue = nullptr;
			return;
		}

		const uint64 now = ::GetTickCount64();
		if (now >= LEndTickCount)
		{
			LCurrentJobQueue = nullptr;
			// 여유 있는 다른 쓰레드가 실행하도록 GlobalQueue에 넘긴다
			GGlobalQueue->Push(shared_from_this());
			break;
		}			
	}
}

Execute()를 호출하면 즉시 현재 큐의 포인터를 설정해 준다.

그리고 틱을 설정해서 일정 틱 이상이 되도록 일이 끝나지 않았다면 다시 GlobalQueue에 넣어준다.

 

  1. 호출
  2. 모든 일을 처리했다면 종료
  3. 만약 끝나지 않았는데 틱이 한계에 다다랐다면,
  4. 포인터를 밀어주고 남은 일감들을 다시 글로벌 큐에 넣고 종료한다.

위와 같은 흐름으로 너무 일감이 많을 때 다른 쪽으로 넘겨 여유를 확보할 수 있게 됐다.

 

3. 실행 결과

 

역시 눈으로 변화점을 알 수는 없다.

 

 

4. 생각해 볼 점

JobQueue를 처리하는 데 있어서 특징정인 부분은 여러 개의 스레드가 동시에 처리하는 것이 아닌,

하나의 스레드가 잡아서 처리하다가 다시 다른 단일 스레드에 넘기는 식의 디자인이었다.

멀티 스레드로 처리하면 더 효율적인 처리가 가능한 것이 아닌가 라는 의문이 드는데,

역시 큐라는 자료구조를 사용하는 이유를 생각해 보면 답이 어느 정도 유추되는 것 같다.

 

큐는 순서가 중요한 자료구조이다. 들어온 순서대로 처리가 되어야 한다.

잡도 그럴 수 있다. 모든 잡이 독립적으로 움직인다고 생각할 수는 없다.

먼저 들어간 잡의 결과를 통해 뒤에 있는 잡이 활용해야 하는 경우의 수도 생길 수 있다.

무분별한 멀티 스레딩은 큰 문제를 불러올 수 있지 않을까.