*: fix region stats check (#7748) #7812

Merged
2 changes: 1 addition & 1 deletion pkg/cluster/cluster.go
@@ -55,7 +55,7 @@ func HandleOverlaps(c Cluster, overlaps []*core.RegionInfo) {
}

// Collect collects the cluster information.
-func Collect(c Cluster, region *core.RegionInfo, stores []*core.StoreInfo, hasRegionStats, isNew, isPrepared bool) {
+func Collect(c Cluster, region *core.RegionInfo, stores []*core.StoreInfo, hasRegionStats bool) {
if hasRegionStats {
c.GetRegionStats().Observe(region, stores)
}
20 changes: 10 additions & 10 deletions pkg/core/region.go
@@ -98,6 +98,12 @@ func (r *RegionInfo) LoadedFromStorage() bool {
return r.source == Storage
}

+// LoadedFromSync means this region's meta info loaded from region syncer.
+// Only used for test.
+func (r *RegionInfo) LoadedFromSync() bool {
+return r.source == Sync
+}
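For context, the source field this new helper inspects records where a cached region's meta came from. A toy mirror of the idea, not PD's actual declarations (the real constants live in pkg/core; names here are illustrative):

package main

import "fmt"

// Toy mirror of core.RegionInfo's source tracking, assuming the three
// origins this PR works with: local storage (possibly stale), the region
// syncer, and a live TiKV heartbeat.
type regionSource int

const (
	fromStorage regionSource = iota // loaded from local storage at start-up
	fromSync                        // pushed by the leader's region syncer
	fromHeartbeat                   // reported directly by a heartbeat
)

type regionInfo struct{ source regionSource }

func (r *regionInfo) loadedFromSync() bool { return r.source == fromSync }

func main() {
	r := &regionInfo{source: fromSync}
	fmt.Println(r.loadedFromSync()) // true
}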

// NewRegionInfo creates RegionInfo with region's meta and leader peer.
func NewRegionInfo(region *metapb.Region, leader *metapb.Peer, opts ...RegionCreateOption) *RegionInfo {
regionInfo := &RegionInfo{
@@ -705,7 +711,7 @@ func (r *RegionInfo) isRegionRecreated() bool {

// RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
// and new region information.
-type RegionGuideFunc func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool)
+type RegionGuideFunc func(region, origin *RegionInfo) (saveKV, saveCache, needSync bool)

// GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function.
// nil means do not print the log.
@@ -718,19 +724,15 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
}
// Save to storage if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
-// Mark isNew if the region in cache does not have leader.
-return func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool) {
+return func(region, origin *RegionInfo) (saveKV, saveCache, needSync bool) {
if origin == nil {
if log.GetLevel() <= zap.DebugLevel {
debug("insert new region",
zap.Uint64("region-id", region.GetID()),
logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta())))
}
-saveKV, saveCache, isNew = true, true, true
+saveKV, saveCache = true, true
} else {
-if origin.LoadedFromStorage() {
-isNew = true
-}
r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()
if r.GetVersion() > o.GetVersion() {
@@ -756,9 +758,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
saveKV, saveCache = true, true
}
if region.GetLeader().GetId() != origin.GetLeader().GetId() {
-if origin.GetLeader().GetId() == 0 {
-isNew = true
-} else if log.GetLevel() <= zap.InfoLevel {
+if origin.GetLeader().GetId() != 0 && log.GetLevel() <= zap.InfoLevel {
info("leader changed",
zap.Uint64("region-id", region.GetID()),
zap.Uint64("from", origin.GetLeader().GetStoreId()),
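The three remaining flags drive the follow-up work done by callers. A minimal, self-contained sketch of how a caller acts on them (logic condensed and names illustrative, not the exact PD code):

package main

import "fmt"

// guide mimics the slimmed-down RegionGuideFunc contract: report what to do
// with a heartbeat, without the removed isNew flag.
func guide(metaChanged, leaderChanged bool) (saveKV, saveCache, needSync bool) {
	if metaChanged { // epoch/peer changes: persist, refresh cache, sync followers
		return true, true, true
	}
	if leaderChanged { // leader-only changes: refresh cache and sync, skip storage
		return false, true, true
	}
	return false, false, false
}

func main() {
	saveKV, saveCache, needSync := guide(false, true)
	fmt.Println(saveKV, saveCache, needSync) // false true true
}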
2 changes: 1 addition & 1 deletion pkg/core/region_test.go
@@ -363,7 +363,7 @@ func TestNeedSync(t *testing.T) {
for _, testCase := range testCases {
regionA := region.Clone(testCase.optionsA...)
regionB := region.Clone(testCase.optionsB...)
-_, _, _, needSync := RegionGuide(regionA, regionB)
+_, _, needSync := RegionGuide(regionA, regionB)
re.Equal(testCase.needSync, needSync)
}
}
7 changes: 3 additions & 4 deletions pkg/mcs/scheduling/server/cluster.go
@@ -537,9 +537,8 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
hasRegionStats := c.regionStats != nil
// Save to storage if meta is updated, except for flashback.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
-// Mark isNew if the region in cache does not have leader.
-isNew, _, saveCache, _ := core.GenerateRegionGuideFunc(true)(region, origin)
-if !saveCache && !isNew {
+_, saveCache, _ := core.GenerateRegionGuideFunc(true)(region, origin)
+if !saveCache {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
@@ -561,7 +560,7 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
cluster.HandleOverlaps(c, overlaps)
}

-cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared())
+cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats)
return nil
}

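Condensed, the heartbeat fast path above works as sketched below. This is a hedged illustration of the control flow, not PD's exact code: the isNew check can go away because a region the statistics have never observed now surfaces through RegionStatsNeedUpdate instead.

package main

import "fmt"

// handle sketches the fast path after this PR: when the cache needs no
// update, statistics may still need a refresh, either because a config
// changed or because this region was never observed before.
func handle(saveCache, statsNeedUpdate bool) string {
	if !saveCache {
		if statsNeedUpdate {
			return "observe statistics only"
		}
		return "skip"
	}
	return "update cache, then observe statistics"
}

func main() {
	fmt.Println(handle(false, true))  // first heartbeat: observe statistics only
	fmt.Println(handle(false, false)) // nothing changed: skip
}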
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/grpc_service.go
@@ -161,7 +161,7 @@ func (s *Service) RegionHeartbeat(stream schedulingpb.Scheduling_RegionHeartbeat
s.hbStreams.BindStream(storeID, server)
lastBind = time.Now()
}
-region := core.RegionFromHeartbeat(request, core.SetSource(core.Heartbeat))
+region := core.RegionFromHeartbeat(request)
err = c.HandleRegionHeartbeat(region)
if err != nil {
// TODO: if we need to send the error back to API server.
25 changes: 20 additions & 5 deletions pkg/statistics/region_collection.go
@@ -33,6 +33,8 @@ type RegionInfoProvider interface {
// RegionStatisticType represents the type of the region's status.
type RegionStatisticType uint32

+const emptyStatistic = RegionStatisticType(0)

// region status type
const (
MissPeer RegionStatisticType = 1 << iota
@@ -148,6 +150,9 @@ func (r *RegionStatistics) deleteEntry(deleteIndex RegionStatisticType, regionID
// due to some special state types.
func (r *RegionStatistics) RegionStatsNeedUpdate(region *core.RegionInfo) bool {
regionID := region.GetID()
+if !r.isObserved(regionID) {
+return true
+}
if r.IsRegionStatsType(regionID, OversizedRegion) !=
region.IsOversized(int64(r.conf.GetRegionMaxSize()), int64(r.conf.GetRegionMaxKeys())) {
return true
@@ -156,6 +161,14 @@ func (r *RegionStatistics) RegionStatsNeedUpdate(region *core.RegionInfo) bool {
region.NeedMerge(int64(r.conf.GetMaxMergeRegionSize()), int64(r.conf.GetMaxMergeRegionKeys()))
}

+// isObserved returns whether the region is observed. And it also shows whether PD received heartbeat of this region.
+func (r *RegionStatistics) isObserved(id uint64) bool {
+r.RLock()
+defer r.RUnlock()
+_, ok := r.index[id]
+return ok
+}
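The check leans on map presence as an "already observed" bit. A self-contained sketch of the same idiom (type and field names are illustrative, not PD's):

package main

import (
	"fmt"
	"sync"
)

// stats is a stripped-down stand-in for RegionStatistics: presence in the
// index map doubles as "at least one heartbeat for this region was observed".
type stats struct {
	mu    sync.RWMutex
	index map[uint64]uint32
}

func (s *stats) isObserved(id uint64) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	_, ok := s.index[id]
	return ok
}

func (s *stats) needUpdate(id uint64) bool {
	return !s.isObserved(id) // an unseen region always needs a stats refresh
}

func main() {
	s := &stats{index: map[uint64]uint32{}}
	fmt.Println(s.needUpdate(10)) // true: never observed
	s.index[10] = 0               // Observe records it even with no status bits set
	fmt.Println(s.needUpdate(10)) // false
}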

// Observe records the current regions' status.
func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.StoreInfo) {
r.Lock()
@@ -164,7 +177,6 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store
desiredReplicas = r.conf.GetMaxReplicas()
desiredVoters = desiredReplicas
peerTypeIndex RegionStatisticType
-deleteIndex RegionStatisticType
)
// Check if the region meets count requirements of its rules.
if r.conf.IsPlacementRulesEnabled() {
@@ -240,10 +252,10 @@
}
}
// Remove the info if any of the conditions are not met any more.
-if oldIndex, ok := r.index[regionID]; ok {
-deleteIndex = oldIndex &^ peerTypeIndex
+if oldIndex, ok := r.index[regionID]; ok && oldIndex > emptyStatistic {
+deleteIndex := oldIndex &^ peerTypeIndex
+r.deleteEntry(deleteIndex, regionID)
}
-r.deleteEntry(deleteIndex, regionID)
r.index[regionID] = peerTypeIndex
}

@@ -252,7 +264,10 @@ func (r *RegionStatistics) ClearDefunctRegion(regionID uint64) {
r.Lock()
defer r.Unlock()
if oldIndex, ok := r.index[regionID]; ok {
-r.deleteEntry(oldIndex, regionID)
+delete(r.index, regionID)
+if oldIndex > emptyStatistic {
+r.deleteEntry(oldIndex, regionID)
+}
}
}
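Both cleanup paths use Go's bit-clear operator (&^) to find status flags that were set last time but no longer hold. A standalone illustration with the same kind of 1 << iota status bits as this file (lower-cased names to mark it as a toy):

package main

import "fmt"

type regionStatisticType uint32

// Illustrative status bits, mirroring the flags declared in region_collection.go.
const (
	missPeer regionStatisticType = 1 << iota
	extraPeer
	downPeer
	pendingPeer
)

func main() {
	old := missPeer | downPeer  // recorded for the region last time
	cur := downPeer             // still true on this heartbeat
	stale := old &^ cur         // bits set in old but not in cur
	fmt.Printf("%04b\n", stale) // 0001: only missPeer must be deleted
}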

Expand Down
2 changes: 1 addition & 1 deletion pkg/syncer/client.go
@@ -194,7 +194,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err))
continue
}
-_, saveKV, _, _ := regionGuide(region, origin)
+saveKV, _, _ := regionGuide(region, origin)
overlaps := bc.PutRegion(region)

if hasBuckets {
2 changes: 2 additions & 0 deletions server/api/region_test.go
@@ -214,6 +214,8 @@ func (suite *regionTestSuite) TestRegionCheck() {
histKeys := []*histItem{{Start: 1000, End: 1999, Count: 1}}
suite.Equal(histKeys, r7)

+// ref https://github.com/tikv/pd/issues/3558, we should change size to pass `NeedUpdate` for observing.
+r = r.Clone(core.SetApproximateKeys(0))
mustPutStore(re, suite.svr, 2, metapb.StoreState_Offline, metapb.NodeState_Removing, []*metapb.StoreLabel{})
mustRegionHeartbeat(re, suite.svr, r)
url = fmt.Sprintf("%s/regions/check/%s", suite.urlPrefix, "offline-peer")
6 changes: 3 additions & 3 deletions server/cluster/cluster.go
@@ -1120,8 +1120,8 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
// Save to storage if meta is updated, except for flashback.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
-// Mark isNew if the region in cache does not have leader.
-isNew, saveKV, saveCache, needSync := regionGuide(region, origin)
-if !c.isAPIServiceMode && !saveKV && !saveCache && !isNew {
+saveKV, saveCache, needSync := regionGuide(region, origin)
+if !c.isAPIServiceMode && !saveKV && !saveCache {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
@@ -1153,7 +1153,7 @@
}

if !c.isAPIServiceMode {
-cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared())
+cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats)
}

if c.storage != nil {
94 changes: 94 additions & 0 deletions tests/server/cluster/cluster_test.go
@@ -38,6 +38,7 @@ import (
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/syncer"
"github.com/tikv/pd/pkg/tso"
@@ -181,6 +182,99 @@ func TestDamagedRegion(t *testing.T) {
re.Equal(uint64(1), rc.GetOperatorController().OperatorCount(operator.OpAdmin))
}

+func TestRegionStatistics(t *testing.T) {
+re := require.New(t)
+ctx, cancel := context.WithCancel(context.Background())
+defer cancel()
+tc, err := tests.NewTestCluster(ctx, 2)
+defer tc.Destroy()
+re.NoError(err)
+
+err = tc.RunInitialServers()
+re.NoError(err)
+
+leaderName := tc.WaitLeader()
+leaderServer := tc.GetLeaderServer()
+grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr())
+clusterID := leaderServer.GetClusterID()
+bootstrapCluster(re, clusterID, grpcPDClient)
+rc := leaderServer.GetRaftCluster()
+
+region := &metapb.Region{
+Id: 10,
+StartKey: []byte("abc"),
+EndKey: []byte("xyz"),
+Peers: []*metapb.Peer{
+{Id: 101, StoreId: 1},
+{Id: 102, StoreId: 2},
+{Id: 103, StoreId: 3},
+{Id: 104, StoreId: 4, Role: metapb.PeerRole_Learner},
+},
+}
+
+// To put region.
+regionInfo := core.NewRegionInfo(region, region.Peers[0], core.SetApproximateSize(0))
+err = tc.HandleRegionHeartbeat(regionInfo)
+re.NoError(err)
+regions := rc.GetRegionStatsByType(statistics.LearnerPeer)
+re.Len(regions, 1)
+
+// wait for sync region
+time.Sleep(1000 * time.Millisecond)
+
+leaderServer.ResignLeader()
+newLeaderName := tc.WaitLeader()
+re.NotEqual(newLeaderName, leaderName)
+leaderServer = tc.GetLeaderServer()
+rc = leaderServer.GetRaftCluster()
+r := rc.GetRegion(region.Id)
+re.NotNil(r)
+re.True(r.LoadedFromSync())
+regions = rc.GetRegionStatsByType(statistics.LearnerPeer)
+re.Empty(regions)
+err = tc.HandleRegionHeartbeat(regionInfo)
+re.NoError(err)
+regions = rc.GetRegionStatsByType(statistics.LearnerPeer)
+re.Len(regions, 1)
+
+leaderServer.ResignLeader()
+newLeaderName = tc.WaitLeader()
+re.Equal(newLeaderName, leaderName)
+leaderServer = tc.GetLeaderServer()
+rc = leaderServer.GetRaftCluster()
+re.NotNil(r)
+re.True(r.LoadedFromStorage() || r.LoadedFromSync())
+regions = rc.GetRegionStatsByType(statistics.LearnerPeer)
+re.Empty(regions)
+regionInfo = regionInfo.Clone(core.SetSource(core.Heartbeat), core.SetApproximateSize(30))
+err = tc.HandleRegionHeartbeat(regionInfo)
+re.NoError(err)
+rc = leaderServer.GetRaftCluster()
+r = rc.GetRegion(region.Id)
+re.NotNil(r)
+re.False(r.LoadedFromStorage() && r.LoadedFromSync())
+
+leaderServer.ResignLeader()
+newLeaderName = tc.WaitLeader()
+re.NotEqual(newLeaderName, leaderName)
+leaderServer.ResignLeader()
+newLeaderName = tc.WaitLeader()
+re.Equal(newLeaderName, leaderName)
+leaderServer = tc.GetLeaderServer()
+rc = leaderServer.GetRaftCluster()
+r = rc.GetRegion(region.Id)
+re.NotNil(r)
+re.False(r.LoadedFromStorage() && r.LoadedFromSync())
+regions = rc.GetRegionStatsByType(statistics.LearnerPeer)
+re.Empty(regions)
+
+regionInfo = regionInfo.Clone(core.SetSource(core.Heartbeat), core.SetApproximateSize(30))
+err = tc.HandleRegionHeartbeat(regionInfo)
+re.NoError(err)
+regions = rc.GetRegionStatsByType(statistics.LearnerPeer)
+re.Len(regions, 1)
+}

func TestStaleRegion(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())