diff --git a/executor/analyze_test.go b/executor/analyze_test.go
index d9da61bc15b81..fcbf738b8be0b 100644
--- a/executor/analyze_test.go
+++ b/executor/analyze_test.go
@@ -455,24 +455,24 @@ func (s *testFastAnalyze) TestFastAnalyzeRetryRowCount(c *C) {
 	tk := testkit.NewTestKit(c, s.store)
 	tk.MustExec("use test")
 	tk.MustExec("drop table if exists t")
-	tk.MustExec("create table t(a int primary key)")
+	tk.MustExec("create table retry_row_count(a int primary key)")
 	c.Assert(s.dom.StatsHandle().Update(s.dom.InfoSchema()), IsNil)
 	tk.MustExec("set @@session.tidb_enable_fast_analyze=1")
 	tk.MustExec("set @@session.tidb_build_stats_concurrency=1")
-	tblInfo, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
+	tblInfo, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("retry_row_count"))
 	c.Assert(err, IsNil)
 	tid := tblInfo.Meta().ID
 
 	// construct 6 regions split by {6, 12, 18, 24, 30}
 	splitKeys := generateTableSplitKeyForInt(tid, []int{6, 12, 18, 24, 30})
 	regionIDs := manipulateCluster(s.cluster, splitKeys)
 	for i := 0; i < 30; i++ {
-		tk.MustExec(fmt.Sprintf("insert into t values (%d)", i))
+		tk.MustExec(fmt.Sprintf("insert into retry_row_count values (%d)", i))
 	}
 	s.cli.setFailRegion(regionIDs[4])
-	tk.MustExec("analyze table t")
+	tk.MustExec("analyze table retry_row_count")
 	// 4 regions will be sampled, and it will retry the last failed region.
 	c.Assert(s.cli.mu.count, Equals, int64(5))
-	row := tk.MustQuery(`show stats_meta where db_name = "test" and table_name = "t"`).Rows()[0]
+	row := tk.MustQuery(`show stats_meta where db_name = "test" and table_name = "retry_row_count"`).Rows()[0]
 	c.Assert(row[5], Equals, "30")
 }
diff --git a/go.mod b/go.mod
index c9c38b5dabe5a..e4fa4f3eb7461 100644
--- a/go.mod
+++ b/go.mod
@@ -37,7 +37,7 @@ require (
 	github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
 	github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d
 	github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
-	github.com/pingcap/kvproto v0.0.0-20191101062931-76b56d6eb466
+	github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1
 	github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9
 	github.com/pingcap/parser v0.0.0-20191031081038-bfb0c3adf567
 	github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0
diff --git a/go.sum b/go.sum
index 186e5a1341048..0e5e1e7dfb0e3 100644
--- a/go.sum
+++ b/go.sum
@@ -158,8 +158,8 @@ github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d/go.mod h1:fMRU1BA1y+r89
 github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
 github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
 github.com/pingcap/kvproto v0.0.0-20190822090350-11ea838aedf7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
-github.com/pingcap/kvproto v0.0.0-20191101062931-76b56d6eb466 h1:C5nV9osqA+R/R2fxYxVfqAUlCi3Oo5yJ/JSKDeHSAOk=
-github.com/pingcap/kvproto v0.0.0-20191101062931-76b56d6eb466/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
+github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1 h1:J5oimSv+0emw5e/D1ZX/zh2WcMv0pOVT9QKruXfvJbg=
+github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
 github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
 github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
 github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
diff --git a/session/session_test.go b/session/session_test.go
index 9e9cf72beb36b..22c15b77cc35d 100644
--- a/session/session_test.go
+++ b/session/session_test.go
@@ -2443,6 +2443,8 @@ func (s *testSessionSuite) TestKVVars(c *C) {
 		}
 		wg.Done()
 	}()
+
+	c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockSleepBetween2PC", "return"), IsNil)
 	go func() {
 		for {
 			tk.MustExec("update kvvars set b = b + 1 where a = 1")
@@ -2453,6 +2455,7 @@ func (s *testSessionSuite) TestKVVars(c *C) {
 		wg.Done()
 	}()
 	wg.Wait()
+	c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/mockSleepBetween2PC"), IsNil)
 	for {
 		tk2.MustQuery("select * from kvvars")
 		if atomic.LoadInt32(backOffWeightVal) != 0 {
diff --git a/store/mockstore/mocktikv/errors.go b/store/mockstore/mocktikv/errors.go
index ea26645d04a84..82643730f0df3 100644
--- a/store/mockstore/mocktikv/errors.go
+++ b/store/mockstore/mocktikv/errors.go
@@ -98,3 +98,12 @@ type ErrCommitTSExpired struct {
 func (e *ErrCommitTSExpired) Error() string {
 	return "commit ts expired"
 }
+
+// ErrTxnNotFound is returned when the primary lock of the txn is not found.
+type ErrTxnNotFound struct {
+	kvrpcpb.TxnNotFound
+}
+
+func (e *ErrTxnNotFound) Error() string {
+	return "txn not found"
+}
diff --git a/store/mockstore/mocktikv/mock_tikv_test.go b/store/mockstore/mocktikv/mock_tikv_test.go
index 1e56befb67606..fa07665438a6a 100644
--- a/store/mockstore/mocktikv/mock_tikv_test.go
+++ b/store/mockstore/mocktikv/mock_tikv_test.go
@@ -663,14 +663,14 @@ func (s *testMVCCLevelDB) TestErrors(c *C) {
 
 func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) {
 	s.mustPrewriteWithTTLOK(c, putMutations("pk", "val"), "pk", 5, 666)
 
-	ttl, commitTS, err := s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666)
+	ttl, commitTS, err := s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666, false)
 	c.Assert(err, IsNil)
 	c.Assert(ttl, Equals, uint64(666))
 	c.Assert(commitTS, Equals, uint64(0))
 
 	s.mustCommitOK(c, [][]byte{[]byte("pk")}, 5, 30)
-	ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666)
+	ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666, false)
 	c.Assert(err, IsNil)
 	c.Assert(ttl, Equals, uint64(0))
 	c.Assert(commitTS, Equals, uint64(30))
@@ -678,16 +678,39 @@ func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) {
 
 	s.mustPrewriteWithTTLOK(c, putMutations("pk1", "val"), "pk1", 5, 666)
 	s.mustRollbackOK(c, [][]byte{[]byte("pk1")}, 5)
-	ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk1"), 5, 0, 666)
+	ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk1"), 5, 0, 666, false)
 	c.Assert(err, IsNil)
 	c.Assert(ttl, Equals, uint64(0))
 	c.Assert(commitTS, Equals, uint64(0))
+
+	// Cover the TxnNotFound case.
+	_, _, err = s.store.CheckTxnStatus([]byte("txnNotFound"), 5, 0, 666, false)
+	c.Assert(err, NotNil)
+	notFound, ok := errors.Cause(err).(*ErrTxnNotFound)
+	c.Assert(ok, IsTrue)
+	c.Assert(notFound.StartTs, Equals, uint64(5))
+	c.Assert(string(notFound.PrimaryKey), Equals, "txnNotFound")
+
+	ttl, commitTS, err = s.store.CheckTxnStatus([]byte("txnNotFound"), 5, 0, 666, true)
+	c.Assert(err, IsNil)
+	c.Assert(ttl, Equals, uint64(0))
+	c.Assert(commitTS, Equals, uint64(0))
+
+	// Check that the rollback tombstone blocks this prewrite, which comes with a smaller startTS.
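+	// The rollbackIfNotExist call above wrote a rollback record at ts 5 for this
+	// key, so a prewrite at startVersion 4 sees a newer write record and should be
+	// rejected as a write conflict.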
+	req := &kvrpcpb.PrewriteRequest{
+		Mutations:    putMutations("txnNotFound", "val"),
+		PrimaryLock:  []byte("txnNotFound"),
+		StartVersion: 4,
+		MinCommitTs:  6,
+	}
+	errs := s.store.Prewrite(req)
+	c.Assert(errs, NotNil)
 }
 
 func (s *testMVCCLevelDB) TestRejectCommitTS(c *C) {
 	s.mustPrewriteOK(c, putMutations("x", "A"), "x", 5)
 	// Push the minCommitTS
-	_, _, err := s.store.CheckTxnStatus([]byte("x"), 5, 100, 100)
+	_, _, err := s.store.CheckTxnStatus([]byte("x"), 5, 100, 100, false)
 	c.Assert(err, IsNil)
 	err = s.store.Commit([][]byte{[]byte("x")}, 5, 10)
 	e, ok := errors.Cause(err).(*ErrCommitTSExpired)
diff --git a/store/mockstore/mocktikv/mvcc.go b/store/mockstore/mocktikv/mvcc.go
index ffed3c0b22e14..d5067adbdc1dd 100644
--- a/store/mockstore/mocktikv/mvcc.go
+++ b/store/mockstore/mocktikv/mvcc.go
@@ -268,7 +268,7 @@ type MVCCStore interface {
 	BatchResolveLock(startKey, endKey []byte, txnInfos map[uint64]uint64) error
 	GC(startKey, endKey []byte, safePoint uint64) error
 	DeleteRange(startKey, endKey []byte) error
-	CheckTxnStatus(primaryKey []byte, lockTS uint64, startTS, currentTS uint64) (ttl, commitTS uint64, err error)
+	CheckTxnStatus(primaryKey []byte, lockTS uint64, startTS, currentTS uint64, rollbackIfNotExist bool) (ttl, commitTS uint64, err error)
 	Close() error
 }
diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go
index 74c67228fb287..7f8349dd57b02 100644
--- a/store/mockstore/mocktikv/mvcc_leveldb.go
+++ b/store/mockstore/mocktikv/mvcc_leveldb.go
@@ -882,7 +882,7 @@ func rollbackKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, startTS uint6
 	}
 	// If current transaction's lock exist.
 	if ok && dec.lock.startTS == startTS {
-		if err = rollbackLock(batch, dec.lock, key, startTS); err != nil {
+		if err = rollbackLock(batch, key, startTS); err != nil {
 			return errors.Trace(err)
 		}
 		return nil
@@ -919,7 +919,7 @@ func rollbackKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, startTS uint6
 	return nil
 }
 
-func rollbackLock(batch *leveldb.Batch, lock mvccLock, key []byte, startTS uint64) error {
+func rollbackLock(batch *leveldb.Batch, key []byte, startTS uint64) error {
 	tomb := mvccValue{
 		valueType: typeRollback,
 		startTS:   startTS,
@@ -980,7 +980,7 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error {
 	if ok && dec.lock.startTS == startTS {
 		// If the lock has already outdated, clean up it.
 		if currentTS == 0 || uint64(oracle.ExtractPhysical(dec.lock.startTS))+dec.lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) {
-			if err = rollbackLock(batch, dec.lock, key, startTS); err != nil {
+			if err = rollbackLock(batch, key, startTS); err != nil {
 				return err
 			}
 			return mvcc.db.Write(batch, nil)
@@ -1032,7 +1032,7 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error {
 // primaryKey + lockTS together could locate the primary lock.
 // callerStartTS is the start ts of reader transaction.
 // currentTS is the current ts, but it may be inaccurate. Just use it to check TTL.
-func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS, currentTS uint64) (uint64, uint64, error) {
+func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS, currentTS uint64, rollbackIfNotExist bool) (uint64, uint64, error) {
 	mvcc.mu.Lock()
 	defer mvcc.mu.Unlock()
@@ -1057,7 +1057,7 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS
 		// If the lock has already outdated, clean up it.
 		if uint64(oracle.ExtractPhysical(lock.startTS))+lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) {
-			if err = rollbackLock(batch, lock, primaryKey, lockTS); err != nil {
+			if err = rollbackLock(batch, primaryKey, lockTS); err != nil {
 				return 0, 0, errors.Trace(err)
 			}
 			if err = mvcc.db.Write(batch, nil); err != nil {
@@ -1112,9 +1112,27 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS
 	}
 
 	// If current transaction is not prewritted before, it may be pessimistic lock.
-	// When pessimistic lock rollback, it may not leave a 'rollbacked' tombstone.
-	logutil.BgLogger().Debug("CheckTxnStatus can't find the primary lock, pessimistic rollback?")
-	return 0, 0, nil
+	// When a pessimistic txn rolls back a statement, it may not leave a 'rollbacked' tombstone.
+
+	// It may also be caused by a concurrent prewrite operation.
+	// Especially in the non-blocking read case, the secondary lock is likely to be
+	// written before the primary lock.
+
+	if rollbackIfNotExist {
+		batch := &leveldb.Batch{}
+		if err := rollbackLock(batch, primaryKey, lockTS); err != nil {
+			return 0, 0, errors.Trace(err)
+		}
+		if err := mvcc.db.Write(batch, nil); err != nil {
+			return 0, 0, errors.Trace(err)
+		}
+		return 0, 0, nil
+	}
+
+	return 0, 0, &ErrTxnNotFound{kvrpcpb.TxnNotFound{
+		StartTs:    lockTS,
+		PrimaryKey: primaryKey,
+	}}
 }
 
 // TxnHeartBeat implements the MVCCStore interface.
@@ -1220,7 +1238,7 @@ func (mvcc *MVCCLevelDB) ResolveLock(startKey, endKey []byte, startTS, commitTS
 		if commitTS > 0 {
 			err = commitLock(batch, dec.lock, currKey, startTS, commitTS)
 		} else {
-			err = rollbackLock(batch, dec.lock, currKey, startTS)
+			err = rollbackLock(batch, currKey, startTS)
 		}
 		if err != nil {
 			return errors.Trace(err)
@@ -1260,7 +1278,7 @@ func (mvcc *MVCCLevelDB) BatchResolveLock(startKey, endKey []byte, txnInfos map[
 		if commitTS > 0 {
 			err = commitLock(batch, dec.lock, currKey, dec.lock.startTS, commitTS)
 		} else {
-			err = rollbackLock(batch, dec.lock, currKey, dec.lock.startTS)
+			err = rollbackLock(batch, currKey, dec.lock.startTS)
 		}
 		if err != nil {
 			return errors.Trace(err)
diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go
index ad2f485d1ef85..c87fb76b23cad 100644
--- a/store/mockstore/mocktikv/rpc.go
+++ b/store/mockstore/mocktikv/rpc.go
@@ -98,6 +98,11 @@ func convertToKeyError(err error) *kvrpcpb.KeyError {
 			CommitTsExpired: &expired.CommitTsExpired,
 		}
 	}
+	if tmp, ok := errors.Cause(err).(*ErrTxnNotFound); ok {
+		return &kvrpcpb.KeyError{
+			TxnNotFound: &tmp.TxnNotFound,
+		}
+	}
 	return &kvrpcpb.KeyError{
 		Abort: err.Error(),
 	}
@@ -382,7 +387,7 @@ func (h *rpcHandler) handleKvCheckTxnStatus(req *kvrpcpb.CheckTxnStatusRequest)
 		panic("KvCheckTxnStatus: key not in region")
 	}
 	var resp kvrpcpb.CheckTxnStatusResponse
-	ttl, commitTS, err := h.mvccStore.CheckTxnStatus(req.GetPrimaryKey(), req.GetLockTs(), req.GetCallerStartTs(), req.GetCurrentTs())
+	ttl, commitTS, err := h.mvccStore.CheckTxnStatus(req.GetPrimaryKey(), req.GetLockTs(), req.GetCallerStartTs(), req.GetCurrentTs(), req.GetRollbackIfNotExist())
 	if err != nil {
 		resp.Error = convertToKeyError(err)
 	} else {
diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go
index e623821139829..921d549ff2e0e 100644
--- a/store/tikv/2pc.go
+++ b/store/tikv/2pc.go
@@ -24,6 +24,7 @@ import (
 
 	"github.com/opentracing/opentracing-go"
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/parser/terror"
 	"github.com/pingcap/tidb/kv"
@@ -1098,6 +1099,11 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
 		return err
 	}
 
+	failpoint.Inject("mockSleepBetween2PC", func() error {
+		time.Sleep(100 * time.Millisecond)
+		return nil
+	})
+
 	start = time.Now()
 	commitBo := NewBackoffer(ctx, CommitMaxBackoff).WithVars(c.txn.vars)
 	err = c.commitKeys(commitBo, c.keys)
diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go
index 643f74bac6d77..29c46939267d0 100644
--- a/store/tikv/2pc_test.go
+++ b/store/tikv/2pc_test.go
@@ -597,7 +597,7 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) {
 
 	lr := newLockResolver(s.store)
 	bo := NewBackoffer(context.Background(), getMaxBackoff)
-	status, err := lr.getTxnStatus(bo, txn.startTS, key2, 0, txn.startTS)
+	status, err := lr.getTxnStatus(bo, txn.startTS, key2, 0, txn.startTS, true)
 	c.Assert(err, IsNil)
 	c.Assert(status.ttl, GreaterEqual, lockInfo.LockTtl)
 
diff --git a/store/tikv/backoff.go b/store/tikv/backoff.go
index 06def2a31ffce..25d1ea8645e7b 100644
--- a/store/tikv/backoff.go
+++ b/store/tikv/backoff.go
@@ -142,6 +142,7 @@ const (
 	BoRegionMiss
 	BoUpdateLeader
 	boServerBusy
+	boTxnNotFound
 )
 
 func (t backoffType) createFn(vars *kv.Variables) func(context.Context, int) int {
@@ -160,6 +161,8 @@ func (t backoffType) createFn(vars *kv.Variables) func(context.Context, int) int
 	case BoRegionMiss:
 		// change base time to 2ms, because it may recover soon.
 		return NewBackoffFn(2, 500, NoJitter)
+	case boTxnNotFound:
+		return NewBackoffFn(2, 500, NoJitter)
 	case BoUpdateLeader:
 		return NewBackoffFn(1, 10, NoJitter)
 	case boServerBusy:
@@ -184,6 +187,8 @@ func (t backoffType) String() string {
 		return "updateLeader"
 	case boServerBusy:
 		return "serverBusy"
+	case boTxnNotFound:
+		return "txnNotFound"
 	}
 	return ""
 }
@@ -192,7 +197,7 @@ func (t backoffType) TError() error {
 	switch t {
 	case boTiKVRPC:
 		return ErrTiKVServerTimeout
-	case BoTxnLock, boTxnLockFast:
+	case BoTxnLock, boTxnLockFast, boTxnNotFound:
 		return ErrResolveLockTimeout
 	case BoPDRPC:
 		return ErrPDServerTimeout
diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go
index 7262095a2ce83..fd91923b9c8f5 100644
--- a/store/tikv/lock_resolver.go
+++ b/store/tikv/lock_resolver.go
@@ -204,7 +204,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
 		tikvLockResolverCountWithExpired.Inc()
 
 		// Use currentTS = math.MaxUint64 means rollback the txn, no matter the lock is expired or not!
-		status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64)
+		status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true)
 		if err != nil {
 			return false, err
 		}
@@ -279,20 +279,10 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks
 	tikvLockResolverCountWithResolve.Inc()
 
-	var expiredLocks []*Lock
-	for _, l := range locks {
-		msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL)
-		if msBeforeLockExpired <= 0 {
-			expiredLocks = append(expiredLocks, l)
-		} else {
-			msBeforeTxnExpired.update(int64(l.TTL))
-			tikvLockResolverCountWithNotExpired.Inc()
-		}
-	}
 	// TxnID -> []Region, record resolved Regions.
 	// TODO: Maybe put it in LockResolver and share by all txns.
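+	// Note that the TTL pre-filter that used to build expiredLocks is gone: every
+	// lock is now handed to getTxnStatusFromLock, which reports still-alive
+	// transactions through the returned status instead of skipping them up front.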
 	cleanTxns := make(map[uint64]map[RegionVerID]struct{})
-	for _, l := range expiredLocks {
+	for _, l := range locks {
 		status, err := lr.getTxnStatusFromLock(bo, l, callerStartTS)
 		if err != nil {
 			msBeforeTxnExpired.update(0)
@@ -368,11 +358,13 @@ func (lr *LockResolver) GetTxnStatus(txnID uint64, callerStartTS uint64, primary
 	if err != nil {
 		return status, err
 	}
-	return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS)
+	return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS, true)
 }
 
 func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStartTS uint64) (TxnStatus, error) {
 	var currentTS uint64
+	var err error
+	var status TxnStatus
 	if l.TTL == 0 {
 		// NOTE: l.TTL = 0 is a special protocol!!!
 		// When the pessimistic txn prewrite meets locks of a txn, it should resolve the lock **unconditionally**.
@@ -380,28 +372,75 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStart
 		// Set currentTS to max uint64 to make the lock expired.
 		currentTS = math.MaxUint64
 	} else {
-		var err error
 		currentTS, err = lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx)
 		if err != nil {
 			return TxnStatus{}, err
 		}
 	}
-	return lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS)
+
+	rollbackIfNotExist := false
+	for {
+		status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS, rollbackIfNotExist)
+		if err == nil {
+			return status, nil
+		}
+		// If the error is something other than txnNotFoundErr, throw the error (network
+		// unavailable, tikv down, backoff timeout etc.) to the caller.
+		if _, ok := errors.Cause(err).(txnNotFoundErr); !ok {
+			return TxnStatus{}, err
+		}
+
+		if l.LockType == kvrpcpb.Op_PessimisticLock {
+			return TxnStatus{l.TTL, 0}, nil
+		}
+
+		// Handle the txnNotFound error.
+		// getTxnStatus() returns it when the secondary locks exist while the primary lock doesn't.
+		// This is likely to happen during concurrent prewrite, when the secondary regions
+		// succeed before the primary region.
+		if err := bo.Backoff(boTxnNotFound, err); err != nil {
+			logutil.BgLogger().Warn("getTxnStatusFromLock backoff fail", zap.Error(err))
+		}
+
+		if lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL) <= 0 {
+			rollbackIfNotExist = true
+		}
+	}
 }
 
-func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS, currentTS uint64) (TxnStatus, error) {
+type txnNotFoundErr struct {
+	*kvrpcpb.TxnNotFound
+}
+
+func (e txnNotFoundErr) Error() string {
+	return e.TxnNotFound.String()
+}
+
+// getTxnStatus sends the CheckTxnStatus request to the TiKV server.
+// When rollbackIfNotExist is false, the caller should be careful with the txnNotFoundErr it may return.
+func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS, currentTS uint64, rollbackIfNotExist bool) (TxnStatus, error) {
 	if s, ok := lr.getResolved(txnID); ok {
 		return s, nil
 	}
 
 	tikvLockResolverCountWithQueryTxnStatus.Inc()
 
+	// CheckTxnStatus may meet the following cases:
+	// 1. LOCK
+	// 1.1 Lock expired -- orphan lock, fail to update TTL, crash recovery etc.
+	// 1.2 Lock TTL -- active transaction holding the lock.
+	// 2. NO LOCK
+	// 2.1 Txn Committed
+	// 2.2 Txn Rolled back -- rolled back by itself, rolled back by others, GC tombstone etc.
+	// 2.3 No lock -- pessimistic lock rollback, concurrent prewrite.
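+	// For case 2.3 the outcome depends on rollbackIfNotExist: when it is true the
+	// store writes a rollback record and reports the txn as rolled back; when it is
+	// false the store returns a TxnNotFound error so that the caller (see
+	// getTxnStatusFromLock above) can back off and retry.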
+	var status TxnStatus
 	req := tikvrpc.NewRequest(tikvrpc.CmdCheckTxnStatus, &kvrpcpb.CheckTxnStatusRequest{
-		PrimaryKey:    primary,
-		LockTs:        txnID,
-		CallerStartTs: callerStartTS,
-		CurrentTs:     currentTS,
+		PrimaryKey:         primary,
+		LockTs:             txnID,
+		CallerStartTs:      callerStartTS,
+		CurrentTs:          currentTS,
+		RollbackIfNotExist: rollbackIfNotExist,
 	})
 	for {
 		loc, err := lr.store.GetRegionCache().LocateKey(bo, primary)
@@ -428,6 +467,11 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte
 	}
 	cmdResp := resp.Resp.(*kvrpcpb.CheckTxnStatusResponse)
 	if keyErr := cmdResp.GetError(); keyErr != nil {
+		txnNotFound := keyErr.GetTxnNotFound()
+		if txnNotFound != nil {
+			return status, txnNotFoundErr{txnNotFound}
+		}
+
 		err = errors.Errorf("unexpected err: %s, tid: %v", keyErr, txnID)
 		logutil.BgLogger().Error("getTxnStatus error", zap.Error(err))
 		return status, err
diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go
index 1baa091ffc557..41d06b5316f9c 100644
--- a/store/tikv/lock_test.go
+++ b/store/tikv/lock_test.go
@@ -21,6 +21,7 @@ import (
 	"time"
 
 	. "github.com/pingcap/check"
+	"github.com/pingcap/errors"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/store/tikv/tikvrpc"
@@ -207,7 +208,12 @@ func (s *testLockSuite) TestCheckTxnStatusTTL(c *C) {
 	txn, err := s.store.Begin()
 	c.Assert(err, IsNil)
 	txn.Set(kv.Key("key"), []byte("value"))
-	s.prewriteTxn(c, txn.(*tikvTxn))
+	committer, err := newTwoPhaseCommitterWithInit(txn.(*tikvTxn), 0)
+	c.Assert(err, IsNil)
+	// Increase lock TTL to make CI more stable.
+	committer.lockTTL = txnLockTTL(txn.(*tikvTxn).startTime, 200*1024*1024)
+	err = committer.prewriteKeys(NewBackoffer(context.Background(), PrewriteMaxBackoff), committer.keys)
+	c.Assert(err, IsNil)
 
 	bo := NewBackoffer(context.Background(), PrewriteMaxBackoff)
 	lr := newLockResolver(s.store)
@@ -286,7 +292,7 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) {
 	bo := NewBackoffer(context.Background(), PrewriteMaxBackoff)
 	resolver := newLockResolver(s.store)
 	// Call getTxnStatus to check the lock status.
-	status, err := resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS)
+	status, err := resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, true)
 	c.Assert(err, IsNil)
 	c.Assert(status.IsCommitted(), IsFalse)
 	c.Assert(status.ttl, Greater, uint64(0))
@@ -307,19 +313,79 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) {
 	// Then call getTxnStatus again and check the lock status.
 	currentTS, err = oracle.GetTimestamp(context.Background())
 	c.Assert(err, IsNil)
-	status, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS)
+	status, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, true)
 	c.Assert(err, IsNil)
 	c.Assert(status.ttl, Equals, uint64(0))
 	c.Assert(status.commitTS, Equals, uint64(0))
 
 	// Call getTxnStatus on a committed transaction.
 	startTS, commitTS := s.putKV(c, []byte("a"), []byte("a"))
-	status, err = newLockResolver(s.store).getTxnStatus(bo, startTS, []byte("a"), currentTS, currentTS)
+	status, err = newLockResolver(s.store).getTxnStatus(bo, startTS, []byte("a"), currentTS, currentTS, true)
 	c.Assert(err, IsNil)
 	c.Assert(status.ttl, Equals, uint64(0))
 	c.Assert(status.commitTS, Equals, commitTS)
 }
 
+func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) {
+	txn, err := s.store.Begin()
+	c.Assert(err, IsNil)
+	txn.Set(kv.Key("key"), []byte("value"))
+	txn.Set(kv.Key("second"), []byte("xxx"))
+	committer, err := newTwoPhaseCommitterWithInit(txn.(*tikvTxn), 0)
+	c.Assert(err, IsNil)
+	// Increase lock TTL to make CI more stable.
+	committer.lockTTL = txnLockTTL(txn.(*tikvTxn).startTime, 200*1024*1024)
+
+	// Only prewrite the secondary key to simulate a concurrent prewrite case:
+	// the prewrite of the secondary regions succeeds while the prewrite of the
+	// primary region is still pending.
+	err = committer.prewriteKeys(NewBackoffer(context.Background(), PrewriteMaxBackoff), [][]byte{[]byte("second")})
+	c.Assert(err, IsNil)
+
+	oracle := s.store.GetOracle()
+	currentTS, err := oracle.GetTimestamp(context.Background())
+	c.Assert(err, IsNil)
+	bo := NewBackoffer(context.Background(), PrewriteMaxBackoff)
+	resolver := newLockResolver(s.store)
+
+	// Call getTxnStatus for the TxnNotFound case.
+	_, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, false)
+	c.Assert(err, NotNil)
+	_, ok := errors.Cause(err).(txnNotFoundErr)
+	c.Assert(ok, IsTrue)
+
+	errCh := make(chan error)
+	go func() {
+		errCh <- committer.prewriteKeys(NewBackoffer(context.Background(), PrewriteMaxBackoff), [][]byte{[]byte("key")})
+	}()
+
+	lock := &Lock{
+		Key:     []byte("second"),
+		Primary: []byte("key"),
+		TxnID:   txn.StartTS(),
+		TTL:     100000,
+	}
+	// Call getTxnStatusFromLock to cover the retry logic.
+	status, err := resolver.getTxnStatusFromLock(bo, lock, currentTS)
+	c.Assert(err, IsNil)
+	c.Assert(status.ttl, Greater, uint64(0))
+	c.Assert(<-errCh, IsNil)
+	c.Assert(committer.cleanupKeys(bo, committer.keys), IsNil)
+
+	// Call getTxnStatusFromLock to cover the TxnNotFound and retry-timeout case.
+	startTS, err := oracle.GetTimestamp(context.Background())
+	c.Assert(err, IsNil)
+	lock = &Lock{
+		Key:     []byte("second"),
+		Primary: []byte("key"),
+		TxnID:   startTS,
+		TTL:     1000,
+	}
+	status, err = resolver.getTxnStatusFromLock(bo, lock, currentTS)
+	c.Assert(err, IsNil)
+	c.Assert(status.ttl, Equals, uint64(0))
+	c.Assert(status.commitTS, Equals, uint64(0))
+}
+
 func (s *testLockSuite) prewriteTxn(c *C, txn *tikvTxn) {
 	committer, err := newTwoPhaseCommitterWithInit(txn, 0)
 	c.Assert(err, IsNil)