Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor kvrocks2redis via rocksdb secondary instance #1963

Merged
merged 16 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/common/db_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ StatusOr<std::unique_ptr<T>> WrapOutPtrToUnique(Args&&... args) {
return rocksdb::DB::OpenForReadOnly(db_options, dbname, column_families, handles, dbptr);
}

// Thin free-function adapter around rocksdb::DB::OpenAsSecondary.
//
// All arguments are forwarded unchanged:
//   db_options       - options used to open the database
//   dbname           - path handed to RocksDB as the database name
//   secondary_path   - path handed to RocksDB as the secondary path
//   column_families  - descriptors of the column families to open
//   handles          - out: receives the opened column family handles
//   dbptr            - out: receives the opened DB pointer
//
// Returns the rocksdb::Status produced by OpenAsSecondary.
[[nodiscard]] inline rocksdb::Status DBOpenForSecondaryInstance(
    const rocksdb::DBOptions& db_options, const std::string& dbname, const std::string& secondary_path,
    const std::vector<rocksdb::ColumnFamilyDescriptor>& column_families,
    std::vector<rocksdb::ColumnFamilyHandle*>* handles, rocksdb::DB** dbptr) {
  rocksdb::Status open_status =
      rocksdb::DB::OpenAsSecondary(db_options, dbname, secondary_path, column_families, handles, dbptr);
  return open_status;
}

} // namespace details

