Skip to content

Commit

Permalink
feat(server):Async unlock multi
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
  • Loading branch information
dranikpg committed Feb 10, 2023
1 parent 2362e55 commit 0c292bd
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 18 deletions.
29 changes: 12 additions & 17 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -600,33 +600,28 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
// Runs in the coordinator fiber.
void Transaction::UnlockMulti() {
VLOG(1) << "UnlockMulti " << DebugId();

DCHECK(multi_);
using KeyList = vector<pair<std::string_view, LockCnt>>;
vector<KeyList> sharded_keys(shard_set->size());

// It's LE and not EQ because there may be callbacks in progress that increase use_count_.
DCHECK_LE(1u, GetUseCount());
DCHECK_GE(GetUseCount(), 1u); // Greater-equal because there may be callbacks in progress.

for (const auto& k_v : multi_->lock_counts) {
ShardId sid = Shard(k_v.first, sharded_keys.size());
sharded_keys[sid].push_back(k_v);
auto sharded_keys = make_shared<vector<KeyList>>(shard_set->size());
for (const auto& [key, cnt] : multi_->lock_counts) {
ShardId sid = Shard(key, sharded_keys->size());
(*sharded_keys)[sid].emplace_back(key, cnt);
}

uint32_t shard_journals_cnt = 0;
if (ServerState::tlocal()->journal()) {
shard_journals_cnt = CalcMultiNumOfShardJournals();
}
unsigned shard_journals_cnt =
ServerState::tlocal()->journal() ? CalcMultiNumOfShardJournals() : 0;

uint32_t prev = run_count_.fetch_add(shard_data_.size(), memory_order_relaxed);
DCHECK_EQ(prev, 0u);

use_count_.fetch_add(shard_data_.size(), std::memory_order_relaxed);
for (ShardId i = 0; i < shard_data_.size(); ++i) {
shard_set->Add(
i, [&] { UnlockMultiShardCb(sharded_keys, EngineShard::tlocal(), shard_journals_cnt); });
shard_set->Add(i, [this, sharded_keys, shard_journals_cnt]() {
this->UnlockMultiShardCb(*sharded_keys, EngineShard::tlocal(), shard_journals_cnt);
intrusive_ptr_release(this);
});
}
WaitForShardCallbacks();
DCHECK_GE(GetUseCount(), 1u);

VLOG(1) << "UnlockMultiEnd " << DebugId();
}
Expand Down
3 changes: 2 additions & 1 deletion src/server/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ class Transaction {
unsigned cnt[2] = {0, 0};
};

using KeyList = std::vector<std::pair<std::string_view, LockCnt>>;
// owned std::string because callbacks its used in run fully async and can outlive the entries.
using KeyList = std::vector<std::pair<std::string, LockCnt>>;

struct PerShardData {
PerShardData(PerShardData&&) noexcept {
Expand Down

0 comments on commit 0c292bd

Please sign in to comment.