From 12b3da4f0419b6509d474d3a625fc62214cc1d9c Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 26 Sep 2023 15:44:24 +0800 Subject: [PATCH 1/4] reorganize cluster start and stop process Signed-off-by: Ryan Leung --- pkg/mcs/scheduling/server/cluster.go | 32 +++++++++++++++++++++++++++- pkg/mcs/scheduling/server/server.go | 12 ++++++----- 2 files changed, 38 insertions(+), 6 deletions(-) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index b5e42b40ac8..50c27de6dcf 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -47,6 +47,7 @@ type Cluster struct { checkMembershipCh chan struct{} apiServerLeader atomic.Value clusterID uint64 + running atomic.Bool } const regionLabelGCInterval = time.Hour @@ -215,6 +216,9 @@ func (c *Cluster) updateScheduler() { // Make sure the check will be triggered once later. notifier <- struct{}{} c.persistConfig.SetSchedulersUpdatingNotifier(notifier) + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for { select { case <-c.ctx.Done(): @@ -224,6 +228,18 @@ func (c *Cluster) updateScheduler() { // This is triggered by the watcher when the schedulers are updated. } + if !c.running.Load() { + select { + case <-c.ctx.Done(): + log.Info("cluster is closing, stop listening the schedulers updating notifier") + return + case <-ticker.C: + // retry + notifier <- struct{}{} + continue + } + } + log.Info("schedulers updating notifier is triggered, try to update the scheduler") var ( schedulersController = c.coordinator.GetSchedulersController() @@ -394,15 +410,29 @@ func (c *Cluster) runUpdateStoreStats() { } } +// runCoordinator runs the main scheduling loop. +func (c *Cluster) runCoordinator() { + defer logutil.LogPanic() + defer c.wg.Done() + c.coordinator.RunUntilStop() +} + // StartBackgroundJobs starts background jobs. func (c *Cluster) StartBackgroundJobs() { - c.wg.Add(2) + c.wg.Add(3) go c.updateScheduler() go c.runUpdateStoreStats() + go c.runCoordinator() + c.running.Store(true) } // StopBackgroundJobs stops background jobs. func (c *Cluster) StopBackgroundJobs() { + if !c.running.Load() { + return + } + c.running.Store(false) + c.coordinator.Stop() c.cancel() c.wg.Wait() } diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index c1aecc2f18b..78892378e21 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -444,16 +444,12 @@ func (s *Server) startCluster(context.Context) error { } s.configWatcher.SetSchedulersController(s.cluster.GetCoordinator().GetSchedulersController()) s.cluster.StartBackgroundJobs() - go s.GetCoordinator().RunUntilStop() return nil } func (s *Server) stopCluster() { - s.GetCoordinator().Stop() s.cluster.StopBackgroundJobs() - s.ruleWatcher.Close() - s.configWatcher.Close() - s.metaWatcher.Close() + s.stopWatcher() } func (s *Server) startWatcher() (err error) { @@ -469,6 +465,12 @@ func (s *Server) startWatcher() (err error) { return err } +func (s *Server) stopWatcher() { + s.ruleWatcher.Close() + s.configWatcher.Close() + s.metaWatcher.Close() +} + // GetPersistConfig returns the persist config. // It's used to test. func (s *Server) GetPersistConfig() *config.PersistConfig { From 40887ae5401cd1223e7a1549e1595f150de1fe74 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 26 Sep 2023 17:21:25 +0800 Subject: [PATCH 2/4] add lock Signed-off-by: Ryan Leung --- pkg/schedule/schedulers/scheduler_controller.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/schedule/schedulers/scheduler_controller.go b/pkg/schedule/schedulers/scheduler_controller.go index 46b4947b6cd..d58a78ca82f 100644 --- a/pkg/schedule/schedulers/scheduler_controller.go +++ b/pkg/schedule/schedulers/scheduler_controller.go @@ -68,6 +68,8 @@ func NewController(ctx context.Context, cluster sche.SchedulerCluster, storage e // Wait waits on all schedulers to exit. func (c *Controller) Wait() { + c.Lock() + defer c.Unlock() c.wg.Wait() } From 035489ca551750e3af16c846e460e3453a662fb6 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Sun, 8 Oct 2023 17:40:48 +0800 Subject: [PATCH 3/4] address the comment Signed-off-by: Ryan Leung --- pkg/mcs/scheduling/server/cluster.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 50c27de6dcf..db753a8647d 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -214,7 +214,12 @@ func (c *Cluster) updateScheduler() { // Establish a notifier to listen the schedulers updating. notifier := make(chan struct{}, 1) // Make sure the check will be triggered once later. - notifier <- struct{}{} + select { + case notifier <- struct{}{}: + // If the channel is not empty, it means the check is triggered. + default: + } + c.persistConfig.SetSchedulersUpdatingNotifier(notifier) ticker := time.NewTicker(time.Second) defer ticker.Stop() @@ -235,7 +240,11 @@ func (c *Cluster) updateScheduler() { return case <-ticker.C: // retry - notifier <- struct{}{} + select { + case notifier <- struct{}{}: + // If the channel is not empty, it means the check is triggered. + default: + } continue } } From 891a32269bf380eea36f98e6e56c5ba8e553dbce Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 9 Oct 2023 10:27:04 +0800 Subject: [PATCH 4/4] address the comment Signed-off-by: Ryan Leung --- pkg/mcs/scheduling/server/cluster.go | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index db753a8647d..9fd420f76c3 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -204,6 +204,14 @@ func (c *Cluster) SwitchAPIServerLeader(new pdpb.PDClient) bool { return c.apiServerLeader.CompareAndSwap(old, new) } +func trySend(notifier chan struct{}) { + select { + case notifier <- struct{}{}: + // If the channel is not empty, it means the check is triggered. + default: + } +} + // updateScheduler listens on the schedulers updating notifier and manage the scheduler creation and deletion. func (c *Cluster) updateScheduler() { defer logutil.LogPanic() @@ -214,12 +222,7 @@ func (c *Cluster) updateScheduler() { // Establish a notifier to listen the schedulers updating. notifier := make(chan struct{}, 1) // Make sure the check will be triggered once later. - select { - case notifier <- struct{}{}: - // If the channel is not empty, it means the check is triggered. - default: - } - + trySend(notifier) c.persistConfig.SetSchedulersUpdatingNotifier(notifier) ticker := time.NewTicker(time.Second) defer ticker.Stop() @@ -240,11 +243,7 @@ func (c *Cluster) updateScheduler() { return case <-ticker.C: // retry - select { - case notifier <- struct{}{}: - // If the channel is not empty, it means the check is triggered. - default: - } + trySend(notifier) continue } }