Skip to content

Commit

Permalink
feat(set family): rewrite set store commands to journal (#691)
Browse files Browse the repository at this point in the history
  • Loading branch information
adiholden authored Jan 16, 2023
1 parent daf5473 commit 1f5811f
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 24 deletions.
2 changes: 1 addition & 1 deletion src/server/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ bool ParseDouble(string_view src, double* value) {
}

void OpArgs::RecordJournal(string_view cmd, ArgSlice args) const {
tx->LogJournalOnShard(shard, make_pair(cmd, args));
tx->LogJournalOnShard(shard, make_pair(cmd, args), 1);
}

#define ADD(x) (x) += o.x
Expand Down
57 changes: 38 additions & 19 deletions src/server/set_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -527,8 +527,8 @@ OpStatus NoOpCb(Transaction* t, EngineShard* shard) {
};

// if overwrite is true then OpAdd writes vals into the key and discards its previous value.
OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice vals,
bool overwrite) {
OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice vals, bool overwrite,
bool journal_update) {
auto* es = op_args.shard;
auto& db_slice = es->db_slice();

Expand All @@ -538,7 +538,9 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v
if (overwrite && vals.empty()) {
auto it = db_slice.FindExt(op_args.db_cntx, key).first;
db_slice.Del(op_args.db_cntx.db_index, it);

if (journal_update && op_args.shard->journal()) {
op_args.RecordJournal("DEL"sv, ArgSlice{key});
}
return 0;
}

Expand Down Expand Up @@ -609,7 +611,13 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v
}

db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, !new_key);

if (journal_update && op_args.shard->journal()) {
op_args.RecordJournal("DEL"sv, ArgSlice{key});
vector<string_view> mapped(vals.size() + 1);
mapped[0] = key;
std::copy(vals.begin(), vals.end(), mapped.begin() + 1);
op_args.RecordJournal("SADD"sv, mapped);
}
return res;
}

Expand Down Expand Up @@ -658,7 +666,8 @@ OpResult<uint32_t> OpAddEx(const OpArgs& op_args, string_view key, uint32_t ttl_
return res;
}

