Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: upgrade the CheckTxnStatus API #13123

Merged
merged 18 commits into from
Nov 7, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20191101062931-76b56d6eb466
github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9
github.com/pingcap/parser v0.0.0-20191031081038-bfb0c3adf567
github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d/go.mod h1:fMRU1BA1y+r89
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190822090350-11ea838aedf7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20191101062931-76b56d6eb466 h1:C5nV9osqA+R/R2fxYxVfqAUlCi3Oo5yJ/JSKDeHSAOk=
github.com/pingcap/kvproto v0.0.0-20191101062931-76b56d6eb466/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1 h1:J5oimSv+0emw5e/D1ZX/zh2WcMv0pOVT9QKruXfvJbg=
github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
Expand Down
9 changes: 9 additions & 0 deletions store/mockstore/mocktikv/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,12 @@ type ErrCommitTSExpired struct {
func (e *ErrCommitTSExpired) Error() string {
return "commit ts expired"
}

// ErrTxnNotFound is returned when the lock is not found.
type ErrTxnNotFound struct {
kvrpcpb.TxnNotFound
}

func (e *ErrTxnNotFound) Error() string {
return "txn not found"
}
8 changes: 4 additions & 4 deletions store/mockstore/mocktikv/mock_tikv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,22 +663,22 @@ func (s *testMVCCLevelDB) TestErrors(c *C) {
func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) {
s.mustPrewriteWithTTLOK(c, putMutations("pk", "val"), "pk", 5, 666)

ttl, commitTS, err := s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666)
ttl, commitTS, err := s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666, false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need to cover the case where the last parameter is true.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(666))
c.Assert(commitTS, Equals, uint64(0))

s.mustCommitOK(c, [][]byte{[]byte("pk")}, 5, 30)

ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666)
ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666, false)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(30))

