Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: hoist key mutexes to ExecuteCommands #2620

Merged
merged 31 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
df58c47
Merge commit 'a29df097dd23c36a749e5c616f4d1cae1051e853' into unstable
PokIsemaine Sep 8, 2024
559f6a8
test: txn-context-enabled test cases
PokIsemaine Sep 8, 2024
adc497a
Merge branch 'apache:unstable' into unstable
PokIsemaine Sep 11, 2024
fcfeec5
Merge branch 'apache:unstable' into unstable
PokIsemaine Sep 17, 2024
603fe89
Merge branch 'apache:unstable' into unstable
PokIsemaine Oct 4, 2024
512f4a2
Merge commit 'f2c423348d7bb3b475d988559068ba8e2697d497' into unstable
PokIsemaine Oct 7, 2024
3d067c6
refactor: promote the engine::Context in the Execute
PokIsemaine Oct 7, 2024
476e08b
Merge branch 'unstable' into unstable
PragmaTwice Oct 9, 2024
d7cf182
Merge branch 'apache:unstable' into unstable
PokIsemaine Oct 11, 2024
4373fdd
Merge branch 'apache:unstable' into unstable
PokIsemaine Oct 13, 2024
5e2a4e8
Merge branch 'apache:unstable' into unstable
PokIsemaine Oct 16, 2024
9b20687
Merge branch 'apache:unstable' into unstable
PokIsemaine Oct 23, 2024
f439dbe
refactor: Hoist key mutexes
PokIsemaine Oct 23, 2024
ccf757d
chore: remove temporary comments
PokIsemaine Oct 24, 2024
19715ab
Merge branch 'unstable' into unstable
PragmaTwice Oct 24, 2024
0430acd
Merge branch 'unstable' into unstable
git-hulk Oct 28, 2024
1177698
fix: add MultiLockGuard of OnWrite
PokIsemaine Oct 30, 2024
765366f
fix: try to fix issue 2473
PokIsemaine Oct 31, 2024
b1af21f
Merge branch 'unstable' into unstable
PokIsemaine Oct 31, 2024
3715626
refactor: GetLockKeys=>GetLocks
PokIsemaine Oct 31, 2024
abd3826
fix: issues 2617
PokIsemaine Oct 31, 2024
ab3ce18
Merge branch 'unstable' into unstable
PokIsemaine Oct 31, 2024
23501da
chore: comments & guard
PokIsemaine Oct 31, 2024
6cc5351
Merge branch 'unstable' into unstable
PragmaTwice Nov 1, 2024
a521a7f
Merge branch 'unstable' into unstable
PragmaTwice Nov 1, 2024
02ca56a
Merge branch 'unstable' into unstable
PragmaTwice Nov 2, 2024
2672d72
Merge branch 'unstable' into unstable
PragmaTwice Nov 2, 2024
b8227a1
Merge branch 'unstable' into unstable
PragmaTwice Nov 3, 2024
8b8f1ca
Merge branch 'unstable' into unstable
PragmaTwice Nov 3, 2024
76a8b1d
Merge branch 'unstable' into unstable
PragmaTwice Nov 4, 2024
9af83cd
Merge branch 'unstable' into unstable
PragmaTwice Nov 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/commands/blocking_commander.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#pragma once

#include "commander.h"
#include "common/lock_manager.h"
#include "event_util.h"
#include "server/redis_connection.h"

Expand All @@ -44,6 +45,10 @@ class BlockingCommander : public Commander,
// in other words, returning true indicates ending the blocking
virtual bool OnBlockingWrite() = 0;

// GetLocks() locks the keys of the BlockingCommander with MultiLockGuard.
// When OnWrite() is triggered, BlockingCommander needs to relock the keys.
virtual MultiLockGuard GetLocks() = 0;

// to start the blocking process
// usually put to the end of the Execute method
Status StartBlocking(int64_t timeout, std::string *output) {
Expand All @@ -63,7 +68,11 @@ class BlockingCommander : public Commander,
}

