Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split Rosbag2Transport into Player and Recorder classes - first pass to enable further progress #721

Merged
merged 1 commit into from
Apr 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,19 @@ namespace rosbag2_py
class Player
{
public:
Player() = default;
virtual ~Player() = default;
Player()
{
rclcpp::init(0, nullptr);
}
virtual ~Player()
{
rclcpp::shutdown();
}

void play(
const rosbag2_storage::StorageOptions & storage_options,
PlayOptions & play_options)
{
auto writer = std::make_shared<rosbag2_cpp::Writer>(
std::make_unique<rosbag2_cpp::writers::SequentialWriter>());
std::shared_ptr<rosbag2_cpp::Reader> reader = nullptr;
// Determine whether to build compression or regular reader
{
Expand All @@ -116,18 +120,22 @@ class Player
}
}

rosbag2_transport::Rosbag2Transport impl(reader, writer);
impl.init();
rosbag2_transport::Player impl(reader);
impl.play(storage_options, play_options);
impl.shutdown();
}
};

class Recorder
{
public:
Recorder() = default;
virtual ~Recorder() = default;
Recorder()
{
rclcpp::init(0, nullptr);
}
virtual ~Recorder()
{
rclcpp::shutdown();
}

void record(
const rosbag2_storage::StorageOptions & storage_options,
Expand Down Expand Up @@ -160,10 +168,8 @@ class Recorder
std::make_unique<rosbag2_cpp::writers::SequentialWriter>());
}

rosbag2_transport::Rosbag2Transport impl(reader, writer);
impl.init();
rosbag2_transport::Recorder impl(writer);
impl.record(storage_options, record_options);
impl.shutdown();
}
};

Expand Down
60 changes: 30 additions & 30 deletions rosbag2_transport/include/rosbag2_transport/rosbag2_transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,44 @@ class Writer;
namespace rosbag2_transport
{

class Rosbag2Node;

class Rosbag2Transport
class Player
{
public:
/// Default constructor
ROSBAG2_TRANSPORT_PUBLIC
Rosbag2Transport();
Player();

/// Constructor for testing, allows to set the reader and writer to use
ROSBAG2_TRANSPORT_PUBLIC
Rosbag2Transport(
std::shared_ptr<rosbag2_cpp::Reader> reader,
std::shared_ptr<rosbag2_cpp::Writer> writer);
explicit Player(std::shared_ptr<rosbag2_cpp::Reader> reader);

ROSBAG2_TRANSPORT_PUBLIC
void init();
virtual ~Player();

/**
* Replay a bagfile.
*
* \param storage_options Option regarding the storage (e.g. bag file name)
* \param play_options Options regarding the playback (e.g. queue size)
*/
ROSBAG2_TRANSPORT_PUBLIC
void shutdown();
void play(
const rosbag2_storage::StorageOptions & storage_options,
const PlayOptions & play_options);

protected:
std::shared_ptr<rosbag2_cpp::Reader> reader_;
};

class Recorder
{
public:
ROSBAG2_TRANSPORT_PUBLIC
Recorder();

ROSBAG2_TRANSPORT_PUBLIC
explicit Recorder(std::shared_ptr<rosbag2_cpp::Writer> writer);

ROSBAG2_TRANSPORT_PUBLIC
virtual ~Recorder();

/**
* Records topics to a bagfile. Subscription happens at startup time, hence the topics must
Expand All @@ -68,26 +86,8 @@ class Rosbag2Transport
const rosbag2_storage::StorageOptions & storage_options,
const RecordOptions & record_options);

/**
* Replay all topics in a bagfile.
*
* \param storage_options Option regarding the storage (e.g. bag file name)
* \param play_options Options regarding the playback (e.g. queue size)
*/
ROSBAG2_TRANSPORT_PUBLIC
void play(
const rosbag2_storage::StorageOptions & storage_options,
const PlayOptions & play_options);

private:
std::shared_ptr<rclcpp::Node> setup_node(
std::string node_prefix = "",
const std::vector<std::string> & topic_remapping_options = {});

std::shared_ptr<rosbag2_cpp::Reader> reader_;
protected:
std::shared_ptr<rosbag2_cpp::Writer> writer_;

std::shared_ptr<rclcpp::Node> transport_node_;
};

} // namespace rosbag2_transport
Expand Down
3 changes: 3 additions & 0 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ rclcpp::QoS publisher_qos_for_topic(

namespace rosbag2_transport
{
namespace impl
{

const std::chrono::milliseconds
Player::queue_read_wait_period_ = std::chrono::milliseconds(100);
Expand Down Expand Up @@ -240,4 +242,5 @@ void Player::prepare_clock(const PlayOptions & options, rcutils_time_point_value
}
}

} // namespace impl
} // namespace rosbag2_transport
6 changes: 3 additions & 3 deletions rosbag2_transport/src/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ class Reader;

namespace rosbag2_transport
{

class GenericPublisher;
class Rosbag2Node;
namespace impl
{

class Player
{
Expand Down Expand Up @@ -76,6 +75,7 @@ class Player
std::shared_ptr<rclcpp::TimerBase> clock_publish_timer_;
};

} // namespace impl
} // namespace rosbag2_transport

#endif // ROSBAG2_TRANSPORT__PLAYER_HPP_
16 changes: 9 additions & 7 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@

