Skip to content

Commit

Permalink
fix:slave parse tnx binlog failed (#2642)
Browse files Browse the repository at this point in the history
* fix:slave parse tnx binlog failed

* fix:slave parse tnx binlog failed

---------

Co-authored-by: chejinge <chejinge@360.cn>
  • Loading branch information
chejinge and brother-jin committed May 9, 2024
1 parent 4fde858 commit 4653d98
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 22 deletions.
2 changes: 1 addition & 1 deletion include/pika_transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class ExecCmd : public Cmd {
void Split(const HintKeys& hint_keys) override {}
void Merge() override {}
std::vector<std::string> current_key() const override { return {}; }

void Execute() override;
private:
struct CmdInfo {
public:
Expand Down
40 changes: 22 additions & 18 deletions src/pika_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#include "include/pika_list.h"
#include "include/pika_rm.h"
#include "include/pika_server.h"
#include "include/pika_transaction.h"
#include "src/pstd/include/scope_record_lock.h"

extern std::unique_ptr<PikaServer> g_pika_server;
Expand Down Expand Up @@ -43,22 +42,6 @@ void MultiCmd::DoInitial() {
void ExecCmd::Do() {
auto conn = GetConn();
auto client_conn = std::dynamic_pointer_cast<PikaClientConn>(conn);
if (client_conn == nullptr) {
res_.SetRes(CmdRes::kErrOther, name());
return;
}
if (!client_conn->IsInTxn()) {
res_.SetRes(CmdRes::kErrOther, "EXEC without MULTI");
return;
}
if (IsTxnFailedAndSetState()) {
client_conn->ExitTxn();
return;
}
SetCmdsVec();
Lock();
conn = GetConn();
client_conn = std::dynamic_pointer_cast<PikaClientConn>(conn);
std::vector<CmdRes> res_vec = {};
std::vector<std::shared_ptr<std::string>> resp_strs;
for (size_t i = 0; i < cmds_.size(); ++i) {
Expand Down Expand Up @@ -97,9 +80,30 @@ void ExecCmd::Do() {
});

res_.AppendArrayLen(res_vec.size());
for (auto &r : res_vec) {
for (auto& r : res_vec) {
res_.AppendStringRaw(r.message());
}
}

void ExecCmd::Execute() {
auto conn = GetConn();
auto client_conn = std::dynamic_pointer_cast<PikaClientConn>(conn);
if (client_conn == nullptr) {
res_.SetRes(CmdRes::kErrOther, name());
return;
}
if (!client_conn->IsInTxn()) {
res_.SetRes(CmdRes::kErrOther, "EXEC without MULTI");
return;
}
if (IsTxnFailedAndSetState()) {
client_conn->ExitTxn();
return;
}
SetCmdsVec();
Lock();
Do();

Unlock();
ServeToBLrPopWithKeys();
list_cmd_.clear();
Expand Down
49 changes: 46 additions & 3 deletions tests/integration/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,9 +623,9 @@ var _ = Describe("should replication ", func() {
for i := int64(0); i < clientMaster.LLen(ctx, "list0").Val(); i++ {
Expect(clientMaster.LIndex(ctx, "list0", i)).To(Equal(clientSlave.LIndex(ctx, "list0", i)))
}
// for i := int64(0); i < clientMaster.LLen(ctx, "list1").Val(); i++ {
// Expect(clientMaster.LIndex(ctx, "list1", i)).To(Equal(clientSlave.LIndex(ctx, "list1", i)))
// }
// for i := int64(0); i < clientMaster.LLen(ctx, "list1").Val(); i++ {
// Expect(clientMaster.LIndex(ctx, "list1", i)).To(Equal(clientSlave.LIndex(ctx, "list1", i)))
// }
}
err = clientMaster.Del(ctx, lists...)

Expand All @@ -648,6 +648,49 @@ var _ = Describe("should replication ", func() {
Expect(clientMaster.LIndex(ctx, "blist0", i)).To(Equal(clientSlave.LIndex(ctx, "blist0", i)))
}
err = clientMaster.Del(ctx, lists...)

//The test below is related with issue: https://github.com/OpenAtomFoundation/pika/issues/2643
r1 := clientMaster.Do(ctx, "MULTI")
Expect(r1.Err()).NotTo(HaveOccurred())

setkey1 := clientMaster.Set(ctx, "Tnxkey1", "Tnxvalue1", 0)
Expect(setkey1.Err()).NotTo(HaveOccurred())
Expect(setkey1.Val()).To(Equal("QUEUED"))

setkey2 := clientMaster.Set(ctx, "Tnxkey2", "Tnxvalue2", 0)
Expect(setkey2.Err()).NotTo(HaveOccurred())
Expect(setkey2.Val()).To(Equal("QUEUED"))

r2 := clientMaster.Do(ctx, "EXEC")
Expect(r2.Err()).NotTo(HaveOccurred())
Expect(r2.Val()).To(Equal([]interface{}{"OK", "OK"}))

time.Sleep(3 * time.Second)

getkey1 := clientSlave.Get(ctx, "Tnxkey1")
Expect(getkey1.Err()).NotTo(HaveOccurred())
Expect(getkey1.Val()).To(Equal("Tnxvalue1"))

getkey2 := clientSlave.Get(ctx, "Tnxkey2")
Expect(getkey2.Err()).NotTo(HaveOccurred())
Expect(getkey2.Val()).To(Equal("Tnxvalue2"))

ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
loopCount := 0

for loopCount < 10 {
select {
case <-ticker.C:
infoResExec := clientSlave.Info(ctx, "replication")
Expect(infoResExec.Err()).NotTo(HaveOccurred())
Expect(infoResExec.Val()).To(ContainSubstring("master_link_status:up"))
loopCount++
if loopCount >= 10 {
ticker.Stop()
}
}
}
log.Println("master-slave replication test success")
})

Expand Down

0 comments on commit 4653d98

Please sign in to comment.