Skip to content

Commit

Permalink
Merge pull request #46 from mauropasse/mauro/fix-spin-some-events-exe…
Browse files Browse the repository at this point in the history
…cutor

Proper spin_some behaviour on EventsExecutor
  • Loading branch information
iRobot ROS authored Feb 19, 2021
2 parents e6d6426 + feef4e2 commit bb491bd
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 80 deletions.
33 changes: 29 additions & 4 deletions rclcpp/include/rclcpp/executors/events_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,36 @@ class EventsExecutor : public rclcpp::Executor

/// Events executor implementation of spin some
/**
* executor.provide_callbacks();
* while(condition) {
* executor.spin_some();
* }
* This non-blocking function will execute the timers and events
* that were ready when this API was called, until timeout or no
* more work available. New ready-timers/events arrived while
* executing work, won't be taken into account here.
*
* Example:
* while(condition) {
* spin_some();
* sleep(); // User should have some sync work or
* // sleep to avoid a 100% CPU usage
* }
*/
RCLCPP_PUBLIC
void
spin_some(std::chrono::nanoseconds max_duration = std::chrono::nanoseconds(0)) override;

/// Events executor implementation of spin all
/**
* This non-blocking function will execute timers and events
* until timeout or no more work available. If new ready-timers/events
* arrive while executing work available, they will be executed
* as long as the timeout hasn't expired.
*
* Example:
* while(condition) {
* spin_all();
* sleep(); // User should have some sync work or
* // sleep to avoid a 100% CPU usage
* }
*/
RCLCPP_PUBLIC
void
spin_all(std::chrono::nanoseconds max_duration) override;
Expand Down Expand Up @@ -176,6 +197,10 @@ class EventsExecutor : public rclcpp::Executor
void
spin_once_impl(std::chrono::nanoseconds timeout) override;

RCLCPP_PUBLIC
void
spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive);

private:
RCLCPP_DISABLE_COPY(EventsExecutor)

Expand Down
10 changes: 6 additions & 4 deletions rclcpp/include/rclcpp/executors/timers_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,13 @@ class TimersManager
std::chrono::nanoseconds execute_ready_timers();

/**
* @brief Executes one ready timer if available
*
* @return true if there was a timer ready
* @brief Executes head timer if ready at time point.
* @param tp the timepoint to check for
* @return true if head timer was ready at tp
*/
bool execute_head_timer();
bool execute_head_timer(
std::chrono::time_point<std::chrono::steady_clock> tp =
std::chrono::time_point<std::chrono::steady_clock>::max());

/**
* @brief Get the amount of time before the next timer expires.
Expand Down
9 changes: 9 additions & 0 deletions rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ class EventsQueue
bool
empty() const = 0;

/**
* @brief Returns the number of elements in the queue.
* @return the number of elements in the queue.
*/
RCLCPP_PUBLIC
virtual
size_t
size() const = 0;

/**
* @brief Initializes the queue
*/
Expand Down
12 changes: 12 additions & 0 deletions rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,18 @@ class SimpleEventsQueue : public EventsQueue
return event_queue_.empty();
}

/**
* @brief Returns the number of elements in the queue.
* @return the number of elements in the queue.
*/
RCLCPP_PUBLIC
virtual
size_t
size() const
{
return event_queue_.size();
}

/**
* @brief Initializes the queue
*/
Expand Down
108 changes: 43 additions & 65 deletions rclcpp/src/rclcpp/executors/events_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,14 @@ EventsExecutor::spin()
// When condition variable is notified, check this predicate to proceed
auto has_event_predicate = [this]() {return !events_queue_->empty();};

// Local event queue to allow entities to push events while we execute them
EventQueue execution_event_queue;

timers_manager_->start();

