Skip to content

Commit

Permalink
Fixed implementation of tricky_thread_pool_disp.
Browse files Browse the repository at this point in the history
This implementation respects evt_start and evt_finish demands
and uses a scheme with one leader thread, the handles evt_start and
evt_finish.
  • Loading branch information
eao197 committed Feb 12, 2024
1 parent c714dd1 commit 22f177d
Showing 1 changed file with 133 additions and 13 deletions.
146 changes: 133 additions & 13 deletions dev/tricky_disp_case/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,84 @@
#include <fmt/ostream.h>

// A class of dispatcher intended to process events of a_device_manager_t agent.
class tricky_dispatcher_t
class tricky_dispatcher_t final
: public so_5::disp_binder_t
, public so_5::event_queue_t {

// A kind of std::latch from C++20, but without a fixed number of participant.
// It's something similar to Run-Down Protection from Windows's kernel:
//
// https://learn.microsoft.com/en-us/windows-hardware/drivers/kernel/run-down-protection
class rundown_latch_t {
std::mutex lock_;
std::condition_variable wakeup_cv_;

bool closed_{false};
unsigned attenders_{};

public:
rundown_latch_t() = default;

void acquire() {
std::lock_guard<std::mutex> lock{lock_};
if(closed_)
throw std::runtime_error{"rundown_latch is closed"};
++attenders_;
}

void release() noexcept {
std::lock_guard<std::mutex> lock{lock_};
--attenders_;
if(!attenders_)
wakeup_cv_.notify_all();
}

void wait_then_close() {
std::unique_lock<std::mutex> lock{lock_};
if(attenders_)
{
wakeup_cv_.wait(lock, [this]{ return 0u == attenders_; });
closed_ = true;
}
}
};

// A kind of std::lock_guard, but for rundown_latch_t.
class auto_acquire_release_rundown_latch_t {
rundown_latch_t & room_;

public:
auto_acquire_release_rundown_latch_t(rundown_latch_t & room) : room_{room} {
room_.acquire();
}
~auto_acquire_release_rundown_latch_t() {
room_.release();
}
};

// Type of container for worker threads.
using thread_pool_t = std::vector<std::thread>;

// Channels to be used as event-queues.
so_5::mchain_t start_finish_ch_;
so_5::mchain_t init_reinit_ch_;
so_5::mchain_t other_demands_ch_;

// The pool of worker threads for that dispatcher.
thread_pool_t work_threads_;

// Synchronization objects required for thread management.
//
// This one is for starting worker threads.
// The leader thread should wait while all workers are created.
rundown_latch_t launch_room_;
// This one is for handling evt_start,
// All workers (except the leader) have to wait while evt_start completed.
rundown_latch_t start_room_;
// This on is for handling evt_finish.
// The leader thread has to wait while all workers complete their work.
rundown_latch_t finish_room_;

static const std::type_index init_device_type;
static const std::type_index reinit_device_type;

Expand All @@ -37,7 +101,8 @@ class tricky_dispatcher_t

// Helper method for shutdown and join all threads.
void shutdown_work_threads() noexcept {
// Both channels should be closed first.
// All channels should be closed first.
so_5::close_drop_content(so_5::terminate_if_throws, start_finish_ch_);
so_5::close_drop_content(so_5::terminate_if_throws, init_reinit_ch_);
so_5::close_drop_content(so_5::terminate_if_throws, other_demands_ch_);

Expand All @@ -57,7 +122,14 @@ class tricky_dispatcher_t
unsigned second_type_threads_count) {
work_threads_.reserve(first_type_threads_count + second_type_threads_count);
try {
for(auto i = 0u; i < first_type_threads_count; ++i)
// The leader has to be suspended until all workers will be created.
auto_acquire_release_rundown_latch_t launch_room_changer{launch_room_};

// Start the leader thread first.
work_threads_.emplace_back([this]{ leader_thread_body(); });

// Now we can launch all remaining workers.
for(auto i = 1u; i < first_type_threads_count; ++i)
work_threads_.emplace_back([this]{ first_type_thread_body(); });

for(auto i = 0u; i < second_type_threads_count; ++i)
Expand All @@ -74,8 +146,40 @@ class tricky_dispatcher_t
d.call_handler(so_5::null_current_thread_id());
}

// The body of the leader thread.
void leader_thread_body() {
// We have to wait while all workers are created.
// NOTE: not all of them can start their work actually, but all
// std::thread objects should be created.
launch_room_.wait_then_close();

{
// We have to block all other threads until evt_start will be processed.
auto_acquire_release_rundown_latch_t start_room_changer{start_room_};
// Process evt_start.
so_5::receive(so_5::from(start_finish_ch_).handle_n(1),
exec_demand_handler);
}

// Now the leader can play the role of the first thread type.
first_type_thread_body();

// All worker should finish their work before processing of evt_finish.
finish_room_.wait_then_close();

// Process evt_finish.
so_5::receive(so_5::from(start_finish_ch_).handle_n(1),
exec_demand_handler);
}

// The body for a thread of the first type.
void first_type_thread_body() {
// Processing of evt_finish has to be enabled at the end.
auto_acquire_release_rundown_latch_t finish_room_changer{finish_room_};

// Wait while evt_start is processed.
start_room_.wait_then_close();

// Run until all channels will be closed.
so_5::select(so_5::from_all().handle_all(),
receive_case(init_reinit_ch_, exec_demand_handler),
Expand All @@ -84,26 +188,32 @@ class tricky_dispatcher_t

// The body for a thread of the second type.
void second_type_thread_body() {
// Processing of evt_finish has to be enabled at the end.
auto_acquire_release_rundown_latch_t finish_room_changer{finish_room_};

// Wait while evt_start is processed.
start_room_.wait_then_close();

// Run until all channels will be closed.
so_5::select(so_5::from_all().handle_all(),
receive_case(other_demands_ch_, exec_demand_handler));
}

// Implementation of the methods inherited from disp_binder.
void preallocate_resources(so_5::agent_t & /*agent*/) override {
// There is no need to do something.
// Nothing to do.
}

void undo_preallocation(so_5::agent_t & /*agent*/) noexcept override {
// There is no need to do something.
// Nothing to do.
}

void bind(so_5::agent_t & agent) noexcept override {
agent.so_bind_to_dispatcher(*this);
}

void unbind(so_5::agent_t & /*agent*/) noexcept override {
// There is no need to do something.
// Nothing to do.
}

// Implementation of the methods inherited from event_queue.
Expand All @@ -120,15 +230,18 @@ class tricky_dispatcher_t
}

void push_evt_start(so_5::execution_demand_t demand) override {
// There is no need to check the type of the demand.
so_5::send<so_5::execution_demand_t>(other_demands_ch_, std::move(demand));
so_5::send<so_5::execution_demand_t>(start_finish_ch_, std::move(demand));
}

// NOTE: don't care about exception, if the demand can't be stored
// into the queue the application has to be aborted anyway.
void push_evt_finish(so_5::execution_demand_t demand) noexcept override {
// There is no need to check the type of the demand.
so_5::send<so_5::execution_demand_t>(other_demands_ch_, std::move(demand));
// Chains for "ordinary" messages has to be closed.
so_5::close_retain_content(so_5::terminate_if_throws, init_reinit_ch_);
so_5::close_retain_content(so_5::terminate_if_throws, other_demands_ch_);

// Now we can store the evt_finish demand in the special chain.
so_5::send<so_5::execution_demand_t>(start_finish_ch_, std::move(demand));
}

public:
Expand All @@ -138,14 +251,21 @@ class tricky_dispatcher_t
so_5::environment_t & env,
// The size of the thread pool.
unsigned pool_size)
: init_reinit_ch_{so_5::create_mchain(env)}
, other_demands_ch_{so_5::create_mchain(env)} {
: start_finish_ch_{
so_5::create_mchain(env,
2u, // Just evt_start and evt_finish.
so_5::mchain_props::memory_usage_t::preallocated,
so_5::mchain_props::overflow_reaction_t::abort_app)
}
, init_reinit_ch_{so_5::create_mchain(env)}
, other_demands_ch_{so_5::create_mchain(env)}
{
const auto [first_type_count, second_type_count] =
calculate_pools_sizes(pool_size);

launch_work_threads(first_type_count, second_type_count);
}
~tricky_dispatcher_t() noexcept {
~tricky_dispatcher_t() noexcept override {
// All worker threads should be stopped.
shutdown_work_threads();
}
Expand Down

0 comments on commit 22f177d

Please sign in to comment.