diff --git a/executor/executor.go b/executor/executor.go index 96ea85726e68f..bef9955dbb685 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -891,7 +891,7 @@ func (e *SelectionExec) Next(ctx context.Context, chk *chunk.Chunk) error { if !e.selected[e.inputRow.Idx()] { continue } - if chk.NumRows() >= chk.Capacity() { + if chk.IsFull() { return nil } chk.AppendRow(e.inputRow) diff --git a/executor/executor_required_rows_test.go b/executor/executor_required_rows_test.go index 32e39e124f6e6..21badfcf1e9d7 100644 --- a/executor/executor_required_rows_test.go +++ b/executor/executor_required_rows_test.go @@ -18,9 +18,11 @@ import ( "fmt" "math" "math/rand" + "time" "github.com/cznic/mathutil" . "github.com/pingcap/check" + "github.com/pingcap/parser/ast" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/expression" plannercore "github.com/pingcap/tidb/planner/core" @@ -40,6 +42,15 @@ type requiredRowsDataSource struct { expectedRowsRet []int numNextCalled int + + generator func(valType *types.FieldType) interface{} +} + +func newRequiredRowsDataSourceWithGenerator(ctx sessionctx.Context, totalRows int, expectedRowsRet []int, + gen func(valType *types.FieldType) interface{}) *requiredRowsDataSource { + ds := newRequiredRowsDataSource(ctx, totalRows, expectedRowsRet) + ds.generator = gen + return ds } func newRequiredRowsDataSource(ctx sessionctx.Context, totalRows int, expectedRowsRet []int) *requiredRowsDataSource { @@ -51,7 +62,7 @@ func newRequiredRowsDataSource(ctx sessionctx.Context, totalRows int, expectedRo } schema := expression.NewSchema(cols...) 
baseExec := newBaseExecutor(ctx, schema, "") - return &requiredRowsDataSource{baseExec, totalRows, 0, ctx, expectedRowsRet, 0} + return &requiredRowsDataSource{baseExec, totalRows, 0, ctx, expectedRowsRet, 0, defaultGenerator} } func (r *requiredRowsDataSource) Next(ctx context.Context, req *chunk.Chunk) error { @@ -79,12 +90,12 @@ func (r *requiredRowsDataSource) Next(ctx context.Context, req *chunk.Chunk) err func (r *requiredRowsDataSource) genOneRow() chunk.Row { row := chunk.MutRowFromTypes(r.retTypes()) for i := range r.retTypes() { - row.SetValue(i, r.genValue(r.retTypes()[i])) + row.SetValue(i, r.generator(r.retTypes()[i])) } return row.ToRow() } -func (r *requiredRowsDataSource) genValue(valType *types.FieldType) interface{} { +func defaultGenerator(valType *types.FieldType) interface{} { switch valType.Tp { case mysql.TypeLong, mysql.TypeLonglong: return int64(rand.Int()) @@ -167,6 +178,7 @@ func (s *testExecSuite) TestLimitRequiredRows(c *C) { c.Assert(exec.Next(ctx, chk), IsNil) c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i]) } + c.Assert(exec.Close(), IsNil) c.Assert(ds.checkNumNextCalled(), IsNil) } } @@ -248,6 +260,7 @@ func (s *testExecSuite) TestSortRequiredRows(c *C) { c.Assert(exec.Next(ctx, chk), IsNil) c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i]) } + c.Assert(exec.Close(), IsNil) c.Assert(ds.checkNumNextCalled(), IsNil) } } @@ -354,6 +367,7 @@ func (s *testExecSuite) TestTopNRequiredRows(c *C) { c.Assert(exec.Next(ctx, chk), IsNil) c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i]) } + c.Assert(exec.Close(), IsNil) c.Assert(ds.checkNumNextCalled(), IsNil) } } @@ -369,3 +383,209 @@ func buildTopNExec(ctx sessionctx.Context, offset, count int, byItems []*planner limit: &plannercore.PhysicalLimit{Count: uint64(count), Offset: uint64(offset)}, } } + +func (s *testExecSuite) TestSelectionRequiredRows(c *C) { + gen01 := func() func(valType *types.FieldType) interface{} { + closureCount := 0 + return func(valType 
*types.FieldType) interface{} { + switch valType.Tp { + case mysql.TypeLong, mysql.TypeLonglong: + ret := int64(closureCount % 2) + closureCount++ + return ret + case mysql.TypeDouble: + return rand.Float64() + default: + panic("not implement") + } + } + } + + maxChunkSize := defaultCtx().GetSessionVars().MaxChunkSize + testCases := []struct { + totalRows int + filtersOfCol1 int + requiredRows []int + expectedRows []int + expectedRowsDS []int + gen func(valType *types.FieldType) interface{} + }{ + { + totalRows: 20, + requiredRows: []int{1, 2, 3, 4, 5, 20}, + expectedRows: []int{1, 2, 3, 4, 5, 5}, + expectedRowsDS: []int{20, 0}, + }, + { + totalRows: 20, + filtersOfCol1: 0, + requiredRows: []int{1, 3, 5, 7, 9}, + expectedRows: []int{1, 3, 5, 1, 0}, + expectedRowsDS: []int{20, 0, 0}, + gen: gen01(), + }, + { + totalRows: maxChunkSize + 20, + filtersOfCol1: 1, + requiredRows: []int{1, 3, 5, maxChunkSize}, + expectedRows: []int{1, 3, 5, maxChunkSize/2 - 1 - 3 - 5 + 10}, + expectedRowsDS: []int{maxChunkSize, 20, 0}, + gen: gen01(), + }, + } + + for _, testCase := range testCases { + sctx := defaultCtx() + ctx := context.Background() + var filters []expression.Expression + var ds *requiredRowsDataSource + if testCase.gen == nil { + // ignore filters + ds = newRequiredRowsDataSource(sctx, testCase.totalRows, testCase.expectedRowsDS) + } else { + ds = newRequiredRowsDataSourceWithGenerator(sctx, testCase.totalRows, testCase.expectedRowsDS, testCase.gen) + f, err := expression.NewFunction( + sctx, ast.EQ, types.NewFieldType(byte(types.ETInt)), ds.Schema().Columns[1], &expression.Constant{ + Value: types.NewDatum(testCase.filtersOfCol1), + RetType: types.NewFieldType(mysql.TypeTiny), + }) + c.Assert(err, IsNil) + filters = append(filters, f) + } + exec := buildSelectionExec(sctx, filters, ds) + c.Assert(exec.Open(ctx), IsNil) + chk := exec.newFirstChunk() + for i := range testCase.requiredRows { + chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize) + 
c.Assert(exec.Next(ctx, chk), IsNil) + c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i]) + } + c.Assert(exec.Close(), IsNil) + c.Assert(ds.checkNumNextCalled(), IsNil) + } +} + +func buildSelectionExec(ctx sessionctx.Context, filters []expression.Expression, src Executor) Executor { + return &SelectionExec{ + baseExecutor: newBaseExecutor(ctx, src.Schema(), "", src), + filters: filters, + } +} + +func (s *testExecSuite) TestProjectionUnparallelRequiredRows(c *C) { + maxChunkSize := defaultCtx().GetSessionVars().MaxChunkSize + testCases := []struct { + totalRows int + requiredRows []int + expectedRows []int + expectedRowsDS []int + }{ + { + totalRows: 20, + requiredRows: []int{1, 3, 5, 7, 9}, + expectedRows: []int{1, 3, 5, 7, 4}, + expectedRowsDS: []int{1, 3, 5, 7, 4}, + }, + { + totalRows: maxChunkSize + 10, + requiredRows: []int{1, 3, 5, 7, 9, maxChunkSize}, + expectedRows: []int{1, 3, 5, 7, 9, maxChunkSize - 1 - 3 - 5 - 7 - 9 + 10}, + expectedRowsDS: []int{1, 3, 5, 7, 9, maxChunkSize - 1 - 3 - 5 - 7 - 9 + 10}, + }, + { + totalRows: maxChunkSize*2 + 10, + requiredRows: []int{1, 7, 9, maxChunkSize, maxChunkSize + 10}, + expectedRows: []int{1, 7, 9, maxChunkSize, maxChunkSize + 10 - 1 - 7 - 9}, + expectedRowsDS: []int{1, 7, 9, maxChunkSize, maxChunkSize + 10 - 1 - 7 - 9}, + }, + } + + for _, testCase := range testCases { + sctx := defaultCtx() + ctx := context.Background() + ds := newRequiredRowsDataSource(sctx, testCase.totalRows, testCase.expectedRowsDS) + exprs := make([]expression.Expression, 0, len(ds.Schema().Columns)) + if len(exprs) == 0 { + for _, col := range ds.Schema().Columns { + exprs = append(exprs, col) + } + } + exec := buildProjectionExec(sctx, exprs, ds, 0) + c.Assert(exec.Open(ctx), IsNil) + chk := exec.newFirstChunk() + for i := range testCase.requiredRows { + chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize) + c.Assert(exec.Next(ctx, chk), IsNil) + c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i]) + } + 
c.Assert(exec.Close(), IsNil) + c.Assert(ds.checkNumNextCalled(), IsNil) + } +} + +func (s *testExecSuite) TestProjectionParallelRequiredRows(c *C) { + maxChunkSize := defaultCtx().GetSessionVars().MaxChunkSize + testCases := []struct { + totalRows int + numWorkers int + requiredRows []int + expectedRows []int + expectedRowsDS []int + }{ + { + totalRows: 20, + numWorkers: 1, + requiredRows: []int{1, 2, 3, 4, 5, 6, 1, 1}, + expectedRows: []int{1, 1, 2, 3, 4, 5, 4, 0}, + expectedRowsDS: []int{1, 1, 2, 3, 4, 5, 4, 0}, + }, + { + totalRows: maxChunkSize * 2, + numWorkers: 1, + requiredRows: []int{7, maxChunkSize, maxChunkSize, maxChunkSize}, + expectedRows: []int{7, 7, maxChunkSize, maxChunkSize - 14}, + expectedRowsDS: []int{7, 7, maxChunkSize, maxChunkSize - 14, 0}, + }, + { + totalRows: 20, + numWorkers: 2, + requiredRows: []int{1, 2, 3, 4, 5, 6, 1, 1, 1}, + expectedRows: []int{1, 1, 1, 2, 3, 4, 5, 3, 0}, + expectedRowsDS: []int{1, 1, 1, 2, 3, 4, 5, 3, 0}, + }, + } + + for _, testCase := range testCases { + sctx := defaultCtx() + ctx := context.Background() + ds := newRequiredRowsDataSource(sctx, testCase.totalRows, testCase.expectedRowsDS) + exprs := make([]expression.Expression, 0, len(ds.Schema().Columns)) + if len(exprs) == 0 { + for _, col := range ds.Schema().Columns { + exprs = append(exprs, col) + } + } + exec := buildProjectionExec(sctx, exprs, ds, testCase.numWorkers) + c.Assert(exec.Open(ctx), IsNil) + chk := exec.newFirstChunk() + for i := range testCase.requiredRows { + chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize) + c.Assert(exec.Next(ctx, chk), IsNil) + c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i]) + + // wait for projectionInputFetcher to be blocked on fetching data + // from child in the background. 
+ time.Sleep(time.Millisecond * 5) + } + c.Assert(exec.Close(), IsNil) + c.Assert(ds.checkNumNextCalled(), IsNil) + } +} + +func buildProjectionExec(ctx sessionctx.Context, exprs []expression.Expression, src Executor, numWorkers int) Executor { + return &ProjectionExec{ + baseExecutor: newBaseExecutor(ctx, src.Schema(), "", src), + numWorkers: int64(numWorkers), + evaluatorSuit: expression.NewEvaluatorSuite(exprs, false), + } +} diff --git a/executor/projection.go b/executor/projection.go index 2a69b30e3d6e6..52e0b0d771651 100644 --- a/executor/projection.go +++ b/executor/projection.go @@ -16,6 +16,7 @@ package executor import ( "context" "fmt" + "sync/atomic" "time" "github.com/pingcap/errors" @@ -63,6 +64,13 @@ type ProjectionExec struct { numWorkers int64 workers []*projectionWorker childResult *chunk.Chunk + + // parentReqRows indicates how many rows the parent executor is + // requiring. It is set when parallelExecute() is called and used by the + // concurrent projectionInputFetcher. + // + // NOTE: It should be protected by atomic operations. + parentReqRows int64 } // Open implements the Executor Open interface. 
@@ -72,6 +80,7 @@ func (e *ProjectionExec) Open(ctx context.Context) error { } e.prepared = false + e.parentReqRows = int64(e.maxChunkSize) // For now a Projection can not be executed vectorially only because it // contains "SetVar" or "GetVar" functions, in this scenario this @@ -162,6 +171,8 @@ func (e *ProjectionExec) isUnparallelExec() bool { } func (e *ProjectionExec) unParallelExecute(ctx context.Context, chk *chunk.Chunk) error { + // push requiredRows down + e.childResult.SetRequiredRows(chk.RequiredRows(), e.maxChunkSize) err := e.children[0].Next(ctx, e.childResult) if err != nil { return errors.Trace(err) @@ -171,6 +182,7 @@ func (e *ProjectionExec) unParallelExecute(ctx context.Context, chk *chunk.Chunk } func (e *ProjectionExec) parallelExecute(ctx context.Context, chk *chunk.Chunk) error { + atomic.StoreInt64(&e.parentReqRows, int64(chk.RequiredRows())) if !e.prepared { e.prepare(ctx) e.prepared = true @@ -197,6 +209,7 @@ func (e *ProjectionExec) prepare(ctx context.Context) { // Initialize projectionInputFetcher. e.fetcher = projectionInputFetcher{ + proj: e, child: e.children[0], globalFinishCh: e.finishCh, globalOutputCh: e.outputCh, @@ -249,6 +262,7 @@ func (e *ProjectionExec) Close() error { } type projectionInputFetcher struct { + proj *ProjectionExec child Executor globalFinishCh <-chan struct{} globalOutputCh chan<- *projectionOutput @@ -290,6 +304,8 @@ func (f *projectionInputFetcher) run(ctx context.Context) { f.globalOutputCh <- output + requiredRows := atomic.LoadInt64(&f.proj.parentReqRows) + input.chk.SetRequiredRows(int(requiredRows), f.proj.maxChunkSize) err := f.child.Next(ctx, input.chk) if err != nil || input.chk.NumRows() == 0 { output.done <- errors.Trace(err)