Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner/cascades: add transformation rule PushTopNDownProjection #13855

Merged
merged 9 commits into from
Dec 9, 2019
14 changes: 7 additions & 7 deletions planner/cascades/testdata/integration_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -412,13 +412,13 @@
{
"SQL": "select a = (select a from t2 where t1.b = t2.b order by a limit 1) from t1",
"Plan": [
"Projection_16 10000.00 root eq(test.t1.a, test.t2.a)->Column#5",
"└─Apply_18 10000.00 root CARTESIAN left outer join, inner:MaxOneRow_21",
" ├─TableReader_19 10000.00 root data:TableScan_20",
" │ └─TableScan_20 10000.00 cop[tikv] table:t1, range:[-inf,+inf], keep order:false, stats:pseudo",
" └─MaxOneRow_21 1.00 root ",
" └─Limit_23 1.00 root offset:0, count:1",
" └─Projection_28 1.00 root test.t2.a",
"Projection_17 10000.00 root eq(test.t1.a, test.t2.a)->Column#5",
"└─Apply_19 10000.00 root CARTESIAN left outer join, inner:MaxOneRow_22",
" ├─TableReader_20 10000.00 root data:TableScan_21",
" │ └─TableScan_21 10000.00 cop[tikv] table:t1, range:[-inf,+inf], keep order:false, stats:pseudo",
" └─MaxOneRow_22 1.00 root ",
" └─Projection_23 1.00 root test.t2.a",
" └─Limit_25 1.00 root offset:0, count:1",
" └─TableReader_29 1.00 root data:Selection_30",
" └─Selection_30 1.00 cop[tikv] eq(test.t1.b, test.t2.b)",
" └─TableScan_31 1.00 cop[tikv] table:t2, range:[-inf,+inf], keep order:true, stats:pseudo"
Expand Down
7 changes: 6 additions & 1 deletion planner/cascades/testdata/transformation_rules_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@
"name": "TestTopNRules",
"cases": [
"select b from t order by a limit 2",
"select a+b from t order by a limit 1 offset 2"
"select a+b from t order by a limit 1 offset 2",
"select c from t order by t.a limit 1",
"select c from t order by t.a + t.b limit 1",
"select a, b, c from t t1 where t1.a in (select t2.a as a from t t2 where t2.b > t1.b order by t1.b limit 1)",
"select a, b, c from t t1 where t1.a in (select a from (select t2.a as a, t1.b as b from t t2 where t2.b > t1.b) x order by b limit 1)",
"select a, b from (select @i as a, @i := @i+1 as b from t) t order by a desc limit 1"
]
},
{
Expand Down
128 changes: 122 additions & 6 deletions planner/cascades/testdata/transformation_rules_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,9 @@
"Group#0 Schema:[test.t.b]",
" Projection_5 input:[Group#1], test.t.b",
"Group#1 Schema:[test.t.b,test.t.a]",
" TopN_8 input:[Group#2], test.t.a:asc, offset:0, count:2",
"Group#2 Schema:[test.t.b,test.t.a]",
" Projection_2 input:[Group#3], test.t.b, test.t.a",
" Projection_2 input:[Group#2], test.t.b, test.t.a",
"Group#2 Schema:[test.t.a,test.t.b]",
" TopN_9 input:[Group#3], test.t.a:asc, offset:0, count:2",
"Group#3 Schema:[test.t.a,test.t.b]",
" TiKVSingleGather_7 input:[Group#4], table:t",
"Group#4 Schema:[test.t.a,test.t.b]",
Expand All @@ -219,14 +219,130 @@
"Group#0 Schema:[Column#14]",
" Projection_5 input:[Group#1], Column#13",
"Group#1 Schema:[Column#13,test.t.a]",
" TopN_8 input:[Group#2], test.t.a:asc, offset:2, count:1",
"Group#2 Schema:[Column#13,test.t.a]",
" Projection_2 input:[Group#3], plus(test.t.a, test.t.b)->Column#13, test.t.a",
" Projection_2 input:[Group#2], plus(test.t.a, test.t.b)->Column#13, test.t.a",
"Group#2 Schema:[test.t.a,test.t.b]",
" TopN_9 input:[Group#3], test.t.a:asc, offset:2, count:1",
"Group#3 Schema:[test.t.a,test.t.b]",
" TiKVSingleGather_7 input:[Group#4], table:t",
"Group#4 Schema:[test.t.a,test.t.b]",
" TableScan_6 table:t, pk col:test.t.a"
]
},
{
"SQL": "select c from t order by t.a limit 1",
"Result": [
"Group#0 Schema:[test.t.c]",
" Projection_5 input:[Group#1], test.t.c",
"Group#1 Schema:[test.t.c,test.t.a]",
" Projection_2 input:[Group#2], test.t.c, test.t.a",
"Group#2 Schema:[test.t.a,test.t.c]",
" TopN_11 input:[Group#3], test.t.a:asc, offset:0, count:1",
"Group#3 Schema:[test.t.a,test.t.c]",
" TiKVSingleGather_7 input:[Group#4], table:t",
" TiKVSingleGather_9 input:[Group#5], table:t, index:c_d_e",
"Group#4 Schema:[test.t.a,test.t.c]",
" TableScan_6 table:t, pk col:test.t.a",
"Group#5 Schema:[test.t.a,test.t.c]",
" IndexScan_8 table:t, index:c, d, e"
]
},
{
"SQL": "select c from t order by t.a + t.b limit 1",
"Result": [
"Group#0 Schema:[test.t.c]",
" Projection_5 input:[Group#1], test.t.c",
"Group#1 Schema:[test.t.c,test.t.a,test.t.b]",
" Projection_2 input:[Group#2], test.t.c, test.t.a, test.t.b",
"Group#2 Schema:[test.t.a,test.t.b,test.t.c]",
" TopN_9 input:[Group#3], plus(test.t.a, test.t.b):asc, offset:0, count:1",
"Group#3 Schema:[test.t.a,test.t.b,test.t.c]",
" TiKVSingleGather_7 input:[Group#4], table:t",
"Group#4 Schema:[test.t.a,test.t.b,test.t.c]",
" TableScan_6 table:t, pk col:test.t.a"
]
},
{
"SQL": "select a, b, c from t t1 where t1.a in (select t2.a as a from t t2 where t2.b > t1.b order by t1.b limit 1)",
"Result": [
"Group#0 Schema:[test.t.a,test.t.b,test.t.c]",
" Projection_9 input:[Group#1], test.t.a, test.t.b, test.t.c",
"Group#1 Schema:[test.t.a,test.t.b,test.t.c]",
" Apply_8 input:[Group#2,Group#3], semi join, equal:[eq(test.t.a, test.t.a)]",
"Group#2 Schema:[test.t.a,test.t.b,test.t.c]",
" TiKVSingleGather_11 input:[Group#4], table:t1",
"Group#4 Schema:[test.t.a,test.t.b,test.t.c]",
" TableScan_10 table:t1, pk col:test.t.a",
"Group#3 Schema:[test.t.a]",
" Projection_5 input:[Group#5], test.t.a",
"Group#5 Schema:[test.t.a,test.t.b]",
" TopN_15 input:[Group#6], , offset:0, count:1",
hsqlu marked this conversation as resolved.
Show resolved Hide resolved
"Group#6 Schema:[test.t.a,test.t.b]",
" Selection_4 input:[Group#7], gt(test.t.b, test.t.b)",
"Group#7 Schema:[test.t.a,test.t.b]",
" TiKVSingleGather_13 input:[Group#8], table:t2",
"Group#8 Schema:[test.t.a,test.t.b]",
" TableScan_12 table:t2, pk col:test.t.a"
]
},
{
"SQL": "select a, b, c from t t1 where t1.a in (select a from (select t2.a as a, t1.b as b from t t2 where t2.b > t1.b) x order by b limit 1)",
"Result": [
"Group#0 Schema:[test.t.a,test.t.b,test.t.c]",
" Projection_11 input:[Group#1], test.t.a, test.t.b, test.t.c",
"Group#1 Schema:[test.t.a,test.t.b,test.t.c]",
" Apply_10 input:[Group#2,Group#3], semi join, equal:[eq(test.t.a, test.t.a)]",
"Group#2 Schema:[test.t.a,test.t.b,test.t.c]",
" TiKVSingleGather_13 input:[Group#4], table:t1",
"Group#4 Schema:[test.t.a,test.t.b,test.t.c]",
" TableScan_12 table:t1, pk col:test.t.a",
"Group#3 Schema:[test.t.a]",
" Projection_9 input:[Group#5], test.t.a",
"Group#5 Schema:[test.t.a,Column#25]",
" Projection_6 input:[Group#6], test.t.a, Column#25",
"Group#6 Schema:[test.t.a,Column#25]",
" Projection_5 input:[Group#7], test.t.a, test.t.b",
"Group#7 Schema:[test.t.a,test.t.b]",
" TopN_18 input:[Group#8], test.t.b:asc, offset:0, count:1",
"Group#8 Schema:[test.t.a,test.t.b]",
" Selection_4 input:[Group#9], gt(test.t.b, test.t.b)",
"Group#9 Schema:[test.t.a,test.t.b]",
" TiKVSingleGather_15 input:[Group#10], table:t2",
"Group#10 Schema:[test.t.a,test.t.b]",
" TableScan_14 table:t2, pk col:test.t.a"
]
},
{
"SQL": "select a, b from (select @i as a, @i := @i+1 as b from t) t order by a desc limit 1",
"Result": [
"Group#0 Schema:[Column#13,Column#14]",
" Projection_3 input:[Group#1], Column#13, Column#14",
"Group#1 Schema:[Column#13,Column#14]",
" TopN_21 input:[Group#2], Column#13:desc, offset:0, count:1",
"Group#2 Schema:[Column#13,Column#14]",
" Projection_2 input:[Group#3], getvar(i)->Column#13, setvar(i, cast(plus(cast(getvar(i)), 1)))->Column#14",
"Group#3 Schema:[test.t.a]",
" TiKVSingleGather_7 input:[Group#4], table:t",
" TiKVSingleGather_19 input:[Group#5], table:t, index:e_d_c_str_prefix",
" TiKVSingleGather_17 input:[Group#6], table:t, index:c_d_e_str",
" TiKVSingleGather_15 input:[Group#7], table:t, index:f_g",
" TiKVSingleGather_13 input:[Group#8], table:t, index:g",
" TiKVSingleGather_11 input:[Group#9], table:t, index:f",
" TiKVSingleGather_9 input:[Group#10], table:t, index:c_d_e",
"Group#4 Schema:[test.t.a]",
" TableScan_6 table:t, pk col:test.t.a",
"Group#5 Schema:[test.t.a]",
" IndexScan_18 table:t, index:e_str, d_str, c_str",
"Group#6 Schema:[test.t.a]",
" IndexScan_16 table:t, index:c_str, d_str, e_str",
"Group#7 Schema:[test.t.a]",
" IndexScan_14 table:t, index:f, g",
"Group#8 Schema:[test.t.a]",
" IndexScan_12 table:t, index:g",
"Group#9 Schema:[test.t.a]",
" IndexScan_10 table:t, index:f",
"Group#10 Schema:[test.t.a]",
" IndexScan_8 table:t, index:c, d, e"
]
}
]
},
Expand Down
66 changes: 66 additions & 0 deletions planner/cascades/transformation_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ var defaultTransformationMap = map[memo.Operand][]Transformation{
NewRuleEliminateProjection(),
NewRuleMergeAdjacentProjection(),
},
memo.OperandTopN: {
NewRulePushTopNDownProjection(),
},
}

