From d66e82750c5636c5674e66cbb6597c0a74bf3704 Mon Sep 17 00:00:00 2001 From: Aleks Lozovyuk Date: Mon, 4 Nov 2024 13:35:39 +0200 Subject: [PATCH 1/3] ci: update crate-ci/typos to 1.27.0 (#2645) --- .github/workflows/kvrocks.yaml | 2 +- src/types/redis_json.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/kvrocks.yaml b/.github/workflows/kvrocks.yaml index a203f892d0d..4c24163265f 100644 --- a/.github/workflows/kvrocks.yaml +++ b/.github/workflows/kvrocks.yaml @@ -58,7 +58,7 @@ jobs: steps: - uses: actions/checkout@v4 - name: Check typos - uses: crate-ci/typos@v1.25.0 + uses: crate-ci/typos@v1.27.0 with: config: .github/config/typos.toml diff --git a/src/types/redis_json.h b/src/types/redis_json.h index 1a69408ee87..54831472dc1 100644 --- a/src/types/redis_json.h +++ b/src/types/redis_json.h @@ -86,7 +86,7 @@ class Json : public Database { private: rocksdb::Status write(engine::Context &ctx, Slice ns_key, JsonMetadata *metadata, const JsonValue &json_val); rocksdb::Status read(engine::Context &ctx, const Slice &ns_key, JsonMetadata *metadata, JsonValue *value); - static rocksdb::Status parse(const JsonMetadata &metadata, const Slice &json_byt, JsonValue *value); + static rocksdb::Status parse(const JsonMetadata &metadata, const Slice &json_byte, JsonValue *value); rocksdb::Status create(engine::Context &ctx, const std::string &ns_key, JsonMetadata &metadata, const std::string &value); rocksdb::Status del(engine::Context &ctx, const Slice &ns_key); From 1e25be6a4e20242c6f87c7b561f87666c6359437 Mon Sep 17 00:00:00 2001 From: SiLe Zhou Date: Tue, 5 Nov 2024 00:25:14 +0800 Subject: [PATCH 2/3] refactor: hoist key mutexes to ExecuteCommands (#2620) Co-authored-by: Twice Co-authored-by: hulk --- src/commands/blocking_commander.h | 11 ++- src/commands/cmd_list.cc | 29 ++++++++ src/commands/cmd_stream.cc | 8 +++ src/commands/cmd_zset.cc | 20 ++++++ src/server/redis_connection.cc | 18 ++++- src/storage/redis_db.cc | 20 +++--- src/types/redis_bitmap.cc | 10 +-- src/types/redis_bloom_chain.cc | 2 - src/types/redis_hash.cc | 5 +- src/types/redis_hyperloglog.cc | 2 - src/types/redis_json.cc | 21 +----- src/types/redis_list.cc | 10 +-- src/types/redis_set.cc | 30 -------- src/types/redis_sortedint.cc | 2 - src/types/redis_stream.cc | 20 ++---- src/types/redis_string.cc | 36 ++-------- src/types/redis_string.h | 3 +- src/types/redis_zset.cc | 36 ---------- tests/gocase/unit/protocol/regression_test.go | 3 - tests/gocase/unit/type/list/list_test.go | 68 +------------------ tests/gocase/unit/type/zset/zset_test.go | 7 -- 21 files changed, 107 insertions(+), 254 deletions(-) diff --git a/src/commands/blocking_commander.h b/src/commands/blocking_commander.h index 05883a8ac04..2749681984f 100644 --- a/src/commands/blocking_commander.h +++ b/src/commands/blocking_commander.h @@ -21,6 +21,7 @@ #pragma once #include "commander.h" +#include "common/lock_manager.h" #include "event_util.h" #include "server/redis_connection.h" @@ -44,6 +45,10 @@ class BlockingCommander : public Commander, // in other words, returning true indicates ending the blocking virtual bool OnBlockingWrite() = 0; + // GetLocks() locks the keys of the BlockingCommander with MultiLockGuard. + // When OnWrite() is triggered, BlockingCommander needs to relock the keys. + virtual MultiLockGuard GetLocks() = 0; + // to start the blocking process // usually put to the end of the Execute method Status StartBlocking(int64_t timeout, std::string *output) { @@ -63,7 +68,11 @@ class BlockingCommander : public Commander, } void OnWrite(bufferevent *bev) { - bool done = OnBlockingWrite(); + bool done{false}; + { + auto guard = GetLocks(); + done = OnBlockingWrite(); + } if (!done) { // The connection may be waked up but can't pop from the datatype. diff --git a/src/commands/cmd_list.cc b/src/commands/cmd_list.cc index cf588ec0891..1a9f5d03dfc 100644 --- a/src/commands/cmd_list.cc +++ b/src/commands/cmd_list.cc @@ -313,6 +313,16 @@ class CommandBPop : public BlockingCommander { return s; } + MultiLockGuard GetLocks() override { + std::vector lock_keys; + lock_keys.reserve(keys_.size()); + for (const auto &key : keys_) { + auto ns_key = ComposeNamespaceKey(conn_->GetNamespace(), key, srv_->storage->IsSlotIdEncoded()); + lock_keys.emplace_back(std::move(ns_key)); + } + return MultiLockGuard(srv_->storage->GetLockManager(), lock_keys); + } + bool OnBlockingWrite() override { engine::Context ctx(srv_->storage); auto s = TryPopFromList(ctx); @@ -436,6 +446,16 @@ class CommandBLMPop : public BlockingCommander { } } + MultiLockGuard GetLocks() override { + std::vector lock_keys; + lock_keys.reserve(keys_.size()); + for (const auto &key : keys_) { + auto ns_key = ComposeNamespaceKey(conn_->GetNamespace(), key, srv_->storage->IsSlotIdEncoded()); + lock_keys.emplace_back(std::move(ns_key)); + } + return MultiLockGuard(srv_->storage->GetLockManager(), lock_keys); + } + bool OnBlockingWrite() override { engine::Context ctx(srv_->storage); auto s = ExecuteUnblocked(ctx); @@ -767,6 +787,15 @@ class CommandBLMove : public BlockingCommander { void UnblockKeys() override { srv_->UnblockOnKey(args_[1], conn_); } + MultiLockGuard GetLocks() override { + std::vector lock_keys{ + ComposeNamespaceKey(conn_->GetNamespace(), args_[1], srv_->storage->IsSlotIdEncoded())}; + if (args_[1] != args_[2]) { + lock_keys.emplace_back(ComposeNamespaceKey(conn_->GetNamespace(), args_[2], srv_->storage->IsSlotIdEncoded())); + } + return MultiLockGuard(srv_->storage->GetLockManager(), lock_keys); + } + bool OnBlockingWrite() override { redis::List list_db(srv_->storage, conn_->GetNamespace()); std::string elem; diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc index 521e5ca312f..0fe1cd3bccf 100644 --- a/src/commands/cmd_stream.cc +++ b/src/commands/cmd_stream.cc @@ -1635,6 +1635,14 @@ class CommandXReadGroup : public Commander, redis::Stream stream_db(srv_->storage, conn_->GetNamespace()); std::vector results; + + std::vector lock_keys; + lock_keys.reserve(streams_.size()); + for (auto &stream_name : streams_) { + auto ns_key = stream_db.AppendNamespacePrefix(stream_name); + lock_keys.emplace_back(std::move(ns_key)); + } + MultiLockGuard guard(srv_->storage->GetLockManager(), lock_keys); engine::Context ctx(srv_->storage); for (size_t i = 0; i < streams_.size(); ++i) { redis::StreamRangeOptions options; diff --git a/src/commands/cmd_zset.cc b/src/commands/cmd_zset.cc index 5b1f392f1a7..99285a625e9 100644 --- a/src/commands/cmd_zset.cc +++ b/src/commands/cmd_zset.cc @@ -366,6 +366,16 @@ class CommandBZPop : public BlockingCommander { conn_->Reply(output); } + MultiLockGuard GetLocks() override { + std::vector lock_keys; + lock_keys.reserve(keys_.size()); + for (const auto &key : keys_) { + auto ns_key = ComposeNamespaceKey(conn_->GetNamespace(), key, srv_->storage->IsSlotIdEncoded()); + lock_keys.emplace_back(std::move(ns_key)); + } + return MultiLockGuard(srv_->storage->GetLockManager(), lock_keys); + } + bool OnBlockingWrite() override { std::string user_key; std::vector member_scores; @@ -548,6 +558,16 @@ class CommandBZMPop : public BlockingCommander { std::string NoopReply(const Connection *conn) override { return conn->NilString(); } + MultiLockGuard GetLocks() override { + std::vector lock_keys; + lock_keys.reserve(keys_.size()); + for (const auto &key : keys_) { + auto ns_key = ComposeNamespaceKey(conn_->GetNamespace(), key, srv_->storage->IsSlotIdEncoded()); + lock_keys.emplace_back(std::move(ns_key)); + } + return MultiLockGuard(srv_->storage->GetLockManager(), lock_keys); + } + bool OnBlockingWrite() override { std::string user_key; std::vector member_scores; diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc index 70abfe70b54..54899afe153 100644 --- a/src/server/redis_connection.cc +++ b/src/server/redis_connection.cc @@ -489,9 +489,24 @@ void Connection::ExecuteCommands(std::deque *to_process_cmds) { SetLastCmd(cmd_name); { + std::optional guard; + if (cmd_flags & kCmdWrite) { + std::vector lock_keys; + attributes->ForEachKeyRange( + [&lock_keys, this](const std::vector &args, const CommandKeyRange &key_range) { + key_range.ForEachKey( + [&, this](const std::string &key) { + auto ns_key = ComposeNamespaceKey(ns_, key, srv_->storage->IsSlotIdEncoded()); + lock_keys.emplace_back(std::move(ns_key)); + }, + args); + }, + cmd_tokens); + + guard.emplace(srv_->storage->GetLockManager(), lock_keys); + } engine::Context ctx(srv_->storage); - // TODO: transaction support for index recording std::vector index_records; if (!srv_->index_mgr.index_map.empty() && IsCmdForIndexing(cmd_flags, attributes->category) && !config->cluster_enabled) { @@ -512,7 +527,6 @@ void Connection::ExecuteCommands(std::deque *to_process_cmds) { } s = ExecuteCommand(ctx, cmd_name, cmd_tokens, current_cmd.get(), &reply); - // TODO: transaction support for index updating for (const auto &record : index_records) { auto s = GlobalIndexer::Update(ctx, record); if (!s.IsOK() && !s.Is()) { diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc index 5eabd8d8c94..26864f3ee8b 100644 --- a/src/storage/redis_db.cc +++ b/src/storage/redis_db.cc @@ -112,7 +112,7 @@ rocksdb::Status Database::Expire(engine::Context &ctx, const Slice &user_key, ui std::string value; Metadata metadata(kRedisNone, false); - LockGuard guard(storage_->GetLockManager(), ns_key); + rocksdb::Status s = storage_->Get(ctx, ctx.GetReadOptions(), metadata_cf_handle_, ns_key, &value); if (!s.ok()) return s; @@ -150,7 +150,7 @@ rocksdb::Status Database::Del(engine::Context &ctx, const Slice &user_key) { std::string ns_key = AppendNamespacePrefix(user_key); std::string value; - LockGuard guard(storage_->GetLockManager(), ns_key); + rocksdb::Status s = storage_->Get(ctx, ctx.GetReadOptions(), metadata_cf_handle_, ns_key, &value); if (!s.ok()) return s; Metadata metadata(kRedisNone, false); @@ -165,13 +165,12 @@ rocksdb::Status Database::Del(engine::Context &ctx, const Slice &user_key) { rocksdb::Status Database::MDel(engine::Context &ctx, const std::vector &keys, uint64_t *deleted_cnt) { *deleted_cnt = 0; - std::vector lock_keys; - lock_keys.reserve(keys.size()); + std::vector ns_keys; + ns_keys.reserve(keys.size()); for (const auto &key : keys) { std::string ns_key = AppendNamespacePrefix(key); - lock_keys.emplace_back(std::move(ns_key)); + ns_keys.emplace_back(std::move(ns_key)); } - MultiLockGuard guard(storage_->GetLockManager(), lock_keys); auto batch = storage_->GetWriteBatchBase(); WriteBatchLogData log_data(kRedisNone); @@ -181,8 +180,8 @@ rocksdb::Status Database::MDel(engine::Context &ctx, const std::vector &k } std::vector slice_keys; - slice_keys.reserve(lock_keys.size()); - for (const auto &ns_key : lock_keys) { + slice_keys.reserve(ns_keys.size()); + for (const auto &ns_key : ns_keys) { slice_keys.emplace_back(ns_key); } @@ -202,7 +201,7 @@ rocksdb::Status Database::MDel(engine::Context &ctx, const std::vector &k if (!s.ok()) continue; if (metadata.Expired()) continue; - s = batch->Delete(metadata_cf_handle_, lock_keys[i]); + s = batch->Delete(metadata_cf_handle_, ns_keys[i]); if (!s.ok()) return s; *deleted_cnt += 1; } @@ -652,9 +651,6 @@ rocksdb::Status Database::typeInternal(engine::Context &ctx, const Slice &key, R rocksdb::Status Database::Copy(engine::Context &ctx, const std::string &key, const std::string &new_key, bool nx, bool delete_old, CopyResult *res) { - std::vector lock_keys = {key, new_key}; - MultiLockGuard guard(storage_->GetLockManager(), lock_keys); - RedisType type = kRedisNone; auto s = typeInternal(ctx, key, &type); if (!s.ok()) return s; diff --git a/src/types/redis_bitmap.cc b/src/types/redis_bitmap.cc index ae9fefcd315..e75deea3644 100644 --- a/src/types/redis_bitmap.cc +++ b/src/types/redis_bitmap.cc @@ -184,7 +184,6 @@ rocksdb::Status Bitmap::SetBit(engine::Context &ctx, const Slice &user_key, uint std::string raw_value; std::string ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); BitmapMetadata metadata; rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata, &raw_value); if (!s.ok() && !s.IsNotFound()) return s; @@ -461,7 +460,6 @@ rocksdb::Status Bitmap::BitOp(engine::Context &ctx, BitOpFlags op_flag, const st const Slice &user_key, const std::vector &op_keys, int64_t *len) { std::string raw_value; std::string ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); std::vector> meta_pairs; uint64_t max_bitmap_size = 0; @@ -824,15 +822,9 @@ template rocksdb::Status Bitmap::bitfield(engine::Context &ctx, const Slice &user_key, const std::vector &ops, std::vector> *rets) { std::string ns_key = AppendNamespacePrefix(user_key); - - std::optional guard; - if constexpr (!ReadOnly) { - guard = LockGuard(storage_->GetLockManager(), ns_key); - } - BitmapMetadata metadata; std::string raw_value; - // TODO(mwish): maintain snapshot for read-only bitfield. + auto s = GetMetadata(ctx, ns_key, &metadata, &raw_value); if (!s.ok() && !s.IsNotFound()) { return s; diff --git a/src/types/redis_bloom_chain.cc b/src/types/redis_bloom_chain.cc index 30c07fe338c..7dea8189b16 100644 --- a/src/types/redis_bloom_chain.cc +++ b/src/types/redis_bloom_chain.cc @@ -126,7 +126,6 @@ rocksdb::Status BloomChain::Reserve(engine::Context &ctx, const Slice &user_key, uint16_t expansion) { std::string ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); BloomChainMetadata bloom_chain_metadata; rocksdb::Status s = getBloomChainMetadata(ctx, ns_key, &bloom_chain_metadata); if (!s.ok() && !s.IsNotFound()) return s; @@ -156,7 +155,6 @@ rocksdb::Status BloomChain::InsertCommon(engine::Context &ctx, const Slice &user const BloomFilterInsertOptions &insert_options, std::vector *rets) { std::string ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); BloomChainMetadata metadata; rocksdb::Status s = getBloomChainMetadata(ctx, ns_key, &metadata); diff --git a/src/types/redis_hash.cc b/src/types/redis_hash.cc index 8930c9fce64..905efdadd37 100644 --- a/src/types/redis_hash.cc +++ b/src/types/redis_hash.cc @@ -66,7 +66,6 @@ rocksdb::Status Hash::IncrBy(engine::Context &ctx, const Slice &user_key, const std::string ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); HashMetadata metadata; rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok() && !s.IsNotFound()) return s; @@ -117,7 +116,6 @@ rocksdb::Status Hash::IncrByFloat(engine::Context &ctx, const Slice &user_key, c std::string ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); HashMetadata metadata; rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok() && !s.IsNotFound()) return s; @@ -211,7 +209,7 @@ rocksdb::Status Hash::Delete(engine::Context &ctx, const Slice &user_key, const WriteBatchLogData log_data(kRedisHash); auto s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; - LockGuard guard(storage_->GetLockManager(), ns_key); + s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s; @@ -245,7 +243,6 @@ rocksdb::Status Hash::MSet(engine::Context &ctx, const Slice &user_key, const st *added_cnt = 0; std::string ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); HashMetadata metadata; rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok() && !s.IsNotFound()) return s; diff --git a/src/types/redis_hyperloglog.cc b/src/types/redis_hyperloglog.cc index 562a7d6b1f6..b7b2197f208 100644 --- a/src/types/redis_hyperloglog.cc +++ b/src/types/redis_hyperloglog.cc @@ -112,7 +112,6 @@ rocksdb::Status HyperLogLog::Add(engine::Context &ctx, const Slice &user_key, *ret = 0; std::string ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); HyperLogLogMetadata metadata{}; rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok() && !s.IsNotFound()) { @@ -238,7 +237,6 @@ rocksdb::Status HyperLogLog::Merge(engine::Context &ctx, const Slice &dest_user_ } std::string dest_key = AppendNamespacePrefix(dest_user_key); - LockGuard guard(storage_->GetLockManager(), dest_key); std::vector registers; HyperLogLogMetadata metadata; diff --git a/src/types/redis_json.cc b/src/types/redis_json.cc index 5120573e7ce..ba331ef96ff 100644 --- a/src/types/redis_json.cc +++ b/src/types/redis_json.cc @@ -124,8 +124,6 @@ rocksdb::Status Json::Set(engine::Context &ctx, const std::string &user_key, con const std::string &value) { auto ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); - JsonMetadata metadata; JsonValue origin; auto s = read(ctx, ns_key, &metadata, &origin); @@ -190,8 +188,6 @@ rocksdb::Status Json::ArrAppend(engine::Context &ctx, const std::string &user_ke append_values.emplace_back(std::move(value.value)); } - LockGuard guard(storage_->GetLockManager(), ns_key); - JsonMetadata metadata; JsonValue value; auto s = read(ctx, ns_key, &metadata, &value); @@ -248,8 +244,6 @@ rocksdb::Status Json::Merge(engine::Context &ctx, const std::string &user_key, c const std::string &merge_value, bool &result) { auto ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); - JsonMetadata metadata; JsonValue json_val; @@ -279,8 +273,6 @@ rocksdb::Status Json::Clear(engine::Context &ctx, const std::string &user_key, c size_t *result) { auto ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); - JsonValue json_val; JsonMetadata metadata; auto s = read(ctx, ns_key, &metadata, &json_val); @@ -327,8 +319,6 @@ rocksdb::Status Json::ArrInsert(engine::Context &ctx, const std::string &user_ke insert_values.emplace_back(std::move(value.value)); } - LockGuard guard(storage_->GetLockManager(), ns_key); - JsonMetadata metadata; JsonValue value; auto s = read(ctx, ns_key, &metadata, &value); @@ -349,8 +339,6 @@ rocksdb::Status Json::Toggle(engine::Context &ctx, const std::string &user_key, Optionals *results) { auto ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); - JsonMetadata metadata; JsonValue origin; auto s = read(ctx, ns_key, &metadata, &origin); @@ -367,8 +355,6 @@ rocksdb::Status Json::ArrPop(engine::Context &ctx, const std::string &user_key, std::vector> *results) { auto ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); - JsonMetadata metadata; JsonValue json_val; auto s = read(ctx, ns_key, &metadata, &json_val); @@ -403,8 +389,6 @@ rocksdb::Status Json::ArrTrim(engine::Context &ctx, const std::string &user_key, int64_t stop, Optionals *results) { auto ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); - JsonMetadata metadata; JsonValue json_val; auto s = read(ctx, ns_key, &metadata, &json_val); @@ -424,7 +408,7 @@ rocksdb::Status Json::Del(engine::Context &ctx, const std::string &user_key, con *result = 0; auto ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); + JsonValue json_val; JsonMetadata metadata; auto s = read(ctx, ns_key, &metadata, &json_val); @@ -473,8 +457,6 @@ rocksdb::Status Json::numop(engine::Context &ctx, JsonValue::NumOpEnum op, const auto s = read(ctx, ns_key, &metadata, &json_val); if (!s.ok()) return s; - LockGuard guard(storage_->GetLockManager(), ns_key); - auto res = json_val.NumOp(path, number, op, result); if (!res) { return rocksdb::Status::InvalidArgument(res.Msg()); @@ -571,7 +553,6 @@ rocksdb::Status Json::MSet(engine::Context &ctx, const std::vector std::string ns_key = AppendNamespacePrefix(user_key); ns_keys.emplace_back(std::move(ns_key)); } - MultiLockGuard guard(storage_->GetLockManager(), ns_keys); auto batch = storage_->GetWriteBatchBase(); WriteBatchLogData log_data(kRedisJson); diff --git a/src/types/redis_list.cc b/src/types/redis_list.cc index e640ffb6005..acef1ed9cc6 100644 --- a/src/types/redis_list.cc +++ b/src/types/redis_list.cc @@ -63,7 +63,7 @@ rocksdb::Status List::push(engine::Context &ctx, const Slice &user_key, const st WriteBatchLogData log_data(kRedisList, {std::to_string(cmd)}); auto s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; - LockGuard guard(storage_->GetLockManager(), ns_key); + s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok() && !(create_if_missing && s.IsNotFound())) { return s.IsNotFound() ? rocksdb::Status::OK() : s; @@ -108,7 +108,6 @@ rocksdb::Status List::PopMulti(engine::Context &ctx, const rocksdb::Slice &user_ std::string ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); ListMetadata metadata(false); rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok()) return s; @@ -177,7 +176,6 @@ rocksdb::Status List::Rem(engine::Context &ctx, const Slice &user_key, int count std::string ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); ListMetadata metadata(false); rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok()) return s; @@ -271,7 +269,6 @@ rocksdb::Status List::Insert(engine::Context &ctx, const Slice &user_key, const *new_size = 0; std::string ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); ListMetadata metadata(false); rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok()) return s; @@ -462,7 +459,6 @@ rocksdb::Status List::Pos(engine::Context &ctx, const Slice &user_key, const Sli rocksdb::Status List::Set(engine::Context &ctx, const Slice &user_key, int index, Slice elem) { std::string ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); ListMetadata metadata(false); rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok()) return s; @@ -501,7 +497,6 @@ rocksdb::Status List::lmoveOnSingleList(engine::Context &ctx, const rocksdb::Sli std::string *elem) { std::string ns_key = AppendNamespacePrefix(src); - LockGuard guard(storage_->GetLockManager(), ns_key); ListMetadata metadata(false); rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok()) { @@ -567,8 +562,6 @@ rocksdb::Status List::lmoveOnTwoLists(engine::Context &ctx, const rocksdb::Slice std::string src_ns_key = AppendNamespacePrefix(src); std::string dst_ns_key = AppendNamespacePrefix(dst); - std::vector lock_keys{src_ns_key, dst_ns_key}; - MultiLockGuard guard(storage_->GetLockManager(), lock_keys); ListMetadata src_metadata(false); auto s = GetMetadata(ctx, src_ns_key, &src_metadata); if (!s.ok()) { @@ -636,7 +629,6 @@ rocksdb::Status List::Trim(engine::Context &ctx, const Slice &user_key, int star uint32_t trim_cnt = 0; std::string ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); ListMetadata metadata(false); rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s; diff --git a/src/types/redis_set.cc b/src/types/redis_set.cc index 043e0d36cea..01849328c60 100644 --- a/src/types/redis_set.cc +++ b/src/types/redis_set.cc @@ -37,7 +37,6 @@ rocksdb::Status Set::GetMetadata(engine::Context &ctx, const Slice &ns_key, SetM rocksdb::Status Set::Overwrite(engine::Context &ctx, Slice user_key, const std::vector &members) { std::string ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); SetMetadata metadata; auto batch = storage_->GetWriteBatchBase(); WriteBatchLogData log_data(kRedisSet); @@ -62,7 +61,6 @@ rocksdb::Status Set::Add(engine::Context &ctx, const Slice &user_key, const std: std::string ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); SetMetadata metadata; rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok() && !s.IsNotFound()) return s; @@ -100,7 +98,6 @@ rocksdb::Status Set::Remove(engine::Context &ctx, const Slice &user_key, const s std::string ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); SetMetadata metadata(false); rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s; @@ -218,9 +215,6 @@ rocksdb::Status Set::Take(engine::Context &ctx, const Slice &user_key, std::vect std::string ns_key = AppendNamespacePrefix(user_key); - std::optional lock_guard; - if (pop) lock_guard.emplace(storage_->GetLockManager(), ns_key); - SetMetadata metadata(false); rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s; @@ -290,14 +284,6 @@ rocksdb::Status Set::Scan(engine::Context &ctx, const Slice &user_key, const std * DIFF key1 key2 key3 = {b,d} */ rocksdb::Status Set::Diff(engine::Context &ctx, const std::vector &keys, std::vector *members) { - std::vector lock_keys; - lock_keys.reserve(keys.size()); - for (const auto key : keys) { - std::string ns_key = AppendNamespacePrefix(key); - lock_keys.emplace_back(std::move(ns_key)); - } - MultiLockGuard guard(storage_->GetLockManager(), lock_keys); - members->clear(); std::vector source_members; auto s = Members(ctx, keys[0], &source_members); @@ -329,14 +315,6 @@ rocksdb::Status Set::Diff(engine::Context &ctx, const std::vector &keys, * UNION key1 key2 key3 = {a,b,c,d,e} */ rocksdb::Status Set::Union(engine::Context &ctx, const std::vector &keys, std::vector *members) { - std::vector lock_keys; - lock_keys.reserve(keys.size()); - for (const auto key : keys) { - std::string ns_key = AppendNamespacePrefix(key); - lock_keys.emplace_back(std::move(ns_key)); - } - MultiLockGuard guard(storage_->GetLockManager(), lock_keys); - members->clear(); std::map union_members; @@ -363,14 +341,6 @@ rocksdb::Status Set::Union(engine::Context &ctx, const std::vector &keys, * INTER key1 key2 key3 = {c} */ rocksdb::Status Set::Inter(engine::Context &ctx, const std::vector &keys, std::vector *members) { - std::vector lock_keys; - lock_keys.reserve(keys.size()); - for (const auto key : keys) { - std::string ns_key = AppendNamespacePrefix(key); - lock_keys.emplace_back(std::move(ns_key)); - } - MultiLockGuard guard(storage_->GetLockManager(), lock_keys); - members->clear(); std::map member_counters; diff --git a/src/types/redis_sortedint.cc b/src/types/redis_sortedint.cc index 98e1f7a5133..f29ca7de616 100644 --- a/src/types/redis_sortedint.cc +++ b/src/types/redis_sortedint.cc @@ -38,7 +38,6 @@ rocksdb::Status Sortedint::Add(engine::Context &ctx, const Slice &user_key, cons std::string ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); SortedintMetadata metadata; rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok() && !s.IsNotFound()) return s; @@ -75,7 +74,6 @@ rocksdb::Status Sortedint::Remove(engine::Context &ctx, const Slice &user_key, c std::string ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); SortedintMetadata metadata(false); rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s; diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc index 009a98c0a8d..df9284c2db5 100644 --- a/src/types/redis_stream.cc +++ b/src/types/redis_stream.cc @@ -96,7 +96,6 @@ rocksdb::Status Stream::Add(engine::Context &ctx, const Slice &stream_name, cons std::string ns_key = AppendNamespacePrefix(stream_name); - LockGuard guard(storage_->GetLockManager(), ns_key); StreamMetadata metadata; rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok() && !s.IsNotFound()) return s; @@ -333,7 +332,6 @@ rocksdb::Status Stream::DeletePelEntries(engine::Context &ctx, const Slice &stre std::string ns_key = AppendNamespacePrefix(stream_name); - LockGuard guard(storage_->GetLockManager(), ns_key); StreamMetadata metadata(false); rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok()) { @@ -400,7 +398,7 @@ rocksdb::Status Stream::ClaimPelEntries(engine::Context &ctx, const Slice &strea const std::vector &entry_ids, const StreamClaimOptions &options, StreamClaimResult *result) { std::string ns_key = AppendNamespacePrefix(stream_name); - LockGuard guard(storage_->GetLockManager(), ns_key); + StreamMetadata metadata(false); rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok()) return s; @@ -536,7 +534,6 @@ rocksdb::Status Stream::AutoClaim(engine::Context &ctx, const Slice &stream_name std::string ns_key = AppendNamespacePrefix(stream_name); StreamMetadata metadata(false); - LockGuard guard(storage_->GetLockManager(), ns_key); auto s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok()) { // not found will be caught by outside with no such key or consumer group return s; @@ -692,7 +689,6 @@ rocksdb::Status Stream::CreateGroup(engine::Context &ctx, const Slice &stream_na } std::string ns_key = AppendNamespacePrefix(stream_name); - LockGuard guard(storage_->GetLockManager(), ns_key); StreamMetadata metadata; rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok() && !s.IsNotFound()) { @@ -745,7 +741,6 @@ rocksdb::Status Stream::DestroyGroup(engine::Context &ctx, const Slice &stream_n *delete_cnt = 0; std::string ns_key = AppendNamespacePrefix(stream_name); - LockGuard guard(storage_->GetLockManager(), ns_key); StreamMetadata metadata; rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok() && !s.IsNotFound()) { @@ -849,14 +844,14 @@ rocksdb::Status Stream::createConsumerWithoutLock(engine::Context &ctx, const Sl rocksdb::Status Stream::CreateConsumer(engine::Context &ctx, const Slice &stream_name, const std::string &group_name, const std::string &consumer_name, int *created_number) { std::string ns_key = AppendNamespacePrefix(stream_name); - LockGuard guard(storage_->GetLockManager(), ns_key); + return createConsumerWithoutLock(ctx, stream_name, group_name, consumer_name, created_number); } rocksdb::Status Stream::DestroyConsumer(engine::Context &ctx, const Slice &stream_name, const std::string &group_name, const std::string &consumer_name, uint64_t &deleted_pel) { std::string ns_key = AppendNamespacePrefix(stream_name); - LockGuard guard(storage_->GetLockManager(), ns_key); + StreamMetadata metadata; rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok() && !s.IsNotFound()) { @@ -923,7 +918,7 @@ rocksdb::Status Stream::DestroyConsumer(engine::Context &ctx, const Slice &strea rocksdb::Status Stream::GroupSetId(engine::Context &ctx, const Slice &stream_name, const std::string &group_name, const StreamXGroupCreateOptions &options) { std::string ns_key = AppendNamespacePrefix(stream_name); - LockGuard guard(storage_->GetLockManager(), ns_key); + StreamMetadata metadata; rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok() && !s.IsNotFound()) { @@ -965,7 +960,6 @@ rocksdb::Status Stream::DeleteEntries(engine::Context &ctx, const Slice &stream_ std::string ns_key = AppendNamespacePrefix(stream_name); - LockGuard guard(storage_->GetLockManager(), ns_key); StreamMetadata metadata(false); rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok()) { @@ -1211,7 +1205,6 @@ rocksdb::Status Stream::GetStreamInfo(engine::Context &ctx, const rocksdb::Slice uint64_t count, StreamInfo *info) { std::string ns_key = AppendNamespacePrefix(stream_name); - LockGuard guard(storage_->GetLockManager(), ns_key); StreamMetadata metadata(false); rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok()) return s; @@ -1452,7 +1445,6 @@ rocksdb::Status Stream::RangeWithPending(engine::Context &ctx, const Slice &stre } std::string ns_key = AppendNamespacePrefix(stream_name); - LockGuard guard(storage_->GetLockManager(), ns_key); StreamMetadata metadata(false); rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); @@ -1582,8 +1574,6 @@ rocksdb::Status Stream::Trim(engine::Context &ctx, const Slice &stream_name, con std::string ns_key = AppendNamespacePrefix(stream_name); - LockGuard guard(storage_->GetLockManager(), ns_key); - StreamMetadata metadata(false); rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok()) { @@ -1696,8 +1686,6 @@ rocksdb::Status Stream::SetId(engine::Context &ctx, const Slice &stream_name, co std::string ns_key = AppendNamespacePrefix(stream_name); - LockGuard guard(storage_->GetLockManager(), ns_key); - StreamMetadata metadata(false); rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok() && !s.IsNotFound()) { diff --git a/src/types/redis_string.cc b/src/types/redis_string.cc index 7d78534256a..210fb697b91 100644 --- a/src/types/redis_string.cc +++ b/src/types/redis_string.cc @@ -118,7 +118,6 @@ rocksdb::Status String::Append(engine::Context &ctx, const std::string &user_key *new_size = 0; std::string ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); std::string raw_value; rocksdb::Status s = getRawValue(ctx, ns_key, &raw_value); if (!s.ok() && !s.IsNotFound()) return s; @@ -156,7 +155,6 @@ rocksdb::Status String::GetEx(engine::Context &ctx, const std::string &user_key, std::optional expire) { std::string ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); rocksdb::Status s = getValue(ctx, ns_key, value); if (!s.ok()) return s; @@ -190,7 +188,6 @@ rocksdb::Status String::GetSet(engine::Context &ctx, const std::string &user_key rocksdb::Status String::GetDel(engine::Context &ctx, const std::string &user_key, std::string *value) { std::string ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); rocksdb::Status s = getValue(ctx, ns_key, value); if (!s.ok()) return s; @@ -199,7 +196,7 @@ rocksdb::Status String::GetDel(engine::Context &ctx, const std::string &user_key rocksdb::Status String::Set(engine::Context &ctx, const std::string &user_key, const std::string &value) { std::vector pairs{StringPair{user_key, value}}; - return MSet(ctx, pairs, /*expire=*/0, /*lock=*/true); + return MSet(ctx, pairs, /*expire=*/0); } rocksdb::Status String::Set(engine::Context &ctx, const std::string &user_key, const std::string &value, @@ -207,7 +204,6 @@ rocksdb::Status String::Set(engine::Context &ctx, const std::string &user_key, c uint64_t expire = 0; std::string ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); bool need_old_value = args.type != StringSetType::NONE || args.get || args.keep_ttl; if (need_old_value) { std::string old_value; @@ -289,7 +285,6 @@ rocksdb::Status String::SetRange(engine::Context &ctx, const std::string &user_k const std::string &value, uint64_t *new_size) { std::string ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); std::string raw_value; rocksdb::Status s = getRawValue(ctx, ns_key, &raw_value); if (!s.ok() && !s.IsNotFound()) return s; @@ -329,7 +324,6 @@ rocksdb::Status String::IncrBy(engine::Context &ctx, const std::string &user_key int64_t *new_value) { std::string ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); std::string raw_value; rocksdb::Status s = getRawValue(ctx, ns_key, &raw_value); if (!s.ok() && !s.IsNotFound()) return s; @@ -366,7 +360,7 @@ rocksdb::Status String::IncrBy(engine::Context &ctx, const std::string &user_key rocksdb::Status String::IncrByFloat(engine::Context &ctx, const std::string &user_key, double increment, double *new_value) { std::string ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); + std::string raw_value; rocksdb::Status s = getRawValue(ctx, ns_key, &raw_value); if (!s.ok() && !s.IsNotFound()) return s; @@ -397,21 +391,7 @@ rocksdb::Status String::IncrByFloat(engine::Context &ctx, const std::string &use return updateRawValue(ctx, ns_key, raw_value); } -rocksdb::Status String::MSet(engine::Context &ctx, const std::vector &pairs, uint64_t expire_ms, - bool lock) { - // Data race, key string maybe overwrite by other key while didn't lock the keys here, - // to improve the set performance - std::optional guard; - if (lock) { - std::vector lock_keys; - lock_keys.reserve(pairs.size()); - for (const StringPair &pair : pairs) { - std::string ns_key = AppendNamespacePrefix(pair.key); - lock_keys.emplace_back(std::move(ns_key)); - } - guard.emplace(storage_->GetLockManager(), lock_keys); - } - +rocksdb::Status String::MSet(engine::Context &ctx, const std::vector &pairs, uint64_t expire_ms) { auto batch = storage_->GetWriteBatchBase(); WriteBatchLogData log_data(kRedisString); auto s = batch->PutLogData(log_data.Encode()); @@ -434,25 +414,19 @@ rocksdb::Status String::MSetNX(engine::Context &ctx, const std::vector lock_keys; - lock_keys.reserve(pairs.size()); std::vector keys; keys.reserve(pairs.size()); for (StringPair pair : pairs) { std::string ns_key = AppendNamespacePrefix(pair.key); - lock_keys.emplace_back(std::move(ns_key)); keys.emplace_back(pair.key); } - // Lock these keys before doing anything. - MultiLockGuard guard(storage_->GetLockManager(), lock_keys); - if (Exists(ctx, keys, &exists).ok() && exists > 0) { return rocksdb::Status::OK(); } - rocksdb::Status s = MSet(ctx, pairs, /*expire_ms=*/expire_ms, /*lock=*/false); + rocksdb::Status s = MSet(ctx, pairs, /*expire_ms=*/expire_ms); if (!s.ok()) return s; *flag = true; @@ -471,7 +445,6 @@ rocksdb::Status String::CAS(engine::Context &ctx, const std::string &user_key, c std::string current_value; std::string ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); rocksdb::Status s = getValue(ctx, ns_key, ¤t_value); if (!s.ok() && !s.IsNotFound()) { @@ -507,7 +480,6 @@ rocksdb::Status String::CAD(engine::Context &ctx, const std::string &user_key, c std::string current_value; std::string ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); rocksdb::Status s = getValue(ctx, ns_key, ¤t_value); if (!s.ok() && !s.IsNotFound()) { diff --git a/src/types/redis_string.h b/src/types/redis_string.h index b218bae346a..e5025d64fc6 100644 --- a/src/types/redis_string.h +++ b/src/types/redis_string.h @@ -100,8 +100,7 @@ class String : public Database { rocksdb::Status IncrByFloat(engine::Context &ctx, const std::string &user_key, double increment, double *new_value); std::vector MGet(engine::Context &ctx, const std::vector &keys, std::vector *values); - rocksdb::Status MSet(engine::Context &ctx, const std::vector &pairs, uint64_t expire_ms, - bool lock = true); + rocksdb::Status MSet(engine::Context &ctx, const std::vector &pairs, uint64_t expire_ms); rocksdb::Status MSetNX(engine::Context &ctx, const std::vector &pairs, uint64_t expire_ms, bool *flag); rocksdb::Status CAS(engine::Context &ctx, const std::string &user_key, const std::string &old_value, const std::string &new_value, uint64_t expire_ms, int *flag); diff --git a/src/types/redis_zset.cc b/src/types/redis_zset.cc index 4474b348e5b..5fa586d6160 100644 --- a/src/types/redis_zset.cc +++ b/src/types/redis_zset.cc @@ -42,7 +42,6 @@ rocksdb::Status ZSet::Add(engine::Context &ctx, const Slice &user_key, ZAddFlags std::string ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); ZSetMetadata metadata; rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok() && !s.IsNotFound()) return s; @@ -159,7 +158,6 @@ rocksdb::Status ZSet::Pop(engine::Context &ctx, const Slice &user_key, int count std::string ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); ZSetMetadata metadata(false); rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s; @@ -224,8 +222,6 @@ rocksdb::Status ZSet::RangeByRank(engine::Context &ctx, const Slice &user_key, c std::string ns_key = AppendNamespacePrefix(user_key); - std::optional lock_guard; - if (spec.with_deletion) lock_guard.emplace(storage_->GetLockManager(), ns_key); ZSetMetadata metadata(false); rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); @@ -306,8 +302,6 @@ rocksdb::Status ZSet::RangeByScore(engine::Context &ctx, const Slice &user_key, std::string ns_key = AppendNamespacePrefix(user_key); - std::optional lock_guard; - if (spec.with_deletion) lock_guard.emplace(storage_->GetLockManager(), ns_key); ZSetMetadata metadata(false); rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s; @@ -429,10 +423,6 @@ rocksdb::Status ZSet::RangeByLex(engine::Context &ctx, const Slice &user_key, co std::string ns_key = AppendNamespacePrefix(user_key); - std::optional lock_guard; - if (spec.with_deletion) { - lock_guard.emplace(storage_->GetLockManager(), ns_key); - } ZSetMetadata metadata(false); rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s; @@ -526,7 +516,6 @@ rocksdb::Status ZSet::Remove(engine::Context &ctx, const Slice &user_key, const *removed_cnt = 0; std::string ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); ZSetMetadata metadata(false); rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s; @@ -621,7 +610,6 @@ rocksdb::Status ZSet::Rank(engine::Context &ctx, const Slice &user_key, const Sl rocksdb::Status ZSet::Overwrite(engine::Context &ctx, const Slice &user_key, const MemberScores &mscores) { std::string ns_key = AppendNamespacePrefix(user_key); - LockGuard guard(storage_->GetLockManager(), ns_key); ZSetMetadata metadata; auto batch = storage_->GetWriteBatchBase(); WriteBatchLogData log_data(kRedisZSet); @@ -658,14 +646,6 @@ rocksdb::Status ZSet::InterStore(engine::Context &ctx, const Slice &dst, const s rocksdb::Status ZSet::Inter(engine::Context &ctx, const std::vector &keys_weights, AggregateMethod aggregate_method, std::vector *members) { - std::vector lock_keys; - lock_keys.reserve(keys_weights.size()); - for (const auto &key_weight : keys_weights) { - std::string ns_key = AppendNamespacePrefix(key_weight.key); - lock_keys.emplace_back(std::move(ns_key)); - } - MultiLockGuard guard(storage_->GetLockManager(), lock_keys); - std::map dst_zset; std::map member_counters; std::vector target_mscores; @@ -723,14 +703,6 @@ rocksdb::Status ZSet::Inter(engine::Context &ctx, const std::vector & rocksdb::Status ZSet::InterCard(engine::Context &ctx, const std::vector &user_keys, uint64_t limit, uint64_t *inter_cnt) { - std::vector lock_keys; - lock_keys.reserve(user_keys.size()); - for (const auto &user_key : user_keys) { - std::string ns_key = AppendNamespacePrefix(user_key); - lock_keys.emplace_back(std::move(ns_key)); - } - MultiLockGuard guard(storage_->GetLockManager(), lock_keys); - std::vector mscores_list; mscores_list.reserve(user_keys.size()); RangeScoreSpec spec; @@ -780,14 +752,6 @@ rocksdb::Status ZSet::UnionStore(engine::Context &ctx, const Slice &dst, const s rocksdb::Status ZSet::Union(engine::Context &ctx, const std::vector &keys_weights, AggregateMethod aggregate_method, std::vector *members) { - std::vector lock_keys; - lock_keys.reserve(keys_weights.size()); - for (const auto &key_weight : keys_weights) { - std::string ns_key = AppendNamespacePrefix(key_weight.key); - lock_keys.emplace_back(std::move(ns_key)); - } - MultiLockGuard guard(storage_->GetLockManager(), lock_keys); - std::map dst_zset; std::vector target_mscores; uint64_t target_size = 0; diff --git a/tests/gocase/unit/protocol/regression_test.go b/tests/gocase/unit/protocol/regression_test.go index 7dd4ea22b41..091bed26c29 100644 --- a/tests/gocase/unit/protocol/regression_test.go +++ b/tests/gocase/unit/protocol/regression_test.go @@ -23,7 +23,6 @@ import ( "context" "fmt" "testing" - "time" "github.com/apache/kvrocks/tests/gocase/util" "github.com/stretchr/testify/require" @@ -42,8 +41,6 @@ func TestRegression(t *testing.T) { proto := "*3\r\n$5\r\nBLPOP\r\n$6\r\nhandle\r\n$1\r\n0\r\n" require.NoError(t, c.Write(fmt.Sprintf("%s%s", proto, proto))) - // TODO: Remove time.Sleep after fix issue #2473 - time.Sleep(100 * time.Millisecond) resList := []string{"*2", "$6", "handle", "$1", "a"} v := rdb.RPush(ctx, "handle", "a") diff --git a/tests/gocase/unit/type/list/list_test.go b/tests/gocase/unit/type/list/list_test.go index 8b7a2c58f4e..f7509300236 100644 --- a/tests/gocase/unit/type/list/list_test.go +++ b/tests/gocase/unit/type/list/list_test.go @@ -376,22 +376,13 @@ func testList(t *testing.T, configs util.KvrocksServerConfigs) { rd := srv.NewTCPClient() defer func() { require.NoError(t, rd.Close()) }() createList("blist", []string{"a", "b", large, "c", "d"}) - // TODO: Remove time.Sleep after fix issue #2473 - time.Sleep(100 * time.Millisecond) require.NoError(t, rd.WriteArgs("blpop", "blist", "1")) - time.Sleep(100 * time.Millisecond) rd.MustReadStrings(t, []string{"blist", "a"}) - time.Sleep(100 * time.Millisecond) require.NoError(t, rd.WriteArgs("brpop", "blist", "1")) - time.Sleep(100 * time.Millisecond) rd.MustReadStrings(t, []string{"blist", "d"}) - time.Sleep(100 * time.Millisecond) require.NoError(t, rd.WriteArgs("blpop", "blist", "1")) - time.Sleep(100 * time.Millisecond) rd.MustReadStrings(t, []string{"blist", "b"}) - time.Sleep(100 * time.Millisecond) require.NoError(t, rd.WriteArgs("brpop", "blist", "1")) - time.Sleep(100 * time.Millisecond) rd.MustReadStrings(t, []string{"blist", "c"}) }) @@ -400,23 +391,15 @@ func testList(t *testing.T, configs util.KvrocksServerConfigs) { defer func() { require.NoError(t, rd.Close()) }() createList("blist1", []string{"a", large, "c"}) createList("blist2", []string{"d", large, "f"}) - time.Sleep(100 * time.Millisecond) require.NoError(t, rd.WriteArgs("blpop", "blist1", "blist2", "1")) - time.Sleep(100 * time.Millisecond) rd.MustReadStrings(t, []string{"blist1", "a"}) - time.Sleep(100 * time.Millisecond) require.NoError(t, rd.WriteArgs("brpop", "blist1", "blist2", "1")) - time.Sleep(100 * time.Millisecond) rd.MustReadStrings(t, []string{"blist1", "c"}) require.EqualValues(t, 1, rdb.LLen(ctx, "blist1").Val()) require.EqualValues(t, 3, rdb.LLen(ctx, "blist2").Val()) - time.Sleep(100 * time.Millisecond) require.NoError(t, rd.WriteArgs("blpop", "blist2", "blist2", "1")) - time.Sleep(100 * time.Millisecond) rd.MustReadStrings(t, []string{"blist2", "d"}) - time.Sleep(100 * time.Millisecond) require.NoError(t, rd.WriteArgs("brpop", "blist2", "blist2", "1")) - time.Sleep(100 * time.Millisecond) rd.MustReadStrings(t, []string{"blist2", "f"}) require.EqualValues(t, 1, rdb.LLen(ctx, "blist1").Val()) require.EqualValues(t, 1, rdb.LLen(ctx, "blist2").Val()) @@ -427,13 +410,9 @@ func testList(t *testing.T, configs util.KvrocksServerConfigs) { defer func() { require.NoError(t, rd.Close()) }() require.NoError(t, rdb.Del(ctx, "blist1").Err()) createList("blist2", []string{"d", large, "f"}) - time.Sleep(100 * time.Millisecond) require.NoError(t, rd.WriteArgs("blpop", "blist1", "blist2", "1")) - time.Sleep(100 * time.Millisecond) rd.MustReadStrings(t, []string{"blist2", "d"}) - time.Sleep(100 * time.Millisecond) require.NoError(t, rd.WriteArgs("brpop", "blist1", "blist2", "1")) - time.Sleep(100 * time.Millisecond) rd.MustReadStrings(t, []string{"blist2", "f"}) require.EqualValues(t, 0, rdb.LLen(ctx, "blist1").Val()) require.EqualValues(t, 1, rdb.LLen(ctx, "blist2").Val()) @@ -444,25 +423,17 @@ func testList(t *testing.T, configs util.KvrocksServerConfigs) { rd := srv.NewTCPClient() defer func() { require.NoError(t, rd.Close()) }() require.NoError(t, rdb.Del(ctx, "list1", "list2").Err()) - time.Sleep(time.Millisecond * 100) require.NoError(t, rd.WriteArgs("blpop", "list1", "list2", "list2", "list1", "0")) - time.Sleep(time.Millisecond * 100) require.NoError(t, rdb.LPush(ctx, "list1", "a").Err()) rd.MustReadStrings(t, []string{"list1", "a"}) - time.Sleep(time.Millisecond * 100) require.NoError(t, rd.WriteArgs("blpop", "list1", "list2", "list2", "list1", "0")) - time.Sleep(time.Millisecond * 100) require.NoError(t, rdb.LPush(ctx, "list2", "b").Err()) rd.MustReadStrings(t, []string{"list2", "b"}) require.NoError(t, rdb.LPush(ctx, "list1", "a").Err()) require.NoError(t, rdb.LPush(ctx, "list2", "b").Err()) - time.Sleep(time.Millisecond * 100) require.NoError(t, rd.WriteArgs("blpop", "list1", "list2", "list2", "list1", "0")) - time.Sleep(time.Millisecond * 100) rd.MustReadStrings(t, []string{"list1", "a"}) - time.Sleep(time.Millisecond * 100) require.NoError(t, rd.WriteArgs("blpop", "list1", "list2", "list2", "list1", "0")) - time.Sleep(time.Millisecond * 100) rd.MustReadStrings(t, []string{"list2", "b"}) }) @@ -470,9 +441,7 @@ func testList(t *testing.T, configs util.KvrocksServerConfigs) { rd := srv.NewTCPClient() defer func() { require.NoError(t, rd.Close()) }() require.NoError(t, rdb.Del(ctx, "blist", "target").Err()) - time.Sleep(time.Millisecond * 100) require.NoError(t, rd.WriteArgs("blpop", "blist", "0")) - time.Sleep(time.Millisecond * 100) require.EqualValues(t, 2, rdb.LPush(ctx, "blist", "foo", "bar").Val()) rd.MustReadStrings(t, []string{"blist", "bar"}) require.Equal(t, "foo", rdb.LRange(ctx, "blist", 0, -1).Val()[0]) @@ -483,9 +452,7 @@ func testList(t *testing.T, configs util.KvrocksServerConfigs) { rd := srv.NewTCPClient() defer func() { require.NoError(t, rd.Close()) }() require.NoError(t, rdb.Del(ctx, "blist1").Err()) - time.Sleep(100 * time.Millisecond) require.NoError(t, rd.WriteArgs(popType, "blist1", "0")) - time.Sleep(100 * time.Millisecond) require.NoError(t, rdb.RPush(ctx, "blist1", "foo").Err()) rd.MustReadStrings(t, []string{"blist1", "foo"}) require.EqualValues(t, 0, rdb.Exists(ctx, "blist1").Val()) @@ -495,7 +462,6 @@ func testList(t *testing.T, configs util.KvrocksServerConfigs) { rd := srv.NewTCPClient() defer func() { require.NoError(t, rd.Close()) }() require.NoError(t, rd.WriteArgs(popType, "blist1", "-1")) - time.Sleep(100 * time.Millisecond) rd.MustMatch(t, ".*negative.*") }) @@ -505,7 +471,6 @@ func testList(t *testing.T, configs util.KvrocksServerConfigs) { rd := srv.NewTCPClient() defer func() { require.NoError(t, rd.Close()) }() require.NoError(t, rd.WriteArgs(popType, "blist1", "0")) - time.Sleep(time.Millisecond * 1000) require.NoError(t, rdb.RPush(ctx, "blist1", "foo").Err()) rd.MustReadStrings(t, []string{"blist1", "foo"}) }) @@ -515,7 +480,6 @@ func testList(t *testing.T, configs util.KvrocksServerConfigs) { defer func() { require.NoError(t, rd.Close()) }() require.NoError(t, rdb.Del(ctx, "blist1", "blist2").Err()) require.NoError(t, rdb.Set(ctx, "blist2", "nolist", 0).Err()) - time.Sleep(100 * time.Millisecond) require.NoError(t, rd.WriteArgs(popType, "blist1", "blist2", "1")) rd.MustMatch(t, ".*WRONGTYPE.*") }) @@ -524,7 +488,6 @@ func testList(t *testing.T, configs util.KvrocksServerConfigs) { rd := srv.NewTCPClient() defer func() { require.NoError(t, rd.Close()) }() require.NoError(t, rdb.Del(ctx, "blist1", "blist2").Err()) - time.Sleep(100 * time.Millisecond) require.NoError(t, rd.WriteArgs(popType, "blist1", "blist2", "1")) rd.MustMatch(t, "") }) @@ -533,16 +496,12 @@ func testList(t *testing.T, configs util.KvrocksServerConfigs) { rd := srv.NewTCPClient() defer func() { require.NoError(t, rd.Close()) }() require.NoError(t, rdb.Del(ctx, "blist1", "blist2").Err()) - time.Sleep(100 * time.Millisecond) require.NoError(t, rd.WriteArgs(popType, "blist1", "blist2", "4")) - time.Sleep(100 * time.Millisecond) require.NoError(t, rdb.RPush(ctx, "blist1", "foo").Err()) rd.MustReadStrings(t, []string{"blist1", "foo"}) require.EqualValues(t, 0, rdb.Exists(ctx, "blist1").Val()) require.EqualValues(t, 0, rdb.Exists(ctx, "blist2").Val()) - time.Sleep(100 * time.Millisecond) require.NoError(t, rd.WriteArgs(popType, "blist1", "blist2", "1")) - time.Sleep(100 * time.Millisecond) require.NoError(t, rdb.RPush(ctx, "blist2", "foo").Err()) rd.MustReadStrings(t, []string{"blist2", "foo"}) require.EqualValues(t, 0, rdb.Exists(ctx, "blist1").Val()) @@ -950,9 +909,7 @@ func testList(t *testing.T, configs util.KvrocksServerConfigs) { require.NoError(t, rdb.Del(ctx, "target_key{t}").Err()) require.NoError(t, rdb.RPush(ctx, "target_key{t}", 1).Err()) createList("list{t}", []string{"a", "b", "c", "d"}) - time.Sleep(100 * time.Millisecond) require.NoError(t, rd.WriteArgs("lmove", "list{t}", "target_key{t}", from, to)) - time.Sleep(100 * time.Millisecond) r, err1 := rd.ReadLine() require.Equal(t, "$1", r) require.NoError(t, err1) @@ -998,9 +955,7 @@ func testList(t *testing.T, configs util.KvrocksServerConfigs) { require.NoError(t, rdb.Del(ctx, "target_key{t}").Err()) require.NoError(t, rdb.RPush(ctx, "target_key{t}", 1).Err()) createList("list{t}", []string{"a", "b", "c", "d"}) - time.Sleep(100 * time.Millisecond) require.NoError(t, rd.WriteArgs("blmove", "list{t}", "target_key{t}", from, to, "1")) - time.Sleep(100 * time.Millisecond) r, err1 := rd.ReadLine() require.Equal(t, "$1", r) require.NoError(t, err1) @@ -1026,9 +981,7 @@ func testList(t *testing.T, configs util.KvrocksServerConfigs) { rd := srv.NewTCPClient() defer func() { require.NoError(t, rd.Close()) }() require.NoError(t, rdb.Del(ctx, "blist", "target").Err()) - time.Sleep(100 * time.Millisecond) require.NoError(t, rd.WriteArgs("blmove", "blist", "target", "left", "right", "0")) - time.Sleep(100 * time.Millisecond) require.EqualValues(t, 2, rdb.LPush(ctx, "blist", "foo", "bar").Val()) rd.MustRead(t, "$3") require.Equal(t, "bar", rdb.LRange(ctx, "target", 0, -1).Val()[0]) @@ -1436,9 +1389,7 @@ func testList(t *testing.T, configs util.KvrocksServerConfigs) { rd := srv.NewTCPClient() defer func() { require.NoError(t, rd.Close()) }() require.NoError(t, rdb.Del(ctx, key1, key2).Err()) - time.Sleep(100 * time.Millisecond) require.NoError(t, rd.WriteArgs("blmpop", "1", "1", key1, direction, "count", "1")) - time.Sleep(time.Millisecond * 100) require.NoError(t, rdb.RPush(ctx, key1, "ONE", "TWO").Err()) if direction == "LEFT" { rd.MustReadStringsWithKey(t, key1, []string{"ONE"}) @@ -1452,9 +1403,7 @@ func testList(t *testing.T, configs util.KvrocksServerConfigs) { rd := srv.NewTCPClient() defer func() { require.NoError(t, rd.Close()) }() require.NoError(t, rdb.Del(ctx, key1, key2).Err()) - time.Sleep(100 * time.Millisecond) require.NoError(t, rd.WriteArgs("blmpop", "1", "1", key1, direction, "count", "2")) - time.Sleep(time.Millisecond * 100) require.NoError(t, rdb.RPush(ctx, key1, "ONE", "TWO").Err()) if direction == "LEFT" { rd.MustReadStringsWithKey(t, key1, []string{"ONE", "TWO"}) @@ -1468,9 +1417,7 @@ func testList(t *testing.T, configs util.KvrocksServerConfigs) { rd := srv.NewTCPClient() defer func() { require.NoError(t, rd.Close()) }() require.NoError(t, rdb.Del(ctx, key1, key2).Err()) - time.Sleep(100 * time.Millisecond) require.NoError(t, rd.WriteArgs("blmpop", "1", "1", key1, direction, "count", "10")) - time.Sleep(time.Millisecond * 100) require.NoError(t, rdb.RPush(ctx, key1, "ONE", "TWO").Err()) if direction == "LEFT" { rd.MustReadStringsWithKey(t, key1, []string{"ONE", "TWO"}) @@ -1484,9 +1431,7 @@ func testList(t *testing.T, configs util.KvrocksServerConfigs) { rd := srv.NewTCPClient() defer func() { require.NoError(t, rd.Close()) }() require.NoError(t, rdb.Del(ctx, key1, key2).Err()) - time.Sleep(100 * time.Millisecond) require.NoError(t, rd.WriteArgs("blmpop", "1", "2", key1, key2, direction, "count", "2")) - time.Sleep(time.Millisecond * 100) require.NoError(t, rdb.RPush(ctx, key1, "ONE", "TWO").Err()) if direction == "LEFT" { rd.MustReadStringsWithKey(t, key1, []string{"ONE", "TWO"}) @@ -1500,9 +1445,7 @@ func testList(t *testing.T, configs util.KvrocksServerConfigs) { rd := srv.NewTCPClient() defer func() { require.NoError(t, rd.Close()) }() require.NoError(t, rdb.Del(ctx, key1, key2).Err()) - time.Sleep(100 * time.Millisecond) require.NoError(t, rd.WriteArgs("blmpop", "1", "2", key1, key2, direction, "count", "2")) - time.Sleep(time.Millisecond * 100) require.NoError(t, rdb.RPush(ctx, key2, "one", "two").Err()) if direction == "LEFT" { rd.MustReadStringsWithKey(t, key2, []string{"one", "two"}) @@ -1516,9 +1459,10 @@ func testList(t *testing.T, configs util.KvrocksServerConfigs) { rd := srv.NewTCPClient() defer func() { require.NoError(t, rd.Close()) }() require.NoError(t, rdb.Del(ctx, key1, key2).Err()) - time.Sleep(100 * time.Millisecond) require.NoError(t, rd.WriteArgs("blmpop", "1", "2", key1, key2, direction, "count", "2")) - time.Sleep(time.Millisecond * 100) + // https://github.com/apache/kvrocks/issues/2617 + // WriteArgs are required to be executed first + time.Sleep(100 * time.Millisecond) require.NoError(t, rdb.RPush(ctx, key2, "one", "two").Err()) require.NoError(t, rdb.RPush(ctx, key1, "ONE", "TWO").Err()) if direction == "LEFT" { @@ -1534,9 +1478,7 @@ func testList(t *testing.T, configs util.KvrocksServerConfigs) { rd := srv.NewTCPClient() defer func() { require.NoError(t, rd.Close()) }() require.NoError(t, rdb.Del(ctx, key1, key2).Err()) - time.Sleep(100 * time.Millisecond) require.NoError(t, rd.WriteArgs("blmpop", "1", "2", key1, key2, direction)) - time.Sleep(time.Millisecond * 100) require.NoError(t, rdb.RPush(ctx, key2, "one", "two").Err()) if direction == "LEFT" { rd.MustReadStringsWithKey(t, key2, []string{"one"}) @@ -1551,9 +1493,7 @@ func testList(t *testing.T, configs util.KvrocksServerConfigs) { rd := srv.NewTCPClient() defer func() { require.NoError(t, rd.Close()) }() require.NoError(t, rdb.Del(ctx, key1, key2).Err()) - time.Sleep(100 * time.Millisecond) require.NoError(t, rd.WriteArgs("blmpop", "1", "2", key1, key2, direction)) - time.Sleep(time.Millisecond * 1200) rd.MustMatch(t, "") }) @@ -1562,9 +1502,7 @@ func testList(t *testing.T, configs util.KvrocksServerConfigs) { rd := srv.NewTCPClient() defer func() { require.NoError(t, rd.Close()) }() require.NoError(t, rdb.Del(ctx, key1, key2).Err()) - time.Sleep(100 * time.Millisecond) require.NoError(t, rd.WriteArgs("blmpop", "0", "2", key1, key2, direction, "count", "2")) - time.Sleep(time.Millisecond * 1200) require.NoError(t, rdb.RPush(ctx, key2, "one", "two").Err()) if direction == "LEFT" { rd.MustReadStringsWithKey(t, key2, []string{"one", "two"}) diff --git a/tests/gocase/unit/type/zset/zset_test.go b/tests/gocase/unit/type/zset/zset_test.go index ad57defaa85..fce7e96a652 100644 --- a/tests/gocase/unit/type/zset/zset_test.go +++ b/tests/gocase/unit/type/zset/zset_test.go @@ -332,8 +332,6 @@ func basicTests(t *testing.T, rdb *redis.Client, ctx context.Context, enabledRES rdb.ZAdd(ctx, "zsetb", redis.Z{Score: 1, Member: "d"}, redis.Z{Score: 2, Member: "e"}) require.EqualValues(t, 3, rdb.ZCard(ctx, "zseta").Val()) require.EqualValues(t, 2, rdb.ZCard(ctx, "zsetb").Val()) - // TODO: Remove time.Sleep after fix issue #2473 - time.Sleep(time.Millisecond * 100) resultz := rdb.BZPopMin(ctx, 0, "zseta", "zsetb").Val().Z require.Equal(t, redis.Z{Score: 1, Member: "a"}, resultz) resultz = rdb.BZPopMin(ctx, 0, "zseta", "zsetb").Val().Z @@ -349,9 +347,7 @@ func basicTests(t *testing.T, rdb *redis.Client, ctx context.Context, enabledRES rd := srv.NewTCPClient() defer func() { require.NoError(t, rd.Close()) }() - time.Sleep(time.Millisecond * 100) require.NoError(t, rd.WriteArgs("bzpopmin", "zseta", "0")) - time.Sleep(time.Millisecond * 100) rdb.ZAdd(ctx, "zseta", redis.Z{Score: 1, Member: "a"}) rd.MustReadStrings(t, []string{"zseta", "a", "1"}) }) @@ -363,7 +359,6 @@ func basicTests(t *testing.T, rdb *redis.Client, ctx context.Context, enabledRES rdb.ZAdd(ctx, "zsetb", redis.Z{Score: 1, Member: "d"}, redis.Z{Score: 2, Member: "e"}) require.EqualValues(t, 3, rdb.ZCard(ctx, "zseta").Val()) require.EqualValues(t, 2, rdb.ZCard(ctx, "zsetb").Val()) - time.Sleep(time.Millisecond * 100) resultz := rdb.BZPopMax(ctx, 0, "zseta", "zsetb").Val().Z require.Equal(t, redis.Z{Score: 3, Member: "c"}, resultz) resultz = rdb.BZPopMax(ctx, 0, "zseta", "zsetb").Val().Z @@ -379,9 +374,7 @@ func basicTests(t *testing.T, rdb *redis.Client, ctx context.Context, enabledRES rd := srv.NewTCPClient() defer func() { require.NoError(t, rd.Close()) }() - time.Sleep(time.Millisecond * 100) require.NoError(t, rd.WriteArgs("bzpopmax", "zseta", "0")) - time.Sleep(time.Millisecond * 100) rdb.ZAdd(ctx, "zseta", redis.Z{Score: 1, Member: "a"}) rd.MustReadStrings(t, []string{"zseta", "a", "1"}) }) From c01b00e61e0b913733bc2bc2b0945a9003a82aa1 Mon Sep 17 00:00:00 2001 From: mwish Date: Tue, 5 Nov 2024 10:18:36 +0800 Subject: [PATCH 3/3] fix(bugfix): for txn_write_batch creation (#2648) Co-authored-by: Twice --- src/storage/storage.cc | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/storage/storage.cc b/src/storage/storage.cc index 2eead08ace6..234309f5f1a 100644 --- a/src/storage/storage.cc +++ b/src/storage/storage.cc @@ -868,9 +868,11 @@ Status Storage::BeginTxn() { // The EXEC command is exclusive and shouldn't have multi transaction at the same time, // so it's fine to reset the global write batch without any lock. is_txn_mode_ = true; - txn_write_batch_ = - std::make_unique(rocksdb::BytewiseComparator() /*default backup_index_comparator */, - 0 /* default reserved_bytes*/, GetWriteBatchMaxBytes()); + // Set overwrite_key to false to avoid overwriting the existing key in case + // like downstream would parse the replication log etc. + txn_write_batch_ = std::make_unique( + /*backup_index_comparator=*/rocksdb::BytewiseComparator(), + /*reserved_bytes=*/0, /*overwrite_key=*/false, /*max_bytes=*/GetWriteBatchMaxBytes()); return Status::OK(); }