Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(transaction): Improve ACTIVE flags management #2458

Merged
merged 2 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 41 additions & 18 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,7 @@ void Transaction::InitShardData(absl::Span<const PerShardCache> shard_index, siz
sd.arg_start = args_.size();

// Multi transactions can re-initialize on different shards, so clear ACTIVE flag.
if (multi_)
sd.local_mask &= ~ACTIVE;
DCHECK_EQ(sd.local_mask & ACTIVE, 0);

if (sd.arg_count == 0)
continue;
Expand Down Expand Up @@ -277,20 +276,22 @@ void Transaction::InitByKeys(const KeyIndex& key_index) {
bool is_stub = multi_ && multi_->role == SQUASHED_STUB;

if ((key_index.HasSingleKey() && !IsAtomicMulti()) || is_stub) {
DCHECK(!IsActiveMulti() || multi_->mode == NON_ATOMIC);

// We don't have to split the arguments by shards, so we can copy them directly.
StoreKeysInArgs(key_index, needs_reverse_mapping);

// Multi transactions that execute commands on their own (not stubs) can't shrink the backing
// array, as it still might be read by leftover callbacks.
shard_data_.resize(IsActiveMulti() ? shard_set->size() : 1);
shard_data_.front().local_mask |= ACTIVE;

unique_shard_cnt_ = 1;
if (is_stub) // stub transactions don't migrate
DCHECK_EQ(unique_shard_id_, Shard(args_.front(), shard_set->size()));
else
unique_shard_id_ = Shard(args_.front(), shard_set->size());

// Multi transactions that execute commands on their own (not stubs) can't shrink the backing
// array, as it still might be read by leftover callbacks.
shard_data_.resize(IsActiveMulti() ? shard_set->size() : 1);
shard_data_[SidToId(unique_shard_id_)].local_mask |= ACTIVE;

return;
}

Expand All @@ -316,12 +317,13 @@ void Transaction::InitByKeys(const KeyIndex& key_index) {
if (unique_shard_cnt_ == 1) {
PerShardData* sd;
if (IsActiveMulti()) {
sd = &shard_data_[unique_shard_id_];
sd = &shard_data_[SidToId(unique_shard_id_)];
DCHECK(sd->local_mask & ACTIVE);
} else {
shard_data_.resize(1);
sd = &shard_data_.front();
sd->local_mask |= ACTIVE;
}
sd->local_mask |= ACTIVE;
sd->arg_count = -1;
sd->arg_start = -1;
}
Expand Down Expand Up @@ -431,9 +433,10 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) {
DCHECK(multi_);
DCHECK(!cb_ptr_);

multi_->cmd_seq_num++;

if (multi_->role != SQUASHED_STUB) // stub transactions don't migrate between threads
unique_shard_id_ = 0;
multi_->cmd_seq_num++;
unique_shard_cnt_ = 0;

args_.clear();
Expand All @@ -442,18 +445,25 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) {
cid_ = cid;
cb_ptr_ = nullptr;

if (multi_->mode == NON_ATOMIC || multi_->role == SQUASHED_STUB) {
// Reset shard data without resizing because armed might be read from cancelled callbacks.
for (auto& sd : shard_data_) {
sd.arg_count = sd.arg_start = sd.local_mask = 0;
sd.pq_pos = TxQueue::kEnd;
DCHECK_EQ(sd.is_armed.load(memory_order_relaxed), false);
for (auto& sd : shard_data_) {
sd.arg_count = sd.arg_start = 0;

if (multi_->mode == NON_ATOMIC) {
sd.local_mask = 0; // Non atomic transactions schedule each time, so remove all flags
DCHECK_EQ(sd.pq_pos, TxQueue::kEnd);
} else {
DCHECK(IsAtomicMulti()); // Every command determines it's own active shards
sd.local_mask &= ~ACTIVE; // so remove ACTIVE flags, but keep KEYLOCK_ACQUIRED
}
coordinator_state_ = 0;
DCHECK(!sd.is_armed.load(memory_order_relaxed));
}

if (multi_->mode == NON_ATOMIC)
if (multi_->mode == NON_ATOMIC) {
coordinator_state_ = 0;
txid_ = 0;
} else if (multi_->role == SQUASHED_STUB) {
DCHECK_EQ(coordinator_state_, 0u);
}

if (multi_->role == SQUASHER)
multi_->role = DEFAULT;
Expand Down Expand Up @@ -710,6 +720,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {

if (schedule_fast) {
DCHECK_NE(unique_shard_id_, kInvalidSid);
DCHECK(IsActive(unique_shard_id_));
DCHECK(shard_data_.size() == 1 || multi_->mode == NON_ATOMIC);

// IsArmedInShard() first checks run_count_ before shard_data, so use release ordering.
Expand Down Expand Up @@ -1044,6 +1055,18 @@ KeyLockArgs Transaction::GetLockArgs(ShardId sid) const {
return res;
}

bool Transaction::IsActive(ShardId sid) const {
// If we have only one shard, we often don't store infromation about all shards, so determine it
// solely by id
if (unique_shard_cnt_ == 1) {
// However the active flag is still supposed to be set for our unique shard
DCHECK((shard_data_[SidToId(unique_shard_id_)].local_mask & ACTIVE));
return sid == unique_shard_id_;
}

return shard_data_[SidToId(sid)].local_mask & ACTIVE;
}

// Runs within a engine shard thread.
// Optimized path that schedules and runs transactions out of order if possible.
// Returns true if eagerly executed, false if the callback will be handled by the transaction
Expand Down
6 changes: 1 addition & 5 deletions src/server/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,7 @@ class Transaction {
KeyLockArgs GetLockArgs(ShardId sid) const;

// Returns true if the transaction spans this shard_id.
// Runs from the coordinator thread.
bool IsActive(ShardId shard_id) const {
return unique_shard_cnt_ == 1 ? (unique_shard_id_ == shard_id)
: shard_data_[shard_id].local_mask & ACTIVE;
}
bool IsActive(ShardId shard_id) const;

//! Returns true if the transaction is armed for execution on this sid (used to avoid
//! duplicate runs). Supports local transactions under multi as well.
Expand Down
Loading