Skip to content

Commit

Permalink
update kvproto
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao committed Nov 18, 2019
1 parent 5a47660 commit fe11fb9
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 31 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20191113115126-45e0702fff1e
github.com/pingcap/kvproto v0.0.0-20191118050206-47672e7eabc0
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9
github.com/pingcap/parser v0.0.0-20191112053614-3b43b46331d5
github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0
Expand Down
7 changes: 6 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,13 @@ github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d/go.mod h1:fMRU1BA1y+r89
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190822090350-11ea838aedf7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20191113115126-45e0702fff1e h1:TSRdaTUS0soSRxj/0z+LrqcDRuObQLHfea6ZdUYLw9g=
github.com/pingcap/kvproto v0.0.0-20191113075618-7ce83b774d70 h1:l9VcGUPRHvmM7mkFHo4JqxZeCvioRuL1/4tFUQcs6jQ=
github.com/pingcap/kvproto v0.0.0-20191113075618-7ce83b774d70/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191113115126-45e0702fff1e/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191118030148-ec389ef1b41f h1:CJ1IdT7bPbIvyq2Of9VhC/fhEGh6+0ItdT1dPBv7x7I=
github.com/pingcap/kvproto v0.0.0-20191118030148-ec389ef1b41f/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191118050206-47672e7eabc0 h1:CHOC95Ct4abJ9EdmWqzpUxV+bgjB4lOxd3AFxqgoyzQ=
github.com/pingcap/kvproto v0.0.0-20191118050206-47672e7eabc0/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
Expand Down
20 changes: 10 additions & 10 deletions store/mockstore/mocktikv/mock_tikv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,35 +664,35 @@ func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) {
startTS := uint64(5 << 18)
s.mustPrewriteWithTTLOK(c, putMutations("pk", "val"), "pk", startTS, 666)

ttl, commitTS, _, err := s.store.CheckTxnStatus([]byte("pk"), startTS, 0, 666, false)
ttl, commitTS, action, err := s.store.CheckTxnStatus([]byte("pk"), startTS, startTS+100, 666, false)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(666))
c.Assert(commitTS, Equals, uint64(0))
c.Assert(action, Equals, kvrpcpb.Action_MinCommitTSPushed)

s.mustCommitOK(c, [][]byte{[]byte("pk")}, startTS, startTS+2)
s.mustCommitOK(c, [][]byte{[]byte("pk")}, startTS, startTS+101)

ttl, commitTS, _, err = s.store.CheckTxnStatus([]byte("pk"), startTS, 0, 666, false)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(startTS+2))
c.Assert(commitTS, Equals, uint64(startTS+101))

s.mustPrewriteWithTTLOK(c, putMutations("pk1", "val"), "pk1", startTS, 666)
s.mustRollbackOK(c, [][]byte{[]byte("pk1")}, startTS)

var reason kvrpcpb.RollbackReason
ttl, commitTS, reason, err = s.store.CheckTxnStatus([]byte("pk1"), startTS, 0, 666, false)
ttl, commitTS, action, err = s.store.CheckTxnStatus([]byte("pk1"), startTS, 0, 666, false)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(0))
c.Assert(reason, Equals, kvrpcpb.RollbackReason_NoReason)
c.Assert(action, Equals, kvrpcpb.Action_NoAction)

s.mustPrewriteWithTTLOK(c, putMutations("pk2", "val"), "pk2", startTS, 666)
currentTS := uint64(777 << 18)
ttl, commitTS, reason, err = s.store.CheckTxnStatus([]byte("pk2"), startTS, 0, currentTS, false)
ttl, commitTS, action, err = s.store.CheckTxnStatus([]byte("pk2"), startTS, 0, currentTS, false)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(0))
c.Assert(reason, Equals, kvrpcpb.RollbackReason_TTLExpire)
c.Assert(action, Equals, kvrpcpb.Action_TTLExpireRollback)

