Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add play-for functionality #960

Merged
merged 9 commits into from
Jun 2, 2022
9 changes: 9 additions & 0 deletions ros2bag/ros2bag/verb/play.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ def add_arguments(self, parser, cli_name): # noqa: D102
parser.add_argument(
'-d', '--delay', type=positive_float, default=0.0,
help='Sleep duration before play (each loop), in seconds. Negative durations invalid.')
parser.add_argument(
'--playback_duration', type=float, default=-1.0,
help='Playback duration, in seconds. Negative durations mark an infinite playback. '
'Default is -1.0.')
parser.add_argument(
'--disable-keyboard-controls', action='store_true',
help='disables keyboard controls for playback')
Expand All @@ -111,6 +115,10 @@ def add_arguments(self, parser, cli_name): # noqa: D102
'By default, if loaned message can be used, messages are published as loaned '
'message. It can help to reduce the number of data copies, so there is a greater '
'benefit for sending big data.')
parser.add_argument(
'-f', '--duration', type=float, default=None,
help='Play for SEC seconds. Default is None, meaning that playback will continue '
'until the end of the bag. Valid range > 0.0')

def main(self, *, args): # noqa: D102
qos_profile_overrides = {} # Specify a valid default
Expand Down Expand Up @@ -146,6 +154,7 @@ def main(self, *, args): # noqa: D102
play_options.topic_remapping_options = topic_remapping
play_options.clock_publish_frequency = args.clock
play_options.delay = args.delay
play_options.playback_duration = args.playback_duration
play_options.disable_keyboard_controls = args.disable_keyboard_controls
play_options.start_paused = args.start_paused
play_options.start_offset = args.start_offset
Expand Down
1 change: 1 addition & 0 deletions rosbag2_interfaces/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ rosidl_generate_interfaces(${PROJECT_NAME}
"srv/GetRate.srv"
"srv/IsPaused.srv"
"srv/Pause.srv"
"srv/Play.srv"
"srv/PlayNext.srv"
"srv/Resume.srv"
"srv/Seek.srv"
Expand Down
7 changes: 7 additions & 0 deletions rosbag2_interfaces/srv/Play.srv
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# See rosbag2_transport::PlayOptions::start_offset
builtin_interfaces/Time start_offset
# See rosbag2_transport::PlayOptions::playback_duration
builtin_interfaces/Duration playback_duration
---
# returns false when another playback execution is running, otherwise true
bool success
17 changes: 16 additions & 1 deletion rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,17 @@ struct OptionsWrapper : public T
static_cast<rcl_duration_value_t>(RCUTILS_S_TO_NS(delay)));
}

double getPlaybackDuration() const
{
return RCUTILS_NS_TO_S(static_cast<double>(this->playback_duration.nanoseconds()));
}

void setPlaybackDuration(double playback_duration)
{
this->playback_duration = rclcpp::Duration::from_nanoseconds(
static_cast<rcl_duration_value_t>(RCUTILS_S_TO_NS(playback_duration)));
}

