Skip to content

Commit

Permalink
Improve performance in SqliteStorage::get_bagfile_size() (#1516)
Browse files Browse the repository at this point in the history
* Improve performance in SqliteStorage::get_bagfile_size()

- In SQLite3 DB file size is equal to the db page_count * page_size
- Use DB query to get page_count and page_size instead of filesystem
blocking call.

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Address integration tests failure

- Don't use static objects for the SqliteStatement, it causes DB lock
and failure to close and open it again.

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Protect get_bagfile_size() with database_write_mutex_

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Make get_bagfile_size() lock free to avoid locking double-buffer cache

- Moved db_file_size_ update to the functions which are writing to the
database

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Move test_storage_interfaces.cpp to the rosbag2_tests

- Move test_storage_interfaces.cpp to the rosbag2_tests due to the
cyclic dependencies between rosbag2_storage and storage plugins.

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Rename test_storage_interfaces.cpp to the test_rosbag2_storage_api.cpp

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Address review comments

- Rename `database_write_mutex_` to the `db_read_write_mutex_`
- Add const qualifiers for get_page_size() and
read_total_page_count_locked() methods

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Fix Windows warnings in test_rosbag2_storage_api.cpp

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

---------

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
  • Loading branch information
MichaelOrlov authored Jan 12, 2024
1 parent 73b0772 commit f3d62df
Show file tree
Hide file tree
Showing 5 changed files with 252 additions and 14 deletions.
2 changes: 1 addition & 1 deletion rosbag2_storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,6 @@ if(BUILD_TESTING)
rosbag2_test_common::rosbag2_test_common
yaml-cpp
)
endif()
endif()

ament_package()
Original file line number Diff line number Diff line change
Expand Up @@ -122,22 +122,25 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage
void activate_transaction();
void commit_transaction();
void write_locked(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> message)
RCPPUTILS_TSA_REQUIRES(database_write_mutex_);
RCPPUTILS_TSA_REQUIRES(db_read_write_mutex_);
int get_last_rowid();
int read_db_schema_version();
uint64_t get_page_size() const;
uint64_t read_total_page_count_locked() const RCPPUTILS_TSA_GUARDED_BY(db_read_write_mutex_);

using ReadQueryResult = SqliteStatementWrapper::QueryResult<
std::shared_ptr<rcutils_uint8_array_t>, rcutils_time_point_value_t, std::string, int>;

std::shared_ptr<SqliteWrapper> database_ RCPPUTILS_TSA_GUARDED_BY(database_write_mutex_);
std::shared_ptr<SqliteWrapper> database_ RCPPUTILS_TSA_GUARDED_BY(db_read_write_mutex_);
SqliteStatement write_statement_ {};
SqliteStatement read_statement_ {};
SqliteStatement page_count_statement_ {};
ReadQueryResult message_result_ {nullptr};
ReadQueryResult::Iterator current_message_row_ {
nullptr, SqliteStatementWrapper::QueryResult<>::Iterator::POSITION_END};
std::unordered_map<std::string, int> topics_ RCPPUTILS_TSA_GUARDED_BY(database_write_mutex_);
std::unordered_map<std::string, int> topics_ RCPPUTILS_TSA_GUARDED_BY(db_read_write_mutex_);
std::unordered_map<std::string, int> msg_definitions_ RCPPUTILS_TSA_GUARDED_BY(
database_write_mutex_);
db_read_write_mutex_);
std::vector<rosbag2_storage::TopicMetadata> all_topics_and_types_;
std::string relative_path_;
std::atomic_bool active_transaction_ {false};
Expand All @@ -152,11 +155,13 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage
// This mutex is necessary to protect:
// a) database access (this could also be done with FULLMUTEX), but see b)
// b) topics_ collection - since we could be writing and reading it at the same time
std::mutex database_write_mutex_;
std::mutex db_read_write_mutex_;

