Skip to content

Commit

Permalink
txn: avoid the gc resolving pessimistic locks of ongoing transactions (
Browse files Browse the repository at this point in the history
  • Loading branch information
cfzjywxk committed May 14, 2021
1 parent 18cbfaa commit e628bad
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 3 deletions.
13 changes: 11 additions & 2 deletions store/gcworker/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,12 +409,21 @@ func (w *GCWorker) calcSafePointByMinStartTS(ctx context.Context, safePoint uint
return safePoint
}

if globalMinStartTS < safePoint {
// If the lock.ts <= max_ts(safePoint), it will be collected and resolved by the gc worker,
// the locks of ongoing pessimistic transactions could be resolved by the gc worker and then
// the transaction is aborted, decrement the value by 1 to avoid this.
globalMinStartAllowedTS := globalMinStartTS
if globalMinStartTS > 0 {
globalMinStartAllowedTS = globalMinStartTS - 1
}

if globalMinStartAllowedTS < safePoint {
logutil.Logger(ctx).Info("[gc worker] gc safepoint blocked by a running session",
zap.String("uuid", w.uuid),
zap.Uint64("globalMinStartTS", globalMinStartTS),
zap.Uint64("globalMinStartAllowedTS", globalMinStartAllowedTS),
zap.Uint64("safePoint", safePoint))
safePoint = globalMinStartTS
safePoint = globalMinStartAllowedTS
}
return safePoint
}
Expand Down
51 changes: 50 additions & 1 deletion store/gcworker/gc_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func (s *testGCWorkerSuite) TestMinStartTS(c *C) {
strconv.FormatUint(now-oracle.EncodeTSO(20000), 10))
c.Assert(err, IsNil)
sp = s.gcWorker.calcSafePointByMinStartTS(ctx, now-oracle.EncodeTSO(10000))
c.Assert(sp, Equals, now-oracle.EncodeTSO(20000))
c.Assert(sp, Equals, now-oracle.EncodeTSO(20000)-1)
}

func (s *testGCWorkerSuite) TestPrepareGC(c *C) {
Expand Down Expand Up @@ -1589,3 +1589,52 @@ func (s *testGCWorkerSuite) TestGCPlacementRules(c *C) {
c.Assert(pid, Equals, int64(1))
c.Assert(err, IsNil)
}

func (s *testGCWorkerSuite) TestGCWithPendingTxn(c *C) {
ctx := context.Background()
gcSafePointCacheInterval = 0
err := s.gcWorker.saveValueToSysTable(gcEnableKey, booleanFalse)
c.Assert(err, IsNil)

k1 := []byte("tk1")
v1 := []byte("v1")
txn, err := s.store.Begin()
c.Assert(err, IsNil)
txn.SetOption(kv.Pessimistic, true)
lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}

// Lock the key.
err = txn.Set(k1, v1)
c.Assert(err, IsNil)
err = txn.LockKeys(ctx, lockCtx, k1)
c.Assert(err, IsNil)

// Prepare to run gc with txn's startTS as the safepoint ts.
spkv := s.tikvStore.GetSafePointKV()
err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"), strconv.FormatUint(txn.StartTS(), 10))
c.Assert(err, IsNil)
s.mustSetTiDBServiceSafePoint(c, txn.StartTS(), txn.StartTS())
veryLong := gcDefaultLifeTime * 100
err = s.gcWorker.saveTime(gcLastRunTimeKey, oracle.GetTimeFromTS(s.mustAllocTs(c)).Add(-veryLong))
c.Assert(err, IsNil)
s.gcWorker.lastFinish = time.Now().Add(-veryLong)
s.oracle.AddOffset(time.Minute * 10)
err = s.gcWorker.saveValueToSysTable(gcEnableKey, booleanTrue)
c.Assert(err, IsNil)

// Trigger the tick let the gc job start.
err = s.gcWorker.leaderTick(ctx)
c.Assert(err, IsNil)
// Wait for GC finish
select {
case err = <-s.gcWorker.done:
s.gcWorker.gcIsRunning = false
break
case <-time.After(time.Second * 10):
err = errors.New("receive from s.gcWorker.done timeout")
}
c.Assert(err, IsNil)

err = txn.Commit(ctx)
c.Assert(err, IsNil)
}

0 comments on commit e628bad

Please sign in to comment.