From f7b778ef0c6fd8482e1fcc7b36dd934026404669 Mon Sep 17 00:00:00 2001 From: robi Date: Wed, 29 Aug 2018 15:31:28 +0800 Subject: [PATCH 01/12] executor,chunk: add initCap to executor and grow chunk list --- executor/executor.go | 6 ++++-- executor/index_lookup_join.go | 2 +- executor/join.go | 4 ++-- executor/pkg_test.go | 2 +- executor/sort.go | 10 +++++----- util/chunk/iterator_test.go | 4 ++-- util/chunk/list.go | 31 ++++++++++++++++++------------- util/chunk/list_test.go | 6 +++--- 8 files changed, 36 insertions(+), 29 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index 1ec327617dfc1..b94b8c6fd68fa 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -65,6 +65,7 @@ type baseExecutor struct { ctx sessionctx.Context id string schema *expression.Schema + initCap int maxChunkSize int children []Executor retFieldTypes []*types.FieldType @@ -102,7 +103,7 @@ func (e *baseExecutor) Schema() *expression.Schema { // newChunk creates a new chunk to buffer current executor's result. func (e *baseExecutor) newChunk() *chunk.Chunk { - return chunk.NewChunkWithCapacity(e.retTypes(), e.maxChunkSize) + return chunk.New(e.retTypes(), e.initCap, e.maxChunkSize) } // retTypes returns all output column types. @@ -121,6 +122,7 @@ func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id strin ctx: ctx, id: id, schema: schema, + initCap: ctx.GetSessionVars().MaxChunkSize, maxChunkSize: ctx.GetSessionVars().MaxChunkSize, } if schema != nil { @@ -814,7 +816,7 @@ func (e *TableScanExec) Next(ctx context.Context, chk *chunk.Chunk) error { func (e *TableScanExec) nextChunk4InfoSchema(ctx context.Context, chk *chunk.Chunk) error { chk.Reset() if e.virtualTableChunkList == nil { - e.virtualTableChunkList = chunk.NewList(e.retTypes(), e.maxChunkSize) + e.virtualTableChunkList = chunk.NewList(e.retTypes(), e.initCap, e.maxChunkSize) columns := make([]*table.Column, e.schema.Len()) for i, colInfo := range e.columns { columns[i] = table.ToColumn(colInfo) diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index cf380e9534494..817874f66ebfe 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -511,7 +511,7 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa return errors.Trace(err) } defer terror.Call(innerExec.Close) - innerResult := chunk.NewList(innerExec.retTypes(), iw.ctx.GetSessionVars().MaxChunkSize) + innerResult := chunk.NewList(innerExec.retTypes(), iw.ctx.GetSessionVars().MaxChunkSize, iw.ctx.GetSessionVars().MaxChunkSize) innerResult.GetMemTracker().SetLabel("inner result") innerResult.GetMemTracker().AttachTo(task.memTracker) for { diff --git a/executor/join.go b/executor/join.go index a8a720feb9513..5b66edd6d501b 100644 --- a/executor/join.go +++ b/executor/join.go @@ -250,7 +250,7 @@ func (e *HashJoinExec) wait4Inner() (finished bool, err error) { // and append them to e.innerResult. 
func (e *HashJoinExec) fetchInnerRows(ctx context.Context, chkCh chan<- *chunk.Chunk, doneCh chan struct{}) { defer close(chkCh) - e.innerResult = chunk.NewList(e.innerExec.retTypes(), e.maxChunkSize) + e.innerResult = chunk.NewList(e.innerExec.retTypes(), e.initCap, e.maxChunkSize) e.innerResult.GetMemTracker().AttachTo(e.memTracker) e.innerResult.GetMemTracker().SetLabel("innerResult") var err error @@ -622,7 +622,7 @@ func (e *NestedLoopApplyExec) Open(ctx context.Context) error { e.innerRows = e.innerRows[:0] e.outerChunk = e.outerExec.newChunk() e.innerChunk = e.innerExec.newChunk() - e.innerList = chunk.NewList(e.innerExec.retTypes(), e.maxChunkSize) + e.innerList = chunk.NewList(e.innerExec.retTypes(), e.initCap, e.maxChunkSize) e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaNestedLoopApply) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) diff --git a/executor/pkg_test.go b/executor/pkg_test.go index ac75608b846cf..b19620d510ef5 100644 --- a/executor/pkg_test.go +++ b/executor/pkg_test.go @@ -91,7 +91,7 @@ func (s *pkgTestSuite) TestNestedLoopApply(c *C) { innerFilter: []expression.Expression{innerFilter}, joiner: joiner, } - join.innerList = chunk.NewList(innerExec.retTypes(), innerExec.maxChunkSize) + join.innerList = chunk.NewList(innerExec.retTypes(), innerExec.initCap, innerExec.maxChunkSize) join.innerChunk = innerExec.newChunk() join.outerChunk = outerExec.newChunk() joinChk := join.newChunk() diff --git a/executor/sort.go b/executor/sort.go index 603f525b59264..26551865f0345 100644 --- a/executor/sort.go +++ b/executor/sort.go @@ -107,7 +107,7 @@ func (e *SortExec) Next(ctx context.Context, chk *chunk.Chunk) error { func (e *SortExec) fetchRowChunks(ctx context.Context) error { fields := e.retTypes() - e.rowChunks = chunk.NewList(fields, e.maxChunkSize) + e.rowChunks = chunk.NewList(fields, e.initCap, e.maxChunkSize) e.rowChunks.GetMemTracker().AttachTo(e.memTracker) e.rowChunks.GetMemTracker().SetLabel("rowChunks") for { @@ -171,7 +171,7 @@ func (e *SortExec) buildKeyExprsAndTypes() { } func (e *SortExec) buildKeyChunks() error { - e.keyChunks = chunk.NewList(e.keyTypes, e.maxChunkSize) + e.keyChunks = chunk.NewList(e.keyTypes, e.initCap, e.maxChunkSize) e.keyChunks.GetMemTracker().SetLabel("keyChunks") e.keyChunks.GetMemTracker().AttachTo(e.memTracker) @@ -323,7 +323,7 @@ func (e *TopNExec) Next(ctx context.Context, chk *chunk.Chunk) error { func (e *TopNExec) loadChunksUntilTotalLimit(ctx context.Context) error { e.chkHeap = &topNChunkHeap{e} - e.rowChunks = chunk.NewList(e.retTypes(), e.maxChunkSize) + e.rowChunks = chunk.NewList(e.retTypes(), e.initCap, e.maxChunkSize) e.rowChunks.GetMemTracker().AttachTo(e.memTracker) e.rowChunks.GetMemTracker().SetLabel("rowChunks") for e.rowChunks.Len() < e.totalLimit { @@ -425,7 +425,7 @@ func (e *TopNExec) processChildChk(childRowChk, childKeyChk *chunk.Chunk) error // but we want descending top N, then we will keep all data in memory. // But if data is distributed randomly, this function will be called log(n) times. 
func (e *TopNExec) doCompaction() error { - newRowChunks := chunk.NewList(e.retTypes(), e.maxChunkSize) + newRowChunks := chunk.NewList(e.retTypes(), e.initCap, e.maxChunkSize) newRowPtrs := make([]chunk.RowPtr, 0, e.rowChunks.Len()) for _, rowPtr := range e.rowPtrs { newRowPtr := newRowChunks.AppendRow(e.rowChunks.GetRow(rowPtr)) @@ -436,7 +436,7 @@ func (e *TopNExec) doCompaction() error { e.rowChunks = newRowChunks if e.keyChunks != nil { - newKeyChunks := chunk.NewList(e.keyTypes, e.maxChunkSize) + newKeyChunks := chunk.NewList(e.keyTypes, e.initCap, e.maxChunkSize) for _, rowPtr := range e.rowPtrs { newKeyChunks.AppendRow(e.keyChunks.GetRow(rowPtr)) } diff --git a/util/chunk/iterator_test.go b/util/chunk/iterator_test.go index ca0b69273014d..ddbae1ebbec85 100644 --- a/util/chunk/iterator_test.go +++ b/util/chunk/iterator_test.go @@ -29,8 +29,8 @@ func (s *testChunkSuite) TestIterator(c *check.C) { expected = append(expected, int64(i)) } var rows []Row - li := NewList(fields, 1) - li2 := NewList(fields, 5) + li := NewList(fields, 1, 2) + li2 := NewList(fields, 8, 16) var ptrs []RowPtr var ptrs2 []RowPtr for i := 0; i < n; i++ { diff --git a/util/chunk/list.go b/util/chunk/list.go index 9c89ebbce6ad1..da789211d5a0d 100644 --- a/util/chunk/list.go +++ b/util/chunk/list.go @@ -21,11 +21,12 @@ import ( // List holds a slice of chunks, use to append rows with max chunk size properly handled. type List struct { - fieldTypes []*types.FieldType - maxChunkSize int - length int - chunks []*Chunk - freelist []*Chunk + fieldTypes []*types.FieldType + initChunkSize int + maxChunkSize int + length int + chunks []*Chunk + freelist []*Chunk memTracker *memory.Tracker // track memory usage. consumedIdx int // chunk index in "chunks", has been consumed. @@ -38,13 +39,14 @@ type RowPtr struct { RowIdx uint32 } -// NewList creates a new List with field types and max chunk size. -func NewList(fieldTypes []*types.FieldType, maxChunkSize int) *List { +// NewList creates a new List with field types, init chunk size and max chunk size. +func NewList(fieldTypes []*types.FieldType, initChunkSize, maxChunkSize int) *List { l := &List{ - fieldTypes: fieldTypes, - maxChunkSize: maxChunkSize, - memTracker: memory.NewTracker("chunk.List", -1), - consumedIdx: -1, + fieldTypes: fieldTypes, + initChunkSize: initChunkSize, + maxChunkSize: maxChunkSize, + memTracker: memory.NewTracker("chunk.List", -1), + consumedIdx: -1, } return l } @@ -72,7 +74,7 @@ func (l *List) GetChunk(chkIdx int) *Chunk { // AppendRow appends a row to the List, the row is copied to the List. func (l *List) AppendRow(row Row) RowPtr { chkIdx := len(l.chunks) - 1 - if chkIdx == -1 || l.chunks[chkIdx].NumRows() >= l.maxChunkSize || chkIdx == l.consumedIdx { + if chkIdx == -1 || l.chunks[chkIdx].NumRows() >= l.chunks[chkIdx].Capacity() || chkIdx == l.consumedIdx { newChk := l.allocChunk() l.chunks = append(l.chunks, newChk) if chkIdx != l.consumedIdx { @@ -115,7 +117,10 @@ func (l *List) allocChunk() (chk *Chunk) { chk.Reset() return } - return NewChunkWithCapacity(l.fieldTypes, l.maxChunkSize) + if len(l.chunks) > 0 { + return Renew(l.chunks[len(l.chunks)-1], l.maxChunkSize) + } + return New(l.fieldTypes, l.initChunkSize, l.maxChunkSize) } // GetRow gets a Row from the list by RowPtr. 
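// A sketch of the growth behaviour the list.go change above is after. The
// body of reCalcCapacity is not shown in this diff, so the doubling assumed
// below is a guess; the API calls themselves match the new signatures.
fields := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}
l := NewList(fields, 2, 8) // initChunkSize=2, maxChunkSize=8
src := New(fields, 1, 1)
src.AppendInt64(0, 1)
for i := 0; i < 14; i++ {
	l.AppendRow(src.GetRow(0)) // AppendRow now checks the chunk's own Capacity()
}
// Assuming reCalcCapacity doubles up to maxChunkSize, the list now holds
// chunks of capacity 2, 4 and 8: a small result set never pays for a
// max-sized chunk, while a long scan still reaches full-sized batches.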
diff --git a/util/chunk/list_test.go b/util/chunk/list_test.go index 8468876c6d826..646812331ceb8 100644 --- a/util/chunk/list_test.go +++ b/util/chunk/list_test.go @@ -28,7 +28,7 @@ func (s *testChunkSuite) TestList(c *check.C) { fields := []*types.FieldType{ types.NewFieldType(mysql.TypeLonglong), } - l := NewList(fields, 2) + l := NewList(fields, 2, 2) srcChunk := NewChunkWithCapacity(fields, 32) srcChunk.AppendInt64(0, 1) srcRow := srcChunk.GetRow(0) @@ -100,7 +100,7 @@ func (s *testChunkSuite) TestListMemoryUsage(c *check.C) { srcChk.AppendTime(3, timeObj) srcChk.AppendDuration(4, durationObj) - list := NewList(fieldTypes, maxChunkSize) + list := NewList(fieldTypes, maxChunkSize, maxChunkSize*2) c.Assert(list.GetMemTracker().BytesConsumed(), check.Equals, int64(0)) list.AppendRow(srcChk.GetRow(0)) @@ -131,7 +131,7 @@ func BenchmarkListMemoryUsage(b *testing.B) { row := chk.GetRow(0) initCap := 50 - list := NewList(fieldTypes, 2) + list := NewList(fieldTypes, 2, 8) for i := 0; i < initCap; i++ { list.AppendRow(row) } From 2e3d555f0fc4a3f1ffc17dd276df8b7dd42fc0df Mon Sep 17 00:00:00 2001 From: robi Date: Wed, 29 Aug 2018 19:00:47 +0800 Subject: [PATCH 02/12] *: modify "simple" executor first. --- ddl/reorg.go | 2 +- distsql/distsql_test.go | 8 +++---- executor/admin.go | 4 ++-- executor/builder.go | 40 +++++++++++++++++++---------------- executor/delete.go | 7 +++--- executor/executor.go | 26 +++++++++++++++++------ executor/explain.go | 2 +- executor/insert_common.go | 2 +- executor/pkg_test.go | 2 +- executor/point_get.go | 2 +- executor/prepared.go | 2 +- executor/show.go | 2 +- executor/union_scan.go | 2 +- executor/update.go | 3 ++- privilege/privileges/cache.go | 9 ++++---- server/conn.go | 4 +++- session/session.go | 11 +++++++--- session/tidb.go | 3 ++- util/chunk/chunk.go | 12 ++++++++++- util/chunk/iterator_test.go | 2 +- util/codec/bench_test.go | 2 +- util/codec/codec_test.go | 2 +- 22 files changed, 93 insertions(+), 56 deletions(-) diff --git a/ddl/reorg.go b/ddl/reorg.go index 66730b5a826d9..10cf2d85b9064 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -282,7 +282,7 @@ func (d *ddlCtx) GetTableMaxRowID(startTS uint64, tbl table.PhysicalTable) (maxR } defer terror.Call(result.Close) - chk := chunk.NewChunkWithCapacity(getColumnsTypes(columns), 1) + chk := chunk.New(getColumnsTypes(columns), 1, 1) err = result.Next(ctx, chk) if err != nil { return maxRowID, false, errors.Trace(err) diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index e914382417a8a..99825557c15d0 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -70,7 +70,7 @@ func (s *testSuite) TestSelectNormal(c *C) { response.Fetch(context.TODO()) // Test Next. - chk := chunk.NewChunkWithCapacity(colTypes, 32) + chk := chunk.New(colTypes, 32, 32*3) numAllRows := 0 for { err = response.Next(context.TODO(), chk) @@ -122,7 +122,7 @@ func (s *testSuite) TestSelectStreaming(c *C) { response.Fetch(context.TODO()) // Test Next. 
- chk := chunk.NewChunkWithCapacity(colTypes, 32) + chk := chunk.New(colTypes, 32, 32*3) numAllRows := 0 for { err = response.Next(context.TODO(), chk) @@ -259,7 +259,7 @@ func BenchmarkReadRowsData(b *testing.B) { for i := 0; i < numCols; i++ { colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong} } - chk := chunk.NewChunkWithCapacity(colTypes, numRows) + chk := chunk.New(colTypes, numRows, numRows) buffer := populateBuffer() @@ -277,7 +277,7 @@ func BenchmarkDecodeToChunk(b *testing.B) { for i := 0; i < numCols; i++ { colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong} } - chk := chunk.NewChunkWithCapacity(colTypes, numRows) + chk := chunk.New(colTypes, numRows, numRows) for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ { for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ { diff --git a/executor/admin.go b/executor/admin.go index 2a571b878ce7f..51054ff1a0591 100644 --- a/executor/admin.go +++ b/executor/admin.go @@ -197,7 +197,7 @@ func (e *RecoverIndexExec) Open(ctx context.Context) error { return errors.Trace(err) } - e.srcChunk = chunk.NewChunkWithCapacity(e.columnsTypes(), e.maxChunkSize) + e.srcChunk = chunk.New(e.columnsTypes(), e.initCap, e.maxChunkSize) e.batchSize = 2048 e.recoverRows = make([]recoverRows, 0, e.batchSize) e.idxValsBufs = make([][]types.Datum, e.batchSize) @@ -636,7 +636,7 @@ func (e *CleanupIndexExec) Open(ctx context.Context) error { if err := e.baseExecutor.Open(ctx); err != nil { return errors.Trace(err) } - e.idxChunk = chunk.NewChunkWithCapacity(e.getIdxColTypes(), e.maxChunkSize) + e.idxChunk = chunk.New(e.getIdxColTypes(), e.initCap, e.maxChunkSize) e.idxValues = make(map[int64][][]types.Datum, e.batchSize) e.batchKeys = make([]kv.Key, 0, e.batchSize) e.idxValsBufs = make([][]types.Datum, e.batchSize) diff --git a/executor/builder.go b/executor/builder.go index de7c4056c6a91..218d9ae027864 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -22,6 +22,7 @@ import ( "sync" "time" + "github.com/cznic/mathutil" "github.com/cznic/sortutil" "github.com/pingcap/tidb/ast" "github.com/pingcap/tidb/distsql" @@ -389,7 +390,7 @@ func (b *executorBuilder) buildChecksumTable(v *plannercore.ChecksumTable) Execu func (b *executorBuilder) buildDeallocate(v *plannercore.Deallocate) Executor { e := &DeallocateExec{ - baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID()), + baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID()).withInitCap(chunk.NoDataChunkCap), Name: v.Name, } return e @@ -421,8 +422,9 @@ func (b *executorBuilder) buildLimit(v *plannercore.PhysicalLimit) Executor { b.err = errors.Trace(b.err) return nil } + n := int(mathutil.MinUint64(v.Count, math.MaxInt64)) e := &LimitExec{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec), + baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec).withInitCap(n), begin: v.Offset, end: v.Offset + v.Count, } @@ -431,7 +433,7 @@ func (b *executorBuilder) buildLimit(v *plannercore.PhysicalLimit) Executor { func (b *executorBuilder) buildPrepare(v *plannercore.Prepare) Executor { e := &PrepareExec{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), + baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()).withInitCap(chunk.NoDataChunkCap), is: b.is, name: v.Name, sqlText: v.SQLText, @@ -486,7 +488,7 @@ func (b *executorBuilder) buildSimple(v *plannercore.Simple) Executor { return b.buildRevoke(s) } e := &SimpleExec{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), + baseExecutor: newBaseExecutor(b.ctx, 
v.Schema(), v.ExplainID()).withInitCap(chunk.NoDataChunkCap), Statement: v.Statement, is: b.is, } @@ -495,7 +497,7 @@ func (b *executorBuilder) buildSimple(v *plannercore.Simple) Executor { func (b *executorBuilder) buildSet(v *plannercore.Set) Executor { e := &SetExecutor{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), + baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()).withInitCap(chunk.NoDataChunkCap), vars: v.VarAssigns, } return e @@ -509,9 +511,9 @@ func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor { } var baseExec baseExecutor if selectExec != nil { - baseExec = newBaseExecutor(b.ctx, nil, v.ExplainID(), selectExec) + baseExec = newBaseExecutor(b.ctx, nil, v.ExplainID(), selectExec).withInitCap(chunk.NoDataChunkCap) } else { - baseExec = newBaseExecutor(b.ctx, nil, v.ExplainID()) + baseExec = newBaseExecutor(b.ctx, nil, v.ExplainID()).withInitCap(chunk.NoDataChunkCap) } ivs := &InsertValues{ @@ -604,12 +606,13 @@ func (b *executorBuilder) buildGrant(grant *ast.GrantStmt) Executor { func (b *executorBuilder) buildRevoke(revoke *ast.RevokeStmt) Executor { e := &RevokeExec{ - ctx: b.ctx, - Privs: revoke.Privs, - ObjectType: revoke.ObjectType, - Level: revoke.Level, - Users: revoke.Users, - is: b.is, + baseExecutor: newBaseExecutor(b.ctx, nil, "RevokeStmt"), + ctx: b.ctx, + Privs: revoke.Privs, + ObjectType: revoke.ObjectType, + Level: revoke.Level, + Users: revoke.Users, + is: b.is, } return e } @@ -1082,7 +1085,7 @@ func (b *executorBuilder) buildTableDual(v *plannercore.PhysicalTableDual) Execu return nil } e := &TableDualExec{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), + baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()).withInitCap(v.RowCount), numDualRows: v.RowCount, } // Init the startTS for later use. 
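// The builder call sites above share one rule: size an executor's first chunk
// by what it can actually return. initCapFor below is a hypothetical helper,
// not part of this patch, condensing those decisions; row-less statements
// (SET, PREPARE, INSERT, DELETE, ...) pass chunk.NoDataChunkCap, which the
// chunk.New change later in this series turns into a column-less chunk.
func initCapFor(b *executorBuilder, v plannercore.PhysicalPlan) int {
	switch p := v.(type) {
	case *plannercore.PhysicalTableDual:
		return p.RowCount // dual emits exactly RowCount rows
	case *plannercore.PhysicalLimit:
		// bounded by MaxInt64 here; patch 06 tightens this to MaxChunkSize
		return int(mathutil.MinUint64(p.Count, math.MaxInt64))
	default:
		return b.ctx.GetSessionVars().MaxChunkSize // the pre-patch behaviour
	}
}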
@@ -1122,8 +1125,9 @@ func (b *executorBuilder) buildSort(v *plannercore.PhysicalSort) Executor { b.err = errors.Trace(b.err) return nil } + n := int(mathutil.MinUint64(v.Count, math.MaxInt64)) sortExec := SortExec{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec), + baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec).withInitCap(n), ByItems: v.ByItems, schema: v.Schema(), } @@ -1200,7 +1204,7 @@ func (b *executorBuilder) buildMaxOneRow(v *plannercore.PhysicalMaxOneRow) Execu return nil } e := &MaxOneRowExec{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec), + baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec).withInitCap(2).withMaxChunkSize(2), } return e } @@ -1232,7 +1236,7 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor { } columns2Handle := buildColumns2Handle(v.SelectPlan.Schema(), tblID2table) updateExec := &UpdateExec{ - baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID(), selExec), + baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID(), selExec).withInitCap(chunk.NoDataChunkCap), SelectExec: selExec, OrderedList: v.OrderedList, tblID2table: tblID2table, @@ -1311,7 +1315,7 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor { return nil } deleteExec := &DeleteExec{ - baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID(), selExec), + baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID(), selExec).withInitCap(chunk.NoDataChunkCap), SelectExec: selExec, Tables: v.Tables, IsMultiTable: v.IsMultiTable, diff --git a/executor/delete.go b/executor/delete.go index c60e8b9200018..50191d21ab006 100644 --- a/executor/delete.go +++ b/executor/delete.go @@ -104,8 +104,8 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error { batchDelete := e.ctx.GetSessionVars().BatchDelete && !e.ctx.GetSessionVars().InTxn() batchDMLSize := e.ctx.GetSessionVars().DMLBatchSize fields := e.children[0].retTypes() + chk := e.children[0].newChunk() for { - chk := e.children[0].newChunk() iter := chunk.NewIterator4Chunk(chk) err := e.children[0].Next(ctx, chk) @@ -133,6 +133,7 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error { } rowCount++ } + chk = chunk.Renew(chk, e.maxChunkSize) } return nil @@ -184,10 +185,9 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error { colPosInfos := e.getColPosInfos(e.children[0].Schema()) tblRowMap := make(tableRowMapType) fields := e.children[0].retTypes() + chk := e.children[0].newChunk() for { - chk := e.children[0].newChunk() iter := chunk.NewIterator4Chunk(chk) - err := e.children[0].Next(ctx, chk) if err != nil { return errors.Trace(err) @@ -200,6 +200,7 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error { joinedDatumRow := joinedChunkRow.GetDatumRow(fields) e.composeTblRowMap(tblRowMap, colPosInfos, joinedDatumRow) } + chk = chunk.Renew(chk, e.maxChunkSize) } return errors.Trace(e.removeRowsInTblRowMap(tblRowMap)) diff --git a/executor/executor.go b/executor/executor.go index b94b8c6fd68fa..e15c6a1c557d7 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb/model" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/terror" @@ -135,6 +136,16 @@ func newBaseExecutor(ctx sessionctx.Context, schema 
*expression.Schema, id strin return e } +func (e baseExecutor) withInitCap(initCap int) (ret baseExecutor) { + e.initCap = initCap + return e +} + +func (e baseExecutor) withMaxChunkSize(maxChunkSize int) (ret baseExecutor) { + e.maxChunkSize = maxChunkSize + return e +} + // Executor is the physical implementation of a algebra operator. // // In TiDB, all algebra operators are implemented as iterators, i.e., they @@ -170,7 +181,7 @@ func (e *CancelDDLJobsExec) Next(ctx context.Context, chk *chunk.Chunk) error { if e.cursor >= len(e.jobIDs) { return nil } - numCurBatch := mathutil.Min(e.maxChunkSize, len(e.jobIDs)-e.cursor) + numCurBatch := mathutil.Min(chk.Capacity(), len(e.jobIDs)-e.cursor) for i := e.cursor; i < e.cursor+numCurBatch; i++ { chk.AppendString(0, fmt.Sprintf("%d", e.jobIDs[i])) if e.errs[i] != nil { @@ -266,7 +277,7 @@ func (e *ShowDDLJobQueriesExec) Next(ctx context.Context, chk *chunk.Chunk) erro if len(e.jobIDs) >= len(e.jobs) { return nil } - numCurBatch := mathutil.Min(e.maxChunkSize, len(e.jobs)-e.cursor) + numCurBatch := mathutil.Min(chk.Capacity(), len(e.jobs)-e.cursor) for _, id := range e.jobIDs { for i := e.cursor; i < e.cursor+numCurBatch; i++ { if id == e.jobs[i].ID { @@ -306,7 +317,7 @@ func (e *ShowDDLJobsExec) Next(ctx context.Context, chk *chunk.Chunk) error { if e.cursor >= len(e.jobs) { return nil } - numCurBatch := mathutil.Min(e.maxChunkSize, len(e.jobs)-e.cursor) + numCurBatch := mathutil.Min(chk.Capacity(), len(e.jobs)-e.cursor) for i := e.cursor; i < e.cursor+numCurBatch; i++ { chk.AppendInt64(0, e.jobs[i].ID) chk.AppendString(1, getSchemaName(e.is, e.jobs[i].SchemaID)) @@ -630,8 +641,8 @@ func init() { if err != nil { return rows, errors.Trace(err) } + chk := exec.newChunk() for { - chk := exec.newChunk() err = exec.Next(ctx, chk) if err != nil { return rows, errors.Trace(err) @@ -644,6 +655,7 @@ func init() { row := r.GetDatumRow(exec.retTypes()) rows = append(rows, row) } + chk = chunk.Renew(chk, variable.DefMaxChunkSize) } } } @@ -716,7 +728,7 @@ func (e *SelectionExec) Close() error { // Next implements the Executor Next interface. 
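// The cursor-style executors above now bound each batch by chk.Capacity()
// rather than e.maxChunkSize, so the chunk handed in by the caller, whether
// allocated small via initCap or grown since, dictates the batch size. A
// minimal sketch of the pattern; cursorExec and its fields are hypothetical:
func (e *cursorExec) Next(ctx context.Context, chk *chunk.Chunk) error {
	chk.Reset()
	if e.cursor >= len(e.rows) {
		return nil // all rows delivered
	}
	numCurBatch := mathutil.Min(chk.Capacity(), len(e.rows)-e.cursor)
	for i := e.cursor; i < e.cursor+numCurBatch; i++ {
		chk.AppendString(0, e.rows[i])
	}
	e.cursor += numCurBatch
	return nil
}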
func (e *SelectionExec) Next(ctx context.Context, chk *chunk.Chunk) error { - chk.Reset() + chk.GrowAndReset(e.maxChunkSize) if !e.batched { return errors.Trace(e.unBatchedNext(ctx, chk)) @@ -727,7 +739,7 @@ func (e *SelectionExec) Next(ctx context.Context, chk *chunk.Chunk) error { if !e.selected[e.inputRow.Idx()] { continue } - if chk.NumRows() == e.maxChunkSize { + if chk.NumRows() >= chk.Capacity() { return nil } chk.AppendRow(e.inputRow) @@ -801,7 +813,7 @@ func (e *TableScanExec) Next(ctx context.Context, chk *chunk.Chunk) error { } mutableRow := chunk.MutRowFromTypes(e.retTypes()) - for chk.NumRows() < e.maxChunkSize { + for chk.NumRows() < chk.Capacity() { row, err := e.getRow(handle) if err != nil { return errors.Trace(err) diff --git a/executor/explain.go b/executor/explain.go index 781f026c736af..500d7ef0dd6ef 100644 --- a/executor/explain.go +++ b/executor/explain.go @@ -40,7 +40,7 @@ func (e *ExplainExec) Next(ctx context.Context, chk *chunk.Chunk) error { return nil } - numCurRows := mathutil.Min(e.maxChunkSize, len(e.rows)-e.cursor) + numCurRows := mathutil.Min(chk.Capacity(), len(e.rows)-e.cursor) for i := e.cursor; i < e.cursor+numCurRows; i++ { for j := range e.rows[i] { chk.AppendString(j, e.rows[i][j]) diff --git a/executor/insert_common.go b/executor/insert_common.go index 8a4d7856dc93a..a538dac02e74b 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -252,7 +252,7 @@ func (e *InsertValues) insertRowsFromSelect(ctx context.Context, cols []*table.C fields := selectExec.retTypes() chk := selectExec.newChunk() iter := chunk.NewIterator4Chunk(chk) - rows := make([][]types.Datum, 0, e.ctx.GetSessionVars().MaxChunkSize) + rows := make([][]types.Datum, 0, chk.Capacity()) sessVars := e.ctx.GetSessionVars() batchInsert := sessVars.BatchInsert && !sessVars.InTxn() diff --git a/executor/pkg_test.go b/executor/pkg_test.go index b19620d510ef5..38dedbcf63363 100644 --- a/executor/pkg_test.go +++ b/executor/pkg_test.go @@ -29,7 +29,7 @@ type MockExec struct { func (m *MockExec) Next(ctx context.Context, chk *chunk.Chunk) error { chk.Reset() colTypes := m.retTypes() - for ; m.curRowIdx < len(m.Rows) && chk.NumRows() < m.maxChunkSize; m.curRowIdx++ { + for ; m.curRowIdx < len(m.Rows) && chk.NumRows() < chk.Capacity(); m.curRowIdx++ { curRow := m.Rows[m.curRowIdx] for i := 0; i < curRow.Len(); i++ { curDatum := curRow.ToRow().GetDatum(i, colTypes[i]) diff --git a/executor/point_get.go b/executor/point_get.go index a5c743a8405ba..2a3d8a1220782 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -196,5 +196,5 @@ func (e *PointGetExecutor) retTypes() []*types.FieldType { } func (e *PointGetExecutor) newChunk() *chunk.Chunk { - return chunk.NewChunkWithCapacity(e.retTypes(), 1) + return chunk.New(e.retTypes(), 1, 1) } diff --git a/executor/prepared.go b/executor/prepared.go index bab33f295089c..a57e48315fbb5 100644 --- a/executor/prepared.go +++ b/executor/prepared.go @@ -90,7 +90,7 @@ type PrepareExec struct { // NewPrepareExec creates a new PrepareExec. 
func NewPrepareExec(ctx sessionctx.Context, is infoschema.InfoSchema, sqlTxt string) *PrepareExec { return &PrepareExec{ - baseExecutor: newBaseExecutor(ctx, nil, "PrepareStmt"), + baseExecutor: newBaseExecutor(ctx, nil, "PrepareStmt").withInitCap(chunk.NoDataChunkCap), is: is, sqlText: sqlTxt, } diff --git a/executor/show.go b/executor/show.go index 8823d17d3d3f8..1b959ffaa7bfd 100644 --- a/executor/show.go +++ b/executor/show.go @@ -87,7 +87,7 @@ func (e *ShowExec) Next(ctx context.Context, chk *chunk.Chunk) error { if e.cursor >= e.result.NumRows() { return nil } - numCurBatch := mathutil.Min(e.maxChunkSize, e.result.NumRows()-e.cursor) + numCurBatch := mathutil.Min(chk.Capacity(), e.result.NumRows()-e.cursor) chk.Append(e.result, e.cursor, e.cursor+numCurBatch) e.cursor += numCurBatch return nil diff --git a/executor/union_scan.go b/executor/union_scan.go index 474a02819f9bf..df7a7813aed28 100644 --- a/executor/union_scan.go +++ b/executor/union_scan.go @@ -127,7 +127,7 @@ func (us *UnionScanExec) Open(ctx context.Context) error { func (us *UnionScanExec) Next(ctx context.Context, chk *chunk.Chunk) error { chk.Reset() mutableRow := chunk.MutRowFromTypes(us.retTypes()) - for i, batchSize := 0, us.ctx.GetSessionVars().MaxChunkSize; i < batchSize; i++ { + for i, batchSize := 0, chk.Capacity(); i < batchSize; i++ { row, err := us.getOneRow(ctx) if err != nil { return errors.Trace(err) diff --git a/executor/update.go b/executor/update.go index 4ab7fc345e0fb..d5a36b592ab52 100644 --- a/executor/update.go +++ b/executor/update.go @@ -140,8 +140,8 @@ func (e *UpdateExec) Next(ctx context.Context, chk *chunk.Chunk) error { func (e *UpdateExec) fetchChunkRows(ctx context.Context) error { fields := e.children[0].retTypes() globalRowIdx := 0 + chk := e.children[0].newChunk() for { - chk := e.children[0].newChunk() err := e.children[0].Next(ctx, chk) if err != nil { return errors.Trace(err) @@ -162,6 +162,7 @@ func (e *UpdateExec) fetchChunkRows(ctx context.Context) error { e.newRowsData = append(e.newRowsData, newRow) globalRowIdx++ } + chk = chunk.Renew(chk, e.maxChunkSize) } return nil } diff --git a/privilege/privileges/cache.go b/privilege/privileges/cache.go index 100e86efba061..08974a2374ec9 100644 --- a/privilege/privileges/cache.go +++ b/privilege/privileges/cache.go @@ -277,11 +277,8 @@ func (p *MySQLPrivilege) loadTable(sctx sessionctx.Context, sql string, defer terror.Call(rs.Close) fs := rs.Fields() + chk := rs.NewChunk() for { - // NOTE: decodeTableRow decodes data from a chunk Row, that is a shallow copy. - // The result will reference memory in the chunk, so the chunk must not be reused - // here, otherwise some werid bug will happen! - chk := rs.NewChunk() err = rs.Next(context.TODO(), chk) if err != nil { return errors.Trace(err) @@ -296,6 +293,10 @@ func (p *MySQLPrivilege) loadTable(sctx sessionctx.Context, sql string, return errors.Trace(err) } } + // NOTE: decodeTableRow decodes data from a chunk Row, that is a shallow copy. + // The result will reference memory in the chunk, so the chunk must not be reused + // here, otherwise some werid bug will happen! 
+ chk = chunk.Renew(chk, sctx.GetSessionVars().MaxChunkSize) } } diff --git a/server/conn.go b/server/conn.go index 2275f7170897f..1d98e297d9149 100644 --- a/server/conn.go +++ b/server/conn.go @@ -54,6 +54,7 @@ import ( "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/mysql" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/terror" "github.com/pingcap/tidb/util/arena" "github.com/pingcap/tidb/util/auth" @@ -1033,8 +1034,8 @@ func (cc *clientConn) writeChunksWithFetchSize(ctx context.Context, rs ResultSet fetchedRows := rs.GetFetchedRows() // if fetchedRows is not enough, getting data from recordSet. + chk := rs.NewChunk() for len(fetchedRows) < fetchSize { - chk := rs.NewChunk() // Here server.tidbResultSet implements Next method. err := rs.Next(ctx, chk) if err != nil { @@ -1048,6 +1049,7 @@ func (cc *clientConn) writeChunksWithFetchSize(ctx context.Context, rs ResultSet for i := 0; i < rowCount; i++ { fetchedRows = append(fetchedRows, chk.GetRow(i)) } + chk = chunk.Renew(chk, variable.DefMaxChunkSize) } // tell the client COM_STMT_FETCH has finished by setting proper serverStatus, diff --git a/session/session.go b/session/session.go index 04a0d0cc78f22..55a78d02a8970 100644 --- a/session/session.go +++ b/session/session.go @@ -555,7 +555,7 @@ func (s *session) ExecRestrictedSQL(sctx sessionctx.Context, sql string) ([]chun ) // Execute all recordset, take out the first one as result. for i, rs := range recordSets { - tmp, err := drainRecordSet(ctx, rs) + tmp, err := drainRecordSet(ctx, sctx, rs) if err != nil { return nil, nil, errors.Trace(err) } @@ -604,10 +604,10 @@ func createSessionWithDomainFunc(store kv.Storage) func(*domain.Domain) (pools.R } } -func drainRecordSet(ctx context.Context, rs ast.RecordSet) ([]chunk.Row, error) { +func drainRecordSet(ctx context.Context, sctx sessionctx.Context, rs ast.RecordSet) ([]chunk.Row, error) { var rows []chunk.Row + chk := rs.NewChunk() for { - chk := rs.NewChunk() err := rs.Next(ctx, chk) if err != nil || chk.NumRows() == 0 { return rows, errors.Trace(err) @@ -616,6 +616,11 @@ func drainRecordSet(ctx context.Context, rs ast.RecordSet) ([]chunk.Row, error) for r := iter.Begin(); r != iter.End(); r = iter.Next() { rows = append(rows, r) } + maxChunkSize := variable.DefMaxChunkSize + if sctx != nil { + maxChunkSize = sctx.GetSessionVars().MaxChunkSize + } + chk = chunk.Renew(chk, maxChunkSize) } } diff --git a/session/tidb.go b/session/tidb.go index 26ed871736e73..1357c3e753f01 100644 --- a/session/tidb.go +++ b/session/tidb.go @@ -199,9 +199,9 @@ func GetRows4Test(ctx context.Context, sctx sessionctx.Context, rs ast.RecordSet return nil, nil } var rows []chunk.Row + chk := rs.NewChunk() for { // Since we collect all the rows, we can not reuse the chunk. - chk := rs.NewChunk() iter := chunk.NewIterator4Chunk(chk) err := rs.Next(ctx, chk) @@ -215,6 +215,7 @@ func GetRows4Test(ctx context.Context, sctx sessionctx.Context, rs ast.RecordSet for row := iter.Begin(); row != iter.End(); row = iter.Next() { rows = append(rows, row) } + chk = chunk.Renew(chk, sctx.GetSessionVars().MaxChunkSize) } return rows, nil } diff --git a/util/chunk/chunk.go b/util/chunk/chunk.go index a9211cb79a416..92d3b8614809f 100644 --- a/util/chunk/chunk.go +++ b/util/chunk/chunk.go @@ -38,6 +38,7 @@ type Chunk struct { // Capacity constants. const ( InitialCapacity = 32 + NoDataChunkCap = -1 ) // NewChunkWithCapacity creates a new chunk with field types and capacity. 
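// The chunk.New/Renew changes below back the consumer loops rewritten above
// (update.go, cache.go, conn.go, session.go, tidb.go): a Row is a shallow
// view into its chunk's buffers, so a loop that keeps rows must swap in fresh
// buffers instead of resetting the old ones. The common loop shape, with
// maxChunkSize standing in for whatever bound each call site uses:
chk := rs.NewChunk()
var rows []chunk.Row
for {
	err := rs.Next(ctx, chk)
	if err != nil || chk.NumRows() == 0 {
		break
	}
	for i := 0; i < chk.NumRows(); i++ {
		rows = append(rows, chk.GetRow(i)) // shallow reference into chk
	}
	chk = chunk.Renew(chk, maxChunkSize) // fresh buffers; saved rows stay valid
}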
@@ -50,6 +51,9 @@ func NewChunkWithCapacity(fields []*types.FieldType, cap int) *Chunk { // maxChunkSize: the max limit for the number of rows. func New(fields []*types.FieldType, cap, maxChunkSize int) *Chunk { chk := new(Chunk) + if cap == NoDataChunkCap { + return chk + } chk.columns = make([]*column, 0, len(fields)) chk.capacity = mathutil.Min(cap, maxChunkSize) for _, f := range fields { @@ -70,8 +74,11 @@ func New(fields []*types.FieldType, cap, maxChunkSize int) *Chunk { // chk: old chunk(often used in previous call). // maxChunkSize: the limit for the max number of rows. func Renew(chk *Chunk, maxChunkSize int) *Chunk { - newCap := reCalcCapacity(chk, maxChunkSize) newChk := new(Chunk) + if chk.columns == nil { + return newChk + } + newCap := reCalcCapacity(chk, maxChunkSize) newChk.columns = renewColumns(chk.columns, newCap) newChk.numVirtualRows = 0 newChk.capacity = newCap @@ -153,6 +160,9 @@ func (c *Chunk) SetNumVirtualRows(numVirtualRows int) { // Reset resets the chunk, so the memory it allocated can be reused. // Make sure all the data in the chunk is not used anymore before you reuse this chunk. func (c *Chunk) Reset() { + if c.columns == nil { + return + } for _, col := range c.columns { col.reset() } diff --git a/util/chunk/iterator_test.go b/util/chunk/iterator_test.go index ddbae1ebbec85..eb4f9d7c04cf7 100644 --- a/util/chunk/iterator_test.go +++ b/util/chunk/iterator_test.go @@ -21,7 +21,7 @@ import ( func (s *testChunkSuite) TestIterator(c *check.C) { fields := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)} - chk := NewChunkWithCapacity(fields, 32) + chk := New(fields, 32, 1024) n := 10 var expected []int64 for i := 0; i < n; i++ { diff --git a/util/codec/bench_test.go b/util/codec/bench_test.go index d7e837fa92b69..4929458dcf994 100644 --- a/util/codec/bench_test.go +++ b/util/codec/bench_test.go @@ -82,7 +82,7 @@ func BenchmarkDecodeOneToChunk(b *testing.B) { raw = EncodeBytes(raw, str.GetBytes()) intType := types.NewFieldType(mysql.TypeLonglong) b.ResetTimer() - decoder := NewDecoder(chunk.NewChunkWithCapacity([]*types.FieldType{intType}, 32), nil) + decoder := NewDecoder(chunk.New([]*types.FieldType{intType}, 32, 32), nil) for i := 0; i < b.N; i++ { decoder.DecodeOne(raw, 0, intType) } diff --git a/util/codec/codec_test.go b/util/codec/codec_test.go index 667213a4e6524..07b38e79c5cbe 100644 --- a/util/codec/codec_test.go +++ b/util/codec/codec_test.go @@ -935,7 +935,7 @@ func (s *testCodecSuite) TestDecodeOneToChunk(c *C) { datums = append(datums, types.NewDatum(t.value)) } rowCount := 3 - decoder := NewDecoder(chunk.NewChunkWithCapacity(tps, 32), time.Local) + decoder := NewDecoder(chunk.New(tps, 32, 32), time.Local) for rowIdx := 0; rowIdx < rowCount; rowIdx++ { encoded, err := EncodeValue(sc, nil, datums...) c.Assert(err, IsNil) From bd6e630b0e63557995dcfd682a6a38d19d5c184a Mon Sep 17 00:00:00 2001 From: robi Date: Wed, 29 Aug 2018 19:32:29 +0800 Subject: [PATCH 03/12] *: use grow then reset. --- executor/executor.go | 14 +++++++------- executor/explain.go | 2 +- executor/load_data.go | 2 +- executor/load_stats.go | 2 +- executor/projection.go | 2 +- executor/show.go | 2 +- executor/union_scan.go | 2 +- 7 files changed, 13 insertions(+), 13 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index e15c6a1c557d7..4f3622454ace1 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -177,7 +177,7 @@ type CancelDDLJobsExec struct { // Next implements the Executor Next interface. 
func (e *CancelDDLJobsExec) Next(ctx context.Context, chk *chunk.Chunk) error { - chk.Reset() + chk.GrowAndReset(e.maxChunkSize) if e.cursor >= len(e.jobIDs) { return nil } @@ -270,7 +270,7 @@ func (e *ShowDDLJobQueriesExec) Open(ctx context.Context) error { // Next implements the Executor Next interface. func (e *ShowDDLJobQueriesExec) Next(ctx context.Context, chk *chunk.Chunk) error { - chk.Reset() + chk.GrowAndReset(e.maxChunkSize) if e.cursor >= len(e.jobs) { return nil } @@ -313,7 +313,7 @@ func (e *ShowDDLJobsExec) Open(ctx context.Context) error { // Next implements the Executor Next interface. func (e *ShowDDLJobsExec) Next(ctx context.Context, chk *chunk.Chunk) error { - chk.Reset() + chk.GrowAndReset(e.maxChunkSize) if e.cursor >= len(e.jobs) { return nil } @@ -514,7 +514,7 @@ func (e *SelectLockExec) Open(ctx context.Context) error { // Next implements the Executor Next interface. func (e *SelectLockExec) Next(ctx context.Context, chk *chunk.Chunk) error { - chk.Reset() + chk.GrowAndReset(e.maxChunkSize) err := e.children[0].Next(ctx, chk) if err != nil { return errors.Trace(err) @@ -803,7 +803,7 @@ type TableScanExec struct { // Next implements the Executor Next interface. func (e *TableScanExec) Next(ctx context.Context, chk *chunk.Chunk) error { - chk.Reset() + chk.GrowAndReset(e.maxChunkSize) if e.isVirtualTable { return errors.Trace(e.nextChunk4InfoSchema(ctx, chk)) } @@ -826,7 +826,7 @@ func (e *TableScanExec) Next(ctx context.Context, chk *chunk.Chunk) error { } func (e *TableScanExec) nextChunk4InfoSchema(ctx context.Context, chk *chunk.Chunk) error { - chk.Reset() + chk.GrowAndReset(e.maxChunkSize) if e.virtualTableChunkList == nil { e.virtualTableChunkList = chunk.NewList(e.retTypes(), e.initCap, e.maxChunkSize) columns := make([]*table.Column, e.schema.Len()) @@ -1045,7 +1045,7 @@ func (e *UnionExec) resultPuller(ctx context.Context, childID int) { // Next implements the Executor Next interface. func (e *UnionExec) Next(ctx context.Context, chk *chunk.Chunk) error { - chk.Reset() + chk.GrowAndReset(e.maxChunkSize) if !e.initialized { e.initialize(ctx) e.initialized = true diff --git a/executor/explain.go b/executor/explain.go index 500d7ef0dd6ef..afc3f871e0883 100644 --- a/executor/explain.go +++ b/executor/explain.go @@ -35,7 +35,7 @@ func (e *ExplainExec) Close() error { // Next implements the Executor Next interface. func (e *ExplainExec) Next(ctx context.Context, chk *chunk.Chunk) error { - chk.Reset() + chk.GrowAndReset(e.maxChunkSize) if e.cursor >= len(e.rows) { return nil } diff --git a/executor/load_data.go b/executor/load_data.go index 2d5b2b018f66b..652ad68d1aa09 100644 --- a/executor/load_data.go +++ b/executor/load_data.go @@ -50,7 +50,7 @@ func NewLoadDataInfo(ctx sessionctx.Context, row []types.Datum, tbl table.Table, // Next implements the Executor Next interface. func (e *LoadDataExec) Next(ctx context.Context, chk *chunk.Chunk) error { - chk.Reset() + chk.GrowAndReset(e.maxChunkSize) // TODO: support load data without local field. if !e.IsLocal { return errors.New("Load Data: don't support load data without local field") diff --git a/executor/load_stats.go b/executor/load_stats.go index edad12113f5bc..968d3b10e0601 100644 --- a/executor/load_stats.go +++ b/executor/load_stats.go @@ -51,7 +51,7 @@ const LoadStatsVarKey loadStatsVarKeyType = 0 // Next implements the Executor Next interface. 
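// GrowAndReset, adopted across the Next implementations here, never appears
// as a hunk in this series; the reconstruction below is inferred from its
// call sites and reuses helpers visible in the chunk.go diff (renewColumns,
// capacity, numVirtualRows). The doubling policy is an assumption.
func (c *Chunk) GrowAndReset(maxChunkSize int) {
	if c.columns == nil {
		return // zero-capacity chunk: nothing to grow or reset
	}
	newCap := mathutil.Min(c.capacity*2, maxChunkSize) // assumed growth policy
	if newCap <= c.capacity {
		c.Reset() // already at the cap: plain in-place reuse
		return
	}
	c.capacity = newCap
	c.columns = renewColumns(c.columns, newCap) // larger, fresh buffers
	c.numVirtualRows = 0
}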
func (e *LoadStatsExec) Next(ctx context.Context, chk *chunk.Chunk) error { - chk.Reset() + chk.GrowAndReset(e.maxChunkSize) if len(e.info.Path) == 0 { return errors.New("Load Stats: file path is empty") } diff --git a/executor/projection.go b/executor/projection.go index 83609ed6f1893..f7642c111953f 100644 --- a/executor/projection.go +++ b/executor/projection.go @@ -139,7 +139,7 @@ func (e *ProjectionExec) Open(ctx context.Context) error { // +------------------------------+ +----------------------+ // func (e *ProjectionExec) Next(ctx context.Context, chk *chunk.Chunk) error { - chk.Reset() + chk.GrowAndReset(e.maxChunkSize) if e.isUnparallelExec() { return errors.Trace(e.unParallelExecute(ctx, chk)) } diff --git a/executor/show.go b/executor/show.go index 1b959ffaa7bfd..9c320e67dd23b 100644 --- a/executor/show.go +++ b/executor/show.go @@ -64,7 +64,7 @@ type ShowExec struct { // Next implements the Executor Next interface. func (e *ShowExec) Next(ctx context.Context, chk *chunk.Chunk) error { - chk.Reset() + chk.GrowAndReset(e.maxChunkSize) if e.result == nil { e.result = e.newChunk() err := e.fetchAll() diff --git a/executor/union_scan.go b/executor/union_scan.go index df7a7813aed28..fff8f2c5eed86 100644 --- a/executor/union_scan.go +++ b/executor/union_scan.go @@ -125,7 +125,7 @@ func (us *UnionScanExec) Open(ctx context.Context) error { // Next implements the Executor Next interface. func (us *UnionScanExec) Next(ctx context.Context, chk *chunk.Chunk) error { - chk.Reset() + chk.GrowAndReset(us.maxChunkSize) mutableRow := chunk.MutRowFromTypes(us.retTypes()) for i, batchSize := 0, chk.Capacity(); i < batchSize; i++ { row, err := us.getOneRow(ctx) From 2e99831ceae26abf8f0392617fe4acbc6d472214 Mon Sep 17 00:00:00 2001 From: robi Date: Wed, 29 Aug 2018 21:12:09 +0800 Subject: [PATCH 04/12] *: use maxChunkSize in sessionVars as possible --- executor/executor.go | 3 +-- server/conn.go | 3 +-- server/driver.go | 4 ++++ server/driver_tidb.go | 6 ++++++ session/session.go | 9 +++++---- 5 files changed, 17 insertions(+), 8 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index 4f3622454ace1..2b49c86bdcfe2 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/tidb/model" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/terror" @@ -655,7 +654,7 @@ func init() { row := r.GetDatumRow(exec.retTypes()) rows = append(rows, row) } - chk = chunk.Renew(chk, variable.DefMaxChunkSize) + chk = chunk.Renew(chk, sctx.GetSessionVars().MaxChunkSize) } } } diff --git a/server/conn.go b/server/conn.go index 1d98e297d9149..df2fa182ebe93 100644 --- a/server/conn.go +++ b/server/conn.go @@ -54,7 +54,6 @@ import ( "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/mysql" "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/terror" "github.com/pingcap/tidb/util/arena" "github.com/pingcap/tidb/util/auth" @@ -1049,7 +1048,7 @@ func (cc *clientConn) writeChunksWithFetchSize(ctx context.Context, rs ResultSet for i := 0; i < rowCount; i++ { fetchedRows = append(fetchedRows, chk.GetRow(i)) } - chk = chunk.Renew(chk, variable.DefMaxChunkSize) + chk = chunk.Renew(chk, cc.ctx.GetSessionVars().MaxChunkSize) } // tell the client COM_STMT_FETCH has finished by setting proper serverStatus, diff --git 
a/server/driver.go b/server/driver.go index 905d8aec3f959..ebc2d7278ccc0 100644 --- a/server/driver.go +++ b/server/driver.go @@ -17,6 +17,7 @@ import ( "crypto/tls" "fmt" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/auth" "github.com/pingcap/tidb/util/chunk" @@ -82,6 +83,9 @@ type QueryCtx interface { // ShowProcess shows the information about the session. ShowProcess() util.ProcessInfo + // GetSessionVars returns SessionVars. + GetSessionVars() *variable.SessionVars + SetSessionManager(util.SessionManager) } diff --git a/server/driver_tidb.go b/server/driver_tidb.go index 6a583bd357c6f..103556477765c 100644 --- a/server/driver_tidb.go +++ b/server/driver_tidb.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/mysql" "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/terror" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" @@ -324,6 +325,11 @@ func (tc *TiDBContext) ShowProcess() util.ProcessInfo { return tc.session.ShowProcess() } +// GetSessionVars returns SessionVars. +func (tc *TiDBContext) GetSessionVars() *variable.SessionVars { return tc.session.GetSessionVars() } type tidbResultSet struct { recordSet ast.RecordSet columns []*ColumnInfo diff --git a/session/session.go b/session/session.go index 55a78d02a8970..735bd4fd2283f 100644 --- a/session/session.go +++ b/session/session.go @@ -616,11 +616,12 @@ func drainRecordSet(ctx context.Context, sctx sessionctx.Context, rs ast.RecordS for r := iter.Begin(); r != iter.End(); r = iter.Next() { rows = append(rows, r) } - maxChunkSize := variable.DefMaxChunkSize - if sctx != nil { - maxChunkSize = sctx.GetSessionVars().MaxChunkSize + if sctx == nil { + // sadly, the statistics code may pass a nil sctx + chk = chunk.Renew(chk, variable.DefMaxChunkSize) + continue } - chk = chunk.Renew(chk, maxChunkSize) + chk = chunk.Renew(chk, sctx.GetSessionVars().MaxChunkSize) } } From 6eb8058bf89740b8fd85af1b3984a412b496fe62 Mon Sep 17 00:00:00 2001 From: robi Date: Mon, 3 Sep 2018 21:54:53 +0800 Subject: [PATCH 05/12] rebase and fix: PhysicalSort no longer has Count --- executor/builder.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 218d9ae027864..bf2d7c19cd34d 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1125,9 +1125,8 @@ func (b *executorBuilder) buildSort(v *plannercore.PhysicalSort) Executor { b.err = errors.Trace(b.err) return nil } - n := int(mathutil.MinUint64(v.Count, math.MaxInt64)) sortExec := SortExec{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec).withInitCap(n), + baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec), ByItems: v.ByItems, schema: v.Schema(), } From 4a076a9df269c626cea22ab993767317610067e2 Mon Sep 17 00:00:00 2001 From: robi Date: Thu, 6 Sep 2018 21:41:55 +0800 Subject: [PATCH 06/12] address comment --- executor/builder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/builder.go b/executor/builder.go index bf2d7c19cd34d..8feec862aed07 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -422,7 +422,7 @@ func (b *executorBuilder) buildLimit(v *plannercore.PhysicalLimit) Executor { b.err = errors.Trace(b.err) return nil } - n := int(mathutil.MinUint64(v.Count, math.MaxInt64)) + n := int(mathutil.MinUint64(v.Count, uint64(b.ctx.GetSessionVars().MaxChunkSize))) e := &LimitExec{ baseExecutor:
newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec).withInitCap(n), begin: v.Offset, From ea5f4eccdae5ac3499cb8472e8e354498a4f20af Mon Sep 17 00:00:00 2001 From: robi Date: Mon, 10 Sep 2018 08:57:32 +0000 Subject: [PATCH 07/12] address comment --- distsql/distsql_test.go | 2 +- executor/executor.go | 6 ++---- executor/prepared.go | 4 +++- session/session.go | 11 +++-------- 4 files changed, 9 insertions(+), 14 deletions(-) diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index 99825557c15d0..cafcfcbdc50ce 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -70,7 +70,7 @@ func (s *testSuite) TestSelectNormal(c *C) { response.Fetch(context.TODO()) // Test Next. - chk := chunk.New(colTypes, 32, 32*3) + chk := chunk.New(colTypes, 32, 32) numAllRows := 0 for { err = response.Next(context.TODO(), chk) diff --git a/executor/executor.go b/executor/executor.go index 2b49c86bdcfe2..327fc9b778070 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -135,14 +135,12 @@ func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id strin return e } -func (e baseExecutor) withInitCap(initCap int) (ret baseExecutor) { +func (e *baseExecutor) setInitCap(initCap int) { e.initCap = initCap - return e } -func (e baseExecutor) withMaxChunkSize(maxChunkSize int) (ret baseExecutor) { +func (e *baseExecutor) setMaxChunkSize(maxChunkSize int) { e.maxChunkSize = maxChunkSize - return e } // Executor is the physical implementation of a algebra operator. diff --git a/executor/prepared.go b/executor/prepared.go index a57e48315fbb5..cdcadc5601ca9 100644 --- a/executor/prepared.go +++ b/executor/prepared.go @@ -89,8 +89,10 @@ type PrepareExec struct { // NewPrepareExec creates a new PrepareExec. func NewPrepareExec(ctx sessionctx.Context, is infoschema.InfoSchema, sqlTxt string) *PrepareExec { + base := newBaseExecutor(ctx, nil, "PrepareStmt") + base.setInitCap(chunk.NoDataChunkCap) return &PrepareExec{ - baseExecutor: newBaseExecutor(ctx, nil, "PrepareStmt").withInitCap(chunk.NoDataChunkCap), + baseExecutor: base, is: is, sqlText: sqlTxt, } diff --git a/session/session.go b/session/session.go index 735bd4fd2283f..cf44975b59e15 100644 --- a/session/session.go +++ b/session/session.go @@ -555,7 +555,7 @@ func (s *session) ExecRestrictedSQL(sctx sessionctx.Context, sql string) ([]chun ) // Execute all recordset, take out the first one as result. 
for i, rs := range recordSets { - tmp, err := drainRecordSet(ctx, sctx, rs) + tmp, err := drainRecordSet(ctx, se, rs) if err != nil { return nil, nil, errors.Trace(err) } @@ -604,7 +604,7 @@ func createSessionWithDomainFunc(store kv.Storage) func(*domain.Domain) (pools.R } } -func drainRecordSet(ctx context.Context, sctx sessionctx.Context, rs ast.RecordSet) ([]chunk.Row, error) { +func drainRecordSet(ctx context.Context, se *session, rs ast.RecordSet) ([]chunk.Row, error) { var rows []chunk.Row chk := rs.NewChunk() for { @@ -616,12 +616,7 @@ func drainRecordSet(ctx context.Context, sctx sessionctx.Context, rs ast.RecordS for r := iter.Begin(); r != iter.End(); r = iter.Next() { rows = append(rows, r) } - if sctx == nil { - // sadness, statistic will pass nil sctx - chk = chunk.Renew(chk, variable.DefMaxChunkSize) - continue - } - chk = chunk.Renew(chk, sctx.GetSessionVars().MaxChunkSize) + chk = chunk.Renew(chk, se.sessionVars.MaxChunkSize) } } From f7a85c55d35dd291f3b1d48eeda8c8de01f571c4 Mon Sep 17 00:00:00 2001 From: robi Date: Mon, 10 Sep 2018 09:04:13 +0000 Subject: [PATCH 08/12] address comment --- distsql/distsql_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index cafcfcbdc50ce..b06dd9afa6c56 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -122,7 +122,7 @@ func (s *testSuite) TestSelectStreaming(c *C) { response.Fetch(context.TODO()) // Test Next. - chk := chunk.New(colTypes, 32, 32*3) + chk := chunk.New(colTypes, 32, 32) numAllRows := 0 for { err = response.Next(context.TODO(), chk) From cf3502cbba14479d72c29fd5e3ba878d433339b0 Mon Sep 17 00:00:00 2001 From: robi Date: Mon, 10 Sep 2018 09:42:41 +0000 Subject: [PATCH 09/12] address comment --- executor/executor.go | 8 -------- executor/prepared.go | 2 +- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index 327fc9b778070..bd8fb415047d2 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -135,14 +135,6 @@ func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id strin return e } -func (e *baseExecutor) setInitCap(initCap int) { - e.initCap = initCap -} - -func (e *baseExecutor) setMaxChunkSize(maxChunkSize int) { - e.maxChunkSize = maxChunkSize -} - // Executor is the physical implementation of a algebra operator. // // In TiDB, all algebra operators are implemented as iterators, i.e., they diff --git a/executor/prepared.go b/executor/prepared.go index cdcadc5601ca9..d6e6dfd614de5 100644 --- a/executor/prepared.go +++ b/executor/prepared.go @@ -90,7 +90,7 @@ type PrepareExec struct { // NewPrepareExec creates a new PrepareExec. 
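// The series tries three styles for the same initialization, apparently in
// response to review comments, before settling on the plain assignment used
// below, which copies nothing and needs no setter:
//   patch 02: newBaseExecutor(...).withInitCap(chunk.NoDataChunkCap) // value-receiver chain
//   patch 07: base.setInitCap(chunk.NoDataChunkCap)                  // pointer setter
//   patch 09: base.initCap = chunk.NoDataChunkCap                    // direct assignment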
func NewPrepareExec(ctx sessionctx.Context, is infoschema.InfoSchema, sqlTxt string) *PrepareExec { base := newBaseExecutor(ctx, nil, "PrepareStmt") - base.setInitCap(chunk.NoDataChunkCap) + base.initCap = chunk.NoDataChunkCap return &PrepareExec{ baseExecutor: base, is: is, From 4efa618fd2154f1c6bc0139bf402b1397e9c5ffb Mon Sep 17 00:00:00 2001 From: robi Date: Tue, 25 Sep 2018 16:49:31 +0800 Subject: [PATCH 10/12] fix rebase problem --- executor/builder.go | 44 +++++++++++++++++++++++++++++++------------- 1 file changed, 31 insertions(+), 13 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 8feec862aed07..c3884cf23bc5a 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -389,8 +389,10 @@ func (b *executorBuilder) buildChecksumTable(v *plannercore.ChecksumTable) Execu } func (b *executorBuilder) buildDeallocate(v *plannercore.Deallocate) Executor { + base := newBaseExecutor(b.ctx, nil, v.ExplainID()) + base.initCap = chunk.NoDataChunkCap e := &DeallocateExec{ - baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID()).withInitCap(chunk.NoDataChunkCap), + baseExecutor: base, Name: v.Name, } return e @@ -423,8 +425,10 @@ func (b *executorBuilder) buildLimit(v *plannercore.PhysicalLimit) Executor { return nil } n := int(mathutil.MinUint64(v.Count, uint64(b.ctx.GetSessionVars().MaxChunkSize))) + base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec) + base.initCap = n e := &LimitExec{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec).withInitCap(n), + baseExecutor: base, begin: v.Offset, end: v.Offset + v.Count, } @@ -432,8 +436,10 @@ func (b *executorBuilder) buildLimit(v *plannercore.PhysicalLimit) Executor { } func (b *executorBuilder) buildPrepare(v *plannercore.Prepare) Executor { + base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()) + base.initCap = chunk.NoDataChunkCap e := &PrepareExec{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()).withInitCap(chunk.NoDataChunkCap), + baseExecutor: base, is: b.is, name: v.Name, sqlText: v.SQLText, @@ -487,8 +493,10 @@ func (b *executorBuilder) buildSimple(v *plannercore.Simple) Executor { case *ast.RevokeStmt: return b.buildRevoke(s) } + base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()) + base.initCap = chunk.NoDataChunkCap e := &SimpleExec{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()).withInitCap(chunk.NoDataChunkCap), + baseExecutor: base, Statement: v.Statement, is: b.is, } @@ -496,8 +504,10 @@ func (b *executorBuilder) buildSimple(v *plannercore.Simple) Executor { } func (b *executorBuilder) buildSet(v *plannercore.Set) Executor { + base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()) + base.initCap = chunk.NoDataChunkCap e := &SetExecutor{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()).withInitCap(chunk.NoDataChunkCap), + baseExecutor: base, vars: v.VarAssigns, } return e @@ -511,10 +521,11 @@ func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor { } var baseExec baseExecutor if selectExec != nil { - baseExec = newBaseExecutor(b.ctx, nil, v.ExplainID(), selectExec).withInitCap(chunk.NoDataChunkCap) + baseExec = newBaseExecutor(b.ctx, nil, v.ExplainID(), selectExec) } else { - baseExec = newBaseExecutor(b.ctx, nil, v.ExplainID()).withInitCap(chunk.NoDataChunkCap) + baseExec = newBaseExecutor(b.ctx, nil, v.ExplainID()) } + baseExec.initCap = chunk.NoDataChunkCap ivs := &InsertValues{ baseExecutor: baseExec, @@ -1084,8 +1095,10 @@ func (b *executorBuilder) buildTableDual(v 
*plannercore.PhysicalTableDual) Execu b.err = errors.Errorf("buildTableDual failed, invalid row count for dual table: %v", v.RowCount) return nil } + base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()) + base.initCap = v.RowCount e := &TableDualExec{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()).withInitCap(v.RowCount), + baseExecutor: base, numDualRows: v.RowCount, } // Init the startTS for later use. @@ -1202,9 +1215,10 @@ func (b *executorBuilder) buildMaxOneRow(v *plannercore.PhysicalMaxOneRow) Execu b.err = errors.Trace(b.err) return nil } - e := &MaxOneRowExec{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec).withInitCap(2).withMaxChunkSize(2), - } + base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec) + base.initCap = 2 + base.maxChunkSize = 2 + e := &MaxOneRowExec{baseExecutor: base} return e } @@ -1234,8 +1248,10 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor { return nil } columns2Handle := buildColumns2Handle(v.SelectPlan.Schema(), tblID2table) + base := newBaseExecutor(b.ctx, nil, v.ExplainID(), selExec) + base.initCap = chunk.NoDataChunkCap updateExec := &UpdateExec{ - baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID(), selExec).withInitCap(chunk.NoDataChunkCap), + baseExecutor: base, SelectExec: selExec, OrderedList: v.OrderedList, tblID2table: tblID2table, @@ -1313,8 +1329,10 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor { b.err = errors.Trace(b.err) return nil } + base := newBaseExecutor(b.ctx, nil, v.ExplainID(), selExec) + base.initCap = chunk.NoDataChunkCap deleteExec := &DeleteExec{ - baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID(), selExec).withInitCap(chunk.NoDataChunkCap), + baseExecutor: base, SelectExec: selExec, Tables: v.Tables, IsMultiTable: v.IsMultiTable, From 03029d28f6195429df39294d58980add1413b70e Mon Sep 17 00:00:00 2001 From: robi Date: Wed, 26 Sep 2018 21:22:48 +0800 Subject: [PATCH 11/12] address comment --- executor/adapter.go | 4 ++-- executor/admin.go | 2 +- executor/aggregate.go | 10 +++++----- executor/builder.go | 16 ++++++++-------- executor/delete.go | 4 ++-- executor/distsql.go | 2 +- executor/executor.go | 18 +++++++++--------- executor/executor_pkg_test.go | 2 +- executor/index_lookup_join.go | 6 +++--- executor/insert_common.go | 2 +- executor/join.go | 10 +++++----- executor/merge_join.go | 4 ++-- executor/pkg_test.go | 6 +++--- executor/point_get.go | 2 +- executor/prepared.go | 2 +- executor/projection.go | 6 +++--- executor/show.go | 2 +- executor/sort.go | 6 +++--- executor/trace.go | 2 +- executor/union_scan.go | 2 +- executor/update.go | 2 +- util/chunk/chunk.go | 4 ++-- 22 files changed, 57 insertions(+), 57 deletions(-) diff --git a/executor/adapter.go b/executor/adapter.go index 7cc90cc84001e..34b317cd9f883 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -112,7 +112,7 @@ func (a *recordSet) Next(ctx context.Context, chk *chunk.Chunk) error { // NewChunk create a new chunk using NewChunk function in chunk package. 
From 03029d28f6195429df39294d58980add1413b70e Mon Sep 17 00:00:00 2001
From: robi
Date: Wed, 26 Sep 2018 21:22:48 +0800
Subject: [PATCH 11/12] address comment

---
 executor/adapter.go           |  4 ++--
 executor/admin.go             |  2 +-
 executor/aggregate.go         | 10 +++++-----
 executor/builder.go           | 16 ++++++++--------
 executor/delete.go            |  4 ++--
 executor/distsql.go           |  2 +-
 executor/executor.go          | 18 +++++++++---------
 executor/executor_pkg_test.go |  2 +-
 executor/index_lookup_join.go |  6 +++---
 executor/insert_common.go     |  2 +-
 executor/join.go              | 10 +++++-----
 executor/merge_join.go        |  4 ++--
 executor/pkg_test.go          |  6 +++---
 executor/point_get.go         |  2 +-
 executor/prepared.go          |  2 +-
 executor/projection.go        |  6 +++---
 executor/show.go              |  2 +-
 executor/sort.go              |  6 +++---
 executor/trace.go             |  2 +-
 executor/union_scan.go        |  2 +-
 executor/update.go            |  2 +-
 util/chunk/chunk.go           |  4 ++--
 22 files changed, 57 insertions(+), 57 deletions(-)

diff --git a/executor/adapter.go b/executor/adapter.go
index 7cc90cc84001e..34b317cd9f883 100644
--- a/executor/adapter.go
+++ b/executor/adapter.go
@@ -112,7 +112,7 @@ func (a *recordSet) Next(ctx context.Context, chk *chunk.Chunk) error {
 
 // NewChunk create a new chunk using NewChunk function in chunk package.
 func (a *recordSet) NewChunk() *chunk.Chunk {
-	return a.executor.newChunk()
+	return a.executor.newFirstChunk()
 }
 
 func (a *recordSet) Close() error {
@@ -270,7 +270,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co
 		a.logSlowQuery(txnTS, err == nil)
 	}()
 
-	err = e.Next(ctx, e.newChunk())
+	err = e.Next(ctx, e.newFirstChunk())
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
diff --git a/executor/admin.go b/executor/admin.go
index 51054ff1a0591..1fe645ae15de3 100644
--- a/executor/admin.go
+++ b/executor/admin.go
@@ -102,7 +102,7 @@ func (e *CheckIndexRangeExec) Open(ctx context.Context) error {
 		FieldType: *colTypeForHandle,
 	})
 
-	e.srcChunk = e.newChunk()
+	e.srcChunk = e.newFirstChunk()
 	dagPB, err := e.buildDAGPB()
 	if err != nil {
 		return errors.Trace(err)
diff --git a/executor/aggregate.go b/executor/aggregate.go
index b2c6753539f3e..d28d5a4c404f7 100644
--- a/executor/aggregate.go
+++ b/executor/aggregate.go
@@ -235,7 +235,7 @@ func (e *HashAggExec) initForUnparallelExec() {
 	e.partialResultMap = make(aggPartialResultMapper, 0)
 	e.groupKeyBuffer = make([]byte, 0, 8)
 	e.groupValDatums = make([]types.Datum, 0, len(e.groupKeyBuffer))
-	e.childResult = e.children[0].newChunk()
+	e.childResult = e.children[0].newFirstChunk()
 }
 
 func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
@@ -270,12 +270,12 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
 			partialResultsMap: make(aggPartialResultMapper, 0),
 			groupByItems:      e.GroupByItems,
 			groupValDatums:    make([]types.Datum, 0, len(e.GroupByItems)),
-			chk:               e.children[0].newChunk(),
+			chk:               e.children[0].newFirstChunk(),
 		}
 
 		e.partialWorkers[i] = w
 		e.inputCh <- &HashAggInput{
-			chk:        e.children[0].newChunk(),
+			chk:        e.children[0].newFirstChunk(),
 			giveBackCh: w.inputCh,
 		}
 	}
@@ -292,7 +292,7 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
 			rowBuffer:  make([]types.Datum, 0, e.Schema().Len()),
 			mutableRow: chunk.MutRowFromTypes(e.retTypes()),
 		}
-		e.finalWorkers[i].finalResultHolderCh <- e.newChunk()
+		e.finalWorkers[i].finalResultHolderCh <- e.newFirstChunk()
 	}
 }
@@ -734,7 +734,7 @@ func (e *StreamAggExec) Open(ctx context.Context) error {
 	if err := e.baseExecutor.Open(ctx); err != nil {
 		return errors.Trace(err)
 	}
-	e.childResult = e.children[0].newChunk()
+	e.childResult = e.children[0].newFirstChunk()
 	e.executed = false
 	e.isChildReturnEmpty = true
 	e.inputIter = chunk.NewIterator4Chunk(e.childResult)
diff --git a/executor/builder.go b/executor/builder.go
index c3884cf23bc5a..6def83fb827c1 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -390,7 +390,7 @@ func (b *executorBuilder) buildChecksumTable(v *plannercore.ChecksumTable) Execu
 
 func (b *executorBuilder) buildDeallocate(v *plannercore.Deallocate) Executor {
 	base := newBaseExecutor(b.ctx, nil, v.ExplainID())
-	base.initCap = chunk.NoDataChunkCap
+	base.initCap = chunk.ZeroCapacity
 	e := &DeallocateExec{
 		baseExecutor: base,
 		Name:         v.Name,
@@ -437,7 +437,7 @@ func (b *executorBuilder) buildPrepare(v *plannercore.Prepare) Executor {
 	base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID())
-	base.initCap = chunk.NoDataChunkCap
+	base.initCap = chunk.ZeroCapacity
 	e := &PrepareExec{
 		baseExecutor: base,
 		is:           b.is,
@@ -494,7 +494,7 @@ func (b *executorBuilder) buildSimple(v *plannercore.Simple) Executor {
 		return b.buildRevoke(s)
 	}
 	base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID())
-	base.initCap = chunk.NoDataChunkCap
+	base.initCap = chunk.ZeroCapacity
 	e := &SimpleExec{
 		baseExecutor: base,
 		Statement:    v.Statement,
@@ -505,7 +505,7 @@ func (b *executorBuilder) buildSet(v *plannercore.Set) Executor {
 	base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID())
-	base.initCap = chunk.NoDataChunkCap
+	base.initCap = chunk.ZeroCapacity
 	e := &SetExecutor{
 		baseExecutor: base,
 		vars:         v.VarAssigns,
@@ -525,7 +525,7 @@ func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor {
 	} else {
 		baseExec = newBaseExecutor(b.ctx, nil, v.ExplainID())
 	}
-	baseExec.initCap = chunk.NoDataChunkCap
+	baseExec.initCap = chunk.ZeroCapacity
 
 	ivs := &InsertValues{
 		baseExecutor: baseExec,
@@ -1249,7 +1249,7 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor {
 	columns2Handle := buildColumns2Handle(v.SelectPlan.Schema(), tblID2table)
 	base := newBaseExecutor(b.ctx, nil, v.ExplainID(), selExec)
-	base.initCap = chunk.NoDataChunkCap
+	base.initCap = chunk.ZeroCapacity
 	updateExec := &UpdateExec{
 		baseExecutor: base,
 		SelectExec:   selExec,
@@ -1330,7 +1330,7 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor {
 		return nil
 	}
 	base := newBaseExecutor(b.ctx, nil, v.ExplainID(), selExec)
-	base.initCap = chunk.NoDataChunkCap
+	base.initCap = chunk.ZeroCapacity
 	deleteExec := &DeleteExec{
 		baseExecutor: base,
 		SelectExec:   selExec,
@@ -1553,7 +1553,7 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin)
 		innerKeyCols[i] = v.InnerJoinKeys[i].Index
 	}
 	e.innerCtx.keyCols = innerKeyCols
-	e.joinResult = e.newChunk()
+	e.joinResult = e.newFirstChunk()
 	metrics.ExecutorCounter.WithLabelValues("IndexLookUpJoin").Inc()
 	return e
 }
diff --git a/executor/delete.go b/executor/delete.go
index 50191d21ab006..59c9d1eee5fc7 100644
--- a/executor/delete.go
+++ b/executor/delete.go
@@ -104,7 +104,7 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
 	batchDelete := e.ctx.GetSessionVars().BatchDelete && !e.ctx.GetSessionVars().InTxn()
 	batchDMLSize := e.ctx.GetSessionVars().DMLBatchSize
 	fields := e.children[0].retTypes()
-	chk := e.children[0].newChunk()
+	chk := e.children[0].newFirstChunk()
 	for {
 		iter := chunk.NewIterator4Chunk(chk)
 
@@ -185,7 +185,7 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error {
 	colPosInfos := e.getColPosInfos(e.children[0].Schema())
 	tblRowMap := make(tableRowMapType)
 	fields := e.children[0].retTypes()
-	chk := e.children[0].newChunk()
+	chk := e.children[0].newFirstChunk()
 	for {
 		iter := chunk.NewIterator4Chunk(chk)
 		err := e.children[0].Next(ctx, chk)
diff --git a/executor/distsql.go b/executor/distsql.go
index 19bb507317f24..d262d8b57bbb0 100644
--- a/executor/distsql.go
+++ b/executor/distsql.go
@@ -711,7 +711,7 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er
 	handleCnt := len(task.handles)
 	task.rows = make([]chunk.Row, 0, handleCnt)
 	for {
-		chk := tableReader.newChunk()
+		chk := tableReader.newFirstChunk()
 		err = tableReader.Next(ctx, chk)
 		if err != nil {
 			log.Error(err)
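The executor.go diff that follows is the heart of patch 11: `newChunk` becomes `newFirstChunk`, documenting that the method only sizes the buffer handed to the first `Next` call; afterwards the same chunk is reset or regrown in place (as `chk.GrowAndReset(e.maxChunkSize)` in show.go below illustrates). A sketch of that lifecycle with toy types, where `next` stands in for an executor's `Next`:

package main

import "fmt"

type chunkBuf struct{ rows, capacity int }

func (c *chunkBuf) reset() { c.rows = 0 }

type executor struct{ initCap, maxChunkSize, remaining int }

// newFirstChunk sizes the buffer for the first next call only.
func (e *executor) newFirstChunk() *chunkBuf { return &chunkBuf{capacity: e.initCap} }

// next refills chk, letting its capacity grow toward maxChunkSize once data flows.
func (e *executor) next(chk *chunkBuf) {
    chk.reset()
    if chk.capacity < e.maxChunkSize {
        chk.capacity = e.maxChunkSize
    }
    n := chk.capacity
    if e.remaining < n {
        n = e.remaining
    }
    chk.rows = n
    e.remaining -= n
}

func main() {
    e := &executor{initCap: 32, maxChunkSize: 1024, remaining: 2000}
    chk := e.newFirstChunk() // only this allocation sees initCap
    for {
        e.next(chk)
        if chk.rows == 0 {
            break
        }
        fmt.Println("rows:", chk.rows, "cap:", chk.capacity)
    }
}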
diff --git a/executor/executor.go b/executor/executor.go
index bd8fb415047d2..6352b7d99d421 100644
--- a/executor/executor.go
+++ b/executor/executor.go
@@ -101,8 +101,8 @@ func (e *baseExecutor) Schema() *expression.Schema {
 	return e.schema
 }
 
-// newChunk creates a new chunk to buffer current executor's result.
-func (e *baseExecutor) newChunk() *chunk.Chunk {
+// newFirstChunk creates a new chunk to buffer current executor's result.
+func (e *baseExecutor) newFirstChunk() *chunk.Chunk {
 	return chunk.New(e.retTypes(), e.initCap, e.maxChunkSize)
 }
 
@@ -152,7 +152,7 @@ type Executor interface {
 	Schema() *expression.Schema
 	retTypes() []*types.FieldType
-	newChunk() *chunk.Chunk
+	newFirstChunk() *chunk.Chunk
 }
 
 // CancelDDLJobsExec represents a cancel DDL jobs executor.
@@ -461,7 +461,7 @@ func (e *CheckIndexExec) Next(ctx context.Context, chk *chunk.Chunk) error {
 	if err != nil {
 		return errors.Trace(err)
 	}
-	chk = e.src.newChunk()
+	chk = e.src.newFirstChunk()
 	for {
 		err := e.src.Next(ctx, chk)
 		if err != nil {
@@ -598,7 +598,7 @@ func (e *LimitExec) Open(ctx context.Context) error {
 	if err := e.baseExecutor.Open(ctx); err != nil {
 		return errors.Trace(err)
 	}
-	e.childResult = e.children[0].newChunk()
+	e.childResult = e.children[0].newFirstChunk()
 	e.cursor = 0
 	e.meetFirstBatch = e.begin == 0
 	return nil
@@ -630,7 +630,7 @@ func init() {
 		if err != nil {
 			return rows, errors.Trace(err)
 		}
-		chk := exec.newChunk()
+		chk := exec.newFirstChunk()
 		for {
 			err = exec.Next(ctx, chk)
 			if err != nil {
@@ -698,7 +698,7 @@ func (e *SelectionExec) Open(ctx context.Context) error {
 	if err := e.baseExecutor.Open(ctx); err != nil {
 		return errors.Trace(err)
 	}
-	e.childResult = e.children[0].newChunk()
+	e.childResult = e.children[0].newFirstChunk()
 	e.batched = expression.Vectorizable(e.filters)
 	if e.batched {
 		e.selected = make([]bool, 0, chunk.InitialCapacity)
@@ -911,7 +911,7 @@ func (e *MaxOneRowExec) Next(ctx context.Context, chk *chunk.Chunk) error {
 		return errors.New("subquery returns more than 1 row")
 	}
 
-	childChunk := e.children[0].newChunk()
+	childChunk := e.children[0].newFirstChunk()
 	err = e.children[0].Next(ctx, childChunk)
 	if childChunk.NumRows() != 0 {
 		return errors.New("subquery returns more than 1 row")
@@ -973,7 +973,7 @@ func (e *UnionExec) Open(ctx context.Context) error {
 		return errors.Trace(err)
 	}
 	for _, child := range e.children {
-		e.childrenResults = append(e.childrenResults, child.newChunk())
+		e.childrenResults = append(e.childrenResults, child.newFirstChunk())
 	}
 	e.stopFetchData.Store(false)
 	e.initialized = false
diff --git a/executor/executor_pkg_test.go b/executor/executor_pkg_test.go
index c399d4671cb77..ef1a158fc0801 100644
--- a/executor/executor_pkg_test.go
+++ b/executor/executor_pkg_test.go
@@ -89,7 +89,7 @@ func (s *testExecSuite) TestShowProcessList(c *C) {
 	err := e.Open(ctx)
 	c.Assert(err, IsNil)
 
-	chk := e.newChunk()
+	chk := e.newFirstChunk()
 	it := chunk.NewIterator4Chunk(chk)
 	// Run test and check results.
 	for _, p := range ps {
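Because `newFirstChunk` sits on the `Executor` interface, the rename ripples through every implementation and every drain loop of the form `chk := exec.newFirstChunk(); for { exec.Next(ctx, chk); ... }`. A toy version of that contract and loop, with a simplified `Chunk` and a trivial executor (all names here are illustrative, not TiDB's):

package main

import (
    "context"
    "fmt"
)

type Chunk struct{ rows int }

func (c *Chunk) NumRows() int { return c.rows }
func (c *Chunk) Reset()       { c.rows = 0 }

type Executor interface {
    Next(ctx context.Context, chk *Chunk) error
    newFirstChunk() *Chunk
}

// drain reads an executor to exhaustion while reusing one chunk throughout.
func drain(ctx context.Context, exec Executor) (int, error) {
    total := 0
    chk := exec.newFirstChunk()
    for {
        if err := exec.Next(ctx, chk); err != nil {
            return total, err
        }
        if chk.NumRows() == 0 {
            return total, nil
        }
        total += chk.NumRows()
    }
}

// rowsExec emits one row per Next call until exhausted.
type rowsExec struct{ left int }

func (e *rowsExec) newFirstChunk() *Chunk { return &Chunk{} }

func (e *rowsExec) Next(ctx context.Context, chk *Chunk) error {
    chk.Reset()
    if e.left > 0 {
        chk.rows = 1
        e.left--
    }
    return nil
}

func main() {
    total, err := drain(context.Background(), &rowsExec{left: 3})
    fmt.Println(total, err) // 3 <nil>
}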
diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go
index 817874f66ebfe..a0bc75994bc92 100644
--- a/executor/index_lookup_join.go
+++ b/executor/index_lookup_join.go
@@ -317,11 +317,11 @@ func (ow *outerWorker) pushToChan(ctx context.Context, task *lookUpJoinTask, dst
 
 // buildTask builds a lookUpJoinTask and read outer rows.
 // When err is not nil, task must not be nil to send the error to the main thread via task.
 func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) {
-	ow.executor.newChunk()
+	ow.executor.newFirstChunk()
 	task := &lookUpJoinTask{
 		doneCh:            make(chan error, 1),
-		outerResult:       ow.executor.newChunk(),
+		outerResult:       ow.executor.newFirstChunk(),
 		encodedLookUpKeys: chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeBlob)}, ow.ctx.GetSessionVars().MaxChunkSize),
 		lookupMap:         mvmap.NewMVMap(),
 	}
@@ -523,7 +523,7 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa
 			break
 		}
 		innerResult.Add(iw.executorChk)
-		iw.executorChk = innerExec.newChunk()
+		iw.executorChk = innerExec.newFirstChunk()
 	}
 	task.innerResult = innerResult
 	return nil
diff --git a/executor/insert_common.go b/executor/insert_common.go
index a538dac02e74b..628bead17cfbe 100644
--- a/executor/insert_common.go
+++ b/executor/insert_common.go
@@ -250,7 +250,7 @@ func (e *InsertValues) insertRowsFromSelect(ctx context.Context, cols []*table.C
 	// process `insert|replace into ... select ... from ...`
 	selectExec := e.children[0]
 	fields := selectExec.retTypes()
-	chk := selectExec.newChunk()
+	chk := selectExec.newFirstChunk()
 	iter := chunk.NewIterator4Chunk(chk)
 
 	rows := make([][]types.Datum, 0, chk.Capacity())
diff --git a/executor/join.go b/executor/join.go
index 5b66edd6d501b..a979790fe2af7 100644
--- a/executor/join.go
+++ b/executor/join.go
@@ -262,7 +262,7 @@ func (e *HashJoinExec) fetchInnerRows(ctx context.Context, chkCh chan<- *chunk.C
 		if e.finished.Load().(bool) {
 			return
 		}
-		chk := e.children[e.innerIdx].newChunk()
+		chk := e.children[e.innerIdx].newFirstChunk()
 		err = e.innerExec.Next(ctx, chk)
 		if err != nil {
 			e.innerFinished <- errors.Trace(err)
@@ -289,7 +289,7 @@ func (e *HashJoinExec) initializeForProbe() {
 	e.outerChkResourceCh = make(chan *outerChkResource, e.concurrency)
 	for i := uint(0); i < e.concurrency; i++ {
 		e.outerChkResourceCh <- &outerChkResource{
-			chk:  e.outerExec.newChunk(),
+			chk:  e.outerExec.newFirstChunk(),
 			dest: e.outerResultChs[i],
 		}
 	}
@@ -299,7 +299,7 @@ func (e *HashJoinExec) initializeForProbe() {
 	e.joinChkResourceCh = make([]chan *chunk.Chunk, e.concurrency)
 	for i := uint(0); i < e.concurrency; i++ {
 		e.joinChkResourceCh[i] = make(chan *chunk.Chunk, 1)
-		e.joinChkResourceCh[i] <- e.newChunk()
+		e.joinChkResourceCh[i] <- e.newFirstChunk()
 	}
 
 	// e.joinResultCh is for transmitting the join result chunks to the main thread.
@@ -620,8 +620,8 @@ func (e *NestedLoopApplyExec) Open(ctx context.Context) error {
 	}
 	e.cursor = 0
 	e.innerRows = e.innerRows[:0]
-	e.outerChunk = e.outerExec.newChunk()
-	e.innerChunk = e.innerExec.newChunk()
+	e.outerChunk = e.outerExec.newFirstChunk()
+	e.innerChunk = e.innerExec.newFirstChunk()
 	e.innerList = chunk.NewList(e.innerExec.retTypes(), e.initCap, e.maxChunkSize)
 
 	e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaNestedLoopApply)
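The join.go hunks above show where first-chunk allocation actually lands in the parallel hash join: each worker gets one pre-built chunk threaded through a buffered channel (`outerChkResourceCh`, `joinChkResourceCh`) and recycles it instead of reallocating per batch. A minimal sketch of that resource-channel idea, with illustrative names rather than TiDB's:

package main

import "fmt"

type Chunk struct{ id int }

func main() {
    const concurrency = 4

    // One reusable chunk per worker, threaded through a buffered channel.
    pool := make(chan *Chunk, concurrency)
    for i := 0; i < concurrency; i++ {
        pool <- &Chunk{id: i}
    }

    for task := 0; task < 8; task++ {
        chk := <-pool // borrow a pre-allocated chunk
        fmt.Printf("task %d fills chunk %d\n", task, chk.id)
        pool <- chk // hand it back for the next task
    }
}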
diff --git a/executor/merge_join.go b/executor/merge_join.go
index 134be86d410c8..56f2102e763e2 100644
--- a/executor/merge_join.go
+++ b/executor/merge_join.go
@@ -179,7 +179,7 @@ func (t *mergeJoinInnerTable) reallocReaderResult() {
 	// Create a new Chunk and append it to "resourceQueue" if there is no more
 	// available chunk in "resourceQueue".
 	if len(t.resourceQueue) == 0 {
-		newChunk := t.reader.newChunk()
+		newChunk := t.reader.newFirstChunk()
 		t.memTracker.Consume(newChunk.MemoryUsage())
 		t.resourceQueue = append(t.resourceQueue, newChunk)
 	}
@@ -214,7 +214,7 @@ func (e *MergeJoinExec) Open(ctx context.Context) error {
 
 	e.childrenResults = make([]*chunk.Chunk, 0, len(e.children))
 	for _, child := range e.children {
-		e.childrenResults = append(e.childrenResults, child.newChunk())
+		e.childrenResults = append(e.childrenResults, child.newFirstChunk())
 	}
 
 	e.innerTable.memTracker = memory.NewTracker("innerTable", -1)
diff --git a/executor/pkg_test.go b/executor/pkg_test.go
index 38dedbcf63363..e75e5fade8a55 100644
--- a/executor/pkg_test.go
+++ b/executor/pkg_test.go
@@ -92,9 +92,9 @@ func (s *pkgTestSuite) TestNestedLoopApply(c *C) {
 		joiner:       joiner,
 	}
 	join.innerList = chunk.NewList(innerExec.retTypes(), innerExec.initCap, innerExec.maxChunkSize)
-	join.innerChunk = innerExec.newChunk()
-	join.outerChunk = outerExec.newChunk()
-	joinChk := join.newChunk()
+	join.innerChunk = innerExec.newFirstChunk()
+	join.outerChunk = outerExec.newFirstChunk()
+	joinChk := join.newFirstChunk()
 	it := chunk.NewIterator4Chunk(joinChk)
 	for rowIdx := 1; ; {
 		err := join.Next(ctx, joinChk)
diff --git a/executor/point_get.go b/executor/point_get.go
index 2a3d8a1220782..d2db04d830350 100644
--- a/executor/point_get.go
+++ b/executor/point_get.go
@@ -195,6 +195,6 @@ func (e *PointGetExecutor) retTypes() []*types.FieldType {
 	return e.tps
 }
 
-func (e *PointGetExecutor) newChunk() *chunk.Chunk {
+func (e *PointGetExecutor) newFirstChunk() *chunk.Chunk {
 	return chunk.New(e.retTypes(), 1, 1)
 }
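point_get.go pins its first chunk at a single row, just as buildMaxOneRow earlier pinned `initCap` and `maxChunkSize` to 2: one expected row plus room to detect an illegal second. The emerging rule is to size the first chunk by the operator's maximum possible output. A toy illustration, where `chunkNew` stands in for `chunk.New` reduced to its capacity computation:

package main

import "fmt"

// chunkNew stands in for chunk.New(fields, initCap, maxChunkSize), reduced to
// the row-capacity it would allocate.
func chunkNew(initCap, maxChunkSize int) int {
    if initCap > maxChunkSize {
        return maxChunkSize
    }
    return initCap
}

func main() {
    fmt.Println(chunkNew(1, 1))       // PointGetExecutor: at most one row
    fmt.Println(chunkNew(2, 2))       // MaxOneRowExec: one row plus overflow detection
    fmt.Println(chunkNew(1024, 1024)) // generic executor: the session's max chunk size
}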
diff --git a/executor/prepared.go b/executor/prepared.go
index d6e6dfd614de5..2e629c7971661 100644
--- a/executor/prepared.go
+++ b/executor/prepared.go
@@ -90,7 +90,7 @@ type PrepareExec struct {
 
 // NewPrepareExec creates a new PrepareExec.
 func NewPrepareExec(ctx sessionctx.Context, is infoschema.InfoSchema, sqlTxt string) *PrepareExec {
 	base := newBaseExecutor(ctx, nil, "PrepareStmt")
-	base.initCap = chunk.NoDataChunkCap
+	base.initCap = chunk.ZeroCapacity
 	return &PrepareExec{
 		baseExecutor: base,
 		is:           is,
diff --git a/executor/projection.go b/executor/projection.go
index f7642c111953f..168ce32f39914 100644
--- a/executor/projection.go
+++ b/executor/projection.go
@@ -75,7 +75,7 @@ func (e *ProjectionExec) Open(ctx context.Context) error {
 	}
 
 	if e.isUnparallelExec() {
-		e.childResult = e.children[0].newChunk()
+		e.childResult = e.children[0].newFirstChunk()
 	}
 
 	return nil
@@ -207,11 +207,11 @@ func (e *ProjectionExec) prepare(ctx context.Context) {
 		})
 
 		e.fetcher.inputCh <- &projectionInput{
-			chk:          e.children[0].newChunk(),
+			chk:          e.children[0].newFirstChunk(),
 			targetWorker: e.workers[i],
 		}
 		e.fetcher.outputCh <- &projectionOutput{
-			chk:  e.newChunk(),
+			chk:  e.newFirstChunk(),
 			done: make(chan error, 1),
 		}
 	}
diff --git a/executor/show.go b/executor/show.go
index 9c320e67dd23b..7deee4a51f2c5 100644
--- a/executor/show.go
+++ b/executor/show.go
@@ -66,7 +66,7 @@ type ShowExec struct {
 func (e *ShowExec) Next(ctx context.Context, chk *chunk.Chunk) error {
 	chk.GrowAndReset(e.maxChunkSize)
 	if e.result == nil {
-		e.result = e.newChunk()
+		e.result = e.newFirstChunk()
 		err := e.fetchAll()
 		if err != nil {
 			return errors.Trace(err)
diff --git a/executor/sort.go b/executor/sort.go
index 26551865f0345..95b9ecac29d38 100644
--- a/executor/sort.go
+++ b/executor/sort.go
@@ -111,7 +111,7 @@ func (e *SortExec) fetchRowChunks(ctx context.Context) error {
 	e.rowChunks.GetMemTracker().AttachTo(e.memTracker)
 	e.rowChunks.GetMemTracker().SetLabel("rowChunks")
 	for {
-		chk := e.children[0].newChunk()
+		chk := e.children[0].newFirstChunk()
 		err := e.children[0].Next(ctx, chk)
 		if err != nil {
 			return errors.Trace(err)
@@ -327,7 +327,7 @@ func (e *TopNExec) loadChunksUntilTotalLimit(ctx context.Context) error {
 	e.rowChunks.GetMemTracker().AttachTo(e.memTracker)
 	e.rowChunks.GetMemTracker().SetLabel("rowChunks")
 	for e.rowChunks.Len() < e.totalLimit {
-		srcChk := e.children[0].newChunk()
+		srcChk := e.children[0].newFirstChunk()
 		err := e.children[0].Next(ctx, srcChk)
 		if err != nil {
 			return errors.Trace(err)
@@ -362,7 +362,7 @@ func (e *TopNExec) executeTopN(ctx context.Context) error {
 	if e.keyChunks != nil {
 		childKeyChk = chunk.NewChunkWithCapacity(e.keyTypes, e.maxChunkSize)
 	}
-	childRowChk := e.children[0].newChunk()
+	childRowChk := e.children[0].newFirstChunk()
 	for {
 		err := e.children[0].Next(ctx, childRowChk)
 		if err != nil {
diff --git a/executor/trace.go b/executor/trace.go
index 85204f53a04d0..588df29065c1b 100644
--- a/executor/trace.go
+++ b/executor/trace.go
@@ -72,7 +72,7 @@ func (e *TraceExec) Next(ctx context.Context, chk *chunk.Chunk) error {
 	if err != nil {
 		return errors.Trace(err)
 	}
-	stmtExecChk := stmtExec.newChunk()
+	stmtExecChk := stmtExec.newFirstChunk()
 
 	// store span into context
 	ctx = opentracing.ContextWithSpan(ctx, e.rootTrace)
diff --git a/executor/union_scan.go b/executor/union_scan.go
index fff8f2c5eed86..3c1682936d71d 100644
--- a/executor/union_scan.go
+++ b/executor/union_scan.go
@@ -119,7 +119,7 @@ func (us *UnionScanExec) Open(ctx context.Context) error {
 	if err := us.baseExecutor.Open(ctx); err != nil {
 		return errors.Trace(err)
 	}
-	us.snapshotChunkBuffer = us.newChunk()
+	us.snapshotChunkBuffer = us.newFirstChunk()
 	return nil
 }
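A detail worth noting in the sort.go hunks above: `fetchRowChunks` and `loadChunksUntilTotalLimit` call `newFirstChunk` inside their loops, because every filled chunk is retained in the `rowChunks` list rather than recycled. A sketch of that accumulate pattern (toy types; the `add` method paraphrases `chunk.List.Add`):

package main

import "fmt"

type chunkBuf struct{ rows []int }

type chunkList struct{ chunks []*chunkBuf }

func (l *chunkList) add(c *chunkBuf) { l.chunks = append(l.chunks, c) }

func main() {
    var rowChunks chunkList
    batches := [][]int{{1, 2, 3}, {4, 5}, {6}}

    for _, b := range batches {
        chk := &chunkBuf{}                // fresh chunk each pass: rowChunks will own it
        chk.rows = append(chk.rows, b...) // stand-in for children[0].Next(ctx, chk)
        rowChunks.add(chk)
    }
    fmt.Println("buffered chunks:", len(rowChunks.chunks)) // 3
}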
diff --git a/executor/update.go b/executor/update.go
index d5a36b592ab52..de60b92ab8a01 100644
--- a/executor/update.go
+++ b/executor/update.go
@@ -140,7 +140,7 @@ func (e *UpdateExec) Next(ctx context.Context, chk *chunk.Chunk) error {
 
 func (e *UpdateExec) fetchChunkRows(ctx context.Context) error {
 	fields := e.children[0].retTypes()
 	globalRowIdx := 0
-	chk := e.children[0].newChunk()
+	chk := e.children[0].newFirstChunk()
 	for {
 		err := e.children[0].Next(ctx, chk)
 		if err != nil {
diff --git a/util/chunk/chunk.go b/util/chunk/chunk.go
index 92d3b8614809f..3a1ed4238d5b0 100644
--- a/util/chunk/chunk.go
+++ b/util/chunk/chunk.go
@@ -38,7 +38,7 @@ type Chunk struct {
 // Capacity constants.
 const (
 	InitialCapacity = 32
-	NoDataChunkCap  = -1
+	ZeroCapacity    = -1
 )
 
 // NewChunkWithCapacity creates a new chunk with field types and capacity.
@@ -51,7 +51,7 @@ func NewChunkWithCapacity(fields []*types.FieldType, cap int) *Chunk {
 // maxChunkSize: the max limit for the number of rows.
 func New(fields []*types.FieldType, cap, maxChunkSize int) *Chunk {
 	chk := new(Chunk)
-	if cap == NoDataChunkCap {
+	if cap == ZeroCapacity {
 		return chk
 	}
 	chk.columns = make([]*column, 0, len(fields))

From bbb107f505f0a9ac511f6aab6e62817a1e129db9 Mon Sep 17 00:00:00 2001
From: robi
Date: Wed, 26 Sep 2018 21:32:22 +0800
Subject: [PATCH 12/12] address comment

---
 util/chunk/chunk.go | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/util/chunk/chunk.go b/util/chunk/chunk.go
index 3a1ed4238d5b0..a5f2cddc57fa1 100644
--- a/util/chunk/chunk.go
+++ b/util/chunk/chunk.go
@@ -38,7 +38,7 @@ type Chunk struct {
 // Capacity constants.
 const (
 	InitialCapacity = 32
-	ZeroCapacity    = -1
+	ZeroCapacity    = 0
 )
 
 // NewChunkWithCapacity creates a new chunk with field types and capacity.
@@ -51,9 +51,6 @@ func NewChunkWithCapacity(fields []*types.FieldType, cap int) *Chunk {
 // maxChunkSize: the max limit for the number of rows.
 func New(fields []*types.FieldType, cap, maxChunkSize int) *Chunk {
 	chk := new(Chunk)
-	if cap == ZeroCapacity {
-		return chk
-	}
 	chk.columns = make([]*column, 0, len(fields))
 	chk.capacity = mathutil.Min(cap, maxChunkSize)
 	for _, f := range fields {