Skip to content

Commit

Permalink
executor: improve wide table insert & update performance (#7935)
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu authored and ngaut committed Oct 23, 2018
1 parent 1434fa9 commit d8b52b8
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 47 deletions.
13 changes: 8 additions & 5 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,11 @@ func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor {
hasRefCols: v.NeedFillDefaultValue,
SelectExec: selectExec,
}
err := ivs.initInsertColumns()
if err != nil {
b.err = err
return nil
}

if v.IsReplace {
return b.buildReplace(ivs)
Expand All @@ -572,25 +577,23 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor {
GenColumns: v.GenCols.Columns,
GenExprs: v.GenCols.Exprs,
}
tableCols := tbl.Cols()
columns, err := insertVal.getColumns(tableCols)
err := insertVal.initInsertColumns()
if err != nil {
b.err = errors.Trace(err)
b.err = err
return nil
}
loadDataExec := &LoadDataExec{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID()),
IsLocal: v.IsLocal,
loadDataInfo: &LoadDataInfo{
row: make([]types.Datum, len(columns)),
row: make([]types.Datum, len(insertVal.insertColumns)),
InsertValues: insertVal,
Path: v.Path,
Table: tbl,
FieldsInfo: v.FieldsInfo,
LinesInfo: v.LinesInfo,
IgnoreLines: v.IgnoreLines,
Ctx: b.ctx,
columns: columns,
},
}

Expand Down
10 changes: 3 additions & 7 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,10 @@ func (e *InsertExec) batchUpdateDupRows(newRows [][]types.Datum) error {
// Next implements Exec Next interface.
func (e *InsertExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
cols, err := e.getColumns(e.Table.Cols())
if err != nil {
return errors.Trace(err)
}

if len(e.children) > 0 && e.children[0] != nil {
return errors.Trace(e.insertRowsFromSelect(ctx, cols, e.exec))
return e.insertRowsFromSelect(ctx, e.exec)
}
return errors.Trace(e.insertRows(cols, e.exec))
return e.insertRows(e.exec)
}

// Close implements the Executor Close interface.
Expand All @@ -154,6 +149,7 @@ func (e *InsertExec) Open(ctx context.Context) error {
if e.SelectExec != nil {
return e.SelectExec.Open(ctx)
}
e.initEvalBuffer()
return nil
}

Expand Down
67 changes: 45 additions & 22 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,13 @@ type InsertValues struct {
GenColumns []*ast.ColumnName
GenExprs []expression.Expression

insertColumns []*table.Column

// colDefaultVals is used to store casted default value.
// Because not every insert statement needs colDefaultVals, so we will init the buffer lazily.
colDefaultVals []defaultVal
colDefaultVals []defaultVal
evalBuffer chunk.MutRow
evalBufferTypes []*types.FieldType
}

type defaultVal struct {
Expand All @@ -61,16 +65,18 @@ type defaultVal struct {
valid bool
}

// getColumns gets the explicitly specified columns of an insert statement. There are three cases:
// initInsertColumns sets the explicitly specified columns of an insert statement. There are three cases:
// There are three types of insert statements:
// 1 insert ... values(...) --> name type column
// 2 insert ... set x=y... --> set type column
// 3 insert ... (select ..) --> name type column
// See https://dev.mysql.com/doc/refman/5.7/en/insert.html
func (e *InsertValues) getColumns(tableCols []*table.Column) ([]*table.Column, error) {
func (e *InsertValues) initInsertColumns() error {
var cols []*table.Column
var err error

tableCols := e.Table.Cols()

if len(e.SetList) > 0 {
// Process `set` type column.
columns := make([]string, 0, len(e.SetList))
Expand All @@ -82,10 +88,10 @@ func (e *InsertValues) getColumns(tableCols []*table.Column) ([]*table.Column, e
}
cols, err = table.FindCols(tableCols, columns, e.Table.Meta().PKIsHandle)
if err != nil {
return nil, errors.Errorf("INSERT INTO %s: %s", e.Table.Meta().Name.O, err)
return errors.Errorf("INSERT INTO %s: %s", e.Table.Meta().Name.O, err)
}
if len(cols) == 0 {
return nil, errors.Errorf("INSERT INTO %s: empty column", e.Table.Meta().Name.O)
return errors.Errorf("INSERT INTO %s: empty column", e.Table.Meta().Name.O)
}
} else if len(e.Columns) > 0 {
// Process `name` type column.
Expand All @@ -98,7 +104,7 @@ func (e *InsertValues) getColumns(tableCols []*table.Column) ([]*table.Column, e
}
cols, err = table.FindCols(tableCols, columns, e.Table.Meta().PKIsHandle)
if err != nil {
return nil, errors.Errorf("INSERT INTO %s: %s", e.Table.Meta().Name.O, err)
return errors.Errorf("INSERT INTO %s: %s", e.Table.Meta().Name.O, err)
}
} else {
// If e.Columns are empty, use all columns instead.
Expand All @@ -114,10 +120,25 @@ func (e *InsertValues) getColumns(tableCols []*table.Column) ([]*table.Column, e
// Check column whether is specified only once.
err = table.CheckOnce(cols)
if err != nil {
return nil, errors.Trace(err)
return err
}
e.insertColumns = cols
return nil
}

return cols, nil
func (e *InsertValues) initEvalBuffer() {
numCols := len(e.Table.Cols())
if e.hasExtraHandle {
numCols++
}
e.evalBufferTypes = make([]*types.FieldType, numCols)
for i, col := range e.Table.Cols() {
e.evalBufferTypes[i] = &col.FieldType
}
if e.hasExtraHandle {
e.evalBufferTypes[len(e.evalBufferTypes)-1] = types.NewFieldType(mysql.TypeLonglong)
}
e.evalBuffer = chunk.MutRowFromTypes(e.evalBufferTypes)
}

func (e *InsertValues) lazilyInitColDefaultValBuf() (ok bool) {
Expand Down Expand Up @@ -150,7 +171,7 @@ func (e *InsertValues) processSetList() error {
}

// insertRows processes `insert|replace into values ()` or `insert|replace into set x=y`
func (e *InsertValues) insertRows(cols []*table.Column, exec func(rows [][]types.Datum) error) (err error) {
func (e *InsertValues) insertRows(exec func(rows [][]types.Datum) error) (err error) {
// For `insert|replace into set x=y`, process the set list here.
if err = e.processSetList(); err != nil {
return errors.Trace(err)
Expand All @@ -159,7 +180,7 @@ func (e *InsertValues) insertRows(cols []*table.Column, exec func(rows [][]types
rows := make([][]types.Datum, 0, len(e.Lists))
for i, list := range e.Lists {
e.rowCount++
row, err := e.evalRow(cols, list, i)
row, err := e.evalRow(list, i)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -189,7 +210,7 @@ func (e *InsertValues) handleErr(col *table.Column, val *types.Datum, rowIdx int

// evalRow evaluates a to-be-inserted row. The value of the column may base on another column,
// so we use setValueForRefColumn to fill the empty row some default values when needFillDefaultValues is true.
func (e *InsertValues) evalRow(cols []*table.Column, list []expression.Expression, rowIdx int) ([]types.Datum, error) {
func (e *InsertValues) evalRow(list []expression.Expression, rowIdx int) ([]types.Datum, error) {
rowLen := len(e.Table.Cols())
if e.hasExtraHandle {
rowLen++
Expand All @@ -204,18 +225,20 @@ func (e *InsertValues) evalRow(cols []*table.Column, list []expression.Expressio
}
}

e.evalBuffer.SetDatums(row...)
for i, expr := range list {
val, err := expr.Eval(chunk.MutRowFromDatums(row).ToRow())
if err = e.handleErr(cols[i], &val, rowIdx, err); err != nil {
val, err := expr.Eval(e.evalBuffer.ToRow())
if err = e.handleErr(e.insertColumns[i], &val, rowIdx, err); err != nil {
return nil, errors.Trace(err)
}
val1, err := table.CastValue(e.ctx, val, cols[i].ToInfo())
if err = e.handleErr(cols[i], &val, rowIdx, err); err != nil {
val1, err := table.CastValue(e.ctx, val, e.insertColumns[i].ToInfo())
if err = e.handleErr(e.insertColumns[i], &val, rowIdx, err); err != nil {
return nil, errors.Trace(err)
}

offset := cols[i].Offset
row[offset], hasValue[offset] = val1, true
offset := e.insertColumns[i].Offset
row[offset], hasValue[offset] = *val1.Copy(), true
e.evalBuffer.SetDatum(offset, val1)
}

return e.fillRow(row, hasValue)
Expand Down Expand Up @@ -251,7 +274,7 @@ func (e *InsertValues) setValueForRefColumn(row []types.Datum, hasValue []bool)
return nil
}

func (e *InsertValues) insertRowsFromSelect(ctx context.Context, cols []*table.Column, exec func(rows [][]types.Datum) error) error {
func (e *InsertValues) insertRowsFromSelect(ctx context.Context, exec func(rows [][]types.Datum) error) error {
// process `insert|replace into ... select ... from ...`
selectExec := e.children[0]
fields := selectExec.retTypes()
Expand All @@ -275,7 +298,7 @@ func (e *InsertValues) insertRowsFromSelect(ctx context.Context, cols []*table.C
for innerChunkRow := iter.Begin(); innerChunkRow != iter.End(); innerChunkRow = iter.Next() {
innerRow := types.CopyRow(innerChunkRow.GetDatumRow(fields))
e.rowCount++
row, err := e.getRow(cols, innerRow)
row, err := e.getRow(innerRow)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -305,16 +328,16 @@ func (e *InsertValues) insertRowsFromSelect(ctx context.Context, cols []*table.C
// getRow gets the row which from `insert into select from` or `load data`.
// The input values from these two statements are datums instead of
// expressions which are used in `insert into set x=y`.
func (e *InsertValues) getRow(cols []*table.Column, vals []types.Datum) ([]types.Datum, error) {
func (e *InsertValues) getRow(vals []types.Datum) ([]types.Datum, error) {
row := make([]types.Datum, len(e.Table.Cols()))
hasValue := make([]bool, len(e.Table.Cols()))
for i, v := range vals {
casted, err := table.CastValue(e.ctx, v, cols[i].ToInfo())
casted, err := table.CastValue(e.ctx, v, e.insertColumns[i].ToInfo())
if e.filterErr(err) != nil {
return nil, errors.Trace(err)
}

offset := cols[i].Offset
offset := e.insertColumns[i].Offset
row[offset] = casted
hasValue[offset] = true
}
Expand Down
7 changes: 4 additions & 3 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ func NewLoadDataInfo(ctx sessionctx.Context, row []types.Datum, tbl table.Table,
InsertValues: insertVal,
Table: tbl,
Ctx: ctx,
columns: cols,
}
}

Expand Down Expand Up @@ -81,6 +80,9 @@ func (e *LoadDataExec) Close() error {

// Open implements the Executor Open interface.
func (e *LoadDataExec) Open(ctx context.Context) error {
if e.loadDataInfo.insertColumns != nil {
e.loadDataInfo.initEvalBuffer()
}
return nil
}

Expand All @@ -95,7 +97,6 @@ type LoadDataInfo struct {
LinesInfo *ast.LinesClause
IgnoreLines uint64
Ctx sessionctx.Context
columns []*table.Column
}

// SetMaxRowsInBatch sets the max number of rows to insert in a batch.
Expand Down Expand Up @@ -274,7 +275,7 @@ func (e *LoadDataInfo) colsToRow(cols []field) []types.Datum {
e.row[i].SetString(string(cols[i].str))
}
}
row, err := e.getRow(e.columns, e.row)
row, err := e.getRow(e.row)
if err != nil {
e.handleWarning(err,
fmt.Sprintf("Load Data: insert data:%v failed:%v", e.row, errors.ErrorStack(err)))
Expand Down
10 changes: 3 additions & 7 deletions executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (e *ReplaceExec) Open(ctx context.Context) error {
if e.SelectExec != nil {
return e.SelectExec.Open(ctx)
}
e.initEvalBuffer()
return nil
}

Expand Down Expand Up @@ -178,13 +179,8 @@ func (e *ReplaceExec) exec(newRows [][]types.Datum) error {
// Next implements the Executor Next interface.
func (e *ReplaceExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
cols, err := e.getColumns(e.Table.Cols())
if err != nil {
return errors.Trace(err)
}

if len(e.children) > 0 && e.children[0] != nil {
return errors.Trace(e.insertRowsFromSelect(ctx, cols, e.exec))
return e.insertRowsFromSelect(ctx, e.exec)
}
return errors.Trace(e.insertRows(cols, e.exec))
return e.insertRows(e.exec)
}
10 changes: 7 additions & 3 deletions executor/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type UpdateExec struct {
// columns2Handle stores relationship between column ordinal to its table handle.
// the columns ordinals is present in ordinal range format, @see executor.cols2Handle
columns2Handle cols2HandleSlice
evalBuffer chunk.MutRow
}

func (e *UpdateExec) exec(schema *expression.Schema) ([]types.Datum, error) {
Expand Down Expand Up @@ -141,6 +142,7 @@ func (e *UpdateExec) fetchChunkRows(ctx context.Context) error {
fields := e.children[0].retTypes()
globalRowIdx := 0
chk := e.children[0].newFirstChunk()
e.evalBuffer = chunk.MutRowFromTypes(fields)
for {
err := e.children[0].Next(ctx, chk)
if err != nil {
Expand Down Expand Up @@ -181,17 +183,19 @@ func (e *UpdateExec) handleErr(colName model.CIStr, rowIdx int, err error) error

func (e *UpdateExec) composeNewRow(rowIdx int, oldRow []types.Datum) ([]types.Datum, error) {
newRowData := types.CopyRow(oldRow)
e.evalBuffer.SetDatums(newRowData...)
for _, assign := range e.OrderedList {
handleIdx, handleFound := e.columns2Handle.findHandle(int32(assign.Col.Index))
if handleFound && e.canNotUpdate(oldRow[handleIdx]) {
continue
}
val, err := assign.Expr.Eval(chunk.MutRowFromDatums(newRowData).ToRow())
val, err := assign.Expr.Eval(e.evalBuffer.ToRow())

if err1 := e.handleErr(assign.Col.ColName, rowIdx, err); err1 != nil {
return nil, errors.Trace(err1)
return nil, err1
}
newRowData[assign.Col.Index] = val
newRowData[assign.Col.Index] = *val.Copy()
e.evalBuffer.SetDatum(assign.Col.Index, val)
}
return newRowData, nil
}
Expand Down

0 comments on commit d8b52b8

Please sign in to comment.