const int kDBSchemaVersion_ = 4;
int db_schema_version_ = -1; // Valid version number starting from 1
rosbag2_storage::BagMetadata metadata_{};
uint64_t db_page_size_ = 0;
std::atomic<uint64_t> db_file_size_{0};
};


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include "rcpputils/env.hpp"
#include "rcpputils/filesystem_helper.hpp"
#include "rcpputils/scope_exit.hpp"

#include "rosbag2_storage/metadata_io.hpp"
#include "rosbag2_storage/serialized_bag_message.hpp"
Expand Down Expand Up @@ -181,6 +182,10 @@ void SqliteStorage::open(
const rosbag2_storage::StorageOptions & storage_options,
rosbag2_storage::storage_interfaces::IOFlag io_flag)
{
if (page_count_statement_) {
page_count_statement_->reset(); // Reset statement to avoid DB lock
page_count_statement_.reset();
}
storage_mode_ = io_flag;
const auto preset = parse_preset_profile(storage_options.storage_preset_profile);
auto pragmas = parse_pragmas(storage_options.storage_config_uri, io_flag);
Expand Down Expand Up @@ -212,10 +217,16 @@ void SqliteStorage::open(
throw std::runtime_error("Failed to setup storage. Error: " + std::string(e.what()));
}

db_page_size_ = get_page_size();
db_file_size_ = 0;
page_count_statement_ = database_->prepare_statement("PRAGMA page_count;");

// initialize only for READ_WRITE since the DB is already initialized if in APPEND.
if (is_read_write(io_flag)) {
db_schema_version_ = kDBSchemaVersion_;
std::lock_guard<std::mutex> db_lock(db_read_write_mutex_);
initialize();
db_file_size_ = db_page_size_ * read_total_page_count_locked();
} else {
db_schema_version_ = read_db_schema_version();
read_metadata();
Expand All @@ -240,7 +251,9 @@ void SqliteStorage::update_metadata(const rosbag2_storage::BagMetadata & metadat
auto insert_metadata = database_->prepare_statement(
"INSERT INTO metadata (metadata_version, metadata) VALUES (?, ?)");
insert_metadata->bind(metadata.version, serialized_metadata);
std::lock_guard<std::mutex> db_lock(db_read_write_mutex_);
insert_metadata->execute_and_reset();
db_file_size_ = db_page_size_ * read_total_page_count_locked();
}
}

Expand Down Expand Up @@ -270,8 +283,9 @@ void SqliteStorage::commit_transaction()

void SqliteStorage::write(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> message)
{
std::lock_guard<std::mutex> db_lock(database_write_mutex_);
std::lock_guard<std::mutex> db_lock(db_read_write_mutex_);
write_locked(message);
db_file_size_ = db_page_size_ * read_total_page_count_locked();
}

void SqliteStorage::write_locked(
Expand Down Expand Up @@ -314,7 +328,7 @@ void SqliteStorage::write_locked(
void SqliteStorage::write(
const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> & messages)
{
std::lock_guard<std::mutex> db_lock(database_write_mutex_);
std::lock_guard<std::mutex> db_lock(db_read_write_mutex_);
if (!write_statement_) {
prepare_for_writing();
}
Expand All @@ -323,9 +337,14 @@ void SqliteStorage::write(

for (auto & message : messages) {
write_locked(message);
// Update db_file_size_ with the estimated data size written to the DB
// + 3 * sizeof(int64_t) This is for storing timestamp, topic_id and index for messages table
db_file_size_ += message->serialized_data->buffer_length + 3 * sizeof(int64_t);
}

commit_transaction();
// Correct db_file_size_ by reading out the exact numbers from the DB
db_file_size_ = db_page_size_ * read_total_page_count_locked();
}

bool SqliteStorage::set_read_order(const rosbag2_storage::ReadOrder & read_order)
Expand Down Expand Up @@ -401,9 +420,14 @@ void SqliteStorage::get_all_message_definitions(

uint64_t SqliteStorage::get_bagfile_size() const
{
const auto bag_path = rcpputils::fs::path{get_relative_file_path()};

return bag_path.exists() ? bag_path.file_size() : 0u;
if (!database_ || !page_count_statement_) {
// Trying to call get_bagfile_size when SqliteStorage::open was not called or db
// failed to open. Fallback to the filesystem call.
const auto bag_path = rcpputils::fs::path{get_relative_file_path()};
return bag_path.exists() ? bag_path.file_size() : 0u;
} else {
return db_file_size_;
}
}

void SqliteStorage::initialize()
Expand Down Expand Up @@ -458,7 +482,7 @@ void SqliteStorage::create_topic(
const rosbag2_storage::TopicMetadata & topic,
const rosbag2_storage::MessageDefinition & message_definition)
{
std::lock_guard<std::mutex> db_lock(database_write_mutex_);
std::lock_guard<std::mutex> db_lock(db_read_write_mutex_);
if (topics_.find(topic.name) == std::end(topics_)) {
auto insert_topic =
database_->prepare_statement(
Expand Down Expand Up @@ -494,11 +518,12 @@ void SqliteStorage::create_topic(
topic_type_and_hash,
static_cast<int>(database_->get_last_insert_id()));
}
db_file_size_ = db_page_size_ * read_total_page_count_locked();
}

void SqliteStorage::remove_topic(const rosbag2_storage::TopicMetadata & topic)
{
std::lock_guard<std::mutex> db_lock(database_write_mutex_);
std::lock_guard<std::mutex> db_lock(db_read_write_mutex_);
if (topics_.find(topic.name) != std::end(topics_)) {
auto delete_topic =
database_->prepare_statement(
Expand All @@ -507,6 +532,7 @@ void SqliteStorage::remove_topic(const rosbag2_storage::TopicMetadata & topic)
delete_topic->execute_and_reset();
topics_.erase(topic.name);
}
db_file_size_ = db_page_size_ * read_total_page_count_locked();
}

void SqliteStorage::prepare_for_writing()
Expand Down Expand Up @@ -852,6 +878,36 @@ int SqliteStorage::read_db_schema_version()
return schema_version;
}

uint64_t SqliteStorage::read_total_page_count_locked() const
{
auto scope_exit_reset_statement = rcpputils::make_scope_exit(
[this]() {
page_count_statement_->reset();
});
auto page_count_query_result = page_count_statement_->execute_query<int>();
auto page_count_query_result_begin = page_count_query_result.begin();
if (page_count_query_result_begin == page_count_query_result.end()) {
throw SqliteException{"Error. PRAGMA page_count return no result."};
}
return std::get<0>(*page_count_query_result_begin);
}

uint64_t SqliteStorage::get_page_size() const
{
if (!database_) {
return 0; // Trying to call get_page_size when SqliteStorage::open was not called or db
// failed to open
}
SqliteStatement get_page_size_statement = database_->prepare_statement("PRAGMA page_size;");

auto page_size_query_result = get_page_size_statement->execute_query<int>();
auto page_size_query_result_begin = page_size_query_result.begin();
if (page_size_query_result_begin == page_size_query_result.end()) {
throw SqliteException{"Error. PRAGMA page_size return no result."};
}
return std::get<0>(*page_size_query_result_begin);
}

} // namespace rosbag2_storage_plugins

#include "pluginlib/class_list_macros.hpp" // NOLINT
Expand Down
12 changes: 11 additions & 1 deletion rosbag2_tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ if(BUILD_TESTING)
)
endif()


ament_add_gmock(test_rosbag2_cpp_api
test/rosbag2_tests/test_rosbag2_cpp_api.cpp
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
Expand All @@ -133,6 +132,17 @@ if(BUILD_TESTING)
${test_msgs_TARGETS}
)
endif()

ament_add_gmock(test_rosbag2_storage_api
test/rosbag2_tests/test_rosbag2_storage_api.cpp)
if(TARGET test_rosbag2_storage_api)
target_link_libraries(test_rosbag2_storage_api
rosbag2_storage::rosbag2_storage
rosbag2_test_common::rosbag2_test_common
${std_msgs_TARGETS}
)
endif()

endif()

ament_package()
Loading

0 comments on commit f3d62df

Please sign in to comment.