diff --git a/pkg/core/region.go b/pkg/core/region.go
index 8d0379f266f..6631005a608 100644
--- a/pkg/core/region.go
+++ b/pkg/core/region.go
@@ -73,8 +73,26 @@ type RegionInfo struct {
 	queryStats       *pdpb.QueryStats
 	flowRoundDivisor uint64
 	// buckets is not thread unsafe, it should be accessed by the request `report buckets` with greater version.
-	buckets       unsafe.Pointer
-	fromHeartbeat bool
+	buckets unsafe.Pointer
+	// source is used to indicate the region's source, such as Storage/Sync/Heartbeat.
+	source RegionSource
+}
+
+// RegionSource is the source of the region info.
+type RegionSource uint32
+
+const (
+	// Storage means this region's meta info might be stale.
+	Storage RegionSource = iota
+	// Sync means this region's meta info is relatively fresh; it comes from the region syncer.
+	Sync
+	// Heartbeat means this region's meta info is relatively fresh; it comes from the region heartbeat.
+	Heartbeat
+)
+
+// LoadedFromStorage returns whether the region's meta info is loaded from storage.
+func (r *RegionInfo) LoadedFromStorage() bool {
+	return r.source == Storage
 }

 // NewRegionInfo creates RegionInfo with region's meta and leader peer.
@@ -192,6 +210,7 @@ func RegionFromHeartbeat(heartbeat RegionHeartbeatRequest, opts ...RegionCreateO
 		approximateKeys: int64(heartbeat.GetApproximateKeys()),
 		interval:        heartbeat.GetInterval(),
 		queryStats:      heartbeat.GetQueryStats(),
+		source:          Heartbeat,
 	}

 	// scheduling service doesn't need the following fields.
@@ -667,11 +686,6 @@ func (r *RegionInfo) IsFlashbackChanged(l *RegionInfo) bool {
 	return r.meta.FlashbackStartTs != l.meta.FlashbackStartTs || r.meta.IsInFlashback != l.meta.IsInFlashback
 }

-// IsFromHeartbeat returns whether the region info is from the region heartbeat.
-func (r *RegionInfo) IsFromHeartbeat() bool {
-	return r.fromHeartbeat
-}
-
 func (r *RegionInfo) isInvolved(startKey, endKey []byte) bool {
 	return bytes.Compare(r.GetStartKey(), startKey) >= 0 && (len(endKey) == 0 || (len(r.GetEndKey()) > 0 && bytes.Compare(r.GetEndKey(), endKey) <= 0))
 }
