Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: use chunk grow for simple executor #7540

Merged
merged 13 commits into from
Sep 27, 2018
2 changes: 1 addition & 1 deletion ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func (d *ddlCtx) GetTableMaxRowID(startTS uint64, tbl table.PhysicalTable) (maxR
}
defer terror.Call(result.Close)

chk := chunk.NewChunkWithCapacity(getColumnsTypes(columns), 1)
chk := chunk.New(getColumnsTypes(columns), 1, 1)
err = result.Next(ctx, chk)
if err != nil {
return maxRowID, false, errors.Trace(err)
Expand Down
8 changes: 4 additions & 4 deletions distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (s *testSuite) TestSelectNormal(c *C) {
response.Fetch(context.TODO())

// Test Next.
chk := chunk.NewChunkWithCapacity(colTypes, 32)
chk := chunk.New(colTypes, 32, 32)
numAllRows := 0
for {
err = response.Next(context.TODO(), chk)
Expand Down Expand Up @@ -122,7 +122,7 @@ func (s *testSuite) TestSelectStreaming(c *C) {
response.Fetch(context.TODO())

// Test Next.
chk := chunk.NewChunkWithCapacity(colTypes, 32)
chk := chunk.New(colTypes, 32, 32)
numAllRows := 0
for {
err = response.Next(context.TODO(), chk)
Expand Down Expand Up @@ -259,7 +259,7 @@ func BenchmarkReadRowsData(b *testing.B) {
for i := 0; i < numCols; i++ {
colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong}
}
chk := chunk.NewChunkWithCapacity(colTypes, numRows)
chk := chunk.New(colTypes, numRows, numRows)

buffer := populateBuffer()

Expand All @@ -277,7 +277,7 @@ func BenchmarkDecodeToChunk(b *testing.B) {
for i := 0; i < numCols; i++ {
colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong}
}
chk := chunk.NewChunkWithCapacity(colTypes, numRows)
chk := chunk.New(colTypes, numRows, numRows)

for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ {
for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ {
Expand Down
4 changes: 2 additions & 2 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (a *recordSet) Next(ctx context.Context, chk *chunk.Chunk) error {

// NewChunk create a new chunk using NewChunk function in chunk package.
func (a *recordSet) NewChunk() *chunk.Chunk {
return a.executor.newChunk()
return a.executor.newFirstChunk()
}

func (a *recordSet) Close() error {
Expand Down Expand Up @@ -270,7 +270,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co
a.logSlowQuery(txnTS, err == nil)
}()

err = e.Next(ctx, e.newChunk())
err = e.Next(ctx, e.newFirstChunk())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
6 changes: 3 additions & 3 deletions executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (e *CheckIndexRangeExec) Open(ctx context.Context) error {
FieldType: *colTypeForHandle,
})

e.srcChunk = e.newChunk()
e.srcChunk = e.newFirstChunk()
dagPB, err := e.buildDAGPB()
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -197,7 +197,7 @@ func (e *RecoverIndexExec) Open(ctx context.Context) error {
return errors.Trace(err)
}

e.srcChunk = chunk.NewChunkWithCapacity(e.columnsTypes(), e.maxChunkSize)
e.srcChunk = chunk.New(e.columnsTypes(), e.initCap, e.maxChunkSize)
e.batchSize = 2048
e.recoverRows = make([]recoverRows, 0, e.batchSize)
e.idxValsBufs = make([][]types.Datum, e.batchSize)
Expand Down Expand Up @@ -636,7 +636,7 @@ func (e *CleanupIndexExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return errors.Trace(err)
}
e.idxChunk = chunk.NewChunkWithCapacity(e.getIdxColTypes(), e.maxChunkSize)
e.idxChunk = chunk.New(e.getIdxColTypes(), e.initCap, e.maxChunkSize)
e.idxValues = make(map[int64][][]types.Datum, e.batchSize)
e.batchKeys = make([]kv.Key, 0, e.batchSize)
e.idxValsBufs = make([][]types.Datum, e.batchSize)
Expand Down
10 changes: 5 additions & 5 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (e *HashAggExec) initForUnparallelExec() {
e.partialResultMap = make(aggPartialResultMapper, 0)
e.groupKeyBuffer = make([]byte, 0, 8)
e.groupValDatums = make([]types.Datum, 0, len(e.groupKeyBuffer))
e.childResult = e.children[0].newChunk()
e.childResult = e.children[0].newFirstChunk()
}

func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
Expand Down Expand Up @@ -270,12 +270,12 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
partialResultsMap: make(aggPartialResultMapper, 0),
groupByItems: e.GroupByItems,
groupValDatums: make([]types.Datum, 0, len(e.GroupByItems)),
chk: e.children[0].newChunk(),
chk: e.children[0].newFirstChunk(),
}

e.partialWorkers[i] = w
e.inputCh <- &HashAggInput{
chk: e.children[0].newChunk(),
chk: e.children[0].newFirstChunk(),
giveBackCh: w.inputCh,
}
}
Expand All @@ -292,7 +292,7 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
rowBuffer: make([]types.Datum, 0, e.Schema().Len()),
mutableRow: chunk.MutRowFromTypes(e.retTypes()),
}
e.finalWorkers[i].finalResultHolderCh <- e.newChunk()
e.finalWorkers[i].finalResultHolderCh <- e.newFirstChunk()
}
}

Expand Down Expand Up @@ -734,7 +734,7 @@ func (e *StreamAggExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return errors.Trace(err)
}
e.childResult = e.children[0].newChunk()
e.childResult = e.children[0].newFirstChunk()
e.executed = false
e.isChildReturnEmpty = true
e.inputIter = chunk.NewIterator4Chunk(e.childResult)
Expand Down
57 changes: 39 additions & 18 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync"
"time"

"github.com/cznic/mathutil"
"github.com/cznic/sortutil"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/distsql"
Expand Down Expand Up @@ -398,8 +399,10 @@ func (b *executorBuilder) buildChecksumTable(v *plannercore.ChecksumTable) Execu
}

func (b *executorBuilder) buildDeallocate(v *plannercore.Deallocate) Executor {
base := newBaseExecutor(b.ctx, nil, v.ExplainID())
base.initCap = chunk.ZeroCapacity
e := &DeallocateExec{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID()),
baseExecutor: base,
Name: v.Name,
}
return e
Expand Down Expand Up @@ -431,17 +434,22 @@ func (b *executorBuilder) buildLimit(v *plannercore.PhysicalLimit) Executor {
b.err = errors.Trace(b.err)
return nil
}
n := int(mathutil.MinUint64(v.Count, uint64(b.ctx.GetSessionVars().MaxChunkSize)))
base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec)
base.initCap = n
e := &LimitExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec),
baseExecutor: base,
begin: v.Offset,
end: v.Offset + v.Count,
}
return e
}

