Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

INSERTs having Cross shard foreign keys support #13859

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/fk_verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func (f *FkVerify) Inputs() ([]Primitive, []map[string]any) {
inputName: fmt.Sprintf("VerifyParent-%d", idx+1),
"BvName": parent.BvName,
"Cols": parent.Cols,
"Values": parent.Values,
})
inputs = append(inputs, parent.Exec)
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func gen4DeleteStmtPlanner(
}

if ks, tables := ctx.SemTable.SingleUnshardedKeyspace(); ks != nil {
if fkManagementNotRequired(vschema, tables) {
if fkManagementNotRequiredForDelete(vschema, tables) {
plan := deleteUnshardedShortcut(deleteStmt, ks, tables)
plan = pushCommentDirectivesOnPlan(plan, deleteStmt)
return newPlanResult(plan.Primitive(), operators.QualifiedTables(ks, tables)...), nil
Expand Down Expand Up @@ -94,7 +94,7 @@ func gen4DeleteStmtPlanner(
return newPlanResult(plan.Primitive(), operators.TablesUsed(op)...), nil
}

func fkManagementNotRequired(vschema plancontext.VSchema, vTables []*vindexes.Table) bool {
func fkManagementNotRequiredForDelete(vschema plancontext.VSchema, vTables []*vindexes.Table) bool {
// Find the foreign key mode and check for any managed child foreign keys.
for _, vTable := range vTables {
ksMode, err := vschema.ForeignKeyMode(vTable.Keyspace.Name)
Expand Down
78 changes: 78 additions & 0 deletions go/vt/vtgate/planbuilder/fk_verify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
Copyright 2023 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package planbuilder

import (
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/engine"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
"vitess.io/vitess/go/vt/vtgate/semantics"
)

var _ logicalPlan = (*fkVerify)(nil)

// fkVerify is the logicalPlan for engine.FkVerify.
type fkVerify struct {
input logicalPlan
verify []*engine.FkParent
}

// newFkVerify builds a new fkVerify.
func newFkVerify(input logicalPlan, verify []*engine.FkParent) *fkVerify {
return &fkVerify{
input: input,
verify: verify,
}
}

// Primitive implements the logicalPlan interface
func (fkc *fkVerify) Primitive() engine.Primitive {
return &engine.FkVerify{
Exec: fkc.input.Primitive(),
Verify: fkc.verify,
}
}

// Wireup implements the logicalPlan interface
func (fkc *fkVerify) Wireup(ctx *plancontext.PlanningContext) error {
return fkc.input.Wireup(ctx)
}

// Rewrite implements the logicalPlan interface
func (fkc *fkVerify) Rewrite(inputs ...logicalPlan) error {
if len(inputs) != 1 {
return vterrors.VT13001("fkVerify: wrong number of inputs")
}
fkc.input = inputs[0]
return nil
}

// ContainsTables implements the logicalPlan interface
func (fkc *fkVerify) ContainsTables() semantics.TableSet {
return fkc.input.ContainsTables()
}

// Inputs implements the logicalPlan interface
func (fkc *fkVerify) Inputs() []logicalPlan {
return []logicalPlan{fkc.input}
}

// OutputColumns implements the logicalPlan interface
func (fkc *fkVerify) OutputColumns() []sqlparser.SelectExpr {
return nil
}
37 changes: 36 additions & 1 deletion go/vt/vtgate/planbuilder/operator_transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,52 @@ func transformToLogicalPlan(ctx *plancontext.PlanningContext, op ops.Operator) (
return transformDistinct(ctx, op)
case *operators.FkCascade:
return transformFkCascade(ctx, op)
case *operators.FkVerify:
return transformFkVerify(ctx, op)
}

return nil, vterrors.VT13001(fmt.Sprintf("unknown type encountered: %T (transformToLogicalPlan)", op))
}

func transformFkVerify(ctx *plancontext.PlanningContext, fkv *operators.FkVerify) (logicalPlan, error) {
inputLP, err := transformToLogicalPlan(ctx, fkv.Input)
if err != nil {
return nil, err
}

// Once we have the input logical plan, we can create the primitives for the verification operators.
// For all of these, we don't need the semTable anymore. We set it to nil, to avoid using an incorrect one.
ctx.SemTable = nil

// Go over the children and convert them to Primitives too.
var parents []*engine.FkParent
for _, verify := range fkv.Verify {
verifyLP, err := transformToLogicalPlan(ctx, verify.Op)
if err != nil {
return nil, err
}
err = verifyLP.Wireup(ctx)
if err != nil {
return nil, err
}
verifyEngine := verifyLP.Primitive()
parents = append(parents, &engine.FkParent{
BvName: verify.BvName,
Values: verify.Values,
Cols: verify.Cols,
Exec: verifyEngine,
})
}

return newFkVerify(inputLP, parents), nil
}

// transformFkCascade transforms a FkCascade operator into a logical plan.
func transformFkCascade(ctx *plancontext.PlanningContext, fkc *operators.FkCascade) (logicalPlan, error) {
// We convert the parent operator to a logical plan.
parentLP, err := transformToLogicalPlan(ctx, fkc.Parent)
if err != nil {
return nil, nil
return nil, err
}

// Once we have the parent logical plan, we can create the selection logical plan and the primitives for the children operators.
Expand Down
137 changes: 125 additions & 12 deletions go/vt/vtgate/planbuilder/operators/ast2op.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"strconv"

"vitess.io/vitess/go/mysql/collations"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
Expand Down Expand Up @@ -132,6 +133,130 @@ func createOperatorFromInsert(ctx *plancontext.PlanningContext, ins *sqlparser.I
return nil, err
}

insOp, err := createInsertOperator(ctx, ins, routing, vindexTable)
if err != nil {
return nil, err
}

// Find the foreign key mode and store the ParentFKs that we need to verify.
ksMode, err := ctx.VSchema.ForeignKeyMode(vindexTable.Keyspace.Name)
if err != nil {
return nil, err
}
if ksMode != vschemapb.Keyspace_FK_MANAGED {
return insOp, nil
}
parentFKs := vindexTable.ParentFKsNeedsHandling()
if len(parentFKs) == 0 {
return insOp, nil
}
// TODO(@GuptaManan100, @harshit-gangal): Reject some queries that are unsupported
// return nil, vterrors.VT12002()

insValues, isValues := ins.Rows.(sqlparser.Values)
if !isValues {
// TODO(@GuptaManan100, @harshit-gangal) - Fix error message
return nil, vterrors.VT12001()
}

// For each fk we create a FkParent
var verify []*FkParent
for _, fk := range parentFKs {
// Verification query looks like -
// SELECT distinct <parent cols in foreign keys> from parent where (<parent cols>) in ::fkv_vals
bvName := ctx.ReservedVars.ReserveVariable("fkv_vals")

var valTuple sqlparser.ValTuple
var selExprs sqlparser.SelectExprs
for _, column := range fk.ParentColumns {
valTuple = append(valTuple, sqlparser.NewColName(column.String()))
selExprs = append(selExprs, aeWrap(sqlparser.NewColName(column.String())))
}

selStmt := &sqlparser.Select{
Distinct: true,
SelectExprs: selExprs,
From: []sqlparser.TableExpr{
sqlparser.NewAliasedTableExpr(fk.Table.GetTableName(), ""),
},
Where: &sqlparser.Where{
Type: sqlparser.WhereClause,
Expr: &sqlparser.ComparisonExpr{
Operator: sqlparser.InOp,
Left: valTuple,
Right: sqlparser.NewListArg(bvName),
},
},
}
selOp, err := createOpFromStmt(ctx, selStmt)
if err != nil {
return nil, err
}

var checkCols []engine.CheckCol
for idx, column := range fk.ChildColumns {
checkCol := engine.CheckCol{
Col: idx,
}
found := false
for _, col := range vindexTable.Columns {
if column.Equal(col.Name) {
found = true
checkCol.Type = col.Type
if col.CollationName != "" {
// TODO(@GuptaManan100, @harshit-gangal) - Convert Collation from string to ID.
} else {
checkCol.Collation = collations.CollationBinaryID
}
break
}
}
if !found {
// TODO(@GuptaManan100, @harshit-gangal) - Fix error message
return nil, vterrors.VT12002()
}
checkCols = append(checkCols, checkCol)
}

var colIdxs []int
for _, column := range fk.ChildColumns {
found := false
for idx, col := range ins.Columns {
if column.Equal(col) {
found = true
colIdxs = append(colIdxs, idx)
break
}
}
if !found {
// TODO(@GuptaManan100, @harshit-gangal) - Fix error message
return nil, vterrors.VT12002()
}
}

var vals []sqlparser.Exprs
for _, tuple := range insValues {
var exprs sqlparser.Exprs
for _, idx := range colIdxs {
exprs = append(exprs, tuple[idx])
}
vals = append(vals, exprs)
}
verify = append(verify, &FkParent{
Values: vals,
Cols: checkCols,
BvName: bvName,
Op: selOp,
})
}

return &FkVerify{
Input: insOp,
Verify: verify,
}, nil
}

func createInsertOperator(ctx *plancontext.PlanningContext, ins *sqlparser.Insert, routing Routing, vindexTable *vindexes.Table) (ops.Operator, error) {
if _, target := routing.(*TargetedRouting); target {
return nil, vterrors.VT12001("INSERT with a target destination")
}
Expand All @@ -145,18 +270,6 @@ func createOperatorFromInsert(ctx *plancontext.PlanningContext, ins *sqlparser.I
Routing: routing,
}

// Find the foreign key mode and store the ParentFKs that we need to verify.
ksMode, err := ctx.VSchema.ForeignKeyMode(vindexTable.Keyspace.Name)
if err != nil {
return nil, err
}
if ksMode == vschemapb.Keyspace_FK_MANAGED {
parentFKs := vindexTable.ParentFKsNeedsHandling()
if len(parentFKs) > 0 {
return nil, vterrors.VT12002()
}
}

// Table column list is nil then add all the columns
// If the column list is empty then add only the auto-inc column and
// this happens on calling modifyForAutoinc
Expand Down
Loading