Skip to content

Commit

Permalink
Avoid losing waitable handles while using MultiThreadedExecutor (ros2…
Browse files Browse the repository at this point in the history
…#2109)

Signed-off-by: Barry Xu <barry.xu@sony.com>
  • Loading branch information
Barry-Xu-2018 authored and Alberto Soragna committed May 3, 2023
1 parent 3ea69d6 commit 3953908
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 19 deletions.
23 changes: 12 additions & 11 deletions rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <memory>
#include <vector>
#include <utility>

#include "rcl/allocator.h"

Expand Down Expand Up @@ -120,8 +121,8 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
}
}
for (size_t i = 0; i < waitable_handles_.size(); ++i) {
if (!waitable_handles_[i]->is_ready(wait_set)) {
waitable_handles_[i].reset();
if (waitable_handles_[i]->is_ready(wait_set)) {
waitable_triggered_handles_.emplace_back(std::move(waitable_handles_[i]));
}
}

Expand All @@ -145,10 +146,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
timer_handles_.end()
);

waitable_handles_.erase(
std::remove(waitable_handles_.begin(), waitable_handles_.end(), nullptr),
waitable_handles_.end()
);
waitable_handles_.clear();
}

bool collect_entities(const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override
Expand Down Expand Up @@ -392,16 +390,17 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
rclcpp::AnyExecutable & any_exec,
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override
{
auto it = waitable_handles_.begin();
while (it != waitable_handles_.end()) {
auto & waitable_handles = waitable_triggered_handles_;
auto it = waitable_handles.begin();
while (it != waitable_handles.end()) {
std::shared_ptr<Waitable> & waitable = *it;
if (waitable) {
// Find the group for this handle and see if it can be serviced
auto group = get_group_by_waitable(waitable, weak_groups_to_nodes);
if (!group) {
// Group was not found, meaning the waitable is not valid...
// Remove it from the ready list and continue looking
it = waitable_handles_.erase(it);
it = waitable_handles.erase(it);
continue;
}
if (!group->can_be_taken_from().load()) {
Expand All @@ -414,11 +413,11 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
any_exec.waitable = waitable;
any_exec.callback_group = group;
any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes);
waitable_handles_.erase(it);
waitable_handles.erase(it);
return;
}
// Else, the waitable is no longer valid, remove it and continue
it = waitable_handles_.erase(it);
it = waitable_handles.erase(it);
}
}

Expand Down Expand Up @@ -499,6 +498,8 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
VectorRebind<std::shared_ptr<const rcl_timer_t>> timer_handles_;
VectorRebind<std::shared_ptr<Waitable>> waitable_handles_;

VectorRebind<std::shared_ptr<Waitable>> waitable_triggered_handles_;

std::shared_ptr<VoidAlloc> allocator_;
};

Expand Down
176 changes: 168 additions & 8 deletions rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,57 @@ class TestWaitable : public rclcpp::Waitable
}
};

static bool test_waitable_result2 = false;

class TestWaitable2 : public rclcpp::Waitable
{
public:
explicit TestWaitable2(rcl_publisher_t * pub_ptr)
: pub_ptr_(pub_ptr),
pub_event_(rcl_get_zero_initialized_event())
{
EXPECT_EQ(
rcl_publisher_event_init(&pub_event_, pub_ptr_, RCL_PUBLISHER_OFFERED_DEADLINE_MISSED),
RCL_RET_OK);
}

~TestWaitable2()
{
EXPECT_EQ(rcl_event_fini(&pub_event_), RCL_RET_OK);
}

void add_to_wait_set(rcl_wait_set_t * wait_set) override
{
EXPECT_EQ(rcl_wait_set_add_event(wait_set, &pub_event_, &wait_set_event_index_), RCL_RET_OK);
}

bool is_ready(rcl_wait_set_t *) override
{
return test_waitable_result2;
}

std::shared_ptr<void>
take_data() override
{
return nullptr;
}

void execute(std::shared_ptr<void> & data) override
{
(void) data;
}

size_t get_number_of_ready_events() override
{
return 1;
}

private:
rcl_publisher_t * pub_ptr_;
rcl_event_t pub_event_;
size_t wait_set_event_index_;
};

struct RclWaitSetSizes
{
size_t size_of_subscriptions = 0;
Expand Down Expand Up @@ -657,20 +708,129 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_timer) {
}

