Skip to content

Commit

Permalink
Add spin_all method to Executor (ros2#1156)
Browse files Browse the repository at this point in the history
Signed-off-by: Ivan Santiago Paunovic <ivanpauno@ekumenlabs.com>
  • Loading branch information
ivanpauno authored and Joshua Hampp committed Jul 7, 2020
1 parent b916f5d commit e9adc3e
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 5 deletions.
23 changes: 22 additions & 1 deletion rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ class Executor
void
spin_node_some(std::shared_ptr<rclcpp::Node> node);

/// Complete all available queued work without blocking.
/// Collect work once and execute all available work, optionally within a duration.
/**
* This function can be overridden. The default implementation is suitable for a
* single-threaded model of execution.
Expand All @@ -175,6 +175,23 @@ class Executor
virtual void
spin_some(std::chrono::nanoseconds max_duration = std::chrono::nanoseconds(0));

/// Collect and execute work repeatedly within a duration or until no more work is available.
/**
* This function can be overridden. The default implementation is suitable for a
* single-threaded model of execution.
* Adding subscriptions, timers, services, etc. with blocking callbacks will cause this function
* to block (which may have unintended consequences).
* If the time that waitables take to be executed is longer than the period on which new waitables
* become ready, this method will execute work repeatedly until `max_duration` has elapsed.
*
* \param[in] max_duration The maximum amount of time to spend executing work. Must be positive.
* Note that spin_all() may take longer than this time as it only returns once max_duration has
* been exceeded.
*/
RCLCPP_PUBLIC
virtual void
spin_all(std::chrono::nanoseconds max_duration);

RCLCPP_PUBLIC
virtual void
spin_once(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
Expand Down Expand Up @@ -270,6 +287,10 @@ class Executor
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
std::chrono::nanoseconds timeout);

RCLCPP_PUBLIC
void
spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive);

/// Find the next available executable and do the work associated with it.
/**
* \param[in] any_exec Union structure that can hold any executable type (timer, subscription,
Expand Down
29 changes: 25 additions & 4 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

#include "rcutils/logging_macros.h"

using namespace std::chrono_literals;

using rclcpp::exceptions::throw_from_rcl_error;
using rclcpp::AnyExecutable;
using rclcpp::Executor;
Expand Down Expand Up @@ -212,8 +214,21 @@ Executor::spin_node_some(std::shared_ptr<rclcpp::Node> node)
this->spin_node_some(node->get_node_base_interface());
}

void Executor::spin_some(std::chrono::nanoseconds max_duration)
{
return this->spin_some_impl(max_duration, false);
}

void Executor::spin_all(std::chrono::nanoseconds max_duration)
{
if (max_duration <= 0ns) {
throw std::invalid_argument("max_duration must be positive");
}
return this->spin_some_impl(max_duration, true);
}

void
Executor::spin_some(std::chrono::nanoseconds max_duration)
Executor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
{
auto start = std::chrono::steady_clock::now();
auto max_duration_not_elapsed = [max_duration, start]() {
Expand All @@ -232,14 +247,20 @@ Executor::spin_some(std::chrono::nanoseconds max_duration)
throw std::runtime_error("spin_some() called while already spinning");
}
RCLCPP_SCOPE_EXIT(this->spinning.store(false); );
// non-blocking call to pre-load all available work
wait_for_work(std::chrono::milliseconds::zero());
bool work_available = false;
while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) {
AnyExecutable any_exec;
if (!work_available) {
wait_for_work(std::chrono::milliseconds::zero());
}
if (get_next_ready_executable(any_exec)) {
execute_any_executable(any_exec);
work_available = true;
} else {
break;
if (!work_available || !exhaustive) {
break;
}
work_available = false;
}
}
}
Expand Down
82 changes: 82 additions & 0 deletions rclcpp/test/rclcpp/test_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,85 @@ TEST_F(TestExecutors, testSpinUntilFutureCompleteSharedFuture) {
ret = executor.spin_until_future_complete(future.share(), 100ms);
EXPECT_EQ(rclcpp::FutureReturnCode::TIMEOUT, ret);
}

class TestWaitable : public rclcpp::Waitable
{
public:
TestWaitable()
{
rcl_guard_condition_options_t guard_condition_options =
rcl_guard_condition_get_default_options();

gc_ = rcl_get_zero_initialized_guard_condition();
rcl_ret_t ret = rcl_guard_condition_init(
&gc_,
rclcpp::contexts::get_global_default_context()->get_rcl_context().get(),
guard_condition_options);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
}

bool
add_to_wait_set(rcl_wait_set_t * wait_set) override
{
rcl_ret_t ret = rcl_wait_set_add_guard_condition(wait_set, &gc_, NULL);
if (RCL_RET_OK != ret) {
return false;
}
ret = rcl_trigger_guard_condition(&gc_);
return RCL_RET_OK == ret;
}

bool
is_ready(rcl_wait_set_t * wait_set) override
{
(void)wait_set;
return true;
}

void
execute() override
{
count_++;
std::this_thread::sleep_for(100ms);
}

size_t
get_number_of_ready_guard_conditions() override {return 1;}

size_t
get_count()
{
return count_;
}

private:
size_t count_ = 0;
rcl_guard_condition_t gc_;
};

TEST_F(TestExecutors, testSpinAllvsSpinSome) {
{
rclcpp::executors::SingleThreadedExecutor executor;
auto waitable_interfaces = node->get_node_waitables_interface();
auto my_waitable = std::make_shared<TestWaitable>();
waitable_interfaces->add_waitable(my_waitable, nullptr);
executor.add_node(node);
executor.spin_all(1s);
executor.remove_node(node);
EXPECT_GT(my_waitable->get_count(), 1u);
waitable_interfaces->remove_waitable(my_waitable, nullptr);
}
{
rclcpp::executors::SingleThreadedExecutor executor;
auto waitable_interfaces = node->get_node_waitables_interface();
auto my_waitable = std::make_shared<TestWaitable>();
waitable_interfaces->add_waitable(my_waitable, nullptr);
executor.add_node(node);
executor.spin_some(1s);
executor.remove_node(node);
EXPECT_EQ(my_waitable->get_count(), 1u);
waitable_interfaces->remove_waitable(my_waitable, nullptr);
}
}

0 comments on commit e9adc3e

Please sign in to comment.