From 0ce3267864983aee0506488663be25f291887fdf Mon Sep 17 00:00:00 2001 From: chejinge Date: Thu, 9 May 2024 14:48:13 +0800 Subject: [PATCH 1/2] fix:slave parse tnx binlog failed --- include/pika_transaction.h | 2 +- src/pika_transaction.cc | 41 +++++++++++++---------- tests/integration/replication_test.go | 48 +++++++++++++++++++++++++-- 3 files changed, 69 insertions(+), 22 deletions(-) diff --git a/include/pika_transaction.h b/include/pika_transaction.h index 9cdd73dbf0..637f64171e 100644 --- a/include/pika_transaction.h +++ b/include/pika_transaction.h @@ -34,7 +34,7 @@ class ExecCmd : public Cmd { void Split(const HintKeys& hint_keys) override {} void Merge() override {} std::vector current_key() const override { return {}; } - + void Execute() override; private: struct CmdInfo { public: diff --git a/src/pika_transaction.cc b/src/pika_transaction.cc index 1db5b10548..e29ff51be2 100644 --- a/src/pika_transaction.cc +++ b/src/pika_transaction.cc @@ -8,10 +8,10 @@ #include "include/pika_transaction.h" #include "include/pika_admin.h" #include "include/pika_client_conn.h" +#include "include/pika_define.h" #include "include/pika_list.h" #include "include/pika_rm.h" #include "include/pika_server.h" -#include "include/pika_transaction.h" #include "src/pstd/include/scope_record_lock.h" extern std::unique_ptr g_pika_server; @@ -42,22 +42,6 @@ void MultiCmd::DoInitial() { void ExecCmd::Do() { auto conn = GetConn(); auto client_conn = std::dynamic_pointer_cast(conn); - if (client_conn == nullptr) { - res_.SetRes(CmdRes::kErrOther, name()); - return; - } - if (!client_conn->IsInTxn()) { - res_.SetRes(CmdRes::kErrOther, "EXEC without MULTI"); - return; - } - if (IsTxnFailedAndSetState()) { - client_conn->ExitTxn(); - return; - } - SetCmdsVec(); - Lock(); - conn = GetConn(); - client_conn = std::dynamic_pointer_cast(conn); std::vector res_vec = {}; std::vector> resp_strs; for (size_t i = 0; i < cmds_.size(); ++i) { @@ -96,9 +80,30 @@ void ExecCmd::Do() { }); res_.AppendArrayLen(res_vec.size()); - for (auto &r : res_vec) { + for (auto& r : res_vec) { res_.AppendStringRaw(r.message()); } +} + +void ExecCmd::Execute() { + auto conn = GetConn(); + auto client_conn = std::dynamic_pointer_cast(conn); + if (client_conn == nullptr) { + res_.SetRes(CmdRes::kErrOther, name()); + return; + } + if (!client_conn->IsInTxn()) { + res_.SetRes(CmdRes::kErrOther, "EXEC without MULTI"); + return; + } + if (IsTxnFailedAndSetState()) { + client_conn->ExitTxn(); + return; + } + SetCmdsVec(); + Lock(); + Do(); + Unlock(); ServeToBLrPopWithKeys(); list_cmd_.clear(); diff --git a/tests/integration/replication_test.go b/tests/integration/replication_test.go index bf2f300672..d552e52da6 100644 --- a/tests/integration/replication_test.go +++ b/tests/integration/replication_test.go @@ -620,9 +620,9 @@ var _ = Describe("should replication ", func() { for i := int64(0); i < clientMaster.LLen(ctx, "list0").Val(); i++ { Expect(clientMaster.LIndex(ctx, "list0", i)).To(Equal(clientSlave.LIndex(ctx, "list0", i))) } -// for i := int64(0); i < clientMaster.LLen(ctx, "list1").Val(); i++ { -// Expect(clientMaster.LIndex(ctx, "list1", i)).To(Equal(clientSlave.LIndex(ctx, "list1", i))) -// } + // for i := int64(0); i < clientMaster.LLen(ctx, "list1").Val(); i++ { + // Expect(clientMaster.LIndex(ctx, "list1", i)).To(Equal(clientSlave.LIndex(ctx, "list1", i))) + // } } err = clientMaster.Del(ctx, lists...) @@ -738,6 +738,48 @@ var _ = Describe("should replication ", func() { Expect(r.Err()).To(MatchError("ERR EXEC without MULTI")) err = clientMaster.Del(ctx, "txkey1") + + r1 := clientMaster.Do(ctx, "MULTI") + Expect(r1.Err()).NotTo(HaveOccurred()) + + setkey1 := clientMaster.Set(ctx, "key1", "value1", 0) + Expect(setkey1.Err()).NotTo(HaveOccurred()) + Expect(setkey1.Val()).To(Equal("QUEUED")) + + setkey2 := clientMaster.Set(ctx, "key2", "value2", 0) + Expect(setkey2.Err()).NotTo(HaveOccurred()) + Expect(get.Val()).To(Equal("QUEUED")) + + r2 := clientMaster.Do(ctx, "EXEC") + Expect(r2.Err()).NotTo(HaveOccurred()) + Expect(r2.Val()).To(Equal([]interface{}{"OK", "OK"})) + + time.Sleep(100 * time.Millisecond) + + getkey1 := clientSlave.Get(ctx, "key1") + Expect(getkey1.Err()).NotTo(HaveOccurred()) + Expect(getkey1.Val()).To(Equal("value1")) + + getkey2 := clientSlave.Get(ctx, "key2") + Expect(getkey2.Err()).NotTo(HaveOccurred()) + Expect(getkey2.Val()).To(Equal("value2")) + + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + loopCount := 0 + + for loopCount < 10 { + select { + case <-ticker.C: + infoResExec := clientSlave.Info(ctx, "replication") + Expect(infoResExec.Err()).NotTo(HaveOccurred()) + Expect(infoResExec.Val()).To(ContainSubstring("master_link_status:up")) + loopCount++ + if loopCount >= 10 { + ticker.Stop() + } + } + } log.Println("master-slave replication test success") }) From 59207b04839366c2da54867aa3bfb0cf5c8b6389 Mon Sep 17 00:00:00 2001 From: chejinge Date: Thu, 9 May 2024 14:52:18 +0800 Subject: [PATCH 2/2] fix:slave parse tnx binlog failed --- tests/integration/replication_test.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/tests/integration/replication_test.go b/tests/integration/replication_test.go index d552e52da6..9fe2d6a422 100644 --- a/tests/integration/replication_test.go +++ b/tests/integration/replication_test.go @@ -739,14 +739,15 @@ var _ = Describe("should replication ", func() { err = clientMaster.Del(ctx, "txkey1") + //The test below is related with issue: https://github.com/OpenAtomFoundation/pika/issues/2643 r1 := clientMaster.Do(ctx, "MULTI") Expect(r1.Err()).NotTo(HaveOccurred()) - setkey1 := clientMaster.Set(ctx, "key1", "value1", 0) + setkey1 := clientMaster.Set(ctx, "Tnxkey1", "Tnxvalue1", 0) Expect(setkey1.Err()).NotTo(HaveOccurred()) Expect(setkey1.Val()).To(Equal("QUEUED")) - setkey2 := clientMaster.Set(ctx, "key2", "value2", 0) + setkey2 := clientMaster.Set(ctx, "Tnxkey2", "Tnxvalue2", 0) Expect(setkey2.Err()).NotTo(HaveOccurred()) Expect(get.Val()).To(Equal("QUEUED")) @@ -754,15 +755,15 @@ var _ = Describe("should replication ", func() { Expect(r2.Err()).NotTo(HaveOccurred()) Expect(r2.Val()).To(Equal([]interface{}{"OK", "OK"})) - time.Sleep(100 * time.Millisecond) + time.Sleep(3 * time.Second) - getkey1 := clientSlave.Get(ctx, "key1") + getkey1 := clientSlave.Get(ctx, "Tnxkey1") Expect(getkey1.Err()).NotTo(HaveOccurred()) - Expect(getkey1.Val()).To(Equal("value1")) + Expect(getkey1.Val()).To(Equal("Tnxvalue1")) - getkey2 := clientSlave.Get(ctx, "key2") + getkey2 := clientSlave.Get(ctx, "Tnxkey2") Expect(getkey2.Err()).NotTo(HaveOccurred()) - Expect(getkey2.Val()).To(Equal("value2")) + Expect(getkey2.Val()).To(Equal("Tnxvalue2")) ticker := time.NewTicker(500 * time.Millisecond) defer ticker.Stop()