From 4d586f869cf43b1bd0b73654e51c033085164971 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 9 Oct 2023 18:07:19 +0800 Subject: [PATCH 1/4] --wip-- [skip ci] --- br/pkg/lightning/backend/external/engine.go | 9 ++- br/pkg/lightning/backend/local/engine.go | 15 +++- br/pkg/lightning/backend/local/local.go | 78 ++++++++++---------- br/pkg/lightning/backend/local/local_test.go | 2 + br/pkg/lightning/common/engine.go | 5 +- br/pkg/lightning/common/ingest_data.go | 6 ++ 6 files changed, 71 insertions(+), 44 deletions(-) diff --git a/br/pkg/lightning/backend/external/engine.go b/br/pkg/lightning/backend/external/engine.go index a9d9234614e86..ce0a6dd35c350 100644 --- a/br/pkg/lightning/backend/external/engine.go +++ b/br/pkg/lightning/backend/external/engine.go @@ -97,9 +97,12 @@ func NewExternalEngine( // LoadIngestData loads the data from the external storage to memory in [start, // end) range, so local backend can ingest it. The used byte slice of ingest data // are allocated from Engine.bufPool and must be released by -// MemoryIngestData.Finish(). For external.Engine, LoadIngestData must be called -// with strictly increasing start / end key. -func (e *Engine) LoadIngestData(ctx context.Context, start, end []byte) (common.IngestData, error) { +// MemoryIngestData.DecRef(). +func (e *Engine) LoadIngestData( + ctx context.Context, + regionRanges []common.Range, + outCh chan<- common.DataAndRange, +) error { if bytes.Equal(start, end) { return nil, errors.Errorf("start key and end key must not be the same: %s", hex.EncodeToString(start)) diff --git a/br/pkg/lightning/backend/local/engine.go b/br/pkg/lightning/backend/local/engine.go index e9cc49ce18c7f..88b6c560562bf 100644 --- a/br/pkg/lightning/backend/local/engine.go +++ b/br/pkg/lightning/backend/local/engine.go @@ -1046,8 +1046,19 @@ func (e *Engine) Finish(totalBytes, totalCount int64) { // LoadIngestData return (local) Engine itself because Engine has implemented // IngestData interface. -func (e *Engine) LoadIngestData(_ context.Context, _, _ []byte) (common.IngestData, error) { - return e, nil +func (e *Engine) LoadIngestData( + ctx context.Context, + regionRanges []common.Range, + outCh chan<- common.DataAndRange, +) error { + for _, r := range regionRanges { + select { + case <-ctx.Done(): + return ctx.Err() + case outCh <- common.DataAndRange{Data: e, Range: r}: + } + } + return nil } type sstMeta struct { diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index bf2e08e54d85a..56e035df5c6ad 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -1166,8 +1166,7 @@ func (local *Backend) generateAndSendJob( jobWg *sync.WaitGroup, ) error { logger := log.FromContext(ctx) - // TODO(lance6716): external engine should also support split into smaller ranges - // to improve concurrency. + // for external engine, it will split into smaller data inside LoadIngestData if localEngine, ok := engine.(*Engine); ok { // when use dynamic region feature, the region may be very big, we need // to split to smaller ranges to increase the concurrency. @@ -1186,50 +1185,55 @@ func (local *Backend) generateAndSendJob( logger.Debug("the ranges length write to tikv", zap.Int("length", len(jobRanges))) - ctx, cancel := context.WithCancel(ctx) - defer cancel() eg, egCtx := errgroup.WithContext(ctx) - eg.SetLimit(local.WorkerConcurrency) - for _, jobRange := range jobRanges { - r := jobRange - data, err := engine.LoadIngestData(ctx, r.Start, r.End) - if err != nil { - cancel() - err2 := eg.Wait() - if err2 != nil && !common.IsContextCanceledError(err2) { - logger.Warn("meet error when canceling", log.ShortError(err2)) - } - return errors.Trace(err) - } - eg.Go(func() error { - if egCtx.Err() != nil { - return nil - } - failpoint.Inject("beforeGenerateJob", nil) - jobs, err := local.generateJobForRange(egCtx, data, r, regionSplitSize, regionSplitKeys) - if err != nil { - if common.IsContextCanceledError(err) { - return nil - } - return err - } - for _, job := range jobs { - job.ref(jobWg) + dataAndRangeCh := make(chan common.DataAndRange) + for i := 0; i < local.WorkerConcurrency; i++ { + eg.Go(func() error { + for { select { case <-egCtx.Done(): - // this job is not put into jobToWorkerCh - job.done(jobWg) - // if the context is canceled, it means worker has error, the first error can be - // found by worker's error group LATER. if this function returns an error it will - // seize the "first error". return nil - case jobToWorkerCh <- job: + case p, ok := <-dataAndRangeCh: + if !ok { + return nil + } + + failpoint.Inject("beforeGenerateJob", nil) + jobs, err := local.generateJobForRange(egCtx, p.Data, p.Range, regionSplitSize, regionSplitKeys) + if err != nil { + if common.IsContextCanceledError(err) { + return nil + } + return err + } + for _, job := range jobs { + job.ref(jobWg) + select { + case <-egCtx.Done(): + // this job is not put into jobToWorkerCh + job.done(jobWg) + // if the context is canceled, it means worker has error, the first error can be + // found by worker's error group LATER. if this function returns an error it will + // seize the "first error". + return nil + case jobToWorkerCh <- job: + } + } } } - return nil }) } + + eg.Go(func() error { + err := engine.LoadIngestData(egCtx, jobRanges, dataAndRangeCh) + if err != nil { + return errors.Trace(err) + } + close(dataAndRangeCh) + return nil + }) + return eg.Wait() } diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index 3f4047909f963..f2b6530b72610 100644 --- a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -1746,6 +1746,8 @@ func TestSplitRangeAgain4BigRegion(t *testing.T) { jobWg.Done() } jobWg.Wait() + + // TODO(lance6716): test external engine } func getSuccessInjectedBehaviour() []injectedBehaviour { diff --git a/br/pkg/lightning/common/engine.go b/br/pkg/lightning/common/engine.go index a7d0ad06dca3f..136e4edd0aa86 100644 --- a/br/pkg/lightning/common/engine.go +++ b/br/pkg/lightning/common/engine.go @@ -31,8 +31,9 @@ type Range struct { type Engine interface { // ID is the identifier of an engine. ID() string - // LoadIngestData returns an IngestData that contains the data in [start, end). - LoadIngestData(ctx context.Context, start, end []byte) (IngestData, error) + // LoadIngestData sends DataAndRange to outCh. Implementation may choose smaller + // ranges than given regionRanges, and data is contained in its range. + LoadIngestData(ctx context.Context, regionRanges []Range, outCh chan<- DataAndRange) error // KVStatistics returns the total kv size and total kv count. KVStatistics() (totalKVSize int64, totalKVCount int64) // ImportedStatistics returns the imported kv size and imported kv count. diff --git a/br/pkg/lightning/common/ingest_data.go b/br/pkg/lightning/common/ingest_data.go index a1567cb20857c..33fc4f434b3c4 100644 --- a/br/pkg/lightning/common/ingest_data.go +++ b/br/pkg/lightning/common/ingest_data.go @@ -56,3 +56,9 @@ type ForwardIter interface { // Error return current error on this iter. Error() error } + +// DataAndRange is a pair of IngestData and Range. +type DataAndRange struct { + Data IngestData + Range Range +} From 7f7ac1cdb682e36b68e103c2bcc5721989fd0b0c Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 10 Oct 2023 13:28:20 +0800 Subject: [PATCH 2/4] finish Signed-off-by: lance6716 --- br/pkg/lightning/backend/backend.go | 13 +- br/pkg/lightning/backend/external/engine.go | 168 ++++++++++++++----- br/pkg/lightning/backend/external/split.go | 5 + br/pkg/lightning/backend/local/local.go | 1 + br/pkg/lightning/backend/local/local_test.go | 71 +++++++- disttask/importinto/planner.go | 1 + disttask/importinto/proto.go | 1 + disttask/importinto/scheduler.go | 17 +- 8 files changed, 216 insertions(+), 61 deletions(-) diff --git a/br/pkg/lightning/backend/backend.go b/br/pkg/lightning/backend/backend.go index d50731dac713a..1c6d77a291a9b 100644 --- a/br/pkg/lightning/backend/backend.go +++ b/br/pkg/lightning/backend/backend.go @@ -103,12 +103,13 @@ type LocalEngineConfig struct { // ExternalEngineConfig is the configuration used for local backend external engine. type ExternalEngineConfig struct { - StorageURI string - DataFiles []string - StatFiles []string - MinKey []byte - MaxKey []byte - SplitKeys [][]byte + StorageURI string + DataFiles []string + StatFiles []string + MinKey []byte + MaxKey []byte + SplitKeys [][]byte + RegionSplitSize int64 // TotalFileSize can be an estimated value. TotalFileSize int64 // TotalKVCount can be an estimated value. diff --git a/br/pkg/lightning/backend/external/engine.go b/br/pkg/lightning/backend/external/engine.go index ce0a6dd35c350..97466ed37f68e 100644 --- a/br/pkg/lightning/backend/external/engine.go +++ b/br/pkg/lightning/backend/external/engine.go @@ -18,12 +18,14 @@ import ( "bytes" "context" "encoding/hex" + "slices" "sort" "time" "github.com/cockroachdb/pebble" "github.com/pingcap/errors" "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/membuf" "github.com/pingcap/tidb/br/pkg/storage" @@ -31,19 +33,19 @@ import ( "github.com/pingcap/tidb/util/logutil" "go.uber.org/atomic" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) // Engine stored sorted key/value pairs in an external storage. type Engine struct { - storage storage.ExternalStorage - dataFiles []string - statsFiles []string - minKey []byte - maxKey []byte - splitKeys [][]byte - bufPool *membuf.Pool - - iter *MergeKVIter + storage storage.ExternalStorage + dataFiles []string + statsFiles []string + minKey []byte + maxKey []byte + splitKeys [][]byte + regionSplitSize int64 + bufPool *membuf.Pool keyAdapter common.KeyAdapter duplicateDetection bool @@ -66,6 +68,7 @@ func NewExternalEngine( minKey []byte, maxKey []byte, splitKeys [][]byte, + regionSplitSize int64, keyAdapter common.KeyAdapter, duplicateDetection bool, duplicateDB *pebble.DB, @@ -81,6 +84,7 @@ func NewExternalEngine( minKey: minKey, maxKey: maxKey, splitKeys: splitKeys, + regionSplitSize: regionSplitSize, bufPool: membuf.NewPool(), keyAdapter: keyAdapter, duplicateDetection: duplicateDetection, @@ -103,6 +107,78 @@ func (e *Engine) LoadIngestData( regionRanges []common.Range, outCh chan<- common.DataAndRange, ) error { + // estimate we will open at most 1000 files, so if e.dataFiles is small we can + // try to concurrently process ranges. + concurrency := int(MergeSortOverlapThreshold) / len(e.dataFiles) + numPerGroup := len(regionRanges) / concurrency + rangeGroups := make([][]common.Range, 0, concurrency) + + if numPerGroup > 0 { + for i := 0; i < concurrency-1; i++ { + rangeGroups = append(rangeGroups, regionRanges[i*numPerGroup:(i+1)*numPerGroup]) + } + } + rangeGroups = append(rangeGroups, regionRanges[(concurrency-1)*numPerGroup:]) + + eg, egCtx := errgroup.WithContext(ctx) + for _, ranges := range rangeGroups { + ranges := ranges + eg.Go(func() error { + iter, err := e.createMergeIter(egCtx, ranges[0].Start) + if err != nil { + return errors.Trace(err) + } + defer iter.Close() + + if !iter.Next() { + return iter.Error() + } + for _, r := range ranges { + results, err := e.loadIngestData(egCtx, iter, r.Start, r.End) + if err != nil { + return errors.Trace(err) + } + for _, result := range results { + select { + case <-egCtx.Done(): + return egCtx.Err() + case outCh <- result: + } + } + } + return nil + }) + } + return eg.Wait() +} + +func (e *Engine) buildIngestData(keys, values [][]byte, buf *membuf.Buffer) *MemoryIngestData { + return &MemoryIngestData{ + keyAdapter: e.keyAdapter, + duplicateDetection: e.duplicateDetection, + duplicateDB: e.duplicateDB, + dupDetectOpt: e.dupDetectOpt, + keys: keys, + values: values, + ts: e.ts, + memBuf: buf, + refCnt: atomic.NewInt64(0), + importedKVSize: e.importedKVSize, + importedKVCount: e.importedKVCount, + } +} + +// LargeRegionSplitDataThreshold is exposed for test. +var LargeRegionSplitDataThreshold = int(config.SplitRegionSize) + +// loadIngestData loads the data from the external storage to memory in [start, +// end) range, and if the range is large enough, it will return multiple data. +// The input `iter` should be called Next() before calling this function. +func (e *Engine) loadIngestData( + ctx context.Context, + iter *MergeKVIter, + start, end []byte, +) ([]common.DataAndRange, error) { if bytes.Equal(start, end) { return nil, errors.Errorf("start key and end key must not be the same: %s", hex.EncodeToString(start)) @@ -112,54 +188,59 @@ func (e *Engine) LoadIngestData( keys := make([][]byte, 0, 1024) values := make([][]byte, 0, 1024) memBuf := e.bufPool.NewBuffer() - - if e.iter == nil { - iter, err := e.createMergeIter(ctx, start) - if err != nil { - return nil, errors.Trace(err) - } - e.iter = iter - } else { - // there should be a key that just exceeds the end key in last LoadIngestData - // invocation. - k, v := e.iter.Key(), e.iter.Value() + cnt := 0 + size := 0 + largeRegion := e.regionSplitSize > 2*int64(config.SplitRegionSize) + ret := make([]common.DataAndRange, 0, 1) + curStart := start + + // there should be a key that just exceeds the end key in last loadIngestData + // invocation. + k, v := iter.Key(), iter.Value() + if len(k) > 0 { keys = append(keys, memBuf.AddBytes(k)) values = append(values, memBuf.AddBytes(v)) + cnt++ + size += len(k) + len(v) } - cnt := 0 - for e.iter.Next() { - cnt++ - k, v := e.iter.Key(), e.iter.Value() + for iter.Next() { + k, v = iter.Key(), iter.Value() if bytes.Compare(k, start) < 0 { continue } if bytes.Compare(k, end) >= 0 { break } + if largeRegion && size > LargeRegionSplitDataThreshold { + curKey := slices.Clone(k) + ret = append(ret, common.DataAndRange{ + Data: e.buildIngestData(keys, values, memBuf), + Range: common.Range{Start: curStart, End: curKey}, + }) + keys = make([][]byte, 0, 1024) + values = make([][]byte, 0, 1024) + size = 0 + curStart = curKey + } + keys = append(keys, memBuf.AddBytes(k)) values = append(values, memBuf.AddBytes(v)) + cnt++ + size += len(k) + len(v) } - if e.iter.Error() != nil { - return nil, errors.Trace(e.iter.Error()) + if iter.Error() != nil { + return nil, errors.Trace(iter.Error()) } logutil.Logger(ctx).Info("load data from external storage", zap.Duration("cost time", time.Since(now)), zap.Int("iterated count", cnt)) - return &MemoryIngestData{ - keyAdapter: e.keyAdapter, - duplicateDetection: e.duplicateDetection, - duplicateDB: e.duplicateDB, - dupDetectOpt: e.dupDetectOpt, - keys: keys, - values: values, - ts: e.ts, - memBuf: memBuf, - refCnt: atomic.NewInt64(0), - importedKVSize: e.importedKVSize, - importedKVCount: e.importedKVCount, - }, nil + ret = append(ret, common.DataAndRange{ + Data: e.buildIngestData(keys, values, memBuf), + Range: common.Range{Start: curStart, End: end}, + }) + return ret, nil } func (e *Engine) createMergeIter(ctx context.Context, start kv.Key) (*MergeKVIter, error) { @@ -230,13 +311,8 @@ func (e *Engine) SplitRanges( return ranges, nil } -// Close releases the resources of the engine. -func (e *Engine) Close() error { - if e.iter == nil { - return nil - } - return errors.Trace(e.iter.Close()) -} +// Close implements common.Engine. +func (e *Engine) Close() error { return nil } // MemoryIngestData is the in-memory implementation of IngestData. type MemoryIngestData struct { diff --git a/br/pkg/lightning/backend/external/split.go b/br/pkg/lightning/backend/external/split.go index f04fc7bdc662d..40d713ed32259 100644 --- a/br/pkg/lightning/backend/external/split.go +++ b/br/pkg/lightning/backend/external/split.go @@ -121,6 +121,11 @@ func (r *RangeSplitter) Close() error { return r.propIter.Close() } +// GetRangeSplitSize returns the expected size of one range. +func (r *RangeSplitter) GetRangeSplitSize() int64 { + return r.rangeSize +} + // SplitOneRangesGroup splits one group of ranges. `endKeyOfGroup` represents the // end key of the group, but it will be nil when the group is the last one. // `dataFiles` and `statFiles` are all the files that have overlapping key ranges diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 56e035df5c6ad..3114956fa2d50 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -964,6 +964,7 @@ func (local *Backend) CloseEngine(ctx context.Context, cfg *backend.EngineConfig externalCfg.MinKey, externalCfg.MaxKey, externalCfg.SplitKeys, + externalCfg.RegionSplitSize, local.keyAdapter, local.DupeDetectEnabled, local.duplicateDB, diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index f2b6530b72610..82b7740cc06f6 100644 --- a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -1746,8 +1746,77 @@ func TestSplitRangeAgain4BigRegion(t *testing.T) { jobWg.Done() } jobWg.Wait() +} + +func TestSplitRangeAgain4BigRegionExternalEngine(t *testing.T) { + backup := external.LargeRegionSplitDataThreshold + external.LargeRegionSplitDataThreshold = 1 + t.Cleanup(func() { + external.LargeRegionSplitDataThreshold = backup + }) - // TODO(lance6716): test external engine + ctx := context.Background() + local := &Backend{ + splitCli: initTestSplitClient( + [][]byte{{1}, {11}}, // we have one big region + panicSplitRegionClient{}, // make sure no further split region + ), + } + local.BackendConfig.WorkerConcurrency = 1 + bigRegionRange := []common.Range{{Start: []byte{1}, End: []byte{11}}} + + keys := make([][]byte, 0, 10) + value := make([][]byte, 0, 10) + for i := byte(1); i <= 10; i++ { + keys = append(keys, []byte{i}) + value = append(value, []byte{i}) + } + memStore := storage.NewMemStorage() + + dataFiles, statFiles, err := external.MockExternalEngine(memStore, keys, value) + require.NoError(t, err) + + extEngine := external.NewExternalEngine( + memStore, + dataFiles, + statFiles, + []byte{1}, + []byte{10}, + [][]byte{{1}, {11}}, + 1<<30, + common.NoopKeyAdapter{}, + false, + nil, + common.DupDetectOpt{}, + 123, + 456, + 789, + ) + + jobCh := make(chan *regionJob, 10) + jobWg := sync.WaitGroup{} + err = local.generateAndSendJob( + ctx, + extEngine, + bigRegionRange, + 10*units.GB, + 1<<30, + jobCh, + &jobWg, + ) + require.NoError(t, err) + require.Len(t, jobCh, 10) + for i := 0; i < 10; i++ { + job := <-jobCh + require.Equal(t, []byte{byte(i + 1)}, job.keyRange.Start) + require.Equal(t, []byte{byte(i + 2)}, job.keyRange.End) + firstKey, lastKey, err := job.ingestData.GetFirstAndLastKey(nil, nil) + require.NoError(t, err) + require.Equal(t, []byte{byte(i + 1)}, firstKey) + require.Equal(t, []byte{byte(i + 1)}, lastKey) + jobWg.Done() + } + jobWg.Wait() } func getSuccessInjectedBehaviour() []injectedBehaviour { diff --git a/disttask/importinto/planner.go b/disttask/importinto/planner.go index 6713070d165ef..b88fbd27a7883 100644 --- a/disttask/importinto/planner.go +++ b/disttask/importinto/planner.go @@ -399,6 +399,7 @@ func generateWriteIngestSpecs(planCtx planner.PlanCtx, p *LogicalPlan) ([]planne TotalKVSize: uint64(config.DefaultBatchSize), }, RangeSplitKeys: rangeSplitKeys, + RangeSplitSize: splitter.GetRangeSplitSize(), } specs = append(specs, &WriteIngestSpec{m}) diff --git a/disttask/importinto/proto.go b/disttask/importinto/proto.go index 547f30158570a..34a87e5879ca7 100644 --- a/disttask/importinto/proto.go +++ b/disttask/importinto/proto.go @@ -109,6 +109,7 @@ type WriteIngestStepMeta struct { KVGroup string `json:"kv-group"` external.SortedKVMeta `json:"sorted-kv-meta"` RangeSplitKeys [][]byte `json:"range-split-keys"` + RangeSplitSize int64 `json:"range-split-size"` Result Result } diff --git a/disttask/importinto/scheduler.go b/disttask/importinto/scheduler.go index ddbdb768eb6b5..a629d66baafe1 100644 --- a/disttask/importinto/scheduler.go +++ b/disttask/importinto/scheduler.go @@ -373,14 +373,15 @@ func (e *writeAndIngestStepExecutor) RunSubtask(ctx context.Context, subtask *pr localBackend := e.tableImporter.Backend() err = localBackend.CloseEngine(ctx, &backend.EngineConfig{ External: &backend.ExternalEngineConfig{ - StorageURI: e.taskMeta.Plan.CloudStorageURI, - DataFiles: sm.DataFiles, - StatFiles: sm.StatFiles, - MinKey: sm.MinKey, - MaxKey: sm.MaxKey, - SplitKeys: sm.RangeSplitKeys, - TotalFileSize: int64(sm.TotalKVSize), - TotalKVCount: 0, + StorageURI: e.taskMeta.Plan.CloudStorageURI, + DataFiles: sm.DataFiles, + StatFiles: sm.StatFiles, + MinKey: sm.MinKey, + MaxKey: sm.MaxKey, + SplitKeys: sm.RangeSplitKeys, + RegionSplitSize: sm.RangeSplitSize, + TotalFileSize: int64(sm.TotalKVSize), + TotalKVCount: 0, }, }, engineUUID) if err != nil { From 5dd447504c23eca7308caf987c01d9e309bbf333 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 10 Oct 2023 13:30:16 +0800 Subject: [PATCH 3/4] fix bazel Signed-off-by: lance6716 --- br/pkg/lightning/backend/external/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/br/pkg/lightning/backend/external/BUILD.bazel b/br/pkg/lightning/backend/external/BUILD.bazel index 28a4a7d917019..91635f733d3c1 100644 --- a/br/pkg/lightning/backend/external/BUILD.bazel +++ b/br/pkg/lightning/backend/external/BUILD.bazel @@ -23,6 +23,7 @@ go_library( "//br/pkg/lightning/backend/encode", "//br/pkg/lightning/backend/kv", "//br/pkg/lightning/common", + "//br/pkg/lightning/config", "//br/pkg/lightning/log", "//br/pkg/membuf", "//br/pkg/storage", From e2cee99630321fc71f4eb1ff9d9525ad14cd0c9e Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 10 Oct 2023 15:55:47 +0800 Subject: [PATCH 4/4] address comment Signed-off-by: lance6716 --- br/pkg/lightning/backend/external/BUILD.bazel | 2 +- br/pkg/lightning/backend/external/engine.go | 31 +++++++++---- .../lightning/backend/external/engine_test.go | 44 +++++++++++++++++++ 3 files changed, 67 insertions(+), 10 deletions(-) diff --git a/br/pkg/lightning/backend/external/BUILD.bazel b/br/pkg/lightning/backend/external/BUILD.bazel index 91635f733d3c1..3000bded8943c 100644 --- a/br/pkg/lightning/backend/external/BUILD.bazel +++ b/br/pkg/lightning/backend/external/BUILD.bazel @@ -58,7 +58,7 @@ go_test( ], embed = [":external"], flaky = True, - shard_count = 39, + shard_count = 40, deps = [ "//br/pkg/lightning/backend/kv", "//br/pkg/lightning/common", diff --git a/br/pkg/lightning/backend/external/engine.go b/br/pkg/lightning/backend/external/engine.go index 97466ed37f68e..0e00c94887035 100644 --- a/br/pkg/lightning/backend/external/engine.go +++ b/br/pkg/lightning/backend/external/engine.go @@ -98,6 +98,26 @@ func NewExternalEngine( } } +func split[T any](in []T, groupNum int) [][]T { + if len(in) == 0 { + return nil + } + if groupNum <= 0 { + groupNum = 1 + } + ceil := (len(in) + groupNum - 1) / groupNum + ret := make([][]T, 0, groupNum) + l := len(in) + for i := 0; i < l; i += ceil { + if i+ceil > l { + ret = append(ret, in[i:]) + } else { + ret = append(ret, in[i:i+ceil]) + } + } + return ret +} + // LoadIngestData loads the data from the external storage to memory in [start, // end) range, so local backend can ingest it. The used byte slice of ingest data // are allocated from Engine.bufPool and must be released by @@ -110,15 +130,8 @@ func (e *Engine) LoadIngestData( // estimate we will open at most 1000 files, so if e.dataFiles is small we can // try to concurrently process ranges. concurrency := int(MergeSortOverlapThreshold) / len(e.dataFiles) - numPerGroup := len(regionRanges) / concurrency - rangeGroups := make([][]common.Range, 0, concurrency) - - if numPerGroup > 0 { - for i := 0; i < concurrency-1; i++ { - rangeGroups = append(rangeGroups, regionRanges[i*numPerGroup:(i+1)*numPerGroup]) - } - } - rangeGroups = append(rangeGroups, regionRanges[(concurrency-1)*numPerGroup:]) + concurrency = min(concurrency, 8) + rangeGroups := split(regionRanges, concurrency) eg, egCtx := errgroup.WithContext(ctx) for _, ranges := range rangeGroups { diff --git a/br/pkg/lightning/backend/external/engine_test.go b/br/pkg/lightning/backend/external/engine_test.go index ffaad89dd096c..6e5a95fdcf474 100644 --- a/br/pkg/lightning/backend/external/engine_test.go +++ b/br/pkg/lightning/backend/external/engine_test.go @@ -277,3 +277,47 @@ func TestMemoryIngestData(t *testing.T) { testNewIter(t, data, []byte("key6"), []byte("key9"), nil, nil) checkDupDB(t, db, nil, nil) } + +func TestSplit(t *testing.T) { + cases := []struct { + input []int + conc int + expected [][]int + }{ + { + input: []int{1, 2, 3, 4, 5}, + conc: 1, + expected: [][]int{{1, 2, 3, 4, 5}}, + }, + { + input: []int{1, 2, 3, 4, 5}, + conc: 2, + expected: [][]int{{1, 2, 3}, {4, 5}}, + }, + { + input: []int{1, 2, 3, 4, 5}, + conc: 0, + expected: [][]int{{1, 2, 3, 4, 5}}, + }, + { + input: []int{1, 2, 3, 4, 5}, + conc: 5, + expected: [][]int{{1}, {2}, {3}, {4}, {5}}, + }, + { + input: []int{}, + conc: 5, + expected: nil, + }, + { + input: []int{1, 2, 3, 4, 5}, + conc: 100, + expected: [][]int{{1}, {2}, {3}, {4}, {5}}, + }, + } + + for _, c := range cases { + got := split(c.input, c.conc) + require.Equal(t, c.expected, got) + } +}