Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: upgrade the CheckTxnStatus API #13123

Merged
merged 18 commits into from
Nov 7, 2019
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20191101062931-76b56d6eb466
github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9
github.com/pingcap/parser v0.0.0-20191031081038-bfb0c3adf567
github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d/go.mod h1:fMRU1BA1y+r89
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190822090350-11ea838aedf7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20191101062931-76b56d6eb466 h1:C5nV9osqA+R/R2fxYxVfqAUlCi3Oo5yJ/JSKDeHSAOk=
github.com/pingcap/kvproto v0.0.0-20191101062931-76b56d6eb466/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1 h1:J5oimSv+0emw5e/D1ZX/zh2WcMv0pOVT9QKruXfvJbg=
github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
Expand Down
3 changes: 3 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2443,6 +2443,8 @@ func (s *testSessionSuite) TestKVVars(c *C) {
}
wg.Done()
}()

c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockSleepBetween2PC", "return"), IsNil)
go func() {
for {
tk.MustExec("update kvvars set b = b + 1 where a = 1")
Expand All @@ -2453,6 +2455,7 @@ func (s *testSessionSuite) TestKVVars(c *C) {
wg.Done()
}()
wg.Wait()
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/mockSleepBetween2PC"), IsNil)
for {
tk2.MustQuery("select * from kvvars")
if atomic.LoadInt32(backOffWeightVal) != 0 {
Expand Down
9 changes: 9 additions & 0 deletions store/mockstore/mocktikv/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,12 @@ type ErrCommitTSExpired struct {
func (e *ErrCommitTSExpired) Error() string {
return "commit ts expired"
}

// ErrTxnNotFound is returned when the primary lock of the txn is not found.
type ErrTxnNotFound struct {
kvrpcpb.TxnNotFound
}

func (e *ErrTxnNotFound) Error() string {
return "txn not found"
}
8 changes: 4 additions & 4 deletions store/mockstore/mocktikv/mock_tikv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,22 +663,22 @@ func (s *testMVCCLevelDB) TestErrors(c *C) {
func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) {
s.mustPrewriteWithTTLOK(c, putMutations("pk", "val"), "pk", 5, 666)

ttl, commitTS, err := s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666)
ttl, commitTS, err := s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666, false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need to cover the case where the last parameter is true.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(666))
c.Assert(commitTS, Equals, uint64(0))

s.mustCommitOK(c, [][]byte{[]byte("pk")}, 5, 30)

ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666)
ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666, false)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(30))

s.mustPrewriteWithTTLOK(c, putMutations("pk1", "val"), "pk1", 5, 666)
s.mustRollbackOK(c, [][]byte{[]byte("pk1")}, 5)

ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk1"), 5, 0, 666)
ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk1"), 5, 0, 666, false)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(0))
Expand All @@ -687,7 +687,7 @@ func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) {
func (s *testMVCCLevelDB) TestRejectCommitTS(c *C) {
s.mustPrewriteOK(c, putMutations("x", "A"), "x", 5)
// Push the minCommitTS
_, _, err := s.store.CheckTxnStatus([]byte("x"), 5, 100, 100)
_, _, err := s.store.CheckTxnStatus([]byte("x"), 5, 100, 100, false)
c.Assert(err, IsNil)
err = s.store.Commit([][]byte{[]byte("x")}, 5, 10)
e, ok := errors.Cause(err).(*ErrCommitTSExpired)
Expand Down
2 changes: 1 addition & 1 deletion store/mockstore/mocktikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ type MVCCStore interface {
BatchResolveLock(startKey, endKey []byte, txnInfos map[uint64]uint64) error
GC(startKey, endKey []byte, safePoint uint64) error
DeleteRange(startKey, endKey []byte) error
CheckTxnStatus(primaryKey []byte, lockTS uint64, startTS, currentTS uint64) (ttl, commitTS uint64, err error)
CheckTxnStatus(primaryKey []byte, lockTS uint64, startTS, currentTS uint64, rollbackIfNotFound bool) (ttl, commitTS uint64, err error)
Close() error
}

Expand Down
38 changes: 28 additions & 10 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ func rollbackKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, startTS uint6
}
// If current transaction's lock exist.
if ok && dec.lock.startTS == startTS {
if err = rollbackLock(batch, dec.lock, key, startTS); err != nil {
if err = rollbackLock(batch, key, startTS); err != nil {
return errors.Trace(err)
}
return nil
Expand Down Expand Up @@ -919,7 +919,7 @@ func rollbackKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, startTS uint6
return nil
}

func rollbackLock(batch *leveldb.Batch, lock mvccLock, key []byte, startTS uint64) error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This parameter is not used in the function, so I remove it

func rollbackLock(batch *leveldb.Batch, key []byte, startTS uint64) error {
tomb := mvccValue{
valueType: typeRollback,
startTS: startTS,
Expand Down Expand Up @@ -980,7 +980,7 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error {
if ok && dec.lock.startTS == startTS {
// If the lock has already outdated, clean up it.
if currentTS == 0 || uint64(oracle.ExtractPhysical(dec.lock.startTS))+dec.lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) {
if err = rollbackLock(batch, dec.lock, key, startTS); err != nil {
if err = rollbackLock(batch, key, startTS); err != nil {
return err
}
return mvcc.db.Write(batch, nil)
Expand Down Expand Up @@ -1032,7 +1032,7 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error {
// primaryKey + lockTS together could locate the primary lock.
// callerStartTS is the start ts of reader transaction.
// currentTS is the current ts, but it may be inaccurate. Just use it to check TTL.
func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS, currentTS uint64) (uint64, uint64, error) {
func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS, currentTS uint64, rollbackIfNotExist bool) (uint64, uint64, error) {
mvcc.mu.Lock()
defer mvcc.mu.Unlock()

Expand All @@ -1057,7 +1057,7 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS

// If the lock has already outdated, clean up it.
if uint64(oracle.ExtractPhysical(lock.startTS))+lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) {
if err = rollbackLock(batch, lock, primaryKey, lockTS); err != nil {
if err = rollbackLock(batch, primaryKey, lockTS); err != nil {
return 0, 0, errors.Trace(err)
}
if err = mvcc.db.Write(batch, nil); err != nil {
Expand Down Expand Up @@ -1112,9 +1112,27 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS
}

// If current transaction is not prewritted before, it may be pessimistic lock.
// When pessimistic lock rollback, it may not leave a 'rollbacked' tombstone.
logutil.BgLogger().Debug("CheckTxnStatus can't find the primary lock, pessimistic rollback?")
return 0, 0, nil
// When pessimistic txn rollback statement, it may not leave a 'rollbacked' tombstone.

// Or maybe caused by concurrent prewrite operation.
// Especially in the non-block reading case, the secondary lock is likely to be
// written before the primary lock.

if rollbackIfNotExist {
batch := &leveldb.Batch{}
if err := rollbackLock(batch, primaryKey, lockTS); err != nil {
return 0, 0, errors.Trace(err)
}
if err := mvcc.db.Write(batch, nil); err != nil {
return 0, 0, errors.Trace(err)
}
return 0, 0, nil
}

return 0, 0, &ErrTxnNotFound{kvrpcpb.TxnNotFound{
StartTs: lockTS,
PrimaryKey: primaryKey,
}}
}

// TxnHeartBeat implements the MVCCStore interface.
Expand Down Expand Up @@ -1220,7 +1238,7 @@ func (mvcc *MVCCLevelDB) ResolveLock(startKey, endKey []byte, startTS, commitTS
if commitTS > 0 {
err = commitLock(batch, dec.lock, currKey, startTS, commitTS)
} else {
err = rollbackLock(batch, dec.lock, currKey, startTS)
err = rollbackLock(batch, currKey, startTS)
}
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -1260,7 +1278,7 @@ func (mvcc *MVCCLevelDB) BatchResolveLock(startKey, endKey []byte, txnInfos map[
if commitTS > 0 {
err = commitLock(batch, dec.lock, currKey, dec.lock.startTS, commitTS)
} else {
err = rollbackLock(batch, dec.lock, currKey, dec.lock.startTS)
err = rollbackLock(batch, currKey, dec.lock.startTS)
}
if err != nil {
return errors.Trace(err)
Expand Down
7 changes: 6 additions & 1 deletion store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ func convertToKeyError(err error) *kvrpcpb.KeyError {
CommitTsExpired: &expired.CommitTsExpired,
}
}
if tmp, ok := errors.Cause(err).(*ErrTxnNotFound); ok {
return &kvrpcpb.KeyError{
TxnNotFound: &tmp.TxnNotFound,
}
}
return &kvrpcpb.KeyError{
Abort: err.Error(),
}
Expand Down Expand Up @@ -382,7 +387,7 @@ func (h *rpcHandler) handleKvCheckTxnStatus(req *kvrpcpb.CheckTxnStatusRequest)
panic("KvCheckTxnStatus: key not in region")
}
var resp kvrpcpb.CheckTxnStatusResponse
ttl, commitTS, err := h.mvccStore.CheckTxnStatus(req.GetPrimaryKey(), req.GetLockTs(), req.GetCallerStartTs(), req.GetCurrentTs())
ttl, commitTS, err := h.mvccStore.CheckTxnStatus(req.GetPrimaryKey(), req.GetLockTs(), req.GetCallerStartTs(), req.GetCurrentTs(), req.GetRollbackIfNotExist())
if err != nil {
resp.Error = convertToKeyError(err)
} else {
Expand Down
6 changes: 6 additions & 0 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -1098,6 +1099,11 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
return err
}

failpoint.Inject("mockSleepBetween2PC", func() error {
time.Sleep(100 * time.Millisecond)
return nil
})

start = time.Now()
commitBo := NewBackoffer(ctx, CommitMaxBackoff).WithVars(c.txn.vars)
err = c.commitKeys(commitBo, c.keys)
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) {

lr := newLockResolver(s.store)
bo := NewBackoffer(context.Background(), getMaxBackoff)
status, err := lr.getTxnStatus(bo, txn.startTS, key2, 0, txn.startTS)
status, err := lr.getTxnStatus(bo, txn.startTS, key2, 0, txn.startTS, true)
c.Assert(err, IsNil)
c.Assert(status.ttl, GreaterEqual, lockInfo.LockTtl)

Expand Down
7 changes: 6 additions & 1 deletion store/tikv/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ const (
BoRegionMiss
BoUpdateLeader
boServerBusy
boTxnNotFound
)

func (t backoffType) createFn(vars *kv.Variables) func(context.Context, int) int {
Expand All @@ -160,6 +161,8 @@ func (t backoffType) createFn(vars *kv.Variables) func(context.Context, int) int
case BoRegionMiss:
// change base time to 2ms, because it may recover soon.
return NewBackoffFn(2, 500, NoJitter)
case boTxnNotFound:
return NewBackoffFn(2, 500, NoJitter)
case BoUpdateLeader:
return NewBackoffFn(1, 10, NoJitter)
case boServerBusy:
Expand All @@ -184,6 +187,8 @@ func (t backoffType) String() string {
return "updateLeader"
case boServerBusy:
return "serverBusy"
case boTxnNotFound:
return "txnNotFound"
}
return ""
}
Expand All @@ -192,7 +197,7 @@ func (t backoffType) TError() error {
switch t {
case boTiKVRPC:
return ErrTiKVServerTimeout
case BoTxnLock, boTxnLockFast:
case BoTxnLock, boTxnLockFast, boTxnNotFound:
return ErrResolveLockTimeout
case BoPDRPC:
return ErrPDServerTimeout
Expand Down
Loading