Skip to content

Commit

Permalink
coordinator: use a healthy region count to start coordinator (tikv#7044)
Browse files Browse the repository at this point in the history
close tikv#6988, close tikv#7016

Signed-off-by: husharp <jinhao.hu@pingcap.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and rleungx committed Dec 1, 2023
1 parent adfa678 commit 5d6e908
Show file tree
Hide file tree
Showing 13 changed files with 127 additions and 31 deletions.
37 changes: 29 additions & 8 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,26 @@ type RegionInfo struct {
queryStats *pdpb.QueryStats
flowRoundDivisor uint64
// buckets is not thread unsafe, it should be accessed by the request `report buckets` with greater version.
buckets unsafe.Pointer
fromHeartbeat bool
buckets unsafe.Pointer
// source is used to indicate region's source, such as Storage/Sync/Heartbeat.
source RegionSource
}

// RegionSource is the source of region.
type RegionSource uint32

const (
// Storage means this region's meta info might be stale.
Storage RegionSource = iota
// Sync means this region's meta info is relatively fresher.
Sync
// Heartbeat means this region's meta info is relatively fresher.
Heartbeat
)

// LoadedFromStorage means this region's meta info loaded from storage.
func (r *RegionInfo) LoadedFromStorage() bool {
return r.source == Storage
}

// NewRegionInfo creates RegionInfo with region's meta and leader peer.
Expand Down Expand Up @@ -192,6 +210,7 @@ func RegionFromHeartbeat(heartbeat RegionHeartbeatRequest, opts ...RegionCreateO
approximateKeys: int64(heartbeat.GetApproximateKeys()),
interval: heartbeat.GetInterval(),
queryStats: heartbeat.GetQueryStats(),
source: Heartbeat,
}

// scheduling service doesn't need the following fields.
Expand Down Expand Up @@ -662,11 +681,6 @@ func (r *RegionInfo) GetReplicationStatus() *replication_modepb.RegionReplicatio
return r.replicationStatus
}

// IsFromHeartbeat returns whether the region info is from the region heartbeat.
func (r *RegionInfo) IsFromHeartbeat() bool {
return r.fromHeartbeat
}

