Skip to content

Commit

Permalink
Use a different mechanism to avoid timers being scheduled multiple ti…
Browse files Browse the repository at this point in the history
…mes by the MultiThreadedExecutor (#1692)

Signed-off-by: Ivan Santiago Paunovic <ivanpauno@ekumenlabs.com>
  • Loading branch information
ivanpauno authored Jul 23, 2021
1 parent f7a3014 commit 5c4f809
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 216 deletions.
1 change: 0 additions & 1 deletion rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/clock.cpp
src/rclcpp/context.cpp
src/rclcpp/contexts/default_context.cpp
src/rclcpp/detail/mutex_two_priorities.cpp
src/rclcpp/detail/resolve_parameter_overrides.cpp
src/rclcpp/detail/rmw_implementation_specific_payload.cpp
src/rclcpp/detail/rmw_implementation_specific_publisher_payload.cpp
Expand Down
76 changes: 0 additions & 76 deletions rclcpp/include/rclcpp/detail/mutex_two_priorities.hpp

This file was deleted.

5 changes: 1 addition & 4 deletions rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include <thread>
#include <unordered_map>

#include "rclcpp/detail/mutex_two_priorities.hpp"
#include "rclcpp/executor.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/memory_strategies.hpp"
Expand Down Expand Up @@ -82,12 +81,10 @@ class MultiThreadedExecutor : public rclcpp::Executor
private:
RCLCPP_DISABLE_COPY(MultiThreadedExecutor)

detail::MutexTwoPriorities wait_mutex_;
std::mutex wait_mutex_;
size_t number_of_threads_;
bool yield_before_execute_;
std::chrono::nanoseconds next_exec_timeout_;

std::set<TimerBase::SharedPtr> scheduled_timers_;
};

} // namespace executors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,14 +388,19 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
++it;
continue;
}
if (!timer->call()) {
// timer was cancelled, skip it.
++it;
continue;
}
// Otherwise it is safe to set and return the any_exec
any_exec.timer = timer;
any_exec.callback_group = group;
any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes);
timer_handles_.erase(it);
return;
}
// Else, the service is no longer valid, remove it and continue
// Else, the timer is no longer valid, remove it and continue
it = timer_handles_.erase(it);
}
}
Expand Down
30 changes: 25 additions & 5 deletions rclcpp/include/rclcpp/timer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,17 @@ class TimerBase
void
reset();

/// Indicate that we're about to execute the callback.
/**
* The multithreaded executor takes advantage of this to avoid scheduling
* the callback multiple times.
*
* \return `true` if the callback should be executed, `false` if the timer was canceled.
*/
RCLCPP_PUBLIC
virtual bool
call() = 0;

/// Call the callback function when the timer signal is emitted.
RCLCPP_PUBLIC
virtual void
Expand Down Expand Up @@ -192,19 +203,28 @@ class GenericTimer : public TimerBase
}

/**
* \sa rclcpp::TimerBase::execute_callback
* \throws std::runtime_error if it failed to notify timer that callback occurred
* \sa rclcpp::TimerBase::call
* \throws std::runtime_error if it failed to notify timer that callback will occurr
*/
void
execute_callback() override
bool
call() override
{
rcl_ret_t ret = rcl_timer_call(timer_handle_.get());
if (ret == RCL_RET_TIMER_CANCELED) {
return;
return false;
}
if (ret != RCL_RET_OK) {
throw std::runtime_error("Failed to notify timer that callback occurred");
}
return true;
}

/**
* \sa rclcpp::TimerBase::execute_callback
*/
void
execute_callback() override
{
TRACEPOINT(callback_start, static_cast<const void *>(&callback_), false);
execute_callback_delegate<>();
TRACEPOINT(callback_end, static_cast<const void *>(&callback_));
Expand Down
104 changes: 0 additions & 104 deletions rclcpp/src/rclcpp/detail/mutex_two_priorities.cpp

This file was deleted.

27 changes: 2 additions & 25 deletions rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include "rclcpp/utilities.hpp"
#include "rclcpp/scope_exit.hpp"

using rclcpp::detail::MutexTwoPriorities;
using rclcpp::executors::MultiThreadedExecutor;

MultiThreadedExecutor::MultiThreadedExecutor(
Expand Down Expand Up @@ -52,8 +51,7 @@ MultiThreadedExecutor::spin()
std::vector<std::thread> threads;
size_t thread_id = 0;
{
auto low_priority_wait_mutex = wait_mutex_.get_low_priority_lockable();
std::lock_guard<MutexTwoPriorities::LowPriorityLockable> wait_lock(low_priority_wait_mutex);
std::lock_guard wait_lock{wait_mutex_};
for (; thread_id < number_of_threads_ - 1; ++thread_id) {
auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id);
threads.emplace_back(func);
Expand All @@ -78,41 +76,20 @@ MultiThreadedExecutor::run(size_t)
while (rclcpp::ok(this->context_) && spinning.load()) {
rclcpp::AnyExecutable any_exec;
{
auto low_priority_wait_mutex = wait_mutex_.get_low_priority_lockable();
std::lock_guard<MutexTwoPriorities::LowPriorityLockable> wait_lock(low_priority_wait_mutex);
std::lock_guard wait_lock{wait_mutex_};
if (!rclcpp::ok(this->context_) || !spinning.load()) {
return;
}
if (!get_next_executable(any_exec, next_exec_timeout_)) {
continue;
}
if (any_exec.timer) {
// Guard against multiple threads getting the same timer.
if (scheduled_timers_.count(any_exec.timer) != 0) {
// Make sure that any_exec's callback group is reset before
// the lock is released.
if (any_exec.callback_group) {
any_exec.callback_group->can_be_taken_from().store(true);
}
continue;
}
scheduled_timers_.insert(any_exec.timer);
}
}
if (yield_before_execute_) {
std::this_thread::yield();
}

execute_any_executable(any_exec);

if (any_exec.timer) {
auto high_priority_wait_mutex = wait_mutex_.get_high_priority_lockable();
std::lock_guard<MutexTwoPriorities::HighPriorityLockable> wait_lock(high_priority_wait_mutex);
auto it = scheduled_timers_.find(any_exec.timer);
if (it != scheduled_timers_.end()) {
scheduled_timers_.erase(it);
}
}
// Clear the callback_group to prevent the AnyExecutable destructor from
// resetting the callback group `can_be_taken_from`
any_exec.callback_group.reset();
Expand Down
1 change: 1 addition & 0 deletions rclcpp/test/rclcpp/node_interfaces/test_node_timers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class TestTimer : public rclcpp::TimerBase
: TimerBase(node->get_clock(), std::chrono::nanoseconds(1),
node->get_node_base_interface()->get_context()) {}

bool call() override {return true;}
void execute_callback() override {}
bool is_steady() override {return false;}
};
Expand Down

0 comments on commit 5c4f809

Please sign in to comment.