Skip to content

Commit

Permalink
More intuitive approach ...
Browse files Browse the repository at this point in the history
Signed-off-by: Ivan Santiago Paunovic <ivanpauno@ekumenlabs.com>
  • Loading branch information
ivanpauno committed Jan 15, 2021
1 parent c7ad013 commit 6883d9a
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 63 deletions.
57 changes: 2 additions & 55 deletions rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,66 +81,13 @@ class MultiThreadedExecutor : public rclcpp::Executor
private:
RCLCPP_DISABLE_COPY(MultiThreadedExecutor)

/// \internal A mutex that has two locking mechanism, one with higher priority than the other.
/**
* After the current mutex owner release the lock, a thread that used the high
* priority mechanism will have priority over threads that used the low priority mechanism.
*/
class MutexTwoPriorities {
public:
class HpMutex
{
public:
HpMutex(MutexTwoPriorities & parent) : parent_(parent) {}

void lock() {
parent_.data_.lock();
}

void unlock() {
parent_.data_.unlock();
}
private:
MutexTwoPriorities & parent_;
};

class LpMutex
{
public:
LpMutex(MutexTwoPriorities & parent) : parent_(parent) {}

void lock() {
// low_prio_.lock(); data_.lock();
std::unique_lock<std::mutex> lpg{parent_.low_prio_};
parent_.data_.lock();
lpg.release();
}

void unlock() {
// data_.unlock(); low_prio_.unlock()
std::lock_guard<std::mutex> lpg{parent_.low_prio_, std::adopt_lock};
parent_.data_.unlock();
}
private:
MutexTwoPriorities & parent_;
};

HpMutex hp() {return HpMutex{*this};}
LpMutex lp() {return LpMutex{*this};}

private:
// Implementation detail: the whole idea here is that only one low priority thread can be
// trying to take the data_ mutex, while all high priority threads are already waiting there.
std::mutex low_prio_;
std::mutex data_;
};

MutexTwoPriorities wait_mutex_;
std::mutex wait_mutex_;
size_t number_of_threads_;
bool yield_before_execute_;
std::chrono::nanoseconds next_exec_timeout_;

std::set<TimerBase::SharedPtr> scheduled_timers_;
std::mutex scheduled_timers_mutex_;
};

} // namespace executors
Expand Down
26 changes: 18 additions & 8 deletions rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,14 @@ MultiThreadedExecutor::~MultiThreadedExecutor() {}
void
MultiThreadedExecutor::spin()
{
using MutexTwoPriorities = rclcpp::executors::MultiThreadedExecutor::MutexTwoPriorities;
if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
}
RCLCPP_SCOPE_EXIT(this->spinning.store(false); );
std::vector<std::thread> threads;
size_t thread_id = 0;
{
auto lp_wait_mutex = wait_mutex_.lp();
std::lock_guard<MutexTwoPriorities::LpMutex> wait_lock(lp_wait_mutex);
std::lock_guard<std::mutex> wait_lock(wait_mutex_);
for (; thread_id < number_of_threads_ - 1; ++thread_id) {
auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id);
threads.emplace_back(func);
Expand All @@ -75,12 +73,10 @@ MultiThreadedExecutor::get_number_of_threads()
void
MultiThreadedExecutor::run(size_t)
{
using MutexTwoPriorities = rclcpp::executors::MultiThreadedExecutor::MutexTwoPriorities;
while (rclcpp::ok(this->context_) && spinning.load()) {
rclcpp::AnyExecutable any_exec;
{
auto lp_wait_mutex = wait_mutex_.lp();
std::lock_guard<MutexTwoPriorities::LpMutex> wait_lock(lp_wait_mutex);
std::lock_guard<std::mutex> wait_guard{wait_mutex_};
if (!rclcpp::ok(this->context_) || !spinning.load()) {
return;
}
Expand All @@ -89,6 +85,13 @@ MultiThreadedExecutor::run(size_t)
}
if (any_exec.timer) {
// Guard against multiple threads getting the same timer.

// THIS MUST BE DONE WITH BOTH THE WAIT_MUTEX AND SCHEDULED TIMERS MUTEX TAKEN!!
// It might seem unnecessary, but you avoid the following race:
// Thread A: get timer, context switch.
// Thread B: get timer, insert scheduled timer, execute timer, remove scheduled timer.
// Thread A: execute timer.
std::lock_guard<std::mutex> scheduled_timers_guard{scheduled_timers_mutex_};
if (scheduled_timers_.count(any_exec.timer) != 0) {
// Make sure that any_exec's callback group is reset before
// the lock is released.
Expand All @@ -107,8 +110,15 @@ MultiThreadedExecutor::run(size_t)
execute_any_executable(any_exec);

if (any_exec.timer) {
auto hp_wait_mutex = wait_mutex_.hp();
std::lock_guard<MutexTwoPriorities::HpMutex> wait_lock(hp_wait_mutex);
// DON'T DELETE THE scheduled_timers_mutex_ AND REPLACE IT WITH THE wait_mutex_ here.
// Now, this mutex will only compete with ONE worker thread that's is trying to insert
// a timer.
// (and also, N other worker threads also trying to delete a timer).
// If the wait_mutex_ is used here, this will compete with all the other worker threads!!!
// If we're waiting too long too remove the scheduled timer, maybe we are discarding a timer
// execution that we shouldn't!
// Of course, this can still happen if the callback is too long ...
std::lock_guard<std::mutex> scheduled_timers_guard{scheduled_timers_mutex_};
auto it = scheduled_timers_.find(any_exec.timer);
if (it != scheduled_timers_.end()) {
scheduled_timers_.erase(it);
Expand Down

0 comments on commit 6883d9a

Please sign in to comment.