Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

coordinator: use a healthy region count to start coordinator #7044

Merged
merged 25 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
accb06b
fix
HuSharp Sep 6, 2023
46406e7
Merge branch 'master' into fix_coordinator
HuSharp Sep 6, 2023
d161acb
address comment
HuSharp Sep 6, 2023
6be6f17
address comment
HuSharp Sep 7, 2023
1f16cda
Merge remote-tracking branch 'upstream/master' into fix_coordinator
HuSharp Sep 11, 2023
68a1107
address comment
HuSharp Sep 11, 2023
c67ed32
Merge remote-tracking branch 'upstream/master' into fix_coordinator
HuSharp Sep 13, 2023
67b4081
address comment
HuSharp Sep 13, 2023
7bae37c
change to healthy region and move to region tree
HuSharp Sep 18, 2023
382816d
Merge remote-tracking branch 'upstream/master' into fix_coordinator
HuSharp Sep 19, 2023
f89619a
add test
HuSharp Sep 19, 2023
4eebfe3
merge master
HuSharp Sep 20, 2023
e90fa56
remove redundant line
HuSharp Sep 20, 2023
cc2d832
merge master
HuSharp Sep 21, 2023
664427a
merge master
HuSharp Sep 26, 2023
8a0c520
follower to leader can start coordinator immediately
HuSharp Sep 28, 2023
c88260d
Merge branch 'master' into fix_coordinator
HuSharp Sep 28, 2023
32f856f
Merge branch 'master' into fix_coordinator
HuSharp Sep 28, 2023
be7296f
Merge branch 'master' into fix_coordinator
HuSharp Sep 28, 2023
99dc596
Merge remote-tracking branch 'upstream/master' into fix_coordinator
HuSharp Oct 9, 2023
0663294
change to from
HuSharp Oct 9, 2023
183649a
change proper name
HuSharp Oct 9, 2023
45d2cc5
Merge branch 'master' into fix_coordinator
HuSharp Oct 9, 2023
6281684
address comment
HuSharp Oct 10, 2023
a9d6e3e
Merge branch 'master' into fix_coordinator
ti-chi-bot[bot] Oct 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 29 additions & 8 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,26 @@ type RegionInfo struct {
queryStats *pdpb.QueryStats
flowRoundDivisor uint64
// buckets is not thread unsafe, it should be accessed by the request `report buckets` with greater version.
buckets unsafe.Pointer
fromHeartbeat bool
buckets unsafe.Pointer
// source is used to indicate region's source, such as Storage/Sync/Heartbeat.
source RegionSource
}

// RegionSource is the source of a region's meta info.
// It is used to decide whether the cached meta might be stale
// (see RegionInfo.LoadedFromStorage).
type RegionSource uint32

const (
	// Storage means this region's meta info was loaded from storage and might be stale.
	Storage RegionSource = iota
	// Sync means this region's meta info came from the region syncer and is relatively fresher than Storage.
	Sync
	// Heartbeat means this region's meta info came from a region heartbeat and is relatively fresher than Storage.
	Heartbeat
)

// LoadedFromStorage returns whether this region's meta info was loaded from
// storage (i.e. its source is Storage and it might therefore be stale).
func (r *RegionInfo) LoadedFromStorage() bool {
return r.source == Storage
}

// NewRegionInfo creates RegionInfo with region's meta and leader peer.
Expand Down Expand Up @@ -192,6 +210,7 @@ func RegionFromHeartbeat(heartbeat RegionHeartbeatRequest, opts ...RegionCreateO
approximateKeys: int64(heartbeat.GetApproximateKeys()),
interval: heartbeat.GetInterval(),
queryStats: heartbeat.GetQueryStats(),
source: Heartbeat,
}

// scheduling service doesn't need the following fields.
Expand Down Expand Up @@ -667,11 +686,6 @@ func (r *RegionInfo) IsFlashbackChanged(l *RegionInfo) bool {
return r.meta.FlashbackStartTs != l.meta.FlashbackStartTs || r.meta.IsInFlashback != l.meta.IsInFlashback
}

// IsFromHeartbeat returns whether the region info is from the region heartbeat.
func (r *RegionInfo) IsFromHeartbeat() bool {
return r.fromHeartbeat
}

