Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: reorganize cluster start and stop process #7155

Merged
merged 5 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Cluster struct {
checkMembershipCh chan struct{}
apiServerLeader atomic.Value
clusterID uint64
running atomic.Bool
}

const regionLabelGCInterval = time.Hour
Expand Down Expand Up @@ -215,6 +216,9 @@ func (c *Cluster) updateScheduler() {
// Make sure the check will be triggered once later.
notifier <- struct{}{}
c.persistConfig.SetSchedulersUpdatingNotifier(notifier)
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
select {
case <-c.ctx.Done():
Expand All @@ -224,6 +228,18 @@ func (c *Cluster) updateScheduler() {
// This is triggered by the watcher when the schedulers are updated.
}

if !c.running.Load() {
select {
case <-c.ctx.Done():
log.Info("cluster is closing, stop listening the schedulers updating notifier")
return
case <-ticker.C:
// retry
notifier <- struct{}{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible we have a deadlock here? Since the length of the channel is only 1 and if the scheduler config watcher just sent it before, it could be blocked here.

continue
}
}

log.Info("schedulers updating notifier is triggered, try to update the scheduler")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If stop server here, is there data race?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is the same as the current PD.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In other word,is it possible to meet data race when add scheduler and coordinator wait at the same time?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so but the possibility is much smaller than before.

Copy link
Member Author

@rleungx rleungx Sep 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another way: we can check the cluster status before adding a scheduler every time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But there is still a gap between check status and adding scheduluer, if stop server here after checking the cluster status and before adding scheduler, it is possible to meet data race too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is the way we use the wait group for the scheduler controller is not proper instead of the wait group itself.

var (
schedulersController = c.coordinator.GetSchedulersController()
Expand Down Expand Up @@ -394,15 +410,29 @@ func (c *Cluster) runUpdateStoreStats() {
}
}

// runCoordinator runs the main scheduling loop.
func (c *Cluster) runCoordinator() {
defer logutil.LogPanic()
defer c.wg.Done()
c.coordinator.RunUntilStop()
}

// StartBackgroundJobs starts background jobs.
func (c *Cluster) StartBackgroundJobs() {
c.wg.Add(2)
c.wg.Add(3)
go c.updateScheduler()
go c.runUpdateStoreStats()
go c.runCoordinator()
c.running.Store(true)
}

// StopBackgroundJobs stops background jobs.
func (c *Cluster) StopBackgroundJobs() {
if !c.running.Load() {
return
}
c.running.Store(false)
c.coordinator.Stop()
c.cancel()
c.wg.Wait()
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,16 +444,12 @@ func (s *Server) startCluster(context.Context) error {
}
s.configWatcher.SetSchedulersController(s.cluster.GetCoordinator().GetSchedulersController())
s.cluster.StartBackgroundJobs()
go s.GetCoordinator().RunUntilStop()
return nil
}

func (s *Server) stopCluster() {
s.GetCoordinator().Stop()
s.cluster.StopBackgroundJobs()
s.ruleWatcher.Close()
s.configWatcher.Close()
s.metaWatcher.Close()
s.stopWatcher()
}

func (s *Server) startWatcher() (err error) {
Expand All @@ -469,6 +465,12 @@ func (s *Server) startWatcher() (err error) {
return err
}

func (s *Server) stopWatcher() {
s.ruleWatcher.Close()
s.configWatcher.Close()
s.metaWatcher.Close()
}

// GetPersistConfig returns the persist config.
// It's used to test.
func (s *Server) GetPersistConfig() *config.PersistConfig {
Expand Down
2 changes: 2 additions & 0 deletions pkg/schedule/schedulers/scheduler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ func NewController(ctx context.Context, cluster sche.SchedulerCluster, storage e

// Wait waits on all schedulers to exit.
func (c *Controller) Wait() {
c.Lock()
defer c.Unlock()
c.wg.Wait()
}

Expand Down