From fe848299f28a11d47e11bd93b506d6d0f1c0477f Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 22 Nov 2023 17:17:42 +0800 Subject: [PATCH] schedulers: add reload config function (#7406) close tikv/pd#7257 Signed-off-by: lhy1024 --- pkg/schedule/schedulers/balance_leader.go | 22 +- pkg/schedule/schedulers/balance_region.go | 1 + pkg/schedule/schedulers/balance_witness.go | 22 +- pkg/schedule/schedulers/evict_leader.go | 72 +++---- pkg/schedule/schedulers/evict_slow_store.go | 75 +++++-- pkg/schedule/schedulers/evict_slow_trend.go | 89 ++++++-- pkg/schedule/schedulers/grant_hot_region.go | 45 ++-- pkg/schedule/schedulers/grant_leader.go | 52 ++--- pkg/schedule/schedulers/hot_region.go | 38 ++++ pkg/schedule/schedulers/init.go | 2 + pkg/schedule/schedulers/label.go | 1 + pkg/schedule/schedulers/random_merge.go | 1 + pkg/schedule/schedulers/scatter_range.go | 47 ++-- .../schedulers/scheduler_controller.go | 3 +- pkg/schedule/schedulers/shuffle_hot_region.go | 106 ++++++++- pkg/schedule/schedulers/shuffle_leader.go | 1 + pkg/schedule/schedulers/split_bucket.go | 22 +- .../schedulers/transfer_witness_leader.go | 1 + pkg/schedule/schedulers/utils.go | 16 ++ pkg/window/policy.go | 5 +- server/cluster/scheduling_controller.go | 3 +- tests/pdctl/scheduler/scheduler_test.go | 202 ++++++++++-------- tests/server/api/scheduler_test.go | 38 ++-- tools/pd-ctl/pdctl/command/scheduler.go | 20 ++ tools/pd-simulator/simulator/node.go | 5 +- 25 files changed, 605 insertions(+), 284 deletions(-) diff --git a/pkg/schedule/schedulers/balance_leader.go b/pkg/schedule/schedulers/balance_leader.go index e5516317f46..6a64edf7e70 100644 --- a/pkg/schedule/schedulers/balance_leader.go +++ b/pkg/schedule/schedulers/balance_leader.go @@ -67,7 +67,7 @@ var ( ) type balanceLeaderSchedulerConfig struct { - mu syncutil.RWMutex + syncutil.RWMutex storage endpoint.ConfigStorage Ranges []core.KeyRange `json:"ranges"` // Batch is used to generate multiple operators by one scheduling @@ -75,8 +75,8 @@ type balanceLeaderSchedulerConfig struct { } func (conf *balanceLeaderSchedulerConfig) Update(data []byte) (int, interface{}) { - conf.mu.Lock() - defer conf.mu.Unlock() + conf.Lock() + defer conf.Unlock() oldc, _ := json.Marshal(conf) @@ -109,8 +109,8 @@ func (conf *balanceLeaderSchedulerConfig) validate() bool { } func (conf *balanceLeaderSchedulerConfig) Clone() *balanceLeaderSchedulerConfig { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() ranges := make([]core.KeyRange, len(conf.Ranges)) copy(ranges, conf.Ranges) return &balanceLeaderSchedulerConfig{ @@ -210,14 +210,14 @@ func (l *balanceLeaderScheduler) GetType() string { } func (l *balanceLeaderScheduler) EncodeConfig() ([]byte, error) { - l.conf.mu.RLock() - defer l.conf.mu.RUnlock() + l.conf.RLock() + defer l.conf.RUnlock() return EncodeConfig(l.conf) } func (l *balanceLeaderScheduler) ReloadConfig() error { - l.conf.mu.Lock() - defer l.conf.mu.Unlock() + l.conf.Lock() + defer l.conf.Unlock() cfgData, err := l.conf.storage.LoadSchedulerConfig(l.GetName()) if err != nil { return err @@ -335,8 +335,8 @@ func (cs *candidateStores) resortStoreWithPos(pos int) { } func (l *balanceLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { - l.conf.mu.RLock() - defer l.conf.mu.RUnlock() + l.conf.RLock() + defer l.conf.RUnlock() basePlan := plan.NewBalanceSchedulerPlan() var collector *plan.Collector if dryRun { diff --git a/pkg/schedule/schedulers/balance_region.go 
b/pkg/schedule/schedulers/balance_region.go index 1343600af06..1cef3a4615b 100644 --- a/pkg/schedule/schedulers/balance_region.go +++ b/pkg/schedule/schedulers/balance_region.go @@ -51,6 +51,7 @@ var ( type balanceRegionSchedulerConfig struct { Name string `json:"name"` Ranges []core.KeyRange `json:"ranges"` + // TODO: When we prepare to use Ranges, we will need to implement the ReloadConfig function for this scheduler. } type balanceRegionScheduler struct { diff --git a/pkg/schedule/schedulers/balance_witness.go b/pkg/schedule/schedulers/balance_witness.go index e9bab6c1bc7..bf3fbbb83da 100644 --- a/pkg/schedule/schedulers/balance_witness.go +++ b/pkg/schedule/schedulers/balance_witness.go @@ -53,7 +53,7 @@ const ( ) type balanceWitnessSchedulerConfig struct { - mu syncutil.RWMutex + syncutil.RWMutex storage endpoint.ConfigStorage Ranges []core.KeyRange `json:"ranges"` // Batch is used to generate multiple operators by one scheduling @@ -61,8 +61,8 @@ type balanceWitnessSchedulerConfig struct { } func (conf *balanceWitnessSchedulerConfig) Update(data []byte) (int, interface{}) { - conf.mu.Lock() - defer conf.mu.Unlock() + conf.Lock() + defer conf.Unlock() oldc, _ := json.Marshal(conf) @@ -95,8 +95,8 @@ func (conf *balanceWitnessSchedulerConfig) validate() bool { } func (conf *balanceWitnessSchedulerConfig) Clone() *balanceWitnessSchedulerConfig { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() ranges := make([]core.KeyRange, len(conf.Ranges)) copy(ranges, conf.Ranges) return &balanceWitnessSchedulerConfig{ @@ -205,14 +205,14 @@ func (b *balanceWitnessScheduler) GetType() string { } func (b *balanceWitnessScheduler) EncodeConfig() ([]byte, error) { - b.conf.mu.RLock() - defer b.conf.mu.RUnlock() + b.conf.RLock() + defer b.conf.RUnlock() return EncodeConfig(b.conf) } func (b *balanceWitnessScheduler) ReloadConfig() error { - b.conf.mu.Lock() - defer b.conf.mu.Unlock() + b.conf.Lock() + defer b.conf.Unlock() cfgData, err := b.conf.storage.LoadSchedulerConfig(b.GetName()) if err != nil { return err @@ -238,8 +238,8 @@ func (b *balanceWitnessScheduler) IsScheduleAllowed(cluster sche.SchedulerCluste } func (b *balanceWitnessScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { - b.conf.mu.RLock() - defer b.conf.mu.RUnlock() + b.conf.RLock() + defer b.conf.RUnlock() basePlan := plan.NewBalanceSchedulerPlan() var collector *plan.Collector if dryRun { diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go index 332002043a3..879aa9869b3 100644 --- a/pkg/schedule/schedulers/evict_leader.go +++ b/pkg/schedule/schedulers/evict_leader.go @@ -56,7 +56,7 @@ var ( ) type evictLeaderSchedulerConfig struct { - mu syncutil.RWMutex + syncutil.RWMutex storage endpoint.ConfigStorage StoreIDWithRanges map[uint64][]core.KeyRange `json:"store-id-ranges"` cluster *core.BasicCluster @@ -64,8 +64,8 @@ type evictLeaderSchedulerConfig struct { } func (conf *evictLeaderSchedulerConfig) getStores() []uint64 { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() stores := make([]uint64, 0, len(conf.StoreIDWithRanges)) for storeID := range conf.StoreIDWithRanges { stores = append(stores, storeID) @@ -86,15 +86,15 @@ func (conf *evictLeaderSchedulerConfig) BuildWithArgs(args []string) error { if err != nil { return err } - conf.mu.Lock() - defer conf.mu.Unlock() + conf.Lock() + defer conf.Unlock() conf.StoreIDWithRanges[id] = ranges return nil } func (conf 
*evictLeaderSchedulerConfig) Clone() *evictLeaderSchedulerConfig { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() storeIDWithRanges := make(map[uint64][]core.KeyRange) for id, ranges := range conf.StoreIDWithRanges { storeIDWithRanges[id] = append(storeIDWithRanges[id], ranges...) @@ -106,8 +106,8 @@ func (conf *evictLeaderSchedulerConfig) Clone() *evictLeaderSchedulerConfig { func (conf *evictLeaderSchedulerConfig) Persist() error { name := conf.getSchedulerName() - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() data, err := EncodeConfig(conf) failpoint.Inject("persistFail", func() { err = errors.New("fail to persist") @@ -123,8 +123,8 @@ func (conf *evictLeaderSchedulerConfig) getSchedulerName() string { } func (conf *evictLeaderSchedulerConfig) getRanges(id uint64) []string { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() ranges := conf.StoreIDWithRanges[id] res := make([]string, 0, len(ranges)*2) for index := range ranges { @@ -134,8 +134,8 @@ func (conf *evictLeaderSchedulerConfig) getRanges(id uint64) []string { } func (conf *evictLeaderSchedulerConfig) removeStore(id uint64) (succ bool, last bool) { - conf.mu.Lock() - defer conf.mu.Unlock() + conf.Lock() + defer conf.Unlock() _, exists := conf.StoreIDWithRanges[id] succ, last = false, false if exists { @@ -148,15 +148,15 @@ func (conf *evictLeaderSchedulerConfig) removeStore(id uint64) (succ bool, last } func (conf *evictLeaderSchedulerConfig) resetStore(id uint64, keyRange []core.KeyRange) { - conf.mu.Lock() - defer conf.mu.Unlock() + conf.Lock() + defer conf.Unlock() conf.cluster.PauseLeaderTransfer(id) conf.StoreIDWithRanges[id] = keyRange } func (conf *evictLeaderSchedulerConfig) getKeyRangesByID(id uint64) []core.KeyRange { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() if ranges, exist := conf.StoreIDWithRanges[id]; exist { return ranges } @@ -199,14 +199,14 @@ func (s *evictLeaderScheduler) GetType() string { } func (s *evictLeaderScheduler) EncodeConfig() ([]byte, error) { - s.conf.mu.RLock() - defer s.conf.mu.RUnlock() + s.conf.RLock() + defer s.conf.RUnlock() return EncodeConfig(s.conf) } func (s *evictLeaderScheduler) ReloadConfig() error { - s.conf.mu.Lock() - defer s.conf.mu.Unlock() + s.conf.Lock() + defer s.conf.Unlock() cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) if err != nil { return err @@ -223,25 +223,9 @@ func (s *evictLeaderScheduler) ReloadConfig() error { return nil } -// pauseAndResumeLeaderTransfer checks the old and new store IDs, and pause or resume the leader transfer. 
-func pauseAndResumeLeaderTransfer(cluster *core.BasicCluster, old, new map[uint64][]core.KeyRange) { - for id := range old { - if _, ok := new[id]; ok { - continue - } - cluster.ResumeLeaderTransfer(id) - } - for id := range new { - if _, ok := old[id]; ok { - continue - } - cluster.PauseLeaderTransfer(id) - } -} - func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { - s.conf.mu.RLock() - defer s.conf.mu.RUnlock() + s.conf.RLock() + defer s.conf.RUnlock() var res error for id := range s.conf.StoreIDWithRanges { if err := cluster.PauseLeaderTransfer(id); err != nil { @@ -252,8 +236,8 @@ func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) erro } func (s *evictLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) { - s.conf.mu.RLock() - defer s.conf.mu.RUnlock() + s.conf.RLock() + defer s.conf.RUnlock() for id := range s.conf.StoreIDWithRanges { cluster.ResumeLeaderTransfer(id) } @@ -382,15 +366,15 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R idFloat, ok := input["store_id"].(float64) if ok { id = (uint64)(idFloat) - handler.config.mu.RLock() + handler.config.RLock() if _, exists = handler.config.StoreIDWithRanges[id]; !exists { if err := handler.config.cluster.PauseLeaderTransfer(id); err != nil { - handler.config.mu.RUnlock() + handler.config.RUnlock() handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } } - handler.config.mu.RUnlock() + handler.config.RUnlock() args = append(args, strconv.FormatUint(id, 10)) } diff --git a/pkg/schedule/schedulers/evict_slow_store.go b/pkg/schedule/schedulers/evict_slow_store.go index 563f9f68c45..713920828cc 100644 --- a/pkg/schedule/schedulers/evict_slow_store.go +++ b/pkg/schedule/schedulers/evict_slow_store.go @@ -16,7 +16,6 @@ package schedulers import ( "net/http" - "sync/atomic" "time" "github.com/gorilla/mux" @@ -29,6 +28,7 @@ import ( "github.com/tikv/pd/pkg/schedule/plan" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/apiutil" + "github.com/tikv/pd/pkg/utils/syncutil" "github.com/unrolled/render" "go.uber.org/zap" ) @@ -47,6 +47,8 @@ const ( var evictSlowStoreCounter = schedulerCounter.WithLabelValues(EvictSlowStoreName, "schedule") type evictSlowStoreSchedulerConfig struct { + syncutil.RWMutex + cluster *core.BasicCluster storage endpoint.ConfigStorage // Last timestamp of the chosen slow store for eviction. 
lastSlowStoreCaptureTS time.Time @@ -65,13 +67,15 @@ func initEvictSlowStoreSchedulerConfig(storage endpoint.ConfigStorage) *evictSlo } func (conf *evictSlowStoreSchedulerConfig) Clone() *evictSlowStoreSchedulerConfig { + conf.RLock() + defer conf.RUnlock() return &evictSlowStoreSchedulerConfig{ - RecoveryDurationGap: atomic.LoadUint64(&conf.RecoveryDurationGap), + RecoveryDurationGap: conf.RecoveryDurationGap, } } -func (conf *evictSlowStoreSchedulerConfig) Persist() error { - name := conf.getSchedulerName() +func (conf *evictSlowStoreSchedulerConfig) persistLocked() error { + name := EvictSlowStoreName data, err := EncodeConfig(conf) failpoint.Inject("persistFail", func() { err = errors.New("fail to persist") @@ -82,11 +86,9 @@ func (conf *evictSlowStoreSchedulerConfig) Persist() error { return conf.storage.SaveSchedulerConfig(name, data) } -func (conf *evictSlowStoreSchedulerConfig) getSchedulerName() string { - return EvictSlowStoreName -} - func (conf *evictSlowStoreSchedulerConfig) getStores() []uint64 { + conf.RLock() + defer conf.RUnlock() return conf.EvictedStores } @@ -98,15 +100,17 @@ func (conf *evictSlowStoreSchedulerConfig) getKeyRangesByID(id uint64) []core.Ke } func (conf *evictSlowStoreSchedulerConfig) evictStore() uint64 { - if len(conf.EvictedStores) == 0 { + if len(conf.getStores()) == 0 { return 0 } - return conf.EvictedStores[0] + return conf.getStores()[0] } // readyForRecovery checks whether the last cpatured candidate is ready for recovery. func (conf *evictSlowStoreSchedulerConfig) readyForRecovery() bool { - recoveryDurationGap := atomic.LoadUint64(&conf.RecoveryDurationGap) + conf.RLock() + defer conf.RUnlock() + recoveryDurationGap := conf.RecoveryDurationGap failpoint.Inject("transientRecoveryGap", func() { recoveryDurationGap = 0 }) @@ -114,17 +118,21 @@ func (conf *evictSlowStoreSchedulerConfig) readyForRecovery() bool { } func (conf *evictSlowStoreSchedulerConfig) setStoreAndPersist(id uint64) error { + conf.Lock() + defer conf.Unlock() conf.EvictedStores = []uint64{id} conf.lastSlowStoreCaptureTS = time.Now() - return conf.Persist() + return conf.persistLocked() } func (conf *evictSlowStoreSchedulerConfig) clearAndPersist() (oldID uint64, err error) { oldID = conf.evictStore() + conf.Lock() + defer conf.Unlock() if oldID > 0 { conf.EvictedStores = []uint64{} conf.lastSlowStoreCaptureTS = time.Time{} - err = conf.Persist() + err = conf.persistLocked() } return } @@ -155,9 +163,16 @@ func (handler *evictSlowStoreHandler) UpdateConfig(w http.ResponseWriter, r *htt handler.rd.JSON(w, http.StatusInternalServerError, errors.New("invalid argument for 'recovery-duration'").Error()) return } - recoveryDurationGap := (uint64)(recoveryDurationGapFloat) - prevRecoveryDurationGap := atomic.LoadUint64(&handler.config.RecoveryDurationGap) - atomic.StoreUint64(&handler.config.RecoveryDurationGap, recoveryDurationGap) + handler.config.Lock() + defer handler.config.Unlock() + prevRecoveryDurationGap := handler.config.RecoveryDurationGap + recoveryDurationGap := uint64(recoveryDurationGapFloat) + handler.config.RecoveryDurationGap = recoveryDurationGap + if err := handler.config.persistLocked(); err != nil { + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + handler.config.RecoveryDurationGap = prevRecoveryDurationGap + return + } log.Info("evict-slow-store-scheduler update 'recovery-duration' - unit: s", zap.Uint64("prev", prevRecoveryDurationGap), zap.Uint64("cur", recoveryDurationGap)) handler.rd.JSON(w, http.StatusOK, nil) } @@ -189,6 +204,34 @@ func (s 
*evictSlowStoreScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(s.conf) } +func (s *evictSlowStoreScheduler) ReloadConfig() error { + s.conf.Lock() + defer s.conf.Unlock() + cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) + if err != nil { + return err + } + if len(cfgData) == 0 { + return nil + } + newCfg := &evictSlowStoreSchedulerConfig{} + if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { + return err + } + old := make(map[uint64]struct{}) + for _, id := range s.conf.EvictedStores { + old[id] = struct{}{} + } + new := make(map[uint64]struct{}) + for _, id := range newCfg.EvictedStores { + new[id] = struct{}{} + } + pauseAndResumeLeaderTransfer(s.conf.cluster, old, new) + s.conf.RecoveryDurationGap = newCfg.RecoveryDurationGap + s.conf.EvictedStores = newCfg.EvictedStores + return nil +} + func (s *evictSlowStoreScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { evictStore := s.conf.evictStore() if evictStore != 0 { diff --git a/pkg/schedule/schedulers/evict_slow_trend.go b/pkg/schedule/schedulers/evict_slow_trend.go index 0d2c10e2bfe..53e096baec7 100644 --- a/pkg/schedule/schedulers/evict_slow_trend.go +++ b/pkg/schedule/schedulers/evict_slow_trend.go @@ -17,7 +17,6 @@ package schedulers import ( "net/http" "strconv" - "sync/atomic" "time" "github.com/gorilla/mux" @@ -30,6 +29,7 @@ import ( "github.com/tikv/pd/pkg/schedule/plan" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/apiutil" + "github.com/tikv/pd/pkg/utils/syncutil" "github.com/unrolled/render" "go.uber.org/zap" ) @@ -54,6 +54,8 @@ type slowCandidate struct { } type evictSlowTrendSchedulerConfig struct { + syncutil.RWMutex + cluster *core.BasicCluster storage endpoint.ConfigStorage // Candidate for eviction in current tick. evictCandidate slowCandidate @@ -76,13 +78,15 @@ func initEvictSlowTrendSchedulerConfig(storage endpoint.ConfigStorage) *evictSlo } func (conf *evictSlowTrendSchedulerConfig) Clone() *evictSlowTrendSchedulerConfig { + conf.RLock() + defer conf.RUnlock() return &evictSlowTrendSchedulerConfig{ - RecoveryDurationGap: atomic.LoadUint64(&conf.RecoveryDurationGap), + RecoveryDurationGap: conf.RecoveryDurationGap, } } -func (conf *evictSlowTrendSchedulerConfig) Persist() error { - name := conf.getSchedulerName() +func (conf *evictSlowTrendSchedulerConfig) persistLocked() error { + name := EvictSlowTrendName data, err := EncodeConfig(conf) failpoint.Inject("persistFail", func() { err = errors.New("fail to persist") @@ -93,11 +97,9 @@ func (conf *evictSlowTrendSchedulerConfig) Persist() error { return conf.storage.SaveSchedulerConfig(name, data) } -func (conf *evictSlowTrendSchedulerConfig) getSchedulerName() string { - return EvictSlowTrendName -} - func (conf *evictSlowTrendSchedulerConfig) getStores() []uint64 { + conf.RLock() + defer conf.RUnlock() return conf.EvictedStores } @@ -109,6 +111,8 @@ func (conf *evictSlowTrendSchedulerConfig) getKeyRangesByID(id uint64) []core.Ke } func (conf *evictSlowTrendSchedulerConfig) hasEvictedStores() bool { + conf.RLock() + defer conf.RUnlock() return len(conf.EvictedStores) > 0 } @@ -116,6 +120,8 @@ func (conf *evictSlowTrendSchedulerConfig) evictedStore() uint64 { if !conf.hasEvictedStores() { return 0 } + conf.RLock() + defer conf.RUnlock() // If a candidate passes all checks and proved to be slow, it will be // recorded in `conf.EvictStores`, and `conf.lastEvictCandidate` will record // the captured timestamp of this store. 
@@ -123,18 +129,26 @@ func (conf *evictSlowTrendSchedulerConfig) evictedStore() uint64 { } func (conf *evictSlowTrendSchedulerConfig) candidate() uint64 { + conf.RLock() + defer conf.RUnlock() return conf.evictCandidate.storeID } func (conf *evictSlowTrendSchedulerConfig) captureTS() time.Time { + conf.RLock() + defer conf.RUnlock() return conf.evictCandidate.captureTS } func (conf *evictSlowTrendSchedulerConfig) candidateCapturedSecs() uint64 { + conf.RLock() + defer conf.RUnlock() return DurationSinceAsSecs(conf.evictCandidate.captureTS) } func (conf *evictSlowTrendSchedulerConfig) lastCapturedCandidate() *slowCandidate { + conf.RLock() + defer conf.RUnlock() return &conf.lastEvictCandidate } @@ -144,7 +158,9 @@ func (conf *evictSlowTrendSchedulerConfig) lastCandidateCapturedSecs() uint64 { // readyForRecovery checks whether the last cpatured candidate is ready for recovery. func (conf *evictSlowTrendSchedulerConfig) readyForRecovery() bool { - recoveryDurationGap := atomic.LoadUint64(&conf.RecoveryDurationGap) + conf.RLock() + defer conf.RUnlock() + recoveryDurationGap := conf.RecoveryDurationGap failpoint.Inject("transientRecoveryGap", func() { recoveryDurationGap = 0 }) @@ -152,6 +168,8 @@ func (conf *evictSlowTrendSchedulerConfig) readyForRecovery() bool { } func (conf *evictSlowTrendSchedulerConfig) captureCandidate(id uint64) { + conf.Lock() + defer conf.Unlock() conf.evictCandidate = slowCandidate{ storeID: id, captureTS: time.Now(), @@ -163,6 +181,8 @@ func (conf *evictSlowTrendSchedulerConfig) captureCandidate(id uint64) { } func (conf *evictSlowTrendSchedulerConfig) popCandidate(updLast bool) uint64 { + conf.Lock() + defer conf.Unlock() id := conf.evictCandidate.storeID if updLast { conf.lastEvictCandidate = conf.evictCandidate @@ -172,14 +192,18 @@ func (conf *evictSlowTrendSchedulerConfig) popCandidate(updLast bool) uint64 { } func (conf *evictSlowTrendSchedulerConfig) markCandidateRecovered() { + conf.Lock() + defer conf.Unlock() if conf.lastEvictCandidate != (slowCandidate{}) { conf.lastEvictCandidate.recoverTS = time.Now() } } func (conf *evictSlowTrendSchedulerConfig) setStoreAndPersist(id uint64) error { + conf.Lock() + defer conf.Unlock() conf.EvictedStores = []uint64{id} - return conf.Persist() + return conf.persistLocked() } func (conf *evictSlowTrendSchedulerConfig) clearAndPersist(cluster sche.SchedulerCluster) (oldID uint64, err error) { @@ -193,8 +217,10 @@ func (conf *evictSlowTrendSchedulerConfig) clearAndPersist(cluster sche.Schedule address = store.GetAddress() } storeSlowTrendEvictedStatusGauge.WithLabelValues(address, strconv.FormatUint(oldID, 10)).Set(0) + conf.Lock() + defer conf.Unlock() conf.EvictedStores = []uint64{} - return oldID, conf.Persist() + return oldID, conf.persistLocked() } type evictSlowTrendHandler struct { @@ -223,9 +249,16 @@ func (handler *evictSlowTrendHandler) UpdateConfig(w http.ResponseWriter, r *htt handler.rd.JSON(w, http.StatusInternalServerError, errors.New("invalid argument for 'recovery-duration'").Error()) return } - recoveryDurationGap := (uint64)(recoveryDurationGapFloat) - prevRecoveryDurationGap := atomic.LoadUint64(&handler.config.RecoveryDurationGap) - atomic.StoreUint64(&handler.config.RecoveryDurationGap, recoveryDurationGap) + handler.config.Lock() + defer handler.config.Unlock() + prevRecoveryDurationGap := handler.config.RecoveryDurationGap + recoveryDurationGap := uint64(recoveryDurationGapFloat) + handler.config.RecoveryDurationGap = recoveryDurationGap + if err := handler.config.persistLocked(); err != nil { + 
handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + handler.config.RecoveryDurationGap = prevRecoveryDurationGap + return + } log.Info("evict-slow-trend-scheduler update 'recovery-duration' - unit: s", zap.Uint64("prev", prevRecoveryDurationGap), zap.Uint64("cur", recoveryDurationGap)) handler.rd.JSON(w, http.StatusOK, nil) } @@ -270,6 +303,34 @@ func (s *evictSlowTrendScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(s.conf) } +func (s *evictSlowTrendScheduler) ReloadConfig() error { + s.conf.Lock() + defer s.conf.Unlock() + cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) + if err != nil { + return err + } + if len(cfgData) == 0 { + return nil + } + newCfg := &evictSlowTrendSchedulerConfig{} + if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { + return err + } + old := make(map[uint64]struct{}) + for _, id := range s.conf.EvictedStores { + old[id] = struct{}{} + } + new := make(map[uint64]struct{}) + for _, id := range newCfg.EvictedStores { + new[id] = struct{}{} + } + pauseAndResumeLeaderTransfer(s.conf.cluster, old, new) + s.conf.RecoveryDurationGap = newCfg.RecoveryDurationGap + s.conf.EvictedStores = newCfg.EvictedStores + return nil +} + func (s *evictSlowTrendScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { evictedStoreID := s.conf.evictedStore() if evictedStoreID == 0 { diff --git a/pkg/schedule/schedulers/grant_hot_region.go b/pkg/schedule/schedulers/grant_hot_region.go index 5a68da069b8..6ab689ea5d4 100644 --- a/pkg/schedule/schedulers/grant_hot_region.go +++ b/pkg/schedule/schedulers/grant_hot_region.go @@ -54,7 +54,7 @@ var ( ) type grantHotRegionSchedulerConfig struct { - mu syncutil.RWMutex + syncutil.RWMutex storage endpoint.ConfigStorage cluster *core.BasicCluster StoreIDs []uint64 `json:"store-id"` @@ -62,8 +62,8 @@ type grantHotRegionSchedulerConfig struct { } func (conf *grantHotRegionSchedulerConfig) setStore(leaderID uint64, peers []uint64) bool { - conf.mu.Lock() - defer conf.mu.Unlock() + conf.Lock() + defer conf.Unlock() ret := slice.AnyOf(peers, func(i int) bool { return leaderID == peers[i] }) @@ -75,20 +75,20 @@ func (conf *grantHotRegionSchedulerConfig) setStore(leaderID uint64, peers []uin } func (conf *grantHotRegionSchedulerConfig) GetStoreLeaderID() uint64 { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() return conf.StoreLeaderID } func (conf *grantHotRegionSchedulerConfig) SetStoreLeaderID(id uint64) { - conf.mu.Lock() - defer conf.mu.Unlock() + conf.Lock() + defer conf.Unlock() conf.StoreLeaderID = id } func (conf *grantHotRegionSchedulerConfig) Clone() *grantHotRegionSchedulerConfig { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() newStoreIDs := make([]uint64, len(conf.StoreIDs)) copy(newStoreIDs, conf.StoreIDs) return &grantHotRegionSchedulerConfig{ @@ -99,8 +99,8 @@ func (conf *grantHotRegionSchedulerConfig) Clone() *grantHotRegionSchedulerConfi func (conf *grantHotRegionSchedulerConfig) Persist() error { name := conf.getSchedulerName() - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() data, err := EncodeConfig(conf) if err != nil { return err @@ -113,8 +113,8 @@ func (conf *grantHotRegionSchedulerConfig) getSchedulerName() string { } func (conf *grantHotRegionSchedulerConfig) has(storeID uint64) bool { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() return slice.AnyOf(conf.StoreIDs, func(i int) bool { return storeID == conf.StoreIDs[i] }) @@ -151,6 
+151,25 @@ func (s *grantHotRegionScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(s.conf) } +func (s *grantHotRegionScheduler) ReloadConfig() error { + s.conf.Lock() + defer s.conf.Unlock() + cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) + if err != nil { + return err + } + if len(cfgData) == 0 { + return nil + } + newCfg := &grantHotRegionSchedulerConfig{} + if err := DecodeConfig([]byte(cfgData), newCfg); err != nil { + return err + } + s.conf.StoreIDs = newCfg.StoreIDs + s.conf.StoreLeaderID = newCfg.StoreLeaderID + return nil +} + // IsScheduleAllowed returns whether the scheduler is allowed to schedule. // TODO it should check if there is any scheduler such as evict or hot region scheduler func (s *grantHotRegionScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { diff --git a/pkg/schedule/schedulers/grant_leader.go b/pkg/schedule/schedulers/grant_leader.go index 84f830f368b..47e14af4902 100644 --- a/pkg/schedule/schedulers/grant_leader.go +++ b/pkg/schedule/schedulers/grant_leader.go @@ -49,7 +49,7 @@ var ( ) type grantLeaderSchedulerConfig struct { - mu syncutil.RWMutex + syncutil.RWMutex storage endpoint.ConfigStorage StoreIDWithRanges map[uint64][]core.KeyRange `json:"store-id-ranges"` cluster *core.BasicCluster @@ -69,15 +69,15 @@ func (conf *grantLeaderSchedulerConfig) BuildWithArgs(args []string) error { if err != nil { return err } - conf.mu.Lock() - defer conf.mu.Unlock() + conf.Lock() + defer conf.Unlock() conf.StoreIDWithRanges[id] = ranges return nil } func (conf *grantLeaderSchedulerConfig) Clone() *grantLeaderSchedulerConfig { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() newStoreIDWithRanges := make(map[uint64][]core.KeyRange) for k, v := range conf.StoreIDWithRanges { newStoreIDWithRanges[k] = v @@ -89,8 +89,8 @@ func (conf *grantLeaderSchedulerConfig) Clone() *grantLeaderSchedulerConfig { func (conf *grantLeaderSchedulerConfig) Persist() error { name := conf.getSchedulerName() - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() data, err := EncodeConfig(conf) if err != nil { return err @@ -103,8 +103,8 @@ func (conf *grantLeaderSchedulerConfig) getSchedulerName() string { } func (conf *grantLeaderSchedulerConfig) getRanges(id uint64) []string { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() ranges := conf.StoreIDWithRanges[id] res := make([]string, 0, len(ranges)*2) for index := range ranges { @@ -114,8 +114,8 @@ func (conf *grantLeaderSchedulerConfig) getRanges(id uint64) []string { } func (conf *grantLeaderSchedulerConfig) removeStore(id uint64) (succ bool, last bool) { - conf.mu.Lock() - defer conf.mu.Unlock() + conf.Lock() + defer conf.Unlock() _, exists := conf.StoreIDWithRanges[id] succ, last = false, false if exists { @@ -128,15 +128,15 @@ func (conf *grantLeaderSchedulerConfig) removeStore(id uint64) (succ bool, last } func (conf *grantLeaderSchedulerConfig) resetStore(id uint64, keyRange []core.KeyRange) { - conf.mu.Lock() - defer conf.mu.Unlock() + conf.Lock() + defer conf.Unlock() conf.cluster.PauseLeaderTransfer(id) conf.StoreIDWithRanges[id] = keyRange } func (conf *grantLeaderSchedulerConfig) getKeyRangesByID(id uint64) []core.KeyRange { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() if ranges, exist := conf.StoreIDWithRanges[id]; exist { return ranges } @@ -179,8 +179,8 @@ func (s *grantLeaderScheduler) EncodeConfig() ([]byte, error) { } func (s *grantLeaderScheduler) 
ReloadConfig() error { - s.conf.mu.Lock() - defer s.conf.mu.Unlock() + s.conf.Lock() + defer s.conf.Unlock() cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) if err != nil { return err @@ -198,8 +198,8 @@ func (s *grantLeaderScheduler) ReloadConfig() error { } func (s *grantLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { - s.conf.mu.RLock() - defer s.conf.mu.RUnlock() + s.conf.RLock() + defer s.conf.RUnlock() var res error for id := range s.conf.StoreIDWithRanges { if err := cluster.PauseLeaderTransfer(id); err != nil { @@ -210,8 +210,8 @@ func (s *grantLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) erro } func (s *grantLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) { - s.conf.mu.RLock() - defer s.conf.mu.RUnlock() + s.conf.RLock() + defer s.conf.RUnlock() for id := range s.conf.StoreIDWithRanges { cluster.ResumeLeaderTransfer(id) } @@ -227,8 +227,8 @@ func (s *grantLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) func (s *grantLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { grantLeaderCounter.Inc() - s.conf.mu.RLock() - defer s.conf.mu.RUnlock() + s.conf.RLock() + defer s.conf.RUnlock() ops := make([]*operator.Operator, 0, len(s.conf.StoreIDWithRanges)) pendingFilter := filter.NewRegionPendingFilter() downFilter := filter.NewRegionDownFilter() @@ -268,15 +268,15 @@ func (handler *grantLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R idFloat, ok := input["store_id"].(float64) if ok { id = (uint64)(idFloat) - handler.config.mu.RLock() + handler.config.RLock() if _, exists = handler.config.StoreIDWithRanges[id]; !exists { if err := handler.config.cluster.PauseLeaderTransfer(id); err != nil { - handler.config.mu.RUnlock() + handler.config.RUnlock() handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } } - handler.config.mu.RUnlock() + handler.config.RUnlock() args = append(args, strconv.FormatUint(id, 10)) } diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index 4806180e450..fdd07e85145 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -257,6 +257,44 @@ func (h *hotScheduler) EncodeConfig() ([]byte, error) { return h.conf.EncodeConfig() } +func (h *hotScheduler) ReloadConfig() error { + h.conf.Lock() + defer h.conf.Unlock() + cfgData, err := h.conf.storage.LoadSchedulerConfig(h.GetName()) + if err != nil { + return err + } + if len(cfgData) == 0 { + return nil + } + newCfg := &hotRegionSchedulerConfig{} + if err := DecodeConfig([]byte(cfgData), newCfg); err != nil { + return err + } + h.conf.MinHotByteRate = newCfg.MinHotByteRate + h.conf.MinHotKeyRate = newCfg.MinHotKeyRate + h.conf.MinHotQueryRate = newCfg.MinHotQueryRate + h.conf.MaxZombieRounds = newCfg.MaxZombieRounds + h.conf.MaxPeerNum = newCfg.MaxPeerNum + h.conf.ByteRateRankStepRatio = newCfg.ByteRateRankStepRatio + h.conf.KeyRateRankStepRatio = newCfg.KeyRateRankStepRatio + h.conf.QueryRateRankStepRatio = newCfg.QueryRateRankStepRatio + h.conf.CountRankStepRatio = newCfg.CountRankStepRatio + h.conf.GreatDecRatio = newCfg.GreatDecRatio + h.conf.MinorDecRatio = newCfg.MinorDecRatio + h.conf.SrcToleranceRatio = newCfg.SrcToleranceRatio + h.conf.DstToleranceRatio = newCfg.DstToleranceRatio + h.conf.WriteLeaderPriorities = newCfg.WriteLeaderPriorities + h.conf.WritePeerPriorities = newCfg.WritePeerPriorities + h.conf.ReadPriorities = newCfg.ReadPriorities + 
h.conf.StrictPickingStore = newCfg.StrictPickingStore + h.conf.EnableForTiFlash = newCfg.EnableForTiFlash + h.conf.RankFormulaVersion = newCfg.RankFormulaVersion + h.conf.ForbidRWType = newCfg.ForbidRWType + h.conf.SplitThresholds = newCfg.SplitThresholds + return nil +} + func (h *hotScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.conf.ServeHTTP(w, r) } diff --git a/pkg/schedule/schedulers/init.go b/pkg/schedule/schedulers/init.go index d45602b90e1..f60be1e5b06 100644 --- a/pkg/schedule/schedulers/init.go +++ b/pkg/schedule/schedulers/init.go @@ -163,6 +163,7 @@ func schedulersRegister() { if err := decoder(conf); err != nil { return nil, err } + conf.cluster = opController.GetCluster() return newEvictSlowStoreScheduler(opController, conf), nil }) @@ -378,6 +379,7 @@ func schedulersRegister() { if err := decoder(conf); err != nil { return nil, err } + conf.storage = storage return newShuffleHotRegionScheduler(opController, conf), nil }) diff --git a/pkg/schedule/schedulers/label.go b/pkg/schedule/schedulers/label.go index 62a1100d16b..90310bcf10e 100644 --- a/pkg/schedule/schedulers/label.go +++ b/pkg/schedule/schedulers/label.go @@ -46,6 +46,7 @@ var ( type labelSchedulerConfig struct { Name string `json:"name"` Ranges []core.KeyRange `json:"ranges"` + // TODO: When we prepare to use Ranges, we will need to implement the ReloadConfig function for this scheduler. } type labelScheduler struct { diff --git a/pkg/schedule/schedulers/random_merge.go b/pkg/schedule/schedulers/random_merge.go index a621b595198..44bb5081ef9 100644 --- a/pkg/schedule/schedulers/random_merge.go +++ b/pkg/schedule/schedulers/random_merge.go @@ -48,6 +48,7 @@ var ( type randomMergeSchedulerConfig struct { Name string `json:"name"` Ranges []core.KeyRange `json:"ranges"` + // TODO: When we prepare to use Ranges, we will need to implement the ReloadConfig function for this scheduler. 
} type randomMergeScheduler struct { diff --git a/pkg/schedule/schedulers/scatter_range.go b/pkg/schedule/schedulers/scatter_range.go index e301b4c6e76..1bc6eafb58e 100644 --- a/pkg/schedule/schedulers/scatter_range.go +++ b/pkg/schedule/schedulers/scatter_range.go @@ -49,7 +49,7 @@ var ( ) type scatterRangeSchedulerConfig struct { - mu syncutil.RWMutex + syncutil.RWMutex storage endpoint.ConfigStorage RangeName string `json:"range-name"` StartKey string `json:"start-key"` @@ -60,8 +60,8 @@ func (conf *scatterRangeSchedulerConfig) BuildWithArgs(args []string) error { if len(args) != 3 { return errs.ErrSchedulerConfig.FastGenByArgs("ranges and name") } - conf.mu.Lock() - defer conf.mu.Unlock() + conf.Lock() + defer conf.Unlock() conf.RangeName = args[0] conf.StartKey = args[1] @@ -70,8 +70,8 @@ func (conf *scatterRangeSchedulerConfig) BuildWithArgs(args []string) error { } func (conf *scatterRangeSchedulerConfig) Clone() *scatterRangeSchedulerConfig { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() return &scatterRangeSchedulerConfig{ StartKey: conf.StartKey, EndKey: conf.EndKey, @@ -81,8 +81,8 @@ func (conf *scatterRangeSchedulerConfig) Clone() *scatterRangeSchedulerConfig { func (conf *scatterRangeSchedulerConfig) Persist() error { name := conf.getSchedulerName() - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() data, err := EncodeConfig(conf) if err != nil { return err @@ -91,26 +91,26 @@ func (conf *scatterRangeSchedulerConfig) Persist() error { } func (conf *scatterRangeSchedulerConfig) GetRangeName() string { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() return conf.RangeName } func (conf *scatterRangeSchedulerConfig) GetStartKey() []byte { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() return []byte(conf.StartKey) } func (conf *scatterRangeSchedulerConfig) GetEndKey() []byte { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() return []byte(conf.EndKey) } func (conf *scatterRangeSchedulerConfig) getSchedulerName() string { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() return fmt.Sprintf("scatter-range-%s", conf.RangeName) } @@ -161,14 +161,14 @@ func (l *scatterRangeScheduler) GetType() string { } func (l *scatterRangeScheduler) EncodeConfig() ([]byte, error) { - l.config.mu.RLock() - defer l.config.mu.RUnlock() + l.config.RLock() + defer l.config.RUnlock() return EncodeConfig(l.config) } func (l *scatterRangeScheduler) ReloadConfig() error { - l.config.mu.Lock() - defer l.config.mu.Unlock() + l.config.Lock() + defer l.config.Unlock() cfgData, err := l.config.storage.LoadSchedulerConfig(l.GetName()) if err != nil { return err @@ -176,7 +176,14 @@ func (l *scatterRangeScheduler) ReloadConfig() error { if len(cfgData) == 0 { return nil } - return DecodeConfig([]byte(cfgData), l.config) + newCfg := &scatterRangeSchedulerConfig{} + if err := DecodeConfig([]byte(cfgData), newCfg); err != nil { + return err + } + l.config.RangeName = newCfg.RangeName + l.config.StartKey = newCfg.StartKey + l.config.EndKey = newCfg.EndKey + return nil } func (l *scatterRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { diff --git a/pkg/schedule/schedulers/scheduler_controller.go b/pkg/schedule/schedulers/scheduler_controller.go index b65173c1f5b..818f02685ea 100644 --- a/pkg/schedule/schedulers/scheduler_controller.go +++ b/pkg/schedule/schedulers/scheduler_controller.go @@ -16,6 +16,7 
@@ package schedulers import ( "context" + "fmt" "net/http" "sync" "sync/atomic" @@ -280,7 +281,7 @@ func (c *Controller) PauseOrResumeScheduler(name string, t int64) error { // ReloadSchedulerConfig reloads a scheduler's config if it exists. func (c *Controller) ReloadSchedulerConfig(name string) error { if exist, _ := c.IsSchedulerExisted(name); !exist { - return nil + return fmt.Errorf("scheduler %s is not existed", name) } return c.GetScheduler(name).ReloadConfig() } diff --git a/pkg/schedule/schedulers/shuffle_hot_region.go b/pkg/schedule/schedulers/shuffle_hot_region.go index d5264b90428..6ad6656fd18 100644 --- a/pkg/schedule/schedulers/shuffle_hot_region.go +++ b/pkg/schedule/schedulers/shuffle_hot_region.go @@ -15,6 +15,9 @@ package schedulers import ( + "net/http" + + "github.com/gorilla/mux" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/core/constant" @@ -24,6 +27,10 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" "github.com/tikv/pd/pkg/statistics" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/apiutil" + "github.com/tikv/pd/pkg/utils/syncutil" + "github.com/unrolled/render" "go.uber.org/zap" ) @@ -42,8 +49,32 @@ var ( ) type shuffleHotRegionSchedulerConfig struct { - Name string `json:"name"` - Limit uint64 `json:"limit"` + syncutil.RWMutex + storage endpoint.ConfigStorage + Name string `json:"name"` + Limit uint64 `json:"limit"` +} + +func (conf *shuffleHotRegionSchedulerConfig) getSchedulerName() string { + return conf.Name +} + +func (conf *shuffleHotRegionSchedulerConfig) Clone() *shuffleHotRegionSchedulerConfig { + conf.RLock() + defer conf.RUnlock() + return &shuffleHotRegionSchedulerConfig{ + Name: conf.Name, + Limit: conf.Limit, + } +} + +func (conf *shuffleHotRegionSchedulerConfig) persistLocked() error { + name := conf.getSchedulerName() + data, err := EncodeConfig(conf) + if err != nil { + return err + } + return conf.storage.SaveSchedulerConfig(name, data) } // ShuffleHotRegionScheduler mainly used to test. @@ -52,19 +83,26 @@ type shuffleHotRegionSchedulerConfig struct { // the hot peer. 
type shuffleHotRegionScheduler struct { *baseHotScheduler - conf *shuffleHotRegionSchedulerConfig + conf *shuffleHotRegionSchedulerConfig + handler http.Handler } // newShuffleHotRegionScheduler creates an admin scheduler that random balance hot regions func newShuffleHotRegionScheduler(opController *operator.Controller, conf *shuffleHotRegionSchedulerConfig) Scheduler { base := newBaseHotScheduler(opController) + handler := newShuffleHotRegionHandler(conf) ret := &shuffleHotRegionScheduler{ baseHotScheduler: base, conf: conf, + handler: handler, } return ret } +func (s *shuffleHotRegionScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.handler.ServeHTTP(w, r) +} + func (s *shuffleHotRegionScheduler) GetName() string { return s.conf.Name } @@ -77,6 +115,24 @@ func (s *shuffleHotRegionScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(s.conf) } +func (s *shuffleHotRegionScheduler) ReloadConfig() error { + s.conf.Lock() + defer s.conf.Unlock() + cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) + if err != nil { + return err + } + if len(cfgData) == 0 { + return nil + } + newCfg := &shuffleHotRegionSchedulerConfig{} + if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { + return err + } + s.conf.Limit = newCfg.Limit + return nil +} + func (s *shuffleHotRegionScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { hotRegionAllowed := s.OpController.OperatorCount(operator.OpHotRegion) < s.conf.Limit conf := cluster.GetSchedulerConfig() @@ -158,3 +214,47 @@ func (s *shuffleHotRegionScheduler) randomSchedule(cluster sche.SchedulerCluster shuffleHotRegionSkipCounter.Inc() return nil } + +type shuffleHotRegionHandler struct { + rd *render.Render + config *shuffleHotRegionSchedulerConfig +} + +func (handler *shuffleHotRegionHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) { + var input map[string]interface{} + if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil { + return + } + limit, ok := input["limit"].(float64) + if !ok { + handler.rd.JSON(w, http.StatusBadRequest, "invalid limit") + return + } + handler.config.Lock() + defer handler.config.Unlock() + previous := handler.config.Limit + handler.config.Limit = uint64(limit) + err := handler.config.persistLocked() + if err != nil { + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + handler.config.Limit = previous + return + } + handler.rd.JSON(w, http.StatusOK, nil) +} + +func (handler *shuffleHotRegionHandler) ListConfig(w http.ResponseWriter, r *http.Request) { + conf := handler.config.Clone() + handler.rd.JSON(w, http.StatusOK, conf) +} + +func newShuffleHotRegionHandler(config *shuffleHotRegionSchedulerConfig) http.Handler { + h := &shuffleHotRegionHandler{ + config: config, + rd: render.New(render.Options{IndentJSON: true}), + } + router := mux.NewRouter() + router.HandleFunc("/config", h.UpdateConfig).Methods(http.MethodPost) + router.HandleFunc("/list", h.ListConfig).Methods(http.MethodGet) + return router +} diff --git a/pkg/schedule/schedulers/shuffle_leader.go b/pkg/schedule/schedulers/shuffle_leader.go index 0e33fa802db..a6ff4baf65b 100644 --- a/pkg/schedule/schedulers/shuffle_leader.go +++ b/pkg/schedule/schedulers/shuffle_leader.go @@ -43,6 +43,7 @@ var ( type shuffleLeaderSchedulerConfig struct { Name string `json:"name"` Ranges []core.KeyRange `json:"ranges"` + // TODO: When we prepare to use Ranges, we will need to implement the ReloadConfig function for this scheduler. 
} type shuffleLeaderScheduler struct { diff --git a/pkg/schedule/schedulers/split_bucket.go b/pkg/schedule/schedulers/split_bucket.go index a08c84372b5..d536f3f8dc8 100644 --- a/pkg/schedule/schedulers/split_bucket.go +++ b/pkg/schedule/schedulers/split_bucket.go @@ -65,15 +65,15 @@ func initSplitBucketConfig() *splitBucketSchedulerConfig { } type splitBucketSchedulerConfig struct { - mu syncutil.RWMutex + syncutil.RWMutex storage endpoint.ConfigStorage Degree int `json:"degree"` SplitLimit uint64 `json:"split-limit"` } func (conf *splitBucketSchedulerConfig) Clone() *splitBucketSchedulerConfig { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() return &splitBucketSchedulerConfig{ Degree: conf.Degree, } @@ -104,8 +104,8 @@ func (h *splitBucketHandler) ListConfig(w http.ResponseWriter, _ *http.Request) } func (h *splitBucketHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) { - h.conf.mu.Lock() - defer h.conf.mu.Unlock() + h.conf.Lock() + defer h.conf.Unlock() rd := render.New(render.Options{IndentJSON: true}) oldc, _ := json.Marshal(h.conf) data, err := io.ReadAll(r.Body) @@ -173,8 +173,8 @@ func (s *splitBucketScheduler) GetType() string { } func (s *splitBucketScheduler) ReloadConfig() error { - s.conf.mu.Lock() - defer s.conf.mu.Unlock() + s.conf.Lock() + defer s.conf.Unlock() cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) if err != nil { return err @@ -182,7 +182,13 @@ func (s *splitBucketScheduler) ReloadConfig() error { if len(cfgData) == 0 { return nil } - return DecodeConfig([]byte(cfgData), s.conf) + newCfg := &splitBucketSchedulerConfig{} + if err := DecodeConfig([]byte(cfgData), newCfg); err != nil { + return err + } + s.conf.SplitLimit = newCfg.SplitLimit + s.conf.Degree = newCfg.Degree + return nil } // ServerHTTP implement Http server. diff --git a/pkg/schedule/schedulers/transfer_witness_leader.go b/pkg/schedule/schedulers/transfer_witness_leader.go index 2586065ea80..c651a8ef872 100644 --- a/pkg/schedule/schedulers/transfer_witness_leader.go +++ b/pkg/schedule/schedulers/transfer_witness_leader.go @@ -34,6 +34,7 @@ const ( // TransferWitnessLeaderBatchSize is the number of operators to to transfer // leaders by one scheduling transferWitnessLeaderBatchSize = 3 + // TODO: When we prepare to use Ranges, we will need to implement the ReloadConfig function for this scheduler. // TransferWitnessLeaderRecvMaxRegionSize is the max number of region can receive // TODO: make it a reasonable value transferWitnessLeaderRecvMaxRegionSize = 10000 diff --git a/pkg/schedule/schedulers/utils.go b/pkg/schedule/schedulers/utils.go index c7cdf9191ca..fea51798d1c 100644 --- a/pkg/schedule/schedulers/utils.go +++ b/pkg/schedule/schedulers/utils.go @@ -390,3 +390,19 @@ func (q *retryQuota) GC(keepStores []*core.StoreInfo) { } } } + +// pauseAndResumeLeaderTransfer checks the old and new store IDs, and pause or resume the leader transfer. 
+func pauseAndResumeLeaderTransfer[T any](cluster *core.BasicCluster, old, new map[uint64]T) { + for id := range old { + if _, ok := new[id]; ok { + continue + } + cluster.ResumeLeaderTransfer(id) + } + for id := range new { + if _, ok := old[id]; ok { + continue + } + cluster.PauseLeaderTransfer(id) + } +} diff --git a/pkg/window/policy.go b/pkg/window/policy.go index d67a8aa6e59..fed4fedc32a 100644 --- a/pkg/window/policy.go +++ b/pkg/window/policy.go @@ -18,8 +18,9 @@ package window import ( - "sync" "time" + + "github.com/tikv/pd/pkg/utils/syncutil" ) // RollingPolicy is a policy for ring window based on time duration. @@ -27,7 +28,7 @@ import ( // e.g. If the last point is appended one bucket duration ago, // RollingPolicy will increment current offset. type RollingPolicy struct { - mu sync.RWMutex + mu syncutil.RWMutex size int window *Window offset int diff --git a/server/cluster/scheduling_controller.go b/server/cluster/scheduling_controller.go index 04c77498948..5e8cb8462df 100644 --- a/server/cluster/scheduling_controller.go +++ b/server/cluster/scheduling_controller.go @@ -37,6 +37,7 @@ import ( "github.com/tikv/pd/pkg/statistics/buckets" "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/utils/syncutil" ) // schedulingController is used to manage all schedulers and checkers. @@ -44,7 +45,7 @@ type schedulingController struct { parentCtx context.Context ctx context.Context cancel context.CancelFunc - mu sync.RWMutex + mu syncutil.RWMutex wg sync.WaitGroup *core.BasicCluster opt sc.ConfProvider diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go index 7098637c84a..7c5d53e387a 100644 --- a/tests/pdctl/scheduler/scheduler_test.go +++ b/tests/pdctl/scheduler/scheduler_test.go @@ -285,10 +285,12 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { var roles []string mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "show-roles"}, &roles) re.Equal([]string{"leader", "follower", "learner"}, roles) - echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "set-roles", "learner"}, nil) // todo:add check output + echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "set-roles", "learner"}, nil) re.Contains(echo, "Success!") - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "show-roles"}, &roles) - re.Equal([]string{"learner"}, roles) + testutil.Eventually(re, func() bool { + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "show-roles"}, &roles) + return reflect.DeepEqual([]string{"learner"}, roles) + }) mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler"}, &roles) re.Equal([]string{"learner"}, roles) @@ -312,11 +314,10 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler", "set", "2", "1,2,3"}, nil) re.Contains(echo, "Success!") expected3["store-leader-id"] = float64(2) - // FIXME: remove this check after scheduler config is updated - if cluster.GetSchedulingPrimaryServer() == nil { // "grant-hot-region-scheduler" + testutil.Eventually(re, func() bool { mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler"}, &conf3) - re.Equal(expected3, conf3) - 
} + return reflect.DeepEqual(expected3, conf3) + }) // test remove and add scheduler echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-region-scheduler"}, nil) @@ -370,91 +371,88 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { re.Contains(echo, "Success!") expected1["src-tolerance-ratio"] = 1.02 var conf1 map[string]interface{} - // FIXME: remove this check after scheduler config is updated - if cluster.GetSchedulingPrimaryServer() == nil { // "balance-hot-region-scheduler" - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + re.Equal(expected1, conf1) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,key"}, nil) - re.Contains(echo, "Success!") - expected1["read-priorities"] = []interface{}{"byte", "key"} - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) - - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key"}, nil) - re.Contains(echo, "Failed!") - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,byte"}, nil) - re.Contains(echo, "Success!") - expected1["read-priorities"] = []interface{}{"key", "byte"} - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "foo,bar"}, nil) - re.Contains(echo, "Failed!") - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", ""}, nil) - re.Contains(echo, "Failed!") - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,key"}, nil) - re.Contains(echo, "Failed!") - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,byte"}, nil) - re.Contains(echo, "Failed!") - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,key,byte"}, nil) - re.Contains(echo, "Failed!") - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) - - // write-priorities is divided into write-leader-priorities and write-peer-priorities - echo 
= mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-priorities", "key,byte"}, nil)
-		re.Contains(echo, "Failed!")
-		re.Contains(echo, "Config item is not found.")
-		mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
-		re.Equal(expected1, conf1)
-
-		echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v0"}, nil)
-		re.Contains(echo, "Failed!")
-		mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
-		expected1["rank-formula-version"] = "v2"
-		echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v2"}, nil)
-		re.Contains(echo, "Success!")
-		mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
-		re.Equal(expected1, conf1)
-		expected1["rank-formula-version"] = "v1"
-		echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v1"}, nil)
-		re.Contains(echo, "Success!")
-		mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
-		re.Equal(expected1, conf1)
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,key"}, nil)
+	re.Contains(echo, "Success!")
+	expected1["read-priorities"] = []interface{}{"byte", "key"}
+	mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
+	re.Equal(expected1, conf1)
+
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key"}, nil)
+	re.Contains(echo, "Failed!")
+	mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
+	re.Equal(expected1, conf1)
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,byte"}, nil)
+	re.Contains(echo, "Success!")
+	expected1["read-priorities"] = []interface{}{"key", "byte"}
+	mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
+	re.Equal(expected1, conf1)
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "foo,bar"}, nil)
+	re.Contains(echo, "Failed!")
+	mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
+	re.Equal(expected1, conf1)
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", ""}, nil)
+	re.Contains(echo, "Failed!")
+	mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
+	re.Equal(expected1, conf1)
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,key"}, nil)
+	re.Contains(echo, "Failed!")
+	mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
+	re.Equal(expected1, conf1)
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,byte"}, nil)
+	re.Contains(echo, "Failed!")
+	mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
+	re.Equal(expected1, conf1)
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,key,byte"}, nil)
+	re.Contains(echo, "Failed!")
+	mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
+	re.Equal(expected1, conf1)
+
+	// write-priorities is divided into write-leader-priorities and write-peer-priorities
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-priorities", "key,byte"}, nil)
+	re.Contains(echo, "Failed!")
+	re.Contains(echo, "Config item is not found.")
+	mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
+	re.Equal(expected1, conf1)
+
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v0"}, nil)
+	re.Contains(echo, "Failed!")
+	mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
+	expected1["rank-formula-version"] = "v2"
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v2"}, nil)
+	re.Contains(echo, "Success!")
+	mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
+	re.Equal(expected1, conf1)
+	expected1["rank-formula-version"] = "v1"
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v1"}, nil)
+	re.Contains(echo, "Success!")
+	mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
+	re.Equal(expected1, conf1)
 
-		expected1["forbid-rw-type"] = "read"
-		echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "forbid-rw-type", "read"}, nil)
-		re.Contains(echo, "Success!")
-		mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
-		re.Equal(expected1, conf1)
+	expected1["forbid-rw-type"] = "read"
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "forbid-rw-type", "read"}, nil)
+	re.Contains(echo, "Success!")
+	mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
+	re.Equal(expected1, conf1)
 
-		// test compatibility
-		re.Equal("2.0.0", leaderServer.GetClusterVersion().String())
-		for _, store := range stores {
-			version := versioninfo.HotScheduleWithQuery
-			store.Version = versioninfo.MinSupportedVersion(version).String()
-			tests.MustPutStore(re, cluster, store)
-		}
-		re.Equal("5.2.0", leaderServer.GetClusterVersion().String())
-		// After upgrading, we should not use query.
-		mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
-		re.Equal(conf1["read-priorities"], []interface{}{"key", "byte"})
-		// cannot set qps as write-peer-priorities
-		echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-peer-priorities", "query,byte"}, nil)
-		re.Contains(echo, "query is not allowed to be set in priorities for write-peer-priorities")
-		mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
-		re.Equal(conf1["write-peer-priorities"], []interface{}{"byte", "key"})
+	// test compatibility
+	re.Equal("2.0.0", leaderServer.GetClusterVersion().String())
+	for _, store := range stores {
+		version := versioninfo.HotScheduleWithQuery
+		store.Version = versioninfo.MinSupportedVersion(version).String()
+		tests.MustPutStore(re, cluster, store)
 	}
+	re.Equal("5.2.0", leaderServer.GetClusterVersion().String())
+	// After upgrading, we should not use query.
+	mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
+	re.Equal(conf1["read-priorities"], []interface{}{"key", "byte"})
+	// cannot set qps as write-peer-priorities
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-peer-priorities", "query,byte"}, nil)
+	re.Contains(echo, "query is not allowed to be set in priorities for write-peer-priorities")
+	mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
+	re.Equal(conf1["write-peer-priorities"], []interface{}{"byte", "key"})
 
 	// test remove and add
 	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-hot-region-scheduler"}, nil)
@@ -498,11 +496,10 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) {
 		echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName, "set", "recovery-duration", "100"}, nil)
 		re.Contains(echo, "Success!")
 		conf = make(map[string]interface{})
-		// FIXME: remove this check after scheduler config is updated
-		if cluster.GetSchedulingPrimaryServer() == nil && schedulerName == "evict-slow-store-scheduler" {
+		testutil.Eventually(re, func() bool {
 			mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName, "show"}, &conf)
-			re.Equal(100., conf["recovery-duration"])
-		}
+			return conf["recovery-duration"] == 100.
+		})
 		echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", schedulerName}, nil)
 		re.Contains(echo, "Success!")
 		testutil.Eventually(re, func() bool {
@@ -511,6 +508,27 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) {
 		})
 	}
 
+	// test shuffle hot region scheduler
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "shuffle-hot-region-scheduler"}, nil)
+	re.Contains(echo, "Success!")
+	testutil.Eventually(re, func() bool {
+		echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil)
+		return strings.Contains(echo, "shuffle-hot-region-scheduler")
+	})
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-hot-region-scheduler", "set", "limit", "127"}, nil)
+	re.Contains(echo, "Success!")
+	conf = make(map[string]interface{})
+	testutil.Eventually(re, func() bool {
+		mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-hot-region-scheduler", "show"}, &conf)
+		return conf["limit"] == 127.
+	})
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "shuffle-hot-region-scheduler"}, nil)
+	re.Contains(echo, "Success!")
+	testutil.Eventually(re, func() bool {
+		echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil)
+		return !strings.Contains(echo, "shuffle-hot-region-scheduler")
+	})
+
 	// test show scheduler with paused and disabled status.
 	checkSchedulerWithStatusCommand := func(status string, expected []string) {
 		testutil.Eventually(re, func() bool {
diff --git a/tests/server/api/scheduler_test.go b/tests/server/api/scheduler_test.go
index 4d6dde6f2b9..86b932b0a7a 100644
--- a/tests/server/api/scheduler_test.go
+++ b/tests/server/api/scheduler_test.go
@@ -236,27 +236,25 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) {
 			suite.NoError(tu.CheckPostJSON(testDialClient, updateURL, body, tu.StatusOK(re)))
 			resp = make(map[string]interface{})
 			suite.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp))
-			// FIXME: remove this check after scheduler config is updated
-			if cluster.GetSchedulingPrimaryServer() == nil { // "balance-hot-region-scheduler"
-				for key := range expectMap {
-					suite.Equal(expectMap[key], resp[key], "key %s", key)
-				}
-
-				// update again
-				err = tu.CheckPostJSON(testDialClient, updateURL, body,
-					tu.StatusOK(re),
-					tu.StringEqual(re, "Config is the same with origin, so do nothing."))
-				suite.NoError(err)
-				// config item not found
-				dataMap = map[string]interface{}{}
-				dataMap["error"] = 3
-				body, err = json.Marshal(dataMap)
-				suite.NoError(err)
-				err = tu.CheckPostJSON(testDialClient, updateURL, body,
-					tu.Status(re, http.StatusBadRequest),
-					tu.StringEqual(re, "Config item is not found."))
-				suite.NoError(err)
+
+			for key := range expectMap {
+				suite.Equal(expectMap[key], resp[key], "key %s", key)
 			}
+
+			// update again
+			err = tu.CheckPostJSON(testDialClient, updateURL, body,
+				tu.StatusOK(re),
+				tu.StringEqual(re, "Config is the same with origin, so do nothing."))
+			suite.NoError(err)
+			// config item not found
+			dataMap = map[string]interface{}{}
+			dataMap["error"] = 3
+			body, err = json.Marshal(dataMap)
+			suite.NoError(err)
+			err = tu.CheckPostJSON(testDialClient, updateURL, body,
+				tu.Status(re, http.StatusBadRequest),
+				tu.StringEqual(re, "Config item is not found."))
+			suite.NoError(err)
 		},
 	},
 	{
diff --git a/tools/pd-ctl/pdctl/command/scheduler.go b/tools/pd-ctl/pdctl/command/scheduler.go
index 526ff2646dc..695576edf84 100644
--- a/tools/pd-ctl/pdctl/command/scheduler.go
+++ b/tools/pd-ctl/pdctl/command/scheduler.go
@@ -500,6 +500,7 @@ func NewConfigSchedulerCommand() *cobra.Command {
 		newConfigBalanceLeaderCommand(),
 		newSplitBucketCommand(),
 		newConfigEvictSlowStoreCommand(),
+		newConfigShuffleHotRegionSchedulerCommand(),
 		newConfigEvictSlowTrendCommand(),
 	)
 	return c
@@ -802,6 +803,25 @@ func newConfigEvictSlowStoreCommand() *cobra.Command {
 	return c
 }
 
+func newConfigShuffleHotRegionSchedulerCommand() *cobra.Command {
+	c := &cobra.Command{
+		Use:   "shuffle-hot-region-scheduler",
+		Short: "shuffle-hot-region-scheduler config",
+		Run:   listSchedulerConfigCommandFunc,
+	}
+
+	c.AddCommand(&cobra.Command{
+		Use:   "show",
+		Short: "list the config item",
+		Run:   listSchedulerConfigCommandFunc,
+	}, &cobra.Command{
+		Use:   "set <key> <value>",
+		Short: "set the config item",
+		Run:   func(cmd *cobra.Command, args []string) { postSchedulerConfigCommandFunc(cmd, c.Name(), args) },
+	})
+	return c
+}
+
 func newConfigEvictSlowTrendCommand() *cobra.Command {
 	c := &cobra.Command{
 		Use:   "evict-slow-trend-scheduler",
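The new shuffle-hot-region-scheduler subcommand follows the same cobra wiring as the other scheduler config commands: the "set" child reuses the generic config-posting helper and captures the parent command in a closure, so c.Name() resolves to the scheduler name at run time. The snippet below is a minimal, self-contained sketch of that wiring pattern only; handleSet and newExampleSchedulerConfigCommand are hypothetical names standing in for pd-ctl's postSchedulerConfigCommandFunc and command constructor.

package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

// handleSet is a hypothetical stand-in for pd-ctl's postSchedulerConfigCommandFunc:
// it only prints which scheduler the key/value pair would be posted to.
func handleSet(schedulerName string, args []string) {
	fmt.Printf("POST config of %s: %v\n", schedulerName, args)
}

func newExampleSchedulerConfigCommand() *cobra.Command {
	c := &cobra.Command{
		Use:   "shuffle-hot-region-scheduler",
		Short: "example scheduler config command",
	}
	c.AddCommand(&cobra.Command{
		Use:   "set <key> <value>",
		Short: "set a config item",
		Args:  cobra.ExactArgs(2),
		// The closure captures c, so c.Name() yields the parent command's
		// name ("shuffle-hot-region-scheduler") when the child runs.
		Run: func(cmd *cobra.Command, args []string) { handleSet(c.Name(), args) },
	})
	return c
}

func main() {
	root := &cobra.Command{Use: "example"}
	root.AddCommand(newExampleSchedulerConfigCommand())
	// Simulate: example shuffle-hot-region-scheduler set limit 127
	root.SetArgs([]string{"shuffle-hot-region-scheduler", "set", "limit", "127"})
	if err := root.Execute(); err != nil {
		fmt.Println(err)
	}
}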
diff --git a/tools/pd-simulator/simulator/node.go b/tools/pd-simulator/simulator/node.go
index b8fb422d6dd..68a10a8638e 100644
--- a/tools/pd-simulator/simulator/node.go
+++ b/tools/pd-simulator/simulator/node.go
@@ -25,6 +25,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/tikv/pd/pkg/ratelimit"
+	"github.com/tikv/pd/pkg/utils/syncutil"
 	"github.com/tikv/pd/tools/pd-simulator/simulator/cases"
 	"github.com/tikv/pd/tools/pd-simulator/simulator/info"
 	"github.com/tikv/pd/tools/pd-simulator/simulator/simutil"
@@ -39,7 +40,7 @@ const (
 // Node simulates a TiKV.
 type Node struct {
 	*metapb.Store
-	sync.RWMutex
+	syncutil.RWMutex
 	stats             *info.StoreStats
 	tick              uint64
 	wg                sync.WaitGroup
@@ -50,7 +51,7 @@ type Node struct {
 	cancel            context.CancelFunc
 	raftEngine        *RaftEngine
 	limiter           *ratelimit.RateLimiter
-	sizeMutex         sync.Mutex
+	sizeMutex         syncutil.Mutex
 	hasExtraUsedSpace bool
 	snapStats         []*pdpb.SnapshotStat
 }
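The simulator change swaps the standard sync mutexes in Node for their syncutil counterparts and keeps the read-write mutex embedded, so Node itself exposes Lock/RLock directly while the named sizeMutex still guards only the size bookkeeping. A minimal sketch of that layout follows; it uses the standard library's sync package and assumes syncutil.RWMutex and syncutil.Mutex are drop-in replacements exposing the same Lock/Unlock/RLock/RUnlock surface.

package main

import (
	"fmt"
	"sync"
)

// node mirrors the shape of the simulator's Node: an embedded read-write mutex
// guards the struct as a whole, while a separate named mutex protects one field.
// sync is used here on the assumption that pd's syncutil types expose the same methods.
type node struct {
	sync.RWMutex            // embedded: node itself can be locked
	tick      uint64        // protected by the embedded RWMutex
	sizeMutex sync.Mutex    // named: guards only usedSize
	usedSize  int64
}

func (n *node) incTick() {
	n.Lock() // Lock/Unlock are promoted from the embedded RWMutex
	defer n.Unlock()
	n.tick++
}

func (n *node) addUsedSize(delta int64) {
	n.sizeMutex.Lock()
	defer n.sizeMutex.Unlock()
	n.usedSize += delta
}

func main() {
	n := &node{}
	n.incTick()
	n.addUsedSize(64)
	fmt.Println(n.tick, n.usedSize)
}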