Skip to content

Commit

Permalink
scheduler: scatter range receive npe when get store info of non-exist…
Browse files Browse the repository at this point in the history
… stores (#1439)
  • Loading branch information
nanne007 authored and rleungx committed Feb 27, 2019
1 parent c996046 commit 0690cea
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 2 deletions.
47 changes: 47 additions & 0 deletions server/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,53 @@ func (s *testCoordinatorSuite) TestShouldRun(c *C) {
c.Assert(co.cluster.prepareChecker.sum, Equals, 7)

}
func (s *testCoordinatorSuite) TestShouldRunWithNonLeaderRegions(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID())
defer hbStreams.Close()

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)

c.Assert(tc.addLeaderStore(1, 10), IsNil)
c.Assert(tc.addLeaderStore(2, 0), IsNil)
c.Assert(tc.addLeaderStore(3, 0), IsNil)
for i := 0; i < 10; i++ {
c.Assert(tc.LoadRegion(uint64(i+1), 1, 2, 3), IsNil)
}
c.Assert(co.shouldRun(), IsFalse)
c.Assert(tc.core.Regions.GetStoreRegionCount(1), Equals, 10)

tbl := []struct {
regionID uint64
shouldRun bool
}{
{1, false},
{2, false},
{3, false},
{4, false},
{5, false},
{6, false},
{7, false},
{8, true},
}

for _, t := range tbl {
r := tc.GetRegion(t.regionID)
nr := r.Clone(core.WithLeader(r.GetPeers()[0]))
c.Assert(tc.handleRegionHeartbeat(nr), IsNil)
c.Assert(co.shouldRun(), Equals, t.shouldRun)
}
nr := &metapb.Region{Id: 8, Peers: []*metapb.Peer{}}
newRegion := core.NewRegionInfo(nr, nil)
c.Assert(tc.handleRegionHeartbeat(newRegion), NotNil)
c.Assert(co.cluster.prepareChecker.sum, Equals, 8)

// Now, after server is prepared, there exist some regions with no leader.
c.Assert(tc.GetRegion(9).GetLeader().GetStoreId(), Equals, uint64(0))
c.Assert(tc.GetRegion(10).GetLeader().GetStoreId(), Equals, uint64(0))
}

func (s *testCoordinatorSuite) TestAddScheduler(c *C) {
cfg, opt, err := newTestScheduleConfig()
Expand Down
8 changes: 6 additions & 2 deletions server/schedule/range_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ func (r *RangeCluster) updateStoreInfo(s *core.StoreInfo) *core.StoreInfo {
// GetStore searches for a store by ID.
func (r *RangeCluster) GetStore(id uint64) *core.StoreInfo {
s := r.Cluster.GetStore(id)
r.updateStoreInfo(s)
if s != nil {
r.updateStoreInfo(s)
}
return s
}

Expand Down Expand Up @@ -154,6 +156,8 @@ func (r *RangeCluster) GetFollowerStores(region *core.RegionInfo) []*core.StoreI
// GetLeaderStore returns all stores that contains the region's leader peer.
func (r *RangeCluster) GetLeaderStore(region *core.RegionInfo) *core.StoreInfo {
s := r.Cluster.GetLeaderStore(region)
r.updateStoreInfo(s)
if s != nil {
r.updateStoreInfo(s)
}
return s
}
66 changes: 66 additions & 0 deletions server/schedulers/balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1306,3 +1306,69 @@ func (s *testScatterRangeLeaderSuite) TestBalance(c *C) {
c.Check(regionCount, LessEqual, 32)
}
}

func (s *testScatterRangeLeaderSuite) TestBalanceWhenRegionNotHeartbeat(c *C) {
opt := schedule.NewMockSchedulerOptions()
tc := schedule.NewMockCluster(opt)
// Add stores 1,2,3.
tc.AddRegionStore(1, 0)
tc.AddRegionStore(2, 0)
tc.AddRegionStore(3, 0)
var (
id uint64
regions []*metapb.Region
)
for i := 0; i < 10; i++ {
peers := []*metapb.Peer{
{Id: id + 1, StoreId: 1},
{Id: id + 2, StoreId: 2},
{Id: id + 3, StoreId: 3},
}
regions = append(regions, &metapb.Region{
Id: id + 4,
Peers: peers,
StartKey: []byte(fmt.Sprintf("s_%02d", i)),
EndKey: []byte(fmt.Sprintf("s_%02d", i+1)),
})
id += 4
}
// empty case
regions[9].EndKey = []byte("")

// To simulate server prepared,
// store 1 contains 8 leader region peers and leaders of 2 regions are unknown yet.
for _, meta := range regions {
var leader *metapb.Peer
if meta.Id < 8 {
leader = meta.Peers[0]
}
regionInfo := core.NewRegionInfo(
meta,
leader,
core.SetApproximateKeys(96),
core.SetApproximateSize(96),
)

tc.Regions.SetRegion(regionInfo)
}

for i := 1; i <= 3; i++ {
tc.UpdateStoreStatus(uint64(i))
}

oc := schedule.NewOperatorController(nil, nil)
hb := newScatterRangeScheduler(oc, []string{"s_00", "s_09", "t"})

limit := 0
for {
if limit > 100 {
break
}
ops := hb.Schedule(tc)
if ops == nil {
limit++
continue
}
tc.ApplyOperator(ops[0])
}
}

0 comments on commit 0690cea

Please sign in to comment.