executor, util: rename original List to ListInMemory and add a new interface List #13353

Merged
merged 5 commits on Nov 11, 2019

Changes from 4 commits
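
The new List interface named in the PR title is not part of the 4 commits shown below, so the following is only a rough sketch of what that abstraction could look like, inferred from the ListInMemory and ListInDisk methods touched in this diff; the exact method set and doc comments are assumptions, not taken from the PR.

```go
package chunk

// List is a hypothetical sketch (not from the PR) of an interface that both
// the renamed ListInMemory and the existing ListInDisk could satisfy, which
// would explain why Add now returns an error at the call sites below.
type List interface {
	// Add appends a chunk; the on-disk implementation can fail, so the
	// error has to be propagated to callers.
	Add(chk *Chunk) error
	// GetRow returns the row referenced by ptr.
	GetRow(ptr RowPtr) (Row, error)
	// NumChunks returns the number of chunks held by the list.
	NumChunks() int
	// NumRowsOfChunk returns the number of rows in the chkID-th chunk.
	NumRowsOfChunk(chkID int) int
}
```
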
10 changes: 5 additions & 5 deletions executor/executor.go
@@ -132,10 +132,10 @@ func newFirstChunk(e Executor) *chunk.Chunk {
return chunk.New(base.retFieldTypes, base.initCap, base.maxChunkSize)
}

-// newList creates a new List to buffer current executor's result.
-func newList(e Executor) *chunk.List {
+// newList creates a new ListInMemory to buffer current executor's result.
+func newList(e Executor) *chunk.ListInMemory {
base := e.base()
-return chunk.NewList(base.retFieldTypes, base.initCap, base.maxChunkSize)
+return chunk.NewListInMemory(base.retFieldTypes, base.initCap, base.maxChunkSize)
}

// retTypes returns all output column types.
@@ -1115,7 +1115,7 @@ type TableScanExec struct {
iter kv.Iterator
columns []*model.ColumnInfo
isVirtualTable bool
-virtualTableChunkList *chunk.List
+virtualTableChunkList *chunk.ListInMemory
virtualTableChunkIdx int
}

@@ -1146,7 +1146,7 @@ func (e *TableScanExec) Next(ctx context.Context, req *chunk.Chunk) error {
func (e *TableScanExec) nextChunk4InfoSchema(ctx context.Context, chk *chunk.Chunk) error {
chk.GrowAndReset(e.maxChunkSize)
if e.virtualTableChunkList == nil {
-e.virtualTableChunkList = chunk.NewList(retTypes(e), e.initCap, e.maxChunkSize)
+e.virtualTableChunkList = chunk.NewListInMemory(retTypes(e), e.initCap, e.maxChunkSize)
columns := make([]*table.Column, e.schema.Len())
for i, colInfo := range e.columns {
columns[i] = table.ToColumn(colInfo)
11 changes: 7 additions & 4 deletions executor/hash_table.go
@@ -91,7 +91,7 @@ type hashRowContainer struct {
memTracker *memory.Tracker

// records stores the chunks in memory.
-records *chunk.List
+records *chunk.ListInMemory
// recordsInDisk stores the chunks in disk.
recordsInDisk *chunk.ListInDisk

@@ -117,7 +117,7 @@ func newHashRowContainer(sCtx sessionctx.Context, estCount int, hCtx *hashContex
if estCount < maxChunkSize*estCountMinFactor {
estCount = 0
}
-initList := chunk.NewList(hCtx.allTypes, maxChunkSize, maxChunkSize)
+initList := chunk.NewListInMemory(hCtx.allTypes, maxChunkSize, maxChunkSize)
c := &hashRowContainer{
sc: sCtx.GetSessionVars().StmtCtx,
hCtx: hCtx,
@@ -202,7 +202,10 @@ func (c *hashRowContainer) PutChunk(chk *chunk.Chunk) error {
}
} else {
chkIdx = uint32(c.records.NumChunks())
-c.records.Add(chk)
+err := c.records.Add(chk)
+if err != nil {
+return err
+}
if atomic.LoadUint32(&c.exceeded) != 0 {
err := c.spillToDisk()
if err != nil {
@@ -269,7 +272,7 @@ func (c *hashRowContainer) ActionSpill() memory.ActionOnExceed {
return &spillDiskAction{c: c}
}

-// spillDiskAction implements memory.ActionOnExceed for chunk.List. If
+// spillDiskAction implements memory.ActionOnExceed for chunk.ListInMemory. If
// the memory quota of a query is exceeded, spillDiskAction.Action is
// triggered.
type spillDiskAction struct {
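
The hunks above only touch the call sites; hashRowContainer.spillToDisk itself is unchanged and not shown in this diff. As a rough illustration of the mechanism the spillDiskAction comment refers to, here is a simplified sketch of spilling buffered chunks from a ListInMemory into a ListInDisk once the memory quota is exceeded; it assumes ListInMemory keeps the old List's NumChunks, GetChunk, and Reset methods and omits the atomic flag and synchronization the real container uses.

```go
package executor

import "github.com/pingcap/tidb/util/chunk"

// spillInMemoryList is a sketch only, not the real spillToDisk: copy every
// buffered chunk onto disk, then drop the in-memory copies.
func spillInMemoryList(inMem *chunk.ListInMemory, onDisk *chunk.ListInDisk) error {
	for i := 0; i < inMem.NumChunks(); i++ {
		// ListInDisk.Add can fail (temp file creation, write errors),
		// so every copied chunk has to be checked.
		if err := onDisk.Add(inMem.GetChunk(i)); err != nil {
			return err
		}
	}
	// Release the in-memory chunks once everything is safely on disk.
	inMem.Reset()
	return nil
}
```
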
16 changes: 11 additions & 5 deletions executor/index_lookup_join.go
@@ -94,10 +94,10 @@ type innerCtx struct {
}

type lookUpJoinTask struct {
-outerResult *chunk.List
+outerResult *chunk.ListInMemory
outerMatch [][]bool

-innerResult *chunk.List
+innerResult *chunk.ListInMemory
encodedLookUpKeys []*chunk.Chunk
lookupMap *mvmap.MVMap
matchedInners []chunk.Row
@@ -392,7 +392,10 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) {
break
}

-task.outerResult.Add(chk)
+err = task.outerResult.Add(chk)
+if err != nil {
+return nil, err
+}
}
if task.outerResult.Len() == 0 {
return nil, nil
@@ -601,7 +604,7 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa
return err
}
defer terror.Call(innerExec.Close)
-innerResult := chunk.NewList(retTypes(innerExec), iw.ctx.GetSessionVars().MaxChunkSize, iw.ctx.GetSessionVars().MaxChunkSize)
+innerResult := chunk.NewListInMemory(retTypes(innerExec), iw.ctx.GetSessionVars().MaxChunkSize, iw.ctx.GetSessionVars().MaxChunkSize)
innerResult.GetMemTracker().SetLabel(buildSideResultLabel)
innerResult.GetMemTracker().AttachTo(task.memTracker)
for {
@@ -617,7 +620,10 @@
if iw.executorChk.NumRows() == 0 {
break
}
-innerResult.Add(iw.executorChk)
+err = innerResult.Add(iw.executorChk)
+if err != nil {
+return err
+}
iw.executorChk = newFirstChunk(innerExec)
}
task.innerResult = innerResult
9 changes: 6 additions & 3 deletions executor/index_lookup_merge_join.go
@@ -86,7 +86,7 @@ type innerMergeCtx struct {
}

type lookUpMergeJoinTask struct {
-outerResult *chunk.List
+outerResult *chunk.ListInMemory
outerOrderIdx []chunk.RowPtr

innerResult *chunk.Chunk
@@ -323,7 +323,7 @@ func (omw *outerMergeWorker) pushToChan(ctx context.Context, task *lookUpMergeJo
func (omw *outerMergeWorker) buildTask(ctx context.Context) (*lookUpMergeJoinTask, error) {
task := &lookUpMergeJoinTask{
results: make(chan *indexMergeJoinResult, numResChkHold),
-outerResult: chunk.NewList(omw.rowTypes, omw.executor.base().initCap, omw.executor.base().maxChunkSize),
+outerResult: chunk.NewListInMemory(omw.rowTypes, omw.executor.base().initCap, omw.executor.base().maxChunkSize),
}
task.memTracker = memory.NewTracker(stringutil.MemoizeStr(func() string { return fmt.Sprintf("lookup join task %p", task) }), -1)
task.memTracker.AttachTo(omw.parentMemTracker)
@@ -346,7 +346,10 @@
break
}

-task.outerResult.Add(execChk)
+err = task.outerResult.Add(execChk)
+if err != nil {
+return task, err
+}
requiredRows -= execChk.NumRows()
task.memTracker.Consume(execChk.MemoryUsage())
}
6 changes: 3 additions & 3 deletions executor/join.go
@@ -555,7 +555,7 @@ type NestedLoopApplyExec struct {
outerChunk *chunk.Chunk
outerChunkCursor int
outerSelected []bool
-innerList *chunk.List
+innerList *chunk.ListInMemory
innerChunk *chunk.Chunk
innerSelected []bool
innerIter chunk.Iterator
@@ -586,7 +586,7 @@ func (e *NestedLoopApplyExec) Open(ctx context.Context) error {
e.innerRows = e.innerRows[:0]
e.outerChunk = newFirstChunk(e.outerExec)
e.innerChunk = newFirstChunk(e.innerExec)
-e.innerList = chunk.NewList(retTypes(e.innerExec), e.initCap, e.maxChunkSize)
+e.innerList = chunk.NewListInMemory(retTypes(e.innerExec), e.initCap, e.maxChunkSize)

e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaNestedLoopApply)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
@@ -628,7 +628,7 @@ func (e *NestedLoopApplyExec) fetchSelectedOuterRow(ctx context.Context, chk *ch
}
}

-// fetchAllInners reads all data from the inner table and stores them in a List.
+// fetchAllInners reads all data from the inner table and stores them in a ListInMemory.
func (e *NestedLoopApplyExec) fetchAllInners(ctx context.Context) error {
err := e.innerExec.Open(ctx)
defer terror.Call(e.innerExec.Close)
2 changes: 1 addition & 1 deletion executor/pkg_test.go
@@ -61,7 +61,7 @@ func (s *pkgTestSuite) TestNestedLoopApply(c *C) {
innerFilter: []expression.Expression{innerFilter},
joiner: joiner,
}
-join.innerList = chunk.NewList(retTypes(innerExec), innerExec.initCap, innerExec.maxChunkSize)
+join.innerList = chunk.NewListInMemory(retTypes(innerExec), innerExec.initCap, innerExec.maxChunkSize)
join.innerChunk = newFirstChunk(innerExec)
join.outerChunk = newFirstChunk(outerExec)
joinChk := newFirstChunk(join)
18 changes: 12 additions & 6 deletions executor/sort.go
@@ -45,7 +45,7 @@ type SortExec struct {
// keyCmpFuncs is used to compare each ByItem.
keyCmpFuncs []chunk.CompareFunc
// rowChunks is the chunks to store row values.
-rowChunks *chunk.List
+rowChunks *chunk.ListInMemory
// rowPointer store the chunk index and row index for each row.
rowPtrs []chunk.RowPtr

@@ -95,7 +95,7 @@ func (e *SortExec) Next(ctx context.Context, req *chunk.Chunk) error {

func (e *SortExec) fetchRowChunks(ctx context.Context) error {
fields := retTypes(e)
-e.rowChunks = chunk.NewList(fields, e.initCap, e.maxChunkSize)
+e.rowChunks = chunk.NewListInMemory(fields, e.initCap, e.maxChunkSize)
e.rowChunks.GetMemTracker().AttachTo(e.memTracker)
e.rowChunks.GetMemTracker().SetLabel(rowChunksLabel)
for {
@@ -108,7 +108,10 @@
if rowCount == 0 {
break
}
-e.rowChunks.Add(chk)
+err = e.rowChunks.Add(chk)
+if err != nil {
+return err
+}
}
return nil
}
@@ -256,7 +259,7 @@ func (e *TopNExec) Next(ctx context.Context, req *chunk.Chunk) error {

func (e *TopNExec) loadChunksUntilTotalLimit(ctx context.Context) error {
e.chkHeap = &topNChunkHeap{e}
-e.rowChunks = chunk.NewList(retTypes(e), e.initCap, e.maxChunkSize)
+e.rowChunks = chunk.NewListInMemory(retTypes(e), e.initCap, e.maxChunkSize)
e.rowChunks.GetMemTracker().AttachTo(e.memTracker)
e.rowChunks.GetMemTracker().SetLabel(rowChunksLabel)
for uint64(e.rowChunks.Len()) < e.totalLimit {
@@ -270,7 +273,10 @@
if srcChk.NumRows() == 0 {
break
}
-e.rowChunks.Add(srcChk)
+err = e.rowChunks.Add(srcChk)
+if err != nil {
+return err
+}
}
e.initPointers()
e.initCompareFuncs()
@@ -330,7 +336,7 @@ func (e *TopNExec) processChildChk(childRowChk *chunk.Chunk) error {
// but we want descending top N, then we will keep all data in memory.
// But if data is distributed randomly, this function will be called log(n) times.
func (e *TopNExec) doCompaction() error {
-newRowChunks := chunk.NewList(retTypes(e), e.initCap, e.maxChunkSize)
+newRowChunks := chunk.NewListInMemory(retTypes(e), e.initCap, e.maxChunkSize)
newRowPtrs := make([]chunk.RowPtr, 0, e.rowChunks.Len())
for _, rowPtr := range e.rowPtrs {
newRowPtr := newRowChunks.AppendRow(e.rowChunks.GetRow(rowPtr))
9 changes: 7 additions & 2 deletions util/chunk/disk.go
@@ -83,7 +83,7 @@ func NewListInDisk(fieldTypes []*types.FieldType) *ListInDisk {
return l
}

-// GetDiskTracker returns the memory tracker of this List.
+// GetDiskTracker returns the memory tracker of this ListInMemory.
func (l *ListInDisk) GetDiskTracker() *disk.Tracker {
return l.diskTracker
}
@@ -92,7 +92,7 @@ func (l *ListInDisk) GetDiskTracker() *disk.Tracker {
// is not empty and not used any more and has the same field types.
func (l *ListInDisk) Add(chk *Chunk) (err error) {
if chk.NumRows() == 0 {
-return errors.New("chunk appended to List should have at least 1 row")
+return errors.New("chunk appended to ListInMemory should have at least 1 row")
}
if l.disk == nil {
l.disk, err = ioutil.TempFile(tmpDir, l.diskTracker.Label().String())
@@ -138,6 +138,11 @@ func (l *ListInDisk) NumChunks() int {
return len(l.offsets)
}

+// NumRowsOfChunk returns the number of rows of a chunk in the ListInDisk.
+func (l *ListInDisk) NumRowsOfChunk(chkID int) int {
+return len(l.offsets[chkID])
+}

// Close releases the disk resource.
func (l *ListInDisk) Close() error {
if l.disk != nil {
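
The only new behaviour in util/chunk/disk.go is NumRowsOfChunk. Below is a hedged usage sketch, not part of this diff: the field type, values, and import paths (which follow the separate pingcap/parser module of that era) are illustrative assumptions.

```go
package main

import (
	"fmt"

	"github.com/pingcap/parser/mysql"
	"github.com/pingcap/tidb/types"
	"github.com/pingcap/tidb/util/chunk"
)

func main() {
	fieldTypes := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}
	l := chunk.NewListInDisk(fieldTypes)
	defer l.Close()

	// Build one 4-row chunk and spill it to disk.
	chk := chunk.New(fieldTypes, 4, 4)
	for i := 0; i < 4; i++ {
		chk.AppendInt64(0, int64(i))
	}
	if err := l.Add(chk); err != nil {
		panic(err)
	}

	// NumRowsOfChunk lets callers address rows without remembering how
	// many rows each appended chunk had.
	for chkIdx := 0; chkIdx < l.NumChunks(); chkIdx++ {
		for rowIdx := 0; rowIdx < l.NumRowsOfChunk(chkIdx); rowIdx++ {
			row, err := l.GetRow(chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)})
			if err != nil {
				panic(err)
			}
			fmt.Println(row.GetInt64(0))
		}
	}
}
```
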
10 changes: 5 additions & 5 deletions util/chunk/iterator.go
@@ -159,13 +159,13 @@ func (it *Iterator4Chunk) GetChunk() *Chunk {
return it.chk
}

-// NewIterator4List returns a Iterator for List.
-func NewIterator4List(li *List) Iterator {
+// NewIterator4List returns a Iterator for ListInMemory.
+func NewIterator4List(li *ListInMemory) Iterator {
return &iterator4List{li: li}
}

type iterator4List struct {
-li *List
+li *ListInMemory
chkCursor int
rowCursor int
}
@@ -232,12 +232,12 @@ func (it *iterator4List) Len() int {
}

// NewIterator4RowPtr returns a Iterator for RowPtrs.
-func NewIterator4RowPtr(li *List, ptrs []RowPtr) Iterator {
+func NewIterator4RowPtr(li *ListInMemory, ptrs []RowPtr) Iterator {
return &iterator4RowPtr{li: li, ptrs: ptrs}
}

type iterator4RowPtr struct {
-li *List
+li *ListInMemory
ptrs []RowPtr
cursor int
}
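
For completeness, a minimal sketch, not taken from the diff, of building the renamed ListInMemory and reading it back through NewIterator4List; the error now returned by Add is handled the same way the executor call sites above handle it, and the field type, values, and import paths are illustrative assumptions.

```go
package main

import (
	"fmt"

	"github.com/pingcap/parser/mysql"
	"github.com/pingcap/tidb/types"
	"github.com/pingcap/tidb/util/chunk"
)

func main() {
	fields := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}
	li := chunk.NewListInMemory(fields, 2, 2)

	chk := chunk.New(fields, 2, 2)
	chk.AppendInt64(0, 7)
	chk.AppendInt64(0, 8)
	// Add now reports failure instead of silently succeeding.
	if err := li.Add(chk); err != nil {
		panic(err)
	}

	// The standard iterator loop used throughout the executor package.
	it := chunk.NewIterator4List(li)
	for row := it.Begin(); row != it.End(); row = it.Next() {
		fmt.Println(row.GetInt64(0))
	}
}
```
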
6 changes: 3 additions & 3 deletions util/chunk/iterator_test.go
@@ -49,8 +49,8 @@ func (s *testChunkSuite) TestIterator(c *check.C) {
expected = append(expected, int64(i))
}
var rows []Row
-li := NewList(fields, 1, 2)
-li2 := NewList(fields, 8, 16)
+li := NewListInMemory(fields, 1, 2)
+li2 := NewListInMemory(fields, 8, 16)
var ptrs []RowPtr
var ptrs2 []RowPtr
for i := 0; i < n; i++ {
@@ -120,7 +120,7 @@ c.Assert(it.Begin(), check.Equals, it.End())
c.Assert(it.Begin(), check.Equals, it.End())
it = NewIterator4Chunk(new(Chunk))
c.Assert(it.Begin(), check.Equals, it.End())
-it = NewIterator4List(new(List))
+it = NewIterator4List(new(ListInMemory))
c.Assert(it.Begin(), check.Equals, it.End())
it = NewIterator4RowPtr(li, nil)
c.Assert(it.Begin(), check.Equals, it.End())