Skip to content

Commit

Permalink
Added for_each_callback non-virtual method, static members to node_ba…
Browse files Browse the repository at this point in the history
…se.hpp, updated the

for_each_callback_group wrapper method in lifecycle_node and node.cpp,
added a thread safe global map of mutexes in node_base.

Implemented the suggestions in preliminary review.
  • Loading branch information
adityapande-1995 committed Aug 4, 2021
1 parent 07f6b64 commit 6971111
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 26 deletions.
12 changes: 12 additions & 0 deletions rclcpp/include/rclcpp/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
#include "rclcpp/timer.hpp"
#include "rclcpp/visibility_control.hpp"


namespace rclcpp
{

Expand Down Expand Up @@ -153,6 +154,17 @@ class Node : public std::enable_shared_from_this<Node>
const std::vector<rclcpp::CallbackGroup::WeakPtr> &
get_callback_groups() const;

/// Iterate over the callback groups in the node, calling the given function on each valid one.
/**
* This method is called in a thread-safe way, and also makes sure to only call the given
* function on those items that are still valid.
*
* \param[in] func The callback function to call on each valid callback group.
*/
RCLCPP_PUBLIC
void
for_each_callback_group(const node_interfaces::NodeBaseInterface::CallbackGroupFunction & func);

/// Create and return a Publisher.
/**
* The rclcpp::QoS has several convenient constructors, including a
Expand Down
32 changes: 32 additions & 0 deletions rclcpp/include/rclcpp/node_interfaces/node_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,42 @@
#ifndef RCLCPP__NODE_INTERFACES__NODE_BASE_HPP_
#define RCLCPP__NODE_INTERFACES__NODE_BASE_HPP_

#include <atomic>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

#include "rcl/node.h"
#include "rclcpp/callback_group.hpp"
#include "rclcpp/context.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/visibility_control.hpp"


namespace rclcpp
{
namespace node_interfaces
{

class map_of_mutexes;

/// Implementation of the NodeBase part of the Node API.
class NodeBase : public NodeBaseInterface
{
public:
RCLCPP_SMART_PTR_ALIASES_ONLY(NodeBase)

static std::unique_ptr<map_of_mutexes> map_object_ptr;
static bool map_init_flag;
static std::mutex map_init_flag_mutex;

// Non virtual method
void for_each_callback_group(const CallbackGroupFunction & func);

RCLCPP_PUBLIC
NodeBase(
const std::string & node_name,
Expand Down Expand Up @@ -145,4 +160,21 @@ class NodeBase : public NodeBaseInterface
} // namespace node_interfaces
} // namespace rclcpp

// Class to hold the global map of mutexes
class rclcpp::node_interfaces::map_of_mutexes
{
public:
map_of_mutexes();
~map_of_mutexes();

// Methods need to be protected by internal mutex
void create_mutex_of_nodebase(const rclcpp::node_interfaces::NodeBase* nodebase);
std::shared_ptr<std::mutex> get_mutex_of_nodebase(const rclcpp::node_interfaces::NodeBase* nodebase);
void delete_mutex_of_nodebase(const rclcpp::node_interfaces::NodeBase* nodebase);

// Members
std::unordered_map<const rclcpp::node_interfaces::NodeBase*, std::shared_ptr<std::mutex>> data;
std::mutex internal_mutex;
};

#endif // RCLCPP__NODE_INTERFACES__NODE_BASE_HPP_
2 changes: 2 additions & 0 deletions rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class NodeBaseInterface
public:
RCLCPP_SMART_PTR_ALIASES_ONLY(NodeBaseInterface)

using CallbackGroupFunction = std::function<void (rclcpp::CallbackGroup::SharedPtr)>;

RCLCPP_PUBLIC
virtual
~NodeBaseInterface() = default;
Expand Down
39 changes: 21 additions & 18 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "rclcpp/node.hpp"
#include "rclcpp/scope_exit.hpp"
#include "rclcpp/utilities.hpp"
#include "rclcpp/node_interfaces/node_base.hpp"

#include "rcutils/logging_macros.h"

Expand Down Expand Up @@ -190,14 +191,13 @@ Executor::add_callback_groups_from_nodes_associated_to_executor()
for (auto & weak_node : weak_nodes_) {
auto node = weak_node.lock();
if (node) {
auto group_ptrs = node->get_callback_groups();
std::for_each(
group_ptrs.begin(), group_ptrs.end(),
[this, node](rclcpp::CallbackGroup::WeakPtr group_ptr)
auto node_base = std::dynamic_pointer_cast<rclcpp::node_interfaces::NodeBase>(node);
node_base->for_each_callback_group(
[this,node](rclcpp::CallbackGroup::SharedPtr shared_group_ptr)
{
auto shared_group_ptr = group_ptr.lock();
if (shared_group_ptr && shared_group_ptr->automatically_add_to_executor_with_node() &&
!shared_group_ptr->get_associated_with_executor_atomic().load())
if(
shared_group_ptr->automatically_add_to_executor_with_node() &&
!shared_group_ptr->get_associated_with_executor_atomic().load())
{
this->add_callback_group_to_map(
shared_group_ptr,
Expand Down Expand Up @@ -271,18 +271,21 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt
throw std::runtime_error("Node has already been added to an executor.");
}
std::lock_guard<std::mutex> guard{mutex_};
for (auto & weak_group : node_ptr->get_callback_groups()) {
auto group_ptr = weak_group.lock();
if (group_ptr != nullptr && !group_ptr->get_associated_with_executor_atomic().load() &&
group_ptr->automatically_add_to_executor_with_node())
auto node_base = std::dynamic_pointer_cast<rclcpp::node_interfaces::NodeBase>(node_ptr);
node_base->for_each_callback_group(
[this, node_ptr, notify](rclcpp::CallbackGroup::SharedPtr group_ptr)
{
this->add_callback_group_to_map(
group_ptr,
node_ptr,
weak_groups_to_nodes_associated_with_executor_,
notify);
}
}
if (!group_ptr->get_associated_with_executor_atomic().load() &&
group_ptr->automatically_add_to_executor_with_node())
{
this->add_callback_group_to_map(
group_ptr,
node_ptr,
weak_groups_to_nodes_associated_with_executor_,
notify);
}
});

weak_nodes_.push_back(node_ptr);
}

Expand Down
6 changes: 6 additions & 0 deletions rclcpp/src/rclcpp/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -605,3 +605,9 @@ Node::get_node_options() const
{
return this->node_options_;
}

void Node::for_each_callback_group(const node_interfaces::NodeBaseInterface::CallbackGroupFunction & func)
{
auto node_base = std::dynamic_pointer_cast<rclcpp::node_interfaces::NodeBase>(node_base_);
node_base->for_each_callback_group(func);
}
77 changes: 69 additions & 8 deletions rclcpp/src/rclcpp/node_interfaces/node_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ NodeBase::NodeBase(
associated_with_executor_(false),
notify_guard_condition_is_valid_(false)
{
{
std::lock_guard<std::mutex> lock(this->map_init_flag_mutex);
// Initialize map_of_mutex object if this is the first NodeBase instance
if (!this->map_init_flag) {
this->map_object_ptr = std::make_unique<map_of_mutexes>();
this->map_init_flag = true;
}
}

// Generate a mutex for this instance of NodeBase
this->map_object_ptr->create_mutex_of_nodebase(this);

// Setup the guard condition that is notified when changes occur in the graph.
rcl_guard_condition_options_t guard_condition_options = rcl_guard_condition_get_default_options();
rcl_ret_t ret = rcl_guard_condition_init(
Expand Down Expand Up @@ -166,6 +178,8 @@ NodeBase::~NodeBase()
"failed to destroy guard condition: %s", rcl_get_error_string().str);
}
}

this->map_object_ptr->delete_mutex_of_nodebase(this);
}

const char *
Expand Down Expand Up @@ -221,12 +235,11 @@ NodeBase::create_callback_group(
rclcpp::CallbackGroupType group_type,
bool automatically_add_to_executor_with_node)
{
using rclcpp::CallbackGroup;
using rclcpp::CallbackGroupType;
auto group = CallbackGroup::SharedPtr(
new CallbackGroup(
auto group = std::make_shared<rclcpp::CallbackGroup>(
group_type,
automatically_add_to_executor_with_node));
automatically_add_to_executor_with_node);
auto mutex_ptr = this->map_object_ptr->get_mutex_of_nodebase(this);
std::lock_guard<std::mutex> lock(*mutex_ptr);
callback_groups_.push_back(group);
return group;
}
Expand All @@ -240,14 +253,16 @@ NodeBase::get_default_callback_group()
bool
NodeBase::callback_group_in_node(rclcpp::CallbackGroup::SharedPtr group)
{
bool group_belongs_to_this_node = false;
auto mutex_ptr = this->map_object_ptr->get_mutex_of_nodebase(this);
std::lock_guard<std::mutex> lock(*mutex_ptr);

for (auto & weak_group : this->callback_groups_) {
auto cur_group = weak_group.lock();
if (cur_group && (cur_group == group)) {
group_belongs_to_this_node = true;
return true;
}
}
return group_belongs_to_this_node;
return false;
}

const std::vector<rclcpp::CallbackGroup::WeakPtr> &
Expand Down Expand Up @@ -310,3 +325,49 @@ NodeBase::resolve_topic_or_service_name(
allocator.deallocate(output_cstr, allocator.state);
return output;
}

std::unique_ptr<rclcpp::node_interfaces::map_of_mutexes> NodeBase::map_object_ptr;
bool NodeBase::map_init_flag(false);
std::mutex NodeBase::map_init_flag_mutex;

rclcpp::node_interfaces::map_of_mutexes::map_of_mutexes()
{
}

void rclcpp::node_interfaces::map_of_mutexes::create_mutex_of_nodebase(const rclcpp::node_interfaces::NodeBase* nodebase)
{
std::lock_guard<std::mutex> guard(this->internal_mutex);
this->data.emplace(nodebase, std::make_shared<std::mutex>() );
}

std::shared_ptr<std::mutex> rclcpp::node_interfaces::map_of_mutexes::get_mutex_of_nodebase(const rclcpp::node_interfaces::NodeBase* nodebase)
{
std::lock_guard<std::mutex> guard(this->internal_mutex);
return this->data[nodebase];
}

void rclcpp::node_interfaces::map_of_mutexes::delete_mutex_of_nodebase(const rclcpp::node_interfaces::NodeBase* nodebase)
{
std::lock_guard<std::mutex> guard(this->internal_mutex);
this->data.erase(nodebase);
}

rclcpp::node_interfaces::map_of_mutexes::~map_of_mutexes()
{
}

// For each callback group implementation
void NodeBase::for_each_callback_group(const CallbackGroupFunction & func)
{
auto mutex_ptr = this->map_object_ptr->get_mutex_of_nodebase(this);
std::lock_guard<std::mutex> lock(*mutex_ptr);

for (rclcpp::CallbackGroup::WeakPtr & weak_group : this->callback_groups_) {
rclcpp::CallbackGroup::SharedPtr group = weak_group.lock();
if (group){
func(group);
}
}

}

13 changes: 13 additions & 0 deletions rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
#include "rclcpp_lifecycle/transition.hpp"
#include "rclcpp_lifecycle/visibility_control.h"


namespace rclcpp_lifecycle
{

Expand Down Expand Up @@ -200,6 +201,18 @@ class LifecycleNode : public node_interfaces::LifecycleNodeInterface,
const std::vector<rclcpp::CallbackGroup::WeakPtr> &
get_callback_groups() const;

/// Iterate over the callback groups in the node, calling the given function on each valid one.
/**
* This method is called in a thread-safe way, and also makes sure to only call the given
* function on those items that are still valid.
*
* \param[in] func The callback function to call on each valid callback group.
*/
RCLCPP_LIFECYCLE_PUBLIC
void
for_each_callback_group(
const rclcpp::node_interfaces::NodeBaseInterface::CallbackGroupFunction & func);

/// Create and return a Publisher.
/**
* \param[in] topic_name The topic for this publisher to publish on.
Expand Down
8 changes: 8 additions & 0 deletions rclcpp_lifecycle/src/lifecycle_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -651,4 +651,12 @@ LifecycleNode::add_timer_handle(std::shared_ptr<rclcpp::TimerBase> timer)
impl_->add_timer_handle(timer);
}

void
LifecycleNode::for_each_callback_group(
const rclcpp::node_interfaces::NodeBaseInterface::CallbackGroupFunction & func)
{
auto node_base = std::dynamic_pointer_cast<rclcpp::node_interfaces::NodeBase>(node_base_);
node_base->for_each_callback_group(func);
}

} // namespace rclcpp_lifecycle

0 comments on commit 6971111

Please sign in to comment.