diff --git a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp index f711c722a3..7f483762fd 100644 --- a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp +++ b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp @@ -57,12 +57,18 @@ SequentialCompressionWriter::SequentialCompressionWriter( SequentialCompressionWriter::~SequentialCompressionWriter() { +<<<<<<< HEAD close(); // Fix for https://github.com/ros2/rosbag2/issues/1278 // Explicitly deconstruct in the correct order to avoid severe warning message. // Humble ABI stability does not allow changing the class declaration order. compressor_.reset(); compression_factory_.reset(); +======= + if (storage_) { + SequentialCompressionWriter::close(); + } +>>>>>>> a360d9b ([iron] Bugfix for writer not being able to open again after closing (backport #1599) (#1635)) } void SequentialCompressionWriter::compression_thread_fn() @@ -181,6 +187,10 @@ void SequentialCompressionWriter::open( const rosbag2_cpp::ConverterOptions & converter_options) { std::lock_guard lock(storage_mutex_); + // Note. Close and open methods protected with mutex on upper rosbag2_cpp::writer level. + if (storage_) { + return; // The writer already opened. + } SequentialWriter::open(storage_options, converter_options); setup_compression(); } @@ -196,7 +206,18 @@ void SequentialCompressionWriter::close() std::lock_guard lock(storage_mutex_); std::lock_guard compressor_lock(compressor_queue_mutex_); try { - storage_.reset(); // Storage must be closed before it can be compressed. + // Storage must be closed before file can be compressed. + if (use_cache_) { + // destructor will flush message cache + cache_consumer_.reset(); + message_cache_.reset(); + } + finalize_metadata(); + if (storage_) { + storage_->update_metadata(metadata_); + storage_.reset(); // Storage will be closed in storage_ destructor + } + if (!metadata_.relative_file_paths.empty()) { std::string file = metadata_.relative_file_paths.back(); compressor_file_queue_.push(file); @@ -206,19 +227,18 @@ void SequentialCompressionWriter::close() ROSBAG2_COMPRESSION_LOG_WARN_STREAM("Could not compress the last bag file.\n" << e.what()); } } +<<<<<<< HEAD stop_compressor_threads(); finalize_metadata(); metadata_io_->write_metadata(base_folder_, metadata_); +======= +>>>>>>> a360d9b ([iron] Bugfix for writer not being able to open again after closing (backport #1599) (#1635)) } - - if (use_cache_) { - cache_consumer_.reset(); - message_cache_.reset(); - } - storage_.reset(); // Necessary to ensure that the storage is destroyed before the factory - storage_factory_.reset(); + stop_compressor_threads(); // Note: The metadata_.relative_file_paths will be updated with + // compressed filename when compressor threads will finish. + SequentialWriter::close(); } void SequentialCompressionWriter::create_topic( diff --git a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp index 0db18df82a..569bdb591d 100644 --- a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp +++ b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp @@ -199,6 +199,30 @@ TEST_F(SequentialCompressionWriterTest, open_succeeds_on_supported_compression_f writer_->open(tmp_dir_storage_options_, {serialization_format_, serialization_format_})); } +TEST_F(SequentialCompressionWriterTest, open_succeeds_twice) +{ + rosbag2_compression::CompressionOptions compression_options{ + DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE, + kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads}; + initializeWriter(compression_options); + + auto tmp_dir = rcpputils::fs::temp_directory_path() / "path_not_empty"; + auto tmp_dir_next = rcpputils::fs::temp_directory_path() / "path_not_empty_next"; + + auto storage_options = rosbag2_storage::StorageOptions(); + auto storage_options_next = rosbag2_storage::StorageOptions(); + + storage_options.uri = tmp_dir.string(); + storage_options_next.uri = tmp_dir_next.string(); + + EXPECT_NO_THROW( + writer_->open(storage_options, {serialization_format_, serialization_format_})); + + writer_->close(); + EXPECT_NO_THROW( + writer_->open(storage_options_next, {serialization_format_, serialization_format_})); +} + TEST_F(SequentialCompressionWriterTest, writer_calls_create_compressor) { rosbag2_compression::CompressionOptions compression_options{ diff --git a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp index a5156c8ec3..21e3dee36c 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp @@ -142,6 +142,9 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter // Used to track topic -> message count. If cache is present, it is updated by CacheConsumer std::unordered_map topics_names_to_info_; + // Note: topics_names_to_info_ needs to be protected with mutex only when we are explicitly + // adding or deleting items (create_topic(..)/remove_topic(..)) and when we access it from + // CacheConsumer callback i.e., write_messages(..) std::mutex topics_info_mutex_; rosbag2_storage::BagMetadata metadata_; diff --git a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp index 1bcca18371..f932c59214 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp @@ -60,9 +60,19 @@ void Writer::open( const rosbag2_storage::StorageOptions & storage_options, const ConverterOptions & converter_options) { + std::lock_guard writer_lock(writer_mutex_); writer_impl_->open(storage_options, converter_options); } +<<<<<<< HEAD +======= +void Writer::close() +{ + std::lock_guard writer_lock(writer_mutex_); + writer_impl_->close(); +} + +>>>>>>> a360d9b ([iron] Bugfix for writer not being able to open again after closing (backport #1599) (#1635)) void Writer::create_topic(const rosbag2_storage::TopicMetadata & topic_with_type) { std::lock_guard writer_lock(writer_mutex_); diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index 591661d7f5..d49f3cbb6f 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -56,6 +56,10 @@ SequentialWriter::SequentialWriter( metadata_io_(std::move(metadata_io)), converter_(nullptr), topics_names_to_info_(), +<<<<<<< HEAD +======= + topic_names_to_message_definitions_(), +>>>>>>> a360d9b ([iron] Bugfix for writer not being able to open again after closing (backport #1599) (#1635)) metadata_() {} @@ -65,7 +69,9 @@ SequentialWriter::~SequentialWriter() // Callbacks likely was created after SequentialWriter object and may point to the already // destructed objects. callback_manager_.delete_all_callbacks(); - close(); + if (storage_) { + SequentialWriter::close(); + } } void SequentialWriter::init_metadata() @@ -87,8 +93,13 @@ void SequentialWriter::open( const rosbag2_storage::StorageOptions & storage_options, const ConverterOptions & converter_options) { + // Note. Close and open methods protected with mutex on upper rosbag2_cpp::writer level. + if (storage_) { + return; // The writer already opened. + } base_folder_ = storage_options.uri; storage_options_ = storage_options; + if (storage_options_.storage_id.empty()) { storage_options_.storage_id = kDefaultStorageID; } @@ -166,13 +177,22 @@ void SequentialWriter::close() } if (storage_) { - auto info = std::make_shared(); - info->closed_file = storage_->get_relative_file_path(); storage_.reset(); // Destroy storage before calling WRITE_SPLIT callback to make sure that // bag file was closed before callback call. + } + if (!metadata_.relative_file_paths.empty()) { + auto info = std::make_shared(); + // Take the latest file name from metadata in case if it was updated after compression in + // derived class + info->closed_file = + (rcpputils::fs::path(base_folder_) / metadata_.relative_file_paths.back()).string(); callback_manager_.execute_callbacks(bag_events::BagEvent::WRITE_SPLIT, info); } - storage_factory_.reset(); + + topics_names_to_info_.clear(); + topic_names_to_message_definitions_.clear(); + + converter_.reset(); } void SequentialWriter::create_topic(const rosbag2_storage::TopicMetadata & topic_with_type) @@ -260,14 +280,21 @@ void SequentialWriter::switch_to_next_storage() base_folder_, metadata_.relative_file_paths.size()); storage_ = storage_factory_->open_read_write(storage_options_); +<<<<<<< HEAD +======= +>>>>>>> a360d9b ([iron] Bugfix for writer not being able to open again after closing (backport #1599) (#1635)) if (!storage_) { std::stringstream errmsg; errmsg << "Failed to rollover bagfile to new file: \"" << storage_options_.uri << "\"!"; throw std::runtime_error(errmsg.str()); } +<<<<<<< HEAD +======= + storage_->update_metadata(metadata_); +>>>>>>> a360d9b ([iron] Bugfix for writer not being able to open again after closing (backport #1599) (#1635)) // Re-register all topics since we rolled-over to a new bagfile. for (const auto & topic : topics_names_to_info_) { storage_->create_topic(topic.second.topic_metadata); diff --git a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp index 256f63a13e..abad8c9c36 100644 --- a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp +++ b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp @@ -43,8 +43,10 @@ TEST(TestRosbag2CPPAPI, minimal_writer_example) serialization.serialize_message(&test_msg, &serialized_msg); auto rosbag_directory = rcpputils::fs::path("test_rosbag2_writer_api_bag"); + auto rosbag_directory_next = rcpputils::fs::path("test_rosbag2_writer_api_bag_next"); // in case the bag was previously not cleaned up rcpputils::fs::remove_all(rosbag_directory); + rcpputils::fs::remove_all(rosbag_directory_next); { rosbag2_cpp::Writer writer; @@ -87,7 +89,18 @@ TEST(TestRosbag2CPPAPI, minimal_writer_example) // writing a non-serialized message writer.write(test_msg, "/a/ros2/message", rclcpp::Clock().now()); - // close on scope exit + // close as prompted + writer.close(); + + // open a new bag with the same writer + writer.open(rosbag_directory_next.string()); + + // write same topic to different bag + writer.write( + serialized_msg2, "/yet/another/topic", "test_msgs/msg/BasicTypes", + rclcpp::Clock().now()); + + // close by scope } { @@ -114,6 +127,24 @@ TEST(TestRosbag2CPPAPI, minimal_writer_example) // close on scope exit } + { + rosbag2_cpp::Reader reader; + std::string topic; + reader.open(rosbag_directory_next.string()); + ASSERT_TRUE(reader.has_next()); + + auto bag_message = reader.read_next(); + topic = bag_message->topic_name; + + TestMsgT extracted_test_msg; + rclcpp::SerializedMessage extracted_serialized_msg(*bag_message->serialized_data); + serialization.deserialize_message( + &extracted_serialized_msg, &extracted_test_msg); + + EXPECT_EQ(test_msg, extracted_test_msg); + EXPECT_EQ("/yet/another/topic", topic); + } + // alternative reader { rosbag2_cpp::Reader reader; @@ -128,4 +159,5 @@ TEST(TestRosbag2CPPAPI, minimal_writer_example) // remove the rosbag again after the test EXPECT_TRUE(rcpputils::fs::remove_all(rosbag_directory)); + EXPECT_TRUE(rcpputils::fs::remove_all(rosbag_directory_next)); }