From 76c970b820a62a9ef463fcd8495dd646dc9dc5b1 Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Mon, 8 Apr 2024 17:34:31 +0200 Subject: [PATCH] feat: Added support for multi thread wait / shutdown This adds support for multiple threads waiting on the same clock, while an shutdown is invoked. Signed-off-by: Janosch Machowinski --- rclcpp/include/rclcpp/clock.hpp | 8 ++- rclcpp/src/rclcpp/clock.cpp | 15 ++++-- rclcpp/test/rclcpp/test_clock.cpp | 84 +++++++++++++++++++++++++------ 3 files changed, 87 insertions(+), 20 deletions(-) diff --git a/rclcpp/include/rclcpp/clock.hpp b/rclcpp/include/rclcpp/clock.hpp index b714bf59b8..6efb8f31a4 100644 --- a/rclcpp/include/rclcpp/clock.hpp +++ b/rclcpp/include/rclcpp/clock.hpp @@ -193,11 +193,17 @@ class Clock bool ros_time_is_active(); + /** + * Cancels an ongoing or future sleep operation of one thread. + * + * This function is intended for multi threaded signaling. It can + * be used by one thread, to wakeup another thread, using any + * of the sleep_ or wait_ methods. + */ RCLCPP_PUBLIC void cancel_sleep_or_wait(); - /// Return the rcl_clock_t clock handle RCLCPP_PUBLIC rcl_clock_t * diff --git a/rclcpp/src/rclcpp/clock.cpp b/rclcpp/src/rclcpp/clock.cpp index addf3628c3..5c13f19d13 100644 --- a/rclcpp/src/rclcpp/clock.cpp +++ b/rclcpp/src/rclcpp/clock.cpp @@ -50,6 +50,7 @@ class Clock::Impl rcl_clock_t rcl_clock_; rcl_allocator_t allocator_; bool stop_sleeping_ = false; + bool shutdown_ = false; std::condition_variable cv_; std::mutex wait_mutex_; std::mutex clock_mutex_; @@ -109,7 +110,11 @@ Clock::sleep_until( // Wake this thread if the context is shutdown rclcpp::OnShutdownCallbackHandle shutdown_cb_handle = context->add_on_shutdown_callback( [this]() { - cancel_sleep_or_wait(); + { + std::unique_lock lock(impl_->wait_mutex_); + impl_->shutdown_ = true; + } + impl_->cv_.notify_one(); }); // No longer need the shutdown callback when this function exits auto callback_remover = rcpputils::scope_exit( @@ -127,7 +132,7 @@ Clock::sleep_until( // loop over spurious wakeups but notice shutdown or stop of sleep std::unique_lock lock(impl_->wait_mutex_); - while (now() < until && !impl_->stop_sleeping_ && context->is_valid()) { + while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid()) { impl_->cv_.wait_until(lock, chrono_until); } impl_->stop_sleeping_ = false; @@ -139,7 +144,7 @@ Clock::sleep_until( // loop over spurious wakeups but notice shutdown or stop of sleep std::unique_lock lock(impl_->wait_mutex_); - while (now() < until && !impl_->stop_sleeping_ && context->is_valid()) { + while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid()) { impl_->cv_.wait_until(lock, system_time); } impl_->stop_sleeping_ = false; @@ -171,7 +176,7 @@ Clock::sleep_until( // loop over spurious wakeups but notice shutdown, stop of sleep or time source change std::unique_lock lock(impl_->wait_mutex_); - while (now() < until && !impl_->stop_sleeping_ && context->is_valid() && + while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid() && !time_source_changed) { impl_->cv_.wait_until(lock, system_time); @@ -182,7 +187,7 @@ Clock::sleep_until( // Just wait without "until" because installed // jump callbacks wake the cv on every new sample. std::unique_lock lock(impl_->wait_mutex_); - while (now() < until && !impl_->stop_sleeping_ && context->is_valid() && + while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid() && !time_source_changed) { impl_->cv_.wait(lock); diff --git a/rclcpp/test/rclcpp/test_clock.cpp b/rclcpp/test/rclcpp/test_clock.cpp index 08dcbe632a..b1075d7590 100644 --- a/rclcpp/test/rclcpp/test_clock.cpp +++ b/rclcpp/test/rclcpp/test_clock.cpp @@ -25,7 +25,16 @@ #include "../utils/rclcpp_gtest_macros.hpp" using namespace std::chrono_literals; -class TestClockWakeup : public ::testing::Test + +enum class ClockType +{ + RCL_STEADY_TIME, + RCL_SYSTEM_TIME, + RCL_ROS_TIME, +}; + + +class TestClockWakeup : public ::testing::TestWithParam { public: void test_wakeup_before_sleep(const rclcpp::Clock::SharedPtr & clock) @@ -111,20 +120,15 @@ class TestClockWakeup : public ::testing::Test rclcpp::Node::SharedPtr node; }; -TEST_F(TestClockWakeup, wakeup_sleep_steay_time) { - auto clock = std::make_shared(RCL_STEADY_TIME); - test_wakeup_after_sleep(clock); - test_wakeup_before_sleep(clock); -} - -TEST_F(TestClockWakeup, wakeup_sleep_system_time) { - auto clock = std::make_shared(RCL_SYSTEM_TIME); - test_wakeup_after_sleep(clock); - test_wakeup_before_sleep(clock); -} +INSTANTIATE_TEST_SUITE_P( + Clocks, + TestClockWakeup, + ::testing::Values( + RCL_SYSTEM_TIME, RCL_ROS_TIME, RCL_STEADY_TIME +)); -TEST_F(TestClockWakeup, wakeup_sleep_ros_time_not_active) { - auto clock = std::make_shared(RCL_ROS_TIME); +TEST_P(TestClockWakeup, wakeup_sleep) { + auto clock = std::make_shared(GetParam()); test_wakeup_after_sleep(clock); test_wakeup_before_sleep(clock); } @@ -178,3 +182,55 @@ TEST_F(TestClockWakeup, no_wakeup_on_sim_time) { EXPECT_TRUE(thread_finished); EXPECT_LT(cur_time, start_time + std::chrono::seconds(1)); } + +TEST_F(TestClockWakeup, multiple_threads_wait_on_one_clock) { + auto clock = std::make_shared(RCL_ROS_TIME); + + std::vector thread_finished(10, false); + + std::vector threads; + + for(size_t nr = 0; nr < thread_finished.size(); nr++) { + threads.push_back(std::thread( + [&clock, &thread_finished, nr]() + { + // make sure the thread starts sleeping late + clock->sleep_until(clock->now() + std::chrono::seconds(10)); + thread_finished[nr] = true; + })); + } + + // wait a bit so all threads can execute the sleep_until + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + for(const bool & finished : thread_finished) { + EXPECT_FALSE(finished); + } + + rclcpp::shutdown(); + + auto start_time = std::chrono::steady_clock::now(); + auto cur_time = start_time; + bool threads_finished = false; + while (!threads_finished && start_time + std::chrono::seconds(1) > cur_time) { + threads_finished = true; + for(const bool finished : thread_finished) { + if(!finished) { + threads_finished = false; + } + } + + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + cur_time = std::chrono::steady_clock::now(); + } + + for(const bool finished : thread_finished) { + EXPECT_TRUE(finished); + } + + for(auto & thread : threads) { + thread.join(); + } + + EXPECT_LT(cur_time, start_time + std::chrono::seconds(1)); +}