tikvclient: reduce wait backoff time when lock has expired #10006

Merged · 4 commits · May 17, 2019
7 changes: 7 additions & 0 deletions store/mockoracle/oracle.go
@@ -100,6 +100,13 @@ func (o *MockOracle) IsExpired(lockTimestamp uint64, TTL uint64) bool {
 	return oracle.GetPhysical(time.Now().Add(o.offset)) >= oracle.ExtractPhysical(lockTimestamp)+int64(TTL)
 }
 
+// UntilExpired implements oracle.Oracle interface.
+func (o *MockOracle) UntilExpired(lockTimeStamp uint64, TTL uint64) int64 {
+	o.RLock()
+	defer o.RUnlock()
+	return oracle.ExtractPhysical(lockTimeStamp) + int64(TTL) - oracle.GetPhysical(time.Now().Add(o.offset))
+}
+
 // Close implements oracle.Oracle interface.
 func (o *MockOracle) Close() {

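The mock oracle makes the new contract easy to exercise in tests. Below is a minimal test sketch — not part of this PR — that assumes MockOracle's AddOffset helper for advancing the mock clock:

```go
package mockoracle_test

import (
	"context"
	"testing"
	"time"

	"github.com/pingcap/tidb/store/mockoracle"
)

func TestUntilExpired(t *testing.T) {
	o := &mockoracle.MockOracle{}
	ts, err := o.GetTimestamp(context.Background())
	if err != nil {
		t.Fatal(err)
	}
	// A lock taken "now" with a 3000ms TTL should have time left on it.
	if ms := o.UntilExpired(ts, 3000); ms <= 0 {
		t.Fatalf("expected positive ms before expiry, got %d", ms)
	}
	// Advance the mock clock past the TTL; the lock must now read as expired.
	o.AddOffset(5 * time.Second)
	if ms := o.UntilExpired(ts, 3000); ms > 0 {
		t.Fatalf("expected non-positive ms before expiry, got %d", ms)
	}
}
```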
12 changes: 6 additions & 6 deletions store/tikv/2pc.go
@@ -528,13 +528,13 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
 		locks = append(locks, lock)
 	}
 	start := time.Now()
-	ok, err := c.store.lockResolver.ResolveLocks(bo, locks)
+	msBeforeExpired, err := c.store.lockResolver.ResolveLocks(bo, locks)
 	if err != nil {
 		return errors.Trace(err)
 	}
 	atomic.AddInt64(&c.detail.ResolveLockTime, int64(time.Since(start)))
-	if !ok {
-		err = bo.Backoff(BoTxnLock, errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
+	if msBeforeExpired > 0 {
+		err = bo.BackoffWithMaxSleep(BoTxnLock, int(msBeforeExpired), errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -615,12 +615,12 @@ func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batchKeys)
 		}
 		locks = append(locks, lock)
 	}
-	ok, err := c.store.lockResolver.ResolveLocks(bo, locks)
+	msBeforeExpired, err := c.store.lockResolver.ResolveLocks(bo, locks)
 	if err != nil {
 		return errors.Trace(err)
 	}
-	if !ok {
-		err = bo.Backoff(BoTxnLock, errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
+	if msBeforeExpired > 0 {
+		err = bo.BackoffWithMaxSleep(BoTxnLock, int(msBeforeExpired), errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
 		if err != nil {
 			return errors.Trace(err)
 		}
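
In both hunks above the call-site pattern is the same: the old boolean from ResolveLocks meant "all locks were expired and resolved", whereas the new msBeforeExpired return is 0 in that case and otherwise carries the milliseconds until the earliest still-live lock's TTL runs out. Passing that value to BackoffWithMaxSleep caps the sleep, so the committer wakes up roughly when the blocking lock becomes resolvable instead of waiting out a full exponential backoff step.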
25 changes: 18 additions & 7 deletions store/tikv/backoff.go
@@ -78,14 +78,14 @@ func (t backoffType) Counter() prometheus.Counter {
 // NewBackoffFn creates a backoff func which implements exponential backoff with
 // optional jitters.
 // See http://www.awsarchitectureblog.com/2015/03/backoff.html
-func NewBackoffFn(base, cap, jitter int) func(ctx context.Context) int {
+func NewBackoffFn(base, cap, jitter int) func(ctx context.Context, maxSleepMs int) int {
 	if base < 2 {
 		// To prevent panic in 'rand.Intn'.
 		base = 2
 	}
 	attempts := 0
 	lastSleep := base
-	return func(ctx context.Context) int {
+	return func(ctx context.Context, maxSleepMs int) int {
 		var sleep int
 		switch jitter {
 		case NoJitter:
@@ -102,8 +102,13 @@ func NewBackoffFn(base, cap, jitter int) func(ctx context.Context) int {
 		logutil.Logger(context.Background()).Debug("backoff",
 			zap.Int("base", base),
 			zap.Int("sleep", sleep))
+
+		realSleep := sleep
+		if maxSleepMs >= 0 && realSleep > maxSleepMs {
+			realSleep = maxSleepMs
+		}
 		select {
-		case <-time.After(time.Duration(sleep) * time.Millisecond):
+		case <-time.After(time.Duration(realSleep) * time.Millisecond):
 		case <-ctx.Done():
 		}

Review thread on the clamp above:

Contributor: When will maxSleepMs be 0?

Contributor Author: maxSleepMs should never be 0 in current usage, but backoff is a common function and we are adding a new maxSleepMs parameter, so we should handle that case too; a "force sleep zero" behavior is also useful to have in the logic.

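To make the clamp's three regimes concrete, here is a stand-alone sketch — not the tidb implementation, and the jitter math is simplified — showing that a negative maxSleepMs leaves the exponential step untouched, zero forces an immediate return, and any positive value bounds the sleep:

```go
package main

import (
	"fmt"
	"math/rand"
)

// clampedBackoff returns a simplified backoff step function: an equal-jitter
// exponential step bounded by cap, then clamped by maxSleepMs exactly as in
// the diff above (negative disables the clamp, zero forces an immediate return).
func clampedBackoff(base, cap int) func(maxSleepMs int) int {
	lastSleep := base
	return func(maxSleepMs int) int {
		// Equal-jitter step: half deterministic, half random.
		sleep := lastSleep/2 + rand.Intn(lastSleep/2+1)
		if sleep > cap {
			sleep = cap
		}
		lastSleep = sleep * 2

		realSleep := sleep
		if maxSleepMs >= 0 && realSleep > maxSleepMs {
			realSleep = maxSleepMs
		}
		return realSleep // milliseconds the caller would sleep
	}
}

func main() {
	f := clampedBackoff(200, 3000)
	fmt.Println(f(-1)) // uncapped: a normal backoff step
	fmt.Println(f(50)) // capped: sleeps at most 50ms
	fmt.Println(f(0))  // "force sleep zero": returns immediately
}
```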
@@ -130,7 +135,7 @@ const (
 	boServerBusy
 )
 
-func (t backoffType) createFn(vars *kv.Variables) func(context.Context) int {
+func (t backoffType) createFn(vars *kv.Variables) func(context.Context, int) int {
 	if vars.Hook != nil {
 		vars.Hook(t.String(), vars)
 	}
@@ -216,7 +221,7 @@ var CommitMaxBackoff = 41000
 type Backoffer struct {
 	ctx context.Context
 
-	fn         map[backoffType]func(context.Context) int
+	fn         map[backoffType]func(context.Context, int) int
 	maxSleep   int
 	totalSleep int
 	errors     []error
@@ -252,6 +257,12 @@ func (b *Backoffer) WithVars(vars *kv.Variables) *Backoffer {
 // Backoff sleeps a while based on the backoffType and records the error message.
 // It returns a retryable error if total sleep time exceeds maxSleep.
 func (b *Backoffer) Backoff(typ backoffType, err error) error {
+	return b.BackoffWithMaxSleep(typ, -1, err)
+}
+
+// BackoffWithMaxSleep sleeps a while based on the backoffType and records the error
+// message, and never sleeps more than maxSleepMs for each sleep.
+func (b *Backoffer) BackoffWithMaxSleep(typ backoffType, maxSleepMs int, err error) error {
 	if strings.Contains(err.Error(), mismatchClusterID) {
 		logutil.Logger(context.Background()).Fatal("critical error", zap.Error(err))
 	}
@@ -264,15 +275,15 @@ func (b *Backoffer) Backoff(typ backoffType, err error) error {
 	typ.Counter().Inc()
 	// Lazy initialize.
 	if b.fn == nil {
-		b.fn = make(map[backoffType]func(context.Context) int)
+		b.fn = make(map[backoffType]func(context.Context, int) int)
 	}
 	f, ok := b.fn[typ]
 	if !ok {
 		f = typ.createFn(b.vars)
 		b.fn[typ] = f
 	}
 
-	b.totalSleep += f(b.ctx)
+	b.totalSleep += f(b.ctx, maxSleepMs)
 	b.types = append(b.types, typ)
 
 	var startTs interface{}
6 changes: 3 additions & 3 deletions store/tikv/coprocessor.go
@@ -778,12 +778,12 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCContext,
 	if lockErr := resp.pbResp.GetLocked(); lockErr != nil {
 		logutil.Logger(context.Background()).Debug("coprocessor encounters",
 			zap.Stringer("lock", lockErr))
-		ok, err1 := worker.store.lockResolver.ResolveLocks(bo, []*Lock{NewLock(lockErr)})
+		msBeforeExpired, err1 := worker.store.lockResolver.ResolveLocks(bo, []*Lock{NewLock(lockErr)})
 		if err1 != nil {
 			return nil, errors.Trace(err1)
 		}
-		if !ok {
-			if err := bo.Backoff(boTxnLockFast, errors.New(lockErr.String())); err != nil {
+		if msBeforeExpired > 0 {
+			if err := bo.BackoffWithMaxSleep(boTxnLockFast, int(msBeforeExpired), errors.New(lockErr.String())); err != nil {
 				return nil, errors.Trace(err)
 			}
 		}
33 changes: 23 additions & 10 deletions store/tikv/lock_resolver.go
@@ -37,6 +37,7 @@ var (
 	tikvLockResolverCountWithBatchResolve            = metrics.TiKVLockResolverCounter.WithLabelValues("batch_resolve")
 	tikvLockResolverCountWithExpired                 = metrics.TiKVLockResolverCounter.WithLabelValues("expired")
 	tikvLockResolverCountWithNotExpired              = metrics.TiKVLockResolverCounter.WithLabelValues("not_expired")
+	tikvLockResolverCountWithWaitExpired             = metrics.TiKVLockResolverCounter.WithLabelValues("wait_expired")
 	tikvLockResolverCountWithResolve                 = metrics.TiKVLockResolverCounter.WithLabelValues("resolve")
 	tikvLockResolverCountWithQueryTxnStatus          = metrics.TiKVLockResolverCounter.WithLabelValues("query_txn_status")
 	tikvLockResolverCountWithQueryTxnStatusCommitted = metrics.TiKVLockResolverCounter.WithLabelValues("query_txn_status_committed")
@@ -265,47 +266,59 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc RegionVerID)
 // commit status.
 // 3) Send `ResolveLock` cmd to the lock's region to resolve all locks belonging to
 //    the same transaction.
-func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (ok bool, err error) {
+func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (msBeforeTxnExpired int64, err error) {
 	if len(locks) == 0 {
-		return true, nil
+		return
 	}
 
 	tikvLockResolverCountWithResolve.Inc()
 
 	var expiredLocks []*Lock
 	for _, l := range locks {
-		if lr.store.GetOracle().IsExpired(l.TxnID, l.TTL) {
+		msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL)
+		if msBeforeLockExpired <= 0 {
 			tikvLockResolverCountWithExpired.Inc()
 			expiredLocks = append(expiredLocks, l)
 		} else {
+			if msBeforeTxnExpired == 0 || msBeforeLockExpired < msBeforeTxnExpired {
+				msBeforeTxnExpired = msBeforeLockExpired
+			}
 			tikvLockResolverCountWithNotExpired.Inc()
 		}
 	}
 	if len(expiredLocks) == 0 {
-		return false, nil
+		if msBeforeTxnExpired > 0 {
+			tikvLockResolverCountWithWaitExpired.Inc()
+		}
+		return
 	}
 
 	// TxnID -> []Region, record resolved Regions.
 	// TODO: Maybe put it in LockResolver and share by all txns.
 	cleanTxns := make(map[uint64]map[RegionVerID]struct{})
 	for _, l := range expiredLocks {
-		status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary)
+		var status TxnStatus
+		status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary)
 		if err != nil {
-			return false, errors.Trace(err)
+			msBeforeTxnExpired = 0
+			err = errors.Trace(err)
+			return
 		}
 
-		cleanRegions := cleanTxns[l.TxnID]
-		if cleanRegions == nil {
+		cleanRegions, exists := cleanTxns[l.TxnID]
+		if !exists {
 			cleanRegions = make(map[RegionVerID]struct{})
 			cleanTxns[l.TxnID] = cleanRegions
 		}
 
 		err = lr.resolveLock(bo, l, status, cleanRegions)
 		if err != nil {
-			return false, errors.Trace(err)
+			msBeforeTxnExpired = 0
+			err = errors.Trace(err)
+			return
 		}
 	}
-	return len(expiredLocks) == len(locks), nil
+	return
 }
 
 // GetTxnStatus queries tikv-server for a txn's status (commit/rollback).

Review thread on the renamed return value:

Member: msBeforeTxnExpired stands for "milliseconds before transaction expired"?

Contributor: Seems yes.
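The loop above uses 0 as an "unset" sentinel: a return of 0 means every lock was already expired (and was resolved), while a positive value is the minimum time-to-expiry over the locks that are still alive. A runnable sketch of just that fold, with hypothetical names:

```go
package main

import "fmt"

// minPositiveExpiry mirrors the accumulation in ResolveLocks above; each
// element of msLeft stands in for one UntilExpired(l.TxnID, l.TTL) result.
func minPositiveExpiry(msLeft []int64) int64 {
	msBeforeTxnExpired := int64(0) // 0 is the "unset" sentinel
	for _, ms := range msLeft {
		if ms <= 0 {
			continue // lock already expired; it gets resolved, not waited on
		}
		if msBeforeTxnExpired == 0 || ms < msBeforeTxnExpired {
			msBeforeTxnExpired = ms // keep the earliest upcoming expiry
		}
	}
	return msBeforeTxnExpired
}

func main() {
	fmt.Println(minPositiveExpiry([]int64{-5, 1200, 300})) // 300
	fmt.Println(minPositiveExpiry([]int64{-5, -1}))        // 0: all expired
}
```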
1 change: 1 addition & 0 deletions store/tikv/oracle/oracle.go
@@ -23,6 +23,7 @@ type Oracle interface {
 	GetTimestamp(ctx context.Context) (uint64, error)
 	GetTimestampAsync(ctx context.Context) Future
 	IsExpired(lockTimestamp uint64, TTL uint64) bool
+	UntilExpired(lockTimeStamp uint64, TTL uint64) int64
 	Close()
 }

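UntilExpired leans on TiDB's TSO layout: a timestamp packs a physical Unix-millisecond clock in the bits above a logical counter, so subtracting physical parts yields a duration in milliseconds. A small worked sketch — the 18-bit shift is assumed from the oracle package, not shown in this diff:

```go
package main

import (
	"fmt"
	"time"
)

// physicalShiftBits is assumed from store/tikv/oracle: the logical counter
// occupies the low 18 bits of a TSO timestamp.
const physicalShiftBits = 18

func extractPhysical(ts uint64) int64 { return int64(ts >> physicalShiftBits) }

func main() {
	nowMs := time.Now().UnixNano() / int64(time.Millisecond)
	// Compose a TSO whose physical part is "now" (logical part zero).
	lockTS := uint64(nowMs) << physicalShiftBits

	const ttlMs = 3000 // a 3s lock TTL
	// Milliseconds before the lock may be considered expired; <= 0 means expired.
	fmt.Println(extractPhysical(lockTS) + ttlMs - nowMs) // ~3000
}
```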
5 changes: 5 additions & 0 deletions store/tikv/oracle/oracles/local.go
@@ -68,5 +68,10 @@ func (f *future) Wait() (uint64, error) {
 	return f.l.GetTimestamp(f.ctx)
 }
 
+// UntilExpired implements oracle.Oracle interface.
+func (l *localOracle) UntilExpired(lockTimeStamp uint64, TTL uint64) int64 {
+	return oracle.ExtractPhysical(lockTimeStamp) + int64(TTL) - oracle.GetPhysical(time.Now())
+}
+
 func (l *localOracle) Close() {
 }
6 changes: 6 additions & 0 deletions store/tikv/oracle/oracles/pd.go
@@ -137,6 +137,12 @@ func (o *pdOracle) updateTS(ctx context.Context, interval time.Duration) {
 	}
 }
 
+// UntilExpired implements oracle.Oracle interface.
+func (o *pdOracle) UntilExpired(lockTS uint64, TTL uint64) int64 {
+	lastTS := atomic.LoadUint64(&o.lastTS)
+	return oracle.ExtractPhysical(lockTS) + int64(TTL) - oracle.ExtractPhysical(lastTS)
+}
+
 func (o *pdOracle) Close() {
 	close(o.quit)
 }
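
A design note on this implementation: rather than fetching a fresh timestamp from PD, UntilExpired reads the cached lastTS that updateTS (whose hunk header appears above) refreshes in the background, so the call costs no network round trip. Since the cache can only lag real time, the computed wait errs on the side of being slightly longer than strictly necessary.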
12 changes: 6 additions & 6 deletions store/tikv/snapshot.go
@@ -202,12 +202,12 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collectF func(k, v []byte)) error {
 		locks = append(locks, lock)
 	}
 	if len(lockedKeys) > 0 {
-		ok, err := s.store.lockResolver.ResolveLocks(bo, locks)
+		msBeforeExpired, err := s.store.lockResolver.ResolveLocks(bo, locks)
 		if err != nil {
 			return errors.Trace(err)
 		}
-		if !ok {
-			err = bo.Backoff(boTxnLockFast, errors.Errorf("batchGet lockedKeys: %d", len(lockedKeys)))
+		if msBeforeExpired > 0 {
+			err = bo.BackoffWithMaxSleep(boTxnLockFast, int(msBeforeExpired), errors.Errorf("batchGet lockedKeys: %d", len(lockedKeys)))
 			if err != nil {
 				return errors.Trace(err)
 			}
@@ -276,12 +276,12 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
-		ok, err := s.store.lockResolver.ResolveLocks(bo, []*Lock{lock})
+		msBeforeExpired, err := s.store.lockResolver.ResolveLocks(bo, []*Lock{lock})
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
-		if !ok {
-			err = bo.Backoff(boTxnLockFast, errors.New(keyErr.String()))
+		if msBeforeExpired > 0 {
+			err = bo.BackoffWithMaxSleep(boTxnLockFast, int(msBeforeExpired), errors.New(keyErr.String()))
 			if err != nil {
 				return nil, errors.Trace(err)
 			}
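
Across the call sites, the PR keeps the existing split between backoff types: the read paths (coprocessor.go, snapshot.go) back off with boTxnLockFast, while the 2PC write paths use BoTxnLock. The only behavioral change is that each sleep is now capped by the time left on the blocking lock's TTL.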