diff --git a/ast/dml.go b/ast/dml.go index 1cb75897b0e73..d3f6575dee0fe 100644 --- a/ast/dml.go +++ b/ast/dml.go @@ -661,12 +661,13 @@ func (n *Assignment) Accept(v Visitor) (Node, bool) { type LoadDataStmt struct { dmlNode - IsLocal bool - Path string - Table *TableName - Columns []*ColumnName - FieldsInfo *FieldsClause - LinesInfo *LinesClause + IsLocal bool + Path string + Table *TableName + Columns []*ColumnName + FieldsInfo *FieldsClause + LinesInfo *LinesClause + IgnoreLines uint64 } // Accept implements Node Accept interface. diff --git a/executor/builder.go b/executor/builder.go index f371b0a21da4f..ceb0245c8a17f 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -564,6 +564,7 @@ func (b *executorBuilder) buildLoadData(v *plan.LoadData) Executor { Table: tbl, FieldsInfo: v.FieldsInfo, LinesInfo: v.LinesInfo, + IgnoreLines: v.IgnoreLines, Ctx: b.ctx, columns: columns, }, diff --git a/executor/executor_test.go b/executor/executor_test.go index 039bb2cdb53d0..6ab4b4bbcb755 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -292,7 +292,9 @@ type testCase struct { func checkCases(tests []testCase, ld *executor.LoadDataInfo, c *C, tk *testkit.TestKit, ctx sessionctx.Context, selectSQL, deleteSQL string) { + origin := ld.IgnoreLines for _, tt := range tests { + ld.IgnoreLines = origin c.Assert(ctx.NewTxn(), IsNil) ctx.GetSessionVars().StmtCtx.DupKeyAsWarning = true ctx.GetSessionVars().StmtCtx.BadNullAsWarning = true diff --git a/executor/load_data.go b/executor/load_data.go index afe7b61f5bce3..46e9eb99c9c0a 100644 --- a/executor/load_data.go +++ b/executor/load_data.go @@ -88,13 +88,14 @@ func (e *LoadDataExec) Open(ctx context.Context) error { type LoadDataInfo struct { *InsertValues - row []types.Datum - Path string - Table table.Table - FieldsInfo *ast.FieldsClause - LinesInfo *ast.LinesClause - Ctx sessionctx.Context - columns []*table.Column + row []types.Datum + Path string + Table table.Table + FieldsInfo *ast.FieldsClause + LinesInfo *ast.LinesClause + IgnoreLines uint64 + Ctx sessionctx.Context + columns []*table.Column } // SetMaxRowsInBatch sets the max number of rows to insert in a batch. @@ -235,6 +236,10 @@ func (e *LoadDataInfo) InsertData(prevData, curData []byte) ([]byte, bool, error curData = nil } + if e.IgnoreLines > 0 { + e.IgnoreLines-- + continue + } cols, err := e.getFieldsFromLine(line) if err != nil { return nil, false, errors.Trace(err) diff --git a/executor/write_test.go b/executor/write_test.go index 30fa336e9d296..9427972d83f2e 100644 --- a/executor/write_test.go +++ b/executor/write_test.go @@ -1635,6 +1635,25 @@ func (s *testSuite) TestLoadDataSpecifiedColumns(c *C) { checkCases(tests, ld, c, tk, ctx, selectSQL, deleteSQL) } +func (s *testSuite) TestLoadDataIgnoreLines(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test; drop table if exists load_data_test;") + tk.MustExec("CREATE TABLE load_data_test (id INT NOT NULL PRIMARY KEY, value TEXT NOT NULL) CHARACTER SET utf8") + tk.MustExec("load data local infile '/tmp/nonexistence.csv' into table load_data_test ignore 1 lines") + ctx := tk.Se.(sessionctx.Context) + ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataInfo) + c.Assert(ok, IsTrue) + defer ctx.SetValue(executor.LoadDataVarKey, nil) + c.Assert(ld, NotNil) + tests := []testCase{ + {nil, []byte("1\tline1\n2\tline2\n"), []string{"2|line2"}, nil}, + {nil, []byte("1\tline1\n2\tline2\n3\tline3\n"), []string{"2|line2", "3|line3"}, nil}, + } + deleteSQL := "delete from load_data_test" + selectSQL := "select * from load_data_test;" + checkCases(tests, ld, c, tk, ctx, selectSQL, deleteSQL) +} + func (s *testSuite) TestBatchInsertDelete(c *C) { originLimit := atomic.LoadUint64(&kv.TxnEntryCountLimit) defer func() { diff --git a/parser/parser.y b/parser/parser.y index a45a619322056..49ba524035805 100644 --- a/parser/parser.y +++ b/parser/parser.y @@ -804,6 +804,7 @@ import ( OptBinMod "Optional BINARY mode" OptCharset "Optional Character setting" OptCollate "Optional Collate setting" + IgnoreLines "Ignore num(int) lines" NUM "A number" NumList "Some numbers" LengthNum "Field length num(uint64)" @@ -6894,12 +6895,13 @@ RevokeStmt: * See https://dev.mysql.com/doc/refman/5.7/en/load-data.html *******************************************************************************************/ LoadDataStmt: - "LOAD" "DATA" LocalOpt "INFILE" stringLit "INTO" "TABLE" TableName CharsetOpt Fields Lines ColumnNameListOptWithBrackets + "LOAD" "DATA" LocalOpt "INFILE" stringLit "INTO" "TABLE" TableName CharsetOpt Fields Lines IgnoreLines ColumnNameListOptWithBrackets { x := &ast.LoadDataStmt{ Path: $5, Table: $8.(*ast.TableName), - Columns: $12.([]*ast.ColumnName), + Columns: $13.([]*ast.ColumnName), + IgnoreLines:$12.(uint64), } if $3 != nil { x.IsLocal = true @@ -6913,6 +6915,15 @@ LoadDataStmt: $$ = x } +IgnoreLines: + { + $$ = uint64(0) + } +| "IGNORE" NUM "LINES" + { + $$ = getUint64FromNUM($2) + } + CharsetOpt: {} | "CHARACTER" "SET" CharsetName diff --git a/parser/parser_test.go b/parser/parser_test.go index 63516a6df5f1e..771b96f4ed063 100644 --- a/parser/parser_test.go +++ b/parser/parser_test.go @@ -389,6 +389,11 @@ func (s *testParserSuite) TestDMLStmt(c *C) { {"load data local infile '/tmp/t.csv' into table t character set utf8 fields terminated by 'ab' lines terminated by 'xy' (a,b)", true}, {"load data local infile '/tmp/t.csv' into table t fields terminated by 'ab' lines terminated by 'xy' (a,b)", true}, {"load data local infile '/tmp/t.csv' into table t (a,b) fields terminated by 'ab'", false}, + {"load data local infile '/tmp/t.csv' into table t ignore 1 lines", true}, + {"load data local infile '/tmp/t.csv' into table t ignore -1 lines", false}, + {"load data local infile '/tmp/t.csv' into table t fields terminated by 'ab' enclosed by 'b' (a,b) ignore 1 lines", false}, + {"load data local infile '/tmp/t.csv' into table t lines starting by 'ab' terminated by 'xy' ignore 1 lines", true}, + {"load data local infile '/tmp/t.csv' into table t fields terminated by 'ab' enclosed by 'b' escaped by '*' ignore 1 lines (a,b)", true}, // select for update {"SELECT * from t for update", true}, diff --git a/plan/common_plans.go b/plan/common_plans.go index 92c60c866a4c0..84b8cfcadd0d5 100644 --- a/plan/common_plans.go +++ b/plan/common_plans.go @@ -382,12 +382,13 @@ type Analyze struct { type LoadData struct { baseSchemaProducer - IsLocal bool - Path string - Table *ast.TableName - Columns []*ast.ColumnName - FieldsInfo *ast.FieldsClause - LinesInfo *ast.LinesClause + IsLocal bool + Path string + Table *ast.TableName + Columns []*ast.ColumnName + FieldsInfo *ast.FieldsClause + LinesInfo *ast.LinesClause + IgnoreLines uint64 GenCols InsertGeneratedColumns } diff --git a/plan/planbuilder.go b/plan/planbuilder.go index 57caef11e572d..199f76eaffeba 100644 --- a/plan/planbuilder.go +++ b/plan/planbuilder.go @@ -1268,12 +1268,13 @@ func (b *planBuilder) buildSelectPlanOfInsert(insert *ast.InsertStmt, insertPlan func (b *planBuilder) buildLoadData(ld *ast.LoadDataStmt) (Plan, error) { p := &LoadData{ - IsLocal: ld.IsLocal, - Path: ld.Path, - Table: ld.Table, - Columns: ld.Columns, - FieldsInfo: ld.FieldsInfo, - LinesInfo: ld.LinesInfo, + IsLocal: ld.IsLocal, + Path: ld.Path, + Table: ld.Table, + Columns: ld.Columns, + FieldsInfo: ld.FieldsInfo, + LinesInfo: ld.LinesInfo, + IgnoreLines: ld.IgnoreLines, } tableInfo := p.Table.TableInfo tableInPlan, ok := b.is.TableByID(tableInfo.ID)