namespace rosbag2_transport
{
namespace impl
{
Recorder::Recorder(
std::shared_ptr<rosbag2_cpp::Writer> writer,
std::shared_ptr<rclcpp::Node> transport_node)
Expand Down Expand Up @@ -162,12 +164,12 @@ void Recorder::subscribe_topics(
{
for (const auto & topic_with_type : topics_and_types) {
subscribe_topic(
{
topic_with_type.first,
topic_with_type.second,
serialization_format_,
serialized_offered_qos_profiles_for_topic(topic_with_type.first)
});
{
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know why, but uncrustify started complaining about this in my local env

topic_with_type.first,
topic_with_type.second,
serialization_format_,
serialized_offered_qos_profiles_for_topic(topic_with_type.first)
});
}
}

Expand Down Expand Up @@ -298,5 +300,5 @@ void Recorder::warn_if_new_qos_for_subscribed_topic(const std::string & topic_na
}
}
}

} // namespace impl
} // namespace rosbag2_transport
6 changes: 3 additions & 3 deletions rosbag2_transport/src/rosbag2_transport/recorder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ class Writer;

namespace rosbag2_transport
{

class GenericSubscription;
class Rosbag2Node;
namespace impl
{

class Recorder
{
Expand Down Expand Up @@ -106,6 +105,7 @@ class Recorder
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides_;
};

} // namespace impl
} // namespace rosbag2_transport

#endif // ROSBAG2_TRANSPORT__RECORDER_HPP_
68 changes: 35 additions & 33 deletions rosbag2_transport/src/rosbag2_transport/rosbag2_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,32 +35,45 @@
#include "player.hpp"
#include "recorder.hpp"

namespace
{
std::shared_ptr<rclcpp::Node> setup_node(
std::string node_prefix = "",
const std::vector<std::string> & topic_remapping_options = {})
{
auto node_options = rclcpp::NodeOptions().arguments(topic_remapping_options);
return std::make_shared<rclcpp::Node>(node_prefix + "_rosbag2", node_options);
}
} // namespace

namespace rosbag2_transport
{

Rosbag2Transport::Rosbag2Transport()
: reader_(std::make_shared<rosbag2_cpp::Reader>(
std::make_unique<rosbag2_cpp::readers::SequentialReader>())),
writer_(std::make_shared<rosbag2_cpp::Writer>(
std::make_unique<rosbag2_cpp::writers::SequentialWriter>()))
Player::Player(std::shared_ptr<rosbag2_cpp::Reader> reader)
: reader_(std::move(reader))
{}

Rosbag2Transport::Rosbag2Transport(
std::shared_ptr<rosbag2_cpp::Reader> reader,
std::shared_ptr<rosbag2_cpp::Writer> writer)
: reader_(std::move(reader)), writer_(std::move(writer)) {}
Player::Player()
: Player(std::make_shared<rosbag2_cpp::Reader>(
std::make_unique<rosbag2_cpp::readers::SequentialReader>()))
{}

void Rosbag2Transport::init()
{
rclcpp::init(0, nullptr);
}
Player::~Player()
{}

void Rosbag2Transport::shutdown()
{
rclcpp::shutdown();
}
Recorder::Recorder(std::shared_ptr<rosbag2_cpp::Writer> writer)
: writer_(std::move(writer))
{}

Recorder::Recorder()
: Recorder(std::make_shared<rosbag2_cpp::Writer>(
std::make_unique<rosbag2_cpp::writers::SequentialWriter>()))
{}

Recorder::~Recorder()
{}

void Rosbag2Transport::record(
void Recorder::record(
const rosbag2_storage::StorageOptions & storage_options, const RecordOptions & record_options)
{
try {
Expand All @@ -69,25 +82,14 @@ void Rosbag2Transport::record(

auto transport_node = setup_node(record_options.node_prefix);

Recorder recorder(writer_, transport_node);
impl::Recorder recorder(writer_, transport_node);
recorder.record(record_options);
} catch (std::runtime_error & e) {
RCLCPP_ERROR(rclcpp::get_logger("rosbag2_transport"), "Failed to record: %s", e.what());
}
}

std::shared_ptr<rclcpp::Node> Rosbag2Transport::setup_node(
std::string node_prefix,
const std::vector<std::string> & topic_remapping_options)
{
if (!transport_node_) {
auto node_options = rclcpp::NodeOptions().arguments(topic_remapping_options);
transport_node_ = std::make_shared<rclcpp::Node>(node_prefix + "_rosbag2", node_options);
}
return transport_node_;
}

void Rosbag2Transport::play(
void Player::play(
const rosbag2_storage::StorageOptions & storage_options, const PlayOptions & play_options)
{
auto transport_node =
Expand All @@ -104,13 +106,13 @@ void Rosbag2Transport::play(
spin_thread.join();
});
try {
Player player(reader_, transport_node);
impl::Player player(reader_, transport_node);
do {
reader_->open(storage_options, {"", rmw_get_serialization_format()});
player.play(play_options);
} while (rclcpp::ok() && play_options.loop);
} catch (std::runtime_error & e) {
RCLCPP_ERROR(transport_node_->get_logger(), "Failed to play: %s", e.what());
RCLCPP_ERROR(transport_node->get_logger(), "Failed to play: %s", e.what());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ class RecordIntegrationTestFixture : public Rosbag2TransportTestFixture
// the future object returned from std::async needs to be stored not to block the execution
future_ = std::async(
std::launch::async, [this, options]() {
rosbag2_transport::Rosbag2Transport rosbag2_transport(reader_, writer_);
rosbag2_transport.record(storage_options_, options);
rosbag2_transport::Recorder recorder(writer_);
recorder.record(storage_options_, options);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,6 @@
using namespace ::testing; // NOLINT
using namespace rosbag2_test_common; // NOLINT

inline char separator()
{
#ifdef _WIN32
return '\\';
#else
return '/';
#endif
}

class Rosbag2TransportTestFixture : public Test
{
public:
Expand Down
Loading