diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 557e4f3ab3..a4c9f0b943 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -160,7 +160,7 @@ class Executor void spin_node_some(std::shared_ptr node); - /// Complete all available queued work without blocking. + /// Collect work once and execute all available work, optionally within a duration. /** * This function can be overridden. The default implementation is suitable for a * single-threaded model of execution. @@ -175,6 +175,23 @@ class Executor virtual void spin_some(std::chrono::nanoseconds max_duration = std::chrono::nanoseconds(0)); + /// Collect and execute work repeatedly within a duration or until no more work is available. + /** + * This function can be overridden. The default implementation is suitable for a + * single-threaded model of execution. + * Adding subscriptions, timers, services, etc. with blocking callbacks will cause this function + * to block (which may have unintended consequences). + * If the time that waitables take to be executed is longer than the period on which new waitables + * become ready, this method will execute work repeatedly until `max_duration` has elapsed. + * + * \param[in] max_duration The maximum amount of time to spend executing work. Must be positive. + * Note that spin_all() may take longer than this time as it only returns once max_duration has + * been exceeded. + */ + RCLCPP_PUBLIC + virtual void + spin_all(std::chrono::nanoseconds max_duration); + RCLCPP_PUBLIC virtual void spin_once(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); @@ -270,6 +287,10 @@ class Executor rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, std::chrono::nanoseconds timeout); + RCLCPP_PUBLIC + void + spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive); + /// Find the next available executable and do the work associated with it. /** * \param[in] any_exec Union structure that can hold any executable type (timer, subscription, diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 0c83e0deba..5aac2284ed 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -28,6 +28,8 @@ #include "rcutils/logging_macros.h" +using namespace std::chrono_literals; + using rclcpp::exceptions::throw_from_rcl_error; using rclcpp::AnyExecutable; using rclcpp::Executor; @@ -212,8 +214,21 @@ Executor::spin_node_some(std::shared_ptr node) this->spin_node_some(node->get_node_base_interface()); } +void Executor::spin_some(std::chrono::nanoseconds max_duration) +{ + return this->spin_some_impl(max_duration, false); +} + +void Executor::spin_all(std::chrono::nanoseconds max_duration) +{ + if (max_duration <= 0ns) { + throw std::invalid_argument("max_duration must be positive"); + } + return this->spin_some_impl(max_duration, true); +} + void -Executor::spin_some(std::chrono::nanoseconds max_duration) +Executor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive) { auto start = std::chrono::steady_clock::now(); auto max_duration_not_elapsed = [max_duration, start]() { @@ -232,14 +247,20 @@ Executor::spin_some(std::chrono::nanoseconds max_duration) throw std::runtime_error("spin_some() called while already spinning"); } RCLCPP_SCOPE_EXIT(this->spinning.store(false); ); - // non-blocking call to pre-load all available work - wait_for_work(std::chrono::milliseconds::zero()); + bool work_available = false; while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) { AnyExecutable any_exec; + if (!work_available) { + wait_for_work(std::chrono::milliseconds::zero()); + } if (get_next_ready_executable(any_exec)) { execute_any_executable(any_exec); + work_available = true; } else { - break; + if (!work_available || !exhaustive) { + break; + } + work_available = false; } } } diff --git a/rclcpp/test/rclcpp/test_executor.cpp b/rclcpp/test/rclcpp/test_executor.cpp index cdefec71e1..731add17c1 100644 --- a/rclcpp/test/rclcpp/test_executor.cpp +++ b/rclcpp/test/rclcpp/test_executor.cpp @@ -115,3 +115,85 @@ TEST_F(TestExecutors, testSpinUntilFutureCompleteSharedFuture) { ret = executor.spin_until_future_complete(future.share(), 100ms); EXPECT_EQ(rclcpp::FutureReturnCode::TIMEOUT, ret); } + +class TestWaitable : public rclcpp::Waitable +{ +public: + TestWaitable() + { + rcl_guard_condition_options_t guard_condition_options = + rcl_guard_condition_get_default_options(); + + gc_ = rcl_get_zero_initialized_guard_condition(); + rcl_ret_t ret = rcl_guard_condition_init( + &gc_, + rclcpp::contexts::get_global_default_context()->get_rcl_context().get(), + guard_condition_options); + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error(ret); + } + } + + bool + add_to_wait_set(rcl_wait_set_t * wait_set) override + { + rcl_ret_t ret = rcl_wait_set_add_guard_condition(wait_set, &gc_, NULL); + if (RCL_RET_OK != ret) { + return false; + } + ret = rcl_trigger_guard_condition(&gc_); + return RCL_RET_OK == ret; + } + + bool + is_ready(rcl_wait_set_t * wait_set) override + { + (void)wait_set; + return true; + } + + void + execute() override + { + count_++; + std::this_thread::sleep_for(100ms); + } + + size_t + get_number_of_ready_guard_conditions() override {return 1;} + + size_t + get_count() + { + return count_; + } + +private: + size_t count_ = 0; + rcl_guard_condition_t gc_; +}; + +TEST_F(TestExecutors, testSpinAllvsSpinSome) { + { + rclcpp::executors::SingleThreadedExecutor executor; + auto waitable_interfaces = node->get_node_waitables_interface(); + auto my_waitable = std::make_shared(); + waitable_interfaces->add_waitable(my_waitable, nullptr); + executor.add_node(node); + executor.spin_all(1s); + executor.remove_node(node); + EXPECT_GT(my_waitable->get_count(), 1u); + waitable_interfaces->remove_waitable(my_waitable, nullptr); + } + { + rclcpp::executors::SingleThreadedExecutor executor; + auto waitable_interfaces = node->get_node_waitables_interface(); + auto my_waitable = std::make_shared(); + waitable_interfaces->add_waitable(my_waitable, nullptr); + executor.add_node(node); + executor.spin_some(1s); + executor.remove_node(node); + EXPECT_EQ(my_waitable->get_count(), 1u); + waitable_interfaces->remove_waitable(my_waitable, nullptr); + } +}