Skip to content

Commit

Permalink
feat: Added support for multi thread wait / shutdown
Browse files Browse the repository at this point in the history
This adds support for multiple threads waiting on the same
clock, while an shutdown is invoked.

Signed-off-by: Janosch Machowinski <j.machowinski@cellumation.com>
  • Loading branch information
Janosch Machowinski committed Apr 13, 2024
1 parent 218d86d commit 76c970b
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 20 deletions.
8 changes: 7 additions & 1 deletion rclcpp/include/rclcpp/clock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,17 @@ class Clock
bool
ros_time_is_active();

/**
* Cancels an ongoing or future sleep operation of one thread.
*
* This function is intended for multi threaded signaling. It can
* be used by one thread, to wakeup another thread, using any
* of the sleep_ or wait_ methods.
*/
RCLCPP_PUBLIC
void
cancel_sleep_or_wait();


/// Return the rcl_clock_t clock handle
RCLCPP_PUBLIC
rcl_clock_t *
Expand Down
15 changes: 10 additions & 5 deletions rclcpp/src/rclcpp/clock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class Clock::Impl
rcl_clock_t rcl_clock_;
rcl_allocator_t allocator_;
bool stop_sleeping_ = false;
bool shutdown_ = false;
std::condition_variable cv_;
std::mutex wait_mutex_;
std::mutex clock_mutex_;
Expand Down Expand Up @@ -109,7 +110,11 @@ Clock::sleep_until(
// Wake this thread if the context is shutdown
rclcpp::OnShutdownCallbackHandle shutdown_cb_handle = context->add_on_shutdown_callback(
[this]() {
cancel_sleep_or_wait();
{
std::unique_lock lock(impl_->wait_mutex_);
impl_->shutdown_ = true;
}
impl_->cv_.notify_one();
});
// No longer need the shutdown callback when this function exits
auto callback_remover = rcpputils::scope_exit(
Expand All @@ -127,7 +132,7 @@ Clock::sleep_until(

// loop over spurious wakeups but notice shutdown or stop of sleep
std::unique_lock lock(impl_->wait_mutex_);
while (now() < until && !impl_->stop_sleeping_ && context->is_valid()) {
while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid()) {
impl_->cv_.wait_until(lock, chrono_until);
}
impl_->stop_sleeping_ = false;
Expand All @@ -139,7 +144,7 @@ Clock::sleep_until(

// loop over spurious wakeups but notice shutdown or stop of sleep
std::unique_lock lock(impl_->wait_mutex_);
while (now() < until && !impl_->stop_sleeping_ && context->is_valid()) {
while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid()) {
impl_->cv_.wait_until(lock, system_time);
}
impl_->stop_sleeping_ = false;
Expand Down Expand Up @@ -171,7 +176,7 @@ Clock::sleep_until(

// loop over spurious wakeups but notice shutdown, stop of sleep or time source change
std::unique_lock lock(impl_->wait_mutex_);
while (now() < until && !impl_->stop_sleeping_ && context->is_valid() &&
while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid() &&
!time_source_changed)
{
impl_->cv_.wait_until(lock, system_time);
Expand All @@ -182,7 +187,7 @@ Clock::sleep_until(
// Just wait without "until" because installed
// jump callbacks wake the cv on every new sample.
std::unique_lock lock(impl_->wait_mutex_);
while (now() < until && !impl_->stop_sleeping_ && context->is_valid() &&
while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid() &&
!time_source_changed)
{
impl_->cv_.wait(lock);
Expand Down
84 changes: 70 additions & 14 deletions rclcpp/test/rclcpp/test_clock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,16 @@
#include "../utils/rclcpp_gtest_macros.hpp"

using namespace std::chrono_literals;
class TestClockWakeup : public ::testing::Test

enum class ClockType
{
RCL_STEADY_TIME,
RCL_SYSTEM_TIME,
RCL_ROS_TIME,
};


class TestClockWakeup : public ::testing::TestWithParam<rcl_clock_type_e>
{
public:
void test_wakeup_before_sleep(const rclcpp::Clock::SharedPtr & clock)
Expand Down Expand Up @@ -111,20 +120,15 @@ class TestClockWakeup : public ::testing::Test
rclcpp::Node::SharedPtr node;
};

TEST_F(TestClockWakeup, wakeup_sleep_steay_time) {
auto clock = std::make_shared<rclcpp::Clock>(RCL_STEADY_TIME);
test_wakeup_after_sleep(clock);
test_wakeup_before_sleep(clock);
}

TEST_F(TestClockWakeup, wakeup_sleep_system_time) {
auto clock = std::make_shared<rclcpp::Clock>(RCL_SYSTEM_TIME);
test_wakeup_after_sleep(clock);
test_wakeup_before_sleep(clock);
}
INSTANTIATE_TEST_SUITE_P(
Clocks,
TestClockWakeup,
::testing::Values(
RCL_SYSTEM_TIME, RCL_ROS_TIME, RCL_STEADY_TIME
));

TEST_F(TestClockWakeup, wakeup_sleep_ros_time_not_active) {
auto clock = std::make_shared<rclcpp::Clock>(RCL_ROS_TIME);
TEST_P(TestClockWakeup, wakeup_sleep) {
auto clock = std::make_shared<rclcpp::Clock>(GetParam());
test_wakeup_after_sleep(clock);
test_wakeup_before_sleep(clock);
}
Expand Down Expand Up @@ -178,3 +182,55 @@ TEST_F(TestClockWakeup, no_wakeup_on_sim_time) {
EXPECT_TRUE(thread_finished);
EXPECT_LT(cur_time, start_time + std::chrono::seconds(1));
}

TEST_F(TestClockWakeup, multiple_threads_wait_on_one_clock) {
auto clock = std::make_shared<rclcpp::Clock>(RCL_ROS_TIME);

std::vector<bool> thread_finished(10, false);

std::vector<std::thread> threads;

for(size_t nr = 0; nr < thread_finished.size(); nr++) {
threads.push_back(std::thread(
[&clock, &thread_finished, nr]()
{
// make sure the thread starts sleeping late
clock->sleep_until(clock->now() + std::chrono::seconds(10));
thread_finished[nr] = true;
}));
}

// wait a bit so all threads can execute the sleep_until
std::this_thread::sleep_for(std::chrono::milliseconds(500));

for(const bool & finished : thread_finished) {
EXPECT_FALSE(finished);
}

rclcpp::shutdown();

auto start_time = std::chrono::steady_clock::now();
auto cur_time = start_time;
bool threads_finished = false;
while (!threads_finished && start_time + std::chrono::seconds(1) > cur_time) {
threads_finished = true;
for(const bool finished : thread_finished) {
if(!finished) {
threads_finished = false;
}
}

std::this_thread::sleep_for(std::chrono::milliseconds(10));
cur_time = std::chrono::steady_clock::now();
}

for(const bool finished : thread_finished) {
EXPECT_TRUE(finished);
}

for(auto & thread : threads) {
thread.join();
}

EXPECT_LT(cur_time, start_time + std::chrono::seconds(1));
}

0 comments on commit 76c970b

Please sign in to comment.