diff --git a/rclpy/CMakeLists.txt b/rclpy/CMakeLists.txt index 8012e0fc2..694c3e312 100644 --- a/rclpy/CMakeLists.txt +++ b/rclpy/CMakeLists.txt @@ -109,6 +109,7 @@ pybind11_add_module(_rclpy_pybind11 SHARED src/rclpy/context.cpp src/rclpy/destroyable.cpp src/rclpy/duration.cpp + src/rclpy/clock_event.cpp src/rclpy/exceptions.cpp src/rclpy/graph.cpp src/rclpy/guard_condition.cpp diff --git a/rclpy/rclpy/clock.py b/rclpy/rclpy/clock.py index da10f7376..2bcb21070 100644 --- a/rclpy/rclpy/clock.py +++ b/rclpy/rclpy/clock.py @@ -15,7 +15,9 @@ from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy from .duration import Duration +from .exceptions import NotInitializedException from .time import Time +from .utilities import get_default_context ClockType = _rclpy.ClockType @@ -172,6 +174,68 @@ def callback_shim(jump_dict): clock=self, threshold=threshold, pre_callback=pre_callback, post_callback=post_callback) + def sleep_until(self, until: Time, context=None) -> bool: + """ + Sleep until a Time on this Clock is reached. + + When using a ROSClock, this may sleep forever if the TimeSource is misconfigured and the + context is never shut down. + ROS time being activated or deactivated causes this function to cease sleeping and return + False. + + :param until: Time at which this function should stop sleeping. + :param context: Context which when shut down will cause this sleep to wake early. + If context is None, then the default context is used. + :return: True if until was reached, or False if it woke for another reason. + :raises ValueError: until is specified for a different type of clock than this one. + :raises NotInitializedException: context has not been initialized or is shutdown. + """ + if context is None: + context = get_default_context() + + if not context.ok(): + raise NotInitializedException() + + if until.clock_type != self._clock_type: + raise ValueError("until's clock type does not match this clock's type") + + event = _rclpy.ClockEvent() + time_source_changed = False + + def on_time_jump(time_jump: TimeJump): + """Wake when time jumps and is past target time.""" + nonlocal time_source_changed + + # ROS time being activated or deactivated changes the epoch, so sleep + # time loses its meaning + time_source_changed = ( + time_source_changed or + ClockChange.ROS_TIME_ACTIVATED == time_jump.clock_change or + ClockChange.ROS_TIME_DEACTIVATED == time_jump.clock_change) + + if time_source_changed or self.now() >= until: + event.set() + + # Wake when context is shut down + context.on_shutdown(event.set) + + threshold = JumpThreshold( + min_forward=Duration(nanoseconds=1), + min_backward=None, + on_clock_change=True) + with self.create_jump_callback(threshold, post_callback=on_time_jump): + if ClockType.SYSTEM_TIME == self._clock_type: + event.wait_until_system(self.__clock, until._time_handle) + elif ClockType.STEADY_TIME == self._clock_type: + event.wait_until_steady(self.__clock, until._time_handle) + elif ClockType.ROS_TIME == self._clock_type: + event.wait_until_ros(self.__clock, until._time_handle) + + if not context.ok() or time_source_changed: + return False + + return self.now() >= until + class ROSClock(Clock): diff --git a/rclpy/src/rclpy/_rclpy_pybind11.cpp b/rclpy/src/rclpy/_rclpy_pybind11.cpp index ccf828fd9..3a78e6b5e 100644 --- a/rclpy/src/rclpy/_rclpy_pybind11.cpp +++ b/rclpy/src/rclpy/_rclpy_pybind11.cpp @@ -25,6 +25,7 @@ #include "context.hpp" #include "destroyable.hpp" #include "duration.hpp" +#include "clock_event.hpp" #include "exceptions.hpp" #include "graph.hpp" #include "guard_condition.hpp" @@ -228,4 +229,5 @@ PYBIND11_MODULE(_rclpy_pybind11, m) { rclpy::define_logging_api(m); rclpy::define_signal_handler_api(m); + rclpy::define_clock_event(m); } diff --git a/rclpy/src/rclpy/clock_event.cpp b/rclpy/src/rclpy/clock_event.cpp new file mode 100644 index 000000000..b53fb5ee8 --- /dev/null +++ b/rclpy/src/rclpy/clock_event.cpp @@ -0,0 +1,120 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "clock_event.hpp" + +namespace py = pybind11; + +namespace rclpy +{ +template +void ClockEvent::wait_until(std::shared_ptr clock, rcl_time_point_t until) +{ + // Synchronize because clock epochs might differ + const rcl_time_point_t rcl_entry = clock->get_now(); + const typename ClockType::time_point chrono_entry = ClockType::now(); + + rcl_duration_t delta_t; + rcl_ret_t ret = rcl_difference_times(&rcl_entry, &until, &delta_t); + + if (RCL_RET_OK != ret) { + throw RCLError("failed to subtract times"); + } + + // Cast because system clock resolution is too big for nanoseconds on Windows & OSX + const typename ClockType::time_point chrono_until = chrono_entry + + std::chrono::duration_cast( + std::chrono::nanoseconds(delta_t.nanoseconds)); + + // Could be a long wait, release the gil + py::gil_scoped_release release; + std::unique_lock lock(mutex_); + cv_.wait_until(lock, chrono_until, [this]() {return state_;}); +} + +void ClockEvent::wait_until_ros(std::shared_ptr clock, rcl_time_point_t until) +{ + // Check if ROS time is enabled in C++ to avoid TOCTTOU with TimeSource by holding GIL + if (clock->get_ros_time_override_is_enabled()) { + // Could be a long wait, release the gil + py::gil_scoped_release release; + std::unique_lock lock(mutex_); + // Caller must have setup a time jump callback to wake this event + cv_.wait(lock, [this]() {return state_;}); + } else { + // ROS time not enabled is system time + wait_until(clock, until); + } +} + +bool ClockEvent::is_set() +{ + std::unique_lock lock(mutex_); + return state_; +} + +void ClockEvent::set() +{ + { + std::unique_lock lock(mutex_); + state_ = true; + } + cv_.notify_all(); +} + +void ClockEvent::clear() +{ + { + std::unique_lock lock(mutex_); + state_ = false; + } + cv_.notify_all(); +} + +void define_clock_event(py::object module) +{ + py::class_(module, "ClockEvent") + .def(py::init()) + .def( + "wait_until_steady", &ClockEvent::wait_until, + "Wait for the event to be set (monotonic wait)") + .def( + "wait_until_system", &ClockEvent::wait_until, + "Wait for the event to be set (system timed wait)") + .def( + "wait_until_ros", &ClockEvent::wait_until_ros, + "Wait for the event to be set (ROS timed wait)") + .def( + "is_set", &ClockEvent::is_set, + "Return True if the event is set, False otherwise.") + .def( + "set", &ClockEvent::set, + "Set the event, waking all those who wait on it.") + .def( + "clear", &ClockEvent::clear, + "Unset the event."); +} +} // namespace rclpy diff --git a/rclpy/src/rclpy/clock_event.hpp b/rclpy/src/rclpy/clock_event.hpp new file mode 100644 index 000000000..ab482bbfc --- /dev/null +++ b/rclpy/src/rclpy/clock_event.hpp @@ -0,0 +1,75 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLPY__CLOCK_EVENT_HPP_ +#define RCLPY__CLOCK_EVENT_HPP_ + +#include + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "clock.hpp" + +namespace py = pybind11; + +namespace rclpy +{ +class ClockEvent +{ +public: + /// Wait until a time specified by a system or steady clock. + /// \param clock the clock to use for time synchronization with until + /// \param until this method will block until this time is reached. + template + void wait_until(std::shared_ptr clock, rcl_time_point_t until); + + /// Wait until a time specified by a ROS clock. + /// \warning the caller is responsible for creating a time jump callback to set this event when + /// the target ROS time is reached. + /// when a given ROS time is reached. + /// \param clock the clock to use for time synchronization. + /// \param until this method will block until this time is reached. + void wait_until_ros(std::shared_ptr clock, rcl_time_point_t until); + + /// Indicate if the ClockEvent is set. + /// \return True if the ClockEvent is set. + bool is_set(); + + /// Set the event. + void set(); + + /// Clear the event. + void clear(); + +private: + bool state_ = false; + std::mutex mutex_; + std::condition_variable cv_; +}; + +/// Define a pybind11 wrapper for an rclpy::ClockEvent +void define_clock_event(py::object module); +} // namespace rclpy + +#endif // RCLPY__CLOCK_EVENT_HPP_ diff --git a/rclpy/test/test_clock.py b/rclpy/test/test_clock.py index e5c2c107e..c6a147a24 100644 --- a/rclpy/test/test_clock.py +++ b/rclpy/test/test_clock.py @@ -12,22 +12,30 @@ # See the License for the specific language governing permissions and # limitations under the License. +import threading import time import unittest from unittest.mock import Mock import pytest +import rclpy from rclpy.clock import Clock from rclpy.clock import ClockType from rclpy.clock import JumpHandle from rclpy.clock import JumpThreshold from rclpy.clock import ROSClock +from rclpy.context import Context from rclpy.duration import Duration +from rclpy.exceptions import NotInitializedException from rclpy.time import Time +from rclpy.utilities import get_default_context from .mock_compat import __name__ as _ # noqa: ignore=F401 +A_SMALL_AMOUNT_OF_TIME = Duration(seconds=0.5) + + def test_invalid_jump_threshold(): with pytest.raises(ValueError, match='.*min_forward.*'): JumpThreshold( @@ -211,6 +219,145 @@ def test_triggered_clock_change_callbacks(self): handler3.unregister() +@pytest.fixture() +def default_context(): + rclpy.init() + yield get_default_context() + rclpy.shutdown() + + +@pytest.fixture() +def non_default_context(): + context = Context() + context.init() + yield context + context.try_shutdown() + + +def test_sleep_until_mismatched_clock_type(default_context): + clock = Clock(clock_type=ClockType.SYSTEM_TIME) + with pytest.raises(ValueError, match='.*clock type does not match.*'): + clock.sleep_until(Time(clock_type=ClockType.STEADY_TIME)) + + +def test_sleep_until_non_default_context(non_default_context): + clock = Clock() + assert clock.sleep_until(clock.now() + Duration(seconds=0.1), context=non_default_context) + + +def test_sleep_until_invalid_context(): + clock = Clock() + with pytest.raises(NotInitializedException): + clock.sleep_until(clock.now() + Duration(seconds=0.1), context=Context()) + + +@pytest.mark.parametrize( + 'clock_type', (ClockType.SYSTEM_TIME, ClockType.STEADY_TIME, ClockType.ROS_TIME)) +def test_sleep_until_basic(default_context, clock_type): + clock = Clock(clock_type=clock_type) + sleep_duration = Duration(seconds=0.1) + start = clock.now() + assert clock.sleep_until(clock.now() + sleep_duration) + stop = clock.now() + assert stop - start >= sleep_duration + + +@pytest.mark.parametrize( + 'clock_type', (ClockType.SYSTEM_TIME, ClockType.STEADY_TIME, ClockType.ROS_TIME)) +def test_sleep_until_time_in_past(default_context, clock_type): + clock = Clock(clock_type=clock_type) + sleep_duration = Duration(seconds=-1) + start = clock.now() + assert clock.sleep_until(clock.now() + sleep_duration) + stop = clock.now() + assert stop - start < A_SMALL_AMOUNT_OF_TIME + + +@pytest.mark.parametrize('ros_time_enabled', (True, False)) +def test_sleep_until_ros_time_toggled(default_context, ros_time_enabled): + clock = ROSClock() + clock._set_ros_time_is_active(not ros_time_enabled) + + retval = None + + def run(): + nonlocal retval + retval = clock.sleep_until(clock.now() + Duration(seconds=10)) + + t = threading.Thread(target=run) + t.start() + + # wait for thread to get inside sleep_until call + time.sleep(0.2) + + clock._set_ros_time_is_active(ros_time_enabled) + + # wait for thread to exit + start = clock.now() + t.join() + stop = clock.now() + assert stop - start < A_SMALL_AMOUNT_OF_TIME + + assert retval is False + + +def test_sleep_until_context_shut_down(non_default_context): + clock = Clock() + retval = None + + def run(): + nonlocal retval + retval = clock.sleep_until( + clock.now() + Duration(seconds=10), context=non_default_context) + + t = threading.Thread(target=run) + t.start() + + # wait for thread to get inside sleep_until call + time.sleep(0.2) + + non_default_context.shutdown() + + # wait for thread to exit + start = clock.now() + t.join() + stop = clock.now() + assert stop - start < A_SMALL_AMOUNT_OF_TIME + + assert retval is False + + +def test_sleep_until_ros_time_enabled(default_context): + clock = ROSClock() + clock._set_ros_time_is_active(True) + + start_time = Time(seconds=1, clock_type=ClockType.ROS_TIME) + stop_time = start_time + Duration(seconds=10) + clock.set_ros_time_override(start_time) + + retval = None + + def run(): + nonlocal retval + retval = clock.sleep_until(stop_time) + + t = threading.Thread(target=run) + t.start() + + # wait for thread to get inside sleep_until call + time.sleep(0.2) + + clock.set_ros_time_override(stop_time) + + # wait for thread to exit + start = clock.now() + t.join() + stop = clock.now() + assert stop - start < A_SMALL_AMOUNT_OF_TIME + + assert retval + + def test_with_jump_handle(): clock = ROSClock() clock._set_ros_time_is_active(False)