From 7fe81b7490fbc98026de07d179ab3e9ebd38fab4 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 30 Oct 2019 17:23:23 +0800 Subject: [PATCH] store: when commit get a `CommitTsExpired` error, retry with a new commitTS (#12980) --- go.mod | 2 +- go.sum | 4 +- store/mockstore/mocktikv/errors.go | 15 +++- store/mockstore/mocktikv/mock_tikv_test.go | 13 ++++ store/mockstore/mocktikv/mvcc_leveldb.go | 10 +++ store/mockstore/mocktikv/rpc.go | 5 ++ store/tikv/2pc.go | 20 ++++++ store/tikv/2pc_test.go | 84 +++++++++++++++++----- 8 files changed, 132 insertions(+), 21 deletions(-) diff --git a/go.mod b/go.mod index 81a94d322b824..7b839a5ae6bbd 100644 --- a/go.mod +++ b/go.mod @@ -40,7 +40,7 @@ require ( github.com/pingcap/errors v0.11.4 github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e - github.com/pingcap/kvproto v0.0.0-20190910074005-0e61b6f435c1 + github.com/pingcap/kvproto v0.0.0-20191025022903-62abb760d9b1 github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 github.com/pingcap/parser v0.0.0-20191025082927-f8adf1670b97 github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0 diff --git a/go.sum b/go.sum index fa6f7b7e08946..b2747797be1f0 100644 --- a/go.sum +++ b/go.sum @@ -163,8 +163,8 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20190822090350-11ea838aedf7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= -github.com/pingcap/kvproto v0.0.0-20190910074005-0e61b6f435c1 h1:DNvxkdcjA0TBIIIF+K2w9KMlTzMZzLZ5JVF26kTCPhg= -github.com/pingcap/kvproto v0.0.0-20190910074005-0e61b6f435c1/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= +github.com/pingcap/kvproto 
v0.0.0-20191025022903-62abb760d9b1 h1:qqGSXCFr9Uc5VIDBEt4zlmMcI8e4GlkWfDVzQ+dexRk= +github.com/pingcap/kvproto v0.0.0-20191025022903-62abb760d9b1/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= diff --git a/store/mockstore/mocktikv/errors.go b/store/mockstore/mocktikv/errors.go index 32dce279ee52d..99bdca22a3121 100644 --- a/store/mockstore/mocktikv/errors.go +++ b/store/mockstore/mocktikv/errors.go @@ -13,7 +13,11 @@ package mocktikv -import "fmt" +import ( + "fmt" + + "github.com/pingcap/kvproto/pkg/kvrpcpb" +) // ErrLocked is returned when trying to Read/Write on a locked key. Client should // backoff or cleanup the lock then retry. @@ -83,3 +87,12 @@ type ErrDeadlock struct { func (e *ErrDeadlock) Error() string { return "deadlock" } + +// ErrCommitTSExpired is returned when commit.CommitTS < lock.MinCommitTS +type ErrCommitTSExpired struct { + kvrpcpb.CommitTsExpired +} + +func (e *ErrCommitTSExpired) Error() string { + return "commit ts expired" +} diff --git a/store/mockstore/mocktikv/mock_tikv_test.go b/store/mockstore/mocktikv/mock_tikv_test.go index 9a354365c04f7..1e56befb67606 100644 --- a/store/mockstore/mocktikv/mock_tikv_test.go +++ b/store/mockstore/mocktikv/mock_tikv_test.go @@ -18,6 +18,7 @@ import ( "testing" . 
"github.com/pingcap/check" + "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" ) @@ -166,6 +167,7 @@ func (s *testMockTiKVSuite) mustPrewriteWithTTLOK(c *C, mutations []*kvrpcpb.Mut PrimaryLock: []byte(primary), StartVersion: startTS, LockTtl: ttl, + MinCommitTs: startTS + 1, } errs := s.store.Prewrite(req) for _, err := range errs { @@ -682,6 +684,17 @@ func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) { c.Assert(commitTS, Equals, uint64(0)) } +func (s *testMVCCLevelDB) TestRejectCommitTS(c *C) { + s.mustPrewriteOK(c, putMutations("x", "A"), "x", 5) + // Push the minCommitTS + _, _, err := s.store.CheckTxnStatus([]byte("x"), 5, 100, 100) + c.Assert(err, IsNil) + err = s.store.Commit([][]byte{[]byte("x")}, 5, 10) + e, ok := errors.Cause(err).(*ErrCommitTSExpired) + c.Assert(ok, IsTrue) + c.Assert(e.MinCommitTs, Equals, uint64(101)) +} + func (s *testMVCCLevelDB) TestMvccGetByKey(c *C) { s.mustPrewriteOK(c, putMutations("q1", "v5"), "p1", 5) debugger, ok := s.store.(MVCCDebugger) diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index 5365a74ba0fdc..b83622347e891 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -798,6 +798,16 @@ func commitKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, startTS, commit } return ErrRetryable("txn not found") } + // Reject the commit request whose commitTS is less than minCommitTS.
+ if dec.lock.minCommitTS > commitTS { + return &ErrCommitTSExpired{ + kvrpcpb.CommitTsExpired{ + StartTs: startTS, + AttemptedCommitTs: commitTS, + Key: key, + MinCommitTs: dec.lock.minCommitTS, + }} + } if err = commitLock(batch, dec.lock, key, startTS, commitTS); err != nil { return errors.Trace(err) diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go index 3dc2b10edcb90..4d860b9db722e 100644 --- a/store/mockstore/mocktikv/rpc.go +++ b/store/mockstore/mocktikv/rpc.go @@ -92,6 +92,11 @@ func convertToKeyError(err error) *kvrpcpb.KeyError { Retryable: retryable.Error(), } } + if expired, ok := errors.Cause(err).(*ErrCommitTSExpired); ok { + return &kvrpcpb.KeyError{ + CommitTsExpired: &expired.CommitTsExpired, + } + } return &kvrpcpb.KeyError{ Abort: err.Error(), } diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index d704e6fe6000f..dc8ccba1171d5 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -859,6 +859,26 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch c.setUndeterminedErr(nil) } if keyErr := commitResp.GetError(); keyErr != nil { + if rejected := keyErr.GetCommitTsExpired(); rejected != nil { + logutil.Logger(bo.ctx).Info("2PC commitTS rejected by TiKV, retry with a newer commitTS", + zap.Uint64("txnStartTS", c.startTS), + zap.Stringer("info", logutil.Hex(rejected))) + + // Update commit ts and retry. 
+ commitTS, err := c.store.getTimestampWithRetry(bo) + if err != nil { + logutil.Logger(bo.ctx).Warn("2PC get commitTS failed", + zap.Error(err), + zap.Uint64("txnStartTS", c.startTS)) + return errors.Trace(err) + } + + c.mu.Lock() + c.commitTS = commitTS + c.mu.Unlock() + return c.commitKeys(bo, batch.keys) + } + c.mu.RLock() defer c.mu.RUnlock() err = extractKeyErr(keyErr) diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index ebfcf966b1f3e..cb47a8bf057c8 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -14,6 +14,7 @@ package tikv import ( + "bytes" "context" "math" "math/rand" @@ -340,12 +341,12 @@ func (s *testCommitterSuite) TestCommitBeforePrewrite(c *C) { txn := s.begin(c) err := txn.Set([]byte("a"), []byte("a1")) c.Assert(err, IsNil) - commiter, err := newTwoPhaseCommitterWithInit(txn, 0) + committer, err := newTwoPhaseCommitterWithInit(txn, 0) c.Assert(err, IsNil) ctx := context.Background() - err = commiter.cleanupKeys(NewBackoffer(ctx, cleanupMaxBackoff), commiter.keys) + err = committer.cleanupKeys(NewBackoffer(ctx, cleanupMaxBackoff), committer.keys) c.Assert(err, IsNil) - err = commiter.prewriteKeys(NewBackoffer(ctx, PrewriteMaxBackoff), commiter.keys) + err = committer.prewriteKeys(NewBackoffer(ctx, PrewriteMaxBackoff), committer.keys) c.Assert(err, NotNil) errMsgMustContain(c, err, "conflictCommitTS") } @@ -385,9 +386,9 @@ func (s *testCommitterSuite) TestPrewritePrimaryKeyFailed(c *C) { // clean again, shouldn't be failed when a rollback already exist. ctx := context.Background() - commiter, err := newTwoPhaseCommitterWithInit(txn2, 0) + committer, err := newTwoPhaseCommitterWithInit(txn2, 0) c.Assert(err, IsNil) - err = commiter.cleanupKeys(NewBackoffer(ctx, cleanupMaxBackoff), commiter.keys) + err = committer.cleanupKeys(NewBackoffer(ctx, cleanupMaxBackoff), committer.keys) c.Assert(err, IsNil) // check the data after rollback twice. 
@@ -420,17 +421,17 @@ func (s *testCommitterSuite) TestWrittenKeysOnConflict(c *C) { txn1 := s.begin(c) txn2 := s.begin(c) txn2.Set([]byte("x1"), []byte("1")) - commiter2, err := newTwoPhaseCommitterWithInit(txn2, 2) + committer2, err := newTwoPhaseCommitterWithInit(txn2, 2) c.Assert(err, IsNil) - err = commiter2.execute(context.Background()) + err = committer2.execute(context.Background()) c.Assert(err, IsNil) txn1.Set([]byte("x1"), []byte("1")) txn1.Set([]byte("y1"), []byte("2")) - commiter1, err := newTwoPhaseCommitterWithInit(txn1, 2) + committer1, err := newTwoPhaseCommitterWithInit(txn1, 2) c.Assert(err, IsNil) - err = commiter1.execute(context.Background()) + err = committer1.execute(context.Background()) c.Assert(err, NotNil) - commiter1.cleanWg.Wait() + committer1.cleanWg.Wait() txn3 := s.begin(c) start := time.Now() txn3.Get(context.TODO(), []byte("y1")) @@ -454,11 +455,11 @@ func (s *testCommitterSuite) TestPrewriteTxnSize(c *C) { c.Assert(err, IsNil) } - commiter, err := newTwoPhaseCommitterWithInit(txn, 1) + committer, err := newTwoPhaseCommitterWithInit(txn, 1) c.Assert(err, IsNil) ctx := context.Background() - err = commiter.prewriteKeys(NewBackoffer(ctx, PrewriteMaxBackoff), commiter.keys) + err = committer.prewriteKeys(NewBackoffer(ctx, PrewriteMaxBackoff), committer.keys) c.Assert(err, IsNil) // Check the written locks in the first region (50 keys) @@ -474,19 +475,68 @@ func (s *testCommitterSuite) TestPrewriteTxnSize(c *C) { } } +func (s *testCommitterSuite) TestRejectCommitTS(c *C) { + txn := s.begin(c) + c.Assert(txn.Set([]byte("x"), []byte("v")), IsNil) + + committer, err := newTwoPhaseCommitterWithInit(txn, 1) + c.Assert(err, IsNil) + bo := NewBackoffer(context.Background(), getMaxBackoff) + loc, err := s.store.regionCache.LocateKey(bo, []byte("x")) + c.Assert(err, IsNil) + batch := batchKeys{region: loc.Region, keys: [][]byte{[]byte("x")}} + mutations := make([]*kvrpcpb.Mutation, len(batch.keys)) + for i, k := range batch.keys { + tmp := 
committer.mutations[string(k)] + mutations[i] = &tmp.Mutation + } + prewrite := &kvrpcpb.PrewriteRequest{ + Mutations: mutations, + PrimaryLock: committer.primary(), + StartVersion: committer.startTS, + LockTtl: committer.lockTTL, + MinCommitTs: committer.startTS + 100, // Set minCommitTS + } + req := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, prewrite) + _, err = s.store.SendReq(bo, req, loc.Region, readTimeoutShort) + c.Assert(err, IsNil) + + // Make commitTS less than minCommitTS. + committer.commitTS = committer.startTS + 1 + // Ensure that the new commit ts is greater than minCommitTS when retrying + time.Sleep(3 * time.Millisecond) + err = committer.commitKeys(bo, committer.keys) + c.Assert(err, IsNil) + + // Use startTS+2 to read the data and get nothing. + // Use math.MaxUint64 to read the data and succeed. + // That means the final commitTS > startTS+2, it's not the one we provide. + // So we cover the retry commitTS logic. + txn1, err := s.store.BeginWithStartTS(committer.startTS + 2) + c.Assert(err, IsNil) + _, err = txn1.Get(bo.ctx, []byte("x")) + c.Assert(kv.IsErrNotFound(err), IsTrue) + + txn2, err := s.store.BeginWithStartTS(math.MaxUint64) + c.Assert(err, IsNil) + val, err := txn2.Get(bo.ctx, []byte("x")) + c.Assert(err, IsNil) + c.Assert(bytes.Equal(val, []byte("v")), IsTrue) +} + func (s *testCommitterSuite) TestPessimisticPrewriteRequest(c *C) { // This test checks that the isPessimisticLock field is set in the request even when no keys are pessimistic lock.
txn := s.begin(c) txn.SetOption(kv.Pessimistic, true) err := txn.Set([]byte("t1"), []byte("v1")) c.Assert(err, IsNil) - commiter, err := newTwoPhaseCommitterWithInit(txn, 0) + committer, err := newTwoPhaseCommitterWithInit(txn, 0) c.Assert(err, IsNil) - commiter.forUpdateTS = 100 + committer.forUpdateTS = 100 var batch batchKeys batch.keys = append(batch.keys, []byte("t1")) batch.region = RegionVerID{1, 1, 1} - req := commiter.buildPrewriteRequest(batch, 1) + req := committer.buildPrewriteRequest(batch, 1) c.Assert(len(req.Prewrite().IsPessimisticLock), Greater, 0) c.Assert(req.Prewrite().ForUpdateTs, Equals, uint64(100)) } @@ -568,13 +618,13 @@ func (s *testCommitterSuite) getLockInfo(c *C, key []byte) *kvrpcpb.LockInfo { txn := s.begin(c) err := txn.Set(key, key) c.Assert(err, IsNil) - commiter, err := newTwoPhaseCommitterWithInit(txn, 1) + committer, err := newTwoPhaseCommitterWithInit(txn, 1) c.Assert(err, IsNil) bo := NewBackoffer(context.Background(), getMaxBackoff) loc, err := s.store.regionCache.LocateKey(bo, key) c.Assert(err, IsNil) batch := batchKeys{region: loc.Region, keys: [][]byte{key}} - req := commiter.buildPrewriteRequest(batch, 1) + req := committer.buildPrewriteRequest(batch, 1) resp, err := s.store.SendReq(bo, req, loc.Region, readTimeoutShort) c.Assert(err, IsNil) c.Assert(resp.Resp, NotNil)