From 100192b682b65d23d4b4bd6ef7fd3508142923cf Mon Sep 17 00:00:00 2001 From: Yongbo Jiang Date: Tue, 9 Jan 2024 23:09:24 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #7674 close tikv/pd#7672 Signed-off-by: ti-chi-bot --- pkg/schedule/schedulers/evict_slow_store.go | 343 +++++++++ pkg/schedule/schedulers/evict_slow_trend.go | 667 ++++++++++++++++ server/api/router.go | 12 +- server/api/scheduler.go | 2 +- server/api/server_test.go | 9 + server/schedulers/evict_leader.go | 2 +- server/schedulers/grant_leader.go | 2 +- .../pd-ctl/tests/scheduler/scheduler_test.go | 716 ++++++++++++++++++ 8 files changed, 1747 insertions(+), 6 deletions(-) create mode 100644 pkg/schedule/schedulers/evict_slow_store.go create mode 100644 pkg/schedule/schedulers/evict_slow_trend.go create mode 100644 tools/pd-ctl/tests/scheduler/scheduler_test.go diff --git a/pkg/schedule/schedulers/evict_slow_store.go b/pkg/schedule/schedulers/evict_slow_store.go new file mode 100644 index 00000000000..79715a6fd44 --- /dev/null +++ b/pkg/schedule/schedulers/evict_slow_store.go @@ -0,0 +1,343 @@ +// Copyright 2021 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schedulers + +import ( + "net/http" + "time" + + "github.com/gorilla/mux" + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/core" + sche "github.com/tikv/pd/pkg/schedule/core" + "github.com/tikv/pd/pkg/schedule/operator" + "github.com/tikv/pd/pkg/schedule/plan" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/apiutil" + "github.com/tikv/pd/pkg/utils/syncutil" + "github.com/unrolled/render" + "go.uber.org/zap" +) + +const ( + // EvictSlowStoreName is evict leader scheduler name. + EvictSlowStoreName = "evict-slow-store-scheduler" + // EvictSlowStoreType is evict leader scheduler type. + EvictSlowStoreType = "evict-slow-store" + + slowStoreEvictThreshold = 100 + slowStoreRecoverThreshold = 1 +) + +// WithLabelValues is a heavy operation, define variable to avoid call it every time. +var evictSlowStoreCounter = schedulerCounter.WithLabelValues(EvictSlowStoreName, "schedule") + +type evictSlowStoreSchedulerConfig struct { + syncutil.RWMutex + cluster *core.BasicCluster + storage endpoint.ConfigStorage + // Last timestamp of the chosen slow store for eviction. + lastSlowStoreCaptureTS time.Time + // Duration gap for recovering the candidate, unit: s. 
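+	// It can be changed at runtime through the scheduler's config endpoint; for example
+	// (assuming the usual scheduler-config route prefix, shown here only as an illustration):
+	//   POST /pd/api/v1/scheduler-config/evict-slow-store-scheduler/config {"recovery-duration": 600}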
+ RecoveryDurationGap uint64 `json:"recovery-duration"` + EvictedStores []uint64 `json:"evict-stores"` +} + +func initEvictSlowStoreSchedulerConfig(storage endpoint.ConfigStorage) *evictSlowStoreSchedulerConfig { + return &evictSlowStoreSchedulerConfig{ + storage: storage, + lastSlowStoreCaptureTS: time.Time{}, + RecoveryDurationGap: defaultRecoveryDurationGap, + EvictedStores: make([]uint64, 0), + } +} + +func (conf *evictSlowStoreSchedulerConfig) Clone() *evictSlowStoreSchedulerConfig { + conf.RLock() + defer conf.RUnlock() + return &evictSlowStoreSchedulerConfig{ + RecoveryDurationGap: conf.RecoveryDurationGap, + } +} + +func (conf *evictSlowStoreSchedulerConfig) persistLocked() error { + name := EvictSlowStoreName + data, err := EncodeConfig(conf) + failpoint.Inject("persistFail", func() { + err = errors.New("fail to persist") + }) + if err != nil { + return err + } + return conf.storage.SaveSchedulerConfig(name, data) +} + +func (conf *evictSlowStoreSchedulerConfig) getStores() []uint64 { + conf.RLock() + defer conf.RUnlock() + return conf.EvictedStores +} + +func (conf *evictSlowStoreSchedulerConfig) getKeyRangesByID(id uint64) []core.KeyRange { + if conf.evictStore() != id { + return nil + } + return []core.KeyRange{core.NewKeyRange("", "")} +} + +func (conf *evictSlowStoreSchedulerConfig) evictStore() uint64 { + if len(conf.getStores()) == 0 { + return 0 + } + return conf.getStores()[0] +} + +// readyForRecovery checks whether the last cpatured candidate is ready for recovery. +func (conf *evictSlowStoreSchedulerConfig) readyForRecovery() bool { + conf.RLock() + defer conf.RUnlock() + recoveryDurationGap := conf.RecoveryDurationGap + failpoint.Inject("transientRecoveryGap", func() { + recoveryDurationGap = 0 + }) + return uint64(time.Since(conf.lastSlowStoreCaptureTS).Seconds()) >= recoveryDurationGap +} + +func (conf *evictSlowStoreSchedulerConfig) setStoreAndPersist(id uint64) error { + conf.Lock() + defer conf.Unlock() + conf.EvictedStores = []uint64{id} + conf.lastSlowStoreCaptureTS = time.Now() + return conf.persistLocked() +} + +func (conf *evictSlowStoreSchedulerConfig) clearAndPersist() (oldID uint64, err error) { + oldID = conf.evictStore() + conf.Lock() + defer conf.Unlock() + if oldID > 0 { + conf.EvictedStores = []uint64{} + conf.lastSlowStoreCaptureTS = time.Time{} + err = conf.persistLocked() + } + return +} + +type evictSlowStoreHandler struct { + rd *render.Render + config *evictSlowStoreSchedulerConfig +} + +func newEvictSlowStoreHandler(config *evictSlowStoreSchedulerConfig) http.Handler { + h := &evictSlowStoreHandler{ + config: config, + rd: render.New(render.Options{IndentJSON: true}), + } + router := mux.NewRouter() + router.HandleFunc("/config", h.UpdateConfig).Methods(http.MethodPost) + router.HandleFunc("/list", h.ListConfig).Methods(http.MethodGet) + return router +} + +func (handler *evictSlowStoreHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) { + var input map[string]interface{} + if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil { + return + } + recoveryDurationGapFloat, ok := input["recovery-duration"].(float64) + if !ok { + handler.rd.JSON(w, http.StatusInternalServerError, errors.New("invalid argument for 'recovery-duration'").Error()) + return + } + handler.config.Lock() + defer handler.config.Unlock() + prevRecoveryDurationGap := handler.config.RecoveryDurationGap + recoveryDurationGap := uint64(recoveryDurationGapFloat) + handler.config.RecoveryDurationGap = recoveryDurationGap + if err := 
handler.config.persistLocked(); err != nil { + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + handler.config.RecoveryDurationGap = prevRecoveryDurationGap + return + } + log.Info("evict-slow-store-scheduler update 'recovery-duration' - unit: s", zap.Uint64("prev", prevRecoveryDurationGap), zap.Uint64("cur", recoveryDurationGap)) + handler.rd.JSON(w, http.StatusOK, "Config updated.") +} + +func (handler *evictSlowStoreHandler) ListConfig(w http.ResponseWriter, r *http.Request) { + conf := handler.config.Clone() + handler.rd.JSON(w, http.StatusOK, conf) +} + +type evictSlowStoreScheduler struct { + *BaseScheduler + conf *evictSlowStoreSchedulerConfig + handler http.Handler +} + +func (s *evictSlowStoreScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.handler.ServeHTTP(w, r) +} + +func (s *evictSlowStoreScheduler) GetName() string { + return EvictSlowStoreName +} + +func (s *evictSlowStoreScheduler) GetType() string { + return EvictSlowStoreType +} + +func (s *evictSlowStoreScheduler) EncodeConfig() ([]byte, error) { + return EncodeConfig(s.conf) +} + +func (s *evictSlowStoreScheduler) ReloadConfig() error { + s.conf.Lock() + defer s.conf.Unlock() + cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) + if err != nil { + return err + } + if len(cfgData) == 0 { + return nil + } + newCfg := &evictSlowStoreSchedulerConfig{} + if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { + return err + } + old := make(map[uint64]struct{}) + for _, id := range s.conf.EvictedStores { + old[id] = struct{}{} + } + new := make(map[uint64]struct{}) + for _, id := range newCfg.EvictedStores { + new[id] = struct{}{} + } + pauseAndResumeLeaderTransfer(s.conf.cluster, old, new) + s.conf.RecoveryDurationGap = newCfg.RecoveryDurationGap + s.conf.EvictedStores = newCfg.EvictedStores + return nil +} + +func (s *evictSlowStoreScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { + evictStore := s.conf.evictStore() + if evictStore != 0 { + return cluster.SlowStoreEvicted(evictStore) + } + return nil +} + +func (s *evictSlowStoreScheduler) CleanConfig(cluster sche.SchedulerCluster) { + s.cleanupEvictLeader(cluster) +} + +func (s *evictSlowStoreScheduler) prepareEvictLeader(cluster sche.SchedulerCluster, storeID uint64) error { + err := s.conf.setStoreAndPersist(storeID) + if err != nil { + log.Info("evict-slow-store-scheduler persist config failed", zap.Uint64("store-id", storeID)) + return err + } + + return cluster.SlowStoreEvicted(storeID) +} + +func (s *evictSlowStoreScheduler) cleanupEvictLeader(cluster sche.SchedulerCluster) { + evictSlowStore, err := s.conf.clearAndPersist() + if err != nil { + log.Info("evict-slow-store-scheduler persist config failed", zap.Uint64("store-id", evictSlowStore)) + } + if evictSlowStore == 0 { + return + } + cluster.SlowStoreRecovered(evictSlowStore) +} + +func (s *evictSlowStoreScheduler) schedulerEvictLeader(cluster sche.SchedulerCluster) []*operator.Operator { + return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize) +} + +func (s *evictSlowStoreScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { + if s.conf.evictStore() != 0 { + allowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetSchedulerConfig().GetLeaderScheduleLimit() + if !allowed { + operator.OperatorLimitCounter.WithLabelValues(s.GetType(), operator.OpLeader.String()).Inc() + } + return allowed + } + return true +} + +func (s *evictSlowStoreScheduler) Schedule(cluster 
sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { + evictSlowStoreCounter.Inc() + var ops []*operator.Operator + + if s.conf.evictStore() != 0 { + store := cluster.GetStore(s.conf.evictStore()) + if store == nil || store.IsRemoved() { + // Previous slow store had been removed, remove the scheduler and check + // slow node next time. + log.Info("slow store has been removed", + zap.Uint64("store-id", store.GetID())) + } else if store.GetSlowScore() <= slowStoreRecoverThreshold && s.conf.readyForRecovery() { + log.Info("slow store has been recovered", + zap.Uint64("store-id", store.GetID())) + } else { + return s.schedulerEvictLeader(cluster), nil + } + s.cleanupEvictLeader(cluster) + return ops, nil + } + + var slowStore *core.StoreInfo + + for _, store := range cluster.GetStores() { + if store.IsRemoved() { + continue + } + + if (store.IsPreparing() || store.IsServing()) && store.IsSlow() { + // Do nothing if there is more than one slow store. + if slowStore != nil { + return ops, nil + } + slowStore = store + } + } + + if slowStore == nil || slowStore.GetSlowScore() < slowStoreEvictThreshold { + return ops, nil + } + + // If there is only one slow store, evict leaders from that store. + log.Info("detected slow store, start to evict leaders", + zap.Uint64("store-id", slowStore.GetID())) + err := s.prepareEvictLeader(cluster, slowStore.GetID()) + if err != nil { + log.Info("prepare for evicting leader failed", zap.Error(err), zap.Uint64("store-id", slowStore.GetID())) + return ops, nil + } + return s.schedulerEvictLeader(cluster), nil +} + +// newEvictSlowStoreScheduler creates a scheduler that detects and evicts slow stores. +func newEvictSlowStoreScheduler(opController *operator.Controller, conf *evictSlowStoreSchedulerConfig) Scheduler { + handler := newEvictSlowStoreHandler(conf) + return &evictSlowStoreScheduler{ + BaseScheduler: NewBaseScheduler(opController), + conf: conf, + handler: handler, + } +} diff --git a/pkg/schedule/schedulers/evict_slow_trend.go b/pkg/schedule/schedulers/evict_slow_trend.go new file mode 100644 index 00000000000..20c53219765 --- /dev/null +++ b/pkg/schedule/schedulers/evict_slow_trend.go @@ -0,0 +1,667 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schedulers + +import ( + "net/http" + "strconv" + "time" + + "github.com/gorilla/mux" + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/core" + sche "github.com/tikv/pd/pkg/schedule/core" + "github.com/tikv/pd/pkg/schedule/operator" + "github.com/tikv/pd/pkg/schedule/plan" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/apiutil" + "github.com/tikv/pd/pkg/utils/syncutil" + "github.com/unrolled/render" + "go.uber.org/zap" +) + +const ( + // EvictSlowTrendName is evict leader by slow trend scheduler name. + EvictSlowTrendName = "evict-slow-trend-scheduler" + // EvictSlowTrendType is evict leader by slow trend scheduler type. 
+ EvictSlowTrendType = "evict-slow-trend" +) + +const ( + alterEpsilon = 1e-9 + minReCheckDurationGap = 120 // default gap for re-check the slow node, unit: s + defaultRecoveryDurationGap = 600 // default gap for recovery, unit: s. +) + +type slowCandidate struct { + storeID uint64 + captureTS time.Time + recoverTS time.Time +} + +type evictSlowTrendSchedulerConfig struct { + syncutil.RWMutex + cluster *core.BasicCluster + storage endpoint.ConfigStorage + // Candidate for eviction in current tick. + evictCandidate slowCandidate + // Last chosen candidate for eviction. + lastEvictCandidate slowCandidate + // Duration gap for recovering the candidate, unit: s. + RecoveryDurationGap uint64 `json:"recovery-duration"` + // Only evict one store for now + EvictedStores []uint64 `json:"evict-by-trend-stores"` +} + +func initEvictSlowTrendSchedulerConfig(storage endpoint.ConfigStorage) *evictSlowTrendSchedulerConfig { + return &evictSlowTrendSchedulerConfig{ + storage: storage, + evictCandidate: slowCandidate{}, + lastEvictCandidate: slowCandidate{}, + RecoveryDurationGap: defaultRecoveryDurationGap, + EvictedStores: make([]uint64, 0), + } +} + +func (conf *evictSlowTrendSchedulerConfig) Clone() *evictSlowTrendSchedulerConfig { + conf.RLock() + defer conf.RUnlock() + return &evictSlowTrendSchedulerConfig{ + RecoveryDurationGap: conf.RecoveryDurationGap, + } +} + +func (conf *evictSlowTrendSchedulerConfig) persistLocked() error { + name := EvictSlowTrendName + data, err := EncodeConfig(conf) + failpoint.Inject("persistFail", func() { + err = errors.New("fail to persist") + }) + if err != nil { + return err + } + return conf.storage.SaveSchedulerConfig(name, data) +} + +func (conf *evictSlowTrendSchedulerConfig) getStores() []uint64 { + conf.RLock() + defer conf.RUnlock() + return conf.EvictedStores +} + +func (conf *evictSlowTrendSchedulerConfig) getKeyRangesByID(id uint64) []core.KeyRange { + if conf.evictedStore() != id { + return nil + } + return []core.KeyRange{core.NewKeyRange("", "")} +} + +func (conf *evictSlowTrendSchedulerConfig) hasEvictedStores() bool { + conf.RLock() + defer conf.RUnlock() + return len(conf.EvictedStores) > 0 +} + +func (conf *evictSlowTrendSchedulerConfig) evictedStore() uint64 { + if !conf.hasEvictedStores() { + return 0 + } + conf.RLock() + defer conf.RUnlock() + // If a candidate passes all checks and proved to be slow, it will be + // recorded in `conf.EvictStores`, and `conf.lastEvictCandidate` will record + // the captured timestamp of this store. + return conf.EvictedStores[0] +} + +func (conf *evictSlowTrendSchedulerConfig) candidate() uint64 { + conf.RLock() + defer conf.RUnlock() + return conf.evictCandidate.storeID +} + +func (conf *evictSlowTrendSchedulerConfig) captureTS() time.Time { + conf.RLock() + defer conf.RUnlock() + return conf.evictCandidate.captureTS +} + +func (conf *evictSlowTrendSchedulerConfig) candidateCapturedSecs() uint64 { + conf.RLock() + defer conf.RUnlock() + return DurationSinceAsSecs(conf.evictCandidate.captureTS) +} + +func (conf *evictSlowTrendSchedulerConfig) lastCapturedCandidate() *slowCandidate { + conf.RLock() + defer conf.RUnlock() + return &conf.lastEvictCandidate +} + +func (conf *evictSlowTrendSchedulerConfig) lastCandidateCapturedSecs() uint64 { + return DurationSinceAsSecs(conf.lastEvictCandidate.captureTS) +} + +// readyForRecovery checks whether the last cpatured candidate is ready for recovery. 
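+// A captured candidate becomes recoverable only after RecoveryDurationGap seconds have
+// elapsed since its captureTS; e.g. with the default gap of 600s, a store captured at
+// 10:00:00 is not eligible for recovery before 10:10:00.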
+func (conf *evictSlowTrendSchedulerConfig) readyForRecovery() bool { + conf.RLock() + defer conf.RUnlock() + recoveryDurationGap := conf.RecoveryDurationGap + failpoint.Inject("transientRecoveryGap", func() { + recoveryDurationGap = 0 + }) + return conf.lastCandidateCapturedSecs() >= recoveryDurationGap +} + +func (conf *evictSlowTrendSchedulerConfig) captureCandidate(id uint64) { + conf.Lock() + defer conf.Unlock() + conf.evictCandidate = slowCandidate{ + storeID: id, + captureTS: time.Now(), + recoverTS: time.Now(), + } + if conf.lastEvictCandidate == (slowCandidate{}) { + conf.lastEvictCandidate = conf.evictCandidate + } +} + +func (conf *evictSlowTrendSchedulerConfig) popCandidate(updLast bool) uint64 { + conf.Lock() + defer conf.Unlock() + id := conf.evictCandidate.storeID + if updLast { + conf.lastEvictCandidate = conf.evictCandidate + } + conf.evictCandidate = slowCandidate{} + return id +} + +func (conf *evictSlowTrendSchedulerConfig) markCandidateRecovered() { + conf.Lock() + defer conf.Unlock() + if conf.lastEvictCandidate != (slowCandidate{}) { + conf.lastEvictCandidate.recoverTS = time.Now() + } +} + +func (conf *evictSlowTrendSchedulerConfig) setStoreAndPersist(id uint64) error { + conf.Lock() + defer conf.Unlock() + conf.EvictedStores = []uint64{id} + return conf.persistLocked() +} + +func (conf *evictSlowTrendSchedulerConfig) clearAndPersist(cluster sche.SchedulerCluster) (oldID uint64, err error) { + oldID = conf.evictedStore() + if oldID == 0 { + return + } + address := "?" + store := cluster.GetStore(oldID) + if store != nil { + address = store.GetAddress() + } + storeSlowTrendEvictedStatusGauge.WithLabelValues(address, strconv.FormatUint(oldID, 10)).Set(0) + conf.Lock() + defer conf.Unlock() + conf.EvictedStores = []uint64{} + return oldID, conf.persistLocked() +} + +type evictSlowTrendHandler struct { + rd *render.Render + config *evictSlowTrendSchedulerConfig +} + +func newEvictSlowTrendHandler(config *evictSlowTrendSchedulerConfig) http.Handler { + h := &evictSlowTrendHandler{ + config: config, + rd: render.New(render.Options{IndentJSON: true}), + } + router := mux.NewRouter() + router.HandleFunc("/config", h.UpdateConfig).Methods(http.MethodPost) + router.HandleFunc("/list", h.ListConfig).Methods(http.MethodGet) + return router +} + +func (handler *evictSlowTrendHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) { + var input map[string]interface{} + if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil { + return + } + recoveryDurationGapFloat, ok := input["recovery-duration"].(float64) + if !ok { + handler.rd.JSON(w, http.StatusInternalServerError, errors.New("invalid argument for 'recovery-duration'").Error()) + return + } + handler.config.Lock() + defer handler.config.Unlock() + prevRecoveryDurationGap := handler.config.RecoveryDurationGap + recoveryDurationGap := uint64(recoveryDurationGapFloat) + handler.config.RecoveryDurationGap = recoveryDurationGap + if err := handler.config.persistLocked(); err != nil { + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + handler.config.RecoveryDurationGap = prevRecoveryDurationGap + return + } + log.Info("evict-slow-trend-scheduler update 'recovery-duration' - unit: s", zap.Uint64("prev", prevRecoveryDurationGap), zap.Uint64("cur", recoveryDurationGap)) + handler.rd.JSON(w, http.StatusOK, "Config updated.") +} + +func (handler *evictSlowTrendHandler) ListConfig(w http.ResponseWriter, r *http.Request) { + conf := handler.config.Clone() + handler.rd.JSON(w, 
http.StatusOK, conf) +} + +type evictSlowTrendScheduler struct { + *BaseScheduler + conf *evictSlowTrendSchedulerConfig + handler http.Handler +} + +func (s *evictSlowTrendScheduler) GetNextInterval(interval time.Duration) time.Duration { + var growthType intervalGrowthType + // If it already found a slow node as candidate, the next interval should be shorter + // to make the next scheduling as soon as possible. This adjustment will decrease the + // response time, as heartbeats from other nodes will be received and updated more quickly. + if s.conf.hasEvictedStores() { + growthType = zeroGrowth + } else { + growthType = exponentialGrowth + } + return intervalGrow(s.GetMinInterval(), MaxScheduleInterval, growthType) +} + +func (s *evictSlowTrendScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.handler.ServeHTTP(w, r) +} + +func (s *evictSlowTrendScheduler) GetName() string { + return EvictSlowTrendName +} + +func (s *evictSlowTrendScheduler) GetType() string { + return EvictSlowTrendType +} + +func (s *evictSlowTrendScheduler) EncodeConfig() ([]byte, error) { + return EncodeConfig(s.conf) +} + +func (s *evictSlowTrendScheduler) ReloadConfig() error { + s.conf.Lock() + defer s.conf.Unlock() + cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) + if err != nil { + return err + } + if len(cfgData) == 0 { + return nil + } + newCfg := &evictSlowTrendSchedulerConfig{} + if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { + return err + } + old := make(map[uint64]struct{}) + for _, id := range s.conf.EvictedStores { + old[id] = struct{}{} + } + new := make(map[uint64]struct{}) + for _, id := range newCfg.EvictedStores { + new[id] = struct{}{} + } + pauseAndResumeLeaderTransfer(s.conf.cluster, old, new) + s.conf.RecoveryDurationGap = newCfg.RecoveryDurationGap + s.conf.EvictedStores = newCfg.EvictedStores + return nil +} + +func (s *evictSlowTrendScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { + evictedStoreID := s.conf.evictedStore() + if evictedStoreID == 0 { + return nil + } + return cluster.SlowTrendEvicted(evictedStoreID) +} + +func (s *evictSlowTrendScheduler) CleanConfig(cluster sche.SchedulerCluster) { + s.cleanupEvictLeader(cluster) +} + +func (s *evictSlowTrendScheduler) prepareEvictLeader(cluster sche.SchedulerCluster, storeID uint64) error { + err := s.conf.setStoreAndPersist(storeID) + if err != nil { + log.Info("evict-slow-trend-scheduler persist config failed", zap.Uint64("store-id", storeID)) + return err + } + return cluster.SlowTrendEvicted(storeID) +} + +func (s *evictSlowTrendScheduler) cleanupEvictLeader(cluster sche.SchedulerCluster) { + evictedStoreID, err := s.conf.clearAndPersist(cluster) + if err != nil { + log.Info("evict-slow-trend-scheduler persist config failed", zap.Uint64("store-id", evictedStoreID)) + } + if evictedStoreID != 0 { + // Assertion: evictStoreID == s.conf.LastEvictCandidate.storeID + s.conf.markCandidateRecovered() + cluster.SlowTrendRecovered(evictedStoreID) + } +} + +func (s *evictSlowTrendScheduler) scheduleEvictLeader(cluster sche.SchedulerCluster) []*operator.Operator { + store := cluster.GetStore(s.conf.evictedStore()) + if store == nil { + return nil + } + storeSlowTrendEvictedStatusGauge.WithLabelValues(store.GetAddress(), strconv.FormatUint(store.GetID(), 10)).Set(1) + return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize) +} + +func (s *evictSlowTrendScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { + if s.conf.evictedStore() == 0 
{ + return true + } + allowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetSchedulerConfig().GetLeaderScheduleLimit() + if !allowed { + operator.OperatorLimitCounter.WithLabelValues(s.GetType(), operator.OpLeader.String()).Inc() + } + return allowed +} + +func (s *evictSlowTrendScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { + schedulerCounter.WithLabelValues(s.GetName(), "schedule").Inc() + + var ops []*operator.Operator + + if s.conf.evictedStore() != 0 { + store := cluster.GetStore(s.conf.evictedStore()) + if store == nil || store.IsRemoved() { + // Previous slow store had been removed, remove the scheduler and check + // slow node next time. + log.Info("store evicted by slow trend has been removed", zap.Uint64("store-id", store.GetID())) + storeSlowTrendActionStatusGauge.WithLabelValues("evict", "stop_removed").Inc() + } else if checkStoreCanRecover(cluster, store) && s.conf.readyForRecovery() { + log.Info("store evicted by slow trend has been recovered", zap.Uint64("store-id", store.GetID())) + storeSlowTrendActionStatusGauge.WithLabelValues("evict", "stop_recovered").Inc() + } else { + storeSlowTrendActionStatusGauge.WithLabelValues("evict", "continue").Inc() + return s.scheduleEvictLeader(cluster), nil + } + s.cleanupEvictLeader(cluster) + return ops, nil + } + + candFreshCaptured := false + if s.conf.candidate() == 0 { + candidate := chooseEvictCandidate(cluster, s.conf.lastCapturedCandidate()) + if candidate != nil { + storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "captured").Inc() + s.conf.captureCandidate(candidate.GetID()) + candFreshCaptured = true + } + } else { + storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "continue").Inc() + } + + slowStoreID := s.conf.candidate() + if slowStoreID == 0 { + storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "none").Inc() + return ops, nil + } + + slowStore := cluster.GetStore(slowStoreID) + if !candFreshCaptured && checkStoreFasterThanOthers(cluster, slowStore) { + s.conf.popCandidate(false) + log.Info("slow store candidate by trend has been cancel", zap.Uint64("store-id", slowStoreID)) + storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "canceled_too_faster").Inc() + return ops, nil + } + if slowStoreRecordTS := s.conf.captureTS(); !checkStoresAreUpdated(cluster, slowStoreID, slowStoreRecordTS) { + log.Info("slow store candidate waiting for other stores to update heartbeats", zap.Uint64("store-id", slowStoreID)) + storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "wait").Inc() + return ops, nil + } + + candCapturedSecs := s.conf.candidateCapturedSecs() + log.Info("detected slow store by trend, start to evict leaders", + zap.Uint64("store-id", slowStoreID), + zap.Uint64("candidate-captured-secs", candCapturedSecs)) + storeSlowTrendMiscGauge.WithLabelValues("candidate", "captured_secs").Set(float64(candCapturedSecs)) + if err := s.prepareEvictLeader(cluster, s.conf.popCandidate(true)); err != nil { + log.Info("prepare for evicting leader by slow trend failed", zap.Error(err), zap.Uint64("store-id", slowStoreID)) + storeSlowTrendActionStatusGauge.WithLabelValues("evict", "prepare_err").Inc() + return ops, nil + } + storeSlowTrendActionStatusGauge.WithLabelValues("evict", "start").Inc() + return s.scheduleEvictLeader(cluster), nil +} + +func newEvictSlowTrendScheduler(opController *operator.Controller, conf *evictSlowTrendSchedulerConfig) Scheduler { + handler := newEvictSlowTrendHandler(conf) + return 
&evictSlowTrendScheduler{ + BaseScheduler: NewBaseScheduler(opController), + conf: conf, + handler: handler, + } +} + +func chooseEvictCandidate(cluster sche.SchedulerCluster, lastEvictCandidate *slowCandidate) (slowStore *core.StoreInfo) { + isRaftKV2 := cluster.GetStoreConfig().IsRaftKV2() + failpoint.Inject("mockRaftKV2", func() { + isRaftKV2 = true + }) + stores := cluster.GetStores() + if len(stores) < 3 { + storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "none_too_few").Inc() + return + } + + var candidates []*core.StoreInfo + var affectedStoreCount int + for _, store := range stores { + if store.IsRemoved() { + continue + } + if !(store.IsPreparing() || store.IsServing()) { + continue + } + if slowTrend := store.GetSlowTrend(); slowTrend != nil { + if slowTrend.ResultRate < -alterEpsilon { + affectedStoreCount += 1 + } + // For the cases of disk io jitters. + // Normally, if there exists jitters on disk io or network io, the slow store must have a descending + // trend on QPS and ascending trend on duration. So, the slowTrend must match the following pattern. + if slowTrend.CauseRate > alterEpsilon && slowTrend.ResultRate < -alterEpsilon { + candidates = append(candidates, store) + storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "add").Inc() + log.Info("evict-slow-trend-scheduler pre-captured candidate", + zap.Uint64("store-id", store.GetID()), + zap.Float64("cause-rate", slowTrend.CauseRate), + zap.Float64("result-rate", slowTrend.ResultRate), + zap.Float64("cause-value", slowTrend.CauseValue), + zap.Float64("result-value", slowTrend.ResultValue)) + } else if isRaftKV2 && slowTrend.CauseRate > alterEpsilon { + // Meanwhile, if the store was previously experiencing slowness in the `Duration` dimension, it should + // re-check whether this node is still encountering network I/O-related jitters. And If this node matches + // the last identified candidate, it indicates that the node is still being affected by delays in network I/O, + // and consequently, it should be re-designated as slow once more. + // Prerequisite: `raft-kv2` engine has the ability to percept the slow trend on network io jitters. + // TODO: maybe make it compatible to `raft-kv` later. 
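+				// In short (see the condition below): the store is re-captured only when it is
+				// the same store as the last evicted candidate and it recovered less than
+				// minReCheckDurationGap (120s) ago.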
+ if lastEvictCandidate != nil && lastEvictCandidate.storeID == store.GetID() && DurationSinceAsSecs(lastEvictCandidate.recoverTS) <= minReCheckDurationGap { + candidates = append(candidates, store) + storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "add").Inc() + log.Info("evict-slow-trend-scheduler pre-captured candidate in raft-kv2 cluster", + zap.Uint64("store-id", store.GetID()), + zap.Float64("cause-rate", slowTrend.CauseRate), + zap.Float64("result-rate", slowTrend.ResultRate), + zap.Float64("cause-value", slowTrend.CauseValue), + zap.Float64("result-value", slowTrend.ResultValue)) + } + } + } + } + if len(candidates) == 0 { + storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "none_no_fit").Inc() + return + } + // TODO: Calculate to judge if one store is way slower than the others + if len(candidates) > 1 { + storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "none_too_many").Inc() + return + } + + store := candidates[0] + + affectedStoreThreshold := int(float64(len(stores)) * cluster.GetSchedulerConfig().GetSlowStoreEvictingAffectedStoreRatioThreshold()) + if affectedStoreCount < affectedStoreThreshold { + log.Info("evict-slow-trend-scheduler failed to confirm candidate: it only affect a few stores", zap.Uint64("store-id", store.GetID())) + storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "none_affect_a_few").Inc() + return + } + + if !checkStoreSlowerThanOthers(cluster, store) { + log.Info("evict-slow-trend-scheduler failed to confirm candidate: it's not slower than others", zap.Uint64("store-id", store.GetID())) + storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "none_not_slower").Inc() + return + } + + storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "add").Inc() + log.Info("evict-slow-trend-scheduler captured candidate", zap.Uint64("store-id", store.GetID())) + return store +} + +func checkStoresAreUpdated(cluster sche.SchedulerCluster, slowStoreID uint64, slowStoreRecordTS time.Time) bool { + stores := cluster.GetStores() + if len(stores) <= 1 { + return false + } + expected := (len(stores) + 1) / 2 + updatedStores := 0 + for _, store := range stores { + if store.IsRemoved() { + updatedStores += 1 + continue + } + if !(store.IsPreparing() || store.IsServing()) { + updatedStores += 1 + continue + } + if store.GetID() == slowStoreID { + updatedStores += 1 + continue + } + if slowStoreRecordTS.Compare(store.GetLastHeartbeatTS()) <= 0 { + updatedStores += 1 + } + } + storeSlowTrendMiscGauge.WithLabelValues("store", "check_updated_count").Set(float64(updatedStores)) + storeSlowTrendMiscGauge.WithLabelValues("store", "check_updated_expected").Set(float64(expected)) + return updatedStores >= expected +} + +func checkStoreSlowerThanOthers(cluster sche.SchedulerCluster, target *core.StoreInfo) bool { + stores := cluster.GetStores() + expected := (len(stores)*2 + 1) / 3 + targetSlowTrend := target.GetSlowTrend() + if targetSlowTrend == nil { + storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "check_slower_no_data").Inc() + return false + } + slowerThanStoresNum := 0 + for _, store := range stores { + if store.IsRemoved() { + continue + } + if !(store.IsPreparing() || store.IsServing()) { + continue + } + if store.GetID() == target.GetID() { + continue + } + slowTrend := store.GetSlowTrend() + // Use `SlowTrend.ResultValue` at first, but not good, `CauseValue` is better + // Greater `CuaseValue` means slower + if slowTrend != nil && (targetSlowTrend.CauseValue-slowTrend.CauseValue) > alterEpsilon && 
slowTrend.CauseValue > alterEpsilon { + slowerThanStoresNum += 1 + } + } + storeSlowTrendMiscGauge.WithLabelValues("store", "check_slower_count").Set(float64(slowerThanStoresNum)) + storeSlowTrendMiscGauge.WithLabelValues("store", "check_slower_expected").Set(float64(expected)) + return slowerThanStoresNum >= expected +} + +func checkStoreCanRecover(cluster sche.SchedulerCluster, target *core.StoreInfo) bool { + /* + // + // This might not be necessary, + // and it also have tiny chances to cause `stuck in evicted` + // status when this store restarted, + // the `become fast` might be ignore on tikv side + // because of the detecting windows are not fully filled yet. + // Hence, we disabled this event capturing by now but keep the code here for further checking. + // + + // Wait for the evicted store's `become fast` event + slowTrend := target.GetSlowTrend() + if slowTrend == nil || slowTrend.CauseRate >= 0 && slowTrend.ResultRate <= 0 { + storeSlowTrendActionStatusGauge.WithLabelValues("recover.reject:no-event").Inc() + return false + } else { + storeSlowTrendActionStatusGauge.WithLabelValues("recover.judging:got-event").Inc() + } + */ + return checkStoreFasterThanOthers(cluster, target) +} + +func checkStoreFasterThanOthers(cluster sche.SchedulerCluster, target *core.StoreInfo) bool { + stores := cluster.GetStores() + expected := (len(stores) + 1) / 2 + targetSlowTrend := target.GetSlowTrend() + if targetSlowTrend == nil { + storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "check_faster_no_data").Inc() + return false + } + fasterThanStores := 0 + for _, store := range stores { + if store.IsRemoved() { + continue + } + if !(store.IsPreparing() || store.IsServing()) { + continue + } + if store.GetID() == target.GetID() { + continue + } + slowTrend := store.GetSlowTrend() + // Greater `CauseValue` means slower + if slowTrend != nil && targetSlowTrend.CauseValue <= slowTrend.CauseValue*1.1 && + slowTrend.CauseValue > alterEpsilon && targetSlowTrend.CauseValue > alterEpsilon { + fasterThanStores += 1 + } + } + storeSlowTrendMiscGauge.WithLabelValues("store", "check_faster_count").Set(float64(fasterThanStores)) + storeSlowTrendMiscGauge.WithLabelValues("store", "check_faster_expected").Set(float64(expected)) + return fasterThanStores >= expected +} + +// DurationSinceAsSecs returns the duration gap since the given startTS, unit: s. +func DurationSinceAsSecs(startTS time.Time) uint64 { + return uint64(time.Since(startTS).Seconds()) +} diff --git a/server/api/router.go b/server/api/router.go index eb87ef05bc2..9eae8ff3d77 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -84,10 +84,10 @@ func getFunctionName(f interface{}) string { // @BasePath /pd/api/v1 func createRouter(prefix string, svr *server.Server) *mux.Router { serviceMiddle := newServiceMiddlewareBuilder(svr) - registerPrefix := func(router *mux.Router, prefixPath string, + registerPrefix := func(router *mux.Router, prefixPath, name string, handleFunc func(http.ResponseWriter, *http.Request), opts ...createRouteOption) { routeCreateFunc(router.PathPrefix(prefixPath), serviceMiddle.createHandler(handleFunc), - getFunctionName(handleFunc), opts...) + name, opts...) 
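+		// The service label is now passed in explicitly instead of being derived from the
+		// handler via getFunctionName, so the same handler can be registered under
+		// different labels (see the two scheduler-config routes below).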
} registerFunc := func(router *mux.Router, path string, handleFunc func(http.ResponseWriter, *http.Request), opts ...createRouteOption) { @@ -147,7 +147,8 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { registerFunc(clusterRouter, "/schedulers/diagnostic/{name}", diagnosticHandler.GetDiagnosticResult, setMethods(http.MethodGet), setAuditBackend(prometheus)) schedulerConfigHandler := newSchedulerConfigHandler(svr, rd) - registerPrefix(apiRouter, "/scheduler-config", schedulerConfigHandler.GetSchedulerConfig, setAuditBackend(prometheus)) + registerPrefix(apiRouter, "/scheduler-config", "HandleSchedulerConfig", schedulerConfigHandler.HandleSchedulerConfig, setMethods(http.MethodPost, http.MethodDelete, http.MethodPut, http.MethodPatch), setAuditBackend(localLog, prometheus)) + registerPrefix(apiRouter, "/scheduler-config", "GetSchedulerConfig", schedulerConfigHandler.HandleSchedulerConfig, setMethods(http.MethodGet), setAuditBackend(prometheus)) clusterHandler := newClusterHandler(svr, rd) registerFunc(apiRouter, "/cluster", clusterHandler.GetCluster, setMethods(http.MethodGet), setAuditBackend(prometheus)) @@ -361,9 +362,14 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { unsafeOperationHandler.GetFailedStoresRemovalStatus, setMethods(http.MethodGet), setAuditBackend(prometheus)) // API to set or unset failpoints +<<<<<<< HEAD failpoint.Inject("enableFailpointAPI", func() { // this function will be named to "func2". It may be used in test registerPrefix(apiRouter, "/fail", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { +======= + if enableFailPointAPI { + registerPrefix(apiRouter, "/fail", "FailPoint", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { +>>>>>>> 8b8c78a78 (scheduler: add aduit log for scheduler config API and add resp msg for evict-leader (#7674)) // The HTTP handler of failpoint requires the full path to be the failpoint path. 
r.URL.Path = strings.TrimPrefix(r.URL.Path, prefix+apiPrefix+"/fail") new(failpoint.HttpHandler).ServeHTTP(w, r) diff --git a/server/api/scheduler.go b/server/api/scheduler.go index 9b690a93249..8cbd19f167c 100644 --- a/server/api/scheduler.go +++ b/server/api/scheduler.go @@ -357,7 +357,7 @@ func newSchedulerConfigHandler(svr *server.Server, rd *render.Render) *scheduler } } -func (h *schedulerConfigHandler) GetSchedulerConfig(w http.ResponseWriter, r *http.Request) { +func (h *schedulerConfigHandler) HandleSchedulerConfig(w http.ResponseWriter, r *http.Request) { handler := h.svr.GetHandler() sh, err := handler.GetSchedulerConfigHandler() if err == nil && sh != nil { diff --git a/server/api/server_test.go b/server/api/server_test.go index bdf929b17b3..6471e344414 100644 --- a/server/api/server_test.go +++ b/server/api/server_test.go @@ -172,9 +172,18 @@ func (suite *serviceTestSuite) TestServiceLabels() { suite.Equal("Profile", serviceLabel) accessPaths = suite.svr.GetServiceLabels("GetSchedulerConfig") +<<<<<<< HEAD suite.Len(accessPaths, 1) suite.Equal("/pd/api/v1/scheduler-config", accessPaths[0].Path) suite.Equal("", accessPaths[0].Method) +======= + re.Len(accessPaths, 1) + re.Equal("/pd/api/v1/scheduler-config", accessPaths[0].Path) + re.Equal("GET", accessPaths[0].Method) + accessPaths = suite.svr.GetServiceLabels("HandleSchedulerConfig") + re.Len(accessPaths, 4) + re.Equal("/pd/api/v1/scheduler-config", accessPaths[0].Path) +>>>>>>> 8b8c78a78 (scheduler: add aduit log for scheduler config API and add resp msg for evict-leader (#7674)) accessPaths = suite.svr.GetServiceLabels("ResignLeader") suite.Len(accessPaths, 1) diff --git a/server/schedulers/evict_leader.go b/server/schedulers/evict_leader.go index 716b848b402..cbefceeb020 100644 --- a/server/schedulers/evict_leader.go +++ b/server/schedulers/evict_leader.go @@ -397,7 +397,7 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } - handler.rd.JSON(w, http.StatusOK, nil) + handler.rd.JSON(w, http.StatusOK, "The scheduler has been applied to the store.") } func (handler *evictLeaderHandler) ListConfig(w http.ResponseWriter, r *http.Request) { diff --git a/server/schedulers/grant_leader.go b/server/schedulers/grant_leader.go index bac48675323..18be38f16a2 100644 --- a/server/schedulers/grant_leader.go +++ b/server/schedulers/grant_leader.go @@ -301,7 +301,7 @@ func (handler *grantLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } - handler.rd.JSON(w, http.StatusOK, nil) + handler.rd.JSON(w, http.StatusOK, "The scheduler has been applied to the store.") } func (handler *grantLeaderHandler) ListConfig(w http.ResponseWriter, r *http.Request) { diff --git a/tools/pd-ctl/tests/scheduler/scheduler_test.go b/tools/pd-ctl/tests/scheduler/scheduler_test.go new file mode 100644 index 00000000000..ab96d430523 --- /dev/null +++ b/tools/pd-ctl/tests/scheduler/scheduler_test.go @@ -0,0 +1,716 @@ +// Copyright 2019 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scheduler_test + +import ( + "encoding/json" + "fmt" + "reflect" + "strings" + "testing" + "time" + + "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/spf13/cobra" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/core" + sc "github.com/tikv/pd/pkg/schedule/config" + "github.com/tikv/pd/pkg/slice" + "github.com/tikv/pd/pkg/utils/testutil" + "github.com/tikv/pd/pkg/versioninfo" + pdTests "github.com/tikv/pd/tests" + ctl "github.com/tikv/pd/tools/pd-ctl/pdctl" + "github.com/tikv/pd/tools/pd-ctl/tests" +) + +type schedulerTestSuite struct { + suite.Suite + env *pdTests.SchedulingTestEnvironment + defaultSchedulers []string +} + +func TestSchedulerTestSuite(t *testing.T) { + suite.Run(t, new(schedulerTestSuite)) +} + +func (suite *schedulerTestSuite) SetupSuite() { + re := suite.Require() + re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/skipStoreConfigSync", `return(true)`)) + suite.env = pdTests.NewSchedulingTestEnvironment(suite.T()) + suite.defaultSchedulers = []string{ + "balance-leader-scheduler", + "balance-region-scheduler", + "balance-hot-region-scheduler", + "balance-witness-scheduler", + "transfer-witness-leader-scheduler", + "evict-slow-store-scheduler", + } +} + +func (suite *schedulerTestSuite) TearDownSuite() { + re := suite.Require() + suite.env.Cleanup() + re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/skipStoreConfigSync")) +} + +func (suite *schedulerTestSuite) TearDownTest() { + cleanFunc := func(cluster *pdTests.TestCluster) { + re := suite.Require() + pdAddr := cluster.GetConfig().GetClientURL() + cmd := ctl.GetRootCmd() + + var currentSchedulers []string + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, ¤tSchedulers) + for _, scheduler := range suite.defaultSchedulers { + if slice.NoneOf(currentSchedulers, func(i int) bool { + return currentSchedulers[i] == scheduler + }) { + echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", scheduler}, nil) + re.Contains(echo, "Success!") + } + } + for _, scheduler := range currentSchedulers { + if slice.NoneOf(suite.defaultSchedulers, func(i int) bool { + return suite.defaultSchedulers[i] == scheduler + }) { + echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", scheduler}, nil) + re.Contains(echo, "Success!") + } + } + } + suite.env.RunFuncInTwoModes(cleanFunc) +} + +func (suite *schedulerTestSuite) TestScheduler() { + suite.env.RunTestInTwoModes(suite.checkScheduler) +} + +func (suite *schedulerTestSuite) checkScheduler(cluster *pdTests.TestCluster) { + re := suite.Require() + pdAddr := cluster.GetConfig().GetClientURL() + cmd := ctl.GetRootCmd() + + stores := []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 2, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 3, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 4, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + } + 
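+	// Helpers for the assertions below: mustUsage checks that the command prints its
+	// usage message, and checkSchedulerCommand runs an optional command and then waits
+	// until `scheduler show` reports exactly the expected scheduler set.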
+ mustUsage := func(args []string) { + output, err := tests.ExecuteCommand(cmd, args...) + re.NoError(err) + re.Contains(string(output), "Usage") + } + + checkSchedulerCommand := func(args []string, expected map[string]bool) { + if args != nil { + echo := mustExec(re, cmd, args, nil) + re.Contains(echo, "Success!") + } + testutil.Eventually(re, func() bool { + var schedulers []string + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, &schedulers) + if len(schedulers) != len(expected) { + return false + } + for _, scheduler := range schedulers { + if _, ok := expected[scheduler]; !ok { + return false + } + } + return true + }) + } + + checkSchedulerConfigCommand := func(expectedConfig map[string]interface{}, schedulerName string) { + testutil.Eventually(re, func() bool { + configInfo := make(map[string]interface{}) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName}, &configInfo) + return reflect.DeepEqual(expectedConfig, configInfo) + }) + } + + leaderServer := cluster.GetLeaderServer() + for _, store := range stores { + pdTests.MustPutStore(re, cluster, store) + } + + // note: because pdqsort is a unstable sort algorithm, set ApproximateSize for this region. + pdTests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetApproximateSize(10)) + + // scheduler show command + expected := map[string]bool{ + "balance-region-scheduler": true, + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + "transfer-witness-leader-scheduler": true, + "balance-witness-scheduler": true, + "evict-slow-store-scheduler": true, + } + checkSchedulerCommand(nil, expected) + + // scheduler delete command + args := []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"} + expected = map[string]bool{ + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + "transfer-witness-leader-scheduler": true, + "balance-witness-scheduler": true, + "evict-slow-store-scheduler": true, + } + checkSchedulerCommand(args, expected) + + // avoid the influence of the scheduler order + schedulers := []string{"evict-leader-scheduler", "grant-leader-scheduler", "evict-leader-scheduler", "grant-leader-scheduler"} + + checkStorePause := func(changedStores []uint64, schedulerName string) { + status := func() string { + switch schedulerName { + case "evict-leader-scheduler": + return "paused" + case "grant-leader-scheduler": + return "resumed" + default: + re.Fail(fmt.Sprintf("unknown scheduler %s", schedulerName)) + return "" + } + }() + for _, store := range stores { + isStorePaused := !cluster.GetLeaderServer().GetRaftCluster().GetStore(store.GetId()).AllowLeaderTransfer() + if slice.AnyOf(changedStores, func(i int) bool { + return store.GetId() == changedStores[i] + }) { + re.True(isStorePaused, + fmt.Sprintf("store %d should be %s with %s", store.GetId(), status, schedulerName)) + } else { + re.False(isStorePaused, + fmt.Sprintf("store %d should not be %s with %s", store.GetId(), status, schedulerName)) + } + if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { + re.Equal(isStorePaused, !sche.GetCluster().GetStore(store.GetId()).AllowLeaderTransfer()) + } + } + } + + for idx := range schedulers { + checkStorePause([]uint64{}, schedulers[idx]) + // scheduler add command + args = []string{"-u", pdAddr, "scheduler", "add", schedulers[idx], "2"} + expected = map[string]bool{ + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + schedulers[idx]: true, + "transfer-witness-leader-scheduler": 
true, + "balance-witness-scheduler": true, + "evict-slow-store-scheduler": true, + } + checkSchedulerCommand(args, expected) + + // scheduler config show command + expectedConfig := make(map[string]interface{}) + expectedConfig["store-id-ranges"] = map[string]interface{}{"2": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}} + checkSchedulerConfigCommand(expectedConfig, schedulers[idx]) + checkStorePause([]uint64{2}, schedulers[idx]) + + // scheduler config update command + args = []string{"-u", pdAddr, "scheduler", "config", schedulers[idx], "add-store", "3"} + expected = map[string]bool{ + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + schedulers[idx]: true, + "transfer-witness-leader-scheduler": true, + "balance-witness-scheduler": true, + "evict-slow-store-scheduler": true, + } + + // check update success + checkSchedulerCommand(args, expected) + expectedConfig["store-id-ranges"] = map[string]interface{}{"2": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}, "3": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}} + checkSchedulerConfigCommand(expectedConfig, schedulers[idx]) + checkStorePause([]uint64{2, 3}, schedulers[idx]) + + // scheduler delete command + args = []string{"-u", pdAddr, "scheduler", "remove", schedulers[idx]} + expected = map[string]bool{ + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + "transfer-witness-leader-scheduler": true, + "balance-witness-scheduler": true, + "evict-slow-store-scheduler": true, + } + checkSchedulerCommand(args, expected) + checkStorePause([]uint64{}, schedulers[idx]) + + // scheduler add command + args = []string{"-u", pdAddr, "scheduler", "add", schedulers[idx], "2"} + expected = map[string]bool{ + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + schedulers[idx]: true, + "transfer-witness-leader-scheduler": true, + "balance-witness-scheduler": true, + "evict-slow-store-scheduler": true, + } + checkSchedulerCommand(args, expected) + checkStorePause([]uint64{2}, schedulers[idx]) + + // scheduler add command twice + args = []string{"-u", pdAddr, "scheduler", "add", schedulers[idx], "4"} + expected = map[string]bool{ + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + schedulers[idx]: true, + "transfer-witness-leader-scheduler": true, + "balance-witness-scheduler": true, + "evict-slow-store-scheduler": true, + } + checkSchedulerCommand(args, expected) + + // check add success + expectedConfig["store-id-ranges"] = map[string]interface{}{"2": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}, "4": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}} + checkSchedulerConfigCommand(expectedConfig, schedulers[idx]) + checkStorePause([]uint64{2, 4}, schedulers[idx]) + + // scheduler remove command [old] + args = []string{"-u", pdAddr, "scheduler", "remove", schedulers[idx] + "-4"} + expected = map[string]bool{ + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + schedulers[idx]: true, + "transfer-witness-leader-scheduler": true, + "balance-witness-scheduler": true, + "evict-slow-store-scheduler": true, + } + checkSchedulerCommand(args, expected) + + // check remove success + expectedConfig["store-id-ranges"] = map[string]interface{}{"2": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}} + checkSchedulerConfigCommand(expectedConfig, schedulers[idx]) + checkStorePause([]uint64{2}, schedulers[idx]) + + 
// scheduler remove command, when remove the last store, it should remove whole scheduler + args = []string{"-u", pdAddr, "scheduler", "remove", schedulers[idx] + "-2"} + expected = map[string]bool{ + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + "transfer-witness-leader-scheduler": true, + "balance-witness-scheduler": true, + "evict-slow-store-scheduler": true, + } + checkSchedulerCommand(args, expected) + checkStorePause([]uint64{}, schedulers[idx]) + } + + // test shuffle region config + checkSchedulerCommand([]string{"-u", pdAddr, "scheduler", "add", "shuffle-region-scheduler"}, map[string]bool{ + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + "shuffle-region-scheduler": true, + "transfer-witness-leader-scheduler": true, + "balance-witness-scheduler": true, + "evict-slow-store-scheduler": true, + }) + var roles []string + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "show-roles"}, &roles) + re.Equal([]string{"leader", "follower", "learner"}, roles) + echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "set-roles", "learner"}, nil) + re.Contains(echo, "Success!") + testutil.Eventually(re, func() bool { + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "show-roles"}, &roles) + return reflect.DeepEqual([]string{"learner"}, roles) + }) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler"}, &roles) + re.Equal([]string{"learner"}, roles) + + // test grant hot region scheduler config + checkSchedulerCommand([]string{"-u", pdAddr, "scheduler", "add", "grant-hot-region-scheduler", "1", "1,2,3"}, map[string]bool{ + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + "shuffle-region-scheduler": true, + "grant-hot-region-scheduler": true, + "transfer-witness-leader-scheduler": true, + "balance-witness-scheduler": true, + "evict-slow-store-scheduler": true, + }) + var conf3 map[string]interface{} + expected3 := map[string]interface{}{ + "store-id": []interface{}{float64(1), float64(2), float64(3)}, + "store-leader-id": float64(1), + } + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler"}, &conf3) + re.Equal(expected3, conf3) + + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler", "set", "2", "1,2,3"}, nil) + re.Contains(echo, "Success!") + expected3["store-leader-id"] = float64(2) + testutil.Eventually(re, func() bool { + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler"}, &conf3) + return reflect.DeepEqual(expected3, conf3) + }) + + // test remove and add scheduler + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-region-scheduler"}, nil) + re.Contains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil) + re.Contains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil) + re.NotContains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-region-scheduler"}, nil) + re.Contains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}, nil) + re.Contains(echo, "Success! 
The scheduler is created.") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "2"}, nil) + re.Contains(echo, "Success! The scheduler has been applied to the store.") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}, nil) + re.Contains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-2"}, nil) + re.Contains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}, nil) + re.Contains(echo, "404") + testutil.Eventually(re, func() bool { // wait for removed scheduler to be synced to scheduling server. + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "evict-leader-scheduler"}, nil) + return strings.Contains(echo, "[404] scheduler not found") + }) + + // test hot region config + expected1 := map[string]interface{}{ + "min-hot-byte-rate": float64(100), + "min-hot-key-rate": float64(10), + "min-hot-query-rate": float64(10), + "max-zombie-rounds": float64(3), + "max-peer-number": float64(1000), + "byte-rate-rank-step-ratio": 0.05, + "key-rate-rank-step-ratio": 0.05, + "query-rate-rank-step-ratio": 0.05, + "count-rank-step-ratio": 0.01, + "great-dec-ratio": 0.95, + "minor-dec-ratio": 0.99, + "src-tolerance-ratio": 1.05, + "dst-tolerance-ratio": 1.05, + "read-priorities": []interface{}{"byte", "key"}, + "write-leader-priorities": []interface{}{"key", "byte"}, + "write-peer-priorities": []interface{}{"byte", "key"}, + "strict-picking-store": "true", + "enable-for-tiflash": "true", + "rank-formula-version": "v2", + "split-thresholds": 0.2, + } + checkHotSchedulerConfig := func(expect map[string]interface{}) { + testutil.Eventually(re, func() bool { + var conf1 map[string]interface{} + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + return reflect.DeepEqual(expect, conf1) + }) + } + + var conf map[string]interface{} + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "list"}, &conf) + re.Equal(expected1, conf) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "show"}, &conf) + re.Equal(expected1, conf) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "src-tolerance-ratio", "1.02"}, nil) + re.Contains(echo, "Success!") + expected1["src-tolerance-ratio"] = 1.02 + checkHotSchedulerConfig(expected1) + + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,key"}, nil) + re.Contains(echo, "Success!") + expected1["read-priorities"] = []interface{}{"byte", "key"} + checkHotSchedulerConfig(expected1) + + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key"}, nil) + re.Contains(echo, "Failed!") + checkHotSchedulerConfig(expected1) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,byte"}, nil) + re.Contains(echo, "Success!") + expected1["read-priorities"] = []interface{}{"key", "byte"} + checkHotSchedulerConfig(expected1) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "foo,bar"}, nil) + re.Contains(echo, "Failed!") + 
checkHotSchedulerConfig(expected1) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", ""}, nil) + re.Contains(echo, "Failed!") + checkHotSchedulerConfig(expected1) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,key"}, nil) + re.Contains(echo, "Failed!") + checkHotSchedulerConfig(expected1) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,byte"}, nil) + re.Contains(echo, "Failed!") + checkHotSchedulerConfig(expected1) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,key,byte"}, nil) + re.Contains(echo, "Failed!") + checkHotSchedulerConfig(expected1) + + // write-priorities is divided into write-leader-priorities and write-peer-priorities + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-priorities", "key,byte"}, nil) + re.Contains(echo, "Failed!") + re.Contains(echo, "Config item is not found.") + checkHotSchedulerConfig(expected1) + + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v0"}, nil) + re.Contains(echo, "Failed!") + checkHotSchedulerConfig(expected1) + expected1["rank-formula-version"] = "v2" + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v2"}, nil) + re.Contains(echo, "Success!") + checkHotSchedulerConfig(expected1) + expected1["rank-formula-version"] = "v1" + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v1"}, nil) + re.Contains(echo, "Success!") + checkHotSchedulerConfig(expected1) + + expected1["forbid-rw-type"] = "read" + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "forbid-rw-type", "read"}, nil) + re.Contains(echo, "Success!") + checkHotSchedulerConfig(expected1) + + // test compatibility + re.Equal("2.0.0", leaderServer.GetClusterVersion().String()) + for _, store := range stores { + version := versioninfo.HotScheduleWithQuery + store.Version = versioninfo.MinSupportedVersion(version).String() + store.LastHeartbeat = time.Now().UnixNano() + pdTests.MustPutStore(re, cluster, store) + } + re.Equal("5.2.0", leaderServer.GetClusterVersion().String()) + // After upgrading, we should not use query. 
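+	// The existing byte/key priorities are kept as-is rather than being switched to query automatically.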
+ checkHotSchedulerConfig(expected1) + // cannot set qps as write-peer-priorities + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-peer-priorities", "query,byte"}, nil) + re.Contains(echo, "query is not allowed to be set in priorities for write-peer-priorities") + checkHotSchedulerConfig(expected1) + + // test remove and add + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-hot-region-scheduler"}, nil) + re.Contains(echo, "Success") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-hot-region-scheduler"}, nil) + re.Contains(echo, "Success") + + // test balance leader config + conf = make(map[string]interface{}) + conf1 := make(map[string]interface{}) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler", "show"}, &conf) + re.Equal(4., conf["batch"]) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler", "set", "batch", "3"}, nil) + re.Contains(echo, "Success!") + testutil.Eventually(re, func() bool { + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler"}, &conf1) + return conf1["batch"] == 3. + }) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-leader-scheduler"}, nil) + re.NotContains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-leader-scheduler"}, nil) + re.Contains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-leader-scheduler"}, nil) + re.Contains(echo, "404") + re.Contains(echo, "PD:scheduler:ErrSchedulerNotFound]scheduler not found") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler"}, nil) + re.Contains(echo, "404") + re.Contains(echo, "scheduler not found") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-leader-scheduler"}, nil) + re.Contains(echo, "Success!") + + // test evict-slow-store && evict-slow-trend schedulers config + evictSlownessSchedulers := []string{"evict-slow-store-scheduler", "evict-slow-trend-scheduler"} + for _, schedulerName := range evictSlownessSchedulers { + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", schedulerName}, nil) + if strings.Contains(echo, "Success!") { + re.Contains(echo, "Success!") + } else { + re.Contains(echo, "scheduler existed") + } + testutil.Eventually(re, func() bool { + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) + return strings.Contains(echo, schedulerName) + }) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName, "set", "recovery-duration", "100"}, nil) + re.Contains(echo, "Success! Config updated.") + conf = make(map[string]interface{}) + testutil.Eventually(re, func() bool { + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName, "show"}, &conf) + return conf["recovery-duration"] == 100. 
+ }) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", schedulerName}, nil) + re.Contains(echo, "Success!") + testutil.Eventually(re, func() bool { + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) + return !strings.Contains(echo, schedulerName) + }) + } + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-slow-store-scheduler"}, nil) + re.Contains(echo, "Success!") + + // test shuffle hot region scheduler + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "shuffle-hot-region-scheduler"}, nil) + re.Contains(echo, "Success!") + testutil.Eventually(re, func() bool { + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) + return strings.Contains(echo, "shuffle-hot-region-scheduler") + }) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-hot-region-scheduler", "set", "limit", "127"}, nil) + re.Contains(echo, "Success!") + conf = make(map[string]interface{}) + testutil.Eventually(re, func() bool { + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-hot-region-scheduler", "show"}, &conf) + return conf["limit"] == 127. + }) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "shuffle-hot-region-scheduler"}, nil) + re.Contains(echo, "Success!") + testutil.Eventually(re, func() bool { + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) + return !strings.Contains(echo, "shuffle-hot-region-scheduler") + }) + + // test show scheduler with paused and disabled status. + checkSchedulerWithStatusCommand := func(status string, expected []string) { + testutil.Eventually(re, func() bool { + var schedulers []string + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show", "--status", status}, &schedulers) + return reflect.DeepEqual(expected, schedulers) + }) + } + + mustUsage([]string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler"}) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil) + re.Contains(echo, "Success!") + checkSchedulerWithStatusCommand("paused", []string{ + "balance-leader-scheduler", + }) + result := make(map[string]interface{}) + testutil.Eventually(re, func() bool { + mightExec(re, cmd, []string{"-u", pdAddr, "scheduler", "describe", "balance-leader-scheduler"}, &result) + return len(result) != 0 && result["status"] == "paused" && result["summary"] == "" + }, testutil.WithWaitFor(30*time.Second)) + + mustUsage([]string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler", "60"}) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil) + re.Contains(echo, "Success!") + checkSchedulerWithStatusCommand("paused", []string{}) + + // set label scheduler to disabled manually. 
+ echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "label-scheduler"}, nil) + re.Contains(echo, "Success!") + cfg := leaderServer.GetServer().GetScheduleConfig() + origin := cfg.Schedulers + cfg.Schedulers = sc.SchedulerConfigs{{Type: "label", Disable: true}} + err := leaderServer.GetServer().SetScheduleConfig(*cfg) + re.NoError(err) + checkSchedulerWithStatusCommand("disabled", []string{"label-scheduler"}) + // reset Schedulers in ScheduleConfig + cfg.Schedulers = origin + err = leaderServer.GetServer().SetScheduleConfig(*cfg) + re.NoError(err) + checkSchedulerWithStatusCommand("disabled", []string{}) +} + +func (suite *schedulerTestSuite) TestSchedulerDiagnostic() { + suite.env.RunTestInTwoModes(suite.checkSchedulerDiagnostic) +} + +func (suite *schedulerTestSuite) checkSchedulerDiagnostic(cluster *pdTests.TestCluster) { + re := suite.Require() + pdAddr := cluster.GetConfig().GetClientURL() + cmd := ctl.GetRootCmd() + + checkSchedulerDescribeCommand := func(schedulerName, expectedStatus, expectedSummary string) { + result := make(map[string]interface{}) + testutil.Eventually(re, func() bool { + mightExec(re, cmd, []string{"-u", pdAddr, "scheduler", "describe", schedulerName}, &result) + return len(result) != 0 && expectedStatus == result["status"] && expectedSummary == result["summary"] + }, testutil.WithTickInterval(50*time.Millisecond)) + } + + stores := []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 2, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 3, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 4, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + } + for _, store := range stores { + pdTests.MustPutStore(re, cluster, store) + } + + // note: because pdqsort is an unstable sort algorithm, set ApproximateSize for this region. + pdTests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetApproximateSize(10)) + + echo := mustExec(re, cmd, []string{"-u", pdAddr, "config", "set", "enable-diagnostic", "true"}, nil) + re.Contains(echo, "Success!") + checkSchedulerDescribeCommand("balance-region-scheduler", "pending", "1 store(s) RegionNotMatchRule; ") + + // scheduler delete command + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil) + re.Contains(echo, "Success!") + checkSchedulerDescribeCommand("balance-region-scheduler", "disabled", "") + + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil) + re.Contains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil) + re.Contains(echo, "Success!") + checkSchedulerDescribeCommand("balance-leader-scheduler", "normal", "") +} + +func mustExec(re *require.Assertions, cmd *cobra.Command, args []string, v interface{}) string { + output, err := tests.ExecuteCommand(cmd, args...) + re.NoError(err) + if v == nil { + return string(output) + } + re.NoError(json.Unmarshal(output, v), string(output)) + return "" +} + +func mightExec(re *require.Assertions, cmd *cobra.Command, args []string, v interface{}) { + output, err := tests.ExecuteCommand(cmd, args...) 
+ re.NoError(err) + if v == nil { + return + } + json.Unmarshal(output, v) +} From c726ae756158c92d49b66571a77275af681fa78f Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Wed, 10 Jan 2024 10:37:59 +0800 Subject: [PATCH 2/2] fix conflict Signed-off-by: Cabinfever_B --- pkg/schedule/schedulers/evict_slow_store.go | 343 --------- pkg/schedule/schedulers/evict_slow_trend.go | 667 ---------------- server/api/router.go | 5 - server/api/server_test.go | 12 +- .../pd-ctl/tests/scheduler/scheduler_test.go | 716 ------------------ 5 files changed, 3 insertions(+), 1740 deletions(-) delete mode 100644 pkg/schedule/schedulers/evict_slow_store.go delete mode 100644 pkg/schedule/schedulers/evict_slow_trend.go delete mode 100644 tools/pd-ctl/tests/scheduler/scheduler_test.go diff --git a/pkg/schedule/schedulers/evict_slow_store.go b/pkg/schedule/schedulers/evict_slow_store.go deleted file mode 100644 index 79715a6fd44..00000000000 --- a/pkg/schedule/schedulers/evict_slow_store.go +++ /dev/null @@ -1,343 +0,0 @@ -// Copyright 2021 TiKV Project Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package schedulers - -import ( - "net/http" - "time" - - "github.com/gorilla/mux" - "github.com/pingcap/errors" - "github.com/pingcap/failpoint" - "github.com/pingcap/log" - "github.com/tikv/pd/pkg/core" - sche "github.com/tikv/pd/pkg/schedule/core" - "github.com/tikv/pd/pkg/schedule/operator" - "github.com/tikv/pd/pkg/schedule/plan" - "github.com/tikv/pd/pkg/storage/endpoint" - "github.com/tikv/pd/pkg/utils/apiutil" - "github.com/tikv/pd/pkg/utils/syncutil" - "github.com/unrolled/render" - "go.uber.org/zap" -) - -const ( - // EvictSlowStoreName is evict leader scheduler name. - EvictSlowStoreName = "evict-slow-store-scheduler" - // EvictSlowStoreType is evict leader scheduler type. - EvictSlowStoreType = "evict-slow-store" - - slowStoreEvictThreshold = 100 - slowStoreRecoverThreshold = 1 -) - -// WithLabelValues is a heavy operation, define variable to avoid call it every time. -var evictSlowStoreCounter = schedulerCounter.WithLabelValues(EvictSlowStoreName, "schedule") - -type evictSlowStoreSchedulerConfig struct { - syncutil.RWMutex - cluster *core.BasicCluster - storage endpoint.ConfigStorage - // Last timestamp of the chosen slow store for eviction. - lastSlowStoreCaptureTS time.Time - // Duration gap for recovering the candidate, unit: s. 
- RecoveryDurationGap uint64 `json:"recovery-duration"` - EvictedStores []uint64 `json:"evict-stores"` -} - -func initEvictSlowStoreSchedulerConfig(storage endpoint.ConfigStorage) *evictSlowStoreSchedulerConfig { - return &evictSlowStoreSchedulerConfig{ - storage: storage, - lastSlowStoreCaptureTS: time.Time{}, - RecoveryDurationGap: defaultRecoveryDurationGap, - EvictedStores: make([]uint64, 0), - } -} - -func (conf *evictSlowStoreSchedulerConfig) Clone() *evictSlowStoreSchedulerConfig { - conf.RLock() - defer conf.RUnlock() - return &evictSlowStoreSchedulerConfig{ - RecoveryDurationGap: conf.RecoveryDurationGap, - } -} - -func (conf *evictSlowStoreSchedulerConfig) persistLocked() error { - name := EvictSlowStoreName - data, err := EncodeConfig(conf) - failpoint.Inject("persistFail", func() { - err = errors.New("fail to persist") - }) - if err != nil { - return err - } - return conf.storage.SaveSchedulerConfig(name, data) -} - -func (conf *evictSlowStoreSchedulerConfig) getStores() []uint64 { - conf.RLock() - defer conf.RUnlock() - return conf.EvictedStores -} - -func (conf *evictSlowStoreSchedulerConfig) getKeyRangesByID(id uint64) []core.KeyRange { - if conf.evictStore() != id { - return nil - } - return []core.KeyRange{core.NewKeyRange("", "")} -} - -func (conf *evictSlowStoreSchedulerConfig) evictStore() uint64 { - if len(conf.getStores()) == 0 { - return 0 - } - return conf.getStores()[0] -} - -// readyForRecovery checks whether the last cpatured candidate is ready for recovery. -func (conf *evictSlowStoreSchedulerConfig) readyForRecovery() bool { - conf.RLock() - defer conf.RUnlock() - recoveryDurationGap := conf.RecoveryDurationGap - failpoint.Inject("transientRecoveryGap", func() { - recoveryDurationGap = 0 - }) - return uint64(time.Since(conf.lastSlowStoreCaptureTS).Seconds()) >= recoveryDurationGap -} - -func (conf *evictSlowStoreSchedulerConfig) setStoreAndPersist(id uint64) error { - conf.Lock() - defer conf.Unlock() - conf.EvictedStores = []uint64{id} - conf.lastSlowStoreCaptureTS = time.Now() - return conf.persistLocked() -} - -func (conf *evictSlowStoreSchedulerConfig) clearAndPersist() (oldID uint64, err error) { - oldID = conf.evictStore() - conf.Lock() - defer conf.Unlock() - if oldID > 0 { - conf.EvictedStores = []uint64{} - conf.lastSlowStoreCaptureTS = time.Time{} - err = conf.persistLocked() - } - return -} - -type evictSlowStoreHandler struct { - rd *render.Render - config *evictSlowStoreSchedulerConfig -} - -func newEvictSlowStoreHandler(config *evictSlowStoreSchedulerConfig) http.Handler { - h := &evictSlowStoreHandler{ - config: config, - rd: render.New(render.Options{IndentJSON: true}), - } - router := mux.NewRouter() - router.HandleFunc("/config", h.UpdateConfig).Methods(http.MethodPost) - router.HandleFunc("/list", h.ListConfig).Methods(http.MethodGet) - return router -} - -func (handler *evictSlowStoreHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) { - var input map[string]interface{} - if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil { - return - } - recoveryDurationGapFloat, ok := input["recovery-duration"].(float64) - if !ok { - handler.rd.JSON(w, http.StatusInternalServerError, errors.New("invalid argument for 'recovery-duration'").Error()) - return - } - handler.config.Lock() - defer handler.config.Unlock() - prevRecoveryDurationGap := handler.config.RecoveryDurationGap - recoveryDurationGap := uint64(recoveryDurationGapFloat) - handler.config.RecoveryDurationGap = recoveryDurationGap - if err := 
handler.config.persistLocked(); err != nil { - handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) - handler.config.RecoveryDurationGap = prevRecoveryDurationGap - return - } - log.Info("evict-slow-store-scheduler update 'recovery-duration' - unit: s", zap.Uint64("prev", prevRecoveryDurationGap), zap.Uint64("cur", recoveryDurationGap)) - handler.rd.JSON(w, http.StatusOK, "Config updated.") -} - -func (handler *evictSlowStoreHandler) ListConfig(w http.ResponseWriter, r *http.Request) { - conf := handler.config.Clone() - handler.rd.JSON(w, http.StatusOK, conf) -} - -type evictSlowStoreScheduler struct { - *BaseScheduler - conf *evictSlowStoreSchedulerConfig - handler http.Handler -} - -func (s *evictSlowStoreScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - s.handler.ServeHTTP(w, r) -} - -func (s *evictSlowStoreScheduler) GetName() string { - return EvictSlowStoreName -} - -func (s *evictSlowStoreScheduler) GetType() string { - return EvictSlowStoreType -} - -func (s *evictSlowStoreScheduler) EncodeConfig() ([]byte, error) { - return EncodeConfig(s.conf) -} - -func (s *evictSlowStoreScheduler) ReloadConfig() error { - s.conf.Lock() - defer s.conf.Unlock() - cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) - if err != nil { - return err - } - if len(cfgData) == 0 { - return nil - } - newCfg := &evictSlowStoreSchedulerConfig{} - if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { - return err - } - old := make(map[uint64]struct{}) - for _, id := range s.conf.EvictedStores { - old[id] = struct{}{} - } - new := make(map[uint64]struct{}) - for _, id := range newCfg.EvictedStores { - new[id] = struct{}{} - } - pauseAndResumeLeaderTransfer(s.conf.cluster, old, new) - s.conf.RecoveryDurationGap = newCfg.RecoveryDurationGap - s.conf.EvictedStores = newCfg.EvictedStores - return nil -} - -func (s *evictSlowStoreScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { - evictStore := s.conf.evictStore() - if evictStore != 0 { - return cluster.SlowStoreEvicted(evictStore) - } - return nil -} - -func (s *evictSlowStoreScheduler) CleanConfig(cluster sche.SchedulerCluster) { - s.cleanupEvictLeader(cluster) -} - -func (s *evictSlowStoreScheduler) prepareEvictLeader(cluster sche.SchedulerCluster, storeID uint64) error { - err := s.conf.setStoreAndPersist(storeID) - if err != nil { - log.Info("evict-slow-store-scheduler persist config failed", zap.Uint64("store-id", storeID)) - return err - } - - return cluster.SlowStoreEvicted(storeID) -} - -func (s *evictSlowStoreScheduler) cleanupEvictLeader(cluster sche.SchedulerCluster) { - evictSlowStore, err := s.conf.clearAndPersist() - if err != nil { - log.Info("evict-slow-store-scheduler persist config failed", zap.Uint64("store-id", evictSlowStore)) - } - if evictSlowStore == 0 { - return - } - cluster.SlowStoreRecovered(evictSlowStore) -} - -func (s *evictSlowStoreScheduler) schedulerEvictLeader(cluster sche.SchedulerCluster) []*operator.Operator { - return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize) -} - -func (s *evictSlowStoreScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { - if s.conf.evictStore() != 0 { - allowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetSchedulerConfig().GetLeaderScheduleLimit() - if !allowed { - operator.OperatorLimitCounter.WithLabelValues(s.GetType(), operator.OpLeader.String()).Inc() - } - return allowed - } - return true -} - -func (s *evictSlowStoreScheduler) Schedule(cluster 
sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { - evictSlowStoreCounter.Inc() - var ops []*operator.Operator - - if s.conf.evictStore() != 0 { - store := cluster.GetStore(s.conf.evictStore()) - if store == nil || store.IsRemoved() { - // Previous slow store had been removed, remove the scheduler and check - // slow node next time. - log.Info("slow store has been removed", - zap.Uint64("store-id", store.GetID())) - } else if store.GetSlowScore() <= slowStoreRecoverThreshold && s.conf.readyForRecovery() { - log.Info("slow store has been recovered", - zap.Uint64("store-id", store.GetID())) - } else { - return s.schedulerEvictLeader(cluster), nil - } - s.cleanupEvictLeader(cluster) - return ops, nil - } - - var slowStore *core.StoreInfo - - for _, store := range cluster.GetStores() { - if store.IsRemoved() { - continue - } - - if (store.IsPreparing() || store.IsServing()) && store.IsSlow() { - // Do nothing if there is more than one slow store. - if slowStore != nil { - return ops, nil - } - slowStore = store - } - } - - if slowStore == nil || slowStore.GetSlowScore() < slowStoreEvictThreshold { - return ops, nil - } - - // If there is only one slow store, evict leaders from that store. - log.Info("detected slow store, start to evict leaders", - zap.Uint64("store-id", slowStore.GetID())) - err := s.prepareEvictLeader(cluster, slowStore.GetID()) - if err != nil { - log.Info("prepare for evicting leader failed", zap.Error(err), zap.Uint64("store-id", slowStore.GetID())) - return ops, nil - } - return s.schedulerEvictLeader(cluster), nil -} - -// newEvictSlowStoreScheduler creates a scheduler that detects and evicts slow stores. -func newEvictSlowStoreScheduler(opController *operator.Controller, conf *evictSlowStoreSchedulerConfig) Scheduler { - handler := newEvictSlowStoreHandler(conf) - return &evictSlowStoreScheduler{ - BaseScheduler: NewBaseScheduler(opController), - conf: conf, - handler: handler, - } -} diff --git a/pkg/schedule/schedulers/evict_slow_trend.go b/pkg/schedule/schedulers/evict_slow_trend.go deleted file mode 100644 index 20c53219765..00000000000 --- a/pkg/schedule/schedulers/evict_slow_trend.go +++ /dev/null @@ -1,667 +0,0 @@ -// Copyright 2023 TiKV Project Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package schedulers - -import ( - "net/http" - "strconv" - "time" - - "github.com/gorilla/mux" - "github.com/pingcap/errors" - "github.com/pingcap/failpoint" - "github.com/pingcap/log" - "github.com/tikv/pd/pkg/core" - sche "github.com/tikv/pd/pkg/schedule/core" - "github.com/tikv/pd/pkg/schedule/operator" - "github.com/tikv/pd/pkg/schedule/plan" - "github.com/tikv/pd/pkg/storage/endpoint" - "github.com/tikv/pd/pkg/utils/apiutil" - "github.com/tikv/pd/pkg/utils/syncutil" - "github.com/unrolled/render" - "go.uber.org/zap" -) - -const ( - // EvictSlowTrendName is evict leader by slow trend scheduler name. - EvictSlowTrendName = "evict-slow-trend-scheduler" - // EvictSlowTrendType is evict leader by slow trend scheduler type. 
- EvictSlowTrendType = "evict-slow-trend" -) - -const ( - alterEpsilon = 1e-9 - minReCheckDurationGap = 120 // default gap for re-check the slow node, unit: s - defaultRecoveryDurationGap = 600 // default gap for recovery, unit: s. -) - -type slowCandidate struct { - storeID uint64 - captureTS time.Time - recoverTS time.Time -} - -type evictSlowTrendSchedulerConfig struct { - syncutil.RWMutex - cluster *core.BasicCluster - storage endpoint.ConfigStorage - // Candidate for eviction in current tick. - evictCandidate slowCandidate - // Last chosen candidate for eviction. - lastEvictCandidate slowCandidate - // Duration gap for recovering the candidate, unit: s. - RecoveryDurationGap uint64 `json:"recovery-duration"` - // Only evict one store for now - EvictedStores []uint64 `json:"evict-by-trend-stores"` -} - -func initEvictSlowTrendSchedulerConfig(storage endpoint.ConfigStorage) *evictSlowTrendSchedulerConfig { - return &evictSlowTrendSchedulerConfig{ - storage: storage, - evictCandidate: slowCandidate{}, - lastEvictCandidate: slowCandidate{}, - RecoveryDurationGap: defaultRecoveryDurationGap, - EvictedStores: make([]uint64, 0), - } -} - -func (conf *evictSlowTrendSchedulerConfig) Clone() *evictSlowTrendSchedulerConfig { - conf.RLock() - defer conf.RUnlock() - return &evictSlowTrendSchedulerConfig{ - RecoveryDurationGap: conf.RecoveryDurationGap, - } -} - -func (conf *evictSlowTrendSchedulerConfig) persistLocked() error { - name := EvictSlowTrendName - data, err := EncodeConfig(conf) - failpoint.Inject("persistFail", func() { - err = errors.New("fail to persist") - }) - if err != nil { - return err - } - return conf.storage.SaveSchedulerConfig(name, data) -} - -func (conf *evictSlowTrendSchedulerConfig) getStores() []uint64 { - conf.RLock() - defer conf.RUnlock() - return conf.EvictedStores -} - -func (conf *evictSlowTrendSchedulerConfig) getKeyRangesByID(id uint64) []core.KeyRange { - if conf.evictedStore() != id { - return nil - } - return []core.KeyRange{core.NewKeyRange("", "")} -} - -func (conf *evictSlowTrendSchedulerConfig) hasEvictedStores() bool { - conf.RLock() - defer conf.RUnlock() - return len(conf.EvictedStores) > 0 -} - -func (conf *evictSlowTrendSchedulerConfig) evictedStore() uint64 { - if !conf.hasEvictedStores() { - return 0 - } - conf.RLock() - defer conf.RUnlock() - // If a candidate passes all checks and proved to be slow, it will be - // recorded in `conf.EvictStores`, and `conf.lastEvictCandidate` will record - // the captured timestamp of this store. - return conf.EvictedStores[0] -} - -func (conf *evictSlowTrendSchedulerConfig) candidate() uint64 { - conf.RLock() - defer conf.RUnlock() - return conf.evictCandidate.storeID -} - -func (conf *evictSlowTrendSchedulerConfig) captureTS() time.Time { - conf.RLock() - defer conf.RUnlock() - return conf.evictCandidate.captureTS -} - -func (conf *evictSlowTrendSchedulerConfig) candidateCapturedSecs() uint64 { - conf.RLock() - defer conf.RUnlock() - return DurationSinceAsSecs(conf.evictCandidate.captureTS) -} - -func (conf *evictSlowTrendSchedulerConfig) lastCapturedCandidate() *slowCandidate { - conf.RLock() - defer conf.RUnlock() - return &conf.lastEvictCandidate -} - -func (conf *evictSlowTrendSchedulerConfig) lastCandidateCapturedSecs() uint64 { - return DurationSinceAsSecs(conf.lastEvictCandidate.captureTS) -} - -// readyForRecovery checks whether the last cpatured candidate is ready for recovery. 
-func (conf *evictSlowTrendSchedulerConfig) readyForRecovery() bool { - conf.RLock() - defer conf.RUnlock() - recoveryDurationGap := conf.RecoveryDurationGap - failpoint.Inject("transientRecoveryGap", func() { - recoveryDurationGap = 0 - }) - return conf.lastCandidateCapturedSecs() >= recoveryDurationGap -} - -func (conf *evictSlowTrendSchedulerConfig) captureCandidate(id uint64) { - conf.Lock() - defer conf.Unlock() - conf.evictCandidate = slowCandidate{ - storeID: id, - captureTS: time.Now(), - recoverTS: time.Now(), - } - if conf.lastEvictCandidate == (slowCandidate{}) { - conf.lastEvictCandidate = conf.evictCandidate - } -} - -func (conf *evictSlowTrendSchedulerConfig) popCandidate(updLast bool) uint64 { - conf.Lock() - defer conf.Unlock() - id := conf.evictCandidate.storeID - if updLast { - conf.lastEvictCandidate = conf.evictCandidate - } - conf.evictCandidate = slowCandidate{} - return id -} - -func (conf *evictSlowTrendSchedulerConfig) markCandidateRecovered() { - conf.Lock() - defer conf.Unlock() - if conf.lastEvictCandidate != (slowCandidate{}) { - conf.lastEvictCandidate.recoverTS = time.Now() - } -} - -func (conf *evictSlowTrendSchedulerConfig) setStoreAndPersist(id uint64) error { - conf.Lock() - defer conf.Unlock() - conf.EvictedStores = []uint64{id} - return conf.persistLocked() -} - -func (conf *evictSlowTrendSchedulerConfig) clearAndPersist(cluster sche.SchedulerCluster) (oldID uint64, err error) { - oldID = conf.evictedStore() - if oldID == 0 { - return - } - address := "?" - store := cluster.GetStore(oldID) - if store != nil { - address = store.GetAddress() - } - storeSlowTrendEvictedStatusGauge.WithLabelValues(address, strconv.FormatUint(oldID, 10)).Set(0) - conf.Lock() - defer conf.Unlock() - conf.EvictedStores = []uint64{} - return oldID, conf.persistLocked() -} - -type evictSlowTrendHandler struct { - rd *render.Render - config *evictSlowTrendSchedulerConfig -} - -func newEvictSlowTrendHandler(config *evictSlowTrendSchedulerConfig) http.Handler { - h := &evictSlowTrendHandler{ - config: config, - rd: render.New(render.Options{IndentJSON: true}), - } - router := mux.NewRouter() - router.HandleFunc("/config", h.UpdateConfig).Methods(http.MethodPost) - router.HandleFunc("/list", h.ListConfig).Methods(http.MethodGet) - return router -} - -func (handler *evictSlowTrendHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) { - var input map[string]interface{} - if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil { - return - } - recoveryDurationGapFloat, ok := input["recovery-duration"].(float64) - if !ok { - handler.rd.JSON(w, http.StatusInternalServerError, errors.New("invalid argument for 'recovery-duration'").Error()) - return - } - handler.config.Lock() - defer handler.config.Unlock() - prevRecoveryDurationGap := handler.config.RecoveryDurationGap - recoveryDurationGap := uint64(recoveryDurationGapFloat) - handler.config.RecoveryDurationGap = recoveryDurationGap - if err := handler.config.persistLocked(); err != nil { - handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) - handler.config.RecoveryDurationGap = prevRecoveryDurationGap - return - } - log.Info("evict-slow-trend-scheduler update 'recovery-duration' - unit: s", zap.Uint64("prev", prevRecoveryDurationGap), zap.Uint64("cur", recoveryDurationGap)) - handler.rd.JSON(w, http.StatusOK, "Config updated.") -} - -func (handler *evictSlowTrendHandler) ListConfig(w http.ResponseWriter, r *http.Request) { - conf := handler.config.Clone() - handler.rd.JSON(w, 
http.StatusOK, conf) -} - -type evictSlowTrendScheduler struct { - *BaseScheduler - conf *evictSlowTrendSchedulerConfig - handler http.Handler -} - -func (s *evictSlowTrendScheduler) GetNextInterval(interval time.Duration) time.Duration { - var growthType intervalGrowthType - // If it already found a slow node as candidate, the next interval should be shorter - // to make the next scheduling as soon as possible. This adjustment will decrease the - // response time, as heartbeats from other nodes will be received and updated more quickly. - if s.conf.hasEvictedStores() { - growthType = zeroGrowth - } else { - growthType = exponentialGrowth - } - return intervalGrow(s.GetMinInterval(), MaxScheduleInterval, growthType) -} - -func (s *evictSlowTrendScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - s.handler.ServeHTTP(w, r) -} - -func (s *evictSlowTrendScheduler) GetName() string { - return EvictSlowTrendName -} - -func (s *evictSlowTrendScheduler) GetType() string { - return EvictSlowTrendType -} - -func (s *evictSlowTrendScheduler) EncodeConfig() ([]byte, error) { - return EncodeConfig(s.conf) -} - -func (s *evictSlowTrendScheduler) ReloadConfig() error { - s.conf.Lock() - defer s.conf.Unlock() - cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) - if err != nil { - return err - } - if len(cfgData) == 0 { - return nil - } - newCfg := &evictSlowTrendSchedulerConfig{} - if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { - return err - } - old := make(map[uint64]struct{}) - for _, id := range s.conf.EvictedStores { - old[id] = struct{}{} - } - new := make(map[uint64]struct{}) - for _, id := range newCfg.EvictedStores { - new[id] = struct{}{} - } - pauseAndResumeLeaderTransfer(s.conf.cluster, old, new) - s.conf.RecoveryDurationGap = newCfg.RecoveryDurationGap - s.conf.EvictedStores = newCfg.EvictedStores - return nil -} - -func (s *evictSlowTrendScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { - evictedStoreID := s.conf.evictedStore() - if evictedStoreID == 0 { - return nil - } - return cluster.SlowTrendEvicted(evictedStoreID) -} - -func (s *evictSlowTrendScheduler) CleanConfig(cluster sche.SchedulerCluster) { - s.cleanupEvictLeader(cluster) -} - -func (s *evictSlowTrendScheduler) prepareEvictLeader(cluster sche.SchedulerCluster, storeID uint64) error { - err := s.conf.setStoreAndPersist(storeID) - if err != nil { - log.Info("evict-slow-trend-scheduler persist config failed", zap.Uint64("store-id", storeID)) - return err - } - return cluster.SlowTrendEvicted(storeID) -} - -func (s *evictSlowTrendScheduler) cleanupEvictLeader(cluster sche.SchedulerCluster) { - evictedStoreID, err := s.conf.clearAndPersist(cluster) - if err != nil { - log.Info("evict-slow-trend-scheduler persist config failed", zap.Uint64("store-id", evictedStoreID)) - } - if evictedStoreID != 0 { - // Assertion: evictStoreID == s.conf.LastEvictCandidate.storeID - s.conf.markCandidateRecovered() - cluster.SlowTrendRecovered(evictedStoreID) - } -} - -func (s *evictSlowTrendScheduler) scheduleEvictLeader(cluster sche.SchedulerCluster) []*operator.Operator { - store := cluster.GetStore(s.conf.evictedStore()) - if store == nil { - return nil - } - storeSlowTrendEvictedStatusGauge.WithLabelValues(store.GetAddress(), strconv.FormatUint(store.GetID(), 10)).Set(1) - return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize) -} - -func (s *evictSlowTrendScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { - if s.conf.evictedStore() == 0 
{ - return true - } - allowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetSchedulerConfig().GetLeaderScheduleLimit() - if !allowed { - operator.OperatorLimitCounter.WithLabelValues(s.GetType(), operator.OpLeader.String()).Inc() - } - return allowed -} - -func (s *evictSlowTrendScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { - schedulerCounter.WithLabelValues(s.GetName(), "schedule").Inc() - - var ops []*operator.Operator - - if s.conf.evictedStore() != 0 { - store := cluster.GetStore(s.conf.evictedStore()) - if store == nil || store.IsRemoved() { - // Previous slow store had been removed, remove the scheduler and check - // slow node next time. - log.Info("store evicted by slow trend has been removed", zap.Uint64("store-id", store.GetID())) - storeSlowTrendActionStatusGauge.WithLabelValues("evict", "stop_removed").Inc() - } else if checkStoreCanRecover(cluster, store) && s.conf.readyForRecovery() { - log.Info("store evicted by slow trend has been recovered", zap.Uint64("store-id", store.GetID())) - storeSlowTrendActionStatusGauge.WithLabelValues("evict", "stop_recovered").Inc() - } else { - storeSlowTrendActionStatusGauge.WithLabelValues("evict", "continue").Inc() - return s.scheduleEvictLeader(cluster), nil - } - s.cleanupEvictLeader(cluster) - return ops, nil - } - - candFreshCaptured := false - if s.conf.candidate() == 0 { - candidate := chooseEvictCandidate(cluster, s.conf.lastCapturedCandidate()) - if candidate != nil { - storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "captured").Inc() - s.conf.captureCandidate(candidate.GetID()) - candFreshCaptured = true - } - } else { - storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "continue").Inc() - } - - slowStoreID := s.conf.candidate() - if slowStoreID == 0 { - storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "none").Inc() - return ops, nil - } - - slowStore := cluster.GetStore(slowStoreID) - if !candFreshCaptured && checkStoreFasterThanOthers(cluster, slowStore) { - s.conf.popCandidate(false) - log.Info("slow store candidate by trend has been cancel", zap.Uint64("store-id", slowStoreID)) - storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "canceled_too_faster").Inc() - return ops, nil - } - if slowStoreRecordTS := s.conf.captureTS(); !checkStoresAreUpdated(cluster, slowStoreID, slowStoreRecordTS) { - log.Info("slow store candidate waiting for other stores to update heartbeats", zap.Uint64("store-id", slowStoreID)) - storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "wait").Inc() - return ops, nil - } - - candCapturedSecs := s.conf.candidateCapturedSecs() - log.Info("detected slow store by trend, start to evict leaders", - zap.Uint64("store-id", slowStoreID), - zap.Uint64("candidate-captured-secs", candCapturedSecs)) - storeSlowTrendMiscGauge.WithLabelValues("candidate", "captured_secs").Set(float64(candCapturedSecs)) - if err := s.prepareEvictLeader(cluster, s.conf.popCandidate(true)); err != nil { - log.Info("prepare for evicting leader by slow trend failed", zap.Error(err), zap.Uint64("store-id", slowStoreID)) - storeSlowTrendActionStatusGauge.WithLabelValues("evict", "prepare_err").Inc() - return ops, nil - } - storeSlowTrendActionStatusGauge.WithLabelValues("evict", "start").Inc() - return s.scheduleEvictLeader(cluster), nil -} - -func newEvictSlowTrendScheduler(opController *operator.Controller, conf *evictSlowTrendSchedulerConfig) Scheduler { - handler := newEvictSlowTrendHandler(conf) - return 
&evictSlowTrendScheduler{ - BaseScheduler: NewBaseScheduler(opController), - conf: conf, - handler: handler, - } -} - -func chooseEvictCandidate(cluster sche.SchedulerCluster, lastEvictCandidate *slowCandidate) (slowStore *core.StoreInfo) { - isRaftKV2 := cluster.GetStoreConfig().IsRaftKV2() - failpoint.Inject("mockRaftKV2", func() { - isRaftKV2 = true - }) - stores := cluster.GetStores() - if len(stores) < 3 { - storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "none_too_few").Inc() - return - } - - var candidates []*core.StoreInfo - var affectedStoreCount int - for _, store := range stores { - if store.IsRemoved() { - continue - } - if !(store.IsPreparing() || store.IsServing()) { - continue - } - if slowTrend := store.GetSlowTrend(); slowTrend != nil { - if slowTrend.ResultRate < -alterEpsilon { - affectedStoreCount += 1 - } - // For the cases of disk io jitters. - // Normally, if there exists jitters on disk io or network io, the slow store must have a descending - // trend on QPS and ascending trend on duration. So, the slowTrend must match the following pattern. - if slowTrend.CauseRate > alterEpsilon && slowTrend.ResultRate < -alterEpsilon { - candidates = append(candidates, store) - storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "add").Inc() - log.Info("evict-slow-trend-scheduler pre-captured candidate", - zap.Uint64("store-id", store.GetID()), - zap.Float64("cause-rate", slowTrend.CauseRate), - zap.Float64("result-rate", slowTrend.ResultRate), - zap.Float64("cause-value", slowTrend.CauseValue), - zap.Float64("result-value", slowTrend.ResultValue)) - } else if isRaftKV2 && slowTrend.CauseRate > alterEpsilon { - // Meanwhile, if the store was previously experiencing slowness in the `Duration` dimension, it should - // re-check whether this node is still encountering network I/O-related jitters. And If this node matches - // the last identified candidate, it indicates that the node is still being affected by delays in network I/O, - // and consequently, it should be re-designated as slow once more. - // Prerequisite: `raft-kv2` engine has the ability to percept the slow trend on network io jitters. - // TODO: maybe make it compatible to `raft-kv` later. 
- if lastEvictCandidate != nil && lastEvictCandidate.storeID == store.GetID() && DurationSinceAsSecs(lastEvictCandidate.recoverTS) <= minReCheckDurationGap { - candidates = append(candidates, store) - storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "add").Inc() - log.Info("evict-slow-trend-scheduler pre-captured candidate in raft-kv2 cluster", - zap.Uint64("store-id", store.GetID()), - zap.Float64("cause-rate", slowTrend.CauseRate), - zap.Float64("result-rate", slowTrend.ResultRate), - zap.Float64("cause-value", slowTrend.CauseValue), - zap.Float64("result-value", slowTrend.ResultValue)) - } - } - } - } - if len(candidates) == 0 { - storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "none_no_fit").Inc() - return - } - // TODO: Calculate to judge if one store is way slower than the others - if len(candidates) > 1 { - storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "none_too_many").Inc() - return - } - - store := candidates[0] - - affectedStoreThreshold := int(float64(len(stores)) * cluster.GetSchedulerConfig().GetSlowStoreEvictingAffectedStoreRatioThreshold()) - if affectedStoreCount < affectedStoreThreshold { - log.Info("evict-slow-trend-scheduler failed to confirm candidate: it only affect a few stores", zap.Uint64("store-id", store.GetID())) - storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "none_affect_a_few").Inc() - return - } - - if !checkStoreSlowerThanOthers(cluster, store) { - log.Info("evict-slow-trend-scheduler failed to confirm candidate: it's not slower than others", zap.Uint64("store-id", store.GetID())) - storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "none_not_slower").Inc() - return - } - - storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "add").Inc() - log.Info("evict-slow-trend-scheduler captured candidate", zap.Uint64("store-id", store.GetID())) - return store -} - -func checkStoresAreUpdated(cluster sche.SchedulerCluster, slowStoreID uint64, slowStoreRecordTS time.Time) bool { - stores := cluster.GetStores() - if len(stores) <= 1 { - return false - } - expected := (len(stores) + 1) / 2 - updatedStores := 0 - for _, store := range stores { - if store.IsRemoved() { - updatedStores += 1 - continue - } - if !(store.IsPreparing() || store.IsServing()) { - updatedStores += 1 - continue - } - if store.GetID() == slowStoreID { - updatedStores += 1 - continue - } - if slowStoreRecordTS.Compare(store.GetLastHeartbeatTS()) <= 0 { - updatedStores += 1 - } - } - storeSlowTrendMiscGauge.WithLabelValues("store", "check_updated_count").Set(float64(updatedStores)) - storeSlowTrendMiscGauge.WithLabelValues("store", "check_updated_expected").Set(float64(expected)) - return updatedStores >= expected -} - -func checkStoreSlowerThanOthers(cluster sche.SchedulerCluster, target *core.StoreInfo) bool { - stores := cluster.GetStores() - expected := (len(stores)*2 + 1) / 3 - targetSlowTrend := target.GetSlowTrend() - if targetSlowTrend == nil { - storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "check_slower_no_data").Inc() - return false - } - slowerThanStoresNum := 0 - for _, store := range stores { - if store.IsRemoved() { - continue - } - if !(store.IsPreparing() || store.IsServing()) { - continue - } - if store.GetID() == target.GetID() { - continue - } - slowTrend := store.GetSlowTrend() - // Use `SlowTrend.ResultValue` at first, but not good, `CauseValue` is better - // Greater `CuaseValue` means slower - if slowTrend != nil && (targetSlowTrend.CauseValue-slowTrend.CauseValue) > alterEpsilon && 
slowTrend.CauseValue > alterEpsilon { - slowerThanStoresNum += 1 - } - } - storeSlowTrendMiscGauge.WithLabelValues("store", "check_slower_count").Set(float64(slowerThanStoresNum)) - storeSlowTrendMiscGauge.WithLabelValues("store", "check_slower_expected").Set(float64(expected)) - return slowerThanStoresNum >= expected -} - -func checkStoreCanRecover(cluster sche.SchedulerCluster, target *core.StoreInfo) bool { - /* - // - // This might not be necessary, - // and it also have tiny chances to cause `stuck in evicted` - // status when this store restarted, - // the `become fast` might be ignore on tikv side - // because of the detecting windows are not fully filled yet. - // Hence, we disabled this event capturing by now but keep the code here for further checking. - // - - // Wait for the evicted store's `become fast` event - slowTrend := target.GetSlowTrend() - if slowTrend == nil || slowTrend.CauseRate >= 0 && slowTrend.ResultRate <= 0 { - storeSlowTrendActionStatusGauge.WithLabelValues("recover.reject:no-event").Inc() - return false - } else { - storeSlowTrendActionStatusGauge.WithLabelValues("recover.judging:got-event").Inc() - } - */ - return checkStoreFasterThanOthers(cluster, target) -} - -func checkStoreFasterThanOthers(cluster sche.SchedulerCluster, target *core.StoreInfo) bool { - stores := cluster.GetStores() - expected := (len(stores) + 1) / 2 - targetSlowTrend := target.GetSlowTrend() - if targetSlowTrend == nil { - storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "check_faster_no_data").Inc() - return false - } - fasterThanStores := 0 - for _, store := range stores { - if store.IsRemoved() { - continue - } - if !(store.IsPreparing() || store.IsServing()) { - continue - } - if store.GetID() == target.GetID() { - continue - } - slowTrend := store.GetSlowTrend() - // Greater `CauseValue` means slower - if slowTrend != nil && targetSlowTrend.CauseValue <= slowTrend.CauseValue*1.1 && - slowTrend.CauseValue > alterEpsilon && targetSlowTrend.CauseValue > alterEpsilon { - fasterThanStores += 1 - } - } - storeSlowTrendMiscGauge.WithLabelValues("store", "check_faster_count").Set(float64(fasterThanStores)) - storeSlowTrendMiscGauge.WithLabelValues("store", "check_faster_expected").Set(float64(expected)) - return fasterThanStores >= expected -} - -// DurationSinceAsSecs returns the duration gap since the given startTS, unit: s. -func DurationSinceAsSecs(startTS time.Time) uint64 { - return uint64(time.Since(startTS).Seconds()) -} diff --git a/server/api/router.go b/server/api/router.go index 9eae8ff3d77..cf604c47738 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -362,14 +362,9 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { unsafeOperationHandler.GetFailedStoresRemovalStatus, setMethods(http.MethodGet), setAuditBackend(prometheus)) // API to set or unset failpoints -<<<<<<< HEAD failpoint.Inject("enableFailpointAPI", func() { // this function will be named to "func2". It may be used in test - registerPrefix(apiRouter, "/fail", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { -======= - if enableFailPointAPI { registerPrefix(apiRouter, "/fail", "FailPoint", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ->>>>>>> 8b8c78a78 (scheduler: add aduit log for scheduler config API and add resp msg for evict-leader (#7674)) // The HTTP handler of failpoint requires the full path to be the failpoint path. 
r.URL.Path = strings.TrimPrefix(r.URL.Path, prefix+apiPrefix+"/fail") new(failpoint.HttpHandler).ServeHTTP(w, r) diff --git a/server/api/server_test.go b/server/api/server_test.go index 6471e344414..759bfce4237 100644 --- a/server/api/server_test.go +++ b/server/api/server_test.go @@ -172,18 +172,12 @@ func (suite *serviceTestSuite) TestServiceLabels() { suite.Equal("Profile", serviceLabel) accessPaths = suite.svr.GetServiceLabels("GetSchedulerConfig") -<<<<<<< HEAD suite.Len(accessPaths, 1) suite.Equal("/pd/api/v1/scheduler-config", accessPaths[0].Path) - suite.Equal("", accessPaths[0].Method) -======= - re.Len(accessPaths, 1) - re.Equal("/pd/api/v1/scheduler-config", accessPaths[0].Path) - re.Equal("GET", accessPaths[0].Method) + suite.Equal("GET", accessPaths[0].Method) accessPaths = suite.svr.GetServiceLabels("HandleSchedulerConfig") - re.Len(accessPaths, 4) - re.Equal("/pd/api/v1/scheduler-config", accessPaths[0].Path) ->>>>>>> 8b8c78a78 (scheduler: add aduit log for scheduler config API and add resp msg for evict-leader (#7674)) + suite.Len(accessPaths, 4) + suite.Equal("/pd/api/v1/scheduler-config", accessPaths[0].Path) accessPaths = suite.svr.GetServiceLabels("ResignLeader") suite.Len(accessPaths, 1) diff --git a/tools/pd-ctl/tests/scheduler/scheduler_test.go b/tools/pd-ctl/tests/scheduler/scheduler_test.go deleted file mode 100644 index ab96d430523..00000000000 --- a/tools/pd-ctl/tests/scheduler/scheduler_test.go +++ /dev/null @@ -1,716 +0,0 @@ -// Copyright 2019 TiKV Project Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
+
+package scheduler_test
+
+import (
+	"encoding/json"
+	"fmt"
+	"reflect"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/pingcap/failpoint"
+	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/spf13/cobra"
+	"github.com/stretchr/testify/require"
+	"github.com/stretchr/testify/suite"
+	"github.com/tikv/pd/pkg/core"
+	sc "github.com/tikv/pd/pkg/schedule/config"
+	"github.com/tikv/pd/pkg/slice"
+	"github.com/tikv/pd/pkg/utils/testutil"
+	"github.com/tikv/pd/pkg/versioninfo"
+	pdTests "github.com/tikv/pd/tests"
+	ctl "github.com/tikv/pd/tools/pd-ctl/pdctl"
+	"github.com/tikv/pd/tools/pd-ctl/tests"
+)
+
+type schedulerTestSuite struct {
+	suite.Suite
+	env               *pdTests.SchedulingTestEnvironment
+	defaultSchedulers []string
+}
+
+func TestSchedulerTestSuite(t *testing.T) {
+	suite.Run(t, new(schedulerTestSuite))
+}
+
+func (suite *schedulerTestSuite) SetupSuite() {
+	re := suite.Require()
+	re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/skipStoreConfigSync", `return(true)`))
+	suite.env = pdTests.NewSchedulingTestEnvironment(suite.T())
+	suite.defaultSchedulers = []string{
+		"balance-leader-scheduler",
+		"balance-region-scheduler",
+		"balance-hot-region-scheduler",
+		"balance-witness-scheduler",
+		"transfer-witness-leader-scheduler",
+		"evict-slow-store-scheduler",
+	}
+}
+
+func (suite *schedulerTestSuite) TearDownSuite() {
+	re := suite.Require()
+	suite.env.Cleanup()
+	re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/skipStoreConfigSync"))
+}
+
+func (suite *schedulerTestSuite) TearDownTest() {
+	cleanFunc := func(cluster *pdTests.TestCluster) {
+		re := suite.Require()
+		pdAddr := cluster.GetConfig().GetClientURL()
+		cmd := ctl.GetRootCmd()
+
+		var currentSchedulers []string
+		mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, &currentSchedulers)
+		for _, scheduler := range suite.defaultSchedulers {
+			if slice.NoneOf(currentSchedulers, func(i int) bool {
+				return currentSchedulers[i] == scheduler
+			}) {
+				echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", scheduler}, nil)
+				re.Contains(echo, "Success!")
+			}
+		}
+		for _, scheduler := range currentSchedulers {
+			if slice.NoneOf(suite.defaultSchedulers, func(i int) bool {
+				return suite.defaultSchedulers[i] == scheduler
+			}) {
+				echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", scheduler}, nil)
+				re.Contains(echo, "Success!")
+			}
+		}
+	}
+	suite.env.RunFuncInTwoModes(cleanFunc)
+}
+
+func (suite *schedulerTestSuite) TestScheduler() {
+	suite.env.RunTestInTwoModes(suite.checkScheduler)
+}
+
+func (suite *schedulerTestSuite) checkScheduler(cluster *pdTests.TestCluster) {
+	re := suite.Require()
+	pdAddr := cluster.GetConfig().GetClientURL()
+	cmd := ctl.GetRootCmd()
+
+	stores := []*metapb.Store{
+		{
+			Id:            1,
+			State:         metapb.StoreState_Up,
+			LastHeartbeat: time.Now().UnixNano(),
+		},
+		{
+			Id:            2,
+			State:         metapb.StoreState_Up,
+			LastHeartbeat: time.Now().UnixNano(),
+		},
+		{
+			Id:            3,
+			State:         metapb.StoreState_Up,
+			LastHeartbeat: time.Now().UnixNano(),
+		},
+		{
+			Id:            4,
+			State:         metapb.StoreState_Up,
+			LastHeartbeat: time.Now().UnixNano(),
+		},
+	}
+
+	mustUsage := func(args []string) {
+		output, err := tests.ExecuteCommand(cmd, args...)
+		re.NoError(err)
+		re.Contains(string(output), "Usage")
+	}
+
+	checkSchedulerCommand := func(args []string, expected map[string]bool) {
+		if args != nil {
+			echo := mustExec(re, cmd, args, nil)
+			re.Contains(echo, "Success!")
+		}
+		testutil.Eventually(re, func() bool {
+			var schedulers []string
+			mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, &schedulers)
+			if len(schedulers) != len(expected) {
+				return false
+			}
+			for _, scheduler := range schedulers {
+				if _, ok := expected[scheduler]; !ok {
+					return false
+				}
+			}
+			return true
+		})
+	}
+
+	checkSchedulerConfigCommand := func(expectedConfig map[string]interface{}, schedulerName string) {
+		testutil.Eventually(re, func() bool {
+			configInfo := make(map[string]interface{})
+			mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName}, &configInfo)
+			return reflect.DeepEqual(expectedConfig, configInfo)
+		})
+	}
+
+	leaderServer := cluster.GetLeaderServer()
+	for _, store := range stores {
+		pdTests.MustPutStore(re, cluster, store)
+	}
+
+	// note: because pdqsort is a unstable sort algorithm, set ApproximateSize for this region.
+	pdTests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetApproximateSize(10))
+
+	// scheduler show command
+	expected := map[string]bool{
+		"balance-region-scheduler":          true,
+		"balance-leader-scheduler":          true,
+		"balance-hot-region-scheduler":      true,
+		"transfer-witness-leader-scheduler": true,
+		"balance-witness-scheduler":         true,
+		"evict-slow-store-scheduler":        true,
+	}
+	checkSchedulerCommand(nil, expected)
+
+	// scheduler delete command
+	args := []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}
+	expected = map[string]bool{
+		"balance-leader-scheduler":          true,
+		"balance-hot-region-scheduler":      true,
+		"transfer-witness-leader-scheduler": true,
+		"balance-witness-scheduler":         true,
+		"evict-slow-store-scheduler":        true,
+	}
+	checkSchedulerCommand(args, expected)
+
+	// avoid the influence of the scheduler order
+	schedulers := []string{"evict-leader-scheduler", "grant-leader-scheduler", "evict-leader-scheduler", "grant-leader-scheduler"}
+
+	checkStorePause := func(changedStores []uint64, schedulerName string) {
+		status := func() string {
+			switch schedulerName {
+			case "evict-leader-scheduler":
+				return "paused"
+			case "grant-leader-scheduler":
+				return "resumed"
+			default:
+				re.Fail(fmt.Sprintf("unknown scheduler %s", schedulerName))
+				return ""
+			}
+		}()
+		for _, store := range stores {
+			isStorePaused := !cluster.GetLeaderServer().GetRaftCluster().GetStore(store.GetId()).AllowLeaderTransfer()
+			if slice.AnyOf(changedStores, func(i int) bool {
+				return store.GetId() == changedStores[i]
+			}) {
+				re.True(isStorePaused,
+					fmt.Sprintf("store %d should be %s with %s", store.GetId(), status, schedulerName))
+			} else {
+				re.False(isStorePaused,
+					fmt.Sprintf("store %d should not be %s with %s", store.GetId(), status, schedulerName))
+			}
+			if sche := cluster.GetSchedulingPrimaryServer(); sche != nil {
+				re.Equal(isStorePaused, !sche.GetCluster().GetStore(store.GetId()).AllowLeaderTransfer())
+			}
+		}
+	}
+
+	for idx := range schedulers {
+		checkStorePause([]uint64{}, schedulers[idx])
+		// scheduler add command
+		args = []string{"-u", pdAddr, "scheduler", "add", schedulers[idx], "2"}
+		expected = map[string]bool{
+			"balance-leader-scheduler":          true,
+			"balance-hot-region-scheduler":      true,
+			schedulers[idx]:                     true,
+			"transfer-witness-leader-scheduler": true,
+			"balance-witness-scheduler":         true,
+			"evict-slow-store-scheduler":        true,
+		}
+		checkSchedulerCommand(args, expected)
+
+		// scheduler config show command
+		expectedConfig := make(map[string]interface{})
+		expectedConfig["store-id-ranges"] = map[string]interface{}{"2": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}}
+		checkSchedulerConfigCommand(expectedConfig, schedulers[idx])
+		checkStorePause([]uint64{2}, schedulers[idx])
+
+		// scheduler config update command
+		args = []string{"-u", pdAddr, "scheduler", "config", schedulers[idx], "add-store", "3"}
+		expected = map[string]bool{
+			"balance-leader-scheduler":          true,
+			"balance-hot-region-scheduler":      true,
+			schedulers[idx]:                     true,
+			"transfer-witness-leader-scheduler": true,
+			"balance-witness-scheduler":         true,
+			"evict-slow-store-scheduler":        true,
+		}
+
+		// check update success
+		checkSchedulerCommand(args, expected)
+		expectedConfig["store-id-ranges"] = map[string]interface{}{"2": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}, "3": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}}
+		checkSchedulerConfigCommand(expectedConfig, schedulers[idx])
+		checkStorePause([]uint64{2, 3}, schedulers[idx])
+
+		// scheduler delete command
+		args = []string{"-u", pdAddr, "scheduler", "remove", schedulers[idx]}
+		expected = map[string]bool{
+			"balance-leader-scheduler":          true,
+			"balance-hot-region-scheduler":      true,
+			"transfer-witness-leader-scheduler": true,
+			"balance-witness-scheduler":         true,
+			"evict-slow-store-scheduler":        true,
+		}
+		checkSchedulerCommand(args, expected)
+		checkStorePause([]uint64{}, schedulers[idx])
+
+		// scheduler add command
+		args = []string{"-u", pdAddr, "scheduler", "add", schedulers[idx], "2"}
+		expected = map[string]bool{
+			"balance-leader-scheduler":          true,
+			"balance-hot-region-scheduler":      true,
+			schedulers[idx]:                     true,
+			"transfer-witness-leader-scheduler": true,
+			"balance-witness-scheduler":         true,
+			"evict-slow-store-scheduler":        true,
+		}
+		checkSchedulerCommand(args, expected)
+		checkStorePause([]uint64{2}, schedulers[idx])
+
+		// scheduler add command twice
+		args = []string{"-u", pdAddr, "scheduler", "add", schedulers[idx], "4"}
+		expected = map[string]bool{
+			"balance-leader-scheduler":          true,
+			"balance-hot-region-scheduler":      true,
+			schedulers[idx]:                     true,
+			"transfer-witness-leader-scheduler": true,
+			"balance-witness-scheduler":         true,
+			"evict-slow-store-scheduler":        true,
+		}
+		checkSchedulerCommand(args, expected)
+
+		// check add success
+		expectedConfig["store-id-ranges"] = map[string]interface{}{"2": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}, "4": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}}
+		checkSchedulerConfigCommand(expectedConfig, schedulers[idx])
+		checkStorePause([]uint64{2, 4}, schedulers[idx])
+
+		// scheduler remove command [old]
+		args = []string{"-u", pdAddr, "scheduler", "remove", schedulers[idx] + "-4"}
+		expected = map[string]bool{
+			"balance-leader-scheduler":          true,
+			"balance-hot-region-scheduler":      true,
+			schedulers[idx]:                     true,
+			"transfer-witness-leader-scheduler": true,
+			"balance-witness-scheduler":         true,
+			"evict-slow-store-scheduler":        true,
+		}
+		checkSchedulerCommand(args, expected)
+
+		// check remove success
+		expectedConfig["store-id-ranges"] = map[string]interface{}{"2": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}}
+		checkSchedulerConfigCommand(expectedConfig, schedulers[idx])
+		checkStorePause([]uint64{2}, schedulers[idx])
+
+		// scheduler remove command, when remove the last store, it should remove whole scheduler
+		args = []string{"-u", pdAddr, "scheduler", "remove", schedulers[idx] + "-2"}
+		expected = map[string]bool{
+			"balance-leader-scheduler":          true,
+			"balance-hot-region-scheduler":      true,
+			"transfer-witness-leader-scheduler": true,
+			"balance-witness-scheduler":         true,
+			"evict-slow-store-scheduler":        true,
+		}
+		checkSchedulerCommand(args, expected)
+		checkStorePause([]uint64{}, schedulers[idx])
+	}
+
+	// test shuffle region config
+	checkSchedulerCommand([]string{"-u", pdAddr, "scheduler", "add", "shuffle-region-scheduler"}, map[string]bool{
+		"balance-leader-scheduler":          true,
+		"balance-hot-region-scheduler":      true,
+		"shuffle-region-scheduler":          true,
+		"transfer-witness-leader-scheduler": true,
+		"balance-witness-scheduler":         true,
+		"evict-slow-store-scheduler":        true,
+	})
+	var roles []string
+	mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "show-roles"}, &roles)
+	re.Equal([]string{"leader", "follower", "learner"}, roles)
+	echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "set-roles", "learner"}, nil)
+	re.Contains(echo, "Success!")
+	testutil.Eventually(re, func() bool {
+		mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "show-roles"}, &roles)
+		return reflect.DeepEqual([]string{"learner"}, roles)
+	})
+	mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler"}, &roles)
+	re.Equal([]string{"learner"}, roles)
+
+	// test grant hot region scheduler config
+	checkSchedulerCommand([]string{"-u", pdAddr, "scheduler", "add", "grant-hot-region-scheduler", "1", "1,2,3"}, map[string]bool{
+		"balance-leader-scheduler":          true,
+		"balance-hot-region-scheduler":      true,
+		"shuffle-region-scheduler":          true,
+		"grant-hot-region-scheduler":        true,
+		"transfer-witness-leader-scheduler": true,
+		"balance-witness-scheduler":         true,
+		"evict-slow-store-scheduler":        true,
+	})
+	var conf3 map[string]interface{}
+	expected3 := map[string]interface{}{
+		"store-id":        []interface{}{float64(1), float64(2), float64(3)},
+		"store-leader-id": float64(1),
+	}
+	mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler"}, &conf3)
+	re.Equal(expected3, conf3)
+
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler", "set", "2", "1,2,3"}, nil)
+	re.Contains(echo, "Success!")
+	expected3["store-leader-id"] = float64(2)
+	testutil.Eventually(re, func() bool {
+		mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler"}, &conf3)
+		return reflect.DeepEqual(expected3, conf3)
+	})
+
+	// test remove and add scheduler
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-region-scheduler"}, nil)
+	re.Contains(echo, "Success!")
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil)
+	re.Contains(echo, "Success!")
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil)
+	re.NotContains(echo, "Success!")
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-region-scheduler"}, nil)
+	re.Contains(echo, "Success!")
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}, nil)
+	re.Contains(echo, "Success! The scheduler is created.")
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "2"}, nil)
+	re.Contains(echo, "Success! The scheduler has been applied to the store.")
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}, nil)
+	re.Contains(echo, "Success!")
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-2"}, nil)
+	re.Contains(echo, "Success!")
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}, nil)
+	re.Contains(echo, "404")
+	testutil.Eventually(re, func() bool { // wait for removed scheduler to be synced to scheduling server.
+		echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "evict-leader-scheduler"}, nil)
+		return strings.Contains(echo, "[404] scheduler not found")
+	})
+
+	// test hot region config
+	expected1 := map[string]interface{}{
+		"min-hot-byte-rate":          float64(100),
+		"min-hot-key-rate":           float64(10),
+		"min-hot-query-rate":         float64(10),
+		"max-zombie-rounds":          float64(3),
+		"max-peer-number":            float64(1000),
+		"byte-rate-rank-step-ratio":  0.05,
+		"key-rate-rank-step-ratio":   0.05,
+		"query-rate-rank-step-ratio": 0.05,
+		"count-rank-step-ratio":      0.01,
+		"great-dec-ratio":            0.95,
+		"minor-dec-ratio":            0.99,
+		"src-tolerance-ratio":        1.05,
+		"dst-tolerance-ratio":        1.05,
+		"read-priorities":            []interface{}{"byte", "key"},
+		"write-leader-priorities":    []interface{}{"key", "byte"},
+		"write-peer-priorities":      []interface{}{"byte", "key"},
+		"strict-picking-store":       "true",
+		"enable-for-tiflash":         "true",
+		"rank-formula-version":       "v2",
+		"split-thresholds":           0.2,
+	}
+	checkHotSchedulerConfig := func(expect map[string]interface{}) {
+		testutil.Eventually(re, func() bool {
+			var conf1 map[string]interface{}
+			mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
+			return reflect.DeepEqual(expect, conf1)
+		})
+	}
+
+	var conf map[string]interface{}
+	mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "list"}, &conf)
+	re.Equal(expected1, conf)
+	mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "show"}, &conf)
+	re.Equal(expected1, conf)
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "src-tolerance-ratio", "1.02"}, nil)
+	re.Contains(echo, "Success!")
+	expected1["src-tolerance-ratio"] = 1.02
+	checkHotSchedulerConfig(expected1)
+
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,key"}, nil)
+	re.Contains(echo, "Success!")
+	expected1["read-priorities"] = []interface{}{"byte", "key"}
+	checkHotSchedulerConfig(expected1)
+
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key"}, nil)
+	re.Contains(echo, "Failed!")
+	checkHotSchedulerConfig(expected1)
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,byte"}, nil)
+	re.Contains(echo, "Success!")
+	expected1["read-priorities"] = []interface{}{"key", "byte"}
+	checkHotSchedulerConfig(expected1)
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "foo,bar"}, nil)
+	re.Contains(echo, "Failed!")
+	checkHotSchedulerConfig(expected1)
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", ""}, nil)
+	re.Contains(echo, "Failed!")
+	checkHotSchedulerConfig(expected1)
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,key"}, nil)
+	re.Contains(echo, "Failed!")
+	checkHotSchedulerConfig(expected1)
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,byte"}, nil)
+	re.Contains(echo, "Failed!")
+	checkHotSchedulerConfig(expected1)
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,key,byte"}, nil)
+	re.Contains(echo, "Failed!")
+	checkHotSchedulerConfig(expected1)
+
+	// write-priorities is divided into write-leader-priorities and write-peer-priorities
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-priorities", "key,byte"}, nil)
+	re.Contains(echo, "Failed!")
+	re.Contains(echo, "Config item is not found.")
+	checkHotSchedulerConfig(expected1)
+
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v0"}, nil)
+	re.Contains(echo, "Failed!")
+	checkHotSchedulerConfig(expected1)
+	expected1["rank-formula-version"] = "v2"
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v2"}, nil)
+	re.Contains(echo, "Success!")
+	checkHotSchedulerConfig(expected1)
+	expected1["rank-formula-version"] = "v1"
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v1"}, nil)
+	re.Contains(echo, "Success!")
+	checkHotSchedulerConfig(expected1)
+
+	expected1["forbid-rw-type"] = "read"
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "forbid-rw-type", "read"}, nil)
+	re.Contains(echo, "Success!")
+	checkHotSchedulerConfig(expected1)
+
+	// test compatibility
+	re.Equal("2.0.0", leaderServer.GetClusterVersion().String())
+	for _, store := range stores {
+		version := versioninfo.HotScheduleWithQuery
+		store.Version = versioninfo.MinSupportedVersion(version).String()
+		store.LastHeartbeat = time.Now().UnixNano()
+		pdTests.MustPutStore(re, cluster, store)
+	}
+	re.Equal("5.2.0", leaderServer.GetClusterVersion().String())
+	// After upgrading, we should not use query.
+	checkHotSchedulerConfig(expected1)
+	// cannot set qps as write-peer-priorities
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-peer-priorities", "query,byte"}, nil)
+	re.Contains(echo, "query is not allowed to be set in priorities for write-peer-priorities")
+	checkHotSchedulerConfig(expected1)
+
+	// test remove and add
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-hot-region-scheduler"}, nil)
+	re.Contains(echo, "Success")
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-hot-region-scheduler"}, nil)
+	re.Contains(echo, "Success")
+
+	// test balance leader config
+	conf = make(map[string]interface{})
+	conf1 := make(map[string]interface{})
+	mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler", "show"}, &conf)
+	re.Equal(4., conf["batch"])
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler", "set", "batch", "3"}, nil)
+	re.Contains(echo, "Success!")
+	testutil.Eventually(re, func() bool {
+		mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler"}, &conf1)
+		return conf1["batch"] == 3.
+	})
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-leader-scheduler"}, nil)
+	re.NotContains(echo, "Success!")
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-leader-scheduler"}, nil)
+	re.Contains(echo, "Success!")
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-leader-scheduler"}, nil)
+	re.Contains(echo, "404")
+	re.Contains(echo, "PD:scheduler:ErrSchedulerNotFound]scheduler not found")
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler"}, nil)
+	re.Contains(echo, "404")
+	re.Contains(echo, "scheduler not found")
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-leader-scheduler"}, nil)
+	re.Contains(echo, "Success!")
+
+	// test evict-slow-store && evict-slow-trend schedulers config
+	evictSlownessSchedulers := []string{"evict-slow-store-scheduler", "evict-slow-trend-scheduler"}
+	for _, schedulerName := range evictSlownessSchedulers {
+		echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", schedulerName}, nil)
+		if strings.Contains(echo, "Success!") {
+			re.Contains(echo, "Success!")
+		} else {
+			re.Contains(echo, "scheduler existed")
+		}
+		testutil.Eventually(re, func() bool {
+			echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil)
+			return strings.Contains(echo, schedulerName)
+		})
+		echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName, "set", "recovery-duration", "100"}, nil)
+		re.Contains(echo, "Success! Config updated.")
+		conf = make(map[string]interface{})
+		testutil.Eventually(re, func() bool {
+			mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName, "show"}, &conf)
+			return conf["recovery-duration"] == 100.
+		})
+		echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", schedulerName}, nil)
+		re.Contains(echo, "Success!")
+		testutil.Eventually(re, func() bool {
+			echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil)
+			return !strings.Contains(echo, schedulerName)
+		})
+	}
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-slow-store-scheduler"}, nil)
+	re.Contains(echo, "Success!")
+
+	// test shuffle hot region scheduler
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "shuffle-hot-region-scheduler"}, nil)
+	re.Contains(echo, "Success!")
+	testutil.Eventually(re, func() bool {
+		echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil)
+		return strings.Contains(echo, "shuffle-hot-region-scheduler")
+	})
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-hot-region-scheduler", "set", "limit", "127"}, nil)
+	re.Contains(echo, "Success!")
+	conf = make(map[string]interface{})
+	testutil.Eventually(re, func() bool {
+		mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-hot-region-scheduler", "show"}, &conf)
+		return conf["limit"] == 127.
+	})
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "shuffle-hot-region-scheduler"}, nil)
+	re.Contains(echo, "Success!")
+	testutil.Eventually(re, func() bool {
+		echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil)
+		return !strings.Contains(echo, "shuffle-hot-region-scheduler")
+	})
+
+	// test show scheduler with paused and disabled status.
+	checkSchedulerWithStatusCommand := func(status string, expected []string) {
+		testutil.Eventually(re, func() bool {
+			var schedulers []string
+			mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show", "--status", status}, &schedulers)
+			return reflect.DeepEqual(expected, schedulers)
+		})
+	}
+
+	mustUsage([]string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler"})
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil)
+	re.Contains(echo, "Success!")
+	checkSchedulerWithStatusCommand("paused", []string{
+		"balance-leader-scheduler",
+	})
+	result := make(map[string]interface{})
+	testutil.Eventually(re, func() bool {
+		mightExec(re, cmd, []string{"-u", pdAddr, "scheduler", "describe", "balance-leader-scheduler"}, &result)
+		return len(result) != 0 && result["status"] == "paused" && result["summary"] == ""
+	}, testutil.WithWaitFor(30*time.Second))
+
+	mustUsage([]string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler", "60"})
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil)
+	re.Contains(echo, "Success!")
+	checkSchedulerWithStatusCommand("paused", []string{})
+
+	// set label scheduler to disabled manually.
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "label-scheduler"}, nil)
+	re.Contains(echo, "Success!")
+	cfg := leaderServer.GetServer().GetScheduleConfig()
+	origin := cfg.Schedulers
+	cfg.Schedulers = sc.SchedulerConfigs{{Type: "label", Disable: true}}
+	err := leaderServer.GetServer().SetScheduleConfig(*cfg)
+	re.NoError(err)
+	checkSchedulerWithStatusCommand("disabled", []string{"label-scheduler"})
+	// reset Schedulers in ScheduleConfig
+	cfg.Schedulers = origin
+	err = leaderServer.GetServer().SetScheduleConfig(*cfg)
+	re.NoError(err)
+	checkSchedulerWithStatusCommand("disabled", []string{})
+}
+
+func (suite *schedulerTestSuite) TestSchedulerDiagnostic() {
+	suite.env.RunTestInTwoModes(suite.checkSchedulerDiagnostic)
+}
+
+func (suite *schedulerTestSuite) checkSchedulerDiagnostic(cluster *pdTests.TestCluster) {
+	re := suite.Require()
+	pdAddr := cluster.GetConfig().GetClientURL()
+	cmd := ctl.GetRootCmd()
+
+	checkSchedulerDescribeCommand := func(schedulerName, expectedStatus, expectedSummary string) {
+		result := make(map[string]interface{})
+		testutil.Eventually(re, func() bool {
+			mightExec(re, cmd, []string{"-u", pdAddr, "scheduler", "describe", schedulerName}, &result)
+			return len(result) != 0 && expectedStatus == result["status"] && expectedSummary == result["summary"]
+		}, testutil.WithTickInterval(50*time.Millisecond))
+	}
+
+	stores := []*metapb.Store{
+		{
+			Id:            1,
+			State:         metapb.StoreState_Up,
+			LastHeartbeat: time.Now().UnixNano(),
+		},
+		{
+			Id:            2,
+			State:         metapb.StoreState_Up,
+			LastHeartbeat: time.Now().UnixNano(),
+		},
+		{
+			Id:            3,
+			State:         metapb.StoreState_Up,
+			LastHeartbeat: time.Now().UnixNano(),
+		},
+		{
+			Id:            4,
+			State:         metapb.StoreState_Up,
+			LastHeartbeat: time.Now().UnixNano(),
+		},
+	}
+	for _, store := range stores {
+		pdTests.MustPutStore(re, cluster, store)
+	}
+
+	// note: because pdqsort is an unstable sort algorithm, set ApproximateSize for this region.
+	pdTests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetApproximateSize(10))
+
+	echo := mustExec(re, cmd, []string{"-u", pdAddr, "config", "set", "enable-diagnostic", "true"}, nil)
+	re.Contains(echo, "Success!")
+	checkSchedulerDescribeCommand("balance-region-scheduler", "pending", "1 store(s) RegionNotMatchRule; ")
+
+	// scheduler delete command
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil)
+	re.Contains(echo, "Success!")
+	checkSchedulerDescribeCommand("balance-region-scheduler", "disabled", "")
+
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil)
+	re.Contains(echo, "Success!")
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil)
+	re.Contains(echo, "Success!")
+	checkSchedulerDescribeCommand("balance-leader-scheduler", "normal", "")
+}
+
+func mustExec(re *require.Assertions, cmd *cobra.Command, args []string, v interface{}) string {
+	output, err := tests.ExecuteCommand(cmd, args...)
+	re.NoError(err)
+	if v == nil {
+		return string(output)
+	}
+	re.NoError(json.Unmarshal(output, v), string(output))
+	return ""
+}
+
+func mightExec(re *require.Assertions, cmd *cobra.Command, args []string, v interface{}) {
+	output, err := tests.ExecuteCommand(cmd, args...)
+	re.NoError(err)
+	if v == nil {
+		return
+	}
+	json.Unmarshal(output, v)
+}