[C++] JobQueue (2 / 3)

2023. 8. 19. 01:31·Study/C++ & C#

이전 시간에 Functor를 활용해 봤으니, 이번엔 Lambda를 활용해 볼 수도 있을 것이다.

1. Lambda

이전엔 함수에 인자를 넘겨주기 위해 tuple을 사용했는데, 람다식에선 그럴 필요가 없다.

아래의 람다식 예시 코드를 보자.

void lmbd(int32 a, int32 b)
{
    cout << "Hello Lambda " << a << " " << b << endl;
}

PlayerRef player = make_shared<Player>();

std::function<void()> func = [=]()
    {
        lmbd(1, 2);
        GRoom->Enter(player);
    };

func();

튜플을 사용하지 않고도 특정 함수에 인자를 넘겨줄 수 있다.

이것이 가능한 이유는 람다의 동작 방식에서 찾을 수 있다.

 

런타임에선 람다란 존재하지 않는다. 대신 람다식에 의해 생성된 Closures라는 임시 객체가 존재한다.

이 클로저라는 임시 클래스가 캡처된 데이터들을 임시로 갖고 있게 되는 것이다.

아래와 같은 임시 클래스가 생기는 것과 유사하다.

class closure
{
public:
    //...

private:
    int32 a;
    int32 b;
    PlayerRef player;
};

썩 괜찮은 기능이다.

그렇다면 이전처럼 Job을 힘들게 만들 필요 없이, std::function을 Job으로 인정할 수 있다면 더 편할 것 같다.

 

1-1. 캡처와 C++과의 궁합

캡처 옵션으로 외부 데이터를 복사 또는 참조할 수 있는 \= 및 &를 사용할 수 있다.

캡처 옵션에 &player를 넣는 것처럼, 일부 데이터만 복사 또는 참조할 수 있다.

 

하지만 이는 C++ 에선 문제가 생길 수도 있다.

궁합에 관한 얘기가 이것 때문에 나왔다.

단순히 참조만 하는 것이라면 위에 예시를 든 것처럼 임시 객체가 생성되는데,

스마트 포인터가 아니므로 카운트 관리가 되지 않는다.

만약 player가 사라져 버린다면 엉뚱한 메모리를 참조하는 사고가 생길 것이다.

 

Job은 만들고 바로 사용하기 위해 존재하지 않기 때문에 생명주기 관리가 필수적이다.

잡이 살아있을 때 까진 참조한 객체들이 살아있어야 한다.

어떤 클래스를 예로 들어보자.

class chara
{
public:
    void Heal(int32 val)
    {
        _hp += val;
    }

    void Test(int32 val)
    {
        auto job = [=]()
            {
                Heal(_hp);
            };
    }

private:
    int32 _hp = 100;
};

캡처 모드가 복사이므로 싸그리 복사된다.

하지만 이 코드는 복사가 되는게 아니라 this포인터를 넘겨주는 것과 동일한 동작을 한다.

void Test(int32 val)
{
    auto job = [this]()
        {
            Heal(this->_hp);
        };
}

위처럼 동작하게 되는 것이기 때문에, 만약 chara클래스가 소멸한다면 큰 문제가 생기게 되는 것이다.

만약 chara클래스가 share pointer로 사용된다고 했을 때, this포인터를 사용한 상황이라면

어떤 카운팅도 이루어지지 않기 때문에 더 문제다.

이걸 shared pointer를 사용하는 형식으로 바꾼다면 아래와 같다.

class chara : enable_shared_from_this<chara>
{
public:
    void Heal(int32 val)
    {
        _hp += val;
    }

    void Test(int32 val)
    {
        auto job = [self = shared_from_this()]()
            {
                self->Heal(self->_hp);
            };
    }

private:
    int32 _hp = 100;
};

위와 같이 사용한다면 카운팅을 통해 생명주기를 관리할 수 있다.

2. Lambda를 활용한 Job

이제 람다식을 활용한 Job을 구현해 보자.

#pragma once
#include <functional>

using CallbackType = std::function<void()>;

class Job
{
public:
    // 바로 콜백을 넘겨줄 때 사용
    // 스마트 포인터 관리가 번거로울 수 있음
    Job(CallbackType&& callback) : _callback(std::move(callback))
    {
    }

