Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: support load data with ignore lines #7576

Merged
merged 7 commits into from
Sep 6, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions ast/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,12 +661,13 @@ func (n *Assignment) Accept(v Visitor) (Node, bool) {
type LoadDataStmt struct {
dmlNode

IsLocal bool
Path string
Table *TableName
Columns []*ColumnName
FieldsInfo *FieldsClause
LinesInfo *LinesClause
IsLocal bool
Path string
Table *TableName
Columns []*ColumnName
FieldsInfo *FieldsClause
LinesInfo *LinesClause
IgnoreLines uint64
}

// Accept implements Node Accept interface.
Expand Down
1 change: 1 addition & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,7 @@ func (b *executorBuilder) buildLoadData(v *plan.LoadData) Executor {
Table: tbl,
FieldsInfo: v.FieldsInfo,
LinesInfo: v.LinesInfo,
IgnoreLines: v.IgnoreLines,
Ctx: b.ctx,
columns: columns,
},
Expand Down
2 changes: 2 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,9 @@ type testCase struct {

func checkCases(tests []testCase, ld *executor.LoadDataInfo,
c *C, tk *testkit.TestKit, ctx sessionctx.Context, selectSQL, deleteSQL string) {
origin := ld.IgnoreLines
for _, tt := range tests {
ld.IgnoreLines = origin
c.Assert(ctx.NewTxn(), IsNil)
ctx.GetSessionVars().StmtCtx.DupKeyAsWarning = true
ctx.GetSessionVars().StmtCtx.BadNullAsWarning = true
Expand Down
19 changes: 12 additions & 7 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,14 @@ func (e *LoadDataExec) Open(ctx context.Context) error {
type LoadDataInfo struct {
*InsertValues

row []types.Datum
Path string
Table table.Table
FieldsInfo *ast.FieldsClause
LinesInfo *ast.LinesClause
Ctx sessionctx.Context
columns []*table.Column
row []types.Datum
Path string
Table table.Table
FieldsInfo *ast.FieldsClause
LinesInfo *ast.LinesClause
IgnoreLines uint64
Ctx sessionctx.Context
columns []*table.Column
}

// SetMaxRowsInBatch sets the max number of rows to insert in a batch.
Expand Down Expand Up @@ -235,6 +236,10 @@ func (e *LoadDataInfo) InsertData(prevData, curData []byte) ([]byte, bool, error
curData = nil
}

if e.IgnoreLines > 0 {
e.IgnoreLines--
continue
}
cols, err := e.getFieldsFromLine(line)
if err != nil {
return nil, false, errors.Trace(err)
Expand Down
19 changes: 19 additions & 0 deletions executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1635,6 +1635,25 @@ func (s *testSuite) TestLoadDataSpecifiedColumns(c *C) {
checkCases(tests, ld, c, tk, ctx, selectSQL, deleteSQL)
}

func (s *testSuite) TestLoadDataIgnoreLines(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test; drop table if exists load_data_test;")
tk.MustExec("CREATE TABLE load_data_test (id INT NOT NULL PRIMARY KEY, value TEXT NOT NULL) CHARACTER SET utf8")
tk.MustExec("load data local infile '/tmp/nonexistence.csv' into table load_data_test ignore 1 lines")
ctx := tk.Se.(sessionctx.Context)
ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataInfo)
c.Assert(ok, IsTrue)
defer ctx.SetValue(executor.LoadDataVarKey, nil)
c.Assert(ld, NotNil)
tests := []testCase{
{nil, []byte("1\tline1\n2\tline2\n"), []string{"2|line2"}, nil},
{nil, []byte("1\tline1\n2\tline2\n3\tline3\n"), []string{"2|line2", "3|line3"}, nil},
}
deleteSQL := "delete from load_data_test"
selectSQL := "select * from load_data_test;"
checkCases(tests, ld, c, tk, ctx, selectSQL, deleteSQL)
}

func (s *testSuite) TestBatchInsertDelete(c *C) {
originLimit := atomic.LoadUint64(&kv.TxnEntryCountLimit)
defer func() {
Expand Down
15 changes: 13 additions & 2 deletions parser/parser.y
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,7 @@ import (
OptBinMod "Optional BINARY mode"
OptCharset "Optional Character setting"
OptCollate "Optional Collate setting"
IgnoreLines "Ignore num(int) lines"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please keep alignment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.

NUM "A number"
NumList "Some numbers"
LengthNum "Field length num(uint64)"
Expand Down Expand Up @@ -6883,12 +6884,13 @@ RevokeStmt:
* See https://dev.mysql.com/doc/refman/5.7/en/load-data.html
*******************************************************************************************/
LoadDataStmt:
"LOAD" "DATA" LocalOpt "INFILE" stringLit "INTO" "TABLE" TableName CharsetOpt Fields Lines ColumnNameListOptWithBrackets
"LOAD" "DATA" LocalOpt "INFILE" stringLit "INTO" "TABLE" TableName CharsetOpt Fields Lines IgnoreLines ColumnNameListOptWithBrackets
{
x := &ast.LoadDataStmt{
Path: $5,
Table: $8.(*ast.TableName),
Columns: $12.([]*ast.ColumnName),
Columns: $13.([]*ast.ColumnName),
IgnoreLines:$12.(uint64),
}
if $3 != nil {
x.IsLocal = true
Expand All @@ -6902,6 +6904,15 @@ LoadDataStmt:
$$ = x
}

IgnoreLines:
{
$$ = uint64(0)
}
| "IGNORE" NUM "LINES"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In MySQL, a negative number will casue syntax error. Could you add a test case about this?

mysql> load data local infile '/tmp/t.csv' into table t ignore -1 lines;
ERROR 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '-1 lines' at line 1
mysql> load data local infile '/tmp/t.csv' into table t ignore 1 lines;
ERROR 1148 (42000): The used command is not allowed with this MySQL version
mysql> quit

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

testcase added.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it.

{
$$ = getUint64FromNUM($2)
}

CharsetOpt:
{}
| "CHARACTER" "SET" CharsetName
Expand Down
4 changes: 4 additions & 0 deletions parser/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,10 @@ func (s *testParserSuite) TestDMLStmt(c *C) {
{"load data local infile '/tmp/t.csv' into table t character set utf8 fields terminated by 'ab' lines terminated by 'xy' (a,b)", true},
{"load data local infile '/tmp/t.csv' into table t fields terminated by 'ab' lines terminated by 'xy' (a,b)", true},
{"load data local infile '/tmp/t.csv' into table t (a,b) fields terminated by 'ab'", false},
{"load data local infile '/tmp/t.csv' into table t ignore 1 lines", true},
{"load data local infile '/tmp/t.csv' into table t fields terminated by 'ab' enclosed by 'b' (a,b) ignore 1 lines", false},
{"load data local infile '/tmp/t.csv' into table t lines starting by 'ab' terminated by 'xy' ignore 1 lines", true},
{"load data local infile '/tmp/t.csv' into table t fields terminated by 'ab' enclosed by 'b' escaped by '*' ignore 1 lines (a,b)", true},

// select for update
{"SELECT * from t for update", true},
Expand Down
13 changes: 7 additions & 6 deletions plan/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,12 +381,13 @@ type Analyze struct {
type LoadData struct {
baseSchemaProducer

IsLocal bool
Path string
Table *ast.TableName
Columns []*ast.ColumnName
FieldsInfo *ast.FieldsClause
LinesInfo *ast.LinesClause
IsLocal bool
Path string
Table *ast.TableName
Columns []*ast.ColumnName
FieldsInfo *ast.FieldsClause
LinesInfo *ast.LinesClause
IgnoreLines uint64

GenCols InsertGeneratedColumns
}
Expand Down
13 changes: 7 additions & 6 deletions plan/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1257,12 +1257,13 @@ func (b *planBuilder) buildSelectPlanOfInsert(insert *ast.InsertStmt, insertPlan

func (b *planBuilder) buildLoadData(ld *ast.LoadDataStmt) (Plan, error) {
p := &LoadData{
IsLocal: ld.IsLocal,
Path: ld.Path,
Table: ld.Table,
Columns: ld.Columns,
FieldsInfo: ld.FieldsInfo,
LinesInfo: ld.LinesInfo,
IsLocal: ld.IsLocal,
Path: ld.Path,
Table: ld.Table,
Columns: ld.Columns,
FieldsInfo: ld.FieldsInfo,
LinesInfo: ld.LinesInfo,
IgnoreLines: ld.IgnoreLines,
}
tableInfo := p.Table.TableInfo
tableInPlan, ok := b.is.TableByID(tableInfo.ID)
Expand Down