Skip to content

Commit

Permalink
rename
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch committed Apr 11, 2024
1 parent 2a91f8c commit 4de6106
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 29 deletions.
4 changes: 2 additions & 2 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
TaskName: "Log",
Limit: limiter,
},
func(ctx context.Context) {
func(_ context.Context) {
d(msg, fields...)
},
)
Expand All @@ -753,7 +753,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
TaskName: "Log",
Limit: limiter,
},
func(ctx context.Context) {
func(_ context.Context) {
i(msg, fields...)
},
)
Expand Down
8 changes: 4 additions & 4 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ func (c *Cluster) processRegionHeartbeat(ctx context.Context, region *core.Regio
TaskName: "HandleStatsAsync",
Limit: limiter,
},
func(ctx context.Context) {
func(_ context.Context) {
cluster.HandleStatsAsync(c, region)
},
)
Expand All @@ -618,7 +618,7 @@ func (c *Cluster) processRegionHeartbeat(ctx context.Context, region *core.Regio
TaskName: "ObserveRegionStatsAsync",
Limit: limiter,
},
func(ctx context.Context) {
func(_ context.Context) {
if c.regionStats.RegionStatsNeedUpdate(region) {
cluster.Collect(c, region, hasRegionStats)
}
Expand Down Expand Up @@ -646,7 +646,7 @@ func (c *Cluster) processRegionHeartbeat(ctx context.Context, region *core.Regio
TaskName: "HandleOverlaps",
Limit: limiter,
},
func(ctx context.Context) {
func(_ context.Context) {
cluster.HandleOverlaps(c, overlaps)
},
)
Expand All @@ -659,7 +659,7 @@ func (c *Cluster) processRegionHeartbeat(ctx context.Context, region *core.Regio
TaskName: "CollectRegionStatsAsync",
Limit: c.hbConcurrencyLimiter,
},
func(ctx context.Context) {
func(_ context.Context) {
cluster.Collect(c, region, hasRegionStats)
},
)
Expand Down
24 changes: 12 additions & 12 deletions pkg/ratelimit/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ type Task struct {
// ErrMaxWaitingTasksExceeded is returned when the number of waiting tasks exceeds the maximum.
var ErrMaxWaitingTasksExceeded = errors.New("max waiting tasks exceeded")

// AsyncRunner is a simple task runner that limits the number of concurrent tasks.
type AsyncRunner struct {
// ConcurrentRunner is a simple task runner that limits the number of concurrent tasks.
type ConcurrentRunner struct {
name string
maxPendingDuration time.Duration
taskChan chan *Task
Expand All @@ -55,9 +55,9 @@ type AsyncRunner struct {
wg sync.WaitGroup
}

// NewAsyncRunner creates a new AsyncRunner.
func NewAsyncRunner(name string, maxPendingDuration time.Duration) *AsyncRunner {
s := &AsyncRunner{
// NewConcurrentRunner creates a new ConcurrentRunner.
func NewConcurrentRunner(name string, maxPendingDuration time.Duration) *ConcurrentRunner {
s := &ConcurrentRunner{
name: name,
maxPendingDuration: maxPendingDuration,
taskChan: make(chan *Task),
Expand All @@ -74,7 +74,7 @@ type TaskOpts struct {
}

// Start starts the runner.
func (s *AsyncRunner) Start() {
func (s *ConcurrentRunner) Start() {
s.stopChan = make(chan struct{})
s.wg.Add(1)
go func() {
Expand All @@ -100,15 +100,15 @@ func (s *AsyncRunner) Start() {
}()
}

func (s *AsyncRunner) run(ctx context.Context, task func(context.Context), token *TaskToken) {
func (s *ConcurrentRunner) run(ctx context.Context, task func(context.Context), token *TaskToken) {
task(ctx)
if token != nil {
token.Release()
s.processPendingTasks()
}
}

func (s *AsyncRunner) processPendingTasks() {
func (s *ConcurrentRunner) processPendingTasks() {
s.pendingMu.Lock()
defer s.pendingMu.Unlock()
for len(s.pendingTasks) > 0 {
Expand All @@ -124,13 +124,13 @@ func (s *AsyncRunner) processPendingTasks() {
}

// Stop stops the runner.
func (s *AsyncRunner) Stop() {
func (s *ConcurrentRunner) Stop() {
close(s.stopChan)
s.wg.Wait()
}

// RunTask runs the task asynchronously.
func (s *AsyncRunner) RunTask(ctx context.Context, opt TaskOpts, f func(context.Context)) error {
func (s *ConcurrentRunner) RunTask(ctx context.Context, opt TaskOpts, f func(context.Context)) error {
task := &Task{
Ctx: ctx,
Opts: opt,
Expand Down Expand Up @@ -169,7 +169,7 @@ func (*SyncRunner) RunTask(ctx context.Context, _ TaskOpts, f func(context.Conte
}

// Start starts the runner.
func (s *SyncRunner) Start() {}
func (*SyncRunner) Start() {}

// Stop stops the runner.
func (s *SyncRunner) Stop() {}
func (*SyncRunner) Stop() {}
18 changes: 9 additions & 9 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ const (
minSnapshotDurationSec = 5

// heartbeat relative const
hbAsyncRunner = "heartbeat-async-task-runner"
hbConcurrentRunner = "heartbeat-async-task-runner"
)

// Server is the interface for cluster.
Expand Down Expand Up @@ -198,7 +198,7 @@ func NewRaftCluster(ctx context.Context, clusterID uint64, basicCluster *core.Ba
etcdClient: etcdClient,
core: basicCluster,
storage: storage,
taskRunner: ratelimit.NewAsyncRunner(hbAsyncRunner, time.Minute),
taskRunner: ratelimit.NewConcurrentRunner(hbConcurrentRunner, time.Minute),
hbConcurrencyLimiter: ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU() * 2)),
}
}
Expand Down Expand Up @@ -1029,7 +1029,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx context.Context, region *core.R
TaskName: "HandleStatsAsync",
Limit: limiter,
},
func(ctx context.Context) {
func(_ context.Context) {
cluster.HandleStatsAsync(c, region)
},
)
Expand All @@ -1053,7 +1053,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx context.Context, region *core.R
TaskName: "ObserveRegionStatsAsync",
Limit: limiter,
},
func(ctx context.Context) {
func(_ context.Context) {
if c.regionStats.RegionStatsNeedUpdate(region) {
cluster.Collect(c, region, hasRegionStats)
}
Expand Down Expand Up @@ -1086,7 +1086,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx context.Context, region *core.R
TaskName: "HandleOverlaps",
Limit: limiter,
},
func(ctx context.Context) {
func(_ context.Context) {
cluster.HandleOverlaps(c, overlaps)
},
)
Expand All @@ -1102,7 +1102,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx context.Context, region *core.R
TaskName: "CollectRegionStatsAsync",
Limit: c.hbConcurrencyLimiter,
},
func(ctx context.Context) {
func(_ context.Context) {
// TODO: Due to the accuracy requirements of the API "/regions/check/xxx",
// region stats needs to be collected in API mode.
// We need to think of a better way to reduce this part of the cost in the future.
Expand All @@ -1119,7 +1119,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx context.Context, region *core.R
TaskName: "SaveRegionToKV",
Limit: c.hbConcurrencyLimiter,
},
func(ctx context.Context) {
func(_ context.Context) {
// If there are concurrent heartbeats from the same region, the last write will win even if
// writes to storage in the critical area. So don't use mutex to protect it.
// Not successfully saved to storage is not fatal, it only leads to longer warm-up
Expand Down Expand Up @@ -2111,7 +2111,7 @@ func (c *RaftCluster) collectMetrics() {
}

func (c *RaftCluster) resetMetrics() {
resetHealthStatus()
c.resetHealthStatus()
c.resetProgressIndicator()
}

Expand All @@ -2130,7 +2130,7 @@ func (c *RaftCluster) collectHealthStatus() {
}
}

func resetHealthStatus() {
func (*RaftCluster) resetHealthStatus() {
healthStatusGauge.Reset()
}

Expand Down
2 changes: 1 addition & 1 deletion server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2139,7 +2139,7 @@ func newTestRaftCluster(
opt *config.PersistOptions,
s storage.Storage,
) *RaftCluster {
opt.GetScheduleConfig().EnableHeartbeatAsyncRunner = false
opt.GetScheduleConfig().EnableHeartbeatConcurrentRunner = false
rc := &RaftCluster{serverCtx: ctx, core: core.NewBasicCluster(), storage: s}
rc.InitCluster(id, opt, nil, nil)
rc.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), rc, opt)
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/cluster_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
tracer.Begin()
ctx := context.WithValue(c.ctx, ctxutil.HeartbeatTracerKey, tracer)
ctx = context.WithValue(ctx, ctxutil.LimiterKey, c.hbConcurrencyLimiter)
if c.GetScheduleConfig().EnableHeartbeatAsyncRunner {
if c.GetScheduleConfig().EnableHeartbeatConcurrentRunner {
ctx = context.WithValue(ctx, ctxutil.TaskRunnerKey, c.taskRunner)
}

Expand Down

0 comments on commit 4de6106

Please sign in to comment.