Skip to content

Commit

Permalink
Bugfix for writer not being able to open again after closing (#1599)
Browse files Browse the repository at this point in the history
* re-applies fixes from #1590 to rolling. Also removes new message definition in sequential writer test for multiple open operations. Also clears topic_names_to_message_definitions_ and handles message_definitions_s underlying container similarly. Lastly, also avoids reset of factory in the compression writer, adds unit test there too.

Signed-off-by: Yannick Schulz <yschulz854@gmail.com>

* removes unused compressor_ member from compresser writer class. Also delegates rest of the closing behavior to the base class in close method, as it is handled in the open and write methods of the compression writer

Signed-off-by: Yannick Schulz <yschulz854@gmail.com>

* Remove unrelated delta

- message_definitions_ was intentionally allocated on the stack and
should persist between writer close() and open() because it represents
cache for message definitions which is not changes.

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Don't call virtual methods from destructors

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Cleanup 'rosbag2_directory_next' after the test run

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Protect Writer::open(..) and Writer::close() with mutex on upper level

- Rationale: To avoid race conditions if open(..) and close() could be
ever be called from different threads.

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Bugfix for WRITE_SPLIT callback not called for the last compressed file

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Bugfix for lost messages from cache when closing compression writer

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Use dedicated is_open_ flag instead of relying on storage_

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

---------

Signed-off-by: Yannick Schulz <yschulz854@gmail.com>
Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
Co-authored-by: Michael Orlov <michael.orlov@apex.ai>
(cherry picked from commit 20cfc02)
  • Loading branch information
yschulz authored and mergify[bot] committed May 6, 2024
1 parent 10ee171 commit 24fedb4
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter

private:
std::unique_ptr<rosbag2_compression::CompressionFactory> compression_factory_{};
std::shared_ptr<rosbag2_compression::BaseCompressorInterface> compressor_{};
std::mutex compressor_queue_mutex_;
std::queue<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>>
compressor_message_queue_ RCPPUTILS_TSA_GUARDED_BY(compressor_queue_mutex_);
Expand All @@ -198,6 +197,7 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter
rosbag2_compression::CompressionOptions compression_options_{};

bool should_compress_last_file_{true};
std::atomic_bool is_open_{false};

// Runs a while loop that pulls data from the compression queue until
// compression_is_running_ is false; should be run in a separate thread
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ SequentialCompressionWriter::SequentialCompressionWriter(

SequentialCompressionWriter::~SequentialCompressionWriter()
{
close();
SequentialCompressionWriter::close();
}

void SequentialCompressionWriter::compression_thread_fn()
Expand Down Expand Up @@ -225,13 +225,22 @@ void SequentialCompressionWriter::open(
const rosbag2_storage::StorageOptions & storage_options,
const rosbag2_cpp::ConverterOptions & converter_options)
{
// Note. close and open methods protected with mutex on upper rosbag2_cpp::writer level.
if (this->is_open_) {
return; // The writer already opened.
}
std::lock_guard<std::recursive_mutex> lock(storage_mutex_);
SequentialWriter::open(storage_options, converter_options);
setup_compression();
this->is_open_ = true;
}

void SequentialCompressionWriter::close()
{
// Note. close and open methods protected with mutex on upper rosbag2_cpp::writer level.
if (!this->is_open_.exchange(false)) {
return; // The writer is not open
}
if (!base_folder_.empty()) {
// Reset may be called before initializing the compressor (ex. bad options).
// We compress the last file only if it hasn't been compressed earlier (ex. in split_bagfile()).
Expand All @@ -241,7 +250,18 @@ void SequentialCompressionWriter::close()
std::lock_guard<std::recursive_mutex> lock(storage_mutex_);
std::lock_guard<std::mutex> compressor_lock(compressor_queue_mutex_);
try {
storage_.reset(); // Storage must be closed before it can be compressed.
// Storage must be closed before file can be compressed.
if (use_cache_) {
// destructor will flush message cache
cache_consumer_.reset();
message_cache_.reset();
}
finalize_metadata();
if (storage_) {
storage_->update_metadata(metadata_);
storage_.reset(); // Storage will be closed in storage_ destructor
}

if (!metadata_.relative_file_paths.empty()) {
std::string file = metadata_.relative_file_paths.back();
compressor_file_queue_.push(file);
Expand All @@ -251,22 +271,10 @@ void SequentialCompressionWriter::close()
ROSBAG2_COMPRESSION_LOG_WARN_STREAM("Could not compress the last bag file.\n" << e.what());
}
}

stop_compressor_threads();

finalize_metadata();
if (storage_) {
storage_->update_metadata(metadata_);
}
metadata_io_->write_metadata(base_folder_, metadata_);
}

if (use_cache_) {
cache_consumer_.reset();
message_cache_.reset();
}
storage_.reset(); // Necessary to ensure that the storage is destroyed before the factory
storage_factory_.reset();
stop_compressor_threads(); // Note: The metadata_.relative_file_paths will be updated with
// compressed filename when compressor threads will finish.
SequentialWriter::close();
}

void SequentialCompressionWriter::create_topic(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,31 @@ TEST_F(SequentialCompressionWriterTest, open_succeeds_on_supported_compression_f
writer_->open(tmp_dir_storage_options_, {serialization_format_, serialization_format_}));
}

TEST_F(SequentialCompressionWriterTest, open_succeeds_twice)
{
rosbag2_compression::CompressionOptions compression_options{
DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE,
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads,
kDefaultCompressionQueueThreadsPriority};
initializeWriter(compression_options);

auto tmp_dir = fs::temp_directory_path() / "path_not_empty";
auto tmp_dir_next = fs::temp_directory_path() / "path_not_empty_next";

auto storage_options = rosbag2_storage::StorageOptions();
auto storage_options_next = rosbag2_storage::StorageOptions();

storage_options.uri = tmp_dir.string();
storage_options_next.uri = tmp_dir_next.string();

EXPECT_NO_THROW(
writer_->open(storage_options, {serialization_format_, serialization_format_}));

writer_->close();
EXPECT_NO_THROW(
writer_->open(storage_options_next, {serialization_format_, serialization_format_}));
}

TEST_F(SequentialCompressionWriterTest, writer_calls_create_compressor)
{
rosbag2_compression::CompressionOptions compression_options{
Expand Down
4 changes: 4 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter

// Used to track topic -> message count. If cache is present, it is updated by CacheConsumer
std::unordered_map<std::string, rosbag2_storage::TopicInformation> topics_names_to_info_;
// Note: topics_names_to_info_ needs to be protected with mutex only when we are explicitly
// adding or deleting items (create_topic(..)/remove_topic(..)) and when we access it from
// CacheConsumer callback i.e., write_messages(..)
std::mutex topics_info_mutex_;

LocalMessageDefinitionSource message_definitions_;
Expand Down Expand Up @@ -197,6 +200,7 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
void write_messages(
const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> & messages);
bool is_first_message_ {true};
std::atomic_bool is_open_{false};

bag_events::EventCallbackManager callback_manager_;
};
Expand Down
2 changes: 2 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,13 @@ void Writer::open(
const rosbag2_storage::StorageOptions & storage_options,
const ConverterOptions & converter_options)
{
std::lock_guard<std::mutex> writer_lock(writer_mutex_);
writer_impl_->open(storage_options, converter_options);
}

void Writer::close()
{
std::lock_guard<std::mutex> writer_lock(writer_mutex_);
writer_impl_->close();
}

Expand Down
38 changes: 28 additions & 10 deletions rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ SequentialWriter::SequentialWriter(
metadata_io_(std::move(metadata_io)),
converter_(nullptr),
topics_names_to_info_(),
message_definitions_(),
topic_names_to_message_definitions_(),
metadata_()
{}

Expand All @@ -69,7 +69,7 @@ SequentialWriter::~SequentialWriter()
// Callbacks likely was created after SequentialWriter object and may point to the already
// destructed objects.
callback_manager_.delete_all_callbacks();
close();
SequentialWriter::close();
}

void SequentialWriter::init_metadata()
Expand Down Expand Up @@ -97,8 +97,13 @@ void SequentialWriter::open(
const rosbag2_storage::StorageOptions & storage_options,
const ConverterOptions & converter_options)
{
// Note. close and open methods protected with mutex on upper rosbag2_cpp::writer level.
if (is_open_) {
return; // The writer already opened
}
base_folder_ = storage_options.uri;
storage_options_ = storage_options;

if (storage_options_.storage_id.empty()) {
storage_options_.storage_id = rosbag2_storage::get_default_storage_id();
}
Expand Down Expand Up @@ -161,10 +166,15 @@ void SequentialWriter::open(

init_metadata();
storage_->update_metadata(metadata_);
is_open_ = true;
}

void SequentialWriter::close()
{
// Note. close and open methods protected with mutex on upper rosbag2_cpp::writer level.
if (!is_open_.exchange(false)) {
return; // The writer is not open
}
if (use_cache_) {
// destructor will flush message cache
cache_consumer_.reset();
Expand All @@ -180,13 +190,22 @@ void SequentialWriter::close()
}

if (storage_) {
auto info = std::make_shared<bag_events::BagSplitInfo>();
info->closed_file = storage_->get_relative_file_path();
storage_.reset(); // Destroy storage before calling WRITE_SPLIT callback to make sure that
// bag file was closed before callback call.
}
if (!metadata_.relative_file_paths.empty()) {
auto info = std::make_shared<bag_events::BagSplitInfo>();
// Take the latest file name from metadata in case if it was updated after compression in
// derived class
info->closed_file =
(fs::path(base_folder_) / metadata_.relative_file_paths.back()).generic_string();
callback_manager_.execute_callbacks(bag_events::BagEvent::WRITE_SPLIT, info);
}
storage_factory_.reset();

topics_names_to_info_.clear();
topic_names_to_message_definitions_.clear();

converter_.reset();
}

void SequentialWriter::create_topic(const rosbag2_storage::TopicMetadata & topic_with_type)
Expand Down Expand Up @@ -226,7 +245,7 @@ void SequentialWriter::create_topic(
return;
}

if (!storage_) {
if (!is_open_) {
throw std::runtime_error("Bag is not open. Call open() before writing.");
}

Expand Down Expand Up @@ -260,7 +279,7 @@ void SequentialWriter::create_topic(

void SequentialWriter::remove_topic(const rosbag2_storage::TopicMetadata & topic_with_type)
{
if (!storage_) {
if (!is_open_) {
throw std::runtime_error("Bag is not open. Call open() before removing.");
}

Expand Down Expand Up @@ -308,14 +327,13 @@ void SequentialWriter::switch_to_next_storage()
base_folder_,
metadata_.relative_file_paths.size());
storage_ = storage_factory_->open_read_write(storage_options_);
storage_->update_metadata(metadata_);

if (!storage_) {
std::stringstream errmsg;
errmsg << "Failed to rollover bagfile to new file: \"" << storage_options_.uri << "\"!";

throw std::runtime_error(errmsg.str());
}
storage_->update_metadata(metadata_);
// Re-register all topics since we rolled-over to a new bagfile.
for (const auto & topic : topics_names_to_info_) {
auto const & md = topic_names_to_message_definitions_[topic.first];
Expand Down Expand Up @@ -348,7 +366,7 @@ void SequentialWriter::split_bagfile()

void SequentialWriter::write(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> message)
{
if (!storage_) {
if (!is_open_) {
throw std::runtime_error("Bag is not open. Call open() before writing.");
}

Expand Down
34 changes: 33 additions & 1 deletion rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ TEST_P(TestRosbag2CPPAPI, minimal_writer_example)
serialization.serialize_message(&test_msg, &serialized_msg);

auto rosbag_directory = fs::path("test_rosbag2_writer_api_bag");
auto rosbag_directory_next = fs::path("test_rosbag2_writer_api_bag_next");
// in case the bag was previously not cleaned up
fs::remove_all(rosbag_directory);
fs::remove_all(rosbag_directory_next);

{
rosbag2_cpp::Writer writer;
Expand Down Expand Up @@ -99,7 +101,18 @@ TEST_P(TestRosbag2CPPAPI, minimal_writer_example)
// writing a non-serialized message
writer.write(test_msg, "/a/ros2/message", rclcpp::Clock().now());

// close on scope exit
// close as prompted
writer.close();

// open a new bag with the same writer
writer.open(rosbag_directory_next.string());

// write same topic to different bag
writer.write(
serialized_msg2, "/yet/another/topic", "test_msgs/msg/BasicTypes",
rclcpp::Clock().now());

// close by scope
}

{
Expand All @@ -126,6 +139,24 @@ TEST_P(TestRosbag2CPPAPI, minimal_writer_example)
// close on scope exit
}

{
rosbag2_cpp::Reader reader;
std::string topic;
reader.open(rosbag_directory_next.string());
ASSERT_TRUE(reader.has_next());

auto bag_message = reader.read_next();
topic = bag_message->topic_name;

TestMsgT extracted_test_msg;
rclcpp::SerializedMessage extracted_serialized_msg(*bag_message->serialized_data);
serialization.deserialize_message(
&extracted_serialized_msg, &extracted_test_msg);

EXPECT_EQ(test_msg, extracted_test_msg);
EXPECT_EQ("/yet/another/topic", topic);
}

// alternative reader
{
rosbag2_cpp::Reader reader;
Expand All @@ -140,6 +171,7 @@ TEST_P(TestRosbag2CPPAPI, minimal_writer_example)

// remove the rosbag again after the test
EXPECT_TRUE(fs::remove_all(rosbag_directory));
EXPECT_TRUE(fs::remove_all(rosbag_directory_next));
}

INSTANTIATE_TEST_SUITE_P(
Expand Down

0 comments on commit 24fedb4

Please sign in to comment.