Factored event queue into its own class and fixed race condition between multiple starting enclaves #45

Merged · 5 commits · May 16, 2023
24 changes: 12 additions & 12 deletions include/reactor-cpp/assert.hh
@@ -33,6 +33,18 @@ constexpr bool runtime_assertion = true;
#include <unistd.h>
#endif

// assert macro that avoids unused variable warnings
#ifdef NDEBUG
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define reactor_assert(x) \
do { \
(void)sizeof(x); \
} while (0)
#else
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define reactor_assert(x) assert(x)
#endif

namespace reactor {
using EnvPhase = Environment::Phase;

@@ -67,18 +79,6 @@ constexpr inline void validate([[maybe_unused]] bool condition, [[maybe_unused]]
}
}

// assert macro that avoids unused variable warnings
#ifdef NDEBUG
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define reactor_assert(x) \
do { \
(void)sizeof(x); \
} while (0)
#else
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define reactor_assert(x) assert(x)
#endif

template <typename E> constexpr auto extract_value(E enum_value) -> typename std::underlying_type<E>::type {
return static_cast<typename std::underlying_type<E>::type>(enum_value);
}
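Why the release-mode definition uses (void)sizeof(x): with a plain assert, compiling under NDEBUG erases the expression entirely, so a variable referenced only in assertions triggers an unused-variable warning. sizeof is an unevaluated context, so the expression counts as used without generating any code. A standalone sketch, not part of this PR (my_assert and compute are illustrative names):

// Standalone sketch: why the sizeof form avoids unused-variable warnings
// when assertions are compiled out.
#include <cassert>

#ifdef NDEBUG
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define my_assert(x)                                                           \
  do {                                                                         \
    (void)sizeof(x); /* unevaluated: x counts as used, no code is generated */ \
  } while (0)
#else
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define my_assert(x) assert(x)
#endif

auto compute() -> int { return 42; }

auto main() -> int {
  // With a plain assert, -DNDEBUG -Wall would warn that result is set but
  // unused; the sizeof form keeps it "used" in both build modes.
  const int result = compute();
  my_assert(result == 42);
  return 0;
}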
42 changes: 31 additions & 11 deletions include/reactor-cpp/scheduler.hh
@@ -99,6 +99,36 @@ public:
using ActionList = SafeVector<BaseAction*>;
using ActionListPtr = std::unique_ptr<ActionList>;

class EventQueue {
private:
std::shared_mutex mutex_;
std::map<Tag, ActionListPtr> event_queue_;
/// stores the actions triggered at the current tag
ActionListPtr triggered_actions_{nullptr};

std::vector<ActionListPtr> action_list_pool_;
static constexpr std::size_t action_list_pool_increment_{10};

void fill_action_list_pool();

public:
EventQueue() { fill_action_list_pool(); }

[[nodiscard]] auto empty() const -> bool { return event_queue_.empty(); }
[[nodiscard]] auto next_tag() const -> Tag;

auto insert_event_at(const Tag& tag) -> const ActionListPtr&;

// should only be called while holding the scheduler mutex
auto extract_next_event() -> ActionListPtr;

// should only be called while holding the scheduler mutex
void return_action_list(ActionListPtr&& action_list);

// should only be called while holding the scheduler mutex
void discard_events_until_tag(const Tag& tag);
};

class Scheduler { // NOLINT
private:
const bool using_workers_;
@@ -110,21 +140,11 @@ private:

std::mutex scheduling_mutex_;
std::condition_variable cv_schedule_;
// lock used to protect the scheduler from asynchronous requests (i.e. from
// enclaves or federates). We hold the mutex from construction and release it
// in start().
std::unique_lock<std::mutex> startup_lock_{scheduling_mutex_};

std::shared_mutex mutex_event_queue_;
std::map<Tag, ActionListPtr> event_queue_;
EventQueue event_queue_;
/// stores the actions triggered at the current tag
ActionListPtr triggered_actions_{nullptr};

std::vector<ActionListPtr> action_list_pool_;
static constexpr std::size_t action_list_pool_increment_{10};
void fill_action_list_pool();
auto insert_event_at(const Tag& tag) -> const ActionListPtr&;

std::vector<std::vector<BasePort*>> set_ports_;
std::vector<std::vector<Reaction*>> triggered_reactions_;
std::vector<std::vector<Reaction*>> reaction_queue_;
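The new EventQueue pairs the tag-ordered map with a pool of pre-allocated action lists, so scheduling an event at a fresh tag rarely allocates, and processed lists are recycled rather than freed. A simplified, self-contained sketch of the same pooling pattern, with int keys and std::vector<int> payloads standing in for Tag and ActionList (class and member names are hypothetical):

#include <cstddef>
#include <map>
#include <memory>
#include <vector>

class PooledQueue {
private:
  using List = std::vector<int>;
  using ListPtr = std::unique_ptr<List>;

  std::map<int, ListPtr> queue_;   // events, ordered by tag
  std::vector<ListPtr> pool_;      // recycled lists, ready for reuse
  static constexpr std::size_t pool_increment_{10};

  void fill_pool() {
    for (std::size_t i{0}; i < pool_increment_; i++) {
      pool_.emplace_back(std::make_unique<List>());
    }
  }

public:
  PooledQueue() { fill_pool(); }

  [[nodiscard]] auto empty() const -> bool { return queue_.empty(); }

  // Get (or create) the list stored at tag, drawing storage from the pool.
  auto at(int tag) -> List& {
    auto event_it = queue_.find(tag);
    if (event_it == queue_.end()) {
      if (pool_.empty()) {
        fill_pool();
      }
      event_it = queue_.try_emplace(tag, std::move(pool_.back())).first;
      pool_.pop_back();
    }
    return *event_it->second;
  }

  // Remove and return the earliest event's list.
  auto pop_earliest() -> ListPtr { return std::move(queue_.extract(queue_.begin()).mapped()); }

  // Hand a processed list back to the pool instead of freeing it.
  void recycle(ListPtr&& list) {
    list->clear();
    pool_.emplace_back(std::move(list));
  }
};

auto main() -> int {
  PooledQueue queue;
  queue.at(5).push_back(1); // schedule "action 1" at tag 5
  auto list = queue.pop_earliest();
  queue.recycle(std::move(list));
  return queue.empty() ? 0 : 1;
}

The real EventQueue additionally guards the map with a std::shared_mutex; the sketch omits locking to isolate the pooling logic.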
136 changes: 82 additions & 54 deletions lib/scheduler.cc
@@ -149,6 +149,56 @@ void ReadyQueue::fill_up(std::vector<Reaction*>& ready_reactions) {
}
}

void EventQueue::fill_action_list_pool() {
for (std::size_t i{0}; i < action_list_pool_increment_; i++) {
action_list_pool_.emplace_back(std::make_unique<ActionList>());
}
}

auto EventQueue::next_tag() const -> Tag {
reactor_assert(!event_queue_.empty());
return event_queue_.begin()->first;
}

auto EventQueue::extract_next_event() -> ActionListPtr {
reactor_assert(!event_queue_.empty());
return std::move(event_queue_.extract(event_queue_.begin()).mapped());
}
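extract_next_event relies on std::map::extract (C++17), which detaches the map node itself so that a move-only mapped type like std::unique_ptr can leave the map without a copy. A minimal standalone illustration:

#include <map>
#include <memory>
#include <vector>

auto main() -> int {
  std::map<int, std::unique_ptr<std::vector<int>>> queue;
  queue.try_emplace(1, std::make_unique<std::vector<int>>());
  // extract() removes the node; mapped() exposes the unique_ptr so it can be
  // moved out. A plain copy via queue[1] would not compile for a move-only type.
  auto list = std::move(queue.extract(queue.begin()).mapped());
  return (list != nullptr && queue.empty()) ? 0 : 1;
}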

auto EventQueue::insert_event_at(const Tag& tag) -> const ActionListPtr& {
auto shared_lock = std::shared_lock<std::shared_mutex>(mutex_);

auto event_it = event_queue_.find(tag);
if (event_it == event_queue_.end()) {
shared_lock.unlock();
{
auto unique_lock = std::unique_lock<std::shared_mutex>(mutex_);
if (action_list_pool_.empty()) {
fill_action_list_pool();
}
const auto& result = event_queue_.try_emplace(tag, std::move(action_list_pool_.back()));
if (result.second) {
action_list_pool_.pop_back();
}
return result.first->second;
}
} else {
return event_it->second;
}
}
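insert_event_at takes the shared lock for the common read-only lookup and upgrades to a unique lock only when a new tag must be inserted. Since std::shared_mutex offers no atomic upgrade, another thread may insert the same tag in the unlock/relock window; try_emplace makes the insertion idempotent, and result.second reports whether the pooled list was actually consumed. A self-contained sketch of the pattern (function and variable names are illustrative):

#include <map>
#include <mutex>
#include <shared_mutex>
#include <string>

std::shared_mutex table_mutex;
std::map<int, std::string> table;

// Look up key, inserting a default value if absent. Readers share the lock;
// only the insertion path takes it exclusively.
auto lookup_or_insert(int key) -> const std::string& {
  {
    std::shared_lock<std::shared_mutex> shared_lock(table_mutex);
    auto entry_it = table.find(key);
    if (entry_it != table.end()) {
      return entry_it->second;
    }
  } // release the shared lock; std::shared_mutex has no atomic upgrade

  std::unique_lock<std::shared_mutex> unique_lock(table_mutex);
  // Another thread may have inserted key in the unlock/relock window;
  // try_emplace then leaves the map unchanged and returns the existing entry.
  return table.try_emplace(key, "default").first->second;
}

auto main() -> int {
  lookup_or_insert(1);
  lookup_or_insert(1); // second call takes only the shared-lock fast path
  return 0;
}

Returning a reference across the unlock is sound here because std::map never invalidates references to existing elements on insertion, which the PR's version relies on as well.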

void EventQueue::return_action_list(ActionListPtr&& action_list) {
reactor_assert(action_list != nullptr);
action_list_pool_.emplace_back(std::forward<ActionListPtr>(action_list));
}

void EventQueue::discard_events_until_tag(const Tag& tag) {
while (!empty() && next_tag() <= tag) {
auto actions = extract_next_event();
return_action_list(std::move(actions));
}
}

void Scheduler::terminate_all_workers() {
log_.debug() << "Send termination signal to all workers";
auto num_workers = environment_->num_workers();
@@ -202,10 +252,25 @@ auto Scheduler::schedule_ready_reactions() -> bool {
void Scheduler::start() {
log_.debug() << "Starting the scheduler...";

// Initialize our logical time to the value right before the start tag. This
// is important for usage with enclaves/federates, to indicate that no events
// before the start tag can be generated.
logical_time_.advance_to(environment_->start_tag().decrement());
{
// Other schedulers (enclaves or federates) could try to access our logical
// time and our event queue. Thus, we need to lock the main scheduling mutex
// in order to avoid data races.
std::lock_guard<std::mutex> lock_guard(scheduling_mutex_);

// Initialize our logical time to the value right before the start tag. This
// is important for usage with enclaves/federates, to indicate that no events
// before the start tag can be generated.
logical_time_.advance_to(environment_->start_tag().decrement());

// It could happen that another scheduler (enclave or federate) already
// tried to acquire a tag before our start tag. In that case, we will have
// empty events on the queue that are earlier than our startup event. We
// resolve this simply by discarding all such events. Once we have processed
// the startup reactions, the start tag will be released, and consequently
// all earlier tags are released as well.
event_queue_.discard_events_until_tag(Tag::from_logical_time(logical_time_));
}

auto num_workers = environment_->num_workers();
// initialize the reaction queue, set ports vector, and triggered reactions
@@ -214,11 +279,6 @@ void Scheduler::start() {
set_ports_.resize(num_workers);
triggered_reactions_.resize(num_workers);

// release the scheduling mutex, allowing other asynchronous processes (i.e.
// enclaves or federates) to access the event queue and the current logical
// time.
startup_lock_.unlock();

// Initialize and start the workers. By resizing the workers vector first,
// we make sure that there is sufficient space for all the workers and none of
// them needs to be moved. This is important because a running worker may not
@@ -265,8 +325,9 @@ void Scheduler::next() { // NOLINT

while (triggered_actions_ == nullptr || triggered_actions_->empty()) {
if (triggered_actions_ != nullptr) {
action_list_pool_.emplace_back(std::move(triggered_actions_));
event_queue_.return_action_list(std::move(triggered_actions_));
}
reactor_assert(triggered_actions_ == nullptr);

// shutdown if there are no more events in the queue
if (event_queue_.empty() && !stop_) {
@@ -283,25 +344,23 @@ void Scheduler::next() { // NOLINT
continue_execution_ = false;
log_.debug() << "Shutting down the scheduler";
Tag t_next = Tag::from_logical_time(logical_time_).delay();
if (!event_queue_.empty() && t_next == event_queue_.begin()->first) {
if (!event_queue_.empty() && t_next == event_queue_.next_tag()) {
log_.debug() << "Schedule the last round of reactions including all "
"termination reactions";
triggered_actions_ = std::move(event_queue_.begin()->second);
event_queue_.erase(event_queue_.begin());
triggered_actions_ = event_queue_.extract_next_event();
log_.debug() << "advance logical time to tag " << t_next;
logical_time_.advance_to(t_next);
} else {
return;
}
} else {
// find the next tag
auto t_next = event_queue_.begin()->first;
auto t_next = event_queue_.next_tag();
log_.debug() << "try to advance logical time to tag " << t_next;

// synchronize with physical time if not in fast forward mode
if (!environment_->fast_fwd_execution()) {
bool result = PhysicalTimeBarrier::acquire_tag(
t_next, lock, cv_schedule_, [&t_next, this]() { return t_next != event_queue_.begin()->first; });
t_next, lock, cv_schedule_, [&t_next, this]() { return t_next != event_queue_.next_tag(); });
// If acquire_tag returns false, then a new event was inserted into the queue and we need to start over
if (!result) {
continue;
@@ -312,10 +371,10 @@ void Scheduler::next() { // NOLINT
bool result{true};
for (auto* action : environment_->input_actions_) {
bool inner_result = action->acquire_tag(t_next, lock, cv_schedule_,
[&t_next, this]() { return t_next != event_queue_.begin()->first; });
[&t_next, this]() { return t_next != event_queue_.next_tag(); });
// If the wait was aborted or if the next tag changed in the meantime,
// we need to break from the loop and continue with the main loop.
if (!inner_result || t_next != event_queue_.begin()->first) {
if (!inner_result || t_next != event_queue_.next_tag()) {
result = false;
break;
}
@@ -325,12 +384,11 @@ void Scheduler::next() { // NOLINT
continue;
}

// Retrieve all events with tag equal to current logical time from the
// queue.
// Retrieve all triggered actions at the next tag.
// We do not need to lock the event queue's internal mutex here, as the lock
// on scheduling_mutex_ already ensures that no one can write to the event
// queue.
triggered_actions_ = std::move(event_queue_.extract(event_queue_.begin()).mapped());
triggered_actions_ = event_queue_.extract_next_event();

// advance logical time
log_.debug() << "advance logical time to tag " << t_next;
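Both acquire_tag calls above follow a wait-with-abort-predicate shape: block until the target time is safe, but bail out as soon as an earlier event appears at the head of the queue so the scheduling loop can start over. A minimal sketch of that shape, assuming the barrier is built on a condition-variable timed wait (the actual PhysicalTimeBarrier API may differ; wait_until_or_abort is a hypothetical name):

#include <chrono>
#include <condition_variable>
#include <mutex>

// Block until deadline unless abort() becomes true first. Returns false when
// aborted, signalling the caller to restart its loop — analogous to how
// Scheduler::next() starts over when the head of the event queue changes.
template <typename Clock, typename Duration, typename Predicate>
auto wait_until_or_abort(std::unique_lock<std::mutex>& lock, std::condition_variable& cv,
                         const std::chrono::time_point<Clock, Duration>& deadline, Predicate abort)
    -> bool {
  while (!abort()) {
    if (cv.wait_until(lock, deadline) == std::cv_status::timeout) {
      return true; // the deadline passed without interference
    }
    // woken up (possibly spuriously): re-check the abort predicate
  }
  return false;
}

auto main() -> int {
  std::mutex mutex;
  std::condition_variable cv;
  std::unique_lock<std::mutex> lock(mutex);
  // The predicate is already true here, so the wait aborts immediately.
  bool acquired = wait_until_or_abort(
      lock, cv, std::chrono::steady_clock::now() + std::chrono::milliseconds(10), [] { return true; });
  return acquired ? 1 : 0;
}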
@@ -364,47 +422,17 @@ Scheduler::Scheduler(Environment* env)
: using_workers_(env->num_workers() > 1)
, environment_(env)
, log_("Scheduler " + env->name())
, ready_queue_(log_, env->num_workers()) {
fill_action_list_pool();
}
, ready_queue_(log_, env->num_workers()) {}

Scheduler::~Scheduler() = default;

void Scheduler::fill_action_list_pool() {
for (std::size_t i{0}; i < action_list_pool_increment_; i++) {
action_list_pool_.emplace_back(std::make_unique<ActionList>());
}
}

auto Scheduler::insert_event_at(const Tag& tag) -> const ActionListPtr& {
auto shared_lock = std::shared_lock<std::shared_mutex>(mutex_event_queue_);

auto event_it = event_queue_.find(tag);
if (event_it == event_queue_.end()) {
shared_lock.unlock();
{
auto unique_lock = std::unique_lock<std::shared_mutex>(mutex_event_queue_);
if (action_list_pool_.empty()) {
fill_action_list_pool();
}
const auto& result = event_queue_.try_emplace(tag, std::move(action_list_pool_.back()));
if (result.second) {
action_list_pool_.pop_back();
}
return result.first->second;
}
} else {
return event_it->second;
}
}

void Scheduler::schedule_sync(BaseAction* action, const Tag& tag) {
log_.debug() << "Schedule action " << action->fqn() << (action->is_logical() ? " synchronously " : " asynchronously ")
<< " with tag " << tag;
reactor_assert(logical_time_ < tag);
tracepoint(reactor_cpp, schedule_action, action->container()->fqn(), action->name(), tag);

const auto& action_list = insert_event_at(tag);
const auto& action_list = event_queue_.insert_event_at(tag);
action_list->push_back(action);
}

@@ -441,7 +469,7 @@ auto Scheduler::schedule_empty_async_at(const Tag& tag) -> bool {
// processed.
return tag == logical_time_;
}
insert_event_at(tag);
event_queue_.insert_event_at(tag);
}
notify();
return true;