From 11476fc5207c54da5be1c36300174bd37b596136 Mon Sep 17 00:00:00 2001 From: adi_holden Date: Wed, 1 Feb 2023 13:46:43 +0200 Subject: [PATCH 1/3] feat(list family): support blocking command for replication Signed-off-by: adi_holden --- src/server/list_family.cc | 31 ++++++++++++++++++++--------- tests/dragonfly/replication_test.py | 8 ++++++-- 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/src/server/list_family.cc b/src/server/list_family.cc index 6548b02a4894..5332b7d543b6 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -191,7 +191,7 @@ class BPopper { } private: - OpStatus Pop(Transaction* t, EngineShard* shard); + void Pop(Transaction* t, EngineShard* shard); ListDir dir_; @@ -267,13 +267,21 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) { VLOG(1) << "Popping an element " << t->DebugId(); ff_result_ = move(result.value()); - auto cb = [this](Transaction* t, EngineShard* shard) { return Pop(t, shard); }; + auto cb = [this](Transaction* t, EngineShard* shard) { + Pop(t, shard); + OpArgs op_args = t->GetOpArgs(shard); + if (op_args.shard->journal()) { + string command = dir_ == ListDir::LEFT ? "LPOP" : "RPOP"; + RecordJournal(op_args, command, ArgSlice{key_}, 1); + } + return OpStatus::OK; + }; t->Execute(std::move(cb), true); return OpStatus::OK; } -OpStatus BPopper::Pop(Transaction* t, EngineShard* shard) { +void BPopper::Pop(Transaction* t, EngineShard* shard) { if (shard->shard_id() == ff_result_.sid) { ff_result_.key.GetString(&key_); auto& db_slice = shard->db_slice(); @@ -289,8 +297,6 @@ OpStatus BPopper::Pop(Transaction* t, EngineShard* shard) { CHECK(shard->db_slice().Del(t->GetDbIndex(), it)); } } - - return OpStatus::OK; } OpResult OpMoveSingleShard(const OpArgs& op_args, string_view src, string_view dest, @@ -882,8 +888,13 @@ OpResult BPopPusher::RunSingle(Transaction* t, time_point tp) { OpResult op_res; bool is_multi = t->IsMulti(); auto cb_move = [&](Transaction* t, EngineShard* shard) { - op_res = OpMoveSingleShard(t->GetOpArgs(shard), pop_key_, push_key_, popdir_, pushdir_); - t->RenableAutoJournal(); // With single shard run auto journal flow. + OpArgs op_args = t->GetOpArgs(shard); + op_res = OpMoveSingleShard(op_args, pop_key_, push_key_, popdir_, pushdir_); + if (op_res) { + if (op_args.shard->journal()) { + RecordJournal(op_args, "RPOPLPUSH", ArgSlice{pop_key_, push_key_}, 1); + } + } return OpStatus::OK; }; t->Execute(cb_move, false); @@ -1341,8 +1352,10 @@ void ListFamily::Register(CommandRegistry* registry) { .SetHandler(RPopLPush) << CI{"BRPOPLPUSH", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING | CO::NO_AUTOJOURNAL, 4, 1, 2, 1} .SetHandler(BRPopLPush) - << CI{"BLPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING, -3, 1, -2, 1}.HFUNC(BLPop) - << CI{"BRPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING, -3, 1, -2, 1}.HFUNC(BRPop) + << CI{"BLPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING | CO::NO_AUTOJOURNAL, -3, 1, -2, 1} + .HFUNC(BLPop) + << CI{"BRPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING | CO::NO_AUTOJOURNAL, -3, 1, -2, 1} + .HFUNC(BRPop) << CI{"LLEN", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(LLen) << CI{"LPOS", CO::READONLY | CO::FAST, -3, 1, 1, 1}.HFUNC(LPos) << CI{"LINDEX", CO::READONLY, 3, 1, 1, 1}.HFUNC(LIndex) diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index f9dc57847634..1a742faf103d 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -513,8 +513,12 @@ async def check_expire(key): # Check there is no rewrite for RPOPLPUSH on single shard await check("RPOPLPUSH list list", r"RPOPLPUSH list list") - # Check there is no rewrite for BRPOPLPUSH on single shard - await check("BRPOPLPUSH list list 0", r"BRPOPLPUSH list list 0") + # Check BRPOPLPUSH on single shard turns into RPOPLPUSH + await check("BRPOPLPUSH list list 0", r"RPOPLPUSH list list") + # Check BLPOP turns into LPOP + await check("BLPOP list 0", r"LPOP list") + # Check BRPOP turns into RPOP + await check("BRPOP list 0", r"RPOP list") await c_master.lpush("list1s", "v1", "v2", "v3", "v4") From 6194d0bf28b805311db85e85175e63bd64caaacb Mon Sep 17 00:00:00 2001 From: adi_holden Date: Wed, 1 Feb 2023 14:02:38 +0200 Subject: [PATCH 2/3] feat(transaction): remove was_suspended which is always false Signed-off-by: adi_holden --- src/server/transaction.cc | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 88fe86f6b6a4..223a9ea17063 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -320,7 +320,6 @@ bool Transaction::RunInShard(EngineShard* shard) { DCHECK(sd.local_mask & ARMED); sd.local_mask &= ~ARMED; - bool was_suspended = sd.local_mask & SUSPENDED_Q; bool awaked_prerun = (sd.local_mask & AWAKED_Q) != 0; bool incremental_lock = multi_ && multi_->is_expanding; @@ -373,7 +372,7 @@ bool Transaction::RunInShard(EngineShard* shard) { /*************************************************************************/ - if (!was_suspended && is_concluding) // Check last hop & non suspended. + if (is_concluding) // Check last hop LogAutoJournalOnShard(shard); // at least the coordinator thread owns the reference. @@ -400,7 +399,7 @@ bool Transaction::RunInShard(EngineShard* shard) { // If a transaction has been suspended, we keep the lock so that future transaction // touching those keys will be ordered via TxQueue. It's necessary because we preserve // the atomicity of awaked transactions by halting the TxQueue. - if (was_suspended || !become_suspended) { + if (!become_suspended) { shard->db_slice().Release(mode, largs); sd.local_mask &= ~KEYLOCK_ACQUIRED; } From 59a96182d8b10506765979a5beaef179af6c26d1 Mon Sep 17 00:00:00 2001 From: adi_holden Date: Wed, 1 Feb 2023 14:32:54 +0200 Subject: [PATCH 3/3] Revert "feat(transaction): remove was_suspended which is always false" This reverts commit 6194d0bf28b805311db85e85175e63bd64caaacb. --- src/server/transaction.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 223a9ea17063..88fe86f6b6a4 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -320,6 +320,7 @@ bool Transaction::RunInShard(EngineShard* shard) { DCHECK(sd.local_mask & ARMED); sd.local_mask &= ~ARMED; + bool was_suspended = sd.local_mask & SUSPENDED_Q; bool awaked_prerun = (sd.local_mask & AWAKED_Q) != 0; bool incremental_lock = multi_ && multi_->is_expanding; @@ -372,7 +373,7 @@ bool Transaction::RunInShard(EngineShard* shard) { /*************************************************************************/ - if (is_concluding) // Check last hop + if (!was_suspended && is_concluding) // Check last hop & non suspended. LogAutoJournalOnShard(shard); // at least the coordinator thread owns the reference. @@ -399,7 +400,7 @@ bool Transaction::RunInShard(EngineShard* shard) { // If a transaction has been suspended, we keep the lock so that future transaction // touching those keys will be ordered via TxQueue. It's necessary because we preserve // the atomicity of awaked transactions by halting the TxQueue. - if (!become_suspended) { + if (was_suspended || !become_suspended) { shard->db_slice().Release(mode, largs); sd.local_mask &= ~KEYLOCK_ACQUIRED; }