-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
store: upgrade the CheckTxnStatus API #13123
Changes from 6 commits
6181920
576c051
c0285ba
e7aad0d
8da07bc
8106b33
386ca77
bba8d04
fbc600f
5fb661a
bbd42e3
e77e17b
64721b2
c01bc0a
d4e192a
0f3ae0f
913e29d
1aa8f9b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -204,7 +204,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi | |
tikvLockResolverCountWithExpired.Inc() | ||
|
||
// Using currentTS = math.MaxUint64 means rolling back the txn, no matter whether the lock is expired or not! | ||
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64) | ||
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true) | ||
if err != nil { | ||
return false, err | ||
} | ||
|
@@ -279,20 +279,10 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks | |
|
||
tikvLockResolverCountWithResolve.Inc() | ||
|
||
var expiredLocks []*Lock | ||
for _, l := range locks { | ||
msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL) | ||
if msBeforeLockExpired <= 0 { | ||
expiredLocks = append(expiredLocks, l) | ||
} else { | ||
msBeforeTxnExpired.update(int64(l.TTL)) | ||
tikvLockResolverCountWithNotExpired.Inc() | ||
} | ||
} | ||
// TxnID -> []Region, record resolved Regions. | ||
// TODO: Maybe put it in LockResolver and share by all txns. | ||
cleanTxns := make(map[uint64]map[RegionVerID]struct{}) | ||
for _, l := range expiredLocks { | ||
for _, l := range locks { | ||
status, err := lr.getTxnStatusFromLock(bo, l, callerStartTS) | ||
if err != nil { | ||
msBeforeTxnExpired.update(0) | ||
|
@@ -368,40 +358,77 @@ func (lr *LockResolver) GetTxnStatus(txnID uint64, callerStartTS uint64, primary | |
if err != nil { | ||
return status, err | ||
} | ||
return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS) | ||
return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS, true) | ||
MyonKeminta marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStartTS uint64) (TxnStatus, error) { | ||
var currentTS uint64 | ||
var err error | ||
var status TxnStatus | ||
if l.TTL == 0 { | ||
// NOTE: l.TTL = 0 is a special protocol!!! | ||
// When the pessimistic txn prewrite meets locks of a txn, it should resolve the lock **unconditionally**. | ||
// In this case, TiKV uses lock TTL = 0 to notify TiDB, and TiDB should resolve the lock! | ||
// Set currentTS to max uint64 to make the lock expired. | ||
currentTS = math.MaxUint64 | ||
} else { | ||
var err error | ||
currentTS, err = lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx) | ||
if err != nil { | ||
return TxnStatus{}, err | ||
} | ||
} | ||
return lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS) | ||
|
||
rollbackIfNotExist := false | ||
for { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is already a for loop in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To make the unit test easier. |
||
status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS, rollbackIfNotExist) | ||
if err == nil { | ||
return status, err | ||
} | ||
if _, ok := errors.Cause(err).(txnNotFoundErr); !ok { | ||
return status, err | ||
} | ||
|
||
// Handle txnNotFound error. | ||
// If the (secondary) lock TTL has expired, return rollbacked status. | ||
tiancaiamao marked this conversation as resolved.
Show resolved
Hide resolved
|
||
time.Sleep(5 * time.Millisecond) | ||
if lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL) <= 0 { | ||
rollbackIfNotExist = true | ||
} | ||
} | ||
} | ||
|
||
type txnNotFoundErr struct { | ||
*kvrpcpb.TxnNotFound | ||
} | ||
|
||
func (e txnNotFoundErr) Error() string { | ||
return e.TxnNotFound.String() | ||
} | ||
|
||
func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS, currentTS uint64) (TxnStatus, error) { | ||
// If rollbackIfNotExist is false, the caller should handle the txnNotFoundErr. | ||
func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS, currentTS uint64, rollbackIfNotExist bool) (TxnStatus, error) { | ||
if s, ok := lr.getResolved(txnID); ok { | ||
return s, nil | ||
} | ||
|
||
tikvLockResolverCountWithQueryTxnStatus.Inc() | ||
|
||
// CheckTxnStatus would meet the following cases: | ||
// 1. LOCK | ||
// 1.1 Lock expired -- orphan lock, fail to update TTL, crash recovery etc. | ||
// 1.2 Lock TTL -- active transaction holding the lock. | ||
// 2. NO LOCK | ||
// 2.1 Txn Committed | ||
// 2.2 Txn Rollbacked -- rollback itself, rollback by others, GC tomb etc. | ||
// 2.3 No lock -- pessimistic lock rollback, concurrent prewrite. | ||
|
||
var status TxnStatus | ||
req := tikvrpc.NewRequest(tikvrpc.CmdCheckTxnStatus, &kvrpcpb.CheckTxnStatusRequest{ | ||
PrimaryKey: primary, | ||
LockTs: txnID, | ||
CallerStartTs: callerStartTS, | ||
CurrentTs: currentTS, | ||
PrimaryKey: primary, | ||
LockTs: txnID, | ||
CallerStartTs: callerStartTS, | ||
CurrentTs: currentTS, | ||
RollbackIfNotExist: rollbackIfNotExist, | ||
}) | ||
for { | ||
loc, err := lr.store.GetRegionCache().LocateKey(bo, primary) | ||
|
@@ -428,6 +455,11 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte | |
} | ||
cmdResp := resp.Resp.(*kvrpcpb.CheckTxnStatusResponse) | ||
if keyErr := cmdResp.GetError(); keyErr != nil { | ||
txnNotFound := keyErr.GetTxnNotFound() | ||
if txnNotFound != nil { | ||
return status, txnNotFoundErr{txnNotFound} | ||
} | ||
|
||
err = errors.Errorf("unexpected err: %s, tid: %v", keyErr, txnID) | ||
logutil.BgLogger().Error("getTxnStatus error", zap.Error(err)) | ||
return status, err | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,7 @@ import ( | |
"time" | ||
|
||
. "github.com/pingcap/check" | ||
"github.com/pingcap/errors" | ||
"github.com/pingcap/kvproto/pkg/kvrpcpb" | ||
"github.com/pingcap/tidb/kv" | ||
"github.com/pingcap/tidb/store/tikv/tikvrpc" | ||
|
@@ -207,7 +208,12 @@ func (s *testLockSuite) TestCheckTxnStatusTTL(c *C) { | |
txn, err := s.store.Begin() | ||
c.Assert(err, IsNil) | ||
txn.Set(kv.Key("key"), []byte("value")) | ||
s.prewriteTxn(c, txn.(*tikvTxn)) | ||
committer, err := newTwoPhaseCommitterWithInit(txn.(*tikvTxn), 0) | ||
c.Assert(err, IsNil) | ||
// Increase lock TTL to make CI more stable. | ||
committer.lockTTL = txnLockTTL(txn.(*tikvTxn).startTime, 200*1024*1024) | ||
err = committer.prewriteKeys(NewBackoffer(context.Background(), PrewriteMaxBackoff), committer.keys) | ||
c.Assert(err, IsNil) | ||
|
||
bo := NewBackoffer(context.Background(), PrewriteMaxBackoff) | ||
lr := newLockResolver(s.store) | ||
|
@@ -286,7 +292,7 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) { | |
bo := NewBackoffer(context.Background(), PrewriteMaxBackoff) | ||
resolver := newLockResolver(s.store) | ||
// Call getTxnStatus to check the lock status. | ||
status, err := resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS) | ||
status, err := resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, true) | ||
c.Assert(err, IsNil) | ||
c.Assert(status.IsCommitted(), IsFalse) | ||
c.Assert(status.ttl, Greater, uint64(0)) | ||
|
@@ -307,19 +313,79 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) { | |
// Then call getTxnStatus again and check the lock status. | ||
currentTS, err = oracle.GetTimestamp(context.Background()) | ||
c.Assert(err, IsNil) | ||
status, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS) | ||
status, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, true) | ||
c.Assert(err, IsNil) | ||
c.Assert(status.ttl, Equals, uint64(0)) | ||
c.Assert(status.commitTS, Equals, uint64(0)) | ||
|
||
// Call getTxnStatus on a committed transaction. | ||
startTS, commitTS := s.putKV(c, []byte("a"), []byte("a")) | ||
status, err = newLockResolver(s.store).getTxnStatus(bo, startTS, []byte("a"), currentTS, currentTS) | ||
status, err = newLockResolver(s.store).getTxnStatus(bo, startTS, []byte("a"), currentTS, currentTS, true) | ||
c.Assert(err, IsNil) | ||
c.Assert(status.ttl, Equals, uint64(0)) | ||
c.Assert(status.commitTS, Equals, commitTS) | ||
} | ||
|
||
func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) { | ||
txn, err := s.store.Begin() | ||
c.Assert(err, IsNil) | ||
txn.Set(kv.Key("key"), []byte("value")) | ||
txn.Set(kv.Key("second"), []byte("xxx")) | ||
committer, err := newTwoPhaseCommitterWithInit(txn.(*tikvTxn), 0) | ||
c.Assert(err, IsNil) | ||
// Increase lock TTL to make CI more stable. | ||
committer.lockTTL = txnLockTTL(txn.(*tikvTxn).startTime, 200*1024*1024) | ||
|
||
// Only prewrite the secondary key to simulate a concurrent prewrite case: | ||
// prewriting the secondary regions has succeeded while prewriting the primary region is still pending. | ||
err = committer.prewriteKeys(NewBackoffer(context.Background(), PrewriteMaxBackoff), [][]byte{[]byte("second")}) | ||
c.Assert(err, IsNil) | ||
|
||
oracle := s.store.GetOracle() | ||
currentTS, err := oracle.GetTimestamp(context.Background()) | ||
c.Assert(err, IsNil) | ||
bo := NewBackoffer(context.Background(), PrewriteMaxBackoff) | ||
resolver := newLockResolver(s.store) | ||
|
||
// Call getTxnStatus for the TxnNotFound case. | ||
_, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, false) | ||
c.Assert(err, NotNil) | ||
_, ok := errors.Cause(err).(txnNotFoundErr) | ||
c.Assert(ok, IsTrue) | ||
|
||
errCh := make(chan error) | ||
go func() { | ||
errCh <- committer.prewriteKeys(bo, [][]byte{[]byte("key")}) | ||
}() | ||
|
||
lock := &Lock{ | ||
Key: []byte("second"), | ||
Primary: []byte("key"), | ||
TxnID: txn.StartTS(), | ||
TTL: 100000, | ||
} | ||
// Call getTxnStatusFromLock to cover the retry logic. | ||
status, err := resolver.getTxnStatusFromLock(bo, lock, currentTS) | ||
c.Assert(err, IsNil) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. At this time, the lock may still not written, we can only be sure it is written after There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The lock may still not written, and There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. or we add some sleep before There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
So as to make the CI slower? |
||
c.Assert(status.ttl, Greater, uint64(0)) | ||
c.Assert(<-errCh, IsNil) | ||
c.Assert(committer.cleanupKeys(bo, committer.keys), IsNil) | ||
|
||
// Call getTxnStatusFromLock to cover TxnNotFound and retry timeout. | ||
startTS, err := oracle.GetTimestamp(context.Background()) | ||
c.Assert(err, IsNil) | ||
lock = &Lock{ | ||
Key: []byte("second"), | ||
Primary: []byte("key"), | ||
TxnID: startTS, | ||
TTL: 1000, | ||
} | ||
status, err = resolver.getTxnStatusFromLock(bo, lock, currentTS) | ||
c.Assert(err, IsNil) | ||
c.Assert(status.ttl, Equals, uint64(0)) | ||
c.Assert(status.commitTS, Equals, uint64(0)) | ||
} | ||
|
||
func (s *testLockSuite) prewriteTxn(c *C, txn *tikvTxn) { | ||
committer, err := newTwoPhaseCommitterWithInit(txn, 0) | ||
c.Assert(err, IsNil) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you need to cover the case where the last parameter is
true
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done