Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

coordinator: use a healthy region count to start coordinator #7044

Merged
merged 25 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
accb06b
fix
HuSharp Sep 6, 2023
46406e7
Merge branch 'master' into fix_coordinator
HuSharp Sep 6, 2023
d161acb
address comment
HuSharp Sep 6, 2023
6be6f17
address comment
HuSharp Sep 7, 2023
1f16cda
Merge remote-tracking branch 'upstream/master' into fix_coordinator
HuSharp Sep 11, 2023
68a1107
address comment
HuSharp Sep 11, 2023
c67ed32
Merge remote-tracking branch 'upstream/master' into fix_coordinator
HuSharp Sep 13, 2023
67b4081
address comment
HuSharp Sep 13, 2023
7bae37c
change to healthy region and move to region tree
HuSharp Sep 18, 2023
382816d
Merge remote-tracking branch 'upstream/master' into fix_coordinator
HuSharp Sep 19, 2023
f89619a
add test
HuSharp Sep 19, 2023
4eebfe3
merge master
HuSharp Sep 20, 2023
e90fa56
remove redundant line
HuSharp Sep 20, 2023
cc2d832
merge master
HuSharp Sep 21, 2023
664427a
merge master
HuSharp Sep 26, 2023
8a0c520
follower to leader can start coordinator immediately
HuSharp Sep 28, 2023
c88260d
Merge branch 'master' into fix_coordinator
HuSharp Sep 28, 2023
32f856f
Merge branch 'master' into fix_coordinator
HuSharp Sep 28, 2023
be7296f
Merge branch 'master' into fix_coordinator
HuSharp Sep 28, 2023
99dc596
Merge remote-tracking branch 'upstream/master' into fix_coordinator
HuSharp Oct 9, 2023
0663294
change to from
HuSharp Oct 9, 2023
183649a
change proper name
HuSharp Oct 9, 2023
45d2cc5
Merge branch 'master' into fix_coordinator
HuSharp Oct 9, 2023
6281684
address comment
HuSharp Oct 10, 2023
a9d6e3e
Merge branch 'master' into fix_coordinator
ti-chi-bot[bot] Oct 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 43 additions & 8 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,14 @@ type RegionInfo struct {
queryStats *pdpb.QueryStats
flowRoundDivisor uint64
// buckets is not thread unsafe, it should be accessed by the request `report buckets` with greater version.
buckets unsafe.Pointer
fromHeartbeat bool
buckets unsafe.Pointer
// source is used to indicate region's source, such as FromHeartbeat/FromSync/InDisk.
source RegionSource
}

// GetRegionSource returns the region source.
func (r *RegionInfo) GetRegionSource() RegionSource {
return r.source
}

// NewRegionInfo creates RegionInfo with region's meta and leader peer.
Expand Down Expand Up @@ -171,6 +177,7 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionC
interval: heartbeat.GetInterval(),
replicationStatus: heartbeat.GetReplicationStatus(),
queryStats: heartbeat.GetQueryStats(),
source: FromHeartbeat,
}

