Skip to content

Commit

Permalink
improve: async dispatcher now use asio / cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
mehah authored Nov 23, 2023
1 parent 706c19a commit 89e031a
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 109 deletions.
15 changes: 6 additions & 9 deletions src/client/lightview.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,13 @@
#include "spritemanager.h"

#include <framework/core/eventdispatcher.h>
#include <framework/core/asyncdispatcher.h>
#include <framework/graphics/drawpoolmanager.h>

LightView::LightView(const Size& size, const uint16_t tileSize) : m_pool(g_drawPool.get(DrawPoolType::LIGHT)) {
g_mainDispatcher.addEvent([this, size] {
m_texture = std::make_shared<Texture>(size);
m_texture->setSmooth(true);

m_thread = std::thread([this]() {
std::unique_lock lock(m_pool->getMutex());
m_condition.wait(lock, [this]() -> bool {
updatePixels();
return m_texture == nullptr;
});
});
});

g_drawPool.use(DrawPoolType::LIGHT);
Expand Down Expand Up @@ -119,7 +112,9 @@ void LightView::draw(const Rect& dest, const Rect& src)

std::scoped_lock l(m_pool->getMutex());
if (++m_currentLightData > 1) m_currentLightData = 0;
m_condition.notify_one();
g_asyncDispatcher.dispatch([this] {
updatePixels();
});
}

auto& lightData = m_lightData[m_currentLightData];
Expand All @@ -144,6 +139,8 @@ void LightView::updateCoords(const Rect& dest, const Rect& src) {
}