func (b *executorBuilder) buildPrepare(v *plannercore.Prepare) Executor {
base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID())
base.initCap = chunk.ZeroCapacity
e := &PrepareExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
baseExecutor: base,
is: b.is,
name: v.Name,
sqlText: v.SQLText,
Expand Down Expand Up @@ -495,17 +503,21 @@ func (b *executorBuilder) buildSimple(v *plannercore.Simple) Executor {
case *ast.RevokeStmt:
return b.buildRevoke(s)
}
base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID())
base.initCap = chunk.ZeroCapacity
e := &SimpleExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
baseExecutor: base,
Statement: v.Statement,
is: b.is,
}
return e
}

func (b *executorBuilder) buildSet(v *plannercore.Set) Executor {
base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID())
base.initCap = chunk.ZeroCapacity
e := &SetExecutor{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
baseExecutor: base,
vars: v.VarAssigns,
}
return e
Expand All @@ -523,6 +535,7 @@ func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor {
} else {
baseExec = newBaseExecutor(b.ctx, nil, v.ExplainID())
}
baseExec.initCap = chunk.ZeroCapacity

ivs := &InsertValues{
baseExecutor: baseExec,
Expand Down Expand Up @@ -614,12 +627,13 @@ func (b *executorBuilder) buildGrant(grant *ast.GrantStmt) Executor {

func (b *executorBuilder) buildRevoke(revoke *ast.RevokeStmt) Executor {
e := &RevokeExec{
ctx: b.ctx,
Privs: revoke.Privs,
ObjectType: revoke.ObjectType,
Level: revoke.Level,
Users: revoke.Users,
is: b.is,
baseExecutor: newBaseExecutor(b.ctx, nil, "RevokeStmt"),
ctx: b.ctx,
Privs: revoke.Privs,
ObjectType: revoke.ObjectType,
Level: revoke.Level,
Users: revoke.Users,
is: b.is,
}
return e
}
Expand Down Expand Up @@ -1091,8 +1105,10 @@ func (b *executorBuilder) buildTableDual(v *plannercore.PhysicalTableDual) Execu
b.err = errors.Errorf("buildTableDual failed, invalid row count for dual table: %v", v.RowCount)
return nil
}
base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID())
base.initCap = v.RowCount
e := &TableDualExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
baseExecutor: base,
numDualRows: v.RowCount,
}
// Init the startTS for later use.
Expand Down Expand Up @@ -1209,9 +1225,10 @@ func (b *executorBuilder) buildMaxOneRow(v *plannercore.PhysicalMaxOneRow) Execu
b.err = errors.Trace(b.err)
return nil
}
e := &MaxOneRowExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec),
}
base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec)
base.initCap = 2
base.maxChunkSize = 2
e := &MaxOneRowExec{baseExecutor: base}
return e
}

