prepare_check: remove redundant check (#7217) (#7818)
ref #7016

remove the redundant region collection in prepare_checker: readiness is now derived from the not-from-storage region counts tracked by core.RegionsInfo instead of the checker's own bookkeeping

Signed-off-by: husharp <jinhao.hu@pingcap.com>

Co-authored-by: husharp <jinhao.hu@pingcap.com>
ti-chi-bot and HuSharp authored Feb 10, 2024
1 parent ae9db49 commit b8feb2b
Showing 9 changed files with 40 additions and 48 deletions.
3 changes: 0 additions & 3 deletions pkg/cluster/cluster.go
@@ -59,7 +59,4 @@ func Collect(c Cluster, region *core.RegionInfo, stores []*core.StoreInfo, hasRe
if hasRegionStats {
c.GetRegionStats().Observe(region, stores)
}
if !isPrepared && isNew {
c.GetCoordinator().GetPrepareChecker().Collect(region)
}
}
18 changes: 15 additions & 3 deletions pkg/core/region.go
@@ -1340,11 +1340,23 @@ func (r *RegionsInfo) GetStoreWriteRate(storeID uint64) (bytesRate, keysRate flo
return
}

// GetClusterNotFromStorageRegionsCnt gets the total count of regions that not loaded from storage anymore
// GetClusterNotFromStorageRegionsCnt gets the `NotFromStorageRegionsCnt` count of regions that not loaded from storage anymore.
func (r *RegionsInfo) GetClusterNotFromStorageRegionsCnt() int {
r.st.RLock()
defer r.st.RUnlock()
return r.tree.notFromStorageRegionsCnt
return r.tree.notFromStorageRegionsCount()
}

// GetNotFromStorageRegionsCntByStore gets the `NotFromStorageRegionsCnt` count of a store's leader, follower and learner by storeID.
func (r *RegionsInfo) GetNotFromStorageRegionsCntByStore(storeID uint64) int {
r.st.RLock()
defer r.st.RUnlock()
return r.getNotFromStorageRegionsCntByStoreLocked(storeID)
}

// getNotFromStorageRegionsCntByStoreLocked gets the `NotFromStorageRegionsCnt` count of a store's leader, follower and learner by storeID.
func (r *RegionsInfo) getNotFromStorageRegionsCntByStoreLocked(storeID uint64) int {
return r.leaders[storeID].notFromStorageRegionsCount() + r.followers[storeID].notFromStorageRegionsCount() + r.learners[storeID].notFromStorageRegionsCount()
}

// GetMetaRegions gets a set of metapb.Region from regionMap
@@ -1380,7 +1392,7 @@ func (r *RegionsInfo) GetStoreRegionCount(storeID uint64) int {
return r.getStoreRegionCountLocked(storeID)
}

// GetStoreRegionCount gets the total count of a store's leader, follower and learner RegionInfo by storeID
// getStoreRegionCountLocked gets the total count of a store's leader, follower and learner RegionInfo by storeID
func (r *RegionsInfo) getStoreRegionCountLocked(storeID uint64) int {
return r.leaders[storeID].length() + r.followers[storeID].length() + r.learners[storeID].length()
}
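Both new accessors follow the convention already used by GetStoreRegionCount/getStoreRegionCountLocked: the exported method takes the read lock once, then delegates to a *Locked helper that assumes the caller already holds it, so code paths that hold the lock for a larger critical section can reuse the helper without re-locking. A minimal, self-contained sketch of that convention (illustrative types only, not pd's RegionsInfo):

package main

import (
	"fmt"
	"sync"
)

type info struct {
	mu  sync.RWMutex
	cnt map[uint64]int
}

// Exported accessor: acquires the read lock, then delegates.
func (i *info) CountByStore(storeID uint64) int {
	i.mu.RLock()
	defer i.mu.RUnlock()
	return i.countByStoreLocked(storeID)
}

// Locked helper: assumes i.mu is already held by the caller.
func (i *info) countByStoreLocked(storeID uint64) int {
	return i.cnt[storeID]
}

func main() {
	i := &info{cnt: map[uint64]int{1: 8}}
	fmt.Println(i.CountByStore(1)) // 8
}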
7 changes: 7 additions & 0 deletions pkg/core/region_tree.go
@@ -82,6 +82,13 @@ func (t *regionTree) length() int {
return t.tree.Len()
}

func (t *regionTree) notFromStorageRegionsCount() int {
if t == nil {
return 0
}
return t.notFromStorageRegionsCnt
}

// GetOverlaps returns the range items that has some intersections with the given items.
func (t *regionTree) overlaps(item *regionItem) []*regionItem {
// note that Find() gets the last item that is less or equal than the item.
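notFromStorageRegionsCount is written as a nil-safe method because the new per-store accessor in region.go indexes three maps (leaders, followers, learners), and a store may simply have no tree for one of those roles. A missing map entry yields a nil pointer, and in Go a pointer-receiver method can run on a nil receiver, so the lookup degrades to 0 instead of panicking. A self-contained sketch of that behavior (illustrative types, not pd's regionTree):

package main

import "fmt"

type tree struct {
	notFromStorageRegionsCnt int
}

// Nil-safe: calling count() on a nil *tree returns 0.
func (t *tree) count() int {
	if t == nil {
		return 0
	}
	return t.notFromStorageRegionsCnt
}

func main() {
	leaders := map[uint64]*tree{1: {notFromStorageRegionsCnt: 3}}
	learners := map[uint64]*tree{} // store 1 has no learner tree

	// The missing learner entry is nil, and the nil-safe method turns that into 0.
	fmt.Println(leaders[1].count() + learners[1].count()) // 3
}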
35 changes: 6 additions & 29 deletions pkg/schedule/prepare_checker.go
@@ -25,16 +25,13 @@ import (

type prepareChecker struct {
syncutil.RWMutex
reactiveRegions map[uint64]int
start time.Time
sum int
prepared bool
start time.Time
prepared bool
}

func newPrepareChecker() *prepareChecker {
return &prepareChecker{
start: time.Now(),
reactiveRegions: make(map[uint64]int),
start: time.Now(),
}
}

@@ -51,13 +48,8 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool {
}
notLoadedFromRegionsCnt := c.GetClusterNotFromStorageRegionsCnt()
totalRegionsCnt := c.GetTotalRegionCount()
if float64(notLoadedFromRegionsCnt) > float64(totalRegionsCnt)*collectFactor {
log.Info("meta not loaded from region number is satisfied, finish prepare checker", zap.Int("not-from-storage-region", notLoadedFromRegionsCnt), zap.Int("total-region", totalRegionsCnt))
checker.prepared = true
return true
}
// The number of active regions should be more than total region of all stores * collectFactor
if float64(totalRegionsCnt)*collectFactor > float64(checker.sum) {
if float64(totalRegionsCnt)*collectFactor > float64(notLoadedFromRegionsCnt) {
return false
}
for _, store := range c.GetStores() {
@@ -66,23 +58,15 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool {
}
storeID := store.GetID()
// For each store, the number of active regions should be more than total region of the store * collectFactor
if float64(c.GetStoreRegionCount(storeID))*collectFactor > float64(checker.reactiveRegions[storeID]) {
if float64(c.GetStoreRegionCount(storeID))*collectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID)) {
return false
}
}
log.Info("not loaded from storage region number is satisfied, finish prepare checker", zap.Int("not-from-storage-region", notLoadedFromRegionsCnt), zap.Int("total-region", totalRegionsCnt))
checker.prepared = true
return true
}

func (checker *prepareChecker) Collect(region *core.RegionInfo) {
checker.Lock()
defer checker.Unlock()
for _, p := range region.GetPeers() {
checker.reactiveRegions[p.GetStoreId()]++
}
checker.sum++
}

func (checker *prepareChecker) IsPrepared() bool {
checker.RLock()
defer checker.RUnlock()
@@ -95,10 +79,3 @@ func (checker *prepareChecker) SetPrepared() {
defer checker.Unlock()
checker.prepared = true
}

// for test purpose
func (checker *prepareChecker) GetSum() int {
checker.RLock()
defer checker.RUnlock()
return checker.sum
}
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
@@ -1142,7 +1142,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
// To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one,
// check its validation again here.
//
// However it can't solve the race condition of concurrent heartbeats from the same region.
// However, it can't solve the race condition of concurrent heartbeats from the same region.
if overlaps, err = c.core.AtomicCheckAndPutRegion(region); err != nil {
return err
}
18 changes: 8 additions & 10 deletions server/cluster/cluster_test.go
@@ -2383,7 +2383,7 @@ func (c *testCluster) LoadRegion(regionID uint64, followerStoreIDs ...uint64) er
peer, _ := c.AllocPeer(id)
region.Peers = append(region.Peers, peer)
}
return c.putRegion(core.NewRegionInfo(region, nil))
return c.putRegion(core.NewRegionInfo(region, nil, core.SetSource(core.Storage)))
}

func TestBasic(t *testing.T) {
@@ -2468,7 +2468,7 @@ func TestDispatch(t *testing.T) {

func dispatchHeartbeat(co *schedule.Coordinator, region *core.RegionInfo, stream hbstream.HeartbeatStream) error {
co.GetHeartbeatStreams().BindStream(region.GetLeader().GetStoreId(), stream)
if err := co.GetCluster().(*RaftCluster).putRegion(region.Clone()); err != nil {
if err := co.GetCluster().(*RaftCluster).putRegion(region.Clone(core.SetSource(core.Heartbeat))); err != nil {
return err
}
co.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat, nil)
@@ -2942,14 +2942,14 @@ func TestShouldRun(t *testing.T) {

for _, testCase := range testCases {
r := tc.GetRegion(testCase.regionID)
nr := r.Clone(core.WithLeader(r.GetPeers()[0]))
nr := r.Clone(core.WithLeader(r.GetPeers()[0]), core.SetSource(core.Heartbeat))
re.NoError(tc.processRegionHeartbeat(nr))
re.Equal(testCase.ShouldRun, co.ShouldRun())
}
nr := &metapb.Region{Id: 6, Peers: []*metapb.Peer{}}
newRegion := core.NewRegionInfo(nr, nil)
newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat))
re.Error(tc.processRegionHeartbeat(newRegion))
re.Equal(7, co.GetPrepareChecker().GetSum())
re.Equal(7, tc.core.GetClusterNotFromStorageRegionsCnt())
}

func TestShouldRunWithNonLeaderRegions(t *testing.T) {
Expand Down Expand Up @@ -2985,14 +2985,14 @@ func TestShouldRunWithNonLeaderRegions(t *testing.T) {

for _, testCase := range testCases {
r := tc.GetRegion(testCase.regionID)
nr := r.Clone(core.WithLeader(r.GetPeers()[0]))
nr := r.Clone(core.WithLeader(r.GetPeers()[0]), core.SetSource(core.Heartbeat))
re.NoError(tc.processRegionHeartbeat(nr))
re.Equal(testCase.ShouldRun, co.ShouldRun())
}
nr := &metapb.Region{Id: 9, Peers: []*metapb.Peer{}}
newRegion := core.NewRegionInfo(nr, nil)
newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat))
re.Error(tc.processRegionHeartbeat(newRegion))
re.Equal(9, co.GetPrepareChecker().GetSum())
re.Equal(9, tc.core.GetClusterNotFromStorageRegionsCnt())

// Now, after server is prepared, there exist some regions with no leader.
re.Equal(uint64(0), tc.GetRegion(10).GetLeader().GetStoreId())
@@ -3262,7 +3262,6 @@ func TestRestart(t *testing.T) {
re.NoError(tc.addRegionStore(3, 3))
re.NoError(tc.addLeaderRegion(1, 1))
region := tc.GetRegion(1)
co.GetPrepareChecker().Collect(region)

// Add 1 replica on store 2.
stream := mockhbstream.NewHeartbeatStream()
@@ -3276,7 +3275,6 @@

// Recreate coordinator then add another replica on store 3.
co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams)
co.GetPrepareChecker().Collect(region)
co.Run()
re.NoError(dispatchHeartbeat(co, region, stream))
region = waitAddLearner(re, stream, region, 3)
2 changes: 1 addition & 1 deletion tests/pdctl/scheduler/scheduler_test.go
@@ -505,7 +505,7 @@ func (suite *schedulerTestSuite) checkSchedulerDiagnostic(cluster *tests.TestClu
tests.MustPutStore(re, cluster, store)
}

// note: because pdqsort is a unstable sort algorithm, set ApproximateSize for this region.
// note: because pdqsort is an unstable sort algorithm, set ApproximateSize for this region.
tests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetApproximateSize(10))
time.Sleep(3 * time.Second)

2 changes: 1 addition & 1 deletion tests/server/cluster/cluster_test.go
@@ -1402,7 +1402,7 @@ func putRegionWithLeader(re *require.Assertions, rc *cluster.RaftCluster, id id.
StartKey: []byte{byte(i)},
EndKey: []byte{byte(i + 1)},
}
rc.HandleRegionHeartbeat(core.NewRegionInfo(region, region.Peers[0]))
rc.HandleRegionHeartbeat(core.NewRegionInfo(region, region.Peers[0], core.SetSource(core.Heartbeat)))
}

time.Sleep(50 * time.Millisecond)
1 change: 1 addition & 0 deletions tests/testutil.go
@@ -197,6 +197,7 @@ func MustPutRegion(re *require.Assertions, cluster *TestCluster, regionID, store
Peers: []*metapb.Peer{leader},
RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1},
}
opts = append(opts, core.SetSource(core.Heartbeat))
r := core.NewRegionInfo(metaRegion, leader, opts...)
MustPutRegionInfo(re, cluster, r)
return r
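Because prepareChecker no longer collects regions itself, the test changes above tag regions put through heartbeats with core.SetSource(core.Heartbeat) so they count toward the not-from-storage counters in RegionsInfo (which the rewritten check and the updated GetClusterNotFromStorageRegionsCnt assertions rely on), while LoadRegion keeps core.SetSource(core.Storage) so storage-loaded regions stay excluded. A simplified model of that effect on the counters — not pd's actual source handling, just an illustration of the bookkeeping:

package main

import "fmt"

type source int

const (
	storage source = iota
	heartbeat
)

type counters struct {
	total          int
	notFromStorage int
}

// put models how a region's source affects the readiness counters:
// only regions not loaded from storage move the not-from-storage count.
func (c *counters) put(src source) {
	c.total++
	if src != storage {
		c.notFromStorage++
	}
}

func main() {
	var c counters
	c.put(storage)   // LoadRegion-style: core.SetSource(core.Storage)
	c.put(heartbeat) // MustPutRegion-style: core.SetSource(core.Heartbeat)
	c.put(heartbeat)
	fmt.Printf("total=%d notFromStorage=%d\n", c.total, c.notFromStorage) // total=3 notFromStorage=2
}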
