Merge branch 'master' of github.com:pingcap/tidb into values
alivxxx committed Mar 6, 2019
2 parents 48fb31c + 2b646cb commit ffe851a
Showing 22 changed files with 351 additions and 93 deletions.
2 changes: 1 addition & 1 deletion cmd/explaintest/r/window_function.result
@@ -17,7 +17,7 @@ Projection_7 10000.00 root sum(a) over(partition by a)
 explain select sum(a) over(partition by a order by b) from t;
 id count task operator info
 Projection_7 10000.00 root sum(a) over(partition by a order by b)
-└─Window_8 10000.00 root sum(cast(test.t.a)) over(partition by test.t.a order by test.t.b asc)
+└─Window_8 10000.00 root sum(cast(test.t.a)) over(partition by test.t.a order by test.t.b asc range between unbounded preceding and current row)
 └─Sort_12 10000.00 root test.t.a:asc, test.t.b:asc
 └─TableReader_11 10000.00 root data:TableScan_10
 └─TableScan_10 10000.00 cop table:t, range:[-inf,+inf], keep order:false, stats:pseudo
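The only behavioral content in this hunk is the now-explicit frame clause: under the SQL standard, a window with an ORDER BY but no frame specification defaults to RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, and the plan output now prints that default. As a hedged illustration in the testkit style used elsewhere in this diff (table t with columns a and b assumed), the implicit and explicit forms should produce identical plans:

```go
// Sketch, not part of the commit: both plans should print the same
// Window operator line under the SQL-standard default frame.
r1 := tk.MustQuery("explain select sum(a) over(partition by a order by b) from t")
r2 := tk.MustQuery("explain select sum(a) over(partition by a order by b" +
	" range between unbounded preceding and current row) from t")
c.Assert(r1.Rows(), DeepEquals, r2.Rows())
```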
43 changes: 19 additions & 24 deletions executor/aggregate.go
@@ -18,7 +18,7 @@ import (
 	"sync"
 	"time"
 
-	opentracing "github.com/opentracing/opentracing-go"
+	"github.com/opentracing/opentracing-go"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/parser/mysql"
 	"github.com/pingcap/tidb/executor/aggfuncs"
@@ -87,8 +87,6 @@ type HashAggFinalWorker struct {
 type AfFinalResult struct {
 	chk *chunk.Chunk
 	err error
-
-	giveBackCh chan *chunk.Chunk
 }
 
 // HashAggExec deals with all the aggregate functions.
@@ -153,6 +151,7 @@ type HashAggExec struct {
 
 	finishCh         chan struct{}
 	finalOutputCh    chan *AfFinalResult
+	finalInputCh     chan *chunk.Chunk
 	partialOutputChs []chan *HashAggIntermData
 	inputCh          chan *HashAggInput
 	partialInputChs  []chan *chunk.Chunk
@@ -248,6 +247,7 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
 	partialConcurrency := sessionVars.HashAggPartialConcurrency
 	e.isChildReturnEmpty = true
 	e.finalOutputCh = make(chan *AfFinalResult, finalConcurrency)
+	e.finalInputCh = make(chan *chunk.Chunk, finalConcurrency)
 	e.inputCh = make(chan *HashAggInput, partialConcurrency)
 	e.finishCh = make(chan struct{}, 1)
 
@@ -292,11 +292,10 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
 			groupSet:            set.NewStringSet(),
 			inputCh:             e.partialOutputChs[i],
 			outputCh:            e.finalOutputCh,
-			finalResultHolderCh: make(chan *chunk.Chunk, 1),
+			finalResultHolderCh: e.finalInputCh,
 			rowBuffer:           make([]types.Datum, 0, e.Schema().Len()),
 			mutableRow:          chunk.MutRowFromTypes(e.retTypes()),
 		}
-		e.finalWorkers[i].finalResultHolderCh <- e.newFirstChunk()
 	}
 }
 
@@ -470,7 +469,6 @@ func (w *HashAggFinalWorker) getFinalResult(sctx sessionctx.Context) {
 	if finished {
 		return
 	}
-	result.Reset()
 	for groupKey := range w.groupSet {
 		partialResults := w.getPartialResult(sctx.GetSessionVars().StmtCtx, []byte(groupKey), w.partialResultMap)
 		for i, af := range w.aggFuncs {
@@ -481,18 +479,15 @@ func (w *HashAggFinalWorker) getFinalResult(sctx sessionctx.Context) {
 		if len(w.aggFuncs) == 0 {
 			result.SetNumVirtualRows(result.NumRows() + 1)
 		}
-		if result.NumRows() == w.maxChunkSize {
-			w.outputCh <- &AfFinalResult{chk: result, giveBackCh: w.finalResultHolderCh}
+		if result.IsFull() {
+			w.outputCh <- &AfFinalResult{chk: result}
 			result, finished = w.receiveFinalResultHolder()
 			if finished {
 				return
 			}
-			result.Reset()
 		}
 	}
-	if result.NumRows() > 0 {
-		w.outputCh <- &AfFinalResult{chk: result, giveBackCh: w.finalResultHolderCh}
-	}
+	w.outputCh <- &AfFinalResult{chk: result}
 }
 
 func (w *HashAggFinalWorker) receiveFinalResultHolder() (*chunk.Chunk, bool) {
@@ -617,24 +612,24 @@ func (e *HashAggExec) parallelExec(ctx context.Context, chk *chunk.Chunk) error
 	//	return errors.New("HashAggExec.parallelExec error")
 	// }
 
-	for {
+	for !chk.IsFull() {
+		e.finalInputCh <- chk
 		result, ok := <-e.finalOutputCh
-		if !ok || result.err != nil || result.chk.NumRows() == 0 {
-			if result != nil {
-				return errors.Trace(result.err)
+		if !ok { // all finalWorkers exited
+			if chk.NumRows() > 0 { // but there are some data left
+				return nil
 			}
 			if e.isChildReturnEmpty && e.defaultVal != nil {
 				chk.Append(e.defaultVal, 0, 1)
 			}
 			e.isChildReturnEmpty = false
 			return nil
 		}
-		e.isChildReturnEmpty = false
-		chk.SwapColumns(result.chk)
-		// Put result.chk back to the corresponded final worker's finalResultHolderCh.
-		result.giveBackCh <- result.chk
+		if result.err != nil {
+			return result.err
+		}
 		if chk.NumRows() > 0 {
-			break
+			e.isChildReturnEmpty = false
 		}
 	}
 	return nil
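Taken together with the getFinalResult change above, this hunk inverts the old give-back scheme: instead of each final worker owning a private result holder and handing it to the caller via giveBackCh, the caller now owns the result chunk, lends it to the workers through the shared finalInputCh, and receives it back (filled in place) through finalOutputCh until the chunk satisfies its required rows or every worker has exited. A minimal, self-contained sketch of this lend/return pattern under simplified types (not the executor's actual code):

```go
package main

import "fmt"

// afFinalResult mirrors the trimmed-down AfFinalResult: a chunk plus an error.
type afFinalResult struct {
	chk []int
	err error
}

func main() {
	finalInputCh := make(chan []int, 1)
	finalOutputCh := make(chan *afFinalResult)

	// A "final worker": borrow the caller's buffer, append results, return it.
	go func() {
		defer close(finalOutputCh) // closing signals "all workers exited"
		buf := <-finalInputCh
		buf = append(buf, 1, 2, 3)
		finalOutputCh <- &afFinalResult{chk: buf}
	}()

	chk := make([]int, 0, 8)
	for len(chk) < 4 { // plays the role of !chk.IsFull()
		finalInputCh <- chk // lend the chunk to whichever worker takes it
		result, ok := <-finalOutputCh
		if !ok { // all workers exited; keep whatever was produced
			break
		}
		if result.err != nil {
			fmt.Println("worker error:", result.err)
			return
		}
		chk = result.chk // the same buffer comes back, filled by the worker
	}
	fmt.Println("rows:", chk) // rows: [1 2 3]
}
```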
@@ -668,11 +663,11 @@ func (e *HashAggExec) unparallelExec(ctx context.Context, chk *chunk.Chunk) error
 			chk.SetNumVirtualRows(chk.NumRows() + 1)
 		}
 		for i, af := range e.PartialAggFuncs {
-			if err := (af.AppendFinalResult2Chunk(e.ctx, partialResults[i], chk)); err != nil {
+			if err := af.AppendFinalResult2Chunk(e.ctx, partialResults[i], chk); err != nil {
 				return err
 			}
 		}
-		if chk.NumRows() == e.maxChunkSize {
+		if chk.IsFull() {
 			e.cursor4GroupKey++
 			return nil
 		}
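Each of these NumRows() == maxChunkSize checks becomes IsFull(), which also honors the required-rows hint set by the consumer: a chunk counts as full once it holds the number of rows its parent asked for, which may be smaller than maxChunkSize. A sketch of the assumed semantics (the real implementation lives in util/chunk; names here are illustrative):

```go
// Illustrative only: how SetRequiredRows and IsFull are assumed to interact,
// based on their usage in this diff.
type sketchChunk struct {
	numRows      int
	requiredRows int
}

func (c *sketchChunk) SetRequiredRows(required, maxChunkSize int) {
	if required <= 0 || required > maxChunkSize {
		required = maxChunkSize // fall back to the chunk capacity
	}
	c.requiredRows = required
}

// IsFull reports whether the consumer's demand has been met.
func (c *sketchChunk) IsFull() bool { return c.numRows >= c.requiredRows }
```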
@@ -805,7 +800,7 @@ func (e *StreamAggExec) Next(ctx context.Context, req *chunk.RecordBatch) error
 		defer func() { e.runtimeStats.Record(time.Since(start), req.NumRows()) }()
 	}
 	req.Reset()
-	for !e.executed && req.NumRows() < e.maxChunkSize {
+	for !e.executed && !req.IsFull() {
 		err := e.consumeOneGroup(ctx, req.Chunk)
 		if err != nil {
 			e.executed = true
2 changes: 1 addition & 1 deletion executor/builder.go
@@ -1942,7 +1942,7 @@ func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) *WindowExec
 			partialResult:     agg.AllocPartialResult(),
 			start:             v.Frame.Start,
 			end:               v.Frame.End,
-			col:               v.OrderBy[0].Col,
+			orderByCols:       orderByCols,
 			expectedCmpResult: cmpResult,
 		}
 	}
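The builder now hands the range-frame processor every ORDER BY column rather than only the first, which is what enables the multi-column comparisons added in executor/window.go below. A hedged reconstruction of how orderByCols is presumably assembled just above this hunk (not the verbatim builder code):

```go
// Hypothetical: collect all ORDER BY columns so range-frame boundaries
// can be resolved by comparing columns left to right.
orderByCols := make([]*expression.Column, 0, len(v.OrderBy))
for _, item := range v.OrderBy {
	orderByCols = append(orderByCols, item.Col)
}
```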
138 changes: 138 additions & 0 deletions executor/executor_required_rows_test.go
@@ -25,6 +25,7 @@ import (
 	"github.com/pingcap/parser/ast"
 	"github.com/pingcap/parser/mysql"
 	"github.com/pingcap/tidb/expression"
+	"github.com/pingcap/tidb/expression/aggregation"
 	plannercore "github.com/pingcap/tidb/planner/core"
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/variable"
@@ -589,3 +590,140 @@ func buildProjectionExec(ctx sessionctx.Context, exprs []expression.Expression,
 		evaluatorSuit: expression.NewEvaluatorSuite(exprs, false),
 	}
 }
+
+func divGenerator(factor int) func(valType *types.FieldType) interface{} {
+	closureCountInt := 0
+	closureCountDouble := 0
+	return func(valType *types.FieldType) interface{} {
+		switch valType.Tp {
+		case mysql.TypeLong, mysql.TypeLonglong:
+			ret := int64(closureCountInt / factor)
+			closureCountInt++
+			return ret
+		case mysql.TypeDouble:
+			ret := float64(closureCountDouble / factor)
+			closureCountDouble++
+			return ret
+		default:
+			panic("not implement")
+		}
+	}
+}
+
+func (s *testExecSuite) TestStreamAggRequiredRows(c *C) {
+	maxChunkSize := defaultCtx().GetSessionVars().MaxChunkSize
+	testCases := []struct {
+		totalRows      int
+		aggFunc        string
+		requiredRows   []int
+		expectedRows   []int
+		expectedRowsDS []int
+		gen            func(valType *types.FieldType) interface{}
+	}{
+		{
+			totalRows:      1000000,
+			aggFunc:        ast.AggFuncSum,
+			requiredRows:   []int{1, 2, 3, 4, 5, 6, 7},
+			expectedRows:   []int{1, 2, 3, 4, 5, 6, 7},
+			expectedRowsDS: []int{maxChunkSize},
+			gen:            divGenerator(1),
+		},
+		{
+			totalRows:      maxChunkSize * 3,
+			aggFunc:        ast.AggFuncAvg,
+			requiredRows:   []int{1, 3},
+			expectedRows:   []int{1, 2},
+			expectedRowsDS: []int{maxChunkSize, maxChunkSize, maxChunkSize, 0},
+			gen:            divGenerator(maxChunkSize),
+		},
+		{
+			totalRows:      maxChunkSize*2 - 1,
+			aggFunc:        ast.AggFuncMax,
+			requiredRows:   []int{maxChunkSize/2 + 1},
+			expectedRows:   []int{maxChunkSize/2 + 1},
+			expectedRowsDS: []int{maxChunkSize, maxChunkSize - 1},
+			gen:            divGenerator(2),
+		},
+	}
+
+	for _, testCase := range testCases {
+		sctx := defaultCtx()
+		ctx := context.Background()
+		ds := newRequiredRowsDataSourceWithGenerator(sctx, testCase.totalRows, testCase.expectedRowsDS, testCase.gen)
+		childCols := ds.Schema().Columns
+		schema := expression.NewSchema(childCols...)
+		groupBy := []expression.Expression{childCols[1]}
+		aggFunc := aggregation.NewAggFuncDesc(sctx, testCase.aggFunc, []expression.Expression{childCols[0]}, true)
+		aggFuncs := []*aggregation.AggFuncDesc{aggFunc}
+		exec := buildStreamAggExecutor(sctx, ds, schema, aggFuncs, groupBy)
+		c.Assert(exec.Open(ctx), IsNil)
+		chk := exec.newFirstChunk()
+		for i := range testCase.requiredRows {
+			chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize)
+			c.Assert(exec.Next(ctx, chunk.NewRecordBatch(chk)), IsNil)
+			c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i])
+		}
+		c.Assert(exec.Close(), IsNil)
+		c.Assert(ds.checkNumNextCalled(), IsNil)
+	}
+}
+
+func (s *testExecSuite) TestHashAggParallelRequiredRows(c *C) {
+	maxChunkSize := defaultCtx().GetSessionVars().MaxChunkSize
+	testCases := []struct {
+		totalRows      int
+		aggFunc        string
+		requiredRows   []int
+		expectedRows   []int
+		expectedRowsDS []int
+		gen            func(valType *types.FieldType) interface{}
+	}{
+		{
+			totalRows:      maxChunkSize,
+			aggFunc:        ast.AggFuncSum,
+			requiredRows:   []int{1, 2, 3, 4, 5, 6, 7},
+			expectedRows:   []int{1, 2, 3, 4, 5, 6, 7},
+			expectedRowsDS: []int{maxChunkSize, 0},
+			gen:            divGenerator(1),
+		},
+		{
+			totalRows:      maxChunkSize * 3,
+			aggFunc:        ast.AggFuncAvg,
+			requiredRows:   []int{1, 3},
+			expectedRows:   []int{1, 2},
+			expectedRowsDS: []int{maxChunkSize, maxChunkSize, maxChunkSize, 0},
+			gen:            divGenerator(maxChunkSize),
+		},
+		{
+			totalRows:      maxChunkSize * 3,
+			aggFunc:        ast.AggFuncAvg,
+			requiredRows:   []int{maxChunkSize, maxChunkSize},
+			expectedRows:   []int{maxChunkSize, maxChunkSize / 2},
+			expectedRowsDS: []int{maxChunkSize, maxChunkSize, maxChunkSize, 0},
+			gen:            divGenerator(2),
+		},
+	}
+
+	for _, hasDistinct := range []bool{false, true} {
+		for _, testCase := range testCases {
+			sctx := defaultCtx()
+			ctx := context.Background()
+			ds := newRequiredRowsDataSourceWithGenerator(sctx, testCase.totalRows, testCase.expectedRowsDS, testCase.gen)
+			childCols := ds.Schema().Columns
+			schema := expression.NewSchema(childCols...)
+			groupBy := []expression.Expression{childCols[1]}
+			aggFunc := aggregation.NewAggFuncDesc(sctx, testCase.aggFunc, []expression.Expression{childCols[0]}, hasDistinct)
+			aggFuncs := []*aggregation.AggFuncDesc{aggFunc}
+			exec := buildHashAggExecutor(sctx, ds, schema, aggFuncs, groupBy)
+			c.Assert(exec.Open(ctx), IsNil)
+			chk := exec.newFirstChunk()
+			for i := range testCase.requiredRows {
+				chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize)
+				c.Assert(exec.Next(ctx, chunk.NewRecordBatch(chk)), IsNil)
+				c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i])
+			}
+			c.Assert(exec.Close(), IsNil)
+			c.Assert(ds.checkNumNextCalled(), IsNil)
+		}
+	}
+}
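A note on divGenerator: with factor = f it yields 0, 0, …, 1, 1, … (each value repeated f times), so grouping by such a column produces groups of exactly f rows. That is why the AVG case with gen = divGenerator(maxChunkSize) turns three chunks of input into exactly three groups. A small usage sketch (hypothetical, outside the test suite):

```go
// With factor = 2 the generator yields 0, 0, 1, 1, 2, 2, ... for integer
// columns, so consecutive rows pair up into two-row groups.
gen := divGenerator(2)
tp := types.NewFieldType(mysql.TypeLonglong)
for i := 0; i < 6; i++ {
	fmt.Print(gen(tp), " ") // 0 0 1 1 2 2
}
```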
1 change: 1 addition & 0 deletions executor/executor_test.go
@@ -2210,6 +2210,7 @@ func (s *testSuite) TestTimestampDefaultValueTimeZone(c *C) {
 	is := domain.GetDomain(sctx).InfoSchema()
 	c.Assert(is, NotNil)
 	tb, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
+	c.Assert(err, IsNil)
 	tb.Cols()[1].Version = model.ColumnInfoVersion1 + 1
 	tk.MustExec("insert into t set a=3")
 	r = tk.MustQuery(`select a,b from t order by a`)
28 changes: 21 additions & 7 deletions executor/window.go
@@ -315,7 +315,7 @@ type rangeFrameWindowProcessor struct {
 	curRowIdx       uint64
 	lastStartOffset uint64
 	lastEndOffset   uint64
-	col             *expression.Column
+	orderByCols     []*expression.Column
 	// expectedCmpResult is used to decide if one value is included in the frame.
 	expectedCmpResult int64
 }
@@ -326,9 +326,16 @@ func (p *rangeFrameWindowProcessor) getStartOffset(ctx sessionctx.Context, rows []chunk.Row) (uint64, error) {
 	}
 	numRows := uint64(len(rows))
 	for ; p.lastStartOffset < numRows; p.lastStartOffset++ {
-		res, _, err := p.start.CmpFunc(ctx, p.col, p.start.CalcFunc, rows[p.lastStartOffset], rows[p.curRowIdx])
-		if err != nil {
-			return 0, err
+		var res int64
+		var err error
+		for i := range p.orderByCols {
+			res, _, err = p.start.CmpFuncs[i](ctx, p.orderByCols[i], p.start.CalcFuncs[i], rows[p.lastStartOffset], rows[p.curRowIdx])
+			if err != nil {
+				return 0, err
+			}
+			if res != 0 {
+				break
+			}
 		}
 		// For asc, break when the current value is greater or equal to the calculated result;
 		// For desc, break when the current value is less or equal to the calculated result.
@@ -345,9 +352,16 @@ func (p *rangeFrameWindowProcessor) getEndOffset(ctx sessionctx.Context, rows []chunk.Row) (uint64, error) {
 		return numRows, nil
 	}
 	for ; p.lastEndOffset < numRows; p.lastEndOffset++ {
-		res, _, err := p.end.CmpFunc(ctx, p.end.CalcFunc, p.col, rows[p.curRowIdx], rows[p.lastEndOffset])
-		if err != nil {
-			return 0, err
+		var res int64
+		var err error
+		for i := range p.orderByCols {
+			res, _, err = p.end.CmpFuncs[i](ctx, p.end.CalcFuncs[i], p.orderByCols[i], rows[p.curRowIdx], rows[p.lastEndOffset])
+			if err != nil {
+				return 0, err
+			}
+			if res != 0 {
+				break
+			}
 		}
 		// For asc, break when the calculated result is greater than the current value.
 		// For desc, break when the calculated result is less than the current value.
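Both loops implement the same idea: compare the candidate row against the current row column by column, and let the first non-zero comparison decide whether the candidate falls inside the frame, falling through to the next ORDER BY column on ties. Stripped of expression evaluation, this is ordinary lexicographic ordering; a standalone sketch over plain int keys:

```go
// cmpOrderKeys compares two ORDER BY keys column by column; the first
// non-zero result wins, mirroring the `if res != 0 { break }` loops above.
func cmpOrderKeys(a, b []int) int {
	for i := range a {
		switch {
		case a[i] < b[i]:
			return -1
		case a[i] > b[i]:
			return 1
		}
	}
	return 0 // all columns tie: the rows are peers
}
```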
12 changes: 12 additions & 0 deletions executor/window_test.go
@@ -82,6 +82,18 @@ func (s *testSuite2) TestWindowFunctions(c *C) {
 	result = tk.MustQuery("select a, b, dense_rank() over(order by a, b) from t")
 	result.Check(testkit.Rows("1 1 1", "1 2 2", "2 1 3", "2 2 4"))
 
+	result = tk.MustQuery("select row_number() over(rows between 1 preceding and 1 following) from t")
+	result.Check(testkit.Rows("1", "2", "3", "4"))
+	result = tk.MustQuery("show warnings")
+	result.Check(testkit.Rows("Note 3599 Window function 'row_number' ignores the frame clause of window '<unnamed window>' and aggregates over the whole partition"))
+
+	result = tk.MustQuery("select a, sum(a) over() from t")
+	result.Check(testkit.Rows("1 6", "1 6", "2 6", "2 6"))
+	result = tk.MustQuery("select a, sum(a) over(order by a) from t")
+	result.Check(testkit.Rows("1 2", "1 2", "2 6", "2 6"))
+	result = tk.MustQuery("select a, sum(a) over(order by a, b) from t")
+	result.Check(testkit.Rows("1 1", "1 2", "2 4", "2 6"))
+
 	result = tk.MustQuery("select a, first_value(a) over(), last_value(a) over() from t")
 	result.Check(testkit.Rows("1 1 2", "1 1 2", "2 1 2", "2 1 2"))
 	result = tk.MustQuery("select a, first_value(a) over(rows between 1 preceding and 1 following), last_value(a) over(rows between 1 preceding and 1 following) from t")
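The new sum(a) over(order by …) expectations encode RANGE-frame peer semantics: rows that tie on the ORDER BY key share one frame, so both a = 1 rows see the running sum 2, while ordering by (a, b) breaks all ties and yields a plain running sum. A small sketch reproducing the single-column expectations (plain Go, not testkit):

```go
// runningSumWithPeers computes sum(v) over(order by v) with RANGE-frame
// peer handling: tied rows share the same running total.
// vals is assumed already sorted by the ORDER BY key.
// runningSumWithPeers([]int{1, 1, 2, 2}) returns [2, 2, 6, 6].
func runningSumWithPeers(vals []int) []int {
	out := make([]int, len(vals))
	total := 0
	for i := 0; i < len(vals); {
		j := i
		for j < len(vals) && vals[j] == vals[i] { // extend the peer group
			total += vals[j]
			j++
		}
		for k := i; k < j; k++ {
			out[k] = total // peers share the group's running total
		}
		i = j
	}
	return out
}
```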