From 5c1dcee9619e5304268249199e73b4bd65fbfcd3 Mon Sep 17 00:00:00 2001
From: Christian Menard
Date: Mon, 15 May 2023 11:26:14 +0200
Subject: [PATCH 1/5] Revert "avoid race between a scheduler starting up and
 an enclave trying to acquire."

This reverts commit b4c6963ea6a5d951a5cf71b947f550117bd9f688.
---
 include/reactor-cpp/scheduler.hh | 4 ----
 lib/scheduler.cc                 | 5 -----
 2 files changed, 9 deletions(-)

diff --git a/include/reactor-cpp/scheduler.hh b/include/reactor-cpp/scheduler.hh
index 3f84cfa8..2d3f7fa9 100644
--- a/include/reactor-cpp/scheduler.hh
+++ b/include/reactor-cpp/scheduler.hh
@@ -110,10 +110,6 @@ private:
   std::mutex scheduling_mutex_;
   std::condition_variable cv_schedule_;
 
-  // lock used to protect the scheduler from asynchronous requests (i.e. from
-  // enclaves or federates). We hold the mutex from construction and release in
-  // start().
-  std::unique_lock<std::mutex> startup_lock_{scheduling_mutex_};
 
   std::shared_mutex mutex_event_queue_;
   std::map<Tag, ActionListPtr> event_queue_;

diff --git a/lib/scheduler.cc b/lib/scheduler.cc
index 5b660aa3..c1e07b65 100644
--- a/lib/scheduler.cc
+++ b/lib/scheduler.cc
@@ -214,11 +214,6 @@ void Scheduler::start() {
   set_ports_.resize(num_workers);
   triggered_reactions_.resize(num_workers);
 
-  // release the scheduling mutex, allowing other asynchronous processes (i.e.
-  // enclaves or federates) to access the event queue and the current logical
-  // time.
-  startup_lock_.unlock();
-
   // Initialize and start the workers. By resizing the workers vector first,
   // we make sure that there is sufficient space for all the workers and none of
   // them needs to be moved. This is important because a running worker may not

From 1ec6e5b1cc26c1d9e78d2ae369822695b6f99562 Mon Sep 17 00:00:00 2001
From: Christian Menard
Date: Mon, 15 May 2023 14:59:09 +0200
Subject: [PATCH 2/5] move macro definition out of namespace

---
 include/reactor-cpp/assert.hh | 24 ++++++++++++------------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/include/reactor-cpp/assert.hh b/include/reactor-cpp/assert.hh
index ab722cc2..c1039e1e 100644
--- a/include/reactor-cpp/assert.hh
+++ b/include/reactor-cpp/assert.hh
@@ -33,6 +33,18 @@ constexpr bool runtime_assertion = true;
 #include <cassert>
 #endif
 
+// assert macro that avoids unused variable warnings
+#ifdef NDEBUG
+// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
+#define reactor_assert(x) \
+  do { \
+    (void)sizeof(x); \
+  } while (0)
+#else
+// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
+#define reactor_assert(x) assert(x)
+#endif
+
 namespace reactor {
 
 using EnvPhase = Environment::Phase;
@@ -67,18 +79,6 @@ constexpr inline void validate([[maybe_unused]] bool condition, [[maybe_unused]]
   }
 }
 
-// assert macro that avoids unused variable warnings
-#ifdef NDEBUG
-// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
-#define reactor_assert(x) \
-  do { \
-    (void)sizeof(x); \
-  } while (0)
-#else
-// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
-#define reactor_assert(x) assert(x)
-#endif
-
 template <typename E> constexpr auto extract_value(E enum_value) -> typename std::underlying_type<E>::type {
   return static_cast<typename std::underlying_type<E>::type>(enum_value);
 }
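Patch 2 is a scoping cleanup: macros are expanded by the preprocessor and ignore namespaces entirely, so defining reactor_assert inside namespace reactor only suggested a scoping that never existed. The NDEBUG variant relies on sizeof taking an unevaluated operand: the compiler type-checks x and counts its variables as used, but emits no code for it, which is what silences unused-variable warnings in release builds. A minimal standalone sketch of the same technique (my_assert is an illustrative stand-in, not part of the patch):

    #include <cassert>
    #include <cstdio>

    #ifdef NDEBUG
    // sizeof's operand is unevaluated: the compiler type-checks `x` and
    // marks its variables as used, but generates no code for it.
    #define my_assert(x) \
      do { \
        (void)sizeof(x); \
      } while (0)
    #else
    #define my_assert(x) assert(x)
    #endif

    int main() {
      const int debug_only = 42; // no -Wunused-variable under -DNDEBUG
      my_assert(debug_only == 42);
      std::puts("ok");
      return 0;
    }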
From 8ffddae66ff71a89936bb42fb05cf8136a460bdd Mon Sep 17 00:00:00 2001
From: Christian Menard
Date: Mon, 15 May 2023 15:00:54 +0200
Subject: [PATCH 3/5] factor out the event queue into its own class

---
 include/reactor-cpp/scheduler.hh |  35 ++++++++---
 lib/scheduler.cc                 | 101 +++++++++++++++++--------------
 2 files changed, 84 insertions(+), 52 deletions(-)

diff --git a/include/reactor-cpp/scheduler.hh b/include/reactor-cpp/scheduler.hh
index 2d3f7fa9..f0ad9ce6 100644
--- a/include/reactor-cpp/scheduler.hh
+++ b/include/reactor-cpp/scheduler.hh
@@ -99,6 +99,33 @@ public:
 using ActionList = SafeVector<BaseAction*>;
 using ActionListPtr = std::unique_ptr<ActionList>;
 
+class EventQueue {
+private:
+  std::shared_mutex mutex_;
+  std::map<Tag, ActionListPtr> event_queue_;
+  /// stores the actions triggered at the current tag
+  ActionListPtr triggered_actions_{nullptr};
+
+  std::vector<ActionListPtr> action_list_pool_;
+  static constexpr std::size_t action_list_pool_increment_{10};
+
+  void fill_action_list_pool();
+
+public:
+  EventQueue() { fill_action_list_pool(); }
+
+  [[nodiscard]] auto empty() const -> bool { return event_queue_.empty(); }
+  [[nodiscard]] auto next_tag() const -> Tag;
+
+  auto insert_event_at(const Tag& tag) -> const ActionListPtr&;
+
+  // should only be called while holding the scheduler mutex
+  auto extract_next_event() -> ActionListPtr&&;
+
+  // should only be called while holding the scheduler mutex
+  void return_action_list(ActionListPtr&& action_list);
+};
+
 class Scheduler { // NOLINT
 private:
   const bool using_workers_;
@@ -111,16 +138,10 @@ private:
   std::mutex scheduling_mutex_;
   std::condition_variable cv_schedule_;
 
-  std::shared_mutex mutex_event_queue_;
-  std::map<Tag, ActionListPtr> event_queue_;
+  EventQueue event_queue_;
   /// stores the actions triggered at the current tag
   ActionListPtr triggered_actions_{nullptr};
 
-  std::vector<ActionListPtr> action_list_pool_;
-  static constexpr std::size_t action_list_pool_increment_{10};
-  void fill_action_list_pool();
-  auto insert_event_at(const Tag& tag) -> const ActionListPtr&;
-
   std::vector<std::vector<BasePort*>> set_ports_;
   std::vector<std::vector<Reaction*>> triggered_reactions_;
   std::vector<std::vector<Reaction*>> reaction_queue_;
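The new EventQueue class bundles the event map, its shared mutex, and the action list pool behind a small interface (empty, next_tag, insert_event_at, extract_next_event, return_action_list). The pool is the performance-relevant piece: processed action lists are handed back instead of freed, so steady-state scheduling performs no heap allocations. A minimal sketch of that free-list idea in isolation, under illustrative names (ListPool, acquire, give_back are not the patch's API):

    #include <cstddef>
    #include <memory>
    #include <vector>

    using List = std::vector<int>;
    using ListPtr = std::unique_ptr<List>;

    class ListPool {
      std::vector<ListPtr> pool_;
      static constexpr std::size_t increment_{10};

    public:
      // hand out a list, refilling the pool in batches when it runs dry
      auto acquire() -> ListPtr {
        if (pool_.empty()) {
          for (std::size_t i{0}; i < increment_; i++) {
            pool_.emplace_back(std::make_unique<List>());
          }
        }
        ListPtr list = std::move(pool_.back());
        pool_.pop_back();
        return list;
      }

      // take a processed list back; clearing keeps the allocation alive
      // for reuse (a design choice in this sketch, not quoted from the patch)
      void give_back(ListPtr&& list) {
        list->clear();
        pool_.emplace_back(std::move(list));
      }
    };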
diff --git a/lib/scheduler.cc b/lib/scheduler.cc
index c1e07b65..7120908f 100644
--- a/lib/scheduler.cc
+++ b/lib/scheduler.cc
@@ -149,6 +149,49 @@ void ReadyQueue::fill_up(std::vector<Reaction*>& ready_reactions) {
   }
 }
 
+void EventQueue::fill_action_list_pool() {
+  for (std::size_t i{0}; i < action_list_pool_increment_; i++) {
+    action_list_pool_.emplace_back(std::make_unique<ActionList>());
+  }
+}
+
+auto EventQueue::next_tag() const -> Tag {
+  reactor_assert(!event_queue_.empty());
+  return event_queue_.begin()->first;
+}
+
+auto EventQueue::extract_next_event() -> ActionListPtr&& {
+  reactor_assert(!event_queue_.empty());
+  return std::move(event_queue_.extract(event_queue_.begin()).mapped());
+}
+
+auto EventQueue::insert_event_at(const Tag& tag) -> const ActionListPtr& {
+  auto shared_lock = std::shared_lock(mutex_);
+
+  auto event_it = event_queue_.find(tag);
+  if (event_it == event_queue_.end()) {
+    shared_lock.unlock();
+    {
+      auto unique_lock = std::unique_lock(mutex_);
+      if (action_list_pool_.empty()) {
+        fill_action_list_pool();
+      }
+      const auto& result = event_queue_.try_emplace(tag, std::move(action_list_pool_.back()));
+      if (result.second) {
+        action_list_pool_.pop_back();
+      }
+      return result.first->second;
+    }
+  } else {
+    return event_it->second;
+  }
+}
+
+void EventQueue::return_action_list(ActionListPtr&& action_list) {
+  reactor_assert(action_list != nullptr);
+  action_list_pool_.emplace_back(std::forward<ActionListPtr>(action_list));
+}
+
 void Scheduler::terminate_all_workers() {
   log_.debug() << "Send termination signal to all workers";
   auto num_workers = environment_->num_workers();
@@ -260,8 +303,9 @@ void Scheduler::next() { // NOLINT
 
   while (triggered_actions_ == nullptr || triggered_actions_->empty()) {
     if (triggered_actions_ != nullptr) {
-      action_list_pool_.emplace_back(std::move(triggered_actions_));
+      event_queue_.return_action_list(std::move(triggered_actions_));
     }
+    reactor_assert(triggered_actions_ == nullptr);
 
     // shutdown if there are no more events in the queue
     if (event_queue_.empty() && !stop_) {
@@ -278,25 +322,23 @@ void Scheduler::next() { // NOLINT
         continue_execution_ = false;
         log_.debug() << "Shutting down the scheduler";
         Tag t_next = Tag::from_logical_time(logical_time_).delay();
-        if (!event_queue_.empty() && t_next == event_queue_.begin()->first) {
+        if (!event_queue_.empty() && t_next == event_queue_.next_tag()) {
          log_.debug() << "Schedule the last round of reactions including all "
                          "termination reactions";
-          triggered_actions_ = std::move(event_queue_.begin()->second);
-          event_queue_.erase(event_queue_.begin());
+          triggered_actions_ = std::move(event_queue_.extract_next_event());
           log_.debug() << "advance logical time to tag " << t_next;
           logical_time_.advance_to(t_next);
         } else {
           return;
         }
       } else {
-        // find the next tag
-        auto t_next = event_queue_.begin()->first;
+        auto t_next = event_queue_.next_tag();
         log_.debug() << "try to advance logical time to tag " << t_next;
 
         // synchronize with physical time if not in fast forward mode
         if (!environment_->fast_fwd_execution()) {
           bool result = PhysicalTimeBarrier::acquire_tag(
-              t_next, lock, cv_schedule_, [&t_next, this]() { return t_next != event_queue_.begin()->first; });
+              t_next, lock, cv_schedule_, [&t_next, this]() { return t_next != event_queue_.next_tag(); });
           // If acquire tag returns false, then a new event was inserted into the queue and we need to start over
           if (!result) {
             continue;
@@ -307,10 +349,10 @@ void Scheduler::next() { // NOLINT
         bool result{true};
         for (auto* action : environment_->input_actions_) {
           bool inner_result = action->acquire_tag(t_next, lock, cv_schedule_,
-                                                  [&t_next, this]() { return t_next != event_queue_.begin()->first; });
+                                                  [&t_next, this]() { return t_next != event_queue_.next_tag(); });
           // If the wait was aborted or if the next tag changed in the meantime,
           // we need to break from the loop and continue with the main loop.
-          if (!inner_result || t_next != event_queue_.begin()->first) {
+          if (!inner_result || t_next != event_queue_.next_tag()) {
             result = false;
             break;
           }
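Both wait sites above pass the same abort predicate, t_next != event_queue_.next_tag(): while the scheduler blocks until physical time (or an upstream federate) reaches t_next, an asynchronous schedule call may insert an earlier event, and the predicate lets the wait end early so the main loop re-reads the queue head and starts over. A sketch of that wait-or-abort shape, assuming waiters are notified through the condition variable whenever the queue head changes (acquire_time is an illustrative stand-in, not reactor-cpp API):

    #include <chrono>
    #include <condition_variable>
    #include <mutex>

    // Returns true if `until` was reached, false if `abort_wait()` fired
    // first (in the scheduler: a new event became the head of the queue).
    template <class Clock, class Predicate>
    auto acquire_time(std::unique_lock<std::mutex>& lock,
                      std::condition_variable& cv,
                      typename Clock::time_point until,
                      Predicate abort_wait) -> bool {
      while (Clock::now() < until) {
        // wait_until returns true as soon as the predicate holds
        if (cv.wait_until(lock, until, abort_wait)) {
          return false; // aborted: the caller re-reads next_tag() and retries
        }
      }
      return true;
    }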
-        triggered_actions_ = std::move(event_queue_.extract(event_queue_.begin()).mapped());
+        triggered_actions_ = std::move(event_queue_.extract_next_event());
 
         // advance logical time
         log_.debug() << "advance logical time to tag " << t_next;
@@ -359,47 +400,17 @@ Scheduler::Scheduler(Environment* env)
     : using_workers_(env->num_workers() > 1)
     , environment_(env)
     , log_("Scheduler " + env->name())
-    , ready_queue_(log_, env->num_workers()) {
-  fill_action_list_pool();
-}
+    , ready_queue_(log_, env->num_workers()) {}
 
 Scheduler::~Scheduler() = default;
 
-void Scheduler::fill_action_list_pool() {
-  for (std::size_t i{0}; i < action_list_pool_increment_; i++) {
-    action_list_pool_.emplace_back(std::make_unique<ActionList>());
-  }
-}
-
-auto Scheduler::insert_event_at(const Tag& tag) -> const ActionListPtr& {
-  auto shared_lock = std::shared_lock(mutex_event_queue_);
-
-  auto event_it = event_queue_.find(tag);
-  if (event_it == event_queue_.end()) {
-    shared_lock.unlock();
-    {
-      auto unique_lock = std::unique_lock(mutex_event_queue_);
-      if (action_list_pool_.empty()) {
-        fill_action_list_pool();
-      }
-      const auto& result = event_queue_.try_emplace(tag, std::move(action_list_pool_.back()));
-      if (result.second) {
-        action_list_pool_.pop_back();
-      }
-      return result.first->second;
-    }
-  } else {
-    return event_it->second;
-  }
-}
-
 void Scheduler::schedule_sync(BaseAction* action, const Tag& tag) {
   log_.debug() << "Schedule action " << action->fqn()
                << (action->is_logical() ? " synchronously " : " asynchronously ") << " with tag " << tag;
   reactor_assert(logical_time_ < tag);
   tracepoint(reactor_cpp, schedule_action, action->container()->fqn(), action->name(), tag);
 
-  const auto& action_list = insert_event_at(tag);
+  const auto& action_list = event_queue_.insert_event_at(tag);
   action_list->push_back(action);
 }
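schedule_sync() now funnels into EventQueue::insert_event_at(), whose locking discipline deserves a remark: the tag is first probed under a shared lock so concurrent schedulers can read in parallel, and only a miss takes the exclusive lock. Because the shared lock must be released before the exclusive one is taken, another writer can insert the same tag inside that window; try_emplace re-checks and inserts atomically, and the pool entry is only consumed when the insertion actually happened (result.second). A sketch of this double-checked pattern on a plain map (TagMap and list_for are hypothetical names):

    #include <map>
    #include <memory>
    #include <mutex>
    #include <shared_mutex>
    #include <vector>

    class TagMap {
      std::shared_mutex mutex_;
      std::map<int, std::unique_ptr<std::vector<int>>> map_;

    public:
      auto list_for(int tag) -> std::vector<int>& {
        {
          std::shared_lock<std::shared_mutex> read_lock(mutex_);
          auto it = map_.find(tag);
          if (it != map_.end()) {
            return *it->second; // fast path: tag already present
          }
        } // drop the shared lock; it cannot be upgraded in place

        std::unique_lock<std::shared_mutex> write_lock(mutex_);
        // try_emplace is a no-op if another writer won the race,
        // which keeps the unlock/relock window harmless
        auto [it, inserted] = map_.try_emplace(tag, nullptr);
        if (inserted) {
          it->second = std::make_unique<std::vector<int>>();
        }
        return *it->second;
      }
    };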
       return tag == logical_time_;
     }
-    insert_event_at(tag);
+    event_queue_.insert_event_at(tag);
   }
   notify();
   return true;

From 58fed2b888c87acd658f196594adfe34049e4e1a Mon Sep 17 00:00:00 2001
From: Christian Menard
Date: Mon, 15 May 2023 15:53:42 +0200
Subject: [PATCH 4/5] fix race condition between starting enclaves

---
 include/reactor-cpp/scheduler.hh |  3 +++
 lib/scheduler.cc                 | 30 ++++++++++++++++++++++++++----
 2 files changed, 29 insertions(+), 4 deletions(-)

diff --git a/include/reactor-cpp/scheduler.hh b/include/reactor-cpp/scheduler.hh
index f0ad9ce6..a4f11482 100644
--- a/include/reactor-cpp/scheduler.hh
+++ b/include/reactor-cpp/scheduler.hh
@@ -124,6 +124,9 @@ public:
 
   // should only be called while holding the scheduler mutex
   void return_action_list(ActionListPtr&& action_list);
+
+  // should only be called while holding the scheduler mutex
+  void discard_events_until_tag(const Tag& tag);
 };
 
 class Scheduler { // NOLINT
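The scenario behind this patch: an enclave or federate that is already running can try to acquire a tag earlier than this scheduler's start tag while the scheduler is still being constructed, leaving empty placeholder events on the queue. Discarding them is safe because releasing the start tag implicitly releases every earlier tag, as the comments in the hunk that follows explain. The declared discard_events_until_tag() boils down to draining the ordered map from the front; since std::map iterates in key order, only the discarded prefix is ever touched. The generic shape of that loop, shown on a plain std::map (discard_until and recycle are illustrative names standing in for the patch's return_action_list recycling):

    #include <map>
    #include <utility>

    template <class K, class V, class Recycle>
    void discard_until(std::map<K, V>& queue, const K& cutoff, Recycle recycle) {
      // pop the earliest entry while it is not later than the cutoff
      while (!queue.empty() && queue.begin()->first <= cutoff) {
        auto node = queue.extract(queue.begin());
        recycle(std::move(node.mapped()));
      }
    }

In the patch this runs while scheduling_mutex_ is held, so no new event can be inserted mid-drain.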
diff --git a/lib/scheduler.cc b/lib/scheduler.cc
index 7120908f..c5711728 100644
--- a/lib/scheduler.cc
+++ b/lib/scheduler.cc
@@ -192,6 +192,13 @@ void EventQueue::return_action_list(ActionListPtr&& action_list) {
   action_list_pool_.emplace_back(std::forward<ActionListPtr>(action_list));
 }
 
+void EventQueue::discard_events_until_tag(const Tag& tag) {
+  while (!empty() && next_tag() <= tag) {
+    auto actions = extract_next_event();
+    return_action_list(std::move(actions));
+  }
+}
+
 void Scheduler::terminate_all_workers() {
   log_.debug() << "Send termination signal to all workers";
   auto num_workers = environment_->num_workers();
@@ -245,10 +252,25 @@ auto Scheduler::schedule_ready_reactions() -> bool {
 void Scheduler::start() {
   log_.debug() << "Starting the scheduler...";
 
-  // Initialize our logical time to the value right before the start tag. This
-  // is important for usage with enclaves/federates, to indicate, that no events
-  // before the start tag ca be generated.
-  logical_time_.advance_to(environment_->start_tag().decrement());
+  {
+    // Other schedulers (enclaves or federates) could try to access our logical
+    // time and our event queue. Thus, we need to lock the main scheduling mutex
+    // in order to avoid data races.
+    std::lock_guard lock_guard(scheduling_mutex_);
+
+    // Initialize our logical time to the value right before the start tag. This
+    // is important for usage with enclaves/federates, to indicate that no events
+    // before the start tag can be generated.
+    logical_time_.advance_to(environment_->start_tag().decrement());
+
+    // It could happen that another scheduler (enclave or federate) already
+    // tried to acquire a tag before our start tag. In that case, we will have
+    // empty events on the queue that are earlier than our startup event. We
+    // resolve this simply by deleting all such events. Once we have processed
+    // the startup reactions, the start tag will be released, and consequently
+    // all earlier tags are released as well.
+    event_queue_.discard_events_until_tag(Tag::from_logical_time(logical_time_));
+  }
 
   auto num_workers = environment_->num_workers();
   // initialize the reaction queue, set ports vector, and triggered reactions

From 1d17d9c5439d57e00145798e4dbd87d18342b891 Mon Sep 17 00:00:00 2001
From: Christian Menard
Date: Mon, 15 May 2023 17:30:41 +0200
Subject: [PATCH 5/5] fix segfault

---
 include/reactor-cpp/scheduler.hh | 2 +-
 lib/scheduler.cc                 | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/include/reactor-cpp/scheduler.hh b/include/reactor-cpp/scheduler.hh
index a4f11482..256c76e6 100644
--- a/include/reactor-cpp/scheduler.hh
+++ b/include/reactor-cpp/scheduler.hh
@@ -120,7 +120,7 @@ public:
   auto insert_event_at(const Tag& tag) -> const ActionListPtr&;
 
   // should only be called while holding the scheduler mutex
-  auto extract_next_event() -> ActionListPtr&&;
+  auto extract_next_event() -> ActionListPtr;
 
   // should only be called while holding the scheduler mutex
   void return_action_list(ActionListPtr&& action_list);
diff --git a/lib/scheduler.cc b/lib/scheduler.cc
index c5711728..c3eb55f6 100644
--- a/lib/scheduler.cc
+++ b/lib/scheduler.cc
@@ -160,7 +160,7 @@ auto EventQueue::next_tag() const -> Tag {
   return event_queue_.begin()->first;
 }
 
-auto EventQueue::extract_next_event() -> ActionListPtr&& {
+auto EventQueue::extract_next_event() -> ActionListPtr {
   reactor_assert(!event_queue_.empty());
   return std::move(event_queue_.extract(event_queue_.begin()).mapped());
 }
@@ -347,7 +347,7 @@ void Scheduler::next() { // NOLINT
         if (!event_queue_.empty() && t_next == event_queue_.next_tag()) {
           log_.debug() << "Schedule the last round of reactions including all "
                           "termination reactions";
-          triggered_actions_ = std::move(event_queue_.extract_next_event());
+          triggered_actions_ = event_queue_.extract_next_event();
           log_.debug() << "advance logical time to tag " << t_next;
           logical_time_.advance_to(t_next);
         } else {
           return;
@@ -388,7 +388,7 @@ void Scheduler::next() { // NOLINT
         // We do not need to lock mutex_event_queue_ here, as the lock on
         // scheduling_mutex_ already ensures that no one can write to the event
         // queue.
-        triggered_actions_ = std::move(event_queue_.extract_next_event());
+        triggered_actions_ = event_queue_.extract_next_event();
 
         // advance logical time
         log_.debug() << "advance logical time to tag " << t_next;
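The segfault fixed by this last patch is a lifetime bug, not a locking bug. std::map::extract returns the node handle by value, so in return std::move(event_queue_.extract(...).mapped()) the handle is a temporary that is destroyed at the end of that return statement's full-expression. With the old ActionListPtr&& return type, the function handed the caller a reference into the already-destroyed handle, and the subsequent move assignment read freed memory. Returning ActionListPtr by value instead move-constructs the result while the handle is still alive, at no extra cost for a unique_ptr; this is also why both call sites drop their now-redundant std::move. The pitfall distilled into a standalone illustration (extract_dangling and extract_by_value are hypothetical names):

    #include <map>
    #include <memory>
    #include <string>

    using Ptr = std::unique_ptr<std::string>;
    std::map<int, Ptr> queue_;

    // Pre-patch shape: binds the returned reference to .mapped() of the
    // temporary node handle, which dies when the full-expression ends.
    // Any use by the caller reads a dead object: undefined behavior.
    auto extract_dangling() -> Ptr&& {
      return std::move(queue_.extract(queue_.begin()).mapped());
    }

    // Patched shape: the result is move-constructed from mapped() while
    // the node handle is still alive, then returned by value.
    auto extract_by_value() -> Ptr {
      return std::move(queue_.extract(queue_.begin()).mapped());
    }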