From 8f64f368646b791cd91630c9f3f89f6315261bb0 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 4 Apr 2023 17:56:57 +0800 Subject: [PATCH] lightning: refactor for reuse in load data part4 (#42727) ref pingcap/tidb#40499 --- br/pkg/lightning/backend/BUILD.bazel | 2 - br/pkg/lightning/backend/backend.go | 30 +- br/pkg/lightning/backend/local/duplicate.go | 226 ++++++++- .../lightning/backend/local/duplicate_test.go | 5 +- br/pkg/lightning/backend/local/engine.go | 4 +- br/pkg/lightning/backend/local/iterator.go | 11 +- .../lightning/backend/local/iterator_test.go | 4 +- br/pkg/lightning/backend/local/local.go | 440 +++++++----------- br/pkg/lightning/backend/local/local_test.go | 24 +- br/pkg/lightning/backend/local/localhelper.go | 15 +- .../backend/local/localhelper_test.go | 6 +- br/pkg/lightning/backend/noop/BUILD.bazel | 2 - br/pkg/lightning/backend/noop/noop.go | 17 - br/pkg/lightning/backend/tidb/tidb.go | 12 - br/pkg/lightning/config/config.go | 23 +- br/pkg/lightning/config/const.go | 2 + br/pkg/lightning/importer/import.go | 3 +- br/pkg/lightning/importer/table_import.go | 14 +- br/pkg/lightning/mydump/csv_parser_test.go | 66 +-- br/pkg/lightning/mydump/parser.go | 4 +- br/pkg/lightning/mydump/parser_test.go | 12 +- br/pkg/lightning/mydump/region.go | 183 +++++--- br/pkg/lightning/mydump/region_test.go | 76 ++- br/pkg/mock/BUILD.bazel | 1 - br/pkg/mock/backend.go | 46 -- ddl/ingest/backend.go | 10 +- ddl/ingest/backend_mgr.go | 6 +- disttask/loaddata/wrapper.go | 3 +- 28 files changed, 633 insertions(+), 614 deletions(-) diff --git a/br/pkg/lightning/backend/BUILD.bazel b/br/pkg/lightning/backend/BUILD.bazel index e9396fcffa9ad..e98f2f9b604db 100644 --- a/br/pkg/lightning/backend/BUILD.bazel +++ b/br/pkg/lightning/backend/BUILD.bazel @@ -9,12 +9,10 @@ go_library( "//br/pkg/lightning/backend/encode", "//br/pkg/lightning/checkpoints", "//br/pkg/lightning/common", - "//br/pkg/lightning/config", "//br/pkg/lightning/log", "//br/pkg/lightning/metric", "//br/pkg/lightning/mydump", "//parser/model", - "//table", "@com_github_google_uuid//:uuid", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", diff --git a/br/pkg/lightning/backend/backend.go b/br/pkg/lightning/backend/backend.go index 775fd617992a5..6763886d28e2e 100644 --- a/br/pkg/lightning/backend/backend.go +++ b/br/pkg/lightning/backend/backend.go @@ -25,12 +25,10 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/backend/encode" "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" "github.com/pingcap/tidb/br/pkg/lightning/common" - "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/lightning/metric" "github.com/pingcap/tidb/br/pkg/lightning/mydump" "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/table" "go.uber.org/zap" "golang.org/x/exp/slices" ) @@ -196,18 +194,6 @@ type AbstractBackend interface { // LocalWriter obtains a thread-local EngineWriter for writing rows into the given engine. LocalWriter(ctx context.Context, cfg *LocalWriterConfig, engineUUID uuid.UUID) (EngineWriter, error) - // CollectLocalDuplicateRows collect duplicate keys from local db. We will store the duplicate keys which - // may be repeated with other keys in local data source. - CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (hasDupe bool, err error) - - // CollectRemoteDuplicateRows collect duplicate keys from remote TiKV storage. 
This keys may be duplicate with - // the data import by other lightning. - CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (hasDupe bool, err error) - - // ResolveDuplicateRows resolves duplicated rows by deleting/inserting data - // according to the required algorithm. - ResolveDuplicateRows(ctx context.Context, tbl table.Table, tableName string, algorithm config.DuplicateResolutionAlgorithm) error - // TotalMemoryConsume counts total memory usage. This is only used for local backend. TotalMemoryConsume() int64 } @@ -370,19 +356,9 @@ func (be Backend) OpenEngine(ctx context.Context, config *EngineConfig, tableNam }, nil } -// CollectLocalDuplicateRows collects duplicate rows from the local backend. -func (be Backend) CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (bool, error) { - return be.abstract.CollectLocalDuplicateRows(ctx, tbl, tableName, opts) -} - -// CollectRemoteDuplicateRows collects duplicate rows from the remote backend. -func (be Backend) CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (bool, error) { - return be.abstract.CollectRemoteDuplicateRows(ctx, tbl, tableName, opts) -} - -// ResolveDuplicateRows resolves duplicate rows from the backend. -func (be Backend) ResolveDuplicateRows(ctx context.Context, tbl table.Table, tableName string, algorithm config.DuplicateResolutionAlgorithm) error { - return be.abstract.ResolveDuplicateRows(ctx, tbl, tableName, algorithm) +// Inner returns the underlying abstract backend. +func (be Backend) Inner() AbstractBackend { + return be.abstract } // Close the opened engine to prepare it for importing. diff --git a/br/pkg/lightning/backend/local/duplicate.go b/br/pkg/lightning/backend/local/duplicate.go index 34a8dea8165d7..b03ef4ddeaa1e 100644 --- a/br/pkg/lightning/backend/local/duplicate.go +++ b/br/pkg/lightning/backend/local/duplicate.go @@ -17,6 +17,7 @@ package local import ( "bytes" "context" + "fmt" "io" "math" "sync" @@ -31,6 +32,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/backend/encode" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/br/pkg/lightning/errormanager" "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/logutil" @@ -39,15 +41,20 @@ import ( "github.com/pingcap/tidb/distsql" tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" + "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/hack" "github.com/pingcap/tidb/util/ranger" + tikverror "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/tikv" "go.uber.org/atomic" "go.uber.org/zap" + "golang.org/x/exp/slices" "golang.org/x/sync/errgroup" + "golang.org/x/time/rate" ) const ( @@ -387,9 +394,10 @@ func (s *RemoteDupKVStream) Close() error { return nil } -// DuplicateManager provides methods to collect and decode duplicated KV pairs into row data. The results +// DupeDetector provides methods to collect and decode duplicated KV pairs into row data. The results // are stored into the errorMgr. -type DuplicateManager struct { +// this object can only be used once, either for local or remote deduplication. 
+type DupeDetector struct { tbl table.Table tableName string splitCli split.SplitClient @@ -399,12 +407,12 @@ type DuplicateManager struct { decoder *kv.TableKVDecoder logger log.Logger concurrency int - hasDupe *atomic.Bool + hasDupe atomic.Bool indexID int64 } -// NewDuplicateManager creates a new DuplicateManager. -func NewDuplicateManager( +// NewDupeDetector creates a new DupeDetector. +func NewDupeDetector( tbl table.Table, tableName string, splitCli split.SplitClient, @@ -413,15 +421,14 @@ func NewDuplicateManager( errMgr *errormanager.ErrorManager, sessOpts *encode.SessionOptions, concurrency int, - hasDupe *atomic.Bool, logger log.Logger, -) (*DuplicateManager, error) { +) (*DupeDetector, error) { logger = logger.With(zap.String("tableName", tableName)) decoder, err := kv.NewTableKVDecoder(tbl, tableName, sessOpts, logger) if err != nil { return nil, errors.Trace(err) } - return &DuplicateManager{ + return &DupeDetector{ tbl: tbl, tableName: tableName, splitCli: splitCli, @@ -431,13 +438,17 @@ func NewDuplicateManager( decoder: decoder, logger: logger, concurrency: concurrency, - hasDupe: hasDupe, indexID: sessOpts.IndexID, }, nil } +// HasDuplicate returns true if there are duplicated KV pairs. +func (m *DupeDetector) HasDuplicate() bool { + return m.hasDupe.Load() +} + // RecordDataConflictError records data conflicts to errorMgr. The key received from stream must be a row key. -func (m *DuplicateManager) RecordDataConflictError(ctx context.Context, stream DupKVStream) error { +func (m *DupeDetector) RecordDataConflictError(ctx context.Context, stream DupKVStream) error { //nolint: errcheck defer stream.Close() var dataConflictInfos []errormanager.DataConflictInfo @@ -481,7 +492,7 @@ func (m *DuplicateManager) RecordDataConflictError(ctx context.Context, stream D return nil } -func (m *DuplicateManager) saveIndexHandles(ctx context.Context, handles pendingIndexHandles) error { +func (m *DupeDetector) saveIndexHandles(ctx context.Context, handles pendingIndexHandles) error { snapshot := m.tikvCli.GetSnapshot(math.MaxUint64) batchGetMap, err := snapshot.BatchGet(ctx, handles.rawHandles) if err != nil { @@ -506,7 +517,7 @@ func (m *DuplicateManager) saveIndexHandles(ctx context.Context, handles pending } // RecordIndexConflictError records index conflicts to errorMgr. The key received from stream must be an index key. -func (m *DuplicateManager) RecordIndexConflictError(ctx context.Context, stream DupKVStream, tableID int64, indexInfo *model.IndexInfo) error { +func (m *DupeDetector) RecordIndexConflictError(ctx context.Context, stream DupKVStream, tableID int64, indexInfo *model.IndexInfo) error { //nolint: errcheck defer stream.Close() indexHandles := makePendingIndexHandlesWithCapacity(0) @@ -552,7 +563,7 @@ func (m *DuplicateManager) RecordIndexConflictError(ctx context.Context, stream } // BuildDuplicateTaskForTest is only used for test. 
-var BuildDuplicateTaskForTest = func(m *DuplicateManager) ([]dupTask, error) { +var BuildDuplicateTaskForTest = func(m *DupeDetector) ([]dupTask, error) { return m.buildDupTasks() } @@ -562,7 +573,7 @@ type dupTask struct { indexInfo *model.IndexInfo } -func (m *DuplicateManager) buildDupTasks() ([]dupTask, error) { +func (m *DupeDetector) buildDupTasks() ([]dupTask, error) { if m.indexID != 0 { return m.buildIndexDupTasks() } @@ -607,7 +618,7 @@ func (m *DuplicateManager) buildDupTasks() ([]dupTask, error) { return tasks, nil } -func (m *DuplicateManager) buildIndexDupTasks() ([]dupTask, error) { +func (m *DupeDetector) buildIndexDupTasks() ([]dupTask, error) { for _, indexInfo := range m.tbl.Meta().Indices { if m.indexID != indexInfo.ID { continue @@ -634,7 +645,7 @@ func (m *DuplicateManager) buildIndexDupTasks() ([]dupTask, error) { return nil, nil } -func (m *DuplicateManager) splitLocalDupTaskByKeys( +func (m *DupeDetector) splitLocalDupTaskByKeys( task dupTask, dupDB *pebble.DB, keyAdapter KeyAdapter, @@ -660,7 +671,7 @@ func (m *DuplicateManager) splitLocalDupTaskByKeys( return newDupTasks, nil } -func (m *DuplicateManager) buildLocalDupTasks(dupDB *pebble.DB, keyAdapter KeyAdapter) ([]dupTask, error) { +func (m *DupeDetector) buildLocalDupTasks(dupDB *pebble.DB, keyAdapter KeyAdapter) ([]dupTask, error) { tasks, err := m.buildDupTasks() if err != nil { return nil, errors.Trace(err) @@ -679,7 +690,7 @@ func (m *DuplicateManager) buildLocalDupTasks(dupDB *pebble.DB, keyAdapter KeyAd } // CollectDuplicateRowsFromDupDB collects duplicates from the duplicate DB and records all duplicate row info into errorMgr. -func (m *DuplicateManager) CollectDuplicateRowsFromDupDB(ctx context.Context, dupDB *pebble.DB, keyAdapter KeyAdapter) error { +func (m *DupeDetector) CollectDuplicateRowsFromDupDB(ctx context.Context, dupDB *pebble.DB, keyAdapter KeyAdapter) error { tasks, err := m.buildLocalDupTasks(dupDB, keyAdapter) if err != nil { return errors.Trace(err) @@ -715,7 +726,7 @@ func (m *DuplicateManager) CollectDuplicateRowsFromDupDB(ctx context.Context, du return errors.Trace(g.Wait()) } -func (m *DuplicateManager) splitKeyRangeByRegions( +func (m *DupeDetector) splitKeyRangeByRegions( ctx context.Context, keyRange tidbkv.KeyRange, ) ([]*split.RegionInfo, []tidbkv.KeyRange, error) { rawStartKey := codec.EncodeBytes(nil, keyRange.StartKey) @@ -758,7 +769,7 @@ func (m *DuplicateManager) splitKeyRangeByRegions( return regions, keyRanges, nil } -func (m *DuplicateManager) processRemoteDupTaskOnce( +func (m *DupeDetector) processRemoteDupTaskOnce( ctx context.Context, task dupTask, logger log.Logger, @@ -835,7 +846,7 @@ func (m *DuplicateManager) processRemoteDupTaskOnce( // processRemoteDupTask processes a remoteDupTask. A task contains a key range. // A key range is associated with multiple regions. processRemoteDupTask tries // to collect duplicates from each region. -func (m *DuplicateManager) processRemoteDupTask( +func (m *DupeDetector) processRemoteDupTask( ctx context.Context, task dupTask, logger log.Logger, @@ -870,7 +881,7 @@ func (m *DuplicateManager) processRemoteDupTask( } // CollectDuplicateRowsFromTiKV collects duplicates from the remote TiKV and records all duplicate row info into errorMgr. 
-func (m *DuplicateManager) CollectDuplicateRowsFromTiKV(ctx context.Context, importClientFactory ImportClientFactory) error { +func (m *DupeDetector) CollectDuplicateRowsFromTiKV(ctx context.Context, importClientFactory ImportClientFactory) error { tasks, err := m.buildDupTasks() if err != nil { return errors.Trace(err) @@ -901,3 +912,174 @@ func (m *DuplicateManager) CollectDuplicateRowsFromTiKV(ctx context.Context, imp } return errors.Trace(g.Wait()) } + +// DupeController is used to collect duplicate keys from local and remote data source and resolve duplication. +type DupeController struct { + splitCli split.SplitClient + tikvCli *tikv.KVStore + tikvCodec tikv.Codec + errorMgr *errormanager.ErrorManager + // number of workers to do duplicate detection on local db and TiKV + // on TiKV, it is the max number of regions being checked concurrently + dupeConcurrency int + duplicateDB *pebble.DB + keyAdapter KeyAdapter + importClientFactory ImportClientFactory +} + +// CollectLocalDuplicateRows collect duplicate keys from local db. We will store the duplicate keys which +// may be repeated with other keys in local data source. +func (local *DupeController) CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (hasDupe bool, err error) { + logger := log.FromContext(ctx).With(zap.String("table", tableName)).Begin(zap.InfoLevel, "[detect-dupe] collect local duplicate keys") + defer func() { + logger.End(zap.ErrorLevel, err) + }() + + duplicateManager, err := NewDupeDetector(tbl, tableName, local.splitCli, local.tikvCli, local.tikvCodec, + local.errorMgr, opts, local.dupeConcurrency, log.FromContext(ctx)) + if err != nil { + return false, errors.Trace(err) + } + if err := duplicateManager.CollectDuplicateRowsFromDupDB(ctx, local.duplicateDB, local.keyAdapter); err != nil { + return false, errors.Trace(err) + } + return duplicateManager.HasDuplicate(), nil +} + +// CollectRemoteDuplicateRows collect duplicate keys from remote TiKV storage. This keys may be duplicate with +// the data import by other lightning. +func (local *DupeController) CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (hasDupe bool, err error) { + logger := log.FromContext(ctx).With(zap.String("table", tableName)).Begin(zap.InfoLevel, "[detect-dupe] collect remote duplicate keys") + defer func() { + logger.End(zap.ErrorLevel, err) + }() + + duplicateManager, err := NewDupeDetector(tbl, tableName, local.splitCli, local.tikvCli, local.tikvCodec, + local.errorMgr, opts, local.dupeConcurrency, log.FromContext(ctx)) + if err != nil { + return false, errors.Trace(err) + } + if err := duplicateManager.CollectDuplicateRowsFromTiKV(ctx, local.importClientFactory); err != nil { + return false, errors.Trace(err) + } + return duplicateManager.HasDuplicate(), nil +} + +// ResolveDuplicateRows resolves duplicated rows by deleting/inserting data +// according to the required algorithm. +func (local *DupeController) ResolveDuplicateRows(ctx context.Context, tbl table.Table, tableName string, algorithm config.DuplicateResolutionAlgorithm) (err error) { + logger := log.FromContext(ctx).With(zap.String("table", tableName)).Begin(zap.InfoLevel, "[resolve-dupe] resolve duplicate rows") + defer func() { + logger.End(zap.ErrorLevel, err) + }() + + switch algorithm { + case config.DupeResAlgRecord, config.DupeResAlgNone: + logger.Warn("[resolve-dupe] skipping resolution due to selected algorithm. 
this table will become inconsistent!", zap.Stringer("algorithm", algorithm)) + return nil + case config.DupeResAlgRemove: + default: + panic(fmt.Sprintf("[resolve-dupe] unknown resolution algorithm %v", algorithm)) + } + + // TODO: reuse the *kv.SessionOptions from NewEncoder for picking the correct time zone. + decoder, err := kv.NewTableKVDecoder(tbl, tableName, &encode.SessionOptions{ + SQLMode: mysql.ModeStrictAllTables, + }, log.FromContext(ctx)) + if err != nil { + return err + } + + tableIDs := physicalTableIDs(tbl.Meta()) + keyInTable := func(key []byte) bool { + return slices.Contains(tableIDs, tablecodec.DecodeTableID(key)) + } + + errLimiter := rate.NewLimiter(1, 1) + pool := utils.NewWorkerPool(uint(local.dupeConcurrency), "resolve duplicate rows") + err = local.errorMgr.ResolveAllConflictKeys( + ctx, tableName, pool, + func(ctx context.Context, handleRows [][2][]byte) error { + for { + err := local.deleteDuplicateRows(ctx, logger, handleRows, decoder, keyInTable) + if err == nil { + return nil + } + if types.ErrBadNumber.Equal(err) { + logger.Warn("delete duplicate rows encounter error", log.ShortError(err)) + return common.ErrResolveDuplicateRows.Wrap(err).GenWithStackByArgs(tableName) + } + if log.IsContextCanceledError(err) { + return err + } + if !tikverror.IsErrWriteConflict(errors.Cause(err)) { + logger.Warn("delete duplicate rows encounter error", log.ShortError(err)) + } + if err = errLimiter.Wait(ctx); err != nil { + return err + } + } + }, + ) + return errors.Trace(err) +} + +func (local *DupeController) deleteDuplicateRows( + ctx context.Context, + logger *log.Task, + handleRows [][2][]byte, + decoder *kv.TableKVDecoder, + keyInTable func(key []byte) bool, +) (err error) { + // Starts a Delete transaction. + txn, err := local.tikvCli.Begin() + if err != nil { + return err + } + defer func() { + if err == nil { + err = txn.Commit(ctx) + } else { + if rollbackErr := txn.Rollback(); rollbackErr != nil { + logger.Warn("failed to rollback transaction", zap.Error(rollbackErr)) + } + } + }() + + deleteKey := func(key []byte) error { + logger.Debug("[resolve-dupe] will delete key", logutil.Key("key", key)) + return txn.Delete(key) + } + + // Collect all rows & index keys into the deletion transaction. + // (if the number of duplicates is small this should fit entirely in memory) + // (Txn's MemBuf's bufferSizeLimit is currently infinity) + for _, handleRow := range handleRows { + // Skip the row key if it's not in the table. + // This can happen if the table has been recreated or truncated, + // and the duplicate key is from the old table. 
+ if !keyInTable(handleRow[0]) { + continue + } + logger.Debug("[resolve-dupe] found row to resolve", + logutil.Key("handle", handleRow[0]), + logutil.Key("row", handleRow[1])) + + if err := deleteKey(handleRow[0]); err != nil { + return err + } + + handle, err := decoder.DecodeHandleFromRowKey(handleRow[0]) + if err != nil { + return err + } + + err = decoder.IterRawIndexKeys(handle, handleRow[1], deleteKey) + if err != nil { + return err + } + } + + logger.Debug("[resolve-dupe] number of KV pairs to be deleted", zap.Int("count", txn.Len())) + return nil +} diff --git a/br/pkg/lightning/backend/local/duplicate_test.go b/br/pkg/lightning/backend/local/duplicate_test.go index 723f975028e5a..f88ea248f956d 100644 --- a/br/pkg/lightning/backend/local/duplicate_test.go +++ b/br/pkg/lightning/backend/local/duplicate_test.go @@ -31,7 +31,6 @@ import ( "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/mock" "github.com/stretchr/testify/require" - "go.uber.org/atomic" ) func TestBuildDupTask(t *testing.T) { @@ -54,8 +53,8 @@ func TestBuildDupTask(t *testing.T) { {&encode.SessionOptions{IndexID: info.Indices[1].ID}, false}, } for _, tc := range testCases { - dupMgr, err := local.NewDuplicateManager(tbl, "t", nil, nil, keyspace.CodecV1, nil, - tc.sessOpt, 4, atomic.NewBool(false), log.FromContext(context.Background())) + dupMgr, err := local.NewDupeDetector(tbl, "t", nil, nil, keyspace.CodecV1, nil, + tc.sessOpt, 4, log.FromContext(context.Background())) require.NoError(t, err) tasks, err := local.BuildDuplicateTaskForTest(dupMgr) require.NoError(t, err) diff --git a/br/pkg/lightning/backend/local/engine.go b/br/pkg/lightning/backend/local/engine.go index f08b191937b8f..d555ee8ecd715 100644 --- a/br/pkg/lightning/backend/local/engine.go +++ b/br/pkg/lightning/backend/local/engine.go @@ -38,7 +38,6 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" "github.com/pingcap/tidb/br/pkg/lightning/common" - "github.com/pingcap/tidb/br/pkg/lightning/errormanager" "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/membuf" @@ -127,7 +126,7 @@ type Engine struct { config backend.LocalEngineConfig tableInfo *checkpoints.TidbTableInfo - dupDetectOpt dupDetectOpt + dupDetectOpt DupDetectOpt // total size of SST files waiting to be ingested pendingFileSize atomic.Int64 @@ -139,7 +138,6 @@ type Engine struct { keyAdapter KeyAdapter duplicateDetection bool duplicateDB *pebble.DB - errorMgr *errormanager.ErrorManager logger log.Logger } diff --git a/br/pkg/lightning/backend/local/iterator.go b/br/pkg/lightning/backend/local/iterator.go index d0394a158fb1c..feb3bbc8a7d94 100644 --- a/br/pkg/lightning/backend/local/iterator.go +++ b/br/pkg/lightning/backend/local/iterator.go @@ -80,11 +80,12 @@ type dupDetectIter struct { writeBatch *pebble.Batch writeBatchSize int64 logger log.Logger - option dupDetectOpt + option DupDetectOpt } -type dupDetectOpt struct { - reportErrOnDup bool +// DupDetectOpt is the option for duplicate detection. +type DupDetectOpt struct { + ReportErrOnDup bool } func (d *dupDetectIter) Seek(key []byte) bool { @@ -152,7 +153,7 @@ func (d *dupDetectIter) Next() bool { d.curVal = append(d.curVal[:0], d.iter.Value()...) 
return true } - if d.option.reportErrOnDup { + if d.option.ReportErrOnDup { dupKey := make([]byte, len(d.curKey)) dupVal := make([]byte, len(d.iter.Value())) copy(dupKey, d.curKey) @@ -200,7 +201,7 @@ func (d *dupDetectIter) OpType() sst.Pair_OP { var _ Iter = &dupDetectIter{} func newDupDetectIter(db *pebble.DB, keyAdapter KeyAdapter, - opts *pebble.IterOptions, dupDB *pebble.DB, logger log.Logger, dupOpt dupDetectOpt) *dupDetectIter { + opts *pebble.IterOptions, dupDB *pebble.DB, logger log.Logger, dupOpt DupDetectOpt) *dupDetectIter { newOpts := &pebble.IterOptions{TableFilter: opts.TableFilter} if len(opts.LowerBound) > 0 { newOpts.LowerBound = keyAdapter.Encode(nil, opts.LowerBound, MinRowID) diff --git a/br/pkg/lightning/backend/local/iterator_test.go b/br/pkg/lightning/backend/local/iterator_test.go index 75560ac012cd3..0a00cb5864cc0 100644 --- a/br/pkg/lightning/backend/local/iterator_test.go +++ b/br/pkg/lightning/backend/local/iterator_test.go @@ -121,7 +121,7 @@ func TestDupDetectIterator(t *testing.T) { dupDB, err := pebble.Open(filepath.Join(storeDir, "duplicates"), &pebble.Options{}) require.NoError(t, err) var iter Iter - iter = newDupDetectIter(db, keyAdapter, &pebble.IterOptions{}, dupDB, log.L(), dupDetectOpt{}) + iter = newDupDetectIter(db, keyAdapter, &pebble.IterOptions{}, dupDB, log.L(), DupDetectOpt{}) sort.Slice(pairs, func(i, j int) bool { key1 := keyAdapter.Encode(nil, pairs[i].Key, pairs[i].RowID) key2 := keyAdapter.Encode(nil, pairs[j].Key, pairs[j].RowID) @@ -216,7 +216,7 @@ func TestDupDetectIterSeek(t *testing.T) { dupDB, err := pebble.Open(filepath.Join(storeDir, "duplicates"), &pebble.Options{}) require.NoError(t, err) - iter := newDupDetectIter(db, keyAdapter, &pebble.IterOptions{}, dupDB, log.L(), dupDetectOpt{}) + iter := newDupDetectIter(db, keyAdapter, &pebble.IterOptions{}, dupDB, log.L(), DupDetectOpt{}) require.True(t, iter.Seek([]byte{1, 2, 3, 1})) require.Equal(t, pairs[1].Val, iter.Value()) diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 0b9d000d8ebf7..da870e7d4495b 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -17,7 +17,6 @@ package local import ( "bytes" "context" - "fmt" "io" "math" "net" @@ -50,27 +49,20 @@ import ( "github.com/pingcap/tidb/br/pkg/membuf" "github.com/pingcap/tidb/br/pkg/pdutil" "github.com/pingcap/tidb/br/pkg/restore/split" - "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/br/pkg/version" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/store/pdtypes" - "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" - "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/engine" "github.com/pingcap/tidb/util/mathutil" - tikverror "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/oracle" tikvclient "github.com/tikv/client-go/v2/tikv" pd "github.com/tikv/pd/client" "go.uber.org/atomic" "go.uber.org/zap" - "golang.org/x/exp/slices" "golang.org/x/sync/errgroup" - "golang.org/x/time/rate" "google.golang.org/grpc" "google.golang.org/grpc/backoff" "google.golang.org/grpc/codes" @@ -216,6 +208,7 @@ func (f *importClientFactoryImpl) getGrpcConn(ctx context.Context, storeID uint6 }) } +// Create creates a new import client for specific store. 
func (f *importClientFactoryImpl) Create(ctx context.Context, storeID uint64) (sst.ImportSSTClient, error) { conn, err := f.getGrpcConn(ctx, storeID) if err != nil { @@ -224,6 +217,7 @@ func (f *importClientFactoryImpl) Create(ctx context.Context, storeID uint64) (s return sst.NewImportSSTClient(conn), nil } +// Close closes the factory. func (f *importClientFactoryImpl) Close() { f.conns.Close() } @@ -232,6 +226,7 @@ type loggingConn struct { net.Conn } +// Write implements net.Conn.Write func (c loggingConn) Write(b []byte) (int, error) { log.L().Debug("import write", zap.Int("bytes", len(b))) return c.Conn.Write(b) @@ -329,6 +324,7 @@ type tblName struct { type tblNames []tblName +// String implements fmt.Stringer func (t tblNames) String() string { var b strings.Builder b.WriteByte('[') @@ -383,48 +379,89 @@ func checkTiFlashVersion(ctx context.Context, g glue.Glue, checkCtx *backend.Che return nil } -type local struct { +// BackendConfig is the config for local backend. +type BackendConfig struct { + // comma separated list of PD endpoints. + PDAddr string + LocalStoreDir string + // max number of cached grpc.ClientConn to a store. + // note: this is not the limit of actual connections, each grpc.ClientConn can have one or more of it. + MaxConnPerStore int + // compress type when write or ingest into tikv + ConnCompressType config.CompressionType + // number of import(write & ingest) workers + WorkerConcurrency int + KVWriteBatchSize int + CheckpointEnabled bool + // memory table size of pebble. since pebble can have multiple mem tables, the max memory used is + // MemTableSize * MemTableStopWritesThreshold, see pebble.Options for more details. + MemTableSize int + LocalWriterMemCacheSize int64 + // whether check TiKV capacity before write & ingest. + ShouldCheckTiKV bool + DupeDetectEnabled bool + DuplicateDetectOpt DupDetectOpt + // max write speed in bytes per second to each store(burst is allowed), 0 means no limit + StoreWriteBWLimit int + // When TiKV is in normal mode, ingesting too many SSTs will cause TiKV write stall. + // To avoid this, we should check write stall before ingesting SSTs. Note that, we + // must check both leader node and followers in client side, because followers will + // not check write stall as long as ingest command is accepted by leader. + ShouldCheckWriteStall bool + // soft limit on the number of open files that can be used by pebble DB. + // the minimum value is 128. + MaxOpenFiles int + KeyspaceName string +} + +// NewBackendConfig creates a new BackendConfig. 
+func NewBackendConfig(cfg *config.Config, maxOpenFiles int, keyspaceName string) BackendConfig { + return BackendConfig{ + PDAddr: cfg.TiDB.PdAddr, + LocalStoreDir: cfg.TikvImporter.SortedKVDir, + MaxConnPerStore: cfg.TikvImporter.RangeConcurrency, + ConnCompressType: cfg.TikvImporter.CompressKVPairs, + WorkerConcurrency: cfg.TikvImporter.RangeConcurrency * 2, + KVWriteBatchSize: cfg.TikvImporter.SendKVPairs, + CheckpointEnabled: cfg.Checkpoint.Enable, + MemTableSize: int(cfg.TikvImporter.EngineMemCacheSize), + LocalWriterMemCacheSize: int64(cfg.TikvImporter.LocalWriterMemCacheSize), + ShouldCheckTiKV: cfg.App.CheckRequirements, + DupeDetectEnabled: cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone, + DuplicateDetectOpt: DupDetectOpt{ReportErrOnDup: cfg.TikvImporter.DuplicateResolution == config.DupeResAlgErr}, + StoreWriteBWLimit: int(cfg.TikvImporter.StoreWriteBWLimit), + ShouldCheckWriteStall: cfg.Cron.SwitchMode.Duration == 0, + MaxOpenFiles: maxOpenFiles, + KeyspaceName: keyspaceName, + } +} + +func (c *BackendConfig) adjust() { + c.MaxOpenFiles = mathutil.Max(c.MaxOpenFiles, openFilesLowerThreshold) +} + +// Local is a local backend. +type Local struct { engines sync.Map // sync version of map[uuid.UUID]*Engine pdCtl *pdutil.PdController splitCli split.SplitClient tikvCli *tikvclient.KVStore tls *common.TLS - pdAddr string regionSizeGetter TableRegionSizeGetter tikvCodec tikvclient.Codec - localStoreDir string - - workerConcurrency int - kvWriteBatchSize int - checkpointEnabled bool + BackendConfig - dupeConcurrency int - maxOpenFiles int - - engineMemCacheSize int - localWriterMemCacheSize int64 - supportMultiIngest bool - - shouldCheckTiKV bool - duplicateDetection bool - duplicateDetectOpt dupDetectOpt + supportMultiIngest bool duplicateDB *pebble.DB keyAdapter KeyAdapter - errorMgr *errormanager.ErrorManager importClientFactory ImportClientFactory bufferPool *membuf.Pool metrics *metric.Metrics writeLimiter StoreWriteLimiter logger log.Logger - - // When TiKV is in normal mode, ingesting too many SSTs will cause TiKV write stall. - // To avoid this, we should check write stall before ingesting SSTs. Note that, we - // must check both leader node and followers in client side, because followers will - // not check write stall as long as ingest command is accepted by leader. 
- shouldCheckWriteStall bool } func openDuplicateDB(storeDir string) (*pebble.DB, error) { @@ -449,24 +486,19 @@ var ( func NewLocalBackend( ctx context.Context, tls *common.TLS, - cfg *config.Config, + config BackendConfig, regionSizeGetter TableRegionSizeGetter, - maxOpenFiles int, - errorMgr *errormanager.ErrorManager, - keyspaceName string, ) (backend.Backend, error) { - localFile := cfg.TikvImporter.SortedKVDir - rangeConcurrency := cfg.TikvImporter.RangeConcurrency - - pdCtl, err := pdutil.NewPdController(ctx, cfg.TiDB.PdAddr, tls.TLSConfig(), tls.ToPDSecurityOption()) + config.adjust() + pdCtl, err := pdutil.NewPdController(ctx, config.PDAddr, tls.TLSConfig(), tls.ToPDSecurityOption()) if err != nil { return backend.MakeBackend(nil), common.NormalizeOrWrapErr(common.ErrCreatePDClient, err) } splitCli := split.NewSplitClient(pdCtl.GetPDClient(), tls.TLSConfig(), false) shouldCreate := true - if cfg.Checkpoint.Enable { - if info, err := os.Stat(localFile); err != nil { + if config.CheckpointEnabled { + if info, err := os.Stat(config.LocalStoreDir); err != nil { if !os.IsNotExist(err) { return backend.MakeBackend(nil), err } @@ -476,31 +508,31 @@ func NewLocalBackend( } if shouldCreate { - err = os.Mkdir(localFile, 0o700) + err = os.Mkdir(config.LocalStoreDir, 0o700) if err != nil { - return backend.MakeBackend(nil), common.ErrInvalidSortedKVDir.Wrap(err).GenWithStackByArgs(localFile) + return backend.MakeBackend(nil), common.ErrInvalidSortedKVDir.Wrap(err).GenWithStackByArgs(config.LocalStoreDir) } } var duplicateDB *pebble.DB - if cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone { - duplicateDB, err = openDuplicateDB(localFile) + if config.DupeDetectEnabled { + duplicateDB, err = openDuplicateDB(config.LocalStoreDir) if err != nil { return backend.MakeBackend(nil), common.ErrOpenDuplicateDB.Wrap(err).GenWithStackByArgs() } } // The following copies tikv.NewTxnClient without creating yet another pdClient. 
- spkv, err := tikvclient.NewEtcdSafePointKV(strings.Split(cfg.TiDB.PdAddr, ","), tls.TLSConfig()) + spkv, err := tikvclient.NewEtcdSafePointKV(strings.Split(config.PDAddr, ","), tls.TLSConfig()) if err != nil { return backend.MakeBackend(nil), common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs() } var pdCliForTiKV *tikvclient.CodecPDClient - if keyspaceName == "" { + if config.KeyspaceName == "" { pdCliForTiKV = tikvclient.NewCodecPDClient(tikvclient.ModeTxn, pdCtl.GetPDClient()) } else { - pdCliForTiKV, err = tikvclient.NewCodecPDClientWithKeyspace(tikvclient.ModeTxn, pdCtl.GetPDClient(), keyspaceName) + pdCliForTiKV, err = tikvclient.NewCodecPDClientWithKeyspace(tikvclient.ModeTxn, pdCtl.GetPDClient(), config.KeyspaceName) if err != nil { return backend.MakeBackend(nil), common.ErrCreatePDClient.Wrap(err).GenWithStackByArgs() } @@ -512,15 +544,14 @@ func NewLocalBackend( if err != nil { return backend.MakeBackend(nil), common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs() } - importClientFactory := newImportClientFactoryImpl(splitCli, tls, rangeConcurrency, cfg.TikvImporter.CompressKVPairs) - duplicateDetection := cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone + importClientFactory := newImportClientFactoryImpl(splitCli, tls, config.MaxConnPerStore, config.ConnCompressType) keyAdapter := KeyAdapter(noopKeyAdapter{}) - if duplicateDetection { + if config.DupeDetectEnabled { keyAdapter = dupDetectKeyAdapter{} } var writeLimiter StoreWriteLimiter - if cfg.TikvImporter.StoreWriteBWLimit > 0 { - writeLimiter = newStoreWriteLimiter(int(cfg.TikvImporter.StoreWriteBWLimit)) + if config.StoreWriteBWLimit > 0 { + writeLimiter = newStoreWriteLimiter(config.StoreWriteBWLimit) } else { writeLimiter = noopStoreWriteLimiter{} } @@ -529,36 +560,23 @@ func NewLocalBackend( alloc.RefCnt = new(atomic.Int64) LastAlloc = alloc } - local := &local{ + local := &Local{ engines: sync.Map{}, pdCtl: pdCtl, splitCli: splitCli, tikvCli: tikvCli, tls: tls, - pdAddr: cfg.TiDB.PdAddr, regionSizeGetter: regionSizeGetter, tikvCodec: tikvCodec, - localStoreDir: localFile, - workerConcurrency: rangeConcurrency * 2, - dupeConcurrency: rangeConcurrency * 2, - kvWriteBatchSize: cfg.TikvImporter.SendKVPairs, - checkpointEnabled: cfg.Checkpoint.Enable, - maxOpenFiles: mathutil.Max(maxOpenFiles, openFilesLowerThreshold), - - engineMemCacheSize: int(cfg.TikvImporter.EngineMemCacheSize), - localWriterMemCacheSize: int64(cfg.TikvImporter.LocalWriterMemCacheSize), - duplicateDetection: duplicateDetection, - duplicateDetectOpt: dupDetectOpt{duplicateDetection && cfg.TikvImporter.DuplicateResolution == config.DupeResAlgErr}, - shouldCheckTiKV: cfg.App.CheckRequirements, - duplicateDB: duplicateDB, - keyAdapter: keyAdapter, - errorMgr: errorMgr, - importClientFactory: importClientFactory, - bufferPool: membuf.NewPool(membuf.WithAllocator(alloc)), - writeLimiter: writeLimiter, - logger: log.FromContext(ctx), - shouldCheckWriteStall: cfg.Cron.SwitchMode.Duration == 0, + BackendConfig: config, + + duplicateDB: duplicateDB, + keyAdapter: keyAdapter, + importClientFactory: importClientFactory, + bufferPool: membuf.NewPool(membuf.WithAllocator(alloc)), + writeLimiter: writeLimiter, + logger: log.FromContext(ctx), } if m, ok := metric.FromContext(ctx); ok { local.metrics = m @@ -570,7 +588,8 @@ func NewLocalBackend( return backend.MakeBackend(local), nil } -func (local *local) TotalMemoryConsume() int64 { +// TotalMemoryConsume returns the total memory usage of the local backend. 
+func (local *Local) TotalMemoryConsume() int64 { var memConsume int64 = 0 local.engines.Range(func(k, v interface{}) bool { e := v.(*Engine) @@ -582,7 +601,7 @@ func (local *local) TotalMemoryConsume() int64 { return memConsume + local.bufferPool.TotalSize() } -func (local *local) checkMultiIngestSupport(ctx context.Context) error { +func (local *Local) checkMultiIngestSupport(ctx context.Context) error { stores, err := local.pdCtl.GetPDClient().GetAllStores(ctx, pd.WithExcludeTombstone()) if err != nil { return errors.Trace(err) @@ -648,7 +667,7 @@ func (local *local) checkMultiIngestSupport(ctx context.Context) error { } // rlock read locks a local file and returns the Engine instance if it exists. -func (local *local) rLockEngine(engineID uuid.UUID) *Engine { +func (local *Local) rLockEngine(engineID uuid.UUID) *Engine { if e, ok := local.engines.Load(engineID); ok { engine := e.(*Engine) engine.rLock() @@ -658,7 +677,7 @@ func (local *local) rLockEngine(engineID uuid.UUID) *Engine { } // lock locks a local file and returns the Engine instance if it exists. -func (local *local) lockEngine(engineID uuid.UUID, state importMutexState) *Engine { +func (local *Local) lockEngine(engineID uuid.UUID, state importMutexState) *Engine { if e, ok := local.engines.Load(engineID); ok { engine := e.(*Engine) engine.lock(state) @@ -668,7 +687,7 @@ func (local *local) lockEngine(engineID uuid.UUID, state importMutexState) *Engi } // tryRLockAllEngines tries to read lock all engines, return all `Engine`s that are successfully locked. -func (local *local) tryRLockAllEngines() []*Engine { +func (local *Local) tryRLockAllEngines() []*Engine { var allEngines []*Engine local.engines.Range(func(k, v interface{}) bool { engine := v.(*Engine) @@ -687,7 +706,7 @@ func (local *local) tryRLockAllEngines() []*Engine { // lockAllEnginesUnless tries to lock all engines, unless those which are already locked in the // state given by ignoreStateMask. Returns the list of locked engines. -func (local *local) lockAllEnginesUnless(newState, ignoreStateMask importMutexState) []*Engine { +func (local *Local) lockAllEnginesUnless(newState, ignoreStateMask importMutexState) []*Engine { var allEngines []*Engine local.engines.Range(func(k, v interface{}) bool { engine := v.(*Engine) @@ -700,7 +719,7 @@ func (local *local) lockAllEnginesUnless(newState, ignoreStateMask importMutexSt } // Close the local backend. -func (local *local) Close() { +func (local *Local) Close() { allEngines := local.lockAllEnginesUnless(importMutexStateClose, 0) local.engines = sync.Map{} @@ -731,8 +750,8 @@ func (local *local) Close() { } // If checkpoint is disabled, or we don't detect any duplicate, then this duplicate // db dir will be useless, so we clean up this dir. - if allIsWell && (!local.checkpointEnabled || !hasDuplicates) { - if err := os.RemoveAll(filepath.Join(local.localStoreDir, duplicateDBName)); err != nil { + if allIsWell && (!local.CheckpointEnabled || !hasDuplicates) { + if err := os.RemoveAll(filepath.Join(local.LocalStoreDir, duplicateDBName)); err != nil { local.logger.Warn("remove duplicate db file failed", zap.Error(err)) } } @@ -741,8 +760,8 @@ func (local *local) Close() { // if checkpoint is disable or we finish load all data successfully, then files in this // dir will be useless, so we clean up this dir and all files in it. 
- if !local.checkpointEnabled || common.IsEmptyDir(local.localStoreDir) { - err := os.RemoveAll(local.localStoreDir) + if !local.CheckpointEnabled || common.IsEmptyDir(local.LocalStoreDir) { + err := os.RemoveAll(local.LocalStoreDir) if err != nil { local.logger.Warn("remove local db file failed", zap.Error(err)) } @@ -752,7 +771,7 @@ func (local *local) Close() { } // FlushEngine ensure the written data is saved successfully, to make sure no data lose after restart -func (local *local) FlushEngine(ctx context.Context, engineID uuid.UUID) error { +func (local *Local) FlushEngine(ctx context.Context, engineID uuid.UUID) error { engine := local.rLockEngine(engineID) // the engine cannot be deleted after while we've acquired the lock identified by UUID. @@ -766,7 +785,8 @@ func (local *local) FlushEngine(ctx context.Context, engineID uuid.UUID) error { return engine.flushEngineWithoutLock(ctx) } -func (local *local) FlushAllEngines(parentCtx context.Context) (err error) { +// FlushAllEngines flush all engines. +func (local *Local) FlushAllEngines(parentCtx context.Context) (err error) { allEngines := local.tryRLockAllEngines() defer func() { for _, engine := range allEngines { @@ -784,17 +804,19 @@ func (local *local) FlushAllEngines(parentCtx context.Context) (err error) { return eg.Wait() } -func (local *local) RetryImportDelay() time.Duration { +// RetryImportDelay returns the delay time before retrying to import a file. +func (local *Local) RetryImportDelay() time.Duration { return defaultRetryBackoffTime } -func (local *local) ShouldPostProcess() bool { +// ShouldPostProcess returns true if the backend should post process the data. +func (local *Local) ShouldPostProcess() bool { return true } -func (local *local) openEngineDB(engineUUID uuid.UUID, readOnly bool) (*pebble.DB, error) { +func (local *Local) openEngineDB(engineUUID uuid.UUID, readOnly bool) (*pebble.DB, error) { opt := &pebble.Options{ - MemTableSize: local.engineMemCacheSize, + MemTableSize: local.MemTableSize, // the default threshold value may cause write stall. MemTableStopWritesThreshold: 8, MaxConcurrentCompactions: 16, @@ -802,7 +824,7 @@ func (local *local) openEngineDB(engineUUID uuid.UUID, readOnly bool) (*pebble.D L0CompactionThreshold: math.MaxInt32, L0StopWritesThreshold: math.MaxInt32, LBaseMaxBytes: 16 * units.TiB, - MaxOpenFiles: local.maxOpenFiles, + MaxOpenFiles: local.MaxOpenFiles, DisableWAL: true, ReadOnly: readOnly, TablePropertyCollectors: []func() pebble.TablePropertyCollector{ @@ -816,19 +838,19 @@ func (local *local) openEngineDB(engineUUID uuid.UUID, readOnly bool) (*pebble.D }, } - dbPath := filepath.Join(local.localStoreDir, engineUUID.String()) + dbPath := filepath.Join(local.LocalStoreDir, engineUUID.String()) db, err := pebble.Open(dbPath, opt) return db, errors.Trace(err) } // OpenEngine must be called with holding mutex of Engine. 
-func (local *local) OpenEngine(ctx context.Context, cfg *backend.EngineConfig, engineUUID uuid.UUID) error { +func (local *Local) OpenEngine(ctx context.Context, cfg *backend.EngineConfig, engineUUID uuid.UUID) error { db, err := local.openEngineDB(engineUUID, false) if err != nil { return err } - sstDir := engineSSTDir(local.localStoreDir, engineUUID) + sstDir := engineSSTDir(local.LocalStoreDir, engineUUID) if err := os.RemoveAll(sstDir); err != nil { return errors.Trace(err) } @@ -847,10 +869,9 @@ func (local *local) OpenEngine(ctx context.Context, cfg *backend.EngineConfig, e cancel: cancel, config: cfg.Local, tableInfo: cfg.TableInfo, - duplicateDetection: local.duplicateDetection, - dupDetectOpt: local.duplicateDetectOpt, + duplicateDetection: local.DupeDetectEnabled, + dupDetectOpt: local.DuplicateDetectOpt, duplicateDB: local.duplicateDB, - errorMgr: local.errorMgr, keyAdapter: local.keyAdapter, logger: log.FromContext(ctx), }) @@ -868,7 +889,7 @@ func (local *local) OpenEngine(ctx context.Context, cfg *backend.EngineConfig, e return nil } -func (local *local) allocateTSIfNotExists(ctx context.Context, engine *Engine) error { +func (local *Local) allocateTSIfNotExists(ctx context.Context, engine *Engine) error { if engine.TS > 0 { return nil } @@ -882,7 +903,7 @@ func (local *local) allocateTSIfNotExists(ctx context.Context, engine *Engine) e } // CloseEngine closes backend engine by uuid. -func (local *local) CloseEngine(ctx context.Context, cfg *backend.EngineConfig, engineUUID uuid.UUID) error { +func (local *Local) CloseEngine(ctx context.Context, cfg *backend.EngineConfig, engineUUID uuid.UUID) error { // flush mem table to storage, to free memory, // ask others' advise, looks like unnecessary, but with this we can control memory precisely. engineI, ok := local.engines.Load(engineUUID) @@ -898,10 +919,9 @@ func (local *local) CloseEngine(ctx context.Context, cfg *backend.EngineConfig, sstMetasChan: make(chan metaOrFlush), tableInfo: cfg.TableInfo, keyAdapter: local.keyAdapter, - duplicateDetection: local.duplicateDetection, - dupDetectOpt: local.duplicateDetectOpt, + duplicateDetection: local.DupeDetectEnabled, + dupDetectOpt: local.DuplicateDetectOpt, duplicateDB: local.duplicateDB, - errorMgr: local.errorMgr, logger: log.FromContext(ctx), } engine.sstIngester = dbSSTIngester{e: engine} @@ -935,7 +955,7 @@ func (local *local) CloseEngine(ctx context.Context, cfg *backend.EngineConfig, return engine.ingestErr.Get() } -func (local *local) getImportClient(ctx context.Context, storeID uint64) (sst.ImportSSTClient, error) { +func (local *Local) getImportClient(ctx context.Context, storeID uint64) (sst.ImportSSTClient, error) { return local.importClientFactory.Create(ctx, storeID) } @@ -974,7 +994,7 @@ func splitRangeBySizeProps(fullRange Range, sizeProps *sizeProperties, sizeLimit return ranges } -func (local *local) readAndSplitIntoRange( +func (local *Local) readAndSplitIntoRange( ctx context.Context, engine *Engine, sizeLimit int64, @@ -1017,7 +1037,7 @@ func (local *local) readAndSplitIntoRange( // prepareAndGenerateUnfinishedJob will read the engine to get unfinished key range, // then split and scatter regions for these range and generate region jobs. -func (local *local) prepareAndGenerateUnfinishedJob( +func (local *Local) prepareAndGenerateUnfinishedJob( ctx context.Context, engineUUID uuid.UUID, lf *Engine, @@ -1064,7 +1084,7 @@ func (local *local) prepareAndGenerateUnfinishedJob( } // generateJobInRanges scans the region in ranges and generate region jobs. 
-func (local *local) generateJobInRanges( +func (local *Local) generateJobInRanges( ctx context.Context, engine *Engine, jobRanges []Range, @@ -1147,7 +1167,7 @@ func (local *local) generateJobInRanges( // stops. // this function must send the job back to jobOutCh after read it from jobInCh, // even if any error happens. -func (local *local) startWorker( +func (local *Local) startWorker( ctx context.Context, jobInCh, jobOutCh chan *regionJob, ) error { @@ -1182,7 +1202,7 @@ func (local *local) startWorker( } } -func (local *local) isRetryableImportTiKVError(err error) bool { +func (local *Local) isRetryableImportTiKVError(err error) bool { err = errors.Cause(err) // io.EOF is not retryable in normal case // but on TiKV restart, if we're writing to TiKV(through GRPC) @@ -1200,7 +1220,7 @@ func (local *local) isRetryableImportTiKVError(err error) bool { // If non-retryable error occurs, it will return the error. // If retryable error occurs, it will return nil and caller should check the stage // of the regionJob to determine what to do with it. -func (local *local) executeJob( +func (local *Local) executeJob( ctx context.Context, job *regionJob, ) error { @@ -1208,7 +1228,7 @@ func (local *local) executeJob( failpoint.Return( errors.Errorf("The available disk of TiKV (%s) only left %d, and capacity is %d", "", 0, 0)) }) - if local.shouldCheckTiKV { + if local.ShouldCheckTiKV { for _, peer := range job.region.Region.GetPeers() { var ( store *pdtypes.StoreInfo @@ -1238,7 +1258,7 @@ func (local *local) executeJob( err := job.writeToTiKV(ctx, local.tikvCodec.GetAPIVersion(), local.importClientFactory, - local.kvWriteBatchSize, + local.KVWriteBatchSize, local.bufferPool, local.writeLimiter) if err != nil { @@ -1256,7 +1276,7 @@ func (local *local) executeJob( local.importClientFactory, local.splitCli, local.supportMultiIngest, - local.shouldCheckWriteStall, + local.ShouldCheckWriteStall, ) if err != nil { if !local.isRetryableImportTiKVError(err) { @@ -1269,7 +1289,8 @@ func (local *local) executeJob( return nil } -func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error { +// ImportEngine imports an engine to TiKV. +func (local *Local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error { lf := local.lockEngine(engineUUID, importMutexStateImport) if lf == nil { // skip if engine not exist. See the comment of `CloseEngine` for more detail. 
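
The hunks above replace NewLocalBackend's loose parameters (cfg, maxOpenFiles, errorMgr, keyspaceName) with a single BackendConfig, which the Local struct now embeds. For the reuse path this PR series is preparing (e.g. the disttask/loaddata wrapper touched in this change), a caller can fill the struct directly instead of going through NewBackendConfig. A minimal sketch of that, assuming a hypothetical buildLocalBackend helper; the concrete field values are illustrative only and not taken from this patch:

package example

import (
	"context"

	"github.com/pingcap/tidb/br/pkg/lightning/backend"
	"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
	"github.com/pingcap/tidb/br/pkg/lightning/common"
)

// buildLocalBackend is a hypothetical helper showing direct construction of
// BackendConfig. Fields left out keep their zero values; NewLocalBackend's
// internal adjust() raises MaxOpenFiles to its lower threshold if it is set
// too small.
func buildLocalBackend(
	ctx context.Context,
	tls *common.TLS,
	regionSizeGetter local.TableRegionSizeGetter,
) (backend.Backend, error) {
	cfg := local.BackendConfig{
		PDAddr:                "127.0.0.1:2379", // illustrative value
		LocalStoreDir:         "/tmp/sorted-kv", // illustrative value
		MaxConnPerStore:       16,
		WorkerConcurrency:     32,
		KVWriteBatchSize:      32768,
		MemTableSize:          512 << 20, // 512 MiB
		MaxOpenFiles:          1024,
		ShouldCheckWriteStall: true,
	}
	return local.NewLocalBackend(ctx, tls, cfg, regionSizeGetter)
}
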
@@ -1383,7 +1404,7 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi } }() - for i := 0; i < local.workerConcurrency; i++ { + for i := 0; i < local.WorkerConcurrency; i++ { workGroup.Go(func() error { return local.startWorker(workerCtx, jobToWorkerCh, jobFromWorkerCh) }) @@ -1450,160 +1471,8 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi return workGroup.Wait() } -func (local *local) CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (hasDupe bool, err error) { - logger := log.FromContext(ctx).With(zap.String("table", tableName)).Begin(zap.InfoLevel, "[detect-dupe] collect local duplicate keys") - defer func() { - logger.End(zap.ErrorLevel, err) - }() - - atomicHasDupe := atomic.NewBool(false) - duplicateManager, err := NewDuplicateManager(tbl, tableName, local.splitCli, local.tikvCli, local.tikvCodec, - local.errorMgr, opts, local.dupeConcurrency, atomicHasDupe, log.FromContext(ctx)) - if err != nil { - return false, errors.Trace(err) - } - if err := duplicateManager.CollectDuplicateRowsFromDupDB(ctx, local.duplicateDB, local.keyAdapter); err != nil { - return false, errors.Trace(err) - } - return atomicHasDupe.Load(), nil -} - -func (local *local) CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (hasDupe bool, err error) { - logger := log.FromContext(ctx).With(zap.String("table", tableName)).Begin(zap.InfoLevel, "[detect-dupe] collect remote duplicate keys") - defer func() { - logger.End(zap.ErrorLevel, err) - }() - - atomicHasDupe := atomic.NewBool(false) - duplicateManager, err := NewDuplicateManager(tbl, tableName, local.splitCli, local.tikvCli, local.tikvCodec, - local.errorMgr, opts, local.dupeConcurrency, atomicHasDupe, log.FromContext(ctx)) - if err != nil { - return false, errors.Trace(err) - } - if err := duplicateManager.CollectDuplicateRowsFromTiKV(ctx, local.importClientFactory); err != nil { - return false, errors.Trace(err) - } - return atomicHasDupe.Load(), nil -} - -func (local *local) ResolveDuplicateRows(ctx context.Context, tbl table.Table, tableName string, algorithm config.DuplicateResolutionAlgorithm) (err error) { - logger := log.FromContext(ctx).With(zap.String("table", tableName)).Begin(zap.InfoLevel, "[resolve-dupe] resolve duplicate rows") - defer func() { - logger.End(zap.ErrorLevel, err) - }() - - switch algorithm { - case config.DupeResAlgRecord, config.DupeResAlgNone: - logger.Warn("[resolve-dupe] skipping resolution due to selected algorithm. this table will become inconsistent!", zap.Stringer("algorithm", algorithm)) - return nil - case config.DupeResAlgRemove: - default: - panic(fmt.Sprintf("[resolve-dupe] unknown resolution algorithm %v", algorithm)) - } - - // TODO: reuse the *kv.SessionOptions from NewEncoder for picking the correct time zone. 
- decoder, err := kv.NewTableKVDecoder(tbl, tableName, &encode.SessionOptions{ - SQLMode: mysql.ModeStrictAllTables, - }, log.FromContext(ctx)) - if err != nil { - return err - } - - tableIDs := physicalTableIDs(tbl.Meta()) - keyInTable := func(key []byte) bool { - return slices.Contains(tableIDs, tablecodec.DecodeTableID(key)) - } - - errLimiter := rate.NewLimiter(1, 1) - pool := utils.NewWorkerPool(uint(local.dupeConcurrency), "resolve duplicate rows") - err = local.errorMgr.ResolveAllConflictKeys( - ctx, tableName, pool, - func(ctx context.Context, handleRows [][2][]byte) error { - for { - err := local.deleteDuplicateRows(ctx, logger, handleRows, decoder, keyInTable) - if err == nil { - return nil - } - if types.ErrBadNumber.Equal(err) { - logger.Warn("delete duplicate rows encounter error", log.ShortError(err)) - return common.ErrResolveDuplicateRows.Wrap(err).GenWithStackByArgs(tableName) - } - if log.IsContextCanceledError(err) { - return err - } - if !tikverror.IsErrWriteConflict(errors.Cause(err)) { - logger.Warn("delete duplicate rows encounter error", log.ShortError(err)) - } - if err = errLimiter.Wait(ctx); err != nil { - return err - } - } - }, - ) - return errors.Trace(err) -} - -func (local *local) deleteDuplicateRows( - ctx context.Context, - logger *log.Task, - handleRows [][2][]byte, - decoder *kv.TableKVDecoder, - keyInTable func(key []byte) bool, -) (err error) { - // Starts a Delete transaction. - txn, err := local.tikvCli.Begin() - if err != nil { - return err - } - defer func() { - if err == nil { - err = txn.Commit(ctx) - } else { - if rollbackErr := txn.Rollback(); rollbackErr != nil { - logger.Warn("failed to rollback transaction", zap.Error(rollbackErr)) - } - } - }() - - deleteKey := func(key []byte) error { - logger.Debug("[resolve-dupe] will delete key", logutil.Key("key", key)) - return txn.Delete(key) - } - - // Collect all rows & index keys into the deletion transaction. - // (if the number of duplicates is small this should fit entirely in memory) - // (Txn's MemBuf's bufferSizeLimit is currently infinity) - for _, handleRow := range handleRows { - // Skip the row key if it's not in the table. - // This can happen if the table has been recreated or truncated, - // and the duplicate key is from the old table. - if !keyInTable(handleRow[0]) { - continue - } - logger.Debug("[resolve-dupe] found row to resolve", - logutil.Key("handle", handleRow[0]), - logutil.Key("row", handleRow[1])) - - if err := deleteKey(handleRow[0]); err != nil { - return err - } - - handle, err := decoder.DecodeHandleFromRowKey(handleRow[0]) - if err != nil { - return err - } - - err = decoder.IterRawIndexKeys(handle, handleRow[1], deleteKey) - if err != nil { - return err - } - } - - logger.Debug("[resolve-dupe] number of KV pairs to be deleted", zap.Int("count", txn.Len())) - return nil -} - -func (local *local) ResetEngine(ctx context.Context, engineUUID uuid.UUID) error { +// ResetEngine reset the engine and reclaim the space. 
+func (local *Local) ResetEngine(ctx context.Context, engineUUID uuid.UUID) error { // the only way to reset the engine + reclaim the space is to delete and reopen it 🤷 localEngine := local.lockEngine(engineUUID, importMutexStateClose) if localEngine == nil { @@ -1614,7 +1483,7 @@ func (local *local) ResetEngine(ctx context.Context, engineUUID uuid.UUID) error if err := localEngine.Close(); err != nil { return err } - if err := localEngine.Cleanup(local.localStoreDir); err != nil { + if err := localEngine.Cleanup(local.LocalStoreDir); err != nil { return err } db, err := local.openEngineDB(engineUUID, false) @@ -1636,7 +1505,8 @@ func (local *local) ResetEngine(ctx context.Context, engineUUID uuid.UUID) error return err } -func (local *local) CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error { +// CleanupEngine cleanup the engine and reclaim the space. +func (local *Local) CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error { localEngine := local.lockEngine(engineUUID, importMutexStateClose) // release this engine after import success if localEngine == nil { @@ -1654,7 +1524,7 @@ func (local *local) CleanupEngine(ctx context.Context, engineUUID uuid.UUID) err if err != nil { return err } - err = localEngine.Cleanup(local.localStoreDir) + err = localEngine.Cleanup(local.LocalStoreDir) if err != nil { return err } @@ -1663,17 +1533,32 @@ func (local *local) CleanupEngine(ctx context.Context, engineUUID uuid.UUID) err return nil } +// GetDupeController returns a new dupe controller. +func (local *Local) GetDupeController(dupeConcurrency int, errorMgr *errormanager.ErrorManager) *DupeController { + return &DupeController{ + splitCli: local.splitCli, + tikvCli: local.tikvCli, + tikvCodec: local.tikvCodec, + errorMgr: errorMgr, + dupeConcurrency: dupeConcurrency, + duplicateDB: local.duplicateDB, + keyAdapter: local.keyAdapter, + importClientFactory: local.importClientFactory, + } +} + func engineSSTDir(storeDir string, engineUUID uuid.UUID) string { return filepath.Join(storeDir, engineUUID.String()+".sst") } -func (local *local) LocalWriter(ctx context.Context, cfg *backend.LocalWriterConfig, engineUUID uuid.UUID) (backend.EngineWriter, error) { +// LocalWriter returns a new local writer. +func (local *Local) LocalWriter(ctx context.Context, cfg *backend.LocalWriterConfig, engineUUID uuid.UUID) (backend.EngineWriter, error) { e, ok := local.engines.Load(engineUUID) if !ok { return nil, errors.Errorf("could not find engine for %s", engineUUID.String()) } engine := e.(*Engine) - return openLocalWriter(cfg, engine, local.tikvCodec, local.localWriterMemCacheSize, local.bufferPool.NewBuffer()) + return openLocalWriter(cfg, engine, local.tikvCodec, local.LocalWriterMemCacheSize, local.bufferPool.NewBuffer()) } func openLocalWriter(cfg *backend.LocalWriterConfig, engine *Engine, tikvCodec tikvclient.Codec, cacheSize int64, kvBuffer *membuf.Buffer) (*Writer, error) { @@ -1721,7 +1606,8 @@ func nextKey(key []byte) []byte { return res } -func (local *local) EngineFileSizes() (res []backend.EngineFileSize) { +// EngineFileSizes returns the file size of each engine. 
+func (local *Local) EngineFileSizes() (res []backend.EngineFileSize) { local.engines.Range(func(k, v interface{}) bool { engine := v.(*Engine) res = append(res, engine.getEngineFileSize()) diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index ec5be334b9f5d..a70fd8d3f1822 100644 --- a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -1084,7 +1084,7 @@ func TestMultiIngest(t *testing.T) { pdCtl := &pdutil.PdController{} pdCtl.SetPDClient(&mockPdClient{stores: stores}) - local := &local{ + local := &Local{ pdCtl: pdCtl, importClientFactory: &mockImportClientFactory{ stores: allStores, @@ -1105,7 +1105,7 @@ func TestMultiIngest(t *testing.T) { } func TestLocalWriteAndIngestPairsFailFast(t *testing.T) { - bak := local{} + bak := Local{} require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace", "return(true)")) defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace")) @@ -1153,7 +1153,7 @@ func TestGetRegionSplitSizeKeys(t *testing.T) { } func TestLocalIsRetryableTiKVWriteError(t *testing.T) { - l := local{} + l := Local{} require.True(t, l.isRetryableImportTiKVError(io.EOF)) require.True(t, l.isRetryableImportTiKVError(errors.Trace(io.EOF))) } @@ -1175,7 +1175,7 @@ func TestCheckPeersBusy(t *testing.T) { }} createTimeStore12 := 0 - local := &local{ + local := &Local{ importClientFactory: &mockImportClientFactory{ stores: []*metapb.Store{ {Id: 11}, {Id: 12}, {Id: 13}, // region ["a", "b") @@ -1196,12 +1196,14 @@ func TestCheckPeersBusy(t *testing.T) { return importCli }, }, - logger: log.L(), - writeLimiter: noopStoreWriteLimiter{}, - bufferPool: membuf.NewPool(), - supportMultiIngest: true, - shouldCheckWriteStall: true, - tikvCodec: keyspace.CodecV1, + logger: log.L(), + writeLimiter: noopStoreWriteLimiter{}, + bufferPool: membuf.NewPool(), + supportMultiIngest: true, + BackendConfig: BackendConfig{ + ShouldCheckWriteStall: true, + }, + tikvCodec: keyspace.CodecV1, } db, tmpPath := makePebbleDB(t, nil) @@ -1350,7 +1352,7 @@ func TestSplitRangeAgain4BigRegion(t *testing.T) { getSizePropertiesFn = backup }) - local := &local{ + local := &Local{ splitCli: initTestSplitClient( [][]byte{{1}, {11}}, // we have one big region panicSplitRegionClient{}, // make sure no further split region diff --git a/br/pkg/lightning/backend/local/localhelper.go b/br/pkg/lightning/backend/local/localhelper.go index ac1990e175d54..3f8e66be06eeb 100644 --- a/br/pkg/lightning/backend/local/localhelper.go +++ b/br/pkg/lightning/backend/local/localhelper.go @@ -111,7 +111,7 @@ func (g *TableRegionSizeGetterImpl) GetTableRegionSize(ctx context.Context, tabl // SplitAndScatterRegionInBatches splits&scatter regions in batches. // Too many split&scatter requests may put a lot of pressure on TiKV and PD. -func (local *local) SplitAndScatterRegionInBatches( +func (local *Local) SplitAndScatterRegionInBatches( ctx context.Context, ranges []Range, tableInfo *checkpoints.TidbTableInfo, @@ -135,7 +135,7 @@ func (local *local) SplitAndScatterRegionInBatches( // we can simply call br function, but we need to change some function signature of br // When the ranges total size is small, we can skip the split to avoid generate empty regions. 
// TODO: remove this file and use br internal functions -func (local *local) SplitAndScatterRegionByRanges( +func (local *Local) SplitAndScatterRegionByRanges( ctx context.Context, ranges []Range, tableInfo *checkpoints.TidbTableInfo, @@ -392,7 +392,8 @@ func (local *local) SplitAndScatterRegionByRanges( return nil } -func (local *local) BatchSplitRegions( +// BatchSplitRegions splits the region into multiple regions by given split keys. +func (local *Local) BatchSplitRegions( ctx context.Context, region *split.RegionInfo, keys [][]byte, @@ -437,7 +438,7 @@ func (local *local) BatchSplitRegions( return region, newRegions, nil } -func (local *local) hasRegion(ctx context.Context, regionID uint64) (bool, error) { +func (local *Local) hasRegion(ctx context.Context, regionID uint64) (bool, error) { regionInfo, err := local.splitCli.GetRegionByID(ctx, regionID) if err != nil { return false, err @@ -445,7 +446,7 @@ func (local *local) hasRegion(ctx context.Context, regionID uint64) (bool, error return regionInfo != nil, nil } -func (local *local) waitForSplit(ctx context.Context, regionID uint64) { +func (local *Local) waitForSplit(ctx context.Context, regionID uint64) { for i := 0; i < split.SplitCheckMaxRetryTimes; i++ { ok, err := local.hasRegion(ctx, regionID) if err != nil { @@ -463,7 +464,7 @@ func (local *local) waitForSplit(ctx context.Context, regionID uint64) { } } -func (local *local) waitForScatterRegions(ctx context.Context, regions []*split.RegionInfo) (scatterCount int, _ error) { +func (local *Local) waitForScatterRegions(ctx context.Context, regions []*split.RegionInfo) (scatterCount int, _ error) { subCtx, cancel := context.WithTimeout(ctx, split.ScatterWaitUpperInterval) defer cancel() @@ -494,7 +495,7 @@ func (local *local) waitForScatterRegions(ctx context.Context, regions []*split. 
return scatterCount, nil } -func (local *local) checkRegionScatteredOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) { +func (local *Local) checkRegionScatteredOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) { resp, err := local.splitCli.GetOperator(ctx, regionInfo.Region.GetId()) if err != nil { return false, err diff --git a/br/pkg/lightning/backend/local/localhelper_test.go b/br/pkg/lightning/backend/local/localhelper_test.go index 95f495518711e..328ee3e6a930c 100644 --- a/br/pkg/lightning/backend/local/localhelper_test.go +++ b/br/pkg/lightning/backend/local/localhelper_test.go @@ -455,7 +455,7 @@ func doTestBatchSplitRegionByRanges(ctx context.Context, t *testing.T, hook clie keys := [][]byte{[]byte(""), []byte("aay"), []byte("bba"), []byte("bbh"), []byte("cca"), []byte("")} client := initTestSplitClient(keys, hook) - local := &local{ + local := &Local{ splitCli: client, regionSizeGetter: &TableRegionSizeGetterImpl{}, logger: log.L(), @@ -629,7 +629,7 @@ func TestSplitAndScatterRegionInBatches(t *testing.T) { keys := [][]byte{[]byte(""), []byte("a"), []byte("b"), []byte("")} client := initTestSplitClient(keys, nil) - local := &local{ + local := &Local{ splitCli: client, regionSizeGetter: &TableRegionSizeGetterImpl{}, logger: log.L(), @@ -716,7 +716,7 @@ func doTestBatchSplitByRangesWithClusteredIndex(t *testing.T, hook clientHook) { } keys = append(keys, tableEndKey, []byte("")) client := initTestSplitClient(keys, hook) - local := &local{ + local := &Local{ splitCli: client, regionSizeGetter: &TableRegionSizeGetterImpl{}, logger: log.L(), diff --git a/br/pkg/lightning/backend/noop/BUILD.bazel b/br/pkg/lightning/backend/noop/BUILD.bazel index 98f23ca1bf819..b7f46c4c1166d 100644 --- a/br/pkg/lightning/backend/noop/BUILD.bazel +++ b/br/pkg/lightning/backend/noop/BUILD.bazel @@ -8,10 +8,8 @@ go_library( deps = [ "//br/pkg/lightning/backend", "//br/pkg/lightning/backend/encode", - "//br/pkg/lightning/config", "//br/pkg/lightning/verification", "//parser/model", - "//table", "//types", "@com_github_google_uuid//:uuid", ], diff --git a/br/pkg/lightning/backend/noop/noop.go b/br/pkg/lightning/backend/noop/noop.go index 6bd75a329fccc..08f05759adfc8 100644 --- a/br/pkg/lightning/backend/noop/noop.go +++ b/br/pkg/lightning/backend/noop/noop.go @@ -21,10 +21,8 @@ import ( "github.com/google/uuid" "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/encode" - "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/br/pkg/lightning/verification" "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" ) @@ -148,21 +146,6 @@ func (b noopBackend) LocalWriter(context.Context, *backend.LocalWriterConfig, uu return Writer{}, nil } -// CollectLocalDuplicateRows collects duplicate rows from local backend. -func (b noopBackend) CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (bool, error) { - panic("Unsupported Operation") -} - -// CollectRemoteDuplicateRows collects duplicate rows from remote backend. -func (b noopBackend) CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (bool, error) { - panic("Unsupported Operation") -} - -// ResolveDuplicateRows resolves duplicate rows. 
-func (b noopBackend) ResolveDuplicateRows(ctx context.Context, tbl table.Table, tableName string, algorithm config.DuplicateResolutionAlgorithm) error { - return nil -} - // TotalMemoryConsume returns the total memory usage of the backend. func (b noopBackend) TotalMemoryConsume() int64 { return 0 diff --git a/br/pkg/lightning/backend/tidb/tidb.go b/br/pkg/lightning/backend/tidb/tidb.go index 85a6a7e92f414..ed70135461e9b 100644 --- a/br/pkg/lightning/backend/tidb/tidb.go +++ b/br/pkg/lightning/backend/tidb/tidb.go @@ -566,18 +566,6 @@ func (be *tidbBackend) CleanupEngine(context.Context, uuid.UUID) error { return nil } -func (be *tidbBackend) CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (bool, error) { - panic("Unsupported Operation") -} - -func (be *tidbBackend) CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (bool, error) { - panic("Unsupported Operation") -} - -func (be *tidbBackend) ResolveDuplicateRows(ctx context.Context, tbl table.Table, tableName string, algorithm config.DuplicateResolutionAlgorithm) error { - return nil -} - func (be *tidbBackend) ImportEngine(context.Context, uuid.UUID, int64, int64) error { return nil } diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go index f7d403cd5a21c..02cc92501d71d 100644 --- a/br/pkg/lightning/config/config.go +++ b/br/pkg/lightning/config/config.go @@ -45,6 +45,7 @@ import ( "go.uber.org/zap" ) +// constants for config items const ( // ImportMode defines mode of import for tikv. ImportMode = "import" @@ -69,11 +70,14 @@ const ( // ErrorOnDup indicates using INSERT INTO to insert data, which would violate PK or UNIQUE constraint ErrorOnDup = "error" + KVWriteBatchSize = 32768 + DefaultRangeConcurrency = 16 + defaultDistSQLScanConcurrency = 15 defaultBuildStatsConcurrency = 20 defaultIndexSerialScanConcurrency = 20 defaultChecksumTableConcurrency = 2 - defaultTableConcurrency = 6 + DefaultTableConcurrency = 6 defaultIndexConcurrency = 2 // defaultMetaSchemaName is the default database name used to store lightning metadata @@ -88,8 +92,9 @@ const ( // With cron.check-disk-quota = 1m, region-concurrency = 40, this should // contribute 2.3 GiB to the reserved size. // autoDiskQuotaLocalReservedSpeed uint64 = 1 * units.KiB - defaultEngineMemCacheSize = 512 * units.MiB - defaultLocalWriterMemCacheSize = 128 * units.MiB + + DefaultEngineMemCacheSize = 512 * units.MiB + DefaultLocalWriterMemCacheSize = 128 * units.MiB defaultCSVDataCharacterSet = "binary" defaultCSVDataInvalidCharReplace = utf8.RuneError @@ -913,7 +918,7 @@ func NewConfig() *Config { Backend: "", OnDuplicate: ReplaceOnDup, MaxKVPairs: 4096, - SendKVPairs: 32768, + SendKVPairs: KVWriteBatchSize, RegionSplitSize: 0, DiskQuota: ByteSize(math.MaxInt64), DuplicateResolution: DupeResAlgNone, @@ -1152,10 +1157,10 @@ func (cfg *Config) AdjustCommon() (bool, error) { // TODO calculate these from the machine's free memory. 
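Several defaults that used to be package-private are exported above (KVWriteBatchSize, DefaultRangeConcurrency, DefaultTableConcurrency, DefaultEngineMemCacheSize, DefaultLocalWriterMemCacheSize, plus DefaultBatchImportRatio added in const.go below) so that code outside lightning can reuse the same values without building a full *config.Config. A throwaway sketch of such a caller; the loadDataDefaults struct and its field names are hypothetical, only the config.* constants come from this patch.

package example

import "github.com/pingcap/tidb/br/pkg/lightning/config"

// loadDataDefaults is a hypothetical settings holder for an importer that
// does not construct a full lightning config.
type loadDataDefaults struct {
	kvWriteBatchSize  int
	rangeConcurrency  int
	engineConcurrency int
	engineMemCache    config.ByteSize
	writerMemCache    config.ByteSize
	batchImportRatio  float64
}

func newLoadDataDefaults() loadDataDefaults {
	return loadDataDefaults{
		kvWriteBatchSize:  config.KVWriteBatchSize,
		rangeConcurrency:  config.DefaultRangeConcurrency,
		engineConcurrency: config.DefaultTableConcurrency,
		engineMemCache:    config.DefaultEngineMemCacheSize,
		writerMemCache:    config.DefaultLocalWriterMemCacheSize,
		batchImportRatio:  config.DefaultBatchImportRatio,
	}
}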
if cfg.TikvImporter.EngineMemCacheSize == 0 { - cfg.TikvImporter.EngineMemCacheSize = defaultEngineMemCacheSize + cfg.TikvImporter.EngineMemCacheSize = DefaultEngineMemCacheSize } if cfg.TikvImporter.LocalWriterMemCacheSize == 0 { - cfg.TikvImporter.LocalWriterMemCacheSize = defaultLocalWriterMemCacheSize + cfg.TikvImporter.LocalWriterMemCacheSize = DefaultLocalWriterMemCacheSize } if cfg.TikvImporter.Backend == BackendLocal { @@ -1233,14 +1238,14 @@ func (cfg *Config) DefaultVarsForImporterAndLocalBackend() { cfg.App.IndexConcurrency = defaultIndexConcurrency } if cfg.App.TableConcurrency == 0 { - cfg.App.TableConcurrency = defaultTableConcurrency + cfg.App.TableConcurrency = DefaultTableConcurrency } if len(cfg.App.MetaSchemaName) == 0 { cfg.App.MetaSchemaName = defaultMetaSchemaName } if cfg.TikvImporter.RangeConcurrency == 0 { - cfg.TikvImporter.RangeConcurrency = 16 + cfg.TikvImporter.RangeConcurrency = DefaultRangeConcurrency } if cfg.TiDB.BuildStatsConcurrency == 0 { cfg.TiDB.BuildStatsConcurrency = defaultBuildStatsConcurrency @@ -1379,7 +1384,7 @@ func (cfg *Config) AdjustCheckPoint() { // AdjustMydumper adjusts the mydumper config. func (cfg *Config) AdjustMydumper() { if cfg.Mydumper.BatchImportRatio < 0.0 || cfg.Mydumper.BatchImportRatio >= 1.0 { - cfg.Mydumper.BatchImportRatio = 0.75 + cfg.Mydumper.BatchImportRatio = DefaultBatchImportRatio } if cfg.Mydumper.ReadBlockSize <= 0 { cfg.Mydumper.ReadBlockSize = ReadBlockSize diff --git a/br/pkg/lightning/config/const.go b/br/pkg/lightning/config/const.go index 8402182425029..f42568229b77f 100644 --- a/br/pkg/lightning/config/const.go +++ b/br/pkg/lightning/config/const.go @@ -24,6 +24,8 @@ import ( // some constants const ( + DefaultBatchImportRatio = 0.75 + ReadBlockSize ByteSize = 64 * units.KiB // SplitRegionSize See: https://github.com/tikv/tikv/blob/e030a0aae9622f3774df89c62f21b2171a72a69e/etc/config-template.toml#L360 // lower the max-key-count to avoid tikv trigger region auto split diff --git a/br/pkg/lightning/importer/import.go b/br/pkg/lightning/importer/import.go index 5d70733e9e20c..cb1f53305d7db 100644 --- a/br/pkg/lightning/importer/import.go +++ b/br/pkg/lightning/importer/import.go @@ -368,7 +368,8 @@ func NewImportControllerWithPauser( regionSizeGetter := &local.TableRegionSizeGetterImpl{ DB: db, } - backendObj, err = local.NewLocalBackend(ctx, tls, cfg, regionSizeGetter, maxOpenFiles, errorMgr, p.KeyspaceName) + backendConfig := local.NewBackendConfig(cfg, maxOpenFiles, p.KeyspaceName) + backendObj, err = local.NewLocalBackend(ctx, tls, backendConfig, regionSizeGetter) if err != nil { return nil, common.NormalizeOrWrapErr(common.ErrUnknown, err) } diff --git a/br/pkg/lightning/importer/table_import.go b/br/pkg/lightning/importer/table_import.go index 6790100a530fe..793816d709971 100644 --- a/br/pkg/lightning/importer/table_import.go +++ b/br/pkg/lightning/importer/table_import.go @@ -219,7 +219,8 @@ func (tr *TableImporter) Close() { func (tr *TableImporter) populateChunks(ctx context.Context, rc *Controller, cp *checkpoints.TableCheckpoint) error { task := tr.logger.Begin(zap.InfoLevel, "load engines and files") - tableRegions, err := mydump.MakeTableRegions(ctx, tr.tableMeta, len(tr.tableInfo.Core.Columns), rc.cfg, rc.ioWorkers, rc.store) + divideConfig := mydump.NewDataDivideConfig(rc.cfg, len(tr.tableInfo.Core.Columns), rc.ioWorkers, rc.store, tr.tableMeta) + tableRegions, err := mydump.MakeTableRegions(ctx, divideConfig) if err == nil { timestamp := time.Now().Unix() 
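NewLocalBackend no longer takes the lightning config, max-open-files limit, error manager and keyspace name as separate arguments; they are folded into a BackendConfig built by NewBackendConfig, as the importer/import.go hunk above shows. A small sketch of the new construction under that assumption; buildLocalBackend and the surrounding plumbing are illustrative, the local.* calls are the ones introduced here (ddl/ingest/backend_mgr.go later in the diff does the same).

package example

import (
	"context"
	"database/sql"

	"github.com/pingcap/tidb/br/pkg/lightning/backend"
	"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
	"github.com/pingcap/tidb/br/pkg/lightning/common"
	"github.com/pingcap/tidb/br/pkg/lightning/config"
)

// buildLocalBackend mirrors the call sites in importer/import.go and
// ddl/ingest/backend_mgr.go; parameter names here are placeholders.
func buildLocalBackend(ctx context.Context, tls *common.TLS, cfg *config.Config,
	db *sql.DB, maxOpenFiles int, keyspaceName string) (backend.Backend, error) {
	regionSizeGetter := &local.TableRegionSizeGetterImpl{DB: db}
	// Engine/writer cache sizes, local sort directory, keyspace and other
	// tunables now travel inside BackendConfig instead of separate arguments.
	backendConfig := local.NewBackendConfig(cfg, maxOpenFiles, keyspaceName)
	return local.NewLocalBackend(ctx, tls, backendConfig, regionSizeGetter)
}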
failpoint.Inject("PopulateChunkTimestamp", func(v failpoint.Value) { @@ -920,6 +921,11 @@ func (tr *TableImporter) postProcess( tr.logger.Info("local checksum", zap.Object("checksum", &localChecksum)) // 4.5. do duplicate detection. + // if we came here, it must be a local backend. + // todo: remove this cast after we refactor the backend interface. Physical mode is so different, we shouldn't + // try to abstract it with logical mode. + localBackend := rc.backend.Inner().(*local.Local) + dupeController := localBackend.GetDupeController(rc.cfg.TikvImporter.RangeConcurrency*2, rc.errorMgr) hasDupe := false if rc.cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone { opts := &encode.SessionOptions{ @@ -927,7 +933,7 @@ func (tr *TableImporter) postProcess( SysVars: rc.sysVars, } var err error - hasLocalDupe, err := rc.backend.CollectLocalDuplicateRows(ctx, tr.encTable, tr.tableName, opts) + hasLocalDupe, err := dupeController.CollectLocalDuplicateRows(ctx, tr.encTable, tr.tableName, opts) if err != nil { tr.logger.Error("collect local duplicate keys failed", log.ShortError(err)) return false, err @@ -953,7 +959,7 @@ func (tr *TableImporter) postProcess( SQLMode: mysql.ModeStrictAllTables, SysVars: rc.sysVars, } - hasRemoteDupe, e := rc.backend.CollectRemoteDuplicateRows(ctx, tr.encTable, tr.tableName, opts) + hasRemoteDupe, e := dupeController.CollectRemoteDuplicateRows(ctx, tr.encTable, tr.tableName, opts) if e != nil { tr.logger.Error("collect remote duplicate keys failed", log.ShortError(e)) return false, e @@ -961,7 +967,7 @@ func (tr *TableImporter) postProcess( hasDupe = hasDupe || hasRemoteDupe if hasDupe { - if err = rc.backend.ResolveDuplicateRows(ctx, tr.encTable, tr.tableName, rc.cfg.TikvImporter.DuplicateResolution); err != nil { + if err = dupeController.ResolveDuplicateRows(ctx, tr.encTable, tr.tableName, rc.cfg.TikvImporter.DuplicateResolution); err != nil { tr.logger.Error("resolve remote duplicate keys failed", log.ShortError(err)) return false, err } diff --git a/br/pkg/lightning/mydump/csv_parser_test.go b/br/pkg/lightning/mydump/csv_parser_test.go index 3e002eeef42ab..6c3d9041c71fe 100644 --- a/br/pkg/lightning/mydump/csv_parser_test.go +++ b/br/pkg/lightning/mydump/csv_parser_test.go @@ -23,7 +23,7 @@ import ( "go.uber.org/zap" ) -var ioWorkers = worker.NewPool(context.Background(), 5, "test_csv") +var ioWorkersForCSV = worker.NewPool(context.Background(), 5, "test_csv") func assertPosEqual(t *testing.T, parser mydump.Parser, expectPos, expectRowID int64) { pos, rowID := parser.Pos() @@ -42,7 +42,7 @@ func runTestCasesCSV(t *testing.T, cfg *config.MydumperRuntime, blockBufSize int for _, tc := range cases { charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace) assert.NoError(t, err) - parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(tc.input), blockBufSize, ioWorkers, false, charsetConvertor) + parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(tc.input), blockBufSize, ioWorkersForCSV, false, charsetConvertor) assert.NoError(t, err) for i, row := range tc.expected { comment := fmt.Sprintf("input = %q, row = %d", tc.input, i+1) @@ -59,7 +59,7 @@ func runFailingTestCasesCSV(t *testing.T, cfg *config.MydumperRuntime, blockBufS for _, tc := range cases { charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace) assert.NoError(t, err) - parser, err := mydump.NewCSVParser(context.Background(), 
&cfg.CSV, mydump.NewStringReader(tc), blockBufSize, ioWorkers, false, charsetConvertor) + parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(tc), blockBufSize, ioWorkersForCSV, false, charsetConvertor) require.NoError(t, err) e := parser.ReadRow() assert.Regexpf(t, "syntax error.*", e.Error(), "input = %q / %s", tc, errors.ErrorStack(e)) @@ -138,7 +138,7 @@ func TestTPCH(t *testing.T) { TrimLastSep: true, } - parser, err := mydump.NewCSVParser(context.Background(), &cfg, reader, int64(config.ReadBlockSize), ioWorkers, false, nil) + parser, err := mydump.NewCSVParser(context.Background(), &cfg, reader, int64(config.ReadBlockSize), ioWorkersForCSV, false, nil) require.NoError(t, err) require.Nil(t, parser.ReadRow()) require.Equal(t, mydump.Row{ @@ -216,7 +216,7 @@ func TestTPCHMultiBytes(t *testing.T) { } reader := mydump.NewStringReader(inputStr) - parser, err := mydump.NewCSVParser(context.Background(), &cfg, reader, int64(config.ReadBlockSize), ioWorkers, false, nil) + parser, err := mydump.NewCSVParser(context.Background(), &cfg, reader, int64(config.ReadBlockSize), ioWorkersForCSV, false, nil) require.NoError(t, err) for i, expectedParserPos := range allExpectedParserPos { @@ -238,7 +238,7 @@ func TestRFC4180(t *testing.T) { // example 1, trailing new lines - parser, err := mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader("aaa,bbb,ccc\nzzz,yyy,xxx\n"), int64(config.ReadBlockSize), ioWorkers, false, nil) + parser, err := mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader("aaa,bbb,ccc\nzzz,yyy,xxx\n"), int64(config.ReadBlockSize), ioWorkersForCSV, false, nil) require.NoError(t, err) require.Nil(t, parser.ReadRow()) @@ -269,7 +269,7 @@ func TestRFC4180(t *testing.T) { // example 2, no trailing new lines - parser, err = mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader("aaa,bbb,ccc\nzzz,yyy,xxx"), int64(config.ReadBlockSize), ioWorkers, false, nil) + parser, err = mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader("aaa,bbb,ccc\nzzz,yyy,xxx"), int64(config.ReadBlockSize), ioWorkersForCSV, false, nil) require.NoError(t, err) require.Nil(t, parser.ReadRow()) @@ -300,7 +300,7 @@ func TestRFC4180(t *testing.T) { // example 5, quoted fields - parser, err = mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(`"aaa","bbb","ccc"`+"\nzzz,yyy,xxx"), int64(config.ReadBlockSize), ioWorkers, false, nil) + parser, err = mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(`"aaa","bbb","ccc"`+"\nzzz,yyy,xxx"), int64(config.ReadBlockSize), ioWorkersForCSV, false, nil) require.NoError(t, err) require.Nil(t, parser.ReadRow()) @@ -333,7 +333,7 @@ func TestRFC4180(t *testing.T) { parser, err = mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(`"aaa","b bb","ccc" -zzz,yyy,xxx`), int64(config.ReadBlockSize), ioWorkers, false, nil) +zzz,yyy,xxx`), int64(config.ReadBlockSize), ioWorkersForCSV, false, nil) require.NoError(t, err) require.Nil(t, parser.ReadRow()) @@ -364,7 +364,7 @@ zzz,yyy,xxx`), int64(config.ReadBlockSize), ioWorkers, false, nil) // example 7, quote escaping - parser, err = mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(`"aaa","b""bb","ccc"`), int64(config.ReadBlockSize), ioWorkers, false, nil) + parser, err = mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(`"aaa","b""bb","ccc"`), int64(config.ReadBlockSize), ioWorkersForCSV, false, nil) require.NoError(t, err) require.Nil(t, 
parser.ReadRow()) @@ -394,7 +394,7 @@ func TestMySQL(t *testing.T) { parser, err := mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(`"\"","\\","\?" "\ -",\N,\\N`), int64(config.ReadBlockSize), ioWorkers, false, nil) +",\N,\\N`), int64(config.ReadBlockSize), ioWorkersForCSV, false, nil) require.NoError(t, err) require.NoError(t, parser.ReadRow()) @@ -426,7 +426,7 @@ func TestMySQL(t *testing.T) { parser, err = mydump.NewCSVParser( context.Background(), &cfg, mydump.NewStringReader(`"\0\b\n\r\t\Z\\\ \c\'\""`), - int64(config.ReadBlockSize), ioWorkers, false, nil) + int64(config.ReadBlockSize), ioWorkersForCSV, false, nil) require.NoError(t, err) require.NoError(t, parser.ReadRow()) @@ -443,7 +443,7 @@ func TestMySQL(t *testing.T) { context.Background(), &cfg, mydump.NewStringReader(`3,"a string containing a " quote",102.20 `), - int64(config.ReadBlockSize), ioWorkers, false, nil) + int64(config.ReadBlockSize), ioWorkersForCSV, false, nil) require.NoError(t, err) require.NoError(t, parser.ReadRow()) @@ -460,7 +460,7 @@ func TestMySQL(t *testing.T) { parser, err = mydump.NewCSVParser( context.Background(), &cfg, mydump.NewStringReader(`3,"a string containing a " quote","102.20"`), - int64(config.ReadBlockSize), ioWorkers, false, nil) + int64(config.ReadBlockSize), ioWorkersForCSV, false, nil) require.NoError(t, err) require.NoError(t, parser.ReadRow()) @@ -477,7 +477,7 @@ func TestMySQL(t *testing.T) { parser, err = mydump.NewCSVParser( context.Background(), &cfg, mydump.NewStringReader(`"a"b",c"d"e`), - int64(config.ReadBlockSize), ioWorkers, false, nil) + int64(config.ReadBlockSize), ioWorkersForCSV, false, nil) require.NoError(t, err) require.NoError(t, parser.ReadRow()) @@ -502,7 +502,7 @@ func TestCustomEscapeChar(t *testing.T) { parser, err := mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(`"!"","!!","!\" "! 
-",!N,!!N`), int64(config.ReadBlockSize), ioWorkers, false, nil) +",!N,!!N`), int64(config.ReadBlockSize), ioWorkersForCSV, false, nil) require.NoError(t, err) require.Nil(t, parser.ReadRow()) @@ -542,7 +542,7 @@ func TestCustomEscapeChar(t *testing.T) { parser, err = mydump.NewCSVParser( context.Background(), &cfg, mydump.NewStringReader(`"{""itemRangeType"":0,""itemContainType"":0,""shopRangeType"":1,""shopJson"":""[{\""id\"":\""A1234\"",\""shopName\"":\""AAAAAA\""}]""}"`), - int64(config.ReadBlockSize), ioWorkers, false, nil) + int64(config.ReadBlockSize), ioWorkersForCSV, false, nil) require.NoError(t, err) require.Nil(t, parser.ReadRow()) @@ -596,7 +596,7 @@ func TestTSV(t *testing.T) { parser, err := mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(`a b c d e f 0 foo 0000-00-00 0 foo 0000-00-00 -0 abc def ghi bar 1999-12-31`), int64(config.ReadBlockSize), ioWorkers, true, nil) +0 abc def ghi bar 1999-12-31`), int64(config.ReadBlockSize), ioWorkersForCSV, true, nil) require.NoError(t, err) require.Nil(t, parser.ReadRow()) @@ -655,7 +655,7 @@ func TestCsvWithWhiteSpaceLine(t *testing.T) { Null: []string{""}, } data := " \r\n\r\n0,,abc\r\n \r\n123,1999-12-31,test\r\n" - parser, err := mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(data), int64(config.ReadBlockSize), ioWorkers, false, nil) + parser, err := mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(data), int64(config.ReadBlockSize), ioWorkersForCSV, false, nil) require.NoError(t, err) require.Nil(t, parser.ReadRow()) require.Equal(t, mydump.Row{ @@ -684,7 +684,7 @@ func TestCsvWithWhiteSpaceLine(t *testing.T) { cfg.Header = true cfg.HeaderSchemaMatch = true data = " \r\na,b,c\r\n0,,abc\r\n" - parser, err = mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(data), int64(config.ReadBlockSize), ioWorkers, true, nil) + parser, err = mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(data), int64(config.ReadBlockSize), ioWorkersForCSV, true, nil) require.NoError(t, err) require.Nil(t, parser.ReadRow()) require.Equal(t, []string{"a", "b", "c"}, parser.Columns()) @@ -708,7 +708,7 @@ func TestEmpty(t *testing.T) { Delimiter: `"`, } - parser, err := mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(""), int64(config.ReadBlockSize), ioWorkers, false, nil) + parser, err := mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(""), int64(config.ReadBlockSize), ioWorkersForCSV, false, nil) require.NoError(t, err) require.ErrorIs(t, errors.Cause(parser.ReadRow()), io.EOF) @@ -717,11 +717,11 @@ func TestEmpty(t *testing.T) { cfg.Header = true cfg.HeaderSchemaMatch = true - parser, err = mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(""), int64(config.ReadBlockSize), ioWorkers, true, nil) + parser, err = mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(""), int64(config.ReadBlockSize), ioWorkersForCSV, true, nil) require.NoError(t, err) require.ErrorIs(t, errors.Cause(parser.ReadRow()), io.EOF) - parser, err = mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader("h\n"), int64(config.ReadBlockSize), ioWorkers, true, nil) + parser, err = mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader("h\n"), int64(config.ReadBlockSize), ioWorkersForCSV, true, nil) require.NoError(t, err) require.ErrorIs(t, errors.Cause(parser.ReadRow()), io.EOF) } @@ -731,7 +731,7 @@ func TestCRLF(t *testing.T) { Separator: ",", Delimiter: `"`, } - 
parser, err := mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader("a\rb\r\nc\n\n\n\nd"), int64(config.ReadBlockSize), ioWorkers, false, nil) + parser, err := mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader("a\rb\r\nc\n\n\n\nd"), int64(config.ReadBlockSize), ioWorkersForCSV, false, nil) require.NoError(t, err) require.Nil(t, parser.ReadRow()) @@ -771,7 +771,7 @@ func TestQuotedSeparator(t *testing.T) { Delimiter: `"`, } - parser, err := mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(`",",','`), int64(config.ReadBlockSize), ioWorkers, false, nil) + parser, err := mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(`",",','`), int64(config.ReadBlockSize), ioWorkersForCSV, false, nil) require.NoError(t, err) require.Nil(t, parser.ReadRow()) require.Equal(t, mydump.Row{ @@ -833,7 +833,7 @@ func TestTooLargeRow(t *testing.T) { } charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace) require.NoError(t, err) - parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(testCase.String()), int64(config.ReadBlockSize), ioWorkers, false, charsetConvertor) + parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(testCase.String()), int64(config.ReadBlockSize), ioWorkersForCSV, false, charsetConvertor) require.NoError(t, err) e := parser.ReadRow() require.Error(t, e) @@ -1002,7 +1002,7 @@ func TestReadError(t *testing.T) { Delimiter: `"`, } - parser, err := mydump.NewCSVParser(context.Background(), &cfg, &errorReader{}, int64(config.ReadBlockSize), ioWorkers, false, nil) + parser, err := mydump.NewCSVParser(context.Background(), &cfg, &errorReader{}, int64(config.ReadBlockSize), ioWorkersForCSV, false, nil) require.NoError(t, err) require.Regexp(t, "fake read error", parser.ReadRow().Error()) } @@ -1017,7 +1017,7 @@ func TestSyntaxErrorLog(t *testing.T) { } tc := mydump.NewStringReader("x'" + strings.Repeat("y", 50000)) - parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, tc, 50000, ioWorkers, false, nil) + parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, tc, 50000, ioWorkersForCSV, false, nil) require.NoError(t, err) logger, buffer := log.MakeTestLogger() parser.SetLogger(logger) @@ -1044,7 +1044,7 @@ func TestTrimLastSep(t *testing.T) { &cfg.CSV, mydump.NewStringReader("123,456,789,\r\na,b,,\r\n,,,\r\n\"a\",\"\",\"\",\r\n"), int64(config.ReadBlockSize), - ioWorkers, + ioWorkersForCSV, false, nil, ) @@ -1102,7 +1102,7 @@ func TestReadUntilTerminator(t *testing.T) { &cfg.CSV, mydump.NewStringReader("xxx1#2#3#4#\n56#78"), int64(config.ReadBlockSize), - ioWorkers, + ioWorkersForCSV, false, nil, ) @@ -1345,7 +1345,7 @@ yyy",5,xx"xxxx,8 StartingBy: "x\nxx", }, } - _, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, nil, 1, ioWorkers, false, nil) + _, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, nil, 1, ioWorkersForCSV, false, nil) require.ErrorContains(t, err, "STARTING BY 'x\nxx' cannot contain LINES TERMINATED BY '\n'") } @@ -1421,7 +1421,7 @@ func BenchmarkReadRowUsingMydumpCSVParser(b *testing.B) { }() cfg := config.CSVConfig{Separator: ","} - parser, err := mydump.NewCSVParser(context.Background(), &cfg, file, 65536, ioWorkers, false, nil) + parser, err := mydump.NewCSVParser(context.Background(), &cfg, file, 65536, ioWorkersForCSV, false, nil) require.NoError(b, err) parser.SetLogger(log.Logger{Logger: zap.NewNop()}) @@ -1528,7 +1528,7 @@ func 
TestHeaderSchemaMatch(t *testing.T) { cfg.CSV.HeaderSchemaMatch = tc.HeaderSchemaMatch charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace) assert.NoError(t, err) - parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(inputData), int64(config.ReadBlockSize), ioWorkers, tc.Header, charsetConvertor) + parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(inputData), int64(config.ReadBlockSize), ioWorkersForCSV, tc.Header, charsetConvertor) assert.NoError(t, err) for i, row := range tc.ExpectedData { comment := fmt.Sprintf("row = %d, header = %v, header-schema-match = %v", i+1, tc.Header, tc.HeaderSchemaMatch) diff --git a/br/pkg/lightning/mydump/parser.go b/br/pkg/lightning/mydump/parser.go index 71045c510a2b1..a84eee0082d3d 100644 --- a/br/pkg/lightning/mydump/parser.go +++ b/br/pkg/lightning/mydump/parser.go @@ -94,7 +94,9 @@ type ChunkParser struct { // Chunk represents a portion of the data file. type Chunk struct { - Offset int64 + Offset int64 + // for parquet file, it's the total row count + // see makeParquetFileRegion EndOffset int64 RealOffset int64 // we estimate row-id range of the chunk using file-size divided by some factor(depends on column count) diff --git a/br/pkg/lightning/mydump/parser_test.go b/br/pkg/lightning/mydump/parser_test.go index 8ea22657fe0be..bd471ccabce15 100644 --- a/br/pkg/lightning/mydump/parser_test.go +++ b/br/pkg/lightning/mydump/parser_test.go @@ -31,7 +31,7 @@ import ( func runTestCases(t *testing.T, mode mysql.SQLMode, blockBufSize int64, cases []testCase) { for _, tc := range cases { - parser := mydump.NewChunkParser(context.Background(), mode, mydump.NewStringReader(tc.input), blockBufSize, ioWorkers) + parser := mydump.NewChunkParser(context.Background(), mode, mydump.NewStringReader(tc.input), blockBufSize, ioWorkersForCSV) for i, row := range tc.expected { e := parser.ReadRow() comment := fmt.Sprintf("input = %q, row = %d, err = %s", tc.input, i+1, errors.ErrorStack(e)) @@ -45,7 +45,7 @@ func runTestCases(t *testing.T, mode mysql.SQLMode, blockBufSize int64, cases [] func runFailingTestCases(t *testing.T, mode mysql.SQLMode, blockBufSize int64, cases []string) { for _, tc := range cases { - parser := mydump.NewChunkParser(context.Background(), mode, mydump.NewStringReader(tc), blockBufSize, ioWorkers) + parser := mydump.NewChunkParser(context.Background(), mode, mydump.NewStringReader(tc), blockBufSize, ioWorkersForCSV) assert.Regexpf(t, "syntax error.*", parser.ReadRow().Error(), "input = %q", tc) } } @@ -58,7 +58,7 @@ func TestReadRow(t *testing.T) { "insert another_table values (10,11e1,12, '(13)', '(', 14, ')');", ) - parser := mydump.NewChunkParser(context.Background(), mysql.ModeNone, reader, int64(config.ReadBlockSize), ioWorkers) + parser := mydump.NewChunkParser(context.Background(), mysql.ModeNone, reader, int64(config.ReadBlockSize), ioWorkersForCSV) require.NoError(t, parser.ReadRow()) require.Equal(t, mydump.Row{ @@ -134,7 +134,7 @@ func TestReadChunks(t *testing.T) { INSERT foo VALUES (29,30,31,32),(33,34,35,36); `) - parser := mydump.NewChunkParser(context.Background(), mysql.ModeNone, reader, int64(config.ReadBlockSize), ioWorkers) + parser := mydump.NewChunkParser(context.Background(), mysql.ModeNone, reader, int64(config.ReadBlockSize), ioWorkersForCSV) chunks, err := mydump.ReadChunks(parser, 32) require.NoError(t, err) @@ -180,7 +180,7 @@ func TestNestedRow(t *testing.T) { ("789",CONVERT("[]" USING 
UTF8MB4)); `) - parser := mydump.NewChunkParser(context.Background(), mysql.ModeNone, reader, int64(config.ReadBlockSize), ioWorkers) + parser := mydump.NewChunkParser(context.Background(), mysql.ModeNone, reader, int64(config.ReadBlockSize), ioWorkersForCSV) chunks, err := mydump.ReadChunks(parser, 96) require.NoError(t, err) @@ -413,7 +413,7 @@ func TestPseudoKeywords(t *testing.T) { ) VALUES (); `) - parser := mydump.NewChunkParser(context.Background(), mysql.ModeNone, reader, int64(config.ReadBlockSize), ioWorkers) + parser := mydump.NewChunkParser(context.Background(), mysql.ModeNone, reader, int64(config.ReadBlockSize), ioWorkersForCSV) require.NoError(t, parser.ReadRow()) require.Equal(t, []string{ "c", "c", diff --git a/br/pkg/lightning/mydump/region.go b/br/pkg/lightning/mydump/region.go index 779a862959e3e..10d0f03689489 100644 --- a/br/pkg/lightning/mydump/region.go +++ b/br/pkg/lightning/mydump/region.go @@ -79,7 +79,7 @@ func AllocateEngineIDs( dataFileSizes []float64, batchSize float64, batchImportRatio float64, - tableConcurrency float64, + engineConcurrency float64, ) { totalDataFileSize := 0.0 for _, dataFileSize := range dataFileSizes { @@ -103,7 +103,7 @@ func AllocateEngineIDs( // Total/B1 = 1/(1-R) * (N - 1/beta(N, R)) // ≲ N/(1-R) // - // We use a simple brute force search since the search space is extremely small. + // We use a simple brute force search since the search space is small. ratio := totalDataFileSize * (1 - batchImportRatio) / batchSize n := math.Ceil(ratio) logGammaNPlusR, _ := math.Lgamma(n + batchImportRatio) @@ -111,8 +111,8 @@ func AllocateEngineIDs( logGammaR, _ := math.Lgamma(batchImportRatio) invBetaNR := math.Exp(logGammaNPlusR - logGammaN - logGammaR) // 1/B(N, R) = Γ(N+R)/Γ(N)Γ(R) for { - if n <= 0 || n > tableConcurrency { - n = tableConcurrency + if n <= 0 || n > engineConcurrency { + n = engineConcurrency break } realRatio := n - invBetaNR @@ -145,15 +145,67 @@ func AllocateEngineIDs( } } +// DataDivideConfig config used to divide data files into chunks/engines(regions in this context). +type DataDivideConfig struct { + ColumnCnt int + // limit of engine size, we have a complex algorithm to calculate the best engine size, see AllocateEngineIDs. + EngineDataSize int64 + // max chunk size(inside this file we named it region which collides with TiKV region) + MaxChunkSize int64 + // number of concurrent workers to dive data files + Concurrency int + // number of engine runs concurrently, need this to calculate the best engine size for pipelining local-sort and import. + // todo: remove those 2 params, the algorithm seems useless, since we can import concurrently now, the foundation + // assumption of the algorithm is broken. + EngineConcurrency int + // used together with prev param. it is 0.75 nearly all the time, see Mydumper.BatchImportRatio. + // this variable is defined as speed-write-to-TiKV / speed-to-do-local-sort + BatchImportRatio float64 + // used to split large CSV files, to limit concurrency of data read/seek operations + // when nil, no limit. + IOWorkers *worker.Pool + // we need it read row-count for parquet, and to read line terminator to split large CSV files + Store storage.ExternalStorage + TableMeta *MDTableMeta + + // only used when split large CSV files. + StrictFormat bool + DataCharacterSet string + DataInvalidCharReplace string + ReadBlockSize int64 + CSV config.CSVConfig +} + +// NewDataDivideConfig creates a new DataDivideConfig from lightning cfg. 
+func NewDataDivideConfig(cfg *config.Config, + columns int, + ioWorkers *worker.Pool, + store storage.ExternalStorage, + meta *MDTableMeta, +) *DataDivideConfig { + return &DataDivideConfig{ + ColumnCnt: columns, + EngineDataSize: int64(cfg.Mydumper.BatchSize), + MaxChunkSize: int64(cfg.Mydumper.MaxRegionSize), + Concurrency: cfg.App.RegionConcurrency, + EngineConcurrency: cfg.App.TableConcurrency, + BatchImportRatio: cfg.Mydumper.BatchImportRatio, + IOWorkers: ioWorkers, + Store: store, + TableMeta: meta, + StrictFormat: cfg.Mydumper.StrictFormat, + DataCharacterSet: cfg.Mydumper.DataCharacterSet, + DataInvalidCharReplace: cfg.Mydumper.DataInvalidCharReplace, + ReadBlockSize: int64(cfg.Mydumper.ReadBlockSize), + CSV: cfg.Mydumper.CSV, + } +} + // MakeTableRegions create a new table region. // row-id range of returned TableRegion is increasing monotonically func MakeTableRegions( ctx context.Context, - meta *MDTableMeta, - columns int, - cfg *config.Config, - ioWorkers *worker.Pool, - store storage.ExternalStorage, + cfg *DataDivideConfig, ) ([]*TableRegion, error) { // Split files into regions type fileRegionRes struct { @@ -168,7 +220,7 @@ func MakeTableRegions( execCtx, cancel := context.WithCancel(ctx) defer cancel() - concurrency := mathutil.Max(cfg.App.RegionConcurrency, 2) + concurrency := mathutil.Max(cfg.Concurrency, 2) fileChan := make(chan FileInfo, concurrency) resultChan := make(chan fileRegionRes, concurrency) var wg sync.WaitGroup @@ -177,7 +229,27 @@ func MakeTableRegions( go func() { defer wg.Done() for info := range fileChan { - regions, sizes, err := MakeSourceFileRegion(execCtx, meta, info, columns, cfg, ioWorkers, store) + var ( + regions []*TableRegion + sizes []float64 + err error + ) + dataFileSize := info.FileMeta.FileSize + if info.FileMeta.Type == SourceTypeParquet { + regions, sizes, err = makeParquetFileRegion(ctx, cfg, info) + } else if info.FileMeta.Type == SourceTypeCSV && cfg.StrictFormat && + info.FileMeta.Compression == CompressionNone && + dataFileSize > cfg.MaxChunkSize+cfg.MaxChunkSize/largeCSVLowerThresholdRation { + // If a csv file is overlarge, we need to split it into multiple regions. + // Note: We can only split a csv file whose format is strict. + // We increase the check threshold by 1/10 of the `max-region-size` because the source file size dumped by tools + // like dumpling might be slight exceed the threshold when it is equal `max-region-size`, so we can + // avoid split a lot of small chunks. + // If a csv file is compressed, we can't split it now because we can't get the exact size of a row. 
+ regions, sizes, err = SplitLargeCSV(ctx, cfg, info) + } else { + regions, sizes, err = MakeSourceFileRegion(execCtx, cfg, info) + } select { case resultChan <- fileRegionRes{info: info, regions: regions, sizes: sizes, err: err}: case <-ctx.Done(): @@ -197,6 +269,7 @@ func MakeTableRegions( }() errChan := make(chan error, 1) + meta := cfg.TableMeta fileRegionsMap := make(map[string]fileRegionRes, len(meta.DataFiles)) go func() { for res := range resultChan { @@ -240,14 +313,14 @@ func MakeTableRegions( rowIDBase = fileRegionsRes.regions[len(fileRegionsRes.regions)-1].Chunk.RowIDMax } - batchSize := CalculateBatchSize(float64(cfg.Mydumper.BatchSize), meta.IsRowOrdered, float64(meta.TotalSize)) + batchSize := CalculateBatchSize(float64(cfg.EngineDataSize), meta.IsRowOrdered, float64(meta.TotalSize)) log.FromContext(ctx).Info("makeTableRegions", zap.Int("filesCount", len(meta.DataFiles)), - zap.Int64("MaxRegionSize", int64(cfg.Mydumper.MaxRegionSize)), + zap.Int64("MaxChunkSize", cfg.MaxChunkSize), zap.Int("RegionsCount", len(filesRegions)), zap.Float64("BatchSize", batchSize), zap.Duration("cost", time.Since(start))) - AllocateEngineIDs(filesRegions, dataFileSizes, batchSize, cfg.Mydumper.BatchImportRatio, float64(cfg.App.TableConcurrency)) + AllocateEngineIDs(filesRegions, dataFileSizes, batchSize, cfg.BatchImportRatio, float64(cfg.EngineConcurrency)) return filesRegions, nil } @@ -267,38 +340,14 @@ func CalculateBatchSize(mydumperBatchSize float64, isRowOrdered bool, totalSize // MakeSourceFileRegion create a new source file region. func MakeSourceFileRegion( ctx context.Context, - meta *MDTableMeta, + cfg *DataDivideConfig, fi FileInfo, - columns int, - cfg *config.Config, - ioWorkers *worker.Pool, - store storage.ExternalStorage, ) ([]*TableRegion, []float64, error) { - if fi.FileMeta.Type == SourceTypeParquet { - region, err := makeParquetFileRegion(ctx, store, meta, fi) - if err != nil { - return nil, nil, err - } - return []*TableRegion{region}, []float64{float64(fi.FileMeta.FileSize)}, nil - } - - dataFileSize := fi.FileMeta.FileSize - divisor := int64(columns) + divisor := int64(cfg.ColumnCnt) isCsvFile := fi.FileMeta.Type == SourceTypeCSV if !isCsvFile { divisor += 2 } - // If a csv file is overlarge, we need to split it into multiple regions. - // Note: We can only split a csv file whose format is strict. - // We increase the check threshold by 1/10 of the `max-region-size` because the source file size dumped by tools - // like dumpling might be slight exceed the threshold when it is equal `max-region-size`, so we can - // avoid split a lot of small chunks. - // If a csv file is compressed, we can't split it now because we can't get the exact size of a row. 
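Because the division parameters now live in DataDivideConfig, a caller that is not running full lightning (such as the load data path at the end of this patch) can fill the struct directly and hand it to MakeTableRegions; callers that do hold a *config.Config can keep using NewDataDivideConfig, as the tests below do. A rough sketch of the direct route; the literal values and the divideTable helper are illustrative, only the field, type and function names come from this patch.

package example

import (
	"context"

	"github.com/docker/go-units"
	"github.com/pingcap/tidb/br/pkg/lightning/config"
	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
	"github.com/pingcap/tidb/br/pkg/storage"
)

// divideTable builds a DataDivideConfig by hand, without a lightning config.
func divideTable(ctx context.Context, store storage.ExternalStorage,
	meta *mydump.MDTableMeta, columnCnt int) ([]*mydump.TableRegion, error) {
	divideCfg := &mydump.DataDivideConfig{
		ColumnCnt:         columnCnt,
		EngineDataSize:    100 * units.GiB, // target engine size, illustrative
		MaxChunkSize:      256 * units.MiB, // per-chunk limit, illustrative
		Concurrency:       8,               // goroutines dividing files, illustrative
		EngineConcurrency: config.DefaultTableConcurrency,
		BatchImportRatio:  config.DefaultBatchImportRatio,
		IOWorkers:         nil, // nil means no limit on read/seek concurrency
		Store:             store,
		TableMeta:         meta,
		ReadBlockSize:     int64(config.ReadBlockSize),
		// StrictFormat and the CSV settings only matter when large CSV files
		// must be split; a real caller copies them from its own config.
	}
	return mydump.MakeTableRegions(ctx, divideCfg)
}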
- if isCsvFile && cfg.Mydumper.StrictFormat && fi.FileMeta.Compression == CompressionNone && - dataFileSize > int64(cfg.Mydumper.MaxRegionSize+cfg.Mydumper.MaxRegionSize/largeCSVLowerThresholdRation) { - regions, subFileSizes, err := SplitLargeFile(ctx, meta, cfg, fi, divisor, ioWorkers, store) - return regions, subFileSizes, err - } fileSize := fi.FileMeta.FileSize rowIDMax := fileSize / divisor @@ -312,8 +361,8 @@ func MakeSourceFileRegion( fileSize = TableFileSizeINF } tableRegion := &TableRegion{ - DB: meta.DB, - Table: meta.Name, + DB: cfg.TableMeta.DB, + Table: cfg.TableMeta.Name, FileMeta: fi.FileMeta, Chunk: Chunk{ Offset: 0, @@ -341,22 +390,21 @@ func MakeSourceFileRegion( // parquet file are column orient, so the offset is read line number func makeParquetFileRegion( ctx context.Context, - store storage.ExternalStorage, - meta *MDTableMeta, + cfg *DataDivideConfig, dataFile FileInfo, -) (*TableRegion, error) { +) ([]*TableRegion, []float64, error) { numberRows := dataFile.FileMeta.Rows var err error // for safety if numberRows <= 0 { - numberRows, err = ReadParquetFileRowCountByFile(ctx, store, dataFile.FileMeta) + numberRows, err = ReadParquetFileRowCountByFile(ctx, cfg.Store, dataFile.FileMeta) if err != nil { - return nil, err + return nil, nil, err } } region := &TableRegion{ - DB: meta.DB, - Table: meta.Name, + DB: cfg.TableMeta.DB, + Table: cfg.TableMeta.Name, FileMeta: dataFile.FileMeta, Chunk: Chunk{ Offset: 0, @@ -366,48 +414,44 @@ func makeParquetFileRegion( RowIDMax: numberRows, }, } - return region, nil + return []*TableRegion{region}, []float64{float64(dataFile.FileMeta.FileSize)}, nil } -// SplitLargeFile splits a large csv file into multiple regions, the size of +// SplitLargeCSV splits a large csv file into multiple regions, the size of // each regions is specified by `config.MaxRegionSize`. // Note: We split the file coarsely, thus the format of csv file is needed to be // strict. // e.g. // - CSV file with header is invalid // - a complete tuple split into multiple lines is invalid -func SplitLargeFile( +func SplitLargeCSV( ctx context.Context, - meta *MDTableMeta, - cfg *config.Config, + cfg *DataDivideConfig, dataFile FileInfo, - divisor int64, - ioWorker *worker.Pool, - store storage.ExternalStorage, ) (regions []*TableRegion, dataFileSizes []float64, err error) { - maxRegionSize := int64(cfg.Mydumper.MaxRegionSize) + maxRegionSize := cfg.MaxChunkSize dataFileSizes = make([]float64, 0, dataFile.FileMeta.FileSize/maxRegionSize+1) startOffset, endOffset := int64(0), maxRegionSize var columns []string var prevRowIdxMax int64 - if cfg.Mydumper.CSV.Header { - r, err := store.Open(ctx, dataFile.FileMeta.Path) + if cfg.CSV.Header { + r, err := cfg.Store.Open(ctx, dataFile.FileMeta.Path) if err != nil { return nil, nil, err } // Create a utf8mb4 convertor to encode and decode data with the charset of CSV files. 
- charsetConvertor, err := NewCharsetConvertor(cfg.Mydumper.DataCharacterSet, cfg.Mydumper.DataInvalidCharReplace) + charsetConvertor, err := NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace) if err != nil { return nil, nil, err } - parser, err := NewCSVParser(ctx, &cfg.Mydumper.CSV, r, int64(cfg.Mydumper.ReadBlockSize), ioWorker, true, charsetConvertor) + parser, err := NewCSVParser(ctx, &cfg.CSV, r, cfg.ReadBlockSize, cfg.IOWorkers, true, charsetConvertor) if err != nil { return nil, nil, err } if err = parser.ReadColumns(); err != nil { return nil, nil, err } - if cfg.Mydumper.CSV.HeaderSchemaMatch { + if cfg.CSV.HeaderSchemaMatch { columns = parser.Columns() } startOffset, _ = parser.Pos() @@ -416,20 +460,21 @@ func SplitLargeFile( endOffset = dataFile.FileMeta.FileSize } } + divisor := int64(cfg.ColumnCnt) for { curRowsCnt := (endOffset - startOffset) / divisor rowIDMax := prevRowIdxMax + curRowsCnt if endOffset != dataFile.FileMeta.FileSize { - r, err := store.Open(ctx, dataFile.FileMeta.Path) + r, err := cfg.Store.Open(ctx, dataFile.FileMeta.Path) if err != nil { return nil, nil, err } // Create a utf8mb4 convertor to encode and decode data with the charset of CSV files. - charsetConvertor, err := NewCharsetConvertor(cfg.Mydumper.DataCharacterSet, cfg.Mydumper.DataInvalidCharReplace) + charsetConvertor, err := NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace) if err != nil { return nil, nil, err } - parser, err := NewCSVParser(ctx, &cfg.Mydumper.CSV, r, int64(cfg.Mydumper.ReadBlockSize), ioWorker, false, charsetConvertor) + parser, err := NewCSVParser(ctx, &cfg.CSV, r, cfg.ReadBlockSize, cfg.IOWorkers, false, charsetConvertor) if err != nil { return nil, nil, err } @@ -443,7 +488,7 @@ func SplitLargeFile( } log.FromContext(ctx).Warn("file contains no terminator at end", zap.String("path", dataFile.FileMeta.Path), - zap.String("terminator", cfg.Mydumper.CSV.Terminator)) + zap.String("terminator", cfg.CSV.Terminator)) pos = dataFile.FileMeta.FileSize } endOffset = pos @@ -451,8 +496,8 @@ func SplitLargeFile( } regions = append(regions, &TableRegion{ - DB: meta.DB, - Table: meta.Name, + DB: cfg.TableMeta.DB, + Table: cfg.TableMeta.Name, FileMeta: dataFile.FileMeta, Chunk: Chunk{ Offset: startOffset, diff --git a/br/pkg/lightning/mydump/region_test.go b/br/pkg/lightning/mydump/region_test.go index 3f2718532f251..2633a6541e947 100644 --- a/br/pkg/lightning/mydump/region_test.go +++ b/br/pkg/lightning/mydump/region_test.go @@ -45,7 +45,8 @@ func TestTableRegion(t *testing.T) { ioWorkers := worker.NewPool(context.Background(), 1, "io") for _, meta := range dbMeta.Tables { - regions, err := MakeTableRegions(context.Background(), meta, 1, cfg, ioWorkers, loader.GetStore()) + divideConfig := NewDataDivideConfig(cfg, 1, ioWorkers, loader.GetStore(), meta) + regions, err := MakeTableRegions(context.Background(), divideConfig) require.NoError(t, err) // check - region-size vs file-size @@ -164,11 +165,7 @@ func TestAllocateEngineIDs(t *testing.T) { }) } -func TestMakeSourceFileRegion(t *testing.T) { - meta := &MDTableMeta{ - DB: "csv", - Name: "large_csv_file", - } +func TestMakeTableRegionsSplitLargeFile(t *testing.T) { cfg := &config.Config{ Mydumper: config.MydumperRuntime{ ReadBlockSize: config.ReadBlockSize, @@ -194,14 +191,19 @@ func TestMakeSourceFileRegion(t *testing.T) { fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: filePath, Type: SourceTypeCSV, FileSize: fileSize}} colCnt := 3 columns := []string{"a", "b", "c"} + meta := &MDTableMeta{ + 
DB: "csv", + Name: "large_csv_file", + DataFiles: []FileInfo{fileInfo}, + } ctx := context.Background() - ioWorkers := worker.NewPool(ctx, 4, "io") store, err := storage.NewLocalStorage(".") assert.NoError(t, err) - fileInfo.FileMeta.Compression = CompressionNone - regions, _, err := MakeSourceFileRegion(ctx, meta, fileInfo, colCnt, cfg, ioWorkers, store) + meta.DataFiles[0].FileMeta.Compression = CompressionNone + divideConfig := NewDataDivideConfig(cfg, colCnt, nil, store, meta) + regions, err := MakeTableRegions(ctx, divideConfig) assert.NoError(t, err) offsets := [][]int64{{6, 12}, {12, 18}, {18, 24}, {24, 30}} assert.Len(t, regions, len(offsets)) @@ -212,8 +214,8 @@ func TestMakeSourceFileRegion(t *testing.T) { } // test - gzip compression - fileInfo.FileMeta.Compression = CompressionGZ - regions, _, err = MakeSourceFileRegion(ctx, meta, fileInfo, colCnt, cfg, ioWorkers, store) + meta.DataFiles[0].FileMeta.Compression = CompressionGZ + regions, err = MakeTableRegions(ctx, divideConfig) assert.NoError(t, err) assert.Len(t, regions, 1) assert.Equal(t, int64(0), regions[0].Chunk.Offset) @@ -226,24 +228,6 @@ func TestCompressedMakeSourceFileRegion(t *testing.T) { DB: "csv", Name: "large_csv_file", } - cfg := &config.Config{ - Mydumper: config.MydumperRuntime{ - ReadBlockSize: config.ReadBlockSize, - MaxRegionSize: 1, - CSV: config.CSVConfig{ - Separator: ",", - Delimiter: "", - Header: true, - HeaderSchemaMatch: true, - TrimLastSep: false, - NotNull: false, - Null: []string{"NULL"}, - EscapedBy: `\`, - }, - StrictFormat: true, - Filter: []string{"*.*"}, - }, - } filePath := "./csv/split_large_file.csv.zst" dataFileInfo, err := os.Stat(filePath) require.NoError(t, err) @@ -258,14 +242,17 @@ func TestCompressedMakeSourceFileRegion(t *testing.T) { colCnt := 3 ctx := context.Background() - ioWorkers := worker.NewPool(ctx, 4, "io") store, err := storage.NewLocalStorage(".") assert.NoError(t, err) compressRatio, err := SampleFileCompressRatio(ctx, fileInfo.FileMeta, store) require.NoError(t, err) fileInfo.FileMeta.RealSize = int64(compressRatio * float64(fileInfo.FileMeta.FileSize)) - regions, sizes, err := MakeSourceFileRegion(ctx, meta, fileInfo, colCnt, cfg, ioWorkers, store) + divideConfig := &DataDivideConfig{ + ColumnCnt: colCnt, + TableMeta: meta, + } + regions, sizes, err := MakeSourceFileRegion(ctx, divideConfig, fileInfo) assert.NoError(t, err) assert.Len(t, regions, 1) assert.Equal(t, int64(0), regions[0].Chunk.Offset) @@ -304,7 +291,10 @@ func TestSplitLargeFile(t *testing.T) { require.NoError(t, err) fileSize := dataFileInfo.Size() fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: filePath, Type: SourceTypeCSV, FileSize: fileSize}} - colCnt := int64(3) + ioWorker := worker.NewPool(context.Background(), 4, "io") + store, err := storage.NewLocalStorage(".") + assert.NoError(t, err) + divideConfig := NewDataDivideConfig(cfg, 3, ioWorker, store, meta) columns := []string{"a", "b", "c"} for _, tc := range []struct { maxRegionSize config.ByteSize @@ -318,13 +308,9 @@ func TestSplitLargeFile(t *testing.T) { {18, [][]int64{{6, 30}}}, {19, [][]int64{{6, 30}}}, } { - cfg.Mydumper.MaxRegionSize = tc.maxRegionSize - ioWorker := worker.NewPool(context.Background(), 4, "io") + divideConfig.MaxChunkSize = int64(tc.maxRegionSize) - store, err := storage.NewLocalStorage(".") - assert.NoError(t, err) - - regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, ioWorker, store) + regions, _, err := SplitLargeCSV(context.Background(), divideConfig, fileInfo) 
assert.NoError(t, err) assert.Len(t, regions, len(tc.offsets)) for i := range tc.offsets { @@ -372,16 +358,16 @@ func TestSplitLargeFileNoNewLineAtEOF(t *testing.T) { require.NoError(t, err) fileSize := dataFileInfo.Size() fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: fileName, Type: SourceTypeCSV, FileSize: fileSize}} - colCnt := int64(2) - columns := []string{"a", "b"} ioWorker := worker.NewPool(context.Background(), 4, "io") store, err := storage.NewLocalStorage(dir) require.NoError(t, err) + divideConfig := NewDataDivideConfig(cfg, 2, ioWorker, store, meta) + columns := []string{"a", "b"} offsets := [][]int64{{4, 13}, {13, 21}} - regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, ioWorker, store) + regions, _, err := SplitLargeCSV(context.Background(), divideConfig, fileInfo) require.NoError(t, err) require.Len(t, regions, len(offsets)) for i := range offsets { @@ -422,15 +408,15 @@ func TestSplitLargeFileWithCustomTerminator(t *testing.T) { require.NoError(t, err) fileSize := dataFileInfo.Size() fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: fileName, Type: SourceTypeCSV, FileSize: fileSize}} - colCnt := int64(3) ioWorker := worker.NewPool(context.Background(), 4, "io") store, err := storage.NewLocalStorage(dir) require.NoError(t, err) + divideConfig := NewDataDivideConfig(cfg, 3, ioWorker, store, meta) offsets := [][]int64{{0, 23}, {23, 38}, {38, 47}} - regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, ioWorker, store) + regions, _, err := SplitLargeCSV(context.Background(), divideConfig, fileInfo) require.NoError(t, err) require.Len(t, regions, len(offsets)) for i := range offsets { @@ -476,16 +462,16 @@ func TestSplitLargeFileOnlyOneChunk(t *testing.T) { require.NoError(t, err) fileSize := dataFileInfo.Size() fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: fileName, Type: SourceTypeCSV, FileSize: fileSize}} - colCnt := int64(2) columns := []string{"field1", "field2"} ioWorker := worker.NewPool(context.Background(), 4, "io") store, err := storage.NewLocalStorage(dir) require.NoError(t, err) + divideConfig := NewDataDivideConfig(cfg, 2, ioWorker, store, meta) offsets := [][]int64{{14, 24}} - regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, ioWorker, store) + regions, _, err := SplitLargeCSV(context.Background(), divideConfig, fileInfo) require.NoError(t, err) require.Len(t, regions, len(offsets)) for i := range offsets { diff --git a/br/pkg/mock/BUILD.bazel b/br/pkg/mock/BUILD.bazel index bc729021a8de2..d5f24094afe88 100644 --- a/br/pkg/mock/BUILD.bazel +++ b/br/pkg/mock/BUILD.bazel @@ -30,7 +30,6 @@ go_library( "//server", "//session", "//store/mockstore", - "//table", "//types", "//util/sqlexec", "@com_github_aws_aws_sdk_go//aws/request", diff --git a/br/pkg/mock/backend.go b/br/pkg/mock/backend.go index 35d4158546bd3..f9fecf65866fb 100644 --- a/br/pkg/mock/backend.go +++ b/br/pkg/mock/backend.go @@ -13,9 +13,7 @@ import ( uuid "github.com/google/uuid" backend "github.com/pingcap/tidb/br/pkg/lightning/backend" encode "github.com/pingcap/tidb/br/pkg/lightning/backend/encode" - config "github.com/pingcap/tidb/br/pkg/lightning/config" model "github.com/pingcap/tidb/parser/model" - table "github.com/pingcap/tidb/table" ) // MockBackend is a mock of AbstractBackend interface. 
@@ -81,36 +79,6 @@ func (mr *MockBackendMockRecorder) CloseEngine(arg0, arg1, arg2 interface{}) *go return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseEngine", reflect.TypeOf((*MockBackend)(nil).CloseEngine), arg0, arg1, arg2) } -// CollectLocalDuplicateRows mocks base method. -func (m *MockBackend) CollectLocalDuplicateRows(arg0 context.Context, arg1 table.Table, arg2 string, arg3 *encode.SessionOptions) (bool, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CollectLocalDuplicateRows", arg0, arg1, arg2, arg3) - ret0, _ := ret[0].(bool) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// CollectLocalDuplicateRows indicates an expected call of CollectLocalDuplicateRows. -func (mr *MockBackendMockRecorder) CollectLocalDuplicateRows(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CollectLocalDuplicateRows", reflect.TypeOf((*MockBackend)(nil).CollectLocalDuplicateRows), arg0, arg1, arg2, arg3) -} - -// CollectRemoteDuplicateRows mocks base method. -func (m *MockBackend) CollectRemoteDuplicateRows(arg0 context.Context, arg1 table.Table, arg2 string, arg3 *encode.SessionOptions) (bool, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CollectRemoteDuplicateRows", arg0, arg1, arg2, arg3) - ret0, _ := ret[0].(bool) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// CollectRemoteDuplicateRows indicates an expected call of CollectRemoteDuplicateRows. -func (mr *MockBackendMockRecorder) CollectRemoteDuplicateRows(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CollectRemoteDuplicateRows", reflect.TypeOf((*MockBackend)(nil).CollectRemoteDuplicateRows), arg0, arg1, arg2, arg3) -} - // EngineFileSizes mocks base method. func (m *MockBackend) EngineFileSizes() []backend.EngineFileSize { m.ctrl.T.Helper() @@ -210,20 +178,6 @@ func (mr *MockBackendMockRecorder) ResetEngine(arg0, arg1 interface{}) *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetEngine", reflect.TypeOf((*MockBackend)(nil).ResetEngine), arg0, arg1) } -// ResolveDuplicateRows mocks base method. -func (m *MockBackend) ResolveDuplicateRows(arg0 context.Context, arg1 table.Table, arg2 string, arg3 config.DuplicateResolutionAlgorithm) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ResolveDuplicateRows", arg0, arg1, arg2, arg3) - ret0, _ := ret[0].(error) - return ret0 -} - -// ResolveDuplicateRows indicates an expected call of ResolveDuplicateRows. -func (mr *MockBackendMockRecorder) ResolveDuplicateRows(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResolveDuplicateRows", reflect.TypeOf((*MockBackend)(nil).ResolveDuplicateRows), arg0, arg1, arg2, arg3) -} - // RetryImportDelay mocks base method. 
 func (m *MockBackend) RetryImportDelay() time.Duration {
 	m.ctrl.T.Helper()
diff --git a/ddl/ingest/backend.go b/ddl/ingest/backend.go
index beb3d529b39a4..be913a87a7576 100644
--- a/ddl/ingest/backend.go
+++ b/ddl/ingest/backend.go
@@ -19,7 +19,10 @@ import (
 
 	"github.com/pingcap/tidb/br/pkg/lightning/backend"
 	"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
+	"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
 	lightning "github.com/pingcap/tidb/br/pkg/lightning/config"
+	"github.com/pingcap/tidb/br/pkg/lightning/errormanager"
+	"github.com/pingcap/tidb/br/pkg/lightning/log"
 	tikv "github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/parser/mysql"
 	"github.com/pingcap/tidb/table"
@@ -55,7 +58,12 @@ func (bc *BackendContext) FinishImport(indexID int64, unique bool, tbl table.Tab
 
 	// Check remote duplicate value for the index.
 	if unique {
-		hasDupe, err := bc.backend.CollectRemoteDuplicateRows(bc.ctx, tbl, tbl.Meta().Name.L, &encode.SessionOptions{
+		errorMgr := errormanager.New(nil, bc.cfg, log.Logger{Logger: logutil.BgLogger()})
+		// The backend must be a local backend here.
+		// TODO: remove this type assertion once the local backend is fully separated from the TiDB backend.
+		//nolint:forcetypeassert
+		dupeController := bc.backend.Inner().(*local.Local).GetDupeController(bc.cfg.TikvImporter.RangeConcurrency*2, errorMgr)
+		hasDupe, err := dupeController.CollectRemoteDuplicateRows(bc.ctx, tbl, tbl.Meta().Name.L, &encode.SessionOptions{
 			SQLMode: mysql.ModeStrictAllTables,
 			SysVars: bc.sysVars,
 			IndexID: ei.indexID,
diff --git a/ddl/ingest/backend_mgr.go b/ddl/ingest/backend_mgr.go
index 9b70f9d4da93f..d5a19a85b485e 100644
--- a/ddl/ingest/backend_mgr.go
+++ b/ddl/ingest/backend_mgr.go
@@ -24,9 +24,7 @@ import (
 	"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
 	"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
 	"github.com/pingcap/tidb/br/pkg/lightning/config"
-	"github.com/pingcap/tidb/br/pkg/lightning/errormanager"
 	"github.com/pingcap/tidb/br/pkg/lightning/glue"
-	"github.com/pingcap/tidb/br/pkg/lightning/log"
 	"github.com/pingcap/tidb/parser"
 	"github.com/pingcap/tidb/parser/model"
 	"github.com/pingcap/tidb/parser/mysql"
@@ -88,7 +86,6 @@ func createLocalBackend(ctx context.Context, cfg *Config, glue glue.Glue) (backe
 	}
 
 	logutil.BgLogger().Info("[ddl-ingest] create local backend for adding index", zap.String("keyspaceName", cfg.KeyspaceName))
-	errorMgr := errormanager.New(nil, cfg.Lightning, log.Logger{Logger: logutil.BgLogger()})
 	db, err := glue.GetDB()
 	if err != nil {
 		return backend.Backend{}, err
@@ -96,7 +93,8 @@ func createLocalBackend(ctx context.Context, cfg *Config, glue glue.Glue) (backe
 	regionSizeGetter := &local.TableRegionSizeGetterImpl{
 		DB: db,
 	}
-	return local.NewLocalBackend(ctx, tls, cfg.Lightning, regionSizeGetter, int(LitRLimit), errorMgr, cfg.KeyspaceName)
+	backendConfig := local.NewBackendConfig(cfg.Lightning, int(LitRLimit), cfg.KeyspaceName)
+	return local.NewLocalBackend(ctx, tls, backendConfig, regionSizeGetter)
 }
 
 func newBackendContext(ctx context.Context, jobID int64, be *backend.Backend,
diff --git a/disttask/loaddata/wrapper.go b/disttask/loaddata/wrapper.go
index 4aaaef5949fff..715fbc3cd86bd 100644
--- a/disttask/loaddata/wrapper.go
+++ b/disttask/loaddata/wrapper.go
@@ -80,7 +80,8 @@ func makeTableRegions(ctx context.Context, task *TaskMeta, concurrency int) ([]*
 		},
 	}
 
-	return mydump.MakeTableRegions(ctx, meta, len(task.Table.TargetColumns), cfg, nil, store)
+	dataDivideConfig := mydump.NewDataDivideConfig(cfg, len(task.Table.TargetColumns), nil, store, meta)
+	return mydump.MakeTableRegions(ctx, dataDivideConfig)
 }
 
 func transformSourceType(tp string) (mydump.SourceType, error) {
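
For readers following the refactor, the sketch below pieces together how a caller is expected to drive the new region-splitting entry point after this change, using only the constructors and fields that appear in the hunks above (NewDataDivideConfig, MakeTableRegions, MDTableMeta, FileInfo, worker.NewPool, storage.NewLocalStorage). The column count, CSV settings, file path, and file size are illustrative assumptions, not values taken from the patch.

// A hypothetical, minimal caller of the refactored region-splitting API.
// It is assembled only from the call sites visible in this patch; the column
// count, CSV settings, file path, and file size below are made-up examples.
package main

import (
	"context"
	"fmt"

	"github.com/pingcap/tidb/br/pkg/lightning/config"
	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
	"github.com/pingcap/tidb/br/pkg/lightning/worker"
	"github.com/pingcap/tidb/br/pkg/storage"
)

func main() {
	ctx := context.Background()

	// CSV and runtime settings mirror the test configuration removed above.
	cfg := &config.Config{
		Mydumper: config.MydumperRuntime{
			ReadBlockSize: config.ReadBlockSize,
			MaxRegionSize: config.MaxRegionSize,
			CSV: config.CSVConfig{
				Separator:         ",",
				Header:            true,
				HeaderSchemaMatch: true,
			},
			StrictFormat: true,
		},
	}

	store, err := storage.NewLocalStorage(".")
	if err != nil {
		panic(err)
	}
	ioWorkers := worker.NewPool(ctx, 4, "io")

	// Describe the table and its single data file; path and size are made up.
	fileInfo := mydump.FileInfo{FileMeta: mydump.SourceFileMeta{
		Path:     "csv/split_large_file.csv",
		Type:     mydump.SourceTypeCSV,
		FileSize: 1 << 20,
	}}
	meta := &mydump.MDTableMeta{
		DB:        "csv",
		Name:      "large_csv_file",
		DataFiles: []mydump.FileInfo{fileInfo},
	}

	// The former (meta, columns, cfg, ioWorkers, store) argument list is now
	// bundled into a single DataDivideConfig.
	divideConfig := mydump.NewDataDivideConfig(cfg, 3, ioWorkers, store, meta)
	regions, err := mydump.MakeTableRegions(ctx, divideConfig)
	if err != nil {
		panic(err)
	}
	for _, region := range regions {
		fmt.Println(region.Chunk.Offset, region.Chunk.EndOffset)
	}
}

Bundling these parameters into one config struct appears to be what lets MakeTableRegions keep a stable signature as the load-data path adds new callers alongside Lightning.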