while (rclcpp::ok(context_) && spinning.load()) {
std::unique_lock<std::mutex> push_lock(push_mutex_);
// We wait here until something has been pushed to the event queue
events_queue_cv_.wait(push_lock, has_event_predicate);
// Local event queue to allow entities to push events while we execute them
execution_event_queue = events_queue_->get_all_events();
EventQueue execution_event_queue = events_queue_->get_all_events();
// Unlock the mutex
push_lock.unlock();
// Consume all available events, this queue will be empty at the end of the function
Expand All @@ -82,43 +79,12 @@ EventsExecutor::spin()
void
EventsExecutor::spin_some(std::chrono::nanoseconds max_duration)
{
if (spinning.exchange(true)) {
throw std::runtime_error("spin_some() called while already spinning");
}
RCLCPP_SCOPE_EXIT(this->spinning.store(false););

// In this context a 0 input max_duration means no duration limit
if (std::chrono::nanoseconds(0) == max_duration) {
max_duration = timers_manager_->MAX_TIME;
}

// This function will wait until the first of the following events occur:
// - The input max_duration is elapsed
// - A timer triggers
// - An executor event is received and processed

// When condition variable is notified, check this predicate to proceed
auto has_event_predicate = [this]() {return !events_queue_->empty();};


// Select the smallest between input max_duration and timer timeout
auto next_timer_timeout = timers_manager_->get_head_timeout();
if (next_timer_timeout < max_duration) {
max_duration = next_timer_timeout;
}

std::unique_lock<std::mutex> push_lock(push_mutex_);
// Wait until timeout or event
events_queue_cv_.wait_for(push_lock, max_duration, has_event_predicate);
// Local event queue to allow entities to push events while we execute them
EventQueue execution_event_queue = events_queue_->get_all_events();
// We don't need the lock anymore
push_lock.unlock();

// Execute all ready timers
timers_manager_->execute_ready_timers();
// Consume all available events, this queue will be empty at the end of the function
this->consume_all_events(execution_event_queue);
return this->spin_some_impl(max_duration, false);
}

void
Expand All @@ -127,54 +93,66 @@ EventsExecutor::spin_all(std::chrono::nanoseconds max_duration)
if (max_duration <= 0ns) {
throw std::invalid_argument("max_duration must be positive");
}
return this->spin_some_impl(max_duration, true);
}

void
EventsExecutor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
{
if (spinning.exchange(true)) {
throw std::runtime_error("spin_some() called while already spinning");
}
RCLCPP_SCOPE_EXIT(this->spinning.store(false););

// When condition variable is notified, check this predicate to proceed
auto has_event_predicate = [this]() {return !events_queue_->empty();};

// Local event queue to allow entities to push events while we execute them
EventQueue execution_event_queue;
RCLCPP_SCOPE_EXIT(this->spinning.store(false););

auto start = std::chrono::steady_clock::now();

auto max_duration_not_elapsed = [max_duration, start]() {
auto elapsed_time = std::chrono::steady_clock::now() - start;
return elapsed_time < max_duration;
};

// Select the smallest between input max duration and timer timeout
auto next_timer_timeout = timers_manager_->get_head_timeout();
if (next_timer_timeout < max_duration) {
max_duration = next_timer_timeout;
}
size_t ready_events_at_start = 0;
size_t executed_events = 0;

{
// Wait once until timeout or event
std::unique_lock<std::mutex> push_lock(push_mutex_);
events_queue_cv_.wait_for(push_lock, max_duration, has_event_predicate);
if (!exhaustive) {
// Get the number of events ready at start
std::unique_lock<std::mutex> lock(push_mutex_);
ready_events_at_start = events_queue_->size();
lock.unlock();
}

auto timeout = timers_manager_->get_head_timeout();

// Keep executing until no more work to do or timeout expired
while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) {
std::unique_lock<std::mutex> push_lock(push_mutex_);
execution_event_queue = events_queue_->get_all_events();
push_lock.unlock();
// Execute first ready event from queue if exists
if (exhaustive || (executed_events < ready_events_at_start)) {
std::unique_lock<std::mutex> lock(push_mutex_);
bool has_event = !events_queue_->empty();

if (has_event) {
rmw_listener_event_t event = events_queue_->front();
events_queue_->pop();
this->execute_event(event);
executed_events++;
continue;
}
}

// Exit if there is no more work to do
const bool ready_timer = timeout < 0ns;
const bool has_events = !execution_event_queue.empty();
if (!ready_timer && !has_events) {
break;
bool timer_executed;

if (exhaustive) {
// Execute timer if is ready
timer_executed = timers_manager_->execute_head_timer();
} else {
// Execute timer if was ready at start
timer_executed = timers_manager_->execute_head_timer(start);
}

// Execute all ready work
timeout = timers_manager_->execute_ready_timers();
this->consume_all_events(execution_event_queue);
if (timer_executed) {
continue;
}

// If there's no more work available, exit
break;
}
}

Expand Down
17 changes: 15 additions & 2 deletions rclcpp/src/rclcpp/executors/timers_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ std::chrono::nanoseconds TimersManager::execute_ready_timers()
return this->get_head_timeout_unsafe(timers_heap);
}