TEST_F(TestAllocatorMemoryStrategy, get_next_waitable) {
auto node1 = std::make_shared<rclcpp::Node>("waitable_node", "ns");
auto node2 = std::make_shared<rclcpp::Node>("waitable_node2", "ns");
rclcpp::Waitable::SharedPtr waitable1 = std::make_shared<TestWaitable>();
rclcpp::Waitable::SharedPtr waitable2 = std::make_shared<TestWaitable>();
node1->get_node_waitables_interface()->add_waitable(waitable1, nullptr);
node2->get_node_waitables_interface()->add_waitable(waitable2, nullptr);

auto get_next_entity = [this](const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) {
rclcpp::AnyExecutable result;
allocator_memory_strategy()->get_next_waitable(result, weak_groups_to_nodes);
return result;
};

EXPECT_TRUE(TestGetNextEntity(node1, node2, get_next_entity));
{
auto node1 = std::make_shared<rclcpp::Node>(
"waitable_node", "ns",
rclcpp::NodeOptions()
.start_parameter_event_publisher(false)
.start_parameter_services(false));

rclcpp::PublisherOptions pub_options;
pub_options.use_default_callbacks = false;

auto pub1 = node1->create_publisher<test_msgs::msg::Empty>(
"test_topic_1", rclcpp::QoS(10), pub_options);

auto waitable1 =
std::make_shared<TestWaitable2>(pub1->get_publisher_handle().get());
node1->get_node_waitables_interface()->add_waitable(waitable1, nullptr);

auto basic_node = create_node_with_disabled_callback_groups("basic_node");
WeakCallbackGroupsToNodesMap weak_groups_to_nodes;
basic_node->for_each_callback_group(
[basic_node, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr)
{
weak_groups_to_nodes.insert(
std::pair<rclcpp::CallbackGroup::WeakPtr,
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>(
group_ptr,
basic_node->get_node_base_interface()));
});
node1->for_each_callback_group(
[node1, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr)
{
weak_groups_to_nodes.insert(
std::pair<rclcpp::CallbackGroup::WeakPtr,
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>(
group_ptr,
node1->get_node_base_interface()));
});
allocator_memory_strategy()->collect_entities(weak_groups_to_nodes);

rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set();
ASSERT_EQ(
rcl_wait_set_init(
&wait_set,
allocator_memory_strategy()->number_of_ready_subscriptions(),
allocator_memory_strategy()->number_of_guard_conditions(),
allocator_memory_strategy()->number_of_ready_timers(),
allocator_memory_strategy()->number_of_ready_clients(),
allocator_memory_strategy()->number_of_ready_services(),
allocator_memory_strategy()->number_of_ready_events(),
rclcpp::contexts::get_global_default_context()->get_rcl_context().get(),
allocator_memory_strategy()->get_allocator()),
RCL_RET_OK);

ASSERT_TRUE(allocator_memory_strategy()->add_handles_to_wait_set(&wait_set));

ASSERT_EQ(
rcl_wait(
&wait_set,
std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::milliseconds(100))
.count()),
RCL_RET_OK);
test_waitable_result2 = true;
allocator_memory_strategy()->remove_null_handles(&wait_set);

rclcpp::AnyExecutable result = get_next_entity(weak_groups_to_nodes);
EXPECT_EQ(result.node_base, node1->get_node_base_interface());
test_waitable_result2 = false;

EXPECT_EQ(rcl_wait_set_fini(&wait_set), RCL_RET_OK);
}

{
auto node2 = std::make_shared<rclcpp::Node>(
"waitable_node2", "ns",
rclcpp::NodeOptions()
.start_parameter_services(false)
.start_parameter_event_publisher(false));

rclcpp::PublisherOptions pub_options;
pub_options.use_default_callbacks = false;

auto pub2 = node2->create_publisher<test_msgs::msg::Empty>(
"test_topic_2", rclcpp::QoS(10), pub_options);

auto waitable2 =
std::make_shared<TestWaitable2>(pub2->get_publisher_handle().get());
node2->get_node_waitables_interface()->add_waitable(waitable2, nullptr);

auto basic_node2 = std::make_shared<rclcpp::Node>(
"basic_node2", "ns",
rclcpp::NodeOptions()
.start_parameter_services(false)
.start_parameter_event_publisher(false));
WeakCallbackGroupsToNodesMap weak_groups_to_uncollected_nodes;
basic_node2->for_each_callback_group(
[basic_node2, &weak_groups_to_uncollected_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr)
{
weak_groups_to_uncollected_nodes.insert(
std::pair<rclcpp::CallbackGroup::WeakPtr,
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>(
group_ptr,
basic_node2->get_node_base_interface()));
});
node2->for_each_callback_group(
[node2,
&weak_groups_to_uncollected_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr)
{
weak_groups_to_uncollected_nodes.insert(
std::pair<rclcpp::CallbackGroup::WeakPtr,
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>(
group_ptr,
node2->get_node_base_interface()));
});

rclcpp::AnyExecutable failed_result = get_next_entity(weak_groups_to_uncollected_nodes);
EXPECT_EQ(failed_result.node_base, nullptr);
}
}

TEST_F(TestAllocatorMemoryStrategy, get_next_subscription_mutually_exclusive) {
Expand Down

0 comments on commit 3953908

Please sign in to comment.