OpResult<uint32_t> OpRem(const OpArgs& op_args, string_view key, const ArgSlice& vals) {
OpResult<uint32_t> OpRem(const OpArgs& op_args, string_view key, const ArgSlice& vals,
bool journal_rewrite) {
auto* es = op_args.shard;
auto& db_slice = es->db_slice();
OpResult<PrimeIterator> find_res = db_slice.Find(op_args.db_cntx, key, OBJ_SET);
Expand All @@ -676,6 +685,12 @@ OpResult<uint32_t> OpRem(const OpArgs& op_args, string_view key, const ArgSlice&
if (isempty) {
CHECK(db_slice.Del(op_args.db_cntx.db_index, find_res.value()));
}
if (journal_rewrite && op_args.shard->journal()) {
vector<string_view> mapped(vals.size() + 1);
mapped[0] = key;
std::copy(vals.begin(), vals.end(), mapped.begin() + 1);
op_args.RecordJournal("SREM"sv, mapped);
}

return removed;
}
Expand All @@ -685,8 +700,8 @@ OpResult<uint32_t> OpRem(const OpArgs& op_args, string_view key, const ArgSlice&
// and reports the result.
class Mover {
public:
Mover(string_view src, string_view dest, string_view member)
: src_(src), dest_(dest), member_(member) {
Mover(string_view src, string_view dest, string_view member, bool journal_rewrite)
: src_(src), dest_(dest), member_(member), journal_rewrite_(journal_rewrite) {
}

void Find(Transaction* t);
Expand All @@ -698,6 +713,7 @@ class Mover {

string_view src_, dest_, member_;
OpResult<bool> found_[2];
bool journal_rewrite_;
};

OpStatus Mover::OpFind(Transaction* t, EngineShard* es) {
Expand Down Expand Up @@ -729,10 +745,10 @@ OpStatus Mover::OpMutate(Transaction* t, EngineShard* es) {
OpArgs op_args = t->GetOpArgs(es);
for (auto k : largs) {
if (k == src_) {
CHECK_EQ(1u, OpRem(op_args, k, {member_}).value()); // must succeed.
CHECK_EQ(1u, OpRem(op_args, k, {member_}, journal_rewrite_).value()); // must succeed.
} else {
DCHECK_EQ(k, dest_);
OpAdd(op_args, k, {member_}, false);
OpAdd(op_args, k, {member_}, false, journal_rewrite_);
}
}

Expand Down Expand Up @@ -1036,7 +1052,7 @@ void SAdd(CmdArgList args, ConnectionContext* cntx) {
ArgSlice arg_slice{vals.data(), vals.size()};

auto cb = [&](Transaction* t, EngineShard* shard) {
return OpAdd(t->GetOpArgs(shard), key, arg_slice, false);
return OpAdd(t->GetOpArgs(shard), key, arg_slice, false, false);
};

OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
Expand Down Expand Up @@ -1107,7 +1123,7 @@ void SMove(CmdArgList args, ConnectionContext* cntx) {
string_view dest = ArgS(args, 2);
string_view member = ArgS(args, 3);

Mover mover{src, dest, member};
Mover mover{src, dest, member, true};
cntx->transaction->Schedule();

mover.Find(cntx->transaction);
Expand All @@ -1130,7 +1146,7 @@ void SRem(CmdArgList args, ConnectionContext* cntx) {
ArgSlice span{vals.data(), vals.size()};

auto cb = [&](Transaction* t, EngineShard* shard) {
return OpRem(t->GetOpArgs(shard), key, span);
return OpRem(t->GetOpArgs(shard), key, span, false);
};
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));

Expand Down Expand Up @@ -1276,7 +1292,7 @@ void SDiffStore(CmdArgList args, ConnectionContext* cntx) {
SvArray result = ToSvArray(rsv.value());
auto store_cb = [&](Transaction* t, EngineShard* shard) {
if (shard->shard_id() == dest_shard) {
OpAdd(t->GetOpArgs(shard), dest_key, result, true);
OpAdd(t->GetOpArgs(shard), dest_key, result, true, true);
}

return OpStatus::OK;
Expand Down Expand Up @@ -1355,7 +1371,7 @@ void SInterStore(CmdArgList args, ConnectionContext* cntx) {

auto store_cb = [&](Transaction* t, EngineShard* shard) {
if (shard->shard_id() == dest_shard) {
OpAdd(t->GetOpArgs(shard), dest_key, result.value(), true);
OpAdd(t->GetOpArgs(shard), dest_key, result.value(), true, true);
}

return OpStatus::OK;
Expand Down Expand Up @@ -1419,7 +1435,7 @@ void SUnionStore(CmdArgList args, ConnectionContext* cntx) {

auto store_cb = [&](Transaction* t, EngineShard* shard) {
if (shard->shard_id() == dest_shard) {
OpAdd(t->GetOpArgs(shard), dest_key, result, true);
OpAdd(t->GetOpArgs(shard), dest_key, result, true, true);
}

return OpStatus::OK;
Expand Down Expand Up @@ -1551,18 +1567,21 @@ using CI = CommandId;
void SetFamily::Register(CommandRegistry* registry) {
*registry << CI{"SADD", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(SAdd)
<< CI{"SDIFF", CO::READONLY, -2, 1, -1, 1}.HFUNC(SDiff)
<< CI{"SDIFFSTORE", CO::WRITE | CO::DENYOOM, -3, 1, -1, 1}.HFUNC(SDiffStore)
<< CI{"SDIFFSTORE", CO::WRITE | CO::DENYOOM | CO::NO_AUTOJOURNAL, -3, 1, -1, 1}.HFUNC(
SDiffStore)
<< CI{"SINTER", CO::READONLY, -2, 1, -1, 1}.HFUNC(SInter)
<< CI{"SINTERSTORE", CO::WRITE | CO::DENYOOM, -3, 1, -1, 1}.HFUNC(SInterStore)
<< CI{"SINTERSTORE", CO::WRITE | CO::DENYOOM | CO::NO_AUTOJOURNAL, -3, 1, -1, 1}.HFUNC(
SInterStore)
<< CI{"SMEMBERS", CO::READONLY, 2, 1, 1, 1}.HFUNC(SMembers)
<< CI{"SISMEMBER", CO::FAST | CO::READONLY, 3, 1, 1, 1}.HFUNC(SIsMember)
<< CI{"SMISMEMBER", CO::READONLY, -3, 1, 1, 1}.HFUNC(SMIsMember)
<< CI{"SMOVE", CO::FAST | CO::WRITE, 4, 1, 2, 1}.HFUNC(SMove)
<< CI{"SMOVE", CO::FAST | CO::WRITE | CO::NO_AUTOJOURNAL, 4, 1, 2, 1}.HFUNC(SMove)
<< CI{"SREM", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(SRem)
<< CI{"SCARD", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(SCard)
<< CI{"SPOP", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, -2, 1, 1, 1}.HFUNC(SPop)
<< CI{"SUNION", CO::READONLY, -2, 1, -1, 1}.HFUNC(SUnion)
<< CI{"SUNIONSTORE", CO::WRITE | CO::DENYOOM, -3, 1, -1, 1}.HFUNC(SUnionStore)
<< CI{"SUNIONSTORE", CO::WRITE | CO::DENYOOM | CO::NO_AUTOJOURNAL, -3, 1, -1, 1}.HFUNC(
SUnionStore)
<< CI{"SSCAN", CO::READONLY, -3, 1, 1, 1}.HFUNC(SScan);

if (absl::GetFlag(FLAGS_use_set2)) {
Expand Down
7 changes: 4 additions & 3 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1239,14 +1239,15 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard) {
entry_payload = make_pair(cmd, ShardArgsInShard(shard->shard_id()));
}

LogJournalOnShard(shard, std::move(entry_payload));
LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_);
}

void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload) const {
void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload,
uint32_t shard_cnt) const {
auto journal = shard->journal();
CHECK(journal);
auto opcode = multi_ ? journal::Op::MULTI_COMMAND : journal::Op::COMMAND;
journal->RecordEntry(txid_, opcode, db_index_, unique_shard_cnt_, std::move(payload));
journal->RecordEntry(txid_, opcode, db_index_, shard_cnt, std::move(payload));
}

void Transaction::BreakOnShutdown() {
Expand Down
3 changes: 2 additions & 1 deletion src/server/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ class Transaction {
}

// Log a journal entry on shard with payload.
void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload) const;
void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload,
uint32_t shard_cnt) const;

private:
struct LockCnt {
Expand Down

0 comments on commit 1f5811f

Please sign in to comment.