bool TimersManager::execute_head_timer()
bool TimersManager::execute_head_timer(
std::chrono::time_point<std::chrono::steady_clock> timepoint)
{
// Do not allow to interfere with the thread running
if (running_) {
Expand All @@ -127,13 +128,25 @@ bool TimersManager::execute_head_timer()
std::unique_lock<std::mutex> lock(timers_mutex_);

TimersHeap timers_heap = weak_timers_heap_.validate_and_lock();

// Nothing to do if we don't have any timer
if (timers_heap.empty()) {
return false;
}

TimerPtr head = timers_heap.front();
if (head->is_ready()) {

bool timer_ready = false;

auto max_time = std::chrono::time_point<std::chrono::steady_clock>::max();

if (timepoint != max_time) {
timer_ready = timer_was_ready_at_tp(head, timepoint);
} else {
timer_ready = head->is_ready();
}

if (timer_ready) {
// Head timer is ready, execute
head->execute_callback();
timers_heap.heapify_root();
Expand Down
9 changes: 9 additions & 0 deletions rclcpp/test/rclcpp/executors/test_events_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ TEST_F(TestEventsExecutor, spin_some_max_duration)
t_runs++;
});

// Sleep some time for the timer to be ready when spin
std::this_thread::sleep_for(10ms);

EventsExecutor executor;
executor.add_node(node);

Expand All @@ -209,6 +212,9 @@ TEST_F(TestEventsExecutor, spin_some_zero_duration)
t_runs++;
});

// Sleep some time for the timer to be ready when spin
std::this_thread::sleep_for(20ms);

EventsExecutor executor;
executor.add_node(node);
executor.spin_some(0ms);
Expand Down Expand Up @@ -248,6 +254,9 @@ TEST_F(TestEventsExecutor, spin_all_max_duration)
t_runs++;
});

// Sleep some time for the timer to be ready when spin
std::this_thread::sleep_for(10ms);

EventsExecutor executor;
executor.add_node(node);

Expand Down
6 changes: 1 addition & 5 deletions rclcpp/test/rclcpp/executors/test_executors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ TYPED_TEST(TestExecutorsStable, spinAll) {
// executor.
bool spin_exited = false;
std::thread spinner([&spin_exited, &executor, this]() {
std::this_thread::sleep_for(10ms);
executor.spin_all(1s);
executor.remove_node(this->node, true);
spin_exited = true;
Expand Down Expand Up @@ -547,11 +548,6 @@ TYPED_TEST(TestExecutorsStable, spinSome) {
spin_exited = true;
});

// Give some time for executor to start spinning
// otherwise when it will start looking for work to do it will already find
// more than 1 notification
std::this_thread::sleep_for(10ms);

// Do some work until sufficient calls to the waitable occur, but keep going until either
// count becomes too large, spin exits, or the 1 second timeout completes.
auto start = std::chrono::steady_clock::now();
Expand Down

0 comments on commit bb491bd

Please sign in to comment.