diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 88b79ad9669..1d56817068a 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -6,17 +6,23 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/mcs/scheduling/server/config" + sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/statistics" + "github.com/tikv/pd/pkg/statistics/buckets" + "github.com/tikv/pd/pkg/statistics/utils" + "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/storage/endpoint" ) // Cluster is used to manage all information for scheduling purpose. type Cluster struct { - basicCluster *core.BasicCluster + *core.BasicCluster ruleManager *placement.RuleManager labelerManager *labeler.RegionLabeler persistConfig *config.PersistConfig + hotStat *statistics.HotStat } const regionLabelGCInterval = time.Hour @@ -29,10 +35,89 @@ func NewCluster(ctx context.Context, storage endpoint.RuleStorage, cfg *config.C if err != nil { return nil, err } + return &Cluster{ - basicCluster: basicCluster, + BasicCluster: basicCluster, ruleManager: placement.NewRuleManager(storage, basicCluster, persistConfig), labelerManager: labelerManager, persistConfig: persistConfig, + hotStat: statistics.NewHotStat(ctx), }, nil } + +// GetBasicCluster returns the basic cluster. +func (c *Cluster) GetBasicCluster() *core.BasicCluster { + return c.BasicCluster +} + +// GetSharedConfig returns the shared config. +func (c *Cluster) GetSharedConfig() sc.SharedConfigProvider { + return c.persistConfig +} + +// GetRuleManager returns the rule manager. +func (c *Cluster) GetRuleManager() *placement.RuleManager { + return c.ruleManager +} + +// GetRegionLabeler returns the region labeler. +func (c *Cluster) GetRegionLabeler() *labeler.RegionLabeler { + return c.labelerManager +} + +// GetStoresLoads returns load stats of all stores. +func (c *Cluster) GetStoresLoads() map[uint64][]float64 { + return c.hotStat.GetStoresLoads() +} + +// IsRegionHot checks if a region is in hot state. +func (c *Cluster) IsRegionHot(region *core.RegionInfo) bool { + return c.hotStat.IsRegionHot(region, c.persistConfig.GetHotRegionCacheHitsThreshold()) +} + +// GetHotPeerStat returns hot peer stat with specified regionID and storeID. +func (c *Cluster) GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *statistics.HotPeerStat { + return c.hotStat.GetHotPeerStat(rw, regionID, storeID) +} + +// RegionReadStats returns hot region's read stats. +// The result only includes peers that are hot enough. +// RegionStats is a thread-safe method +func (c *Cluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat { + // As read stats are reported by store heartbeat, the threshold needs to be adjusted. + threshold := c.persistConfig.GetHotRegionCacheHitsThreshold() * + (utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval) + return c.hotStat.RegionStats(utils.Read, threshold) +} + +// RegionWriteStats returns hot region's write stats. +// The result only includes peers that are hot enough. +func (c *Cluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat { + // RegionStats is a thread-safe method + return c.hotStat.RegionStats(utils.Write, c.persistConfig.GetHotRegionCacheHitsThreshold()) +} + +// BucketsStats returns hot region's buckets stats. +func (c *Cluster) BucketsStats(degree int, regionIDs ...uint64) map[uint64][]*buckets.BucketStat { + return c.hotStat.BucketsStats(degree, regionIDs...) +} + +// TODO: implement the following methods + +// GetStorage returns the storage. +func (c *Cluster) GetStorage() storage.Storage { return nil } + +// UpdateRegionsLabelLevelStats updates the region label level stats. +func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {} + +// GetStoreConfig returns the store config. +func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return nil } + +// AllocID allocates a new ID. +func (c *Cluster) AllocID() (uint64, error) { return 0, nil } + +// GetCheckerConfig returns the checker config. +func (c *Cluster) GetCheckerConfig() sc.CheckerConfigProvider { return nil } + +// GetSchedulerConfig returns the scheduler config. +func (c *Cluster) GetSchedulerConfig() sc.SchedulerConfigProvider { return nil } diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go index ac693f38e49..7fa16492178 100644 --- a/pkg/mcs/scheduling/server/config/config.go +++ b/pkg/mcs/scheduling/server/config/config.go @@ -33,10 +33,11 @@ import ( "github.com/tikv/pd/pkg/core/storelimit" "github.com/tikv/pd/pkg/mcs/utils" sc "github.com/tikv/pd/pkg/schedule/config" + "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/configutil" "github.com/tikv/pd/pkg/utils/grpcutil" "github.com/tikv/pd/pkg/utils/metricutil" - "github.com/tikv/pd/server/config" + "github.com/tikv/pd/pkg/utils/typeutil" "go.uber.org/zap" ) @@ -205,7 +206,7 @@ func NewPersistConfig(cfg *Config) *PersistConfig { o.replication.Store(&cfg.Replication) // storeConfig will be fetched from TiKV by PD API server, // so we just set an empty value here first. - o.storeConfig.Store(&config.StoreConfig{}) + o.storeConfig.Store(&sc.StoreConfig{}) return o } @@ -240,7 +241,7 @@ func (o *PersistConfig) SetReplicationConfig(cfg *sc.ReplicationConfig) { } // SetStoreConfig sets the TiKV store configuration. -func (o *PersistConfig) SetStoreConfig(cfg *config.StoreConfig) { +func (o *PersistConfig) SetStoreConfig(cfg *sc.StoreConfig) { // Some of the fields won't be persisted and watched, // so we need to adjust it here before storing it. cfg.Adjust() @@ -248,8 +249,8 @@ func (o *PersistConfig) SetStoreConfig(cfg *config.StoreConfig) { } // GetStoreConfig returns the TiKV store configuration. -func (o *PersistConfig) GetStoreConfig() *config.StoreConfig { - return o.storeConfig.Load().(*config.StoreConfig) +func (o *PersistConfig) GetStoreConfig() *sc.StoreConfig { + return o.storeConfig.Load().(*sc.StoreConfig) } // GetMaxReplicas returns the max replicas. @@ -282,24 +283,54 @@ func (o *PersistConfig) GetHighSpaceRatio() float64 { return o.GetScheduleConfig().HighSpaceRatio } +// GetHotRegionScheduleLimit returns the limit for hot region schedule. +func (o *PersistConfig) GetHotRegionScheduleLimit() uint64 { + return o.GetScheduleConfig().HotRegionScheduleLimit +} + +// GetRegionScheduleLimit returns the limit for region schedule. +func (o *PersistConfig) GetRegionScheduleLimit() uint64 { + return o.GetScheduleConfig().RegionScheduleLimit +} + +// GetLeaderScheduleLimit returns the limit for leader schedule. +func (o *PersistConfig) GetLeaderScheduleLimit() uint64 { + return o.GetScheduleConfig().LeaderScheduleLimit +} + +// GetReplicaScheduleLimit returns the limit for replica schedule. +func (o *PersistConfig) GetReplicaScheduleLimit() uint64 { + return o.GetScheduleConfig().ReplicaScheduleLimit +} + +// GetMergeScheduleLimit returns the limit for merge schedule. +func (o *PersistConfig) GetMergeScheduleLimit() uint64 { + return o.GetScheduleConfig().MergeScheduleLimit +} + +// GetLeaderSchedulePolicy is to get leader schedule policy. +func (o *PersistConfig) GetLeaderSchedulePolicy() constant.SchedulePolicy { + return constant.StringToSchedulePolicy(o.GetScheduleConfig().LeaderSchedulePolicy) +} + // GetMaxStoreDownTime returns the max store downtime. func (o *PersistConfig) GetMaxStoreDownTime() time.Duration { return o.GetScheduleConfig().MaxStoreDownTime.Duration } +// GetIsolationLevel returns the isolation label for each region. +func (o *PersistConfig) GetIsolationLevel() string { + return o.GetReplicationConfig().IsolationLevel +} + // GetLocationLabels returns the location labels. func (o *PersistConfig) GetLocationLabels() []string { return o.GetReplicationConfig().LocationLabels } -// CheckLabelProperty checks if the label property is satisfied. -func (o *PersistConfig) CheckLabelProperty(typ string, labels []*metapb.StoreLabel) bool { - return false -} - // IsUseJointConsensus returns if the joint consensus is enabled. func (o *PersistConfig) IsUseJointConsensus() bool { - return true + return o.GetScheduleConfig().EnableJointConsensus } // GetKeyType returns the key type. @@ -317,9 +348,14 @@ func (o *PersistConfig) IsOneWayMergeEnabled() bool { return o.GetScheduleConfig().EnableOneWayMerge } -// GetMergeScheduleLimit returns the merge schedule limit. -func (o *PersistConfig) GetMergeScheduleLimit() uint64 { - return o.GetScheduleConfig().MergeScheduleLimit +// GetMaxMergeRegionSize returns the max region size. +func (o *PersistConfig) GetMaxMergeRegionSize() uint64 { + return o.GetScheduleConfig().MaxMergeRegionSize +} + +// GetMaxMergeRegionKeys returns the max region keys. +func (o *PersistConfig) GetMaxMergeRegionKeys() uint64 { + return o.GetScheduleConfig().MaxMergeRegionKeys } // GetRegionScoreFormulaVersion returns the region score formula version. @@ -332,6 +368,96 @@ func (o *PersistConfig) GetSchedulerMaxWaitingOperator() uint64 { return o.GetScheduleConfig().SchedulerMaxWaitingOperator } +// GetHotRegionCacheHitsThreshold returns the hot region cache hits threshold. +func (o *PersistConfig) GetHotRegionCacheHitsThreshold() int { + return int(o.GetScheduleConfig().HotRegionCacheHitsThreshold) +} + +// GetMaxMovableHotPeerSize returns the max movable hot peer size. +func (o *PersistConfig) GetMaxMovableHotPeerSize() int64 { + return o.GetScheduleConfig().MaxMovableHotPeerSize +} + +// GetSwitchWitnessInterval returns the interval between promote to non-witness and starting to switch to witness. +func (o *PersistConfig) GetSwitchWitnessInterval() time.Duration { + return o.GetScheduleConfig().SwitchWitnessInterval.Duration +} + +// GetSplitMergeInterval returns the interval between finishing split and starting to merge. +func (o *PersistConfig) GetSplitMergeInterval() time.Duration { + return o.GetScheduleConfig().SplitMergeInterval.Duration +} + +// GetSlowStoreEvictingAffectedStoreRatioThreshold returns the affected ratio threshold when judging a store is slow. +func (o *PersistConfig) GetSlowStoreEvictingAffectedStoreRatioThreshold() float64 { + return o.GetScheduleConfig().SlowStoreEvictingAffectedStoreRatioThreshold +} + +// GetPatrolRegionInterval returns the interval of patrolling region. +func (o *PersistConfig) GetPatrolRegionInterval() time.Duration { + return o.GetScheduleConfig().PatrolRegionInterval.Duration +} + +// GetTolerantSizeRatio gets the tolerant size ratio. +func (o *PersistConfig) GetTolerantSizeRatio() float64 { + return o.GetScheduleConfig().TolerantSizeRatio +} + +// GetWitnessScheduleLimit returns the limit for region schedule. +func (o *PersistConfig) GetWitnessScheduleLimit() uint64 { + return o.GetScheduleConfig().WitnessScheduleLimit +} + +// IsDebugMetricsEnabled returns if debug metrics is enabled. +func (o *PersistConfig) IsDebugMetricsEnabled() bool { + return o.GetScheduleConfig().EnableDebugMetrics +} + +// IsDiagnosticAllowed returns whether is enable to use diagnostic. +func (o *PersistConfig) IsDiagnosticAllowed() bool { + return o.GetScheduleConfig().EnableDiagnostic +} + +// IsRemoveDownReplicaEnabled returns if remove down replica is enabled. +func (o *PersistConfig) IsRemoveDownReplicaEnabled() bool { + return o.GetScheduleConfig().EnableRemoveDownReplica +} + +// IsReplaceOfflineReplicaEnabled returns if replace offline replica is enabled. +func (o *PersistConfig) IsReplaceOfflineReplicaEnabled() bool { + return o.GetScheduleConfig().EnableReplaceOfflineReplica +} + +// IsMakeUpReplicaEnabled returns if make up replica is enabled. +func (o *PersistConfig) IsMakeUpReplicaEnabled() bool { + return o.GetScheduleConfig().EnableMakeUpReplica +} + +// IsRemoveExtraReplicaEnabled returns if remove extra replica is enabled. +func (o *PersistConfig) IsRemoveExtraReplicaEnabled() bool { + return o.GetScheduleConfig().EnableRemoveExtraReplica +} + +// IsLocationReplacementEnabled returns if location replace is enabled. +func (o *PersistConfig) IsLocationReplacementEnabled() bool { + return o.GetScheduleConfig().EnableLocationReplacement +} + +// IsWitnessAllowed returns if the witness is allowed. +func (o *PersistConfig) IsWitnessAllowed() bool { + return o.GetScheduleConfig().EnableWitness +} + +// IsPlacementRulesCacheEnabled returns if the placement rules cache is enabled. +func (o *PersistConfig) IsPlacementRulesCacheEnabled() bool { + return o.GetReplicationConfig().EnablePlacementRulesCache +} + +// IsSchedulingHalted returns if PD scheduling is halted. +func (o *PersistConfig) IsSchedulingHalted() bool { + return o.GetScheduleConfig().HaltScheduling +} + // GetStoreLimitByType returns the limit of a store with a given type. func (o *PersistConfig) GetStoreLimitByType(storeID uint64, typ storelimit.Type) (returned float64) { limit := o.GetStoreLimit(storeID) @@ -364,18 +490,103 @@ func (o *PersistConfig) GetStoreLimit(storeID uint64) (returnSC sc.StoreLimitCon return o.GetScheduleConfig().StoreLimit[storeID] } -// IsWitnessAllowed returns if the witness is allowed. -func (o *PersistConfig) IsWitnessAllowed() bool { - return false +// SetAllStoresLimit sets all store limit for a given type and rate. +func (o *PersistConfig) SetAllStoresLimit(typ storelimit.Type, ratePerMin float64) { + v := o.GetScheduleConfig().Clone() + switch typ { + case storelimit.AddPeer: + sc.DefaultStoreLimit.SetDefaultStoreLimit(storelimit.AddPeer, ratePerMin) + for storeID := range v.StoreLimit { + sc := sc.StoreLimitConfig{AddPeer: ratePerMin, RemovePeer: v.StoreLimit[storeID].RemovePeer} + v.StoreLimit[storeID] = sc + } + case storelimit.RemovePeer: + sc.DefaultStoreLimit.SetDefaultStoreLimit(storelimit.RemovePeer, ratePerMin) + for storeID := range v.StoreLimit { + sc := sc.StoreLimitConfig{AddPeer: v.StoreLimit[storeID].AddPeer, RemovePeer: ratePerMin} + v.StoreLimit[storeID] = sc + } + } + + o.SetScheduleConfig(v) } -// IsPlacementRulesCacheEnabled returns if the placement rules cache is enabled. -func (o *PersistConfig) IsPlacementRulesCacheEnabled() bool { +// SetMaxReplicas sets the number of replicas for each region. +func (o *PersistConfig) SetMaxReplicas(replicas int) { + v := o.GetReplicationConfig().Clone() + v.MaxReplicas = uint64(replicas) + o.SetReplicationConfig(v) +} + +// IsSchedulerDisabled returns if the scheduler is disabled. +func (o *PersistConfig) IsSchedulerDisabled(t string) bool { + schedulers := o.GetScheduleConfig().Schedulers + for _, s := range schedulers { + if t == s.Type { + return s.Disable + } + } return false } // SetPlacementRulesCacheEnabled sets if the placement rules cache is enabled. -func (o *PersistConfig) SetPlacementRulesCacheEnabled(b bool) {} +func (o *PersistConfig) SetPlacementRulesCacheEnabled(enabled bool) { + v := o.GetReplicationConfig().Clone() + v.EnablePlacementRulesCache = enabled + o.SetReplicationConfig(v) +} // SetEnableWitness sets if the witness is enabled. -func (o *PersistConfig) SetEnableWitness(b bool) {} +func (o *PersistConfig) SetEnableWitness(enable bool) { + v := o.GetScheduleConfig().Clone() + v.EnableWitness = enable + o.SetScheduleConfig(v) +} + +// SetPlacementRuleEnabled set PlacementRuleEnabled +func (o *PersistConfig) SetPlacementRuleEnabled(enabled bool) { + v := o.GetReplicationConfig().Clone() + v.EnablePlacementRules = enabled + o.SetReplicationConfig(v) +} + +// SetSplitMergeInterval to set the interval between finishing split and starting to merge. It's only used to test. +func (o *PersistConfig) SetSplitMergeInterval(splitMergeInterval time.Duration) { + v := o.GetScheduleConfig().Clone() + v.SplitMergeInterval = typeutil.Duration{Duration: splitMergeInterval} + o.SetScheduleConfig(v) +} + +// SetHaltScheduling set HaltScheduling. +func (o *PersistConfig) SetHaltScheduling(halt bool, source string) { + v := o.GetScheduleConfig().Clone() + v.HaltScheduling = halt + o.SetScheduleConfig(v) +} + +// TODO: implement the following methods + +// AddSchedulerCfg adds the scheduler configurations. +func (o *PersistConfig) AddSchedulerCfg(string, []string) {} + +// CheckLabelProperty checks if the label property is satisfied. +func (o *PersistConfig) CheckLabelProperty(typ string, labels []*metapb.StoreLabel) bool { + return false +} + +// IsTraceRegionFlow returns if the region flow is tracing. +// If the accuracy cannot reach 0.1 MB, it is considered not. +func (o *PersistConfig) IsTraceRegionFlow() bool { + return false +} + +// Persist saves the configuration to the storage. +func (o *PersistConfig) Persist(storage endpoint.ConfigStorage) error { + return nil +} + +// RemoveSchedulerCfg removes the scheduler configurations. +func (o *PersistConfig) RemoveSchedulerCfg(tp string) {} + +// UseRaftV2 set some config for raft store v2 by default temporary. +func (o *PersistConfig) UseRaftV2() {} diff --git a/pkg/mcs/scheduling/server/config/watcher.go b/pkg/mcs/scheduling/server/config/watcher.go index 32b8c6cc508..81ec4b62f0c 100644 --- a/pkg/mcs/scheduling/server/config/watcher.go +++ b/pkg/mcs/scheduling/server/config/watcher.go @@ -23,7 +23,6 @@ import ( "github.com/pingcap/log" sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/utils/etcdutil" - "github.com/tikv/pd/server/config" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/zap" @@ -46,7 +45,7 @@ type persistedConfig struct { ClusterVersion semver.Version `json:"cluster-version"` Schedule sc.ScheduleConfig `json:"schedule"` Replication sc.ReplicationConfig `json:"replication"` - Store config.StoreConfig `json:"store"` + Store sc.StoreConfig `json:"store"` } // NewWatcher creates a new watcher to watch the config meta change from PD API server. diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index b83c4c13ef5..c721b4238a1 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -44,6 +44,8 @@ import ( "github.com/tikv/pd/pkg/mcs/scheduling/server/rule" "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/member" + "github.com/tikv/pd/pkg/schedule" + "github.com/tikv/pd/pkg/schedule/hbstream" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/etcdutil" @@ -107,8 +109,11 @@ type Server struct { serviceID *discovery.ServiceRegistryEntry serviceRegister *discovery.ServiceRegister - cluster *Cluster - storage *endpoint.StorageEndpoint + cluster *Cluster + hbStreams *hbstream.HeartbeatStreams + storage *endpoint.StorageEndpoint + + coordinator *schedule.Coordinator // for watching the PD API server meta info updates that are related to the scheduling. configWatcher *config.Watcher @@ -487,6 +492,8 @@ func (s *Server) startServer() (err error) { if err != nil { return err } + s.hbStreams = hbstream.NewHeartbeatStreams(s.ctx, s.clusterID, s.cluster.GetBasicCluster()) + s.coordinator = schedule.NewCoordinator(s.ctx, s.cluster, s.hbStreams) tlsConfig, err := s.cfg.Security.ToTLSConfig() if err != nil { return err diff --git a/pkg/mock/mockcluster/config.go b/pkg/mock/mockcluster/config.go index d8d583dd2f2..6febba026e8 100644 --- a/pkg/mock/mockcluster/config.go +++ b/pkg/mock/mockcluster/config.go @@ -20,7 +20,6 @@ import ( sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/utils/typeutil" - "github.com/tikv/pd/server/config" ) // SetMaxMergeRegionSize updates the MaxMergeRegionSize configuration. @@ -173,15 +172,15 @@ func (mc *Cluster) SetMaxReplicasWithLabel(enablePlacementRules bool, num int, l // SetRegionMaxSize sets the region max size. func (mc *Cluster) SetRegionMaxSize(v string) { - mc.updateStoreConfig(func(r *config.StoreConfig) { r.RegionMaxSize = v }) + mc.updateStoreConfig(func(r *sc.StoreConfig) { r.RegionMaxSize = v }) } // SetRegionSizeMB sets the region max size. func (mc *Cluster) SetRegionSizeMB(v uint64) { - mc.updateStoreConfig(func(r *config.StoreConfig) { r.RegionMaxSizeMB = v }) + mc.updateStoreConfig(func(r *sc.StoreConfig) { r.RegionMaxSizeMB = v }) } -func (mc *Cluster) updateStoreConfig(f func(*config.StoreConfig)) { +func (mc *Cluster) updateStoreConfig(f func(*sc.StoreConfig)) { r := mc.PersistOptions.GetStoreConfig().Clone() f(r) mc.SetStoreConfig(r) diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index bbbf4c98375..a0417a863d3 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -112,11 +112,6 @@ func (mc *Cluster) AllocID() (uint64, error) { return mc.IDAllocator.Alloc() } -// GetPersistOptions returns the persist options. -func (mc *Cluster) GetPersistOptions() *config.PersistOptions { - return mc.PersistOptions -} - // UpdateRegionsLabelLevelStats updates the label level stats for the regions. func (mc *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {} diff --git a/pkg/schedule/config/config_provider.go b/pkg/schedule/config/config_provider.go index a4f8deb8c15..b9bb2757663 100644 --- a/pkg/schedule/config/config_provider.go +++ b/pkg/schedule/config/config_provider.go @@ -68,6 +68,9 @@ type SchedulerConfigProvider interface { IsDebugMetricsEnabled() bool IsDiagnosticAllowed() bool GetSlowStoreEvictingAffectedStoreRatioThreshold() float64 + + GetScheduleConfig() *ScheduleConfig + SetScheduleConfig(*ScheduleConfig) } // CheckerConfigProvider is the interface for checker configurations. @@ -111,6 +114,7 @@ type SharedConfigProvider interface { GetStoreLimitByType(uint64, storelimit.Type) float64 IsWitnessAllowed() bool IsPlacementRulesCacheEnabled() bool + SetHaltScheduling(bool, string) // for test purpose SetPlacementRulesCacheEnabled(bool) @@ -126,7 +130,7 @@ type ConfProvider interface { SetPlacementRuleEnabled(bool) SetSplitMergeInterval(time.Duration) SetMaxReplicas(int) - SetAllStoresLimit(typ storelimit.Type, ratePerMin float64) + SetAllStoresLimit(storelimit.Type, float64) // only for store configuration UseRaftV2() } diff --git a/server/config/store_config.go b/pkg/schedule/config/store_config.go similarity index 100% rename from server/config/store_config.go rename to pkg/schedule/config/store_config.go diff --git a/server/config/store_config_test.go b/pkg/schedule/config/store_config_test.go similarity index 100% rename from server/config/store_config_test.go rename to pkg/schedule/config/store_config_test.go diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 76334dc02ba..58fd3093299 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -89,14 +89,15 @@ func NewCoordinator(ctx context.Context, cluster sche.ClusterInformer, hbStreams ctx, cancel := context.WithCancel(ctx) opController := operator.NewController(ctx, cluster.GetBasicCluster(), cluster.GetSharedConfig(), hbStreams) schedulers := schedulers.NewController(ctx, cluster, cluster.GetStorage(), opController) + checkers := checker.NewController(ctx, cluster, cluster.GetCheckerConfig(), cluster.GetRuleManager(), cluster.GetRegionLabeler(), opController) return &Coordinator{ ctx: ctx, cancel: cancel, cluster: cluster, prepareChecker: newPrepareChecker(), - checkers: checker.NewController(ctx, cluster, cluster.GetCheckerConfig(), cluster.GetRuleManager(), cluster.GetRegionLabeler(), opController), - regionScatterer: scatter.NewRegionScatterer(ctx, cluster, opController), - regionSplitter: splitter.NewRegionSplitter(cluster, splitter.NewSplitRegionsHandler(cluster, opController)), + checkers: checkers, + regionScatterer: scatter.NewRegionScatterer(ctx, cluster, opController, checkers.AddSuspectRegions), + regionSplitter: splitter.NewRegionSplitter(cluster, splitter.NewSplitRegionsHandler(cluster, opController), checkers.AddSuspectRegions), schedulers: schedulers, opController: opController, hbStreams: hbStreams, @@ -168,7 +169,7 @@ func (c *Coordinator) PatrolRegions() { } func (c *Coordinator) isSchedulingHalted() bool { - return c.cluster.GetPersistOptions().IsSchedulingHalted() + return c.cluster.GetSchedulerConfig().IsSchedulingHalted() } func (c *Coordinator) checkRegions(startKey []byte) (key []byte, regions []*core.RegionInfo) { @@ -375,7 +376,7 @@ func (c *Coordinator) initSchedulers() { log.Fatal("cannot load schedulers' config", errs.ZapError(err)) } - scheduleCfg := c.cluster.GetPersistOptions().GetScheduleConfig().Clone() + scheduleCfg := c.cluster.GetSchedulerConfig().GetScheduleConfig().Clone() // The new way to create scheduler with the independent configuration. for i, name := range scheduleNames { data := configs[i] @@ -434,8 +435,8 @@ func (c *Coordinator) initSchedulers() { // Removes the invalid scheduler config and persist. scheduleCfg.Schedulers = scheduleCfg.Schedulers[:k] - c.cluster.GetPersistOptions().SetScheduleConfig(scheduleCfg) - if err := c.cluster.GetPersistOptions().Persist(c.cluster.GetStorage()); err != nil { + c.cluster.GetSchedulerConfig().SetScheduleConfig(scheduleCfg) + if err := c.cluster.GetSchedulerConfig().Persist(c.cluster.GetStorage()); err != nil { log.Error("cannot persist schedule config", errs.ZapError(err)) } } diff --git a/pkg/schedule/core/cluster_informer.go b/pkg/schedule/core/cluster_informer.go index 388d9dcf3cd..63dacd0c30d 100644 --- a/pkg/schedule/core/cluster_informer.go +++ b/pkg/schedule/core/cluster_informer.go @@ -22,18 +22,15 @@ import ( "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/statistics/buckets" "github.com/tikv/pd/pkg/storage" - "github.com/tikv/pd/server/config" ) // ClusterInformer provides the necessary information of a cluster. type ClusterInformer interface { SchedulerCluster CheckerCluster - ScatterCluster GetStorage() storage.Storage UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) - GetPersistOptions() *config.PersistOptions } // SchedulerCluster is an aggregate interface that wraps multiple interfaces @@ -56,13 +53,6 @@ type CheckerCluster interface { GetStoreConfig() sc.StoreConfigProvider } -// ScatterCluster is an aggregate interface that wraps multiple interfaces -type ScatterCluster interface { - SharedCluster - - AddSuspectRegions(ids ...uint64) -} - // SharedCluster is an aggregate interface that wraps multiple interfaces type SharedCluster interface { BasicCluster diff --git a/pkg/schedule/scatter/region_scatterer.go b/pkg/schedule/scatter/region_scatterer.go index 75b06316294..c31461eb06f 100644 --- a/pkg/schedule/scatter/region_scatterer.go +++ b/pkg/schedule/scatter/region_scatterer.go @@ -136,22 +136,24 @@ func (s *selectedStores) getDistributionByGroupLocked(group string) (map[uint64] // RegionScatterer scatters regions. type RegionScatterer struct { - ctx context.Context - name string - cluster sche.ScatterCluster - ordinaryEngine engineContext - specialEngines sync.Map - opController *operator.Controller + ctx context.Context + name string + cluster sche.SharedCluster + ordinaryEngine engineContext + specialEngines sync.Map + opController *operator.Controller + addSuspectRegions func(regionIDs ...uint64) } // NewRegionScatterer creates a region scatterer. // RegionScatter is used for the `Lightning`, it will scatter the specified regions before import data. -func NewRegionScatterer(ctx context.Context, cluster sche.ScatterCluster, opController *operator.Controller) *RegionScatterer { +func NewRegionScatterer(ctx context.Context, cluster sche.SharedCluster, opController *operator.Controller, addSuspectRegions func(regionIDs ...uint64)) *RegionScatterer { return &RegionScatterer{ - ctx: ctx, - name: regionScatterName, - cluster: cluster, - opController: opController, + ctx: ctx, + name: regionScatterName, + cluster: cluster, + opController: opController, + addSuspectRegions: addSuspectRegions, ordinaryEngine: newEngineContext(ctx, func() filter.Filter { return filter.NewEngineFilter(regionScatterName, filter.NotSpecialEngines) }), @@ -283,7 +285,7 @@ func (r *RegionScatterer) scatterRegions(regions map[uint64]*core.RegionInfo, fa // in a group level instead of cluster level. func (r *RegionScatterer) Scatter(region *core.RegionInfo, group string, skipStoreLimit bool) (*operator.Operator, error) { if !filter.IsRegionReplicated(r.cluster, region) { - r.cluster.AddSuspectRegions(region.GetID()) + r.addSuspectRegions(region.GetID()) scatterSkipNotReplicatedCounter.Inc() log.Warn("region not replicated during scatter", zap.Uint64("region-id", region.GetID())) return nil, errors.Errorf("region %d is not fully replicated", region.GetID()) diff --git a/pkg/schedule/scatter/region_scatterer_test.go b/pkg/schedule/scatter/region_scatterer_test.go index f4ee5953e6f..c0724e481f6 100644 --- a/pkg/schedule/scatter/region_scatterer_test.go +++ b/pkg/schedule/scatter/region_scatterer_test.go @@ -105,7 +105,7 @@ func scatter(re *require.Assertions, numStores, numRegions uint64, useRules bool // region distributed in same stores. tc.AddLeaderRegion(i, 1, 2, 3) } - scatterer := NewRegionScatterer(ctx, tc, oc) + scatterer := NewRegionScatterer(ctx, tc, oc, tc.AddSuspectRegions) for i := uint64(1); i <= numRegions; i++ { region := tc.GetRegion(i) @@ -175,7 +175,7 @@ func scatterSpecial(re *require.Assertions, numOrdinaryStores, numSpecialStores, []uint64{numOrdinaryStores + 1, numOrdinaryStores + 2, numOrdinaryStores + 3}, ) } - scatterer := NewRegionScatterer(ctx, tc, oc) + scatterer := NewRegionScatterer(ctx, tc, oc, tc.AddSuspectRegions) for i := uint64(1); i <= numRegions; i++ { region := tc.GetRegion(i) @@ -242,7 +242,7 @@ func TestStoreLimit(t *testing.T) { tc.AddLeaderRegion(i, seq.next(), seq.next(), seq.next()) } - scatterer := NewRegionScatterer(ctx, tc, oc) + scatterer := NewRegionScatterer(ctx, tc, oc, tc.AddSuspectRegions) for i := uint64(1); i <= 5; i++ { region := tc.GetRegion(i) @@ -287,7 +287,7 @@ func TestScatterCheck(t *testing.T) { } for _, testCase := range testCases { t.Log(testCase.name) - scatterer := NewRegionScatterer(ctx, tc, oc) + scatterer := NewRegionScatterer(ctx, tc, oc, tc.AddSuspectRegions) _, err := scatterer.Scatter(testCase.checkRegion, "", false) if testCase.needFix { re.Error(err) @@ -328,7 +328,7 @@ func TestSomeStoresFilteredScatterGroupInConcurrency(t *testing.T) { tc.SetStoreLastHeartbeatInterval(i, 40*time.Minute) } re.Equal(tc.GetStore(uint64(6)).IsDisconnected(), true) - scatterer := NewRegionScatterer(ctx, tc, oc) + scatterer := NewRegionScatterer(ctx, tc, oc, tc.AddSuspectRegions) var wg sync.WaitGroup for j := 0; j < 10; j++ { wg.Add(1) @@ -382,7 +382,7 @@ func TestScatterGroupInConcurrency(t *testing.T) { // We send scatter interweave request for each group to simulate scattering multiple region groups in concurrency. for _, testCase := range testCases { t.Log(testCase.name) - scatterer := NewRegionScatterer(ctx, tc, oc) + scatterer := NewRegionScatterer(ctx, tc, oc, tc.AddSuspectRegions) regionID := 1 for i := 0; i < 100; i++ { for j := 0; j < testCase.groupCount; j++ { @@ -433,7 +433,7 @@ func TestScatterForManyRegion(t *testing.T) { tc.SetStoreLastHeartbeatInterval(i, -10*time.Minute) } - scatterer := NewRegionScatterer(ctx, tc, oc) + scatterer := NewRegionScatterer(ctx, tc, oc, tc.AddSuspectRegions) regions := make(map[uint64]*core.RegionInfo) for i := 1; i <= 1200; i++ { regions[uint64(i)] = tc.AddLightWeightLeaderRegion(uint64(i), 1, 2, 3) @@ -475,7 +475,7 @@ func TestScattersGroup(t *testing.T) { for id, testCase := range testCases { group := fmt.Sprintf("gourp-%d", id) t.Log(testCase.name) - scatterer := NewRegionScatterer(ctx, tc, oc) + scatterer := NewRegionScatterer(ctx, tc, oc, tc.AddSuspectRegions) regions := map[uint64]*core.RegionInfo{} for i := 1; i <= 100; i++ { regions[uint64(i)] = tc.AddLightWeightLeaderRegion(uint64(i), 1, 2, 3) @@ -548,7 +548,7 @@ func TestRegionFromDifferentGroups(t *testing.T) { for i := uint64(1); i <= uint64(storeCount); i++ { tc.AddRegionStore(i, 0) } - scatterer := NewRegionScatterer(ctx, tc, oc) + scatterer := NewRegionScatterer(ctx, tc, oc, tc.AddSuspectRegions) regionCount := 50 for i := 1; i <= regionCount; i++ { p := rand.Perm(storeCount) @@ -614,7 +614,7 @@ func TestRegionHasLearner(t *testing.T) { }, }, }) - scatterer := NewRegionScatterer(ctx, tc, oc) + scatterer := NewRegionScatterer(ctx, tc, oc, tc.AddSuspectRegions) regionCount := 50 for i := 1; i <= regionCount; i++ { _, err := scatterer.Scatter(tc.AddRegionWithLearner(uint64(i), uint64(1), []uint64{uint64(2), uint64(3)}, []uint64{7}), "group", false) @@ -674,7 +674,7 @@ func TestSelectedStoresTooFewPeers(t *testing.T) { tc.SetStoreLastHeartbeatInterval(i, -10*time.Minute) } group := "group" - scatterer := NewRegionScatterer(ctx, tc, oc) + scatterer := NewRegionScatterer(ctx, tc, oc, tc.AddSuspectRegions) // Put a lot of regions in Store 1/2/3. for i := uint64(1); i < 100; i++ { @@ -711,7 +711,7 @@ func TestSelectedStoresTooManyPeers(t *testing.T) { tc.SetStoreLastHeartbeatInterval(i, -10*time.Minute) } group := "group" - scatterer := NewRegionScatterer(ctx, tc, oc) + scatterer := NewRegionScatterer(ctx, tc, oc, tc.AddSuspectRegions) // priority 4 > 1 > 5 > 2 == 3 for i := 0; i < 1200; i++ { scatterer.ordinaryEngine.selectedPeer.Put(2, group) @@ -749,7 +749,7 @@ func TestBalanceRegion(t *testing.T) { tc.SetStoreLastHeartbeatInterval(i, -10*time.Minute) } group := "group" - scatterer := NewRegionScatterer(ctx, tc, oc) + scatterer := NewRegionScatterer(ctx, tc, oc, tc.AddSuspectRegions) for i := uint64(1001); i <= 1300; i++ { region := tc.AddLeaderRegion(i, 2, 4, 6) op := scatterer.scatterRegion(region, group, false) @@ -807,7 +807,7 @@ func TestRemoveStoreLimit(t *testing.T) { tc.AddLeaderRegion(i, seq.next(), seq.next(), seq.next()) } - scatterer := NewRegionScatterer(ctx, tc, oc) + scatterer := NewRegionScatterer(ctx, tc, oc, tc.AddSuspectRegions) for i := uint64(1); i <= 5; i++ { region := tc.GetRegion(i) diff --git a/pkg/schedule/splitter/region_splitter.go b/pkg/schedule/splitter/region_splitter.go index f20346d5b59..f0da8442a2c 100644 --- a/pkg/schedule/splitter/region_splitter.go +++ b/pkg/schedule/splitter/region_splitter.go @@ -56,15 +56,17 @@ func NewSplitRegionsHandler(cluster sche.ClusterInformer, oc *operator.Controlle // RegionSplitter handles split regions type RegionSplitter struct { - cluster sche.ClusterInformer - handler SplitRegionsHandler + cluster sche.ClusterInformer + handler SplitRegionsHandler + addSuspectRegions func(ids ...uint64) } // NewRegionSplitter return a region splitter -func NewRegionSplitter(cluster sche.ClusterInformer, handler SplitRegionsHandler) *RegionSplitter { +func NewRegionSplitter(cluster sche.ClusterInformer, handler SplitRegionsHandler, addSuspectRegions func(ids ...uint64)) *RegionSplitter { return &RegionSplitter{ - cluster: cluster, - handler: handler, + cluster: cluster, + handler: handler, + addSuspectRegions: addSuspectRegions, } } @@ -170,7 +172,7 @@ func (r *RegionSplitter) groupKeysByRegion(keys [][]byte) map[uint64]*regionGrou func (r *RegionSplitter) checkRegionValid(region *core.RegionInfo) bool { if !filter.IsRegionReplicated(r.cluster, region) { - r.cluster.AddSuspectRegions(region.GetID()) + r.addSuspectRegions(region.GetID()) return false } if region.GetLeader() == nil { diff --git a/pkg/schedule/splitter/region_splitter_test.go b/pkg/schedule/splitter/region_splitter_test.go index f293446e6cd..8753d8bf2ec 100644 --- a/pkg/schedule/splitter/region_splitter_test.go +++ b/pkg/schedule/splitter/region_splitter_test.go @@ -86,7 +86,7 @@ func (suite *regionSplitterTestSuite) TestRegionSplitter() { tc := mockcluster.NewCluster(suite.ctx, opt) handler := newMockSplitRegionsHandler() tc.AddLeaderRegionWithRange(1, "eee", "hhh", 2, 3, 4) - splitter := NewRegionSplitter(tc, handler) + splitter := NewRegionSplitter(tc, handler, tc.AddSuspectRegions) newRegions := map[uint64]struct{}{} // assert success failureKeys := splitter.splitRegionsByKeys(suite.ctx, [][]byte{[]byte("fff"), []byte("ggg")}, newRegions) @@ -115,7 +115,7 @@ func (suite *regionSplitterTestSuite) TestGroupKeysByRegion() { tc.AddLeaderRegionWithRange(1, "aaa", "ccc", 2, 3, 4) tc.AddLeaderRegionWithRange(2, "ccc", "eee", 2, 3, 4) tc.AddLeaderRegionWithRange(3, "fff", "ggg", 2, 3, 4) - splitter := NewRegionSplitter(tc, handler) + splitter := NewRegionSplitter(tc, handler, tc.AddSuspectRegions) groupKeys := splitter.groupKeysByRegion([][]byte{ []byte("bbb"), []byte("ddd"), diff --git a/pkg/unsaferecovery/unsafe_recovery_controller.go b/pkg/unsaferecovery/unsafe_recovery_controller.go index 8d5f1b473fc..675f76b91a9 100644 --- a/pkg/unsaferecovery/unsafe_recovery_controller.go +++ b/pkg/unsaferecovery/unsafe_recovery_controller.go @@ -31,9 +31,9 @@ import ( "github.com/tikv/pd/pkg/codec" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" + sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/syncutil" - "github.com/tikv/pd/server/config" "go.uber.org/zap" ) @@ -109,7 +109,7 @@ type cluster interface { DropCacheAllRegion() AllocID() (uint64, error) BuryStore(storeID uint64, forceBury bool) error - GetPersistOptions() *config.PersistOptions + GetSchedulerConfig() sc.SchedulerConfigProvider } // Controller is used to control the unsafe recovery process. @@ -495,7 +495,7 @@ func (u *Controller) changeStage(stage stage) { u.stage = stage // Halt and resume the scheduling once the running state changed. running := isRunning(stage) - if opt := u.cluster.GetPersistOptions(); opt.IsSchedulingHalted() != running { + if opt := u.cluster.GetSchedulerConfig(); opt.IsSchedulingHalted() != running { opt.SetHaltScheduling(running, "online-unsafe-recovery") } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index fd4ef1ad295..68578eb8a0a 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -261,11 +261,11 @@ func (c *RaftCluster) loadBootstrapTime() (time.Time, error) { // InitCluster initializes the raft cluster. func (c *RaftCluster) InitCluster( id id.Allocator, - opt *config.PersistOptions, + opt sc.ConfProvider, storage storage.Storage, basicCluster *core.BasicCluster, keyspaceGroupManager *keyspace.GroupManager) { - c.core, c.opt, c.storage, c.id = basicCluster, opt, storage, id + c.core, c.opt, c.storage, c.id = basicCluster, opt.(*config.PersistOptions), storage, id c.ctx, c.cancel = context.WithCancel(c.serverCtx) c.labelLevelStats = statistics.NewLabelStatistics() c.hotStat = statistics.NewHotStat(c.ctx) @@ -506,19 +506,19 @@ func (c *RaftCluster) observeStoreConfig(ctx context.Context, address string) (b } // updateStoreConfig updates the store config. This is extracted for testing. -func (c *RaftCluster) updateStoreConfig(oldCfg, cfg *config.StoreConfig) (bool, error) { +func (c *RaftCluster) updateStoreConfig(oldCfg, cfg *sc.StoreConfig) (bool, error) { cfg.Adjust() c.opt.SetStoreConfig(cfg) - return oldCfg.Storage.Engine != config.RaftstoreV2 && cfg.Storage.Engine == config.RaftstoreV2, nil + return oldCfg.Storage.Engine != sc.RaftstoreV2 && cfg.Storage.Engine == sc.RaftstoreV2, nil } // fetchStoreConfigFromTiKV tries to fetch the config from the TiKV store URL. -func (c *RaftCluster) fetchStoreConfigFromTiKV(ctx context.Context, statusAddress string) (*config.StoreConfig, error) { - cfg := &config.StoreConfig{} +func (c *RaftCluster) fetchStoreConfigFromTiKV(ctx context.Context, statusAddress string) (*sc.StoreConfig, error) { + cfg := &sc.StoreConfig{} failpoint.Inject("mockFetchStoreConfigFromTiKV", func(val failpoint.Value) { if regionMaxSize, ok := val.(string); ok { cfg.RegionMaxSize = regionMaxSize - cfg.Storage.Engine = config.RaftstoreV2 + cfg.Storage.Engine = sc.RaftstoreV2 } failpoint.Return(cfg, nil) }) @@ -836,11 +836,6 @@ func (c *RaftCluster) GetOpts() sc.ConfProvider { return c.opt } -// GetPersistOptions returns cluster's configuration. -func (c *RaftCluster) GetPersistOptions() *config.PersistOptions { - return c.opt -} - // GetScheduleConfig returns scheduling configurations. func (c *RaftCluster) GetScheduleConfig() *sc.ScheduleConfig { return c.opt.GetScheduleConfig() @@ -1064,7 +1059,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { if err != nil { return err } - region.Inherit(origin, c.GetPersistOptions().GetStoreConfig().IsEnableRegionBucket()) + region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket()) c.hotStat.CheckWriteAsync(statistics.NewCheckExpiredItemTask(region)) c.hotStat.CheckReadAsync(statistics.NewCheckExpiredItemTask(region)) diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 10ea4dd9dbd..8c7c46b6e08 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -1343,7 +1343,7 @@ func TestStoreConfigUpdate(t *testing.T) { "consistency-check-method": "mvcc", "perf-level": 2 }}` - var config config.StoreConfig + var config sc.StoreConfig re.NoError(json.Unmarshal([]byte(body), &config)) tc.updateStoreConfig(opt.GetStoreConfig(), &config) re.Equal(uint64(144000000), opt.GetRegionMaxKeys()) @@ -1354,7 +1354,7 @@ func TestStoreConfigUpdate(t *testing.T) { // Case2: empty config. { body := `{}` - var config config.StoreConfig + var config sc.StoreConfig re.NoError(json.Unmarshal([]byte(body), &config)) tc.updateStoreConfig(opt.GetStoreConfig(), &config) re.Equal(uint64(1440000), opt.GetRegionMaxKeys()) @@ -1380,7 +1380,7 @@ func TestStoreConfigUpdate(t *testing.T) { "storage":{ "engine":"raft-kv2" }}` - var config config.StoreConfig + var config sc.StoreConfig re.NoError(json.Unmarshal([]byte(body), &config)) tc.updateStoreConfig(opt.GetStoreConfig(), &config) re.Equal(uint64(96), opt.GetRegionBucketSize()) @@ -1400,7 +1400,7 @@ func TestSyncConfigContext(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { time.Sleep(time.Second * 100) - cfg := &config.StoreConfig{} + cfg := &sc.StoreConfig{} b, err := json.Marshal(cfg) if err != nil { res.WriteHeader(http.StatusInternalServerError) @@ -1458,7 +1458,7 @@ func TestStoreConfigSync(t *testing.T) { re.Empty(opt.GetStoreConfig()) err = opt.Reload(tc.GetStorage()) re.NoError(err) - re.Equal(tc.GetPersistOptions().GetStoreConfig(), opt.GetStoreConfig()) + re.Equal(tc.GetOpts().(*config.PersistOptions).GetStoreConfig(), opt.GetStoreConfig()) } func TestUpdateStorePendingPeerCount(t *testing.T) { @@ -3089,7 +3089,7 @@ func TestPersistScheduler(t *testing.T) { re.NoError(controller.RemoveScheduler(schedulers.BalanceWitnessName)) re.NoError(controller.RemoveScheduler(schedulers.TransferWitnessLeaderName)) re.Len(controller.GetSchedulerNames(), defaultCount-3) - re.NoError(co.GetCluster().GetPersistOptions().Persist(storage)) + re.NoError(co.GetCluster().GetSchedulerConfig().Persist(storage)) co.Stop() co.GetSchedulersController().Wait() co.GetWaitGroup().Wait() @@ -3142,12 +3142,12 @@ func TestPersistScheduler(t *testing.T) { // the scheduler option should contain 6 items // the `hot scheduler` are disabled - re.Len(co.GetCluster().GetPersistOptions().GetSchedulers(), defaultCount+3) + re.Len(co.GetCluster().GetSchedulerConfig().(*config.PersistOptions).GetSchedulers(), defaultCount+3) re.NoError(controller.RemoveScheduler(schedulers.GrantLeaderName)) // the scheduler that is not enable by default will be completely deleted - re.Len(co.GetCluster().GetPersistOptions().GetSchedulers(), defaultCount+2) + re.Len(co.GetCluster().GetSchedulerConfig().(*config.PersistOptions).GetSchedulers(), defaultCount+2) re.Len(controller.GetSchedulerNames(), 4) - re.NoError(co.GetCluster().GetPersistOptions().Persist(co.GetCluster().GetStorage())) + re.NoError(co.GetCluster().GetSchedulerConfig().Persist(co.GetCluster().GetStorage())) co.Stop() co.GetSchedulersController().Wait() co.GetWaitGroup().Wait() @@ -3204,7 +3204,7 @@ func TestRemoveScheduler(t *testing.T) { re.NoError(err) re.Empty(sches) re.Empty(controller.GetSchedulerNames()) - re.NoError(co.GetCluster().GetPersistOptions().Persist(co.GetCluster().GetStorage())) + re.NoError(co.GetCluster().GetSchedulerConfig().Persist(co.GetCluster().GetStorage())) co.Stop() co.GetSchedulersController().Wait() co.GetWaitGroup().Wait() @@ -3218,7 +3218,7 @@ func TestRemoveScheduler(t *testing.T) { co.Run() re.Empty(controller.GetSchedulerNames()) // the option remains default scheduler - re.Len(co.GetCluster().GetPersistOptions().GetSchedulers(), defaultCount) + re.Len(co.GetCluster().GetSchedulerConfig().(*config.PersistOptions).GetSchedulers(), defaultCount) co.Stop() co.GetSchedulersController().Wait() co.GetWaitGroup().Wait() diff --git a/server/cluster/store_limiter.go b/server/cluster/store_limiter.go index 4e77590ddc5..b40bcef2eed 100644 --- a/server/cluster/store_limiter.go +++ b/server/cluster/store_limiter.go @@ -16,21 +16,21 @@ package cluster import ( "github.com/tikv/pd/pkg/core/storelimit" + sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/utils/syncutil" - "github.com/tikv/pd/server/config" ) // StoreLimiter adjust the store limit dynamically type StoreLimiter struct { m syncutil.RWMutex - opt *config.PersistOptions + opt sc.ConfProvider scene map[storelimit.Type]*storelimit.Scene state *State current LoadState } // NewStoreLimiter builds a store limiter object using the operator controller -func NewStoreLimiter(opt *config.PersistOptions) *StoreLimiter { +func NewStoreLimiter(opt sc.ConfProvider) *StoreLimiter { defaultScene := map[storelimit.Type]*storelimit.Scene{ storelimit.AddPeer: storelimit.DefaultScene(storelimit.AddPeer), storelimit.RemovePeer: storelimit.DefaultScene(storelimit.RemovePeer), diff --git a/server/config/persist_options.go b/server/config/persist_options.go index caee986f5d6..3b248843e73 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -67,7 +67,7 @@ func NewPersistOptions(cfg *Config) *PersistOptions { o.keyspace.Store(&cfg.Keyspace) // storeConfig will be fetched from TiKV later, // set it to an empty config here first. - o.storeConfig.Store(&StoreConfig{}) + o.storeConfig.Store(&sc.StoreConfig{}) o.SetClusterVersion(&cfg.ClusterVersion) o.ttl = nil return o @@ -134,12 +134,12 @@ func (o *PersistOptions) SetKeyspaceConfig(cfg *KeyspaceConfig) { } // GetStoreConfig returns the store config. -func (o *PersistOptions) GetStoreConfig() *StoreConfig { - return o.storeConfig.Load().(*StoreConfig) +func (o *PersistOptions) GetStoreConfig() *sc.StoreConfig { + return o.storeConfig.Load().(*sc.StoreConfig) } // SetStoreConfig sets the store configuration. -func (o *PersistOptions) SetStoreConfig(cfg *StoreConfig) { +func (o *PersistOptions) SetStoreConfig(cfg *sc.StoreConfig) { o.storeConfig.Store(cfg) } @@ -616,16 +616,16 @@ func (o *PersistOptions) IsRemoveExtraReplicaEnabled() bool { return o.GetScheduleConfig().EnableRemoveExtraReplica } -// IsTikvRegionSplitEnabled returns whether tikv split region is disabled. -func (o *PersistOptions) IsTikvRegionSplitEnabled() bool { - return o.getTTLBoolOr(enableTiKVSplitRegion, o.GetScheduleConfig().EnableTiKVSplitRegion) -} - // IsLocationReplacementEnabled returns if location replace is enabled. func (o *PersistOptions) IsLocationReplacementEnabled() bool { return o.getTTLBoolOr(enableLocationReplacement, o.GetScheduleConfig().EnableLocationReplacement) } +// IsTikvRegionSplitEnabled returns whether tikv split region is disabled. +func (o *PersistOptions) IsTikvRegionSplitEnabled() bool { + return o.getTTLBoolOr(enableTiKVSplitRegion, o.GetScheduleConfig().EnableTiKVSplitRegion) +} + // GetMaxMovableHotPeerSize returns the max movable hot peer size. func (o *PersistOptions) GetMaxMovableHotPeerSize() int64 { return o.GetScheduleConfig().MaxMovableHotPeerSize @@ -762,7 +762,7 @@ func (o *PersistOptions) DeleteLabelProperty(typ, labelKey, labelValue string) { type persistedConfig struct { *Config // StoreConfig is injected into Config to avoid breaking the original API. - StoreConfig StoreConfig `json:"store"` + StoreConfig sc.StoreConfig `json:"store"` } // Persist saves the configuration to the storage. diff --git a/tests/integrations/mcs/scheduling/config_test.go b/tests/integrations/mcs/scheduling/config_test.go index 17cb422f0ae..e2f2eeacfd1 100644 --- a/tests/integrations/mcs/scheduling/config_test.go +++ b/tests/integrations/mcs/scheduling/config_test.go @@ -25,7 +25,6 @@ import ( "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/versioninfo" - severcfg "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" ) @@ -93,12 +92,12 @@ func (suite *configTestSuite) TestConfigWatch() { testutil.Eventually(re, func() bool { return watcher.GetScheduleConfig().SplitMergeInterval.Duration == 2*sc.DefaultSplitMergeInterval }) - persistOpts.SetStoreConfig(&severcfg.StoreConfig{ - Coprocessor: severcfg.Coprocessor{ + persistOpts.SetStoreConfig(&sc.StoreConfig{ + Coprocessor: sc.Coprocessor{ RegionMaxSize: "144MiB", }, - Storage: severcfg.Storage{ - Engine: severcfg.RaftstoreV2, + Storage: sc.Storage{ + Engine: sc.RaftstoreV2, }, }) persistConfig(re, suite.pdLeaderServer)