// Cover the TxnNotFound case.
_, _, _, err = s.store.CheckTxnStatus([]byte("txnNotFound"), 5, 0, 666, false)
Expand All @@ -702,11 +702,11 @@ func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) {
c.Assert(notFound.StartTs, Equals, uint64(5))
c.Assert(string(notFound.PrimaryKey), Equals, "txnNotFound")

ttl, commitTS, reason, err = s.store.CheckTxnStatus([]byte("txnNotFound"), 5, 0, 666, true)
ttl, commitTS, action, err = s.store.CheckTxnStatus([]byte("txnNotFound"), 5, 0, 666, true)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(0))
c.Assert(reason, Equals, kvrpcpb.RollbackReason_LockNotExist)
c.Assert(action, Equals, kvrpcpb.Action_LockNotExistRollback)

// Check the rollback tombstone blocks this prewrite which comes with a smaller startTS.
req := &kvrpcpb.PrewriteRequest{
Expand Down
2 changes: 1 addition & 1 deletion store/mockstore/mocktikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ type MVCCStore interface {
BatchResolveLock(startKey, endKey []byte, txnInfos map[uint64]uint64) error
GC(startKey, endKey []byte, safePoint uint64) error
DeleteRange(startKey, endKey []byte) error
CheckTxnStatus(primaryKey []byte, lockTS uint64, startTS, currentTS uint64, rollbackIfNotFound bool) (uint64, uint64, kvrpcpb.RollbackReason, error)
CheckTxnStatus(primaryKey []byte, lockTS uint64, startTS, currentTS uint64, rollbackIfNotFound bool) (uint64, uint64, kvrpcpb.Action, error)
Close() error
}

Expand Down
17 changes: 9 additions & 8 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1033,11 +1033,11 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error {
// callerStartTS is the start ts of reader transaction.
// currentTS is the current ts, but it may be inaccurate. Just use it to check TTL.
func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS, currentTS uint64,
rollbackIfNotExist bool) (ttl uint64, commitTS uint64, rollbackReason kvrpcpb.RollbackReason, err error) {
rollbackIfNotExist bool) (ttl uint64, commitTS uint64, action kvrpcpb.Action, err error) {
mvcc.mu.Lock()
defer mvcc.mu.Unlock()

rollbackReason = kvrpcpb.RollbackReason_NoReason
action = kvrpcpb.Action_NoAction

startKey := mvccEncode(primaryKey, lockVer)
iter := newIterator(mvcc.db, &util.Range{
Expand Down Expand Up @@ -1070,7 +1070,7 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS
err = errors.Trace(err)
return
}
return 0, 0, kvrpcpb.RollbackReason_TTLExpire, nil
return 0, 0, kvrpcpb.Action_TTLExpireRollback, nil
}

// If this is a large transaction and the lock is active, push forward the minCommitTS.
Expand All @@ -1079,6 +1079,7 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS
// We *must* guarantee the invariance lock.minCommitTS >= callerStartTS + 1
if lock.minCommitTS < callerStartTS+1 {
lock.minCommitTS = callerStartTS + 1
action = kvrpcpb.Action_MinCommitTSPushed

// Remove this condition should not affect correctness.
// We do it because pushing forward minCommitTS as far as possible could avoid
Expand All @@ -1101,7 +1102,7 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS
}
}

return lock.ttl, 0, rollbackReason, nil
return lock.ttl, 0, action, nil
}

// If current transaction's lock does not exist.
Expand All @@ -1114,10 +1115,10 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS
if ok {
// If current transaction is already committed.
if c.valueType != typeRollback {
return 0, c.commitTS, rollbackReason, nil
return 0, c.commitTS, action, nil
}
// If current transaction is already rollback.
return 0, 0, kvrpcpb.RollbackReason_NoReason, nil
return 0, 0, kvrpcpb.Action_NoAction, nil
}
}

Expand All @@ -1138,10 +1139,10 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS
err = errors.Trace(err1)
return
}
return 0, 0, kvrpcpb.RollbackReason_LockNotExist, nil
return 0, 0, kvrpcpb.Action_LockNotExistRollback, nil
}

