Skip to content

Commit

Permalink
Merge pull request #125 from mauropasse/mauro/galactic-ev-ex-cherry-p…
Browse files Browse the repository at this point in the history
…ick-1741

Galactic: for_each_callback_group backport (ros2#1741)
  • Loading branch information
alsora authored Oct 16, 2023
2 parents 0719ef5 + 5b66912 commit 31030fb
Show file tree
Hide file tree
Showing 14 changed files with 333 additions and 216 deletions.
11 changes: 11 additions & 0 deletions rclcpp/include/rclcpp/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,17 @@ class Node : public std::enable_shared_from_this<Node>
const std::vector<rclcpp::CallbackGroup::WeakPtr> &
get_callback_groups() const;

/// Iterate over the callback groups in the node, calling the given function on each valid one.
/**
* This method is called in a thread-safe way, and also makes sure to only call the given
* function on those items that are still valid.
*
* \param[in] func The callback function to call on each valid callback group.
*/
RCLCPP_PUBLIC
void
for_each_callback_group(const node_interfaces::NodeBaseInterface::CallbackGroupFunction & func);

/// Create and return a Publisher.
/**
* The rclcpp::QoS has several convenient constructors, including a
Expand Down
24 changes: 24 additions & 0 deletions rclcpp/include/rclcpp/node_interfaces/node_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

#include "rcl/node.h"
#include "rclcpp/callback_group.hpp"
#include "rclcpp/context.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
Expand All @@ -32,12 +34,34 @@ namespace rclcpp
namespace node_interfaces
{

RCLCPP_PUBLIC
void global_for_each_callback_group(
NodeBaseInterface * node_base_interface,
const NodeBaseInterface::CallbackGroupFunction & func);

// Class to hold the global map of mutexes
class map_of_mutexes final
{
public:
// Methods need to be protected by internal mutex
void create_mutex_of_nodebase(const NodeBaseInterface * nodebase);
std::shared_ptr<std::mutex>
get_mutex_of_nodebase(const NodeBaseInterface * nodebase);
void delete_mutex_of_nodebase(const NodeBaseInterface * nodebase);

private:
std::unordered_map<const NodeBaseInterface *, std::shared_ptr<std::mutex>> data_;
std::mutex internal_mutex_;
};

/// Implementation of the NodeBase part of the Node API.
class NodeBase : public NodeBaseInterface
{
public:
RCLCPP_SMART_PTR_ALIASES_ONLY(NodeBase)

static map_of_mutexes map_object;

RCLCPP_PUBLIC
NodeBase(
const std::string & node_name,
Expand Down
2 changes: 2 additions & 0 deletions rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class NodeBaseInterface
public:
RCLCPP_SMART_PTR_ALIASES_ONLY(NodeBaseInterface)

using CallbackGroupFunction = std::function<void (rclcpp::CallbackGroup::SharedPtr)>;

RCLCPP_PUBLIC
virtual
~NodeBaseInterface() = default;
Expand Down
38 changes: 20 additions & 18 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "rclcpp/node.hpp"
#include "rclcpp/scope_exit.hpp"
#include "rclcpp/utilities.hpp"
#include "rclcpp/node_interfaces/node_base.hpp"

#include "rcutils/logging_macros.h"

Expand Down Expand Up @@ -177,14 +178,13 @@ Executor::add_callback_groups_from_nodes_associated_to_executor()
for (auto & weak_node : weak_nodes_) {
auto node = weak_node.lock();
if (node) {
auto group_ptrs = node->get_callback_groups();
std::for_each(
group_ptrs.begin(), group_ptrs.end(),
[this, node](rclcpp::CallbackGroup::WeakPtr group_ptr)
rclcpp::node_interfaces::global_for_each_callback_group(
node.get(),
[this, node](rclcpp::CallbackGroup::SharedPtr shared_group_ptr)
{
auto shared_group_ptr = group_ptr.lock();
if (shared_group_ptr && shared_group_ptr->automatically_add_to_executor_with_node() &&
!shared_group_ptr->get_associated_with_executor_atomic().load())
if (
shared_group_ptr->automatically_add_to_executor_with_node() &&
!shared_group_ptr->get_associated_with_executor_atomic().load())
{
this->add_callback_group_to_map(
shared_group_ptr,
Expand Down Expand Up @@ -263,18 +263,20 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt
throw std::runtime_error("Node has already been added to an executor.");
}
std::lock_guard<std::mutex> guard{mutex_};
for (auto & weak_group : node_ptr->get_callback_groups()) {
auto group_ptr = weak_group.lock();
if (group_ptr != nullptr && !group_ptr->get_associated_with_executor_atomic().load() &&
group_ptr->automatically_add_to_executor_with_node())
rclcpp::node_interfaces::global_for_each_callback_group(
node_ptr.get(),
[this, node_ptr, notify](rclcpp::CallbackGroup::SharedPtr group_ptr)
{
this->add_callback_group_to_map(
group_ptr,
node_ptr,
weak_groups_to_nodes_associated_with_executor_,
notify);
}
}
if (!group_ptr->get_associated_with_executor_atomic().load() &&
group_ptr->automatically_add_to_executor_with_node())
{
this->add_callback_group_to_map(
group_ptr,
node_ptr,
weak_groups_to_nodes_associated_with_executor_,
notify);
}
});

const auto & gc = node_ptr->get_notify_guard_condition();
weak_nodes_to_guard_conditions_[node_ptr] = &gc;
Expand Down
28 changes: 15 additions & 13 deletions rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "rclcpp/memory_strategy.hpp"
#include "rclcpp/executors/static_single_threaded_executor.hpp"
#include "rclcpp/detail/add_guard_condition_to_rcl_wait_set.hpp"
#include "rclcpp/node_interfaces/node_base.hpp"

using rclcpp::executors::StaticExecutorEntitiesCollector;

Expand Down Expand Up @@ -288,18 +289,21 @@ StaticExecutorEntitiesCollector::add_node(
if (has_executor.exchange(true)) {
throw std::runtime_error("Node has already been added to an executor.");
}
for (const auto & weak_group : node_ptr->get_callback_groups()) {
auto group_ptr = weak_group.lock();
if (group_ptr != nullptr && !group_ptr->get_associated_with_executor_atomic().load() &&
group_ptr->automatically_add_to_executor_with_node())
rclcpp::node_interfaces::global_for_each_callback_group(
node_ptr.get(),
[this, node_ptr, &is_new_node](rclcpp::CallbackGroup::SharedPtr group_ptr)
{
is_new_node = (add_callback_group(
if (
!group_ptr->get_associated_with_executor_atomic().load() &&
group_ptr->automatically_add_to_executor_with_node())
{
is_new_node = (add_callback_group(
group_ptr,
node_ptr,
weak_groups_to_nodes_associated_with_executor_) ||
is_new_node);
}
}
}
});
weak_nodes_.push_back(node_ptr);
return is_new_node;
}
Expand Down Expand Up @@ -467,13 +471,11 @@ StaticExecutorEntitiesCollector::add_callback_groups_from_nodes_associated_to_ex
for (const auto & weak_node : weak_nodes_) {
auto node = weak_node.lock();
if (node) {
auto group_ptrs = node->get_callback_groups();
std::for_each(
group_ptrs.begin(), group_ptrs.end(),
[this, node](rclcpp::CallbackGroup::WeakPtr group_ptr)
rclcpp::node_interfaces::global_for_each_callback_group(
node.get(),
[this, node](rclcpp::CallbackGroup::SharedPtr shared_group_ptr)
{
auto shared_group_ptr = group_ptr.lock();
if (shared_group_ptr && shared_group_ptr->automatically_add_to_executor_with_node() &&
if (shared_group_ptr->automatically_add_to_executor_with_node() &&
!shared_group_ptr->get_associated_with_executor_atomic().load())
{
add_callback_group(
Expand Down
6 changes: 6 additions & 0 deletions rclcpp/src/rclcpp/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -605,3 +605,9 @@ Node::get_node_options() const
{
return this->node_options_;
}

void Node::for_each_callback_group(
const node_interfaces::NodeBaseInterface::CallbackGroupFunction & func)
{
rclcpp::node_interfaces::global_for_each_callback_group(node_base_.get(), func);
}
64 changes: 55 additions & 9 deletions rclcpp/src/rclcpp/node_interfaces/node_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ class NodeHandleWithContext
};
} // anonymous namespace

using rclcpp::node_interfaces::map_of_mutexes;

NodeBase::NodeBase(
const std::string & node_name,
const std::string & namespace_,
Expand All @@ -135,6 +137,9 @@ NodeBase::NodeBase(
notify_guard_condition_(context),
notify_guard_condition_is_valid_(false)
{
// Generate a mutex for this instance of NodeBase
NodeBase::map_object.create_mutex_of_nodebase(this);

// Create the rcl node and store it in a shared_ptr with a custom destructor.
std::unique_ptr<rcl_node_t> rcl_node(new rcl_node_t(rcl_get_zero_initialized_node()));

Expand Down Expand Up @@ -218,6 +223,8 @@ NodeBase::~NodeBase()
std::lock_guard<std::recursive_mutex> notify_condition_lock(notify_guard_condition_mutex_);
notify_guard_condition_is_valid_ = false;
}

NodeBase::map_object.delete_mutex_of_nodebase(this);
}

const char *
Expand Down Expand Up @@ -273,12 +280,11 @@ NodeBase::create_callback_group(
rclcpp::CallbackGroupType group_type,
bool automatically_add_to_executor_with_node)
{
using rclcpp::CallbackGroup;
using rclcpp::CallbackGroupType;
auto group = CallbackGroup::SharedPtr(
new CallbackGroup(
group_type,
automatically_add_to_executor_with_node));
auto group = std::make_shared<rclcpp::CallbackGroup>(
group_type,
automatically_add_to_executor_with_node);
auto mutex_ptr = NodeBase::map_object.get_mutex_of_nodebase(this);
std::lock_guard<std::mutex> lock(*mutex_ptr);
callback_groups_.push_back(group);
return group;
}
Expand All @@ -292,14 +298,16 @@ NodeBase::get_default_callback_group()
bool
NodeBase::callback_group_in_node(rclcpp::CallbackGroup::SharedPtr group)
{
bool group_belongs_to_this_node = false;
auto mutex_ptr = NodeBase::map_object.get_mutex_of_nodebase(this);
std::lock_guard<std::mutex> lock(*mutex_ptr);

for (auto & weak_group : this->callback_groups_) {
auto cur_group = weak_group.lock();
if (cur_group && (cur_group == group)) {
group_belongs_to_this_node = true;
return true;
}
}
return group_belongs_to_this_node;
return false;
}

const std::vector<rclcpp::CallbackGroup::WeakPtr> &
Expand Down Expand Up @@ -362,3 +370,41 @@ NodeBase::trigger_notify_guard_condition()
{
notify_guard_condition_.trigger();
}

map_of_mutexes NodeBase::map_object = map_of_mutexes();

void map_of_mutexes::create_mutex_of_nodebase(
const rclcpp::node_interfaces::NodeBaseInterface * nodebase)
{
std::lock_guard<std::mutex> guard(this->internal_mutex_);
this->data_.emplace(nodebase, std::make_shared<std::mutex>() );
}

std::shared_ptr<std::mutex> map_of_mutexes::get_mutex_of_nodebase(
const rclcpp::node_interfaces::NodeBaseInterface * nodebase)
{
std::lock_guard<std::mutex> guard(this->internal_mutex_);
return this->data_[nodebase];
}

void map_of_mutexes::delete_mutex_of_nodebase(
const rclcpp::node_interfaces::NodeBaseInterface * nodebase)
{
std::lock_guard<std::mutex> guard(this->internal_mutex_);
this->data_.erase(nodebase);
}

// For each callback group free function implementation
void rclcpp::node_interfaces::global_for_each_callback_group(
NodeBaseInterface * node_base_interface, const NodeBaseInterface::CallbackGroupFunction & func)
{
auto mutex_ptr = NodeBase::map_object.get_mutex_of_nodebase(node_base_interface);
std::lock_guard<std::mutex> lock(*mutex_ptr);

for (const auto & weak_group : node_base_interface->get_callback_groups()) {
auto group = weak_group.lock();
if (group) {
func(group);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,38 +41,38 @@ struct NumberOfEntities
std::unique_ptr<NumberOfEntities> get_number_of_default_entities(rclcpp::Node::SharedPtr node)
{
auto number_of_entities = std::make_unique<NumberOfEntities>();
for (auto & weak_group : node->get_callback_groups()) {
auto group = weak_group.lock();
EXPECT_NE(nullptr, group);
if (!group || !group->can_be_taken_from().load()) {
return nullptr;
}
group->find_subscription_ptrs_if(
[&number_of_entities](rclcpp::SubscriptionBase::SharedPtr &)
{
number_of_entities->subscriptions++; return false;
});
group->find_timer_ptrs_if(
[&number_of_entities](rclcpp::TimerBase::SharedPtr &)
{
number_of_entities->timers++; return false;
});
group->find_service_ptrs_if(
[&number_of_entities](rclcpp::ServiceBase::SharedPtr &)
{
number_of_entities->services++; return false;
});
group->find_client_ptrs_if(
[&number_of_entities](rclcpp::ClientBase::SharedPtr &)
{
number_of_entities->clients++; return false;
});
group->find_waitable_ptrs_if(
[&number_of_entities](rclcpp::Waitable::SharedPtr &)
{
number_of_entities->waitables++; return false;
});
}
node->for_each_callback_group(
[&number_of_entities](rclcpp::CallbackGroup::SharedPtr group)
{
if (!group->can_be_taken_from().load()) {
return;
}
group->find_subscription_ptrs_if(
[&number_of_entities](rclcpp::SubscriptionBase::SharedPtr &)
{
number_of_entities->subscriptions++; return false;
});
group->find_timer_ptrs_if(
[&number_of_entities](rclcpp::TimerBase::SharedPtr &)
{
number_of_entities->timers++; return false;
});
group->find_service_ptrs_if(
[&number_of_entities](rclcpp::ServiceBase::SharedPtr &)
{
number_of_entities->services++; return false;
});
group->find_client_ptrs_if(
[&number_of_entities](rclcpp::ClientBase::SharedPtr &)
{
number_of_entities->clients++; return false;
});
group->find_waitable_ptrs_if(
[&number_of_entities](rclcpp::Waitable::SharedPtr &)
{
number_of_entities->waitables++; return false;
});
});

return number_of_entities;
}
Expand Down
Loading

0 comments on commit 31030fb

Please sign in to comment.