From 6b5fcf7ec51505a9fbd9a2633cc342689208a8a9 Mon Sep 17 00:00:00 2001 From: Ewan Chou Date: Tue, 2 Jul 2019 13:35:36 +0800 Subject: [PATCH] executor: fix point get snapshot TS for pessimistic transaction. (#11012) (#11015) --- executor/point_get.go | 6 +++++- session/pessimistic_test.go | 29 +++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/executor/point_get.go b/executor/point_get.go index 80730ca2c48c4..955d68ede1032 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -82,8 +82,12 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { return nil } e.done = true + snapshotTS := e.startTS + if e.lock { + snapshotTS = e.ctx.GetSessionVars().TxnCtx.GetForUpdateTS() + } var err error - e.snapshot, err = e.ctx.GetStore().GetSnapshot(kv.Version{Ver: e.startTS}) + e.snapshot, err = e.ctx.GetStore().GetSnapshot(kv.Version{Ver: snapshotTS}) if err != nil { return err } diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index 1fb6332b1c8b6..b8110e4f5d488 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -322,3 +322,32 @@ func (s *testPessimisticSuite) TestPointGetKeyLock(c *C) { tk.MustExec("commit") syncCh <- struct{}{} } + +func (s *testPessimisticSuite) TestBankTransfer(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk2 := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists accounts") + tk.MustExec("create table accounts (id int primary key, c int)") + tk.MustExec("insert accounts values (1, 100), (2, 100), (3, 100)") + syncCh := make(chan struct{}) + + tk.MustExec("begin pessimistic") + tk.MustQuery("select * from accounts where id = 1 for update").Check(testkit.Rows("1 100")) + go func() { + tk2.MustExec("begin pessimistic") + tk2.MustExec("select * from accounts where id = 2 for update") + <-syncCh + tk2.MustExec("select * from accounts where id = 3 for update") + tk2.MustExec("update accounts set c = 50 where id = 2") + tk2.MustExec("update accounts set c = 150 where id = 3") + tk2.MustExec("commit") + <-syncCh + }() + syncCh <- struct{}{} + tk.MustQuery("select * from accounts where id = 2 for update").Check(testkit.Rows("2 50")) + tk.MustExec("update accounts set c = 50 where id = 1") + tk.MustExec("update accounts set c = 100 where id = 2") + tk.MustExec("commit") + syncCh <- struct{}{} + tk.MustQuery("select sum(c) from accounts").Check(testkit.Rows("300")) +}