void LightView::updatePixels() {
std::scoped_lock l(m_pool->getMutex());

const auto& lightData = m_lightData[m_currentLightData ? 0 : 1];

const size_t lightSize = lightData.lights.size();
Expand Down
8 changes: 1 addition & 7 deletions src/client/lightview.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,12 @@
#include <framework/graphics/framebuffer.h>
#include "declarations.h"
#include "thingtype.h"
#include <thread>
#include <condition_variable>

class LightView : public LuaObject
{
public:
LightView(const Size& size, const uint16_t tileSize);
~LightView() { m_texture = nullptr; m_condition.notify_one(); m_thread.join(); }
~LightView() { m_texture = nullptr; }

void resize(const Size& size, uint16_t tileSize);
void draw(const Rect& dest, const Rect& src);
Expand Down Expand Up @@ -85,9 +83,5 @@ class LightView : public LuaObject
TexturePtr m_texture;
LightData m_lightData[2];
std::atomic_uint8_t m_currentLightData{ 0 };

std::thread m_thread;
std::condition_variable m_condition;

std::vector<uint8_t> m_pixels;
};
55 changes: 14 additions & 41 deletions src/framework/core/asyncdispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,61 +20,34 @@
* THE SOFTWARE.
*/

#include <algorithm>

#include "asyncdispatcher.h"

AsyncDispatcher g_asyncDispatcher;

void AsyncDispatcher::init(uint8_t maxThreads)
{
if (maxThreads != 0)
m_maxThreads = maxThreads;
if (maxThreads == 0)
maxThreads = 6;

// -2 = Main Thread and Map Thread
int_fast8_t threads = std::clamp<int_fast8_t>(std::thread::hardware_concurrency() - 2, 1, m_maxThreads);
int_fast8_t threads = std::clamp<int_fast8_t>(std::thread::hardware_concurrency() - 2, 1, maxThreads);
for (; --threads >= 0;)
spawn_thread();
m_threads.emplace_back([this] { m_ioService.run(); });
}

void AsyncDispatcher::terminate()
{
stop();
m_tasks.clear();
}

void AsyncDispatcher::spawn_thread()
{
m_running = true;
m_threads.emplace_back([this] { exec_loop(); });
}
void AsyncDispatcher::terminate() { stop(); }

void AsyncDispatcher::stop()
{
m_mutex.lock();
m_running = false;
m_condition.notify_all();
m_mutex.unlock();
for (std::thread& thread : m_threads)
thread.join();
m_threads.clear();
};

void AsyncDispatcher::exec_loop()
{
std::unique_lock lock(m_mutex);
while (true) {
while (m_tasks.empty() && m_running)
m_condition.wait(lock);

if (!m_running)
return;
if (m_ioService.stopped()) {
return;
}

std::function<void()> task = m_tasks.front();
m_tasks.pop_front();
m_ioService.stop();

lock.unlock();
task();
lock.lock();
for (std::size_t i = 0; i < m_threads.size(); i++) {
if (m_threads[i].joinable()) {
m_threads[i].join();
}
}
}
};
30 changes: 11 additions & 19 deletions src/framework/core/asyncdispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,49 +22,41 @@

#pragma once

#include <future>
#include <list>
#include <asio.hpp>
#include <thread>

class AsyncDispatcher
{
public:
void init(uint8_t maxThreads = 0);
void terminate();

void spawn_thread();
void stop();

template<class F>
std::shared_future<std::invoke_result_t<F>> schedule(const F& task)
{
std::scoped_lock lock(m_mutex);
const auto& prom = std::make_shared<std::promise<std::invoke_result_t<F>>>();
m_tasks.push_back([=] { prom->set_value(task()); });
m_condition.notify_all();
dispatch([=] { prom->set_value(task()); });
return std::shared_future<std::invoke_result_t<F>>(prom->get_future());
}

void dispatch(const std::function<void()>& f)
void dispatch(std::function<void()>&& f)
{
std::scoped_lock lock(m_mutex);
m_tasks.push_back(f);
m_condition.notify_all();
asio::post(m_ioService, [this, f = std::move(f)]() {
if (!m_ioService.stopped())
f();
});
}

inline auto getNumberOfThreads() const {
return m_threads.size();
}

protected:
void exec_loop();

private:
std::list<std::function<void()>> m_tasks;
std::list<std::thread> m_threads;
std::mutex m_mutex;
std::condition_variable m_condition;
bool m_running{ false };
uint8_t m_maxThreads{ 6 };
asio::io_context m_ioService;
std::vector<std::thread> m_threads;
asio::io_context::work m_work{ m_ioService };
};

extern AsyncDispatcher g_asyncDispatcher;
50 changes: 17 additions & 33 deletions src/framework/core/graphicalapplication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,6 @@ void GraphicalApplication::run()
const auto& txt = g_drawPool.get(DrawPoolType::TEXT);
const auto& map = g_drawPool.get(DrawPoolType::MAP);

std::condition_variable foreCondition, txtCondition;

// clang c++20 dont support jthread
std::thread t1([&]() {
g_eventThreadId = std::this_thread::get_id();
Expand All @@ -157,15 +155,28 @@ void GraphicalApplication::run()
continue;
}*/

if (foreground->canRepaint())
foreCondition.notify_one();
if (foreground->canRepaint()) {
g_asyncDispatcher.dispatch([this, &foreground] {
std::scoped_lock l(foreground->getMutex());
g_ui.render(DrawPoolType::FOREGROUND);
});
}

if (g_game.isOnline()) {
if (!g_ui.m_mapWidget)
g_ui.m_mapWidget = g_ui.getRootWidget()->recursiveGetChildById("gameMapPanel")->static_self_cast<UIMap>();

if (txt->canRepaint() || foreground_tile->canRepaint())
txtCondition.notify_one();
if (txt->canRepaint() || foreground_tile->canRepaint()) {
g_asyncDispatcher.dispatch([this, &txt] {
std::scoped_lock l(txt->getMutex());
g_textDispatcher.poll();

if (g_ui.m_mapWidget) {
g_ui.m_mapWidget->drawSelf(DrawPoolType::TEXT);
g_ui.m_mapWidget->drawSelf(DrawPoolType::FOREGROUND_TILE);
}
});
}

{
std::scoped_lock l(map->getMutex());
Expand All @@ -175,31 +186,6 @@ void GraphicalApplication::run()

stdext::millisleep(1);
}

foreCondition.notify_one();
txtCondition.notify_one();
});

std::thread t2([&]() {
std::unique_lock lock(foreground->getMutex());
foreCondition.wait(lock, [&]() -> bool {
g_ui.render(DrawPoolType::FOREGROUND);
return m_stopping;
});
});

std::thread t3([&]() {
std::unique_lock lock(txt->getMutex());
txtCondition.wait(lock, [&]() -> bool {
g_textDispatcher.poll();

if (g_ui.m_mapWidget) {
g_ui.m_mapWidget->drawSelf(DrawPoolType::TEXT);
g_ui.m_mapWidget->drawSelf(DrawPoolType::FOREGROUND_TILE);
}

return m_stopping;
});
});

m_running = true;
Expand All @@ -219,8 +205,6 @@ void GraphicalApplication::run()
}

t1.join();
t2.join();
t3.join();

m_stopping = false;
m_running = false;
Expand Down

0 comments on commit 89e031a

Please sign in to comment.