Skip to content

Commit

Permalink
sql: refactor some validation with ieProto.WithTxn() and planner.With…
Browse files Browse the repository at this point in the history
…Txn()

This commit is to provides example to refactor the usages of internal executor
with the new interface. Idealy, if a planner is involved, use the query
functions for `sql.planner`. Otherwise, if the query is to run with a not-nil
txn, we should use ieProto.WithTxn().

Release note: None.
  • Loading branch information
ZhouXing19 committed Jun 15, 2022
1 parent 0961197 commit 287e4ff
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 59 deletions.
114 changes: 97 additions & 17 deletions pkg/sql/alter_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
Expand All @@ -41,6 +42,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/sql/storageparam"
"github.com/cockroachdb/cockroach/pkg/sql/storageparam/tablestorageparam"
Expand Down Expand Up @@ -554,10 +556,7 @@ func (n *alterTableNode) startExec(params runParams) error {
return pgerror.Newf(pgcode.ObjectNotInPrerequisiteState,
"constraint %q in the middle of being added, try again later", t.Constraint)
}
if err := validateCheckInTxn(
params.ctx, &params.p.semaCtx, params.ExecCfg().InternalExecutorFactory,
params.SessionData(), n.tableDesc, params.p.Txn(), ck.Expr,
); err != nil {
if err := params.p.validateCheckInTxn(params.ctx, n.tableDesc, ck.Expr); err != nil {
return err
}
ck.Validity = descpb.ConstraintValidity_Validated
Expand All @@ -577,15 +576,7 @@ func (n *alterTableNode) startExec(params runParams) error {
return pgerror.Newf(pgcode.ObjectNotInPrerequisiteState,
"constraint %q in the middle of being added, try again later", t.Constraint)
}
if err := validateFkInTxn(
params.ctx,
params.ExecCfg().InternalExecutorFactory,
params.p.SessionData(),
n.tableDesc,
params.p.Txn(),
params.p.Descriptors(),
name,
); err != nil {
if err := params.p.validateFkInTxn(params.ctx, n.tableDesc, name); err != nil {
return err
}
foundFk.Validity = descpb.ConstraintValidity_Validated
Expand All @@ -606,10 +597,8 @@ func (n *alterTableNode) startExec(params runParams) error {
return pgerror.Newf(pgcode.ObjectNotInPrerequisiteState,
"constraint %q in the middle of being added, try again later", t.Constraint)
}
if err := validateUniqueWithoutIndexConstraintInTxn(
params.ctx, params.ExecCfg().InternalExecutorFactory(
params.ctx, params.SessionData(),
), n.tableDesc, params.p.Txn(), name,
if err := params.p.validateUniqueWithoutIndexConstraintInTxn(
params.ctx, n.tableDesc, name,
); err != nil {
return err
}
Expand Down Expand Up @@ -2039,3 +2028,94 @@ func (p *planner) tryRemoveFKBackReferences(
tableDesc.InboundFKs = tableDesc.InboundFKs[:sliceIdx]
return nil
}

// validateFkInTxn validates foreign key constraints within the provided
// transaction. The logic is the same as in
// SchemaChanger.validateFkInTxn, but this one is wrapped in a planner context.
func (p *planner) validateFkInTxn(
ctx context.Context,
srcTable *tabledesc.Mutable,
fkName string,
) error {
syntheticDescs, fk, targetTable, err := getTargetTablesAndFk(
ctx,
srcTable,
p.Txn(),
p.Descriptors(),
fkName,
)
if err != nil {
return err
}

p.execCfg.InternalExecutorProto.SyntheticDescs = syntheticDescs
if err := p.WithInternalExecutor(
ctx,
func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
return validateForeignKey(ctx, srcTable, targetTable, fk, ie, txn)
},
); err != nil {
return err
}
return nil
}

// validateCheckInTxn validates check constraints within the provided
// transaction. The logic is the same as in SchemaChanger.validateCheckInTxn,
// but this one is wrapped in a planner context.
func (p *planner) validateCheckInTxn(
ctx context.Context,
tableDesc *tabledesc.Mutable,
checkExpr string,
) error {

var syntheticDescs []catalog.Descriptor
if tableDesc.Version > tableDesc.ClusterVersion().Version {
syntheticDescs = append(syntheticDescs, tableDesc)
}

p.execCfg.InternalExecutorProto.SyntheticDescs = syntheticDescs
if err := p.WithInternalExecutor(
ctx,
func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
return validateCheckExpr(ctx, &p.semaCtx, p.SessionData(), checkExpr, tableDesc, ie, txn)
},
); err != nil {
return err
}
return nil
}

// validateUniqueWithoutIndexConstraintInTxn validates a unique constraint
// within the provided transaction. The logic is the same as in
// SchemaChanger.validateUniqueWithoutIndexConstraintInTxn, but this one is
// wrapped in a planner context.
func (p *planner) validateUniqueWithoutIndexConstraintInTxn(
ctx context.Context,
tableDesc *tabledesc.Mutable,
constraintName string,
) error {
var syntheticDescs []catalog.Descriptor
if tableDesc.Version > tableDesc.ClusterVersion().Version {
syntheticDescs = append(syntheticDescs, tableDesc)
}
var uc *descpb.UniqueWithoutIndexConstraint
for i := range tableDesc.UniqueWithoutIndexConstraints {
def := &tableDesc.UniqueWithoutIndexConstraints[i]
if def.Name == constraintName {
uc = def
break
}
}
if uc == nil {
return errors.AssertionFailedf("unique constraint %s does not exist", constraintName)
}

p.execCfg.InternalExecutorProto.SyntheticDescs = syntheticDescs

return p.WithInternalExecutor(ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
return validateUniqueConstraint(
ctx, tableDesc, uc.Name, uc.ColumnIDs, uc.Predicate, ie, txn, false, /* preExisting */
)
})
}
98 changes: 56 additions & 42 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,22 +743,22 @@ func (sc *SchemaChanger) validateConstraints(
// after the check is validated.
defer func() { collection.ReleaseAll(ctx) }()
if c.IsCheck() {
if err := validateCheckInTxn(
ctx, &semaCtx, sc.ieProto.IeFactory, evalCtx.SessionData(), desc, txn, c.Check().Expr,
if err := sc.validateCheckInTxn(
ctx, &semaCtx, evalCtx.SessionData(), desc, txn, c.Check().Expr,
); err != nil {
return err
}
} else if c.IsForeignKey() {
if err := validateFkInTxn(ctx, sc.ieProto.IeFactory, evalCtx.SessionData(), desc, txn, collection, c.GetName()); err != nil {
if err := sc.validateFkInTxn(ctx, evalCtx.SessionData(), desc, txn, collection, c.GetName()); err != nil {
return err
}
} else if c.IsUniqueWithoutIndex() {
if err := validateUniqueWithoutIndexConstraintInTxn(ctx, sc.ieProto.IeFactory(ctx, evalCtx.SessionData()), desc, txn, c.GetName()); err != nil {
if err := sc.validateUniqueWithoutIndexConstraintInTxn(ctx, evalCtx.SessionData(), desc, txn, c.GetName()); err != nil {
return err
}
} else if c.IsNotNull() {
if err := validateCheckInTxn(
ctx, &semaCtx, sc.ieProto.IeFactory, evalCtx.SessionData(), desc, txn, c.Check().Expr,
if err := sc.validateCheckInTxn(
ctx, &semaCtx, evalCtx.SessionData(), desc, txn, c.Check().Expr,
); err != nil {
// TODO (lucy): This should distinguish between constraint
// validation errors and other types of unexpected errors, and
Expand Down Expand Up @@ -2346,10 +2346,7 @@ func runSchemaChangesInTxn(
if c.IsCheck() || c.IsNotNull() {
check := &c.ConstraintToUpdateDesc().Check
if check.Validity == descpb.ConstraintValidity_Validating {
if err := validateCheckInTxn(
ctx, &planner.semaCtx, planner.ExecCfg().InternalExecutorFactory,
planner.SessionData(), tableDesc, planner.txn, check.Expr,
); err != nil {
if err := planner.validateCheckInTxn(ctx, tableDesc, check.Expr); err != nil {
return err
}
check.Validity = descpb.ConstraintValidity_Validated
Expand All @@ -2371,8 +2368,8 @@ func runSchemaChangesInTxn(
} else if c.IsUniqueWithoutIndex() {
uwi := &c.ConstraintToUpdateDesc().UniqueWithoutIndexConstraint
if uwi.Validity == descpb.ConstraintValidity_Validating {
if err := validateUniqueWithoutIndexConstraintInTxn(
ctx, planner.ExecCfg().InternalExecutor, tableDesc, planner.txn, c.GetName(),
if err := planner.validateUniqueWithoutIndexConstraintInTxn(
ctx, tableDesc, c.GetName(),
); err != nil {
return err
}
Expand Down Expand Up @@ -2442,10 +2439,9 @@ func runSchemaChangesInTxn(
//
// It operates entirely on the current goroutine and is thus able to
// reuse an existing kv.Txn safely.
func validateCheckInTxn(
func (sc *SchemaChanger) validateCheckInTxn(
ctx context.Context,
semaCtx *tree.SemaContext,
ief sqlutil.InternalExecutorFactory,
sessionData *sessiondata.SessionData,
tableDesc *tabledesc.Mutable,
txn *kv.Txn,
Expand All @@ -2455,38 +2451,28 @@ func validateCheckInTxn(
if tableDesc.Version > tableDesc.ClusterVersion().Version {
syntheticDescs = append(syntheticDescs, tableDesc)
}
ie := ief(ctx, sessionData)
return ie.WithSyntheticDescriptors(syntheticDescs, func() error {
sc.ieProto.SetSyntheticDescs(syntheticDescs)
return sc.ieProto.WithTxn(ctx, txn, sessionData, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
return validateCheckExpr(ctx, semaCtx, sessionData, checkExpr, tableDesc, ie, txn)
})
}

// validateFkInTxn validates foreign key constraints within the provided
// transaction. If the provided table descriptor version is newer than the
// cluster version, it will be used in the InternalExecutor that performs the
// validation query.
//
// TODO (lucy): The special case where the table descriptor version is the same
// as the cluster version only happens because the query in VALIDATE CONSTRAINT
// still runs in the user transaction instead of a step in the schema changer.
// When that's no longer true, this function should be updated.
//
// It operates entirely on the current goroutine and is thus able to
// reuse an existing kv.Txn safely.
func validateFkInTxn(
func getTargetTablesAndFk(
ctx context.Context,
ief sqlutil.InternalExecutorFactory,
sd *sessiondata.SessionData,
srcTable *tabledesc.Mutable,
txn *kv.Txn,
descsCol *descs.Collection,
fkName string,
) error {
) (
syntheticDescs []catalog.Descriptor,
fk *descpb.ForeignKeyConstraint,
targetTable catalog.TableDescriptor,
err error,
) {
var syntheticTable catalog.TableDescriptor
if srcTable.Version > srcTable.ClusterVersion().Version {
syntheticTable = srcTable
}
var fk *descpb.ForeignKeyConstraint
for i := range srcTable.OutboundFKs {
def := &srcTable.OutboundFKs[i]
if def.Name == fkName {
Expand All @@ -2495,21 +2481,48 @@ func validateFkInTxn(
}
}
if fk == nil {
return errors.AssertionFailedf("foreign key %s does not exist", fkName)
return nil, nil, nil, errors.AssertionFailedf("foreign key %s does not exist", fkName)
}
targetTable, err := descsCol.Direct().MustGetTableDescByID(ctx, txn, fk.ReferencedTableID)
targetTable, err = descsCol.Direct().MustGetTableDescByID(ctx, txn, fk.ReferencedTableID)
if err != nil {
return err
return nil, nil, nil, err
}
var syntheticDescs []catalog.Descriptor

if syntheticTable != nil {
syntheticDescs = append(syntheticDescs, syntheticTable)
if targetTable.GetID() == syntheticTable.GetID() {
targetTable = syntheticTable
}
}
ie := ief(ctx, sd)
return ie.WithSyntheticDescriptors(syntheticDescs, func() error {
return syntheticDescs, fk, targetTable, nil
}

// validateFkInTxn validates foreign key constraints within the provided
// transaction. If the provided table descriptor version is newer than the
// cluster version, it will be used in the InternalExecutor that performs the
// validation query.
//
// TODO (lucy): The special case where the table descriptor version is the same
// as the cluster version only happens because the query in VALIDATE CONSTRAINT
// still runs in the user transaction instead of a step in the schema changer.
// When that's no longer true, this function should be updated.
//
// It operates entirely on the current goroutine and is thus able to
// reuse an existing kv.Txn safely.
func (sc *SchemaChanger) validateFkInTxn(
ctx context.Context,
sd *sessiondata.SessionData,
srcTable *tabledesc.Mutable,
txn *kv.Txn,
descsCol *descs.Collection,
fkName string,
) error {
syntheticDescs, fk, targetTable, err := getTargetTablesAndFk(ctx, srcTable, txn, descsCol, fkName)
if err != nil {
return err
}
sc.ieProto.SetSyntheticDescs(syntheticDescs)
return sc.ieProto.WithTxn(ctx, txn, sd, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
return validateForeignKey(ctx, srcTable, targetTable, fk, ie, txn)
})
}
Expand All @@ -2526,9 +2539,9 @@ func validateFkInTxn(
//
// It operates entirely on the current goroutine and is thus able to
// reuse an existing kv.Txn safely.
func validateUniqueWithoutIndexConstraintInTxn(
func (sc *SchemaChanger) validateUniqueWithoutIndexConstraintInTxn(
ctx context.Context,
ie sqlutil.InternalExecutor,
sd *sessiondata.SessionData,
tableDesc *tabledesc.Mutable,
txn *kv.Txn,
constraintName string,
Expand All @@ -2537,6 +2550,7 @@ func validateUniqueWithoutIndexConstraintInTxn(
if tableDesc.Version > tableDesc.ClusterVersion().Version {
syntheticDescs = append(syntheticDescs, tableDesc)
}
sc.ieProto.SetSyntheticDescs(syntheticDescs)

var uc *descpb.UniqueWithoutIndexConstraint
for i := range tableDesc.UniqueWithoutIndexConstraints {
Expand All @@ -2550,7 +2564,7 @@ func validateUniqueWithoutIndexConstraintInTxn(
return errors.AssertionFailedf("unique constraint %s does not exist", constraintName)
}

return ie.WithSyntheticDescriptors(syntheticDescs, func() error {
return sc.ieProto.WithTxn(ctx, txn, sd, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
return validateUniqueConstraint(
ctx, tableDesc, uc.Name, uc.ColumnIDs, uc.Predicate, ie, txn, false, /* preExisting */
)
Expand Down

0 comments on commit 287e4ff

Please sign in to comment.