for _, opt := range opts {
Expand Down Expand Up @@ -639,11 +646,6 @@ func (r *RegionInfo) IsFlashbackChanged(l *RegionInfo) bool {
return r.meta.FlashbackStartTs != l.meta.FlashbackStartTs || r.meta.IsInFlashback != l.meta.IsInFlashback
}

// IsFromHeartbeat returns whether the region info is from the region heartbeat.
func (r *RegionInfo) IsFromHeartbeat() bool {
return r.fromHeartbeat
}

func (r *RegionInfo) isInvolved(startKey, endKey []byte) bool {
return bytes.Compare(r.GetStartKey(), startKey) >= 0 && (len(endKey) == 0 || (len(r.GetEndKey()) > 0 && bytes.Compare(r.GetEndKey(), endKey) <= 0))
}
Expand Down Expand Up @@ -683,7 +685,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
}
saveKV, saveCache, isNew = true, true, true
} else {
if !origin.IsFromHeartbeat() {
if origin.source == FromSync || origin.source == InDisk {
isNew = true
}
r := region.GetRegionEpoch()
Expand Down Expand Up @@ -793,6 +795,18 @@ type RegionsInfo struct {
pendingPeers map[uint64]*regionTree // storeID -> sub regionTree
}

// RegionSource is the source of region.
type RegionSource uint32

const (
// InDisk means region is stale.
InDisk RegionSource = iota
Copy link
Member

@rleungx rleungx Sep 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about FromLoad or FromStorage?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update to FromStorage

// FromSync means region is stale.
FromSync
// FromHeartbeat means region is fresh.
HuSharp marked this conversation as resolved.
Show resolved Hide resolved
FromHeartbeat
)

// NewRegionsInfo creates RegionsInfo with tree, regions, leaders and followers
func NewRegionsInfo() *RegionsInfo {
return &RegionsInfo{
Expand Down Expand Up @@ -840,6 +854,8 @@ func (r *RegionsInfo) CheckAndPutRegion(region *RegionInfo) []*RegionInfo {
origin, overlaps, rangeChanged := r.setRegionLocked(region, true, ols...)
r.t.Unlock()
r.UpdateSubTree(region, origin, overlaps, rangeChanged)
// InDisk means region is stale.
r.AtomicAddStaleRegionCnt()
return overlaps
}

Expand All @@ -857,6 +873,21 @@ func (r *RegionsInfo) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, []*reg
return origin, overlaps, err
}

// GetStaleRegionCnt returns the stale region count.
func (r *RegionsInfo) GetStaleRegionCnt() int64 {
r.t.RLock()
defer r.t.RUnlock()
if r.tree.length() == 0 {
return 0
}
return r.tree.staleRegionCnt
}

// AtomicAddStaleRegionCnt atomically adds the stale region count.
func (r *RegionsInfo) AtomicAddStaleRegionCnt() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can maintain it in the tree?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what the difference is between putting in RegionsInfo and tree.
Otherwise, if we put RegionsInfo into tree, we would need to add a layer to get the tree's StaleRegion.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed

r.tree.AtomicAddStaleRegionCnt()
}

// AtomicCheckAndPutRegion checks if the region is valid to put, if valid then put.
func (r *RegionsInfo) AtomicCheckAndPutRegion(region *RegionInfo) ([]*RegionInfo, error) {
r.t.Lock()
Expand All @@ -870,6 +901,10 @@ func (r *RegionsInfo) AtomicCheckAndPutRegion(region *RegionInfo) ([]*RegionInfo
r.t.Unlock()
return nil, err
}
// If origin is stale, need to sub the stale region count.
if origin != nil && origin.source != FromHeartbeat && region.source == FromHeartbeat {
r.tree.AtomicSubStaleRegionCnt()
}
origin, overlaps, rangeChanged := r.setRegionLocked(region, true, ols...)
r.t.Unlock()
r.UpdateSubTree(region, origin, overlaps, rangeChanged)
Expand Down
6 changes: 3 additions & 3 deletions pkg/core/region_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,10 +381,10 @@ func WithInterval(interval *pdpb.TimeInterval) RegionCreateOption {
}
}

// SetFromHeartbeat sets if the region info comes from the region heartbeat.
func SetFromHeartbeat(fromHeartbeat bool) RegionCreateOption {
// SetSource sets the region info's come from.
func SetSource(source RegionSource) RegionCreateOption {
return func(region *RegionInfo) {
region.fromHeartbeat = fromHeartbeat
region.source = source
}
}

Expand Down
21 changes: 21 additions & 0 deletions pkg/core/region_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package core
import (
"bytes"
"math/rand"
"sync/atomic"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
Expand Down Expand Up @@ -61,6 +62,8 @@ type regionTree struct {
totalSize int64
totalWriteBytesRate float64
totalWriteKeysRate float64

staleRegionCnt int64
}

func newRegionTree() *regionTree {
Expand All @@ -69,6 +72,7 @@ func newRegionTree() *regionTree {
totalSize: 0,
totalWriteBytesRate: 0,
totalWriteKeysRate: 0,
staleRegionCnt: 0,
}
}

Expand Down Expand Up @@ -342,3 +346,20 @@ func (t *regionTree) TotalWriteRate() (bytesRate, keysRate float64) {
}
return t.totalWriteBytesRate, t.totalWriteKeysRate
}

func (t *regionTree) AtomicAddStaleRegionCnt() {
if t.length() == 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to check the length here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now we do not need to check it after moving staleRegionCnt into regionsInfo

return
}
atomic.AddInt64(&t.staleRegionCnt, 1)
}

func (t *regionTree) AtomicSubStaleRegionCnt() {
if t.length() == 0 {
return
}
if t.staleRegionCnt == 0 {
HuSharp marked this conversation as resolved.
Show resolved Hide resolved
return
}
atomic.AddInt64(&t.staleRegionCnt, -1)
}
7 changes: 7 additions & 0 deletions pkg/schedule/prepare_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ package schedule
import (
"time"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
)

type prepareChecker struct {
Expand Down Expand Up @@ -47,6 +49,11 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool {
checker.prepared = true
return true
}
if float64(c.GetStaleRegionCnt()) < float64(c.GetTotalRegionCount())*(1-collectFactor) {
log.Info("stale region num is satisfied, skip prepare checker", zap.Int64("stale-region", c.GetStaleRegionCnt()), zap.Int("total-region", c.GetTotalRegionCount()))
checker.prepared = true
return true
}
// The number of active regions should be more than total region of all stores * collectFactor
if float64(c.GetTotalRegionCount())*collectFactor > float64(checker.sum) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need it?

Copy link
Member Author

@HuSharp HuSharp Sep 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe it is better to remove them in another pr? This pr just focuses on accelerating coordinator.
What do u think :)

return false
Expand Down
8 changes: 6 additions & 2 deletions pkg/syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,17 +183,21 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
core.SetWrittenKeys(stats[i].KeysWritten),
core.SetReadBytes(stats[i].BytesRead),
core.SetReadKeys(stats[i].KeysRead),
core.SetFromHeartbeat(false),
core.SetSource(core.FromSync),
)
} else {
region = core.NewRegionInfo(r, regionLeader, core.SetFromHeartbeat(false))
region = core.NewRegionInfo(r, regionLeader, core.SetSource(core.FromSync))
}

