Skip to content

Commit

Permalink
fix events-executor warm-up bug and add unit-tests (#2591)
Browse files Browse the repository at this point in the history
* add unit-test to verify that spin-all doesn't need warm-up

Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com>

* improve comment and add callback group test

Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com>

* move executor tests to a new file

Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com>

* do not require warm up iteration with events executor spin_some

Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com>

* add spin_some tests and cleanup

Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com>

* add missing include directives

Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com>

---------

Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
alsora and mergify[bot] authored Sep 11, 2024
1 parent 2f71d6e commit f7056c0
Show file tree
Hide file tree
Showing 3 changed files with 273 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ EventsExecutor::setup_notify_waitable()
// ---> we need to wake up the executor so that it can terminate
// - a node or callback group guard condition is triggered:
// ---> the entities collection is changed, we need to update callbacks
entities_need_rebuild_ = false;
this->handle_updated_entities(false);
});

Expand Down Expand Up @@ -168,6 +167,14 @@ EventsExecutor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhau
return false;
};

// If this spin is not exhaustive (e.g. spin_some), we need to explicitly check
// if entities need to be rebuilt here rather than letting the notify waitable event do it.
// A non-exhaustive spin would not check for work a second time, thus delaying the execution
// of some entities to the next invocation of spin.
if (!exhaustive) {
this->handle_updated_entities(false);
}

// Get the number of events and timers ready at start
const size_t ready_events_at_start = events_queue_->size();
size_t executed_events = 0;
Expand Down Expand Up @@ -314,6 +321,17 @@ void
EventsExecutor::handle_updated_entities(bool notify)
{
(void)notify;

// Do not rebuild if we don't need to.
// A rebuild event could be generated, but then
// this function could end up being called from somewhere else
// before that event gets processed, for example if
// a node or callback group is manually added to the executor.
const bool notify_waitable_triggered = entities_need_rebuild_.exchange(false);
if (!notify_waitable_triggered && !this->collector_.has_pending()) {
return;
}

// Build the new collection
this->collector_.update_collections();
auto callback_groups = this->collector_.get_all_callback_groups();
Expand Down
10 changes: 10 additions & 0 deletions rclcpp/test/rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,16 @@ if(TARGET test_executors_busy_waiting)
target_link_libraries(test_executors_busy_waiting ${PROJECT_NAME})
endif()

ament_add_gtest(
test_executors_warmup
executors/test_executors_warmup.cpp
executors/test_waitable.cpp
APPEND_LIBRARY_DIRS "${append_library_dirs}"
TIMEOUT 180)
if(TARGET test_executors_warmup)
target_link_libraries(test_executors_warmup ${PROJECT_NAME} ${test_msgs_TARGETS})
endif()

ament_add_gtest(test_static_single_threaded_executor executors/test_static_single_threaded_executor.cpp
APPEND_LIBRARY_DIRS "${append_library_dirs}")
ament_add_test_label(test_static_single_threaded_executor mimick)
Expand Down
244 changes: 244 additions & 0 deletions rclcpp/test/rclcpp/executors/test_executors_warmup.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
// Copyright 2024 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* This test checks all implementations of rclcpp::executor to check they pass they basic API
* tests. Anything specific to any executor in particular should go in a separate test file.
*/

#include <gtest/gtest.h>

#include <chrono>
#include <cstddef>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>

#include "rclcpp/rclcpp.hpp"

#include "test_msgs/msg/empty.hpp"

#include "./executor_types.hpp"

using namespace std::chrono_literals;

template<typename T>
class TestExecutorsWarmup : public ::testing::Test
{
public:
void SetUp()
{
rclcpp::init(0, nullptr);
}

void TearDown()
{
rclcpp::shutdown();
}
};

TYPED_TEST_SUITE(TestExecutorsWarmup, ExecutorTypes, ExecutorTypeNames);

// This test verifies that spin_all is correctly collecting work multiple times
// even when one of the items of work is a notifier waitable event and thus results in
// rebuilding the entities collection.
// When spin_all goes back to collect more work, it should see the ready items from
// the new added entities
TYPED_TEST(TestExecutorsWarmup, spin_all_doesnt_require_warmup)
{
using ExecutorType = TypeParam;
ExecutorType executor;

// Enable intra-process to guarantee deterministic and synchronous delivery of the message / event
auto node_options = rclcpp::NodeOptions().use_intra_process_comms(true);
auto node = std::make_shared<rclcpp::Node>("test_node", node_options);

// Add node to the executor before creating the entities
executor.add_node(node);

// Create entities, this will produce a notifier waitable event, telling the executor to refresh
// the entities collection
auto publisher = node->create_publisher<test_msgs::msg::Empty>("test_topic", rclcpp::QoS(10));
size_t callback_count = 0;
auto callback = [&callback_count](test_msgs::msg::Empty::ConstSharedPtr) {callback_count++;};
auto subscription =
node->create_subscription<test_msgs::msg::Empty>(
"test_topic", rclcpp::QoS(10), std::move(callback));

ASSERT_EQ(callback_count, 0u);

// Publish a message so that the new entities (i.e. the subscriber) already have work to do
publisher->publish(test_msgs::msg::Empty());

// We need to select a duration that is greater than
// the time taken to refresh the entities collection and rebuild the waitset.
// spin-all is expected to process the notifier waitable event, rebuild the collection,
// and then collect more work, finding the subscription message event.
// This duration has been selected empirically.
executor.spin_all(std::chrono::milliseconds(500));

// Verify that the callback is called as part of the spin above
EXPECT_EQ(callback_count, 1u);
}

// Same test as `spin_all_doesnt_require_warmup`, but uses a callback group
// This test reproduces the bug reported by https://github.com/ros2/rclcpp/issues/2589
TYPED_TEST(TestExecutorsWarmup, spin_all_doesnt_require_warmup_with_cbgroup)
{
using ExecutorType = TypeParam;

// TODO(alsora): Enable when https://github.com/ros2/rclcpp/pull/2595 gets merged
if (
std::is_same<ExecutorType, rclcpp::executors::SingleThreadedExecutor>() ||
std::is_same<ExecutorType, rclcpp::executors::MultiThreadedExecutor>())
{
GTEST_SKIP();
}

ExecutorType executor;

// Enable intra-process to guarantee deterministic and synchronous delivery of the message / event
auto node_options = rclcpp::NodeOptions().use_intra_process_comms(true);
auto node = std::make_shared<rclcpp::Node>("test_node", node_options);

auto callback_group = node->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive,
false);

// Add callback group to the executor before creating the entities
executor.add_callback_group(callback_group, node->get_node_base_interface());

// Create entities, this will produce a notifier waitable event, telling the executor to refresh
// the entities collection
auto publisher = node->create_publisher<test_msgs::msg::Empty>("test_topic", rclcpp::QoS(10));
size_t callback_count = 0;
auto callback = [&callback_count](test_msgs::msg::Empty::ConstSharedPtr) {callback_count++;};
rclcpp::SubscriptionOptions sub_options;
sub_options.callback_group = callback_group;
auto subscription =
node->create_subscription<test_msgs::msg::Empty>(
"test_topic", rclcpp::QoS(10), std::move(callback), sub_options);

ASSERT_EQ(callback_count, 0u);

// Publish a message so that the new entities (i.e. the subscriber) already have work to do
publisher->publish(test_msgs::msg::Empty());

// We need to select a duration that is greater than
// the time taken to refresh the entities collection and rebuild the waitset.
// spin-all is expected to process the notifier waitable event, rebuild the collection,
// and then collect more work, finding the subscription message event.
// This duration has been selected empirically.
executor.spin_all(std::chrono::milliseconds(500));

// Verify that the callback is called as part of the spin above
EXPECT_EQ(callback_count, 1u);
}

