Gate scheduling on PersistOptions.IsSchedulingHalted().

Summary of the changes in this patch:
  * pkg/schedule: Coordinator and scheduleController gain isSchedulingHalted()
    helpers that read GetPersistOptions().IsSchedulingHalted(). Calls to
    CheckSchedulingAllowance() are replaced by these helpers and the method
    is removed from the ClusterInformer interface.
  * pkg/unsaferecovery: isRunningLocked() becomes the free function
    isRunning(stage). changeStage() calls
    SetHaltScheduling(running, "online-unsafe-recovery") whenever
    isRunning(stage) differs from the current halt flag.
  * server/cluster: RaftCluster.CheckSchedulingAllowance() is removed.
    HandleAskSplit() and HandleAskBatchSplit() return ErrSchedulingIsHalted
    when scheduling is halted.
  * server/config: a new metrics.go registers the gauge with namespace "pd",
    subsystem "scheduling", name "allowance_status". SetHaltScheduling()
    takes an extra source label and updates the gauge. IsSchedulingHalted()
    returns false on a nil receiver.

Review note: PatrolRegions() has to skip a patrol round while scheduling IS
halted (the removed code did `continue` on !allowed), so the guard below is
`if c.isSchedulingHalted()` with no negation.

diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go
index d5cace80541e..15568c874e72 100644
--- a/pkg/schedule/coordinator.go
+++ b/pkg/schedule/coordinator.go
@@ -145,7 +145,7 @@ func (c *Coordinator) PatrolRegions() {
 			log.Info("patrol regions has been stopped")
 			return
 		}
-		if allowed, _ := c.cluster.CheckSchedulingAllowance(); !allowed {
+		if c.isSchedulingHalted() {
 			continue
 		}
 
@@ -172,6 +172,10 @@ func (c *Coordinator) PatrolRegions() {
 	}
 }
 
