-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
planner: fix wrong agg function when agg push down union #17022
Changes from 5 commits
dda371a
9d9b5b4
e21e425
54ced2c
7a9029a
7b64b50
58a1db2
151b6a4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -320,7 +320,7 @@ func (a *aggregationPushDownSolver) splitPartialAgg(agg *LogicalAggregation) (pu | |
|
||
// pushAggCrossUnion will try to push the agg down to the union. If the new aggregation's group-by columns don't contain a unique key, | ||
// we will return the new aggregation. Otherwise we will transform the aggregation to a projection. | ||
func (a *aggregationPushDownSolver) pushAggCrossUnion(agg *LogicalAggregation, unionSchema *expression.Schema, unionChild LogicalPlan) LogicalPlan { | ||
func (a *aggregationPushDownSolver) pushAggCrossUnion(agg *LogicalAggregation, unionSchema *expression.Schema, unionChild LogicalPlan) (LogicalPlan, error) { | ||
ctx := agg.ctx | ||
newAgg := LogicalAggregation{ | ||
AggFuncs: make([]*aggregation.AggFuncDesc, 0, len(agg.AggFuncs)), | ||
|
@@ -340,6 +340,11 @@ func (a *aggregationPushDownSolver) pushAggCrossUnion(agg *LogicalAggregation, u | |
for _, gbyExpr := range agg.GroupByItems { | ||
newExpr := expression.ColumnSubstitute(gbyExpr, unionSchema, expression.Column2Exprs(unionChild.Schema().Columns)) | ||
newAgg.GroupByItems = append(newAgg.GroupByItems, newExpr) | ||
firstRow, err := aggregation.NewAggFuncDesc(agg.ctx, ast.AggFuncFirstRow, []expression.Expression{gbyExpr}, false) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Check if […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If we do this, we need to change the schema for partial aggregation. I'm afraid there would be some other corner cases, so I prefer to keep the duplicated […] |
||
if err != nil { | ||
return nil, err | ||
} | ||
newAgg.AggFuncs = append(newAgg.AggFuncs, firstRow) | ||
} | ||
newAgg.collectGroupByColumns() | ||
tmpSchema := expression.NewSchema(newAgg.groupByCols...) | ||
|
@@ -350,13 +355,13 @@ func (a *aggregationPushDownSolver) pushAggCrossUnion(agg *LogicalAggregation, u | |
if tmpSchema.ColumnsIndices(key) != nil { | ||
if ok, proj := ConvertAggToProj(newAgg, newAgg.schema); ok { | ||
proj.SetChildren(unionChild) | ||
return proj | ||
return proj, nil | ||
} | ||
break | ||
} | ||
} | ||
newAgg.SetChildren(unionChild) | ||
return newAgg | ||
return newAgg, nil | ||
} | ||
|
||
func (a *aggregationPushDownSolver) optimize(ctx context.Context, p LogicalPlan) (LogicalPlan, error) { | ||
|
@@ -430,7 +435,10 @@ func (a *aggregationPushDownSolver) aggPushDown(p LogicalPlan) (_ LogicalPlan, e | |
pushedAgg := a.splitPartialAgg(agg) | ||
newChildren := make([]LogicalPlan, 0, len(union.children)) | ||
for _, child := range union.children { | ||
newChild := a.pushAggCrossUnion(pushedAgg, union.Schema(), child) | ||
newChild, err := a.pushAggCrossUnion(pushedAgg, union.Schema(), child) | ||
if err != nil { | ||
return p, err | ||
} | ||
newChildren = append(newChildren, newChild) | ||
} | ||
union.SetSchema(expression.NewSchema(newChildren[0].Schema().Columns...)) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1554,14 +1554,14 @@ | |
{ | ||
"SQL": "select /*+ HASH_AGG(), AGG_TO_COP() */ sum(distinct b) from pt;", | ||
"Plan": [ | ||
"HashAgg_11 1.00 root funcs:sum(distinct Column#7)->Column#4", | ||
"└─Projection_23 16000.00 root cast(test.pt.b, decimal(65,0) BINARY)->Column#7", | ||
"HashAgg_11 1.00 root funcs:sum(distinct Column#9)->Column#4", | ||
"└─Projection_23 16000.00 root cast(test.pt.b, decimal(65,0) BINARY)->Column#9", | ||
" └─Union_12 16000.00 root ", | ||
" ├─HashAgg_16 8000.00 root group by:test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b", | ||
" ├─HashAgg_16 8000.00 root group by:test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b", | ||
" │ └─TableReader_17 8000.00 root data:HashAgg_13", | ||
" │ └─HashAgg_13 8000.00 cop[tikv] group by:test.pt.b, ", | ||
" │ └─TableFullScan_15 10000.00 cop[tikv] table:pt, partition:p0 keep order:false, stats:pseudo", | ||
" └─HashAgg_21 8000.00 root group by:test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b", | ||
" └─HashAgg_21 8000.00 root group by:test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b", | ||
" └─TableReader_22 8000.00 root data:HashAgg_18", | ||
" └─HashAgg_18 8000.00 cop[tikv] group by:test.pt.b, ", | ||
" └─TableFullScan_20 10000.00 cop[tikv] table:pt, partition:p1 keep order:false, stats:pseudo" | ||
|
@@ -1575,11 +1575,11 @@ | |
"Plan": [ | ||
"HashAgg_14 1.00 root funcs:count(distinct Column#5)->Column#6", | ||
"└─Union_15 16000.00 root ", | ||
" ├─HashAgg_19 8000.00 root group by:test.ta.a, funcs:firstrow(test.ta.a)->Column#5", | ||
" ├─HashAgg_19 8000.00 root group by:test.ta.a, funcs:firstrow(test.ta.a)->Column#5, funcs:firstrow(test.ta.a)->Column#5", | ||
" │ └─TableReader_20 8000.00 root data:HashAgg_16", | ||
" │ └─HashAgg_16 8000.00 cop[tikv] group by:test.ta.a, ", | ||
" │ └─TableFullScan_18 10000.00 cop[tikv] table:ta keep order:false, stats:pseudo", | ||
" └─HashAgg_24 8000.00 root group by:test.tb.a, funcs:firstrow(test.tb.a)->Column#5", | ||
" └─HashAgg_24 8000.00 root group by:test.tb.a, funcs:firstrow(test.tb.a)->Column#5, funcs:firstrow(test.tb.a)->Column#5", | ||
" └─TableReader_25 8000.00 root data:HashAgg_21", | ||
" └─HashAgg_21 8000.00 cop[tikv] group by:test.tb.a, ", | ||
" └─TableFullScan_23 10000.00 cop[tikv] table:tb keep order:false, stats:pseudo" | ||
|
@@ -1683,14 +1683,14 @@ | |
{ | ||
"SQL": "select /*+ HASH_AGG(), AGG_TO_COP() */ sum(distinct b) from pt;", | ||
"Plan": [ | ||
"HashAgg_11 1.00 root funcs:sum(distinct Column#7)->Column#4", | ||
"└─Projection_23 16000.00 root cast(test.pt.b, decimal(65,0) BINARY)->Column#7", | ||
"HashAgg_11 1.00 root funcs:sum(distinct Column#9)->Column#4", | ||
"└─Projection_23 16000.00 root cast(test.pt.b, decimal(65,0) BINARY)->Column#9", | ||
" └─Union_12 16000.00 root ", | ||
" ├─HashAgg_16 8000.00 root group by:test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b", | ||
" ├─HashAgg_16 8000.00 root group by:test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b", | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes. For correctness. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can we check whether the firstrow already exists? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Because the partial aggregation should return the group-by columns. That way, the final aggregation can execute. |
||
" │ └─TableReader_17 8000.00 root data:HashAgg_13", | ||
" │ └─HashAgg_13 8000.00 cop[tikv] group by:test.pt.b, ", | ||
" │ └─TableFullScan_15 10000.00 cop[tikv] table:pt, partition:p0 keep order:false, stats:pseudo", | ||
" └─HashAgg_21 8000.00 root group by:test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b", | ||
" └─HashAgg_21 8000.00 root group by:test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b", | ||
" └─TableReader_22 8000.00 root data:HashAgg_18", | ||
" └─HashAgg_18 8000.00 cop[tikv] group by:test.pt.b, ", | ||
" └─TableFullScan_20 10000.00 cop[tikv] table:pt, partition:p1 keep order:false, stats:pseudo" | ||
|
@@ -1704,11 +1704,11 @@ | |
"Plan": [ | ||
"HashAgg_14 1.00 root funcs:count(distinct Column#5)->Column#6", | ||
"└─Union_15 16000.00 root ", | ||
" ├─HashAgg_19 8000.00 root group by:test.ta.a, funcs:firstrow(test.ta.a)->Column#5", | ||
" ├─HashAgg_19 8000.00 root group by:test.ta.a, funcs:firstrow(test.ta.a)->Column#5, funcs:firstrow(test.ta.a)->Column#5", | ||
" │ └─TableReader_20 8000.00 root data:HashAgg_16", | ||
" │ └─HashAgg_16 8000.00 cop[tikv] group by:test.ta.a, ", | ||
" │ └─TableFullScan_18 10000.00 cop[tikv] table:ta keep order:false, stats:pseudo", | ||
" └─HashAgg_24 8000.00 root group by:test.tb.a, funcs:firstrow(test.tb.a)->Column#5", | ||
" └─HashAgg_24 8000.00 root group by:test.tb.a, funcs:firstrow(test.tb.a)->Column#5, funcs:firstrow(test.tb.a)->Column#5", | ||
" └─TableReader_25 8000.00 root data:HashAgg_21", | ||
" └─HashAgg_21 8000.00 cop[tikv] group by:test.tb.a, ", | ||
" └─TableFullScan_23 10000.00 cop[tikv] table:tb keep order:false, stats:pseudo" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
set @@tidb_opt_agg_push_down=1