executor: control Chunk size for Selection&Projection (#10110)
qw4990 authored and ngaut committed Apr 11, 2019
1 parent 998cbf6 commit e5cfa51
Showing 3 changed files with 240 additions and 4 deletions.
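The change relies on the chunk required-rows protocol: before calling Next, a parent executor records how many rows it actually needs on the result Chunk, and the child stops filling the chunk once that target is reached. Below is a minimal sketch of the calling side; newFirstChunk, SetRequiredRows, and IsFull appear in the diff, while the fetchLimited helper itself is illustrative and not part of the commit.

// fetchLimited is an illustrative helper, not part of this commit: it asks
// a child executor for at most `required` rows in a single Next call by
// setting the required-rows hint on the chunk before handing it over.
func fetchLimited(ctx context.Context, child Executor, required, maxChunkSize int) (*chunk.Chunk, error) {
	chk := child.newFirstChunk()
	// The child appends rows until chk.IsFull() reports the target is
	// reached, then returns early instead of filling to full capacity.
	chk.SetRequiredRows(required, maxChunkSize)
	if err := child.Next(ctx, chk); err != nil {
		return nil, err
	}
	return chk, nil
}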
2 changes: 1 addition & 1 deletion executor/executor.go
@@ -891,7 +891,7 @@ func (e *SelectionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if !e.selected[e.inputRow.Idx()] {
continue
}
- if chk.NumRows() >= chk.Capacity() {
+ if chk.IsFull() {
return nil
}
chk.AppendRow(e.inputRow)
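For context, chk.IsFull() replaces the manual capacity comparison above. A plausible definition on the chunk side, assuming SetRequiredRows stores the target on the Chunk (a sketch; the actual chunk-package change is not part of this diff):

// Sketch of the chunk-side API this hunk relies on. Assumption: a
// requiredRows field is set by SetRequiredRows and capped at the
// maximum chunk size.
func (c *Chunk) IsFull() bool {
	return c.NumRows() >= c.requiredRows
}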
226 changes: 223 additions & 3 deletions executor/executor_required_rows_test.go
@@ -18,9 +18,11 @@ import (
"fmt"
"math"
"math/rand"
"time"

"github.com/cznic/mathutil"
. "github.com/pingcap/check"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
plannercore "github.com/pingcap/tidb/planner/core"
@@ -40,6 +42,15 @@ type requiredRowsDataSource struct {

expectedRowsRet []int
numNextCalled int

// generator produces one value of the given field type per call; tests
// can plug in a custom generator to control the data distribution.
generator func(valType *types.FieldType) interface{}
}

func newRequiredRowsDataSourceWithGenerator(ctx sessionctx.Context, totalRows int, expectedRowsRet []int,
gen func(valType *types.FieldType) interface{}) *requiredRowsDataSource {
ds := newRequiredRowsDataSource(ctx, totalRows, expectedRowsRet)
ds.generator = gen
return ds
}

func newRequiredRowsDataSource(ctx sessionctx.Context, totalRows int, expectedRowsRet []int) *requiredRowsDataSource {
@@ -51,7 +62,7 @@ func newRequiredRowsDataSource(ctx sessionctx.Context, totalRows int, expectedRo
}
schema := expression.NewSchema(cols...)
baseExec := newBaseExecutor(ctx, schema, "")
- return &requiredRowsDataSource{baseExec, totalRows, 0, ctx, expectedRowsRet, 0}
+ return &requiredRowsDataSource{baseExec, totalRows, 0, ctx, expectedRowsRet, 0, defaultGenerator}
}

func (r *requiredRowsDataSource) Next(ctx context.Context, req *chunk.Chunk) error {
@@ -79,12 +90,12 @@ func (r *requiredRowsDataSource) Next(ctx context.Context, req *chunk.Chunk) err
func (r *requiredRowsDataSource) genOneRow() chunk.Row {
row := chunk.MutRowFromTypes(r.retTypes())
for i := range r.retTypes() {
- row.SetValue(i, r.genValue(r.retTypes()[i]))
+ row.SetValue(i, r.generator(r.retTypes()[i]))
}
return row.ToRow()
}

- func (r *requiredRowsDataSource) genValue(valType *types.FieldType) interface{} {
+ func defaultGenerator(valType *types.FieldType) interface{} {
switch valType.Tp {
case mysql.TypeLong, mysql.TypeLonglong:
return int64(rand.Int())
@@ -167,6 +178,7 @@ func (s *testExecSuite) TestLimitRequiredRows(c *C) {
c.Assert(exec.Next(ctx, chk), IsNil)
c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i])
}
c.Assert(exec.Close(), IsNil)
c.Assert(ds.checkNumNextCalled(), IsNil)
}
}
@@ -248,6 +260,7 @@ func (s *testExecSuite) TestSortRequiredRows(c *C) {
c.Assert(exec.Next(ctx, chk), IsNil)
c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i])
}
c.Assert(exec.Close(), IsNil)
c.Assert(ds.checkNumNextCalled(), IsNil)
}
}
@@ -354,6 +367,7 @@ func (s *testExecSuite) TestTopNRequiredRows(c *C) {
c.Assert(exec.Next(ctx, chk), IsNil)
c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i])
}
c.Assert(exec.Close(), IsNil)
c.Assert(ds.checkNumNextCalled(), IsNil)
}
}
@@ -369,3 +383,209 @@ func buildTopNExec(ctx sessionctx.Context, offset, count int, byItems []*planner
limit: &plannercore.PhysicalLimit{Count: uint64(count), Offset: uint64(offset)},
}
}

func (s *testExecSuite) TestSelectionRequiredRows(c *C) {
gen01 := func() func(valType *types.FieldType) interface{} {
closureCount := 0
return func(valType *types.FieldType) interface{} {
switch valType.Tp {
case mysql.TypeLong, mysql.TypeLonglong:
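// alternate 0, 1, 0, 1, ... so that exactly half of the generated
// rows match an equality filter against 0 or 1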
ret := int64(closureCount % 2)
closureCount++
return ret
case mysql.TypeDouble:
return rand.Float64()
default:
panic("not implement")
}
}
}

maxChunkSize := defaultCtx().GetSessionVars().MaxChunkSize
testCases := []struct {
totalRows int
filtersOfCol1 int
requiredRows []int
expectedRows []int
expectedRowsDS []int
gen func(valType *types.FieldType) interface{}
}{
{
totalRows: 20,
requiredRows: []int{1, 2, 3, 4, 5, 20},
expectedRows: []int{1, 2, 3, 4, 5, 5},
expectedRowsDS: []int{20, 0},
},
{
totalRows: 20,
filtersOfCol1: 0,
requiredRows: []int{1, 3, 5, 7, 9},
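// gen01 starts at 0, so 10 of the 20 rows match col1 = 0: after
// 1+3+5 = 9 rows are returned, only 1 row remains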
expectedRows: []int{1, 3, 5, 1, 0},
expectedRowsDS: []int{20, 0, 0},
gen: gen01(),
},
{
totalRows: maxChunkSize + 20,
filtersOfCol1: 1,
requiredRows: []int{1, 3, 5, maxChunkSize},
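// (maxChunkSize+20)/2 = maxChunkSize/2+10 rows match col1 = 1;
// after 1+3+5 = 9 rows, maxChunkSize/2+1 rows remain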
expectedRows: []int{1, 3, 5, maxChunkSize/2 - 1 - 3 - 5 + 10},
expectedRowsDS: []int{maxChunkSize, 20, 0},
gen: gen01(),
},
}

for _, testCase := range testCases {
sctx := defaultCtx()
ctx := context.Background()
var filters []expression.Expression
var ds *requiredRowsDataSource
if testCase.gen == nil {
// no filters in this case
ds = newRequiredRowsDataSource(sctx, testCase.totalRows, testCase.expectedRowsDS)
} else {
ds = newRequiredRowsDataSourceWithGenerator(sctx, testCase.totalRows, testCase.expectedRowsDS, testCase.gen)
f, err := expression.NewFunction(
sctx, ast.EQ, types.NewFieldType(byte(types.ETInt)), ds.Schema().Columns[1], &expression.Constant{
Value: types.NewDatum(testCase.filtersOfCol1),
RetType: types.NewFieldType(mysql.TypeTiny),
})
c.Assert(err, IsNil)
filters = append(filters, f)
}
exec := buildSelectionExec(sctx, filters, ds)
c.Assert(exec.Open(ctx), IsNil)
chk := exec.newFirstChunk()
for i := range testCase.requiredRows {
chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize)
c.Assert(exec.Next(ctx, chk), IsNil)
c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i])
}
c.Assert(exec.Close(), IsNil)
c.Assert(ds.checkNumNextCalled(), IsNil)
}
}

func buildSelectionExec(ctx sessionctx.Context, filters []expression.Expression, src Executor) Executor {
return &SelectionExec{
baseExecutor: newBaseExecutor(ctx, src.Schema(), "", src),
filters: filters,
}
}

func (s *testExecSuite) TestProjectionUnparallelRequiredRows(c *C) {
maxChunkSize := defaultCtx().GetSessionVars().MaxChunkSize
testCases := []struct {
totalRows int
requiredRows []int
expectedRows []int
expectedRowsDS []int
}{
{
totalRows: 20,
requiredRows: []int{1, 3, 5, 7, 9},
expectedRows: []int{1, 3, 5, 7, 4},
expectedRowsDS: []int{1, 3, 5, 7, 4},
},
{
totalRows: maxChunkSize + 10,
requiredRows: []int{1, 3, 5, 7, 9, maxChunkSize},
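// totalRows = maxChunkSize+10; after 1+3+5+7+9 = 25 rows,
// maxChunkSize-15 rows remain for the final call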
expectedRows: []int{1, 3, 5, 7, 9, maxChunkSize - 1 - 3 - 5 - 7 - 9 + 10},
expectedRowsDS: []int{1, 3, 5, 7, 9, maxChunkSize - 1 - 3 - 5 - 7 - 9 + 10},
},
{
totalRows: maxChunkSize*2 + 10,
requiredRows: []int{1, 7, 9, maxChunkSize, maxChunkSize + 10},
expectedRows: []int{1, 7, 9, maxChunkSize, maxChunkSize + 10 - 1 - 7 - 9},
expectedRowsDS: []int{1, 7, 9, maxChunkSize, maxChunkSize + 10 - 1 - 7 - 9},
},
}

for _, testCase := range testCases {
sctx := defaultCtx()
ctx := context.Background()
ds := newRequiredRowsDataSource(sctx, testCase.totalRows, testCase.expectedRowsDS)
exprs := make([]expression.Expression, 0, len(ds.Schema().Columns))
if len(exprs) == 0 {
for _, col := range ds.Schema().Columns {
exprs = append(exprs, col)
}
}
exec := buildProjectionExec(sctx, exprs, ds, 0)
c.Assert(exec.Open(ctx), IsNil)
chk := exec.newFirstChunk()
for i := range testCase.requiredRows {
chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize)
c.Assert(exec.Next(ctx, chk), IsNil)
c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i])
}
c.Assert(exec.Close(), IsNil)
c.Assert(ds.checkNumNextCalled(), IsNil)
}
}

func (s *testExecSuite) TestProjectionParallelRequiredRows(c *C) {
maxChunkSize := defaultCtx().GetSessionVars().MaxChunkSize
testCases := []struct {
totalRows int
numWorkers int
requiredRows []int
expectedRows []int
expectedRowsDS []int
}{
{
totalRows: 20,
numWorkers: 1,
requiredRows: []int{1, 2, 3, 4, 5, 6, 1, 1},
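// the parallel pipeline prefetches with the previous call's
// required-rows hint, so row counts lag one Next call behind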
expectedRows: []int{1, 1, 2, 3, 4, 5, 4, 0},
expectedRowsDS: []int{1, 1, 2, 3, 4, 5, 4, 0},
},
{
totalRows: maxChunkSize * 2,
numWorkers: 1,
requiredRows: []int{7, maxChunkSize, maxChunkSize, maxChunkSize},
expectedRows: []int{7, 7, maxChunkSize, maxChunkSize - 14},
expectedRowsDS: []int{7, 7, maxChunkSize, maxChunkSize - 14, 0},
},
{
totalRows: 20,
numWorkers: 2,
requiredRows: []int{1, 2, 3, 4, 5, 6, 1, 1, 1},
expectedRows: []int{1, 1, 1, 2, 3, 4, 5, 3, 0},
expectedRowsDS: []int{1, 1, 1, 2, 3, 4, 5, 3, 0},
},
}

for _, testCase := range testCases {
sctx := defaultCtx()
ctx := context.Background()
ds := newRequiredRowsDataSource(sctx, testCase.totalRows, testCase.expectedRowsDS)
exprs := make([]expression.Expression, 0, len(ds.Schema().Columns))
if len(exprs) == 0 {
for _, col := range ds.Schema().Columns {
exprs = append(exprs, col)
}
}
exec := buildProjectionExec(sctx, exprs, ds, testCase.numWorkers)
c.Assert(exec.Open(ctx), IsNil)
chk := exec.newFirstChunk()
for i := range testCase.requiredRows {
chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize)
c.Assert(exec.Next(ctx, chk), IsNil)
c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i])

// wait until projectionInputFetcher is blocked on fetching data
// from the child in the background.
time.Sleep(time.Millisecond * 5)
}
c.Assert(exec.Close(), IsNil)
c.Assert(ds.checkNumNextCalled(), IsNil)
}
}

func buildProjectionExec(ctx sessionctx.Context, exprs []expression.Expression, src Executor, numWorkers int) Executor {
return &ProjectionExec{
baseExecutor: newBaseExecutor(ctx, src.Schema(), "", src),
numWorkers: int64(numWorkers),
evaluatorSuit: expression.NewEvaluatorSuite(exprs, false),
}
}
16 changes: 16 additions & 0 deletions executor/projection.go
@@ -16,6 +16,7 @@ package executor
import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/pingcap/errors"
@@ -63,6 +64,13 @@ type ProjectionExec struct {
numWorkers int64
workers []*projectionWorker
childResult *chunk.Chunk

// parentReqRows indicates how many rows the parent executor is
// requiring. It is set when parallelExecute() is called and used by the
// concurrent projectionInputFetcher.
//
// NOTE: It should be protected by atomic operations.
parentReqRows int64
}

// Open implements the Executor Open interface.
@@ -72,6 +80,7 @@ func (e *ProjectionExec) Open(ctx context.Context) error {
}

e.prepared = false
e.parentReqRows = int64(e.maxChunkSize)

// For now a Projection can not be executed vectorially only because it
// contains "SetVar" or "GetVar" functions, in this scenario this
@@ -162,6 +171,8 @@ func (e *ProjectionExec) isUnparallelExec() bool {
}

func (e *ProjectionExec) unParallelExecute(ctx context.Context, chk *chunk.Chunk) error {
// push requiredRows down
e.childResult.SetRequiredRows(chk.RequiredRows(), e.maxChunkSize)
err := e.children[0].Next(ctx, e.childResult)
if err != nil {
return errors.Trace(err)
@@ -171,6 +182,7 @@ func (e *ProjectionExec) unParallelExecute(ctx context.Context, chk *chunk.Chunk
}

func (e *ProjectionExec) parallelExecute(ctx context.Context, chk *chunk.Chunk) error {
atomic.StoreInt64(&e.parentReqRows, int64(chk.RequiredRows()))
if !e.prepared {
e.prepare(ctx)
e.prepared = true
@@ -197,6 +209,7 @@ func (e *ProjectionExec) prepare(ctx context.Context) {

// Initialize projectionInputFetcher.
e.fetcher = projectionInputFetcher{
proj: e,
child: e.children[0],
globalFinishCh: e.finishCh,
globalOutputCh: e.outputCh,
@@ -249,6 +262,7 @@ func (e *ProjectionExec) Close() error {
}

type projectionInputFetcher struct {
proj *ProjectionExec
child Executor
globalFinishCh <-chan struct{}
globalOutputCh chan<- *projectionOutput
@@ -290,6 +304,8 @@ func (f *projectionInputFetcher) run(ctx context.Context) {

f.globalOutputCh <- output

requiredRows := atomic.LoadInt64(&f.proj.parentReqRows)
input.chk.SetRequiredRows(int(requiredRows), f.proj.maxChunkSize)
err := f.child.Next(ctx, input.chk)
if err != nil || input.chk.NumRows() == 0 {
output.done <- errors.Trace(err)