From 5b669126b25676f5514cf87f34c91c88e636f7cf Mon Sep 17 00:00:00 2001 From: Aditya Pande Date: Thu, 19 Aug 2021 14:41:19 -0600 Subject: [PATCH] Galactic: for_each_callback_group backport (#1741) Added thread safe for_each_callback_group method Signed-off-by: Aditya Pande --- rclcpp/include/rclcpp/node.hpp | 11 ++ .../rclcpp/node_interfaces/node_base.hpp | 24 +++ .../node_interfaces/node_base_interface.hpp | 2 + rclcpp/src/rclcpp/executor.cpp | 38 +++-- .../static_executor_entities_collector.cpp | 28 ++-- rclcpp/src/rclcpp/node.cpp | 6 + .../src/rclcpp/node_interfaces/node_base.cpp | 64 +++++++- ...est_static_executor_entities_collector.cpp | 64 ++++---- .../test_allocator_memory_strategy.cpp | 154 ++++++++---------- rclcpp/test/rclcpp/test_memory_strategy.cpp | 92 +++++------ rclcpp/test/rclcpp/test_node.cpp | 26 ++- .../rclcpp_lifecycle/lifecycle_node.hpp | 12 ++ rclcpp_lifecycle/src/lifecycle_node.cpp | 7 + rclcpp_lifecycle/test/test_lifecycle_node.cpp | 21 ++- 14 files changed, 333 insertions(+), 216 deletions(-) diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index b2156a0068..d8ca3106af 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -153,6 +153,17 @@ class Node : public std::enable_shared_from_this const std::vector & get_callback_groups() const; + /// Iterate over the callback groups in the node, calling the given function on each valid one. + /** + * This method is called in a thread-safe way, and also makes sure to only call the given + * function on those items that are still valid. + * + * \param[in] func The callback function to call on each valid callback group. + */ + RCLCPP_PUBLIC + void + for_each_callback_group(const node_interfaces::NodeBaseInterface::CallbackGroupFunction & func); + /// Create and return a Publisher. /** * The rclcpp::QoS has several convenient constructors, including a diff --git a/rclcpp/include/rclcpp/node_interfaces/node_base.hpp b/rclcpp/include/rclcpp/node_interfaces/node_base.hpp index 0fbc86d521..6db0e5d1c7 100644 --- a/rclcpp/include/rclcpp/node_interfaces/node_base.hpp +++ b/rclcpp/include/rclcpp/node_interfaces/node_base.hpp @@ -19,9 +19,11 @@ #include #include #include +#include #include #include "rcl/node.h" +#include "rclcpp/callback_group.hpp" #include "rclcpp/context.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp" @@ -32,12 +34,34 @@ namespace rclcpp namespace node_interfaces { +RCLCPP_PUBLIC +void global_for_each_callback_group( + NodeBaseInterface * node_base_interface, + const NodeBaseInterface::CallbackGroupFunction & func); + +// Class to hold the global map of mutexes +class map_of_mutexes final +{ +public: + // Methods need to be protected by internal mutex + void create_mutex_of_nodebase(const NodeBaseInterface * nodebase); + std::shared_ptr + get_mutex_of_nodebase(const NodeBaseInterface * nodebase); + void delete_mutex_of_nodebase(const NodeBaseInterface * nodebase); + +private: + std::unordered_map> data_; + std::mutex internal_mutex_; +}; + /// Implementation of the NodeBase part of the Node API. class NodeBase : public NodeBaseInterface { public: RCLCPP_SMART_PTR_ALIASES_ONLY(NodeBase) + static map_of_mutexes map_object; + RCLCPP_PUBLIC NodeBase( const std::string & node_name, diff --git a/rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp b/rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp index cf738857ac..0de57c2ad0 100644 --- a/rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp +++ b/rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp @@ -39,6 +39,8 @@ class NodeBaseInterface public: RCLCPP_SMART_PTR_ALIASES_ONLY(NodeBaseInterface) + using CallbackGroupFunction = std::function; + RCLCPP_PUBLIC virtual ~NodeBaseInterface() = default; diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 321a614643..007da64f3d 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -30,6 +30,7 @@ #include "rclcpp/node.hpp" #include "rclcpp/scope_exit.hpp" #include "rclcpp/utilities.hpp" +#include "rclcpp/node_interfaces/node_base.hpp" #include "rcutils/logging_macros.h" @@ -177,14 +178,13 @@ Executor::add_callback_groups_from_nodes_associated_to_executor() for (auto & weak_node : weak_nodes_) { auto node = weak_node.lock(); if (node) { - auto group_ptrs = node->get_callback_groups(); - std::for_each( - group_ptrs.begin(), group_ptrs.end(), - [this, node](rclcpp::CallbackGroup::WeakPtr group_ptr) + rclcpp::node_interfaces::global_for_each_callback_group( + node.get(), + [this, node](rclcpp::CallbackGroup::SharedPtr shared_group_ptr) { - auto shared_group_ptr = group_ptr.lock(); - if (shared_group_ptr && shared_group_ptr->automatically_add_to_executor_with_node() && - !shared_group_ptr->get_associated_with_executor_atomic().load()) + if ( + shared_group_ptr->automatically_add_to_executor_with_node() && + !shared_group_ptr->get_associated_with_executor_atomic().load()) { this->add_callback_group_to_map( shared_group_ptr, @@ -263,18 +263,20 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt throw std::runtime_error("Node has already been added to an executor."); } std::lock_guard guard{mutex_}; - for (auto & weak_group : node_ptr->get_callback_groups()) { - auto group_ptr = weak_group.lock(); - if (group_ptr != nullptr && !group_ptr->get_associated_with_executor_atomic().load() && - group_ptr->automatically_add_to_executor_with_node()) + rclcpp::node_interfaces::global_for_each_callback_group( + node_ptr.get(), + [this, node_ptr, notify](rclcpp::CallbackGroup::SharedPtr group_ptr) { - this->add_callback_group_to_map( - group_ptr, - node_ptr, - weak_groups_to_nodes_associated_with_executor_, - notify); - } - } + if (!group_ptr->get_associated_with_executor_atomic().load() && + group_ptr->automatically_add_to_executor_with_node()) + { + this->add_callback_group_to_map( + group_ptr, + node_ptr, + weak_groups_to_nodes_associated_with_executor_, + notify); + } + }); const auto & gc = node_ptr->get_notify_guard_condition(); weak_nodes_to_guard_conditions_[node_ptr] = &gc; diff --git a/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp b/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp index d2c1c8373f..4c03693c35 100644 --- a/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp +++ b/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp @@ -24,6 +24,7 @@ #include "rclcpp/memory_strategy.hpp" #include "rclcpp/executors/static_single_threaded_executor.hpp" #include "rclcpp/detail/add_guard_condition_to_rcl_wait_set.hpp" +#include "rclcpp/node_interfaces/node_base.hpp" using rclcpp::executors::StaticExecutorEntitiesCollector; @@ -288,18 +289,21 @@ StaticExecutorEntitiesCollector::add_node( if (has_executor.exchange(true)) { throw std::runtime_error("Node has already been added to an executor."); } - for (const auto & weak_group : node_ptr->get_callback_groups()) { - auto group_ptr = weak_group.lock(); - if (group_ptr != nullptr && !group_ptr->get_associated_with_executor_atomic().load() && - group_ptr->automatically_add_to_executor_with_node()) + rclcpp::node_interfaces::global_for_each_callback_group( + node_ptr.get(), + [this, node_ptr, &is_new_node](rclcpp::CallbackGroup::SharedPtr group_ptr) { - is_new_node = (add_callback_group( + if ( + !group_ptr->get_associated_with_executor_atomic().load() && + group_ptr->automatically_add_to_executor_with_node()) + { + is_new_node = (add_callback_group( group_ptr, node_ptr, weak_groups_to_nodes_associated_with_executor_) || is_new_node); - } - } + } + }); weak_nodes_.push_back(node_ptr); return is_new_node; } @@ -467,13 +471,11 @@ StaticExecutorEntitiesCollector::add_callback_groups_from_nodes_associated_to_ex for (const auto & weak_node : weak_nodes_) { auto node = weak_node.lock(); if (node) { - auto group_ptrs = node->get_callback_groups(); - std::for_each( - group_ptrs.begin(), group_ptrs.end(), - [this, node](rclcpp::CallbackGroup::WeakPtr group_ptr) + rclcpp::node_interfaces::global_for_each_callback_group( + node.get(), + [this, node](rclcpp::CallbackGroup::SharedPtr shared_group_ptr) { - auto shared_group_ptr = group_ptr.lock(); - if (shared_group_ptr && shared_group_ptr->automatically_add_to_executor_with_node() && + if (shared_group_ptr->automatically_add_to_executor_with_node() && !shared_group_ptr->get_associated_with_executor_atomic().load()) { add_callback_group( diff --git a/rclcpp/src/rclcpp/node.cpp b/rclcpp/src/rclcpp/node.cpp index 117702760e..c439769675 100644 --- a/rclcpp/src/rclcpp/node.cpp +++ b/rclcpp/src/rclcpp/node.cpp @@ -605,3 +605,9 @@ Node::get_node_options() const { return this->node_options_; } + +void Node::for_each_callback_group( + const node_interfaces::NodeBaseInterface::CallbackGroupFunction & func) +{ + rclcpp::node_interfaces::global_for_each_callback_group(node_base_.get(), func); +} diff --git a/rclcpp/src/rclcpp/node_interfaces/node_base.cpp b/rclcpp/src/rclcpp/node_interfaces/node_base.cpp index 4d73595bfd..7f929646eb 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_base.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_base.cpp @@ -119,6 +119,8 @@ class NodeHandleWithContext }; } // anonymous namespace +using rclcpp::node_interfaces::map_of_mutexes; + NodeBase::NodeBase( const std::string & node_name, const std::string & namespace_, @@ -135,6 +137,9 @@ NodeBase::NodeBase( notify_guard_condition_(context), notify_guard_condition_is_valid_(false) { + // Generate a mutex for this instance of NodeBase + NodeBase::map_object.create_mutex_of_nodebase(this); + // Create the rcl node and store it in a shared_ptr with a custom destructor. std::unique_ptr rcl_node(new rcl_node_t(rcl_get_zero_initialized_node())); @@ -218,6 +223,8 @@ NodeBase::~NodeBase() std::lock_guard notify_condition_lock(notify_guard_condition_mutex_); notify_guard_condition_is_valid_ = false; } + + NodeBase::map_object.delete_mutex_of_nodebase(this); } const char * @@ -273,12 +280,11 @@ NodeBase::create_callback_group( rclcpp::CallbackGroupType group_type, bool automatically_add_to_executor_with_node) { - using rclcpp::CallbackGroup; - using rclcpp::CallbackGroupType; - auto group = CallbackGroup::SharedPtr( - new CallbackGroup( - group_type, - automatically_add_to_executor_with_node)); + auto group = std::make_shared( + group_type, + automatically_add_to_executor_with_node); + auto mutex_ptr = NodeBase::map_object.get_mutex_of_nodebase(this); + std::lock_guard lock(*mutex_ptr); callback_groups_.push_back(group); return group; } @@ -292,14 +298,16 @@ NodeBase::get_default_callback_group() bool NodeBase::callback_group_in_node(rclcpp::CallbackGroup::SharedPtr group) { - bool group_belongs_to_this_node = false; + auto mutex_ptr = NodeBase::map_object.get_mutex_of_nodebase(this); + std::lock_guard lock(*mutex_ptr); + for (auto & weak_group : this->callback_groups_) { auto cur_group = weak_group.lock(); if (cur_group && (cur_group == group)) { - group_belongs_to_this_node = true; + return true; } } - return group_belongs_to_this_node; + return false; } const std::vector & @@ -362,3 +370,41 @@ NodeBase::trigger_notify_guard_condition() { notify_guard_condition_.trigger(); } + +map_of_mutexes NodeBase::map_object = map_of_mutexes(); + +void map_of_mutexes::create_mutex_of_nodebase( + const rclcpp::node_interfaces::NodeBaseInterface * nodebase) +{ + std::lock_guard guard(this->internal_mutex_); + this->data_.emplace(nodebase, std::make_shared() ); +} + +std::shared_ptr map_of_mutexes::get_mutex_of_nodebase( + const rclcpp::node_interfaces::NodeBaseInterface * nodebase) +{ + std::lock_guard guard(this->internal_mutex_); + return this->data_[nodebase]; +} + +void map_of_mutexes::delete_mutex_of_nodebase( + const rclcpp::node_interfaces::NodeBaseInterface * nodebase) +{ + std::lock_guard guard(this->internal_mutex_); + this->data_.erase(nodebase); +} + +// For each callback group free function implementation +void rclcpp::node_interfaces::global_for_each_callback_group( + NodeBaseInterface * node_base_interface, const NodeBaseInterface::CallbackGroupFunction & func) +{ + auto mutex_ptr = NodeBase::map_object.get_mutex_of_nodebase(node_base_interface); + std::lock_guard lock(*mutex_ptr); + + for (const auto & weak_group : node_base_interface->get_callback_groups()) { + auto group = weak_group.lock(); + if (group) { + func(group); + } + } +} diff --git a/rclcpp/test/rclcpp/executors/test_static_executor_entities_collector.cpp b/rclcpp/test/rclcpp/executors/test_static_executor_entities_collector.cpp index 35e2c9ba18..f43eb65104 100644 --- a/rclcpp/test/rclcpp/executors/test_static_executor_entities_collector.cpp +++ b/rclcpp/test/rclcpp/executors/test_static_executor_entities_collector.cpp @@ -41,38 +41,38 @@ struct NumberOfEntities std::unique_ptr get_number_of_default_entities(rclcpp::Node::SharedPtr node) { auto number_of_entities = std::make_unique(); - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - EXPECT_NE(nullptr, group); - if (!group || !group->can_be_taken_from().load()) { - return nullptr; - } - group->find_subscription_ptrs_if( - [&number_of_entities](rclcpp::SubscriptionBase::SharedPtr &) - { - number_of_entities->subscriptions++; return false; - }); - group->find_timer_ptrs_if( - [&number_of_entities](rclcpp::TimerBase::SharedPtr &) - { - number_of_entities->timers++; return false; - }); - group->find_service_ptrs_if( - [&number_of_entities](rclcpp::ServiceBase::SharedPtr &) - { - number_of_entities->services++; return false; - }); - group->find_client_ptrs_if( - [&number_of_entities](rclcpp::ClientBase::SharedPtr &) - { - number_of_entities->clients++; return false; - }); - group->find_waitable_ptrs_if( - [&number_of_entities](rclcpp::Waitable::SharedPtr &) - { - number_of_entities->waitables++; return false; - }); - } + node->for_each_callback_group( + [&number_of_entities](rclcpp::CallbackGroup::SharedPtr group) + { + if (!group->can_be_taken_from().load()) { + return; + } + group->find_subscription_ptrs_if( + [&number_of_entities](rclcpp::SubscriptionBase::SharedPtr &) + { + number_of_entities->subscriptions++; return false; + }); + group->find_timer_ptrs_if( + [&number_of_entities](rclcpp::TimerBase::SharedPtr &) + { + number_of_entities->timers++; return false; + }); + group->find_service_ptrs_if( + [&number_of_entities](rclcpp::ServiceBase::SharedPtr &) + { + number_of_entities->services++; return false; + }); + group->find_client_ptrs_if( + [&number_of_entities](rclcpp::ClientBase::SharedPtr &) + { + number_of_entities->clients++; return false; + }); + group->find_waitable_ptrs_if( + [&number_of_entities](rclcpp::Waitable::SharedPtr &) + { + number_of_entities->waitables++; return false; + }); + }); return number_of_entities; } diff --git a/rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp b/rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp index 2ce5c03715..931428344a 100644 --- a/rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp +++ b/rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp @@ -23,6 +23,7 @@ #include "rclcpp/scope_exit.hpp" #include "rclcpp/strategies/allocator_memory_strategy.hpp" +#include "rclcpp/node_interfaces/node_base.hpp" #include "test_msgs/msg/empty.hpp" #include "test_msgs/srv/empty.hpp" @@ -114,12 +115,11 @@ class TestAllocatorMemoryStrategy : public ::testing::Test { auto node = std::make_shared(name, "ns"); - for (auto & group_weak_ptr : node->get_callback_groups()) { - auto group = group_weak_ptr.lock(); - if (group) { + node->for_each_callback_group( + [](rclcpp::CallbackGroup::SharedPtr group) + { group->can_be_taken_from() = false; - } - } + }); return node; } @@ -201,15 +201,13 @@ class TestAllocatorMemoryStrategy : public ::testing::Test const RclWaitSetSizes & expected) { WeakCallbackGroupsToNodesMap weak_groups_to_nodes; - auto callback_groups = node->get_node_base_interface()->get_callback_groups(); - std::for_each( - callback_groups.begin(), callback_groups.end(), - [&weak_groups_to_nodes, - &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + node->for_each_callback_group( + [node, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { weak_groups_to_nodes.insert( std::pair( - weak_group_ptr, + group_ptr, node->get_node_base_interface())); }); allocator_memory_strategy()->collect_entities(weak_groups_to_nodes); @@ -234,15 +232,13 @@ class TestAllocatorMemoryStrategy : public ::testing::Test const RclWaitSetSizes & insufficient_capacities) { WeakCallbackGroupsToNodesMap weak_groups_to_nodes; - auto callback_groups = node->get_node_base_interface()->get_callback_groups(); - std::for_each( - callback_groups.begin(), callback_groups.end(), - [&weak_groups_to_nodes, - &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + node->for_each_callback_group( + [node, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { weak_groups_to_nodes.insert( std::pair( - weak_group_ptr, + group_ptr, node->get_node_base_interface())); }); allocator_memory_strategy()->collect_entities(weak_groups_to_nodes); @@ -316,26 +312,22 @@ class TestAllocatorMemoryStrategy : public ::testing::Test { auto basic_node = create_node_with_disabled_callback_groups("basic_node"); WeakCallbackGroupsToNodesMap weak_groups_to_nodes; - auto callback_groups = basic_node->get_node_base_interface()->get_callback_groups(); - std::for_each( - callback_groups.begin(), callback_groups.end(), - [&weak_groups_to_nodes, - &basic_node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + basic_node->for_each_callback_group( + [basic_node, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { weak_groups_to_nodes.insert( std::pair( - weak_group_ptr, + group_ptr, basic_node->get_node_base_interface())); }); - auto callback_groups1 = node_with_entity1->get_node_base_interface()->get_callback_groups(); - std::for_each( - callback_groups1.begin(), callback_groups1.end(), - [&weak_groups_to_nodes, - &node_with_entity1](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + node_with_entity1->for_each_callback_group( + [node_with_entity1, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { weak_groups_to_nodes.insert( std::pair( - weak_group_ptr, + group_ptr, node_with_entity1->get_node_base_interface())); }); allocator_memory_strategy()->collect_entities(weak_groups_to_nodes); @@ -348,26 +340,23 @@ class TestAllocatorMemoryStrategy : public ::testing::Test auto basic_node2 = std::make_shared("basic_node2", "ns"); WeakCallbackGroupsToNodesMap weak_groups_to_uncollected_nodes; - auto callback_groups2 = basic_node2->get_node_base_interface()->get_callback_groups(); - std::for_each( - callback_groups2.begin(), callback_groups2.end(), - [&weak_groups_to_uncollected_nodes, - &basic_node2](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + basic_node2->for_each_callback_group( + [basic_node2, &weak_groups_to_uncollected_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { weak_groups_to_uncollected_nodes.insert( std::pair( - weak_group_ptr, + group_ptr, basic_node2->get_node_base_interface())); }); - auto callback_groups3 = node_with_entity2->get_node_base_interface()->get_callback_groups(); - std::for_each( - callback_groups3.begin(), callback_groups3.end(), - [&weak_groups_to_uncollected_nodes, - &node_with_entity2](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + node_with_entity2->for_each_callback_group( + [node_with_entity2, + &weak_groups_to_uncollected_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { weak_groups_to_uncollected_nodes.insert( std::pair( - weak_group_ptr, + group_ptr, node_with_entity2->get_node_base_interface())); }); rclcpp::AnyExecutable failed_result = get_next_entity_func(weak_groups_to_uncollected_nodes); @@ -388,24 +377,24 @@ class TestAllocatorMemoryStrategy : public ::testing::Test auto basic_node_base = basic_node->get_node_base_interface(); auto node_base = node_with_entity->get_node_base_interface(); WeakCallbackGroupsToNodesMap weak_groups_to_nodes; - auto callback_groups = basic_node_base->get_callback_groups(); - std::for_each( - callback_groups.begin(), callback_groups.end(), - [&weak_groups_to_nodes, &basic_node_base](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + rclcpp::node_interfaces::global_for_each_callback_group( + basic_node_base.get(), + [basic_node_base, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { weak_groups_to_nodes.insert( std::pair( - weak_group_ptr, + group_ptr, basic_node_base)); }); - callback_groups = node_base->get_callback_groups(); - std::for_each( - callback_groups.begin(), callback_groups.end(), - [&weak_groups_to_nodes, &node_base](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + rclcpp::node_interfaces::global_for_each_callback_group( + node_base.get(), + [node_base, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { weak_groups_to_nodes.insert( std::pair( - weak_group_ptr, + group_ptr, node_base)); }); allocator_memory_strategy()->collect_entities(weak_groups_to_nodes); @@ -447,14 +436,13 @@ class TestAllocatorMemoryStrategy : public ::testing::Test TEST_F(TestAllocatorMemoryStrategy, construct_destruct) { auto basic_node = create_node_with_disabled_callback_groups("basic_node"); WeakCallbackGroupsToNodesMap weak_groups_to_nodes; - auto callback_groups = basic_node->get_node_base_interface()->get_callback_groups(); - std::for_each( - callback_groups.begin(), callback_groups.end(), - [&weak_groups_to_nodes, &basic_node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + basic_node->for_each_callback_group( + [basic_node, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { weak_groups_to_nodes.insert( std::pair( - weak_group_ptr, + group_ptr, basic_node->get_node_base_interface())); }); allocator_memory_strategy()->collect_entities(weak_groups_to_nodes); @@ -546,14 +534,13 @@ TEST_F(TestAllocatorMemoryStrategy, number_of_entities_with_timer) { TEST_F(TestAllocatorMemoryStrategy, add_handles_to_wait_set_bad_arguments) { auto node = create_node_with_subscription("subscription_node"); WeakCallbackGroupsToNodesMap weak_groups_to_nodes; - auto callback_groups = node->get_node_base_interface()->get_callback_groups(); - std::for_each( - callback_groups.begin(), callback_groups.end(), - [&weak_groups_to_nodes, &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + node->for_each_callback_group( + [node, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { weak_groups_to_nodes.insert( std::pair( - weak_group_ptr, + group_ptr, node->get_node_base_interface())); }); allocator_memory_strategy()->collect_entities(weak_groups_to_nodes); @@ -780,14 +767,13 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_subscription_out_of_scope) { test_msgs::msg::Empty, decltype(subscription_callback)>( "topic", qos, std::move(subscription_callback), subscription_options); - auto callback_groups = node->get_node_base_interface()->get_callback_groups(); - std::for_each( - callback_groups.begin(), callback_groups.end(), - [&weak_groups_to_nodes, &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + node->for_each_callback_group( + [node, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { weak_groups_to_nodes.insert( std::pair( - weak_group_ptr, + group_ptr, node->get_node_base_interface())); }); allocator_memory_strategy()->collect_entities(weak_groups_to_nodes); @@ -814,14 +800,13 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_service_out_of_scope) { auto service = node->create_service( "service", std::move(service_callback), rmw_qos_profile_services_default, callback_group); - auto callback_groups = node->get_node_base_interface()->get_callback_groups(); - std::for_each( - callback_groups.begin(), callback_groups.end(), - [&weak_groups_to_nodes, &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + node->for_each_callback_group( + [node, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { weak_groups_to_nodes.insert( std::pair( - weak_group_ptr, + group_ptr, node->get_node_base_interface())); }); allocator_memory_strategy()->collect_entities(weak_groups_to_nodes); @@ -836,14 +821,13 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_service_out_of_scope) { TEST_F(TestAllocatorMemoryStrategy, get_next_client_out_of_scope) { auto node = create_node_with_disabled_callback_groups("node"); WeakCallbackGroupsToNodesMap weak_groups_to_nodes; - auto callback_groups = node->get_node_base_interface()->get_callback_groups(); - std::for_each( - callback_groups.begin(), callback_groups.end(), - [&weak_groups_to_nodes, &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + node->for_each_callback_group( + [node, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { weak_groups_to_nodes.insert( std::pair( - weak_group_ptr, + group_ptr, node->get_node_base_interface())); }); // Force client to go out of scope and cleanup after collecting entities. @@ -879,14 +863,13 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_timer_out_of_scope) { rclcpp::CallbackGroupType::MutuallyExclusive); auto timer = node->create_wall_timer( std::chrono::seconds(10), []() {}, callback_group); - auto callback_groups = node->get_node_base_interface()->get_callback_groups(); - std::for_each( - callback_groups.begin(), callback_groups.end(), - [&weak_groups_to_nodes, &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + node->for_each_callback_group( + [node, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { weak_groups_to_nodes.insert( std::pair( - weak_group_ptr, + group_ptr, node->get_node_base_interface())); }); allocator_memory_strategy()->collect_entities(weak_groups_to_nodes); @@ -906,14 +889,13 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_waitable_out_of_scope) { auto callback_group = node->create_callback_group( rclcpp::CallbackGroupType::MutuallyExclusive); - auto callback_groups = node->get_node_base_interface()->get_callback_groups(); - std::for_each( - callback_groups.begin(), callback_groups.end(), - [&weak_groups_to_nodes, &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + node->for_each_callback_group( + [node, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { weak_groups_to_nodes.insert( std::pair( - weak_group_ptr, + group_ptr, node->get_node_base_interface())); }); allocator_memory_strategy()->collect_entities(weak_groups_to_nodes); diff --git a/rclcpp/test/rclcpp/test_memory_strategy.cpp b/rclcpp/test/rclcpp/test_memory_strategy.cpp index 93fa0c8d5e..3993014d05 100644 --- a/rclcpp/test/rclcpp/test_memory_strategy.cpp +++ b/rclcpp/test/rclcpp/test_memory_strategy.cpp @@ -21,6 +21,7 @@ #include "rclcpp/strategies/allocator_memory_strategy.hpp" #include "rclcpp/memory_strategy.hpp" +#include "rclcpp/node_interfaces/node_base.hpp" #include "test_msgs/msg/empty.hpp" #include "test_msgs/srv/empty.hpp" @@ -84,14 +85,13 @@ TEST_F(TestMemoryStrategy, get_subscription_by_handle) { rclcpp::SubscriptionBase::SharedPtr found_subscription = nullptr; { auto node = std::make_shared("node", "ns"); - auto callback_groups = node->get_node_base_interface()->get_callback_groups(); - std::for_each( - callback_groups.begin(), callback_groups.end(), - [&weak_groups_to_nodes, &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + node->for_each_callback_group( + [node, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { weak_groups_to_nodes.insert( std::pair( - weak_group_ptr, + group_ptr, node->get_node_base_interface())); }); memory_strategy()->collect_entities(weak_groups_to_nodes); @@ -131,14 +131,13 @@ TEST_F(TestMemoryStrategy, get_service_by_handle) { rclcpp::ServiceBase::SharedPtr found_service = nullptr; { auto node = std::make_shared("node", "ns"); - auto callback_groups = node->get_node_base_interface()->get_callback_groups(); - std::for_each( - callback_groups.begin(), callback_groups.end(), - [&weak_groups_to_nodes, &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + node->for_each_callback_group( + [node, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { weak_groups_to_nodes.insert( std::pair( - weak_group_ptr, + group_ptr, node->get_node_base_interface())); }); memory_strategy()->collect_entities(weak_groups_to_nodes); @@ -184,14 +183,13 @@ TEST_F(TestMemoryStrategy, get_client_by_handle) { rclcpp::ClientBase::SharedPtr found_client = nullptr; { auto node = std::make_shared("node", "ns"); - auto callback_groups = node->get_node_base_interface()->get_callback_groups(); - std::for_each( - callback_groups.begin(), callback_groups.end(), - [&weak_groups_to_nodes, &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + node->for_each_callback_group( + [node, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { weak_groups_to_nodes.insert( std::pair( - weak_group_ptr, + group_ptr, node->get_node_base_interface())); }); memory_strategy()->collect_entities(weak_groups_to_nodes); @@ -232,14 +230,13 @@ TEST_F(TestMemoryStrategy, get_timer_by_handle) { rclcpp::TimerBase::SharedPtr found_timer = nullptr; { auto node = std::make_shared("node", "ns"); - auto callback_groups = node->get_node_base_interface()->get_callback_groups(); - std::for_each( - callback_groups.begin(), callback_groups.end(), - [&weak_groups_to_nodes, &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + node->for_each_callback_group( + [node, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { weak_groups_to_nodes.insert( std::pair( - weak_group_ptr, + group_ptr, node->get_node_base_interface())); }); memory_strategy()->collect_entities(weak_groups_to_nodes); @@ -281,14 +278,14 @@ TEST_F(TestMemoryStrategy, get_node_by_group) { { auto node = std::make_shared("node", "ns"); auto node_handle = node->get_node_base_interface(); - auto callback_groups = node_handle->get_callback_groups(); - std::for_each( - callback_groups.begin(), callback_groups.end(), - [&weak_groups_to_nodes, &node_handle](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + rclcpp::node_interfaces::global_for_each_callback_group( + node_handle.get(), + [node_handle, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { weak_groups_to_nodes.insert( std::pair( - weak_group_ptr, + group_ptr, node_handle)); }); memory_strategy()->collect_entities(weak_groups_to_nodes); @@ -325,14 +322,13 @@ TEST_F(TestMemoryStrategy, get_group_by_subscription) { rclcpp::CallbackGroup::SharedPtr callback_group = nullptr; { auto node = std::make_shared("node", "ns"); - auto callback_groups = node->get_node_base_interface()->get_callback_groups(); - std::for_each( - callback_groups.begin(), callback_groups.end(), - [&weak_groups_to_nodes, &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + node->for_each_callback_group( + [node, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { weak_groups_to_nodes.insert( std::pair( - weak_group_ptr, + group_ptr, node->get_node_base_interface())); }); memory_strategy()->collect_entities(weak_groups_to_nodes); @@ -379,14 +375,13 @@ TEST_F(TestMemoryStrategy, get_group_by_service) { rclcpp::ServiceBase::SharedPtr service = nullptr; { auto node = std::make_shared("node", "ns"); - auto callback_groups = node->get_node_base_interface()->get_callback_groups(); - std::for_each( - callback_groups.begin(), callback_groups.end(), - [&weak_groups_to_nodes, &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + node->for_each_callback_group( + [node, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { weak_groups_to_nodes.insert( std::pair( - weak_group_ptr, + group_ptr, node->get_node_base_interface())); }); memory_strategy()->collect_entities(weak_groups_to_nodes); @@ -424,14 +419,13 @@ TEST_F(TestMemoryStrategy, get_group_by_client) { rclcpp::ClientBase::SharedPtr client = nullptr; { auto node = std::make_shared("node", "ns"); - auto callback_groups = node->get_node_base_interface()->get_callback_groups(); - std::for_each( - callback_groups.begin(), callback_groups.end(), - [&weak_groups_to_nodes, &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + node->for_each_callback_group( + [node, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { weak_groups_to_nodes.insert( std::pair( - weak_group_ptr, + group_ptr, node->get_node_base_interface())); }); memory_strategy()->collect_entities(weak_groups_to_nodes); @@ -464,14 +458,13 @@ TEST_F(TestMemoryStrategy, get_group_by_timer) { rclcpp::TimerBase::SharedPtr timer = nullptr; { auto node = std::make_shared("node", "ns"); - auto callback_groups = node->get_node_base_interface()->get_callback_groups(); - std::for_each( - callback_groups.begin(), callback_groups.end(), - [&weak_groups_to_nodes, &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + node->for_each_callback_group( + [node, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { weak_groups_to_nodes.insert( std::pair( - weak_group_ptr, + group_ptr, node->get_node_base_interface())); }); memory_strategy()->collect_entities(weak_groups_to_nodes); @@ -504,14 +497,13 @@ TEST_F(TestMemoryStrategy, get_group_by_waitable) { rclcpp::Waitable::SharedPtr waitable = nullptr; { auto node = std::make_shared("node", "ns"); - auto callback_groups = node->get_node_base_interface()->get_callback_groups(); - std::for_each( - callback_groups.begin(), callback_groups.end(), - [&weak_groups_to_nodes, &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + node->for_each_callback_group( + [node, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { weak_groups_to_nodes.insert( std::pair( - weak_group_ptr, + group_ptr, node->get_node_base_interface())); }); memory_strategy()->collect_entities(weak_groups_to_nodes); diff --git a/rclcpp/test/rclcpp/test_node.cpp b/rclcpp/test/rclcpp/test_node.cpp index 0fcbca5dfe..1f6d88d8ee 100644 --- a/rclcpp/test/rclcpp/test_node.cpp +++ b/rclcpp/test/rclcpp/test_node.cpp @@ -2667,13 +2667,33 @@ TEST_F(TestNode, get_publishers_subscriptions_info_by_topic) { TEST_F(TestNode, callback_groups) { auto node = std::make_shared("node", "ns"); - size_t num_callback_groups_in_basic_node = node->get_callback_groups().size(); + size_t num_callback_groups_in_basic_node = 0; + node->for_each_callback_group( + [&num_callback_groups_in_basic_node](rclcpp::CallbackGroup::SharedPtr group) + { + (void)group; + num_callback_groups_in_basic_node++; + }); auto group1 = node->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive); - EXPECT_EQ(1u + num_callback_groups_in_basic_node, node->get_callback_groups().size()); + size_t num_callback_groups = 0; + node->for_each_callback_group( + [&num_callback_groups](rclcpp::CallbackGroup::SharedPtr group) + { + (void)group; + num_callback_groups++; + }); + EXPECT_EQ(1u + num_callback_groups_in_basic_node, num_callback_groups); auto group2 = node->create_callback_group(rclcpp::CallbackGroupType::Reentrant); - EXPECT_EQ(2u + num_callback_groups_in_basic_node, node->get_callback_groups().size()); + size_t num_callback_groups2 = 0; + node->for_each_callback_group( + [&num_callback_groups2](rclcpp::CallbackGroup::SharedPtr group) + { + (void)group; + num_callback_groups2++; + }); + EXPECT_EQ(2u + num_callback_groups_in_basic_node, num_callback_groups2); } // This is tested more thoroughly in node_interfaces/test_node_graph diff --git a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node.hpp b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node.hpp index d81750ba08..3276d47264 100644 --- a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node.hpp +++ b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node.hpp @@ -200,6 +200,18 @@ class LifecycleNode : public node_interfaces::LifecycleNodeInterface, const std::vector & get_callback_groups() const; + /// Iterate over the callback groups in the node, calling the given function on each valid one. + /** + * This method is called in a thread-safe way, and also makes sure to only call the given + * function on those items that are still valid. + * + * \param[in] func The callback function to call on each valid callback group. + */ + RCLCPP_LIFECYCLE_PUBLIC + void + for_each_callback_group( + const rclcpp::node_interfaces::NodeBaseInterface::CallbackGroupFunction & func); + /// Create and return a Publisher. /** * \param[in] topic_name The topic for this publisher to publish on. diff --git a/rclcpp_lifecycle/src/lifecycle_node.cpp b/rclcpp_lifecycle/src/lifecycle_node.cpp index 2370e43b67..417a265bd8 100644 --- a/rclcpp_lifecycle/src/lifecycle_node.cpp +++ b/rclcpp_lifecycle/src/lifecycle_node.cpp @@ -651,4 +651,11 @@ LifecycleNode::add_timer_handle(std::shared_ptr timer) impl_->add_timer_handle(timer); } +void +LifecycleNode::for_each_callback_group( + const rclcpp::node_interfaces::NodeBaseInterface::CallbackGroupFunction & func) +{ + rclcpp::node_interfaces::global_for_each_callback_group(node_base_.get(), func); +} + } // namespace rclcpp_lifecycle diff --git a/rclcpp_lifecycle/test/test_lifecycle_node.cpp b/rclcpp_lifecycle/test/test_lifecycle_node.cpp index 7f39b3e7cc..7ee883a5f0 100644 --- a/rclcpp_lifecycle/test/test_lifecycle_node.cpp +++ b/rclcpp_lifecycle/test/test_lifecycle_node.cpp @@ -717,16 +717,27 @@ TEST_F(TestDefaultStateMachine, test_graph_services_by_node) { TEST_F(TestDefaultStateMachine, test_callback_groups) { auto test_node = std::make_shared("testnode"); - auto groups = test_node->get_callback_groups(); - EXPECT_EQ(groups.size(), 1u); + size_t num_groups = 0; + test_node->for_each_callback_group( + [&num_groups](rclcpp::CallbackGroup::SharedPtr group_ptr) + { + (void)group_ptr; + num_groups++; + }); + EXPECT_EQ(num_groups, 1u); auto group = test_node->create_callback_group( rclcpp::CallbackGroupType::MutuallyExclusive, true); EXPECT_NE(nullptr, group); - groups = test_node->get_callback_groups(); - EXPECT_EQ(groups.size(), 2u); - EXPECT_EQ(groups[1].lock().get(), group.get()); + num_groups = 0; + test_node->for_each_callback_group( + [&num_groups](rclcpp::CallbackGroup::SharedPtr group_ptr) + { + (void)group_ptr; + num_groups++; + }); + EXPECT_EQ(num_groups, 2u); } TEST_F(TestDefaultStateMachine, wait_for_graph_change)