Skip to content

Commit

Permalink
coordinator, mcs/scheduling: fix the default schedulers initialization (
Browse files Browse the repository at this point in the history
tikv#7236)

close tikv#7169

Fix the default scheduler initialization of the scheduling service.

Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato authored and rleungx committed Dec 1, 2023
1 parent ffb312e commit 528071b
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 23 deletions.
18 changes: 13 additions & 5 deletions pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"os"
"path/filepath"
"reflect"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -232,6 +233,14 @@ func (o *PersistConfig) getSchedulersUpdatingNotifier() chan<- struct{} {
return v.(chan<- struct{})
}

func (o *PersistConfig) tryNotifySchedulersUpdating() {
notifier := o.getSchedulersUpdatingNotifier()
if notifier == nil {
return
}
notifier <- struct{}{}
}

// GetClusterVersion returns the cluster version.
func (o *PersistConfig) GetClusterVersion() *semver.Version {
return (*semver.Version)(atomic.LoadPointer(&o.clusterVersion))
Expand All @@ -251,11 +260,10 @@ func (o *PersistConfig) GetScheduleConfig() *sc.ScheduleConfig {
func (o *PersistConfig) SetScheduleConfig(cfg *sc.ScheduleConfig) {
old := o.GetScheduleConfig()
o.schedule.Store(cfg)
// The coordinator is not aware of the underlying scheduler config changes, however, it
// should react on the scheduler number changes to handle the add/remove scheduler events.
if notifier := o.getSchedulersUpdatingNotifier(); notifier != nil &&
len(old.Schedulers) != len(cfg.Schedulers) {
notifier <- struct{}{}
// The coordinator is not aware of the underlying scheduler config changes,
// we should notify it to update the schedulers proactively.
if !reflect.DeepEqual(old.Schedulers, cfg.Schedulers) {
o.tryNotifySchedulersUpdating()
}
}

Expand Down
21 changes: 15 additions & 6 deletions pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,13 +421,16 @@ func (c *Coordinator) InitSchedulers(needRun bool) {
log.Error("can not create scheduler with independent configuration", zap.String("scheduler-name", name), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
continue
}
log.Info("create scheduler with independent configuration", zap.String("scheduler-name", s.GetName()))
if needRun {
log.Info("create scheduler with independent configuration", zap.String("scheduler-name", s.GetName()))
if err = c.schedulers.AddScheduler(s); err != nil {
log.Error("can not add scheduler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
}
} else if err = c.schedulers.AddSchedulerHandler(s); err != nil {
log.Error("can not add scheduler handler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
} else {
log.Info("create scheduler handler with independent configuration", zap.String("scheduler-name", s.GetName()))
if err = c.schedulers.AddSchedulerHandler(s); err != nil {
log.Error("can not add scheduler handler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
}
}
}

Expand All @@ -447,17 +450,23 @@ func (c *Coordinator) InitSchedulers(needRun bool) {
continue
}

log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args))
if needRun {
log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args))
if err = c.schedulers.AddScheduler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) {
log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err))
} else {
// Only records the valid scheduler config.
scheduleCfg.Schedulers[k] = schedulerCfg
k++
}
} else if err = c.schedulers.AddSchedulerHandler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) {
log.Error("can not add scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err))
} else {
log.Info("create scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args))
if err = c.schedulers.AddSchedulerHandler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) {
log.Error("can not add scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err))
} else {
scheduleCfg.Schedulers[k] = schedulerCfg
k++
}
}
}

Expand Down
38 changes: 26 additions & 12 deletions tests/integrations/mcs/scheduling/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,18 +265,32 @@ func (suite *serverTestSuite) TestSchedulerSync() {
api.MustDeleteScheduler(re, suite.backendEndpoints, schedulers.EvictLeaderName)
checkEvictLeaderSchedulerExist(re, schedulersController, false)

// TODO: test more schedulers.
// Fixme: the following code will fail because the scheduler is not removed but not synced.
// checkDelete := func(schedulerName string) {
// re.NotNil(schedulersController.GetScheduler(schedulers.BalanceLeaderName) != nil)
// api.MustDeleteScheduler(re, suite.backendEndpoints, schedulers.BalanceLeaderName)
// testutil.Eventually(re, func() bool {
// return schedulersController.GetScheduler(schedulers.BalanceLeaderName) == nil
// })
// }
// checkDelete(schedulers.BalanceLeaderName)
// checkDelete(schedulers.BalanceRegionName)
// checkDelete(schedulers.HotRegionName)
// The default scheduler could not be deleted, it could only be disabled.
defaultSchedulerNames := []string{
schedulers.BalanceLeaderName,
schedulers.BalanceRegionName,
schedulers.BalanceWitnessName,
schedulers.HotRegionName,
schedulers.TransferWitnessLeaderName,
}
checkDisabled := func(name string, shouldDisabled bool) {
re.NotNil(schedulersController.GetScheduler(name), name)
testutil.Eventually(re, func() bool {
disabled, err := schedulersController.IsSchedulerDisabled(name)
re.NoError(err, name)
return disabled == shouldDisabled
})
}
for _, name := range defaultSchedulerNames {
checkDisabled(name, false)
api.MustDeleteScheduler(re, suite.backendEndpoints, name)
checkDisabled(name, true)
}
for _, name := range defaultSchedulerNames {
checkDisabled(name, true)
api.MustAddScheduler(re, suite.backendEndpoints, name, nil)
checkDisabled(name, false)
}
}

func checkEvictLeaderSchedulerExist(re *require.Assertions, sc *schedulers.Controller, exist bool) {
Expand Down

0 comments on commit 528071b

Please sign in to comment.