Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add in callback_groups_for_each. #1723

Merged
merged 2 commits into from
Jul 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions rclcpp/include/rclcpp/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,16 @@ class Node : public std::enable_shared_from_this<Node>
rclcpp::CallbackGroupType group_type,
bool automatically_add_to_executor_with_node = true);

/// Return the list of callback groups in the node.
/// Iterate over the callback groups in the node, calling the given function on each valid one.
/**
* This method is called in a thread-safe way, and also makes sure to only call the given
* function on those items that are still valid.
*
* \param[in] func The callback function to call on each valid callback group.
*/
RCLCPP_PUBLIC
const std::vector<rclcpp::CallbackGroup::WeakPtr> &
get_callback_groups() const;
void
for_each_callback_group(const node_interfaces::NodeBaseInterface::CallbackGroupFunction & func);

/// Create and return a Publisher.
/**
Expand Down
15 changes: 13 additions & 2 deletions rclcpp/include/rclcpp/node_interfaces/node_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
#ifndef RCLCPP__NODE_INTERFACES__NODE_BASE_HPP_
#define RCLCPP__NODE_INTERFACES__NODE_BASE_HPP_

#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include "rcl/node.h"
#include "rclcpp/callback_group.hpp"
#include "rclcpp/context.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
Expand Down Expand Up @@ -95,9 +98,16 @@ class NodeBase : public NodeBaseInterface
bool
callback_group_in_node(rclcpp::CallbackGroup::SharedPtr group) override;

/// Iterate over the stored callback groups, calling the given function on each valid one.
/**
* This method is called in a thread-safe way, and also makes sure to only call the given
* function on those items that are still valid.
*
* \param[in] func The callback function to call on each valid callback group.
*/
RCLCPP_PUBLIC
const std::vector<rclcpp::CallbackGroup::WeakPtr> &
get_callback_groups() const override;
void
for_each_callback_group(const CallbackGroupFunction & func) override;

RCLCPP_PUBLIC
std::atomic_bool &
Expand Down Expand Up @@ -132,6 +142,7 @@ class NodeBase : public NodeBaseInterface
std::shared_ptr<rcl_node_t> node_handle_;

rclcpp::CallbackGroup::SharedPtr default_callback_group_;
std::mutex callback_groups_mutex_;
std::vector<rclcpp::CallbackGroup::WeakPtr> callback_groups_;

std::atomic_bool associated_with_executor_;
Expand Down
14 changes: 11 additions & 3 deletions rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,19 @@ class NodeBaseInterface
bool
callback_group_in_node(rclcpp::CallbackGroup::SharedPtr group) = 0;

/// Return list of callback groups associated with this node.
using CallbackGroupFunction = std::function<void (rclcpp::CallbackGroup::SharedPtr)>;

/// Iterate over the stored callback groups, calling the given function on each valid one.
/**
* This method is called in a thread-safe way, and also makes sure to only call the given
* function on those items that are still valid.
*
* \param[in] func The callback function to call on each valid callback group.
*/
RCLCPP_PUBLIC
clalancette marked this conversation as resolved.
Show resolved Hide resolved
virtual
const std::vector<rclcpp::CallbackGroup::WeakPtr> &
get_callback_groups() const = 0;
void
for_each_callback_group(const CallbackGroupFunction & func) = 0;

