diff --git a/store/mockoracle/oracle.go b/store/mockoracle/oracle.go index 07947cde6422f..aef34e2cfbfec 100644 --- a/store/mockoracle/oracle.go +++ b/store/mockoracle/oracle.go @@ -100,6 +100,13 @@ func (o *MockOracle) IsExpired(lockTimestamp uint64, TTL uint64) bool { return oracle.GetPhysical(time.Now().Add(o.offset)) >= oracle.ExtractPhysical(lockTimestamp)+int64(TTL) } +// UntilExpired implement oracle.Oracle interface. +func (o *MockOracle) UntilExpired(lockTimeStamp uint64, TTL uint64) int64 { + o.RLock() + defer o.RUnlock() + return oracle.ExtractPhysical(lockTimeStamp) + int64(TTL) - oracle.GetPhysical(time.Now().Add(o.offset)) +} + // Close implements oracle.Oracle interface. func (o *MockOracle) Close() { diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 70f191e947039..d86d21c85f45d 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -532,13 +532,13 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) locks = append(locks, lock) } start := time.Now() - ok, err := c.store.lockResolver.ResolveLocks(bo, locks) + msBeforeExpired, err := c.store.lockResolver.ResolveLocks(bo, locks) if err != nil { return errors.Trace(err) } atomic.AddInt64(&c.detail.ResolveLockTime, int64(time.Since(start))) - if !ok { - err = bo.Backoff(BoTxnLock, errors.Errorf("2PC prewrite lockedKeys: %d", len(locks))) + if msBeforeExpired > 0 { + err = bo.BackoffWithMaxSleep(BoTxnLock, int(msBeforeExpired), errors.Errorf("2PC prewrite lockedKeys: %d", len(locks))) if err != nil { return errors.Trace(err) } @@ -619,12 +619,12 @@ func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batc } locks = append(locks, lock) } - ok, err := c.store.lockResolver.ResolveLocks(bo, locks) + msBeforeExpired, err := c.store.lockResolver.ResolveLocks(bo, locks) if err != nil { return errors.Trace(err) } - if !ok { - err = bo.Backoff(BoTxnLock, errors.Errorf("2PC prewrite lockedKeys: %d", len(locks))) + if msBeforeExpired > 0 { + err = bo.BackoffWithMaxSleep(BoTxnLock, int(msBeforeExpired), errors.Errorf("2PC prewrite lockedKeys: %d", len(locks))) if err != nil { return errors.Trace(err) } diff --git a/store/tikv/backoff.go b/store/tikv/backoff.go index ee24dc7af6c00..c34d5eb65643a 100644 --- a/store/tikv/backoff.go +++ b/store/tikv/backoff.go @@ -78,14 +78,14 @@ func (t backoffType) Counter() prometheus.Counter { // NewBackoffFn creates a backoff func which implements exponential backoff with // optional jitters. // See http://www.awsarchitectureblog.com/2015/03/backoff.html -func NewBackoffFn(base, cap, jitter int) func(ctx context.Context) int { +func NewBackoffFn(base, cap, jitter int) func(ctx context.Context, maxSleepMs int) int { if base < 2 { // Top prevent panic in 'rand.Intn'. base = 2 } attempts := 0 lastSleep := base - return func(ctx context.Context) int { + return func(ctx context.Context, maxSleepMs int) int { var sleep int switch jitter { case NoJitter: @@ -102,8 +102,14 @@ func NewBackoffFn(base, cap, jitter int) func(ctx context.Context) int { logutil.Logger(context.Background()).Debug("backoff", zap.Int("base", base), zap.Int("sleep", sleep)) + + realSleep := sleep + // when set maxSleepMs >= 0 in `tikv.BackoffWithMaxSleep` will force sleep maxSleepMs milliseconds. + if maxSleepMs >= 0 && realSleep > maxSleepMs { + realSleep = maxSleepMs + } select { - case <-time.After(time.Duration(sleep) * time.Millisecond): + case <-time.After(time.Duration(realSleep) * time.Millisecond): case <-ctx.Done(): } @@ -130,7 +136,7 @@ const ( boServerBusy ) -func (t backoffType) createFn(vars *kv.Variables) func(context.Context) int { +func (t backoffType) createFn(vars *kv.Variables) func(context.Context, int) int { if vars.Hook != nil { vars.Hook(t.String(), vars) } @@ -217,7 +223,7 @@ var CommitMaxBackoff = 41000 type Backoffer struct { ctx context.Context - fn map[backoffType]func(context.Context) int + fn map[backoffType]func(context.Context, int) int maxSleep int totalSleep int errors []error @@ -253,6 +259,12 @@ func (b *Backoffer) WithVars(vars *kv.Variables) *Backoffer { // Backoff sleeps a while base on the backoffType and records the error message. // It returns a retryable error if total sleep time exceeds maxSleep. func (b *Backoffer) Backoff(typ backoffType, err error) error { + return b.BackoffWithMaxSleep(typ, -1, err) +} + +// BackoffWithMaxSleep sleeps a while base on the backoffType and records the error message +// and never sleep more than maxSleepMs for each sleep. +func (b *Backoffer) BackoffWithMaxSleep(typ backoffType, maxSleepMs int, err error) error { if strings.Contains(err.Error(), mismatchClusterID) { logutil.Logger(context.Background()).Fatal("critical error", zap.Error(err)) } @@ -265,7 +277,7 @@ func (b *Backoffer) Backoff(typ backoffType, err error) error { typ.Counter().Inc() // Lazy initialize. if b.fn == nil { - b.fn = make(map[backoffType]func(context.Context) int) + b.fn = make(map[backoffType]func(context.Context, int) int) } f, ok := b.fn[typ] if !ok { @@ -273,7 +285,7 @@ func (b *Backoffer) Backoff(typ backoffType, err error) error { b.fn[typ] = f } - b.totalSleep += f(b.ctx) + b.totalSleep += f(b.ctx, maxSleepMs) b.types = append(b.types, typ) var startTs interface{} diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index 0855b01552506..7b5b41f224248 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -778,12 +778,12 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon if lockErr := resp.pbResp.GetLocked(); lockErr != nil { logutil.Logger(context.Background()).Debug("coprocessor encounters", zap.Stringer("lock", lockErr)) - ok, err1 := worker.store.lockResolver.ResolveLocks(bo, []*Lock{NewLock(lockErr)}) + msBeforeExpired, err1 := worker.store.lockResolver.ResolveLocks(bo, []*Lock{NewLock(lockErr)}) if err1 != nil { return nil, errors.Trace(err1) } - if !ok { - if err := bo.Backoff(boTxnLockFast, errors.New(lockErr.String())); err != nil { + if msBeforeExpired > 0 { + if err := bo.BackoffWithMaxSleep(boTxnLockFast, int(msBeforeExpired), errors.New(lockErr.String())); err != nil { return nil, errors.Trace(err) } } diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 944026bcb13a0..59c4fc97bb23d 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -37,6 +37,7 @@ var ( tikvLockResolverCountWithBatchResolve = metrics.TiKVLockResolverCounter.WithLabelValues("batch_resolve") tikvLockResolverCountWithExpired = metrics.TiKVLockResolverCounter.WithLabelValues("expired") tikvLockResolverCountWithNotExpired = metrics.TiKVLockResolverCounter.WithLabelValues("not_expired") + tikvLockResolverCountWithWaitExpired = metrics.TiKVLockResolverCounter.WithLabelValues("wait_expired") tikvLockResolverCountWithResolve = metrics.TiKVLockResolverCounter.WithLabelValues("resolve") tikvLockResolverCountWithQueryTxnStatus = metrics.TiKVLockResolverCounter.WithLabelValues("query_txn_status") tikvLockResolverCountWithQueryTxnStatusCommitted = metrics.TiKVLockResolverCounter.WithLabelValues("query_txn_status_committed") @@ -265,47 +266,59 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi // commit status. // 3) Send `ResolveLock` cmd to the lock's region to resolve all locks belong to // the same transaction. -func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (ok bool, err error) { +func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (msBeforeTxnExpired int64, err error) { if len(locks) == 0 { - return true, nil + return } tikvLockResolverCountWithResolve.Inc() var expiredLocks []*Lock for _, l := range locks { - if lr.store.GetOracle().IsExpired(l.TxnID, l.TTL) { + msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL) + if msBeforeLockExpired <= 0 { tikvLockResolverCountWithExpired.Inc() expiredLocks = append(expiredLocks, l) } else { + if msBeforeTxnExpired == 0 || msBeforeLockExpired < msBeforeTxnExpired { + msBeforeTxnExpired = msBeforeLockExpired + } tikvLockResolverCountWithNotExpired.Inc() } } if len(expiredLocks) == 0 { - return false, nil + if msBeforeTxnExpired > 0 { + tikvLockResolverCountWithWaitExpired.Inc() + } + return } // TxnID -> []Region, record resolved Regions. // TODO: Maybe put it in LockResolver and share by all txns. cleanTxns := make(map[uint64]map[RegionVerID]struct{}) for _, l := range expiredLocks { - status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary) + var status TxnStatus + status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary) if err != nil { - return false, errors.Trace(err) + msBeforeTxnExpired = 0 + err = errors.Trace(err) + return } - cleanRegions := cleanTxns[l.TxnID] - if cleanRegions == nil { + cleanRegions, exists := cleanTxns[l.TxnID] + if !exists { cleanRegions = make(map[RegionVerID]struct{}) cleanTxns[l.TxnID] = cleanRegions } err = lr.resolveLock(bo, l, status, cleanRegions) if err != nil { - return false, errors.Trace(err) + msBeforeTxnExpired = 0 + err = errors.Trace(err) + return } } - return len(expiredLocks) == len(locks), nil + return } // GetTxnStatus queries tikv-server for a txn's status (commit/rollback). diff --git a/store/tikv/oracle/oracle.go b/store/tikv/oracle/oracle.go index b7b8857e8a93d..c04909b3d1458 100644 --- a/store/tikv/oracle/oracle.go +++ b/store/tikv/oracle/oracle.go @@ -23,6 +23,7 @@ type Oracle interface { GetTimestamp(ctx context.Context) (uint64, error) GetTimestampAsync(ctx context.Context) Future IsExpired(lockTimestamp uint64, TTL uint64) bool + UntilExpired(lockTimeStamp uint64, TTL uint64) int64 Close() } diff --git a/store/tikv/oracle/oracles/export_test.go b/store/tikv/oracle/oracles/export_test.go new file mode 100644 index 0000000000000..999c8cf5f6e39 --- /dev/null +++ b/store/tikv/oracle/oracles/export_test.go @@ -0,0 +1,54 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package oracles + +import ( + "time" + + "github.com/pingcap/tidb/store/tikv/oracle" +) + +// SetOracleHookCurrentTime exports localOracle's time hook to test. +func SetOracleHookCurrentTime(oc oracle.Oracle, t time.Time) { + switch o := oc.(type) { + case *localOracle: + if o.hook == nil { + o.hook = &struct { + currentTime time.Time + }{} + } + o.hook.currentTime = t + } +} + +// ClearOracleHook exports localOracle's clear hook method +func ClearOracleHook(oc oracle.Oracle) { + switch o := oc.(type) { + case *localOracle: + o.hook = nil + } +} + +// NewEmptyPDOracle exports pdOracle struct to test +func NewEmptyPDOracle() oracle.Oracle { + return &pdOracle{} +} + +// SetEmptyPDOracleLastTs exports PD oracle's last ts to test. +func SetEmptyPDOracleLastTs(oc oracle.Oracle, ts uint64) { + switch o := oc.(type) { + case *pdOracle: + o.lastTS = ts + } +} diff --git a/store/tikv/oracle/oracles/local.go b/store/tikv/oracle/oracles/local.go index b56a32f378127..25543ec332b39 100644 --- a/store/tikv/oracle/oracles/local.go +++ b/store/tikv/oracle/oracles/local.go @@ -27,6 +27,9 @@ type localOracle struct { sync.Mutex lastTimeStampTS uint64 n uint64 + hook *struct { + currentTime time.Time + } } // NewLocalOracle creates an Oracle that uses local time as data source. @@ -35,13 +38,21 @@ func NewLocalOracle() oracle.Oracle { } func (l *localOracle) IsExpired(lockTS uint64, TTL uint64) bool { - return oracle.GetPhysical(time.Now()) >= oracle.ExtractPhysical(lockTS)+int64(TTL) + now := time.Now() + if l.hook != nil { + now = l.hook.currentTime + } + return oracle.GetPhysical(now) >= oracle.ExtractPhysical(lockTS)+int64(TTL) } func (l *localOracle) GetTimestamp(context.Context) (uint64, error) { l.Lock() defer l.Unlock() - physical := oracle.GetPhysical(time.Now()) + now := time.Now() + if l.hook != nil { + now = l.hook.currentTime + } + physical := oracle.GetPhysical(now) ts := oracle.ComposeTS(physical, 0) if l.lastTimeStampTS == ts { l.n++ @@ -68,5 +79,14 @@ func (f *future) Wait() (uint64, error) { return f.l.GetTimestamp(f.ctx) } +// UntilExpired implement oracle.Oracle interface. +func (l *localOracle) UntilExpired(lockTimeStamp uint64, TTL uint64) int64 { + now := time.Now() + if l.hook != nil { + now = l.hook.currentTime + } + return oracle.ExtractPhysical(lockTimeStamp) + int64(TTL) - oracle.GetPhysical(now) +} + func (l *localOracle) Close() { } diff --git a/store/tikv/oracle/oracles/local_test.go b/store/tikv/oracle/oracles/local_test.go index 809d3fd711a7e..678eb005bc8ff 100644 --- a/store/tikv/oracle/oracles/local_test.go +++ b/store/tikv/oracle/oracles/local_test.go @@ -11,16 +11,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -package oracles +package oracles_test import ( "context" "testing" "time" + + "github.com/pingcap/tidb/store/tikv/oracle/oracles" ) func TestLocalOracle(t *testing.T) { - l := NewLocalOracle() + l := oracles.NewLocalOracle() defer l.Close() m := map[uint64]struct{}{} for i := 0; i < 100000; i++ { @@ -37,11 +39,13 @@ func TestLocalOracle(t *testing.T) { } func TestIsExpired(t *testing.T) { - o := NewLocalOracle() + o := oracles.NewLocalOracle() defer o.Close() + start := time.Now() + oracles.SetOracleHookCurrentTime(o, start) ts, _ := o.GetTimestamp(context.Background()) - time.Sleep(50 * time.Millisecond) - expire := o.IsExpired(uint64(ts), 40) + oracles.SetOracleHookCurrentTime(o, start.Add(10*time.Millisecond)) + expire := o.IsExpired(uint64(ts), 5) if !expire { t.Error("should expired") } @@ -50,3 +54,15 @@ func TestIsExpired(t *testing.T) { t.Error("should not expired") } } + +func TestLocalOracle_UntilExpired(t *testing.T) { + o := oracles.NewLocalOracle() + defer o.Close() + start := time.Now() + oracles.SetOracleHookCurrentTime(o, start) + ts, _ := o.GetTimestamp(context.Background()) + oracles.SetOracleHookCurrentTime(o, start.Add(10*time.Millisecond)) + if o.UntilExpired(uint64(ts), 5) != -5 || o.UntilExpired(uint64(ts), 15) != 5 { + t.Error("until expired should be +-5") + } +} diff --git a/store/tikv/oracle/oracles/pd.go b/store/tikv/oracle/oracles/pd.go index 3a044d261714e..2128c6d2d8860 100644 --- a/store/tikv/oracle/oracles/pd.go +++ b/store/tikv/oracle/oracles/pd.go @@ -137,6 +137,12 @@ func (o *pdOracle) updateTS(ctx context.Context, interval time.Duration) { } } +// UntilExpired implement oracle.Oracle interface. +func (o *pdOracle) UntilExpired(lockTS uint64, TTL uint64) int64 { + lastTS := atomic.LoadUint64(&o.lastTS) + return oracle.ExtractPhysical(lockTS) + int64(TTL) - oracle.ExtractPhysical(lastTS) +} + func (o *pdOracle) Close() { close(o.quit) } diff --git a/store/tikv/oracle/oracles/pd_test.go b/store/tikv/oracle/oracles/pd_test.go new file mode 100644 index 0000000000000..882c0dc64f578 --- /dev/null +++ b/store/tikv/oracle/oracles/pd_test.go @@ -0,0 +1,34 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package oracles_test + +import ( + "testing" + "time" + + "github.com/pingcap/tidb/store/tikv/oracle" + "github.com/pingcap/tidb/store/tikv/oracle/oracles" +) + +func TestPDOracle_UntilExpired(t *testing.T) { + lockAfter, lockExp := 10, 15 + o := oracles.NewEmptyPDOracle() + start := time.Now() + oracles.SetEmptyPDOracleLastTs(o, oracle.ComposeTS(oracle.GetPhysical(start), 0)) + lockTs := oracle.ComposeTS(oracle.GetPhysical(start.Add(time.Duration(lockAfter)*time.Millisecond)), 1) + waitTs := o.UntilExpired(lockTs, uint64(lockExp)) + if waitTs != int64(lockAfter+lockExp) { + t.Errorf("waitTs shoulb be %d but got %d", int64(lockAfter+lockExp), waitTs) + } +} diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index b87c206840fbc..fc137bc62395d 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -203,12 +203,12 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll locks = append(locks, lock) } if len(lockedKeys) > 0 { - ok, err := s.store.lockResolver.ResolveLocks(bo, locks) + msBeforeExpired, err := s.store.lockResolver.ResolveLocks(bo, locks) if err != nil { return errors.Trace(err) } - if !ok { - err = bo.Backoff(boTxnLockFast, errors.Errorf("batchGet lockedKeys: %d", len(lockedKeys))) + if msBeforeExpired > 0 { + err = bo.BackoffWithMaxSleep(boTxnLockFast, int(msBeforeExpired), errors.Errorf("batchGet lockedKeys: %d", len(lockedKeys))) if err != nil { return errors.Trace(err) } @@ -277,12 +277,12 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) { if err != nil { return nil, errors.Trace(err) } - ok, err := s.store.lockResolver.ResolveLocks(bo, []*Lock{lock}) + msBeforeExpired, err := s.store.lockResolver.ResolveLocks(bo, []*Lock{lock}) if err != nil { return nil, errors.Trace(err) } - if !ok { - err = bo.Backoff(boTxnLockFast, errors.New(keyErr.String())) + if msBeforeExpired > 0 { + err = bo.BackoffWithMaxSleep(boTxnLockFast, int(msBeforeExpired), errors.New(keyErr.String())) if err != nil { return nil, errors.Trace(err) }