double getDelay() const
{
return RCUTILS_NS_TO_S(static_cast<double>(this->delay.nanoseconds()));
Expand Down Expand Up @@ -241,6 +252,10 @@ PYBIND11_MODULE(_transport, m) {
"delay",
&PlayOptions::getDelay,
&PlayOptions::setDelay)
.def_property(
"playback_duration",
&PlayOptions::getPlaybackDuration,
&PlayOptions::setPlaybackDuration)
.def_readwrite("disable_keyboard_controls", &PlayOptions::disable_keyboard_controls)
.def_readwrite("start_paused", &PlayOptions::start_paused)
.def_property(
Expand Down Expand Up @@ -278,7 +293,7 @@ PYBIND11_MODULE(_transport, m) {

py::class_<rosbag2_py::Player>(m, "Player")
.def(py::init())
.def("play", &rosbag2_py::Player::play)
.def("play", &rosbag2_py::Player::play, py::arg("storage_options"), py::arg("play_options"))
;

py::class_<rosbag2_py::Recorder>(m, "Recorder")
Expand Down
6 changes: 6 additions & 0 deletions rosbag2_transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ function(create_tests_for_rmw_implementation)
AMENT_DEPS test_msgs rosbag2_test_common
${SKIP_TEST})

rosbag2_transport_add_gmock(test_play_for
test/rosbag2_transport/test_play_for.cpp
INCLUDE_DIRS $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/src/rosbag2_transport>
LINK_LIBS rosbag2_transport
AMENT_DEPS test_msgs rosbag2_test_common)

rosbag2_transport_add_gmock(test_play_loop
test/rosbag2_transport/test_play_loop.cpp
LINK_LIBS rosbag2_transport
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ struct PlayOptions
// Sleep before play. Negative durations invalid. Will delay at the beginning of each loop.
rclcpp::Duration delay = rclcpp::Duration(0, 0);

// Determines the maximum duration of the playback. Negative durations will make the playback to
// not stop. Default configuration makes the player to not stop execution.
rclcpp::Duration playback_duration = rclcpp::Duration(-1, 0);

// Start paused.
bool start_paused = false;

Expand Down
8 changes: 7 additions & 1 deletion rosbag2_transport/include/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <functional>
#include <future>
#include <memory>
#include <optional>
#include <queue>
#include <string>
#include <unordered_map>
Expand All @@ -37,6 +38,7 @@
#include "rosbag2_interfaces/srv/get_rate.hpp"
#include "rosbag2_interfaces/srv/is_paused.hpp"
#include "rosbag2_interfaces/srv/pause.hpp"
#include "rosbag2_interfaces/srv/play.hpp"
#include "rosbag2_interfaces/srv/play_next.hpp"
#include "rosbag2_interfaces/srv/burst.hpp"
#include "rosbag2_interfaces/srv/resume.hpp"
Expand Down Expand Up @@ -94,7 +96,7 @@ class Player : public rclcpp::Node
virtual ~Player();

ROSBAG2_TRANSPORT_PUBLIC
void play();
bool play();

// Playback control interface
/// Pause the flow of time for playback.
Expand Down Expand Up @@ -218,14 +220,17 @@ class Player : public rclcpp::Node

rosbag2_storage::StorageOptions storage_options_;
rosbag2_transport::PlayOptions play_options_;
rcutils_time_point_value_t play_until_time_ = -1;
moodycamel::ReaderWriterQueue<rosbag2_storage::SerializedBagMessageSharedPtr> message_queue_;
mutable std::future<void> storage_loading_future_;
std::atomic_bool load_storage_content_{true};
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides_;
std::unique_ptr<rosbag2_cpp::PlayerClock> clock_;
std::shared_ptr<rclcpp::TimerBase> clock_publish_timer_;
std::mutex skip_message_in_main_play_loop_mutex_;
bool skip_message_in_main_play_loop_ RCPPUTILS_TSA_GUARDED_BY(
skip_message_in_main_play_loop_mutex_) = false;
std::atomic_bool is_in_playback_{false};

rcutils_time_point_value_t starting_time_;

Expand All @@ -236,6 +241,7 @@ class Player : public rclcpp::Node
rclcpp::Service<rosbag2_interfaces::srv::IsPaused>::SharedPtr srv_is_paused_;
rclcpp::Service<rosbag2_interfaces::srv::GetRate>::SharedPtr srv_get_rate_;
rclcpp::Service<rosbag2_interfaces::srv::SetRate>::SharedPtr srv_set_rate_;
rclcpp::Service<rosbag2_interfaces::srv::Play>::SharedPtr srv_play_;
rclcpp::Service<rosbag2_interfaces::srv::PlayNext>::SharedPtr srv_play_next_;
rclcpp::Service<rosbag2_interfaces::srv::Burst>::SharedPtr srv_burst_;
rclcpp::Service<rosbag2_interfaces::srv::Seek>::SharedPtr srv_seek_;
Expand Down
47 changes: 43 additions & 4 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ Player::Player(
set_rate(play_options_.rate);
topic_qos_profile_overrides_ = play_options_.topic_qos_profile_overrides;
prepare_publishers();

if (play_options_.playback_duration >= rclcpp::Duration(0, 0)) {
play_until_time_ = starting_time_ + play_options_.playback_duration.nanoseconds();
}
}
create_control_services();
add_keyboard_callbacks();
Expand Down Expand Up @@ -191,8 +195,13 @@ bool Player::is_storage_completely_loaded() const
return !storage_loading_future_.valid();
}

void Player::play()
bool Player::play()
{
if (is_in_playback_.exchange(true)) {
RCLCPP_WARN_STREAM(get_logger(), "Trying to play() while in playback, dismissing request.");
return false;
}

rclcpp::Duration delay(0, 0);
if (play_options_.delay >= rclcpp::Duration(0, 0)) {
delay = play_options_.delay;
Expand All @@ -202,21 +211,28 @@ void Player::play()
"Invalid delay value: " << play_options_.delay.nanoseconds() << ". Delay is disabled.");
}

RCLCPP_INFO_STREAM(get_logger(), "Playback duration value: " << play_until_time_);

try {
do {
if (delay > rclcpp::Duration(0, 0)) {
RCLCPP_INFO_STREAM(get_logger(), "Sleep " << delay.nanoseconds() << " ns");
std::chrono::nanoseconds duration(delay.nanoseconds());
std::this_thread::sleep_for(duration);
std::chrono::nanoseconds delay_duration(delay.nanoseconds());
std::this_thread::sleep_for(delay_duration);
}
{
std::lock_guard<std::mutex> lk(reader_mutex_);
reader_->seek(starting_time_);
clock_->jump(starting_time_);
}
load_storage_content_ = true;
storage_loading_future_ = std::async(std::launch::async, [this]() {load_storage_content();});
wait_for_filled_queue();
play_messages_from_queue();

load_storage_content_ = false;
if (storage_loading_future_.valid()) {storage_loading_future_.get();}
while (message_queue_.pop()) {} // cleanup queue
{
std::lock_guard<std::mutex> lk(ready_to_play_from_queue_mutex_);
is_ready_to_play_from_queue_ = false;
Expand All @@ -225,6 +241,9 @@ void Player::play()
} while (rclcpp::ok() && play_options_.loop);
} catch (std::runtime_error & e) {
RCLCPP_ERROR(get_logger(), "Failed to play: %s", e.what());
load_storage_content_ = false;
if (storage_loading_future_.valid()) {storage_loading_future_.get();}
while (message_queue_.pop()) {} // cleanup queue
}
std::lock_guard<std::mutex> lk(ready_to_play_from_queue_mutex_);
is_ready_to_play_from_queue_ = false;
Expand Down Expand Up @@ -254,6 +273,9 @@ void Player::play()
}
}
}

