diff --git a/store/tikv/backoff.go b/store/tikv/backoff.go
index b8bc19cb36af3..03e0acff65dda 100644
--- a/store/tikv/backoff.go
+++ b/store/tikv/backoff.go
@@ -142,7 +142,7 @@ const (
 	getMaxBackoff           = 20000
 	prewriteMaxBackoff      = 20000
 	cleanupMaxBackoff       = 20000
-	gcMaxBackoff            = 100000
+	gcOneRegionMaxBackoff   = 20000
 	gcResolveLockMaxBackoff = 100000
 	gcDeleteRangeMaxBackoff = 100000
 	rawkvMaxBackoff         = 20000
diff --git a/store/tikv/client.go b/store/tikv/client.go
index 9d69739719bf8..256daa3ce52f2 100644
--- a/store/tikv/client.go
+++ b/store/tikv/client.go
@@ -34,6 +34,7 @@ const (
 	readTimeoutShort  = 20 * time.Second  // For requests that read/write several key-values.
 	readTimeoutMedium = 60 * time.Second  // For requests that may need scan region.
 	readTimeoutLong   = 150 * time.Second // For requests that may need scan region multiple times.
+	gcTimeout         = 5 * time.Minute
 
 	grpcInitialWindowSize     = 1 << 30
 	grpcInitialConnWindowSize = 1 << 30
diff --git a/store/tikv/gc_worker.go b/store/tikv/gc_worker.go
index 27094c1a836f8..55abfa1741f86 100644
--- a/store/tikv/gc_worker.go
+++ b/store/tikv/gc_worker.go
@@ -24,6 +24,7 @@ import (
 	log "github.com/Sirupsen/logrus"
 	"github.com/coreos/etcd/clientv3"
 	"github.com/juju/errors"
+	"github.com/pingcap/kvproto/pkg/errorpb"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/tidb"
 	"github.com/pingcap/tidb/ddl/util"
@@ -189,6 +190,7 @@ func (w *GCWorker) Close() {
 const (
 	gcTimeFormat         = "20060102-15:04:05 -0700 MST"
 	gcWorkerTickInterval = time.Minute
+	gcJobLogTickInterval = time.Minute * 10
 	gcWorkerLease        = time.Minute * 2
 	gcLeaderUUIDKey      = "tikv_gc_leader_uuid"
 	gcLeaderDescKey      = "tikv_gc_leader_desc"
@@ -315,7 +317,7 @@ func (w *GCWorker) leaderTick(ctx goctx.Context) error {
 	}
 
 	w.gcIsRunning = true
-	log.Infof("[gc worker] %s starts GC job, safePoint: %v", w.uuid, safePoint)
+	log.Infof("[gc worker] %s starts the whole job, safePoint: %v", w.uuid, safePoint)
 	go w.runGCJob(ctx, safePoint)
 	return nil
 }
@@ -413,18 +415,23 @@ func (w *GCWorker) runGCJob(ctx goctx.Context, safePoint uint64) {
 	gcWorkerCounter.WithLabelValues("run_job").Inc()
 	err := resolveLocks(ctx, w.store, safePoint, w.uuid)
 	if err != nil {
+		log.Errorf("[gc worker] %s resolve locks returns an error %v", w.uuid, err)
+		gcJobFailureCounter.WithLabelValues("resolve_lock").Inc()
 		w.done <- errors.Trace(err)
 		return
 	}
 	err = w.deleteRanges(ctx, safePoint)
 	if err != nil {
+		log.Errorf("[gc worker] %s delete range returns an error %v", w.uuid, err)
+		gcJobFailureCounter.WithLabelValues("delete_range").Inc()
 		w.done <- errors.Trace(err)
 		return
 	}
 	err = doGC(ctx, w.store, safePoint, w.uuid)
 	if err != nil {
-		log.Error("do GC returns an error", err)
+		log.Errorf("[gc worker] %s do GC returns an error %v", w.uuid, err)
 		w.gcIsRunning = false
+		gcJobFailureCounter.WithLabelValues("gc").Inc()
 		w.done <- errors.Trace(err)
 		return
 	}
@@ -441,7 +448,7 @@ func (w *GCWorker) deleteRanges(ctx goctx.Context, safePoint uint64) error {
 		return errors.Trace(err)
 	}
 
-	bo := NewBackoffer(gcDeleteRangeMaxBackoff, goctx.Background())
+	bo := NewBackoffer(gcDeleteRangeMaxBackoff, ctx)
 	log.Infof("[gc worker] %s start delete %v ranges", w.uuid, len(ranges))
 	startTime := time.Now()
 	regions := 0
@@ -520,7 +527,7 @@ func resolveLocks(ctx goctx.Context, store *tikvStore, safePoint uint64, identif
 			MaxVersion: safePoint,
 		},
 	}
-	bo := NewBackoffer(gcResolveLockMaxBackoff, goctx.Background())
+	bo := NewBackoffer(gcResolveLockMaxBackoff, ctx)
 
 	log.Infof("[gc worker] %s start resolve locks, safePoint: %v.", identifier, safePoint)
 	startTime := time.Now()
@@ -599,23 +606,23 @@ func doGC(ctx goctx.Context, store *tikvStore, safePoint uint64, identifier stri
 	// Sleep to wait for all other tidb instances update their safepoint cache.
 	time.Sleep(gcSafePointCacheInterval)
 
-	req := &tikvrpc.Request{
-		Type: tikvrpc.CmdGC,
-		GC: &kvrpcpb.GCRequest{
-			SafePoint: safePoint,
-		},
-	}
-	bo := NewBackoffer(gcMaxBackoff, goctx.Background())
-
 	log.Infof("[gc worker] %s start gc, safePoint: %v.", identifier, safePoint)
 	startTime := time.Now()
-	regions := 0
+	successRegions := 0
+	failedRegions := 0
+
+	ticker := time.NewTicker(gcJobLogTickInterval)
+	defer ticker.Stop()
 
+	bo := NewBackoffer(gcOneRegionMaxBackoff, ctx)
 	var key []byte
 	for {
 		select {
 		case <-ctx.Done():
 			return errors.New("[gc worker] gc job canceled")
+		case <-ticker.C:
+			log.Infof("[gc worker] %s gc in progress, safePoint: %v, successful regions: %v, failed regions: %v, cost time: %s",
+				identifier, safePoint, successRegions, failedRegions, time.Since(startTime))
 		default:
 		}
@@ -623,39 +630,72 @@ func doGC(ctx goctx.Context, store *tikvStore, safePoint uint64, identifier stri
 		if err != nil {
 			return errors.Trace(err)
 		}
-		resp, err := store.SendReq(bo, req, loc.Region, readTimeoutLong)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		regionErr, err := resp.GetRegionError()
-		if err != nil {
-			return errors.Trace(err)
-		}
+
+		var regionErr *errorpb.Error
+		regionErr, err = doGCForOneRegion(bo, store, safePoint, loc.Region)
+
+		// Check regionErr first: doGCForOneRegion never returns a region error and an
+		// ordinary error together, so handling the region error here keeps the flow correct.
 		if regionErr != nil {
 			err = bo.Backoff(boRegionMiss, errors.New(regionErr.String()))
-			if err != nil {
-				return errors.Trace(err)
+			if err == nil {
+				continue
 			}
-			continue
-		}
-
-		gcResp := resp.GC
-		if gcResp == nil {
-			return errors.Trace(errBodyMissing)
 		}
-		if gcResp.GetError() != nil {
-			return errors.Errorf("unexpected gc error: %s", gcResp.GetError())
+
+		if err != nil {
+			failedRegions++
+			gcActionRegionResultCounter.WithLabelValues("fail").Inc()
+			log.Warnf("[gc worker] %s failed to do gc on region(%s, %s), ignore it", identifier, string(loc.StartKey), string(loc.EndKey))
+		} else {
+			successRegions++
+			gcActionRegionResultCounter.WithLabelValues("success").Inc()
 		}
-		regions++
+
 		key = loc.EndKey
 		if len(key) == 0 {
 			break
 		}
+		bo = NewBackoffer(gcOneRegionMaxBackoff, ctx)
 	}
-	log.Infof("[gc worker] %s finish gc, safePoint: %v, regions: %v, cost time: %s", identifier, safePoint, regions, time.Since(startTime))
+	log.Infof("[gc worker] %s finish gc, safePoint: %v, successful regions: %v, failed regions: %v, cost time: %s",
+		identifier, safePoint, successRegions, failedRegions, time.Since(startTime))
 	gcHistogram.WithLabelValues("do_gc").Observe(time.Since(startTime).Seconds())
 	return nil
 }
 
+// doGCForOneRegion sends a GC request to a single region; it never returns a region error and an ordinary error together (see doGC).
+func doGCForOneRegion(bo *Backoffer, store *tikvStore, safePoint uint64, region RegionVerID) (*errorpb.Error, error) {
+	req := &tikvrpc.Request{
+		Type: tikvrpc.CmdGC,
+		GC: &kvrpcpb.GCRequest{
+			SafePoint: safePoint,
+		},
+	}
+
+	resp, err := store.SendReq(bo, req, region, gcTimeout)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	regionErr, err := resp.GetRegionError()
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	if regionErr != nil {
+		return regionErr, nil
+	}
+
+	gcResp := resp.GC
+	if gcResp == nil {
+		return nil, errors.Trace(errBodyMissing)
+	}
+	if gcResp.GetError() != nil {
+		return nil, errors.Errorf("unexpected gc error: %s", gcResp.GetError())
+	}
+
+	return nil, nil
+}
+
 func (w *GCWorker) checkLeader() (bool, error) {
 	gcWorkerCounter.WithLabelValues("check_leader").Inc()
 	session := createSession(w.store)
diff --git a/store/tikv/metrics.go b/store/tikv/metrics.go
index 054371360f17b..4c12d27f80342 100644
--- a/store/tikv/metrics.go
+++ b/store/tikv/metrics.go
@@ -130,6 +130,22 @@ var (
 		}, []string{"type"},
 	)
 
+	gcJobFailureCounter = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "tidb",
+			Subsystem: "tikvclient",
+			Name:      "gc_failure",
+			Help:      "Counter of gc job failures.",
+		}, []string{"type"})
+
+	gcActionRegionResultCounter = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "tidb",
+			Subsystem: "tikvclient",
+			Name:      "gc_action_result",
+			Help:      "Counter of gc action results at region level.",
+		}, []string{"type"})
+
 	lockResolverCounter = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Namespace: "tidb",
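
The control flow this diff introduces is easier to see outside the patch format. Below is a minimal, self-contained Go sketch of the pattern: GC proceeds region by region, each region gets a fresh retry budget, region errors are retried with backoff, and any other per-region failure is counted and skipped instead of aborting the whole job. Everything in the sketch is a hypothetical stand-in, not TiDB's API: region plays the role of RegionVerID, gcOneRegion of doGCForOneRegion, and the fixed three-attempt loop with a sleep stands in for NewBackoffer(gcOneRegionMaxBackoff, ctx) and bo.Backoff.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// region is a hypothetical stand-in for tikv's RegionVerID.
type region struct{ id int }

// gcOneRegion mimics doGCForOneRegion's contract: at most one of the two
// returned errors is non-nil. The first is a retriable region error, the
// second an ordinary failure that the caller tolerates.
func gcOneRegion(r region) (regionErr, err error) {
	switch {
	case r.id%7 == 0:
		return errors.New("region epoch not match"), nil
	case r.id%5 == 0:
		return nil, errors.New("rpc failed")
	default:
		return nil, nil
	}
}

func doGC(ctx context.Context, regions []region) error {
	successRegions, failedRegions := 0, 0
	startTime := time.Now()
	ticker := time.NewTicker(100 * time.Millisecond) // stands in for gcJobLogTickInterval
	defer ticker.Stop()

	for _, r := range regions {
		select {
		case <-ctx.Done():
			return errors.New("gc job canceled")
		case <-ticker.C:
			fmt.Printf("gc in progress: %d ok, %d failed, %s elapsed\n",
				successRegions, failedRegions, time.Since(startTime))
		default:
		}

		// Fresh retry budget per region; the three attempts and the sleep are
		// placeholders for the real per-region backoffer.
		var err error
		for attempt := 0; attempt < 3; attempt++ {
			var regionErr error
			if regionErr, err = gcOneRegion(r); regionErr == nil {
				break
			}
			err = regionErr
			time.Sleep(10 * time.Millisecond)
		}
		if err != nil {
			failedRegions++ // tolerated: count it and move on to the next region
		} else {
			successRegions++
		}
	}
	fmt.Printf("gc finished: %d ok, %d failed, %s elapsed\n",
		successRegions, failedRegions, time.Since(startTime))
	return nil
}

func main() {
	rs := make([]region, 20)
	for i := range rs {
		rs[i] = region{id: i + 1}
	}
	_ = doGC(context.Background(), rs)
}

The sketch also shows why gcMaxBackoff (100s, shared across the whole job) could become gcOneRegionMaxBackoff (20s per region): resetting the backoffer after every region means one slow or broken region can no longer exhaust the retry budget for all the regions that follow, and a hard failure on a single region degrades to a counted, logged skip rather than a failed GC job.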