From 0a1ba0407ae1d1acca11a088f8f5ddd71e23b7e7 Mon Sep 17 00:00:00 2001 From: Guillaume Autran Date: Wed, 24 Jul 2019 15:41:34 -0400 Subject: [PATCH] Crash in callback group pointer vector iterator 1. Mutually exclusive callback group hangs The root cause for this issue is due to a combination between the multithreaded executor and the mutually exclusive callback group used for all the ROS topics. When the executor collects all the references to the subscriptions, timers and services, it skips the mutually exclusive callback_groups which are currently locked (ie: being processed by another thread). This cause the resulting waitset to only contain the guard pointers. If there is no activity on those guards, the thread will wait for work forever in the get_next_executable and block all other threads. The resolution is to simply add a timeout for the multithreaded get_next_executable call ensuring the calling thread will eventually return. 2. Memory leak in callback group weak reference vectors There is leak in the callback group weak reference vectors that occurs when a weak reference becomes invalid (subscription is dropped, service deleted, etc). The now obsolete weak pointer reference is never deleted from the callback group pointer vector causing the leak. The resolution of this problem is implemented by scanning and deleting expired weak pointer at the time of insertion of a new weak pointer into the callback group vectors. This approach is the lowest computational cost to purging obsolete weak pointers. 3. Crash in iterator for callback group pointer vectors This problem exists because a reference to the callback group pointer vector is provided as a return value to facilitate loop iterator. This is a significant crash root cause with a multithreaded executor where a thread is able to add new subscription to the callback group. The crash is caused by a concurrent modification of the weak pointer vector while another thread is iterating over that same vector resulting in a crash. Testing: These changes where implemented and tested using a test application which creates / publish / deletes thousands of topics (up to 100,000) from a separate standalone thread while the ROS2 layer is receiving data on the topics deleted. The muiltithreaded was setup to contain 10 separate executor threads on a single mutually exclusive callback group containing thousands of topics. issue: #813 Signed-off-by: Guillaume Autran --- rclcpp/include/rclcpp/callback_group.hpp | 48 ++++++++--- .../executors/multi_threaded_executor.hpp | 5 +- .../strategies/allocator_memory_strategy.hpp | 41 ++++----- rclcpp/src/rclcpp/callback_group.cpp | 64 +++++++------- rclcpp/src/rclcpp/executor.cpp | 28 ++++--- .../executors/multi_threaded_executor.cpp | 9 +- rclcpp/src/rclcpp/memory_strategy.cpp | 83 ++++++++++--------- 7 files changed, 152 insertions(+), 126 deletions(-) diff --git a/rclcpp/include/rclcpp/callback_group.hpp b/rclcpp/include/rclcpp/callback_group.hpp index 57319b2c23..6fe62f05bb 100644 --- a/rclcpp/include/rclcpp/callback_group.hpp +++ b/rclcpp/include/rclcpp/callback_group.hpp @@ -62,25 +62,40 @@ class CallbackGroup RCLCPP_PUBLIC explicit CallbackGroup(CallbackGroupType group_type); + template RCLCPP_PUBLIC - const std::vector & - get_subscription_ptrs() const; + rclcpp::SubscriptionBase::SharedPtr + find_subscription_ptrs_if(Function func) const { + return _find_ptrs_if_impl(func, subscription_ptrs_); + } + template RCLCPP_PUBLIC - const std::vector & - get_timer_ptrs() const; + rclcpp::TimerBase::SharedPtr + find_timer_ptrs_if(Function func) const { + return _find_ptrs_if_impl(func, timer_ptrs_); + } + template RCLCPP_PUBLIC - const std::vector & - get_service_ptrs() const; + rclcpp::ServiceBase::SharedPtr + find_service_ptrs_if(Function func) const { + return _find_ptrs_if_impl(func, service_ptrs_); + } + template RCLCPP_PUBLIC - const std::vector & - get_client_ptrs() const; + rclcpp::ClientBase::SharedPtr + find_client_ptrs_if(Function func) const { + return _find_ptrs_if_impl(func, client_ptrs_); + } + template RCLCPP_PUBLIC - const std::vector & - get_waitable_ptrs() const; + rclcpp::Waitable::SharedPtr + find_waitable_ptrs_if(Function func) const { + return _find_ptrs_if_impl(func, waitable_ptrs_); + } RCLCPP_PUBLIC std::atomic_bool & @@ -130,6 +145,19 @@ class CallbackGroup std::vector client_ptrs_; std::vector waitable_ptrs_; std::atomic_bool can_be_taken_from_; + +private: + template + typename TypeT::SharedPtr _find_ptrs_if_impl( + Function func, const std::vector& vect_ptrs) const { + std::lock_guard lock(mutex_); + for (auto & weak_ptr : vect_ptrs) { + auto ref_ptr = weak_ptr.lock(); + if (ref_ptr && func(ref_ptr)) + return ref_ptr; + } + return typename TypeT::SharedPtr(); + } }; } // namespace callback_group diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp index 72df5dd82a..316c016a45 100644 --- a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp @@ -15,6 +15,7 @@ #ifndef RCLCPP__EXECUTORS__MULTI_THREADED_EXECUTOR_HPP_ #define RCLCPP__EXECUTORS__MULTI_THREADED_EXECUTOR_HPP_ +#include #include #include #include @@ -53,7 +54,8 @@ class MultiThreadedExecutor : public executor::Executor MultiThreadedExecutor( const executor::ExecutorArgs & args = executor::ExecutorArgs(), size_t number_of_threads = 0, - bool yield_before_execute = false); + bool yield_before_execute = false, + std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); RCLCPP_PUBLIC virtual ~MultiThreadedExecutor(); @@ -77,6 +79,7 @@ class MultiThreadedExecutor : public executor::Executor std::mutex wait_mutex_; size_t number_of_threads_; bool yield_before_execute_; + std::chrono::nanoseconds next_exec_timeout_; std::set scheduled_timers_; }; diff --git a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp index 61d5b74097..a692b74662 100644 --- a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp @@ -164,40 +164,31 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy if (!group || !group->can_be_taken_from().load()) { continue; } - for (auto & weak_subscription : group->get_subscription_ptrs()) { - auto subscription = weak_subscription.lock(); - if (subscription) { + group->find_subscription_ptrs_if( + [this](const rclcpp::SubscriptionBase::SharedPtr& subscription) { subscription_handles_.push_back(subscription->get_subscription_handle()); if (subscription->get_intra_process_subscription_handle()) { subscription_handles_.push_back( subscription->get_intra_process_subscription_handle()); } - } - } - for (auto & weak_service : group->get_service_ptrs()) { - auto service = weak_service.lock(); - if (service) { + return false; + }); + group->find_service_ptrs_if([this](const rclcpp::ServiceBase::SharedPtr& service) { service_handles_.push_back(service->get_service_handle()); - } - } - for (auto & weak_client : group->get_client_ptrs()) { - auto client = weak_client.lock(); - if (client) { + return false; + }); + group->find_client_ptrs_if([this](const rclcpp::ClientBase::SharedPtr& client) { client_handles_.push_back(client->get_client_handle()); - } - } - for (auto & weak_timer : group->get_timer_ptrs()) { - auto timer = weak_timer.lock(); - if (timer) { + return false; + }); + group->find_timer_ptrs_if([this](const rclcpp::TimerBase::SharedPtr& timer) { timer_handles_.push_back(timer->get_timer_handle()); - } - } - for (auto & weak_waitable : group->get_waitable_ptrs()) { - auto waitable = weak_waitable.lock(); - if (waitable) { + return false; + }); + group->find_waitable_ptrs_if([this](const rclcpp::Waitable::SharedPtr& waitable) { waitable_handles_.push_back(waitable); - } - } + return false; + }); } } return has_invalid_weak_nodes; diff --git a/rclcpp/src/rclcpp/callback_group.cpp b/rclcpp/src/rclcpp/callback_group.cpp index b812afc516..0d152d703d 100644 --- a/rclcpp/src/rclcpp/callback_group.cpp +++ b/rclcpp/src/rclcpp/callback_group.cpp @@ -23,40 +23,6 @@ CallbackGroup::CallbackGroup(CallbackGroupType group_type) : type_(group_type), can_be_taken_from_(true) {} -const std::vector & -CallbackGroup::get_subscription_ptrs() const -{ - std::lock_guard lock(mutex_); - return subscription_ptrs_; -} - -const std::vector & -CallbackGroup::get_timer_ptrs() const -{ - std::lock_guard lock(mutex_); - return timer_ptrs_; -} - -const std::vector & -CallbackGroup::get_service_ptrs() const -{ - std::lock_guard lock(mutex_); - return service_ptrs_; -} - -const std::vector & -CallbackGroup::get_client_ptrs() const -{ - std::lock_guard lock(mutex_); - return client_ptrs_; -} - -const std::vector & -CallbackGroup::get_waitable_ptrs() const -{ - std::lock_guard lock(mutex_); - return waitable_ptrs_; -} std::atomic_bool & CallbackGroup::can_be_taken_from() @@ -76,6 +42,12 @@ CallbackGroup::add_subscription( { std::lock_guard lock(mutex_); subscription_ptrs_.push_back(subscription_ptr); + subscription_ptrs_.erase( + std::remove_if( + subscription_ptrs_.begin(), + subscription_ptrs_.end(), + [](rclcpp::SubscriptionBase::WeakPtr x){return x.expired();}), + subscription_ptrs_.end()); } void @@ -83,6 +55,12 @@ CallbackGroup::add_timer(const rclcpp::TimerBase::SharedPtr timer_ptr) { std::lock_guard lock(mutex_); timer_ptrs_.push_back(timer_ptr); + timer_ptrs_.erase( + std::remove_if( + timer_ptrs_.begin(), + timer_ptrs_.end(), + [](rclcpp::TimerBase::WeakPtr x){return x.expired();}), + timer_ptrs_.end()); } void @@ -90,6 +68,12 @@ CallbackGroup::add_service(const rclcpp::ServiceBase::SharedPtr service_ptr) { std::lock_guard lock(mutex_); service_ptrs_.push_back(service_ptr); + service_ptrs_.erase( + std::remove_if( + service_ptrs_.begin(), + service_ptrs_.end(), + [](rclcpp::ServiceBase::WeakPtr x){return x.expired();}), + service_ptrs_.end()); } void @@ -97,6 +81,12 @@ CallbackGroup::add_client(const rclcpp::ClientBase::SharedPtr client_ptr) { std::lock_guard lock(mutex_); client_ptrs_.push_back(client_ptr); + client_ptrs_.erase( + std::remove_if( + client_ptrs_.begin(), + client_ptrs_.end(), + [](rclcpp::ClientBase::WeakPtr x){return x.expired();}), + client_ptrs_.end()); } void @@ -104,6 +94,12 @@ CallbackGroup::add_waitable(const rclcpp::Waitable::SharedPtr waitable_ptr) { std::lock_guard lock(mutex_); waitable_ptrs_.push_back(waitable_ptr); + waitable_ptrs_.erase( + std::remove_if( + waitable_ptrs_.begin(), + waitable_ptrs_.end(), + [](rclcpp::Waitable::WeakPtr x){return x.expired();}), + waitable_ptrs_.end()); } void diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 2a47b71bbd..aec3bf2a6f 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -511,11 +511,12 @@ Executor::get_group_by_timer(rclcpp::TimerBase::SharedPtr timer) if (!group) { continue; } - for (auto & weak_timer : group->get_timer_ptrs()) { - auto t = weak_timer.lock(); - if (t == timer) { - return group; - } + auto timer_ref = group->find_timer_ptrs_if( + [timer](const rclcpp::TimerBase::SharedPtr& timer_ptr)->bool { + return (timer_ptr == timer); + }); + if (timer_ref) { + return group; } } } @@ -535,14 +536,15 @@ Executor::get_next_timer(AnyExecutable & any_exec) if (!group || !group->can_be_taken_from().load()) { continue; } - for (auto & timer_ref : group->get_timer_ptrs()) { - auto timer = timer_ref.lock(); - if (timer && timer->is_ready()) { - any_exec.timer = timer; - any_exec.callback_group = group; - node = get_node_by_group(group); - return; - } + auto timer_ref = group->find_timer_ptrs_if( + [](const rclcpp::TimerBase::SharedPtr& timer)->bool { + return timer->is_ready(); + }); + if (timer_ref) { + any_exec.timer = timer_ref; + any_exec.callback_group = group; + node = get_node_by_group(group); + return; } } } diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 24c5c79b2c..bf980c25a6 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -27,8 +27,11 @@ using rclcpp::executors::MultiThreadedExecutor; MultiThreadedExecutor::MultiThreadedExecutor( const rclcpp::executor::ExecutorArgs & args, size_t number_of_threads, - bool yield_before_execute) -: executor::Executor(args), yield_before_execute_(yield_before_execute) + bool yield_before_execute, + std::chrono::nanoseconds next_exec_timeout) +: executor::Executor(args) +, yield_before_execute_(yield_before_execute) +, next_exec_timeout_(next_exec_timeout) { number_of_threads_ = number_of_threads ? number_of_threads : std::thread::hardware_concurrency(); if (number_of_threads_ == 0) { @@ -77,7 +80,7 @@ MultiThreadedExecutor::run(size_t) if (!rclcpp::ok(this->context_) || !spinning.load()) { return; } - if (!get_next_executable(any_exec)) { + if (!get_next_executable(any_exec, next_exec_timeout_)) { continue; } if (any_exec.timer) { diff --git a/rclcpp/src/rclcpp/memory_strategy.cpp b/rclcpp/src/rclcpp/memory_strategy.cpp index 01b6226d3d..f22aebcb9a 100644 --- a/rclcpp/src/rclcpp/memory_strategy.cpp +++ b/rclcpp/src/rclcpp/memory_strategy.cpp @@ -32,16 +32,13 @@ MemoryStrategy::get_subscription_by_handle( if (!group) { continue; } - for (auto & weak_subscription : group->get_subscription_ptrs()) { - auto subscription = weak_subscription.lock(); - if (subscription) { - if (subscription->get_subscription_handle() == subscriber_handle) { - return subscription; - } - if (subscription->get_intra_process_subscription_handle() == subscriber_handle) { - return subscription; - } - } + auto match_subscription = group->find_subscription_ptrs_if( + [&subscriber_handle](const rclcpp::SubscriptionBase::SharedPtr& subscription)->bool { + return (subscription->get_subscription_handle() == subscriber_handle) || + (subscription->get_intra_process_subscription_handle() == subscriber_handle); + }); + if (match_subscription) { + return match_subscription; } } } @@ -63,11 +60,12 @@ MemoryStrategy::get_service_by_handle( if (!group) { continue; } - for (auto & weak_service : group->get_service_ptrs()) { - auto service = weak_service.lock(); - if (service && service->get_service_handle() == service_handle) { - return service; - } + auto service_ref = group->find_service_ptrs_if( + [&service_handle](const rclcpp::ServiceBase::SharedPtr& service)->bool { + return (service->get_service_handle() == service_handle); + }); + if (service_ref) { + return service_ref; } } } @@ -89,11 +87,12 @@ MemoryStrategy::get_client_by_handle( if (!group) { continue; } - for (auto & weak_client : group->get_client_ptrs()) { - auto client = weak_client.lock(); - if (client && client->get_client_handle() == client_handle) { - return client; - } + auto client_ref = group->find_client_ptrs_if( + [&client_handle](const rclcpp::ClientBase::SharedPtr &client)->bool { + return (client->get_client_handle() == client_handle); + }); + if (client_ref) { + return client_ref; } } } @@ -138,11 +137,12 @@ MemoryStrategy::get_group_by_subscription( if (!group) { continue; } - for (auto & weak_sub : group->get_subscription_ptrs()) { - auto sub = weak_sub.lock(); - if (sub == subscription) { - return group; - } + auto match_sub = group->find_subscription_ptrs_if( + [&subscription](const rclcpp::SubscriptionBase::SharedPtr& sub)->bool { + return (sub == subscription); + }); + if (match_sub) { + return group; } } } @@ -164,11 +164,12 @@ MemoryStrategy::get_group_by_service( if (!group) { continue; } - for (auto & weak_serv : group->get_service_ptrs()) { - auto serv = weak_serv.lock(); - if (serv && serv == service) { - return group; - } + auto service_ref = group->find_service_ptrs_if( + [&service](const rclcpp::ServiceBase::SharedPtr& serv)->bool { + return (serv == service); + }); + if (service_ref) { + return group; } } } @@ -190,11 +191,12 @@ MemoryStrategy::get_group_by_client( if (!group) { continue; } - for (auto & weak_client : group->get_client_ptrs()) { - auto cli = weak_client.lock(); - if (cli && cli == client) { - return group; - } + auto client_ref = group->find_client_ptrs_if( + [&client](const rclcpp::ClientBase::SharedPtr &cli)->bool { + return (cli == client); + }); + if (client_ref) { + return group; } } } @@ -216,11 +218,12 @@ MemoryStrategy::get_group_by_waitable( if (!group) { continue; } - for (auto & weak_waitable : group->get_waitable_ptrs()) { - auto group_waitable = weak_waitable.lock(); - if (group_waitable && group_waitable == waitable) { - return group; - } + auto waitable_ref = group->find_waitable_ptrs_if( + [&waitable](const rclcpp::Waitable::SharedPtr& group_waitable)->bool { + return (group_waitable == waitable); + }); + if (waitable_ref) { + return group; } } }