    template<typename T, typename Ret, typename... Args>
    // 인자로 lvalue 및 rvalue를 다 받을 수 있게 하기 위해 보편참조 사용(Args&&...)
    Job(shared_ptr<T> owner, Ret(T::* memFunc)(Args...), Args&&... args)
    {
        _callback = [owner, memFunc, args...]()
        {
            // shared pointer의 멤버 함수 호출을 수행하기 위해
            // get()으로 내부 원시 포인터를 얻어와야 한다.
            (owner.get()->*memFunc)(args...);
        };
    }

    void Execute()
    {
        _callback();
    }

private:
    CallbackType _callback;
};

각 변수 및 함수는 아래와 같은 역할을 한다

  • CallbackType
    • 콜백 함수를 갖고 있다. 람다식으로 만든 콜백 함수를 갖게 된다.
  • Job(CallbackType&& callback)
    • 콜백 함수를 직접적으로 넘겨받는 생성자이다.
  • Job(shared_ptr owner, Ret(T::* memFunc)(Args...), Args&&... args)
    • 템플릿을 사용해 좀 더 일반적으로 사용할 수 있는 버전의 생성자.
    • 보편참조를 통해 모든 값을 받을 수 있게 했다.
  • Execute()
    • 콜백 함수를 실제로 실행하는 멤버 함수.

중요한 점은 통으로 복사나 참조가 아닌, 필요한 변수만 캡처해와야 한다는 점이다.

그리고 이를 더 편하게 사용하기 위한 JobSerializer라는 클래스를 만들 것이다.

 

기존의 Room 구현처럼 일일이 Push와 Flush를 구현하는 것은 효율적이지 못하다.

JobQueue를 사용하는 모든 클래스는 앞으로 JobSerializer를 상속받게 해 더 효율적인 처리가 가능하게 될 것이다.

#pragma once
#include "Job.h"
#include "JobQueue.h"

class JobSerializer : public enable_shared_from_this<JobSerializer>
{
public:
    void PushJob(CallbackType&& callback)
    {
        auto job = ObjectPool<Job>::MakeShared(std::move(callback));
        _jobQueue.Push(job);
    }

    template<typename T, typename Ret, typename... Args>
    void PushJob(Ret(T::*memFunc)(Args...), Args... args)
    {
        shared_ptr<T> owner = static_pointer_cast<T>(shared_from_this());
        auto job = ObjectPool<Job>::MakeShared(owner, memFunc, std::forward<Args>(args)...);
        _jobQueue.Push(job);
    }

    virtual void FlushJob() abstract;

protected:
    JobQueue _jobQueue;
};

Push 하는 부분을 Job클래스를 생성해 넘겨주는 형태로 구현했다.

FlushJob은 추상함수의 형태로 만들어 자식 클래스에서 구현하도록 한다.

JobQueue를 사용하는 클래스인 Room클래스가 위 클래스를 상속받도록 수정한다.

#pragma once
#include "JobSerializer.h"

class Room : public JobSerializer
{
public
    void Enter(PlayerRef player);
    void Leave(PlayerRef player);
    void Broadcast(SendBufferRef sendBuffer);

public:
    virtual void FlushJob() override;

private:
    map<uint64, PlayerRef> _players;
};

extern shared_ptr<Room> GRoom;

FlushJob만 수해주면 된다.

GRoom도 전역으로 선언해 다른 곳에서 갖다 쓸 수 있도록 했다.

void Room::FlushJob()
{
    while (true)
    {
        JobRef job = _jobQueue.Pop();
        if (job == nullptr)
            break;

        job->Execute();
    }
}

_jobQueue로만 바꾸면 오케이.

이제 이를 사용하기 위해 빌드하며 수정한다.

bool Handle_C_CHAT(PacketSessionRef& session, Protocol::C_CHAT& pkt)
{
    std::cout << pkt.msg() << endl;

    Protocol::S_CHAT chatPkt;
    chatPkt.set_msg(pkt.msg());
    auto sendBuffer = ClientPacketHandler::MakeSendBuffer(chatPkt);

    GRoom->PushJob(&Room::Broadcast, sendBuffer);

    return true;
}

