Skip to content

Commit

Permalink
Add BagSplitInfo service call on bag close (#1422) (#1636)
Browse files Browse the repository at this point in the history
- Note: The `BagSplitInfo::opened_file` will have empty string to
indicate that it was "bag close" and not bag split event.

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
(cherry picked from commit ba199d0)

Co-authored-by: Michael Orlov <michael.orlov@apex.ai>
  • Loading branch information
mergify[bot] and MichaelOrlov authored May 6, 2024
1 parent 7f0dd71 commit 809ca5a
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 1 deletion.
8 changes: 8 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/bag_events.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,14 @@ class EventCallbackManager
return false;
}

/**
* \brief Delete all callbacks
*/
void delete_all_callbacks()
{
callbacks_.clear();
}

/**
* \brief Execute all callbacks registered for the given event.
*
Expand Down
12 changes: 11 additions & 1 deletion rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ SequentialWriter::SequentialWriter(

SequentialWriter::~SequentialWriter()
{
// Deleting all callbacks before calling close(). Calling callbacks from destructor is not safe.
// Callbacks likely was created after SequentialWriter object and may point to the already
// destructed objects.
callback_manager_.delete_all_callbacks();
close();
}

Expand Down Expand Up @@ -166,7 +170,13 @@ void SequentialWriter::close()
metadata_io_->write_metadata(base_folder_, metadata_);
}

storage_.reset(); // Necessary to ensure that the storage is destroyed before the factory
if (storage_) {
auto info = std::make_shared<bag_events::BagSplitInfo>();
info->closed_file = storage_->get_relative_file_path();
storage_.reset(); // Destroy storage before calling WRITE_SPLIT callback to make sure that
// bag file was closed before callback call.
callback_manager_.execute_callbacks(bag_events::BagEvent::WRITE_SPLIT, info);
}
storage_factory_.reset();
}

Expand Down
60 changes: 60 additions & 0 deletions rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,66 @@ TEST_F(SequentialWriterTest, split_event_calls_callback)
EXPECT_EQ(opened_file, fake_storage_uri_);
}

TEST_F(SequentialWriterTest, split_event_calls_on_writer_close)
{
const int message_count = 7;

ON_CALL(
*storage_,
write(An<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>>())).WillByDefault(
[this](std::shared_ptr<const rosbag2_storage::SerializedBagMessage>) {
fake_storage_size_ += 1;
});

ON_CALL(*storage_, get_bagfile_size).WillByDefault(
[this]() {
return fake_storage_size_.load();
});

ON_CALL(*metadata_io_, write_metadata).WillByDefault(
[this](const std::string &, const rosbag2_storage::BagMetadata & metadata) {
fake_metadata_ = metadata;
});

ON_CALL(*storage_, get_relative_file_path).WillByDefault(
[this]() {
return fake_storage_uri_;
});

auto sequential_writer = std::make_unique<rosbag2_cpp::writers::SequentialWriter>(
std::move(storage_factory_), converter_factory_, std::move(metadata_io_));
writer_ = std::make_unique<rosbag2_cpp::Writer>(std::move(sequential_writer));

auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = "test_topic";

storage_options_.max_bagfile_size = 0;

bool callback_called = false;
std::string closed_file, opened_file;
rosbag2_cpp::bag_events::WriterEventCallbacks callbacks;
callbacks.write_split_callback =
[&callback_called, &closed_file, &opened_file](rosbag2_cpp::bag_events::BagSplitInfo & info) {
closed_file = info.closed_file;
opened_file = info.opened_file;
callback_called = true;
};
writer_->add_event_callbacks(callbacks);

writer_->open(storage_options_, {"rmw_format", "rmw_format"});
writer_->create_topic({"test_topic", "test_msgs/BasicTypes", "", "", ""});

for (auto i = 0; i < message_count; ++i) {
writer_->write(message);
}
writer_->close();

ASSERT_TRUE(callback_called);
auto expected_closed = rcpputils::fs::path(storage_options_.uri) / (storage_options_.uri + "_0");
EXPECT_EQ(closed_file, expected_closed.string());
EXPECT_TRUE(opened_file.empty());
}

TEST_P(ParametrizedTemporaryDirectoryFixture, split_bag_metadata_has_full_duration) {
const std::vector<std::pair<rcutils_time_point_value_t, uint32_t>> fake_messages {
{100, 1},
Expand Down

0 comments on commit 809ca5a

Please sign in to comment.