diff --git a/src/server/transaction.cc b/src/server/transaction.cc
index d63981f18d1a..1fc10bec934c 100644
--- a/src/server/transaction.cc
+++ b/src/server/transaction.cc
@@ -296,7 +296,7 @@ void Transaction::InitByKeys(KeyIndex key_index) {
   for (const auto& sd : shard_data_) {
     // sd.local_mask may be non-zero for multi transactions with instant locking.
     // Specifically EVALs may maintain state between calls.
-    DCHECK_EQ(0, sd.local_mask & ARMED);
+    DCHECK(!sd.is_armed.load(std::memory_order_relaxed));
     if (!multi_) {
       DCHECK_EQ(TxQueue::kEnd, sd.pq_pos);
     }
@@ -405,8 +405,9 @@ bool Transaction::RunInShard(EngineShard* shard) {
   unsigned idx = SidToId(shard->shard_id());
   auto& sd = shard_data_[idx];

-  DCHECK(sd.local_mask & ARMED);
-  sd.local_mask &= ~ARMED;
+  bool prev_armed = sd.is_armed.load(memory_order_relaxed);
+  DCHECK(prev_armed);
+  sd.is_armed.store(false, memory_order_relaxed);

   VLOG(2) << "RunInShard: " << DebugId() << " sid:" << shard->shard_id() << " " << sd.local_mask;
@@ -649,7 +650,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
     DCHECK_EQ(1u, shard_data_.size());

     // IsArmedInShard() first checks run_count_ before shard_data, so use release ordering.
-    shard_data_[0].local_mask |= ARMED;
+    shard_data_[0].is_armed.store(true, memory_order_relaxed);
     run_count_.store(1, memory_order_release);

     time_now_ms_ = GetCurrentTimeMs();
@@ -784,7 +785,9 @@ void Transaction::ExecuteAsync() {
   // safely.
   use_count_.fetch_add(unique_shard_cnt_, memory_order_relaxed);

-  IterateActiveShards([](PerShardData& sd, auto i) { sd.local_mask |= ARMED; });
+  // We access sd.is_armed outside of shard threads, but we guard it with the run_count_ release.
+  IterateActiveShards(
+      [](PerShardData& sd, auto i) { sd.is_armed.store(true, memory_order_relaxed); });

   uint32_t seq = seqlock_.load(memory_order_relaxed);
@@ -798,24 +801,35 @@ void Transaction::ExecuteAsync() {
   auto cb = [seq, this] {
     EngineShard* shard = EngineShard::tlocal();

-    uint32_t seq_after = seqlock_.load(memory_order_acquire);
-    bool should_poll = (seq_after == seq) && (GetLocalMask(shard->shard_id()) & ARMED);
-
-    DVLOG(3) << "PollExecCb " << DebugId() << " sid(" << shard->shard_id() << ") "
-             << run_count_.load(memory_order_relaxed) << ", should_poll: " << should_poll;
-
-    // We verify that this callback is still relevant.
-    // If we still have the same sequence number and local_mask is ARMED it means
-    // the coordinator thread has not crossed WaitForShardCallbacks barrier.
-    // Otherwise, this callback is redundant. We may still call PollExecution but
-    // we should not pass this to it since it can be in undefined state for this callback.
-    if (should_poll) {
-      // shard->PollExecution(this) does not necessarily execute this transaction.
-      // Therefore, everything that should be handled during the callback execution
-      // should go into RunInShard.
-      shard->PollExecution("exec_cb", this);
-    } else {
-      VLOG(1) << "Skipping PollExecution " << DebugId() << " sid(" << shard->shard_id() << ")";
+    bool is_armed = IsArmedInShard(shard->shard_id());
+    // First we check that this shard should run a callback by checking IsArmedInShard.
+    if (is_armed) {
+      uint32_t seq_after = seqlock_.load(memory_order_relaxed);
+
+      DVLOG(3) << "PollExecCb " << DebugId() << " sid(" << shard->shard_id() << ") "
+               << run_count_.load(memory_order_relaxed);
+
+      // We also make sure that for multi-operation transactions like Multi/Eval
+      // this callback runs on the correct operation.
+      // We want to avoid a situation where the first operation has executed, the
+      // second operation is armed, and only now this callback from the previous
+      // operation finally runs and calls PollExecution. That is usually ok, but
+      // single-shard operations reuse index 0 in shard_data_, so this old callback
+      // may run on shard 7 while accessing shard_data_[0] that now represents
+      // shard 5 for the next operation.
+      // The seqlock protects against that: each cb runs only on the operation it
+      // has been tasked with.
+      // We must check is_armed first and only then the seqlock. The first check
+      // ensures the coordinator thread has crossed the
+      // "run_count_.store(unique_shard_cnt_, memory_order_release);" barrier, so seqlock_ is valid.
+      if (seq_after == seq) {
+        // shard->PollExecution(this) does not necessarily execute this transaction.
+        // Therefore, everything that should be handled during the callback execution
+        // should go into RunInShard.
+        shard->PollExecution("exec_cb", this);
+      } else {
+        VLOG(1) << "Skipping PollExecution " << DebugId() << " sid(" << shard->shard_id() << ")";
+      }
     }

     DVLOG(3) << "ptr_release " << DebugId() << " " << seq;
@@ -851,7 +865,7 @@ void Transaction::RunQuickie(EngineShard* shard) {

   LogAutoJournalOnShard(shard);

-  sd.local_mask &= ~ARMED;
+  sd.is_armed.store(false, memory_order_relaxed);
   cb_ = nullptr;  // We can do it because only a single shard runs the callback.
 }
@@ -870,10 +884,7 @@ void Transaction::UnwatchBlocking(bool should_expire, WaitKeysProvider wcb) {
     UnwatchShardCb(wkeys, should_expire, es);
   };

-  IterateActiveShards([&expire_cb](PerShardData& sd, auto i) {
-    DCHECK_EQ(0, sd.local_mask & ARMED);
-    shard_set->Add(i, expire_cb);
-  });
+  IterateActiveShards([&expire_cb](PerShardData& sd, auto i) { shard_set->Add(i, expire_cb); });

   // Wait for all callbacks to conclude.
   WaitForShardCallbacks();
diff --git a/src/server/transaction.h b/src/server/transaction.h
index 836c0a7fde94..e7329ff16190 100644
--- a/src/server/transaction.h
+++ b/src/server/transaction.h
@@ -92,8 +92,8 @@ class Transaction {
   // State on specific shard.
   enum LocalMask : uint16_t {
-    ACTIVE = 1,      // Set on all active shards.
-    ARMED = 1 << 1,  // Whether callback cb_ is set
+    ACTIVE = 1,  // Set on all active shards.
+    // UNUSED = 1 << 1,
     OUT_OF_ORDER = 1 << 2,      // Whether its running out of order
     KEYLOCK_ACQUIRED = 1 << 3,  // Whether its key locks are acquired
     SUSPENDED_Q = 1 << 4,       // Whether is suspened (by WatchInShard())
@@ -191,7 +191,8 @@ class Transaction {
       sid = 0;

     // We use acquire so that no reordering will move before this load.
-    return run_count_.load(std::memory_order_acquire) > 0 && shard_data_[sid].local_mask & ARMED;
+    return run_count_.load(std::memory_order_acquire) > 0 &&
+           shard_data_[sid].is_armed.load(std::memory_order_relaxed);
   }

   // Called from engine set shard threads.
@@ -275,6 +276,12 @@ class Transaction {
     PerShardData() = default;

+    // This is the only variable that is accessed by both shard and coordinator threads.
+    std::atomic_bool is_armed{false};
+
+    // We pad with some memory so that atomic loads won't cause false sharing between threads.
+    char pad[48];  // to make sure PerShardData is 64 bytes and takes a full cacheline.
+
     uint32_t arg_start = 0;  // Indices into args_ array.
     uint16_t arg_count = 0;
@@ -287,6 +294,8 @@ class Transaction {
     uint32_t pq_pos = TxQueue::kEnd;
   };

+  static_assert(sizeof(PerShardData) == 64);  // cacheline
+
   // State of a multi transaction.
   struct MultiData {
     // Increase lock counts for all current keys for mode. Clear keys.
@@ -397,7 +406,7 @@ void WaitForShardCallbacks() {
     run_ec_.await([this] { return 0 == run_count_.load(std::memory_order_relaxed); });

-    seqlock_.fetch_add(1, std::memory_order_acq_rel);
+    seqlock_.fetch_add(1, std::memory_order_release);
   }

   // Log command in shard's journal, if this is a write command with auto-journaling enabled.
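
A note on why is_armed can use relaxed ordering throughout the diff above: the flag is published through the release store to run_count_ and consumed through the acquire load in IsArmedInShard(). Below is a minimal standalone sketch of that handshake; the names (coordinator_arm, shard_is_armed, the simplified single-shard counters) are hypothetical stand-ins, not Dragonfly code.

```cpp
#include <atomic>
#include <cstdint>
#include <thread>

std::atomic_bool is_armed{false};    // written relaxed by the coordinator
std::atomic<uint32_t> run_count{0};  // release/acquire gate, as run_count_ in the PR

void coordinator_arm() {
  // Relaxed is enough here: the release store below orders this write.
  is_armed.store(true, std::memory_order_relaxed);
  // Release: any thread that acquire-loads run_count and sees 1 is
  // guaranteed to also see is_armed == true.
  run_count.store(1, std::memory_order_release);
}

bool shard_is_armed() {
  // Mirrors IsArmedInShard(): acquire pairs with the release store above,
  // so once run_count > 0 is observed, the relaxed read of is_armed
  // cannot return a stale 'false'.
  return run_count.load(std::memory_order_acquire) > 0 &&
         is_armed.load(std::memory_order_relaxed);
}

int main() {
  std::thread shard([] {
    while (!shard_is_armed()) {
      // spin until the coordinator arms us
    }
  });
  coordinator_arm();
  shard.join();
}
```

This is also why the diff can drop ARMED from local_mask entirely: local_mask stays owned by the shard thread, while the single cross-thread flag gets its own atomic.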
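
The char pad[48] in PerShardData is there because is_armed is polled from other threads; if it shared a cacheline with neighboring shard slots, each poll could ping-pong the line between cores. Here is a layout sketch under the same assumptions the PR makes (64-byte cachelines, one-byte std::atomic_bool); PerShardSketch is a hypothetical mirror, not the real struct.

```cpp
#include <atomic>
#include <cstdint>
#include <cstdio>

struct PerShardSketch {
  std::atomic_bool is_armed{false};  // polled by other threads
  char pad[48];                      // keeps the hot flag's cacheline to itself

  uint32_t arg_start = 0;
  uint16_t arg_count = 0;
  uint16_t local_mask = 0;
  uint32_t pq_pos = UINT32_MAX;
};

// 1 (flag) + 48 (pad) -> offset 49, rounded up to 52 for the uint32_t,
// then 4 + 2 + 2 + 4 bytes of members lands exactly on 64.
static_assert(sizeof(PerShardSketch) == 64, "one struct per cacheline");

int main() {
  std::printf("sizeof(PerShardSketch) = %zu\n", sizeof(PerShardSketch));
}
```

An alternative would be alignas(std::hardware_destructive_interference_size) on the struct, letting the compiler insert the padding; the explicit pad plus static_assert keeps the layout visible and avoids relying on a constant that not every standard library defines.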
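
Finally, the seqlock_ relaxation at the end of the header diff: once every late callback checks IsArmedInShard() (an acquire on run_count_) before reading seqlock_, the fetch_add in WaitForShardCallbacks() only needs to publish, not to observe, so acq_rel can presumably drop to release. A compilable sketch of the staleness check itself, with hypothetical names:

```cpp
#include <atomic>
#include <cstdint>
#include <cstdio>

std::atomic<uint32_t> seqlock{0};

// Coordinator, per hop: capture the sequence before dispatching callbacks.
uint32_t capture_seq() {
  return seqlock.load(std::memory_order_relaxed);
}

// Coordinator, once run_count_ reached zero (WaitForShardCallbacks):
// bump the sequence so callbacks from this hop become recognizably stale.
void finish_hop() {
  seqlock.fetch_add(1, std::memory_order_release);
}

// Shard callback: proceed only if the hop that scheduled us is still current.
// A stale callback from a previous hop sees a bumped sequence and backs off
// instead of touching shard_data_ slots the next hop may have re-purposed.
bool still_current_hop(uint32_t seq_at_dispatch) {
  return seqlock.load(std::memory_order_relaxed) == seq_at_dispatch;
}

int main() {
  uint32_t seq = capture_seq();
  std::printf("current hop? %d\n", still_current_hop(seq));  // 1: same hop
  finish_hop();
  std::printf("current hop? %d\n", still_current_hop(seq));  // 0: stale, backs off
}
```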