diff --git a/pkg/core/region.go b/pkg/core/region.go index 244fef836f8..9768a258889 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -45,7 +45,8 @@ import ( const ( randomRegionMaxRetry = 10 scanRegionLimit = 1000 - CollectFactor = 0.9 + // CollectFactor is the factor used when collecting the region count. + CollectFactor = 0.9 ) // errRegionIsStale is error info for region is stale. @@ -721,7 +722,7 @@ func (r *RegionInfo) isRegionRecreated() bool { return r.GetRegionEpoch().GetVersion() == 1 && r.GetRegionEpoch().GetConfVer() == 1 && (len(r.GetStartKey()) != 0 || len(r.GetEndKey()) != 0) } -func (r *RegionInfo) Contains(key []byte) bool { +func (r *RegionInfo) contain(key []byte) bool { start, end := r.GetStartKey(), r.GetEndKey() return bytes.Compare(key, start) >= 0 && (len(end) == 0 || bytes.Compare(key, end) < 0) } diff --git a/pkg/core/region_tree.go b/pkg/core/region_tree.go index 0be207d515d..12e2c5c8878 100644 --- a/pkg/core/region_tree.go +++ b/pkg/core/region_tree.go @@ -261,7 +261,7 @@ func (t *regionTree) find(item *regionItem) *regionItem { return false }) - if result == nil || !result.Contains(item.GetStartKey()) { + if result == nil || !result.contain(item.GetStartKey()) { return nil } @@ -370,7 +370,7 @@ func (t *regionTree) RandomRegions(n int, ranges []KeyRange) []*RegionInfo { // we need to check if the previous item contains the key. if startIndex != 0 && startItem == nil { region = t.tree.GetAt(startIndex - 1).RegionInfo - if region.Contains(startKey) { + if region.contain(startKey) { startIndex-- } } diff --git a/pkg/core/region_tree_test.go b/pkg/core/region_tree_test.go index 2726b4fdab5..a2b1bfab7a7 100644 --- a/pkg/core/region_tree_test.go +++ b/pkg/core/region_tree_test.go @@ -102,15 +102,15 @@ func TestRegionItem(t *testing.T) { re.False(item.Less(newRegionItem([]byte("b"), []byte{}))) re.True(item.Less(newRegionItem([]byte("c"), []byte{}))) - re.False(item.Contains([]byte("a"))) - re.True(item.Contains([]byte("b"))) - re.True(item.Contains([]byte("c"))) + re.False(item.contain([]byte("a"))) + re.True(item.contain([]byte("b"))) + re.True(item.contain([]byte("c"))) item = newRegionItem([]byte("b"), []byte("d")) - re.False(item.Contains([]byte("a"))) - re.True(item.Contains([]byte("b"))) - re.True(item.Contains([]byte("c"))) - re.False(item.Contains([]byte("d"))) + re.False(item.contain([]byte("a"))) + re.True(item.contain([]byte("b"))) + re.True(item.contain([]byte("c"))) + re.False(item.contain([]byte("d"))) } func newRegionWithStat(start, end string, size, keys int64) *RegionInfo { diff --git a/pkg/mcs/scheduling/server/grpc_service.go b/pkg/mcs/scheduling/server/grpc_service.go index 1459ccd3bac..d068aa5c058 100644 --- a/pkg/mcs/scheduling/server/grpc_service.go +++ b/pkg/mcs/scheduling/server/grpc_service.go @@ -51,6 +51,7 @@ var SetUpRestHandler = func(*Service) (http.Handler, apiutil.APIServiceGroup) { type dummyRestService struct{} +// ServeHTTP implements the http.Handler interface. func (dummyRestService) ServeHTTP(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusNotImplemented) w.Write([]byte("not implemented")) @@ -83,6 +84,7 @@ type heartbeatServer struct { closed int32 } +// Send implements the HeartbeatStream interface.
func (s *heartbeatServer) Send(m core.RegionHeartbeatResponse) error { if atomic.LoadInt32(&s.closed) == 1 { return io.EOF @@ -106,7 +108,7 @@ func (s *heartbeatServer) Send(m core.RegionHeartbeatResponse) error { } } -func (s *heartbeatServer) Recv() (*schedulingpb.RegionHeartbeatRequest, error) { +func (s *heartbeatServer) recv() (*schedulingpb.RegionHeartbeatRequest, error) { if atomic.LoadInt32(&s.closed) == 1 { return nil, io.EOF } @@ -133,7 +135,7 @@ func (s *Service) RegionHeartbeat(stream schedulingpb.Scheduling_RegionHeartbeat }() for { - request, err := server.Recv() + request, err := server.recv() if err == io.EOF { return nil } diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index 4b1b51f1768..1d65ff6a568 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -65,6 +65,7 @@ type taskID struct { name string } +// ConcurrentRunner is a task runner that limits the number of concurrent tasks. type ConcurrentRunner struct { ctx context.Context cancel context.CancelFunc diff --git a/pkg/schedule/filter/filters.go b/pkg/schedule/filter/filters.go index 6c5dd748d17..e2846e6c9a6 100644 --- a/pkg/schedule/filter/filters.go +++ b/pkg/schedule/filter/filters.go @@ -65,7 +65,7 @@ func SelectUnavailableTargetStores(stores []*core.StoreInfo, filters []Filter, c cfilter, ok := filters[i].(comparingFilter) sourceID := uint64(0) if ok { - sourceID = cfilter.GetSourceStoreID() + sourceID = cfilter.getSourceStoreID() } if counter != nil { counter.inc(target, filters[i].Type(), sourceID, s.GetID()) @@ -99,7 +99,7 @@ func SelectTargetStores(stores []*core.StoreInfo, filters []Filter, conf config. cfilter, ok := filter.(comparingFilter) sourceID := uint64(0) if ok { - sourceID = cfilter.GetSourceStoreID() + sourceID = cfilter.getSourceStoreID() } if counter != nil { counter.inc(target, filter.Type(), sourceID, s.GetID()) @@ -141,8 +141,8 @@ type Filter interface { // comparingFilter is an interface to filter target store by comparing source and target stores type comparingFilter interface { Filter - // GetSourceStoreID returns the source store when comparing. - GetSourceStoreID() uint64 + // getSourceStoreID returns the source store when comparing. + getSourceStoreID() uint64 } // Target checks if store can pass all Filters as target store. @@ -156,7 +156,7 @@ func Target(conf config.SharedConfigProvider, store *core.StoreInfo, filters []F targetID := storeID sourceID := "" if ok { - sourceID = strconv.FormatUint(cfilter.GetSourceStoreID(), 10) + sourceID = strconv.FormatUint(cfilter.getSourceStoreID(), 10) } filterCounter.WithLabelValues(target.String(), filter.Scope(), filter.Type().String(), sourceID, targetID).Inc() } @@ -319,8 +319,8 @@ func (f *distinctScoreFilter) Target(_ config.SharedConfigProvider, store *core. return statusStoreNotMatchIsolation } -// GetSourceStoreID implements the ComparingFilter -func (f *distinctScoreFilter) GetSourceStoreID() uint64 { +// getSourceStoreID implements the ComparingFilter +func (f *distinctScoreFilter) getSourceStoreID() uint64 { return f.srcStore } @@ -669,8 +669,8 @@ func (f *ruleFitFilter) Target(_ config.SharedConfigProvider, store *core.StoreI return statusStoreNotMatchRule } -// GetSourceStoreID implements the ComparingFilter -func (f *ruleFitFilter) GetSourceStoreID() uint64 { +// getSourceStoreID implements the ComparingFilter +func (f *ruleFitFilter) getSourceStoreID() uint64 { return f.srcStore } @@ -730,7 +730,7 @@ func (f *ruleLeaderFitFilter) Target(_ config.SharedConfigProvider, store *core. 
return statusStoreNotMatchRule } -func (f *ruleLeaderFitFilter) GetSourceStoreID() uint64 { +func (f *ruleLeaderFitFilter) getSourceStoreID() uint64 { return f.srcLeaderStoreID } diff --git a/pkg/schedule/filter/region_filters.go b/pkg/schedule/filter/region_filters.go index e233ec75973..dca15dbf8ed 100644 --- a/pkg/schedule/filter/region_filters.go +++ b/pkg/schedule/filter/region_filters.go @@ -142,6 +142,7 @@ func NewRegionEmptyFilter(cluster sche.SharedCluster) RegionFilter { return ®ionEmptyFilter{cluster: cluster} } +// Select implements the RegionFilter interface. func (f *regionEmptyFilter) Select(region *core.RegionInfo) *plan.Status { if !isEmptyRegionAllowBalance(f.cluster, region) { return statusRegionEmpty @@ -163,6 +164,7 @@ func NewRegionWitnessFilter(storeID uint64) RegionFilter { return ®ionWitnessFilter{storeID: storeID} } +// Select implements the RegionFilter interface. func (f *regionWitnessFilter) Select(region *core.RegionInfo) *plan.Status { if region.GetStoreWitness(f.storeID) != nil { return statusRegionWitnessPeer diff --git a/pkg/schedule/operator/builder.go b/pkg/schedule/operator/builder.go index e28e7de973a..29b8aedf978 100644 --- a/pkg/schedule/operator/builder.go +++ b/pkg/schedule/operator/builder.go @@ -117,15 +117,15 @@ func NewBuilder(desc string, ci sche.SharedCluster, region *core.RegionInfo, opt err = errors.Errorf("cannot build operator for region with nil peer") break } - originPeers.Set(p) + originPeers.set(p) } for _, p := range region.GetPendingPeers() { - unhealthyPeers.Set(p) + unhealthyPeers.set(p) } for _, p := range region.GetDownPeers() { - unhealthyPeers.Set(p.Peer) + unhealthyPeers.set(p.Peer) } // origin leader @@ -158,7 +158,7 @@ func NewBuilder(desc string, ci sche.SharedCluster, region *core.RegionInfo, opt b.originPeers = originPeers b.unhealthyPeers = unhealthyPeers b.originLeaderStoreID = originLeaderStoreID - b.targetPeers = originPeers.Copy() + b.targetPeers = originPeers.copy() b.useJointConsensus = supportConfChangeV2 && b.GetSharedConfig().IsUseJointConsensus() b.err = err return b @@ -177,7 +177,7 @@ func (b *Builder) AddPeer(peer *metapb.Peer) *Builder { } else if old, ok := b.targetPeers[peer.GetStoreId()]; ok { b.err = errors.Errorf("cannot add peer %s: already have peer %s", peer, old) } else { - b.targetPeers.Set(peer) + b.targetPeers.set(peer) } return b } @@ -209,7 +209,7 @@ func (b *Builder) PromoteLearner(storeID uint64) *Builder { } else if _, ok := b.unhealthyPeers[storeID]; ok { b.err = errors.Errorf("cannot promote peer %d: unhealthy", storeID) } else { - b.targetPeers.Set(&metapb.Peer{ + b.targetPeers.set(&metapb.Peer{ Id: peer.GetId(), StoreId: peer.GetStoreId(), Role: metapb.PeerRole_Voter, @@ -229,7 +229,7 @@ func (b *Builder) DemoteVoter(storeID uint64) *Builder { } else if core.IsLearner(peer) { b.err = errors.Errorf("cannot demote voter %d: is already learner", storeID) } else { - b.targetPeers.Set(&metapb.Peer{ + b.targetPeers.set(&metapb.Peer{ Id: peer.GetId(), StoreId: peer.GetStoreId(), Role: metapb.PeerRole_Learner, @@ -249,7 +249,7 @@ func (b *Builder) BecomeWitness(storeID uint64) *Builder { } else if core.IsWitness(peer) { b.err = errors.Errorf("cannot switch peer to witness %d: is already witness", storeID) } else { - b.targetPeers.Set(&metapb.Peer{ + b.targetPeers.set(&metapb.Peer{ Id: peer.GetId(), StoreId: peer.GetStoreId(), Role: peer.GetRole(), @@ -269,7 +269,7 @@ func (b *Builder) BecomeNonWitness(storeID uint64) *Builder { } else if !core.IsWitness(peer) { b.err = errors.Errorf("cannot 
switch peer to non-witness %d: is already non-witness", storeID) } else { - b.targetPeers.Set(&metapb.Peer{ + b.targetPeers.set(&metapb.Peer{ Id: peer.GetId(), StoreId: peer.GetStoreId(), Role: peer.GetRole(), @@ -335,7 +335,7 @@ func (b *Builder) SetPeers(peers map[uint64]*metapb.Peer) *Builder { b.targetLeaderStoreID = 0 } - b.targetPeers = peersMap(peers).Copy() + b.targetPeers = peersMap(peers).copy() return b } @@ -439,7 +439,7 @@ func (b *Builder) prepareBuild() (string, error) { for _, o := range b.originPeers { n := b.targetPeers[o.GetStoreId()] if n == nil { - b.toRemove.Set(o) + b.toRemove.set(o) continue } @@ -461,25 +461,25 @@ func (b *Builder) prepareBuild() (string, error) { if !core.IsLearner(n) { n.Role = metapb.PeerRole_Learner n.IsWitness = true - b.toPromoteNonWitness.Set(n) + b.toPromoteNonWitness.set(n) } - b.toNonWitness.Set(n) + b.toNonWitness.set(n) } else if !isOriginPeerWitness && isTargetPeerWitness { - b.toWitness.Set(n) + b.toWitness.set(n) } isOriginPeerLearner := core.IsLearner(o) isTargetPeerLearner := core.IsLearner(n) if isOriginPeerLearner && !isTargetPeerLearner { // learner -> voter - b.toPromote.Set(n) + b.toPromote.set(n) } else if !isOriginPeerLearner && isTargetPeerLearner { // voter -> learner if b.useJointConsensus { - b.toDemote.Set(n) + b.toDemote.set(n) } else { - b.toRemove.Set(o) - // the targetPeers loop below will add `b.toAdd.Set(n)` + b.toRemove.set(o) + // the targetPeers loop below will add `b.toAdd.set(n)` } } } @@ -500,8 +500,8 @@ func (b *Builder) prepareBuild() (string, error) { IsWitness: n.GetIsWitness(), } } - // It is a pair with `b.toRemove.Set(o)` when `o != nil`. - b.toAdd.Set(n) + // It is a pair with `b.toRemove.set(o)` when `o != nil`. + b.toAdd.set(n) } } @@ -510,7 +510,7 @@ func (b *Builder) prepareBuild() (string, error) { b.targetLeaderStoreID = 0 } - b.currentPeers, b.currentLeaderStoreID = b.originPeers.Copy(), b.originLeaderStoreID + b.currentPeers, b.currentLeaderStoreID = b.originPeers.copy(), b.originLeaderStoreID if b.targetLeaderStoreID != 0 { targetLeader := b.targetPeers[b.targetLeaderStoreID] @@ -580,7 +580,7 @@ func (b *Builder) buildStepsWithJointConsensus(kind OpKind) (OpKind, error) { Role: metapb.PeerRole_Learner, IsWitness: peer.GetIsWitness(), }) - b.toPromote.Set(peer) + b.toPromote.set(peer) } else { b.execAddPeer(peer) } @@ -596,7 +596,7 @@ func (b *Builder) buildStepsWithJointConsensus(kind OpKind) (OpKind, error) { for _, remove := range b.toRemove.IDs() { peer := b.toRemove[remove] if !core.IsLearner(peer) { - b.toDemote.Set(&metapb.Peer{ + b.toDemote.set(&metapb.Peer{ Id: peer.GetId(), StoreId: peer.GetStoreId(), Role: metapb.PeerRole_Learner, @@ -637,7 +637,7 @@ func (b *Builder) buildStepsWithJointConsensus(kind OpKind) (OpKind, error) { for _, promote := range b.toPromoteNonWitness.IDs() { peer := b.toPromoteNonWitness[promote] peer.IsWitness = false - b.toPromote.Set(peer) + b.toPromote.set(peer) kind |= OpRegion } b.toPromoteNonWitness = newPeersMap() @@ -771,13 +771,13 @@ func (b *Builder) execTransferLeader(targetStoreID uint64, targetStoreIDs []uint func (b *Builder) execPromoteLearner(peer *metapb.Peer) { b.steps = append(b.steps, PromoteLearner{ToStore: peer.GetStoreId(), PeerID: peer.GetId(), IsWitness: peer.GetIsWitness()}) - b.currentPeers.Set(peer) + b.currentPeers.set(peer) delete(b.toPromote, peer.GetStoreId()) } func (b *Builder) execPromoteNonWitness(peer *metapb.Peer) { b.steps = append(b.steps, PromoteLearner{ToStore: peer.GetStoreId(), PeerID: peer.GetId(), IsWitness: 
false}) - b.currentPeers.Set(peer) + b.currentPeers.set(peer) delete(b.toPromoteNonWitness, peer.GetStoreId()) } @@ -786,7 +786,7 @@ func (b *Builder) execAddPeer(peer *metapb.Peer) { if !core.IsLearner(peer) { b.steps = append(b.steps, PromoteLearner{ToStore: peer.GetStoreId(), PeerID: peer.GetId(), IsWitness: peer.GetIsWitness()}) } - b.currentPeers.Set(peer) + b.currentPeers.set(peer) b.peerAddStep[peer.GetStoreId()] = len(b.steps) delete(b.toAdd, peer.GetStoreId()) } @@ -824,14 +824,14 @@ func (b *Builder) execChangePeerV2(needEnter bool, needTransferLeader bool) { for _, p := range b.toPromote.IDs() { peer := b.toPromote[p] step.PromoteLearners = append(step.PromoteLearners, PromoteLearner{ToStore: peer.GetStoreId(), PeerID: peer.GetId(), IsWitness: peer.GetIsWitness()}) - b.currentPeers.Set(peer) + b.currentPeers.set(peer) } b.toPromote = newPeersMap() for _, d := range b.toDemote.IDs() { peer := b.toDemote[d] step.DemoteVoters = append(step.DemoteVoters, DemoteVoter{ToStore: peer.GetStoreId(), PeerID: peer.GetId(), IsWitness: peer.GetIsWitness()}) - b.currentPeers.Set(peer) + b.currentPeers.set(peer) } b.toDemote = newPeersMap() @@ -1279,10 +1279,11 @@ func (pm peersMap) IDs() []uint64 { return ids } -func (pm peersMap) Set(peer *metapb.Peer) { +func (pm peersMap) set(peer *metapb.Peer) { pm[peer.GetStoreId()] = peer } +// String returns a brief description of the peersMap. func (pm peersMap) String() string { ids := make([]uint64, 0, len(pm)) for _, p := range pm { @@ -1291,10 +1292,10 @@ func (pm peersMap) String() string { return fmt.Sprintf("%v", ids) } -func (pm peersMap) Copy() peersMap { +func (pm peersMap) copy() peersMap { var pm2 peersMap = make(map[uint64]*metapb.Peer, len(pm)) for _, p := range pm { - pm2.Set(p) + pm2.set(p) } return pm2 } diff --git a/pkg/schedule/operator/create_operator.go b/pkg/schedule/operator/create_operator.go index 64680520933..4fae7f9e3f2 100644 --- a/pkg/schedule/operator/create_operator.go +++ b/pkg/schedule/operator/create_operator.go @@ -285,9 +285,9 @@ func CreateLeaveJointStateOperator(desc string, ci sche.SharedCluster, origin *c for _, o := range b.originPeers { switch o.GetRole() { case metapb.PeerRole_IncomingVoter: - b.toPromote.Set(o) + b.toPromote.set(o) case metapb.PeerRole_DemotingVoter: - b.toDemote.Set(o) + b.toDemote.set(o) } } @@ -298,7 +298,7 @@ func CreateLeaveJointStateOperator(desc string, ci sche.SharedCluster, origin *c b.targetLeaderStoreID = b.originLeaderStoreID } - b.currentPeers, b.currentLeaderStoreID = b.originPeers.Copy(), b.originLeaderStoreID + b.currentPeers, b.currentLeaderStoreID = b.originPeers.copy(), b.originLeaderStoreID b.peerAddStep = make(map[uint64]int) brief := b.brief() diff --git a/pkg/schedule/operator/operator_controller.go b/pkg/schedule/operator/operator_controller.go index fe93bd98756..e4da6ead0ef 100644 --- a/pkg/schedule/operator/operator_controller.go +++ b/pkg/schedule/operator/operator_controller.go @@ -235,10 +235,10 @@ func getNextPushOperatorTime(step OpStep, now time.Time) time.Time { // "next" is true to indicate that it may exist in next attempt, // and false is the end for the poll. 
func (oc *Controller) pollNeedDispatchRegion() (r *core.RegionInfo, next bool) { - if oc.opNotifierQueue.Len() == 0 { + if oc.opNotifierQueue.len() == 0 { return nil, false } - item, _ := oc.opNotifierQueue.Pop() + item, _ := oc.opNotifierQueue.pop() regionID := item.op.RegionID() opi, ok := oc.operators.Load(regionID) if !ok || opi.(*Operator) == nil { @@ -265,13 +265,13 @@ func (oc *Controller) pollNeedDispatchRegion() (r *core.RegionInfo, next bool) { } now := time.Now() if now.Before(item.time) { - oc.opNotifierQueue.Push(item) + oc.opNotifierQueue.push(item) return nil, false } // pushes with new notify time. item.time = getNextPushOperatorTime(step, now) - oc.opNotifierQueue.Push(item) + oc.opNotifierQueue.push(item) return r, true } @@ -561,7 +561,7 @@ func (oc *Controller) addOperatorInner(op *Operator) bool { } } - oc.opNotifierQueue.Push(&operatorWithTime{op: op, time: getNextPushOperatorTime(step, time.Now())}) + oc.opNotifierQueue.push(&operatorWithTime{op: op, time: getNextPushOperatorTime(step, time.Now())}) operatorCounter.WithLabelValues(op.Desc(), "create").Inc() for _, counter := range op.Counters { counter.Inc() @@ -753,7 +753,7 @@ func (oc *Controller) GetOperator(regionID uint64) *Operator { // GetOperators gets operators from the running operators. func (oc *Controller) GetOperators() []*Operator { - operators := make([]*Operator, 0, oc.opNotifierQueue.Len()) + operators := make([]*Operator, 0, oc.opNotifierQueue.len()) oc.operators.Range( func(_, value any) bool { operators = append(operators, value.(*Operator)) @@ -769,7 +769,7 @@ func (oc *Controller) GetWaitingOperators() []*Operator { // GetOperatorsOfKind returns the running operators of the kind. func (oc *Controller) GetOperatorsOfKind(mask OpKind) []*Operator { - operators := make([]*Operator, 0, oc.opNotifierQueue.Len()) + operators := make([]*Operator, 0, oc.opNotifierQueue.len()) oc.operators.Range( func(_, value any) bool { op := value.(*Operator) diff --git a/pkg/schedule/operator/operator_controller_test.go b/pkg/schedule/operator/operator_controller_test.go index 2b16516c4c7..3894df7e5e7 100644 --- a/pkg/schedule/operator/operator_controller_test.go +++ b/pkg/schedule/operator/operator_controller_test.go @@ -364,10 +364,10 @@ func (suite *operatorControllerTestSuite) TestPollDispatchRegion() { oc.SetOperator(op4) re.True(op2.Start()) oc.SetOperator(op2) - oc.opNotifierQueue.Push(&operatorWithTime{op: op1, time: time.Now().Add(100 * time.Millisecond)}) - oc.opNotifierQueue.Push(&operatorWithTime{op: op3, time: time.Now().Add(300 * time.Millisecond)}) - oc.opNotifierQueue.Push(&operatorWithTime{op: op4, time: time.Now().Add(499 * time.Millisecond)}) - oc.opNotifierQueue.Push(&operatorWithTime{op: op2, time: time.Now().Add(500 * time.Millisecond)}) + oc.opNotifierQueue.push(&operatorWithTime{op: op1, time: time.Now().Add(100 * time.Millisecond)}) + oc.opNotifierQueue.push(&operatorWithTime{op: op3, time: time.Now().Add(300 * time.Millisecond)}) + oc.opNotifierQueue.push(&operatorWithTime{op: op4, time: time.Now().Add(499 * time.Millisecond)}) + oc.opNotifierQueue.push(&operatorWithTime{op: op2, time: time.Now().Add(500 * time.Millisecond)}) } // first poll got nil r, next := oc.pollNeedDispatchRegion() @@ -447,7 +447,7 @@ func (suite *operatorControllerTestSuite) TestPollDispatchRegionForMergeRegion() r, next = controller.pollNeedDispatchRegion() re.True(next) re.Nil(r) - re.Equal(1, controller.opNotifierQueue.Len()) + re.Equal(1, controller.opNotifierQueue.len()) re.Empty(controller.GetOperators()) 
re.Empty(controller.wop.ListOperator()) re.NotNil(controller.records.Get(101)) @@ -458,7 +458,7 @@ func (suite *operatorControllerTestSuite) TestPollDispatchRegionForMergeRegion() r, next = controller.pollNeedDispatchRegion() re.True(next) re.Nil(r) - re.Equal(0, controller.opNotifierQueue.Len()) + re.Equal(0, controller.opNotifierQueue.len()) // Add the two ops to waiting operators again. source.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 0} @@ -478,7 +478,7 @@ func (suite *operatorControllerTestSuite) TestPollDispatchRegionForMergeRegion() r, next = controller.pollNeedDispatchRegion() re.True(next) re.Nil(r) - re.Equal(1, controller.opNotifierQueue.Len()) + re.Equal(1, controller.opNotifierQueue.len()) re.Empty(controller.GetOperators()) re.Empty(controller.wop.ListOperator()) re.NotNil(controller.records.Get(101)) @@ -488,7 +488,7 @@ func (suite *operatorControllerTestSuite) TestPollDispatchRegionForMergeRegion() r, next = controller.pollNeedDispatchRegion() re.True(next) re.Nil(r) - re.Equal(0, controller.opNotifierQueue.Len()) + re.Equal(0, controller.opNotifierQueue.len()) } func (suite *operatorControllerTestSuite) TestCheckOperatorLightly() { diff --git a/pkg/schedule/operator/operator_queue.go b/pkg/schedule/operator/operator_queue.go index 8643717d5ad..51991ff7ab4 100644 --- a/pkg/schedule/operator/operator_queue.go +++ b/pkg/schedule/operator/operator_queue.go @@ -67,19 +67,19 @@ func newConcurrentHeapOpQueue() *concurrentHeapOpQueue { return &concurrentHeapOpQueue{heap: make(operatorQueue, 0)} } -func (ch *concurrentHeapOpQueue) Len() int { +func (ch *concurrentHeapOpQueue) len() int { ch.Lock() defer ch.Unlock() return len(ch.heap) } -func (ch *concurrentHeapOpQueue) Push(x *operatorWithTime) { +func (ch *concurrentHeapOpQueue) push(x *operatorWithTime) { ch.Lock() defer ch.Unlock() heap.Push(&ch.heap, x) } -func (ch *concurrentHeapOpQueue) Pop() (*operatorWithTime, bool) { +func (ch *concurrentHeapOpQueue) pop() (*operatorWithTime, bool) { ch.Lock() defer ch.Unlock() if len(ch.heap) == 0 { diff --git a/pkg/schedule/schedulers/balance_leader.go b/pkg/schedule/schedulers/balance_leader.go index 6762c8751e4..f6c8dd5d1b6 100644 --- a/pkg/schedule/schedulers/balance_leader.go +++ b/pkg/schedule/schedulers/balance_leader.go @@ -64,7 +64,7 @@ type balanceLeaderSchedulerConfig struct { Batch int `json:"batch"` } -func (conf *balanceLeaderSchedulerConfig) Update(data []byte) (int, any) { +func (conf *balanceLeaderSchedulerConfig) update(data []byte) (int, any) { conf.Lock() defer conf.Unlock() @@ -146,19 +146,19 @@ func newBalanceLeaderHandler(conf *balanceLeaderSchedulerConfig) http.Handler { rd: render.New(render.Options{IndentJSON: true}), } router := mux.NewRouter() - router.HandleFunc("/config", handler.UpdateConfig).Methods(http.MethodPost) - router.HandleFunc("/list", handler.ListConfig).Methods(http.MethodGet) + router.HandleFunc("/config", handler.updateConfig).Methods(http.MethodPost) + router.HandleFunc("/list", handler.listConfig).Methods(http.MethodGet) return router } -func (handler *balanceLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) { +func (handler *balanceLeaderHandler) updateConfig(w http.ResponseWriter, r *http.Request) { data, _ := io.ReadAll(r.Body) r.Body.Close() - httpCode, v := handler.config.Update(data) + httpCode, v := handler.config.update(data) handler.rd.JSON(w, httpCode, v) } -func (handler *balanceLeaderHandler) ListConfig(w http.ResponseWriter, _ *http.Request) { +func (handler *balanceLeaderHandler) 
listConfig(w http.ResponseWriter, _ *http.Request) { conf := handler.config.Clone() handler.rd.JSON(w, http.StatusOK, conf) } @@ -348,7 +348,7 @@ func (l *balanceLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun stores := cluster.GetStores() scoreFunc := func(store *core.StoreInfo) float64 { - return store.LeaderScore(solver.kind.Policy, solver.GetOpInfluence(store.GetID())) + return store.LeaderScore(solver.kind.Policy, solver.getOpInfluence(store.GetID())) } sourceCandidate := newCandidateStores(filter.SelectSourceStores(stores, l.filters, cluster.GetSchedulerConfig(), collector, l.filterCounter), false, scoreFunc) targetCandidate := newCandidateStores(filter.SelectTargetStores(stores, l.filters, cluster.GetSchedulerConfig(), nil, l.filterCounter), true, scoreFunc) @@ -379,7 +379,7 @@ func (l *balanceLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun } } } - l.retryQuota.GC(append(sourceCandidate.stores, targetCandidate.stores...)) + l.retryQuota.gc(append(sourceCandidate.stores, targetCandidate.stores...)) return result, collector.GetPlans() } @@ -388,7 +388,7 @@ func createTransferLeaderOperator(cs *candidateStores, dir string, l *balanceLea store := cs.getStore() ssolver.Step++ defer func() { ssolver.Step-- }() - retryLimit := l.retryQuota.GetLimit(store) + retryLimit := l.retryQuota.getLimit(store) var creator func(*solver, *plan.Collector) *operator.Operator switch dir { case transferOut: @@ -408,9 +408,9 @@ func createTransferLeaderOperator(cs *candidateStores, dir string, l *balanceLea } } if op != nil { - l.retryQuota.ResetLimit(store) + l.retryQuota.resetLimit(store) } else { - l.Attenuate(store) + l.attenuate(store) log.Debug("no operator created for selected stores", zap.String("scheduler", l.GetName()), zap.Uint64(dir, store.GetID())) cs.next() } @@ -436,10 +436,10 @@ func makeInfluence(op *operator.Operator, plan *solver, usedRegions map[uint64]s // It randomly selects a health region from the source store, then picks // the best follower peer and transfers the leader. 
func (l *balanceLeaderScheduler) transferLeaderOut(solver *solver, collector *plan.Collector) *operator.Operator { - solver.Region = filter.SelectOneRegion(solver.RandLeaderRegions(solver.SourceStoreID(), l.conf.getRanges()), + solver.Region = filter.SelectOneRegion(solver.RandLeaderRegions(solver.sourceStoreID(), l.conf.getRanges()), collector, filter.NewRegionPendingFilter(), filter.NewRegionDownFilter()) if solver.Region == nil { - log.Debug("store has no leader", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", solver.SourceStoreID())) + log.Debug("store has no leader", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", solver.sourceStoreID())) balanceLeaderNoLeaderRegionCounter.Inc() return nil } @@ -462,8 +462,8 @@ func (l *balanceLeaderScheduler) transferLeaderOut(solver *solver, collector *pl targets = filter.SelectTargetStores(targets, finalFilters, conf, collector, l.filterCounter) leaderSchedulePolicy := conf.GetLeaderSchedulePolicy() sort.Slice(targets, func(i, j int) bool { - iOp := solver.GetOpInfluence(targets[i].GetID()) - jOp := solver.GetOpInfluence(targets[j].GetID()) + iOp := solver.getOpInfluence(targets[i].GetID()) + jOp := solver.getOpInfluence(targets[j].GetID()) return targets[i].LeaderScore(leaderSchedulePolicy, iOp) < targets[j].LeaderScore(leaderSchedulePolicy, jOp) }) for _, solver.Target = range targets { @@ -480,10 +480,10 @@ func (l *balanceLeaderScheduler) transferLeaderOut(solver *solver, collector *pl // It randomly selects a health region from the target store, then picks // the worst follower peer and transfers the leader. func (l *balanceLeaderScheduler) transferLeaderIn(solver *solver, collector *plan.Collector) *operator.Operator { - solver.Region = filter.SelectOneRegion(solver.RandFollowerRegions(solver.TargetStoreID(), l.conf.getRanges()), + solver.Region = filter.SelectOneRegion(solver.RandFollowerRegions(solver.targetStoreID(), l.conf.getRanges()), nil, filter.NewRegionPendingFilter(), filter.NewRegionDownFilter()) if solver.Region == nil { - log.Debug("store has no follower", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", solver.TargetStoreID())) + log.Debug("store has no follower", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", solver.targetStoreID())) balanceLeaderNoFollowerRegionCounter.Inc() return nil } @@ -536,7 +536,7 @@ func (l *balanceLeaderScheduler) createOperator(solver *solver, collector *plan. } solver.Step++ defer func() { solver.Step-- }() - op, err := operator.CreateTransferLeaderOperator(BalanceLeaderType, solver, solver.Region, solver.TargetStoreID(), []uint64{}, operator.OpLeader) + op, err := operator.CreateTransferLeaderOperator(BalanceLeaderType, solver, solver.Region, solver.targetStoreID(), []uint64{}, operator.OpLeader) if err != nil { log.Debug("fail to create balance leader operator", errs.ZapError(err)) if collector != nil { @@ -548,7 +548,7 @@ func (l *balanceLeaderScheduler) createOperator(solver *solver, collector *plan. 
balanceLeaderNewOpCounter, ) op.FinishedCounters = append(op.FinishedCounters, - balanceDirectionCounter.WithLabelValues(l.GetName(), solver.SourceMetricLabel(), solver.TargetMetricLabel()), + balanceDirectionCounter.WithLabelValues(l.GetName(), solver.sourceMetricLabel(), solver.targetMetricLabel()), ) op.SetAdditionalInfo("sourceScore", strconv.FormatFloat(solver.sourceScore, 'f', 2, 64)) op.SetAdditionalInfo("targetScore", strconv.FormatFloat(solver.targetScore, 'f', 2, 64)) diff --git a/pkg/schedule/schedulers/balance_region.go b/pkg/schedule/schedulers/balance_region.go index 3ef01345aea..7c19187dd74 100644 --- a/pkg/schedule/schedulers/balance_region.go +++ b/pkg/schedule/schedulers/balance_region.go @@ -81,10 +81,12 @@ func WithBalanceRegionName(name string) BalanceRegionCreateOption { } } +// EncodeConfig implements the Scheduler interface. func (s *balanceRegionScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(s.conf) } +// IsScheduleAllowed implements the Scheduler interface. func (s *balanceRegionScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { allowed := s.OpController.OperatorCount(operator.OpRegion) < cluster.GetSchedulerConfig().GetRegionScheduleLimit() if !allowed { @@ -93,6 +95,7 @@ func (s *balanceRegionScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster return allowed } +// Schedule implements the Scheduler interface. func (s *balanceRegionScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { basePlan := plan.NewBalanceSchedulerPlan() defer s.filterCounter.Flush() @@ -112,8 +115,8 @@ func (s *balanceRegionScheduler) Schedule(cluster sche.SchedulerCluster, dryRun solver := newSolver(basePlan, kind, cluster, opInfluence) sort.Slice(sourceStores, func(i, j int) bool { - iOp := solver.GetOpInfluence(sourceStores[i].GetID()) - jOp := solver.GetOpInfluence(sourceStores[j].GetID()) + iOp := solver.getOpInfluence(sourceStores[i].GetID()) + jOp := solver.getOpInfluence(sourceStores[j].GetID()) return sourceStores[i].RegionScore(conf.GetRegionScoreFormulaVersion(), conf.GetHighSpaceRatio(), conf.GetLowSpaceRatio(), iOp) > sourceStores[j].RegionScore(conf.GetRegionScoreFormulaVersion(), conf.GetHighSpaceRatio(), conf.GetLowSpaceRatio(), jOp) }) @@ -138,7 +141,7 @@ func (s *balanceRegionScheduler) Schedule(cluster sche.SchedulerCluster, dryRun // sourcesStore is sorted by region score desc, so we pick the first store as source store. for sourceIndex, solver.Source = range sourceStores { - retryLimit := s.retryQuota.GetLimit(solver.Source) + retryLimit := s.retryQuota.getLimit(solver.Source) solver.sourceScore = solver.sourceStoreScore(s.GetName()) if sourceIndex == len(sourceStores)-1 { break @@ -146,22 +149,22 @@ func (s *balanceRegionScheduler) Schedule(cluster sche.SchedulerCluster, dryRun for i := 0; i < retryLimit; i++ { // Priority pick the region that has a pending peer. // Pending region may mean the disk is overload, remove the pending region firstly. - solver.Region = filter.SelectOneRegion(cluster.RandPendingRegions(solver.SourceStoreID(), s.conf.Ranges), collector, - append(baseRegionFilters, filter.NewRegionWitnessFilter(solver.SourceStoreID()))...) + solver.Region = filter.SelectOneRegion(cluster.RandPendingRegions(solver.sourceStoreID(), s.conf.Ranges), collector, + append(baseRegionFilters, filter.NewRegionWitnessFilter(solver.sourceStoreID()))...) if solver.Region == nil { // Then pick the region that has a follower in the source store. 
- solver.Region = filter.SelectOneRegion(cluster.RandFollowerRegions(solver.SourceStoreID(), s.conf.Ranges), collector, - append(baseRegionFilters, filter.NewRegionWitnessFilter(solver.SourceStoreID()), pendingFilter)...) + solver.Region = filter.SelectOneRegion(cluster.RandFollowerRegions(solver.sourceStoreID(), s.conf.Ranges), collector, + append(baseRegionFilters, filter.NewRegionWitnessFilter(solver.sourceStoreID()), pendingFilter)...) } if solver.Region == nil { // Then pick the region has the leader in the source store. - solver.Region = filter.SelectOneRegion(cluster.RandLeaderRegions(solver.SourceStoreID(), s.conf.Ranges), collector, - append(baseRegionFilters, filter.NewRegionWitnessFilter(solver.SourceStoreID()), pendingFilter)...) + solver.Region = filter.SelectOneRegion(cluster.RandLeaderRegions(solver.sourceStoreID(), s.conf.Ranges), collector, + append(baseRegionFilters, filter.NewRegionWitnessFilter(solver.sourceStoreID()), pendingFilter)...) } if solver.Region == nil { // Finally, pick learner. - solver.Region = filter.SelectOneRegion(cluster.RandLearnerRegions(solver.SourceStoreID(), s.conf.Ranges), collector, - append(baseRegionFilters, filter.NewRegionWitnessFilter(solver.SourceStoreID()), pendingFilter)...) + solver.Region = filter.SelectOneRegion(cluster.RandLearnerRegions(solver.sourceStoreID(), s.conf.Ranges), collector, + append(baseRegionFilters, filter.NewRegionWitnessFilter(solver.sourceStoreID()), pendingFilter)...) } if solver.Region == nil { balanceRegionNoRegionCounter.Inc() @@ -191,15 +194,15 @@ func (s *balanceRegionScheduler) Schedule(cluster sche.SchedulerCluster, dryRun // satisfy all the filters, so the region fit must belong the scheduled region. solver.fit = replicaFilter.(*filter.RegionReplicatedFilter).GetFit() if op := s.transferPeer(solver, collector, sourceStores[sourceIndex+1:], faultTargets); op != nil { - s.retryQuota.ResetLimit(solver.Source) + s.retryQuota.resetLimit(solver.Source) op.Counters = append(op.Counters, balanceRegionNewOpCounter) return []*operator.Operator{op}, collector.GetPlans() } solver.Step-- } - s.retryQuota.Attenuate(solver.Source) + s.retryQuota.attenuate(solver.Source) } - s.retryQuota.GC(stores) + s.retryQuota.gc(stores) return nil, collector.GetPlans() } diff --git a/pkg/schedule/schedulers/balance_test.go b/pkg/schedule/schedulers/balance_test.go index 26214ed5456..0cfaf510f1b 100644 --- a/pkg/schedule/schedulers/balance_test.go +++ b/pkg/schedule/schedulers/balance_test.go @@ -1399,8 +1399,8 @@ func TestConcurrencyUpdateConfig(t *testing.T) { return default: } - sche.config.BuildWithArgs(args) - re.NoError(sche.config.Persist()) + sche.config.buildWithArgs(args) + re.NoError(sche.config.persist()) } }() for i := 0; i < 1000; i++ { diff --git a/pkg/schedule/schedulers/balance_witness.go b/pkg/schedule/schedulers/balance_witness.go index 319a0f2493a..dbb0d012c72 100644 --- a/pkg/schedule/schedulers/balance_witness.go +++ b/pkg/schedule/schedulers/balance_witness.go @@ -143,19 +143,19 @@ func newBalanceWitnessHandler(conf *balanceWitnessSchedulerConfig) http.Handler rd: render.New(render.Options{IndentJSON: true}), } router := mux.NewRouter() - router.HandleFunc("/config", handler.UpdateConfig).Methods(http.MethodPost) - router.HandleFunc("/list", handler.ListConfig).Methods(http.MethodGet) + router.HandleFunc("/config", handler.updateConfig).Methods(http.MethodPost) + router.HandleFunc("/list", handler.listConfig).Methods(http.MethodGet) return router } -func (handler *balanceWitnessHandler) UpdateConfig(w 
http.ResponseWriter, r *http.Request) { +func (handler *balanceWitnessHandler) updateConfig(w http.ResponseWriter, r *http.Request) { data, _ := io.ReadAll(r.Body) r.Body.Close() httpCode, v := handler.config.Update(data) handler.rd.JSON(w, httpCode, v) } -func (handler *balanceWitnessHandler) ListConfig(w http.ResponseWriter, _ *http.Request) { +func (handler *balanceWitnessHandler) listConfig(w http.ResponseWriter, _ *http.Request) { conf := handler.config.Clone() handler.rd.JSON(w, http.StatusOK, conf) } @@ -191,6 +191,7 @@ func newBalanceWitnessScheduler(opController *operator.Controller, conf *balance return s } +// ServeHTTP implements the http.Handler interface. func (b *balanceWitnessScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { b.handler.ServeHTTP(w, r) } @@ -205,12 +206,14 @@ func WithBalanceWitnessCounter(counter *prometheus.CounterVec) BalanceWitnessCre } } +// EncodeConfig implements the Scheduler interface. func (b *balanceWitnessScheduler) EncodeConfig() ([]byte, error) { b.conf.RLock() defer b.conf.RUnlock() return EncodeConfig(b.conf) } +// ReloadConfig implements the Scheduler interface. func (b *balanceWitnessScheduler) ReloadConfig() error { b.conf.Lock() defer b.conf.Unlock() @@ -230,6 +233,7 @@ func (b *balanceWitnessScheduler) ReloadConfig() error { return nil } +// IsScheduleAllowed implements the Scheduler interface. func (b *balanceWitnessScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { allowed := b.OpController.OperatorCount(operator.OpWitness) < cluster.GetSchedulerConfig().GetWitnessScheduleLimit() if !allowed { @@ -238,6 +242,7 @@ func (b *balanceWitnessScheduler) IsScheduleAllowed(cluster sche.SchedulerCluste return allowed } +// Schedule implements the Scheduler interface. func (b *balanceWitnessScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { basePlan := plan.NewBalanceSchedulerPlan() var collector *plan.Collector @@ -253,7 +258,7 @@ func (b *balanceWitnessScheduler) Schedule(cluster sche.SchedulerCluster, dryRun stores := cluster.GetStores() scoreFunc := func(store *core.StoreInfo) float64 { - return store.WitnessScore(solver.GetOpInfluence(store.GetID())) + return store.WitnessScore(solver.getOpInfluence(store.GetID())) } sourceCandidate := newCandidateStores(filter.SelectSourceStores(stores, b.filters, cluster.GetSchedulerConfig(), collector, b.filterCounter), false, scoreFunc) usedRegions := make(map[uint64]struct{}) @@ -269,7 +274,7 @@ func (b *balanceWitnessScheduler) Schedule(cluster sche.SchedulerCluster, dryRun makeInfluence(op, solver, usedRegions, sourceCandidate) } } - b.retryQuota.GC(sourceCandidate.stores) + b.retryQuota.gc(sourceCandidate.stores) return result, collector.GetPlans() } @@ -278,7 +283,7 @@ func createTransferWitnessOperator(cs *candidateStores, b *balanceWitnessSchedul store := cs.getStore() ssolver.Step++ defer func() { ssolver.Step-- }() - retryLimit := b.retryQuota.GetLimit(store) + retryLimit := b.retryQuota.getLimit(store) ssolver.Source, ssolver.Target = store, nil var op *operator.Operator for i := 0; i < retryLimit; i++ { @@ -291,9 +296,9 @@ func createTransferWitnessOperator(cs *candidateStores, b *balanceWitnessSchedul } } if op != nil { - b.retryQuota.ResetLimit(store) + b.retryQuota.resetLimit(store) } else { - b.Attenuate(store) + b.attenuate(store) log.Debug("no operator created for selected stores", zap.String("scheduler", b.GetName()), zap.Uint64("transfer-out", store.GetID())) cs.next() } @@ -304,10 +309,10 @@ func 
createTransferWitnessOperator(cs *candidateStores, b *balanceWitnessSchedul // It randomly selects a health region from the source store, then picks // the best follower peer and transfers the witness. func (b *balanceWitnessScheduler) transferWitnessOut(solver *solver, collector *plan.Collector) *operator.Operator { - solver.Region = filter.SelectOneRegion(solver.RandWitnessRegions(solver.SourceStoreID(), b.conf.getRanges()), + solver.Region = filter.SelectOneRegion(solver.RandWitnessRegions(solver.sourceStoreID(), b.conf.getRanges()), collector, filter.NewRegionPendingFilter(), filter.NewRegionDownFilter()) if solver.Region == nil { - log.Debug("store has no witness", zap.String("scheduler", b.GetName()), zap.Uint64("store-id", solver.SourceStoreID())) + log.Debug("store has no witness", zap.String("scheduler", b.GetName()), zap.Uint64("store-id", solver.sourceStoreID())) schedulerCounter.WithLabelValues(b.GetName(), "no-witness-region").Inc() return nil } @@ -321,8 +326,8 @@ func (b *balanceWitnessScheduler) transferWitnessOut(solver *solver, collector * } targets = filter.SelectTargetStores(targets, finalFilters, conf, collector, b.filterCounter) sort.Slice(targets, func(i, j int) bool { - iOp := solver.GetOpInfluence(targets[i].GetID()) - jOp := solver.GetOpInfluence(targets[j].GetID()) + iOp := solver.getOpInfluence(targets[i].GetID()) + jOp := solver.getOpInfluence(targets[j].GetID()) return targets[i].WitnessScore(iOp) < targets[j].WitnessScore(jOp) }) for _, solver.Target = range targets { @@ -352,7 +357,7 @@ func (b *balanceWitnessScheduler) createOperator(solver *solver, collector *plan } solver.Step++ defer func() { solver.Step-- }() - op, err := operator.CreateMoveWitnessOperator(BalanceWitnessType, solver, solver.Region, solver.SourceStoreID(), solver.TargetStoreID()) + op, err := operator.CreateMoveWitnessOperator(BalanceWitnessType, solver, solver.Region, solver.sourceStoreID(), solver.targetStoreID()) if err != nil { log.Debug("fail to create balance witness operator", errs.ZapError(err)) return nil @@ -361,9 +366,9 @@ func (b *balanceWitnessScheduler) createOperator(solver *solver, collector *plan schedulerCounter.WithLabelValues(b.GetName(), "new-operator"), ) op.FinishedCounters = append(op.FinishedCounters, - balanceDirectionCounter.WithLabelValues(b.GetName(), solver.SourceMetricLabel(), solver.TargetMetricLabel()), - b.counter.WithLabelValues("move-witness", solver.SourceMetricLabel()+"-out"), - b.counter.WithLabelValues("move-witness", solver.TargetMetricLabel()+"-in"), + balanceDirectionCounter.WithLabelValues(b.GetName(), solver.sourceMetricLabel(), solver.targetMetricLabel()), + b.counter.WithLabelValues("move-witness", solver.sourceMetricLabel()+"-out"), + b.counter.WithLabelValues("move-witness", solver.targetMetricLabel()+"-in"), ) op.SetAdditionalInfo("sourceScore", strconv.FormatFloat(solver.sourceScore, 'f', 2, 64)) op.SetAdditionalInfo("targetScore", strconv.FormatFloat(solver.targetScore, 'f', 2, 64)) diff --git a/pkg/schedule/schedulers/base_scheduler.go b/pkg/schedule/schedulers/base_scheduler.go index 6cd02d2b555..b3dae9856e6 100644 --- a/pkg/schedule/schedulers/base_scheduler.go +++ b/pkg/schedule/schedulers/base_scheduler.go @@ -102,6 +102,7 @@ func (*BaseScheduler) PrepareConfig(sche.SchedulerCluster) error { return nil } // CleanConfig does some cleanup work about config. 
func (*BaseScheduler) CleanConfig(sche.SchedulerCluster) {} +// GetName returns the name of the scheduler. func (s *BaseScheduler) GetName() string { if len(s.name) == 0 { return s.tp.String() @@ -109,6 +110,7 @@ func (s *BaseScheduler) GetName() string { return s.name } +// GetType returns the type of the scheduler. func (s *BaseScheduler) GetType() types.CheckerSchedulerType { return s.tp } diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go index 3aba9a5d184..7e5c4706043 100644 --- a/pkg/schedule/schedulers/evict_leader.go +++ b/pkg/schedule/schedulers/evict_leader.go @@ -265,26 +265,32 @@ func (s *evictLeaderScheduler) EvictStoreIDs() []uint64 { return s.conf.getStores() } +// ServeHTTP implements the http.Handler interface. func (s *evictLeaderScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.handler.ServeHTTP(w, r) } +// EncodeConfig implements the Scheduler interface. func (s *evictLeaderScheduler) EncodeConfig() ([]byte, error) { return s.conf.encodeConfig() } +// ReloadConfig reloads the config from the storage. func (s *evictLeaderScheduler) ReloadConfig() error { return s.conf.reloadConfig(s.GetName()) } +// PrepareConfig implements the Scheduler interface. func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { return s.conf.pauseLeaderTransfer(cluster) } +// CleanConfig implements the Scheduler interface. func (s *evictLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) { s.conf.resumeLeaderTransfer(cluster) } +// IsScheduleAllowed implements the Scheduler interface. func (s *evictLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { allowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetSchedulerConfig().GetLeaderScheduleLimit() if !allowed { @@ -293,6 +299,7 @@ func (s *evictLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) return allowed } +// Schedule implements the Scheduler interface.
func (s *evictLeaderScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { evictLeaderCounter.Inc() return scheduleEvictLeaderBatch(s.GetName(), cluster, s.conf), nil @@ -399,7 +406,7 @@ type evictLeaderHandler struct { config *evictLeaderSchedulerConfig } -func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) { +func (handler *evictLeaderHandler) updateConfig(w http.ResponseWriter, r *http.Request) { var input map[string]any if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil { return @@ -454,12 +461,12 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R handler.rd.JSON(w, http.StatusOK, "The scheduler has been applied to the store.") } -func (handler *evictLeaderHandler) ListConfig(w http.ResponseWriter, _ *http.Request) { +func (handler *evictLeaderHandler) listConfig(w http.ResponseWriter, _ *http.Request) { conf := handler.config.Clone() handler.rd.JSON(w, http.StatusOK, conf) } -func (handler *evictLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.Request) { +func (handler *evictLeaderHandler) deleteConfig(w http.ResponseWriter, r *http.Request) { idStr := mux.Vars(r)["store_id"] id, err := strconv.ParseUint(idStr, 10, 64) if err != nil { @@ -486,8 +493,8 @@ func newEvictLeaderHandler(config *evictLeaderSchedulerConfig) http.Handler { rd: render.New(render.Options{IndentJSON: true}), } router := mux.NewRouter() - router.HandleFunc("/config", h.UpdateConfig).Methods(http.MethodPost) - router.HandleFunc("/list", h.ListConfig).Methods(http.MethodGet) - router.HandleFunc("/delete/{store_id}", h.DeleteConfig).Methods(http.MethodDelete) + router.HandleFunc("/config", h.updateConfig).Methods(http.MethodPost) + router.HandleFunc("/list", h.listConfig).Methods(http.MethodGet) + router.HandleFunc("/delete/{store_id}", h.deleteConfig).Methods(http.MethodDelete) return router } diff --git a/pkg/schedule/schedulers/evict_slow_store.go b/pkg/schedule/schedulers/evict_slow_store.go index 721444d1da7..bc0590531af 100644 --- a/pkg/schedule/schedulers/evict_slow_store.go +++ b/pkg/schedule/schedulers/evict_slow_store.go @@ -64,7 +64,7 @@ func initEvictSlowStoreSchedulerConfig(storage endpoint.ConfigStorage) *evictSlo } } -func (conf *evictSlowStoreSchedulerConfig) Clone() *evictSlowStoreSchedulerConfig { +func (conf *evictSlowStoreSchedulerConfig) clone() *evictSlowStoreSchedulerConfig { conf.RLock() defer conf.RUnlock() return &evictSlowStoreSchedulerConfig{ @@ -149,12 +149,12 @@ func newEvictSlowStoreHandler(config *evictSlowStoreSchedulerConfig) http.Handle rd: render.New(render.Options{IndentJSON: true}), } router := mux.NewRouter() - router.HandleFunc("/config", h.UpdateConfig).Methods(http.MethodPost) - router.HandleFunc("/list", h.ListConfig).Methods(http.MethodGet) + router.HandleFunc("/config", h.updateConfig).Methods(http.MethodPost) + router.HandleFunc("/list", h.listConfig).Methods(http.MethodGet) return router } -func (handler *evictSlowStoreHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) { +func (handler *evictSlowStoreHandler) updateConfig(w http.ResponseWriter, r *http.Request) { var input map[string]any if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil { return @@ -178,8 +178,8 @@ func (handler *evictSlowStoreHandler) UpdateConfig(w http.ResponseWriter, r *htt handler.rd.JSON(w, http.StatusOK, "Config updated.") } -func (handler *evictSlowStoreHandler) ListConfig(w http.ResponseWriter, _ 
*http.Request) { - conf := handler.config.Clone() +func (handler *evictSlowStoreHandler) listConfig(w http.ResponseWriter, _ *http.Request) { + conf := handler.config.clone() handler.rd.JSON(w, http.StatusOK, conf) } @@ -189,14 +189,17 @@ type evictSlowStoreScheduler struct { handler http.Handler } +// ServeHTTP implements the http.Handler interface. func (s *evictSlowStoreScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.handler.ServeHTTP(w, r) } +// EncodeConfig implements the Scheduler interface. func (s *evictSlowStoreScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(s.conf) } +// ReloadConfig implements the Scheduler interface. func (s *evictSlowStoreScheduler) ReloadConfig() error { s.conf.Lock() defer s.conf.Unlock() @@ -225,6 +228,7 @@ func (s *evictSlowStoreScheduler) ReloadConfig() error { return nil } +// PrepareConfig implements the Scheduler interface. func (s *evictSlowStoreScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { evictStore := s.conf.evictStore() if evictStore != 0 { @@ -233,6 +237,7 @@ func (s *evictSlowStoreScheduler) PrepareConfig(cluster sche.SchedulerCluster) e return nil } +// CleanConfig implements the Scheduler interface. func (s *evictSlowStoreScheduler) CleanConfig(cluster sche.SchedulerCluster) { s.cleanupEvictLeader(cluster) } @@ -262,6 +267,7 @@ func (s *evictSlowStoreScheduler) schedulerEvictLeader(cluster sche.SchedulerClu return scheduleEvictLeaderBatch(s.GetName(), cluster, s.conf) } +// IsScheduleAllowed implements the Scheduler interface. func (s *evictSlowStoreScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { if s.conf.evictStore() != 0 { allowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetSchedulerConfig().GetLeaderScheduleLimit() @@ -273,6 +279,7 @@ func (s *evictSlowStoreScheduler) IsScheduleAllowed(cluster sche.SchedulerCluste return true } +// Schedule implements the Scheduler interface. 
func (s *evictSlowStoreScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { evictSlowStoreCounter.Inc() diff --git a/pkg/schedule/schedulers/evict_slow_trend.go b/pkg/schedule/schedulers/evict_slow_trend.go index d14cec1e06a..5fa799c45b5 100644 --- a/pkg/schedule/schedulers/evict_slow_trend.go +++ b/pkg/schedule/schedulers/evict_slow_trend.go @@ -238,12 +238,12 @@ func newEvictSlowTrendHandler(config *evictSlowTrendSchedulerConfig) http.Handle rd: render.New(render.Options{IndentJSON: true}), } router := mux.NewRouter() - router.HandleFunc("/config", h.UpdateConfig).Methods(http.MethodPost) - router.HandleFunc("/list", h.ListConfig).Methods(http.MethodGet) + router.HandleFunc("/config", h.updateConfig).Methods(http.MethodPost) + router.HandleFunc("/list", h.listConfig).Methods(http.MethodGet) return router } -func (handler *evictSlowTrendHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) { +func (handler *evictSlowTrendHandler) updateConfig(w http.ResponseWriter, r *http.Request) { var input map[string]any if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil { return @@ -267,7 +267,7 @@ func (handler *evictSlowTrendHandler) UpdateConfig(w http.ResponseWriter, r *htt handler.rd.JSON(w, http.StatusOK, "Config updated.") } -func (handler *evictSlowTrendHandler) ListConfig(w http.ResponseWriter, _ *http.Request) { +func (handler *evictSlowTrendHandler) listConfig(w http.ResponseWriter, _ *http.Request) { conf := handler.config.Clone() handler.rd.JSON(w, http.StatusOK, conf) } @@ -291,14 +291,17 @@ func (s *evictSlowTrendScheduler) GetNextInterval(time.Duration) time.Duration { return intervalGrow(s.GetMinInterval(), MaxScheduleInterval, growthType) } +// ServeHTTP implements the http.Handler interface. func (s *evictSlowTrendScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.handler.ServeHTTP(w, r) } +// EncodeConfig implements the Scheduler interface. func (s *evictSlowTrendScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(s.conf) } +// ReloadConfig implements the Scheduler interface. func (s *evictSlowTrendScheduler) ReloadConfig() error { s.conf.Lock() defer s.conf.Unlock() @@ -327,6 +330,7 @@ func (s *evictSlowTrendScheduler) ReloadConfig() error { return nil } +// PrepareConfig implements the Scheduler interface. func (s *evictSlowTrendScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { evictedStoreID := s.conf.evictedStore() if evictedStoreID == 0 { @@ -335,6 +339,7 @@ func (s *evictSlowTrendScheduler) PrepareConfig(cluster sche.SchedulerCluster) e return cluster.SlowTrendEvicted(evictedStoreID) } +// CleanConfig implements the Scheduler interface. func (s *evictSlowTrendScheduler) CleanConfig(cluster sche.SchedulerCluster) { s.cleanupEvictLeader(cluster) } @@ -369,6 +374,7 @@ func (s *evictSlowTrendScheduler) scheduleEvictLeader(cluster sche.SchedulerClus return scheduleEvictLeaderBatch(s.GetName(), cluster, s.conf) } +// IsScheduleAllowed implements the Scheduler interface. func (s *evictSlowTrendScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { if s.conf.evictedStore() == 0 { return true @@ -380,6 +386,7 @@ func (s *evictSlowTrendScheduler) IsScheduleAllowed(cluster sche.SchedulerCluste return allowed } +// Schedule implements the Scheduler interface. 
func (s *evictSlowTrendScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { schedulerCounter.WithLabelValues(s.GetName(), "schedule").Inc() diff --git a/pkg/schedule/schedulers/grant_hot_region.go b/pkg/schedule/schedulers/grant_hot_region.go index 4289effd7bd..1e45096a881 100644 --- a/pkg/schedule/schedulers/grant_hot_region.go +++ b/pkg/schedule/schedulers/grant_hot_region.go @@ -69,19 +69,19 @@ func (conf *grantHotRegionSchedulerConfig) setStore(leaderID uint64, peers []uin return ret } -func (conf *grantHotRegionSchedulerConfig) GetStoreLeaderID() uint64 { +func (conf *grantHotRegionSchedulerConfig) getStoreLeaderID() uint64 { conf.RLock() defer conf.RUnlock() return conf.StoreLeaderID } -func (conf *grantHotRegionSchedulerConfig) SetStoreLeaderID(id uint64) { +func (conf *grantHotRegionSchedulerConfig) setStoreLeaderID(id uint64) { conf.Lock() defer conf.Unlock() conf.StoreLeaderID = id } -func (conf *grantHotRegionSchedulerConfig) Clone() *grantHotRegionSchedulerConfig { +func (conf *grantHotRegionSchedulerConfig) clone() *grantHotRegionSchedulerConfig { conf.RLock() defer conf.RUnlock() newStoreIDs := make([]uint64, len(conf.StoreIDs)) @@ -139,10 +139,12 @@ func newGrantHotRegionScheduler(opController *operator.Controller, conf *grantHo return ret } +// EncodeConfig implements the Scheduler interface. func (s *grantHotRegionScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(s.conf) } +// ReloadConfig implements the Scheduler interface. func (s *grantHotRegionScheduler) ReloadConfig() error { s.conf.Lock() defer s.conf.Unlock() @@ -186,7 +188,7 @@ type grantHotRegionHandler struct { config *grantHotRegionSchedulerConfig } -func (handler *grantHotRegionHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) { +func (handler *grantHotRegionHandler) updateConfig(w http.ResponseWriter, r *http.Request) { var input map[string]any if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil { return @@ -216,15 +218,15 @@ func (handler *grantHotRegionHandler) UpdateConfig(w http.ResponseWriter, r *htt } if err = handler.config.Persist(); err != nil { - handler.config.SetStoreLeaderID(0) + handler.config.setStoreLeaderID(0) handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } handler.rd.JSON(w, http.StatusOK, nil) } -func (handler *grantHotRegionHandler) ListConfig(w http.ResponseWriter, _ *http.Request) { - conf := handler.config.Clone() +func (handler *grantHotRegionHandler) listConfig(w http.ResponseWriter, _ *http.Request) { + conf := handler.config.clone() handler.rd.JSON(w, http.StatusOK, conf) } @@ -234,8 +236,8 @@ func newGrantHotRegionHandler(config *grantHotRegionSchedulerConfig) http.Handle rd: render.New(render.Options{IndentJSON: true}), } router := mux.NewRouter() - router.HandleFunc("/config", h.UpdateConfig).Methods(http.MethodPost) - router.HandleFunc("/list", h.ListConfig).Methods(http.MethodGet) + router.HandleFunc("/config", h.updateConfig).Methods(http.MethodPost) + router.HandleFunc("/list", h.listConfig).Methods(http.MethodGet) return router } @@ -269,7 +271,7 @@ func (s *grantHotRegionScheduler) randomSchedule(cluster sche.SchedulerCluster, continue } } else { - if !s.conf.has(srcStoreID) || srcStoreID == s.conf.GetStoreLeaderID() { + if !s.conf.has(srcStoreID) || srcStoreID == s.conf.getStoreLeaderID() { continue } } @@ -310,7 +312,7 @@ func (s *grantHotRegionScheduler) transfer(cluster sche.SchedulerCluster, region var candidate []uint64 if isLeader { filters = 
append(filters, &filter.StoreStateFilter{ActionScope: s.GetName(), TransferLeader: true, OperatorLevel: constant.High}) - candidate = []uint64{s.conf.GetStoreLeaderID()} + candidate = []uint64{s.conf.getStoreLeaderID()} } else { filters = append(filters, &filter.StoreStateFilter{ActionScope: s.GetName(), MoveRegion: true, OperatorLevel: constant.High}, filter.NewExcludedFilter(s.GetName(), srcRegion.GetStoreIDs(), srcRegion.GetStoreIDs())) diff --git a/pkg/schedule/schedulers/grant_leader.go b/pkg/schedule/schedulers/grant_leader.go index 41e6debaafa..1cf194c5f49 100644 --- a/pkg/schedule/schedulers/grant_leader.go +++ b/pkg/schedule/schedulers/grant_leader.go @@ -70,7 +70,7 @@ func (conf *grantLeaderSchedulerConfig) BuildWithArgs(args []string) error { return nil } -func (conf *grantLeaderSchedulerConfig) Clone() *grantLeaderSchedulerConfig { +func (conf *grantLeaderSchedulerConfig) clone() *grantLeaderSchedulerConfig { conf.RLock() defer conf.RUnlock() newStoreIDWithRanges := make(map[uint64][]core.KeyRange) @@ -82,7 +82,7 @@ func (conf *grantLeaderSchedulerConfig) Clone() *grantLeaderSchedulerConfig { } } -func (conf *grantLeaderSchedulerConfig) Persist() error { +func (conf *grantLeaderSchedulerConfig) persist() error { conf.RLock() defer conf.RUnlock() data, err := EncodeConfig(conf) @@ -164,14 +164,17 @@ func newGrantLeaderScheduler(opController *operator.Controller, conf *grantLeade } } +// ServeHTTP implements the http.Handler interface. func (s *grantLeaderScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.handler.ServeHTTP(w, r) } +// EncodeConfig implements the Scheduler interface. func (s *grantLeaderScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(s.conf) } +// ReloadConfig implements the Scheduler interface. func (s *grantLeaderScheduler) ReloadConfig() error { s.conf.Lock() defer s.conf.Unlock() @@ -191,6 +194,7 @@ func (s *grantLeaderScheduler) ReloadConfig() error { return nil } +// PrepareConfig implements the Scheduler interface. func (s *grantLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { s.conf.RLock() defer s.conf.RUnlock() @@ -203,6 +207,7 @@ func (s *grantLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) erro return res } +// CleanConfig implements the Scheduler interface. func (s *grantLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) { s.conf.RLock() defer s.conf.RUnlock() @@ -211,6 +216,7 @@ func (s *grantLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) { } } +// IsScheduleAllowed implements the Scheduler interface. func (s *grantLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { allowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetSchedulerConfig().GetLeaderScheduleLimit() if !allowed { @@ -219,6 +225,7 @@ func (s *grantLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) return allowed } +// Schedule implements the Scheduler interface. 
func (s *grantLeaderScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { grantLeaderCounter.Inc() storeIDWithRanges := s.conf.getStoreIDWithRanges() @@ -250,7 +257,7 @@ type grantLeaderHandler struct { config *grantLeaderSchedulerConfig } -func (handler *grantLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) { +func (handler *grantLeaderHandler) updateConfig(w http.ResponseWriter, r *http.Request) { var input map[string]any if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil { return @@ -285,7 +292,7 @@ func (handler *grantLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R handler.rd.JSON(w, http.StatusBadRequest, err.Error()) return } - err = handler.config.Persist() + err = handler.config.persist() if err != nil { handler.config.removeStore(id) handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) @@ -294,12 +301,12 @@ func (handler *grantLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R handler.rd.JSON(w, http.StatusOK, "The scheduler has been applied to the store.") } -func (handler *grantLeaderHandler) ListConfig(w http.ResponseWriter, _ *http.Request) { - conf := handler.config.Clone() +func (handler *grantLeaderHandler) listConfig(w http.ResponseWriter, _ *http.Request) { + conf := handler.config.clone() handler.rd.JSON(w, http.StatusOK, conf) } -func (handler *grantLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.Request) { +func (handler *grantLeaderHandler) deleteConfig(w http.ResponseWriter, r *http.Request) { idStr := mux.Vars(r)["store_id"] id, err := strconv.ParseUint(idStr, 10, 64) if err != nil { @@ -311,7 +318,7 @@ func (handler *grantLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.R keyRanges := handler.config.getKeyRangesByID(id) succ, last := handler.config.removeStore(id) if succ { - err = handler.config.Persist() + err = handler.config.persist() if err != nil { handler.config.resetStore(id, keyRanges) handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) @@ -342,8 +349,8 @@ func newGrantLeaderHandler(config *grantLeaderSchedulerConfig) http.Handler { rd: render.New(render.Options{IndentJSON: true}), } router := mux.NewRouter() - router.HandleFunc("/config", h.UpdateConfig).Methods(http.MethodPost) - router.HandleFunc("/list", h.ListConfig).Methods(http.MethodGet) - router.HandleFunc("/delete/{store_id}", h.DeleteConfig).Methods(http.MethodDelete) + router.HandleFunc("/config", h.updateConfig).Methods(http.MethodPost) + router.HandleFunc("/list", h.listConfig).Methods(http.MethodGet) + router.HandleFunc("/delete/{store_id}", h.deleteConfig).Methods(http.MethodDelete) return router } diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index fe9b3964139..ff837e67ad2 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -203,7 +203,7 @@ type hotScheduler struct { func newHotScheduler(opController *operator.Controller, conf *hotRegionSchedulerConfig) *hotScheduler { base := newBaseHotScheduler(opController, - conf.GetHistorySampleDuration(), conf.GetHistorySampleInterval()) + conf.getHistorySampleDuration(), conf.getHistorySampleInterval()) ret := &hotScheduler{ name: HotRegionName, baseHotScheduler: base, @@ -215,10 +215,12 @@ func newHotScheduler(opController *operator.Controller, conf *hotRegionScheduler return ret } +// EncodeConfig implements the Scheduler interface. 
func (h *hotScheduler) EncodeConfig() ([]byte, error) { - return h.conf.EncodeConfig() + return h.conf.encodeConfig() } +// ReloadConfig implements the Scheduler interface. func (h *hotScheduler) ReloadConfig() error { h.conf.Lock() defer h.conf.Unlock() @@ -259,18 +261,22 @@ func (h *hotScheduler) ReloadConfig() error { return nil } +// ServeHTTP implements the http.Handler interface. func (h *hotScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.conf.ServeHTTP(w, r) } +// GetMinInterval implements the Scheduler interface. func (*hotScheduler) GetMinInterval() time.Duration { return minHotScheduleInterval } +// GetNextInterval implements the Scheduler interface. func (h *hotScheduler) GetNextInterval(time.Duration) time.Duration { return intervalGrow(h.GetMinInterval(), maxHotScheduleInterval, exponentialGrowth) } +// IsScheduleAllowed implements the Scheduler interface. func (h *hotScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { allowed := h.OpController.OperatorCount(operator.OpHotRegion) < cluster.GetSchedulerConfig().GetHotRegionScheduleLimit() if !allowed { @@ -279,6 +285,7 @@ func (h *hotScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { return allowed } +// Schedule implements the Scheduler interface. func (h *hotScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { hotSchedulerCounter.Inc() typ := h.randomType() @@ -288,22 +295,22 @@ func (h *hotScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*opera func (h *hotScheduler) dispatch(typ resourceType, cluster sche.SchedulerCluster) []*operator.Operator { h.Lock() defer h.Unlock() - h.updateHistoryLoadConfig(h.conf.GetHistorySampleDuration(), h.conf.GetHistorySampleInterval()) + h.updateHistoryLoadConfig(h.conf.getHistorySampleDuration(), h.conf.getHistorySampleInterval()) h.prepareForBalance(typ, cluster) - // IsForbidRWType can not be move earlier to support to use api and metrics. + // isForbidRWType cannot be moved earlier to support using the API and metrics. switch typ { case readLeader, readPeer: - if h.conf.IsForbidRWType(utils.Read) { + if h.conf.isForbidRWType(utils.Read) { return nil } return h.balanceHotReadRegions(cluster) case writePeer: - if h.conf.IsForbidRWType(utils.Write) { + if h.conf.isForbidRWType(utils.Write) { return nil } return h.balanceHotWritePeers(cluster) case writeLeader: - if h.conf.IsForbidRWType(utils.Write) { + if h.conf.isForbidRWType(utils.Write) { return nil } return h.balanceHotWriteLeaders(cluster) @@ -499,11 +506,11 @@ type balanceSolver struct { func (bs *balanceSolver) init() { // Load the configuration items of the scheduler.
bs.resourceTy = toResourceType(bs.rwTy, bs.opTy) - bs.maxPeerNum = bs.sche.conf.GetMaxPeerNumber() + bs.maxPeerNum = bs.sche.conf.getMaxPeerNumber() bs.minHotDegree = bs.GetSchedulerConfig().GetHotRegionCacheHitsThreshold() bs.firstPriority, bs.secondPriority = prioritiesToDim(bs.getPriorities()) - bs.greatDecRatio, bs.minorDecRatio = bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorDecRatio() - switch bs.sche.conf.GetRankFormulaVersion() { + bs.greatDecRatio, bs.minorDecRatio = bs.sche.conf.getGreatDecRatio(), bs.sche.conf.getMinorDecRatio() + switch bs.sche.conf.getRankFormulaVersion() { case "v1": bs.rank = initRankV1(bs) default: @@ -534,16 +541,16 @@ func (bs *balanceSolver) init() { } rankStepRatios := []float64{ - utils.ByteDim: bs.sche.conf.GetByteRankStepRatio(), - utils.KeyDim: bs.sche.conf.GetKeyRankStepRatio(), - utils.QueryDim: bs.sche.conf.GetQueryRateRankStepRatio()} + utils.ByteDim: bs.sche.conf.getByteRankStepRatio(), + utils.KeyDim: bs.sche.conf.getKeyRankStepRatio(), + utils.QueryDim: bs.sche.conf.getQueryRateRankStepRatio()} stepLoads := make([]float64, utils.DimLen) for i := range stepLoads { stepLoads[i] = maxCur.Loads[i] * rankStepRatios[i] } bs.rankStep = &statistics.StoreLoad{ Loads: stepLoads, - Count: maxCur.Count * bs.sche.conf.GetCountRankStepRatio(), + Count: maxCur.Count * bs.sche.conf.getCountRankStepRatio(), } } @@ -557,11 +564,11 @@ func (bs *balanceSolver) getPriorities() []string { // For write, they are different switch bs.resourceTy { case readLeader, readPeer: - return adjustPrioritiesConfig(querySupport, bs.sche.conf.GetReadPriorities(), getReadPriorities) + return adjustPrioritiesConfig(querySupport, bs.sche.conf.getReadPriorities(), getReadPriorities) case writeLeader: - return adjustPrioritiesConfig(querySupport, bs.sche.conf.GetWriteLeaderPriorities(), getWriteLeaderPriorities) + return adjustPrioritiesConfig(querySupport, bs.sche.conf.getWriteLeaderPriorities(), getWriteLeaderPriorities) case writePeer: - return adjustPrioritiesConfig(querySupport, bs.sche.conf.GetWritePeerPriorities(), getWritePeerPriorities) + return adjustPrioritiesConfig(querySupport, bs.sche.conf.getWritePeerPriorities(), getWritePeerPriorities) } log.Error("illegal type or illegal operator while getting the priority", zap.String("type", bs.rwTy.String()), zap.String("operator", bs.opTy.String())) return []string{} @@ -763,16 +770,16 @@ func (bs *balanceSolver) calcMaxZombieDur() time.Duration { // We use store query info rather than total of hot write leader to guide hot write leader scheduler // when its first priority is `QueryDim`, because `Write-peer` does not have `QueryDim`. // The reason is the same with `tikvCollector.GetLoads`. 
- return bs.sche.conf.GetStoreStatZombieDuration() + return bs.sche.conf.getStoreStatZombieDuration() } - return bs.sche.conf.GetRegionsStatZombieDuration() + return bs.sche.conf.getRegionsStatZombieDuration() case writePeer: if bs.best.srcStore.IsTiFlash() { - return bs.sche.conf.GetRegionsStatZombieDuration() + return bs.sche.conf.getRegionsStatZombieDuration() } - return bs.sche.conf.GetStoreStatZombieDuration() + return bs.sche.conf.getStoreStatZombieDuration() default: - return bs.sche.conf.GetStoreStatZombieDuration() + return bs.sche.conf.getStoreStatZombieDuration() } } @@ -780,8 +787,8 @@ func (bs *balanceSolver) calcMaxZombieDur() time.Duration { // its expectation * ratio, the store would be selected as hot source store func (bs *balanceSolver) filterSrcStores() map[uint64]*statistics.StoreLoadDetail { ret := make(map[uint64]*statistics.StoreLoadDetail) - confSrcToleranceRatio := bs.sche.conf.GetSrcToleranceRatio() - confEnableForTiFlash := bs.sche.conf.GetEnableForTiFlash() + confSrcToleranceRatio := bs.sche.conf.getSrcToleranceRatio() + confEnableForTiFlash := bs.sche.conf.getEnableForTiFlash() for id, detail := range bs.stLoadDetail { srcToleranceRatio := confSrcToleranceRatio if detail.IsTiFlash() { @@ -1019,8 +1026,8 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*statistics.StoreLoadDetai func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*statistics.StoreLoadDetail) map[uint64]*statistics.StoreLoadDetail { ret := make(map[uint64]*statistics.StoreLoadDetail, len(candidates)) - confDstToleranceRatio := bs.sche.conf.GetDstToleranceRatio() - confEnableForTiFlash := bs.sche.conf.GetEnableForTiFlash() + confDstToleranceRatio := bs.sche.conf.getDstToleranceRatio() + confEnableForTiFlash := bs.sche.conf.getEnableForTiFlash() for _, detail := range candidates { store := detail.StoreInfo dstToleranceRatio := confDstToleranceRatio @@ -1113,7 +1120,7 @@ func (bs *balanceSolver) checkHistoryLoadsByPriorityAndToleranceFirstOnly(_ [][] } func (bs *balanceSolver) enableExpectation() bool { - return bs.sche.conf.GetDstToleranceRatio() > 0 && bs.sche.conf.GetSrcToleranceRatio() > 0 + return bs.sche.conf.getDstToleranceRatio() > 0 && bs.sche.conf.getSrcToleranceRatio() > 0 } func (bs *balanceSolver) isUniformFirstPriority(store *statistics.StoreLoadDetail) bool { @@ -1149,11 +1156,11 @@ func (bs *balanceSolver) isTolerance(dim int, reverse bool) bool { func (bs *balanceSolver) getMinRate(dim int) float64 { switch dim { case utils.KeyDim: - return bs.sche.conf.GetMinHotKeyRate() + return bs.sche.conf.getMinHotKeyRate() case utils.ByteDim: - return bs.sche.conf.GetMinHotByteRate() + return bs.sche.conf.getMinHotByteRate() case utils.QueryDim: - return bs.sche.conf.GetMinHotQueryRate() + return bs.sche.conf.getMinHotQueryRate() } return -1 } diff --git a/pkg/schedule/schedulers/hot_region_config.go b/pkg/schedule/schedulers/hot_region_config.go index 5f08d755f76..83121254cc0 100644 --- a/pkg/schedule/schedulers/hot_region_config.go +++ b/pkg/schedule/schedulers/hot_region_config.go @@ -157,181 +157,183 @@ type hotRegionSchedulerConfig struct { HistorySampleInterval typeutil.Duration `json:"history-sample-interval"` } -func (conf *hotRegionSchedulerConfig) EncodeConfig() ([]byte, error) { +func (conf *hotRegionSchedulerConfig) encodeConfig() ([]byte, error) { conf.RLock() defer conf.RUnlock() return EncodeConfig(conf) } -func (conf *hotRegionSchedulerConfig) GetStoreStatZombieDuration() time.Duration { +func (conf *hotRegionSchedulerConfig) 
getStoreStatZombieDuration() time.Duration { conf.RLock() defer conf.RUnlock() return time.Duration(conf.MaxZombieRounds*utils.StoreHeartBeatReportInterval) * time.Second } -func (conf *hotRegionSchedulerConfig) GetRegionsStatZombieDuration() time.Duration { +func (conf *hotRegionSchedulerConfig) getRegionsStatZombieDuration() time.Duration { conf.RLock() defer conf.RUnlock() return time.Duration(conf.MaxZombieRounds*utils.RegionHeartBeatReportInterval) * time.Second } -func (conf *hotRegionSchedulerConfig) GetMaxPeerNumber() int { +func (conf *hotRegionSchedulerConfig) getMaxPeerNumber() int { conf.RLock() defer conf.RUnlock() return conf.MaxPeerNum } -func (conf *hotRegionSchedulerConfig) GetSrcToleranceRatio() float64 { +func (conf *hotRegionSchedulerConfig) getSrcToleranceRatio() float64 { conf.RLock() defer conf.RUnlock() return conf.SrcToleranceRatio } -func (conf *hotRegionSchedulerConfig) SetSrcToleranceRatio(tol float64) { +func (conf *hotRegionSchedulerConfig) setSrcToleranceRatio(tol float64) { conf.Lock() defer conf.Unlock() conf.SrcToleranceRatio = tol } -func (conf *hotRegionSchedulerConfig) GetDstToleranceRatio() float64 { +func (conf *hotRegionSchedulerConfig) getDstToleranceRatio() float64 { conf.RLock() defer conf.RUnlock() return conf.DstToleranceRatio } -func (conf *hotRegionSchedulerConfig) SetDstToleranceRatio(tol float64) { +func (conf *hotRegionSchedulerConfig) setDstToleranceRatio(tol float64) { conf.Lock() defer conf.Unlock() conf.DstToleranceRatio = tol } -func (conf *hotRegionSchedulerConfig) GetByteRankStepRatio() float64 { +func (conf *hotRegionSchedulerConfig) getByteRankStepRatio() float64 { conf.RLock() defer conf.RUnlock() return conf.ByteRateRankStepRatio } -func (conf *hotRegionSchedulerConfig) GetKeyRankStepRatio() float64 { +func (conf *hotRegionSchedulerConfig) getKeyRankStepRatio() float64 { conf.RLock() defer conf.RUnlock() return conf.KeyRateRankStepRatio } -func (conf *hotRegionSchedulerConfig) GetQueryRateRankStepRatio() float64 { +func (conf *hotRegionSchedulerConfig) getQueryRateRankStepRatio() float64 { conf.RLock() defer conf.RUnlock() return conf.QueryRateRankStepRatio } -func (conf *hotRegionSchedulerConfig) GetCountRankStepRatio() float64 { +func (conf *hotRegionSchedulerConfig) getCountRankStepRatio() float64 { conf.RLock() defer conf.RUnlock() return conf.CountRankStepRatio } -func (conf *hotRegionSchedulerConfig) GetGreatDecRatio() float64 { +func (conf *hotRegionSchedulerConfig) getGreatDecRatio() float64 { conf.RLock() defer conf.RUnlock() return conf.GreatDecRatio } -func (conf *hotRegionSchedulerConfig) SetStrictPickingStore(v bool) { +func (conf *hotRegionSchedulerConfig) setStrictPickingStore(v bool) { conf.RLock() defer conf.RUnlock() conf.StrictPickingStore = v } -func (conf *hotRegionSchedulerConfig) GetMinorDecRatio() float64 { +func (conf *hotRegionSchedulerConfig) getMinorDecRatio() float64 { conf.RLock() defer conf.RUnlock() return conf.MinorDecRatio } -func (conf *hotRegionSchedulerConfig) GetMinHotKeyRate() float64 { +func (conf *hotRegionSchedulerConfig) getMinHotKeyRate() float64 { conf.RLock() defer conf.RUnlock() return conf.MinHotKeyRate } -func (conf *hotRegionSchedulerConfig) GetMinHotByteRate() float64 { +func (conf *hotRegionSchedulerConfig) getMinHotByteRate() float64 { conf.RLock() defer conf.RUnlock() return conf.MinHotByteRate } -func (conf *hotRegionSchedulerConfig) GetEnableForTiFlash() bool { +func (conf *hotRegionSchedulerConfig) getEnableForTiFlash() bool { conf.RLock() defer conf.RUnlock() return 
conf.EnableForTiFlash } -func (conf *hotRegionSchedulerConfig) SetEnableForTiFlash(enable bool) { +func (conf *hotRegionSchedulerConfig) setEnableForTiFlash(enable bool) { conf.Lock() defer conf.Unlock() conf.EnableForTiFlash = enable } -func (conf *hotRegionSchedulerConfig) GetMinHotQueryRate() float64 { +func (conf *hotRegionSchedulerConfig) getMinHotQueryRate() float64 { conf.RLock() defer conf.RUnlock() return conf.MinHotQueryRate } -func (conf *hotRegionSchedulerConfig) GetReadPriorities() []string { +func (conf *hotRegionSchedulerConfig) getReadPriorities() []string { conf.RLock() defer conf.RUnlock() return conf.ReadPriorities } -func (conf *hotRegionSchedulerConfig) GetWriteLeaderPriorities() []string { +func (conf *hotRegionSchedulerConfig) getWriteLeaderPriorities() []string { conf.RLock() defer conf.RUnlock() return conf.WriteLeaderPriorities } -func (conf *hotRegionSchedulerConfig) GetWritePeerPriorities() []string { +func (conf *hotRegionSchedulerConfig) getWritePeerPriorities() []string { conf.RLock() defer conf.RUnlock() return conf.WritePeerPriorities } -func (conf *hotRegionSchedulerConfig) IsStrictPickingStoreEnabled() bool { +func (conf *hotRegionSchedulerConfig) isStrictPickingStoreEnabled() bool { conf.RLock() defer conf.RUnlock() return conf.StrictPickingStore } -func (conf *hotRegionSchedulerConfig) SetRankFormulaVersion(v string) { +func (conf *hotRegionSchedulerConfig) setRankFormulaVersion(v string) { conf.Lock() defer conf.Unlock() conf.RankFormulaVersion = v } -func (conf *hotRegionSchedulerConfig) GetRankFormulaVersion() string { +func (conf *hotRegionSchedulerConfig) getRankFormulaVersion() string { conf.RLock() defer conf.RUnlock() return conf.getRankFormulaVersionLocked() } -func (conf *hotRegionSchedulerConfig) GetHistorySampleDuration() time.Duration { +func (conf *hotRegionSchedulerConfig) getHistorySampleDuration() time.Duration { conf.RLock() defer conf.RUnlock() return conf.HistorySampleDuration.Duration } -func (conf *hotRegionSchedulerConfig) GetHistorySampleInterval() time.Duration { +func (conf *hotRegionSchedulerConfig) getHistorySampleInterval() time.Duration { conf.RLock() defer conf.RUnlock() return conf.HistorySampleInterval.Duration } -func (conf *hotRegionSchedulerConfig) SetHistorySampleDuration(d time.Duration) { +// nolint: unused, unparam +func (conf *hotRegionSchedulerConfig) setHistorySampleDuration(d time.Duration) { conf.Lock() defer conf.Unlock() conf.HistorySampleDuration = typeutil.NewDuration(d) } -func (conf *hotRegionSchedulerConfig) SetHistorySampleInterval(d time.Duration) { +// nolint: unused +func (conf *hotRegionSchedulerConfig) setHistorySampleInterval(d time.Duration) { conf.Lock() defer conf.Unlock() conf.HistorySampleInterval = typeutil.NewDuration(d) @@ -346,7 +348,7 @@ func (conf *hotRegionSchedulerConfig) getRankFormulaVersionLocked() string { } } -func (conf *hotRegionSchedulerConfig) IsForbidRWType(rw utils.RWType) bool { +func (conf *hotRegionSchedulerConfig) isForbidRWType(rw utils.RWType) bool { conf.RLock() defer conf.RUnlock() return rw.String() == conf.ForbidRWType @@ -367,6 +369,7 @@ func (conf *hotRegionSchedulerConfig) getForbidRWTypeLocked() string { } } +// ServeHTTP implements the http.Handler interface. 
func (conf *hotRegionSchedulerConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) { router := mux.NewRouter() router.HandleFunc("/list", conf.handleGetConfig).Methods(http.MethodGet) diff --git a/pkg/schedule/schedulers/hot_region_rank_v1.go b/pkg/schedule/schedulers/hot_region_rank_v1.go index ebf6e9bf744..9005dff8861 100644 --- a/pkg/schedule/schedulers/hot_region_rank_v1.go +++ b/pkg/schedule/schedulers/hot_region_rank_v1.go @@ -39,7 +39,7 @@ func (r *rankV1) checkByPriorityAndTolerance(loads []float64, f func(int) bool) switch { case r.resourceTy == writeLeader: return r.checkByPriorityAndToleranceFirstOnly(loads, f) - case r.sche.conf.IsStrictPickingStoreEnabled(): + case r.sche.conf.isStrictPickingStoreEnabled(): return r.checkByPriorityAndToleranceAllOf(loads, f) default: return r.checkByPriorityAndToleranceFirstOnly(loads, f) @@ -50,7 +50,7 @@ func (r *rankV1) checkHistoryLoadsByPriority(loads [][]float64, f func(int) bool switch { case r.resourceTy == writeLeader: return r.checkHistoryLoadsByPriorityAndToleranceFirstOnly(loads, f) - case r.sche.conf.IsStrictPickingStoreEnabled(): + case r.sche.conf.isStrictPickingStoreEnabled(): return r.checkHistoryLoadsByPriorityAndToleranceAllOf(loads, f) default: return r.checkHistoryLoadsByPriorityAndToleranceFirstOnly(loads, f) diff --git a/pkg/schedule/schedulers/hot_region_rank_v2_test.go b/pkg/schedule/schedulers/hot_region_rank_v2_test.go index 0237c2156ec..029d47c3c51 100644 --- a/pkg/schedule/schedulers/hot_region_rank_v2_test.go +++ b/pkg/schedule/schedulers/hot_region_rank_v2_test.go @@ -36,10 +36,10 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimSecond(t *testing.T) { re.NoError(err) hb := sche.(*hotScheduler) hb.types = []resourceType{writePeer} - hb.conf.SetDstToleranceRatio(0.0) - hb.conf.SetSrcToleranceRatio(0.0) - hb.conf.SetRankFormulaVersion("v1") - hb.conf.SetHistorySampleDuration(0) + hb.conf.setDstToleranceRatio(0.0) + hb.conf.setSrcToleranceRatio(0.0) + hb.conf.setRankFormulaVersion("v1") + hb.conf.setHistorySampleDuration(0) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.AddRegionStore(1, 20) tc.AddRegionStore(2, 20) @@ -62,7 +62,7 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimSecond(t *testing.T) { re.Empty(ops) re.False(hb.searchRevertRegions[writePeer]) - hb.conf.SetRankFormulaVersion("v2") + hb.conf.setRankFormulaVersion("v2") // searchRevertRegions becomes true after the first `Schedule`. ops, _ = hb.Schedule(tc, false) re.Empty(ops) @@ -97,10 +97,10 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimFirst(t *testing.T) { re.NoError(err) hb := sche.(*hotScheduler) hb.types = []resourceType{writePeer} - hb.conf.SetDstToleranceRatio(0.0) - hb.conf.SetSrcToleranceRatio(0.0) - hb.conf.SetRankFormulaVersion("v1") - hb.conf.SetHistorySampleDuration(0) + hb.conf.setDstToleranceRatio(0.0) + hb.conf.setSrcToleranceRatio(0.0) + hb.conf.setRankFormulaVersion("v1") + hb.conf.setHistorySampleDuration(0) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.AddRegionStore(1, 20) tc.AddRegionStore(2, 20) @@ -125,7 +125,7 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimFirst(t *testing.T) { re.False(hb.searchRevertRegions[writePeer]) clearPendingInfluence(hb) - hb.conf.SetRankFormulaVersion("v2") + hb.conf.setRankFormulaVersion("v2") // searchRevertRegions becomes true after the first `Schedule`. 
ops, _ = hb.Schedule(tc, false) re.Len(ops, 1) @@ -149,10 +149,10 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimFirstOnly(t *testing.T) { re.NoError(err) hb := sche.(*hotScheduler) hb.types = []resourceType{writePeer} - hb.conf.SetDstToleranceRatio(0.0) - hb.conf.SetSrcToleranceRatio(0.0) - hb.conf.SetRankFormulaVersion("v1") - hb.conf.SetHistorySampleDuration(0) + hb.conf.setDstToleranceRatio(0.0) + hb.conf.setSrcToleranceRatio(0.0) + hb.conf.setRankFormulaVersion("v1") + hb.conf.setHistorySampleDuration(0) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.AddRegionStore(1, 20) tc.AddRegionStore(2, 20) @@ -177,7 +177,7 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimFirstOnly(t *testing.T) { re.False(hb.searchRevertRegions[writePeer]) clearPendingInfluence(hb) - hb.conf.SetRankFormulaVersion("v2") + hb.conf.setRankFormulaVersion("v2") // searchRevertRegions becomes true after the first `Schedule`. ops, _ = hb.Schedule(tc, false) re.Len(ops, 1) @@ -209,10 +209,10 @@ func TestHotReadRegionScheduleWithRevertRegionsDimSecond(t *testing.T) { sche, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil) re.NoError(err) hb := sche.(*hotScheduler) - hb.conf.SetDstToleranceRatio(0.0) - hb.conf.SetSrcToleranceRatio(0.0) - hb.conf.SetRankFormulaVersion("v1") - hb.conf.SetHistorySampleDuration(0) + hb.conf.setDstToleranceRatio(0.0) + hb.conf.setSrcToleranceRatio(0.0) + hb.conf.setRankFormulaVersion("v1") + hb.conf.setHistorySampleDuration(0) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.AddRegionStore(1, 20) tc.AddRegionStore(2, 20) @@ -235,7 +235,7 @@ func TestHotReadRegionScheduleWithRevertRegionsDimSecond(t *testing.T) { re.Empty(ops) re.False(hb.searchRevertRegions[readLeader]) - hb.conf.SetRankFormulaVersion("v2") + hb.conf.setRankFormulaVersion("v2") // searchRevertRegions becomes true after the first `Schedule`. 
ops, _ = hb.Schedule(tc, false) re.Empty(ops) @@ -267,11 +267,11 @@ func TestSkipUniformStore(t *testing.T) { defer cancel() hb, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil) re.NoError(err) - hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) - hb.(*hotScheduler).conf.SetDstToleranceRatio(1) - hb.(*hotScheduler).conf.SetRankFormulaVersion("v2") + hb.(*hotScheduler).conf.setSrcToleranceRatio(1) + hb.(*hotScheduler).conf.setDstToleranceRatio(1) + hb.(*hotScheduler).conf.setRankFormulaVersion("v2") hb.(*hotScheduler).conf.ReadPriorities = []string{utils.BytePriority, utils.KeyPriority} - hb.(*hotScheduler).conf.SetHistorySampleDuration(0) + hb.(*hotScheduler).conf.setHistorySampleDuration(0) tc.AddRegionStore(1, 20) tc.AddRegionStore(2, 20) tc.AddRegionStore(3, 20) @@ -422,9 +422,9 @@ func checkHotReadRegionScheduleWithSmallHotRegion(re *require.Assertions, highLo sche, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil) re.NoError(err) hb := sche.(*hotScheduler) - hb.conf.SetSrcToleranceRatio(1) - hb.conf.SetDstToleranceRatio(1) - hb.conf.SetRankFormulaVersion("v2") + hb.conf.setSrcToleranceRatio(1) + hb.conf.setDstToleranceRatio(1) + hb.conf.setRankFormulaVersion("v2") hb.conf.ReadPriorities = []string{utils.QueryPriority, utils.BytePriority} tc.AddRegionStore(1, 40) tc.AddRegionStore(2, 10) diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index 3b563106dc0..fc7a6ae4417 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -84,36 +84,36 @@ func TestUpgrade(t *testing.T) { sche, err := CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(HotRegionType, nil)) re.NoError(err) hb := sche.(*hotScheduler) - re.Equal([]string{utils.QueryPriority, utils.BytePriority}, hb.conf.GetReadPriorities()) - re.Equal([]string{utils.QueryPriority, utils.BytePriority}, hb.conf.GetWriteLeaderPriorities()) - re.Equal([]string{utils.BytePriority, utils.KeyPriority}, hb.conf.GetWritePeerPriorities()) - re.Equal("v2", hb.conf.GetRankFormulaVersion()) + re.Equal([]string{utils.QueryPriority, utils.BytePriority}, hb.conf.getReadPriorities()) + re.Equal([]string{utils.QueryPriority, utils.BytePriority}, hb.conf.getWriteLeaderPriorities()) + re.Equal([]string{utils.BytePriority, utils.KeyPriority}, hb.conf.getWritePeerPriorities()) + re.Equal("v2", hb.conf.getRankFormulaVersion()) // upgrade from json(null) sche, err = CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigJSONDecoder([]byte("null"))) re.NoError(err) hb = sche.(*hotScheduler) - re.Equal([]string{utils.QueryPriority, utils.BytePriority}, hb.conf.GetReadPriorities()) - re.Equal([]string{utils.QueryPriority, utils.BytePriority}, hb.conf.GetWriteLeaderPriorities()) - re.Equal([]string{utils.BytePriority, utils.KeyPriority}, hb.conf.GetWritePeerPriorities()) - re.Equal("v2", hb.conf.GetRankFormulaVersion()) + re.Equal([]string{utils.QueryPriority, utils.BytePriority}, hb.conf.getReadPriorities()) + re.Equal([]string{utils.QueryPriority, utils.BytePriority}, hb.conf.getWriteLeaderPriorities()) + re.Equal([]string{utils.BytePriority, utils.KeyPriority}, hb.conf.getWritePeerPriorities()) + re.Equal("v2", hb.conf.getRankFormulaVersion()) // upgrade from < 5.2 config51 := 
`{"min-hot-byte-rate":100,"min-hot-key-rate":10,"min-hot-query-rate":10,"max-zombie-rounds":5,"max-peer-number":1000,"byte-rate-rank-step-ratio":0.05,"key-rate-rank-step-ratio":0.05,"query-rate-rank-step-ratio":0.05,"count-rank-step-ratio":0.01,"great-dec-ratio":0.95,"minor-dec-ratio":0.99,"src-tolerance-ratio":1.05,"dst-tolerance-ratio":1.05,"strict-picking-store":"true","enable-for-tiflash":"true"}` sche, err = CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigJSONDecoder([]byte(config51))) re.NoError(err) hb = sche.(*hotScheduler) - re.Equal([]string{utils.BytePriority, utils.KeyPriority}, hb.conf.GetReadPriorities()) - re.Equal([]string{utils.KeyPriority, utils.BytePriority}, hb.conf.GetWriteLeaderPriorities()) - re.Equal([]string{utils.BytePriority, utils.KeyPriority}, hb.conf.GetWritePeerPriorities()) - re.Equal("v1", hb.conf.GetRankFormulaVersion()) + re.Equal([]string{utils.BytePriority, utils.KeyPriority}, hb.conf.getReadPriorities()) + re.Equal([]string{utils.KeyPriority, utils.BytePriority}, hb.conf.getWriteLeaderPriorities()) + re.Equal([]string{utils.BytePriority, utils.KeyPriority}, hb.conf.getWritePeerPriorities()) + re.Equal("v1", hb.conf.getRankFormulaVersion()) // upgrade from < 6.4 config54 := `{"min-hot-byte-rate":100,"min-hot-key-rate":10,"min-hot-query-rate":10,"max-zombie-rounds":5,"max-peer-number":1000,"byte-rate-rank-step-ratio":0.05,"key-rate-rank-step-ratio":0.05,"query-rate-rank-step-ratio":0.05,"count-rank-step-ratio":0.01,"great-dec-ratio":0.95,"minor-dec-ratio":0.99,"src-tolerance-ratio":1.05,"dst-tolerance-ratio":1.05,"read-priorities":["query","byte"],"write-leader-priorities":["query","byte"],"write-peer-priorities":["byte","key"],"strict-picking-store":"true","enable-for-tiflash":"true","forbid-rw-type":"none"}` sche, err = CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigJSONDecoder([]byte(config54))) re.NoError(err) hb = sche.(*hotScheduler) - re.Equal([]string{utils.QueryPriority, utils.BytePriority}, hb.conf.GetReadPriorities()) - re.Equal([]string{utils.QueryPriority, utils.BytePriority}, hb.conf.GetWriteLeaderPriorities()) - re.Equal([]string{utils.BytePriority, utils.KeyPriority}, hb.conf.GetWritePeerPriorities()) - re.Equal("v1", hb.conf.GetRankFormulaVersion()) + re.Equal([]string{utils.QueryPriority, utils.BytePriority}, hb.conf.getReadPriorities()) + re.Equal([]string{utils.QueryPriority, utils.BytePriority}, hb.conf.getWriteLeaderPriorities()) + re.Equal([]string{utils.BytePriority, utils.KeyPriority}, hb.conf.getWritePeerPriorities()) + re.Equal("v1", hb.conf.getRankFormulaVersion()) } func TestGCPendingOpInfos(t *testing.T) { @@ -151,7 +151,7 @@ func checkGCPendingOpInfos(re *require.Assertions, enablePlacementRules bool) { op.Start() op.SetStatusReachTime(operator.CREATED, time.Now().Add(-5*utils.StoreHeartBeatReportInterval*time.Second)) op.SetStatusReachTime(operator.STARTED, time.Now().Add((-5*utils.StoreHeartBeatReportInterval+1)*time.Second)) - return newPendingInfluence(op, []uint64{2}, 4, statistics.Influence{}, hb.conf.GetStoreStatZombieDuration()) + return newPendingInfluence(op, []uint64{2}, 4, statistics.Influence{}, hb.conf.getStoreStatZombieDuration()) } justDoneOpInfluence := func(region *core.RegionInfo, ty opType) *pendingInfluence { infl := notDoneOpInfluence(region, ty) @@ -400,7 +400,7 @@ func checkHotWriteRegionPlacement(re *require.Assertions, enablePlacementRules b hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), 
nil) re.NoError(err) hb.(*hotScheduler).types = []resourceType{writePeer} - hb.(*hotScheduler).conf.SetHistorySampleDuration(0) + hb.(*hotScheduler).conf.setHistorySampleDuration(0) tc.AddLabelsStore(1, 2, map[string]string{"zone": "z1", "host": "h1"}) tc.AddLabelsStore(2, 2, map[string]string{"zone": "z1", "host": "h2"}) @@ -456,7 +456,7 @@ func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlace hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) - hb.(*hotScheduler).conf.SetHistorySampleDuration(0) + hb.(*hotScheduler).conf.setHistorySampleDuration(0) hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.BytePriority, utils.KeyPriority} // Add stores 1, 2, 3, 4, 5, 6 with region counts 3, 2, 2, 2, 0, 0. @@ -652,7 +652,7 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) { sche, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) hb := sche.(*hotScheduler) - hb.conf.SetHistorySampleDuration(0) + hb.conf.setHistorySampleDuration(0) // Add TiKV stores 1, 2, 3, 4, 5, 6, 7 (Down) with region counts 3, 3, 2, 2, 0, 0, 0. // Add TiFlash stores 8, 9, 10 with region counts 2, 1, 1. @@ -734,7 +734,7 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) { } pdServerCfg.FlowRoundByDigit = 3 // Disable for TiFlash - hb.conf.SetEnableForTiFlash(false) + hb.conf.setEnableForTiFlash(false) for i := 0; i < 20; i++ { clearPendingInfluence(hb) ops, _ := hb.Schedule(tc, false) @@ -848,10 +848,10 @@ func TestHotWriteRegionScheduleWithQuery(t *testing.T) { hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) - hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) - hb.(*hotScheduler).conf.SetDstToleranceRatio(1) + hb.(*hotScheduler).conf.setSrcToleranceRatio(1) + hb.(*hotScheduler).conf.setDstToleranceRatio(1) hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.QueryPriority, utils.BytePriority} - hb.(*hotScheduler).conf.SetHistorySampleDuration(0) + hb.(*hotScheduler).conf.setHistorySampleDuration(0) tc.AddRegionStore(1, 20) tc.AddRegionStore(2, 20) @@ -884,11 +884,11 @@ func TestHotWriteRegionScheduleWithKeyRate(t *testing.T) { hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) hb.(*hotScheduler).types = []resourceType{writePeer} - hb.(*hotScheduler).conf.SetDstToleranceRatio(1) - hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) + hb.(*hotScheduler).conf.setDstToleranceRatio(1) + hb.(*hotScheduler).conf.setSrcToleranceRatio(1) hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.KeyPriority, utils.BytePriority} hb.(*hotScheduler).conf.RankFormulaVersion = "v1" - hb.(*hotScheduler).conf.SetHistorySampleDuration(0) + hb.(*hotScheduler).conf.setHistorySampleDuration(0) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.AddRegionStore(1, 20) @@ -941,8 +941,8 @@ func TestHotWriteRegionScheduleUnhealthyStore(t *testing.T) { defer cancel() hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) - hb.(*hotScheduler).conf.SetDstToleranceRatio(1) - hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) + hb.(*hotScheduler).conf.setDstToleranceRatio(1) + hb.(*hotScheduler).conf.setSrcToleranceRatio(1) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.AddRegionStore(1, 20) @@ -986,8 +986,8 
@@ func TestHotWriteRegionScheduleCheckHot(t *testing.T) { defer cancel() hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) - hb.(*hotScheduler).conf.SetDstToleranceRatio(1) - hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) + hb.(*hotScheduler).conf.setDstToleranceRatio(1) + hb.(*hotScheduler).conf.setSrcToleranceRatio(1) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.AddRegionStore(1, 20) @@ -1019,7 +1019,7 @@ func TestHotWriteRegionScheduleWithLeader(t *testing.T) { hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) hb.(*hotScheduler).types = []resourceType{writeLeader} hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.KeyPriority, utils.BytePriority} - hb.(*hotScheduler).conf.SetHistorySampleDuration(0) + hb.(*hotScheduler).conf.setHistorySampleDuration(0) re.NoError(err) tc.AddRegionStore(1, 20) @@ -1085,7 +1085,7 @@ func checkHotWriteRegionScheduleWithPendingInfluence(re *require.Assertions, dim hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) hb.(*hotScheduler).conf.RankFormulaVersion = "v1" - hb.(*hotScheduler).conf.SetHistorySampleDuration(0) + hb.(*hotScheduler).conf.setHistorySampleDuration(0) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.AddRegionStore(1, 20) @@ -1169,7 +1169,7 @@ func TestHotWriteRegionScheduleWithRuleEnabled(t *testing.T) { hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.KeyPriority, utils.BytePriority} - hb.(*hotScheduler).conf.SetHistorySampleDuration(0) + hb.(*hotScheduler).conf.setHistorySampleDuration(0) key, err := hex.DecodeString("") re.NoError(err) @@ -1250,7 +1250,7 @@ func TestHotReadRegionScheduleByteRateOnly(t *testing.T) { re.NoError(err) hb := scheduler.(*hotScheduler) hb.conf.ReadPriorities = []string{utils.BytePriority, utils.KeyPriority} - hb.conf.SetHistorySampleDuration(0) + hb.conf.setHistorySampleDuration(0) // Add stores 1, 2, 3, 4, 5 with region counts 3, 2, 2, 2, 0. 
tc.AddRegionStore(1, 3) @@ -1370,10 +1370,10 @@ func TestHotReadRegionScheduleWithQuery(t *testing.T) { defer cancel() hb, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) - hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) - hb.(*hotScheduler).conf.SetDstToleranceRatio(1) + hb.(*hotScheduler).conf.setSrcToleranceRatio(1) + hb.(*hotScheduler).conf.setDstToleranceRatio(1) hb.(*hotScheduler).conf.RankFormulaVersion = "v1" - hb.(*hotScheduler).conf.SetHistorySampleDuration(0) + hb.(*hotScheduler).conf.setHistorySampleDuration(0) tc.AddRegionStore(1, 20) tc.AddRegionStore(2, 20) @@ -1404,10 +1404,10 @@ func TestHotReadRegionScheduleWithKeyRate(t *testing.T) { hb, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) hb.(*hotScheduler).conf.RankFormulaVersion = "v1" - hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) - hb.(*hotScheduler).conf.SetDstToleranceRatio(1) + hb.(*hotScheduler).conf.setSrcToleranceRatio(1) + hb.(*hotScheduler).conf.setDstToleranceRatio(1) hb.(*hotScheduler).conf.ReadPriorities = []string{utils.BytePriority, utils.KeyPriority} - hb.(*hotScheduler).conf.SetHistorySampleDuration(0) + hb.(*hotScheduler).conf.setHistorySampleDuration(0) tc.AddRegionStore(1, 20) tc.AddRegionStore(2, 20) @@ -1469,7 +1469,7 @@ func checkHotReadRegionScheduleWithPendingInfluence(re *require.Assertions, dim hb.(*hotScheduler).conf.MinorDecRatio = 1 hb.(*hotScheduler).conf.DstToleranceRatio = 1 hb.(*hotScheduler).conf.ReadPriorities = []string{utils.BytePriority, utils.KeyPriority} - hb.(*hotScheduler).conf.SetHistorySampleDuration(0) + hb.(*hotScheduler).conf.setHistorySampleDuration(0) pendingAmpFactor = 0.0 tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) @@ -1575,9 +1575,9 @@ func TestHotReadWithEvictLeaderScheduler(t *testing.T) { defer cancel() hb, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) - hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) - hb.(*hotScheduler).conf.SetDstToleranceRatio(1) - hb.(*hotScheduler).conf.SetStrictPickingStore(false) + hb.(*hotScheduler).conf.setSrcToleranceRatio(1) + hb.(*hotScheduler).conf.setDstToleranceRatio(1) + hb.(*hotScheduler).conf.setStrictPickingStore(false) hb.(*hotScheduler).conf.ReadPriorities = []string{utils.BytePriority, utils.KeyPriority} tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.AddRegionStore(1, 20) @@ -2042,9 +2042,9 @@ func TestInfluenceByRWType(t *testing.T) { hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) hb.(*hotScheduler).types = []resourceType{writePeer} - hb.(*hotScheduler).conf.SetDstToleranceRatio(1) - hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) - hb.(*hotScheduler).conf.SetHistorySampleDuration(0) + hb.(*hotScheduler).conf.setDstToleranceRatio(1) + hb.(*hotScheduler).conf.setSrcToleranceRatio(1) + hb.(*hotScheduler).conf.setHistorySampleDuration(0) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.AddRegionStore(1, 20) tc.AddRegionStore(2, 20) @@ -2162,9 +2162,9 @@ func TestHotScheduleWithPriority(t *testing.T) { hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) hb.(*hotScheduler).types = []resourceType{writePeer} - hb.(*hotScheduler).conf.SetDstToleranceRatio(1.05) - hb.(*hotScheduler).conf.SetSrcToleranceRatio(1.05) - 
hb.(*hotScheduler).conf.SetHistorySampleDuration(0) + hb.(*hotScheduler).conf.setDstToleranceRatio(1.05) + hb.(*hotScheduler).conf.setSrcToleranceRatio(1.05) + hb.(*hotScheduler).conf.setHistorySampleDuration(0) // skip stddev check stddevThreshold = -1.0 @@ -2207,7 +2207,7 @@ func TestHotScheduleWithPriority(t *testing.T) { addRegionInfo(tc, utils.Read, []testRegionInfo{ {1, []uint64{1, 2, 3}, 2 * units.MiB, 2 * units.MiB, 0}, }) - hb.(*hotScheduler).conf.SetHistorySampleDuration(0) + hb.(*hotScheduler).conf.setHistorySampleDuration(0) hb.(*hotScheduler).conf.ReadPriorities = []string{utils.BytePriority, utils.KeyPriority} ops, _ = hb.Schedule(tc, false) re.Len(ops, 1) @@ -2222,7 +2222,7 @@ func TestHotScheduleWithPriority(t *testing.T) { hb.(*hotScheduler).types = []resourceType{writePeer} hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.KeyPriority, utils.BytePriority} hb.(*hotScheduler).conf.RankFormulaVersion = "v1" - hb.(*hotScheduler).conf.SetHistorySampleDuration(0) + hb.(*hotScheduler).conf.setHistorySampleDuration(0) re.NoError(err) // assert loose store picking @@ -2264,8 +2264,8 @@ func TestHotScheduleWithStddev(t *testing.T) { hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) hb.(*hotScheduler).types = []resourceType{writePeer} - hb.(*hotScheduler).conf.SetDstToleranceRatio(1.0) - hb.(*hotScheduler).conf.SetSrcToleranceRatio(1.0) + hb.(*hotScheduler).conf.setDstToleranceRatio(1.0) + hb.(*hotScheduler).conf.setSrcToleranceRatio(1.0) hb.(*hotScheduler).conf.RankFormulaVersion = "v1" tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.AddRegionStore(1, 20) @@ -2274,7 +2274,7 @@ func TestHotScheduleWithStddev(t *testing.T) { tc.AddRegionStore(4, 20) tc.AddRegionStore(5, 20) hb.(*hotScheduler).conf.StrictPickingStore = false - hb.(*hotScheduler).conf.SetHistorySampleDuration(0) + hb.(*hotScheduler).conf.setHistorySampleDuration(0) // skip uniform cluster tc.UpdateStorageWrittenStats(1, 5*units.MiB*utils.StoreHeartBeatReportInterval, 5*units.MiB*utils.StoreHeartBeatReportInterval) @@ -2323,9 +2323,9 @@ func TestHotWriteLeaderScheduleWithPriority(t *testing.T) { hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) hb.(*hotScheduler).types = []resourceType{writeLeader} - hb.(*hotScheduler).conf.SetDstToleranceRatio(1) - hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) - hb.(*hotScheduler).conf.SetHistorySampleDuration(0) + hb.(*hotScheduler).conf.setDstToleranceRatio(1) + hb.(*hotScheduler).conf.setSrcToleranceRatio(1) + hb.(*hotScheduler).conf.setHistorySampleDuration(0) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.AddRegionStore(1, 20) @@ -2533,17 +2533,17 @@ func TestConfigValidation(t *testing.T) { // rank-formula-version // default hc = initHotRegionScheduleConfig() - re.Equal("v2", hc.GetRankFormulaVersion()) + re.Equal("v2", hc.getRankFormulaVersion()) // v1 hc.RankFormulaVersion = "v1" err = hc.validateLocked() re.NoError(err) - re.Equal("v1", hc.GetRankFormulaVersion()) + re.Equal("v1", hc.getRankFormulaVersion()) // v2 hc.RankFormulaVersion = "v2" err = hc.validateLocked() re.NoError(err) - re.Equal("v2", hc.GetRankFormulaVersion()) + re.Equal("v2", hc.getRankFormulaVersion()) // illegal hc.RankFormulaVersion = "v0" err = hc.validateLocked() @@ -2552,20 +2552,20 @@ func TestConfigValidation(t *testing.T) { // forbid-rw-type // default hc = initHotRegionScheduleConfig() - 
re.False(hc.IsForbidRWType(utils.Read)) - re.False(hc.IsForbidRWType(utils.Write)) + re.False(hc.isForbidRWType(utils.Read)) + re.False(hc.isForbidRWType(utils.Write)) // read hc.ForbidRWType = "read" err = hc.validateLocked() re.NoError(err) - re.True(hc.IsForbidRWType(utils.Read)) - re.False(hc.IsForbidRWType(utils.Write)) + re.True(hc.isForbidRWType(utils.Read)) + re.False(hc.isForbidRWType(utils.Write)) // write hc.ForbidRWType = "write" err = hc.validateLocked() re.NoError(err) - re.False(hc.IsForbidRWType(utils.Read)) - re.True(hc.IsForbidRWType(utils.Write)) + re.False(hc.isForbidRWType(utils.Read)) + re.True(hc.isForbidRWType(utils.Write)) // illegal hc.ForbidRWType = "test" err = hc.validateLocked() diff --git a/pkg/schedule/schedulers/init.go b/pkg/schedule/schedulers/init.go index 988bbc30475..5990aa2cda3 100644 --- a/pkg/schedule/schedulers/init.go +++ b/pkg/schedule/schedulers/init.go @@ -225,7 +225,7 @@ func schedulersRegister() { // For clusters with the initial version >= v5.2, it will be overwritten by the default config. conf.applyPrioritiesConfig(compatiblePrioritiesConfig) // For clusters with the initial version >= v6.4, it will be overwritten by the default config. - conf.SetRankFormulaVersion("") + conf.setRankFormulaVersion("") if err := decoder(conf); err != nil { return nil, err } diff --git a/pkg/schedule/schedulers/label.go b/pkg/schedule/schedulers/label.go index 814f525a76c..f57d82b3149 100644 --- a/pkg/schedule/schedulers/label.go +++ b/pkg/schedule/schedulers/label.go @@ -55,10 +55,12 @@ func newLabelScheduler(opController *operator.Controller, conf *labelSchedulerCo } } +// EncodeConfig implements the Scheduler interface. func (s *labelScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(s.conf) } +// IsScheduleAllowed implements the Scheduler interface. func (s *labelScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { allowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetSchedulerConfig().GetLeaderScheduleLimit() if !allowed { @@ -67,6 +69,7 @@ func (s *labelScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { return allowed } +// Schedule implements the Scheduler interface. func (s *labelScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { labelCounter.Inc() stores := cluster.GetStores() diff --git a/pkg/schedule/schedulers/random_merge.go b/pkg/schedule/schedulers/random_merge.go index 2d425746cea..751ab1eaa9d 100644 --- a/pkg/schedule/schedulers/random_merge.go +++ b/pkg/schedule/schedulers/random_merge.go @@ -56,10 +56,12 @@ func newRandomMergeScheduler(opController *operator.Controller, conf *randomMerg } } +// EncodeConfig implements the Scheduler interface. func (s *randomMergeScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(s.conf) } +// IsScheduleAllowed implements the Scheduler interface. func (s *randomMergeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { allowed := s.OpController.OperatorCount(operator.OpMerge) < cluster.GetSchedulerConfig().GetMergeScheduleLimit() if !allowed { @@ -68,6 +70,7 @@ func (s *randomMergeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) return allowed } +// Schedule implements the Scheduler interface. 
func (s *randomMergeScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { randomMergeCounter.Inc() diff --git a/pkg/schedule/schedulers/scatter_range.go b/pkg/schedule/schedulers/scatter_range.go index 8874eb19cff..76a47dd973b 100644 --- a/pkg/schedule/schedulers/scatter_range.go +++ b/pkg/schedule/schedulers/scatter_range.go @@ -47,7 +47,7 @@ type scatterRangeSchedulerConfig struct { EndKey string `json:"end-key"` } -func (conf *scatterRangeSchedulerConfig) BuildWithArgs(args []string) error { +func (conf *scatterRangeSchedulerConfig) buildWithArgs(args []string) error { if len(args) != 3 { return errs.ErrSchedulerConfig.FastGenByArgs("ranges and name") } @@ -60,7 +60,7 @@ func (conf *scatterRangeSchedulerConfig) BuildWithArgs(args []string) error { return nil } -func (conf *scatterRangeSchedulerConfig) Clone() *scatterRangeSchedulerConfig { +func (conf *scatterRangeSchedulerConfig) clone() *scatterRangeSchedulerConfig { conf.RLock() defer conf.RUnlock() return &scatterRangeSchedulerConfig{ @@ -70,7 +70,7 @@ func (conf *scatterRangeSchedulerConfig) Clone() *scatterRangeSchedulerConfig { } } -func (conf *scatterRangeSchedulerConfig) Persist() error { +func (conf *scatterRangeSchedulerConfig) persist() error { name := conf.getSchedulerName() conf.RLock() defer conf.RUnlock() @@ -81,19 +81,19 @@ func (conf *scatterRangeSchedulerConfig) Persist() error { return conf.storage.SaveSchedulerConfig(name, data) } -func (conf *scatterRangeSchedulerConfig) GetRangeName() string { +func (conf *scatterRangeSchedulerConfig) getRangeName() string { conf.RLock() defer conf.RUnlock() return conf.RangeName } -func (conf *scatterRangeSchedulerConfig) GetStartKey() []byte { +func (conf *scatterRangeSchedulerConfig) getStartKey() []byte { conf.RLock() defer conf.RUnlock() return []byte(conf.StartKey) } -func (conf *scatterRangeSchedulerConfig) GetEndKey() []byte { +func (conf *scatterRangeSchedulerConfig) getEndKey() []byte { conf.RLock() defer conf.RUnlock() return []byte(conf.EndKey) @@ -139,16 +139,19 @@ func newScatterRangeScheduler(opController *operator.Controller, config *scatter return scheduler } +// ServeHTTP implements the http.Handler interface. func (l *scatterRangeScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { l.handler.ServeHTTP(w, r) } +// EncodeConfig implements the Scheduler interface. func (l *scatterRangeScheduler) EncodeConfig() ([]byte, error) { l.config.RLock() defer l.config.RUnlock() return EncodeConfig(l.config) } +// ReloadConfig implements the Scheduler interface. func (l *scatterRangeScheduler) ReloadConfig() error { l.config.Lock() defer l.config.Unlock() @@ -169,6 +172,7 @@ func (l *scatterRangeScheduler) ReloadConfig() error { return nil } +// IsScheduleAllowed implements the Scheduler interface. func (l *scatterRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { return l.allowBalanceLeader(cluster) || l.allowBalanceRegion(cluster) } @@ -189,15 +193,16 @@ func (l *scatterRangeScheduler) allowBalanceRegion(cluster sche.SchedulerCluster return allowed } +// Schedule implements the Scheduler interface. 
func (l *scatterRangeScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { scatterRangeCounter.Inc() // isolate a new cluster according to the key range - c := genRangeCluster(cluster, l.config.GetStartKey(), l.config.GetEndKey()) + c := genRangeCluster(cluster, l.config.getStartKey(), l.config.getEndKey()) c.SetTolerantSizeRatio(2) if l.allowBalanceLeader(cluster) { ops, _ := l.balanceLeader.Schedule(c, false) if len(ops) > 0 { - ops[0].SetDesc(fmt.Sprintf("scatter-range-leader-%s", l.config.GetRangeName())) + ops[0].SetDesc(fmt.Sprintf("scatter-range-leader-%s", l.config.getRangeName())) ops[0].AttachKind(operator.OpRange) ops[0].Counters = append(ops[0].Counters, scatterRangeNewOperatorCounter, @@ -209,7 +214,7 @@ func (l *scatterRangeScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) if l.allowBalanceRegion(cluster) { ops, _ := l.balanceRegion.Schedule(c, false) if len(ops) > 0 { - ops[0].SetDesc(fmt.Sprintf("scatter-range-region-%s", l.config.GetRangeName())) + ops[0].SetDesc(fmt.Sprintf("scatter-range-region-%s", l.config.getRangeName())) ops[0].AttachKind(operator.OpRange) ops[0].Counters = append(ops[0].Counters, scatterRangeNewOperatorCounter, @@ -227,7 +232,7 @@ type scatterRangeHandler struct { config *scatterRangeSchedulerConfig } -func (handler *scatterRangeHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) { +func (handler *scatterRangeHandler) updateConfig(w http.ResponseWriter, r *http.Request) { var input map[string]any if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil { return @@ -235,42 +240,42 @@ func (handler *scatterRangeHandler) UpdateConfig(w http.ResponseWriter, r *http. var args []string name, ok := input["range-name"].(string) if ok { - if name != handler.config.GetRangeName() { + if name != handler.config.getRangeName() { handler.rd.JSON(w, http.StatusInternalServerError, errors.New("Cannot change the range name, please delete this schedule").Error()) return } args = append(args, name) } else { - args = append(args, handler.config.GetRangeName()) + args = append(args, handler.config.getRangeName()) } startKey, ok := input["start-key"].(string) if ok { args = append(args, startKey) } else { - args = append(args, string(handler.config.GetStartKey())) + args = append(args, string(handler.config.getStartKey())) } endKey, ok := input["end-key"].(string) if ok { args = append(args, endKey) } else { - args = append(args, string(handler.config.GetEndKey())) + args = append(args, string(handler.config.getEndKey())) } - err := handler.config.BuildWithArgs(args) + err := handler.config.buildWithArgs(args) if err != nil { handler.rd.JSON(w, http.StatusBadRequest, err.Error()) return } - err = handler.config.Persist() + err = handler.config.persist() if err != nil { handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) } handler.rd.JSON(w, http.StatusOK, nil) } -func (handler *scatterRangeHandler) ListConfig(w http.ResponseWriter, _ *http.Request) { - conf := handler.config.Clone() +func (handler *scatterRangeHandler) listConfig(w http.ResponseWriter, _ *http.Request) { + conf := handler.config.clone() handler.rd.JSON(w, http.StatusOK, conf) } @@ -280,7 +285,7 @@ func newScatterRangeHandler(config *scatterRangeSchedulerConfig) http.Handler { rd: render.New(render.Options{IndentJSON: true}), } router := mux.NewRouter() - router.HandleFunc("/config", h.UpdateConfig).Methods(http.MethodPost) - router.HandleFunc("/list", h.ListConfig).Methods(http.MethodGet) + 
router.HandleFunc("/config", h.updateConfig).Methods(http.MethodPost) + router.HandleFunc("/list", h.listConfig).Methods(http.MethodGet) return router } diff --git a/pkg/schedule/schedulers/shuffle_hot_region.go b/pkg/schedule/schedulers/shuffle_hot_region.go index 32384a19df1..5bb5d269b63 100644 --- a/pkg/schedule/schedulers/shuffle_hot_region.go +++ b/pkg/schedule/schedulers/shuffle_hot_region.go @@ -94,14 +94,17 @@ func newShuffleHotRegionScheduler(opController *operator.Controller, conf *shuff return ret } +// ServeHTTP implements the http.Handler interface. func (s *shuffleHotRegionScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.handler.ServeHTTP(w, r) } +// EncodeConfig implements the Scheduler interface. func (s *shuffleHotRegionScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(s.conf) } +// ReloadConfig implements the Scheduler interface. func (s *shuffleHotRegionScheduler) ReloadConfig() error { s.conf.Lock() defer s.conf.Unlock() @@ -120,6 +123,7 @@ func (s *shuffleHotRegionScheduler) ReloadConfig() error { return nil } +// IsScheduleAllowed implements the Scheduler interface. func (s *shuffleHotRegionScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { hotRegionAllowed := s.OpController.OperatorCount(operator.OpHotRegion) < s.conf.getLimit() conf := cluster.GetSchedulerConfig() @@ -137,6 +141,7 @@ func (s *shuffleHotRegionScheduler) IsScheduleAllowed(cluster sche.SchedulerClus return hotRegionAllowed && regionAllowed && leaderAllowed } +// Schedule implements the Scheduler interface. func (s *shuffleHotRegionScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { shuffleHotRegionCounter.Inc() typ := s.randomType() @@ -211,7 +216,7 @@ type shuffleHotRegionHandler struct { config *shuffleHotRegionSchedulerConfig } -func (handler *shuffleHotRegionHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) { +func (handler *shuffleHotRegionHandler) updateConfig(w http.ResponseWriter, r *http.Request) { var input map[string]any if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil { return @@ -234,7 +239,7 @@ func (handler *shuffleHotRegionHandler) UpdateConfig(w http.ResponseWriter, r *h handler.rd.JSON(w, http.StatusOK, nil) } -func (handler *shuffleHotRegionHandler) ListConfig(w http.ResponseWriter, _ *http.Request) { +func (handler *shuffleHotRegionHandler) listConfig(w http.ResponseWriter, _ *http.Request) { conf := handler.config.Clone() handler.rd.JSON(w, http.StatusOK, conf) } @@ -245,7 +250,7 @@ func newShuffleHotRegionHandler(config *shuffleHotRegionSchedulerConfig) http.Ha rd: render.New(render.Options{IndentJSON: true}), } router := mux.NewRouter() - router.HandleFunc("/config", h.UpdateConfig).Methods(http.MethodPost) - router.HandleFunc("/list", h.ListConfig).Methods(http.MethodGet) + router.HandleFunc("/config", h.updateConfig).Methods(http.MethodPost) + router.HandleFunc("/list", h.listConfig).Methods(http.MethodGet) return router } diff --git a/pkg/schedule/schedulers/shuffle_leader.go b/pkg/schedule/schedulers/shuffle_leader.go index ce2c8cd31d5..46e04efb23d 100644 --- a/pkg/schedule/schedulers/shuffle_leader.go +++ b/pkg/schedule/schedulers/shuffle_leader.go @@ -60,10 +60,12 @@ func newShuffleLeaderScheduler(opController *operator.Controller, conf *shuffleL } } +// EncodeConfig implements the Scheduler interface. 
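
// A hedged sketch of the handler-registration pattern in
// newScatterRangeHandler / newShuffleHotRegionHandler above. Un-exporting
// updateConfig and listConfig is safe because the methods are passed as values
// to HandleFunc inside the same package; nothing resolves them by name from
// outside. demoHandler and its field are illustrative, not PD types.
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"

	"github.com/gorilla/mux"
)

type demoHandler struct {
	limit uint64 // a real handler would guard shared config with its lock
}

// updateConfig is unexported but still reachable through the router.
func (h *demoHandler) updateConfig(w http.ResponseWriter, r *http.Request) {
	var input map[string]any
	if err := json.NewDecoder(r.Body).Decode(&input); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	if v, ok := input["limit"].(float64); ok {
		h.limit = uint64(v)
	}
	w.WriteHeader(http.StatusOK)
}

func (h *demoHandler) listConfig(w http.ResponseWriter, _ *http.Request) {
	json.NewEncoder(w).Encode(map[string]uint64{"limit": h.limit})
}

func newDemoHandler() http.Handler {
	h := &demoHandler{limit: 4}
	router := mux.NewRouter()
	router.HandleFunc("/config", h.updateConfig).Methods(http.MethodPost)
	router.HandleFunc("/list", h.listConfig).Methods(http.MethodGet)
	return router
}

func main() {
	srv := httptest.NewServer(newDemoHandler())
	defer srv.Close()
	resp, err := http.Get(srv.URL + "/list")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status) // 200 OK
}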
func (s *shuffleLeaderScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(s.conf) } +// IsScheduleAllowed implements the Scheduler interface. func (s *shuffleLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { allowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetSchedulerConfig().GetLeaderScheduleLimit() if !allowed { @@ -72,6 +74,7 @@ func (s *shuffleLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster return allowed } +// Schedule implements the Scheduler interface. func (s *shuffleLeaderScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { // We shuffle leaders between stores by: // 1. random select a valid store. diff --git a/pkg/schedule/schedulers/shuffle_region.go b/pkg/schedule/schedulers/shuffle_region.go index b59e97b2a11..ca759042e8f 100644 --- a/pkg/schedule/schedulers/shuffle_region.go +++ b/pkg/schedule/schedulers/shuffle_region.go @@ -55,14 +55,17 @@ func newShuffleRegionScheduler(opController *operator.Controller, conf *shuffleR } } +// ServeHTTP implements the http.Handler interface. func (s *shuffleRegionScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.conf.ServeHTTP(w, r) } +// EncodeConfig implements the Scheduler interface. func (s *shuffleRegionScheduler) EncodeConfig() ([]byte, error) { - return s.conf.EncodeConfig() + return s.conf.encodeConfig() } +// ReloadConfig implements the Scheduler interface. func (s *shuffleRegionScheduler) ReloadConfig() error { s.conf.Lock() defer s.conf.Unlock() @@ -82,6 +85,7 @@ func (s *shuffleRegionScheduler) ReloadConfig() error { return nil } +// IsScheduleAllowed implements the Scheduler interface. func (s *shuffleRegionScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { allowed := s.OpController.OperatorCount(operator.OpRegion) < cluster.GetSchedulerConfig().GetRegionScheduleLimit() if !allowed { @@ -90,6 +94,7 @@ func (s *shuffleRegionScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster return allowed } +// Schedule implements the Scheduler interface. 
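
// A small sketch of the admission check in the IsScheduleAllowed methods
// above: scheduling proceeds only while the count of in-flight operators of
// the relevant kind stays below the configured limit. What the real code does
// on refusal is elided in the hunk above, so a plain counter stands in here;
// all names are illustrative.
package main

import "fmt"

type opController struct{ inFlightLeaderOps uint64 }

func (c *opController) operatorCount() uint64 { return c.inFlightLeaderOps }

type schedulerConfig struct{ leaderScheduleLimit uint64 }

var operatorLimitDenials int

func isScheduleAllowed(c *opController, conf schedulerConfig) bool {
	allowed := c.operatorCount() < conf.leaderScheduleLimit
	if !allowed {
		operatorLimitDenials++ // stand-in for whatever bookkeeping the scheduler does on refusal
	}
	return allowed
}

func main() {
	conf := schedulerConfig{leaderScheduleLimit: 4}
	fmt.Println(isScheduleAllowed(&opController{inFlightLeaderOps: 3}, conf)) // true
	fmt.Println(isScheduleAllowed(&opController{inFlightLeaderOps: 4}, conf)) // false
	fmt.Println(operatorLimitDenials)                                         // 1
}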
func (s *shuffleRegionScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { shuffleRegionCounter.Inc() region, oldPeer := s.scheduleRemovePeer(cluster) @@ -122,18 +127,18 @@ func (s *shuffleRegionScheduler) scheduleRemovePeer(cluster sche.SchedulerCluste pendingFilter := filter.NewRegionPendingFilter() downFilter := filter.NewRegionDownFilter() replicaFilter := filter.NewRegionReplicatedFilter(cluster) - ranges := s.conf.GetRanges() + ranges := s.conf.getRanges() for _, source := range candidates.Stores { var region *core.RegionInfo - if s.conf.IsRoleAllow(roleFollower) { + if s.conf.isRoleAllow(roleFollower) { region = filter.SelectOneRegion(cluster.RandFollowerRegions(source.GetID(), ranges), nil, pendingFilter, downFilter, replicaFilter) } - if region == nil && s.conf.IsRoleAllow(roleLeader) { + if region == nil && s.conf.isRoleAllow(roleLeader) { region = filter.SelectOneRegion(cluster.RandLeaderRegions(source.GetID(), ranges), nil, pendingFilter, downFilter, replicaFilter) } - if region == nil && s.conf.IsRoleAllow(roleLearner) { + if region == nil && s.conf.isRoleAllow(roleLearner) { region = filter.SelectOneRegion(cluster.RandLearnerRegions(source.GetID(), ranges), nil, pendingFilter, downFilter, replicaFilter) } diff --git a/pkg/schedule/schedulers/shuffle_region_config.go b/pkg/schedule/schedulers/shuffle_region_config.go index bce64f743b8..fbf53cfeb4d 100644 --- a/pkg/schedule/schedulers/shuffle_region_config.go +++ b/pkg/schedule/schedulers/shuffle_region_config.go @@ -43,19 +43,19 @@ type shuffleRegionSchedulerConfig struct { Roles []string `json:"roles"` // can include `leader`, `follower`, `learner`. } -func (conf *shuffleRegionSchedulerConfig) EncodeConfig() ([]byte, error) { +func (conf *shuffleRegionSchedulerConfig) encodeConfig() ([]byte, error) { conf.RLock() defer conf.RUnlock() return EncodeConfig(conf) } -func (conf *shuffleRegionSchedulerConfig) GetRoles() []string { +func (conf *shuffleRegionSchedulerConfig) getRoles() []string { conf.RLock() defer conf.RUnlock() return conf.Roles } -func (conf *shuffleRegionSchedulerConfig) GetRanges() []core.KeyRange { +func (conf *shuffleRegionSchedulerConfig) getRanges() []core.KeyRange { conf.RLock() defer conf.RUnlock() ranges := make([]core.KeyRange, len(conf.Ranges)) @@ -63,12 +63,13 @@ func (conf *shuffleRegionSchedulerConfig) GetRanges() []core.KeyRange { return ranges } -func (conf *shuffleRegionSchedulerConfig) IsRoleAllow(role string) bool { +func (conf *shuffleRegionSchedulerConfig) isRoleAllow(role string) bool { conf.RLock() defer conf.RUnlock() return slice.AnyOf(conf.Roles, func(i int) bool { return conf.Roles[i] == role }) } +// ServeHTTP implements the http.Handler interface. 
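
// A sketch of the role gate in shuffleRegionSchedulerConfig.isRoleAllow above:
// the allowed roles live behind a read lock and the check is a simple
// membership test. slice.AnyOf is PD's helper; the standard library's
// slices.Contains (Go 1.21+) expresses the same idea and is used here instead.
// Names are illustrative.
package main

import (
	"fmt"
	"slices"
	"sync"
)

const (
	roleLeader   = "leader"
	roleFollower = "follower"
	roleLearner  = "learner"
)

type shuffleConfig struct {
	sync.RWMutex
	Roles []string `json:"roles"` // can include `leader`, `follower`, `learner`
}

func (c *shuffleConfig) isRoleAllow(role string) bool {
	c.RLock()
	defer c.RUnlock()
	return slices.Contains(c.Roles, role)
}

func main() {
	conf := &shuffleConfig{Roles: []string{roleLeader, roleFollower}}
	fmt.Println(conf.isRoleAllow(roleFollower)) // true
	fmt.Println(conf.isRoleAllow(roleLearner))  // false
}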
func (conf *shuffleRegionSchedulerConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) { router := mux.NewRouter() router.HandleFunc("/list", conf.handleGetRoles).Methods(http.MethodGet) @@ -79,7 +80,7 @@ func (conf *shuffleRegionSchedulerConfig) ServeHTTP(w http.ResponseWriter, r *ht func (conf *shuffleRegionSchedulerConfig) handleGetRoles(w http.ResponseWriter, _ *http.Request) { rd := render.New(render.Options{IndentJSON: true}) - rd.JSON(w, http.StatusOK, conf.GetRoles()) + rd.JSON(w, http.StatusOK, conf.getRoles()) } func (conf *shuffleRegionSchedulerConfig) handleSetRoles(w http.ResponseWriter, r *http.Request) { diff --git a/pkg/schedule/schedulers/split_bucket.go b/pkg/schedule/schedulers/split_bucket.go index 2031e232aee..7f33b996f1c 100644 --- a/pkg/schedule/schedulers/split_bucket.go +++ b/pkg/schedule/schedulers/split_bucket.go @@ -60,7 +60,7 @@ type splitBucketSchedulerConfig struct { SplitLimit uint64 `json:"split-limit"` } -func (conf *splitBucketSchedulerConfig) Clone() *splitBucketSchedulerConfig { +func (conf *splitBucketSchedulerConfig) clone() *splitBucketSchedulerConfig { conf.RLock() defer conf.RUnlock() return &splitBucketSchedulerConfig{ @@ -100,7 +100,7 @@ type splitBucketHandler struct { } func (h *splitBucketHandler) listConfig(w http.ResponseWriter, _ *http.Request) { - conf := h.conf.Clone() + conf := h.conf.clone() h.rd.JSON(w, http.StatusOK, conf) } @@ -213,7 +213,7 @@ type splitBucketPlan struct { // Schedule return operators if some bucket is too hot. func (s *splitBucketScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { splitBucketScheduleCounter.Inc() - conf := s.conf.Clone() + conf := s.conf.clone() plan := &splitBucketPlan{ conf: conf, cluster: cluster, diff --git a/pkg/schedule/schedulers/transfer_witness_leader.go b/pkg/schedule/schedulers/transfer_witness_leader.go index 8b6e9c39f1d..c1c59620735 100644 --- a/pkg/schedule/schedulers/transfer_witness_leader.go +++ b/pkg/schedule/schedulers/transfer_witness_leader.go @@ -54,10 +54,12 @@ func newTransferWitnessLeaderScheduler(opController *operator.Controller) Schedu } } +// IsScheduleAllowed implements the Scheduler interface. func (*transferWitnessLeaderScheduler) IsScheduleAllowed(sche.SchedulerCluster) bool { return true } +// Schedule implements the Scheduler interface. 
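
// A sketch of the snapshot pattern in splitBucketScheduler.Schedule above:
// Schedule copies the config once via clone() and then works on the copy,
// presumably so the scheduling pass reads a consistent view without holding
// the config lock. The fields and plan type below are illustrative stand-ins,
// not PD's real ones.
package main

import (
	"fmt"
	"sync"
)

type splitConfig struct {
	sync.RWMutex
	Degree     int
	SplitLimit uint64
}

func (c *splitConfig) clone() *splitConfig {
	c.RLock()
	defer c.RUnlock()
	return &splitConfig{Degree: c.Degree, SplitLimit: c.SplitLimit}
}

type splitPlan struct {
	conf *splitConfig
}

func schedule(c *splitConfig) *splitPlan {
	// Take a consistent snapshot; the rest of the run reads the copy lock-free.
	return &splitPlan{conf: c.clone()}
}

func main() {
	c := &splitConfig{Degree: 3, SplitLimit: 10}
	p := schedule(c)
	c.Lock()
	c.Degree = 100 // a concurrent update does not affect the in-flight plan
	c.Unlock()
	fmt.Println(p.conf.Degree) // 3
}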
func (s *transferWitnessLeaderScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { transferWitnessLeaderCounter.Inc() return s.scheduleTransferWitnessLeaderBatch(s.GetName(), cluster, transferWitnessLeaderBatchSize), nil diff --git a/pkg/schedule/schedulers/utils.go b/pkg/schedule/schedulers/utils.go index c708541e02e..1e911cf7b06 100644 --- a/pkg/schedule/schedulers/utils.go +++ b/pkg/schedule/schedulers/utils.go @@ -65,24 +65,24 @@ func newSolver(basePlan *plan.BalanceSchedulerPlan, kind constant.ScheduleKind, } } -func (p *solver) GetOpInfluence(storeID uint64) int64 { +func (p *solver) getOpInfluence(storeID uint64) int64 { return p.opInfluence.GetStoreInfluence(storeID).ResourceProperty(p.kind) } -func (p *solver) SourceStoreID() uint64 { +func (p *solver) sourceStoreID() uint64 { return p.Source.GetID() } -func (p *solver) SourceMetricLabel() string { - return strconv.FormatUint(p.SourceStoreID(), 10) +func (p *solver) sourceMetricLabel() string { + return strconv.FormatUint(p.sourceStoreID(), 10) } -func (p *solver) TargetStoreID() uint64 { +func (p *solver) targetStoreID() uint64 { return p.Target.GetID() } -func (p *solver) TargetMetricLabel() string { - return strconv.FormatUint(p.TargetStoreID(), 10) +func (p *solver) targetMetricLabel() string { + return strconv.FormatUint(p.targetStoreID(), 10) } func (p *solver) sourceStoreScore(scheduleName string) float64 { @@ -90,7 +90,7 @@ func (p *solver) sourceStoreScore(scheduleName string) float64 { tolerantResource := p.getTolerantResource() // to avoid schedule too much, if A's core greater than B and C a little // we want that A should be moved out one region not two - influence := p.GetOpInfluence(sourceID) + influence := p.getOpInfluence(sourceID) if influence > 0 { influence = -influence } @@ -121,7 +121,7 @@ func (p *solver) targetStoreScore(scheduleName string) float64 { tolerantResource := p.getTolerantResource() // to avoid schedule call back // A->B, A's influence is negative, so A will be target, C may move region to A - influence := p.GetOpInfluence(targetID) + influence := p.getOpInfluence(targetID) if influence < 0 { influence = -influence } @@ -358,7 +358,7 @@ func newRetryQuota() *retryQuota { } } -func (q *retryQuota) GetLimit(store *core.StoreInfo) int { +func (q *retryQuota) getLimit(store *core.StoreInfo) int { id := store.GetID() if limit, ok := q.limits[id]; ok { return limit @@ -367,19 +367,19 @@ func (q *retryQuota) GetLimit(store *core.StoreInfo) int { return q.initialLimit } -func (q *retryQuota) ResetLimit(store *core.StoreInfo) { +func (q *retryQuota) resetLimit(store *core.StoreInfo) { q.limits[store.GetID()] = q.initialLimit } -func (q *retryQuota) Attenuate(store *core.StoreInfo) { - newLimit := q.GetLimit(store) / q.attenuation +func (q *retryQuota) attenuate(store *core.StoreInfo) { + newLimit := q.getLimit(store) / q.attenuation if newLimit < q.minLimit { newLimit = q.minLimit } q.limits[store.GetID()] = newLimit } -func (q *retryQuota) GC(keepStores []*core.StoreInfo) { +func (q *retryQuota) gc(keepStores []*core.StoreInfo) { set := make(map[uint64]struct{}, len(keepStores)) for _, store := range keepStores { set[store.GetID()] = struct{}{} diff --git a/pkg/schedule/schedulers/utils_test.go b/pkg/schedule/schedulers/utils_test.go index a2f5aa4dad0..deb7c6e1038 100644 --- a/pkg/schedule/schedulers/utils_test.go +++ b/pkg/schedule/schedulers/utils_test.go @@ -30,24 +30,24 @@ func TestRetryQuota(t *testing.T) { store2 := core.NewStoreInfo(&metapb.Store{Id: 
2}) keepStores := []*core.StoreInfo{store1} - // test GetLimit - re.Equal(10, q.GetLimit(store1)) + // test getLimit + re.Equal(10, q.getLimit(store1)) - // test Attenuate + // test attenuate for _, expected := range []int{5, 2, 1, 1, 1} { - q.Attenuate(store1) - re.Equal(expected, q.GetLimit(store1)) + q.attenuate(store1) + re.Equal(expected, q.getLimit(store1)) } // test GC - re.Equal(10, q.GetLimit(store2)) - q.Attenuate(store2) - re.Equal(5, q.GetLimit(store2)) - q.GC(keepStores) - re.Equal(1, q.GetLimit(store1)) - re.Equal(10, q.GetLimit(store2)) - - // test ResetLimit - q.ResetLimit(store1) - re.Equal(10, q.GetLimit(store1)) + re.Equal(10, q.getLimit(store2)) + q.attenuate(store2) + re.Equal(5, q.getLimit(store2)) + q.gc(keepStores) + re.Equal(1, q.getLimit(store1)) + re.Equal(10, q.getLimit(store2)) + + // test resetLimit + q.resetLimit(store1) + re.Equal(10, q.getLimit(store1)) } diff --git a/plugin/scheduler_example/evict_leader.go b/plugin/scheduler_example/evict_leader.go index 83d4771adc4..49156abc40c 100644 --- a/plugin/scheduler_example/evict_leader.go +++ b/plugin/scheduler_example/evict_leader.go @@ -44,7 +44,7 @@ const ( EvictLeaderType = "user-evict-leader" noStoreInSchedulerInfo = "No store in user-evict-leader-scheduler-config" - UserEvictLeaderScheduler types.CheckerSchedulerType = "user-evict-leader-scheduler" + userEvictLeaderScheduler types.CheckerSchedulerType = "user-evict-leader-scheduler" ) func init() { @@ -158,7 +158,7 @@ type evictLeaderScheduler struct { // newEvictLeaderScheduler creates an admin scheduler that transfers all leaders // out of a store. func newEvictLeaderScheduler(opController *operator.Controller, conf *evictLeaderSchedulerConfig) schedulers.Scheduler { - base := schedulers.NewBaseScheduler(opController, UserEvictLeaderScheduler) + base := schedulers.NewBaseScheduler(opController, userEvictLeaderScheduler) handler := newEvictLeaderHandler(conf) return &evictLeaderScheduler{ BaseScheduler: base, @@ -172,6 +172,7 @@ func (s *evictLeaderScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) s.handler.ServeHTTP(w, r) } +// EncodeConfig implements the Scheduler interface. func (s *evictLeaderScheduler) EncodeConfig() ([]byte, error) { s.conf.mu.RLock() defer s.conf.mu.RUnlock() @@ -244,8 +245,8 @@ type evictLeaderHandler struct { config *evictLeaderSchedulerConfig } -// UpdateConfig updates the config. -func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) { +// updateConfig updates the config. +func (handler *evictLeaderHandler) updateConfig(w http.ResponseWriter, r *http.Request) { var input map[string]any if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil { return @@ -285,14 +286,12 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R handler.rd.JSON(w, http.StatusOK, nil) } -// ListConfig lists the config. -func (handler *evictLeaderHandler) ListConfig(w http.ResponseWriter, _ *http.Request) { +func (handler *evictLeaderHandler) listConfig(w http.ResponseWriter, _ *http.Request) { conf := handler.config.Clone() handler.rd.JSON(w, http.StatusOK, conf) } -// DeleteConfig deletes the config. 
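
// A standalone sketch of the retryQuota semantics exercised by TestRetryQuota
// above: every store starts at an initial limit (10 in the test), attenuate
// halves it down to a floor of 1, gc drops entries for stores that are no
// longer kept (so they fall back to the initial limit), and resetLimit
// restores the initial value. The constants are taken from the test; the real
// methods take *core.StoreInfo rather than raw IDs.
package main

import "fmt"

type retryQuota struct {
	initialLimit int
	minLimit     int
	attenuation  int
	limits       map[uint64]int
}

func newRetryQuota() *retryQuota {
	return &retryQuota{initialLimit: 10, minLimit: 1, attenuation: 2, limits: map[uint64]int{}}
}

func (q *retryQuota) getLimit(id uint64) int {
	if limit, ok := q.limits[id]; ok {
		return limit
	}
	q.limits[id] = q.initialLimit
	return q.initialLimit
}

func (q *retryQuota) attenuate(id uint64) {
	newLimit := q.getLimit(id) / q.attenuation
	if newLimit < q.minLimit {
		newLimit = q.minLimit
	}
	q.limits[id] = newLimit
}

func (q *retryQuota) resetLimit(id uint64) { q.limits[id] = q.initialLimit }

func (q *retryQuota) gc(keep []uint64) {
	set := make(map[uint64]struct{}, len(keep))
	for _, id := range keep {
		set[id] = struct{}{}
	}
	for id := range q.limits {
		if _, ok := set[id]; !ok {
			delete(q.limits, id)
		}
	}
}

func main() {
	q := newRetryQuota()
	for _, want := range []int{5, 2, 1, 1, 1} {
		q.attenuate(1)
		fmt.Println(q.getLimit(1) == want) // true each time
	}
	q.attenuate(2)             // store 2: 10 -> 5
	q.gc([]uint64{1})          // drop store 2's entry
	fmt.Println(q.getLimit(2)) // 10 again, back to the initial limit
}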
-func (handler *evictLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.Request) { +func (handler *evictLeaderHandler) deleteConfig(w http.ResponseWriter, r *http.Request) { idStr := mux.Vars(r)["store_id"] id, err := strconv.ParseUint(idStr, 10, 64) if err != nil { @@ -331,9 +330,9 @@ func newEvictLeaderHandler(config *evictLeaderSchedulerConfig) http.Handler { rd: render.New(render.Options{IndentJSON: true}), } router := mux.NewRouter() - router.HandleFunc("/config", h.UpdateConfig).Methods(http.MethodPost) - router.HandleFunc("/list", h.ListConfig).Methods(http.MethodGet) - router.HandleFunc("/delete/{store_id}", h.DeleteConfig).Methods(http.MethodDelete) + router.HandleFunc("/config", h.updateConfig).Methods(http.MethodPost) + router.HandleFunc("/list", h.listConfig).Methods(http.MethodGet) + router.HandleFunc("/delete/{store_id}", h.deleteConfig).Methods(http.MethodDelete) return router } diff --git a/server/forward.go b/server/forward.go index 650833e1fc1..5c49b871020 100644 --- a/server/forward.go +++ b/server/forward.go @@ -122,7 +122,7 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error { default: } - request, err := server.Recv(s.GetTSOProxyRecvFromClientTimeout()) + request, err := server.recv(s.GetTSOProxyRecvFromClientTimeout()) if err == io.EOF { return nil } @@ -189,7 +189,7 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error { Count: tsopbResp.GetCount(), Timestamp: tsopbResp.GetTimestamp(), } - if err := server.Send(response); err != nil { + if err := server.send(response); err != nil { return errors.WithStack(err) } } diff --git a/server/grpc_service.go b/server/grpc_service.go index 7b18be47fde..fa9156e884e 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -112,7 +112,7 @@ type pdpbTSORequest struct { err error } -func (s *tsoServer) Send(m *pdpb.TsoResponse) error { +func (s *tsoServer) send(m *pdpb.TsoResponse) error { if atomic.LoadInt32(&s.closed) == 1 { return io.EOF } @@ -139,7 +139,7 @@ func (s *tsoServer) Send(m *pdpb.TsoResponse) error { } } -func (s *tsoServer) Recv(timeout time.Duration) (*pdpb.TsoRequest, error) { +func (s *tsoServer) recv(timeout time.Duration) (*pdpb.TsoRequest, error) { if atomic.LoadInt32(&s.closed) == 1 { return nil, io.EOF } @@ -176,6 +176,7 @@ type heartbeatServer struct { closed int32 } +// Send wraps Send() of PD_RegionHeartbeatServer. func (s *heartbeatServer) Send(m core.RegionHeartbeatResponse) error { if atomic.LoadInt32(&s.closed) == 1 { return io.EOF @@ -199,6 +200,7 @@ func (s *heartbeatServer) Send(m core.RegionHeartbeatResponse) error { } } +// Recv wraps Recv() of PD_RegionHeartbeatServer. func (s *heartbeatServer) Recv() (*pdpb.RegionHeartbeatRequest, error) { if atomic.LoadInt32(&s.closed) == 1 { return nil, io.EOF diff --git a/server/handler.go b/server/handler.go index d36dd6656ae..34a78a93c3c 100644 --- a/server/handler.go +++ b/server/handler.go @@ -53,6 +53,7 @@ type server struct { *Server } +// GetCoordinator returns the coordinator. func (s *server) GetCoordinator() *schedule.Coordinator { c := s.GetRaftCluster() if c == nil { @@ -61,6 +62,7 @@ func (s *server) GetCoordinator() *schedule.Coordinator { return c.GetCoordinator() } +// GetCluster returns RaftCluster. func (s *server) GetCluster() sche.SchedulerCluster { return s.GetRaftCluster() }
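
// A hedged sketch of the stream-wrapper pattern in tsoServer / heartbeatServer
// above: an atomic "closed" flag short-circuits send/recv with io.EOF once the
// stream is done, and recv enforces a client-side timeout. Only the closed-flag
// check is taken from the diff; the channel and timeout plumbing below is
// invented for illustration, while the real wrappers sit on top of the
// generated gRPC stream types.
package main

import (
	"errors"
	"fmt"
	"io"
	"sync/atomic"
	"time"
)

type request struct{ payload string }

type streamServer struct {
	closed   int32
	requests chan *request
}

func (s *streamServer) recv(timeout time.Duration) (*request, error) {
	if atomic.LoadInt32(&s.closed) == 1 {
		return nil, io.EOF
	}
	select {
	case req, ok := <-s.requests:
		if !ok {
			atomic.StoreInt32(&s.closed, 1)
			return nil, io.EOF
		}
		return req, nil
	case <-time.After(timeout):
		atomic.StoreInt32(&s.closed, 1)
		return nil, errors.New("recv from client timed out")
	}
}

func main() {
	s := &streamServer{requests: make(chan *request, 1)}
	s.requests <- &request{payload: "tso"}
	req, _ := s.recv(time.Second)
	fmt.Println(req.payload) // tso
	_, err := s.recv(10 * time.Millisecond)
	fmt.Println(err) // recv from client timed out
}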