From f3d62dfb76c20846b7e8756d27387ba784eb213d Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Fri, 12 Jan 2024 09:28:07 -0800 Subject: [PATCH] Improve performance in SqliteStorage::get_bagfile_size() (#1516) * Improve performance in SqliteStorage::get_bagfile_size() - In SQLite3 DB file size is equal to the db page_count * page_size - Use DB query to get page_count and page_size instead of filesystem blocking call. Signed-off-by: Michael Orlov * Address integration tests failure - Don't use static objects for the SqliteStatement, it causes DB lock and failure to close and open it again. Signed-off-by: Michael Orlov * Protect get_bagfile_size() with database_write_mutex_ Signed-off-by: Michael Orlov * Make get_bagfile_size() lock free to avoid locking double-buffer cache - Moved db_file_size_ update to the functions which are writing to the database Signed-off-by: Michael Orlov * Move test_storage_interfaces.cpp to the rosbag2_tests - Move test_storage_interfaces.cpp to the rosbag2_tests due to the cyclic dependencies between rosbag2_storage and storage plugins. Signed-off-by: Michael Orlov * Rename test_storage_interfaces.cpp to the test_rosbag2_storage_api.cpp Signed-off-by: Michael Orlov * Address review comments - Rename `database_write_mutex_` to the `db_read_write_mutex_` - Add const qualifiers for get_page_size() and read_total_page_count_locked() methods Signed-off-by: Michael Orlov * Fix Windows warnings in test_rosbag2_storage_api.cpp Signed-off-by: Michael Orlov --------- Signed-off-by: Michael Orlov --- rosbag2_storage/CMakeLists.txt | 2 +- .../sqlite_storage.hpp | 15 +- .../sqlite_storage.cpp | 70 +++++++- rosbag2_tests/CMakeLists.txt | 12 +- .../test_rosbag2_storage_api.cpp | 167 ++++++++++++++++++ 5 files changed, 252 insertions(+), 14 deletions(-) create mode 100644 rosbag2_tests/test/rosbag2_tests/test_rosbag2_storage_api.cpp diff --git a/rosbag2_storage/CMakeLists.txt b/rosbag2_storage/CMakeLists.txt index 1fbd3e6dba..38ff4a22f4 100644 --- a/rosbag2_storage/CMakeLists.txt +++ b/rosbag2_storage/CMakeLists.txt @@ -132,6 +132,6 @@ if(BUILD_TESTING) rosbag2_test_common::rosbag2_test_common yaml-cpp ) - endif() +endif() ament_package() diff --git a/rosbag2_storage_sqlite3/include/rosbag2_storage_sqlite3/sqlite_storage.hpp b/rosbag2_storage_sqlite3/include/rosbag2_storage_sqlite3/sqlite_storage.hpp index b38c5a9c56..22b2e376ba 100644 --- a/rosbag2_storage_sqlite3/include/rosbag2_storage_sqlite3/sqlite_storage.hpp +++ b/rosbag2_storage_sqlite3/include/rosbag2_storage_sqlite3/sqlite_storage.hpp @@ -122,22 +122,25 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage void activate_transaction(); void commit_transaction(); void write_locked(std::shared_ptr message) - RCPPUTILS_TSA_REQUIRES(database_write_mutex_); + RCPPUTILS_TSA_REQUIRES(db_read_write_mutex_); int get_last_rowid(); int read_db_schema_version(); + uint64_t get_page_size() const; + uint64_t read_total_page_count_locked() const RCPPUTILS_TSA_GUARDED_BY(db_read_write_mutex_); using ReadQueryResult = SqliteStatementWrapper::QueryResult< std::shared_ptr, rcutils_time_point_value_t, std::string, int>; - std::shared_ptr database_ RCPPUTILS_TSA_GUARDED_BY(database_write_mutex_); + std::shared_ptr database_ RCPPUTILS_TSA_GUARDED_BY(db_read_write_mutex_); SqliteStatement write_statement_ {}; SqliteStatement read_statement_ {}; + SqliteStatement page_count_statement_ {}; ReadQueryResult message_result_ {nullptr}; ReadQueryResult::Iterator current_message_row_ { nullptr, SqliteStatementWrapper::QueryResult<>::Iterator::POSITION_END}; - std::unordered_map topics_ RCPPUTILS_TSA_GUARDED_BY(database_write_mutex_); + std::unordered_map topics_ RCPPUTILS_TSA_GUARDED_BY(db_read_write_mutex_); std::unordered_map msg_definitions_ RCPPUTILS_TSA_GUARDED_BY( - database_write_mutex_); + db_read_write_mutex_); std::vector all_topics_and_types_; std::string relative_path_; std::atomic_bool active_transaction_ {false}; @@ -152,11 +155,13 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage // This mutex is necessary to protect: // a) database access (this could also be done with FULLMUTEX), but see b) // b) topics_ collection - since we could be writing and reading it at the same time - std::mutex database_write_mutex_; + std::mutex db_read_write_mutex_; const int kDBSchemaVersion_ = 4; int db_schema_version_ = -1; // Valid version number starting from 1 rosbag2_storage::BagMetadata metadata_{}; + uint64_t db_page_size_ = 0; + std::atomic db_file_size_{0}; }; diff --git a/rosbag2_storage_sqlite3/src/rosbag2_storage_sqlite3/sqlite_storage.cpp b/rosbag2_storage_sqlite3/src/rosbag2_storage_sqlite3/sqlite_storage.cpp index a1e32ebee4..998d248cd5 100644 --- a/rosbag2_storage_sqlite3/src/rosbag2_storage_sqlite3/sqlite_storage.cpp +++ b/rosbag2_storage_sqlite3/src/rosbag2_storage_sqlite3/sqlite_storage.cpp @@ -29,6 +29,7 @@ #include "rcpputils/env.hpp" #include "rcpputils/filesystem_helper.hpp" +#include "rcpputils/scope_exit.hpp" #include "rosbag2_storage/metadata_io.hpp" #include "rosbag2_storage/serialized_bag_message.hpp" @@ -181,6 +182,10 @@ void SqliteStorage::open( const rosbag2_storage::StorageOptions & storage_options, rosbag2_storage::storage_interfaces::IOFlag io_flag) { + if (page_count_statement_) { + page_count_statement_->reset(); // Reset statement to avoid DB lock + page_count_statement_.reset(); + } storage_mode_ = io_flag; const auto preset = parse_preset_profile(storage_options.storage_preset_profile); auto pragmas = parse_pragmas(storage_options.storage_config_uri, io_flag); @@ -212,10 +217,16 @@ void SqliteStorage::open( throw std::runtime_error("Failed to setup storage. Error: " + std::string(e.what())); } + db_page_size_ = get_page_size(); + db_file_size_ = 0; + page_count_statement_ = database_->prepare_statement("PRAGMA page_count;"); + // initialize only for READ_WRITE since the DB is already initialized if in APPEND. if (is_read_write(io_flag)) { db_schema_version_ = kDBSchemaVersion_; + std::lock_guard db_lock(db_read_write_mutex_); initialize(); + db_file_size_ = db_page_size_ * read_total_page_count_locked(); } else { db_schema_version_ = read_db_schema_version(); read_metadata(); @@ -240,7 +251,9 @@ void SqliteStorage::update_metadata(const rosbag2_storage::BagMetadata & metadat auto insert_metadata = database_->prepare_statement( "INSERT INTO metadata (metadata_version, metadata) VALUES (?, ?)"); insert_metadata->bind(metadata.version, serialized_metadata); + std::lock_guard db_lock(db_read_write_mutex_); insert_metadata->execute_and_reset(); + db_file_size_ = db_page_size_ * read_total_page_count_locked(); } } @@ -270,8 +283,9 @@ void SqliteStorage::commit_transaction() void SqliteStorage::write(std::shared_ptr message) { - std::lock_guard db_lock(database_write_mutex_); + std::lock_guard db_lock(db_read_write_mutex_); write_locked(message); + db_file_size_ = db_page_size_ * read_total_page_count_locked(); } void SqliteStorage::write_locked( @@ -314,7 +328,7 @@ void SqliteStorage::write_locked( void SqliteStorage::write( const std::vector> & messages) { - std::lock_guard db_lock(database_write_mutex_); + std::lock_guard db_lock(db_read_write_mutex_); if (!write_statement_) { prepare_for_writing(); } @@ -323,9 +337,14 @@ void SqliteStorage::write( for (auto & message : messages) { write_locked(message); + // Update db_file_size_ with the estimated data size written to the DB + // + 3 * sizeof(int64_t) This is for storing timestamp, topic_id and index for messages table + db_file_size_ += message->serialized_data->buffer_length + 3 * sizeof(int64_t); } commit_transaction(); + // Correct db_file_size_ by reading out the exact numbers from the DB + db_file_size_ = db_page_size_ * read_total_page_count_locked(); } bool SqliteStorage::set_read_order(const rosbag2_storage::ReadOrder & read_order) @@ -401,9 +420,14 @@ void SqliteStorage::get_all_message_definitions( uint64_t SqliteStorage::get_bagfile_size() const { - const auto bag_path = rcpputils::fs::path{get_relative_file_path()}; - - return bag_path.exists() ? bag_path.file_size() : 0u; + if (!database_ || !page_count_statement_) { + // Trying to call get_bagfile_size when SqliteStorage::open was not called or db + // failed to open. Fallback to the filesystem call. + const auto bag_path = rcpputils::fs::path{get_relative_file_path()}; + return bag_path.exists() ? bag_path.file_size() : 0u; + } else { + return db_file_size_; + } } void SqliteStorage::initialize() @@ -458,7 +482,7 @@ void SqliteStorage::create_topic( const rosbag2_storage::TopicMetadata & topic, const rosbag2_storage::MessageDefinition & message_definition) { - std::lock_guard db_lock(database_write_mutex_); + std::lock_guard db_lock(db_read_write_mutex_); if (topics_.find(topic.name) == std::end(topics_)) { auto insert_topic = database_->prepare_statement( @@ -494,11 +518,12 @@ void SqliteStorage::create_topic( topic_type_and_hash, static_cast(database_->get_last_insert_id())); } + db_file_size_ = db_page_size_ * read_total_page_count_locked(); } void SqliteStorage::remove_topic(const rosbag2_storage::TopicMetadata & topic) { - std::lock_guard db_lock(database_write_mutex_); + std::lock_guard db_lock(db_read_write_mutex_); if (topics_.find(topic.name) != std::end(topics_)) { auto delete_topic = database_->prepare_statement( @@ -507,6 +532,7 @@ void SqliteStorage::remove_topic(const rosbag2_storage::TopicMetadata & topic) delete_topic->execute_and_reset(); topics_.erase(topic.name); } + db_file_size_ = db_page_size_ * read_total_page_count_locked(); } void SqliteStorage::prepare_for_writing() @@ -852,6 +878,36 @@ int SqliteStorage::read_db_schema_version() return schema_version; } +uint64_t SqliteStorage::read_total_page_count_locked() const +{ + auto scope_exit_reset_statement = rcpputils::make_scope_exit( + [this]() { + page_count_statement_->reset(); + }); + auto page_count_query_result = page_count_statement_->execute_query(); + auto page_count_query_result_begin = page_count_query_result.begin(); + if (page_count_query_result_begin == page_count_query_result.end()) { + throw SqliteException{"Error. PRAGMA page_count return no result."}; + } + return std::get<0>(*page_count_query_result_begin); +} + +uint64_t SqliteStorage::get_page_size() const +{ + if (!database_) { + return 0; // Trying to call get_page_size when SqliteStorage::open was not called or db + // failed to open + } + SqliteStatement get_page_size_statement = database_->prepare_statement("PRAGMA page_size;"); + + auto page_size_query_result = get_page_size_statement->execute_query(); + auto page_size_query_result_begin = page_size_query_result.begin(); + if (page_size_query_result_begin == page_size_query_result.end()) { + throw SqliteException{"Error. PRAGMA page_size return no result."}; + } + return std::get<0>(*page_size_query_result_begin); +} + } // namespace rosbag2_storage_plugins #include "pluginlib/class_list_macros.hpp" // NOLINT diff --git a/rosbag2_tests/CMakeLists.txt b/rosbag2_tests/CMakeLists.txt index d856d80dbc..70673fde28 100644 --- a/rosbag2_tests/CMakeLists.txt +++ b/rosbag2_tests/CMakeLists.txt @@ -107,7 +107,6 @@ if(BUILD_TESTING) ) endif() - ament_add_gmock(test_rosbag2_cpp_api test/rosbag2_tests/test_rosbag2_cpp_api.cpp WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) @@ -133,6 +132,17 @@ if(BUILD_TESTING) ${test_msgs_TARGETS} ) endif() + + ament_add_gmock(test_rosbag2_storage_api + test/rosbag2_tests/test_rosbag2_storage_api.cpp) + if(TARGET test_rosbag2_storage_api) + target_link_libraries(test_rosbag2_storage_api + rosbag2_storage::rosbag2_storage + rosbag2_test_common::rosbag2_test_common + ${std_msgs_TARGETS} + ) + endif() + endif() ament_package() diff --git a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_storage_api.cpp b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_storage_api.cpp new file mode 100644 index 0000000000..f4fd7d5373 --- /dev/null +++ b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_storage_api.cpp @@ -0,0 +1,167 @@ +// Copyright 2023, Apex.AI. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include + +#include "rosbag2_storage/storage_factory.hpp" +#include "rosbag2_storage/storage_options.hpp" +#include "rosbag2_test_common/memory_management.hpp" +#include "rosbag2_test_common/tested_storage_ids.hpp" +#include "rosbag2_test_common/temporary_directory_fixture.hpp" +#include "std_msgs/msg/string.hpp" + +using namespace std::chrono_literals; // NOLINT +using namespace ::testing; // NOLINT +using namespace rosbag2_test_common; // NOLINT + +class Rosbag2StorageAPITests : public rosbag2_test_common::ParametrizedTemporaryDirectoryFixture +{ +public: + Rosbag2StorageAPITests() + { + memory_management_ = std::make_unique(); + } + + void SetUp() override + { + auto bag_name = get_test_name() + "_" + GetParam(); + root_bag_path_ = std::filesystem::path(temporary_dir_path_) / bag_name; + + // Clean up potentially leftover bag files. + // There may be leftovers if the system reallocates a temp directory + // used by a previous test execution and the test did not have a clean exit. + std::filesystem::remove_all(root_bag_path_); + } + + void TearDown() override + { + std::filesystem::remove_all(root_bag_path_); + } + + static std::string get_test_name() + { + const auto * test_info = UnitTest::GetInstance()->current_test_info(); + std::string test_name = test_info->name(); + // Replace any slashes in the test name, since it is used in paths + std::replace(test_name.begin(), test_name.end(), '/', '_'); + return test_name; + } + + std::vector> + prepare_serialized_messages( + const std::vector & topics, const size_t num_msgs_per_topic = 5, + std::chrono::duration dt = 0.01s) const + { + std::vector> serialized_messages; + for (const auto & topic : topics) { + std::chrono::duration timestamp{dt}; + for (size_t msg_idx = 0; msg_idx < num_msgs_per_topic; msg_idx++) { + auto std_string_msg = std::make_shared(); + std::stringstream ss; + for (int i = 0; i < 100; i++) { + ss << topic << "_long_long_string_message_" << msg_idx + 1 << "_repeat_"; + } + std_string_msg->data = ss.str(); + + auto bag_message = std::make_shared(); + bag_message->serialized_data = memory_management_->serialize_message(std_string_msg); + bag_message->time_stamp = + std::chrono::duration_cast(timestamp).count(); + bag_message->topic_name = topic; + timestamp += dt; + serialized_messages.push_back(bag_message); + } + } + return serialized_messages; + } + + static void create_topics( + std::shared_ptr & rw_storage, + const std::vector & topics) + { + for (size_t topic_idx = 0; topic_idx < topics.size(); topic_idx++) { + const std::string type_description_hash = "type_hash_" + std::to_string(topic_idx); + const rosbag2_storage::MessageDefinition msg_definition = { + "std_msgs/msg/String", "ros2msg", "string data", type_description_hash}; + + rosbag2_storage::TopicMetadata topic_metadata = { + topics[topic_idx], + "std_msgs/msg/String", + "cdr", + {rclcpp::QoS(1)}, + type_description_hash + }; + rw_storage->create_topic(topic_metadata, msg_definition); + } + } + + std::filesystem::path root_bag_path_; + std::unique_ptr memory_management_; +}; + +TEST_P(Rosbag2StorageAPITests, get_bagfile_size_read_write_interface) +{ + const std::string FILE_EXTENSION = (GetParam() == "mcap") ? ".mcap" : ".db3"; + std::filesystem::path full_bagfile_path = root_bag_path_; + full_bagfile_path.replace_extension(FILE_EXTENSION); + + rosbag2_storage::StorageFactory factory{}; + rosbag2_storage::StorageOptions options{}; + options.uri = root_bag_path_.generic_string(); + options.storage_id = GetParam(); + + std::shared_ptr rw_storage = + factory.open_read_write(options); + + std::vector topics = {"topic1_topic1", "topic2_topic2"}; + auto serialized_messages = prepare_serialized_messages(topics, 500); + create_topics(rw_storage, topics); + + rw_storage->write(serialized_messages); + uint64_t storage_bagfile_size = rw_storage->get_bagfile_size(); + + size_t fs_bagfile_size = std::filesystem::file_size(full_bagfile_path); + auto tolerance = static_cast(fs_bagfile_size * 0.001); // tolerance = 0.1% + + size_t filesize_difference = + std::abs(static_cast(storage_bagfile_size) - static_cast(fs_bagfile_size)); + + EXPECT_LE(filesize_difference, tolerance) << " tolerance = " << tolerance << std::endl << + " filesize_difference = " << filesize_difference << std::endl << + " bagfile_size_from_storage = " << storage_bagfile_size; + + // Write messages one more time to make sure that storage_bagfile_size updating with each write + rw_storage->write(serialized_messages); + storage_bagfile_size = rw_storage->get_bagfile_size(); + + fs_bagfile_size = std::filesystem::file_size(full_bagfile_path); + tolerance = static_cast(fs_bagfile_size * 0.001); // tolerance = 0.1% + + filesize_difference = + std::abs(static_cast(storage_bagfile_size) - static_cast(fs_bagfile_size)); + + EXPECT_LE(filesize_difference, tolerance) << " tolerance = " << tolerance << std::endl << + " filesize_difference = " << filesize_difference << std::endl << + " bagfile_size_from_storage = " << storage_bagfile_size; +} + +INSTANTIATE_TEST_SUITE_P( + ParametrizedStorageAPITests, + Rosbag2StorageAPITests, + ValuesIn(rosbag2_test_common::kTestedStorageIDs) +);