Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: upgrade the CheckTxnStatus API #13123

Merged
merged 18 commits into from
Nov 7, 2019
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20191101062931-76b56d6eb466
github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9
github.com/pingcap/parser v0.0.0-20191031081038-bfb0c3adf567
github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d/go.mod h1:fMRU1BA1y+r89
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190822090350-11ea838aedf7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20191101062931-76b56d6eb466 h1:C5nV9osqA+R/R2fxYxVfqAUlCi3Oo5yJ/JSKDeHSAOk=
github.com/pingcap/kvproto v0.0.0-20191101062931-76b56d6eb466/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1 h1:J5oimSv+0emw5e/D1ZX/zh2WcMv0pOVT9QKruXfvJbg=
github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
Expand Down
9 changes: 9 additions & 0 deletions store/mockstore/mocktikv/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,12 @@ type ErrCommitTSExpired struct {
func (e *ErrCommitTSExpired) Error() string {
return "commit ts expired"
}

// ErrTxnNotFound is returned when the lock is not found.
type ErrTxnNotFound struct {
kvrpcpb.TxnNotFound
}

func (e *ErrTxnNotFound) Error() string {
return "txn not found"
}
8 changes: 4 additions & 4 deletions store/mockstore/mocktikv/mock_tikv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,22 +663,22 @@ func (s *testMVCCLevelDB) TestErrors(c *C) {
func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) {
s.mustPrewriteWithTTLOK(c, putMutations("pk", "val"), "pk", 5, 666)

ttl, commitTS, err := s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666)
ttl, commitTS, err := s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666, false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need to cover the case where the last parameter is true.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(666))
c.Assert(commitTS, Equals, uint64(0))

s.mustCommitOK(c, [][]byte{[]byte("pk")}, 5, 30)

ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666)
ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666, false)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(30))

