Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner/cascades: support outer join implementation and push down selection for outer join #13996

Merged
merged 20 commits into from
Dec 23, 2019
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions planner/cascades/implementation_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,12 +406,11 @@ type ImplHashJoinBuildLeft struct {

// Match implements ImplementationRule Match interface.
func (r *ImplHashJoinBuildLeft) Match(expr *memo.GroupExpr, prop *property.PhysicalProperty) (matched bool) {
join := expr.ExprNode.(*plannercore.LogicalJoin)
switch join.JoinType {
case plannercore.SemiJoin, plannercore.AntiSemiJoin, plannercore.LeftOuterSemiJoin, plannercore.AntiLeftOuterSemiJoin:
return false
default:
switch expr.ExprNode.(*plannercore.LogicalJoin).JoinType {
case plannercore.InnerJoin, plannercore.LeftOuterJoin, plannercore.RightOuterJoin:
return prop.IsEmpty()
default:
return false
}
}

Expand All @@ -421,8 +420,11 @@ func (r *ImplHashJoinBuildLeft) OnImplement(expr *memo.GroupExpr, reqProp *prope
switch join.JoinType {
case plannercore.InnerJoin:
return getImplForHashJoin(expr, reqProp, 0, false), nil
case plannercore.LeftOuterJoin:
return getImplForHashJoin(expr, reqProp, 1, true), nil
case plannercore.RightOuterJoin:
return getImplForHashJoin(expr, reqProp, 0, false), nil
default:
// TODO: deal with other join type.
return nil, nil
}
}
Expand All @@ -440,12 +442,17 @@ func (r *ImplHashJoinBuildRight) Match(expr *memo.GroupExpr, prop *property.Phys
func (r *ImplHashJoinBuildRight) OnImplement(expr *memo.GroupExpr, reqProp *property.PhysicalProperty) (memo.Implementation, error) {
join := expr.ExprNode.(*plannercore.LogicalJoin)
switch join.JoinType {
case plannercore.SemiJoin, plannercore.AntiSemiJoin,
plannercore.LeftOuterSemiJoin, plannercore.AntiLeftOuterSemiJoin:
return getImplForHashJoin(expr, reqProp, 1, false), nil
case plannercore.InnerJoin:
return getImplForHashJoin(expr, reqProp, 1, false), nil
default:
// TODO: deal with other join type.
return nil, nil
case plannercore.LeftOuterJoin:
return getImplForHashJoin(expr, reqProp, 1, false), nil
case plannercore.RightOuterJoin:
return getImplForHashJoin(expr, reqProp, 0, true), nil
}
return nil, nil
}

// ImplUnionAll implements LogicalUnionAll to PhysicalUnionAll.
Expand Down
2 changes: 1 addition & 1 deletion planner/cascades/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (s *testIntegrationSuite) TestJoin(c *C) {
tk.MustExec("create table t1(a int primary key, b int)")
tk.MustExec("create table t2(a int primary key, b int)")
tk.MustExec("insert into t1 values (1, 11), (4, 44), (2, 22), (3, 33)")
tk.MustExec("insert into t2 values (1, 111), (2, 222), (3, 333)")
tk.MustExec("insert into t2 values (1, 111), (2, 222), (3, 333), (5, 555)")
tk.MustExec("set session tidb_enable_cascades_planner = 1")
var input []string
var output []struct {
Expand Down
4 changes: 3 additions & 1 deletion planner/cascades/testdata/integration_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@
"name": "TestJoin",
"cases": [
"select t1.a, t1.b from t1, t2 where t1.a = t2.a and t1.a > 2",
"select t1.a, t1.b from t1, t2 where t1.a > t2.a and t2.b > 200"
"select t1.a, t1.b from t1, t2 where t1.a > t2.a and t2.b > 200",
"select t1.a, t1.b from t1 left join t2 on t1.a = t2.a where t1.a > 2 and t2.b > 200",
"select t2.a, t2.b from t1 right join t2 on t1.a = t2.a where t1.a > 2 and t2.b > 200"
]
},
{
Expand Down
31 changes: 31 additions & 0 deletions planner/cascades/testdata/integration_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,37 @@
"4 44",
"4 44"
]
},
{
"SQL": "select t1.a, t1.b from t1 left join t2 on t1.a = t2.a where t1.a > 2 and t2.b > 200",
"Plan": [
"Projection_17 3333.33 root test.t1.a, test.t1.b",
"└─Selection_18 3333.33 root gt(test.t2.b, 200)",
" └─HashLeftJoin_20 4166.67 root left outer join, inner:TableReader_23, equal:[eq(test.t1.a, test.t2.a)]",
" ├─TableReader_21 3333.33 root data:TableScan_22",
" │ └─TableScan_22 3333.33 cop[tikv] table:t1, range:(2,+inf], keep order:false, stats:pseudo",
" └─TableReader_23 3333.33 root data:TableScan_24",
" └─TableScan_24 3333.33 cop[tikv] table:t2, range:(2,+inf], keep order:false, stats:pseudo"
],
"Result": [
"3 33"
]
},
{
"SQL": "select t2.a, t2.b from t1 right join t2 on t1.a = t2.a where t1.a > 2 and t2.b > 200",
"Plan": [
"Projection_13 8000.00 root test.t2.a, test.t2.b",
"└─Selection_14 8000.00 root gt(test.t1.a, 2)",
" └─HashRightJoin_16 10000.00 root right outer join, inner:TableReader_19 (REVERSED), equal:[eq(test.t1.a, test.t2.a)]",
" ├─TableReader_17 10000.00 root data:TableScan_18",
" │ └─TableScan_18 10000.00 cop[tikv] table:t1, range:[-inf,+inf], keep order:false, stats:pseudo",
" └─TableReader_19 8000.00 root data:Selection_20",
" └─Selection_20 8000.00 cop[tikv] gt(test.t2.b, 200)",
" └─TableScan_21 10000.00 cop[tikv] table:t2, range:[-inf,+inf], keep order:false, stats:pseudo"
],
"Result": [
"3 333"
]
}
]
},
Expand Down
72 changes: 67 additions & 5 deletions planner/cascades/transformation_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,16 +553,19 @@ func buildChildSelectionGroup(
// This rule tries to pushes the Selection through Join. Besides, this rule fulfills the `XXXConditions` field of Join.
func (r *PushSelDownJoin) OnTransform(old *memo.ExprIter) (newExprs []*memo.GroupExpr, eraseOld bool, eraseAll bool, err error) {
sel := old.GetExpr().ExprNode.(*plannercore.LogicalSelection)
if sel.HasBeenPushed {
return nil, false, false, nil
}
joinExpr := old.Children[0].GetExpr()
// TODO: we need to create a new LogicalJoin here.
join := joinExpr.ExprNode.(*plannercore.LogicalJoin)
sctx := sel.SCtx()
leftGroup := old.Children[0].GetExpr().Children[0]
rightGroup := old.Children[0].GetExpr().Children[1]
var equalCond []*expression.ScalarFunction
var leftPushCond, rightPushCond, otherCond, leftCond, rightCond []expression.Expression
var leftPushCond, rightPushCond, otherCond, leftCond, rightCond, remainCond []expression.Expression
switch join.JoinType {
case plannercore.InnerJoin:
case plannercore.SemiJoin, plannercore.InnerJoin:
tempCond := make([]expression.Expression, 0,
len(join.LeftConditions)+len(join.RightConditions)+len(join.EqualConditions)+len(join.OtherConditions)+len(sel.Conditions))
tempCond = append(tempCond, join.LeftConditions...)
Expand All @@ -584,8 +587,59 @@ func (r *PushSelDownJoin) OnTransform(old *memo.ExprIter) (newExprs []*memo.Grou
join.OtherConditions = otherCond
leftCond = leftPushCond
rightCond = rightPushCond
case plannercore.LeftOuterJoin, plannercore.LeftOuterSemiJoin, plannercore.AntiLeftOuterSemiJoin,
plannercore.RightOuterJoin:
lenJoinConds := len(join.EqualConditions) + len(join.LeftConditions) + len(join.RightConditions) + len(join.OtherConditions)
joinConds := make([]expression.Expression, 0, lenJoinConds)
for _, equalCond := range join.EqualConditions {
joinConds = append(joinConds, equalCond)
}
joinConds = append(joinConds, join.LeftConditions...)
joinConds = append(joinConds, join.RightConditions...)
joinConds = append(joinConds, join.OtherConditions...)
join.EqualConditions = nil
join.LeftConditions = nil
join.RightConditions = nil
join.OtherConditions = nil
remainCond = make([]expression.Expression, len(sel.Conditions))
copy(remainCond, sel.Conditions)
nullSensitive := join.JoinType == plannercore.AntiLeftOuterSemiJoin || join.JoinType == plannercore.LeftOuterSemiJoin
if join.JoinType == plannercore.RightOuterJoin {
joinConds, remainCond = expression.PropConstOverOuterJoin(join.SCtx(), joinConds, remainCond, rightGroup.Prop.Schema, leftGroup.Prop.Schema, nullSensitive)
} else {
joinConds, remainCond = expression.PropConstOverOuterJoin(join.SCtx(), joinConds, remainCond, leftGroup.Prop.Schema, rightGroup.Prop.Schema, nullSensitive)
}
join.AttachOnConds(joinConds)
// Return table dual when filter is constant false or null.
dual := plannercore.Conds2TableDual(join, remainCond)
if dual != nil {
return []*memo.GroupExpr{memo.NewGroupExpr(dual)}, false, true, nil
}
if join.JoinType == plannercore.RightOuterJoin {
remainCond = expression.ExtractFiltersFromDNFs(join.SCtx(), remainCond)
// Only derive right where condition, because left where condition cannot be pushed down
equalCond, leftPushCond, rightPushCond, otherCond = join.ExtractOnCondition(remainCond, leftGroup.Prop.Schema, rightGroup.Prop.Schema, false, true)
rightCond = rightPushCond
// Handle join conditions, only derive left join condition, because right join condition cannot be pushed down
derivedLeftJoinCond, _ := plannercore.DeriveOtherConditions(join, true, false)
leftCond = append(join.LeftConditions, derivedLeftJoinCond...)
join.LeftConditions = nil
remainCond = append(expression.ScalarFuncs2Exprs(equalCond), otherCond...)
remainCond = append(remainCond, leftPushCond...)
} else {
remainCond = expression.ExtractFiltersFromDNFs(join.SCtx(), remainCond)
// Only derive left where condition, because right where condition cannot be pushed down
equalCond, leftPushCond, rightPushCond, otherCond = join.ExtractOnCondition(remainCond, leftGroup.Prop.Schema, rightGroup.Prop.Schema, true, false)
leftCond = leftPushCond
// Handle join conditions, only derive left join condition, because right join condition cannot be pushed down
_, derivedRightJoinCond := plannercore.DeriveOtherConditions(join, false, true)
rightCond = append(join.RightConditions, derivedRightJoinCond...)
join.RightConditions = nil
remainCond = append(expression.ScalarFuncs2Exprs(equalCond), otherCond...)
remainCond = append(remainCond, rightPushCond...)
}
default:
// TODO: Enhance this rule to deal with LeftOuter/RightOuter/Semi/SmiAnti/LeftOuterSemi/LeftOuterSemiAnti Joins.
// TODO: Enhance this rule to deal with Semi/SmiAnti Joins.
}
leftCond = expression.RemoveDupExprs(sctx, leftCond)
rightCond = expression.RemoveDupExprs(sctx, rightCond)
Expand All @@ -594,10 +648,18 @@ func (r *PushSelDownJoin) OnTransform(old *memo.ExprIter) (newExprs []*memo.Grou
join.RightJoinKeys = append(join.RightJoinKeys, eqCond.GetArgs()[1].(*expression.Column))
}
// TODO: Update EqualConditions like what we have done in the method join.updateEQCond() before.
leftGroup = buildChildSelectionGroup(sel, leftCond, joinExpr.Children[0])
rightGroup = buildChildSelectionGroup(sel, rightCond, joinExpr.Children[1])
leftGroup = buildChildSelectionGroup(sel, leftCond, leftGroup)
rightGroup = buildChildSelectionGroup(sel, rightCond, rightGroup)
newJoinExpr := memo.NewGroupExpr(join)
newJoinExpr.SetChildren(leftGroup, rightGroup)
if len(remainCond) > 0 {
francis0407 marked this conversation as resolved.
Show resolved Hide resolved
newSel := plannercore.LogicalSelection{Conditions: remainCond}.Init(sctx, sel.SelectBlockOffset())
newSel.Conditions = remainCond
newSel.HasBeenPushed = true
newSelExpr := memo.NewGroupExpr(newSel)
newSelExpr.SetChildren(memo.NewGroupWithSchema(newJoinExpr, old.Children[0].Prop.Schema))
return []*memo.GroupExpr{newSelExpr}, true, false, nil
}
return []*memo.GroupExpr{newJoinExpr}, true, false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion planner/core/expression_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,7 +814,7 @@ func (er *expressionRewriter) handleInSubquery(ctx context.Context, v *ast.Patte
join.names = make([]*types.FieldName, er.p.Schema().Len()+agg.Schema().Len())
copy(join.names, er.p.OutputNames())
copy(join.names[er.p.Schema().Len():], agg.OutputNames())
join.attachOnConds(expression.SplitCNFItems(checkCondition))
join.AttachOnConds(expression.SplitCNFItems(checkCondition))
// Set join hint for this join.
if er.b.TableHints() != nil {
join.setPreferredJoinType(er.b.TableHints())
Expand Down
4 changes: 2 additions & 2 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ func (b *PlanBuilder) buildJoin(ctx context.Context, joinNode *ast.Join) (Logica
return nil, errors.New("ON condition doesn't support subqueries yet")
}
onCondition := expression.SplitCNFItems(onExpr)
joinPlan.attachOnConds(onCondition)
joinPlan.AttachOnConds(onCondition)
} else if joinPlan.JoinType == InnerJoin {
// If a inner join without "ON" or "USING" clause, it's a cartesian
// product over the join tables.
Expand Down Expand Up @@ -2857,7 +2857,7 @@ func (b *PlanBuilder) buildSemiJoin(outerPlan, innerPlan LogicalPlan, onConditio
onCondition[i] = expr.Decorrelate(outerPlan.Schema())
}
joinPlan.SetChildren(outerPlan, innerPlan)
joinPlan.attachOnConds(onCondition)
joinPlan.AttachOnConds(onCondition)
joinPlan.names = make([]*types.FieldName, outerPlan.Schema().Len(), outerPlan.Schema().Len()+innerPlan.Schema().Len()+1)
copy(joinPlan.names, outerPlan.OutputNames())
if asScalar {
Expand Down
8 changes: 7 additions & 1 deletion planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,9 @@ func (p *LogicalJoin) columnSubstitute(schema *expression.Schema, exprs []expres
}
}

func (p *LogicalJoin) attachOnConds(onConds []expression.Expression) {
// AttachOnConds extracts on conditions for join and set the `EqualConditions`, `LeftConditions`, `RightConditions` and
// `OtherConditions` by the result of extract.
func (p *LogicalJoin) AttachOnConds(onConds []expression.Expression) {
eq, left, right, other := p.extractOnCondition(onConds, false, false)
p.EqualConditions = append(eq, p.EqualConditions...)
p.LeftConditions = append(left, p.LeftConditions...)
Expand Down Expand Up @@ -320,6 +322,10 @@ type LogicalSelection struct {
// but after we converted to CNF(Conjunctive normal form), it can be
// split into a list of AND conditions.
Conditions []expression.Expression
// HasBeenPushed means whether the selection is applied rule `PushSelDown`,
// it's used in cascades planner.
// TODO: we need a general method to check the circle transfer rule application.
HasBeenPushed bool
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a general solution to solve the recursive rule applying. There's also other cases that would cause endless loop.
You can add a TODO here first.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be done by AppliedRuleSet. When we apply the rule PushSelDownJoin, we should add this rule to AppliedRuleSet of the new created Selection, so that the new Selection will not match this rule again.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is the AppliedRuleSet?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still working on it >.<

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK...Don't forget to remove this when you finish it. 0.0

}

func (p *LogicalSelection) extractCorrelatedCols() []*expression.CorrelatedColumn {
Expand Down
2 changes: 1 addition & 1 deletion planner/core/rule_decorrelate.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (s *decorrelateSolver) optimize(ctx context.Context, p LogicalPlan) (Logica
for _, cond := range sel.Conditions {
newConds = append(newConds, cond.Decorrelate(outerPlan.Schema()))
}
apply.attachOnConds(newConds)
apply.AttachOnConds(newConds)
innerPlan = sel.children[0]
apply.SetChildren(outerPlan, innerPlan)
return s.optimize(ctx, p)
Expand Down
12 changes: 6 additions & 6 deletions planner/core/rule_predicate_push_down.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (p *LogicalJoin) PredicatePushDown(predicates []expression.Expression) (ret
equalCond, leftPushCond, rightPushCond, otherCond = p.extractOnCondition(predicates, true, false)
leftCond = leftPushCond
// Handle join conditions, only derive right join condition, because left join condition cannot be pushed down
_, derivedRightJoinCond := deriveOtherConditions(p, false, true)
_, derivedRightJoinCond := DeriveOtherConditions(p, false, true)
rightCond = append(p.RightConditions, derivedRightJoinCond...)
p.RightConditions = nil
ret = append(expression.ScalarFuncs2Exprs(equalCond), otherCond...)
Expand All @@ -147,7 +147,7 @@ func (p *LogicalJoin) PredicatePushDown(predicates []expression.Expression) (ret
equalCond, leftPushCond, rightPushCond, otherCond = p.extractOnCondition(predicates, false, true)
rightCond = rightPushCond
// Handle join conditions, only derive left join condition, because right join condition cannot be pushed down
derivedLeftJoinCond, _ := deriveOtherConditions(p, true, false)
derivedLeftJoinCond, _ := DeriveOtherConditions(p, true, false)
leftCond = append(p.LeftConditions, derivedLeftJoinCond...)
p.LeftConditions = nil
ret = append(expression.ScalarFuncs2Exprs(equalCond), otherCond...)
Expand Down Expand Up @@ -443,9 +443,9 @@ func (p *LogicalMaxOneRow) PredicatePushDown(predicates []expression.Expression)
return predicates, p
}

// deriveOtherConditions given a LogicalJoin, check the OtherConditions to see if we can derive more
// DeriveOtherConditions given a LogicalJoin, check the OtherConditions to see if we can derive more
// conditions for left/right child pushdown.
func deriveOtherConditions(p *LogicalJoin, deriveLeft bool, deriveRight bool) (leftCond []expression.Expression,
func DeriveOtherConditions(p *LogicalJoin, deriveLeft bool, deriveRight bool) (leftCond []expression.Expression,
rightCond []expression.Expression) {
leftPlan, rightPlan := p.children[0], p.children[1]
isOuterSemi := (p.JoinType == LeftOuterSemiJoin) || (p.JoinType == AntiLeftOuterSemiJoin)
Expand Down Expand Up @@ -545,9 +545,9 @@ func (p *LogicalJoin) outerJoinPropConst(predicates []expression.Expression) []e
p.LeftConditions = nil
p.RightConditions = nil
p.OtherConditions = nil
nullSensitive := (p.JoinType == AntiLeftOuterSemiJoin || p.JoinType == LeftOuterSemiJoin)
nullSensitive := p.JoinType == AntiLeftOuterSemiJoin || p.JoinType == LeftOuterSemiJoin
joinConds, predicates = expression.PropConstOverOuterJoin(p.ctx, joinConds, predicates, outerTable.Schema(), innerTable.Schema(), nullSensitive)
p.attachOnConds(joinConds)
p.AttachOnConds(joinConds)
return predicates
}

Expand Down