introduce the cluster informer for decoupling the dependencies
Signed-off-by: Ryan Leung <rleungx@gmail.com>
rleungx committed May 19, 2023
1 parent f4241b0 commit 5054d41
Showing 45 changed files with 268 additions and 240 deletions.
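In short, this commit swaps the concrete schedule.Cluster type for a ClusterInformer interface exported by the new pkg/schedule/scheduling package, so the checkers, the keyspace manager, and the filter helpers depend only on a narrow, read-only view of the cluster rather than on the schedule package itself. The sketch below shows what that package plausibly exports; it is inferred purely from the call sites visible in this diff, so the real interfaces almost certainly carry more methods. RegionHealthCluster mirrors the local interface deleted from pkg/schedule/filter/healthy.go at the end of this page.

// Hypothetical shape of pkg/schedule/scheduling (a sketch, not the file added by this commit).
package scheduling

import (
	"github.com/tikv/pd/pkg/core"
	"github.com/tikv/pd/pkg/schedule/config"
	"github.com/tikv/pd/pkg/schedule/placement"
	"github.com/tikv/pd/pkg/storage"
)

// RegionHealthCluster is the read-only view used by the replication helpers in
// pkg/schedule/filter; it matches the local interface removed from healthy.go.
type RegionHealthCluster interface {
	core.StoreSetInformer
	core.RegionSetInformer

	GetOpts() config.Config
	GetRuleManager() *placement.RuleManager
}

// ClusterInformer is the dependency that checkers and the keyspace manager now
// accept in place of schedule.Cluster. Only the methods visible in this diff are
// listed; the real interface is wider.
type ClusterInformer interface {
	RegionHealthCluster

	GetStorage() storage.Storage
	// ...further accessors (ID allocator, region labeler, hot-region caches, and so on)
}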
6 changes: 3 additions & 3 deletions pkg/keyspace/keyspace.go
@@ -26,8 +26,8 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/id"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/scheduling"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
@@ -71,7 +71,7 @@ type Manager struct {
// store is the storage for keyspace related information.
store endpoint.KeyspaceStorage
// rc is the raft cluster of the server.
cluster schedule.Cluster
cluster scheduling.ClusterInformer
// ctx is the context of the manager, to be used in transaction.
ctx context.Context
// config is the configurations of the manager.
@@ -98,7 +98,7 @@ type CreateKeyspaceRequest struct {
func NewKeyspaceManager(
ctx context.Context,
store endpoint.KeyspaceStorage,
cluster schedule.Cluster,
cluster scheduling.ClusterInformer,
idAllocator id.Allocator,
config Config,
kgm *GroupManager,
11 changes: 9 additions & 2 deletions pkg/mock/mockcluster/mockcluster.go
@@ -47,6 +47,7 @@ const (

// Cluster is used to mock a cluster for test purpose.
type Cluster struct {
ctx context.Context
*core.BasicCluster
*mockid.IDAllocator
*placement.RuleManager
@@ -57,20 +58,21 @@ type Cluster struct {
suspectRegions map[uint64]struct{}
*config.StoreConfigManager
*buckets.HotBucketCache
ctx context.Context
storage.Storage
}

// NewCluster creates a new Cluster
func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster {
clus := &Cluster{
ctx: ctx,
BasicCluster: core.NewBasicCluster(),
IDAllocator: mockid.NewIDAllocator(),
HotStat: statistics.NewHotStat(ctx),
HotBucketCache: buckets.NewBucketsCache(ctx),
PersistOptions: opts,
suspectRegions: map[uint64]struct{}{},
StoreConfigManager: config.NewTestStoreConfigManager(nil),
ctx: ctx,
Storage: storage.NewStorageWithMemoryBackend(),
}
if clus.PersistOptions.GetReplicationConfig().EnablePlacementRules {
clus.initRuleManager()
@@ -96,6 +98,11 @@ func (mc *Cluster) GetAllocator() id.Allocator {
return mc.IDAllocator
}

// GetStorage returns the storage.
func (mc *Cluster) GetStorage() storage.Storage {
return mc.Storage
}

// ScanRegions scans region with start key, until number greater than limit.
func (mc *Cluster) ScanRegions(startKey, endKey []byte, limit int) []*core.RegionInfo {
return mc.ScanRange(startKey, endKey, limit)
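With GetStorage in place, the mock can stand in wherever a scheduling.ClusterInformer is expected. A minimal test sketch follows; the compile-time assertion expresses the expectation rather than a guarantee, and mockconfig.NewTestOptions is an assumed helper for building default *config.PersistOptions, not something introduced by this commit.

package mockcluster_test

import (
	"context"
	"testing"

	"github.com/tikv/pd/pkg/mock/mockcluster"
	"github.com/tikv/pd/pkg/mock/mockconfig"
	"github.com/tikv/pd/pkg/schedule/scheduling"
)

// Expected to hold once the mock implements the full interface; the GetStorage
// accessor above exists for exactly this kind of use.
var _ scheduling.ClusterInformer = (*mockcluster.Cluster)(nil)

func TestMockClusterProvidesStorage(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// NewCluster wires up an in-memory storage backend, so GetStorage must not be nil.
	mc := mockcluster.NewCluster(ctx, mockconfig.NewTestOptions())
	if mc.GetStorage() == nil {
		t.Fatal("expected the in-memory storage configured by NewCluster")
	}
}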
5 changes: 3 additions & 2 deletions pkg/schedule/checker/checker_controller.go
@@ -27,6 +27,7 @@ import (
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/schedule/scheduling"
"github.com/tikv/pd/pkg/utils/keyutil"
)

@@ -37,7 +38,7 @@ var denyCheckersByLabelerCounter = schedule.LabelerEventCounter.WithLabelValues(

// Controller is used to manage all checkers.
type Controller struct {
cluster schedule.Cluster
cluster scheduling.ClusterInformer
conf config.Config
opController *schedule.OperatorController
learnerChecker *LearnerChecker
Expand All @@ -53,7 +54,7 @@ type Controller struct {
}

// NewController create a new Controller.
func NewController(ctx context.Context, cluster schedule.Cluster, conf config.Config, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *schedule.OperatorController) *Controller {
func NewController(ctx context.Context, cluster scheduling.ClusterInformer, conf config.Config, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *schedule.OperatorController) *Controller {
regionWaitingList := cache.NewDefaultCache(DefaultCacheSize)
return &Controller{
cluster: cluster,
6 changes: 3 additions & 3 deletions pkg/schedule/checker/joint_state_checker.go
@@ -19,14 +19,14 @@ import (
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/scheduling"
)

// JointStateChecker ensures region is in joint state will leave.
type JointStateChecker struct {
PauseController
cluster schedule.Cluster
cluster scheduling.ClusterInformer
}

const jointStateCheckerName = "joint_state_checker"
@@ -41,7 +41,7 @@ var (
)

// NewJointStateChecker creates a joint state checker.
func NewJointStateChecker(cluster schedule.Cluster) *JointStateChecker {
func NewJointStateChecker(cluster scheduling.ClusterInformer) *JointStateChecker {
return &JointStateChecker{
cluster: cluster,
}
6 changes: 3 additions & 3 deletions pkg/schedule/checker/learner_checker.go
@@ -18,14 +18,14 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/scheduling"
)

// LearnerChecker ensures region has a learner will be promoted.
type LearnerChecker struct {
PauseController
cluster schedule.Cluster
cluster scheduling.ClusterInformer
}

var (
@@ -34,7 +34,7 @@ var (
)

// NewLearnerChecker creates a learner checker.
func NewLearnerChecker(cluster schedule.Cluster) *LearnerChecker {
func NewLearnerChecker(cluster scheduling.ClusterInformer) *LearnerChecker {
return &LearnerChecker{
cluster: cluster,
}
10 changes: 5 additions & 5 deletions pkg/schedule/checker/merge_checker.go
@@ -25,12 +25,12 @@ import (
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/filter"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/schedule/scheduling"
"github.com/tikv/pd/pkg/utils/logutil"
)

@@ -76,14 +76,14 @@ var (
// MergeChecker ensures region to merge with adjacent region when size is small
type MergeChecker struct {
PauseController
cluster schedule.Cluster
cluster scheduling.ClusterInformer
conf config.Config
splitCache *cache.TTLUint64
startTime time.Time // it's used to judge whether server recently start.
}

// NewMergeChecker creates a merge checker.
func NewMergeChecker(ctx context.Context, cluster schedule.Cluster, conf config.Config) *MergeChecker {
func NewMergeChecker(ctx context.Context, cluster scheduling.ClusterInformer, conf config.Config) *MergeChecker {
splitCache := cache.NewIDTTL(ctx, time.Minute, conf.GetSplitMergeInterval())
return &MergeChecker{
cluster: cluster,
@@ -250,7 +250,7 @@ func (m *MergeChecker) checkTarget(region, adjacent *core.RegionInfo) bool {
}

// AllowMerge returns true if two regions can be merged according to the key type.
func AllowMerge(cluster schedule.Cluster, region, adjacent *core.RegionInfo) bool {
func AllowMerge(cluster scheduling.ClusterInformer, region, adjacent *core.RegionInfo) bool {
var start, end []byte
if bytes.Equal(region.GetEndKey(), adjacent.GetStartKey()) && len(region.GetEndKey()) != 0 {
start, end = region.GetStartKey(), adjacent.GetEndKey()
@@ -306,7 +306,7 @@ func isTableIDSame(region, adjacent *core.RegionInfo) bool {
// Check whether there is a peer of the adjacent region on an offline store,
// while the source region has no peer on it. This is to prevent from bringing
// any other peer into an offline store to slow down the offline process.
func checkPeerStore(cluster schedule.Cluster, region, adjacent *core.RegionInfo) bool {
func checkPeerStore(cluster scheduling.ClusterInformer, region, adjacent *core.RegionInfo) bool {
regionStoreIDs := region.GetStoreIDs()
for _, peer := range adjacent.GetPeers() {
storeID := peer.GetStoreId()
6 changes: 3 additions & 3 deletions pkg/schedule/checker/priority_inspector.go
@@ -19,23 +19,23 @@ import (

"github.com/tikv/pd/pkg/cache"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/schedule/scheduling"
)

// the default value of priority queue size
const defaultPriorityQueueSize = 1280

// PriorityInspector ensures high priority region should run first
type PriorityInspector struct {
cluster schedule.Cluster
cluster scheduling.ClusterInformer
conf config.Config
queue *cache.PriorityQueue
}

// NewPriorityInspector creates a priority inspector.
func NewPriorityInspector(cluster schedule.Cluster, conf config.Config) *PriorityInspector {
func NewPriorityInspector(cluster scheduling.ClusterInformer, conf config.Config) *PriorityInspector {
return &PriorityInspector{
cluster: cluster,
conf: conf,
6 changes: 3 additions & 3 deletions pkg/schedule/checker/replica_checker.go
@@ -23,9 +23,9 @@ import (
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/scheduling"
"go.uber.org/zap"
)

@@ -61,13 +61,13 @@ var (
// Location management, mainly used for cross data center deployment.
type ReplicaChecker struct {
PauseController
cluster schedule.Cluster
cluster scheduling.ClusterInformer
conf config.Config
regionWaitingList cache.Cache
}

// NewReplicaChecker creates a replica checker.
func NewReplicaChecker(cluster schedule.Cluster, conf config.Config, regionWaitingList cache.Cache) *ReplicaChecker {
func NewReplicaChecker(cluster scheduling.ClusterInformer, conf config.Config, regionWaitingList cache.Cache) *ReplicaChecker {
return &ReplicaChecker{
cluster: cluster,
conf: conf,
4 changes: 2 additions & 2 deletions pkg/schedule/checker/replica_strategy.go
@@ -18,16 +18,16 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/filter"
"github.com/tikv/pd/pkg/schedule/scheduling"
"go.uber.org/zap"
)

// ReplicaStrategy collects some utilities to manipulate region peers. It
// exists to allow replica_checker and rule_checker to reuse common logics.
type ReplicaStrategy struct {
checkerName string // replica-checker / rule-checker
cluster schedule.Cluster
cluster scheduling.ClusterInformer
locationLabels []string
isolationLevel string
region *core.RegionInfo
8 changes: 4 additions & 4 deletions pkg/schedule/checker/rule_checker.go
@@ -27,10 +27,10 @@ import (
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/filter"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/schedule/scheduling"
"github.com/tikv/pd/pkg/versioninfo"
"go.uber.org/zap"
)
@@ -81,7 +81,7 @@ var (
// RuleChecker fix/improve region by placement rules.
type RuleChecker struct {
PauseController
cluster schedule.Cluster
cluster scheduling.ClusterInformer
ruleManager *placement.RuleManager
name string
regionWaitingList cache.Cache
@@ -91,7 +91,7 @@ type RuleChecker struct {
}

// NewRuleChecker creates a checker instance.
func NewRuleChecker(ctx context.Context, cluster schedule.Cluster, ruleManager *placement.RuleManager, regionWaitingList cache.Cache) *RuleChecker {
func NewRuleChecker(ctx context.Context, cluster scheduling.ClusterInformer, ruleManager *placement.RuleManager, regionWaitingList cache.Cache) *RuleChecker {
return &RuleChecker{
cluster: cluster,
ruleManager: ruleManager,
@@ -572,7 +572,7 @@ func (o *recorder) incOfflineLeaderCount(storeID uint64) {
// Offline is triggered manually and only appears when the node makes some adjustments. here is an operator timeout / 2.
var offlineCounterTTL = 5 * time.Minute

func (o *recorder) refresh(cluster schedule.Cluster) {
func (o *recorder) refresh(cluster scheduling.ClusterInformer) {
// re-count the offlineLeaderCounter if the store is already tombstone or store is gone.
if len(o.offlineLeaderCounter) > 0 && time.Since(o.lastUpdateTime) > offlineCounterTTL {
needClean := false
6 changes: 3 additions & 3 deletions pkg/schedule/checker/split_checker.go
@@ -19,16 +19,16 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/schedule/scheduling"
)

// SplitChecker splits regions when the key range spans across rule/label boundary.
type SplitChecker struct {
PauseController
cluster schedule.Cluster
cluster scheduling.ClusterInformer
ruleManager *placement.RuleManager
labeler *labeler.RegionLabeler
}
@@ -42,7 +42,7 @@ var (
)

// NewSplitChecker creates a new SplitChecker.
func NewSplitChecker(cluster schedule.Cluster, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler) *SplitChecker {
func NewSplitChecker(cluster scheduling.ClusterInformer, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler) *SplitChecker {
return &SplitChecker{
cluster: cluster,
ruleManager: ruleManager,
22 changes: 4 additions & 18 deletions pkg/schedule/filter/healthy.go
@@ -16,8 +16,7 @@ package filter

import (
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/schedule/scheduling"
)

// IsRegionHealthy checks if a region is healthy for scheduling. It requires the
@@ -43,30 +42,17 @@ func hasDownPeers(region *core.RegionInfo) bool {
// IsRegionReplicated checks if a region is fully replicated. When placement
// rules is enabled, its peers should fit corresponding rules. When placement
// rules is disabled, it should have enough replicas and no any learner peer.
func IsRegionReplicated(cluster regionHealthCluster, region *core.RegionInfo) bool {
func IsRegionReplicated(cluster scheduling.RegionHealthCluster, region *core.RegionInfo) bool {
if cluster.GetOpts().IsPlacementRulesEnabled() {
return isRegionPlacementRuleSatisfied(cluster, region)
}
return isRegionReplicasSatisfied(cluster, region)
}

func isRegionPlacementRuleSatisfied(cluster regionHealthCluster, region *core.RegionInfo) bool {
func isRegionPlacementRuleSatisfied(cluster scheduling.RegionHealthCluster, region *core.RegionInfo) bool {
return cluster.GetRuleManager().FitRegion(cluster, region).IsSatisfied()
}

func isRegionReplicasSatisfied(cluster regionHealthCluster, region *core.RegionInfo) bool {
func isRegionReplicasSatisfied(cluster scheduling.RegionHealthCluster, region *core.RegionInfo) bool {
return len(region.GetLearners()) == 0 && len(region.GetPeers()) == cluster.GetOpts().GetMaxReplicas()
}

// ReplicatedRegion returns a function that checks if a region is fully replicated.
func ReplicatedRegion(cluster regionHealthCluster) func(*core.RegionInfo) bool {
return func(region *core.RegionInfo) bool { return IsRegionReplicated(cluster, region) }
}

// cluster provides an overview of a cluster's regions distribution.
type regionHealthCluster interface {
core.StoreSetInformer
core.RegionSetInformer
GetOpts() config.Config
GetRuleManager() *placement.RuleManager
}
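The local regionHealthCluster interface and the ReplicatedRegion helper disappear here; callers are expected to pass anything that satisfies scheduling.RegionHealthCluster instead. If a call site still wants the predicate form, it can wrap IsRegionReplicated itself, as in the sketch below (the removed helper may simply have moved elsewhere in this commit; the wrapper here is only an illustration).

package example

import (
	"github.com/tikv/pd/pkg/core"
	"github.com/tikv/pd/pkg/schedule/filter"
	"github.com/tikv/pd/pkg/schedule/scheduling"
)

// ReplicatedRegion mirrors the deleted helper: it adapts filter.IsRegionReplicated
// into the func(*core.RegionInfo) bool shape used by region filters.
func ReplicatedRegion(cluster scheduling.RegionHealthCluster) func(*core.RegionInfo) bool {
	return func(region *core.RegionInfo) bool { return filter.IsRegionReplicated(cluster, region) }
}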