diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go
index 2ad7002f8ff..f6163f72f61 100644
--- a/server/schedulers/hot_region.go
+++ b/server/schedulers/hot_region.go
@@ -288,7 +288,19 @@ func (h *balanceHotRegionsScheduler) balanceByPeer(cluster schedule.Cluster, sto
 	for _, i := range h.r.Perm(storesStat[srcStoreID].RegionsStat.Len()) {
 		rs := storesStat[srcStoreID].RegionsStat[i]
 		srcRegion := cluster.GetRegion(rs.RegionID)
-		if srcRegion == nil || len(srcRegion.GetDownPeers()) != 0 || len(srcRegion.GetPendingPeers()) != 0 {
+		if srcRegion == nil {
+			schedulerCounter.WithLabelValues(h.GetName(), "no_region").Inc()
+			continue
+		}
+
+		if isRegionUnhealthy(srcRegion) {
+			schedulerCounter.WithLabelValues(h.GetName(), "unhealthy_replica").Inc()
+			continue
+		}
+
+		if len(srcRegion.GetPeers()) != cluster.GetMaxReplicas() {
+			log.Debug("region has abnormal replica count", zap.String("scheduler", h.GetName()), zap.Uint64("region-id", srcRegion.GetID()))
+			schedulerCounter.WithLabelValues(h.GetName(), "abnormal_replica").Inc()
 			continue
 		}
 
@@ -345,7 +357,13 @@ func (h *balanceHotRegionsScheduler) balanceByLeader(cluster schedule.Cluster, s
 	for _, i := range h.r.Perm(storesStat[srcStoreID].RegionsStat.Len()) {
 		rs := storesStat[srcStoreID].RegionsStat[i]
 		srcRegion := cluster.GetRegion(rs.RegionID)
-		if srcRegion == nil || len(srcRegion.GetDownPeers()) != 0 || len(srcRegion.GetPendingPeers()) != 0 {
+		if srcRegion == nil {
+			schedulerCounter.WithLabelValues(h.GetName(), "no_region").Inc()
+			continue
+		}
+
+		if isRegionUnhealthy(srcRegion) {
+			schedulerCounter.WithLabelValues(h.GetName(), "unhealthy_replica").Inc()
 			continue
 		}
 
diff --git a/server/schedulers/scheduler_test.go b/server/schedulers/scheduler_test.go
index 96748748ec8..951e167a2a8 100644
--- a/server/schedulers/scheduler_test.go
+++ b/server/schedulers/scheduler_test.go
@@ -361,6 +361,34 @@ func (s *testShuffleHotRegionSchedulerSuite) TestBalance(c *C) {
 	c.Assert(op[0].Step(1).(schedule.PromoteLearner).ToStore, Not(Equals), 6)
 }
 
+var _ = Suite(&testHotRegionSchedulerSuite{})
+
+type testHotRegionSchedulerSuite struct{}
+
+func (s *testHotRegionSchedulerSuite) TestAbnormalReplica(c *C) {
+	opt := mockoption.NewScheduleOptions()
+	opt.LeaderScheduleLimit = 0
+	tc := mockcluster.NewCluster(opt)
+	hb, err := schedule.CreateScheduler("hot-read-region", schedule.NewOperatorController(nil, nil))
+	c.Assert(err, IsNil)
+
+	tc.AddRegionStore(1, 3)
+	tc.AddRegionStore(2, 2)
+	tc.AddRegionStore(3, 2)
+
+	// Report store read bytes.
+	tc.UpdateStorageReadBytes(1, 75*1024*1024)
+	tc.UpdateStorageReadBytes(2, 45*1024*1024)
+	tc.UpdateStorageReadBytes(3, 45*1024*1024)
+
+	tc.AddLeaderRegionWithReadInfo(1, 1, 512*1024*statistics.RegionHeartBeatReportInterval, 2)
+	tc.AddLeaderRegionWithReadInfo(2, 2, 512*1024*statistics.RegionHeartBeatReportInterval, 1, 3)
+	tc.AddLeaderRegionWithReadInfo(3, 1, 512*1024*statistics.RegionHeartBeatReportInterval, 2, 3)
+	opt.HotRegionCacheHitsThreshold = 0
+	c.Assert(tc.IsRegionHot(1), IsTrue)
+	c.Assert(hb.Schedule(tc), IsNil)
+}
+
 var _ = Suite(&testEvictLeaderSuite{})
 
 type testEvictLeaderSuite struct{}
diff --git a/server/schedulers/utils.go b/server/schedulers/utils.go
index 31dcf030e0e..37ae6ce7c6e 100644
--- a/server/schedulers/utils.go
+++ b/server/schedulers/utils.go
@@ -49,6 +49,10 @@ func minDuration(a, b time.Duration) time.Duration {
 	return b
 }
 
+func isRegionUnhealthy(region *core.RegionInfo) bool {
+	return len(region.GetDownPeers()) != 0 || len(region.GetLearners()) != 0
+}
+
 func shouldBalance(cluster schedule.Cluster, source, target *core.StoreInfo, region *core.RegionInfo, kind core.ResourceKind, opInfluence schedule.OpInfluence) bool {
 	// The reason we use max(regionSize, averageRegionSize) to check is:
 	// 1. prevent moving small regions between stores with close scores, leading to unnecessary balance.
diff --git a/server/schedulers/utils_test.go b/server/schedulers/utils_test.go
index b82f2da344d..e24d69b30cb 100644
--- a/server/schedulers/utils_test.go
+++ b/server/schedulers/utils_test.go
@@ -18,6 +18,9 @@ import (
 	"time"
 
 	. "github.com/pingcap/check"
+	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/pingcap/kvproto/pkg/pdpb"
+	"github.com/pingcap/pd/server/core"
 )
 
 func TestSchedulers(t *testing.T) {
@@ -45,3 +48,32 @@ func (s *testMinMaxSuite) TestMinDuration(c *C) {
 	c.Assert(minDuration(time.Second, time.Minute), Equals, time.Second)
 	c.Assert(minDuration(time.Second, time.Second), Equals, time.Second)
 }
+
+var _ = Suite(&testRegionUnhealthySuite{})
+
+type testRegionUnhealthySuite struct{}
+
+func (s *testRegionUnhealthySuite) TestIsRegionUnhealthy(c *C) {
+	peers := make([]*metapb.Peer, 0, 3)
+	for i := uint64(0); i < 2; i++ {
+		p := &metapb.Peer{
+			Id:      i,
+			StoreId: i,
+		}
+		peers = append(peers, p)
+	}
+	peers = append(peers, &metapb.Peer{
+		Id:        2,
+		StoreId:   2,
+		IsLearner: true,
+	})
+
+	r1 := core.NewRegionInfo(&metapb.Region{Peers: peers[:2]}, peers[0], core.WithDownPeers([]*pdpb.PeerStats{{Peer: peers[1]}}))
+	r2 := core.NewRegionInfo(&metapb.Region{Peers: peers[:2]}, peers[0], core.WithPendingPeers([]*metapb.Peer{peers[1]}))
+	r3 := core.NewRegionInfo(&metapb.Region{Peers: peers[:3]}, peers[0], core.WithLearners([]*metapb.Peer{peers[2]}))
+	r4 := core.NewRegionInfo(&metapb.Region{Peers: peers[:2]}, peers[0])
+	c.Assert(isRegionUnhealthy(r1), IsTrue)
+	c.Assert(isRegionUnhealthy(r2), IsFalse)
+	c.Assert(isRegionUnhealthy(r3), IsTrue)
+	c.Assert(isRegionUnhealthy(r4), IsFalse)
+}
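Note on the change: the two hot_region.go hunks replace the old single-line guard (skip when the region is missing, has down peers, or has pending peers) with separate checks whose rejections are recorded under the scheduler counters "no_region", "unhealthy_replica", and, in balanceByPeer only, "abnormal_replica". The new isRegionUnhealthy helper also shifts the semantics: pending peers no longer disqualify a region, while learner peers now do, which is what TestIsRegionUnhealthy asserts. The sketch below restates that filter as a standalone helper for reference; checkSourceRegion and its maxReplicas parameter are hypothetical names used only for illustration, not part of the PD code, while isRegionUnhealthy, GetPeers, and the core import come from the patch above.

// Illustrative sketch only: a standalone restatement of the source-region
// filter added in balanceByPeer above. checkSourceRegion and maxReplicas are
// hypothetical; balanceByLeader applies just the first two checks.
package schedulers

import "github.com/pingcap/pd/server/core"

func checkSourceRegion(region *core.RegionInfo, maxReplicas int) (ok bool, reason string) {
	if region == nil {
		// The hot-peer statistics point at a region the cluster no longer knows about.
		return false, "no_region"
	}
	if isRegionUnhealthy(region) {
		// Down peers or learners are present; pending peers alone no longer skip the region.
		return false, "unhealthy_replica"
	}
	if len(region.GetPeers()) != maxReplicas {
		// The region's replica count differs from the configured max-replicas.
		return false, "abnormal_replica"
	}
	return true, ""
}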