Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SplitBagfile recording service. #1115

Merged
merged 4 commits into from
Oct 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ class ROSBAG2_CPP_PUBLIC Writer final
*/
bool take_snapshot();

/**
* Close the current bagfile and opens the next bagfile.
*/
void split_bagfile();

/**
* Remove a new topic in the underlying storage.
* If creation of subscription fails remove the topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ class ROSBAG2_CPP_PUBLIC BaseWriterInterface
*/
virtual bool take_snapshot() = 0;

/**
* Close the current bagfile and opens the next bagfile.
*/
virtual void split_bagfile() = 0;

virtual void add_event_callbacks(const bag_events::WriterEventCallbacks & callbacks) = 0;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
*/
void add_event_callbacks(const bag_events::WriterEventCallbacks & callbacks) override;

/**
* \brief Closes the current backed storage and opens the next bagfile.
*/
void split_bagfile() override;

protected:
std::string base_folder_;
std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory_;
Expand All @@ -146,9 +151,6 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter

rosbag2_storage::BagMetadata metadata_;

// Closes the current backed storage and opens the next bagfile.
virtual void split_bagfile();

// Checks if the current recording bagfile needs to be split and rolled over to a new file.
bool should_split_bagfile(
const std::chrono::time_point<std::chrono::high_resolution_clock> & current_time) const;
Expand Down
6 changes: 6 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ bool Writer::take_snapshot()
return writer_impl_->take_snapshot();
}

void Writer::split_bagfile()
{
std::lock_guard<std::mutex> writer_lock(writer_mutex_);
return writer_impl_->split_bagfile();
}

void Writer::write(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> message)
{
std::lock_guard<std::mutex> writer_lock(writer_mutex_);
Expand Down
2 changes: 1 addition & 1 deletion rosbag2_cpp/test/rosbag2_cpp/fake_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ void write_sample_split_bag(
{
std::string topic_name = "testtopic";

ManualSplitSequentialWriter writer{};
rosbag2_cpp::writers::SequentialWriter writer{};
writer.open(storage_options, rosbag2_cpp::ConverterOptions{});
writer.create_topic(
{
Expand Down
6 changes: 0 additions & 6 deletions rosbag2_cpp/test/rosbag2_cpp/fake_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,6 @@

#include "rosbag2_cpp/writers/sequential_writer.hpp"

class ManualSplitSequentialWriter : public rosbag2_cpp::writers::SequentialWriter
{
public:
using rosbag2_cpp::writers::SequentialWriter::split_bagfile;
};

// Write vector of <timestamp, uint32_data_value> pairs to bag files, splitting every N messages
void write_sample_split_bag(
const rosbag2_storage::StorageOptions & storage_options,
Expand Down
1 change: 1 addition & 0 deletions rosbag2_interfaces/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ rosidl_generate_interfaces(${PROJECT_NAME}
"srv/Seek.srv"
"srv/SetRate.srv"
"srv/Snapshot.srv"
"srv/SplitBagfile.srv"
"srv/Stop.srv"
"srv/TogglePaused.srv"
DEPENDENCIES builtin_interfaces
Expand Down
1 change: 1 addition & 0 deletions rosbag2_interfaces/srv/SplitBagfile.srv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
---
2 changes: 2 additions & 0 deletions rosbag2_transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ function(create_tests_for_rmw_implementation)
LINK_LIBS rosbag2_transport
AMENT_DEPS test_msgs rosbag2_test_common)

ament_add_test_label(test_record_services${target_suffix} xfail)

if(${rmw_implementation} MATCHES "rmw_cyclonedds(.*)")
ament_add_test_label(test_play_services__rmw_cyclonedds_cpp xfail)
endif()
Expand Down
2 changes: 2 additions & 0 deletions rosbag2_transport/include/rosbag2_transport/recorder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "rosbag2_cpp/writer.hpp"

#include "rosbag2_interfaces/srv/snapshot.hpp"
#include "rosbag2_interfaces/srv/split_bagfile.hpp"

#include "rosbag2_interfaces/msg/write_split_event.hpp"

Expand Down Expand Up @@ -158,6 +159,7 @@ class Recorder : public rclcpp::Node
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides_;
std::unordered_set<std::string> topic_unknown_types_;
rclcpp::Service<rosbag2_interfaces::srv::Snapshot>::SharedPtr srv_snapshot_;
rclcpp::Service<rosbag2_interfaces::srv::SplitBagfile>::SharedPtr srv_split_bagfile_;
std::atomic<bool> paused_ = false;
// Keyboard handler
std::shared_ptr<KeyboardHandler> keyboard_handler_;
Expand Down
10 changes: 10 additions & 0 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,16 @@ void Recorder::record()
});
}

srv_split_bagfile_ = create_service<rosbag2_interfaces::srv::SplitBagfile>(
"~/split_bagfile",
[this](
const std::shared_ptr<rmw_request_id_t>/* request_header */,
const std::shared_ptr<rosbag2_interfaces::srv::SplitBagfile::Request>/* request */,
const std::shared_ptr<rosbag2_interfaces::srv::SplitBagfile::Response>/* response */)
{
writer_->split_bagfile();
});

// Start the thread that will publish events
event_publisher_thread_ = std::thread(&Recorder::event_publisher_thread_main, this);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,22 @@ class MockSequentialWriter : public rosbag2_cpp::writer_interfaces::BaseWriterIn
return true;
}

void split_bagfile() override
{
auto info = std::make_shared<rosbag2_cpp::bag_events::BagSplitInfo>();
info->closed_file = "BagFile" + std::to_string(file_number_);
file_number_ += 1;
info->opened_file = "BagFile" + std::to_string(file_number_);
callback_manager_.execute_callbacks(rosbag2_cpp::bag_events::BagEvent::WRITE_SPLIT, info);
rshanor marked this conversation as resolved.
Show resolved Hide resolved
messages_per_file_ = 0;
split_bagfile_called_ = true;
}

bool split_bagfile_called()
{
return split_bagfile_called_;
}

void
add_event_callbacks(const rosbag2_cpp::bag_events::WriterEventCallbacks & callbacks) override
{
Expand Down Expand Up @@ -119,6 +135,7 @@ class MockSequentialWriter : public rosbag2_cpp::writer_interfaces::BaseWriterIn
rosbag2_cpp::bag_events::EventCallbackManager callback_manager_;
size_t file_number_ = 0;
const size_t max_messages_per_file_ = 5;
bool split_bagfile_called_ = false;
};

#endif // ROSBAG2_TRANSPORT__MOCK_SEQUENTIAL_WRITER_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "rclcpp/rclcpp.hpp"

#include "rosbag2_interfaces/srv/snapshot.hpp"
#include "rosbag2_interfaces/srv/split_bagfile.hpp"
#include "rosbag2_transport/recorder.hpp"

#include "rosbag2_test_common/publication_manager.hpp"
Expand All @@ -40,9 +41,11 @@ class RecordSrvsTest : public RecordIntegrationTestFixture
{
public:
using Snapshot = rosbag2_interfaces::srv::Snapshot;
using SplitBagfile = rosbag2_interfaces::srv::SplitBagfile;

RecordSrvsTest()
: RecordIntegrationTestFixture()
explicit RecordSrvsTest(const bool snapshot_mode)
: RecordIntegrationTestFixture(),
snapshot_mode_(snapshot_mode)
{}

~RecordSrvsTest() override
Expand All @@ -68,7 +71,7 @@ class RecordSrvsTest : public RecordIntegrationTestFixture

rosbag2_transport::RecordOptions record_options =
{false, false, {test_topic_}, "rmw_format", 100ms};
storage_options_.snapshot_mode = true;
storage_options_.snapshot_mode = snapshot_mode_;
storage_options_.max_cache_size = 200;
recorder_ = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer_), storage_options_, record_options, recorder_name_);
Expand All @@ -80,6 +83,7 @@ class RecordSrvsTest : public RecordIntegrationTestFixture

const std::string ns = "/" + recorder_name_;
cli_snapshot_ = client_node_->create_client<Snapshot>(ns + "/snapshot");
cli_split_bagfile_ = client_node_->create_client<SplitBagfile>(ns + "/split_bagfile");

exec_ = std::make_shared<rclcpp::executors::SingleThreadedExecutor>();

Expand All @@ -94,7 +98,10 @@ class RecordSrvsTest : public RecordIntegrationTestFixture
pub_manager.run_publishers();

// Make sure expected service is present before starting test
ASSERT_TRUE(cli_snapshot_->wait_for_service(service_wait_timeout_));
if (snapshot_mode_) {
ASSERT_TRUE(cli_snapshot_->wait_for_service(service_wait_timeout_));
}
ASSERT_TRUE(cli_split_bagfile_->wait_for_service(service_wait_timeout_));
}

