Skip to content

Commit

Permalink
Merge pull request #25 from epsilla-cloud/benchmark
Browse files Browse the repository at this point in the history
Add a flag to enable or disable the WAL (write-ahead log)
  • Loading branch information
ricki-epsilla authored Aug 3, 2023
2 parents bcb8a6c + fc212c7 commit 70817c5
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 24 deletions.
6 changes: 6 additions & 0 deletions engine/db/db_mvp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ class DBMVP {
std::shared_ptr<TableMVP> GetTable(const std::string& table_name);
Status Rebuild();

// Forwards the write-ahead-log on/off switch to every table in tables_.
//
// enabled: value passed unchanged to each table's SetWALEnabled().
//
// Only the tables present in tables_ at the time of the call are touched.
void SetWALEnabled(bool enabled) {
  // Bind by const reference: iterating by value would copy each element
  // (and, for a shared_ptr element, bump its reference count) for no benefit.
  for (const auto& table : tables_) {
    table->SetWALEnabled(enabled);
  }
}

public:
std::string db_catalog_path_; // The path to the db catalog.
// TODO: change to concurrent version.
Expand Down
3 changes: 2 additions & 1 deletion engine/db/db_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ DBServer::~DBServer() {
}
}

Status DBServer::LoadDB(const std::string& db_name, std::string& db_catalog_path, int64_t init_table_scale) {
Status DBServer::LoadDB(const std::string& db_name, std::string& db_catalog_path, int64_t init_table_scale, bool wal_enabled) {
// Load database meta
vectordb::Status status = meta_->LoadDatabase(db_catalog_path, db_name);
if (!status.ok()) {
Expand All @@ -38,6 +38,7 @@ Status DBServer::LoadDB(const std::string& db_name, std::string& db_catalog_path
}

auto db = std::make_shared<DBMVP>(db_schema, init_table_scale);
db->SetWALEnabled(wal_enabled);
dbs_.push_back(db);
db_name_to_id_map_[db_schema.name_] = dbs_.size() - 1;
return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion engine/db/db_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class DBServer {

~DBServer();

Status LoadDB(const std::string& db_name, std::string& db_catalog_path, int64_t init_table_scale);
Status LoadDB(const std::string& db_name, std::string& db_catalog_path, int64_t init_table_scale, bool wal_enabled);
Status UnloadDB(const std::string& db_name);
Status CreateTable(const std::string& db_name, meta::TableSchema& table_schema);
Status DropTable(const std::string& db_name, const std::string& table_name);
Expand Down
4 changes: 4 additions & 0 deletions engine/db/table_mvp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ class TableMVP {
vectordb::Json& result
);

// Turns this table's write-ahead log on or off by forwarding the flag to wal_.
//
// enabled: value handed to the log's SetEnabled().
void SetWALEnabled(bool enabled) {
  (*wal_).SetEnabled(enabled);
}

~TableMVP();

public:
Expand Down
46 changes: 27 additions & 19 deletions engine/db/wal/write_ahead_log.hpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
#include <omp.h>
#include <unistd.h>

#include <boost/filesystem.hpp>
#include <chrono>
#include <cstdio>
#include <cstring>
#include <ctime>
#include <omp.h>

#include "db/catalog/meta_types.hpp"
#include "db/table_segment_mvp.hpp"
Expand All @@ -19,7 +19,6 @@ namespace engine {
// Age threshold (600 s = 10 minutes) for the current log file: WriteEntry calls
// RotateFile() once more than this much time has passed since the last rotation.
const std::chrono::seconds ROTATION_INTERVAL(600);
// Retention window for log files (3600 * 24 * 7 s = 7 days).
// NOTE(review): presumably the source of the retention period used when old
// log files are removed — confirm against the cleanup routine.
const std::chrono::seconds LOG_RETENTION(3600 * 24 * 7);


enum LogEntryType {
INSERT = 1,
DELETE = 2
Expand Down Expand Up @@ -53,6 +52,11 @@ class WriteAheadLog {
}

int64_t WriteEntry(LogEntryType type, const std::string& entry) {
// Skip WAL for realtime scenario
if (!enabled_) {
return global_counter_.Get();
}

auto now = std::chrono::system_clock::now();
if (now - last_rotation_time_ > ROTATION_INTERVAL) {
RotateFile();
Expand Down Expand Up @@ -118,25 +122,28 @@ class WriteAheadLog {
GetSortedLogFiles(files);

for (const auto& file : files) {
// Extract the timestamp from the filename
auto filename = file.filename().string();
auto pos = filename.find_last_of('.');
auto timestamp_str = filename.substr(0, pos);
auto timestamp = std::stoll(timestamp_str);

// Convert now to seconds since epoch
auto now_in_seconds = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();

// If the file is older than LOG_RETENTION, delete it
if (now_in_seconds - timestamp > retention_period_seconds) {
server::CommonUtil::RemoveFile(file.string());
} else {
// Since the files are sorted, we can break as soon as we encounter a file that's not old enough to be deleted
break;
}
// Extract the timestamp from the filename
auto filename = file.filename().string();
auto pos = filename.find_last_of('.');
auto timestamp_str = filename.substr(0, pos);
auto timestamp = std::stoll(timestamp_str);

// Convert now to seconds since epoch
auto now_in_seconds = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();

// If the file is older than LOG_RETENTION, delete it
if (now_in_seconds - timestamp > retention_period_seconds) {
server::CommonUtil::RemoveFile(file.string());
} else {
// Since the files are sorted, we can break as soon as we encounter a file that's not old enough to be deleted
break;
}
}
}
}

// Records whether this write-ahead log should be used.
//
// enabled: new value of the enabled_ flag; WriteEntry checks that flag and
//          returns early, without logging, when it is false.
void SetEnabled(bool enabled) {
  const bool requested = enabled;
  enabled_ = requested;
}

private:
void ApplyEntry(meta::TableSchema& table_schema, std::shared_ptr<TableSegmentMVP> segment, int64_t global_id, LogEntryType& type, std::string& content) {
Expand Down Expand Up @@ -213,6 +220,7 @@ class WriteAheadLog {
std::chrono::time_point<std::chrono::system_clock> last_rotation_time_;
FILE* file_ = nullptr;
AtomicCounter global_counter_;
// Whether WriteEntry actually logs. Defaults to true so that a log on which
// SetEnabled() is never called still has a well-defined value (WAL on).
bool enabled_ = true;
};
} // namespace engine
} // namespace vectordb
8 changes: 5 additions & 3 deletions engine/server/web_server/web_controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,13 @@ class WebController : public oatpp::web::server::api::ApiController {
std::string db_name = parsedBody.GetString("name");
int64_t init_table_scale = InitTableScale;
if (parsedBody.HasMember("vectorScale")) {
std::cout << "DB loaded with customized vector scale limit." << std::endl;
init_table_scale = parsedBody.GetInt("vectorScale");
std::cout << "Vector scale limit: " << init_table_scale << std::endl;
}
vectordb::Status status = db_server->LoadDB(db_name, db_path, init_table_scale);
bool wal_enabled = true;
if (parsedBody.HasMember("walEnabled")) {
wal_enabled = parsedBody.GetBool("walEnabled");
}
vectordb::Status status = db_server->LoadDB(db_name, db_path, init_table_scale, wal_enabled);

auto dto = StatusDto::createShared();
if (!status.ok()) {
Expand Down

0 comments on commit 70817c5

Please sign in to comment.