From feb6c6d52c440ef5119559e39c4ee85dadf36069 Mon Sep 17 00:00:00 2001 From: CaiXiaoJian Date: Wed, 29 Aug 2018 06:09:02 +0000 Subject: [PATCH 1/5] support load data with ignore lines --- ast/dml.go | 13 +++++++------ executor/builder.go | 1 + executor/executor_test.go | 2 ++ executor/load_data.go | 19 ++++++++++++------- executor/write_test.go | 21 +++++++++++++++++++++ parser/parser.y | 15 +++++++++++++-- parser/parser_test.go | 4 ++++ plan/common_plans.go | 13 +++++++------ plan/planbuilder.go | 13 +++++++------ 9 files changed, 74 insertions(+), 27 deletions(-) diff --git a/ast/dml.go b/ast/dml.go index 1cb75897b0e73..d3f6575dee0fe 100644 --- a/ast/dml.go +++ b/ast/dml.go @@ -661,12 +661,13 @@ func (n *Assignment) Accept(v Visitor) (Node, bool) { type LoadDataStmt struct { dmlNode - IsLocal bool - Path string - Table *TableName - Columns []*ColumnName - FieldsInfo *FieldsClause - LinesInfo *LinesClause + IsLocal bool + Path string + Table *TableName + Columns []*ColumnName + FieldsInfo *FieldsClause + LinesInfo *LinesClause + IgnoreLines uint64 } // Accept implements Node Accept interface. 
diff --git a/executor/builder.go b/executor/builder.go index aed3694fdde3a..7728db8e0e3bb 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -564,6 +564,7 @@ func (b *executorBuilder) buildLoadData(v *plan.LoadData) Executor { Table: tbl, FieldsInfo: v.FieldsInfo, LinesInfo: v.LinesInfo, + IgnoreLines: v.IgnoreLines, Ctx: b.ctx, columns: columns, }, diff --git a/executor/executor_test.go b/executor/executor_test.go index 926a3e3f40afc..826f1d68bfc33 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -292,7 +292,9 @@ type testCase struct { func checkCases(tests []testCase, ld *executor.LoadDataInfo, c *C, tk *testkit.TestKit, ctx sessionctx.Context, selectSQL, deleteSQL string) { + origin := ld.IgnoreLines for _, tt := range tests { + ld.IgnoreLines = origin c.Assert(ctx.NewTxn(), IsNil) ctx.GetSessionVars().StmtCtx.DupKeyAsWarning = true ctx.GetSessionVars().StmtCtx.BadNullAsWarning = true diff --git a/executor/load_data.go b/executor/load_data.go index afe7b61f5bce3..46e9eb99c9c0a 100644 --- a/executor/load_data.go +++ b/executor/load_data.go @@ -88,13 +88,14 @@ func (e *LoadDataExec) Open(ctx context.Context) error { type LoadDataInfo struct { *InsertValues - row []types.Datum - Path string - Table table.Table - FieldsInfo *ast.FieldsClause - LinesInfo *ast.LinesClause - Ctx sessionctx.Context - columns []*table.Column + row []types.Datum + Path string + Table table.Table + FieldsInfo *ast.FieldsClause + LinesInfo *ast.LinesClause + IgnoreLines uint64 + Ctx sessionctx.Context + columns []*table.Column } // SetMaxRowsInBatch sets the max number of rows to insert in a batch. 
@@ -235,6 +236,10 @@ func (e *LoadDataInfo) InsertData(prevData, curData []byte) ([]byte, bool, error curData = nil } + if e.IgnoreLines > 0 { + e.IgnoreLines-- + continue + } cols, err := e.getFieldsFromLine(line) if err != nil { return nil, false, errors.Trace(err) diff --git a/executor/write_test.go b/executor/write_test.go index 30fa336e9d296..a1a77188c0e0d 100644 --- a/executor/write_test.go +++ b/executor/write_test.go @@ -1635,6 +1635,27 @@ func (s *testSuite) TestLoadDataSpecifiedColumns(c *C) { checkCases(tests, ld, c, tk, ctx, selectSQL, deleteSQL) } +func (s *testSuite) TestLoadDataIgnoreLines(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test; drop table if exists load_data_test;") + tk.MustExec("CREATE TABLE load_data_test (id INT NOT NULL PRIMARY KEY, value TEXT NOT NULL) CHARACTER SET utf8") + tk.MustExec("load data local infile '/tmp/nonexistence.csv' into table load_data_test ignore 1 lines") + ctx := tk.Se.(sessionctx.Context) + ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataInfo) + c.Assert(ok, IsTrue) + defer ctx.SetValue(executor.LoadDataVarKey, nil) + c.Assert(ld, NotNil) + ld.IgnoreLines = 1 + // test escape + tests := []testCase{ + {nil, []byte("1\tline1\n2\tline2\n"), []string{"2|line2"}, nil}, + {nil, []byte("1\tline1\n2\tline2\n3\tline3\n"), []string{"2|line2", "3|line3"}, nil}, + } + deleteSQL := "delete from load_data_test" + selectSQL := "select * from load_data_test;" + checkCases(tests, ld, c, tk, ctx, selectSQL, deleteSQL) +} + func (s *testSuite) TestBatchInsertDelete(c *C) { originLimit := atomic.LoadUint64(&kv.TxnEntryCountLimit) defer func() { diff --git a/parser/parser.y b/parser/parser.y index 1c7c8ca0e3e45..e844369ac3bca 100644 --- a/parser/parser.y +++ b/parser/parser.y @@ -809,6 +809,7 @@ import ( TableOptimizerHintOpt "Table level optimizer hint" TableOptimizerHints "Table level optimizer hints" TableOptimizerHintList "Table level optimizer hint list" + IgnoreLines "Ignore num(int) 
lines" %type AsOpt "AS or EmptyString" @@ -6883,12 +6884,13 @@ RevokeStmt: * See https://dev.mysql.com/doc/refman/5.7/en/load-data.html *******************************************************************************************/ LoadDataStmt: - "LOAD" "DATA" LocalOpt "INFILE" stringLit "INTO" "TABLE" TableName CharsetOpt Fields Lines ColumnNameListOptWithBrackets + "LOAD" "DATA" LocalOpt "INFILE" stringLit "INTO" "TABLE" TableName CharsetOpt Fields Lines IgnoreLines ColumnNameListOptWithBrackets { x := &ast.LoadDataStmt{ Path: $5, Table: $8.(*ast.TableName), - Columns: $12.([]*ast.ColumnName), + Columns: $13.([]*ast.ColumnName), + IgnoreLines:$12.(uint64), } if $3 != nil { x.IsLocal = true @@ -6902,6 +6904,15 @@ LoadDataStmt: $$ = x } +IgnoreLines: + { + $$ = uint64(0) + } +| "IGNORE" NUM "LINES" + { + $$ = getUint64FromNUM($2) + } + CharsetOpt: {} | "CHARACTER" "SET" CharsetName diff --git a/parser/parser_test.go b/parser/parser_test.go index b74b43f438687..ce846a4cf07be 100644 --- a/parser/parser_test.go +++ b/parser/parser_test.go @@ -389,6 +389,10 @@ func (s *testParserSuite) TestDMLStmt(c *C) { {"load data local infile '/tmp/t.csv' into table t character set utf8 fields terminated by 'ab' lines terminated by 'xy' (a,b)", true}, {"load data local infile '/tmp/t.csv' into table t fields terminated by 'ab' lines terminated by 'xy' (a,b)", true}, {"load data local infile '/tmp/t.csv' into table t (a,b) fields terminated by 'ab'", false}, + {"load data local infile '/tmp/t.csv' into table t ignore 1 lines", true}, + {"load data local infile '/tmp/t.csv' into table t fields terminated by 'ab' enclosed by 'b' (a,b) ignore 1 lines", false}, + {"load data local infile '/tmp/t.csv' into table t lines starting by 'ab' terminated by 'xy' ignore 1 lines", true}, + {"load data local infile '/tmp/t.csv' into table t fields terminated by 'ab' enclosed by 'b' escaped by '*' ignore 1 lines (a,b)", true}, // select for update {"SELECT * from t for update", true}, diff --git 
a/plan/common_plans.go b/plan/common_plans.go index fafd419a3b62d..75d796efc5803 100644 --- a/plan/common_plans.go +++ b/plan/common_plans.go @@ -381,12 +381,13 @@ type Analyze struct { type LoadData struct { baseSchemaProducer - IsLocal bool - Path string - Table *ast.TableName - Columns []*ast.ColumnName - FieldsInfo *ast.FieldsClause - LinesInfo *ast.LinesClause + IsLocal bool + Path string + Table *ast.TableName + Columns []*ast.ColumnName + FieldsInfo *ast.FieldsClause + LinesInfo *ast.LinesClause + IgnoreLines uint64 GenCols InsertGeneratedColumns } diff --git a/plan/planbuilder.go b/plan/planbuilder.go index 3b64ffcb3f164..78a620a8b283f 100644 --- a/plan/planbuilder.go +++ b/plan/planbuilder.go @@ -1257,12 +1257,13 @@ func (b *planBuilder) buildSelectPlanOfInsert(insert *ast.InsertStmt, insertPlan func (b *planBuilder) buildLoadData(ld *ast.LoadDataStmt) (Plan, error) { p := &LoadData{ - IsLocal: ld.IsLocal, - Path: ld.Path, - Table: ld.Table, - Columns: ld.Columns, - FieldsInfo: ld.FieldsInfo, - LinesInfo: ld.LinesInfo, + IsLocal: ld.IsLocal, + Path: ld.Path, + Table: ld.Table, + Columns: ld.Columns, + FieldsInfo: ld.FieldsInfo, + LinesInfo: ld.LinesInfo, + IgnoreLines: ld.IgnoreLines, } tableInfo := p.Table.TableInfo tableInPlan, ok := b.is.TableByID(tableInfo.ID) From ba95d82977d76c92b963661f0c516f832326170e Mon Sep 17 00:00:00 2001 From: caixiaojian Date: Sat, 1 Sep 2018 17:46:54 +0800 Subject: [PATCH 2/5] remove useless code --- executor/write_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/executor/write_test.go b/executor/write_test.go index a1a77188c0e0d..5a49c1ec07d05 100644 --- a/executor/write_test.go +++ b/executor/write_test.go @@ -1645,7 +1645,6 @@ func (s *testSuite) TestLoadDataIgnoreLines(c *C) { c.Assert(ok, IsTrue) defer ctx.SetValue(executor.LoadDataVarKey, nil) c.Assert(ld, NotNil) - ld.IgnoreLines = 1 // test escape tests := []testCase{ {nil, []byte("1\tline1\n2\tline2\n"), []string{"2|line2"}, nil}, From 
594586d635bfa24cc92d6635a4c41049186fa3d9 Mon Sep 17 00:00:00 2001 From: caixiaojian Date: Sun, 2 Sep 2018 17:04:52 +0800 Subject: [PATCH 3/5] remove useless comment --- executor/write_test.go | 1 - parser/parser.y | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/executor/write_test.go b/executor/write_test.go index 5a49c1ec07d05..9427972d83f2e 100644 --- a/executor/write_test.go +++ b/executor/write_test.go @@ -1645,7 +1645,6 @@ func (s *testSuite) TestLoadDataIgnoreLines(c *C) { c.Assert(ok, IsTrue) defer ctx.SetValue(executor.LoadDataVarKey, nil) c.Assert(ld, NotNil) - // test escape tests := []testCase{ {nil, []byte("1\tline1\n2\tline2\n"), []string{"2|line2"}, nil}, {nil, []byte("1\tline1\n2\tline2\n3\tline3\n"), []string{"2|line2", "3|line3"}, nil}, diff --git a/parser/parser.y b/parser/parser.y index e844369ac3bca..216be0791b7dd 100644 --- a/parser/parser.y +++ b/parser/parser.y @@ -802,6 +802,7 @@ import ( OptBinMod "Optional BINARY mode" OptCharset "Optional Character setting" OptCollate "Optional Collate setting" + IgnoreLines "Ignore num(int) lines" NUM "A number" NumList "Some numbers" LengthNum "Field length num(uint64)" @@ -809,7 +810,6 @@ import ( TableOptimizerHintOpt "Table level optimizer hint" TableOptimizerHints "Table level optimizer hints" TableOptimizerHintList "Table level optimizer hint list" - IgnoreLines "Ignore num(int) lines" %type AsOpt "AS or EmptyString" From 0abd9c0971602a97b63f00286aba98facbc1b714 Mon Sep 17 00:00:00 2001 From: xiaojian cai Date: Mon, 3 Sep 2018 15:34:17 +0800 Subject: [PATCH 4/5] keep alignment --- parser/parser.y | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parser/parser.y b/parser/parser.y index 216be0791b7dd..90f1175fef7b4 100644 --- a/parser/parser.y +++ b/parser/parser.y @@ -802,7 +802,7 @@ import ( OptBinMod "Optional BINARY mode" OptCharset "Optional Character setting" OptCollate "Optional Collate setting" - IgnoreLines "Ignore num(int) lines" + IgnoreLines "Ignore 
num(int) lines" NUM "A number" NumList "Some numbers" LengthNum "Field length num(uint64)" From 51680c45b3cb5dd81fddade39e68072071178ed1 Mon Sep 17 00:00:00 2001 From: xiaojian cai Date: Tue, 4 Sep 2018 21:10:23 +0800 Subject: [PATCH 5/5] add test for negative num lines --- parser/parser_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/parser/parser_test.go b/parser/parser_test.go index ce846a4cf07be..72a491452326a 100644 --- a/parser/parser_test.go +++ b/parser/parser_test.go @@ -390,6 +390,7 @@ func (s *testParserSuite) TestDMLStmt(c *C) { {"load data local infile '/tmp/t.csv' into table t fields terminated by 'ab' lines terminated by 'xy' (a,b)", true}, {"load data local infile '/tmp/t.csv' into table t (a,b) fields terminated by 'ab'", false}, {"load data local infile '/tmp/t.csv' into table t ignore 1 lines", true}, + {"load data local infile '/tmp/t.csv' into table t ignore -1 lines", false}, {"load data local infile '/tmp/t.csv' into table t fields terminated by 'ab' enclosed by 'b' (a,b) ignore 1 lines", false}, {"load data local infile '/tmp/t.csv' into table t lines starting by 'ab' terminated by 'xy' ignore 1 lines", true}, {"load data local infile '/tmp/t.csv' into table t fields terminated by 'ab' enclosed by 'b' escaped by '*' ignore 1 lines (a,b)", true},