Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vtgate planner: HAVING in the new operator horizon planner #13289

Merged
merged 9 commits into from
Jun 14, 2023
44 changes: 38 additions & 6 deletions go/vt/vtgate/planbuilder/operators/aggregation_pushing.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,31 @@ func tryPushingDownAggregator(ctx *plancontext.PlanningContext, aggregator *Aggr
if aggregator.Pushed {
return aggregator, rewrite.SameTree, nil
}
aggregator.Pushed = true
switch src := aggregator.Source.(type) {
case *Route:
// if we have a single sharded route, we can push it down
output, applyResult, err = pushDownAggregationThroughRoute(ctx, aggregator, src)
case *ApplyJoin:
output, applyResult, err = pushDownAggregationThroughJoin(ctx, aggregator, src)
if ctx.DelegateAggregation {
output, applyResult, err = pushDownAggregationThroughJoin(ctx, aggregator, src)
}
case *Filter:
output, applyResult, err = pushDownAggregationThroughFilter(ctx, aggregator, src)
if ctx.DelegateAggregation {
output, applyResult, err = pushDownAggregationThroughFilter(ctx, aggregator, src)
}
default:
return aggregator, rewrite.SameTree, nil
}

if err != nil {
return nil, nil, err
}

if output == nil {
return aggregator, rewrite.SameTree, nil
}

aggregator.Pushed = true
if applyResult != rewrite.SameTree && aggregator.Original {
aggregator.aggregateTheAggregates()
}
Expand Down Expand Up @@ -74,6 +87,10 @@ func pushDownAggregationThroughRoute(
return rewrite.Swap(aggregator, route, "push down aggregation under route - remove original")
}

if !ctx.DelegateAggregation {
return nil, nil, nil
}

// Create a new aggregator to be placed below the route.
aggrBelowRoute := aggregator.Clone([]ops.Operator{route.Source}).(*Aggregator)
aggrBelowRoute.Pushed = false
Expand Down Expand Up @@ -247,6 +264,10 @@ func pushDownAggregationThroughJoin(ctx *plancontext.PlanningContext, rootAggr *

joinColumns, output, err := splitAggrColumnsToLeftAndRight(ctx, rootAggr, join, lhs, rhs)
if err != nil {
// if we get this error, we just abort the splitting and fall back on simpler ways of solving the same query
if err == errAbortAggrPushing {
return nil, nil, nil
}
return nil, nil, err
}

Expand Down Expand Up @@ -276,6 +297,8 @@ func pushDownAggregationThroughJoin(ctx *plancontext.PlanningContext, rootAggr *
return rootAggr, rewrite.NewTree("push Aggregation under join", rootAggr), nil
}

var errAbortAggrPushing = fmt.Errorf("abort aggregation pushing")

func addColumnsFromLHSInJoinPredicates(ctx *plancontext.PlanningContext, rootAggr *Aggregator, join *ApplyJoin, lhs *joinPusher) error {
for _, pred := range join.JoinPredicates {
for _, expr := range pred.LHSExprs {
Expand Down Expand Up @@ -323,8 +346,17 @@ func splitGroupingToLeftAndRight(ctx *plancontext.PlanningContext, rootAggr *Agg
Original: aeWrap(groupBy.Inner),
RHSExpr: expr,
})
case deps.IsSolvedBy(lhs.tableID.Merge(rhs.tableID)):
jc, err := BreakExpressionInLHSandRHS(ctx, groupBy.SimplifiedExpr, lhs.tableID)
if err != nil {
return nil, err
}
for _, lhsExpr := range jc.LHSExprs {
lhs.addGrouping(ctx, NewGroupBy(lhsExpr, lhsExpr, aeWrap(lhsExpr)))
}
rhs.addGrouping(ctx, NewGroupBy(jc.RHSExpr, jc.RHSExpr, aeWrap(jc.RHSExpr)))
default:
return nil, vterrors.VT12001("grouping on columns from different sources")
return nil, vterrors.VT13001(fmt.Sprintf("grouping with bad dependencies %s", groupBy.SimplifiedExpr))
}
}
return groupingJCs, nil
Expand Down Expand Up @@ -464,7 +496,7 @@ func (ab *aggBuilder) handlePushThroughAggregation(ctx *plancontext.PlanningCont
case deps.IsSolvedBy(ab.rhs.tableID):
ab.pushThroughRight(aggr)
default:
return vterrors.VT12001("aggregation on columns from different sources: " + sqlparser.String(aggr.Original.Expr))
return errAbortAggrPushing
}
return nil
}
Expand Down Expand Up @@ -493,7 +525,7 @@ func (ab *aggBuilder) handleAggrWithCountStarMultiplier(ctx *plancontext.Plannin
rhsAE = aggr.Original

default:
return errHorizonNotPlanned()
return errAbortAggrPushing
}

ab.buildProjectionForAggr(lhsAE, rhsAE, aggr)
Expand Down
16 changes: 9 additions & 7 deletions go/vt/vtgate/planbuilder/operators/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ func (a *Aggregator) AddColumn(ctx *plancontext.PlanningContext, expr *sqlparser

func (a *Aggregator) GetColumns() ([]*sqlparser.AliasedExpr, error) {
// we update the incoming columns, so we know about any new columns that have been added
// in the optimization phase, other operators could be pushed down resulting in additional columns for aggregator.
// Aggregator should be made aware of these to truncate them in final result.
columns, err := a.Source.GetColumns()
if err != nil {
return nil, err
Expand Down Expand Up @@ -207,6 +209,13 @@ func (a *Aggregator) GetOrdering() ([]ops.OrderBy, error) {
}

func (a *Aggregator) planOffsets(ctx *plancontext.PlanningContext) error {
if !a.Pushed {
err := a.planOffsetsNotPushed(ctx)
systay marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
}

addColumn := func(aliasedExpr *sqlparser.AliasedExpr, addToGroupBy bool) (int, error) {
newSrc, offset, err := a.Source.AddColumn(ctx, aliasedExpr, true, addToGroupBy)
if err != nil {
Expand All @@ -220,13 +229,6 @@ func (a *Aggregator) planOffsets(ctx *plancontext.PlanningContext) error {
return offset, nil
}

if !a.Pushed {
err := a.planOffsetsNotPushed(ctx)
if err != nil {
return err
}
}

for idx, gb := range a.Grouping {
if gb.ColOffset == -1 {
offset, err := addColumn(aeWrap(gb.Inner), false)
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vtgate/planbuilder/operators/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ func (f *Filter) Clone(inputs []ops.Operator) ops.Operator {
predicatesClone := make([]sqlparser.Expr, len(f.Predicates))
systay marked this conversation as resolved.
Show resolved Hide resolved
copy(predicatesClone, f.Predicates)
return &Filter{
Source: inputs[0],
Predicates: predicatesClone,
Source: inputs[0],
Predicates: predicatesClone,
FinalPredicate: f.FinalPredicate,
}
}

Expand Down
16 changes: 12 additions & 4 deletions go/vt/vtgate/planbuilder/operators/horizon_expanding.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
/*
Copyright 2023 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -29,10 +32,6 @@ func expandHorizon(ctx *plancontext.PlanningContext, horizon horizonLike) (ops.O
return nil, nil, errHorizonNotPlanned()
}

if sel.Having != nil {
return nil, nil, errHorizonNotPlanned()
}

op, err := createProjectionFromSelect(ctx, horizon)
if err != nil {
return nil, nil, err
Expand All @@ -50,6 +49,15 @@ func expandHorizon(ctx *plancontext.PlanningContext, horizon horizonLike) (ops.O
}
}

if sel.Having != nil {
expr := sel.Having.Expr
op = &Filter{
Source: op,
Predicates: sqlparser.SplitAndExpression(nil, expr),
FinalPredicate: nil,
}
systay marked this conversation as resolved.
Show resolved Hide resolved
}

if len(qp.OrderExprs) > 0 {
op = &Ordering{
Source: op,
Expand Down
Loading