Skip to content

Commit

Permalink
Add play_next() API to the player class (#762)
Browse files Browse the repository at this point in the history
* Add play_next() API to the player class

Signed-off-by: Michael Orlov <morlovmr@gmail.com>

* Make play_next() as blocking call. Add wait until next message will be published.

Signed-off-by: Michael Orlov <morlovmr@gmail.com>

* Simplified logic in play_next().

Signed-off-by: Michael Orlov <morlovmr@gmail.com>

* Fix for issue with unplayed message in play_next() due to the topic filtration.
Also partially solved issue with undefined behaviour when more than one message recorded with the same timestamp.

Signed-off-by: Michael Orlov <morlovmr@gmail.com>

* Put skip_message_in_main_play_loop_ under the TSA guard and some renames

Signed-off-by: Michael Orlov <morlovmr@gmail.com>

* Scoped shared_ptr for message to avoid memory corruption since underlying queue do reinterpret_cast to internal buffer when peeking element.

Signed-off-by: Michael Orlov <morlovmr@gmail.com>

* Move publishers to protected section and add playing_messages_from_queue_ flag for unit tests

Signed-off-by: Michael Orlov <morlovmr@gmail.com>

* Add spin_and_wait_for_matched in SubscriptionManager

Signed-off-by: Michael Orlov <morlovmr@gmail.com>

* Add unit tests for play_next()

Signed-off-by: Michael Orlov <morlovmr@gmail.com>

* Add lower bound check for expected playback time after play_next() call

Signed-off-by: Michael Orlov <morlovmr@gmail.com>

* Add one more play_next() iteration for play_respect_messages_timing_after_play_next test

Signed-off-by: Michael Orlov <morlovmr@gmail.com>

* Add constraint that play_next() will return false if player not in playback mode. i.e. Player::play() hasn't been started

Signed-off-by: Michael Orlov <morlovmr@gmail.com>

* Move part of the code responsible for the publishing one message to the private publish_message(message) method

Signed-off-by: Michael Orlov <morlovmr@gmail.com>

* Fix for undefined behaviour when resume() call happen during play_next()

Signed-off-by: Michael Orlov <morlovmr@gmail.com>

* Replace std::list with std::vector for publishers in spin_and_wait_for_matched

Signed-off-by: Michael Orlov <morlovmr@gmail.com>
  • Loading branch information
MichaelOrlov authored May 13, 2021
1 parent dd50cbb commit c488315
Show file tree
Hide file tree
Showing 5 changed files with 510 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,50 @@ class SubscriptionManager
return received_messages_on_topic;
}

/// \brief Wait until publishers will be connected to the subscribers or timeout occur.
/// \tparam Timeout Data type for timeout duration from std::chrono:: namespace
/// \param publishers List of raw pointers to the publishers
/// \param timeout Maximum time duration during which discovery should happen.
/// \param n_subscribers_to_match Number of subscribers each publisher should have for match.
/// \return true if publishers have specified number of subscribers, otherwise false.
template<typename Timeout>
bool spin_and_wait_for_matched(
const std::vector<rclcpp::PublisherBase *> & publishers,
Timeout timeout, size_t n_subscribers_to_match = 1)
{
// Sanity check that we have valid input
for (const auto publisher_ptr : publishers) {
if (publisher_ptr == nullptr) {
throw std::invalid_argument("Null pointer in publisher list");
}
std::string topic_name{publisher_ptr->get_topic_name()};
if (expected_topics_with_size_.find(topic_name) == expected_topics_with_size_.end()) {
throw std::invalid_argument(
"Publisher's topic name = `" + topic_name + "` not found in expected topics list");
}
}

using clock = std::chrono::steady_clock;
auto start = clock::now();

rclcpp::executors::SingleThreadedExecutor exec;
bool matched = false;
while (!matched && ((clock::now() - start) < timeout)) {
exec.spin_node_some(subscriber_node_);

matched = true;
for (const auto publisher_ptr : publishers) {
if (publisher_ptr->get_subscription_count() +
publisher_ptr->get_intra_process_subscription_count() < n_subscribers_to_match)
{
matched = false;
break;
}
}
}
return matched;
}

