Skip to content

Commit

Permalink
Operator planner refactor (#13294)
Browse files Browse the repository at this point in the history
* fix debug output

Signed-off-by: Andres Taylor <andres@planetscale.com>

* remove unused debug method

Signed-off-by: Andres Taylor <andres@planetscale.com>

* refactor: extract horizon expanding into it's own file

Signed-off-by: Andres Taylor <andres@planetscale.com>

* collect all filter pushing in one place

Signed-off-by: Andres Taylor <andres@planetscale.com>

* don't reuse createProjectionWithoutAggr since it has bad expectations of aggregations

Signed-off-by: Andres Taylor <andres@planetscale.com>

* refactor: clean up how we calculate offsets

Signed-off-by: Andres Taylor <andres@planetscale.com>

* reintroduce filter pushing on route planning

Signed-off-by: Andres Taylor <andres@planetscale.com>

* refactor addOrderBysAndGroupBysForAggregations

Signed-off-by: Andres Taylor <andres@planetscale.com>

* extract into method

Signed-off-by: Andres Taylor <andres@planetscale.com>

* refactor code to make it easier to add more phases

Signed-off-by: Andres Taylor <andres@planetscale.com>

* split a phase in two

Signed-off-by: Andres Taylor <andres@planetscale.com>

* refactor in preparation for adding having

Signed-off-by: Andres Taylor <andres@planetscale.com>

* make sure to also clone the result columns

Signed-off-by: Andres Taylor <andres@planetscale.com>

* stop fuzzing on CI

Signed-off-by: Andres Taylor <andres@planetscale.com>

* always keep aggregator columns updated

Signed-off-by: Andres Taylor <andres@planetscale.com>

* remove passed in functions to useOffsets instread passed in the operator

Signed-off-by: Harshit Gangal <harshit@planetscale.com>

* merge 2 phases into 1

Signed-off-by: Harshit Gangal <harshit@planetscale.com>

* refactor some phases part

Signed-off-by: Harshit Gangal <harshit@planetscale.com>

---------

Signed-off-by: Andres Taylor <andres@planetscale.com>
Signed-off-by: Harshit Gangal <harshit@planetscale.com>
Co-authored-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
systay and harshit-gangal authored Jun 13, 2023
1 parent 18e639f commit cc380c4
Show file tree
Hide file tree
Showing 28 changed files with 446 additions and 540 deletions.
1 change: 1 addition & 0 deletions go/test/endtoend/vtgate/queries/aggregation/fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type (
)

func TestFuzzAggregations(t *testing.T) {
t.Skip("dont run on CI for now")
// This test randomizes values and queries, and checks that mysql returns the same values that Vitess does
mcmp, closer := start(t)
defer closer()
Expand Down
37 changes: 24 additions & 13 deletions go/vt/vtgate/planbuilder/operators/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,13 @@ func (a *Aggregator) addColumnWithoutPushing(expr *sqlparser.AliasedExpr, addToG
groupBy.ColOffset = offset
a.Grouping = append(a.Grouping, groupBy)
} else {
aggr := NewAggr(opcode.AggregateRandom, nil, expr, expr.As.String())
var aggr Aggr
switch e := expr.Expr.(type) {
case sqlparser.AggrFunc:
aggr = createAggrFromAggrFunc(e, expr)
default:
aggr = NewAggr(opcode.AggregateRandom, nil, expr, expr.As.String())
}
aggr.ColOffset = offset
a.Aggregations = append(a.Aggregations, aggr)
}
Expand Down Expand Up @@ -159,35 +165,40 @@ func (a *Aggregator) AddColumn(ctx *plancontext.PlanningContext, expr *sqlparser
return a, offset, nil
}

func (a *Aggregator) GetColumns() (columns []*sqlparser.AliasedExpr, err error) {
return a.Columns, nil
}
func (a *Aggregator) GetColumns() ([]*sqlparser.AliasedExpr, error) {
// we update the incoming columns, so we know about any new columns that have been added
columns, err := a.Source.GetColumns()
if err != nil {
return nil, err
}

func (a *Aggregator) Description() ops.OpDescription {
return ops.OpDescription{
OperatorType: "Aggregator",
// if this operator is producing more columns than expected, we want to know about it
if len(columns) > len(a.Columns) {
a.Columns = append(a.Columns, columns[len(a.Columns):]...)
}

return a.Columns, nil
}

func (a *Aggregator) ShortDescription() string {
columnns := slices2.Map(a.Columns, func(from *sqlparser.AliasedExpr) string {
return sqlparser.String(from)
})

org := ""
if a.Original {
org = "ORG "
}

if len(a.Grouping) == 0 {
return strings.Join(columnns, ", ")
return fmt.Sprintf("%s%s", org, strings.Join(columnns, ", "))
}

var grouping []string
for _, gb := range a.Grouping {
grouping = append(grouping, sqlparser.String(gb.SimplifiedExpr))
}

org := ""
if a.Original {
org = "ORG "
}

return fmt.Sprintf("%s%s group by %s", org, strings.Join(columnns, ", "), strings.Join(grouping, ","))
}

Expand Down
15 changes: 0 additions & 15 deletions go/vt/vtgate/planbuilder/operators/apply_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,21 +287,6 @@ func (a *ApplyJoin) addOffset(offset int) {
a.Columns = append(a.Columns, offset)
}

func (a *ApplyJoin) Description() ops.OpDescription {
other := map[string]any{}
if len(a.Columns) > 0 {
other["OutputColumns"] = a.Columns
}
if a.Predicate != nil {
other["Predicate"] = sqlparser.String(a.Predicate)
}
return ops.OpDescription{
OperatorType: "Join",
Variant: "Apply",
Other: other,
}
}

func (a *ApplyJoin) ShortDescription() string {
pred := sqlparser.String(a.Predicate)
columns := slices2.Map(a.ColumnsAST, func(from JoinColumn) string {
Expand Down
14 changes: 0 additions & 14 deletions go/vt/vtgate/planbuilder/operators/correlated_subquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,6 @@ func (s *SubQueryOp) SetInputs(ops []ops.Operator) {
s.Outer, s.Inner = ops[0], ops[1]
}

func (s *SubQueryOp) Description() ops.OpDescription {
return ops.OpDescription{
OperatorType: "SubQuery",
Variant: "Apply",
}
}

func (s *SubQueryOp) ShortDescription() string {
return ""
}
Expand Down Expand Up @@ -114,13 +107,6 @@ func (c *CorrelatedSubQueryOp) SetInputs(ops []ops.Operator) {
c.Outer, c.Inner = ops[0], ops[1]
}

func (c *CorrelatedSubQueryOp) Description() ops.OpDescription {
return ops.OpDescription{
OperatorType: "SubQuery",
Variant: "Correlated",
}
}

func (c *CorrelatedSubQueryOp) ShortDescription() string {
return ""
}
6 changes: 0 additions & 6 deletions go/vt/vtgate/planbuilder/operators/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,6 @@ func (d *Delete) GetOrdering() ([]ops.OrderBy, error) {
return nil, nil
}

func (d *Delete) Description() ops.OpDescription {
return ops.OpDescription{
OperatorType: "Delete",
}
}

func (d *Delete) ShortDescription() string {
return fmt.Sprintf("%s.%s %s", d.VTable.Keyspace.Name, d.VTable.Name.String(), sqlparser.String(d.AST.Where))
}
6 changes: 0 additions & 6 deletions go/vt/vtgate/planbuilder/operators/derived.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,6 @@ func (d *Derived) setQP(qp *QueryProjection) {
d.QP = qp
}

func (d *Derived) Description() ops.OpDescription {
return ops.OpDescription{
OperatorType: "Derived",
}
}

func (d *Derived) ShortDescription() string {
return d.Alias
}
6 changes: 0 additions & 6 deletions go/vt/vtgate/planbuilder/operators/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,6 @@ func (d *Distinct) GetColumns() ([]*sqlparser.AliasedExpr, error) {
return d.Source.GetColumns()
}

func (d *Distinct) Description() ops.OpDescription {
return ops.OpDescription{
OperatorType: "Distinct",
}
}

func (d *Distinct) ShortDescription() string {
return ""
}
Expand Down
36 changes: 15 additions & 21 deletions go/vt/vtgate/planbuilder/operators/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ limitations under the License.
package operators

import (
"strings"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/evalengine"
"vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops"
"vitess.io/vitess/go/vt/vtgate/planbuilder/operators/rewrite"
Expand Down Expand Up @@ -114,38 +118,28 @@ func (f *Filter) Compact(*plancontext.PlanningContext) (ops.Operator, *rewrite.A
}

func (f *Filter) planOffsets(ctx *plancontext.PlanningContext) error {
resolveColumn := func(col *sqlparser.ColName) (int, error) {
newSrc, offset, err := f.Source.AddColumn(ctx, aeWrap(col), true, false)
if err != nil {
return 0, err
}
f.Source = newSrc
return offset, nil
}
cfg := &evalengine.Config{
ResolveType: ctx.SemTable.TypeForExpr,
Collation: ctx.SemTable.Collation,
ResolveColumn: resolveColumn,
ResolveType: ctx.SemTable.TypeForExpr,
Collation: ctx.SemTable.Collation,
}

eexpr, err := evalengine.Translate(sqlparser.AndExpressions(f.Predicates...), cfg)
predicate := sqlparser.AndExpressions(f.Predicates...)
rewritten, err := useOffsets(ctx, predicate, f)
if err != nil {
return err
}
eexpr, err := evalengine.Translate(rewritten, cfg)
if err != nil {
if strings.HasPrefix(err.Error(), evalengine.ErrTranslateExprNotSupported) {
return vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "%s: %s", evalengine.ErrTranslateExprNotSupported, sqlparser.String(predicate))
}
return err
}

f.FinalPredicate = eexpr
return nil
}

func (f *Filter) Description() ops.OpDescription {
return ops.OpDescription{
OperatorType: "Filter",
Other: map[string]any{
"Predicate": sqlparser.String(sqlparser.AndExpressions(f.Predicates...)),
},
}
}

func (f *Filter) ShortDescription() string {
return sqlparser.String(sqlparser.AndExpressions(f.Predicates...))
}
6 changes: 0 additions & 6 deletions go/vt/vtgate/planbuilder/operators/horizon.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,6 @@ func (h *Horizon) setQP(qp *QueryProjection) {
h.QP = qp
}

func (h *Horizon) Description() ops.OpDescription {
return ops.OpDescription{
OperatorType: "Horizon",
}
}

func (h *Horizon) ShortDescription() string {
return ""
}
Loading

0 comments on commit cc380c4

Please sign in to comment.