/// Return the atomic bool which is used to ensure only one executor is used.
RCLCPP_PUBLIC
Expand Down
36 changes: 18 additions & 18 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,12 @@ Executor::add_callback_groups_from_nodes_associated_to_executor()
for (auto & weak_node : weak_nodes_) {
auto node = weak_node.lock();
if (node) {
auto group_ptrs = node->get_callback_groups();
std::for_each(
group_ptrs.begin(), group_ptrs.end(),
[this, node](rclcpp::CallbackGroup::WeakPtr group_ptr)
node->for_each_callback_group(
[this, node](rclcpp::CallbackGroup::SharedPtr shared_group_ptr)
{
auto shared_group_ptr = group_ptr.lock();
if (shared_group_ptr && shared_group_ptr->automatically_add_to_executor_with_node() &&
!shared_group_ptr->get_associated_with_executor_atomic().load())
if (
shared_group_ptr->automatically_add_to_executor_with_node() &&
!shared_group_ptr->get_associated_with_executor_atomic().load())
{
this->add_callback_group_to_map(
shared_group_ptr,
Expand Down Expand Up @@ -273,18 +271,20 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt
"' has already been added to an executor.");
}
std::lock_guard<std::mutex> guard{mutex_};
for (auto & weak_group : node_ptr->get_callback_groups()) {
auto group_ptr = weak_group.lock();
if (group_ptr != nullptr && !group_ptr->get_associated_with_executor_atomic().load() &&
group_ptr->automatically_add_to_executor_with_node())
node_ptr->for_each_callback_group(
[this, node_ptr, notify](rclcpp::CallbackGroup::SharedPtr group_ptr)
{
this->add_callback_group_to_map(
group_ptr,
node_ptr,
weak_groups_to_nodes_associated_with_executor_,
notify);
}
}
if (!group_ptr->get_associated_with_executor_atomic().load() &&
group_ptr->automatically_add_to_executor_with_node())
{
clalancette marked this conversation as resolved.
Show resolved Hide resolved
this->add_callback_group_to_map(
group_ptr,
node_ptr,
weak_groups_to_nodes_associated_with_executor_,
notify);
}
});

weak_nodes_.push_back(node_ptr);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,18 +290,20 @@ StaticExecutorEntitiesCollector::add_node(
if (has_executor.exchange(true)) {
throw std::runtime_error("Node has already been added to an executor.");
}
for (const auto & weak_group : node_ptr->get_callback_groups()) {
auto group_ptr = weak_group.lock();
if (group_ptr != nullptr && !group_ptr->get_associated_with_executor_atomic().load() &&
group_ptr->automatically_add_to_executor_with_node())
node_ptr->for_each_callback_group(
[this, node_ptr, &is_new_node](rclcpp::CallbackGroup::SharedPtr group_ptr)
{
is_new_node = (add_callback_group(
if (
!group_ptr->get_associated_with_executor_atomic().load() &&
group_ptr->automatically_add_to_executor_with_node())
{
is_new_node = (add_callback_group(
group_ptr,
node_ptr,
weak_groups_to_nodes_associated_with_executor_) ||
is_new_node);
}
}
}
});
weak_nodes_.push_back(node_ptr);
return is_new_node;
}
Expand Down Expand Up @@ -467,13 +469,10 @@ StaticExecutorEntitiesCollector::add_callback_groups_from_nodes_associated_to_ex
for (const auto & weak_node : weak_nodes_) {
auto node = weak_node.lock();
if (node) {
auto group_ptrs = node->get_callback_groups();
std::for_each(
group_ptrs.begin(), group_ptrs.end(),
[this, node](rclcpp::CallbackGroup::WeakPtr group_ptr)
node->for_each_callback_group(
[this, node](rclcpp::CallbackGroup::SharedPtr shared_group_ptr)
{
auto shared_group_ptr = group_ptr.lock();
if (shared_group_ptr && shared_group_ptr->automatically_add_to_executor_with_node() &&
if (shared_group_ptr->automatically_add_to_executor_with_node() &&
!shared_group_ptr->get_associated_with_executor_atomic().load())
{
add_callback_group(
Expand Down
7 changes: 4 additions & 3 deletions rclcpp/src/rclcpp/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -482,10 +482,11 @@ Node::get_subscriptions_info_by_topic(const std::string & topic_name, bool no_ma
return node_graph_->get_subscriptions_info_by_topic(topic_name, no_mangle);
}

const std::vector<rclcpp::CallbackGroup::WeakPtr> &
Node::get_callback_groups() const
void
Node::for_each_callback_group(
const node_interfaces::NodeBaseInterface::CallbackGroupFunction & func)
{
return node_base_->get_callback_groups();
node_base_->for_each_callback_group(func);
}

rclcpp::Event::SharedPtr
Expand Down
27 changes: 15 additions & 12 deletions rclcpp/src/rclcpp/node_interfaces/node_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,10 @@ NodeBase::create_callback_group(
rclcpp::CallbackGroupType group_type,
bool automatically_add_to_executor_with_node)
{
using rclcpp::CallbackGroup;
using rclcpp::CallbackGroupType;
auto group = CallbackGroup::SharedPtr(
new CallbackGroup(
group_type,
automatically_add_to_executor_with_node));
auto group = std::make_shared<rclcpp::CallbackGroup>(
group_type,
automatically_add_to_executor_with_node);
std::lock_guard<std::mutex> lock(callback_groups_mutex_);
callback_groups_.push_back(group);
return group;
}
Expand All @@ -240,20 +238,25 @@ NodeBase::get_default_callback_group()
bool
NodeBase::callback_group_in_node(rclcpp::CallbackGroup::SharedPtr group)
{
bool group_belongs_to_this_node = false;
std::lock_guard<std::mutex> lock(callback_groups_mutex_);
for (auto & weak_group : this->callback_groups_) {
auto cur_group = weak_group.lock();
if (cur_group && (cur_group == group)) {
group_belongs_to_this_node = true;
return true;
}
}
return group_belongs_to_this_node;
return false;
}

const std::vector<rclcpp::CallbackGroup::WeakPtr> &
NodeBase::get_callback_groups() const
void NodeBase::for_each_callback_group(const CallbackGroupFunction & func)
{
return callback_groups_;
std::lock_guard<std::mutex> lock(callback_groups_mutex_);
for (rclcpp::CallbackGroup::WeakPtr & weak_group : this->callback_groups_) {
rclcpp::CallbackGroup::SharedPtr group = weak_group.lock();
if (group) {
func(group);
}
}
}

std::atomic_bool &
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,38 +41,38 @@ struct NumberOfEntities
std::unique_ptr<NumberOfEntities> get_number_of_default_entities(rclcpp::Node::SharedPtr node)
{
auto number_of_entities = std::make_unique<NumberOfEntities>();
for (auto & weak_group : node->get_callback_groups()) {
auto group = weak_group.lock();
EXPECT_NE(nullptr, group);
if (!group || !group->can_be_taken_from().load()) {
return nullptr;
}
group->find_subscription_ptrs_if(
[&number_of_entities](rclcpp::SubscriptionBase::SharedPtr &)
{
number_of_entities->subscriptions++; return false;
});
group->find_timer_ptrs_if(
[&number_of_entities](rclcpp::TimerBase::SharedPtr &)
{
number_of_entities->timers++; return false;
});
group->find_service_ptrs_if(
[&number_of_entities](rclcpp::ServiceBase::SharedPtr &)
{
number_of_entities->services++; return false;
});
group->find_client_ptrs_if(
[&number_of_entities](rclcpp::ClientBase::SharedPtr &)
{
number_of_entities->clients++; return false;
});
group->find_waitable_ptrs_if(
[&number_of_entities](rclcpp::Waitable::SharedPtr &)
{
number_of_entities->waitables++; return false;
});
}
node->for_each_callback_group(
[&number_of_entities](rclcpp::CallbackGroup::SharedPtr group)
{
if (!group->can_be_taken_from().load()) {
return;
}
group->find_subscription_ptrs_if(
[&number_of_entities](rclcpp::SubscriptionBase::SharedPtr &)
{
number_of_entities->subscriptions++; return false;
});
group->find_timer_ptrs_if(
[&number_of_entities](rclcpp::TimerBase::SharedPtr &)
{
number_of_entities->timers++; return false;
});
group->find_service_ptrs_if(
[&number_of_entities](rclcpp::ServiceBase::SharedPtr &)
{
number_of_entities->services++; return false;
});
group->find_client_ptrs_if(
[&number_of_entities](rclcpp::ClientBase::SharedPtr &)
{
number_of_entities->clients++; return false;
});
group->find_waitable_ptrs_if(
[&number_of_entities](rclcpp::Waitable::SharedPtr &)
{
number_of_entities->waitables++; return false;
});
});

return number_of_entities;
}
Expand Down
Loading