diff --git a/src/server/blocking_controller.cc b/src/server/blocking_controller.cc index bf470f37fd62..97b19361a029 100644 --- a/src/server/blocking_controller.cc +++ b/src/server/blocking_controller.cc @@ -199,8 +199,6 @@ void BlockingController::RemoveWatched(ArgSlice keys, Transaction* trans) { if (wt.queue_map.empty()) { watched_dbs_.erase(dbit); } - - awakened_transactions_.erase(trans); } // Called from commands like lpush. diff --git a/src/server/blocking_controller.h b/src/server/blocking_controller.h index 61ae33eb066e..5964177a65cb 100644 --- a/src/server/blocking_controller.h +++ b/src/server/blocking_controller.h @@ -48,6 +48,10 @@ class BlockingController { size_t NumWatched(DbIndex db_indx) const; std::vector GetWatchedKeys(DbIndex db_indx) const; + void RemoveAwaked(Transaction* trans) { + awakened_transactions_.erase(trans); + } + private: struct WatchQueue; struct DbWatchTable; @@ -70,7 +74,5 @@ class BlockingController { // There can be multiple transactions like this because a transaction // could awaken arbitrary number of keys. absl::flat_hash_set awakened_transactions_; - - // absl::btree_multimap waiting_convergence_; }; } // namespace dfly diff --git a/src/server/blocking_controller_test.cc b/src/server/blocking_controller_test.cc index b64106ba9965..a51733df027b 100644 --- a/src/server/blocking_controller_test.cc +++ b/src/server/blocking_controller_test.cc @@ -81,8 +81,7 @@ TEST_F(BlockingControllerTest, Timeout) { time_point tp = steady_clock::now() + chrono::milliseconds(10); trans_->Schedule(); - auto keys = trans_->ShardArgsInShard(0); - auto cb = [&](Transaction* t, EngineShard* shard) { return t->WatchInShard(keys, shard); }; + auto cb = [&](Transaction* t, EngineShard* shard) { return trans_->ShardArgsInShard(0); }; bool res = trans_->WaitOnWatch(tp, cb); diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 2bb42665dc02..c2ebc83f4133 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -292,8 +292,11 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { CHECK_EQ(committed_txid_, trans->notify_txid()); bool keep = trans->RunInShard(this); - if (keep) + if (keep) { return; + } else { + blocking_controller_->RemoveAwaked(trans); + } } if (continuation_trans_) { diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h index 32075b07cabb..9f4e1c9f3e6a 100644 --- a/src/server/engine_shard_set.h +++ b/src/server/engine_shard_set.h @@ -202,6 +202,7 @@ class EngineShard { uint32_t periodic_task_ = 0; uint32_t defrag_task_ = 0; + DefragTaskState defrag_state_; std::unique_ptr tiered_storage_; std::unique_ptr blocking_controller_; diff --git a/src/server/list_family.cc b/src/server/list_family.cc index 3d023d841394..85b374bfa282 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -240,13 +240,12 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) { } // Block - auto cb = [&](Transaction* t, EngineShard* shard) { - auto keys = t->ShardArgsInShard(shard->shard_id()); - return t->WatchInShard(keys, shard); + auto wcb = [&](Transaction* t, EngineShard* shard) { + return t->ShardArgsInShard(shard->shard_id()); }; ++stats->num_blocked_clients; - bool wait_succeeded = t->WaitOnWatch(tp, std::move(cb)); + bool wait_succeeded = t->WaitOnWatch(tp, std::move(wcb)); --stats->num_blocked_clients; if (!wait_succeeded) @@ -884,10 +883,8 @@ OpResult BPopPusher::RunSingle(Transaction* t, time_point tp) { } auto* stats = ServerState::tl_connection_stats(); - auto wcb = [&](Transaction* t, EngineShard* shard) { - ArgSlice keys{&this->pop_key_, 1}; - return t->WatchInShard(keys, shard); - }; + + auto wcb = [&](Transaction* t, EngineShard* shard) { return ArgSlice{&this->pop_key_, 1}; }; // Block ++stats->num_blocked_clients; @@ -919,10 +916,7 @@ OpResult BPopPusher::RunPair(Transaction* t, time_point tp) { // Therefore we follow the regular flow of watching the key but for the destination shard it // will never be triggerred. // This allows us to run Transaction::Execute on watched transactions in both shards. - auto wcb = [&](Transaction* t, EngineShard* shard) { - ArgSlice keys{&this->pop_key_, 1}; - return t->WatchInShard(keys, shard); - }; + auto wcb = [&](Transaction* t, EngineShard* shard) { return ArgSlice{&this->pop_key_, 1}; }; ++stats->num_blocked_clients; diff --git a/src/server/list_family_test.cc b/src/server/list_family_test.cc index b396fbc317ab..e643abeb082b 100644 --- a/src/server/list_family_test.cc +++ b/src/server/list_family_test.cc @@ -9,6 +9,7 @@ #include "base/gtest.h" #include "base/logging.h" #include "facade/facade_test.h" +#include "server/blocking_controller.h" #include "server/command_registry.h" #include "server/conn_context.h" #include "server/engine_shard_set.h" @@ -28,6 +29,17 @@ class ListFamilyTest : public BaseFamilyTest { ListFamilyTest() { num_threads_ = 4; } + + unsigned NumWatched() { + atomic_uint32_t sum{0}; + shard_set->RunBriefInParallel([&](EngineShard* es) { + auto* bc = es->blocking_controller(); + if (bc) + sum.fetch_add(bc->NumWatched(0), memory_order_relaxed); + }); + + return sum.load(); + } }; const char kKey1[] = "x"; @@ -114,6 +126,7 @@ TEST_F(ListFamilyTest, BLPopBlocking) { ASSERT_THAT(resp0, ArrLen(2)); EXPECT_THAT(resp0.GetVec(), ElementsAre("x", "1")); ASSERT_FALSE(IsLocked(0, "x")); + ASSERT_EQ(0, NumWatched()); } TEST_F(ListFamilyTest, BLPopMultiple) { @@ -137,7 +150,7 @@ TEST_F(ListFamilyTest, BLPopMultiple) { EXPECT_THAT(resp0.GetVec(), ElementsAre(kKey1, "3")); ASSERT_FALSE(IsLocked(0, kKey1)); ASSERT_FALSE(IsLocked(0, kKey2)); - // ess_->RunBriefInParallel([](EngineShard* es) { ASSERT_FALSE(es->HasAwakedTransaction()); }); + ASSERT_EQ(0, NumWatched()); } TEST_F(ListFamilyTest, BLPopTimeout) { @@ -155,6 +168,7 @@ TEST_F(ListFamilyTest, BLPopTimeout) { EXPECT_THAT(resp, ArgType(RespExpr::NIL_ARRAY)); ASSERT_FALSE(service_->IsLocked(0, kKey1)); + ASSERT_EQ(0, NumWatched()); } TEST_F(ListFamilyTest, BLPopTimeout2) { @@ -171,6 +185,7 @@ TEST_F(ListFamilyTest, BLPopTimeout2) { Run({"DEL", "blist2"}); Run({"RPUSH", "blist2", "d"}); Run({"BLPOP", "blist1", "blist2", "1"}); + ASSERT_EQ(0, NumWatched()); } TEST_F(ListFamilyTest, BLPopMultiPush) { @@ -210,6 +225,7 @@ TEST_F(ListFamilyTest, BLPopMultiPush) { ASSERT_THAT(blpop_resp, ArrLen(2)); auto resp_arr = blpop_resp.GetVec(); EXPECT_THAT(resp_arr, ElementsAre(kKey1, "A")); + ASSERT_EQ(0, NumWatched()); } TEST_F(ListFamilyTest, BLPopSerialize) { @@ -638,10 +654,10 @@ TEST_F(ListFamilyTest, TwoQueueBug451) { auto push_fiber = [&]() { auto id = "t-" + std::to_string(it_cnt.fetch_add(1)); - for (int i = 0; i < 1000; i++) { + for (int i = 0; i < 300; i++) { Run(id, {"rpush", "a", "DATA"}); } - fibers_ext::SleepFor(100ms); + fibers_ext::SleepFor(50ms); running = false; }; @@ -662,6 +678,7 @@ TEST_F(ListFamilyTest, TwoQueueBug451) { TEST_F(ListFamilyTest, BRPopLPushSingleShard) { EXPECT_THAT(Run({"brpoplpush", "x", "y", "0.05"}), ArgType(RespExpr::NIL)); + ASSERT_EQ(0, NumWatched()); EXPECT_THAT(Run({"lpush", "x", "val1"}), IntArg(1)); EXPECT_EQ(Run({"brpoplpush", "x", "y", "0.01"}), "val1"); @@ -682,6 +699,7 @@ TEST_F(ListFamilyTest, BRPopLPushSingleShard) { EXPECT_THAT(resp, ArgType(RespExpr::NIL)); ASSERT_FALSE(IsLocked(0, "x")); ASSERT_FALSE(IsLocked(0, "y")); + ASSERT_EQ(0, NumWatched()); } TEST_F(ListFamilyTest, BRPopLPushSingleShardBlocking) { @@ -699,12 +717,15 @@ TEST_F(ListFamilyTest, BRPopLPushSingleShardBlocking) { ASSERT_EQ(resp, "1"); ASSERT_FALSE(IsLocked(0, "x")); ASSERT_FALSE(IsLocked(0, "y")); + ASSERT_EQ(0, NumWatched()); } TEST_F(ListFamilyTest, BRPopLPushTwoShards) { RespExpr resp; - EXPECT_THAT(Run({"brpoplpush", "x", "z", "0.05"}), ArgType(RespExpr::NIL)); + + ASSERT_EQ(0, NumWatched()); + Run({"lpush", "x", "val"}); EXPECT_EQ(Run({"brpoplpush", "x", "z", "0"}), "val"); resp = Run({"lrange", "z", "0", "-1"}); @@ -732,6 +753,7 @@ TEST_F(ListFamilyTest, BRPopLPushTwoShards) { ASSERT_THAT(resp.GetVec(), ElementsAre("val1", "val2")); ASSERT_FALSE(IsLocked(0, "x")); ASSERT_FALSE(IsLocked(0, "z")); + ASSERT_EQ(0, NumWatched()); // TODO: there is a bug here. // we do not wake the dest shard, when source is awaked which prevents // the atomicity and causes the first bug as well. diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 719ff450d1d9..6571e65f0c16 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -402,10 +402,6 @@ bool Transaction::RunInShard(EngineShard* shard) { if (was_suspended || !become_suspended) { shard->db_slice().Release(mode, largs); sd.local_mask &= ~KEYLOCK_ACQUIRED; - - if (was_suspended || (sd.local_mask & AWAKED_Q)) { - shard->blocking_controller()->RemoveWatched(ShardArgsInShard(shard->shard_id()), this); - } } sd.local_mask &= ~OUT_OF_ORDER; } @@ -813,13 +809,18 @@ void Transaction::RunQuickie(EngineShard* shard) { // runs in coordinator thread. // Marks the transaction as expired and removes it from the waiting queue. -void Transaction::ExpireBlocking() { - DVLOG(1) << "ExpireBlocking " << DebugId(); +void Transaction::UnwatchBlocking(bool should_expire, WaitKeysPovider wcb) { + DVLOG(1) << "UnwatchBlocking " << DebugId(); DCHECK(!IsGlobal()); run_count_.store(unique_shard_cnt_, memory_order_release); - auto expire_cb = [this] { ExpireShardCb(EngineShard::tlocal()); }; + auto expire_cb = [&] { + EngineShard* es = EngineShard::tlocal(); + ArgSlice wkeys = wcb(this, es); + + UnwatchShardCb(wkeys, should_expire, es); + }; if (unique_shard_cnt_ == 1) { DCHECK_LT(unique_shard_id_, shard_set->size()); @@ -837,7 +838,7 @@ void Transaction::ExpireBlocking() { // Wait for all callbacks to conclude. WaitForShardCallbacks(); - DVLOG(1) << "ExpireBlocking finished " << DebugId(); + DVLOG(1) << "UnwatchBlocking finished " << DebugId(); } const char* Transaction::Name() const { @@ -1009,12 +1010,17 @@ size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const { return reverse_index_[sd.arg_start + arg_index]; } -bool Transaction::WaitOnWatch(const time_point& tp, RunnableType cb) { +bool Transaction::WaitOnWatch(const time_point& tp, WaitKeysPovider wkeys_provider) { // Assumes that transaction is pending and scheduled. TODO: To verify it with state machine. VLOG(2) << "WaitOnWatch Start use_count(" << use_count() << ")"; using namespace chrono; - Execute(cb, true); + auto cb = [&](Transaction* t, EngineShard* shard) { + auto keys = wkeys_provider(t, shard); + return t->WatchInShard(keys, shard); + }; + + Execute(move(cb), true); coordinator_state_ |= COORD_BLOCKED; @@ -1038,40 +1044,11 @@ bool Transaction::WaitOnWatch(const time_point& tp, RunnableType cb) { DVLOG(1) << "WaitOnWatch await_until " << int(status); } - if ((coordinator_state_ & COORD_CANCELLED) || status == cv_status::timeout) { - ExpireBlocking(); - coordinator_state_ &= ~COORD_BLOCKED; - return false; - } - -#if 0 - // We were notified by a shard, so lets make sure that our notifications converged to a stable - // form. - if (unique_shard_cnt_ > 1) { - run_count_.store(unique_shard_cnt_, memory_order_release); - - auto converge_cb = [this] { - this->CheckForConvergence(EngineShard::tlocal()); - }; - - for (ShardId i = 0; i < shard_data_.size(); ++i) { - auto& sd = shard_data_[i]; - DCHECK_EQ(0, sd.local_mask & ARMED); - if (sd.arg_count == 0) - continue; - shard_set->Add(i, converge_cb); - } - - // Wait for all callbacks to conclude. - WaitForShardCallbacks(); - DVLOG(1) << "Convergence finished " << DebugId(); - } -#endif - - // Lift blocking mask. + bool is_expired = (coordinator_state_ & COORD_CANCELLED) || status == cv_status::timeout; + UnwatchBlocking(is_expired, wkeys_provider); coordinator_state_ &= ~COORD_BLOCKED; - return true; + return !is_expired; } // Runs only in the shard thread. @@ -1091,22 +1068,24 @@ OpStatus Transaction::WatchInShard(ArgSlice keys, EngineShard* shard) { return OpStatus::OK; } -void Transaction::ExpireShardCb(EngineShard* shard) { - auto lock_args = GetLockArgs(shard->shard_id()); - shard->db_slice().Release(Mode(), lock_args); +void Transaction::UnwatchShardCb(ArgSlice wkeys, bool should_expire, EngineShard* shard) { + if (should_expire) { + auto lock_args = GetLockArgs(shard->shard_id()); + shard->db_slice().Release(Mode(), lock_args); - unsigned sd_idx = SidToId(shard->shard_id()); - auto& sd = shard_data_[sd_idx]; - sd.local_mask |= EXPIRED_Q; - sd.local_mask &= ~KEYLOCK_ACQUIRED; + unsigned sd_idx = SidToId(shard->shard_id()); + auto& sd = shard_data_[sd_idx]; + sd.local_mask |= EXPIRED_Q; + sd.local_mask &= ~KEYLOCK_ACQUIRED; + } - shard->blocking_controller()->RemoveWatched(ShardArgsInShard(shard->shard_id()), this); + shard->blocking_controller()->RemoveWatched(wkeys, this); // Need to see why I decided to call this. // My guess - probably to trigger the run of stalled transactions in case // this shard concurrently awoke this transaction and stalled the processing // of TxQueue. - shard->PollExecution("expirecb", nullptr); + shard->PollExecution("unwatchcb", nullptr); CHECK_GE(DecreaseRunCnt(), 1u); } diff --git a/src/server/transaction.h b/src/server/transaction.h index 60281439c036..c635324cba62 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -161,7 +161,8 @@ class Transaction { // or b) tp is reached. If tp is time_point::max() then waits indefinitely. // Expects that the transaction had been scheduled before, and uses Execute(.., true) to register. // Returns false if timeout occurred, true if was notified by one of the keys. - bool WaitOnWatch(const time_point& tp, RunnableType cb); + using WaitKeysPovider = std::function; + bool WaitOnWatch(const time_point& tp, WaitKeysPovider cb); // Returns true if transaction is awaked, false if it's timed-out and can be removed from the // blocking queue. NotifySuspended may be called from (multiple) shard threads and @@ -191,9 +192,6 @@ class Transaction { return db_index_; } - // Adds itself to watched queue in the shard. Must run in that shard thread. - OpStatus WatchInShard(ArgSlice keys, EngineShard* shard); - private: struct LockCnt { unsigned cnt[2] = {0, 0}; @@ -208,7 +206,7 @@ class Transaction { void ScheduleInternal(); void LockMulti(); - void ExpireBlocking(); + void UnwatchBlocking(bool should_expire, WaitKeysPovider wcb); void ExecuteAsync(); // Optimized version of RunInShard for single shard uncontended cases. @@ -226,9 +224,12 @@ class Transaction { // Returns true if we need to follow up with PollExecution on this shard. bool CancelShardCb(EngineShard* shard); - void ExpireShardCb(EngineShard* shard); + void UnwatchShardCb(ArgSlice wkeys, bool should_expire, EngineShard* shard); void UnlockMultiShardCb(const std::vector& sharded_keys, EngineShard* shard); + // Adds itself to watched queue in the shard. Must run in that shard thread. + OpStatus WatchInShard(ArgSlice keys, EngineShard* shard); + void WaitForShardCallbacks() { run_ec_.await([this] { return 0 == run_count_.load(std::memory_order_relaxed); });