diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc
index 9b7b84c69907..58041f26bc30 100644
--- a/src/server/dragonfly_test.cc
+++ b/src/server/dragonfly_test.cc
@@ -140,7 +140,6 @@ TEST_F(DflyEngineTest, Multi) {
   ASSERT_THAT(resp.GetVec(), ElementsAre(ArgType(RespExpr::NIL), ArgType(RespExpr::NIL)));
 
   atomic_bool tx_empty = true;
-
   shard_set->RunBriefInParallel([&](EngineShard* shard) {
     if (!shard->txq()->Empty())
       tx_empty.store(false);
@@ -264,6 +263,7 @@ TEST_F(DflyEngineTest, MultiConsistent) {
   mset_fb.Join();
   fb.Join();
 
+  ASSERT_FALSE(service_->IsLocked(0, kKey1));
   ASSERT_FALSE(service_->IsLocked(0, kKey4));
   ASSERT_FALSE(service_->IsShardSetLocked());
 
@@ -865,6 +865,7 @@ TEST_F(DflyEngineTest, Bug468) {
   resp = Run({"exec"});
   ASSERT_THAT(resp, ErrArg("not an integer"));
 
+  ASSERT_FALSE(service_->IsLocked(0, "foo"));
 
   resp = Run({"eval", "return redis.call('set', 'foo', 'bar', 'EX', 'moo')", "1", "foo"});
 
diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc
index 20d1a538606a..59b36856b1f7 100644
--- a/src/server/test_utils.cc
+++ b/src/server/test_utils.cc
@@ -207,6 +207,11 @@ RespExpr BaseFamilyTest::Run(std::string_view id, ArgSlice slice) {
 
   DCHECK(context->transaction == nullptr);
 
+  auto cmd = absl::AsciiStrToUpper(slice.front());
+  if (cmd == "EVAL" || cmd == "EVALSHA" || cmd == "EXEC") {
+    shard_set->AwaitRunningOnShardQueue([](auto*) {});  // Wait for async UnlockMulti.
+  }
+
   unique_lock lk(mu_);
   last_cmd_dbg_info_ = context->last_command_debug;
 
diff --git a/src/server/transaction.cc b/src/server/transaction.cc
index 1985ec163764..f2b01079dfc3 100644
--- a/src/server/transaction.cc
+++ b/src/server/transaction.cc
@@ -600,33 +600,28 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
 // Runs in the coordinator fiber.
 void Transaction::UnlockMulti() {
   VLOG(1) << "UnlockMulti " << DebugId();
-
   DCHECK(multi_);
-  using KeyList = vector<pair<string_view, LockCnt>>;
-  vector<KeyList> sharded_keys(shard_set->size());
-
-  // It's LE and not EQ because there may be callbacks in progress that increase use_count_.
-  DCHECK_LE(1u, GetUseCount());
+  DCHECK_GE(GetUseCount(), 1u);  // Greater-equal because there may be callbacks in progress.
 
-  for (const auto& k_v : multi_->lock_counts) {
-    ShardId sid = Shard(k_v.first, sharded_keys.size());
-    sharded_keys[sid].push_back(k_v);
+  auto sharded_keys = make_shared<vector<KeyList>>(shard_set->size());
+  for (const auto& [key, cnt] : multi_->lock_counts) {
+    ShardId sid = Shard(key, sharded_keys->size());
+    (*sharded_keys)[sid].emplace_back(key, cnt);
   }
 
-  uint32_t shard_journals_cnt = 0;
-  if (ServerState::tlocal()->journal()) {
-    shard_journals_cnt = CalcMultiNumOfShardJournals();
-  }
+  unsigned shard_journals_cnt =
+      ServerState::tlocal()->journal() ? CalcMultiNumOfShardJournals() : 0;
 
   uint32_t prev = run_count_.fetch_add(shard_data_.size(), memory_order_relaxed);
   DCHECK_EQ(prev, 0u);
 
+  use_count_.fetch_add(shard_data_.size(), std::memory_order_relaxed);
   for (ShardId i = 0; i < shard_data_.size(); ++i) {
-    shard_set->Add(
-        i, [&] { UnlockMultiShardCb(sharded_keys, EngineShard::tlocal(), shard_journals_cnt); });
+    shard_set->Add(i, [this, sharded_keys, shard_journals_cnt]() {
+      this->UnlockMultiShardCb(*sharded_keys, EngineShard::tlocal(), shard_journals_cnt);
+      intrusive_ptr_release(this);
+    });
   }
-  WaitForShardCallbacks();
-  DCHECK_GE(GetUseCount(), 1u);
 
   VLOG(1) << "UnlockMultiEnd " << DebugId();
 }
diff --git a/src/server/transaction.h b/src/server/transaction.h
index c4f1f223a80f..c0cda3f9c1c1 100644
--- a/src/server/transaction.h
+++ b/src/server/transaction.h
@@ -227,7 +227,8 @@ class Transaction {
     unsigned cnt[2] = {0, 0};
   };
 
-  using KeyList = std::vector<std::pair<std::string_view, LockCnt>>;
+  // Owned std::string because the callbacks it is used in run fully async and can outlive the entries.
+  using KeyList = std::vector<std::pair<std::string, LockCnt>>;
 
   struct PerShardData {
     PerShardData(PerShardData&&) noexcept {
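
Taken together, the transaction.cc and test_utils.cc hunks switch multi-transaction unlocking from a blocking WaitForShardCallbacks() to a fire-and-forget scheme: the keys are copied into a shared_ptr-owned vector of KeyList (now owned std::string), the transaction's intrusive use_count_ is bumped once per shard so the object survives until the last shard callback calls intrusive_ptr_release, and the tests add an explicit shard-queue barrier before asserting that keys are unlocked. The standalone sketch below is not Dragonfly code; FakeTransaction, FakeShardSet, Barrier and the hash-based key routing are invented stand-ins for Transaction, shard_set->Add and shard_set->AwaitRunningOnShardQueue, shown only to illustrate the lifetime pattern the diff relies on.

#include <cstdio>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

using KeyList = std::vector<std::pair<std::string, unsigned>>;  // owned keys, as in the header change

// Stand-in for the per-shard task queues: callbacks are queued now and run later.
struct FakeShardSet {
  std::vector<std::vector<std::function<void()>>> queues;
  explicit FakeShardSet(std::size_t n) : queues(n) {}

  void Add(std::size_t sid, std::function<void()> cb) {
    queues[sid].push_back(std::move(cb));
  }

  // Drain every queue; the tests reach the same point with a no-op AwaitRunningOnShardQueue.
  void Barrier() {
    for (auto& q : queues) {
      for (auto& cb : q) cb();
      q.clear();
    }
  }
};

class FakeTransaction {
 public:
  explicit FakeTransaction(std::vector<std::string> keys) : keys_(std::move(keys)) {}

  // Fire-and-forget unlock: returns before any shard callback has run.
  void UnlockMultiAsync(FakeShardSet* shards) {
    // Copy the keys into shared, owned storage: the callbacks may outlive this call
    // frame and the container the keys originally lived in.
    auto sharded = std::make_shared<std::vector<KeyList>>(shards->queues.size());
    for (const auto& key : keys_)
      (*sharded)[std::hash<std::string>{}(key) % sharded->size()].emplace_back(key, 1u);

    // One extra reference per pending callback keeps the transaction alive until the
    // last callback runs, mirroring use_count_.fetch_add + intrusive_ptr_release.
    use_count_ += static_cast<unsigned>(shards->queues.size());
    for (std::size_t sid = 0; sid < shards->queues.size(); ++sid) {
      shards->Add(sid, [this, sharded, sid] {
        std::printf("shard %zu unlocks %zu keys\n", sid, (*sharded)[sid].size());
        Release();
      });
    }
  }

  void Release() {
    if (--use_count_ == 0)
      delete this;
  }

 private:
  ~FakeTransaction() { std::puts("transaction destroyed after the last shard callback"); }

  std::vector<std::string> keys_;
  unsigned use_count_ = 1;  // the coordinator's own reference
};

int main() {
  FakeShardSet shards(2);
  auto* tx = new FakeTransaction({"foo", "bar", "baz"});
  tx->UnlockMultiAsync(&shards);
  tx->Release();     // the coordinator drops its reference; the object must stay alive
  shards.Barrier();  // like the test harness: only now is it safe to assert "not locked"
  return 0;
}

Running the sketch prints the per-shard unlock lines before the destructor line: the object is destroyed only after the last queued callback drops its reference, which is what lets the real UnlockMulti return without waiting and without a use-after-free, at the cost of tests needing the barrier before checking IsLocked.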