feat: sample files and pre-alloc id

buchuitoudegou committed May 5, 2022
1 parent 6b3b73b commit 411ba4e
Showing 5 changed files with 179 additions and 24 deletions.
26 changes: 26 additions & 0 deletions br/pkg/lightning/backend/kv/sql2kv.go
@@ -542,3 +542,29 @@ func (kvs *KvPairs) Clear() Rows {
kvs.pairs = kvs.pairs[:0]
return kvs
}

func (kvs *KvPairs) GetRowIDs() []int64 {
res := make([]int64, 0, len(kvs.pairs))
for _, kv := range kvs.pairs {
res = append(res, kv.RowID)
}
return res
}

func (kvs *KvPairs) Slice(idx int) *KvPairs {
if idx >= len(kvs.pairs) {
return nil
}
res := &KvPairs{
pairs: kvs.pairs[idx:],
}
kvs.pairs = kvs.pairs[:idx]
return res
}

func (kvs *KvPairs) SetRowID(idx int, newRowID int64) {
if idx >= len(kvs.pairs) {
return
}
kvs.pairs[idx].RowID = newRowID
}
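An editorial aside, not part of the commit: the three helpers above are what later allows a batch of KV pairs to be split at the first out-of-range row ID and the detached suffix to be re-stamped (see adjustRowID in restore.go below). A minimal, self-contained sketch of that composition, using a hypothetical stand-in type rather than the real kv package:

// Sketch only: pair/pairs are simplified stand-ins for kv.KvPair/kv.KvPairs.
package main

import "fmt"

type pair struct{ RowID int64 }

type pairs struct{ items []pair }

// Slice detaches the suffix starting at idx; the receiver keeps the prefix.
func (p *pairs) Slice(idx int) *pairs {
	if idx >= len(p.items) {
		return nil
	}
	tail := &pairs{items: p.items[idx:]}
	p.items = p.items[:idx]
	return tail
}

// SetRowID overwrites the row ID of the idx-th pair, if it exists.
func (p *pairs) SetRowID(idx int, newRowID int64) {
	if idx < len(p.items) {
		p.items[idx].RowID = newRowID
	}
}

func main() {
	kvs := &pairs{items: []pair{{RowID: 1}, {RowID: 2}, {RowID: 999}}}
	tail := kvs.Slice(2) // kvs keeps row IDs 1 and 2; tail owns the out-of-range 999
	tail.SetRowID(0, 3)  // re-stamp the detached pair with a freshly allocated ID
	fmt.Println(len(kvs.items), tail.items[0].RowID) // prints: 2 3
}

Note that Slice mutates the receiver in place (the prefix stays, the suffix is returned), which is what lets a caller hand the detached suffix to ClassifyAndAppend separately.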
56 changes: 51 additions & 5 deletions br/pkg/lightning/mydump/region.go
@@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/worker"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"

"go.uber.org/zap"
)

@@ -266,18 +267,22 @@ func makeSourceFileRegion(
}

dataFileSize := fi.FileMeta.FileSize
divisor := int64(columns)
// divisor := int64(columns)
isCsvFile := fi.FileMeta.Type == SourceTypeCSV
if !isCsvFile {
divisor += 2
// if !isCsvFile {
// divisor += 2
// }
sizePerRow, err := sampleAndGetAvgRowSize(&fi, cfg, ioWorkers, store)
if err != nil {
return nil, nil, err
}
// If a csv file is overlarge, we need to split it into multiple regions.
// Note: We can only split a csv file whose format is strict.
// We increase the check threshold by 1/10 of `max-region-size` because the source file size dumped by
// tools like dumpling might slightly exceed the threshold when it is equal to `max-region-size`, so we
// can avoid splitting into a lot of small chunks.
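// (Editorial example: with max-region-size = 256 MiB, the effective split threshold
// works out to 256 MiB + 25.6 MiB ≈ 281.6 MiB; the 256 MiB figure is illustrative.)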
if isCsvFile && cfg.Mydumper.StrictFormat && dataFileSize > int64(cfg.Mydumper.MaxRegionSize+cfg.Mydumper.MaxRegionSize/largeCSVLowerThresholdRation) {
_, regions, subFileSizes, err := SplitLargeFile(ctx, meta, cfg, fi, divisor, 0, ioWorkers, store)
_, regions, subFileSizes, err := SplitLargeFile(ctx, meta, cfg, fi, sizePerRow, 0, ioWorkers, store)
return regions, subFileSizes, err
}

@@ -289,7 +294,7 @@
Offset: 0,
EndOffset: fi.FileMeta.FileSize,
PrevRowIDMax: 0,
RowIDMax: fi.FileMeta.FileSize / divisor,
RowIDMax: fi.FileMeta.FileSize / sizePerRow,
},
}

@@ -302,6 +307,47 @@
return []*TableRegion{tableRegion}, []float64{float64(fi.FileMeta.FileSize)}, nil
}

func sampleAndGetAvgRowSize(
fileInfo *FileInfo,
cfg *config.Config,
ioWorkers *worker.Pool,
store storage.ExternalStorage,
) (int64, error) {
reader, err := store.Open(context.Background(), fileInfo.FileMeta.Path)
if err != nil {
return 0, err
}
var parser Parser
switch fileInfo.FileMeta.Type {
case SourceTypeCSV:
hasHeader := cfg.Mydumper.CSV.Header
charsetConvertor, err := NewCharsetConvertor(cfg.Mydumper.DataCharacterSet, cfg.Mydumper.DataInvalidCharReplace)
if err != nil {
return 0, err
}
parser, err = NewCSVParser(&cfg.Mydumper.CSV, reader, int64(cfg.Mydumper.ReadBlockSize), ioWorkers, hasHeader, charsetConvertor)
if err != nil {
return 0, err
}
case SourceTypeSQL:
parser = NewChunkParser(cfg.TiDB.SQLMode, reader, int64(cfg.Mydumper.ReadBlockSize), ioWorkers)
}
totalBytes := 0
totalRows := 0
defaultSampleRows := 10 // TODO: make this configurable
for i := 0; i < defaultSampleRows; i++ {
err = parser.ReadRow()
if err != nil && errors.Cause(err) == io.EOF {
break
} else if err != nil {
return 0, err
}
totalBytes += parser.LastRow().Length
totalRows++
}
if totalRows == 0 {
// nothing could be sampled (e.g. an empty file); fall back to a 1-byte
// row size so callers don't divide by zero
return 1, nil
}
return int64(totalBytes) / int64(totalRows), nil
}
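An editorial aside, not part of the commit: the sampled average row size above replaces the old column-count divisor when estimating how many row IDs a file may need. A small self-contained sketch with illustrative numbers shows why the sampled divisor gives a much tighter estimate:

// Sketch only: illustrative figures, not values from the commit.
package main

import "fmt"

func main() {
	const fileSize = int64(256 << 20) // a 256 MiB source file
	const columns = int64(4)          // old divisor: the column count
	const sampledRowSize = int64(512) // new divisor: avg bytes per row from ~10 sampled rows

	oldRowIDMax := fileSize / columns        // 67,108,864 row IDs reserved
	newRowIDMax := fileSize / sampledRowSize // 524,288 row IDs reserved

	fmt.Println("old estimate:", oldRowIDMax)
	fmt.Println("new estimate:", newRowIDMax)
}

A tighter RowIDMax keeps each chunk's pre-allocated row ID range closer to reality; when rows still overflow the estimate, the adjustRowID logic added in restore.go below re-allocates fresh IDs.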

// Because parquet files can't seek efficiently, there is no benefit in splitting them.
// Parquet files are column-oriented, so the offset is the read line number.
func makeParquetFileRegion(
8 changes: 4 additions & 4 deletions br/pkg/lightning/restore/chunk_restore_test.go
@@ -86,7 +86,7 @@ func (s *chunkRestoreSuite) TestDeliverLoopCancel() {
ctx, cancel := context.WithCancel(context.Background())
kvsCh := make(chan []deliveredKVs)
go cancel()
_, err := s.cr.deliverLoop(ctx, kvsCh, s.tr, 0, nil, nil, rc)
_, err := s.cr.deliverLoop(ctx, kvsCh, s.tr, 0, nil, nil, rc, nil, nil, nil)
require.Equal(s.T(), context.Canceled, errors.Cause(err))
}

@@ -124,7 +124,7 @@ func (s *chunkRestoreSuite) TestDeliverLoopEmptyData() {

kvsCh := make(chan []deliveredKVs, 1)
kvsCh <- []deliveredKVs{}
_, err = s.cr.deliverLoop(ctx, kvsCh, s.tr, 0, dataWriter, indexWriter, rc)
_, err = s.cr.deliverLoop(ctx, kvsCh, s.tr, 0, dataWriter, indexWriter, rc, dataEngine, indexEngine, nil)
require.NoError(s.T(), err)
}

@@ -218,7 +218,7 @@ func (s *chunkRestoreSuite) TestDeliverLoop() {
cfg := &config.Config{}
rc := &Controller{cfg: cfg, saveCpCh: saveCpCh, backend: importer}

_, err = s.cr.deliverLoop(ctx, kvsCh, s.tr, 0, dataWriter, indexWriter, rc)
_, err = s.cr.deliverLoop(ctx, kvsCh, s.tr, 0, dataWriter, indexWriter, rc, dataEngine, indexEngine, &backend.LocalWriterConfig{})
require.NoError(s.T(), err)
require.Len(s.T(), saveCpCh, 2)
require.Equal(s.T(), int64(12), s.cr.chunk.Chunk.Offset)
@@ -561,7 +561,7 @@ func (s *chunkRestoreSuite) TestRestore() {
saveCpCh: saveCpCh,
backend: importer,
pauser: DeliverPauser,
})
}, dataEngine, indexEngine, &backend.LocalWriterConfig{})
require.NoError(s.T(), err)
require.Len(s.T(), saveCpCh, 2)
}
90 changes: 76 additions & 14 deletions br/pkg/lightning/restore/restore.go
@@ -2028,9 +2028,13 @@ func (rc *Controller) DataCheck(ctx context.Context) error {
}

type chunkRestore struct {
parser mydump.Parser
index int
chunk *checkpoints.ChunkCheckpoint
parser mydump.Parser
index int
chunk *checkpoints.ChunkCheckpoint
allocMaxRowID int64 // upper bound of the currently allocated row ID range
allocRowIDBase int64 // next row ID to hand out from that range

originalMaxRowID int64 // the RowIDMax estimated when the chunk checkpoint was built
}

func newChunkRestore(
@@ -2087,9 +2091,12 @@
}

return &chunkRestore{
parser: parser,
index: index,
chunk: chunk,
parser: parser,
index: index,
chunk: chunk,
allocMaxRowID: chunk.Chunk.RowIDMax,
allocRowIDBase: chunk.Chunk.PrevRowIDMax,
originalMaxRowID: chunk.Chunk.RowIDMax,
}, nil
}

@@ -2142,14 +2149,54 @@ type deliverResult struct {
err error
}

func (cr *chunkRestore) adjustRowID(rawData *deliveredKVs, t *TableRestore, pendingDataKVs, pendingIndexKVs *kv.Rows) bool {
data := rawData.kvs.(*kv.KvPairs)
res := false
curRowIDs := data.GetRowIDs()
illegalIdx := -1
for i := range curRowIDs {
if curRowIDs[i] > cr.originalMaxRowID {
illegalIdx = i
break
}
}
if illegalIdx >= 0 {
if illegalIdx > 0 {
rawData.rowID = curRowIDs[illegalIdx-1]
}
invalidLen := len(curRowIDs) - illegalIdx
invalidKVs := data.Slice(illegalIdx)
if curRowIDs[illegalIdx] > cr.allocMaxRowID {
// re-allocate rowIDs
cr.allocRowIDBase, cr.allocMaxRowID = t.allocateRowIDs()
res = true
}
// set new rowIDs
for i := 0; i < invalidLen; i++ {
invalidKVs.SetRowID(i, cr.allocRowIDBase)
cr.allocRowIDBase++
if cr.allocRowIDBase > cr.allocMaxRowID {
// allocated ids run out
cr.allocRowIDBase, cr.allocMaxRowID = t.allocateRowIDs()
res = true
}
}
var dataChecksum, indexChecksum verify.KVChecksum
invalidKVs.ClassifyAndAppend(pendingDataKVs, &dataChecksum, pendingIndexKVs, &indexChecksum)
}
return res
}
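
An editorial aside, not part of the commit: the essence of adjustRowID is to find the first row whose ID exceeds the originally estimated maximum, detach everything from that point on, and re-stamp those rows from a pre-allocated range, fetching a new range whenever the current one runs out. A self-contained sketch of that control flow, where allocate() stands in for t.allocateRowIDs() and all names and numbers are illustrative:

// Sketch only: a toy re-stamping loop, not the real implementation.
package main

import "fmt"

// blockAllocator hands out contiguous row ID blocks of a fixed size.
type blockAllocator struct{ next, blockSize int64 }

func (a *blockAllocator) allocate() (base, max int64) {
	base, max = a.next, a.next+a.blockSize-1
	a.next += a.blockSize
	return
}

func main() {
	const originalMaxRowID = 10
	rowIDs := []int64{5, 6, 90, 91, 92, 93} // the last four exceed the estimate

	// 1. Find the first out-of-range row ID (assuming IDs increase within a batch).
	cut := len(rowIDs)
	for i, id := range rowIDs {
		if id > originalMaxRowID {
			cut = i
			break
		}
	}

	// 2. Re-stamp everything from the cut point using pre-allocated blocks.
	alloc := &blockAllocator{next: 1000, blockSize: 3}
	base, max := alloc.allocate() // current range: [1000, 1002]
	for i := cut; i < len(rowIDs); i++ {
		if base > max {
			// The current range is exhausted: fetch a fresh one. In the commit,
			// this is also when adjustRowID reports true so that deliverLoop
			// can open new engine writers.
			base, max = alloc.allocate()
		}
		rowIDs[i] = base
		base++
	}
	fmt.Println(rowIDs) // prints: [5 6 1000 1001 1002 1003]
}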

//nolint:nakedret // TODO: refactor
func (cr *chunkRestore) deliverLoop(
ctx context.Context,
kvsCh <-chan []deliveredKVs,
t *TableRestore,
engineID int32,
dataEngine, indexEngine *backend.LocalEngineWriter,
dataWriter, indexWriter *backend.LocalEngineWriter,
rc *Controller,
dataEngine, indexEngine *backend.OpenedEngine,
writerCfg *backend.LocalWriterConfig,
) (deliverTotalDur time.Duration, err error) {
var channelClosed bool

@@ -2162,6 +2209,8 @@ func (cr *chunkRestore) deliverLoop(
// Fetch enough KV pairs from the source.
dataKVs := rc.backend.MakeEmptyRows()
indexKVs := rc.backend.MakeEmptyRows()
pendingDataKVs := rc.backend.MakeEmptyRows()
pendingIndexKVs := rc.backend.MakeEmptyRows()

dataSynced := true
for !channelClosed {
@@ -2173,7 +2222,7 @@
startOffset := cr.chunk.Chunk.Offset
currOffset := startOffset
rowID := cr.chunk.Chunk.PrevRowIDMax

reAlloc := false
populate:
for dataChecksum.SumSize()+indexChecksum.SumSize() < minDeliverBytes {
select {
@@ -2183,7 +2232,11 @@
break populate
}
for _, p := range kvPacket {
reAlloc = cr.adjustRowID(&p, t, &pendingDataKVs, &pendingIndexKVs) || reAlloc
p.kvs.ClassifyAndAppend(&dataKVs, &dataChecksum, &indexKVs, &indexChecksum)
if !reAlloc {
// combine pendingDataKVs and dataKVs
}
columns = p.columns
currOffset = p.offset
rowID = p.rowID
@@ -2202,7 +2255,7 @@
for !rc.diskQuotaLock.TryRLock() {
// try to update chunk checkpoint, this can help save checkpoint after importing when disk-quota is triggered
if !dataSynced {
dataSynced = cr.maybeSaveCheckpoint(rc, t, engineID, cr.chunk, dataEngine, indexEngine)
dataSynced = cr.maybeSaveCheckpoint(rc, t, engineID, cr.chunk, dataWriter, indexWriter)
}
time.Sleep(time.Millisecond)
}
@@ -2211,14 +2264,14 @@
// Write KVs into the engine
start := time.Now()

if err = dataEngine.WriteRows(ctx, columns, dataKVs); err != nil {
if err = dataWriter.WriteRows(ctx, columns, dataKVs); err != nil {
if !common.IsContextCanceledError(err) {
deliverLogger.Error("write to data engine failed", log.ShortError(err))
}

return errors.Trace(err)
}
if err = indexEngine.WriteRows(ctx, columns, indexKVs); err != nil {
if err = indexWriter.WriteRows(ctx, columns, indexKVs); err != nil {
if !common.IsContextCanceledError(err) {
deliverLogger.Error("write to index engine failed", log.ShortError(err))
}
@@ -2255,7 +2308,14 @@

if dataChecksum.SumKVS() != 0 || indexChecksum.SumKVS() != 0 {
// No need to save checkpoint if nothing was delivered.
dataSynced = cr.maybeSaveCheckpoint(rc, t, engineID, cr.chunk, dataEngine, indexEngine)
dataSynced = cr.maybeSaveCheckpoint(rc, t, engineID, cr.chunk, dataWriter, indexWriter)
}
if reAlloc {
dataWriter, err = dataEngine.LocalWriter(ctx, writerCfg)
if err != nil {
return
}
indexWriter, err = indexEngine.LocalWriter(ctx, &backend.LocalWriterConfig{})
if err != nil {
return
}
}
failpoint.Inject("SlowDownWriteRows", func() {
deliverLogger.Warn("Slowed down write rows")
@@ -2481,8 +2541,10 @@ func (cr *chunkRestore) restore(
ctx context.Context,
t *TableRestore,
engineID int32,
dataEngine, indexEngine *backend.LocalEngineWriter,
dataWriter, indexWriter *backend.LocalEngineWriter,
rc *Controller,
dataEngine, indexEngine *backend.OpenedEngine,
writerCfg *backend.LocalWriterConfig,
) error {
// Create the encoder.
kvEncoder, err := rc.backend.NewEncoder(t.encTable, &kv.SessionOptions{
Expand All @@ -2507,7 +2569,7 @@ func (cr *chunkRestore) restore(

go func() {
defer close(deliverCompleteCh)
dur, err := cr.deliverLoop(ctx, kvsCh, t, engineID, dataEngine, indexEngine, rc)
dur, err := cr.deliverLoop(ctx, kvsCh, t, engineID, dataWriter, indexWriter, rc, dataEngine, indexEngine, writerCfg)
select {
case <-ctx.Done():
case deliverCompleteCh <- deliverResult{dur, err}:
