Skip to content

Commit

Permalink
EVAL multi modes + non atomic modes (#818)
Browse files Browse the repository at this point in the history
- Implement multi modes for eval
- Implement non atomic mode
- Enhance tests
  • Loading branch information
dranikpg authored Feb 20, 2023
1 parent dd952c3 commit 03e99a5
Show file tree
Hide file tree
Showing 7 changed files with 287 additions and 80 deletions.
4 changes: 0 additions & 4 deletions src/server/conn_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ StoredCmd::StoredCmd(const CommandId* d, CmdArgList args) : descr(d) {
arg_list_ = {arg_vec_.data(), arg_vec_.size()};
}

void StoredCmd::Invoke(ConnectionContext* ctx) {
descr->Invoke(arg_list_, ctx);
}

void ConnectionContext::ChangeMonitor(bool start) {
// This will either remove or register a new connection
// at the "top level" thread --> ServerState context
Expand Down
2 changes: 0 additions & 2 deletions src/server/conn_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ struct StoredCmd {
CmdArgList ArgList() const {
return arg_list_;
}

void Invoke(ConnectionContext* ctx);
};

struct ConnectionState {
Expand Down
162 changes: 120 additions & 42 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ ABSL_FLAG(uint32_t, memcache_port, 0, "Memcached port");

ABSL_FLAG(int, multi_exec_mode, 1,
"Set multi exec atomicity mode: 1 for global, 2 for locking ahead, 3 for locking "
"incrementally");
"incrementally, 4 for non atomic");
ABSL_FLAG(int, multi_eval_mode, 2,
"Set EVAL atomicity mode: 1 for global, 2 for locking ahead, 3 for locking "
"incrementally, 4 for non atomic");

namespace dfly {

Expand Down Expand Up @@ -534,6 +537,34 @@ static void MultiSetError(ConnectionContext* cntx) {
}
}

// Return OK if all keys are allowed to be accessed: either declared in EVAL or
// transaction is running in global or non-atomic mode.
OpStatus CheckKeysDeclared(const ConnectionState::ScriptInfo& eval_info, const CommandId* cid,
CmdArgList args, Transaction* trans) {
Transaction::MultiMode multi_mode = trans->GetMultiMode();

// We either scheduled on all shards or re-schedule for each operation,
// so we are not restricted to any keys.
if (multi_mode == Transaction::GLOBAL || multi_mode == Transaction::NON_ATOMIC)
return OpStatus::OK;

OpResult<KeyIndex> key_index_res = DetermineKeys(cid, args);
if (!key_index_res)
return key_index_res.status();

const auto& key_index = *key_index_res;
for (unsigned i = key_index.start; i < key_index.end; ++i) {
if (!eval_info.keys.contains(ArgS(args, i))) {
return OpStatus::KEY_NOTFOUND;
}
}

if (unsigned i = key_index.bonus; i && !eval_info.keys.contains(ArgS(args, i)))
return OpStatus::KEY_NOTFOUND;

return OpStatus::OK;
}

void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) {
CHECK(!args.empty());
DCHECK_NE(0u, shard_set->size()) << "Init was not called";
Expand Down Expand Up @@ -656,24 +687,20 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
if (under_script) {
DCHECK(dfly_cntx->transaction);
if (IsTransactional(cid)) {
OpResult<KeyIndex> key_index_res = DetermineKeys(cid, args);
if (!key_index_res)
return (*cntx)->SendError(key_index_res.status());

const auto& key_index = *key_index_res;
for (unsigned i = key_index.start; i < key_index.end; ++i) {
string_view key = ArgS(args, i);
if (!dfly_cntx->conn_state.script_info->keys.contains(key)) {
return (*cntx)->SendError("script tried accessing undeclared key");
}
}
OpStatus status =
CheckKeysDeclared(*dfly_cntx->conn_state.script_info, cid, args, dfly_cntx->transaction);

if (status == OpStatus::KEY_NOTFOUND)
return (*cntx)->SendError("script tried accessing undeclared key");

if (status != OpStatus::OK)
return (*cntx)->SendError(status);

dfly_cntx->transaction->MultiSwitchCmd(cid);
OpStatus st = dfly_cntx->transaction->InitByArgs(dfly_cntx->conn_state.db_index, args);
status = dfly_cntx->transaction->InitByArgs(dfly_cntx->conn_state.db_index, args);

if (st != OpStatus::OK) {
return (*cntx)->SendError(st);
}
if (status != OpStatus::OK)
return (*cntx)->SendError(status);
}
} else {
DCHECK(dfly_cntx->transaction == nullptr);
Expand All @@ -696,29 +723,45 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)

dfly_cntx->cid = cid;

try {
cid->Invoke(args, dfly_cntx);
} catch (std::exception& e) {
LOG(ERROR) << "Internal error, system probably unstable " << e.what();
// Collect stats for all regular transactions and all multi transactions from scripts, except EVAL
// itself. EXEC does not use DispatchCommand for dispatching.
bool collect_stats =
dfly_cntx->transaction && (!dfly_cntx->transaction->IsMulti() || under_script);
if (!InvokeCmd(args, cid, dfly_cntx, collect_stats)) {
dfly_cntx->reply_builder()->SendError("Internal Error");
dfly_cntx->reply_builder()->CloseConnection();
}

end_usec = ProactorBase::GetMonotonicTimeNs();

request_latency_usec.IncBy(cmd_str, (end_usec - start_usec) / 1000);
if (dist_trans) {
bool is_ooo = dist_trans->IsOOO();
dfly_cntx->last_command_debug.clock = dist_trans->txid();
dfly_cntx->last_command_debug.is_ooo = is_ooo;
etl.stats.ooo_tx_cnt += is_ooo;
}

if (!under_script) {
dfly_cntx->transaction = nullptr;
}
}

bool Service::InvokeCmd(CmdArgList args, const CommandId* cid, ConnectionContext* cntx,
bool record_stats) {
try {
cid->Invoke(args, cntx);
} catch (std::exception& e) {
LOG(ERROR) << "Internal error, system probably unstable " << e.what();
return false;
}

if (record_stats) {
DCHECK(cntx->transaction);
ServerState& etl = *ServerState::tlocal();
bool is_ooo = cntx->transaction->IsOOO();

cntx->last_command_debug.clock = cntx->transaction->txid();
cntx->last_command_debug.is_ooo = is_ooo;
etl.stats.ooo_tx_cnt += is_ooo;
}

return true;
}

void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view value,
facade::ConnectionContext* cntx) {
absl::InlinedVector<MutableSlice, 8> args;
Expand Down Expand Up @@ -1007,6 +1050,13 @@ void Service::EvalSha(CmdArgList args, ConnectionContext* cntx) {
ss->RecordCallLatency(sha, (end - start) / 1000);
}

vector<bool> DetermineKeyShards(CmdArgList keys) {
vector<bool> out(shard_set->size());
for (auto k : keys)
out[Shard(facade::ToSV(k), out.size())] = true;
return out;
}

void Service::EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter,
ConnectionContext* cntx) {
DCHECK(!eval_args.sha.empty());
Expand Down Expand Up @@ -1042,8 +1092,24 @@ void Service::EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter,
}
DCHECK(cntx->transaction);

if (!eval_args.keys.empty())
bool scheduled = false;
int multi_mode = absl::GetFlag(FLAGS_multi_eval_mode);
DCHECK(multi_mode >= Transaction::GLOBAL && multi_mode <= Transaction::NON_ATOMIC);

if (multi_mode == Transaction::GLOBAL) {
scheduled = true;
cntx->transaction->StartMultiGlobal(cntx->db_index());
} else if (multi_mode == Transaction::LOCK_INCREMENTAL && !eval_args.keys.empty()) {
scheduled = true;
vector<bool> shards = DetermineKeyShards(eval_args.keys);
cntx->transaction->StartMultiLockedIncr(cntx->db_index(), shards);
} else if (multi_mode == Transaction::LOCK_AHEAD && !eval_args.keys.empty()) {
scheduled = true;
cntx->transaction->StartMultiLockedAhead(cntx->db_index(), eval_args.keys);
} else if (multi_mode == Transaction::NON_ATOMIC) {
scheduled = true;
cntx->transaction->StartMultiNonAtomic();
};

interpreter->SetGlobalArray("KEYS", eval_args.keys);
interpreter->SetGlobalArray("ARGV", eval_args.args);
Expand All @@ -1056,7 +1122,7 @@ void Service::EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter,
cntx->conn_state.script_info.reset(); // reset script_info

// Conclude the transaction.
if (!eval_args.keys.empty())
if (scheduled)
cntx->transaction->UnlockMulti();

if (result == Interpreter::RUN_ERR) {
Expand Down Expand Up @@ -1187,18 +1253,30 @@ bool StartMultiExec(DbIndex dbid, Transaction* trans, ConnectionState::ExecInfo*
return false;

int multi_mode = absl::GetFlag(FLAGS_multi_exec_mode);
CHECK(multi_mode >= 1 && multi_mode <= 3);
DCHECK(multi_mode >= Transaction::GLOBAL && multi_mode <= Transaction::NON_ATOMIC);

if (global || multi_mode == Transaction::GLOBAL) {
trans->StartMultiGlobal(dbid);
} else if (multi_mode == Transaction::LOCK_AHEAD) {
*tmp_keys = CollectAllKeys(exec_info);
trans->StartMultiLockedAhead(dbid, CmdArgList{*tmp_keys});
} else {
vector<bool> shards = DetermineKeyShards(exec_info);
DCHECK(std::any_of(shards.begin(), shards.end(), [](bool s) { return s; }));
trans->StartMultiLockedIncr(dbid, shards);
}
// Atomic modes fall back to GLOBAL if they contain global commands.
if (global &&
(multi_mode == Transaction::LOCK_AHEAD || multi_mode == Transaction::LOCK_INCREMENTAL))
multi_mode = Transaction::GLOBAL;

switch ((Transaction::MultiMode)multi_mode) {
case Transaction::GLOBAL:
trans->StartMultiGlobal(dbid);
break;
case Transaction::LOCK_AHEAD:
*tmp_keys = CollectAllKeys(exec_info);
trans->StartMultiLockedAhead(dbid, CmdArgList{*tmp_keys});
break;
case Transaction::LOCK_INCREMENTAL:
trans->StartMultiLockedIncr(dbid, DetermineKeyShards(exec_info));
break;
case Transaction::NON_ATOMIC:
trans->StartMultiNonAtomic();
break;
case Transaction::NOT_DETERMINED:
DCHECK(false);
};
return true;
}

Expand Down Expand Up @@ -1248,8 +1326,8 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
break;
}
}
scmd.Invoke(cntx);
if (rb->GetError()) // checks for i/o error, not logical error.
bool ok = InvokeCmd(scmd.ArgList(), scmd.descr, cntx, true);
if (!ok || rb->GetError()) // checks for i/o error, not logical error.
break;
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/server/main_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ class Service : public facade::ServiceInterface {
void Shutdown();

void DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) final;

// Returns true if command was executed successfully.
bool InvokeCmd(CmdArgList args, const CommandId* cid, ConnectionContext* cntx, bool record_stats);

void DispatchMC(const MemcacheParser::Command& cmd, std::string_view value,
facade::ConnectionContext* cntx) final;

Expand Down
Loading

0 comments on commit 03e99a5

Please sign in to comment.