diff --git a/include/pika_transaction.h b/include/pika_transaction.h index 9cdd73dbf..637f64171 100644 --- a/include/pika_transaction.h +++ b/include/pika_transaction.h @@ -34,7 +34,7 @@ class ExecCmd : public Cmd { void Split(const HintKeys& hint_keys) override {} void Merge() override {} std::vector current_key() const override { return {}; } - + void Execute() override; private: struct CmdInfo { public: diff --git a/src/pika_transaction.cc b/src/pika_transaction.cc index 1db5b1054..e29ff51be 100644 --- a/src/pika_transaction.cc +++ b/src/pika_transaction.cc @@ -8,10 +8,10 @@ #include "include/pika_transaction.h" #include "include/pika_admin.h" #include "include/pika_client_conn.h" +#include "include/pika_define.h" #include "include/pika_list.h" #include "include/pika_rm.h" #include "include/pika_server.h" -#include "include/pika_transaction.h" #include "src/pstd/include/scope_record_lock.h" extern std::unique_ptr g_pika_server; @@ -42,22 +42,6 @@ void MultiCmd::DoInitial() { void ExecCmd::Do() { auto conn = GetConn(); auto client_conn = std::dynamic_pointer_cast(conn); - if (client_conn == nullptr) { - res_.SetRes(CmdRes::kErrOther, name()); - return; - } - if (!client_conn->IsInTxn()) { - res_.SetRes(CmdRes::kErrOther, "EXEC without MULTI"); - return; - } - if (IsTxnFailedAndSetState()) { - client_conn->ExitTxn(); - return; - } - SetCmdsVec(); - Lock(); - conn = GetConn(); - client_conn = std::dynamic_pointer_cast(conn); std::vector res_vec = {}; std::vector> resp_strs; for (size_t i = 0; i < cmds_.size(); ++i) { @@ -96,9 +80,30 @@ void ExecCmd::Do() { }); res_.AppendArrayLen(res_vec.size()); - for (auto &r : res_vec) { + for (auto& r : res_vec) { res_.AppendStringRaw(r.message()); } +} + +void ExecCmd::Execute() { + auto conn = GetConn(); + auto client_conn = std::dynamic_pointer_cast(conn); + if (client_conn == nullptr) { + res_.SetRes(CmdRes::kErrOther, name()); + return; + } + if (!client_conn->IsInTxn()) { + res_.SetRes(CmdRes::kErrOther, "EXEC without MULTI"); + return; + } + if (IsTxnFailedAndSetState()) { + client_conn->ExitTxn(); + return; + } + SetCmdsVec(); + Lock(); + Do(); + Unlock(); ServeToBLrPopWithKeys(); list_cmd_.clear(); diff --git a/tests/integration/replication_test.go b/tests/integration/replication_test.go index bf2f30067..9fe2d6a42 100644 --- a/tests/integration/replication_test.go +++ b/tests/integration/replication_test.go @@ -620,9 +620,9 @@ var _ = Describe("should replication ", func() { for i := int64(0); i < clientMaster.LLen(ctx, "list0").Val(); i++ { Expect(clientMaster.LIndex(ctx, "list0", i)).To(Equal(clientSlave.LIndex(ctx, "list0", i))) } -// for i := int64(0); i < clientMaster.LLen(ctx, "list1").Val(); i++ { -// Expect(clientMaster.LIndex(ctx, "list1", i)).To(Equal(clientSlave.LIndex(ctx, "list1", i))) -// } + // for i := int64(0); i < clientMaster.LLen(ctx, "list1").Val(); i++ { + // Expect(clientMaster.LIndex(ctx, "list1", i)).To(Equal(clientSlave.LIndex(ctx, "list1", i))) + // } } err = clientMaster.Del(ctx, lists...) @@ -738,6 +738,49 @@ var _ = Describe("should replication ", func() { Expect(r.Err()).To(MatchError("ERR EXEC without MULTI")) err = clientMaster.Del(ctx, "txkey1") + + //The test below is related with issue: https://github.com/OpenAtomFoundation/pika/issues/2643 + r1 := clientMaster.Do(ctx, "MULTI") + Expect(r1.Err()).NotTo(HaveOccurred()) + + setkey1 := clientMaster.Set(ctx, "Tnxkey1", "Tnxvalue1", 0) + Expect(setkey1.Err()).NotTo(HaveOccurred()) + Expect(setkey1.Val()).To(Equal("QUEUED")) + + setkey2 := clientMaster.Set(ctx, "Tnxkey2", "Tnxvalue2", 0) + Expect(setkey2.Err()).NotTo(HaveOccurred()) + Expect(get.Val()).To(Equal("QUEUED")) + + r2 := clientMaster.Do(ctx, "EXEC") + Expect(r2.Err()).NotTo(HaveOccurred()) + Expect(r2.Val()).To(Equal([]interface{}{"OK", "OK"})) + + time.Sleep(3 * time.Second) + + getkey1 := clientSlave.Get(ctx, "Tnxkey1") + Expect(getkey1.Err()).NotTo(HaveOccurred()) + Expect(getkey1.Val()).To(Equal("Tnxvalue1")) + + getkey2 := clientSlave.Get(ctx, "Tnxkey2") + Expect(getkey2.Err()).NotTo(HaveOccurred()) + Expect(getkey2.Val()).To(Equal("Tnxvalue2")) + + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + loopCount := 0 + + for loopCount < 10 { + select { + case <-ticker.C: + infoResExec := clientSlave.Info(ctx, "replication") + Expect(infoResExec.Err()).NotTo(HaveOccurred()) + Expect(infoResExec.Val()).To(ContainSubstring("master_link_status:up")) + loopCount++ + if loopCount >= 10 { + ticker.Stop() + } + } + } log.Println("master-slave replication test success") })