From 704a6cd016d8312d3a9e7a4f83f16044e8adefcc Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Sat, 11 May 2024 10:11:15 -0700 Subject: [PATCH] [iron] Bugfix for writer not being able to open again after closing (backport #1599) (#1635) * re-applies fixes from #1590 to rolling. Also removes new message definition in sequential writer test for multiple open operations. Also clears topic_names_to_message_definitions_ and handles message_definitions_s underlying container similarly. Lastly, also avoids reset of factory in the compression writer, adds unit test there too. Signed-off-by: Yannick Schulz Signed-off-by: Michael Orlov * removes unused compressor_ member from compresser writer class. Also delegates rest of the closing behavior to the base class in close method, as it is handled in the open and write methods of the compression writer Signed-off-by: Yannick Schulz * Remove unrelated delta - message_definitions_ was intentionally allocated on the stack and should persist between writer close() and open() because it represents cache for message definitions which is not changes. Signed-off-by: Michael Orlov * Don't call virtual methods from destructors Signed-off-by: Michael Orlov * Cleanup 'rosbag2_directory_next' after the test run Signed-off-by: Michael Orlov * Protect Writer::open(..) and Writer::close() with mutex on upper level - Rationale: To avoid race conditions if open(..) and close() could be ever be called from different threads. Signed-off-by: Michael Orlov * Bugfix for WRITE_SPLIT callback not called for the last compressed file Signed-off-by: Michael Orlov * Bugfix for lost messages from cache when closing compression writer Signed-off-by: Michael Orlov * Address build failure by using rcpputils::fs instead of std::filesystem - Note: On Iron we haven't migrated to the std::filesystem and using rcpputils::fs Signed-off-by: Michael Orlov * Adopt failing 'open_succeeds_twice' test for Iron Signed-off-by: Michael Orlov * Return from writer's open() immediately if storage already open Signed-off-by: Michael Orlov --------- Signed-off-by: Yannick Schulz Signed-off-by: Michael Orlov Co-authored-by: Yannick Schulz (cherry picked from commit a360d9b97e4ee8efa3bef461cac5ed0ab0c8fc0a) # Conflicts: # rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp # rosbag2_cpp/src/rosbag2_cpp/writer.cpp # rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp --- .../sequential_compression_writer.cpp | 36 ++++++++++++++----- .../test_sequential_compression_writer.cpp | 24 +++++++++++++ .../rosbag2_cpp/writers/sequential_writer.hpp | 3 ++ rosbag2_cpp/src/rosbag2_cpp/writer.cpp | 10 ++++++ .../rosbag2_cpp/writers/sequential_writer.cpp | 35 +++++++++++++++--- .../rosbag2_tests/test_rosbag2_cpp_api.cpp | 34 +++++++++++++++++- 6 files changed, 129 insertions(+), 13 deletions(-) diff --git a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp index f711c722a3..7f483762fd 100644 --- a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp +++ b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp @@ -57,12 +57,18 @@ SequentialCompressionWriter::SequentialCompressionWriter( SequentialCompressionWriter::~SequentialCompressionWriter() { +<<<<<<< HEAD close(); // Fix for https://github.com/ros2/rosbag2/issues/1278 // Explicitly deconstruct in the correct order to avoid severe warning message. // Humble ABI stability does not allow changing the class declaration order. compressor_.reset(); compression_factory_.reset(); +======= + if (storage_) { + SequentialCompressionWriter::close(); + } +>>>>>>> a360d9b ([iron] Bugfix for writer not being able to open again after closing (backport #1599) (#1635)) } void SequentialCompressionWriter::compression_thread_fn() @@ -181,6 +187,10 @@ void SequentialCompressionWriter::open( const rosbag2_cpp::ConverterOptions & converter_options) { std::lock_guard lock(storage_mutex_); + // Note. Close and open methods protected with mutex on upper rosbag2_cpp::writer level. + if (storage_) { + return; // The writer already opened. + } SequentialWriter::open(storage_options, converter_options); setup_compression(); } @@ -196,7 +206,18 @@ void SequentialCompressionWriter::close() std::lock_guard lock(storage_mutex_); std::lock_guard compressor_lock(compressor_queue_mutex_); try { - storage_.reset(); // Storage must be closed before it can be compressed. + // Storage must be closed before file can be compressed. + if (use_cache_) { + // destructor will flush message cache + cache_consumer_.reset(); + message_cache_.reset(); + } + finalize_metadata(); + if (storage_) { + storage_->update_metadata(metadata_); + storage_.reset(); // Storage will be closed in storage_ destructor + } + if (!metadata_.relative_file_paths.empty()) { std::string file = metadata_.relative_file_paths.back(); compressor_file_queue_.push(file); @@ -206,19 +227,18 @@ void SequentialCompressionWriter::close() ROSBAG2_COMPRESSION_LOG_WARN_STREAM("Could not compress the last bag file.\n" << e.what()); } } +<<<<<<< HEAD stop_compressor_threads(); finalize_metadata(); metadata_io_->write_metadata(base_folder_, metadata_); +======= +>>>>>>> a360d9b ([iron] Bugfix for writer not being able to open again after closing (backport #1599) (#1635)) } - - if (use_cache_) { - cache_consumer_.reset(); - message_cache_.reset(); - } - storage_.reset(); // Necessary to ensure that the storage is destroyed before the factory - storage_factory_.reset(); + stop_compressor_threads(); // Note: The metadata_.relative_file_paths will be updated with + // compressed filename when compressor threads will finish. + SequentialWriter::close(); } void SequentialCompressionWriter::create_topic( diff --git a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp index 0db18df82a..569bdb591d 100644 --- a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp +++ b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp @@ -199,6 +199,30 @@ TEST_F(SequentialCompressionWriterTest, open_succeeds_on_supported_compression_f writer_->open(tmp_dir_storage_options_, {serialization_format_, serialization_format_})); } +TEST_F(SequentialCompressionWriterTest, open_succeeds_twice) +{ + rosbag2_compression::CompressionOptions compression_options{ + DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE, + kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads}; + initializeWriter(compression_options); + + auto tmp_dir = rcpputils::fs::temp_directory_path() / "path_not_empty"; + auto tmp_dir_next = rcpputils::fs::temp_directory_path() / "path_not_empty_next"; + + auto storage_options = rosbag2_storage::StorageOptions(); + auto storage_options_next = rosbag2_storage::StorageOptions(); + + storage_options.uri = tmp_dir.string(); + storage_options_next.uri = tmp_dir_next.string(); + + EXPECT_NO_THROW( + writer_->open(storage_options, {serialization_format_, serialization_format_})); + + writer_->close(); + EXPECT_NO_THROW( + writer_->open(storage_options_next, {serialization_format_, serialization_format_})); +} + TEST_F(SequentialCompressionWriterTest, writer_calls_create_compressor) { rosbag2_compression::CompressionOptions compression_options{ diff --git a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp index a5156c8ec3..21e3dee36c 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp @@ -142,6 +142,9 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter // Used to track topic -> message count. If cache is present, it is updated by CacheConsumer std::unordered_map topics_names_to_info_; + // Note: topics_names_to_info_ needs to be protected with mutex only when we are explicitly + // adding or deleting items (create_topic(..)/remove_topic(..)) and when we access it from + // CacheConsumer callback i.e., write_messages(..) std::mutex topics_info_mutex_; rosbag2_storage::BagMetadata metadata_; diff --git a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp index 1bcca18371..f932c59214 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp @@ -60,9 +60,19 @@ void Writer::open( const rosbag2_storage::StorageOptions & storage_options, const ConverterOptions & converter_options) { + std::lock_guard writer_lock(writer_mutex_); writer_impl_->open(storage_options, converter_options); } +<<<<<<< HEAD +======= +void Writer::close() +{ + std::lock_guard writer_lock(writer_mutex_); + writer_impl_->close(); +} + +>>>>>>> a360d9b ([iron] Bugfix for writer not being able to open again after closing (backport #1599) (#1635)) void Writer::create_topic(const rosbag2_storage::TopicMetadata & topic_with_type) { std::lock_guard writer_lock(writer_mutex_); diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index 591661d7f5..d49f3cbb6f 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -56,6 +56,10 @@ SequentialWriter::SequentialWriter( metadata_io_(std::move(metadata_io)), converter_(nullptr), topics_names_to_info_(), +<<<<<<< HEAD +======= + topic_names_to_message_definitions_(), +>>>>>>> a360d9b ([iron] Bugfix for writer not being able to open again after closing (backport #1599) (#1635)) metadata_() {} @@ -65,7 +69,9 @@ SequentialWriter::~SequentialWriter() // Callbacks likely was created after SequentialWriter object and may point to the already // destructed objects. callback_manager_.delete_all_callbacks(); - close(); + if (storage_) { + SequentialWriter::close(); + } } void SequentialWriter::init_metadata() @@ -87,8 +93,13 @@ void SequentialWriter::open( const rosbag2_storage::StorageOptions & storage_options, const ConverterOptions & converter_options) { + // Note. Close and open methods protected with mutex on upper rosbag2_cpp::writer level. + if (storage_) { + return; // The writer already opened. + } base_folder_ = storage_options.uri; storage_options_ = storage_options; + if (storage_options_.storage_id.empty()) { storage_options_.storage_id = kDefaultStorageID; } @@ -166,13 +177,22 @@ void SequentialWriter::close() } if (storage_) { - auto info = std::make_shared(); - info->closed_file = storage_->get_relative_file_path(); storage_.reset(); // Destroy storage before calling WRITE_SPLIT callback to make sure that // bag file was closed before callback call. + } + if (!metadata_.relative_file_paths.empty()) { + auto info = std::make_shared(); + // Take the latest file name from metadata in case if it was updated after compression in + // derived class + info->closed_file = + (rcpputils::fs::path(base_folder_) / metadata_.relative_file_paths.back()).string(); callback_manager_.execute_callbacks(bag_events::BagEvent::WRITE_SPLIT, info); } - storage_factory_.reset(); + + topics_names_to_info_.clear(); + topic_names_to_message_definitions_.clear(); + + converter_.reset(); } void SequentialWriter::create_topic(const rosbag2_storage::TopicMetadata & topic_with_type) @@ -260,14 +280,21 @@ void SequentialWriter::switch_to_next_storage() base_folder_, metadata_.relative_file_paths.size()); storage_ = storage_factory_->open_read_write(storage_options_); +<<<<<<< HEAD +======= +>>>>>>> a360d9b ([iron] Bugfix for writer not being able to open again after closing (backport #1599) (#1635)) if (!storage_) { std::stringstream errmsg; errmsg << "Failed to rollover bagfile to new file: \"" << storage_options_.uri << "\"!"; throw std::runtime_error(errmsg.str()); } +<<<<<<< HEAD +======= + storage_->update_metadata(metadata_); +>>>>>>> a360d9b ([iron] Bugfix for writer not being able to open again after closing (backport #1599) (#1635)) // Re-register all topics since we rolled-over to a new bagfile. for (const auto & topic : topics_names_to_info_) { storage_->create_topic(topic.second.topic_metadata); diff --git a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp index 256f63a13e..abad8c9c36 100644 --- a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp +++ b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp @@ -43,8 +43,10 @@ TEST(TestRosbag2CPPAPI, minimal_writer_example) serialization.serialize_message(&test_msg, &serialized_msg); auto rosbag_directory = rcpputils::fs::path("test_rosbag2_writer_api_bag"); + auto rosbag_directory_next = rcpputils::fs::path("test_rosbag2_writer_api_bag_next"); // in case the bag was previously not cleaned up rcpputils::fs::remove_all(rosbag_directory); + rcpputils::fs::remove_all(rosbag_directory_next); { rosbag2_cpp::Writer writer; @@ -87,7 +89,18 @@ TEST(TestRosbag2CPPAPI, minimal_writer_example) // writing a non-serialized message writer.write(test_msg, "/a/ros2/message", rclcpp::Clock().now()); - // close on scope exit + // close as prompted + writer.close(); + + // open a new bag with the same writer + writer.open(rosbag_directory_next.string()); + + // write same topic to different bag + writer.write( + serialized_msg2, "/yet/another/topic", "test_msgs/msg/BasicTypes", + rclcpp::Clock().now()); + + // close by scope } { @@ -114,6 +127,24 @@ TEST(TestRosbag2CPPAPI, minimal_writer_example) // close on scope exit } + { + rosbag2_cpp::Reader reader; + std::string topic; + reader.open(rosbag_directory_next.string()); + ASSERT_TRUE(reader.has_next()); + + auto bag_message = reader.read_next(); + topic = bag_message->topic_name; + + TestMsgT extracted_test_msg; + rclcpp::SerializedMessage extracted_serialized_msg(*bag_message->serialized_data); + serialization.deserialize_message( + &extracted_serialized_msg, &extracted_test_msg); + + EXPECT_EQ(test_msg, extracted_test_msg); + EXPECT_EQ("/yet/another/topic", topic); + } + // alternative reader { rosbag2_cpp::Reader reader; @@ -128,4 +159,5 @@ TEST(TestRosbag2CPPAPI, minimal_writer_example) // remove the rosbag again after the test EXPECT_TRUE(rcpputils::fs::remove_all(rosbag_directory)); + EXPECT_TRUE(rcpputils::fs::remove_all(rosbag_directory_next)); }