+func (c *Coordinator) isSchedulingHalted() bool {
+	return c.cluster.GetPersistOptions().IsSchedulingHalted()
+}
+
 func (c *Coordinator) checkRegions(startKey []byte) (key []byte, regions []*core.RegionInfo) {
 	regions = c.cluster.ScanRegions(startKey, nil, patrolScanRegionLimit)
 	if len(regions) == 0 {
@@ -561,7 +565,7 @@ func (c *Coordinator) CollectSchedulerMetrics() {
 		var allowScheduler float64
 		// If the scheduler is not allowed to schedule, it will disappear in Grafana panel.
 		// See issue #1341.
-		if allowed, _ := s.cluster.CheckSchedulingAllowance(); !s.IsPaused() && allowed {
+		if !s.IsPaused() && !c.isSchedulingHalted() {
 			allowScheduler = 1
 		}
 		schedulerStatusGauge.WithLabelValues(s.Scheduler.GetName(), "allow").Set(allowScheduler)
@@ -1036,8 +1040,7 @@ func (s *scheduleController) AllowSchedule(diagnosable bool) bool {
 		}
 		return false
 	}
-	allowed, _ := s.cluster.CheckSchedulingAllowance()
-	if !allowed {
+	if s.isSchedulingHalted() {
 		if diagnosable {
 			s.diagnosticRecorder.setResultFromStatus(halted)
 		}
@@ -1052,6 +1055,10 @@ func (s *scheduleController) AllowSchedule(diagnosable bool) bool {
 	return true
 }
 
+func (s *scheduleController) isSchedulingHalted() bool {
+	return s.cluster.GetPersistOptions().IsSchedulingHalted()
+}
+
 // isPaused returns if a scheduler is paused.
 func (s *scheduleController) IsPaused() bool {
 	delayUntil := atomic.LoadInt64(&s.delayUntil)
diff --git a/pkg/schedule/core/cluster_informer.go b/pkg/schedule/core/cluster_informer.go
index bcf440a587d8..72e124c501e1 100644
--- a/pkg/schedule/core/cluster_informer.go
+++ b/pkg/schedule/core/cluster_informer.go
@@ -39,7 +39,6 @@ type ClusterInformer interface {
 	GetRegionLabeler() *labeler.RegionLabeler
 	GetStorage() storage.Storage
 	UpdateRegionsLabelLevelStats(regions []*core.RegionInfo)
-	CheckSchedulingAllowance() (bool, error)
 	AddSuspectRegions(ids ...uint64)
 	GetPersistOptions() *config.PersistOptions
 }
diff --git a/pkg/unsaferecovery/unsafe_recovery_controller.go b/pkg/unsaferecovery/unsafe_recovery_controller.go
index 1eb960059d66..0c7d087aa72a 100644
--- a/pkg/unsaferecovery/unsafe_recovery_controller.go
+++ b/pkg/unsaferecovery/unsafe_recovery_controller.go
@@ -34,6 +34,7 @@ import (
 	"github.com/tikv/pd/pkg/id"
 	"github.com/tikv/pd/pkg/utils/logutil"
 	"github.com/tikv/pd/pkg/utils/syncutil"
+	"github.com/tikv/pd/server/config"
 	"go.uber.org/zap"
 )
 
@@ -109,6 +110,7 @@ type cluster interface {
 	DropCacheAllRegion()
 	GetAllocator() id.Allocator
 	BuryStore(storeID uint64, forceBury bool) error
+	GetPersistOptions() *config.PersistOptions
 }
 
 // Controller is used to control the unsafe recovery process.
@@ -174,11 +176,11 @@ func (u *Controller) reset() {
 func (u *Controller) IsRunning() bool {
 	u.RLock()
 	defer u.RUnlock()
-	return u.isRunningLocked()
+	return isRunning(u.stage)
 }
 
-func (u *Controller) isRunningLocked() bool {
-	return u.stage != Idle && u.stage != Finished && u.stage != Failed
+func isRunning(s stage) bool {
+	return s != Idle && s != Finished && s != Failed
 }
 
 // RemoveFailedStores removes Failed stores from the cluster.
@@ -186,7 +188,7 @@ func (u *Controller) RemoveFailedStores(failedStores map[uint64]struct{}, timeou
 	u.Lock()
 	defer u.Unlock()
 
-	if u.isRunningLocked() {
+	if isRunning(u.stage) {
 		return errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs()
 	}
 
@@ -316,7 +318,7 @@ func (u *Controller) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest,
 	u.Lock()
 	defer u.Unlock()
 
-	if !u.isRunningLocked() {
+	if !isRunning(u.stage) {
 		// no recovery in progress, do nothing
 		return
 	}
@@ -490,6 +492,11 @@ func (u *Controller) GetStage() stage {
 
 func (u *Controller) changeStage(stage stage) {
 	u.stage = stage
+	// Halt and resume the scheduling once the running state changed.
+	running := isRunning(stage)
+	if opt := u.cluster.GetPersistOptions(); opt.IsSchedulingHalted() != running {
+		opt.SetHaltScheduling(running, "online-unsafe-recovery")
+	}
 
 	var output StageOutput
 	output.Time = time.Now().Format("2006-01-02 15:04:05.000")
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 3e1e5c03b84d..e5426f2828cb 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -2678,25 +2678,3 @@ func (c *RaftCluster) GetPausedSchedulerDelayAt(name string) (int64, error) {
 func (c *RaftCluster) GetPausedSchedulerDelayUntil(name string) (int64, error) {
 	return c.coordinator.GetPausedSchedulerDelayUntil(name)
 }
-
-var (
-	onlineUnsafeRecoveryStatus = schedulingAllowanceStatusGauge.WithLabelValues("online-unsafe-recovery")
-	haltSchedulingStatus       = schedulingAllowanceStatusGauge.WithLabelValues("halt-scheduling")
-)
-
-// CheckSchedulingAllowance checks if the cluster allows scheduling currently.
-func (c *RaftCluster) CheckSchedulingAllowance() (bool, error) {
-	// If the cluster is in the process of online unsafe recovery, it should not allow scheduling.
-	if c.GetUnsafeRecoveryController().IsRunning() {
-		onlineUnsafeRecoveryStatus.Set(1)
-		return false, errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs()
-	}
-	onlineUnsafeRecoveryStatus.Set(0)
-	// If the halt-scheduling is set, it should not allow scheduling.
-	if c.opt.IsSchedulingHalted() {
-		haltSchedulingStatus.Set(1)
-		return false, errs.ErrSchedulingIsHalted.FastGenByArgs()
-	}
-	haltSchedulingStatus.Set(0)
-	return true, nil
-}
diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go
index cc401731a6f8..51781bde7f6a 100644
--- a/server/cluster/cluster_worker.go
+++ b/server/cluster/cluster_worker.go
@@ -43,8 +43,8 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
 
 // HandleAskSplit handles the split request.
 func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSplitResponse, error) {
-	if allowed, err := c.CheckSchedulingAllowance(); !allowed {
-		return nil, err
+	if c.isSchedulingHalted() {
+		return nil, errs.ErrSchedulingIsHalted.FastGenByArgs()
 	}
 	if !c.opt.IsTikvRegionSplitEnabled() {
 		return nil, errs.ErrSchedulerTiKVSplitDisabled.FastGenByArgs()
@@ -86,6 +86,10 @@ func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSp
 	return split, nil
 }
 
+func (c *RaftCluster) isSchedulingHalted() bool {
+	return c.opt.IsSchedulingHalted()
+}
+
 // ValidRequestRegion is used to decide if the region is valid.
 func (c *RaftCluster) ValidRequestRegion(reqRegion *metapb.Region) error {
 	startKey := reqRegion.GetStartKey()
@@ -105,8 +109,8 @@ func (c *RaftCluster) ValidRequestRegion(reqRegion *metapb.Region) error {
 
 // HandleAskBatchSplit handles the batch split request.
 func (c *RaftCluster) HandleAskBatchSplit(request *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error) {
-	if allowed, err := c.CheckSchedulingAllowance(); !allowed {
-		return nil, err
+	if c.isSchedulingHalted() {
+		return nil, errs.ErrSchedulingIsHalted.FastGenByArgs()
 	}
 	if !c.opt.IsTikvRegionSplitEnabled() {
 		return nil, errs.ErrSchedulerTiKVSplitDisabled.FastGenByArgs()
diff --git a/server/config/metrics.go b/server/config/metrics.go
new file mode 100644
index 000000000000..8919a0844f7a
--- /dev/null
+++ b/server/config/metrics.go
@@ -0,0 +1,29 @@
+// Copyright 2019 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package config
+
+import "github.com/prometheus/client_golang/prometheus"
+
+var schedulingAllowanceStatusGauge = prometheus.NewGaugeVec(
+	prometheus.GaugeOpts{
+		Namespace: "pd",
+		Subsystem: "scheduling",
+		Name:      "allowance_status",
+		Help:      "Status of the scheduling allowance.",
+	}, []string{"kind"})
+
+func init() {
+	prometheus.MustRegister(schedulingAllowanceStatusGauge)
+}
diff --git a/server/config/persist_options.go b/server/config/persist_options.go
index 1b18f9fad541..4b87fa4b351b 100644
--- a/server/config/persist_options.go
+++ b/server/config/persist_options.go
@@ -920,14 +920,26 @@ func (o *PersistOptions) SetAllStoresLimitTTL(ctx context.Context, client *clien
 	return err
 }
 
+var haltSchedulingStatus = schedulingAllowanceStatusGauge.WithLabelValues("halt-scheduling")
+
 // SetHaltScheduling set HaltScheduling.
-func (o *PersistOptions) SetHaltScheduling(halt bool) {
+func (o *PersistOptions) SetHaltScheduling(halt bool, source string) {
 	v := o.GetScheduleConfig().Clone()
 	v.HaltScheduling = halt
 	o.SetScheduleConfig(v)
+	if halt {
+		haltSchedulingStatus.Set(1)
+		schedulingAllowanceStatusGauge.WithLabelValues(source).Set(1)
+	} else {
+		haltSchedulingStatus.Set(0)
+		schedulingAllowanceStatusGauge.WithLabelValues(source).Set(0)
+	}
 }
 
 // IsSchedulingHalted returns if PD scheduling is halted.
 func (o *PersistOptions) IsSchedulingHalted() bool {
+	if o == nil {
+		return false
+	}
 	return o.GetScheduleConfig().HaltScheduling
 }