Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: little transaction cleanup #2608

Merged
merged 2 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1293,11 +1293,9 @@ OpResult<void> GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_des
Transaction* transaction = cntx->transaction;

if (transaction->GetUniqueShardCnt() == 1) {
transaction->RenableAutoJournal(); // Safe to use RENAME with single shard
auto cb = [&](Transaction* t, EngineShard* shard) {
auto ec = OpRen(t->GetOpArgs(shard), key[0], key[1], skip_exist_dest);
// Incase of uniqe shard count we can use rename command in replica.
t->RenableAutoJournal();
return ec;
return OpRen(t->GetOpArgs(shard), key[0], key[1], skip_exist_dest);
};
OpResult<void> result = transaction->ScheduleSingleHopT(std::move(cb));

Expand Down
7 changes: 2 additions & 5 deletions src/server/list_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -717,13 +717,10 @@ void MoveGeneric(ConnectionContext* cntx, string_view src, string_view dest, Lis
OpResult<string> result;

if (cntx->transaction->GetUniqueShardCnt() == 1) {
cntx->transaction->RenableAutoJournal(); // On single shard we can use the auto journal flow.
auto cb = [&](Transaction* t, EngineShard* shard) {
auto ec = OpMoveSingleShard(t->GetOpArgs(shard), src, dest, src_dir, dest_dir);
// On single shard we can use the auto journal flow.
t->RenableAutoJournal();
return ec;
return OpMoveSingleShard(t->GetOpArgs(shard), src, dest, src_dir, dest_dir);
};

result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
} else {
cntx->transaction->Schedule();
Expand Down
68 changes: 37 additions & 31 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ thread_local Transaction::TLTmpSpace Transaction::tmp_space;

namespace {

// Global txid sequence
atomic_uint64_t op_seq{1};

constexpr size_t kTransSize [[maybe_unused]] = sizeof(Transaction);
Expand Down Expand Up @@ -90,12 +91,12 @@ std::ostream& operator<<(std::ostream& os, Transaction::time_point tp) {
return os << ms << "ms";
}

} // namespace

IntentLock::Mode Transaction::LockMode() const {
return cid_->IsReadOnly() ? IntentLock::SHARED : IntentLock::EXCLUSIVE;
uint16_t trans_id(const Transaction* ptr) {
return (intptr_t(ptr) >> 8) & 0xFFFF;
}

} // namespace

void Transaction::PhasedBarrier::Start(uint32_t count) {
DCHECK_EQ(DEBUG_Count(), 0u);
count_.store(count, memory_order_release);
Expand Down Expand Up @@ -159,13 +160,6 @@ cv_status Transaction::BatonBarrier::Wait(time_point tp) {
return cv_status::no_timeout;
}

/**
* @brief Construct a new Transaction:: Transaction object
*
* @param cid
* @param ess
* @param cs
*/
Transaction::Transaction(const CommandId* cid) : cid_{cid} {
InitTxTime();
string_view cmd_name(cid_->name());
Expand Down Expand Up @@ -1155,6 +1149,16 @@ uint16_t Transaction::GetLocalMask(ShardId sid) const {
return shard_data_[SidToId(sid)].local_mask;
}

IntentLock::Mode Transaction::LockMode() const {
return cid_->IsReadOnly() ? IntentLock::SHARED : IntentLock::EXCLUSIVE;
}

OpArgs Transaction::GetOpArgs(EngineShard* shard) const {
DCHECK(IsActive(shard->shard_id()));
DCHECK((multi_ && multi_->role == SQUASHED_STUB) || (run_barrier_.DEBUG_Count() > 0));
return OpArgs{shard, this, GetDbContext()};
}

// Runs within a engine shard thread.
// Optimized path that schedules and runs transactions out of order if possible.
// Returns true if eagerly executed, false if the callback will be handled by the transaction
Expand Down Expand Up @@ -1278,35 +1282,31 @@ bool Transaction::CancelShardCb(EngineShard* shard) {
ShardId idx = SidToId(shard->shard_id());
auto& sd = shard_data_[idx];

auto pos = sd.pq_pos;
if (pos == TxQueue::kEnd)
TxQueue::Iterator prev_pos = exchange(sd.pq_pos, TxQueue::kEnd);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe instead prev_pos -> q_pos?

if (prev_pos == TxQueue::kEnd) {
DCHECK(IsGlobal() || (sd.local_mask & KEYLOCK_ACQUIRED) == 0);
return false;

sd.pq_pos = TxQueue::kEnd;
}

TxQueue* txq = shard->txq();
TxQueue::Iterator head = txq->Head();
auto val = txq->At(pos);
Transaction* trans = absl::get<Transaction*>(val);
DCHECK(trans == this) << "Pos " << pos << ", txq size " << txq->size() << ", trans " << trans;
txq->Remove(pos);

if (sd.local_mask & KEYLOCK_ACQUIRED) {
auto mode = LockMode();
Transaction* trans = absl::get<Transaction*>(txq->At(prev_pos));
DCHECK(trans == this) << txq->size() << ' ' << sd.pq_pos << ' ' << trans->DebugId();
txq->Remove(prev_pos);

if (IsGlobal()) {
shard->shard_lock()->Release(LockMode());
} else {
auto lock_args = GetLockArgs(shard->shard_id());
DCHECK(sd.local_mask & KEYLOCK_ACQUIRED);
DCHECK(lock_args.args.size() > 0);
shard->db_slice().Release(mode, lock_args);
shard->db_slice().Release(LockMode(), lock_args);
sd.local_mask &= ~KEYLOCK_ACQUIRED;
}
if (IsGlobal()) {
shard->shard_lock()->Release(LockMode());
}

if (pos == head && !txq->Empty()) {
return true;
}

return false;
// Check if we need to poll the next head
return prev_pos == head && !txq->Empty();
}

// runs in engine-shard thread.
Expand Down Expand Up @@ -1541,7 +1541,7 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard, RunnableResult resul
}

// If autojournaling was disabled and not re-enabled, skip it
if ((cid_->opt_mask() & CO::NO_AUTOJOURNAL) && !renabled_auto_journal_.load(memory_order_relaxed))
if ((cid_->opt_mask() & CO::NO_AUTOJOURNAL) && !re_enabled_auto_journal_)
return;

// TODO: Handle complex commands like LMPOP correctly once they are implemented.
Expand Down Expand Up @@ -1586,6 +1586,12 @@ void Transaction::FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt
unique_slot_checker_.GetUniqueSlotId(), {}, false);
}

void Transaction::RenableAutoJournal() {
dranikpg marked this conversation as resolved.
Show resolved Hide resolved
DCHECK(cid_->opt_mask() & CO::NO_AUTOJOURNAL);
DCHECK_EQ(run_barrier_.DEBUG_Count(), 0u); // Can't be changed while dispatching
re_enabled_auto_journal_ = true;
}

void Transaction::CancelBlocking(std::function<OpStatus(ArgSlice)> status_cb) {
// We're on the owning thread of this transaction, so we can safely access it's data below.
// First, check if it makes sense to proceed.
Expand Down
45 changes: 20 additions & 25 deletions src/server/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,13 @@ class Transaction {
enum LocalMask : uint16_t {
ACTIVE = 1, // Whether its active on this shard (to schedule or execute hops)
ARMED = 1 << 1, // Whether its armed (the hop was prepared)
OUT_OF_ORDER =
1 << 2, // Whether it can run out of order. Undefined if KEYLOCK_ACQUIRED isn't set
KEYLOCK_ACQUIRED = 1 << 3, // Whether its key locks are acquired
SUSPENDED_Q = 1 << 4, // Whether it suspended (by WatchInShard())
AWAKED_Q = 1 << 5, // Whether it was awakened (by NotifySuspended())
UNLOCK_MULTI = 1 << 6, // Whether this shard executed UnlockMultiShardCb
// Whether it can run out of order. Undefined if KEYLOCK_ACQUIRED isn't set
OUT_OF_ORDER = 1 << 2,
// Whether its key locks are acquired, never set for global commands.
KEYLOCK_ACQUIRED = 1 << 3,
SUSPENDED_Q = 1 << 4, // Whether it suspended (by WatchInShard())
AWAKED_Q = 1 << 5, // Whether it was awakened (by NotifySuspended())
UNLOCK_MULTI = 1 << 6, // Whether this shard executed UnlockMultiShardCb
};

public:
Expand Down Expand Up @@ -220,11 +221,6 @@ class Transaction {
// Must be called from coordinator thread.
void CancelBlocking(std::function<OpStatus(ArgSlice)>);

// In some cases for non auto-journaling commands we want to enable the auto journal flow.
void RenableAutoJournal() {
renabled_auto_journal_.store(true, std::memory_order_relaxed);
}

// Prepare a squashed hop on given shards.
// Only compatible with multi modes that acquire all locks ahead - global and lock_ahead.
void PrepareSquashedMultiHop(const CommandId* cid, absl::FunctionRef<bool(ShardId)> enabled);
Expand Down Expand Up @@ -268,6 +264,12 @@ class Transaction {
// Returns the state mask on this shard. Safe only when the transaction is armed (or blocked).
uint16_t GetLocalMask(ShardId sid) const;

// If blocking tx was woken up on this shard, get wake key.
std::optional<std::string_view> GetWakeKey(ShardId sid) const;

// Get OpArgs for specific shard
OpArgs GetOpArgs(EngineShard* shard) const;

uint32_t GetLocalTxqPos(ShardId sid) const {
return shard_data_[SidToId(sid)].pq_pos;
}
Expand Down Expand Up @@ -311,13 +313,6 @@ class Transaction {

bool IsGlobal() const;

// If blocking tx was woken up on this shard, get wake key.
std::optional<std::string_view> GetWakeKey(ShardId sid) const;

OpArgs GetOpArgs(EngineShard* shard) const {
return OpArgs{shard, this, GetDbContext()};
}

DbContext GetDbContext() const {
return DbContext{.db_index = db_index_, .time_now_ms = time_now_ms_};
}
Expand Down Expand Up @@ -346,6 +341,10 @@ class Transaction {
bool multi_commands, bool allow_await) const;
void FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt) const;

// Re-enable auto journal for commands marked as NO_AUTOJOURNAL. Call during setup.
void RenableAutoJournal();

// Clear all state to make transaction re-usable
void Refurbish();

// Get keys multi transaction was initialized with, normalized and unique
Expand Down Expand Up @@ -377,7 +376,7 @@ class Transaction {
uint32_t arg_count = 0;

// Position in the tx queue. OOO or cancelled schedules remove themselves by this index.
uint32_t pq_pos = TxQueue::kEnd;
TxQueue::Iterator pq_pos = TxQueue::kEnd;

// State of shard - bitmask with LocalState flags
uint16_t local_mask = 0;
Expand Down Expand Up @@ -603,8 +602,8 @@ class Transaction {
// Stores the full undivided command.
CmdArgList full_args_;

// True if NO_AUTOJOURNAL command asked to enable auto journal
std::atomic<bool> renabled_auto_journal_ = false;
// Set if a NO_AUTOJOURNAL command asked to enable auto journal again
bool re_enabled_auto_journal_ = false;

// Reverse argument mapping for ReverseArgIndex to convert from shard index to original index.
std::vector<uint32_t> reverse_index_;
Expand Down Expand Up @@ -661,10 +660,6 @@ template <typename F> auto Transaction::ScheduleSingleHopT(F&& f) -> decltype(f(
return res;
}

inline uint16_t trans_id(const Transaction* ptr) {
return (intptr_t(ptr) >> 8) & 0xFFFF;
}

OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args);

} // namespace dfly
Loading