From bf49bc51d60a1abd503df275286a954834ff3f31 Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Fri, 15 Jan 2021 11:03:35 -0300 Subject: [PATCH 01/10] Use mutex with two different priorities in the multithreaded executor Signed-off-by: Ivan Santiago Paunovic --- .../executors/multi_threaded_executor.hpp | 62 ++++++++++++++++++- .../executors/multi_threaded_executor.cpp | 11 +++- 2 files changed, 69 insertions(+), 4 deletions(-) diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp index c18df5b7bc..1651aa9c4b 100644 --- a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp @@ -81,7 +81,67 @@ class MultiThreadedExecutor : public rclcpp::Executor private: RCLCPP_DISABLE_COPY(MultiThreadedExecutor) - std::mutex wait_mutex_; + /// \internal A mutex that has two locking mechanism, one with higher priority than the other. + /** + * After the current mutex owner release the lock, a thread that used the high + * priority mechanism will have priority over threads that used the low priority mechanism. + */ + class MutexTwoPriorities { +public: + class HpMutex + { +public: + HpMutex(MutexTwoPriorities & parent) : parent_(parent) {} + + void lock() { + // next_to_access_.lock; data_.lock; next_to_access.unlock(); + std::lock_guard nag{parent_.next_to_access_}; + parent_.data_.lock(); + } + + void unlock() { + parent_.data_.unlock(); + } +private: + MutexTwoPriorities & parent_; + }; + + class LpMutex + { +public: + LpMutex(MutexTwoPriorities & parent) : parent_(parent) {} + + void lock() { + // low_prio_.lock(); next_to_access_.lock(); data_.lock(); next_to_access_.unlock(); + + // The whole trick here is that only one low priority thread can be waiting to take the + // data mutex, while all high priority thread can rapidly pass the next_to_access mutex to + // wait on the data mutex. + std::unique_lock lpg{parent_.low_prio_}; + std::lock_guard nag{parent_.next_to_access_}; + parent_.data_.lock(); + lpg.release(); + } + + void unlock() { + // data_.unlock(); low_prio_.unlock() + std::lock_guard lpg{parent_.low_prio_, std::adopt_lock}; + parent_.data_.unlock(); + } +private: + MutexTwoPriorities & parent_; + }; + + HpMutex hp() {return HpMutex{*this};} + LpMutex lp() {return LpMutex{*this};} + +private: + std::mutex low_prio_; + std::mutex next_to_access_; + std::mutex data_; + }; + + MutexTwoPriorities wait_mutex_; size_t number_of_threads_; bool yield_before_execute_; std::chrono::nanoseconds next_exec_timeout_; diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 0dfdc35467..34815e62ae 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -44,6 +44,7 @@ MultiThreadedExecutor::~MultiThreadedExecutor() {} void MultiThreadedExecutor::spin() { + using MutexTwoPriorities = rclcpp::executors::MultiThreadedExecutor::MutexTwoPriorities; if (spinning.exchange(true)) { throw std::runtime_error("spin() called while already spinning"); } @@ -51,7 +52,8 @@ MultiThreadedExecutor::spin() std::vector threads; size_t thread_id = 0; { - std::lock_guard wait_lock(wait_mutex_); + auto lp_wait_mutex = wait_mutex_.lp(); + std::lock_guard wait_lock(lp_wait_mutex); for (; thread_id < number_of_threads_ - 1; ++thread_id) { auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id); threads.emplace_back(func); @@ -73,10 +75,12 @@ MultiThreadedExecutor::get_number_of_threads() void MultiThreadedExecutor::run(size_t) { + using MutexTwoPriorities = rclcpp::executors::MultiThreadedExecutor::MutexTwoPriorities; while (rclcpp::ok(this->context_) && spinning.load()) { rclcpp::AnyExecutable any_exec; { - std::lock_guard wait_lock(wait_mutex_); + auto lp_wait_mutex = wait_mutex_.lp(); + std::lock_guard wait_lock(lp_wait_mutex); if (!rclcpp::ok(this->context_) || !spinning.load()) { return; } @@ -103,7 +107,8 @@ MultiThreadedExecutor::run(size_t) execute_any_executable(any_exec); if (any_exec.timer) { - std::lock_guard wait_lock(wait_mutex_); + auto hp_wait_mutex = wait_mutex_.hp(); + std::lock_guard wait_lock(hp_wait_mutex); auto it = scheduled_timers_.find(any_exec.timer); if (it != scheduled_timers_.end()) { scheduled_timers_.erase(it); From c7ad013f46514a12fd1e1f3b56bbbd195c73c9b6 Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Fri, 15 Jan 2021 11:12:20 -0300 Subject: [PATCH 02/10] Simplify the two priorities mutex Signed-off-by: Ivan Santiago Paunovic --- .../rclcpp/executors/multi_threaded_executor.hpp | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp index 1651aa9c4b..36e756bb16 100644 --- a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp @@ -94,8 +94,6 @@ class MultiThreadedExecutor : public rclcpp::Executor HpMutex(MutexTwoPriorities & parent) : parent_(parent) {} void lock() { - // next_to_access_.lock; data_.lock; next_to_access.unlock(); - std::lock_guard nag{parent_.next_to_access_}; parent_.data_.lock(); } @@ -112,13 +110,8 @@ class MultiThreadedExecutor : public rclcpp::Executor LpMutex(MutexTwoPriorities & parent) : parent_(parent) {} void lock() { - // low_prio_.lock(); next_to_access_.lock(); data_.lock(); next_to_access_.unlock(); - - // The whole trick here is that only one low priority thread can be waiting to take the - // data mutex, while all high priority thread can rapidly pass the next_to_access mutex to - // wait on the data mutex. + // low_prio_.lock(); data_.lock(); std::unique_lock lpg{parent_.low_prio_}; - std::lock_guard nag{parent_.next_to_access_}; parent_.data_.lock(); lpg.release(); } @@ -136,8 +129,9 @@ class MultiThreadedExecutor : public rclcpp::Executor LpMutex lp() {return LpMutex{*this};} private: + // Implementation detail: the whole idea here is that only one low priority thread can be + // trying to take the data_ mutex, while all high priority threads are already waiting there. std::mutex low_prio_; - std::mutex next_to_access_; std::mutex data_; }; From 6883d9a4240b036735c88fb85e568221b1b88e10 Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Fri, 15 Jan 2021 11:40:34 -0300 Subject: [PATCH 03/10] More intuitive approach ... Signed-off-by: Ivan Santiago Paunovic --- .../executors/multi_threaded_executor.hpp | 57 +------------------ .../executors/multi_threaded_executor.cpp | 26 ++++++--- 2 files changed, 20 insertions(+), 63 deletions(-) diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp index 36e756bb16..8ae3b44d7e 100644 --- a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp @@ -81,66 +81,13 @@ class MultiThreadedExecutor : public rclcpp::Executor private: RCLCPP_DISABLE_COPY(MultiThreadedExecutor) - /// \internal A mutex that has two locking mechanism, one with higher priority than the other. - /** - * After the current mutex owner release the lock, a thread that used the high - * priority mechanism will have priority over threads that used the low priority mechanism. - */ - class MutexTwoPriorities { -public: - class HpMutex - { -public: - HpMutex(MutexTwoPriorities & parent) : parent_(parent) {} - - void lock() { - parent_.data_.lock(); - } - - void unlock() { - parent_.data_.unlock(); - } -private: - MutexTwoPriorities & parent_; - }; - - class LpMutex - { -public: - LpMutex(MutexTwoPriorities & parent) : parent_(parent) {} - - void lock() { - // low_prio_.lock(); data_.lock(); - std::unique_lock lpg{parent_.low_prio_}; - parent_.data_.lock(); - lpg.release(); - } - - void unlock() { - // data_.unlock(); low_prio_.unlock() - std::lock_guard lpg{parent_.low_prio_, std::adopt_lock}; - parent_.data_.unlock(); - } -private: - MutexTwoPriorities & parent_; - }; - - HpMutex hp() {return HpMutex{*this};} - LpMutex lp() {return LpMutex{*this};} - -private: - // Implementation detail: the whole idea here is that only one low priority thread can be - // trying to take the data_ mutex, while all high priority threads are already waiting there. - std::mutex low_prio_; - std::mutex data_; - }; - - MutexTwoPriorities wait_mutex_; + std::mutex wait_mutex_; size_t number_of_threads_; bool yield_before_execute_; std::chrono::nanoseconds next_exec_timeout_; std::set scheduled_timers_; + std::mutex scheduled_timers_mutex_; }; } // namespace executors diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 34815e62ae..233d2149f0 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -44,7 +44,6 @@ MultiThreadedExecutor::~MultiThreadedExecutor() {} void MultiThreadedExecutor::spin() { - using MutexTwoPriorities = rclcpp::executors::MultiThreadedExecutor::MutexTwoPriorities; if (spinning.exchange(true)) { throw std::runtime_error("spin() called while already spinning"); } @@ -52,8 +51,7 @@ MultiThreadedExecutor::spin() std::vector threads; size_t thread_id = 0; { - auto lp_wait_mutex = wait_mutex_.lp(); - std::lock_guard wait_lock(lp_wait_mutex); + std::lock_guard wait_lock(wait_mutex_); for (; thread_id < number_of_threads_ - 1; ++thread_id) { auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id); threads.emplace_back(func); @@ -75,12 +73,10 @@ MultiThreadedExecutor::get_number_of_threads() void MultiThreadedExecutor::run(size_t) { - using MutexTwoPriorities = rclcpp::executors::MultiThreadedExecutor::MutexTwoPriorities; while (rclcpp::ok(this->context_) && spinning.load()) { rclcpp::AnyExecutable any_exec; { - auto lp_wait_mutex = wait_mutex_.lp(); - std::lock_guard wait_lock(lp_wait_mutex); + std::lock_guard wait_guard{wait_mutex_}; if (!rclcpp::ok(this->context_) || !spinning.load()) { return; } @@ -89,6 +85,13 @@ MultiThreadedExecutor::run(size_t) } if (any_exec.timer) { // Guard against multiple threads getting the same timer. + + // THIS MUST BE DONE WITH BOTH THE WAIT_MUTEX AND SCHEDULED TIMERS MUTEX TAKEN!! + // It might seem unnecessary, but you avoid the following race: + // Thread A: get timer, context switch. + // Thread B: get timer, insert scheduled timer, execute timer, remove scheduled timer. + // Thread A: execute timer. + std::lock_guard scheduled_timers_guard{scheduled_timers_mutex_}; if (scheduled_timers_.count(any_exec.timer) != 0) { // Make sure that any_exec's callback group is reset before // the lock is released. @@ -107,8 +110,15 @@ MultiThreadedExecutor::run(size_t) execute_any_executable(any_exec); if (any_exec.timer) { - auto hp_wait_mutex = wait_mutex_.hp(); - std::lock_guard wait_lock(hp_wait_mutex); + // DON'T DELETE THE scheduled_timers_mutex_ AND REPLACE IT WITH THE wait_mutex_ here. + // Now, this mutex will only compete with ONE worker thread that's is trying to insert + // a timer. + // (and also, N other worker threads also trying to delete a timer). + // If the wait_mutex_ is used here, this will compete with all the other worker threads!!! + // If we're waiting too long too remove the scheduled timer, maybe we are discarding a timer + // execution that we shouldn't! + // Of course, this can still happen if the callback is too long ... + std::lock_guard scheduled_timers_guard{scheduled_timers_mutex_}; auto it = scheduled_timers_.find(any_exec.timer); if (it != scheduled_timers_.end()) { scheduled_timers_.erase(it); From 8a49e3660ca6a6e33528dffd6b07953abfcf1781 Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Fri, 15 Jan 2021 11:45:37 -0300 Subject: [PATCH 04/10] nit Signed-off-by: Ivan Santiago Paunovic --- rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 233d2149f0..fac17f04c0 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -113,7 +113,7 @@ MultiThreadedExecutor::run(size_t) // DON'T DELETE THE scheduled_timers_mutex_ AND REPLACE IT WITH THE wait_mutex_ here. // Now, this mutex will only compete with ONE worker thread that's is trying to insert // a timer. - // (and also, N other worker threads also trying to delete a timer). + // (and also, N other worker threads trying to delete a timer). // If the wait_mutex_ is used here, this will compete with all the other worker threads!!! // If we're waiting too long too remove the scheduled timer, maybe we are discarding a timer // execution that we shouldn't! From d2f4a91115902d7c420b2c0b5647d7fcf376fe86 Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Mon, 18 Jan 2021 09:49:30 -0300 Subject: [PATCH 05/10] Revert "nit" This reverts commit 8a49e3660ca6a6e33528dffd6b07953abfcf1781. Signed-off-by: Ivan Santiago Paunovic --- rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index fac17f04c0..233d2149f0 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -113,7 +113,7 @@ MultiThreadedExecutor::run(size_t) // DON'T DELETE THE scheduled_timers_mutex_ AND REPLACE IT WITH THE wait_mutex_ here. // Now, this mutex will only compete with ONE worker thread that's is trying to insert // a timer. - // (and also, N other worker threads trying to delete a timer). + // (and also, N other worker threads also trying to delete a timer). // If the wait_mutex_ is used here, this will compete with all the other worker threads!!! // If we're waiting too long too remove the scheduled timer, maybe we are discarding a timer // execution that we shouldn't! From bfad43b741fc77a0efd0823cde9d3bc6677ce9c6 Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Mon, 18 Jan 2021 09:49:49 -0300 Subject: [PATCH 06/10] Revert "More intuitive approach ..." This reverts commit 6883d9a4240b036735c88fb85e568221b1b88e10. Signed-off-by: Ivan Santiago Paunovic --- .../executors/multi_threaded_executor.hpp | 57 ++++++++++++++++++- .../executors/multi_threaded_executor.cpp | 26 +++------ 2 files changed, 63 insertions(+), 20 deletions(-) diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp index 8ae3b44d7e..36e756bb16 100644 --- a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp @@ -81,13 +81,66 @@ class MultiThreadedExecutor : public rclcpp::Executor private: RCLCPP_DISABLE_COPY(MultiThreadedExecutor) - std::mutex wait_mutex_; + /// \internal A mutex that has two locking mechanism, one with higher priority than the other. + /** + * After the current mutex owner release the lock, a thread that used the high + * priority mechanism will have priority over threads that used the low priority mechanism. + */ + class MutexTwoPriorities { +public: + class HpMutex + { +public: + HpMutex(MutexTwoPriorities & parent) : parent_(parent) {} + + void lock() { + parent_.data_.lock(); + } + + void unlock() { + parent_.data_.unlock(); + } +private: + MutexTwoPriorities & parent_; + }; + + class LpMutex + { +public: + LpMutex(MutexTwoPriorities & parent) : parent_(parent) {} + + void lock() { + // low_prio_.lock(); data_.lock(); + std::unique_lock lpg{parent_.low_prio_}; + parent_.data_.lock(); + lpg.release(); + } + + void unlock() { + // data_.unlock(); low_prio_.unlock() + std::lock_guard lpg{parent_.low_prio_, std::adopt_lock}; + parent_.data_.unlock(); + } +private: + MutexTwoPriorities & parent_; + }; + + HpMutex hp() {return HpMutex{*this};} + LpMutex lp() {return LpMutex{*this};} + +private: + // Implementation detail: the whole idea here is that only one low priority thread can be + // trying to take the data_ mutex, while all high priority threads are already waiting there. + std::mutex low_prio_; + std::mutex data_; + }; + + MutexTwoPriorities wait_mutex_; size_t number_of_threads_; bool yield_before_execute_; std::chrono::nanoseconds next_exec_timeout_; std::set scheduled_timers_; - std::mutex scheduled_timers_mutex_; }; } // namespace executors diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 233d2149f0..34815e62ae 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -44,6 +44,7 @@ MultiThreadedExecutor::~MultiThreadedExecutor() {} void MultiThreadedExecutor::spin() { + using MutexTwoPriorities = rclcpp::executors::MultiThreadedExecutor::MutexTwoPriorities; if (spinning.exchange(true)) { throw std::runtime_error("spin() called while already spinning"); } @@ -51,7 +52,8 @@ MultiThreadedExecutor::spin() std::vector threads; size_t thread_id = 0; { - std::lock_guard wait_lock(wait_mutex_); + auto lp_wait_mutex = wait_mutex_.lp(); + std::lock_guard wait_lock(lp_wait_mutex); for (; thread_id < number_of_threads_ - 1; ++thread_id) { auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id); threads.emplace_back(func); @@ -73,10 +75,12 @@ MultiThreadedExecutor::get_number_of_threads() void MultiThreadedExecutor::run(size_t) { + using MutexTwoPriorities = rclcpp::executors::MultiThreadedExecutor::MutexTwoPriorities; while (rclcpp::ok(this->context_) && spinning.load()) { rclcpp::AnyExecutable any_exec; { - std::lock_guard wait_guard{wait_mutex_}; + auto lp_wait_mutex = wait_mutex_.lp(); + std::lock_guard wait_lock(lp_wait_mutex); if (!rclcpp::ok(this->context_) || !spinning.load()) { return; } @@ -85,13 +89,6 @@ MultiThreadedExecutor::run(size_t) } if (any_exec.timer) { // Guard against multiple threads getting the same timer. - - // THIS MUST BE DONE WITH BOTH THE WAIT_MUTEX AND SCHEDULED TIMERS MUTEX TAKEN!! - // It might seem unnecessary, but you avoid the following race: - // Thread A: get timer, context switch. - // Thread B: get timer, insert scheduled timer, execute timer, remove scheduled timer. - // Thread A: execute timer. - std::lock_guard scheduled_timers_guard{scheduled_timers_mutex_}; if (scheduled_timers_.count(any_exec.timer) != 0) { // Make sure that any_exec's callback group is reset before // the lock is released. @@ -110,15 +107,8 @@ MultiThreadedExecutor::run(size_t) execute_any_executable(any_exec); if (any_exec.timer) { - // DON'T DELETE THE scheduled_timers_mutex_ AND REPLACE IT WITH THE wait_mutex_ here. - // Now, this mutex will only compete with ONE worker thread that's is trying to insert - // a timer. - // (and also, N other worker threads also trying to delete a timer). - // If the wait_mutex_ is used here, this will compete with all the other worker threads!!! - // If we're waiting too long too remove the scheduled timer, maybe we are discarding a timer - // execution that we shouldn't! - // Of course, this can still happen if the callback is too long ... - std::lock_guard scheduled_timers_guard{scheduled_timers_mutex_}; + auto hp_wait_mutex = wait_mutex_.hp(); + std::lock_guard wait_lock(hp_wait_mutex); auto it = scheduled_timers_.find(any_exec.timer); if (it != scheduled_timers_.end()) { scheduled_timers_.erase(it); From 7bd49399a0ffb7016c511d9e1ed44dfc7a8c8850 Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Mon, 18 Jan 2021 10:15:48 -0300 Subject: [PATCH 07/10] please linters Signed-off-by: Ivan Santiago Paunovic --- .../executors/multi_threaded_executor.hpp | 25 +++++++++++++------ 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp index 36e756bb16..fbf446f561 100644 --- a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp @@ -86,20 +86,25 @@ class MultiThreadedExecutor : public rclcpp::Executor * After the current mutex owner release the lock, a thread that used the high * priority mechanism will have priority over threads that used the low priority mechanism. */ - class MutexTwoPriorities { + class MutexTwoPriorities + { public: class HpMutex { public: - HpMutex(MutexTwoPriorities & parent) : parent_(parent) {} + explicit HpMutex(MutexTwoPriorities & parent) + : parent_(parent) {} - void lock() { + void lock() + { parent_.data_.lock(); } - void unlock() { + void unlock() + { parent_.data_.unlock(); } + private: MutexTwoPriorities & parent_; }; @@ -107,27 +112,31 @@ class MultiThreadedExecutor : public rclcpp::Executor class LpMutex { public: - LpMutex(MutexTwoPriorities & parent) : parent_(parent) {} + explicit LpMutex(MutexTwoPriorities & parent) + : parent_(parent) {} - void lock() { + void lock() + { // low_prio_.lock(); data_.lock(); std::unique_lock lpg{parent_.low_prio_}; parent_.data_.lock(); lpg.release(); } - void unlock() { + void unlock() + { // data_.unlock(); low_prio_.unlock() std::lock_guard lpg{parent_.low_prio_, std::adopt_lock}; parent_.data_.unlock(); } + private: MutexTwoPriorities & parent_; }; HpMutex hp() {return HpMutex{*this};} LpMutex lp() {return LpMutex{*this};} - + private: // Implementation detail: the whole idea here is that only one low priority thread can be // trying to take the data_ mutex, while all high priority threads are already waiting there. From 7b9548d583f9f822809dc0d8b12fa60997b1e2e9 Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Wed, 27 Jan 2021 15:35:28 -0300 Subject: [PATCH 08/10] Avoid abbreviations, move MutexTwoPriorities to rclcpp::detail::MutexTwoPriorities Signed-off-by: Ivan Santiago Paunovic --- .../rclcpp/detail/mutex_two_priorities.hpp | 90 +++++++++++++++++++ .../executors/multi_threaded_executor.hpp | 66 +------------- .../executors/multi_threaded_executor.cpp | 15 ++-- 3 files changed, 99 insertions(+), 72 deletions(-) create mode 100644 rclcpp/include/rclcpp/detail/mutex_two_priorities.hpp diff --git a/rclcpp/include/rclcpp/detail/mutex_two_priorities.hpp b/rclcpp/include/rclcpp/detail/mutex_two_priorities.hpp new file mode 100644 index 0000000000..9e1946833b --- /dev/null +++ b/rclcpp/include/rclcpp/detail/mutex_two_priorities.hpp @@ -0,0 +1,90 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__DETAIL__MUTEX_TWO_PRIORITIES_HPP_ +#define RCLCPP__DETAIL__MUTEX_TWO_PRIORITIES_HPP_ + +#include + +namespace rclcpp +{ +namespace detail +{ +/// \internal A mutex that has two locking mechanism, one with higher priority than the other. +/** + * After the current mutex owner release the lock, a thread that used the high + * priority mechanism will have priority over threads that used the low priority mechanism. + */ +class MutexTwoPriorities +{ +public: + class HighPriorityMutex + { +public: + explicit HighPriorityMutex(MutexTwoPriorities & parent) + : parent_(parent) {} + + void lock() + { + parent_.data_.lock(); + } + + void unlock() + { + parent_.data_.unlock(); + } + +private: + MutexTwoPriorities & parent_; + }; + + class LowPriorityMutex + { +public: + explicit LowPriorityMutex(MutexTwoPriorities & parent) + : parent_(parent) {} + + void lock() + { + std::unique_lock barrier_guard{parent_.barrier_}; + parent_.data_.lock(); + barrier_guard.release(); + } + + void unlock() + { + // data_.unlock(); low_prio_.unlock() + std::lock_guard barrier_guard{parent_.barrier_, std::adopt_lock}; + parent_.data_.unlock(); + } + +private: + MutexTwoPriorities & parent_; + }; + + HighPriorityMutex get_high_priority_mutex() {return HighPriorityMutex{*this};} + LowPriorityMutex get_low_priority_mutex() {return LowPriorityMutex{*this};} + +private: + // Implementation detail: the idea here is that only one low priority thread can be + // trying to take the data_ mutex while the others are excluded by the barrier_ mutex. + // All high priority threads are already waiting for the data_ mutex. + std::mutex barrier_; + std::mutex data_; + }; + +} // namespace detail +} // namespace rclcpp + +#endif // RCLCPP__DETAIL__MUTEX_TWO_PRIORITIES_HPP_ diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp index fbf446f561..28504620c5 100644 --- a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp @@ -22,6 +22,7 @@ #include #include +#include "rclcpp/detail/mutex_two_priorities.hpp" #include "rclcpp/executor.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/memory_strategies.hpp" @@ -81,70 +82,7 @@ class MultiThreadedExecutor : public rclcpp::Executor private: RCLCPP_DISABLE_COPY(MultiThreadedExecutor) - /// \internal A mutex that has two locking mechanism, one with higher priority than the other. - /** - * After the current mutex owner release the lock, a thread that used the high - * priority mechanism will have priority over threads that used the low priority mechanism. - */ - class MutexTwoPriorities - { -public: - class HpMutex - { -public: - explicit HpMutex(MutexTwoPriorities & parent) - : parent_(parent) {} - - void lock() - { - parent_.data_.lock(); - } - - void unlock() - { - parent_.data_.unlock(); - } - -private: - MutexTwoPriorities & parent_; - }; - - class LpMutex - { -public: - explicit LpMutex(MutexTwoPriorities & parent) - : parent_(parent) {} - - void lock() - { - // low_prio_.lock(); data_.lock(); - std::unique_lock lpg{parent_.low_prio_}; - parent_.data_.lock(); - lpg.release(); - } - - void unlock() - { - // data_.unlock(); low_prio_.unlock() - std::lock_guard lpg{parent_.low_prio_, std::adopt_lock}; - parent_.data_.unlock(); - } - -private: - MutexTwoPriorities & parent_; - }; - - HpMutex hp() {return HpMutex{*this};} - LpMutex lp() {return LpMutex{*this};} - -private: - // Implementation detail: the whole idea here is that only one low priority thread can be - // trying to take the data_ mutex, while all high priority threads are already waiting there. - std::mutex low_prio_; - std::mutex data_; - }; - - MutexTwoPriorities wait_mutex_; + detail::MutexTwoPriorities wait_mutex_; size_t number_of_threads_; bool yield_before_execute_; std::chrono::nanoseconds next_exec_timeout_; diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 34815e62ae..bf92875cdc 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -22,6 +22,7 @@ #include "rclcpp/utilities.hpp" #include "rclcpp/scope_exit.hpp" +using rclcpp::detail::MutexTwoPriorities; using rclcpp::executors::MultiThreadedExecutor; MultiThreadedExecutor::MultiThreadedExecutor( @@ -44,7 +45,6 @@ MultiThreadedExecutor::~MultiThreadedExecutor() {} void MultiThreadedExecutor::spin() { - using MutexTwoPriorities = rclcpp::executors::MultiThreadedExecutor::MutexTwoPriorities; if (spinning.exchange(true)) { throw std::runtime_error("spin() called while already spinning"); } @@ -52,8 +52,8 @@ MultiThreadedExecutor::spin() std::vector threads; size_t thread_id = 0; { - auto lp_wait_mutex = wait_mutex_.lp(); - std::lock_guard wait_lock(lp_wait_mutex); + auto low_priority_wait_mutex = wait_mutex_.get_low_priority_mutex(); + std::lock_guard wait_lock(low_priority_wait_mutex); for (; thread_id < number_of_threads_ - 1; ++thread_id) { auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id); threads.emplace_back(func); @@ -75,12 +75,11 @@ MultiThreadedExecutor::get_number_of_threads() void MultiThreadedExecutor::run(size_t) { - using MutexTwoPriorities = rclcpp::executors::MultiThreadedExecutor::MutexTwoPriorities; while (rclcpp::ok(this->context_) && spinning.load()) { rclcpp::AnyExecutable any_exec; { - auto lp_wait_mutex = wait_mutex_.lp(); - std::lock_guard wait_lock(lp_wait_mutex); + auto low_priority_wait_mutex = wait_mutex_.get_low_priority_mutex(); + std::lock_guard wait_lock(low_priority_wait_mutex); if (!rclcpp::ok(this->context_) || !spinning.load()) { return; } @@ -107,8 +106,8 @@ MultiThreadedExecutor::run(size_t) execute_any_executable(any_exec); if (any_exec.timer) { - auto hp_wait_mutex = wait_mutex_.hp(); - std::lock_guard wait_lock(hp_wait_mutex); + auto high_priority_wait_mutex = wait_mutex_.get_high_priority_mutex(); + std::lock_guard wait_lock(high_priority_wait_mutex); auto it = scheduled_timers_.find(any_exec.timer); if (it != scheduled_timers_.end()) { scheduled_timers_.erase(it); From 6618464239ab6492a9f172cb670f7075c4f4c0b1 Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Wed, 27 Jan 2021 16:06:06 -0300 Subject: [PATCH 09/10] Move implementation to src file Signed-off-by: Ivan Santiago Paunovic --- rclcpp/CMakeLists.txt | 1 + .../rclcpp/detail/mutex_two_priorities.hpp | 41 ++++------- .../rclcpp/detail/mutex_two_priorities.cpp | 71 +++++++++++++++++++ 3 files changed, 85 insertions(+), 28 deletions(-) create mode 100644 rclcpp/src/rclcpp/detail/mutex_two_priorities.cpp diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 561beb5d3e..a05436a4de 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -39,6 +39,7 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/clock.cpp src/rclcpp/context.cpp src/rclcpp/contexts/default_context.cpp + src/rclcpp/detail/mutex_two_priorities.cpp src/rclcpp/detail/rmw_implementation_specific_payload.cpp src/rclcpp/detail/rmw_implementation_specific_publisher_payload.cpp src/rclcpp/detail/rmw_implementation_specific_subscription_payload.cpp diff --git a/rclcpp/include/rclcpp/detail/mutex_two_priorities.hpp b/rclcpp/include/rclcpp/detail/mutex_two_priorities.hpp index 9e1946833b..2bb5d29a50 100644 --- a/rclcpp/include/rclcpp/detail/mutex_two_priorities.hpp +++ b/rclcpp/include/rclcpp/detail/mutex_two_priorities.hpp @@ -1,4 +1,4 @@ -// Copyright 2020 Open Source Robotics Foundation, Inc. +// Copyright 2021 Open Source Robotics Foundation, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -32,18 +32,11 @@ class MutexTwoPriorities class HighPriorityMutex { public: - explicit HighPriorityMutex(MutexTwoPriorities & parent) - : parent_(parent) {} + explicit HighPriorityMutex(MutexTwoPriorities & parent); - void lock() - { - parent_.data_.lock(); - } + void lock(); - void unlock() - { - parent_.data_.unlock(); - } + void unlock(); private: MutexTwoPriorities & parent_; @@ -52,29 +45,21 @@ class MutexTwoPriorities class LowPriorityMutex { public: - explicit LowPriorityMutex(MutexTwoPriorities & parent) - : parent_(parent) {} + explicit LowPriorityMutex(MutexTwoPriorities & parent); - void lock() - { - std::unique_lock barrier_guard{parent_.barrier_}; - parent_.data_.lock(); - barrier_guard.release(); - } + void lock(); - void unlock() - { - // data_.unlock(); low_prio_.unlock() - std::lock_guard barrier_guard{parent_.barrier_, std::adopt_lock}; - parent_.data_.unlock(); - } + void unlock(); private: MutexTwoPriorities & parent_; }; - HighPriorityMutex get_high_priority_mutex() {return HighPriorityMutex{*this};} - LowPriorityMutex get_low_priority_mutex() {return LowPriorityMutex{*this};} + HighPriorityMutex + get_high_priority_mutex(); + + LowPriorityMutex + get_low_priority_mutex(); private: // Implementation detail: the idea here is that only one low priority thread can be @@ -82,7 +67,7 @@ class MutexTwoPriorities // All high priority threads are already waiting for the data_ mutex. std::mutex barrier_; std::mutex data_; - }; +}; } // namespace detail } // namespace rclcpp diff --git a/rclcpp/src/rclcpp/detail/mutex_two_priorities.cpp b/rclcpp/src/rclcpp/detail/mutex_two_priorities.cpp new file mode 100644 index 0000000000..eced82e492 --- /dev/null +++ b/rclcpp/src/rclcpp/detail/mutex_two_priorities.cpp @@ -0,0 +1,71 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/detail/mutex_two_priorities.hpp" + +#include + +namespace rclcpp +{ +namespace detail +{ + +using LowPriorityMutex = MutexTwoPriorities::LowPriorityMutex; +using HighPriorityMutex = MutexTwoPriorities::HighPriorityMutex; + +HighPriorityMutex::HighPriorityMutex(MutexTwoPriorities & parent) : parent_(parent) {} + +void +HighPriorityMutex::lock() +{ + parent_.data_.lock(); +} + +void +HighPriorityMutex::unlock() +{ + parent_.data_.unlock(); +} + +LowPriorityMutex::LowPriorityMutex(MutexTwoPriorities & parent) : parent_(parent) {} + +void +LowPriorityMutex::lock() +{ + std::unique_lock barrier_guard{parent_.barrier_}; + parent_.data_.lock(); + barrier_guard.release(); +} + +void +LowPriorityMutex::unlock() +{ + std::lock_guard barrier_guard{parent_.barrier_, std::adopt_lock}; + parent_.data_.unlock(); +} + +HighPriorityMutex +MutexTwoPriorities::get_high_priority_mutex() +{ + return HighPriorityMutex{*this}; +} + +LowPriorityMutex +MutexTwoPriorities::get_low_priority_mutex() +{ + return LowPriorityMutex{*this}; +} + +} // namespace detail +} // namespace rclcpp From b05036ac5a4f739d64ef23e8d10b84fcec06b068 Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Wed, 27 Jan 2021 16:11:34 -0300 Subject: [PATCH 10/10] renaming to avoid confusions + linters happy Signed-off-by: Ivan Santiago Paunovic --- .../rclcpp/detail/mutex_two_priorities.hpp | 16 +++++----- .../rclcpp/detail/mutex_two_priorities.cpp | 32 +++++++++++-------- .../executors/multi_threaded_executor.cpp | 12 +++---- 3 files changed, 32 insertions(+), 28 deletions(-) diff --git a/rclcpp/include/rclcpp/detail/mutex_two_priorities.hpp b/rclcpp/include/rclcpp/detail/mutex_two_priorities.hpp index 2bb5d29a50..6c5f69e98f 100644 --- a/rclcpp/include/rclcpp/detail/mutex_two_priorities.hpp +++ b/rclcpp/include/rclcpp/detail/mutex_two_priorities.hpp @@ -29,10 +29,10 @@ namespace detail class MutexTwoPriorities { public: - class HighPriorityMutex + class HighPriorityLockable { public: - explicit HighPriorityMutex(MutexTwoPriorities & parent); + explicit HighPriorityLockable(MutexTwoPriorities & parent); void lock(); @@ -42,10 +42,10 @@ class MutexTwoPriorities MutexTwoPriorities & parent_; }; - class LowPriorityMutex + class LowPriorityLockable { public: - explicit LowPriorityMutex(MutexTwoPriorities & parent); + explicit LowPriorityLockable(MutexTwoPriorities & parent); void lock(); @@ -55,11 +55,11 @@ class MutexTwoPriorities MutexTwoPriorities & parent_; }; - HighPriorityMutex - get_high_priority_mutex(); + HighPriorityLockable + get_high_priority_lockable(); - LowPriorityMutex - get_low_priority_mutex(); + LowPriorityLockable + get_low_priority_lockable(); private: // Implementation detail: the idea here is that only one low priority thread can be diff --git a/rclcpp/src/rclcpp/detail/mutex_two_priorities.cpp b/rclcpp/src/rclcpp/detail/mutex_two_priorities.cpp index eced82e492..e7c7a091e8 100644 --- a/rclcpp/src/rclcpp/detail/mutex_two_priorities.cpp +++ b/rclcpp/src/rclcpp/detail/mutex_two_priorities.cpp @@ -21,27 +21,31 @@ namespace rclcpp namespace detail { -using LowPriorityMutex = MutexTwoPriorities::LowPriorityMutex; -using HighPriorityMutex = MutexTwoPriorities::HighPriorityMutex; +using LowPriorityLockable = MutexTwoPriorities::LowPriorityLockable; +using HighPriorityLockable = MutexTwoPriorities::HighPriorityLockable; -HighPriorityMutex::HighPriorityMutex(MutexTwoPriorities & parent) : parent_(parent) {} +HighPriorityLockable::HighPriorityLockable(MutexTwoPriorities & parent) +: parent_(parent) +{} void -HighPriorityMutex::lock() +HighPriorityLockable::lock() { parent_.data_.lock(); } void -HighPriorityMutex::unlock() +HighPriorityLockable::unlock() { parent_.data_.unlock(); } -LowPriorityMutex::LowPriorityMutex(MutexTwoPriorities & parent) : parent_(parent) {} +LowPriorityLockable::LowPriorityLockable(MutexTwoPriorities & parent) +: parent_(parent) +{} void -LowPriorityMutex::lock() +LowPriorityLockable::lock() { std::unique_lock barrier_guard{parent_.barrier_}; parent_.data_.lock(); @@ -49,22 +53,22 @@ LowPriorityMutex::lock() } void -LowPriorityMutex::unlock() +LowPriorityLockable::unlock() { std::lock_guard barrier_guard{parent_.barrier_, std::adopt_lock}; parent_.data_.unlock(); } -HighPriorityMutex -MutexTwoPriorities::get_high_priority_mutex() +HighPriorityLockable +MutexTwoPriorities::get_high_priority_lockable() { - return HighPriorityMutex{*this}; + return HighPriorityLockable{*this}; } -LowPriorityMutex -MutexTwoPriorities::get_low_priority_mutex() +LowPriorityLockable +MutexTwoPriorities::get_low_priority_lockable() { - return LowPriorityMutex{*this}; + return LowPriorityLockable{*this}; } } // namespace detail diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index bf92875cdc..a9b01b0c34 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -52,8 +52,8 @@ MultiThreadedExecutor::spin() std::vector threads; size_t thread_id = 0; { - auto low_priority_wait_mutex = wait_mutex_.get_low_priority_mutex(); - std::lock_guard wait_lock(low_priority_wait_mutex); + auto low_priority_wait_mutex = wait_mutex_.get_low_priority_lockable(); + std::lock_guard wait_lock(low_priority_wait_mutex); for (; thread_id < number_of_threads_ - 1; ++thread_id) { auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id); threads.emplace_back(func); @@ -78,8 +78,8 @@ MultiThreadedExecutor::run(size_t) while (rclcpp::ok(this->context_) && spinning.load()) { rclcpp::AnyExecutable any_exec; { - auto low_priority_wait_mutex = wait_mutex_.get_low_priority_mutex(); - std::lock_guard wait_lock(low_priority_wait_mutex); + auto low_priority_wait_mutex = wait_mutex_.get_low_priority_lockable(); + std::lock_guard wait_lock(low_priority_wait_mutex); if (!rclcpp::ok(this->context_) || !spinning.load()) { return; } @@ -106,8 +106,8 @@ MultiThreadedExecutor::run(size_t) execute_any_executable(any_exec); if (any_exec.timer) { - auto high_priority_wait_mutex = wait_mutex_.get_high_priority_mutex(); - std::lock_guard wait_lock(high_priority_wait_mutex); + auto high_priority_wait_mutex = wait_mutex_.get_high_priority_lockable(); + std::lock_guard wait_lock(high_priority_wait_mutex); auto it = scheduled_timers_.find(any_exec.timer); if (it != scheduled_timers_.end()) { scheduled_timers_.erase(it);