Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid losing waitable handles while using MultiThreadedExecutor #2109

Merged
merged 4 commits into from
Mar 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <memory>
#include <vector>
#include <utility>

#include "rcl/allocator.h"

Expand Down Expand Up @@ -120,8 +121,8 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
}
}
for (size_t i = 0; i < waitable_handles_.size(); ++i) {
if (!waitable_handles_[i]->is_ready(wait_set)) {
waitable_handles_[i].reset();
if (waitable_handles_[i]->is_ready(wait_set)) {
waitable_triggered_handles_.emplace_back(std::move(waitable_handles_[i]));
}
}

Expand All @@ -145,10 +146,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
timer_handles_.end()
);

waitable_handles_.erase(
std::remove(waitable_handles_.begin(), waitable_handles_.end(), nullptr),
waitable_handles_.end()
);
waitable_handles_.clear();
}

bool collect_entities(const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override
Expand Down Expand Up @@ -392,16 +390,17 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
rclcpp::AnyExecutable & any_exec,
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override
{
auto it = waitable_handles_.begin();
while (it != waitable_handles_.end()) {
auto & waitable_handles = waitable_triggered_handles_;
auto it = waitable_handles.begin();
while (it != waitable_handles.end()) {
std::shared_ptr<Waitable> & waitable = *it;
if (waitable) {
// Find the group for this handle and see if it can be serviced
auto group = get_group_by_waitable(waitable, weak_groups_to_nodes);
if (!group) {
// Group was not found, meaning the waitable is not valid...
// Remove it from the ready list and continue looking
it = waitable_handles_.erase(it);
it = waitable_handles.erase(it);
continue;
}
if (!group->can_be_taken_from().load()) {
Expand All @@ -414,11 +413,11 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
any_exec.waitable = waitable;
any_exec.callback_group = group;
any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes);
waitable_handles_.erase(it);
waitable_handles.erase(it);
return;
}
// Else, the waitable is no longer valid, remove it and continue
it = waitable_handles_.erase(it);
it = waitable_handles.erase(it);
}
}

Expand Down Expand Up @@ -499,6 +498,8 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
VectorRebind<std::shared_ptr<const rcl_timer_t>> timer_handles_;
VectorRebind<std::shared_ptr<Waitable>> waitable_handles_;

VectorRebind<std::shared_ptr<Waitable>> waitable_triggered_handles_;

std::shared_ptr<VoidAlloc> allocator_;
};

Expand Down
176 changes: 168 additions & 8 deletions rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,57 @@ class TestWaitable : public rclcpp::Waitable
}
};

static bool test_waitable_result2 = false;

class TestWaitable2 : public rclcpp::Waitable
{
public:
explicit TestWaitable2(rcl_publisher_t * pub_ptr)
: pub_ptr_(pub_ptr),
pub_event_(rcl_get_zero_initialized_event())
{
EXPECT_EQ(
rcl_publisher_event_init(&pub_event_, pub_ptr_, RCL_PUBLISHER_OFFERED_DEADLINE_MISSED),
RCL_RET_OK);
}

~TestWaitable2()
{
EXPECT_EQ(rcl_event_fini(&pub_event_), RCL_RET_OK);
}

void add_to_wait_set(rcl_wait_set_t * wait_set) override
{
EXPECT_EQ(rcl_wait_set_add_event(wait_set, &pub_event_, &wait_set_event_index_), RCL_RET_OK);
}

bool is_ready(rcl_wait_set_t *) override
{
return test_waitable_result2;
}

std::shared_ptr<void>
take_data() override
{
return nullptr;
}

void execute(std::shared_ptr<void> & data) override
{
(void) data;
}

size_t get_number_of_ready_events() override
{
return 1;
}

private:
rcl_publisher_t * pub_ptr_;
rcl_event_t pub_event_;
size_t wait_set_event_index_;
};

struct RclWaitSetSizes
{
size_t size_of_subscriptions = 0;
Expand Down Expand Up @@ -657,20 +708,129 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_timer) {
}

