diff --git a/dev/tricky_disp_case/main.cpp b/dev/tricky_disp_case/main.cpp
index 10f5b46..85557a4 100644
--- a/dev/tricky_disp_case/main.cpp
+++ b/dev/tricky_disp_case/main.cpp
@@ -6,20 +6,84 @@
 #include 
 
 // A class of dispatcher intended to process events of a_device_manager_t agent.
-class tricky_dispatcher_t
+class tricky_dispatcher_t final
     : public so_5::disp_binder_t
     , public so_5::event_queue_t
 {
+    // A kind of std::latch from C++20, but without a fixed number of participants.
+    // It's something similar to Run-Down Protection from the Windows kernel:
+    //
+    // https://learn.microsoft.com/en-us/windows-hardware/drivers/kernel/run-down-protection
+    class rundown_latch_t {
+        std::mutex lock_;
+        std::condition_variable wakeup_cv_;
+
+        bool closed_{false};
+        unsigned attenders_{};
+
+    public:
+        rundown_latch_t() = default;
+
+        void acquire() {
+            std::lock_guard lock{lock_};
+            if(closed_)
+                throw std::runtime_error{"rundown_latch is closed"};
+            ++attenders_;
+        }
+
+        void release() noexcept {
+            std::lock_guard lock{lock_};
+            --attenders_;
+            if(!attenders_)
+                wakeup_cv_.notify_all();
+        }
+
+        void wait_then_close() {
+            std::unique_lock lock{lock_};
+            if(attenders_)
+            {
+                wakeup_cv_.wait(lock, [this]{ return 0u == attenders_; });
+                closed_ = true;
+            }
+        }
+    };
+
+    // A kind of std::lock_guard, but for rundown_latch_t.
+    class auto_acquire_release_rundown_latch_t {
+        rundown_latch_t & room_;
+
+    public:
+        auto_acquire_release_rundown_latch_t(rundown_latch_t & room) : room_{room} {
+            room_.acquire();
+        }
+        ~auto_acquire_release_rundown_latch_t() {
+            room_.release();
+        }
+    };
+
     // Type of container for worker threads.
     using thread_pool_t = std::vector<std::thread>;
 
     // Channels to be used as event-queues.
+    so_5::mchain_t start_finish_ch_;
     so_5::mchain_t init_reinit_ch_;
     so_5::mchain_t other_demands_ch_;
 
     // The pool of worker threads for that dispatcher.
     thread_pool_t work_threads_;
 
+    // Synchronization objects required for thread management.
+    //
+    // This one is for starting worker threads.
+    // The leader thread should wait until all workers are created.
+    rundown_latch_t launch_room_;
+    // This one is for handling evt_start.
+    // All workers (except the leader) have to wait until evt_start is processed.
+    rundown_latch_t start_room_;
+    // This one is for handling evt_finish.
+    // The leader thread has to wait until all workers complete their work.
+    rundown_latch_t finish_room_;
+
     static const std::type_index init_device_type;
     static const std::type_index reinit_device_type;
 
@@ -37,7 +101,8 @@ class tricky_dispatcher_t
     // Helper method for shutdown and join all threads.
     void shutdown_work_threads() noexcept {
-        // Both channels should be closed first.
+        // All channels should be closed first.
+        so_5::close_drop_content(so_5::terminate_if_throws, start_finish_ch_);
         so_5::close_drop_content(so_5::terminate_if_throws, init_reinit_ch_);
         so_5::close_drop_content(so_5::terminate_if_throws, other_demands_ch_);
 
@@ -57,7 +122,14 @@ class tricky_dispatcher_t
             unsigned second_type_threads_count) {
         work_threads_.reserve(first_type_threads_count + second_type_threads_count);
         try {
-            for(auto i = 0u; i < first_type_threads_count; ++i)
+            // The leader has to be suspended until all workers are created.
+            auto_acquire_release_rundown_latch_t launch_room_changer{launch_room_};
+
+            // Start the leader thread first.
+            work_threads_.emplace_back([this]{ leader_thread_body(); });
+
+            // Now we can launch all the remaining workers.
+            for(auto i = 1u; i < first_type_threads_count; ++i)
                 work_threads_.emplace_back([this]{ first_type_thread_body(); });
 
             for(auto i = 0u; i < second_type_threads_count; ++i)
@@ -74,8 +146,40 @@ class tricky_dispatcher_t
         d.call_handler(so_5::null_current_thread_id());
     }
 
+    // The body of the leader thread.
+    void leader_thread_body() {
+        // We have to wait until all workers are created.
+        // NOTE: not all of them may have actually started their work yet,
+        // but all std::thread objects should be created.
+        launch_room_.wait_then_close();
+
+        {
+            // We have to block all other threads until evt_start is processed.
+            auto_acquire_release_rundown_latch_t start_room_changer{start_room_};
+            // Process evt_start.
+            so_5::receive(so_5::from(start_finish_ch_).handle_n(1),
+                    exec_demand_handler);
+        }
+
+        // Now the leader can play the role of a thread of the first type.
+        first_type_thread_body();
+
+        // All workers should finish their work before evt_finish is processed.
+        finish_room_.wait_then_close();
+
+        // Process evt_finish.
+        so_5::receive(so_5::from(start_finish_ch_).handle_n(1),
+                exec_demand_handler);
+    }
+
     // The body for a thread of the first type.
     void first_type_thread_body() {
+        // Processing of evt_finish has to be enabled only at the end.
+        auto_acquire_release_rundown_latch_t finish_room_changer{finish_room_};
+
+        // Wait until evt_start is processed.
+        start_room_.wait_then_close();
+
         // Run until all channels will be closed.
         so_5::select(so_5::from_all().handle_all(),
                 receive_case(init_reinit_ch_, exec_demand_handler),
@@ -84,6 +188,12 @@ class tricky_dispatcher_t
     // The body for a thread of the second type.
     void second_type_thread_body() {
+        // Processing of evt_finish has to be enabled only at the end.
+        auto_acquire_release_rundown_latch_t finish_room_changer{finish_room_};
+
+        // Wait until evt_start is processed.
+        start_room_.wait_then_close();
+
         // Run until all channels will be closed.
         so_5::select(so_5::from_all().handle_all(),
                 receive_case(other_demands_ch_, exec_demand_handler));
 
@@ -91,11 +201,11 @@ class tricky_dispatcher_t
     // Implementation of the methods inherited from disp_binder.
     void preallocate_resources(so_5::agent_t & /*agent*/) override {
-        // There is no need to do something.
+        // Nothing to do.
     }
 
     void undo_preallocation(so_5::agent_t & /*agent*/) noexcept override {
-        // There is no need to do something.
+        // Nothing to do.
     }
 
     void bind(so_5::agent_t & agent) noexcept override {
@@ -103,7 +213,7 @@ class tricky_dispatcher_t
     }
 
     void unbind(so_5::agent_t & /*agent*/) noexcept override {
-        // There is no need to do something.
+        // Nothing to do.
     }
 
     // Implementation of the methods inherited from event_queue.
@@ -120,15 +230,18 @@ class tricky_dispatcher_t
     }
 
     void push_evt_start(so_5::execution_demand_t demand) override {
-        // There is no need to check the type of the demand.
-        so_5::send(other_demands_ch_, std::move(demand));
+        so_5::send(start_finish_ch_, std::move(demand));
     }
 
     // NOTE: don't care about exception, if the demand can't be stored
     // into the queue the application has to be aborted anyway.
     void push_evt_finish(so_5::execution_demand_t demand) noexcept override {
-        // There is no need to check the type of the demand.
-        so_5::send(other_demands_ch_, std::move(demand));
+        // Chains for "ordinary" messages have to be closed.
+        so_5::close_retain_content(so_5::terminate_if_throws, init_reinit_ch_);
+        so_5::close_retain_content(so_5::terminate_if_throws, other_demands_ch_);
+
+        // Now we can store the evt_finish demand in the special chain.
+        so_5::send(start_finish_ch_, std::move(demand));
     }
 
 public:
@@ -138,14 +251,21 @@ class tricky_dispatcher_t
     // The constructor.
     tricky_dispatcher_t(
            // SObjectizer Environment to work in.
            so_5::environment_t & env,
            // The size of the thread pool.
            unsigned pool_size)
-        : init_reinit_ch_{so_5::create_mchain(env)}
-        , other_demands_ch_{so_5::create_mchain(env)} {
+        : start_finish_ch_{
+            so_5::create_mchain(env,
+                2u, // Just evt_start and evt_finish.
+                so_5::mchain_props::memory_usage_t::preallocated,
+                so_5::mchain_props::overflow_reaction_t::abort_app)
+        }
+        , init_reinit_ch_{so_5::create_mchain(env)}
+        , other_demands_ch_{so_5::create_mchain(env)}
+    {
         const auto [first_type_count, second_type_count] =
                 calculate_pools_sizes(pool_size);
         launch_work_threads(first_type_count, second_type_count);
     }
 
-    ~tricky_dispatcher_t() noexcept {
+    ~tricky_dispatcher_t() noexcept override {
        // All worker threads should be stopped.
        shutdown_work_threads();
     }
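
Reviewer note, not part of the patch: a minimal usage sketch of how such a dispatcher could be bound to an agent. It assumes the SObjectizer 5.6+ API (so_5::launch, environment_t::introduce_coop, coop_t::make_agent_with_binder); the pool size value and the constructor arguments of a_device_manager_t are hypothetical placeholders.

    // Sketch only: create the dispatcher and bind an agent to it.
    so_5::launch([](so_5::environment_t & env) {
        env.introduce_coop([&env](so_5::coop_t & coop) {
            // The dispatcher is its own disp_binder, so it is held via shared_ptr.
            auto binder = std::make_shared<tricky_dispatcher_t>(env, 8u /* pool size, arbitrary */);
            // a_device_manager_t's actual constructor arguments are omitted here.
            coop.make_agent_with_binder<a_device_manager_t>(binder /*, ... */);
        });
    });

The dispatcher starts its worker threads in the constructor, so the binder object must be created inside a running environment (as above); shutdown happens automatically in the destructor via shutdown_work_threads().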