s.mustPrewriteWithTTLOK(c, putMutations("pk1", "val"), "pk1", 5, 666)
s.mustRollbackOK(c, [][]byte{[]byte("pk1")}, 5)

ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk1"), 5, 0, 666)
ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk1"), 5, 0, 666, false)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(0))
Expand All @@ -687,7 +687,7 @@ func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) {
func (s *testMVCCLevelDB) TestRejectCommitTS(c *C) {
s.mustPrewriteOK(c, putMutations("x", "A"), "x", 5)
// Push the minCommitTS
_, _, err := s.store.CheckTxnStatus([]byte("x"), 5, 100, 100)
_, _, err := s.store.CheckTxnStatus([]byte("x"), 5, 100, 100, false)
c.Assert(err, IsNil)
err = s.store.Commit([][]byte{[]byte("x")}, 5, 10)
e, ok := errors.Cause(err).(*ErrCommitTSExpired)
Expand Down
2 changes: 1 addition & 1 deletion store/mockstore/mocktikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ type MVCCStore interface {
BatchResolveLock(startKey, endKey []byte, txnInfos map[uint64]uint64) error
GC(startKey, endKey []byte, safePoint uint64) error
DeleteRange(startKey, endKey []byte) error
CheckTxnStatus(primaryKey []byte, lockTS uint64, startTS, currentTS uint64) (ttl, commitTS uint64, err error)
CheckTxnStatus(primaryKey []byte, lockTS uint64, startTS, currentTS uint64, rollbackIfNotFound bool) (ttl, commitTS uint64, err error)
Close() error
}

Expand Down
20 changes: 16 additions & 4 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1032,7 +1032,7 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error {
// primaryKey + lockTS together could locate the primary lock.
// callerStartTS is the start ts of reader transaction.
// currentTS is the current ts, but it may be inaccurate. Just use it to check TTL.
func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS, currentTS uint64) (uint64, uint64, error) {
func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS, currentTS uint64, rollbackIfNotExist bool) (uint64, uint64, error) {
mvcc.mu.Lock()
defer mvcc.mu.Unlock()

Expand Down Expand Up @@ -1112,9 +1112,21 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS
}

// If current transaction is not prewritted before, it may be pessimistic lock.
// When pessimistic lock rollback, it may not leave a 'rollbacked' tombstone.
logutil.BgLogger().Debug("CheckTxnStatus can't find the primary lock, pessimistic rollback?")
return 0, 0, nil
// When pessimistic txn rollback statement, it may not leave a 'rollbacked' tombstone.

// Or maybe caused by concurrent prewrite operation.
// Especially in the non-block reading case, the secondary lock is likely to be
// written before the primary lock.

if rollbackIfNotExist {
// ttl, commitTS = 0, 0 means rollback
return 0, 0, nil
}

return 0, 0, &ErrTxnNotFound{kvrpcpb.TxnNotFound{
StartTs: lockTS,
PrimaryKey: primaryKey,
}}
}

// TxnHeartBeat implements the MVCCStore interface.
Expand Down
7 changes: 6 additions & 1 deletion store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ func convertToKeyError(err error) *kvrpcpb.KeyError {
CommitTsExpired: &expired.CommitTsExpired,
}
}
if tmp, ok := errors.Cause(err).(*ErrTxnNotFound); ok {
return &kvrpcpb.KeyError{
TxnNotFound: &tmp.TxnNotFound,
}
}
return &kvrpcpb.KeyError{
Abort: err.Error(),
}
Expand Down Expand Up @@ -382,7 +387,7 @@ func (h *rpcHandler) handleKvCheckTxnStatus(req *kvrpcpb.CheckTxnStatusRequest)
panic("KvCheckTxnStatus: key not in region")
}
var resp kvrpcpb.CheckTxnStatusResponse
ttl, commitTS, err := h.mvccStore.CheckTxnStatus(req.GetPrimaryKey(), req.GetLockTs(), req.GetCallerStartTs(), req.GetCurrentTs())
ttl, commitTS, err := h.mvccStore.CheckTxnStatus(req.GetPrimaryKey(), req.GetLockTs(), req.GetCallerStartTs(), req.GetCurrentTs(), req.GetRollbackIfNotExist())
if err != nil {
resp.Error = convertToKeyError(err)
} else {
Expand Down
65 changes: 48 additions & 17 deletions store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,20 +279,10 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks

tikvLockResolverCountWithResolve.Inc()

var expiredLocks []*Lock
for _, l := range locks {
msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL)
if msBeforeLockExpired <= 0 {
expiredLocks = append(expiredLocks, l)
} else {
msBeforeTxnExpired.update(int64(l.TTL))
tikvLockResolverCountWithNotExpired.Inc()
}
}
// TxnID -> []Region, record resolved Regions.
// TODO: Maybe put it in LockResolver and share by all txns.
cleanTxns := make(map[uint64]map[RegionVerID]struct{})
for _, l := range expiredLocks {
for _, l := range locks {
status, err := lr.getTxnStatusFromLock(bo, l, callerStartTS)
if err != nil {
msBeforeTxnExpired.update(0)
Expand Down Expand Up @@ -373,35 +363,71 @@ func (lr *LockResolver) GetTxnStatus(txnID uint64, callerStartTS uint64, primary

func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStartTS uint64) (TxnStatus, error) {
var currentTS uint64
var err error
var status TxnStatus
if l.TTL == 0 {
// NOTE: l.TTL = 0 is a special protocol!!!
// When the pessimistic txn prewrite meets locks of a txn, it should resolve the lock **unconditionally**.
// In this case, TiKV use lock TTL = 0 to notify TiDB, and TiDB should resolve the lock!
// Set currentTS to max uint64 to make the lock expired.
currentTS = math.MaxUint64
} else {
var err error
currentTS, err = lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx)
if err != nil {
return TxnStatus{}, err
}
}
return lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS)

for {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is already a for loop in getTxnStatus why not handle retry in getTxnStatus directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make the unit test easier.

status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS)
if err == nil {
return status, err
}
if _, ok := errors.Cause(err).(txnNotFoundErr); !ok {
return status, err
}

// Handle txnNotFound error.
// If the (secondary) lock TTL has expired, return rollbacked status.
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
if lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL) <= 0 {
return status, nil
}
time.Sleep(5 * time.Millisecond)
}
}

type txnNotFoundErr struct {
*kvrpcpb.TxnNotFound
}

func (e txnNotFoundErr) Error() string {
return e.TxnNotFound.String()
}

