Skip to content

Commit

Permalink
Merge branch 'unstable' into minor-enhancement
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU committed Nov 5, 2024
2 parents 0440925 + c01b00e commit 88fc16f
Show file tree
Hide file tree
Showing 24 changed files with 114 additions and 259 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/kvrocks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Check typos
uses: crate-ci/typos@v1.25.0
uses: crate-ci/typos@v1.27.0
with:
config: .github/config/typos.toml

Expand Down
11 changes: 10 additions & 1 deletion src/commands/blocking_commander.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#pragma once

#include "commander.h"
#include "common/lock_manager.h"
#include "event_util.h"
#include "server/redis_connection.h"

Expand All @@ -44,6 +45,10 @@ class BlockingCommander : public Commander,
// in other words, returning true indicates ending the blocking
virtual bool OnBlockingWrite() = 0;

// GetLocks() locks the keys of the BlockingCommander with MultiLockGuard.
// When OnWrite() is triggered, BlockingCommander needs to relock the keys.
virtual MultiLockGuard GetLocks() = 0;

// to start the blocking process
// usually put to the end of the Execute method
Status StartBlocking(int64_t timeout, std::string *output) {
Expand All @@ -63,7 +68,11 @@ class BlockingCommander : public Commander,
}

void OnWrite(bufferevent *bev) {
bool done = OnBlockingWrite();
bool done{false};
{
auto guard = GetLocks();
done = OnBlockingWrite();
}

if (!done) {
// The connection may be waked up but can't pop from the datatype.
Expand Down
29 changes: 29 additions & 0 deletions src/commands/cmd_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,16 @@ class CommandBPop : public BlockingCommander {
return s;
}

MultiLockGuard GetLocks() override {
std::vector<std::string> lock_keys;
lock_keys.reserve(keys_.size());
for (const auto &key : keys_) {
auto ns_key = ComposeNamespaceKey(conn_->GetNamespace(), key, srv_->storage->IsSlotIdEncoded());
lock_keys.emplace_back(std::move(ns_key));
}
return MultiLockGuard(srv_->storage->GetLockManager(), lock_keys);
}

bool OnBlockingWrite() override {
engine::Context ctx(srv_->storage);
auto s = TryPopFromList(ctx);
Expand Down Expand Up @@ -436,6 +446,16 @@ class CommandBLMPop : public BlockingCommander {
}
}

MultiLockGuard GetLocks() override {
std::vector<std::string> lock_keys;
lock_keys.reserve(keys_.size());
for (const auto &key : keys_) {
auto ns_key = ComposeNamespaceKey(conn_->GetNamespace(), key, srv_->storage->IsSlotIdEncoded());
lock_keys.emplace_back(std::move(ns_key));
}
return MultiLockGuard(srv_->storage->GetLockManager(), lock_keys);
}

bool OnBlockingWrite() override {
engine::Context ctx(srv_->storage);
auto s = ExecuteUnblocked(ctx);
Expand Down Expand Up @@ -767,6 +787,15 @@ class CommandBLMove : public BlockingCommander {

void UnblockKeys() override { srv_->UnblockOnKey(args_[1], conn_); }

MultiLockGuard GetLocks() override {
std::vector<std::string> lock_keys{
ComposeNamespaceKey(conn_->GetNamespace(), args_[1], srv_->storage->IsSlotIdEncoded())};
if (args_[1] != args_[2]) {
lock_keys.emplace_back(ComposeNamespaceKey(conn_->GetNamespace(), args_[2], srv_->storage->IsSlotIdEncoded()));
}
return MultiLockGuard(srv_->storage->GetLockManager(), lock_keys);
}

bool OnBlockingWrite() override {
redis::List list_db(srv_->storage, conn_->GetNamespace());
std::string elem;
Expand Down
8 changes: 8 additions & 0 deletions src/commands/cmd_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1635,6 +1635,14 @@ class CommandXReadGroup : public Commander,
redis::Stream stream_db(srv_->storage, conn_->GetNamespace());

std::vector<StreamReadResult> results;

std::vector<std::string> lock_keys;
lock_keys.reserve(streams_.size());
for (auto &stream_name : streams_) {
auto ns_key = stream_db.AppendNamespacePrefix(stream_name);
lock_keys.emplace_back(std::move(ns_key));
}
MultiLockGuard guard(srv_->storage->GetLockManager(), lock_keys);
engine::Context ctx(srv_->storage);
for (size_t i = 0; i < streams_.size(); ++i) {
redis::StreamRangeOptions options;
Expand Down
20 changes: 20 additions & 0 deletions src/commands/cmd_zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,16 @@ class CommandBZPop : public BlockingCommander {
conn_->Reply(output);
}

MultiLockGuard GetLocks() override {
std::vector<std::string> lock_keys;
lock_keys.reserve(keys_.size());
for (const auto &key : keys_) {
auto ns_key = ComposeNamespaceKey(conn_->GetNamespace(), key, srv_->storage->IsSlotIdEncoded());
lock_keys.emplace_back(std::move(ns_key));
}
return MultiLockGuard(srv_->storage->GetLockManager(), lock_keys);
}

bool OnBlockingWrite() override {
std::string user_key;
std::vector<MemberScore> member_scores;
Expand Down Expand Up @@ -548,6 +558,16 @@ class CommandBZMPop : public BlockingCommander {

std::string NoopReply(const Connection *conn) override { return conn->NilString(); }

MultiLockGuard GetLocks() override {
std::vector<std::string> lock_keys;
lock_keys.reserve(keys_.size());
for (const auto &key : keys_) {
auto ns_key = ComposeNamespaceKey(conn_->GetNamespace(), key, srv_->storage->IsSlotIdEncoded());
lock_keys.emplace_back(std::move(ns_key));
}
return MultiLockGuard(srv_->storage->GetLockManager(), lock_keys);
}

bool OnBlockingWrite() override {
std::string user_key;
std::vector<MemberScore> member_scores;
Expand Down
18 changes: 16 additions & 2 deletions src/server/redis_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -489,9 +489,24 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {

SetLastCmd(cmd_name);
{
std::optional<MultiLockGuard> guard;
if (cmd_flags & kCmdWrite) {
std::vector<std::string> lock_keys;
attributes->ForEachKeyRange(
[&lock_keys, this](const std::vector<std::string> &args, const CommandKeyRange &key_range) {
key_range.ForEachKey(
[&, this](const std::string &key) {
auto ns_key = ComposeNamespaceKey(ns_, key, srv_->storage->IsSlotIdEncoded());
lock_keys.emplace_back(std::move(ns_key));
},
args);
},
cmd_tokens);

guard.emplace(srv_->storage->GetLockManager(), lock_keys);
}
engine::Context ctx(srv_->storage);

// TODO: transaction support for index recording
std::vector<GlobalIndexer::RecordResult> index_records;
if (!srv_->index_mgr.index_map.empty() && IsCmdForIndexing(cmd_flags, attributes->category) &&
!config->cluster_enabled) {
Expand All @@ -512,7 +527,6 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
}

s = ExecuteCommand(ctx, cmd_name, cmd_tokens, current_cmd.get(), &reply);
// TODO: transaction support for index updating
for (const auto &record : index_records) {
auto s = GlobalIndexer::Update(ctx, record);
if (!s.IsOK() && !s.Is<Status::TypeMismatched>()) {
Expand Down
20 changes: 8 additions & 12 deletions src/storage/redis_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ rocksdb::Status Database::Expire(engine::Context &ctx, const Slice &user_key, ui

std::string value;
Metadata metadata(kRedisNone, false);
LockGuard guard(storage_->GetLockManager(), ns_key);

rocksdb::Status s = storage_->Get(ctx, ctx.GetReadOptions(), metadata_cf_handle_, ns_key, &value);
if (!s.ok()) return s;

Expand Down Expand Up @@ -150,7 +150,7 @@ rocksdb::Status Database::Del(engine::Context &ctx, const Slice &user_key) {
std::string ns_key = AppendNamespacePrefix(user_key);

std::string value;
LockGuard guard(storage_->GetLockManager(), ns_key);

rocksdb::Status s = storage_->Get(ctx, ctx.GetReadOptions(), metadata_cf_handle_, ns_key, &value);
if (!s.ok()) return s;
Metadata metadata(kRedisNone, false);
Expand All @@ -165,13 +165,12 @@ rocksdb::Status Database::Del(engine::Context &ctx, const Slice &user_key) {
rocksdb::Status Database::MDel(engine::Context &ctx, const std::vector<Slice> &keys, uint64_t *deleted_cnt) {
*deleted_cnt = 0;

std::vector<std::string> lock_keys;
lock_keys.reserve(keys.size());
std::vector<std::string> ns_keys;
ns_keys.reserve(keys.size());
for (const auto &key : keys) {
std::string ns_key = AppendNamespacePrefix(key);
lock_keys.emplace_back(std::move(ns_key));
ns_keys.emplace_back(std::move(ns_key));
}
MultiLockGuard guard(storage_->GetLockManager(), lock_keys);

auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisNone);
Expand All @@ -181,8 +180,8 @@ rocksdb::Status Database::MDel(engine::Context &ctx, const std::vector<Slice> &k
}

std::vector<Slice> slice_keys;
slice_keys.reserve(lock_keys.size());
for (const auto &ns_key : lock_keys) {
slice_keys.reserve(ns_keys.size());
for (const auto &ns_key : ns_keys) {
slice_keys.emplace_back(ns_key);
}

Expand All @@ -202,7 +201,7 @@ rocksdb::Status Database::MDel(engine::Context &ctx, const std::vector<Slice> &k
if (!s.ok()) continue;
if (metadata.Expired()) continue;

s = batch->Delete(metadata_cf_handle_, lock_keys[i]);
s = batch->Delete(metadata_cf_handle_, ns_keys[i]);
if (!s.ok()) return s;
*deleted_cnt += 1;
}
Expand Down Expand Up @@ -652,9 +651,6 @@ rocksdb::Status Database::typeInternal(engine::Context &ctx, const Slice &key, R

rocksdb::Status Database::Copy(engine::Context &ctx, const std::string &key, const std::string &new_key, bool nx,
bool delete_old, CopyResult *res) {
std::vector<std::string> lock_keys = {key, new_key};
MultiLockGuard guard(storage_->GetLockManager(), lock_keys);

RedisType type = kRedisNone;
auto s = typeInternal(ctx, key, &type);
if (!s.ok()) return s;
Expand Down
8 changes: 5 additions & 3 deletions src/storage/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -871,9 +871,11 @@ Status Storage::BeginTxn() {
// The EXEC command is exclusive and shouldn't have multi transaction at the same time,
// so it's fine to reset the global write batch without any lock.
is_txn_mode_ = true;
txn_write_batch_ =
std::make_unique<rocksdb::WriteBatchWithIndex>(/*backup_index_comparator=*/rocksdb::BytewiseComparator(),
/*reserved_bytes=*/0, GetWriteBatchMaxBytes());
// Set overwrite_key to false to avoid overwriting the existing key in case
// like downstream would parse the replication log etc.
txn_write_batch_ = std::make_unique<rocksdb::WriteBatchWithIndex>(
/*backup_index_comparator=*/rocksdb::BytewiseComparator(),
/*reserved_bytes=*/0, /*overwrite_key=*/false, /*max_bytes=*/GetWriteBatchMaxBytes());
return Status::OK();
}

Expand Down
10 changes: 1 addition & 9 deletions src/types/redis_bitmap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ rocksdb::Status Bitmap::SetBit(engine::Context &ctx, const Slice &user_key, uint
std::string raw_value;
std::string ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);
BitmapMetadata metadata;
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata, &raw_value);
if (!s.ok() && !s.IsNotFound()) return s;
Expand Down Expand Up @@ -461,7 +460,6 @@ rocksdb::Status Bitmap::BitOp(engine::Context &ctx, BitOpFlags op_flag, const st
const Slice &user_key, const std::vector<Slice> &op_keys, int64_t *len) {
std::string raw_value;
std::string ns_key = AppendNamespacePrefix(user_key);
LockGuard guard(storage_->GetLockManager(), ns_key);

std::vector<std::pair<std::string, BitmapMetadata>> meta_pairs;
uint64_t max_bitmap_size = 0;
Expand Down Expand Up @@ -824,15 +822,9 @@ template <bool ReadOnly>
rocksdb::Status Bitmap::bitfield(engine::Context &ctx, const Slice &user_key, const std::vector<BitfieldOperation> &ops,
std::vector<std::optional<BitfieldValue>> *rets) {
std::string ns_key = AppendNamespacePrefix(user_key);

std::optional<LockGuard> guard;
if constexpr (!ReadOnly) {
guard = LockGuard(storage_->GetLockManager(), ns_key);
}

BitmapMetadata metadata;
std::string raw_value;
// TODO(mwish): maintain snapshot for read-only bitfield.

auto s = GetMetadata(ctx, ns_key, &metadata, &raw_value);
if (!s.ok() && !s.IsNotFound()) {
return s;
Expand Down
2 changes: 0 additions & 2 deletions src/types/redis_bloom_chain.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ rocksdb::Status BloomChain::Reserve(engine::Context &ctx, const Slice &user_key,
uint16_t expansion) {
std::string ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);
BloomChainMetadata bloom_chain_metadata;
rocksdb::Status s = getBloomChainMetadata(ctx, ns_key, &bloom_chain_metadata);
if (!s.ok() && !s.IsNotFound()) return s;
Expand Down Expand Up @@ -156,7 +155,6 @@ rocksdb::Status BloomChain::InsertCommon(engine::Context &ctx, const Slice &user
const BloomFilterInsertOptions &insert_options,
std::vector<BloomFilterAddResult> *rets) {
std::string ns_key = AppendNamespacePrefix(user_key);
LockGuard guard(storage_->GetLockManager(), ns_key);

BloomChainMetadata metadata;
rocksdb::Status s = getBloomChainMetadata(ctx, ns_key, &metadata);
Expand Down
5 changes: 1 addition & 4 deletions src/types/redis_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ rocksdb::Status Hash::IncrBy(engine::Context &ctx, const Slice &user_key, const

std::string ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);
HashMetadata metadata;
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
Expand Down Expand Up @@ -117,7 +116,6 @@ rocksdb::Status Hash::IncrByFloat(engine::Context &ctx, const Slice &user_key, c

std::string ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);
HashMetadata metadata;
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
Expand Down Expand Up @@ -211,7 +209,7 @@ rocksdb::Status Hash::Delete(engine::Context &ctx, const Slice &user_key, const
WriteBatchLogData log_data(kRedisHash);
auto s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
LockGuard guard(storage_->GetLockManager(), ns_key);

s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;

Expand Down Expand Up @@ -245,7 +243,6 @@ rocksdb::Status Hash::MSet(engine::Context &ctx, const Slice &user_key, const st
*added_cnt = 0;
std::string ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);
HashMetadata metadata;
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
Expand Down
2 changes: 0 additions & 2 deletions src/types/redis_hyperloglog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ rocksdb::Status HyperLogLog::Add(engine::Context &ctx, const Slice &user_key,
*ret = 0;
std::string ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);
HyperLogLogMetadata metadata{};
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) {
Expand Down Expand Up @@ -238,7 +237,6 @@ rocksdb::Status HyperLogLog::Merge(engine::Context &ctx, const Slice &dest_user_
}

std::string dest_key = AppendNamespacePrefix(dest_user_key);
LockGuard guard(storage_->GetLockManager(), dest_key);
std::vector<std::string> registers;
HyperLogLogMetadata metadata;

Expand Down
Loading

0 comments on commit 88fc16f

Please sign in to comment.