origin, _, err := bc.PreCheckPutRegion(region)
if err != nil {
log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err))
continue
}
// FromSync means region is stale.
if origin == nil || (origin != nil && origin.GetRegionSource() == core.FromHeartbeat) {
bc.RegionsInfo.AtomicAddStaleRegionCnt()
}
_, saveKV, _, _ := regionGuide(region, origin)
overlaps := bc.PutRegion(region)

Expand Down
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1130,7 +1130,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
// To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one,
// check its validation again here.
//
// However it can't solve the race condition of concurrent heartbeats from the same region.
// However, it can't solve the race condition of concurrent heartbeats from the same region.
if overlaps, err = c.core.AtomicCheckAndPutRegion(region); err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1002,7 +1002,7 @@ func TestRegionSizeChanged(t *testing.T) {
core.WithLeader(region.GetPeers()[2]),
core.SetApproximateSize(curMaxMergeSize-1),
core.SetApproximateKeys(curMaxMergeKeys-1),
core.SetFromHeartbeat(true),
core.SetSource(core.FromHeartbeat),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any test to cover the problem?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

)
cluster.processRegionHeartbeat(region)
regionID := region.GetID()
Expand All @@ -1012,7 +1012,7 @@ func TestRegionSizeChanged(t *testing.T) {
core.WithLeader(region.GetPeers()[2]),
core.SetApproximateSize(curMaxMergeSize+1),
core.SetApproximateKeys(curMaxMergeKeys+1),
core.SetFromHeartbeat(true),
core.SetSource(core.FromHeartbeat),
)
cluster.processRegionHeartbeat(region)
re.False(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
Expand Down Expand Up @@ -2375,6 +2375,7 @@ func (c *testCluster) LoadRegion(regionID uint64, followerStoreIDs ...uint64) er
peer, _ := c.AllocPeer(id)
region.Peers = append(region.Peers, peer)
}
c.core.AtomicAddStaleRegionCnt()
return c.putRegion(core.NewRegionInfo(region, nil))
}

Expand Down
2 changes: 1 addition & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1261,7 +1261,7 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error
lastBind = time.Now()
}

region := core.RegionFromHeartbeat(request, flowRoundOption, core.SetFromHeartbeat(true))
region := core.RegionFromHeartbeat(request, flowRoundOption)
if region.GetLeader() == nil {
log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil))
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "invalid-leader").Inc()
Expand Down