diff --git a/go/test/endtoend/cluster/cluster_process.go b/go/test/endtoend/cluster/cluster_process.go index 6afffa344d7..dfcfcbc4947 100644 --- a/go/test/endtoend/cluster/cluster_process.go +++ b/go/test/endtoend/cluster/cluster_process.go @@ -705,7 +705,7 @@ func (cluster *LocalProcessCluster) NewVtgateInstance() *VtgateProcess { cluster.Cell, cluster.Cell, cluster.Hostname, - "PRIMARY,REPLICA", + "PRIMARY", cluster.TopoProcess.Port, cluster.TmpDirectory, cluster.VtGateExtraArgs, diff --git a/go/test/endtoend/utils/utils.go b/go/test/endtoend/utils/utils.go index 45ddce217b1..2e44292504d 100644 --- a/go/test/endtoend/utils/utils.go +++ b/go/test/endtoend/utils/utils.go @@ -220,7 +220,7 @@ func AssertMatchesWithTimeout(t *testing.T, conn *mysql.Conn, query, expected st } // WaitForAuthoritative waits for a table to become authoritative -func WaitForAuthoritative(t *testing.T, vtgateProcess cluster.VtgateProcess, ks, tbl string) error { +func WaitForAuthoritative(t *testing.T, ks, tbl string, readVSchema func() (*interface{}, error)) error { timeout := time.After(10 * time.Second) for { select { @@ -228,7 +228,7 @@ func WaitForAuthoritative(t *testing.T, vtgateProcess cluster.VtgateProcess, ks, return fmt.Errorf("schema tracking didn't mark table t2 as authoritative until timeout") default: time.Sleep(1 * time.Second) - res, err := vtgateProcess.ReadVSchema() + res, err := readVSchema() require.NoError(t, err, res) t2Map := getTableT2Map(res, ks, tbl) authoritative, fieldPresent := t2Map["column_list_authoritative"] diff --git a/go/test/endtoend/vtgate/gen4/gen4_test.go b/go/test/endtoend/vtgate/gen4/gen4_test.go index c1521012909..77b7daea05f 100644 --- a/go/test/endtoend/vtgate/gen4/gen4_test.go +++ b/go/test/endtoend/vtgate/gen4/gen4_test.go @@ -430,9 +430,9 @@ func TestOuterJoin(t *testing.T) { } func TestUsingJoin(t *testing.T) { - require.NoError(t, utils.WaitForAuthoritative(t, clusterInstance.VtgateProcess, shardedKs, "t1")) - require.NoError(t, 
utils.WaitForAuthoritative(t, clusterInstance.VtgateProcess, shardedKs, "t2")) - require.NoError(t, utils.WaitForAuthoritative(t, clusterInstance.VtgateProcess, shardedKs, "t3")) + require.NoError(t, utils.WaitForAuthoritative(t, shardedKs, "t1", clusterInstance.VtgateProcess.ReadVSchema)) + require.NoError(t, utils.WaitForAuthoritative(t, shardedKs, "t2", clusterInstance.VtgateProcess.ReadVSchema)) + require.NoError(t, utils.WaitForAuthoritative(t, shardedKs, "t3", clusterInstance.VtgateProcess.ReadVSchema)) mcmp, closer := start(t) defer closer() diff --git a/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go b/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go index 45f5eeb5164..a714550fb50 100644 --- a/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go +++ b/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go @@ -33,7 +33,7 @@ func start(t *testing.T) (utils.MySQLCompare, func()) { deleteAll := func() { _, _ = utils.ExecAllowError(t, mcmp.VtConn, "set workload = oltp") - tables := []string{"t9", "aggr_test", "t3", "t7_xxhash", "aggr_test_dates", "t7_xxhash_idx", "t1", "t2"} + tables := []string{"t9", "aggr_test", "t3", "t7_xxhash", "aggr_test_dates", "t7_xxhash_idx", "t1", "t2", "t10"} for _, table := range tables { _, _ = mcmp.ExecAndIgnore("delete from " + table) } @@ -81,20 +81,6 @@ func TestGroupBy(t *testing.T) { mcmp.AssertMatches("select /*vt+ PLANNER=Gen4 */ id6+id7, count(*) k from t3 group by id6+id7 order by k", `[[INT64(9) INT64(1)] [INT64(6) INT64(2)] [INT64(3) INT64(3)]]`) } -func TestDistinct(t *testing.T) { - mcmp, closer := start(t) - defer closer() - mcmp.Exec("insert into t3(id5,id6,id7) values(1,3,3), (2,3,4), (3,3,6), (4,5,7), (5,5,6)") - mcmp.Exec("insert into t7_xxhash(uid,phone) values('1',4), ('2',4), ('3',3), ('4',1), ('5',1)") - mcmp.Exec("insert into aggr_test(id, val1, val2) values(1,'a',1), (2,'A',1), (3,'b',1), (4,'c',3), (5,'c',4)") - mcmp.Exec("insert into aggr_test(id, val1, 
val2) values(6,'d',null), (7,'e',null), (8,'E',1)") - mcmp.AssertMatches("select distinct val2, count(*) from aggr_test group by val2", `[[NULL INT64(2)] [INT64(1) INT64(4)] [INT64(3) INT64(1)] [INT64(4) INT64(1)]]`) - mcmp.AssertMatches("select distinct id6 from t3 join t7_xxhash on t3.id5 = t7_xxhash.phone", `[[INT64(3)] [INT64(5)]]`) - mcmp.Exec("delete from t3") - mcmp.Exec("delete from t7_xxhash") - mcmp.Exec("delete from aggr_test") -} - func TestEqualFilterOnScatter(t *testing.T) { mcmp, closer := start(t) defer closer() @@ -438,3 +424,31 @@ func TestAggregationRandomOnAnAggregatedValue(t *testing.T) { mcmp.AssertMatchesNoOrder("select /*vt+ PLANNER=gen4 */ A.a, A.b, (A.a / A.b) as d from (select sum(a) as a, sum(b) as b from t10 where a = 100) A;", `[[DECIMAL(100) DECIMAL(10) DECIMAL(10.0000)]]`) } + +func TestBuggyQueries(t *testing.T) { + // These queries have been found to be producing the wrong results by the query fuzzer + // Adding them as end2end tests to make sure we never get them wrong again + mcmp, closer := start(t) + defer closer() + + mcmp.Exec("insert into t10(k, a, b) values (0, 100, 10), (10, 200, 20), (20, null, null)") + + mcmp.AssertMatches("select /*vt+ PLANNER=Gen4 */ sum(t1.a) from t10 as t1, t10 as t2", + `[[DECIMAL(900)]]`) + + mcmp.AssertMatches("select /*vt+ PLANNER=gen4 */t1.a, sum(t1.a), count(*), t1.a, sum(t1.a), count(*) from t10 as t1, t10 as t2 group by t1.a", + "[[NULL NULL INT64(3) NULL NULL INT64(3)] "+ + "[INT32(100) DECIMAL(300) INT64(3) INT32(100) DECIMAL(300) INT64(3)] "+ + "[INT32(200) DECIMAL(600) INT64(3) INT32(200) DECIMAL(600) INT64(3)]]") +} + +func TestMinMaxAcrossJoins(t *testing.T) { + mcmp, closer := start(t) + defer closer() + mcmp.Exec("insert into t1(t1_id, name, value, shardKey) values (1, 'name 1', 'value 1', 1), (2, 'name 2', 'value 2', 2)") + mcmp.Exec("insert into t2(id, shardKey) values (1, 10), (2, 20)") + + mcmp.AssertMatchesNoOrder( + `SELECT /*vt+ PLANNER=gen4 */ t1.name, max(t1.shardKey), 
t2.shardKey, min(t2.id) FROM t1 JOIN t2 ON t1.t1_id != t2.shardKey GROUP BY t1.name, t2.shardKey`, + `[[VARCHAR("name 2") INT64(2) INT64(10) INT64(1)] [VARCHAR("name 1") INT64(1) INT64(10) INT64(1)] [VARCHAR("name 2") INT64(2) INT64(20) INT64(2)] [VARCHAR("name 1") INT64(1) INT64(20) INT64(2)]]`) +} diff --git a/go/test/endtoend/vtgate/queries/aggregation/distinct_test.go b/go/test/endtoend/vtgate/queries/aggregation/distinct_test.go new file mode 100644 index 00000000000..a09808bbf47 --- /dev/null +++ b/go/test/endtoend/vtgate/queries/aggregation/distinct_test.go @@ -0,0 +1,54 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package aggregation + +import ( + "testing" + + "vitess.io/vitess/go/test/endtoend/utils" +) + +func TestDistinct(t *testing.T) { + mcmp, closer := start(t) + defer closer() + mcmp.Exec("insert into t3(id5,id6,id7) values(1,3,3), (2,3,4), (3,3,6), (4,5,7), (5,5,6)") + mcmp.Exec("insert into t7_xxhash(uid,phone) values('1',4), ('2',4), ('3',3), ('4',1), ('5',1)") + mcmp.Exec("insert into aggr_test(id, val1, val2) values(1,'a',1), (2,'A',1), (3,'b',1), (4,'c',3), (5,'c',4)") + mcmp.Exec("insert into aggr_test(id, val1, val2) values(6,'d',null), (7,'e',null), (8,'E',1)") + mcmp.AssertMatches("select distinct val2, count(*) from aggr_test group by val2", `[[NULL INT64(2)] [INT64(1) INT64(4)] [INT64(3) INT64(1)] [INT64(4) INT64(1)]]`) + mcmp.AssertMatches("select distinct id6 from t3 join t7_xxhash on t3.id5 = t7_xxhash.phone", `[[INT64(3)] [INT64(5)]]`) +} + +func TestDistinctIt(t *testing.T) { + // tests more variations of DISTINCT + mcmp, closer := start(t) + defer closer() + + mcmp.Exec("insert into aggr_test(id, val1, val2) values(1,'a',1), (2,'A',1), (3,'b',1), (4,'c',3), (5,'c',4)") + mcmp.Exec("insert into aggr_test(id, val1, val2) values(6,'d',null), (7,'e',null), (8,'E',1)") + + mcmp.AssertMatchesNoOrder("select distinct val1 from aggr_test", `[[VARCHAR("c")] [VARCHAR("d")] [VARCHAR("e")] [VARCHAR("a")] [VARCHAR("b")]]`) + mcmp.AssertMatchesNoOrder("select distinct val2 from aggr_test", `[[INT64(1)] [INT64(4)] [INT64(3)] [NULL]]`) + mcmp.AssertMatchesNoOrder("select distinct id from aggr_test", `[[INT64(1)] [INT64(2)] [INT64(3)] [INT64(5)] [INT64(4)] [INT64(6)] [INT64(7)] [INT64(8)]]`) + + if utils.BinaryIsAtVersion(17, "vtgate") { + mcmp.AssertMatches("select /*vt+ PLANNER=Gen4 */ distinct val1 from aggr_test order by val1 desc", `[[VARCHAR("e")] [VARCHAR("d")] [VARCHAR("c")] [VARCHAR("b")] [VARCHAR("a")]]`) + mcmp.AssertMatchesNoOrder("select /*vt+ PLANNER=Gen4 */ distinct val1, count(*) from aggr_test group by val1", `[[VARCHAR("a") INT64(2)] 
[VARCHAR("b") INT64(1)] [VARCHAR("c") INT64(2)] [VARCHAR("d") INT64(1)] [VARCHAR("e") INT64(2)]]`) + mcmp.AssertMatchesNoOrder("select /*vt+ PLANNER=Gen4 */ distinct val1+val2 from aggr_test", `[[NULL] [FLOAT64(1)] [FLOAT64(3)] [FLOAT64(4)]]`) + mcmp.AssertMatchesNoOrder("select /*vt+ PLANNER=Gen4 */ distinct count(*) from aggr_test group by val1", `[[INT64(2)] [INT64(1)]]`) + } +} diff --git a/go/test/endtoend/vtgate/queries/foundrows/found_rows_test.go b/go/test/endtoend/vtgate/queries/foundrows/found_rows_test.go index 6c56bd19991..f52e2eff532 100644 --- a/go/test/endtoend/vtgate/queries/foundrows/found_rows_test.go +++ b/go/test/endtoend/vtgate/queries/foundrows/found_rows_test.go @@ -44,7 +44,7 @@ func TestFoundRows(t *testing.T) { // Wait for schema tracking to run and mark t2 as authoritative before we try out the queries. // Some of the queries depend on schema tracking to run successfully to be able to replace the StarExpr // in the select clause with the definitive column list. 
- err = utils.WaitForAuthoritative(t, clusterInstance.VtgateProcess, keyspaceName, "t2") + err = utils.WaitForAuthoritative(t, keyspaceName, "t2", clusterInstance.VtgateProcess.ReadVSchema) require.NoError(t, err) runTests := func(workload string) { mcmp.AssertFoundRowsValue("select * from t2", workload, 5) diff --git a/go/vt/vtgate/endtoend/last_insert_id_test.go b/go/vt/vtgate/endtoend/last_insert_id_test.go index 9a467c089b8..6d841fadd07 100644 --- a/go/vt/vtgate/endtoend/last_insert_id_test.go +++ b/go/vt/vtgate/endtoend/last_insert_id_test.go @@ -21,15 +21,18 @@ import ( "fmt" "testing" - "vitess.io/vitess/go/vt/vtgate/evalengine" - "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/test/endtoend/utils" + "vitess.io/vitess/go/vt/vtgate/evalengine" ) func TestLastInsertId(t *testing.T) { + require.NoError(t, + utils.WaitForAuthoritative(t, "ks", "t1_last_insert_id", cluster.VTProcess().ReadVSchema)) + ctx := context.Background() conn, err := mysql.Connect(ctx, &vtParams) require.NoError(t, err) @@ -53,6 +56,9 @@ func TestLastInsertId(t *testing.T) { } func TestLastInsertIdWithRollback(t *testing.T) { + require.NoError(t, + utils.WaitForAuthoritative(t, "ks", "t1_last_insert_id", cluster.VTProcess().ReadVSchema)) + ctx := context.Background() conn, err := mysql.Connect(ctx, &vtParams) require.NoError(t, err) diff --git a/go/vt/vtgate/endtoend/main_test.go b/go/vt/vtgate/endtoend/main_test.go index 1bf13ceadb5..08aae25420e 100644 --- a/go/vt/vtgate/endtoend/main_test.go +++ b/go/vt/vtgate/endtoend/main_test.go @@ -18,6 +18,7 @@ package endtoend import ( "context" + _ "embed" "fmt" "os" "testing" @@ -25,10 +26,9 @@ import ( _flag "vitess.io/vitess/go/internal/flag" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" - "vitess.io/vitess/go/vt/vttest" - vschemapb "vitess.io/vitess/go/vt/proto/vschema" vttestpb "vitess.io/vitess/go/vt/proto/vttest" + "vitess.io/vitess/go/vt/vttest" ) var ( @@ 
-37,82 +37,8 @@ var ( mysqlParams mysql.ConnParams grpcAddress string - schema = ` -create table t1( - id1 bigint, - id2 bigint, - primary key(id1) -) Engine=InnoDB; - -create table t1_copy_basic( - id1 bigint, - id2 bigint, - primary key(id1) -) Engine=InnoDB; - -create table t1_copy_all( - id1 bigint, - id2 bigint, - primary key(id1) -) Engine=InnoDB; - -create table t1_copy_resume( - id1 bigint, - id2 bigint, - primary key(id1) -) Engine=InnoDB; - -create table t1_id2_idx( - id2 bigint, - keyspace_id varbinary(10), - primary key(id2) -) Engine=InnoDB; - -create table vstream_test( - id bigint, - val bigint, - primary key(id) -) Engine=InnoDB; - -create table aggr_test( - id bigint, - val1 varchar(16), - val2 bigint, - primary key(id) -) Engine=InnoDB; - -create table t2( - id3 bigint, - id4 bigint, - primary key(id3) -) Engine=InnoDB; - -create table t2_id4_idx( - id bigint not null auto_increment, - id4 bigint, - id3 bigint, - primary key(id), - key idx_id4(id4) -) Engine=InnoDB; - -create table t1_last_insert_id( - id bigint not null auto_increment, - id1 bigint, - primary key(id) -) Engine=InnoDB; - -create table t1_row_count( - id bigint not null, - id1 bigint, - primary key(id) -) Engine=InnoDB; - -create table t1_sharded( - id1 bigint, - id2 bigint, - primary key(id1) -) Engine=InnoDB; -` + //go:embed schema.sql + Schema string vschema = &vschemapb.Keyspace{ Sharded: true, @@ -281,7 +207,7 @@ func TestMain(m *testing.M) { }, }, } - if err := cfg.InitSchemas("ks", schema, vschema); err != nil { + if err := cfg.InitSchemas("ks", Schema, vschema); err != nil { fmt.Fprintf(os.Stderr, "%v\n", err) os.RemoveAll(cfg.SchemaDir) return 1 diff --git a/go/vt/vtgate/endtoend/schema.sql b/go/vt/vtgate/endtoend/schema.sql new file mode 100644 index 00000000000..5fb1f52224f --- /dev/null +++ b/go/vt/vtgate/endtoend/schema.sql @@ -0,0 +1,74 @@ +create table t1( + id1 bigint, + id2 bigint, + primary key(id1) +) Engine=InnoDB; + +create table t1_copy_basic( + id1 bigint, + 
id2 bigint, + primary key(id1) +) Engine=InnoDB; + +create table t1_copy_all( + id1 bigint, + id2 bigint, + primary key(id1) +) Engine=InnoDB; + +create table t1_copy_resume( + id1 bigint, + id2 bigint, + primary key(id1) +) Engine=InnoDB; + +create table t1_id2_idx( + id2 bigint, + keyspace_id varbinary(10), + primary key(id2) +) Engine=InnoDB; + +create table vstream_test( + id bigint, + val bigint, + primary key(id) +) Engine=InnoDB; + +create table aggr_test( + id bigint, + val1 varchar(16), + val2 bigint, + primary key(id) +) Engine=InnoDB; + +create table t2( + id3 bigint, + id4 bigint, + primary key(id3) +) Engine=InnoDB; + +create table t2_id4_idx( + id bigint not null auto_increment, + id4 bigint, + id3 bigint, + primary key(id), + key idx_id4(id4) +) Engine=InnoDB; + +create table t1_last_insert_id( + id bigint not null auto_increment, + id1 bigint, + primary key(id) +) Engine=InnoDB; + +create table t1_row_count( + id bigint not null, + id1 bigint, + primary key(id) +) Engine=InnoDB; + +create table t1_sharded( + id1 bigint, + id2 bigint, + primary key(id1) +) Engine=InnoDB; diff --git a/go/vt/vtgate/engine/cached_size.go b/go/vt/vtgate/engine/cached_size.go index 9370ea42ca7..0c8d2d18426 100644 --- a/go/vt/vtgate/engine/cached_size.go +++ b/go/vt/vtgate/engine/cached_size.go @@ -588,8 +588,6 @@ func (cached *OnlineDDL) CachedSize(alloc bool) int64 { } return size } - -//go:nocheckptr func (cached *OrderedAggregate) CachedSize(alloc bool) int64 { if cached == nil { return int64(0) @@ -612,17 +610,6 @@ func (cached *OrderedAggregate) CachedSize(alloc bool) int64 { size += elem.CachedSize(true) } } - // field Collations map[int]vitess.io/vitess/go/mysql/collations.ID - if cached.Collations != nil { - size += int64(48) - hmap := reflect.ValueOf(cached.Collations) - numBuckets := int(math.Pow(2, float64((*(*uint8)(unsafe.Pointer(hmap.Pointer() + uintptr(9))))))) - numOldBuckets := (*(*uint16)(unsafe.Pointer(hmap.Pointer() + uintptr(10)))) - size += 
hack.RuntimeAllocSize(int64(numOldBuckets * 96)) - if len(cached.Collations) > 0 || numBuckets > 1 { - size += hack.RuntimeAllocSize(int64(numBuckets * 96)) - } - } // field Input vitess.io/vitess/go/vt/vtgate/engine.Primitive if cc, ok := cached.Input.(cachedObject); ok { size += cc.CachedSize(true) @@ -898,8 +885,6 @@ func (cached *SQLCalcFoundRows) CachedSize(alloc bool) int64 { } return size } - -//go:nocheckptr func (cached *ScalarAggregate) CachedSize(alloc bool) int64 { if cached == nil { return int64(0) @@ -915,17 +900,6 @@ func (cached *ScalarAggregate) CachedSize(alloc bool) int64 { size += elem.CachedSize(true) } } - // field Collations map[int]vitess.io/vitess/go/mysql/collations.ID - if cached.Collations != nil { - size += int64(48) - hmap := reflect.ValueOf(cached.Collations) - numBuckets := int(math.Pow(2, float64((*(*uint8)(unsafe.Pointer(hmap.Pointer() + uintptr(9))))))) - numOldBuckets := (*(*uint16)(unsafe.Pointer(hmap.Pointer() + uintptr(10)))) - size += hack.RuntimeAllocSize(int64(numOldBuckets * 96)) - if len(cached.Collations) > 0 || numBuckets > 1 { - size += hack.RuntimeAllocSize(int64(numBuckets * 96)) - } - } // field Input vitess.io/vitess/go/vt/vtgate/engine.Primitive if cc, ok := cached.Input.(cachedObject); ok { size += cc.CachedSize(true) diff --git a/go/vt/vtgate/engine/distinct.go b/go/vt/vtgate/engine/distinct.go index bc3634d630b..fcd74ce5eeb 100644 --- a/go/vt/vtgate/engine/distinct.go +++ b/go/vt/vtgate/engine/distinct.go @@ -35,7 +35,7 @@ type ( Distinct struct { Source Primitive CheckCols []CheckCol - Truncate bool + Truncate int } CheckCol struct { Col int @@ -189,8 +189,8 @@ func (d *Distinct) TryExecute(ctx context.Context, vcursor VCursor, bindVars map result.Rows = append(result.Rows, row) } } - if d.Truncate { - return result.Truncate(len(d.CheckCols)), nil + if d.Truncate > 0 { + return result.Truncate(d.Truncate), nil } return result, err } @@ -260,8 +260,8 @@ func (d *Distinct) description() PrimitiveDescription { 
other["Collations"] = colls } - if d.Truncate { - other["ResultColumns"] = len(d.CheckCols) + if d.Truncate > 0 { + other["ResultColumns"] = d.Truncate } return PrimitiveDescription{ Other: other, diff --git a/go/vt/vtgate/engine/distinct_test.go b/go/vt/vtgate/engine/distinct_test.go index 5e39d2c4425..2c379da40e9 100644 --- a/go/vt/vtgate/engine/distinct_test.go +++ b/go/vt/vtgate/engine/distinct_test.go @@ -96,7 +96,6 @@ func TestDistinct(t *testing.T) { distinct := &Distinct{ Source: &fakePrimitive{results: []*sqltypes.Result{tc.inputs}}, CheckCols: checkCols, - Truncate: false, } qr, err := distinct.TryExecute(context.Background(), &noopVCursor{}, nil, true) @@ -145,7 +144,7 @@ func TestWeightStringFallBack(t *testing.T) { distinct := &Distinct{ Source: &fakePrimitive{results: []*sqltypes.Result{input}}, CheckCols: checkCols, - Truncate: true, + Truncate: 1, } qr, err := distinct.TryExecute(context.Background(), &noopVCursor{}, nil, true) diff --git a/go/vt/vtgate/engine/ordered_aggregate.go b/go/vt/vtgate/engine/ordered_aggregate.go index 4b7559ca740..ee7fb40e8aa 100644 --- a/go/vt/vtgate/engine/ordered_aggregate.go +++ b/go/vt/vtgate/engine/ordered_aggregate.go @@ -66,10 +66,6 @@ type OrderedAggregate struct { // from the result received. If 0, no truncation happens. TruncateColumnCount int `json:",omitempty"` - // Collations stores the collation ID per column offset. - // It is used for grouping keys and distinct aggregate functions - Collations map[int]collations.ID - // Input is the primitive that will feed into this Primitive. 
Input Primitive } @@ -193,13 +189,13 @@ func (oa *OrderedAggregate) execute(ctx context.Context, vcursor VCursor, bindVa current, curDistincts = convertRow(row, oa.PreProcess, oa.Aggregates, oa.AggrOnEngine) continue } - equal, err := oa.keysEqual(current, row, oa.Collations) + equal, err := oa.keysEqual(current, row) if err != nil { return nil, err } if equal { - current, curDistincts, err = merge(result.Fields, current, row, curDistincts, oa.Collations, oa.Aggregates) + current, curDistincts, err = merge(result.Fields, current, row, curDistincts, oa.Aggregates) if err != nil { return nil, err } @@ -243,13 +239,13 @@ func (oa *OrderedAggregate) TryStreamExecute(ctx context.Context, vcursor VCurso continue } - equal, err := oa.keysEqual(current, row, oa.Collations) + equal, err := oa.keysEqual(current, row) if err != nil { return err } if equal { - current, curDistincts, err = merge(fields, current, row, curDistincts, oa.Collations, oa.Aggregates) + current, curDistincts, err = merge(fields, current, row, curDistincts, oa.Aggregates) if err != nil { return err } @@ -380,17 +376,17 @@ func (oa *OrderedAggregate) NeedsTransaction() bool { return oa.Input.NeedsTransaction() } -func (oa *OrderedAggregate) keysEqual(row1, row2 []sqltypes.Value, colls map[int]collations.ID) (bool, error) { - for _, key := range oa.GroupByKeys { - cmp, err := evalengine.NullsafeCompare(row1[key.KeyCol], row2[key.KeyCol], colls[key.KeyCol]) +func (oa *OrderedAggregate) keysEqual(row1, row2 []sqltypes.Value) (bool, error) { + for _, gb := range oa.GroupByKeys { + cmp, err := evalengine.NullsafeCompare(row1[gb.KeyCol], row2[gb.KeyCol], gb.CollationID) if err != nil { _, isComparisonErr := err.(evalengine.UnsupportedComparisonError) _, isCollationErr := err.(evalengine.UnsupportedCollationError) - if !isComparisonErr && !isCollationErr || key.WeightStringCol == -1 { + if !isComparisonErr && !isCollationErr || gb.WeightStringCol == -1 { return false, err } - key.KeyCol = key.WeightStringCol - 
cmp, err = evalengine.NullsafeCompare(row1[key.WeightStringCol], row2[key.WeightStringCol], colls[key.KeyCol]) + gb.KeyCol = gb.WeightStringCol + cmp, err = evalengine.NullsafeCompare(row1[gb.WeightStringCol], row2[gb.WeightStringCol], gb.CollationID) if err != nil { return false, err } @@ -406,7 +402,6 @@ func merge( fields []*querypb.Field, row1, row2 []sqltypes.Value, curDistincts []sqltypes.Value, - colls map[int]collations.ID, aggregates []*AggregateParams, ) ([]sqltypes.Value, []sqltypes.Value, error) { result := sqltypes.CopyRow(row1) @@ -415,7 +410,7 @@ func merge( if row2[aggr.KeyCol].IsNull() { continue } - cmp, err := evalengine.NullsafeCompare(curDistincts[index], row2[aggr.KeyCol], colls[aggr.KeyCol]) + cmp, err := evalengine.NullsafeCompare(curDistincts[index], row2[aggr.KeyCol], aggr.CollationID) if err != nil { return nil, nil, err } @@ -444,9 +439,9 @@ func merge( } result[aggr.Col], err = evalengine.NullSafeAdd(value, v2, fields[aggr.Col].Type) case AggregateMin: - result[aggr.Col], err = evalengine.Min(row1[aggr.Col], row2[aggr.Col], colls[aggr.Col]) + result[aggr.Col], err = evalengine.Min(row1[aggr.Col], row2[aggr.Col], aggr.CollationID) case AggregateMax: - result[aggr.Col], err = evalengine.Max(row1[aggr.Col], row2[aggr.Col], colls[aggr.Col]) + result[aggr.Col], err = evalengine.Max(row1[aggr.Col], row2[aggr.Col], aggr.CollationID) case AggregateCountDistinct: result[aggr.Col], err = evalengine.NullSafeAdd(row1[aggr.Col], countOne, OpcodeType[aggr.Opcode]) case AggregateSumDistinct: diff --git a/go/vt/vtgate/engine/ordered_aggregate_test.go b/go/vt/vtgate/engine/ordered_aggregate_test.go index c522cd03f69..e915c6fd877 100644 --- a/go/vt/vtgate/engine/ordered_aggregate_test.go +++ b/go/vt/vtgate/engine/ordered_aggregate_test.go @@ -668,13 +668,13 @@ func TestMerge(t *testing.T) { "1|3|2.8|2|bc", ) - merged, _, err := merge(fields, r.Rows[0], r.Rows[1], nil, nil, oa.Aggregates) + merged, _, err := merge(fields, r.Rows[0], r.Rows[1], nil, 
oa.Aggregates) assert.NoError(err) want := sqltypes.MakeTestResult(fields, "1|5|6.0|2|bc").Rows[0] assert.Equal(want, merged) // swap and retry - merged, _, err = merge(fields, r.Rows[1], r.Rows[0], nil, nil, oa.Aggregates) + merged, _, err = merge(fields, r.Rows[1], r.Rows[0], nil, oa.Aggregates) assert.NoError(err) assert.Equal(want, merged) } @@ -1018,7 +1018,6 @@ func TestOrderedAggregateCollate(t *testing.T) { }}, GroupByKeys: []*GroupByParams{{KeyCol: 0, CollationID: collationID}}, Input: fp, - Collations: map[int]collations.ID{0: collationID}, } result, err := oa.TryExecute(context.Background(), &noopVCursor{}, nil, false) @@ -1060,7 +1059,6 @@ func TestOrderedAggregateCollateAS(t *testing.T) { Col: 1, }}, GroupByKeys: []*GroupByParams{{KeyCol: 0, CollationID: collationID}}, - Collations: map[int]collations.ID{0: collationID}, Input: fp, } @@ -1105,7 +1103,6 @@ func TestOrderedAggregateCollateKS(t *testing.T) { Col: 1, }}, GroupByKeys: []*GroupByParams{{KeyCol: 0, CollationID: collationID}}, - Collations: map[int]collations.ID{0: collationID}, Input: fp, } diff --git a/go/vt/vtgate/engine/scalar_aggregation.go b/go/vt/vtgate/engine/scalar_aggregation.go index a0cf09bed9d..7e3e5a391eb 100644 --- a/go/vt/vtgate/engine/scalar_aggregation.go +++ b/go/vt/vtgate/engine/scalar_aggregation.go @@ -20,7 +20,6 @@ import ( "context" "sync" - "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/sqltypes" querypb "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/proto/vtrpc" @@ -46,10 +45,6 @@ type ScalarAggregate struct { // from the result received. If 0, no truncation happens. TruncateColumnCount int `json:",omitempty"` - // Collations stores the collation ID per column offset. - // It is used for grouping keys and distinct aggregate functions - Collations map[int]collations.ID - // Input is the primitive that will feed into this Primitive. 
Input Primitive } @@ -102,7 +97,7 @@ func (sa *ScalarAggregate) TryExecute(ctx context.Context, vcursor VCursor, bind resultRow, curDistincts = convertRow(row, sa.PreProcess, sa.Aggregates, sa.AggrOnEngine) continue } - resultRow, curDistincts, err = merge(result.Fields, resultRow, row, curDistincts, sa.Collations, sa.Aggregates) + resultRow, curDistincts, err = merge(result.Fields, resultRow, row, curDistincts, sa.Aggregates) if err != nil { return nil, err } @@ -158,7 +153,7 @@ func (sa *ScalarAggregate) TryStreamExecute(ctx context.Context, vcursor VCursor continue } var err error - current, curDistincts, err = merge(fields, current, row, curDistincts, sa.Collations, sa.Aggregates) + current, curDistincts, err = merge(fields, current, row, curDistincts, sa.Aggregates) if err != nil { return err } diff --git a/go/vt/vtgate/planbuilder/aggregation_pushing.go b/go/vt/vtgate/planbuilder/aggregation_pushing.go index a878e039b91..fd5c6d84735 100644 --- a/go/vt/vtgate/planbuilder/aggregation_pushing.go +++ b/go/vt/vtgate/planbuilder/aggregation_pushing.go @@ -228,12 +228,8 @@ func addAggregationToSelect(ctx *plancontext.PlanningContext, sel *sqlparser.Sel func countStarAggr() *operators.Aggr { f := &sqlparser.CountStar{} - - return &operators.Aggr{ - Original: &sqlparser.AliasedExpr{Expr: f}, - OpCode: popcode.AggregateCountStar, - Alias: "count(*)", - } + aggr := operators.NewAggr(popcode.AggregateCountStar, f, &sqlparser.AliasedExpr{Expr: f}, "count(*)") + return &aggr } /* diff --git a/go/vt/vtgate/planbuilder/distinct.go b/go/vt/vtgate/planbuilder/distinct.go index 98e6b550b8b..dff6370078e 100644 --- a/go/vt/vtgate/planbuilder/distinct.go +++ b/go/vt/vtgate/planbuilder/distinct.go @@ -27,10 +27,21 @@ var _ logicalPlan = (*distinct)(nil) type distinct struct { logicalPlanCommon checkCols []engine.CheckCol + truncateColumn int + + // needToTruncate is the old way to check weight_string column and set truncation. 
needToTruncate bool } -func newDistinct(source logicalPlan, checkCols []engine.CheckCol, needToTruncate bool) logicalPlan { +func newDistinct(source logicalPlan, checkCols []engine.CheckCol, truncateColumn int) logicalPlan { + return &distinct{ + logicalPlanCommon: newBuilderCommon(source), + checkCols: checkCols, + truncateColumn: truncateColumn, + } +} + +func newDistinctGen4Legacy(source logicalPlan, checkCols []engine.CheckCol, needToTruncate bool) logicalPlan { return &distinct{ logicalPlanCommon: newBuilderCommon(source), checkCols: checkCols, @@ -47,14 +58,19 @@ func (d *distinct) Primitive() engine.Primitive { // If we are missing the checkCols information, we are on the V3 planner and should produce a V3 Distinct return &engine.DistinctV3{Source: d.input.Primitive()} } - truncate := false + + truncate := d.truncateColumn if d.needToTruncate { + wsColFound := false for _, col := range d.checkCols { if col.WsCol != nil { - truncate = true + wsColFound = true break } } + if wsColFound { + truncate = len(d.checkCols) + } } return &engine.Distinct{ Source: d.input.Primitive(), diff --git a/go/vt/vtgate/planbuilder/operator_transformers.go b/go/vt/vtgate/planbuilder/operator_transformers.go index 8da42d3b585..cde1b756a9a 100644 --- a/go/vt/vtgate/planbuilder/operator_transformers.go +++ b/go/vt/vtgate/planbuilder/operator_transformers.go @@ -65,6 +65,8 @@ func transformToLogicalPlan(ctx *plancontext.PlanningContext, op ops.Operator, i return transformOrdering(ctx, op) case *operators.Aggregator: return transformAggregator(ctx, op) + case *operators.Distinct: + return transformDistinct(ctx, op) } return nil, vterrors.VT13001(fmt.Sprintf("unknown type encountered: %T (transformToLogicalPlan)", op)) @@ -88,12 +90,14 @@ func transformAggregator(ctx *plancontext.PlanningContext, op *operators.Aggrega return nil, vterrors.VT12001(fmt.Sprintf("in scatter query: aggregation function '%s'", sqlparser.String(aggr.Original))) } oa.aggregates = append(oa.aggregates, 
&engine.AggregateParams{ - Opcode: aggr.OpCode, - Col: aggr.ColOffset, - Alias: aggr.Alias, - Expr: aggr.Func, - Original: aggr.Original, - OrigOpcode: aggr.OriginalOpCode, + Opcode: aggr.OpCode, + Col: aggr.ColOffset, + Alias: aggr.Alias, + Expr: aggr.Func, + Original: aggr.Original, + OrigOpcode: aggr.OriginalOpCode, + WCol: aggr.WSOffset, + CollationID: aggr.GetCollation(ctx), }) } for _, groupBy := range op.Grouping { @@ -112,6 +116,14 @@ func transformAggregator(ctx *plancontext.PlanningContext, op *operators.Aggrega return oa, nil } +func transformDistinct(ctx *plancontext.PlanningContext, op *operators.Distinct) (logicalPlan, error) { + src, err := transformToLogicalPlan(ctx, op.Source, false) + if err != nil { + return nil, err + } + return newDistinct(src, op.Columns, op.Truncate), nil +} + func transformOrdering(ctx *plancontext.PlanningContext, op *operators.Ordering) (logicalPlan, error) { plan, err := transformToLogicalPlan(ctx, op.Source, false) if err != nil { @@ -673,7 +685,7 @@ func transformUnionPlan(ctx *plancontext.PlanningContext, op *operators.Union, i if err != nil { return nil, err } - return newDistinct(result, checkCols, isRoot), nil + return newDistinctGen4Legacy(result, checkCols, isRoot), nil } return result, nil diff --git a/go/vt/vtgate/planbuilder/operators/aggregation_pushing.go b/go/vt/vtgate/planbuilder/operators/aggregation_pushing.go index 1725d45612f..75ecd5287c0 100644 --- a/go/vt/vtgate/planbuilder/operators/aggregation_pushing.go +++ b/go/vt/vtgate/planbuilder/operators/aggregation_pushing.go @@ -41,10 +41,6 @@ func tryPushingDownAggregator(ctx *plancontext.PlanningContext, aggregator *Aggr case *Filter: output, applyResult, err = pushDownAggregationThroughFilter(ctx, aggregator, src) default: - if aggregator.Original { - err = vterrors.VT12001(fmt.Sprintf("using aggregation on top of a %T plan", src)) - return - } return aggregator, rewrite.SameTree, nil } @@ -121,7 +117,7 @@ withNextColumn: continue withNextColumn } } - 
pushedAggr.addNoPushCol(aeWrap(col), true) + pushedAggr.addColumnWithoutPushing(aeWrap(col), true) } // Set the source of the filter to the new aggregator placed below the route. @@ -353,34 +349,28 @@ func splitAggrColumnsToLeftAndRight( outer: // we prefer adding the aggregations in the same order as the columns are declared for colIdx, col := range aggregator.Columns { - for aggrIdx, aggr := range aggregator.Aggregations { + for _, aggr := range aggregator.Aggregations { if aggr.ColOffset == colIdx { - aggrToKeep, err := builder.handleAggr(ctx, aggr) + err := builder.handleAggr(ctx, aggr) if err != nil { return nil, nil, err } - aggregator.Aggregations[aggrIdx] = aggrToKeep continue outer } } builder.proj.addUnexploredExpr(col, col.Expr) } - if builder.projectionRequired { - return builder.joinColumns, builder.proj, nil - } - - return builder.joinColumns, join, nil + return builder.joinColumns, builder.proj, nil } type ( // aggBuilder is a helper struct that aids in pushing down an Aggregator through a join // it accumulates the projections (if any) that need to be evaluated on top of the join aggBuilder struct { - lhs, rhs *joinPusher - projectionRequired bool - joinColumns []JoinColumn - proj *Projection - outerJoin bool + lhs, rhs *joinPusher + joinColumns []JoinColumn + proj *Projection + outerJoin bool } // joinPusher is a helper struct that aids in pushing down an Aggregator into one side of a Join. // It creates a new Aggregator that is pushed down and keeps track of the column dependencies that the new Aggregator has. 
@@ -424,29 +414,25 @@ func (p *joinPusher) countStar(ctx *plancontext.PlanningContext) (*sqlparser.Ali } cs := &sqlparser.CountStar{} ae := aeWrap(cs) - csAggr := Aggr{ - Original: ae, - Func: cs, - OpCode: opcode.AggregateCountStar, - } + csAggr := NewAggr(opcode.AggregateCountStar, cs, ae, "") expr := p.addAggr(ctx, csAggr) p.csAE = aeWrap(expr) return p.csAE, true } -func (ab *aggBuilder) handleAggr(ctx *plancontext.PlanningContext, aggr Aggr) (Aggr, error) { +func (ab *aggBuilder) handleAggr(ctx *plancontext.PlanningContext, aggr Aggr) error { switch aggr.OpCode { case opcode.AggregateCountStar: - return ab.handleCountStar(ctx, aggr) + ab.handleCountStar(ctx, aggr) + return nil case opcode.AggregateMax, opcode.AggregateMin, opcode.AggregateRandom: return ab.handlePushThroughAggregation(ctx, aggr) - case opcode.AggregateCount: - return ab.handleCount(ctx, aggr) - + case opcode.AggregateCount, opcode.AggregateSum: + return ab.handleAggrWithCountStarMultiplier(ctx, aggr) case opcode.AggregateUnassigned: - return Aggr{}, vterrors.VT12001(fmt.Sprintf("in scatter query: aggregation function '%s'", sqlparser.String(aggr.Original))) + return vterrors.VT12001(fmt.Sprintf("in scatter query: aggregation function '%s'", sqlparser.String(aggr.Original))) default: - return Aggr{}, errHorizonNotPlanned() + return errHorizonNotPlanned() } } @@ -468,30 +454,53 @@ func (ab *aggBuilder) pushThroughRight(aggr Aggr) { }) } -func (ab *aggBuilder) handlePushThroughAggregation(ctx *plancontext.PlanningContext, aggr Aggr) (Aggr, error) { +func (ab *aggBuilder) handlePushThroughAggregation(ctx *plancontext.PlanningContext, aggr Aggr) error { ab.proj.addUnexploredExpr(aggr.Original, aggr.Original.Expr) deps := ctx.SemTable.RecursiveDeps(aggr.Original.Expr) switch { case deps.IsSolvedBy(ab.lhs.tableID): ab.pushThroughLeft(aggr) - return aggr, nil case deps.IsSolvedBy(ab.rhs.tableID): ab.pushThroughRight(aggr) - return aggr, nil default: - return Aggr{}, vterrors.VT12001("aggregation on 
columns from different sources: " + sqlparser.String(aggr.Original.Expr)) + return vterrors.VT12001("aggregation on columns from different sources: " + sqlparser.String(aggr.Original.Expr)) } + return nil } -func (ab *aggBuilder) handleCountStar(ctx *plancontext.PlanningContext, aggr Aggr) (Aggr, error) { - // Projection is necessary since we are going to need to do arithmetics to summarize the aggregates - ab.projectionRequired = true - +func (ab *aggBuilder) handleCountStar(ctx *plancontext.PlanningContext, aggr Aggr) { // Add the aggregate to both sides of the join. lhsAE := ab.leftCountStar(ctx) rhsAE := ab.rightCountStar(ctx) + ab.buildProjectionForAggr(lhsAE, rhsAE, aggr) +} + +func (ab *aggBuilder) handleAggrWithCountStarMultiplier(ctx *plancontext.PlanningContext, aggr Aggr) error { + var lhsAE, rhsAE *sqlparser.AliasedExpr + + deps := ctx.SemTable.RecursiveDeps(aggr.Original.Expr) + switch { + case deps.IsSolvedBy(ab.lhs.tableID): + ab.pushThroughLeft(aggr) + lhsAE = aggr.Original + rhsAE = ab.rightCountStar(ctx) + + case deps.IsSolvedBy(ab.rhs.tableID): + ab.pushThroughRight(aggr) + lhsAE = ab.leftCountStar(ctx) + rhsAE = aggr.Original + + default: + return errHorizonNotPlanned() + } + + ab.buildProjectionForAggr(lhsAE, rhsAE, aggr) + return nil +} + +func (ab *aggBuilder) buildProjectionForAggr(lhsAE *sqlparser.AliasedExpr, rhsAE *sqlparser.AliasedExpr, aggr Aggr) { // We expect the expressions to be different on each side of the join, otherwise it's an error. if lhsAE.Expr == rhsAE.Expr { panic(fmt.Sprintf("Need the two produced expressions to be different. 
%T %T", lhsAE, rhsAE)) @@ -521,45 +530,6 @@ func (ab *aggBuilder) handleCountStar(ctx *plancontext.PlanningContext, aggr Agg } ab.proj.addUnexploredExpr(projAE, projExpr) - return aggr, nil -} - -func (ab *aggBuilder) handleCount(ctx *plancontext.PlanningContext, aggr Aggr) (Aggr, error) { - ab.projectionRequired = true - - expr := aggr.Original.Expr - deps := ctx.SemTable.RecursiveDeps(expr) - var otherSide sqlparser.Expr - - switch { - case deps.IsSolvedBy(ab.lhs.tableID): - ab.pushThroughLeft(aggr) - ae := ab.rightCountStar(ctx) - otherSide = ae.Expr - - case deps.IsSolvedBy(ab.rhs.tableID): - ab.pushThroughRight(aggr) - ae := ab.leftCountStar(ctx) - otherSide = ae.Expr - - default: - return Aggr{}, errHorizonNotPlanned() - } - - if ab.outerJoin { - otherSide = coalesceFunc(otherSide) - } - - projAE := &sqlparser.AliasedExpr{ - Expr: aggr.Original.Expr, - As: sqlparser.NewIdentifierCI(aggr.Original.ColumnName()), - } - ab.proj.addUnexploredExpr(projAE, &sqlparser.BinaryExpr{ - Operator: sqlparser.MultOp, - Left: expr, - Right: otherSide, - }) - return aggr, nil } func coalesceFunc(e sqlparser.Expr) sqlparser.Expr { @@ -592,8 +562,10 @@ func (p *joinPusher) addAggr(ctx *plancontext.PlanningContext, aggr Aggr) sqlpar // pushThroughAggr pushes through an aggregation without changing dependencies. 
// Can be used for aggregations we can push in one piece func (p *joinPusher) pushThroughAggr(aggr Aggr) { - p.pushed.Columns = append(p.pushed.Columns, aggr.Original) - p.pushed.Aggregations = append(p.pushed.Aggregations, aggr) + newAggr := NewAggr(aggr.OpCode, aggr.Func, aggr.Original, aggr.Alias) + newAggr.ColOffset = len(p.pushed.Columns) + p.pushed.Columns = append(p.pushed.Columns, newAggr.Original) + p.pushed.Aggregations = append(p.pushed.Aggregations, newAggr) } // addGrouping creates a copy of the given GroupBy, updates its column offset to point to the correct location in the new Aggregator, @@ -608,6 +580,9 @@ func (p *joinPusher) addGrouping(ctx *plancontext.PlanningContext, gb GroupBy) s if copyGB.ColOffset != -1 { offset := p.useColumn(copyGB.ColOffset) copyGB.ColOffset = offset + } else { + copyGB.ColOffset = len(p.pushed.Columns) + p.pushed.Columns = append(p.pushed.Columns, aeWrap(copyGB.Inner)) } p.pushed.Grouping = append(p.pushed.Grouping, copyGB) return expr diff --git a/go/vt/vtgate/planbuilder/operators/aggregator.go b/go/vt/vtgate/planbuilder/operators/aggregator.go index 33d28ae34ae..c9327b4e384 100644 --- a/go/vt/vtgate/planbuilder/operators/aggregator.go +++ b/go/vt/vtgate/planbuilder/operators/aggregator.go @@ -89,7 +89,7 @@ func (a *Aggregator) AddPredicate(ctx *plancontext.PlanningContext, expr sqlpars return a, nil } -func (a *Aggregator) addNoPushCol(expr *sqlparser.AliasedExpr, addToGroupBy bool) int { +func (a *Aggregator) addColumnWithoutPushing(expr *sqlparser.AliasedExpr, addToGroupBy bool) int { offset := len(a.Columns) a.Columns = append(a.Columns, expr) @@ -98,13 +98,9 @@ func (a *Aggregator) addNoPushCol(expr *sqlparser.AliasedExpr, addToGroupBy bool groupBy.ColOffset = offset a.Grouping = append(a.Grouping, groupBy) } else { - a.Aggregations = append(a.Aggregations, Aggr{ - Original: expr, - Func: nil, - OpCode: opcode.AggregateRandom, - Alias: expr.As.String(), - ColOffset: offset, - }) + aggr := 
NewAggr(opcode.AggregateRandom, nil, expr, expr.As.String()) + aggr.ColOffset = offset + a.Aggregations = append(a.Aggregations, aggr) } return offset } @@ -146,13 +142,9 @@ func (a *Aggregator) AddColumn(ctx *plancontext.PlanningContext, expr *sqlparser } if !addToGroupBy { - a.Aggregations = append(a.Aggregations, Aggr{ - Original: expr, - Func: nil, - OpCode: opcode.AggregateRandom, - Alias: expr.As.String(), - ColOffset: len(a.Columns), - }) + aggr := NewAggr(opcode.AggregateRandom, nil, expr, expr.As.String()) + aggr.ColOffset = len(a.Columns) + a.Aggregations = append(a.Aggregations, aggr) } a.Columns = append(a.Columns, expr) expectedOffset := len(a.Columns) - 1 @@ -203,8 +195,6 @@ func (a *Aggregator) GetOrdering() ([]ops.OrderBy, error) { return a.Source.GetOrdering() } -var _ ops.Operator = (*Aggregator)(nil) - func (a *Aggregator) planOffsets(ctx *plancontext.PlanningContext) error { addColumn := func(aliasedExpr *sqlparser.AliasedExpr, addToGroupBy bool) (int, error) { newSrc, offset, err := a.Source.AddColumn(ctx, aliasedExpr, true, addToGroupBy) @@ -219,6 +209,13 @@ func (a *Aggregator) planOffsets(ctx *plancontext.PlanningContext) error { return offset, nil } + if !a.Pushed { + err := a.planOffsetsNotPushed(ctx) + if err != nil { + return err + } + } + for idx, gb := range a.Grouping { if gb.ColOffset == -1 { offset, err := addColumn(aeWrap(gb.Inner), false) @@ -227,7 +224,7 @@ func (a *Aggregator) planOffsets(ctx *plancontext.PlanningContext) error { } a.Grouping[idx].ColOffset = offset } - if a.Grouping[idx].WSOffset != -1 || !ctx.SemTable.NeedsWeightString(gb.SimplifiedExpr) { + if gb.WSOffset != -1 || !ctx.SemTable.NeedsWeightString(gb.SimplifiedExpr) { continue } @@ -238,9 +235,100 @@ func (a *Aggregator) planOffsets(ctx *plancontext.PlanningContext) error { a.Grouping[idx].WSOffset = offset } + for idx, aggr := range a.Aggregations { + if !aggr.NeedWeightString(ctx) { + continue + } + offset, err := 
addColumn(aeWrap(weightStringFor(aggr.Func.GetArg())), true) + if err != nil { + return err + } + a.Aggregations[idx].WSOffset = offset + } + + return nil +} + +func (aggr Aggr) getPushDownColumn() sqlparser.Expr { + switch aggr.OpCode { + case opcode.AggregateRandom: + return aggr.Original.Expr + case opcode.AggregateCountStar: + return sqlparser.NewIntLiteral("1") + default: + return aggr.Func.GetArg() + } +} + +func (a *Aggregator) planOffsetsNotPushed(ctx *plancontext.PlanningContext) error { + // we need to keep things in the column order, so we can't iterate over the aggregations or groupings + for colIdx, col := range a.Columns { + idx, err := a.addIfGroupingColumn(ctx, col) + if err != nil { + return err + } + if idx >= 0 { + if idx != colIdx { + return vterrors.VT13001(fmt.Sprintf("grouping column on wrong index: want: %d, got: %d", colIdx, idx)) + } + continue + } + + idx, err = a.addIfAggregationColumn(ctx, col) + if err != nil { + return err + } + + if idx < 0 { + return vterrors.VT13001("failed to find the corresponding column") + } + } return nil } +func (a *Aggregator) addIfAggregationColumn(ctx *plancontext.PlanningContext, col *sqlparser.AliasedExpr) (int, error) { + for aggIdx, aggr := range a.Aggregations { + if !ctx.SemTable.EqualsExprWithDeps(col.Expr, aggr.Original.Expr) { + continue + } + + newSrc, offset, err := a.Source.AddColumn(ctx, aeWrap(aggr.getPushDownColumn()), false, false) + if err != nil { + return 0, err + } + a.Aggregations[aggIdx].ColOffset = offset + a.Source = newSrc + return offset, nil + } + return -1, nil +} + +func (a *Aggregator) addIfGroupingColumn(ctx *plancontext.PlanningContext, col *sqlparser.AliasedExpr) (int, error) { + for gbIdx, gb := range a.Grouping { + if !ctx.SemTable.EqualsExprWithDeps(col.Expr, gb.SimplifiedExpr) { + continue + } + + newSrc, offset, err := a.Source.AddColumn(ctx, col, false, false) + if err != nil { + return 0, err + } + + a.Grouping[gbIdx].ColOffset = offset + a.Source = newSrc + + if 
!ctx.SemTable.NeedsWeightString(gb.SimplifiedExpr) { + return offset, nil + } + + // TODO: we need to do stuff + return offset, nil + } + return -1, nil +} + func (a *Aggregator) setTruncateColumnCount(offset int) { a.ResultColumns = offset } + +var _ ops.Operator = (*Aggregator)(nil) diff --git a/go/vt/vtgate/planbuilder/operators/distinct.go b/go/vt/vtgate/planbuilder/operators/distinct.go new file mode 100644 index 00000000000..9d1eea8182e --- /dev/null +++ b/go/vt/vtgate/planbuilder/operators/distinct.go @@ -0,0 +1,131 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package operators + +import ( + "golang.org/x/exp/slices" + + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vtgate/engine" + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops" + "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" +) + +type ( + Distinct struct { + Source ops.Operator + QP *QueryProjection + Pushed bool + + // When offset planning, we'll fill in this field + Columns []engine.CheckCol + + Truncate int + } +) + +func (d *Distinct) planOffsets(ctx *plancontext.PlanningContext) error { + columns, err := d.GetColumns() + if err != nil { + return err + } + d.Columns = nil + var exprs []sqlparser.Expr + for _, col := range columns { + newSrc, offset, err := d.Source.AddColumn(ctx, col, true, false) + if err != nil { + return err + } + d.Source = newSrc + e := d.QP.GetSimplifiedExpr(col.Expr) + exprs = append(exprs, e) + d.Columns = append(d.Columns, engine.CheckCol{ + Col: offset, + Collation: ctx.SemTable.CollationForExpr(e), + }) + } + for i, e := range exprs { + if !ctx.SemTable.NeedsWeightString(e) { + continue + } + newSrc, offset, err := d.Source.AddColumn(ctx, aeWrap(weightStringFor(e)), true, false) + if err != nil { + return err + } + d.Source = newSrc + d.Columns[i].WsCol = &offset + } + return nil +} + +func (d *Distinct) Clone(inputs []ops.Operator) ops.Operator { + return &Distinct{ + Source: inputs[0], + Columns: slices.Clone(d.Columns), + QP: d.QP, + Pushed: d.Pushed, + Truncate: d.Truncate, + } +} + +func (d *Distinct) Inputs() []ops.Operator { + return []ops.Operator{d.Source} +} + +func (d *Distinct) SetInputs(operators []ops.Operator) { + d.Source = operators[0] +} + +func (d *Distinct) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) (ops.Operator, error) { + newSrc, err := d.Source.AddPredicate(ctx, expr) + if err != nil { + return nil, err + } + d.Source = newSrc + return d, nil +} + +func (d *Distinct) AddColumn(ctx *plancontext.PlanningContext, expr *sqlparser.AliasedExpr, 
reuseExisting, addToGroupBy bool) (ops.Operator, int, error) { + newSrc, offset, err := d.Source.AddColumn(ctx, expr, reuseExisting, addToGroupBy) + if err != nil { + return nil, 0, err + } + d.Source = newSrc + return d, offset, nil +} + +func (d *Distinct) GetColumns() ([]*sqlparser.AliasedExpr, error) { + return d.Source.GetColumns() +} + +func (d *Distinct) Description() ops.OpDescription { + return ops.OpDescription{ + OperatorType: "Distinct", + } +} + +func (d *Distinct) ShortDescription() string { + return "" +} + +func (d *Distinct) GetOrdering() ([]ops.OrderBy, error) { + return d.Source.GetOrdering() +} + +func (d *Distinct) setTruncateColumnCount(offset int) { + d.Truncate = offset +} diff --git a/go/vt/vtgate/planbuilder/operators/horizon_planning.go b/go/vt/vtgate/planbuilder/operators/horizon_planning.go index 6120c35a00a..75067f71d69 100644 --- a/go/vt/vtgate/planbuilder/operators/horizon_planning.go +++ b/go/vt/vtgate/planbuilder/operators/horizon_planning.go @@ -29,6 +29,21 @@ import ( "vitess.io/vitess/go/vt/vtgate/semantics" ) +type ( + projector struct { + cols []ProjExpr + names []*sqlparser.AliasedExpr + } + + // horizonLike should be removed. 
we should use Horizon for both these cases + horizonLike interface { + ops.Operator + selectStatement() sqlparser.SelectStatement + src() ops.Operator + getQP(ctx *plancontext.PlanningContext) (*QueryProjection, error) + } +) + func errHorizonNotPlanned() error { return _errHorizonNotPlanned } @@ -81,8 +96,7 @@ func tryHorizonPlanning(ctx *plancontext.PlanningContext, root ops.Operator) (ou // If we can't, we will instead expand the Horizon into // smaller operators and try to push these down as far as possible func planHorizons(ctx *plancontext.PlanningContext, root ops.Operator) (ops.Operator, error) { - var err error - root, err = optimizeHorizonPlanning(ctx, root) + root, err := optimizeHorizonPlanning(ctx, root) if err != nil { return nil, err } @@ -118,6 +132,8 @@ func optimizeHorizonPlanning(ctx *plancontext.PlanningContext, root ops.Operator return tryPushingDownAggregator(ctx, in) case *Filter: return tryPushingDownFilter(ctx, in) + case *Distinct: + return tryPushingDownDistinct(in) default: return in, rewrite.SameTree, nil } @@ -165,6 +181,39 @@ func tryPushingDownFilter(ctx *plancontext.PlanningContext, in *Filter) (ops.Ope return rewrite.Swap(in, proj, "push filter under projection") } +func tryPushingDownDistinct(in *Distinct) (ops.Operator, *rewrite.ApplyResult, error) { + if in.Pushed { + return in, rewrite.SameTree, nil + } + switch src := in.Source.(type) { + case *Route: + if src.IsSingleShard() { + return rewrite.Swap(in, src, "push distinct under route") + } + case *Distinct: + return src, rewrite.NewTree("removed double distinct", src), nil + case *Aggregator: + return in, rewrite.SameTree, nil + } + + cols, err := in.Source.GetColumns() + if err != nil { + return nil, nil, err + } + + aggr := &Aggregator{ + Source: in.Source, + QP: in.QP, + Original: true, + } + + for _, col := range cols { + aggr.addColumnWithoutPushing(col, true) + } + + return aggr, rewrite.NewTree("replace distinct with aggregator", in), nil +} + // 
addOrderBysAndGroupBysForAggregations runs after we have run horizonPlanning until the op tree stops changing // this means that we have pushed aggregations and other ops as far down as they'll go // addOrderBysAndGroupBysForAggregations will find Aggregators that have not been pushed under routes and @@ -173,12 +222,14 @@ func addOrderBysAndGroupBysForAggregations(ctx *plancontext.PlanningContext, roo visitor := func(in ops.Operator, _ semantics.TableSet, isRoot bool) (ops.Operator, *rewrite.ApplyResult, error) { switch in := in.(type) { case *Aggregator: - // first we update the incoming columns, so we know about any new columns that have been added - columns, err := in.Source.GetColumns() - if err != nil { - return nil, nil, err + if in.Pushed { + // first we update the incoming columns, so we know about any new columns that have been added + columns, err := in.Source.GetColumns() + if err != nil { + return nil, nil, err + } + in.Columns = columns } - in.Columns = columns requireOrdering, err := needsOrdering(in, ctx) if err != nil { @@ -256,7 +307,7 @@ func tryPushingDownOrdering(ctx *plancontext.PlanningContext, in *Ordering) (ops } return rewrite.Swap(in, src, "push ordering under projection") case *Aggregator: - if !src.QP.AlignGroupByAndOrderBy(ctx) { + if !(src.QP.AlignGroupByAndOrderBy(ctx) || overlaps(ctx, in.Order, src.Grouping)) { return in, rewrite.SameTree, nil } @@ -266,6 +317,20 @@ func tryPushingDownOrdering(ctx *plancontext.PlanningContext, in *Ordering) (ops return in, rewrite.SameTree, nil } +func overlaps(ctx *plancontext.PlanningContext, order []ops.OrderBy, grouping []GroupBy) bool { +ordering: + for _, orderBy := range order { + for _, groupBy := range grouping { + if ctx.SemTable.EqualsExprWithDeps(orderBy.SimplifiedExpr, groupBy.SimplifiedExpr) { + continue ordering + } + } + return false + } + + return true +} + func pushOrderingUnderAggr(ctx *plancontext.PlanningContext, order *Ordering, aggregator *Aggregator) (ops.Operator, 
*rewrite.ApplyResult, error) { // Step 1: Align the GROUP BY and ORDER BY. // Reorder the GROUP BY columns to match the ORDER BY columns. @@ -362,11 +427,6 @@ func pushDownProjectionInVindex( return src, rewrite.NewTree("push projection into vindex", p), nil } -type projector struct { - cols []ProjExpr - names []*sqlparser.AliasedExpr -} - func (p *projector) add(e ProjExpr, alias *sqlparser.AliasedExpr) { p.cols = append(p.cols, e) p.names = append(p.names, alias) @@ -628,25 +688,13 @@ func pushOrExpandHorizon(ctx *plancontext.PlanningContext, in horizonLike) (ops. return expandHorizon(ctx, in) } -// horizonLike should be removed. we should use Horizon for both these cases -type horizonLike interface { - ops.Operator - selectStatement() sqlparser.SelectStatement - src() ops.Operator - getQP(ctx *plancontext.PlanningContext) (*QueryProjection, error) -} - func expandHorizon(ctx *plancontext.PlanningContext, horizon horizonLike) (ops.Operator, *rewrite.ApplyResult, error) { sel, isSel := horizon.selectStatement().(*sqlparser.Select) if !isSel { return nil, nil, errHorizonNotPlanned() } - qp, err := horizon.getQP(ctx) - if err != nil { - return nil, nil, err - } - if sel.Having != nil || qp.NeedsDistinct() || sel.Distinct { + if sel.Having != nil { return nil, nil, errHorizonNotPlanned() } @@ -655,6 +703,25 @@ func expandHorizon(ctx *plancontext.PlanningContext, horizon horizonLike) (ops.O return nil, nil, err } + qp, err := horizon.getQP(ctx) + if err != nil { + return nil, nil, err + } + + if qp.NeedsDistinct() { + op = &Distinct{ + Source: op, + QP: qp, + } + } + + if len(qp.OrderExprs) > 0 { + op = &Ordering{ + Source: op, + Order: qp.OrderExprs, + } + } + if sel.Limit != nil { op = &Limit{ Source: op, @@ -694,12 +761,6 @@ func createProjectionFromSelect(ctx *plancontext.PlanningContext, horizon horizo projX.Alias = derived.Alias } out = projX - if qp.OrderExprs != nil { - out = &Ordering{ - Source: out, - Order: qp.OrderExprs, - } - } return out, nil } @@ 
-738,29 +799,29 @@ outer: if err != nil { return nil, err } + addedToCol := false for idx, groupBy := range a.Grouping { - if ae == groupBy.aliasedExpr { - a.Columns = append(a.Columns, ae) - a.Grouping[idx].ColOffset = colIdx - continue outer + if ctx.SemTable.EqualsExprWithDeps(groupBy.SimplifiedExpr, ae.Expr) { + if !addedToCol { + a.Columns = append(a.Columns, ae) + addedToCol = true + } + if groupBy.ColOffset < 0 { + a.Grouping[idx].ColOffset = colIdx + } } } + if addedToCol { + continue + } for idx, aggr := range a.Aggregations { - if ae == aggr.Original { + if ctx.SemTable.EqualsExprWithDeps(aggr.Original.Expr, ae.Expr) && aggr.ColOffset < 0 { a.Columns = append(a.Columns, ae) a.Aggregations[idx].ColOffset = colIdx continue outer } } - return nil, vterrors.VT13001(fmt.Sprintf("Could not find the %v in aggregation in the original query", expr)) - } - - // If ordering is required, create an Ordering operation. - if len(qp.OrderExprs) > 0 { - return &Ordering{ - Source: a, - Order: qp.OrderExprs, - }, nil + return nil, vterrors.VT13001(fmt.Sprintf("Could not find the %s in aggregation in the original query", sqlparser.String(ae))) } return a, nil diff --git a/go/vt/vtgate/planbuilder/operators/projection.go b/go/vt/vtgate/planbuilder/operators/projection.go index a5a9f6d4492..1ed2e1cd681 100644 --- a/go/vt/vtgate/planbuilder/operators/projection.go +++ b/go/vt/vtgate/planbuilder/operators/projection.go @@ -84,7 +84,7 @@ func (p *Projection) addUnexploredExpr(ae *sqlparser.AliasedExpr, e sqlparser.Ex return len(p.Projections) - 1 } -func (p *Projection) addNoPushCol(expr *sqlparser.AliasedExpr, _ bool) int { +func (p *Projection) addColumnWithoutPushing(expr *sqlparser.AliasedExpr, _ bool) int { return p.addUnexploredExpr(expr, expr.Expr) } @@ -204,29 +204,43 @@ func (p *Projection) ShortDescription() string { return strings.Join(columns, ", ") } -func (p *Projection) Compact(*plancontext.PlanningContext) (ops.Operator, *rewrite.ApplyResult, error) { +func (p 
*Projection) Compact(ctx *plancontext.PlanningContext) (ops.Operator, *rewrite.ApplyResult, error) { switch src := p.Source.(type) { case *Route: return p.compactWithRoute(src) case *ApplyJoin: - return p.compactWithJoin(src) + return p.compactWithJoin(ctx, src) } return p, rewrite.SameTree, nil } -func (p *Projection) compactWithJoin(src *ApplyJoin) (ops.Operator, *rewrite.ApplyResult, error) { +func (p *Projection) compactWithJoin(ctx *plancontext.PlanningContext, src *ApplyJoin) (ops.Operator, *rewrite.ApplyResult, error) { var newColumns []int var newColumnsAST []JoinColumn - for _, col := range p.Projections { - offset, ok := col.(Offset) - if !ok { + for idx, col := range p.Projections { + switch col := col.(type) { + case Offset: + newColumns = append(newColumns, src.Columns[col.Offset]) + newColumnsAST = append(newColumnsAST, src.ColumnsAST[col.Offset]) + case UnexploredExpression: + if !ctx.SemTable.EqualsExprWithDeps(col.E, p.Columns[idx].Expr) { + // the inner expression is different from what we are presenting to the outside - this means we need to evaluate + return p, rewrite.SameTree, nil + } + offset := slices.IndexFunc(src.ColumnsAST, func(jc JoinColumn) bool { + return ctx.SemTable.EqualsExprWithDeps(jc.Original.Expr, col.E) + }) + if offset < 0 { + return p, rewrite.SameTree, nil + } + if len(src.Columns) > 0 { + newColumns = append(newColumns, src.Columns[offset]) + } + newColumnsAST = append(newColumnsAST, src.ColumnsAST[offset]) + default: return p, rewrite.SameTree, nil } - - newColumns = append(newColumns, src.Columns[offset.Offset]) - newColumnsAST = append(newColumnsAST, src.ColumnsAST[offset.Offset]) } - src.Columns = newColumns src.ColumnsAST = newColumnsAST return src, rewrite.NewTree("remove projection from before join", src), nil diff --git a/go/vt/vtgate/planbuilder/operators/queryprojection.go b/go/vt/vtgate/planbuilder/operators/queryprojection.go index f0147ca7ec6..e3c6981ff99 100644 --- 
a/go/vt/vtgate/planbuilder/operators/queryprojection.go +++ b/go/vt/vtgate/planbuilder/operators/queryprojection.go @@ -24,6 +24,7 @@ import ( "golang.org/x/exp/slices" + "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/engine/opcode" @@ -94,7 +95,9 @@ type ( Index *int Distinct bool - ColOffset int // points to the column on the same aggregator + // the offsets point to columns on the same aggregator + ColOffset int + WSOffset int } AggrRewriter struct { @@ -104,6 +107,29 @@ type ( } ) +func (aggr Aggr) NeedWeightString(ctx *plancontext.PlanningContext) bool { + switch aggr.OpCode { + case opcode.AggregateCountDistinct, opcode.AggregateSumDistinct: + return ctx.SemTable.NeedsWeightString(aggr.Func.GetArg()) + case opcode.AggregateMin, opcode.AggregateMax: + // currently this returns false, as aggregation engine primitive does not support the usage of weight_string + // for comparison. If Min/Max column is non-comparable then it will fail at runtime. + return false + } + return false +} + +func (aggr Aggr) GetCollation(ctx *plancontext.PlanningContext) collations.ID { + if aggr.Func == nil { + return collations.Unknown + } + switch aggr.OpCode { + case opcode.AggregateMin, opcode.AggregateMax, opcode.AggregateSumDistinct, opcode.AggregateCountDistinct: + return ctx.SemTable.CollationForExpr(aggr.Func.GetArg()) + } + return collations.Unknown +} + // NewGroupBy creates a new group by from the given fields. 
func NewGroupBy(inner, simplified sqlparser.Expr, aliasedExpr *sqlparser.AliasedExpr) GroupBy { return GroupBy{ @@ -115,6 +141,17 @@ func NewGroupBy(inner, simplified sqlparser.Expr, aliasedExpr *sqlparser.Aliased } } +func NewAggr(opCode opcode.AggregateOpcode, f sqlparser.AggrFunc, original *sqlparser.AliasedExpr, alias string) Aggr { + return Aggr{ + Original: original, + Func: f, + OpCode: opCode, + Alias: alias, + ColOffset: -1, + WSOffset: -1, + } +} + func (b GroupBy) AsOrderBy() ops.OrderBy { return ops.OrderBy{ Inner: &sqlparser.Order{ @@ -184,11 +221,7 @@ func CreateQPFromSelect(ctx *plancontext.PlanningContext, sel *sqlparser.Select) return nil, err } - if qp.Distinct && !qp.HasAggr { - // grouping and distinct both lead to unique results, so we don't need - // TODO: we should check that we are returning all the grouping columns, or this is not safe to do - qp.groupByExprs = nil - } + qp.calculateDistinct(ctx) return qp, nil } @@ -335,6 +368,38 @@ func (qp *QueryProjection) addOrderBy(ctx *plancontext.PlanningContext, orderBy return nil } +func (qp *QueryProjection) calculateDistinct(ctx *plancontext.PlanningContext) { + if qp.Distinct && !qp.HasAggr { + // grouping and distinct both lead to unique results, so we don't need + qp.groupByExprs = nil + } + + if qp.HasAggr && len(qp.groupByExprs) == 0 { + // this is a scalar aggregation and is inherently distinct + qp.Distinct = false + } + + if !qp.Distinct || len(qp.groupByExprs) == 0 { + return + } + + for _, gb := range qp.groupByExprs { + _, found := canReuseColumn(ctx, qp.SelectExprs, gb.SimplifiedExpr, func(expr SelectExpr) sqlparser.Expr { + getExpr, err := expr.GetExpr() + if err != nil { + panic(err) + } + return getExpr + }) + if !found { + return + } + } + + // since we are returning all grouping expressions, we know the results are guaranteed to be unique + qp.Distinct = false +} + func (qp *QueryProjection) addGroupBy(ctx *plancontext.PlanningContext, groupBy sqlparser.GroupBy) error { es := 
&expressionSet{} for _, group := range groupBy { @@ -589,12 +654,9 @@ orderBy: if !sqlparser.ContainsAggregation(expr.Col) { if !qp.isExprInGroupByExprs(ctx, expr) { - out = append(out, Aggr{ - Original: aliasedExpr, - OpCode: opcode.AggregateRandom, - Alias: aliasedExpr.ColumnName(), - Index: &idxCopy, - }) + aggr := NewAggr(opcode.AggregateRandom, nil, aliasedExpr, aliasedExpr.ColumnName()) + aggr.Index = &idxCopy + out = append(out, aggr) } continue } @@ -611,9 +673,9 @@ orderBy: } } - aggr, _ := aliasedExpr.Expr.(sqlparser.AggrFunc) + aggrF, _ := aliasedExpr.Expr.(sqlparser.AggrFunc) - if aggr.IsDistinct() { + if aggrF.IsDistinct() { switch code { case opcode.AggregateCount: code = opcode.AggregateCountDistinct @@ -622,14 +684,10 @@ orderBy: } } - out = append(out, Aggr{ - Original: aliasedExpr, - Func: aggr, - OpCode: code, - Alias: aliasedExpr.ColumnName(), - Index: &idxCopy, - Distinct: aggr.IsDistinct(), - }) + aggr := NewAggr(code, aggrF, aliasedExpr, aliasedExpr.ColumnName()) + aggr.Index = &idxCopy + aggr.Distinct = aggrF.IsDistinct() + out = append(out, aggr) } return } diff --git a/go/vt/vtgate/planbuilder/operators/route.go b/go/vt/vtgate/planbuilder/operators/route.go index bcff3ffa652..785371315cc 100644 --- a/go/vt/vtgate/planbuilder/operators/route.go +++ b/go/vt/vtgate/planbuilder/operators/route.go @@ -571,7 +571,7 @@ func (r *Route) AddColumn(ctx *plancontext.PlanningContext, expr *sqlparser.Alia r.Source = src // And since we are under the route, we don't need to continue pushing anything further down - offset := src.addNoPushCol(expr, false) + offset := src.addColumnWithoutPushing(expr, false) if err != nil { return nil, 0, err } @@ -579,7 +579,7 @@ func (r *Route) AddColumn(ctx *plancontext.PlanningContext, expr *sqlparser.Alia } type selectExpressions interface { - addNoPushCol(expr *sqlparser.AliasedExpr, addToGroupBy bool) int + addColumnWithoutPushing(expr *sqlparser.AliasedExpr, addToGroupBy bool) int isDerived() bool } @@ -597,7 +597,7 
@@ func addColumnToInput(operator ops.Operator, expr *sqlparser.AliasedExpr, addToG // we have to add a new projection and can't build on this one return false, 0 } - offset := op.addNoPushCol(expr, addToGroupBy) + offset := op.addColumnWithoutPushing(expr, addToGroupBy) return true, offset default: return false, 0 diff --git a/go/vt/vtgate/planbuilder/ordered_aggregate.go b/go/vt/vtgate/planbuilder/ordered_aggregate.go index 4d7ca9f1d77..2c570782c31 100644 --- a/go/vt/vtgate/planbuilder/ordered_aggregate.go +++ b/go/vt/vtgate/planbuilder/ordered_aggregate.go @@ -21,7 +21,6 @@ import ( "strconv" "strings" - "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/sqlparser" @@ -226,18 +225,6 @@ func findAlias(colname *sqlparser.ColName, selects sqlparser.SelectExprs) sqlpar // Primitive implements the logicalPlan interface func (oa *orderedAggregate) Primitive() engine.Primitive { - colls := map[int]collations.ID{} - for _, key := range oa.aggregates { - if key.CollationID != collations.Unknown { - colls[key.KeyCol] = key.CollationID - } - } - for _, key := range oa.groupByKeys { - if key.CollationID != collations.Unknown { - colls[key.KeyCol] = key.CollationID - } - } - input := oa.input.Primitive() if len(oa.groupByKeys) == 0 { return &engine.ScalarAggregate{ @@ -245,7 +232,6 @@ func (oa *orderedAggregate) Primitive() engine.Primitive { AggrOnEngine: oa.aggrOnEngine, Aggregates: oa.aggregates, TruncateColumnCount: oa.truncateColumnCount, - Collations: colls, Input: input, } } @@ -256,7 +242,6 @@ func (oa *orderedAggregate) Primitive() engine.Primitive { Aggregates: oa.aggregates, GroupByKeys: oa.groupByKeys, TruncateColumnCount: oa.truncateColumnCount, - Collations: colls, Input: input, } } diff --git a/go/vt/vtgate/planbuilder/testdata/aggr_cases.json b/go/vt/vtgate/planbuilder/testdata/aggr_cases.json index 5d421cc8a3a..b31f403e521 100644 --- a/go/vt/vtgate/planbuilder/testdata/aggr_cases.json +++ 
b/go/vt/vtgate/planbuilder/testdata/aggr_cases.json @@ -75,15 +75,15 @@ { "OperatorType": "Projection", "Expressions": [ - "[COLUMN 0] * COALESCE([COLUMN 1], INT64(1)) as sum(`user`.col)" + "[COLUMN 0] * [COLUMN 1] as sum(`user`.col)" ], "Inputs": [ { "OperatorType": "Join", "Variant": "Join", - "JoinColumnIndexes": "L:1,R:1", + "JoinColumnIndexes": "L:0,R:0", "JoinVars": { - "user_foo": 0 + "user_foo": 1 }, "TableName": "`user`_user_extra", "Inputs": [ @@ -94,8 +94,8 @@ "Name": "user", "Sharded": true }, - "FieldQuery": "select `user`.foo, sum(`user`.col), weight_string(`user`.foo) from `user` where 1 != 1 group by `user`.foo, weight_string(`user`.foo)", - "Query": "select `user`.foo, sum(`user`.col), weight_string(`user`.foo) from `user` group by `user`.foo, weight_string(`user`.foo)", + "FieldQuery": "select sum(`user`.col), `user`.foo from `user` where 1 != 1 group by `user`.foo", + "Query": "select sum(`user`.col), `user`.foo from `user` group by `user`.foo", "Table": "`user`" }, { @@ -105,8 +105,8 @@ "Name": "user", "Sharded": true }, - "FieldQuery": "select 1, count(*) from user_extra where 1 != 1 group by 1", - "Query": "select 1, count(*) from user_extra where user_extra.bar = :user_foo group by 1", + "FieldQuery": "select count(*) from user_extra where 1 != 1 group by .0", + "Query": "select count(*) from user_extra where user_extra.bar = :user_foo group by .0", "Table": "user_extra" } ] @@ -484,9 +484,9 @@ "Name": "user", "Sharded": true }, - "FieldQuery": "select col from `user` where 1 != 1", + "FieldQuery": "select col from `user` where 1 != 1 group by col", "OrderBy": "0 ASC", - "Query": "select distinct col from `user` order by col asc", + "Query": "select col from `user` group by col order by col asc", "Table": "`user`" } ] @@ -1797,26 +1797,19 @@ "Original": "select distinct a, count(*) from user", "Instructions": { "OperatorType": "Aggregate", - "Variant": "Ordered", - "GroupBy": "0, 1", + "Variant": "Scalar", + "Aggregates": "random(0) AS a, 
sum_count_star(1) AS count(*)", "Inputs": [ { - "OperatorType": "Aggregate", - "Variant": "Scalar", - "Aggregates": "random(0) AS a, sum_count_star(1) AS count(*)", - "Inputs": [ - { - "OperatorType": "Route", - "Variant": "Scatter", - "Keyspace": { - "Name": "user", - "Sharded": true - }, - "FieldQuery": "select a, count(*) from `user` where 1 != 1", - "Query": "select a, count(*) from `user`", - "Table": "`user`" - } - ] + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select a, count(*) from `user` where 1 != 1", + "Query": "select a, count(*) from `user`", + "Table": "`user`" } ] }, @@ -1864,28 +1857,21 @@ "Instructions": { "OperatorType": "Aggregate", "Variant": "Ordered", - "GroupBy": "(0|2), 1", + "Aggregates": "sum_count_star(1) AS count(*)", + "GroupBy": "(0|2)", "ResultColumns": 2, "Inputs": [ { - "OperatorType": "Aggregate", - "Variant": "Ordered", - "Aggregates": "sum_count_star(1) AS count(*)", - "GroupBy": "(0|2)", - "Inputs": [ - { - "OperatorType": "Route", - "Variant": "Scatter", - "Keyspace": { - "Name": "user", - "Sharded": true - }, - "FieldQuery": "select a, count(*), weight_string(a) from `user` where 1 != 1 group by a, weight_string(a)", - "OrderBy": "(0|2) ASC", - "Query": "select a, count(*), weight_string(a) from `user` group by a, weight_string(a) order by a asc", - "Table": "`user`" - } - ] + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select a, count(*), weight_string(a) from `user` where 1 != 1 group by a, weight_string(a)", + "OrderBy": "(0|2) ASC", + "Query": "select a, count(*), weight_string(a) from `user` group by a, weight_string(a) order by a asc", + "Table": "`user`" } ] }, @@ -2088,9 +2074,9 @@ "Name": "user", "Sharded": true }, - "FieldQuery": "select col1, col2, weight_string(col1), weight_string(col2) from `user` where 1 != 1 group by col1, col2", + "FieldQuery": "select 
col1, col2, weight_string(col1), weight_string(col2) from `user` where 1 != 1 group by col1, col2, weight_string(col1), weight_string(col2)", "OrderBy": "(0|2) ASC, (1|3) ASC", - "Query": "select distinct col1, col2, weight_string(col1), weight_string(col2) from `user` group by col1, col2 order by col1 asc, col2 asc", + "Query": "select col1, col2, weight_string(col1), weight_string(col2) from `user` group by col1, col2, weight_string(col1), weight_string(col2) order by col1 asc, col2 asc", "Table": "`user`" } ] @@ -3388,7 +3374,7 @@ "OperatorType": "Projection", "Expressions": [ "[COLUMN 2] as a", - "[COLUMN 0] * [COLUMN 1] as count(user_extra.a)", + "[COLUMN 1] * [COLUMN 0] as count(user_extra.a)", "[COLUMN 3] as weight_string(`user`.a)" ], "Inputs": [ @@ -3451,7 +3437,7 @@ "OperatorType": "Projection", "Expressions": [ "[COLUMN 0] * [COLUMN 1] as count(u.textcol1)", - "[COLUMN 2] * [COLUMN 3] as count(ue.foo)", + "[COLUMN 3] * [COLUMN 2] as count(ue.foo)", "[COLUMN 4] as bar", "[COLUMN 5] as weight_string(us.bar)" ], @@ -3932,62 +3918,6 @@ ] } }, - { - "comment": "interleaving grouping, aggregation and join", - "query": "select user.col, min(user_extra.foo), user.bar, max(user_extra.bar) from user join user_extra on user.col = user_extra.bar group by user.col, user.bar", - "v3-plan": "VT12001: unsupported: cross-shard query with aggregates", - "gen4-plan": { - "QueryType": "SELECT", - "Original": "select user.col, min(user_extra.foo), user.bar, max(user_extra.bar) from user join user_extra on user.col = user_extra.bar group by user.col, user.bar", - "Instructions": { - "OperatorType": "Aggregate", - "Variant": "Ordered", - "Aggregates": "min(1) AS min(user_extra.foo), max(3) AS max(user_extra.bar)", - "GroupBy": "0, (2|4)", - "ResultColumns": 4, - "Inputs": [ - { - "OperatorType": "Join", - "Variant": "Join", - "JoinColumnIndexes": "R:0,R:1,L:0,L:1,L:2", - "JoinVars": { - "user_col": 0 - }, - "TableName": "`user`_user_extra", - "Inputs": [ - { - "OperatorType": 
"Route", - "Variant": "Scatter", - "Keyspace": { - "Name": "user", - "Sharded": true - }, - "FieldQuery": "select `user`.col, `user`.bar, weight_string(`user`.bar) from `user` where 1 != 1 group by `user`.col, `user`.bar, weight_string(`user`.bar)", - "OrderBy": "0 ASC, (1|2) ASC", - "Query": "select `user`.col, `user`.bar, weight_string(`user`.bar) from `user` group by `user`.col, `user`.bar, weight_string(`user`.bar) order by `user`.col asc, `user`.bar asc", - "Table": "`user`" - }, - { - "OperatorType": "Route", - "Variant": "Scatter", - "Keyspace": { - "Name": "user", - "Sharded": true - }, - "FieldQuery": "select min(user_extra.foo), max(user_extra.bar) from user_extra where 1 != 1 group by .0", - "Query": "select min(user_extra.foo), max(user_extra.bar) from user_extra where user_extra.bar = :user_col group by .0", - "Table": "user_extra" - } - ] - } - ] - }, - "TablesUsed": [ - "user.user", - "user.user_extra" - ] - } - }, { "comment": "group_concat on single shards", "query": "select group_concat(user_id order by name), id from user group by id", @@ -5645,9 +5575,9 @@ { "OperatorType": "Join", "Variant": "LeftJoin", - "JoinColumnIndexes": "L:0,R:0,L:2", + "JoinColumnIndexes": "L:0,R:0,L:1", "JoinVars": { - "user_foo": 1 + "user_foo": 2 }, "TableName": "`user`_music", "Inputs": [ @@ -5658,9 +5588,9 @@ "Name": "user", "Sharded": true }, - "FieldQuery": "select count(*), `user`.foo, `user`.col from `user` where 1 != 1 group by `user`.col, `user`.foo", - "OrderBy": "2 ASC", - "Query": "select count(*), `user`.foo, `user`.col from `user` group by `user`.col, `user`.foo order by `user`.col asc", + "FieldQuery": "select count(*), `user`.col, `user`.foo from `user` where 1 != 1 group by `user`.col, `user`.foo", + "OrderBy": "1 ASC", + "Query": "select count(*), `user`.col, `user`.foo from `user` group by `user`.col, `user`.foo order by `user`.col asc", "Table": "`user`" }, { @@ -5780,9 +5710,9 @@ { "OperatorType": "Join", "Variant": "LeftJoin", - 
"JoinColumnIndexes": "L:0,R:0,L:2", + "JoinColumnIndexes": "L:0,R:0,L:1", "JoinVars": { - "user_foo": 1 + "user_foo": 2 }, "TableName": "`user`_music", "Inputs": [ @@ -5793,9 +5723,9 @@ "Name": "user", "Sharded": true }, - "FieldQuery": "select count(*), `user`.foo, `user`.col from `user` where 1 != 1 group by `user`.col, `user`.foo", - "OrderBy": "2 ASC", - "Query": "select count(*), `user`.foo, `user`.col from `user` group by `user`.col, `user`.foo order by `user`.col asc", + "FieldQuery": "select count(*), `user`.col, `user`.foo from `user` where 1 != 1 group by `user`.col, `user`.foo", + "OrderBy": "1 ASC", + "Query": "select count(*), `user`.col, `user`.foo from `user` group by `user`.col, `user`.foo order by `user`.col asc", "Table": "`user`" }, { @@ -6174,5 +6104,344 @@ "user.user" ] } + }, + { + "comment": "scatter aggregate with ambiguous aliases", + "query": "select distinct a, b as a from user", + "v3-plan": "generating ORDER BY clause: VT03021: ambiguous column reference: a", + "gen4-plan": { + "QueryType": "SELECT", + "Original": "select distinct a, b as a from user", + "Instructions": { + "OperatorType": "Aggregate", + "Variant": "Ordered", + "GroupBy": "(0|2), (1|3)", + "ResultColumns": 2, + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select a, b as a, weight_string(a), weight_string(b) from `user` where 1 != 1 group by a, b, weight_string(a), weight_string(b)", + "OrderBy": "(0|2) ASC, (1|3) ASC", + "Query": "select a, b as a, weight_string(a), weight_string(b) from `user` group by a, b, weight_string(a), weight_string(b) order by a asc, b asc", + "Table": "`user`" + } + ] + }, + "TablesUsed": [ + "user.user" + ] + } + }, + { + "comment": "scatter aggregate with complex select list (can't build order by)", + "query": "select distinct a+1 from user", + "v3-plan": "generating ORDER BY clause: VT12001: unsupported: reference a complex expression", + 
"gen4-plan": { + "QueryType": "SELECT", + "Original": "select distinct a+1 from user", + "Instructions": { + "OperatorType": "Aggregate", + "Variant": "Ordered", + "GroupBy": "(0|1)", + "ResultColumns": 1, + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select a + 1, weight_string(a + 1) from `user` where 1 != 1 group by a + 1, weight_string(a + 1)", + "OrderBy": "(0|1) ASC", + "Query": "select a + 1, weight_string(a + 1) from `user` group by a + 1, weight_string(a + 1) order by a + 1 asc", + "Table": "`user`" + } + ] + }, + "TablesUsed": [ + "user.user" + ] + } + }, + { + "QueryType": "SELECT", + "Original": "select distinct count(*) from user group by col", + "Instructions": { + "OperatorType": "Distinct", + "Collations": [ + "0" + ], + "ResultColumns": 1, + "Inputs": [ + { + "OperatorType": "Aggregate", + "Variant": "Ordered", + "Aggregates": "sum_count_star(0) AS count(*)", + "GroupBy": "1", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select count(*), col from `user` where 1 != 1 group by col", + "OrderBy": "1 ASC", + "Query": "select count(*), col from `user` group by col order by col asc", + "Table": "`user`" + } + ] + } + ] + }, + "TablesUsed": [ + "user.user" + ] + }, + { + "comment": "scalar aggregates with min, max, sum distinct and count distinct using collations", + "query": "select min(textcol1), max(textcol2), sum(distinct textcol1), count(distinct textcol1) from user", + "v3-plan": "VT12001: unsupported: only one DISTINCT aggregation allowed in a SELECT: count(distinct textcol1)", + "gen4-plan": { + "QueryType": "SELECT", + "Original": "select min(textcol1), max(textcol2), sum(distinct textcol1), count(distinct textcol1) from user", + "Instructions": { + "OperatorType": "Aggregate", + "Variant": "Scalar", + "Aggregates": "min(0) AS min(textcol1), max(1) AS 
max(textcol2), sum_distinct(2 COLLATE latin1_swedish_ci) AS sum(distinct textcol1), count_distinct(3 COLLATE latin1_swedish_ci) AS count(distinct textcol1)", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select min(textcol1), max(textcol2), textcol1, textcol1 from `user` where 1 != 1 group by textcol1", + "OrderBy": "2 ASC COLLATE latin1_swedish_ci", + "Query": "select min(textcol1), max(textcol2), textcol1, textcol1 from `user` group by textcol1 order by textcol1 asc", + "Table": "`user`" + } + ] + }, + "TablesUsed": [ + "user.user" + ] + } + }, + { + "comment": "grouping aggregates with min, max, sum distinct and count distinct using collations", + "query": "select col, min(textcol1), max(textcol2), sum(distinct textcol1), count(distinct textcol1) from user group by col", + "v3-plan": "VT12001: unsupported: only one DISTINCT aggregation allowed in a SELECT: count(distinct textcol1)", + "gen4-plan": { + "QueryType": "SELECT", + "Original": "select col, min(textcol1), max(textcol2), sum(distinct textcol1), count(distinct textcol1) from user group by col", + "Instructions": { + "OperatorType": "Aggregate", + "Variant": "Ordered", + "Aggregates": "min(1) AS min(textcol1), max(2) AS max(textcol2), sum_distinct(3 COLLATE latin1_swedish_ci) AS sum(distinct textcol1), count_distinct(4 COLLATE latin1_swedish_ci) AS count(distinct textcol1)", + "GroupBy": "0", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select col, min(textcol1), max(textcol2), textcol1, textcol1 from `user` where 1 != 1 group by col, textcol1", + "OrderBy": "0 ASC, 3 ASC COLLATE latin1_swedish_ci", + "Query": "select col, min(textcol1), max(textcol2), textcol1, textcol1 from `user` group by col, textcol1 order by col asc, textcol1 asc", + "Table": "`user`" + } + ] + }, + "TablesUsed": [ + "user.user" + ] + } + },
+ { + "comment": "using a grouping column multiple times should be OK", + "query": "select col, col, count(*) from user group by col", + "v3-plan": "generating ORDER BY clause: VT03021: ambiguous column reference: col", + "gen4-plan": { + "QueryType": "SELECT", + "Original": "select col, col, count(*) from user group by col", + "Instructions": { + "OperatorType": "Aggregate", + "Variant": "Ordered", + "Aggregates": "sum_count_star(2) AS count(*)", + "GroupBy": "0", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select col, col, count(*) from `user` where 1 != 1 group by col", + "OrderBy": "0 ASC", + "Query": "select col, col, count(*) from `user` group by col order by col asc", + "Table": "`user`" + } + ] + }, + "TablesUsed": [ + "user.user" + ] + } + }, + { + "comment": "multiple count star and a count with 3 table join", + "query": "select count(*), count(*), count(u.col) from user u, user u2, user_extra ue", + "v3-plan": "VT12001: unsupported: cross-shard query with aggregates", + "gen4-plan": { + "QueryType": "SELECT", + "Original": "select count(*), count(*), count(u.col) from user u, user u2, user_extra ue", + "Instructions": { + "OperatorType": "Aggregate", + "Variant": "Scalar", + "Aggregates": "sum_count_star(0) AS count(*), sum_count_star(1) AS count(*), sum_count(2) AS count(u.col)", + "Inputs": [ + { + "OperatorType": "Projection", + "Expressions": [ + "[COLUMN 0] * [COLUMN 1] as count(*)", + "[COLUMN 0] * [COLUMN 1] as count(*)", + "[COLUMN 0] * [COLUMN 2] as count(u.col)" + ], + "Inputs": [ + { + "OperatorType": "Join", + "Variant": "Join", + "JoinColumnIndexes": "L:0,R:0,R:1", + "TableName": "user_extra_`user`_`user`", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select count(*) from user_extra as ue where 1 != 1", + "Query": "select count(*) from user_extra 
as ue", + "Table": "user_extra" + }, + { + "OperatorType": "Projection", + "Expressions": [ + "[COLUMN 0] * [COLUMN 1] as count(*)", + "[COLUMN 2] * [COLUMN 1] as count(u.col)" + ], + "Inputs": [ + { + "OperatorType": "Join", + "Variant": "Join", + "JoinColumnIndexes": "L:0,R:0,L:1", + "TableName": "`user`_`user`", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select count(*), count(u.col) from `user` as u where 1 != 1 group by .0", + "Query": "select count(*), count(u.col) from `user` as u group by .0", + "Table": "`user`" + }, + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select count(*) from `user` as u2 where 1 != 1 group by .0", + "Query": "select count(*) from `user` as u2 group by .0", + "Table": "`user`" + } + ] + } + ] + } + ] + } + ] + } + ] + }, + "TablesUsed": [ + "user.user", + "user.user_extra" + ] + } + }, + { + "comment": "interleaving grouping, aggregation and join with min, max columns", + "query": "select user.col, min(user_extra.foo), user.bar, max(user_extra.bar) from user join user_extra on user.col = user_extra.bar group by user.col, user.bar", + "v3-plan": "VT12001: unsupported: cross-shard query with aggregates", + "gen4-plan": { + "QueryType": "SELECT", + "Original": "select user.col, min(user_extra.foo), user.bar, max(user_extra.bar) from user join user_extra on user.col = user_extra.bar group by user.col, user.bar", + "Instructions": { + "OperatorType": "Aggregate", + "Variant": "Ordered", + "Aggregates": "min(1) AS min(user_extra.foo), max(3) AS max(user_extra.bar)", + "GroupBy": "0, (2|4)", + "ResultColumns": 4, + "Inputs": [ + { + "OperatorType": "Join", + "Variant": "Join", + "JoinColumnIndexes": "L:0,R:0,L:1,R:1,L:2", + "JoinVars": { + "user_col": 0 + }, + "TableName": "`user`_user_extra", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": 
"Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select `user`.col, `user`.bar, weight_string(`user`.bar) from `user` where 1 != 1 group by `user`.col, `user`.bar, weight_string(`user`.bar)", + "OrderBy": "0 ASC, (1|2) ASC", + "Query": "select `user`.col, `user`.bar, weight_string(`user`.bar) from `user` group by `user`.col, `user`.bar, weight_string(`user`.bar) order by `user`.col asc, `user`.bar asc", + "Table": "`user`" + }, + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select min(user_extra.foo), max(user_extra.bar) from user_extra where 1 != 1 group by .0", + "Query": "select min(user_extra.foo), max(user_extra.bar) from user_extra where user_extra.bar = :user_col group by .0", + "Table": "user_extra" + } + ] + } + ] + }, + "TablesUsed": [ + "user.user", + "user.user_extra" + ] + } } ] diff --git a/go/vt/vtgate/planbuilder/testdata/oltp_cases.json b/go/vt/vtgate/planbuilder/testdata/oltp_cases.json index 0a283a77b84..9fdf352aee7 100644 --- a/go/vt/vtgate/planbuilder/testdata/oltp_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/oltp_cases.json @@ -206,8 +206,7 @@ "Instructions": { "OperatorType": "Aggregate", "Variant": "Ordered", - "GroupBy": "(0|1) COLLATE latin1_swedish_ci", - "ResultColumns": 1, + "GroupBy": "0 COLLATE latin1_swedish_ci", "Inputs": [ { "OperatorType": "Route", @@ -216,9 +215,9 @@ "Name": "main", "Sharded": true }, - "FieldQuery": "select c, weight_string(c) from sbtest30 where 1 != 1", - "OrderBy": "0 ASC COLLATE latin1_swedish_ci, 0 ASC COLLATE latin1_swedish_ci", - "Query": "select distinct c, weight_string(c) from sbtest30 where id between 1 and 10 order by c asc, c asc", + "FieldQuery": "select c from sbtest30 where 1 != 1 group by c", + "OrderBy": "0 ASC COLLATE latin1_swedish_ci", + "Query": "select c from sbtest30 where id between 1 and 10 group by c order by c asc", "Table": "sbtest30" } ] diff --git 
a/go/vt/vtgate/planbuilder/testdata/postprocess_cases.json b/go/vt/vtgate/planbuilder/testdata/postprocess_cases.json index 0efd31d29b8..6b80ba8862b 100644 --- a/go/vt/vtgate/planbuilder/testdata/postprocess_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/postprocess_cases.json @@ -2957,9 +2957,9 @@ "Name": "user", "Sharded": true }, - "FieldQuery": "select `user`.a, weight_string(`user`.a) from `user` where 1 != 1", + "FieldQuery": "select `user`.a, weight_string(`user`.a) from `user` where 1 != 1 group by `user`.a, weight_string(`user`.a)", "OrderBy": "(0|1) ASC", - "Query": "select `user`.a, weight_string(`user`.a) from `user` order by `user`.a asc", + "Query": "select `user`.a, weight_string(`user`.a) from `user` group by `user`.a, weight_string(`user`.a) order by `user`.a asc", "Table": "`user`" }, { @@ -2969,8 +2969,8 @@ "Name": "user", "Sharded": true }, - "FieldQuery": "select 1 from user_extra where 1 != 1", - "Query": "select 1 from user_extra", + "FieldQuery": "select 1 from user_extra where 1 != 1 group by .0", + "Query": "select 1 from user_extra group by .0", "Table": "user_extra" } ] @@ -3026,9 +3026,9 @@ "Name": "user", "Sharded": true }, - "FieldQuery": "select a as c, a, weight_string(a) from `user` where 1 != 1", + "FieldQuery": "select a as c, a, weight_string(a) from `user` where 1 != 1 group by a, a, weight_string(a)", "OrderBy": "(0|2) ASC, (0|2) ASC", - "Query": "select distinct a as c, a, weight_string(a) from `user` order by c asc, a asc", + "Query": "select a as c, a, weight_string(a) from `user` group by a, a, weight_string(a) order by a asc, a asc", "Table": "`user`" } ] @@ -3058,9 +3058,9 @@ "Name": "user", "Sharded": true }, - "FieldQuery": "select a, a, weight_string(a) from `user` where 1 != 1", + "FieldQuery": "select a, a, weight_string(a) from `user` where 1 != 1 group by a, a, weight_string(a)", "OrderBy": "(0|2) ASC, (0|2) ASC", - "Query": "select distinct a, a, weight_string(a) from `user` order by a asc, a asc", + "Query": 
"select a, a, weight_string(a) from `user` group by a, a, weight_string(a) order by a asc, a asc", "Table": "`user`" } ] @@ -3282,5 +3282,84 @@ "user.user_extra" ] } + }, + { + "comment": "having filter with %", + "query": "select a.tcol1 from user a join music b where a.tcol1 = b.tcol2 group by a.tcol1 having repeat(a.tcol1,min(a.id)) like \"A\\%B\" order by a.tcol1", + "v3-plan": "VT12001: unsupported: cross-shard query with aggregates", + "gen4-plan": { + "QueryType": "SELECT", + "Original": "select a.tcol1 from user a join music b where a.tcol1 = b.tcol2 group by a.tcol1 having repeat(a.tcol1,min(a.id)) like \"A\\%B\" order by a.tcol1", + "Instructions": { + "OperatorType": "SimpleProjection", + "Columns": [ + 0 + ], + "Inputs": [ + { + "OperatorType": "Filter", + "Predicate": "repeat(a.tcol1, :1) like 'A\\%B'", + "Inputs": [ + { + "OperatorType": "Aggregate", + "Variant": "Ordered", + "Aggregates": "min(1) AS min(a.id)", + "GroupBy": "(0|2)", + "Inputs": [ + { + "OperatorType": "Projection", + "Expressions": [ + "[COLUMN 0] as tcol1", + "[COLUMN 2] as min(a.id)", + "[COLUMN 1]" + ], + "Inputs": [ + { + "OperatorType": "Join", + "Variant": "Join", + "JoinColumnIndexes": "L:0,L:2,L:1", + "JoinVars": { + "a_tcol1": 0 + }, + "TableName": "`user`_music", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select a.tcol1, min(a.id), weight_string(a.tcol1) from `user` as a where 1 != 1 group by a.tcol1, weight_string(a.tcol1)", + "OrderBy": "(0|2) ASC", + "Query": "select a.tcol1, min(a.id), weight_string(a.tcol1) from `user` as a group by a.tcol1, weight_string(a.tcol1) order by a.tcol1 asc", + "Table": "`user`" + }, + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select 1 from music as b where 1 != 1 group by 1", + "Query": "select 1 from music as b where b.tcol2 = :a_tcol1 group by 1", + 
"Table": "music" + } + ] + } + ] + } + ] + } + ] + } + ] + }, + "TablesUsed": [ + "user.music", + "user.user" + ] + } } ] diff --git a/go/vt/vtgate/planbuilder/testdata/tpch_cases.json b/go/vt/vtgate/planbuilder/testdata/tpch_cases.json index f69f0c32ed4..33f3f73d9fb 100644 --- a/go/vt/vtgate/planbuilder/testdata/tpch_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/tpch_cases.json @@ -25,38 +25,38 @@ { "OperatorType": "Sort", "Variant": "Memory", - "OrderBy": "1 DESC, (2|5) ASC", + "OrderBy": "1 DESC, (2|4) ASC", "ResultColumns": 4, "Inputs": [ { "OperatorType": "Aggregate", "Variant": "Ordered", "Aggregates": "sum(1) AS revenue", - "GroupBy": "(0|6), (2|5), (3|4)", + "GroupBy": "(0|5), (2|4), (3|6)", "Inputs": [ { "OperatorType": "Projection", "Expressions": [ - "[COLUMN 0] as l_orderkey", - "([COLUMN 6] * COALESCE([COLUMN 7], INT64(1))) * COALESCE([COLUMN 8], INT64(1)) as revenue", - "[COLUMN 1] as o_orderdate", - "[COLUMN 2] as o_shippriority", - "[COLUMN 5]", - "[COLUMN 4]", - "[COLUMN 3]" + "[COLUMN 2] as l_orderkey", + "[COLUMN 0] * [COLUMN 1] as revenue", + "[COLUMN 3] as o_orderdate", + "[COLUMN 4] as o_shippriority", + "[COLUMN 5] as weight_string(o_orderdate)", + "[COLUMN 6] as weight_string(l_orderkey)", + "[COLUMN 7] as weight_string(o_shippriority)" ], "Inputs": [ { "OperatorType": "Sort", "Variant": "Memory", - "OrderBy": "(0|3) ASC, (1|4) ASC, (2|5) ASC", + "OrderBy": "(2|6) ASC, (3|5) ASC, (4|7) ASC", "Inputs": [ { "OperatorType": "Join", "Variant": "Join", - "JoinColumnIndexes": "L:0,R:0,R:1,L:2,R:2,R:3,L:1,R:4,R:5", + "JoinColumnIndexes": "L:0,R:0,L:1,R:1,R:2,R:3,L:2,R:4", "JoinVars": { - "l_orderkey": 0 + "l_orderkey": 1 }, "TableName": "lineitem_orders_customer", "Inputs": [ @@ -67,48 +67,60 @@ "Name": "main", "Sharded": true }, - "FieldQuery": "select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, weight_string(l_orderkey) from lineitem where 1 != 1 group by l_orderkey, weight_string(l_orderkey)", - "Query": "select 
l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, weight_string(l_orderkey) from lineitem where l_shipdate > date('1995-03-15') group by l_orderkey, weight_string(l_orderkey)", + "FieldQuery": "select sum(l_extendedprice * (1 - l_discount)) as revenue, l_orderkey, weight_string(l_orderkey) from lineitem where 1 != 1 group by l_orderkey, weight_string(l_orderkey)", + "Query": "select sum(l_extendedprice * (1 - l_discount)) as revenue, l_orderkey, weight_string(l_orderkey) from lineitem where l_shipdate > date('1995-03-15') group by l_orderkey, weight_string(l_orderkey)", "Table": "lineitem" }, { - "OperatorType": "Join", - "Variant": "Join", - "JoinColumnIndexes": "L:3,L:5,L:4,L:6,L:1,R:1", - "JoinVars": { - "o_custkey": 0 - }, - "TableName": "orders_customer", + "OperatorType": "Projection", + "Expressions": [ + "[COLUMN 0] * [COLUMN 1] as count(*)", + "[COLUMN 2] as o_orderdate", + "[COLUMN 3] as o_shippriority", + "[COLUMN 4] as weight_string(o_orderdate)", + "[COLUMN 5] as weight_string(o_shippriority)" + ], "Inputs": [ { - "OperatorType": "Route", - "Variant": "EqualUnique", - "Keyspace": { - "Name": "main", - "Sharded": true + "OperatorType": "Join", + "Variant": "Join", + "JoinColumnIndexes": "L:0,R:0,L:1,L:2,L:4,L:5", + "JoinVars": { + "o_custkey": 3 }, - "FieldQuery": "select o_custkey, count(*), weight_string(o_custkey), o_orderdate, weight_string(o_orderdate), o_shippriority, weight_string(o_shippriority) from orders where 1 != 1 group by o_custkey, weight_string(o_custkey), o_orderdate, weight_string(o_orderdate), o_shippriority, weight_string(o_shippriority)", - "Query": "select o_custkey, count(*), weight_string(o_custkey), o_orderdate, weight_string(o_orderdate), o_shippriority, weight_string(o_shippriority) from orders where o_orderdate < date('1995-03-15') and o_orderkey = :l_orderkey group by o_custkey, weight_string(o_custkey), o_orderdate, weight_string(o_orderdate), o_shippriority, weight_string(o_shippriority)", - "Table": 
"orders", - "Values": [ - ":l_orderkey" - ], - "Vindex": "hash" - }, - { - "OperatorType": "Route", - "Variant": "EqualUnique", - "Keyspace": { - "Name": "main", - "Sharded": true - }, - "FieldQuery": "select 1, count(*) from customer where 1 != 1 group by 1", - "Query": "select 1, count(*) from customer where c_mktsegment = 'BUILDING' and c_custkey = :o_custkey group by 1", - "Table": "customer", - "Values": [ - ":o_custkey" - ], - "Vindex": "hash" + "TableName": "orders_customer", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "EqualUnique", + "Keyspace": { + "Name": "main", + "Sharded": true + }, + "FieldQuery": "select count(*), o_orderdate, o_shippriority, o_custkey, weight_string(o_orderdate), weight_string(o_shippriority) from orders where 1 != 1 group by o_orderdate, o_shippriority, o_custkey, weight_string(o_orderdate), weight_string(o_shippriority)", + "Query": "select count(*), o_orderdate, o_shippriority, o_custkey, weight_string(o_orderdate), weight_string(o_shippriority) from orders where o_orderdate < date('1995-03-15') and o_orderkey = :l_orderkey group by o_orderdate, o_shippriority, o_custkey, weight_string(o_orderdate), weight_string(o_shippriority)", + "Table": "orders", + "Values": [ + ":l_orderkey" + ], + "Vindex": "hash" + }, + { + "OperatorType": "Route", + "Variant": "EqualUnique", + "Keyspace": { + "Name": "main", + "Sharded": true + }, + "FieldQuery": "select count(*) from customer where 1 != 1 group by .0", + "Query": "select count(*) from customer where c_mktsegment = 'BUILDING' and c_custkey = :o_custkey group by .0", + "Table": "customer", + "Values": [ + ":o_custkey" + ], + "Vindex": "hash" + } + ] } ] } @@ -246,120 +258,199 @@ { "OperatorType": "Projection", "Expressions": [ - "[COLUMN 0] as n_name", - "(((([COLUMN 2] * COALESCE([COLUMN 3], INT64(1))) * COALESCE([COLUMN 4], INT64(1))) * COALESCE([COLUMN 5], INT64(1))) * COALESCE([COLUMN 6], INT64(1))) * COALESCE([COLUMN 7], INT64(1)) as revenue", - "[COLUMN 1]" + "[COLUMN 
2] as n_name", + "[COLUMN 0] * [COLUMN 1] as revenue", + "[COLUMN 3] as weight_string(n_name)" ], "Inputs": [ { "OperatorType": "Sort", "Variant": "Memory", - "OrderBy": "(0|1) ASC", + "OrderBy": "(2|3) ASC", "Inputs": [ { "OperatorType": "Join", "Variant": "Join", - "JoinColumnIndexes": "R:0,R:1,L:3,L:4,L:5,L:6,R:2,R:3", + "JoinColumnIndexes": "L:0,R:0,R:1,R:2", "JoinVars": { - "s_nationkey": 0 + "s_nationkey": 1 }, "TableName": "orders_customer_lineitem_supplier_nation_region", "Inputs": [ { - "OperatorType": "Join", - "Variant": "Join", - "JoinColumnIndexes": "R:0,R:1,R:2,L:6,L:7,R:3,R:4", - "JoinVars": { - "c_nationkey": 1, - "o_orderkey": 0 - }, - "TableName": "orders_customer_lineitem_supplier", + "OperatorType": "Projection", + "Expressions": [ + "[COLUMN 1] * [COLUMN 0] as revenue", + "[COLUMN 2] as s_nationkey" + ], "Inputs": [ { "OperatorType": "Join", "Variant": "Join", - "JoinColumnIndexes": "L:0,R:0,L:0,R:0,L:4,R:2,L:2,R:1", + "JoinColumnIndexes": "R:0,L:0,R:1", "JoinVars": { - "o_custkey": 1 + "c_nationkey": 2, + "o_orderkey": 1 }, - "TableName": "orders_customer", + "TableName": "orders_customer_lineitem_supplier", "Inputs": [ { - "OperatorType": "Route", - "Variant": "Scatter", - "Keyspace": { - "Name": "main", - "Sharded": true - }, - "FieldQuery": "select o_orderkey, o_custkey, count(*), weight_string(o_custkey), weight_string(o_orderkey) from orders where 1 != 1 group by o_custkey, weight_string(o_custkey), o_orderkey, weight_string(o_orderkey)", - "Query": "select o_orderkey, o_custkey, count(*), weight_string(o_custkey), weight_string(o_orderkey) from orders where o_orderdate >= date('1994-01-01') and o_orderdate < date('1994-01-01') + interval '1' year group by o_custkey, weight_string(o_custkey), o_orderkey, weight_string(o_orderkey)", - "Table": "orders" + "OperatorType": "Projection", + "Expressions": [ + "[COLUMN 0] * [COLUMN 1] as count(*)", + "[COLUMN 2] as o_orderkey", + "[COLUMN 3] as c_nationkey" + ], + "Inputs": [ + { + 
"OperatorType": "Join", + "Variant": "Join", + "JoinColumnIndexes": "L:0,R:0,L:1,R:1", + "JoinVars": { + "o_custkey": 2 + }, + "TableName": "orders_customer", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "main", + "Sharded": true + }, + "FieldQuery": "select count(*), o_orderkey, o_custkey from orders where 1 != 1 group by o_orderkey, o_custkey", + "Query": "select count(*), o_orderkey, o_custkey from orders where o_orderdate >= date('1994-01-01') and o_orderdate < date('1994-01-01') + interval '1' year group by o_orderkey, o_custkey", + "Table": "orders" + }, + { + "OperatorType": "Route", + "Variant": "EqualUnique", + "Keyspace": { + "Name": "main", + "Sharded": true + }, + "FieldQuery": "select count(*), c_nationkey from customer where 1 != 1 group by c_nationkey", + "Query": "select count(*), c_nationkey from customer where c_custkey = :o_custkey group by c_nationkey", + "Table": "customer", + "Values": [ + ":o_custkey" + ], + "Vindex": "hash" + } + ] + } + ] }, { - "OperatorType": "Route", - "Variant": "EqualUnique", - "Keyspace": { - "Name": "main", - "Sharded": true - }, - "FieldQuery": "select c_nationkey, count(*), weight_string(c_nationkey) from customer where 1 != 1 group by c_nationkey, weight_string(c_nationkey)", - "Query": "select c_nationkey, count(*), weight_string(c_nationkey) from customer where c_custkey = :o_custkey group by c_nationkey, weight_string(c_nationkey)", - "Table": "customer", - "Values": [ - ":o_custkey" + "OperatorType": "Projection", + "Expressions": [ + "[COLUMN 0] * [COLUMN 1] as revenue", + "[COLUMN 2] as s_nationkey" ], - "Vindex": "hash" + "Inputs": [ + { + "OperatorType": "Join", + "Variant": "Join", + "JoinColumnIndexes": "L:0,R:0,R:1", + "JoinVars": { + "l_suppkey": 1 + }, + "TableName": "lineitem_supplier", + "Inputs": [ + { + "OperatorType": "VindexLookup", + "Variant": "EqualUnique", + "Keyspace": { + "Name": "main", + "Sharded": true + }, + "Values": [ + ":o_orderkey" 
+ ], + "Vindex": "lineitem_map", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "IN", + "Keyspace": { + "Name": "main", + "Sharded": true + }, + "FieldQuery": "select l_orderkey, l_linenumber from lineitem_map where 1 != 1", + "Query": "select l_orderkey, l_linenumber from lineitem_map where l_orderkey in ::__vals", + "Table": "lineitem_map", + "Values": [ + "::l_orderkey" + ], + "Vindex": "md5" + }, + { + "OperatorType": "Route", + "Variant": "ByDestination", + "Keyspace": { + "Name": "main", + "Sharded": true + }, + "FieldQuery": "select sum(l_extendedprice * (1 - l_discount)) as revenue, l_suppkey from lineitem where 1 != 1 group by l_suppkey", + "Query": "select sum(l_extendedprice * (1 - l_discount)) as revenue, l_suppkey from lineitem where l_orderkey = :o_orderkey group by l_suppkey", + "Table": "lineitem" + } + ] + }, + { + "OperatorType": "Route", + "Variant": "EqualUnique", + "Keyspace": { + "Name": "main", + "Sharded": true + }, + "FieldQuery": "select count(*), s_nationkey from supplier where 1 != 1 group by s_nationkey", + "Query": "select count(*), s_nationkey from supplier where s_nationkey = :c_nationkey and s_suppkey = :l_suppkey group by s_nationkey", + "Table": "supplier", + "Values": [ + ":l_suppkey" + ], + "Vindex": "hash" + } + ] + } + ] } ] - }, + } + ] + }, + { + "OperatorType": "Projection", + "Expressions": [ + "[COLUMN 0] * [COLUMN 1] as count(*)", + "[COLUMN 2] as n_name", + "[COLUMN 3] as weight_string(n_name)" + ], + "Inputs": [ { "OperatorType": "Join", "Variant": "Join", - "JoinColumnIndexes": "R:0,R:0,R:2,L:1,R:1", + "JoinColumnIndexes": "L:0,R:0,L:1,L:3", "JoinVars": { - "l_suppkey": 0 + "n_regionkey": 2 }, - "TableName": "lineitem_supplier", + "TableName": "nation_region", "Inputs": [ { - "OperatorType": "VindexLookup", + "OperatorType": "Route", "Variant": "EqualUnique", "Keyspace": { "Name": "main", "Sharded": true }, + "FieldQuery": "select count(*), n_name, n_regionkey, weight_string(n_name) from nation where 1 != 1 
group by n_name, n_regionkey, weight_string(n_name)", + "Query": "select count(*), n_name, n_regionkey, weight_string(n_name) from nation where n_nationkey = :s_nationkey group by n_name, n_regionkey, weight_string(n_name)", + "Table": "nation", "Values": [ - ":o_orderkey" + ":s_nationkey" ], - "Vindex": "lineitem_map", - "Inputs": [ - { - "OperatorType": "Route", - "Variant": "IN", - "Keyspace": { - "Name": "main", - "Sharded": true - }, - "FieldQuery": "select l_orderkey, l_linenumber from lineitem_map where 1 != 1", - "Query": "select l_orderkey, l_linenumber from lineitem_map where l_orderkey in ::__vals", - "Table": "lineitem_map", - "Values": [ - "::l_orderkey" - ], - "Vindex": "md5" - }, - { - "OperatorType": "Route", - "Variant": "ByDestination", - "Keyspace": { - "Name": "main", - "Sharded": true - }, - "FieldQuery": "select l_suppkey, sum(l_extendedprice * (1 - l_discount)) as revenue, weight_string(l_suppkey) from lineitem where 1 != 1 group by l_suppkey, weight_string(l_suppkey)", - "Query": "select l_suppkey, sum(l_extendedprice * (1 - l_discount)) as revenue, weight_string(l_suppkey) from lineitem where l_orderkey = :o_orderkey group by l_suppkey, weight_string(l_suppkey)", - "Table": "lineitem" - } - ] + "Vindex": "hash" }, { "OperatorType": "Route", @@ -368,58 +459,17 @@ "Name": "main", "Sharded": true }, - "FieldQuery": "select s_nationkey, count(*), weight_string(s_nationkey) from supplier where 1 != 1 group by s_nationkey, weight_string(s_nationkey)", - "Query": "select s_nationkey, count(*), weight_string(s_nationkey) from supplier where s_nationkey = :c_nationkey and s_suppkey = :l_suppkey group by s_nationkey, weight_string(s_nationkey)", - "Table": "supplier", + "FieldQuery": "select count(*) from region where 1 != 1 group by .0", + "Query": "select count(*) from region where r_name = 'ASIA' and r_regionkey = :n_regionkey group by .0", + "Table": "region", "Values": [ - ":l_suppkey" + ":n_regionkey" ], "Vindex": "hash" } ] } ] - }, - { - 
"OperatorType": "Join", - "Variant": "Join", - "JoinColumnIndexes": "L:3,L:4,L:1,R:1", - "JoinVars": { - "n_regionkey": 0 - }, - "TableName": "nation_region", - "Inputs": [ - { - "OperatorType": "Route", - "Variant": "EqualUnique", - "Keyspace": { - "Name": "main", - "Sharded": true - }, - "FieldQuery": "select n_regionkey, count(*), weight_string(n_regionkey), n_name, weight_string(n_name) from nation where 1 != 1 group by n_regionkey, weight_string(n_regionkey), n_name, weight_string(n_name)", - "Query": "select n_regionkey, count(*), weight_string(n_regionkey), n_name, weight_string(n_name) from nation where n_nationkey = :s_nationkey group by n_regionkey, weight_string(n_regionkey), n_name, weight_string(n_name)", - "Table": "nation", - "Values": [ - ":s_nationkey" - ], - "Vindex": "hash" - }, - { - "OperatorType": "Route", - "Variant": "EqualUnique", - "Keyspace": { - "Name": "main", - "Sharded": true - }, - "FieldQuery": "select 1, count(*) from region where 1 != 1 group by 1", - "Query": "select 1, count(*) from region where r_name = 'ASIA' and r_regionkey = :n_regionkey group by 1", - "Table": "region", - "Values": [ - ":n_regionkey" - ], - "Vindex": "hash" - } - ] } ] } @@ -713,142 +763,173 @@ "OperatorType": "Aggregate", "Variant": "Ordered", "Aggregates": "sum(2) AS revenue", - "GroupBy": "(0|14), (1|13), (3|12), (6|11), (4|10), (5|9), (7|8)", + "GroupBy": "(0|8), (1|9), (3|10), (6|11), (4|12), (5|13), (7|14)", "Inputs": [ { "OperatorType": "Projection", "Expressions": [ - "[COLUMN 0] as c_custkey", - "[COLUMN 1] as c_name", - "(([COLUMN 14] * COALESCE([COLUMN 15], INT64(1))) * COALESCE([COLUMN 16], INT64(1))) * COALESCE([COLUMN 17], INT64(1)) as revenue", - "[COLUMN 2] as c_acctbal", - "[COLUMN 4] as n_name", - "[COLUMN 5] as c_address", - "[COLUMN 3] as c_phone", - "[COLUMN 6] as c_comment", - "[COLUMN 13]", - "[COLUMN 12]", - "[COLUMN 11]", - "[COLUMN 10]", - "[COLUMN 9]", - "[COLUMN 8]", - "[COLUMN 7]" + "[COLUMN 2] as c_custkey", + "[COLUMN 3] as 
c_name", + "[COLUMN 0] * [COLUMN 1] as revenue", + "[COLUMN 4] as c_acctbal", + "[COLUMN 6] as n_name", + "[COLUMN 7] as c_address", + "[COLUMN 5] as c_phone", + "[COLUMN 8] as c_comment", + "[COLUMN 9] as weight_string(c_custkey)", + "[COLUMN 10] as weight_string(c_name)", + "[COLUMN 11] as weight_string(c_acctbal)", + "[COLUMN 12] as weight_string(c_phone)", + "[COLUMN 13] as weight_string(n_name)", + "[COLUMN 14] as weight_string(c_address)", + "[COLUMN 15] as weight_string(c_comment)" ], "Inputs": [ { "OperatorType": "Sort", "Variant": "Memory", - "OrderBy": "(0|7) ASC, (1|8) ASC, (2|9) ASC, (3|10) ASC, (4|11) ASC, (5|12) ASC, (6|13) ASC", + "OrderBy": "(2|9) ASC, (3|10) ASC, (4|11) ASC, (5|12) ASC, (6|13) ASC, (7|14) ASC, (8|15) ASC", "Inputs": [ { "OperatorType": "Join", "Variant": "Join", - "JoinColumnIndexes": "R:0,R:1,R:2,R:3,R:4,R:5,R:6,R:7,R:8,R:9,R:10,R:11,R:12,R:13,L:3,L:4,R:14,R:15", + "JoinColumnIndexes": "L:0,R:0,R:1,R:2,R:3,R:4,R:5,R:6,R:7,R:8,R:9,R:10,R:11,R:12,R:13,R:14", "JoinVars": { - "o_custkey": 0 + "o_custkey": 1 }, "TableName": "orders_lineitem_customer_nation", "Inputs": [ { - "OperatorType": "Join", - "Variant": "Join", - "JoinColumnIndexes": "L:0,L:0,L:4,L:2,R:1", - "JoinVars": { - "o_orderkey": 1 - }, - "TableName": "orders_lineitem", + "OperatorType": "Projection", + "Expressions": [ + "[COLUMN 1] * [COLUMN 0] as revenue", + "[COLUMN 2] as o_custkey" + ], "Inputs": [ { - "OperatorType": "Route", - "Variant": "Scatter", - "Keyspace": { - "Name": "main", - "Sharded": true - }, - "FieldQuery": "select o_custkey, o_orderkey, count(*), weight_string(o_orderkey), weight_string(o_custkey) from orders where 1 != 1 group by o_orderkey, weight_string(o_orderkey), o_custkey, weight_string(o_custkey)", - "Query": "select o_custkey, o_orderkey, count(*), weight_string(o_orderkey), weight_string(o_custkey) from orders where o_orderdate >= date('1993-10-01') and o_orderdate < date('1993-10-01') + interval '3' month group by o_orderkey, 
weight_string(o_orderkey), o_custkey, weight_string(o_custkey)", - "Table": "orders" - }, - { - "OperatorType": "VindexLookup", - "Variant": "EqualUnique", - "Keyspace": { - "Name": "main", - "Sharded": true + "OperatorType": "Join", + "Variant": "Join", + "JoinColumnIndexes": "R:0,L:0,L:1", + "JoinVars": { + "o_orderkey": 2 }, - "Values": [ - ":o_orderkey" - ], - "Vindex": "lineitem_map", + "TableName": "orders_lineitem", "Inputs": [ { "OperatorType": "Route", - "Variant": "IN", + "Variant": "Scatter", "Keyspace": { "Name": "main", "Sharded": true }, - "FieldQuery": "select l_orderkey, l_linenumber from lineitem_map where 1 != 1", - "Query": "select l_orderkey, l_linenumber from lineitem_map where l_orderkey in ::__vals", - "Table": "lineitem_map", - "Values": [ - "::l_orderkey" - ], - "Vindex": "md5" + "FieldQuery": "select count(*), o_custkey, o_orderkey from orders where 1 != 1 group by o_custkey, o_orderkey", + "Query": "select count(*), o_custkey, o_orderkey from orders where o_orderdate >= date('1993-10-01') and o_orderdate < date('1993-10-01') + interval '3' month group by o_custkey, o_orderkey", + "Table": "orders" }, { - "OperatorType": "Route", - "Variant": "ByDestination", + "OperatorType": "VindexLookup", + "Variant": "EqualUnique", "Keyspace": { "Name": "main", "Sharded": true }, - "FieldQuery": "select 1, sum(l_extendedprice * (1 - l_discount)) as revenue from lineitem where 1 != 1 group by 1", - "Query": "select 1, sum(l_extendedprice * (1 - l_discount)) as revenue from lineitem where l_returnflag = 'R' and l_orderkey = :o_orderkey group by 1", - "Table": "lineitem" + "Values": [ + ":o_orderkey" + ], + "Vindex": "lineitem_map", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "IN", + "Keyspace": { + "Name": "main", + "Sharded": true + }, + "FieldQuery": "select l_orderkey, l_linenumber from lineitem_map where 1 != 1", + "Query": "select l_orderkey, l_linenumber from lineitem_map where l_orderkey in ::__vals", + "Table": "lineitem_map", + 
"Values": [ + "::l_orderkey" + ], + "Vindex": "md5" + }, + { + "OperatorType": "Route", + "Variant": "ByDestination", + "Keyspace": { + "Name": "main", + "Sharded": true + }, + "FieldQuery": "select sum(l_extendedprice * (1 - l_discount)) as revenue from lineitem where 1 != 1 group by .0", + "Query": "select sum(l_extendedprice * (1 - l_discount)) as revenue from lineitem where l_returnflag = 'R' and l_orderkey = :o_orderkey group by .0", + "Table": "lineitem" + } + ] } ] } ] }, { - "OperatorType": "Join", - "Variant": "Join", - "JoinColumnIndexes": "L:3,L:5,L:7,L:9,R:1,L:11,L:13,L:4,L:6,L:8,L:10,R:2,L:12,L:14,L:1,R:0", - "JoinVars": { - "c_nationkey": 0 - }, - "TableName": "customer_nation", + "OperatorType": "Projection", + "Expressions": [ + "[COLUMN 0] * [COLUMN 1] as count(*)", + "[COLUMN 2] as c_custkey", + "[COLUMN 3] as c_name", + "[COLUMN 4] as c_acctbal", + "[COLUMN 5] as c_phone", + "[COLUMN 6] as n_name", + "[COLUMN 7] as c_address", + "[COLUMN 8] as c_comment", + "[COLUMN 9] as weight_string(c_custkey)", + "[COLUMN 10] as weight_string(c_name)", + "[COLUMN 11] as weight_string(c_acctbal)", + "[COLUMN 12] as weight_string(c_phone)", + "[COLUMN 13] as weight_string(n_name)", + "[COLUMN 14] as weight_string(c_address)", + "[COLUMN 15] as weight_string(c_comment)" + ], "Inputs": [ { - "OperatorType": "Route", - "Variant": "EqualUnique", - "Keyspace": { - "Name": "main", - "Sharded": true + "OperatorType": "Join", + "Variant": "Join", + "JoinColumnIndexes": "L:0,R:0,L:1,L:2,L:3,L:4,R:1,L:5,L:6,L:8,L:9,L:10,L:11,R:2,L:12,L:13", + "JoinVars": { + "c_nationkey": 7 }, - "FieldQuery": "select c_nationkey, count(*), weight_string(c_nationkey), c_custkey, weight_string(c_custkey), c_name, weight_string(c_name), c_acctbal, weight_string(c_acctbal), c_phone, weight_string(c_phone), c_address, weight_string(c_address), c_comment, weight_string(c_comment) from customer where 1 != 1 group by c_nationkey, weight_string(c_nationkey), c_custkey, weight_string(c_custkey), 
c_name, weight_string(c_name), c_acctbal, weight_string(c_acctbal), c_phone, weight_string(c_phone), c_address, weight_string(c_address), c_comment, weight_string(c_comment)", - "Query": "select c_nationkey, count(*), weight_string(c_nationkey), c_custkey, weight_string(c_custkey), c_name, weight_string(c_name), c_acctbal, weight_string(c_acctbal), c_phone, weight_string(c_phone), c_address, weight_string(c_address), c_comment, weight_string(c_comment) from customer where c_custkey = :o_custkey group by c_nationkey, weight_string(c_nationkey), c_custkey, weight_string(c_custkey), c_name, weight_string(c_name), c_acctbal, weight_string(c_acctbal), c_phone, weight_string(c_phone), c_address, weight_string(c_address), c_comment, weight_string(c_comment)", - "Table": "customer", - "Values": [ - ":o_custkey" - ], - "Vindex": "hash" - }, - { - "OperatorType": "Route", - "Variant": "EqualUnique", - "Keyspace": { - "Name": "main", - "Sharded": true - }, - "FieldQuery": "select count(*), n_name, weight_string(n_name) from nation where 1 != 1 group by n_name, weight_string(n_name)", - "Query": "select count(*), n_name, weight_string(n_name) from nation where n_nationkey = :c_nationkey group by n_name, weight_string(n_name)", - "Table": "nation", - "Values": [ - ":c_nationkey" - ], - "Vindex": "hash" + "TableName": "customer_nation", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "EqualUnique", + "Keyspace": { + "Name": "main", + "Sharded": true + }, + "FieldQuery": "select count(*), c_custkey, c_name, c_acctbal, c_phone, c_address, c_comment, c_nationkey, weight_string(c_custkey), weight_string(c_name), weight_string(c_acctbal), weight_string(c_phone), weight_string(c_address), weight_string(c_comment) from customer where 1 != 1 group by c_custkey, c_name, c_acctbal, c_phone, c_address, c_comment, c_nationkey, weight_string(c_custkey), weight_string(c_name), weight_string(c_acctbal), weight_string(c_phone), weight_string(c_address), weight_string(c_comment)", + 
"Query": "select count(*), c_custkey, c_name, c_acctbal, c_phone, c_address, c_comment, c_nationkey, weight_string(c_custkey), weight_string(c_name), weight_string(c_acctbal), weight_string(c_phone), weight_string(c_address), weight_string(c_comment) from customer where c_custkey = :o_custkey group by c_custkey, c_name, c_acctbal, c_phone, c_address, c_comment, c_nationkey, weight_string(c_custkey), weight_string(c_name), weight_string(c_acctbal), weight_string(c_phone), weight_string(c_address), weight_string(c_comment)", + "Table": "customer", + "Values": [ + ":o_custkey" + ], + "Vindex": "hash" + }, + { + "OperatorType": "Route", + "Variant": "EqualUnique", + "Keyspace": { + "Name": "main", + "Sharded": true + }, + "FieldQuery": "select count(*), n_name, weight_string(n_name) from nation where 1 != 1 group by n_name, weight_string(n_name)", + "Query": "select count(*), n_name, weight_string(n_name) from nation where n_nationkey = :c_nationkey group by n_name, weight_string(n_name)", + "Table": "nation", + "Values": [ + ":c_nationkey" + ], + "Vindex": "hash" + } + ] } ] } @@ -895,23 +976,23 @@ { "OperatorType": "Projection", "Expressions": [ - "[COLUMN 0] as l_shipmode", - "[COLUMN 2] * COALESCE([COLUMN 3], INT64(1)) as high_line_count", - "[COLUMN 4] * COALESCE([COLUMN 5], INT64(1)) as low_line_count", - "[COLUMN 1]" + "[COLUMN 3] as l_shipmode", + "[COLUMN 0] * [COLUMN 1] as high_line_count", + "[COLUMN 2] * [COLUMN 1] as low_line_count", + "[COLUMN 4] as weight_string(l_shipmode)" ], "Inputs": [ { "OperatorType": "Sort", "Variant": "Memory", - "OrderBy": "(0|1) ASC", + "OrderBy": "(3|4) ASC", "Inputs": [ { "OperatorType": "Join", "Variant": "Join", - "JoinColumnIndexes": "R:1,R:2,L:1,R:0,L:2,R:0", + "JoinColumnIndexes": "L:0,R:0,L:1,R:1,R:2", "JoinVars": { - "o_orderkey": 0 + "o_orderkey": 2 }, "TableName": "orders_lineitem", "Inputs": [ @@ -922,8 +1003,8 @@ "Name": "main", "Sharded": true }, - "FieldQuery": "select o_orderkey, sum(case when o_orderpriority = 
'1-URGENT' or o_orderpriority = '2-HIGH' then 1 else 0 end) as high_line_count, sum(case when o_orderpriority != '1-URGENT' and o_orderpriority != '2-HIGH' then 1 else 0 end) as low_line_count, weight_string(o_orderkey) from orders where 1 != 1 group by o_orderkey, weight_string(o_orderkey)", - "Query": "select o_orderkey, sum(case when o_orderpriority = '1-URGENT' or o_orderpriority = '2-HIGH' then 1 else 0 end) as high_line_count, sum(case when o_orderpriority != '1-URGENT' and o_orderpriority != '2-HIGH' then 1 else 0 end) as low_line_count, weight_string(o_orderkey) from orders group by o_orderkey, weight_string(o_orderkey)", + "FieldQuery": "select sum(case when o_orderpriority = '1-URGENT' or o_orderpriority = '2-HIGH' then 1 else 0 end) as high_line_count, sum(case when o_orderpriority != '1-URGENT' and o_orderpriority != '2-HIGH' then 1 else 0 end) as low_line_count, o_orderkey from orders where 1 != 1 group by o_orderkey", + "Query": "select sum(case when o_orderpriority = '1-URGENT' or o_orderpriority = '2-HIGH' then 1 else 0 end) as high_line_count, sum(case when o_orderpriority != '1-URGENT' and o_orderpriority != '2-HIGH' then 1 else 0 end) as low_line_count, o_orderkey from orders group by o_orderkey", "Table": "orders" }, { @@ -1288,18 +1369,18 @@ { "OperatorType": "Projection", "Expressions": [ - "[COLUMN 0] * COALESCE([COLUMN 1], INT64(1)) as revenue" + "[COLUMN 0] * [COLUMN 1] as revenue" ], "Inputs": [ { "OperatorType": "Join", "Variant": "Join", - "JoinColumnIndexes": "L:4,R:1", + "JoinColumnIndexes": "L:0,R:0", "JoinVars": { - "l_partkey": 0, - "l_quantity": 1, - "l_shipinstruct": 3, - "l_shipmode": 2 + "l_partkey": 1, + "l_quantity": 2, + "l_shipinstruct": 4, + "l_shipmode": 3 }, "TableName": "lineitem_part", "Inputs": [ @@ -1310,8 +1391,8 @@ "Name": "main", "Sharded": true }, - "FieldQuery": "select l_partkey, l_quantity, l_shipmode, l_shipinstruct, sum(l_extendedprice * (1 - l_discount)) as revenue, weight_string(l_partkey), 
weight_string(l_quantity), weight_string(l_shipmode), weight_string(l_shipinstruct) from lineitem where 1 != 1 group by l_partkey, weight_string(l_partkey), l_quantity, weight_string(l_quantity), l_shipmode, weight_string(l_shipmode), l_shipinstruct, weight_string(l_shipinstruct)", - "Query": "select l_partkey, l_quantity, l_shipmode, l_shipinstruct, sum(l_extendedprice * (1 - l_discount)) as revenue, weight_string(l_partkey), weight_string(l_quantity), weight_string(l_shipmode), weight_string(l_shipinstruct) from lineitem group by l_partkey, weight_string(l_partkey), l_quantity, weight_string(l_quantity), l_shipmode, weight_string(l_shipmode), l_shipinstruct, weight_string(l_shipinstruct)", + "FieldQuery": "select sum(l_extendedprice * (1 - l_discount)) as revenue, l_partkey, l_quantity, l_shipmode, l_shipinstruct from lineitem where 1 != 1 group by l_partkey, l_quantity, l_shipmode, l_shipinstruct", + "Query": "select sum(l_extendedprice * (1 - l_discount)) as revenue, l_partkey, l_quantity, l_shipmode, l_shipinstruct from lineitem group by l_partkey, l_quantity, l_shipmode, l_shipinstruct", "Table": "lineitem" }, { @@ -1321,8 +1402,8 @@ "Name": "main", "Sharded": true }, - "FieldQuery": "select 1, count(*) from part where 1 != 1 group by 1", - "Query": "select 1, count(*) from part where p_partkey = :l_partkey and p_brand = 'Brand#12' and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG') and :l_quantity >= 1 and :l_quantity <= 1 + 10 and p_size between 1 and 5 and :l_shipmode in ('AIR', 'AIR REG') and :l_shipinstruct = 'DELIVER IN PERSON' or p_partkey = :l_partkey and p_brand = 'Brand#23' and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK') and :l_quantity >= 10 and :l_quantity <= 10 + 10 and p_size between 1 and 10 and :l_shipmode in ('AIR', 'AIR REG') and :l_shipinstruct = 'DELIVER IN PERSON' or p_partkey = :l_partkey and p_brand = 'Brand#34' and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG') and :l_quantity >= 20 and :l_quantity 
<= 20 + 10 and p_size between 1 and 15 and :l_shipmode in ('AIR', 'AIR REG') and :l_shipinstruct = 'DELIVER IN PERSON' group by 1", + "FieldQuery": "select count(*) from part where 1 != 1 group by .0", + "Query": "select count(*) from part where p_partkey = :l_partkey and p_brand = 'Brand#12' and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG') and :l_quantity >= 1 and :l_quantity <= 1 + 10 and p_size between 1 and 5 and :l_shipmode in ('AIR', 'AIR REG') and :l_shipinstruct = 'DELIVER IN PERSON' or p_partkey = :l_partkey and p_brand = 'Brand#23' and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK') and :l_quantity >= 10 and :l_quantity <= 10 + 10 and p_size between 1 and 10 and :l_shipmode in ('AIR', 'AIR REG') and :l_shipinstruct = 'DELIVER IN PERSON' or p_partkey = :l_partkey and p_brand = 'Brand#34' and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG') and :l_quantity >= 20 and :l_quantity <= 20 + 10 and p_size between 1 and 15 and :l_shipmode in ('AIR', 'AIR REG') and :l_shipinstruct = 'DELIVER IN PERSON' group by .0", "Table": "part" } ] diff --git a/go/vt/vtgate/planbuilder/testdata/unsupported_cases.json b/go/vt/vtgate/planbuilder/testdata/unsupported_cases.json index 245ac390ce4..3676c09ead9 100644 --- a/go/vt/vtgate/planbuilder/testdata/unsupported_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/unsupported_cases.json @@ -378,12 +378,6 @@ "v3-plan": "VT12001: unsupported: in scatter query: complex aggregate expression", "gen4-plan": "VT12001: unsupported: in scatter query: aggregation function 'avg(id)'" }, - { - "comment": "scatter aggregate with ambiguous aliases", - "query": "select distinct a, b as a from user", - "v3-plan": "generating ORDER BY clause: VT03021: ambiguous column reference: a", - "gen4-plan": "VT13001: [BUG] generating ORDER BY clause: ambiguous symbol reference: a" - }, { "comment": "outer and inner subquery route reference the same \"uu.id\" name\n# but they refer to different things. 
The first reference is to the outermost query,\n# and the second reference is to the innermost 'from' subquery.\n# This query will never work as the inner derived table is only selecting one of the column", "query": "select id2 from user uu where id in (select id from user where id = uu.id and user.col in (select col from (select id from user_extra where user_id = 5) uu where uu.user_id = uu.id))", @@ -420,12 +414,6 @@ "query": "with x as (select * from user) select * from x union select * from x", "plan": "VT12001: unsupported: WITH expression in UNION statement" }, - { - "comment": "scatter aggregate with complex select list (can't build order by)", - "query": "select distinct a+1 from user", - "v3-plan": "generating ORDER BY clause: VT12001: unsupported: reference a complex expression", - "gen4-plan": "VT13001: [BUG] in scatter query: complex ORDER BY expression: a + 1" - }, { "comment": "aggregation on union", "query": "select sum(col) from (select col from user union all select col from unsharded) t", diff --git a/go/vt/vttest/local_cluster.go b/go/vt/vttest/local_cluster.go index 389f21b3e1f..3a095eb8b2d 100644 --- a/go/vt/vttest/local_cluster.go +++ b/go/vt/vttest/local_cluster.go @@ -22,6 +22,8 @@ import ( "context" "encoding/json" "fmt" + "io" + "net/http" "os" "os/exec" "path" @@ -760,3 +762,28 @@ func LoadSQLFile(filename, sourceroot string) ([]string, error) { return sql, nil } + +func (db *LocalCluster) VTProcess() *VtProcess { + return db.vt +} + +// ReadVSchema reads the vschema from the vtgate endpoint for it and returns +// a pointer to the interface. 
To read this vschema, the caller must convert it to a map +func (vt *VtProcess) ReadVSchema() (*interface{}, error) { + httpClient := &http.Client{Timeout: 5 * time.Second} + resp, err := httpClient.Get(fmt.Sprintf("http://%s:%d/debug/vschema", "127.0.0.1", vt.Port)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + res, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + var results interface{} + err = json.Unmarshal(res, &results) + if err != nil { + return nil, err + } + return &results, nil +}