Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Galactic: for_each_callback_group backport #1741

Merged
merged 12 commits into from
Aug 19, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions rclcpp/include/rclcpp/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
#include "rclcpp/timer.hpp"
#include "rclcpp/visibility_control.hpp"


ivanpauno marked this conversation as resolved.
Show resolved Hide resolved
namespace rclcpp
{

Expand Down Expand Up @@ -153,6 +154,17 @@ class Node : public std::enable_shared_from_this<Node>
const std::vector<rclcpp::CallbackGroup::WeakPtr> &
get_callback_groups() const;

/// Iterate over the callback groups in the node, calling the given function on each valid one.
/**
* This method is called in a thread-safe way, and also makes sure to only call the given
* function on those items that are still valid.
*
* \param[in] func The callback function to call on each valid callback group.
*/
RCLCPP_PUBLIC
void
for_each_callback_group(const node_interfaces::NodeBaseInterface::CallbackGroupFunction & func);

/// Create and return a Publisher.
/**
* The rclcpp::QoS has several convenient constructors, including a
Expand Down
33 changes: 33 additions & 0 deletions rclcpp/include/rclcpp/node_interfaces/node_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,42 @@
#ifndef RCLCPP__NODE_INTERFACES__NODE_BASE_HPP_
#define RCLCPP__NODE_INTERFACES__NODE_BASE_HPP_

#include <atomic>
#include <functional>
ivanpauno marked this conversation as resolved.
Show resolved Hide resolved
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

#include "rcl/node.h"
#include "rclcpp/callback_group.hpp"
#include "rclcpp/context.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/visibility_control.hpp"


adityapande-1995 marked this conversation as resolved.
Show resolved Hide resolved
namespace rclcpp
{
namespace node_interfaces
{

class map_of_mutexes;

/// Implementation of the NodeBase part of the Node API.
class NodeBase : public NodeBaseInterface
{
public:
RCLCPP_SMART_PTR_ALIASES_ONLY(NodeBase)

static std::unique_ptr<map_of_mutexes> map_object_ptr;
adityapande-1995 marked this conversation as resolved.
Show resolved Hide resolved
static bool map_init_flag;
static std::mutex map_init_flag_mutex;
adityapande-1995 marked this conversation as resolved.
Show resolved Hide resolved

// Non virtual method
void for_each_callback_group(const CallbackGroupFunction & func);

RCLCPP_PUBLIC
NodeBase(
const std::string & node_name,
Expand Down Expand Up @@ -145,4 +160,22 @@ class NodeBase : public NodeBaseInterface
} // namespace node_interfaces
} // namespace rclcpp

// Class to hold the global map of mutexes
class rclcpp::node_interfaces::map_of_mutexes
adityapande-1995 marked this conversation as resolved.
Show resolved Hide resolved
{
public:
map_of_mutexes();
~map_of_mutexes();

// Methods need to be protected by internal mutex
void create_mutex_of_nodebase(const rclcpp::node_interfaces::NodeBase * nodebase);
std::shared_ptr<std::mutex>
get_mutex_of_nodebase(const rclcpp::node_interfaces::NodeBase * nodebase);
void delete_mutex_of_nodebase(const rclcpp::node_interfaces::NodeBase * nodebase);

// Members
std::unordered_map<const rclcpp::node_interfaces::NodeBase *, std::shared_ptr<std::mutex>> data;
std::mutex internal_mutex;
};

#endif // RCLCPP__NODE_INTERFACES__NODE_BASE_HPP_
2 changes: 2 additions & 0 deletions rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class NodeBaseInterface
public:
RCLCPP_SMART_PTR_ALIASES_ONLY(NodeBaseInterface)

using CallbackGroupFunction = std::function<void (rclcpp::CallbackGroup::SharedPtr)>;

RCLCPP_PUBLIC
virtual
~NodeBaseInterface() = default;
Expand Down
43 changes: 23 additions & 20 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "rclcpp/node.hpp"
#include "rclcpp/scope_exit.hpp"
#include "rclcpp/utilities.hpp"
#include "rclcpp/node_interfaces/node_base.hpp"

#include "rcutils/logging_macros.h"

Expand Down Expand Up @@ -190,18 +191,17 @@ Executor::add_callback_groups_from_nodes_associated_to_executor()
for (auto & weak_node : weak_nodes_) {
auto node = weak_node.lock();
if (node) {
auto group_ptrs = node->get_callback_groups();
std::for_each(
group_ptrs.begin(), group_ptrs.end(),
[this, node](rclcpp::CallbackGroup::WeakPtr group_ptr)
auto node_base = std::dynamic_pointer_cast<rclcpp::node_interfaces::NodeBase>(node);
node_base->for_each_callback_group(
[this, node_base](rclcpp::CallbackGroup::SharedPtr shared_group_ptr)
{
auto shared_group_ptr = group_ptr.lock();
if (shared_group_ptr && shared_group_ptr->automatically_add_to_executor_with_node() &&
!shared_group_ptr->get_associated_with_executor_atomic().load())
if (
shared_group_ptr->automatically_add_to_executor_with_node() &&
!shared_group_ptr->get_associated_with_executor_atomic().load())
{
this->add_callback_group_to_map(
shared_group_ptr,
node,
node_base,
weak_groups_to_nodes_associated_with_executor_,
true);
}
Expand Down Expand Up @@ -271,19 +271,22 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt
throw std::runtime_error("Node has already been added to an executor.");
}
std::lock_guard<std::mutex> guard{mutex_};
for (auto & weak_group : node_ptr->get_callback_groups()) {
auto group_ptr = weak_group.lock();
if (group_ptr != nullptr && !group_ptr->get_associated_with_executor_atomic().load() &&
group_ptr->automatically_add_to_executor_with_node())
auto node_base = std::dynamic_pointer_cast<rclcpp::node_interfaces::NodeBase>(node_ptr);
node_base->for_each_callback_group(
[this, node_base, notify](rclcpp::CallbackGroup::SharedPtr group_ptr)
{
this->add_callback_group_to_map(
group_ptr,
node_ptr,
weak_groups_to_nodes_associated_with_executor_,
notify);
}
}
weak_nodes_.push_back(node_ptr);
if (!group_ptr->get_associated_with_executor_atomic().load() &&
group_ptr->automatically_add_to_executor_with_node())
{
this->add_callback_group_to_map(
group_ptr,
node_base,
weak_groups_to_nodes_associated_with_executor_,
notify);
}
});

weak_nodes_.push_back(node_base);
}

void
Expand Down
34 changes: 18 additions & 16 deletions rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "rclcpp/memory_strategy.hpp"
#include "rclcpp/executors/static_single_threaded_executor.hpp"
#include "rclcpp/node_interfaces/node_base.hpp"

using rclcpp::executors::StaticExecutorEntitiesCollector;

Expand Down Expand Up @@ -290,19 +291,22 @@ StaticExecutorEntitiesCollector::add_node(
if (has_executor.exchange(true)) {
throw std::runtime_error("Node has already been added to an executor.");
}
for (const auto & weak_group : node_ptr->get_callback_groups()) {
auto group_ptr = weak_group.lock();
if (group_ptr != nullptr && !group_ptr->get_associated_with_executor_atomic().load() &&
group_ptr->automatically_add_to_executor_with_node())
auto node_base = std::dynamic_pointer_cast<rclcpp::node_interfaces::NodeBase>(node_ptr);
node_base->for_each_callback_group(
[this, node_base, &is_new_node](rclcpp::CallbackGroup::SharedPtr group_ptr)
{
is_new_node = (add_callback_group(
if (
!group_ptr->get_associated_with_executor_atomic().load() &&
group_ptr->automatically_add_to_executor_with_node())
{
is_new_node = (add_callback_group(
group_ptr,
node_ptr,
node_base,
weak_groups_to_nodes_associated_with_executor_) ||
is_new_node);
}
}
weak_nodes_.push_back(node_ptr);
}
});
weak_nodes_.push_back(node_base);
return is_new_node;
}

Expand Down Expand Up @@ -467,18 +471,16 @@ StaticExecutorEntitiesCollector::add_callback_groups_from_nodes_associated_to_ex
for (const auto & weak_node : weak_nodes_) {
auto node = weak_node.lock();
if (node) {
auto group_ptrs = node->get_callback_groups();
std::for_each(
group_ptrs.begin(), group_ptrs.end(),
[this, node](rclcpp::CallbackGroup::WeakPtr group_ptr)
auto node_base = std::dynamic_pointer_cast<rclcpp::node_interfaces::NodeBase>(node);
node_base->for_each_callback_group(
[this, node_base](rclcpp::CallbackGroup::SharedPtr shared_group_ptr)
{
auto shared_group_ptr = group_ptr.lock();
if (shared_group_ptr && shared_group_ptr->automatically_add_to_executor_with_node() &&
if (shared_group_ptr->automatically_add_to_executor_with_node() &&
!shared_group_ptr->get_associated_with_executor_atomic().load())
{
add_callback_group(
shared_group_ptr,
node,
node_base,
weak_groups_to_nodes_associated_with_executor_);
}
});
Expand Down
7 changes: 7 additions & 0 deletions rclcpp/src/rclcpp/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -605,3 +605,10 @@ Node::get_node_options() const
{
return this->node_options_;
}

void Node::for_each_callback_group(
const node_interfaces::NodeBaseInterface::CallbackGroupFunction & func)
{
auto node_base = std::dynamic_pointer_cast<rclcpp::node_interfaces::NodeBase>(node_base_);
adityapande-1995 marked this conversation as resolved.
Show resolved Hide resolved
node_base->for_each_callback_group(func);
}
80 changes: 71 additions & 9 deletions rclcpp/src/rclcpp/node_interfaces/node_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ NodeBase::NodeBase(
associated_with_executor_(false),
notify_guard_condition_is_valid_(false)
{
{
std::lock_guard<std::mutex> lock(this->map_init_flag_mutex);
// Initialize map_of_mutex object if this is the first NodeBase instance
if (!this->map_init_flag) {
this->map_object_ptr = std::make_unique<map_of_mutexes>();
this->map_init_flag = true;
}
}

// Generate a mutex for this instance of NodeBase
this->map_object_ptr->create_mutex_of_nodebase(this);

// Setup the guard condition that is notified when changes occur in the graph.
rcl_guard_condition_options_t guard_condition_options = rcl_guard_condition_get_default_options();
rcl_ret_t ret = rcl_guard_condition_init(
Expand Down Expand Up @@ -166,6 +178,8 @@ NodeBase::~NodeBase()
"failed to destroy guard condition: %s", rcl_get_error_string().str);
}
}

this->map_object_ptr->delete_mutex_of_nodebase(this);
}

const char *
Expand Down Expand Up @@ -221,12 +235,11 @@ NodeBase::create_callback_group(
rclcpp::CallbackGroupType group_type,
bool automatically_add_to_executor_with_node)
{
using rclcpp::CallbackGroup;
using rclcpp::CallbackGroupType;
auto group = CallbackGroup::SharedPtr(
new CallbackGroup(
group_type,
automatically_add_to_executor_with_node));
auto group = std::make_shared<rclcpp::CallbackGroup>(
group_type,
automatically_add_to_executor_with_node);
auto mutex_ptr = this->map_object_ptr->get_mutex_of_nodebase(this);
std::lock_guard<std::mutex> lock(*mutex_ptr);
callback_groups_.push_back(group);
return group;
}
Expand All @@ -240,14 +253,16 @@ NodeBase::get_default_callback_group()
bool
NodeBase::callback_group_in_node(rclcpp::CallbackGroup::SharedPtr group)
{
bool group_belongs_to_this_node = false;
auto mutex_ptr = this->map_object_ptr->get_mutex_of_nodebase(this);
std::lock_guard<std::mutex> lock(*mutex_ptr);

for (auto & weak_group : this->callback_groups_) {
auto cur_group = weak_group.lock();
if (cur_group && (cur_group == group)) {
group_belongs_to_this_node = true;
return true;
}
}
return group_belongs_to_this_node;
return false;
}

const std::vector<rclcpp::CallbackGroup::WeakPtr> &
Expand Down Expand Up @@ -310,3 +325,50 @@ NodeBase::resolve_topic_or_service_name(
allocator.deallocate(output_cstr, allocator.state);
return output;
}

std::unique_ptr<rclcpp::node_interfaces::map_of_mutexes> NodeBase::map_object_ptr;
bool NodeBase::map_init_flag(false);
std::mutex NodeBase::map_init_flag_mutex;

rclcpp::node_interfaces::map_of_mutexes::map_of_mutexes()
{
}
adityapande-1995 marked this conversation as resolved.
Show resolved Hide resolved

void rclcpp::node_interfaces::map_of_mutexes::create_mutex_of_nodebase(
const rclcpp::node_interfaces::NodeBase * nodebase)
{
std::lock_guard<std::mutex> guard(this->internal_mutex);
this->data.emplace(nodebase, std::make_shared<std::mutex>() );
}

std::shared_ptr<std::mutex> rclcpp::node_interfaces::map_of_mutexes::get_mutex_of_nodebase(
const rclcpp::node_interfaces::NodeBase * nodebase)
{
std::lock_guard<std::mutex> guard(this->internal_mutex);
return this->data[nodebase];
}

void rclcpp::node_interfaces::map_of_mutexes::delete_mutex_of_nodebase(
const rclcpp::node_interfaces::NodeBase * nodebase)
{
std::lock_guard<std::mutex> guard(this->internal_mutex);
this->data.erase(nodebase);
}

rclcpp::node_interfaces::map_of_mutexes::~map_of_mutexes()
{
}
adityapande-1995 marked this conversation as resolved.
Show resolved Hide resolved

// For each callback group implementation
void NodeBase::for_each_callback_group(const CallbackGroupFunction & func)
{
auto mutex_ptr = this->map_object_ptr->get_mutex_of_nodebase(this);
std::lock_guard<std::mutex> lock(*mutex_ptr);

for (rclcpp::CallbackGroup::WeakPtr & weak_group : this->callback_groups_) {
rclcpp::CallbackGroup::SharedPtr group = weak_group.lock();
if (group) {
func(group);
}
}
}
Loading