From f3895532929cd1370a98d14fabf0c11c3a6a4f37 Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Fri, 16 Feb 2024 09:58:50 +0300 Subject: [PATCH 1/2] chore: little transaction cleanup Signed-off-by: Vladislav Oleshko --- src/server/generic_family.cc | 6 ++-- src/server/list_family.cc | 7 ++-- src/server/transaction.cc | 68 ++++++++++++++++++++---------------- src/server/transaction.h | 45 +++++++++++------------- 4 files changed, 61 insertions(+), 65 deletions(-) diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 8a7d45343374..0fb187e07dc4 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -1293,11 +1293,9 @@ OpResult GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_des Transaction* transaction = cntx->transaction; if (transaction->GetUniqueShardCnt() == 1) { + transaction->RenableAutoJournal(); // Safe to use RENAME with single shard auto cb = [&](Transaction* t, EngineShard* shard) { - auto ec = OpRen(t->GetOpArgs(shard), key[0], key[1], skip_exist_dest); - // Incase of uniqe shard count we can use rename command in replica. - t->RenableAutoJournal(); - return ec; + return OpRen(t->GetOpArgs(shard), key[0], key[1], skip_exist_dest); }; OpResult result = transaction->ScheduleSingleHopT(std::move(cb)); diff --git a/src/server/list_family.cc b/src/server/list_family.cc index 99e20b33846b..85dacd6b539c 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -717,13 +717,10 @@ void MoveGeneric(ConnectionContext* cntx, string_view src, string_view dest, Lis OpResult result; if (cntx->transaction->GetUniqueShardCnt() == 1) { + cntx->transaction->RenableAutoJournal(); // On single shard we can use the auto journal flow. auto cb = [&](Transaction* t, EngineShard* shard) { - auto ec = OpMoveSingleShard(t->GetOpArgs(shard), src, dest, src_dir, dest_dir); - // On single shard we can use the auto journal flow. - t->RenableAutoJournal(); - return ec; + return OpMoveSingleShard(t->GetOpArgs(shard), src, dest, src_dir, dest_dir); }; - result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); } else { cntx->transaction->Schedule(); diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 3ce72ed62b43..9c9379a29c8b 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -27,6 +27,7 @@ thread_local Transaction::TLTmpSpace Transaction::tmp_space; namespace { +// Global txid sequence atomic_uint64_t op_seq{1}; constexpr size_t kTransSize [[maybe_unused]] = sizeof(Transaction); @@ -90,12 +91,12 @@ std::ostream& operator<<(std::ostream& os, Transaction::time_point tp) { return os << ms << "ms"; } -} // namespace - -IntentLock::Mode Transaction::LockMode() const { - return cid_->IsReadOnly() ? IntentLock::SHARED : IntentLock::EXCLUSIVE; +uint16_t trans_id(const Transaction* ptr) { + return (intptr_t(ptr) >> 8) & 0xFFFF; } +} // namespace + void Transaction::PhasedBarrier::Start(uint32_t count) { DCHECK_EQ(DEBUG_Count(), 0u); count_.store(count, memory_order_release); @@ -159,13 +160,6 @@ cv_status Transaction::BatonBarrier::Wait(time_point tp) { return cv_status::no_timeout; } -/** - * @brief Construct a new Transaction:: Transaction object - * - * @param cid - * @param ess - * @param cs - */ Transaction::Transaction(const CommandId* cid) : cid_{cid} { InitTxTime(); string_view cmd_name(cid_->name()); @@ -1155,6 +1149,16 @@ uint16_t Transaction::GetLocalMask(ShardId sid) const { return shard_data_[SidToId(sid)].local_mask; } +IntentLock::Mode Transaction::LockMode() const { + return cid_->IsReadOnly() ? IntentLock::SHARED : IntentLock::EXCLUSIVE; +} + +OpArgs Transaction::GetOpArgs(EngineShard* shard) const { + DCHECK(IsActive(shard->shard_id())); + DCHECK((multi_ && multi_->role == SQUASHED_STUB) || (run_barrier_.DEBUG_Count() > 0)); + return OpArgs{shard, this, GetDbContext()}; +} + // Runs within a engine shard thread. // Optimized path that schedules and runs transactions out of order if possible. // Returns true if eagerly executed, false if the callback will be handled by the transaction @@ -1278,35 +1282,31 @@ bool Transaction::CancelShardCb(EngineShard* shard) { ShardId idx = SidToId(shard->shard_id()); auto& sd = shard_data_[idx]; - auto pos = sd.pq_pos; - if (pos == TxQueue::kEnd) + TxQueue::Iterator prev_pos = exchange(sd.pq_pos, TxQueue::kEnd); + if (prev_pos == TxQueue::kEnd) { + DCHECK(IsGlobal() || (sd.local_mask & KEYLOCK_ACQUIRED) == 0); return false; - - sd.pq_pos = TxQueue::kEnd; + } TxQueue* txq = shard->txq(); TxQueue::Iterator head = txq->Head(); - auto val = txq->At(pos); - Transaction* trans = absl::get(val); - DCHECK(trans == this) << "Pos " << pos << ", txq size " << txq->size() << ", trans " << trans; - txq->Remove(pos); - if (sd.local_mask & KEYLOCK_ACQUIRED) { - auto mode = LockMode(); + Transaction* trans = absl::get(txq->At(prev_pos)); + DCHECK(trans == this) << txq->size() << ' ' << sd.pq_pos << ' ' << trans->DebugId(); + txq->Remove(prev_pos); + + if (IsGlobal()) { + shard->shard_lock()->Release(LockMode()); + } else { auto lock_args = GetLockArgs(shard->shard_id()); + DCHECK(sd.local_mask & KEYLOCK_ACQUIRED); DCHECK(lock_args.args.size() > 0); - shard->db_slice().Release(mode, lock_args); + shard->db_slice().Release(LockMode(), lock_args); sd.local_mask &= ~KEYLOCK_ACQUIRED; } - if (IsGlobal()) { - shard->shard_lock()->Release(LockMode()); - } - - if (pos == head && !txq->Empty()) { - return true; - } - return false; + // Check if we need to poll the next head + return prev_pos == head && !txq->Empty(); } // runs in engine-shard thread. @@ -1541,7 +1541,7 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard, RunnableResult resul } // If autojournaling was disabled and not re-enabled, skip it - if ((cid_->opt_mask() & CO::NO_AUTOJOURNAL) && !renabled_auto_journal_.load(memory_order_relaxed)) + if ((cid_->opt_mask() & CO::NO_AUTOJOURNAL) && !re_enabled_auto_journal_) return; // TODO: Handle complex commands like LMPOP correctly once they are implemented. @@ -1586,6 +1586,12 @@ void Transaction::FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt unique_slot_checker_.GetUniqueSlotId(), {}, false); } +void Transaction::RenableAutoJournal() { + DCHECK(cid_->opt_mask() & CO::NO_AUTOJOURNAL); + DCHECK_EQ(run_barrier_.DEBUG_Count(), 0u); // Can't be changed while dispatching + re_enabled_auto_journal_ = true; +} + void Transaction::CancelBlocking(std::function status_cb) { // We're on the owning thread of this transaction, so we can safely access it's data below. // First, check if it makes sense to proceed. diff --git a/src/server/transaction.h b/src/server/transaction.h index f1ea955b0d14..88c2add6475f 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -158,12 +158,13 @@ class Transaction { enum LocalMask : uint16_t { ACTIVE = 1, // Whether its active on this shard (to schedule or execute hops) ARMED = 1 << 1, // Whether its armed (the hop was prepared) - OUT_OF_ORDER = - 1 << 2, // Whether it can run out of order. Undefined if KEYLOCK_ACQUIRED isn't set - KEYLOCK_ACQUIRED = 1 << 3, // Whether its key locks are acquired - SUSPENDED_Q = 1 << 4, // Whether it suspended (by WatchInShard()) - AWAKED_Q = 1 << 5, // Whether it was awakened (by NotifySuspended()) - UNLOCK_MULTI = 1 << 6, // Whether this shard executed UnlockMultiShardCb + // Whether it can run out of order. Undefined if KEYLOCK_ACQUIRED isn't set + OUT_OF_ORDER = 1 << 2, + // Whether its key locks are acquired, never set for global commands. + KEYLOCK_ACQUIRED = 1 << 3, + SUSPENDED_Q = 1 << 4, // Whether it suspended (by WatchInShard()) + AWAKED_Q = 1 << 5, // Whether it was awakened (by NotifySuspended()) + UNLOCK_MULTI = 1 << 6, // Whether this shard executed UnlockMultiShardCb }; public: @@ -220,11 +221,6 @@ class Transaction { // Must be called from coordinator thread. void CancelBlocking(std::function); - // In some cases for non auto-journaling commands we want to enable the auto journal flow. - void RenableAutoJournal() { - renabled_auto_journal_.store(true, std::memory_order_relaxed); - } - // Prepare a squashed hop on given shards. // Only compatible with multi modes that acquire all locks ahead - global and lock_ahead. void PrepareSquashedMultiHop(const CommandId* cid, absl::FunctionRef enabled); @@ -268,6 +264,12 @@ class Transaction { // Returns the state mask on this shard. Safe only when the transaction is armed (or blocked). uint16_t GetLocalMask(ShardId sid) const; + // If blocking tx was woken up on this shard, get wake key. + std::optional GetWakeKey(ShardId sid) const; + + // Get OpArgs for specific shard + OpArgs GetOpArgs(EngineShard* shard) const; + uint32_t GetLocalTxqPos(ShardId sid) const { return shard_data_[SidToId(sid)].pq_pos; } @@ -311,13 +313,6 @@ class Transaction { bool IsGlobal() const; - // If blocking tx was woken up on this shard, get wake key. - std::optional GetWakeKey(ShardId sid) const; - - OpArgs GetOpArgs(EngineShard* shard) const { - return OpArgs{shard, this, GetDbContext()}; - } - DbContext GetDbContext() const { return DbContext{.db_index = db_index_, .time_now_ms = time_now_ms_}; } @@ -346,6 +341,10 @@ class Transaction { bool multi_commands, bool allow_await) const; void FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt) const; + // Re-enable auto journal for commands marked as NO_AUTOJOURNAL. Call during setup. + void RenableAutoJournal(); + + // Clear all state to make transaction re-usable void Refurbish(); // Get keys multi transaction was initialized with, normalized and unique @@ -377,7 +376,7 @@ class Transaction { uint32_t arg_count = 0; // Position in the tx queue. OOO or cancelled schedules remove themselves by this index. - uint32_t pq_pos = TxQueue::kEnd; + TxQueue::Iterator pq_pos = TxQueue::kEnd; // State of shard - bitmask with LocalState flags uint16_t local_mask = 0; @@ -603,8 +602,8 @@ class Transaction { // Stores the full undivided command. CmdArgList full_args_; - // True if NO_AUTOJOURNAL command asked to enable auto journal - std::atomic renabled_auto_journal_ = false; + // Set if a NO_AUTOJOURNAL command asked to enable auto journal again + bool re_enabled_auto_journal_ = false; // Reverse argument mapping for ReverseArgIndex to convert from shard index to original index. std::vector reverse_index_; @@ -661,10 +660,6 @@ template auto Transaction::ScheduleSingleHopT(F&& f) -> decltype(f( return res; } -inline uint16_t trans_id(const Transaction* ptr) { - return (intptr_t(ptr) >> 8) & 0xFFFF; -} - OpResult DetermineKeys(const CommandId* cid, CmdArgList args); } // namespace dfly From edb6dd1a28f87f3cd2a5783c5e7ebe342f33cbeb Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Fri, 8 Mar 2024 09:17:11 +0300 Subject: [PATCH 2/2] fix: fixes --- src/server/generic_family.cc | 2 +- src/server/list_family.cc | 2 +- src/server/transaction.cc | 18 +++++++++--------- src/server/transaction.h | 2 +- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 0fb187e07dc4..2a970c8e8286 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -1293,7 +1293,7 @@ OpResult GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_des Transaction* transaction = cntx->transaction; if (transaction->GetUniqueShardCnt() == 1) { - transaction->RenableAutoJournal(); // Safe to use RENAME with single shard + transaction->ReviveAutoJournal(); // Safe to use RENAME with single shard auto cb = [&](Transaction* t, EngineShard* shard) { return OpRen(t->GetOpArgs(shard), key[0], key[1], skip_exist_dest); }; diff --git a/src/server/list_family.cc b/src/server/list_family.cc index 85dacd6b539c..fc06a484bcef 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -717,7 +717,7 @@ void MoveGeneric(ConnectionContext* cntx, string_view src, string_view dest, Lis OpResult result; if (cntx->transaction->GetUniqueShardCnt() == 1) { - cntx->transaction->RenableAutoJournal(); // On single shard we can use the auto journal flow. + cntx->transaction->ReviveAutoJournal(); // On single shard we can use the auto journal flow. auto cb = [&](Transaction* t, EngineShard* shard) { return OpMoveSingleShard(t->GetOpArgs(shard), src, dest, src_dir, dest_dir); }; diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 9c9379a29c8b..4d1c7765b17a 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -1282,31 +1282,31 @@ bool Transaction::CancelShardCb(EngineShard* shard) { ShardId idx = SidToId(shard->shard_id()); auto& sd = shard_data_[idx]; - TxQueue::Iterator prev_pos = exchange(sd.pq_pos, TxQueue::kEnd); - if (prev_pos == TxQueue::kEnd) { - DCHECK(IsGlobal() || (sd.local_mask & KEYLOCK_ACQUIRED) == 0); + TxQueue::Iterator q_pos = exchange(sd.pq_pos, TxQueue::kEnd); + if (q_pos == TxQueue::kEnd) { + DCHECK_EQ(sd.local_mask & KEYLOCK_ACQUIRED, 0); return false; } TxQueue* txq = shard->txq(); - TxQueue::Iterator head = txq->Head(); + bool was_head = txq->Head() == q_pos; - Transaction* trans = absl::get(txq->At(prev_pos)); + Transaction* trans = absl::get(txq->At(q_pos)); DCHECK(trans == this) << txq->size() << ' ' << sd.pq_pos << ' ' << trans->DebugId(); - txq->Remove(prev_pos); + txq->Remove(q_pos); if (IsGlobal()) { shard->shard_lock()->Release(LockMode()); } else { auto lock_args = GetLockArgs(shard->shard_id()); DCHECK(sd.local_mask & KEYLOCK_ACQUIRED); - DCHECK(lock_args.args.size() > 0); + DCHECK(!lock_args.args.empty()); shard->db_slice().Release(LockMode(), lock_args); sd.local_mask &= ~KEYLOCK_ACQUIRED; } // Check if we need to poll the next head - return prev_pos == head && !txq->Empty(); + return was_head && !txq->Empty(); } // runs in engine-shard thread. @@ -1586,7 +1586,7 @@ void Transaction::FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt unique_slot_checker_.GetUniqueSlotId(), {}, false); } -void Transaction::RenableAutoJournal() { +void Transaction::ReviveAutoJournal() { DCHECK(cid_->opt_mask() & CO::NO_AUTOJOURNAL); DCHECK_EQ(run_barrier_.DEBUG_Count(), 0u); // Can't be changed while dispatching re_enabled_auto_journal_ = true; diff --git a/src/server/transaction.h b/src/server/transaction.h index 88c2add6475f..ba1c23df85e7 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -342,7 +342,7 @@ class Transaction { void FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt) const; // Re-enable auto journal for commands marked as NO_AUTOJOURNAL. Call during setup. - void RenableAutoJournal(); + void ReviveAutoJournal(); // Clear all state to make transaction re-usable void Refurbish();