From 411ba4e15a19a2c4104ee57bacf60fcbde5dad77 Mon Sep 17 00:00:00 2001 From: buchuitoudegou <756541536@qq.com> Date: Thu, 5 May 2022 17:37:25 +0800 Subject: [PATCH 01/21] feat: sample files and pre-alloc id --- br/pkg/lightning/backend/kv/sql2kv.go | 26 ++++++ br/pkg/lightning/mydump/region.go | 56 ++++++++++-- .../lightning/restore/chunk_restore_test.go | 8 +- br/pkg/lightning/restore/restore.go | 90 ++++++++++++++++--- br/pkg/lightning/restore/table_restore.go | 23 ++++- 5 files changed, 179 insertions(+), 24 deletions(-) diff --git a/br/pkg/lightning/backend/kv/sql2kv.go b/br/pkg/lightning/backend/kv/sql2kv.go index 62eba924575d9..36dfdf38d6dfe 100644 --- a/br/pkg/lightning/backend/kv/sql2kv.go +++ b/br/pkg/lightning/backend/kv/sql2kv.go @@ -542,3 +542,29 @@ func (kvs *KvPairs) Clear() Rows { kvs.pairs = kvs.pairs[:0] return kvs } + +func (kvs *KvPairs) GetRowIDs() []int64 { + res := make([]int64, len(kvs.pairs)) + for _, kv := range kvs.pairs { + res = append(res, kv.RowID) + } + return res +} + +func (kvs *KvPairs) Slice(idx int) *KvPairs { + if idx >= len(kvs.pairs) { + return nil + } + res := &KvPairs{ + pairs: kvs.pairs[idx:], + } + kvs.pairs = kvs.pairs[:idx] + return res +} + +func (kvs *KvPairs) SetRowID(idx int, newRowID int64) { + if idx >= len(kvs.pairs) { + return + } + kvs.pairs[idx].RowID = newRowID +} diff --git a/br/pkg/lightning/mydump/region.go b/br/pkg/lightning/mydump/region.go index 73456d461bb0b..5b44cf8753f7d 100644 --- a/br/pkg/lightning/mydump/region.go +++ b/br/pkg/lightning/mydump/region.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/worker" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/utils" + "go.uber.org/zap" ) @@ -266,10 +267,14 @@ func makeSourceFileRegion( } dataFileSize := fi.FileMeta.FileSize - divisor := int64(columns) + // divisor := int64(columns) isCsvFile := fi.FileMeta.Type == SourceTypeCSV - if !isCsvFile { - divisor += 2 + // if !isCsvFile { + // divisor += 2 + // } + sizePerRow, err := sampleAndGetAvgRowSize(&fi, cfg, ioWorkers, store) + if err != nil { + return nil, nil, err } // If a csv file is overlarge, we need to split it into multiple regions. // Note: We can only split a csv file whose format is strict. @@ -277,7 +282,7 @@ func makeSourceFileRegion( // like dumpling might be slight exceed the threshold when it is equal `max-region-size`, so we can // avoid split a lot of small chunks. 
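
One note on the KvPairs helpers added in sql2kv.go above: GetRowIDs allocates with make([]int64, len(kvs.pairs)) and then appends, which yields a slice of twice the intended length whose first half is zeros. A minimal corrected sketch of that helper (same behaviour otherwise, only the allocation changes):

func (kvs *KvPairs) GetRowIDs() []int64 {
    // Reserve capacity only, so append fills the slice from index 0.
    res := make([]int64, 0, len(kvs.pairs))
    for _, kv := range kvs.pairs {
        res = append(res, kv.RowID)
    }
    return res
}
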
if isCsvFile && cfg.Mydumper.StrictFormat && dataFileSize > int64(cfg.Mydumper.MaxRegionSize+cfg.Mydumper.MaxRegionSize/largeCSVLowerThresholdRation) { - _, regions, subFileSizes, err := SplitLargeFile(ctx, meta, cfg, fi, divisor, 0, ioWorkers, store) + _, regions, subFileSizes, err := SplitLargeFile(ctx, meta, cfg, fi, sizePerRow, 0, ioWorkers, store) return regions, subFileSizes, err } @@ -289,7 +294,7 @@ func makeSourceFileRegion( Offset: 0, EndOffset: fi.FileMeta.FileSize, PrevRowIDMax: 0, - RowIDMax: fi.FileMeta.FileSize / divisor, + RowIDMax: fi.FileMeta.FileSize / sizePerRow, }, } @@ -302,6 +307,47 @@ func makeSourceFileRegion( return []*TableRegion{tableRegion}, []float64{float64(fi.FileMeta.FileSize)}, nil } +func sampleAndGetAvgRowSize( + fileInfo *FileInfo, + cfg *config.Config, + ioWorkers *worker.Pool, + store storage.ExternalStorage, +) (int64, error) { + reader, err := store.Open(context.Background(), fileInfo.FileMeta.Path) + if err != nil { + return 0, nil + } + var parser Parser + switch fileInfo.FileMeta.Type { + case SourceTypeCSV: + hasHeader := cfg.Mydumper.CSV.Header + charsetConvertor, err := NewCharsetConvertor(cfg.Mydumper.DataCharacterSet, cfg.Mydumper.DataInvalidCharReplace) + if err != nil { + return 0, err + } + parser, err = NewCSVParser(&cfg.Mydumper.CSV, reader, int64(cfg.Mydumper.ReadBlockSize), ioWorkers, hasHeader, charsetConvertor) + if err != nil { + return 0, err + } + case SourceTypeSQL: + parser = NewChunkParser(cfg.TiDB.SQLMode, reader, int64(cfg.Mydumper.ReadBlockSize), ioWorkers) + } + totalBytes := 0 + totalRows := 0 + defaultSampleRows := 10 // todo: may be configurable + for i := 0; i < defaultSampleRows; i++ { + err = parser.ReadRow() + if err != nil && errors.Cause(err) == io.EOF { + break + } else if err != nil { + return 0, err + } + totalBytes += parser.LastRow().Length + totalRows++ + } + return int64(totalBytes) / int64(totalRows), nil +} + // because parquet files can't seek efficiently, there is no benefit in split. 
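
The sampleAndGetAvgRowSize function above is the heart of this patch: rather than deriving a chunk's RowIDMax from a per-column divisor, it reads a handful of rows from the head of the file and averages their encoded length, so RowIDMax becomes roughly fileSize / avgRowSize. A minimal standalone sketch of that estimation loop, using hypothetical names (rowSource, sampleAvgRowSize) and an explicit guard for empty files:

package sample

import "io"

// rowSource abstracts the CSV/SQL parser used in the patch: each call returns
// the byte length of the next row, or io.EOF once the file is exhausted.
type rowSource interface {
    NextRowLength() (int, error)
}

// sampleAvgRowSize reads at most maxRows rows and returns their average size
// in bytes. fallback is returned for empty files so callers can keep using
// the old column-count divisor.
func sampleAvgRowSize(src rowSource, maxRows int, fallback int64) (int64, error) {
    var totalBytes, totalRows int64
    for i := 0; i < maxRows; i++ {
        n, err := src.NextRowLength()
        if err == io.EOF {
            break
        }
        if err != nil {
            return 0, err
        }
        totalBytes += int64(n)
        totalRows++
    }
    if totalRows == 0 {
        return fallback, nil
    }
    return totalBytes / totalRows, nil
}
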
// parquet file are column orient, so the offset is read line number func makeParquetFileRegion( diff --git a/br/pkg/lightning/restore/chunk_restore_test.go b/br/pkg/lightning/restore/chunk_restore_test.go index e32e7b57a15e0..52fe5f47d9f00 100644 --- a/br/pkg/lightning/restore/chunk_restore_test.go +++ b/br/pkg/lightning/restore/chunk_restore_test.go @@ -86,7 +86,7 @@ func (s *chunkRestoreSuite) TestDeliverLoopCancel() { ctx, cancel := context.WithCancel(context.Background()) kvsCh := make(chan []deliveredKVs) go cancel() - _, err := s.cr.deliverLoop(ctx, kvsCh, s.tr, 0, nil, nil, rc) + _, err := s.cr.deliverLoop(ctx, kvsCh, s.tr, 0, nil, nil, rc, nil, nil, nil) require.Equal(s.T(), context.Canceled, errors.Cause(err)) } @@ -124,7 +124,7 @@ func (s *chunkRestoreSuite) TestDeliverLoopEmptyData() { kvsCh := make(chan []deliveredKVs, 1) kvsCh <- []deliveredKVs{} - _, err = s.cr.deliverLoop(ctx, kvsCh, s.tr, 0, dataWriter, indexWriter, rc) + _, err = s.cr.deliverLoop(ctx, kvsCh, s.tr, 0, dataWriter, indexWriter, rc, dataEngine, indexEngine, nil) require.NoError(s.T(), err) } @@ -218,7 +218,7 @@ func (s *chunkRestoreSuite) TestDeliverLoop() { cfg := &config.Config{} rc := &Controller{cfg: cfg, saveCpCh: saveCpCh, backend: importer} - _, err = s.cr.deliverLoop(ctx, kvsCh, s.tr, 0, dataWriter, indexWriter, rc) + _, err = s.cr.deliverLoop(ctx, kvsCh, s.tr, 0, dataWriter, indexWriter, rc, dataEngine, indexEngine, &backend.LocalWriterConfig{}) require.NoError(s.T(), err) require.Len(s.T(), saveCpCh, 2) require.Equal(s.T(), int64(12), s.cr.chunk.Chunk.Offset) @@ -561,7 +561,7 @@ func (s *chunkRestoreSuite) TestRestore() { saveCpCh: saveCpCh, backend: importer, pauser: DeliverPauser, - }) + }, dataEngine, indexEngine, &backend.LocalWriterConfig{}) require.NoError(s.T(), err) require.Len(s.T(), saveCpCh, 2) } diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 5ac2d791131f8..427405161c6fa 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -2028,9 +2028,13 @@ func (rc *Controller) DataCheck(ctx context.Context) error { } type chunkRestore struct { - parser mydump.Parser - index int - chunk *checkpoints.ChunkCheckpoint + parser mydump.Parser + index int + chunk *checkpoints.ChunkCheckpoint + allocMaxRowID int64 // upper bound + allocRowIDBase int64 // lower bound + + originalMaxRowID int64 } func newChunkRestore( @@ -2087,9 +2091,12 @@ func newChunkRestore( } return &chunkRestore{ - parser: parser, - index: index, - chunk: chunk, + parser: parser, + index: index, + chunk: chunk, + allocMaxRowID: chunk.Chunk.RowIDMax, + allocRowIDBase: chunk.Chunk.PrevRowIDMax, + originalMaxRowID: chunk.Chunk.RowIDMax, }, nil } @@ -2142,14 +2149,54 @@ type deliverResult struct { err error } +func (cr *chunkRestore) adjustRowID(rawData *deliveredKVs, t *TableRestore, pendingDataKVs, pendingIndexKVs *kv.Rows) bool { + data := rawData.kvs.(*kv.KvPairs) + res := false + curRowIDs := data.GetRowIDs() + illegalIdx := -1 + for i := range curRowIDs { + if curRowIDs[i] > cr.originalMaxRowID { + illegalIdx = i + break + } + } + if illegalIdx >= 0 { + if illegalIdx > 0 { + rawData.rowID = curRowIDs[illegalIdx-1] + } + invalidLen := len(curRowIDs) - illegalIdx + invalidKVs := data.Slice(illegalIdx) + if curRowIDs[illegalIdx] > cr.allocMaxRowID { + // re-allocate rowIDs + cr.allocRowIDBase, cr.allocMaxRowID = t.allocateRowIDs() + res = true + } + // set new rowIDs + for i := 0; i < invalidLen; i++ { + invalidKVs.SetRowID(i, cr.allocRowIDBase) + 
cr.allocRowIDBase++ + if cr.allocRowIDBase > cr.allocMaxRowID { + // allocated ids run out + cr.allocRowIDBase, cr.allocMaxRowID = t.allocateRowIDs() + res = true + } + } + var dataChecksum, indexChecksum verify.KVChecksum + invalidKVs.ClassifyAndAppend(pendingDataKVs, &dataChecksum, pendingIndexKVs, &indexChecksum) + } + return res +} + //nolint:nakedret // TODO: refactor func (cr *chunkRestore) deliverLoop( ctx context.Context, kvsCh <-chan []deliveredKVs, t *TableRestore, engineID int32, - dataEngine, indexEngine *backend.LocalEngineWriter, + dataWriter, indexWriter *backend.LocalEngineWriter, rc *Controller, + dataEngine, indexEngine *backend.OpenedEngine, + writerCfg *backend.LocalWriterConfig, ) (deliverTotalDur time.Duration, err error) { var channelClosed bool @@ -2162,6 +2209,8 @@ func (cr *chunkRestore) deliverLoop( // Fetch enough KV pairs from the source. dataKVs := rc.backend.MakeEmptyRows() indexKVs := rc.backend.MakeEmptyRows() + pendingDataKVs := rc.backend.MakeEmptyRows() + pendingIndexKVs := rc.backend.MakeEmptyRows() dataSynced := true for !channelClosed { @@ -2173,7 +2222,7 @@ func (cr *chunkRestore) deliverLoop( startOffset := cr.chunk.Chunk.Offset currOffset := startOffset rowID := cr.chunk.Chunk.PrevRowIDMax - + reAlloc := false populate: for dataChecksum.SumSize()+indexChecksum.SumSize() < minDeliverBytes { select { @@ -2183,7 +2232,11 @@ func (cr *chunkRestore) deliverLoop( break populate } for _, p := range kvPacket { + reAlloc = cr.adjustRowID(&p, t, &pendingDataKVs, &pendingIndexKVs) p.kvs.ClassifyAndAppend(&dataKVs, &dataChecksum, &indexKVs, &indexChecksum) + if !reAlloc { + // combine pendingDataKVs and dataKVs + } columns = p.columns currOffset = p.offset rowID = p.rowID @@ -2202,7 +2255,7 @@ func (cr *chunkRestore) deliverLoop( for !rc.diskQuotaLock.TryRLock() { // try to update chunk checkpoint, this can help save checkpoint after importing when disk-quota is triggered if !dataSynced { - dataSynced = cr.maybeSaveCheckpoint(rc, t, engineID, cr.chunk, dataEngine, indexEngine) + dataSynced = cr.maybeSaveCheckpoint(rc, t, engineID, cr.chunk, dataWriter, indexWriter) } time.Sleep(time.Millisecond) } @@ -2211,14 +2264,14 @@ func (cr *chunkRestore) deliverLoop( // Write KVs into the engine start := time.Now() - if err = dataEngine.WriteRows(ctx, columns, dataKVs); err != nil { + if err = dataWriter.WriteRows(ctx, columns, dataKVs); err != nil { if !common.IsContextCanceledError(err) { deliverLogger.Error("write to data engine failed", log.ShortError(err)) } return errors.Trace(err) } - if err = indexEngine.WriteRows(ctx, columns, indexKVs); err != nil { + if err = indexWriter.WriteRows(ctx, columns, indexKVs); err != nil { if !common.IsContextCanceledError(err) { deliverLogger.Error("write to index engine failed", log.ShortError(err)) } @@ -2255,7 +2308,14 @@ func (cr *chunkRestore) deliverLoop( if dataChecksum.SumKVS() != 0 || indexChecksum.SumKVS() != 0 { // No need to save checkpoint if nothing was delivered. 
- dataSynced = cr.maybeSaveCheckpoint(rc, t, engineID, cr.chunk, dataEngine, indexEngine) + dataSynced = cr.maybeSaveCheckpoint(rc, t, engineID, cr.chunk, dataWriter, indexWriter) + } + if reAlloc { + dataWriter, err = dataEngine.LocalWriter(ctx, writerCfg) + if err != nil { + return + } + indexWriter, err = indexEngine.LocalWriter(ctx, &backend.LocalWriterConfig{}) } failpoint.Inject("SlowDownWriteRows", func() { deliverLogger.Warn("Slowed down write rows") @@ -2481,8 +2541,10 @@ func (cr *chunkRestore) restore( ctx context.Context, t *TableRestore, engineID int32, - dataEngine, indexEngine *backend.LocalEngineWriter, + dataWriter, indexWriter *backend.LocalEngineWriter, rc *Controller, + dataEngine, indexEngine *backend.OpenedEngine, + writerCfg *backend.LocalWriterConfig, ) error { // Create the encoder. kvEncoder, err := rc.backend.NewEncoder(t.encTable, &kv.SessionOptions{ @@ -2507,7 +2569,7 @@ func (cr *chunkRestore) restore( go func() { defer close(deliverCompleteCh) - dur, err := cr.deliverLoop(ctx, kvsCh, t, engineID, dataEngine, indexEngine, rc) + dur, err := cr.deliverLoop(ctx, kvsCh, t, engineID, dataWriter, indexWriter, rc, dataEngine, indexEngine, writerCfg) select { case <-ctx.Done(): case deliverCompleteCh <- deliverResult{dur, err}: diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index 27410428ede8a..e2f52a96e1d26 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -21,6 +21,7 @@ import ( "sync" "time" + "github.com/docker/go-units" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/br/pkg/lightning/backend" @@ -44,6 +45,15 @@ import ( "go.uber.org/zap" ) +const ( + defaultRowPerFile = 10 + fileCntThrd = 10 + fileByteThrd = 10 * units.KiB + // we are expecting the estimates are closed to the reality, + // hence, TableRestore re-allocates 100000 new RowIDs for extreme cases. + defaultNewAllocIDCnt = 100000 +) + type TableRestore struct { // The unique table name in the form "`db`.`tbl`". tableName string @@ -55,6 +65,8 @@ type TableRestore struct { logger log.Logger ignoreColumns map[string]struct{} + + curMaxRowID int64 } func NewTableRestore( @@ -97,6 +109,9 @@ func (tr *TableRestore) populateChunks(ctx context.Context, rc *Controller, cp * timestamp = int64(v.(int)) }) for _, chunk := range chunks { + if chunk.Chunk.RowIDMax > tr.curMaxRowID { + tr.curMaxRowID = chunk.Chunk.RowIDMax + } engine, found := cp.Engines[chunk.EngineID] if !found { engine = &checkpoints.EngineCheckpoint{ @@ -528,7 +543,7 @@ func (tr *TableRestore) restoreEngine( rc.regionWorkers.Recycle(w) }() metric.ChunkCounter.WithLabelValues(metric.ChunkStateRunning).Add(remainChunkCnt) - err := cr.restore(ctx, tr, engineID, dataWriter, indexWriter, rc) + err := cr.restore(ctx, tr, engineID, dataWriter, indexWriter, rc, dataEngine, indexEngine, dataWriterCfg) var dataFlushStatus, indexFlushStaus backend.ChunkFlushStatus if err == nil { dataFlushStatus, err = dataWriter.Close(ctx) @@ -967,6 +982,12 @@ func (tr *TableRestore) analyzeTable(ctx context.Context, g glue.SQLExecutor) er return err } +func (tr *TableRestore) allocateRowIDs() (int64, int64) { + newBase := tr.curMaxRowID + 1 + tr.curMaxRowID += defaultNewAllocIDCnt + return newBase, tr.curMaxRowID +} + // estimate SST files compression threshold by total row file size // with a higher compression threshold, the compression time increases, but the iteration time decreases. 
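
The allocateRowIDs helper introduced just above reserves row IDs in fixed blocks of defaultNewAllocIDCnt on top of a per-table high-water mark. A minimal sketch of that reservation pattern as a standalone type; the names are hypothetical, and unlike this first version of the patch the sketch guards the counter with a mutex (later commits in this series add the locking):

package rowid

import "sync"

// blockAllocator hands out contiguous [base, max] row-ID ranges above a
// high-water mark, one fixed-size block at a time.
type blockAllocator struct {
    mu        sync.Mutex
    maxRowID  int64 // highest row ID reserved so far
    blockSize int64
}

// next reserves one block and returns its inclusive bounds.
func (a *blockAllocator) next() (base, max int64) {
    a.mu.Lock()
    defer a.mu.Unlock()
    base = a.maxRowID + 1
    a.maxRowID += a.blockSize
    return base, a.maxRowID
}

A chunk whose original estimate turns out to be too low calls next() once and then assigns IDs sequentially from base, asking for another block only when max is exhausted.
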
// Try to limit the total SST files number under 500. But size compress 32GB SST files cost about 20min, From d2b4ac7044f80b5e729e781143bbaf6cd3f7f23d Mon Sep 17 00:00:00 2001 From: buchuitoudegou <756541536@qq.com> Date: Fri, 6 May 2022 15:42:35 +0800 Subject: [PATCH 02/21] fix: region_test --- br/pkg/lightning/mydump/region.go | 20 +++++++++++--------- br/pkg/lightning/mydump/region_test.go | 2 ++ 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/br/pkg/lightning/mydump/region.go b/br/pkg/lightning/mydump/region.go index 5b44cf8753f7d..baa9c2e18456b 100644 --- a/br/pkg/lightning/mydump/region.go +++ b/br/pkg/lightning/mydump/region.go @@ -267,14 +267,14 @@ func makeSourceFileRegion( } dataFileSize := fi.FileMeta.FileSize - // divisor := int64(columns) + divisor := int64(columns) isCsvFile := fi.FileMeta.Type == SourceTypeCSV - // if !isCsvFile { - // divisor += 2 - // } + if !isCsvFile { + divisor += 2 + } sizePerRow, err := sampleAndGetAvgRowSize(&fi, cfg, ioWorkers, store) - if err != nil { - return nil, nil, err + if err == nil { + divisor = sizePerRow } // If a csv file is overlarge, we need to split it into multiple regions. // Note: We can only split a csv file whose format is strict. @@ -282,7 +282,7 @@ func makeSourceFileRegion( // like dumpling might be slight exceed the threshold when it is equal `max-region-size`, so we can // avoid split a lot of small chunks. if isCsvFile && cfg.Mydumper.StrictFormat && dataFileSize > int64(cfg.Mydumper.MaxRegionSize+cfg.Mydumper.MaxRegionSize/largeCSVLowerThresholdRation) { - _, regions, subFileSizes, err := SplitLargeFile(ctx, meta, cfg, fi, sizePerRow, 0, ioWorkers, store) + _, regions, subFileSizes, err := SplitLargeFile(ctx, meta, cfg, fi, divisor, 0, ioWorkers, store) return regions, subFileSizes, err } @@ -294,7 +294,7 @@ func makeSourceFileRegion( Offset: 0, EndOffset: fi.FileMeta.FileSize, PrevRowIDMax: 0, - RowIDMax: fi.FileMeta.FileSize / sizePerRow, + RowIDMax: fi.FileMeta.FileSize / divisor, }, } @@ -313,7 +313,9 @@ func sampleAndGetAvgRowSize( ioWorkers *worker.Pool, store storage.ExternalStorage, ) (int64, error) { - reader, err := store.Open(context.Background(), fileInfo.FileMeta.Path) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + reader, err := store.Open(ctx, fileInfo.FileMeta.Path) if err != nil { return 0, nil } diff --git a/br/pkg/lightning/mydump/region_test.go b/br/pkg/lightning/mydump/region_test.go index a1dbb9f290a69..89d0799a96ef0 100644 --- a/br/pkg/lightning/mydump/region_test.go +++ b/br/pkg/lightning/mydump/region_test.go @@ -40,6 +40,8 @@ import ( */ func TestTableRegion(t *testing.T) { cfg := newConfigWithSourceDir("./examples") + // specify ReadBlockSize because we need to sample files + cfg.Mydumper.ReadBlockSize = config.ReadBlockSize loader, _ := NewMyDumpLoader(context.Background(), cfg) dbMeta := loader.GetDatabases()[0] From 6c8574e468de1dd9a55327b820eae7a0bf9ac41a Mon Sep 17 00:00:00 2001 From: buchuitoudegou <756541536@qq.com> Date: Fri, 6 May 2022 15:47:09 +0800 Subject: [PATCH 03/21] revert: dyn alloc row-id --- .../lightning/restore/chunk_restore_test.go | 8 +-- br/pkg/lightning/restore/restore.go | 58 +------------------ br/pkg/lightning/restore/table_restore.go | 2 +- 3 files changed, 6 insertions(+), 62 deletions(-) diff --git a/br/pkg/lightning/restore/chunk_restore_test.go b/br/pkg/lightning/restore/chunk_restore_test.go index 52fe5f47d9f00..e32e7b57a15e0 100644 --- a/br/pkg/lightning/restore/chunk_restore_test.go +++ 
b/br/pkg/lightning/restore/chunk_restore_test.go @@ -86,7 +86,7 @@ func (s *chunkRestoreSuite) TestDeliverLoopCancel() { ctx, cancel := context.WithCancel(context.Background()) kvsCh := make(chan []deliveredKVs) go cancel() - _, err := s.cr.deliverLoop(ctx, kvsCh, s.tr, 0, nil, nil, rc, nil, nil, nil) + _, err := s.cr.deliverLoop(ctx, kvsCh, s.tr, 0, nil, nil, rc) require.Equal(s.T(), context.Canceled, errors.Cause(err)) } @@ -124,7 +124,7 @@ func (s *chunkRestoreSuite) TestDeliverLoopEmptyData() { kvsCh := make(chan []deliveredKVs, 1) kvsCh <- []deliveredKVs{} - _, err = s.cr.deliverLoop(ctx, kvsCh, s.tr, 0, dataWriter, indexWriter, rc, dataEngine, indexEngine, nil) + _, err = s.cr.deliverLoop(ctx, kvsCh, s.tr, 0, dataWriter, indexWriter, rc) require.NoError(s.T(), err) } @@ -218,7 +218,7 @@ func (s *chunkRestoreSuite) TestDeliverLoop() { cfg := &config.Config{} rc := &Controller{cfg: cfg, saveCpCh: saveCpCh, backend: importer} - _, err = s.cr.deliverLoop(ctx, kvsCh, s.tr, 0, dataWriter, indexWriter, rc, dataEngine, indexEngine, &backend.LocalWriterConfig{}) + _, err = s.cr.deliverLoop(ctx, kvsCh, s.tr, 0, dataWriter, indexWriter, rc) require.NoError(s.T(), err) require.Len(s.T(), saveCpCh, 2) require.Equal(s.T(), int64(12), s.cr.chunk.Chunk.Offset) @@ -561,7 +561,7 @@ func (s *chunkRestoreSuite) TestRestore() { saveCpCh: saveCpCh, backend: importer, pauser: DeliverPauser, - }, dataEngine, indexEngine, &backend.LocalWriterConfig{}) + }) require.NoError(s.T(), err) require.Len(s.T(), saveCpCh, 2) } diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 427405161c6fa..5fc0a0e377510 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -2149,44 +2149,6 @@ type deliverResult struct { err error } -func (cr *chunkRestore) adjustRowID(rawData *deliveredKVs, t *TableRestore, pendingDataKVs, pendingIndexKVs *kv.Rows) bool { - data := rawData.kvs.(*kv.KvPairs) - res := false - curRowIDs := data.GetRowIDs() - illegalIdx := -1 - for i := range curRowIDs { - if curRowIDs[i] > cr.originalMaxRowID { - illegalIdx = i - break - } - } - if illegalIdx >= 0 { - if illegalIdx > 0 { - rawData.rowID = curRowIDs[illegalIdx-1] - } - invalidLen := len(curRowIDs) - illegalIdx - invalidKVs := data.Slice(illegalIdx) - if curRowIDs[illegalIdx] > cr.allocMaxRowID { - // re-allocate rowIDs - cr.allocRowIDBase, cr.allocMaxRowID = t.allocateRowIDs() - res = true - } - // set new rowIDs - for i := 0; i < invalidLen; i++ { - invalidKVs.SetRowID(i, cr.allocRowIDBase) - cr.allocRowIDBase++ - if cr.allocRowIDBase > cr.allocMaxRowID { - // allocated ids run out - cr.allocRowIDBase, cr.allocMaxRowID = t.allocateRowIDs() - res = true - } - } - var dataChecksum, indexChecksum verify.KVChecksum - invalidKVs.ClassifyAndAppend(pendingDataKVs, &dataChecksum, pendingIndexKVs, &indexChecksum) - } - return res -} - //nolint:nakedret // TODO: refactor func (cr *chunkRestore) deliverLoop( ctx context.Context, @@ -2195,8 +2157,6 @@ func (cr *chunkRestore) deliverLoop( engineID int32, dataWriter, indexWriter *backend.LocalEngineWriter, rc *Controller, - dataEngine, indexEngine *backend.OpenedEngine, - writerCfg *backend.LocalWriterConfig, ) (deliverTotalDur time.Duration, err error) { var channelClosed bool @@ -2209,8 +2169,6 @@ func (cr *chunkRestore) deliverLoop( // Fetch enough KV pairs from the source. 
dataKVs := rc.backend.MakeEmptyRows() indexKVs := rc.backend.MakeEmptyRows() - pendingDataKVs := rc.backend.MakeEmptyRows() - pendingIndexKVs := rc.backend.MakeEmptyRows() dataSynced := true for !channelClosed { @@ -2222,7 +2180,6 @@ func (cr *chunkRestore) deliverLoop( startOffset := cr.chunk.Chunk.Offset currOffset := startOffset rowID := cr.chunk.Chunk.PrevRowIDMax - reAlloc := false populate: for dataChecksum.SumSize()+indexChecksum.SumSize() < minDeliverBytes { select { @@ -2232,11 +2189,7 @@ func (cr *chunkRestore) deliverLoop( break populate } for _, p := range kvPacket { - reAlloc = cr.adjustRowID(&p, t, &pendingDataKVs, &pendingIndexKVs) p.kvs.ClassifyAndAppend(&dataKVs, &dataChecksum, &indexKVs, &indexChecksum) - if !reAlloc { - // combine pendingDataKVs and dataKVs - } columns = p.columns currOffset = p.offset rowID = p.rowID @@ -2310,13 +2263,6 @@ func (cr *chunkRestore) deliverLoop( // No need to save checkpoint if nothing was delivered. dataSynced = cr.maybeSaveCheckpoint(rc, t, engineID, cr.chunk, dataWriter, indexWriter) } - if reAlloc { - dataWriter, err = dataEngine.LocalWriter(ctx, writerCfg) - if err != nil { - return - } - indexWriter, err = indexEngine.LocalWriter(ctx, &backend.LocalWriterConfig{}) - } failpoint.Inject("SlowDownWriteRows", func() { deliverLogger.Warn("Slowed down write rows") }) @@ -2543,8 +2489,6 @@ func (cr *chunkRestore) restore( engineID int32, dataWriter, indexWriter *backend.LocalEngineWriter, rc *Controller, - dataEngine, indexEngine *backend.OpenedEngine, - writerCfg *backend.LocalWriterConfig, ) error { // Create the encoder. kvEncoder, err := rc.backend.NewEncoder(t.encTable, &kv.SessionOptions{ @@ -2569,7 +2513,7 @@ func (cr *chunkRestore) restore( go func() { defer close(deliverCompleteCh) - dur, err := cr.deliverLoop(ctx, kvsCh, t, engineID, dataWriter, indexWriter, rc, dataEngine, indexEngine, writerCfg) + dur, err := cr.deliverLoop(ctx, kvsCh, t, engineID, dataWriter, indexWriter, rc) select { case <-ctx.Done(): case deliverCompleteCh <- deliverResult{dur, err}: diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index e2f52a96e1d26..726e3cfd575a6 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -543,7 +543,7 @@ func (tr *TableRestore) restoreEngine( rc.regionWorkers.Recycle(w) }() metric.ChunkCounter.WithLabelValues(metric.ChunkStateRunning).Add(remainChunkCnt) - err := cr.restore(ctx, tr, engineID, dataWriter, indexWriter, rc, dataEngine, indexEngine, dataWriterCfg) + err := cr.restore(ctx, tr, engineID, dataWriter, indexWriter, rc) var dataFlushStatus, indexFlushStaus backend.ChunkFlushStatus if err == nil { dataFlushStatus, err = dataWriter.Close(ctx) From ab9db3232d2801b06d00ba91537fc67c6c0aae37 Mon Sep 17 00:00:00 2001 From: buchuitoudegou <756541536@qq.com> Date: Sat, 7 May 2022 11:43:56 +0800 Subject: [PATCH 04/21] fix: ut --- br/pkg/lightning/backend/kv/sql2kv.go | 26 ------ br/pkg/lightning/mydump/region.go | 7 +- br/pkg/lightning/mydump/region_test.go | 86 +++++++++++++++++++ br/pkg/lightning/restore/restore.go | 19 ++-- br/pkg/lightning/restore/table_restore.go | 21 ----- .../lightning/restore/table_restore_test.go | 59 ++++++------- 6 files changed, 127 insertions(+), 91 deletions(-) diff --git a/br/pkg/lightning/backend/kv/sql2kv.go b/br/pkg/lightning/backend/kv/sql2kv.go index e26031186fcec..cfee7eceaea7a 100644 --- a/br/pkg/lightning/backend/kv/sql2kv.go +++ b/br/pkg/lightning/backend/kv/sql2kv.go @@ -542,29 +542,3 @@ 
func (kvs *KvPairs) Clear() Rows { kvs.pairs = kvs.pairs[:0] return kvs } - -func (kvs *KvPairs) GetRowIDs() []int64 { - res := make([]int64, len(kvs.pairs)) - for _, kv := range kvs.pairs { - res = append(res, kv.RowID) - } - return res -} - -func (kvs *KvPairs) Slice(idx int) *KvPairs { - if idx >= len(kvs.pairs) { - return nil - } - res := &KvPairs{ - pairs: kvs.pairs[idx:], - } - kvs.pairs = kvs.pairs[:idx] - return res -} - -func (kvs *KvPairs) SetRowID(idx int, newRowID int64) { - if idx >= len(kvs.pairs) { - return - } - kvs.pairs[idx].RowID = newRowID -} diff --git a/br/pkg/lightning/mydump/region.go b/br/pkg/lightning/mydump/region.go index e112a0d7a6ec2..88ed7580ebc6c 100644 --- a/br/pkg/lightning/mydump/region.go +++ b/br/pkg/lightning/mydump/region.go @@ -272,7 +272,7 @@ func makeSourceFileRegion( if !isCsvFile { divisor += 2 } - sizePerRow, err := sampleAndGetAvgRowSize(&fi, cfg, ioWorkers, store) + sizePerRow, err := SampleAndGetAvgRowSize(&fi, cfg, ioWorkers, store) if err == nil { divisor = sizePerRow } @@ -307,7 +307,7 @@ func makeSourceFileRegion( return []*TableRegion{tableRegion}, []float64{float64(fi.FileMeta.FileSize)}, nil } -func sampleAndGetAvgRowSize( +func SampleAndGetAvgRowSize( fileInfo *FileInfo, cfg *config.Config, ioWorkers *worker.Pool, @@ -429,6 +429,9 @@ func SplitLargeFile( } for { curRowsCnt := (endOffset - startOffset) / divisor + if curRowsCnt == 0 && endOffset != startOffset { + curRowsCnt = 1 + } rowIDMax := prevRowIdxMax + curRowsCnt if endOffset != dataFile.FileMeta.FileSize { r, err := store.Open(ctx, dataFile.FileMeta.Path) diff --git a/br/pkg/lightning/mydump/region_test.go b/br/pkg/lightning/mydump/region_test.go index 89d0799a96ef0..401bae3f8f703 100644 --- a/br/pkg/lightning/mydump/region_test.go +++ b/br/pkg/lightning/mydump/region_test.go @@ -384,3 +384,89 @@ func TestSplitLargeFileOnlyOneChunk(t *testing.T) { require.Equal(t, columns, regions[i].Chunk.Columns) } } + +func TestSampleAndGetAvgRowSize(t *testing.T) { + // It's more difficult to estimate sizes of SQL files than csv files, + // because when reading the first row of them, parser may read other info (e.g. table name) + // so that make it hard to get good estimate, especially when files have few rows. + sqlFiles := []string{ + // 1. long table name, values: + // 1.1 short and even len + "INSERT INTO `test_db_mock_long.test_table_very_long_name` VALUES (1),(2);", + // 1.2 short and not even + "INSERT INTO `test_db_mock_long.test_table_very_long_name` VALUES (123452123,1234123125),(2,1);", + "INSERT INTO `test_db_mock_long.test_table_very_long_name` VALUES (2,1),(123452123,1234123125);", + // 1.3 long and even + "INSERT INTO `test_db_mock_long.test_table_very_long_name` VALUES (123452123,1234123125),(1234123125,12341231251);", + // 1.4 long but not even + "INSERT INTO `test_db_mock_long.test_table_very_long_name` VALUES ('abcdefghidgjla','lkjadsfasfdkjl'),('1111111','1');", + // 2. 
short table name, values: + // 2.1 short and even len + "INSERT INTO `a` VALUES (1),(2);", + // 2.2 short and not even + "INSERT INTO `a` VALUES (123452123,1234123125),(2,1);", + "INSERT INTO `a` VALUES (2,1),(123452123,1234123125);", + // 2.3 long and even + "INSERT INTO `a` VALUES (123452123,1234123125),(1234123125,12341231251);", + // 2.4 long but not even + "INSERT INTO `a` VALUES ('abcdefghidgjla','lkjadsfasfdkjl'),('1111111','1');", + } + + csvFiles := []string{ + // even and short + "a,b,c\r\n1,2,3\r\n4,5,6\r\n", + // not even but short + "a,b,c\r\n1112,1234,1923\r\n1,2,3", + // even and long + "a,b,c\r\n14712312,123122,1231233\r\n4456364,34525,423426\r\n", + // not even but long + "a,b,c\r\nsadlk;fja;lskdfj;alksdfj,sdlk;fjaksld;fja;l,qpoiwuepqou\r\n0,0,0\r\n", + } + testFunc := func(files []string, fileType SourceType) { + for _, file := range files { + dir := t.TempDir() + + var fileName string + if fileType == SourceTypeCSV { + fileName = "test.csv" + } else { + fileName = "test.sql" + } + filePath := filepath.Join(dir, fileName) + + content := []byte(file) + err := os.WriteFile(filePath, content, 0o644) + require.Nil(t, err) + dataFileInfo, err := os.Stat(filePath) + fileSize := dataFileInfo.Size() + + cfg := newConfigWithSourceDir(dir) + loader, _ := NewMyDumpLoader(context.Background(), cfg) + ioWorkers := worker.NewPool(context.Background(), 1, "io") + + // specify ReadBlockSize because we need to sample files + cfg.Mydumper.ReadBlockSize = config.ReadBlockSize + fileInfo := FileInfo{ + FileMeta: SourceFileMeta{ + Path: fileName, + Type: fileType, + FileSize: fileSize, + }, + } + cfg.Mydumper.CSV = config.CSVConfig{ + Separator: ",", + Delimiter: `"`, + Header: true, + NotNull: false, + Null: `\N`, + BackslashEscape: true, + TrimLastSep: false, + } + size, err := SampleAndGetAvgRowSize(&fileInfo, cfg, ioWorkers, loader.GetStore()) + require.Nil(t, err) + require.GreaterOrEqual(t, fileSize/size, int64(2)) + } + } + testFunc(sqlFiles, SourceTypeSQL) + testFunc(csvFiles, SourceTypeCSV) +} diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 49ef88e46281a..ea03e301eef50 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -2028,13 +2028,9 @@ func (rc *Controller) DataCheck(ctx context.Context) error { } type chunkRestore struct { - parser mydump.Parser - index int - chunk *checkpoints.ChunkCheckpoint - allocMaxRowID int64 // upper bound - allocRowIDBase int64 // lower bound - - originalMaxRowID int64 + parser mydump.Parser + index int + chunk *checkpoints.ChunkCheckpoint } func newChunkRestore( @@ -2091,12 +2087,9 @@ func newChunkRestore( } return &chunkRestore{ - parser: parser, - index: index, - chunk: chunk, - allocMaxRowID: chunk.Chunk.RowIDMax, - allocRowIDBase: chunk.Chunk.PrevRowIDMax, - originalMaxRowID: chunk.Chunk.RowIDMax, + parser: parser, + index: index, + chunk: chunk, }, nil } diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index e01b5b6736322..48057c0f17607 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -21,7 +21,6 @@ import ( "sync" "time" - "github.com/docker/go-units" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/br/pkg/lightning/backend" @@ -46,15 +45,6 @@ import ( "go.uber.org/zap" ) -const ( - defaultRowPerFile = 10 - fileCntThrd = 10 - fileByteThrd = 10 * units.KiB - // we are expecting the estimates are closed to the reality, - // 
hence, TableRestore re-allocates 100000 new RowIDs for extreme cases. - defaultNewAllocIDCnt = 100000 -) - type TableRestore struct { // The unique table name in the form "`db`.`tbl`". tableName string @@ -66,8 +56,6 @@ type TableRestore struct { logger log.Logger ignoreColumns map[string]struct{} - - curMaxRowID int64 } func NewTableRestore( @@ -110,9 +98,6 @@ func (tr *TableRestore) populateChunks(ctx context.Context, rc *Controller, cp * timestamp = int64(v.(int)) }) for _, chunk := range chunks { - if chunk.Chunk.RowIDMax > tr.curMaxRowID { - tr.curMaxRowID = chunk.Chunk.RowIDMax - } engine, found := cp.Engines[chunk.EngineID] if !found { engine = &checkpoints.EngineCheckpoint{ @@ -991,12 +976,6 @@ func (tr *TableRestore) analyzeTable(ctx context.Context, g glue.SQLExecutor) er return err } -func (tr *TableRestore) allocateRowIDs() (int64, int64) { - newBase := tr.curMaxRowID + 1 - tr.curMaxRowID += defaultNewAllocIDCnt - return newBase, tr.curMaxRowID -} - // estimate SST files compression threshold by total row file size // with a higher compression threshold, the compression time increases, but the iteration time decreases. // Try to limit the total SST files number under 500. But size compress 32GB SST files cost about 20min, diff --git a/br/pkg/lightning/restore/table_restore_test.go b/br/pkg/lightning/restore/table_restore_test.go index dd3b081b2854b..b4bf1d36fc2f2 100644 --- a/br/pkg/lightning/restore/table_restore_test.go +++ b/br/pkg/lightning/restore/table_restore_test.go @@ -197,6 +197,7 @@ func (s *tableRestoreSuite) TestPopulateChunks() { Engines: make(map[int32]*checkpoints.EngineCheckpoint), } + s.cfg.Mydumper.CSV.Header = false rc := &Controller{cfg: s.cfg, ioWorkers: worker.NewPool(context.Background(), 1, "io"), store: s.store} err := s.tr.populateChunks(context.Background(), rc, cp) require.NoError(s.T(), err) @@ -215,7 +216,7 @@ func (s *tableRestoreSuite) TestPopulateChunks() { Offset: 0, EndOffset: 37, PrevRowIDMax: 0, - RowIDMax: 7, // 37 bytes with 3 columns can store at most 7 rows. 
+ RowIDMax: 1, }, Timestamp: 1234567897, }, @@ -225,8 +226,8 @@ func (s *tableRestoreSuite) TestPopulateChunks() { Chunk: mydump.Chunk{ Offset: 0, EndOffset: 37, - PrevRowIDMax: 7, - RowIDMax: 14, + PrevRowIDMax: 1, + RowIDMax: 2, }, Timestamp: 1234567897, }, @@ -236,8 +237,8 @@ func (s *tableRestoreSuite) TestPopulateChunks() { Chunk: mydump.Chunk{ Offset: 0, EndOffset: 37, - PrevRowIDMax: 14, - RowIDMax: 21, + PrevRowIDMax: 2, + RowIDMax: 3, }, Timestamp: 1234567897, }, @@ -252,8 +253,8 @@ func (s *tableRestoreSuite) TestPopulateChunks() { Chunk: mydump.Chunk{ Offset: 0, EndOffset: 37, - PrevRowIDMax: 21, - RowIDMax: 28, + PrevRowIDMax: 3, + RowIDMax: 4, }, Timestamp: 1234567897, }, @@ -263,8 +264,8 @@ func (s *tableRestoreSuite) TestPopulateChunks() { Chunk: mydump.Chunk{ Offset: 0, EndOffset: 37, - PrevRowIDMax: 28, - RowIDMax: 35, + PrevRowIDMax: 4, + RowIDMax: 5, }, Timestamp: 1234567897, }, @@ -274,8 +275,8 @@ func (s *tableRestoreSuite) TestPopulateChunks() { Chunk: mydump.Chunk{ Offset: 0, EndOffset: 37, - PrevRowIDMax: 35, - RowIDMax: 42, + PrevRowIDMax: 5, + RowIDMax: 6, }, Timestamp: 1234567897, }, @@ -290,8 +291,8 @@ func (s *tableRestoreSuite) TestPopulateChunks() { Chunk: mydump.Chunk{ Offset: 0, EndOffset: 14, - PrevRowIDMax: 42, - RowIDMax: 46, + PrevRowIDMax: 6, + RowIDMax: 10, }, Timestamp: 1234567897, }, @@ -471,7 +472,7 @@ func (s *tableRestoreSuite) TestPopulateChunksCSVHeader() { Offset: 0, EndOffset: 14, PrevRowIDMax: 0, - RowIDMax: 4, // 37 bytes with 3 columns can store at most 7 rows. + RowIDMax: 4, // 14 bytes and 3 byte for each row }, Timestamp: 1234567897, }, @@ -482,7 +483,7 @@ func (s *tableRestoreSuite) TestPopulateChunksCSVHeader() { Offset: 0, EndOffset: 10, PrevRowIDMax: 4, - RowIDMax: 7, + RowIDMax: 9, // 10 bytes and 2 byte for each row }, Timestamp: 1234567897, }, @@ -493,8 +494,8 @@ func (s *tableRestoreSuite) TestPopulateChunksCSVHeader() { Chunk: mydump.Chunk{ Offset: 6, EndOffset: 52, - PrevRowIDMax: 7, - RowIDMax: 20, + PrevRowIDMax: 9, + RowIDMax: 13, Columns: []string{"a", "b", "c"}, }, @@ -507,8 +508,8 @@ func (s *tableRestoreSuite) TestPopulateChunksCSVHeader() { Chunk: mydump.Chunk{ Offset: 52, EndOffset: 60, - PrevRowIDMax: 20, - RowIDMax: 22, + PrevRowIDMax: 13, + RowIDMax: 14, Columns: []string{"a", "b", "c"}, }, Timestamp: 1234567897, @@ -520,8 +521,8 @@ func (s *tableRestoreSuite) TestPopulateChunksCSVHeader() { Chunk: mydump.Chunk{ Offset: 6, EndOffset: 48, - PrevRowIDMax: 22, - RowIDMax: 35, + PrevRowIDMax: 14, + RowIDMax: 17, Columns: []string{"c", "a", "b"}, }, Timestamp: 1234567897, @@ -538,8 +539,8 @@ func (s *tableRestoreSuite) TestPopulateChunksCSVHeader() { Chunk: mydump.Chunk{ Offset: 48, EndOffset: 101, - PrevRowIDMax: 35, - RowIDMax: 48, + PrevRowIDMax: 17, + RowIDMax: 20, Columns: []string{"c", "a", "b"}, }, Timestamp: 1234567897, @@ -551,8 +552,8 @@ func (s *tableRestoreSuite) TestPopulateChunksCSVHeader() { Chunk: mydump.Chunk{ Offset: 101, EndOffset: 102, - PrevRowIDMax: 48, - RowIDMax: 48, + PrevRowIDMax: 20, + RowIDMax: 21, Columns: []string{"c", "a", "b"}, }, Timestamp: 1234567897, @@ -564,8 +565,8 @@ func (s *tableRestoreSuite) TestPopulateChunksCSVHeader() { Chunk: mydump.Chunk{ Offset: 4, EndOffset: 59, - PrevRowIDMax: 48, - RowIDMax: 61, + PrevRowIDMax: 21, + RowIDMax: 23, Columns: []string{"b", "c"}, }, Timestamp: 1234567897, @@ -582,8 +583,8 @@ func (s *tableRestoreSuite) TestPopulateChunksCSVHeader() { Chunk: mydump.Chunk{ Offset: 59, EndOffset: 60, - PrevRowIDMax: 61, - RowIDMax: 61, + PrevRowIDMax: 23, + 
RowIDMax: 24, Columns: []string{"b", "c"}, }, Timestamp: 1234567897, From 6638d9c99ca1b01acf4b2a5a34db78c2ef7c8229 Mon Sep 17 00:00:00 2001 From: buchuitoudegou <756541536@qq.com> Date: Sat, 7 May 2022 14:21:50 +0800 Subject: [PATCH 05/21] try fix it --- br/pkg/lightning/mydump/region.go | 8 ++++++-- br/pkg/lightning/mydump/region_test.go | 1 + br/tests/lightning_auto_random_default/run.sh | 8 ++++---- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/br/pkg/lightning/mydump/region.go b/br/pkg/lightning/mydump/region.go index 88ed7580ebc6c..2d456f19308ca 100644 --- a/br/pkg/lightning/mydump/region.go +++ b/br/pkg/lightning/mydump/region.go @@ -273,7 +273,7 @@ func makeSourceFileRegion( divisor += 2 } sizePerRow, err := SampleAndGetAvgRowSize(&fi, cfg, ioWorkers, store) - if err == nil { + if err == nil && sizePerRow != 0 { divisor = sizePerRow } // If a csv file is overlarge, we need to split it into multiple regions. @@ -347,7 +347,11 @@ func SampleAndGetAvgRowSize( totalBytes += parser.LastRow().Length totalRows++ } - return int64(totalBytes) / int64(totalRows), nil + if totalRows > 0 { + return int64(totalBytes) / int64(totalRows), nil + } else { + return 0, nil + } } // because parquet files can't seek efficiently, there is no benefit in split. diff --git a/br/pkg/lightning/mydump/region_test.go b/br/pkg/lightning/mydump/region_test.go index 401bae3f8f703..e2c6d69271dc1 100644 --- a/br/pkg/lightning/mydump/region_test.go +++ b/br/pkg/lightning/mydump/region_test.go @@ -438,6 +438,7 @@ func TestSampleAndGetAvgRowSize(t *testing.T) { err := os.WriteFile(filePath, content, 0o644) require.Nil(t, err) dataFileInfo, err := os.Stat(filePath) + require.Nil(t, err) fileSize := dataFileInfo.Size() cfg := newConfigWithSourceDir(dir) diff --git a/br/tests/lightning_auto_random_default/run.sh b/br/tests/lightning_auto_random_default/run.sh index 432582bf72e24..2d296620f8c12 100644 --- a/br/tests/lightning_auto_random_default/run.sh +++ b/br/tests/lightning_auto_random_default/run.sh @@ -40,10 +40,10 @@ for backend in tidb importer local; do check_contains 'inc: 6' NEXT_AUTO_RAND_VAL=7 else - check_contains 'inc: 25' - check_contains 'inc: 26' - check_contains 'inc: 27' - NEXT_AUTO_RAND_VAL=28 + check_contains 'inc: 6' + check_contains 'inc: 7' + check_contains 'inc: 8' + NEXT_AUTO_RAND_VAL=9 fi # tidb backend randomly generate the auto-random bit for each statement, so with 2 statements, From 2fca896ad5ce608f197685bfad27346e3cac2cf3 Mon Sep 17 00:00:00 2001 From: buchuitoudegou <756541536@qq.com> Date: Wed, 11 May 2022 16:31:02 +0800 Subject: [PATCH 06/21] feat: realloc id when run out temp refactor: realloc id feat: refine it & ut --- Makefile | 8 ++ br/pkg/lightning/backend/kv/sql2kv.go | 13 ++++ br/pkg/lightning/backend/local/engine.go | 51 +++++++++++++ br/pkg/lightning/backend/local/engine_test.go | 74 +++++++++++++++++++ br/pkg/lightning/mydump/region.go | 5 ++ .../lightning/restore/chunk_restore_test.go | 2 +- br/pkg/lightning/restore/restore.go | 32 ++++++-- br/pkg/lightning/restore/table_restore.go | 17 ++++- br/tests/lightning_realloc_id/config.toml | 2 + .../data/db-schema-create.sql | 1 + .../data/db.test-schema.sql | 4 + .../data/db.test.000000000.csv | 11 +++ .../data/db.test.000000001.sql | 11 +++ br/tests/lightning_realloc_id/run.sh | 39 ++++++++++ 14 files changed, 262 insertions(+), 8 deletions(-) create mode 100644 br/tests/lightning_realloc_id/config.toml create mode 100644 br/tests/lightning_realloc_id/data/db-schema-create.sql create mode 100644 
br/tests/lightning_realloc_id/data/db.test-schema.sql create mode 100644 br/tests/lightning_realloc_id/data/db.test.000000000.csv create mode 100644 br/tests/lightning_realloc_id/data/db.test.000000001.sql create mode 100644 br/tests/lightning_realloc_id/run.sh diff --git a/Makefile b/Makefile index 7cb288823fefb..f41ee25518874 100644 --- a/Makefile +++ b/Makefile @@ -332,6 +332,14 @@ build_for_br_integration_test: ) || (make failpoint-disable && exit 1) @make failpoint-disable +build_for_lightning_test: + @make failpoint-enable + $(GOTEST) -c -cover -covermode=count \ + -coverpkg=github.com/pingcap/tidb/br/... \ + -o $(LIGHTNING_BIN).test \ + github.com/pingcap/tidb/br/cmd/tidb-lightning + @make failpoint-disable + br_unit_test: export ARGS=$$($(BR_PACKAGES)) br_unit_test: @make failpoint-enable diff --git a/br/pkg/lightning/backend/kv/sql2kv.go b/br/pkg/lightning/backend/kv/sql2kv.go index cfee7eceaea7a..481d25b2da73a 100644 --- a/br/pkg/lightning/backend/kv/sql2kv.go +++ b/br/pkg/lightning/backend/kv/sql2kv.go @@ -542,3 +542,16 @@ func (kvs *KvPairs) Clear() Rows { kvs.pairs = kvs.pairs[:0] return kvs } + +func (kvs *KvPairs) GetRowIDToKv() map[int64][]*common.KvPair { + res := make(map[int64][]*common.KvPair, 0) + for i, p := range kvs.pairs { + if _, ok := res[p.RowID]; !ok { + res[p.RowID] = make([]*common.KvPair, 0) + res[p.RowID] = append(res[p.RowID], &kvs.pairs[i]) + } else { + res[p.RowID] = append(res[p.RowID], &kvs.pairs[i]) + } + } + return res +} diff --git a/br/pkg/lightning/backend/local/engine.go b/br/pkg/lightning/backend/local/engine.go index 82ebc4c4c3e65..eefde3c14adb0 100644 --- a/br/pkg/lightning/backend/local/engine.go +++ b/br/pkg/lightning/backend/local/engine.go @@ -33,6 +33,7 @@ import ( "github.com/google/btree" "github.com/google/uuid" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" @@ -1000,6 +1001,25 @@ type Writer struct { batchSize int64 lastMetaSeq int32 + prevRowID int64 // only used for appendRowsSorted +} + +func (w *Writer) flushAndNewWriter() error { + var err error + failpoint.Inject("MockFlushWriter", func() { + failpoint.Goto("CreateNewWriter") + }) + err = w.flush(context.Background()) + if err != nil { + return errors.Trace(err) + } + failpoint.Label("CreateNewWriter") + newWriter, err := w.createSSTWriter() + if err != nil { + return errors.Trace(err) + } + w.writer = newWriter + return nil } func (w *Writer) appendRowsSorted(kvs []common.KvPair) error { @@ -1010,6 +1030,14 @@ func (w *Writer) appendRowsSorted(kvs []common.KvPair) error { } w.writer = writer } + if len(kvs) > 0 && w.prevRowID != 0 && kvs[0].RowID > w.prevRowID+1 { + // rowID leap. 
probably re-alloc id + // should write to different sst + err := w.flushAndNewWriter() + if err != nil { + return err + } + } keyAdapter := w.engine.keyAdapter totalKeySize := 0 @@ -1034,6 +1062,29 @@ func (w *Writer) appendRowsSorted(kvs []common.KvPair) error { } kvs = newKvs } + sliceIdx := -1 + for i := 1; i < len(kvs); i++ { + if kvs[i].RowID > kvs[i-1].RowID+1 && sliceIdx == -1 { + // rowID leap, probably re-alloc id + // should write to different sst + sliceIdx = i + } + if i == len(kvs)-1 { + w.prevRowID = kvs[i].RowID + } + } + if sliceIdx > 0 { + oldKvs := kvs[:sliceIdx] + kvs = kvs[sliceIdx:] + err := w.writer.writeKVs(oldKvs) + if err != nil { + return err + } + err = w.flushAndNewWriter() + if err != nil { + return err + } + } return w.writer.writeKVs(kvs) } diff --git a/br/pkg/lightning/backend/local/engine_test.go b/br/pkg/lightning/backend/local/engine_test.go index d78aa29ee1c36..3d13d4f0e8c4d 100644 --- a/br/pkg/lightning/backend/local/engine_test.go +++ b/br/pkg/lightning/backend/local/engine_test.go @@ -28,7 +28,9 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/require" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/br/pkg/lightning/backend" + "github.com/pingcap/tidb/br/pkg/lightning/common" ) func TestIngestSSTWithClosedEngine(t *testing.T) { @@ -83,3 +85,75 @@ func TestIngestSSTWithClosedEngine(t *testing.T) { }, }), errorEngineClosed) } + +func TestAutoSplitSST(t *testing.T) { + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/MockFlushWriter", "return(true)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/MockFlushWriter")) + }() + var err error + dir := os.TempDir() + w := &Writer{ + engine: &Engine{ + sstDir: dir, + keyAdapter: noopKeyAdapter{}, + }, + isKVSorted: true, + isWriteBatchSorted: true, + } + w.engine.closed.Store(false) + w.writer, err = w.createSSTWriter() + require.Nil(t, err) + kvs := []common.KvPair{ + { + Key: []byte("1"), + Val: []byte("val1"), + RowID: 1, + }, + { + Key: []byte("2"), + Val: []byte("val1"), + RowID: 2, + }, + } + prevWriter := w.writer + err = w.appendRowsSorted(kvs) + require.Nil(t, err) + require.True(t, prevWriter == w.writer) + kvs = []common.KvPair{ + { + Key: []byte("10"), + Val: []byte("val10"), + RowID: 10, + }, + { + Key: []byte("11"), + Val: []byte("val11"), + RowID: 11, + }, + } + err = w.appendRowsSorted(kvs) + require.Nil(t, err) + require.False(t, prevWriter == w.writer) // id leap, should flush and create + prevWriter = w.writer + kvs = []common.KvPair{ + { + Key: []byte("12"), + Val: []byte("val12"), + RowID: 10, + }, + { + Key: []byte("13"), + Val: []byte("val13"), + RowID: 11, + }, + { + Key: []byte("15"), + Val: []byte("val15"), + RowID: 15, + }, + } + err = w.appendRowsSorted(kvs) + require.Nil(t, err) + require.False(t, prevWriter == w.writer) // id leap, should flush and create +} diff --git a/br/pkg/lightning/mydump/region.go b/br/pkg/lightning/mydump/region.go index 2d456f19308ca..d4bb8e91ac628 100644 --- a/br/pkg/lightning/mydump/region.go +++ b/br/pkg/lightning/mydump/region.go @@ -22,6 +22,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/lightning/worker" @@ -297,6 +298,10 @@ func makeSourceFileRegion( RowIDMax: fi.FileMeta.FileSize / divisor, }, } + failpoint.Inject("MockInaccurateRowID", func() { + // only 
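
The Writer changes in engine.go above split a sorted KV batch whenever consecutive row IDs stop being contiguous, flushing the current SST writer and rotating to a fresh one so that re-allocated row IDs cannot break the engine's sorted-write requirement. A minimal sketch of that leap detection against a hypothetical flusher interface standing in for the SST writer:

package writer

// pair is a pared-down KV pair; only the row ID matters for leap detection.
type pair struct{ RowID int64 }

// flusher stands in for the engine's SST writer: Write appends a sorted run,
// Rotate flushes the current file and starts a new one.
type flusher interface {
    Write(kvs []pair) error
    Rotate() error
}

// writeSorted writes kvs, rotating the writer at the first position where
// the row IDs are no longer consecutive, and returns the last row ID seen.
func writeSorted(f flusher, kvs []pair, prevRowID int64) (int64, error) {
    if len(kvs) == 0 {
        return prevRowID, nil
    }
    // Leap between the previous batch and this one.
    if prevRowID != 0 && kvs[0].RowID > prevRowID+1 {
        if err := f.Rotate(); err != nil {
            return prevRowID, err
        }
    }
    // Leap inside this batch.
    split := -1
    for i := 1; i < len(kvs); i++ {
        if kvs[i].RowID > kvs[i-1].RowID+1 {
            split = i
            break
        }
    }
    last := kvs[len(kvs)-1].RowID
    if split > 0 {
        if err := f.Write(kvs[:split]); err != nil {
            return prevRowID, err
        }
        if err := f.Rotate(); err != nil {
            return prevRowID, err
        }
        kvs = kvs[split:]
    }
    return last, f.Write(kvs)
}
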
allocates 5 rows but contains 10 rows + tableRegion.Chunk.RowIDMax = 5 + }) if tableRegion.Size() > tableRegionSizeWarningThreshold { log.L().Warn( diff --git a/br/pkg/lightning/restore/chunk_restore_test.go b/br/pkg/lightning/restore/chunk_restore_test.go index e32e7b57a15e0..09d9e520f560d 100644 --- a/br/pkg/lightning/restore/chunk_restore_test.go +++ b/br/pkg/lightning/restore/chunk_restore_test.go @@ -72,7 +72,7 @@ func (s *chunkRestoreSuite) SetupTest() { } var err error - s.cr, err = newChunkRestore(context.Background(), 1, s.cfg, &chunk, w, s.store, nil) + s.cr, err = newChunkRestore(context.Background(), 1, s.cfg, &chunk, w, s.store, nil, nil) require.NoError(s.T(), err) } diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index ea03e301eef50..81050ca360089 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -2028,9 +2028,13 @@ func (rc *Controller) DataCheck(ctx context.Context) error { } type chunkRestore struct { - parser mydump.Parser - index int - chunk *checkpoints.ChunkCheckpoint + parser mydump.Parser + index int + chunk *checkpoints.ChunkCheckpoint + originalRowIDMax int64 + curRowIDBase int64 + curRowIDMax int64 + tableRestore *TableRestore } func newChunkRestore( @@ -2041,6 +2045,7 @@ func newChunkRestore( ioWorkers *worker.Pool, store storage.ExternalStorage, tableInfo *checkpoints.TidbTableInfo, + tableRestore *TableRestore, ) (*chunkRestore, error) { blockBufSize := int64(cfg.Mydumper.ReadBlockSize) @@ -2087,9 +2092,11 @@ func newChunkRestore( } return &chunkRestore{ - parser: parser, - index: index, - chunk: chunk, + parser: parser, + index: index, + chunk: chunk, + originalRowIDMax: chunk.Chunk.RowIDMax, + tableRestore: tableRestore, }, nil } @@ -2142,6 +2149,17 @@ type deliverResult struct { err error } +func (cr *chunkRestore) adjustRowID(rowID *int64) { + if *rowID > cr.originalRowIDMax { + if cr.curRowIDBase >= cr.curRowIDMax { + // reallocate rowID + cr.curRowIDBase, cr.curRowIDMax = cr.tableRestore.allocateRowIDs() + } + *rowID = cr.curRowIDBase + cr.curRowIDBase++ + } +} + //nolint:nakedret // TODO: refactor func (cr *chunkRestore) deliverLoop( ctx context.Context, @@ -2422,6 +2440,8 @@ func (cr *chunkRestore) encodeLoop( encodeDurStart := time.Now() lastRow := cr.parser.LastRow() // sql -> kv + cr.adjustRowID(&lastRow.RowID) + rowID = lastRow.RowID kvs, encodeErr := kvEncoder.Encode(logger, lastRow.Row, lastRow.RowID, cr.chunk.ColumnPermutation, cr.chunk.Key.Path, curOffset) encodeDur += time.Since(encodeDurStart) diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index 48057c0f17607..ec83699ab22c4 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -45,6 +45,8 @@ import ( "go.uber.org/zap" ) +const defaultNewAllocIDCnt = 100000 + type TableRestore struct { // The unique table name in the form "`db`.`tbl`". 
tableName string @@ -56,6 +58,8 @@ type TableRestore struct { logger log.Logger ignoreColumns map[string]struct{} + rowIDLock sync.Mutex + curMaxRowID int64 } func NewTableRestore( @@ -98,6 +102,9 @@ func (tr *TableRestore) populateChunks(ctx context.Context, rc *Controller, cp * timestamp = int64(v.(int)) }) for _, chunk := range chunks { + if chunk.Chunk.RowIDMax > tr.curMaxRowID { + tr.curMaxRowID = chunk.Chunk.RowIDMax + } engine, found := cp.Engines[chunk.EngineID] if !found { engine = &checkpoints.EngineCheckpoint{ @@ -493,7 +500,7 @@ func (tr *TableRestore) restoreEngine( // 2. sql -> kvs // 3. load kvs data (into kv deliver server) // 4. flush kvs data (into tikv node) - cr, err := newChunkRestore(ctx, chunkIndex, rc.cfg, chunk, rc.ioWorkers, rc.store, tr.tableInfo) + cr, err := newChunkRestore(ctx, chunkIndex, rc.cfg, chunk, rc.ioWorkers, rc.store, tr.tableInfo, tr) if err != nil { setError(err) break @@ -1012,3 +1019,11 @@ func estimateCompactionThreshold(cp *checkpoints.TableCheckpoint, factor int64) return threshold } + +func (tr *TableRestore) allocateRowIDs() (int64, int64) { + tr.rowIDLock.Lock() + defer tr.rowIDLock.Unlock() + newBase := tr.curMaxRowID + 1 + tr.curMaxRowID += defaultNewAllocIDCnt + return newBase, tr.curMaxRowID +} diff --git a/br/tests/lightning_realloc_id/config.toml b/br/tests/lightning_realloc_id/config.toml new file mode 100644 index 0000000000000..d2152b47c922a --- /dev/null +++ b/br/tests/lightning_realloc_id/config.toml @@ -0,0 +1,2 @@ +[tikv-importer] +backend = 'local' diff --git a/br/tests/lightning_realloc_id/data/db-schema-create.sql b/br/tests/lightning_realloc_id/data/db-schema-create.sql new file mode 100644 index 0000000000000..c88b0e3150e76 --- /dev/null +++ b/br/tests/lightning_realloc_id/data/db-schema-create.sql @@ -0,0 +1 @@ +create database db; \ No newline at end of file diff --git a/br/tests/lightning_realloc_id/data/db.test-schema.sql b/br/tests/lightning_realloc_id/data/db.test-schema.sql new file mode 100644 index 0000000000000..0490cd81e1c2e --- /dev/null +++ b/br/tests/lightning_realloc_id/data/db.test-schema.sql @@ -0,0 +1,4 @@ +create table db.test( + id int auto_increment unique key, + a int primary key +); \ No newline at end of file diff --git a/br/tests/lightning_realloc_id/data/db.test.000000000.csv b/br/tests/lightning_realloc_id/data/db.test.000000000.csv new file mode 100644 index 0000000000000..f2ce71fb561c5 --- /dev/null +++ b/br/tests/lightning_realloc_id/data/db.test.000000000.csv @@ -0,0 +1,11 @@ +a +100 +101 +102 +103 +104 +105 +106 +107 +108 +109 \ No newline at end of file diff --git a/br/tests/lightning_realloc_id/data/db.test.000000001.sql b/br/tests/lightning_realloc_id/data/db.test.000000001.sql new file mode 100644 index 0000000000000..611b5f5dbeba6 --- /dev/null +++ b/br/tests/lightning_realloc_id/data/db.test.000000001.sql @@ -0,0 +1,11 @@ +insert into db.test(a) values +(200), +(201), +(202), +(203), +(204), +(205), +(206), +(207), +(208), +(209); \ No newline at end of file diff --git a/br/tests/lightning_realloc_id/run.sh b/br/tests/lightning_realloc_id/run.sh new file mode 100644 index 0000000000000..d56392b44f110 --- /dev/null +++ b/br/tests/lightning_realloc_id/run.sh @@ -0,0 +1,39 @@ +#!/bin/sh +# +# Copyright 2022 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Basic check for whether partitioned tables work. + +set -eu +check_cluster_version 4 0 0 'local backend' + +function check_result() { + run_sql 'SHOW DATABASES;' + check_contains 'Database: db'; + run_sql 'SHOW TABLES IN db;' + check_contains 'Tables_in_db: test' + run_sql 'SELECT count(*) FROM db.test;' + check_contains 'count(*): 20' + run_sql 'SELECT * FROM db.test;' + check_contains 'id: 100015' + check_contains 'id: 15' +} + +run_sql 'DROP DATABASE IF EXISTS db;' +export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/mydump/MockInaccurateRowID=return(true)' +run_lightning --config "tests/$TEST_NAME/config.toml" +check_result +run_sql 'DROP DATABASE IF EXISTS db;' +unset GO_FAILPOINTS \ No newline at end of file From 9251a9c2bd309e8847e2d836ce7d49e0cfec6f69 Mon Sep 17 00:00:00 2001 From: buchuitoudegou <756541536@qq.com> Date: Mon, 23 May 2022 12:39:19 +0800 Subject: [PATCH 07/21] fix: parallel import --- br/pkg/lightning/restore/restore.go | 42 +++++++++++++++---- br/pkg/lightning/restore/table_restore.go | 25 +++++++---- br/tests/lightning_realloc_id/config.toml | 1 + .../data1/db-schema-create.sql | 1 + .../data1/db.test-schema.sql | 4 ++ .../data1/db.test.000000000.csv | 11 +++++ .../data1/db.test.000000001.sql | 11 +++++ br/tests/lightning_realloc_id/run.sh | 30 ++++++++++++- 8 files changed, 110 insertions(+), 15 deletions(-) create mode 100644 br/tests/lightning_realloc_id/data1/db-schema-create.sql create mode 100644 br/tests/lightning_realloc_id/data1/db.test-schema.sql create mode 100644 br/tests/lightning_realloc_id/data1/db.test.000000000.csv create mode 100644 br/tests/lightning_realloc_id/data1/db.test.000000001.sql diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 81050ca360089..2da5e1b90eaa9 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -1587,6 +1587,9 @@ func (tr *TableRestore) restoreTable( rowIDMax = engine.Chunks[len(engine.Chunks)-1].Chunk.RowIDMax } } + if rowIDMax > tr.curMaxRowID { + tr.curMaxRowID = rowIDMax + } db, _ := rc.tidbGlue.GetDB() versionStr, err := version.FetchVersion(ctx, db) if err != nil { @@ -2035,6 +2038,9 @@ type chunkRestore struct { curRowIDBase int64 curRowIDMax int64 tableRestore *TableRestore + + rowCount int + avgRowSize int } func newChunkRestore( @@ -2149,15 +2155,36 @@ type deliverResult struct { err error } -func (cr *chunkRestore) adjustRowID(rowID *int64) { - if *rowID > cr.originalRowIDMax { - if cr.curRowIDBase >= cr.curRowIDMax { +func (cr *chunkRestore) adjustRowID(rowID int64, rc *Controller) int64 { + if rowID > cr.originalRowIDMax { + if cr.curRowIDBase == 0 || cr.curRowIDBase > cr.curRowIDMax { // reallocate rowID - cr.curRowIDBase, cr.curRowIDMax = cr.tableRestore.allocateRowIDs() - } - *rowID = cr.curRowIDBase + pos, _ := cr.parser.Pos() + leftFileSize := cr.chunk.Chunk.EndOffset - pos + newRowIDCount := leftFileSize/int64(cr.avgRowSize) + 1 // plus the current row + newBase, newMax, err := cr.tableRestore.allocateRowIDs(newRowIDCount, rc) + if err != nil { + logger := cr.tableRestore.logger.With( + 
zap.String("tableName", cr.tableRestore.tableName), + zap.Int("fileIndex", cr.index), + zap.Stringer("path", &cr.chunk.Key), + zap.String("task", "re-allocate rowID"), + ) + logger.Error("fail to re-allocate rowIDs", zap.Error(err)) + return rowID + } + cr.curRowIDBase = newBase + cr.curRowIDMax = newMax + } + rowID = cr.curRowIDBase cr.curRowIDBase++ } + return rowID +} + +func (cr *chunkRestore) updateRowStats(rowSize int) { + cr.avgRowSize = (cr.avgRowSize*cr.rowCount + rowSize) / (cr.rowCount + 1) + cr.rowCount++ } //nolint:nakedret // TODO: refactor @@ -2440,7 +2467,8 @@ func (cr *chunkRestore) encodeLoop( encodeDurStart := time.Now() lastRow := cr.parser.LastRow() // sql -> kv - cr.adjustRowID(&lastRow.RowID) + lastRow.RowID = cr.adjustRowID(lastRow.RowID, rc) + cr.updateRowStats(lastRow.Length) rowID = lastRow.RowID kvs, encodeErr := kvEncoder.Encode(logger, lastRow.Row, lastRow.RowID, cr.chunk.ColumnPermutation, cr.chunk.Key.Path, curOffset) encodeDur += time.Since(encodeDurStart) diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index ec83699ab22c4..001c0c828b14e 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -102,9 +102,6 @@ func (tr *TableRestore) populateChunks(ctx context.Context, rc *Controller, cp * timestamp = int64(v.(int)) }) for _, chunk := range chunks { - if chunk.Chunk.RowIDMax > tr.curMaxRowID { - tr.curMaxRowID = chunk.Chunk.RowIDMax - } engine, found := cp.Engines[chunk.EngineID] if !found { engine = &checkpoints.EngineCheckpoint{ @@ -150,6 +147,9 @@ func (tr *TableRestore) RebaseChunkRowIDs(cp *checkpoints.TableCheckpoint, rowID for _, chunk := range engine.Chunks { chunk.Chunk.PrevRowIDMax += rowIDBase chunk.Chunk.RowIDMax += rowIDBase + if chunk.Chunk.RowIDMax > tr.curMaxRowID { + tr.curMaxRowID = chunk.Chunk.RowIDMax + } } } } @@ -1020,10 +1020,21 @@ func estimateCompactionThreshold(cp *checkpoints.TableCheckpoint, factor int64) return threshold } -func (tr *TableRestore) allocateRowIDs() (int64, int64) { +func (tr *TableRestore) allocateRowIDs(newRowCount int64, rc *Controller) (int64, int64, error) { tr.rowIDLock.Lock() defer tr.rowIDLock.Unlock() - newBase := tr.curMaxRowID + 1 - tr.curMaxRowID += defaultNewAllocIDCnt - return newBase, tr.curMaxRowID + metaMgr := rc.metaMgrBuilder.TableMetaMgr(tr) + _, newBase, err := metaMgr.AllocTableRowIDs(context.Background(), newRowCount) + if err != nil { + return 0, 0, err + } + if newBase != 0 { + // re-alloc from downstream + tr.curMaxRowID = newBase + newRowCount + return newBase, newBase + newRowCount, nil + } else { + prevBase := tr.curMaxRowID + 1 + tr.curMaxRowID += newRowCount + return prevBase, tr.curMaxRowID, nil + } } diff --git a/br/tests/lightning_realloc_id/config.toml b/br/tests/lightning_realloc_id/config.toml index d2152b47c922a..f32593b43b798 100644 --- a/br/tests/lightning_realloc_id/config.toml +++ b/br/tests/lightning_realloc_id/config.toml @@ -1,2 +1,3 @@ [tikv-importer] +incremental-import = true backend = 'local' diff --git a/br/tests/lightning_realloc_id/data1/db-schema-create.sql b/br/tests/lightning_realloc_id/data1/db-schema-create.sql new file mode 100644 index 0000000000000..c88b0e3150e76 --- /dev/null +++ b/br/tests/lightning_realloc_id/data1/db-schema-create.sql @@ -0,0 +1 @@ +create database db; \ No newline at end of file diff --git a/br/tests/lightning_realloc_id/data1/db.test-schema.sql b/br/tests/lightning_realloc_id/data1/db.test-schema.sql new file mode 100644 index 
0000000000000..0490cd81e1c2e --- /dev/null +++ b/br/tests/lightning_realloc_id/data1/db.test-schema.sql @@ -0,0 +1,4 @@ +create table db.test( + id int auto_increment unique key, + a int primary key +); \ No newline at end of file diff --git a/br/tests/lightning_realloc_id/data1/db.test.000000000.csv b/br/tests/lightning_realloc_id/data1/db.test.000000000.csv new file mode 100644 index 0000000000000..70ae8fd5a20a7 --- /dev/null +++ b/br/tests/lightning_realloc_id/data1/db.test.000000000.csv @@ -0,0 +1,11 @@ +a +300 +301 +302 +303 +304 +305 +306 +307 +308 +309 \ No newline at end of file diff --git a/br/tests/lightning_realloc_id/data1/db.test.000000001.sql b/br/tests/lightning_realloc_id/data1/db.test.000000001.sql new file mode 100644 index 0000000000000..461cf4c3fccaf --- /dev/null +++ b/br/tests/lightning_realloc_id/data1/db.test.000000001.sql @@ -0,0 +1,11 @@ +insert into db.test(a) values +(400), +(401), +(402), +(403), +(404), +(405), +(406), +(407), +(408), +(409); \ No newline at end of file diff --git a/br/tests/lightning_realloc_id/run.sh b/br/tests/lightning_realloc_id/run.sh index d56392b44f110..884f93a3b613a 100644 --- a/br/tests/lightning_realloc_id/run.sh +++ b/br/tests/lightning_realloc_id/run.sh @@ -18,6 +18,8 @@ set -eu check_cluster_version 4 0 0 'local backend' +LOG_FILE1="$TEST_DIR/lightning-distributed-import1.log" +LOG_FILE2="$TEST_DIR/lightning-distributed-import2.log" function check_result() { run_sql 'SHOW DATABASES;' @@ -27,8 +29,31 @@ function check_result() { run_sql 'SELECT count(*) FROM db.test;' check_contains 'count(*): 20' run_sql 'SELECT * FROM db.test;' - check_contains 'id: 100015' check_contains 'id: 15' + check_contains 'id: 20' +} + +function parallel_import() { + run_lightning --data-source-dir "tests/$TEST_NAME/data" \ + --sorted-kv-dir "$TEST_DIR/lightning_distributed_import.sorted1" \ + --log-file "$LOG_FILE1" \ + --config "tests/$TEST_NAME/config.toml" & + pid1="$!" + run_lightning --data-source-dir "tests/$TEST_NAME/data1" \ + --sorted-kv-dir "$TEST_DIR/lightning_distributed_import.sorted2" \ + --log-file "$LOG_FILE2" \ + --config "tests/$TEST_NAME/config.toml" & + pid2="$!" 
+ wait "$pid1" "$pid2" +} + +function check_parallel_result() { + run_sql 'SHOW DATABASES;' + check_contains 'Database: db'; + run_sql 'SHOW TABLES IN db;' + check_contains 'Tables_in_db: test' + run_sql 'SELECT count(*) FROM db.test;' + check_contains 'count(*): 40' } run_sql 'DROP DATABASE IF EXISTS db;' @@ -36,4 +61,7 @@ export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/mydump/MockInaccu run_lightning --config "tests/$TEST_NAME/config.toml" check_result run_sql 'DROP DATABASE IF EXISTS db;' +parallel_import +check_parallel_result +run_sql 'DROP DATABASE IF EXISTS db;' unset GO_FAILPOINTS \ No newline at end of file From ca863ea62fc20239ecf101f58f6940a564d309e2 Mon Sep 17 00:00:00 2001 From: buchuitoudegou <756541536@qq.com> Date: Tue, 24 May 2022 14:38:29 +0800 Subject: [PATCH 08/21] fix: parallel import --- br/pkg/lightning/backend/kv/sql2kv.go | 13 ----- br/pkg/lightning/restore/meta_manager.go | 58 +++++++++++++++++++ br/pkg/lightning/restore/meta_manager_test.go | 31 ++++++++++ br/pkg/lightning/restore/restore.go | 10 ++-- br/pkg/lightning/restore/table_restore.go | 17 ++++-- 5 files changed, 106 insertions(+), 23 deletions(-) diff --git a/br/pkg/lightning/backend/kv/sql2kv.go b/br/pkg/lightning/backend/kv/sql2kv.go index 481d25b2da73a..cfee7eceaea7a 100644 --- a/br/pkg/lightning/backend/kv/sql2kv.go +++ b/br/pkg/lightning/backend/kv/sql2kv.go @@ -542,16 +542,3 @@ func (kvs *KvPairs) Clear() Rows { kvs.pairs = kvs.pairs[:0] return kvs } - -func (kvs *KvPairs) GetRowIDToKv() map[int64][]*common.KvPair { - res := make(map[int64][]*common.KvPair, 0) - for i, p := range kvs.pairs { - if _, ok := res[p.RowID]; !ok { - res[p.RowID] = make([]*common.KvPair, 0) - res[p.RowID] = append(res[p.RowID], &kvs.pairs[i]) - } else { - res[p.RowID] = append(res[p.RowID], &kvs.pairs[i]) - } - } - return res -} diff --git a/br/pkg/lightning/restore/meta_manager.go b/br/pkg/lightning/restore/meta_manager.go index 8eace8c5f979d..cdf171e798760 100644 --- a/br/pkg/lightning/restore/meta_manager.go +++ b/br/pkg/lightning/restore/meta_manager.go @@ -78,6 +78,7 @@ func (b *dbMetaMgrBuilder) TableMetaMgr(tr *TableRestore) tableMetaMgr { type tableMetaMgr interface { InitTableMeta(ctx context.Context) error AllocTableRowIDs(ctx context.Context, rawRowIDMax int64) (*verify.KVChecksum, int64, error) + ReAllocTableRowIDs(ctx context.Context, newRowIDCount int64) (int64, int64, error) UpdateTableStatus(ctx context.Context, status metaStatus) error UpdateTableBaseChecksum(ctx context.Context, checksum *verify.KVChecksum) error CheckAndUpdateLocalChecksum(ctx context.Context, checksum *verify.KVChecksum, hasLocalDupes bool) ( @@ -160,6 +161,59 @@ func parseMetaStatus(s string) (metaStatus, error) { } } +func (m *dbTableMetaMgr) ReAllocTableRowIDs(ctx context.Context, newRowIDCount int64) (int64, int64, error) { + conn, err := m.session.Conn(ctx) + if err != nil { + return 0, 0, errors.Trace(err) + } + defer conn.Close() + exec := &common.SQLWithRetry{ + DB: m.session, + Logger: m.tr.logger, + } + err = exec.Exec(ctx, "enable pessimistic transaction", "SET SESSION tidb_txn_mode = 'pessimistic';") + if err != nil { + return 0, 0, errors.Annotate(err, "enable pessimistic transaction failed") + } + var ( + newRowIDBase int64 + newRowIDMax int64 + ) + err = exec.Transact(ctx, "realloc table rowID", func(ctx context.Context, tx *sql.Tx) error { + rows, err := tx.QueryContext( + ctx, + fmt.Sprintf("SELECT row_id_max from %s WHERE table_id = ? 
FOR UPDATE", m.tableName), + m.tr.tableInfo.ID, + ) + if err != nil { + return errors.Trace(err) + } + defer rows.Close() + var query string + for rows.Next() { + var curRowIDMax int64 + if err := rows.Scan(&curRowIDMax); err != nil { + return errors.Trace(err) + } + tempRowIDMax := curRowIDMax + newRowIDCount + if tempRowIDMax > newRowIDMax { + // find current maxRowIDMax across all parallel lightning + newRowIDMax = tempRowIDMax + newRowIDBase = curRowIDMax + query = fmt.Sprintf("UPDATE %s SET row_id_max = %d WHERE table_id = %d", m.tableName, newRowIDMax, m.tr.tableInfo.ID) + } + } + if _, err := tx.ExecContext(ctx, query); err != nil { + return err + } + return nil + }) + if err != nil { + return 0, 0, errors.Trace(err) + } + return newRowIDBase, newRowIDMax, nil +} + func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64) (*verify.KVChecksum, int64, error) { conn, err := m.session.Conn(ctx) if err != nil { @@ -1031,6 +1085,10 @@ func (m noopTableMetaMgr) InitTableMeta(ctx context.Context) error { return nil } +func (m noopTableMetaMgr) ReAllocTableRowIDs(ctx context.Context, _ int64) (int64, int64, error) { + return 0, 0, nil +} + func (m noopTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64) (*verify.KVChecksum, int64, error) { return nil, 0, nil } diff --git a/br/pkg/lightning/restore/meta_manager_test.go b/br/pkg/lightning/restore/meta_manager_test.go index 8480bf077d6de..d0209259e9805 100644 --- a/br/pkg/lightning/restore/meta_manager_test.go +++ b/br/pkg/lightning/restore/meta_manager_test.go @@ -384,3 +384,34 @@ func TestSingleTaskMetaMgr(t *testing.T) { }) require.NoError(t, err) } + +func TestReAllocTableRowIDs(t *testing.T) { + s, clean := newMetaMgrSuite(t) + defer clean() + + ctx := context.WithValue(context.Background(), &checksumManagerKey, s.checksumMgr) + + rows := [][]driver.Value{ + {int64(1), int64(998), int64(1008), uint64(0), uint64(0), uint64(0), metaStatusRowIDAllocated.String()}, + } + checksum := verification.MakeKVChecksum(2, 1, 3) + s.prepareMock(rows, nil, nil, &checksum, nil) + + ck, rowIDBase, err := s.mgr.AllocTableRowIDs(ctx, 10) + require.NoError(t, err) + require.Equal(t, int64(998), rowIDBase) + require.Equal(t, &checksum, ck) + require.Equal(t, 1, s.checksumMgr.callCnt) + s.mockDB.ExpectExec("SET SESSION tidb_txn_mode = 'pessimistic';"). + WillReturnResult(sqlmock.NewResult(int64(0), int64(0))) + + s.mockDB.ExpectBegin() + s.mockDB.ExpectQuery("\\QSELECT row_id_max from `test`.`table_meta` WHERE table_id = ? FOR UPDATE\\E").WithArgs(int64(1)). 
+ WillReturnRows(sqlmock.NewRows([]string{"row_id_max"}).AddRow(1008)) + s.mockDB.ExpectExec("UPDATE `test`.`table_meta` SET row_id_max = 1018 WHERE table_id = 1").WillReturnResult(sqlmock.NewResult(int64(0), int64(1))) + s.mockDB.ExpectCommit() + preMax, newMax, err := s.mgr.ReAllocTableRowIDs(context.Background(), 10) + require.Nil(t, err) + require.Equal(t, int64(1008), preMax) + require.Equal(t, int64(1018), newMax) +} diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 2da5e1b90eaa9..1b8fa146d3215 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -2155,7 +2155,7 @@ type deliverResult struct { err error } -func (cr *chunkRestore) adjustRowID(rowID int64, rc *Controller) int64 { +func (cr *chunkRestore) adjustRowID(rowID int64, rc *Controller) (int64, error) { if rowID > cr.originalRowIDMax { if cr.curRowIDBase == 0 || cr.curRowIDBase > cr.curRowIDMax { // reallocate rowID @@ -2171,7 +2171,7 @@ func (cr *chunkRestore) adjustRowID(rowID int64, rc *Controller) int64 { zap.String("task", "re-allocate rowID"), ) logger.Error("fail to re-allocate rowIDs", zap.Error(err)) - return rowID + return 0, err } cr.curRowIDBase = newBase cr.curRowIDMax = newMax @@ -2179,7 +2179,7 @@ func (cr *chunkRestore) adjustRowID(rowID int64, rc *Controller) int64 { rowID = cr.curRowIDBase cr.curRowIDBase++ } - return rowID + return rowID, nil } func (cr *chunkRestore) updateRowStats(rowSize int) { @@ -2467,7 +2467,9 @@ func (cr *chunkRestore) encodeLoop( encodeDurStart := time.Now() lastRow := cr.parser.LastRow() // sql -> kv - lastRow.RowID = cr.adjustRowID(lastRow.RowID, rc) + if lastRow.RowID, err = cr.adjustRowID(lastRow.RowID, rc); err != nil { + return + } cr.updateRowStats(lastRow.Length) rowID = lastRow.RowID kvs, encodeErr := kvEncoder.Encode(logger, lastRow.Row, lastRow.RowID, cr.chunk.ColumnPermutation, cr.chunk.Key.Path, curOffset) diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index 001c0c828b14e..862c617c969f7 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -1024,17 +1024,22 @@ func (tr *TableRestore) allocateRowIDs(newRowCount int64, rc *Controller) (int64 tr.rowIDLock.Lock() defer tr.rowIDLock.Unlock() metaMgr := rc.metaMgrBuilder.TableMetaMgr(tr) - _, newBase, err := metaMgr.AllocTableRowIDs(context.Background(), newRowCount) + // try to re-allocate from downstream + // if we are using parallel import, rowID should be reconciled globally. + // Otherwise, this function will simply return 0. 
+ preRowIDMax, newRowIDMax, err := metaMgr.ReAllocTableRowIDs(context.Background(), newRowCount) if err != nil { return 0, 0, err } - if newBase != 0 { + var rowIDBase int64 + if newRowIDMax != 0 { // re-alloc from downstream - tr.curMaxRowID = newBase + newRowCount - return newBase, newBase + newRowCount, nil + rowIDBase = preRowIDMax + 1 + tr.curMaxRowID = newRowIDMax } else { - prevBase := tr.curMaxRowID + 1 + // single import mode: re-allocate rowID from memory + rowIDBase = tr.curMaxRowID + 1 tr.curMaxRowID += newRowCount - return prevBase, tr.curMaxRowID, nil } + return rowIDBase, tr.curMaxRowID, nil } From 6a5fba1c4d973befedd4318d423a5c7adb2333f0 Mon Sep 17 00:00:00 2001 From: buchuitoudegou <756541536@qq.com> Date: Tue, 24 May 2022 14:56:06 +0800 Subject: [PATCH 09/21] fix lint --- br/pkg/lightning/restore/table_restore.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index 862c617c969f7..b1e9a3a4b9a84 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -45,8 +45,6 @@ import ( "go.uber.org/zap" ) -const defaultNewAllocIDCnt = 100000 - type TableRestore struct { // The unique table name in the form "`db`.`tbl`". tableName string From d62c98ef3e2f13ac5e332529bbfd142db3a42173 Mon Sep 17 00:00:00 2001 From: buchuitoudegou <756541536@qq.com> Date: Tue, 24 May 2022 15:18:47 +0800 Subject: [PATCH 10/21] fix it --- br/tests/lightning_realloc_id/run.sh | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/br/tests/lightning_realloc_id/run.sh b/br/tests/lightning_realloc_id/run.sh index 884f93a3b613a..67710b698832c 100644 --- a/br/tests/lightning_realloc_id/run.sh +++ b/br/tests/lightning_realloc_id/run.sh @@ -18,8 +18,8 @@ set -eu check_cluster_version 4 0 0 'local backend' -LOG_FILE1="$TEST_DIR/lightning-distributed-import1.log" -LOG_FILE2="$TEST_DIR/lightning-distributed-import2.log" +LOG_FILE1="$TEST_DIR/lightning-realloc-import1.log" +LOG_FILE2="$TEST_DIR/lightning-realloc-import2.log" function check_result() { run_sql 'SHOW DATABASES;' @@ -34,13 +34,13 @@ function check_result() { } function parallel_import() { - run_lightning --data-source-dir "tests/$TEST_NAME/data" \ - --sorted-kv-dir "$TEST_DIR/lightning_distributed_import.sorted1" \ + run_lightning -d "tests/$TEST_NAME/data" \ + --sorted-kv-dir "$TEST_DIR/lightning_realloc_import.sorted1" \ --log-file "$LOG_FILE1" \ --config "tests/$TEST_NAME/config.toml" & pid1="$!" - run_lightning --data-source-dir "tests/$TEST_NAME/data1" \ - --sorted-kv-dir "$TEST_DIR/lightning_distributed_import.sorted2" \ + run_lightning -d "tests/$TEST_NAME/data1" \ + --sorted-kv-dir "$TEST_DIR/lightning_realloc_import.sorted2" \ --log-file "$LOG_FILE2" \ --config "tests/$TEST_NAME/config.toml" & pid2="$!" 
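
Note on the patches so far: they split row ID handling into a table-level allocator (TableRestore.allocateRowIDs, which for parallel import defers to ReallocTableRowIDs on the downstream meta table) and a per-chunk window that chunkRestore.adjustRowID consumes. The following standalone Go sketch only illustrates that window/refill idea; tableAllocator and chunkWindow are made-up stand-ins rather than types from these patches, and the fixed refill count replaces the remaining-file-size estimate the real code uses.

package main

import (
	"fmt"
	"sync"
)

// tableAllocator is a stand-in for the table-level state kept in TableRestore:
// it hands out contiguous ID ranges above the current table-wide maximum.
// In parallel import the real code asks the downstream meta table instead.
type tableAllocator struct {
	mu          sync.Mutex
	curMaxRowID int64
}

func (t *tableAllocator) allocate(n int64) (base, max int64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	base = t.curMaxRowID + 1
	t.curMaxRowID += n
	return base, t.curMaxRowID
}

// chunkWindow is a stand-in for the per-chunk fields added to chunkRestore:
// IDs at or below originalRowIDMax pass through untouched, anything above is
// rewritten from the allocated [curRowIDBase, curRowIDMax] window.
type chunkWindow struct {
	originalRowIDMax int64
	curRowIDBase     int64
	curRowIDMax      int64
	alloc            *tableAllocator
}

func (c *chunkWindow) adjust(rowID int64) int64 {
	if rowID <= c.originalRowIDMax {
		return rowID
	}
	if c.curRowIDBase == 0 || c.curRowIDBase > c.curRowIDMax {
		// Window unset or exhausted: refill. The patches estimate the count
		// from the remaining chunk size divided by the average row size; a
		// fixed count is used here only to keep the sketch short.
		c.curRowIDBase, c.curRowIDMax = c.alloc.allocate(16)
	}
	newID := c.curRowIDBase
	c.curRowIDBase++
	return newID
}

func main() {
	alloc := &tableAllocator{curMaxRowID: 100}
	chunk := &chunkWindow{originalRowIDMax: 100, alloc: alloc}
	for _, id := range []int64{99, 100, 101, 102, 103} {
		fmt.Printf("source rowID %d -> engine rowID %d\n", id, chunk.adjust(id))
	}
}

Run as-is this prints the rewritten IDs; the point is that once a chunk's original ID budget is used up, new IDs always come from a range that no other chunk (or, via the meta table, no other lightning instance) can hand out again.
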
From a29c3bd5b65977444c70d90bcd3cab55b22696fe Mon Sep 17 00:00:00 2001 From: buchuitoudegou <756541536@qq.com> Date: Fri, 27 May 2022 13:31:05 +0800 Subject: [PATCH 11/21] fix comment --- br/pkg/lightning/backend/local/engine.go | 46 ++++++++++--------- br/pkg/lightning/backend/local/engine_test.go | 36 +++++++++++++++ br/pkg/lightning/mydump/region.go | 3 ++ br/pkg/lightning/restore/meta_manager.go | 22 ++++----- br/pkg/lightning/restore/meta_manager_test.go | 9 ++-- br/pkg/lightning/restore/table_restore.go | 6 ++- br/tests/lightning_realloc_id/config1.toml | 2 + br/tests/lightning_realloc_id/run.sh | 2 +- 8 files changed, 85 insertions(+), 41 deletions(-) create mode 100644 br/tests/lightning_realloc_id/config1.toml diff --git a/br/pkg/lightning/backend/local/engine.go b/br/pkg/lightning/backend/local/engine.go index eefde3c14adb0..196e0cd9e95c5 100644 --- a/br/pkg/lightning/backend/local/engine.go +++ b/br/pkg/lightning/backend/local/engine.go @@ -1006,14 +1006,10 @@ type Writer struct { func (w *Writer) flushAndNewWriter() error { var err error - failpoint.Inject("MockFlushWriter", func() { - failpoint.Goto("CreateNewWriter") - }) err = w.flush(context.Background()) if err != nil { return errors.Trace(err) } - failpoint.Label("CreateNewWriter") newWriter, err := w.createSSTWriter() if err != nil { return errors.Trace(err) @@ -1062,30 +1058,35 @@ func (w *Writer) appendRowsSorted(kvs []common.KvPair) error { } kvs = newKvs } - sliceIdx := -1 + tempKvs := make([]common.KvPair, 0) + if len(kvs) > 0 { + tempKvs = append(tempKvs, kvs[0]) + w.prevRowID = kvs[0].RowID + } for i := 1; i < len(kvs); i++ { - if kvs[i].RowID > kvs[i-1].RowID+1 && sliceIdx == -1 { - // rowID leap, probably re-alloc id - // should write to different sst - sliceIdx = i + if kvs[i].RowID > kvs[i-1].RowID+1 { + // leap id + err := w.writer.writeKVs(tempKvs) + if err != nil { + return err + } + err = w.flushAndNewWriter() + if err != nil { + return err + } + tempKvs = make([]common.KvPair, 0) + tempKvs = append(tempKvs, kvs[i]) + } else { + tempKvs = append(tempKvs, kvs[i]) } if i == len(kvs)-1 { w.prevRowID = kvs[i].RowID } } - if sliceIdx > 0 { - oldKvs := kvs[:sliceIdx] - kvs = kvs[sliceIdx:] - err := w.writer.writeKVs(oldKvs) - if err != nil { - return err - } - err = w.flushAndNewWriter() - if err != nil { - return err - } + if len(tempKvs) > 0 { + return w.writer.writeKVs(tempKvs) } - return w.writer.writeKVs(kvs) + return nil } func (w *Writer) appendRowsUnsorted(ctx context.Context, kvs []common.KvPair) error { @@ -1152,6 +1153,9 @@ func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames [ } func (w *Writer) flush(ctx context.Context) error { + failpoint.Inject("MockFlushWriter", func() { + failpoint.Return(nil) + }) w.Lock() defer w.Unlock() if w.batchCount == 0 { diff --git a/br/pkg/lightning/backend/local/engine_test.go b/br/pkg/lightning/backend/local/engine_test.go index 3d13d4f0e8c4d..cb2dbd6fc3a97 100644 --- a/br/pkg/lightning/backend/local/engine_test.go +++ b/br/pkg/lightning/backend/local/engine_test.go @@ -156,4 +156,40 @@ func TestAutoSplitSST(t *testing.T) { err = w.appendRowsSorted(kvs) require.Nil(t, err) require.False(t, prevWriter == w.writer) // id leap, should flush and create + prevWriter = w.writer + kvs = []common.KvPair{ + { + Key: []byte("16"), + Val: []byte("val16"), + RowID: 16, + }, + { + Key: []byte("17"), + Val: []byte("val17"), + RowID: 17, + }, + { + Key: []byte("19"), + Val: []byte("val19"), + RowID: 19, + }, + { + Key: []byte("20"), + Val: 
[]byte("val20"), + RowID: 20, + }, + { + Key: []byte("22"), + Val: []byte("val22"), + RowID: 22, + }, + { + Key: []byte("23"), + Val: []byte("val23"), + RowID: 22, + }, + } + err = w.appendRowsSorted(kvs) + require.Nil(t, err) + require.False(t, prevWriter == w.writer) // id leap, should flush and create } diff --git a/br/pkg/lightning/mydump/region.go b/br/pkg/lightning/mydump/region.go index d4bb8e91ac628..3e67312f288bf 100644 --- a/br/pkg/lightning/mydump/region.go +++ b/br/pkg/lightning/mydump/region.go @@ -338,6 +338,9 @@ func SampleAndGetAvgRowSize( } case SourceTypeSQL: parser = NewChunkParser(cfg.TiDB.SQLMode, reader, int64(cfg.Mydumper.ReadBlockSize), ioWorkers) + default: + err = errors.New("unknown source type") + return 0, errors.Annotatef(err, "source file %s is none of csv, sql, or parquet file", fileInfo.FileMeta.Path) } totalBytes := 0 totalRows := 0 diff --git a/br/pkg/lightning/restore/meta_manager.go b/br/pkg/lightning/restore/meta_manager.go index cdf171e798760..b26ffa2235efc 100644 --- a/br/pkg/lightning/restore/meta_manager.go +++ b/br/pkg/lightning/restore/meta_manager.go @@ -78,7 +78,7 @@ func (b *dbMetaMgrBuilder) TableMetaMgr(tr *TableRestore) tableMetaMgr { type tableMetaMgr interface { InitTableMeta(ctx context.Context) error AllocTableRowIDs(ctx context.Context, rawRowIDMax int64) (*verify.KVChecksum, int64, error) - ReAllocTableRowIDs(ctx context.Context, newRowIDCount int64) (int64, int64, error) + ReallocTableRowIDs(ctx context.Context, newRowIDCount int64) (int64, int64, error) UpdateTableStatus(ctx context.Context, status metaStatus) error UpdateTableBaseChecksum(ctx context.Context, checksum *verify.KVChecksum) error CheckAndUpdateLocalChecksum(ctx context.Context, checksum *verify.KVChecksum, hasLocalDupes bool) ( @@ -161,7 +161,7 @@ func parseMetaStatus(s string) (metaStatus, error) { } } -func (m *dbTableMetaMgr) ReAllocTableRowIDs(ctx context.Context, newRowIDCount int64) (int64, int64, error) { +func (m *dbTableMetaMgr) ReallocTableRowIDs(ctx context.Context, newRowIDCount int64) (int64, int64, error) { conn, err := m.session.Conn(ctx) if err != nil { return 0, 0, errors.Trace(err) @@ -182,7 +182,7 @@ func (m *dbTableMetaMgr) ReAllocTableRowIDs(ctx context.Context, newRowIDCount i err = exec.Transact(ctx, "realloc table rowID", func(ctx context.Context, tx *sql.Tx) error { rows, err := tx.QueryContext( ctx, - fmt.Sprintf("SELECT row_id_max from %s WHERE table_id = ? FOR UPDATE", m.tableName), + fmt.Sprintf("SELECT MAX(row_id_max) from %s WHERE table_id = ? FOR UPDATE", m.tableName), m.tr.tableInfo.ID, ) if err != nil { @@ -191,19 +191,13 @@ func (m *dbTableMetaMgr) ReAllocTableRowIDs(ctx context.Context, newRowIDCount i defer rows.Close() var query string for rows.Next() { - var curRowIDMax int64 - if err := rows.Scan(&curRowIDMax); err != nil { + if err := rows.Scan(&newRowIDBase); err != nil { return errors.Trace(err) } - tempRowIDMax := curRowIDMax + newRowIDCount - if tempRowIDMax > newRowIDMax { - // find current maxRowIDMax across all parallel lightning - newRowIDMax = tempRowIDMax - newRowIDBase = curRowIDMax - query = fmt.Sprintf("UPDATE %s SET row_id_max = %d WHERE table_id = %d", m.tableName, newRowIDMax, m.tr.tableInfo.ID) - } + newRowIDMax = newRowIDBase + newRowIDCount + query = fmt.Sprintf("UPDATE %s SET row_id_max = %d WHERE table_id = ? 
AND task_id = ?", m.tableName, newRowIDMax) } - if _, err := tx.ExecContext(ctx, query); err != nil { + if _, err := tx.ExecContext(ctx, query, m.tr.tableInfo.ID, m.taskID); err != nil { return err } return nil @@ -1085,7 +1079,7 @@ func (m noopTableMetaMgr) InitTableMeta(ctx context.Context) error { return nil } -func (m noopTableMetaMgr) ReAllocTableRowIDs(ctx context.Context, _ int64) (int64, int64, error) { +func (m noopTableMetaMgr) ReallocTableRowIDs(ctx context.Context, _ int64) (int64, int64, error) { return 0, 0, nil } diff --git a/br/pkg/lightning/restore/meta_manager_test.go b/br/pkg/lightning/restore/meta_manager_test.go index d0209259e9805..c91590ef527e0 100644 --- a/br/pkg/lightning/restore/meta_manager_test.go +++ b/br/pkg/lightning/restore/meta_manager_test.go @@ -385,7 +385,7 @@ func TestSingleTaskMetaMgr(t *testing.T) { require.NoError(t, err) } -func TestReAllocTableRowIDs(t *testing.T) { +func TestReallocTableRowIDs(t *testing.T) { s, clean := newMetaMgrSuite(t) defer clean() @@ -406,11 +406,12 @@ func TestReAllocTableRowIDs(t *testing.T) { WillReturnResult(sqlmock.NewResult(int64(0), int64(0))) s.mockDB.ExpectBegin() - s.mockDB.ExpectQuery("\\QSELECT row_id_max from `test`.`table_meta` WHERE table_id = ? FOR UPDATE\\E").WithArgs(int64(1)). + s.mockDB.ExpectQuery("\\QSELECT MAX(row_id_max) from `test`.`table_meta` WHERE table_id = ? FOR UPDATE\\E").WithArgs(int64(1)). WillReturnRows(sqlmock.NewRows([]string{"row_id_max"}).AddRow(1008)) - s.mockDB.ExpectExec("UPDATE `test`.`table_meta` SET row_id_max = 1018 WHERE table_id = 1").WillReturnResult(sqlmock.NewResult(int64(0), int64(1))) + s.mockDB.ExpectExec("\\QUPDATE `test`.`table_meta` SET row_id_max = 1018 WHERE table_id = ? AND task_id = ?\\E").WithArgs(int64(1), int64(1)). + WillReturnResult(sqlmock.NewResult(int64(0), int64(1))) s.mockDB.ExpectCommit() - preMax, newMax, err := s.mgr.ReAllocTableRowIDs(context.Background(), 10) + preMax, newMax, err := s.mgr.ReallocTableRowIDs(context.Background(), 10) require.Nil(t, err) require.Equal(t, int64(1008), preMax) require.Equal(t, int64(1018), newMax) diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index b1e9a3a4b9a84..e4480a3b210da 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -1025,10 +1025,14 @@ func (tr *TableRestore) allocateRowIDs(newRowCount int64, rc *Controller) (int64 // try to re-allocate from downstream // if we are using parallel import, rowID should be reconciled globally. // Otherwise, this function will simply return 0. - preRowIDMax, newRowIDMax, err := metaMgr.ReAllocTableRowIDs(context.Background(), newRowCount) + preRowIDMax, newRowIDMax, err := metaMgr.ReallocTableRowIDs(context.Background(), newRowCount) if err != nil { return 0, 0, err } + // TODO: refinement: currently, when we're not using SSTMode + incremental, + // metadata of the table restore is not maintained globally. + // So we have to deviate this two disparate situations here and make + // code complexer. 
var rowIDBase int64 if newRowIDMax != 0 { // re-alloc from downstream diff --git a/br/tests/lightning_realloc_id/config1.toml b/br/tests/lightning_realloc_id/config1.toml new file mode 100644 index 0000000000000..d2152b47c922a --- /dev/null +++ b/br/tests/lightning_realloc_id/config1.toml @@ -0,0 +1,2 @@ +[tikv-importer] +backend = 'local' diff --git a/br/tests/lightning_realloc_id/run.sh b/br/tests/lightning_realloc_id/run.sh index 67710b698832c..02af369e3e0fb 100644 --- a/br/tests/lightning_realloc_id/run.sh +++ b/br/tests/lightning_realloc_id/run.sh @@ -58,7 +58,7 @@ function check_parallel_result() { run_sql 'DROP DATABASE IF EXISTS db;' export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/mydump/MockInaccurateRowID=return(true)' -run_lightning --config "tests/$TEST_NAME/config.toml" +run_lightning --config "tests/$TEST_NAME/config1.toml" check_result run_sql 'DROP DATABASE IF EXISTS db;' parallel_import From 4b5b4155917d93ec5564b870412ac07075fb438d Mon Sep 17 00:00:00 2001 From: buchuitoudegou <756541536@qq.com> Date: Fri, 27 May 2022 15:21:58 +0800 Subject: [PATCH 12/21] add comment for reallocRowID --- br/pkg/lightning/restore/meta_manager.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/br/pkg/lightning/restore/meta_manager.go b/br/pkg/lightning/restore/meta_manager.go index b26ffa2235efc..fb4ea13d1ac97 100644 --- a/br/pkg/lightning/restore/meta_manager.go +++ b/br/pkg/lightning/restore/meta_manager.go @@ -78,6 +78,8 @@ func (b *dbMetaMgrBuilder) TableMetaMgr(tr *TableRestore) tableMetaMgr { type tableMetaMgr interface { InitTableMeta(ctx context.Context) error AllocTableRowIDs(ctx context.Context, rawRowIDMax int64) (*verify.KVChecksum, int64, error) + // re-allocate rowIDs across lightning instances + // only parallel import needs this ReallocTableRowIDs(ctx context.Context, newRowIDCount int64) (int64, int64, error) UpdateTableStatus(ctx context.Context, status metaStatus) error UpdateTableBaseChecksum(ctx context.Context, checksum *verify.KVChecksum) error @@ -180,23 +182,19 @@ func (m *dbTableMetaMgr) ReallocTableRowIDs(ctx context.Context, newRowIDCount i newRowIDMax int64 ) err = exec.Transact(ctx, "realloc table rowID", func(ctx context.Context, tx *sql.Tx) error { - rows, err := tx.QueryContext( + row := tx.QueryRowContext( ctx, fmt.Sprintf("SELECT MAX(row_id_max) from %s WHERE table_id = ? FOR UPDATE", m.tableName), m.tr.tableInfo.ID, ) - if err != nil { + if row.Err() != nil { return errors.Trace(err) } - defer rows.Close() - var query string - for rows.Next() { - if err := rows.Scan(&newRowIDBase); err != nil { - return errors.Trace(err) - } - newRowIDMax = newRowIDBase + newRowIDCount - query = fmt.Sprintf("UPDATE %s SET row_id_max = %d WHERE table_id = ? AND task_id = ?", m.tableName, newRowIDMax) + if err := row.Scan(&newRowIDBase); err != nil { + return errors.Trace(err) } + newRowIDMax = newRowIDBase + newRowIDCount + query := fmt.Sprintf("UPDATE %s SET row_id_max = %d WHERE table_id = ? 
AND task_id = ?", m.tableName, newRowIDMax) if _, err := tx.ExecContext(ctx, query, m.tr.tableInfo.ID, m.taskID); err != nil { return err } @@ -1080,6 +1078,8 @@ func (m noopTableMetaMgr) InitTableMeta(ctx context.Context) error { } func (m noopTableMetaMgr) ReallocTableRowIDs(ctx context.Context, _ int64) (int64, int64, error) { + // we don't need to reconcile rowIDs across all the instances + // barring using parallel import return 0, 0, nil } From b36b531bf859586633060a5d7911d9a043658524 Mon Sep 17 00:00:00 2001 From: buchuitoudegou <756541536@qq.com> Date: Mon, 30 May 2022 11:12:36 +0800 Subject: [PATCH 13/21] fix lint --- br/pkg/lightning/restore/meta_manager.go | 5 +++-- br/pkg/lightning/restore/meta_manager_test.go | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/br/pkg/lightning/restore/meta_manager.go b/br/pkg/lightning/restore/meta_manager.go index fb4ea13d1ac97..808464ee3f126 100644 --- a/br/pkg/lightning/restore/meta_manager.go +++ b/br/pkg/lightning/restore/meta_manager.go @@ -194,8 +194,9 @@ func (m *dbTableMetaMgr) ReallocTableRowIDs(ctx context.Context, newRowIDCount i return errors.Trace(err) } newRowIDMax = newRowIDBase + newRowIDCount - query := fmt.Sprintf("UPDATE %s SET row_id_max = %d WHERE table_id = ? AND task_id = ?", m.tableName, newRowIDMax) - if _, err := tx.ExecContext(ctx, query, m.tr.tableInfo.ID, m.taskID); err != nil { + // nolint:gosec + query := fmt.Sprintf("UPDATE %s SET row_id_max = ? WHERE table_id = ? AND task_id = ?", m.tableName) + if _, err := tx.ExecContext(ctx, query, newRowIDMax, m.tr.tableInfo.ID, m.taskID); err != nil { return err } return nil diff --git a/br/pkg/lightning/restore/meta_manager_test.go b/br/pkg/lightning/restore/meta_manager_test.go index c91590ef527e0..3286198e835e6 100644 --- a/br/pkg/lightning/restore/meta_manager_test.go +++ b/br/pkg/lightning/restore/meta_manager_test.go @@ -408,7 +408,7 @@ func TestReallocTableRowIDs(t *testing.T) { s.mockDB.ExpectBegin() s.mockDB.ExpectQuery("\\QSELECT MAX(row_id_max) from `test`.`table_meta` WHERE table_id = ? FOR UPDATE\\E").WithArgs(int64(1)). WillReturnRows(sqlmock.NewRows([]string{"row_id_max"}).AddRow(1008)) - s.mockDB.ExpectExec("\\QUPDATE `test`.`table_meta` SET row_id_max = 1018 WHERE table_id = ? AND task_id = ?\\E").WithArgs(int64(1), int64(1)). + s.mockDB.ExpectExec("\\QUPDATE `test`.`table_meta` SET row_id_max = ? WHERE table_id = ? AND task_id = ?\\E").WithArgs(int64(1018), int64(1), int64(1)). 
WillReturnResult(sqlmock.NewResult(int64(0), int64(1))) s.mockDB.ExpectCommit() preMax, newMax, err := s.mgr.ReallocTableRowIDs(context.Background(), 10) From 4df0f47e82891da28e7cd86291c4354e5eeeb5d2 Mon Sep 17 00:00:00 2001 From: buchuitoudegou <756541536@qq.com> Date: Mon, 30 May 2022 17:53:16 +0800 Subject: [PATCH 14/21] refine comment --- br/pkg/lightning/restore/meta_manager.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/br/pkg/lightning/restore/meta_manager.go b/br/pkg/lightning/restore/meta_manager.go index 808464ee3f126..d443271bdbb64 100644 --- a/br/pkg/lightning/restore/meta_manager.go +++ b/br/pkg/lightning/restore/meta_manager.go @@ -78,8 +78,8 @@ func (b *dbMetaMgrBuilder) TableMetaMgr(tr *TableRestore) tableMetaMgr { type tableMetaMgr interface { InitTableMeta(ctx context.Context) error AllocTableRowIDs(ctx context.Context, rawRowIDMax int64) (*verify.KVChecksum, int64, error) - // re-allocate rowIDs across lightning instances - // only parallel import needs this + // ReallocTableRowIDs: re-allocate rowIDs across lightning instances. only parallel import needs this + // returns: prevMaxRowID, newlyAllocatedMaxRowID, error ReallocTableRowIDs(ctx context.Context, newRowIDCount int64) (int64, int64, error) UpdateTableStatus(ctx context.Context, status metaStatus) error UpdateTableBaseChecksum(ctx context.Context, checksum *verify.KVChecksum) error From c338f4702a9cae60c42fb3a24948b8b68c6af5b7 Mon Sep 17 00:00:00 2001 From: buchuitoudegou <756541536@qq.com> Date: Mon, 6 Jun 2022 16:30:02 +0800 Subject: [PATCH 15/21] fix: comment --- br/pkg/lightning/backend/local/engine.go | 11 +++--- br/pkg/lightning/mydump/region.go | 8 ++-- br/pkg/lightning/mydump/region_test.go | 2 +- br/pkg/lightning/restore/restore.go | 47 +++++++++++++----------- 4 files changed, 37 insertions(+), 31 deletions(-) diff --git a/br/pkg/lightning/backend/local/engine.go b/br/pkg/lightning/backend/local/engine.go index 196e0cd9e95c5..4accfa993e419 100644 --- a/br/pkg/lightning/backend/local/engine.go +++ b/br/pkg/lightning/backend/local/engine.go @@ -1026,7 +1026,10 @@ func (w *Writer) appendRowsSorted(kvs []common.KvPair) error { } w.writer = writer } - if len(kvs) > 0 && w.prevRowID != 0 && kvs[0].RowID > w.prevRowID+1 { + if len(kvs) == 0 { + return nil + } + if w.prevRowID != 0 && kvs[0].RowID > w.prevRowID+1 { // rowID leap. 
probably re-alloc id // should write to different sst err := w.flushAndNewWriter() @@ -1059,10 +1062,8 @@ func (w *Writer) appendRowsSorted(kvs []common.KvPair) error { kvs = newKvs } tempKvs := make([]common.KvPair, 0) - if len(kvs) > 0 { - tempKvs = append(tempKvs, kvs[0]) - w.prevRowID = kvs[0].RowID - } + tempKvs = append(tempKvs, kvs[0]) + w.prevRowID = kvs[0].RowID for i := 1; i < len(kvs); i++ { if kvs[i].RowID > kvs[i-1].RowID+1 { // leap id diff --git a/br/pkg/lightning/mydump/region.go b/br/pkg/lightning/mydump/region.go index 3e67312f288bf..c0883a5e4d2c2 100644 --- a/br/pkg/lightning/mydump/region.go +++ b/br/pkg/lightning/mydump/region.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/lightning/worker" "github.com/pingcap/tidb/br/pkg/storage" - "github.com/pingcap/tidb/util/mathutil" "go.uber.org/zap" ) @@ -273,8 +272,9 @@ func makeSourceFileRegion( if !isCsvFile { divisor += 2 } - sizePerRow, err := SampleAndGetAvgRowSize(&fi, cfg, ioWorkers, store) + sizePerRow, err := GetSampledAvgRowSize(&fi, cfg, ioWorkers, store) if err == nil && sizePerRow != 0 { + log.L().Warn("fail to sample file", zap.String("path", fi.FileMeta.Path), zap.Error(err)) divisor = sizePerRow } // If a csv file is overlarge, we need to split it into multiple regions. @@ -312,7 +312,7 @@ func makeSourceFileRegion( return []*TableRegion{tableRegion}, []float64{float64(fi.FileMeta.FileSize)}, nil } -func SampleAndGetAvgRowSize( +func GetSampledAvgRowSize( fileInfo *FileInfo, cfg *config.Config, ioWorkers *worker.Pool, @@ -322,7 +322,7 @@ func SampleAndGetAvgRowSize( defer cancel() reader, err := store.Open(ctx, fileInfo.FileMeta.Path) if err != nil { - return 0, nil + return 0, err } var parser Parser switch fileInfo.FileMeta.Type { diff --git a/br/pkg/lightning/mydump/region_test.go b/br/pkg/lightning/mydump/region_test.go index e2c6d69271dc1..37ba4e4028e39 100644 --- a/br/pkg/lightning/mydump/region_test.go +++ b/br/pkg/lightning/mydump/region_test.go @@ -463,7 +463,7 @@ func TestSampleAndGetAvgRowSize(t *testing.T) { BackslashEscape: true, TrimLastSep: false, } - size, err := SampleAndGetAvgRowSize(&fileInfo, cfg, ioWorkers, loader.GetStore()) + size, err := GetSampledAvgRowSize(&fileInfo, cfg, ioWorkers, loader.GetStore()) require.Nil(t, err) require.GreaterOrEqual(t, fileSize/size, int64(2)) } diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index b7af7b7605852..70be9efdbeb93 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -2151,29 +2151,34 @@ type deliverResult struct { } func (cr *chunkRestore) adjustRowID(rowID int64, rc *Controller) (int64, error) { - if rowID > cr.originalRowIDMax { - if cr.curRowIDBase == 0 || cr.curRowIDBase > cr.curRowIDMax { - // reallocate rowID - pos, _ := cr.parser.Pos() - leftFileSize := cr.chunk.Chunk.EndOffset - pos - newRowIDCount := leftFileSize/int64(cr.avgRowSize) + 1 // plus the current row - newBase, newMax, err := cr.tableRestore.allocateRowIDs(newRowIDCount, rc) - if err != nil { - logger := cr.tableRestore.logger.With( - zap.String("tableName", cr.tableRestore.tableName), - zap.Int("fileIndex", cr.index), - zap.Stringer("path", &cr.chunk.Key), - zap.String("task", "re-allocate rowID"), - ) - logger.Error("fail to re-allocate rowIDs", zap.Error(err)) - return 0, err - } - cr.curRowIDBase = newBase - cr.curRowIDMax = newMax + if rowID <= cr.originalRowIDMax { + // no need to ajust + return rowID, nil + } + // need to adjust 
rowID + // rowID should be within [curRowIDBase, curRowIDMax] + if cr.curRowIDBase == 0 || cr.curRowIDBase > cr.curRowIDMax { + // 1. curRowIDBase == 0 -> no previous re-allocation + // 2. curRowIDBase > curRowIDMax -> run out of allocated IDs + pos, _ := cr.parser.Pos() + leftFileSize := cr.chunk.Chunk.EndOffset - pos + newRowIDCount := leftFileSize/int64(cr.avgRowSize) + 1 // plus the current row + newBase, newMax, err := cr.tableRestore.allocateRowIDs(newRowIDCount, rc) + if err != nil { + logger := cr.tableRestore.logger.With( + zap.String("tableName", cr.tableRestore.tableName), + zap.Int("fileIndex", cr.index), + zap.Stringer("path", &cr.chunk.Key), + zap.String("task", "re-allocate rowID"), + ) + logger.Error("fail to re-allocate rowIDs", zap.Error(err)) + return 0, err } - rowID = cr.curRowIDBase - cr.curRowIDBase++ + cr.curRowIDBase = newBase + cr.curRowIDMax = newMax } + rowID = cr.curRowIDBase + cr.curRowIDBase++ return rowID, nil } From 6f7a44f693091f09f965111eed5219e178e64b57 Mon Sep 17 00:00:00 2001 From: buchuitoudegou <756541536@qq.com> Date: Mon, 13 Jun 2022 12:30:43 +0800 Subject: [PATCH 16/21] fix: return newRowIDBase & fix comments --- br/pkg/lightning/backend/local/engine.go | 19 ++++++------------- br/pkg/lightning/mydump/region.go | 3 +-- br/pkg/lightning/restore/meta_manager.go | 17 ++++++++++------- br/pkg/lightning/restore/restore.go | 9 +++++---- br/pkg/lightning/restore/table_restore.go | 4 ++-- 5 files changed, 24 insertions(+), 28 deletions(-) diff --git a/br/pkg/lightning/backend/local/engine.go b/br/pkg/lightning/backend/local/engine.go index 4accfa993e419..f82334e0d58d2 100644 --- a/br/pkg/lightning/backend/local/engine.go +++ b/br/pkg/lightning/backend/local/engine.go @@ -1061,13 +1061,12 @@ func (w *Writer) appendRowsSorted(kvs []common.KvPair) error { } kvs = newKvs } - tempKvs := make([]common.KvPair, 0) - tempKvs = append(tempKvs, kvs[0]) - w.prevRowID = kvs[0].RowID + startIdx := 0 + w.prevRowID = kvs[len(kvs)-1].RowID for i := 1; i < len(kvs); i++ { if kvs[i].RowID > kvs[i-1].RowID+1 { // leap id - err := w.writer.writeKVs(tempKvs) + err := w.writer.writeKVs(kvs[startIdx:i]) if err != nil { return err } @@ -1075,17 +1074,11 @@ func (w *Writer) appendRowsSorted(kvs []common.KvPair) error { if err != nil { return err } - tempKvs = make([]common.KvPair, 0) - tempKvs = append(tempKvs, kvs[i]) - } else { - tempKvs = append(tempKvs, kvs[i]) - } - if i == len(kvs)-1 { - w.prevRowID = kvs[i].RowID + startIdx = i } } - if len(tempKvs) > 0 { - return w.writer.writeKVs(tempKvs) + if startIdx < len(kvs) { + return w.writer.writeKVs(kvs[startIdx:]) } return nil } diff --git a/br/pkg/lightning/mydump/region.go b/br/pkg/lightning/mydump/region.go index c0883a5e4d2c2..00ba5a14ad5e9 100644 --- a/br/pkg/lightning/mydump/region.go +++ b/br/pkg/lightning/mydump/region.go @@ -339,8 +339,7 @@ func GetSampledAvgRowSize( case SourceTypeSQL: parser = NewChunkParser(cfg.TiDB.SQLMode, reader, int64(cfg.Mydumper.ReadBlockSize), ioWorkers) default: - err = errors.New("unknown source type") - return 0, errors.Annotatef(err, "source file %s is none of csv, sql, or parquet file", fileInfo.FileMeta.Path) + return 0, errors.Errorf("source file %s is none of csv, sql, or parquet file", fileInfo.FileMeta.Path) } totalBytes := 0 totalRows := 0 diff --git a/br/pkg/lightning/restore/meta_manager.go b/br/pkg/lightning/restore/meta_manager.go index d443271bdbb64..b039906448a18 100644 --- a/br/pkg/lightning/restore/meta_manager.go +++ b/br/pkg/lightning/restore/meta_manager.go @@ -78,8 
+78,10 @@ func (b *dbMetaMgrBuilder) TableMetaMgr(tr *TableRestore) tableMetaMgr { type tableMetaMgr interface { InitTableMeta(ctx context.Context) error AllocTableRowIDs(ctx context.Context, rawRowIDMax int64) (*verify.KVChecksum, int64, error) - // ReallocTableRowIDs: re-allocate rowIDs across lightning instances. only parallel import needs this - // returns: prevMaxRowID, newlyAllocatedMaxRowID, error + // ReallocTableRowIDs reallocates the row IDs of a table. + // It returns new rowIDBase and maxRowID or any error it encounters. + // Note that noopTableMetaMgr has a noop implementation of this function. + // If maxRowID is 0, caller should maintain rowIDBase and maxRowID itself. ReallocTableRowIDs(ctx context.Context, newRowIDCount int64) (int64, int64, error) UpdateTableStatus(ctx context.Context, status metaStatus) error UpdateTableBaseChecksum(ctx context.Context, checksum *verify.KVChecksum) error @@ -178,8 +180,8 @@ func (m *dbTableMetaMgr) ReallocTableRowIDs(ctx context.Context, newRowIDCount i return 0, 0, errors.Annotate(err, "enable pessimistic transaction failed") } var ( - newRowIDBase int64 - newRowIDMax int64 + maxRowIDMax int64 + newRowIDMax int64 ) err = exec.Transact(ctx, "realloc table rowID", func(ctx context.Context, tx *sql.Tx) error { row := tx.QueryRowContext( @@ -190,10 +192,10 @@ func (m *dbTableMetaMgr) ReallocTableRowIDs(ctx context.Context, newRowIDCount i if row.Err() != nil { return errors.Trace(err) } - if err := row.Scan(&newRowIDBase); err != nil { + if err := row.Scan(&maxRowIDMax); err != nil { return errors.Trace(err) } - newRowIDMax = newRowIDBase + newRowIDCount + newRowIDMax = maxRowIDMax + newRowIDCount // nolint:gosec query := fmt.Sprintf("UPDATE %s SET row_id_max = ? WHERE table_id = ? AND task_id = ?", m.tableName) if _, err := tx.ExecContext(ctx, query, newRowIDMax, m.tr.tableInfo.ID, m.taskID); err != nil { @@ -204,7 +206,8 @@ func (m *dbTableMetaMgr) ReallocTableRowIDs(ctx context.Context, newRowIDCount i if err != nil { return 0, 0, errors.Trace(err) } - return newRowIDBase, newRowIDMax, nil + // newRowIDBase = maxRowIDMax + 1 + return maxRowIDMax + 1, newRowIDMax, nil } func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64) (*verify.KVChecksum, int64, error) { diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 70be9efdbeb93..b8a5906116ffe 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -2034,8 +2034,8 @@ type chunkRestore struct { curRowIDMax int64 tableRestore *TableRestore - rowCount int - avgRowSize int + rowCount int + curAccmRowSize uint64 // has a maximum of 18446744.07370955 TB } func newChunkRestore( @@ -2162,7 +2162,8 @@ func (cr *chunkRestore) adjustRowID(rowID int64, rc *Controller) (int64, error) // 2. 
curRowIDBase > curRowIDMax -> run out of allocated IDs pos, _ := cr.parser.Pos() leftFileSize := cr.chunk.Chunk.EndOffset - pos - newRowIDCount := leftFileSize/int64(cr.avgRowSize) + 1 // plus the current row + avgRowSize := cr.curAccmRowSize / uint64(cr.rowCount) + newRowIDCount := leftFileSize/int64(avgRowSize) + 1 // plus the current row newBase, newMax, err := cr.tableRestore.allocateRowIDs(newRowIDCount, rc) if err != nil { logger := cr.tableRestore.logger.With( @@ -2183,7 +2184,7 @@ func (cr *chunkRestore) adjustRowID(rowID int64, rc *Controller) (int64, error) } func (cr *chunkRestore) updateRowStats(rowSize int) { - cr.avgRowSize = (cr.avgRowSize*cr.rowCount + rowSize) / (cr.rowCount + 1) + cr.curAccmRowSize += uint64(rowSize) cr.rowCount++ } diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index fec56d8532dec..c55ea2de91c38 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -1034,7 +1034,7 @@ func (tr *TableRestore) allocateRowIDs(newRowCount int64, rc *Controller) (int64 // try to re-allocate from downstream // if we are using parallel import, rowID should be reconciled globally. // Otherwise, this function will simply return 0. - preRowIDMax, newRowIDMax, err := metaMgr.ReallocTableRowIDs(context.Background(), newRowCount) + newRowIDBase, newRowIDMax, err := metaMgr.ReallocTableRowIDs(context.Background(), newRowCount) if err != nil { return 0, 0, err } @@ -1045,7 +1045,7 @@ func (tr *TableRestore) allocateRowIDs(newRowCount int64, rc *Controller) (int64 var rowIDBase int64 if newRowIDMax != 0 { // re-alloc from downstream - rowIDBase = preRowIDMax + 1 + rowIDBase = newRowIDBase tr.curMaxRowID = newRowIDMax } else { // single import mode: re-allocate rowID from memory From 3c83880790300bce8df26d2baa03ca5c1ccc68ca Mon Sep 17 00:00:00 2001 From: buchuitoudegou <756541536@qq.com> Date: Mon, 13 Jun 2022 17:00:19 +0800 Subject: [PATCH 17/21] rowID overflow check --- br/pkg/lightning/backend/kv/sql2kv.go | 55 +++++++++++++++++++ br/tests/lightning_realloc_id/config2.toml | 3 + .../data2/db.test.000000000.csv | 11 ++++ br/tests/lightning_realloc_id/run.sh | 23 ++++++++ 4 files changed, 92 insertions(+) create mode 100644 br/tests/lightning_realloc_id/config2.toml create mode 100644 br/tests/lightning_realloc_id/data2/db.test.000000000.csv diff --git a/br/pkg/lightning/backend/kv/sql2kv.go b/br/pkg/lightning/backend/kv/sql2kv.go index 4dc80e0c17ce2..1f780b48b9bad 100644 --- a/br/pkg/lightning/backend/kv/sql2kv.go +++ b/br/pkg/lightning/backend/kv/sql2kv.go @@ -439,6 +439,56 @@ func isPKCol(colInfo *model.ColumnInfo) bool { return mysql.HasPriKeyFlag(colInfo.GetFlag()) } +func isRowIDOverflow(meta *model.ColumnInfo, rowID int64) bool { + isUnsigned := mysql.HasUnsignedFlag(meta.GetFlag()) + switch meta.GetType() { + // MEDIUM INT + case mysql.TypeInt24: + if !isUnsigned { + return rowID > mysql.MaxInt24 + } + return rowID > mysql.MaxUint24 + // INT + case mysql.TypeLong: + if !isUnsigned { + return rowID > math.MaxInt32 + } + return rowID > math.MaxUint32 + // SMALLINT + case mysql.TypeShort: + if !isUnsigned { + return rowID > math.MaxInt16 + } + return rowID > math.MaxUint16 + // TINYINT + case mysql.TypeTiny: + if !isUnsigned { + return rowID > math.MaxInt8 + } + return rowID > math.MaxUint8 + // BIGINT + case mysql.TypeLonglong: + if !isUnsigned { + return rowID > math.MaxInt64 + } + // impossible for rowID exceeding MaxUint64 + return uint64(rowID) > math.MaxUint64 + // FLOAT + 
case mysql.TypeFloat: + if !isUnsigned { + return float32(rowID) > math.MaxFloat32 + } + return float64(rowID) > math.MaxFloat32*2 + // DOUBLE + case mysql.TypeDouble: + if !isUnsigned { + return float64(rowID) > math.MaxFloat64 + } + // impossible for rowID exceeding MaxFloat64 + } + return false +} + func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDatum *types.Datum) (types.Datum, error) { var ( value types.Datum @@ -466,6 +516,11 @@ func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDa // handle special values switch { case isAutoIncCol(col.ToInfo()): + // rowID is going to auto-filled the omitted column, + // which should be checked before restore + if isRowIDOverflow(col.ToInfo(), rowID) { + return value, errors.Errorf("PK %d is out of range", rowID) + } // we still need a conversion, e.g. to catch overflow with a TINYINT column. value, err = table.CastValue(kvcodec.se, types.NewIntDatum(rowID), col.ToInfo(), false, false) case isTableAutoRandom(tblMeta) && isPKCol(col.ToInfo()): diff --git a/br/tests/lightning_realloc_id/config2.toml b/br/tests/lightning_realloc_id/config2.toml new file mode 100644 index 0000000000000..f32593b43b798 --- /dev/null +++ b/br/tests/lightning_realloc_id/config2.toml @@ -0,0 +1,3 @@ +[tikv-importer] +incremental-import = true +backend = 'local' diff --git a/br/tests/lightning_realloc_id/data2/db.test.000000000.csv b/br/tests/lightning_realloc_id/data2/db.test.000000000.csv new file mode 100644 index 0000000000000..12d1d9e0bc948 --- /dev/null +++ b/br/tests/lightning_realloc_id/data2/db.test.000000000.csv @@ -0,0 +1,11 @@ +a +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 \ No newline at end of file diff --git a/br/tests/lightning_realloc_id/run.sh b/br/tests/lightning_realloc_id/run.sh index 02af369e3e0fb..1727b84189eed 100644 --- a/br/tests/lightning_realloc_id/run.sh +++ b/br/tests/lightning_realloc_id/run.sh @@ -20,6 +20,15 @@ set -eu check_cluster_version 4 0 0 'local backend' LOG_FILE1="$TEST_DIR/lightning-realloc-import1.log" LOG_FILE2="$TEST_DIR/lightning-realloc-import2.log" +LOG_FILE3="$TEST_DIR/lightning-realloc-import3.log" + +function run_lightning_expecting_fail() { + set +e + run_lightning "$@" + ERRCODE=$? 
+ set -e + [ "$ERRCODE" != 0 ] +} function check_result() { run_sql 'SHOW DATABASES;' @@ -47,6 +56,18 @@ function parallel_import() { wait "$pid1" "$pid2" } +function overflow_import() { + run_sql 'create database if not exists db' + run_sql 'create table db.test(id int auto_increment primary key, a int)' + run_sql 'alter table db.test auto_increment=2147483640' # too few available rowID + echo "lightning stdout:" > "$TEST_DIR/sql_res.$TEST_NAME.txt" + run_lightning_expecting_fail -d "tests/$TEST_NAME/data2" \ + --sorted-kv-dir "$TEST_DIR/lightning_realloc_import.sorted3" \ + --log-file "$LOG_FILE3" \ + --config "tests/$TEST_NAME/config2.toml" | tee -a "$TEST_DIR/sql_res.$TEST_NAME.txt" + check_contains 'out of range' +} + function check_parallel_result() { run_sql 'SHOW DATABASES;' check_contains 'Database: db'; @@ -64,4 +85,6 @@ run_sql 'DROP DATABASE IF EXISTS db;' parallel_import check_parallel_result run_sql 'DROP DATABASE IF EXISTS db;' +overflow_import +run_sql 'DROP DATABASE IF EXISTS db;' unset GO_FAILPOINTS \ No newline at end of file From c31335bf1b14071a81e7d02c28cbab1509c07157 Mon Sep 17 00:00:00 2001 From: buchuitoudegou <756541536@qq.com> Date: Tue, 14 Jun 2022 11:25:33 +0800 Subject: [PATCH 18/21] fix: compile error --- br/pkg/lightning/mydump/region.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/br/pkg/lightning/mydump/region.go b/br/pkg/lightning/mydump/region.go index 5f604c30abbb4..c85bef7662ed7 100644 --- a/br/pkg/lightning/mydump/region.go +++ b/br/pkg/lightning/mydump/region.go @@ -332,12 +332,12 @@ func GetSampledAvgRowSize( if err != nil { return 0, err } - parser, err = NewCSVParser(&cfg.Mydumper.CSV, reader, int64(cfg.Mydumper.ReadBlockSize), ioWorkers, hasHeader, charsetConvertor) + parser, err = NewCSVParser(ctx, &cfg.Mydumper.CSV, reader, int64(cfg.Mydumper.ReadBlockSize), ioWorkers, hasHeader, charsetConvertor) if err != nil { return 0, err } case SourceTypeSQL: - parser = NewChunkParser(cfg.TiDB.SQLMode, reader, int64(cfg.Mydumper.ReadBlockSize), ioWorkers) + parser = NewChunkParser(ctx, cfg.TiDB.SQLMode, reader, int64(cfg.Mydumper.ReadBlockSize), ioWorkers) default: return 0, errors.Errorf("source file %s is none of csv, sql, or parquet file", fileInfo.FileMeta.Path) } From 6246695a4d50eba46c11d66bea94c048e4e85ad7 Mon Sep 17 00:00:00 2001 From: buchuitoudegou <756541536@qq.com> Date: Tue, 14 Jun 2022 12:03:52 +0800 Subject: [PATCH 19/21] fix ut --- br/pkg/lightning/restore/meta_manager_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/br/pkg/lightning/restore/meta_manager_test.go b/br/pkg/lightning/restore/meta_manager_test.go index 3286198e835e6..23102b56f07a6 100644 --- a/br/pkg/lightning/restore/meta_manager_test.go +++ b/br/pkg/lightning/restore/meta_manager_test.go @@ -411,8 +411,8 @@ func TestReallocTableRowIDs(t *testing.T) { s.mockDB.ExpectExec("\\QUPDATE `test`.`table_meta` SET row_id_max = ? WHERE table_id = ? AND task_id = ?\\E").WithArgs(int64(1018), int64(1), int64(1)). 
WillReturnResult(sqlmock.NewResult(int64(0), int64(1))) s.mockDB.ExpectCommit() - preMax, newMax, err := s.mgr.ReallocTableRowIDs(context.Background(), 10) + newBase, newMax, err := s.mgr.ReallocTableRowIDs(context.Background(), 10) require.Nil(t, err) - require.Equal(t, int64(1008), preMax) + require.Equal(t, int64(1009), newBase) require.Equal(t, int64(1018), newMax) } From 3bfd82c0743712b20297adaba6ffb761aa230393 Mon Sep 17 00:00:00 2001 From: buchuitoudegou <756541536@qq.com> Date: Tue, 14 Jun 2022 14:46:20 +0800 Subject: [PATCH 20/21] fix it --- br/tests/lightning_realloc_id/run.sh | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/br/tests/lightning_realloc_id/run.sh b/br/tests/lightning_realloc_id/run.sh index 1727b84189eed..eead3b2fc1f33 100644 --- a/br/tests/lightning_realloc_id/run.sh +++ b/br/tests/lightning_realloc_id/run.sh @@ -64,8 +64,11 @@ function overflow_import() { run_lightning_expecting_fail -d "tests/$TEST_NAME/data2" \ --sorted-kv-dir "$TEST_DIR/lightning_realloc_import.sorted3" \ --log-file "$LOG_FILE3" \ - --config "tests/$TEST_NAME/config2.toml" | tee -a "$TEST_DIR/sql_res.$TEST_NAME.txt" - check_contains 'out of range' + --config "tests/$TEST_NAME/config2.toml" 2>&1 | tee -a "$TEST_DIR/sql_res.$TEST_NAME.txt" + if ! grep -q "out of range" "$TEST_DIR/sql_res.$TEST_NAME.txt"; then + echo "TEST FAILED: OUTPUT DOES NOT CONTAIN 'out of range'" + exit 1 + fi } function check_parallel_result() { From 5a839c96771ff4456264e0f11978e9997807403f Mon Sep 17 00:00:00 2001 From: buchuitoudegou <756541536@qq.com> Date: Thu, 16 Jun 2022 16:53:28 +0800 Subject: [PATCH 21/21] add log & remove reduntant test --- br/pkg/lightning/backend/kv/sql2kv.go | 7 ------- br/pkg/lightning/mydump/region.go | 1 + br/pkg/lightning/restore/restore.go | 13 +++++++------ 3 files changed, 8 insertions(+), 13 deletions(-) diff --git a/br/pkg/lightning/backend/kv/sql2kv.go b/br/pkg/lightning/backend/kv/sql2kv.go index 13d914e496b80..54b34979b980e 100644 --- a/br/pkg/lightning/backend/kv/sql2kv.go +++ b/br/pkg/lightning/backend/kv/sql2kv.go @@ -472,13 +472,6 @@ func isRowIDOverflow(meta *model.ColumnInfo, rowID int64) bool { return rowID > math.MaxInt8 } return rowID > math.MaxUint8 - // BIGINT - case mysql.TypeLonglong: - if !isUnsigned { - return rowID > math.MaxInt64 - } - // impossible for rowID exceeding MaxUint64 - return uint64(rowID) > math.MaxUint64 // FLOAT case mysql.TypeFloat: if !isUnsigned { diff --git a/br/pkg/lightning/mydump/region.go b/br/pkg/lightning/mydump/region.go index c85bef7662ed7..b347d27bb9ab8 100644 --- a/br/pkg/lightning/mydump/region.go +++ b/br/pkg/lightning/mydump/region.go @@ -277,6 +277,7 @@ func makeSourceFileRegion( log.L().Warn("fail to sample file", zap.String("path", fi.FileMeta.Path), zap.Error(err)) divisor = sizePerRow } + log.L().Debug("avg row size", zap.String("path", fi.FileMeta.Path), zap.Int64("size per row", sizePerRow)) // If a csv file is overlarge, we need to split it into multiple regions. // Note: We can only split a csv file whose format is strict. 
// We increase the check threshold by 1/10 of the `max-region-size` because the source file size dumped by tools diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 5b2f37a8e1173..9778c5d4be352 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -2175,6 +2175,13 @@ func (cr *chunkRestore) adjustRowID(rowID int64, rc *Controller) (int64, error) // need to adjust rowID // rowID should be within [curRowIDBase, curRowIDMax] if cr.curRowIDBase == 0 || cr.curRowIDBase > cr.curRowIDMax { + logger := cr.tableRestore.logger.With( + zap.String("tableName", cr.tableRestore.tableName), + zap.Int("fileIndex", cr.index), + zap.Stringer("path", &cr.chunk.Key), + zap.String("task", "re-allocate rowID"), + ) + logger.Info("start re-allocating") // 1. curRowIDBase == 0 -> no previous re-allocation // 2. curRowIDBase > curRowIDMax -> run out of allocated IDs pos, _ := cr.parser.Pos() @@ -2183,12 +2190,6 @@ func (cr *chunkRestore) adjustRowID(rowID int64, rc *Controller) (int64, error) newRowIDCount := leftFileSize/int64(avgRowSize) + 1 // plus the current row newBase, newMax, err := cr.tableRestore.allocateRowIDs(newRowIDCount, rc) if err != nil { - logger := cr.tableRestore.logger.With( - zap.String("tableName", cr.tableRestore.tableName), - zap.Int("fileIndex", cr.index), - zap.Stringer("path", &cr.chunk.Key), - zap.String("task", "re-allocate rowID"), - ) logger.Error("fail to re-allocate rowIDs", zap.Error(err)) return 0, err }
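
For reference, a minimal sketch of the other half of the change, the sorted-writer behaviour adjusted in the local backend patches above: a batch of sorted KV pairs is cut wherever consecutive row IDs are not adjacent, because such a leap is exactly what re-allocated IDs look like, and each segment would then be flushed as its own SST. splitOnRowIDLeap and kvPair are illustrative names under that assumption, not the Writer code itself.

package main

import "fmt"

type kvPair struct {
	Key   string
	RowID int64
}

// splitOnRowIDLeap returns the segments that would each go to a separate SST
// writer: a new segment starts whenever the rowID jumps by more than 1.
func splitOnRowIDLeap(kvs []kvPair) [][]kvPair {
	if len(kvs) == 0 {
		return nil
	}
	var segments [][]kvPair
	start := 0
	for i := 1; i < len(kvs); i++ {
		if kvs[i].RowID > kvs[i-1].RowID+1 {
			segments = append(segments, kvs[start:i])
			start = i
		}
	}
	return append(segments, kvs[start:])
}

func main() {
	kvs := []kvPair{{"k16", 16}, {"k17", 17}, {"k19", 19}, {"k20", 20}}
	for i, seg := range splitOnRowIDLeap(kvs) {
		fmt.Printf("sst %d: %v\n", i, seg)
	}
}

With the sample input the batch is cut between rowID 17 and 19, mirroring the engine_test.go expectation that an ID leap forces a flush and a fresh writer.
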