From 710b41f7ab2f30551fc5216bc1243502e18e38f0 Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Wed, 6 Dec 2023 16:39:02 +0000 Subject: [PATCH] Revert "Avoid losing waitable handles while using MultiThreadedExecutor (#2109)" This reverts commit 232262c02a1265830c7785b7547bd51e1124fcd8. --- .../strategies/allocator_memory_strategy.hpp | 23 ++- .../test_allocator_memory_strategy.cpp | 176 +----------------- 2 files changed, 19 insertions(+), 180 deletions(-) diff --git a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp index 41e8ec5d86..55aeef0d20 100644 --- a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp @@ -17,7 +17,6 @@ #include #include -#include #include "rcl/allocator.h" @@ -121,8 +120,8 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy } } for (size_t i = 0; i < waitable_handles_.size(); ++i) { - if (waitable_handles_[i]->is_ready(wait_set)) { - waitable_triggered_handles_.emplace_back(std::move(waitable_handles_[i])); + if (!waitable_handles_[i]->is_ready(wait_set)) { + waitable_handles_[i].reset(); } } @@ -146,7 +145,10 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy timer_handles_.end() ); - waitable_handles_.clear(); + waitable_handles_.erase( + std::remove(waitable_handles_.begin(), waitable_handles_.end(), nullptr), + waitable_handles_.end() + ); } bool collect_entities(const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override @@ -392,9 +394,8 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy rclcpp::AnyExecutable & any_exec, const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override { - auto & waitable_handles = waitable_triggered_handles_; - auto it = waitable_handles.begin(); - while (it != waitable_handles.end()) { + auto it = waitable_handles_.begin(); + while (it != waitable_handles_.end()) { std::shared_ptr & waitable = *it; if (waitable) { // Find the group for this handle and see if it can be serviced @@ -402,7 +403,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy if (!group) { // Group was not found, meaning the waitable is not valid... // Remove it from the ready list and continue looking - it = waitable_handles.erase(it); + it = waitable_handles_.erase(it); continue; } if (!group->can_be_taken_from().load()) { @@ -415,11 +416,11 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy any_exec.waitable = waitable; any_exec.callback_group = group; any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes); - waitable_handles.erase(it); + waitable_handles_.erase(it); return; } // Else, the waitable is no longer valid, remove it and continue - it = waitable_handles.erase(it); + it = waitable_handles_.erase(it); } } @@ -500,8 +501,6 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy VectorRebind> timer_handles_; VectorRebind> waitable_handles_; - VectorRebind> waitable_triggered_handles_; - std::shared_ptr allocator_; }; diff --git a/rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp b/rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp index 2adf907bc5..0e53deec70 100644 --- a/rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp +++ b/rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp @@ -63,57 +63,6 @@ class TestWaitable : public rclcpp::Waitable } }; -static bool test_waitable_result2 = false; - -class TestWaitable2 : public rclcpp::Waitable -{ -public: - explicit TestWaitable2(rcl_publisher_t * pub_ptr) - : pub_ptr_(pub_ptr), - pub_event_(rcl_get_zero_initialized_event()) - { - EXPECT_EQ( - rcl_publisher_event_init(&pub_event_, pub_ptr_, RCL_PUBLISHER_OFFERED_DEADLINE_MISSED), - RCL_RET_OK); - } - - ~TestWaitable2() - { - EXPECT_EQ(rcl_event_fini(&pub_event_), RCL_RET_OK); - } - - void add_to_wait_set(rcl_wait_set_t * wait_set) override - { - EXPECT_EQ(rcl_wait_set_add_event(wait_set, &pub_event_, &wait_set_event_index_), RCL_RET_OK); - } - - bool is_ready(rcl_wait_set_t *) override - { - return test_waitable_result2; - } - - std::shared_ptr - take_data() override - { - return nullptr; - } - - void execute(std::shared_ptr & data) override - { - (void) data; - } - - size_t get_number_of_ready_events() override - { - return 1; - } - -private: - rcl_publisher_t * pub_ptr_; - rcl_event_t pub_event_; - size_t wait_set_event_index_; -}; - struct RclWaitSetSizes { size_t size_of_subscriptions = 0; @@ -708,129 +657,20 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_timer) { } TEST_F(TestAllocatorMemoryStrategy, get_next_waitable) { + auto node1 = std::make_shared("waitable_node", "ns"); + auto node2 = std::make_shared("waitable_node2", "ns"); + rclcpp::Waitable::SharedPtr waitable1 = std::make_shared(); + rclcpp::Waitable::SharedPtr waitable2 = std::make_shared(); + node1->get_node_waitables_interface()->add_waitable(waitable1, nullptr); + node2->get_node_waitables_interface()->add_waitable(waitable2, nullptr); + auto get_next_entity = [this](const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { rclcpp::AnyExecutable result; allocator_memory_strategy()->get_next_waitable(result, weak_groups_to_nodes); return result; }; - { - auto node1 = std::make_shared( - "waitable_node", "ns", - rclcpp::NodeOptions() - .start_parameter_event_publisher(false) - .start_parameter_services(false)); - - rclcpp::PublisherOptions pub_options; - pub_options.use_default_callbacks = false; - - auto pub1 = node1->create_publisher( - "test_topic_1", rclcpp::QoS(10), pub_options); - - auto waitable1 = - std::make_shared(pub1->get_publisher_handle().get()); - node1->get_node_waitables_interface()->add_waitable(waitable1, nullptr); - - auto basic_node = create_node_with_disabled_callback_groups("basic_node"); - WeakCallbackGroupsToNodesMap weak_groups_to_nodes; - basic_node->for_each_callback_group( - [basic_node, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) - { - weak_groups_to_nodes.insert( - std::pair( - group_ptr, - basic_node->get_node_base_interface())); - }); - node1->for_each_callback_group( - [node1, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) - { - weak_groups_to_nodes.insert( - std::pair( - group_ptr, - node1->get_node_base_interface())); - }); - allocator_memory_strategy()->collect_entities(weak_groups_to_nodes); - - rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set(); - ASSERT_EQ( - rcl_wait_set_init( - &wait_set, - allocator_memory_strategy()->number_of_ready_subscriptions(), - allocator_memory_strategy()->number_of_guard_conditions(), - allocator_memory_strategy()->number_of_ready_timers(), - allocator_memory_strategy()->number_of_ready_clients(), - allocator_memory_strategy()->number_of_ready_services(), - allocator_memory_strategy()->number_of_ready_events(), - rclcpp::contexts::get_global_default_context()->get_rcl_context().get(), - allocator_memory_strategy()->get_allocator()), - RCL_RET_OK); - - ASSERT_TRUE(allocator_memory_strategy()->add_handles_to_wait_set(&wait_set)); - - ASSERT_EQ( - rcl_wait( - &wait_set, - std::chrono::duration_cast(std::chrono::milliseconds(100)) - .count()), - RCL_RET_OK); - test_waitable_result2 = true; - allocator_memory_strategy()->remove_null_handles(&wait_set); - - rclcpp::AnyExecutable result = get_next_entity(weak_groups_to_nodes); - EXPECT_EQ(result.node_base, node1->get_node_base_interface()); - test_waitable_result2 = false; - - EXPECT_EQ(rcl_wait_set_fini(&wait_set), RCL_RET_OK); - } - - { - auto node2 = std::make_shared( - "waitable_node2", "ns", - rclcpp::NodeOptions() - .start_parameter_services(false) - .start_parameter_event_publisher(false)); - - rclcpp::PublisherOptions pub_options; - pub_options.use_default_callbacks = false; - - auto pub2 = node2->create_publisher( - "test_topic_2", rclcpp::QoS(10), pub_options); - - auto waitable2 = - std::make_shared(pub2->get_publisher_handle().get()); - node2->get_node_waitables_interface()->add_waitable(waitable2, nullptr); - - auto basic_node2 = std::make_shared( - "basic_node2", "ns", - rclcpp::NodeOptions() - .start_parameter_services(false) - .start_parameter_event_publisher(false)); - WeakCallbackGroupsToNodesMap weak_groups_to_uncollected_nodes; - basic_node2->for_each_callback_group( - [basic_node2, &weak_groups_to_uncollected_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) - { - weak_groups_to_uncollected_nodes.insert( - std::pair( - group_ptr, - basic_node2->get_node_base_interface())); - }); - node2->for_each_callback_group( - [node2, - &weak_groups_to_uncollected_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) - { - weak_groups_to_uncollected_nodes.insert( - std::pair( - group_ptr, - node2->get_node_base_interface())); - }); - - rclcpp::AnyExecutable failed_result = get_next_entity(weak_groups_to_uncollected_nodes); - EXPECT_EQ(failed_result.node_base, nullptr); - } + EXPECT_TRUE(TestGetNextEntity(node1, node2, get_next_entity)); } TEST_F(TestAllocatorMemoryStrategy, get_next_subscription_mutually_exclusive) {