Skip to content

Commit

Permalink
support grouping on top of join plan
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
harshit-gangal committed Jul 22, 2021
1 parent 9b54895 commit cbad388
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 8 deletions.
11 changes: 8 additions & 3 deletions go/vt/vtgate/planbuilder/horizon_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ func (hp *horizonPlanning) haveToTruncate(v bool) {
func (hp *horizonPlanning) planAggregations() error {
newPlan := hp.plan
var oa *orderedAggregate
if !hasUniqueVindex(hp.vschema, hp.semTable, hp.qp.GroupByExprs) {
uniqVindex := hasUniqueVindex(hp.vschema, hp.semTable, hp.qp.GroupByExprs)
_, joinPlan := hp.plan.(*joinGen4)
if !uniqVindex || joinPlan {
eaggr := &engine.OrderedAggregate{}
oa = &orderedAggregate{
resultsBuilder: resultsBuilder{
Expand Down Expand Up @@ -219,6 +221,9 @@ func planGroupByGen4(groupExpr abstract.GroupBy, plan logicalPlan, semTable *sem
sel := node.Select.(*sqlparser.Select)
sel.GroupBy = append(sel.GroupBy, groupExpr.Inner)
return false, nil
case *joinGen4:
_, _, added, err := wrapAndPushExpr(groupExpr.Inner, groupExpr.WeightStrExpr, node, semTable)
return added, err
case *orderedAggregate:
keyCol, weightStringOffset, colAdded, err := wrapAndPushExpr(groupExpr.Inner, groupExpr.WeightStrExpr, node.input, semTable)
if err != nil {
Expand Down Expand Up @@ -315,6 +320,8 @@ func planOrderByForRoute(orderExprs []abstract.OrderBy, plan *route, semTable *s
return plan, origColCount != plan.Select.GetColumnCount(), nil
}

// wrapAndPushExpr pushes the expression and weighted_string function to the plan using semantics.SemTable
// It returns (expr offset, weight_string offset, new_column added, error)
func wrapAndPushExpr(expr sqlparser.Expr, weightStrExpr sqlparser.Expr, plan logicalPlan, semTable *semantics.SemTable) (int, int, bool, error) {
offset, added, err := pushProjection(&sqlparser.AliasedExpr{Expr: expr}, plan, semTable, true, true)
if err != nil {
Expand Down Expand Up @@ -370,8 +377,6 @@ func (hp *horizonPlanning) planOrderByForJoin(orderExprs []abstract.OrderBy, pla
return nil, err
}
plan.Left = newLeft
hp.needsTruncation = false // since this is a join, we can safely
// add extra columns and not need to truncate them
return plan, nil
}
sortPlan, err := hp.createMemorySortPlan(plan, orderExprs)
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vtgate/planbuilder/route_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,14 +251,16 @@ func (hp *horizonPlanning) planHorizon() (logicalPlan, error) {
return hp.plan, nil
}

func (hp horizonPlanning) truncateColumnsIfNeeded() error {
func (hp *horizonPlanning) truncateColumnsIfNeeded() error {
if !hp.needsTruncation {
return nil
}

switch p := hp.plan.(type) {
case *route:
p.eroute.SetTruncateColumnCount(hp.sel.GetColumnCount())
case *joinGen4:
// since this is a join, we can safely add extra columns and not need to truncate them
case *orderedAggregate:
p.eaggr.SetTruncateColumnCount(hp.sel.GetColumnCount())
case *memorySort:
Expand Down
143 changes: 143 additions & 0 deletions go/vt/vtgate/planbuilder/testdata/aggr_cases.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1927,3 +1927,146 @@ Gen4 plan same as above
]
}
}

# Grouping on join
"select user.a from user join user_extra group by user.a"
"unsupported: cross-shard query with aggregates"
{
"QueryType": "SELECT",
"Original": "select user.a from user join user_extra group by user.a",
"Instructions": {
"OperatorType": "Aggregate",
"Variant": "Ordered",
"GroupBy": "(0|1)",
"ResultColumns": 1,
"Inputs": [
{
"OperatorType": "Join",
"Variant": "Join",
"JoinColumnIndexes": "-1,-2",
"TableName": "`user`_user_extra",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "SelectScatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select `user`.a, weight_string(`user`.a) from `user` where 1 != 1",
"OrderBy": "(0|1) ASC",
"Query": "select `user`.a, weight_string(`user`.a) from `user` order by `user`.a asc",
"Table": "`user`"
},
{
"OperatorType": "Route",
"Variant": "SelectScatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select 1 from user_extra where 1 != 1",
"Query": "select 1 from user_extra",
"Table": "user_extra"
}
]
}
]
}
}

# Aggregate on join
"select user.a, count(*) from user join user_extra group by user.a"
"unsupported: cross-shard query with aggregates"
{
"QueryType": "SELECT",
"Original": "select user.a, count(*) from user join user_extra group by user.a",
"Instructions": {
"OperatorType": "Aggregate",
"Variant": "Ordered",
"Aggregates": "count(1)",
"GroupBy": "(0|2)",
"ResultColumns": 2,
"Inputs": [
{
"OperatorType": "Join",
"Variant": "Join",
"JoinColumnIndexes": "-1,-2,-3",
"TableName": "`user`_user_extra",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "SelectScatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select `user`.a, count(*), weight_string(`user`.a) from `user` where 1 != 1",
"OrderBy": "(0|2) ASC",
"Query": "select `user`.a, count(*), weight_string(`user`.a) from `user` order by `user`.a asc",
"Table": "`user`"
},
{
"OperatorType": "Route",
"Variant": "SelectScatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select 1 from user_extra where 1 != 1",
"Query": "select 1 from user_extra",
"Table": "user_extra"
}
]
}
]
}
}

# Aggregate on other table in join
"select user.a, count(user_extra.a) from user join user_extra group by user.a"
"unsupported: cross-shard query with aggregates"
{
"QueryType": "SELECT",
"Original": "select user.a, count(user_extra.a) from user join user_extra group by user.a",
"Instructions": {
"OperatorType": "Aggregate",
"Variant": "Ordered",
"Aggregates": "count(1)",
"GroupBy": "(0|2)",
"ResultColumns": 2,
"Inputs": [
{
"OperatorType": "Join",
"Variant": "Join",
"JoinColumnIndexes": "-1,1,-2",
"TableName": "`user`_user_extra",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "SelectScatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select `user`.a, weight_string(`user`.a) from `user` where 1 != 1",
"OrderBy": "(0|1) ASC",
"Query": "select `user`.a, weight_string(`user`.a) from `user` order by `user`.a asc",
"Table": "`user`"
},
{
"OperatorType": "Route",
"Variant": "SelectScatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select count(user_extra.a) from user_extra where 1 != 1",
"Query": "select count(user_extra.a) from user_extra",
"Table": "user_extra"
}
]
}
]
}
}
4 changes: 0 additions & 4 deletions go/vt/vtgate/planbuilder/testdata/unsupported_cases.txt
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,6 @@ Gen4 error: In aggregated query without GROUP BY, expression of SELECT list cont
"unsupported: cross-shard query with aggregates"
Gen4 error: unsupported: in scatter query: complex aggregate expression

# Aggregate detection (group by)
"select user.a from user join user_extra group by user.a"
"unsupported: cross-shard query with aggregates"

# group by and ',' joins
"select user.id from user, user_extra group by id"
"unsupported: cross-shard query with aggregates"
Expand Down

0 comments on commit cbad388

Please sign in to comment.