From 320c111cc56db56d3f8c97def5cab84d880b7a81 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Tue, 14 Mar 2023 01:10:24 +0800 Subject: [PATCH] Avoid losing waitable handles while using MultiThreadedExecutor (#2109) Signed-off-by: Barry Xu --- .../strategies/allocator_memory_strategy.hpp | 23 +-- .../test_allocator_memory_strategy.cpp | 176 +++++++++++++++++- 2 files changed, 180 insertions(+), 19 deletions(-) diff --git a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp index 88698179d4..46379744a1 100644 --- a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp @@ -17,6 +17,7 @@ #include #include +#include #include "rcl/allocator.h" @@ -120,8 +121,8 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy } } for (size_t i = 0; i < waitable_handles_.size(); ++i) { - if (!waitable_handles_[i]->is_ready(wait_set)) { - waitable_handles_[i].reset(); + if (waitable_handles_[i]->is_ready(wait_set)) { + waitable_triggered_handles_.emplace_back(std::move(waitable_handles_[i])); } } @@ -145,10 +146,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy timer_handles_.end() ); - waitable_handles_.erase( - std::remove(waitable_handles_.begin(), waitable_handles_.end(), nullptr), - waitable_handles_.end() - ); + waitable_handles_.clear(); } bool collect_entities(const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override @@ -392,8 +390,9 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy rclcpp::AnyExecutable & any_exec, const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override { - auto it = waitable_handles_.begin(); - while (it != waitable_handles_.end()) { + auto & waitable_handles = waitable_triggered_handles_; + auto it = waitable_handles.begin(); + while (it != waitable_handles.end()) { std::shared_ptr & waitable = *it; if (waitable) { // Find the group for this handle and see if it can be serviced @@ -401,7 +400,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy if (!group) { // Group was not found, meaning the waitable is not valid... // Remove it from the ready list and continue looking - it = waitable_handles_.erase(it); + it = waitable_handles.erase(it); continue; } if (!group->can_be_taken_from().load()) { @@ -414,11 +413,11 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy any_exec.waitable = waitable; any_exec.callback_group = group; any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes); - waitable_handles_.erase(it); + waitable_handles.erase(it); return; } // Else, the waitable is no longer valid, remove it and continue - it = waitable_handles_.erase(it); + it = waitable_handles.erase(it); } } @@ -499,6 +498,8 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy VectorRebind> timer_handles_; VectorRebind> waitable_handles_; + VectorRebind> waitable_triggered_handles_; + std::shared_ptr allocator_; }; diff --git a/rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp b/rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp index 0e53deec70..2adf907bc5 100644 --- a/rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp +++ b/rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp @@ -63,6 +63,57 @@ class TestWaitable : public rclcpp::Waitable } }; +static bool test_waitable_result2 = false; + +class TestWaitable2 : public rclcpp::Waitable +{ +public: + explicit TestWaitable2(rcl_publisher_t * pub_ptr) + : pub_ptr_(pub_ptr), + pub_event_(rcl_get_zero_initialized_event()) + { + EXPECT_EQ( + rcl_publisher_event_init(&pub_event_, pub_ptr_, RCL_PUBLISHER_OFFERED_DEADLINE_MISSED), + RCL_RET_OK); + } + + ~TestWaitable2() + { + EXPECT_EQ(rcl_event_fini(&pub_event_), RCL_RET_OK); + } + + void add_to_wait_set(rcl_wait_set_t * wait_set) override + { + EXPECT_EQ(rcl_wait_set_add_event(wait_set, &pub_event_, &wait_set_event_index_), RCL_RET_OK); + } + + bool is_ready(rcl_wait_set_t *) override + { + return test_waitable_result2; + } + + std::shared_ptr + take_data() override + { + return nullptr; + } + + void execute(std::shared_ptr & data) override + { + (void) data; + } + + size_t get_number_of_ready_events() override + { + return 1; + } + +private: + rcl_publisher_t * pub_ptr_; + rcl_event_t pub_event_; + size_t wait_set_event_index_; +}; + struct RclWaitSetSizes { size_t size_of_subscriptions = 0; @@ -657,20 +708,129 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_timer) { } TEST_F(TestAllocatorMemoryStrategy, get_next_waitable) { - auto node1 = std::make_shared("waitable_node", "ns"); - auto node2 = std::make_shared("waitable_node2", "ns"); - rclcpp::Waitable::SharedPtr waitable1 = std::make_shared(); - rclcpp::Waitable::SharedPtr waitable2 = std::make_shared(); - node1->get_node_waitables_interface()->add_waitable(waitable1, nullptr); - node2->get_node_waitables_interface()->add_waitable(waitable2, nullptr); - auto get_next_entity = [this](const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { rclcpp::AnyExecutable result; allocator_memory_strategy()->get_next_waitable(result, weak_groups_to_nodes); return result; }; - EXPECT_TRUE(TestGetNextEntity(node1, node2, get_next_entity)); + { + auto node1 = std::make_shared( + "waitable_node", "ns", + rclcpp::NodeOptions() + .start_parameter_event_publisher(false) + .start_parameter_services(false)); + + rclcpp::PublisherOptions pub_options; + pub_options.use_default_callbacks = false; + + auto pub1 = node1->create_publisher( + "test_topic_1", rclcpp::QoS(10), pub_options); + + auto waitable1 = + std::make_shared(pub1->get_publisher_handle().get()); + node1->get_node_waitables_interface()->add_waitable(waitable1, nullptr); + + auto basic_node = create_node_with_disabled_callback_groups("basic_node"); + WeakCallbackGroupsToNodesMap weak_groups_to_nodes; + basic_node->for_each_callback_group( + [basic_node, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { + weak_groups_to_nodes.insert( + std::pair( + group_ptr, + basic_node->get_node_base_interface())); + }); + node1->for_each_callback_group( + [node1, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { + weak_groups_to_nodes.insert( + std::pair( + group_ptr, + node1->get_node_base_interface())); + }); + allocator_memory_strategy()->collect_entities(weak_groups_to_nodes); + + rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set(); + ASSERT_EQ( + rcl_wait_set_init( + &wait_set, + allocator_memory_strategy()->number_of_ready_subscriptions(), + allocator_memory_strategy()->number_of_guard_conditions(), + allocator_memory_strategy()->number_of_ready_timers(), + allocator_memory_strategy()->number_of_ready_clients(), + allocator_memory_strategy()->number_of_ready_services(), + allocator_memory_strategy()->number_of_ready_events(), + rclcpp::contexts::get_global_default_context()->get_rcl_context().get(), + allocator_memory_strategy()->get_allocator()), + RCL_RET_OK); + + ASSERT_TRUE(allocator_memory_strategy()->add_handles_to_wait_set(&wait_set)); + + ASSERT_EQ( + rcl_wait( + &wait_set, + std::chrono::duration_cast(std::chrono::milliseconds(100)) + .count()), + RCL_RET_OK); + test_waitable_result2 = true; + allocator_memory_strategy()->remove_null_handles(&wait_set); + + rclcpp::AnyExecutable result = get_next_entity(weak_groups_to_nodes); + EXPECT_EQ(result.node_base, node1->get_node_base_interface()); + test_waitable_result2 = false; + + EXPECT_EQ(rcl_wait_set_fini(&wait_set), RCL_RET_OK); + } + + { + auto node2 = std::make_shared( + "waitable_node2", "ns", + rclcpp::NodeOptions() + .start_parameter_services(false) + .start_parameter_event_publisher(false)); + + rclcpp::PublisherOptions pub_options; + pub_options.use_default_callbacks = false; + + auto pub2 = node2->create_publisher( + "test_topic_2", rclcpp::QoS(10), pub_options); + + auto waitable2 = + std::make_shared(pub2->get_publisher_handle().get()); + node2->get_node_waitables_interface()->add_waitable(waitable2, nullptr); + + auto basic_node2 = std::make_shared( + "basic_node2", "ns", + rclcpp::NodeOptions() + .start_parameter_services(false) + .start_parameter_event_publisher(false)); + WeakCallbackGroupsToNodesMap weak_groups_to_uncollected_nodes; + basic_node2->for_each_callback_group( + [basic_node2, &weak_groups_to_uncollected_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { + weak_groups_to_uncollected_nodes.insert( + std::pair( + group_ptr, + basic_node2->get_node_base_interface())); + }); + node2->for_each_callback_group( + [node2, + &weak_groups_to_uncollected_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { + weak_groups_to_uncollected_nodes.insert( + std::pair( + group_ptr, + node2->get_node_base_interface())); + }); + + rclcpp::AnyExecutable failed_result = get_next_entity(weak_groups_to_uncollected_nodes); + EXPECT_EQ(failed_result.node_base, nullptr); + } } TEST_F(TestAllocatorMemoryStrategy, get_next_subscription_mutually_exclusive) {