From 8ab69748e8972c6d6d839e3d82adcd4059bff4c9 Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Tue, 2 Apr 2024 11:48:34 +0200 Subject: [PATCH] feat(Executor): Added spin API with exception handler. Signed-off-by: Janosch Machowinski # Conflicts: # rclcpp/src/rclcpp/experimental/timers_manager.cpp --- rclcpp/include/rclcpp/executor.hpp | 24 +++++++ .../executors/multi_threaded_executor.hpp | 6 +- .../executors/single_threaded_executor.hpp | 10 +++ .../static_single_threaded_executor.hpp | 10 +++ .../events_executor/events_executor.hpp | 12 ++++ .../rclcpp/experimental/timers_manager.hpp | 13 +++- rclcpp/src/rclcpp/executor.cpp | 66 +++++++++++++++++++ .../executors/multi_threaded_executor.cpp | 46 +++++-------- .../executors/single_threaded_executor.cpp | 17 +++++ .../static_single_threaded_executor.cpp | 35 ++++++++++ .../events_executor/events_executor.cpp | 29 ++++++++ .../rclcpp/experimental/timers_manager.cpp | 31 +++++++-- .../benchmark/benchmark_parameter_client.cpp | 5 +- .../test/rclcpp/executors/test_executors.cpp | 46 +++++++++++++ .../test_multi_threaded_executor.cpp | 43 ------------ rclcpp/test/rclcpp/test_executor.cpp | 4 ++ 16 files changed, 315 insertions(+), 82 deletions(-) diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 56467db0ec..607c211cb9 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -84,6 +84,18 @@ class Executor virtual void spin() = 0; + /** + * \sa rclcpp::Executor:spin() for more details + * \throws std::runtime_error when spin() called while already spinning + * @param exception_handler will be called for every exception in the processing threads + * + * The exception_handler can be called from multiple threads at the same time. + * The exception_handler shall rethrow the exception it if wants to terminate the program. + */ + RCLCPP_PUBLIC + virtual void + spin(const std::function & exception_handler) = 0; + /// Add a callback group to an executor. /** * An executor can have zero or more callback groups which provide work during `spin` functions. @@ -461,6 +473,18 @@ class Executor void execute_any_executable(AnyExecutable & any_exec); + /// Find the next available executable and do the work associated with it. + /** + * \param[in] any_exec Union structure that can hold any executable type (timer, subscription, + * service, client). + * \throws std::runtime_error if there is an issue triggering the guard condition + */ + RCLCPP_PUBLIC + void + execute_any_executable( + AnyExecutable & any_exec, + const std::function & exception_handler); + /// Run subscription executable. /** * Do necessary setup and tear-down as well as executing the subscription. diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp index b63ea2c41b..a75e7af8cc 100644 --- a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp @@ -79,7 +79,7 @@ class MultiThreadedExecutor : public rclcpp::Executor */ RCLCPP_PUBLIC void - spin(std::function exception_handler); + spin(const std::function & exception_handler) override; RCLCPP_PUBLIC size_t @@ -88,7 +88,9 @@ class MultiThreadedExecutor : public rclcpp::Executor protected: RCLCPP_PUBLIC void - run(size_t this_thread_number, std::function exception_handler); + run( + size_t this_thread_number, + const std::function & exception_handler); private: RCLCPP_DISABLE_COPY(MultiThreadedExecutor) diff --git a/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp index 9dc6dec57b..f4d2bb9a51 100644 --- a/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp @@ -65,6 +65,16 @@ class SingleThreadedExecutor : public rclcpp::Executor void spin() override; + /** + * \sa rclcpp::SingleThreadedExecutor:spin() for more details + * \throws std::runtime_error when spin() called while already spinning + * @param exception_handler will be called for every exception in the processing threads + * + * The exception_handler shall rethrow the exception it if wants to terminate the program. + */RCLCPP_PUBLIC + virtual void + spin(const std::function & exception_handler) override; + private: RCLCPP_DISABLE_COPY(SingleThreadedExecutor) }; diff --git a/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp index 5294605eaf..076a80efed 100644 --- a/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp @@ -79,6 +79,16 @@ class StaticSingleThreadedExecutor : public rclcpp::Executor void spin() override; + /** + * \sa rclcpp::SingleThreadedExecutor:spin() for more details + * \throws std::runtime_error when spin() called while already spinning + * @param exception_handler will be called for every exception in the processing threads + * + * The exception_handler shall rethrow the exception it if wants to terminate the program. + */RCLCPP_PUBLIC + virtual void + spin(const std::function & exception_handler) override; + /// Static executor implementation of spin some /** * This non-blocking function will execute entities that diff --git a/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp b/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp index 2365b67697..0b69a6a255 100644 --- a/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp +++ b/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp @@ -92,6 +92,18 @@ class EventsExecutor : public rclcpp::Executor void spin() override; + /** + * \sa rclcpp::Executor:spin() for more details + * \throws std::runtime_error when spin() called while already spinning + * @param exception_handler will be called for every exception in the processing threads + * + * The exception_handler can be called from multiple threads at the same time. + * The exception_handler shall rethrow the exception it if wants to terminate the program. + */ + RCLCPP_PUBLIC + void + spin(const std::function & exception_handler) override; + /// Events executor implementation of spin some /** * This non-blocking function will execute the timers and events diff --git a/rclcpp/include/rclcpp/experimental/timers_manager.hpp b/rclcpp/include/rclcpp/experimental/timers_manager.hpp index 558e58fd2f..70eecd9fc2 100644 --- a/rclcpp/include/rclcpp/experimental/timers_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/timers_manager.hpp @@ -125,9 +125,13 @@ class TimersManager /** * @brief Starts a thread that takes care of executing the timers stored in this object. * Function will throw an error if the timers thread was already running. + * + * @param exception_handler if valid, the execution of the timer will be done in a try catch block, + * and any occurring exception will be passed to the given handler */ RCLCPP_PUBLIC - void start(); + void start( + const std::function & exception_handler = std::function()); /** * @brief Stops the timers thread. @@ -510,6 +514,11 @@ class TimersManager */ void run_timers(); + /** + * @brief calls run_timers with a try catch block. + */ + void run_timers(const std::function & exception_handler); + /** * @brief Get the amount of time before the next timer triggers. * This function is not thread safe, acquire a mutex before calling it. @@ -526,7 +535,7 @@ class TimersManager * while keeping the heap correctly sorted. * This function is not thread safe, acquire the timers_mutex_ before calling it. */ - void execute_ready_timers_unsafe(); + void execute_ready_timers_unsafe(std::function exception_handler); // Callback to be called when timer is ready std::function +void execute_guarded( + const Function & fun, + const std::function & exception_handler) +{ + try { + fun(); + } catch (const std::exception & e) { + RCLCPP_ERROR_STREAM( + rclcpp::get_logger("rclcpp"), + "Exception while spinning : " << e.what()); + + exception_handler(e); + } +} + +void +Executor::execute_any_executable( + AnyExecutable & any_exec, + const std::function & exception_handler) +{ + if (!spinning.load()) { + return; + } + + if (any_exec.timer) { + TRACETOOLS_TRACEPOINT( + rclcpp_executor_execute, + static_cast(any_exec.timer->get_timer_handle().get())); + execute_guarded([&any_exec]() {execute_timer(any_exec.timer);}, exception_handler); + } + if (any_exec.subscription) { + TRACETOOLS_TRACEPOINT( + rclcpp_executor_execute, + static_cast(any_exec.subscription->get_subscription_handle().get())); + execute_guarded( + [&any_exec]() { + execute_subscription(any_exec.subscription); + }, exception_handler); + + } + if (any_exec.service) { + execute_guarded([&any_exec]() {execute_service(any_exec.service);}, exception_handler); + + } + if (any_exec.client) { + execute_guarded([&any_exec]() {execute_client(any_exec.client);}, exception_handler); + + } + if (any_exec.waitable) { + execute_guarded([&any_exec]() {any_exec.waitable->execute(any_exec.data);}, exception_handler); + } + // Reset the callback_group, regardless of type + any_exec.callback_group->can_be_taken_from().store(true); + // Wake the wait, because it may need to be recalculated or work that + // was previously blocked is now available. + try { + interrupt_guard_condition_->trigger(); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( + std::string( + "Failed to trigger guard condition from execute_any_executable: ") + ex.what()); + } +} + + template static void diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 17198dd912..3325b08131 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -57,7 +57,7 @@ MultiThreadedExecutor::spin() void -MultiThreadedExecutor::spin(std::function exception_handler) +MultiThreadedExecutor::spin(const std::function & exception_handler) { if (spinning.exchange(true)) { throw std::runtime_error("spin() called while already spinning"); @@ -88,38 +88,28 @@ MultiThreadedExecutor::get_number_of_threads() void MultiThreadedExecutor::run( size_t this_thread_number, - std::function exception_handler) + const std::function & exception_handler) { - try { - - (void)this_thread_number; - while (rclcpp::ok(this->context_) && spinning.load()) { - rclcpp::AnyExecutable any_exec; - { - std::lock_guard wait_lock{wait_mutex_}; - if (!rclcpp::ok(this->context_) || !spinning.load()) { - return; - } - if (!get_next_executable(any_exec, next_exec_timeout_)) { - continue; - } + (void)this_thread_number; + while (rclcpp::ok(this->context_) && spinning.load()) { + rclcpp::AnyExecutable any_exec; + { + std::lock_guard wait_lock{wait_mutex_}; + if (!rclcpp::ok(this->context_) || !spinning.load()) { + return; } - if (yield_before_execute_) { - std::this_thread::yield(); + if (!get_next_executable(any_exec, next_exec_timeout_)) { + continue; } - - execute_any_executable(any_exec); - - // Clear the callback_group to prevent the AnyExecutable destructor from - // resetting the callback group `can_be_taken_from` - any_exec.callback_group.reset(); + } + if (yield_before_execute_) { + std::this_thread::yield(); } - } catch (const std::exception & e) { - RCLCPP_ERROR_STREAM( - rclcpp::get_logger("rclcpp"), - "Exception while spinning : " << e.what()); + execute_any_executable(any_exec, exception_handler); - exception_handler(e); + // Clear the callback_group to prevent the AnyExecutable destructor from + // resetting the callback group `can_be_taken_from` + any_exec.callback_group.reset(); } } diff --git a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp index e7f311c147..dd86641096 100644 --- a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp @@ -38,3 +38,20 @@ SingleThreadedExecutor::spin() } } } + + +void +SingleThreadedExecutor::spin( + const std::function & exception_handler) +{ + if (spinning.exchange(true)) { + throw std::runtime_error("spin() called while already spinning"); + } + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + while (rclcpp::ok(this->context_) && spinning.load()) { + rclcpp::AnyExecutable any_executable; + if (get_next_executable(any_executable)) { + execute_any_executable(any_executable, exception_handler); + } + } +} diff --git a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp index 15bca9374e..cd5f30254a 100644 --- a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp @@ -57,6 +57,41 @@ StaticSingleThreadedExecutor::spin() } } + +void +StaticSingleThreadedExecutor::spin( + const std::function & exception_handler) +{ + if (spinning.exchange(true)) { + throw std::runtime_error("spin() called while already spinning"); + } + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + + // Set memory_strategy_ and exec_list_ based on weak_nodes_ + // Prepare wait_set_ based on memory_strategy_ + entities_collector_->init(&wait_set_, memory_strategy_); + + while (rclcpp::ok(this->context_) && spinning.load()) { + // Refresh wait set and wait for work + entities_collector_->refresh_wait_set(); + + RCLCPP_ERROR_STREAM( + rclcpp::get_logger("rclcpp"), + "Waitset Refresh"); + + try { + execute_ready_executables(); + + } catch (const std::exception & e) { + RCLCPP_ERROR_STREAM( + rclcpp::get_logger("rclcpp"), + "Exception while spinning : " << e.what()); + + exception_handler(e); + } + } +} + void StaticSingleThreadedExecutor::spin_some(std::chrono::nanoseconds max_duration) { diff --git a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp index 80dda31997..1cd224d534 100644 --- a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp +++ b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp @@ -126,6 +126,35 @@ EventsExecutor::spin() } } +void +EventsExecutor::spin(const std::function & exception_handler) +{ + if (spinning.exchange(true)) { + throw std::runtime_error("spin() called while already spinning"); + } + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + + timers_manager_->start(exception_handler); + RCPPUTILS_SCOPE_EXIT(timers_manager_->stop(); ); + + while (rclcpp::ok(context_) && spinning.load()) { + // Wait until we get an event + ExecutorEvent event; + bool has_event = events_queue_->dequeue(event); + if (has_event) { + try { + this->execute_event(event); + } catch (const std::exception & e) { + RCLCPP_ERROR_STREAM( + rclcpp::get_logger("rclcpp"), + "Exception while spinning : " << e.what()); + + exception_handler(e); + } + } + } +} + void EventsExecutor::spin_some(std::chrono::nanoseconds max_duration) { diff --git a/rclcpp/src/rclcpp/experimental/timers_manager.cpp b/rclcpp/src/rclcpp/experimental/timers_manager.cpp index ddccf9de34..3dff2af858 100644 --- a/rclcpp/src/rclcpp/experimental/timers_manager.cpp +++ b/rclcpp/src/rclcpp/experimental/timers_manager.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include "rcpputils/scope_exit.hpp" @@ -71,14 +72,18 @@ void TimersManager::add_timer(rclcpp::TimerBase::SharedPtr timer) } } -void TimersManager::start() +RCLCPP_PUBLIC +void TimersManager::start(const std::function & exception_handler) { // Make sure that the thread is not already running if (running_.exchange(true)) { throw std::runtime_error("TimersManager::start() can't start timers thread as already running"); } - timers_thread_ = std::thread(&TimersManager::run_timers, this); + timers_thread_ = std::thread( + [this, exception_handler]() { + run_timers(exception_handler); + }); } void TimersManager::stop() @@ -201,7 +206,8 @@ std::chrono::nanoseconds TimersManager::get_head_timeout_unsafe() return head_timer->time_until_trigger(); } -void TimersManager::execute_ready_timers_unsafe() +void TimersManager::execute_ready_timers_unsafe( + std::function exception_handler) { // We start by locking the timers TimersHeap locked_heap = weak_timers_heap_.validate_and_lock(); @@ -224,7 +230,19 @@ void TimersManager::execute_ready_timers_unsafe() if (on_ready_callback_) { on_ready_callback_(head_timer.get(), data); } else { - head_timer->execute_callback(data); + if (exception_handler) { + try { + head_timer->execute_callback(); + } catch (const std::exception & e) { + RCLCPP_ERROR_STREAM( + rclcpp::get_logger("rclcpp"), + "Exception while spinning : " << e.what()); + + exception_handler(e); + } + } else { + head_timer->execute_callback(); + } } } else { // someone canceled the timer between is_ready and call @@ -243,7 +261,8 @@ void TimersManager::execute_ready_timers_unsafe() weak_timers_heap_.store(locked_heap); } -void TimersManager::run_timers() +void TimersManager::run_timers( + const std::function & exception_handler) { // Make sure the running flag is set to false when we exit from this function // to allow restarting the timers thread. @@ -270,7 +289,7 @@ void TimersManager::run_timers() timers_updated_ = false; // Execute timers - this->execute_ready_timers_unsafe(); + this->execute_ready_timers_unsafe(exception_handler); } } diff --git a/rclcpp/test/benchmark/benchmark_parameter_client.cpp b/rclcpp/test/benchmark/benchmark_parameter_client.cpp index 0d9a6fd9c9..b59c029dd0 100644 --- a/rclcpp/test/benchmark/benchmark_parameter_client.cpp +++ b/rclcpp/test/benchmark/benchmark_parameter_client.cpp @@ -46,7 +46,10 @@ class RemoteNodeTest : public benchmark::Fixture remote_node_name, rclcpp::NodeOptions().context(remote_context)); remote_executor->add_node(remote_node); - remote_thread = std::thread(&rclcpp::executors::SingleThreadedExecutor::spin, remote_executor); + remote_thread = + std::thread( + static_cast(&rclcpp:: + executors::SingleThreadedExecutor::spin), remote_executor); } void TearDown(benchmark::State &) diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 68138eb82b..995d120966 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -202,6 +202,52 @@ TYPED_TEST(TestExecutors, emptyExecutor) spinner.join(); } +TYPED_TEST(TestExecutors, catch_exception) { + using ExecutorType = TypeParam; + ExecutorType executor; + + std::shared_ptr node = + std::make_shared("test_executor_catch_exception"); + + const std::string test_reason = "test exception"; + + std::atomic_bool timer_executed_after_exception = false; + + size_t cnt = 0; + + auto timer = node->create_wall_timer( + std::chrono::milliseconds(1), [test_reason, &timer_executed_after_exception, &executor, &cnt]() + { + if (cnt == 0) { + cnt++; + throw std::runtime_error(test_reason); + } + + timer_executed_after_exception = true; + + executor.cancel(); + }); + + std::atomic_bool caught_exception = false; + + executor.add_node(node); + executor.spin( + [&caught_exception, &test_reason](const std::exception & e) + { + const std::runtime_error * runtime_error = dynamic_cast(&e); + ASSERT_NE(runtime_error, nullptr); + + ASSERT_EQ(runtime_error->what(), test_reason); + + caught_exception = true; + } + ); + + ASSERT_TRUE(caught_exception); + ASSERT_TRUE(timer_executed_after_exception); +} + + // Check executor throws properly if the same node is added a second time TYPED_TEST(TestExecutors, addNodeTwoExecutors) { diff --git a/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp b/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp index b4729bde4c..6787830c24 100644 --- a/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp +++ b/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp @@ -98,46 +98,3 @@ TEST_F(TestMultiThreadedExecutor, timer_over_take) { executor.spin(); } - -TEST_F(TestMultiThreadedExecutor, catch_exception) { - - rclcpp::executors::MultiThreadedExecutor executor; - - std::shared_ptr node = - std::make_shared("test_multi_threaded_executor_catch_exception"); - - const std::string test_reason = "test exception"; - - std::atomic_bool timer_executed_after_exception = false; - - auto timer = node->create_wall_timer( - std::chrono::milliseconds(1), [test_reason, &timer_executed_after_exception, &executor]() - { - static size_t cnt = 0; - if (cnt == 0) { - cnt++; - throw std::runtime_error(test_reason); - } - - timer_executed_after_exception = true; - executor.cancel(); - }); - - std::atomic_bool caught_exception = false; - - executor.add_node(node); - executor.spin( - [&caught_exception, &test_reason](const std::exception & e) - { - const std::runtime_error * runtime_error = dynamic_cast(&e); - ASSERT_NE(runtime_error, nullptr); - - ASSERT_EQ(runtime_error->what(), test_reason); - - caught_exception = true; - } - ); - - ASSERT_TRUE(caught_exception); - ASSERT_TRUE(timer_executed_after_exception); -} diff --git a/rclcpp/test/rclcpp/test_executor.cpp b/rclcpp/test/rclcpp/test_executor.cpp index bdbb0a1079..2a8ed922ed 100644 --- a/rclcpp/test/rclcpp/test_executor.cpp +++ b/rclcpp/test/rclcpp/test_executor.cpp @@ -42,6 +42,10 @@ class DummyExecutor : public rclcpp::Executor { } + void spin(const std::function & exception_handler) override + { + } + void spin_nanoseconds(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node) { spin_node_once_nanoseconds(node, std::chrono::milliseconds(100));