From 9eb778e25a02ad8873949ac7401e34050231d714 Mon Sep 17 00:00:00 2001 From: Emerson Knapp Date: Thu, 25 Mar 2021 16:26:29 -0700 Subject: [PATCH] Clock publisher in Player Add a `rosgraph_msgs/Clock` publisher to the `Player` - that uses `PlayerClock::now()` to publish current time. Signed-off-by: Emerson Knapp --- ros2bag/ros2bag/verb/play.py | 11 +++++++ .../rosbag2_cpp/clocks/player_clock.hpp | 5 ++-- .../clocks/time_controller_clock.hpp | 10 +++++-- .../clocks/time_controller_clock.cpp | 9 +++++- .../test_time_controller_clock.cpp | 9 +++--- rosbag2_py/src/rosbag2_py/_transport.cpp | 1 + .../rosbag2_transport/play_options.hpp | 4 +++ .../src/rosbag2_transport/player.cpp | 29 +++++++++++++++++-- .../src/rosbag2_transport/player.hpp | 6 ++-- 9 files changed, 70 insertions(+), 14 deletions(-) diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index 2198987e1e..c2171a23b0 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -27,6 +27,12 @@ from rosbag2_py import StorageOptions import yaml +def positive_float(arg: str) -> float: + value = float(arg) + if value <= 0: + raise ValueError(f'Value {value} is less than or equal to zero.') + return value + class PlayVerb(VerbExtension): """Play back ROS data from a bag.""" @@ -71,6 +77,10 @@ def add_arguments(self, parser, cli_name): # noqa: D102 ' pragmas: [\"\" = ]' 'Note that applicable settings are limited to read-only for ros2 bag play.' 'For a list of sqlite3 settings, refer to sqlite3 documentation') + parser.add_argument( + '--clock', type=positive_float, default=0, + help='Publish to /clock at a specific frequency in Hz, to act as a ROS Time Source. ' + 'Value must be positive. Defaults to not publishing.') def main(self, *, args): # noqa: D102 qos_profile_overrides = {} # Specify a valid default @@ -104,6 +114,7 @@ def main(self, *, args): # noqa: D102 play_options.topic_qos_profile_overrides = qos_profile_overrides play_options.loop = args.loop play_options.topic_remapping_options = topic_remapping + play_options.clock_publish_frequency = args.clock player = Player() player.play(storage_options, play_options) diff --git a/rosbag2_cpp/include/rosbag2_cpp/clocks/player_clock.hpp b/rosbag2_cpp/include/rosbag2_cpp/clocks/player_clock.hpp index 78f25f95c3..34e68e084a 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/clocks/player_clock.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/clocks/player_clock.hpp @@ -52,9 +52,10 @@ class PlayerClock /** * Try to sleep (non-busy) the current thread until the provided time is reached - according to this Clock * - * Return true if the time has been reached, false if it was not successfully reached after sleeping - * for the appropriate duration. * The user should not take action based on this sleep until it returns true. + * + * \param until: The ROS time to sleep until + * \return true if the `until` has been reached, false if timeout or awakened early. */ ROSBAG2_CPP_PUBLIC virtual bool sleep_until(rcutils_time_point_value_t until) = 0; diff --git a/rosbag2_cpp/include/rosbag2_cpp/clocks/time_controller_clock.hpp b/rosbag2_cpp/include/rosbag2_cpp/clocks/time_controller_clock.hpp index 980e43545f..f4bff58bb9 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/clocks/time_controller_clock.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/clocks/time_controller_clock.hpp @@ -36,6 +36,10 @@ class TimeControllerClock : public PlayerClock * * \param starting_time: provides an initial offset for managing time * This will likely be the timestamp of the first message in the bag + * \param sleep_timeout: maximum amount of time to sleep even if time is not reached + * This helps dictate spin rate when clock is paused, as well as allowing periodic return + * of control even if messages are far apart. + * Value <= 0 means that sleep_until could theoretically wait forever without returning. * \param rate: Rate of playback, a unit-free ratio. 1.0 is real-time * \param now_fn: Function used to get the current steady time * defaults to std::chrono::steady_clock::now @@ -45,6 +49,7 @@ class TimeControllerClock : public PlayerClock ROSBAG2_CPP_PUBLIC TimeControllerClock( rcutils_time_point_value_t starting_time, + std::chrono::nanoseconds sleep_timeout, double rate = 1.0, NowFunction now_fn = std::chrono::steady_clock::now); @@ -60,9 +65,10 @@ class TimeControllerClock : public PlayerClock /** * Try to sleep (non-busy) the current thread until the provided time is reached - according to this Clock * - * Return true if the time has been reached, false if it was not successfully reached after sleeping - * for the appropriate duration. * The user should not take action based on this sleep until it returns true. + * + * \param until: The ROS time to sleep until + * \return true if the `until` has been reached, false if timeout or awakened early. */ ROSBAG2_CPP_PUBLIC bool sleep_until(rcutils_time_point_value_t until) override; diff --git a/rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp b/rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp index dc00bf14d2..f274657fb7 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp @@ -62,6 +62,7 @@ class TimeControllerClockImpl std::mutex mutex; std::condition_variable cv; + std::chrono::nanoseconds sleep_timeout; PlayerClock::NowFunction now_fn RCPPUTILS_TSA_GUARDED_BY(mutex); double rate RCPPUTILS_TSA_GUARDED_BY(mutex) = 1.0; TimeReference reference RCPPUTILS_TSA_GUARDED_BY(mutex); @@ -69,12 +70,14 @@ class TimeControllerClockImpl TimeControllerClock::TimeControllerClock( rcutils_time_point_value_t starting_time, + std::chrono::nanoseconds sleep_timeout, double rate, NowFunction now_fn) : impl_(std::make_unique()) { std::lock_guard lock(impl_->mutex); impl_->now_fn = now_fn; + impl_->sleep_timeout = sleep_timeout; impl_->reference.ros = starting_time; impl_->reference.steady = impl_->now_fn(); impl_->rate = rate; @@ -93,7 +96,11 @@ bool TimeControllerClock::sleep_until(rcutils_time_point_value_t until) { { std::unique_lock lock(impl_->mutex); - const auto steady_until = impl_->ros_to_steady(until); + const auto until_timeout = impl_->now_fn() + impl_->sleep_timeout; + auto steady_until = impl_->ros_to_steady(until); + if (until_timeout < steady_until) { + steady_until = until_timeout; + } impl_->cv.wait_until(lock, steady_until); } return now() >= until; diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_time_controller_clock.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_time_controller_clock.cpp index 9953bc0550..3297798bde 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/test_time_controller_clock.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/test_time_controller_clock.cpp @@ -37,13 +37,14 @@ class TimeControllerClockTest : public Test NowFunction now_fn; SteadyTimePoint return_time; // defaults to 0 + std::chrono::nanoseconds sleep_timeout{25000000}; // 40Hz rcutils_time_point_value_t ros_start_time = 0; }; TEST_F(TimeControllerClockTest, steadytime_precision) { const double playback_rate = 1.0; - rosbag2_cpp::TimeControllerClock pclock(ros_start_time, playback_rate, now_fn); + rosbag2_cpp::TimeControllerClock pclock(ros_start_time, sleep_timeout, playback_rate, now_fn); const SteadyTimePoint begin_time(std::chrono::seconds(0)); return_time = begin_time; @@ -75,7 +76,7 @@ TEST_F(TimeControllerClockTest, nonzero_start_time) { ros_start_time = 1234567890LL; const double playback_rate = 1.0; - rosbag2_cpp::TimeControllerClock pclock(ros_start_time, playback_rate, now_fn); + rosbag2_cpp::TimeControllerClock pclock(ros_start_time, sleep_timeout, playback_rate, now_fn); const SteadyTimePoint begin_time(std::chrono::seconds(0)); return_time = begin_time; @@ -88,7 +89,7 @@ TEST_F(TimeControllerClockTest, nonzero_start_time) TEST_F(TimeControllerClockTest, fast_rate) { const double playback_rate = 2.5; - rosbag2_cpp::TimeControllerClock pclock(ros_start_time, playback_rate, now_fn); + rosbag2_cpp::TimeControllerClock pclock(ros_start_time, sleep_timeout, playback_rate, now_fn); const SteadyTimePoint begin_time(std::chrono::seconds(0)); return_time = begin_time; @@ -102,7 +103,7 @@ TEST_F(TimeControllerClockTest, fast_rate) TEST_F(TimeControllerClockTest, slow_rate) { const double playback_rate = 0.4; - rosbag2_cpp::TimeControllerClock pclock(ros_start_time, playback_rate, now_fn); + rosbag2_cpp::TimeControllerClock pclock(ros_start_time, sleep_timeout, playback_rate, now_fn); const SteadyTimePoint begin_time(std::chrono::seconds(0)); return_time = begin_time; diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index 264a1157dd..4c6bb0a202 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -190,6 +190,7 @@ PYBIND11_MODULE(_transport, m) { &PlayOptions::setTopicQoSProfileOverrides) .def_readwrite("loop", &PlayOptions::loop) .def_readwrite("topic_remapping_options", &PlayOptions::topic_remapping_options) + .def_readwrite("clock_publish_frequency", &PlayOptions::clock_publish_frequency) ; py::class_(m, "RecordOptions") diff --git a/rosbag2_transport/include/rosbag2_transport/play_options.hpp b/rosbag2_transport/include/rosbag2_transport/play_options.hpp index b616a76591..6142fcc963 100644 --- a/rosbag2_transport/include/rosbag2_transport/play_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/play_options.hpp @@ -40,6 +40,10 @@ struct PlayOptions std::unordered_map topic_qos_profile_overrides = {}; bool loop = false; std::vector topic_remapping_options = {}; + + // Rate in Hz at which to publish to /clock. + // 0 (or negative) means that no publisher will be created + float clock_publish_frequency = 0.f; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 6088cf200b..be0b8ba666 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -169,10 +169,14 @@ void Player::play_messages_from_queue() void Player::play_messages_until_queue_empty() { rosbag2_storage::SerializedBagMessageSharedPtr message; + rclcpp::executors::SingleThreadedExecutor exec; + exec.add_node(rosbag2_transport_); while (message_queue_.try_dequeue(message) && rclcpp::ok()) { // Do not move on until sleep_until returns true // It will always sleep, so this is not a tight busy loop on pause - while (rclcpp::ok() && !clock_->sleep_until(message->time_stamp)) {} + do { + exec.spin_some(); + } while (rclcpp::ok() && !clock_->sleep_until(message->time_stamp)); if (rclcpp::ok()) { auto publisher_iter = publishers_.find(message->topic_name); if (publisher_iter != publishers_.end()) { @@ -216,8 +220,27 @@ void Player::prepare_publishers(const PlayOptions & options) void Player::prepare_clock(const PlayOptions & options, rcutils_time_point_value_t starting_time) { - double rate = options.rate > 0.0 ? options.rate : 1.0; - clock_ = std::make_unique(starting_time, rate); + float rate = options.rate > 0.0 ? options.rate : 1.0; + std::chrono::nanoseconds sleep_timeout{25000000}; // default to 40Hz maximum spin + + // Create /clock publisher + if (options.clock_publish_frequency > 0.f) { + // NOTE: PlayerClock does not own this publisher because rosbag2_cpp + // should not own transport-based functionality + std::chrono::duration clock_period{1.0 / options.clock_publish_frequency}; + // When publishing clock, make sure we wake up often enough to spin and publish + sleep_timeout = std::chrono::duration_cast(clock_period); + clock_publisher_ = rosbag2_transport_->create_publisher( + "/clock", rclcpp::ClockQoS()); + clock_publish_timer_ = rosbag2_transport_->create_wall_timer( + sleep_timeout, [this]() { + auto msg = rosgraph_msgs::msg::Clock(); + msg.clock = rclcpp::Time(clock_->now()); + clock_publisher_->publish(msg); + }); + } + + clock_ = std::make_unique(starting_time, sleep_timeout, rate); } } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/player.hpp b/rosbag2_transport/src/rosbag2_transport/player.hpp index d65a6dd530..76b1d83e99 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.hpp +++ b/rosbag2_transport/src/rosbag2_transport/player.hpp @@ -23,12 +23,12 @@ #include #include "moodycamel/readerwriterqueue.h" - +#include "rclcpp/publisher.hpp" #include "rclcpp/qos.hpp" - #include "rosbag2_cpp/clocks/player_clock.hpp" #include "rosbag2_storage/serialized_bag_message.hpp" #include "rosbag2_transport/play_options.hpp" +#include "rosgraph_msgs/msg/clock.hpp" namespace rosbag2_cpp { @@ -69,6 +69,8 @@ class Player std::unordered_map> publishers_; std::unordered_map topic_qos_profile_overrides_; std::unique_ptr clock_; + rclcpp::Publisher::SharedPtr clock_publisher_; + std::shared_ptr clock_publish_timer_; }; } // namespace rosbag2_transport