@@ -711,7 +725,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
 		}
 		saveKV, saveCache, isNew = true, true, true
 	} else {
-		if !origin.IsFromHeartbeat() {
+		if origin.LoadedFromStorage() {
 			isNew = true
 		}
 		r := region.GetRegionEpoch()
@@ -1323,6 +1337,13 @@ func (r *RegionsInfo) GetStoreWriteRate(storeID uint64) (bytesRate, keysRate flo
 	return
 }

+// GetClusterNotFromStorageRegionsCnt gets the total count of regions that are not loaded from storage anymore.
+func (r *RegionsInfo) GetClusterNotFromStorageRegionsCnt() int {
+	r.t.RLock()
+	defer r.t.RUnlock()
+	return r.tree.notFromStorageRegionsCnt
+}
+
 // GetMetaRegions gets a set of metapb.Region from regionMap
 func (r *RegionsInfo) GetMetaRegions() []*metapb.Region {
 	r.t.RLock()
diff --git a/pkg/core/region_option.go b/pkg/core/region_option.go
index ce959905810..36db7cf3460 100644
--- a/pkg/core/region_option.go
+++ b/pkg/core/region_option.go
@@ -381,10 +381,10 @@ func WithInterval(interval *pdpb.TimeInterval) RegionCreateOption {
 	}
 }

-// SetFromHeartbeat sets if the region info comes from the region heartbeat.
-func SetFromHeartbeat(fromHeartbeat bool) RegionCreateOption {
+// SetSource sets the source of the region info.
+func SetSource(source RegionSource) RegionCreateOption {
 	return func(region *RegionInfo) {
-		region.fromHeartbeat = fromHeartbeat
+		region.source = source
 	}
 }

diff --git a/pkg/core/region_tree.go b/pkg/core/region_tree.go
index db1c8c28fc7..ed3445de6b6 100644
--- a/pkg/core/region_tree.go
+++ b/pkg/core/region_tree.go
@@ -61,14 +61,17 @@ type regionTree struct {
 	totalSize           int64
 	totalWriteBytesRate float64
 	totalWriteKeysRate  float64
+	// count the number of regions that are not loaded from storage.
+	notFromStorageRegionsCnt int
 }

 func newRegionTree() *regionTree {
 	return &regionTree{
-		tree:                btree.NewG[*regionItem](defaultBTreeDegree),
-		totalSize:           0,
-		totalWriteBytesRate: 0,
-		totalWriteKeysRate:  0,
+		tree:                     btree.NewG[*regionItem](defaultBTreeDegree),
+		totalSize:                0,
+		totalWriteBytesRate:      0,
+		totalWriteKeysRate:       0,
+		notFromStorageRegionsCnt: 0,
 	}
 }

@@ -112,6 +115,9 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re
 	regionWriteBytesRate, regionWriteKeysRate := region.GetWriteRate()
 	t.totalWriteBytesRate += regionWriteBytesRate
 	t.totalWriteKeysRate += regionWriteKeysRate
+	if !region.LoadedFromStorage() {
+		t.notFromStorageRegionsCnt++
+	}

 	if !withOverlaps {
 		overlaps = t.overlaps(item)
@@ -133,6 +139,9 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re
 		regionWriteBytesRate, regionWriteKeysRate = old.GetWriteRate()
 		t.totalWriteBytesRate -= regionWriteBytesRate
 		t.totalWriteKeysRate -= regionWriteKeysRate
+		if !old.LoadedFromStorage() {
+			t.notFromStorageRegionsCnt--
+		}
 	}

 	return result
@@ -149,6 +158,15 @@ func (t *regionTree) updateStat(origin *RegionInfo, region *RegionInfo) {
 	regionWriteBytesRate, regionWriteKeysRate = origin.GetWriteRate()
 	t.totalWriteBytesRate -= regionWriteBytesRate
 	t.totalWriteKeysRate -= regionWriteKeysRate
+
+	// If the region is no longer loaded from storage, increase the counter.
+	if origin.LoadedFromStorage() && !region.LoadedFromStorage() {
+		t.notFromStorageRegionsCnt++
+	}
+	// If the region is loaded from storage again, decrease the counter.
+	if !origin.LoadedFromStorage() && region.LoadedFromStorage() {
+		t.notFromStorageRegionsCnt--
+	}
 }

 // remove removes a region if the region is in the tree.
@@ -168,6 +186,9 @@ func (t *regionTree) remove(region *RegionInfo) {
 	regionWriteBytesRate, regionWriteKeysRate := result.GetWriteRate()
 	t.totalWriteBytesRate -= regionWriteBytesRate
 	t.totalWriteKeysRate -= regionWriteKeysRate
+	if !region.LoadedFromStorage() {
+		t.notFromStorageRegionsCnt--
+	}
 	t.tree.Delete(item)
 }

diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go
index e8489fdaa15..891a645053f 100644
--- a/pkg/mcs/scheduling/server/cluster.go
+++ b/pkg/mcs/scheduling/server/cluster.go
@@ -547,7 +547,7 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
 	// To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one,
 	// check its validation again here.
 	//
-	// However it can't solve the race condition of concurrent heartbeats from the same region.
+	// However, it can't solve the race condition of concurrent heartbeats from the same region.
 	if overlaps, err = c.AtomicCheckAndPutRegion(region); err != nil {
 		return err
 	}
diff --git a/pkg/mcs/scheduling/server/grpc_service.go b/pkg/mcs/scheduling/server/grpc_service.go
index 4558688822a..b9b682dfa74 100644
--- a/pkg/mcs/scheduling/server/grpc_service.go
+++ b/pkg/mcs/scheduling/server/grpc_service.go
@@ -161,7 +161,7 @@ func (s *Service) RegionHeartbeat(stream schedulingpb.Scheduling_RegionHeartbeat
 			s.hbStreams.BindStream(storeID, server)
 			lastBind = time.Now()
 		}
-		region := core.RegionFromHeartbeat(request, core.SetFromHeartbeat(true))
+		region := core.RegionFromHeartbeat(request, core.SetSource(core.Heartbeat))
 		err = c.HandleRegionHeartbeat(region)
 		if err != nil {
 			// TODO: if we need to send the error back to API server.
diff --git a/pkg/schedule/prepare_checker.go b/pkg/schedule/prepare_checker.go
index cd4b076b2d6..c7faa57af81 100644
--- a/pkg/schedule/prepare_checker.go
+++ b/pkg/schedule/prepare_checker.go
@@ -17,8 +17,10 @@ package schedule
 import (
 	"time"

+	"github.com/pingcap/log"
 	"github.com/tikv/pd/pkg/core"
 	"github.com/tikv/pd/pkg/utils/syncutil"
+	"go.uber.org/zap"
 )

 type prepareChecker struct {
@@ -47,8 +49,15 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool {
 		checker.prepared = true
 		return true
 	}
+	notLoadedFromRegionsCnt := c.GetClusterNotFromStorageRegionsCnt()
+	totalRegionsCnt := c.GetTotalRegionCount()
+	if float64(notLoadedFromRegionsCnt) > float64(totalRegionsCnt)*collectFactor {
+		log.Info("the number of regions not loaded from storage is satisfied, finish prepare checker", zap.Int("not-from-storage-region", notLoadedFromRegionsCnt), zap.Int("total-region", totalRegionsCnt))
+		checker.prepared = true
+		return true
+	}
 	// The number of active regions should be more than total region of all stores * collectFactor
-	if float64(c.GetTotalRegionCount())*collectFactor > float64(checker.sum) {
+	if float64(totalRegionsCnt)*collectFactor > float64(checker.sum) {
 		return false
 	}
 	for _, store := range c.GetStores() {
diff --git a/pkg/storage/endpoint/meta.go b/pkg/storage/endpoint/meta.go
index 4ba9eb42c5c..d83e2b386c8 100644
--- a/pkg/storage/endpoint/meta.go
+++ b/pkg/storage/endpoint/meta.go
@@ -203,7 +203,7 @@ func (se *StorageEndpoint) LoadRegions(ctx context.Context, f func(region *core.
 			}
 			nextID = region.GetId() + 1
-			overlaps := f(core.NewRegionInfo(region, nil))
+			overlaps := f(core.NewRegionInfo(region, nil, core.SetSource(core.Storage)))
 			for _, item := range overlaps {
 				if err := se.DeleteRegion(item.GetMeta()); err != nil {
 					return err
diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go
index 77f94183403..dbb5a03b264 100644
--- a/pkg/storage/storage_test.go
+++ b/pkg/storage/storage_test.go
@@ -407,6 +407,7 @@ func saveRegions(lb *levelDBBackend, n int, ratio int) error {
 }

 func benchmarkLoadRegions(b *testing.B, n int, ratio int) {
+	re := require.New(b)
 	ctx := context.Background()
 	dir := b.TempDir()
 	lb, err := newLevelDBBackend(ctx, dir, nil)
@@ -426,10 +427,8 @@ func benchmarkLoadRegions(b *testing.B, n int, ratio int) {
 	}()

 	b.ResetTimer()
-	err = lb.LoadRegions(ctx, cluster.CheckAndPutRegion)
-	if err != nil {
-		b.Fatal(err)
-	}
+	err = lb.LoadRegions(context.Background(), cluster.CheckAndPutRegion)
+	re.NoError(err)
 }

 var volumes = []struct {
diff --git a/pkg/syncer/client.go b/pkg/syncer/client.go
index ac409f90115..f61ce320a74 100644
--- a/pkg/syncer/client.go
+++ b/pkg/syncer/client.go
@@ -95,7 +95,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
 				log.Info("region syncer start load region")
 				start := time.Now()
 				err := storage.TryLoadRegionsOnce(ctx, regionStorage, bc.CheckAndPutRegion)
-				log.Info("region syncer finished load region", zap.Duration("time-cost", time.Since(start)))
+				log.Info("region syncer finished loading regions", zap.Duration("time-cost", time.Since(start)))
 				if err != nil {
 					log.Warn("failed to load regions", errs.ZapError(err))
 				}
@@ -183,10 +183,10 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
 						core.SetWrittenKeys(stats[i].KeysWritten),
 						core.SetReadBytes(stats[i].BytesRead),
 						core.SetReadKeys(stats[i].KeysRead),
-						core.SetFromHeartbeat(false),
+						core.SetSource(core.Sync),
 					)
 				} else {
-					region = core.NewRegionInfo(r, regionLeader, core.SetFromHeartbeat(false))
+					region = core.NewRegionInfo(r, regionLeader, core.SetSource(core.Sync))
 				}
 				origin, _, err := bc.PreCheckPutRegion(region)
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 332a8b27a3f..094dd482107 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -593,7 +593,7 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) {
 	start = time.Now()

 	// used to load region from kv storage to cache storage.
-	if err := storage.TryLoadRegionsOnce(c.ctx, c.storage, c.core.CheckAndPutRegion); err != nil {
+	if err = storage.TryLoadRegionsOnce(c.ctx, c.storage, c.core.CheckAndPutRegion); err != nil {
 		return nil, err
 	}
 	log.Info("load regions",
diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go
index 31f6bb357c3..b7b9dcfb736 100644
--- a/server/cluster/cluster_test.go
+++ b/server/cluster/cluster_test.go
@@ -1002,7 +1002,7 @@ func TestRegionSizeChanged(t *testing.T) {
 		core.WithLeader(region.GetPeers()[2]),
 		core.SetApproximateSize(curMaxMergeSize-1),
 		core.SetApproximateKeys(curMaxMergeKeys-1),
-		core.SetFromHeartbeat(true),
+		core.SetSource(core.Heartbeat),
 	)
 	cluster.processRegionHeartbeat(region)
 	regionID := region.GetID()
@@ -1012,7 +1012,7 @@
 		core.WithLeader(region.GetPeers()[2]),
 		core.SetApproximateSize(curMaxMergeSize+1),
 		core.SetApproximateKeys(curMaxMergeKeys+1),
-		core.SetFromHeartbeat(true),
+		core.SetSource(core.Heartbeat),
 	)
 	cluster.processRegionHeartbeat(region)
 	re.False(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
diff --git a/server/grpc_service.go b/server/grpc_service.go
index febb666c22d..d9f37a64d7d 100644
--- a/server/grpc_service.go
+++ b/server/grpc_service.go
@@ -1316,7 +1316,7 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error
 			lastBind = time.Now()
 		}

-		region := core.RegionFromHeartbeat(request, flowRoundOption, core.SetFromHeartbeat(true))
+		region := core.RegionFromHeartbeat(request, flowRoundOption)
 		if region.GetLeader() == nil {
 			log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil))
 			regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "invalid-leader").Inc()
diff --git a/tests/server/region_syncer/region_syncer_test.go b/tests/server/region_syncer/region_syncer_test.go
index f672f82f1f6..b73d4abb9b5 100644
--- a/tests/server/region_syncer/region_syncer_test.go
+++ b/tests/server/region_syncer/region_syncer_test.go
@@ -246,6 +246,52 @@ func TestPrepareChecker(t *testing.T) {
 	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker"))
 }

+// ref: https://github.com/tikv/pd/issues/6988
+func TestPrepareCheckerWithTransferLeader(t *testing.T) {
+	re := require.New(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker", `return(true)`))
+	cluster, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, serverName string) { conf.PDServerCfg.UseRegionStorage = true })
+	defer cluster.Destroy()
+	re.NoError(err)
+
+	err = cluster.RunInitialServers()
+	re.NoError(err)
+	cluster.WaitLeader()
+	leaderServer := cluster.GetServer(cluster.GetLeader())
+	re.NoError(leaderServer.BootstrapCluster())
+	rc := leaderServer.GetServer().GetRaftCluster()
+	re.NotNil(rc)
+	regionLen := 100
+	regions := initRegions(regionLen)
+	for _, region := range regions {
+		err = rc.HandleRegionHeartbeat(region)
+		re.NoError(err)
+	}
+	// ensure the regions are flushed to region storage
+	time.Sleep(3 * time.Second)
+	re.True(leaderServer.GetRaftCluster().IsPrepared())
+
+	// join a new PD
+	pd2, err := cluster.Join(ctx)
+	re.NoError(err)
+	err = pd2.Run()
+	re.NoError(err)
+	// wait for the synchronization to complete
+	time.Sleep(3 * time.Second)
+	err = cluster.ResignLeader()
+	re.NoError(err)
+	re.Equal("pd2", cluster.WaitLeader())
+
+	// transfer the leader back to pd1; it can start the coordinator immediately.
+	err = cluster.ResignLeader()
+	re.NoError(err)
+	re.Equal("pd1", cluster.WaitLeader())
+	re.True(rc.IsPrepared())
+	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker"))
+}
+
 func initRegions(regionLen int) []*core.RegionInfo {
 	allocator := &idAllocator{allocator: mockid.NewIDAllocator()}
 	regions := make([]*core.RegionInfo, 0, regionLen)
@@ -264,7 +310,7 @@
 				{Id: allocator.alloc(), StoreId: uint64(3)},
 			},
 		}
-		region := core.NewRegionInfo(r, r.Peers[0])
+		region := core.NewRegionInfo(r, r.Peers[0], core.SetSource(core.Heartbeat))
 		// Here is used to simulate the upgrade process.
 		if i < regionLen/2 {
 			buckets := &metapb.Buckets{