Commit
*: use chunk grow for simple executor (#7540)
lysu authored and zz-jason committed Sep 27, 2018
1 parent 081920d commit 05b37de
Showing 38 changed files with 194 additions and 140 deletions.
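The theme running through every file below: executors used to allocate each chunk at the session's MaxChunkSize via chunk.NewChunkWithCapacity(fieldTypes, maxChunkSize). After this change they call chunk.New(fieldTypes, initCap, maxChunkSize), starting from a small per-executor initCap and letting the chunk's capacity grow toward maxChunkSize only if rows keep coming. Below is a minimal, self-contained sketch of the growth idea; the function name grow, the doubling policy, and the numbers 32 and 1024 are assumptions for illustration, not taken from TiDB's util/chunk package:

package main

import "fmt"

// grow sketches how a chunk's row capacity could expand between reads:
// double it until it reaches maxChunkSize. The doubling policy here is
// an assumption; the real policy lives in util/chunk and may differ.
func grow(capacity, maxChunkSize int) int {
	if capacity*2 > maxChunkSize {
		return maxChunkSize
	}
	return capacity * 2
}

func main() {
	capacity, maxChunkSize := 32, 1024
	for capacity < maxChunkSize {
		fmt.Println(capacity) // prints 32, 64, 128, 256, 512
		capacity = grow(capacity, maxChunkSize)
	}
	fmt.Println(capacity) // prints 1024; growth stops at maxChunkSize
}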
2 changes: 1 addition & 1 deletion ddl/reorg.go
@@ -282,7 +282,7 @@ func (d *ddlCtx) GetTableMaxRowID(startTS uint64, tbl table.PhysicalTable) (maxR
 	}
 	defer terror.Call(result.Close)

-	chk := chunk.NewChunkWithCapacity(getColumnsTypes(columns), 1)
+	chk := chunk.New(getColumnsTypes(columns), 1, 1)
 	err = result.Next(ctx, chk)
 	if err != nil {
 		return maxRowID, false, errors.Trace(err)
8 changes: 4 additions & 4 deletions distsql/distsql_test.go
@@ -70,7 +70,7 @@ func (s *testSuite) TestSelectNormal(c *C) {
 	response.Fetch(context.TODO())

 	// Test Next.
-	chk := chunk.NewChunkWithCapacity(colTypes, 32)
+	chk := chunk.New(colTypes, 32, 32)
 	numAllRows := 0
 	for {
 		err = response.Next(context.TODO(), chk)
@@ -122,7 +122,7 @@ func (s *testSuite) TestSelectStreaming(c *C) {
 	response.Fetch(context.TODO())

 	// Test Next.
-	chk := chunk.NewChunkWithCapacity(colTypes, 32)
+	chk := chunk.New(colTypes, 32, 32)
 	numAllRows := 0
 	for {
 		err = response.Next(context.TODO(), chk)
@@ -259,7 +259,7 @@ func BenchmarkReadRowsData(b *testing.B) {
 	for i := 0; i < numCols; i++ {
 		colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong}
 	}
-	chk := chunk.NewChunkWithCapacity(colTypes, numRows)
+	chk := chunk.New(colTypes, numRows, numRows)

 	buffer := populateBuffer()

@@ -277,7 +277,7 @@ func BenchmarkDecodeToChunk(b *testing.B) {
 	for i := 0; i < numCols; i++ {
 		colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong}
 	}
-	chk := chunk.NewChunkWithCapacity(colTypes, numRows)
+	chk := chunk.New(colTypes, numRows, numRows)

 	for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ {
 		for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ {
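Note that every call site in this test file passes the same value for both capacity arguments (chunk.New(colTypes, 32, 32), chunk.New(colTypes, numRows, numRows)): with initCap equal to maxChunkSize there is no headroom to grow, so these chunks behave exactly like the fixed-capacity NewChunkWithCapacity ones they replace, and the tests and benchmarks measure the same behavior as before.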
4 changes: 2 additions & 2 deletions executor/adapter.go
@@ -112,7 +112,7 @@ func (a *recordSet) Next(ctx context.Context, chk *chunk.Chunk) error {
 }

 // NewChunk create a new chunk using NewChunk function in chunk package.
 func (a *recordSet) NewChunk() *chunk.Chunk {
-	return a.executor.newChunk()
+	return a.executor.newFirstChunk()
 }

 func (a *recordSet) Close() error {
@@ -270,7 +270,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co
 		a.logSlowQuery(txnTS, err == nil)
 	}()

-	err = e.Next(ctx, e.newChunk())
+	err = e.Next(ctx, e.newFirstChunk())
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
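The newChunk helper is renamed newFirstChunk throughout, signaling that it builds an executor's first chunk from initCap rather than a full-size one. Its body is not visible in these hunks; the following is a plausible reconstruction from the call sites and the chunk.New signature, offered as an assumption rather than the commit's actual definition:

// Assumed shape of the renamed helper on baseExecutor (executor package);
// inferred from its call sites, not copied from the commit.
func (e *baseExecutor) newFirstChunk() *chunk.Chunk {
	// Allocate initCap rows now; the chunk may grow up to maxChunkSize later.
	return chunk.New(e.retTypes(), e.initCap, e.maxChunkSize)
}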
6 changes: 3 additions & 3 deletions executor/admin.go
@@ -102,7 +102,7 @@ func (e *CheckIndexRangeExec) Open(ctx context.Context) error {
 		FieldType: *colTypeForHandle,
 	})

-	e.srcChunk = e.newChunk()
+	e.srcChunk = e.newFirstChunk()
 	dagPB, err := e.buildDAGPB()
 	if err != nil {
 		return errors.Trace(err)
@@ -197,7 +197,7 @@ func (e *RecoverIndexExec) Open(ctx context.Context) error {
 		return errors.Trace(err)
 	}

-	e.srcChunk = chunk.NewChunkWithCapacity(e.columnsTypes(), e.maxChunkSize)
+	e.srcChunk = chunk.New(e.columnsTypes(), e.initCap, e.maxChunkSize)
 	e.batchSize = 2048
 	e.recoverRows = make([]recoverRows, 0, e.batchSize)
 	e.idxValsBufs = make([][]types.Datum, e.batchSize)
@@ -636,7 +636,7 @@ func (e *CleanupIndexExec) Open(ctx context.Context) error {
 	if err := e.baseExecutor.Open(ctx); err != nil {
 		return errors.Trace(err)
 	}
-	e.idxChunk = chunk.NewChunkWithCapacity(e.getIdxColTypes(), e.maxChunkSize)
+	e.idxChunk = chunk.New(e.getIdxColTypes(), e.initCap, e.maxChunkSize)
 	e.idxValues = make(map[int64][][]types.Datum, e.batchSize)
 	e.batchKeys = make([]kv.Key, 0, e.batchSize)
 	e.idxValsBufs = make([][]types.Datum, e.batchSize)
10 changes: 5 additions & 5 deletions executor/aggregate.go
@@ -235,7 +235,7 @@ func (e *HashAggExec) initForUnparallelExec() {
 	e.partialResultMap = make(aggPartialResultMapper, 0)
 	e.groupKeyBuffer = make([]byte, 0, 8)
 	e.groupValDatums = make([]types.Datum, 0, len(e.groupKeyBuffer))
-	e.childResult = e.children[0].newChunk()
+	e.childResult = e.children[0].newFirstChunk()
 }

 func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
@@ -270,12 +270,12 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
 			partialResultsMap: make(aggPartialResultMapper, 0),
 			groupByItems:      e.GroupByItems,
 			groupValDatums:    make([]types.Datum, 0, len(e.GroupByItems)),
-			chk:               e.children[0].newChunk(),
+			chk:               e.children[0].newFirstChunk(),
 		}

 		e.partialWorkers[i] = w
 		e.inputCh <- &HashAggInput{
-			chk:        e.children[0].newChunk(),
+			chk:        e.children[0].newFirstChunk(),
 			giveBackCh: w.inputCh,
 		}
 	}
@@ -292,7 +292,7 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
 			rowBuffer:  make([]types.Datum, 0, e.Schema().Len()),
 			mutableRow: chunk.MutRowFromTypes(e.retTypes()),
 		}
-		e.finalWorkers[i].finalResultHolderCh <- e.newChunk()
+		e.finalWorkers[i].finalResultHolderCh <- e.newFirstChunk()
 	}
 }

@@ -734,7 +734,7 @@ func (e *StreamAggExec) Open(ctx context.Context) error {
 	if err := e.baseExecutor.Open(ctx); err != nil {
 		return errors.Trace(err)
 	}
-	e.childResult = e.children[0].newChunk()
+	e.childResult = e.children[0].newFirstChunk()
 	e.executed = false
 	e.isChildReturnEmpty = true
 	e.inputIter = chunk.NewIterator4Chunk(e.childResult)
57 changes: 39 additions & 18 deletions executor/builder.go
@@ -22,6 +22,7 @@ import (
 	"sync"
 	"time"

+	"github.com/cznic/mathutil"
 	"github.com/cznic/sortutil"
 	"github.com/pingcap/tidb/ast"
 	"github.com/pingcap/tidb/distsql"
@@ -398,8 +399,10 @@ func (b *executorBuilder) buildChecksumTable(v *plannercore.ChecksumTable) Execu
 }

 func (b *executorBuilder) buildDeallocate(v *plannercore.Deallocate) Executor {
+	base := newBaseExecutor(b.ctx, nil, v.ExplainID())
+	base.initCap = chunk.ZeroCapacity
 	e := &DeallocateExec{
-		baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID()),
+		baseExecutor: base,
 		Name:         v.Name,
 	}
 	return e
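This is the first instance of a pattern repeated across builder.go: statements that never return a result set to the client (DEALLOCATE here, and PREPARE, SET, INSERT, UPDATE, and DELETE below) set initCap to chunk.ZeroCapacity, so their first chunk reserves no row storage at all. Under the old scheme these executors still got a chunk sized to the session's MaxChunkSize, which was wasted allocation.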
@@ -431,17 +434,22 @@ func (b *executorBuilder) buildLimit(v *plannercore.PhysicalLimit) Executor {
 		b.err = errors.Trace(b.err)
 		return nil
 	}
+	n := int(mathutil.MinUint64(v.Count, uint64(b.ctx.GetSessionVars().MaxChunkSize)))
+	base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec)
+	base.initCap = n
 	e := &LimitExec{
-		baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec),
+		baseExecutor: base,
 		begin:        v.Offset,
 		end:          v.Offset + v.Count,
 	}
 	return e
 }

 func (b *executorBuilder) buildPrepare(v *plannercore.Prepare) Executor {
+	base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID())
+	base.initCap = chunk.ZeroCapacity
 	e := &PrepareExec{
-		baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
+		baseExecutor: base,
 		is:           b.is,
 		name:         v.Name,
 		sqlText:      v.SQLText,
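For LIMIT the first chunk is clamped to the limit count: n = min(v.Count, MaxChunkSize), computed with the newly imported mathutil.MinUint64. With MaxChunkSize set to 1024, for example, LIMIT 3 now starts from a 3-row chunk while LIMIT 5000 still starts from 1024.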
@@ -495,17 +503,21 @@ func (b *executorBuilder) buildSimple(v *plannercore.Simple) Executor {
 	case *ast.RevokeStmt:
 		return b.buildRevoke(s)
 	}
+	base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID())
+	base.initCap = chunk.ZeroCapacity
 	e := &SimpleExec{
-		baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
+		baseExecutor: base,
 		Statement:    v.Statement,
 		is:           b.is,
 	}
 	return e
 }

 func (b *executorBuilder) buildSet(v *plannercore.Set) Executor {
+	base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID())
+	base.initCap = chunk.ZeroCapacity
 	e := &SetExecutor{
-		baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
+		baseExecutor: base,
 		vars:         v.VarAssigns,
 	}
 	return e
@@ -523,6 +535,7 @@ func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor {
 	} else {
 		baseExec = newBaseExecutor(b.ctx, nil, v.ExplainID())
 	}
+	baseExec.initCap = chunk.ZeroCapacity

 	ivs := &InsertValues{
 		baseExecutor: baseExec,
@@ -614,12 +627,13 @@ func (b *executorBuilder) buildGrant(grant *ast.GrantStmt) Executor {

 func (b *executorBuilder) buildRevoke(revoke *ast.RevokeStmt) Executor {
 	e := &RevokeExec{
-		ctx:        b.ctx,
-		Privs:      revoke.Privs,
-		ObjectType: revoke.ObjectType,
-		Level:      revoke.Level,
-		Users:      revoke.Users,
-		is:         b.is,
+		baseExecutor: newBaseExecutor(b.ctx, nil, "RevokeStmt"),
+		ctx:          b.ctx,
+		Privs:        revoke.Privs,
+		ObjectType:   revoke.ObjectType,
+		Level:        revoke.Level,
+		Users:        revoke.Users,
+		is:           b.is,
 	}
 	return e
 }
@@ -1091,8 +1105,10 @@ func (b *executorBuilder) buildTableDual(v *plannercore.PhysicalTableDual) Execu
 		b.err = errors.Errorf("buildTableDual failed, invalid row count for dual table: %v", v.RowCount)
 		return nil
 	}
+	base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID())
+	base.initCap = v.RowCount
 	e := &TableDualExec{
-		baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
+		baseExecutor: base,
 		numDualRows:  v.RowCount,
 	}
 	// Init the startTS for later use.
@@ -1209,9 +1225,10 @@ func (b *executorBuilder) buildMaxOneRow(v *plannercore.PhysicalMaxOneRow) Execu
 		b.err = errors.Trace(b.err)
 		return nil
 	}
-	e := &MaxOneRowExec{
-		baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec),
-	}
+	base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec)
+	base.initCap = 2
+	base.maxChunkSize = 2
+	e := &MaxOneRowExec{baseExecutor: base}
 	return e
 }
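MaxOneRowExec is the one builder that pins both initCap and maxChunkSize to 2 rather than using ZeroCapacity or the session default: a scalar subquery may legally yield one row, and a chunk with room for a second row is the cheapest way to detect the more-than-one-row error. A capacity-1 chunk could not distinguish "exactly one row" from "at least one row".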

@@ -1241,8 +1258,10 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor {
 		return nil
 	}
 	columns2Handle := buildColumns2Handle(v.SelectPlan.Schema(), tblID2table)
+	base := newBaseExecutor(b.ctx, nil, v.ExplainID(), selExec)
+	base.initCap = chunk.ZeroCapacity
 	updateExec := &UpdateExec{
-		baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID(), selExec),
+		baseExecutor: base,
 		SelectExec:   selExec,
 		OrderedList:  v.OrderedList,
 		tblID2table:  tblID2table,
@@ -1320,8 +1339,10 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor {
 		b.err = errors.Trace(b.err)
 		return nil
 	}
+	base := newBaseExecutor(b.ctx, nil, v.ExplainID(), selExec)
+	base.initCap = chunk.ZeroCapacity
 	deleteExec := &DeleteExec{
-		baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID(), selExec),
+		baseExecutor: base,
 		SelectExec:   selExec,
 		Tables:       v.Tables,
 		IsMultiTable: v.IsMultiTable,
@@ -1542,7 +1563,7 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin)
 		innerKeyCols[i] = v.InnerJoinKeys[i].Index
 	}
 	e.innerCtx.keyCols = innerKeyCols
-	e.joinResult = e.newChunk()
+	e.joinResult = e.newFirstChunk()
 	metrics.ExecutorCounter.WithLabelValues("IndexLookUpJoin").Inc()
 	return e
 }
7 changes: 4 additions & 3 deletions executor/delete.go
@@ -104,8 +104,8 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
 	batchDelete := e.ctx.GetSessionVars().BatchDelete && !e.ctx.GetSessionVars().InTxn()
 	batchDMLSize := e.ctx.GetSessionVars().DMLBatchSize
 	fields := e.children[0].retTypes()
+	chk := e.children[0].newFirstChunk()
 	for {
-		chk := e.children[0].newChunk()
 		iter := chunk.NewIterator4Chunk(chk)

 		err := e.children[0].Next(ctx, chk)
@@ -133,6 +133,7 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
 			}
 			rowCount++
 		}
+		chk = chunk.Renew(chk, e.maxChunkSize)
 	}

 	return nil
@@ -184,10 +185,9 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error {
 	colPosInfos := e.getColPosInfos(e.children[0].Schema())
 	tblRowMap := make(tableRowMapType)
 	fields := e.children[0].retTypes()
+	chk := e.children[0].newFirstChunk()
 	for {
-		chk := e.children[0].newChunk()
 		iter := chunk.NewIterator4Chunk(chk)
-
 		err := e.children[0].Next(ctx, chk)
 		if err != nil {
 			return errors.Trace(err)
@@ -200,6 +200,7 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error {
 			joinedDatumRow := joinedChunkRow.GetDatumRow(fields)
 			e.composeTblRowMap(tblRowMap, colPosInfos, joinedDatumRow)
 		}
+		chk = chunk.Renew(chk, e.maxChunkSize)
 	}

 	return errors.Trace(e.removeRowsInTblRowMap(tblRowMap))
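delete.go shows the companion half of the pattern: the chunk is now created once before the read loop (newFirstChunk) and recycled after each batch with chunk.Renew(chk, e.maxChunkSize), which is where the capacity actually grows, instead of allocating a fresh full-size chunk on every iteration. Below is a toy, self-contained imitation of that loop shape; toyChunk, renew, and fetch are invented stand-ins, and the doubling inside renew is an assumed policy, not taken from util/chunk:

package main

import "fmt"

// toyChunk stands in for chunk.Chunk: just a capacity and a row count.
type toyChunk struct{ capacity, rows int }

// renew imitates chunk.Renew: a fresh chunk with capacity grown toward
// maxChunkSize (doubling is an assumption for illustration).
func renew(c *toyChunk, maxChunkSize int) *toyChunk {
	newCap := c.capacity * 2
	if newCap > maxChunkSize {
		newCap = maxChunkSize
	}
	return &toyChunk{capacity: newCap}
}

// fetch plays the role of children[0].Next: fill the chunk from a
// dwindling supply of rows.
func fetch(c *toyChunk, remaining *int) {
	c.rows = c.capacity
	if c.rows > *remaining {
		c.rows = *remaining
	}
	*remaining -= c.rows
}

func main() {
	remaining, maxChunkSize := 100, 32
	chk := &toyChunk{capacity: 4} // like newFirstChunk: start from a small initCap
	for {
		fetch(chk, &remaining)
		if chk.rows == 0 {
			break // an empty chunk means the child is drained
		}
		fmt.Printf("processed %d rows (capacity %d)\n", chk.rows, chk.capacity)
		chk = renew(chk, maxChunkSize) // grow between batches, as in delete.go
	}
}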
2 changes: 1 addition & 1 deletion executor/distsql.go
@@ -711,7 +711,7 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er
 	handleCnt := len(task.handles)
 	task.rows = make([]chunk.Row, 0, handleCnt)
 	for {
-		chk := tableReader.newChunk()
+		chk := tableReader.newFirstChunk()
 		err = tableReader.Next(ctx, chk)
 		if err != nil {
 			log.Error(err)