-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
*: implement the CheckTxnStatus
API for the large transaction
#11974
Changes from all commits
b1c67f0
52c1d29
0768035
67b4801
b394023
53c065d
7ba3db5
d0f064c
b9e9dc6
f867a41
eb2a38d
83d6e21
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -165,6 +165,7 @@ func (s *testMockTiKVSuite) mustPrewriteWithTTLOK(c *C, mutations []*kvrpcpb.Mut | |
Mutations: mutations, | ||
PrimaryLock: []byte(primary), | ||
StartVersion: startTS, | ||
LockTtl: ttl, | ||
} | ||
errs := s.store.Prewrite(req) | ||
for _, err := range errs { | ||
|
@@ -608,11 +609,12 @@ func (s *testMockTiKVSuite) TestRC(c *C) { | |
|
||
func (s testMarshal) TestMarshalmvccLock(c *C) { | ||
l := mvccLock{ | ||
startTS: 47, | ||
primary: []byte{'a', 'b', 'c'}, | ||
value: []byte{'d', 'e'}, | ||
op: kvrpcpb.Op_Put, | ||
ttl: 444, | ||
startTS: 47, | ||
primary: []byte{'a', 'b', 'c'}, | ||
value: []byte{'d', 'e'}, | ||
op: kvrpcpb.Op_Put, | ||
ttl: 444, | ||
minCommitTS: 666, | ||
} | ||
bin, err := l.MarshalBinary() | ||
c.Assert(err, IsNil) | ||
|
@@ -626,6 +628,7 @@ func (s testMarshal) TestMarshalmvccLock(c *C) { | |
c.Assert(l.ttl, Equals, l1.ttl) | ||
c.Assert(string(l.primary), Equals, string(l1.primary)) | ||
c.Assert(string(l.value), Equals, string(l1.value)) | ||
c.Assert(l.minCommitTS, Equals, l1.minCommitTS) | ||
} | ||
|
||
func (s testMarshal) TestMarshalmvccValue(c *C) { | ||
|
@@ -655,6 +658,30 @@ func (s *testMVCCLevelDB) TestErrors(c *C) { | |
c.Assert((&ErrConflict{}).Error(), Equals, "write conflict") | ||
} | ||
|
||
func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) { | ||
s.mustPrewriteWithTTLOK(c, putMutations("pk", "val"), "pk", 5, 666) | ||
|
||
ttl, commitTS, err := s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666) | ||
c.Assert(err, IsNil) | ||
c.Assert(ttl, Equals, uint64(666)) | ||
c.Assert(commitTS, Equals, uint64(0)) | ||
|
||
s.mustCommitOK(c, [][]byte{[]byte("pk")}, 5, 30) | ||
|
||
ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666) | ||
c.Assert(err, IsNil) | ||
c.Assert(ttl, Equals, uint64(0)) | ||
c.Assert(commitTS, Equals, uint64(30)) | ||
|
||
s.mustPrewriteWithTTLOK(c, putMutations("pk1", "val"), "pk1", 5, 666) | ||
s.mustRollbackOK(c, [][]byte{[]byte("pk1")}, 5) | ||
|
||
ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk1"), 5, 0, 666) | ||
c.Assert(err, IsNil) | ||
c.Assert(ttl, Equals, uint64(0)) | ||
c.Assert(commitTS, Equals, uint64(0)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add test for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The minCommitTS is not returned in the CheckTxnStatus response, so we can't check it here. |
||
} | ||
|
||
func (s *testMVCCLevelDB) TestMvccGetByKey(c *C) { | ||
s.mustPrewriteOK(c, putMutations("q1", "v5"), "p1", 5) | ||
debugger, ok := s.store.(MVCCDebugger) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,7 @@ import ( | |
"github.com/pingcap/goleveldb/leveldb/util" | ||
"github.com/pingcap/kvproto/pkg/kvrpcpb" | ||
"github.com/pingcap/parser/terror" | ||
"github.com/pingcap/tidb/store/tikv/oracle" | ||
"github.com/pingcap/tidb/util/codec" | ||
"github.com/pingcap/tidb/util/deadlock" | ||
"github.com/pingcap/tidb/util/logutil" | ||
|
@@ -589,6 +590,7 @@ func (mvcc *MVCCLevelDB) Prewrite(req *kvrpcpb.PrewriteRequest) []error { | |
primary := req.PrimaryLock | ||
startTS := req.StartVersion | ||
ttl := req.LockTtl | ||
minCommitTS := req.MinCommitTs | ||
mvcc.mu.Lock() | ||
defer mvcc.mu.Unlock() | ||
|
||
|
@@ -616,7 +618,7 @@ func (mvcc *MVCCLevelDB) Prewrite(req *kvrpcpb.PrewriteRequest) []error { | |
} | ||
} | ||
isPessimisticLock := len(req.IsPessimisticLock) > 0 && req.IsPessimisticLock[i] | ||
err = prewriteMutation(mvcc.db, batch, m, startTS, primary, ttl, txnSize, isPessimisticLock) | ||
err = prewriteMutation(mvcc.db, batch, m, startTS, primary, ttl, txnSize, isPessimisticLock, minCommitTS) | ||
errs = append(errs, err) | ||
if err != nil { | ||
anyError = true | ||
|
@@ -679,7 +681,10 @@ func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, startTS uint64) err | |
return nil | ||
} | ||
|
||
func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mutation, startTS uint64, primary []byte, ttl uint64, txnSize uint64, isPessimisticLock bool) error { | ||
func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, | ||
mutation *kvrpcpb.Mutation, startTS uint64, | ||
primary []byte, ttl uint64, txnSize uint64, | ||
isPessimisticLock bool, minCommitTS uint64) error { | ||
startKey := mvccEncode(mutation.Key, lockVer) | ||
iter := newIterator(db, &util.Range{ | ||
Start: startKey, | ||
|
@@ -723,6 +728,11 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mu | |
ttl: ttl, | ||
txnSize: txnSize, | ||
} | ||
// Write minCommitTS on the primary lock. | ||
if bytes.Equal(primary, mutation.GetKey()) { | ||
lock.minCommitTS = minCommitTS | ||
} | ||
|
||
writeKey := mvccEncode(mutation.Key, lockVer) | ||
writeValue, err := lock.MarshalBinary() | ||
if err != nil { | ||
|
@@ -931,6 +941,102 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS uint64) error { | |
return mvcc.db.Write(batch, nil) | ||
} | ||
|
||
// CheckTxnStatus checks the primary lock of a transaction to decide its status. | ||
// The return values are (ttl, commitTS, err): | ||
// If the transaction is active, this function returns the ttl of the lock; | ||
// If the transaction is committed, this function returns the commitTS; | ||
// If the transaction is rollbacked, this function returns (0, 0, nil) | ||
// Note that CheckTxnStatus may also push forward the `minCommitTS` of the | ||
// transaction, so it's not simply a read-only operation. | ||
// | ||
// primaryKey + lockTS together could locate the primary lock. | ||
// callerStartTS is the start ts of reader transaction. | ||
// currentTS is the current ts, but it may be inaccurate. Just use it to check TTL. | ||
func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS, currentTS uint64) (uint64, uint64, error) { | ||
mvcc.mu.Lock() | ||
defer mvcc.mu.Unlock() | ||
|
||
startKey := mvccEncode(primaryKey, lockVer) | ||
iter := newIterator(mvcc.db, &util.Range{ | ||
Start: startKey, | ||
}) | ||
defer iter.Release() | ||
|
||
if iter.Valid() { | ||
dec := lockDecoder{ | ||
expectKey: primaryKey, | ||
} | ||
ok, err := dec.Decode(iter) | ||
if err != nil { | ||
return 0, 0, errors.Trace(err) | ||
} | ||
// If current transaction's lock exists. | ||
if ok && dec.lock.startTS == lockTS { | ||
lock := dec.lock | ||
batch := &leveldb.Batch{} | ||
|
||
// If the lock has already outdated, clean up it. | ||
if uint64(oracle.ExtractPhysical(lock.startTS))+lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) { | ||
if err = rollbackLock(batch, lock, primaryKey, lockTS); err != nil { | ||
return 0, 0, errors.Trace(err) | ||
} | ||
if err = mvcc.db.Write(batch, nil); err != nil { | ||
return 0, 0, errors.Trace(err) | ||
} | ||
return 0, 0, nil | ||
} | ||
|
||
// If this is a large transaction and the lock is active, push forward the minCommitTS. | ||
// lock.minCommitTS == 0 may be a secondary lock, or not a large transaction. | ||
if lock.minCommitTS > 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does ther caller need to know if it did update the minCommitTS ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's unnecessary @cfzjywxk |
||
// We *must* guarantee the invariance lock.minCommitTS >= callerStartTS + 1 | ||
if lock.minCommitTS < callerStartTS+1 { | ||
lock.minCommitTS = callerStartTS + 1 | ||
tiancaiamao marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// Remove this condition should not affect correctness. | ||
// We do it because pushing forward minCommitTS as far as possible could avoid | ||
// the lock been pushed again several times, and thus reduce write operations. | ||
if lock.minCommitTS < currentTS { | ||
lock.minCommitTS = currentTS | ||
} | ||
|
||
writeKey := mvccEncode(primaryKey, lockVer) | ||
writeValue, err := lock.MarshalBinary() | ||
tiancaiamao marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if err != nil { | ||
return 0, 0, errors.Trace(err) | ||
} | ||
batch.Put(writeKey, writeValue) | ||
if err = mvcc.db.Write(batch, nil); err != nil { | ||
return 0, 0, errors.Trace(err) | ||
} | ||
} | ||
} | ||
|
||
return lock.ttl, 0, nil | ||
} | ||
|
||
// If current transaction's lock does not exist. | ||
// If the commit info of the current transaction exists. | ||
c, ok, err := getTxnCommitInfo(iter, primaryKey, lockTS) | ||
if err != nil { | ||
return 0, 0, errors.Trace(err) | ||
} | ||
if ok { | ||
// If current transaction is already committed. | ||
if c.valueType != typeRollback { | ||
return 0, c.commitTS, nil | ||
} | ||
// If current transaction is already rollback. | ||
return 0, 0, nil | ||
} | ||
} | ||
|
||
// If current transaction is not prewritted before, it may be pessimistic lock. | ||
// When pessimistic lock rollback, it may not leave a 'rollbacked' tombstone. | ||
logutil.BgLogger().Debug("CheckTxnStatus can't find the primary lock, pessimistic rollback?") | ||
return 0, 0, nil | ||
} | ||
|
||
// TxnHeartBeat implements the MVCCStore interface. | ||
func (mvcc *MVCCLevelDB) TxnHeartBeat(key []byte, startTS uint64, adviseTTL uint64) (uint64, error) { | ||
mvcc.mu.Lock() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -103,14 +103,17 @@ func NewLockResolver(etcdAddrs []string, security config.Security) (*LockResolve | |
return s.lockResolver, nil | ||
} | ||
|
||
// TxnStatus represents a txn's final status. It should be Commit or Rollback. | ||
type TxnStatus uint64 | ||
// TxnStatus represents a txn's final status. It should be Lock or Commit or Rollback. | ||
type TxnStatus struct { | ||
ttl uint64 | ||
commitTS uint64 | ||
} | ||
|
||
// IsCommitted returns true if the txn's final status is Commit. | ||
func (s TxnStatus) IsCommitted() bool { return s > 0 } | ||
func (s TxnStatus) IsCommitted() bool { return s.ttl == 0 && s.commitTS > 0 } | ||
|
||
// CommitTS returns the txn's commitTS. It is valid iff `IsCommitted` is true. | ||
func (s TxnStatus) CommitTS() uint64 { return uint64(s) } | ||
func (s TxnStatus) CommitTS() uint64 { return uint64(s.commitTS) } | ||
|
||
// By default, locks after 3000ms is considered unusual (the client created the | ||
// lock might be dead). Other client may cleanup this kind of lock. | ||
|
@@ -206,7 +209,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi | |
if err != nil { | ||
return false, errors.Trace(err) | ||
} | ||
txnInfos[l.TxnID] = uint64(status) | ||
txnInfos[l.TxnID] = uint64(status.commitTS) | ||
} | ||
logutil.BgLogger().Info("BatchResolveLocks: lookup txn status", | ||
zap.Duration("cost time", time.Since(startTime)), | ||
|
@@ -370,7 +373,7 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte | |
return status, err | ||
} | ||
if cmdResp.CommitVersion != 0 { | ||
status = TxnStatus(cmdResp.GetCommitVersion()) | ||
status = TxnStatus{0, cmdResp.GetCommitVersion()} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the |
||
tikvLockResolverCountWithQueryTxnStatusCommitted.Inc() | ||
} else { | ||
tikvLockResolverCountWithQueryTxnStatusRolledBack.Inc() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need check in the below