store: update kvrpc.Cleanup proto and change its behaviour #12212

Merged: 15 commits, Sep 23, 2019
2 changes: 1 addition & 1 deletion go.mod
@@ -39,7 +39,7 @@ require (
github.com/pingcap/errors v0.11.4
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20190904075355-9a1bd6a31da2
github.com/pingcap/kvproto v0.0.0-20190910074005-0e61b6f435c1
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd
github.com/pingcap/parser v0.0.0-20190912032624-978b8272c04e
github.com/pingcap/pd v0.0.0-20190712044914-75a1f9f3062b
4 changes: 2 additions & 2 deletions go.sum
@@ -161,8 +161,8 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190904075355-9a1bd6a31da2 h1:wBORZD4gvEKK0tGP4g1Rv0Y7f2cNnObzI/ckPhsU11M=
github.com/pingcap/kvproto v0.0.0-20190904075355-9a1bd6a31da2/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190910074005-0e61b6f435c1 h1:DNvxkdcjA0TBIIIF+K2w9KMlTzMZzLZ5JVF26kTCPhg=
github.com/pingcap/kvproto v0.0.0-20190910074005-0e61b6f435c1/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
4 changes: 2 additions & 2 deletions store/mockstore/mocktikv/mock_tikv_test.go
@@ -556,7 +556,7 @@ func (s *testMockTiKVSuite) TestRollbackAndWriteConflict(c *C) {
s.mustPutOK(c, "test", "test2", 5, 8)

// simulate `getTxnStatus` for txn 2.
err := s.store.Cleanup([]byte("test"), 2)
err := s.store.Cleanup([]byte("test"), 2, math.MaxUint64)
c.Assert(err, IsNil)
req = &kvrpcpb.PrewriteRequest{
Mutations: putMutations("test", "test3"),
@@ -712,7 +712,7 @@ func (s *testMVCCLevelDB) TestTxnHeartBeat(c *C) {
c.Assert(ttl, Greater, uint64(300))

// The lock has already been cleaned up
c.Assert(s.store.Cleanup([]byte("pk"), 5), IsNil)
c.Assert(s.store.Cleanup([]byte("pk"), 5, math.MaxUint64), IsNil)
_, err = s.store.TxnHeartBeat([]byte("pk"), 5, 1000)
c.Assert(err, NotNil)
}
2 changes: 1 addition & 1 deletion store/mockstore/mocktikv/mvcc.go
@@ -259,7 +259,7 @@ type MVCCStore interface {
Prewrite(req *kvrpcpb.PrewriteRequest) []error
Commit(keys [][]byte, startTS, commitTS uint64) error
Rollback(keys [][]byte, startTS uint64) error
Cleanup(key []byte, startTS uint64) error
Cleanup(key []byte, startTS, currentTS uint64) error
ScanLock(startKey, endKey []byte, maxTS uint64) ([]*kvrpcpb.LockInfo, error)
TxnHeartBeat(primaryKey []byte, startTS uint64, adviseTTL uint64) (uint64, error)
ResolveLock(startKey, endKey []byte, startTS, commitTS uint64) error
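Illustrative sketch (not part of the diff): the interface now takes a currentTS. Passing math.MaxUint64 makes any lock look outdated and so reproduces the old unconditional rollback, which is exactly what the updated mocktikv tests above do; the helper name here is hypothetical.

import "math"

// cleanupUnconditionally rolls back startTS's lock on key regardless of its
// TTL, by claiming the current time is as late as possible.
func cleanupUnconditionally(store MVCCStore, key []byte, startTS uint64) error {
    return store.Cleanup(key, startTS, math.MaxUint64)
}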
66 changes: 63 additions & 3 deletions store/mockstore/mocktikv/mvcc_leveldb.go
@@ -700,6 +700,12 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch,
}
if ok {
if dec.lock.startTS != startTS {
if isPessimisticLock {
// NOTE: Special handling.
// When a pessimistic transaction's prewrite meets a lock, setting TTL = 0
// tells TiDB to roll back that transaction **unconditionally**.
dec.lock.ttl = 0
}
return dec.lock.lockErr(mutation.Key)
}
if dec.lock.op != kvrpcpb.Op_PessimisticLock {
@@ -926,19 +932,73 @@ func getTxnCommitInfo(iter *Iterator, expectKey []byte, startTS uint64) (mvccVal
}

// Cleanup implements the MVCCStore interface.
func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS uint64) error {
// Cleanup API is deprecated, use CheckTxnStatus instead.
func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error {
mvcc.mu.Lock()
defer func() {
mvcc.mu.Unlock()
mvcc.deadlockDetector.CleanUp(startTS)
}()

batch := &leveldb.Batch{}
err := rollbackKey(mvcc.db, batch, key, startTS)
startKey := mvccEncode(key, lockVer)
iter := newIterator(mvcc.db, &util.Range{
Start: startKey,
})
defer iter.Release()

if iter.Valid() {
dec := lockDecoder{
expectKey: key,
}
ok, err := dec.Decode(iter)
if err != nil {
return err
}
// If current transaction's lock exists.
if ok && dec.lock.startTS == startTS {

// If the lock is already outdated, clean it up.
if currentTS == 0 || uint64(oracle.ExtractPhysical(dec.lock.startTS))+dec.lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) {
if err = rollbackLock(batch, dec.lock, key, startTS); err != nil {
return err
}
return mvcc.db.Write(batch, nil)
}

// Otherwise, return a locked error with the TTL information.
return dec.lock.lockErr(key)
}

// The current transaction's lock does not exist.
// Check whether the commit information of the current transaction exists.
c, ok, err := getTxnCommitInfo(iter, key, startTS)
if err != nil {
return errors.Trace(err)
}
if ok {
// If the current transaction has already committed.
if c.valueType != typeRollback {
return ErrAlreadyCommitted(c.commitTS)
}
// If the current transaction has already been rolled back.
return nil
}
}

// If the current transaction was not prewritten before.
value := mvccValue{
valueType: typeRollback,
startTS: startTS,
commitTS: startTS,
}
writeKey := mvccEncode(key, startTS)
writeValue, err := value.MarshalBinary()
if err != nil {
return errors.Trace(err)
}
return mvcc.db.Write(batch, nil)
batch.Put(writeKey, writeValue)
return nil
}

// CheckTxnStatus checks the primary lock of a transaction to decide its status.
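A minimal sketch of the expiry rule Cleanup now applies, assuming TiDB's TSO layout in which oracle.ExtractPhysical recovers the physical milliseconds from the high bits of a timestamp (ts >> 18); lockOutdated is a hypothetical name:

// lockOutdated reports whether a lock's TTL has lapsed relative to currentTS.
// currentTS == 0 is the special "roll back unconditionally" signal, and
// currentTS == math.MaxUint64 makes every lock appear outdated.
func lockOutdated(lockStartTS, ttl, currentTS uint64) bool {
    extractPhysical := func(ts uint64) uint64 { return ts >> 18 } // physical ms
    return currentTS == 0 || extractPhysical(lockStartTS)+ttl < extractPhysical(currentTS)
}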
2 changes: 1 addition & 1 deletion store/mockstore/mocktikv/rpc.go
@@ -357,7 +357,7 @@ func (h *rpcHandler) handleKvCleanup(req *kvrpcpb.CleanupRequest) *kvrpcpb.Clean
panic("KvCleanup: key not in region")
}
var resp kvrpcpb.CleanupResponse
err := h.mvccStore.Cleanup(req.Key, req.GetStartVersion())
err := h.mvccStore.Cleanup(req.Key, req.GetStartVersion(), req.GetCurrentTs())
if err != nil {
if commitTS, ok := errors.Cause(err).(ErrAlreadyCommitted); ok {
resp.CommitVersion = uint64(commitTS)
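At the RPC boundary the new behaviour only needs one extra field on the Cleanup message. A hedged sketch of building the updated request, using the field names visible in the diff (the helper itself is hypothetical):

import "github.com/pingcap/kvproto/pkg/kvrpcpb"

// newCleanupRequest builds a Cleanup request that carries the caller's
// current TSO timestamp; CurrentTs == 0 preserves the old always-rollback
// semantics.
func newCleanupRequest(key []byte, startTS, currentTS uint64) *kvrpcpb.CleanupRequest {
    return &kvrpcpb.CleanupRequest{
        Key:          key,
        StartVersion: startTS,
        CurrentTs:    currentTS,
    }
}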
125 changes: 92 additions & 33 deletions store/tikv/lock_resolver.go
@@ -174,15 +174,16 @@ func (lr *LockResolver) getResolved(txnID uint64) (TxnStatus, bool) {
return s, ok
}

// BatchResolveLocks resolve locks in a batch
// BatchResolveLocks resolves locks in a batch.
// Use it in gcworker only!
func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc RegionVerID) (bool, error) {
if len(locks) == 0 {
return true, nil
}

tikvLockResolverCountWithBatchResolve.Inc()

var expiredLocks []*Lock
expiredLocks := make([]*Lock, 0, len(locks))
for _, l := range locks {
if lr.store.GetOracle().IsExpired(l.TxnID, l.TTL) {
tikvLockResolverCountWithExpired.Inc()
@@ -205,7 +206,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
continue
}

status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary)
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0)
if err != nil {
return false, errors.Trace(err)
}
@@ -266,9 +267,10 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
// commit status.
// 3) Send `ResolveLock` cmd to the lock's region to resolve all locks belonging to
// the same transaction.
func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (msBeforeTxnExpired int64, err error) {
func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (int64, error) {
var msBeforeTxnExpired txnExpireTime
if len(locks) == 0 {
return
return msBeforeTxnExpired.value(), nil
}

tikvLockResolverCountWithResolve.Inc()
@@ -277,61 +279,111 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (msBeforeTxnE
for _, l := range locks {
msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL)
if msBeforeLockExpired <= 0 {
tikvLockResolverCountWithExpired.Inc()
expiredLocks = append(expiredLocks, l)
} else {
if msBeforeTxnExpired == 0 || msBeforeLockExpired < msBeforeTxnExpired {
msBeforeTxnExpired = msBeforeLockExpired
}
msBeforeTxnExpired.update(int64(l.TTL))
tikvLockResolverCountWithNotExpired.Inc()
}
}
if len(expiredLocks) == 0 {
if msBeforeTxnExpired > 0 {
tikvLockResolverCountWithWaitExpired.Inc()
}
return
}

// TxnID -> []Region, record resolved Regions.
// TODO: Maybe put it in LockResolver and share it among all txns.
cleanTxns := make(map[uint64]map[RegionVerID]struct{})
for _, l := range expiredLocks {
var status TxnStatus
status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary)
status, err := lr.getTxnStatusFromLock(bo, l)
if err != nil {
msBeforeTxnExpired = 0
msBeforeTxnExpired.update(0)
err = errors.Trace(err)
return
return msBeforeTxnExpired.value(), err
}

cleanRegions, exists := cleanTxns[l.TxnID]
if !exists {
cleanRegions = make(map[RegionVerID]struct{})
cleanTxns[l.TxnID] = cleanRegions
}
if status.ttl == 0 {
tikvLockResolverCountWithExpired.Inc()
// If the lock has been committed or rolled back, resolve it.
cleanRegions, exists := cleanTxns[l.TxnID]
if !exists {
cleanRegions = make(map[RegionVerID]struct{})
cleanTxns[l.TxnID] = cleanRegions
}

err = lr.resolveLock(bo, l, status, cleanRegions)
if err != nil {
msBeforeTxnExpired = 0
err = errors.Trace(err)
return
err = lr.resolveLock(bo, l, status, cleanRegions)
if err != nil {
msBeforeTxnExpired.update(0)
err = errors.Trace(err)
return msBeforeTxnExpired.value(), err
}
} else {
tikvLockResolverCountWithNotExpired.Inc()
// If the lock is valid, the txn may be a pessimistic transaction.
// Update the txn expire time.
msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, status.ttl)
msBeforeTxnExpired.update(msBeforeLockExpired)
}
}

if msBeforeTxnExpired.value() > 0 {
tikvLockResolverCountWithWaitExpired.Inc()
}
return msBeforeTxnExpired.value(), nil
}

type txnExpireTime struct {
initialized bool
txnExpire int64
}

func (t *txnExpireTime) update(lockExpire int64) {
if lockExpire <= 0 {
lockExpire = 0
}
if !t.initialized {
t.txnExpire = lockExpire
t.initialized = true
return
}
if lockExpire < t.txnExpire {
t.txnExpire = lockExpire
}
return
}

func (t *txnExpireTime) value() int64 {
if !t.initialized {
return 0
}
return t.txnExpire
}

// GetTxnStatus queries tikv-server for a txn's status (commit/rollback).
// If the primary key is still locked, it will launch a Rollback to abort it.
// To avoid unnecessarily aborting too many txns, it is wiser to wait a few
// seconds before calling it after Prewrite.
func (lr *LockResolver) GetTxnStatus(txnID uint64, primary []byte) (TxnStatus, error) {
var status TxnStatus
bo := NewBackoffer(context.Background(), cleanupMaxBackoff)
status, err := lr.getTxnStatus(bo, txnID, primary)
return status, errors.Trace(err)
currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx)
if err != nil {
return status, err
}
return lr.getTxnStatus(bo, txnID, primary, currentTS)
}

func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock) (TxnStatus, error) {
// NOTE: l.TTL = 0 is a special protocol!!!
// When a pessimistic txn's prewrite meets another txn's lock, that txn should be
// rolled back **unconditionally**. In this case, TiKV sets the lock's TTL = 0, and
// TiDB calls getTxnStatus with currentTS = 0, which rolls back the transaction.
if l.TTL == 0 {
return lr.getTxnStatus(bo, l.TxnID, l.Primary, 0)
}

currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx)
if err != nil {
return TxnStatus{}, err
}
return lr.getTxnStatus(bo, l.TxnID, l.Primary, currentTS)
}

func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte) (TxnStatus, error) {
func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, currentTS uint64) (TxnStatus, error) {
if s, ok := lr.getResolved(txnID); ok {
return s, nil
}
@@ -342,6 +394,7 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte
req := tikvrpc.NewRequest(tikvrpc.CmdCleanup, &kvrpcpb.CleanupRequest{
Key: primary,
StartVersion: txnID,
CurrentTs: currentTS,
})
for {
loc, err := lr.store.GetRegionCache().LocateKey(bo, primary)
@@ -368,6 +421,12 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte
}
cmdResp := resp.Resp.(*kvrpcpb.CleanupResponse)
if keyErr := cmdResp.GetError(); keyErr != nil {
// If the primary lock's TTL has not expired, the response contains an ErrLocked carrying the TTL.
if lockInfo := keyErr.GetLocked(); lockInfo != nil {
status.ttl = lockInfo.LockTtl
status.commitTS = 0
return status, nil
}
err = errors.Errorf("unexpected cleanup err: %s, tid: %v", keyErr, txnID)
logutil.BgLogger().Error("getTxnStatus error", zap.Error(err))
return status, err
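A short usage sketch of the txnExpireTime helper introduced above (the wrapper function is hypothetical): it tracks the minimum expire time seen so far, clamps non-positive inputs to zero, and value() returns 0 until the first update, which callers read as "expired, resolve now".

func minExpireExample() int64 {
    var t txnExpireTime
    t.update(1500) // lock A: 1.5s until expiry
    t.update(300)  // lock B: 0.3s until expiry, the new minimum
    t.update(-20)  // already expired: clamped to 0, which becomes the minimum
    return t.value() // 0, so the caller resolves immediately instead of waiting
}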