cluster, schedule: unify the scheduling halt to decouple dependencies #6569

Merged · 5 commits · Jun 8, 2023
15 changes: 11 additions & 4 deletions pkg/schedule/coordinator.go
@@ -145,7 +145,7 @@ func (c *Coordinator) PatrolRegions() {
 			log.Info("patrol regions has been stopped")
 			return
 		}
-		if allowed, _ := c.cluster.CheckSchedulingAllowance(); !allowed {
+		if c.isSchedulingHalted() {
 			continue
 		}

@@ -172,6 +172,10 @@ func (c *Coordinator) PatrolRegions() {
 	}
 }

+func (c *Coordinator) isSchedulingHalted() bool {
+	return c.cluster.GetPersistOptions().IsSchedulingHalted()
+}
+
 func (c *Coordinator) checkRegions(startKey []byte) (key []byte, regions []*core.RegionInfo) {
 	regions = c.cluster.ScanRegions(startKey, nil, patrolScanRegionLimit)
 	if len(regions) == 0 {
@@ -561,7 +565,7 @@ func (c *Coordinator) CollectSchedulerMetrics() {
 		var allowScheduler float64
 		// If the scheduler is not allowed to schedule, it will disappear in Grafana panel.
 		// See issue #1341.
-		if allowed, _ := s.cluster.CheckSchedulingAllowance(); !s.IsPaused() && allowed {
+		if !s.IsPaused() && !c.isSchedulingHalted() {
 			allowScheduler = 1
 		}
 		schedulerStatusGauge.WithLabelValues(s.Scheduler.GetName(), "allow").Set(allowScheduler)
@@ -1036,8 +1040,7 @@ func (s *scheduleController) AllowSchedule(diagnosable bool) bool {
 		}
 		return false
 	}
-	allowed, _ := s.cluster.CheckSchedulingAllowance()
-	if !allowed {
+	if s.isSchedulingHalted() {
 		if diagnosable {
 			s.diagnosticRecorder.setResultFromStatus(halted)
 		}
@@ -1052,6 +1055,10 @@ func (s *scheduleController) AllowSchedule(diagnosable bool) bool {
 	return true
 }

+func (s *scheduleController) isSchedulingHalted() bool {
+	return s.cluster.GetPersistOptions().IsSchedulingHalted()
+}
+
 // isPaused returns if a scheduler is paused.
 func (s *scheduleController) IsPaused() bool {
 	delayUntil := atomic.LoadInt64(&s.delayUntil)
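Note for readers skimming the diff: every former CheckSchedulingAllowance call in this file now funnels through a tiny isSchedulingHalted helper that only reads the persisted options, so the coordinator no longer depends on the cluster's error or unsafe-recovery plumbing. A minimal, self-contained sketch of the resulting call shape (stand-in types, not the actual PD code):

package main

import "fmt"

// options stands in for PD's *config.PersistOptions; only the flag matters here.
type options struct{ haltScheduling bool }

func (o *options) IsSchedulingHalted() bool { return o.haltScheduling }

// coordinator mirrors the slimmed-down dependency: it needs the persisted
// options and nothing else to decide whether to schedule.
type coordinator struct{ opt *options }

func (c *coordinator) isSchedulingHalted() bool { return c.opt.IsSchedulingHalted() }

func (c *coordinator) patrolOnce() {
	if c.isSchedulingHalted() {
		fmt.Println("scheduling halted, skipping patrol round")
		return
	}
	fmt.Println("patrolling regions")
}

func main() {
	c := &coordinator{opt: &options{}}
	c.patrolOnce() // patrolling regions
	c.opt.haltScheduling = true
	c.patrolOnce() // scheduling halted, skipping patrol round
}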
1 change: 0 additions & 1 deletion pkg/schedule/core/cluster_informer.go
@@ -39,7 +39,6 @@ type ClusterInformer interface {
 	GetRegionLabeler() *labeler.RegionLabeler
 	GetStorage() storage.Storage
 	UpdateRegionsLabelLevelStats(regions []*core.RegionInfo)
-	CheckSchedulingAllowance() (bool, error)
 	AddSuspectRegions(ids ...uint64)
 	GetPersistOptions() *config.PersistOptions
 }
17 changes: 12 additions & 5 deletions pkg/unsaferecovery/unsafe_recovery_controller.go
@@ -34,6 +34,7 @@ import (
"github.com/tikv/pd/pkg/id"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/server/config"
"go.uber.org/zap"
)

@@ -109,6 +110,7 @@ type cluster interface {
 	DropCacheAllRegion()
 	GetAllocator() id.Allocator
 	BuryStore(storeID uint64, forceBury bool) error
+	GetPersistOptions() *config.PersistOptions
 }

 // Controller is used to control the unsafe recovery process.
@@ -174,19 +176,19 @@ func (u *Controller) reset() {
 func (u *Controller) IsRunning() bool {
 	u.RLock()
 	defer u.RUnlock()
-	return u.isRunningLocked()
+	return isRunning(u.stage)
 }

-func (u *Controller) isRunningLocked() bool {
-	return u.stage != Idle && u.stage != Finished && u.stage != Failed
+func isRunning(s stage) bool {
+	return s != Idle && s != Finished && s != Failed
 }

 // RemoveFailedStores removes Failed stores from the cluster.
 func (u *Controller) RemoveFailedStores(failedStores map[uint64]struct{}, timeout uint64, autoDetect bool) error {
 	u.Lock()
 	defer u.Unlock()

-	if u.isRunningLocked() {
+	if isRunning(u.stage) {
 		return errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs()
 	}

@@ -316,7 +318,7 @@ func (u *Controller) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest,
 	u.Lock()
 	defer u.Unlock()

-	if !u.isRunningLocked() {
+	if !isRunning(u.stage) {
 		// no recovery in progress, do nothing
 		return
 	}
@@ -490,6 +492,11 @@ func (u *Controller) GetStage() stage {

 func (u *Controller) changeStage(stage stage) {
 	u.stage = stage
+	// Halt and resume the scheduling once the running state changed.
+	running := isRunning(stage)
+	if opt := u.cluster.GetPersistOptions(); opt.IsSchedulingHalted() != running {
+		opt.SetHaltScheduling(running, "online-unsafe-recovery")
+	}

 	var output StageOutput
 	output.Time = time.Now().Format("2006-01-02 15:04:05.000")
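The behavioral core of this file is the changeStage hook: instead of the cluster polling IsRunning() on every allowance check, the controller itself halts scheduling when a recovery starts and resumes it once the stage settles back to Idle, Finished, or Failed. A runnable sketch of that toggle with stand-in types (the real SetHaltScheduling also clones and re-stores the schedule config and drives the gauge):

package main

import "fmt"

type stage int

// recovering is a stand-in for any of the controller's in-progress stages.
const (
	idle stage = iota
	recovering
	finished
	failed
)

func isRunning(s stage) bool { return s != idle && s != finished && s != failed }

// options stands in for *config.PersistOptions.
type options struct {
	halted bool
	source string
}

func (o *options) IsSchedulingHalted() bool { return o.halted }

func (o *options) SetHaltScheduling(halt bool, source string) {
	o.halted, o.source = halt, source
}

// changeStage mirrors the idempotent toggle above: the halt flag is flipped
// only when the running state actually changes.
func changeStage(o *options, s stage) {
	if running := isRunning(s); o.IsSchedulingHalted() != running {
		o.SetHaltScheduling(running, "online-unsafe-recovery")
	}
}

func main() {
	o := &options{}
	changeStage(o, recovering)      // recovery starts: halt scheduling
	fmt.Println(o.halted, o.source) // true online-unsafe-recovery
	changeStage(o, finished)        // recovery settles: resume scheduling
	fmt.Println(o.halted, o.source) // false online-unsafe-recovery
}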
22 changes: 0 additions & 22 deletions server/cluster/cluster.go
@@ -2678,25 +2678,3 @@ func (c *RaftCluster) GetPausedSchedulerDelayAt(name string) (int64, error) {
 func (c *RaftCluster) GetPausedSchedulerDelayUntil(name string) (int64, error) {
 	return c.coordinator.GetPausedSchedulerDelayUntil(name)
 }
-
-var (
-	onlineUnsafeRecoveryStatus = schedulingAllowanceStatusGauge.WithLabelValues("online-unsafe-recovery")
-	haltSchedulingStatus       = schedulingAllowanceStatusGauge.WithLabelValues("halt-scheduling")
-)
-
-// CheckSchedulingAllowance checks if the cluster allows scheduling currently.
-func (c *RaftCluster) CheckSchedulingAllowance() (bool, error) {
-	// If the cluster is in the process of online unsafe recovery, it should not allow scheduling.
-	if c.GetUnsafeRecoveryController().IsRunning() {
-		onlineUnsafeRecoveryStatus.Set(1)
-		return false, errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs()
-	}
-	onlineUnsafeRecoveryStatus.Set(0)
-	// If the halt-scheduling is set, it should not allow scheduling.
-	if c.opt.IsSchedulingHalted() {
-		haltSchedulingStatus.Set(1)
-		return false, errs.ErrSchedulingIsHalted.FastGenByArgs()
-	}
-	haltSchedulingStatus.Set(0)
-	return true, nil
-}
12 changes: 8 additions & 4 deletions server/cluster/cluster_worker.go
@@ -43,8 +43,8 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error {

 // HandleAskSplit handles the split request.
 func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSplitResponse, error) {
-	if allowed, err := c.CheckSchedulingAllowance(); !allowed {
-		return nil, err
+	if c.isSchedulingHalted() {
+		return nil, errs.ErrSchedulingIsHalted.FastGenByArgs()
 	}
 	if !c.opt.IsTikvRegionSplitEnabled() {
 		return nil, errs.ErrSchedulerTiKVSplitDisabled.FastGenByArgs()
@@ -86,6 +86,10 @@ func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSp
 	return split, nil
 }

+func (c *RaftCluster) isSchedulingHalted() bool {
+	return c.opt.IsSchedulingHalted()
+}
+
 // ValidRequestRegion is used to decide if the region is valid.
 func (c *RaftCluster) ValidRequestRegion(reqRegion *metapb.Region) error {
 	startKey := reqRegion.GetStartKey()
@@ -105,8 +109,8 @@ func (c *RaftCluster) ValidRequestRegion(reqRegion *metapb.Region) error {

 // HandleAskBatchSplit handles the batch split request.
 func (c *RaftCluster) HandleAskBatchSplit(request *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error) {
-	if allowed, err := c.CheckSchedulingAllowance(); !allowed {
-		return nil, err
+	if c.isSchedulingHalted() {
+		return nil, errs.ErrSchedulingIsHalted.FastGenByArgs()
 	}
 	if !c.opt.IsTikvRegionSplitEnabled() {
 		return nil, errs.ErrSchedulerTiKVSplitDisabled.FastGenByArgs()
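A caller-visible consequence of this file's change: a halted cluster now rejects splits with errs.ErrSchedulingIsHalted unconditionally, whereas the old CheckSchedulingAllowance path could also surface ErrUnsafeRecoveryIsRunning during recovery. A hedged sketch of the new behavior with a stand-in error value (not the real pdpb/errs plumbing):

package main

import (
	"errors"
	"fmt"
)

// errSchedulingIsHalted stands in for errs.ErrSchedulingIsHalted.FastGenByArgs().
var errSchedulingIsHalted = errors.New("scheduling is halted")

type raftCluster struct{ halted bool }

func (c *raftCluster) isSchedulingHalted() bool { return c.halted }

// handleAskSplit mirrors the new guard: refuse split requests while halted.
func (c *raftCluster) handleAskSplit() (string, error) {
	if c.isSchedulingHalted() {
		return "", errSchedulingIsHalted
	}
	return "split-ok", nil
}

func main() {
	c := &raftCluster{halted: true}
	if _, err := c.handleAskSplit(); errors.Is(err, errSchedulingIsHalted) {
		fmt.Println("split rejected:", err)
	}
}

Since unsafe recovery now expresses itself through the same halt flag, recovery-time splits come back as the generic halted error too.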
9 changes: 0 additions & 9 deletions server/cluster/metrics.go
@@ -95,20 +95,11 @@ var (
Name: "store_sync",
Help: "The state of store sync config",
}, []string{"address", "state"})

schedulingAllowanceStatusGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "scheduling",
Name: "allowance_status",
Help: "Status of the scheduling allowance.",
}, []string{"kind"})
)

func init() {
prometheus.MustRegister(regionEventCounter)
prometheus.MustRegister(healthStatusGauge)
prometheus.MustRegister(schedulingAllowanceStatusGauge)
prometheus.MustRegister(clusterStateCPUGauge)
prometheus.MustRegister(clusterStateCurrent)
prometheus.MustRegister(bucketEventCounter)
29 changes: 29 additions & 0 deletions server/config/metrics.go
@@ -0,0 +1,29 @@
+// Copyright 2023 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package config
+
+import "github.com/prometheus/client_golang/prometheus"
+
+var schedulingAllowanceStatusGauge = prometheus.NewGaugeVec(
+	prometheus.GaugeOpts{
+		Namespace: "pd",
+		Subsystem: "scheduling",
+		Name:      "allowance_status",
+		Help:      "Status of the scheduling allowance.",
+	}, []string{"kind"})
+
+func init() {
+	prometheus.MustRegister(schedulingAllowanceStatusGauge)
+}
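For context on how the relocated gauge behaves: each halt source becomes its own series under the "kind" label, which is what lets Grafana tell a manual halt apart from one imposed by unsafe recovery. A runnable illustration against client_golang (testutil is used only to read a value back; the label values mirror the sources used elsewhere in this PR):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

var schedulingAllowanceStatusGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "pd",
		Subsystem: "scheduling",
		Name:      "allowance_status",
		Help:      "Status of the scheduling allowance.",
	}, []string{"kind"})

func main() {
	prometheus.MustRegister(schedulingAllowanceStatusGauge)

	// Each halt source is a separate series of pd_scheduling_allowance_status.
	schedulingAllowanceStatusGauge.WithLabelValues("online-unsafe-recovery").Set(1)
	schedulingAllowanceStatusGauge.WithLabelValues("halt-scheduling").Set(0)

	fmt.Println(testutil.ToFloat64(
		schedulingAllowanceStatusGauge.WithLabelValues("online-unsafe-recovery"))) // 1
}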
14 changes: 13 additions & 1 deletion server/config/persist_options.go
@@ -920,14 +920,26 @@ func (o *PersistOptions) SetAllStoresLimitTTL(ctx context.Context, client *clien
 	return err
 }

+var haltSchedulingStatus = schedulingAllowanceStatusGauge.WithLabelValues("halt-scheduling")
+
 // SetHaltScheduling set HaltScheduling.
-func (o *PersistOptions) SetHaltScheduling(halt bool) {
+func (o *PersistOptions) SetHaltScheduling(halt bool, source string) {
[Review thread on the new source parameter]

Member: Does it need to be persisted?

Member (author): Though it's only used for the metrics and may be lost due to the crash, it's OK for me not to persist it in the PD. WDYT?

Member: It's ok to me.

 	v := o.GetScheduleConfig().Clone()
 	v.HaltScheduling = halt
 	o.SetScheduleConfig(v)
+	if halt {
+		haltSchedulingStatus.Set(1)
+		schedulingAllowanceStatusGauge.WithLabelValues(source).Set(1)
+	} else {
+		haltSchedulingStatus.Set(0)
+		schedulingAllowanceStatusGauge.WithLabelValues(source).Set(0)
+	}
 }

 // IsSchedulingHalted returns if PD scheduling is halted.
 func (o *PersistOptions) IsSchedulingHalted() bool {
+	if o == nil {
+		return false
+	}
 	return o.GetScheduleConfig().HaltScheduling
 }
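The new nil guard makes IsSchedulingHalted safe to call through a nil *PersistOptions, presumably for callers that can run before the options are loaded. Go permits calling a pointer-receiver method on a nil receiver as long as the body never dereferences it, which is exactly what the guard relies on; a tiny self-contained demonstration:

package main

import "fmt"

// persistOptions stands in for *config.PersistOptions.
type persistOptions struct{ haltScheduling bool }

// IsSchedulingHalted tolerates a nil receiver, mirroring the guard added in
// the hunk above: a caller with no options yet simply sees "not halted".
func (o *persistOptions) IsSchedulingHalted() bool {
	if o == nil {
		return false
	}
	return o.haltScheduling
}

func main() {
	var o *persistOptions               // deliberately nil
	fmt.Println(o.IsSchedulingHalted()) // false: no panic on a nil receiver
	o = &persistOptions{haltScheduling: true}
	fmt.Println(o.IsSchedulingHalted()) // true
}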