Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[events executor] - Fix Behavior with Timer Cancel #2375

Merged
merged 13 commits into from
Jan 30, 2024
12 changes: 7 additions & 5 deletions rclcpp/include/rclcpp/experimental/timers_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <thread>
#include <utility>
#include <vector>

#include "rclcpp/context.hpp"
#include "rclcpp/timer.hpp"

Expand Down Expand Up @@ -172,13 +172,14 @@ class TimersManager
* @brief Get the amount of time before the next timer triggers.
* This function is thread safe.
*
* @return std::chrono::nanoseconds to wait,
* @return std::optional<std::chrono::nanoseconds> to wait,
* the returned value could be negative if the timer is already expired
* or std::chrono::nanoseconds::max() if there are no timers stored in the object.
* If the head timer was cancelled, then this will return a nullopt.
* @throws std::runtime_error if the timers thread was already running.
*/
RCLCPP_PUBLIC
std::chrono::nanoseconds get_head_timeout();
std::optional<std::chrono::nanoseconds> get_head_timeout();

private:
RCLCPP_DISABLE_COPY(TimersManager)
Expand Down Expand Up @@ -512,12 +513,13 @@ class TimersManager
* @brief Get the amount of time before the next timer triggers.
* This function is not thread safe, acquire a mutex before calling it.
*
* @return std::chrono::nanoseconds to wait,
* @return std::optional<std::chrono::nanoseconds> to wait,
* the returned value could be negative if the timer is already expired
* or std::chrono::nanoseconds::max() if the heap is empty.
* If the head timer was cancelled, then this will return a nullopt.
* This function is not thread safe, acquire the timers_mutex_ before calling it.
*/
std::chrono::nanoseconds get_head_timeout_unsafe();
std::optional<std::chrono::nanoseconds> get_head_timeout_unsafe();

/**
* @brief Executes all the timers currently ready when the function is invoked
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,12 @@ EventsExecutor::spin_once_impl(std::chrono::nanoseconds timeout)
timeout = std::chrono::nanoseconds::max();
}

// Select the smallest between input timeout and timer timeout
// Select the smallest between input timeout and timer timeout.
// Cancelled timers are not considered.
bool is_timer_timeout = false;
auto next_timer_timeout = timers_manager_->get_head_timeout();
if (next_timer_timeout < timeout) {
timeout = next_timer_timeout;
if (next_timer_timeout.has_value() && next_timer_timeout.value() < timeout) {
timeout = next_timer_timeout.value();
is_timer_timeout = true;
}

Expand Down
45 changes: 32 additions & 13 deletions rclcpp/src/rclcpp/experimental/timers_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ void TimersManager::stop()
}
}

std::chrono::nanoseconds TimersManager::get_head_timeout()
std::optional<std::chrono::nanoseconds> TimersManager::get_head_timeout()
{
// Do not allow to interfere with the thread running
if (running_) {
Expand Down Expand Up @@ -169,7 +169,7 @@ void TimersManager::execute_ready_timer(const rclcpp::TimerBase * timer_id)
}
}

std::chrono::nanoseconds TimersManager::get_head_timeout_unsafe()
std::optional<std::chrono::nanoseconds> TimersManager::get_head_timeout_unsafe()
{
// If we don't have any weak pointer, then we just return maximum timeout
if (weak_timers_heap_.empty()) {
Expand All @@ -191,7 +191,9 @@ std::chrono::nanoseconds TimersManager::get_head_timeout_unsafe()
}
head_timer = locked_heap.front();
}

if (head_timer->is_canceled()) {
return std::nullopt;
}
return head_timer->time_until_trigger();
}

Expand Down Expand Up @@ -242,17 +244,34 @@ void TimersManager::run_timers()
// Lock mutex
std::unique_lock<std::mutex> lock(timers_mutex_);

std::chrono::nanoseconds time_to_sleep = get_head_timeout_unsafe();
std::optional<std::chrono::nanoseconds> time_to_sleep = get_head_timeout_unsafe();

// If head timer was cancelled, try to reheap and get a new head.
// This avoids an edge condition where head timer is cancelled, but other
// valid timers remain in the heap.
if (!time_to_sleep.has_value()) {
// Re-heap to (possibly) move cancelled timer from head of heap. If
// entire heap is cancelled, this will still result in a nullopt.
TimersHeap locked_heap = weak_timers_heap_.validate_and_lock();
locked_heap.heapify();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The queue is sorted according to the return a->time_until_trigger() > b->time_until_trigger(); comparison.
What does time_until_trigger returns for a cancelled timer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Upon a cancelled timer, time_until_trigger returns std::chrono::nanoseconds::max(), so resorting should force cancelled timers to be at the bottom of the heap. In a previous iteration of the PR, we had updated the comparison function for the heap to explicitly check for canceled timers and ensure they were on the bottom of the heap, but that was removed since it's obsolete if std::chrono::nanoseconds::max() is always returned for a cancelled timer.

weak_timers_heap_.store(locked_heap);
time_to_sleep = get_head_timeout_unsafe();
}

// No need to wait if a timer is already available
if (time_to_sleep > std::chrono::nanoseconds::zero()) {
if (time_to_sleep != std::chrono::nanoseconds::max()) {
// Wait until timeout or notification that timers have been updated
timers_cv_.wait_for(lock, time_to_sleep, [this]() {return timers_updated_;});
} else {
// Wait until notification that timers have been updated
timers_cv_.wait(lock, [this]() {return timers_updated_;});
}
// If no timers, or all timers cancelled, wait for an update.
if (!time_to_sleep.has_value() || (time_to_sleep.value() == std::chrono::nanoseconds::max()) ) {
// Wait until notification that timers have been updated
timers_cv_.wait(lock, [this]() {return timers_updated_;});

// Re-heap in case ordering changed due to a cancelled timer
// re-activating.
TimersHeap locked_heap = weak_timers_heap_.validate_and_lock();
locked_heap.heapify();
weak_timers_heap_.store(locked_heap);
} else if (time_to_sleep.value() != std::chrono::nanoseconds::zero()) {
// If time_to_sleep is zero, we immediately execute. Otherwise, wait
// until timeout or notification that timers have been updated
timers_cv_.wait_for(lock, time_to_sleep.value(), [this]() {return timers_updated_;});
}

// Reset timers updated flag
Expand Down
Loading