Skip to content

Commit

Permalink
[iron] Bugfix for writer not being able to open again after closing (…
Browse files Browse the repository at this point in the history
…backport #1599) (#1635)

* re-applies fixes from #1590 to rolling. Also removes new message definition in sequential writer test for multiple open operations. Also clears topic_names_to_message_definitions_ and handles message_definitions_s underlying container similarly. Lastly, also avoids reset of factory in the compression writer, adds unit test there too.

Signed-off-by: Yannick Schulz <yschulz854@gmail.com>
Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* removes unused compressor_ member from compresser writer class. Also delegates rest of the closing behavior to the base class in close method, as it is handled in the open and write methods of the compression writer

Signed-off-by: Yannick Schulz <yschulz854@gmail.com>

* Remove unrelated delta

- message_definitions_ was intentionally allocated on the stack and
should persist between writer close() and open() because it represents
cache for message definitions which is not changes.

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Don't call virtual methods from destructors

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Cleanup 'rosbag2_directory_next' after the test run

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Protect Writer::open(..) and Writer::close() with mutex on upper level

- Rationale: To avoid race conditions if open(..) and close() could be
ever be called from different threads.

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Bugfix for WRITE_SPLIT callback not called for the last compressed file

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Bugfix for lost messages from cache when closing compression writer

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Address build failure by using rcpputils::fs instead of std::filesystem

- Note: On Iron we haven't migrated to the std::filesystem and using
rcpputils::fs

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Adopt failing 'open_succeeds_twice' test for Iron

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Return from writer's open() immediately if storage already open

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

---------

Signed-off-by: Yannick Schulz <yschulz854@gmail.com>
Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
Co-authored-by: Yannick Schulz <yschulz854@gmail.com>
(cherry picked from commit a360d9b)

# Conflicts:
#	rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp
#	rosbag2_cpp/src/rosbag2_cpp/writer.cpp
#	rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
  • Loading branch information
MichaelOrlov committed May 16, 2024
1 parent 6226aa5 commit 704a6cd
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,18 @@ SequentialCompressionWriter::SequentialCompressionWriter(

SequentialCompressionWriter::~SequentialCompressionWriter()
{
<<<<<<< HEAD
close();
// Fix for https://github.com/ros2/rosbag2/issues/1278
// Explicitly deconstruct in the correct order to avoid severe warning message.
// Humble ABI stability does not allow changing the class declaration order.
compressor_.reset();
compression_factory_.reset();
=======
if (storage_) {
SequentialCompressionWriter::close();
}
>>>>>>> a360d9b ([iron] Bugfix for writer not being able to open again after closing (backport #1599) (#1635))
}

void SequentialCompressionWriter::compression_thread_fn()
Expand Down Expand Up @@ -181,6 +187,10 @@ void SequentialCompressionWriter::open(
const rosbag2_cpp::ConverterOptions & converter_options)
{
std::lock_guard<std::recursive_mutex> lock(storage_mutex_);
// Note. Close and open methods protected with mutex on upper rosbag2_cpp::writer level.
if (storage_) {
return; // The writer already opened.
}
SequentialWriter::open(storage_options, converter_options);
setup_compression();
}
Expand All @@ -196,7 +206,18 @@ void SequentialCompressionWriter::close()
std::lock_guard<std::recursive_mutex> lock(storage_mutex_);
std::lock_guard<std::mutex> compressor_lock(compressor_queue_mutex_);
try {
storage_.reset(); // Storage must be closed before it can be compressed.
// Storage must be closed before file can be compressed.
if (use_cache_) {
// destructor will flush message cache
cache_consumer_.reset();
message_cache_.reset();
}
finalize_metadata();
if (storage_) {
storage_->update_metadata(metadata_);
storage_.reset(); // Storage will be closed in storage_ destructor
}

if (!metadata_.relative_file_paths.empty()) {
std::string file = metadata_.relative_file_paths.back();
compressor_file_queue_.push(file);
Expand All @@ -206,19 +227,18 @@ void SequentialCompressionWriter::close()
ROSBAG2_COMPRESSION_LOG_WARN_STREAM("Could not compress the last bag file.\n" << e.what());
}
}
<<<<<<< HEAD

stop_compressor_threads();

finalize_metadata();
metadata_io_->write_metadata(base_folder_, metadata_);
=======
>>>>>>> a360d9b ([iron] Bugfix for writer not being able to open again after closing (backport #1599) (#1635))
}

if (use_cache_) {
cache_consumer_.reset();
message_cache_.reset();
}
storage_.reset(); // Necessary to ensure that the storage is destroyed before the factory
storage_factory_.reset();
stop_compressor_threads(); // Note: The metadata_.relative_file_paths will be updated with
// compressed filename when compressor threads will finish.
SequentialWriter::close();
}

void SequentialCompressionWriter::create_topic(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,30 @@ TEST_F(SequentialCompressionWriterTest, open_succeeds_on_supported_compression_f
writer_->open(tmp_dir_storage_options_, {serialization_format_, serialization_format_}));
}

TEST_F(SequentialCompressionWriterTest, open_succeeds_twice)
{
rosbag2_compression::CompressionOptions compression_options{
DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE,
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads};
initializeWriter(compression_options);

auto tmp_dir = rcpputils::fs::temp_directory_path() / "path_not_empty";
auto tmp_dir_next = rcpputils::fs::temp_directory_path() / "path_not_empty_next";

auto storage_options = rosbag2_storage::StorageOptions();
auto storage_options_next = rosbag2_storage::StorageOptions();

storage_options.uri = tmp_dir.string();
storage_options_next.uri = tmp_dir_next.string();

EXPECT_NO_THROW(
writer_->open(storage_options, {serialization_format_, serialization_format_}));

writer_->close();
EXPECT_NO_THROW(
writer_->open(storage_options_next, {serialization_format_, serialization_format_}));
}

TEST_F(SequentialCompressionWriterTest, writer_calls_create_compressor)
{
rosbag2_compression::CompressionOptions compression_options{
Expand Down
3 changes: 3 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter

// Used to track topic -> message count. If cache is present, it is updated by CacheConsumer
std::unordered_map<std::string, rosbag2_storage::TopicInformation> topics_names_to_info_;
// Note: topics_names_to_info_ needs to be protected with mutex only when we are explicitly
// adding or deleting items (create_topic(..)/remove_topic(..)) and when we access it from
// CacheConsumer callback i.e., write_messages(..)
std::mutex topics_info_mutex_;

rosbag2_storage::BagMetadata metadata_;
Expand Down
10 changes: 10 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,19 @@ void Writer::open(
const rosbag2_storage::StorageOptions & storage_options,
const ConverterOptions & converter_options)
{
std::lock_guard<std::mutex> writer_lock(writer_mutex_);
writer_impl_->open(storage_options, converter_options);
}

<<<<<<< HEAD
=======
void Writer::close()
{
std::lock_guard<std::mutex> writer_lock(writer_mutex_);
writer_impl_->close();
}

>>>>>>> a360d9b ([iron] Bugfix for writer not being able to open again after closing (backport #1599) (#1635))
void Writer::create_topic(const rosbag2_storage::TopicMetadata & topic_with_type)
{
std::lock_guard<std::mutex> writer_lock(writer_mutex_);
Expand Down
35 changes: 31 additions & 4 deletions rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ SequentialWriter::SequentialWriter(
metadata_io_(std::move(metadata_io)),
converter_(nullptr),
topics_names_to_info_(),
<<<<<<< HEAD
=======
topic_names_to_message_definitions_(),
>>>>>>> a360d9b ([iron] Bugfix for writer not being able to open again after closing (backport #1599) (#1635))
metadata_()
{}

Expand All @@ -65,7 +69,9 @@ SequentialWriter::~SequentialWriter()
// Callbacks likely was created after SequentialWriter object and may point to the already
// destructed objects.
callback_manager_.delete_all_callbacks();
close();
if (storage_) {
SequentialWriter::close();
}
}

void SequentialWriter::init_metadata()
Expand All @@ -87,8 +93,13 @@ void SequentialWriter::open(
const rosbag2_storage::StorageOptions & storage_options,
const ConverterOptions & converter_options)
{
// Note. Close and open methods protected with mutex on upper rosbag2_cpp::writer level.
if (storage_) {
return; // The writer already opened.
}
base_folder_ = storage_options.uri;
storage_options_ = storage_options;

if (storage_options_.storage_id.empty()) {
storage_options_.storage_id = kDefaultStorageID;
}
Expand Down Expand Up @@ -166,13 +177,22 @@ void SequentialWriter::close()
}

if (storage_) {
auto info = std::make_shared<bag_events::BagSplitInfo>();
info->closed_file = storage_->get_relative_file_path();
storage_.reset(); // Destroy storage before calling WRITE_SPLIT callback to make sure that
// bag file was closed before callback call.
}
if (!metadata_.relative_file_paths.empty()) {
auto info = std::make_shared<bag_events::BagSplitInfo>();
// Take the latest file name from metadata in case if it was updated after compression in
// derived class
info->closed_file =
(rcpputils::fs::path(base_folder_) / metadata_.relative_file_paths.back()).string();
callback_manager_.execute_callbacks(bag_events::BagEvent::WRITE_SPLIT, info);
}
storage_factory_.reset();

topics_names_to_info_.clear();
topic_names_to_message_definitions_.clear();

converter_.reset();
}

void SequentialWriter::create_topic(const rosbag2_storage::TopicMetadata & topic_with_type)
Expand Down Expand Up @@ -260,14 +280,21 @@ void SequentialWriter::switch_to_next_storage()
base_folder_,
metadata_.relative_file_paths.size());
storage_ = storage_factory_->open_read_write(storage_options_);
<<<<<<< HEAD

=======
>>>>>>> a360d9b ([iron] Bugfix for writer not being able to open again after closing (backport #1599) (#1635))
if (!storage_) {
std::stringstream errmsg;
errmsg << "Failed to rollover bagfile to new file: \"" << storage_options_.uri << "\"!";

throw std::runtime_error(errmsg.str());
}
<<<<<<< HEAD

=======
storage_->update_metadata(metadata_);
>>>>>>> a360d9b ([iron] Bugfix for writer not being able to open again after closing (backport #1599) (#1635))
// Re-register all topics since we rolled-over to a new bagfile.
for (const auto & topic : topics_names_to_info_) {
storage_->create_topic(topic.second.topic_metadata);
Expand Down
34 changes: 33 additions & 1 deletion rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ TEST(TestRosbag2CPPAPI, minimal_writer_example)
serialization.serialize_message(&test_msg, &serialized_msg);

auto rosbag_directory = rcpputils::fs::path("test_rosbag2_writer_api_bag");
auto rosbag_directory_next = rcpputils::fs::path("test_rosbag2_writer_api_bag_next");
// in case the bag was previously not cleaned up
rcpputils::fs::remove_all(rosbag_directory);
rcpputils::fs::remove_all(rosbag_directory_next);

{
rosbag2_cpp::Writer writer;
Expand Down Expand Up @@ -87,7 +89,18 @@ TEST(TestRosbag2CPPAPI, minimal_writer_example)
// writing a non-serialized message
writer.write(test_msg, "/a/ros2/message", rclcpp::Clock().now());

// close on scope exit
// close as prompted
writer.close();

// open a new bag with the same writer
writer.open(rosbag_directory_next.string());

// write same topic to different bag
writer.write(
serialized_msg2, "/yet/another/topic", "test_msgs/msg/BasicTypes",
rclcpp::Clock().now());

// close by scope
}

{
Expand All @@ -114,6 +127,24 @@ TEST(TestRosbag2CPPAPI, minimal_writer_example)
// close on scope exit
}

{
rosbag2_cpp::Reader reader;
std::string topic;
reader.open(rosbag_directory_next.string());
ASSERT_TRUE(reader.has_next());

auto bag_message = reader.read_next();
topic = bag_message->topic_name;

TestMsgT extracted_test_msg;
rclcpp::SerializedMessage extracted_serialized_msg(*bag_message->serialized_data);
serialization.deserialize_message(
&extracted_serialized_msg, &extracted_test_msg);

EXPECT_EQ(test_msg, extracted_test_msg);
EXPECT_EQ("/yet/another/topic", topic);
}

// alternative reader
{
rosbag2_cpp::Reader reader;
Expand All @@ -128,4 +159,5 @@ TEST(TestRosbag2CPPAPI, minimal_writer_example)

// remove the rosbag again after the test
EXPECT_TRUE(rcpputils::fs::remove_all(rosbag_directory));
EXPECT_TRUE(rcpputils::fs::remove_all(rosbag_directory_next));
}

0 comments on commit 704a6cd

Please sign in to comment.