Skip to content

Commit

Permalink
vtgate planner: HAVING in the new operator horizon planner (#13289)
Browse files Browse the repository at this point in the history
Co-authored-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
systay and harshit-gangal authored Jun 14, 2023
1 parent 6bd7cae commit 081194d
Show file tree
Hide file tree
Showing 14 changed files with 872 additions and 579 deletions.
44 changes: 38 additions & 6 deletions go/vt/vtgate/planbuilder/operators/aggregation_pushing.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,31 @@ func tryPushingDownAggregator(ctx *plancontext.PlanningContext, aggregator *Aggr
if aggregator.Pushed {
return aggregator, rewrite.SameTree, nil
}
aggregator.Pushed = true
switch src := aggregator.Source.(type) {
case *Route:
// if we have a single sharded route, we can push it down
output, applyResult, err = pushDownAggregationThroughRoute(ctx, aggregator, src)
case *ApplyJoin:
output, applyResult, err = pushDownAggregationThroughJoin(ctx, aggregator, src)
if ctx.DelegateAggregation {
output, applyResult, err = pushDownAggregationThroughJoin(ctx, aggregator, src)
}
case *Filter:
output, applyResult, err = pushDownAggregationThroughFilter(ctx, aggregator, src)
if ctx.DelegateAggregation {
output, applyResult, err = pushDownAggregationThroughFilter(ctx, aggregator, src)
}
default:
return aggregator, rewrite.SameTree, nil
}

if err != nil {
return nil, nil, err
}

if output == nil {
return aggregator, rewrite.SameTree, nil
}

aggregator.Pushed = true
if applyResult != rewrite.SameTree && aggregator.Original {
aggregator.aggregateTheAggregates()
}
Expand Down Expand Up @@ -74,6 +87,10 @@ func pushDownAggregationThroughRoute(
return rewrite.Swap(aggregator, route, "push down aggregation under route - remove original")
}

if !ctx.DelegateAggregation {
return nil, nil, nil
}

// Create a new aggregator to be placed below the route.
aggrBelowRoute := aggregator.Clone([]ops.Operator{route.Source}).(*Aggregator)
aggrBelowRoute.Pushed = false
Expand Down Expand Up @@ -247,6 +264,10 @@ func pushDownAggregationThroughJoin(ctx *plancontext.PlanningContext, rootAggr *

joinColumns, output, err := splitAggrColumnsToLeftAndRight(ctx, rootAggr, join, lhs, rhs)
if err != nil {
// if we get this error, we just abort the splitting and fall back on simpler ways of solving the same query
if err == errAbortAggrPushing {
return nil, nil, nil
}
return nil, nil, err
}

Expand Down Expand Up @@ -276,6 +297,8 @@ func pushDownAggregationThroughJoin(ctx *plancontext.PlanningContext, rootAggr *
return rootAggr, rewrite.NewTree("push Aggregation under join", rootAggr), nil
}

var errAbortAggrPushing = fmt.Errorf("abort aggregation pushing")

func addColumnsFromLHSInJoinPredicates(ctx *plancontext.PlanningContext, rootAggr *Aggregator, join *ApplyJoin, lhs *joinPusher) error {
for _, pred := range join.JoinPredicates {
for _, expr := range pred.LHSExprs {
Expand Down Expand Up @@ -323,8 +346,17 @@ func splitGroupingToLeftAndRight(ctx *plancontext.PlanningContext, rootAggr *Agg
Original: aeWrap(groupBy.Inner),
RHSExpr: expr,
})
case deps.IsSolvedBy(lhs.tableID.Merge(rhs.tableID)):
jc, err := BreakExpressionInLHSandRHS(ctx, groupBy.SimplifiedExpr, lhs.tableID)
if err != nil {
return nil, err
}
for _, lhsExpr := range jc.LHSExprs {
lhs.addGrouping(ctx, NewGroupBy(lhsExpr, lhsExpr, aeWrap(lhsExpr)))
}
rhs.addGrouping(ctx, NewGroupBy(jc.RHSExpr, jc.RHSExpr, aeWrap(jc.RHSExpr)))
default:
return nil, vterrors.VT12001("grouping on columns from different sources")
return nil, vterrors.VT13001(fmt.Sprintf("grouping with bad dependencies %s", groupBy.SimplifiedExpr))
}
}
return groupingJCs, nil
Expand Down Expand Up @@ -464,7 +496,7 @@ func (ab *aggBuilder) handlePushThroughAggregation(ctx *plancontext.PlanningCont
case deps.IsSolvedBy(ab.rhs.tableID):
ab.pushThroughRight(aggr)
default:
return vterrors.VT12001("aggregation on columns from different sources: " + sqlparser.String(aggr.Original.Expr))
return errAbortAggrPushing
}
return nil
}
Expand Down Expand Up @@ -493,7 +525,7 @@ func (ab *aggBuilder) handleAggrWithCountStarMultiplier(ctx *plancontext.Plannin
rhsAE = aggr.Original

default:
return errHorizonNotPlanned()
return errAbortAggrPushing
}

ab.buildProjectionForAggr(lhsAE, rhsAE, aggr)
Expand Down
13 changes: 6 additions & 7 deletions go/vt/vtgate/planbuilder/operators/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ func (a *Aggregator) AddColumn(ctx *plancontext.PlanningContext, expr *sqlparser

func (a *Aggregator) GetColumns() ([]*sqlparser.AliasedExpr, error) {
// we update the incoming columns, so we know about any new columns that have been added
// in the optimization phase, other operators could be pushed down resulting in additional columns for aggregator.
// Aggregator should be made aware of these to truncate them in final result.
columns, err := a.Source.GetColumns()
if err != nil {
return nil, err
Expand Down Expand Up @@ -207,6 +209,10 @@ func (a *Aggregator) GetOrdering() ([]ops.OrderBy, error) {
}

func (a *Aggregator) planOffsets(ctx *plancontext.PlanningContext) error {
if !a.Pushed {
return a.planOffsetsNotPushed(ctx)
}

addColumn := func(aliasedExpr *sqlparser.AliasedExpr, addToGroupBy bool) (int, error) {
newSrc, offset, err := a.Source.AddColumn(ctx, aliasedExpr, true, addToGroupBy)
if err != nil {
Expand All @@ -220,13 +226,6 @@ func (a *Aggregator) planOffsets(ctx *plancontext.PlanningContext) error {
return offset, nil
}

if !a.Pushed {
err := a.planOffsetsNotPushed(ctx)
if err != nil {
return err
}
}

for idx, gb := range a.Grouping {
if gb.ColOffset == -1 {
offset, err := addColumn(aeWrap(gb.Inner), false)
Expand Down
9 changes: 5 additions & 4 deletions go/vt/vtgate/planbuilder/operators/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package operators
import (
"strings"

"golang.org/x/exp/slices"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
Expand Down Expand Up @@ -46,11 +48,10 @@ func newFilter(op ops.Operator, expr sqlparser.Expr) ops.Operator {

// Clone implements the Operator interface
func (f *Filter) Clone(inputs []ops.Operator) ops.Operator {
predicatesClone := make([]sqlparser.Expr, len(f.Predicates))
copy(predicatesClone, f.Predicates)
return &Filter{
Source: inputs[0],
Predicates: predicatesClone,
Source: inputs[0],
Predicates: slices.Clone(f.Predicates),
FinalPredicate: f.FinalPredicate,
}
}

Expand Down
15 changes: 11 additions & 4 deletions go/vt/vtgate/planbuilder/operators/horizon_expanding.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
/*
Copyright 2023 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -29,10 +32,6 @@ func expandHorizon(ctx *plancontext.PlanningContext, horizon horizonLike) (ops.O
return nil, nil, errHorizonNotPlanned()
}

if sel.Having != nil {
return nil, nil, errHorizonNotPlanned()
}

op, err := createProjectionFromSelect(ctx, horizon)
if err != nil {
return nil, nil, err
Expand All @@ -50,6 +49,14 @@ func expandHorizon(ctx *plancontext.PlanningContext, horizon horizonLike) (ops.O
}
}

if sel.Having != nil {
op = &Filter{
Source: op,
Predicates: sqlparser.SplitAndExpression(nil, sel.Having.Expr),
FinalPredicate: nil,
}
}

if len(qp.OrderExprs) > 0 {
op = &Ordering{
Source: op,
Expand Down
Loading

0 comments on commit 081194d

Please sign in to comment.