/// Send a service request, and expect it to successfully return within a reasonable timeout
Expand Down Expand Up @@ -135,9 +142,19 @@ class RecordSrvsTest : public RecordIntegrationTestFixture
// Service clients
rclcpp::Node::SharedPtr client_node_;
rclcpp::Client<Snapshot>::SharedPtr cli_snapshot_;
rclcpp::Client<SplitBagfile>::SharedPtr cli_split_bagfile_;

bool snapshot_mode_;
};

class RecordSrvsSnapshotTest : public RecordSrvsTest
{
protected:
RecordSrvsSnapshotTest()
: RecordSrvsTest(true /*snapshot_mode*/) {}
};

TEST_F(RecordSrvsTest, trigger_snapshot)
TEST_F(RecordSrvsSnapshotTest, trigger_snapshot)
{
auto & writer = recorder_->get_writer_handle();
MockSequentialWriter & mock_writer =
Expand All @@ -152,3 +169,20 @@ TEST_F(RecordSrvsTest, trigger_snapshot)
successful_service_request<Snapshot>(cli_snapshot_);
EXPECT_THAT(mock_writer.get_messages().size(), Ne(0u));
}

class RecordSrvsSplitBagfileTest : public RecordSrvsTest
{
protected:
RecordSrvsSplitBagfileTest()
: RecordSrvsTest(false /*snapshot_mode*/) {}
};

TEST_F(RecordSrvsSplitBagfileTest, split_bagfile)
{
auto & writer = recorder_->get_writer_handle();
MockSequentialWriter & mock_writer =
static_cast<MockSequentialWriter &>(writer.get_implementation_handle());
EXPECT_FALSE(mock_writer.split_bagfile_called());
successful_service_request<SplitBagfile>(cli_split_bagfile_);
EXPECT_TRUE(mock_writer.split_bagfile_called());
}