diff --git a/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp b/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp index 392fa8f343..f37fa75477 100644 --- a/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp +++ b/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp @@ -73,6 +73,50 @@ class SubscriptionManager return received_messages_on_topic; } + /// \brief Wait until publishers will be connected to the subscribers or timeout occur. + /// \tparam Timeout Data type for timeout duration from std::chrono:: namespace + /// \param publishers List of raw pointers to the publishers + /// \param timeout Maximum time duration during which discovery should happen. + /// \param n_subscribers_to_match Number of subscribers each publisher should have for match. + /// \return true if publishers have specified number of subscribers, otherwise false. + template + bool spin_and_wait_for_matched( + const std::vector & publishers, + Timeout timeout, size_t n_subscribers_to_match = 1) + { + // Sanity check that we have valid input + for (const auto publisher_ptr : publishers) { + if (publisher_ptr == nullptr) { + throw std::invalid_argument("Null pointer in publisher list"); + } + std::string topic_name{publisher_ptr->get_topic_name()}; + if (expected_topics_with_size_.find(topic_name) == expected_topics_with_size_.end()) { + throw std::invalid_argument( + "Publisher's topic name = `" + topic_name + "` not found in expected topics list"); + } + } + + using clock = std::chrono::steady_clock; + auto start = clock::now(); + + rclcpp::executors::SingleThreadedExecutor exec; + bool matched = false; + while (!matched && ((clock::now() - start) < timeout)) { + exec.spin_node_some(subscriber_node_); + + matched = true; + for (const auto publisher_ptr : publishers) { + if (publisher_ptr->get_subscription_count() + + publisher_ptr->get_intra_process_subscription_count() < n_subscribers_to_match) + { + matched = false; + break; + } + } + } + return matched; + } + std::future spin_subscriptions() { return async( @@ -87,7 +131,7 @@ class SubscriptionManager private: bool continue_spinning( - std::unordered_map expected_topics_with_sizes, + const std::unordered_map & expected_topics_with_sizes, std::chrono::time_point start_time) { auto current = std::chrono::high_resolution_clock::now(); diff --git a/rosbag2_transport/CMakeLists.txt b/rosbag2_transport/CMakeLists.txt index 59ade9645d..e6f03b6724 100644 --- a/rosbag2_transport/CMakeLists.txt +++ b/rosbag2_transport/CMakeLists.txt @@ -125,6 +125,12 @@ function(create_tests_for_rmw_implementation) AMENT_DEPS test_msgs rosbag2_test_common ${SKIP_TEST}) + rosbag2_transport_add_gmock(test_play_next + test/rosbag2_transport/test_play_next.cpp + INCLUDE_DIRS $ + LINK_LIBS rosbag2_transport + AMENT_DEPS test_msgs rosbag2_test_common) + rosbag2_transport_add_gmock(test_qos src/rosbag2_transport/qos.cpp test/rosbag2_transport/test_qos.cpp diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index 1c5ef3ebbc..545bcef7aa 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -112,14 +112,30 @@ class Player : public rclcpp::Node ROSBAG2_TRANSPORT_PUBLIC bool set_rate(double); + /// \brief Playing next message from queue when in pause. + /// \details This is blocking call and it will wait until next available message will be + /// published or rclcpp context shut down. + /// \note If internal player queue is starving and storage has not been completely loaded, + /// this method will wait until new element will be pushed to the queue. + /// \return true if Player::play() has been started, player in pause mode and successfully + /// played next message, otherwise false. + ROSBAG2_TRANSPORT_PUBLIC + bool play_next(); + +protected: + std::atomic playing_messages_from_queue_{false}; + rclcpp::Publisher::SharedPtr clock_publisher_; + std::unordered_map> publishers_; + private: + rosbag2_storage::SerializedBagMessageSharedPtr * peek_next_message_from_queue(); void load_storage_content(); bool is_storage_completely_loaded() const; void enqueue_up_to_boundary(uint64_t boundary); void wait_for_filled_queue() const; void play_messages_from_queue(); - void play_messages_until_queue_empty(); void prepare_publishers(); + bool publish_message(rosbag2_storage::SerializedBagMessageSharedPtr message); static constexpr double read_ahead_lower_bound_percentage_ = 0.9; static const std::chrono::milliseconds queue_read_wait_period_; @@ -128,11 +144,13 @@ class Player : public rclcpp::Node rosbag2_transport::PlayOptions play_options_; moodycamel::ReaderWriterQueue message_queue_; mutable std::future storage_loading_future_; - std::unordered_map> publishers_; std::unordered_map topic_qos_profile_overrides_; std::unique_ptr clock_; - rclcpp::Publisher::SharedPtr clock_publisher_; std::shared_ptr clock_publish_timer_; + std::mutex skip_message_in_main_play_loop_mutex_; + bool skip_message_in_main_play_loop_ RCPPUTILS_TSA_GUARDED_BY + (skip_message_in_main_play_loop_mutex_) = false; + std::atomic_bool is_in_play_{false}; rclcpp::Service::SharedPtr srv_pause_; rclcpp::Service::SharedPtr srv_resume_; diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 9d12026334..e0e411c50e 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -203,25 +203,26 @@ bool Player::is_storage_completely_loaded() const void Player::play() { + is_in_play_ = true; try { do { reader_->open(storage_options_, {"", rmw_get_serialization_format()}); const auto starting_time = std::chrono::duration_cast( reader_->get_metadata().starting_time.time_since_epoch()).count(); + clock_->jump(starting_time); storage_loading_future_ = std::async( std::launch::async, [this]() {load_storage_content();}); wait_for_filled_queue(); - - clock_->jump(starting_time); play_messages_from_queue(); reader_->close(); } while (rclcpp::ok() && play_options_.loop); } catch (std::runtime_error & e) { RCLCPP_ERROR(this->get_logger(), "Failed to play: %s", e.what()); } + is_in_play_ = false; } void Player::pause() @@ -254,6 +255,47 @@ bool Player::set_rate(double rate) return clock_->set_rate(rate); } +rosbag2_storage::SerializedBagMessageSharedPtr * Player::peek_next_message_from_queue() +{ + rosbag2_storage::SerializedBagMessageSharedPtr * message_ptr = message_queue_.peek(); + if (message_ptr == nullptr && !is_storage_completely_loaded() && rclcpp::ok()) { + RCLCPP_WARN( + this->get_logger(), + "Message queue starved. Messages will be delayed. Consider " + "increasing the --read-ahead-queue-size option."); + while (message_ptr == nullptr && !is_storage_completely_loaded() && rclcpp::ok()) { + std::this_thread::sleep_for(std::chrono::microseconds(100)); + message_ptr = message_queue_.peek(); + } + } + return message_ptr; +} + +bool Player::play_next() +{ + // Temporary take over playback from play_messages_from_queue() + std::lock_guard lk(skip_message_in_main_play_loop_mutex_); + + if (!clock_->is_paused() || !is_in_play_) { + return false; + } + + skip_message_in_main_play_loop_ = true; + rosbag2_storage::SerializedBagMessageSharedPtr * message_ptr = peek_next_message_from_queue(); + + bool next_message_published = false; + while (message_ptr != nullptr && !next_message_published) { + { + rosbag2_storage::SerializedBagMessageSharedPtr message = *message_ptr; + next_message_published = publish_message(message); + clock_->jump(message->time_stamp); + } + message_queue_.pop(); + message_ptr = peek_next_message_from_queue(); + } + return next_message_published; +} + void Player::wait_for_filled_queue() const { while ( @@ -293,31 +335,32 @@ void Player::enqueue_up_to_boundary(uint64_t boundary) void Player::play_messages_from_queue() { - do { - play_messages_until_queue_empty(); - if (!is_storage_completely_loaded() && rclcpp::ok()) { - RCLCPP_WARN( - this->get_logger(), - "Message queue starved. Messages will be delayed. Consider " - "increasing the --read-ahead-queue-size option."); - } - } while (!is_storage_completely_loaded() && rclcpp::ok()); -} - -void Player::play_messages_until_queue_empty() -{ - rosbag2_storage::SerializedBagMessageSharedPtr message; - while (message_queue_.try_dequeue(message) && rclcpp::ok()) { - // Do not move on until sleep_until returns true - // It will always sleep, so this is not a tight busy loop on pause - while (rclcpp::ok() && !clock_->sleep_until(message->time_stamp)) {} - if (rclcpp::ok()) { - auto publisher_iter = publishers_.find(message->topic_name); - if (publisher_iter != publishers_.end()) { - publisher_iter->second->publish(rclcpp::SerializedMessage(*message->serialized_data.get())); + playing_messages_from_queue_ = true; + // Note: We need to use message_queue_.peek() instead of message_queue_.try_dequeue(message) + // to support play_next() API logic. + rosbag2_storage::SerializedBagMessageSharedPtr * message_ptr = peek_next_message_from_queue(); + while (message_ptr != nullptr && rclcpp::ok()) { + { + rosbag2_storage::SerializedBagMessageSharedPtr message = *message_ptr; + // Do not move on until sleep_until returns true + // It will always sleep, so this is not a tight busy loop on pause + while (rclcpp::ok() && !clock_->sleep_until(message->time_stamp)) {} + if (rclcpp::ok()) { + { + std::lock_guard lk(skip_message_in_main_play_loop_mutex_); + if (skip_message_in_main_play_loop_) { + skip_message_in_main_play_loop_ = false; + message_ptr = peek_next_message_from_queue(); + continue; + } + } + publish_message(message); } + message_queue_.pop(); + message_ptr = peek_next_message_from_queue(); } } + playing_messages_from_queue_ = false; } void Player::prepare_publishers() @@ -363,8 +406,7 @@ void Player::prepare_publishers() try { publishers_.insert( std::make_pair( - topic.name, this->create_generic_publisher( - topic.name, topic.type, topic_qos))); + topic.name, this->create_generic_publisher(topic.name, topic.type, topic_qos))); } catch (const std::runtime_error & e) { // using a warning log seems better than adding a new option // to ignore some unknown message type library @@ -375,4 +417,15 @@ void Player::prepare_publishers() } } +bool Player::publish_message(rosbag2_storage::SerializedBagMessageSharedPtr message) +{ + bool message_published = false; + auto publisher_iter = publishers_.find(message->topic_name); + if (publisher_iter != publishers_.end()) { + publisher_iter->second->publish(rclcpp::SerializedMessage(*message->serialized_data)); + message_published = true; + } + return message_published; +} + } // namespace rosbag2_transport diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_next.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_next.cpp new file mode 100644 index 0000000000..78bf625f44 --- /dev/null +++ b/rosbag2_transport/test/rosbag2_transport/test_play_next.cpp @@ -0,0 +1,359 @@ +// Copyright 2021, Apex.AI Software Innovations GmbH. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include +#include +#include + +#include "rosbag2_play_test_fixture.hpp" +#include "rosbag2_transport/player.hpp" +#include "test_msgs/message_fixtures.hpp" +#include "test_msgs/msg/arrays.hpp" +#include "test_msgs/msg/basic_types.hpp" + +using namespace ::testing; // NOLINT +using namespace rosbag2_transport; // NOLINT +using namespace rosbag2_test_common; // NOLINT + +class MockPlayer : public rosbag2_transport::Player +{ +public: + MockPlayer( + std::unique_ptr reader, + const rosbag2_storage::StorageOptions & storage_options, + const rosbag2_transport::PlayOptions & play_options) + : Player(std::move(reader), storage_options, play_options) + {} + + std::vector get_list_of_publishers() + { + std::vector pub_list; + for (const auto & publisher : publishers_) { + pub_list.push_back(static_cast(publisher.second.get())); + } + return pub_list; + } + + void wait_for_playback_to_start() + { + while (!playing_messages_from_queue_) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } +}; + +TEST_F(RosBag2PlayTestFixture, play_next_with_false_preconditions) { + auto primitive_message = get_messages_basic_types()[0]; + primitive_message->int32_value = 42; + + auto topic_types = std::vector{ + {"topic1", "test_msgs/BasicTypes", "", ""}}; + + std::vector> messages = + {serialize_test_message("topic1", 2100, primitive_message)}; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + auto player = std::make_shared(std::move(reader), storage_options_, play_options_); + + ASSERT_FALSE(player->is_paused()); + ASSERT_FALSE(player->play_next()); + + player->pause(); + ASSERT_TRUE(player->is_paused()); + ASSERT_FALSE(player->play_next()); +} + +TEST_F(RosBag2PlayTestFixture, play_next_playing_all_messages_without_delays) { + auto primitive_message = get_messages_basic_types()[0]; + primitive_message->int32_value = 42; + + auto topic_types = std::vector{ + {"topic1", "test_msgs/BasicTypes", "", ""}}; + + std::vector> messages = + { + serialize_test_message("topic1", 2100, primitive_message), + serialize_test_message("topic1", 3300, primitive_message), + serialize_test_message("topic1", 4600, primitive_message), + serialize_test_message("topic1", 5900, primitive_message) + }; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + auto player = std::make_shared(std::move(reader), storage_options_, play_options_); + + sub_ = std::make_shared(); + sub_->add_subscription("/topic1", messages.size()); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE( + sub_->spin_and_wait_for_matched(player->get_list_of_publishers(), std::chrono::seconds(30))); + + auto await_received_messages = sub_->spin_subscriptions(); + + player->pause(); + ASSERT_TRUE(player->is_paused()); + + auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); + player->wait_for_playback_to_start(); + + ASSERT_TRUE(player->is_paused()); + auto start = std::chrono::steady_clock::now(); + ASSERT_TRUE(player->play_next()); + size_t played_messages = 1; + while (player->play_next()) { + played_messages++; + } + auto replay_time = std::chrono::steady_clock::now() - start; + ASSERT_EQ(played_messages, messages.size()); + ASSERT_THAT(replay_time, Lt(std::chrono::seconds(2))); + ASSERT_TRUE(player->is_paused()); + player->resume(); + player_future.get(); + await_received_messages.get(); + + auto replayed_topic1 = sub_->get_received_messages("/topic1"); + EXPECT_THAT(replayed_topic1, SizeIs(messages.size())); +} + +TEST_F(RosBag2PlayTestFixture, play_next_playing_one_by_one_messages_with_the_same_timestamp) { + auto primitive_message = get_messages_basic_types()[0]; + primitive_message->int32_value = 42; + + auto topic_types = std::vector{ + {"topic1", "test_msgs/BasicTypes", "", ""}}; + + std::vector> messages = + { + serialize_test_message("topic1", 1000, primitive_message), + serialize_test_message("topic1", 1000, primitive_message), + serialize_test_message("topic1", 1000, primitive_message), + serialize_test_message("topic1", 1000, primitive_message) + }; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + auto player = std::make_shared(std::move(reader), storage_options_, play_options_); + + sub_ = std::make_shared(); + sub_->add_subscription("/topic1", messages.size()); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE( + sub_->spin_and_wait_for_matched(player->get_list_of_publishers(), std::chrono::seconds(30))); + + auto await_received_messages = sub_->spin_subscriptions(); + + player->pause(); + ASSERT_TRUE(player->is_paused()); + + auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); + player->wait_for_playback_to_start(); + + ASSERT_TRUE(player->is_paused()); + ASSERT_TRUE(player->play_next()); + size_t played_messages = 1; + while (player->play_next()) { + // Yield CPU resources for player-play() running in separate thread to make sure that it + // will not play extra messages. + std::this_thread::sleep_for(std::chrono::milliseconds(30)); + played_messages++; + } + ASSERT_EQ(played_messages, messages.size()); + ASSERT_TRUE(player->is_paused()); + player->resume(); + player_future.get(); + await_received_messages.get(); + + auto replayed_topic1 = sub_->get_received_messages("/topic1"); + EXPECT_THAT(replayed_topic1, SizeIs(messages.size())); +} + +TEST_F(RosBag2PlayTestFixture, play_respect_messages_timing_after_play_next) { + auto primitive_message = get_messages_basic_types()[0]; + primitive_message->int32_value = 42; + + auto topic_types = std::vector{ + {"topic1", "test_msgs/BasicTypes", "", ""}}; + + std::vector> messages = + { + serialize_test_message("topic1", 1500, primitive_message), + serialize_test_message("topic1", 2500, primitive_message), + serialize_test_message("topic1", 2700, primitive_message), + serialize_test_message("topic1", 2800, primitive_message) + }; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + auto player = std::make_shared(std::move(reader), storage_options_, play_options_); + + sub_ = std::make_shared(); + sub_->add_subscription("/topic1", messages.size()); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE( + sub_->spin_and_wait_for_matched(player->get_list_of_publishers(), std::chrono::seconds(30))); + + auto await_received_messages = sub_->spin_subscriptions(); + + player->pause(); + ASSERT_TRUE(player->is_paused()); + + auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); + player->wait_for_playback_to_start(); + + ASSERT_TRUE(player->is_paused()); + ASSERT_TRUE(player->play_next()); + ASSERT_TRUE(player->play_next()); + ASSERT_TRUE(player->is_paused()); + player->resume(); + auto start = std::chrono::steady_clock::now(); + player_future.get(); + auto replay_time = std::chrono::steady_clock::now() - start; + + auto expected_replay_time = + std::chrono::nanoseconds(messages.back()->time_stamp - messages[1]->time_stamp); + // Check for lower bound with some tolerance + ASSERT_THAT(replay_time, Gt(expected_replay_time - std::chrono::milliseconds(50))); + // Check for upper bound with some tolerance + ASSERT_THAT(replay_time, Lt(expected_replay_time + std::chrono::milliseconds(100))); + + await_received_messages.get(); + auto replayed_topic1 = sub_->get_received_messages("/topic1"); + EXPECT_THAT(replayed_topic1, SizeIs(messages.size())); +} + +TEST_F(RosBag2PlayTestFixture, player_can_resume_after_play_next) { + auto primitive_message = get_messages_basic_types()[0]; + primitive_message->int32_value = 42; + + auto topic_types = std::vector{ + {"topic1", "test_msgs/BasicTypes", "", ""}}; + + std::vector> messages = + { + serialize_test_message("topic1", 300, primitive_message), + serialize_test_message("topic1", 500, primitive_message), + serialize_test_message("topic1", 700, primitive_message) + }; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + auto player = std::make_shared(std::move(reader), storage_options_, play_options_); + + sub_ = std::make_shared(); + sub_->add_subscription("/topic1", messages.size()); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE( + sub_->spin_and_wait_for_matched(player->get_list_of_publishers(), std::chrono::seconds(30))); + + auto await_received_messages = sub_->spin_subscriptions(); + + player->pause(); + ASSERT_TRUE(player->is_paused()); + + auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); + player->wait_for_playback_to_start(); + + ASSERT_TRUE(player->is_paused()); + ASSERT_TRUE(player->play_next()); + ASSERT_TRUE(player->is_paused()); + player->resume(); + player_future.get(); + await_received_messages.get(); + + auto replayed_topic1 = sub_->get_received_messages("/topic1"); + EXPECT_THAT(replayed_topic1, SizeIs(messages.size())); +} + +TEST_F(RosBag2PlayTestFixture, play_next_playing_only_filtered_topics) { + auto primitive_message1 = get_messages_basic_types()[0]; + primitive_message1->int32_value = 42; + + auto complex_message1 = get_messages_arrays()[0]; + complex_message1->float32_values = {{40.0f, 2.0f, 0.0f}}; + complex_message1->bool_values = {{true, false, true}}; + + auto topic_types = std::vector{ + {"topic1", "test_msgs/BasicTypes", "", ""}, + {"topic2", "test_msgs/Arrays", "", ""}, + }; + + std::vector> messages = + { + serialize_test_message("topic1", 500, primitive_message1), + serialize_test_message("topic1", 700, primitive_message1), + serialize_test_message("topic1", 900, primitive_message1), + serialize_test_message("topic2", 550, complex_message1), + serialize_test_message("topic2", 750, complex_message1), + serialize_test_message("topic2", 950, complex_message1) + }; + + // Filter allows /topic2, blocks /topic1 + play_options_.topics_to_filter = {"topic2"}; + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + auto player = std::make_shared(std::move(reader), storage_options_, play_options_); + + sub_ = std::make_shared(); + sub_->add_subscription("/topic1", 0); + sub_->add_subscription("/topic2", 3); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE( + sub_->spin_and_wait_for_matched(player->get_list_of_publishers(), std::chrono::seconds(30))); + + auto await_received_messages = sub_->spin_subscriptions(); + + player->pause(); + ASSERT_TRUE(player->is_paused()); + + auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); + player->wait_for_playback_to_start(); + + ASSERT_TRUE(player->is_paused()); + ASSERT_TRUE(player->play_next()); + + size_t played_messages = 1; + while (player->play_next()) { + played_messages++; + } + ASSERT_EQ(played_messages, 3U); + + ASSERT_TRUE(player->is_paused()); + player->resume(); + player_future.get(); + await_received_messages.get(); + + auto replayed_topic1 = sub_->get_received_messages("/topic1"); + // No messages are allowed to have arrived + EXPECT_THAT(replayed_topic1, SizeIs(0u)); + + auto replayed_topic2 = sub_->get_received_messages("/topic2"); + // All we care is that any messages arrived + EXPECT_THAT(replayed_topic2, SizeIs(Eq(3u))); +}