return 0, 0, rollbackReason, &ErrTxnNotFound{kvrpcpb.TxnNotFound{
return 0, 0, action, &ErrTxnNotFound{kvrpcpb.TxnNotFound{
StartTs: lockTS,
PrimaryKey: primaryKey,
}}
Expand Down
2 changes: 1 addition & 1 deletion store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func (h *rpcHandler) handleKvCheckTxnStatus(req *kvrpcpb.CheckTxnStatusRequest)
if err != nil {
resp.Error = convertToKeyError(err)
} else {
resp.LockTtl, resp.CommitVersion, resp.RollbackReason = ttl, commitTS, rollbackReason
resp.LockTtl, resp.CommitVersion, resp.Action = ttl, commitTS, rollbackReason
}
return &resp
}
Expand Down
8 changes: 4 additions & 4 deletions store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ func NewLockResolver(etcdAddrs []string, security config.Security) (*LockResolve

// TxnStatus represents a txn's final status. It should be Lock or Commit or Rollback.
type TxnStatus struct {
ttl uint64
commitTS uint64
rollbackReason kvrpcpb.RollbackReason
ttl uint64
commitTS uint64
action kvrpcpb.Action
}

// IsCommitted returns true if the txn's final status is Commit.
Expand Down Expand Up @@ -487,7 +487,7 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte
status.ttl = cmdResp.LockTtl
} else {
if cmdResp.CommitVersion == 0 {
status.rollbackReason = cmdResp.RollbackReason
status.action = cmdResp.Action
tikvLockResolverCountWithQueryTxnStatusRolledBack.Inc()
} else {
tikvLockResolverCountWithQueryTxnStatusCommitted.Inc()
Expand Down
10 changes: 5 additions & 5 deletions store/tikv/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (s *testLockSuite) TestGetTxnStatus(c *C) {
status, err = s.store.lockResolver.GetTxnStatus(startTS, startTS, []byte("a"))
c.Assert(err, IsNil)
c.Assert(status.IsCommitted(), IsFalse)
c.Assert(status.ttl, Greater, uint64(0), Commentf("rollback reason:%s", status.rollbackReason))
c.Assert(status.ttl, Greater, uint64(0), Commentf("rollback reason:%s", status.action))
}

func (s *testLockSuite) TestCheckTxnStatusTTL(c *C) {
Expand Down Expand Up @@ -234,7 +234,7 @@ func (s *testLockSuite) TestCheckTxnStatusTTL(c *C) {
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, uint64(0))
c.Assert(status.commitTS, Equals, uint64(0))
c.Assert(status.rollbackReason, Equals, kvrpcpb.RollbackReason_NoReason)
c.Assert(status.action, Equals, kvrpcpb.Action_NoAction)

// Check a committed txn.
startTS, commitTS := s.putKV(c, []byte("a"), []byte("a"))
Expand Down Expand Up @@ -308,7 +308,7 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) {
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, uint64(0))
c.Assert(status.commitTS, Equals, uint64(0))
c.Assert(status.rollbackReason, Equals, kvrpcpb.RollbackReason_NoReason)
c.Assert(status.action, Equals, kvrpcpb.Action_NoAction)

// Call getTxnStatus on a committed transaction.
startTS, commitTS := s.putKV(c, []byte("a"), []byte("a"))
Expand Down Expand Up @@ -368,15 +368,15 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) {
c.Assert(err, IsNil)
lock = &Lock{
Key: []byte("second"),
Primary: []byte("key"),
Primary: []byte("key_not_exist"),
TxnID: startTS,
TTL: 1000,
}
status, err = resolver.getTxnStatusFromLock(bo, lock, currentTS)
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, uint64(0))
c.Assert(status.commitTS, Equals, uint64(0))
c.Assert(status.rollbackReason, Equals, kvrpcpb.RollbackReason_LockNotExist)
c.Assert(status.action, Equals, kvrpcpb.Action_LockNotExistRollback)
}

func (s *testLockSuite) prewriteTxn(c *C, txn *tikvTxn) {
Expand Down

0 comments on commit fe11fb9

Please sign in to comment.