From 068f90bb3bd848f8d9eb48a8a05c3fdbe06c844a Mon Sep 17 00:00:00 2001
From: Jane Xing
Date: Mon, 27 Jun 2022 17:41:20 -0400
Subject: [PATCH] sql: refactor validation with planner.WithInternalExecutor()
 and CollectionFactory.TxnWithExecutor()

This commit provides examples of refactoring the usages of the internal
executor onto the new interfaces.

Ideally, if a planner is involved, use the query functions of `sql.planner`.
Otherwise, if the query has to run with a non-nil txn, we should use
CollectionFactory.TxnWithExecutor().

Release note: None
---
 pkg/sql/alter_table.go      |  37 ++++----
 pkg/sql/backfill.go         | 184 +++++++++++++++++++++++-------------
 pkg/sql/check.go            |  15 ++-
 pkg/sql/index_backfiller.go |   3 +-
 pkg/sql/mvcc_backfiller.go  |   3 +-
 pkg/sql/schema_changer.go   |  24 ++++-
 6 files changed, 170 insertions(+), 96 deletions(-)

diff --git a/pkg/sql/alter_table.go b/pkg/sql/alter_table.go
index a80f7f8bac3d..e4818c363e6f 100644
--- a/pkg/sql/alter_table.go
+++ b/pkg/sql/alter_table.go
@@ -20,6 +20,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/security/username"
 	"github.com/cockroachdb/cockroach/pkg/server/telemetry"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
@@ -40,6 +41,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
+	"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
 	"github.com/cockroachdb/cockroach/pkg/sql/stats"
 	"github.com/cockroachdb/cockroach/pkg/sql/storageparam"
 	"github.com/cockroachdb/cockroach/pkg/sql/storageparam/tablestorageparam"
@@ -554,10 +556,9 @@ func (n *alterTableNode) startExec(params runParams) error {
 				return pgerror.Newf(pgcode.ObjectNotInPrerequisiteState,
 					"constraint %q in the middle of being added, try again later", t.Constraint)
 			}
-			if err := validateCheckInTxn(
-				params.ctx, &params.p.semaCtx, params.ExecCfg().InternalExecutorFactory,
-				params.SessionData(), n.tableDesc, params.p.Txn(), ck.Expr,
-			); err != nil {
+			if err := params.p.WithInternalExecutor(params.ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
+				return validateCheckInTxn(ctx, &params.p.semaCtx, params.p.SessionData(), n.tableDesc, txn, ie, ck.Expr)
+			}); err != nil {
 				return err
 			}
 			ck.Validity = descpb.ConstraintValidity_Validated
@@ -577,19 +578,12 @@ func (n *alterTableNode) startExec(params runParams) error {
 				return pgerror.Newf(pgcode.ObjectNotInPrerequisiteState,
 					"constraint %q in the middle of being added, try again later", t.Constraint)
 			}
-			if err := validateFkInTxn(
-				params.ctx,
-				params.ExecCfg().InternalExecutorFactory,
-				params.p.SessionData(),
-				n.tableDesc,
-				params.p.Txn(),
-				params.p.Descriptors(),
-				name,
-			); err != nil {
+			if err := params.p.WithInternalExecutor(params.ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
+				return validateFkInTxn(ctx, n.tableDesc, txn, ie, params.p.descCollection, name)
+			}); err != nil {
 				return err
 			}
 			foundFk.Validity = descpb.ConstraintValidity_Validated
-
 		case descpb.ConstraintTypeUnique:
 			if constraint.Index == nil {
 				var foundUnique *descpb.UniqueWithoutIndexConstraint
@@ -606,11 +600,16 @@ func (n *alterTableNode) startExec(params runParams) error {
 					return pgerror.Newf(pgcode.ObjectNotInPrerequisiteState,
 						"constraint %q in the middle of being added, try again later", t.Constraint)
 				}
-				if err := validateUniqueWithoutIndexConstraintInTxn(
-					params.ctx,
params.ExecCfg().InternalExecutorFactory.NewInternalExecutor( - params.SessionData(), - ), n.tableDesc, params.p.Txn(), params.p.User(), name, - ); err != nil { + if err := params.p.WithInternalExecutor(params.ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { + return validateUniqueWithoutIndexConstraintInTxn( + params.ctx, + n.tableDesc, + txn, + ie, + params.p.User(), + name, + ) + }); err != nil { return err } foundUnique.Validity = descpb.ConstraintValidity_Validated diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index eef01b049221..45453d71df84 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -137,7 +137,7 @@ func (sc *SchemaChanger) getChunkSize(chunkSize int64) int64 { } // scTxnFn is the type of functions that operates using transactions in the backfiller. -type scTxnFn func(ctx context.Context, txn *kv.Txn, evalCtx *extendedEvalContext) error +type scTxnFn func(ctx context.Context, txn *kv.Txn, evalCtx *extendedEvalContext, ie sqlutil.InternalExecutor) error // historicalTxnRunner is the type of the callback used by the various // helper functions to run checks at a fixed timestamp (logically, at @@ -147,12 +147,16 @@ type historicalTxnRunner func(ctx context.Context, fn scTxnFn) error // makeFixedTimestampRunner creates a historicalTxnRunner suitable for use by the helpers. func (sc *SchemaChanger) makeFixedTimestampRunner(readAsOf hlc.Timestamp) historicalTxnRunner { runner := func(ctx context.Context, retryable scTxnFn) error { - return sc.fixedTimestampTxn(ctx, readAsOf, func( - ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, + return sc.fixedTimestampTxnWithExecutor(ctx, readAsOf, func( + ctx context.Context, + txn *kv.Txn, + sd *sessiondata.SessionData, + descriptors *descs.Collection, + ie sqlutil.InternalExecutor, ) error { // We need to re-create the evalCtx since the txn may retry. - evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, readAsOf, descriptors) - return retryable(ctx, txn, &evalCtx) + evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, sd, readAsOf, descriptors) + return retryable(ctx, txn, &evalCtx, ie) }) } return runner @@ -187,6 +191,26 @@ func (sc *SchemaChanger) fixedTimestampTxn( }) } +func (sc *SchemaChanger) fixedTimestampTxnWithExecutor( + ctx context.Context, + readAsOf hlc.Timestamp, + retryable func( + ctx context.Context, + txn *kv.Txn, + sd *sessiondata.SessionData, + descriptors *descs.Collection, + ie sqlutil.InternalExecutor, + ) error, +) error { + sd := NewFakeSessionData(sc.execCfg.SV()) + return sc.txnWithExecutor(ctx, sd, func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ie sqlutil.InternalExecutor) error { + if err := txn.SetFixedTimestamp(ctx, readAsOf); err != nil { + return err + } + return retryable(ctx, txn, sd, descriptors, ie) + }) +} + // runBackfill runs the backfill for the schema changer. // // This operates over multiple goroutines concurrently and is thus not @@ -739,7 +763,7 @@ func (sc *SchemaChanger) validateConstraints( } desc := descI.(*tabledesc.Mutable) // Each check operates at the historical timestamp. 
- return runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, evalCtx *extendedEvalContext) error { + return runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, evalCtx *extendedEvalContext, ie sqlutil.InternalExecutor) error { // If the constraint is a check constraint that fails validation, we // need a semaContext set up that can resolve types in order to pretty // print the check expression back to the user. @@ -754,23 +778,21 @@ func (sc *SchemaChanger) validateConstraints( defer func() { collection.ReleaseAll(ctx) }() if c.IsCheck() { if err := validateCheckInTxn( - ctx, &semaCtx, sc.ieFactory, evalCtx.SessionData(), desc, txn, c.Check().Expr, + ctx, &semaCtx, evalCtx.SessionData(), desc, txn, ie, c.Check().Expr, ); err != nil { return err } } else if c.IsForeignKey() { - if err := validateFkInTxn(ctx, sc.ieFactory, evalCtx.SessionData(), desc, txn, collection, c.GetName()); err != nil { + if err := validateFkInTxn(ctx, desc, txn, ie, collection, c.GetName()); err != nil { return err } } else if c.IsUniqueWithoutIndex() { - if err := validateUniqueWithoutIndexConstraintInTxn( - ctx, sc.ieFactory.NewInternalExecutor(evalCtx.SessionData()), desc, txn, evalCtx.SessionData().User(), c.GetName(), - ); err != nil { + if err := validateUniqueWithoutIndexConstraintInTxn(ctx, desc, txn, ie, evalCtx.SessionData().User(), c.GetName()); err != nil { return err } } else if c.IsNotNull() { if err := validateCheckInTxn( - ctx, &semaCtx, sc.ieFactory, evalCtx.SessionData(), desc, txn, c.Check().Expr, + ctx, &semaCtx, evalCtx.SessionData(), desc, txn, ie, c.Check().Expr, ); err != nil { // TODO (lucy): This should distinguish between constraint // validation errors and other types of unexpected errors, and @@ -999,7 +1021,8 @@ func (sc *SchemaChanger) distIndexBackfill( if err != nil { return err } - evalCtx = createSchemaChangeEvalCtx(ctx, sc.execCfg, txn.ReadTimestamp(), descriptors) + sd := NewFakeSessionData(sc.execCfg.SV()) + evalCtx = createSchemaChangeEvalCtx(ctx, sc.execCfg, sd, txn.ReadTimestamp(), descriptors) planCtx = sc.distSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil, /* planner */ txn, DistributionTypeSystemTenantOnly) indexBatchSize := indexBackfillBatchSize.Get(&sc.execCfg.Settings.SV) @@ -1295,7 +1318,8 @@ func (sc *SchemaChanger) distColumnBackfill( return nil } cbw := MetadataCallbackWriter{rowResultWriter: &errOnlyResultWriter{}, fn: metaFn} - evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, txn.ReadTimestamp(), descriptors) + sd := NewFakeSessionData(sc.execCfg.SV()) + evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, sd, txn.ReadTimestamp(), descriptors) recv := MakeDistSQLReceiver( ctx, &cbw, @@ -2381,10 +2405,9 @@ func runSchemaChangesInTxn( if c.IsCheck() || c.IsNotNull() { check := &c.ConstraintToUpdateDesc().Check if check.Validity == descpb.ConstraintValidity_Validating { - if err := validateCheckInTxn( - ctx, &planner.semaCtx, planner.ExecCfg().InternalExecutorFactory, - planner.SessionData(), tableDesc, planner.txn, check.Expr, - ); err != nil { + if err := planner.WithInternalExecutor(ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { + return validateCheckInTxn(ctx, &planner.semaCtx, planner.SessionData(), tableDesc, txn, ie, check.Expr) + }); err != nil { return err } check.Validity = descpb.ConstraintValidity_Validated @@ -2406,9 +2429,17 @@ func runSchemaChangesInTxn( } else if c.IsUniqueWithoutIndex() { uwi := &c.ConstraintToUpdateDesc().UniqueWithoutIndexConstraint if uwi.Validity == 
descpb.ConstraintValidity_Validating { - if err := validateUniqueWithoutIndexConstraintInTxn( - ctx, planner.ExecCfg().InternalExecutor, tableDesc, planner.txn, planner.User(), c.GetName(), - ); err != nil { + + if err := planner.WithInternalExecutor(ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { + return validateUniqueWithoutIndexConstraintInTxn( + ctx, + tableDesc, + txn, + ie, + planner.User(), + c.GetName(), + ) + }); err != nil { return err } uwi.Validity = descpb.ConstraintValidity_Validated @@ -2480,48 +2511,40 @@ func runSchemaChangesInTxn( func validateCheckInTxn( ctx context.Context, semaCtx *tree.SemaContext, - ief sqlutil.InternalExecutorFactory, sessionData *sessiondata.SessionData, tableDesc *tabledesc.Mutable, txn *kv.Txn, + ie sqlutil.InternalExecutor, checkExpr string, ) error { var syntheticDescs []catalog.Descriptor if tableDesc.Version > tableDesc.ClusterVersion().Version { syntheticDescs = append(syntheticDescs, tableDesc) } - ie := ief.NewInternalExecutor(sessionData) - return ie.WithSyntheticDescriptors(syntheticDescs, func() error { - return validateCheckExpr(ctx, semaCtx, sessionData, checkExpr, tableDesc, ie, txn) - }) + + return ie.WithSyntheticDescriptors( + syntheticDescs, + func() error { + return validateCheckExpr(ctx, semaCtx, txn, sessionData, checkExpr, tableDesc, ie) + }) } -// validateFkInTxn validates foreign key constraints within the provided -// transaction. If the provided table descriptor version is newer than the -// cluster version, it will be used in the InternalExecutor that performs the -// validation query. -// -// TODO (lucy): The special case where the table descriptor version is the same -// as the cluster version only happens because the query in VALIDATE CONSTRAINT -// still runs in the user transaction instead of a step in the schema changer. -// When that's no longer true, this function should be updated. -// -// It operates entirely on the current goroutine and is thus able to -// reuse an existing kv.Txn safely. 
-func validateFkInTxn( +func getTargetTablesAndFk( ctx context.Context, - ief sqlutil.InternalExecutorFactory, - sd *sessiondata.SessionData, srcTable *tabledesc.Mutable, txn *kv.Txn, descsCol *descs.Collection, fkName string, -) error { +) ( + syntheticDescs []catalog.Descriptor, + fk *descpb.ForeignKeyConstraint, + targetTable catalog.TableDescriptor, + err error, +) { var syntheticTable catalog.TableDescriptor if srcTable.Version > srcTable.ClusterVersion().Version { syntheticTable = srcTable } - var fk *descpb.ForeignKeyConstraint for i := range srcTable.OutboundFKs { def := &srcTable.OutboundFKs[i] if def.Name == fkName { @@ -2530,23 +2553,51 @@ func validateFkInTxn( } } if fk == nil { - return errors.AssertionFailedf("foreign key %s does not exist", fkName) + return nil, nil, nil, errors.AssertionFailedf("foreign key %s does not exist", fkName) } - targetTable, err := descsCol.Direct().MustGetTableDescByID(ctx, txn, fk.ReferencedTableID) + targetTable, err = descsCol.Direct().MustGetTableDescByID(ctx, txn, fk.ReferencedTableID) if err != nil { - return err + return nil, nil, nil, err } - var syntheticDescs []catalog.Descriptor if syntheticTable != nil { syntheticDescs = append(syntheticDescs, syntheticTable) if targetTable.GetID() == syntheticTable.GetID() { targetTable = syntheticTable } } - ie := ief.NewInternalExecutor(sd) - return ie.WithSyntheticDescriptors(syntheticDescs, func() error { - return validateForeignKey(ctx, srcTable, targetTable, fk, ie, txn) - }) + return syntheticDescs, fk, targetTable, nil +} + +// validateFkInTxn validates foreign key constraints within the provided +// transaction. If the provided table descriptor version is newer than the +// cluster version, it will be used in the InternalExecutor that performs the +// validation query. +// +// TODO (lucy): The special case where the table descriptor version is the same +// as the cluster version only happens because the query in VALIDATE CONSTRAINT +// still runs in the user transaction instead of a step in the schema changer. +// When that's no longer true, this function should be updated. +// +// It operates entirely on the current goroutine and is thus able to +// reuse an existing kv.Txn safely. +func validateFkInTxn( + ctx context.Context, + srcTable *tabledesc.Mutable, + txn *kv.Txn, + ie sqlutil.InternalExecutor, + descsCol *descs.Collection, + fkName string, +) error { + syntheticDescs, fk, targetTable, err := getTargetTablesAndFk(ctx, srcTable, txn, descsCol, fkName) + if err != nil { + return err + } + + return ie.WithSyntheticDescriptors( + syntheticDescs, + func() error { + return validateForeignKey(ctx, srcTable, targetTable, fk, ie, txn) + }) } // validateUniqueWithoutIndexConstraintInTxn validates a unique constraint @@ -2563,9 +2614,9 @@ func validateFkInTxn( // reuse an existing kv.Txn safely. 
func validateUniqueWithoutIndexConstraintInTxn( ctx context.Context, - ie sqlutil.InternalExecutor, tableDesc *tabledesc.Mutable, txn *kv.Txn, + ie sqlutil.InternalExecutor, user username.SQLUsername, constraintName string, ) error { @@ -2573,7 +2624,6 @@ func validateUniqueWithoutIndexConstraintInTxn( if tableDesc.Version > tableDesc.ClusterVersion().Version { syntheticDescs = append(syntheticDescs, tableDesc) } - var uc *descpb.UniqueWithoutIndexConstraint for i := range tableDesc.UniqueWithoutIndexConstraints { def := &tableDesc.UniqueWithoutIndexConstraints[i] @@ -2586,19 +2636,21 @@ func validateUniqueWithoutIndexConstraintInTxn( return errors.AssertionFailedf("unique constraint %s does not exist", constraintName) } - return ie.WithSyntheticDescriptors(syntheticDescs, func() error { - return validateUniqueConstraint( - ctx, - tableDesc, - uc.Name, - uc.ColumnIDs, - uc.Predicate, - ie, - txn, - user, - false, /* preExisting */ - ) - }) + return ie.WithSyntheticDescriptors( + syntheticDescs, + func() error { + return validateUniqueConstraint( + ctx, + tableDesc, + uc.Name, + uc.ColumnIDs, + uc.Predicate, + ie, + txn, + user, + false, /* preExisting */ + ) + }) } // columnBackfillInTxn backfills columns for all mutation columns in diff --git a/pkg/sql/check.go b/pkg/sql/check.go index 5819ac19405c..9bbdf49d3f28 100644 --- a/pkg/sql/check.go +++ b/pkg/sql/check.go @@ -43,11 +43,11 @@ import ( func validateCheckExpr( ctx context.Context, semaCtx *tree.SemaContext, + txn *kv.Txn, sessionData *sessiondata.SessionData, exprStr string, tableDesc *tabledesc.Mutable, ie sqlutil.InternalExecutor, - txn *kv.Txn, ) error { expr, err := schemaexpr.FormatExprForDisplay(ctx, tableDesc, exprStr, semaCtx, sessionData, tree.FmtParsable) if err != nil { @@ -57,8 +57,14 @@ func validateCheckExpr( columns := tree.AsStringWithFlags(&colSelectors, tree.FmtSerializable) queryStr := fmt.Sprintf(`SELECT %s FROM [%d AS t] WHERE NOT (%s) LIMIT 1`, columns, tableDesc.GetID(), exprStr) log.Infof(ctx, "validating check constraint %q with query %q", expr, queryStr) - - rows, err := ie.QueryRow(ctx, "validate check constraint", txn, queryStr) + rows, err := ie.QueryRowEx( + ctx, + "validate check constraint", + txn, + sessiondata.InternalExecutorOverride{ + User: username.RootUserName(), + }, + queryStr) if err != nil { return err } @@ -269,7 +275,8 @@ func validateForeignKey( ) values, err := ie.QueryRowEx(ctx, "validate foreign key constraint", - txn, sessiondata.NodeUserSessionDataOverride, query) + txn, + sessiondata.NodeUserSessionDataOverride, query) if err != nil { return err } diff --git a/pkg/sql/index_backfiller.go b/pkg/sql/index_backfiller.go index 730d8f487b50..fa76ad9ca084 100644 --- a/pkg/sql/index_backfiller.go +++ b/pkg/sql/index_backfiller.go @@ -166,7 +166,8 @@ func (ib *IndexBackfillPlanner) plan( if err := DescsTxn(ctx, ib.execCfg, func( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ) error { - evalCtx = createSchemaChangeEvalCtx(ctx, ib.execCfg, nowTimestamp, descriptors) + sd := NewFakeSessionData(ib.execCfg.SV()) + evalCtx = createSchemaChangeEvalCtx(ctx, ib.execCfg, sd, nowTimestamp, descriptors) planCtx = ib.execCfg.DistSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn, DistributionTypeSystemTenantOnly) // TODO(ajwerner): Adopt util.ConstantWithMetamorphicTestRange for the diff --git a/pkg/sql/mvcc_backfiller.go b/pkg/sql/mvcc_backfiller.go index ed4baab5d3dd..547ee800b890 100644 --- a/pkg/sql/mvcc_backfiller.go +++ b/pkg/sql/mvcc_backfiller.go @@ -126,7 +126,8 
@@ func (im *IndexBackfillerMergePlanner) plan(
 	if err := DescsTxn(ctx, im.execCfg, func(
 		ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
 	) error {
-		evalCtx = createSchemaChangeEvalCtx(ctx, im.execCfg, txn.ReadTimestamp(), descriptors)
+		sd := NewFakeSessionData(im.execCfg.SV())
+		evalCtx = createSchemaChangeEvalCtx(ctx, im.execCfg, sd, txn.ReadTimestamp(), descriptors)
 		planCtx = im.execCfg.DistSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn,
 			DistributionTypeSystemTenantOnly)

diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go
index 1fa7053e925a..5815d5df7400 100644
--- a/pkg/sql/schema_changer.go
+++ b/pkg/sql/schema_changer.go
@@ -2447,10 +2447,23 @@ func (sc *SchemaChanger) txn(
 			return err
 		}
 	}
-
 	return sc.execCfg.CollectionFactory.Txn(ctx, sc.execCfg.InternalExecutor, sc.db, f)
 }

+// txnWithExecutor runs f within a txn and provides it with an internal executor.
+func (sc *SchemaChanger) txnWithExecutor(
+	ctx context.Context,
+	sd *sessiondata.SessionData,
+	f func(context.Context, *kv.Txn, *descs.Collection, sqlutil.InternalExecutor) error,
+) error {
+	if fn := sc.testingKnobs.RunBeforeDescTxn; fn != nil {
+		if err := fn(sc.job.ID()); err != nil {
+			return err
+		}
+	}
+	return sc.execCfg.CollectionFactory.TxnWithExecutor(ctx, sc.db, sd, f)
+}
+
 // createSchemaChangeEvalCtx creates an extendedEvalContext() to be used for backfills.
 //
 // TODO(andrei): This EvalContext() will be broken for backfills trying to use
@@ -2459,11 +2472,12 @@ func (sc *SchemaChanger) txn(
 // used in the surrounding SQL session, so session tracing is unable
 // to capture schema change activity.
 func createSchemaChangeEvalCtx(
-	ctx context.Context, execCfg *ExecutorConfig, ts hlc.Timestamp, descriptors *descs.Collection,
+	ctx context.Context,
+	execCfg *ExecutorConfig,
+	sd *sessiondata.SessionData,
+	ts hlc.Timestamp,
+	descriptors *descs.Collection,
 ) extendedEvalContext {
-
-	sd := NewFakeSessionData(execCfg.SV())
-
 	evalCtx := extendedEvalContext{
 		// Make a session tracing object on-the-fly. This is OK
 		// because it sets "enabled: false" and thus none of the
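For reviewers, a minimal sketch of the two call shapes this refactor converges on, based only on the signatures visible in this patch (planner.WithInternalExecutor and CollectionFactory.TxnWithExecutor). The example function names and the explicit db/session-data parameters below are illustrative, not part of the change:

    // Sketch only; would live alongside the other helpers in pkg/sql and
    // assumes the signatures introduced in this commit.
    package sql

    import (
    	"context"

    	"github.com/cockroachdb/cockroach/pkg/kv"
    	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
    	"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
    	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
    	"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
    )

    // exampleValidateCheckWithPlanner: when a planner is in scope, reuse its
    // txn and a planner-bound internal executor via WithInternalExecutor
    // instead of constructing an executor from the factory.
    func exampleValidateCheckWithPlanner(
    	ctx context.Context, p *planner, desc *tabledesc.Mutable, checkExpr string,
    ) error {
    	return p.WithInternalExecutor(ctx, func(
    		ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor,
    	) error {
    		return validateCheckInTxn(ctx, &p.semaCtx, p.SessionData(), desc, txn, ie, checkExpr)
    	})
    }

    // exampleRunWithTxnAndExecutor: when there is no planner but the work must
    // run inside a txn, let the collection factory supply the txn, the
    // descriptor collection, and an internal executor bound to the given
    // session data.
    func exampleRunWithTxnAndExecutor(
    	ctx context.Context,
    	execCfg *ExecutorConfig,
    	db *kv.DB,
    	sd *sessiondata.SessionData,
    	run func(context.Context, *kv.Txn, *descs.Collection, sqlutil.InternalExecutor) error,
    ) error {
    	return execCfg.CollectionFactory.TxnWithExecutor(ctx, db, sd, run)
    }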