-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
do gc parallel #5832
do gc parallel #5832
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -107,7 +107,8 @@ const ( | |
gcSafePointKey = "tikv_gc_safe_point" | ||
gcSafePointCacheInterval = tikv.GcSafePointCacheInterval | ||
// We don't want gc to sweep out the cached info belong to other processes, like coprocessor. | ||
gcScanLockLimit = tikv.ResolvedCacheSize / 2 | ||
gcScanLockLimit = tikv.ResolvedCacheSize / 2 | ||
gcDefaultGCConcurrency = 8 | ||
) | ||
|
||
var gcVariableComments = map[string]string{ | ||
|
@@ -422,6 +423,127 @@ func (w *GCWorker) deleteRanges(ctx goctx.Context, safePoint uint64) error { | |
return nil | ||
} | ||
|
||
type gcTask struct { | ||
startKey []byte | ||
endKey []byte | ||
safePoint uint64 | ||
} | ||
|
||
type gcResult struct { | ||
task *gcTask | ||
successRegions int | ||
failedRegions int | ||
err error | ||
} | ||
|
||
func getNextGCTask(store tikv.Storage, bo *tikv.Backoffer, safePoint uint64, lastKey kv.Key) (*gcTask, error) { | ||
loc, err := store.GetRegionCache().LocateKey(bo, lastKey) | ||
if err != nil { | ||
return nil, errors.Trace(err) | ||
} | ||
|
||
task := &gcTask{ | ||
startKey: lastKey, | ||
endKey: loc.EndKey, | ||
safePoint: safePoint, | ||
} | ||
return task, nil | ||
} | ||
|
||
func doGC(ctx goctx.Context, store tikv.Storage, safePoint uint64, identifier string) error { | ||
gcWorkerCounter.WithLabelValues("do_gc").Inc() | ||
|
||
err := saveSafePoint(store.GetSafePointKV(), tikv.GcSavedSafePoint, safePoint) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
|
||
// Sleep to wait for all other tidb instances update their safepoint cache. | ||
time.Sleep(gcSafePointCacheInterval) | ||
|
||
successRegions := 0 | ||
failedRegions := 0 | ||
remained := 0 | ||
startTime := time.Now() | ||
log.Infof("[gc worker] %s start gc, safePoint: %v.", identifier, safePoint) | ||
defer func() { | ||
log.Infof("[gc worker] %s finish gc, running jobs %v, safePoint: %v, successful regions: %v,"+ | ||
" failed regions: %v, cost time: %s", | ||
identifier, remained, safePoint, successRegions, failedRegions, time.Since(startTime)) | ||
gcHistogram.WithLabelValues("do_gc").Observe(time.Since(startTime).Seconds()) | ||
}() | ||
|
||
var key []byte | ||
gcResultChan := make(chan *gcResult, gcDefaultGCConcurrency) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. gcTaskQueue := make(chan gcTask, gcDefaultGCConcurrency) ...receive task results if have |
||
bo := tikv.NewBackoffer(tikv.GcGetTaskMaxBackoff, goctx.Background()) | ||
jobCtx, cancel := goctx.WithCancel(ctx) | ||
defer cancel() | ||
for i := 0; i < gcDefaultGCConcurrency; i++ { | ||
task, err := getNextGCTask(store, bo, safePoint, key) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
key = task.endKey | ||
|
||
if task != nil { | ||
go doGCTask(jobCtx, store, identifier, gcResultChan, task) | ||
remained++ | ||
} | ||
|
||
if len(key) == 0 { | ||
break | ||
} | ||
} | ||
|
||
ticker := time.NewTicker(gcJobLogTickInterval) | ||
defer ticker.Stop() | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return errors.New("[gc worker] gc job canceled") | ||
case <-ticker.C: | ||
log.Infof("[gc worker] %s gc in process, running jobs %v, safePoint: %v, successful regions: %v, "+ | ||
"failed regions: %v, cost time: %s", | ||
identifier, remained, safePoint, successRegions, failedRegions, time.Since(startTime)) | ||
case res := <-gcResultChan: | ||
remained-- | ||
|
||
successRegions += res.successRegions | ||
failedRegions += res.failedRegions | ||
|
||
if len(key) == 0 { | ||
if remained == 0 { | ||
return nil | ||
} | ||
|
||
continue | ||
} | ||
|
||
task, err := getNextGCTask(store, bo, safePoint, key) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
key = task.endKey | ||
|
||
if task != nil { | ||
go doGCTask(jobCtx, store, identifier, gcResultChan, task) | ||
remained++ | ||
} | ||
} | ||
} | ||
} | ||
|
||
func doGCTask(ctx goctx.Context, store tikv.Storage, identifier string, resChan chan *gcResult, task *gcTask) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This function should wait on a task queue and handle task one by one util receive nil. |
||
res := doGCForOneRange(ctx, store, task, identifier) | ||
|
||
select { | ||
case <-ctx.Done(): | ||
return | ||
default: | ||
resChan <- res | ||
} | ||
} | ||
|
||
func resolveLocks(ctx goctx.Context, store tikv.Storage, safePoint uint64, identifier string) error { | ||
gcWorkerCounter.WithLabelValues("resolve_locks").Inc() | ||
|
||
|
@@ -511,44 +633,30 @@ func resolveLocks(ctx goctx.Context, store tikv.Storage, safePoint uint64, ident | |
return nil | ||
} | ||
|
||
func doGC(ctx goctx.Context, store tikv.Storage, safePoint uint64, identifier string) error { | ||
gcWorkerCounter.WithLabelValues("do_gc").Inc() | ||
|
||
err := saveSafePoint(store.GetSafePointKV(), tikv.GcSavedSafePoint, safePoint) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
|
||
// Sleep to wait for all other tidb instances update their safepoint cache. | ||
time.Sleep(gcSafePointCacheInterval) | ||
|
||
log.Infof("[gc worker] %s start gc, safePoint: %v.", identifier, safePoint) | ||
startTime := time.Now() | ||
func doGCForOneRange(ctx goctx.Context, store tikv.Storage, task *gcTask, identifier string) *gcResult { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems you should use a range as the input args from the function name. |
||
successRegions := 0 | ||
failedRegions := 0 | ||
|
||
ticker := time.NewTicker(gcJobLogTickInterval) | ||
defer ticker.Stop() | ||
|
||
bo := tikv.NewBackoffer(tikv.GcOneRegionMaxBackoff, ctx) | ||
var key []byte | ||
key := task.startKey | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return errors.New("[gc worker] gc job canceled") | ||
case <-ticker.C: | ||
log.Infof("[gc worker] %s gc in process, safePoint: %v, successful regions: %v, failed regions: %v, cost time: %s", | ||
identifier, safePoint, successRegions, failedRegions, time.Since(startTime)) | ||
return &gcResult{ | ||
task, | ||
successRegions, | ||
failedRegions, | ||
errors.New("[gc worker] gc job canceled")} | ||
default: | ||
} | ||
|
||
loc, err := store.GetRegionCache().LocateKey(bo, key) | ||
if err != nil { | ||
return errors.Trace(err) | ||
return &gcResult{task, successRegions, failedRegions, errors.Trace(err)} | ||
} | ||
|
||
var regionErr *errorpb.Error | ||
regionErr, err = doGCForOneRegion(bo, store, safePoint, loc.Region) | ||
regionErr, err = doGCForOneRegion(bo, store, task.safePoint, loc.Region) | ||
|
||
// we check regionErr here first, because we know 'regionErr' and 'err' should not return together, to keep it to | ||
// make the process correct. | ||
|
@@ -569,14 +677,11 @@ func doGC(ctx goctx.Context, store tikv.Storage, safePoint uint64, identifier st | |
} | ||
|
||
key = loc.EndKey | ||
if len(key) == 0 { | ||
if len(key) == 0 || bytes.Compare(key, task.endKey) >= 0 { | ||
break | ||
} | ||
bo = tikv.NewBackoffer(tikv.GcOneRegionMaxBackoff, ctx) | ||
} | ||
log.Infof("[gc worker] %s finish gc, safePoint: %v, successful regions: %v, failed regions: %v, cost time: %s", | ||
identifier, safePoint, successRegions, failedRegions, time.Since(startTime)) | ||
gcHistogram.WithLabelValues("do_gc").Observe(time.Since(startTime).Seconds()) | ||
return nil | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/getNextGCTask/genNextGCTask
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok