opt, schedulers: consider placement rules when verify region healthy #1897

Merged: 8 commits, Nov 7, 2019
3 changes: 3 additions & 0 deletions pkg/mock/mockcluster/mockcluster.go
@@ -24,6 +24,7 @@ import (
"github.com/pingcap/pd/pkg/mock/mockid"
"github.com/pingcap/pd/pkg/mock/mockoption"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/kv"
"github.com/pingcap/pd/server/schedule/placement"
"github.com/pingcap/pd/server/statistics"
"go.uber.org/zap"
@@ -42,10 +43,12 @@ type Cluster struct {

// NewCluster creates a new Cluster
func NewCluster(opt *mockoption.ScheduleOptions) *Cluster {
ruleManager, _ := placement.NewRuleManager(core.NewStorage(kv.NewMemoryKV()), opt.MaxReplicas, opt.GetLocationLabels())
return &Cluster{
BasicCluster: core.NewBasicCluster(),
IDAllocator: mockid.NewIDAllocator(),
ScheduleOptions: opt,
RuleManager: ruleManager,
HotCache: statistics.NewHotCache(),
StoresStats: statistics.NewStoresStats(),
}
11 changes: 6 additions & 5 deletions server/cluster_test.go
@@ -29,6 +29,7 @@ import (
"github.com/pingcap/pd/pkg/testutil"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/kv"
"github.com/pingcap/pd/server/schedule/opt"
"github.com/pkg/errors"
)

@@ -840,10 +841,10 @@ func (s *testRegionsInfoSuite) Test(c *C) {
}

for i := uint64(0); i < n; i++ {
region := cache.RandLeaderRegion(i, []core.KeyRange{core.NewKeyRange("", "")})
region := cluster.RandLeaderRegion(i, []core.KeyRange{core.NewKeyRange("", "")})
c.Assert(region.GetLeader().GetStoreId(), Equals, i)

region = cache.RandFollowerRegion(i, []core.KeyRange{core.NewKeyRange("", "")})
region = cluster.RandFollowerRegion(i, []core.KeyRange{core.NewKeyRange("", "")})
c.Assert(region.GetLeader().GetStoreId(), Not(Equals), i)

c.Assert(region.GetStorePeer(i), NotNil)
@@ -859,14 +860,14 @@ func (s *testRegionsInfoSuite) Test(c *C) {
// All regions will be filtered out if they have pending peers.
for i := uint64(0); i < n; i++ {
for j := 0; j < cache.GetStoreLeaderCount(i); j++ {
region := cluster.RandLeaderRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, core.HealthRegion())
region := cluster.RandLeaderRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, opt.HealthRegion(cluster))
newRegion := region.Clone(core.WithPendingPeers(region.GetPeers()))
cache.SetRegion(newRegion)
}
c.Assert(cluster.RandLeaderRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, core.HealthRegion()), IsNil)
c.Assert(cluster.RandLeaderRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, opt.HealthRegion(cluster)), IsNil)
}
for i := uint64(0); i < n; i++ {
c.Assert(cluster.RandFollowerRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, core.HealthRegion()), IsNil)
c.Assert(cluster.RandFollowerRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, opt.HealthRegion(cluster)), IsNil)
}
}

24 changes: 9 additions & 15 deletions server/core/region_option.go
@@ -21,20 +21,6 @@ import (
// RegionOption is used to select region.
type RegionOption func(region *RegionInfo) bool

// HealthRegion checks if the region is healthy.
func HealthRegion() RegionOption {
return func(region *RegionInfo) bool {
return len(region.downPeers) == 0 && len(region.pendingPeers) == 0 && len(region.learners) == 0
}
}

// HealthRegionAllowPending checks if the region is healthy with allowing the pending peer.
func HealthRegionAllowPending() RegionOption {
return func(region *RegionInfo) bool {
return len(region.downPeers) == 0 && len(region.learners) == 0
}
}

// RegionCreateOption used to create region.
type RegionCreateOption func(region *RegionInfo)

@@ -55,7 +41,15 @@ func WithPendingPeers(pengdingPeers []*metapb.Peer) RegionCreateOption {
// WithLearners sets the learners for the region.
func WithLearners(learners []*metapb.Peer) RegionCreateOption {
return func(region *RegionInfo) {
region.learners = learners
peers := region.meta.GetPeers()
for i := range peers {
for _, l := range learners {
Review comment (Contributor): Just mentioning it: why not keep the same style of loop, e.g. for j := range learners, and then use learners[j] later?

Reply (Author): Only because l is shorter than learners[j].

if peers[i].GetId() == l.GetId() {
peers[i] = &metapb.Peer{Id: l.GetId(), StoreId: l.GetStoreId(), IsLearner: true}
break
}
}
}
}
}
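
Editor's note: a minimal, hypothetical sketch (not part of this diff) of what the new WithLearners behavior means for callers. Instead of filling a separate learners slice, the option now flags the matching peer inside the region meta as a learner; the sketch assumes the learner list is re-derived from those flags when the RegionInfo is built, which is what the new test below relies on.

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/pingcap/pd/server/core"
)

func main() {
	peers := []*metapb.Peer{
		{Id: 1, StoreId: 1},
		{Id: 2, StoreId: 2},
		{Id: 3, StoreId: 3},
	}
	// Ask for peer 3 to be a learner. With this change the option rewrites the
	// matching peer in the region meta (IsLearner: true) instead of keeping a
	// separate learners slice on RegionInfo.
	region := core.NewRegionInfo(&metapb.Region{Peers: peers}, peers[0],
		core.WithLearners([]*metapb.Peer{{Id: 3, StoreId: 3}}))

	for _, p := range region.GetMeta().GetPeers() {
		fmt.Printf("peer %d learner=%v\n", p.GetId(), p.GetIsLearner())
	}
	// Assuming the learner list is derived from the peer flags, this reports 1.
	fmt.Println("learners:", len(region.GetLearners()))
}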

7 changes: 3 additions & 4 deletions server/handler.go
@@ -30,6 +30,7 @@ import (
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/schedule"
"github.com/pingcap/pd/server/schedule/operator"
"github.com/pingcap/pd/server/schedule/opt"
"github.com/pingcap/pd/server/statistics"
"github.com/pkg/errors"
"go.uber.org/zap"
@@ -612,13 +613,11 @@ func (h *Handler) AddMergeRegionOperator(regionID uint64, targetID uint64) error
return ErrRegionNotFound(targetID)
}

if len(region.GetDownPeers()) > 0 || len(region.GetPendingPeers()) > 0 || len(region.GetLearners()) > 0 ||
len(region.GetPeers()) != c.cluster.GetMaxReplicas() {
if !opt.IsRegionHealthy(c.cluster, region) || !opt.IsRegionReplicated(c.cluster, region) {
return ErrRegionAbnormalPeer(regionID)
}

if len(target.GetDownPeers()) > 0 || len(target.GetPendingPeers()) > 0 || len(target.GetLearners()) > 0 ||
len(target.GetMeta().GetPeers()) != c.cluster.GetMaxReplicas() {
if !opt.IsRegionHealthy(c.cluster, target) || !opt.IsRegionReplicated(c.cluster, target) {
return ErrRegionAbnormalPeer(targetID)
}

7 changes: 3 additions & 4 deletions server/schedule/checker/merge_checker.go
@@ -83,12 +83,12 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator {
}

// skip regions that have down peers, pending peers, or learner peers
if len(region.GetDownPeers()) > 0 || len(region.GetPendingPeers()) > 0 || len(region.GetLearners()) > 0 {
if !opt.IsRegionHealthy(m.cluster, region) {
checkerCounter.WithLabelValues("merge_checker", "special-peer").Inc()
return nil
}

if len(region.GetPeers()) != m.cluster.GetMaxReplicas() {
if !opt.IsRegionReplicated(m.cluster, region) {
checkerCounter.WithLabelValues("merge_checker", "abnormal-replica").Inc()
return nil
}
@@ -132,8 +132,7 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator {

func (m *MergeChecker) checkTarget(region, adjacent *core.RegionInfo) bool {
return adjacent != nil && !m.cluster.IsRegionHot(adjacent) && m.allowMerge(region, adjacent) &&
len(adjacent.GetDownPeers()) == 0 && len(adjacent.GetPendingPeers()) == 0 && len(adjacent.GetLearners()) == 0 && // no special peer
len(adjacent.GetPeers()) == m.cluster.GetMaxReplicas() // peer count should equal
opt.IsRegionHealthy(m.cluster, adjacent) && opt.IsRegionReplicated(m.cluster, adjacent)
}

// allowMerge returns true if two regions can be merged according to the key type.
61 changes: 61 additions & 0 deletions server/schedule/opt/healthy.go
@@ -0,0 +1,61 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package opt

import "github.com/pingcap/pd/server/core"

// IsRegionHealthy checks if a region is healthy for scheduling. It requires that
// the region has no down peers and no pending peers. When the placement rules
// feature is disabled, it also requires that the region has no learner peers.
func IsRegionHealthy(cluster Cluster, region *core.RegionInfo) bool {
return IsHealthyAllowPending(cluster, region) && len(region.GetPendingPeers()) == 0
}

// IsHealthyAllowPending checks if a region is healthy for scheduling.
// Unlike IsRegionHealthy, it allows the region to have pending peers.
func IsHealthyAllowPending(cluster Cluster, region *core.RegionInfo) bool {
if !cluster.IsPlacementRulesEnabled() && len(region.GetLearners()) > 0 {
return false
}
return len(region.GetDownPeers()) == 0
}

// HealthRegion returns a function that checks if a region is healthy for
// scheduling. It requires that the region has no down or pending peers, and no
// learner peers when placement rules are disabled.
func HealthRegion(cluster Cluster) func(*core.RegionInfo) bool {
return func(region *core.RegionInfo) bool { return IsRegionHealthy(cluster, region) }
}

// HealthAllowPending returns a function that checks if a region is healthy for
// scheduling. Unlike HealthRegion, it allows the region to have pending peers.
func HealthAllowPending(cluster Cluster) func(*core.RegionInfo) bool {
return func(region *core.RegionInfo) bool { return IsHealthyAllowPending(cluster, region) }
}

// IsRegionReplicated checks if a region is fully replicated. When placement
// rules are enabled, its peers should fit the corresponding rules. When placement
// rules are disabled, it should have enough replicas and no learner peers.
func IsRegionReplicated(cluster Cluster, region *core.RegionInfo) bool {
if cluster.IsPlacementRulesEnabled() {
return cluster.FitRegion(region).IsSatisfied()
}
return len(region.GetLearners()) == 0 && len(region.GetPeers()) == cluster.GetMaxReplicas()
}

// ReplicatedRegion returns a function that checks if a region is fully replicated.
func ReplicatedRegion(cluster Cluster) func(*core.RegionInfo) bool {
return func(region *core.RegionInfo) bool { return IsRegionReplicated(cluster, region) }
}
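
Editor's note: a rough, hypothetical illustration of how these helpers are meant to be consumed (not part of the PR). It builds a mock cluster the same way the test file below does and probes a region that carries one learner peer. The exact results with placement rules enabled depend on the rule-fit support the skipped test below is waiting for, so expected values are only indicated in comments.

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/pingcap/pd/pkg/mock/mockcluster"
	"github.com/pingcap/pd/pkg/mock/mockoption"
	"github.com/pingcap/pd/server/core"
	"github.com/pingcap/pd/server/schedule/opt"
)

func main() {
	scheduleOpt := mockoption.NewScheduleOptions()
	cluster := mockcluster.NewCluster(scheduleOpt)
	for id := uint64(1); id <= 3; id++ {
		cluster.AddRegionStore(id, 1)
	}

	peers := []*metapb.Peer{{Id: 1, StoreId: 1}, {Id: 2, StoreId: 2}, {Id: 3, StoreId: 3}}
	// A 3-replica region where peer 3 is a learner.
	region := core.NewRegionInfo(&metapb.Region{Peers: peers}, peers[0],
		core.WithLearners([]*metapb.Peer{{Id: 3, StoreId: 3}}))

	// Placement rules disabled: the learner makes the region both unhealthy
	// and not fully replicated (expected: false false).
	scheduleOpt.EnablePlacementRules = false
	fmt.Println(opt.IsRegionHealthy(cluster, region), opt.IsRegionReplicated(cluster, region))

	// Placement rules enabled: learners no longer fail the health check, and
	// replication is judged by whether the peers fit the configured rules.
	scheduleOpt.EnablePlacementRules = true
	fmt.Println(opt.IsRegionHealthy(cluster, region), opt.IsRegionReplicated(cluster, region))

	// The closure forms plug straight into region selection as core.RegionOption values.
	healthy := opt.HealthRegion(cluster)
	fmt.Println(healthy(region))
}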
91 changes: 91 additions & 0 deletions server/schedule/opt/healthy_test.go
@@ -0,0 +1,91 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package opt

import (
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/pd/pkg/mock/mockcluster"
"github.com/pingcap/pd/pkg/mock/mockoption"
"github.com/pingcap/pd/server/core"
)

func TestOpt(t *testing.T) {
TestingT(t)
}

var _ = Suite(&testRegionHealthySuite{})

type testRegionHealthySuite struct{}

func (s *testRegionHealthySuite) TestIsRegionHealthy(c *C) {
c.Skip("enable it after rule fit merged")

peers := func(ids ...uint64) []*metapb.Peer {
var peers []*metapb.Peer
for _, id := range ids {
p := &metapb.Peer{
Id: id,
StoreId: id,
}
peers = append(peers, p)
}
return peers
}

region := func(peers []*metapb.Peer, opts ...core.RegionCreateOption) *core.RegionInfo {
return core.NewRegionInfo(&metapb.Region{Peers: peers}, peers[0], opts...)
}

type testCase struct {
region *core.RegionInfo
// disable placement rules
healthy1 bool
healthyAllowPending1 bool
replicated1 bool
// enable placement rules
healthy2 bool
healthyAllowPending2 bool
replicated2 bool
}

cases := []testCase{
{region(peers(1, 2, 3)), true, true, true, true, true, true},
{region(peers(1, 2, 3), core.WithPendingPeers(peers(1))), false, true, true, false, true, true},
{region(peers(1, 2, 3), core.WithLearners(peers(1))), false, false, false, true, true, false},
{region(peers(1, 2, 3), core.WithDownPeers([]*pdpb.PeerStats{{Peer: peers(1)[0]}})), false, false, true, false, false, true},
{region(peers(1, 2)), true, true, false, true, true, false},
{region(peers(1, 2, 3, 4), core.WithLearners(peers(1))), false, false, false, true, true, false},
}

opt := mockoption.NewScheduleOptions()
tc := mockcluster.NewCluster(opt)
tc.AddRegionStore(1, 1)
tc.AddRegionStore(2, 1)
tc.AddRegionStore(3, 1)
tc.AddRegionStore(4, 1)
for _, t := range cases {
opt.EnablePlacementRules = false
c.Assert(IsRegionHealthy(tc, t.region), Equals, t.healthy1)
c.Assert(IsHealthyAllowPending(tc, t.region), Equals, t.healthyAllowPending1)
c.Assert(IsRegionReplicated(tc, t.region), Equals, t.replicated1)
opt.EnablePlacementRules = true
c.Assert(IsRegionHealthy(tc, t.region), Equals, t.healthy2)
c.Assert(IsHealthyAllowPending(tc, t.region), Equals, t.healthyAllowPending2)
c.Assert(IsRegionReplicated(tc, t.region), Equals, t.replicated2)
}
}
4 changes: 2 additions & 2 deletions server/schedule/region_scatterer.go
@@ -89,8 +89,8 @@ func NewRegionScatterer(cluster opt.Cluster) *RegionScatterer {

// Scatter relocates the region.
func (r *RegionScatterer) Scatter(region *core.RegionInfo) (*operator.Operator, error) {
if len(region.GetPeers()) != r.cluster.GetMaxReplicas() {
return nil, errors.Errorf("the number replicas of region %d is not expected", region.GetID())
if !opt.IsRegionReplicated(r.cluster, region) {
return nil, errors.Errorf("region %d is not fully replicated", region.GetID())
}

if region.GetLeader() == nil {
2 changes: 1 addition & 1 deletion server/schedulers/adjacent_region.go
@@ -253,7 +253,7 @@ func (l *balanceAdjacentRegionScheduler) process(cluster opt.Cluster) []*operato
}

func (l *balanceAdjacentRegionScheduler) unsafeToBalance(cluster opt.Cluster, region *core.RegionInfo) bool {
if len(region.GetPeers()) != cluster.GetMaxReplicas() {
if !opt.IsRegionReplicated(cluster, region) {
return true
}
storeID := region.GetLeader().GetStoreId()
4 changes: 2 additions & 2 deletions server/schedulers/balance_leader.go
@@ -177,7 +177,7 @@ func (l *balanceLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Opera
// the best follower peer and transfers the leader.
func (l *balanceLeaderScheduler) transferLeaderOut(cluster opt.Cluster, source *core.StoreInfo) []*operator.Operator {
sourceID := source.GetID()
region := cluster.RandLeaderRegion(sourceID, l.conf.Ranges, core.HealthRegion())
region := cluster.RandLeaderRegion(sourceID, l.conf.Ranges, opt.HealthRegion(cluster))
if region == nil {
log.Debug("store has no leader", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", sourceID))
schedulerCounter.WithLabelValues(l.GetName(), "no-leader-region").Inc()
@@ -204,7 +204,7 @@ func (l *balanceLeaderScheduler) transferLeaderIn(cluster opt.Cluster, target *
// the worst follower peer and transfers the leader.
func (l *balanceLeaderScheduler) transferLeaderIn(cluster opt.Cluster, target *core.StoreInfo) []*operator.Operator {
targetID := target.GetID()
region := cluster.RandFollowerRegion(targetID, l.conf.Ranges, core.HealthRegion())
region := cluster.RandFollowerRegion(targetID, l.conf.Ranges, opt.HealthRegion(cluster))
if region == nil {
log.Debug("store has no follower", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", targetID))
schedulerCounter.WithLabelValues(l.GetName(), "no-follower-region").Inc()
13 changes: 3 additions & 10 deletions server/schedulers/balance_region.go
@@ -135,28 +135,21 @@ func (s *balanceRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Opera
for i := 0; i < balanceRegionRetryLimit; i++ {
// First priority: pick a region that has a pending peer.
// A pending peer may mean the disk is overloaded, so remove such regions first.
region := cluster.RandPendingRegion(sourceID, s.conf.Ranges, core.HealthRegionAllowPending())
region := cluster.RandPendingRegion(sourceID, s.conf.Ranges, opt.HealthAllowPending(cluster), opt.ReplicatedRegion(cluster))
if region == nil {
// Then picks the region that has a follower in the source store.
region = cluster.RandFollowerRegion(sourceID, s.conf.Ranges, core.HealthRegion())
region = cluster.RandFollowerRegion(sourceID, s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster))
}
if region == nil {
// Last, picks the region has the leader in the source store.
region = cluster.RandLeaderRegion(sourceID, s.conf.Ranges, core.HealthRegion())
region = cluster.RandLeaderRegion(sourceID, s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster))
}
if region == nil {
schedulerCounter.WithLabelValues(s.GetName(), "no-region").Inc()
continue
}
log.Debug("select region", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", region.GetID()))

// We don't schedule region with abnormal number of replicas.
if len(region.GetPeers()) != cluster.GetMaxReplicas() {
log.Debug("region has abnormal replica count", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", region.GetID()))
schedulerCounter.WithLabelValues(s.GetName(), "abnormal-replica").Inc()
continue
}

// Skip hot regions.
if cluster.IsRegionHot(region) {
log.Debug("region is hot", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", region.GetID()))
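
Editor's note: to make the balance-region change above more concrete, the replica-count check that used to run after a region was picked is now expressed as an extra selection predicate passed to RandPendingRegion, RandFollowerRegion, and RandLeaderRegion. The hypothetical sketch below is an editor's illustration, not code from the PR; the allOf helper does not exist in PD, and it only mimics how the variadic predicates combine: a candidate is usable only if every predicate accepts it.

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/pingcap/pd/pkg/mock/mockcluster"
	"github.com/pingcap/pd/pkg/mock/mockoption"
	"github.com/pingcap/pd/server/core"
	"github.com/pingcap/pd/server/schedule/opt"
)

// allOf is a hypothetical helper mirroring how the Rand*Region methods treat
// their variadic options: a region is accepted only if every predicate passes.
func allOf(preds ...func(*core.RegionInfo) bool) func(*core.RegionInfo) bool {
	return func(r *core.RegionInfo) bool {
		for _, p := range preds {
			if !p(r) {
				return false
			}
		}
		return true
	}
}

func main() {
	scheduleOpt := mockoption.NewScheduleOptions()
	cluster := mockcluster.NewCluster(scheduleOpt)
	for id := uint64(1); id <= 3; id++ {
		cluster.AddRegionStore(id, 1)
	}

	peers := []*metapb.Peer{{Id: 1, StoreId: 1}, {Id: 2, StoreId: 2}, {Id: 3, StoreId: 3}}
	// A fully replicated region with one pending peer: the case balance-region
	// now picks first while still insisting on full replication.
	region := core.NewRegionInfo(&metapb.Region{Peers: peers}, peers[0],
		core.WithPendingPeers(peers[:1]))

	accept := allOf(opt.HealthAllowPending(cluster), opt.ReplicatedRegion(cluster))
	// Assuming the mock options default to 3 max replicas and placement rules
	// are disabled, this is expected to print true.
	fmt.Println(accept(region))
}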
2 changes: 1 addition & 1 deletion server/schedulers/evict_leader.go
@@ -114,7 +114,7 @@ func (s *evictLeaderScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {

func (s *evictLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Operator {
schedulerCounter.WithLabelValues(s.GetName(), "schedule").Inc()
region := cluster.RandLeaderRegion(s.conf.StoreID, s.conf.Ranges, core.HealthRegion())
region := cluster.RandLeaderRegion(s.conf.StoreID, s.conf.Ranges, opt.HealthRegion(cluster))
if region == nil {
schedulerCounter.WithLabelValues(s.GetName(), "no-leader").Inc()
return nil
2 changes: 1 addition & 1 deletion server/schedulers/grant_leader.go
@@ -106,7 +106,7 @@ func (s *grantLeaderScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {

func (s *grantLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Operator {
schedulerCounter.WithLabelValues(s.GetName(), "schedule").Inc()
region := cluster.RandFollowerRegion(s.conf.StoreID, s.conf.Ranges, core.HealthRegion())
region := cluster.RandFollowerRegion(s.conf.StoreID, s.conf.Ranges, opt.HealthRegion(cluster))
if region == nil {
schedulerCounter.WithLabelValues(s.GetName(), "no-follower").Inc()
return nil