Skip to content

Commit

Permalink
store/copr: add a param "limit" to region cache's SplitRegionRanges (
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Feb 7, 2023
1 parent 3eeca82 commit ce4601b
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 16 deletions.
6 changes: 3 additions & 3 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) {
// splitTableRanges uses PD region's key ranges to split the backfilling table key range space,
// to speed up backfilling data in table with disperse handle.
// The `t` should be a non-partitioned table or a partition.
func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey kv.Key) ([]kv.KeyRange, error) {
func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey kv.Key, limit int) ([]kv.KeyRange, error) {
logutil.BgLogger().Info("[ddl] split table range from PD",
zap.Int64("physicalTableID", t.GetPhysicalID()),
zap.String("start key", hex.EncodeToString(startKey)),
Expand All @@ -374,7 +374,7 @@ func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey
maxSleep := 10000 // ms
bo := backoff.NewBackofferWithVars(context.Background(), maxSleep, nil)
rc := copr.NewRegionCache(s.GetRegionCache())
ranges, err := rc.SplitRegionRanges(bo, []kv.KeyRange{kvRange})
ranges, err := rc.SplitRegionRanges(bo, []kv.KeyRange{kvRange}, limit)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -823,7 +823,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic
}

for {
kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startKey, endKey)
kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startKey, endKey, backfillTaskChanSize)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
rangesLen = 0
for i, ranges := range rangesForEachPhysicalTable {
rangesLen += ranges.Len()
locations, err := cache.SplitKeyRangesByLocations(bo, ranges)
locations, err := cache.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
21 changes: 13 additions & 8 deletions store/copr/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,46 +371,51 @@ func TestSplitRegionRanges(t *testing.T) {

bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil)

ranges, err := cache.SplitRegionRanges(bo, BuildKeyRanges("a", "c"))
ranges, err := cache.SplitRegionRanges(bo, BuildKeyRanges("a", "c"), UnspecifiedLimit)
require.NoError(t, err)
require.Len(t, ranges, 1)
rangeEqual(t, ranges, "a", "c")

ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("h", "y"))
ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("h", "y"), UnspecifiedLimit)
require.NoError(t, err)
require.Len(t, ranges, 3)
rangeEqual(t, ranges, "h", "n", "n", "t", "t", "y")

ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("s", "z"))
ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("s", "z"), UnspecifiedLimit)
require.NoError(t, err)
require.Len(t, ranges, 2)
rangeEqual(t, ranges, "s", "t", "t", "z")

ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("s", "s"))
ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("s", "s"), UnspecifiedLimit)
require.NoError(t, err)
require.Len(t, ranges, 1)
rangeEqual(t, ranges, "s", "s")

ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("t", "t"))
ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("t", "t"), UnspecifiedLimit)
require.NoError(t, err)
require.Len(t, ranges, 1)
rangeEqual(t, ranges, "t", "t")

ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("t", "u"))
ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("t", "u"), UnspecifiedLimit)
require.NoError(t, err)
require.Len(t, ranges, 1)
rangeEqual(t, ranges, "t", "u")

ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("u", "z"))
ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("u", "z"), UnspecifiedLimit)
require.NoError(t, err)
require.Len(t, ranges, 1)
rangeEqual(t, ranges, "u", "z")

// min --> max
ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("a", "z"))
ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("a", "z"), UnspecifiedLimit)
require.NoError(t, err)
require.Len(t, ranges, 4)
rangeEqual(t, ranges, "a", "g", "g", "n", "n", "t", "t", "z")

ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("a", "z"), 3)
require.NoError(t, err)
require.Len(t, ranges, 3)
rangeEqual(t, ranges, "a", "g", "g", "n", "n", "t")
}

func TestRebuild(t *testing.T) {
Expand Down
14 changes: 10 additions & 4 deletions store/copr/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ func NewRegionCache(rc *tikv.RegionCache) *RegionCache {
}

// SplitRegionRanges gets the split ranges from pd region.
func (c *RegionCache) SplitRegionRanges(bo *Backoffer, keyRanges []kv.KeyRange) ([]kv.KeyRange, error) {
func (c *RegionCache) SplitRegionRanges(bo *Backoffer, keyRanges []kv.KeyRange, limit int) ([]kv.KeyRange, error) {
ranges := NewKeyRanges(keyRanges)

locations, err := c.SplitKeyRangesByLocations(bo, ranges)
locations, err := c.SplitKeyRangesByLocations(bo, ranges, limit)
if err != nil {
return nil, derr.ToTiDBErr(err)
}
Expand Down Expand Up @@ -123,10 +123,16 @@ func (l *LocationKeyRanges) splitKeyRangesByBuckets() []*LocationKeyRanges {
return res
}

// UnspecifiedLimit means no limit.
const UnspecifiedLimit = -1

// SplitKeyRangesByLocations splits the KeyRanges by logical info in the cache.
func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges) ([]*LocationKeyRanges, error) {
func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges, limit int) ([]*LocationKeyRanges, error) {
res := make([]*LocationKeyRanges, 0)
for ranges.Len() > 0 {
if limit != UnspecifiedLimit && len(res) >= limit {
break
}
loc, err := c.LocateKey(bo.TiKVBackoffer(), ranges.At(0).StartKey)
if err != nil {
return res, derr.ToTiDBErr(err)
Expand Down Expand Up @@ -177,7 +183,7 @@ func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges
//
// TODO(youjiali1995): Try to do it in one round and reduce allocations if bucket is not enabled.
func (c *RegionCache) SplitKeyRangesByBuckets(bo *Backoffer, ranges *KeyRanges) ([]*LocationKeyRanges, error) {
locs, err := c.SplitKeyRangesByLocations(bo, ranges)
locs, err := c.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit)
if err != nil {
return nil, derr.ToTiDBErr(err)
}
Expand Down

0 comments on commit ce4601b

Please sign in to comment.