From 4de610609c33f4ca1f9474342ff45863a513d444 Mon Sep 17 00:00:00 2001
From: nolouch
Date: Thu, 11 Apr 2024 19:25:00 +0800
Subject: [PATCH] ratelimit: rename AsyncRunner to ConcurrentRunner

Signed-off-by: nolouch
---
 pkg/core/region.go                   |  4 ++--
 pkg/mcs/scheduling/server/cluster.go |  8 ++++----
 pkg/ratelimit/runner.go              | 24 ++++++++++++------------
 server/cluster/cluster.go            | 18 +++++++++---------
 server/cluster/cluster_test.go       |  2 +-
 server/cluster/cluster_worker.go     |  2 +-
 6 files changed, 29 insertions(+), 29 deletions(-)

diff --git a/pkg/core/region.go b/pkg/core/region.go
index 00174ae188b..4f84d20d94f 100644
--- a/pkg/core/region.go
+++ b/pkg/core/region.go
@@ -741,7 +741,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
 					TaskName: "Log",
 					Limit:    limiter,
 				},
-				func(ctx context.Context) {
+				func(_ context.Context) {
 					d(msg, fields...)
 				},
 			)
@@ -753,7 +753,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
 					TaskName: "Log",
 					Limit:    limiter,
 				},
-				func(ctx context.Context) {
+				func(_ context.Context) {
 					i(msg, fields...)
 				},
 			)
diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go
index affc606524e..7c471d783a4 100644
--- a/pkg/mcs/scheduling/server/cluster.go
+++ b/pkg/mcs/scheduling/server/cluster.go
@@ -598,7 +598,7 @@ func (c *Cluster) processRegionHeartbeat(ctx context.Context, region *core.Regio
 			TaskName: "HandleStatsAsync",
 			Limit:    limiter,
 		},
-		func(ctx context.Context) {
+		func(_ context.Context) {
 			cluster.HandleStatsAsync(c, region)
 		},
 	)
@@ -618,7 +618,7 @@ func (c *Cluster) processRegionHeartbeat(ctx context.Context, region *core.Regio
 			TaskName: "ObserveRegionStatsAsync",
 			Limit:    limiter,
 		},
-		func(ctx context.Context) {
+		func(_ context.Context) {
 			if c.regionStats.RegionStatsNeedUpdate(region) {
 				cluster.Collect(c, region, hasRegionStats)
 			}
@@ -646,7 +646,7 @@ func (c *Cluster) processRegionHeartbeat(ctx context.Context, region *core.Regio
 			TaskName: "HandleOverlaps",
 			Limit:    limiter,
 		},
-		func(ctx context.Context) {
+		func(_ context.Context) {
 			cluster.HandleOverlaps(c, overlaps)
 		},
 	)
@@ -659,7 +659,7 @@ func (c *Cluster) processRegionHeartbeat(ctx context.Context, region *core.Regio
 			TaskName: "CollectRegionStatsAsync",
 			Limit:    c.hbConcurrencyLimiter,
 		},
-		func(ctx context.Context) {
+		func(_ context.Context) {
 			cluster.Collect(c, region, hasRegionStats)
 		},
 	)
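Every hunk above makes the same mechanical change: the task closures never read their context argument, so the parameter is renamed to the blank identifier, which documents that fact and satisfies unused-parameter linters such as revive. A minimal standalone sketch of the pattern (the submit helper below is illustrative, not part of this patch):

package main

import (
	"context"
	"fmt"
)

// submit mimics the runner call sites above: it hands a context to a task
// closure, whether or not the task uses it.
func submit(ctx context.Context, task func(context.Context)) {
	task(ctx)
}

func main() {
	// The closure ignores its context, so its parameter is `_` rather than
	// `ctx`, which is the same rename applied throughout this patch.
	submit(context.Background(), func(_ context.Context) {
		fmt.Println("task body runs without touching the context")
	})
}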
diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go
index 26994ace1a2..f87ced11d5b 100644
--- a/pkg/ratelimit/runner.go
+++ b/pkg/ratelimit/runner.go
@@ -44,8 +44,8 @@ type Task struct {
 // ErrMaxWaitingTasksExceeded is returned when the number of waiting tasks exceeds the maximum.
 var ErrMaxWaitingTasksExceeded = errors.New("max waiting tasks exceeded")
 
-// AsyncRunner is a simple task runner that limits the number of concurrent tasks.
-type AsyncRunner struct {
+// ConcurrentRunner is a simple task runner that limits the number of concurrent tasks.
+type ConcurrentRunner struct {
 	name               string
 	maxPendingDuration time.Duration
 	taskChan           chan *Task
@@ -55,9 +55,9 @@ type AsyncRunner struct {
 	pendingMu          sync.Mutex
 	stopChan           chan struct{}
 	wg                 sync.WaitGroup
 }
 
-// NewAsyncRunner creates a new AsyncRunner.
-func NewAsyncRunner(name string, maxPendingDuration time.Duration) *AsyncRunner {
-	s := &AsyncRunner{
+// NewConcurrentRunner creates a new ConcurrentRunner.
+func NewConcurrentRunner(name string, maxPendingDuration time.Duration) *ConcurrentRunner {
+	s := &ConcurrentRunner{
 		name:               name,
 		maxPendingDuration: maxPendingDuration,
 		taskChan:           make(chan *Task),
@@ -74,7 +74,7 @@ type TaskOpts struct {
 }
 
 // Start starts the runner.
-func (s *AsyncRunner) Start() {
+func (s *ConcurrentRunner) Start() {
 	s.stopChan = make(chan struct{})
 	s.wg.Add(1)
 	go func() {
@@ -100,7 +100,7 @@ func (s *AsyncRunner) Start() {
 	}()
 }
 
-func (s *AsyncRunner) run(ctx context.Context, task func(context.Context), token *TaskToken) {
+func (s *ConcurrentRunner) run(ctx context.Context, task func(context.Context), token *TaskToken) {
 	task(ctx)
 	if token != nil {
 		token.Release()
@@ -108,7 +108,7 @@ func (s *AsyncRunner) run(ctx context.Context, task func(context.Context), token
 	}
 }
 
-func (s *AsyncRunner) processPendingTasks() {
+func (s *ConcurrentRunner) processPendingTasks() {
 	s.pendingMu.Lock()
 	defer s.pendingMu.Unlock()
 	for len(s.pendingTasks) > 0 {
@@ -124,13 +124,13 @@ func (s *AsyncRunner) processPendingTasks() {
 }
 
 // Stop stops the runner.
-func (s *AsyncRunner) Stop() {
+func (s *ConcurrentRunner) Stop() {
 	close(s.stopChan)
 	s.wg.Wait()
 }
 
 // RunTask runs the task asynchronously.
-func (s *AsyncRunner) RunTask(ctx context.Context, opt TaskOpts, f func(context.Context)) error {
+func (s *ConcurrentRunner) RunTask(ctx context.Context, opt TaskOpts, f func(context.Context)) error {
 	task := &Task{
 		Ctx:  ctx,
 		Opts: opt,
@@ -169,7 +169,7 @@ func (*SyncRunner) RunTask(ctx context.Context, _ TaskOpts, f func(context.Conte
 }
 
 // Start starts the runner.
-func (s *SyncRunner) Start() {}
+func (*SyncRunner) Start() {}
 
 // Stop stops the runner.
-func (s *SyncRunner) Stop() {}
+func (*SyncRunner) Stop() {}
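With the rename in place, the surface of pkg/ratelimit used by the heartbeat path is NewConcurrentRunner, Start, RunTask, and Stop, plus NewConcurrencyLimiter for the per-task-kind limit. A minimal usage sketch, assuming the post-patch package; the runner name, durations, and task body are illustrative:

package main

import (
	"context"
	"fmt"
	"runtime"
	"time"

	"github.com/tikv/pd/pkg/ratelimit"
)

func main() {
	// Mirrors the constructors used in server/cluster/cluster.go below.
	limiter := ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU() * 2))
	runner := ratelimit.NewConcurrentRunner("example-runner", time.Minute)
	runner.Start()
	defer runner.Stop()

	err := runner.RunTask(
		context.Background(),
		ratelimit.TaskOpts{TaskName: "ExampleTask", Limit: limiter},
		func(_ context.Context) {
			fmt.Println("ran under the concurrency limit")
		},
	)
	// Submission is fallible: RunTask can return ErrMaxWaitingTasksExceeded
	// when too many tasks are already queued, so callers must check it.
	if err != nil {
		fmt.Println("task rejected:", err)
	}
}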
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 2d0bdd7b839..132633c8a1c 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -108,7 +108,7 @@ const (
 	minSnapshotDurationSec = 5
 
 	// heartbeat relative const
-	hbAsyncRunner = "heartbeat-async-task-runner"
+	hbConcurrentRunner = "heartbeat-async-task-runner"
 )
 
 // Server is the interface for cluster.
@@ -198,7 +198,7 @@ func NewRaftCluster(ctx context.Context, clusterID uint64, basicCluster *core.Ba
 		etcdClient:           etcdClient,
 		core:                 basicCluster,
 		storage:              storage,
-		taskRunner:           ratelimit.NewAsyncRunner(hbAsyncRunner, time.Minute),
+		taskRunner:           ratelimit.NewConcurrentRunner(hbConcurrentRunner, time.Minute),
 		hbConcurrencyLimiter: ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU() * 2)),
 	}
 }
@@ -1029,7 +1029,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx context.Context, region *core.R
 			TaskName: "HandleStatsAsync",
 			Limit:    limiter,
 		},
-		func(ctx context.Context) {
+		func(_ context.Context) {
 			cluster.HandleStatsAsync(c, region)
 		},
 	)
@@ -1053,7 +1053,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx context.Context, region *core.R
 			TaskName: "ObserveRegionStatsAsync",
 			Limit:    limiter,
 		},
-		func(ctx context.Context) {
+		func(_ context.Context) {
 			if c.regionStats.RegionStatsNeedUpdate(region) {
 				cluster.Collect(c, region, hasRegionStats)
 			}
@@ -1086,7 +1086,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx context.Context, region *core.R
 			TaskName: "HandleOverlaps",
 			Limit:    limiter,
 		},
-		func(ctx context.Context) {
+		func(_ context.Context) {
 			cluster.HandleOverlaps(c, overlaps)
 		},
 	)
@@ -1102,7 +1102,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx context.Context, region *core.R
 			TaskName: "CollectRegionStatsAsync",
 			Limit:    c.hbConcurrencyLimiter,
 		},
-		func(ctx context.Context) {
+		func(_ context.Context) {
 			// TODO: Due to the accuracy requirements of the API "/regions/check/xxx",
 			// region stats needs to be collected in API mode.
 			// We need to think of a better way to reduce this part of the cost in the future.
@@ -1119,7 +1119,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx context.Context, region *core.R
 			TaskName: "SaveRegionToKV",
 			Limit:    c.hbConcurrencyLimiter,
 		},
-		func(ctx context.Context) {
+		func(_ context.Context) {
 			// If there are concurrent heartbeats from the same region, the last write will win even if
 			// writes to storage in the critical area. So don't use mutex to protect it.
 			// Not successfully saved to storage is not fatal, it only leads to longer warm-up
@@ -2111,7 +2111,7 @@ func (c *RaftCluster) collectMetrics() {
 }
 
 func (c *RaftCluster) resetMetrics() {
-	resetHealthStatus()
+	c.resetHealthStatus()
 	c.resetProgressIndicator()
 }
 
@@ -2130,7 +2130,7 @@ func (c *RaftCluster) collectHealthStatus() {
 	}
 }
 
-func resetHealthStatus() {
+func (*RaftCluster) resetHealthStatus() {
 	healthStatusGauge.Reset()
 }
 
diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go
index fdb5f3909bf..bd921189e6f 100644
--- a/server/cluster/cluster_test.go
+++ b/server/cluster/cluster_test.go
@@ -2139,7 +2139,7 @@ func newTestRaftCluster(
 	opt *config.PersistOptions,
 	s storage.Storage,
 ) *RaftCluster {
-	opt.GetScheduleConfig().EnableHeartbeatAsyncRunner = false
+	opt.GetScheduleConfig().EnableHeartbeatConcurrentRunner = false
 	rc := &RaftCluster{serverCtx: ctx, core: core.NewBasicCluster(), storage: s}
 	rc.InitCluster(id, opt, nil, nil)
 	rc.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), rc, opt)
diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go
index 4d6db729c07..6cf5e0caff8 100644
--- a/server/cluster/cluster_worker.go
+++ b/server/cluster/cluster_worker.go
@@ -43,7 +43,7 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
 	tracer.Begin()
 	ctx := context.WithValue(c.ctx, ctxutil.HeartbeatTracerKey, tracer)
 	ctx = context.WithValue(ctx, ctxutil.LimiterKey, c.hbConcurrencyLimiter)
-	if c.GetScheduleConfig().EnableHeartbeatAsyncRunner {
+	if c.GetScheduleConfig().EnableHeartbeatConcurrentRunner {
 		ctx = context.WithValue(ctx, ctxutil.TaskRunnerKey, c.taskRunner)
 	}
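The cluster_worker.go hunk is where the rename becomes operator-visible: the EnableHeartbeatConcurrentRunner schedule-config flag (formerly EnableHeartbeatAsyncRunner) decides whether HandleRegionHeartbeat publishes the concurrent runner into the request context under ctxutil.TaskRunnerKey. A sketch of the consuming side of that plumbing; the Runner interface, context key, and fallback logic below are illustrative assumptions (the real RunTask also takes ratelimit.TaskOpts), since the patch only shows the producer side:

package main

import (
	"context"
	"fmt"
)

// runnerKey stands in for ctxutil.TaskRunnerKey, which lives in pd's ctxutil
// package and is not defined in this patch.
type runnerKey struct{}

// Runner is an assumed, simplified common interface over ConcurrentRunner
// and SyncRunner; both expose RunTask in the patched pkg/ratelimit.
type Runner interface {
	RunTask(ctx context.Context, f func(context.Context)) error
}

// syncRunner mirrors SyncRunner's behavior: it executes the task inline.
type syncRunner struct{}

func (syncRunner) RunTask(ctx context.Context, f func(context.Context)) error {
	f(ctx)
	return nil
}

// handleHeartbeat picks whichever runner the caller installed in the
// context, falling back to synchronous execution when the flag is off.
func handleHeartbeat(ctx context.Context) {
	var r Runner = syncRunner{}
	if v, ok := ctx.Value(runnerKey{}).(Runner); ok {
		r = v
	}
	_ = r.RunTask(ctx, func(_ context.Context) {
		fmt.Println("heartbeat side-work executed")
	})
}

func main() {
	// No runner installed: the task runs inline, like the flag-off path.
	handleHeartbeat(context.Background())
}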