diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index 57aaad642af17..e7b79118fbca4 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -1599,7 +1599,9 @@ func (la *LogicalAggregation) getEnforcedStreamAggs(prop *property.PhysicalPrope Enforced: true, Items: property.ItemsFromCols(la.groupByCols, desc), } - + if !prop.IsPrefix(childProp) { + return enforcedAggs + } taskTypes := []property.TaskType{property.CopSingleReadTaskType, property.CopDoubleReadTaskType} if la.HasDistinct() { // TODO: remove AllowDistinctAggPushDown after the cost estimation of distinct pushdown is implemented. diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index a746f960e6783..2132a7c099fad 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -952,3 +952,29 @@ func (s *testIntegrationSerialSuite) TestIssue16837(c *C) { tk.MustExec("insert into t values (2, 1, 1, 1, 2)") tk.MustQuery("select /*+ use_index_merge(t,c,idx_ab) */ * from t where a = 1 or (e = 1 and c = 1)").Check(testkit.Rows()) } + +func (s *testIntegrationSuite) TestStreamAggProp(c *C) { + tk := testkit.NewTestKit(c, s.store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int)") + tk.MustExec("insert into t values(1),(1),(2)") + + var input []string + var output []struct { + SQL string + Plan []string + Res []string + } + s.testData.GetTestCases(c, &input, &output) + for i, tt := range input { + s.testData.OnRecord(func() { + output[i].SQL = tt + output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery("explain " + tt).Rows()) + output[i].Res = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows()) + }) + tk.MustQuery("explain " + tt).Check(testkit.Rows(output[i].Plan...)) + tk.MustQuery(tt).Check(testkit.Rows(output[i].Res...)) + } +} diff --git a/planner/core/testdata/integration_suite_in.json b/planner/core/testdata/integration_suite_in.json index 360b6bb3d9ad2..3c691d4ee1fd8 100644 --- a/planner/core/testdata/integration_suite_in.json +++ b/planner/core/testdata/integration_suite_in.json @@ -127,5 +127,14 @@ "explain select * from t where a in (14, floor(3.47))", "explain select * from t where b in (3, 4)" ] + }, + { + "name": "TestStreamAggProp", + "cases": [ + "select /*+ stream_agg() */ count(*) c from t group by a order by c limit 1", + "select /*+ stream_agg() */ count(*) c from t group by a order by c", + "select /*+ stream_agg() */ count(*) c from t group by a order by a limit 1", + "select /*+ stream_agg() */ count(*) c from t group by a order by a" + ] } ] diff --git a/planner/core/testdata/integration_suite_out.json b/planner/core/testdata/integration_suite_out.json index b735e3f630860..accbd5f81d340 100644 --- a/planner/core/testdata/integration_suite_out.json +++ b/planner/core/testdata/integration_suite_out.json @@ -582,5 +582,65 @@ ] } ] + }, + { + "Name": "TestStreamAggProp", + "Cases": [ + { + "SQL": "select /*+ stream_agg() */ count(*) c from t group by a order by c limit 1", + "Plan": [ + "TopN_10 1.00 root Column#3:asc, offset:0, count:1", + "└─StreamAgg_17 8000.00 root group by:test.t.a, funcs:count(1)->Column#3", + " └─Sort_22 10000.00 root test.t.a:asc", + " └─TableReader_21 10000.00 root data:TableFullScan_20", + " └─TableFullScan_20 10000.00 cop[tikv] table:t keep order:false, stats:pseudo" + ], + "Res": [ + "1" + ] + }, + { + "SQL": "select /*+ stream_agg() */ count(*) c from t group by a order by c", + "Plan": [ + "Sort_5 8000.00 root Column#3:asc", + "└─StreamAgg_11 8000.00 root group by:test.t.a, funcs:count(1)->Column#3", + " └─Sort_16 10000.00 root test.t.a:asc", + " └─TableReader_15 10000.00 root data:TableFullScan_14", + " └─TableFullScan_14 10000.00 cop[tikv] table:t keep order:false, stats:pseudo" + ], + "Res": [ + "1", + "2" + ] + }, + { + "SQL": "select /*+ stream_agg() */ count(*) c from t group by a order by a limit 1", + "Plan": [ + "Projection_8 1.00 root Column#3", + "└─Limit_14 1.00 root offset:0, count:1", + " └─StreamAgg_27 1.00 root group by:test.t.a, funcs:count(1)->Column#3, funcs:firstrow(test.t.a)->test.t.a", + " └─Sort_32 1.25 root test.t.a:asc", + " └─TableReader_31 1.25 root data:TableFullScan_30", + " └─TableFullScan_30 1.25 cop[tikv] table:t keep order:false, stats:pseudo" + ], + "Res": [ + "2" + ] + }, + { + "SQL": "select /*+ stream_agg() */ count(*) c from t group by a order by a", + "Plan": [ + "Projection_6 8000.00 root Column#3", + "└─StreamAgg_21 8000.00 root group by:test.t.a, funcs:count(1)->Column#3, funcs:firstrow(test.t.a)->test.t.a", + " └─Sort_17 10000.00 root test.t.a:asc", + " └─TableReader_16 10000.00 root data:TableFullScan_15", + " └─TableFullScan_15 10000.00 cop[tikv] table:t keep order:false, stats:pseudo" + ], + "Res": [ + "2", + "1" + ] + } + ] } ]