From 173b8b59f4872d0cfc5e0faad930f7dd82d0aa77 Mon Sep 17 00:00:00 2001 From: buchuitoudegou <756541536@qq.com> Date: Wed, 29 Jun 2022 15:45:44 +0800 Subject: [PATCH] Revert "lightning: sample files and pre-allocate rowID before restoring chunk (#34288)" This reverts commit 674def3b7870f7863166f2774457a1121af76839. --- Makefile | 8 -- br/pkg/lightning/backend/kv/sql2kv.go | 48 -------- br/pkg/lightning/backend/local/engine.go | 51 +------- br/pkg/lightning/backend/local/engine_test.go | 111 ------------------ br/pkg/lightning/mydump/region.go | 63 ---------- br/pkg/lightning/mydump/region_test.go | 89 -------------- .../lightning/restore/chunk_restore_test.go | 2 +- br/pkg/lightning/restore/meta_manager.go | 56 --------- br/pkg/lightning/restore/meta_manager_test.go | 32 ----- br/pkg/lightning/restore/restore.go | 84 +++---------- br/pkg/lightning/restore/table_restore.go | 35 +----- .../lightning/restore/table_restore_test.go | 59 +++++----- br/tests/lightning_auto_random_default/run.sh | 8 +- br/tests/lightning_realloc_id/config.toml | 3 - br/tests/lightning_realloc_id/config1.toml | 2 - br/tests/lightning_realloc_id/config2.toml | 3 - .../data/db-schema-create.sql | 1 - .../data/db.test-schema.sql | 4 - .../data/db.test.000000000.csv | 11 -- .../data/db.test.000000001.sql | 11 -- .../data1/db-schema-create.sql | 1 - .../data1/db.test-schema.sql | 4 - .../data1/db.test.000000000.csv | 11 -- .../data1/db.test.000000001.sql | 11 -- .../data2/db.test.000000000.csv | 11 -- br/tests/lightning_realloc_id/run.sh | 93 --------------- 26 files changed, 50 insertions(+), 762 deletions(-) delete mode 100644 br/tests/lightning_realloc_id/config.toml delete mode 100644 br/tests/lightning_realloc_id/config1.toml delete mode 100644 br/tests/lightning_realloc_id/config2.toml delete mode 100644 br/tests/lightning_realloc_id/data/db-schema-create.sql delete mode 100644 br/tests/lightning_realloc_id/data/db.test-schema.sql delete mode 100644 br/tests/lightning_realloc_id/data/db.test.000000000.csv delete mode 100644 br/tests/lightning_realloc_id/data/db.test.000000001.sql delete mode 100644 br/tests/lightning_realloc_id/data1/db-schema-create.sql delete mode 100644 br/tests/lightning_realloc_id/data1/db.test-schema.sql delete mode 100644 br/tests/lightning_realloc_id/data1/db.test.000000000.csv delete mode 100644 br/tests/lightning_realloc_id/data1/db.test.000000001.sql delete mode 100644 br/tests/lightning_realloc_id/data2/db.test.000000000.csv delete mode 100644 br/tests/lightning_realloc_id/run.sh diff --git a/Makefile b/Makefile index 2314bcff43e63..cd5c7c5555027 100644 --- a/Makefile +++ b/Makefile @@ -327,14 +327,6 @@ build_for_br_integration_test: ) || (make failpoint-disable && exit 1) @make failpoint-disable -build_for_lightning_test: - @make failpoint-enable - $(GOTEST) -c -cover -covermode=count \ - -coverpkg=github.com/pingcap/tidb/br/... 
\ - -o $(LIGHTNING_BIN).test \ - github.com/pingcap/tidb/br/cmd/tidb-lightning - @make failpoint-disable - br_unit_test: export ARGS=$$($(BR_PACKAGES)) br_unit_test: @make failpoint-enable diff --git a/br/pkg/lightning/backend/kv/sql2kv.go b/br/pkg/lightning/backend/kv/sql2kv.go index bd13f27e38954..8a10ce607d9f7 100644 --- a/br/pkg/lightning/backend/kv/sql2kv.go +++ b/br/pkg/lightning/backend/kv/sql2kv.go @@ -450,49 +450,6 @@ func isPKCol(colInfo *model.ColumnInfo) bool { return mysql.HasPriKeyFlag(colInfo.GetFlag()) } -func isRowIDOverflow(meta *model.ColumnInfo, rowID int64) bool { - isUnsigned := mysql.HasUnsignedFlag(meta.GetFlag()) - switch meta.GetType() { - // MEDIUM INT - case mysql.TypeInt24: - if !isUnsigned { - return rowID > mysql.MaxInt24 - } - return rowID > mysql.MaxUint24 - // INT - case mysql.TypeLong: - if !isUnsigned { - return rowID > math.MaxInt32 - } - return rowID > math.MaxUint32 - // SMALLINT - case mysql.TypeShort: - if !isUnsigned { - return rowID > math.MaxInt16 - } - return rowID > math.MaxUint16 - // TINYINT - case mysql.TypeTiny: - if !isUnsigned { - return rowID > math.MaxInt8 - } - return rowID > math.MaxUint8 - // FLOAT - case mysql.TypeFloat: - if !isUnsigned { - return float32(rowID) > math.MaxFloat32 - } - return float64(rowID) > math.MaxFloat32*2 - // DOUBLE - case mysql.TypeDouble: - if !isUnsigned { - return float64(rowID) > math.MaxFloat64 - } - // impossible for rowID exceeding MaxFloat64 - } - return false -} - func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDatum *types.Datum) (types.Datum, error) { var ( value types.Datum @@ -520,11 +477,6 @@ func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDa // handle special values switch { case isAutoIncCol(col.ToInfo()): - // rowID is going to auto-filled the omitted column, - // which should be checked before restore - if isRowIDOverflow(col.ToInfo(), rowID) { - return value, errors.Errorf("PK %d is out of range", rowID) - } // we still need a conversion, e.g. to catch overflow with a TINYINT column. value, err = table.CastValue(kvcodec.se, types.NewIntDatum(rowID), col.ToInfo(), false, false) case isTableAutoRandom(tblMeta) && isPKCol(col.ToInfo()): diff --git a/br/pkg/lightning/backend/local/engine.go b/br/pkg/lightning/backend/local/engine.go index 04036e57b16ac..ad0b37cd01963 100644 --- a/br/pkg/lightning/backend/local/engine.go +++ b/br/pkg/lightning/backend/local/engine.go @@ -33,7 +33,6 @@ import ( "github.com/google/btree" "github.com/google/uuid" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" @@ -1003,21 +1002,6 @@ type Writer struct { batchSize int64 lastMetaSeq int32 - prevRowID int64 // only used for appendRowsSorted -} - -func (w *Writer) flushAndNewWriter() error { - var err error - err = w.flush(context.Background()) - if err != nil { - return errors.Trace(err) - } - newWriter, err := w.createSSTWriter() - if err != nil { - return errors.Trace(err) - } - w.writer = newWriter - return nil } func (w *Writer) appendRowsSorted(kvs []common.KvPair) error { @@ -1028,17 +1012,6 @@ func (w *Writer) appendRowsSorted(kvs []common.KvPair) error { } w.writer = writer } - if len(kvs) == 0 { - return nil - } - if w.prevRowID != 0 && kvs[0].RowID > w.prevRowID+1 { - // rowID leap. 
probably re-alloc id - // should write to different sst - err := w.flushAndNewWriter() - if err != nil { - return err - } - } keyAdapter := w.engine.keyAdapter totalKeySize := 0 @@ -1063,26 +1036,7 @@ func (w *Writer) appendRowsSorted(kvs []common.KvPair) error { } kvs = newKvs } - startIdx := 0 - w.prevRowID = kvs[len(kvs)-1].RowID - for i := 1; i < len(kvs); i++ { - if kvs[i].RowID > kvs[i-1].RowID+1 { - // leap id - err := w.writer.writeKVs(kvs[startIdx:i]) - if err != nil { - return err - } - err = w.flushAndNewWriter() - if err != nil { - return err - } - startIdx = i - } - } - if startIdx < len(kvs) { - return w.writer.writeKVs(kvs[startIdx:]) - } - return nil + return w.writer.writeKVs(kvs) } func (w *Writer) appendRowsUnsorted(ctx context.Context, kvs []common.KvPair) error { @@ -1149,9 +1103,6 @@ func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames [ } func (w *Writer) flush(ctx context.Context) error { - failpoint.Inject("MockFlushWriter", func() { - failpoint.Return(nil) - }) w.Lock() defer w.Unlock() if w.batchCount == 0 { diff --git a/br/pkg/lightning/backend/local/engine_test.go b/br/pkg/lightning/backend/local/engine_test.go index 13c890c028297..a0d8592d5398f 100644 --- a/br/pkg/lightning/backend/local/engine_test.go +++ b/br/pkg/lightning/backend/local/engine_test.go @@ -29,9 +29,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/stretchr/testify/require" - "github.com/pingcap/failpoint" "github.com/pingcap/tidb/br/pkg/lightning/backend" - "github.com/pingcap/tidb/br/pkg/lightning/common" ) func TestIngestSSTWithClosedEngine(t *testing.T) { @@ -87,112 +85,3 @@ func TestIngestSSTWithClosedEngine(t *testing.T) { }, }), errorEngineClosed) } - -func TestAutoSplitSST(t *testing.T) { - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/MockFlushWriter", "return(true)")) - defer func() { - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/MockFlushWriter")) - }() - var err error - dir := os.TempDir() - w := &Writer{ - engine: &Engine{ - sstDir: dir, - keyAdapter: noopKeyAdapter{}, - logger: log.L(), - }, - isKVSorted: true, - isWriteBatchSorted: true, - } - w.engine.closed.Store(false) - w.writer, err = w.createSSTWriter() - require.Nil(t, err) - kvs := []common.KvPair{ - { - Key: []byte("1"), - Val: []byte("val1"), - RowID: 1, - }, - { - Key: []byte("2"), - Val: []byte("val1"), - RowID: 2, - }, - } - prevWriter := w.writer - err = w.appendRowsSorted(kvs) - require.Nil(t, err) - require.True(t, prevWriter == w.writer) - kvs = []common.KvPair{ - { - Key: []byte("10"), - Val: []byte("val10"), - RowID: 10, - }, - { - Key: []byte("11"), - Val: []byte("val11"), - RowID: 11, - }, - } - err = w.appendRowsSorted(kvs) - require.Nil(t, err) - require.False(t, prevWriter == w.writer) // id leap, should flush and create - prevWriter = w.writer - kvs = []common.KvPair{ - { - Key: []byte("12"), - Val: []byte("val12"), - RowID: 10, - }, - { - Key: []byte("13"), - Val: []byte("val13"), - RowID: 11, - }, - { - Key: []byte("15"), - Val: []byte("val15"), - RowID: 15, - }, - } - err = w.appendRowsSorted(kvs) - require.Nil(t, err) - require.False(t, prevWriter == w.writer) // id leap, should flush and create - prevWriter = w.writer - kvs = []common.KvPair{ - { - Key: []byte("16"), - Val: []byte("val16"), - RowID: 16, - }, - { - Key: []byte("17"), - Val: []byte("val17"), - RowID: 17, - }, - { - Key: []byte("19"), - Val: []byte("val19"), - RowID: 19, - }, - { - Key: 
[]byte("20"), - Val: []byte("val20"), - RowID: 20, - }, - { - Key: []byte("22"), - Val: []byte("val22"), - RowID: 22, - }, - { - Key: []byte("23"), - Val: []byte("val23"), - RowID: 22, - }, - } - err = w.appendRowsSorted(kvs) - require.Nil(t, err) - require.False(t, prevWriter == w.writer) // id leap, should flush and create -} diff --git a/br/pkg/lightning/mydump/region.go b/br/pkg/lightning/mydump/region.go index 04cc75e5567ae..b4f2537fb2507 100644 --- a/br/pkg/lightning/mydump/region.go +++ b/br/pkg/lightning/mydump/region.go @@ -22,7 +22,6 @@ import ( "time" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/lightning/worker" @@ -272,12 +271,6 @@ func makeSourceFileRegion( if !isCsvFile { divisor += 2 } - sizePerRow, err := GetSampledAvgRowSize(&fi, cfg, ioWorkers, store) - if err == nil && sizePerRow != 0 { - log.FromContext(ctx).Warn("fail to sample file", zap.String("path", fi.FileMeta.Path), zap.Error(err)) - divisor = sizePerRow - } - log.FromContext(ctx).Debug("avg row size", zap.String("path", fi.FileMeta.Path), zap.Int64("size per row", sizePerRow)) // If a csv file is overlarge, we need to split it into multiple regions. // Note: We can only split a csv file whose format is strict. // We increase the check threshold by 1/10 of the `max-region-size` because the source file size dumped by tools @@ -299,10 +292,6 @@ func makeSourceFileRegion( RowIDMax: fi.FileMeta.FileSize / divisor, }, } - failpoint.Inject("MockInaccurateRowID", func() { - // only allocates 5 rows but contains 10 rows - tableRegion.Chunk.RowIDMax = 5 - }) if tableRegion.Size() > tableRegionSizeWarningThreshold { log.FromContext(ctx).Warn( @@ -313,55 +302,6 @@ func makeSourceFileRegion( return []*TableRegion{tableRegion}, []float64{float64(fi.FileMeta.FileSize)}, nil } -func GetSampledAvgRowSize( - fileInfo *FileInfo, - cfg *config.Config, - ioWorkers *worker.Pool, - store storage.ExternalStorage, -) (int64, error) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - reader, err := store.Open(ctx, fileInfo.FileMeta.Path) - if err != nil { - return 0, err - } - var parser Parser - switch fileInfo.FileMeta.Type { - case SourceTypeCSV: - hasHeader := cfg.Mydumper.CSV.Header - charsetConvertor, err := NewCharsetConvertor(cfg.Mydumper.DataCharacterSet, cfg.Mydumper.DataInvalidCharReplace) - if err != nil { - return 0, err - } - parser, err = NewCSVParser(ctx, &cfg.Mydumper.CSV, reader, int64(cfg.Mydumper.ReadBlockSize), ioWorkers, hasHeader, charsetConvertor) - if err != nil { - return 0, err - } - case SourceTypeSQL: - parser = NewChunkParser(ctx, cfg.TiDB.SQLMode, reader, int64(cfg.Mydumper.ReadBlockSize), ioWorkers) - default: - return 0, errors.Errorf("source file %s is none of csv, sql, or parquet file", fileInfo.FileMeta.Path) - } - totalBytes := 0 - totalRows := 0 - defaultSampleRows := 10 // todo: may be configurable - for i := 0; i < defaultSampleRows; i++ { - err = parser.ReadRow() - if err != nil && errors.Cause(err) == io.EOF { - break - } else if err != nil { - return 0, err - } - totalBytes += parser.LastRow().Length - totalRows++ - } - if totalRows > 0 { - return int64(totalBytes) / int64(totalRows), nil - } else { - return 0, nil - } -} - // because parquet files can't seek efficiently, there is no benefit in split. 
// parquet file are column orient, so the offset is read line number func makeParquetFileRegion( @@ -441,9 +381,6 @@ func SplitLargeFile( } for { curRowsCnt := (endOffset - startOffset) / divisor - if curRowsCnt == 0 && endOffset != startOffset { - curRowsCnt = 1 - } rowIDMax := prevRowIdxMax + curRowsCnt if endOffset != dataFile.FileMeta.FileSize { r, err := store.Open(ctx, dataFile.FileMeta.Path) diff --git a/br/pkg/lightning/mydump/region_test.go b/br/pkg/lightning/mydump/region_test.go index 37ba4e4028e39..a1dbb9f290a69 100644 --- a/br/pkg/lightning/mydump/region_test.go +++ b/br/pkg/lightning/mydump/region_test.go @@ -40,8 +40,6 @@ import ( */ func TestTableRegion(t *testing.T) { cfg := newConfigWithSourceDir("./examples") - // specify ReadBlockSize because we need to sample files - cfg.Mydumper.ReadBlockSize = config.ReadBlockSize loader, _ := NewMyDumpLoader(context.Background(), cfg) dbMeta := loader.GetDatabases()[0] @@ -384,90 +382,3 @@ func TestSplitLargeFileOnlyOneChunk(t *testing.T) { require.Equal(t, columns, regions[i].Chunk.Columns) } } - -func TestSampleAndGetAvgRowSize(t *testing.T) { - // It's more difficult to estimate sizes of SQL files than csv files, - // because when reading the first row of them, parser may read other info (e.g. table name) - // so that make it hard to get good estimate, especially when files have few rows. - sqlFiles := []string{ - // 1. long table name, values: - // 1.1 short and even len - "INSERT INTO `test_db_mock_long.test_table_very_long_name` VALUES (1),(2);", - // 1.2 short and not even - "INSERT INTO `test_db_mock_long.test_table_very_long_name` VALUES (123452123,1234123125),(2,1);", - "INSERT INTO `test_db_mock_long.test_table_very_long_name` VALUES (2,1),(123452123,1234123125);", - // 1.3 long and even - "INSERT INTO `test_db_mock_long.test_table_very_long_name` VALUES (123452123,1234123125),(1234123125,12341231251);", - // 1.4 long but not even - "INSERT INTO `test_db_mock_long.test_table_very_long_name` VALUES ('abcdefghidgjla','lkjadsfasfdkjl'),('1111111','1');", - // 2. 
short table name, values: - // 2.1 short and even len - "INSERT INTO `a` VALUES (1),(2);", - // 2.2 short and not even - "INSERT INTO `a` VALUES (123452123,1234123125),(2,1);", - "INSERT INTO `a` VALUES (2,1),(123452123,1234123125);", - // 2.3 long and even - "INSERT INTO `a` VALUES (123452123,1234123125),(1234123125,12341231251);", - // 2.4 long but not even - "INSERT INTO `a` VALUES ('abcdefghidgjla','lkjadsfasfdkjl'),('1111111','1');", - } - - csvFiles := []string{ - // even and short - "a,b,c\r\n1,2,3\r\n4,5,6\r\n", - // not even but short - "a,b,c\r\n1112,1234,1923\r\n1,2,3", - // even and long - "a,b,c\r\n14712312,123122,1231233\r\n4456364,34525,423426\r\n", - // not even but long - "a,b,c\r\nsadlk;fja;lskdfj;alksdfj,sdlk;fjaksld;fja;l,qpoiwuepqou\r\n0,0,0\r\n", - } - testFunc := func(files []string, fileType SourceType) { - for _, file := range files { - dir := t.TempDir() - - var fileName string - if fileType == SourceTypeCSV { - fileName = "test.csv" - } else { - fileName = "test.sql" - } - filePath := filepath.Join(dir, fileName) - - content := []byte(file) - err := os.WriteFile(filePath, content, 0o644) - require.Nil(t, err) - dataFileInfo, err := os.Stat(filePath) - require.Nil(t, err) - fileSize := dataFileInfo.Size() - - cfg := newConfigWithSourceDir(dir) - loader, _ := NewMyDumpLoader(context.Background(), cfg) - ioWorkers := worker.NewPool(context.Background(), 1, "io") - - // specify ReadBlockSize because we need to sample files - cfg.Mydumper.ReadBlockSize = config.ReadBlockSize - fileInfo := FileInfo{ - FileMeta: SourceFileMeta{ - Path: fileName, - Type: fileType, - FileSize: fileSize, - }, - } - cfg.Mydumper.CSV = config.CSVConfig{ - Separator: ",", - Delimiter: `"`, - Header: true, - NotNull: false, - Null: `\N`, - BackslashEscape: true, - TrimLastSep: false, - } - size, err := GetSampledAvgRowSize(&fileInfo, cfg, ioWorkers, loader.GetStore()) - require.Nil(t, err) - require.GreaterOrEqual(t, fileSize/size, int64(2)) - } - } - testFunc(sqlFiles, SourceTypeSQL) - testFunc(csvFiles, SourceTypeCSV) -} diff --git a/br/pkg/lightning/restore/chunk_restore_test.go b/br/pkg/lightning/restore/chunk_restore_test.go index 59d083d85561c..2a9a42434c77b 100644 --- a/br/pkg/lightning/restore/chunk_restore_test.go +++ b/br/pkg/lightning/restore/chunk_restore_test.go @@ -73,7 +73,7 @@ func (s *chunkRestoreSuite) SetupTest() { } var err error - s.cr, err = newChunkRestore(context.Background(), 1, s.cfg, &chunk, w, s.store, nil, nil) + s.cr, err = newChunkRestore(context.Background(), 1, s.cfg, &chunk, w, s.store, nil) require.NoError(s.T(), err) } diff --git a/br/pkg/lightning/restore/meta_manager.go b/br/pkg/lightning/restore/meta_manager.go index b94bde8208be6..0bac9bd436613 100644 --- a/br/pkg/lightning/restore/meta_manager.go +++ b/br/pkg/lightning/restore/meta_manager.go @@ -78,11 +78,6 @@ func (b *dbMetaMgrBuilder) TableMetaMgr(tr *TableRestore) tableMetaMgr { type tableMetaMgr interface { InitTableMeta(ctx context.Context) error AllocTableRowIDs(ctx context.Context, rawRowIDMax int64) (*verify.KVChecksum, int64, error) - // ReallocTableRowIDs reallocates the row IDs of a table. - // It returns new rowIDBase and maxRowID or any error it encounters. - // Note that noopTableMetaMgr has a noop implementation of this function. - // If maxRowID is 0, caller should maintain rowIDBase and maxRowID itself. 
- ReallocTableRowIDs(ctx context.Context, newRowIDCount int64) (int64, int64, error) UpdateTableStatus(ctx context.Context, status metaStatus) error UpdateTableBaseChecksum(ctx context.Context, checksum *verify.KVChecksum) error CheckAndUpdateLocalChecksum(ctx context.Context, checksum *verify.KVChecksum, hasLocalDupes bool) ( @@ -165,51 +160,6 @@ func parseMetaStatus(s string) (metaStatus, error) { } } -func (m *dbTableMetaMgr) ReallocTableRowIDs(ctx context.Context, newRowIDCount int64) (int64, int64, error) { - conn, err := m.session.Conn(ctx) - if err != nil { - return 0, 0, errors.Trace(err) - } - defer conn.Close() - exec := &common.SQLWithRetry{ - DB: m.session, - Logger: m.tr.logger, - } - err = exec.Exec(ctx, "enable pessimistic transaction", "SET SESSION tidb_txn_mode = 'pessimistic';") - if err != nil { - return 0, 0, errors.Annotate(err, "enable pessimistic transaction failed") - } - var ( - maxRowIDMax int64 - newRowIDMax int64 - ) - err = exec.Transact(ctx, "realloc table rowID", func(ctx context.Context, tx *sql.Tx) error { - row := tx.QueryRowContext( - ctx, - fmt.Sprintf("SELECT MAX(row_id_max) from %s WHERE table_id = ? FOR UPDATE", m.tableName), - m.tr.tableInfo.ID, - ) - if row.Err() != nil { - return errors.Trace(err) - } - if err := row.Scan(&maxRowIDMax); err != nil { - return errors.Trace(err) - } - newRowIDMax = maxRowIDMax + newRowIDCount - // nolint:gosec - query := fmt.Sprintf("UPDATE %s SET row_id_max = ? WHERE table_id = ? AND task_id = ?", m.tableName) - if _, err := tx.ExecContext(ctx, query, newRowIDMax, m.tr.tableInfo.ID, m.taskID); err != nil { - return err - } - return nil - }) - if err != nil { - return 0, 0, errors.Trace(err) - } - // newRowIDBase = maxRowIDMax + 1 - return maxRowIDMax + 1, newRowIDMax, nil -} - func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64) (*verify.KVChecksum, int64, error) { conn, err := m.session.Conn(ctx) if err != nil { @@ -1097,12 +1047,6 @@ func (m noopTableMetaMgr) InitTableMeta(ctx context.Context) error { return nil } -func (m noopTableMetaMgr) ReallocTableRowIDs(ctx context.Context, _ int64) (int64, int64, error) { - // we don't need to reconcile rowIDs across all the instances - // barring using parallel import - return 0, 0, nil -} - func (m noopTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64) (*verify.KVChecksum, int64, error) { return nil, 0, nil } diff --git a/br/pkg/lightning/restore/meta_manager_test.go b/br/pkg/lightning/restore/meta_manager_test.go index 23102b56f07a6..8480bf077d6de 100644 --- a/br/pkg/lightning/restore/meta_manager_test.go +++ b/br/pkg/lightning/restore/meta_manager_test.go @@ -384,35 +384,3 @@ func TestSingleTaskMetaMgr(t *testing.T) { }) require.NoError(t, err) } - -func TestReallocTableRowIDs(t *testing.T) { - s, clean := newMetaMgrSuite(t) - defer clean() - - ctx := context.WithValue(context.Background(), &checksumManagerKey, s.checksumMgr) - - rows := [][]driver.Value{ - {int64(1), int64(998), int64(1008), uint64(0), uint64(0), uint64(0), metaStatusRowIDAllocated.String()}, - } - checksum := verification.MakeKVChecksum(2, 1, 3) - s.prepareMock(rows, nil, nil, &checksum, nil) - - ck, rowIDBase, err := s.mgr.AllocTableRowIDs(ctx, 10) - require.NoError(t, err) - require.Equal(t, int64(998), rowIDBase) - require.Equal(t, &checksum, ck) - require.Equal(t, 1, s.checksumMgr.callCnt) - s.mockDB.ExpectExec("SET SESSION tidb_txn_mode = 'pessimistic';"). 
- WillReturnResult(sqlmock.NewResult(int64(0), int64(0))) - - s.mockDB.ExpectBegin() - s.mockDB.ExpectQuery("\\QSELECT MAX(row_id_max) from `test`.`table_meta` WHERE table_id = ? FOR UPDATE\\E").WithArgs(int64(1)). - WillReturnRows(sqlmock.NewRows([]string{"row_id_max"}).AddRow(1008)) - s.mockDB.ExpectExec("\\QUPDATE `test`.`table_meta` SET row_id_max = ? WHERE table_id = ? AND task_id = ?\\E").WithArgs(int64(1018), int64(1), int64(1)). - WillReturnResult(sqlmock.NewResult(int64(0), int64(1))) - s.mockDB.ExpectCommit() - newBase, newMax, err := s.mgr.ReallocTableRowIDs(context.Background(), 10) - require.Nil(t, err) - require.Equal(t, int64(1009), newBase) - require.Equal(t, int64(1018), newMax) -} diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index faf774efa5c7a..0a43936ae5661 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -1610,9 +1610,6 @@ func (tr *TableRestore) restoreTable( rowIDMax = engine.Chunks[len(engine.Chunks)-1].Chunk.RowIDMax } } - if rowIDMax > tr.curMaxRowID { - tr.curMaxRowID = rowIDMax - } db, _ := rc.tidbGlue.GetDB() versionStr, err := version.FetchVersion(ctx, db) if err != nil { @@ -2055,16 +2052,9 @@ func (rc *Controller) DataCheck(ctx context.Context) error { } type chunkRestore struct { - parser mydump.Parser - index int - chunk *checkpoints.ChunkCheckpoint - originalRowIDMax int64 - curRowIDBase int64 - curRowIDMax int64 - tableRestore *TableRestore - - rowCount int - curAccmRowSize uint64 // has a maximum of 18446744.07370955 TB + parser mydump.Parser + index int + chunk *checkpoints.ChunkCheckpoint } func newChunkRestore( @@ -2075,7 +2065,6 @@ func newChunkRestore( ioWorkers *worker.Pool, store storage.ExternalStorage, tableInfo *checkpoints.TidbTableInfo, - tableRestore *TableRestore, ) (*chunkRestore, error) { blockBufSize := int64(cfg.Mydumper.ReadBlockSize) @@ -2122,11 +2111,9 @@ func newChunkRestore( } return &chunkRestore{ - parser: parser, - index: index, - chunk: chunk, - originalRowIDMax: chunk.Chunk.RowIDMax, - tableRestore: tableRestore, + parser: parser, + index: index, + chunk: chunk, }, nil } @@ -2179,52 +2166,13 @@ type deliverResult struct { err error } -func (cr *chunkRestore) adjustRowID(rowID int64, rc *Controller) (int64, error) { - if rowID <= cr.originalRowIDMax { - // no need to ajust - return rowID, nil - } - // need to adjust rowID - // rowID should be within [curRowIDBase, curRowIDMax] - if cr.curRowIDBase == 0 || cr.curRowIDBase > cr.curRowIDMax { - logger := cr.tableRestore.logger.With( - zap.String("tableName", cr.tableRestore.tableName), - zap.Int("fileIndex", cr.index), - zap.Stringer("path", &cr.chunk.Key), - zap.String("task", "re-allocate rowID"), - ) - logger.Info("start re-allocating") - // 1. curRowIDBase == 0 -> no previous re-allocation - // 2. 
curRowIDBase > curRowIDMax -> run out of allocated IDs - pos, _ := cr.parser.Pos() - leftFileSize := cr.chunk.Chunk.EndOffset - pos - avgRowSize := cr.curAccmRowSize / uint64(cr.rowCount) - newRowIDCount := leftFileSize/int64(avgRowSize) + 1 // plus the current row - newBase, newMax, err := cr.tableRestore.allocateRowIDs(newRowIDCount, rc) - if err != nil { - logger.Error("fail to re-allocate rowIDs", zap.Error(err)) - return 0, err - } - cr.curRowIDBase = newBase - cr.curRowIDMax = newMax - } - rowID = cr.curRowIDBase - cr.curRowIDBase++ - return rowID, nil -} - -func (cr *chunkRestore) updateRowStats(rowSize int) { - cr.curAccmRowSize += uint64(rowSize) - cr.rowCount++ -} - //nolint:nakedret // TODO: refactor func (cr *chunkRestore) deliverLoop( ctx context.Context, kvsCh <-chan []deliveredKVs, t *TableRestore, engineID int32, - dataWriter, indexWriter *backend.LocalEngineWriter, + dataEngine, indexEngine *backend.LocalEngineWriter, rc *Controller, ) (deliverTotalDur time.Duration, err error) { deliverLogger := t.logger.With( @@ -2248,6 +2196,7 @@ func (cr *chunkRestore) deliverLoop( startOffset := cr.chunk.Chunk.Offset currOffset := startOffset rowID := cr.chunk.Chunk.PrevRowIDMax + populate: for dataChecksum.SumSize()+indexChecksum.SumSize() < minDeliverBytes { select { @@ -2282,7 +2231,7 @@ func (cr *chunkRestore) deliverLoop( for !rc.diskQuotaLock.TryRLock() { // try to update chunk checkpoint, this can help save checkpoint after importing when disk-quota is triggered if !dataSynced { - dataSynced = cr.maybeSaveCheckpoint(rc, t, engineID, cr.chunk, dataWriter, indexWriter) + dataSynced = cr.maybeSaveCheckpoint(rc, t, engineID, cr.chunk, dataEngine, indexEngine) } time.Sleep(time.Millisecond) } @@ -2291,14 +2240,14 @@ func (cr *chunkRestore) deliverLoop( // Write KVs into the engine start := time.Now() - if err = dataWriter.WriteRows(ctx, columns, dataKVs); err != nil { + if err = dataEngine.WriteRows(ctx, columns, dataKVs); err != nil { if !common.IsContextCanceledError(err) { deliverLogger.Error("write to data engine failed", log.ShortError(err)) } return errors.Trace(err) } - if err = indexWriter.WriteRows(ctx, columns, indexKVs); err != nil { + if err = indexEngine.WriteRows(ctx, columns, indexKVs); err != nil { if !common.IsContextCanceledError(err) { deliverLogger.Error("write to index engine failed", log.ShortError(err)) } @@ -2340,7 +2289,7 @@ func (cr *chunkRestore) deliverLoop( if currOffset > lastOffset || dataChecksum.SumKVS() != 0 || indexChecksum.SumKVS() != 0 { // No need to save checkpoint if nothing was delivered. 
- dataSynced = cr.maybeSaveCheckpoint(rc, t, engineID, cr.chunk, dataWriter, indexWriter) + dataSynced = cr.maybeSaveCheckpoint(rc, t, engineID, cr.chunk, dataEngine, indexEngine) } failpoint.Inject("SlowDownWriteRows", func() { deliverLogger.Warn("Slowed down write rows") @@ -2511,11 +2460,6 @@ func (cr *chunkRestore) encodeLoop( encodeDurStart := time.Now() lastRow := cr.parser.LastRow() // sql -> kv - if lastRow.RowID, err = cr.adjustRowID(lastRow.RowID, rc); err != nil { - return - } - cr.updateRowStats(lastRow.Length) - rowID = lastRow.RowID kvs, encodeErr := kvEncoder.Encode(logger, lastRow.Row, lastRow.RowID, cr.chunk.ColumnPermutation, cr.chunk.Key.Path, curOffset) encodeDur += time.Since(encodeDurStart) @@ -2578,7 +2522,7 @@ func (cr *chunkRestore) restore( ctx context.Context, t *TableRestore, engineID int32, - dataWriter, indexWriter *backend.LocalEngineWriter, + dataEngine, indexEngine *backend.LocalEngineWriter, rc *Controller, ) error { // Create the encoder. @@ -2599,7 +2543,7 @@ func (cr *chunkRestore) restore( go func() { defer close(deliverCompleteCh) - dur, err := cr.deliverLoop(ctx, kvsCh, t, engineID, dataWriter, indexWriter, rc) + dur, err := cr.deliverLoop(ctx, kvsCh, t, engineID, dataEngine, indexEngine, rc) select { case <-ctx.Done(): case deliverCompleteCh <- deliverResult{dur, err}: diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index 491a59fa1c33b..7e44234f1d773 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -56,8 +56,6 @@ type TableRestore struct { logger log.Logger ignoreColumns map[string]struct{} - rowIDLock sync.Mutex - curMaxRowID int64 } func NewTableRestore( @@ -150,9 +148,6 @@ func (tr *TableRestore) RebaseChunkRowIDs(cp *checkpoints.TableCheckpoint, rowID for _, chunk := range engine.Chunks { chunk.Chunk.PrevRowIDMax += rowIDBase chunk.Chunk.RowIDMax += rowIDBase - if chunk.Chunk.RowIDMax > tr.curMaxRowID { - tr.curMaxRowID = chunk.Chunk.RowIDMax - } } } } @@ -510,7 +505,7 @@ func (tr *TableRestore) restoreEngine( // 2. sql -> kvs // 3. load kvs data (into kv deliver server) // 4. flush kvs data (into tikv node) - cr, err := newChunkRestore(ctx, chunkIndex, rc.cfg, chunk, rc.ioWorkers, rc.store, tr.tableInfo, tr) + cr, err := newChunkRestore(ctx, chunkIndex, rc.cfg, chunk, rc.ioWorkers, rc.store, tr.tableInfo) if err != nil { setError(err) break @@ -1053,31 +1048,3 @@ func estimateCompactionThreshold(cp *checkpoints.TableCheckpoint, factor int64) return threshold } - -func (tr *TableRestore) allocateRowIDs(newRowCount int64, rc *Controller) (int64, int64, error) { - tr.rowIDLock.Lock() - defer tr.rowIDLock.Unlock() - metaMgr := rc.metaMgrBuilder.TableMetaMgr(tr) - // try to re-allocate from downstream - // if we are using parallel import, rowID should be reconciled globally. - // Otherwise, this function will simply return 0. - newRowIDBase, newRowIDMax, err := metaMgr.ReallocTableRowIDs(context.Background(), newRowCount) - if err != nil { - return 0, 0, err - } - // TODO: refinement: currently, when we're not using SSTMode + incremental, - // metadata of the table restore is not maintained globally. - // So we have to deviate this two disparate situations here and make - // code complexer. 
- var rowIDBase int64 - if newRowIDMax != 0 { - // re-alloc from downstream - rowIDBase = newRowIDBase - tr.curMaxRowID = newRowIDMax - } else { - // single import mode: re-allocate rowID from memory - rowIDBase = tr.curMaxRowID + 1 - tr.curMaxRowID += newRowCount - } - return rowIDBase, tr.curMaxRowID, nil -} diff --git a/br/pkg/lightning/restore/table_restore_test.go b/br/pkg/lightning/restore/table_restore_test.go index 87aa389c7167b..5a8799ad43002 100644 --- a/br/pkg/lightning/restore/table_restore_test.go +++ b/br/pkg/lightning/restore/table_restore_test.go @@ -198,7 +198,6 @@ func (s *tableRestoreSuite) TestPopulateChunks() { Engines: make(map[int32]*checkpoints.EngineCheckpoint), } - s.cfg.Mydumper.CSV.Header = false rc := &Controller{cfg: s.cfg, ioWorkers: worker.NewPool(context.Background(), 1, "io"), store: s.store} err := s.tr.populateChunks(context.Background(), rc, cp) require.NoError(s.T(), err) @@ -217,7 +216,7 @@ func (s *tableRestoreSuite) TestPopulateChunks() { Offset: 0, EndOffset: 37, PrevRowIDMax: 0, - RowIDMax: 1, + RowIDMax: 7, // 37 bytes with 3 columns can store at most 7 rows. }, Timestamp: 1234567897, }, @@ -227,8 +226,8 @@ func (s *tableRestoreSuite) TestPopulateChunks() { Chunk: mydump.Chunk{ Offset: 0, EndOffset: 37, - PrevRowIDMax: 1, - RowIDMax: 2, + PrevRowIDMax: 7, + RowIDMax: 14, }, Timestamp: 1234567897, }, @@ -238,8 +237,8 @@ func (s *tableRestoreSuite) TestPopulateChunks() { Chunk: mydump.Chunk{ Offset: 0, EndOffset: 37, - PrevRowIDMax: 2, - RowIDMax: 3, + PrevRowIDMax: 14, + RowIDMax: 21, }, Timestamp: 1234567897, }, @@ -254,8 +253,8 @@ func (s *tableRestoreSuite) TestPopulateChunks() { Chunk: mydump.Chunk{ Offset: 0, EndOffset: 37, - PrevRowIDMax: 3, - RowIDMax: 4, + PrevRowIDMax: 21, + RowIDMax: 28, }, Timestamp: 1234567897, }, @@ -265,8 +264,8 @@ func (s *tableRestoreSuite) TestPopulateChunks() { Chunk: mydump.Chunk{ Offset: 0, EndOffset: 37, - PrevRowIDMax: 4, - RowIDMax: 5, + PrevRowIDMax: 28, + RowIDMax: 35, }, Timestamp: 1234567897, }, @@ -276,8 +275,8 @@ func (s *tableRestoreSuite) TestPopulateChunks() { Chunk: mydump.Chunk{ Offset: 0, EndOffset: 37, - PrevRowIDMax: 5, - RowIDMax: 6, + PrevRowIDMax: 35, + RowIDMax: 42, }, Timestamp: 1234567897, }, @@ -292,8 +291,8 @@ func (s *tableRestoreSuite) TestPopulateChunks() { Chunk: mydump.Chunk{ Offset: 0, EndOffset: 14, - PrevRowIDMax: 6, - RowIDMax: 10, + PrevRowIDMax: 42, + RowIDMax: 46, }, Timestamp: 1234567897, }, @@ -473,7 +472,7 @@ func (s *tableRestoreSuite) TestPopulateChunksCSVHeader() { Offset: 0, EndOffset: 14, PrevRowIDMax: 0, - RowIDMax: 4, // 14 bytes and 3 byte for each row + RowIDMax: 4, // 37 bytes with 3 columns can store at most 7 rows. 
}, Timestamp: 1234567897, }, @@ -484,7 +483,7 @@ func (s *tableRestoreSuite) TestPopulateChunksCSVHeader() { Offset: 0, EndOffset: 10, PrevRowIDMax: 4, - RowIDMax: 9, // 10 bytes and 2 byte for each row + RowIDMax: 7, }, Timestamp: 1234567897, }, @@ -495,8 +494,8 @@ func (s *tableRestoreSuite) TestPopulateChunksCSVHeader() { Chunk: mydump.Chunk{ Offset: 6, EndOffset: 52, - PrevRowIDMax: 9, - RowIDMax: 13, + PrevRowIDMax: 7, + RowIDMax: 20, Columns: []string{"a", "b", "c"}, }, @@ -509,8 +508,8 @@ func (s *tableRestoreSuite) TestPopulateChunksCSVHeader() { Chunk: mydump.Chunk{ Offset: 52, EndOffset: 60, - PrevRowIDMax: 13, - RowIDMax: 14, + PrevRowIDMax: 20, + RowIDMax: 22, Columns: []string{"a", "b", "c"}, }, Timestamp: 1234567897, @@ -522,8 +521,8 @@ func (s *tableRestoreSuite) TestPopulateChunksCSVHeader() { Chunk: mydump.Chunk{ Offset: 6, EndOffset: 48, - PrevRowIDMax: 14, - RowIDMax: 17, + PrevRowIDMax: 22, + RowIDMax: 35, Columns: []string{"c", "a", "b"}, }, Timestamp: 1234567897, @@ -540,8 +539,8 @@ func (s *tableRestoreSuite) TestPopulateChunksCSVHeader() { Chunk: mydump.Chunk{ Offset: 48, EndOffset: 101, - PrevRowIDMax: 17, - RowIDMax: 20, + PrevRowIDMax: 35, + RowIDMax: 48, Columns: []string{"c", "a", "b"}, }, Timestamp: 1234567897, @@ -553,8 +552,8 @@ func (s *tableRestoreSuite) TestPopulateChunksCSVHeader() { Chunk: mydump.Chunk{ Offset: 101, EndOffset: 102, - PrevRowIDMax: 20, - RowIDMax: 21, + PrevRowIDMax: 48, + RowIDMax: 48, Columns: []string{"c", "a", "b"}, }, Timestamp: 1234567897, @@ -566,8 +565,8 @@ func (s *tableRestoreSuite) TestPopulateChunksCSVHeader() { Chunk: mydump.Chunk{ Offset: 4, EndOffset: 59, - PrevRowIDMax: 21, - RowIDMax: 23, + PrevRowIDMax: 48, + RowIDMax: 61, Columns: []string{"b", "c"}, }, Timestamp: 1234567897, @@ -584,8 +583,8 @@ func (s *tableRestoreSuite) TestPopulateChunksCSVHeader() { Chunk: mydump.Chunk{ Offset: 59, EndOffset: 60, - PrevRowIDMax: 23, - RowIDMax: 24, + PrevRowIDMax: 61, + RowIDMax: 61, Columns: []string{"b", "c"}, }, Timestamp: 1234567897, diff --git a/br/tests/lightning_auto_random_default/run.sh b/br/tests/lightning_auto_random_default/run.sh index c54ca6ac7ee0e..41b9798de4560 100644 --- a/br/tests/lightning_auto_random_default/run.sh +++ b/br/tests/lightning_auto_random_default/run.sh @@ -40,10 +40,10 @@ for backend in tidb local; do check_contains 'inc: 6' NEXT_AUTO_RAND_VAL=7 else - check_contains 'inc: 6' - check_contains 'inc: 7' - check_contains 'inc: 8' - NEXT_AUTO_RAND_VAL=9 + check_contains 'inc: 25' + check_contains 'inc: 26' + check_contains 'inc: 27' + NEXT_AUTO_RAND_VAL=28 fi # tidb backend randomly generate the auto-random bit for each statement, so with 2 statements, diff --git a/br/tests/lightning_realloc_id/config.toml b/br/tests/lightning_realloc_id/config.toml deleted file mode 100644 index f32593b43b798..0000000000000 --- a/br/tests/lightning_realloc_id/config.toml +++ /dev/null @@ -1,3 +0,0 @@ -[tikv-importer] -incremental-import = true -backend = 'local' diff --git a/br/tests/lightning_realloc_id/config1.toml b/br/tests/lightning_realloc_id/config1.toml deleted file mode 100644 index d2152b47c922a..0000000000000 --- a/br/tests/lightning_realloc_id/config1.toml +++ /dev/null @@ -1,2 +0,0 @@ -[tikv-importer] -backend = 'local' diff --git a/br/tests/lightning_realloc_id/config2.toml b/br/tests/lightning_realloc_id/config2.toml deleted file mode 100644 index f32593b43b798..0000000000000 --- a/br/tests/lightning_realloc_id/config2.toml +++ /dev/null @@ -1,3 +0,0 @@ -[tikv-importer] -incremental-import = true 
-backend = 'local' diff --git a/br/tests/lightning_realloc_id/data/db-schema-create.sql b/br/tests/lightning_realloc_id/data/db-schema-create.sql deleted file mode 100644 index c88b0e3150e76..0000000000000 --- a/br/tests/lightning_realloc_id/data/db-schema-create.sql +++ /dev/null @@ -1 +0,0 @@ -create database db; \ No newline at end of file diff --git a/br/tests/lightning_realloc_id/data/db.test-schema.sql b/br/tests/lightning_realloc_id/data/db.test-schema.sql deleted file mode 100644 index 0490cd81e1c2e..0000000000000 --- a/br/tests/lightning_realloc_id/data/db.test-schema.sql +++ /dev/null @@ -1,4 +0,0 @@ -create table db.test( - id int auto_increment unique key, - a int primary key -); \ No newline at end of file diff --git a/br/tests/lightning_realloc_id/data/db.test.000000000.csv b/br/tests/lightning_realloc_id/data/db.test.000000000.csv deleted file mode 100644 index f2ce71fb561c5..0000000000000 --- a/br/tests/lightning_realloc_id/data/db.test.000000000.csv +++ /dev/null @@ -1,11 +0,0 @@ -a -100 -101 -102 -103 -104 -105 -106 -107 -108 -109 \ No newline at end of file diff --git a/br/tests/lightning_realloc_id/data/db.test.000000001.sql b/br/tests/lightning_realloc_id/data/db.test.000000001.sql deleted file mode 100644 index 611b5f5dbeba6..0000000000000 --- a/br/tests/lightning_realloc_id/data/db.test.000000001.sql +++ /dev/null @@ -1,11 +0,0 @@ -insert into db.test(a) values -(200), -(201), -(202), -(203), -(204), -(205), -(206), -(207), -(208), -(209); \ No newline at end of file diff --git a/br/tests/lightning_realloc_id/data1/db-schema-create.sql b/br/tests/lightning_realloc_id/data1/db-schema-create.sql deleted file mode 100644 index c88b0e3150e76..0000000000000 --- a/br/tests/lightning_realloc_id/data1/db-schema-create.sql +++ /dev/null @@ -1 +0,0 @@ -create database db; \ No newline at end of file diff --git a/br/tests/lightning_realloc_id/data1/db.test-schema.sql b/br/tests/lightning_realloc_id/data1/db.test-schema.sql deleted file mode 100644 index 0490cd81e1c2e..0000000000000 --- a/br/tests/lightning_realloc_id/data1/db.test-schema.sql +++ /dev/null @@ -1,4 +0,0 @@ -create table db.test( - id int auto_increment unique key, - a int primary key -); \ No newline at end of file diff --git a/br/tests/lightning_realloc_id/data1/db.test.000000000.csv b/br/tests/lightning_realloc_id/data1/db.test.000000000.csv deleted file mode 100644 index 70ae8fd5a20a7..0000000000000 --- a/br/tests/lightning_realloc_id/data1/db.test.000000000.csv +++ /dev/null @@ -1,11 +0,0 @@ -a -300 -301 -302 -303 -304 -305 -306 -307 -308 -309 \ No newline at end of file diff --git a/br/tests/lightning_realloc_id/data1/db.test.000000001.sql b/br/tests/lightning_realloc_id/data1/db.test.000000001.sql deleted file mode 100644 index 461cf4c3fccaf..0000000000000 --- a/br/tests/lightning_realloc_id/data1/db.test.000000001.sql +++ /dev/null @@ -1,11 +0,0 @@ -insert into db.test(a) values -(400), -(401), -(402), -(403), -(404), -(405), -(406), -(407), -(408), -(409); \ No newline at end of file diff --git a/br/tests/lightning_realloc_id/data2/db.test.000000000.csv b/br/tests/lightning_realloc_id/data2/db.test.000000000.csv deleted file mode 100644 index 12d1d9e0bc948..0000000000000 --- a/br/tests/lightning_realloc_id/data2/db.test.000000000.csv +++ /dev/null @@ -1,11 +0,0 @@ -a -1 -2 -3 -4 -5 -6 -7 -8 -9 -10 \ No newline at end of file diff --git a/br/tests/lightning_realloc_id/run.sh b/br/tests/lightning_realloc_id/run.sh deleted file mode 100644 index eead3b2fc1f33..0000000000000 --- 
a/br/tests/lightning_realloc_id/run.sh +++ /dev/null @@ -1,93 +0,0 @@ -#!/bin/sh -# -# Copyright 2022 PingCAP, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Basic check for whether partitioned tables work. - -set -eu -check_cluster_version 4 0 0 'local backend' -LOG_FILE1="$TEST_DIR/lightning-realloc-import1.log" -LOG_FILE2="$TEST_DIR/lightning-realloc-import2.log" -LOG_FILE3="$TEST_DIR/lightning-realloc-import3.log" - -function run_lightning_expecting_fail() { - set +e - run_lightning "$@" - ERRCODE=$? - set -e - [ "$ERRCODE" != 0 ] -} - -function check_result() { - run_sql 'SHOW DATABASES;' - check_contains 'Database: db'; - run_sql 'SHOW TABLES IN db;' - check_contains 'Tables_in_db: test' - run_sql 'SELECT count(*) FROM db.test;' - check_contains 'count(*): 20' - run_sql 'SELECT * FROM db.test;' - check_contains 'id: 15' - check_contains 'id: 20' -} - -function parallel_import() { - run_lightning -d "tests/$TEST_NAME/data" \ - --sorted-kv-dir "$TEST_DIR/lightning_realloc_import.sorted1" \ - --log-file "$LOG_FILE1" \ - --config "tests/$TEST_NAME/config.toml" & - pid1="$!" - run_lightning -d "tests/$TEST_NAME/data1" \ - --sorted-kv-dir "$TEST_DIR/lightning_realloc_import.sorted2" \ - --log-file "$LOG_FILE2" \ - --config "tests/$TEST_NAME/config.toml" & - pid2="$!" - wait "$pid1" "$pid2" -} - -function overflow_import() { - run_sql 'create database if not exists db' - run_sql 'create table db.test(id int auto_increment primary key, a int)' - run_sql 'alter table db.test auto_increment=2147483640' # too few available rowID - echo "lightning stdout:" > "$TEST_DIR/sql_res.$TEST_NAME.txt" - run_lightning_expecting_fail -d "tests/$TEST_NAME/data2" \ - --sorted-kv-dir "$TEST_DIR/lightning_realloc_import.sorted3" \ - --log-file "$LOG_FILE3" \ - --config "tests/$TEST_NAME/config2.toml" 2>&1 | tee -a "$TEST_DIR/sql_res.$TEST_NAME.txt" - if ! grep -q "out of range" "$TEST_DIR/sql_res.$TEST_NAME.txt"; then - echo "TEST FAILED: OUTPUT DOES NOT CONTAIN 'out of range'" - exit 1 - fi -} - -function check_parallel_result() { - run_sql 'SHOW DATABASES;' - check_contains 'Database: db'; - run_sql 'SHOW TABLES IN db;' - check_contains 'Tables_in_db: test' - run_sql 'SELECT count(*) FROM db.test;' - check_contains 'count(*): 40' -} - -run_sql 'DROP DATABASE IF EXISTS db;' -export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/mydump/MockInaccurateRowID=return(true)' -run_lightning --config "tests/$TEST_NAME/config1.toml" -check_result -run_sql 'DROP DATABASE IF EXISTS db;' -parallel_import -check_parallel_result -run_sql 'DROP DATABASE IF EXISTS db;' -overflow_import -run_sql 'DROP DATABASE IF EXISTS db;' -unset GO_FAILPOINTS \ No newline at end of file
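For context on what this revert removes: the sql2kv change being reverted rejected auto-filled row IDs that would overflow the destination column before restore, and the overflow_import test above drives auto_increment near MaxInt32 to trigger exactly that. Below is a minimal standalone Go sketch mirroring the integer cases of the reverted isRowIDOverflow helper; the colType descriptor and constant names are illustrative assumptions, not the real tidb/parser types.

package main

import (
	"fmt"
	"math"
)

// colType is a simplified stand-in for the integer column kinds the
// reverted isRowIDOverflow switch handled (assumption: not the real
// tidb types/mysql packages).
type colType int

const (
	typeTiny  colType = iota // TINYINT
	typeShort                // SMALLINT
	typeInt24                // MEDIUMINT
	typeLong                 // INT
)

// rowIDOverflows reports whether an auto-filled rowID no longer fits the
// destination column, mirroring the per-type bounds in the reverted code.
func rowIDOverflows(t colType, unsigned bool, rowID int64) bool {
	switch t {
	case typeTiny:
		if unsigned {
			return rowID > math.MaxUint8
		}
		return rowID > math.MaxInt8
	case typeShort:
		if unsigned {
			return rowID > math.MaxUint16
		}
		return rowID > math.MaxInt16
	case typeInt24:
		if unsigned {
			return rowID > 1<<24-1 // MaxUint24
		}
		return rowID > 1<<23-1 // MaxInt24
	case typeLong:
		if unsigned {
			return rowID > math.MaxUint32
		}
		return rowID > math.MaxInt32
	}
	return false
}

func main() {
	// With auto_increment pushed close to MaxInt32 (as in the removed
	// overflow_import test), the next allocated rowID trips the INT bound.
	fmt.Println(rowIDOverflows(typeLong, false, int64(math.MaxInt32)+1)) // true
}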