TEST_F(TestAllocatorMemoryStrategy, get_next_waitable) {
auto node1 = std::make_shared<rclcpp::Node>("waitable_node", "ns");
auto node2 = std::make_shared<rclcpp::Node>("waitable_node2", "ns");
rclcpp::Waitable::SharedPtr waitable1 = std::make_shared<TestWaitable>();
rclcpp::Waitable::SharedPtr waitable2 = std::make_shared<TestWaitable>();
node1->get_node_waitables_interface()->add_waitable(waitable1, nullptr);
node2->get_node_waitables_interface()->add_waitable(waitable2, nullptr);

auto get_next_entity = [this](const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) {
rclcpp::AnyExecutable result;
allocator_memory_strategy()->get_next_waitable(result, weak_groups_to_nodes);
return result;
};

EXPECT_TRUE(TestGetNextEntity(node1, node2, get_next_entity));
{
auto node1 = std::make_shared<rclcpp::Node>(
"waitable_node", "ns",
rclcpp::NodeOptions()
.start_parameter_event_publisher(false)
.start_parameter_services(false));

rclcpp::PublisherOptions pub_options;
pub_options.use_default_callbacks = false;

auto pub1 = node1->create_publisher<test_msgs::msg::Empty>(
"test_topic_1", rclcpp::QoS(10), pub_options);

auto waitable1 =
std::make_shared<TestWaitable2>(pub1->get_publisher_handle().get());
node1->get_node_waitables_interface()->add_waitable(waitable1, nullptr);

auto basic_node = create_node_with_disabled_callback_groups("basic_node");
WeakCallbackGroupsToNodesMap weak_groups_to_nodes;
basic_node->for_each_callback_group(
[basic_node, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr)
{
weak_groups_to_nodes.insert(
std::pair<rclcpp::CallbackGroup::WeakPtr,
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>(
group_ptr,
basic_node->get_node_base_interface()));
});
node1->for_each_callback_group(
[node1, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr)
{
weak_groups_to_nodes.insert(
std::pair<rclcpp::CallbackGroup::WeakPtr,
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>(
group_ptr,
node1->get_node_base_interface()));
});
allocator_memory_strategy()->collect_entities(weak_groups_to_nodes);

rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set();
ASSERT_EQ(
rcl_wait_set_init(
&wait_set,
allocator_memory_strategy()->number_of_ready_subscriptions(),
allocator_memory_strategy()->number_of_guard_conditions(),
allocator_memory_strategy()->number_of_ready_timers(),
allocator_memory_strategy()->number_of_ready_clients(),
allocator_memory_strategy()->number_of_ready_services(),
allocator_memory_strategy()->number_of_ready_events(),
rclcpp::contexts::get_global_default_context()->get_rcl_context().get(),
allocator_memory_strategy()->get_allocator()),
RCL_RET_OK);

ASSERT_TRUE(allocator_memory_strategy()->add_handles_to_wait_set(&wait_set));

ASSERT_EQ(
rcl_wait(
&wait_set,
std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::milliseconds(100))
.count()),
RCL_RET_OK);
test_waitable_result2 = true;
allocator_memory_strategy()->remove_null_handles(&wait_set);

rclcpp::AnyExecutable result = get_next_entity(weak_groups_to_nodes);
EXPECT_EQ(result.node_base, node1->get_node_base_interface());
test_waitable_result2 = false;

EXPECT_EQ(rcl_wait_set_fini(&wait_set), RCL_RET_OK);
}

{
auto node2 = std::make_shared<rclcpp::Node>(
"waitable_node2", "ns",
rclcpp::NodeOptions()
.start_parameter_services(false)
.start_parameter_event_publisher(false));

rclcpp::PublisherOptions pub_options;
pub_options.use_default_callbacks = false;

auto pub2 = node2->create_publisher<test_msgs::msg::Empty>(
"test_topic_2", rclcpp::QoS(10), pub_options);

auto waitable2 =
std::make_shared<TestWaitable2>(pub2->get_publisher_handle().get());
node2->get_node_waitables_interface()->add_waitable(waitable2, nullptr);

auto basic_node2 = std::make_shared<rclcpp::Node>(
"basic_node2", "ns",
rclcpp::NodeOptions()
.start_parameter_services(false)
.start_parameter_event_publisher(false));
WeakCallbackGroupsToNodesMap weak_groups_to_uncollected_nodes;
basic_node2->for_each_callback_group(
[basic_node2, &weak_groups_to_uncollected_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr)
{
weak_groups_to_uncollected_nodes.insert(
std::pair<rclcpp::CallbackGroup::WeakPtr,
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>(
group_ptr,
basic_node2->get_node_base_interface()));
});
node2->for_each_callback_group(
[node2,
&weak_groups_to_uncollected_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr)
{
weak_groups_to_uncollected_nodes.insert(
std::pair<rclcpp::CallbackGroup::WeakPtr,
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>(
group_ptr,
node2->get_node_base_interface()));
});

rclcpp::AnyExecutable failed_result = get_next_entity(weak_groups_to_uncollected_nodes);
EXPECT_EQ(failed_result.node_base, nullptr);
}
}

TEST_F(TestAllocatorMemoryStrategy, get_next_subscription_mutually_exclusive) {
Expand Down