// If nonBlockRead is true, the caller should handle the txnNotFoundErr.
func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS, currentTS uint64) (TxnStatus, error) {
if s, ok := lr.getResolved(txnID); ok {
return s, nil
}

tikvLockResolverCountWithQueryTxnStatus.Inc()

// CheckTxnStatus would meet the following cases:
// 1. LOCK
// 1.1 Lock expired -- orphan lock, fail to update TTL, crash recovery etc.
// 1.2 Lock TTL -- active transaction holding the lock.
// 2. NO LOCK
// 2.1 Txn Committed
// 2.2 Txn Rollbacked -- rollback itself, rollback by others, GC tomb etc.
// 2.3 No lock -- pessimistic lock rollback, concurrence prewrite.

var status TxnStatus
req := tikvrpc.NewRequest(tikvrpc.CmdCheckTxnStatus, &kvrpcpb.CheckTxnStatusRequest{
PrimaryKey: primary,
LockTs: txnID,
CallerStartTs: callerStartTS,
CurrentTs: currentTS,
PrimaryKey: primary,
LockTs: txnID,
CallerStartTs: callerStartTS,
CurrentTs: currentTS,
RollbackIfNotExist: false,
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
})
for {
loc, err := lr.store.GetRegionCache().LocateKey(bo, primary)
Expand All @@ -428,6 +454,11 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte
}
cmdResp := resp.Resp.(*kvrpcpb.CheckTxnStatusResponse)
if keyErr := cmdResp.GetError(); keyErr != nil {
txnNotFound := keyErr.GetTxnNotFound()
if txnNotFound != nil {
return status, txnNotFoundErr{txnNotFound}
}

err = errors.Errorf("unexpected err: %s, tid: %v", keyErr, txnID)
logutil.BgLogger().Error("getTxnStatus error", zap.Error(err))
return status, err
Expand Down
46 changes: 46 additions & 0 deletions store/tikv/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
Expand Down Expand Up @@ -320,6 +321,51 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) {
c.Assert(status.commitTS, Equals, commitTS)
}

func (s *testLockSuite) TestCheckTxnStatusNoBlockRead(c *C) {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
txn.Set(kv.Key("key"), []byte("value"))
txn.Set(kv.Key("second"), []byte("xxx"))
committer, err := newTwoPhaseCommitterWithInit(txn.(*tikvTxn), 0)
c.Assert(err, IsNil)
// Increase lock TTL to make CI more stable.
committer.lockTTL = txnLockTTL(txn.(*tikvTxn).startTime, 200*1024*1024)

// Only prewrite the secondary key to simulate a concurrent prewrite case:
// prewrite secondary regions success and prewrite the primary region is pending.
err = committer.prewriteKeys(NewBackoffer(context.Background(), PrewriteMaxBackoff), [][]byte{[]byte("second")})
c.Assert(err, IsNil)

oracle := s.store.GetOracle()
currentTS, err := oracle.GetTimestamp(context.Background())
c.Assert(err, IsNil)
bo := NewBackoffer(context.Background(), PrewriteMaxBackoff)
resolver := newLockResolver(s.store)

// Call getTxnStatus with non-block reading flag.
_, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS)
c.Assert(err, NotNil)
_, ok := errors.Cause(err).(txnNotFoundErr)
c.Assert(ok, IsTrue)

errCh := make(chan error)
go func() {
errCh <- committer.prewriteKeys(NewBackoffer(context.Background(), PrewriteMaxBackoff), [][]byte{[]byte("key")})
}()

lock := &Lock{
Key: []byte("second"),
Primary: []byte("key"),
TxnID: txn.StartTS(),
TTL: 100000, // Not zero.
}
// Call getTxnStatus with non-block reading flag, this time with retry to cover more code.
status, err := resolver.getTxnStatusFromLock(bo, lock, currentTS)
c.Assert(err, IsNil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this time, the lock may still not written, we can only be sure it is written after <-errCh.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lock may still not written, and getTxnStatusFromLock should retry in that case, so we cover the retry logic.
The lock TTL is set to 10s, long enough for the goroutine to finish, the result should be stable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or we add some sleep before errCh <- xxx to ensure covering the retry path

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or we add some sleep before errCh <- xxx to ensure covering the retry path

So as to make the CI slower?

c.Assert(status.ttl, Greater, uint64(0))
c.Assert(<-errCh, IsNil)
}

func (s *testLockSuite) prewriteTxn(c *C, txn *tikvTxn) {
committer, err := newTwoPhaseCommitterWithInit(txn, 0)
c.Assert(err, IsNil)
Expand Down