From 69711119c22561a72af59bce610603b474744f33 Mon Sep 17 00:00:00 2001 From: Aditya Pande Date: Thu, 29 Jul 2021 15:59:14 -0700 Subject: [PATCH] Added for_each_callback non-virtual method, static members to node_base.hpp, updated the for_each_callback_group wrapper method in lifecycle_node and node.cpp, added a thread safe global map of mutexes in node_base. Implemented the suggestions in preliminary review. --- rclcpp/include/rclcpp/node.hpp | 12 +++ .../rclcpp/node_interfaces/node_base.hpp | 32 ++++++++ .../node_interfaces/node_base_interface.hpp | 2 + rclcpp/src/rclcpp/executor.cpp | 39 +++++----- rclcpp/src/rclcpp/node.cpp | 6 ++ .../src/rclcpp/node_interfaces/node_base.cpp | 77 +++++++++++++++++-- .../rclcpp_lifecycle/lifecycle_node.hpp | 13 ++++ rclcpp_lifecycle/src/lifecycle_node.cpp | 8 ++ 8 files changed, 163 insertions(+), 26 deletions(-) diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index 2550403de6..1e19a95247 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -70,6 +70,7 @@ #include "rclcpp/timer.hpp" #include "rclcpp/visibility_control.hpp" + namespace rclcpp { @@ -153,6 +154,17 @@ class Node : public std::enable_shared_from_this const std::vector & get_callback_groups() const; + /// Iterate over the callback groups in the node, calling the given function on each valid one. + /** + * This method is called in a thread-safe way, and also makes sure to only call the given + * function on those items that are still valid. + * + * \param[in] func The callback function to call on each valid callback group. + */ + RCLCPP_PUBLIC + void + for_each_callback_group(const node_interfaces::NodeBaseInterface::CallbackGroupFunction & func); + /// Create and return a Publisher. /** * The rclcpp::QoS has several convenient constructors, including a diff --git a/rclcpp/include/rclcpp/node_interfaces/node_base.hpp b/rclcpp/include/rclcpp/node_interfaces/node_base.hpp index 3f8b0021a0..a17c9f8303 100644 --- a/rclcpp/include/rclcpp/node_interfaces/node_base.hpp +++ b/rclcpp/include/rclcpp/node_interfaces/node_base.hpp @@ -15,27 +15,42 @@ #ifndef RCLCPP__NODE_INTERFACES__NODE_BASE_HPP_ #define RCLCPP__NODE_INTERFACES__NODE_BASE_HPP_ +#include +#include #include +#include #include +#include #include #include "rcl/node.h" +#include "rclcpp/callback_group.hpp" #include "rclcpp/context.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp" #include "rclcpp/visibility_control.hpp" + namespace rclcpp { namespace node_interfaces { +class map_of_mutexes; + /// Implementation of the NodeBase part of the Node API. class NodeBase : public NodeBaseInterface { public: RCLCPP_SMART_PTR_ALIASES_ONLY(NodeBase) + static std::unique_ptr map_object_ptr; + static bool map_init_flag; + static std::mutex map_init_flag_mutex; + + // Non virtual method + void for_each_callback_group(const CallbackGroupFunction & func); + RCLCPP_PUBLIC NodeBase( const std::string & node_name, @@ -145,4 +160,21 @@ class NodeBase : public NodeBaseInterface } // namespace node_interfaces } // namespace rclcpp +// Class to hold the global map of mutexes +class rclcpp::node_interfaces::map_of_mutexes +{ + public: + map_of_mutexes(); + ~map_of_mutexes(); + + // Methods need to be protected by internal mutex + void create_mutex_of_nodebase(const rclcpp::node_interfaces::NodeBase* nodebase); + std::shared_ptr get_mutex_of_nodebase(const rclcpp::node_interfaces::NodeBase* nodebase); + void delete_mutex_of_nodebase(const rclcpp::node_interfaces::NodeBase* nodebase); + + // Members + std::unordered_map> data; + std::mutex internal_mutex; +}; + #endif // RCLCPP__NODE_INTERFACES__NODE_BASE_HPP_ diff --git a/rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp b/rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp index a9c30dd09a..e27e4f18f3 100644 --- a/rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp +++ b/rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp @@ -38,6 +38,8 @@ class NodeBaseInterface public: RCLCPP_SMART_PTR_ALIASES_ONLY(NodeBaseInterface) + using CallbackGroupFunction = std::function; + RCLCPP_PUBLIC virtual ~NodeBaseInterface() = default; diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 20876668a4..89a0ad2268 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -30,6 +30,7 @@ #include "rclcpp/node.hpp" #include "rclcpp/scope_exit.hpp" #include "rclcpp/utilities.hpp" +#include "rclcpp/node_interfaces/node_base.hpp" #include "rcutils/logging_macros.h" @@ -190,14 +191,13 @@ Executor::add_callback_groups_from_nodes_associated_to_executor() for (auto & weak_node : weak_nodes_) { auto node = weak_node.lock(); if (node) { - auto group_ptrs = node->get_callback_groups(); - std::for_each( - group_ptrs.begin(), group_ptrs.end(), - [this, node](rclcpp::CallbackGroup::WeakPtr group_ptr) + auto node_base = std::dynamic_pointer_cast(node); + node_base->for_each_callback_group( + [this,node](rclcpp::CallbackGroup::SharedPtr shared_group_ptr) { - auto shared_group_ptr = group_ptr.lock(); - if (shared_group_ptr && shared_group_ptr->automatically_add_to_executor_with_node() && - !shared_group_ptr->get_associated_with_executor_atomic().load()) + if( + shared_group_ptr->automatically_add_to_executor_with_node() && + !shared_group_ptr->get_associated_with_executor_atomic().load()) { this->add_callback_group_to_map( shared_group_ptr, @@ -271,18 +271,21 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt throw std::runtime_error("Node has already been added to an executor."); } std::lock_guard guard{mutex_}; - for (auto & weak_group : node_ptr->get_callback_groups()) { - auto group_ptr = weak_group.lock(); - if (group_ptr != nullptr && !group_ptr->get_associated_with_executor_atomic().load() && - group_ptr->automatically_add_to_executor_with_node()) + auto node_base = std::dynamic_pointer_cast(node_ptr); + node_base->for_each_callback_group( + [this, node_ptr, notify](rclcpp::CallbackGroup::SharedPtr group_ptr) { - this->add_callback_group_to_map( - group_ptr, - node_ptr, - weak_groups_to_nodes_associated_with_executor_, - notify); - } - } + if (!group_ptr->get_associated_with_executor_atomic().load() && + group_ptr->automatically_add_to_executor_with_node()) + { + this->add_callback_group_to_map( + group_ptr, + node_ptr, + weak_groups_to_nodes_associated_with_executor_, + notify); + } + }); + weak_nodes_.push_back(node_ptr); } diff --git a/rclcpp/src/rclcpp/node.cpp b/rclcpp/src/rclcpp/node.cpp index 117702760e..c683d28fcb 100644 --- a/rclcpp/src/rclcpp/node.cpp +++ b/rclcpp/src/rclcpp/node.cpp @@ -605,3 +605,9 @@ Node::get_node_options() const { return this->node_options_; } + +void Node::for_each_callback_group(const node_interfaces::NodeBaseInterface::CallbackGroupFunction & func) +{ + auto node_base = std::dynamic_pointer_cast(node_base_); + node_base->for_each_callback_group(func); +} diff --git a/rclcpp/src/rclcpp/node_interfaces/node_base.cpp b/rclcpp/src/rclcpp/node_interfaces/node_base.cpp index 8fa46abe60..5473e39be6 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_base.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_base.cpp @@ -46,6 +46,18 @@ NodeBase::NodeBase( associated_with_executor_(false), notify_guard_condition_is_valid_(false) { + { + std::lock_guard lock(this->map_init_flag_mutex); + // Initialize map_of_mutex object if this is the first NodeBase instance + if (!this->map_init_flag) { + this->map_object_ptr = std::make_unique(); + this->map_init_flag = true; + } + } + + // Generate a mutex for this instance of NodeBase + this->map_object_ptr->create_mutex_of_nodebase(this); + // Setup the guard condition that is notified when changes occur in the graph. rcl_guard_condition_options_t guard_condition_options = rcl_guard_condition_get_default_options(); rcl_ret_t ret = rcl_guard_condition_init( @@ -166,6 +178,8 @@ NodeBase::~NodeBase() "failed to destroy guard condition: %s", rcl_get_error_string().str); } } + + this->map_object_ptr->delete_mutex_of_nodebase(this); } const char * @@ -221,12 +235,11 @@ NodeBase::create_callback_group( rclcpp::CallbackGroupType group_type, bool automatically_add_to_executor_with_node) { - using rclcpp::CallbackGroup; - using rclcpp::CallbackGroupType; - auto group = CallbackGroup::SharedPtr( - new CallbackGroup( + auto group = std::make_shared( group_type, - automatically_add_to_executor_with_node)); + automatically_add_to_executor_with_node); + auto mutex_ptr = this->map_object_ptr->get_mutex_of_nodebase(this); + std::lock_guard lock(*mutex_ptr); callback_groups_.push_back(group); return group; } @@ -240,14 +253,16 @@ NodeBase::get_default_callback_group() bool NodeBase::callback_group_in_node(rclcpp::CallbackGroup::SharedPtr group) { - bool group_belongs_to_this_node = false; + auto mutex_ptr = this->map_object_ptr->get_mutex_of_nodebase(this); + std::lock_guard lock(*mutex_ptr); + for (auto & weak_group : this->callback_groups_) { auto cur_group = weak_group.lock(); if (cur_group && (cur_group == group)) { - group_belongs_to_this_node = true; + return true; } } - return group_belongs_to_this_node; + return false; } const std::vector & @@ -310,3 +325,49 @@ NodeBase::resolve_topic_or_service_name( allocator.deallocate(output_cstr, allocator.state); return output; } + +std::unique_ptr NodeBase::map_object_ptr; +bool NodeBase::map_init_flag(false); +std::mutex NodeBase::map_init_flag_mutex; + +rclcpp::node_interfaces::map_of_mutexes::map_of_mutexes() +{ +} + +void rclcpp::node_interfaces::map_of_mutexes::create_mutex_of_nodebase(const rclcpp::node_interfaces::NodeBase* nodebase) +{ + std::lock_guard guard(this->internal_mutex); + this->data.emplace(nodebase, std::make_shared() ); +} + +std::shared_ptr rclcpp::node_interfaces::map_of_mutexes::get_mutex_of_nodebase(const rclcpp::node_interfaces::NodeBase* nodebase) +{ + std::lock_guard guard(this->internal_mutex); + return this->data[nodebase]; +} + +void rclcpp::node_interfaces::map_of_mutexes::delete_mutex_of_nodebase(const rclcpp::node_interfaces::NodeBase* nodebase) +{ + std::lock_guard guard(this->internal_mutex); + this->data.erase(nodebase); +} + +rclcpp::node_interfaces::map_of_mutexes::~map_of_mutexes() +{ +} + +// For each callback group implementation +void NodeBase::for_each_callback_group(const CallbackGroupFunction & func) +{ + auto mutex_ptr = this->map_object_ptr->get_mutex_of_nodebase(this); + std::lock_guard lock(*mutex_ptr); + + for (rclcpp::CallbackGroup::WeakPtr & weak_group : this->callback_groups_) { + rclcpp::CallbackGroup::SharedPtr group = weak_group.lock(); + if (group){ + func(group); + } + } + +} + diff --git a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node.hpp b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node.hpp index d81750ba08..621712a6c6 100644 --- a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node.hpp +++ b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node.hpp @@ -89,6 +89,7 @@ #include "rclcpp_lifecycle/transition.hpp" #include "rclcpp_lifecycle/visibility_control.h" + namespace rclcpp_lifecycle { @@ -200,6 +201,18 @@ class LifecycleNode : public node_interfaces::LifecycleNodeInterface, const std::vector & get_callback_groups() const; + /// Iterate over the callback groups in the node, calling the given function on each valid one. + /** + * This method is called in a thread-safe way, and also makes sure to only call the given + * function on those items that are still valid. + * + * \param[in] func The callback function to call on each valid callback group. + */ + RCLCPP_LIFECYCLE_PUBLIC + void + for_each_callback_group( + const rclcpp::node_interfaces::NodeBaseInterface::CallbackGroupFunction & func); + /// Create and return a Publisher. /** * \param[in] topic_name The topic for this publisher to publish on. diff --git a/rclcpp_lifecycle/src/lifecycle_node.cpp b/rclcpp_lifecycle/src/lifecycle_node.cpp index 2370e43b67..c5919afa9c 100644 --- a/rclcpp_lifecycle/src/lifecycle_node.cpp +++ b/rclcpp_lifecycle/src/lifecycle_node.cpp @@ -651,4 +651,12 @@ LifecycleNode::add_timer_handle(std::shared_ptr timer) impl_->add_timer_handle(timer); } +void +LifecycleNode::for_each_callback_group( + const rclcpp::node_interfaces::NodeBaseInterface::CallbackGroupFunction & func) +{ + auto node_base = std::dynamic_pointer_cast(node_base_); + node_base->for_each_callback_group(func); +} + } // namespace rclcpp_lifecycle