Expand Down Expand Up @@ -1241,8 +1258,10 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor {
return nil
}
columns2Handle := buildColumns2Handle(v.SelectPlan.Schema(), tblID2table)
base := newBaseExecutor(b.ctx, nil, v.ExplainID(), selExec)
base.initCap = chunk.ZeroCapacity
updateExec := &UpdateExec{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID(), selExec),
baseExecutor: base,
SelectExec: selExec,
OrderedList: v.OrderedList,
tblID2table: tblID2table,
Expand Down Expand Up @@ -1320,8 +1339,10 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor {
b.err = errors.Trace(b.err)
return nil
}
base := newBaseExecutor(b.ctx, nil, v.ExplainID(), selExec)
base.initCap = chunk.ZeroCapacity
deleteExec := &DeleteExec{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID(), selExec),
baseExecutor: base,
SelectExec: selExec,
Tables: v.Tables,
IsMultiTable: v.IsMultiTable,
Expand Down Expand Up @@ -1542,7 +1563,7 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin)
innerKeyCols[i] = v.InnerJoinKeys[i].Index
}
e.innerCtx.keyCols = innerKeyCols
e.joinResult = e.newChunk()
e.joinResult = e.newFirstChunk()
metrics.ExecutorCounter.WithLabelValues("IndexLookUpJoin").Inc()
return e
}
Expand Down
7 changes: 4 additions & 3 deletions executor/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
batchDelete := e.ctx.GetSessionVars().BatchDelete && !e.ctx.GetSessionVars().InTxn()
batchDMLSize := e.ctx.GetSessionVars().DMLBatchSize
fields := e.children[0].retTypes()
chk := e.children[0].newFirstChunk()
for {
chk := e.children[0].newChunk()
iter := chunk.NewIterator4Chunk(chk)

err := e.children[0].Next(ctx, chk)
Expand Down Expand Up @@ -133,6 +133,7 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
}
rowCount++
}
chk = chunk.Renew(chk, e.maxChunkSize)
}

return nil
Expand Down Expand Up @@ -184,10 +185,9 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error {
colPosInfos := e.getColPosInfos(e.children[0].Schema())
tblRowMap := make(tableRowMapType)
fields := e.children[0].retTypes()
chk := e.children[0].newFirstChunk()
for {
chk := e.children[0].newChunk()
iter := chunk.NewIterator4Chunk(chk)

err := e.children[0].Next(ctx, chk)
if err != nil {
return errors.Trace(err)
Expand All @@ -200,6 +200,7 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error {
joinedDatumRow := joinedChunkRow.GetDatumRow(fields)
e.composeTblRowMap(tblRowMap, colPosInfos, joinedDatumRow)
}
chk = chunk.Renew(chk, e.maxChunkSize)
}

return errors.Trace(e.removeRowsInTblRowMap(tblRowMap))
Expand Down
2 changes: 1 addition & 1 deletion executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er
handleCnt := len(task.handles)
task.rows = make([]chunk.Row, 0, handleCnt)
for {
chk := tableReader.newChunk()
chk := tableReader.newFirstChunk()
err = tableReader.Next(ctx, chk)
if err != nil {
log.Error(err)
Expand Down
Loading