Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed regression in v3 for grouping by integer functions #8856

Merged
merged 4 commits into from
Sep 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions go/test/endtoend/vtgate/aggr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ func TestAggregateTypes(t *testing.T) {
assertMatches(t, conn, "select val1, sum(distinct val2), sum(val2) from aggr_test group by val1", `[[VARCHAR("a") DECIMAL(1) DECIMAL(2)] [VARCHAR("b") DECIMAL(1) DECIMAL(1)] [VARCHAR("c") DECIMAL(7) DECIMAL(7)] [VARCHAR("d") NULL NULL] [VARCHAR("e") DECIMAL(1) DECIMAL(1)]]`)
assertMatches(t, conn, "select val1, count(distinct val2) k, count(*) from aggr_test group by val1 order by k desc, val1", `[[VARCHAR("c") INT64(2) INT64(2)] [VARCHAR("a") INT64(1) INT64(2)] [VARCHAR("b") INT64(1) INT64(1)] [VARCHAR("e") INT64(1) INT64(2)] [VARCHAR("d") INT64(0) INT64(1)]]`)
assertMatches(t, conn, "select val1, count(distinct val2) k, count(*) from aggr_test group by val1 order by k desc, val1 limit 4", `[[VARCHAR("c") INT64(2) INT64(2)] [VARCHAR("a") INT64(1) INT64(2)] [VARCHAR("b") INT64(1) INT64(1)] [VARCHAR("e") INT64(1) INT64(2)]]`)
assertMatches(t, conn, "select ascii(val1) as a, count(*) from aggr_test group by a", `[[INT64(65) INT64(1)] [INT64(69) INT64(1)] [INT64(97) INT64(1)] [INT64(98) INT64(1)] [INT64(99) INT64(2)] [INT64(100) INT64(1)] [INT64(101) INT64(1)]]`)
assertMatches(t, conn, "select ascii(val1) as a, count(*) from aggr_test group by a order by a", `[[INT64(65) INT64(1)] [INT64(69) INT64(1)] [INT64(97) INT64(1)] [INT64(98) INT64(1)] [INT64(99) INT64(2)] [INT64(100) INT64(1)] [INT64(101) INT64(1)]]`)
assertMatches(t, conn, "select ascii(val1) as a, count(*) from aggr_test group by a order by 2, a", `[[INT64(65) INT64(1)] [INT64(69) INT64(1)] [INT64(97) INT64(1)] [INT64(98) INT64(1)] [INT64(100) INT64(1)] [INT64(101) INT64(1)] [INT64(99) INT64(2)]]`)
assertMatches(t, conn, "select val1 as a, count(*) from aggr_test group by a", `[[VARCHAR("a") INT64(2)] [VARCHAR("b") INT64(1)] [VARCHAR("c") INT64(2)] [VARCHAR("d") INT64(1)] [VARCHAR("e") INT64(2)]]`)
assertMatches(t, conn, "select val1 as a, count(*) from aggr_test group by a order by a", `[[VARCHAR("a") INT64(2)] [VARCHAR("b") INT64(1)] [VARCHAR("c") INT64(2)] [VARCHAR("d") INT64(1)] [VARCHAR("e") INT64(2)]]`)
assertMatches(t, conn, "select val1 as a, count(*) from aggr_test group by a order by 2, a", `[[VARCHAR("b") INT64(1)] [VARCHAR("d") INT64(1)] [VARCHAR("a") INT64(2)] [VARCHAR("c") INT64(2)] [VARCHAR("e") INT64(2)]]`)
exec(t, conn, "delete from aggr_test")
}

Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/ordered_aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type GroupByParams struct {
KeyCol int
WeightStringCol int
Expr sqlparser.Expr
FromGroupBy bool
}

// String returns a string. Used for plan descriptions
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vtgate/engine/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ type OrderByParams struct {
WeightStringCol int
Desc bool
StarColFixedIndex int
// v3 specific boolean. Used to also add weight strings originating from GroupBys to the Group by clause
FromGroupBy bool
}

