From 1de497584a1fca51dd4446257a68ea66c75b56a3 Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Mon, 8 Apr 2024 20:41:52 +0200 Subject: [PATCH 01/12] Fix callback group logic in executor Signed-off-by: Janosch Machowinski --- rclcpp/src/rclcpp/executor.cpp | 13 ++++++++----- .../executors/executor_entities_collection.cpp | 10 +++++----- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index a54d71e21b..1ab201a02a 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -689,7 +689,8 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable) auto entity_iter = current_collection_.timers.find(timer->get_timer_handle().get()); if (entity_iter != current_collection_.timers.end()) { auto callback_group = entity_iter->second.callback_group.lock(); - if (callback_group && !callback_group->can_be_taken_from()) { + if (!callback_group || !callback_group->can_be_taken_from()) { + current_timer_index++; continue; } // At this point the timer is either ready for execution or was perhaps @@ -699,6 +700,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable) wait_result_->clear_timer_with_index(current_timer_index); // Check that the timer should be called still, i.e. it wasn't canceled. if (!timer->call()) { + current_timer_index++; continue; } any_executable.timer = timer; @@ -706,6 +708,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable) valid_executable = true; break; } + current_timer_index++; } } @@ -715,7 +718,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable) subscription->get_subscription_handle().get()); if (entity_iter != current_collection_.subscriptions.end()) { auto callback_group = entity_iter->second.callback_group.lock(); - if (callback_group && !callback_group->can_be_taken_from()) { + if (!callback_group || !callback_group->can_be_taken_from()) { continue; } any_executable.subscription = subscription; @@ -731,7 +734,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable) auto entity_iter = current_collection_.services.find(service->get_service_handle().get()); if (entity_iter != current_collection_.services.end()) { auto callback_group = entity_iter->second.callback_group.lock(); - if (callback_group && !callback_group->can_be_taken_from()) { + if (!callback_group || !callback_group->can_be_taken_from()) { continue; } any_executable.service = service; @@ -747,7 +750,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable) auto entity_iter = current_collection_.clients.find(client->get_client_handle().get()); if (entity_iter != current_collection_.clients.end()) { auto callback_group = entity_iter->second.callback_group.lock(); - if (callback_group && !callback_group->can_be_taken_from()) { + if (!callback_group || !callback_group->can_be_taken_from()) { continue; } any_executable.client = client; @@ -763,7 +766,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable) auto entity_iter = current_collection_.waitables.find(waitable.get()); if (entity_iter != current_collection_.waitables.end()) { auto callback_group = entity_iter->second.callback_group.lock(); - if (callback_group && !callback_group->can_be_taken_from()) { + if (!callback_group || !callback_group->can_be_taken_from()) { continue; } any_executable.waitable = waitable; diff --git a/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp b/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp index 765002b2ef..68ac56b656 100644 --- a/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp +++ b/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp @@ -153,7 +153,7 @@ ready_executables( continue; } auto group_info = group_cache(entity_iter->second.callback_group); - if (group_info && !group_info->can_be_taken_from().load()) { + if (!group_info || !group_info->can_be_taken_from().load()) { continue; } if (!entity->call()) { @@ -176,7 +176,7 @@ ready_executables( continue; } auto group_info = group_cache(entity_iter->second.callback_group); - if (group_info && !group_info->can_be_taken_from().load()) { + if (!group_info || !group_info->can_be_taken_from().load()) { continue; } rclcpp::AnyExecutable exec; @@ -196,7 +196,7 @@ ready_executables( continue; } auto group_info = group_cache(entity_iter->second.callback_group); - if (group_info && !group_info->can_be_taken_from().load()) { + if (!group_info || !group_info->can_be_taken_from().load()) { continue; } rclcpp::AnyExecutable exec; @@ -216,7 +216,7 @@ ready_executables( continue; } auto group_info = group_cache(entity_iter->second.callback_group); - if (group_info && !group_info->can_be_taken_from().load()) { + if (!group_info || !group_info->can_be_taken_from().load()) { continue; } rclcpp::AnyExecutable exec; @@ -236,7 +236,7 @@ ready_executables( continue; } auto group_info = group_cache(entry.callback_group); - if (group_info && !group_info->can_be_taken_from().load()) { + if (!group_info || !group_info->can_be_taken_from().load()) { continue; } rclcpp::AnyExecutable exec; From f14dc71a0e1066a0fd92522c57b8a0d688a4fa27 Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Wed, 10 Apr 2024 19:16:27 +0200 Subject: [PATCH 02/12] fix: Fixed unnecessary copy of wait_set Signed-off-by: Janosch Machowinski --- rclcpp/include/rclcpp/wait_result.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclcpp/include/rclcpp/wait_result.hpp b/rclcpp/include/rclcpp/wait_result.hpp index 74d3d2183c..3384a7846a 100644 --- a/rclcpp/include/rclcpp/wait_result.hpp +++ b/rclcpp/include/rclcpp/wait_result.hpp @@ -274,7 +274,7 @@ class WaitResult final if (this->kind() == WaitResultKind::Ready) { auto & wait_set = this->get_wait_set(); - auto rcl_wait_set = wait_set.get_rcl_wait_set(); + auto & rcl_wait_set = wait_set.get_rcl_wait_set(); while (next_waitable_index_ < wait_set.size_of_waitables()) { auto cur_waitable = wait_set.waitables(next_waitable_index_++); if (cur_waitable != nullptr && cur_waitable->is_ready(rcl_wait_set)) { From 27939a8107409228f4f7f477b092f3716d6ddf19 Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Wed, 10 Apr 2024 19:32:55 +0200 Subject: [PATCH 03/12] fix(executor): Fixed race conditions with rebuild of wait_sets Before this change, the rebuild of wait set would be triggered after the wait set was waken up. With bad timing, this could lead to the rebuild not happening with multi threaded executor. Signed-off-by: Janosch Machowinski --- rclcpp/include/rclcpp/executor.hpp | 10 ++++ rclcpp/src/rclcpp/executor.cpp | 91 ++++++++++++++---------------- 2 files changed, 51 insertions(+), 50 deletions(-) diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index cfd7ea81fd..27f9378e34 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -550,6 +550,16 @@ class Executor AnyExecutable & any_executable, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + /** + * This function triggers a recollect of all entities + * that are registered to the executor. + * + * Calling this function is thread safe. + * + * @param notify if true will execute a trigger that will wake up a waiting executor + */ + void trigger_entity_recollect(bool notify); + /// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins. std::atomic_bool spinning; diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 1ab201a02a..06a2d2e150 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -72,13 +72,10 @@ Executor::Executor(const rclcpp::ExecutorOptions & options) } }); - notify_waitable_->set_on_ready_callback( - [this](auto, auto) { - this->entities_need_rebuild_.store(true); - }); - notify_waitable_->add_guard_condition(interrupt_guard_condition_); notify_waitable_->add_guard_condition(shutdown_guard_condition_); + + wait_set_.add_waitable(notify_waitable_); } Executor::~Executor() @@ -122,6 +119,20 @@ Executor::~Executor() } } +void Executor::trigger_entity_recollect(bool notify) +{ + this->entities_need_rebuild_.store(true); + + if (!spinning.load()) { + std::lock_guard guard(mutex_); + this->collect_entities(); + } + + if(notify) { + interrupt_guard_condition_->trigger(); + } +} + std::vector Executor::get_all_callback_groups() { @@ -152,19 +163,12 @@ Executor::add_callback_group( (void) node_ptr; this->collector_.add_callback_group(group_ptr); - if (!spinning.load()) { - std::lock_guard guard(mutex_); - this->collect_entities(); - } - - if (notify) { - try { - interrupt_guard_condition_->trigger(); - } catch (const rclcpp::exceptions::RCLError & ex) { - throw std::runtime_error( - std::string( - "Failed to trigger guard condition on callback group add: ") + ex.what()); - } + try { + this->trigger_entity_recollect(notify); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( + std::string( + "Failed to trigger guard condition on callback group add: ") + ex.what()); } } @@ -173,19 +177,12 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt { this->collector_.add_node(node_ptr); - if (!spinning.load()) { - std::lock_guard guard(mutex_); - this->collect_entities(); - } - - if (notify) { - try { - interrupt_guard_condition_->trigger(); - } catch (const rclcpp::exceptions::RCLError & ex) { - throw std::runtime_error( + try { + this->trigger_entity_recollect(notify); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( std::string( "Failed to trigger guard condition on node add: ") + ex.what()); - } } } @@ -196,18 +193,12 @@ Executor::remove_callback_group( { this->collector_.remove_callback_group(group_ptr); - if (!spinning.load()) { - std::lock_guard guard(mutex_); - this->collect_entities(); - } - if (notify) { - try { - interrupt_guard_condition_->trigger(); - } catch (const rclcpp::exceptions::RCLError & ex) { - throw std::runtime_error( + try { + this->trigger_entity_recollect(notify); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( std::string( "Failed to trigger guard condition on callback group remove: ") + ex.what()); - } } } @@ -222,19 +213,12 @@ Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node { this->collector_.remove_node(node_ptr); - if (!spinning.load()) { - std::lock_guard guard(mutex_); - this->collect_entities(); - } - - if (notify) { - try { - interrupt_guard_condition_->trigger(); - } catch (const rclcpp::exceptions::RCLError & ex) { - throw std::runtime_error( + try { + this->trigger_entity_recollect(notify); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( std::string( "Failed to trigger guard condition on node remove: ") + ex.what()); - } } } @@ -664,6 +648,13 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) RCUTILS_LOG_WARN_NAMED( "rclcpp", "empty wait set received in wait(). This should never happen."); + } else { + if(this->wait_result_->kind() == WaitResultKind::Ready && notify_waitable_) { + auto & rcl_wait_set = this->wait_result_->get_wait_set().get_rcl_wait_set(); + if(notify_waitable_->is_ready(rcl_wait_set)) { + notify_waitable_->execute(notify_waitable_->take_data()); + } + } } } From 438e233008ac291497f103413806ac42e720495e Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Mon, 8 Apr 2024 20:40:58 +0200 Subject: [PATCH 04/12] fix(Executor): Fixed lost of entities rebuild request Signed-off-by: Janosch Machowinski --- rclcpp/src/rclcpp/executor.cpp | 9 +++------ .../rclcpp/executors/static_single_threaded_executor.cpp | 2 +- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 06a2d2e150..62f2f64595 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -123,7 +123,7 @@ void Executor::trigger_entity_recollect(bool notify) { this->entities_need_rebuild_.store(true); - if (!spinning.load()) { + if (!spinning.load() && entities_need_rebuild_.exchange(false)) { std::lock_guard guard(mutex_); this->collect_entities(); } @@ -387,9 +387,7 @@ Executor::execute_any_executable(AnyExecutable & any_exec) } // Reset the callback_group, regardless of type - if (any_exec.callback_group) { - any_exec.callback_group->can_be_taken_from().store(true); - } + any_exec.callback_group->can_be_taken_from().store(true); } template @@ -626,7 +624,6 @@ Executor::collect_entities() // In the case that an entity already has an expired weak pointer // before being removed from the waitset, additionally prune the waitset. this->wait_set_.prune_deleted_entities(); - this->entities_need_rebuild_.store(false); } void @@ -639,7 +636,7 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) { std::lock_guard guard(mutex_); - if (current_collection_.empty() || this->entities_need_rebuild_.load()) { + if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) { this->collect_entities(); } } diff --git a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp index 831076c61c..b8fe92cb09 100644 --- a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp @@ -110,7 +110,7 @@ StaticSingleThreadedExecutor::spin_once_impl(std::chrono::nanoseconds timeout) std::optional> StaticSingleThreadedExecutor::collect_and_wait(std::chrono::nanoseconds timeout) { - if (current_collection_.empty() || this->entities_need_rebuild_.load()) { + if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) { this->collect_entities(); } auto wait_result = wait_set_.wait(std::chrono::nanoseconds(timeout)); From 66b988089d469b9543df122b7c775e1b451380d6 Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Thu, 11 Apr 2024 11:22:45 +0200 Subject: [PATCH 05/12] chore: Added assert for not set callback_group in execute_any_executable Signed-off-by: Janosch Machowinski --- rclcpp/src/rclcpp/executor.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 62f2f64595..ca094be96f 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include #include @@ -363,6 +364,9 @@ Executor::execute_any_executable(AnyExecutable & any_exec) return; } + assert((void("Internal error, tried to execute a AnyExecutable without a valid callback group"), + any_exec.callback_group)); + if (any_exec.timer) { TRACETOOLS_TRACEPOINT( rclcpp_executor_execute, From 5a2112f8fb31c0319ff439978748f4b1f4b3105f Mon Sep 17 00:00:00 2001 From: Michael Carroll Date: Wed, 3 Apr 2024 15:39:22 +0000 Subject: [PATCH 06/12] Add test for cbg getting reset Signed-off-by: Michael Carroll Co-authored-by: Janosch Machowinski --- rclcpp/test/rclcpp/CMakeLists.txt | 9 ++ ...test_executors_callback_group_behavior.cpp | 139 ++++++++++++++++++ 2 files changed, 148 insertions(+) create mode 100644 rclcpp/test/rclcpp/executors/test_executors_callback_group_behavior.cpp diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index d52c94b1d2..ea29f7a288 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -473,6 +473,15 @@ if(TARGET test_executors) target_link_libraries(test_executors_timer_cancel_behavior ${PROJECT_NAME} ${rosgraph_msgs_TARGETS}) endif() +ament_add_gtest( + test_executors_callback_group_behavior + executors/test_executors_callback_group_behavior.cpp + APPEND_LIBRARY_DIRS "${append_library_dirs}" + TIMEOUT 180) +if(TARGET test_executors) + target_link_libraries(test_executors_callback_group_behavior ${PROJECT_NAME}) +endif() + ament_add_gtest( test_executors_intraprocess executors/test_executors_intraprocess.cpp diff --git a/rclcpp/test/rclcpp/executors/test_executors_callback_group_behavior.cpp b/rclcpp/test/rclcpp/executors/test_executors_callback_group_behavior.cpp new file mode 100644 index 0000000000..1b33c1fbb4 --- /dev/null +++ b/rclcpp/test/rclcpp/executors/test_executors_callback_group_behavior.cpp @@ -0,0 +1,139 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This test checks that when callback groups go out of scope, that their associated executable + * entities should not be returned as valid executables. + * + * The test makes use of a bit of executor internals, but is meant to prevent regressions of behavior. + * Ref: https://github.com/ros2/rclcpp/issues/2474 + */ + +#include + +#include +#include +#include +#include + +class CustomExecutor: public rclcpp::Executor +{ +public: + explicit CustomExecutor(const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions()) + : rclcpp::Executor(options) + { + } + + ~CustomExecutor() override = default; + + void spin() override {}; + + void collect() { + this->collect_entities(); + } + + void wait() { + this->wait_for_work(std::chrono::milliseconds(10)); + } + + size_t collected_timers() const { + return this->current_collection_.timers.size(); + } + + rclcpp::AnyExecutable next() { + rclcpp::AnyExecutable ret; + this->get_next_ready_executable(ret); + return ret; + } +}; + + +TEST(TestCallbackGroup, ValidCbg) +{ + rclcpp::init(0, nullptr); + + // Create a timer associated with a callback group + auto node = std::make_shared("node"); + + // Add the callback group to the executor + auto executor = CustomExecutor(); + executor.add_node(node); + executor.spin_all(std::chrono::milliseconds(10)); + + auto cbg = node->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive, true); + auto timer = node->create_wall_timer(std::chrono::milliseconds(1), [](){}, cbg); + executor.add_callback_group(cbg, node->get_node_base_interface()); + + executor.spin_all(std::chrono::milliseconds(10)); + + // Collect the entities + executor.collect(); + EXPECT_EQ(1u, executor.collected_timers()); + + executor.wait(); + auto next_executable = executor.next(); + EXPECT_EQ(timer, next_executable.timer); + EXPECT_EQ(cbg, next_executable.callback_group); + + EXPECT_EQ(nullptr, next_executable.client); + EXPECT_EQ(nullptr, next_executable.service); + EXPECT_EQ(nullptr, next_executable.subscription); + EXPECT_EQ(nullptr, next_executable.waitable); + EXPECT_EQ(nullptr, next_executable.data); + + rclcpp::shutdown(); +} + +TEST(TestCallbackGroup, InvalidCbg) +{ + rclcpp::init(0, nullptr); + + // Create a timer associated with a callback group + auto node = std::make_shared("node"); + + // Add the callback group to the executor + auto executor = CustomExecutor(); + executor.add_node(node); + executor.spin_all(std::chrono::milliseconds(10)); + + auto cbg = node->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive, false); + auto timer = node->create_wall_timer(std::chrono::milliseconds(1), [](){}, cbg); + executor.add_callback_group(cbg, nullptr); + + executor.spin_all(std::chrono::milliseconds(10)); + + // Collect the entities + executor.collect(); + EXPECT_EQ(1u, executor.collected_timers()); + + executor.wait(); + + cbg.reset(); + + // Since the callback group has been reset, this should not be allowed to + // be a valid executable (timer and cbg should both be nullptr). + // In the regression, timer == next_executable.timer whil + // next_executable.callback_group == nullptr, which was incorrect. + auto next_executable = executor.next(); + EXPECT_EQ(nullptr, next_executable.timer); + EXPECT_EQ(nullptr, next_executable.callback_group); + + EXPECT_EQ(nullptr, next_executable.client); + EXPECT_EQ(nullptr, next_executable.service); + EXPECT_EQ(nullptr, next_executable.subscription); + EXPECT_EQ(nullptr, next_executable.waitable); + EXPECT_EQ(nullptr, next_executable.data); + + rclcpp::shutdown(); +} From 733ab4fbc8fb896d4520ac348e65378af31b8120 Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Thu, 11 Apr 2024 11:28:23 +0200 Subject: [PATCH 07/12] chore: renamed test cases to snake_case Signed-off-by: Janosch Machowinski --- .../executors/test_executors_callback_group_behavior.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rclcpp/test/rclcpp/executors/test_executors_callback_group_behavior.cpp b/rclcpp/test/rclcpp/executors/test_executors_callback_group_behavior.cpp index 1b33c1fbb4..f5b189c2af 100644 --- a/rclcpp/test/rclcpp/executors/test_executors_callback_group_behavior.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors_callback_group_behavior.cpp @@ -59,7 +59,7 @@ class CustomExecutor: public rclcpp::Executor }; -TEST(TestCallbackGroup, ValidCbg) +TEST(TestCallbackGroup, valid_callback_group) { rclcpp::init(0, nullptr); @@ -95,7 +95,7 @@ TEST(TestCallbackGroup, ValidCbg) rclcpp::shutdown(); } -TEST(TestCallbackGroup, InvalidCbg) +TEST(TestCallbackGroup, invalid_callback_group) { rclcpp::init(0, nullptr); From d181c8b5ef8e41f9fd1e7c7717f7156d53964a5a Mon Sep 17 00:00:00 2001 From: William Woodall Date: Fri, 12 Apr 2024 05:25:42 -0700 Subject: [PATCH 08/12] style Signed-off-by: William Woodall --- rclcpp/include/rclcpp/executor.hpp | 9 ++++----- rclcpp/src/rclcpp/executor.cpp | 21 +++++++++++---------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 27f9378e34..642630f381 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -550,15 +550,14 @@ class Executor AnyExecutable & any_executable, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + /// This function triggers a recollect of all entities that are registered to the executor. /** - * This function triggers a recollect of all entities - * that are registered to the executor. - * * Calling this function is thread safe. * - * @param notify if true will execute a trigger that will wake up a waiting executor + * \param[in] notify if true will execute a trigger that will wake up a waiting executor */ - void trigger_entity_recollect(bool notify); + void + trigger_entity_recollect(bool notify); /// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins. std::atomic_bool spinning; diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index ca094be96f..f65f1da82a 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -129,7 +129,7 @@ void Executor::trigger_entity_recollect(bool notify) this->collect_entities(); } - if(notify) { + if (notify) { interrupt_guard_condition_->trigger(); } } @@ -182,8 +182,8 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt this->trigger_entity_recollect(notify); } catch (const rclcpp::exceptions::RCLError & ex) { throw std::runtime_error( - std::string( - "Failed to trigger guard condition on node add: ") + ex.what()); + std::string( + "Failed to trigger guard condition on node add: ") + ex.what()); } } @@ -198,8 +198,8 @@ Executor::remove_callback_group( this->trigger_entity_recollect(notify); } catch (const rclcpp::exceptions::RCLError & ex) { throw std::runtime_error( - std::string( - "Failed to trigger guard condition on callback group remove: ") + ex.what()); + std::string( + "Failed to trigger guard condition on callback group remove: ") + ex.what()); } } @@ -218,8 +218,8 @@ Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node this->trigger_entity_recollect(notify); } catch (const rclcpp::exceptions::RCLError & ex) { throw std::runtime_error( - std::string( - "Failed to trigger guard condition on node remove: ") + ex.what()); + std::string( + "Failed to trigger guard condition on node remove: ") + ex.what()); } } @@ -364,7 +364,8 @@ Executor::execute_any_executable(AnyExecutable & any_exec) return; } - assert((void("Internal error, tried to execute a AnyExecutable without a valid callback group"), + assert( + (void("cannot execute an AnyExecutable without a valid callback group"), any_exec.callback_group)); if (any_exec.timer) { @@ -650,9 +651,9 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) "rclcpp", "empty wait set received in wait(). This should never happen."); } else { - if(this->wait_result_->kind() == WaitResultKind::Ready && notify_waitable_) { + if (this->wait_result_->kind() == WaitResultKind::Ready && notify_waitable_) { auto & rcl_wait_set = this->wait_result_->get_wait_set().get_rcl_wait_set(); - if(notify_waitable_->is_ready(rcl_wait_set)) { + if (notify_waitable_->is_ready(rcl_wait_set)) { notify_waitable_->execute(notify_waitable_->take_data()); } } From 8fc57c478b1ea6815d692c8deee47a988f92c392 Mon Sep 17 00:00:00 2001 From: William Woodall Date: Fri, 12 Apr 2024 05:26:56 -0700 Subject: [PATCH 09/12] fixup test to avoid polling and short timeouts Signed-off-by: William Woodall --- ...test_executors_callback_group_behavior.cpp | 59 ++++++++++++------- 1 file changed, 38 insertions(+), 21 deletions(-) diff --git a/rclcpp/test/rclcpp/executors/test_executors_callback_group_behavior.cpp b/rclcpp/test/rclcpp/executors/test_executors_callback_group_behavior.cpp index f5b189c2af..ca7b977400 100644 --- a/rclcpp/test/rclcpp/executors/test_executors_callback_group_behavior.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors_callback_group_behavior.cpp @@ -23,35 +23,42 @@ #include #include +#include + #include #include #include -class CustomExecutor: public rclcpp::Executor +std::chrono::milliseconds g_timer_period {1}; + +class CustomExecutor : public rclcpp::Executor { public: explicit CustomExecutor(const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions()) - : rclcpp::Executor(options) - { - } + : rclcpp::Executor(options) + {} ~CustomExecutor() override = default; - void spin() override {}; + void spin() override {} - void collect() { + void collect() + { this->collect_entities(); } - void wait() { - this->wait_for_work(std::chrono::milliseconds(10)); + void wait() + { + this->wait_for_work(g_timer_period * 10); } - size_t collected_timers() const { + size_t collected_timers() const + { return this->current_collection_.timers.size(); } - rclcpp::AnyExecutable next() { + rclcpp::AnyExecutable next() + { rclcpp::AnyExecutable ret; this->get_next_ready_executable(ret); return ret; @@ -66,16 +73,21 @@ TEST(TestCallbackGroup, valid_callback_group) // Create a timer associated with a callback group auto node = std::make_shared("node"); + std::promise promise; + std::future future = promise.get_future(); + auto timer_callback = [&promise]() { + promise.set_value(); + }; + // Add the callback group to the executor auto executor = CustomExecutor(); - executor.add_node(node); - executor.spin_all(std::chrono::milliseconds(10)); - auto cbg = node->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive, true); - auto timer = node->create_wall_timer(std::chrono::milliseconds(1), [](){}, cbg); + auto timer = node->create_wall_timer(g_timer_period, timer_callback, cbg); executor.add_callback_group(cbg, node->get_node_base_interface()); - executor.spin_all(std::chrono::milliseconds(10)); + ASSERT_EQ( + rclcpp::FutureReturnCode::SUCCESS, + executor.spin_until_future_complete(future, std::chrono::seconds(10))); // Collect the entities executor.collect(); @@ -102,16 +114,21 @@ TEST(TestCallbackGroup, invalid_callback_group) // Create a timer associated with a callback group auto node = std::make_shared("node"); + std::promise promise; + std::future future = promise.get_future(); + auto timer_callback = [&promise]() { + promise.set_value(); + }; + // Add the callback group to the executor auto executor = CustomExecutor(); - executor.add_node(node); - executor.spin_all(std::chrono::milliseconds(10)); - auto cbg = node->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive, false); - auto timer = node->create_wall_timer(std::chrono::milliseconds(1), [](){}, cbg); - executor.add_callback_group(cbg, nullptr); + auto timer = node->create_wall_timer(g_timer_period, timer_callback, cbg); + executor.add_callback_group(cbg, node->get_node_base_interface()); - executor.spin_all(std::chrono::milliseconds(10)); + ASSERT_EQ( + rclcpp::FutureReturnCode::SUCCESS, + executor.spin_until_future_complete(future, std::chrono::seconds(10))); // Collect the entities executor.collect(); From 7a5dd6e39001643c64056d7cb2edcd133eae349f Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Fri, 12 Apr 2024 16:15:08 +0200 Subject: [PATCH 10/12] fix: Use correct notify_waitable_ instance Signed-off-by: Janosch Machowinski --- rclcpp/src/rclcpp/executor.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index c82dd722e7..9e9a737b09 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -651,10 +651,10 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) "rclcpp", "empty wait set received in wait(). This should never happen."); } else { - if (this->wait_result_->kind() == WaitResultKind::Ready && notify_waitable_) { + if (this->wait_result_->kind() == WaitResultKind::Ready && current_notify_waitable_) { auto & rcl_wait_set = this->wait_result_->get_wait_set().get_rcl_wait_set(); - if (notify_waitable_->is_ready(rcl_wait_set)) { - notify_waitable_->execute(notify_waitable_->take_data()); + if (current_notify_waitable_->is_ready(rcl_wait_set)) { + current_notify_waitable_->execute(current_notify_waitable_->take_data()); } } } From 7ad43e8391cce34df3b76c2178bbd0feb35e1c04 Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Fri, 12 Apr 2024 16:15:58 +0200 Subject: [PATCH 11/12] fix(StaticSingleThreadedExecutor): Added missing special case handling for current_notify_waitable_ Signed-off-by: Janosch Machowinski --- .../rclcpp/executors/static_single_threaded_executor.cpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp index bd5a5a557a..d517ccafd0 100644 --- a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp @@ -119,6 +119,13 @@ StaticSingleThreadedExecutor::collect_and_wait(std::chrono::nanoseconds timeout) "rclcpp", "empty wait set received in wait(). This should never happen."); return {}; + } else { + if (wait_result.kind() == WaitResultKind::Ready && current_notify_waitable_) { + auto & rcl_wait_set = wait_result.get_wait_set().get_rcl_wait_set(); + if (current_notify_waitable_->is_ready(rcl_wait_set)) { + current_notify_waitable_->execute(current_notify_waitable_->take_data()); + } + } } return wait_result; } From 0e8eabdaab3adb7c90c656c50c3db695803422ec Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Fri, 12 Apr 2024 16:16:26 +0200 Subject: [PATCH 12/12] fix(TestCallbackGroup): Fixed test after change to timers Signed-off-by: Janosch Machowinski --- .../rclcpp/executors/test_executors_callback_group_behavior.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclcpp/test/rclcpp/executors/test_executors_callback_group_behavior.cpp b/rclcpp/test/rclcpp/executors/test_executors_callback_group_behavior.cpp index ca7b977400..49391cd838 100644 --- a/rclcpp/test/rclcpp/executors/test_executors_callback_group_behavior.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors_callback_group_behavior.cpp @@ -97,12 +97,12 @@ TEST(TestCallbackGroup, valid_callback_group) auto next_executable = executor.next(); EXPECT_EQ(timer, next_executable.timer); EXPECT_EQ(cbg, next_executable.callback_group); + EXPECT_NE(nullptr, next_executable.data); EXPECT_EQ(nullptr, next_executable.client); EXPECT_EQ(nullptr, next_executable.service); EXPECT_EQ(nullptr, next_executable.subscription); EXPECT_EQ(nullptr, next_executable.waitable); - EXPECT_EQ(nullptr, next_executable.data); rclcpp::shutdown(); }