TYPED_TEST(TestExecutorsWarmup, spin_some_doesnt_require_warmup)
{
using ExecutorType = TypeParam;

// TODO(alsora): currently only the events-executor passes this test.
// Enable single-threaded and multi-threaded executors
// when https://github.com/ros2/rclcpp/pull/2595 gets merged
if (
!std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>())
{
GTEST_SKIP();
}

ExecutorType executor;

// Enable intra-process to guarantee deterministic and synchronous delivery of the message / event
auto node_options = rclcpp::NodeOptions().use_intra_process_comms(true);
auto node = std::make_shared<rclcpp::Node>("test_node", node_options);

// Add node to the executor before creating the entities
executor.add_node(node);

// Create entities, this will produce a notifier waitable event, telling the executor to refresh
// the entities collection
auto publisher = node->create_publisher<test_msgs::msg::Empty>("test_topic", rclcpp::QoS(10));
size_t callback_count = 0;
auto callback = [&callback_count](test_msgs::msg::Empty::ConstSharedPtr) {callback_count++;};
auto subscription =
node->create_subscription<test_msgs::msg::Empty>(
"test_topic", rclcpp::QoS(10), std::move(callback));

ASSERT_EQ(callback_count, 0u);

// Publish a message so that the new entities (i.e. the subscriber) already have work to do
publisher->publish(test_msgs::msg::Empty());

// NOTE: intra-process communication is enabled, so the subscription will immediately see
// the new message, no risk of race conditions where spin_some gets called before the
// message has been delivered.
executor.spin_some();

// Verify that the callback is called as part of the spin above
EXPECT_EQ(callback_count, 1u);
}

TYPED_TEST(TestExecutorsWarmup, spin_some_doesnt_require_warmup_with_cbgroup)
{
using ExecutorType = TypeParam;

// TODO(alsora): currently only the events-executor passes this test.
// Enable single-threaded and multi-threaded executors
// when https://github.com/ros2/rclcpp/pull/2595 gets merged
if (
!std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>())
{
GTEST_SKIP();
}

ExecutorType executor;

// Enable intra-process to guarantee deterministic and synchronous delivery of the message / event
auto node_options = rclcpp::NodeOptions().use_intra_process_comms(true);
auto node = std::make_shared<rclcpp::Node>("test_node", node_options);

auto callback_group = node->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive,
false);

// Add callback group to the executor before creating the entities
executor.add_callback_group(callback_group, node->get_node_base_interface());

// Create entities, this will produce a notifier waitable event, telling the executor to refresh
// the entities collection
auto publisher = node->create_publisher<test_msgs::msg::Empty>("test_topic", rclcpp::QoS(10));
size_t callback_count = 0;
auto callback = [&callback_count](test_msgs::msg::Empty::ConstSharedPtr) {callback_count++;};
rclcpp::SubscriptionOptions sub_options;
sub_options.callback_group = callback_group;
auto subscription =
node->create_subscription<test_msgs::msg::Empty>(
"test_topic", rclcpp::QoS(10), std::move(callback), sub_options);

ASSERT_EQ(callback_count, 0u);

// Publish a message so that the new entities (i.e. the subscriber) already have work to do
publisher->publish(test_msgs::msg::Empty());

// NOTE: intra-process communication is enabled, so the subscription will immediately see
// the new message, no risk of race conditions where spin_some gets called before the
// message has been delivered.
executor.spin_some();

// Verify that the callback is called as part of the spin above
EXPECT_EQ(callback_count, 1u);
}

0 comments on commit f7056c0

Please sign in to comment.