사용은 위와 같은 형태로 하게 된다.

 

3. 실행 결과

변화를 체감할 순 없지만 잘 동작하니 다행이다.

 

A. 이걸 개선한다면

사실 위 코드는 스마트 포인터의 사이클 문제에서 자유롭지 못하다.

잡을 실행할 FlushJob()도 따로 있는 것이 별로인 것 같다.

잡이 수십만 ~ 수백만개 있다고 했을 때, 지금처럼 메인 스레드에서 이걸 다 부담하는 것은 무리일 것이다.

while안에 있기 때문에 처리할 잡이 없더라도 돌아가는 것이 효율적이지 못한 것 같다.

만약 잡을 Push할 때 같이 실행도 할 수 있다면 괜찮을지도 모른다.

 

A-1. JobQueue 개선

기존에 구현한 Queue 클래스는 보다 더 일반적인 상황에서도 사용할 수 있을 것 같다.

좀 더 범용적인 느낌이 들도록 개조해 보자.

#pragma once

template<typename T>
class LockQueue
{
public:
    void Push(T item)
    {
        WRITE_LOCK;
        _items.push(item);
    }

    T Pop()
    {
        WRITE_LOCK;
        if (_items.empty())
            return T();

        T ret = _items.front();
        _items.pop();
        return ret;
    }

    void PopAll(OUT Vector<T>& items)
    {
        WRITE_LOCK;
        while (T item = Pop())
            items.push_back(item);
    }

    void Clear()
    {
        WRITE_LOCK;
        _items = Queue<T>();
    }

private:
    USE_LOCK;
    Queue<T> _items;
};

템플릿을 사용하게 변경했고, 큐 이름도 _items로 바꿨다.

모든 원소를 한 번에 꺼낼 수 있는 PopAll()과 큐를 비울 Clear()도 추가했다.

이름이 바뀐 것들을 나머지 코드에 반영해 준다.

 

그리고 JobQueue라는 이름을 JobSerializer가 계승하도록 한다. 이름이 짧아질수록 더 좋을 것 같다.

이제 필요한 부분은 FlushJob()을 어떻게 하느냐다.

따로 두는 부분이 맘에 들지 않았으므로, 일을 실행하는 부분을 Push()에 넣을 수도 있을 것이다.

 

class JobQueue : public enable_shared_from_this<JobQueue>
{
public:
    void DoAsync(CallbackType&& callback)
    {
        Push(ObjectPool<Job>::MakeShared(std::move(callback)));
    }

    template<typename T, typename Ret, typename... Args>
    void DoAsync(Ret(T::*memFunc)(Args...), Args... args)
    {
        shared_ptr<T> owner = static_pointer_cast<T>(shared_from_this());
        Push(ObjectPool<Job>::MakeShared(owner, memFunc, std::forward<Args>(args)...));
    }

    void                ClearJobs() { _jobs.Clear(); }

private:
    void                Push(JobRef&& job);
    void                Execute();

protected:
    LockQueue<JobRef>    _jobs;

private로 Push와 Execute를 두고 public 함수에서 꺼내쓰도록 했다.

함수에 대한 구현은 아래와 같다.

#include "pch.h"
#include "JobQueue.h"

void JobQueue::Push(JobRef&& job)
{
    const int32 prevCount = _jobCount.fetch_add(1);
    _jobs.Push(job); // WRITE_LOCK

    // 첫번째 Job을 넣은 스레드가 실행까지 담당
    if (prevCount == 0)
    {
        Execute();
    }
}

void JobQueue::Execute()
{
    while (true)
    {
        Vector<JobRef> jobs;
        _jobs.PopAll(OUT jobs);

        const int32 jobCount = static_cast<int32>(jobs.size());
        for (int32 i = 0; i < jobCount; i++)
            jobs[i]->Execute();

        // 남은 일감이 0개라면 종료
        if (_jobCount.fetch_sub(jobCount) == jobCount)
        {
            return;
        }
    }
}

fetch_add()는 해당 Atomic 변수에 N을 더하고 더하기 전의 값을 리턴한다.

리턴값이 0이라면 첫 일감이란 소리므로 바로 해당 스레드가 실행까지 담당한다.

실행 중 남은 Job이 0개라면 종료하도록 해 리소스 낭비를 막는다.

 

# 중요한 점

const int32 prevCount = _jobCount.fetch_add(1);
_jobs.Push(job); // WRITE_LOCK

이 부분은 순서가 뒤바뀌면 안 된다.

먼저 카운트를 증가시킨 후 Job을 Push 해야 한다.

어떤 잡이 처리되는 와중에도 다른 스레드에서는 잡을 Push 한다.

실행 중 잡이 더 생기더라도 카운트를 미리 늘려놨으므로 새로 추가된 잡도 정상적으로 처리된다.

만약 Push가 먼저 된다면 아래와 같은 일이 생길 수 있다.

