Skip to content

Commit

Permalink
Store db schema version and ROS_DISTRO name in db3 files (#1156)
Browse files Browse the repository at this point in the history
* Add new tables for `schema` and `metadata`

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Add `SqliteWrapper::table_exists(table_name)`

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Add `get_db_schema_versio()` and `get_recorded_ros_distro()`

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Add unit tests for checking new and old db schema versions

Added following unit tests:
- `get_db_schema_version_returns_correct_value`
- `get_recorded_ros_distro_returns_correct_value`
- `check_backward_compatibility_with_schema_version_2`

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
  • Loading branch information
MichaelOrlov authored Nov 22, 2022
1 parent 1e4efa4 commit 9498426
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage
*/
SqliteWrapper & get_sqlite_database_wrapper();

int get_db_schema_version() const;
std::string get_recorded_ros_distro() const;

enum class PresetProfile
{
Resilient,
Expand All @@ -113,6 +116,7 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage
void write_locked(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> message)
RCPPUTILS_TSA_REQUIRES(database_write_mutex_);
int get_last_rowid();
int read_db_schema_version();

using ReadQueryResult = SqliteStatementWrapper::QueryResult<
std::shared_ptr<rcutils_uint8_array_t>, rcutils_time_point_value_t, std::string, int>;
Expand All @@ -137,6 +141,9 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage
// a) database access (this could also be done with FULLMUTEX), but see b)
// b) topics_ collection - since we could be writing and reading it at the same time
std::mutex database_write_mutex_;

const int kDBSchemaVersion_ = 3;
int db_schema_version_ = -1; // Valid version number starting from 1
};


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteWrapper
SqliteWrapper();
~SqliteWrapper();

bool table_exists(const std::string & table_name);
bool field_exists(const std::string & table_name, const std::string & field_name);
SqliteStatement prepare_statement(const std::string & query);
std::string query_pragma_value(const std::string & key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <utility>
#include <vector>

#include "rcpputils/env.hpp"
#include "rcpputils/filesystem_helper.hpp"

#include "rosbag2_storage/metadata_io.hpp"
Expand Down Expand Up @@ -212,7 +213,10 @@ void SqliteStorage::open(

// initialize only for READ_WRITE since the DB is already initialized if in APPEND.
if (is_read_write(io_flag)) {
db_schema_version_ = kDBSchemaVersion_;
initialize();
} else {
db_schema_version_ = read_db_schema_version();
}

// Reset the read and write statements in case the database changed.
Expand Down Expand Up @@ -368,21 +372,41 @@ uint64_t SqliteStorage::get_bagfile_size() const

void SqliteStorage::initialize()
{
std::string create_stmt = "CREATE TABLE topics(" \
std::string create_stmt = "CREATE TABLE schema(" \
"schema_version INTEGER PRIMARY KEY," \
"ros_distro TEXT NOT NULL);";
database_->prepare_statement(create_stmt)->execute_and_reset();

create_stmt = "CREATE TABLE metadata(" \
"id INTEGER PRIMARY KEY," \
"metadata_version INTEGER NOT NULL," \
"metadata TEXT NOT NULL);";
database_->prepare_statement(create_stmt)->execute_and_reset();

create_stmt = "CREATE TABLE topics(" \
"id INTEGER PRIMARY KEY," \
"name TEXT NOT NULL," \
"type TEXT NOT NULL," \
"serialization_format TEXT NOT NULL," \
"offered_qos_profiles TEXT NOT NULL);";
database_->prepare_statement(create_stmt)->execute_and_reset();

create_stmt = "CREATE TABLE messages(" \
"id INTEGER PRIMARY KEY," \
"topic_id INTEGER NOT NULL," \
"timestamp INTEGER NOT NULL, " \
"data BLOB NOT NULL);";
database_->prepare_statement(create_stmt)->execute_and_reset();

create_stmt = "CREATE INDEX timestamp_idx ON messages (timestamp ASC);";
database_->prepare_statement(create_stmt)->execute_and_reset();

std::string ros_distro = rcpputils::get_env_var("ROS_DISTRO");
auto insert_db_schema =
database_->prepare_statement(
"INSERT INTO schema (schema_version, ros_distro) VALUES (?, ?)");
insert_db_schema->bind(kDBSchemaVersion_, ros_distro);
insert_db_schema->execute_and_reset();
}

void SqliteStorage::create_topic(const rosbag2_storage::TopicMetadata & topic)
Expand Down Expand Up @@ -619,13 +643,49 @@ SqliteWrapper & SqliteStorage::get_sqlite_database_wrapper()
return *database_;
}

int SqliteStorage::get_db_schema_version() const
{
return db_schema_version_;
}

std::string SqliteStorage::get_recorded_ros_distro() const
{
std::string ros_distro;
if (db_schema_version_ >= 3 && database_->table_exists("schema")) {
// Read schema version
auto statement = database_->prepare_statement("SELECT ros_distro from schema;");
auto query_results = statement->execute_query<std::string>();
ros_distro = std::get<0>(*query_results.begin());
}
return ros_distro;
}

int SqliteStorage::get_last_rowid()
{
auto statement = database_->prepare_statement("SELECT max(rowid) from messages;");
auto query_results = statement->execute_query<int>();
return std::get<0>(*query_results.begin());
}

int SqliteStorage::read_db_schema_version()
{
int schema_version = -1;
if (database_->table_exists("schema")) {
// Read schema version
auto statement = database_->prepare_statement("SELECT schema_version from schema;");
auto query_results = statement->execute_query<int>();
schema_version = std::get<0>(*query_results.begin());
} else {
if (database_->field_exists("topics", "offered_qos_profiles")) {
schema_version = 2;
} else {
schema_version = 1;
}
}

return schema_version;
}

} // namespace rosbag2_storage_plugins

#include "pluginlib/class_list_macros.hpp" // NOLINT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,14 @@ std::string SqliteWrapper::query_pragma_value(const std::string & key)
return std::get<0>(pragma_value);
}

