Skip to content

Commit

Permalink
Merge pull request ros2#14 from alsora/asoragna/clean-entities-collector
Browse files Browse the repository at this point in the history
clean entities collector
  • Loading branch information
iRobot ROS authored Oct 14, 2020
2 parents 419054b + 05ca12f commit 2898804
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ class EventsExecutorEntitiesCollector final : public rclcpp::Waitable
void
set_entities_callbacks(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node);

void
unset_entities_callbacks(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node);

/// List of weak nodes registered in the events executor
std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr> weak_nodes_;

Expand Down
5 changes: 2 additions & 3 deletions rclcpp/src/rclcpp/executors/events_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,9 @@ EventsExecutor::remove_node(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
{
(void)notify;
entities_collector_->remove_node(node_ptr);

std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
has_executor.store(false);
// Remove node from entities collector
entities_collector_->remove_node(node_ptr);
}

void
Expand Down
178 changes: 102 additions & 76 deletions rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,11 @@ EventsExecutorEntitiesCollector::~EventsExecutorEntitiesCollector()
if (node) {
std::atomic_bool & has_executor = node->get_associated_with_executor_atomic();
has_executor.store(false);
this->unset_entities_callbacks(node);
}
}

// Make sure that the list is empty
weak_nodes_.clear();
}

Expand All @@ -59,7 +62,7 @@ void
EventsExecutorEntitiesCollector::add_node(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
{
// If the node already has an executor
// Check if the node already has an executor and if not, set this to true
std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();

if (has_executor.exchange(true)) {
Expand All @@ -69,109 +72,55 @@ EventsExecutorEntitiesCollector::add_node(
weak_nodes_.push_back(node_ptr);

set_entities_callbacks(node_ptr);

// Set node's guard condition callback, so if new entities are added while
// spinning we can set their callback.
rcl_ret_t ret = rcl_guard_condition_set_events_executor_callback(
associated_executor_,
&EventsExecutor::push_event,
this,
node_ptr->get_notify_guard_condition(),
false /* Discard previous events */);

if (ret != RCL_RET_OK) {
throw std::runtime_error("Couldn't set node guard condition callback");
}
}

void
EventsExecutorEntitiesCollector::remove_node(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
{
// Check if this node is currently stored here
auto node_it = weak_nodes_.begin();

// Here we unset the node entities callback.
while (node_it != weak_nodes_.end()) {
bool matched = (node_it->lock() == node_ptr);
if (matched) {
// Node found: unset its entities callbacks
rcl_ret_t ret = rcl_guard_condition_set_events_executor_callback(
nullptr, nullptr, nullptr,
node_ptr->get_notify_guard_condition(),
false);

if (ret != RCL_RET_OK) {
throw std::runtime_error(std::string("Couldn't set guard condition callback"));
}

// Unset entities callbacks
for (auto & weak_group : node_ptr->get_callback_groups()) {
auto group = weak_group.lock();
if (!group || !group->can_be_taken_from().load()) {
continue;
}
group->find_timer_ptrs_if(
[this](const rclcpp::TimerBase::SharedPtr & timer) {
if (timer) {
timers_manager_->remove_timer(timer);
}
return false;
});
group->find_subscription_ptrs_if(
[this](const rclcpp::SubscriptionBase::SharedPtr & subscription) {
if (subscription) {
subscription->set_events_executor_callback(nullptr, nullptr);
}
return false;
});
group->find_service_ptrs_if(
[this](const rclcpp::ServiceBase::SharedPtr & service) {
if (service) {
service->set_events_executor_callback(nullptr, nullptr);
}
return false;
});
group->find_client_ptrs_if(
[this](const rclcpp::ClientBase::SharedPtr & client) {
if (client) {
client->set_events_executor_callback(nullptr, nullptr);
}
return false;
});
group->find_waitable_ptrs_if(
[this](const rclcpp::Waitable::SharedPtr & waitable) {
if (waitable) {
waitable->set_events_executor_callback(nullptr, nullptr);
}
return false;
});
}

weak_nodes_.erase(node_it);
return;
} else {
++node_it;
if (node_it->lock() == node_ptr) {
break;
}
}
if (node_it == weak_nodes_.end()) {
// The node is not stored here, so nothing to do
return;
}

// Set that the node does not have an executor anymore
std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
has_executor.store(false);

weak_nodes_.erase(node_it);

unset_entities_callbacks(node_ptr);
}

void
EventsExecutorEntitiesCollector::set_entities_callbacks(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node)
{
// Check in all the callback groups
// Set event callbacks for all entities in this node
// by searching them in all callback groups
for (auto & weak_group : node->get_callback_groups()) {
auto group = weak_group.lock();
if (!group || !group->can_be_taken_from().load()) {
continue;
}

// Timers are handled by the timers manager
group->find_timer_ptrs_if(
[this](const rclcpp::TimerBase::SharedPtr & timer) {
if (timer) {
timers_manager_->add_timer(timer);
}
return false;
});

// Set callbacks for all other entity types
group->find_subscription_ptrs_if(
[this](const rclcpp::SubscriptionBase::SharedPtr & subscription) {
if (subscription) {
Expand Down Expand Up @@ -201,4 +150,81 @@ EventsExecutorEntitiesCollector::set_entities_callbacks(
return false;
});
}

// Set an event callback for the node's notify guard condition, so if new entities are added
// or remove to this node we will receive an event.
rcl_ret_t ret = rcl_guard_condition_set_events_executor_callback(
associated_executor_,
&EventsExecutor::push_event,
this,
node_ptr->get_notify_guard_condition(),
false /* Discard previous events */);

if (ret != RCL_RET_OK) {
throw std::runtime_error("Couldn't set node guard condition callback");
}
}

void
EventsExecutorEntitiesCollector::unset_entities_callbacks(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node)
{
// Unset event callbacks for all entities in this node
// by searching them in all callback groups
for (auto & weak_group : node->get_callback_groups()) {
auto group = weak_group.lock();
if (!group || !group->can_be_taken_from().load()) {
continue;
}

// Timers are handled by the timers manager
group->find_timer_ptrs_if(
[this](const rclcpp::TimerBase::SharedPtr & timer) {
if (timer) {
timers_manager_->remove_timer(timer);
}
return false;
});

// Unset callbacks for all other entity types
group->find_subscription_ptrs_if(
[this](const rclcpp::SubscriptionBase::SharedPtr & subscription) {
if (subscription) {
subscription->set_events_executor_callback(nullptr, nullptr);
}
return false;
});
group->find_service_ptrs_if(
[this](const rclcpp::ServiceBase::SharedPtr & service) {
if (service) {
service->set_events_executor_callback(nullptr, nullptr);
}
return false;
});
group->find_client_ptrs_if(
[this](const rclcpp::ClientBase::SharedPtr & client) {
if (client) {
client->set_events_executor_callback(nullptr, nullptr);
}
return false;
});
group->find_waitable_ptrs_if(
[this](const rclcpp::Waitable::SharedPtr & waitable) {
if (waitable) {
waitable->set_events_executor_callback(nullptr, nullptr);
}
return false;
});
}

// Unset the event callback for the node's notify guard condition, to stop receiving events
// if entities are added or removed to this node.
rcl_ret_t ret = rcl_guard_condition_set_events_executor_callback(
nullptr, nullptr, nullptr,
node->get_notify_guard_condition(),
false);

if (ret != RCL_RET_OK) {
throw std::runtime_error("Couldn't set node guard condition callback");
}
}

0 comments on commit 2898804

Please sign in to comment.