diff --git a/src/server/common.cc b/src/server/common.cc index 6e8379153d70..970a91763b69 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -194,7 +194,7 @@ bool ParseDouble(string_view src, double* value) { } void OpArgs::RecordJournal(string_view cmd, ArgSlice args) const { - tx->LogJournalOnShard(shard, make_pair(cmd, args)); + tx->LogJournalOnShard(shard, make_pair(cmd, args), 1); } #define ADD(x) (x) += o.x diff --git a/src/server/set_family.cc b/src/server/set_family.cc index 3f54bbddd229..7313fa0cf54a 100644 --- a/src/server/set_family.cc +++ b/src/server/set_family.cc @@ -527,8 +527,8 @@ OpStatus NoOpCb(Transaction* t, EngineShard* shard) { }; // if overwrite is true then OpAdd writes vals into the key and discards its previous value. -OpResult OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice vals, - bool overwrite) { +OpResult OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice vals, bool overwrite, + bool journal_update) { auto* es = op_args.shard; auto& db_slice = es->db_slice(); @@ -538,7 +538,9 @@ OpResult OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v if (overwrite && vals.empty()) { auto it = db_slice.FindExt(op_args.db_cntx, key).first; db_slice.Del(op_args.db_cntx.db_index, it); - + if (journal_update && op_args.shard->journal()) { + op_args.RecordJournal("DEL"sv, ArgSlice{key}); + } return 0; } @@ -609,7 +611,13 @@ OpResult OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v } db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, !new_key); - + if (journal_update && op_args.shard->journal()) { + op_args.RecordJournal("DEL"sv, ArgSlice{key}); + vector mapped(vals.size() + 1); + mapped[0] = key; + std::copy(vals.begin(), vals.end(), mapped.begin() + 1); + op_args.RecordJournal("SADD"sv, mapped); + } return res; } @@ -658,7 +666,8 @@ OpResult OpAddEx(const OpArgs& op_args, string_view key, uint32_t ttl_ return res; } -OpResult OpRem(const OpArgs& op_args, string_view key, const ArgSlice& vals) { +OpResult OpRem(const OpArgs& op_args, string_view key, const ArgSlice& vals, + bool journal_rewrite) { auto* es = op_args.shard; auto& db_slice = es->db_slice(); OpResult find_res = db_slice.Find(op_args.db_cntx, key, OBJ_SET); @@ -676,6 +685,12 @@ OpResult OpRem(const OpArgs& op_args, string_view key, const ArgSlice& if (isempty) { CHECK(db_slice.Del(op_args.db_cntx.db_index, find_res.value())); } + if (journal_rewrite && op_args.shard->journal()) { + vector mapped(vals.size() + 1); + mapped[0] = key; + std::copy(vals.begin(), vals.end(), mapped.begin() + 1); + op_args.RecordJournal("SREM"sv, mapped); + } return removed; } @@ -685,8 +700,8 @@ OpResult OpRem(const OpArgs& op_args, string_view key, const ArgSlice& // and reports the result. class Mover { public: - Mover(string_view src, string_view dest, string_view member) - : src_(src), dest_(dest), member_(member) { + Mover(string_view src, string_view dest, string_view member, bool journal_rewrite) + : src_(src), dest_(dest), member_(member), journal_rewrite_(journal_rewrite) { } void Find(Transaction* t); @@ -698,6 +713,7 @@ class Mover { string_view src_, dest_, member_; OpResult found_[2]; + bool journal_rewrite_; }; OpStatus Mover::OpFind(Transaction* t, EngineShard* es) { @@ -729,10 +745,10 @@ OpStatus Mover::OpMutate(Transaction* t, EngineShard* es) { OpArgs op_args = t->GetOpArgs(es); for (auto k : largs) { if (k == src_) { - CHECK_EQ(1u, OpRem(op_args, k, {member_}).value()); // must succeed. + CHECK_EQ(1u, OpRem(op_args, k, {member_}, journal_rewrite_).value()); // must succeed. } else { DCHECK_EQ(k, dest_); - OpAdd(op_args, k, {member_}, false); + OpAdd(op_args, k, {member_}, false, journal_rewrite_); } } @@ -1036,7 +1052,7 @@ void SAdd(CmdArgList args, ConnectionContext* cntx) { ArgSlice arg_slice{vals.data(), vals.size()}; auto cb = [&](Transaction* t, EngineShard* shard) { - return OpAdd(t->GetOpArgs(shard), key, arg_slice, false); + return OpAdd(t->GetOpArgs(shard), key, arg_slice, false, false); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -1107,7 +1123,7 @@ void SMove(CmdArgList args, ConnectionContext* cntx) { string_view dest = ArgS(args, 2); string_view member = ArgS(args, 3); - Mover mover{src, dest, member}; + Mover mover{src, dest, member, true}; cntx->transaction->Schedule(); mover.Find(cntx->transaction); @@ -1130,7 +1146,7 @@ void SRem(CmdArgList args, ConnectionContext* cntx) { ArgSlice span{vals.data(), vals.size()}; auto cb = [&](Transaction* t, EngineShard* shard) { - return OpRem(t->GetOpArgs(shard), key, span); + return OpRem(t->GetOpArgs(shard), key, span, false); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -1276,7 +1292,7 @@ void SDiffStore(CmdArgList args, ConnectionContext* cntx) { SvArray result = ToSvArray(rsv.value()); auto store_cb = [&](Transaction* t, EngineShard* shard) { if (shard->shard_id() == dest_shard) { - OpAdd(t->GetOpArgs(shard), dest_key, result, true); + OpAdd(t->GetOpArgs(shard), dest_key, result, true, true); } return OpStatus::OK; @@ -1355,7 +1371,7 @@ void SInterStore(CmdArgList args, ConnectionContext* cntx) { auto store_cb = [&](Transaction* t, EngineShard* shard) { if (shard->shard_id() == dest_shard) { - OpAdd(t->GetOpArgs(shard), dest_key, result.value(), true); + OpAdd(t->GetOpArgs(shard), dest_key, result.value(), true, true); } return OpStatus::OK; @@ -1419,7 +1435,7 @@ void SUnionStore(CmdArgList args, ConnectionContext* cntx) { auto store_cb = [&](Transaction* t, EngineShard* shard) { if (shard->shard_id() == dest_shard) { - OpAdd(t->GetOpArgs(shard), dest_key, result, true); + OpAdd(t->GetOpArgs(shard), dest_key, result, true, true); } return OpStatus::OK; @@ -1551,18 +1567,21 @@ using CI = CommandId; void SetFamily::Register(CommandRegistry* registry) { *registry << CI{"SADD", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(SAdd) << CI{"SDIFF", CO::READONLY, -2, 1, -1, 1}.HFUNC(SDiff) - << CI{"SDIFFSTORE", CO::WRITE | CO::DENYOOM, -3, 1, -1, 1}.HFUNC(SDiffStore) + << CI{"SDIFFSTORE", CO::WRITE | CO::DENYOOM | CO::NO_AUTOJOURNAL, -3, 1, -1, 1}.HFUNC( + SDiffStore) << CI{"SINTER", CO::READONLY, -2, 1, -1, 1}.HFUNC(SInter) - << CI{"SINTERSTORE", CO::WRITE | CO::DENYOOM, -3, 1, -1, 1}.HFUNC(SInterStore) + << CI{"SINTERSTORE", CO::WRITE | CO::DENYOOM | CO::NO_AUTOJOURNAL, -3, 1, -1, 1}.HFUNC( + SInterStore) << CI{"SMEMBERS", CO::READONLY, 2, 1, 1, 1}.HFUNC(SMembers) << CI{"SISMEMBER", CO::FAST | CO::READONLY, 3, 1, 1, 1}.HFUNC(SIsMember) << CI{"SMISMEMBER", CO::READONLY, -3, 1, 1, 1}.HFUNC(SMIsMember) - << CI{"SMOVE", CO::FAST | CO::WRITE, 4, 1, 2, 1}.HFUNC(SMove) + << CI{"SMOVE", CO::FAST | CO::WRITE | CO::NO_AUTOJOURNAL, 4, 1, 2, 1}.HFUNC(SMove) << CI{"SREM", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(SRem) << CI{"SCARD", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(SCard) << CI{"SPOP", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, -2, 1, 1, 1}.HFUNC(SPop) << CI{"SUNION", CO::READONLY, -2, 1, -1, 1}.HFUNC(SUnion) - << CI{"SUNIONSTORE", CO::WRITE | CO::DENYOOM, -3, 1, -1, 1}.HFUNC(SUnionStore) + << CI{"SUNIONSTORE", CO::WRITE | CO::DENYOOM | CO::NO_AUTOJOURNAL, -3, 1, -1, 1}.HFUNC( + SUnionStore) << CI{"SSCAN", CO::READONLY, -3, 1, 1, 1}.HFUNC(SScan); if (absl::GetFlag(FLAGS_use_set2)) { diff --git a/src/server/transaction.cc b/src/server/transaction.cc index da4954e80445..49e795826cbb 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -1239,14 +1239,15 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard) { entry_payload = make_pair(cmd, ShardArgsInShard(shard->shard_id())); } - LogJournalOnShard(shard, std::move(entry_payload)); + LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_); } -void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload) const { +void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload, + uint32_t shard_cnt) const { auto journal = shard->journal(); CHECK(journal); auto opcode = multi_ ? journal::Op::MULTI_COMMAND : journal::Op::COMMAND; - journal->RecordEntry(txid_, opcode, db_index_, unique_shard_cnt_, std::move(payload)); + journal->RecordEntry(txid_, opcode, db_index_, shard_cnt, std::move(payload)); } void Transaction::BreakOnShutdown() { diff --git a/src/server/transaction.h b/src/server/transaction.h index 7a6bb5f76a61..660107a77857 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -194,7 +194,8 @@ class Transaction { } // Log a journal entry on shard with payload. - void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload) const; + void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload, + uint32_t shard_cnt) const; private: struct LockCnt {