Skip to content

Commit

Permalink
planner: check required order property for enforced stream aggregation (
Browse files Browse the repository at this point in the history
  • Loading branch information
eurekaka committed May 22, 2020
1 parent d514a17 commit 2aa891b
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 1 deletion.
4 changes: 3 additions & 1 deletion planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1599,7 +1599,9 @@ func (la *LogicalAggregation) getEnforcedStreamAggs(prop *property.PhysicalPrope
Enforced: true,
Items: property.ItemsFromCols(la.groupByCols, desc),
}

if !prop.IsPrefix(childProp) {
return enforcedAggs
}
taskTypes := []property.TaskType{property.CopSingleReadTaskType, property.CopDoubleReadTaskType}
if la.HasDistinct() {
// TODO: remove AllowDistinctAggPushDown after the cost estimation of distinct pushdown is implemented.
Expand Down
26 changes: 26 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,3 +952,29 @@ func (s *testIntegrationSerialSuite) TestIssue16837(c *C) {
tk.MustExec("insert into t values (2, 1, 1, 1, 2)")
tk.MustQuery("select /*+ use_index_merge(t,c,idx_ab) */ * from t where a = 1 or (e = 1 and c = 1)").Check(testkit.Rows())
}

func (s *testIntegrationSuite) TestStreamAggProp(c *C) {
tk := testkit.NewTestKit(c, s.store)

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")
tk.MustExec("insert into t values(1),(1),(2)")

var input []string
var output []struct {
SQL string
Plan []string
Res []string
}
s.testData.GetTestCases(c, &input, &output)
for i, tt := range input {
s.testData.OnRecord(func() {
output[i].SQL = tt
output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery("explain " + tt).Rows())
output[i].Res = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows())
})
tk.MustQuery("explain " + tt).Check(testkit.Rows(output[i].Plan...))
tk.MustQuery(tt).Check(testkit.Rows(output[i].Res...))
}
}
9 changes: 9 additions & 0 deletions planner/core/testdata/integration_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -127,5 +127,14 @@
"explain select * from t where a in (14, floor(3.47))",
"explain select * from t where b in (3, 4)"
]
},
{
"name": "TestStreamAggProp",
"cases": [
"select /*+ stream_agg() */ count(*) c from t group by a order by c limit 1",
"select /*+ stream_agg() */ count(*) c from t group by a order by c",
"select /*+ stream_agg() */ count(*) c from t group by a order by a limit 1",
"select /*+ stream_agg() */ count(*) c from t group by a order by a"
]
}
]
60 changes: 60 additions & 0 deletions planner/core/testdata/integration_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -582,5 +582,65 @@
]
}
]
},
{
"Name": "TestStreamAggProp",
"Cases": [
{
"SQL": "select /*+ stream_agg() */ count(*) c from t group by a order by c limit 1",
"Plan": [
"TopN_10 1.00 root Column#3:asc, offset:0, count:1",
"└─StreamAgg_17 8000.00 root group by:test.t.a, funcs:count(1)->Column#3",
" └─Sort_22 10000.00 root test.t.a:asc",
" └─TableReader_21 10000.00 root data:TableFullScan_20",
" └─TableFullScan_20 10000.00 cop[tikv] table:t keep order:false, stats:pseudo"
],
"Res": [
"1"
]
},
{
"SQL": "select /*+ stream_agg() */ count(*) c from t group by a order by c",
"Plan": [
"Sort_5 8000.00 root Column#3:asc",
"└─StreamAgg_11 8000.00 root group by:test.t.a, funcs:count(1)->Column#3",
" └─Sort_16 10000.00 root test.t.a:asc",
" └─TableReader_15 10000.00 root data:TableFullScan_14",
" └─TableFullScan_14 10000.00 cop[tikv] table:t keep order:false, stats:pseudo"
],
"Res": [
"1",
"2"
]
},
{
"SQL": "select /*+ stream_agg() */ count(*) c from t group by a order by a limit 1",
"Plan": [
"Projection_8 1.00 root Column#3",
"└─Limit_14 1.00 root offset:0, count:1",
" └─StreamAgg_27 1.00 root group by:test.t.a, funcs:count(1)->Column#3, funcs:firstrow(test.t.a)->test.t.a",
" └─Sort_32 1.25 root test.t.a:asc",
" └─TableReader_31 1.25 root data:TableFullScan_30",
" └─TableFullScan_30 1.25 cop[tikv] table:t keep order:false, stats:pseudo"
],
"Res": [
"2"
]
},
{
"SQL": "select /*+ stream_agg() */ count(*) c from t group by a order by a",
"Plan": [
"Projection_6 8000.00 root Column#3",
"└─StreamAgg_21 8000.00 root group by:test.t.a, funcs:count(1)->Column#3, funcs:firstrow(test.t.a)->test.t.a",
" └─Sort_17 10000.00 root test.t.a:asc",
" └─TableReader_16 10000.00 root data:TableFullScan_15",
" └─TableFullScan_15 10000.00 cop[tikv] table:t keep order:false, stats:pseudo"
],
"Res": [
"2",
"1"
]
}
]
}
]

0 comments on commit 2aa891b

Please sign in to comment.