
Commit

store/tikv: Ignore error and do gc anyway (#5797) (#5815)
* ignore errors during GC and continue doing GC on the next region
wentaoxu authored and coocood committed Feb 7, 2018
1 parent 9665073 commit cbf90f4
Showing 4 changed files with 90 additions and 33 deletions.
2 changes: 1 addition & 1 deletion store/tikv/backoff.go
@@ -142,7 +142,7 @@ const (
     getMaxBackoff           = 20000
     prewriteMaxBackoff      = 20000
     cleanupMaxBackoff       = 20000
-    gcMaxBackoff            = 100000
+    gcOneRegionMaxBackoff   = 20000
     gcResolveLockMaxBackoff = 100000
     gcDeleteRangeMaxBackoff = 100000
     rawkvMaxBackoff         = 20000
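
Review note: gcMaxBackoff (a single 100 s retry budget for the whole GC job) becomes gcOneRegionMaxBackoff (20 s), which doGC below spends per region by creating a fresh Backoffer for each one. A minimal sketch of the effect, with a toy budget type standing in for TiDB's Backoffer (assumed simplification, not the real API):

    package main

    import (
        "errors"
        "fmt"
    )

    // budget is a toy stand-in for tikv's Backoffer: retries spend
    // milliseconds against a fixed cap.
    type budget struct{ leftMs int }

    func (b *budget) backoff(costMs int) error {
        b.leftMs -= costMs
        if b.leftMs < 0 {
            return errors.New("backoff budget exhausted")
        }
        return nil
    }

    func main() {
        // Each region now starts with its own 20s budget, so a slow region
        // exhausts only its own allowance and gets skipped, instead of
        // draining a shared 100s budget for every region that follows it.
        for region := 0; region < 3; region++ {
            bo := &budget{leftMs: 20000} // like NewBackoffer(gcOneRegionMaxBackoff, ctx)
            err := bo.backoff(25000)     // pretend this region needed 25s of retries
            fmt.Printf("region %d: %v\n", region, err)
        }
    }
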
1 change: 1 addition & 0 deletions store/tikv/client.go
@@ -34,6 +34,7 @@ const (
     readTimeoutShort  = 20 * time.Second  // For requests that read/write several key-values.
     readTimeoutMedium = 60 * time.Second  // For requests that may need scan region.
     readTimeoutLong   = 150 * time.Second // For requests that may need scan region multiple times.
+    gcTimeout         = 5 * time.Minute

     grpcInitialWindowSize     = 1 << 30
     grpcInitialConnWindowSize = 1 << 30
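
Review note: GC of a single region can take much longer than ordinary reads, outlasting even readTimeoutLong (150 s), so GC requests get their own 5-minute gcTimeout; doGCForOneRegion below passes it straight to SendReq. A hedged sketch of the general per-request-deadline pattern, using the standard context package rather than TiDB's goctx (names here are illustrative):

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    const gcTimeout = 5 * time.Minute

    // callWithTimeout bounds one RPC-like operation with a per-request deadline.
    func callWithTimeout(ctx context.Context, op func() error) error {
        ctx, cancel := context.WithTimeout(ctx, gcTimeout)
        defer cancel() // free the timer even when op finishes early

        done := make(chan error, 1)
        go func() { done <- op() }()
        select {
        case err := <-done:
            return err
        case <-ctx.Done():
            return ctx.Err() // deadline exceeded or caller canceled
        }
    }

    func main() {
        op := func() error { time.Sleep(10 * time.Millisecond); return nil }
        fmt.Println(callWithTimeout(context.Background(), op))
    }
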
104 changes: 72 additions & 32 deletions store/tikv/gc_worker.go
Expand Up @@ -24,6 +24,7 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/coreos/etcd/clientv3"
"github.com/juju/errors"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb"
"github.com/pingcap/tidb/ddl/util"
@@ -189,6 +190,7 @@ func (w *GCWorker) Close() {
 const (
     gcTimeFormat         = "20060102-15:04:05 -0700 MST"
     gcWorkerTickInterval = time.Minute
+    gcJobLogTickInterval = time.Minute * 10
     gcWorkerLease        = time.Minute * 2
     gcLeaderUUIDKey      = "tikv_gc_leader_uuid"
     gcLeaderDescKey      = "tikv_gc_leader_desc"
@@ -315,7 +317,7 @@ func (w *GCWorker) leaderTick(ctx goctx.Context) error {
     }

     w.gcIsRunning = true
-    log.Infof("[gc worker] %s starts GC job, safePoint: %v", w.uuid, safePoint)
+    log.Infof("[gc worker] %s starts the whole job, safePoint: %v", w.uuid, safePoint)
     go w.runGCJob(ctx, safePoint)
     return nil
 }
@@ -413,18 +415,23 @@ func (w *GCWorker) runGCJob(ctx goctx.Context, safePoint uint64) {
     gcWorkerCounter.WithLabelValues("run_job").Inc()
     err := resolveLocks(ctx, w.store, safePoint, w.uuid)
     if err != nil {
+        log.Errorf("[gc worker] %s resolve locks returns an error %v", w.uuid, err)
+        gcJobFailureCounter.WithLabelValues("resolve_lock").Inc()
         w.done <- errors.Trace(err)
         return
     }
     err = w.deleteRanges(ctx, safePoint)
     if err != nil {
+        log.Errorf("[gc worker] %s delete range returns an error %v", w.uuid, err)
+        gcJobFailureCounter.WithLabelValues("delete_range").Inc()
         w.done <- errors.Trace(err)
         return
     }
     err = doGC(ctx, w.store, safePoint, w.uuid)
     if err != nil {
-        log.Error("do GC returns an error", err)
+        log.Errorf("[gc worker] %s do GC returns an error %v", w.uuid, err)
         w.gcIsRunning = false
+        gcJobFailureCounter.WithLabelValues("gc").Inc()
         w.done <- errors.Trace(err)
         return
     }
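
Review note: runGCJob now reports every phase failure the same way: log it, bump gcJobFailureCounter with the phase name as the label ("resolve_lock", "delete_range", "gc"), and hand the error to w.done. A distilled sketch of that phase/label funnel, with the Prometheus counter reduced to a print (assumed simplification):

    package main

    import (
        "errors"
        "fmt"
    )

    // runPhases mimics runGCJob: phases run in order, and the first failure
    // is labeled with its phase name and reported on the done channel.
    func runPhases(done chan<- error) {
        phases := []struct {
            name string
            run  func() error
        }{
            {"resolve_lock", func() error { return nil }},
            {"delete_range", func() error { return nil }},
            {"gc", func() error { return errors.New("region unavailable") }},
        }
        for _, p := range phases {
            if err := p.run(); err != nil {
                // the real code does gcJobFailureCounter.WithLabelValues(p.name).Inc()
                fmt.Printf("phase %s failed: %v\n", p.name, err)
                done <- err
                return
            }
        }
        done <- nil
    }

    func main() {
        done := make(chan error, 1)
        go runPhases(done)
        fmt.Println("job result:", <-done)
    }
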
@@ -441,7 +448,7 @@ func (w *GCWorker) deleteRanges(ctx goctx.Context, safePoint uint64) error {
         return errors.Trace(err)
     }

-    bo := NewBackoffer(gcDeleteRangeMaxBackoff, goctx.Background())
+    bo := NewBackoffer(gcDeleteRangeMaxBackoff, ctx)
     log.Infof("[gc worker] %s start delete %v ranges", w.uuid, len(ranges))
     startTime := time.Now()
     regions := 0
@@ -520,7 +527,7 @@ func resolveLocks(ctx goctx.Context, store *tikvStore, safePoint uint64, identifier string) error {
             MaxVersion: safePoint,
         },
     }
-    bo := NewBackoffer(gcResolveLockMaxBackoff, goctx.Background())
+    bo := NewBackoffer(gcResolveLockMaxBackoff, ctx)

     log.Infof("[gc worker] %s start resolve locks, safePoint: %v.", identifier, safePoint)
     startTime := time.Now()
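
Review note: deleteRanges and resolveLocks (and doGC below) now derive their Backoffer from the caller's ctx instead of goctx.Background(), so canceling the GC job also interrupts retry waits instead of letting them sleep out their full budget. A minimal sketch of the difference, using the standard context package:

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // retryLoop sleeps between attempts, but every sleep also watches ctx,
    // the way a Backoffer built from the job context does.
    func retryLoop(ctx context.Context) error {
        for attempt := 0; attempt < 100; attempt++ {
            select {
            case <-ctx.Done():
                return ctx.Err() // job canceled: abandon the remaining retries
            case <-time.After(50 * time.Millisecond): // one backoff sleep
            }
        }
        return nil
    }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        go func() { time.Sleep(120 * time.Millisecond); cancel() }()
        // Prints "context canceled" shortly after cancel; a loop built on
        // context.Background() would have kept sleeping to its full budget.
        fmt.Println(retryLoop(ctx))
    }
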
@@ -599,63 +606,96 @@ func doGC(ctx goctx.Context, store *tikvStore, safePoint uint64, identifier string) error {
     // Sleep to wait for all other tidb instances update their safepoint cache.
     time.Sleep(gcSafePointCacheInterval)

-    req := &tikvrpc.Request{
-        Type: tikvrpc.CmdGC,
-        GC: &kvrpcpb.GCRequest{
-            SafePoint: safePoint,
-        },
-    }
-    bo := NewBackoffer(gcMaxBackoff, goctx.Background())
-
     log.Infof("[gc worker] %s start gc, safePoint: %v.", identifier, safePoint)
     startTime := time.Now()
-    regions := 0
+    successRegions := 0
+    failedRegions := 0

+    ticker := time.NewTicker(gcJobLogTickInterval)
+    defer ticker.Stop()
+
+    bo := NewBackoffer(gcOneRegionMaxBackoff, ctx)
     var key []byte
     for {
         select {
         case <-ctx.Done():
             return errors.New("[gc worker] gc job canceled")
+        case <-ticker.C:
+            log.Infof("[gc worker] %s gc in process, safePoint: %v, successful regions: %v, failed regions: %v, cost time: %s",
+                identifier, safePoint, successRegions, failedRegions, time.Since(startTime))
         default:
         }

         loc, err := store.regionCache.LocateKey(bo, key)
         if err != nil {
             return errors.Trace(err)
         }
-        resp, err := store.SendReq(bo, req, loc.Region, readTimeoutLong)
-        if err != nil {
-            return errors.Trace(err)
-        }
-        regionErr, err := resp.GetRegionError()
-        if err != nil {
-            return errors.Trace(err)
-        }
+
+        var regionErr *errorpb.Error
+        regionErr, err = doGCForOneRegion(bo, store, safePoint, loc.Region)
+
+        // Check regionErr first: doGCForOneRegion never returns a region error and an
+        // ordinary error together, and retrying on a region error keeps the process correct.
         if regionErr != nil {
             err = bo.Backoff(boRegionMiss, errors.New(regionErr.String()))
-            if err != nil {
-                return errors.Trace(err)
+            if err == nil {
+                continue
             }
-            continue
         }
-        gcResp := resp.GC
-        if gcResp == nil {
-            return errors.Trace(errBodyMissing)
-        }
-        if gcResp.GetError() != nil {
-            return errors.Errorf("unexpected gc error: %s", gcResp.GetError())
+
+        if err != nil {
+            failedRegions++
+            gcActionRegionResultCounter.WithLabelValues("fail").Inc()
+            log.Warnf("[gc worker] %s failed to do gc on region(%s, %s), ignore it", identifier, string(loc.StartKey), string(loc.EndKey))
+        } else {
+            successRegions++
+            gcActionRegionResultCounter.WithLabelValues("success").Inc()
         }
-        regions++
+
         key = loc.EndKey
         if len(key) == 0 {
             break
         }
+        bo = NewBackoffer(gcOneRegionMaxBackoff, ctx)
     }
-    log.Infof("[gc worker] %s finish gc, safePoint: %v, regions: %v, cost time: %s", identifier, safePoint, regions, time.Since(startTime))
+    log.Infof("[gc worker] %s finish gc, safePoint: %v, successful regions: %v, failed regions: %v, cost time: %s",
+        identifier, safePoint, successRegions, failedRegions, time.Since(startTime))
     gcHistogram.WithLabelValues("do_gc").Observe(time.Since(startTime).Seconds())
     return nil
 }

+// doGCForOneRegion never returns a region error and an ordinary error together; see doGC above.
+func doGCForOneRegion(bo *Backoffer, store *tikvStore, safePoint uint64, region RegionVerID) (*errorpb.Error, error) {
+    req := &tikvrpc.Request{
+        Type: tikvrpc.CmdGC,
+        GC: &kvrpcpb.GCRequest{
+            SafePoint: safePoint,
+        },
+    }
+
+    resp, err := store.SendReq(bo, req, region, gcTimeout)
+    if err != nil {
+        return nil, errors.Trace(err)
+    }
+    regionErr, err := resp.GetRegionError()
+    if err != nil {
+        return nil, errors.Trace(err)
+    }
+    if regionErr != nil {
+        return regionErr, nil
+    }
+
+    gcResp := resp.GC
+    if gcResp == nil {
+        return nil, errors.Trace(errBodyMissing)
+    }
+    if gcResp.GetError() != nil {
+        return nil, errors.Errorf("unexpected gc error: %s", gcResp.GetError())
+    }
+
+    return nil, nil
+}
+
 func (w *GCWorker) checkLeader() (bool, error) {
     gcWorkerCounter.WithLabelValues("check_leader").Inc()
     session := createSession(w.store)
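
Review note: the restructured doGC loop above is the substance of this commit. A region error means the region moved or split, so the worker backs off and retries the same key; any other error is counted, logged, and skipped, so one bad region can no longer abort the whole GC round. A distilled sketch of that control flow with toy types (not the TiDB API):

    package main

    import (
        "errors"
        "fmt"
    )

    // regionErr is a toy stand-in for *errorpb.Error: a retriable routing error.
    type regionErr struct{ msg string }

    func main() {
        attempts := map[int]int{}
        // gcOneRegion stands in for doGCForOneRegion; like the real function
        // it never returns a region error and an ordinary error together.
        gcOneRegion := func(key int) (*regionErr, error) {
            attempts[key]++
            switch {
            case key == 2 && attempts[key] == 1:
                return &regionErr{"stale region info"}, nil // resolves after a retry
            case key == 4:
                return nil, errors.New("rpc timeout") // hard failure for this region
            default:
                return nil, nil
            }
        }

        success, failed := 0, 0
        for key := 0; key < 6; {
            rerr, err := gcOneRegion(key)
            if rerr != nil {
                fmt.Println("region", key, "backing off, then retrying:", rerr.msg)
                continue // retry the same key, as doGC does after bo.Backoff
            }
            if err != nil {
                failed++ // count, log, and move on: the point of the commit
                fmt.Println("region", key, "failed, skipping:", err)
            } else {
                success++
            }
            key++ // advance to the next region
        }
        fmt.Printf("successful regions: %d, failed regions: %d\n", success, failed)
    }
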
16 changes: 16 additions & 0 deletions store/tikv/metrics.go
@@ -130,6 +130,22 @@ var (
         }, []string{"type"},
     )

+    gcJobFailureCounter = prometheus.NewCounterVec(
+        prometheus.CounterOpts{
+            Namespace: "tidb",
+            Subsystem: "tikvclient",
+            Name:      "gc_failure",
+            Help:      "Counter of gc job failure.",
+        }, []string{"type"})
+
+    gcActionRegionResultCounter = prometheus.NewCounterVec(
+        prometheus.CounterOpts{
+            Namespace: "tidb",
+            Subsystem: "tikvclient",
+            Name:      "gc_action_result",
+            Help:      "Counter of gc action result on region level.",
+        }, []string{"type"})
+
     lockResolverCounter = prometheus.NewCounterVec(
         prometheus.CounterOpts{
             Namespace: "tidb",
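
Review note: both additions are ordinary Prometheus CounterVecs keyed by a "type" label, incremented from the GC code above via WithLabelValues(...).Inc(). A runnable sketch of that usage; the explicit MustRegister call is an assumption of this sketch (in TiDB, registration happens alongside the other tikvclient metrics):

    package main

    import (
        "fmt"

        "github.com/prometheus/client_golang/prometheus"
    )

    var gcActionRegionResultCounter = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Namespace: "tidb",
            Subsystem: "tikvclient",
            Name:      "gc_action_result",
            Help:      "Counter of gc action result on region level.",
        }, []string{"type"})

    func main() {
        prometheus.MustRegister(gcActionRegionResultCounter)
        // doGC increments one label per region processed.
        gcActionRegionResultCounter.WithLabelValues("success").Inc()
        gcActionRegionResultCounter.WithLabelValues("fail").Inc()
        fmt.Println("metric registered; exposed as tidb_tikvclient_gc_action_result")
    }
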
