Skip to content

Commit

Permalink
executor, util: rename original List to ListInMemory and add a new in…
Browse files Browse the repository at this point in the history
…terface List (pingcap#13353)
  • Loading branch information
fzhedu authored and sre-bot committed Nov 11, 2019
1 parent 30da446 commit 2b5f81a
Show file tree
Hide file tree
Showing 12 changed files with 110 additions and 75 deletions.
10 changes: 5 additions & 5 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,10 @@ func newFirstChunk(e Executor) *chunk.Chunk {
return chunk.New(base.retFieldTypes, base.initCap, base.maxChunkSize)
}

// newList creates a new List to buffer current executor's result.
func newList(e Executor) *chunk.List {
// newList creates a new ListInMemory to buffer current executor's result.
func newList(e Executor) *chunk.ListInMemory {
base := e.base()
return chunk.NewList(base.retFieldTypes, base.initCap, base.maxChunkSize)
return chunk.NewListInMemory(base.retFieldTypes, base.initCap, base.maxChunkSize)
}

// retTypes returns all output column types.
Expand Down Expand Up @@ -1115,7 +1115,7 @@ type TableScanExec struct {
iter kv.Iterator
columns []*model.ColumnInfo
isVirtualTable bool
virtualTableChunkList *chunk.List
virtualTableChunkList *chunk.ListInMemory
virtualTableChunkIdx int
}

Expand Down Expand Up @@ -1146,7 +1146,7 @@ func (e *TableScanExec) Next(ctx context.Context, req *chunk.Chunk) error {
func (e *TableScanExec) nextChunk4InfoSchema(ctx context.Context, chk *chunk.Chunk) error {
chk.GrowAndReset(e.maxChunkSize)
if e.virtualTableChunkList == nil {
e.virtualTableChunkList = chunk.NewList(retTypes(e), e.initCap, e.maxChunkSize)
e.virtualTableChunkList = chunk.NewListInMemory(retTypes(e), e.initCap, e.maxChunkSize)
columns := make([]*table.Column, e.schema.Len())
for i, colInfo := range e.columns {
columns[i] = table.ToColumn(colInfo)
Expand Down
11 changes: 7 additions & 4 deletions executor/hash_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ type hashRowContainer struct {
memTracker *memory.Tracker

// records stores the chunks in memory.
records *chunk.List
records *chunk.ListInMemory
// recordsInDisk stores the chunks in disk.
recordsInDisk *chunk.ListInDisk

Expand All @@ -117,7 +117,7 @@ func newHashRowContainer(sCtx sessionctx.Context, estCount int, hCtx *hashContex
if estCount < maxChunkSize*estCountMinFactor {
estCount = 0
}
initList := chunk.NewList(hCtx.allTypes, maxChunkSize, maxChunkSize)
initList := chunk.NewListInMemory(hCtx.allTypes, maxChunkSize, maxChunkSize)
c := &hashRowContainer{
sc: sCtx.GetSessionVars().StmtCtx,
hCtx: hCtx,
Expand Down Expand Up @@ -202,7 +202,10 @@ func (c *hashRowContainer) PutChunk(chk *chunk.Chunk) error {
}
} else {
chkIdx = uint32(c.records.NumChunks())
c.records.Add(chk)
err := c.records.Add(chk)
if err != nil {
return err
}
if atomic.LoadUint32(&c.exceeded) != 0 {
err := c.spillToDisk()
if err != nil {
Expand Down Expand Up @@ -269,7 +272,7 @@ func (c *hashRowContainer) ActionSpill() memory.ActionOnExceed {
return &spillDiskAction{c: c}
}

// spillDiskAction implements memory.ActionOnExceed for chunk.List. If
// spillDiskAction implements memory.ActionOnExceed for chunk.ListInMemory. If
// the memory quota of a query is exceeded, spillDiskAction.Action is
// triggered.
type spillDiskAction struct {
Expand Down
16 changes: 11 additions & 5 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ type innerCtx struct {
}

type lookUpJoinTask struct {
outerResult *chunk.List
outerResult *chunk.ListInMemory
outerMatch [][]bool

innerResult *chunk.List
innerResult *chunk.ListInMemory
encodedLookUpKeys []*chunk.Chunk
lookupMap *mvmap.MVMap
matchedInners []chunk.Row
Expand Down Expand Up @@ -392,7 +392,10 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) {
break
}

task.outerResult.Add(chk)
err = task.outerResult.Add(chk)
if err != nil {
return nil, err
}
}
if task.outerResult.Len() == 0 {
return nil, nil
Expand Down Expand Up @@ -601,7 +604,7 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa
return err
}
defer terror.Call(innerExec.Close)
innerResult := chunk.NewList(retTypes(innerExec), iw.ctx.GetSessionVars().MaxChunkSize, iw.ctx.GetSessionVars().MaxChunkSize)
innerResult := chunk.NewListInMemory(retTypes(innerExec), iw.ctx.GetSessionVars().MaxChunkSize, iw.ctx.GetSessionVars().MaxChunkSize)
innerResult.GetMemTracker().SetLabel(buildSideResultLabel)
innerResult.GetMemTracker().AttachTo(task.memTracker)
for {
Expand All @@ -617,7 +620,10 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa
if iw.executorChk.NumRows() == 0 {
break
}
innerResult.Add(iw.executorChk)
err = innerResult.Add(iw.executorChk)
if err != nil {
return err
}
iw.executorChk = newFirstChunk(innerExec)
}
task.innerResult = innerResult
Expand Down
9 changes: 6 additions & 3 deletions executor/index_lookup_merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ type innerMergeCtx struct {
}

type lookUpMergeJoinTask struct {
outerResult *chunk.List
outerResult *chunk.ListInMemory
outerOrderIdx []chunk.RowPtr

innerResult *chunk.Chunk
Expand Down Expand Up @@ -323,7 +323,7 @@ func (omw *outerMergeWorker) pushToChan(ctx context.Context, task *lookUpMergeJo
func (omw *outerMergeWorker) buildTask(ctx context.Context) (*lookUpMergeJoinTask, error) {
task := &lookUpMergeJoinTask{
results: make(chan *indexMergeJoinResult, numResChkHold),
outerResult: chunk.NewList(omw.rowTypes, omw.executor.base().initCap, omw.executor.base().maxChunkSize),
outerResult: chunk.NewListInMemory(omw.rowTypes, omw.executor.base().initCap, omw.executor.base().maxChunkSize),
}
task.memTracker = memory.NewTracker(stringutil.MemoizeStr(func() string { return fmt.Sprintf("lookup join task %p", task) }), -1)
task.memTracker.AttachTo(omw.parentMemTracker)
Expand All @@ -346,7 +346,10 @@ func (omw *outerMergeWorker) buildTask(ctx context.Context) (*lookUpMergeJoinTas
break
}

task.outerResult.Add(execChk)
err = task.outerResult.Add(execChk)
if err != nil {
return task, err
}
requiredRows -= execChk.NumRows()
task.memTracker.Consume(execChk.MemoryUsage())
}
Expand Down
6 changes: 3 additions & 3 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ type NestedLoopApplyExec struct {
outerChunk *chunk.Chunk
outerChunkCursor int
outerSelected []bool
innerList *chunk.List
innerList *chunk.ListInMemory
innerChunk *chunk.Chunk
innerSelected []bool
innerIter chunk.Iterator
Expand Down Expand Up @@ -586,7 +586,7 @@ func (e *NestedLoopApplyExec) Open(ctx context.Context) error {
e.innerRows = e.innerRows[:0]
e.outerChunk = newFirstChunk(e.outerExec)
e.innerChunk = newFirstChunk(e.innerExec)
e.innerList = chunk.NewList(retTypes(e.innerExec), e.initCap, e.maxChunkSize)
e.innerList = chunk.NewListInMemory(retTypes(e.innerExec), e.initCap, e.maxChunkSize)

e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaNestedLoopApply)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
Expand Down Expand Up @@ -628,7 +628,7 @@ func (e *NestedLoopApplyExec) fetchSelectedOuterRow(ctx context.Context, chk *ch
}
}

// fetchAllInners reads all data from the inner table and stores them in a List.
// fetchAllInners reads all data from the inner table and stores them in a ListInMemory.
func (e *NestedLoopApplyExec) fetchAllInners(ctx context.Context) error {
err := e.innerExec.Open(ctx)
defer terror.Call(e.innerExec.Close)
Expand Down
2 changes: 1 addition & 1 deletion executor/pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (s *pkgTestSuite) TestNestedLoopApply(c *C) {
innerFilter: []expression.Expression{innerFilter},
joiner: joiner,
}
join.innerList = chunk.NewList(retTypes(innerExec), innerExec.initCap, innerExec.maxChunkSize)
join.innerList = chunk.NewListInMemory(retTypes(innerExec), innerExec.initCap, innerExec.maxChunkSize)
join.innerChunk = newFirstChunk(innerExec)
join.outerChunk = newFirstChunk(outerExec)
joinChk := newFirstChunk(join)
Expand Down
18 changes: 12 additions & 6 deletions executor/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type SortExec struct {
// keyCmpFuncs is used to compare each ByItem.
keyCmpFuncs []chunk.CompareFunc
// rowChunks is the chunks to store row values.
rowChunks *chunk.List
rowChunks *chunk.ListInMemory
// rowPointer store the chunk index and row index for each row.
rowPtrs []chunk.RowPtr

Expand Down Expand Up @@ -95,7 +95,7 @@ func (e *SortExec) Next(ctx context.Context, req *chunk.Chunk) error {

func (e *SortExec) fetchRowChunks(ctx context.Context) error {
fields := retTypes(e)
e.rowChunks = chunk.NewList(fields, e.initCap, e.maxChunkSize)
e.rowChunks = chunk.NewListInMemory(fields, e.initCap, e.maxChunkSize)
e.rowChunks.GetMemTracker().AttachTo(e.memTracker)
e.rowChunks.GetMemTracker().SetLabel(rowChunksLabel)
for {
Expand All @@ -108,7 +108,10 @@ func (e *SortExec) fetchRowChunks(ctx context.Context) error {
if rowCount == 0 {
break
}
e.rowChunks.Add(chk)
err = e.rowChunks.Add(chk)
if err != nil {
return err
}
}
return nil
}
Expand Down Expand Up @@ -256,7 +259,7 @@ func (e *TopNExec) Next(ctx context.Context, req *chunk.Chunk) error {

func (e *TopNExec) loadChunksUntilTotalLimit(ctx context.Context) error {
e.chkHeap = &topNChunkHeap{e}
e.rowChunks = chunk.NewList(retTypes(e), e.initCap, e.maxChunkSize)
e.rowChunks = chunk.NewListInMemory(retTypes(e), e.initCap, e.maxChunkSize)
e.rowChunks.GetMemTracker().AttachTo(e.memTracker)
e.rowChunks.GetMemTracker().SetLabel(rowChunksLabel)
for uint64(e.rowChunks.Len()) < e.totalLimit {
Expand All @@ -270,7 +273,10 @@ func (e *TopNExec) loadChunksUntilTotalLimit(ctx context.Context) error {
if srcChk.NumRows() == 0 {
break
}
e.rowChunks.Add(srcChk)
err = e.rowChunks.Add(srcChk)
if err != nil {
return err
}
}
e.initPointers()
e.initCompareFuncs()
Expand Down Expand Up @@ -330,7 +336,7 @@ func (e *TopNExec) processChildChk(childRowChk *chunk.Chunk) error {
// but we want descending top N, then we will keep all data in memory.
// But if data is distributed randomly, this function will be called log(n) times.
func (e *TopNExec) doCompaction() error {
newRowChunks := chunk.NewList(retTypes(e), e.initCap, e.maxChunkSize)
newRowChunks := chunk.NewListInMemory(retTypes(e), e.initCap, e.maxChunkSize)
newRowPtrs := make([]chunk.RowPtr, 0, e.rowChunks.Len())
for _, rowPtr := range e.rowPtrs {
newRowPtr := newRowChunks.AppendRow(e.rowChunks.GetRow(rowPtr))
Expand Down
9 changes: 7 additions & 2 deletions util/chunk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func NewListInDisk(fieldTypes []*types.FieldType) *ListInDisk {
return l
}

// GetDiskTracker returns the memory tracker of this List.
// GetDiskTracker returns the memory tracker of this ListInMemory.
func (l *ListInDisk) GetDiskTracker() *disk.Tracker {
return l.diskTracker
}
Expand All @@ -92,7 +92,7 @@ func (l *ListInDisk) GetDiskTracker() *disk.Tracker {
// is not empty and not used any more and has the same field types.
func (l *ListInDisk) Add(chk *Chunk) (err error) {
if chk.NumRows() == 0 {
return errors.New("chunk appended to List should have at least 1 row")
return errors.New("chunk appended to ListInMemory should have at least 1 row")
}
if l.disk == nil {
l.disk, err = ioutil.TempFile(tmpDir, l.diskTracker.Label().String())
Expand Down Expand Up @@ -138,6 +138,11 @@ func (l *ListInDisk) NumChunks() int {
return len(l.offsets)
}

// NumRowsOfChunk returns the number of rows of a chunk in the ListInDisk.
func (l *ListInDisk) NumRowsOfChunk(chkID int) int {
return len(l.offsets[chkID])
}

// Close releases the disk resource.
func (l *ListInDisk) Close() error {
if l.disk != nil {
Expand Down
10 changes: 5 additions & 5 deletions util/chunk/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,13 @@ func (it *Iterator4Chunk) GetChunk() *Chunk {
return it.chk
}

// NewIterator4List returns a Iterator for List.
func NewIterator4List(li *List) Iterator {
// NewIterator4List returns a Iterator for ListInMemory.
func NewIterator4List(li *ListInMemory) Iterator {
return &iterator4List{li: li}
}

type iterator4List struct {
li *List
li *ListInMemory
chkCursor int
rowCursor int
}
Expand Down Expand Up @@ -232,12 +232,12 @@ func (it *iterator4List) Len() int {
}

// NewIterator4RowPtr returns a Iterator for RowPtrs.
func NewIterator4RowPtr(li *List, ptrs []RowPtr) Iterator {
func NewIterator4RowPtr(li *ListInMemory, ptrs []RowPtr) Iterator {
return &iterator4RowPtr{li: li, ptrs: ptrs}
}

type iterator4RowPtr struct {
li *List
li *ListInMemory
ptrs []RowPtr
cursor int
}
Expand Down
6 changes: 3 additions & 3 deletions util/chunk/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func (s *testChunkSuite) TestIterator(c *check.C) {
expected = append(expected, int64(i))
}
var rows []Row
li := NewList(fields, 1, 2)
li2 := NewList(fields, 8, 16)
li := NewListInMemory(fields, 1, 2)
li2 := NewListInMemory(fields, 8, 16)
var ptrs []RowPtr
var ptrs2 []RowPtr
for i := 0; i < n; i++ {
Expand Down Expand Up @@ -120,7 +120,7 @@ func (s *testChunkSuite) TestIterator(c *check.C) {
c.Assert(it.Begin(), check.Equals, it.End())
it = NewIterator4Chunk(new(Chunk))
c.Assert(it.Begin(), check.Equals, it.End())
it = NewIterator4List(new(List))
it = NewIterator4List(new(ListInMemory))
c.Assert(it.Begin(), check.Equals, it.End())
it = NewIterator4RowPtr(li, nil)
c.Assert(it.Begin(), check.Equals, it.End())
Expand Down
Loading

0 comments on commit 2b5f81a

Please sign in to comment.