-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
external engine: support concurrent read and split into smaller data #47510
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,32 +18,34 @@ import ( | |
"bytes" | ||
"context" | ||
"encoding/hex" | ||
"slices" | ||
"sort" | ||
"time" | ||
|
||
"github.com/cockroachdb/pebble" | ||
"github.com/pingcap/errors" | ||
"github.com/pingcap/tidb/br/pkg/lightning/common" | ||
"github.com/pingcap/tidb/br/pkg/lightning/config" | ||
"github.com/pingcap/tidb/br/pkg/lightning/log" | ||
"github.com/pingcap/tidb/br/pkg/membuf" | ||
"github.com/pingcap/tidb/br/pkg/storage" | ||
"github.com/pingcap/tidb/kv" | ||
"github.com/pingcap/tidb/util/logutil" | ||
"go.uber.org/atomic" | ||
"go.uber.org/zap" | ||
"golang.org/x/sync/errgroup" | ||
) | ||
|
||
// Engine stored sorted key/value pairs in an external storage. | ||
type Engine struct { | ||
storage storage.ExternalStorage | ||
dataFiles []string | ||
statsFiles []string | ||
minKey []byte | ||
maxKey []byte | ||
splitKeys [][]byte | ||
bufPool *membuf.Pool | ||
|
||
iter *MergeKVIter | ||
storage storage.ExternalStorage | ||
dataFiles []string | ||
statsFiles []string | ||
minKey []byte | ||
maxKey []byte | ||
splitKeys [][]byte | ||
regionSplitSize int64 | ||
bufPool *membuf.Pool | ||
|
||
keyAdapter common.KeyAdapter | ||
duplicateDetection bool | ||
|
@@ -66,6 +68,7 @@ func NewExternalEngine( | |
minKey []byte, | ||
maxKey []byte, | ||
splitKeys [][]byte, | ||
regionSplitSize int64, | ||
keyAdapter common.KeyAdapter, | ||
duplicateDetection bool, | ||
duplicateDB *pebble.DB, | ||
|
@@ -81,6 +84,7 @@ func NewExternalEngine( | |
minKey: minKey, | ||
maxKey: maxKey, | ||
splitKeys: splitKeys, | ||
regionSplitSize: regionSplitSize, | ||
bufPool: membuf.NewPool(), | ||
keyAdapter: keyAdapter, | ||
duplicateDetection: duplicateDetection, | ||
|
@@ -97,9 +101,84 @@ func NewExternalEngine( | |
// LoadIngestData loads the data from the external storage to memory in [start, | ||
// end) range, so local backend can ingest it. The used byte slice of ingest data | ||
// are allocated from Engine.bufPool and must be released by | ||
// MemoryIngestData.Finish(). For external.Engine, LoadIngestData must be called | ||
// with strictly increasing start / end key. | ||
func (e *Engine) LoadIngestData(ctx context.Context, start, end []byte) (common.IngestData, error) { | ||
// MemoryIngestData.DecRef(). | ||
func (e *Engine) LoadIngestData( | ||
ctx context.Context, | ||
regionRanges []common.Range, | ||
outCh chan<- common.DataAndRange, | ||
) error { | ||
// estimate we will open at most 1000 files, so if e.dataFiles is small we can | ||
// try to concurrently process ranges. | ||
concurrency := int(MergeSortOverlapThreshold) / len(e.dataFiles) | ||
numPerGroup := len(regionRanges) / concurrency | ||
rangeGroups := make([][]common.Range, 0, concurrency) | ||
|
||
if numPerGroup > 0 { | ||
for i := 0; i < concurrency-1; i++ { | ||
rangeGroups = append(rangeGroups, regionRanges[i*numPerGroup:(i+1)*numPerGroup]) | ||
} | ||
} | ||
rangeGroups = append(rangeGroups, regionRanges[(concurrency-1)*numPerGroup:]) | ||
|
||
eg, egCtx := errgroup.WithContext(ctx) | ||
for _, ranges := range rangeGroups { | ||
ranges := ranges | ||
eg.Go(func() error { | ||
iter, err := e.createMergeIter(egCtx, ranges[0].Start) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
defer iter.Close() | ||
|
||
if !iter.Next() { | ||
return iter.Error() | ||
} | ||
for _, r := range ranges { | ||
results, err := e.loadIngestData(egCtx, iter, r.Start, r.End) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
for _, result := range results { | ||
select { | ||
case <-egCtx.Done(): | ||
return egCtx.Err() | ||
case outCh <- result: | ||
} | ||
} | ||
} | ||
return nil | ||
}) | ||
} | ||
return eg.Wait() | ||
} | ||
|
||
func (e *Engine) buildIngestData(keys, values [][]byte, buf *membuf.Buffer) *MemoryIngestData { | ||
return &MemoryIngestData{ | ||
keyAdapter: e.keyAdapter, | ||
duplicateDetection: e.duplicateDetection, | ||
duplicateDB: e.duplicateDB, | ||
dupDetectOpt: e.dupDetectOpt, | ||
keys: keys, | ||
values: values, | ||
ts: e.ts, | ||
memBuf: buf, | ||
refCnt: atomic.NewInt64(0), | ||
importedKVSize: e.importedKVSize, | ||
importedKVCount: e.importedKVCount, | ||
} | ||
} | ||
|
||
// LargeRegionSplitDataThreshold is exposed for test. | ||
var LargeRegionSplitDataThreshold = int(config.SplitRegionSize) | ||
|
||
// loadIngestData loads the data from the external storage to memory in [start, | ||
// end) range, and if the range is large enough, it will return multiple data. | ||
// The input `iter` should be called Next() before calling this function. | ||
func (e *Engine) loadIngestData( | ||
ctx context.Context, | ||
iter *MergeKVIter, | ||
start, end []byte, | ||
) ([]common.DataAndRange, error) { | ||
if bytes.Equal(start, end) { | ||
return nil, errors.Errorf("start key and end key must not be the same: %s", | ||
hex.EncodeToString(start)) | ||
|
@@ -109,54 +188,59 @@ func (e *Engine) LoadIngestData(ctx context.Context, start, end []byte) (common. | |
keys := make([][]byte, 0, 1024) | ||
values := make([][]byte, 0, 1024) | ||
memBuf := e.bufPool.NewBuffer() | ||
|
||
if e.iter == nil { | ||
iter, err := e.createMergeIter(ctx, start) | ||
if err != nil { | ||
return nil, errors.Trace(err) | ||
} | ||
e.iter = iter | ||
} else { | ||
// there should be a key that just exceeds the end key in last LoadIngestData | ||
// invocation. | ||
k, v := e.iter.Key(), e.iter.Value() | ||
cnt := 0 | ||
size := 0 | ||
largeRegion := e.regionSplitSize > 2*int64(config.SplitRegionSize) | ||
ret := make([]common.DataAndRange, 0, 1) | ||
curStart := start | ||
|
||
// there should be a key that just exceeds the end key in last loadIngestData | ||
// invocation. | ||
k, v := iter.Key(), iter.Value() | ||
if len(k) > 0 { | ||
keys = append(keys, memBuf.AddBytes(k)) | ||
values = append(values, memBuf.AddBytes(v)) | ||
cnt++ | ||
size += len(k) + len(v) | ||
} | ||
|
||
cnt := 0 | ||
for e.iter.Next() { | ||
cnt++ | ||
k, v := e.iter.Key(), e.iter.Value() | ||
for iter.Next() { | ||
k, v = iter.Key(), iter.Value() | ||
if bytes.Compare(k, start) < 0 { | ||
continue | ||
} | ||
if bytes.Compare(k, end) >= 0 { | ||
break | ||
} | ||
if largeRegion && size > LargeRegionSplitDataThreshold { | ||
curKey := slices.Clone(k) | ||
ret = append(ret, common.DataAndRange{ | ||
Data: e.buildIngestData(keys, values, memBuf), | ||
Range: common.Range{Start: curStart, End: curKey}, | ||
}) | ||
keys = make([][]byte, 0, 1024) | ||
values = make([][]byte, 0, 1024) | ||
size = 0 | ||
curStart = curKey | ||
} | ||
|
||
keys = append(keys, memBuf.AddBytes(k)) | ||
values = append(values, memBuf.AddBytes(v)) | ||
cnt++ | ||
size += len(k) + len(v) | ||
} | ||
if e.iter.Error() != nil { | ||
return nil, errors.Trace(e.iter.Error()) | ||
if iter.Error() != nil { | ||
return nil, errors.Trace(iter.Error()) | ||
} | ||
|
||
logutil.Logger(ctx).Info("load data from external storage", | ||
zap.Duration("cost time", time.Since(now)), | ||
zap.Int("iterated count", cnt)) | ||
return &MemoryIngestData{ | ||
keyAdapter: e.keyAdapter, | ||
duplicateDetection: e.duplicateDetection, | ||
duplicateDB: e.duplicateDB, | ||
dupDetectOpt: e.dupDetectOpt, | ||
keys: keys, | ||
values: values, | ||
ts: e.ts, | ||
memBuf: memBuf, | ||
refCnt: atomic.NewInt64(0), | ||
importedKVSize: e.importedKVSize, | ||
importedKVCount: e.importedKVCount, | ||
}, nil | ||
ret = append(ret, common.DataAndRange{ | ||
Data: e.buildIngestData(keys, values, memBuf), | ||
Range: common.Range{Start: curStart, End: end}, | ||
}) | ||
return ret, nil | ||
} | ||
|
||
func (e *Engine) createMergeIter(ctx context.Context, start kv.Key) (*MergeKVIter, error) { | ||
|
@@ -227,13 +311,8 @@ func (e *Engine) SplitRanges( | |
return ranges, nil | ||
} | ||
|
||
// Close releases the resources of the engine. | ||
func (e *Engine) Close() error { | ||
if e.iter == nil { | ||
return nil | ||
} | ||
return errors.Trace(e.iter.Close()) | ||
} | ||
// Close implements common.Engine. | ||
func (e *Engine) Close() error { return nil } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doube confirm, external engine close do not need release any resource, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For the rest lgtm. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, local engine need to close its pebble DB. And before this PR external engine holds an iterator member that needs to be closed, but in this PR the iterator becomes a local variable that closed in line 144. |
||
|
||
// MemoryIngestData is the in-memory implementation of IngestData. | ||
type MemoryIngestData struct { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Set a lower/upper limit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
e2cee99