Skip to content

Commit

Permalink
chore(transaction): Introduce RunCallback
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
  • Loading branch information
dranikpg committed Mar 21, 2024
1 parent f7292de commit 295861a
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 44 deletions.
102 changes: 58 additions & 44 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <absl/strings/match.h>

#include "base/logging.h"
#include "glog/logging.h"
#include "server/blocking_controller.h"
#include "server/command_registry.h"
#include "server/db_slice.h"
Expand Down Expand Up @@ -573,61 +574,21 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {

bool was_suspended = sd.local_mask & SUSPENDED_Q;
bool awaked_prerun = sd.local_mask & AWAKED_Q;
bool is_concluding = coordinator_state_ & COORD_CONCLUDING;

IntentLock::Mode mode = LockMode();

DCHECK(IsGlobal() || (sd.local_mask & KEYLOCK_ACQUIRED) || (multi_ && multi_->mode == GLOBAL));
DCHECK(!txq_ooo || (sd.local_mask & OUT_OF_ORDER));

/*************************************************************************/
// Actually running the callback.
// If you change the logic here, also please change the logic
RunnableResult result;
try {
// if a transaction is suspended, we still run it because of brpoplpush/blmove case
// that needs to run lpush on its suspended shard.
result = (*cb_ptr_)(this, shard);

if (unique_shard_cnt_ == 1) {
cb_ptr_ = nullptr; // We can do it because only a single thread runs the callback.
local_result_ = result;
} else {
if (result == OpStatus::OUT_OF_MEMORY) {
absl::base_internal::SpinLockHolder lk{&local_result_mu_};
CHECK(local_result_ == OpStatus::OK || local_result_ == OpStatus::OUT_OF_MEMORY);
local_result_ = result;
} else {
CHECK_EQ(OpStatus::OK, result);
}
}
} catch (std::bad_alloc&) {
LOG_FIRST_N(ERROR, 16) << " out of memory"; // TODO: to log at most once per sec.
absl::base_internal::SpinLockHolder lk{&local_result_mu_};
local_result_ = OpStatus::OUT_OF_MEMORY;
} catch (std::exception& e) {
LOG(FATAL) << "Unexpected exception " << e.what();
}
RunCallback(shard);

/*************************************************************************/
// at least the coordinator thread owns the reference.
DCHECK_GE(GetUseCount(), 1u);

shard->db_slice().OnCbFinish();

// Handle result flags to alter behaviour.
if (result.flags & RunnableResult::AVOID_CONCLUDING) {
// Multi shard callbacks should either all or none choose to conclude. Because they can't
// communicate, the must know their decision ahead, consequently there is no point in using this
// flag.
CHECK_EQ(unique_shard_cnt_, 1u);
DCHECK(is_concluding || multi_->concluding);
is_concluding = false;
}

// Log to jounrnal only once the command finished running
if (is_concluding || (multi_ && multi_->concluding))
LogAutoJournalOnShard(shard, result);
bool is_concluding = coordinator_state_ & COORD_CONCLUDING;

// If we're the head of tx queue (txq_ooo is false), we remove ourselves upon first invocation
// and successive hops are run by continuation_trans_ in engine shard.
Expand Down Expand Up @@ -692,6 +653,54 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
return !is_concluding;
}

void Transaction::RunCallback(EngineShard* shard) {
DCHECK_EQ(EngineShard::tlocal(), shard);

// Actually running the callback.
// If you change the logic here, also please change the logic
RunnableResult result;
try {
// if a transaction is suspended, we still run it because of brpoplpush/blmove case
// that needs to run lpush on its suspended shard.
result = (*cb_ptr_)(this, shard);

if (unique_shard_cnt_ == 1) {
cb_ptr_ = nullptr; // We can do it because only a single thread runs the callback.
local_result_ = result;
} else {
if (result == OpStatus::OUT_OF_MEMORY) {
absl::base_internal::SpinLockHolder lk{&local_result_mu_};
CHECK(local_result_ == OpStatus::OK || local_result_ == OpStatus::OUT_OF_MEMORY);
local_result_ = result;
} else {
CHECK_EQ(OpStatus::OK, result);
}
}
} catch (std::bad_alloc&) {
LOG_FIRST_N(ERROR, 16) << " out of memory"; // TODO: to log at most once per sec.
absl::base_internal::SpinLockHolder lk{&local_result_mu_};
local_result_ = OpStatus::OUT_OF_MEMORY;
} catch (std::exception& e) {
LOG(FATAL) << "Unexpected exception " << e.what();
}

shard->db_slice().OnCbFinish();

// Handle result flags to alter behaviour.
if (result.flags & RunnableResult::AVOID_CONCLUDING) {
// Multi shard callbacks should either all or none choose to conclude. Because they can't
// communicate, the must know their decision ahead, consequently there is no point in using this
// flag.
CHECK_EQ(unique_shard_cnt_, 1u);
DCHECK((coordinator_state_ & COORD_CONCLUDING) || multi_->concluding);
coordinator_state_ &= ~COORD_CONCLUDING; // safe because single shard
}

// Log to jounrnal only once the command finished running
if ((coordinator_state_ & COORD_CONCLUDING) || (multi_ && multi_->concluding))
LogAutoJournalOnShard(shard, result);
}

// TODO: For multi-transactions we should be able to deduce mode() at run-time based
// on the context. For regular multi-transactions we can actually inspect all commands.
// For eval-like transactions - we can decide based on the command flavor (EVAL/EVALRO) or
Expand Down Expand Up @@ -942,8 +951,7 @@ void Transaction::ExecuteAsync() {
});

auto* ss = ServerState::tlocal();
if (unique_shard_cnt_ == 1 && ss->thread_index() == unique_shard_id_ &&
ss->AllowInlineScheduling()) {
if (CanRunInlined()) {
DVLOG(1) << "Short-circuit ExecuteAsync " << DebugId();
EngineShard::tlocal()->PollExecution("exec_cb", this);
return;
Expand Down Expand Up @@ -1597,6 +1605,12 @@ void Transaction::CancelBlocking(std::function<OpStatus(ArgSlice)> status_cb) {
blocking_barrier_.Close();
}

bool Transaction::CanRunInlined() const {
auto* ss = ServerState::tlocal();
return unique_shard_cnt_ == 1 && unique_shard_id_ == ss->thread_index() &&
ss->AllowInlineScheduling();
}

OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args) {
KeyIndex key_index;

Expand Down
6 changes: 6 additions & 0 deletions src/server/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,9 @@ class Transaction {
// Finish hop, decrement run barrier
void FinishHop();

// Run actual callback on shard, store result if single shard or OOM was catched
void RunCallback(EngineShard* shard);

// Adds itself to watched queue in the shard. Must run in that shard thread.
OpStatus WatchInShard(ArgSlice keys, EngineShard* shard, KeyReadyChecker krc);

Expand Down Expand Up @@ -553,6 +556,9 @@ class Transaction {
// Should be called immediately after the last hop.
void LogAutoJournalOnShard(EngineShard* shard, RunnableResult shard_result);

// Whether the callback can be run directly on this thread without dispatching on the shard queue
bool CanRunInlined() const;

uint32_t GetUseCount() const {
return use_count_.load(std::memory_order_relaxed);
}
Expand Down

0 comments on commit 295861a

Please sign in to comment.