Skip to content

Commit

Permalink
Merge pull request #8506 from planetscale/gen4-distinct-2
Browse files Browse the repository at this point in the history
Gen4: Distinct plan refactor and support grouping and aggregation on join query
  • Loading branch information
systay authored Jul 22, 2021
2 parents bd205e9 + e4c82d3 commit 283bde1
Show file tree
Hide file tree
Showing 12 changed files with 719 additions and 166 deletions.
41 changes: 29 additions & 12 deletions go/vt/vtgate/planbuilder/abstract/queryprojection.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,18 +94,6 @@ func CreateQPFromSelect(sel *sqlparser.Select) (*QueryProjection, error) {
qp.GroupByExprs = append(qp.GroupByExprs, GroupBy{Inner: expr, WeightStrExpr: weightStrExpr})
}

if qp.HasAggr {
expr := qp.getNonAggrExprNotMatchingGroupByExprs()
// if we have aggregation functions, non aggregating columns and GROUP BY,
// the non-aggregating expressions must all be listed in the GROUP BY list
if expr != nil {
if len(qp.GroupByExprs) > 0 {
return nil, vterrors.NewErrorf(vtrpcpb.Code_INVALID_ARGUMENT, vterrors.WrongFieldWithGroup, "Expression of SELECT list is not in GROUP BY clause and contains nonaggregated column '%s' which is not functionally dependent on columns in GROUP BY clause; this is incompatible with sql_mode=only_full_group_by", sqlparser.String(expr))
}
return nil, vterrors.NewErrorf(vtrpcpb.Code_INVALID_ARGUMENT, vterrors.MixOfGroupFuncAndFields, "In aggregated query without GROUP BY, expression of SELECT list contains nonaggregated column '%s'; this is incompatible with sql_mode=only_full_group_by", sqlparser.String(expr))
}
}

canPushDownSorting := true
for _, order := range sel.OrderBy {
expr, weightStrExpr, err := qp.getSimplifiedExpr(order.Expr, "order clause")
Expand All @@ -121,10 +109,24 @@ func CreateQPFromSelect(sel *sqlparser.Select) (*QueryProjection, error) {
})
canPushDownSorting = canPushDownSorting && !sqlparser.ContainsAggregation(weightStrExpr)
}

if qp.HasAggr || len(qp.GroupByExprs) > 0 {
expr := qp.getNonAggrExprNotMatchingGroupByExprs()
// if we have aggregation functions, non aggregating columns and GROUP BY,
// the non-aggregating expressions must all be listed in the GROUP BY list
if expr != nil {
if len(qp.GroupByExprs) > 0 {
return nil, vterrors.NewErrorf(vtrpcpb.Code_INVALID_ARGUMENT, vterrors.WrongFieldWithGroup, "Expression of SELECT list is not in GROUP BY clause and contains nonaggregated column '%s' which is not functionally dependent on columns in GROUP BY clause; this is incompatible with sql_mode=only_full_group_by", sqlparser.String(expr))
}
return nil, vterrors.NewErrorf(vtrpcpb.Code_INVALID_ARGUMENT, vterrors.MixOfGroupFuncAndFields, "In aggregated query without GROUP BY, expression of SELECT list contains nonaggregated column '%s'; this is incompatible with sql_mode=only_full_group_by", sqlparser.String(expr))
}
}

if qp.Distinct && !qp.HasAggr {
qp.GroupByExprs = nil
}
qp.CanPushDownSorting = canPushDownSorting

return qp, nil
}

Expand Down Expand Up @@ -159,6 +161,21 @@ func (qp *QueryProjection) getNonAggrExprNotMatchingGroupByExprs() sqlparser.Exp
return expr.Col.Expr
}
}
for _, order := range qp.OrderExprs {
if sqlparser.IsAggregation(order.WeightStrExpr) {
continue
}
isGroupByOk := false
for _, groupByExpr := range qp.GroupByExprs {
if sqlparser.EqualsExpr(groupByExpr.WeightStrExpr, order.WeightStrExpr) {
isGroupByOk = true
break
}
}
if !isGroupByOk {
return order.Inner.Expr
}
}
return nil
}

