From e943710fc6f4efd23e3fa83aff70b62289c3649b Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Fri, 30 Jul 2021 17:45:10 -0700 Subject: [PATCH] Add seek API to the Player class Signed-off-by: Michael Orlov --- .../include/rosbag2_transport/player.hpp | 21 ++- .../src/rosbag2_transport/player.cpp | 158 ++++++++++++++++-- 2 files changed, 157 insertions(+), 22 deletions(-) diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index 5c5aaa088e..390a1b61fb 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -105,11 +105,8 @@ class Player : public rclcpp::Node ROSBAG2_TRANSPORT_PUBLIC double get_rate() const; - /// Set the playback rate. - /** - * Set the playback rate. - * \return false if an invalid value was provided (<= 0). - */ + /// \brief Set the playback rate. + /// \return false if an invalid value was provided (<= 0). ROSBAG2_TRANSPORT_PUBLIC bool set_rate(double); @@ -122,6 +119,16 @@ class Player : public rclcpp::Node ROSBAG2_TRANSPORT_PUBLIC bool play_next(); + /// \brief Advance player to the message with closest timestamp >= time_point. + /// \details This is blocking call and it will wait until current message will be published + /// and message queue will be refilled. Could take substantial amount of time to find + /// required message or return player to the original stage. + /// \param time_point Time point in ROS playback timeline. + /// \throw runtime_error if fail to restore message queue. + /// \return true if valid time in bag duration and successful seek operation, otherwise false. + ROSBAG2_TRANSPORT_PUBLIC + bool seek(rcutils_time_point_value_t time_point); + protected: bool is_ready_to_play_from_queue_{false}; std::mutex ready_to_play_from_queue_mutex_; @@ -138,8 +145,12 @@ class Player : public rclcpp::Node void play_messages_from_queue(); void prepare_publishers(); bool publish_message(rosbag2_storage::SerializedBagMessageSharedPtr message); + void restore_message_queue_from_storage( + rosbag2_storage::SerializedBagMessageSharedPtr origin_message) RCPPUTILS_TSA_REQUIRES( + reader_mutex_); static constexpr double read_ahead_lower_bound_percentage_ = 0.9; static const std::chrono::milliseconds queue_read_wait_period_; + std::atomic_bool cancel_wait_for_next_message_{false}; std::mutex reader_mutex_; std::unique_ptr reader_ RCPPUTILS_TSA_GUARDED_BY(reader_mutex_); diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index d2ba2a3ac0..9276b0ca0d 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -350,6 +350,128 @@ bool Player::play_next() return next_message_published; } +void Player::restore_message_queue_from_storage( + rosbag2_storage::SerializedBagMessageSharedPtr origin_message) +{ + reader_->close(); + reader_->open(storage_options_, {"", rmw_get_serialization_format()}); + rosbag2_storage::SerializedBagMessageSharedPtr current_message = reader_->read_next(); + while (current_message->time_stamp != origin_message->time_stamp) { + current_message = reader_->read_next(); + } + if (current_message->time_stamp != origin_message->time_stamp) { + throw std::runtime_error("Fail to restore message queue"); + } + message_queue_.enqueue(current_message); + enqueue_up_to_boundary(play_options_.read_ahead_queue_size); // refill queue +} + +bool Player::seek(rcutils_time_point_value_t time_point) +{ + // Temporary stop playback in play_messages_from_queue() and block play_next() + std::lock_guard main_play_loop_lk(skip_message_in_main_play_loop_mutex_); + skip_message_in_main_play_loop_ = true; + // Wait for player to be ready for playback messages from queue i.e. wait for Player:play() to + // be called if not yet and queue to be filled with messages. + { + std::unique_lock lk(ready_to_play_from_queue_mutex_); + ready_to_play_from_queue_cv_.wait(lk, [this] {return is_ready_to_play_from_queue_;}); + } + + cancel_wait_for_next_message_ = true; + bool message_found = false; + rosbag2_storage::SerializedBagMessageSharedPtr * message_ptr = peek_next_message_from_queue(); + rosbag2_storage::SerializedBagMessageSharedPtr message_before_seek; + if (message_ptr != nullptr) { + message_before_seek = *message_ptr; + } + + { // Update message queue and adjust reader pointer + std::lock_guard lk(reader_mutex_); + // Read next current_message + rosbag2_storage::SerializedBagMessageSharedPtr current_message; + if (!message_queue_.try_dequeue(current_message)) { + if (!reader_->has_next()) { + reader_->close(); + reader_->open(storage_options_, {"", rmw_get_serialization_format()}); + if (!reader_->has_next()) { + return false; // The bag is empty + } + } + current_message = reader_->read_next(); + } + + const bool jump_forward = (time_point >= current_message->time_stamp); + + if (jump_forward) { + if (current_message->time_stamp >= time_point) { + message_found = true; + } + // Pop up current_message queue trying to find current_message with needed timestamp + while (!message_found && message_queue_.try_dequeue(current_message)) { + if (current_message->time_stamp >= time_point) { + message_found = true; + } + } + while (!message_found && reader_->has_next()) { + current_message = reader_->read_next(); + if (current_message->time_stamp >= time_point) { + message_found = true; + } + } + if (message_found) { + // Enqueue current_message and rest of the queue to the queue again to preserve order of + // messages + if (message_queue_.peek() != nullptr) { // if message_queue_ not empty + decltype(message_queue_) temp_message_queue; + temp_message_queue.enqueue(current_message); + // Copy residual messages from message_queue_ to the temp_message_queue + while (message_queue_.try_dequeue(current_message)) { + temp_message_queue.enqueue(current_message); + } + // Copy all messages back to the message_queue_ + while (temp_message_queue.try_dequeue(current_message)) { + message_queue_.enqueue(current_message); + } + enqueue_up_to_boundary(play_options_.read_ahead_queue_size); // refill queue + } else { + message_queue_.enqueue(current_message); + enqueue_up_to_boundary(play_options_.read_ahead_queue_size); // refill queue + } + } else { + restore_message_queue_from_storage(message_before_seek); + } + } else { // jump_backward + // Purge queue + while (message_queue_.pop()) {} + // Reopen bag + reader_->close(); + reader_->open(storage_options_, {"", rmw_get_serialization_format()}); + + // Try to find current_message in bag with timestamp >= time_point + while (!message_found && reader_->has_next()) { + current_message = reader_->read_next(); + if (current_message->time_stamp >= time_point) { + message_found = true; + } + } + if (message_found) { + message_queue_.enqueue(current_message); + enqueue_up_to_boundary(play_options_.read_ahead_queue_size); // refill queue + } else { + restore_message_queue_from_storage(message_before_seek); + } + } + } + + if (message_found) { + clock_->jump(time_point); + return true; + } else { + return false; + } +} + void Player::wait_for_filled_queue() const { while ( @@ -405,25 +527,27 @@ void Player::play_messages_from_queue() ready_to_play_from_queue_cv_.notify_all(); } while (message_ptr != nullptr && rclcpp::ok()) { - { - rosbag2_storage::SerializedBagMessageSharedPtr message = *message_ptr; - // Do not move on until sleep_until returns true - // It will always sleep, so this is not a tight busy loop on pause - while (rclcpp::ok() && !clock_->sleep_until(message->time_stamp)) {} - if (rclcpp::ok()) { - { - std::lock_guard lk(skip_message_in_main_play_loop_mutex_); - if (skip_message_in_main_play_loop_) { - skip_message_in_main_play_loop_ = false; - message_ptr = peek_next_message_from_queue(); - continue; - } - } - publish_message(message); + rosbag2_storage::SerializedBagMessageSharedPtr message = *message_ptr; + // Do not move on until sleep_until returns true + // It will always sleep, so this is not a tight busy loop on pause + while (rclcpp::ok() && !clock_->sleep_until(message->time_stamp)) { + if (cancel_wait_for_next_message_) { + cancel_wait_for_next_message_ = false; + break; } - message_queue_.pop(); - message_ptr = peek_next_message_from_queue(); } + std::lock_guard lk(skip_message_in_main_play_loop_mutex_); + if (rclcpp::ok()) { + if (skip_message_in_main_play_loop_) { + skip_message_in_main_play_loop_ = false; + cancel_wait_for_next_message_ = false; + message_ptr = peek_next_message_from_queue(); + continue; + } + publish_message(message); + } + message_queue_.pop(); + message_ptr = peek_next_message_from_queue(); } }