From 8f2e10e954ad53d49e06213443f5799dd2c8b046 Mon Sep 17 00:00:00 2001 From: "Zhuomin(Charming) Liu" Date: Wed, 20 May 2020 21:50:15 +0800 Subject: [PATCH] planner: fix wrong agg function when agg push down union (#17022) --- cmd/explaintest/r/explain_easy.result | 18 +++++------ executor/aggregate_test.go | 32 +++++++++++++++++++ planner/core/rule_aggregation_push_down.go | 17 +++++++--- planner/core/testdata/plan_suite_out.json | 24 +++++++------- .../testdata/plan_suite_unexported_out.json | 4 +-- 5 files changed, 68 insertions(+), 27 deletions(-) diff --git a/cmd/explaintest/r/explain_easy.result b/cmd/explaintest/r/explain_easy.result index 6d2cacb8402e6..231d5782c3e7c 100644 --- a/cmd/explaintest/r/explain_easy.result +++ b/cmd/explaintest/r/explain_easy.result @@ -164,10 +164,10 @@ id estRows task access object operator info Union_17 26000.00 root ├─HashAgg_21 16000.00 root group by:Column#10, funcs:firstrow(Column#12)->Column#10 │ └─Union_22 16000.00 root -│ ├─StreamAgg_27 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#12 +│ ├─StreamAgg_27 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#12, funcs:firstrow(test.t2.c1)->Column#10 │ │ └─IndexReader_40 10000.00 root index:IndexFullScan_39 │ │ └─IndexFullScan_39 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:true, stats:pseudo -│ └─StreamAgg_45 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#12 +│ └─StreamAgg_45 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#12, funcs:firstrow(test.t2.c1)->Column#10 │ └─IndexReader_58 10000.00 root index:IndexFullScan_57 │ └─IndexFullScan_57 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:true, stats:pseudo └─IndexReader_63 10000.00 root index:IndexFullScan_62 @@ -176,13 +176,13 @@ explain select c1 from t2 union all select c1 from t2 union select c1 from t2; id estRows task access object operator info HashAgg_18 24000.00 root group by:Column#10, funcs:firstrow(Column#11)->Column#10 └─Union_19 24000.00 root - ├─StreamAgg_24 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#11 + ├─StreamAgg_24 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#11, funcs:firstrow(test.t2.c1)->Column#10 │ └─IndexReader_37 10000.00 root index:IndexFullScan_36 │ └─IndexFullScan_36 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:true, stats:pseudo - ├─StreamAgg_42 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#11 + ├─StreamAgg_42 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#11, funcs:firstrow(test.t2.c1)->Column#10 │ └─IndexReader_55 10000.00 root index:IndexFullScan_54 │ └─IndexFullScan_54 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:true, stats:pseudo - └─StreamAgg_60 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#11 + └─StreamAgg_60 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#11, funcs:firstrow(test.t2.c1)->Column#10 └─IndexReader_73 10000.00 root index:IndexFullScan_72 └─IndexFullScan_72 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:true, stats:pseudo select * from information_schema.tidb_indexes where table_name='t4'; @@ -669,17 +669,17 @@ id estRows task access object operator info Sort_13 2.00 root Column#3:asc └─HashAgg_17 2.00 root group by:Column#3, funcs:firstrow(Column#6)->Column#3 └─Union_18 2.00 root - ├─HashAgg_19 1.00 root group by:1, funcs:firstrow(0)->Column#6 + ├─HashAgg_19 1.00 root group by:1, funcs:firstrow(0)->Column#6, funcs:firstrow(0)->Column#3 │ └─TableDual_22 1.00 root rows:1 - └─HashAgg_25 1.00 root group by:1, funcs:firstrow(1)->Column#6 + └─HashAgg_25 1.00 root group by:1, funcs:firstrow(1)->Column#6, funcs:firstrow(1)->Column#3 └─TableDual_28 1.00 root rows:1 explain SELECT 0 AS a FROM dual UNION (SELECT 1 AS a FROM dual ORDER BY a); id estRows task access object operator info HashAgg_15 2.00 root group by:Column#3, funcs:firstrow(Column#6)->Column#3 └─Union_16 2.00 root - ├─HashAgg_17 1.00 root group by:1, funcs:firstrow(0)->Column#6 + ├─HashAgg_17 1.00 root group by:1, funcs:firstrow(0)->Column#6, funcs:firstrow(0)->Column#3 │ └─TableDual_20 1.00 root rows:1 - └─StreamAgg_27 1.00 root group by:Column#1, funcs:firstrow(Column#1)->Column#6 + └─StreamAgg_27 1.00 root group by:Column#1, funcs:firstrow(Column#1)->Column#6, funcs:firstrow(Column#1)->Column#3 └─Projection_32 1.00 root 1->Column#1 └─TableDual_33 1.00 root rows:1 create table t (i int key, j int, unique key (i, j)); diff --git a/executor/aggregate_test.go b/executor/aggregate_test.go index 2364ddcd07ff9..fee57a7239037 100644 --- a/executor/aggregate_test.go +++ b/executor/aggregate_test.go @@ -729,6 +729,38 @@ func (s *testSuiteAgg) TestIssue16279(c *C) { tk.MustQuery("select count(a) , date_format(a, '%Y-%m-%d') as xx from s group by xx") } +func (s *testSuiteAgg) TestAggPushDownPartitionTable(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec(`CREATE TABLE t1 ( + a int(11) DEFAULT NULL, + b tinyint(4) NOT NULL, + PRIMARY KEY (b) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin + PARTITION BY RANGE ( b ) ( + PARTITION p0 VALUES LESS THAN (10), + PARTITION p1 VALUES LESS THAN (20), + PARTITION p2 VALUES LESS THAN (30), + PARTITION p3 VALUES LESS THAN (40), + PARTITION p4 VALUES LESS THAN (MAXVALUE) + )`) + tk.MustExec("insert into t1 values (0, 0), (1, 1), (1, 2), (1, 3), (2, 4), (2, 5), (2, 6), (3, 7), (3, 10), (3, 11), (12, 12), (12, 13), (14, 14), (14, 15), (20, 20), (20, 21), (20, 22), (23, 23), (23, 24), (23, 25), (31, 30), (31, 31), (31, 32), (33, 33), (33, 34), (33, 35), (36, 36), (80, 80), (90, 90), (100, 100)") + tk.MustExec("set @@tidb_opt_agg_push_down = 1") + tk.MustQuery("select /*+ AGG_TO_COP() */ sum(a), sum(b) from t1 where a < 40 group by a").Sort().Check(testkit.Rows( + "0 0", + "24 25", + "28 29", + "3 6", + "36 36", + "6 15", + "60 63", + "69 72", + "9 28", + "93 93", + "99 102")) +} + func (s *testSuiteAgg) TestIssue13652(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/planner/core/rule_aggregation_push_down.go b/planner/core/rule_aggregation_push_down.go index 179a2d311f414..dc44480da4362 100644 --- a/planner/core/rule_aggregation_push_down.go +++ b/planner/core/rule_aggregation_push_down.go @@ -320,7 +320,7 @@ func (a *aggregationPushDownSolver) splitPartialAgg(agg *LogicalAggregation) (pu // pushAggCrossUnion will try to push the agg down to the union. If the new aggregation's group-by columns doesn't contain unique key. // We will return the new aggregation. Otherwise we will transform the aggregation to projection. -func (a *aggregationPushDownSolver) pushAggCrossUnion(agg *LogicalAggregation, unionSchema *expression.Schema, unionChild LogicalPlan) LogicalPlan { +func (a *aggregationPushDownSolver) pushAggCrossUnion(agg *LogicalAggregation, unionSchema *expression.Schema, unionChild LogicalPlan) (LogicalPlan, error) { ctx := agg.ctx newAgg := LogicalAggregation{ AggFuncs: make([]*aggregation.AggFuncDesc, 0, len(agg.AggFuncs)), @@ -340,6 +340,12 @@ func (a *aggregationPushDownSolver) pushAggCrossUnion(agg *LogicalAggregation, u for _, gbyExpr := range agg.GroupByItems { newExpr := expression.ColumnSubstitute(gbyExpr, unionSchema, expression.Column2Exprs(unionChild.Schema().Columns)) newAgg.GroupByItems = append(newAgg.GroupByItems, newExpr) + // TODO: if there is a duplicated first_row function, we can delete it. + firstRow, err := aggregation.NewAggFuncDesc(agg.ctx, ast.AggFuncFirstRow, []expression.Expression{gbyExpr}, false) + if err != nil { + return nil, err + } + newAgg.AggFuncs = append(newAgg.AggFuncs, firstRow) } newAgg.collectGroupByColumns() tmpSchema := expression.NewSchema(newAgg.groupByCols...) @@ -350,13 +356,13 @@ func (a *aggregationPushDownSolver) pushAggCrossUnion(agg *LogicalAggregation, u if tmpSchema.ColumnsIndices(key) != nil { if ok, proj := ConvertAggToProj(newAgg, newAgg.schema); ok { proj.SetChildren(unionChild) - return proj + return proj, nil } break } } newAgg.SetChildren(unionChild) - return newAgg + return newAgg, nil } func (a *aggregationPushDownSolver) optimize(ctx context.Context, p LogicalPlan) (LogicalPlan, error) { @@ -430,7 +436,10 @@ func (a *aggregationPushDownSolver) aggPushDown(p LogicalPlan) (_ LogicalPlan, e pushedAgg := a.splitPartialAgg(agg) newChildren := make([]LogicalPlan, 0, len(union.children)) for _, child := range union.children { - newChild := a.pushAggCrossUnion(pushedAgg, union.Schema(), child) + newChild, err := a.pushAggCrossUnion(pushedAgg, union.Schema(), child) + if err != nil { + return p, err + } newChildren = append(newChildren, newChild) } union.SetSchema(expression.NewSchema(newChildren[0].Schema().Columns...)) diff --git a/planner/core/testdata/plan_suite_out.json b/planner/core/testdata/plan_suite_out.json index d2a201411e3f4..10d3e3600db73 100644 --- a/planner/core/testdata/plan_suite_out.json +++ b/planner/core/testdata/plan_suite_out.json @@ -1554,14 +1554,14 @@ { "SQL": "select /*+ HASH_AGG(), AGG_TO_COP() */ sum(distinct b) from pt;", "Plan": [ - "HashAgg_11 1.00 root funcs:sum(distinct Column#7)->Column#4", - "└─Projection_23 16000.00 root cast(test.pt.b, decimal(65,0) BINARY)->Column#7", + "HashAgg_11 1.00 root funcs:sum(distinct Column#9)->Column#4", + "└─Projection_23 16000.00 root cast(test.pt.b, decimal(65,0) BINARY)->Column#9", " └─Union_12 16000.00 root ", - " ├─HashAgg_16 8000.00 root group by:test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b", + " ├─HashAgg_16 8000.00 root group by:test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b", " │ └─TableReader_17 8000.00 root data:HashAgg_13", " │ └─HashAgg_13 8000.00 cop[tikv] group by:test.pt.b, ", " │ └─TableFullScan_15 10000.00 cop[tikv] table:pt, partition:p0 keep order:false, stats:pseudo", - " └─HashAgg_21 8000.00 root group by:test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b", + " └─HashAgg_21 8000.00 root group by:test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b", " └─TableReader_22 8000.00 root data:HashAgg_18", " └─HashAgg_18 8000.00 cop[tikv] group by:test.pt.b, ", " └─TableFullScan_20 10000.00 cop[tikv] table:pt, partition:p1 keep order:false, stats:pseudo" @@ -1575,11 +1575,11 @@ "Plan": [ "HashAgg_14 1.00 root funcs:count(distinct Column#5)->Column#6", "└─Union_15 16000.00 root ", - " ├─HashAgg_19 8000.00 root group by:test.ta.a, funcs:firstrow(test.ta.a)->Column#5", + " ├─HashAgg_19 8000.00 root group by:test.ta.a, funcs:firstrow(test.ta.a)->Column#5, funcs:firstrow(test.ta.a)->Column#5", " │ └─TableReader_20 8000.00 root data:HashAgg_16", " │ └─HashAgg_16 8000.00 cop[tikv] group by:test.ta.a, ", " │ └─TableFullScan_18 10000.00 cop[tikv] table:ta keep order:false, stats:pseudo", - " └─HashAgg_24 8000.00 root group by:test.tb.a, funcs:firstrow(test.tb.a)->Column#5", + " └─HashAgg_24 8000.00 root group by:test.tb.a, funcs:firstrow(test.tb.a)->Column#5, funcs:firstrow(test.tb.a)->Column#5", " └─TableReader_25 8000.00 root data:HashAgg_21", " └─HashAgg_21 8000.00 cop[tikv] group by:test.tb.a, ", " └─TableFullScan_23 10000.00 cop[tikv] table:tb keep order:false, stats:pseudo" @@ -1683,14 +1683,14 @@ { "SQL": "select /*+ HASH_AGG(), AGG_TO_COP() */ sum(distinct b) from pt;", "Plan": [ - "HashAgg_11 1.00 root funcs:sum(distinct Column#7)->Column#4", - "└─Projection_23 16000.00 root cast(test.pt.b, decimal(65,0) BINARY)->Column#7", + "HashAgg_11 1.00 root funcs:sum(distinct Column#9)->Column#4", + "└─Projection_23 16000.00 root cast(test.pt.b, decimal(65,0) BINARY)->Column#9", " └─Union_12 16000.00 root ", - " ├─HashAgg_16 8000.00 root group by:test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b", + " ├─HashAgg_16 8000.00 root group by:test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b", " │ └─TableReader_17 8000.00 root data:HashAgg_13", " │ └─HashAgg_13 8000.00 cop[tikv] group by:test.pt.b, ", " │ └─TableFullScan_15 10000.00 cop[tikv] table:pt, partition:p0 keep order:false, stats:pseudo", - " └─HashAgg_21 8000.00 root group by:test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b", + " └─HashAgg_21 8000.00 root group by:test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b", " └─TableReader_22 8000.00 root data:HashAgg_18", " └─HashAgg_18 8000.00 cop[tikv] group by:test.pt.b, ", " └─TableFullScan_20 10000.00 cop[tikv] table:pt, partition:p1 keep order:false, stats:pseudo" @@ -1704,11 +1704,11 @@ "Plan": [ "HashAgg_14 1.00 root funcs:count(distinct Column#5)->Column#6", "└─Union_15 16000.00 root ", - " ├─HashAgg_19 8000.00 root group by:test.ta.a, funcs:firstrow(test.ta.a)->Column#5", + " ├─HashAgg_19 8000.00 root group by:test.ta.a, funcs:firstrow(test.ta.a)->Column#5, funcs:firstrow(test.ta.a)->Column#5", " │ └─TableReader_20 8000.00 root data:HashAgg_16", " │ └─HashAgg_16 8000.00 cop[tikv] group by:test.ta.a, ", " │ └─TableFullScan_18 10000.00 cop[tikv] table:ta keep order:false, stats:pseudo", - " └─HashAgg_24 8000.00 root group by:test.tb.a, funcs:firstrow(test.tb.a)->Column#5", + " └─HashAgg_24 8000.00 root group by:test.tb.a, funcs:firstrow(test.tb.a)->Column#5, funcs:firstrow(test.tb.a)->Column#5", " └─TableReader_25 8000.00 root data:HashAgg_21", " └─HashAgg_21 8000.00 cop[tikv] group by:test.tb.a, ", " └─TableFullScan_23 10000.00 cop[tikv] table:tb keep order:false, stats:pseudo" diff --git a/planner/core/testdata/plan_suite_unexported_out.json b/planner/core/testdata/plan_suite_unexported_out.json index 9a14c05a03314..442bb9a982305 100644 --- a/planner/core/testdata/plan_suite_unexported_out.json +++ b/planner/core/testdata/plan_suite_unexported_out.json @@ -16,7 +16,7 @@ "Join{DataScan(a)->Aggr(sum(test.t.a),firstrow(test.t.c))->DataScan(b)}(test.t.c,test.t.c)->Aggr(sum(Column#26))->Projection", "Join{DataScan(a)->Aggr(sum(test.t.a),firstrow(test.t.c))->DataScan(b)}(test.t.c,test.t.c)->Aggr(sum(Column#26))->Projection", "DataScan(t)->Aggr(sum(test.t.a))->Projection", - "UnionAll{DataScan(a)->Projection->Aggr(sum(test.t.c))->DataScan(b)->Projection->Aggr(sum(test.t.a))->DataScan(c)->Projection->Aggr(sum(test.t.b))}->Aggr(sum(Column#40))->Projection", + "UnionAll{DataScan(a)->Projection->Aggr(sum(test.t.c),firstrow(test.t.d))->DataScan(b)->Projection->Aggr(sum(test.t.a),firstrow(test.t.b))->DataScan(c)->Projection->Aggr(sum(test.t.b),firstrow(test.t.e))}->Aggr(sum(Column#40))->Projection", "Join{DataScan(a)->DataScan(b)->Aggr(max(test.t.b),firstrow(test.t.c))}(test.t.c,test.t.c)->Projection->Projection", "Join{DataScan(a)->DataScan(b)}(test.t.a,test.t.a)->Aggr(max(test.t.b),max(test.t.b))->Projection", "UnionAll{DataScan(a)->Projection->Projection->Projection->DataScan(b)->Projection->Projection->Projection}->Aggr(max(Column#38))->Projection", @@ -24,7 +24,7 @@ "Join{DataScan(t1)->DataScan(t2)}(test.t.a,test.t.a)->Projection->Projection", "UnionAll{DataScan(t1)->Projection->Aggr(count(test.t.a),sum(test.t.a))->DataScan(t2)->Projection->Aggr(count(test.t.a),sum(test.t.a))}->Aggr(avg(Column#38, Column#39))->Projection", "UnionAll{DataScan(t1)->Projection->Projection->Projection->DataScan(t2)->Projection->Projection->Projection}->Aggr(count(distinct Column#25))->Projection", - "UnionAll{DataScan(t1)->Projection->Aggr(firstrow(test.t.b))->DataScan(t2)->Projection->Aggr(firstrow(test.t.b))}->Aggr(count(distinct Column#26))->Projection" + "UnionAll{DataScan(t1)->Projection->Aggr(firstrow(test.t.b),firstrow(test.t.b))->DataScan(t2)->Projection->Aggr(firstrow(test.t.b),firstrow(test.t.b))}->Aggr(count(distinct Column#26))->Projection" ] }, {