Skip to content

Commit

Permalink
Add seek API to the Player class
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
  • Loading branch information
MichaelOrlov committed Jul 31, 2021
1 parent 016f2d7 commit e943710
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 22 deletions.
21 changes: 16 additions & 5 deletions rosbag2_transport/include/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,8 @@ class Player : public rclcpp::Node
ROSBAG2_TRANSPORT_PUBLIC
double get_rate() const;

/// Set the playback rate.
/**
* Set the playback rate.
* \return false if an invalid value was provided (<= 0).
*/
/// \brief Set the playback rate.
/// \return false if an invalid value was provided (<= 0).
ROSBAG2_TRANSPORT_PUBLIC
bool set_rate(double);

Expand All @@ -122,6 +119,16 @@ class Player : public rclcpp::Node
ROSBAG2_TRANSPORT_PUBLIC
bool play_next();

/// \brief Advance player to the message with closest timestamp >= time_point.
/// \details This is blocking call and it will wait until current message will be published
/// and message queue will be refilled. Could take substantial amount of time to find
/// required message or return player to the original stage.
/// \param time_point Time point in ROS playback timeline.
/// \throw runtime_error if fail to restore message queue.
/// \return true if valid time in bag duration and successful seek operation, otherwise false.
ROSBAG2_TRANSPORT_PUBLIC
bool seek(rcutils_time_point_value_t time_point);

protected:
bool is_ready_to_play_from_queue_{false};
std::mutex ready_to_play_from_queue_mutex_;
Expand All @@ -138,8 +145,12 @@ class Player : public rclcpp::Node
void play_messages_from_queue();
void prepare_publishers();
bool publish_message(rosbag2_storage::SerializedBagMessageSharedPtr message);
void restore_message_queue_from_storage(
rosbag2_storage::SerializedBagMessageSharedPtr origin_message) RCPPUTILS_TSA_REQUIRES(
reader_mutex_);
static constexpr double read_ahead_lower_bound_percentage_ = 0.9;
static const std::chrono::milliseconds queue_read_wait_period_;
std::atomic_bool cancel_wait_for_next_message_{false};

std::mutex reader_mutex_;
std::unique_ptr<rosbag2_cpp::Reader> reader_ RCPPUTILS_TSA_GUARDED_BY(reader_mutex_);
Expand Down
158 changes: 141 additions & 17 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,128 @@ bool Player::play_next()
return next_message_published;
}

void Player::restore_message_queue_from_storage(
rosbag2_storage::SerializedBagMessageSharedPtr origin_message)
{
reader_->close();
reader_->open(storage_options_, {"", rmw_get_serialization_format()});
rosbag2_storage::SerializedBagMessageSharedPtr current_message = reader_->read_next();
while (current_message->time_stamp != origin_message->time_stamp) {
current_message = reader_->read_next();
}
if (current_message->time_stamp != origin_message->time_stamp) {
throw std::runtime_error("Fail to restore message queue");
}
message_queue_.enqueue(current_message);
enqueue_up_to_boundary(play_options_.read_ahead_queue_size); // refill queue
}

