Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[events executor] - Fix Behavior with Timer Cancel #2375

Merged
merged 13 commits into from
Jan 30, 2024
3 changes: 1 addition & 2 deletions rclcpp/include/rclcpp/experimental/timers_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <thread>
#include <utility>
#include <vector>

#include "rclcpp/context.hpp"
#include "rclcpp/timer.hpp"

Expand Down Expand Up @@ -517,7 +516,7 @@ class TimersManager
* or std::chrono::nanoseconds::max() if the heap is empty.
* This function is not thread safe, acquire the timers_mutex_ before calling it.
*/
std::chrono::nanoseconds get_head_timeout_unsafe();
std::chrono::nanoseconds get_head_timeout_unsafe(bool& head_was_canceled);

/**
* @brief Executes all the timers currently ready when the function is invoked
Expand Down
18 changes: 14 additions & 4 deletions rclcpp/src/rclcpp/experimental/timers_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ std::chrono::nanoseconds TimersManager::get_head_timeout()
}

std::unique_lock<std::mutex> lock(timers_mutex_);
return this->get_head_timeout_unsafe();
bool head_was_cancelled;
return this->get_head_timeout_unsafe(head_was_cancelled);
}

size_t TimersManager::get_number_ready_timers()
Expand Down Expand Up @@ -169,7 +170,7 @@ void TimersManager::execute_ready_timer(const rclcpp::TimerBase * timer_id)
}
}

std::chrono::nanoseconds TimersManager::get_head_timeout_unsafe()
std::chrono::nanoseconds TimersManager::get_head_timeout_unsafe(bool& head_was_cancelled)
{
// If we don't have any weak pointer, then we just return maximum timeout
if (weak_timers_heap_.empty()) {
Expand All @@ -191,6 +192,7 @@ std::chrono::nanoseconds TimersManager::get_head_timeout_unsafe()
}
head_timer = locked_heap.front();
}
head_was_cancelled = head_timer->is_canceled();
mwcondino marked this conversation as resolved.
Show resolved Hide resolved

return head_timer->time_until_trigger();
}
Expand Down Expand Up @@ -242,14 +244,22 @@ void TimersManager::run_timers()
// Lock mutex
std::unique_lock<std::mutex> lock(timers_mutex_);

std::chrono::nanoseconds time_to_sleep = get_head_timeout_unsafe();
bool head_was_cancelled = false;
std::chrono::nanoseconds time_to_sleep = get_head_timeout_unsafe(head_was_cancelled);

// No need to wait if a timer is already available
if (time_to_sleep > std::chrono::nanoseconds::zero()) {
if (time_to_sleep != std::chrono::nanoseconds::max()) {
// Wait until timeout or notification that timers have been updated
timers_cv_.wait_for(lock, time_to_sleep, [this]() {return timers_updated_;});
} else {
}
else if (head_was_cancelled) {
Copy link
Collaborator

@alsora alsora Nov 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if there's only 1 timer which is cancelled? Will we heap keep entering here at every iteration, without ever sleeping?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent point - as-is, this would result in a busy loop as you suggest!

One possible fix would be to explicitly check the size of the heap and if it is 1, we can do a similar wait for timers_updated_. Any thoughts there? To me, that would result in a slightly messier run_timers method, but I don't know how else we would cleanly handle this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the single timer handling, but run_timers is getting a bit messy.
Happy to refactor/reorganize it for clarity if you would like, just let me know

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that the fix works.
What if there are more than 1 timers but they are all cancelled?

I think that the correct approach would likely require to:
a) remove cancelled timers from the heap
b) have a notification to put them back in when they are re-activated

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, you are correct, this approach was brittle - sorry about that.

Will implement what you suggested here, thank you

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alsora sorry for the delay here - had a busy couple of weeks.

@gusbrigantino and I refactored the code, and we came up with an implementation of run_timers that does not require storing cancelled timers and removing them from the heap. Rather, the approach is to try to re-heap if the head timer was cancelled, then re-organize some logic from that point on. If there are non-cancelled timers in the heap, then the re-heap will force them to be at the front and thus head is now valid and the logic remains the same. However, if all timers in the heap are cancelled, the behavior now matches the previous behavior with no timers in the heap -- we wait until timers_updated is true.

We added a few more test cases to validate a couple of different scenarios.

Please let us know if that makes sense and works for you!

// Wait until notification that timers have been updated
TimersHeap locked_heap = weak_timers_heap_.validate_and_lock();
locked_heap.heapify();
weak_timers_heap_.store(locked_heap);
}
else {
// Wait until notification that timers have been updated
timers_cv_.wait(lock, [this]() {return timers_updated_;});
}
Expand Down
102 changes: 102 additions & 0 deletions rclcpp/test/rclcpp/executors/test_executors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -818,3 +818,105 @@ TYPED_TEST(TestIntraprocessExecutors, testIntraprocessRetrigger) {
executor.spin();
EXPECT_EQ(kNumMessages, this->callback_count.load());
}

class TimerNode : public rclcpp::Node {
public:
TimerNode(std::string subname)
: Node("timer_node", subname) {


timer1_ = rclcpp::create_timer(this->get_node_base_interface(), get_node_timers_interface(),
get_clock(), 1ms,
std::bind(&TimerNode::Timer1Callback, this));

timer2_ =
rclcpp::create_timer(this->get_node_base_interface(), get_node_timers_interface(),
get_clock(), 1ms,
std::bind(&TimerNode::Timer2Callback, this));
}

int GetTimer1Cnt() { return cnt1_; }
int GetTimer2Cnt() { return cnt2_; }
private:
void Timer1Callback() {
RCLCPP_DEBUG(this->get_logger(), "Timer 1!");
if (++cnt1_ > 5) {
RCLCPP_DEBUG(this->get_logger(), "Timer cancelling itself!");
timer1_->cancel();
}
}

void Timer2Callback() {
RCLCPP_DEBUG(this->get_logger(), "Timer 2!");
cnt2_++;
}

rclcpp::TimerBase::SharedPtr timer1_;
rclcpp::TimerBase::SharedPtr timer2_;
int cnt1_ = 0;
int cnt2_ = 0;
};


template<typename T>
class TestTimerCancelBehavior : public ::testing::Test
{
public:
static void SetUpTestCase()
{
rclcpp::init(0, nullptr);
}

static void TearDownTestCase()
{
rclcpp::shutdown();
}

void SetUp()
{
const auto test_info = ::testing::UnitTest::GetInstance()->current_test_info();
std::stringstream test_name;
test_name << test_info->test_case_name() << "_" << test_info->name();
node = std::make_shared<TimerNode>(test_name.str());
}

void TearDown()
{
node.reset();
}


std::shared_ptr<TimerNode> node;
};

TYPED_TEST_SUITE(TestTimerCancelBehavior, ExecutorTypes, ExecutorTypeNames);

TYPED_TEST(TestTimerCancelBehavior, testOneTimerCancelledWithExecutorSpin) {
// Validate that cancelling one timer yields no change in behavior for other
// timers. Specifically, this tests the behavior when using spin() to run the
// executor, which is the most common usecase.

// Spin the executor in a standalone thread
using ExecutorType = TypeParam;
ExecutorType executor;
executor.add_node(this->node);
std::thread real_time_thread([&executor]() {
executor.spin();
});

// Cancel to stop the spin after some time.
std::this_thread::sleep_for(15ms);
executor.cancel();

size_t t1_runs = this->node->GetTimer1Cnt();
size_t t2_runs = this->node->GetTimer2Cnt();
EXPECT_NE(t1_runs, t2_runs);
// Check that t2 has significantly more calls
EXPECT_LT(t1_runs + 5, t2_runs);

// Clean up thread object
if (real_time_thread.joinable()) {
real_time_thread.join();
}
}

46 changes: 46 additions & 0 deletions rclcpp/test/rclcpp/test_timers_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,3 +359,49 @@ TEST_F(TestTimersManager, infinite_loop)
EXPECT_LT(0u, t1_runs);
EXPECT_LT(0u, t2_runs);
}

// Validate that cancelling one timer yields no change in behavior for other
// timers.
TEST_F(TestTimersManager, check_one_timer_cancel_doesnt_affect_other_timers)
{
auto timers_manager = std::make_shared<TimersManager>(
rclcpp::contexts::get_global_default_context());

size_t t1_runs = 0;
std::shared_ptr<TimerT> t1;
// After a while cancel t1. Don't remove it though.
// Simulates typical usage in a Node where a timer is cancelled but not removed,
// since typical users aren't going to mess around with the timer manager.
t1 = TimerT::make_shared(
1ms,
[&t1_runs, &t1]() {
t1_runs++;
if (t1_runs == 5) {
t1->cancel();
}
},
rclcpp::contexts::get_global_default_context());

size_t t2_runs = 0;
auto t2 = TimerT::make_shared(
1ms,
[&t2_runs]() {
t2_runs++;
},
rclcpp::contexts::get_global_default_context());

// Add timers
timers_manager->add_timer(t1);
timers_manager->add_timer(t2);

// Start timers thread
timers_manager->start();

std::this_thread::sleep_for(15ms);

// t1 has stopped running
EXPECT_NE(t1_runs, t2_runs);
// Check that t2 has significantly more calls
EXPECT_LT(t1_runs + 5, t2_runs);
timers_manager->stop();
}