void OnWrite(bufferevent *bev) {
bool done = OnBlockingWrite();
bool done{false};
{
auto guard = GetLocks();
done = OnBlockingWrite();
}

if (!done) {
// The connection may be waked up but can't pop from the datatype.
Expand Down
29 changes: 29 additions & 0 deletions src/commands/cmd_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,16 @@ class CommandBPop : public BlockingCommander {
return s;
}

MultiLockGuard GetLocks() override {
std::vector<std::string> lock_keys;
lock_keys.reserve(keys_.size());
for (const auto &key : keys_) {
auto ns_key = ComposeNamespaceKey(conn_->GetNamespace(), key, srv_->storage->IsSlotIdEncoded());
lock_keys.emplace_back(std::move(ns_key));
}
return MultiLockGuard(srv_->storage->GetLockManager(), lock_keys);
}

bool OnBlockingWrite() override {
engine::Context ctx(srv_->storage);
auto s = TryPopFromList(ctx);
Expand Down Expand Up @@ -436,6 +446,16 @@ class CommandBLMPop : public BlockingCommander {
}
}

MultiLockGuard GetLocks() override {
std::vector<std::string> lock_keys;
lock_keys.reserve(keys_.size());
for (const auto &key : keys_) {
auto ns_key = ComposeNamespaceKey(conn_->GetNamespace(), key, srv_->storage->IsSlotIdEncoded());
lock_keys.emplace_back(std::move(ns_key));
}
return MultiLockGuard(srv_->storage->GetLockManager(), lock_keys);
}

bool OnBlockingWrite() override {
engine::Context ctx(srv_->storage);
auto s = ExecuteUnblocked(ctx);
Expand Down Expand Up @@ -767,6 +787,15 @@ class CommandBLMove : public BlockingCommander {

void UnblockKeys() override { srv_->UnblockOnKey(args_[1], conn_); }

MultiLockGuard GetLocks() override {
std::vector<std::string> lock_keys{
ComposeNamespaceKey(conn_->GetNamespace(), args_[1], srv_->storage->IsSlotIdEncoded())};
if (args_[1] != args_[2]) {
lock_keys.emplace_back(ComposeNamespaceKey(conn_->GetNamespace(), args_[2], srv_->storage->IsSlotIdEncoded()));
}
return MultiLockGuard(srv_->storage->GetLockManager(), lock_keys);
}

bool OnBlockingWrite() override {
redis::List list_db(srv_->storage, conn_->GetNamespace());
std::string elem;
Expand Down
8 changes: 8 additions & 0 deletions src/commands/cmd_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1635,6 +1635,14 @@ class CommandXReadGroup : public Commander,
redis::Stream stream_db(srv_->storage, conn_->GetNamespace());

std::vector<StreamReadResult> results;

std::vector<std::string> lock_keys;
lock_keys.reserve(streams_.size());
for (auto &stream_name : streams_) {
auto ns_key = stream_db.AppendNamespacePrefix(stream_name);
lock_keys.emplace_back(std::move(ns_key));
}
MultiLockGuard guard(srv_->storage->GetLockManager(), lock_keys);
PragmaTwice marked this conversation as resolved.
Show resolved Hide resolved
engine::Context ctx(srv_->storage);
for (size_t i = 0; i < streams_.size(); ++i) {
redis::StreamRangeOptions options;
Expand Down
20 changes: 20 additions & 0 deletions src/commands/cmd_zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,16 @@ class CommandBZPop : public BlockingCommander {
conn_->Reply(output);
}

MultiLockGuard GetLocks() override {
std::vector<std::string> lock_keys;
lock_keys.reserve(keys_.size());
for (const auto &key : keys_) {
auto ns_key = ComposeNamespaceKey(conn_->GetNamespace(), key, srv_->storage->IsSlotIdEncoded());
lock_keys.emplace_back(std::move(ns_key));
}
return MultiLockGuard(srv_->storage->GetLockManager(), lock_keys);
}

bool OnBlockingWrite() override {
std::string user_key;
std::vector<MemberScore> member_scores;
Expand Down Expand Up @@ -548,6 +558,16 @@ class CommandBZMPop : public BlockingCommander {

std::string NoopReply(const Connection *conn) override { return conn->NilString(); }

MultiLockGuard GetLocks() override {
std::vector<std::string> lock_keys;
lock_keys.reserve(keys_.size());
for (const auto &key : keys_) {
auto ns_key = ComposeNamespaceKey(conn_->GetNamespace(), key, srv_->storage->IsSlotIdEncoded());
lock_keys.emplace_back(std::move(ns_key));
}
return MultiLockGuard(srv_->storage->GetLockManager(), lock_keys);
}

bool OnBlockingWrite() override {
std::string user_key;
std::vector<MemberScore> member_scores;
Expand Down
18 changes: 16 additions & 2 deletions src/server/redis_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -489,9 +489,24 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {

SetLastCmd(cmd_name);
{
std::optional<MultiLockGuard> guard;
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
if (cmd_flags & kCmdWrite) {
std::vector<std::string> lock_keys;
attributes->ForEachKeyRange(
[&lock_keys, this](const std::vector<std::string> &args, const CommandKeyRange &key_range) {
key_range.ForEachKey(
[&, this](const std::string &key) {
auto ns_key = ComposeNamespaceKey(ns_, key, srv_->storage->IsSlotIdEncoded());
lock_keys.emplace_back(std::move(ns_key));
},
args);
},
cmd_tokens);

guard.emplace(srv_->storage->GetLockManager(), lock_keys);
}
engine::Context ctx(srv_->storage);

// TODO: transaction support for index recording
std::vector<GlobalIndexer::RecordResult> index_records;
if (!srv_->index_mgr.index_map.empty() && IsCmdForIndexing(cmd_flags, attributes->category) &&
!config->cluster_enabled) {
Expand All @@ -512,7 +527,6 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
}

s = ExecuteCommand(ctx, cmd_name, cmd_tokens, current_cmd.get(), &reply);
// TODO: transaction support for index updating
for (const auto &record : index_records) {
auto s = GlobalIndexer::Update(ctx, record);
if (!s.IsOK() && !s.Is<Status::TypeMismatched>()) {
Expand Down
20 changes: 8 additions & 12 deletions src/storage/redis_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ rocksdb::Status Database::Expire(engine::Context &ctx, const Slice &user_key, ui

std::string value;
Metadata metadata(kRedisNone, false);
LockGuard guard(storage_->GetLockManager(), ns_key);

rocksdb::Status s = storage_->Get(ctx, ctx.GetReadOptions(), metadata_cf_handle_, ns_key, &value);
if (!s.ok()) return s;

Expand Down Expand Up @@ -150,7 +150,7 @@ rocksdb::Status Database::Del(engine::Context &ctx, const Slice &user_key) {
std::string ns_key = AppendNamespacePrefix(user_key);

std::string value;
LockGuard guard(storage_->GetLockManager(), ns_key);

rocksdb::Status s = storage_->Get(ctx, ctx.GetReadOptions(), metadata_cf_handle_, ns_key, &value);
if (!s.ok()) return s;
Metadata metadata(kRedisNone, false);
Expand All @@ -165,13 +165,12 @@ rocksdb::Status Database::Del(engine::Context &ctx, const Slice &user_key) {
rocksdb::Status Database::MDel(engine::Context &ctx, const std::vector<Slice> &keys, uint64_t *deleted_cnt) {
*deleted_cnt = 0;

std::vector<std::string> lock_keys;
lock_keys.reserve(keys.size());
std::vector<std::string> ns_keys;
ns_keys.reserve(keys.size());
for (const auto &key : keys) {
std::string ns_key = AppendNamespacePrefix(key);
lock_keys.emplace_back(std::move(ns_key));
ns_keys.emplace_back(std::move(ns_key));
}
MultiLockGuard guard(storage_->GetLockManager(), lock_keys);

auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisNone);
Expand All @@ -181,8 +180,8 @@ rocksdb::Status Database::MDel(engine::Context &ctx, const std::vector<Slice> &k
}

std::vector<Slice> slice_keys;
slice_keys.reserve(lock_keys.size());
for (const auto &ns_key : lock_keys) {
slice_keys.reserve(ns_keys.size());
for (const auto &ns_key : ns_keys) {
slice_keys.emplace_back(ns_key);
}

Expand All @@ -202,7 +201,7 @@ rocksdb::Status Database::MDel(engine::Context &ctx, const std::vector<Slice> &k
if (!s.ok()) continue;
if (metadata.Expired()) continue;

s = batch->Delete(metadata_cf_handle_, lock_keys[i]);
s = batch->Delete(metadata_cf_handle_, ns_keys[i]);
if (!s.ok()) return s;
*deleted_cnt += 1;
}
Expand Down Expand Up @@ -652,9 +651,6 @@ rocksdb::Status Database::typeInternal(engine::Context &ctx, const Slice &key, R

rocksdb::Status Database::Copy(engine::Context &ctx, const std::string &key, const std::string &new_key, bool nx,
bool delete_old, CopyResult *res) {
std::vector<std::string> lock_keys = {key, new_key};
MultiLockGuard guard(storage_->GetLockManager(), lock_keys);

RedisType type = kRedisNone;
auto s = typeInternal(ctx, key, &type);
if (!s.ok()) return s;
Expand Down
10 changes: 1 addition & 9 deletions src/types/redis_bitmap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ rocksdb::Status Bitmap::SetBit(engine::Context &ctx, const Slice &user_key, uint
std::string raw_value;
std::string ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);
BitmapMetadata metadata;
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata, &raw_value);
if (!s.ok() && !s.IsNotFound()) return s;
Expand Down Expand Up @@ -461,7 +460,6 @@ rocksdb::Status Bitmap::BitOp(engine::Context &ctx, BitOpFlags op_flag, const st
const Slice &user_key, const std::vector<Slice> &op_keys, int64_t *len) {
std::string raw_value;
std::string ns_key = AppendNamespacePrefix(user_key);
LockGuard guard(storage_->GetLockManager(), ns_key);

std::vector<std::pair<std::string, BitmapMetadata>> meta_pairs;
uint64_t max_bitmap_size = 0;
Expand Down Expand Up @@ -824,15 +822,9 @@ template <bool ReadOnly>
rocksdb::Status Bitmap::bitfield(engine::Context &ctx, const Slice &user_key, const std::vector<BitfieldOperation> &ops,
std::vector<std::optional<BitfieldValue>> *rets) {
std::string ns_key = AppendNamespacePrefix(user_key);

std::optional<LockGuard> guard;
if constexpr (!ReadOnly) {
guard = LockGuard(storage_->GetLockManager(), ns_key);
}

BitmapMetadata metadata;
std::string raw_value;
// TODO(mwish): maintain snapshot for read-only bitfield.
PokIsemaine marked this conversation as resolved.
Show resolved Hide resolved

auto s = GetMetadata(ctx, ns_key, &metadata, &raw_value);
if (!s.ok() && !s.IsNotFound()) {
return s;
Expand Down
2 changes: 0 additions & 2 deletions src/types/redis_bloom_chain.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ rocksdb::Status BloomChain::Reserve(engine::Context &ctx, const Slice &user_key,
uint16_t expansion) {
std::string ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);
BloomChainMetadata bloom_chain_metadata;
rocksdb::Status s = getBloomChainMetadata(ctx, ns_key, &bloom_chain_metadata);
if (!s.ok() && !s.IsNotFound()) return s;
Expand Down Expand Up @@ -156,7 +155,6 @@ rocksdb::Status BloomChain::InsertCommon(engine::Context &ctx, const Slice &user
const BloomFilterInsertOptions &insert_options,
std::vector<BloomFilterAddResult> *rets) {
std::string ns_key = AppendNamespacePrefix(user_key);
LockGuard guard(storage_->GetLockManager(), ns_key);

BloomChainMetadata metadata;
rocksdb::Status s = getBloomChainMetadata(ctx, ns_key, &metadata);
Expand Down
5 changes: 1 addition & 4 deletions src/types/redis_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ rocksdb::Status Hash::IncrBy(engine::Context &ctx, const Slice &user_key, const

std::string ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);
HashMetadata metadata;
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
Expand Down Expand Up @@ -117,7 +116,6 @@ rocksdb::Status Hash::IncrByFloat(engine::Context &ctx, const Slice &user_key, c

std::string ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);
HashMetadata metadata;
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
Expand Down Expand Up @@ -211,7 +209,7 @@ rocksdb::Status Hash::Delete(engine::Context &ctx, const Slice &user_key, const
WriteBatchLogData log_data(kRedisHash);
auto s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
LockGuard guard(storage_->GetLockManager(), ns_key);

s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;

Expand Down Expand Up @@ -245,7 +243,6 @@ rocksdb::Status Hash::MSet(engine::Context &ctx, const Slice &user_key, const st
*added_cnt = 0;
std::string ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);
HashMetadata metadata;
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
Expand Down
2 changes: 0 additions & 2 deletions src/types/redis_hyperloglog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ rocksdb::Status HyperLogLog::Add(engine::Context &ctx, const Slice &user_key,
*ret = 0;
std::string ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);
HyperLogLogMetadata metadata{};
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) {
Expand Down Expand Up @@ -238,7 +237,6 @@ rocksdb::Status HyperLogLog::Merge(engine::Context &ctx, const Slice &dest_user_
}

std::string dest_key = AppendNamespacePrefix(dest_user_key);
LockGuard guard(storage_->GetLockManager(), dest_key);
std::vector<std::string> registers;
HyperLogLogMetadata metadata;

Expand Down
Loading
Loading