diff --git a/src/common/db_util.h b/src/common/db_util.h
index a7379c614b5..8df34daaa50 100644
--- a/src/common/db_util.h
+++ b/src/common/db_util.h
@@ -62,6 +62,13 @@ StatusOr<std::unique_ptr<T>> WrapOutPtrToUnique(Args&&... args) {
   return rocksdb::DB::OpenForReadOnly(db_options, dbname, column_families, handles, dbptr);
 }
 
+[[nodiscard]] inline rocksdb::Status DBOpenForSecondaryInstance(
+    const rocksdb::DBOptions& db_options, const std::string& dbname, const std::string& secondary_path,
+    const std::vector<rocksdb::ColumnFamilyDescriptor>& column_families,
+    std::vector<rocksdb::ColumnFamilyHandle*>* handles, rocksdb::DB** dbptr) {
+  return rocksdb::DB::OpenAsSecondary(db_options, dbname, secondary_path, column_families, handles, dbptr);
+}
+
 }  // namespace details
 
 inline StatusOr<std::unique_ptr<rocksdb::DB>> DBOpen(const rocksdb::Options& options, const std::string& dbname) {
@@ -95,6 +102,19 @@ inline StatusOr<std::unique_ptr<rocksdb::DB>> DBOpenForReadOnly(
       Status::DBOpenErr>(db_options, dbname, column_families, handles);
 }
 
+inline StatusOr<std::unique_ptr<rocksdb::DB>> DBOpenAsSecondaryInstance(
+    const rocksdb::DBOptions& db_options, const std::string& dbname, const std::string& secondary_path,
+    const std::vector<rocksdb::ColumnFamilyDescriptor>& column_families,
+    std::vector<rocksdb::ColumnFamilyHandle*>* handles) {
+  return details::WrapOutPtrToUnique<
+      rocksdb::DB,
+      static_cast<rocksdb::Status (*)(const rocksdb::DBOptions&, const std::string&, const std::string&,
+                                      const std::vector<rocksdb::ColumnFamilyDescriptor>&,
+                                      std::vector<rocksdb::ColumnFamilyHandle*>*, rocksdb::DB**)>(
+          details::DBOpenForSecondaryInstance),
+      Status::DBOpenErr>(db_options, dbname, secondary_path, column_families, handles);
+}
+
 inline StatusOr<std::unique_ptr<rocksdb::BackupEngine>> BackupEngineOpen(rocksdb::Env* db_env,
                                                                          const rocksdb::BackupEngineOptions& options) {
   return details::WrapOutPtrToUnique<
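For reviewers, here is a minimal sketch of how the new wrapper is meant to be called. The directory paths are hypothetical, and the `max_open_files = -1` line reflects the RocksDB requirement that secondary instances keep all SST files open:

```cpp
#include "common/db_util.h"

// Sketch only: open an existing kvrocks data dir as a secondary instance.
// "/data/kvrocks" (the primary's db_dir) and "/data/kvrocks2redis" (a private
// directory for the secondary's info log and OPTIONS files) are made-up paths.
rocksdb::Options options;
options.max_open_files = -1;  // required by rocksdb::DB::OpenAsSecondary
std::vector<rocksdb::ColumnFamilyDescriptor> column_families{
    {rocksdb::kDefaultColumnFamilyName, rocksdb::ColumnFamilyOptions(options)}};
std::vector<rocksdb::ColumnFamilyHandle *> handles;
auto db = util::DBOpenAsSecondaryInstance(rocksdb::DBOptions(options), "/data/kvrocks",
                                          "/data/kvrocks2redis", column_families, &handles);
if (!db) {
  // failure is reported as Status::DBOpenErr by WrapOutPtrToUnique
}
```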
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index c176db1b137..c59be7077a6 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -237,7 +237,7 @@ Status Storage::CreateColumnFamilies(const rocksdb::Options &options) {
   return Status::OK();
 }
 
-Status Storage::Open(bool read_only) {
+Status Storage::Open(DBOpenMode mode) {
   auto guard = WriteLockGuard();
   db_closing_ = false;
 
@@ -250,7 +250,7 @@ Status Storage::Open(bool read_only) {
   }
 
   rocksdb::Options options = InitRocksDBOptions();
-  if (!read_only) {
+  if (mode == kDBOpenModeDefault) {
     if (auto s = CreateColumnFamilies(options); !s.IsOK()) {
       return s.Prefixed("failed to create column families");
     }
@@ -316,17 +316,32 @@ Status Storage::Open(bool read_only) {
   if (!s.ok()) return {Status::NotOK, s.ToString()};
 
   auto start = std::chrono::high_resolution_clock::now();
-  auto dbs = read_only ? util::DBOpenForReadOnly(options, config_->db_dir, column_families, &cf_handles_)
-                       : util::DBOpen(options, config_->db_dir, column_families, &cf_handles_);
+  switch (mode) {
+    case DBOpenMode::kDBOpenModeDefault: {
+      db_ = GET_OR_RET(util::DBOpen(options, config_->db_dir, column_families, &cf_handles_));
+      break;
+    }
+    case DBOpenMode::kDBOpenModeForReadOnly: {
+      db_ = GET_OR_RET(util::DBOpenForReadOnly(options, config_->db_dir, column_families, &cf_handles_));
+      break;
+    }
+    case DBOpenMode::kDBOpenModeAsSecondaryInstance: {
+      db_ = GET_OR_RET(
+          util::DBOpenAsSecondaryInstance(options, config_->db_dir, config_->dir, column_families, &cf_handles_));
+      break;
+    }
+    default:
+      __builtin_unreachable();
+  }
   auto end = std::chrono::high_resolution_clock::now();
   int64_t duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
-  if (!s.ok()) {
+
+  if (!db_) {
     LOG(INFO) << "[storage] Failed to load the data from disk: " << duration << " ms";
-    return {Status::DBOpenErr, s.ToString()};
+    return {Status::DBOpenErr};
   }
-
-  db_ = std::move(*dbs);
   LOG(INFO) << "[storage] Success to load the data from disk: " << duration << " ms";
+
   return Status::OK();
 }
diff --git a/src/storage/storage.h b/src/storage/storage.h
index b0de2dd0015..570ecf6cc87 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -29,6 +29,7 @@
 #include
 #include
+#include
 #include
 #include
 #include
@@ -51,6 +52,12 @@ enum ColumnFamilyID {
   kColumnFamilyIDStream,
 };
 
+enum DBOpenMode {
+  kDBOpenModeDefault,
+  kDBOpenModeForReadOnly,
+  kDBOpenModeAsSecondaryInstance,
+};
+
 namespace engine {
 
 constexpr const char *kPubSubColumnFamilyName = "pubsub";
@@ -100,7 +107,7 @@ class Storage {
   ~Storage();
 
   void SetWriteOptions(const Config::RocksDB::WriteOptions &config);
-  Status Open(bool read_only = false);
+  Status Open(DBOpenMode mode = kDBOpenModeDefault);
   void CloseDB();
   void EmptyDB();
   rocksdb::BlockBasedTableOptions InitTableOptions();
diff --git a/utils/kvrocks2redis/config.cc b/utils/kvrocks2redis/config.cc
index 8e48f8fff70..ac6f24eddae 100644
--- a/utils/kvrocks2redis/config.cc
+++ b/utils/kvrocks2redis/config.cc
@@ -87,14 +87,6 @@ Status Config::parseConfigFromString(const std::string &input) {
     }
   } else if (size == 1 && key == "pidfile") {
     pidfile = args[0];
-  } else if (size >= 2 && key == "kvrocks") {
-    kvrocks_host = args[0];
-    // In new versions, we don't use extra port to implement replication
-    kvrocks_port = GET_OR_RET(ParseInt(args[1]).Prefixed("kvrocks port number"));
-
-    if (size == 3) {
-      kvrocks_auth = args[2];
-    }
   } else if (size == 1 && key == "cluster-enable") {
     // Renamed to cluster-enabled, keeping the old one for compatibility.
     cluster_enabled = GET_OR_RET(yesnotoi(args[0]).Prefixed("key 'cluster-enable'"));
diff --git a/utils/kvrocks2redis/config.h b/utils/kvrocks2redis/config.h
index e67cae194f4..a9a4bf3bc6a 100644
--- a/utils/kvrocks2redis/config.h
+++ b/utils/kvrocks2redis/config.h
@@ -48,9 +48,6 @@ struct Config {
   std::string next_offset_file_name = "last_next_offset.txt";
   std::string next_seq_file_path = output_dir + "/last_next_seq.txt";
 
-  std::string kvrocks_auth;
-  std::string kvrocks_host;
-  int kvrocks_port = 0;
   std::map<std::string, std::string> tokens;
   bool cluster_enabled = false;
diff --git a/utils/kvrocks2redis/kvrocks2redis.conf b/utils/kvrocks2redis/kvrocks2redis.conf
index 33a19555584..c3d1cbca2aa 100644
--- a/utils/kvrocks2redis/kvrocks2redis.conf
+++ b/utils/kvrocks2redis/kvrocks2redis.conf
@@ -22,10 +22,6 @@ data-dir ./data
 # Default: ./
 output-dir ./
 
-# Sync kvrocks node. Use the node's Psync command to get the newest wal raw write_batch.
-#
-# kvrocks <kvrocks_host> <kvrocks_port> [<kvrocks_auth>]
-kvrocks 127.0.0.1 6666
 
 # Enable cluster mode.
 #
diff --git a/utils/kvrocks2redis/main.cc b/utils/kvrocks2redis/main.cc
index 767c2f19bf2..e6ca93bf422 100644
--- a/utils/kvrocks2redis/main.cc
+++ b/utils/kvrocks2redis/main.cc
@@ -122,7 +122,7 @@ int main(int argc, char *argv[]) {
   kvrocks_config.slot_id_encoded = config.cluster_enabled;
 
   engine::Storage storage(&kvrocks_config);
-  s = storage.Open(true);
+  s = storage.Open(kDBOpenModeAsSecondaryInstance);
   if (!s.IsOK()) {
     LOG(ERROR) << "Failed to open Kvrocks storage: " << s.Msg();
     exit(1);
diff --git a/utils/kvrocks2redis/parser.cc b/utils/kvrocks2redis/parser.cc
index 0c463d9aca4..86b3e1a5540 100644
--- a/utils/kvrocks2redis/parser.cc
+++ b/utils/kvrocks2redis/parser.cc
@@ -33,14 +33,13 @@
 Status Parser::ParseFullDB() {
   rocksdb::DB *db = storage_->GetDB();
-  if (!latest_snapshot_) latest_snapshot_ = std::make_unique<LatestSnapShot>(db);
   rocksdb::ColumnFamilyHandle *metadata_cf_handle = storage_->GetCFHandle(engine::kMetadataColumnFamilyName);
-
+  // The RocksDB secondary instance (RSI) doesn't support snapshot-based reads, so we don't set the snapshot
+  // parameter here. Until we proactively invoke TryCatchUpWithPrimary, this replica is read-only at a fixed
+  // sequence number, which effectively behaves like a snapshot.
   rocksdb::ReadOptions read_options;
-  read_options.snapshot = latest_snapshot_->GetSnapShot();
   read_options.fill_cache = false;
   std::unique_ptr<rocksdb::Iterator> iter(db->NewIterator(read_options, metadata_cf_handle));
-
   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
     Metadata metadata(kRedisNone);
     auto ds = metadata.Decode(iter->value());
@@ -91,7 +90,6 @@ Status Parser::parseComplexKV(const Slice &ns_key, const Metadata &metadata) {
   std::string next_version_prefix_key = InternalKey(ns_key, "", metadata.version + 1, slot_id_encoded_).Encode();
 
   rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
-  read_options.snapshot = latest_snapshot_->GetSnapShot();
   rocksdb::Slice upper_bound(next_version_prefix_key);
   read_options.iterate_upper_bound = &upper_bound;
diff --git a/utils/kvrocks2redis/parser.h b/utils/kvrocks2redis/parser.h
index 69fe0f8d9c5..09ba2f17c4c 100644
--- a/utils/kvrocks2redis/parser.h
+++ b/utils/kvrocks2redis/parser.h
@@ -33,27 +33,10 @@
 #include "storage/storage.h"
 #include "writer.h"
 
-class LatestSnapShot {
- public:
-  explicit LatestSnapShot(rocksdb::DB *db) : db_(db), snapshot_(db_->GetSnapshot()) {}
-  ~LatestSnapShot() { db_->ReleaseSnapshot(snapshot_); }
-
-  LatestSnapShot(const LatestSnapShot &) = delete;
-  LatestSnapShot &operator=(const LatestSnapShot &) = delete;
-
-  const rocksdb::Snapshot *GetSnapShot() { return snapshot_; }
-
- private:
-  rocksdb::DB *db_ = nullptr;
-  const rocksdb::Snapshot *snapshot_ = nullptr;
-};
-
 class Parser {
  public:
   explicit Parser(engine::Storage *storage, Writer *writer)
-      : storage_(storage), writer_(writer), slot_id_encoded_(storage_->IsSlotIdEncoded()) {
-    latest_snapshot_ = std::make_unique<LatestSnapShot>(storage->GetDB());
-  }
+      : storage_(storage), writer_(writer), slot_id_encoded_(storage_->IsSlotIdEncoded()) {}
   ~Parser() = default;
 
   Status ParseFullDB();
@@ -62,7 +45,6 @@ class Parser {
  protected:
   engine::Storage *storage_ = nullptr;
   Writer *writer_ = nullptr;
-  std::unique_ptr<LatestSnapShot> latest_snapshot_;
   bool slot_id_encoded_ = false;
 
   Status parseSimpleKV(const Slice &ns_key, const Slice &value, uint64_t expire);
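For context, the snapshot machinery removed above is replaced by the RocksDB secondary-instance pattern. A minimal sketch of that pattern with the plain RocksDB API (paths are hypothetical):

```cpp
#include <rocksdb/db.h>

// A secondary instance sees a frozen view of the primary's data until
// TryCatchUpWithPrimary() is called, which is why ParseFullDB no longer
// needs an explicit snapshot.
rocksdb::DB *db = nullptr;
rocksdb::Options options;
options.max_open_files = -1;  // required for secondary instances
rocksdb::Status s = rocksdb::DB::OpenAsSecondary(options, "/path/to/primary-db",
                                                 "/path/to/secondary-dir", &db);
// ... read from the frozen view (e.g. the full-DB parse) ...
s = db->TryCatchUpWithPrimary();  // advance the view to the primary's latest state
delete db;
```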
a/utils/kvrocks2redis/sync.cc
+++ b/utils/kvrocks2redis/sync.cc
@@ -43,17 +43,14 @@ Sync::Sync(engine::Storage *storage, Writer *writer, Parser *parser, kvrocks2redis::Config *config)
     : storage_(storage), writer_(writer), parser_(parser), config_(config) {}
 
 Sync::~Sync() {
-  if (sock_fd_) close(sock_fd_);
   if (next_seq_fd_) close(next_seq_fd_);
   writer_->Stop();
 }
 
 /*
- * Run connect to kvrocks, and start the following steps
- * asynchronously
- * - TryPsync
- * - - if ok, IncrementBatchLoop
- * - - not, parseAllLocalStorage and restart TryPsync when done
+ * 1. Try to parse the WAL directly, starting from the expected next sequence number.
+ * 2. If that fails, parse a full snapshot of the local storage first,
+ *    then go back to step 1.
  */
 void Sync::Start() {
   auto s = readNextSeqFromFile(&next_seq_);
@@ -61,39 +58,16 @@ void Sync::Start() {
     LOG(ERROR) << s.Msg();
     return;
   }
-
   LOG(INFO) << "[kvrocks2redis] Start sync the data from kvrocks to redis";
 
   while (!IsStopped()) {
-    auto sock_fd = util::SockConnect(config_->kvrocks_host, config_->kvrocks_port);
-    if (!sock_fd) {
-      LOG(ERROR) << fmt::format("Failed to connect to Kvrocks on {}:{}. Error: {}", config_->kvrocks_host,
-                                config_->kvrocks_port, sock_fd.Msg());
-      usleep(10000);
-      continue;
+    s = checkWalBoundary();
+    if (!s.IsOK()) {
+      parseKVFromLocalStorage();
     }
-
-    sock_fd_ = *sock_fd;
-    s = auth();
+    s = incrementBatchLoop();
     if (!s.IsOK()) {
       LOG(ERROR) << s.Msg();
-      usleep(10000);
-      continue;
-    }
-
-    while (!IsStopped()) {
-      s = tryPSync();
-      if (!s.IsOK()) {
-        LOG(ERROR) << s.Msg();
-        break;
-      }
-      LOG(INFO) << "[kvrocks2redis] PSync is ok, start increment batch loop";
-      s = incrementBatchLoop();
-      if (!s.IsOK()) {
-        LOG(ERROR) << s.Msg();
-        continue;
-      }
     }
-    close(sock_fd_);
   }
 }
 
@@ -104,98 +78,68 @@ void Sync::Stop() {
   LOG(INFO) << "[kvrocks2redis] Stopped";
 }
 
-Status Sync::auth() {
-  // Send auth when needed
-  if (!config_->kvrocks_auth.empty()) {
-    const auto auth_command = redis::MultiBulkString({"AUTH", config_->kvrocks_auth});
-    auto s = util::SockSend(sock_fd_, auth_command);
-    if (!s) return s.Prefixed("send auth command err");
-    std::string line = GET_OR_RET(util::SockReadLine(sock_fd_).Prefixed("read auth response err"));
-    if (line.compare(0, 3, "+OK") != 0) {
-      return {Status::NotOK, "auth got invalid response"};
-    }
-  }
-  LOG(INFO) << "[kvrocks2redis] Auth succ, continue...";
-  return Status::OK();
+Status Sync::tryCatchUpWithPrimary() {
+  auto s = storage_->GetDB()->TryCatchUpWithPrimary();
+  return s.ok() ? Status() : Status::NotOK;
 }
-Status Sync::tryPSync() {
-  const auto seq_str = std::to_string(next_seq_);
-  const auto seq_len_str = std::to_string(seq_str.length());
-  const auto cmd_str = "*2" CRLF "$5" CRLF "PSYNC" CRLF "$" + seq_len_str + CRLF + seq_str + CRLF;
-  auto s = util::SockSend(sock_fd_, cmd_str);
-  LOG(INFO) << "[kvrocks2redis] Try to use psync, next seq: " << next_seq_;
-  if (!s) return s.Prefixed("send psync command err");
-  std::string line = GET_OR_RET(util::SockReadLine(sock_fd_).Prefixed("read psync response err"));
+Status Sync::checkWalBoundary() {
+  if (next_seq_ == storage_->LatestSeqNumber() + 1) {
+    return Status::OK();
+  }
 
-  if (line.compare(0, 3, "+OK") != 0) {
-    if (next_seq_ > 0) {
-      // Ooops, Failed to psync , sync process has been terminated, administrator should be notified
-      // when full sync is needed, please remove last_next_seq config file, and restart kvrocks2redis
-      auto error_msg =
-          "[kvrocks2redis] CRITICAL - Failed to psync , please remove"
-          " last_next_seq config file, and restart kvrocks2redis, redis reply: " +
-          std::string(line);
-      stop_flag_ = true;
-      return {Status::NotOK, error_msg};
+  // Upper bound
+  if (next_seq_ > storage_->LatestSeqNumber() + 1) {
+    return {Status::NotOK};
+  }
+
+  // Lower bound
+  std::unique_ptr<rocksdb::TransactionLogIterator> iter;
+  auto s = storage_->GetWALIter(next_seq_, &iter);
+  if (s.IsOK() && iter->Valid()) {
+    auto batch = iter->GetBatch();
+    if (next_seq_ != batch.sequence) {
+      if (next_seq_ > batch.sequence) {
+        LOG(ERROR) << "checkWalBoundary expects sequence: " << next_seq_
+                   << ", but GetWALIter returned an older sequence: " << batch.sequence;
+      }
+      return {Status::NotOK};
     }
-    // PSYNC isn't OK, we should use parseAllLocalStorage
-    // Switch to parseAllLocalStorage
-    LOG(INFO) << "[kvrocks2redis] Failed to psync, redis reply: " << std::string(line);
-    parseKVFromLocalStorage();
-    // Restart tryPSync
-    return tryPSync();
+    return Status::OK();
   }
-  return Status::OK();
+  return {Status::NotOK};
 }
 
 Status Sync::incrementBatchLoop() {
-  std::cout << "Start parse increment batch ..." << std::endl;
-  evbuffer *evbuf = evbuffer_new();
+  LOG(INFO) << "[kvrocks2redis] Start parsing increment data";
+  std::unique_ptr<rocksdb::TransactionLogIterator> iter;
   while (!IsStopped()) {
-    if (evbuffer_read(evbuf, sock_fd_, -1) <= 0) {
-      evbuffer_free(evbuf);
-      return {Status::NotOK, std::string("[kvrocks2redis] read increment batch err: ") + strerror(errno)};
+    if (!tryCatchUpWithPrimary().IsOK()) {
+      return {Status::NotOK};
     }
-    if (incr_state_ == IncrementBatchLoopState::Incr_batch_size) {
-      // Read bulk length
-      UniqueEvbufReadln line(evbuf, EVBUFFER_EOL_CRLF_STRICT);
-      if (!line) {
-        usleep(10000);
-        continue;
-      }
-      incr_bulk_len_ = line.length > 0 ? std::strtoull(line.get() + 1, nullptr, 10) : 0;
-      if (incr_bulk_len_ == 0) {
-        return {Status::NotOK, "[kvrocks2redis] Invalid increment data size"};
-      }
-      incr_state_ = Incr_batch_data;
-    }
-
-    if (incr_state_ == IncrementBatchLoopState::Incr_batch_data) {
-      // Read bulk data (batch data)
-      if (incr_bulk_len_ + 2 <= evbuffer_get_length(evbuf)) {  // We got enough data
-        char *bulk_data = reinterpret_cast<char *>(evbuffer_pullup(evbuf, static_cast<ssize_t>(incr_bulk_len_) + 2));
-        std::string bulk_data_str = std::string(bulk_data, incr_bulk_len_);
-        // Skip the ping packet
-        if (bulk_data_str != "ping") {
-          auto bat = rocksdb::WriteBatch(bulk_data_str);
-          int count = static_cast<int>(bat.Count());
-          auto s = parser_->ParseWriteBatch(bulk_data_str);
-          if (!s.IsOK()) {
-            return s.Prefixed(fmt::format("failed to parse write batch '{}'", util::StringToHex(bulk_data_str)));
-          }
-
-          s = updateNextSeq(next_seq_ + count);
-          if (!s.IsOK()) {
-            return s.Prefixed("failed to update next sequence");
-          }
-        }
-        evbuffer_drain(evbuf, incr_bulk_len_ + 2);
-        incr_state_ = Incr_batch_size;
-      } else {
-        usleep(10000);
-        continue;
-      }
+    if (next_seq_ <= storage_->LatestSeqNumber()) {
+      storage_->GetDB()->GetUpdatesSince(next_seq_, &iter);
+      for (; iter->Valid(); iter->Next()) {
+        auto batch = iter->GetBatch();
+        if (batch.sequence != next_seq_) {
+          if (next_seq_ > batch.sequence) {
+            LOG(ERROR) << "incrementBatchLoop expects sequence: " << next_seq_
+                       << ", but GetUpdatesSince returned an older sequence: " << batch.sequence;
+          }
+          return {Status::NotOK};
+        }
+        auto s = parser_->ParseWriteBatch(batch.writeBatchPtr->Data());
+        if (!s.IsOK()) {
+          return s.Prefixed(
+              fmt::format("failed to parse write batch '{}'", util::StringToHex(batch.writeBatchPtr->Data())));
+        }
+        s = updateNextSeq(next_seq_ + batch.writeBatchPtr->Count());
+        if (!s.IsOK()) {
+          return s.Prefixed("failed to update next sequence");
+        }
+      }
+    } else {
+      usleep(10000);
     }
   }
   return Status::OK();
 }
@@ -217,8 +161,8 @@ void Sync::parseKVFromLocalStorage() {
     LOG(ERROR) << "[kvrocks2redis] Failed to parse full db, encounter error: " << s.Msg();
     return;
   }
-
-  s = updateNextSeq(storage_->LatestSeqNumber() + 1);
+  auto last_seq = storage_->GetDB()->GetLatestSequenceNumber();
+  s = updateNextSeq(last_seq + 1);
   if (!s.IsOK()) {
     LOG(ERROR) << "[kvrocks2redis] Failed to update next sequence: " << s.Msg();
   }
diff --git a/utils/kvrocks2redis/sync.h b/utils/kvrocks2redis/sync.h
index 66b9ffae505..5c746b845ba 100644
--- a/utils/kvrocks2redis/sync.h
+++ b/utils/kvrocks2redis/sync.h
@@ -46,7 +46,6 @@ class Sync {
   bool IsStopped() const { return stop_flag_; }
 
  private:
-  int sock_fd_;
   bool stop_flag_ = false;
   engine::Storage *storage_ = nullptr;
   Writer *writer_ = nullptr;
@@ -61,12 +60,11 @@ class Sync {
     Incr_batch_data,
   } incr_state_ = Incr_batch_size;
 
-  size_t incr_bulk_len_ = 0;
-
-  Status auth();
-  Status tryPSync();
   Status incrementBatchLoop();
+  Status tryCatchUpWithPrimary();
+  Status checkWalBoundary();
+
   void parseKVFromLocalStorage();
 
   Status updateNextSeq(rocksdb::SequenceNumber seq);
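The new loop above tails the primary's WAL instead of a PSYNC socket. A minimal sketch of that WAL-tailing pattern with the plain RocksDB API (`db` and the starting sequence are assumptions carried over from the earlier sketch):

```cpp
#include <rocksdb/db.h>
#include <rocksdb/transaction_log.h>

// Sketch: read write batches from the WAL starting at next_seq and advance
// by each batch's record count, mirroring Sync::incrementBatchLoop().
rocksdb::SequenceNumber next_seq = 1;  // normally restored from last_next_seq.txt
std::unique_ptr<rocksdb::TransactionLogIterator> iter;
rocksdb::Status s = db->GetUpdatesSince(next_seq, &iter);
for (; s.ok() && iter->Valid(); iter->Next()) {
  rocksdb::BatchResult batch = iter->GetBatch();
  if (batch.sequence != next_seq) break;  // gap in the WAL: fall back to a full parse
  // ... translate batch.writeBatchPtr->Data() into Redis commands ...
  next_seq = batch.sequence + batch.writeBatchPtr->Count();
}
```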
diff --git a/utils/kvrocks2redis/tests/README.md b/utils/kvrocks2redis/tests/README.md
index 17c6d687ae0..6bc6048d9b9 100644
--- a/utils/kvrocks2redis/tests/README.md
+++ b/utils/kvrocks2redis/tests/README.md
@@ -12,9 +12,11 @@ For testing the `kvrocks2redis` utility, manually check the generated AOF.
 
 ```bash
 # populate data
-python populate-kvrocks.py
-# check generated aof file
-# append new data
-python append-data-to-kvrocks.py
-# check appended new aof data
+python3 populate-kvrocks.py [--host HOST] [--port PORT] [--password PASSWORD] [--flushdb]
+# check generated aof file & user_key.log file
+
+# check consistency
+python3 check_consistency.py [--src_host SRC_HOST] [--src_port SRC_PORT] [--src_password SRC_PASSWORD]
+                             [--dst_host DST_HOST] [--dst_port DST_PORT] [--dst_password DST_PASSWORD]
+                             [--key_file KEY_FILE]
 ```
diff --git a/utils/kvrocks2redis/tests/append-data-to-kvrocks.py b/utils/kvrocks2redis/tests/append-data-to-kvrocks.py
deleted file mode 100644
index 6cfb32118f7..00000000000
--- a/utils/kvrocks2redis/tests/append-data-to-kvrocks.py
+++ /dev/null
@@ -1,84 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import redis
-
-range=100
-factor=32
-port=6666
-
-r = redis.StrictRedis(host='localhost', port=port, db=0, password='foobared')
-
-# string
-rst = r.set('foo', 2) # update old
-assert rst
-rst = r.set('foo2', 2) # add new
-assert rst
-
-rst = r.setex('foo_ex', 7200, 2)
-assert rst
-
-# zset
-rst = r.zadd('zfoo', 4, 'd')
-assert(rst == 1)
-rst = r.zrem('zfoo', 'd')
-assert(rst == 1)
-
-# list
-rst = r.lset('lfoo', 0, 'a')
-assert(rst == 1)
-rst = r.rpush('lfoo', 'a')
-assert(rst == 5)
-rst = r.lpush('lfoo', 'b')
-assert(rst == 6)
-rst = r.lpop('lfoo')
-assert(rst == 'b')
-rst = r.rpop('lfoo')
-assert(rst == 'a')
-rst = r.ltrim('lfoo', 0, 2)
-assert rst
-
-# set
-rst = r.sadd('sfoo', 'f')
-assert(rst == 1)
-rst = r.srem('sfoo', 'f')
-assert(rst == 1)
-
-# hash
-rst = r.hset('hfoo', 'b', 2)
-assert(rst == 1)
-rst = r.hdel('hfoo', 'b')
-assert(rst == 1)
-
-# bitmap
-rst = r.setbit('bfoo', 0, 0) # update old
-assert(rst == 1)
-rst = r.setbit('bfoo', 900000, 1) # add new
-assert(rst == 0)
-
-# expire cmd
-rst = r.expire('foo', 7200)
-assert rst
-rst = r.expire('zfoo', 7200)
-assert rst
-
-# del cmd
-rst = r.delete('foo')
-assert rst
-rst = r.delete('zfoo')
-assert rst
-
diff --git a/utils/kvrocks2redis/tests/check_consistency.py b/utils/kvrocks2redis/tests/check_consistency.py
new file mode 100644
index 00000000000..3a176cc87ec
--- /dev/null
+++ b/utils/kvrocks2redis/tests/check_consistency.py
@@ -0,0 +1,132 @@
+#!/usr/bin/env python3
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import redis
+import codecs
+import argparse
+import time
+
+class RedisComparator:
+    def __init__(self, src_host, src_port, src_password, dst_host, dst_port, dst_password):
+        self.src_cli = self._get_redis_client(src_host, src_port, src_password)
+        self.dst_cli = self._get_redis_client(dst_host, dst_port, dst_password)
+
+    def _get_redis_client(self, host, port, password):
+        return redis.Redis(host=host, port=port, decode_responses=True, password=password)
+
+    def _compare_string_data(self, key):
+        src_data = self.src_cli.get(key)
+        dst_data = self.dst_cli.get(key)
+        return src_data, dst_data
+
+    def _compare_hash_data(self, key):
+        src_data = self.src_cli.hgetall(key)
+        dst_data = self.dst_cli.hgetall(key)
+        return src_data, dst_data
+
+    def _compare_list_data(self, key):
+        src_data = self.src_cli.lrange(key, 0, -1)
+        dst_data = self.dst_cli.lrange(key, 0, -1)
+        return src_data, dst_data
+
+    def _compare_set_data(self, key):
+        src_data = self.src_cli.smembers(key)
+        dst_data = self.dst_cli.smembers(key)
+        return src_data, dst_data
+
+    def _compare_zset_data(self, key):
+        src_data = self.src_cli.zrange(key, 0, -1, withscores=True)
+        dst_data = self.dst_cli.zrange(key, 0, -1, withscores=True)
+        return src_data, dst_data
+
+    def _compare_bitmap_data(self, key, pos):
+        src_data = self.src_cli.getbit(key, pos)
+        dst_data = self.dst_cli.getbit(key, pos)
+        return src_data, dst_data
+
+    def _compare_data(self, keys: list, data_type):
+        if data_type == "string":
+            return self._compare_string_data(keys[0])
+        elif data_type == "hash":
+            return self._compare_hash_data(keys[0])
+        elif data_type == "list":
+            return self._compare_list_data(keys[0])
+        elif data_type == "set":
+            return self._compare_set_data(keys[0])
+        elif data_type == "zset":
+            return self._compare_zset_data(keys[0])
+        elif data_type == 'bitmap':
+            return self._compare_bitmap_data(keys[0], keys[1])
+        elif data_type == 'none':
+            return self.src_cli.type(keys[0]), 'none'
+        else:
+            raise ValueError(f"Unsupported data type '{data_type}' for key '{keys[0]}'")
+
+    def compare_redis_data(self, key_file=''):
+        if key_file:
+            with open(key_file, 'rb') as f:
+                for line in f:
+                    keys = codecs.decode(line.strip()).split('-')
+                    data_type = self.src_cli.type(keys[0])
+                    src_data, dst_data = self._compare_data(keys, data_type)
+                    if src_data != dst_data:
+                        raise AssertionError(f"Data mismatch for key '{keys[0]}': source data: '{src_data}' destination data: '{dst_data}'")
+
+        self._import_and_compare(100)
+        print('All tests passed.')
+
+    def _import_and_compare(self, num):
+        for i in range(num):
+            key = f'key_{i}'
+            value = f'value_{i}'
+            self.src_cli.set(key, value)
+            incr_key = 'incr_key'
+            self.src_cli.incr(incr_key)
+            hash_key = f'hash_key_{i}'
+            hash_value = {'field1': f'field1_value_{i}', 'field2': f'field2_value_{i}'}
+            self.src_cli.hmset(hash_key, hash_value)
+            set_key = f'set_key_{i}'
+            set_value = [f'set_value_{i}_1', f'set_value_{i}_2', f'set_value_{i}_3']
+            self.src_cli.sadd(set_key, *set_value)
+            zset_key = f'zset_key_{i}'
+            zset_value = {f'member_{i}_1': i+1, f'member_{i}_2': i+2, f'member_{i}_3': i+3}
+            self.src_cli.zadd(zset_key, 
zset_value) + time.sleep(0.02) + keys = [key, incr_key, hash_key, set_key, zset_key] + for key in keys: + data_type = self.src_cli.type(key) + src_data, dst_data = self._compare_data([key], data_type) + if src_data != dst_data: + raise AssertionError(f"Data mismatch for key '{key}': source data: '{src_data}' destination data: '{dst_data}'") + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='Redis Comparator') + parser.add_argument('--src_host', type=str, default='127.0.0.1', help='Source Redis host') + parser.add_argument('--src_port', type=int, default=6666, help='Source Redis port') + parser.add_argument('--src_password', type=str, default='foobared', help='Source Redis password') + parser.add_argument('--dst_host', type=str, default='127.0.0.1', help='Destination Redis host') + parser.add_argument('--dst_port', type=int, default=6379, help='Destination Redis port') + parser.add_argument('--dst_password', type=str, default='', help='Destination Redis password') + parser.add_argument('--key_file', type=str, help='Path to the file containing keys to compare') + + args = parser.parse_args() + + redis_comparator = RedisComparator(args.src_host, args.src_port, args.src_password, args.dst_host, args.dst_port, args.dst_password) + redis_comparator.compare_redis_data(args.key_file) \ No newline at end of file diff --git a/utils/kvrocks2redis/tests/populate-kvrocks.py b/utils/kvrocks2redis/tests/populate-kvrocks.py index cad3d2d4e81..8883a0dfd36 100644 --- a/utils/kvrocks2redis/tests/populate-kvrocks.py +++ b/utils/kvrocks2redis/tests/populate-kvrocks.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python3 + # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -15,57 +17,143 @@ # specific language governing permissions and limitations # under the License. +import time +import argparse import redis - -range=100 -factor=32 -port=6666 - -r = redis.StrictRedis(host='localhost', port=port, db=0, password='foobared') - -# flushall ? 
-# rst = r.flushall()
-# assert rst
-
-# string
-rst = r.set('foo', 1)
-assert rst
-
-rst = r.setex('foo_ex', 3600, 1)
-assert rst
-
-# zset
-rst = r.zadd('zfoo', 1, 'a', 2, 'b', 3, 'c')
-assert(rst == 3)
-
-# list
-rst = r.rpush('lfoo', 1, 2, 3, 4)
-assert(rst == 4)
-
-# set
-rst = r.sadd('sfoo', 'a', 'b', 'c', 'd')
-assert(rst == 4)
-
-# hash
-rst = r.hset('hfoo', 'a', 1)
-assert(rst == 1)
-
-# bitmap
-rst = r.setbit('bfoo', 0, 1)
-assert(rst == 0)
-rst = r.setbit('bfoo', 1, 1)
-assert(rst == 0)
-rst = r.setbit('bfoo', 800000, 1)
-assert(rst == 0)
-
-# expire cmd
-rst = r.expire('foo', 3600)
-assert rst
-rst = r.expire('zfoo', 3600)
-assert rst
-
-
-
-
-
-
+import sys
+
+
+filename = 'user_key.log'
+file = open(filename, 'w')
+
+PopulateCases = [
+    ('string', [
+        [('set', 'foo', 1), True],
+        [('setex', 'foo_ex', 3600, 1), True],
+    ]),
+    ('zset', [
+        [('zadd', 'zfoo', 1, 'a', 2, 'b', 3, 'c'), 3]
+    ]),
+    ('list', [
+        [('rpush', 'lfoo', 1, 2, 3, 4), 4]
+    ]),
+    ('set', [
+        [('sadd', 'sfoo', 'a', 'b', 'c', 'd'), 4]
+    ]),
+    ('hash', [
+        [('hset', 'hfoo', 'a', 1), 1]
+    ]),
+    ('bitmap', [
+        [('setbit', 'bfoo', 0, 1), 0],
+        [('setbit', 'bfoo', 1, 1), 0],
+        [('setbit', 'bfoo', 800000, 1), 0]
+    ]),
+    ('expire', [
+        [('expire', 'foo', 3600), True],
+        [('expire', 'zfoo', 3600), True]
+    ])
+]
+
+AppendCases = [
+    ('string', [
+        [('set', 'foo', 2), True],
+        [('set', 'foo2', 2), True],
+        [('setex', 'foo_ex', 7200, 2), True]
+    ]),
+    ('zset', [
+        [('zadd', 'zfoo', 4, 'd'), 1],
+        [('zrem', 'zfoo', 'd'), 1]
+    ]),
+    ('list', [
+        [('lset', 'lfoo', 0, 'a'), 1],
+        [('rpush', 'lfoo', 'a'), 5],
+        [('lpush', 'lfoo', 'b'), 6],
+        [('lpop', 'lfoo'), 'b'],
+        [('rpop', 'lfoo'), 'a'],
+        [('ltrim', 'lfoo', 0, 2), True]
+    ]),
+    ('set', [
+        [('sadd', 'sfoo', 'f'), 1],
+        [('srem', 'sfoo', 'f'), 1]
+    ]),
+    ('hash', [
+        [('hset', 'hfoo', 'b', 2), 1],
+        [('hdel', 'hfoo', 'b'), 1]
+    ]),
+    ('bitmap', [
+        [('setbit', 'bfoo', 0, 0), 1],
+        [('setbit', 'bfoo', 900000, 1), 0]
+    ]),
+    ('expire', [
+        [('expire', 'foo', 7200), True],
+        [('expire', 'zfoo', 7200), True]
+    ]),
+    ('delete', [
+        [('del', 'foo'), True],
+        [('del', 'zfoo'), True]
+    ])
+]
+
+def parse_args():
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--host', default='127.0.0.1', type=str, help='host')
+    parser.add_argument('--port', default=6666, type=int, help='port')
+    parser.add_argument('--password', default='foobared')
+    parser.add_argument('--flushdb', default=False, action='store_true', help='flush the db before populating')
+
+    return parser.parse_args()
+
+def check(cmd, r):
+    if len(cmd) == 1:
+        print('EXEC %s' % (str(cmd[0]),))
+        return True
+    if hasattr(cmd[1], '__call__'):
+        isPass = cmd[1](r)
+    else:
+        isPass = r == cmd[1]
+    if not isPass:
+        print('FAIL %s:%s != %s' % (str(cmd[0]), repr(r), repr(cmd[1])))
+        return False
+    return True
+
+def pipeline_execute(client, name, cmds):
+    succ = True
+    p = client.pipeline(transaction=False)
+    try:
+        for cmd in cmds:
+            if name != 'bitmap':
+                file.write(cmd[0][1] + '\n')
+            else:
+                file.write(f"{cmd[0][1]}-{cmd[0][2]}" + '\n')
+            p.execute_command(*cmd[0])
+        res = p.execute()
+        for i in range(0, len(cmds)):
+            if not check(cmds[i], res[i]):
+                succ = False
+    except Exception as excp:
+        succ = False
+        print('EXCP %s' % str(excp))
+    return succ
+
+
+def run_test(client, cases: list):
+    fails = []
+    for case in cases:
+        if not pipeline_execute(client, case[0], case[1]):
+            fails.append(case[0])
+    if len(fails) > 0:
+        print('******* Some cases failed *******')
+        for cmd in fails:
+            print(cmd)
+    else:
+        print('All cases passed.')
+
+
+if __name__ == '__main__': + args = parse_args() + client = redis.Redis(host=args.host, port=args.port, decode_responses=True, password=args.password) + if args.flushdb: + client.flushdb() + run_test(client, PopulateCases) + run_test(client, AppendCases)