// String returns a string. Used for plan descriptions
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/executor_select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1650,7 +1650,7 @@ func TestSelectScatterAggregate(t *testing.T) {
require.NoError(t, err)

wantQueries := []*querypb.BoundQuery{{
Sql: "select col, sum(foo), weight_string(col) from `user` group by col order by col asc",
Sql: "select col, sum(foo), weight_string(col) from `user` group by col, weight_string(col) order by col asc",
BindVariables: map[string]*querypb.BindVariable{},
}}
for _, conn := range conns {
Expand Down Expand Up @@ -1709,7 +1709,7 @@ func TestStreamSelectScatterAggregate(t *testing.T) {
require.NoError(t, err)

wantQueries := []*querypb.BoundQuery{{
Sql: "select col, sum(foo), weight_string(col) from `user` group by col order by col asc",
Sql: "select col, sum(foo), weight_string(col) from `user` group by col, weight_string(col) order by col asc",
BindVariables: map[string]*querypb.BindVariable{},
}}
for _, conn := range conns {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/concatenate.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (c *concatenate) SupplyCol(col *sqlparser.ColName) (rc *resultColumn, colNu
panic("implement me")
}

func (c *concatenate) SupplyWeightString(colNumber int) (weightcolNumber int, err error) {
func (c *concatenate) SupplyWeightString(colNumber int, alsoAddToGroupBy bool) (weightcolNumber int, err error) {
panic("implement me")
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/concatenateGen4.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (c *concatenateGen4) SupplyCol(col *sqlparser.ColName) (rc *resultColumn, c
}

// SupplyWeightString implements the logicalPlan interface
func (c *concatenateGen4) SupplyWeightString(colNumber int) (weightcolNumber int, err error) {
func (c *concatenateGen4) SupplyWeightString(colNumber int, alsoAddToGroupBy bool) (weightcolNumber int, err error) {
panic("implement me")
}

Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/grouping.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func planGroupBy(pb *primitiveBuilder, input logicalPlan, groupBy sqlparser.Grou
default:
return nil, vterrors.New(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: in scatter query: only simple references allowed")
}
node.eaggr.GroupByKeys = append(node.eaggr.GroupByKeys, &engine.GroupByParams{KeyCol: colNumber, WeightStringCol: -1})
node.eaggr.GroupByKeys = append(node.eaggr.GroupByKeys, &engine.GroupByParams{KeyCol: colNumber, WeightStringCol: -1, FromGroupBy: true})
}
// Append the distinct aggregate if any.
if node.extraDistinct != nil {
Expand Down Expand Up @@ -110,7 +110,7 @@ func planDistinct(input logicalPlan) (logicalPlan, error) {
if rc.column.Origin() == node {
return newDistinct(node), nil
}
node.eaggr.GroupByKeys = append(node.eaggr.GroupByKeys, &engine.GroupByParams{KeyCol: i, WeightStringCol: -1})
node.eaggr.GroupByKeys = append(node.eaggr.GroupByKeys, &engine.GroupByParams{KeyCol: i, WeightStringCol: -1, FromGroupBy: false})
}
newInput, err := planDistinct(node.input)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/planbuilder/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,20 +210,20 @@ func (jb *join) SupplyCol(col *sqlparser.ColName) (rc *resultColumn, colNumber i
}

// SupplyWeightString implements the logicalPlan interface
func (jb *join) SupplyWeightString(colNumber int) (weightcolNumber int, err error) {
func (jb *join) SupplyWeightString(colNumber int, alsoAddToGroupBy bool) (weightcolNumber int, err error) {
rc := jb.resultColumns[colNumber]
if weightcolNumber, ok := jb.weightStrings[rc]; ok {
return weightcolNumber, nil
}
routeNumber := rc.column.Origin().Order()
if jb.isOnLeft(routeNumber) {
sourceCol, err := jb.Left.SupplyWeightString(-jb.ejoin.Cols[colNumber] - 1)
sourceCol, err := jb.Left.SupplyWeightString(-jb.ejoin.Cols[colNumber]-1, alsoAddToGroupBy)
if err != nil {
return 0, err
}
jb.ejoin.Cols = append(jb.ejoin.Cols, -sourceCol-1)
} else {
sourceCol, err := jb.Right.SupplyWeightString(jb.ejoin.Cols[colNumber] - 1)
sourceCol, err := jb.Right.SupplyWeightString(jb.ejoin.Cols[colNumber]-1, alsoAddToGroupBy)
if err != nil {
return 0, err
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/joinGen4.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (j *joinGen4) SupplyCol(col *sqlparser.ColName) (rc *resultColumn, colNumbe
}

// SupplyWeightString implements the logicalPlan interface
func (j *joinGen4) SupplyWeightString(colNumber int) (weightcolNumber int, err error) {
func (j *joinGen4) SupplyWeightString(colNumber int, alsoAddToGroupBy bool) (weightcolNumber int, err error) {
panic("implement me")
}

Expand Down
14 changes: 8 additions & 6 deletions go/vt/vtgate/planbuilder/logical_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type logicalPlan interface {

// SupplyWeightString must supply a weight_string expression of the
// specified column. It returns an error if we cannot supply a weight column for it.
SupplyWeightString(colNumber int) (weightcolNumber int, err error)
SupplyWeightString(colNumber int, alsoAddToGroupBy bool) (weightcolNumber int, err error)

// Primitive returns the underlying primitive.
// This function should only be called after Wireup is finished.
Expand Down Expand Up @@ -170,8 +170,8 @@ func (bc *logicalPlanCommon) SupplyCol(col *sqlparser.ColName) (rc *resultColumn
return bc.input.SupplyCol(col)
}

func (bc *logicalPlanCommon) SupplyWeightString(colNumber int) (weightcolNumber int, err error) {
return bc.input.SupplyWeightString(colNumber)
func (bc *logicalPlanCommon) SupplyWeightString(colNumber int, alsoAddToGroupBy bool) (weightcolNumber int, err error) {
return bc.input.SupplyWeightString(colNumber, alsoAddToGroupBy)
}

// Rewrite implements the logicalPlan interface
Expand Down Expand Up @@ -239,12 +239,14 @@ func (rsb *resultsBuilder) SupplyCol(col *sqlparser.ColName) (rc *resultColumn,
return rc, colNumber
}

func (rsb *resultsBuilder) SupplyWeightString(colNumber int) (weightcolNumber int, err error) {
func (rsb *resultsBuilder) SupplyWeightString(colNumber int, alsoAddToGroupBy bool) (weightcolNumber int, err error) {
rc := rsb.resultColumns[colNumber]
if weightcolNumber, ok := rsb.weightStrings[rc]; ok {
var ok bool
weightcolNumber, ok = rsb.weightStrings[rc]
if !alsoAddToGroupBy && ok {
return weightcolNumber, nil
}
weightcolNumber, err = rsb.input.SupplyWeightString(colNumber)
weightcolNumber, err = rsb.input.SupplyWeightString(colNumber, alsoAddToGroupBy)
if err != nil {
return 0, nil
}
Expand Down
10 changes: 3 additions & 7 deletions go/vt/vtgate/planbuilder/memory_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type memorySort struct {
}

// newMemorySort builds a new memorySort.
func newMemorySort(plan logicalPlan, orderBy sqlparser.OrderBy) (*memorySort, error) {
func newMemorySort(plan logicalPlan, orderBy v3OrderBy) (*memorySort, error) {
eMemorySort := &engine.MemorySort{}
ms := &memorySort{
resultsBuilder: newResultsBuilder(plan, eMemorySort),
Expand Down Expand Up @@ -88,6 +88,7 @@ func newMemorySort(plan logicalPlan, orderBy sqlparser.OrderBy) (*memorySort, er
WeightStringCol: -1,
Desc: order.Direction == sqlparser.DescOrder,
StarColFixedIndex: colNumber,
FromGroupBy: order.fromGroupBy,
}
ms.eMemorySort.OrderBy = append(ms.eMemorySort.OrderBy, ob)
}
Expand Down Expand Up @@ -115,12 +116,7 @@ func (ms *memorySort) Wireup(plan logicalPlan, jt *jointab) error {
rc := ms.resultColumns[orderby.Col]
// Add a weight_string column if we know that the column is a textual column or if its type is unknown
if sqltypes.IsText(rc.column.typ) || rc.column.typ == sqltypes.Null {
// If a weight string was previously requested, reuse it.
if weightcolNumber, ok := ms.weightStrings[rc]; ok {
ms.eMemorySort.OrderBy[i].WeightStringCol = weightcolNumber
continue
}
weightcolNumber, err := ms.input.SupplyWeightString(orderby.Col)
weightcolNumber, err := ms.input.SupplyWeightString(orderby.Col, orderby.FromGroupBy)
if err != nil {
_, isUnsupportedErr := err.(UnsupportedSupplyWeightString)
if isUnsupportedErr {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/memory_sort_gen4.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (m *memorySortGen4) SupplyCol(col *sqlparser.ColName) (rc *resultColumn, co
panic("implement me")
}

func (m *memorySortGen4) SupplyWeightString(colNumber int) (weightcolNumber int, err error) {
func (m *memorySortGen4) SupplyWeightString(colNumber int, alsoAddToGroupBy bool) (weightcolNumber int, err error) {
panic("implement me")
}

Expand Down
7 changes: 1 addition & 6 deletions go/vt/vtgate/planbuilder/merge_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,8 @@ func (ms *mergeSort) Wireup(plan logicalPlan, jt *jointab) error {
rc := ms.resultColumns[orderby.Col]
// Add a weight_string column if we know that the column is a textual column or if its type is unknown
if sqltypes.IsText(rc.column.typ) || rc.column.typ == sqltypes.Null {
// If a weight string was previously requested, reuse it.
if colNumber, ok := ms.weightStrings[rc]; ok {
rb.eroute.OrderBy[i].WeightStringCol = colNumber
continue
}
var err error
rb.eroute.OrderBy[i].WeightStringCol, err = rb.SupplyWeightString(orderby.Col)
rb.eroute.OrderBy[i].WeightStringCol, err = rb.SupplyWeightString(orderby.Col, orderby.FromGroupBy)
if err != nil {
_, isUnsupportedErr := err.(UnsupportedSupplyWeightString)
if isUnsupportedErr {
Expand Down
10 changes: 3 additions & 7 deletions go/vt/vtgate/planbuilder/ordered_aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ import (
"fmt"
"strconv"

"vitess.io/vitess/go/sqltypes"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/vt/vtgate/semantics"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/engine"
)
Expand Down Expand Up @@ -314,12 +315,7 @@ func (oa *orderedAggregate) Wireup(plan logicalPlan, jt *jointab) error {
for i, gbk := range oa.eaggr.GroupByKeys {
rc := oa.resultColumns[gbk.KeyCol]
if sqltypes.IsText(rc.column.typ) {
if weightcolNumber, ok := oa.weightStrings[rc]; ok {
oa.eaggr.GroupByKeys[i].WeightStringCol = weightcolNumber
oa.eaggr.GroupByKeys[i].KeyCol = weightcolNumber
continue
}
weightcolNumber, err := oa.input.SupplyWeightString(gbk.KeyCol)
weightcolNumber, err := oa.input.SupplyWeightString(gbk.KeyCol, gbk.FromGroupBy)
if err != nil {
_, isUnsupportedErr := err.(UnsupportedSupplyWeightString)
if isUnsupportedErr {
Expand Down
35 changes: 25 additions & 10 deletions go/vt/vtgate/planbuilder/ordering.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,14 @@ import (
"vitess.io/vitess/go/vt/vtgate/engine"
)

func planOrdering(pb *primitiveBuilder, input logicalPlan, orderBy sqlparser.OrderBy) (logicalPlan, error) {
type v3Order struct {
*sqlparser.Order
fromGroupBy bool
}

type v3OrderBy []*v3Order

func planOrdering(pb *primitiveBuilder, input logicalPlan, orderBy v3OrderBy) (logicalPlan, error) {
switch node := input.(type) {
case *simpleProjection, *vindexFunc:
if len(orderBy) == 0 {
Expand Down Expand Up @@ -58,7 +65,7 @@ func planOrdering(pb *primitiveBuilder, input logicalPlan, orderBy sqlparser.Ord
return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "[BUG] unreachable %T.ordering", input)
}

func planOAOrdering(pb *primitiveBuilder, orderBy sqlparser.OrderBy, oa *orderedAggregate) (logicalPlan, error) {
func planOAOrdering(pb *primitiveBuilder, orderBy v3OrderBy, oa *orderedAggregate) (logicalPlan, error) {
// The requested order must be such that the ordering can be done
// before the group by, which will allow us to push it down to the
// route. This is actually true in most use cases, except for situations
Expand All @@ -80,7 +87,7 @@ func planOAOrdering(pb *primitiveBuilder, orderBy sqlparser.OrderBy, oa *ordered
// referenced tracks the keys referenced by the order by clause.
referenced := make([]bool, len(oa.eaggr.GroupByKeys))
postSort := false
selOrderBy := make(sqlparser.OrderBy, 0, len(orderBy))
selOrderBy := make(v3OrderBy, 0, len(orderBy))
for _, order := range orderBy {
// Identify the order by column.
var orderByCol *column
Expand Down Expand Up @@ -112,6 +119,7 @@ func planOAOrdering(pb *primitiveBuilder, orderBy sqlparser.OrderBy, oa *ordered

found = true
referenced[j] = true
order.fromGroupBy = groupBy.FromGroupBy
selOrderBy = append(selOrderBy, order)
break
}
Expand All @@ -130,12 +138,18 @@ func planOAOrdering(pb *primitiveBuilder, orderBy sqlparser.OrderBy, oa *ordered
if err != nil {
return nil, vterrors.Wrapf(err, "generating order by clause")
}
selOrderBy = append(selOrderBy, &sqlparser.Order{Expr: col, Direction: sqlparser.AscOrder})
selOrderBy = append(selOrderBy, &v3Order{
Order: &sqlparser.Order{Expr: col, Direction: sqlparser.AscOrder},
fromGroupBy: groupByKey.FromGroupBy,
})
}

// Append the distinct aggregate if any.
if oa.extraDistinct != nil {
selOrderBy = append(selOrderBy, &sqlparser.Order{Expr: oa.extraDistinct, Direction: sqlparser.AscOrder})
selOrderBy = append(selOrderBy, &v3Order{
Order: &sqlparser.Order{Expr: oa.extraDistinct, Direction: sqlparser.AscOrder},
fromGroupBy: true,
})
}

// Push down the order by.
Expand All @@ -153,7 +167,7 @@ func planOAOrdering(pb *primitiveBuilder, orderBy sqlparser.OrderBy, oa *ordered
return oa, nil
}

func planJoinOrdering(pb *primitiveBuilder, orderBy sqlparser.OrderBy, node *join) (logicalPlan, error) {
func planJoinOrdering(pb *primitiveBuilder, orderBy v3OrderBy, node *join) (logicalPlan, error) {
isSpecial := false
switch len(orderBy) {
case 0:
Expand Down Expand Up @@ -228,7 +242,7 @@ func planJoinOrdering(pb *primitiveBuilder, orderBy sqlparser.OrderBy, node *joi
return node, nil
}

func planRouteOrdering(orderBy sqlparser.OrderBy, node *route) (logicalPlan, error) {
func planRouteOrdering(orderBy v3OrderBy, node *route) (logicalPlan, error) {
switch len(orderBy) {
case 0:
return node, nil
Expand All @@ -242,14 +256,14 @@ func planRouteOrdering(orderBy sqlparser.OrderBy, node *route) (logicalPlan, err
}
}
if isSpecial {
node.Select.AddOrder(orderBy[0])
node.Select.AddOrder(orderBy[0].Order)
return node, nil
}
}

if node.isSingleShard() {
for _, order := range orderBy {
node.Select.AddOrder(order)
node.Select.AddOrder(order.Order)
}
return node, nil
}
Expand Down Expand Up @@ -320,10 +334,11 @@ func planRouteOrdering(orderBy sqlparser.OrderBy, node *route) (logicalPlan, err
WeightStringCol: -1,
Desc: order.Direction == sqlparser.DescOrder,
StarColFixedIndex: starColFixedIndex,
FromGroupBy: order.fromGroupBy,
}
node.eroute.OrderBy = append(node.eroute.OrderBy, ob)

node.Select.AddOrder(order)
node.Select.AddOrder(order.Order)
}
return newMergeSort(node), nil
}
6 changes: 5 additions & 1 deletion go/vt/vtgate/planbuilder/postprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ func (pb *primitiveBuilder) pushOrderBy(orderBy sqlparser.OrderBy) error {
if err := pb.st.ResolveSymbols(orderBy); err != nil {
return err
}
plan, err := planOrdering(pb, pb.plan, orderBy)
var v3OrderBylist v3OrderBy
for _, order := range orderBy {
v3OrderBylist = append(v3OrderBylist, &v3Order{Order: order})
}
plan, err := planOrdering(pb, pb.plan, v3OrderBylist)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/pullout_subquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ func (ps *pulloutSubquery) SupplyCol(col *sqlparser.ColName) (rc *resultColumn,
}

// SupplyWeightString implements the logicalPlan interface
func (ps *pulloutSubquery) SupplyWeightString(colNumber int) (weightcolNumber int, err error) {
return ps.underlying.SupplyWeightString(colNumber)
func (ps *pulloutSubquery) SupplyWeightString(colNumber int, alsoAddToGroupBy bool) (weightcolNumber int, err error) {
return ps.underlying.SupplyWeightString(colNumber, alsoAddToGroupBy)
}

// Rewrite implements the logicalPlan interface
Expand Down
Loading