Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: update kvproto.CheckTxnStatus response #13432

Merged
merged 15 commits into from
Nov 18, 2019
Merged
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1
github.com/pingcap/kvproto v0.0.0-20191118050206-47672e7eabc0
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9
github.com/pingcap/parser v0.0.0-20191112053614-3b43b46331d5
github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0
Expand Down
10 changes: 7 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,13 @@ github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d/go.mod h1:fMRU1BA1y+r89
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190822090350-11ea838aedf7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1 h1:J5oimSv+0emw5e/D1ZX/zh2WcMv0pOVT9QKruXfvJbg=
github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191113075618-7ce83b774d70 h1:l9VcGUPRHvmM7mkFHo4JqxZeCvioRuL1/4tFUQcs6jQ=
github.com/pingcap/kvproto v0.0.0-20191113075618-7ce83b774d70/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191113115126-45e0702fff1e/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191118030148-ec389ef1b41f h1:CJ1IdT7bPbIvyq2Of9VhC/fhEGh6+0ItdT1dPBv7x7I=
github.com/pingcap/kvproto v0.0.0-20191118030148-ec389ef1b41f/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191118050206-47672e7eabc0 h1:CHOC95Ct4abJ9EdmWqzpUxV+bgjB4lOxd3AFxqgoyzQ=
github.com/pingcap/kvproto v0.0.0-20191118050206-47672e7eabc0/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
Expand Down Expand Up @@ -218,7 +223,6 @@ github.com/remyoudompheng/bigfft v0.0.0-20190512091148-babf20351dd7 h1:FUL3b97ZY
github.com/remyoudompheng/bigfft v0.0.0-20190512091148-babf20351dd7/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44 h1:tB9NOR21++IjLyVx3/PCPhWMwqGNCMQEH96A6dMZ/gc=
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/shirou/gopsutil v2.18.10+incompatible h1:cy84jW6EVRPa5g9HAHrlbxMSIjBhDSX0OFYyMYminYs=
github.com/shirou/gopsutil v2.18.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
Expand Down
34 changes: 23 additions & 11 deletions store/mockstore/mocktikv/mock_tikv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,40 +661,52 @@ func (s *testMVCCLevelDB) TestErrors(c *C) {
}

func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) {
s.mustPrewriteWithTTLOK(c, putMutations("pk", "val"), "pk", 5, 666)
startTS := uint64(5 << 18)
s.mustPrewriteWithTTLOK(c, putMutations("pk", "val"), "pk", startTS, 666)

ttl, commitTS, err := s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666, false)
ttl, commitTS, action, err := s.store.CheckTxnStatus([]byte("pk"), startTS, startTS+100, 666, false)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(666))
c.Assert(commitTS, Equals, uint64(0))
c.Assert(action, Equals, kvrpcpb.Action_MinCommitTSPushed)

s.mustCommitOK(c, [][]byte{[]byte("pk")}, 5, 30)
s.mustCommitOK(c, [][]byte{[]byte("pk")}, startTS, startTS+101)

ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666, false)
ttl, commitTS, _, err = s.store.CheckTxnStatus([]byte("pk"), startTS, 0, 666, false)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(30))
c.Assert(commitTS, Equals, uint64(startTS+101))

s.mustPrewriteWithTTLOK(c, putMutations("pk1", "val"), "pk1", startTS, 666)
s.mustRollbackOK(c, [][]byte{[]byte("pk1")}, startTS)

s.mustPrewriteWithTTLOK(c, putMutations("pk1", "val"), "pk1", 5, 666)
s.mustRollbackOK(c, [][]byte{[]byte("pk1")}, 5)
ttl, commitTS, action, err = s.store.CheckTxnStatus([]byte("pk1"), startTS, 0, 666, false)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(0))
c.Assert(action, Equals, kvrpcpb.Action_NoAction)

ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk1"), 5, 0, 666, false)
s.mustPrewriteWithTTLOK(c, putMutations("pk2", "val"), "pk2", startTS, 666)
currentTS := uint64(777 << 18)
ttl, commitTS, action, err = s.store.CheckTxnStatus([]byte("pk2"), startTS, 0, currentTS, false)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(0))
c.Assert(action, Equals, kvrpcpb.Action_TTLExpireRollback)

