Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Galactic: for_each_callback_group backport #1741

Merged
merged 12 commits into from
Aug 19, 2021
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions rclcpp/include/rclcpp/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,17 @@ class Node : public std::enable_shared_from_this<Node>
const std::vector<rclcpp::CallbackGroup::WeakPtr> &
get_callback_groups() const;

/// Iterate over the callback groups in the node, calling the given function on each valid one.
/**
* This method is called in a thread-safe way, and also makes sure to only call the given
* function on those items that are still valid.
*
* \param[in] func The callback function to call on each valid callback group.
*/
RCLCPP_PUBLIC
void
for_each_callback_group(const node_interfaces::NodeBaseInterface::CallbackGroupFunction & func);

/// Create and return a Publisher.
/**
* The rclcpp::QoS has several convenient constructors, including a
Expand Down
26 changes: 26 additions & 0 deletions rclcpp/include/rclcpp/node_interfaces/node_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,52 @@
#define RCLCPP__NODE_INTERFACES__NODE_BASE_HPP_

#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

#include "rcl/node.h"
#include "rclcpp/callback_group.hpp"
#include "rclcpp/context.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/visibility_control.hpp"


adityapande-1995 marked this conversation as resolved.
Show resolved Hide resolved
namespace rclcpp
{
namespace node_interfaces
{

void global_for_each_callback_group(
NodeBaseInterface * node_base_interface,
const NodeBaseInterface::CallbackGroupFunction & func);

// Class to hold the global map of mutexes
class map_of_mutexes final
{
public:
// Methods need to be protected by internal mutex
void create_mutex_of_nodebase(const NodeBaseInterface * nodebase);
std::shared_ptr<std::mutex>
get_mutex_of_nodebase(const NodeBaseInterface * nodebase);
void delete_mutex_of_nodebase(const NodeBaseInterface * nodebase);

private:
// Members
adityapande-1995 marked this conversation as resolved.
Show resolved Hide resolved
std::unordered_map<const NodeBaseInterface *, std::shared_ptr<std::mutex>> data;
std::mutex internal_mutex;
adityapande-1995 marked this conversation as resolved.
Show resolved Hide resolved
};

/// Implementation of the NodeBase part of the Node API.
class NodeBase : public NodeBaseInterface
{
public:
RCLCPP_SMART_PTR_ALIASES_ONLY(NodeBase)

static map_of_mutexes map_object;

RCLCPP_PUBLIC
NodeBase(
const std::string & node_name,
Expand Down
2 changes: 2 additions & 0 deletions rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class NodeBaseInterface
public:
RCLCPP_SMART_PTR_ALIASES_ONLY(NodeBaseInterface)

using CallbackGroupFunction = std::function<void (rclcpp::CallbackGroup::SharedPtr)>;

RCLCPP_PUBLIC
virtual
~NodeBaseInterface() = default;
Expand Down
39 changes: 21 additions & 18 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "rclcpp/node.hpp"
#include "rclcpp/scope_exit.hpp"
#include "rclcpp/utilities.hpp"
#include "rclcpp/node_interfaces/node_base.hpp"

#include "rcutils/logging_macros.h"

Expand Down Expand Up @@ -190,14 +191,13 @@ Executor::add_callback_groups_from_nodes_associated_to_executor()
for (auto & weak_node : weak_nodes_) {
auto node = weak_node.lock();
if (node) {
auto group_ptrs = node->get_callback_groups();
std::for_each(
group_ptrs.begin(), group_ptrs.end(),
[this, node](rclcpp::CallbackGroup::WeakPtr group_ptr)
rclcpp::node_interfaces::global_for_each_callback_group(
node.get(),
[this, node](rclcpp::CallbackGroup::SharedPtr shared_group_ptr)
{
auto shared_group_ptr = group_ptr.lock();
if (shared_group_ptr && shared_group_ptr->automatically_add_to_executor_with_node() &&
!shared_group_ptr->get_associated_with_executor_atomic().load())
if (
shared_group_ptr->automatically_add_to_executor_with_node() &&
!shared_group_ptr->get_associated_with_executor_atomic().load())
{
this->add_callback_group_to_map(
shared_group_ptr,
Expand Down Expand Up @@ -271,18 +271,21 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt
throw std::runtime_error("Node has already been added to an executor.");
}
std::lock_guard<std::mutex> guard{mutex_};
for (auto & weak_group : node_ptr->get_callback_groups()) {
auto group_ptr = weak_group.lock();
if (group_ptr != nullptr && !group_ptr->get_associated_with_executor_atomic().load() &&
group_ptr->automatically_add_to_executor_with_node())
rclcpp::node_interfaces::global_for_each_callback_group(
node_ptr.get(),
[this, node_ptr, notify](rclcpp::CallbackGroup::SharedPtr group_ptr)
{
this->add_callback_group_to_map(
group_ptr,
node_ptr,
weak_groups_to_nodes_associated_with_executor_,
notify);
}
}
if (!group_ptr->get_associated_with_executor_atomic().load() &&
group_ptr->automatically_add_to_executor_with_node())
{
this->add_callback_group_to_map(
group_ptr,
node_ptr,
weak_groups_to_nodes_associated_with_executor_,
notify);
}
});

weak_nodes_.push_back(node_ptr);
}

Expand Down
28 changes: 15 additions & 13 deletions rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "rclcpp/memory_strategy.hpp"
#include "rclcpp/executors/static_single_threaded_executor.hpp"
#include "rclcpp/node_interfaces/node_base.hpp"

using rclcpp::executors::StaticExecutorEntitiesCollector;

Expand Down Expand Up @@ -290,18 +291,21 @@ StaticExecutorEntitiesCollector::add_node(
if (has_executor.exchange(true)) {
throw std::runtime_error("Node has already been added to an executor.");
}
for (const auto & weak_group : node_ptr->get_callback_groups()) {
auto group_ptr = weak_group.lock();
if (group_ptr != nullptr && !group_ptr->get_associated_with_executor_atomic().load() &&
group_ptr->automatically_add_to_executor_with_node())
rclcpp::node_interfaces::global_for_each_callback_group(
node_ptr.get(),
[this, node_ptr, &is_new_node](rclcpp::CallbackGroup::SharedPtr group_ptr)
{
is_new_node = (add_callback_group(
if (
!group_ptr->get_associated_with_executor_atomic().load() &&
group_ptr->automatically_add_to_executor_with_node())
{
is_new_node = (add_callback_group(
group_ptr,
node_ptr,
weak_groups_to_nodes_associated_with_executor_) ||
is_new_node);
}
}
}
});
weak_nodes_.push_back(node_ptr);
return is_new_node;
}
Expand Down Expand Up @@ -467,13 +471,11 @@ StaticExecutorEntitiesCollector::add_callback_groups_from_nodes_associated_to_ex
for (const auto & weak_node : weak_nodes_) {
auto node = weak_node.lock();
if (node) {
auto group_ptrs = node->get_callback_groups();
std::for_each(
group_ptrs.begin(), group_ptrs.end(),
[this, node](rclcpp::CallbackGroup::WeakPtr group_ptr)
rclcpp::node_interfaces::global_for_each_callback_group(
node.get(),
[this, node](rclcpp::CallbackGroup::SharedPtr shared_group_ptr)
{
auto shared_group_ptr = group_ptr.lock();
if (shared_group_ptr && shared_group_ptr->automatically_add_to_executor_with_node() &&
if (shared_group_ptr->automatically_add_to_executor_with_node() &&
!shared_group_ptr->get_associated_with_executor_atomic().load())
{
add_callback_group(
Expand Down
6 changes: 6 additions & 0 deletions rclcpp/src/rclcpp/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -605,3 +605,9 @@ Node::get_node_options() const
{
return this->node_options_;
}

void Node::for_each_callback_group(
const node_interfaces::NodeBaseInterface::CallbackGroupFunction & func)
{
rclcpp::node_interfaces::global_for_each_callback_group(node_base_.get(), func);
}
69 changes: 60 additions & 9 deletions rclcpp/src/rclcpp/node_interfaces/node_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ using rclcpp::exceptions::throw_from_rcl_error;

using rclcpp::node_interfaces::NodeBase;

using rclcpp::node_interfaces::map_of_mutexes;

NodeBase::NodeBase(
const std::string & node_name,
const std::string & namespace_,
Expand All @@ -46,6 +48,9 @@ NodeBase::NodeBase(
associated_with_executor_(false),
notify_guard_condition_is_valid_(false)
{
// Generate a mutex for this instance of NodeBase
this->map_object.create_mutex_of_nodebase(this);
cottsay marked this conversation as resolved.
Show resolved Hide resolved

// Setup the guard condition that is notified when changes occur in the graph.
rcl_guard_condition_options_t guard_condition_options = rcl_guard_condition_get_default_options();
rcl_ret_t ret = rcl_guard_condition_init(
Expand Down Expand Up @@ -166,6 +171,8 @@ NodeBase::~NodeBase()
"failed to destroy guard condition: %s", rcl_get_error_string().str);
}
}

this->map_object.delete_mutex_of_nodebase(this);
}

const char *
Expand Down Expand Up @@ -221,12 +228,11 @@ NodeBase::create_callback_group(
rclcpp::CallbackGroupType group_type,
bool automatically_add_to_executor_with_node)
{
using rclcpp::CallbackGroup;
using rclcpp::CallbackGroupType;
auto group = CallbackGroup::SharedPtr(
new CallbackGroup(
group_type,
automatically_add_to_executor_with_node));
auto group = std::make_shared<rclcpp::CallbackGroup>(
group_type,
automatically_add_to_executor_with_node);
auto mutex_ptr = this->map_object.get_mutex_of_nodebase(this);
std::lock_guard<std::mutex> lock(*mutex_ptr);
callback_groups_.push_back(group);
return group;
}
Expand All @@ -240,14 +246,16 @@ NodeBase::get_default_callback_group()
bool
NodeBase::callback_group_in_node(rclcpp::CallbackGroup::SharedPtr group)
{
bool group_belongs_to_this_node = false;
auto mutex_ptr = this->map_object.get_mutex_of_nodebase(this);
std::lock_guard<std::mutex> lock(*mutex_ptr);

for (auto & weak_group : this->callback_groups_) {
auto cur_group = weak_group.lock();
if (cur_group && (cur_group == group)) {
group_belongs_to_this_node = true;
return true;
}
}
return group_belongs_to_this_node;
return false;
}

const std::vector<rclcpp::CallbackGroup::WeakPtr> &
Expand Down Expand Up @@ -310,3 +318,46 @@ NodeBase::resolve_topic_or_service_name(
allocator.deallocate(output_cstr, allocator.state);
return output;
}

map_of_mutexes NodeBase::map_object = map_of_mutexes();

void map_of_mutexes::create_mutex_of_nodebase(
const rclcpp::node_interfaces::NodeBaseInterface * nodebase)
{
std::lock_guard<std::mutex> guard(this->internal_mutex);
this->data.emplace(nodebase, std::make_shared<std::mutex>() );
}

std::shared_ptr<std::mutex> map_of_mutexes::get_mutex_of_nodebase(
const rclcpp::node_interfaces::NodeBaseInterface * nodebase)
{
std::lock_guard<std::mutex> guard(this->internal_mutex);
return this->data[nodebase];
}

void map_of_mutexes::delete_mutex_of_nodebase(
const rclcpp::node_interfaces::NodeBaseInterface * nodebase)
{
std::lock_guard<std::mutex> guard(this->internal_mutex);
this->data.erase(nodebase);
}

// For each callback group free function implementation
void rclcpp::node_interfaces::global_for_each_callback_group(
NodeBaseInterface * node_base_interface, const NodeBaseInterface::CallbackGroupFunction & func)
{
auto mutex_ptr = NodeBase::map_object.get_mutex_of_nodebase(node_base_interface);
if (mutex_ptr) {
adityapande-1995 marked this conversation as resolved.
Show resolved Hide resolved
std::lock_guard<std::mutex> lock(*mutex_ptr);

for (const auto & weak_group : node_base_interface->get_callback_groups()) {
auto group = weak_group.lock();
if (group) {
func(group);
}
}
} else {
auto logger = rclcpp::get_logger("ForEachCallbackGroup");
RCLCPP_ERROR(logger, "Mutex entry not found in the global map");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,38 +41,38 @@ struct NumberOfEntities
std::unique_ptr<NumberOfEntities> get_number_of_default_entities(rclcpp::Node::SharedPtr node)
{
auto number_of_entities = std::make_unique<NumberOfEntities>();
for (auto & weak_group : node->get_callback_groups()) {
auto group = weak_group.lock();
EXPECT_NE(nullptr, group);
if (!group || !group->can_be_taken_from().load()) {
return nullptr;
}
group->find_subscription_ptrs_if(
[&number_of_entities](rclcpp::SubscriptionBase::SharedPtr &)
{
number_of_entities->subscriptions++; return false;
});
group->find_timer_ptrs_if(
[&number_of_entities](rclcpp::TimerBase::SharedPtr &)
{
number_of_entities->timers++; return false;
});
group->find_service_ptrs_if(
[&number_of_entities](rclcpp::ServiceBase::SharedPtr &)
{
number_of_entities->services++; return false;
});
group->find_client_ptrs_if(
[&number_of_entities](rclcpp::ClientBase::SharedPtr &)
{
number_of_entities->clients++; return false;
});
group->find_waitable_ptrs_if(
[&number_of_entities](rclcpp::Waitable::SharedPtr &)
{
number_of_entities->waitables++; return false;
});
}
node->for_each_callback_group(
[&number_of_entities](rclcpp::CallbackGroup::SharedPtr group)
{
if (!group->can_be_taken_from().load()) {
return;
}
group->find_subscription_ptrs_if(
[&number_of_entities](rclcpp::SubscriptionBase::SharedPtr &)
{
number_of_entities->subscriptions++; return false;
});
group->find_timer_ptrs_if(
[&number_of_entities](rclcpp::TimerBase::SharedPtr &)
{
number_of_entities->timers++; return false;
});
group->find_service_ptrs_if(
[&number_of_entities](rclcpp::ServiceBase::SharedPtr &)
{
number_of_entities->services++; return false;
});
group->find_client_ptrs_if(
[&number_of_entities](rclcpp::ClientBase::SharedPtr &)
{
number_of_entities->clients++; return false;
});
group->find_waitable_ptrs_if(
[&number_of_entities](rclcpp::Waitable::SharedPtr &)
{
number_of_entities->waitables++; return false;
});
});

return number_of_entities;
}
Expand Down
Loading