From d8b52b82a860bff0ec857390fc1137e26e2262d0 Mon Sep 17 00:00:00 2001 From: lysu Date: Tue, 23 Oct 2018 18:07:54 +0800 Subject: [PATCH] executor: improve wide table insert & update performance (#7935) --- executor/builder.go | 13 +++++--- executor/insert.go | 10 ++---- executor/insert_common.go | 67 ++++++++++++++++++++++++++------------- executor/load_data.go | 7 ++-- executor/replace.go | 10 ++---- executor/update.go | 10 ++++-- 6 files changed, 70 insertions(+), 47 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index c7c53aae3157c..de3a0b3c60256 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -548,6 +548,11 @@ func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor { hasRefCols: v.NeedFillDefaultValue, SelectExec: selectExec, } + err := ivs.initInsertColumns() + if err != nil { + b.err = err + return nil + } if v.IsReplace { return b.buildReplace(ivs) @@ -572,17 +577,16 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor { GenColumns: v.GenCols.Columns, GenExprs: v.GenCols.Exprs, } - tableCols := tbl.Cols() - columns, err := insertVal.getColumns(tableCols) + err := insertVal.initInsertColumns() if err != nil { - b.err = errors.Trace(err) + b.err = err return nil } loadDataExec := &LoadDataExec{ baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID()), IsLocal: v.IsLocal, loadDataInfo: &LoadDataInfo{ - row: make([]types.Datum, len(columns)), + row: make([]types.Datum, len(insertVal.insertColumns)), InsertValues: insertVal, Path: v.Path, Table: tbl, @@ -590,7 +594,6 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor { LinesInfo: v.LinesInfo, IgnoreLines: v.IgnoreLines, Ctx: b.ctx, - columns: columns, }, } diff --git a/executor/insert.go b/executor/insert.go index 932a2c09ce326..a62a2524c8003 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -129,15 +129,10 @@ func (e *InsertExec) batchUpdateDupRows(newRows [][]types.Datum) error { // Next implements Exec Next interface. func (e *InsertExec) Next(ctx context.Context, chk *chunk.Chunk) error { chk.Reset() - cols, err := e.getColumns(e.Table.Cols()) - if err != nil { - return errors.Trace(err) - } - if len(e.children) > 0 && e.children[0] != nil { - return errors.Trace(e.insertRowsFromSelect(ctx, cols, e.exec)) + return e.insertRowsFromSelect(ctx, e.exec) } - return errors.Trace(e.insertRows(cols, e.exec)) + return e.insertRows(e.exec) } // Close implements the Executor Close interface. @@ -154,6 +149,7 @@ func (e *InsertExec) Open(ctx context.Context) error { if e.SelectExec != nil { return e.SelectExec.Open(ctx) } + e.initEvalBuffer() return nil } diff --git a/executor/insert_common.go b/executor/insert_common.go index 8e53f899d72c5..a76b2f5272e96 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -50,9 +50,13 @@ type InsertValues struct { GenColumns []*ast.ColumnName GenExprs []expression.Expression + insertColumns []*table.Column + // colDefaultVals is used to store casted default value. // Because not every insert statement needs colDefaultVals, so we will init the buffer lazily. - colDefaultVals []defaultVal + colDefaultVals []defaultVal + evalBuffer chunk.MutRow + evalBufferTypes []*types.FieldType } type defaultVal struct { @@ -61,16 +65,18 @@ type defaultVal struct { valid bool } -// getColumns gets the explicitly specified columns of an insert statement. There are three cases: +// initInsertColumns sets the explicitly specified columns of an insert statement. There are three cases: // There are three types of insert statements: // 1 insert ... values(...) --> name type column // 2 insert ... set x=y... --> set type column // 3 insert ... (select ..) --> name type column // See https://dev.mysql.com/doc/refman/5.7/en/insert.html -func (e *InsertValues) getColumns(tableCols []*table.Column) ([]*table.Column, error) { +func (e *InsertValues) initInsertColumns() error { var cols []*table.Column var err error + tableCols := e.Table.Cols() + if len(e.SetList) > 0 { // Process `set` type column. columns := make([]string, 0, len(e.SetList)) @@ -82,10 +88,10 @@ func (e *InsertValues) getColumns(tableCols []*table.Column) ([]*table.Column, e } cols, err = table.FindCols(tableCols, columns, e.Table.Meta().PKIsHandle) if err != nil { - return nil, errors.Errorf("INSERT INTO %s: %s", e.Table.Meta().Name.O, err) + return errors.Errorf("INSERT INTO %s: %s", e.Table.Meta().Name.O, err) } if len(cols) == 0 { - return nil, errors.Errorf("INSERT INTO %s: empty column", e.Table.Meta().Name.O) + return errors.Errorf("INSERT INTO %s: empty column", e.Table.Meta().Name.O) } } else if len(e.Columns) > 0 { // Process `name` type column. @@ -98,7 +104,7 @@ func (e *InsertValues) getColumns(tableCols []*table.Column) ([]*table.Column, e } cols, err = table.FindCols(tableCols, columns, e.Table.Meta().PKIsHandle) if err != nil { - return nil, errors.Errorf("INSERT INTO %s: %s", e.Table.Meta().Name.O, err) + return errors.Errorf("INSERT INTO %s: %s", e.Table.Meta().Name.O, err) } } else { // If e.Columns are empty, use all columns instead. @@ -114,10 +120,25 @@ func (e *InsertValues) getColumns(tableCols []*table.Column) ([]*table.Column, e // Check column whether is specified only once. err = table.CheckOnce(cols) if err != nil { - return nil, errors.Trace(err) + return err } + e.insertColumns = cols + return nil +} - return cols, nil +func (e *InsertValues) initEvalBuffer() { + numCols := len(e.Table.Cols()) + if e.hasExtraHandle { + numCols++ + } + e.evalBufferTypes = make([]*types.FieldType, numCols) + for i, col := range e.Table.Cols() { + e.evalBufferTypes[i] = &col.FieldType + } + if e.hasExtraHandle { + e.evalBufferTypes[len(e.evalBufferTypes)-1] = types.NewFieldType(mysql.TypeLonglong) + } + e.evalBuffer = chunk.MutRowFromTypes(e.evalBufferTypes) } func (e *InsertValues) lazilyInitColDefaultValBuf() (ok bool) { @@ -150,7 +171,7 @@ func (e *InsertValues) processSetList() error { } // insertRows processes `insert|replace into values ()` or `insert|replace into set x=y` -func (e *InsertValues) insertRows(cols []*table.Column, exec func(rows [][]types.Datum) error) (err error) { +func (e *InsertValues) insertRows(exec func(rows [][]types.Datum) error) (err error) { // For `insert|replace into set x=y`, process the set list here. if err = e.processSetList(); err != nil { return errors.Trace(err) @@ -159,7 +180,7 @@ func (e *InsertValues) insertRows(cols []*table.Column, exec func(rows [][]types rows := make([][]types.Datum, 0, len(e.Lists)) for i, list := range e.Lists { e.rowCount++ - row, err := e.evalRow(cols, list, i) + row, err := e.evalRow(list, i) if err != nil { return errors.Trace(err) } @@ -189,7 +210,7 @@ func (e *InsertValues) handleErr(col *table.Column, val *types.Datum, rowIdx int // evalRow evaluates a to-be-inserted row. The value of the column may base on another column, // so we use setValueForRefColumn to fill the empty row some default values when needFillDefaultValues is true. -func (e *InsertValues) evalRow(cols []*table.Column, list []expression.Expression, rowIdx int) ([]types.Datum, error) { +func (e *InsertValues) evalRow(list []expression.Expression, rowIdx int) ([]types.Datum, error) { rowLen := len(e.Table.Cols()) if e.hasExtraHandle { rowLen++ @@ -204,18 +225,20 @@ func (e *InsertValues) evalRow(cols []*table.Column, list []expression.Expressio } } + e.evalBuffer.SetDatums(row...) for i, expr := range list { - val, err := expr.Eval(chunk.MutRowFromDatums(row).ToRow()) - if err = e.handleErr(cols[i], &val, rowIdx, err); err != nil { + val, err := expr.Eval(e.evalBuffer.ToRow()) + if err = e.handleErr(e.insertColumns[i], &val, rowIdx, err); err != nil { return nil, errors.Trace(err) } - val1, err := table.CastValue(e.ctx, val, cols[i].ToInfo()) - if err = e.handleErr(cols[i], &val, rowIdx, err); err != nil { + val1, err := table.CastValue(e.ctx, val, e.insertColumns[i].ToInfo()) + if err = e.handleErr(e.insertColumns[i], &val, rowIdx, err); err != nil { return nil, errors.Trace(err) } - offset := cols[i].Offset - row[offset], hasValue[offset] = val1, true + offset := e.insertColumns[i].Offset + row[offset], hasValue[offset] = *val1.Copy(), true + e.evalBuffer.SetDatum(offset, val1) } return e.fillRow(row, hasValue) @@ -251,7 +274,7 @@ func (e *InsertValues) setValueForRefColumn(row []types.Datum, hasValue []bool) return nil } -func (e *InsertValues) insertRowsFromSelect(ctx context.Context, cols []*table.Column, exec func(rows [][]types.Datum) error) error { +func (e *InsertValues) insertRowsFromSelect(ctx context.Context, exec func(rows [][]types.Datum) error) error { // process `insert|replace into ... select ... from ...` selectExec := e.children[0] fields := selectExec.retTypes() @@ -275,7 +298,7 @@ func (e *InsertValues) insertRowsFromSelect(ctx context.Context, cols []*table.C for innerChunkRow := iter.Begin(); innerChunkRow != iter.End(); innerChunkRow = iter.Next() { innerRow := types.CopyRow(innerChunkRow.GetDatumRow(fields)) e.rowCount++ - row, err := e.getRow(cols, innerRow) + row, err := e.getRow(innerRow) if err != nil { return errors.Trace(err) } @@ -305,16 +328,16 @@ func (e *InsertValues) insertRowsFromSelect(ctx context.Context, cols []*table.C // getRow gets the row which from `insert into select from` or `load data`. // The input values from these two statements are datums instead of // expressions which are used in `insert into set x=y`. -func (e *InsertValues) getRow(cols []*table.Column, vals []types.Datum) ([]types.Datum, error) { +func (e *InsertValues) getRow(vals []types.Datum) ([]types.Datum, error) { row := make([]types.Datum, len(e.Table.Cols())) hasValue := make([]bool, len(e.Table.Cols())) for i, v := range vals { - casted, err := table.CastValue(e.ctx, v, cols[i].ToInfo()) + casted, err := table.CastValue(e.ctx, v, e.insertColumns[i].ToInfo()) if e.filterErr(err) != nil { return nil, errors.Trace(err) } - offset := cols[i].Offset + offset := e.insertColumns[i].Offset row[offset] = casted hasValue[offset] = true } diff --git a/executor/load_data.go b/executor/load_data.go index d6ee3724a3a74..83171cbbd8589 100644 --- a/executor/load_data.go +++ b/executor/load_data.go @@ -44,7 +44,6 @@ func NewLoadDataInfo(ctx sessionctx.Context, row []types.Datum, tbl table.Table, InsertValues: insertVal, Table: tbl, Ctx: ctx, - columns: cols, } } @@ -81,6 +80,9 @@ func (e *LoadDataExec) Close() error { // Open implements the Executor Open interface. func (e *LoadDataExec) Open(ctx context.Context) error { + if e.loadDataInfo.insertColumns != nil { + e.loadDataInfo.initEvalBuffer() + } return nil } @@ -95,7 +97,6 @@ type LoadDataInfo struct { LinesInfo *ast.LinesClause IgnoreLines uint64 Ctx sessionctx.Context - columns []*table.Column } // SetMaxRowsInBatch sets the max number of rows to insert in a batch. @@ -274,7 +275,7 @@ func (e *LoadDataInfo) colsToRow(cols []field) []types.Datum { e.row[i].SetString(string(cols[i].str)) } } - row, err := e.getRow(e.columns, e.row) + row, err := e.getRow(e.row) if err != nil { e.handleWarning(err, fmt.Sprintf("Load Data: insert data:%v failed:%v", e.row, errors.ErrorStack(err))) diff --git a/executor/replace.go b/executor/replace.go index dbe79cfde4817..bb06f2fec0455 100644 --- a/executor/replace.go +++ b/executor/replace.go @@ -42,6 +42,7 @@ func (e *ReplaceExec) Open(ctx context.Context) error { if e.SelectExec != nil { return e.SelectExec.Open(ctx) } + e.initEvalBuffer() return nil } @@ -178,13 +179,8 @@ func (e *ReplaceExec) exec(newRows [][]types.Datum) error { // Next implements the Executor Next interface. func (e *ReplaceExec) Next(ctx context.Context, chk *chunk.Chunk) error { chk.Reset() - cols, err := e.getColumns(e.Table.Cols()) - if err != nil { - return errors.Trace(err) - } - if len(e.children) > 0 && e.children[0] != nil { - return errors.Trace(e.insertRowsFromSelect(ctx, cols, e.exec)) + return e.insertRowsFromSelect(ctx, e.exec) } - return errors.Trace(e.insertRows(cols, e.exec)) + return e.insertRows(e.exec) } diff --git a/executor/update.go b/executor/update.go index de60b92ab8a01..dc04ef5f505f3 100644 --- a/executor/update.go +++ b/executor/update.go @@ -42,6 +42,7 @@ type UpdateExec struct { // columns2Handle stores relationship between column ordinal to its table handle. // the columns ordinals is present in ordinal range format, @see executor.cols2Handle columns2Handle cols2HandleSlice + evalBuffer chunk.MutRow } func (e *UpdateExec) exec(schema *expression.Schema) ([]types.Datum, error) { @@ -141,6 +142,7 @@ func (e *UpdateExec) fetchChunkRows(ctx context.Context) error { fields := e.children[0].retTypes() globalRowIdx := 0 chk := e.children[0].newFirstChunk() + e.evalBuffer = chunk.MutRowFromTypes(fields) for { err := e.children[0].Next(ctx, chk) if err != nil { @@ -181,17 +183,19 @@ func (e *UpdateExec) handleErr(colName model.CIStr, rowIdx int, err error) error func (e *UpdateExec) composeNewRow(rowIdx int, oldRow []types.Datum) ([]types.Datum, error) { newRowData := types.CopyRow(oldRow) + e.evalBuffer.SetDatums(newRowData...) for _, assign := range e.OrderedList { handleIdx, handleFound := e.columns2Handle.findHandle(int32(assign.Col.Index)) if handleFound && e.canNotUpdate(oldRow[handleIdx]) { continue } - val, err := assign.Expr.Eval(chunk.MutRowFromDatums(newRowData).ToRow()) + val, err := assign.Expr.Eval(e.evalBuffer.ToRow()) if err1 := e.handleErr(assign.Col.ColName, rowIdx, err); err1 != nil { - return nil, errors.Trace(err1) + return nil, err1 } - newRowData[assign.Col.Index] = val + newRowData[assign.Col.Index] = *val.Copy() + e.evalBuffer.SetDatum(assign.Col.Index, val) } return newRowData, nil }