From 61819204fbd1a6e9ed8da916e11a65da882f1b06 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 4 Nov 2019 23:04:54 +0800 Subject: [PATCH 01/14] store: upgrade the CheckTxnStatus API CheckTxnStatus introduces a non-block read mode. In this mode, TiDB can ignore the secondary lock TTL check and send the CheckTxnStatus request. --- go.mod | 2 +- go.sum | 2 + store/mockstore/mocktikv/errors.go | 8 ++ store/mockstore/mocktikv/mock_tikv_test.go | 8 +- store/mockstore/mocktikv/mvcc.go | 2 +- store/mockstore/mocktikv/mvcc_leveldb.go | 20 ++++- store/mockstore/mocktikv/rpc.go | 7 +- store/tikv/2pc.go | 4 +- store/tikv/2pc_test.go | 2 +- store/tikv/backoff.go | 5 ++ store/tikv/coprocessor.go | 2 +- store/tikv/lock_resolver.go | 88 +++++++++++++++++----- store/tikv/lock_test.go | 55 ++++++++++++-- store/tikv/snapshot.go | 4 +- 14 files changed, 168 insertions(+), 41 deletions(-) diff --git a/go.mod b/go.mod index c2d577a44c153..6aa12feba725f 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,7 @@ require ( github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e - github.com/pingcap/kvproto v0.0.0-20191025022903-62abb760d9b1 + github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1 github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 github.com/pingcap/parser v0.0.0-20191031081038-bfb0c3adf567 github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0 diff --git a/go.sum b/go.sum index b3cfcf6079010..75b63f28e236a 100644 --- a/go.sum +++ b/go.sum @@ -167,6 +167,8 @@ github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17Xtb github.com/pingcap/kvproto v0.0.0-20190822090350-11ea838aedf7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= github.com/pingcap/kvproto v0.0.0-20191025022903-62abb760d9b1 h1:qqGSXCFr9Uc5VIDBEt4zlmMcI8e4GlkWfDVzQ+dexRk= github.com/pingcap/kvproto v0.0.0-20191025022903-62abb760d9b1/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= +github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1 h1:J5oimSv+0emw5e/D1ZX/zh2WcMv0pOVT9QKruXfvJbg= +github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= diff --git a/store/mockstore/mocktikv/errors.go b/store/mockstore/mocktikv/errors.go index 99bdca22a3121..c94b3d987b8c4 100644 --- a/store/mockstore/mocktikv/errors.go +++ b/store/mockstore/mocktikv/errors.go @@ -96,3 +96,11 @@ type ErrCommitTSExpired struct { func (e *ErrCommitTSExpired) Error() string { return "commit ts expired" } + +type ErrTxnNotFound struct { + kvrpcpb.TxnNotFound +} + +func (e *ErrTxnNotFound) Error() string { + return "txn not found" +} diff --git a/store/mockstore/mocktikv/mock_tikv_test.go b/store/mockstore/mocktikv/mock_tikv_test.go index 1e56befb67606..0004a6f372b45 100644 --- a/store/mockstore/mocktikv/mock_tikv_test.go +++ b/store/mockstore/mocktikv/mock_tikv_test.go @@ -663,14 +663,14 @@ func (s *testMVCCLevelDB) TestErrors(c *C) { func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) { s.mustPrewriteWithTTLOK(c, putMutations("pk", "val"), "pk", 5, 666) - ttl, commitTS, err := s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666) + ttl, commitTS, err := s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666, false) c.Assert(err, IsNil) c.Assert(ttl, Equals, uint64(666)) c.Assert(commitTS, Equals, uint64(0)) s.mustCommitOK(c, [][]byte{[]byte("pk")}, 5, 30) - ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666) + ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666, false) c.Assert(err, IsNil) c.Assert(ttl, Equals, uint64(0)) c.Assert(commitTS, Equals, uint64(30)) @@ -678,7 +678,7 @@ func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) { s.mustPrewriteWithTTLOK(c, putMutations("pk1", "val"), "pk1", 5, 666) s.mustRollbackOK(c, [][]byte{[]byte("pk1")}, 5) - ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk1"), 5, 0, 666) + ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk1"), 5, 0, 666, false) c.Assert(err, IsNil) c.Assert(ttl, Equals, uint64(0)) c.Assert(commitTS, Equals, uint64(0)) @@ -687,7 +687,7 @@ func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) { func (s *testMVCCLevelDB) TestRejectCommitTS(c *C) { s.mustPrewriteOK(c, putMutations("x", "A"), "x", 5) // Push the minCommitTS - _, _, err := s.store.CheckTxnStatus([]byte("x"), 5, 100, 100) + _, _, err := s.store.CheckTxnStatus([]byte("x"), 5, 100, 100, false) c.Assert(err, IsNil) err = s.store.Commit([][]byte{[]byte("x")}, 5, 10) e, ok := errors.Cause(err).(*ErrCommitTSExpired) diff --git a/store/mockstore/mocktikv/mvcc.go b/store/mockstore/mocktikv/mvcc.go index be85563479903..e6ed8a677876c 100644 --- a/store/mockstore/mocktikv/mvcc.go +++ b/store/mockstore/mocktikv/mvcc.go @@ -266,7 +266,7 @@ type MVCCStore interface { BatchResolveLock(startKey, endKey []byte, txnInfos map[uint64]uint64) error GC(startKey, endKey []byte, safePoint uint64) error DeleteRange(startKey, endKey []byte) error - CheckTxnStatus(primaryKey []byte, lockTS uint64, startTS, currentTS uint64) (ttl, commitTS uint64, err error) + CheckTxnStatus(primaryKey []byte, lockTS uint64, startTS, currentTS uint64, rollbackIfNotFound bool) (ttl, commitTS uint64, err error) Close() error } diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index b83622347e891..faa1e3c630b48 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -1025,7 +1025,7 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error { // primaryKey + lockTS together could locate the primary lock. // callerStartTS is the start ts of reader transaction. // currentTS is the current ts, but it may be inaccurate. Just use it to check TTL. -func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS, currentTS uint64) (uint64, uint64, error) { +func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS, currentTS uint64, rollbackIfNotExist bool) (uint64, uint64, error) { mvcc.mu.Lock() defer mvcc.mu.Unlock() @@ -1105,9 +1105,21 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS } // If current transaction is not prewritted before, it may be pessimistic lock. - // When pessimistic lock rollback, it may not leave a 'rollbacked' tombstone. - logutil.BgLogger().Debug("CheckTxnStatus can't find the primary lock, pessimistic rollback?") - return 0, 0, nil + // When pessimistic txn rollback statement, it may not leave a 'rollbacked' tombstone. + + // Or maybe caused by concurrent prewrite operation. + // Especially in the non-block reading case, the secondary lock is likely to be + // written before the primary lock. + + if rollbackIfNotExist { + // ttl, commitTS = 0, 0 means rollback + return 0, 0, nil + } + + return 0, 0, &ErrTxnNotFound{kvrpcpb.TxnNotFound{ + StartTs: lockTS, + PrimaryKey: primaryKey, + }} } // TxnHeartBeat implements the MVCCStore interface. diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go index 4d860b9db722e..ca0c10a1c4eb6 100644 --- a/store/mockstore/mocktikv/rpc.go +++ b/store/mockstore/mocktikv/rpc.go @@ -97,6 +97,11 @@ func convertToKeyError(err error) *kvrpcpb.KeyError { CommitTsExpired: &expired.CommitTsExpired, } } + if tmp, ok := errors.Cause(err).(*ErrTxnNotFound); ok { + return &kvrpcpb.KeyError{ + TxnNotFound: &tmp.TxnNotFound, + } + } return &kvrpcpb.KeyError{ Abort: err.Error(), } @@ -379,7 +384,7 @@ func (h *rpcHandler) handleKvCheckTxnStatus(req *kvrpcpb.CheckTxnStatusRequest) panic("KvCheckTxnStatus: key not in region") } var resp kvrpcpb.CheckTxnStatusResponse - ttl, commitTS, err := h.mvccStore.CheckTxnStatus(req.GetPrimaryKey(), req.GetLockTs(), req.GetCallerStartTs(), req.GetCurrentTs()) + ttl, commitTS, err := h.mvccStore.CheckTxnStatus(req.GetPrimaryKey(), req.GetLockTs(), req.GetCallerStartTs(), req.GetCurrentTs(), req.GetRollbackIfNotExist()) if err != nil { resp.Error = convertToKeyError(err) } else { diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 4a9a8d0948f4a..54d5db6fce949 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -554,7 +554,7 @@ func (actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, bat locks = append(locks, lock) } start := time.Now() - msBeforeExpired, err := c.store.lockResolver.ResolveLocks(bo, c.startTS, locks) + msBeforeExpired, err := c.store.lockResolver.ResolveLocks(bo, c.startTS, locks, false) if err != nil { return errors.Trace(err) } @@ -720,7 +720,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * locks = append(locks, lock) } // Because we already waited on tikv, no need to Backoff here. - _, err = c.store.lockResolver.ResolveLocks(bo, c.startTS, locks) + _, err = c.store.lockResolver.ResolveLocks(bo, c.startTS, locks, false) if err != nil { return errors.Trace(err) } diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index cb47a8bf057c8..a1e5c55f554f3 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -592,7 +592,7 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) { lr := newLockResolver(s.store) bo := NewBackoffer(context.Background(), getMaxBackoff) - status, err := lr.getTxnStatus(bo, txn.startTS, key2, 0, txn.startTS) + status, err := lr.getTxnStatus(bo, txn.startTS, key2, 0, txn.startTS, false) c.Assert(err, IsNil) c.Assert(status.ttl, GreaterEqual, lockInfo.LockTtl) diff --git a/store/tikv/backoff.go b/store/tikv/backoff.go index 06def2a31ffce..58aa1ebe18322 100644 --- a/store/tikv/backoff.go +++ b/store/tikv/backoff.go @@ -138,6 +138,7 @@ const ( boTiKVRPC backoffType = iota BoTxnLock boTxnLockFast + boTxnNotFound BoPDRPC BoRegionMiss BoUpdateLeader @@ -157,6 +158,8 @@ func (t backoffType) createFn(vars *kv.Variables) func(context.Context, int) int return NewBackoffFn(vars.BackoffLockFast, 3000, EqualJitter) case BoPDRPC: return NewBackoffFn(500, 3000, EqualJitter) + case boTxnNotFound: + return NewBackoffFn(5, 1000, EqualJitter) case BoRegionMiss: // change base time to 2ms, because it may recover soon. return NewBackoffFn(2, 500, NoJitter) @@ -184,6 +187,8 @@ func (t backoffType) String() string { return "updateLeader" case boServerBusy: return "serverBusy" + case boTxnNotFound: + return "txnNotFound" } return "" } diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index 25bac38da5f70..06c14089ccc04 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -786,7 +786,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon if lockErr := resp.pbResp.GetLocked(); lockErr != nil { logutil.BgLogger().Debug("coprocessor encounters", zap.Stringer("lock", lockErr)) - msBeforeExpired, err1 := worker.store.lockResolver.ResolveLocks(bo, worker.req.StartTs, []*Lock{NewLock(lockErr)}) + msBeforeExpired, err1 := worker.store.lockResolver.ResolveLocks(bo, worker.req.StartTs, []*Lock{NewLock(lockErr)}, false) if err1 != nil { return nil, errors.Trace(err1) } diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 42a17c095477e..f211e783b754b 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -202,7 +202,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi tikvLockResolverCountWithExpired.Inc() // Use currentTS = math.MaxUint64 means rollback the txn, no matter the lock is expired or not! - status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64) + status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, false) if err != nil { return false, err } @@ -269,7 +269,8 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi // commit status. // 3) Send `ResolveLock` cmd to the lock's region to resolve all locks belong to // the same transaction. -func (lr *LockResolver) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks []*Lock) (int64, error) { +// If noWait is true, ResolveLocks would not wait the secondary lock TTL to expire. +func (lr *LockResolver) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks []*Lock, noWait bool) (int64, error) { var msBeforeTxnExpired txnExpireTime if len(locks) == 0 { return msBeforeTxnExpired.value(), nil @@ -278,20 +279,25 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks tikvLockResolverCountWithResolve.Inc() var expiredLocks []*Lock - for _, l := range locks { - msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL) - if msBeforeLockExpired <= 0 { - expiredLocks = append(expiredLocks, l) - } else { - msBeforeTxnExpired.update(int64(l.TTL)) - tikvLockResolverCountWithNotExpired.Inc() + if noWait { + // Check all locks status. + expiredLocks = locks + } else { + for _, l := range locks { + msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL) + if msBeforeLockExpired <= 0 { + expiredLocks = append(expiredLocks, l) + } else { + msBeforeTxnExpired.update(int64(l.TTL)) + tikvLockResolverCountWithNotExpired.Inc() + } } } // TxnID -> []Region, record resolved Regions. // TODO: Maybe put it in LockResolver and share by all txns. cleanTxns := make(map[uint64]map[RegionVerID]struct{}) for _, l := range expiredLocks { - status, err := lr.getTxnStatusFromLock(bo, l, callerStartTS) + status, err := lr.getTxnStatusFromLock(bo, l, callerStartTS, noWait) if err != nil { msBeforeTxnExpired.update(0) err = errors.Trace(err) @@ -366,11 +372,13 @@ func (lr *LockResolver) GetTxnStatus(txnID uint64, callerStartTS uint64, primary if err != nil { return status, err } - return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS) + return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS, false) } -func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStartTS uint64) (TxnStatus, error) { +func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStartTS uint64, nonBlockRead bool) (TxnStatus, error) { var currentTS uint64 + var err error + var status TxnStatus if l.TTL == 0 { // NOTE: l.TTL = 0 is a special protocol!!! // When the pessimistic txn prewrite meets locks of a txn, it should resolve the lock **unconditionally**. @@ -378,28 +386,64 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStart // Set currentTS to max uint64 to make the lock expired. currentTS = math.MaxUint64 } else { - var err error currentTS, err = lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx) if err != nil { return TxnStatus{}, err } } - return lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS) + + for { + status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS, nonBlockRead) + if err == nil { + return status, err + } + if _, ok := errors.Cause(err).(txnNotFoundErr); !ok { + return status, err + } + err = bo.Backoff(boTxnNotFound, err) + if err != nil { + return status, errors.Trace(err) + } + } +} + +type txnNotFoundErr struct { + *kvrpcpb.TxnNotFound } -func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS, currentTS uint64) (TxnStatus, error) { +func (e txnNotFoundErr) Error() string { + return e.TxnNotFound.String() +} + +// If nonBlockRead is true, the caller should handle the txnNotFoundErr. +func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS, currentTS uint64, nonBlockRead bool) (TxnStatus, error) { if s, ok := lr.getResolved(txnID); ok { return s, nil } tikvLockResolverCountWithQueryTxnStatus.Inc() + // CheckTxnStatus would meet the following cases: + // 1. LOCK + // 1.1 Lock expired -- orphan lock, fail to update TTL, crash recovery etc. + // 1.2 Lock TTL -- active transaction holding the lock. + // 2. NO LOCK + // 2.1 Txn Committed + // 2.2 Txn Rollbacked -- rollback itself, rollback by others, GC tomb etc. + // 2.3 No lock -- pessimistic lock rollback, concurrence prewrite. + + // When we are using nonBlockRead, we must take care of the concurrence prewrite case. + // The secondary lock is written while the primary lock doesn't exist (the packet may be ongoing). + // CheckTxnStatus should not rollback the transaction in this case. + rollbackIfNotExist := !nonBlockRead + var status TxnStatus req := tikvrpc.NewRequest(tikvrpc.CmdCheckTxnStatus, &kvrpcpb.CheckTxnStatusRequest{ - PrimaryKey: primary, - LockTs: txnID, - CallerStartTs: callerStartTS, - CurrentTs: currentTS, + PrimaryKey: primary, + LockTs: txnID, + CallerStartTs: callerStartTS, + CurrentTs: currentTS, + RollbackIfNotExist: rollbackIfNotExist, }) for { loc, err := lr.store.GetRegionCache().LocateKey(bo, primary) @@ -426,6 +470,12 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte } cmdResp := resp.Resp.(*kvrpcpb.CheckTxnStatusResponse) if keyErr := cmdResp.GetError(); keyErr != nil { + txnNotFound := keyErr.GetTxnNotFound() + // Only when nonBlockRead is true should this happen, otherwise there're bugs. + if nonBlockRead && txnNotFound != nil { + return status, txnNotFoundErr{txnNotFound} + } + err = errors.Errorf("unexpected err: %s, tid: %v", keyErr, txnID) logutil.BgLogger().Error("getTxnStatus error", zap.Error(err)) return status, err diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go index 1baa091ffc557..7f467b650ffbb 100644 --- a/store/tikv/lock_test.go +++ b/store/tikv/lock_test.go @@ -21,6 +21,7 @@ import ( "time" . "github.com/pingcap/check" + "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/tikvrpc" @@ -286,7 +287,7 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) { bo := NewBackoffer(context.Background(), PrewriteMaxBackoff) resolver := newLockResolver(s.store) // Call getTxnStatus to check the lock status. - status, err := resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS) + status, err := resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, false) c.Assert(err, IsNil) c.Assert(status.IsCommitted(), IsFalse) c.Assert(status.ttl, Greater, uint64(0)) @@ -294,32 +295,76 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) { // Test the ResolveLocks API lock := s.mustGetLock(c, []byte("second")) - timeBeforeExpire, err := resolver.ResolveLocks(bo, currentTS, []*Lock{lock}) + timeBeforeExpire, err := resolver.ResolveLocks(bo, currentTS, []*Lock{lock}, false) c.Assert(err, IsNil) c.Assert(timeBeforeExpire > int64(0), IsTrue) // Force rollback the lock using lock.TTL = 0. lock.TTL = uint64(0) - timeBeforeExpire, err = resolver.ResolveLocks(bo, currentTS, []*Lock{lock}) + timeBeforeExpire, err = resolver.ResolveLocks(bo, currentTS, []*Lock{lock}, false) c.Assert(err, IsNil) c.Assert(timeBeforeExpire, Equals, int64(0)) // Then call getTxnStatus again and check the lock status. currentTS, err = oracle.GetTimestamp(context.Background()) c.Assert(err, IsNil) - status, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS) + status, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, false) c.Assert(err, IsNil) c.Assert(status.ttl, Equals, uint64(0)) c.Assert(status.commitTS, Equals, uint64(0)) // Call getTxnStatus on a committed transaction. startTS, commitTS := s.putKV(c, []byte("a"), []byte("a")) - status, err = newLockResolver(s.store).getTxnStatus(bo, startTS, []byte("a"), currentTS, currentTS) + status, err = newLockResolver(s.store).getTxnStatus(bo, startTS, []byte("a"), currentTS, currentTS, false) c.Assert(err, IsNil) c.Assert(status.ttl, Equals, uint64(0)) c.Assert(status.commitTS, Equals, commitTS) } +func (s *testLockSuite) TestCheckTxnStatusNoBlockRead(c *C) { + txn, err := s.store.Begin() + c.Assert(err, IsNil) + txn.Set(kv.Key("key"), []byte("value")) + txn.Set(kv.Key("second"), []byte("xxx")) + committer, err := newTwoPhaseCommitterWithInit(txn.(*tikvTxn), 0) + c.Assert(err, IsNil) + // Increase lock TTL to make CI more stable. + committer.lockTTL = txnLockTTL(txn.(*tikvTxn).startTime, 200*1024*1024) + + // Only prewrite the secondary key to simulate a concurrent prewrite case: + // prewrite secondary regions success and prewrite the primary region is pending. + err = committer.prewriteKeys(NewBackoffer(context.Background(), PrewriteMaxBackoff), [][]byte{[]byte("second")}) + c.Assert(err, IsNil) + + oracle := s.store.GetOracle() + currentTS, err := oracle.GetTimestamp(context.Background()) + c.Assert(err, IsNil) + bo := NewBackoffer(context.Background(), PrewriteMaxBackoff) + resolver := newLockResolver(s.store) + // Call getTxnStatus with non-block reading flag. + _, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, true) + c.Assert(err, NotNil) + _, ok := errors.Cause(err).(txnNotFoundErr) + c.Assert(ok, IsTrue) + + errCh := make(chan error) + go func() { + errCh <- committer.prewriteKeys(NewBackoffer(context.Background(), PrewriteMaxBackoff), [][]byte{[]byte("key")}) + }() + + lock := &Lock{ + Key: []byte("second"), + Primary: []byte("key"), + TxnID: txn.StartTS(), + TTL: 100000, // Not zero. + } + // Call getTxnStatus with non-block reading flag, this time with retry to cover more code. + status, err := resolver.getTxnStatusFromLock(bo, lock, currentTS, true) + c.Assert(err, IsNil) + c.Assert(status.ttl, Greater, uint64(0)) + c.Assert(<-errCh, IsNil) +} + func (s *testLockSuite) prewriteTxn(c *C, txn *tikvTxn) { committer, err := newTwoPhaseCommitterWithInit(txn, 0) c.Assert(err, IsNil) diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index 9423a33781733..ffd3a450449fe 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -244,7 +244,7 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll locks = append(locks, lock) } if len(lockedKeys) > 0 { - msBeforeExpired, err := s.store.lockResolver.ResolveLocks(bo, s.version.Ver, locks) + msBeforeExpired, err := s.store.lockResolver.ResolveLocks(bo, s.version.Ver, locks, false) if err != nil { return errors.Trace(err) } @@ -332,7 +332,7 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) { if err != nil { return nil, errors.Trace(err) } - msBeforeExpired, err := s.store.lockResolver.ResolveLocks(bo, s.version.Ver, []*Lock{lock}) + msBeforeExpired, err := s.store.lockResolver.ResolveLocks(bo, s.version.Ver, []*Lock{lock}, false) if err != nil { return nil, errors.Trace(err) } From c0285ba58c7ea3cbe20576bb71f7cf902147b72c Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 4 Nov 2019 23:30:55 +0800 Subject: [PATCH 02/14] make golint happy --- store/mockstore/mocktikv/errors.go | 1 + 1 file changed, 1 insertion(+) diff --git a/store/mockstore/mocktikv/errors.go b/store/mockstore/mocktikv/errors.go index 536c5ae03ac1a..effe1871aac79 100644 --- a/store/mockstore/mocktikv/errors.go +++ b/store/mockstore/mocktikv/errors.go @@ -99,6 +99,7 @@ func (e *ErrCommitTSExpired) Error() string { return "commit ts expired" } +// ErrTxnNotFound is returned when the lock is not found. type ErrTxnNotFound struct { kvrpcpb.TxnNotFound } From e7aad0d1a2c757b380e1aa3c4a4392e858b962f9 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 5 Nov 2019 17:06:30 +0800 Subject: [PATCH 03/14] address comment --- store/tikv/2pc.go | 4 +-- store/tikv/2pc_test.go | 2 +- store/tikv/backoff.go | 5 ---- store/tikv/coprocessor.go | 2 +- store/tikv/lock_resolver.go | 51 ++++++++++++------------------------- store/tikv/lock_test.go | 15 ++++++----- store/tikv/snapshot.go | 4 +-- 7 files changed, 30 insertions(+), 53 deletions(-) diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index f113ad4f6c925..d5d94a5c7cdaf 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -557,7 +557,7 @@ func (actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, bat locks = append(locks, lock) } start := time.Now() - msBeforeExpired, err := c.store.lockResolver.ResolveLocks(bo, c.startTS, locks, false) + msBeforeExpired, err := c.store.lockResolver.ResolveLocks(bo, c.startTS, locks) if err != nil { return errors.Trace(err) } @@ -735,7 +735,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * locks = append(locks, lock) } // Because we already waited on tikv, no need to Backoff here. - _, err = c.store.lockResolver.ResolveLocks(bo, c.startTS, locks, false) + _, err = c.store.lockResolver.ResolveLocks(bo, c.startTS, locks) if err != nil { return errors.Trace(err) } diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index 97691bab2b7bf..04b375af886a8 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -592,7 +592,7 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) { lr := newLockResolver(s.store) bo := NewBackoffer(context.Background(), getMaxBackoff) - status, err := lr.getTxnStatus(bo, txn.startTS, key2, 0, txn.startTS, false) + status, err := lr.getTxnStatus(bo, txn.startTS, key2, 0, txn.startTS) c.Assert(err, IsNil) c.Assert(status.ttl, GreaterEqual, lockInfo.LockTtl) diff --git a/store/tikv/backoff.go b/store/tikv/backoff.go index 58aa1ebe18322..06def2a31ffce 100644 --- a/store/tikv/backoff.go +++ b/store/tikv/backoff.go @@ -138,7 +138,6 @@ const ( boTiKVRPC backoffType = iota BoTxnLock boTxnLockFast - boTxnNotFound BoPDRPC BoRegionMiss BoUpdateLeader @@ -158,8 +157,6 @@ func (t backoffType) createFn(vars *kv.Variables) func(context.Context, int) int return NewBackoffFn(vars.BackoffLockFast, 3000, EqualJitter) case BoPDRPC: return NewBackoffFn(500, 3000, EqualJitter) - case boTxnNotFound: - return NewBackoffFn(5, 1000, EqualJitter) case BoRegionMiss: // change base time to 2ms, because it may recover soon. return NewBackoffFn(2, 500, NoJitter) @@ -187,8 +184,6 @@ func (t backoffType) String() string { return "updateLeader" case boServerBusy: return "serverBusy" - case boTxnNotFound: - return "txnNotFound" } return "" } diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index 06c14089ccc04..25bac38da5f70 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -786,7 +786,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon if lockErr := resp.pbResp.GetLocked(); lockErr != nil { logutil.BgLogger().Debug("coprocessor encounters", zap.Stringer("lock", lockErr)) - msBeforeExpired, err1 := worker.store.lockResolver.ResolveLocks(bo, worker.req.StartTs, []*Lock{NewLock(lockErr)}, false) + msBeforeExpired, err1 := worker.store.lockResolver.ResolveLocks(bo, worker.req.StartTs, []*Lock{NewLock(lockErr)}) if err1 != nil { return nil, errors.Trace(err1) } diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index b945981dfa37b..fab38be43a866 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -204,7 +204,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi tikvLockResolverCountWithExpired.Inc() // Use currentTS = math.MaxUint64 means rollback the txn, no matter the lock is expired or not! - status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, false) + status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64) if err != nil { return false, err } @@ -271,8 +271,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi // commit status. // 3) Send `ResolveLock` cmd to the lock's region to resolve all locks belong to // the same transaction. -// If noWait is true, ResolveLocks would not wait the secondary lock TTL to expire. -func (lr *LockResolver) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks []*Lock, noWait bool) (int64, error) { +func (lr *LockResolver) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks []*Lock) (int64, error) { var msBeforeTxnExpired txnExpireTime if len(locks) == 0 { return msBeforeTxnExpired.value(), nil @@ -280,26 +279,11 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks tikvLockResolverCountWithResolve.Inc() - var expiredLocks []*Lock - if noWait { - // Check all locks status. - expiredLocks = locks - } else { - for _, l := range locks { - msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL) - if msBeforeLockExpired <= 0 { - expiredLocks = append(expiredLocks, l) - } else { - msBeforeTxnExpired.update(int64(l.TTL)) - tikvLockResolverCountWithNotExpired.Inc() - } - } - } // TxnID -> []Region, record resolved Regions. // TODO: Maybe put it in LockResolver and share by all txns. cleanTxns := make(map[uint64]map[RegionVerID]struct{}) - for _, l := range expiredLocks { - status, err := lr.getTxnStatusFromLock(bo, l, callerStartTS, noWait) + for _, l := range locks { + status, err := lr.getTxnStatusFromLock(bo, l, callerStartTS) if err != nil { msBeforeTxnExpired.update(0) err = errors.Trace(err) @@ -374,10 +358,10 @@ func (lr *LockResolver) GetTxnStatus(txnID uint64, callerStartTS uint64, primary if err != nil { return status, err } - return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS, false) + return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS) } -func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStartTS uint64, nonBlockRead bool) (TxnStatus, error) { +func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStartTS uint64) (TxnStatus, error) { var currentTS uint64 var err error var status TxnStatus @@ -395,17 +379,20 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStart } for { - status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS, nonBlockRead) + status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS) if err == nil { return status, err } if _, ok := errors.Cause(err).(txnNotFoundErr); !ok { return status, err } - err = bo.Backoff(boTxnNotFound, err) - if err != nil { - return status, errors.Trace(err) + + // Handle txnNotFound error. + // If the (secondary) lock TTL has expired, return rollbacked status. + if lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL) <= 0 { + return status, nil } + time.Sleep(5 * time.Millisecond) } } @@ -418,7 +405,7 @@ func (e txnNotFoundErr) Error() string { } // If nonBlockRead is true, the caller should handle the txnNotFoundErr. -func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS, currentTS uint64, nonBlockRead bool) (TxnStatus, error) { +func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS, currentTS uint64) (TxnStatus, error) { if s, ok := lr.getResolved(txnID); ok { return s, nil } @@ -434,18 +421,13 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte // 2.2 Txn Rollbacked -- rollback itself, rollback by others, GC tomb etc. // 2.3 No lock -- pessimistic lock rollback, concurrence prewrite. - // When we are using nonBlockRead, we must take care of the concurrence prewrite case. - // The secondary lock is written while the primary lock doesn't exist (the packet may be ongoing). - // CheckTxnStatus should not rollback the transaction in this case. - rollbackIfNotExist := !nonBlockRead - var status TxnStatus req := tikvrpc.NewRequest(tikvrpc.CmdCheckTxnStatus, &kvrpcpb.CheckTxnStatusRequest{ PrimaryKey: primary, LockTs: txnID, CallerStartTs: callerStartTS, CurrentTs: currentTS, - RollbackIfNotExist: rollbackIfNotExist, + RollbackIfNotExist: false, }) for { loc, err := lr.store.GetRegionCache().LocateKey(bo, primary) @@ -473,8 +455,7 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte cmdResp := resp.Resp.(*kvrpcpb.CheckTxnStatusResponse) if keyErr := cmdResp.GetError(); keyErr != nil { txnNotFound := keyErr.GetTxnNotFound() - // Only when nonBlockRead is true should this happen, otherwise there're bugs. - if nonBlockRead && txnNotFound != nil { + if txnNotFound != nil { return status, txnNotFoundErr{txnNotFound} } diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go index 7f467b650ffbb..ce9b97a70f454 100644 --- a/store/tikv/lock_test.go +++ b/store/tikv/lock_test.go @@ -287,7 +287,7 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) { bo := NewBackoffer(context.Background(), PrewriteMaxBackoff) resolver := newLockResolver(s.store) // Call getTxnStatus to check the lock status. - status, err := resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, false) + status, err := resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS) c.Assert(err, IsNil) c.Assert(status.IsCommitted(), IsFalse) c.Assert(status.ttl, Greater, uint64(0)) @@ -295,27 +295,27 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) { // Test the ResolveLocks API lock := s.mustGetLock(c, []byte("second")) - timeBeforeExpire, err := resolver.ResolveLocks(bo, currentTS, []*Lock{lock}, false) + timeBeforeExpire, err := resolver.ResolveLocks(bo, currentTS, []*Lock{lock}) c.Assert(err, IsNil) c.Assert(timeBeforeExpire > int64(0), IsTrue) // Force rollback the lock using lock.TTL = 0. lock.TTL = uint64(0) - timeBeforeExpire, err = resolver.ResolveLocks(bo, currentTS, []*Lock{lock}, false) + timeBeforeExpire, err = resolver.ResolveLocks(bo, currentTS, []*Lock{lock}) c.Assert(err, IsNil) c.Assert(timeBeforeExpire, Equals, int64(0)) // Then call getTxnStatus again and check the lock status. currentTS, err = oracle.GetTimestamp(context.Background()) c.Assert(err, IsNil) - status, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, false) + status, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS) c.Assert(err, IsNil) c.Assert(status.ttl, Equals, uint64(0)) c.Assert(status.commitTS, Equals, uint64(0)) // Call getTxnStatus on a committed transaction. startTS, commitTS := s.putKV(c, []byte("a"), []byte("a")) - status, err = newLockResolver(s.store).getTxnStatus(bo, startTS, []byte("a"), currentTS, currentTS, false) + status, err = newLockResolver(s.store).getTxnStatus(bo, startTS, []byte("a"), currentTS, currentTS) c.Assert(err, IsNil) c.Assert(status.ttl, Equals, uint64(0)) c.Assert(status.commitTS, Equals, commitTS) @@ -341,8 +341,9 @@ func (s *testLockSuite) TestCheckTxnStatusNoBlockRead(c *C) { c.Assert(err, IsNil) bo := NewBackoffer(context.Background(), PrewriteMaxBackoff) resolver := newLockResolver(s.store) + // Call getTxnStatus with non-block reading flag. - _, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, true) + _, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS) c.Assert(err, NotNil) _, ok := errors.Cause(err).(txnNotFoundErr) c.Assert(ok, IsTrue) @@ -359,7 +360,7 @@ func (s *testLockSuite) TestCheckTxnStatusNoBlockRead(c *C) { TTL: 100000, // Not zero. } // Call getTxnStatus with non-block reading flag, this time with retry to cover more code. - status, err := resolver.getTxnStatusFromLock(bo, lock, currentTS, true) + status, err := resolver.getTxnStatusFromLock(bo, lock, currentTS) c.Assert(err, IsNil) c.Assert(status.ttl, Greater, uint64(0)) c.Assert(<-errCh, IsNil) diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index ffd3a450449fe..9423a33781733 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -244,7 +244,7 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll locks = append(locks, lock) } if len(lockedKeys) > 0 { - msBeforeExpired, err := s.store.lockResolver.ResolveLocks(bo, s.version.Ver, locks, false) + msBeforeExpired, err := s.store.lockResolver.ResolveLocks(bo, s.version.Ver, locks) if err != nil { return errors.Trace(err) } @@ -332,7 +332,7 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) { if err != nil { return nil, errors.Trace(err) } - msBeforeExpired, err := s.store.lockResolver.ResolveLocks(bo, s.version.Ver, []*Lock{lock}, false) + msBeforeExpired, err := s.store.lockResolver.ResolveLocks(bo, s.version.Ver, []*Lock{lock}) if err != nil { return nil, errors.Trace(err) } From 8106b330848bbe46bdb8dfe0a2fc32ceda025f64 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 5 Nov 2019 19:41:44 +0800 Subject: [PATCH 04/14] address comment --- store/tikv/2pc_test.go | 2 +- store/tikv/lock_resolver.go | 15 +++++++------- store/tikv/lock_test.go | 40 +++++++++++++++++++++++++++---------- 3 files changed, 39 insertions(+), 18 deletions(-) diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index 643f74bac6d77..29c46939267d0 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -597,7 +597,7 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) { lr := newLockResolver(s.store) bo := NewBackoffer(context.Background(), getMaxBackoff) - status, err := lr.getTxnStatus(bo, txn.startTS, key2, 0, txn.startTS) + status, err := lr.getTxnStatus(bo, txn.startTS, key2, 0, txn.startTS, true) c.Assert(err, IsNil) c.Assert(status.ttl, GreaterEqual, lockInfo.LockTtl) diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index fab38be43a866..c9389a92f00e2 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -204,7 +204,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi tikvLockResolverCountWithExpired.Inc() // Use currentTS = math.MaxUint64 means rollback the txn, no matter the lock is expired or not! - status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64) + status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true) if err != nil { return false, err } @@ -358,7 +358,7 @@ func (lr *LockResolver) GetTxnStatus(txnID uint64, callerStartTS uint64, primary if err != nil { return status, err } - return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS) + return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS, true) } func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStartTS uint64) (TxnStatus, error) { @@ -378,8 +378,9 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStart } } + rollbackIfNotExist := false for { - status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS) + status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS, rollbackIfNotExist) if err == nil { return status, err } @@ -389,10 +390,10 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStart // Handle txnNotFound error. // If the (secondary) lock TTL has expired, return rollbacked status. + time.Sleep(5 * time.Millisecond) if lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL) <= 0 { - return status, nil + rollbackIfNotExist = true } - time.Sleep(5 * time.Millisecond) } } @@ -405,7 +406,7 @@ func (e txnNotFoundErr) Error() string { } // If nonBlockRead is true, the caller should handle the txnNotFoundErr. -func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS, currentTS uint64) (TxnStatus, error) { +func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS, currentTS uint64, rollbackIfNotExist bool) (TxnStatus, error) { if s, ok := lr.getResolved(txnID); ok { return s, nil } @@ -427,7 +428,7 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte LockTs: txnID, CallerStartTs: callerStartTS, CurrentTs: currentTS, - RollbackIfNotExist: false, + RollbackIfNotExist: rollbackIfNotExist, }) for { loc, err := lr.store.GetRegionCache().LocateKey(bo, primary) diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go index ce9b97a70f454..b1a5f71159387 100644 --- a/store/tikv/lock_test.go +++ b/store/tikv/lock_test.go @@ -208,7 +208,12 @@ func (s *testLockSuite) TestCheckTxnStatusTTL(c *C) { txn, err := s.store.Begin() c.Assert(err, IsNil) txn.Set(kv.Key("key"), []byte("value")) - s.prewriteTxn(c, txn.(*tikvTxn)) + committer, err := newTwoPhaseCommitterWithInit(txn.(*tikvTxn), 0) + c.Assert(err, IsNil) + // Increase lock TTL to make CI more stable. + committer.lockTTL = txnLockTTL(txn.(*tikvTxn).startTime, 200*1024*1024) + err = committer.prewriteKeys(NewBackoffer(context.Background(), PrewriteMaxBackoff), committer.keys) + c.Assert(err, IsNil) bo := NewBackoffer(context.Background(), PrewriteMaxBackoff) lr := newLockResolver(s.store) @@ -287,7 +292,7 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) { bo := NewBackoffer(context.Background(), PrewriteMaxBackoff) resolver := newLockResolver(s.store) // Call getTxnStatus to check the lock status. - status, err := resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS) + status, err := resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, true) c.Assert(err, IsNil) c.Assert(status.IsCommitted(), IsFalse) c.Assert(status.ttl, Greater, uint64(0)) @@ -308,20 +313,20 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) { // Then call getTxnStatus again and check the lock status. currentTS, err = oracle.GetTimestamp(context.Background()) c.Assert(err, IsNil) - status, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS) + status, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, true) c.Assert(err, IsNil) c.Assert(status.ttl, Equals, uint64(0)) c.Assert(status.commitTS, Equals, uint64(0)) // Call getTxnStatus on a committed transaction. startTS, commitTS := s.putKV(c, []byte("a"), []byte("a")) - status, err = newLockResolver(s.store).getTxnStatus(bo, startTS, []byte("a"), currentTS, currentTS) + status, err = newLockResolver(s.store).getTxnStatus(bo, startTS, []byte("a"), currentTS, currentTS, true) c.Assert(err, IsNil) c.Assert(status.ttl, Equals, uint64(0)) c.Assert(status.commitTS, Equals, commitTS) } -func (s *testLockSuite) TestCheckTxnStatusNoBlockRead(c *C) { +func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) { txn, err := s.store.Begin() c.Assert(err, IsNil) txn.Set(kv.Key("key"), []byte("value")) @@ -342,28 +347,43 @@ func (s *testLockSuite) TestCheckTxnStatusNoBlockRead(c *C) { bo := NewBackoffer(context.Background(), PrewriteMaxBackoff) resolver := newLockResolver(s.store) - // Call getTxnStatus with non-block reading flag. - _, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS) + // Call getTxnStatus for the TxnNotFound case. + _, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, false) c.Assert(err, NotNil) _, ok := errors.Cause(err).(txnNotFoundErr) c.Assert(ok, IsTrue) errCh := make(chan error) go func() { - errCh <- committer.prewriteKeys(NewBackoffer(context.Background(), PrewriteMaxBackoff), [][]byte{[]byte("key")}) + errCh <- committer.prewriteKeys(bo, [][]byte{[]byte("key")}) }() lock := &Lock{ Key: []byte("second"), Primary: []byte("key"), TxnID: txn.StartTS(), - TTL: 100000, // Not zero. + TTL: 100000, } - // Call getTxnStatus with non-block reading flag, this time with retry to cover more code. + // Call getTxnStatusFromLock to cover the retry logic. status, err := resolver.getTxnStatusFromLock(bo, lock, currentTS) c.Assert(err, IsNil) c.Assert(status.ttl, Greater, uint64(0)) c.Assert(<-errCh, IsNil) + c.Assert(committer.cleanupKeys(bo, committer.keys), IsNil) + + // Call getTxnStatusFromLock to cover TxnNotFound and retry timeout. + startTS, err := oracle.GetTimestamp(context.Background()) + c.Assert(err, IsNil) + lock = &Lock{ + Key: []byte("second"), + Primary: []byte("key"), + TxnID: startTS, + TTL: 1000, + } + status, err = resolver.getTxnStatusFromLock(bo, lock, currentTS) + c.Assert(err, IsNil) + c.Assert(status.ttl, Equals, uint64(0)) + c.Assert(status.commitTS, Equals, uint64(0)) } func (s *testLockSuite) prewriteTxn(c *C, txn *tikvTxn) { From 386ca77430df49586f90e5d8b14669f5e5235f2c Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 5 Nov 2019 19:58:04 +0800 Subject: [PATCH 05/14] write the rollback key in mocktikv --- store/mockstore/mocktikv/mvcc_leveldb.go | 20 +++++++++++++------- store/tikv/lock_resolver.go | 1 - 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index b53dbbf553dd9..7f8349dd57b02 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -882,7 +882,7 @@ func rollbackKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, startTS uint6 } // If current transaction's lock exist. if ok && dec.lock.startTS == startTS { - if err = rollbackLock(batch, dec.lock, key, startTS); err != nil { + if err = rollbackLock(batch, key, startTS); err != nil { return errors.Trace(err) } return nil @@ -919,7 +919,7 @@ func rollbackKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, startTS uint6 return nil } -func rollbackLock(batch *leveldb.Batch, lock mvccLock, key []byte, startTS uint64) error { +func rollbackLock(batch *leveldb.Batch, key []byte, startTS uint64) error { tomb := mvccValue{ valueType: typeRollback, startTS: startTS, @@ -980,7 +980,7 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error { if ok && dec.lock.startTS == startTS { // If the lock has already outdated, clean up it. if currentTS == 0 || uint64(oracle.ExtractPhysical(dec.lock.startTS))+dec.lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) { - if err = rollbackLock(batch, dec.lock, key, startTS); err != nil { + if err = rollbackLock(batch, key, startTS); err != nil { return err } return mvcc.db.Write(batch, nil) @@ -1057,7 +1057,7 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS // If the lock has already outdated, clean up it. if uint64(oracle.ExtractPhysical(lock.startTS))+lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) { - if err = rollbackLock(batch, lock, primaryKey, lockTS); err != nil { + if err = rollbackLock(batch, primaryKey, lockTS); err != nil { return 0, 0, errors.Trace(err) } if err = mvcc.db.Write(batch, nil); err != nil { @@ -1119,7 +1119,13 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS // written before the primary lock. if rollbackIfNotExist { - // ttl, commitTS = 0, 0 means rollback + batch := &leveldb.Batch{} + if err := rollbackLock(batch, primaryKey, lockTS); err != nil { + return 0, 0, errors.Trace(err) + } + if err := mvcc.db.Write(batch, nil); err != nil { + return 0, 0, errors.Trace(err) + } return 0, 0, nil } @@ -1232,7 +1238,7 @@ func (mvcc *MVCCLevelDB) ResolveLock(startKey, endKey []byte, startTS, commitTS if commitTS > 0 { err = commitLock(batch, dec.lock, currKey, startTS, commitTS) } else { - err = rollbackLock(batch, dec.lock, currKey, startTS) + err = rollbackLock(batch, currKey, startTS) } if err != nil { return errors.Trace(err) @@ -1272,7 +1278,7 @@ func (mvcc *MVCCLevelDB) BatchResolveLock(startKey, endKey []byte, txnInfos map[ if commitTS > 0 { err = commitLock(batch, dec.lock, currKey, dec.lock.startTS, commitTS) } else { - err = rollbackLock(batch, dec.lock, currKey, dec.lock.startTS) + err = rollbackLock(batch, currKey, dec.lock.startTS) } if err != nil { return errors.Trace(err) diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index c9389a92f00e2..dd8ff08eeb3a7 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -389,7 +389,6 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStart } // Handle txnNotFound error. - // If the (secondary) lock TTL has expired, return rollbacked status. time.Sleep(5 * time.Millisecond) if lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL) <= 0 { rollbackIfNotExist = true From bba8d048675c52a2786318a1980c3762cec59f71 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 5 Nov 2019 20:09:50 +0800 Subject: [PATCH 06/14] tiny clean up --- store/mockstore/mocktikv/errors.go | 2 +- store/tikv/lock_resolver.go | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/store/mockstore/mocktikv/errors.go b/store/mockstore/mocktikv/errors.go index effe1871aac79..82643730f0df3 100644 --- a/store/mockstore/mocktikv/errors.go +++ b/store/mockstore/mocktikv/errors.go @@ -99,7 +99,7 @@ func (e *ErrCommitTSExpired) Error() string { return "commit ts expired" } -// ErrTxnNotFound is returned when the lock is not found. +// ErrTxnNotFound is returned when the primary lock of the txn is not found. type ErrTxnNotFound struct { kvrpcpb.TxnNotFound } diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index dd8ff08eeb3a7..5b4440ae0529d 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -404,7 +404,8 @@ func (e txnNotFoundErr) Error() string { return e.TxnNotFound.String() } -// If nonBlockRead is true, the caller should handle the txnNotFoundErr. +// getTxnStatus sends the CheckTxnStatus request to the TiKV server. +// When rollbackIfNotExist is false, the caller should be careful with the txnNotFoundErr error. func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS, currentTS uint64, rollbackIfNotExist bool) (TxnStatus, error) { if s, ok := lr.getResolved(txnID); ok { return s, nil @@ -412,7 +413,7 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte tikvLockResolverCountWithQueryTxnStatus.Inc() - // CheckTxnStatus would meet the following cases: + // CheckTxnStatus may meet the following cases: // 1. LOCK // 1.1 Lock expired -- orphan lock, fail to update TTL, crash recovery etc. // 1.2 Lock TTL -- active transaction holding the lock. From fbc600f1298e6d6a36e35b45b85d8e1211bce1eb Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 5 Nov 2019 21:08:32 +0800 Subject: [PATCH 07/14] address comment --- store/tikv/lock_resolver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 5b4440ae0529d..3c18218cdbba4 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -358,7 +358,7 @@ func (lr *LockResolver) GetTxnStatus(txnID uint64, callerStartTS uint64, primary if err != nil { return status, err } - return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS, true) + return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS, false) } func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStartTS uint64) (TxnStatus, error) { From 5fb661a76f44d4ffe765d35ca9d746f97b349b09 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 5 Nov 2019 22:52:52 +0800 Subject: [PATCH 08/14] make CI stable --- session/session_test.go | 3 +++ store/tikv/2pc.go | 6 ++++++ store/tikv/backoff.go | 7 ++++++- store/tikv/lock_resolver.go | 8 +++++--- 4 files changed, 20 insertions(+), 4 deletions(-) diff --git a/session/session_test.go b/session/session_test.go index 2f41d9a0bf69e..569297cb7058f 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -2443,6 +2443,8 @@ func (s *testSessionSuite) TestKVVars(c *C) { } wg.Done() }() + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockSleepBetween2PC", "return"), IsNil) go func() { for { tk.MustExec("update kvvars set b = b + 1 where a = 1") @@ -2453,6 +2455,7 @@ func (s *testSessionSuite) TestKVVars(c *C) { wg.Done() }() wg.Wait() + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/mockSleepBetween2PC"), IsNil) for { tk2.MustQuery("select * from kvvars") if atomic.LoadInt32(backOffWeightVal) != 0 { diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 9391ade5b5c2c..c055ef0a8f885 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -24,6 +24,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/kv" @@ -1075,6 +1076,11 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error { return err } + failpoint.Inject("mockSleepBetween2PC", func() error { + time.Sleep(100 * time.Millisecond) + return nil + }) + start = time.Now() commitBo := NewBackoffer(ctx, CommitMaxBackoff).WithVars(c.txn.vars) err = c.commitKeys(commitBo, c.keys) diff --git a/store/tikv/backoff.go b/store/tikv/backoff.go index 06def2a31ffce..25d1ea8645e7b 100644 --- a/store/tikv/backoff.go +++ b/store/tikv/backoff.go @@ -142,6 +142,7 @@ const ( BoRegionMiss BoUpdateLeader boServerBusy + boTxnNotFound ) func (t backoffType) createFn(vars *kv.Variables) func(context.Context, int) int { @@ -160,6 +161,8 @@ func (t backoffType) createFn(vars *kv.Variables) func(context.Context, int) int case BoRegionMiss: // change base time to 2ms, because it may recover soon. return NewBackoffFn(2, 500, NoJitter) + case boTxnNotFound: + return NewBackoffFn(2, 500, NoJitter) case BoUpdateLeader: return NewBackoffFn(1, 10, NoJitter) case boServerBusy: @@ -184,6 +187,8 @@ func (t backoffType) String() string { return "updateLeader" case boServerBusy: return "serverBusy" + case boTxnNotFound: + return "txnNotFound" } return "" } @@ -192,7 +197,7 @@ func (t backoffType) TError() error { switch t { case boTiKVRPC: return ErrTiKVServerTimeout - case BoTxnLock, boTxnLockFast: + case BoTxnLock, boTxnLockFast, boTxnNotFound: return ErrResolveLockTimeout case BoPDRPC: return ErrPDServerTimeout diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 3c18218cdbba4..553be55b63809 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -382,14 +382,16 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStart for { status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS, rollbackIfNotExist) if err == nil { - return status, err + return status, nil } if _, ok := errors.Cause(err).(txnNotFoundErr); !ok { - return status, err + return TxnStatus{}, err } // Handle txnNotFound error. - time.Sleep(5 * time.Millisecond) + if err := bo.Backoff(boTxnNotFound, err); err != nil { + logutil.BgLogger().Warn("getTxnStatusFromLock backoff fail", zap.Error(err)) + } if lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL) <= 0 { rollbackIfNotExist = true } From bbd42e3fd1329b756c027db5b7e843d5711d3b69 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 5 Nov 2019 22:55:03 +0800 Subject: [PATCH 09/14] address comment --- store/tikv/lock_resolver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 553be55b63809..4a67deb614df0 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -358,7 +358,7 @@ func (lr *LockResolver) GetTxnStatus(txnID uint64, callerStartTS uint64, primary if err != nil { return status, err } - return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS, false) + return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS, true) } func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStartTS uint64) (TxnStatus, error) { From e77e17b4dcf04e0ae479f6dd687b51245270bd56 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 5 Nov 2019 23:08:03 +0800 Subject: [PATCH 10/14] fix a data race --- store/tikv/lock_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go index b1a5f71159387..d6ed1e44ed0d2 100644 --- a/store/tikv/lock_test.go +++ b/store/tikv/lock_test.go @@ -364,6 +364,7 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) { TxnID: txn.StartTS(), TTL: 100000, } + bo = NewBackoffer(context.Background(), PrewriteMaxBackoff) // Call getTxnStatusFromLock to cover the retry logic. status, err := resolver.getTxnStatusFromLock(bo, lock, currentTS) c.Assert(err, IsNil) From 64721b2bd5f0c665c1ba4407ec7112e4d1859337 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 6 Nov 2019 00:11:59 +0800 Subject: [PATCH 11/14] fix data race --- store/tikv/lock_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go index d6ed1e44ed0d2..41d06b5316f9c 100644 --- a/store/tikv/lock_test.go +++ b/store/tikv/lock_test.go @@ -355,7 +355,7 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) { errCh := make(chan error) go func() { - errCh <- committer.prewriteKeys(bo, [][]byte{[]byte("key")}) + errCh <- committer.prewriteKeys(NewBackoffer(context.Background(), PrewriteMaxBackoff), [][]byte{[]byte("key")}) }() lock := &Lock{ @@ -364,7 +364,6 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) { TxnID: txn.StartTS(), TTL: 100000, } - bo = NewBackoffer(context.Background(), PrewriteMaxBackoff) // Call getTxnStatusFromLock to cover the retry logic. status, err := resolver.getTxnStatusFromLock(bo, lock, currentTS) c.Assert(err, IsNil) From d4e192a679b299f813386dfc46c7e9e216fe99e3 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 6 Nov 2019 17:02:48 +0800 Subject: [PATCH 12/14] address comment --- store/mockstore/mocktikv/mock_tikv_test.go | 23 ++++++++++++++++++++++ store/tikv/lock_resolver.go | 5 +++++ 2 files changed, 28 insertions(+) diff --git a/store/mockstore/mocktikv/mock_tikv_test.go b/store/mockstore/mocktikv/mock_tikv_test.go index 0004a6f372b45..fa07665438a6a 100644 --- a/store/mockstore/mocktikv/mock_tikv_test.go +++ b/store/mockstore/mocktikv/mock_tikv_test.go @@ -682,6 +682,29 @@ func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) { c.Assert(err, IsNil) c.Assert(ttl, Equals, uint64(0)) c.Assert(commitTS, Equals, uint64(0)) + + // Cover the TxnNotFound case. + _, _, err = s.store.CheckTxnStatus([]byte("txnNotFound"), 5, 0, 666, false) + c.Assert(err, NotNil) + notFound, ok := errors.Cause(err).(*ErrTxnNotFound) + c.Assert(ok, IsTrue) + c.Assert(notFound.StartTs, Equals, uint64(5)) + c.Assert(string(notFound.PrimaryKey), Equals, "txnNotFound") + + ttl, commitTS, err = s.store.CheckTxnStatus([]byte("txnNotFound"), 5, 0, 666, true) + c.Assert(err, IsNil) + c.Assert(ttl, Equals, uint64(0)) + c.Assert(commitTS, Equals, uint64(0)) + + // Check the rollback tombstone blocks this prewrite which comes with a smaller startTS. + req := &kvrpcpb.PrewriteRequest{ + Mutations: putMutations("txnNotFound", "val"), + PrimaryLock: []byte("txnNotFound"), + StartVersion: 4, + MinCommitTs: 6, + } + errs := s.store.Prewrite(req) + c.Assert(errs, NotNil) } func (s *testMVCCLevelDB) TestRejectCommitTS(c *C) { diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 4a67deb614df0..3c51525eb7c48 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -384,11 +384,16 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStart if err == nil { return status, nil } + // If the error is something other than txnNotFoundErr, throw the error (network + // unavailable, tikv down, backoff timeout etc) to the caller. if _, ok := errors.Cause(err).(txnNotFoundErr); !ok { return TxnStatus{}, err } // Handle txnNotFound error. + // getTxnStatus() returns it when the secondary locks exist while the primary lock doesn't. + // This is likely to happen in the concurrently prewrite when secondary regions + // success before the primary region. if err := bo.Backoff(boTxnNotFound, err); err != nil { logutil.BgLogger().Warn("getTxnStatusFromLock backoff fail", zap.Error(err)) } From 0f3ae0f28b6ff87d7f2fd169cc95820ae3f1d8eb Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 7 Nov 2019 15:47:40 +0800 Subject: [PATCH 13/14] make CI stable --- store/tikv/lock_resolver.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 3c51525eb7c48..fd91923b9c8f5 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -390,6 +390,10 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStart return TxnStatus{}, err } + if l.LockType == kvrpcpb.Op_PessimisticLock { + return TxnStatus{l.TTL, 0}, nil + } + // Handle txnNotFound error. // getTxnStatus() returns it when the secondary locks exist while the primary lock doesn't. // This is likely to happen in the concurrently prewrite when secondary regions @@ -397,6 +401,7 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStart if err := bo.Backoff(boTxnNotFound, err); err != nil { logutil.BgLogger().Warn("getTxnStatusFromLock backoff fail", zap.Error(err)) } + if lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL) <= 0 { rollbackIfNotExist = true } From 1aa8f9bf79a81dae890d45b63e4c31f4f3d81ae4 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 7 Nov 2019 19:02:57 +0800 Subject: [PATCH 14/14] try to make CI stable --- executor/analyze_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/executor/analyze_test.go b/executor/analyze_test.go index d9da61bc15b81..fcbf738b8be0b 100644 --- a/executor/analyze_test.go +++ b/executor/analyze_test.go @@ -455,24 +455,24 @@ func (s *testFastAnalyze) TestFastAnalyzeRetryRowCount(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.MustExec("drop table if exists t") - tk.MustExec("create table t(a int primary key)") + tk.MustExec("create table retry_row_count(a int primary key)") c.Assert(s.dom.StatsHandle().Update(s.dom.InfoSchema()), IsNil) tk.MustExec("set @@session.tidb_enable_fast_analyze=1") tk.MustExec("set @@session.tidb_build_stats_concurrency=1") - tblInfo, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + tblInfo, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("retry_row_count")) c.Assert(err, IsNil) tid := tblInfo.Meta().ID // construct 6 regions split by {6, 12, 18, 24, 30} splitKeys := generateTableSplitKeyForInt(tid, []int{6, 12, 18, 24, 30}) regionIDs := manipulateCluster(s.cluster, splitKeys) for i := 0; i < 30; i++ { - tk.MustExec(fmt.Sprintf("insert into t values (%d)", i)) + tk.MustExec(fmt.Sprintf("insert into retry_row_count values (%d)", i)) } s.cli.setFailRegion(regionIDs[4]) - tk.MustExec("analyze table t") + tk.MustExec("analyze table retry_row_count") // 4 regions will be sampled, and it will retry the last failed region. c.Assert(s.cli.mu.count, Equals, int64(5)) - row := tk.MustQuery(`show stats_meta where db_name = "test" and table_name = "t"`).Rows()[0] + row := tk.MustQuery(`show stats_meta where db_name = "test" and table_name = "retry_row_count"`).Rows()[0] c.Assert(row[5], Equals, "30") }