bool Player::seek(rcutils_time_point_value_t time_point)
{
// Temporary stop playback in play_messages_from_queue() and block play_next()
std::lock_guard<std::mutex> main_play_loop_lk(skip_message_in_main_play_loop_mutex_);
skip_message_in_main_play_loop_ = true;
// Wait for player to be ready for playback messages from queue i.e. wait for Player:play() to
// be called if not yet and queue to be filled with messages.
{
std::unique_lock<std::mutex> lk(ready_to_play_from_queue_mutex_);
ready_to_play_from_queue_cv_.wait(lk, [this] {return is_ready_to_play_from_queue_;});
}

cancel_wait_for_next_message_ = true;
bool message_found = false;
rosbag2_storage::SerializedBagMessageSharedPtr * message_ptr = peek_next_message_from_queue();
rosbag2_storage::SerializedBagMessageSharedPtr message_before_seek;
if (message_ptr != nullptr) {
message_before_seek = *message_ptr;
}

{ // Update message queue and adjust reader pointer
std::lock_guard<std::mutex> lk(reader_mutex_);
// Read next current_message
rosbag2_storage::SerializedBagMessageSharedPtr current_message;
if (!message_queue_.try_dequeue(current_message)) {
if (!reader_->has_next()) {
reader_->close();
reader_->open(storage_options_, {"", rmw_get_serialization_format()});
if (!reader_->has_next()) {
return false; // The bag is empty
}
}
current_message = reader_->read_next();
}

const bool jump_forward = (time_point >= current_message->time_stamp);

if (jump_forward) {
if (current_message->time_stamp >= time_point) {
message_found = true;
}
// Pop up current_message queue trying to find current_message with needed timestamp
while (!message_found && message_queue_.try_dequeue(current_message)) {
if (current_message->time_stamp >= time_point) {
message_found = true;
}
}
while (!message_found && reader_->has_next()) {
current_message = reader_->read_next();
if (current_message->time_stamp >= time_point) {
message_found = true;
}
}
if (message_found) {
// Enqueue current_message and rest of the queue to the queue again to preserve order of
// messages
if (message_queue_.peek() != nullptr) { // if message_queue_ not empty
decltype(message_queue_) temp_message_queue;
temp_message_queue.enqueue(current_message);
// Copy residual messages from message_queue_ to the temp_message_queue
while (message_queue_.try_dequeue(current_message)) {
temp_message_queue.enqueue(current_message);
}
// Copy all messages back to the message_queue_
while (temp_message_queue.try_dequeue(current_message)) {
message_queue_.enqueue(current_message);
}
enqueue_up_to_boundary(play_options_.read_ahead_queue_size); // refill queue
} else {
message_queue_.enqueue(current_message);
enqueue_up_to_boundary(play_options_.read_ahead_queue_size); // refill queue
}
} else {
restore_message_queue_from_storage(message_before_seek);
}
} else { // jump_backward
// Purge queue
while (message_queue_.pop()) {}
// Reopen bag
reader_->close();
reader_->open(storage_options_, {"", rmw_get_serialization_format()});

// Try to find current_message in bag with timestamp >= time_point
while (!message_found && reader_->has_next()) {
current_message = reader_->read_next();
if (current_message->time_stamp >= time_point) {
message_found = true;
}
}
if (message_found) {
message_queue_.enqueue(current_message);
enqueue_up_to_boundary(play_options_.read_ahead_queue_size); // refill queue
} else {
restore_message_queue_from_storage(message_before_seek);
}
}
}

if (message_found) {
clock_->jump(time_point);
return true;
} else {
return false;
}
}

void Player::wait_for_filled_queue() const
{
while (
Expand Down Expand Up @@ -405,25 +527,27 @@ void Player::play_messages_from_queue()
ready_to_play_from_queue_cv_.notify_all();
}
while (message_ptr != nullptr && rclcpp::ok()) {
{
rosbag2_storage::SerializedBagMessageSharedPtr message = *message_ptr;
// Do not move on until sleep_until returns true
// It will always sleep, so this is not a tight busy loop on pause
while (rclcpp::ok() && !clock_->sleep_until(message->time_stamp)) {}
if (rclcpp::ok()) {
{
std::lock_guard<std::mutex> lk(skip_message_in_main_play_loop_mutex_);
if (skip_message_in_main_play_loop_) {
skip_message_in_main_play_loop_ = false;
message_ptr = peek_next_message_from_queue();
continue;
}
}
publish_message(message);
rosbag2_storage::SerializedBagMessageSharedPtr message = *message_ptr;
// Do not move on until sleep_until returns true
// It will always sleep, so this is not a tight busy loop on pause
while (rclcpp::ok() && !clock_->sleep_until(message->time_stamp)) {
if (cancel_wait_for_next_message_) {
cancel_wait_for_next_message_ = false;
break;
}
message_queue_.pop();
message_ptr = peek_next_message_from_queue();
}
std::lock_guard<std::mutex> lk(skip_message_in_main_play_loop_mutex_);
if (rclcpp::ok()) {
if (skip_message_in_main_play_loop_) {
skip_message_in_main_play_loop_ = false;
cancel_wait_for_next_message_ = false;
message_ptr = peek_next_message_from_queue();
continue;
}
publish_message(message);
}
message_queue_.pop();
message_ptr = peek_next_message_from_queue();
}
}

Expand Down

0 comments on commit e943710

Please sign in to comment.