func (r *RegionInfo) isInvolved(startKey, endKey []byte) bool {
return bytes.Compare(r.GetStartKey(), startKey) >= 0 && (len(endKey) == 0 || (len(r.GetEndKey()) > 0 && bytes.Compare(r.GetEndKey(), endKey) <= 0))
}
Expand Down Expand Up @@ -706,7 +720,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
}
saveKV, saveCache, isNew = true, true, true
} else {
if !origin.IsFromHeartbeat() {
if origin.LoadedFromStorage() {
isNew = true
}
r := region.GetRegionEpoch()
Expand Down Expand Up @@ -1310,6 +1324,13 @@ func (r *RegionsInfo) GetStoreWriteRate(storeID uint64) (bytesRate, keysRate flo
return
}

// GetClusterNotFromStorageRegionsCnt gets the total count of regions that not loaded from storage anymore
func (r *RegionsInfo) GetClusterNotFromStorageRegionsCnt() int {
r.st.RLock()
defer r.st.RUnlock()
return r.tree.notFromStorageRegionsCnt
}

// GetMetaRegions gets a set of metapb.Region from regionMap
func (r *RegionsInfo) GetMetaRegions() []*metapb.Region {
r.t.RLock()
Expand Down
6 changes: 3 additions & 3 deletions pkg/core/region_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,10 +373,10 @@ func WithInterval(interval *pdpb.TimeInterval) RegionCreateOption {
}
}

// SetFromHeartbeat sets if the region info comes from the region heartbeat.
func SetFromHeartbeat(fromHeartbeat bool) RegionCreateOption {
// SetSource sets the region info's come from.
func SetSource(source RegionSource) RegionCreateOption {
return func(region *RegionInfo) {
region.fromHeartbeat = fromHeartbeat
region.source = source
}
}

Expand Down
29 changes: 25 additions & 4 deletions pkg/core/region_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,17 @@ type regionTree struct {
totalSize int64
totalWriteBytesRate float64
totalWriteKeysRate float64
// count the number of regions that not loaded from storage.
notFromStorageRegionsCnt int
}

func newRegionTree() *regionTree {
return &regionTree{
tree: btree.NewG[*regionItem](defaultBTreeDegree),
totalSize: 0,
totalWriteBytesRate: 0,
totalWriteKeysRate: 0,
tree: btree.NewG[*regionItem](defaultBTreeDegree),
totalSize: 0,
totalWriteBytesRate: 0,
totalWriteKeysRate: 0,
notFromStorageRegionsCnt: 0,
}
}

Expand Down Expand Up @@ -112,6 +115,9 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re
regionWriteBytesRate, regionWriteKeysRate := region.GetWriteRate()
t.totalWriteBytesRate += regionWriteBytesRate
t.totalWriteKeysRate += regionWriteKeysRate
if !region.LoadedFromStorage() {
t.notFromStorageRegionsCnt++
}

if !withOverlaps {
overlaps = t.overlaps(item)
Expand All @@ -133,6 +139,9 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re
regionWriteBytesRate, regionWriteKeysRate = old.GetWriteRate()
t.totalWriteBytesRate -= regionWriteBytesRate
t.totalWriteKeysRate -= regionWriteKeysRate
if !old.LoadedFromStorage() {
t.notFromStorageRegionsCnt--
}
}

return result
Expand All @@ -149,6 +158,15 @@ func (t *regionTree) updateStat(origin *RegionInfo, region *RegionInfo) {
regionWriteBytesRate, regionWriteKeysRate = origin.GetWriteRate()
t.totalWriteBytesRate -= regionWriteBytesRate
t.totalWriteKeysRate -= regionWriteKeysRate

// If the region meta information not loaded from storage anymore, decrease the counter.
if origin.LoadedFromStorage() && !region.LoadedFromStorage() {
t.notFromStorageRegionsCnt++
}
// If the region meta information updated to load from storage, increase the counter.
if !origin.LoadedFromStorage() && region.LoadedFromStorage() {
t.notFromStorageRegionsCnt--
}
}

// remove removes a region if the region is in the tree.
Expand All @@ -168,6 +186,9 @@ func (t *regionTree) remove(region *RegionInfo) {
regionWriteBytesRate, regionWriteKeysRate := result.GetWriteRate()
t.totalWriteBytesRate -= regionWriteBytesRate
t.totalWriteKeysRate -= regionWriteKeysRate
if !region.LoadedFromStorage() {
t.notFromStorageRegionsCnt--
}
t.tree.Delete(item)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
// To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one,
// check its validation again here.
//
// However it can't solve the race condition of concurrent heartbeats from the same region.
// However, it can't solve the race condition of concurrent heartbeats from the same region.
if overlaps, err = c.AtomicCheckAndPutRegion(region); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (s *Service) RegionHeartbeat(stream schedulingpb.Scheduling_RegionHeartbeat
s.hbStreams.BindStream(storeID, server)
lastBind = time.Now()
}
region := core.RegionFromHeartbeat(request, core.SetFromHeartbeat(true))
region := core.RegionFromHeartbeat(request, core.SetSource(core.Heartbeat))
err = c.HandleRegionHeartbeat(region)
if err != nil {
// TODO: if we need to send the error back to API server.
Expand Down
11 changes: 10 additions & 1 deletion pkg/schedule/prepare_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ package schedule
import (
"time"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
)

type prepareChecker struct {
Expand Down Expand Up @@ -47,8 +49,15 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool {
checker.prepared = true
return true
}
notLoadedFromRegionsCnt := c.GetClusterNotFromStorageRegionsCnt()
totalRegionsCnt := c.GetTotalRegionCount()
if float64(notLoadedFromRegionsCnt) > float64(totalRegionsCnt)*collectFactor {
log.Info("meta not loaded from region number is satisfied, finish prepare checker", zap.Int("not-from-storage-region", notLoadedFromRegionsCnt), zap.Int("total-region", totalRegionsCnt))
checker.prepared = true
return true
}
// The number of active regions should be more than total region of all stores * collectFactor
if float64(c.GetTotalRegionCount())*collectFactor > float64(checker.sum) {
if float64(totalRegionsCnt)*collectFactor > float64(checker.sum) {
return false
}
for _, store := range c.GetStores() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/endpoint/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (se *StorageEndpoint) LoadRegions(ctx context.Context, f func(region *core.
}

nextID = region.GetId() + 1
overlaps := f(core.NewRegionInfo(region, nil))
overlaps := f(core.NewRegionInfo(region, nil, core.SetSource(core.Storage)))
for _, item := range overlaps {
if err := se.DeleteRegion(item.GetMeta()); err != nil {
return err
Expand Down
7 changes: 3 additions & 4 deletions pkg/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ func saveRegions(lb *levelDBBackend, n int, ratio int) error {
}

func benchmarkLoadRegions(b *testing.B, n int, ratio int) {
re := require.New(b)
ctx := context.Background()
dir := b.TempDir()
lb, err := newLevelDBBackend(ctx, dir, nil)
Expand All @@ -426,10 +427,8 @@ func benchmarkLoadRegions(b *testing.B, n int, ratio int) {
}()

b.ResetTimer()
err = lb.LoadRegions(ctx, cluster.CheckAndPutRegion)
if err != nil {
b.Fatal(err)
}
err = lb.LoadRegions(context.Background(), cluster.CheckAndPutRegion)
re.NoError(err)
}

var volumes = []struct {
Expand Down
6 changes: 3 additions & 3 deletions pkg/syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
log.Info("region syncer start load region")
start := time.Now()
err := storage.TryLoadRegionsOnce(ctx, regionStorage, bc.CheckAndPutRegion)
log.Info("region syncer finished load region", zap.Duration("time-cost", time.Since(start)))
log.Info("region syncer finished load regions", zap.Duration("time-cost", time.Since(start)))
if err != nil {
log.Warn("failed to load regions", errs.ZapError(err))
}
Expand Down Expand Up @@ -183,10 +183,10 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
core.SetWrittenKeys(stats[i].KeysWritten),
core.SetReadBytes(stats[i].BytesRead),
core.SetReadKeys(stats[i].KeysRead),
core.SetFromHeartbeat(false),
core.SetSource(core.Sync),
)
} else {
region = core.NewRegionInfo(r, regionLeader, core.SetFromHeartbeat(false))
region = core.NewRegionInfo(r, regionLeader, core.SetSource(core.Sync))
}

origin, _, err := bc.PreCheckPutRegion(region)
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) {
start = time.Now()

// used to load region from kv storage to cache storage.
if err := storage.TryLoadRegionsOnce(c.ctx, c.storage, c.core.CheckAndPutRegion); err != nil {
if err = storage.TryLoadRegionsOnce(c.ctx, c.storage, c.core.CheckAndPutRegion); err != nil {
return nil, err
}
log.Info("load regions",
Expand Down
4 changes: 2 additions & 2 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -992,7 +992,7 @@ func TestRegionSizeChanged(t *testing.T) {
core.WithLeader(region.GetPeers()[2]),
core.SetApproximateSize(curMaxMergeSize-1),
core.SetApproximateKeys(curMaxMergeKeys-1),
core.SetFromHeartbeat(true),
core.SetSource(core.Heartbeat),
)
cluster.processRegionHeartbeat(region)
regionID := region.GetID()
Expand All @@ -1002,7 +1002,7 @@ func TestRegionSizeChanged(t *testing.T) {
core.WithLeader(region.GetPeers()[2]),
core.SetApproximateSize(curMaxMergeSize+1),
core.SetApproximateKeys(curMaxMergeKeys+1),
core.SetFromHeartbeat(true),
core.SetSource(core.Heartbeat),
)
cluster.processRegionHeartbeat(region)
re.False(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
Expand Down
2 changes: 1 addition & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1267,7 +1267,7 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error
lastBind = time.Now()
}

region := core.RegionFromHeartbeat(request, flowRoundOption, core.SetFromHeartbeat(true))
region := core.RegionFromHeartbeat(request, flowRoundOption)
if region.GetLeader() == nil {
log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil))
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "invalid-leader").Inc()
Expand Down
48 changes: 47 additions & 1 deletion tests/server/region_syncer/region_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,52 @@ func TestPrepareChecker(t *testing.T) {
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker"))
}

// ref: https://github.com/tikv/pd/issues/6988
func TestPrepareCheckerWithTransferLeader(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker", `return(true)`))
cluster, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, serverName string) { conf.PDServerCfg.UseRegionStorage = true })
defer cluster.Destroy()
re.NoError(err)

err = cluster.RunInitialServers()
re.NoError(err)
cluster.WaitLeader()
leaderServer := cluster.GetServer(cluster.GetLeader())
re.NoError(leaderServer.BootstrapCluster())
rc := leaderServer.GetServer().GetRaftCluster()
re.NotNil(rc)
regionLen := 100
regions := initRegions(regionLen)
for _, region := range regions {
err = rc.HandleRegionHeartbeat(region)
re.NoError(err)
}
// ensure flush to region storage
time.Sleep(3 * time.Second)
re.True(leaderServer.GetRaftCluster().IsPrepared())

// join new PD
pd2, err := cluster.Join(ctx)
re.NoError(err)
err = pd2.Run()
re.NoError(err)
// waiting for synchronization to complete
time.Sleep(3 * time.Second)
err = cluster.ResignLeader()
re.NoError(err)
re.Equal("pd2", cluster.WaitLeader())

// transfer leader to pd1, can start coordinator immediately.
err = cluster.ResignLeader()
re.NoError(err)
re.Equal("pd1", cluster.WaitLeader())
re.True(rc.IsPrepared())
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker"))
}

func initRegions(regionLen int) []*core.RegionInfo {
allocator := &idAllocator{allocator: mockid.NewIDAllocator()}
regions := make([]*core.RegionInfo, 0, regionLen)
Expand All @@ -264,7 +310,7 @@ func initRegions(regionLen int) []*core.RegionInfo {
{Id: allocator.alloc(), StoreId: uint64(3)},
},
}
region := core.NewRegionInfo(r, r.Peers[0])
region := core.NewRegionInfo(r, r.Peers[0], core.SetSource(core.Heartbeat))
// Here is used to simulate the upgrade process.
if i < regionLen/2 {
buckets := &metapb.Buckets{
Expand Down

0 comments on commit 5d6e908

Please sign in to comment.