type baseRule struct {
Expand Down Expand Up @@ -683,3 +686,66 @@ func (r *MergeAdjacentProjection) OnTransform(old *memo.ExprIter) (newExprs []*m
newProjExpr.SetChildren(old.Children[0].GetExpr().Children[0])
return []*memo.GroupExpr{newProjExpr}, true, false, nil
}

// PushTopNDownProjection pushes TopN to Projection.
type PushTopNDownProjection struct {
baseRule
}

// NewRulePushTopNDownProjection creates a new Transformation PushTopNDownProjection.
// The pattern of this rule is `TopN->Projection->X` to `Projection->TopN->X`.
func NewRulePushTopNDownProjection() Transformation {
rule := &PushTopNDownProjection{}
rule.pattern = memo.BuildPattern(
memo.OperandTopN,
memo.EngineTiDBOnly,
memo.NewPattern(memo.OperandProjection, memo.EngineTiDBOnly),
)
return rule
}

// Match implements Transformation interface.
func (r *PushTopNDownProjection) Match(expr *memo.ExprIter) bool {
proj := expr.Children[0].GetExpr().ExprNode.(*plannercore.LogicalProjection)
for _, expr := range proj.Exprs {
if expression.HasAssignSetVarFunc(expr) {
hsqlu marked this conversation as resolved.
Show resolved Hide resolved
return false
}
}
return true
}

// OnTransform implements Transformation interface.
// This rule tries to pushes the TopN through Projection.
func (r *PushTopNDownProjection) OnTransform(old *memo.ExprIter) (newExprs []*memo.GroupExpr, eraseOld bool, eraseAll bool, err error) {
topN := old.GetExpr().ExprNode.(*plannercore.LogicalTopN)
proj := old.Children[0].GetExpr().ExprNode.(*plannercore.LogicalProjection)
childGroup := old.Children[0].GetExpr().Children[0]

newTopN := plannercore.LogicalTopN{
Offset: topN.Offset,
Count: topN.Count,
}.Init(topN.SCtx(), topN.SelectBlockOffset())

newTopN.ByItems = make([]*plannercore.ByItems, 0, len(topN.ByItems))
for _, by := range topN.ByItems {
newTopN.ByItems = append(newTopN.ByItems, &plannercore.ByItems{
Expr: expression.ColumnSubstitute(by.Expr, old.Children[0].Group.Prop.Schema, proj.Exprs),
hsqlu marked this conversation as resolved.
Show resolved Hide resolved
Desc: by.Desc,
})
}

// remove meaningless constant sort items.
for i := len(newTopN.ByItems) - 1; i >= 0; i-- {
switch newTopN.ByItems[i].Expr.(type) {
case *expression.Constant, *expression.CorrelatedColumn:
hsqlu marked this conversation as resolved.
Show resolved Hide resolved
topN.ByItems = append(newTopN.ByItems[:i], newTopN.ByItems[i+1:]...)
}
}
projExpr := memo.NewGroupExpr(proj)
topNExpr := memo.NewGroupExpr(newTopN)
topNExpr.SetChildren(childGroup)
topNGroup := memo.NewGroupWithSchema(topNExpr, childGroup.Prop.Schema)
projExpr.SetChildren(topNGroup)
return []*memo.GroupExpr{projExpr}, true, false, nil
}
3 changes: 3 additions & 0 deletions planner/cascades/transformation_rules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ func (s *testTransformationRuleSuite) TestTopNRules(c *C) {
memo.OperandDataSource: {
NewRuleEnumeratePaths(),
},
memo.OperandTopN: {
NewRulePushTopNDownProjection(),
},
})
var input []string
var output []struct {
Expand Down
5 changes: 5 additions & 0 deletions planner/core/rule_topn_push_down.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ func (p *LogicalUnionAll) pushDownTopN(topN *LogicalTopN) LogicalPlan {
}

func (p *LogicalProjection) pushDownTopN(topN *LogicalTopN) LogicalPlan {
for _, expr := range p.Exprs {
if expression.HasAssignSetVarFunc(expr) {
return p.baseLogicalPlan.pushDownTopN(topN)
}
}
if topN != nil {
for _, by := range topN.ByItems {
by.Expr = expression.ColumnSubstitute(by.Expr, p.schema, p.Exprs)
Expand Down
2 changes: 2 additions & 0 deletions planner/core/testdata/plan_suite_unexported_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@
"select * from t, t s order by t.a limit 5",
// Test Limit + Join + Proj.
"select * from t, t s limit 5",
// Test Limit + Proj
"select a, b from (select @i as a, @i := @i+1 as b from t) t order by a desc limit 1",
// Test TopN + Left Join + Proj.
"select * from t left outer join t s on t.a = s.a order by t.a limit 5",
// Test TopN + Left Join + Proj.
Expand Down
1 change: 1 addition & 0 deletions planner/core/testdata/plan_suite_unexported_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@
"DataScan(t)->Aggr(count(test.t.b),firstrow(test.t.a),firstrow(test.t.c))->TopN([test.t.c],0,5)->Projection",
"Join{DataScan(t)->DataScan(s)}->TopN([test.t.a],0,5)->Projection",
"Join{DataScan(t)->DataScan(s)}->Limit->Projection",
"DataScan(t)->Projection->TopN([Column#13 true],0,1)->Projection",
"Join{DataScan(t)->TopN([test.t.a],0,5)->DataScan(s)}(test.t.a,test.t.a)->TopN([test.t.a],0,5)->Projection",
"Join{DataScan(t)->TopN([test.t.a],0,10)->DataScan(s)}(test.t.a,test.t.a)->TopN([test.t.a],5,5)->Projection",
"Join{DataScan(t)->Limit->DataScan(s)}(test.t.a,test.t.a)->Limit->Projection",
Expand Down