From f842cd9ffbb29b424e7743cde34d43735205dc85 Mon Sep 17 00:00:00 2001
From: tangenta
Date: Sun, 29 Jan 2023 16:05:54 +0800
Subject: [PATCH] store/copr: add a param "limit" to region cache's
 `SplitRegionRanges` (#40411)

close pingcap/tidb#38436
---
 ddl/backfilling.go              |  8 ++++----
 store/copr/batch_coprocessor.go |  4 ++--
 store/copr/coprocessor_test.go  | 21 +++++++++++++--------
 store/copr/region_cache.go      | 14 ++++++++++----
 4 files changed, 29 insertions(+), 18 deletions(-)

diff --git a/ddl/backfilling.go b/ddl/backfilling.go
index aae3a9b75790e..5036e56c096d8 100644
--- a/ddl/backfilling.go
+++ b/ddl/backfilling.go
@@ -489,7 +489,7 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) {
 // splitTableRanges uses PD region's key ranges to split the backfilling table key range space,
 // to speed up backfilling data in table with disperse handle.
 // The `t` should be a non-partitioned table or a partition.
-func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey kv.Key) ([]kv.KeyRange, error) {
+func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey kv.Key, limit int) ([]kv.KeyRange, error) {
 	logutil.BgLogger().Info("[ddl] split table range from PD",
 		zap.Int64("physicalTableID", t.GetPhysicalID()),
 		zap.String("start key", hex.EncodeToString(startKey)),
@@ -504,7 +504,7 @@
 	maxSleep := 10000 // ms
 	bo := backoff.NewBackofferWithVars(context.Background(), maxSleep, nil)
 	rc := copr.NewRegionCache(s.GetRegionCache())
-	ranges, err := rc.SplitRegionRanges(bo, []kv.KeyRange{kvRange})
+	ranges, err := rc.SplitRegionRanges(bo, []kv.KeyRange{kvRange}, limit)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -981,7 +981,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic
 	}
 
 	for {
-		kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startKey, endKey)
+		kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startKey, endKey, backfillTaskChanSize)
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -1093,7 +1093,7 @@ func (*ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, pTb
 	isFirstOps := true
 	bJobs := make([]*BackfillJob, 0, genTaskBatch)
 	for {
-		kvRanges, err := splitTableRanges(pTbl, reorgInfo.d.store, startKey, endKey)
+		kvRanges, err := splitTableRanges(pTbl, reorgInfo.d.store, startKey, endKey, genTaskBatch)
 		if err != nil {
 			return errors.Trace(err)
 		}
diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go
index 9f9d4ef6fb002..065fc3621dabd 100644
--- a/store/copr/batch_coprocessor.go
+++ b/store/copr/batch_coprocessor.go
@@ -567,7 +567,7 @@ func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer,
 
 	for i, ranges := range rangesForEachPhysicalTable {
 		rangesLen += ranges.Len()
-		locations, err := cache.SplitKeyRangesByLocations(bo, ranges)
+		locations, err := cache.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit)
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
@@ -677,7 +677,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
 		rangesLen = 0
 		for i, ranges := range rangesForEachPhysicalTable {
 			rangesLen += ranges.Len()
-			locations, err := cache.SplitKeyRangesByLocations(bo, ranges)
+			locations, err := cache.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit)
 			if err != nil {
 				return nil, errors.Trace(err)
 			}
diff --git a/store/copr/coprocessor_test.go b/store/copr/coprocessor_test.go
index 36ae88758bbc5..f7b15ebfd682d 100644
--- a/store/copr/coprocessor_test.go
+++ b/store/copr/coprocessor_test.go
@@ -381,46 +381,51 @@ func TestSplitRegionRanges(t *testing.T) {
 
 	bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil)
 
-	ranges, err := cache.SplitRegionRanges(bo, BuildKeyRanges("a", "c"))
+	ranges, err := cache.SplitRegionRanges(bo, BuildKeyRanges("a", "c"), UnspecifiedLimit)
 	require.NoError(t, err)
 	require.Len(t, ranges, 1)
 	rangeEqual(t, ranges, "a", "c")
 
-	ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("h", "y"))
+	ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("h", "y"), UnspecifiedLimit)
 	require.NoError(t, err)
 	require.Len(t, ranges, 3)
 	rangeEqual(t, ranges, "h", "n", "n", "t", "t", "y")
 
-	ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("s", "z"))
+	ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("s", "z"), UnspecifiedLimit)
 	require.NoError(t, err)
 	require.Len(t, ranges, 2)
 	rangeEqual(t, ranges, "s", "t", "t", "z")
 
-	ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("s", "s"))
+	ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("s", "s"), UnspecifiedLimit)
 	require.NoError(t, err)
 	require.Len(t, ranges, 1)
 	rangeEqual(t, ranges, "s", "s")
 
-	ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("t", "t"))
+	ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("t", "t"), UnspecifiedLimit)
 	require.NoError(t, err)
 	require.Len(t, ranges, 1)
 	rangeEqual(t, ranges, "t", "t")
 
-	ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("t", "u"))
+	ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("t", "u"), UnspecifiedLimit)
 	require.NoError(t, err)
 	require.Len(t, ranges, 1)
 	rangeEqual(t, ranges, "t", "u")
 
-	ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("u", "z"))
+	ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("u", "z"), UnspecifiedLimit)
 	require.NoError(t, err)
 	require.Len(t, ranges, 1)
 	rangeEqual(t, ranges, "u", "z")
 
 	// min --> max
-	ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("a", "z"))
+	ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("a", "z"), UnspecifiedLimit)
 	require.NoError(t, err)
 	require.Len(t, ranges, 4)
 	rangeEqual(t, ranges, "a", "g", "g", "n", "n", "t", "t", "z")
+
+	ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("a", "z"), 3)
+	require.NoError(t, err)
+	require.Len(t, ranges, 3)
+	rangeEqual(t, ranges, "a", "g", "g", "n", "n", "t")
 }
 
 func TestRebuild(t *testing.T) {
diff --git a/store/copr/region_cache.go b/store/copr/region_cache.go
index 97c3d705c223b..aa33656c39cca 100644
--- a/store/copr/region_cache.go
+++ b/store/copr/region_cache.go
@@ -42,10 +42,10 @@ func NewRegionCache(rc *tikv.RegionCache) *RegionCache {
 }
 
 // SplitRegionRanges gets the split ranges from pd region.
-func (c *RegionCache) SplitRegionRanges(bo *Backoffer, keyRanges []kv.KeyRange) ([]kv.KeyRange, error) {
+func (c *RegionCache) SplitRegionRanges(bo *Backoffer, keyRanges []kv.KeyRange, limit int) ([]kv.KeyRange, error) {
 	ranges := NewKeyRanges(keyRanges)
 
-	locations, err := c.SplitKeyRangesByLocations(bo, ranges)
+	locations, err := c.SplitKeyRangesByLocations(bo, ranges, limit)
 	if err != nil {
 		return nil, derr.ToTiDBErr(err)
 	}
@@ -122,10 +122,16 @@ func (l *LocationKeyRanges) splitKeyRangesByBuckets() []*LocationKeyRanges {
 	return res
 }
+// UnspecifiedLimit means no limit.
+const UnspecifiedLimit = -1
+
 // SplitKeyRangesByLocations splits the KeyRanges by logical info in the cache.
-func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges) ([]*LocationKeyRanges, error) {
+func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges, limit int) ([]*LocationKeyRanges, error) {
 	res := make([]*LocationKeyRanges, 0)
 	for ranges.Len() > 0 {
+		if limit != UnspecifiedLimit && len(res) >= limit {
+			break
+		}
 		loc, err := c.LocateKey(bo.TiKVBackoffer(), ranges.At(0).StartKey)
 		if err != nil {
 			return res, derr.ToTiDBErr(err)
 		}
@@ -176,7 +182,7 @@
 //
 // TODO(youjiali1995): Try to do it in one round and reduce allocations if bucket is not enabled.
 func (c *RegionCache) SplitKeyRangesByBuckets(bo *Backoffer, ranges *KeyRanges) ([]*LocationKeyRanges, error) {
-	locs, err := c.SplitKeyRangesByLocations(bo, ranges)
+	locs, err := c.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit)
 	if err != nil {
 		return nil, derr.ToTiDBErr(err)
 	}
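
The sketch below is not part of the patch; it is a minimal illustration of the new `limit` argument, written as if it lived inside the `copr` package next to the test above and reusing the `cache`, `bo`, and `BuildKeyRanges` helpers that `TestSplitRegionRanges` already sets up. The expected range counts come from the test assertions in this diff; the helper name `sketchLimitUsage` is hypothetical.

// sketchLimitUsage is a hypothetical, package-internal example (not part of
// the patch). It assumes the same region layout as TestSplitRegionRanges:
// region boundaries at "g", "n" and "t".
func sketchLimitUsage(cache *RegionCache, bo *Backoffer) error {
	// UnspecifiedLimit (-1) keeps the old behaviour: the input range is split
	// on every region boundary it crosses, yielding a-g, g-n, n-t, t-z.
	all, err := cache.SplitRegionRanges(bo, BuildKeyRanges("a", "z"), UnspecifiedLimit)
	if err != nil {
		return err
	}

	// A positive limit caps the result: SplitKeyRangesByLocations stops once
	// it has produced `limit` location groups, so only a-g, g-n and n-t come
	// back and the remaining keys are left for a later call. The callers in
	// ddl/backfilling.go pass backfillTaskChanSize or genTaskBatch here and
	// invoke splitTableRanges again on the next iteration of their for loops.
	firstThree, err := cache.SplitRegionRanges(bo, BuildKeyRanges("a", "z"), 3)
	if err != nil {
		return err
	}

	_, _ = all, firstThree
	return nil
}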