From 10de1563e2f05976d44d58961004058662d13997 Mon Sep 17 00:00:00 2001
From: nolouch
Date: Mon, 22 Apr 2024 15:20:05 +0800
Subject: [PATCH] address

Signed-off-by: nolouch
---
 pkg/core/region.go                   | 18 +++++++++++++++++
 pkg/mcs/scheduling/server/cluster.go | 25 +++++------------------
 pkg/ratelimit/metrics.go             |  8 +++++---
 server/cluster/cluster.go            | 30 ++++++----------------
 4 files changed, 34 insertions(+), 47 deletions(-)

diff --git a/pkg/core/region.go b/pkg/core/region.go
index f4cf5409856..387557af580 100644
--- a/pkg/core/region.go
+++ b/pkg/core/region.go
@@ -856,6 +856,24 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
 	}
 }
 
+// RegionHeartbeatStageName is the name of the stage of the region heartbeat.
+const (
+	HandleStatsAsync        = "HandleStatsAsync"
+	ObserveRegionStatsAsync = "ObserveRegionStatsAsync"
+	UpdateSubTree           = "UpdateSubTree"
+	HandleOverlaps          = "HandleOverlaps"
+	CollectRegionStatsAsync = "CollectRegionStatsAsync"
+	SaveRegionToKV          = "SaveRegionToKV"
+)
+
+// ExtraTaskOpts returns the task options for the task.
+func ExtraTaskOpts(ctx *MetaProcessContext, name string) ratelimit.TaskOpts {
+	return ratelimit.TaskOpts{
+		TaskName: name,
+		Limit:    ctx.Limiter,
+	}
+}
+
 // RWLockStats is a read-write lock with statistics.
 type RWLockStats struct {
 	syncutil.RWMutex
diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go
index 88d21421405..94b24f4ca16 100644
--- a/pkg/mcs/scheduling/server/cluster.go
+++ b/pkg/mcs/scheduling/server/cluster.go
@@ -590,10 +590,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
 	ctx.TaskRunner.RunTask(
 		ctx,
-		ratelimit.TaskOpts{
-			TaskName: "HandleStatsAsync",
-			Limit:    ctx.Limiter,
-		},
+		core.ExtraTaskOpts(ctx, core.HandleStatsAsync),
 		func(_ context.Context) {
 			cluster.HandleStatsAsync(c, region)
 		},
@@ -610,10 +607,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
 	if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
 		ctx.TaskRunner.RunTask(
 			ctx,
-			ratelimit.TaskOpts{
-				TaskName: "ObserveRegionStatsAsync",
-				Limit:    ctx.Limiter,
-			},
+			core.ExtraTaskOpts(ctx, core.ObserveRegionStatsAsync),
 			func(_ context.Context) {
 				if c.regionStats.RegionStatsNeedUpdate(region) {
 					cluster.Collect(c, region, hasRegionStats)
@@ -638,10 +632,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
 	}
 	ctx.TaskRunner.RunTask(
 		ctx,
-		ratelimit.TaskOpts{
-			TaskName: "UpdateSubTree",
-			Limit:    ctx.Limiter,
-		},
+		core.ExtraTaskOpts(ctx, core.UpdateSubTree),
 		func(_ context.Context) {
 			c.CheckAndPutSubTree(region)
 		},
@@ -649,10 +640,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
 	tracer.OnUpdateSubTreeFinished()
 	ctx.TaskRunner.RunTask(
 		ctx,
-		ratelimit.TaskOpts{
-			TaskName: "HandleOverlaps",
-			Limit:    ctx.Limiter,
-		},
+		core.ExtraTaskOpts(ctx, core.HandleOverlaps),
 		func(_ context.Context) {
 			cluster.HandleOverlaps(c, overlaps)
 		},
@@ -662,10 +650,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
 	// handle region stats
 	ctx.TaskRunner.RunTask(
 		ctx,
-		ratelimit.TaskOpts{
-			TaskName: "CollectRegionStatsAsync",
-			Limit:    ctx.Limiter,
-		},
+		core.ExtraTaskOpts(ctx, core.CollectRegionStatsAsync),
 		func(_ context.Context) {
 			cluster.Collect(c, region, hasRegionStats)
 		},
diff --git a/pkg/ratelimit/metrics.go b/pkg/ratelimit/metrics.go
index bb6e3b890fd..3c5020554a8 100644
--- a/pkg/ratelimit/metrics.go
+++ b/pkg/ratelimit/metrics.go
@@ -18,6 +18,8 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 )
 
+const nameStr = "runner_name"
+
 var (
 	RunnerTaskMaxWaitingDuration = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
@@ -25,7 +27,7 @@ var (
 			Subsystem: "ratelimit",
 			Name:      "runner_task_max_waiting_duration_seconds",
 			Help:      "The duration of tasks waiting in the runner.",
-		}, []string{"name"})
+		}, []string{nameStr})
 
 	RunnerTaskPendingTasks = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
@@ -33,14 +35,14 @@ var (
 			Subsystem: "ratelimit",
 			Name:      "runner_task_pending_tasks",
 			Help:      "The number of pending tasks in the runner.",
-		}, []string{"name"})
+		}, []string{nameStr})
 	RunnerTaskFailedTasks = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Namespace: "pd",
 			Subsystem: "ratelimit",
 			Name:      "runner_task_failed_tasks_total",
 			Help:      "The number of failed tasks in the runner.",
-		}, []string{"name"})
+		}, []string{nameStr})
 )
 
 func init() {
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 32ce23ef09b..dbc6a6cadf3 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -1015,10 +1015,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
 	if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
 		ctx.TaskRunner.RunTask(
 			ctx.Context,
-			ratelimit.TaskOpts{
-				TaskName: "HandleStatsAsync",
-				Limit:    ctx.Limiter,
-			},
+			core.ExtraTaskOpts(ctx, core.HandleStatsAsync),
 			func(_ context.Context) {
 				cluster.HandleStatsAsync(c, region)
 			},
@@ -1039,10 +1036,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
 	if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
 		ctx.TaskRunner.RunTask(
 			ctx.Context,
-			ratelimit.TaskOpts{
-				TaskName: "ObserveRegionStatsAsync",
-				Limit:    ctx.Limiter,
-			},
+			core.ExtraTaskOpts(ctx, core.ObserveRegionStatsAsync),
 			func(_ context.Context) {
 				if c.regionStats.RegionStatsNeedUpdate(region) {
 					cluster.Collect(c, region, hasRegionStats)
@@ -1071,10 +1065,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
 	}
 	ctx.TaskRunner.RunTask(
 		ctx,
-		ratelimit.TaskOpts{
-			TaskName: "UpdateSubTree",
-			Limit:    ctx.Limiter,
-		},
+		core.ExtraTaskOpts(ctx, core.UpdateSubTree),
 		func(_ context.Context) {
 			c.CheckAndPutSubTree(region)
 		},
@@ -1084,10 +1075,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
 	if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
 		ctx.TaskRunner.RunTask(
 			ctx.Context,
-			ratelimit.TaskOpts{
-				TaskName: "HandleOverlaps",
-				Limit:    ctx.Limiter,
-			},
+			core.ExtraTaskOpts(ctx, core.HandleOverlaps),
 			func(_ context.Context) {
 				cluster.HandleOverlaps(c, overlaps)
 			},
@@ -1100,10 +1088,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
 	// handle region stats
 	ctx.TaskRunner.RunTask(
 		ctx.Context,
-		ratelimit.TaskOpts{
-			TaskName: "CollectRegionStatsAsync",
-			Limit:    ctx.Limiter,
-		},
+		core.ExtraTaskOpts(ctx, core.CollectRegionStatsAsync),
 		func(_ context.Context) {
 			// TODO: Due to the accuracy requirements of the API "/regions/check/xxx",
 			// region stats needs to be collected in API mode.
@@ -1117,10 +1102,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
 	if saveKV {
 		ctx.TaskRunner.RunTask(
 			ctx.Context,
-			ratelimit.TaskOpts{
-				TaskName: "SaveRegionToKV",
-				Limit:    ctx.Limiter,
-			},
+			core.ExtraTaskOpts(ctx, core.SaveRegionToKV),
 			func(_ context.Context) {
 				// If there are concurrent heartbeats from the same region, the last write will win even if
 				// writes to storage in the critical area. So don't use mutex to protect it.
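
For context, every hunk above applies the same refactoring: each heartbeat stage used to build its own ratelimit.TaskOpts literal at the RunTask call site, and the patch replaces those literals with the stage-name constants and the core.ExtraTaskOpts helper added in pkg/core/region.go. Below is a minimal, self-contained sketch of that pattern; the types concurrencyLimiter, taskOpts, metaProcessContext and the runTask function are simplified stand-ins written only for illustration, not the actual PD definitions.

    package main

    import (
        "context"
        "fmt"
    )

    // Simplified stand-ins for the PD types touched by the patch; the real
    // definitions live in pkg/ratelimit and pkg/core.
    type concurrencyLimiter struct{ max int }

    type taskOpts struct {
        TaskName string
        Limit    *concurrencyLimiter
    }

    type metaProcessContext struct {
        context.Context
        Limiter *concurrencyLimiter
    }

    // Stage-name constants, mirroring the ones added to pkg/core/region.go.
    const (
        handleStatsAsync = "HandleStatsAsync"
        updateSubTree    = "UpdateSubTree"
    )

    // extraTaskOpts mirrors core.ExtraTaskOpts: one place that turns a stage
    // name plus the per-heartbeat limiter into the runner's task options.
    func extraTaskOpts(ctx *metaProcessContext, name string) taskOpts {
        return taskOpts{TaskName: name, Limit: ctx.Limiter}
    }

    // runTask stands in for ctx.TaskRunner.RunTask in the patch.
    func runTask(ctx context.Context, opts taskOpts, f func(context.Context)) {
        fmt.Println("running stage:", opts.TaskName)
        f(ctx)
    }

    func main() {
        ctx := &metaProcessContext{Context: context.Background(), Limiter: &concurrencyLimiter{max: 256}}

        // Before the patch every call site repeated the taskOpts literal;
        // after it, each stage only names itself:
        runTask(ctx, extraTaskOpts(ctx, handleStatsAsync), func(context.Context) {
            // handle flow/stat updates for the region (elided)
        })
        runTask(ctx, extraTaskOpts(ctx, updateSubTree), func(context.Context) {
            // update the region sub-tree (elided)
        })
    }

Centralizing the options in one helper keeps every call site on the same limiter wiring and makes it harder for a stage name string to drift from its constant.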