diff --git a/docs/design/rosbag2_playback_time.md b/docs/design/rosbag2_playback_time.md new file mode 100644 index 0000000000..8233983b52 --- /dev/null +++ b/docs/design/rosbag2_playback_time.md @@ -0,0 +1,206 @@ +# Rosbag2 Playback Time Handling + +## Context + +This design does not consider recording, it only handles playback, which has a more complex interaction with time. + +Terms: +* "ROS Time" - the time expressed on a `/clock` topic +* "ROS Time Source" - the entity that publishes to `/clock` +For more context on time in ROS 2 see http://design.ros2.org/articles/clock_and_time.html#ros-time + +## Goal + +We need Rosbag2 playback to do more than just playing back in real time according to the system clock. +For playback we want to support the following "time-control" features: +* Pause and resume time + * When time is resumed, rosbag2 will play the next message in sequence after the last one it published + * There is an alternate concept of "pause and resume" that means "suppress recording/publishing while time continues, such that messages are skipped" - I use "pause and resume" in this document in the sense of "stopping and starting the flow of time". An alternative design handles that other feature, which may be more appropriately named something else such as "suppress", or "mute" for an analogy with an audio playback. +* Set a (forward) rate of playback - either faster or slower than real time + * This is already implemented as of this writing, but this design proposes a change in the implementation +* Jump back or forward to an arbitrary point in time + * This is called "jump" in the time interface, but will likely be exposed in the player interface as "seek" + +These are the time situations to handle: +1. Steady Time - Rosbag2 keeps time internally with reference to a monotonic system clock, and neither publishes or subscribes to `/clock` + * This is the default behavior +2. Rosbag2 acts as ROS Time Source by publishing to the `/clock` topic (a simple extension of case 1) + * Chosen by `--clock` option to `ros2 bag play` +3. Rosbag2 is driven by an external ROS Time Source - most commonly the Gazebo simulator + * In this case, Rosbag2 cannot use "time-control" features presented above, since it is a passive consumer of time + * Chosen by `--use-sim-time` argument to `ros2 bag play` + +NOTE: The user is responsible for ensuring that there is only one publisher on `/clock` - Rosbag2 will not try to de-conflict multiple ROS Time Sources, but it will print a warning if more than one publisher is detected. + +## Notes on current implementation (as of writing this design) + +`rosbag2_transport::Player` currently uses `std::chrono::system_clock` to query time, and `std::this_thread::sleep_until` to wait between publishing messages, with explicit handling of playback rate. + +## Proposal + +Pass a `PlayerClock` instance to `rosbag2_transport::Player` +* Use `PlayerClock::now` to query starting time +* Use `PlayerClock::sleep_until` between messages + * `rclcpp::Clock` does not yet implement `sleep_until` - as noted in https://github.com/ros2/rcl/issues/898 + * If we are not able to contribute this feature into `rclcpp` in time for the Galactic API freeze - it may need to temporarily go into a rosbag2-based subclass for the Galactic release. + +The following pseudocode suggests the high level function and API (but does not set it in stone) + +``` +// A time point value without a reference time +type TimePointValue; + +// A time interval +type Duration; + +/* + Used to control the timing of bag playback. + This clock should be used to query times and sleep between message playing, + so that the complexity involved around time control and time sources is encapsulated in this one place. + Internally, it may own an rclcpp::Clock, but does not override the class in order to implement a slightly different API. +*/ +class PlayerClock +{ + /* + Provide the current time according to the clock's internal model. + if use_sim_time: provides current ROS Time (with optional extrapolation - see "Clock Rate and Time Extrapolation" section) + if !use_sim_time: calculates current "Player Time" based on starting time, playback rate, pause state. + this means that /clock time will match with the recorded messages time, as if we are fully reliving the recorded session + */ + TimePointValue now(); + + /* + Sleep (non-busy wait) the current thread until the provided time is reached - according to this Clock + If time is paused, the requested time may never be reached: `real_time_timeout` uses the internal steady clock to return false if the timeout elapses + If jump() is called, return false, allowing the caller to handle the new time + Return true when the time is reached + */ + bool sleep_until(TimePointValue until, Duration real_time_timeout); + + /* + Pauses/resumes time. + While paused, `now()` will repeatedly return the same time, until resumed. + Note: this could have been defined as `set_rate(0)`, but this interface allows the clock to save the playback rate internally + */ + void set_paused(bool paused); + bool get_paused() const; + + /* + Set the rate of playback - a unitless ratio. Defaults to 1.0 (real-time) + rate must be greater than 0 - to stop playback, use set_paused instead + */ + void set_rate(float rate); + float get_rate() const; + + /* + Set the rate in Hz that /clock will be published. Defaults to 0. + If this is set to <= 0, then /clock will not be published. + If this is set to > 0, then /clock will start being published immediately + */ + void set_clock_publish_frequency(float frequency); + float get_clock_publish_frequency() const; + + /* + Change the current internally maintained offset so that next published time is different. + This will trigger any registered JumpHandler callbacks. + Call this with the first message timestamp for a bag before starting playback (otherwise this will return current wall time) + */ + void jump(rclcpp::Time time); + + /* + This is a copy of the rclcpp::Clock API - these handlers will be called in two cases: + 1. use_sim_time is true: if the external time source jumps back in time, or forward farther than the threshold + 2. use_sim_time is false: if jump() is called) + */ + rclcpp::JumpHandler::SharedPtr + create_jump_callback( + rclcpp::JumpHandler::pre_callback_t pre_callback, + rclcpp::JumpHandler::post_callback_t post_callback, + const rcl_jump_threshold_t & threshold); +}; // end of PlayerClock API + +// Construct a clock that subscribes to /clock and cannot control time. +// It will print a warning when a user tries to change rate, jump, pause, etc. +PlayerClock SimTimePlayerClock(bool extrapolate_samples = false); + +// Construct a clock that can control time and optionally publish to /clock +PlayerClock TimeControlPlayerClock( + TimePointValue starting_time, + float rate = 1.0, + bool start_paused = true, + float clock_publish_frequency = 0.0 +); +``` + +### Clock Rate and Time Extrapolation + +This section is only relevant for Case 3 (subscribing to an external ROS Time Source), and does not apply to other cases. + +Common `/clock` publish rates are 10-100Hz - which is considerably slower than, for example, an IMU which may commonly publish at 200Hz or faster. +According to the [ROS Time Source Design](http://design.ros2.org/articles/clock_and_time.html#ros-time-source), "calls to the ROS time abstraction will return the latest time received from the `/clock` topic." +This means that faster topics will have a playback resolution only as fine as the `/clock` rate, as illustrated below. + +``` +# dots are arbitrary time tick, letters are when a message is published +# M is topic Message, C is /clock message + +# original system +M..M..M..M..M..M..M..M..M..M..M.. +C.......C........C........C...... + +# playback on use_sim_time with default behavior - when a new time sample is received a burst of backed up messages will be played +M.......MM.......MMM......MMM.... +C.......C........C........C...... +``` + +The section ["No Advanced Estimating Clock By Default"](http://design.ros2.org/articles/clock_and_time.html#no-advanced-estimating-clock-by-default) of the "Clock and Time" design lays out that more advanced behavior is possible, though (direct quote) "these techniques will require making assumptions about the future behavior of the time abstraction. And in the case that playback or simulation is instantaneously paused, it will break any of these assumptions." +To paraphrase: we can't possibly know what the next `/clock` message will be or if it will arrive at all; it could be a larger value interval (increased rate), smaller time interval (decreased rate), never arrive (paused), or a jump to a totally unrelated time. + +While the above is technically true, I propose that we implement a simple extrapolation anyways for the case of subscribing to an external ROS Time Source: +* Store internally the last N time samples (noting the rate that samples are received) +* When `now()` is called, determine the rate from the stored time samples and extrapolate based on time +* Put a watchdog on the clock to notice if new messages are not received in the expected period + * Deadline QoS on the `/clock` publisher is preferred for this, but we may not be able to enforce this for all ROS Time Sources + * Immediately "pause time" when a missing sample is noticed + +Error cases: + * ROS Time Source increases rate-of-time: playback may be slightly behind, and catches up by the next sample + * ROS Time Source decreases rate-of-time: playback may be slightly ahead based on extrapolated messages being published early, and will wait until the newly correct time to publish next messages. It will not re-publish any already-published message. + * ROS Time Source is instantaneously paused: Rosbag2 will play back "future" messages for no longer than one clock-sample period, stopping once the pause is noticed + +The error in at all cases is _at most_ the amount of time that is represented by one clock sample. +This design deems that error acceptable for most use cases, and the benefit large for high-frequency topic playback. +To let the user make this decision, it is an optionally enabled behavior via `--extrapolate-ros-time`, which only has an effect when `--use-sim-time` is provided. +Note: The faster the `/clock` rate, the smaller the possible error both with and without extrapolation. + +### Pause and Resume Time + +No special allowance needs to be made in the `Player` - time will still be provided, but it will not move forward, so the next message will not be published until time is resumed. + +NOTE: this is different than "suppress/mute" where time continues but publishing stops - that feature is handled outside of time-control. + +### Setting the Rate + +No special allowance needs to be made in the `Player` - time will be provided at a different rate, and `sleep_until` will handle this. + +### Time Jumps + +The `Player` must register a `JumpHandler` with the `Clock` - so that when a jump occurs, the current message playback queue can be invalidated and re-enqueued according to the new starting time. + +### Synchronizing rosbag2 playback + +As a consequence of this design, it is possible to synchronize the playback of multiple rosbags, by setting one to publish with `--clock`, and the others to listen with `--use-sim-time`. This feature is useful if for example bags were recorded for different topic selections, in the same time range. + +## Implementation Staging + +This should not be implemented monolithically. Implementation should focus on small incremental PRs with solid testing that are easy to review. +This is a proposed order of operations. + +* Create a mostly-unimplemented `PlayerClock` class, and have `Player` use it to maintain current functionality. This accomplishes the entire API change without taking very much implementation or review. +* Move playback rate handling out of `Player` to be handled by `PlayerClock` +* Implement `/clock` publisher +* Implement pause/resume to the `PlayerClock` +* Implement time jump to the `PlayerClock` (and handlers to the `Player`) +* Expose rate, pause/resume, and time jump as Services to enable CLI/keyboard/GUI controls +* Implement `use_sim_time` (`/clock` subscription) - without extrapolation +* Implement sim time extrapolation diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index d08b06dde2..008c497f2f 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -24,6 +24,13 @@ import yaml +def positive_float(arg: str) -> float: + value = float(arg) + if value <= 0: + raise ValueError(f'Value {value} is less than or equal to zero.') + return value + + class PlayVerb(VerbExtension): """Play back ROS data from a bag.""" @@ -64,6 +71,10 @@ def add_arguments(self, parser, cli_name): # noqa: D102 ' pragmas: [\"\" = ]' 'Note that applicable settings are limited to read-only for ros2 bag play.' 'For a list of sqlite3 settings, refer to sqlite3 documentation') + parser.add_argument( + '-c', '--clock', type=float, default=0.0, + help='Publish to /clock at a specific frequency in Hz, to act as a ROS Time Source. ' + 'Value must be positive. Defaults to not publishing.') def main(self, *, args): # noqa: D102 qos_profile_overrides = {} # Specify a valid default @@ -95,4 +106,5 @@ def main(self, *, args): # noqa: D102 qos_profile_overrides=qos_profile_overrides, loop=args.loop, topic_remapping=args.remap, - storage_config_file=storage_config_file) + storage_config_file=storage_config_file, + clock_publish_frequency = args.clock) diff --git a/rosbag2_cpp/CMakeLists.txt b/rosbag2_cpp/CMakeLists.txt index e13d7dd4b8..24faf74ccd 100644 --- a/rosbag2_cpp/CMakeLists.txt +++ b/rosbag2_cpp/CMakeLists.txt @@ -48,6 +48,7 @@ add_library(${PROJECT_NAME} SHARED src/rosbag2_cpp/cache/cache_consumer.cpp src/rosbag2_cpp/cache/message_cache_buffer.cpp src/rosbag2_cpp/cache/message_cache.cpp + src/rosbag2_cpp/clocks/time_controller_clock.cpp src/rosbag2_cpp/converter.cpp src/rosbag2_cpp/info.cpp src/rosbag2_cpp/reader.cpp @@ -206,6 +207,12 @@ if(BUILD_TESTING) ament_target_dependencies(test_multifile_reader rosbag2_storage) target_link_libraries(test_multifile_reader ${PROJECT_NAME}) endif() + + ament_add_gmock(test_time_controller_clock + test/rosbag2_cpp/test_time_controller_clock.cpp) + if(TARGET test_time_controller_clock) + target_link_libraries(test_time_controller_clock ${PROJECT_NAME}) + endif() endif() ament_package() diff --git a/rosbag2_cpp/include/rosbag2_cpp/clocks/player_clock.hpp b/rosbag2_cpp/include/rosbag2_cpp/clocks/player_clock.hpp new file mode 100644 index 0000000000..78f25f95c3 --- /dev/null +++ b/rosbag2_cpp/include/rosbag2_cpp/clocks/player_clock.hpp @@ -0,0 +1,71 @@ +// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_CPP__CLOCKS__PLAYER_CLOCK_HPP_ +#define ROSBAG2_CPP__CLOCKS__PLAYER_CLOCK_HPP_ + +#include +#include +#include + +#include "rcutils/time.h" +#include "rosbag2_cpp/visibility_control.hpp" + +namespace rosbag2_cpp +{ + +/** + * Virtual interface used to drive the timing of bag playback. + * This clock should be used to query times and sleep between message playing, + * so that the complexity involved around time control and time sources + * is encapsulated in this one place. + */ +class PlayerClock +{ +public: + /** + * Type representing an arbitrary steady time, used to measure real-time durations + * This type is never exposed by the PlayerClock - it is only used as input to the PlayerClock. + */ + typedef std::function NowFunction; + + ROSBAG2_CPP_PUBLIC + virtual ~PlayerClock() = default; + + /** + * Calculate and return current rcutils_time_point_value_t based on starting time, playback rate, pause state. + */ + ROSBAG2_CPP_PUBLIC + virtual rcutils_time_point_value_t now() const = 0; + + /** + * Try to sleep (non-busy) the current thread until the provided time is reached - according to this Clock + * + * Return true if the time has been reached, false if it was not successfully reached after sleeping + * for the appropriate duration. + * The user should not take action based on this sleep until it returns true. + */ + ROSBAG2_CPP_PUBLIC + virtual bool sleep_until(rcutils_time_point_value_t until) = 0; + + /** + * Return the current playback rate. + */ + ROSBAG2_CPP_PUBLIC + virtual double get_rate() const = 0; +}; + +} // namespace rosbag2_cpp + +#endif // ROSBAG2_CPP__CLOCKS__PLAYER_CLOCK_HPP_ diff --git a/rosbag2_cpp/include/rosbag2_cpp/clocks/time_controller_clock.hpp b/rosbag2_cpp/include/rosbag2_cpp/clocks/time_controller_clock.hpp new file mode 100644 index 0000000000..980e43545f --- /dev/null +++ b/rosbag2_cpp/include/rosbag2_cpp/clocks/time_controller_clock.hpp @@ -0,0 +1,83 @@ +// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_CPP__CLOCKS__TIME_CONTROLLER_CLOCK_HPP_ +#define ROSBAG2_CPP__CLOCKS__TIME_CONTROLLER_CLOCK_HPP_ + +#include +#include + +#include "rosbag2_cpp/clocks/player_clock.hpp" + +namespace rosbag2_cpp +{ + +/** + * Version of the PlayerClock interface that has control over time. + * It does not listen to any external ROS Time Source and can optionally publish /clock + */ +class TimeControllerClockImpl; +class TimeControllerClock : public PlayerClock +{ +public: + /** + * Constructor. + * + * \param starting_time: provides an initial offset for managing time + * This will likely be the timestamp of the first message in the bag + * \param rate: Rate of playback, a unit-free ratio. 1.0 is real-time + * \param now_fn: Function used to get the current steady time + * defaults to std::chrono::steady_clock::now + * Used to control for unit testing, or for specialized needs + * \throws std::runtime_error if rate is <= 0 + */ + ROSBAG2_CPP_PUBLIC + TimeControllerClock( + rcutils_time_point_value_t starting_time, + double rate = 1.0, + NowFunction now_fn = std::chrono::steady_clock::now); + + ROSBAG2_CPP_PUBLIC + virtual ~TimeControllerClock(); + + /** + * Calculate and return current rcutils_time_point_value_t based on starting time, playback rate, pause state. + */ + ROSBAG2_CPP_PUBLIC + rcutils_time_point_value_t now() const override; + + /** + * Try to sleep (non-busy) the current thread until the provided time is reached - according to this Clock + * + * Return true if the time has been reached, false if it was not successfully reached after sleeping + * for the appropriate duration. + * The user should not take action based on this sleep until it returns true. + */ + ROSBAG2_CPP_PUBLIC + bool sleep_until(rcutils_time_point_value_t until) override; + + /** + * Return the current playback rate. + */ + ROSBAG2_CPP_PUBLIC + double get_rate() const override; + +private: + std::unique_ptr impl_; +}; + + +} // namespace rosbag2_cpp + +#endif // ROSBAG2_CPP__CLOCKS__TIME_CONTROLLER_CLOCK_HPP_ diff --git a/rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp b/rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp new file mode 100644 index 0000000000..dc00bf14d2 --- /dev/null +++ b/rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp @@ -0,0 +1,108 @@ +// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +#include "rcpputils/thread_safety_annotations.hpp" +#include "rosbag2_cpp/clocks/time_controller_clock.hpp" +#include "rosbag2_cpp/types.hpp" + +namespace rosbag2_cpp +{ + +class TimeControllerClockImpl +{ +public: + /** + * Stores an exact time match between a system steady clock and the playback ROS clock. + * This snapshot is taken whenever a factor changes such that a new reference is needed, + * such as pause, resume, rate change, or jump + */ + struct TimeReference + { + rcutils_time_point_value_t ros; + std::chrono::steady_clock::time_point steady; + }; + + TimeControllerClockImpl() = default; + virtual ~TimeControllerClockImpl() = default; + + template + rcutils_duration_value_t duration_nanos(const T & duration) + { + return std::chrono::duration_cast(duration).count(); + } + + rcutils_time_point_value_t steady_to_ros(std::chrono::steady_clock::time_point steady_time) + { + return reference.ros + static_cast( + rate * duration_nanos(steady_time - reference.steady)); + } + + std::chrono::steady_clock::time_point ros_to_steady(rcutils_time_point_value_t ros_time) + { + const auto diff_nanos = static_cast( + (ros_time - reference.ros) / rate); + return reference.steady + std::chrono::nanoseconds(diff_nanos); + } + + std::mutex mutex; + std::condition_variable cv; + PlayerClock::NowFunction now_fn RCPPUTILS_TSA_GUARDED_BY(mutex); + double rate RCPPUTILS_TSA_GUARDED_BY(mutex) = 1.0; + TimeReference reference RCPPUTILS_TSA_GUARDED_BY(mutex); +}; + +TimeControllerClock::TimeControllerClock( + rcutils_time_point_value_t starting_time, + double rate, + NowFunction now_fn) +: impl_(std::make_unique()) +{ + std::lock_guard lock(impl_->mutex); + impl_->now_fn = now_fn; + impl_->reference.ros = starting_time; + impl_->reference.steady = impl_->now_fn(); + impl_->rate = rate; +} + +TimeControllerClock::~TimeControllerClock() +{} + +rcutils_time_point_value_t TimeControllerClock::now() const +{ + std::lock_guard lock(impl_->mutex); + return impl_->steady_to_ros(impl_->now_fn()); +} + +bool TimeControllerClock::sleep_until(rcutils_time_point_value_t until) +{ + { + std::unique_lock lock(impl_->mutex); + const auto steady_until = impl_->ros_to_steady(until); + impl_->cv.wait_until(lock, steady_until); + } + return now() >= until; +} + +double TimeControllerClock::get_rate() const +{ + std::lock_guard lock(impl_->mutex); + return impl_->rate; +} + +} // namespace rosbag2_cpp diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_time_controller_clock.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_time_controller_clock.cpp new file mode 100644 index 0000000000..9953bc0550 --- /dev/null +++ b/rosbag2_cpp/test/rosbag2_cpp/test_time_controller_clock.cpp @@ -0,0 +1,114 @@ +// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "rosbag2_cpp/clocks/time_controller_clock.hpp" + +using namespace testing; // NOLINT +using SteadyTimePoint = std::chrono::steady_clock::time_point; +using NowFunction = rosbag2_cpp::PlayerClock::NowFunction; + +class TimeControllerClockTest : public Test +{ +public: + TimeControllerClockTest() + { + now_fn = [this]() { + return return_time; + }; + } + + rcutils_time_point_value_t as_nanos(const SteadyTimePoint & time) + { + return std::chrono::duration_cast(time.time_since_epoch()).count(); + } + + NowFunction now_fn; + SteadyTimePoint return_time; // defaults to 0 + rcutils_time_point_value_t ros_start_time = 0; +}; + +TEST_F(TimeControllerClockTest, steadytime_precision) +{ + const double playback_rate = 1.0; + rosbag2_cpp::TimeControllerClock pclock(ros_start_time, playback_rate, now_fn); + + const SteadyTimePoint begin_time(std::chrono::seconds(0)); + return_time = begin_time; + EXPECT_EQ(pclock.now(), as_nanos(begin_time)); + + const SteadyTimePoint ten_seconds(std::chrono::seconds(10)); + return_time = ten_seconds; + EXPECT_EQ(pclock.now(), as_nanos(ten_seconds)); + + // NOTE: this would have already lost precision at 100 seconds if we were multiplying by float + const SteadyTimePoint hundred_seconds(std::chrono::seconds(100)); + return_time = hundred_seconds; + EXPECT_EQ(pclock.now(), as_nanos(hundred_seconds)); + + const int64_t near_limit_nanos = 1LL << 61; + const auto near_limit_time = SteadyTimePoint(std::chrono::nanoseconds(near_limit_nanos)); + return_time = near_limit_time; + EXPECT_EQ(pclock.now(), near_limit_nanos); + + // Expect to lose exact equality at this point due to precision limit of double*int mult + const int64_t over_limit_nanos = (1LL << 61) + 1LL; + const auto over_limit_time = SteadyTimePoint(std::chrono::nanoseconds(over_limit_nanos)); + return_time = over_limit_time; + EXPECT_NE(pclock.now(), over_limit_nanos); + EXPECT_LT(std::abs(pclock.now() - over_limit_nanos), 10LL); +} + +TEST_F(TimeControllerClockTest, nonzero_start_time) +{ + ros_start_time = 1234567890LL; + const double playback_rate = 1.0; + rosbag2_cpp::TimeControllerClock pclock(ros_start_time, playback_rate, now_fn); + + const SteadyTimePoint begin_time(std::chrono::seconds(0)); + return_time = begin_time; + EXPECT_EQ(pclock.now(), ros_start_time); + + return_time = SteadyTimePoint(std::chrono::seconds(1)); + EXPECT_EQ(pclock.now(), ros_start_time + as_nanos(return_time)); +} + +TEST_F(TimeControllerClockTest, fast_rate) +{ + const double playback_rate = 2.5; + rosbag2_cpp::TimeControllerClock pclock(ros_start_time, playback_rate, now_fn); + + const SteadyTimePoint begin_time(std::chrono::seconds(0)); + return_time = begin_time; + EXPECT_EQ(pclock.now(), as_nanos(begin_time)); + + const SteadyTimePoint some_time(std::chrono::seconds(3)); + return_time = some_time; + EXPECT_EQ(pclock.now(), as_nanos(some_time) * playback_rate); +} + +TEST_F(TimeControllerClockTest, slow_rate) +{ + const double playback_rate = 0.4; + rosbag2_cpp::TimeControllerClock pclock(ros_start_time, playback_rate, now_fn); + + const SteadyTimePoint begin_time(std::chrono::seconds(0)); + return_time = begin_time; + EXPECT_EQ(pclock.now(), as_nanos(begin_time)); + + const SteadyTimePoint some_time(std::chrono::seconds(12)); + return_time = some_time; + EXPECT_EQ(pclock.now(), as_nanos(some_time) * playback_rate); +} diff --git a/rosbag2_storage/include/rosbag2_storage/serialized_bag_message.hpp b/rosbag2_storage/include/rosbag2_storage/serialized_bag_message.hpp index ffce2bd650..e971e4889f 100644 --- a/rosbag2_storage/include/rosbag2_storage/serialized_bag_message.hpp +++ b/rosbag2_storage/include/rosbag2_storage/serialized_bag_message.hpp @@ -31,6 +31,8 @@ struct SerializedBagMessage std::string topic_name; }; +typedef std::shared_ptr SerializedBagMessageSharedPtr; + } // namespace rosbag2_storage #endif // ROSBAG2_STORAGE__SERIALIZED_BAG_MESSAGE_HPP_ diff --git a/rosbag2_transport/CMakeLists.txt b/rosbag2_transport/CMakeLists.txt index 8aa04ead74..006ee6aac4 100644 --- a/rosbag2_transport/CMakeLists.txt +++ b/rosbag2_transport/CMakeLists.txt @@ -188,6 +188,12 @@ function(create_tests_for_rmw_implementation) ${SKIP_TEST} LINK_LIBS rosbag2_transport AMENT_DEPS rosbag2_cpp rosbag2_storage test_msgs rosbag2_test_common) + + rosbag2_transport_add_gmock(test_play_publish_clock + test/rosbag2_transport/test_play_publish_clock.cpp + ${SKIP_TEST} + LINK_LIBS rosbag2_transport + AMENT_DEPS rosbag2_cpp rosbag2_storage test_msgs rosbag2_test_common) endfunction() if(BUILD_TESTING) diff --git a/rosbag2_transport/include/rosbag2_transport/play_options.hpp b/rosbag2_transport/include/rosbag2_transport/play_options.hpp index b616a76591..1f551621d9 100644 --- a/rosbag2_transport/include/rosbag2_transport/play_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/play_options.hpp @@ -40,6 +40,10 @@ struct PlayOptions std::unordered_map topic_qos_profile_overrides = {}; bool loop = false; std::vector topic_remapping_options = {}; + + // Rate in Hz at which to publish to /clock. + // 0 (or negative) means that no publisher will be created + double clock_publish_frequency = 0.0; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 8cf8c962c4..de209f9c8d 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -29,6 +29,7 @@ #include "rcutils/time.h" +#include "rosbag2_cpp/clocks/time_controller_clock.hpp" #include "rosbag2_cpp/reader.hpp" #include "rosbag2_cpp/typesupport_helpers.hpp" @@ -38,7 +39,6 @@ #include "qos.hpp" #include "rosbag2_node.hpp" -#include "replayable_message.hpp" namespace { @@ -94,6 +94,17 @@ bool Player::is_storage_completely_loaded() const void Player::play(const PlayOptions & options) { + if (reader_->has_next()) { + // Reader does not have "peek", so we must "pop" the first message to see its timestamp + auto message = reader_->read_next(); + prepare_clock(options, message->time_stamp); + // Make sure that first message gets played by putting it into the play queue + message_queue_.enqueue(message); + } else { + // The bag contains no messages - there is nothing to play + return; + } + topic_qos_profile_overrides_ = options.topic_qos_profile_overrides; prepare_publishers(options); @@ -103,7 +114,7 @@ void Player::play(const PlayOptions & options) wait_for_filled_queue(options); - play_messages_from_queue(options); + play_messages_from_queue(); } void Player::wait_for_filled_queue(const PlayOptions & options) const @@ -118,49 +129,35 @@ void Player::wait_for_filled_queue(const PlayOptions & options) const void Player::load_storage_content(const PlayOptions & options) { - TimePoint time_first_message; - - ReplayableMessage message; - if (reader_->has_next()) { - message.message = reader_->read_next(); - message.time_since_start = std::chrono::nanoseconds(0); - time_first_message = TimePoint(std::chrono::nanoseconds(message.message->time_stamp)); - message_queue_.enqueue(message); - } - auto queue_lower_boundary = static_cast(options.read_ahead_queue_size * read_ahead_lower_bound_percentage_); auto queue_upper_boundary = options.read_ahead_queue_size; while (reader_->has_next() && rclcpp::ok()) { if (message_queue_.size_approx() < queue_lower_boundary) { - enqueue_up_to_boundary(time_first_message, queue_upper_boundary); + enqueue_up_to_boundary(queue_upper_boundary); } else { std::this_thread::sleep_for(std::chrono::milliseconds(1)); } } } -void Player::enqueue_up_to_boundary(const TimePoint & time_first_message, uint64_t boundary) +void Player::enqueue_up_to_boundary(uint64_t boundary) { - ReplayableMessage message; + rosbag2_storage::SerializedBagMessageSharedPtr message; for (size_t i = message_queue_.size_approx(); i < boundary; i++) { if (!reader_->has_next()) { break; } - message.message = reader_->read_next(); - message.time_since_start = - TimePoint(std::chrono::nanoseconds(message.message->time_stamp)) - time_first_message; - + message = reader_->read_next(); message_queue_.enqueue(message); } } -void Player::play_messages_from_queue(const PlayOptions & options) +void Player::play_messages_from_queue() { - start_time_ = std::chrono::system_clock::now(); do { - play_messages_until_queue_empty(options); + play_messages_until_queue_empty(); if (!is_storage_completely_loaded() && rclcpp::ok()) { ROSBAG2_TRANSPORT_LOG_WARN( "Message queue starved. Messages will be delayed. Consider " @@ -169,24 +166,17 @@ void Player::play_messages_from_queue(const PlayOptions & options) } while (!is_storage_completely_loaded() && rclcpp::ok()); } -void Player::play_messages_until_queue_empty(const PlayOptions & options) +void Player::play_messages_until_queue_empty() { - ReplayableMessage message; - - float rate = 1.0; - // Use rate if in valid range - if (options.rate > 0.0) { - rate = options.rate; - } - + rosbag2_storage::SerializedBagMessageSharedPtr message; while (message_queue_.try_dequeue(message) && rclcpp::ok()) { - std::this_thread::sleep_until( - start_time_ + std::chrono::duration_cast( - 1.0 / rate * message.time_since_start)); + // Do not move on until sleep_until returns true + // It will always sleep, so this is not a tight busy loop on pause + while (rclcpp::ok() && !clock_->sleep_until(message->time_stamp)) {} if (rclcpp::ok()) { - auto publisher_iter = publishers_.find(message.message->topic_name); + auto publisher_iter = publishers_.find(message->topic_name); if (publisher_iter != publishers_.end()) { - publisher_iter->second->publish(message.message->serialized_data); + publisher_iter->second->publish(message->serialized_data); } } } @@ -224,4 +214,26 @@ void Player::prepare_publishers(const PlayOptions & options) } } +void Player::prepare_clock(const PlayOptions & options, rcutils_time_point_value_t starting_time) +{ + double rate = options.rate > 0.0 ? options.rate : 1.0; + clock_ = std::make_unique(starting_time, rate); + + // Create /clock publisher + if (options.clock_publish_frequency > 0.f) { + const auto publish_period = std::chrono::nanoseconds( + static_cast(RCUTILS_S_TO_NS(1) / options.clock_publish_frequency)); + // NOTE: PlayerClock does not own this publisher because rosbag2_cpp + // should not own transport-based functionality + clock_publisher_ = rosbag2_transport_->create_publisher( + "/clock", rclcpp::QoS(1)); + clock_publish_timer_ = rosbag2_transport_->create_wall_timer( + publish_period, [this]() { + auto msg = rosgraph_msgs::msg::Clock(); + msg.clock = rclcpp::Time(clock_->now()); + clock_publisher_->publish(msg); + }); + } +} + } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/player.hpp b/rosbag2_transport/src/rosbag2_transport/player.hpp index cc7a612ae1..7e74f6d936 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.hpp +++ b/rosbag2_transport/src/rosbag2_transport/player.hpp @@ -24,13 +24,14 @@ #include "moodycamel/readerwriterqueue.h" +#include "rclcpp/node.hpp" +#include "rclcpp/publisher.hpp" #include "rclcpp/qos.hpp" +#include "rosbag2_cpp/clocks/player_clock.hpp" +#include "rosbag2_storage/serialized_bag_message.hpp" #include "rosbag2_transport/play_options.hpp" - -#include "replayable_message.hpp" - -using TimePoint = std::chrono::time_point; +#include "rosgraph_msgs/msg/clock.hpp" namespace rosbag2_cpp { @@ -55,21 +56,24 @@ class Player private: void load_storage_content(const PlayOptions & options); bool is_storage_completely_loaded() const; - void enqueue_up_to_boundary(const TimePoint & time_first_message, uint64_t boundary); + void enqueue_up_to_boundary(uint64_t boundary); void wait_for_filled_queue(const PlayOptions & options) const; - void play_messages_from_queue(const PlayOptions & options); - void play_messages_until_queue_empty(const PlayOptions & options); + void play_messages_from_queue(); + void play_messages_until_queue_empty(); void prepare_publishers(const PlayOptions & options); + void prepare_clock(const PlayOptions & options, rcutils_time_point_value_t starting_time); static constexpr double read_ahead_lower_bound_percentage_ = 0.9; static const std::chrono::milliseconds queue_read_wait_period_; std::shared_ptr reader_; - moodycamel::ReaderWriterQueue message_queue_; - std::chrono::time_point start_time_; + moodycamel::ReaderWriterQueue message_queue_; mutable std::future storage_loading_future_; std::shared_ptr rosbag2_transport_; std::unordered_map> publishers_; std::unordered_map topic_qos_profile_overrides_; + std::unique_ptr clock_; + rclcpp::Publisher::SharedPtr clock_publisher_; + std::shared_ptr clock_publish_timer_; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/replayable_message.hpp b/rosbag2_transport/src/rosbag2_transport/replayable_message.hpp deleted file mode 100644 index 70b07f31df..0000000000 --- a/rosbag2_transport/src/rosbag2_transport/replayable_message.hpp +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright 2018 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef ROSBAG2_TRANSPORT__REPLAYABLE_MESSAGE_HPP_ -#define ROSBAG2_TRANSPORT__REPLAYABLE_MESSAGE_HPP_ - -#include -#include - -#include "rosbag2_storage/serialized_bag_message.hpp" - -namespace rosbag2_transport -{ - -struct ReplayableMessage -{ - std::shared_ptr message; - std::chrono::nanoseconds time_since_start; -}; - -} // namespace rosbag2_transport - -#endif // ROSBAG2_TRANSPORT__REPLAYABLE_MESSAGE_HPP_ diff --git a/rosbag2_transport/src/rosbag2_transport/rosbag2_transport.cpp b/rosbag2_transport/src/rosbag2_transport/rosbag2_transport.cpp index 57c866361d..0097be7c9b 100644 --- a/rosbag2_transport/src/rosbag2_transport/rosbag2_transport.cpp +++ b/rosbag2_transport/src/rosbag2_transport/rosbag2_transport.cpp @@ -23,6 +23,7 @@ #include "rclcpp/rclcpp.hpp" +#include "rcpputils/scope_exit.hpp" #include "rcutils/time.h" #include "rosbag2_cpp/info.hpp" @@ -96,9 +97,20 @@ std::shared_ptr Rosbag2Transport::setup_node( void Rosbag2Transport::play( const rosbag2_storage::StorageOptions & storage_options, const PlayOptions & play_options) { + auto transport_node = + setup_node(play_options.node_prefix, play_options.topic_remapping_options); + rclcpp::executors::SingleThreadedExecutor exec; + exec.add_node(transport_node); + auto spin_thread = std::thread( + [&exec]() { + exec.spin(); + }); + auto exit = rcpputils::make_scope_exit( + [&exec, &spin_thread]() { + exec.cancel(); + spin_thread.join(); + }); try { - auto transport_node = - setup_node(play_options.node_prefix, play_options.topic_remapping_options); Player player(reader_, transport_node); do { reader_->open(storage_options, {"", rmw_get_serialization_format()}); diff --git a/rosbag2_transport/src/rosbag2_transport/rosbag2_transport_python.cpp b/rosbag2_transport/src/rosbag2_transport/rosbag2_transport_python.cpp index 8349dd9149..86d89022a1 100644 --- a/rosbag2_transport/src/rosbag2_transport/rosbag2_transport_python.cpp +++ b/rosbag2_transport/src/rosbag2_transport/rosbag2_transport_python.cpp @@ -267,6 +267,7 @@ rosbag2_transport_play(PyObject * Py_UNUSED(self), PyObject * args, PyObject * k "loop", "topic_remapping", "storage_config_file", + "clock_publish_frequency", nullptr }; @@ -280,8 +281,9 @@ rosbag2_transport_play(PyObject * Py_UNUSED(self), PyObject * args, PyObject * k bool loop = false; PyObject * topic_remapping = nullptr; char * storage_config_file = nullptr; + float clock_publish_frequency = 0.0; if (!PyArg_ParseTupleAndKeywords( - args, kwargs, "sss|kfOObOs", const_cast(kwlist), + args, kwargs, "sss|kfOObOsf", const_cast(kwlist), &uri, &storage_id, &node_prefix, @@ -291,7 +293,8 @@ rosbag2_transport_play(PyObject * Py_UNUSED(self), PyObject * args, PyObject * k &qos_profile_overrides, &loop, &topic_remapping, - &storage_config_file)) + &storage_config_file, + &clock_publish_frequency)) { return nullptr; } @@ -304,6 +307,7 @@ rosbag2_transport_play(PyObject * Py_UNUSED(self), PyObject * args, PyObject * k play_options.read_ahead_queue_size = read_ahead_queue_size; play_options.rate = rate; play_options.loop = loop; + play_options.clock_publish_frequency = clock_publish_frequency; if (topics) { PyObject * topic_iterator = PyObject_GetIter(topics); diff --git a/rosbag2_transport/test/rosbag2_transport/rosbag2_play_test_fixture.hpp b/rosbag2_transport/test/rosbag2_transport/rosbag2_play_test_fixture.hpp index b33d3f2267..c68ab7b01f 100644 --- a/rosbag2_transport/test/rosbag2_transport/rosbag2_play_test_fixture.hpp +++ b/rosbag2_transport/test/rosbag2_transport/rosbag2_play_test_fixture.hpp @@ -17,6 +17,7 @@ #include +#include "rosbag2_test_common/subscription_manager.hpp" #include "rosbag2_transport_test_fixture.hpp" class RosBag2PlayTestFixture : public Rosbag2TransportTestFixture diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_publish_clock.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_publish_clock.cpp new file mode 100644 index 0000000000..cd479f55fb --- /dev/null +++ b/rosbag2_transport/test/rosbag2_transport/test_play_publish_clock.cpp @@ -0,0 +1,115 @@ +// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include + +#include "rosbag2_transport/rosbag2_transport.hpp" +#include "rosgraph_msgs/msg/clock.hpp" +#include "test_msgs/message_fixtures.hpp" + +#include "rosbag2_play_test_fixture.hpp" + +using namespace ::testing; // NOLINT + +namespace +{ +rcutils_duration_value_t period_for_frequency(double frequency) +{ + return static_cast(RCUTILS_S_TO_NS(1) / frequency); +} +} // namespace + +class ClockPublishFixture : public RosBag2PlayTestFixture +{ +public: + ClockPublishFixture() + : RosBag2PlayTestFixture() + { + // Fake bag setup + auto topic_types = std::vector{ + {"topic1", "test_msgs/BasicTypes", "", ""}, + }; + + std::vector> messages; + for (size_t i = 0; i < messages_to_play_; i++) { + auto message = get_messages_basic_types()[0]; + message->int32_value = static_cast(i); + messages.push_back( + serialize_test_message("topic1", milliseconds_between_messages_ * i, message)); + } + + // Player setup + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + reader_ = std::make_unique(std::move(prepared_mock_reader)); + + sub_->add_subscription( + "/clock", expected_clock_messages_, rclcpp::SensorDataQoS().keep_last(1)); + } + + void run_test() + { + auto await_received_messages = sub_->spin_subscriptions(); + rosbag2_transport::Rosbag2Transport rosbag2_transport(reader_, writer_, info_); + rosbag2_transport.play(storage_options_, play_options_); + await_received_messages.get(); + + // Check that we got enough messages + auto received_clock = sub_->get_received_messages("/clock"); + EXPECT_THAT(received_clock, SizeIs(Ge(expected_clock_messages_))); + + // Check time deltas between messages + const auto expect_clock_delta = + period_for_frequency(play_options_.clock_publish_frequency) * play_options_.rate; + const auto allowed_error = static_cast( + expect_clock_delta * error_tolerance_); + // On Windows, publishing seems to take time to "warm up", ignore the first couple messages + const auto start_message = 2; + for (size_t i = start_message; i < expected_clock_messages_ - 1; i++) { + auto current = rclcpp::Time(received_clock[i]->clock).nanoseconds(); + auto next = rclcpp::Time(received_clock[i + 1]->clock).nanoseconds(); + auto delta = next - current; + auto error = std::abs(delta - expect_clock_delta); + EXPECT_LE(error, allowed_error) << "Message was too far from next: " << i; + } + } + + // Number of bag messages to publish + size_t messages_to_play_ = 10; + const int64_t milliseconds_between_messages_ = 50; + + // Amount of error allowed between expected and actual clock deltas (expected to be much smaller) + const double error_tolerance_ = 0.3; + + // Wait for just a few clock messages, we're checking the time between them, not the total count + size_t expected_clock_messages_ = 6; +}; + +TEST_F(ClockPublishFixture, clock_is_published_at_chosen_frequency) +{ + play_options_.clock_publish_frequency = 20; + run_test(); +} + +TEST_F(ClockPublishFixture, clock_respects_playback_rate) +{ + play_options_.clock_publish_frequency = 20; + play_options_.rate = 0.5; + messages_to_play_ = 5; + run_test(); +}