From 2be261fbd70b99920c6b1733eb3400bcab3d5247 Mon Sep 17 00:00:00 2001 From: Geoffrey Biggs Date: Tue, 15 Feb 2022 21:37:53 +0900 Subject: [PATCH 1/9] Add play-for functionality Signed-off-by: Geoffrey Biggs --- rosbag2_interfaces/CMakeLists.txt | 1 + rosbag2_interfaces/srv/PlayFor.srv | 3 + .../subscription_manager.hpp | 29 ++ rosbag2_transport/CMakeLists.txt | 8 + .../include/rosbag2_transport/player.hpp | 7 +- .../src/rosbag2_transport/player.cpp | 32 +- .../test/rosbag2_transport/test_play_for.cpp | 275 ++++++++++++++++++ .../test/rosbag2_transport/test_play_loop.cpp | 2 +- 8 files changed, 349 insertions(+), 8 deletions(-) create mode 100644 rosbag2_interfaces/srv/PlayFor.srv create mode 100644 rosbag2_transport/test/rosbag2_transport/test_play_for.cpp diff --git a/rosbag2_interfaces/CMakeLists.txt b/rosbag2_interfaces/CMakeLists.txt index 9b82745b2d..69c02ce1a2 100644 --- a/rosbag2_interfaces/CMakeLists.txt +++ b/rosbag2_interfaces/CMakeLists.txt @@ -17,6 +17,7 @@ rosidl_generate_interfaces(${PROJECT_NAME} "srv/GetRate.srv" "srv/IsPaused.srv" "srv/Pause.srv" + "srv/PlayFor.srv" "srv/PlayNext.srv" "srv/Resume.srv" "srv/Seek.srv" diff --git a/rosbag2_interfaces/srv/PlayFor.srv b/rosbag2_interfaces/srv/PlayFor.srv new file mode 100644 index 0000000000..8f60cb07b4 --- /dev/null +++ b/rosbag2_interfaces/srv/PlayFor.srv @@ -0,0 +1,3 @@ +builtin_interfaces/Duration duration +--- +bool success # return true diff --git a/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp b/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp index f37fa75477..15ea840112 100644 --- a/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp +++ b/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp @@ -129,6 +129,18 @@ class SubscriptionManager }); } + std::future spin_subscriptions_for(rcutils_duration_value_t duration) + { + return async( + std::launch::async, [this, duration]() { + rclcpp::executors::SingleThreadedExecutor exec; + auto end = std::chrono::high_resolution_clock::now() + std::chrono::nanoseconds(duration); + while (rclcpp::ok() && continue_spinning_until(expected_topics_with_size_, end)) { + exec.spin_node_some(subscriber_node_); + } + }); + } + private: bool continue_spinning( const std::unordered_map & expected_topics_with_sizes, @@ -147,6 +159,23 @@ class SubscriptionManager return false; } + bool continue_spinning_until( + const std::unordered_map & expected_topics_with_sizes, + std::chrono::time_point end_time) + { + auto current = std::chrono::high_resolution_clock::now(); + if (current > end_time) { + return false; + } + + for (const auto & topic_expected : expected_topics_with_sizes) { + if (subscribed_messages_[topic_expected.first].size() < topic_expected.second) { + return true; + } + } + return false; + } + std::vector subscriptions_; std::unordered_map>> subscribed_messages_; diff --git a/rosbag2_transport/CMakeLists.txt b/rosbag2_transport/CMakeLists.txt index e7d341fb66..f2b7eaf831 100644 --- a/rosbag2_transport/CMakeLists.txt +++ b/rosbag2_transport/CMakeLists.txt @@ -110,6 +110,14 @@ function(create_tests_for_rmw_implementation) AMENT_DEPS test_msgs rosbag2_test_common ${SKIP_TEST}) + rosbag2_transport_add_gmock(test_play_for + src/rosbag2_transport/qos.cpp + test/rosbag2_transport/test_play_for.cpp + INCLUDE_DIRS $ + LINK_LIBS rosbag2_transport + AMENT_DEPS test_msgs rosbag2_test_common + ${SKIP_TEST}) + rosbag2_transport_add_gmock(test_play_loop test/rosbag2_transport/test_play_loop.cpp LINK_LIBS rosbag2_transport diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index 02b0741f8d..358edc5a30 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -37,6 +38,7 @@ #include "rosbag2_interfaces/srv/get_rate.hpp" #include "rosbag2_interfaces/srv/is_paused.hpp" #include "rosbag2_interfaces/srv/pause.hpp" +#include "rosbag2_interfaces/srv/play_for.hpp" #include "rosbag2_interfaces/srv/play_next.hpp" #include "rosbag2_interfaces/srv/burst.hpp" #include "rosbag2_interfaces/srv/resume.hpp" @@ -94,7 +96,7 @@ class Player : public rclcpp::Node virtual ~Player(); ROSBAG2_TRANSPORT_PUBLIC - void play(); + void play(const std::optional & duration = std::nullopt); // Playback control interface /// Pause the flow of time for playback. @@ -198,7 +200,7 @@ class Player : public rclcpp::Node bool is_storage_completely_loaded() const; void enqueue_up_to_boundary(size_t boundary) RCPPUTILS_TSA_REQUIRES(reader_mutex_); void wait_for_filled_queue() const; - void play_messages_from_queue(); + void play_messages_from_queue(const std::optional & play_until_time); void prepare_publishers(); bool publish_message(rosbag2_storage::SerializedBagMessageSharedPtr message); static constexpr double read_ahead_lower_bound_percentage_ = 0.9; @@ -237,6 +239,7 @@ class Player : public rclcpp::Node rclcpp::Service::SharedPtr srv_get_rate_; rclcpp::Service::SharedPtr srv_set_rate_; rclcpp::Service::SharedPtr srv_play_next_; + rclcpp::Service::SharedPtr srv_play_for_; rclcpp::Service::SharedPtr srv_burst_; rclcpp::Service::SharedPtr srv_seek_; diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index dfae9cae6f..169b5f00c4 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -191,7 +191,7 @@ bool Player::is_storage_completely_loaded() const return !storage_loading_future_.valid(); } -void Player::play() +void Player::play(const std::optional & duration) { rclcpp::Duration delay(0, 0); if (play_options_.delay >= rclcpp::Duration(0, 0)) { @@ -206,8 +206,12 @@ void Player::play() do { if (delay > rclcpp::Duration(0, 0)) { RCLCPP_INFO_STREAM(get_logger(), "Sleep " << delay.nanoseconds() << " ns"); - std::chrono::nanoseconds duration(delay.nanoseconds()); - std::this_thread::sleep_for(duration); + std::chrono::nanoseconds delay_duration(delay.nanoseconds()); + std::this_thread::sleep_for(delay_duration); + } + std::optional play_until_time{}; + if (duration.has_value() && *duration > 0) { + play_until_time = {starting_time_ + *duration}; } { std::lock_guard lk(reader_mutex_); @@ -216,7 +220,7 @@ void Player::play() } storage_loading_future_ = std::async(std::launch::async, [this]() {load_storage_content();}); wait_for_filled_queue(); - play_messages_from_queue(); + play_messages_from_queue({play_until_time}); { std::lock_guard lk(ready_to_play_from_queue_mutex_); is_ready_to_play_from_queue_ = false; @@ -442,7 +446,8 @@ void Player::enqueue_up_to_boundary(size_t boundary) } } -void Player::play_messages_from_queue() +void Player::play_messages_from_queue( + const std::optional & play_until_time) { // Note: We need to use message_queue_.peek() instead of message_queue_.try_dequeue(message) // to support play_next() API logic. @@ -458,6 +463,9 @@ void Player::play_messages_from_queue() while (message_ptr != nullptr && rclcpp::ok()) { // Do not move on until sleep_until returns true // It will always sleep, so this is not a tight busy loop on pause + if (play_until_time.has_value() && message_ptr->time_stamp > *play_until_time) { + break; + } while (rclcpp::ok() && !clock_->sleep_until(message_ptr->time_stamp)) { if (std::atomic_exchange(&cancel_wait_for_next_message_, false)) { break; @@ -686,6 +694,20 @@ void Player::create_control_services() { response->success = play_next(); }); + srv_play_for_ = create_service( + "~/play_for", + [this]( + const std::shared_ptr/* request_header */, + const std::shared_ptr request, + const std::shared_ptr response) + { + const rcutils_duration_value_t duration = + static_cast(request->duration.sec) * + static_cast(1000000000) + + static_cast(request->duration.nanosec); + play({duration}); + response->success = true; + }); srv_burst_ = create_service( "~/burst", [this]( diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp new file mode 100644 index 0000000000..cd18b0f93b --- /dev/null +++ b/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp @@ -0,0 +1,275 @@ +// Copyright 2022, Open Source Robotics Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.Bosch Software Innovations GmbH. + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "rclcpp/rclcpp.hpp" + +#include "rosbag2_test_common/subscription_manager.hpp" + +#include "rosbag2_transport/player.hpp" +#include "rosbag2_transport/qos.hpp" + +#include "test_msgs/msg/arrays.hpp" +#include "test_msgs/msg/basic_types.hpp" +#include "test_msgs/message_fixtures.hpp" + +#include "rosbag2_play_test_fixture.hpp" +#include "rosbag2_transport_test_fixture.hpp" + +using namespace ::testing; // NOLINT +using namespace rosbag2_transport; // NOLINT +using namespace std::chrono_literals; // NOLINT +using namespace rosbag2_test_common; // NOLINT + + +constexpr int kIntValue{32}; + +constexpr float kFloat1Value{40.}; +constexpr float kFloat2Value{2.}; +constexpr float kFloat3Value{0.}; + +constexpr bool kBool1Value{false}; +constexpr bool kBool2Value{true}; +constexpr bool kBool3Value{false}; + +#define EVAL_REPLAYED_PRIMITIVES(replayed_primitives) \ + EXPECT_THAT( \ + replayed_primitives, \ + Each(Pointee(Field(&test_msgs::msg::BasicTypes::int32_value, kIntValue)))) + +#define EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_float_array_primitive) \ + EXPECT_THAT( \ + replayed_float_array_primitive, \ + Each( \ + Pointee( \ + Field( \ + &test_msgs::msg::Arrays::float32_values, \ + ElementsAre(kFloat1Value, kFloat2Value, kFloat3Value))))) + +#define EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_bool_array_primitive) \ + EXPECT_THAT( \ + replayed_bool_array_primitive, \ + Each( \ + Pointee( \ + Field( \ + &test_msgs::msg::Arrays::bool_values, \ + ElementsAre(kBool1Value, kBool2Value, kBool3Value))))) + +class RosBag2PlayForTestFixture : public RosBag2PlayTestFixture +{ +public: + static constexpr const char * kTopic1Name{"topic1"}; + static constexpr const char * kTopic2Name{"topic2"}; + static constexpr const char * kTopic1{"/topic1"}; + static constexpr const char * kTopic2{"/topic2"}; + + std::vector get_topic_types() + { + return {{kTopic1Name, "test_msgs/BasicTypes", "", ""}, + {kTopic2Name, "test_msgs/Arrays", "", ""}}; + } + + std::vector> + get_serialized_messages() + { + auto primitive_message1 = get_messages_basic_types()[0]; + primitive_message1->int32_value = kIntValue; + + auto complex_message1 = get_messages_arrays()[0]; + complex_message1->float32_values = {{kFloat1Value, kFloat2Value, kFloat3Value}}; + complex_message1->bool_values = {{kBool1Value, kBool2Value, kBool3Value}}; + + // @{ Ordering matters. The mock reader implementation moves messages + // around without any knowledge about message chronology. It just picks + // the next one Make sure to keep the list in order or sort it! + std::vector> messages = + {serialize_test_message(kTopic1Name, 500, primitive_message1), + serialize_test_message(kTopic2Name, 550, complex_message1), + serialize_test_message(kTopic1Name, 700, primitive_message1), + serialize_test_message(kTopic2Name, 750, complex_message1), + serialize_test_message(kTopic1Name, 900, primitive_message1), + serialize_test_message(kTopic2Name, 950, complex_message1)}; + // @} + return messages; + } + + void SetUp() override + { + auto topic_types = get_topic_types(); + auto messages = get_serialized_messages(); + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + // Due to a problem related to the subscriber, we play many (3) messages but make the subscriber + // node spin only until 2 have arrived. Hence the 2 as `launch_subscriber()` argument. + sub_->add_subscription(kTopic1, 2); + sub_->add_subscription(kTopic2, 2); + + player_ = std::make_shared( + std::move( + reader), storage_options_, play_options_); + } + + std::shared_ptr player_; + std::future await_received_messages_; +}; + +TEST_F(RosBag2PlayForTestFixture, play_for_all_are_played_due_to_duration) +{ + auto await_received_messages = sub_->spin_subscriptions(); + + player_->play(std::chrono::nanoseconds(std::chrono::milliseconds(1000)).count()); + + await_received_messages.get(); + + auto replayed_test_primitives = sub_->get_received_messages( + kTopic1); + EXPECT_THAT(replayed_test_primitives, SizeIs(Ge(2u))); + EVAL_REPLAYED_PRIMITIVES(replayed_test_primitives); + + auto replayed_test_arrays = sub_->get_received_messages( + kTopic2); + EXPECT_THAT(replayed_test_arrays, SizeIs(Ge(2u))); + EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_test_arrays); + EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays); +} + +TEST_F(RosBag2PlayForTestFixture, play_for_none_are_played_due_to_duration) +{ + const rcutils_duration_value_t duration = + std::chrono::nanoseconds(std::chrono::milliseconds(300)).count(); + + auto await_received_messages = sub_->spin_subscriptions_for(duration); + + player_->play(duration); + + await_received_messages.get(); + + auto replayed_test_primitives = sub_->get_received_messages( + kTopic1); + EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(0u))); + + auto replayed_test_arrays = sub_->get_received_messages( + kTopic2); + EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(0u))); +} + +TEST_F(RosBag2PlayForTestFixture, play_for_less_than_the_total_duration) +{ + const rcutils_duration_value_t duration = + std::chrono::nanoseconds(std::chrono::milliseconds(800)).count(); + + auto await_received_messages = sub_->spin_subscriptions_for(duration); + + player_->play(duration); + + await_received_messages.get(); + + auto replayed_test_primitives = sub_->get_received_messages( + kTopic1); + EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(2u))); + EVAL_REPLAYED_PRIMITIVES(replayed_test_primitives); + + auto replayed_test_arrays = sub_->get_received_messages( + kTopic2); + EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(2u))); + EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_test_arrays); + EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays); +} + +class RosBag2PlayForFilteredTopicTestFixture : public RosBag2PlayForTestFixture +{ +public: + void SetUp() override + { + // Filter allows /topic2, blocks /topic1 + play_options_.topics_to_filter = {"topic2"}; + RosBag2PlayForTestFixture::SetUp(); + } +}; + +TEST_F( + RosBag2PlayForFilteredTopicTestFixture, + play_for_full_duration_recorded_messages_with_filtered_topics) +{ + auto await_received_messages = sub_->spin_subscriptions(); + + player_->play(std::chrono::nanoseconds(std::chrono::milliseconds(1000)).count()); + + await_received_messages.get(); + + auto replayed_test_primitives = + sub_->get_received_messages("/topic1"); + // No messages are allowed to have arrived + EXPECT_THAT(replayed_test_primitives, SizeIs(0u)); + + auto replayed_test_arrays = sub_->get_received_messages("/topic2"); + // All messages should have arrived. + EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(3u))); + EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_test_arrays); + EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays); +} + +TEST_F( + RosBag2PlayForFilteredTopicTestFixture, + play_for_short_duration_recorded_messages_with_filtered_topics) +{ + auto await_received_messages = sub_->spin_subscriptions(); + + player_->play(std::chrono::nanoseconds(std::chrono::milliseconds(300)).count()); + + await_received_messages.get(); + + auto replayed_test_primitives = + sub_->get_received_messages("/topic1"); + // No messages are allowed to have arrived + EXPECT_THAT(replayed_test_primitives, SizeIs(0u)); + + auto replayed_test_arrays = sub_->get_received_messages("/topic2"); + // All messages should have arrived. + EXPECT_THAT(replayed_test_arrays, SizeIs(0u)); +} + +TEST_F( + RosBag2PlayForFilteredTopicTestFixture, + play_for_intermediate_duration_recorded_messages_with_filtered_topics) +{ + auto await_received_messages = sub_->spin_subscriptions(); + + player_->play(std::chrono::nanoseconds(std::chrono::milliseconds(800)).count()); + + await_received_messages.get(); + + auto replayed_test_primitives = + sub_->get_received_messages("/topic1"); + // No messages are allowed to have arrived + EXPECT_THAT(replayed_test_primitives, SizeIs(0u)); + + auto replayed_test_arrays = sub_->get_received_messages("/topic2"); + // Some messages should have arrived. + EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(2u))); + EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_test_arrays); + EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays); +} diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_loop.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_loop.cpp index 65f7795dac..f4c1fed059 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_loop.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_loop.cpp @@ -131,7 +131,7 @@ TEST_F(RosBag2PlayTestFixture, messages_played_in_loop) { auto player = std::make_shared( std::move( reader), storage_options_, play_options); - std::thread loop_thread(&rosbag2_transport::Player::play, player); + std::thread loop_thread(&rosbag2_transport::Player::play, player, std::nullopt); await_received_messages.get(); rclcpp::shutdown(); From 8cefe4c0904d575d389d4285dd896f2106bbe158 Mon Sep 17 00:00:00 2001 From: Geoffrey Biggs Date: Tue, 15 Feb 2022 21:45:55 +0900 Subject: [PATCH 2/9] Add play-for to the CLI Signed-off-by: Geoffrey Biggs --- ros2bag/ros2bag/verb/play.py | 10 +++++++++- rosbag2_py/src/rosbag2_py/_transport.cpp | 9 ++++++--- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index 92e01bcbb1..49947c150e 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -111,6 +111,10 @@ def add_arguments(self, parser, cli_name): # noqa: D102 'By default, if loaned message can be used, messages are published as loaned ' 'message. It can help to reduce the number of data copies, so there is a greater ' 'benefit for sending big data.') + parser.add_argument( + '-f', '--duration', type=float, default=None, + help='Play for SEC seconds. Default is None, meaning that playback will continue ' + 'until the end of the bag. Valid range > 0.0') def main(self, *, args): # noqa: D102 qos_profile_overrides = {} # Specify a valid default @@ -152,5 +156,9 @@ def main(self, *, args): # noqa: D102 play_options.wait_acked_timeout = args.wait_for_all_acked play_options.disable_loan_message = args.disable_loan_message + # Gets the duration in nanoseconds when a value is provided for player + # consumption. + duration = int(args.duration * 1e9) if args.duration else args.duration + player = Player() - player.play(storage_options, play_options) + player.play(storage_options, play_options, duration) diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index bbfa21b73a..a9592d5a36 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -120,7 +120,8 @@ class Player void play( const rosbag2_storage::StorageOptions & storage_options, - PlayOptions & play_options) + PlayOptions & play_options, + const std::optional & duration) { auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(storage_options); auto player = std::make_shared( @@ -132,7 +133,7 @@ class Player [&exec]() { exec.spin(); }); - player->play(); + player->play(duration); exec.cancel(); spin_thread.join(); @@ -278,7 +279,9 @@ PYBIND11_MODULE(_transport, m) { py::class_(m, "Player") .def(py::init()) - .def("play", &rosbag2_py::Player::play) + .def( + "play", &rosbag2_py::Player::play, py::arg("storage_options"), py::arg( + "play_options"), py::arg("duration") = std::nullopt) ; py::class_(m, "Recorder") From 0623117c2185437022d84cdee6b53df77fa7ea07 Mon Sep 17 00:00:00 2001 From: Agustin Alba Chicar Date: Tue, 15 Mar 2022 21:49:14 -0300 Subject: [PATCH 3/9] Solves most of the comments from https://github.com/ros2/rosbag2/pull/960/files (#14) * Add play-for functionality Signed-off-by: Geoffrey Biggs * Add play-for to the CLI Signed-off-by: Geoffrey Biggs * Adds playback_duration to PlayOptions. * Changes from PlayFor to Play srv message and changes start_offset and playback_duration. * Restores play_for tests. * Removes extra SubscriptionManager methods. * Solves comment about extra sent message. * Reorders code and comment. * Removes the SKIP_TEST flag. Co-authored-by: Geoffrey Biggs Signed-off-by: Geoffrey Biggs --- ros2bag/ros2bag/verb/play.py | 5 ++ rosbag2_interfaces/CMakeLists.txt | 2 +- rosbag2_interfaces/srv/Play.srv | 6 ++ rosbag2_py/src/rosbag2_py/_transport.cpp | 18 +++- rosbag2_transport/CMakeLists.txt | 3 +- .../rosbag2_transport/play_options.hpp | 4 + .../include/rosbag2_transport/player.hpp | 5 +- .../src/rosbag2_transport/player.cpp | 42 +++++++--- .../test/rosbag2_transport/test_play_for.cpp | 83 +++++++------------ 9 files changed, 99 insertions(+), 69 deletions(-) create mode 100644 rosbag2_interfaces/srv/Play.srv diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index 49947c150e..758ac4433b 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -85,6 +85,10 @@ def add_arguments(self, parser, cli_name): # noqa: D102 parser.add_argument( '-d', '--delay', type=positive_float, default=0.0, help='Sleep duration before play (each loop), in seconds. Negative durations invalid.') + parser.add_argument( + '--playback_duration', type=float, default=-1.0, + help='Playback duration, in seconds. Negative durations mark an infinite playback. ' + 'Default is -1.0.') parser.add_argument( '--disable-keyboard-controls', action='store_true', help='disables keyboard controls for playback') @@ -150,6 +154,7 @@ def main(self, *, args): # noqa: D102 play_options.topic_remapping_options = topic_remapping play_options.clock_publish_frequency = args.clock play_options.delay = args.delay + play_options.playback_duration = args.playback_duration play_options.disable_keyboard_controls = args.disable_keyboard_controls play_options.start_paused = args.start_paused play_options.start_offset = args.start_offset diff --git a/rosbag2_interfaces/CMakeLists.txt b/rosbag2_interfaces/CMakeLists.txt index 69c02ce1a2..cb0393bccd 100644 --- a/rosbag2_interfaces/CMakeLists.txt +++ b/rosbag2_interfaces/CMakeLists.txt @@ -17,7 +17,7 @@ rosidl_generate_interfaces(${PROJECT_NAME} "srv/GetRate.srv" "srv/IsPaused.srv" "srv/Pause.srv" - "srv/PlayFor.srv" + "srv/Play.srv" "srv/PlayNext.srv" "srv/Resume.srv" "srv/Seek.srv" diff --git a/rosbag2_interfaces/srv/Play.srv b/rosbag2_interfaces/srv/Play.srv new file mode 100644 index 0000000000..c9e7af80f2 --- /dev/null +++ b/rosbag2_interfaces/srv/Play.srv @@ -0,0 +1,6 @@ +# See rosbag2_transport::PlayOptions::start_time +builtin_interfaces/Time start_offset +# See rosbag2_transport::PlayOptions::playback_duration +builtin_interfaces/Duration playback_duration +--- +bool success # return true diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index a9592d5a36..ce143abe84 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -69,6 +69,17 @@ struct OptionsWrapper : public T static_cast(RCUTILS_S_TO_NS(delay))); } + double getPlaybackDuration() const + { + return RCUTILS_NS_TO_S(static_cast(this->playback_duration.nanoseconds())); + } + + void setPlaybackDuration(double playback_duration) + { + this->playback_duration = rclcpp::Duration::from_nanoseconds( + static_cast(RCUTILS_S_TO_NS(playback_duration))); + } + double getDelay() const { return RCUTILS_NS_TO_S(static_cast(this->delay.nanoseconds())); @@ -242,6 +253,10 @@ PYBIND11_MODULE(_transport, m) { "delay", &PlayOptions::getDelay, &PlayOptions::setDelay) + .def_property( + "playback_duration", + &PlayOptions::getPlaybackDuration, + &PlayOptions::setPlaybackDuration) .def_readwrite("disable_keyboard_controls", &PlayOptions::disable_keyboard_controls) .def_readwrite("start_paused", &PlayOptions::start_paused) .def_property( @@ -280,8 +295,7 @@ PYBIND11_MODULE(_transport, m) { py::class_(m, "Player") .def(py::init()) .def( - "play", &rosbag2_py::Player::play, py::arg("storage_options"), py::arg( - "play_options"), py::arg("duration") = std::nullopt) + "play", &rosbag2_py::Player::play, py::arg("storage_options"), py::arg("play_options")) ; py::class_(m, "Recorder") diff --git a/rosbag2_transport/CMakeLists.txt b/rosbag2_transport/CMakeLists.txt index f2b7eaf831..e835392e18 100644 --- a/rosbag2_transport/CMakeLists.txt +++ b/rosbag2_transport/CMakeLists.txt @@ -115,8 +115,7 @@ function(create_tests_for_rmw_implementation) test/rosbag2_transport/test_play_for.cpp INCLUDE_DIRS $ LINK_LIBS rosbag2_transport - AMENT_DEPS test_msgs rosbag2_test_common - ${SKIP_TEST}) + AMENT_DEPS test_msgs rosbag2_test_common) rosbag2_transport_add_gmock(test_play_loop test/rosbag2_transport/test_play_loop.cpp diff --git a/rosbag2_transport/include/rosbag2_transport/play_options.hpp b/rosbag2_transport/include/rosbag2_transport/play_options.hpp index 2e8c012c5b..92225373cc 100644 --- a/rosbag2_transport/include/rosbag2_transport/play_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/play_options.hpp @@ -50,6 +50,10 @@ struct PlayOptions // Sleep before play. Negative durations invalid. Will delay at the beginning of each loop. rclcpp::Duration delay = rclcpp::Duration(0, 0); + // Determines the maximum duration of the playback. Negative durations will make the playback to + // not stop. Default configuration makes the player to not stop execution. + rclcpp::Duration playback_duration = rclcpp::Duration(-1, 0); + // Start paused. bool start_paused = false; diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index 358edc5a30..781ccbff41 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -38,7 +38,7 @@ #include "rosbag2_interfaces/srv/get_rate.hpp" #include "rosbag2_interfaces/srv/is_paused.hpp" #include "rosbag2_interfaces/srv/pause.hpp" -#include "rosbag2_interfaces/srv/play_for.hpp" +#include "rosbag2_interfaces/srv/play.hpp" #include "rosbag2_interfaces/srv/play_next.hpp" #include "rosbag2_interfaces/srv/burst.hpp" #include "rosbag2_interfaces/srv/resume.hpp" @@ -200,7 +200,7 @@ class Player : public rclcpp::Node bool is_storage_completely_loaded() const; void enqueue_up_to_boundary(size_t boundary) RCPPUTILS_TSA_REQUIRES(reader_mutex_); void wait_for_filled_queue() const; - void play_messages_from_queue(const std::optional & play_until_time); + void play_messages_from_queue(const rcutils_duration_value_t & play_until_time); void prepare_publishers(); bool publish_message(rosbag2_storage::SerializedBagMessageSharedPtr message); static constexpr double read_ahead_lower_bound_percentage_ = 0.9; @@ -238,6 +238,7 @@ class Player : public rclcpp::Node rclcpp::Service::SharedPtr srv_is_paused_; rclcpp::Service::SharedPtr srv_get_rate_; rclcpp::Service::SharedPtr srv_set_rate_; + rclcpp::Service::SharedPtr srv_play_; rclcpp::Service::SharedPtr srv_play_next_; rclcpp::Service::SharedPtr srv_play_for_; rclcpp::Service::SharedPtr srv_burst_; diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 169b5f00c4..e01f0d1ddc 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -202,6 +202,18 @@ void Player::play(const std::optional & duration) "Invalid delay value: " << play_options_.delay.nanoseconds() << ". Delay is disabled."); } + rcutils_time_point_value_t play_until_time = starting_time_; + if (play_options_.playback_duration >= rclcpp::Duration(0, 0)) { + play_until_time += play_options_.playback_duration.nanoseconds(); + } else { + play_until_time = -1; + RCLCPP_INFO_STREAM( + get_logger(), + "Invalid playback duration value: " << play_options_.playback_duration.nanoseconds() << + ". Playback duration is disabled."); + } + RCLCPP_INFO_STREAM(get_logger(), "Playback duration value: " << play_until_time); + try { do { if (delay > rclcpp::Duration(0, 0)) { @@ -209,10 +221,6 @@ void Player::play(const std::optional & duration) std::chrono::nanoseconds delay_duration(delay.nanoseconds()); std::this_thread::sleep_for(delay_duration); } - std::optional play_until_time{}; - if (duration.has_value() && *duration > 0) { - play_until_time = {starting_time_ + *duration}; - } { std::lock_guard lk(reader_mutex_); reader_->seek(starting_time_); @@ -220,7 +228,7 @@ void Player::play(const std::optional & duration) } storage_loading_future_ = std::async(std::launch::async, [this]() {load_storage_content();}); wait_for_filled_queue(); - play_messages_from_queue({play_until_time}); + play_messages_from_queue(play_until_time); { std::lock_guard lk(ready_to_play_from_queue_mutex_); is_ready_to_play_from_queue_ = false; @@ -446,8 +454,7 @@ void Player::enqueue_up_to_boundary(size_t boundary) } } -void Player::play_messages_from_queue( - const std::optional & play_until_time) +void Player::play_messages_from_queue(const rcutils_duration_value_t & play_until_time) { // Note: We need to use message_queue_.peek() instead of message_queue_.try_dequeue(message) // to support play_next() API logic. @@ -461,11 +468,12 @@ void Player::play_messages_from_queue( ready_to_play_from_queue_cv_.notify_all(); } while (message_ptr != nullptr && rclcpp::ok()) { - // Do not move on until sleep_until returns true - // It will always sleep, so this is not a tight busy loop on pause - if (play_until_time.has_value() && message_ptr->time_stamp > *play_until_time) { + rosbag2_storage::SerializedBagMessageSharedPtr message = *message_ptr; + if (play_until_time >= starting_time_ && message->time_stamp > play_until_time) { break; } + // Do not move on until sleep_until returns true + // It will always sleep, so this is not a tight busy loop on pause while (rclcpp::ok() && !clock_->sleep_until(message_ptr->time_stamp)) { if (std::atomic_exchange(&cancel_wait_for_next_message_, false)) { break; @@ -686,6 +694,20 @@ void Player::create_control_services() { response->success = set_rate(request->rate); }); + srv_play_ = create_service( + "~/play", + [this]( + rosbag2_interfaces::srv::Play::Request::ConstSharedPtr request, + rosbag2_interfaces::srv::Play::Response::SharedPtr response) + { + play_options_.start_offset = + static_cast(RCUTILS_S_TO_NS(request->start_offset.sec)) + + static_cast(request->start_offset.nanosec); + play_options_.playback_duration = rclcpp::Duration( + request->playback_duration.sec, request->playback_duration.nanosec); + play(); + response->success = true; + }); srv_play_next_ = create_service( "~/play_next", [this]( diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp index cd18b0f93b..653f3c4b54 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp @@ -36,12 +36,13 @@ #include "rosbag2_play_test_fixture.hpp" #include "rosbag2_transport_test_fixture.hpp" +#include "mock_player.hpp" + using namespace ::testing; // NOLINT using namespace rosbag2_transport; // NOLINT using namespace std::chrono_literals; // NOLINT using namespace rosbag2_test_common; // NOLINT - constexpr int kIntValue{32}; constexpr float kFloat1Value{40.}; @@ -113,7 +114,7 @@ class RosBag2PlayForTestFixture : public RosBag2PlayTestFixture return messages; } - void SetUp() override + void InitPlayerWithPlaybackDurationAndPlay(int64_t milliseconds) { auto topic_types = get_topic_types(); auto messages = get_serialized_messages(); @@ -122,50 +123,48 @@ class RosBag2PlayForTestFixture : public RosBag2PlayTestFixture prepared_mock_reader->prepare(messages, topic_types); auto reader = std::make_unique(std::move(prepared_mock_reader)); - // Due to a problem related to the subscriber, we play many (3) messages but make the subscriber - // node spin only until 2 have arrived. Hence the 2 as `launch_subscriber()` argument. - sub_->add_subscription(kTopic1, 2); - sub_->add_subscription(kTopic2, 2); + sub_->add_subscription(kTopic1, 3); + sub_->add_subscription(kTopic2, 3); - player_ = std::make_shared( + play_options_.playback_duration = + rclcpp::Duration(std::chrono::nanoseconds(std::chrono::milliseconds(milliseconds))); + player_ = std::make_shared( std::move( reader), storage_options_, play_options_); + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE( + sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), std::chrono::seconds(5))); + + auto await_received_messages = sub_->spin_subscriptions(); + + player_->play(); + + await_received_messages.get(); } - std::shared_ptr player_; - std::future await_received_messages_; + std::shared_ptr player_; }; + TEST_F(RosBag2PlayForTestFixture, play_for_all_are_played_due_to_duration) { - auto await_received_messages = sub_->spin_subscriptions(); - - player_->play(std::chrono::nanoseconds(std::chrono::milliseconds(1000)).count()); - - await_received_messages.get(); + InitPlayerWithPlaybackDurationAndPlay(1000); auto replayed_test_primitives = sub_->get_received_messages( kTopic1); - EXPECT_THAT(replayed_test_primitives, SizeIs(Ge(2u))); + EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(3u))); EVAL_REPLAYED_PRIMITIVES(replayed_test_primitives); auto replayed_test_arrays = sub_->get_received_messages( kTopic2); - EXPECT_THAT(replayed_test_arrays, SizeIs(Ge(2u))); + EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(3u))); EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_test_arrays); EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays); } TEST_F(RosBag2PlayForTestFixture, play_for_none_are_played_due_to_duration) { - const rcutils_duration_value_t duration = - std::chrono::nanoseconds(std::chrono::milliseconds(300)).count(); - - auto await_received_messages = sub_->spin_subscriptions_for(duration); - - player_->play(duration); - - await_received_messages.get(); + InitPlayerWithPlaybackDurationAndPlay(300); auto replayed_test_primitives = sub_->get_received_messages( kTopic1); @@ -178,14 +177,7 @@ TEST_F(RosBag2PlayForTestFixture, play_for_none_are_played_due_to_duration) TEST_F(RosBag2PlayForTestFixture, play_for_less_than_the_total_duration) { - const rcutils_duration_value_t duration = - std::chrono::nanoseconds(std::chrono::milliseconds(800)).count(); - - auto await_received_messages = sub_->spin_subscriptions_for(duration); - - player_->play(duration); - - await_received_messages.get(); + InitPlayerWithPlaybackDurationAndPlay(800); auto replayed_test_primitives = sub_->get_received_messages( kTopic1); @@ -206,7 +198,6 @@ class RosBag2PlayForFilteredTopicTestFixture : public RosBag2PlayForTestFixture { // Filter allows /topic2, blocks /topic1 play_options_.topics_to_filter = {"topic2"}; - RosBag2PlayForTestFixture::SetUp(); } }; @@ -214,16 +205,12 @@ TEST_F( RosBag2PlayForFilteredTopicTestFixture, play_for_full_duration_recorded_messages_with_filtered_topics) { - auto await_received_messages = sub_->spin_subscriptions(); - - player_->play(std::chrono::nanoseconds(std::chrono::milliseconds(1000)).count()); - - await_received_messages.get(); + InitPlayerWithPlaybackDurationAndPlay(1000); auto replayed_test_primitives = sub_->get_received_messages("/topic1"); // No messages are allowed to have arrived - EXPECT_THAT(replayed_test_primitives, SizeIs(0u)); + EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(0u))); auto replayed_test_arrays = sub_->get_received_messages("/topic2"); // All messages should have arrived. @@ -236,36 +223,28 @@ TEST_F( RosBag2PlayForFilteredTopicTestFixture, play_for_short_duration_recorded_messages_with_filtered_topics) { - auto await_received_messages = sub_->spin_subscriptions(); - - player_->play(std::chrono::nanoseconds(std::chrono::milliseconds(300)).count()); - - await_received_messages.get(); + InitPlayerWithPlaybackDurationAndPlay(300); auto replayed_test_primitives = sub_->get_received_messages("/topic1"); // No messages are allowed to have arrived - EXPECT_THAT(replayed_test_primitives, SizeIs(0u)); + EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(0u))); auto replayed_test_arrays = sub_->get_received_messages("/topic2"); // All messages should have arrived. - EXPECT_THAT(replayed_test_arrays, SizeIs(0u)); + EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(0u))); } TEST_F( RosBag2PlayForFilteredTopicTestFixture, play_for_intermediate_duration_recorded_messages_with_filtered_topics) { - auto await_received_messages = sub_->spin_subscriptions(); - - player_->play(std::chrono::nanoseconds(std::chrono::milliseconds(800)).count()); - - await_received_messages.get(); + InitPlayerWithPlaybackDurationAndPlay(800); auto replayed_test_primitives = sub_->get_received_messages("/topic1"); // No messages are allowed to have arrived - EXPECT_THAT(replayed_test_primitives, SizeIs(0u)); + EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(0u))); auto replayed_test_arrays = sub_->get_received_messages("/topic2"); // Some messages should have arrived. From 4601391aa190afce5c4d8ce8515e351ba2d29c5d Mon Sep 17 00:00:00 2001 From: Agustin Alba Chicar Date: Thu, 17 Mar 2022 04:50:57 -0300 Subject: [PATCH 4/9] Second round of comments from #960 upstream (#15) * Removes duration parameter. A leftover after switching to playback_duration. * Fixes comment. * Solves format in rosbag2_py -> _transport.cpp * Applies style suggestions. * Changes play() to return a boolean indicating whether the request could be fulfilled. * Removes extra unnecessary code. Signed-off-by: Geoffrey Biggs --- ros2bag/ros2bag/verb/play.py | 6 +- rosbag2_interfaces/srv/Play.srv | 5 +- rosbag2_interfaces/srv/PlayFor.srv | 3 - rosbag2_py/src/rosbag2_py/_transport.cpp | 8 +- .../subscription_manager.hpp | 29 ------- .../include/rosbag2_transport/player.hpp | 3 +- .../src/rosbag2_transport/player.cpp | 33 ++++---- .../test/rosbag2_transport/test_play_for.cpp | 82 +++++++++++++++++-- .../test/rosbag2_transport/test_play_loop.cpp | 2 +- 9 files changed, 102 insertions(+), 69 deletions(-) delete mode 100644 rosbag2_interfaces/srv/PlayFor.srv diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index 758ac4433b..4932522a44 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -161,9 +161,5 @@ def main(self, *, args): # noqa: D102 play_options.wait_acked_timeout = args.wait_for_all_acked play_options.disable_loan_message = args.disable_loan_message - # Gets the duration in nanoseconds when a value is provided for player - # consumption. - duration = int(args.duration * 1e9) if args.duration else args.duration - player = Player() - player.play(storage_options, play_options, duration) + player.play(storage_options, play_options) diff --git a/rosbag2_interfaces/srv/Play.srv b/rosbag2_interfaces/srv/Play.srv index c9e7af80f2..54edf7dd45 100644 --- a/rosbag2_interfaces/srv/Play.srv +++ b/rosbag2_interfaces/srv/Play.srv @@ -1,6 +1,7 @@ -# See rosbag2_transport::PlayOptions::start_time +# See rosbag2_transport::PlayOptions::start_offset builtin_interfaces/Time start_offset # See rosbag2_transport::PlayOptions::playback_duration builtin_interfaces/Duration playback_duration --- -bool success # return true +# returns false when another playback execution is running, otherwise true +bool success diff --git a/rosbag2_interfaces/srv/PlayFor.srv b/rosbag2_interfaces/srv/PlayFor.srv deleted file mode 100644 index 8f60cb07b4..0000000000 --- a/rosbag2_interfaces/srv/PlayFor.srv +++ /dev/null @@ -1,3 +0,0 @@ -builtin_interfaces/Duration duration ---- -bool success # return true diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index ce143abe84..5de5419bab 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -131,8 +131,7 @@ class Player void play( const rosbag2_storage::StorageOptions & storage_options, - PlayOptions & play_options, - const std::optional & duration) + PlayOptions & play_options) { auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(storage_options); auto player = std::make_shared( @@ -144,7 +143,7 @@ class Player [&exec]() { exec.spin(); }); - player->play(duration); + player->play(); exec.cancel(); spin_thread.join(); @@ -294,8 +293,7 @@ PYBIND11_MODULE(_transport, m) { py::class_(m, "Player") .def(py::init()) - .def( - "play", &rosbag2_py::Player::play, py::arg("storage_options"), py::arg("play_options")) + .def("play", &rosbag2_py::Player::play, py::arg("storage_options"), py::arg("play_options")) ; py::class_(m, "Recorder") diff --git a/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp b/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp index 15ea840112..f37fa75477 100644 --- a/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp +++ b/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp @@ -129,18 +129,6 @@ class SubscriptionManager }); } - std::future spin_subscriptions_for(rcutils_duration_value_t duration) - { - return async( - std::launch::async, [this, duration]() { - rclcpp::executors::SingleThreadedExecutor exec; - auto end = std::chrono::high_resolution_clock::now() + std::chrono::nanoseconds(duration); - while (rclcpp::ok() && continue_spinning_until(expected_topics_with_size_, end)) { - exec.spin_node_some(subscriber_node_); - } - }); - } - private: bool continue_spinning( const std::unordered_map & expected_topics_with_sizes, @@ -159,23 +147,6 @@ class SubscriptionManager return false; } - bool continue_spinning_until( - const std::unordered_map & expected_topics_with_sizes, - std::chrono::time_point end_time) - { - auto current = std::chrono::high_resolution_clock::now(); - if (current > end_time) { - return false; - } - - for (const auto & topic_expected : expected_topics_with_sizes) { - if (subscribed_messages_[topic_expected.first].size() < topic_expected.second) { - return true; - } - } - return false; - } - std::vector subscriptions_; std::unordered_map>> subscribed_messages_; diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index 781ccbff41..98e15a3576 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -96,7 +96,7 @@ class Player : public rclcpp::Node virtual ~Player(); ROSBAG2_TRANSPORT_PUBLIC - void play(const std::optional & duration = std::nullopt); + bool play(); // Playback control interface /// Pause the flow of time for playback. @@ -228,6 +228,7 @@ class Player : public rclcpp::Node std::mutex skip_message_in_main_play_loop_mutex_; bool skip_message_in_main_play_loop_ RCPPUTILS_TSA_GUARDED_BY( skip_message_in_main_play_loop_mutex_) = false; + std::atomic_bool is_in_playback_{false}; rcutils_time_point_value_t starting_time_; diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index e01f0d1ddc..7a5ec7bc02 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -191,8 +191,14 @@ bool Player::is_storage_completely_loaded() const return !storage_loading_future_.valid(); } -void Player::play(const std::optional & duration) +bool Player::play() { + if (is_in_playback_) { + RCLCPP_WARN_STREAM(get_logger(), "Trying to play() while in playback, dismissing request."); + return false; + } + is_in_playback_ = true; + rclcpp::Duration delay(0, 0); if (play_options_.delay >= rclcpp::Duration(0, 0)) { delay = play_options_.delay; @@ -202,15 +208,9 @@ void Player::play(const std::optional & duration) "Invalid delay value: " << play_options_.delay.nanoseconds() << ". Delay is disabled."); } - rcutils_time_point_value_t play_until_time = starting_time_; + rcutils_time_point_value_t play_until_time = -1; if (play_options_.playback_duration >= rclcpp::Duration(0, 0)) { - play_until_time += play_options_.playback_duration.nanoseconds(); - } else { - play_until_time = -1; - RCLCPP_INFO_STREAM( - get_logger(), - "Invalid playback duration value: " << play_options_.playback_duration.nanoseconds() << - ". Playback duration is disabled."); + play_until_time = starting_time_ + play_options_.playback_duration.nanoseconds(); } RCLCPP_INFO_STREAM(get_logger(), "Playback duration value: " << play_until_time); @@ -266,6 +266,9 @@ void Player::play(const std::optional & duration) } } } + + is_in_playback_ = false; + return true; } void Player::pause() @@ -474,7 +477,7 @@ void Player::play_messages_from_queue(const rcutils_duration_value_t & play_unti } // Do not move on until sleep_until returns true // It will always sleep, so this is not a tight busy loop on pause - while (rclcpp::ok() && !clock_->sleep_until(message_ptr->time_stamp)) { + while (rclcpp::ok() && !clock_->sleep_until(message->time_stamp)) { if (std::atomic_exchange(&cancel_wait_for_next_message_, false)) { break; } @@ -700,13 +703,9 @@ void Player::create_control_services() rosbag2_interfaces::srv::Play::Request::ConstSharedPtr request, rosbag2_interfaces::srv::Play::Response::SharedPtr response) { - play_options_.start_offset = - static_cast(RCUTILS_S_TO_NS(request->start_offset.sec)) + - static_cast(request->start_offset.nanosec); - play_options_.playback_duration = rclcpp::Duration( - request->playback_duration.sec, request->playback_duration.nanosec); - play(); - response->success = true; + play_options_.start_offset = rclcpp::Time(request->start_offset).nanoseconds(); + play_options_.playback_duration = rclcpp::Duration(request->playback_duration); + response->success = play(); }); srv_play_next_ = create_service( "~/play_next", diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp index 653f3c4b54..cd6d49480e 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp @@ -126,18 +126,15 @@ class RosBag2PlayForTestFixture : public RosBag2PlayTestFixture sub_->add_subscription(kTopic1, 3); sub_->add_subscription(kTopic2, 3); - play_options_.playback_duration = - rclcpp::Duration(std::chrono::nanoseconds(std::chrono::milliseconds(milliseconds))); - player_ = std::make_shared( - std::move( - reader), storage_options_, play_options_); + play_options_.playback_duration = rclcpp::Duration(std::chrono::milliseconds(milliseconds)); + player_ = std::make_shared(std::move(reader), storage_options_, play_options_); // Wait for discovery to match publishers with subscribers ASSERT_TRUE( sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), std::chrono::seconds(5))); auto await_received_messages = sub_->spin_subscriptions(); - player_->play(); + ASSERT_TRUE(player_->play()); await_received_messages.get(); } @@ -252,3 +249,76 @@ TEST_F( EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_test_arrays); EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays); } + +class RosBag2PlayForInterruptedTestFixture : public RosBag2PlayTestFixture +{ +public: + static constexpr const char * kTopic1Name{"topic1"}; + static constexpr const char * kTopic1{"/topic1"}; + + std::vector get_topic_types() + { + return {{kTopic1Name, "test_msgs/BasicTypes", "", ""}}; + } + + std::vector> + get_serialized_messages() + { + auto primitive_message = get_messages_basic_types()[0]; + primitive_message->int32_value = kIntValue; + + // @{ Ordering matters. The mock reader implementation moves messages + // around without any knowledge about message chronology. It just picks + // the next one Make sure to keep the list in order or sort it! + std::vector> messages = + {serialize_test_message(kTopic1Name, 500, primitive_message), + serialize_test_message(kTopic1Name, 700, primitive_message), + serialize_test_message(kTopic1Name, 900, primitive_message)}; + // @} + return messages; + } +}; + +TEST_F( + RosBag2PlayForInterruptedTestFixture, + play_should_return_false_when_interrupted) +{ + + auto topic_types = get_topic_types(); + auto messages = get_serialized_messages(); + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + sub_->add_subscription(kTopic1, 3); + + play_options_.playback_duration = rclcpp::Duration(std::chrono::milliseconds(1000)); + std::shared_ptr player_ = std::make_shared( + std::move( + reader), storage_options_, play_options_); + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE( + sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), std::chrono::seconds(5))); + + auto await_received_messages = sub_->spin_subscriptions(); + + auto play_execution_1 = std::async(std::launch::async, [player_]() {return player_->play();}); + auto play_execution_2 = std::async(std::launch::async, [player_]() {return player_->play();}); + + await_received_messages.get(); + + const auto play_execution_1_wait_result = play_execution_1.wait_for( + std::chrono::milliseconds( + 1500)); + const auto play_execution_2_wait_result = play_execution_2.wait_for( + std::chrono::milliseconds( + 1500)); + + ASSERT_EQ(std::future_status::ready, play_execution_1_wait_result); + ASSERT_EQ(std::future_status::ready, play_execution_2_wait_result); + + ASSERT_TRUE( + (play_execution_1.get() && !play_execution_2.get()) || + (!play_execution_1.get() && play_execution_2.get())); +} diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_loop.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_loop.cpp index f4c1fed059..65f7795dac 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_loop.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_loop.cpp @@ -131,7 +131,7 @@ TEST_F(RosBag2PlayTestFixture, messages_played_in_loop) { auto player = std::make_shared( std::move( reader), storage_options_, play_options); - std::thread loop_thread(&rosbag2_transport::Player::play, player, std::nullopt); + std::thread loop_thread(&rosbag2_transport::Player::play, player); await_received_messages.get(); rclcpp::shutdown(); From 58ca1a7acb19c5f5024562f5aed2ccf9db3436ce Mon Sep 17 00:00:00 2001 From: Agustin Alba Chicar Date: Tue, 22 Mar 2022 20:12:19 -0300 Subject: [PATCH 5/9] Updates test execution time for https://github.com/ros2/rosbag2/pull/960 (#16) * Adresses reviewer's comments. * Improve test time by adding an optional argument to SubscriptionManager::spin_subscriptions() - Reduces test_play_for execution time from 50s to 6s approximately. Signed-off-by: Geoffrey Biggs --- .../subscription_manager.hpp | 13 ++-- .../src/rosbag2_transport/player.cpp | 3 +- .../test/rosbag2_transport/test_play_for.cpp | 73 +++++++++---------- 3 files changed, 43 insertions(+), 46 deletions(-) diff --git a/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp b/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp index f37fa75477..9fe9bc3a88 100644 --- a/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp +++ b/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp @@ -117,13 +117,15 @@ class SubscriptionManager return matched; } - std::future spin_subscriptions() + std::future spin_subscriptions(int maximum_time_spinning_sec = 10) { return async( - std::launch::async, [this]() { + std::launch::async, [this, maximum_time_spinning_sec]() { rclcpp::executors::SingleThreadedExecutor exec; auto start = std::chrono::high_resolution_clock::now(); - while (rclcpp::ok() && continue_spinning(expected_topics_with_size_, start)) { + while (rclcpp::ok() && + continue_spinning(expected_topics_with_size_, start, maximum_time_spinning_sec)) + { exec.spin_node_some(subscriber_node_); } }); @@ -132,10 +134,11 @@ class SubscriptionManager private: bool continue_spinning( const std::unordered_map & expected_topics_with_sizes, - std::chrono::time_point start_time) + std::chrono::time_point start_time, + int maximum_time_spinning_sec = 10) { auto current = std::chrono::high_resolution_clock::now(); - if (current - start_time > std::chrono::seconds(10)) { + if (current - start_time > std::chrono::seconds(maximum_time_spinning_sec)) { return false; } diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 7a5ec7bc02..d5bedc6465 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -193,11 +193,10 @@ bool Player::is_storage_completely_loaded() const bool Player::play() { - if (is_in_playback_) { + if (is_in_playback_.exchange(true)) { RCLCPP_WARN_STREAM(get_logger(), "Trying to play() while in playback, dismissing request."); return false; } - is_in_playback_ = true; rclcpp::Duration delay(0, 0); if (play_options_.delay >= rclcpp::Duration(0, 0)) { diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp index cd6d49480e..75c151754d 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp @@ -53,6 +53,8 @@ constexpr bool kBool1Value{false}; constexpr bool kBool2Value{true}; constexpr bool kBool3Value{false}; +constexpr int kMaximumTimeSpinningSec{1}; + #define EVAL_REPLAYED_PRIMITIVES(replayed_primitives) \ EXPECT_THAT( \ replayed_primitives, \ @@ -104,12 +106,12 @@ class RosBag2PlayForTestFixture : public RosBag2PlayTestFixture // around without any knowledge about message chronology. It just picks // the next one Make sure to keep the list in order or sort it! std::vector> messages = - {serialize_test_message(kTopic1Name, 500, primitive_message1), - serialize_test_message(kTopic2Name, 550, complex_message1), - serialize_test_message(kTopic1Name, 700, primitive_message1), - serialize_test_message(kTopic2Name, 750, complex_message1), - serialize_test_message(kTopic1Name, 900, primitive_message1), - serialize_test_message(kTopic2Name, 950, complex_message1)}; + {serialize_test_message(kTopic1Name, 100, primitive_message1), + serialize_test_message(kTopic2Name, 120, complex_message1), + serialize_test_message(kTopic1Name, 200, primitive_message1), + serialize_test_message(kTopic2Name, 220, complex_message1), + serialize_test_message(kTopic1Name, 300, primitive_message1), + serialize_test_message(kTopic2Name, 320, complex_message1)}; // @} return messages; } @@ -128,11 +130,12 @@ class RosBag2PlayForTestFixture : public RosBag2PlayTestFixture play_options_.playback_duration = rclcpp::Duration(std::chrono::milliseconds(milliseconds)); player_ = std::make_shared(std::move(reader), storage_options_, play_options_); + // Wait for discovery to match publishers with subscribers ASSERT_TRUE( sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), std::chrono::seconds(5))); - auto await_received_messages = sub_->spin_subscriptions(); + auto await_received_messages = sub_->spin_subscriptions(kMaximumTimeSpinningSec); ASSERT_TRUE(player_->play()); @@ -145,7 +148,7 @@ class RosBag2PlayForTestFixture : public RosBag2PlayTestFixture TEST_F(RosBag2PlayForTestFixture, play_for_all_are_played_due_to_duration) { - InitPlayerWithPlaybackDurationAndPlay(1000); + InitPlayerWithPlaybackDurationAndPlay(350); auto replayed_test_primitives = sub_->get_received_messages( kTopic1); @@ -161,7 +164,7 @@ TEST_F(RosBag2PlayForTestFixture, play_for_all_are_played_due_to_duration) TEST_F(RosBag2PlayForTestFixture, play_for_none_are_played_due_to_duration) { - InitPlayerWithPlaybackDurationAndPlay(300); + InitPlayerWithPlaybackDurationAndPlay(50); auto replayed_test_primitives = sub_->get_received_messages( kTopic1); @@ -174,7 +177,7 @@ TEST_F(RosBag2PlayForTestFixture, play_for_none_are_played_due_to_duration) TEST_F(RosBag2PlayForTestFixture, play_for_less_than_the_total_duration) { - InitPlayerWithPlaybackDurationAndPlay(800); + InitPlayerWithPlaybackDurationAndPlay(270); auto replayed_test_primitives = sub_->get_received_messages( kTopic1); @@ -202,7 +205,7 @@ TEST_F( RosBag2PlayForFilteredTopicTestFixture, play_for_full_duration_recorded_messages_with_filtered_topics) { - InitPlayerWithPlaybackDurationAndPlay(1000); + InitPlayerWithPlaybackDurationAndPlay(400); auto replayed_test_primitives = sub_->get_received_messages("/topic1"); @@ -220,7 +223,7 @@ TEST_F( RosBag2PlayForFilteredTopicTestFixture, play_for_short_duration_recorded_messages_with_filtered_topics) { - InitPlayerWithPlaybackDurationAndPlay(300); + InitPlayerWithPlaybackDurationAndPlay(50); auto replayed_test_primitives = sub_->get_received_messages("/topic1"); @@ -236,7 +239,7 @@ TEST_F( RosBag2PlayForFilteredTopicTestFixture, play_for_intermediate_duration_recorded_messages_with_filtered_topics) { - InitPlayerWithPlaybackDurationAndPlay(800); + InitPlayerWithPlaybackDurationAndPlay(270); auto replayed_test_primitives = sub_->get_received_messages("/topic1"); @@ -271,9 +274,9 @@ class RosBag2PlayForInterruptedTestFixture : public RosBag2PlayTestFixture // around without any knowledge about message chronology. It just picks // the next one Make sure to keep the list in order or sort it! std::vector> messages = - {serialize_test_message(kTopic1Name, 500, primitive_message), - serialize_test_message(kTopic1Name, 700, primitive_message), - serialize_test_message(kTopic1Name, 900, primitive_message)}; + {serialize_test_message(kTopic1Name, 50, primitive_message), + serialize_test_message(kTopic1Name, 100, primitive_message), + serialize_test_message(kTopic1Name, 150, primitive_message)}; // @} return messages; } @@ -283,7 +286,6 @@ TEST_F( RosBag2PlayForInterruptedTestFixture, play_should_return_false_when_interrupted) { - auto topic_types = get_topic_types(); auto messages = get_serialized_messages(); @@ -291,34 +293,27 @@ TEST_F( prepared_mock_reader->prepare(messages, topic_types); auto reader = std::make_unique(std::move(prepared_mock_reader)); - sub_->add_subscription(kTopic1, 3); + // Let the player only reproduce one message. + sub_->add_subscription(kTopic1, 1); + play_options_.playback_duration = rclcpp::Duration(std::chrono::milliseconds(75)); - play_options_.playback_duration = rclcpp::Duration(std::chrono::milliseconds(1000)); std::shared_ptr player_ = std::make_shared( - std::move( - reader), storage_options_, play_options_); + std::move(reader), storage_options_, play_options_); + // Wait for discovery to match publishers with subscribers ASSERT_TRUE( sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), std::chrono::seconds(5))); - auto await_received_messages = sub_->spin_subscriptions(); - - auto play_execution_1 = std::async(std::launch::async, [player_]() {return player_->play();}); - auto play_execution_2 = std::async(std::launch::async, [player_]() {return player_->play();}); + auto await_received_messages = sub_->spin_subscriptions(kMaximumTimeSpinningSec); + player_->pause(); + auto player_future = std::async(std::launch::async, [player_]() {return player_->play();}); + player_->wait_for_playback_to_start(); + ASSERT_TRUE(player_->is_paused()); + ASSERT_FALSE(player_->play()); + player_->resume(); + player_future.get(); await_received_messages.get(); - - const auto play_execution_1_wait_result = play_execution_1.wait_for( - std::chrono::milliseconds( - 1500)); - const auto play_execution_2_wait_result = play_execution_2.wait_for( - std::chrono::milliseconds( - 1500)); - - ASSERT_EQ(std::future_status::ready, play_execution_1_wait_result); - ASSERT_EQ(std::future_status::ready, play_execution_2_wait_result); - - ASSERT_TRUE( - (play_execution_1.get() && !play_execution_2.get()) || - (!play_execution_1.get() && play_execution_2.get())); + auto replayed_topic1 = sub_->get_received_messages(kTopic1); + EXPECT_THAT(replayed_topic1, SizeIs(1)); } From 70b58499726614c87102b8f43988568e4fa047ef Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Sun, 27 Mar 2022 17:38:52 -0700 Subject: [PATCH 6/9] Redesign tests in test_play_for.cpp (#17) * Redesigned tests to be more deterministic and running faster * Fixed bug in `play_for()` flow when replaying in loop or multiple times from the same player instance. Signed-off-by: Michael Orlov Signed-off-by: Geoffrey Biggs --- .../subscription_manager.hpp | 13 +- .../include/rosbag2_transport/player.hpp | 1 + .../src/rosbag2_transport/player.cpp | 11 +- .../test/rosbag2_transport/test_play_for.cpp | 237 +++++++++--------- 4 files changed, 136 insertions(+), 126 deletions(-) diff --git a/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp b/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp index 9fe9bc3a88..f37fa75477 100644 --- a/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp +++ b/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp @@ -117,15 +117,13 @@ class SubscriptionManager return matched; } - std::future spin_subscriptions(int maximum_time_spinning_sec = 10) + std::future spin_subscriptions() { return async( - std::launch::async, [this, maximum_time_spinning_sec]() { + std::launch::async, [this]() { rclcpp::executors::SingleThreadedExecutor exec; auto start = std::chrono::high_resolution_clock::now(); - while (rclcpp::ok() && - continue_spinning(expected_topics_with_size_, start, maximum_time_spinning_sec)) - { + while (rclcpp::ok() && continue_spinning(expected_topics_with_size_, start)) { exec.spin_node_some(subscriber_node_); } }); @@ -134,11 +132,10 @@ class SubscriptionManager private: bool continue_spinning( const std::unordered_map & expected_topics_with_sizes, - std::chrono::time_point start_time, - int maximum_time_spinning_sec = 10) + std::chrono::time_point start_time) { auto current = std::chrono::high_resolution_clock::now(); - if (current - start_time > std::chrono::seconds(maximum_time_spinning_sec)) { + if (current - start_time > std::chrono::seconds(10)) { return false; } diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index 98e15a3576..e68f0e1093 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -222,6 +222,7 @@ class Player : public rclcpp::Node rosbag2_transport::PlayOptions play_options_; moodycamel::ReaderWriterQueue message_queue_; mutable std::future storage_loading_future_; + std::atomic_bool load_storage_content_{true}; std::unordered_map topic_qos_profile_overrides_; std::unique_ptr clock_; std::shared_ptr clock_publish_timer_; diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index d5bedc6465..866808f2e6 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -225,9 +225,14 @@ bool Player::play() reader_->seek(starting_time_); clock_->jump(starting_time_); } + load_storage_content_ = true; storage_loading_future_ = std::async(std::launch::async, [this]() {load_storage_content();}); wait_for_filled_queue(); play_messages_from_queue(play_until_time); + + load_storage_content_ = false; + if (storage_loading_future_.valid()) {storage_loading_future_.get();} + while (message_queue_.pop()) {} // cleanup queue { std::lock_guard lk(ready_to_play_from_queue_mutex_); is_ready_to_play_from_queue_ = false; @@ -236,6 +241,9 @@ bool Player::play() } while (rclcpp::ok() && play_options_.loop); } catch (std::runtime_error & e) { RCLCPP_ERROR(get_logger(), "Failed to play: %s", e.what()); + load_storage_content_ = false; + if (storage_loading_future_.valid()) {storage_loading_future_.get();} + while (message_queue_.pop()) {} // cleanup queue } std::lock_guard lk(ready_to_play_from_queue_mutex_); is_ready_to_play_from_queue_ = false; @@ -409,6 +417,7 @@ void Player::seek(rcutils_time_point_value_t time_point) // Restart queuing thread if it has finished running (previously reached end of bag), // otherwise, queueing should continue automatically after releasing mutex if (is_storage_completely_loaded() && rclcpp::ok()) { + load_storage_content_ = true; storage_loading_future_ = std::async(std::launch::async, [this]() {load_storage_content();}); } @@ -431,7 +440,7 @@ void Player::load_storage_content() static_cast(play_options_.read_ahead_queue_size * read_ahead_lower_bound_percentage_); auto queue_upper_boundary = play_options_.read_ahead_queue_size; - while (rclcpp::ok()) { + while (rclcpp::ok() && load_storage_content_) { TSAUniqueLock lk(reader_mutex_); if (!reader_->has_next()) {break;} diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp index 75c151754d..66c0346afd 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp @@ -53,8 +53,6 @@ constexpr bool kBool1Value{false}; constexpr bool kBool2Value{true}; constexpr bool kBool3Value{false}; -constexpr int kMaximumTimeSpinningSec{1}; - #define EVAL_REPLAYED_PRIMITIVES(replayed_primitives) \ EXPECT_THAT( \ replayed_primitives, \ @@ -81,15 +79,15 @@ constexpr int kMaximumTimeSpinningSec{1}; class RosBag2PlayForTestFixture : public RosBag2PlayTestFixture { public: - static constexpr const char * kTopic1Name{"topic1"}; - static constexpr const char * kTopic2Name{"topic2"}; - static constexpr const char * kTopic1{"/topic1"}; - static constexpr const char * kTopic2{"/topic2"}; + static constexpr const char * kTopic1Name_{"topic1"}; + static constexpr const char * kTopic2Name_{"topic2"}; + static constexpr const char * kTopic1_{"/topic1"}; + static constexpr const char * kTopic2_{"/topic2"}; std::vector get_topic_types() { - return {{kTopic1Name, "test_msgs/BasicTypes", "", ""}, - {kTopic2Name, "test_msgs/Arrays", "", ""}}; + return {{kTopic1Name_, "test_msgs/BasicTypes", "", ""}, + {kTopic2Name_, "test_msgs/Arrays", "", ""}}; } std::vector> @@ -106,17 +104,20 @@ class RosBag2PlayForTestFixture : public RosBag2PlayTestFixture // around without any knowledge about message chronology. It just picks // the next one Make sure to keep the list in order or sort it! std::vector> messages = - {serialize_test_message(kTopic1Name, 100, primitive_message1), - serialize_test_message(kTopic2Name, 120, complex_message1), - serialize_test_message(kTopic1Name, 200, primitive_message1), - serialize_test_message(kTopic2Name, 220, complex_message1), - serialize_test_message(kTopic1Name, 300, primitive_message1), - serialize_test_message(kTopic2Name, 320, complex_message1)}; + {serialize_test_message(kTopic1Name_, 100, primitive_message1), + serialize_test_message(kTopic2Name_, 120, complex_message1), + serialize_test_message(kTopic1Name_, 200, primitive_message1), + serialize_test_message(kTopic2Name_, 220, complex_message1), + serialize_test_message(kTopic1Name_, 300, primitive_message1), + serialize_test_message(kTopic2Name_, 320, complex_message1)}; // @} return messages; } - void InitPlayerWithPlaybackDurationAndPlay(int64_t milliseconds) + void InitPlayerWithPlaybackDurationAndPlay( + int64_t milliseconds, + size_t expected_number_of_messages_on_topic1 = 3, + size_t expected_number_of_messages_on_topic2 = 3) { auto topic_types = get_topic_types(); auto messages = get_serialized_messages(); @@ -125,20 +126,18 @@ class RosBag2PlayForTestFixture : public RosBag2PlayTestFixture prepared_mock_reader->prepare(messages, topic_types); auto reader = std::make_unique(std::move(prepared_mock_reader)); - sub_->add_subscription(kTopic1, 3); - sub_->add_subscription(kTopic2, 3); + sub_->add_subscription( + kTopic1_, expected_number_of_messages_on_topic1); + sub_->add_subscription(kTopic2_, expected_number_of_messages_on_topic2); play_options_.playback_duration = rclcpp::Duration(std::chrono::milliseconds(milliseconds)); player_ = std::make_shared(std::move(reader), storage_options_, play_options_); // Wait for discovery to match publishers with subscribers - ASSERT_TRUE( - sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), std::chrono::seconds(5))); - - auto await_received_messages = sub_->spin_subscriptions(kMaximumTimeSpinningSec); + ASSERT_TRUE(sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), 5s)); + auto await_received_messages = sub_->spin_subscriptions(); ASSERT_TRUE(player_->play()); - await_received_messages.get(); } @@ -151,12 +150,12 @@ TEST_F(RosBag2PlayForTestFixture, play_for_all_are_played_due_to_duration) InitPlayerWithPlaybackDurationAndPlay(350); auto replayed_test_primitives = sub_->get_received_messages( - kTopic1); + kTopic1_); EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(3u))); EVAL_REPLAYED_PRIMITIVES(replayed_test_primitives); auto replayed_test_arrays = sub_->get_received_messages( - kTopic2); + kTopic2_); EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(3u))); EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_test_arrays); EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays); @@ -164,82 +163,110 @@ TEST_F(RosBag2PlayForTestFixture, play_for_all_are_played_due_to_duration) TEST_F(RosBag2PlayForTestFixture, play_for_none_are_played_due_to_duration) { - InitPlayerWithPlaybackDurationAndPlay(50); + auto primitive_message1 = get_messages_basic_types()[0]; + auto primitive_message2 = get_messages_basic_types()[0]; + primitive_message1->int32_value = 1; + primitive_message2->int32_value = 2; - auto replayed_test_primitives = sub_->get_received_messages( - kTopic1); - EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(0u))); + auto topic_types = std::vector{ + {kTopic1Name_, "test_msgs/BasicTypes", "", ""}}; - auto replayed_test_arrays = sub_->get_received_messages( - kTopic2); - EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(0u))); -} + std::vector> messages = + { + serialize_test_message(kTopic1Name_, 50, primitive_message1), + serialize_test_message(kTopic1Name_, 100, primitive_message2), + }; -TEST_F(RosBag2PlayForTestFixture, play_for_less_than_the_total_duration) -{ - InitPlayerWithPlaybackDurationAndPlay(270); + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); - auto replayed_test_primitives = sub_->get_received_messages( - kTopic1); - EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(2u))); - EVAL_REPLAYED_PRIMITIVES(replayed_test_primitives); + // Expect to receive messages only from play_next in second round + sub_->add_subscription(kTopic1_, messages.size()); + play_options_.playback_duration = rclcpp::Duration(std::chrono::milliseconds(49)); - auto replayed_test_arrays = sub_->get_received_messages( - kTopic2); - EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(2u))); - EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_test_arrays); - EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays); + std::shared_ptr player_ = std::make_shared( + std::move(reader), storage_options_, play_options_); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE(sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), 5s)); + + auto await_received_messages = sub_->spin_subscriptions(); + ASSERT_TRUE(player_->play()); + + // Playing one more time with play_next to save time and count messages + player_->pause(); + auto player_future = std::async(std::launch::async, [&player_]() -> void {player_->play();}); + + EXPECT_TRUE(player_->play_next()); + EXPECT_TRUE(player_->play_next()); + EXPECT_FALSE(player_->play_next()); + player_->resume(); + player_future.get(); + await_received_messages.get(); + auto replayed_topic1 = sub_->get_received_messages(kTopic1_); + EXPECT_THAT(replayed_topic1, SizeIs(2)); + EXPECT_EQ(replayed_topic1[0]->int32_value, 1); + EXPECT_EQ(replayed_topic1[1]->int32_value, 2); } -class RosBag2PlayForFilteredTopicTestFixture : public RosBag2PlayForTestFixture +TEST_F(RosBag2PlayForTestFixture, play_for_less_than_the_total_duration) { -public: - void SetUp() override + auto primitive_message1 = get_messages_basic_types()[0]; + auto primitive_message2 = get_messages_basic_types()[0]; + primitive_message1->int32_value = 1; + primitive_message2->int32_value = 2; + + auto topic_types = std::vector{ + {kTopic1Name_, "test_msgs/BasicTypes", "", ""}}; + + std::vector> messages = { - // Filter allows /topic2, blocks /topic1 - play_options_.topics_to_filter = {"topic2"}; - } -}; + serialize_test_message(kTopic1Name_, 10, primitive_message1), + serialize_test_message(kTopic1Name_, 50, primitive_message2), + }; -TEST_F( - RosBag2PlayForFilteredTopicTestFixture, - play_for_full_duration_recorded_messages_with_filtered_topics) -{ - InitPlayerWithPlaybackDurationAndPlay(400); + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); - auto replayed_test_primitives = - sub_->get_received_messages("/topic1"); - // No messages are allowed to have arrived - EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(0u))); + // Expect to receive 1 message from play() and 2 messages from play_next in second round + sub_->add_subscription(kTopic1_, messages.size() + 1); + play_options_.playback_duration = rclcpp::Duration(std::chrono::milliseconds(49)); - auto replayed_test_arrays = sub_->get_received_messages("/topic2"); - // All messages should have arrived. - EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(3u))); - EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_test_arrays); - EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays); -} + std::shared_ptr player_ = std::make_shared( + std::move(reader), storage_options_, play_options_); -TEST_F( - RosBag2PlayForFilteredTopicTestFixture, - play_for_short_duration_recorded_messages_with_filtered_topics) -{ - InitPlayerWithPlaybackDurationAndPlay(50); + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE(sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), 5s)); - auto replayed_test_primitives = - sub_->get_received_messages("/topic1"); - // No messages are allowed to have arrived - EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(0u))); + auto await_received_messages = sub_->spin_subscriptions(); + ASSERT_TRUE(player_->play()); - auto replayed_test_arrays = sub_->get_received_messages("/topic2"); - // All messages should have arrived. - EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(0u))); + // Playing one more time with play_next to save time and count messages + player_->pause(); + auto player_future = std::async(std::launch::async, [&player_]() -> void {player_->play();}); + + ASSERT_TRUE(player_->play_next()); + ASSERT_TRUE(player_->play_next()); + ASSERT_FALSE(player_->play_next()); + player_->resume(); + player_future.get(); + await_received_messages.get(); + auto replayed_topic1 = sub_->get_received_messages(kTopic1_); + EXPECT_THAT(replayed_topic1, SizeIs(3)); + EXPECT_EQ(replayed_topic1[0]->int32_value, 1); + EXPECT_EQ(replayed_topic1[1]->int32_value, 1); + EXPECT_EQ(replayed_topic1[2]->int32_value, 2); } TEST_F( - RosBag2PlayForFilteredTopicTestFixture, + RosBag2PlayForTestFixture, play_for_intermediate_duration_recorded_messages_with_filtered_topics) { - InitPlayerWithPlaybackDurationAndPlay(270); + // Filter allows /topic2, blocks /topic1 + play_options_.topics_to_filter = {"topic2"}; + InitPlayerWithPlaybackDurationAndPlay(270, 0, 2); auto replayed_test_primitives = sub_->get_received_messages("/topic1"); @@ -253,58 +280,34 @@ TEST_F( EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays); } -class RosBag2PlayForInterruptedTestFixture : public RosBag2PlayTestFixture +TEST_F(RosBag2PlayForTestFixture, play_should_return_false_when_interrupted) { -public: - static constexpr const char * kTopic1Name{"topic1"}; - static constexpr const char * kTopic1{"/topic1"}; + auto topic_types = std::vector{ + {kTopic1Name_, "test_msgs/BasicTypes", "", ""}}; - std::vector get_topic_types() + auto primitive_message = get_messages_basic_types()[0]; + primitive_message->int32_value = kIntValue; + std::vector> messages = { - return {{kTopic1Name, "test_msgs/BasicTypes", "", ""}}; - } - - std::vector> - get_serialized_messages() - { - auto primitive_message = get_messages_basic_types()[0]; - primitive_message->int32_value = kIntValue; - - // @{ Ordering matters. The mock reader implementation moves messages - // around without any knowledge about message chronology. It just picks - // the next one Make sure to keep the list in order or sort it! - std::vector> messages = - {serialize_test_message(kTopic1Name, 50, primitive_message), - serialize_test_message(kTopic1Name, 100, primitive_message), - serialize_test_message(kTopic1Name, 150, primitive_message)}; - // @} - return messages; - } -}; - -TEST_F( - RosBag2PlayForInterruptedTestFixture, - play_should_return_false_when_interrupted) -{ - auto topic_types = get_topic_types(); - auto messages = get_serialized_messages(); + serialize_test_message(kTopic1Name_, 50, primitive_message), + serialize_test_message(kTopic1Name_, 100, primitive_message), + }; auto prepared_mock_reader = std::make_unique(); prepared_mock_reader->prepare(messages, topic_types); auto reader = std::make_unique(std::move(prepared_mock_reader)); // Let the player only reproduce one message. - sub_->add_subscription(kTopic1, 1); + sub_->add_subscription(kTopic1_, 1); play_options_.playback_duration = rclcpp::Duration(std::chrono::milliseconds(75)); std::shared_ptr player_ = std::make_shared( std::move(reader), storage_options_, play_options_); // Wait for discovery to match publishers with subscribers - ASSERT_TRUE( - sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), std::chrono::seconds(5))); + ASSERT_TRUE(sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), 5s)); - auto await_received_messages = sub_->spin_subscriptions(kMaximumTimeSpinningSec); + auto await_received_messages = sub_->spin_subscriptions(); player_->pause(); auto player_future = std::async(std::launch::async, [player_]() {return player_->play();}); player_->wait_for_playback_to_start(); @@ -314,6 +317,6 @@ TEST_F( player_->resume(); player_future.get(); await_received_messages.get(); - auto replayed_topic1 = sub_->get_received_messages(kTopic1); + auto replayed_topic1 = sub_->get_received_messages(kTopic1_); EXPECT_THAT(replayed_topic1, SizeIs(1)); } From 97dd138f55cd7eca30252b912f30ef37bbb9db61 Mon Sep 17 00:00:00 2001 From: Geoffrey Biggs Date: Thu, 14 Apr 2022 09:31:28 +0900 Subject: [PATCH 7/9] Remove unnecessary source file from test binary Signed-off-by: Geoffrey Biggs --- rosbag2_transport/CMakeLists.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/rosbag2_transport/CMakeLists.txt b/rosbag2_transport/CMakeLists.txt index e835392e18..20020c84a2 100644 --- a/rosbag2_transport/CMakeLists.txt +++ b/rosbag2_transport/CMakeLists.txt @@ -111,7 +111,6 @@ function(create_tests_for_rmw_implementation) ${SKIP_TEST}) rosbag2_transport_add_gmock(test_play_for - src/rosbag2_transport/qos.cpp test/rosbag2_transport/test_play_for.cpp INCLUDE_DIRS $ LINK_LIBS rosbag2_transport From 5a8a0a7c7edd3ea921dd7825faa8058c23b32b70 Mon Sep 17 00:00:00 2001 From: Geoffrey Biggs Date: Wed, 1 Jun 2022 17:55:55 +0900 Subject: [PATCH 8/9] Correct errors introduced by rebase Signed-off-by: Geoffrey Biggs --- .../include/rosbag2_transport/player.hpp | 1 - .../src/rosbag2_transport/player.cpp | 19 ++----------------- 2 files changed, 2 insertions(+), 18 deletions(-) diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index e68f0e1093..66821e2382 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -242,7 +242,6 @@ class Player : public rclcpp::Node rclcpp::Service::SharedPtr srv_set_rate_; rclcpp::Service::SharedPtr srv_play_; rclcpp::Service::SharedPtr srv_play_next_; - rclcpp::Service::SharedPtr srv_play_for_; rclcpp::Service::SharedPtr srv_burst_; rclcpp::Service::SharedPtr srv_seek_; diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 866808f2e6..8599455ed3 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -479,13 +479,12 @@ void Player::play_messages_from_queue(const rcutils_duration_value_t & play_unti ready_to_play_from_queue_cv_.notify_all(); } while (message_ptr != nullptr && rclcpp::ok()) { - rosbag2_storage::SerializedBagMessageSharedPtr message = *message_ptr; - if (play_until_time >= starting_time_ && message->time_stamp > play_until_time) { + if (play_until_time >= starting_time_ && message_ptr->time_stamp > play_until_time) { break; } // Do not move on until sleep_until returns true // It will always sleep, so this is not a tight busy loop on pause - while (rclcpp::ok() && !clock_->sleep_until(message->time_stamp)) { + while (rclcpp::ok() && !clock_->sleep_until(message_ptr->time_stamp)) { if (std::atomic_exchange(&cancel_wait_for_next_message_, false)) { break; } @@ -723,20 +722,6 @@ void Player::create_control_services() { response->success = play_next(); }); - srv_play_for_ = create_service( - "~/play_for", - [this]( - const std::shared_ptr/* request_header */, - const std::shared_ptr request, - const std::shared_ptr response) - { - const rcutils_duration_value_t duration = - static_cast(request->duration.sec) * - static_cast(1000000000) + - static_cast(request->duration.nanosec); - play({duration}); - response->success = true; - }); srv_burst_ = create_service( "~/burst", [this]( From 38ed671366f55ab31208c65dc70ca0c2cc17c0ac Mon Sep 17 00:00:00 2001 From: Geoffrey Biggs Date: Thu, 2 Jun 2022 10:53:45 +0900 Subject: [PATCH 9/9] Correct play_next behaviour Signed-off-by: Geoffrey Biggs --- .../include/rosbag2_transport/player.hpp | 3 ++- .../src/rosbag2_transport/player.cpp | 19 +++++++++++-------- .../test/rosbag2_transport/test_play_for.cpp | 10 ++-------- 3 files changed, 15 insertions(+), 17 deletions(-) diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index 66821e2382..9088f14fba 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -200,7 +200,7 @@ class Player : public rclcpp::Node bool is_storage_completely_loaded() const; void enqueue_up_to_boundary(size_t boundary) RCPPUTILS_TSA_REQUIRES(reader_mutex_); void wait_for_filled_queue() const; - void play_messages_from_queue(const rcutils_duration_value_t & play_until_time); + void play_messages_from_queue(); void prepare_publishers(); bool publish_message(rosbag2_storage::SerializedBagMessageSharedPtr message); static constexpr double read_ahead_lower_bound_percentage_ = 0.9; @@ -220,6 +220,7 @@ class Player : public rclcpp::Node rosbag2_storage::StorageOptions storage_options_; rosbag2_transport::PlayOptions play_options_; + rcutils_time_point_value_t play_until_time_ = -1; moodycamel::ReaderWriterQueue message_queue_; mutable std::future storage_loading_future_; std::atomic_bool load_storage_content_{true}; diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 8599455ed3..e54b951e6a 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -159,6 +159,10 @@ Player::Player( set_rate(play_options_.rate); topic_qos_profile_overrides_ = play_options_.topic_qos_profile_overrides; prepare_publishers(); + + if (play_options_.playback_duration >= rclcpp::Duration(0, 0)) { + play_until_time_ = starting_time_ + play_options_.playback_duration.nanoseconds(); + } } create_control_services(); add_keyboard_callbacks(); @@ -207,11 +211,7 @@ bool Player::play() "Invalid delay value: " << play_options_.delay.nanoseconds() << ". Delay is disabled."); } - rcutils_time_point_value_t play_until_time = -1; - if (play_options_.playback_duration >= rclcpp::Duration(0, 0)) { - play_until_time = starting_time_ + play_options_.playback_duration.nanoseconds(); - } - RCLCPP_INFO_STREAM(get_logger(), "Playback duration value: " << play_until_time); + RCLCPP_INFO_STREAM(get_logger(), "Playback duration value: " << play_until_time_); try { do { @@ -228,7 +228,7 @@ bool Player::play() load_storage_content_ = true; storage_loading_future_ = std::async(std::launch::async, [this]() {load_storage_content();}); wait_for_filled_queue(); - play_messages_from_queue(play_until_time); + play_messages_from_queue(); load_storage_content_ = false; if (storage_loading_future_.valid()) {storage_loading_future_.get();} @@ -366,6 +366,9 @@ bool Player::play_next() bool next_message_published = false; while (message_ptr != nullptr && !next_message_published) { + if (play_until_time_ >= starting_time_ && message_ptr->time_stamp > play_until_time_) { + break; + } { next_message_published = publish_message(message_ptr); clock_->jump(message_ptr->time_stamp); @@ -465,7 +468,7 @@ void Player::enqueue_up_to_boundary(size_t boundary) } } -void Player::play_messages_from_queue(const rcutils_duration_value_t & play_until_time) +void Player::play_messages_from_queue() { // Note: We need to use message_queue_.peek() instead of message_queue_.try_dequeue(message) // to support play_next() API logic. @@ -479,7 +482,7 @@ void Player::play_messages_from_queue(const rcutils_duration_value_t & play_unti ready_to_play_from_queue_cv_.notify_all(); } while (message_ptr != nullptr && rclcpp::ok()) { - if (play_until_time >= starting_time_ && message_ptr->time_stamp > play_until_time) { + if (play_until_time_ >= starting_time_ && message_ptr->time_stamp > play_until_time_) { break; } // Do not move on until sleep_until returns true diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp index 66c0346afd..0ec975369f 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp @@ -198,16 +198,12 @@ TEST_F(RosBag2PlayForTestFixture, play_for_none_are_played_due_to_duration) player_->pause(); auto player_future = std::async(std::launch::async, [&player_]() -> void {player_->play();}); - EXPECT_TRUE(player_->play_next()); - EXPECT_TRUE(player_->play_next()); EXPECT_FALSE(player_->play_next()); player_->resume(); player_future.get(); await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages(kTopic1_); - EXPECT_THAT(replayed_topic1, SizeIs(2)); - EXPECT_EQ(replayed_topic1[0]->int32_value, 1); - EXPECT_EQ(replayed_topic1[1]->int32_value, 2); + EXPECT_THAT(replayed_topic1, SizeIs(0)); } TEST_F(RosBag2PlayForTestFixture, play_for_less_than_the_total_duration) @@ -247,17 +243,15 @@ TEST_F(RosBag2PlayForTestFixture, play_for_less_than_the_total_duration) player_->pause(); auto player_future = std::async(std::launch::async, [&player_]() -> void {player_->play();}); - ASSERT_TRUE(player_->play_next()); ASSERT_TRUE(player_->play_next()); ASSERT_FALSE(player_->play_next()); player_->resume(); player_future.get(); await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages(kTopic1_); - EXPECT_THAT(replayed_topic1, SizeIs(3)); + EXPECT_THAT(replayed_topic1, SizeIs(2)); EXPECT_EQ(replayed_topic1[0]->int32_value, 1); EXPECT_EQ(replayed_topic1[1]->int32_value, 1); - EXPECT_EQ(replayed_topic1[2]->int32_value, 2); } TEST_F(