From ce4601ba7377ef9b927a9b36345ec0878b041ead Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 8 Feb 2023 00:13:58 +0800 Subject: [PATCH] store/copr: add a param "limit" to region cache's `SplitRegionRanges` (#40411) (#41152) close pingcap/tidb#38436 --- ddl/backfilling.go | 6 +++--- store/copr/batch_coprocessor.go | 2 +- store/copr/coprocessor_test.go | 21 +++++++++++++-------- store/copr/region_cache.go | 14 ++++++++++---- 4 files changed, 27 insertions(+), 16 deletions(-) diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 5ea026184f13a..58de2d07acb95 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -359,7 +359,7 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) { // splitTableRanges uses PD region's key ranges to split the backfilling table key range space, // to speed up backfilling data in table with disperse handle. // The `t` should be a non-partitioned table or a partition. -func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey kv.Key) ([]kv.KeyRange, error) { +func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey kv.Key, limit int) ([]kv.KeyRange, error) { logutil.BgLogger().Info("[ddl] split table range from PD", zap.Int64("physicalTableID", t.GetPhysicalID()), zap.String("start key", hex.EncodeToString(startKey)), @@ -374,7 +374,7 @@ func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey maxSleep := 10000 // ms bo := backoff.NewBackofferWithVars(context.Background(), maxSleep, nil) rc := copr.NewRegionCache(s.GetRegionCache()) - ranges, err := rc.SplitRegionRanges(bo, []kv.KeyRange{kvRange}) + ranges, err := rc.SplitRegionRanges(bo, []kv.KeyRange{kvRange}, limit) if err != nil { return nil, errors.Trace(err) } @@ -823,7 +823,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic } for { - kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startKey, endKey) + kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startKey, endKey, backfillTaskChanSize) if err != nil { return errors.Trace(err) } diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 9bc53d3aabc45..056957855e280 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -530,7 +530,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach rangesLen = 0 for i, ranges := range rangesForEachPhysicalTable { rangesLen += ranges.Len() - locations, err := cache.SplitKeyRangesByLocations(bo, ranges) + locations, err := cache.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit) if err != nil { return nil, errors.Trace(err) } diff --git a/store/copr/coprocessor_test.go b/store/copr/coprocessor_test.go index ee6f79ed3dc82..a291f947f074c 100644 --- a/store/copr/coprocessor_test.go +++ b/store/copr/coprocessor_test.go @@ -371,46 +371,51 @@ func TestSplitRegionRanges(t *testing.T) { bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil) - ranges, err := cache.SplitRegionRanges(bo, BuildKeyRanges("a", "c")) + ranges, err := cache.SplitRegionRanges(bo, BuildKeyRanges("a", "c"), UnspecifiedLimit) require.NoError(t, err) require.Len(t, ranges, 1) rangeEqual(t, ranges, "a", "c") - ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("h", "y")) + ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("h", "y"), UnspecifiedLimit) require.NoError(t, err) require.Len(t, ranges, 3) rangeEqual(t, ranges, "h", "n", "n", "t", "t", "y") - ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("s", "z")) + ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("s", "z"), UnspecifiedLimit) require.NoError(t, err) require.Len(t, ranges, 2) rangeEqual(t, ranges, "s", "t", "t", "z") - ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("s", "s")) + ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("s", "s"), UnspecifiedLimit) require.NoError(t, err) require.Len(t, ranges, 1) rangeEqual(t, ranges, "s", "s") - ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("t", "t")) + ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("t", "t"), UnspecifiedLimit) require.NoError(t, err) require.Len(t, ranges, 1) rangeEqual(t, ranges, "t", "t") - ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("t", "u")) + ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("t", "u"), UnspecifiedLimit) require.NoError(t, err) require.Len(t, ranges, 1) rangeEqual(t, ranges, "t", "u") - ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("u", "z")) + ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("u", "z"), UnspecifiedLimit) require.NoError(t, err) require.Len(t, ranges, 1) rangeEqual(t, ranges, "u", "z") // min --> max - ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("a", "z")) + ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("a", "z"), UnspecifiedLimit) require.NoError(t, err) require.Len(t, ranges, 4) rangeEqual(t, ranges, "a", "g", "g", "n", "n", "t", "t", "z") + + ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("a", "z"), 3) + require.NoError(t, err) + require.Len(t, ranges, 3) + rangeEqual(t, ranges, "a", "g", "g", "n", "n", "t") } func TestRebuild(t *testing.T) { diff --git a/store/copr/region_cache.go b/store/copr/region_cache.go index a3fd20e036d43..73d718cab423a 100644 --- a/store/copr/region_cache.go +++ b/store/copr/region_cache.go @@ -43,10 +43,10 @@ func NewRegionCache(rc *tikv.RegionCache) *RegionCache { } // SplitRegionRanges gets the split ranges from pd region. -func (c *RegionCache) SplitRegionRanges(bo *Backoffer, keyRanges []kv.KeyRange) ([]kv.KeyRange, error) { +func (c *RegionCache) SplitRegionRanges(bo *Backoffer, keyRanges []kv.KeyRange, limit int) ([]kv.KeyRange, error) { ranges := NewKeyRanges(keyRanges) - locations, err := c.SplitKeyRangesByLocations(bo, ranges) + locations, err := c.SplitKeyRangesByLocations(bo, ranges, limit) if err != nil { return nil, derr.ToTiDBErr(err) } @@ -123,10 +123,16 @@ func (l *LocationKeyRanges) splitKeyRangesByBuckets() []*LocationKeyRanges { return res } +// UnspecifiedLimit means no limit. +const UnspecifiedLimit = -1 + // SplitKeyRangesByLocations splits the KeyRanges by logical info in the cache. -func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges) ([]*LocationKeyRanges, error) { +func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges, limit int) ([]*LocationKeyRanges, error) { res := make([]*LocationKeyRanges, 0) for ranges.Len() > 0 { + if limit != UnspecifiedLimit && len(res) >= limit { + break + } loc, err := c.LocateKey(bo.TiKVBackoffer(), ranges.At(0).StartKey) if err != nil { return res, derr.ToTiDBErr(err) @@ -177,7 +183,7 @@ func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges // // TODO(youjiali1995): Try to do it in one round and reduce allocations if bucket is not enabled. func (c *RegionCache) SplitKeyRangesByBuckets(bo *Backoffer, ranges *KeyRanges) ([]*LocationKeyRanges, error) { - locs, err := c.SplitKeyRangesByLocations(bo, ranges) + locs, err := c.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit) if err != nil { return nil, derr.ToTiDBErr(err) }