s.mustPrewriteWithTTLOK(c, putMutations("pk1", "val"), "pk1", 5, 666)
s.mustRollbackOK(c, [][]byte{[]byte("pk1")}, 5)

ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk1"), 5, 0, 666)
ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk1"), 5, 0, 666, false)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(0))
Expand All @@ -687,7 +687,7 @@ func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) {
func (s *testMVCCLevelDB) TestRejectCommitTS(c *C) {
s.mustPrewriteOK(c, putMutations("x", "A"), "x", 5)
// Push the minCommitTS
_, _, err := s.store.CheckTxnStatus([]byte("x"), 5, 100, 100)
_, _, err := s.store.CheckTxnStatus([]byte("x"), 5, 100, 100, false)
c.Assert(err, IsNil)
err = s.store.Commit([][]byte{[]byte("x")}, 5, 10)
e, ok := errors.Cause(err).(*ErrCommitTSExpired)
Expand Down
2 changes: 1 addition & 1 deletion store/mockstore/mocktikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ type MVCCStore interface {
BatchResolveLock(startKey, endKey []byte, txnInfos map[uint64]uint64) error
GC(startKey, endKey []byte, safePoint uint64) error
DeleteRange(startKey, endKey []byte) error
CheckTxnStatus(primaryKey []byte, lockTS uint64, startTS, currentTS uint64) (ttl, commitTS uint64, err error)
CheckTxnStatus(primaryKey []byte, lockTS uint64, startTS, currentTS uint64, rollbackIfNotFound bool) (ttl, commitTS uint64, err error)
Close() error
}

Expand Down
38 changes: 28 additions & 10 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ func rollbackKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, startTS uint6
}
// If current transaction's lock exist.
if ok && dec.lock.startTS == startTS {
if err = rollbackLock(batch, dec.lock, key, startTS); err != nil {
if err = rollbackLock(batch, key, startTS); err != nil {
return errors.Trace(err)
}
return nil
Expand Down Expand Up @@ -919,7 +919,7 @@ func rollbackKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, startTS uint6
return nil
}

func rollbackLock(batch *leveldb.Batch, lock mvccLock, key []byte, startTS uint64) error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This parameter is not used in the function, so I remove it

func rollbackLock(batch *leveldb.Batch, key []byte, startTS uint64) error {
tomb := mvccValue{
valueType: typeRollback,
startTS: startTS,
Expand Down Expand Up @@ -980,7 +980,7 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error {
if ok && dec.lock.startTS == startTS {
// If the lock has already outdated, clean up it.
if currentTS == 0 || uint64(oracle.ExtractPhysical(dec.lock.startTS))+dec.lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) {
if err = rollbackLock(batch, dec.lock, key, startTS); err != nil {
if err = rollbackLock(batch, key, startTS); err != nil {
return err
}
return mvcc.db.Write(batch, nil)
Expand Down Expand Up @@ -1032,7 +1032,7 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error {
// primaryKey + lockTS together could locate the primary lock.
// callerStartTS is the start ts of reader transaction.
// currentTS is the current ts, but it may be inaccurate. Just use it to check TTL.
func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS, currentTS uint64) (uint64, uint64, error) {
func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS, currentTS uint64, rollbackIfNotExist bool) (uint64, uint64, error) {
mvcc.mu.Lock()
defer mvcc.mu.Unlock()

Expand All @@ -1057,7 +1057,7 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS

// If the lock has already outdated, clean up it.
if uint64(oracle.ExtractPhysical(lock.startTS))+lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) {
if err = rollbackLock(batch, lock, primaryKey, lockTS); err != nil {
if err = rollbackLock(batch, primaryKey, lockTS); err != nil {
return 0, 0, errors.Trace(err)
}
if err = mvcc.db.Write(batch, nil); err != nil {
Expand Down Expand Up @@ -1112,9 +1112,27 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS
}

// If current transaction is not prewritted before, it may be pessimistic lock.
// When pessimistic lock rollback, it may not leave a 'rollbacked' tombstone.
logutil.BgLogger().Debug("CheckTxnStatus can't find the primary lock, pessimistic rollback?")
return 0, 0, nil
// When pessimistic txn rollback statement, it may not leave a 'rollbacked' tombstone.

// Or maybe caused by concurrent prewrite operation.
// Especially in the non-block reading case, the secondary lock is likely to be
// written before the primary lock.

if rollbackIfNotExist {
batch := &leveldb.Batch{}
if err := rollbackLock(batch, primaryKey, lockTS); err != nil {
return 0, 0, errors.Trace(err)
}
if err := mvcc.db.Write(batch, nil); err != nil {
return 0, 0, errors.Trace(err)
}
return 0, 0, nil
}

return 0, 0, &ErrTxnNotFound{kvrpcpb.TxnNotFound{
StartTs: lockTS,
PrimaryKey: primaryKey,
}}
}

// TxnHeartBeat implements the MVCCStore interface.
Expand Down Expand Up @@ -1220,7 +1238,7 @@ func (mvcc *MVCCLevelDB) ResolveLock(startKey, endKey []byte, startTS, commitTS
if commitTS > 0 {
err = commitLock(batch, dec.lock, currKey, startTS, commitTS)
} else {
err = rollbackLock(batch, dec.lock, currKey, startTS)
err = rollbackLock(batch, currKey, startTS)
}
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -1260,7 +1278,7 @@ func (mvcc *MVCCLevelDB) BatchResolveLock(startKey, endKey []byte, txnInfos map[
if commitTS > 0 {
err = commitLock(batch, dec.lock, currKey, dec.lock.startTS, commitTS)
} else {
err = rollbackLock(batch, dec.lock, currKey, dec.lock.startTS)
err = rollbackLock(batch, currKey, dec.lock.startTS)
}
if err != nil {
return errors.Trace(err)
Expand Down
7 changes: 6 additions & 1 deletion store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ func convertToKeyError(err error) *kvrpcpb.KeyError {
CommitTsExpired: &expired.CommitTsExpired,
}
}
if tmp, ok := errors.Cause(err).(*ErrTxnNotFound); ok {
return &kvrpcpb.KeyError{
TxnNotFound: &tmp.TxnNotFound,
}
}
return &kvrpcpb.KeyError{
Abort: err.Error(),
}
Expand Down Expand Up @@ -382,7 +387,7 @@ func (h *rpcHandler) handleKvCheckTxnStatus(req *kvrpcpb.CheckTxnStatusRequest)
panic("KvCheckTxnStatus: key not in region")
}
var resp kvrpcpb.CheckTxnStatusResponse
ttl, commitTS, err := h.mvccStore.CheckTxnStatus(req.GetPrimaryKey(), req.GetLockTs(), req.GetCallerStartTs(), req.GetCurrentTs())
ttl, commitTS, err := h.mvccStore.CheckTxnStatus(req.GetPrimaryKey(), req.GetLockTs(), req.GetCallerStartTs(), req.GetCurrentTs(), req.GetRollbackIfNotExist())
if err != nil {
resp.Error = convertToKeyError(err)
} else {
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) {

lr := newLockResolver(s.store)
bo := NewBackoffer(context.Background(), getMaxBackoff)
status, err := lr.getTxnStatus(bo, txn.startTS, key2, 0, txn.startTS)
status, err := lr.getTxnStatus(bo, txn.startTS, key2, 0, txn.startTS, true)
c.Assert(err, IsNil)
c.Assert(status.ttl, GreaterEqual, lockInfo.LockTtl)

Expand Down
71 changes: 51 additions & 20 deletions store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
tikvLockResolverCountWithExpired.Inc()

// Use currentTS = math.MaxUint64 means rollback the txn, no matter the lock is expired or not!
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64)
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -279,20 +279,10 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks

tikvLockResolverCountWithResolve.Inc()

var expiredLocks []*Lock
for _, l := range locks {
msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL)
if msBeforeLockExpired <= 0 {
expiredLocks = append(expiredLocks, l)
} else {
msBeforeTxnExpired.update(int64(l.TTL))
tikvLockResolverCountWithNotExpired.Inc()
}
}
// TxnID -> []Region, record resolved Regions.
// TODO: Maybe put it in LockResolver and share by all txns.
cleanTxns := make(map[uint64]map[RegionVerID]struct{})
for _, l := range expiredLocks {
for _, l := range locks {
status, err := lr.getTxnStatusFromLock(bo, l, callerStartTS)
if err != nil {
msBeforeTxnExpired.update(0)
Expand Down Expand Up @@ -368,40 +358,76 @@ func (lr *LockResolver) GetTxnStatus(txnID uint64, callerStartTS uint64, primary
if err != nil {
return status, err
}
return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS)
return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS, true)
MyonKeminta marked this conversation as resolved.
Show resolved Hide resolved
}

func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStartTS uint64) (TxnStatus, error) {
var currentTS uint64
var err error
var status TxnStatus
if l.TTL == 0 {
// NOTE: l.TTL = 0 is a special protocol!!!
// When the pessimistic txn prewrite meets locks of a txn, it should resolve the lock **unconditionally**.
// In this case, TiKV use lock TTL = 0 to notify TiDB, and TiDB should resolve the lock!
// Set currentTS to max uint64 to make the lock expired.
currentTS = math.MaxUint64
} else {
var err error
currentTS, err = lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx)
if err != nil {
return TxnStatus{}, err
}
}
return lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS)

rollbackIfNotExist := false
for {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is already a for loop in getTxnStatus why not handle retry in getTxnStatus directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make the unit test easier.

status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS, rollbackIfNotExist)
if err == nil {
return status, err
}
if _, ok := errors.Cause(err).(txnNotFoundErr); !ok {
return status, err
}

// Handle txnNotFound error.
time.Sleep(5 * time.Millisecond)
if lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL) <= 0 {
rollbackIfNotExist = true
}
}
}

type txnNotFoundErr struct {
*kvrpcpb.TxnNotFound
}

func (e txnNotFoundErr) Error() string {
return e.TxnNotFound.String()
}

func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS, currentTS uint64) (TxnStatus, error) {
// If nonBlockRead is true, the caller should handle the txnNotFoundErr.
func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS, currentTS uint64, rollbackIfNotExist bool) (TxnStatus, error) {
if s, ok := lr.getResolved(txnID); ok {
return s, nil
}

tikvLockResolverCountWithQueryTxnStatus.Inc()

// CheckTxnStatus would meet the following cases:
// 1. LOCK
// 1.1 Lock expired -- orphan lock, fail to update TTL, crash recovery etc.
// 1.2 Lock TTL -- active transaction holding the lock.
// 2. NO LOCK
// 2.1 Txn Committed
// 2.2 Txn Rollbacked -- rollback itself, rollback by others, GC tomb etc.
// 2.3 No lock -- pessimistic lock rollback, concurrence prewrite.

var status TxnStatus
req := tikvrpc.NewRequest(tikvrpc.CmdCheckTxnStatus, &kvrpcpb.CheckTxnStatusRequest{
PrimaryKey: primary,
LockTs: txnID,
CallerStartTs: callerStartTS,
CurrentTs: currentTS,
PrimaryKey: primary,
LockTs: txnID,
CallerStartTs: callerStartTS,
CurrentTs: currentTS,
RollbackIfNotExist: rollbackIfNotExist,
})
for {
loc, err := lr.store.GetRegionCache().LocateKey(bo, primary)
Expand All @@ -428,6 +454,11 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte
}
cmdResp := resp.Resp.(*kvrpcpb.CheckTxnStatusResponse)
if keyErr := cmdResp.GetError(); keyErr != nil {
txnNotFound := keyErr.GetTxnNotFound()
if txnNotFound != nil {
return status, txnNotFoundErr{txnNotFound}
}

err = errors.Errorf("unexpected err: %s, tid: %v", keyErr, txnID)
logutil.BgLogger().Error("getTxnStatus error", zap.Error(err))
return status, err
Expand Down
Loading