From 213932fa29123e6313fe0ca91a95c3c31cddfb17 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 21 Feb 2023 19:22:31 +0800 Subject: [PATCH 01/19] executor: refine LOAD DATA logic --- executor/builder.go | 43 +-- executor/load_data.go | 378 ++++++++++++++------------ executor/loadremotetest/error_test.go | 53 ++++ executor/writetest/write_test.go | 34 +-- server/conn.go | 35 ++- 5 files changed, 308 insertions(+), 235 deletions(-) create mode 100644 executor/loadremotetest/error_test.go diff --git a/executor/builder.go b/executor/builder.go index 26807d47f674d..cf5d5b346f8ba 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -938,46 +938,19 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor { b.err = errors.Errorf("Can not get table %d", v.Table.TableInfo.ID) return nil } - insertVal := &InsertValues{ - baseExecutor: newBaseExecutor(b.ctx, nil, v.ID()), - Table: tbl, - Columns: v.Columns, - GenExprs: v.GenCols.Exprs, - isLoadData: true, - txnInUse: sync.Mutex{}, - } - loadDataInfo := &LoadDataInfo{ - row: make([]types.Datum, 0, len(insertVal.insertColumns)), - InsertValues: insertVal, - Path: v.Path, - Format: v.Format, - Table: tbl, - FieldsInfo: v.FieldsInfo, - LinesInfo: v.LinesInfo, - NullInfo: v.NullInfo, - IgnoreLines: v.IgnoreLines, - ColumnAssignments: v.ColumnAssignments, - ColumnsAndUserVars: v.ColumnsAndUserVars, - OnDuplicate: v.OnDuplicate, - Ctx: b.ctx, - } - columnNames := loadDataInfo.initFieldMappings() - err := loadDataInfo.initLoadColumns(columnNames) + var defaultLoadDataBatchCnt uint64 = 20000 // TODO this will be changed to variable in another pr + worker, err := NewLoadDataWorker(b.ctx, v, tbl, defaultLoadDataBatchCnt) if err != nil { b.err = err return nil } - loadDataExec := &LoadDataExec{ - baseExecutor: newBaseExecutor(b.ctx, nil, v.ID()), - FileLocRef: v.FileLocRef, - OnDuplicate: v.OnDuplicate, - loadDataInfo: loadDataInfo, - } - var defaultLoadDataBatchCnt uint64 = 20000 // TODO this will be changed to variable in another pr - loadDataExec.loadDataInfo.initQueues() - loadDataExec.loadDataInfo.SetMaxRowsInBatch(defaultLoadDataBatchCnt) - return loadDataExec + return &LoadDataExec{ + baseExecutor: newBaseExecutor(b.ctx, nil, v.ID()), + FileLocRef: v.FileLocRef, + OnDuplicate: v.OnDuplicate, + loadDataWorker: worker, + } } func (b *executorBuilder) buildLoadStats(v *plannercore.LoadStats) Executor { diff --git a/executor/load_data.go b/executor/load_data.go index 5d206bd0149bf..c6c9a3eb2e96f 100644 --- a/executor/load_data.go +++ b/executor/load_data.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" + plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessiontxn" "github.com/pingcap/tidb/table" @@ -64,37 +65,37 @@ var ( type LoadDataExec struct { baseExecutor - FileLocRef ast.FileLocRefTp - OnDuplicate ast.OnDuplicateKeyHandlingType - loadDataInfo *LoadDataInfo + FileLocRef ast.FileLocRefTp + OnDuplicate ast.OnDuplicateKeyHandlingType + loadDataWorker *LoadDataWorker } // Next implements the Executor Next interface. 
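// For LOAD DATA, Next produces no rows: it validates the statement options,
// then either starts loading a server-side or remote file immediately
// (FileLocServerOrRemote), or, for LOAD DATA LOCAL, parks the worker under
// LoadDataVarKey so the connection layer can stream the client's file into
// it later (see handleLoadData in server/conn.go below).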
func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) error { req.GrowAndReset(e.maxChunkSize) - if e.loadDataInfo.Path == "" { + if e.loadDataWorker.Path == "" { return errors.New("Load Data: infile path is empty") } - if !e.loadDataInfo.Table.Meta().IsBaseTable() { + if !e.loadDataWorker.Table.Meta().IsBaseTable() { return errors.New("can only load data into base tables") } // CSV-like - if e.loadDataInfo.Format == "" { - if e.loadDataInfo.NullInfo != nil && e.loadDataInfo.NullInfo.OptEnclosed && - (e.loadDataInfo.FieldsInfo == nil || e.loadDataInfo.FieldsInfo.Enclosed == nil) { + if e.loadDataWorker.Format == "" { + if e.loadDataWorker.NullInfo != nil && e.loadDataWorker.NullInfo.OptEnclosed && + (e.loadDataWorker.FieldsInfo == nil || e.loadDataWorker.FieldsInfo.Enclosed == nil) { return errors.New("must specify FIELDS [OPTIONALLY] ENCLOSED BY when use NULL DEFINED BY OPTIONALLY ENCLOSED") } // TODO: support lines terminated is "". - if len(e.loadDataInfo.LinesInfo.Terminated) == 0 { + if len(e.loadDataWorker.LinesInfo.Terminated) == 0 { return errors.New("Load Data: don't support load data terminated is nil") } } switch e.FileLocRef { case ast.FileLocServerOrRemote: - u, err := storage.ParseRawURL(e.loadDataInfo.Path) + u, err := storage.ParseRawURL(e.loadDataWorker.Path) if err != nil { return err } @@ -110,13 +111,13 @@ func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) error { return e.loadFromRemote(ctx, b, filename) case ast.FileLocClient: // let caller use handleQuerySpecial to read data in this connection - sctx := e.loadDataInfo.ctx + sctx := e.loadDataWorker.ctx val := sctx.Value(LoadDataVarKey) if val != nil { sctx.SetValue(LoadDataVarKey, nil) return errors.New("Load Data: previous load data option wasn't closed normally") } - sctx.SetValue(LoadDataVarKey, e.loadDataInfo) + sctx.SetValue(LoadDataVarKey, e.loadDataWorker) } return nil } @@ -140,28 +141,28 @@ func (e *LoadDataExec) loadFromRemote( } defer fileReader.Close() - e.loadDataInfo.loadRemoteInfo = loadRemoteInfo{ + e.loadDataWorker.loadRemoteInfo = loadRemoteInfo{ store: s, path: filename, } - return e.loadDataInfo.Load(ctx, fileReader) + return e.loadDataWorker.Load(ctx, fileReader) } // Close implements the Executor Close interface. func (e *LoadDataExec) Close() error { - if e.runtimeStats != nil && e.loadDataInfo != nil && e.loadDataInfo.stats != nil { - defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.loadDataInfo.stats) + if e.runtimeStats != nil && e.loadDataWorker != nil && e.loadDataWorker.stats != nil { + defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.loadDataWorker.stats) } return nil } // Open implements the Executor Open interface. func (e *LoadDataExec) Open(ctx context.Context) error { - if e.loadDataInfo.insertColumns != nil { - e.loadDataInfo.initEvalBuffer() + if e.loadDataWorker.insertColumns != nil { + e.loadDataWorker.initEvalBuffer() } // Init for runtime stats. - e.loadDataInfo.collectRuntimeStatsEnabled() + e.loadDataWorker.collectRuntimeStatsEnabled() return nil } @@ -176,9 +177,8 @@ type loadRemoteInfo struct { path string } -// LoadDataInfo saves the information of loading data operation. -// TODO: rename it and remove unnecessary public methods. -type LoadDataInfo struct { +// LoadDataWorker does a LOAD DATA job. 
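+//
+// A minimal usage sketch (mirroring buildLoadData and LoadDataExec above;
+// reader is any io.ReadSeekCloser over the input):
+//
+//	worker, err := NewLoadDataWorker(sctx, plan, tbl, batchSize)
+//	if err != nil {
+//		return err
+//	}
+//	err = worker.Load(ctx, reader) // parse input, batch rows, commit batches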
+type LoadDataWorker struct { *InsertValues row []types.Datum @@ -191,7 +191,6 @@ type LoadDataInfo struct { IgnoreLines uint64 Ctx sessionctx.Context rows [][]types.Datum - Drained bool ColumnAssignments []*ast.Assignment ColumnsAndUserVars []*ast.ColumnNameOrUserVar @@ -205,16 +204,171 @@ type LoadDataInfo struct { loadRemoteInfo loadRemoteInfo } +// NewLoadDataWorker creates a new LoadDataWorker that is ready to work. +func NewLoadDataWorker( + sctx sessionctx.Context, + plan *plannercore.LoadData, + tbl table.Table, + batchSize uint64, +) (*LoadDataWorker, error) { + insertVal := &InsertValues{ + baseExecutor: newBaseExecutor(sctx, nil, plan.ID()), + Table: tbl, + Columns: plan.Columns, + GenExprs: plan.GenCols.Exprs, + isLoadData: true, + txnInUse: sync.Mutex{}, + maxRowsInBatch: uint64(sctx.GetSessionVars().DMLBatchSize), + } + loadDataWorker := &LoadDataWorker{ + row: make([]types.Datum, 0, len(insertVal.insertColumns)), + InsertValues: insertVal, + Path: plan.Path, + Format: plan.Format, + Table: tbl, + FieldsInfo: plan.FieldsInfo, + LinesInfo: plan.LinesInfo, + NullInfo: plan.NullInfo, + IgnoreLines: plan.IgnoreLines, + ColumnAssignments: plan.ColumnAssignments, + ColumnsAndUserVars: plan.ColumnsAndUserVars, + OnDuplicate: plan.OnDuplicate, + Ctx: sctx, + } + columnNames := loadDataWorker.initFieldMappings() + err := loadDataWorker.initLoadColumns(columnNames) + if err != nil { + return nil, err + } + loadDataWorker.initQueues() + loadDataWorker.ResetBatch() + return loadDataWorker, nil +} + // FieldMapping indicates the relationship between input field and table column or user variable type FieldMapping struct { Column *table.Column UserVar *ast.VariableExpr } +// initFieldMappings make a field mapping slice to implicitly map input field to table column or user defined variable +// the slice's order is the same as the order of the input fields. +// Returns a slice of same ordered column names without user defined variable names. +func (e *LoadDataWorker) initFieldMappings() []string { + columns := make([]string, 0, len(e.ColumnsAndUserVars)+len(e.ColumnAssignments)) + tableCols := e.Table.Cols() + + if len(e.ColumnsAndUserVars) == 0 { + for _, v := range tableCols { + fieldMapping := &FieldMapping{ + Column: v, + } + e.FieldMappings = append(e.FieldMappings, fieldMapping) + columns = append(columns, v.Name.O) + } + + return columns + } + + var column *table.Column + + for _, v := range e.ColumnsAndUserVars { + if v.ColumnName != nil { + column = table.FindCol(tableCols, v.ColumnName.Name.O) + columns = append(columns, v.ColumnName.Name.O) + } else { + column = nil + } + + fieldMapping := &FieldMapping{ + Column: column, + UserVar: v.UserVar, + } + e.FieldMappings = append(e.FieldMappings, fieldMapping) + } + + return columns +} + +// initLoadColumns sets columns which the input fields loaded to. 
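+// For an illustrative statement (not one taken from this patch) such as
+//
+//	LOAD DATA ... INTO TABLE t (c1, @v1) SET c2 = @v1
+//
+// initFieldMappings above returns ["c1"], and initLoadColumns then appends
+// the SET target c2, so e.insertColumns ends up covering both c1 and c2.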
+func (e *LoadDataWorker) initLoadColumns(columnNames []string) error { + var cols []*table.Column + var missingColName string + var err error + tableCols := e.Table.Cols() + + if len(columnNames) != len(tableCols) { + for _, v := range e.ColumnAssignments { + columnNames = append(columnNames, v.Column.Name.O) + } + } + + cols, missingColName = table.FindCols(tableCols, columnNames, e.Table.Meta().PKIsHandle) + if missingColName != "" { + return dbterror.ErrBadField.GenWithStackByArgs(missingColName, "field list") + } + + for _, col := range cols { + if !col.IsGenerated() { + e.insertColumns = append(e.insertColumns, col) + } + if col.Name.L == model.ExtraHandleName.L { + if !e.ctx.GetSessionVars().AllowWriteRowID { + return errors.Errorf("load data statement for _tidb_rowid are not supported") + } + e.hasExtraHandle = true + break + } + } + + // e.insertColumns is appended according to the original tables' column sequence. + // We have to reorder it to follow the use-specified column order which is shown in the columnNames. + if err = e.reorderColumns(columnNames); err != nil { + return err + } + + e.rowLen = len(e.insertColumns) + // Check column whether is specified only once. + err = table.CheckOnce(cols) + if err != nil { + return err + } + + return nil +} + +// reorderColumns reorder the e.insertColumns according to the order of columnNames +// Note: We must ensure there must be one-to-one mapping between e.insertColumns and columnNames in terms of column name. +func (e *LoadDataWorker) reorderColumns(columnNames []string) error { + cols := e.insertColumns + + if len(cols) != len(columnNames) { + return ErrColumnsNotMatched + } + + reorderedColumns := make([]*table.Column, len(cols)) + + if columnNames == nil { + return nil + } + + mapping := make(map[string]int) + for idx, colName := range columnNames { + mapping[strings.ToLower(colName)] = idx + } + + for _, col := range cols { + idx := mapping[col.Name.L] + reorderedColumns[idx] = col + } + + e.insertColumns = reorderedColumns + + return nil +} + // Load reads from readerFn and do load data job. -func (e *LoadDataInfo) Load(ctx context.Context, reader io.ReadSeekCloser) error { - e.initQueues() - e.SetMaxRowsInBatch(uint64(e.Ctx.GetSessionVars().DMLBatchSize)) +func (e *LoadDataWorker) Load(ctx context.Context, reader io.ReadSeekCloser) error { e.startStopWatcher() // let stop watcher goroutine quit defer e.forceQuit() @@ -232,7 +386,7 @@ func (e *LoadDataInfo) Load(ctx context.Context, reader io.ReadSeekCloser) error } // processStream process input stream from network -func processStream(ctx context.Context, reader io.ReadSeekCloser, loadDataInfo *LoadDataInfo, wg *sync.WaitGroup) { +func processStream(ctx context.Context, reader io.ReadSeekCloser, loadDataInfo *LoadDataWorker, wg *sync.WaitGroup) { var ( parser mydump.Parser err error @@ -251,7 +405,7 @@ func processStream(ctx context.Context, reader io.ReadSeekCloser, loadDataInfo * if err != nil || r != nil { loadDataInfo.forceQuit() } else { - loadDataInfo.CloseTaskQueue() + loadDataInfo.closeTaskQueue() } wg.Done() }() @@ -313,146 +467,31 @@ func processStream(ctx context.Context, reader io.ReadSeekCloser, loadDataInfo * } } -// reorderColumns reorder the e.insertColumns according to the order of columnNames -// Note: We must ensure there must be one-to-one mapping between e.insertColumns and columnNames in terms of column name. 
-func (e *LoadDataInfo) reorderColumns(columnNames []string) error { - cols := e.insertColumns - - if len(cols) != len(columnNames) { - return ErrColumnsNotMatched - } - - reorderedColumns := make([]*table.Column, len(cols)) - - if columnNames == nil { - return nil - } - - mapping := make(map[string]int) - for idx, colName := range columnNames { - mapping[strings.ToLower(colName)] = idx - } - - for _, col := range cols { - idx := mapping[col.Name.L] - reorderedColumns[idx] = col - } - - e.insertColumns = reorderedColumns - - return nil -} - -// initLoadColumns sets columns which the input fields loaded to. -func (e *LoadDataInfo) initLoadColumns(columnNames []string) error { - var cols []*table.Column - var missingColName string - var err error - tableCols := e.Table.Cols() - - if len(columnNames) != len(tableCols) { - for _, v := range e.ColumnAssignments { - columnNames = append(columnNames, v.Column.Name.O) - } - } - - cols, missingColName = table.FindCols(tableCols, columnNames, e.Table.Meta().PKIsHandle) - if missingColName != "" { - return dbterror.ErrBadField.GenWithStackByArgs(missingColName, "field list") - } - - for _, col := range cols { - if !col.IsGenerated() { - e.insertColumns = append(e.insertColumns, col) - } - if col.Name.L == model.ExtraHandleName.L { - if !e.ctx.GetSessionVars().AllowWriteRowID { - return errors.Errorf("load data statement for _tidb_rowid are not supported") - } - e.hasExtraHandle = true - break - } - } - - // e.insertColumns is appended according to the original tables' column sequence. - // We have to reorder it to follow the use-specified column order which is shown in the columnNames. - if err = e.reorderColumns(columnNames); err != nil { - return err - } - - e.rowLen = len(e.insertColumns) - // Check column whether is specified only once. - err = table.CheckOnce(cols) - if err != nil { - return err - } - - return nil -} - -// initFieldMappings make a field mapping slice to implicitly map input field to table column or user defined variable -// the slice's order is the same as the order of the input fields. -// Returns a slice of same ordered column names without user defined variable names. 
-func (e *LoadDataInfo) initFieldMappings() []string { - columns := make([]string, 0, len(e.ColumnsAndUserVars)+len(e.ColumnAssignments)) - tableCols := e.Table.Cols() - - if len(e.ColumnsAndUserVars) == 0 { - for _, v := range tableCols { - fieldMapping := &FieldMapping{ - Column: v, - } - e.FieldMappings = append(e.FieldMappings, fieldMapping) - columns = append(columns, v.Name.O) - } - - return columns - } - - var column *table.Column - - for _, v := range e.ColumnsAndUserVars { - if v.ColumnName != nil { - column = table.FindCol(tableCols, v.ColumnName.Name.O) - columns = append(columns, v.ColumnName.Name.O) - } else { - column = nil - } - - fieldMapping := &FieldMapping{ - Column: column, - UserVar: v.UserVar, - } - e.FieldMappings = append(e.FieldMappings, fieldMapping) - } - - return columns -} - // GetRows getter for rows -func (e *LoadDataInfo) GetRows() [][]types.Datum { +func (e *LoadDataWorker) GetRows() [][]types.Datum { return e.rows } // GetCurBatchCnt getter for curBatchCnt -func (e *LoadDataInfo) GetCurBatchCnt() uint64 { +func (e *LoadDataWorker) GetCurBatchCnt() uint64 { return e.curBatchCnt } -// CloseTaskQueue preparing routine to inform commit routine no more data -func (e *LoadDataInfo) CloseTaskQueue() { +// closeTaskQueue preparing routine to inform commit routine no more data +func (e *LoadDataWorker) closeTaskQueue() { close(e.commitTaskQueue) } // initQueues initialize task queue and error report queue -func (e *LoadDataInfo) initQueues() { +// TODO: this is needed? +func (e *LoadDataWorker) initQueues() { e.commitTaskQueue = make(chan commitTask, taskQueueSize) e.StopCh = make(chan struct{}, 2) e.QuitCh = make(chan struct{}) } // startStopWatcher monitor StopCh to force quit -func (e *LoadDataInfo) startStopWatcher() { +func (e *LoadDataWorker) startStopWatcher() { go func() { <-e.StopCh close(e.QuitCh) @@ -460,17 +499,17 @@ func (e *LoadDataInfo) startStopWatcher() { } // forceQuit let commit quit directly -func (e *LoadDataInfo) forceQuit() { +func (e *LoadDataWorker) forceQuit() { e.StopCh <- struct{}{} } -// makeCommitTask produce commit task with data in LoadDataInfo.rows LoadDataInfo.curBatchCnt -func (e *LoadDataInfo) makeCommitTask() commitTask { +// makeCommitTask produce commit task with data in LoadDataWorker.rows LoadDataWorker.curBatchCnt +func (e *LoadDataWorker) makeCommitTask() commitTask { return commitTask{e.curBatchCnt, e.rows} } // enqOneTask feed one batch commit task to commit work -func (e *LoadDataInfo) enqOneTask(ctx context.Context) error { +func (e *LoadDataWorker) enqOneTask(ctx context.Context) error { var err error if e.curBatchCnt > 0 { select { @@ -481,13 +520,13 @@ func (e *LoadDataInfo) enqOneTask(ctx context.Context) error { return err } // reset rows buffer, will reallocate buffer but NOT reuse - e.SetMaxRowsInBatch(e.maxRowsInBatch) + e.ResetBatch() } return err } -// CommitOneTask insert Data from LoadDataInfo.rows, then make commit and refresh txn -func (e *LoadDataInfo) CommitOneTask(ctx context.Context, task commitTask) error { +// commitOneTask insert Data from LoadDataWorker.rows, then make commit and refresh txn +func (e *LoadDataWorker) commitOneTask(ctx context.Context, task commitTask) error { var err error defer func() { if err != nil { @@ -515,7 +554,7 @@ func (e *LoadDataInfo) CommitOneTask(ctx context.Context, task commitTask) error } // commitWork commit batch sequentially -func (e *LoadDataInfo) commitWork(ctx context.Context) error { +func (e *LoadDataWorker) commitWork(ctx context.Context) error { var err 
error defer func() { r := recover() @@ -542,7 +581,7 @@ func (e *LoadDataInfo) commitWork(ctx context.Context) error { case commitTask, ok := <-e.commitTaskQueue: if ok { start := time.Now() - err = e.CommitOneTask(ctx, commitTask) + err = e.commitOneTask(ctx, commitTask) if err != nil { break } @@ -569,10 +608,9 @@ func (e *LoadDataInfo) commitWork(ctx context.Context) error { return err } -// SetMaxRowsInBatch sets the max number of rows to insert in a batch. -func (e *LoadDataInfo) SetMaxRowsInBatch(limit uint64) { - e.maxRowsInBatch = limit - e.rows = make([][]types.Datum, 0, limit) +// ResetBatch reset the inner batch. +func (e *LoadDataWorker) ResetBatch() { + e.rows = make([][]types.Datum, 0, e.maxRowsInBatch) e.curBatchCnt = 0 } @@ -581,7 +619,7 @@ func (e *LoadDataInfo) SetMaxRowsInBatch(limit uint64) { // will also return nil. // The result rows are saved in e.rows and update some members, caller can check // if curBatchCnt == 0 to know if reached EOF. -func (e *LoadDataInfo) ReadRows(ctx context.Context, parser mydump.Parser) error { +func (e *LoadDataWorker) ReadRows(ctx context.Context, parser mydump.Parser) error { ignoreOneLineFn := parser.ReadRow if csvParser, ok := parser.(*mydump.CSVParser); ok { ignoreOneLineFn = func() error { @@ -622,7 +660,7 @@ func (e *LoadDataInfo) ReadRows(ctx context.Context, parser mydump.Parser) error } // CheckAndInsertOneBatch is used to commit one transaction batch full filled data -func (e *LoadDataInfo) CheckAndInsertOneBatch(ctx context.Context, rows [][]types.Datum, cnt uint64) error { +func (e *LoadDataWorker) CheckAndInsertOneBatch(ctx context.Context, rows [][]types.Datum, cnt uint64) error { if e.stats != nil && e.stats.BasicRuntimeStats != nil { // Since this method will not call by executor Next, // so we need record the basic executor runtime stats by ourself. @@ -651,7 +689,7 @@ func (e *LoadDataInfo) CheckAndInsertOneBatch(ctx context.Context, rows [][]type // SetMessage sets info message(ERR_LOAD_INFO) generated by LOAD statement, it is public because of the special way that // LOAD statement is handled. -func (e *LoadDataInfo) SetMessage() { +func (e *LoadDataWorker) SetMessage() { stmtCtx := e.ctx.GetSessionVars().StmtCtx numRecords := stmtCtx.RecordRows() numDeletes := stmtCtx.DeletedRows() @@ -662,7 +700,7 @@ func (e *LoadDataInfo) SetMessage() { } // colsToRow encodes the data of parser output. -func (e *LoadDataInfo) colsToRow(ctx context.Context, cols []types.Datum) []types.Datum { +func (e *LoadDataWorker) colsToRow(ctx context.Context, cols []types.Datum) []types.Datum { row := make([]types.Datum, 0, len(e.insertColumns)) sessionVars := e.Ctx.GetSessionVars() setVar := func(name string, col *types.Datum) { @@ -725,7 +763,7 @@ func (e *LoadDataInfo) colsToRow(ctx context.Context, cols []types.Datum) []type return newRow } -func (e *LoadDataInfo) addRecordLD(ctx context.Context, row []types.Datum) error { +func (e *LoadDataWorker) addRecordLD(ctx context.Context, row []types.Datum) error { if row == nil { return nil } @@ -737,8 +775,8 @@ func (e *LoadDataInfo) addRecordLD(ctx context.Context, row []types.Datum) error return nil } -// GenerateCSVConfig generates a CSV config for parser from LoadDataInfo. -func (e *LoadDataInfo) GenerateCSVConfig() *config.CSVConfig { +// GenerateCSVConfig generates a CSV config for parser from LoadDataWorker. 
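+// Roughly, the statement's FIELDS/LINES clauses become the Lightning CSV
+// parser's options: FIELDS TERMINATED BY maps to CSVConfig.Separator,
+// ENCLOSED BY to CSVConfig.Delimiter, and LINES TERMINATED BY to
+// CSVConfig.Terminator (a summary of the body below, not the full mapping).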
+func (e *LoadDataWorker) GenerateCSVConfig() *config.CSVConfig { var ( nullDef []string quotedNullIsText = true diff --git a/executor/loadremotetest/error_test.go b/executor/loadremotetest/error_test.go new file mode 100644 index 0000000000000..5fd3dc64f479f --- /dev/null +++ b/executor/loadremotetest/error_test.go @@ -0,0 +1,53 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package loadremotetest + +import ( + "testing" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/parser/terror" + "github.com/stretchr/testify/require" +) + +func checkClientErrorMessage(t *testing.T, err error, msg string) { + require.Error(t, err) + cause := errors.Cause(err) + terr, ok := cause.(*errors.Error) + require.True(t, ok, "%T", cause) + require.Contains(t, terror.ToSQLError(terr).Error(), msg) +} + +func (s *mockGCSSuite) TestErrorMessage() { + s.tk.MustExec("DROP DATABASE IF EXISTS load_csv;") + + err := s.tk.ExecToErr("LOAD DATA INFILE 'gs://1' INTO TABLE t") + checkClientErrorMessage(s.T(), err, "ERROR 1046 (3D000): No database selected") + err = s.tk.ExecToErr("LOAD DATA INFILE 'gs://1' INTO TABLE wrongdb.t") + checkClientErrorMessage(s.T(), err, "ERROR 1146 (42S02): Table 'wrongdb.t' doesn't exist") + + s.tk.MustExec("CREATE DATABASE load_csv;") + s.tk.MustExec("USE load_csv;") + s.tk.MustExec("CREATE TABLE t (i INT, s varchar(32));") + + err = s.tk.ExecToErr("LOAD DATA INFILE 'gs://1' INTO TABLE t (wrong)") + checkClientErrorMessage(s.T(), err, "ERROR 1054 (42S22): Unknown column 'wrong' in 'field list'") + // This behaviour is different from MySQL + err = s.tk.ExecToErr("LOAD DATA INFILE 'gs://1' INTO TABLE t (i,i)") + checkClientErrorMessage(s.T(), err, "ERROR 1110 (42000): Column 'i' specified twice") + err = s.tk.ExecToErr("LOAD DATA INFILE 'gs://1' INTO TABLE t (@v) SET wrong=@v") + checkClientErrorMessage(s.T(), err, "ERROR 1054 (42S22): Unknown column 'wrong' in 'field list'") + +} diff --git a/executor/writetest/write_test.go b/executor/writetest/write_test.go index 021e678708817..fdd8991a10f2e 100644 --- a/executor/writetest/write_test.go +++ b/executor/writetest/write_test.go @@ -1874,7 +1874,7 @@ type testCase struct { func checkCases( tests []testCase, - ld *executor.LoadDataInfo, + ld *executor.LoadDataWorker, t *testing.T, tk *testkit.TestKit, ctx sessionctx.Context, @@ -1903,7 +1903,7 @@ func checkCases( require.NoError(t, err1) err1 = ld.CheckAndInsertOneBatch(context.Background(), ld.GetRows(), ld.GetCurBatchCnt()) require.NoError(t, err1) - ld.SetMaxRowsInBatch(20000) + ld.ResetBatch() ld.SetMessage() require.Equal(t, tt.expectedMsg, tk.Session().LastMessage()) ctx.StmtCommit(context.Background()) @@ -1924,7 +1924,7 @@ func TestLoadDataMissingColumn(t *testing.T) { tk.MustExec(createSQL) tk.MustExec("load data local infile '/tmp/nonexistence.csv' ignore into table load_data_missing") ctx := tk.Session().(sessionctx.Context) - ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataInfo) + ld, ok := 
ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataWorker) require.True(t, ok) defer ctx.SetValue(executor.LoadDataVarKey, nil) require.NotNil(t, ld) @@ -1974,7 +1974,7 @@ func TestIssue18681(t *testing.T) { tk.MustExec(createSQL) tk.MustExec("load data local infile '/tmp/nonexistence.csv' ignore into table load_data_test") ctx := tk.Session().(sessionctx.Context) - ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataInfo) + ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataWorker) require.True(t, ok) defer ctx.SetValue(executor.LoadDataVarKey, nil) require.NotNil(t, ld) @@ -1983,7 +1983,7 @@ func TestIssue18681(t *testing.T) { selectSQL := "select bin(a), bin(b), bin(c), bin(d) from load_data_test;" ctx.GetSessionVars().StmtCtx.DupKeyAsWarning = true ctx.GetSessionVars().StmtCtx.BadNullAsWarning = true - ld.SetMaxRowsInBatch(20000) + ld.ResetBatch() sc := ctx.GetSessionVars().StmtCtx originIgnoreTruncate := sc.IgnoreTruncate @@ -2026,7 +2026,7 @@ func TestIssue34358(t *testing.T) { tk.MustExec("create table load_data_test (a varchar(10), b varchar(10))") tk.MustExec("load data local infile '/tmp/nonexistence.csv' into table load_data_test ( @v1, @v2 ) set a = @v1, b = @v2") - ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataInfo) + ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataWorker) require.True(t, ok) require.NotNil(t, ld) checkCases([]testCase{ @@ -2048,7 +2048,7 @@ func TestLoadData(t *testing.T) { require.Error(t, err) tk.MustExec("load data local infile '/tmp/nonexistence.csv' ignore into table load_data_test") ctx := tk.Session().(sessionctx.Context) - ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataInfo) + ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataWorker) require.True(t, ok) defer ctx.SetValue(executor.LoadDataVarKey, nil) require.NotNil(t, ld) @@ -2070,7 +2070,7 @@ func TestLoadData(t *testing.T) { require.NoError(t, err) err = ld.CheckAndInsertOneBatch(context.Background(), ld.GetRows(), ld.GetCurBatchCnt()) require.NoError(t, err) - ld.SetMaxRowsInBatch(20000) + ld.ResetBatch() r := tk.MustQuery(selectSQL) r.Check(nil) @@ -2229,7 +2229,7 @@ func TestLoadDataEscape(t *testing.T) { tk.MustExec("CREATE TABLE load_data_test (id INT NOT NULL PRIMARY KEY, value TEXT NOT NULL) CHARACTER SET utf8") tk.MustExec("load data local infile '/tmp/nonexistence.csv' into table load_data_test") ctx := tk.Session().(sessionctx.Context) - ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataInfo) + ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataWorker) require.True(t, ok) defer ctx.SetValue(executor.LoadDataVarKey, nil) require.NotNil(t, ld) @@ -2260,7 +2260,7 @@ func TestLoadDataSpecifiedColumns(t *testing.T) { tk.MustExec(`create table load_data_test (id int PRIMARY KEY AUTO_INCREMENT, c1 int, c2 varchar(255) default "def", c3 int default 0);`) tk.MustExec("load data local infile '/tmp/nonexistence.csv' into table load_data_test (c1, c2)") ctx := tk.Session().(sessionctx.Context) - ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataInfo) + ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataWorker) require.True(t, ok) defer ctx.SetValue(executor.LoadDataVarKey, nil) require.NotNil(t, ld) @@ -2286,7 +2286,7 @@ func TestLoadDataIgnoreLines(t *testing.T) { tk.MustExec("CREATE TABLE load_data_test (id INT NOT NULL PRIMARY KEY, value TEXT NOT NULL) CHARACTER SET utf8") tk.MustExec("load data local infile '/tmp/nonexistence.csv' into table 
load_data_test ignore 1 lines") ctx := tk.Session().(sessionctx.Context) - ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataInfo) + ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataWorker) require.True(t, ok) defer ctx.SetValue(executor.LoadDataVarKey, nil) require.NotNil(t, ld) @@ -2311,7 +2311,7 @@ func TestLoadDataNULL(t *testing.T) { tk.MustExec(`load data local infile '/tmp/nonexistence.csv' into table load_data_test FIELDS TERMINATED BY ',' ENCLOSED BY '"' LINES TERMINATED BY '\n';`) ctx := tk.Session().(sessionctx.Context) - ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataInfo) + ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataWorker) require.True(t, ok) defer ctx.SetValue(executor.LoadDataVarKey, nil) require.NotNil(t, ld) @@ -2338,7 +2338,7 @@ func TestLoadDataReplace(t *testing.T) { tk.MustExec("INSERT INTO load_data_replace VALUES(1,'val 1'),(2,'val 2')") tk.MustExec("LOAD DATA LOCAL INFILE '/tmp/nonexistence.csv' REPLACE INTO TABLE load_data_replace") ctx := tk.Session().(sessionctx.Context) - ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataInfo) + ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataWorker) require.True(t, ok) defer ctx.SetValue(executor.LoadDataVarKey, nil) require.NotNil(t, ld) @@ -2359,7 +2359,7 @@ func TestLoadDataOverflowBigintUnsigned(t *testing.T) { tk.MustExec("CREATE TABLE load_data_test (a bigint unsigned);") tk.MustExec("load data local infile '/tmp/nonexistence.csv' into table load_data_test") ctx := tk.Session().(sessionctx.Context) - ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataInfo) + ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataWorker) require.True(t, ok) defer ctx.SetValue(executor.LoadDataVarKey, nil) require.NotNil(t, ld) @@ -2380,7 +2380,7 @@ func TestLoadDataWithUppercaseUserVars(t *testing.T) { tk.MustExec("load data local infile '/tmp/nonexistence.csv' into table load_data_test (@V1)" + " set a = @V1, b = @V1*100") ctx := tk.Session().(sessionctx.Context) - ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataInfo) + ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataWorker) require.True(t, ok) defer ctx.SetValue(executor.LoadDataVarKey, nil) require.NotNil(t, ld) @@ -2402,7 +2402,7 @@ func TestLoadDataIntoPartitionedTable(t *testing.T) { "partition p2 values less than (11))") tk.MustExec("load data local infile '/tmp/nonexistence.csv' into table range_t fields terminated by ','") ctx := tk.Session().(sessionctx.Context) - ld := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataInfo) + ld := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataWorker) require.Nil(t, sessiontxn.NewTxn(context.Background(), ctx)) parser, err := mydump.NewCSVParser( @@ -2419,7 +2419,7 @@ func TestLoadDataIntoPartitionedTable(t *testing.T) { require.NoError(t, err) err = ld.CheckAndInsertOneBatch(context.Background(), ld.GetRows(), ld.GetCurBatchCnt()) require.NoError(t, err) - ld.SetMaxRowsInBatch(20000) + ld.ResetBatch() ld.SetMessage() ctx.StmtCommit(context.Background()) txn, err := ctx.Txn(true) diff --git a/server/conn.go b/server/conn.go index 9b7117390aebd..058921e1d37ee 100644 --- a/server/conn.go +++ b/server/conn.go @@ -1616,23 +1616,29 @@ func (cc *clientConn) writeReq(ctx context.Context, filePath string) error { // handleLoadData does the additional work after processing the 'load data' query. 
// It sends client a file path, then reads the file content from client, inserts data into database.
-func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor.LoadDataInfo) error {
+func (cc *clientConn) handleLoadData(ctx context.Context, loadDataWorker *executor.LoadDataWorker) error {
 	// If the server handles the load data request, the client has to set the ClientLocalFiles capability.
 	if cc.capability&mysql.ClientLocalFiles == 0 {
 		return errNotAllowedCommand
 	}
-	if loadDataInfo == nil {
+	if loadDataWorker == nil {
 		return errors.New("load data info is empty")
 	}
-	err := cc.writeReq(ctx, loadDataInfo.Path)
+	err := cc.writeReq(ctx, loadDataWorker.Path)
 	if err != nil {
 		return err
 	}
-	// use Pipe to convert cc.readPacket to io.Reader
-	r, w := io.Pipe()
+	var (
+		// use Pipe to convert cc.readPacket to io.Reader
+		r, w    = io.Pipe()
+		drained bool
+		wg      sync.WaitGroup
+	)
+	wg.Add(1)
 	go func() {
+		defer wg.Done()
 		//nolint: errcheck
 		defer w.Close()

@@ -1649,7 +1655,7 @@
 		}
 		// https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_query_response_local_infile_request.html
 		if len(data) == 0 {
-			loadDataInfo.Drained = true
+			drained = true
 			return
 		}
 	}
@@ -1663,16 +1669,19 @@
 		}
 	}()

-	err = loadDataInfo.Load(ctx, executor.NewSimpleSeekerOnReadCloser(r))
+	err = loadDataWorker.Load(ctx, executor.NewSimpleSeekerOnReadCloser(r))
+	_ = r.Close()
+	wg.Wait()
+
 	if err != nil {
-		if !loadDataInfo.Drained {
+		if !drained {
 			logutil.Logger(ctx).Info("not drained yet, try reading left data from client connection")
 		}
 		// drain the data from client conn util empty packet received, otherwise the connection will be reset
 		// https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_query_response_local_infile_request.html
-		for !loadDataInfo.Drained {
+		for !drained {
 			// check kill flag again, let the draining loop could quit if empty packet could not be received
-			if atomic.CompareAndSwapUint32(&loadDataInfo.Ctx.GetSessionVars().Killed, 1, 0) {
+			if atomic.CompareAndSwapUint32(&loadDataWorker.Ctx.GetSessionVars().Killed, 1, 0) {
 				logutil.Logger(ctx).Warn("receiving kill, stop draining data, connection may be reset")
 				return executor.ErrQueryInterrupted
 			}
@@ -1682,13 +1691,13 @@
 				break
 			}
 			if len(curData) == 0 {
-				loadDataInfo.Drained = true
+				drained = true
 				logutil.Logger(ctx).Info("draining finished for error", zap.Error(err))
 				break
 			}
 		}
 	}
-	loadDataInfo.SetMessage()
+	loadDataWorker.SetMessage()
 	return err
 }

@@ -2083,7 +2092,7 @@ func (cc *clientConn) handleFileTransInConn(ctx context.Context, status uint16)
 		handled = true
 		defer cc.ctx.SetValue(executor.LoadDataVarKey, nil)
 		//nolint:forcetypeassert
-		if err := cc.handleLoadData(ctx, loadDataInfo.(*executor.LoadDataInfo)); err != nil {
+		if err := cc.handleLoadData(ctx, loadDataInfo.(*executor.LoadDataWorker)); err != nil {
 			return handled, err
 		}
 	}

From 0adb0e6b340ebca13b337447c9475a75a2294d71 Mon Sep 17 00:00:00 2001
From: lance6716
Date: Wed, 22 Feb 2023 15:58:52 +0800
Subject: [PATCH 02/19] finish refine code

Signed-off-by: lance6716
---
 executor/builder.go                 |   3 +-
 executor/load_data.go               | 433 ++++++++++++----------------
 executor/loadremotetest/BUILD.bazel |   4 +
 server/conn.go                      |   1 -
 4 files changed, 190 insertions(+), 251 deletions(-)

diff --git a/executor/builder.go 
b/executor/builder.go index cf5d5b346f8ba..339acc8361c75 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -938,8 +938,7 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor { b.err = errors.Errorf("Can not get table %d", v.Table.TableInfo.ID) return nil } - var defaultLoadDataBatchCnt uint64 = 20000 // TODO this will be changed to variable in another pr - worker, err := NewLoadDataWorker(b.ctx, v, tbl, defaultLoadDataBatchCnt) + worker, err := NewLoadDataWorker(b.ctx, v, tbl) if err != nil { b.err = err return nil diff --git a/executor/load_data.go b/executor/load_data.go index c6c9a3eb2e96f..bdad9b81f1cf5 100644 --- a/executor/load_data.go +++ b/executor/load_data.go @@ -44,6 +44,7 @@ import ( "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) const ( @@ -77,12 +78,12 @@ func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) error { if e.loadDataWorker.Path == "" { return errors.New("Load Data: infile path is empty") } - if !e.loadDataWorker.Table.Meta().IsBaseTable() { + if !e.loadDataWorker.table.Meta().IsBaseTable() { return errors.New("can only load data into base tables") } // CSV-like - if e.loadDataWorker.Format == "" { + if e.loadDataWorker.format == "" { if e.loadDataWorker.NullInfo != nil && e.loadDataWorker.NullInfo.OptEnclosed && (e.loadDataWorker.FieldsInfo == nil || e.loadDataWorker.FieldsInfo.Enclosed == nil) { return errors.New("must specify FIELDS [OPTIONALLY] ENCLOSED BY when use NULL DEFINED BY OPTIONALLY ENCLOSED") @@ -181,27 +182,25 @@ type loadRemoteInfo struct { type LoadDataWorker struct { *InsertValues - row []types.Datum - Path string - Format string - Table table.Table + Path string + Ctx sessionctx.Context + // expose some fields for test FieldsInfo *ast.FieldsClause LinesInfo *ast.LinesClause NullInfo *ast.NullDefinedBy IgnoreLines uint64 - Ctx sessionctx.Context - rows [][]types.Datum - ColumnAssignments []*ast.Assignment - ColumnsAndUserVars []*ast.ColumnNameOrUserVar - FieldMappings []*FieldMapping + format string + columnAssignments []*ast.Assignment + columnsAndUserVars []*ast.ColumnNameOrUserVar + fieldMappings []*FieldMapping + onDuplicate ast.OnDuplicateKeyHandlingType + table table.Table + row []types.Datum + rows [][]types.Datum commitTaskQueue chan commitTask - StopCh chan struct{} - QuitCh chan struct{} - OnDuplicate ast.OnDuplicateKeyHandlingType - - loadRemoteInfo loadRemoteInfo + loadRemoteInfo loadRemoteInfo } // NewLoadDataWorker creates a new LoadDataWorker that is ready to work. 
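The next hunk drops the batchSize parameter from NewLoadDataWorker: the batch
size already comes from the session's DMLBatchSize variable (assigned to
InsertValues.maxRowsInBatch in patch 01), so the extra argument was apparently
unused in the body shown above.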
@@ -209,7 +208,6 @@ func NewLoadDataWorker( sctx sessionctx.Context, plan *plannercore.LoadData, tbl table.Table, - batchSize uint64, ) (*LoadDataWorker, error) { insertVal := &InsertValues{ baseExecutor: newBaseExecutor(sctx, nil, plan.ID()), @@ -222,17 +220,18 @@ func NewLoadDataWorker( } loadDataWorker := &LoadDataWorker{ row: make([]types.Datum, 0, len(insertVal.insertColumns)), + commitTaskQueue: make(chan commitTask, taskQueueSize), InsertValues: insertVal, Path: plan.Path, - Format: plan.Format, - Table: tbl, + format: plan.Format, + table: tbl, FieldsInfo: plan.FieldsInfo, LinesInfo: plan.LinesInfo, NullInfo: plan.NullInfo, IgnoreLines: plan.IgnoreLines, - ColumnAssignments: plan.ColumnAssignments, - ColumnsAndUserVars: plan.ColumnsAndUserVars, - OnDuplicate: plan.OnDuplicate, + columnAssignments: plan.ColumnAssignments, + columnsAndUserVars: plan.ColumnsAndUserVars, + onDuplicate: plan.OnDuplicate, Ctx: sctx, } columnNames := loadDataWorker.initFieldMappings() @@ -240,7 +239,6 @@ func NewLoadDataWorker( if err != nil { return nil, err } - loadDataWorker.initQueues() loadDataWorker.ResetBatch() return loadDataWorker, nil } @@ -255,15 +253,15 @@ type FieldMapping struct { // the slice's order is the same as the order of the input fields. // Returns a slice of same ordered column names without user defined variable names. func (e *LoadDataWorker) initFieldMappings() []string { - columns := make([]string, 0, len(e.ColumnsAndUserVars)+len(e.ColumnAssignments)) - tableCols := e.Table.Cols() + columns := make([]string, 0, len(e.columnsAndUserVars)+len(e.columnAssignments)) + tableCols := e.table.Cols() - if len(e.ColumnsAndUserVars) == 0 { + if len(e.columnsAndUserVars) == 0 { for _, v := range tableCols { fieldMapping := &FieldMapping{ Column: v, } - e.FieldMappings = append(e.FieldMappings, fieldMapping) + e.fieldMappings = append(e.fieldMappings, fieldMapping) columns = append(columns, v.Name.O) } @@ -272,7 +270,7 @@ func (e *LoadDataWorker) initFieldMappings() []string { var column *table.Column - for _, v := range e.ColumnsAndUserVars { + for _, v := range e.columnsAndUserVars { if v.ColumnName != nil { column = table.FindCol(tableCols, v.ColumnName.Name.O) columns = append(columns, v.ColumnName.Name.O) @@ -284,7 +282,7 @@ func (e *LoadDataWorker) initFieldMappings() []string { Column: column, UserVar: v.UserVar, } - e.FieldMappings = append(e.FieldMappings, fieldMapping) + e.fieldMappings = append(e.fieldMappings, fieldMapping) } return columns @@ -295,15 +293,15 @@ func (e *LoadDataWorker) initLoadColumns(columnNames []string) error { var cols []*table.Column var missingColName string var err error - tableCols := e.Table.Cols() + tableCols := e.table.Cols() if len(columnNames) != len(tableCols) { - for _, v := range e.ColumnAssignments { + for _, v := range e.columnAssignments { columnNames = append(columnNames, v.Column.Name.O) } } - cols, missingColName = table.FindCols(tableCols, columnNames, e.Table.Meta().PKIsHandle) + cols, missingColName = table.FindCols(tableCols, columnNames, e.table.Meta().PKIsHandle) if missingColName != "" { return dbterror.ErrBadField.GenWithStackByArgs(missingColName, "field list") } @@ -369,53 +367,17 @@ func (e *LoadDataWorker) reorderColumns(columnNames []string) error { // Load reads from readerFn and do load data job. 
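// The rewritten Load below follows the standard errgroup producer/consumer
// shape; a simplified sketch of the code that follows (same names):
//
//	group, groupCtx := errgroup.WithContext(ctx)
//	group.Go(func() error { return e.processStream(groupCtx, parser) }) // parse, enqueue, close queue at EOF
//	group.Go(func() error { return e.commitWork(groupCtx) })            // drain queue until closed
//	err = group.Wait() // first error cancels groupCtx, stopping both goroutines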
func (e *LoadDataWorker) Load(ctx context.Context, reader io.ReadSeekCloser) error { - e.startStopWatcher() - // let stop watcher goroutine quit - defer e.forceQuit() - err := sessiontxn.NewTxn(ctx, e.Ctx) - if err != nil { - return err - } - // processStream process input data, enqueue commit task - wg := new(sync.WaitGroup) - wg.Add(1) - go processStream(ctx, reader, e, wg) - err = e.commitWork(ctx) - wg.Wait() - return err -} - -// processStream process input stream from network -func processStream(ctx context.Context, reader io.ReadSeekCloser, loadDataInfo *LoadDataWorker, wg *sync.WaitGroup) { var ( parser mydump.Parser err error ) - defer func() { - r := recover() - if r != nil { - logutil.Logger(ctx).Error("process routine panicked", - zap.Reflect("r", r), - zap.Stack("stack")) - } - if err != nil { - logutil.Logger(ctx).Error("process routine meet error", - zap.Error(err)) - } - if err != nil || r != nil { - loadDataInfo.forceQuit() - } else { - loadDataInfo.closeTaskQueue() - } - wg.Done() - }() - switch strings.ToLower(loadDataInfo.Format) { + switch strings.ToLower(e.format) { case "": // CSV-like parser, err = mydump.NewCSVParser( ctx, - loadDataInfo.GenerateCSVConfig(), + e.GenerateCSVConfig(), reader, int64(config.ReadBlockSize), nil, @@ -425,104 +387,130 @@ func processStream(ctx context.Context, reader io.ReadSeekCloser, loadDataInfo * case LoadDataFormatSQLDump: parser = mydump.NewChunkParser( ctx, - loadDataInfo.Ctx.GetSessionVars().SQLMode, + e.Ctx.GetSessionVars().SQLMode, reader, int64(config.ReadBlockSize), nil, ) case LoadDataFormatParquet: - if loadDataInfo.loadRemoteInfo.store == nil { - err = errors.New("parquet format requires remote storage") - return + if e.loadRemoteInfo.store == nil { + return errors.New("parquet format requires remote storage") } parser, err = mydump.NewParquetParser( ctx, - loadDataInfo.loadRemoteInfo.store, + e.loadRemoteInfo.store, reader, - loadDataInfo.loadRemoteInfo.path, + e.loadRemoteInfo.path, ) default: - err = errors.Errorf("unsupported format: %s", loadDataInfo.Format) + err = errors.Errorf("unsupported format: %s", e.format) } if err != nil { - return + return err } - parser.SetLogger(log.Logger{Logger: logutil.Logger(ctx)}) + err = sessiontxn.NewTxn(ctx, e.Ctx) + if err != nil { + return err + } + group, groupCtx := errgroup.WithContext(ctx) + + // processStream process input data, enqueue commit task + group.Go(func() error { + return e.processStream(groupCtx, parser) + }) + group.Go(func() error { + return e.commitWork(groupCtx) + }) + + err = group.Wait() + e.SetMessage() + return err +} + +// processStream process input stream from network +func (e *LoadDataWorker) processStream( + ctx context.Context, + parser mydump.Parser, +) (err error) { + defer func() { + r := recover() + if r != nil { + logutil.Logger(ctx).Error("process routine panicked", + zap.Reflect("r", r), + zap.Stack("stack")) + err = errors.Errorf("%v", r) + } + }() + + checkKilled := time.NewTicker(30 * time.Second) + defer checkKilled.Stop() + for { // prepare batch and enqueue task - err = loadDataInfo.ReadRows(ctx, parser) - if err != nil { - logutil.Logger(ctx).Error("load data process stream error in ReadRows", zap.Error(err)) + if err = e.ReadRows(ctx, parser); err != nil { return } - if loadDataInfo.curBatchCnt == 0 { + if e.curBatchCnt == 0 { + close(e.commitTaskQueue) return } - if err = loadDataInfo.enqOneTask(ctx); err != nil { - logutil.Logger(ctx).Error("load data process stream error in enqOneTask", zap.Error(err)) - return + + select { + case 
<-ctx.Done(): + return ctx.Err() + case <-checkKilled.C: + if atomic.CompareAndSwapUint32(&e.Ctx.GetSessionVars().Killed, 1, 0) { + logutil.Logger(ctx).Info("load data query interrupted quit data processing") + return ErrQueryInterrupted + } + case e.commitTaskQueue <- commitTask{e.curBatchCnt, e.rows}: } + // reset rows buffer, will reallocate buffer but NOT reuse + e.ResetBatch() } } -// GetRows getter for rows -func (e *LoadDataWorker) GetRows() [][]types.Datum { - return e.rows -} - -// GetCurBatchCnt getter for curBatchCnt -func (e *LoadDataWorker) GetCurBatchCnt() uint64 { - return e.curBatchCnt -} - -// closeTaskQueue preparing routine to inform commit routine no more data -func (e *LoadDataWorker) closeTaskQueue() { - close(e.commitTaskQueue) -} - -// initQueues initialize task queue and error report queue -// TODO: this is needed? -func (e *LoadDataWorker) initQueues() { - e.commitTaskQueue = make(chan commitTask, taskQueueSize) - e.StopCh = make(chan struct{}, 2) - e.QuitCh = make(chan struct{}) +// ResetBatch reset the inner batch. +func (e *LoadDataWorker) ResetBatch() { + e.rows = make([][]types.Datum, 0, e.maxRowsInBatch) + e.curBatchCnt = 0 } -// startStopWatcher monitor StopCh to force quit -func (e *LoadDataWorker) startStopWatcher() { - go func() { - <-e.StopCh - close(e.QuitCh) +// commitWork commit batch sequentially +func (e *LoadDataWorker) commitWork(ctx context.Context) (err error) { + defer func() { + r := recover() + if r != nil { + logutil.Logger(ctx).Error("commitWork panicked", + zap.Reflect("r", r), + zap.Stack("stack")) + err = errors.Errorf("%v", r) + } }() -} - -// forceQuit let commit quit directly -func (e *LoadDataWorker) forceQuit() { - e.StopCh <- struct{}{} -} - -// makeCommitTask produce commit task with data in LoadDataWorker.rows LoadDataWorker.curBatchCnt -func (e *LoadDataWorker) makeCommitTask() commitTask { - return commitTask{e.curBatchCnt, e.rows} -} -// enqOneTask feed one batch commit task to commit work -func (e *LoadDataWorker) enqOneTask(ctx context.Context) error { - var err error - if e.curBatchCnt > 0 { + var tasks uint64 + for { select { - case e.commitTaskQueue <- e.makeCommitTask(): - case <-e.QuitCh: - err = errors.New("enqOneTask forced to quit") - logutil.Logger(ctx).Error("enqOneTask forced to quit, possible commitWork error") - return err + case <-ctx.Done(): + return ctx.Err() + case task, ok := <-e.commitTaskQueue: + if !ok { + return nil + } + start := time.Now() + if err = e.commitOneTask(ctx, task); err != nil { + return err + } + tasks++ + logutil.Logger(ctx).Info("commit one task success", + zap.Duration("commit time usage", time.Since(start)), + zap.Uint64("keys processed", task.cnt), + zap.Uint64("tasks processed", tasks), + zap.Int("tasks in queue", len(e.commitTaskQueue))) } - // reset rows buffer, will reallocate buffer but NOT reuse - e.ResetBatch() } - return err } // commitOneTask insert Data from LoadDataWorker.rows, then make commit and refresh txn @@ -553,65 +541,44 @@ func (e *LoadDataWorker) commitOneTask(ctx context.Context, task commitTask) err return err } -// commitWork commit batch sequentially -func (e *LoadDataWorker) commitWork(ctx context.Context) error { +// CheckAndInsertOneBatch is used to commit one transaction batch full filled data +func (e *LoadDataWorker) CheckAndInsertOneBatch(ctx context.Context, rows [][]types.Datum, cnt uint64) error { + if e.stats != nil && e.stats.BasicRuntimeStats != nil { + // Since this method will not call by executor Next, + // so we need record the basic 
executor runtime stats by ourself. + start := time.Now() + defer func() { + e.stats.BasicRuntimeStats.Record(time.Since(start), 0) + }() + } var err error - defer func() { - r := recover() - if r != nil { - logutil.Logger(ctx).Error("commitWork panicked", - zap.Reflect("r", r), - zap.Stack("stack")) - } - if err != nil || r != nil { - e.forceQuit() - } - if err != nil { - e.ctx.StmtRollback(ctx, false) - } - }() - var tasks uint64 - var end = false - for !end { - select { - case <-e.QuitCh: - err = errors.New("commit forced to quit") - logutil.Logger(ctx).Error("commit forced to quit, possible preparation failed") - return err - case commitTask, ok := <-e.commitTaskQueue: - if ok { - start := time.Now() - err = e.commitOneTask(ctx, commitTask) - if err != nil { - break - } - tasks++ - logutil.Logger(ctx).Info("commit one task success", - zap.Duration("commit time usage", time.Since(start)), - zap.Uint64("keys processed", commitTask.cnt), - zap.Uint64("tasks processed", tasks), - zap.Int("tasks in queue", len(e.commitTaskQueue))) - } else { - end = true - } - } - if err != nil { - logutil.Logger(ctx).Error("load data commit work error", zap.Error(err)) - break - } - if atomic.CompareAndSwapUint32(&e.Ctx.GetSessionVars().Killed, 1, 0) { - logutil.Logger(ctx).Info("load data query interrupted quit data processing") - err = ErrQueryInterrupted - break - } + if cnt == 0 { + return err + } + e.ctx.GetSessionVars().StmtCtx.AddRecordRows(cnt) + + replace := false + if e.onDuplicate == ast.OnDuplicateKeyHandlingReplace { + replace = true + } + + err = e.batchCheckAndInsert(ctx, rows[0:cnt], e.addRecordLD, replace) + if err != nil { + return err } return err } -// ResetBatch reset the inner batch. -func (e *LoadDataWorker) ResetBatch() { - e.rows = make([][]types.Datum, 0, e.maxRowsInBatch) - e.curBatchCnt = 0 +func (e *LoadDataWorker) addRecordLD(ctx context.Context, row []types.Datum) error { + if row == nil { + return nil + } + err := e.addRecord(ctx, row) + if err != nil { + e.handleWarning(err) + return err + } + return nil } // ReadRows reads rows from parser. When parser's reader meet EOF, it will return @@ -659,46 +626,6 @@ func (e *LoadDataWorker) ReadRows(ctx context.Context, parser mydump.Parser) err } } -// CheckAndInsertOneBatch is used to commit one transaction batch full filled data -func (e *LoadDataWorker) CheckAndInsertOneBatch(ctx context.Context, rows [][]types.Datum, cnt uint64) error { - if e.stats != nil && e.stats.BasicRuntimeStats != nil { - // Since this method will not call by executor Next, - // so we need record the basic executor runtime stats by ourself. - start := time.Now() - defer func() { - e.stats.BasicRuntimeStats.Record(time.Since(start), 0) - }() - } - var err error - if cnt == 0 { - return err - } - e.ctx.GetSessionVars().StmtCtx.AddRecordRows(cnt) - - replace := false - if e.OnDuplicate == ast.OnDuplicateKeyHandlingReplace { - replace = true - } - - err = e.batchCheckAndInsert(ctx, rows[0:cnt], e.addRecordLD, replace) - if err != nil { - return err - } - return err -} - -// SetMessage sets info message(ERR_LOAD_INFO) generated by LOAD statement, it is public because of the special way that -// LOAD statement is handled. 
-func (e *LoadDataWorker) SetMessage() { - stmtCtx := e.ctx.GetSessionVars().StmtCtx - numRecords := stmtCtx.RecordRows() - numDeletes := stmtCtx.DeletedRows() - numSkipped := numRecords - stmtCtx.CopiedRows() - numWarnings := stmtCtx.WarningCount() - msg := fmt.Sprintf(mysql.MySQLErrName[mysql.ErrLoadInfo].Raw, numRecords, numDeletes, numSkipped, numWarnings) - e.ctx.GetSessionVars().StmtCtx.SetMessage(msg) -} - // colsToRow encodes the data of parser output. func (e *LoadDataWorker) colsToRow(ctx context.Context, cols []types.Datum) []types.Datum { row := make([]types.Datum, 0, len(e.insertColumns)) @@ -714,16 +641,16 @@ func (e *LoadDataWorker) colsToRow(ctx context.Context, cols []types.Datum) []ty } } - for i := 0; i < len(e.FieldMappings); i++ { + for i := 0; i < len(e.fieldMappings); i++ { if i >= len(cols) { - if e.FieldMappings[i].Column == nil { - setVar(e.FieldMappings[i].UserVar.Name, nil) + if e.fieldMappings[i].Column == nil { + setVar(e.fieldMappings[i].UserVar.Name, nil) continue } // If some columns is missing and their type is time and has not null flag, they should be set as current time. - if types.IsTypeTime(e.FieldMappings[i].Column.GetType()) && mysql.HasNotNullFlag(e.FieldMappings[i].Column.GetFlag()) { - row = append(row, types.NewTimeDatum(types.CurrentTime(e.FieldMappings[i].Column.GetType()))) + if types.IsTypeTime(e.fieldMappings[i].Column.GetType()) && mysql.HasNotNullFlag(e.fieldMappings[i].Column.GetFlag()) { + row = append(row, types.NewTimeDatum(types.CurrentTime(e.fieldMappings[i].Column.GetType()))) continue } @@ -731,8 +658,8 @@ func (e *LoadDataWorker) colsToRow(ctx context.Context, cols []types.Datum) []ty continue } - if e.FieldMappings[i].Column == nil { - setVar(e.FieldMappings[i].UserVar.Name, &cols[i]) + if e.fieldMappings[i].Column == nil { + setVar(e.fieldMappings[i].UserVar.Name, &cols[i]) continue } @@ -743,9 +670,9 @@ func (e *LoadDataWorker) colsToRow(ctx context.Context, cols []types.Datum) []ty row = append(row, cols[i]) } - for i := 0; i < len(e.ColumnAssignments); i++ { + for i := 0; i < len(e.columnAssignments); i++ { // eval expression of `SET` clause - d, err := expression.EvalAstExpr(e.Ctx, e.ColumnAssignments[i].Expr) + d, err := expression.EvalAstExpr(e.Ctx, e.columnAssignments[i].Expr) if err != nil { e.handleWarning(err) return nil @@ -763,16 +690,16 @@ func (e *LoadDataWorker) colsToRow(ctx context.Context, cols []types.Datum) []ty return newRow } -func (e *LoadDataWorker) addRecordLD(ctx context.Context, row []types.Datum) error { - if row == nil { - return nil - } - err := e.addRecord(ctx, row) - if err != nil { - e.handleWarning(err) - return err - } - return nil +// SetMessage sets info message(ERR_LOAD_INFO) generated by LOAD statement, it is public because of the special way that +// LOAD statement is handled. +func (e *LoadDataWorker) SetMessage() { + stmtCtx := e.ctx.GetSessionVars().StmtCtx + numRecords := stmtCtx.RecordRows() + numDeletes := stmtCtx.DeletedRows() + numSkipped := numRecords - stmtCtx.CopiedRows() + numWarnings := stmtCtx.WarningCount() + msg := fmt.Sprintf(mysql.MySQLErrName[mysql.ErrLoadInfo].Raw, numRecords, numDeletes, numSkipped, numWarnings) + e.ctx.GetSessionVars().StmtCtx.SetMessage(msg) } // GenerateCSVConfig generates a CSV config for parser from LoadDataWorker. 
@@ -818,6 +745,16 @@ func (e *LoadDataWorker) GenerateCSVConfig() *config.CSVConfig { } } +// GetRows getter for rows +func (e *LoadDataWorker) GetRows() [][]types.Datum { + return e.rows +} + +// GetCurBatchCnt getter for curBatchCnt +func (e *LoadDataWorker) GetCurBatchCnt() uint64 { + return e.curBatchCnt +} + var _ io.ReadSeekCloser = (*SimpleSeekerOnReadCloser)(nil) // SimpleSeekerOnReadCloser provides Seek(0, SeekCurrent) on ReadCloser. diff --git a/executor/loadremotetest/BUILD.bazel b/executor/loadremotetest/BUILD.bazel index 57ddea3ad19ea..aa5279fac652c 100644 --- a/executor/loadremotetest/BUILD.bazel +++ b/executor/loadremotetest/BUILD.bazel @@ -4,6 +4,7 @@ go_test( name = "loadremotetest_test", timeout = "short", srcs = [ + "error_test.go", "main_test.go", "one_csv_test.go", "one_parquet_test.go", @@ -15,8 +16,11 @@ go_test( deps = [ "//executor", "//kv", + "//parser/terror", "//testkit", "@com_github_fsouza_fake_gcs_server//fakestorage", + "@com_github_pingcap_errors//:errors", + "@com_github_stretchr_testify//require", "@com_github_stretchr_testify//suite", "@org_uber_go_goleak//:goleak", ], diff --git a/server/conn.go b/server/conn.go index 058921e1d37ee..9188ebe4a70e6 100644 --- a/server/conn.go +++ b/server/conn.go @@ -1697,7 +1697,6 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataWorker *execut } } } - loadDataWorker.SetMessage() return err } From 903fd6087f4474217f0cac69d2c87cf2a21639dc Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 22 Feb 2023 18:13:08 +0800 Subject: [PATCH 03/19] add error message Signed-off-by: lance6716 --- br/pkg/storage/s3.go | 4 +-- errno/errcode.go | 1 + errno/errname.go | 1 + executor/errors.go | 2 ++ executor/load_data.go | 43 +++++++++++++++++++-------- executor/loadremotetest/error_test.go | 9 +++++- server/conn.go | 2 +- 7 files changed, 46 insertions(+), 16 deletions(-) diff --git a/br/pkg/storage/s3.go b/br/pkg/storage/s3.go index aa052ebccc9a5..0ab259cc3a57f 100644 --- a/br/pkg/storage/s3.go +++ b/br/pkg/storage/s3.go @@ -157,10 +157,10 @@ func (options *S3BackendOptions) Apply(s3 *backuppb.S3) error { return errors.Trace(err) } if u.Scheme == "" { - return errors.Annotate(berrors.ErrStorageInvalidConfig, "scheme not found in endpoint") + return errors.Errorf("scheme not found in endpoint") } if u.Host == "" { - return errors.Annotate(berrors.ErrStorageInvalidConfig, "host not found in endpoint") + return errors.Errorf("host not found in endpoint") } } // In some cases, we need to set ForcePathStyle to false. 
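Patch 03 threads the new client-visible error through TiDB's usual three
registration points, shown in the next three files. Condensed, with lines
taken from the hunks below:

	// errno/errcode.go: reserve a numeric error code
	ErrLoadDataGeneral = 8154
	// errno/errname.go: attach the client-facing message template
	ErrLoadDataGeneral: mysql.Message("Load data raise error(s): %s", nil),
	// executor/errors.go: wrap it as a terror under the executor class
	ErrLoadDataGeneral = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataGeneral)
	// call sites in load_data.go then report errors like:
	return ErrLoadDataGeneral.GenWithStackByArgs("INFILE path is empty")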
diff --git a/errno/errcode.go b/errno/errcode.go index 0501bc0020e07..3cfcc9abfbf11 100644 --- a/errno/errcode.go +++ b/errno/errcode.go @@ -1045,6 +1045,7 @@ const ( ErrTempTableNotAllowedWithTTL = 8151 ErrUnsupportedTTLReferencedByFK = 8152 ErrUnsupportedPrimaryKeyTypeWithTTL = 8153 + ErrLoadDataGeneral = 8154 // Error codes used by TiDB ddl package ErrUnsupportedDDLOperation = 8200 diff --git a/errno/errname.go b/errno/errname.go index 1d5a5242ca227..82687be40867e 100644 --- a/errno/errname.go +++ b/errno/errname.go @@ -1040,6 +1040,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{ ErrTempTableNotAllowedWithTTL: mysql.Message("Set TTL for temporary table is not allowed", nil), ErrUnsupportedTTLReferencedByFK: mysql.Message("Set TTL for a table referenced by foreign key is not allowed", nil), ErrUnsupportedPrimaryKeyTypeWithTTL: mysql.Message("Unsupported clustered primary key type FLOAT/DOUBLE for TTL", nil), + ErrLoadDataGeneral: mysql.Message("Load data raise error(s): %s", nil), ErrWarnOptimizerHintInvalidInteger: mysql.Message("integer value is out of range in '%s'", nil), ErrWarnOptimizerHintUnsupportedHint: mysql.Message("Optimizer hint %s is not supported by TiDB and is ignored", nil), diff --git a/executor/errors.go b/executor/errors.go index 565a712d1c7d9..4d174e1cce56e 100644 --- a/executor/errors.go +++ b/executor/errors.go @@ -76,4 +76,6 @@ var ( errUnsupportedFlashbackTmpTable = dbterror.ClassDDL.NewStdErr(mysql.ErrUnsupportedDDLOperation, parser_mysql.Message("Recover/flashback table is not supported on temporary tables", nil)) errTruncateWrongInsertValue = dbterror.ClassTable.NewStdErr(mysql.ErrTruncatedWrongValue, parser_mysql.Message("Incorrect %-.32s value: '%-.128s' for column '%.192s' at row %d", nil)) ErrExistsInHistoryPassword = dbterror.ClassExecutor.NewStd(mysql.ErrExistsInHistoryPassword) + + ErrLoadDataGeneral = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataGeneral) ) diff --git a/executor/load_data.go b/executor/load_data.go index bdad9b81f1cf5..43f0abfa8f701 100644 --- a/executor/load_data.go +++ b/executor/load_data.go @@ -76,21 +76,21 @@ func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) error { req.GrowAndReset(e.maxChunkSize) if e.loadDataWorker.Path == "" { - return errors.New("Load Data: infile path is empty") + return ErrLoadDataGeneral.GenWithStackByArgs("INFILE path is empty") } if !e.loadDataWorker.table.Meta().IsBaseTable() { - return errors.New("can only load data into base tables") + return ErrLoadDataGeneral.GenWithStackByArgs("can only load data into base tables") } // CSV-like if e.loadDataWorker.format == "" { if e.loadDataWorker.NullInfo != nil && e.loadDataWorker.NullInfo.OptEnclosed && (e.loadDataWorker.FieldsInfo == nil || e.loadDataWorker.FieldsInfo.Enclosed == nil) { - return errors.New("must specify FIELDS [OPTIONALLY] ENCLOSED BY when use NULL DEFINED BY OPTIONALLY ENCLOSED") + return ErrLoadDataGeneral.GenWithStackByArgs("must specify FIELDS [OPTIONALLY] ENCLOSED BY when use NULL DEFINED BY OPTIONALLY ENCLOSED") } // TODO: support lines terminated is "". 
if len(e.loadDataWorker.LinesInfo.Terminated) == 0 {
-			return errors.New("Load Data: don't support load data terminated is nil")
+			return ErrLoadDataGeneral.GenWithStackByArgs("don't support load data terminated is nil")
 		}
 	}
 
@@ -98,16 +98,16 @@ func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) error {
 	case ast.FileLocServerOrRemote:
 		u, err := storage.ParseRawURL(e.loadDataWorker.Path)
 		if err != nil {
-			return err
+			return ErrLoadDataGeneral.GenWithStackByArgs(err.Error())
 		}
 		var filename string
 		u.Path, filename = filepath.Split(u.Path)
 		b, err := storage.ParseBackendFromURL(u, nil)
 		if err != nil {
-			return err
+			return ErrLoadDataGeneral.GenWithStackByArgs(getMsgFromBRError(err))
 		}
 		if b.GetLocal() != nil {
-			return errors.Errorf("Load Data: don't support load data from tidb-server's disk")
+			return ErrLoadDataGeneral.GenWithStackByArgs("don't support load data from tidb-server's disk")
 		}
 		return e.loadFromRemote(ctx, b, filename)
 	case ast.FileLocClient:
@@ -116,13 +116,29 @@ func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) error {
 		val := sctx.Value(LoadDataVarKey)
 		if val != nil {
 			sctx.SetValue(LoadDataVarKey, nil)
-			return ErrLoadDataGeneral.GenWithStackByArgs("previous load data option wasn't closed normally")
+			return ErrLoadDataGeneral.GenWithStackByArgs("previous load data option wasn't closed normally")
 		}
 		sctx.SetValue(LoadDataVarKey, e.loadDataWorker)
 	}
 	return nil
 }
 
+// TODO: add GetMsg() to errors package to replace this function.
+func getMsgFromBRError(err error) string {
+	if err == nil {
+		return ""
+	}
+	if berr, ok := err.(*errors.Error); ok {
+		return berr.GetMsg()
+	}
+	raw := err.Error()
+	berrMsg := errors.Cause(err).Error()
+	if len(raw) <= len(berrMsg)+len(": ") {
+		return raw
+	}
+	return raw[:len(raw)-len(berrMsg)-len(": ")]
+}
+
 func (e *LoadDataExec) loadFromRemote(
 	ctx context.Context,
 	b *backup.StorageBackend,
@@ -134,11 +150,11 @@ func (e *LoadDataExec) loadFromRemote(
 	}
 	s, err := storage.New(ctx, b, opt)
 	if err != nil {
-		return err
+		return ErrLoadDataGeneral.GenWithStackByArgs(getMsgFromBRError(err))
 	}
 	fileReader, err := s.Open(ctx, filename)
 	if err != nil {
-		return err
+		return ErrLoadDataGeneral.GenWithStackByArgs(getMsgFromBRError(err))
 	}
 	defer fileReader.Close()
 
@@ -406,7 +422,7 @@ func (e *LoadDataWorker) Load(ctx context.Context, reader io.ReadSeekCloser) err
 		err = errors.Errorf("unsupported format: %s", e.format)
 	}
 	if err != nil {
-		return err
+		return ErrLoadDataGeneral.GenWithStackByArgs(err.Error())
 	}
 
 	parser.SetLogger(log.Logger{Logger: logutil.Logger(ctx)})
@@ -426,7 +442,10 @@ func (e *LoadDataWorker) Load(ctx context.Context, reader io.ReadSeekCloser) err
 
 	err = group.Wait()
 	e.SetMessage()
-	return err
+	if err != nil {
+		return ErrLoadDataGeneral.GenWithStackByArgs(err.Error())
+	}
+	return nil
 }
 
 // processStream processes the input stream from the network
diff --git a/executor/loadremotetest/error_test.go b/executor/loadremotetest/error_test.go
index 5fd3dc64f479f..c1be05241dc79 100644
--- a/executor/loadremotetest/error_test.go
+++ b/executor/loadremotetest/error_test.go
@@ -15,6 +15,7 @@
 package loadremotetest
 
 import (
+	"fmt"
 	"testing"
 
 	"github.com/pingcap/errors"
@@ -49,5 +50,11 @@ func (s *mockGCSSuite) TestErrorMessage() {
 	checkClientErrorMessage(s.T(), err, "ERROR 1110 (42000): Column 'i' specified twice")
 	err = s.tk.ExecToErr("LOAD DATA INFILE 'gs://1' INTO TABLE t (@v) SET wrong=@v")
 	checkClientErrorMessage(s.T(), err, "ERROR 1054 (42S22): Unknown column 'wrong' in 'field list'")
-
+	err = s.tk.ExecToErr("LOAD DATA INFILE 'abc://1' 
INTO TABLE t;") + checkClientErrorMessage(s.T(), err, "ERROR 8154 (HY000): Load data raise error(s): storage abc not support yet") + err = s.tk.ExecToErr("LOAD DATA INFILE 's3://no-network' INTO TABLE t;") + checkClientErrorMessage(s.T(), err, "ERROR 8154 (HY000): Load data raise error(s): failed to get region of bucket no-network") + err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://wrong-bucket/p?endpoint=%s' + INTO TABLE t;`, gcsEndpoint)) + checkClientErrorMessage(s.T(), err, "ERROR 8154 (HY000): Load data raise error(s): failed to read gcs file, file info: input.bucket='wrong-bucket', input.key='p'") } diff --git a/server/conn.go b/server/conn.go index 9188ebe4a70e6..5b1c5aba7c30a 100644 --- a/server/conn.go +++ b/server/conn.go @@ -1671,7 +1671,7 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataWorker *execut err = loadDataWorker.Load(ctx, executor.NewSimpleSeekerOnReadCloser(r)) _ = r.Close() - wg.Done() + wg.Wait() if err != nil { if !drained { From cd78ebd3c2efff99c9bb8141330f3406ea8a54bd Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 22 Feb 2023 18:19:44 +0800 Subject: [PATCH 04/19] fix errdoc Signed-off-by: lance6716 --- errors.toml | 5 +++++ executor/loadremotetest/error_test.go | 2 ++ 2 files changed, 7 insertions(+) diff --git a/errors.toml b/errors.toml index 4bf81f9ef7866..3b6c11b2f15b3 100644 --- a/errors.toml +++ b/errors.toml @@ -1646,6 +1646,11 @@ error = ''' transaction aborted because lazy uniqueness check is enabled and an error occurred: %s ''' +["executor:8154"] +error = ''' +Load data raise error(s): %s +''' + ["executor:8212"] error = ''' Failed to split region ranges: %s diff --git a/executor/loadremotetest/error_test.go b/executor/loadremotetest/error_test.go index c1be05241dc79..5df8a3faf4c2d 100644 --- a/executor/loadremotetest/error_test.go +++ b/executor/loadremotetest/error_test.go @@ -57,4 +57,6 @@ func (s *mockGCSSuite) TestErrorMessage() { err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://wrong-bucket/p?endpoint=%s' INTO TABLE t;`, gcsEndpoint)) checkClientErrorMessage(s.T(), err, "ERROR 8154 (HY000): Load data raise error(s): failed to read gcs file, file info: input.bucket='wrong-bucket', input.key='p'") + + // TODO: test error during execution, like insert NULL to NOT NULL column, duplicate key, etc. 
} From 72eccb0a6a2025abeccc495b2e67e399f94b5875 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 22 Feb 2023 20:44:31 +0800 Subject: [PATCH 05/19] fix UT Signed-off-by: lance6716 --- errno/errname.go | 2 +- errors.toml | 2 +- executor/loadremotetest/error_test.go | 33 +++++++++++++++++++++++---- server/server_test.go | 4 ++-- 4 files changed, 32 insertions(+), 9 deletions(-) diff --git a/errno/errname.go b/errno/errname.go index 82687be40867e..f249764b217ed 100644 --- a/errno/errname.go +++ b/errno/errname.go @@ -1040,7 +1040,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{ ErrTempTableNotAllowedWithTTL: mysql.Message("Set TTL for temporary table is not allowed", nil), ErrUnsupportedTTLReferencedByFK: mysql.Message("Set TTL for a table referenced by foreign key is not allowed", nil), ErrUnsupportedPrimaryKeyTypeWithTTL: mysql.Message("Unsupported clustered primary key type FLOAT/DOUBLE for TTL", nil), - ErrLoadDataGeneral: mysql.Message("Load data raise error(s): %s", nil), + ErrLoadDataGeneral: mysql.Message("LOAD DATA raises error(s): %s", nil), ErrWarnOptimizerHintInvalidInteger: mysql.Message("integer value is out of range in '%s'", nil), ErrWarnOptimizerHintUnsupportedHint: mysql.Message("Optimizer hint %s is not supported by TiDB and is ignored", nil), diff --git a/errors.toml b/errors.toml index 3b6c11b2f15b3..f0b9a9b5e6d42 100644 --- a/errors.toml +++ b/errors.toml @@ -1648,7 +1648,7 @@ transaction aborted because lazy uniqueness check is enabled and an error occurr ["executor:8154"] error = ''' -Load data raise error(s): %s +LOAD DATA raises error(s): %s ''' ["executor:8212"] diff --git a/executor/loadremotetest/error_test.go b/executor/loadremotetest/error_test.go index 5df8a3faf4c2d..e5e431f1ffd47 100644 --- a/executor/loadremotetest/error_test.go +++ b/executor/loadremotetest/error_test.go @@ -41,7 +41,7 @@ func (s *mockGCSSuite) TestErrorMessage() { s.tk.MustExec("CREATE DATABASE load_csv;") s.tk.MustExec("USE load_csv;") - s.tk.MustExec("CREATE TABLE t (i INT, s varchar(32));") + s.tk.MustExec("CREATE TABLE t (i INT PRIMARY KEY, s varchar(32));") err = s.tk.ExecToErr("LOAD DATA INFILE 'gs://1' INTO TABLE t (wrong)") checkClientErrorMessage(s.T(), err, "ERROR 1054 (42S22): Unknown column 'wrong' in 'field list'") @@ -51,12 +51,35 @@ func (s *mockGCSSuite) TestErrorMessage() { err = s.tk.ExecToErr("LOAD DATA INFILE 'gs://1' INTO TABLE t (@v) SET wrong=@v") checkClientErrorMessage(s.T(), err, "ERROR 1054 (42S22): Unknown column 'wrong' in 'field list'") err = s.tk.ExecToErr("LOAD DATA INFILE 'abc://1' INTO TABLE t;") - checkClientErrorMessage(s.T(), err, "ERROR 8154 (HY000): Load data raise error(s): storage abc not support yet") + checkClientErrorMessage(s.T(), err, "ERROR 8154 (HY000): LOAD DATA raises error(s): storage abc not support yet") err = s.tk.ExecToErr("LOAD DATA INFILE 's3://no-network' INTO TABLE t;") - checkClientErrorMessage(s.T(), err, "ERROR 8154 (HY000): Load data raise error(s): failed to get region of bucket no-network") + checkClientErrorMessage(s.T(), err, "ERROR 8154 (HY000): LOAD DATA raises error(s): failed to get region of bucket no-network") err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://wrong-bucket/p?endpoint=%s' INTO TABLE t;`, gcsEndpoint)) - checkClientErrorMessage(s.T(), err, "ERROR 8154 (HY000): Load data raise error(s): failed to read gcs file, file info: input.bucket='wrong-bucket', input.key='p'") + checkClientErrorMessage(s.T(), err, "ERROR 8154 (HY000): LOAD DATA raises error(s): failed to read gcs file, file info: 
input.bucket='wrong-bucket', input.key='p'") - // TODO: test error during execution, like insert NULL to NOT NULL column, duplicate key, etc. + // TODO: don't use batchCheckAndInsert, mimic (*InsertExec).exec() + //s.server.CreateObject(fakestorage.Object{ + // ObjectAttrs: fakestorage.ObjectAttrs{ + // BucketName: "test-tsv", + // Name: "t.tsv", + // }, + // Content: []byte("1\t2\n" + + // "1\t4\n"), + //}) + //err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t.tsv?endpoint=%s' + // INTO TABLE t;`, gcsEndpoint)) + //checkClientErrorMessage(s.T(), err, "ERROR 1062 (23000): Duplicate entry '1' for key 'PRIMARY'") + + //s.server.CreateObject(fakestorage.Object{ + // ObjectAttrs: fakestorage.ObjectAttrs{ + // BucketName: "test-tsv", + // Name: "t2.tsv", + // }, + // Content: []byte("null\t2\n" + + // "3\t4\n"), + //}) + //err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t2.tsv?endpoint=%s' + // INTO TABLE t NULL DEFINED BY 'null';`, gcsEndpoint)) + //checkClientErrorMessage(s.T(), err, "ERROR 8154 (HY000): LOAD DATA raises error(s): xxx") } diff --git a/server/server_test.go b/server/server_test.go index 6bf871c11abff..af04fe7831666 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -1037,10 +1037,10 @@ func (cli *testServerClient) runTestLoadData(t *testing.T, server *Server) { // can't insert into views (in TiDB) or sequences. issue #20880 _, err = dbt.GetDB().Exec(fmt.Sprintf("load data local infile %q into table v1", path)) require.Error(t, err) - require.Equal(t, "Error 1105 (HY000): can only load data into base tables", err.Error()) + require.Equal(t, "Error 8154 (HY000): LOAD DATA raises error(s): can only load data into base tables", err.Error()) _, err = dbt.GetDB().Exec(fmt.Sprintf("load data local infile %q into table s1", path)) require.Error(t, err) - require.Equal(t, "Error 1105 (HY000): can only load data into base tables", err.Error()) + require.Equal(t, "Error 8154 (HY000): LOAD DATA raises error(s): can only load data into base tables", err.Error()) rs, err1 := dbt.GetDB().Exec(fmt.Sprintf("load data local infile %q into table test", path)) require.NoError(t, err1) From af7e91f05053893c43ef26789f745955ffb29a4a Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 22 Feb 2023 21:04:35 +0800 Subject: [PATCH 06/19] fix UT Signed-off-by: lance6716 --- server/server_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/server_test.go b/server/server_test.go index af04fe7831666..4db3ee56a6c88 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -1412,7 +1412,7 @@ func (cli *testServerClient) runTestLoadData(t *testing.T, server *Server) { _, err1 = dbt.GetDB().Exec(fmt.Sprintf(`load data local infile %q into table pn FIELDS TERMINATED BY ','`, path)) mysqlErr, ok := err1.(*mysql.MySQLError) require.True(t, ok) - require.Equal(t, "mock commit one task error", mysqlErr.Message) + require.Equal(t, "LOAD DATA raises error(s): mock commit one task error", mysqlErr.Message) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/commitOneTaskErr")) dbt.MustExec("drop table if exists pn") From e177c81db0cb7574b3b622a1b746a63b4fbdeae8 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 23 Feb 2023 10:09:07 +0800 Subject: [PATCH 07/19] fix wrong check killed Signed-off-by: lance6716 --- executor/load_data.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/executor/load_data.go b/executor/load_data.go index 43f0abfa8f701..df3d20f44a908 100644 --- a/executor/load_data.go +++ 
b/executor/load_data.go @@ -476,6 +476,7 @@ func (e *LoadDataWorker) processStream( return } + TrySendTask: select { case <-ctx.Done(): return ctx.Err() @@ -484,6 +485,7 @@ func (e *LoadDataWorker) processStream( logutil.Logger(ctx).Info("load data query interrupted quit data processing") return ErrQueryInterrupted } + goto TrySendTask case e.commitTaskQueue <- commitTask{e.curBatchCnt, e.rows}: } // reset rows buffer, will reallocate buffer but NOT reuse From db25d8c8bbd624b4f32ffac2450926cfd81a84e5 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 24 Feb 2023 13:44:39 +0800 Subject: [PATCH 08/19] save work Signed-off-by: lance6716 --- errno/errcode.go | 4 +++- errno/errname.go | 4 +++- executor/errors.go | 4 +++- executor/load_data.go | 24 ++++++++++++------------ executor/loadremotetest/error_test.go | 9 ++++++--- 5 files changed, 27 insertions(+), 18 deletions(-) diff --git a/errno/errcode.go b/errno/errcode.go index 3cfcc9abfbf11..850ae498347e0 100644 --- a/errno/errcode.go +++ b/errno/errcode.go @@ -1045,7 +1045,9 @@ const ( ErrTempTableNotAllowedWithTTL = 8151 ErrUnsupportedTTLReferencedByFK = 8152 ErrUnsupportedPrimaryKeyTypeWithTTL = 8153 - ErrLoadDataGeneral = 8154 + ErrLoadDataInvalidURI = 8154 + ErrLoadDataCantAccess = 8155 + ErrLoadDataNoFile = 8156 // Error codes used by TiDB ddl package ErrUnsupportedDDLOperation = 8200 diff --git a/errno/errname.go b/errno/errname.go index f249764b217ed..3e8b8967437c2 100644 --- a/errno/errname.go +++ b/errno/errname.go @@ -1040,7 +1040,9 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{ ErrTempTableNotAllowedWithTTL: mysql.Message("Set TTL for temporary table is not allowed", nil), ErrUnsupportedTTLReferencedByFK: mysql.Message("Set TTL for a table referenced by foreign key is not allowed", nil), ErrUnsupportedPrimaryKeyTypeWithTTL: mysql.Message("Unsupported clustered primary key type FLOAT/DOUBLE for TTL", nil), - ErrLoadDataGeneral: mysql.Message("LOAD DATA raises error(s): %s", nil), + ErrLoadDataInvalidURI: mysql.Message("The URI of INFILE is invalid. Reason: %s. Please provide a valid URI, such as 's3://import/test.csv?access_key_id={your_access_key_id ID}&secret_access_key={your_secret_access_key}&session_token={your_session_token}'", nil), + ErrLoadDataCantAccess: mysql.Message("Access to the source file has been denied. 
Please check the URI, access key and secret access key are correct", nil), + ErrLoadDataNoFile: mysql.Message("%s", nil), ErrWarnOptimizerHintInvalidInteger: mysql.Message("integer value is out of range in '%s'", nil), ErrWarnOptimizerHintUnsupportedHint: mysql.Message("Optimizer hint %s is not supported by TiDB and is ignored", nil), diff --git a/executor/errors.go b/executor/errors.go index 4d174e1cce56e..d60da95f6eaca 100644 --- a/executor/errors.go +++ b/executor/errors.go @@ -77,5 +77,7 @@ var ( errTruncateWrongInsertValue = dbterror.ClassTable.NewStdErr(mysql.ErrTruncatedWrongValue, parser_mysql.Message("Incorrect %-.32s value: '%-.128s' for column '%.192s' at row %d", nil)) ErrExistsInHistoryPassword = dbterror.ClassExecutor.NewStd(mysql.ErrExistsInHistoryPassword) - ErrLoadDataGeneral = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataGeneral) + ErrLoadDataURI = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataInvalidURI) + ErrLoadDataCantAccess = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataCantAccess) + ErrLoadDataNoFile = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataNoFile) ) diff --git a/executor/load_data.go b/executor/load_data.go index df3d20f44a908..02a77fad86752 100644 --- a/executor/load_data.go +++ b/executor/load_data.go @@ -76,21 +76,21 @@ func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) error { req.GrowAndReset(e.maxChunkSize) if e.loadDataWorker.Path == "" { - return ErrLoadDataGeneral.GenWithStackByArgs("INFILE path is empty") + return ErrLoadDataURI.GenWithStackByArgs("INFILE path is empty") } if !e.loadDataWorker.table.Meta().IsBaseTable() { - return ErrLoadDataGeneral.GenWithStackByArgs("can only load data into base tables") + return ErrLoadDataURI.GenWithStackByArgs("can only load data into base tables") } // CSV-like if e.loadDataWorker.format == "" { if e.loadDataWorker.NullInfo != nil && e.loadDataWorker.NullInfo.OptEnclosed && (e.loadDataWorker.FieldsInfo == nil || e.loadDataWorker.FieldsInfo.Enclosed == nil) { - return ErrLoadDataGeneral.GenWithStackByArgs("must specify FIELDS [OPTIONALLY] ENCLOSED BY when use NULL DEFINED BY OPTIONALLY ENCLOSED") + return ErrLoadDataURI.GenWithStackByArgs("must specify FIELDS [OPTIONALLY] ENCLOSED BY when use NULL DEFINED BY OPTIONALLY ENCLOSED") } // TODO: support lines terminated is "". 
if len(e.loadDataWorker.LinesInfo.Terminated) == 0 { - return ErrLoadDataGeneral.GenWithStackByArgs("don't support load data terminated is nil") + return ErrLoadDataURI.GenWithStackByArgs("don't support load data terminated is nil") } } @@ -98,16 +98,16 @@ func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) error { case ast.FileLocServerOrRemote: u, err := storage.ParseRawURL(e.loadDataWorker.Path) if err != nil { - return ErrLoadDataGeneral.GenWithStackByArgs(err.Error()) + return ErrLoadDataURI.GenWithStackByArgs(err.Error()) } var filename string u.Path, filename = filepath.Split(u.Path) b, err := storage.ParseBackendFromURL(u, nil) if err != nil { - return ErrLoadDataGeneral.GenWithStackByArgs(getMsgFromBRError(err)) + return ErrLoadDataURI.GenWithStackByArgs(getMsgFromBRError(err)) } if b.GetLocal() != nil { - return ErrLoadDataGeneral.GenWithStackByArgs("don't support load data from tidb-server's disk") + return ErrLoadDataURI.GenWithStackByArgs("don't support load data from tidb-server's disk") } return e.loadFromRemote(ctx, b, filename) case ast.FileLocClient: @@ -116,7 +116,7 @@ func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) error { val := sctx.Value(LoadDataVarKey) if val != nil { sctx.SetValue(LoadDataVarKey, nil) - return ErrLoadDataGeneral.GenWithStackByArgs("previous load data option wasn't closed normally") + return ErrLoadDataURI.GenWithStackByArgs("previous load data option wasn't closed normally") } sctx.SetValue(LoadDataVarKey, e.loadDataWorker) } @@ -150,11 +150,11 @@ func (e *LoadDataExec) loadFromRemote( } s, err := storage.New(ctx, b, opt) if err != nil { - return ErrLoadDataGeneral.GenWithStackByArgs(getMsgFromBRError(err)) + return ErrLoadDataCantAccess } fileReader, err := s.Open(ctx, filename) if err != nil { - return ErrLoadDataGeneral.GenWithStackByArgs(getMsgFromBRError(err)) + return ErrLoadDataNoFile.GenWithStackByArgs(getMsgFromBRError(err)) } defer fileReader.Close() @@ -422,7 +422,7 @@ func (e *LoadDataWorker) Load(ctx context.Context, reader io.ReadSeekCloser) err err = errors.Errorf("unsupported format: %s", e.format) } if err != nil { - return ErrLoadDataGeneral.GenWithStackByArgs(err.Error()) + return ErrLoadDataURI.GenWithStackByArgs(err.Error()) } parser.SetLogger(log.Logger{Logger: logutil.Logger(ctx)}) @@ -443,7 +443,7 @@ func (e *LoadDataWorker) Load(ctx context.Context, reader io.ReadSeekCloser) err err = group.Wait() e.SetMessage() if err != nil { - return ErrLoadDataGeneral.GenWithStackByArgs(err.Error()) + return ErrLoadDataURI.GenWithStackByArgs(err.Error()) } return nil } diff --git a/executor/loadremotetest/error_test.go b/executor/loadremotetest/error_test.go index e5e431f1ffd47..c224cfc592a55 100644 --- a/executor/loadremotetest/error_test.go +++ b/executor/loadremotetest/error_test.go @@ -51,12 +51,15 @@ func (s *mockGCSSuite) TestErrorMessage() { err = s.tk.ExecToErr("LOAD DATA INFILE 'gs://1' INTO TABLE t (@v) SET wrong=@v") checkClientErrorMessage(s.T(), err, "ERROR 1054 (42S22): Unknown column 'wrong' in 'field list'") err = s.tk.ExecToErr("LOAD DATA INFILE 'abc://1' INTO TABLE t;") - checkClientErrorMessage(s.T(), err, "ERROR 8154 (HY000): LOAD DATA raises error(s): storage abc not support yet") + checkClientErrorMessage(s.T(), err, + "ERROR 8154 (HY000): The URI of INFILE is invalid. Reason: storage abc not support yet. 
Please provide a valid URI, such as 's3://import/test.csv?access_key_id={your_access_key_id ID}&secret_access_key={your_secret_access_key}&session_token={your_session_token}'") err = s.tk.ExecToErr("LOAD DATA INFILE 's3://no-network' INTO TABLE t;") - checkClientErrorMessage(s.T(), err, "ERROR 8154 (HY000): LOAD DATA raises error(s): failed to get region of bucket no-network") + checkClientErrorMessage(s.T(), err, + "ERROR 8155 (HY000): Access to the source file has been denied. Please check the URI, access key and secret access key are correct") err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://wrong-bucket/p?endpoint=%s' INTO TABLE t;`, gcsEndpoint)) - checkClientErrorMessage(s.T(), err, "ERROR 8154 (HY000): LOAD DATA raises error(s): failed to read gcs file, file info: input.bucket='wrong-bucket', input.key='p'") + checkClientErrorMessage(s.T(), err, + "ERROR 8156 (HY000): failed to read gcs file, file info: input.bucket='wrong-bucket', input.key='p'") // TODO: don't use batchCheckAndInsert, mimic (*InsertExec).exec() //s.server.CreateObject(fakestorage.Object{ From 15e79997abc7c9d914e507eadd2c813ecbcd83df Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 24 Feb 2023 16:40:04 +0800 Subject: [PATCH 09/19] save work Signed-off-by: lance6716 --- errno/errcode.go | 10 +++++++--- errno/errname.go | 3 +++ executor/errors.go | 9 ++++++--- executor/load_data.go | 6 +++--- executor/loadremotetest/error_test.go | 4 ++-- 5 files changed, 21 insertions(+), 11 deletions(-) diff --git a/errno/errcode.go b/errno/errcode.go index 850ae498347e0..ec18725aec693 100644 --- a/errno/errcode.go +++ b/errno/errcode.go @@ -1045,9 +1045,13 @@ const ( ErrTempTableNotAllowedWithTTL = 8151 ErrUnsupportedTTLReferencedByFK = 8152 ErrUnsupportedPrimaryKeyTypeWithTTL = 8153 - ErrLoadDataInvalidURI = 8154 - ErrLoadDataCantAccess = 8155 - ErrLoadDataNoFile = 8156 + ErrLoadDataFromServerDisk = 8154 + ErrLoadParquetFromLocal = 8155 + ErrLoadDataUnsupportedFormat = 8156 + ErrLoadDataInvalidURI = 8157 + ErrLoadDataCantAccess = 8158 + ErrLoadDataNoFile = 8159 + ErrLoadDataWrongConfig = 8161 // Error codes used by TiDB ddl package ErrUnsupportedDDLOperation = 8200 diff --git a/errno/errname.go b/errno/errname.go index 3e8b8967437c2..7440c0a8a728a 100644 --- a/errno/errname.go +++ b/errno/errname.go @@ -1040,6 +1040,9 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{ ErrTempTableNotAllowedWithTTL: mysql.Message("Set TTL for temporary table is not allowed", nil), ErrUnsupportedTTLReferencedByFK: mysql.Message("Set TTL for a table referenced by foreign key is not allowed", nil), ErrUnsupportedPrimaryKeyTypeWithTTL: mysql.Message("Unsupported clustered primary key type FLOAT/DOUBLE for TTL", nil), + ErrLoadDataFromServerDisk: mysql.Message("The path of INFILE '%s' needs to specify the parameter of LOCAL first", nil), + ErrLoadParquetFromLocal: mysql.Message("Do not support loading parquet files from local. Please try to load the parquet files from the cloud storage", nil), + ErrLoadDataUnsupportedFormat: mysql.Message("The FORMAT '%s' is not supported", nil), ErrLoadDataInvalidURI: mysql.Message("The URI of INFILE is invalid. Reason: %s. Please provide a valid URI, such as 's3://import/test.csv?access_key_id={your_access_key_id ID}&secret_access_key={your_secret_access_key}&session_token={your_session_token}'", nil), ErrLoadDataCantAccess: mysql.Message("Access to the source file has been denied. 
Please check the URI, access key and secret access key are correct", nil), ErrLoadDataNoFile: mysql.Message("%s", nil), diff --git a/executor/errors.go b/executor/errors.go index d60da95f6eaca..aae7217be6e3b 100644 --- a/executor/errors.go +++ b/executor/errors.go @@ -77,7 +77,10 @@ var ( errTruncateWrongInsertValue = dbterror.ClassTable.NewStdErr(mysql.ErrTruncatedWrongValue, parser_mysql.Message("Incorrect %-.32s value: '%-.128s' for column '%.192s' at row %d", nil)) ErrExistsInHistoryPassword = dbterror.ClassExecutor.NewStd(mysql.ErrExistsInHistoryPassword) - ErrLoadDataURI = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataInvalidURI) - ErrLoadDataCantAccess = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataCantAccess) - ErrLoadDataNoFile = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataNoFile) + ErrLoadDataFromServerDisk = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataFromServerDisk) + ErrLoadParquetFromLocal = dbterror.ClassExecutor.NewStd(mysql.ErrLoadParquetFromLocal) + ErrLoadDataUnsupportedFormat = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataUnsupportedFormat) + ErrLoadDataURI = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataInvalidURI) + ErrLoadDataCantAccess = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataCantAccess) + ErrLoadDataNoFile = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataNoFile) ) diff --git a/executor/load_data.go b/executor/load_data.go index 02a77fad86752..e3abc44539bb5 100644 --- a/executor/load_data.go +++ b/executor/load_data.go @@ -107,7 +107,7 @@ func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) error { return ErrLoadDataURI.GenWithStackByArgs(getMsgFromBRError(err)) } if b.GetLocal() != nil { - return ErrLoadDataURI.GenWithStackByArgs("don't support load data from tidb-server's disk") + return ErrLoadDataFromServerDisk.GenWithStackByArgs(e.loadDataWorker.Path) } return e.loadFromRemote(ctx, b, filename) case ast.FileLocClient: @@ -410,7 +410,7 @@ func (e *LoadDataWorker) Load(ctx context.Context, reader io.ReadSeekCloser) err ) case LoadDataFormatParquet: if e.loadRemoteInfo.store == nil { - return errors.New("parquet format requires remote storage") + return ErrLoadParquetFromLocal } parser, err = mydump.NewParquetParser( ctx, @@ -419,7 +419,7 @@ func (e *LoadDataWorker) Load(ctx context.Context, reader io.ReadSeekCloser) err e.loadRemoteInfo.path, ) default: - err = errors.Errorf("unsupported format: %s", e.format) + err = ErrLoadDataUnsupportedFormat.GenWithStackByArgs(e.format) } if err != nil { return ErrLoadDataURI.GenWithStackByArgs(err.Error()) diff --git a/executor/loadremotetest/error_test.go b/executor/loadremotetest/error_test.go index c224cfc592a55..14a5676775945 100644 --- a/executor/loadremotetest/error_test.go +++ b/executor/loadremotetest/error_test.go @@ -52,10 +52,10 @@ func (s *mockGCSSuite) TestErrorMessage() { checkClientErrorMessage(s.T(), err, "ERROR 1054 (42S22): Unknown column 'wrong' in 'field list'") err = s.tk.ExecToErr("LOAD DATA INFILE 'abc://1' INTO TABLE t;") checkClientErrorMessage(s.T(), err, - "ERROR 8154 (HY000): The URI of INFILE is invalid. Reason: storage abc not support yet. Please provide a valid URI, such as 's3://import/test.csv?access_key_id={your_access_key_id ID}&secret_access_key={your_secret_access_key}&session_token={your_session_token}'") + "ERROR 8157 (HY000): The URI of INFILE is invalid. Reason: storage abc not support yet. 
Please provide a valid URI, such as 's3://import/test.csv?access_key_id={your_access_key_id ID}&secret_access_key={your_secret_access_key}&session_token={your_session_token}'") err = s.tk.ExecToErr("LOAD DATA INFILE 's3://no-network' INTO TABLE t;") checkClientErrorMessage(s.T(), err, - "ERROR 8155 (HY000): Access to the source file has been denied. Please check the URI, access key and secret access key are correct") + "ERROR 8158 (HY000): Access to the source file has been denied. Please check the URI, access key and secret access key are correct") err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://wrong-bucket/p?endpoint=%s' INTO TABLE t;`, gcsEndpoint)) checkClientErrorMessage(s.T(), err, From b66ebd176dbe777a954af060c6589fb2bc6fe8ba Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 24 Feb 2023 17:40:42 +0800 Subject: [PATCH 10/19] align with spec Signed-off-by: lance6716 --- br/pkg/lightning/mydump/csv_parser.go | 2 +- br/pkg/lightning/mydump/csv_parser_test.go | 2 +- errno/errcode.go | 5 ++-- errno/errname.go | 3 +- errors.toml | 32 +++++++++++++++++++- executor/errors.go | 3 +- executor/load_data.go | 16 +++++----- executor/loadremotetest/error_test.go | 35 ++++++++++++++++------ 8 files changed, 74 insertions(+), 24 deletions(-) diff --git a/br/pkg/lightning/mydump/csv_parser.go b/br/pkg/lightning/mydump/csv_parser.go index f3b9d45a5d989..fee616de99fe8 100644 --- a/br/pkg/lightning/mydump/csv_parser.go +++ b/br/pkg/lightning/mydump/csv_parser.go @@ -137,7 +137,7 @@ func NewCSVParser( if len(cfg.StartingBy) > 0 { if strings.Contains(cfg.StartingBy, terminator) { - return nil, errors.New("starting-by cannot contain (line) terminator") + return nil, errors.Errorf("STARTING BY '%s' cannot contain TERMINATED BY '%s'", cfg.StartingBy, terminator) } } diff --git a/br/pkg/lightning/mydump/csv_parser_test.go b/br/pkg/lightning/mydump/csv_parser_test.go index 795b11f1c1c20..db51d11b9e492 100644 --- a/br/pkg/lightning/mydump/csv_parser_test.go +++ b/br/pkg/lightning/mydump/csv_parser_test.go @@ -1346,7 +1346,7 @@ yyy",5,xx"xxxx,8 }, } _, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, nil, 1, ioWorkers, false, nil) - require.ErrorContains(t, err, "starting-by cannot contain (line) terminator") + require.ErrorContains(t, err, "STARTING BY 'x\nxx' cannot contain TERMINATED BY '\n'") } func TestCharsetConversion(t *testing.T) { diff --git a/errno/errcode.go b/errno/errcode.go index ec18725aec693..d3a86a2775b38 100644 --- a/errno/errcode.go +++ b/errno/errcode.go @@ -1050,8 +1050,9 @@ const ( ErrLoadDataUnsupportedFormat = 8156 ErrLoadDataInvalidURI = 8157 ErrLoadDataCantAccess = 8158 - ErrLoadDataNoFile = 8159 - ErrLoadDataWrongConfig = 8161 + ErrLoadDataCantRead = 8159 + ErrLoadDataPhysicalImportTableNotEmpty = 8160 + ErrLoadDataWrongFormatConfig = 8161 // Error codes used by TiDB ddl package ErrUnsupportedDDLOperation = 8200 diff --git a/errno/errname.go b/errno/errname.go index 7440c0a8a728a..636dbd30c55bc 100644 --- a/errno/errname.go +++ b/errno/errname.go @@ -1045,7 +1045,8 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{ ErrLoadDataUnsupportedFormat: mysql.Message("The FORMAT '%s' is not supported", nil), ErrLoadDataInvalidURI: mysql.Message("The URI of INFILE is invalid. Reason: %s. Please provide a valid URI, such as 's3://import/test.csv?access_key_id={your_access_key_id ID}&secret_access_key={your_secret_access_key}&session_token={your_session_token}'", nil), ErrLoadDataCantAccess: mysql.Message("Access to the source file has been denied. 
Please check the URI, access key and secret access key are correct", nil), - ErrLoadDataNoFile: mysql.Message("%s", nil), + ErrLoadDataCantRead: mysql.Message("Failed to read source files. Reason: %s. %s", nil), + ErrLoadDataWrongFormatConfig: mysql.Message("", nil), ErrWarnOptimizerHintInvalidInteger: mysql.Message("integer value is out of range in '%s'", nil), ErrWarnOptimizerHintUnsupportedHint: mysql.Message("Optimizer hint %s is not supported by TiDB and is ignored", nil), diff --git a/errors.toml b/errors.toml index f0b9a9b5e6d42..2e5937b1d9046 100644 --- a/errors.toml +++ b/errors.toml @@ -1648,7 +1648,37 @@ transaction aborted because lazy uniqueness check is enabled and an error occurr ["executor:8154"] error = ''' -LOAD DATA raises error(s): %s +The path of INFILE '%s' needs to specify the parameter of LOCAL first +''' + +["executor:8155"] +error = ''' +Do not support loading parquet files from local. Please try to load the parquet files from the cloud storage +''' + +["executor:8156"] +error = ''' +The FORMAT '%s' is not supported +''' + +["executor:8157"] +error = ''' +The URI of INFILE is invalid. Reason: %s. Please provide a valid URI, such as 's3://import/test.csv?access_key_id={your_access_key_id ID}&secret_access_key={your_secret_access_key}&session_token={your_session_token}' +''' + +["executor:8158"] +error = ''' +Access to the source file has been denied. Please check the URI, access key and secret access key are correct +''' + +["executor:8159"] +error = ''' +Failed to read source files. Reason: %s. %s +''' + +["executor:8161"] +error = ''' + ''' ["executor:8212"] diff --git a/executor/errors.go b/executor/errors.go index aae7217be6e3b..852e548d15a11 100644 --- a/executor/errors.go +++ b/executor/errors.go @@ -82,5 +82,6 @@ var ( ErrLoadDataUnsupportedFormat = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataUnsupportedFormat) ErrLoadDataURI = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataInvalidURI) ErrLoadDataCantAccess = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataCantAccess) - ErrLoadDataNoFile = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataNoFile) + ErrLoadDataCantRead = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataCantRead) + ErrLoadDataWrongFormatConfig = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataWrongFormatConfig) ) diff --git a/executor/load_data.go b/executor/load_data.go index e3abc44539bb5..05d86158b6cd9 100644 --- a/executor/load_data.go +++ b/executor/load_data.go @@ -154,7 +154,7 @@ func (e *LoadDataExec) loadFromRemote( } fileReader, err := s.Open(ctx, filename) if err != nil { - return ErrLoadDataNoFile.GenWithStackByArgs(getMsgFromBRError(err)) + return ErrLoadDataCantRead.GenWithStackByArgs(getMsgFromBRError(err), "Please check the INFILE path is correct") } defer fileReader.Close() @@ -419,10 +419,10 @@ func (e *LoadDataWorker) Load(ctx context.Context, reader io.ReadSeekCloser) err e.loadRemoteInfo.path, ) default: - err = ErrLoadDataUnsupportedFormat.GenWithStackByArgs(e.format) + return ErrLoadDataUnsupportedFormat.GenWithStackByArgs(e.format) } if err != nil { - return ErrLoadDataURI.GenWithStackByArgs(err.Error()) + return ErrLoadDataWrongFormatConfig.GenWithStack(err.Error()) } parser.SetLogger(log.Logger{Logger: logutil.Logger(ctx)}) @@ -442,10 +442,7 @@ func (e *LoadDataWorker) Load(ctx context.Context, reader io.ReadSeekCloser) err err = group.Wait() e.SetMessage() - if err != nil { - return ErrLoadDataURI.GenWithStackByArgs(err.Error()) - } - return nil + return err } // processStream process input stream from network @@ -632,7 
+629,10 @@ func (e *LoadDataWorker) ReadRows(ctx context.Context, parser mydump.Parser) err if errors.Cause(err) == io.EOF { return nil } - return err + return ErrLoadDataCantRead.GenWithStackByArgs( + err.Error(), + "Only the following formats delimited text file (csv, tsv), parquet, sql are supported. Please provide the valid source file(s)", + ) } // rowCount will be used in fillRow(), last insert ID will be assigned according to the rowCount = 1. // So should add first here. diff --git a/executor/loadremotetest/error_test.go b/executor/loadremotetest/error_test.go index 14a5676775945..31e6dba520a55 100644 --- a/executor/loadremotetest/error_test.go +++ b/executor/loadremotetest/error_test.go @@ -18,6 +18,7 @@ import ( "fmt" "testing" + "github.com/fsouza/fake-gcs-server/fakestorage" "github.com/pingcap/errors" "github.com/pingcap/tidb/parser/terror" "github.com/stretchr/testify/require" @@ -59,17 +60,33 @@ func (s *mockGCSSuite) TestErrorMessage() { err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://wrong-bucket/p?endpoint=%s' INTO TABLE t;`, gcsEndpoint)) checkClientErrorMessage(s.T(), err, - "ERROR 8156 (HY000): failed to read gcs file, file info: input.bucket='wrong-bucket', input.key='p'") + "ERROR 8159 (HY000): Failed to read source files. Reason: failed to read gcs file, file info: input.bucket='wrong-bucket', input.key='p'. Please check the INFILE path is correct") + + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{ + BucketName: "test-tsv", + Name: "t.tsv", + }, + Content: []byte("1\t2\n" + + "1\t4\n"), + }) + err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t.tsv?endpoint=%s' + FORMAT '123' INTO TABLE t;`, gcsEndpoint)) + checkClientErrorMessage(s.T(), err, + "ERROR 8156 (HY000): The FORMAT '123' is not supported") + err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t.tsv?endpoint=%s' + FORMAT 'sqldumpfile' INTO TABLE t;`, gcsEndpoint)) + checkClientErrorMessage(s.T(), err, + "ERROR 8159 (HY000): Failed to read source files. Reason: syntax error: unexpected Integer (1) at offset 1, expecting start of row. Only the following formats delimited text file (csv, tsv), parquet, sql are supported. 
Please provide the valid source file(s)") + err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t.tsv?endpoint=%s' + INTO TABLE t LINES STARTING BY '\n';`, gcsEndpoint)) + checkClientErrorMessage(s.T(), err, + `ERROR 8161 (HY000): STARTING BY ' +' cannot contain TERMINATED BY ' +'`) // TODO: don't use batchCheckAndInsert, mimic (*InsertExec).exec() - //s.server.CreateObject(fakestorage.Object{ - // ObjectAttrs: fakestorage.ObjectAttrs{ - // BucketName: "test-tsv", - // Name: "t.tsv", - // }, - // Content: []byte("1\t2\n" + - // "1\t4\n"), - //}) + //err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t.tsv?endpoint=%s' // INTO TABLE t;`, gcsEndpoint)) //checkClientErrorMessage(s.T(), err, "ERROR 1062 (23000): Duplicate entry '1' for key 'PRIMARY'") From 2d532919650f9174975195499b3083d9fa9ac412 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 24 Feb 2023 18:09:48 +0800 Subject: [PATCH 11/19] fix some UT Signed-off-by: lance6716 --- errno/errcode.go | 13 +++++++------ errno/errname.go | 1 + errors.toml | 13 +++++++++---- executor/builder.go | 5 +++++ executor/errors.go | 1 + executor/load_data.go | 9 +++------ executor/loadremotetest/error_test.go | 24 ++++++++++++++++++------ executor/loadremotetest/one_csv_test.go | 2 +- 8 files changed, 45 insertions(+), 23 deletions(-) diff --git a/errno/errcode.go b/errno/errcode.go index d3a86a2775b38..a165e2f6ded2e 100644 --- a/errno/errcode.go +++ b/errno/errcode.go @@ -1047,12 +1047,13 @@ const ( ErrUnsupportedPrimaryKeyTypeWithTTL = 8153 ErrLoadDataFromServerDisk = 8154 ErrLoadParquetFromLocal = 8155 - ErrLoadDataUnsupportedFormat = 8156 - ErrLoadDataInvalidURI = 8157 - ErrLoadDataCantAccess = 8158 - ErrLoadDataCantRead = 8159 - ErrLoadDataPhysicalImportTableNotEmpty = 8160 - ErrLoadDataWrongFormatConfig = 8161 + ErrLoadDataEmptyPath = 8156 + ErrLoadDataUnsupportedFormat = 8157 + ErrLoadDataInvalidURI = 8158 + ErrLoadDataCantAccess = 8159 + ErrLoadDataCantRead = 8160 + ErrLoadDataPhysicalImportTableNotEmpty = 8161 + ErrLoadDataWrongFormatConfig = 8162 // Error codes used by TiDB ddl package ErrUnsupportedDDLOperation = 8200 diff --git a/errno/errname.go b/errno/errname.go index 636dbd30c55bc..2040988cb6b89 100644 --- a/errno/errname.go +++ b/errno/errname.go @@ -1042,6 +1042,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{ ErrUnsupportedPrimaryKeyTypeWithTTL: mysql.Message("Unsupported clustered primary key type FLOAT/DOUBLE for TTL", nil), ErrLoadDataFromServerDisk: mysql.Message("The path of INFILE '%s' needs to specify the parameter of LOCAL first", nil), ErrLoadParquetFromLocal: mysql.Message("Do not support loading parquet files from local. Please try to load the parquet files from the cloud storage", nil), + ErrLoadDataEmptyPath: mysql.Message("The value of INFILE must not be empty when LOAD DATA from LOCAL", nil), ErrLoadDataUnsupportedFormat: mysql.Message("The FORMAT '%s' is not supported", nil), ErrLoadDataInvalidURI: mysql.Message("The URI of INFILE is invalid. Reason: %s. Please provide a valid URI, such as 's3://import/test.csv?access_key_id={your_access_key_id ID}&secret_access_key={your_secret_access_key}&session_token={your_session_token}'", nil), ErrLoadDataCantAccess: mysql.Message("Access to the source file has been denied. Please check the URI, access key and secret access key are correct", nil), diff --git a/errors.toml b/errors.toml index 2e5937b1d9046..8911c907cd855 100644 --- a/errors.toml +++ b/errors.toml @@ -1658,25 +1658,30 @@ Do not support loading parquet files from local. 
Please try to load the parquet ["executor:8156"] error = ''' -The FORMAT '%s' is not supported +The value of INFILE must not be empty when LOAD DATA from LOCAL ''' ["executor:8157"] error = ''' -The URI of INFILE is invalid. Reason: %s. Please provide a valid URI, such as 's3://import/test.csv?access_key_id={your_access_key_id ID}&secret_access_key={your_secret_access_key}&session_token={your_session_token}' +The FORMAT '%s' is not supported ''' ["executor:8158"] error = ''' -Access to the source file has been denied. Please check the URI, access key and secret access key are correct +The URI of INFILE is invalid. Reason: %s. Please provide a valid URI, such as 's3://import/test.csv?access_key_id={your_access_key_id ID}&secret_access_key={your_secret_access_key}&session_token={your_session_token}' ''' ["executor:8159"] error = ''' +Access to the source file has been denied. Please check the URI, access key and secret access key are correct +''' + +["executor:8160"] +error = ''' Failed to read source files. Reason: %s. %s ''' -["executor:8161"] +["executor:8162"] error = ''' ''' diff --git a/executor/builder.go b/executor/builder.go index a0a0f370ded13..b38e5d85148bb 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -938,6 +938,11 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor { b.err = errors.Errorf("Can not get table %d", v.Table.TableInfo.ID) return nil } + if !tbl.Meta().IsBaseTable() { + b.err = plannercore.ErrNonUpdatableTable.GenWithStackByArgs(tbl.Meta().Name.O, "LOAD") + return nil + } + worker, err := NewLoadDataWorker(b.ctx, v, tbl) if err != nil { b.err = err diff --git a/executor/errors.go b/executor/errors.go index 852e548d15a11..b2c2fd74aa2d5 100644 --- a/executor/errors.go +++ b/executor/errors.go @@ -79,6 +79,7 @@ var ( ErrLoadDataFromServerDisk = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataFromServerDisk) ErrLoadParquetFromLocal = dbterror.ClassExecutor.NewStd(mysql.ErrLoadParquetFromLocal) + ErrLoadDataEmptyPath = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataEmptyPath) ErrLoadDataUnsupportedFormat = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataUnsupportedFormat) ErrLoadDataURI = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataInvalidURI) ErrLoadDataCantAccess = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataCantAccess) diff --git a/executor/load_data.go b/executor/load_data.go index 05d86158b6cd9..b4d2dc3c3eedb 100644 --- a/executor/load_data.go +++ b/executor/load_data.go @@ -76,21 +76,18 @@ func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) error { req.GrowAndReset(e.maxChunkSize) if e.loadDataWorker.Path == "" { - return ErrLoadDataURI.GenWithStackByArgs("INFILE path is empty") - } - if !e.loadDataWorker.table.Meta().IsBaseTable() { - return ErrLoadDataURI.GenWithStackByArgs("can only load data into base tables") + return ErrLoadDataEmptyPath } // CSV-like if e.loadDataWorker.format == "" { if e.loadDataWorker.NullInfo != nil && e.loadDataWorker.NullInfo.OptEnclosed && (e.loadDataWorker.FieldsInfo == nil || e.loadDataWorker.FieldsInfo.Enclosed == nil) { - return ErrLoadDataURI.GenWithStackByArgs("must specify FIELDS [OPTIONALLY] ENCLOSED BY when use NULL DEFINED BY OPTIONALLY ENCLOSED") + return ErrLoadDataWrongFormatConfig.GenWithStackByArgs("must specify FIELDS [OPTIONALLY] ENCLOSED BY when use NULL DEFINED BY OPTIONALLY ENCLOSED") } // TODO: support lines terminated is "". 
if len(e.loadDataWorker.LinesInfo.Terminated) == 0 { - return ErrLoadDataURI.GenWithStackByArgs("don't support load data terminated is nil") + return ErrLoadDataWrongFormatConfig.GenWithStackByArgs("don't support load data terminated is nil") } } diff --git a/executor/loadremotetest/error_test.go b/executor/loadremotetest/error_test.go index 31e6dba520a55..98f1450cbb518 100644 --- a/executor/loadremotetest/error_test.go +++ b/executor/loadremotetest/error_test.go @@ -53,14 +53,14 @@ func (s *mockGCSSuite) TestErrorMessage() { checkClientErrorMessage(s.T(), err, "ERROR 1054 (42S22): Unknown column 'wrong' in 'field list'") err = s.tk.ExecToErr("LOAD DATA INFILE 'abc://1' INTO TABLE t;") checkClientErrorMessage(s.T(), err, - "ERROR 8157 (HY000): The URI of INFILE is invalid. Reason: storage abc not support yet. Please provide a valid URI, such as 's3://import/test.csv?access_key_id={your_access_key_id ID}&secret_access_key={your_secret_access_key}&session_token={your_session_token}'") + "ERROR 8158 (HY000): The URI of INFILE is invalid. Reason: storage abc not support yet. Please provide a valid URI, such as 's3://import/test.csv?access_key_id={your_access_key_id ID}&secret_access_key={your_secret_access_key}&session_token={your_session_token}'") err = s.tk.ExecToErr("LOAD DATA INFILE 's3://no-network' INTO TABLE t;") checkClientErrorMessage(s.T(), err, - "ERROR 8158 (HY000): Access to the source file has been denied. Please check the URI, access key and secret access key are correct") + "ERROR 8159 (HY000): Access to the source file has been denied. Please check the URI, access key and secret access key are correct") err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://wrong-bucket/p?endpoint=%s' INTO TABLE t;`, gcsEndpoint)) checkClientErrorMessage(s.T(), err, - "ERROR 8159 (HY000): Failed to read source files. Reason: failed to read gcs file, file info: input.bucket='wrong-bucket', input.key='p'. Please check the INFILE path is correct") + "ERROR 8160 (HY000): Failed to read source files. Reason: failed to read gcs file, file info: input.bucket='wrong-bucket', input.key='p'. Please check the INFILE path is correct") s.server.CreateObject(fakestorage.Object{ ObjectAttrs: fakestorage.ObjectAttrs{ @@ -73,18 +73,30 @@ func (s *mockGCSSuite) TestErrorMessage() { err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t.tsv?endpoint=%s' FORMAT '123' INTO TABLE t;`, gcsEndpoint)) checkClientErrorMessage(s.T(), err, - "ERROR 8156 (HY000): The FORMAT '123' is not supported") + "ERROR 8157 (HY000): The FORMAT '123' is not supported") err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t.tsv?endpoint=%s' FORMAT 'sqldumpfile' INTO TABLE t;`, gcsEndpoint)) checkClientErrorMessage(s.T(), err, - "ERROR 8159 (HY000): Failed to read source files. Reason: syntax error: unexpected Integer (1) at offset 1, expecting start of row. Only the following formats delimited text file (csv, tsv), parquet, sql are supported. Please provide the valid source file(s)") + "ERROR 8160 (HY000): Failed to read source files. Reason: syntax error: unexpected Integer (1) at offset 1, expecting start of row. Only the following formats delimited text file (csv, tsv), parquet, sql are supported. 
Please provide the valid source file(s)") err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t.tsv?endpoint=%s' INTO TABLE t LINES STARTING BY '\n';`, gcsEndpoint)) checkClientErrorMessage(s.T(), err, - `ERROR 8161 (HY000): STARTING BY ' + `ERROR 8162 (HY000): STARTING BY ' ' cannot contain TERMINATED BY ' '`) + // TODO: fix these tests + //s.tk.MustExec("CREATE TABLE t2 (c1 INT, c2 INT, c3 INT);") + //err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t.tsv?endpoint=%s' + // INTO TABLE t2;`, gcsEndpoint)) + //checkClientErrorMessage(s.T(), err, + // "ERROR 1261 (01000): Row 1 doesn't contain data for all columns") + //s.tk.MustExec("CREATE TABLE t3 (c1 INT);") + //err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t.tsv?endpoint=%s' + // INTO TABLE t3;`, gcsEndpoint)) + //checkClientErrorMessage(s.T(), err, + // "ERROR 1262 (01000): Row 1 was truncated; it contained more data than there were input columns") + // TODO: don't use batchCheckAndInsert, mimic (*InsertExec).exec() //err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t.tsv?endpoint=%s' diff --git a/executor/loadremotetest/one_csv_test.go b/executor/loadremotetest/one_csv_test.go index ff9fd277f7dfe..15c4cb4e58e81 100644 --- a/executor/loadremotetest/one_csv_test.go +++ b/executor/loadremotetest/one_csv_test.go @@ -81,7 +81,7 @@ func (s *mockGCSSuite) TestLoadCSV() { // can't read file at tidb-server sql = "LOAD DATA INFILE '/etc/passwd' INTO TABLE load_csv.t;" - s.tk.MustContainErrMsg(sql, "don't support load data from tidb-server") + s.tk.MustContainErrMsg(sql, "The path of INFILE '/etc/passwd' needs to specify the parameter of LOCAL first") } func (s *mockGCSSuite) TestIgnoreNLines() { From 4a627d45ad92c72691c5449d88a33b877fdb0fbf Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 24 Feb 2023 18:12:40 +0800 Subject: [PATCH 12/19] fix more Signed-off-by: lance6716 --- server/server_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/server_test.go b/server/server_test.go index 4db3ee56a6c88..53903d8002940 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -1037,10 +1037,10 @@ func (cli *testServerClient) runTestLoadData(t *testing.T, server *Server) { // can't insert into views (in TiDB) or sequences. 
issue #20880 _, err = dbt.GetDB().Exec(fmt.Sprintf("load data local infile %q into table v1", path)) require.Error(t, err) - require.Equal(t, "Error 8154 (HY000): LOAD DATA raises error(s): can only load data into base tables", err.Error()) + require.Equal(t, "Error 1288 (HY000): The target table v1 of the LOAD is not updatable", err.Error()) _, err = dbt.GetDB().Exec(fmt.Sprintf("load data local infile %q into table s1", path)) require.Error(t, err) - require.Equal(t, "Error 8154 (HY000): LOAD DATA raises error(s): can only load data into base tables", err.Error()) + require.Equal(t, "Error 1288 (HY000): The target table s1 of the LOAD is not updatable", err.Error()) rs, err1 := dbt.GetDB().Exec(fmt.Sprintf("load data local infile %q into table test", path)) require.NoError(t, err1) From 7857c268af052dbcb900e492b26e983ca684795d Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 24 Feb 2023 18:33:24 +0800 Subject: [PATCH 13/19] fix UT Signed-off-by: lance6716 --- server/server_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/server_test.go b/server/server_test.go index 53903d8002940..4628af181b61a 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -1412,7 +1412,7 @@ func (cli *testServerClient) runTestLoadData(t *testing.T, server *Server) { _, err1 = dbt.GetDB().Exec(fmt.Sprintf(`load data local infile %q into table pn FIELDS TERMINATED BY ','`, path)) mysqlErr, ok := err1.(*mysql.MySQLError) require.True(t, ok) - require.Equal(t, "LOAD DATA raises error(s): mock commit one task error", mysqlErr.Message) + require.Equal(t, "mock commit one task error", mysqlErr.Message) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/commitOneTaskErr")) dbt.MustExec("drop table if exists pn") From 90cae54ba90dcc5bf0e9992e52ea3bdd139c9c7e Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 27 Feb 2023 13:42:53 +0800 Subject: [PATCH 14/19] align with spec Signed-off-by: lance6716 --- errno/errname.go | 2 +- executor/load_data.go | 2 +- executor/loadremotetest/error_test.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/errno/errname.go b/errno/errname.go index 2040988cb6b89..b7272a4710e27 100644 --- a/errno/errname.go +++ b/errno/errname.go @@ -1040,7 +1040,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{ ErrTempTableNotAllowedWithTTL: mysql.Message("Set TTL for temporary table is not allowed", nil), ErrUnsupportedTTLReferencedByFK: mysql.Message("Set TTL for a table referenced by foreign key is not allowed", nil), ErrUnsupportedPrimaryKeyTypeWithTTL: mysql.Message("Unsupported clustered primary key type FLOAT/DOUBLE for TTL", nil), - ErrLoadDataFromServerDisk: mysql.Message("The path of INFILE '%s' needs to specify the parameter of LOCAL first", nil), + ErrLoadDataFromServerDisk: mysql.Message("Don't support load data from tidb-server's disk. Or if you want to load local data via client, the path of INFILE '%s' needs to specify the clause of LOCAL first", nil), ErrLoadParquetFromLocal: mysql.Message("Do not support loading parquet files from local. 
Please try to load the parquet files from the cloud storage", nil), ErrLoadDataEmptyPath: mysql.Message("The value of INFILE must not be empty when LOAD DATA from LOCAL", nil), ErrLoadDataUnsupportedFormat: mysql.Message("The FORMAT '%s' is not supported", nil), diff --git a/executor/load_data.go b/executor/load_data.go index b4d2dc3c3eedb..885bbeede13d5 100644 --- a/executor/load_data.go +++ b/executor/load_data.go @@ -50,7 +50,7 @@ import ( const ( // LoadDataFormatSQLDump represents the data source file of LOAD DATA is // mydumper-format DML file - LoadDataFormatSQLDump = "sqldumpfile" + LoadDataFormatSQLDump = "sql file" // LoadDataFormatParquet represents the data source file of LOAD DATA is // parquet LoadDataFormatParquet = "parquet" diff --git a/executor/loadremotetest/error_test.go b/executor/loadremotetest/error_test.go index 98f1450cbb518..7d17539ef00fe 100644 --- a/executor/loadremotetest/error_test.go +++ b/executor/loadremotetest/error_test.go @@ -75,7 +75,7 @@ func (s *mockGCSSuite) TestErrorMessage() { checkClientErrorMessage(s.T(), err, "ERROR 8157 (HY000): The FORMAT '123' is not supported") err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t.tsv?endpoint=%s' - FORMAT 'sqldumpfile' INTO TABLE t;`, gcsEndpoint)) + FORMAT 'sql file' INTO TABLE t;`, gcsEndpoint)) checkClientErrorMessage(s.T(), err, "ERROR 8160 (HY000): Failed to read source files. Reason: syntax error: unexpected Integer (1) at offset 1, expecting start of row. Only the following formats delimited text file (csv, tsv), parquet, sql are supported. Please provide the valid source file(s)") err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t.tsv?endpoint=%s' From ed6a2d00a5c39004dbe300493e108a07fd9f55ea Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 27 Feb 2023 13:51:52 +0800 Subject: [PATCH 15/19] fix make errdoc Signed-off-by: lance6716 --- errors.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/errors.toml b/errors.toml index 8911c907cd855..bbc2ff8d78192 100644 --- a/errors.toml +++ b/errors.toml @@ -1648,7 +1648,7 @@ transaction aborted because lazy uniqueness check is enabled and an error occurr ["executor:8154"] error = ''' -The path of INFILE '%s' needs to specify the parameter of LOCAL first +Don't support load data from tidb-server's disk. 
Or if you want to load local data via client, the path of INFILE '%s' needs to specify the clause of LOCAL first ''' ["executor:8155"] From 2e6d13c0790cb97ceae9af4e0f9a9f0f095968c6 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 28 Feb 2023 10:36:35 +0800 Subject: [PATCH 16/19] address comment Signed-off-by: lance6716 --- br/pkg/lightning/mydump/csv_parser.go | 2 +- br/pkg/lightning/mydump/csv_parser_test.go | 2 +- executor/errors.go | 2 +- executor/load_data.go | 29 ++++++---------------- executor/loadremotetest/error_test.go | 2 +- executor/utils.go | 19 ++++++++++++++ executor/utils_test.go | 10 ++++++++ 7 files changed, 40 insertions(+), 26 deletions(-) diff --git a/br/pkg/lightning/mydump/csv_parser.go b/br/pkg/lightning/mydump/csv_parser.go index fee616de99fe8..854f63830b468 100644 --- a/br/pkg/lightning/mydump/csv_parser.go +++ b/br/pkg/lightning/mydump/csv_parser.go @@ -137,7 +137,7 @@ func NewCSVParser( if len(cfg.StartingBy) > 0 { if strings.Contains(cfg.StartingBy, terminator) { - return nil, errors.Errorf("STARTING BY '%s' cannot contain TERMINATED BY '%s'", cfg.StartingBy, terminator) + return nil, errors.Errorf("STARTING BY '%s' cannot contain LINES TERMINATED BY '%s'", cfg.StartingBy, terminator) } } diff --git a/br/pkg/lightning/mydump/csv_parser_test.go b/br/pkg/lightning/mydump/csv_parser_test.go index db51d11b9e492..3e002eeef42ab 100644 --- a/br/pkg/lightning/mydump/csv_parser_test.go +++ b/br/pkg/lightning/mydump/csv_parser_test.go @@ -1346,7 +1346,7 @@ yyy",5,xx"xxxx,8 }, } _, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, nil, 1, ioWorkers, false, nil) - require.ErrorContains(t, err, "STARTING BY 'x\nxx' cannot contain TERMINATED BY '\n'") + require.ErrorContains(t, err, "STARTING BY 'x\nxx' cannot contain LINES TERMINATED BY '\n'") } func TestCharsetConversion(t *testing.T) { diff --git a/executor/errors.go b/executor/errors.go index b2c2fd74aa2d5..45c6916bbe146 100644 --- a/executor/errors.go +++ b/executor/errors.go @@ -81,7 +81,7 @@ var ( ErrLoadParquetFromLocal = dbterror.ClassExecutor.NewStd(mysql.ErrLoadParquetFromLocal) ErrLoadDataEmptyPath = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataEmptyPath) ErrLoadDataUnsupportedFormat = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataUnsupportedFormat) - ErrLoadDataURI = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataInvalidURI) + ErrLoadDataInvalidURI = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataInvalidURI) ErrLoadDataCantAccess = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataCantAccess) ErrLoadDataCantRead = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataCantRead) ErrLoadDataWrongFormatConfig = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataWrongFormatConfig) diff --git a/executor/load_data.go b/executor/load_data.go index 885bbeede13d5..2720d182d6b69 100644 --- a/executor/load_data.go +++ b/executor/load_data.go @@ -87,7 +87,7 @@ func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) error { } // TODO: support lines terminated is "". 
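+		// A sketch of a statement the check below rejects (hypothetical table
+		// and file names):
+		//   LOAD DATA INFILE 'data.csv' INTO TABLE t LINES TERMINATED BY '';
+		// The CSV-like code path currently needs a non-empty line terminator
+		// to split rows, so an empty LINES TERMINATED BY fails with
+		// ErrLoadDataWrongFormatConfig until the TODO above is resolved.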
if len(e.loadDataWorker.LinesInfo.Terminated) == 0 {
-		return ErrLoadDataWrongFormatConfig.GenWithStackByArgs("don't support load data terminated is nil")
+		return ErrLoadDataWrongFormatConfig.GenWithStackByArgs("LINES TERMINATED BY is empty")
 		}
 	}
 
@@ -95,13 +95,13 @@ func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) error {
 	case ast.FileLocServerOrRemote:
 		u, err := storage.ParseRawURL(e.loadDataWorker.Path)
 		if err != nil {
-			return ErrLoadDataURI.GenWithStackByArgs(err.Error())
+			return ErrLoadDataInvalidURI.GenWithStackByArgs(err.Error())
 		}
 		var filename string
 		u.Path, filename = filepath.Split(u.Path)
 		b, err := storage.ParseBackendFromURL(u, nil)
 		if err != nil {
-			return ErrLoadDataURI.GenWithStackByArgs(getMsgFromBRError(err))
+			return ErrLoadDataInvalidURI.GenWithStackByArgs(getMsgFromBRError(err))
 		}
 		if b.GetLocal() != nil {
 			return ErrLoadDataFromServerDisk.GenWithStackByArgs(e.loadDataWorker.Path)
@@ -113,29 +113,13 @@ func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) error {
 		val := sctx.Value(LoadDataVarKey)
 		if val != nil {
 			sctx.SetValue(LoadDataVarKey, nil)
-			return ErrLoadDataURI.GenWithStackByArgs("previous load data option wasn't closed normally")
+			return errors.New("previous load data option wasn't closed normally")
 		}
 		sctx.SetValue(LoadDataVarKey, e.loadDataWorker)
 	}
 	return nil
 }
 
-// TODO: add GetMsg() to errors package to replace this function.
-func getMsgFromBRError(err error) string {
-	if err == nil {
-		return ""
-	}
-	if berr, ok := err.(*errors.Error); ok {
-		return berr.GetMsg()
-	}
-	raw := err.Error()
-	berrMsg := errors.Cause(err).Error()
-	if len(raw) <= len(berrMsg)+len(": ") {
-		return raw
-	}
-	return raw[:len(raw)-len(berrMsg)-len(": ")]
-}
-
 func (e *LoadDataExec) loadFromRemote(
 	ctx context.Context,
 	b *backup.StorageBackend,
@@ -477,6 +461,7 @@ func (e *LoadDataWorker) processStream(
 		case <-checkKilled.C:
 			if atomic.CompareAndSwapUint32(&e.Ctx.GetSessionVars().Killed, 1, 0) {
 				logutil.Logger(ctx).Info("load data query interrupted quit data processing")
+				close(e.commitTaskQueue)
 				return ErrQueryInterrupted
 			}
 			goto TrySendTask
@@ -556,11 +541,11 @@ func (e *LoadDataWorker) commitOneTask(ctx context.Context, task commitTask) err
 	return err
 }
 
-// CheckAndInsertOneBatch is used to commit one transaction batch full filled data
+// CheckAndInsertOneBatch checks and inserts one batch of data in a single transaction
 func (e *LoadDataWorker) CheckAndInsertOneBatch(ctx context.Context, rows [][]types.Datum, cnt uint64) error {
 	if e.stats != nil && e.stats.BasicRuntimeStats != nil {
 		// Since this method will not call by executor Next,
-		// so we need record the basic executor runtime stats by ourself.
+		// so we need to record the basic executor runtime stats ourselves.
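+		// The start/defer pair below implements that manual timing: take a
+		// timestamp before the batch is checked and inserted, then charge the
+		// elapsed wall time to BasicRuntimeStats when this method returns.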
start := time.Now() defer func() { e.stats.BasicRuntimeStats.Record(time.Since(start), 0) diff --git a/executor/loadremotetest/error_test.go b/executor/loadremotetest/error_test.go index 7d17539ef00fe..c47ae6f6764be 100644 --- a/executor/loadremotetest/error_test.go +++ b/executor/loadremotetest/error_test.go @@ -82,7 +82,7 @@ func (s *mockGCSSuite) TestErrorMessage() { INTO TABLE t LINES STARTING BY '\n';`, gcsEndpoint)) checkClientErrorMessage(s.T(), err, `ERROR 8162 (HY000): STARTING BY ' -' cannot contain TERMINATED BY ' +' cannot contain LINES TERMINATED BY ' '`) // TODO: fix these tests diff --git a/executor/utils.go b/executor/utils.go index 47fe32a93aa68..4997eb9d277a8 100644 --- a/executor/utils.go +++ b/executor/utils.go @@ -16,6 +16,8 @@ package executor import ( "strings" + + "github.com/pingcap/errors" ) // SetFromString constructs a slice of strings from a comma separated string. @@ -92,3 +94,20 @@ func (b *batchRetrieverHelper) nextBatch(retrieveRange func(start, end int) erro } return nil } + +// TODO: add GetMsg() to errors package to replace this function. +// see TestGetMsgFromBRError for more details. +func getMsgFromBRError(err error) string { + if err == nil { + return "" + } + if berr, ok := err.(*errors.Error); ok { + return berr.GetMsg() + } + raw := err.Error() + berrMsg := errors.Cause(err).Error() + if len(raw) <= len(berrMsg)+len(": ") { + return raw + } + return raw[:len(raw)-len(berrMsg)-len(": ")] +} diff --git a/executor/utils_test.go b/executor/utils_test.go index 3c8a32de25cc5..e0795228141a5 100644 --- a/executor/utils_test.go +++ b/executor/utils_test.go @@ -18,6 +18,7 @@ import ( "testing" "github.com/pingcap/errors" + berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/stretchr/testify/require" ) @@ -93,3 +94,12 @@ func TestBatchRetrieverHelper(t *testing.T) { require.Equal(t, rangeStarts, []int{0}) require.Equal(t, rangeEnds, []int{10}) } + +func TestGetMsgFromBRError(t *testing.T) { + var berr error = berrors.ErrStorageInvalidConfig + require.Equal(t, "[BR:ExternalStorage:ErrStorageInvalidConfig]invalid external storage config", berr.Error()) + require.Equal(t, "invalid external storage config", getMsgFromBRError(berr)) + berr = errors.Annotatef(berr, "some message about error reason") + require.Equal(t, "some message about error reason: [BR:ExternalStorage:ErrStorageInvalidConfig]invalid external storage config", berr.Error()) + require.Equal(t, "some message about error reason", getMsgFromBRError(berr)) +} From 52dc64d1eb61ee9f67bae937e38a4704c13c42d4 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 28 Feb 2023 10:44:50 +0800 Subject: [PATCH 17/19] fix bazel Signed-off-by: lance6716 --- executor/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/executor/BUILD.bazel b/executor/BUILD.bazel index 52d3fc5986b4b..4393a91b32e64 100644 --- a/executor/BUILD.bazel +++ b/executor/BUILD.bazel @@ -347,6 +347,7 @@ go_test( flaky = True, shard_count = 50, deps = [ + "//br/pkg/errors", "//config", "//ddl", "//ddl/placement", From 2bf2dc582eb49a0300ea1f3a35fd54936fd5ccd5 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 28 Feb 2023 11:29:20 +0800 Subject: [PATCH 18/19] fix UT Signed-off-by: lance6716 --- executor/loadremotetest/one_csv_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/loadremotetest/one_csv_test.go b/executor/loadremotetest/one_csv_test.go index 15c4cb4e58e81..cc98e6ff68af8 100644 --- a/executor/loadremotetest/one_csv_test.go +++ b/executor/loadremotetest/one_csv_test.go @@ -81,7 +81,7 @@ 
func (s *mockGCSSuite) TestLoadCSV() { // can't read file at tidb-server sql = "LOAD DATA INFILE '/etc/passwd' INTO TABLE load_csv.t;" - s.tk.MustContainErrMsg(sql, "The path of INFILE '/etc/passwd' needs to specify the parameter of LOCAL first") + s.tk.MustContainErrMsg(sql, "Don't support load data from tidb-server's disk. Or if you want to load local data via client, the path of INFILE '/etc/passwd' needs to specify the clause of LOCAL first") } func (s *mockGCSSuite) TestIgnoreNLines() { From 105807684f0184e1bd5de91c497d6cd3a36abed0 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 28 Feb 2023 15:21:45 +0800 Subject: [PATCH 19/19] fix UT Signed-off-by: lance6716 --- executor/loadremotetest/one_sqldump_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/loadremotetest/one_sqldump_test.go b/executor/loadremotetest/one_sqldump_test.go index ec320787e3b05..e2755c8f0d74b 100644 --- a/executor/loadremotetest/one_sqldump_test.go +++ b/executor/loadremotetest/one_sqldump_test.go @@ -36,7 +36,7 @@ func (s *mockGCSSuite) TestLoadSQLDump() { }) sql := fmt.Sprintf(`LOAD DATA INFILE 'gs://test-load-parquet/p?endpoint=%s' - FORMAT 'SQLDumpFile' INTO TABLE load_csv.t;`, gcsEndpoint) + FORMAT 'SQL file' INTO TABLE load_csv.t;`, gcsEndpoint) s.tk.MustExec(sql) s.tk.MustQuery("SELECT * FROM load_csv.t;").Check(testkit.Rows( "1 a",