is_in_playback_ = false;
return true;
}

void Player::pause()
Expand Down Expand Up @@ -344,6 +366,9 @@ bool Player::play_next()

bool next_message_published = false;
while (message_ptr != nullptr && !next_message_published) {
if (play_until_time_ >= starting_time_ && message_ptr->time_stamp > play_until_time_) {
break;
}
{
next_message_published = publish_message(message_ptr);
clock_->jump(message_ptr->time_stamp);
Expand Down Expand Up @@ -395,6 +420,7 @@ void Player::seek(rcutils_time_point_value_t time_point)
// Restart queuing thread if it has finished running (previously reached end of bag),
// otherwise, queueing should continue automatically after releasing mutex
if (is_storage_completely_loaded() && rclcpp::ok()) {
load_storage_content_ = true;
storage_loading_future_ =
std::async(std::launch::async, [this]() {load_storage_content();});
}
Expand All @@ -417,7 +443,7 @@ void Player::load_storage_content()
static_cast<size_t>(play_options_.read_ahead_queue_size * read_ahead_lower_bound_percentage_);
auto queue_upper_boundary = play_options_.read_ahead_queue_size;

while (rclcpp::ok()) {
while (rclcpp::ok() && load_storage_content_) {
TSAUniqueLock lk(reader_mutex_);
if (!reader_->has_next()) {break;}

Expand Down Expand Up @@ -456,6 +482,9 @@ void Player::play_messages_from_queue()
ready_to_play_from_queue_cv_.notify_all();
}
while (message_ptr != nullptr && rclcpp::ok()) {
if (play_until_time_ >= starting_time_ && message_ptr->time_stamp > play_until_time_) {
break;
}
// Do not move on until sleep_until returns true
// It will always sleep, so this is not a tight busy loop on pause
while (rclcpp::ok() && !clock_->sleep_until(message_ptr->time_stamp)) {
Expand Down Expand Up @@ -678,6 +707,16 @@ void Player::create_control_services()
{
response->success = set_rate(request->rate);
});
srv_play_ = create_service<rosbag2_interfaces::srv::Play>(
"~/play",
[this](
rosbag2_interfaces::srv::Play::Request::ConstSharedPtr request,
rosbag2_interfaces::srv::Play::Response::SharedPtr response)
{
play_options_.start_offset = rclcpp::Time(request->start_offset).nanoseconds();
play_options_.playback_duration = rclcpp::Duration(request->playback_duration);
response->success = play();
});
srv_play_next_ = create_service<rosbag2_interfaces::srv::PlayNext>(
"~/play_next",
[this](
Expand Down
Loading