이전 시간에 Functor
를 활용해 봤으니, 이번엔 Lambda
를 활용해 볼 수도 있을 것이다.
1. Lambda
이전엔 함수에 인자를 넘겨주기 위해 tuple
을 사용했는데, 람다식에선 그럴 필요가 없다.
아래의 람다식 예시 코드를 보자.
void lmbd(int32 a, int32 b)
{
cout << "Hello Lambda " << a << " " << b << endl;
}
PlayerRef player = make_shared<Player>();
std::function<void()> func = [=]()
{
lmbd(1, 2);
GRoom->Enter(player);
};
func();
튜플을 사용하지 않고도 특정 함수에 인자를 넘겨줄 수 있다.
이것이 가능한 이유는 람다의 동작 방식에서 찾을 수 있다.
런타임에선 람다란 존재하지 않는다. 대신 람다식에 의해 생성된 Closures
라는 임시 객체가 존재한다.
이 클로저라는 임시 클래스가 캡처된 데이터들을 임시로 갖고 있게 되는 것이다.
아래와 같은 임시 클래스가 생기는 것과 유사하다.
class closure
{
public:
//...
private:
int32 a;
int32 b;
PlayerRef player;
};
썩 괜찮은 기능이다.
그렇다면 이전처럼 Job
을 힘들게 만들 필요 없이, std::function
을 Job
으로 인정할 수 있다면 더 편할 것 같다.
1-1. 캡처와 C++과의 궁합
캡처 옵션으로 외부 데이터를 복사 또는 참조할 수 있는 \=
및 &
를 사용할 수 있다.
캡처 옵션에 &player
를 넣는 것처럼, 일부 데이터만 복사 또는 참조할 수 있다.
하지만 이는 C++ 에선 문제가 생길 수도 있다.
궁합에 관한 얘기가 이것 때문에 나왔다.
단순히 참조만 하는 것이라면 위에 예시를 든 것처럼 임시 객체가 생성되는데,
스마트 포인터가 아니므로 카운트 관리가 되지 않는다.
만약 player
가 사라져 버린다면 엉뚱한 메모리를 참조하는 사고가 생길 것이다.
Job
은 만들고 바로 사용하기 위해 존재하지 않기 때문에 생명주기 관리가 필수적이다.
잡이 살아있을 때 까진 참조한 객체들이 살아있어야 한다.
어떤 클래스를 예로 들어보자.
class chara
{
public:
void Heal(int32 val)
{
_hp += val;
}
void Test(int32 val)
{
auto job = [=]()
{
Heal(_hp);
};
}
private:
int32 _hp = 100;
};
캡처 모드가 복사이므로 싸그리 복사된다.
하지만 이 코드는 복사가 되는게 아니라 this
포인터를 넘겨주는 것과 동일한 동작을 한다.
void Test(int32 val)
{
auto job = [this]()
{
Heal(this->_hp);
};
}
위처럼 동작하게 되는 것이기 때문에, 만약 chara
클래스가 소멸한다면 큰 문제가 생기게 되는 것이다.
만약 chara
클래스가 share pointer
로 사용된다고 했을 때, this
포인터를 사용한 상황이라면
어떤 카운팅도 이루어지지 않기 때문에 더 문제다.
이걸 shared pointer
를 사용하는 형식으로 바꾼다면 아래와 같다.
class chara : enable_shared_from_this<chara>
{
public:
void Heal(int32 val)
{
_hp += val;
}
void Test(int32 val)
{
auto job = [self = shared_from_this()]()
{
self->Heal(self->_hp);
};
}
private:
int32 _hp = 100;
};
위와 같이 사용한다면 카운팅을 통해 생명주기를 관리할 수 있다.
2. Lambda를 활용한 Job
이제 람다식을 활용한 Job을 구현해 보자.
#pragma once
#include <functional>
using CallbackType = std::function<void()>;
class Job
{
public:
// 바로 콜백을 넘겨줄 때 사용
// 스마트 포인터 관리가 번거로울 수 있음
Job(CallbackType&& callback) : _callback(std::move(callback))
{
}
template<typename T, typename Ret, typename... Args>
// 인자로 lvalue 및 rvalue를 다 받을 수 있게 하기 위해 보편참조 사용(Args&&...)
Job(shared_ptr<T> owner, Ret(T::* memFunc)(Args...), Args&&... args)
{
_callback = [owner, memFunc, args...]()
{
// shared pointer의 멤버 함수 호출을 수행하기 위해
// get()으로 내부 원시 포인터를 얻어와야 한다.
(owner.get()->*memFunc)(args...);
};
}
void Execute()
{
_callback();
}
private:
CallbackType _callback;
};
각 변수 및 함수는 아래와 같은 역할을 한다
- CallbackType
- 콜백 함수를 갖고 있다. 람다식으로 만든 콜백 함수를 갖게 된다.
- Job(CallbackType&& callback)
- 콜백 함수를 직접적으로 넘겨받는 생성자이다.
- Job(shared_ptr owner, Ret(T::* memFunc)(Args...), Args&&... args)
- 템플릿을 사용해 좀 더 일반적으로 사용할 수 있는 버전의 생성자.
- 보편참조를 통해 모든 값을 받을 수 있게 했다.
- Execute()
- 콜백 함수를 실제로 실행하는 멤버 함수.
중요한 점은 통으로 복사나 참조가 아닌, 필요한 변수만 캡처해와야 한다는 점이다.
그리고 이를 더 편하게 사용하기 위한 JobSerializer
라는 클래스를 만들 것이다.
기존의 Room
구현처럼 일일이 Push
와 Flush
를 구현하는 것은 효율적이지 못하다.
JobQueue
를 사용하는 모든 클래스는 앞으로 JobSerializer
를 상속받게 해 더 효율적인 처리가 가능하게 될 것이다.
#pragma once
#include "Job.h"
#include "JobQueue.h"
class JobSerializer : public enable_shared_from_this<JobSerializer>
{
public:
void PushJob(CallbackType&& callback)
{
auto job = ObjectPool<Job>::MakeShared(std::move(callback));
_jobQueue.Push(job);
}
template<typename T, typename Ret, typename... Args>
void PushJob(Ret(T::*memFunc)(Args...), Args... args)
{
shared_ptr<T> owner = static_pointer_cast<T>(shared_from_this());
auto job = ObjectPool<Job>::MakeShared(owner, memFunc, std::forward<Args>(args)...);
_jobQueue.Push(job);
}
virtual void FlushJob() abstract;
protected:
JobQueue _jobQueue;
};
Push
하는 부분을 Job
클래스를 생성해 넘겨주는 형태로 구현했다.
FlushJob
은 추상함수의 형태로 만들어 자식 클래스에서 구현하도록 한다.
JobQueue
를 사용하는 클래스인 Room
클래스가 위 클래스를 상속받도록 수정한다.
#pragma once
#include "JobSerializer.h"
class Room : public JobSerializer
{
public
void Enter(PlayerRef player);
void Leave(PlayerRef player);
void Broadcast(SendBufferRef sendBuffer);
public:
virtual void FlushJob() override;
private:
map<uint64, PlayerRef> _players;
};
extern shared_ptr<Room> GRoom;
FlushJob
만 수해주면 된다.
GRoom
도 전역으로 선언해 다른 곳에서 갖다 쓸 수 있도록 했다.
void Room::FlushJob()
{
while (true)
{
JobRef job = _jobQueue.Pop();
if (job == nullptr)
break;
job->Execute();
}
}
_jobQueue
로만 바꾸면 오케이.
이제 이를 사용하기 위해 빌드하며 수정한다.
bool Handle_C_CHAT(PacketSessionRef& session, Protocol::C_CHAT& pkt)
{
std::cout << pkt.msg() << endl;
Protocol::S_CHAT chatPkt;
chatPkt.set_msg(pkt.msg());
auto sendBuffer = ClientPacketHandler::MakeSendBuffer(chatPkt);
GRoom->PushJob(&Room::Broadcast, sendBuffer);
return true;
}
사용은 위와 같은 형태로 하게 된다.
3. 실행 결과
변화를 체감할 순 없지만 잘 동작하니 다행이다.
A. 이걸 개선한다면
사실 위 코드는 스마트 포인터의 사이클 문제에서 자유롭지 못하다.
잡을 실행할 FlushJob()
도 따로 있는 것이 별로인 것 같다.
잡이 수십만 ~ 수백만개 있다고 했을 때, 지금처럼 메인 스레드에서 이걸 다 부담하는 것은 무리일 것이다.
while
안에 있기 때문에 처리할 잡이 없더라도 돌아가는 것이 효율적이지 못한 것 같다.
만약 잡을 Push
할 때 같이 실행도 할 수 있다면 괜찮을지도 모른다.
A-1. JobQueue 개선
기존에 구현한 Queue 클래스는 보다 더 일반적인 상황에서도 사용할 수 있을 것 같다.
좀 더 범용적인 느낌이 들도록 개조해 보자.
#pragma once
template<typename T>
class LockQueue
{
public:
void Push(T item)
{
WRITE_LOCK;
_items.push(item);
}
T Pop()
{
WRITE_LOCK;
if (_items.empty())
return T();
T ret = _items.front();
_items.pop();
return ret;
}
void PopAll(OUT Vector<T>& items)
{
WRITE_LOCK;
while (T item = Pop())
items.push_back(item);
}
void Clear()
{
WRITE_LOCK;
_items = Queue<T>();
}
private:
USE_LOCK;
Queue<T> _items;
};
템플릿을 사용하게 변경했고, 큐 이름도 _items
로 바꿨다.
모든 원소를 한 번에 꺼낼 수 있는 PopAll()
과 큐를 비울 Clear()
도 추가했다.
이름이 바뀐 것들을 나머지 코드에 반영해 준다.
그리고 JobQueue
라는 이름을 JobSerializer
가 계승하도록 한다. 이름이 짧아질수록 더 좋을 것 같다.
이제 필요한 부분은 FlushJob()
을 어떻게 하느냐다.
따로 두는 부분이 맘에 들지 않았으므로, 일을 실행하는 부분을 Push()
에 넣을 수도 있을 것이다.
class JobQueue : public enable_shared_from_this<JobQueue>
{
public:
void DoAsync(CallbackType&& callback)
{
Push(ObjectPool<Job>::MakeShared(std::move(callback)));
}
template<typename T, typename Ret, typename... Args>
void DoAsync(Ret(T::*memFunc)(Args...), Args... args)
{
shared_ptr<T> owner = static_pointer_cast<T>(shared_from_this());
Push(ObjectPool<Job>::MakeShared(owner, memFunc, std::forward<Args>(args)...));
}
void ClearJobs() { _jobs.Clear(); }
private:
void Push(JobRef&& job);
void Execute();
protected:
LockQueue<JobRef> _jobs;
private
로 Push
와 Execute
를 두고 public
함수에서 꺼내쓰도록 했다.
함수에 대한 구현은 아래와 같다.
#include "pch.h"
#include "JobQueue.h"
void JobQueue::Push(JobRef&& job)
{
const int32 prevCount = _jobCount.fetch_add(1);
_jobs.Push(job); // WRITE_LOCK
// 첫번째 Job을 넣은 스레드가 실행까지 담당
if (prevCount == 0)
{
Execute();
}
}
void JobQueue::Execute()
{
while (true)
{
Vector<JobRef> jobs;
_jobs.PopAll(OUT jobs);
const int32 jobCount = static_cast<int32>(jobs.size());
for (int32 i = 0; i < jobCount; i++)
jobs[i]->Execute();
// 남은 일감이 0개라면 종료
if (_jobCount.fetch_sub(jobCount) == jobCount)
{
return;
}
}
}
fetch_add
()는 해당 Atomic
변수에 N을 더하고 더하기 전의 값을 리턴한다.
리턴값이 0이라면 첫 일감이란 소리므로 바로 해당 스레드가 실행까지 담당한다.
실행 중 남은 Job
이 0개라면 종료하도록 해 리소스 낭비를 막는다.
# 중요한 점
const int32 prevCount = _jobCount.fetch_add(1);
_jobs.Push(job); // WRITE_LOCK
이 부분은 순서가 뒤바뀌면 안 된다.
먼저 카운트를 증가시킨 후 Job
을 Push
해야 한다.
어떤 잡이 처리되는 와중에도 다른 스레드에서는 잡을 Push
한다.
실행 중 잡이 더 생기더라도 카운트를 미리 늘려놨으므로 새로 추가된 잡도 정상적으로 처리된다.
만약 Push
가 먼저 된다면 아래와 같은 일이 생길 수 있다.
Execute()
가 10개의 잡을 실행 중- 새로운 잡이
Push
됨- 하지만 아직 카운트 추가는 실행되지 않음
jobCount
는 11이 됐지만_jobCount
는 여전히 10인 상황- 10에서 11을 빼게 되니 -1가 되어
Execute()
가 정상적으로 종료되지 못하는 상황 발생
위와 같은 상황을 방지하기 위해서라도 순서를 잘 지키는 것이 좋다.
이제 빌드하며 고칠 부분은 고치고 FlushJob()
을 사용하던 부분도 지우자.
A-2. 실행 결과
여전히 동작은 달라진 게 없지만, 똑같이 나와준다는 것은 수정 사항에 당장의 큰 문제는 없다는 뜻이겠다.
A-3. 생각해 볼 점
만약 위와 같은 코드가 라이브 서버에 올라갔다고 가정하자.
라이브 환경에선 수많은 Job
이 발생할 것인데, 위 코드의 경우엔 아마 Execute()
가 종료되지 않을 것이다.
처음 Job
을 Push 하여
Execute()
를 실행하게 된 스레드가 하나가 모든 일을 다 처리해야 하게 될 수도 있다.
종료될 새도 없이 새로운 잡이 계속 생겨서 무한히 일 하게 될 것이 자명하다.
위 경우는 각 Job
의 Execute()
를 타고 들어간 곳에 DoAsync()
가 있어서 이게 반복이 되다 보며
다른 스레드의 일감도 다 건드리게 되는 상황에서 발생할 수 있다.
타고 타고 실행되다 보니 완료는 요원할 수밖에 없다.
이 문제는 다음에 다뤄볼 수 있을 것이다.
'Study > C++ & C#' 카테고리의 다른 글
[C++] JobTimer (0) | 2023.08.27 |
---|---|
[C++] JobQueue (3 / 3) (0) | 2023.08.27 |
[C++] JobQueue (1 / 3) (0) | 2023.08.16 |
[C++/C#] C# 채팅 클라이언트 간보기 (0) | 2023.08.01 |
[C++] IOCP를 활용한 채팅 서버 구현 (0) | 2023.07.21 |