Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

turn player into a node #723

Merged
merged 1 commit into from
Apr 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 52 additions & 27 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,37 @@ namespace rosbag2_transport
namespace impl
{

const std::chrono::milliseconds
Player::queue_read_wait_period_ = std::chrono::milliseconds(100);
Player::Player(const std::string & node_name, const rclcpp::NodeOptions & node_options)
: rclcpp::Node(node_name, node_options)
{
// TODO(karsten1987): Use this constructor later with parameter parsing.
// The reader, storage_options as well as play_options can be loaded via parameter.
// That way, the player can be used as a simple component in a component manager.
throw rclcpp::exceptions::UnimplementedError();
}

Player::Player(
std::shared_ptr<rosbag2_cpp::Reader> reader, std::shared_ptr<rclcpp::Node> transport_node)
: reader_(std::move(reader)), transport_node_(transport_node)
std::shared_ptr<rosbag2_cpp::Reader> reader,
const rosbag2_storage::StorageOptions & storage_options,
const rosbag2_transport::PlayOptions & play_options,
const std::string & node_name,
const rclcpp::NodeOptions & node_options)
: rclcpp::Node(
node_name,
rclcpp::NodeOptions(node_options).arguments(play_options.topic_remapping_options)),
reader_(reader),
storage_options_(storage_options),
play_options_(play_options)
{}

Player::~Player()
{
// reader_->reset();
}

const std::chrono::milliseconds
Player::queue_read_wait_period_ = std::chrono::milliseconds(100);

bool Player::is_storage_completely_loaded() const
{
if (storage_loading_future_.valid() &&
Expand All @@ -94,46 +117,48 @@ bool Player::is_storage_completely_loaded() const
return !storage_loading_future_.valid();
}

void Player::play(const PlayOptions & options)
void Player::play()
{
// TODO(karsten1987): Expose `Reader::reset` in rosbag2_cpp
reader_->open(storage_options_, {"", rmw_get_serialization_format()});
if (reader_->has_next()) {
// Reader does not have "peek", so we must "pop" the first message to see its timestamp
auto message = reader_->read_next();
prepare_clock(options, message->time_stamp);
prepare_clock(message->time_stamp);
// Make sure that first message gets played by putting it into the play queue
message_queue_.enqueue(message);
} else {
// The bag contains no messages - there is nothing to play
return;
}

topic_qos_profile_overrides_ = options.topic_qos_profile_overrides;
prepare_publishers(options);
topic_qos_profile_overrides_ = play_options_.topic_qos_profile_overrides;
prepare_publishers();

storage_loading_future_ = std::async(
std::launch::async,
[this, options]() {load_storage_content(options);});
[this]() {load_storage_content();});

wait_for_filled_queue(options);
wait_for_filled_queue();

play_messages_from_queue();
}

void Player::wait_for_filled_queue(const PlayOptions & options) const
void Player::wait_for_filled_queue() const
{
while (
message_queue_.size_approx() < options.read_ahead_queue_size &&
message_queue_.size_approx() < play_options_.read_ahead_queue_size &&
!is_storage_completely_loaded() && rclcpp::ok())
{
std::this_thread::sleep_for(queue_read_wait_period_);
}
}

void Player::load_storage_content(const PlayOptions & options)
void Player::load_storage_content()
{
auto queue_lower_boundary =
static_cast<size_t>(options.read_ahead_queue_size * read_ahead_lower_bound_percentage_);
auto queue_upper_boundary = options.read_ahead_queue_size;
static_cast<size_t>(play_options_.read_ahead_queue_size * read_ahead_lower_bound_percentage_);
auto queue_upper_boundary = play_options_.read_ahead_queue_size;

while (reader_->has_next() && rclcpp::ok()) {
if (message_queue_.size_approx() < queue_lower_boundary) {
Expand Down Expand Up @@ -162,7 +187,7 @@ void Player::play_messages_from_queue()
play_messages_until_queue_empty();
if (!is_storage_completely_loaded() && rclcpp::ok()) {
RCLCPP_WARN(
transport_node_->get_logger(),
this->get_logger(),
"Message queue starved. Messages will be delayed. Consider "
"increasing the --read-ahead-queue-size option.");
}
Expand All @@ -185,10 +210,10 @@ void Player::play_messages_until_queue_empty()
}
}

void Player::prepare_publishers(const PlayOptions & options)
void Player::prepare_publishers()
{
rosbag2_storage::StorageFilter storage_filter;
storage_filter.topics = options.topics_to_filter;
storage_filter.topics = play_options_.topics_to_filter;
reader_->set_filter(storage_filter);

auto topics = reader_->get_all_topics_and_types();
Expand All @@ -204,36 +229,36 @@ void Player::prepare_publishers(const PlayOptions & options)

auto topic_qos = publisher_qos_for_topic(
topic, topic_qos_profile_overrides_,
transport_node_->get_logger());
this->get_logger());
try {
publishers_.insert(
std::make_pair(
topic.name, transport_node_->create_generic_publisher(
topic.name, this->create_generic_publisher(
topic.name, topic.type, topic_qos)));
} catch (const std::runtime_error & e) {
// using a warning log seems better than adding a new option
// to ignore some unknown message type library
RCLCPP_WARN(
transport_node_->get_logger(),
this->get_logger(),
"Ignoring a topic '%s', reason: %s.", topic.name.c_str(), e.what());
}
}
}