// Cover the TxnNotFound case.
_, _, err = s.store.CheckTxnStatus([]byte("txnNotFound"), 5, 0, 666, false)
_, _, _, err = s.store.CheckTxnStatus([]byte("txnNotFound"), 5, 0, 666, false)
c.Assert(err, NotNil)
notFound, ok := errors.Cause(err).(*ErrTxnNotFound)
c.Assert(ok, IsTrue)
c.Assert(notFound.StartTs, Equals, uint64(5))
c.Assert(string(notFound.PrimaryKey), Equals, "txnNotFound")

ttl, commitTS, err = s.store.CheckTxnStatus([]byte("txnNotFound"), 5, 0, 666, true)
ttl, commitTS, action, err = s.store.CheckTxnStatus([]byte("txnNotFound"), 5, 0, 666, true)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(0))
c.Assert(action, Equals, kvrpcpb.Action_LockNotExistRollback)

// Check the rollback tombstone blocks this prewrite which comes with a smaller startTS.
req := &kvrpcpb.PrewriteRequest{
Expand All @@ -710,7 +722,7 @@ func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) {
func (s *testMVCCLevelDB) TestRejectCommitTS(c *C) {
s.mustPrewriteOK(c, putMutations("x", "A"), "x", 5)
// Push the minCommitTS
_, _, err := s.store.CheckTxnStatus([]byte("x"), 5, 100, 100, false)
_, _, _, err := s.store.CheckTxnStatus([]byte("x"), 5, 100, 100, false)
c.Assert(err, IsNil)
err = s.store.Commit([][]byte{[]byte("x")}, 5, 10)
e, ok := errors.Cause(err).(*ErrCommitTSExpired)
Expand Down
2 changes: 1 addition & 1 deletion store/mockstore/mocktikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ type MVCCStore interface {
BatchResolveLock(startKey, endKey []byte, txnInfos map[uint64]uint64) error
GC(startKey, endKey []byte, safePoint uint64) error
DeleteRange(startKey, endKey []byte) error
CheckTxnStatus(primaryKey []byte, lockTS uint64, startTS, currentTS uint64, rollbackIfNotFound bool) (ttl, commitTS uint64, err error)
CheckTxnStatus(primaryKey []byte, lockTS uint64, startTS, currentTS uint64, rollbackIfNotFound bool) (uint64, uint64, kvrpcpb.Action, error)
Close() error
}

Expand Down
59 changes: 36 additions & 23 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1032,10 +1032,13 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error {
// primaryKey + lockTS together could locate the primary lock.
// callerStartTS is the start ts of reader transaction.
// currentTS is the current ts, but it may be inaccurate. Just use it to check TTL.
func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS, currentTS uint64, rollbackIfNotExist bool) (uint64, uint64, error) {
func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS, currentTS uint64,
rollbackIfNotExist bool) (ttl uint64, commitTS uint64, action kvrpcpb.Action, err error) {
mvcc.mu.Lock()
defer mvcc.mu.Unlock()

action = kvrpcpb.Action_NoAction

startKey := mvccEncode(primaryKey, lockVer)
iter := newIterator(mvcc.db, &util.Range{
Start: startKey,
Expand All @@ -1046,9 +1049,11 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS
dec := lockDecoder{
expectKey: primaryKey,
}
ok, err := dec.Decode(iter)
var ok bool
ok, err = dec.Decode(iter)
if err != nil {
return 0, 0, errors.Trace(err)
err = errors.Trace(err)
return
}
// If current transaction's lock exists.
if ok && dec.lock.startTS == lockTS {
Expand All @@ -1058,12 +1063,14 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS
// If the lock has already outdated, clean up it.
if uint64(oracle.ExtractPhysical(lock.startTS))+lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) {
if err = rollbackLock(batch, primaryKey, lockTS); err != nil {
return 0, 0, errors.Trace(err)
err = errors.Trace(err)
return
}
if err = mvcc.db.Write(batch, nil); err != nil {
return 0, 0, errors.Trace(err)
err = errors.Trace(err)
return
}
return 0, 0, nil
return 0, 0, kvrpcpb.Action_TTLExpireRollback, nil
}

// If this is a large transaction and the lock is active, push forward the minCommitTS.
Expand All @@ -1072,6 +1079,7 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS
// We *must* guarantee the invariance lock.minCommitTS >= callerStartTS + 1
if lock.minCommitTS < callerStartTS+1 {
lock.minCommitTS = callerStartTS + 1
action = kvrpcpb.Action_MinCommitTSPushed
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to set this action even if minCommitTS is already greater than callerStartTS

Copy link
Contributor Author

@tiancaiamao tiancaiamao Nov 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, get it!
That information is actually 'could the caller read ignore the lock', rather than 'minCommitTS pushed'


// Remove this condition should not affect correctness.
// We do it because pushing forward minCommitTS as far as possible could avoid
Expand All @@ -1081,33 +1089,36 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS
}

writeKey := mvccEncode(primaryKey, lockVer)
writeValue, err := lock.MarshalBinary()
if err != nil {
return 0, 0, errors.Trace(err)
writeValue, err1 := lock.MarshalBinary()
if err1 != nil {
err = errors.Trace(err1)
return
}
batch.Put(writeKey, writeValue)
if err = mvcc.db.Write(batch, nil); err != nil {
return 0, 0, errors.Trace(err)
if err1 = mvcc.db.Write(batch, nil); err1 != nil {
err = errors.Trace(err1)
return
}
}
}

return lock.ttl, 0, nil
return lock.ttl, 0, action, nil
}

// If current transaction's lock does not exist.
// If the commit info of the current transaction exists.
c, ok, err := getTxnCommitInfo(iter, primaryKey, lockTS)
if err != nil {
return 0, 0, errors.Trace(err)
c, ok, err1 := getTxnCommitInfo(iter, primaryKey, lockTS)
if err1 != nil {
err = errors.Trace(err1)
return
}
if ok {
// If current transaction is already committed.
if c.valueType != typeRollback {
return 0, c.commitTS, nil
return 0, c.commitTS, action, nil
}
// If current transaction is already rollback.
return 0, 0, nil
return 0, 0, kvrpcpb.Action_NoAction, nil
}
}

Expand All @@ -1120,16 +1131,18 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS

if rollbackIfNotExist {
batch := &leveldb.Batch{}
if err := rollbackLock(batch, primaryKey, lockTS); err != nil {
return 0, 0, errors.Trace(err)
if err1 := rollbackLock(batch, primaryKey, lockTS); err1 != nil {
err = errors.Trace(err1)
return
}
if err := mvcc.db.Write(batch, nil); err != nil {
return 0, 0, errors.Trace(err)
if err1 := mvcc.db.Write(batch, nil); err1 != nil {
err = errors.Trace(err1)
return
}
return 0, 0, nil
return 0, 0, kvrpcpb.Action_LockNotExistRollback, nil
}

return 0, 0, &ErrTxnNotFound{kvrpcpb.TxnNotFound{
return 0, 0, action, &ErrTxnNotFound{kvrpcpb.TxnNotFound{
StartTs: lockTS,
PrimaryKey: primaryKey,
}}
Expand Down
4 changes: 2 additions & 2 deletions store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,11 +387,11 @@ func (h *rpcHandler) handleKvCheckTxnStatus(req *kvrpcpb.CheckTxnStatusRequest)
panic("KvCheckTxnStatus: key not in region")
}
var resp kvrpcpb.CheckTxnStatusResponse
ttl, commitTS, err := h.mvccStore.CheckTxnStatus(req.GetPrimaryKey(), req.GetLockTs(), req.GetCallerStartTs(), req.GetCurrentTs(), req.GetRollbackIfNotExist())
ttl, commitTS, rollbackReason, err := h.mvccStore.CheckTxnStatus(req.GetPrimaryKey(), req.GetLockTs(), req.GetCallerStartTs(), req.GetCurrentTs(), req.GetRollbackIfNotExist())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

variable name should update

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

if err != nil {
resp.Error = convertToKeyError(err)
} else {
resp.LockTtl, resp.CommitVersion = ttl, commitTS
resp.LockTtl, resp.CommitVersion, resp.Action = ttl, commitTS, rollbackReason
}
return &resp
}
Expand Down
4 changes: 3 additions & 1 deletion store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func NewLockResolver(etcdAddrs []string, security config.Security) (*LockResolve
type TxnStatus struct {
ttl uint64
commitTS uint64
action kvrpcpb.Action
}
Comment on lines 109 to 113
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this definition be moved to a better place in store directory, so those CheckTxnStatus functions can return this struct instead of returning its three fields in a long tuple?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would involve too many irrelevant changes in this PR


// IsCommitted returns true if the txn's final status is Commit.
Expand Down Expand Up @@ -397,7 +398,7 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStart
}

if l.LockType == kvrpcpb.Op_PessimisticLock {
return TxnStatus{l.TTL, 0}, nil
return TxnStatus{ttl: l.TTL}, nil
}

// Handle txnNotFound error.
Expand Down Expand Up @@ -482,6 +483,7 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte
logutil.BgLogger().Error("getTxnStatus error", zap.Error(err))
return status, err
}
status.action = cmdResp.Action
if cmdResp.LockTtl != 0 {
status.ttl = cmdResp.LockTtl
youjiali1995 marked this conversation as resolved.
Show resolved Hide resolved
} else {
Expand Down
10 changes: 7 additions & 3 deletions store/tikv/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (s *testLockSuite) TestGetTxnStatus(c *C) {
status, err = s.store.lockResolver.GetTxnStatus(startTS, startTS, []byte("a"))
c.Assert(err, IsNil)
c.Assert(status.IsCommitted(), IsFalse)
c.Assert(status.ttl, Greater, uint64(0))
c.Assert(status.ttl, Greater, uint64(0), Commentf("rollback reason:%s", status.action))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log message need update

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

func (s *testLockSuite) TestCheckTxnStatusTTL(c *C) {
Expand Down Expand Up @@ -234,6 +234,7 @@ func (s *testLockSuite) TestCheckTxnStatusTTL(c *C) {
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, uint64(0))
c.Assert(status.commitTS, Equals, uint64(0))
c.Assert(status.action, Equals, kvrpcpb.Action_NoAction)

// Check a committed txn.
startTS, commitTS := s.putKV(c, []byte("a"), []byte("a"))
Expand Down Expand Up @@ -287,6 +288,7 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) {
c.Assert(status.IsCommitted(), IsFalse)
c.Assert(status.ttl, Greater, uint64(0))
c.Assert(status.CommitTS(), Equals, uint64(0))
c.Assert(status.action, kvrpcpb.Action_MinCommitTSPushed)

// Test the ResolveLocks API
lock := s.mustGetLock(c, []byte("second"))
Expand All @@ -303,10 +305,11 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) {
// Then call getTxnStatus again and check the lock status.
currentTS, err = oracle.GetTimestamp(context.Background())
c.Assert(err, IsNil)
status, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, true)
status, err = newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, 0, true)
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, uint64(0))
c.Assert(status.commitTS, Equals, uint64(0))
c.Assert(status.action, Equals, kvrpcpb.Action_NoAction)

// Call getTxnStatus on a committed transaction.
startTS, commitTS := s.putKV(c, []byte("a"), []byte("a"))
Expand Down Expand Up @@ -366,14 +369,15 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) {
c.Assert(err, IsNil)
lock = &Lock{
Key: []byte("second"),
Primary: []byte("key"),
Primary: []byte("key_not_exist"),
TxnID: startTS,
TTL: 1000,
}
status, err = resolver.getTxnStatusFromLock(bo, lock, currentTS)
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, uint64(0))
c.Assert(status.commitTS, Equals, uint64(0))
c.Assert(status.action, Equals, kvrpcpb.Action_LockNotExistRollback)
}

func (s *testLockSuite) prewriteTxn(c *C, txn *tikvTxn) {
Expand Down