diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 8aebb9576135..256ca0cd3af3 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -1917,8 +1917,8 @@ func revalidateIndexes( // We don't actually need the 'historical' read the way the schema change does // since our table is offline. runner := descs.NewHistoricalInternalExecTxnRunner(hlc.Timestamp{}, func(ctx context.Context, fn descs.InternalExecFn) error { - return execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - return fn(ctx, txn, descs.FromTxn(txn)) + return execCfg.InternalDB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error { + return fn(ctx, txn) }) }) diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index a8f47562e251..2f54afd45587 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -111,8 +111,8 @@ type Registry struct { adoptionCh chan adoptionNotice sqlInstance sqlliveness.Instance - // sessionBoundInternalExecutorFactory provides a way for jobs to create - // internal executors. This is rarely needed, and usually job resumers should + // internalDB provides a way for jobs to create internal executors. + // This is rarely needed, and usually job resumers should // use the internal executor from the JobExecCtx. The intended user of this // interface is the schema change job resumer, which needs to set the // tableCollectionModifier on the internal executor to different values in diff --git a/pkg/sql/alter_table.go b/pkg/sql/alter_table.go index 13287ece04da..465f6b4219dd 100644 --- a/pkg/sql/alter_table.go +++ b/pkg/sql/alter_table.go @@ -545,7 +545,7 @@ func (n *alterTableNode) startExec(params runParams) error { ck.CheckDesc().Validity = descpb.ConstraintValidity_Validated } else if fk := c.AsForeignKey(); fk != nil { if err := validateFkInTxn( - params.ctx, params.p.InternalSQLTxn(), params.p.descCollection, n.tableDesc, name, + params.ctx, params.p.InternalSQLTxn(), n.tableDesc, name, ); err != nil { return err } diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index 3edce37697c3..8f1e4fa0de10 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -140,7 +140,7 @@ func (sc *SchemaChanger) getChunkSize(chunkSize int64) int64 { } // scTxnFn is the type of functions that operates using transactions in the backfiller. -type scTxnFn func(ctx context.Context, txn isql.Txn, evalCtx *extendedEvalContext) error +type scTxnFn func(ctx context.Context, txn descs.Txn, evalCtx *extendedEvalContext) error // historicalTxnRunner is the type of the callback used by the various // helper functions to run checks at a fixed timestamp (logically, at @@ -152,12 +152,10 @@ func (sc *SchemaChanger) makeFixedTimestampRunner(readAsOf hlc.Timestamp) histor runner := func(ctx context.Context, retryable scTxnFn) error { return sc.fixedTimestampTxnWithExecutor(ctx, readAsOf, func( ctx context.Context, - txn isql.Txn, - sd *sessiondata.SessionData, - descriptors *descs.Collection, + txn descs.Txn, ) error { // We need to re-create the evalCtx since the txn may retry. - evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, sd, readAsOf, descriptors) + evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, txn.SessionData(), readAsOf, txn.Descriptors()) return retryable(ctx, txn, &evalCtx) }) } @@ -173,11 +171,9 @@ func (sc *SchemaChanger) makeFixedTimestampInternalExecRunner( ) error { return sc.fixedTimestampTxnWithExecutor(ctx, readAsOf, func( ctx context.Context, - txn isql.Txn, - _ *sessiondata.SessionData, - descriptors *descs.Collection, + txn descs.Txn, ) error { - return retryable(ctx, txn, descriptors) + return retryable(ctx, txn) }) }) } @@ -185,13 +181,12 @@ func (sc *SchemaChanger) makeFixedTimestampInternalExecRunner( func (sc *SchemaChanger) fixedTimestampTxn( ctx context.Context, readAsOf hlc.Timestamp, - retryable func(ctx context.Context, txn isql.Txn, descriptors *descs.Collection) error, + retryable func(ctx context.Context, txn descs.Txn) error, ) error { return sc.fixedTimestampTxnWithExecutor(ctx, readAsOf, func( - ctx context.Context, txn isql.Txn, _ *sessiondata.SessionData, - descriptors *descs.Collection, + ctx context.Context, txn descs.Txn, ) error { - return retryable(ctx, txn, descriptors) + return retryable(ctx, txn) }) } @@ -200,18 +195,16 @@ func (sc *SchemaChanger) fixedTimestampTxnWithExecutor( readAsOf hlc.Timestamp, retryable func( ctx context.Context, - txn isql.Txn, - sd *sessiondata.SessionData, - descriptors *descs.Collection, + txn descs.Txn, ) error, ) error { return sc.txn(ctx, func( - ctx context.Context, txn isql.Txn, descriptors *descs.Collection, + ctx context.Context, txn descs.Txn, ) error { if err := txn.KV().SetFixedTimestamp(ctx, readAsOf); err != nil { return err } - return retryable(ctx, txn, txn.SessionData(), descriptors) + return retryable(ctx, txn) }) } @@ -446,8 +439,8 @@ func (sc *SchemaChanger) dropConstraints( } // Create update closure for the table and all other tables with backreferences. - if err := sc.txn(ctx, func(ctx context.Context, txn isql.Txn, descsCol *descs.Collection) error { - scTable, err := descsCol.MutableByID(txn.KV()).Table(ctx, sc.descID) + if err := sc.txn(ctx, func(ctx context.Context, txn descs.Txn) error { + scTable, err := txn.Descriptors().MutableByID(txn.KV()).Table(ctx, sc.descID) if err != nil { return err } @@ -477,7 +470,7 @@ func (sc *SchemaChanger) dropConstraints( if def.Name != constraint.GetName() { continue } - backrefTable, err := descsCol.MutableByID(txn.KV()).Table(ctx, fk.GetReferencedTableID()) + backrefTable, err := txn.Descriptors().MutableByID(txn.KV()).Table(ctx, fk.GetReferencedTableID()) if err != nil { return err } @@ -486,7 +479,7 @@ func (sc *SchemaChanger) dropConstraints( ); err != nil { return err } - if err := descsCol.WriteDescToBatch( + if err := txn.Descriptors().WriteDescToBatch( ctx, true /* kvTrace */, backrefTable, b, ); err != nil { return err @@ -525,7 +518,7 @@ func (sc *SchemaChanger) dropConstraints( } } } - if err := descsCol.WriteDescToBatch( + if err := txn.Descriptors().WriteDescToBatch( ctx, true /* kvTrace */, scTable, b, ); err != nil { return err @@ -538,13 +531,13 @@ func (sc *SchemaChanger) dropConstraints( log.Info(ctx, "finished dropping constraints") tableDescs := make(map[descpb.ID]catalog.TableDescriptor, len(fksByBackrefTable)+1) if err := sc.txn(ctx, func( - ctx context.Context, txn isql.Txn, descsCol *descs.Collection, + ctx context.Context, txn descs.Txn, ) (err error) { - if tableDescs[sc.descID], err = descsCol.ByIDWithLeased(txn.KV()).WithoutNonPublic().Get().Table(ctx, sc.descID); err != nil { + if tableDescs[sc.descID], err = txn.Descriptors().ByIDWithLeased(txn.KV()).WithoutNonPublic().Get().Table(ctx, sc.descID); err != nil { return err } for id := range fksByBackrefTable { - desc, err := descsCol.ByIDWithLeased(txn.KV()).WithoutOffline().Get().Table(ctx, id) + desc, err := txn.Descriptors().ByIDWithLeased(txn.KV()).WithoutOffline().Get().Table(ctx, id) if err != nil { return err } @@ -581,9 +574,9 @@ func (sc *SchemaChanger) addConstraints( // Create update closure for the table and all other tables with backreferences if err := sc.txn(ctx, func( - ctx context.Context, txn isql.Txn, descsCol *descs.Collection, + ctx context.Context, txn descs.Txn, ) error { - scTable, err := descsCol.MutableByID(txn.KV()).Table(ctx, sc.descID) + scTable, err := txn.Descriptors().MutableByID(txn.KV()).Table(ctx, sc.descID) if err != nil { return err } @@ -632,7 +625,7 @@ func (sc *SchemaChanger) addConstraints( } if !foundExisting { scTable.OutboundFKs = append(scTable.OutboundFKs, *fk.ForeignKeyDesc()) - backrefTable, err := descsCol.MutableByID(txn.KV()).Table(ctx, fk.GetReferencedTableID()) + backrefTable, err := txn.Descriptors().MutableByID(txn.KV()).Table(ctx, fk.GetReferencedTableID()) if err != nil { return err } @@ -656,7 +649,7 @@ func (sc *SchemaChanger) addConstraints( // the last put will win but it's perhaps not ideal. We could add // code to deduplicate but it doesn't seem worth the hassle. if backrefTable != scTable { - if err := descsCol.WriteDescToBatch( + if err := txn.Descriptors().WriteDescToBatch( ctx, true /* kvTrace */, backrefTable, b, ); err != nil { return err @@ -686,7 +679,7 @@ func (sc *SchemaChanger) addConstraints( } } } - if err := descsCol.WriteDescToBatch( + if err := txn.Descriptors().WriteDescToBatch( ctx, true /* kvTrace */, scTable, b, ); err != nil { return err @@ -727,9 +720,9 @@ func (sc *SchemaChanger) validateConstraints( var tableDesc catalog.TableDescriptor if err := sc.fixedTimestampTxn(ctx, readAsOf, func( - ctx context.Context, txn isql.Txn, descriptors *descs.Collection, + ctx context.Context, txn descs.Txn, ) error { - tableDesc, err = descriptors.ByID(txn.KV()).WithoutNonPublic().Get().Table(ctx, sc.descID) + tableDesc, err = txn.Descriptors().ByID(txn.KV()).WithoutNonPublic().Get().Table(ctx, sc.descID) return err }); err != nil { return err @@ -756,7 +749,7 @@ func (sc *SchemaChanger) validateConstraints( } desc := descI.(*tabledesc.Mutable) // Each check operates at the historical timestamp. - return runHistoricalTxn(ctx, func(ctx context.Context, txn isql.Txn, evalCtx *extendedEvalContext) error { + return runHistoricalTxn(ctx, func(ctx context.Context, txn descs.Txn, evalCtx *extendedEvalContext) error { // If the constraint is a check constraint that fails validation, we // need a semaContext set up that can resolve types in order to pretty // print the check expression back to the user. @@ -782,7 +775,7 @@ func (sc *SchemaChanger) validateConstraints( return err } } else if c.AsForeignKey() != nil { - if err := validateFkInTxn(ctx, txn, collection, desc, c.GetName()); err != nil { + if err := validateFkInTxn(ctx, txn, desc, c.GetName()); err != nil { return err } } else if c.AsUniqueWithoutIndex() != nil { @@ -922,10 +915,10 @@ func (sc *SchemaChanger) distIndexBackfill( var mutationIdx int if err := sc.txn(ctx, func( - ctx context.Context, txn isql.Txn, col *descs.Collection, + ctx context.Context, txn descs.Txn, ) (err error) { todoSpans, _, mutationIdx, err = rowexec.GetResumeSpans( - ctx, sc.jobRegistry, txn, sc.execCfg.Codec, col, sc.descID, + ctx, sc.jobRegistry, txn, sc.execCfg.Codec, txn.Descriptors(), sc.descID, sc.mutationID, filter, ) return err @@ -960,7 +953,7 @@ func (sc *SchemaChanger) distIndexBackfill( const pageSize = 10000 noop := func(_ []kv.KeyValue) error { return nil } if err := sc.fixedTimestampTxn(ctx, writeAsOf, func( - ctx context.Context, txn isql.Txn, _ *descs.Collection, + ctx context.Context, txn descs.Txn, ) error { for _, span := range targetSpans { // TODO(dt): a Count() request would be nice here if the target isn't @@ -976,7 +969,7 @@ func (sc *SchemaChanger) distIndexBackfill( } log.Infof(ctx, "persisting target safe write time %v...", writeAsOf) if err := sc.txn(ctx, func( - ctx context.Context, txn isql.Txn, _ *descs.Collection, + ctx context.Context, txn descs.Txn, ) error { details := sc.job.Details().(jobspb.SchemaChangeDetails) details.WriteTimestamp = writeAsOf @@ -1003,7 +996,7 @@ func (sc *SchemaChanger) distIndexBackfill( // The txn is used to fetch a tableDesc, partition the spans and set the // evalCtx ts all of which is during planning of the DistSQL flow. if err := sc.txn(ctx, func( - ctx context.Context, txn isql.Txn, descriptors *descs.Collection, + ctx context.Context, txn descs.Txn, ) error { // It is okay to release the lease on the descriptor before running the @@ -1017,12 +1010,12 @@ func (sc *SchemaChanger) distIndexBackfill( // clear what this buys us in terms of checking the descriptors validity. // Thus, in favor of simpler code and no correctness concerns we release // the lease once the flow is planned. - tableDesc, err := sc.getTableVersion(ctx, txn.KV(), descriptors, version) + tableDesc, err := sc.getTableVersion(ctx, txn.KV(), txn.Descriptors(), version) if err != nil { return err } sd := NewFakeSessionData(sc.execCfg.SV()) - evalCtx = createSchemaChangeEvalCtx(ctx, sc.execCfg, sd, txn.KV().ReadTimestamp(), descriptors) + evalCtx = createSchemaChangeEvalCtx(ctx, sc.execCfg, sd, txn.KV().ReadTimestamp(), txn.Descriptors()) planCtx = sc.distSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil, /* planner */ txn.KV(), DistributionTypeSystemTenantOnly) indexBatchSize := indexBackfillBatchSize.Get(&sc.execCfg.Settings.SV) @@ -1302,10 +1295,10 @@ func (sc *SchemaChanger) distColumnBackfill( // variable and assign to todoSpans after committing. var updatedTodoSpans []roachpb.Span if err := sc.txn(ctx, func( - ctx context.Context, txn isql.Txn, descriptors *descs.Collection, + ctx context.Context, txn descs.Txn, ) error { updatedTodoSpans = todoSpans - tableDesc, err := sc.getTableVersion(ctx, txn.KV(), descriptors, version) + tableDesc, err := sc.getTableVersion(ctx, txn.KV(), txn.Descriptors(), version) if err != nil { return err } @@ -1318,7 +1311,7 @@ func (sc *SchemaChanger) distColumnBackfill( } cbw := MetadataCallbackWriter{rowResultWriter: &errOnlyResultWriter{}, fn: metaFn} sd := NewFakeSessionData(sc.execCfg.SV()) - evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, sd, txn.KV().ReadTimestamp(), descriptors) + evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, sd, txn.KV().ReadTimestamp(), txn.Descriptors()) recv := MakeDistSQLReceiver( ctx, &cbw, @@ -1432,9 +1425,9 @@ func (sc *SchemaChanger) validateIndexes(ctx context.Context) error { readAsOf := sc.clock.Now() var tableDesc catalog.TableDescriptor if err := sc.fixedTimestampTxn(ctx, readAsOf, func( - ctx context.Context, txn isql.Txn, descriptors *descs.Collection, + ctx context.Context, txn descs.Txn, ) (err error) { - tableDesc, err = descriptors.ByID(txn.KV()).WithoutNonPublic().Get().Table(ctx, sc.descID) + tableDesc, err = txn.Descriptors().ByID(txn.KV()).WithoutNonPublic().Get().Table(ctx, sc.descID) return err }); err != nil { return err @@ -1530,14 +1523,14 @@ func ValidateConstraint( // The check operates at the historical timestamp. return runHistoricalTxn.Exec(ctx, func( - ctx context.Context, txn isql.Txn, descriptors *descs.Collection, + ctx context.Context, txn descs.Txn, ) error { // Use a schema resolver because we need to resolve types by ID and table by name. - resolver := NewSkippingCacheSchemaResolver(descriptors, sessiondata.NewStack(sessionData), txn.KV(), nil /* authAccessor */) + resolver := NewSkippingCacheSchemaResolver(txn.Descriptors(), sessiondata.NewStack(sessionData), txn.KV(), nil /* authAccessor */) semaCtx := tree.MakeSemaContext() semaCtx.TypeResolver = resolver semaCtx.TableNameResolver = resolver - defer func() { descriptors.ReleaseAll(ctx) }() + defer func() { txn.Descriptors().ReleaseAll(ctx) }() switch catalog.GetConstraintType(constraint) { case catconstants.ConstraintTypeCheck: @@ -1565,7 +1558,7 @@ func ValidateConstraint( ) case catconstants.ConstraintTypeFK: fk := constraint.AsForeignKey() - targetTable, err := descriptors.ByID(txn.KV()).Get().Table(ctx, fk.GetReferencedTableID()) + targetTable, err := txn.Descriptors().ByID(txn.KV()).Get().Table(ctx, fk.GetReferencedTableID()) if err != nil { return err } @@ -1651,7 +1644,7 @@ func ValidateInvertedIndexes( key := span.Key endKey := span.EndKey if err := runHistoricalTxn.Exec(ctx, func( - ctx context.Context, txn isql.Txn, _ *descs.Collection, + ctx context.Context, txn descs.Txn, ) error { for { kvs, err := txn.KV().Scan(ctx, key, endKey, 1000000) @@ -1760,7 +1753,7 @@ func countExpectedRowsForInvertedIndex( var expectedCount int64 if err := runHistoricalTxn.Exec(ctx, func( - ctx context.Context, txn isql.Txn, _ *descs.Collection, + ctx context.Context, txn descs.Txn, ) error { var stmt string geoConfig := idx.GetGeoConfig() @@ -1946,7 +1939,7 @@ func populateExpectedCounts( } var tableRowCount int64 if err := runHistoricalTxn.Exec(ctx, func( - ctx context.Context, txn isql.Txn, _ *descs.Collection, + ctx context.Context, txn descs.Txn, ) error { var s strings.Builder for _, idx := range indexes { @@ -2059,7 +2052,7 @@ func countIndexRowsAndMaybeCheckUniqueness( // Retrieve the row count in the index. var idxLen int64 if err := runHistoricalTxn.Exec(ctx, func( - ctx context.Context, txn isql.Txn, _ *descs.Collection, + ctx context.Context, txn descs.Txn, ) error { query := fmt.Sprintf(`SELECT count(1) FROM [%d AS t]@[%d]`, desc.GetID(), idx.GetID()) // If the index is a partial index the predicate must be added @@ -2266,10 +2259,10 @@ func (sc *SchemaChanger) mergeFromTemporaryIndex( ) error { var tbl *tabledesc.Mutable if err := sc.txn(ctx, func( - ctx context.Context, txn isql.Txn, descsCol *descs.Collection, + ctx context.Context, txn descs.Txn, ) error { var err error - tbl, err = descsCol.MutableByID(txn.KV()).Table(ctx, sc.descID) + tbl, err = txn.Descriptors().MutableByID(txn.KV()).Table(ctx, sc.descID) return err }); err != nil { return err @@ -2288,9 +2281,9 @@ func (sc *SchemaChanger) mergeFromTemporaryIndex( func (sc *SchemaChanger) runStateMachineAfterTempIndexMerge(ctx context.Context) error { var runStatus jobs.RunningStatus return sc.txn(ctx, func( - ctx context.Context, txn isql.Txn, descsCol *descs.Collection, + ctx context.Context, txn descs.Txn, ) error { - tbl, err := descsCol.MutableByID(txn.KV()).Table(ctx, sc.descID) + tbl, err := txn.Descriptors().MutableByID(txn.KV()).Table(ctx, sc.descID) if err != nil { return err } @@ -2320,7 +2313,7 @@ func (sc *SchemaChanger) runStateMachineAfterTempIndexMerge(ctx context.Context) if runStatus == "" || tbl.Dropped() { return nil } - if err := descsCol.WriteDesc( + if err := txn.Descriptors().WriteDesc( ctx, true /* kvTrace */, tbl, txn.KV(), ); err != nil { return err @@ -2663,11 +2656,7 @@ func validateCheckInTxn( } func getTargetTablesAndFk( - ctx context.Context, - srcTable *tabledesc.Mutable, - txn *kv.Txn, - descsCol *descs.Collection, - fkName string, + ctx context.Context, srcTable *tabledesc.Mutable, txn descs.Txn, fkName string, ) ( syntheticDescs []catalog.Descriptor, fk *descpb.ForeignKeyConstraint, @@ -2688,7 +2677,7 @@ func getTargetTablesAndFk( if fk == nil { return nil, nil, nil, errors.AssertionFailedf("foreign key %s does not exist", fkName) } - targetTable, err = descsCol.ByID(txn).Get().Table(ctx, fk.ReferencedTableID) + targetTable, err = txn.Descriptors().ByID(txn.KV()).Get().Table(ctx, fk.ReferencedTableID) if err != nil { return nil, nil, nil, err } @@ -2714,13 +2703,9 @@ func getTargetTablesAndFk( // It operates entirely on the current goroutine and is thus able to // reuse an existing kv.Txn safely. func validateFkInTxn( - ctx context.Context, - txn isql.Txn, - descsCol *descs.Collection, - srcTable *tabledesc.Mutable, - fkName string, + ctx context.Context, txn descs.Txn, srcTable *tabledesc.Mutable, fkName string, ) error { - syntheticDescs, fk, targetTable, err := getTargetTablesAndFk(ctx, srcTable, txn.KV(), descsCol, fkName) + syntheticDescs, fk, targetTable, err := getTargetTablesAndFk(ctx, srcTable, txn, fkName) if err != nil { return err } diff --git a/pkg/sql/catalog/descs/collection.go b/pkg/sql/catalog/descs/collection.go index a03175bc0950..b1fe4cd26487 100644 --- a/pkg/sql/catalog/descs/collection.go +++ b/pkg/sql/catalog/descs/collection.go @@ -1206,7 +1206,7 @@ func MakeTestCollection(ctx context.Context, leaseManager *lease.Manager) Collec } // InternalExecFn is the type of functions that operates using an internalExecutor. -type InternalExecFn func(ctx context.Context, txn isql.Txn, descriptors *Collection) error +type InternalExecFn func(ctx context.Context, txn Txn) error // HistoricalInternalExecTxnRunnerFn callback for executing with the internal executor // at a fixed timestamp. diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index 68f5ab736115..2d976eb66699 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -440,8 +440,8 @@ func (sc *SchemaChanger) maybeMakeAddTablePublic( } log.Info(ctx, "making table public") - return sc.txn(ctx, func(ctx context.Context, txn isql.Txn, descsCol *descs.Collection) error { - mut, err := descsCol.MutableByID(txn.KV()).Table(ctx, table.GetID()) + return sc.txn(ctx, func(ctx context.Context, txn descs.Txn) error { + mut, err := txn.Descriptors().MutableByID(txn.KV()).Table(ctx, table.GetID()) if err != nil { return err } @@ -449,7 +449,7 @@ func (sc *SchemaChanger) maybeMakeAddTablePublic( return nil } mut.State = descpb.DescriptorState_PUBLIC - return descsCol.WriteDesc(ctx, true /* kvTrace */, mut, txn.KV()) + return txn.Descriptors().WriteDesc(ctx, true /* kvTrace */, mut, txn.KV()) }) } @@ -481,8 +481,8 @@ func (sc *SchemaChanger) ignoreRevertedDropIndex( if !table.IsPhysicalTable() { return nil } - return sc.txn(ctx, func(ctx context.Context, txn isql.Txn, descsCol *descs.Collection) error { - mut, err := descsCol.MutableByID(txn.KV()).Table(ctx, table.GetID()) + return sc.txn(ctx, func(ctx context.Context, txn descs.Txn) error { + mut, err := txn.Descriptors().MutableByID(txn.KV()).Table(ctx, table.GetID()) if err != nil { return err } @@ -500,7 +500,7 @@ func (sc *SchemaChanger) ignoreRevertedDropIndex( mutationsModified = true } if mutationsModified { - return descsCol.WriteDesc(ctx, true /* kvTrace */, mut, txn.KV()) + return txn.Descriptors().WriteDesc(ctx, true /* kvTrace */, mut, txn.KV()) } return nil }) @@ -574,9 +574,9 @@ func (sc *SchemaChanger) getTargetDescriptor(ctx context.Context) (catalog.Descr // Retrieve the descriptor that is being changed. var desc catalog.Descriptor if err := sc.txn(ctx, func( - ctx context.Context, txn isql.Txn, descriptors *descs.Collection, + ctx context.Context, txn descs.Txn, ) (err error) { - desc, err = descriptors.ByID(txn.KV()).Get().Desc(ctx, sc.descID) + desc, err = txn.Descriptors().ByID(txn.KV()).Get().Desc(ctx, sc.descID) return err }); err != nil { return nil, err @@ -894,8 +894,8 @@ func (sc *SchemaChanger) handlePermanentSchemaChangeError( // initialize the job running status. func (sc *SchemaChanger) initJobRunningStatus(ctx context.Context) error { - return sc.txn(ctx, func(ctx context.Context, txn isql.Txn, descriptors *descs.Collection) error { - desc, err := descriptors.ByID(txn.KV()).WithoutNonPublic().Get().Table(ctx, sc.descID) + return sc.txn(ctx, func(ctx context.Context, txn descs.Txn) error { + desc, err := txn.Descriptors().ByID(txn.KV()).WithoutNonPublic().Get().Table(ctx, sc.descID) if err != nil { return err } @@ -1034,8 +1034,8 @@ func (sc *SchemaChanger) rollbackSchemaChange(ctx context.Context, err error) er // table was in the ADD state and the schema change failed, then we need to // clean up the descriptor. gcJobID := sc.jobRegistry.MakeJobID() - if err := sc.txn(ctx, func(ctx context.Context, txn isql.Txn, descsCol *descs.Collection) error { - scTable, err := descsCol.MutableByID(txn.KV()).Table(ctx, sc.descID) + if err := sc.txn(ctx, func(ctx context.Context, txn descs.Txn) error { + scTable, err := txn.Descriptors().MutableByID(txn.KV()).Table(ctx, sc.descID) if err != nil { return err } @@ -1046,17 +1046,17 @@ func (sc *SchemaChanger) rollbackSchemaChange(ctx context.Context, err error) er b := txn.KV().NewBatch() // For views, we need to clean up and references that exist to tables. if scTable.IsView() { - if err := sc.dropViewDeps(ctx, descsCol, txn.KV(), b, scTable); err != nil { + if err := sc.dropViewDeps(ctx, txn.Descriptors(), txn.KV(), b, scTable); err != nil { return err } } scTable.SetDropped() scTable.DropTime = timeutil.Now().UnixNano() const kvTrace = false - if err := descsCol.WriteDescToBatch(ctx, kvTrace, scTable, b); err != nil { + if err := txn.Descriptors().WriteDescToBatch(ctx, kvTrace, scTable, b); err != nil { return err } - if err := descsCol.DeleteNamespaceEntryToBatch(ctx, kvTrace, scTable, b); err != nil { + if err := txn.Descriptors().DeleteNamespaceEntryToBatch(ctx, kvTrace, scTable, b); err != nil { return err } @@ -1094,13 +1094,13 @@ func (sc *SchemaChanger) RunStateMachineBeforeBackfill(ctx context.Context) erro var runStatus jobs.RunningStatus if err := sc.txn(ctx, func( - ctx context.Context, txn isql.Txn, descsCol *descs.Collection, + ctx context.Context, txn descs.Txn, ) error { - tbl, err := descsCol.MutableByID(txn.KV()).Table(ctx, sc.descID) + tbl, err := txn.Descriptors().MutableByID(txn.KV()).Table(ctx, sc.descID) if err != nil { return err } - dbDesc, err := descsCol.ByID(txn.KV()).WithoutNonPublic().Get().Database(ctx, tbl.GetParentID()) + dbDesc, err := txn.Descriptors().ByID(txn.KV()).WithoutNonPublic().Get().Database(ctx, tbl.GetParentID()) if err != nil { return err } @@ -1143,7 +1143,7 @@ func (sc *SchemaChanger) RunStateMachineBeforeBackfill(ctx context.Context) erro tbl, m, false, // isDone - descsCol, + txn.Descriptors(), ); err != nil { return err } @@ -1151,7 +1151,7 @@ func (sc *SchemaChanger) RunStateMachineBeforeBackfill(ctx context.Context) erro if doNothing := runStatus == "" || tbl.Dropped(); doNothing { return nil } - if err := descsCol.WriteDesc( + if err := txn.Descriptors().WriteDesc( ctx, true /* kvTrace */, tbl, txn.KV(), ); err != nil { return err @@ -1200,9 +1200,9 @@ func (sc *SchemaChanger) stepStateMachineAfterIndexBackfill(ctx context.Context) var runStatus jobs.RunningStatus if err := sc.txn(ctx, func( - ctx context.Context, txn isql.Txn, descsCol *descs.Collection, + ctx context.Context, txn descs.Txn, ) error { - tbl, err := descsCol.MutableByID(txn.KV()).Table(ctx, sc.descID) + tbl, err := txn.Descriptors().MutableByID(txn.KV()).Table(ctx, sc.descID) if err != nil { return err } @@ -1232,7 +1232,7 @@ func (sc *SchemaChanger) stepStateMachineAfterIndexBackfill(ctx context.Context) if runStatus == "" || tbl.Dropped() { return nil } - if err := descsCol.WriteDesc( + if err := txn.Descriptors().WriteDesc( ctx, true /* kvTrace */, tbl, txn.KV(), ); err != nil { return err @@ -1342,24 +1342,24 @@ func (sc *SchemaChanger) done(ctx context.Context) error { var depMutationJobs []jobspb.JobID var otherJobIDs []jobspb.JobID err := sc.txn(ctx, func( - ctx context.Context, txn isql.Txn, descsCol *descs.Collection, + ctx context.Context, txn descs.Txn, ) error { depMutationJobs = depMutationJobs[:0] otherJobIDs = otherJobIDs[:0] var err error - scTable, err := descsCol.MutableByID(txn.KV()).Table(ctx, sc.descID) + scTable, err := txn.Descriptors().MutableByID(txn.KV()).Table(ctx, sc.descID) if err != nil { return err } - dbDesc, err := descsCol.ByID(txn.KV()).WithoutNonPublic().Get().Database(ctx, scTable.GetParentID()) + dbDesc, err := txn.Descriptors().ByID(txn.KV()).WithoutNonPublic().Get().Database(ctx, scTable.GetParentID()) if err != nil { return err } collectReferencedTypeIDs := func() (catalog.DescriptorIDSet, error) { typeLookupFn := func(id descpb.ID) (catalog.TypeDescriptor, error) { - desc, err := descsCol.ByIDWithLeased(txn.KV()).WithoutNonPublic().Get().Type(ctx, id) + desc, err := txn.Descriptors().ByIDWithLeased(txn.KV()).WithoutNonPublic().Get().Type(ctx, id) if err != nil { return nil, err } @@ -1415,13 +1415,13 @@ func (sc *SchemaChanger) done(ctx context.Context) error { if fk := m.AsForeignKey(); fk != nil && fk.Adding() && fk.GetConstraintValidity() == descpb.ConstraintValidity_Unvalidated { // Add backreference on the referenced table (which could be the same table) - backrefTable, err := descsCol.MutableByID(txn.KV()).Table(ctx, fk.GetReferencedTableID()) + backrefTable, err := txn.Descriptors().MutableByID(txn.KV()).Table(ctx, fk.GetReferencedTableID()) if err != nil { return err } backrefTable.InboundFKs = append(backrefTable.InboundFKs, *fk.ForeignKeyDesc()) if backrefTable != scTable { - if err := descsCol.WriteDescToBatch(ctx, kvTrace, backrefTable, b); err != nil { + if err := txn.Descriptors().WriteDescToBatch(ctx, kvTrace, backrefTable, b); err != nil { return err } } @@ -1437,7 +1437,7 @@ func (sc *SchemaChanger) done(ctx context.Context) error { scTable, m, true, // isDone - descsCol, + txn.Descriptors(), ); err != nil { return err } @@ -1594,7 +1594,7 @@ func (sc *SchemaChanger) done(ctx context.Context) error { } if err := setNewLocalityConfig( - ctx, txn.KV(), descsCol, b, scTable, localityConfigToSwapTo, kvTrace, + ctx, txn.KV(), txn.Descriptors(), b, scTable, localityConfigToSwapTo, kvTrace, ); err != nil { return err } @@ -1690,7 +1690,7 @@ func (sc *SchemaChanger) done(ctx context.Context) error { // Update the set of back references. for id, isAddition := range update { - typ, err := descsCol.MutableByID(txn.KV()).Type(ctx, id) + typ, err := txn.Descriptors().MutableByID(txn.KV()).Type(ctx, id) if err != nil { return err } @@ -1699,7 +1699,7 @@ func (sc *SchemaChanger) done(ctx context.Context) error { } else { typ.RemoveReferencingDescriptorID(scTable.ID) } - if err := descsCol.WriteDescToBatch(ctx, kvTrace, typ, b); err != nil { + if err := txn.Descriptors().WriteDescToBatch(ctx, kvTrace, typ, b); err != nil { return err } } @@ -1731,12 +1731,12 @@ func (sc *SchemaChanger) done(ctx context.Context) error { // Update the set of back references. for id, colIDSet := range update { - tbl, err := descsCol.MutableByID(txn.KV()).Table(ctx, id) + tbl, err := txn.Descriptors().MutableByID(txn.KV()).Table(ctx, id) if err != nil { return err } tbl.UpdateColumnsDependedOnBy(scTable.ID, colIDSet) - if err := descsCol.WriteDescToBatch(ctx, kvTrace, tbl, b); err != nil { + if err := txn.Descriptors().WriteDescToBatch(ctx, kvTrace, tbl, b); err != nil { return err } } @@ -1745,7 +1745,7 @@ func (sc *SchemaChanger) done(ctx context.Context) error { // Clean up any comments related to the mutations, specifically if we need // to drop them. for _, comment := range commentsToDelete { - if err := descsCol.DeleteCommentInBatch( + if err := txn.Descriptors().DeleteCommentInBatch( ctx, false /* kvTrace */, b, catalogkeys.MakeCommentKey(uint32(comment.id), uint32(comment.subID), comment.commentType), ); err != nil { return err @@ -1753,23 +1753,23 @@ func (sc *SchemaChanger) done(ctx context.Context) error { } for _, comment := range commentsToSwap { - cmt, found := descsCol.GetComment(catalogkeys.MakeCommentKey(uint32(comment.id), uint32(comment.oldSubID), comment.commentType)) + cmt, found := txn.Descriptors().GetComment(catalogkeys.MakeCommentKey(uint32(comment.id), uint32(comment.oldSubID), comment.commentType)) if !found { continue } - if err := descsCol.DeleteCommentInBatch( + if err := txn.Descriptors().DeleteCommentInBatch( ctx, false /* kvTrace */, b, catalogkeys.MakeCommentKey(uint32(comment.id), uint32(comment.oldSubID), comment.commentType), ); err != nil { return err } - if err := descsCol.WriteCommentToBatch( + if err := txn.Descriptors().WriteCommentToBatch( ctx, false /* kvTrace */, b, catalogkeys.MakeCommentKey(uint32(comment.id), uint32(comment.newSubID), comment.commentType), cmt, ); err != nil { return err } } - if err := descsCol.WriteDescToBatch(ctx, kvTrace, scTable, b); err != nil { + if err := txn.Descriptors().WriteDescToBatch(ctx, kvTrace, scTable, b); err != nil { return err } if err := txn.KV().Run(ctx, b); err != nil { @@ -1924,8 +1924,8 @@ func (sc *SchemaChanger) maybeReverseMutations(ctx context.Context, causingError // Get the other tables whose foreign key backreferences need to be removed. alreadyReversed := false const kvTrace = true // TODO(ajwerner): figure this out - err := sc.txn(ctx, func(ctx context.Context, txn isql.Txn, descsCol *descs.Collection) error { - scTable, err := descsCol.MutableByID(txn.KV()).Table(ctx, sc.descID) + err := sc.txn(ctx, func(ctx context.Context, txn descs.Txn) error { + scTable, err := txn.Descriptors().MutableByID(txn.KV()).Table(ctx, sc.descID) if err != nil { return err } @@ -1998,7 +1998,7 @@ func (sc *SchemaChanger) maybeReverseMutations(ctx context.Context, causingError } // Get the foreign key backreferences to remove. if fk := constraint.AsForeignKey(); fk != nil { - backrefTable, err := descsCol.MutableByID(txn.KV()).Table(ctx, fk.GetReferencedTableID()) + backrefTable, err := txn.Descriptors().MutableByID(txn.KV()).Table(ctx, fk.GetReferencedTableID()) if err != nil { return err } @@ -2009,7 +2009,7 @@ func (sc *SchemaChanger) maybeReverseMutations(ctx context.Context, causingError log.Infof(ctx, "error attempting to remove backreference %s during rollback: %s", constraint.GetName(), err) } - if err := descsCol.WriteDescToBatch(ctx, kvTrace, backrefTable, b); err != nil { + if err := txn.Descriptors().WriteDescToBatch(ctx, kvTrace, backrefTable, b); err != nil { return err } } @@ -2030,7 +2030,7 @@ func (sc *SchemaChanger) maybeReverseMutations(ctx context.Context, causingError // Read the table descriptor from the store. The Version of the // descriptor has already been incremented in the transaction and // this descriptor can be modified without incrementing the version. - if err := descsCol.WriteDescToBatch(ctx, kvTrace, scTable, b); err != nil { + if err := txn.Descriptors().WriteDescToBatch(ctx, kvTrace, scTable, b); err != nil { return err } if err := txn.KV().Run(ctx, b); err != nil { @@ -2447,27 +2447,7 @@ type SchemaChangerTestingKnobs struct { func (*SchemaChangerTestingKnobs) ModuleTestingKnobs() {} // txn is a convenient wrapper around descs.Txn(). -// -// TODO(ajwerner): Replace this with direct calls to DescsTxn. -func (sc *SchemaChanger) txn( - ctx context.Context, f func(context.Context, isql.Txn, *descs.Collection) error, -) error { - return sc.txnWithExecutor(ctx, func( - ctx context.Context, txn isql.Txn, _ *sessiondata.SessionData, - collection *descs.Collection, - ) error { - return f(ctx, txn, collection) - }) -} - -// txnWithExecutor is to run internal executor within a txn. -func (sc *SchemaChanger) txnWithExecutor( - ctx context.Context, - f func( - context.Context, isql.Txn, *sessiondata.SessionData, - *descs.Collection, - ) error, -) error { +func (sc *SchemaChanger) txn(ctx context.Context, f func(context.Context, descs.Txn) error) error { if fn := sc.testingKnobs.RunBeforeDescTxn; fn != nil { if err := fn(sc.job.ID()); err != nil { return err @@ -2476,7 +2456,7 @@ func (sc *SchemaChanger) txnWithExecutor( return sc.execCfg.InternalDB.DescsTxn(ctx, func( ctx context.Context, txn descs.Txn, ) error { - return f(ctx, txn, txn.SessionData(), txn.Descriptors()) + return f(ctx, txn) }) } @@ -3085,9 +3065,9 @@ func (sc *SchemaChanger) getDependentMutationsJobs( func (sc *SchemaChanger) preSplitHashShardedIndexRanges(ctx context.Context) error { if err := sc.txn(ctx, func( - ctx context.Context, txn isql.Txn, descsCol *descs.Collection, + ctx context.Context, txn descs.Txn, ) error { - tableDesc, err := descsCol.MutableByID(txn.KV()).Table(ctx, sc.descID) + tableDesc, err := txn.Descriptors().MutableByID(txn.KV()).Table(ctx, sc.descID) if err != nil { return err } diff --git a/pkg/sql/schemachanger/scdeps/validator.go b/pkg/sql/schemachanger/scdeps/validator.go index 8c6009b2dc83..e63ac296a671 100644 --- a/pkg/sql/schemachanger/scdeps/validator.go +++ b/pkg/sql/schemachanger/scdeps/validator.go @@ -136,7 +136,7 @@ func (vd validator) makeHistoricalInternalExecTxnRunner() descs.HistoricalIntern if err := txn.KV().SetFixedTimestamp(ctx, now); err != nil { return err } - return fn(ctx, txn, txn.Descriptors()) + return fn(ctx, txn) }) }) }