bool SqliteWrapper::table_exists(const std::string & table_name)
{
auto query =
"SELECT count(*) FROM sqlite_master WHERE type='table' AND name='" + table_name + "';";
auto query_result = prepare_statement(query)->execute_query<int>();
return std::get<0>(*query_result.begin());
}

bool SqliteWrapper::field_exists(const std::string & table_name, const std::string & field_name)
{
auto query = "SELECT INSTR(sql, '" + field_name + "') FROM sqlite_master WHERE type='table' AND "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,50 @@ class StorageTestFixture : public TemporaryDirectoryFixture
}
}

void create_new_db3_file_with_schema_version_2()
{
auto db_file = (rcpputils::fs::path(temporary_dir_path_) / "rosbag").string();
std::string relative_path = db_file + ".db3";

// READ_WRITE requires the DB to not exist.
if (rcpputils::fs::path(relative_path).exists()) {
throw std::runtime_error(
"Failed to create bag: File '" + relative_path + "' already exists!");
}
using rosbag2_storage_plugins::SqliteWrapper;
using rosbag2_storage_plugins::SqliteException;
std::unordered_map<std::string, std::string> pragmas;

std::shared_ptr<SqliteWrapper> database;
try {
database = std::make_unique<SqliteWrapper>(
relative_path,
rosbag2_storage::storage_interfaces::IOFlag::READ_WRITE,
std::move(pragmas));
} catch (const SqliteException & e) {
throw std::runtime_error("Failed to setup storage. Error: " + std::string(e.what()));
}

// Init database
std::string create_stmt = "CREATE TABLE topics(" \
"id INTEGER PRIMARY KEY," \
"name TEXT NOT NULL," \
"type TEXT NOT NULL," \
"serialization_format TEXT NOT NULL," \
"offered_qos_profiles TEXT NOT NULL);";
database->prepare_statement(create_stmt)->execute_and_reset();

create_stmt = "CREATE TABLE messages(" \
"id INTEGER PRIMARY KEY," \
"topic_id INTEGER NOT NULL," \
"timestamp INTEGER NOT NULL, " \
"data BLOB NOT NULL);";
database->prepare_statement(create_stmt)->execute_and_reset();

create_stmt = "CREATE INDEX timestamp_idx ON messages (timestamp ASC);";
database->prepare_statement(create_stmt)->execute_and_reset();
}

std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>>
read_all_messages_from_sqlite()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include <utility>
#include <vector>

#include "rcpputils/env.hpp"

#include "rcpputils/filesystem_helper.hpp"

#include "rcutils/snprintf.h"
Expand Down Expand Up @@ -293,6 +295,46 @@ TEST_F(StorageTestFixture, messages_readable_for_prefoxy_db_schema) {
}
}

TEST_F(StorageTestFixture, get_db_schema_version_returns_correct_value) {
auto writable_storage = std::make_shared<rosbag2_storage_plugins::SqliteStorage>();
EXPECT_EQ(writable_storage->get_db_schema_version(), -1);

auto db_file = (rcpputils::fs::path(temporary_dir_path_) / "rosbag").string();
writable_storage->open({db_file, plugin_id_});

EXPECT_GE(writable_storage->get_db_schema_version(), 3);
}

TEST_F(StorageTestFixture, get_recorded_ros_distro_returns_correct_value) {
auto writable_storage = std::make_shared<rosbag2_storage_plugins::SqliteStorage>();
EXPECT_TRUE(writable_storage->get_recorded_ros_distro().empty());

auto db_file = (rcpputils::fs::path(temporary_dir_path_) / "rosbag").string();
writable_storage->open({db_file, plugin_id_});

std::string current_ros_distro = rcpputils::get_env_var("ROS_DISTRO");
EXPECT_EQ(writable_storage->get_recorded_ros_distro(), current_ros_distro);
}

TEST_F(StorageTestFixture, check_backward_compatibility_with_schema_version_2) {
create_new_db3_file_with_schema_version_2();
std::unique_ptr<rosbag2_storage_plugins::SqliteStorage> readable_storage =
std::make_unique<rosbag2_storage_plugins::SqliteStorage>();

EXPECT_EQ(readable_storage->get_db_schema_version(), -1);
EXPECT_TRUE(readable_storage->get_recorded_ros_distro().empty());

auto db_file = (rcpputils::fs::path(temporary_dir_path_) / "rosbag.db3").string();

readable_storage->open(
{db_file, plugin_id_},
rosbag2_storage::storage_interfaces::IOFlag::READ_ONLY);
std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> read_messages;

EXPECT_EQ(readable_storage->get_db_schema_version(), 2);
EXPECT_TRUE(readable_storage->get_recorded_ros_distro().empty());
}

TEST_F(StorageTestFixture, get_metadata_returns_correct_struct_if_no_messages) {
write_messages_to_sqlite({});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,10 @@ TEST_F(SqliteWrapperTestFixture, field_exists) {
EXPECT_THROW(
db_.field_exists("non_existent_table", "data"), rosbag2_storage_plugins::SqliteException);
}

TEST_F(SqliteWrapperTestFixture, table_exists) {
db_.prepare_statement("CREATE TABLE test_table (timestamp INTEGER, data BLOB);")
->execute_and_reset();
EXPECT_TRUE(db_.table_exists("test_table"));
EXPECT_FALSE(db_.table_exists("non_existent_table"));
}

0 comments on commit 9498426

Please sign in to comment.