func (r *RegionInfo) isInvolved(startKey, endKey []byte) bool {
return bytes.Compare(r.GetStartKey(), startKey) >= 0 && (len(endKey) == 0 || (len(r.GetEndKey()) > 0 && bytes.Compare(r.GetEndKey(), endKey) <= 0))
}
Expand Down Expand Up @@ -711,7 +725,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
}
saveKV, saveCache, isNew = true, true, true
} else {
if !origin.IsFromHeartbeat() {
if origin.LoadedFromStorage() {
isNew = true
}
r := region.GetRegionEpoch()
Expand Down Expand Up @@ -1323,6 +1337,13 @@ func (r *RegionsInfo) GetStoreWriteRate(storeID uint64) (bytesRate, keysRate flo
return
}

// GetClusterNotFromStorageRegionsCnt gets the total count of regions whose
// meta info is no longer loaded from storage (i.e. refreshed via sync or
// heartbeat). It is used by the prepare checker to decide whether the
// coordinator can start.
func (r *RegionsInfo) GetClusterNotFromStorageRegionsCnt() int {
	// r.tree is guarded by r.t, consistently with the other tree/regionMap
	// accessors of RegionsInfo (e.g. GetMetaRegions).
	r.t.RLock()
	defer r.t.RUnlock()
	return r.tree.notFromStorageRegionsCnt
}

// GetMetaRegions gets a set of metapb.Region from regionMap
func (r *RegionsInfo) GetMetaRegions() []*metapb.Region {
r.t.RLock()
Expand Down
6 changes: 3 additions & 3 deletions pkg/core/region_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,10 +381,10 @@ func WithInterval(interval *pdpb.TimeInterval) RegionCreateOption {
}
}

// SetFromHeartbeat sets if the region info comes from the region heartbeat.
func SetFromHeartbeat(fromHeartbeat bool) RegionCreateOption {
// SetSource sets the region info's come from.
func SetSource(source RegionSource) RegionCreateOption {
return func(region *RegionInfo) {
region.fromHeartbeat = fromHeartbeat
region.source = source
}
}

Expand Down
29 changes: 25 additions & 4 deletions pkg/core/region_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,17 @@ type regionTree struct {
totalSize int64
totalWriteBytesRate float64
totalWriteKeysRate float64
// count the number of regions that not loaded from storage.
notFromStorageRegionsCnt int
}

// newRegionTree creates an empty regionTree with all statistics counters
// zeroed, including the counter of regions not loaded from storage.
func newRegionTree() *regionTree {
	return &regionTree{
		tree:                     btree.NewG[*regionItem](defaultBTreeDegree),
		totalSize:                0,
		totalWriteBytesRate:      0,
		totalWriteKeysRate:       0,
		notFromStorageRegionsCnt: 0,
	}
}

Expand Down Expand Up @@ -112,6 +115,9 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re
regionWriteBytesRate, regionWriteKeysRate := region.GetWriteRate()
t.totalWriteBytesRate += regionWriteBytesRate
t.totalWriteKeysRate += regionWriteKeysRate
if !region.LoadedFromStorage() {
t.notFromStorageRegionsCnt++
}

if !withOverlaps {
overlaps = t.overlaps(item)
Expand All @@ -133,6 +139,9 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re
regionWriteBytesRate, regionWriteKeysRate = old.GetWriteRate()
t.totalWriteBytesRate -= regionWriteBytesRate
t.totalWriteKeysRate -= regionWriteKeysRate
if !old.LoadedFromStorage() {
t.notFromStorageRegionsCnt--
}
}

return result
Expand All @@ -149,6 +158,15 @@ func (t *regionTree) updateStat(origin *RegionInfo, region *RegionInfo) {
regionWriteBytesRate, regionWriteKeysRate = origin.GetWriteRate()
t.totalWriteBytesRate -= regionWriteBytesRate
t.totalWriteKeysRate -= regionWriteKeysRate

// The region's meta info no longer comes from storage: increase the counter.
if origin.LoadedFromStorage() && !region.LoadedFromStorage() {
t.notFromStorageRegionsCnt++
}
// The region's meta info went back to being loaded from storage: decrease the counter.
if !origin.LoadedFromStorage() && region.LoadedFromStorage() {
t.notFromStorageRegionsCnt--
}
}

// remove removes a region if the region is in the tree.
Expand All @@ -168,6 +186,9 @@ func (t *regionTree) remove(region *RegionInfo) {
regionWriteBytesRate, regionWriteKeysRate := result.GetWriteRate()
t.totalWriteBytesRate -= regionWriteBytesRate
t.totalWriteKeysRate -= regionWriteKeysRate
if !region.LoadedFromStorage() {
t.notFromStorageRegionsCnt--
}
t.tree.Delete(item)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
// To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one,
// check its validation again here.
//
// However it can't solve the race condition of concurrent heartbeats from the same region.
// However, it can't solve the race condition of concurrent heartbeats from the same region.
if overlaps, err = c.AtomicCheckAndPutRegion(region); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (s *Service) RegionHeartbeat(stream schedulingpb.Scheduling_RegionHeartbeat
s.hbStreams.BindStream(storeID, server)
lastBind = time.Now()
}
region := core.RegionFromHeartbeat(request, core.SetFromHeartbeat(true))
region := core.RegionFromHeartbeat(request, core.SetSource(core.Heartbeat))
err = c.HandleRegionHeartbeat(region)
if err != nil {
// TODO: if we need to send the error back to API server.
Expand Down
11 changes: 10 additions & 1 deletion pkg/schedule/prepare_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ package schedule
import (
"time"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
)

type prepareChecker struct {
Expand Down Expand Up @@ -47,8 +49,15 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool {
checker.prepared = true
return true
}
notLoadedFromRegionsCnt := c.GetClusterNotFromStorageRegionsCnt()
totalRegionsCnt := c.GetTotalRegionCount()
if float64(notLoadedFromRegionsCnt) > float64(totalRegionsCnt)*collectFactor {
log.Info("meta not loaded from region number is satisfied, finish prepare checker", zap.Int("not-from-storage-region", notLoadedFromRegionsCnt), zap.Int("total-region", totalRegionsCnt))
checker.prepared = true
return true
}
// The number of active regions should be more than total region of all stores * collectFactor
if float64(c.GetTotalRegionCount())*collectFactor > float64(checker.sum) {
if float64(totalRegionsCnt)*collectFactor > float64(checker.sum) {
return false
}
for _, store := range c.GetStores() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/endpoint/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (se *StorageEndpoint) LoadRegions(ctx context.Context, f func(region *core.
}

nextID = region.GetId() + 1
overlaps := f(core.NewRegionInfo(region, nil))
overlaps := f(core.NewRegionInfo(region, nil, core.SetSource(core.Storage)))
for _, item := range overlaps {
if err := se.DeleteRegion(item.GetMeta()); err != nil {
return err
Expand Down
7 changes: 3 additions & 4 deletions pkg/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ func saveRegions(lb *levelDBBackend, n int, ratio int) error {
}

func benchmarkLoadRegions(b *testing.B, n int, ratio int) {
re := require.New(b)
ctx := context.Background()
dir := b.TempDir()
lb, err := newLevelDBBackend(ctx, dir, nil)
Expand All @@ -426,10 +427,8 @@ func benchmarkLoadRegions(b *testing.B, n int, ratio int) {
}()

b.ResetTimer()
err = lb.LoadRegions(ctx, cluster.CheckAndPutRegion)
if err != nil {
b.Fatal(err)
}
err = lb.LoadRegions(context.Background(), cluster.CheckAndPutRegion)
re.NoError(err)
}

var volumes = []struct {
Expand Down
6 changes: 3 additions & 3 deletions pkg/syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
log.Info("region syncer start load region")
start := time.Now()
err := storage.TryLoadRegionsOnce(ctx, regionStorage, bc.CheckAndPutRegion)
log.Info("region syncer finished load region", zap.Duration("time-cost", time.Since(start)))
log.Info("region syncer finished load regions", zap.Duration("time-cost", time.Since(start)))
if err != nil {
log.Warn("failed to load regions", errs.ZapError(err))
}
Expand Down Expand Up @@ -183,10 +183,10 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
core.SetWrittenKeys(stats[i].KeysWritten),
core.SetReadBytes(stats[i].BytesRead),
core.SetReadKeys(stats[i].KeysRead),
core.SetFromHeartbeat(false),
core.SetSource(core.Sync),
)
} else {
region = core.NewRegionInfo(r, regionLeader, core.SetFromHeartbeat(false))
region = core.NewRegionInfo(r, regionLeader, core.SetSource(core.Sync))
}

origin, _, err := bc.PreCheckPutRegion(region)
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) {
start = time.Now()

// used to load region from kv storage to cache storage.
if err := storage.TryLoadRegionsOnce(c.ctx, c.storage, c.core.CheckAndPutRegion); err != nil {
if err = storage.TryLoadRegionsOnce(c.ctx, c.storage, c.core.CheckAndPutRegion); err != nil {
return nil, err
}
log.Info("load regions",
Expand Down
4 changes: 2 additions & 2 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1002,7 +1002,7 @@ func TestRegionSizeChanged(t *testing.T) {
core.WithLeader(region.GetPeers()[2]),
core.SetApproximateSize(curMaxMergeSize-1),
core.SetApproximateKeys(curMaxMergeKeys-1),
core.SetFromHeartbeat(true),
core.SetSource(core.Heartbeat),
)
cluster.processRegionHeartbeat(region)
regionID := region.GetID()
Expand All @@ -1012,7 +1012,7 @@ func TestRegionSizeChanged(t *testing.T) {
core.WithLeader(region.GetPeers()[2]),
core.SetApproximateSize(curMaxMergeSize+1),
core.SetApproximateKeys(curMaxMergeKeys+1),
core.SetFromHeartbeat(true),
core.SetSource(core.Heartbeat),
)
cluster.processRegionHeartbeat(region)
re.False(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
Expand Down
2 changes: 1 addition & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1316,7 +1316,7 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error
lastBind = time.Now()
}

region := core.RegionFromHeartbeat(request, flowRoundOption, core.SetFromHeartbeat(true))
region := core.RegionFromHeartbeat(request, flowRoundOption)
if region.GetLeader() == nil {
log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil))
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "invalid-leader").Inc()
Expand Down
48 changes: 47 additions & 1 deletion tests/server/region_syncer/region_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,52 @@ func TestPrepareChecker(t *testing.T) {
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker"))
}

// ref: https://github.com/tikv/pd/issues/6988
// TestPrepareCheckerWithTransferLeader verifies that a PD node that has
// already been prepared can restart its coordinator immediately after
// regaining leadership, instead of waiting for region heartbeats again.
// ref: https://github.com/tikv/pd/issues/6988
func TestPrepareCheckerWithTransferLeader(t *testing.T) {
	re := require.New(t)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker", `return(true)`))
	cluster, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, serverName string) { conf.PDServerCfg.UseRegionStorage = true })
	// Check the error before registering the deferred Destroy: a failed
	// NewTestCluster may return a nil cluster, and deferring Destroy on it
	// first would panic instead of reporting the real error.
	re.NoError(err)
	defer cluster.Destroy()

	err = cluster.RunInitialServers()
	re.NoError(err)
	cluster.WaitLeader()
	leaderServer := cluster.GetServer(cluster.GetLeader())
	re.NoError(leaderServer.BootstrapCluster())
	rc := leaderServer.GetServer().GetRaftCluster()
	re.NotNil(rc)
	regionLen := 100
	regions := initRegions(regionLen)
	for _, region := range regions {
		err = rc.HandleRegionHeartbeat(region)
		re.NoError(err)
	}
	// Ensure the regions are flushed to the region storage before joining a
	// new member, so the follower loads them from storage on sync.
	time.Sleep(3 * time.Second)
	re.True(leaderServer.GetRaftCluster().IsPrepared())

	// Join a new PD member.
	pd2, err := cluster.Join(ctx)
	re.NoError(err)
	err = pd2.Run()
	re.NoError(err)
	// Wait for region synchronization to complete.
	time.Sleep(3 * time.Second)
	err = cluster.ResignLeader()
	re.NoError(err)
	re.Equal("pd2", cluster.WaitLeader())

	// Transfer leadership back to pd1: it was prepared before, so the
	// coordinator can start immediately without re-collecting heartbeats.
	err = cluster.ResignLeader()
	re.NoError(err)
	re.Equal("pd1", cluster.WaitLeader())
	re.True(rc.IsPrepared())
	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker"))
}

func initRegions(regionLen int) []*core.RegionInfo {
allocator := &idAllocator{allocator: mockid.NewIDAllocator()}
regions := make([]*core.RegionInfo, 0, regionLen)
Expand All @@ -264,7 +310,7 @@ func initRegions(regionLen int) []*core.RegionInfo {
{Id: allocator.alloc(), StoreId: uint64(3)},
},
}
region := core.NewRegionInfo(r, r.Peers[0])
region := core.NewRegionInfo(r, r.Peers[0], core.SetSource(core.Heartbeat))
// Here is used to simulate the upgrade process.
if i < regionLen/2 {
buckets := &metapb.Buckets{
Expand Down