From d0d33fb9d21eec74a2a657ed198a3d7d9ce70cbd Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Tue, 30 Jul 2024 14:01:28 +0530 Subject: [PATCH] fix: reference table join merge (#16488) Signed-off-by: Harshit Gangal Signed-off-by: Andres Taylor Co-authored-by: Andres Taylor --- .../vtgate/queries/reference/main_test.go | 69 +++---------------- .../queries/reference/reference_test.go | 32 ++++++--- .../vtgate/queries/reference/sschema.sql | 6 ++ .../vtgate/queries/reference/svschema.json | 22 ++++++ .../vtgate/queries/reference/uschema.sql | 17 +++++ .../vtgate/queries/reference/uvschema.json | 6 ++ .../planbuilder/operators/join_merging.go | 36 +++++++++- go/vt/vtgate/planbuilder/operators/joins.go | 5 +- .../planbuilder/testdata/reference_cases.json | 25 +++++++ 9 files changed, 147 insertions(+), 71 deletions(-) create mode 100644 go/test/endtoend/vtgate/queries/reference/sschema.sql create mode 100644 go/test/endtoend/vtgate/queries/reference/svschema.json create mode 100644 go/test/endtoend/vtgate/queries/reference/uschema.sql create mode 100644 go/test/endtoend/vtgate/queries/reference/uvschema.json diff --git a/go/test/endtoend/vtgate/queries/reference/main_test.go b/go/test/endtoend/vtgate/queries/reference/main_test.go index 4c9440ca4ff..c350038bf6e 100644 --- a/go/test/endtoend/vtgate/queries/reference/main_test.go +++ b/go/test/endtoend/vtgate/queries/reference/main_test.go @@ -18,6 +18,7 @@ package reference import ( "context" + _ "embed" "flag" "fmt" "os" @@ -39,68 +40,16 @@ var ( vtParams mysql.ConnParams unshardedKeyspaceName = "uks" - unshardedSQLSchema = ` - CREATE TABLE IF NOT EXISTS zip( - id BIGINT NOT NULL AUTO_INCREMENT, - code5 INT(5) NOT NULL, - PRIMARY KEY(id) - ) ENGINE=InnoDB; + //go:embed uschema.sql + unshardedSQLSchema string + //go:embed uvschema.json + unshardedVSchema string - INSERT INTO zip(id, code5) - VALUES (1, 47107), - (2, 82845), - (3, 11237); - - CREATE TABLE IF NOT EXISTS zip_detail( - id BIGINT NOT NULL AUTO_INCREMENT, - zip_id BIGINT NOT NULL, - discontinued_at DATE, - PRIMARY KEY(id) - ) ENGINE=InnoDB; - - ` - unshardedVSchema = ` - { - "sharded":false, - "tables": { - "zip": {}, - "zip_detail": {} - } - } - ` shardedKeyspaceName = "sks" - shardedSQLSchema = ` - CREATE TABLE IF NOT EXISTS delivery_failure ( - id BIGINT NOT NULL, - zip_detail_id BIGINT NOT NULL, - reason VARCHAR(255), - PRIMARY KEY(id) - ) ENGINE=InnoDB; - ` - shardedVSchema = ` - { - "sharded": true, - "vindexes": { - "hash": { - "type": "hash" - } - }, - "tables": { - "delivery_failure": { - "columnVindexes": [ - { - "column": "id", - "name": "hash" - } - ] - }, - "zip_detail": { - "type": "reference", - "source": "` + unshardedKeyspaceName + `.zip_detail" - } - } - } - ` + //go:embed sschema.sql + shardedSQLSchema string + //go:embed svschema.json + shardedVSchema string ) func TestMain(m *testing.M) { diff --git a/go/test/endtoend/vtgate/queries/reference/reference_test.go b/go/test/endtoend/vtgate/queries/reference/reference_test.go index 72e61196e20..4dccc832f8c 100644 --- a/go/test/endtoend/vtgate/queries/reference/reference_test.go +++ b/go/test/endtoend/vtgate/queries/reference/reference_test.go @@ -90,14 +90,14 @@ func TestReferenceRouting(t *testing.T) { t, conn, `SELECT t.id FROM ( - SELECT zd.id, zd.zip_id - FROM `+shardedKeyspaceName+`.zip_detail AS zd - WHERE zd.id IN (2) - ORDER BY zd.discontinued_at - LIMIT 1 - ) AS t - LEFT JOIN `+shardedKeyspaceName+`.zip_detail AS t0 ON t.zip_id = t0.zip_id - ORDER BY t.id`, + SELECT zd.id, zd.zip_id + FROM `+shardedKeyspaceName+`.zip_detail AS zd + WHERE zd.id IN (2) + ORDER BY zd.discontinued_at + LIMIT 1 + ) AS t + LEFT JOIN `+shardedKeyspaceName+`.zip_detail AS t0 ON t.zip_id = t0.zip_id + ORDER BY t.id`, `[[INT64(2)]]`, ) }) @@ -156,3 +156,19 @@ func TestReferenceRouting(t *testing.T) { `[[INT64(2)]]`, ) } + +// TestMultiReferenceQuery tests that a query with multiple references with unsharded keyspace and sharded keyspace works with join. +func TestMultiReferenceQuery(t *testing.T) { + utils.SkipIfBinaryIsBelowVersion(t, 21, "vtgate") + conn, closer := start(t) + defer closer() + + query := + `select 1 + from delivery_failure df1 + join delivery_failure df2 on df1.id = df2.id + join uks.zip_detail zd1 on df1.zip_detail_id = zd1.zip_id + join uks.zip_detail zd2 on zd1.zip_id = zd2.zip_id` + + utils.Exec(t, conn, query) +} diff --git a/go/test/endtoend/vtgate/queries/reference/sschema.sql b/go/test/endtoend/vtgate/queries/reference/sschema.sql new file mode 100644 index 00000000000..0fcaf63a422 --- /dev/null +++ b/go/test/endtoend/vtgate/queries/reference/sschema.sql @@ -0,0 +1,6 @@ +CREATE TABLE IF NOT EXISTS delivery_failure ( + id BIGINT NOT NULL, + zip_detail_id BIGINT NOT NULL, + reason VARCHAR(255), + PRIMARY KEY(id) +) ENGINE=InnoDB; \ No newline at end of file diff --git a/go/test/endtoend/vtgate/queries/reference/svschema.json b/go/test/endtoend/vtgate/queries/reference/svschema.json new file mode 100644 index 00000000000..815e0e8d21c --- /dev/null +++ b/go/test/endtoend/vtgate/queries/reference/svschema.json @@ -0,0 +1,22 @@ +{ + "sharded": true, + "vindexes": { + "hash": { + "type": "hash" + } + }, + "tables": { + "delivery_failure": { + "columnVindexes": [ + { + "column": "id", + "name": "hash" + } + ] + }, + "zip_detail": { + "type": "reference", + "source": "uks.zip_detail" + } + } +} \ No newline at end of file diff --git a/go/test/endtoend/vtgate/queries/reference/uschema.sql b/go/test/endtoend/vtgate/queries/reference/uschema.sql new file mode 100644 index 00000000000..52737928469 --- /dev/null +++ b/go/test/endtoend/vtgate/queries/reference/uschema.sql @@ -0,0 +1,17 @@ +CREATE TABLE IF NOT EXISTS zip( + id BIGINT NOT NULL AUTO_INCREMENT, + code5 INT(5) NOT NULL, + PRIMARY KEY(id) +) ENGINE=InnoDB; + +INSERT INTO zip(id, code5) +VALUES (1, 47107), + (2, 82845), + (3, 11237); + +CREATE TABLE IF NOT EXISTS zip_detail( + id BIGINT NOT NULL AUTO_INCREMENT, + zip_id BIGINT NOT NULL, + discontinued_at DATE, + PRIMARY KEY(id) +) ENGINE=InnoDB; \ No newline at end of file diff --git a/go/test/endtoend/vtgate/queries/reference/uvschema.json b/go/test/endtoend/vtgate/queries/reference/uvschema.json new file mode 100644 index 00000000000..fdcfca0d7a9 --- /dev/null +++ b/go/test/endtoend/vtgate/queries/reference/uvschema.json @@ -0,0 +1,6 @@ +{ + "tables": { + "zip": {}, + "zip_detail": {} + } +} \ No newline at end of file diff --git a/go/vt/vtgate/planbuilder/operators/join_merging.go b/go/vt/vtgate/planbuilder/operators/join_merging.go index f31259fddc5..de002bb7196 100644 --- a/go/vt/vtgate/planbuilder/operators/join_merging.go +++ b/go/vt/vtgate/planbuilder/operators/join_merging.go @@ -23,12 +23,13 @@ import ( "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops" "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" + "vitess.io/vitess/go/vt/vtgate/vindexes" ) // mergeJoinInputs checks whether two operators can be merged into a single one. // If they can be merged, a new operator with the merged routing is returned // If they cannot be merged, nil is returned. -func mergeJoinInputs(ctx *plancontext.PlanningContext, lhs, rhs ops.Operator, joinPredicates []sqlparser.Expr, m merger) (*Route, error) { +func mergeJoinInputs(ctx *plancontext.PlanningContext, lhs, rhs ops.Operator, joinPredicates []sqlparser.Expr, m *joinMerger) (*Route, error) { lhsRoute, rhsRoute, routingA, routingB, a, b, sameKeyspace := prepareInputRoutes(lhs, rhs) if lhsRoute == nil { return nil, nil @@ -41,6 +42,14 @@ func mergeJoinInputs(ctx *plancontext.PlanningContext, lhs, rhs ops.Operator, jo case b == dual: return m.merge(ctx, lhsRoute, rhsRoute, routingA) + // As both are reference route. We need to merge the alternates as well. + case a == anyShard && b == anyShard && sameKeyspace: + newrouting, err := mergeAnyShardRoutings(ctx, routingA.(*AnyShardRouting), routingB.(*AnyShardRouting), joinPredicates, m.innerJoin) + if err != nil { + return nil, err + } + return m.merge(ctx, lhsRoute, rhsRoute, newrouting) + // an unsharded/reference route can be merged with anything going to that keyspace case a == anyShard && sameKeyspace: return m.merge(ctx, lhsRoute, rhsRoute, routingB) @@ -66,6 +75,29 @@ func mergeJoinInputs(ctx *plancontext.PlanningContext, lhs, rhs ops.Operator, jo } } +func mergeAnyShardRoutings(ctx *plancontext.PlanningContext, a, b *AnyShardRouting, joinPredicates []sqlparser.Expr, innerJoin bool) (*AnyShardRouting, error) { + alternates := make(map[*vindexes.Keyspace]*Route) + for ak, av := range a.Alternates { + for bk, bv := range b.Alternates { + // only same keyspace alternates can be merged. + if ak != bk { + continue + } + op, _, err := mergeOrJoin(ctx, av, bv, joinPredicates, innerJoin) + if err != nil { + return nil, err + } + if r, ok := op.(*Route); ok { + alternates[ak] = r + } + } + } + return &AnyShardRouting{ + keyspace: a.keyspace, + Alternates: alternates, + }, nil +} + func prepareInputRoutes(lhs ops.Operator, rhs ops.Operator) (*Route, *Route, Routing, Routing, routingType, routingType, bool) { lhsRoute, rhsRoute := operatorsToRoutes(lhs, rhs) if lhsRoute == nil || rhsRoute == nil { @@ -177,7 +209,7 @@ func getRoutingType(r Routing) routingType { panic(fmt.Sprintf("switch should be exhaustive, got %T", r)) } -func newJoinMerge(predicates []sqlparser.Expr, innerJoin bool) merger { +func newJoinMerge(predicates []sqlparser.Expr, innerJoin bool) *joinMerger { return &joinMerger{ predicates: predicates, innerJoin: innerJoin, diff --git a/go/vt/vtgate/planbuilder/operators/joins.go b/go/vt/vtgate/planbuilder/operators/joins.go index 2d7451b83d4..f9381edae85 100644 --- a/go/vt/vtgate/planbuilder/operators/joins.go +++ b/go/vt/vtgate/planbuilder/operators/joins.go @@ -17,7 +17,10 @@ limitations under the License. package operators import ( + "fmt" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops" "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" "vitess.io/vitess/go/vt/vtgate/semantics" @@ -92,7 +95,7 @@ func AddPredicate( return join, nil } - return nil, nil + return nil, vterrors.VT13001(fmt.Sprintf("pushed wrong predicate to the join: %s", sqlparser.String(expr))) } // we are looking for predicates like `tbl.col = <>` or `<> = tbl.col`, diff --git a/go/vt/vtgate/planbuilder/testdata/reference_cases.json b/go/vt/vtgate/planbuilder/testdata/reference_cases.json index 89cbf2425da..53019a48e12 100644 --- a/go/vt/vtgate/planbuilder/testdata/reference_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/reference_cases.json @@ -922,5 +922,30 @@ "user.user" ] } + }, + { + "comment": "two sharded and two unsharded reference table join - all should be merged into one route", + "query": "select 1 from user u join user_extra ue on u.id = ue.user_id join main.source_of_ref sr on sr.foo = ue.foo join main.rerouted_ref rr on rr.bar = sr.bar", + "plan": { + "QueryType": "SELECT", + "Original": "select 1 from user u join user_extra ue on u.id = ue.user_id join main.source_of_ref sr on sr.foo = ue.foo join main.rerouted_ref rr on rr.bar = sr.bar", + "Instructions": { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select 1 from `user` as u, user_extra as ue, ref_with_source as sr, ref as rr where 1 != 1", + "Query": "select 1 from `user` as u, user_extra as ue, ref_with_source as sr, ref as rr where rr.bar = sr.bar and u.id = ue.user_id and sr.foo = ue.foo", + "Table": "`user`, ref, ref_with_source, user_extra" + }, + "TablesUsed": [ + "user.ref", + "user.ref_with_source", + "user.user", + "user.user_extra" + ] + } } ]