std::future<void> spin_subscriptions()
{
return async(
Expand All @@ -87,7 +131,7 @@ class SubscriptionManager

private:
bool continue_spinning(
std::unordered_map<std::string, size_t> expected_topics_with_sizes,
const std::unordered_map<std::string, size_t> & expected_topics_with_sizes,
std::chrono::time_point<std::chrono::high_resolution_clock> start_time)
{
auto current = std::chrono::high_resolution_clock::now();
Expand Down
6 changes: 6 additions & 0 deletions rosbag2_transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ function(create_tests_for_rmw_implementation)
AMENT_DEPS test_msgs rosbag2_test_common
${SKIP_TEST})

rosbag2_transport_add_gmock(test_play_next
test/rosbag2_transport/test_play_next.cpp
INCLUDE_DIRS $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/src/rosbag2_transport>
LINK_LIBS rosbag2_transport
AMENT_DEPS test_msgs rosbag2_test_common)

rosbag2_transport_add_gmock(test_qos
src/rosbag2_transport/qos.cpp
test/rosbag2_transport/test_qos.cpp
Expand Down
24 changes: 21 additions & 3 deletions rosbag2_transport/include/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,30 @@ class Player : public rclcpp::Node
ROSBAG2_TRANSPORT_PUBLIC
bool set_rate(double);

/// \brief Playing next message from queue when in pause.
/// \details This is blocking call and it will wait until next available message will be
/// published or rclcpp context shut down.
/// \note If internal player queue is starving and storage has not been completely loaded,
/// this method will wait until new element will be pushed to the queue.
/// \return true if Player::play() has been started, player in pause mode and successfully
/// played next message, otherwise false.
ROSBAG2_TRANSPORT_PUBLIC
bool play_next();

protected:
std::atomic<bool> playing_messages_from_queue_{false};
rclcpp::Publisher<rosgraph_msgs::msg::Clock>::SharedPtr clock_publisher_;
std::unordered_map<std::string, std::shared_ptr<rclcpp::GenericPublisher>> publishers_;

private:
rosbag2_storage::SerializedBagMessageSharedPtr * peek_next_message_from_queue();
void load_storage_content();
bool is_storage_completely_loaded() const;
void enqueue_up_to_boundary(uint64_t boundary);
void wait_for_filled_queue() const;
void play_messages_from_queue();
void play_messages_until_queue_empty();
void prepare_publishers();
bool publish_message(rosbag2_storage::SerializedBagMessageSharedPtr message);
static constexpr double read_ahead_lower_bound_percentage_ = 0.9;
static const std::chrono::milliseconds queue_read_wait_period_;

Expand All @@ -128,11 +144,13 @@ class Player : public rclcpp::Node
rosbag2_transport::PlayOptions play_options_;
moodycamel::ReaderWriterQueue<rosbag2_storage::SerializedBagMessageSharedPtr> message_queue_;
mutable std::future<void> storage_loading_future_;
std::unordered_map<std::string, std::shared_ptr<rclcpp::GenericPublisher>> publishers_;
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides_;
std::unique_ptr<rosbag2_cpp::PlayerClock> clock_;
rclcpp::Publisher<rosgraph_msgs::msg::Clock>::SharedPtr clock_publisher_;
std::shared_ptr<rclcpp::TimerBase> clock_publish_timer_;
std::mutex skip_message_in_main_play_loop_mutex_;
bool skip_message_in_main_play_loop_ RCPPUTILS_TSA_GUARDED_BY
(skip_message_in_main_play_loop_mutex_) = false;
std::atomic_bool is_in_play_{false};

rclcpp::Service<rosbag2_interfaces::srv::Pause>::SharedPtr srv_pause_;
rclcpp::Service<rosbag2_interfaces::srv::Resume>::SharedPtr srv_resume_;
Expand Down
105 changes: 79 additions & 26 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,25 +203,26 @@ bool Player::is_storage_completely_loaded() const

void Player::play()
{
is_in_play_ = true;
try {
do {
reader_->open(storage_options_, {"", rmw_get_serialization_format()});
const auto starting_time = std::chrono::duration_cast<std::chrono::nanoseconds>(
reader_->get_metadata().starting_time.time_since_epoch()).count();
clock_->jump(starting_time);

storage_loading_future_ = std::async(
std::launch::async,
[this]() {load_storage_content();});

wait_for_filled_queue();

clock_->jump(starting_time);
play_messages_from_queue();
reader_->close();
} while (rclcpp::ok() && play_options_.loop);
} catch (std::runtime_error & e) {
RCLCPP_ERROR(this->get_logger(), "Failed to play: %s", e.what());
}
is_in_play_ = false;
}

void Player::pause()
Expand Down Expand Up @@ -254,6 +255,47 @@ bool Player::set_rate(double rate)
return clock_->set_rate(rate);
}

rosbag2_storage::SerializedBagMessageSharedPtr * Player::peek_next_message_from_queue()
{
rosbag2_storage::SerializedBagMessageSharedPtr * message_ptr = message_queue_.peek();
if (message_ptr == nullptr && !is_storage_completely_loaded() && rclcpp::ok()) {
RCLCPP_WARN(
this->get_logger(),
"Message queue starved. Messages will be delayed. Consider "
"increasing the --read-ahead-queue-size option.");
while (message_ptr == nullptr && !is_storage_completely_loaded() && rclcpp::ok()) {
std::this_thread::sleep_for(std::chrono::microseconds(100));
message_ptr = message_queue_.peek();
}
}
return message_ptr;
}

bool Player::play_next()
{
// Temporary take over playback from play_messages_from_queue()
std::lock_guard<std::mutex> lk(skip_message_in_main_play_loop_mutex_);

if (!clock_->is_paused() || !is_in_play_) {
return false;
}

skip_message_in_main_play_loop_ = true;
rosbag2_storage::SerializedBagMessageSharedPtr * message_ptr = peek_next_message_from_queue();

bool next_message_published = false;
while (message_ptr != nullptr && !next_message_published) {
{
rosbag2_storage::SerializedBagMessageSharedPtr message = *message_ptr;
next_message_published = publish_message(message);
clock_->jump(message->time_stamp);
}
message_queue_.pop();
message_ptr = peek_next_message_from_queue();
}
return next_message_published;
}

void Player::wait_for_filled_queue() const
{
while (
Expand Down Expand Up @@ -293,31 +335,32 @@ void Player::enqueue_up_to_boundary(uint64_t boundary)

void Player::play_messages_from_queue()
{
do {
play_messages_until_queue_empty();
if (!is_storage_completely_loaded() && rclcpp::ok()) {
RCLCPP_WARN(
this->get_logger(),
"Message queue starved. Messages will be delayed. Consider "
"increasing the --read-ahead-queue-size option.");
}
} while (!is_storage_completely_loaded() && rclcpp::ok());
}

void Player::play_messages_until_queue_empty()
{
rosbag2_storage::SerializedBagMessageSharedPtr message;
while (message_queue_.try_dequeue(message) && rclcpp::ok()) {
// Do not move on until sleep_until returns true
// It will always sleep, so this is not a tight busy loop on pause
while (rclcpp::ok() && !clock_->sleep_until(message->time_stamp)) {}
if (rclcpp::ok()) {
auto publisher_iter = publishers_.find(message->topic_name);
if (publisher_iter != publishers_.end()) {
publisher_iter->second->publish(rclcpp::SerializedMessage(*message->serialized_data.get()));
playing_messages_from_queue_ = true;
// Note: We need to use message_queue_.peek() instead of message_queue_.try_dequeue(message)
// to support play_next() API logic.
rosbag2_storage::SerializedBagMessageSharedPtr * message_ptr = peek_next_message_from_queue();
while (message_ptr != nullptr && rclcpp::ok()) {
{
rosbag2_storage::SerializedBagMessageSharedPtr message = *message_ptr;
// Do not move on until sleep_until returns true
// It will always sleep, so this is not a tight busy loop on pause
while (rclcpp::ok() && !clock_->sleep_until(message->time_stamp)) {}
if (rclcpp::ok()) {
{
std::lock_guard<std::mutex> lk(skip_message_in_main_play_loop_mutex_);
if (skip_message_in_main_play_loop_) {
skip_message_in_main_play_loop_ = false;
message_ptr = peek_next_message_from_queue();
continue;
}
}
publish_message(message);
}
message_queue_.pop();
message_ptr = peek_next_message_from_queue();
}
}
playing_messages_from_queue_ = false;
}

void Player::prepare_publishers()
Expand Down Expand Up @@ -363,8 +406,7 @@ void Player::prepare_publishers()
try {
publishers_.insert(
std::make_pair(
topic.name, this->create_generic_publisher(
topic.name, topic.type, topic_qos)));
topic.name, this->create_generic_publisher(topic.name, topic.type, topic_qos)));
} catch (const std::runtime_error & e) {
// using a warning log seems better than adding a new option
// to ignore some unknown message type library
Expand All @@ -375,4 +417,15 @@ void Player::prepare_publishers()
}
}

bool Player::publish_message(rosbag2_storage::SerializedBagMessageSharedPtr message)
{
bool message_published = false;
auto publisher_iter = publishers_.find(message->topic_name);
if (publisher_iter != publishers_.end()) {
publisher_iter->second->publish(rclcpp::SerializedMessage(*message->serialized_data));
message_published = true;
}
return message_published;
}

} // namespace rosbag2_transport
Loading

0 comments on commit c488315

Please sign in to comment.