  1. Execute() 가 10개의 잡을 실행 중
  2. 새로운 잡이 Push 됨
    • 하지만 아직 카운트 추가는 실행되지 않음
  3. jobCount는 11이 됐지만 _jobCount는 여전히 10인 상황
  4. 10에서 11을 빼게 되니 -1가 되어 Execute()가 정상적으로 종료되지 못하는 상황 발생

위와 같은 상황을 방지하기 위해서라도 순서를 잘 지키는 것이 좋다.

이제 빌드하며 고칠 부분은 고치고 FlushJob()을 사용하던 부분도 지우자.

 

A-2. 실행 결과

여전히 동작은 달라진 게 없지만, 똑같이 나와준다는 것은 수정 사항에 당장의 큰 문제는 없다는 뜻이겠다.

 

A-3. 생각해 볼 점

만약 위와 같은 코드가 라이브 서버에 올라갔다고 가정하자.

라이브 환경에선 수많은 Job이 발생할 것인데, 위 코드의 경우엔 아마 Execute()가 종료되지 않을 것이다.

처음 Job을 Push 하여 Execute()를 실행하게 된 스레드가 하나가 모든 일을 다 처리해야 하게 될 수도 있다.

종료될 새도 없이 새로운 잡이 계속 생겨서 무한히 일 하게 될 것이 자명하다.

 

위 경우는 각 Job의 Execute()를 타고 들어간 곳에 DoAsync()가 있어서 이게 반복이 되다 보며

다른 스레드의 일감도 다 건드리게 되는 상황에서 발생할 수 있다.

타고 타고 실행되다 보니 완료는 요원할 수밖에 없다.

 

이 문제는 다음에 다뤄볼 수 있을 것이다.

'Study > C++ & C#' 카테고리의 다른 글

[C++] JobTimer  (0) 2023.08.27
[C++] JobQueue (3 / 3)  (0) 2023.08.27
[C++] JobQueue (1 / 3)  (0) 2023.08.16
[C++/C#] C# 채팅 클라이언트 간보기  (0) 2023.08.01
[C++] IOCP를 활용한 채팅 서버 구현  (0) 2023.07.21
'Study/C++ & C#' 카테고리의 다른 글
  • [C++] JobTimer
  • [C++] JobQueue (3 / 3)
  • [C++] JobQueue (1 / 3)
  • [C++/C#] C# 채팅 클라이언트 간보기
BVM
BVM
  • BVM
    E:\
    BVM
  • 전체
    오늘
    어제
    • 분류 전체보기 (168)
      • Thoughts (14)
      • Study (69)
        • Japanese (3)
        • C++ & C# (46)
        • Javascript (3)
        • Python (14)
        • Others (3)
      • Play (1)
        • Battlefield (1)
      • Others (11)
      • Camp (73)
        • T.I.L. (57)
        • Temp (1)
        • Standard (10)
        • Challenge (3)
        • Project (1)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

    • 본 블로그 개설의 목적
  • 인기 글

  • 태그

    c#
    서버
    discord
    암호화
    db
    FF14
    Dalamud
    discord py
    7계층
    네트워크 프로그래밍
    JS
    베데스다
    포인터
    boost
    Selenium
    Server
    로깅
    OSI
    Network
    Python
    C++
    스타필드
    IOCP
    Asio
    bot
    네트워크
    클라우드
    cloudtype
    프로그래머스
    docker
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.2
BVM
[C++] JobQueue (2 / 3)
상단으로

티스토리툴바