inline StatusOr<std::unique_ptr<rocksdb::DB>> DBOpen(const rocksdb::Options& options, const std::string& dbname) {
Expand Down Expand Up @@ -95,6 +102,19 @@ inline StatusOr<std::unique_ptr<rocksdb::DB>> DBOpenForReadOnly(
Status::DBOpenErr>(db_options, dbname, column_families, handles);
}

// Opens `dbname` as a RocksDB secondary instance and hands the resulting DB
// back through a StatusOr<std::unique_ptr<rocksdb::DB>>.
//
// The work is delegated to details::WrapOutPtrToUnique, instantiated with
// details::DBOpenForSecondaryInstance as the opener and Status::DBOpenErr as
// the status code template argument.
//
//   db_options       - options used to open the database
//   dbname           - database path
//   secondary_path   - path forwarded as the secondary path
//   column_families  - descriptors of the column families to open
//   handles          - out: receives the opened column family handles
inline StatusOr<std::unique_ptr<rocksdb::DB>> DBOpenAsSecondaryInstance(
    const rocksdb::DBOptions& db_options, const std::string& dbname, const std::string& secondary_path,
    const std::vector<rocksdb::ColumnFamilyDescriptor>& column_families,
    std::vector<rocksdb::ColumnFamilyHandle*>* handles) {
  // Exact signature of the opener; spelled out so the function-pointer template
  // argument has an explicit, unambiguous type.
  using SecondaryOpenFn = rocksdb::Status (*)(const rocksdb::DBOptions&, const std::string&, const std::string&,
                                              const std::vector<rocksdb::ColumnFamilyDescriptor>&,
                                              std::vector<rocksdb::ColumnFamilyHandle*>*, rocksdb::DB**);

  return details::WrapOutPtrToUnique<rocksdb::DB, static_cast<SecondaryOpenFn>(details::DBOpenForSecondaryInstance),
                                     Status::DBOpenErr>(db_options, dbname, secondary_path, column_families,
                                                        handles);
}

inline StatusOr<std::unique_ptr<rocksdb::BackupEngine>> BackupEngineOpen(rocksdb::Env* db_env,
const rocksdb::BackupEngineOptions& options) {
return details::WrapOutPtrToUnique<
Expand Down
31 changes: 23 additions & 8 deletions src/storage/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ Status Storage::CreateColumnFamilies(const rocksdb::Options &options) {
return Status::OK();
}

Status Storage::Open(bool read_only) {
Status Storage::Open(DBOpenMode mode) {
auto guard = WriteLockGuard();
db_closing_ = false;

Expand All @@ -256,7 +256,7 @@ Status Storage::Open(bool read_only) {
}

rocksdb::Options options = InitRocksDBOptions();
if (!read_only) {
if (mode == kDBOpenModeDefault) {
if (auto s = CreateColumnFamilies(options); !s.IsOK()) {
return s.Prefixed("failed to create column families");
}
Expand Down Expand Up @@ -322,17 +322,32 @@ Status Storage::Open(bool read_only) {
if (!s.ok()) return {Status::NotOK, s.ToString()};

auto start = std::chrono::high_resolution_clock::now();
auto dbs = read_only ? util::DBOpenForReadOnly(options, config_->db_dir, column_families, &cf_handles_)
: util::DBOpen(options, config_->db_dir, column_families, &cf_handles_);
switch (mode) {
case DBOpenMode::kDBOpenModeDefault: {
db_ = GET_OR_RET(util::DBOpen(options, config_->db_dir, column_families, &cf_handles_));
break;
}
case DBOpenMode::kDBOpenModeForReadOnly: {
db_ = GET_OR_RET(util::DBOpenForReadOnly(options, config_->db_dir, column_families, &cf_handles_));
break;
}
case DBOpenMode::kDBOpenModeAsSecondaryInstance: {
db_ = GET_OR_RET(
util::DBOpenAsSecondaryInstance(options, config_->db_dir, config_->dir, column_families, &cf_handles_));
break;
}
default:
__builtin_unreachable();
}
auto end = std::chrono::high_resolution_clock::now();
int64_t duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
if (!s.ok()) {

if (!db_) {
LOG(INFO) << "[storage] Failed to load the data from disk: " << duration << " ms";
return {Status::DBOpenErr, s.ToString()};
return {Status::DBOpenErr};
}

db_ = std::move(*dbs);
LOG(INFO) << "[storage] Success to load the data from disk: " << duration << " ms";

return Status::OK();
}

Expand Down
9 changes: 8 additions & 1 deletion src/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include <atomic>
#include <cinttypes>
#include <cstddef>
#include <memory>
#include <shared_mutex>
#include <string>
Expand All @@ -51,6 +52,12 @@ enum ColumnFamilyID {
kColumnFamilyIDStream,
};

// Selects how Storage::Open opens the underlying RocksDB database.
// The numeric values are the ones the compiler would assign implicitly; they
// are spelled out only for readability.
enum DBOpenMode {
  // Regular open via util::DBOpen; the only mode in which Storage::Open
  // creates the column families first.
  kDBOpenModeDefault = 0,
  // Open via util::DBOpenForReadOnly.
  kDBOpenModeForReadOnly = 1,
  // Open via util::DBOpenAsSecondaryInstance.
  kDBOpenModeAsSecondaryInstance = 2,
};

namespace engine {

constexpr const char *kPubSubColumnFamilyName = "pubsub";
Expand Down Expand Up @@ -86,7 +93,7 @@ class Storage {
~Storage();

void SetWriteOptions(const Config::RocksDB::WriteOptions &config);
Status Open(bool read_only = false);
Status Open(DBOpenMode mode = kDBOpenModeDefault);
void CloseDB();
void EmptyDB();
rocksdb::BlockBasedTableOptions InitTableOptions();
Expand Down
8 changes: 0 additions & 8 deletions utils/kvrocks2redis/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,6 @@ Status Config::parseConfigFromString(const std::string &input) {
}
} else if (size == 1 && key == "pidfile") {
pidfile = args[0];
} else if (size >= 2 && key == "kvrocks") {
kvrocks_host = args[0];
// In new versions, we don't use extra port to implement replication
kvrocks_port = GET_OR_RET(ParseInt<std::uint16_t>(args[1]).Prefixed("kvrocks port number"));

if (size == 3) {
kvrocks_auth = args[2];
}
} else if (size == 1 && key == "cluster-enable") {
// Renamed to cluster-enabled, keeping the old one for compatibility.
cluster_enabled = GET_OR_RET(yesnotoi(args[0]).Prefixed("key 'cluster-enable'"));
Expand Down
3 changes: 0 additions & 3 deletions utils/kvrocks2redis/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ struct Config {
std::string next_offset_file_name = "last_next_offset.txt";
std::string next_seq_file_path = output_dir + "/last_next_seq.txt";

std::string kvrocks_auth;
std::string kvrocks_host;
int kvrocks_port = 0;
std::map<std::string, RedisServer> tokens;
bool cluster_enabled = false;

Expand Down
4 changes: 0 additions & 4 deletions utils/kvrocks2redis/kvrocks2redis.conf
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ data-dir ./data
# Default: ./
output-dir ./

# Sync kvrocks node. Use the node's Psync command to get the newest wal raw write_batch.
#
# kvrocks <kvrocks_ip> <kvrocks_port> [<kvrocks_auth>]
kvrocks 127.0.0.1 6666

# Enable cluster mode.
#
Expand Down
2 changes: 1 addition & 1 deletion utils/kvrocks2redis/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ int main(int argc, char *argv[]) {
kvrocks_config.slot_id_encoded = config.cluster_enabled;

engine::Storage storage(&kvrocks_config);
s = storage.Open(true);
s = storage.Open(kDBOpenModeAsSecondaryInstance);
if (!s.IsOK()) {
LOG(ERROR) << "Failed to open Kvrocks storage: " << s.Msg();
exit(1);
Expand Down
8 changes: 3 additions & 5 deletions utils/kvrocks2redis/parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,13 @@

Status Parser::ParseFullDB() {
rocksdb::DB *db = storage_->GetDB();
if (!latest_snapshot_) latest_snapshot_ = std::make_unique<LatestSnapShot>(db);
rocksdb::ColumnFamilyHandle *metadata_cf_handle = storage_->GetCFHandle(engine::kMetadataColumnFamilyName);

// A RocksDB secondary instance (RSI) does not support snapshot-based reads, so we do not set the snapshot
// parameter. However, until we proactively invoke TryCatchUpWithPrimary, this replica is read-only, so it can be
// treated as a snapshot.
rocksdb::ReadOptions read_options;
read_options.snapshot = latest_snapshot_->GetSnapShot();
read_options.fill_cache = false;
std::unique_ptr<rocksdb::Iterator> iter(db->NewIterator(read_options, metadata_cf_handle));

for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
Metadata metadata(kRedisNone);
auto ds = metadata.Decode(iter->value());
Expand Down Expand Up @@ -91,7 +90,6 @@ Status Parser::parseComplexKV(const Slice &ns_key, const Metadata &metadata) {
std::string next_version_prefix_key = InternalKey(ns_key, "", metadata.version + 1, slot_id_encoded_).Encode();

rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
read_options.snapshot = latest_snapshot_->GetSnapShot();
rocksdb::Slice upper_bound(next_version_prefix_key);
read_options.iterate_upper_bound = &upper_bound;

Expand Down
20 changes: 1 addition & 19 deletions utils/kvrocks2redis/parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,27 +33,10 @@
#include "storage/storage.h"
#include "writer.h"

// RAII holder for a RocksDB snapshot: the constructor takes a snapshot from
// `db` via GetSnapshot() and the destructor hands it back via
// ReleaseSnapshot(). The object is non-copyable so the snapshot is released
// exactly once.
//
// The caller must pass a non-null `db`; the pointer is stored and dereferenced
// again in the destructor.
class LatestSnapShot {
 public:
  // Initialize from the parameter rather than from `db_` so correctness does
  // not depend on the declaration order of the data members.
  explicit LatestSnapShot(rocksdb::DB *db) : db_(db), snapshot_(db->GetSnapshot()) {}
  ~LatestSnapShot() { db_->ReleaseSnapshot(snapshot_); }

  LatestSnapShot(const LatestSnapShot &) = delete;
  LatestSnapShot &operator=(const LatestSnapShot &) = delete;

  // Returns the held snapshot without transferring ownership. Declared const
  // because it does not modify the holder, so it is also callable through a
  // const reference or pointer.
  const rocksdb::Snapshot *GetSnapShot() const { return snapshot_; }

 private:
  rocksdb::DB *db_ = nullptr;
  const rocksdb::Snapshot *snapshot_ = nullptr;
};

class Parser {
public:
explicit Parser(engine::Storage *storage, Writer *writer)
: storage_(storage), writer_(writer), slot_id_encoded_(storage_->IsSlotIdEncoded()) {
latest_snapshot_ = std::make_unique<LatestSnapShot>(storage->GetDB());
}
: storage_(storage), writer_(writer), slot_id_encoded_(storage_->IsSlotIdEncoded()) {}
~Parser() = default;

Status ParseFullDB();
Expand All @@ -62,7 +45,6 @@ class Parser {
protected:
engine::Storage *storage_ = nullptr;
Writer *writer_ = nullptr;
std::unique_ptr<LatestSnapShot> latest_snapshot_;
bool slot_id_encoded_ = false;

Status parseSimpleKV(const Slice &ns_key, const Slice &value, uint64_t expire);
Expand Down
Loading