diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 655030fbacd3..f09812bf347f 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -184,8 +184,7 @@ void Transaction::InitShardData(absl::Span shard_index, siz sd.arg_start = args_.size(); // Multi transactions can re-initialize on different shards, so clear ACTIVE flag. - if (multi_) - sd.local_mask &= ~ACTIVE; + DCHECK_EQ(sd.local_mask & ACTIVE, 0); if (sd.arg_count == 0) continue; @@ -277,20 +276,22 @@ void Transaction::InitByKeys(const KeyIndex& key_index) { bool is_stub = multi_ && multi_->role == SQUASHED_STUB; if ((key_index.HasSingleKey() && !IsAtomicMulti()) || is_stub) { + DCHECK(!IsActiveMulti() || multi_->mode == NON_ATOMIC); + // We don't have to split the arguments by shards, so we can copy them directly. StoreKeysInArgs(key_index, needs_reverse_mapping); - // Multi transactions that execute commands on their own (not stubs) can't shrink the backing - // array, as it still might be read by leftover callbacks. - shard_data_.resize(IsActiveMulti() ? shard_set->size() : 1); - shard_data_.front().local_mask |= ACTIVE; - unique_shard_cnt_ = 1; if (is_stub) // stub transactions don't migrate DCHECK_EQ(unique_shard_id_, Shard(args_.front(), shard_set->size())); else unique_shard_id_ = Shard(args_.front(), shard_set->size()); + // Multi transactions that execute commands on their own (not stubs) can't shrink the backing + // array, as it still might be read by leftover callbacks. + shard_data_.resize(IsActiveMulti() ? shard_set->size() : 1); + shard_data_[SidToId(unique_shard_id_)].local_mask |= ACTIVE; + return; } @@ -316,12 +317,13 @@ void Transaction::InitByKeys(const KeyIndex& key_index) { if (unique_shard_cnt_ == 1) { PerShardData* sd; if (IsActiveMulti()) { - sd = &shard_data_[unique_shard_id_]; + sd = &shard_data_[SidToId(unique_shard_id_)]; + DCHECK(sd->local_mask & ACTIVE); } else { shard_data_.resize(1); sd = &shard_data_.front(); + sd->local_mask |= ACTIVE; } - sd->local_mask |= ACTIVE; sd->arg_count = -1; sd->arg_start = -1; } @@ -431,9 +433,10 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) { DCHECK(multi_); DCHECK(!cb_ptr_); + multi_->cmd_seq_num++; + if (multi_->role != SQUASHED_STUB) // stub transactions don't migrate between threads unique_shard_id_ = 0; - multi_->cmd_seq_num++; unique_shard_cnt_ = 0; args_.clear(); @@ -442,18 +445,25 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) { cid_ = cid; cb_ptr_ = nullptr; - if (multi_->mode == NON_ATOMIC || multi_->role == SQUASHED_STUB) { - // Reset shard data without resizing because armed might be read from cancelled callbacks. - for (auto& sd : shard_data_) { - sd.arg_count = sd.arg_start = sd.local_mask = 0; - sd.pq_pos = TxQueue::kEnd; - DCHECK_EQ(sd.is_armed.load(memory_order_relaxed), false); + for (auto& sd : shard_data_) { + sd.arg_count = sd.arg_start = 0; + + if (multi_->mode == NON_ATOMIC) { + sd.local_mask = 0; // Non atomic transactions schedule each time, so remove all flags + DCHECK_EQ(sd.pq_pos, TxQueue::kEnd); + } else { + DCHECK(IsAtomicMulti()); // Every command determines it's own active shards + sd.local_mask &= ~ACTIVE; // so remove ACTIVE flags, but keep KEYLOCK_ACQUIRED } - coordinator_state_ = 0; + DCHECK(!sd.is_armed.load(memory_order_relaxed)); } - if (multi_->mode == NON_ATOMIC) + if (multi_->mode == NON_ATOMIC) { + coordinator_state_ = 0; txid_ = 0; + } else if (multi_->role == SQUASHED_STUB) { + DCHECK_EQ(coordinator_state_, 0u); + } if (multi_->role == SQUASHER) multi_->role = DEFAULT; @@ -710,6 +720,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { if (schedule_fast) { DCHECK_NE(unique_shard_id_, kInvalidSid); + DCHECK(IsActive(unique_shard_id_)); DCHECK(shard_data_.size() == 1 || multi_->mode == NON_ATOMIC); // IsArmedInShard() first checks run_count_ before shard_data, so use release ordering. @@ -1044,6 +1055,18 @@ KeyLockArgs Transaction::GetLockArgs(ShardId sid) const { return res; } +bool Transaction::IsActive(ShardId sid) const { + // If we have only one shard, we often don't store infromation about all shards, so determine it + // solely by id + if (unique_shard_cnt_ == 1) { + // However the active flag is still supposed to be set for our unique shard + DCHECK((shard_data_[SidToId(unique_shard_id_)].local_mask & ACTIVE)); + return sid == unique_shard_id_; + } + + return shard_data_[SidToId(sid)].local_mask & ACTIVE; +} + // Runs within a engine shard thread. // Optimized path that schedules and runs transactions out of order if possible. // Returns true if eagerly executed, false if the callback will be handled by the transaction diff --git a/src/server/transaction.h b/src/server/transaction.h index d3d5bb357f85..d02119ea3134 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -253,11 +253,7 @@ class Transaction { KeyLockArgs GetLockArgs(ShardId sid) const; // Returns true if the transaction spans this shard_id. - // Runs from the coordinator thread. - bool IsActive(ShardId shard_id) const { - return unique_shard_cnt_ == 1 ? (unique_shard_id_ == shard_id) - : shard_data_[shard_id].local_mask & ACTIVE; - } + bool IsActive(ShardId shard_id) const; //! Returns true if the transaction is armed for execution on this sid (used to avoid //! duplicate runs). Supports local transactions under multi as well.