void Player::prepare_clock(const PlayOptions & options, rcutils_time_point_value_t starting_time)
void Player::prepare_clock(rcutils_time_point_value_t starting_time)
{
double rate = options.rate > 0.0 ? options.rate : 1.0;
double rate = play_options_.rate > 0.0 ? play_options_.rate : 1.0;
clock_ = std::make_unique<rosbag2_cpp::TimeControllerClock>(starting_time, rate);

// Create /clock publisher
if (options.clock_publish_frequency > 0.f) {
if (play_options_.clock_publish_frequency > 0.f) {
const auto publish_period = std::chrono::nanoseconds(
static_cast<uint64_t>(RCUTILS_S_TO_NS(1) / options.clock_publish_frequency));
static_cast<uint64_t>(RCUTILS_S_TO_NS(1) / play_options_.clock_publish_frequency));
// NOTE: PlayerClock does not own this publisher because rosbag2_cpp
// should not own transport-based functionality
clock_publisher_ = transport_node_->create_publisher<rosgraph_msgs::msg::Clock>(
clock_publisher_ = this->create_publisher<rosgraph_msgs::msg::Clock>(
"/clock", rclcpp::ClockQoS());
clock_publish_timer_ = transport_node_->create_wall_timer(
clock_publish_timer_ = this->create_wall_timer(
publish_period, [this]() {
auto msg = rosgraph_msgs::msg::Clock();
msg.clock = rclcpp::Time(clock_->now());
Expand Down
27 changes: 19 additions & 8 deletions rosbag2_transport/src/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include "rosbag2_cpp/clocks/player_clock.hpp"
#include "rosbag2_storage/serialized_bag_message.hpp"
#include "rosbag2_storage/storage_options.hpp"
#include "rosbag2_transport/play_options.hpp"
#include "rosgraph_msgs/msg/clock.hpp"

Expand All @@ -43,31 +44,41 @@ namespace rosbag2_transport
namespace impl
{

class Player
class Player : public rclcpp::Node
{
public:
explicit Player(
const std::string & node_name = "rosbag2_player",
const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions());

Player(
std::shared_ptr<rosbag2_cpp::Reader> reader,
std::shared_ptr<rclcpp::Node> transport_node);
const rosbag2_storage::StorageOptions & storage_options,
const rosbag2_transport::PlayOptions & play_options,
const std::string & node_name = "rosbag2_player",
const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions());

virtual ~Player();

void play(const PlayOptions & options);
void play();

private:
void load_storage_content(const PlayOptions & options);
void load_storage_content();
bool is_storage_completely_loaded() const;
void enqueue_up_to_boundary(uint64_t boundary);
void wait_for_filled_queue(const PlayOptions & options) const;
void wait_for_filled_queue() const;
void play_messages_from_queue();
void play_messages_until_queue_empty();
void prepare_publishers(const PlayOptions & options);
void prepare_clock(const PlayOptions & options, rcutils_time_point_value_t starting_time);
void prepare_publishers();
void prepare_clock(rcutils_time_point_value_t starting_time);
static constexpr double read_ahead_lower_bound_percentage_ = 0.9;
static const std::chrono::milliseconds queue_read_wait_period_;

std::shared_ptr<rosbag2_cpp::Reader> reader_;
rosbag2_storage::StorageOptions storage_options_;
rosbag2_transport::PlayOptions play_options_;
moodycamel::ReaderWriterQueue<rosbag2_storage::SerializedBagMessageSharedPtr> message_queue_;
mutable std::future<void> storage_loading_future_;
std::shared_ptr<rclcpp::Node> transport_node_;
std::unordered_map<std::string, std::shared_ptr<rclcpp::GenericPublisher>> publishers_;
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides_;
std::unique_ptr<rosbag2_cpp::PlayerClock> clock_;
Expand Down
51 changes: 24 additions & 27 deletions rosbag2_transport/src/rosbag2_transport/rosbag2_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,30 @@ Player::Player()
Player::~Player()
{}

void Player::play(
const rosbag2_storage::StorageOptions & storage_options, const PlayOptions & play_options)
{
auto player = std::make_shared<impl::Player>(reader_, storage_options, play_options);
rclcpp::executors::SingleThreadedExecutor exec;
exec.add_node(player);
auto spin_thread = std::thread(
[&exec]() {
exec.spin();
});
auto exit = rcpputils::scope_exit(
[&exec, &spin_thread]() {
exec.cancel();
spin_thread.join();
});
try {
do {
player->play();
} while (rclcpp::ok() && play_options.loop);
} catch (std::runtime_error & e) {
RCLCPP_ERROR(player->get_logger(), "Failed to play: %s", e.what());
}
}

Recorder::Recorder(std::shared_ptr<rosbag2_cpp::Writer> writer)
: writer_(std::move(writer))
{}
Expand Down Expand Up @@ -89,31 +113,4 @@ void Recorder::record(
}
}

void Player::play(
const rosbag2_storage::StorageOptions & storage_options, const PlayOptions & play_options)
{
auto transport_node =
setup_node(play_options.node_prefix, play_options.topic_remapping_options);
rclcpp::executors::SingleThreadedExecutor exec;
exec.add_node(transport_node);
auto spin_thread = std::thread(
[&exec]() {
exec.spin();
});
auto exit = rcpputils::scope_exit(
[&exec, &spin_thread]() {
exec.cancel();
spin_thread.join();
});
try {
impl::Player player(reader_, transport_node);
do {
reader_->open(storage_options, {"", rmw_get_serialization_format()});
player.play(play_options);
} while (rclcpp::ok() && play_options.loop);
} catch (std::runtime_error & e) {
RCLCPP_ERROR(transport_node->get_logger(), "Failed to play: %s", e.what());
}
}

} // namespace rosbag2_transport