Expand Down
103 changes: 100 additions & 3 deletions go/vt/vtgate/planbuilder/horizon_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ func (hp *horizonPlanning) haveToTruncate(v bool) {
func (hp *horizonPlanning) planAggregations() error {
newPlan := hp.plan
var oa *orderedAggregate
if !hasUniqueVindex(hp.vschema, hp.semTable, hp.qp.GroupByExprs) {
uniqVindex := hasUniqueVindex(hp.vschema, hp.semTable, hp.qp.GroupByExprs)
_, joinPlan := hp.plan.(*joinGen4)
if !uniqVindex || joinPlan {
eaggr := &engine.OrderedAggregate{}
oa = &orderedAggregate{
resultsBuilder: resultsBuilder{
Expand Down Expand Up @@ -219,6 +221,9 @@ func planGroupByGen4(groupExpr abstract.GroupBy, plan logicalPlan, semTable *sem
sel := node.Select.(*sqlparser.Select)
sel.GroupBy = append(sel.GroupBy, groupExpr.Inner)
return false, nil
case *joinGen4:
_, _, added, err := wrapAndPushExpr(groupExpr.Inner, groupExpr.WeightStrExpr, node, semTable)
return added, err
case *orderedAggregate:
keyCol, weightStringOffset, colAdded, err := wrapAndPushExpr(groupExpr.Inner, groupExpr.WeightStrExpr, node.input, semTable)
if err != nil {
Expand Down Expand Up @@ -315,6 +320,8 @@ func planOrderByForRoute(orderExprs []abstract.OrderBy, plan *route, semTable *s
return plan, origColCount != plan.Select.GetColumnCount(), nil
}

// wrapAndPushExpr pushes the expression and weighted_string function to the plan using semantics.SemTable
// It returns (expr offset, weight_string offset, new_column added, error)
func wrapAndPushExpr(expr sqlparser.Expr, weightStrExpr sqlparser.Expr, plan logicalPlan, semTable *semantics.SemTable) (int, int, bool, error) {
offset, added, err := pushProjection(&sqlparser.AliasedExpr{Expr: expr}, plan, semTable, true, true)
if err != nil {
Expand Down Expand Up @@ -370,8 +377,6 @@ func (hp *horizonPlanning) planOrderByForJoin(orderExprs []abstract.OrderBy, pla
return nil, err
}
plan.Left = newLeft
hp.needsTruncation = false // since this is a join, we can safely
// add extra columns and not need to truncate them
return plan, nil
}
sortPlan, err := hp.createMemorySortPlan(plan, orderExprs)
Expand Down Expand Up @@ -457,3 +462,95 @@ func allLeft(orderExprs []abstract.OrderBy, semTable *semantics.SemTable, lhsTab
}
return true
}

func (hp *horizonPlanning) planDistinct() error {
if !hp.qp.NeedsDistinct() {
return nil
}
switch p := hp.plan.(type) {
case *route:
// we always make the underlying query distinct,
// and then we might also add a distinct operator on top if it is needed
p.Select.MakeDistinct()
if !p.isSingleShard() && !selectHasUniqueVindex(hp.vschema, hp.semTable, hp.qp.SelectExprs) {
return hp.addDistinct()
}
return nil
case *joinGen4:
return hp.addDistinct()
case *orderedAggregate:
return hp.planDistinctOA(p)
default:
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unknown plan type for DISTINCT %T", hp.plan)
}
}

func (hp *horizonPlanning) planDistinctOA(currPlan *orderedAggregate) error {
eaggr := &engine.OrderedAggregate{}
oa := &orderedAggregate{
resultsBuilder: resultsBuilder{
logicalPlanCommon: newBuilderCommon(hp.plan),
weightStrings: make(map[*resultColumn]int),
truncater: eaggr,
},
eaggr: eaggr,
}
for _, sExpr := range hp.qp.SelectExprs {
found := false
for _, grpParam := range currPlan.eaggr.GroupByKeys {
if sqlparser.EqualsExpr(sExpr.Col.Expr, grpParam.Expr) {
found = true
eaggr.GroupByKeys = append(eaggr.GroupByKeys, grpParam)
break
}
}
if found {
continue
}
for _, aggrParam := range currPlan.eaggr.Aggregates {
if sqlparser.EqualsExpr(sExpr.Col.Expr, aggrParam.Expr) {
found = true
eaggr.GroupByKeys = append(eaggr.GroupByKeys, engine.GroupByParams{KeyCol: aggrParam.Col, WeightStringCol: -1})
break
}
}
if !found {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] unable to plan distinct query as the column is not projected: %s", sqlparser.String(sExpr.Col))
}
}
hp.plan = oa
return nil
}

func (hp *horizonPlanning) addDistinct() error {
eaggr := &engine.OrderedAggregate{}
oa := &orderedAggregate{
resultsBuilder: resultsBuilder{
logicalPlanCommon: newBuilderCommon(hp.plan),
weightStrings: make(map[*resultColumn]int),
truncater: eaggr,
},
eaggr: eaggr,
}
for index, sExpr := range hp.qp.SelectExprs {
grpParam := engine.GroupByParams{KeyCol: index, WeightStringCol: -1}
_, wOffset, added, err := wrapAndPushExpr(sExpr.Col.Expr, sExpr.Col.Expr, hp.plan, hp.semTable)
if err != nil {
return err
}
hp.needsTruncation = hp.needsTruncation || added
grpParam.WeightStringCol = wOffset
eaggr.GroupByKeys = append(eaggr.GroupByKeys, grpParam)
}
hp.plan = oa
return nil
}

func selectHasUniqueVindex(vschema ContextVSchema, semTable *semantics.SemTable, sel []abstract.SelectExpr) bool {
for _, expr := range sel {
if exprHasUniqueVindex(vschema, semTable, expr.Col.Expr) {
return true
}
}
return false
}
26 changes: 11 additions & 15 deletions go/vt/vtgate/planbuilder/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,15 +464,19 @@ func (vw *vschemaWrapper) ErrorIfShardedF(keyspace *vindexes.Keyspace, _, errFmt
return nil
}

func (vw *vschemaWrapper) currentDb() string {
ksName := ""
if vw.keyspace != nil {
ksName = vw.keyspace.Name
}
return ksName
}

func escapeNewLines(in string) string {
return strings.ReplaceAll(in, "\n", "\\n")
}

func testFile(t *testing.T, filename, tempDir string, vschema *vschemaWrapper, checkGen4equalPlan bool) {
ksName := ""
if vschema.keyspace != nil {
ksName = vschema.keyspace.Name
}
var checkAllTests = false
t.Run(filename, func(t *testing.T) {
expected := &strings.Builder{}
Expand All @@ -481,7 +485,7 @@ func testFile(t *testing.T, filename, tempDir string, vschema *vschemaWrapper, c
for tcase := range iterateExecFile(filename) {
t.Run(fmt.Sprintf("%d V3: %s", tcase.lineno, tcase.comments), func(t *testing.T) {
vschema.version = V3
plan, err := TestBuilder(tcase.input, vschema, ksName)
plan, err := TestBuilder(tcase.input, vschema, vschema.currentDb())
out := getPlanOrErrorOutput(err, plan)

if out != tcase.output {
Expand Down Expand Up @@ -558,11 +562,7 @@ func getPlanOutput(tcase testCase, vschema *vschemaWrapper) (out string, err err
out = fmt.Sprintf("panicked: %v\n%s", r, string(debug.Stack()))
}
}()
ksName := ""
if vschema.keyspace != nil {
ksName = vschema.keyspace.Name
}
plan, err := TestBuilder(tcase.input, vschema, ksName)
plan, err := TestBuilder(tcase.input, vschema, vschema.currentDb())
out = getPlanOrErrorOutput(err, plan)
return out, err
}
Expand Down Expand Up @@ -747,16 +747,12 @@ func BenchmarkSelectVsDML(b *testing.B) {
}

func benchmarkPlanner(b *testing.B, version PlannerVersion, testCases []testCase, vschema *vschemaWrapper) {
ksName := ""
if vschema.keyspace != nil {
ksName = vschema.keyspace.Name
}
b.ReportAllocs()
for n := 0; n < b.N; n++ {
for _, tcase := range testCases {
if tcase.output2ndPlanner != "" {
vschema.version = version
_, _ = TestBuilder(tcase.input, vschema, ksName)
_, _ = TestBuilder(tcase.input, vschema, vschema.currentDb())
}
}
}
Expand Down
43 changes: 8 additions & 35 deletions go/vt/vtgate/planbuilder/route_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ type horizonPlanning struct {
vtgateGrouping bool
}

func (hp horizonPlanning) planHorizon() (logicalPlan, error) {
func (hp *horizonPlanning) planHorizon() (logicalPlan, error) {
rb, ok := hp.plan.(*route)
if !ok && hp.semTable.ProjectionErr != nil {
return nil, hp.semTable.ProjectionErr
Expand Down Expand Up @@ -238,29 +238,29 @@ func (hp horizonPlanning) planHorizon() (logicalPlan, error) {
}
}

err = hp.truncateColumnsIfNeeded()
err = hp.planDistinct()
if err != nil {
return nil, err
}

if hp.qp.NeedsDistinct() {
hp.plan, err = pushDistinct(hp.plan, hp.semTable, hp.vschema, hp.qp)
if err != nil {
return nil, err
}
err = hp.truncateColumnsIfNeeded()
if err != nil {
return nil, err
}

return hp.plan, nil
}

func (hp horizonPlanning) truncateColumnsIfNeeded() error {
func (hp *horizonPlanning) truncateColumnsIfNeeded() error {
if !hp.needsTruncation {
return nil
}

switch p := hp.plan.(type) {
case *route:
p.eroute.SetTruncateColumnCount(hp.sel.GetColumnCount())
case *joinGen4:
// since this is a join, we can safely add extra columns and not need to truncate them
case *orderedAggregate:
p.eaggr.SetTruncateColumnCount(hp.sel.GetColumnCount())
case *memorySort:
Expand All @@ -272,33 +272,6 @@ func (hp horizonPlanning) truncateColumnsIfNeeded() error {
return nil
}

func pushDistinct(plan logicalPlan, semTable *semantics.SemTable, vschema ContextVSchema, qp *abstract.QueryProjection) (logicalPlan, error) {
switch p := plan.(type) {
case *route:
// we always make the underlying query distinct,
// and then we might also add a distinct operator on top if it is needed
p.Select.MakeDistinct()
if !p.isSingleShard() && !selectHasUniqueVindex(vschema, semTable, qp.SelectExprs) {
plan = newDistinct(plan)
}
return plan, nil
case *orderedAggregate, *joinGen4:
return newDistinct(plan), nil

default:
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unknown plan type for DISTINCT %T", plan)
}
}

func selectHasUniqueVindex(vschema ContextVSchema, semTable *semantics.SemTable, sel []abstract.SelectExpr) bool {
for _, expr := range sel {
if exprHasUniqueVindex(vschema, semTable, expr.Col.Expr) {
return true
}
}
return false
}

func exprHasUniqueVindex(vschema ContextVSchema, semTable *semantics.SemTable, expr sqlparser.Expr) bool {
col, isCol := expr.(*sqlparser.ColName)
if !isCol {
Expand Down
Loading

0 comments on commit 283bde1

Please sign in to comment.