diff --git a/pkg/bench/rttanalysis/testdata/benchmark_expectations b/pkg/bench/rttanalysis/testdata/benchmark_expectations index c9d14d2b734e..320b4427a616 100644 --- a/pkg/bench/rttanalysis/testdata/benchmark_expectations +++ b/pkg/bench/rttanalysis/testdata/benchmark_expectations @@ -67,11 +67,11 @@ exp,benchmark 2,ORMQueries/has_table_privilege_1 4,ORMQueries/has_table_privilege_3 6,ORMQueries/has_table_privilege_5 -15,ORMQueries/information_schema._pg_index_position +3,ORMQueries/information_schema._pg_index_position 2,ORMQueries/pg_attribute 2,ORMQueries/pg_class 11,ORMQueries/pg_is_other_temp_schema -23,ORMQueries/pg_is_other_temp_schema_multiple_times +18,ORMQueries/pg_is_other_temp_schema_multiple_times 4,ORMQueries/pg_my_temp_schema 4,ORMQueries/pg_my_temp_schema_multiple_times 4,ORMQueries/pg_namespace diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 9c6ae9697cae..9d5344865160 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -1581,7 +1581,7 @@ func revalidateIndexes( // since our table is offline. var runner sqlutil.HistoricalInternalExecTxnRunner = func(ctx context.Context, fn sqlutil.InternalExecFn) error { return execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - ie := job.MakeSessionBoundInternalExecutor(ctx, sql.NewFakeSessionData(execCfg.SV())).(*sql.InternalExecutor) + ie := job.MakeSessionBoundInternalExecutor(sql.NewFakeSessionData(execCfg.SV())).(*sql.InternalExecutor) return fn(ctx, txn, ie) }) } diff --git a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go index 2c54634dd52e..3f5523eeadb8 100644 --- a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go +++ b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go @@ -94,7 +94,7 @@ func newRowFetcherCache( return &rowFetcherCache{ codec: codec, leaseMgr: leaseMgr, - collection: cf.NewCollection(ctx, nil /* TemporarySchemaProvider */), + collection: cf.NewCollection(ctx, nil /* TemporarySchemaProvider */, nil /* monitor */), db: db, fetchers: cache.NewUnorderedCache(defaultCacheConfig), watchedFamilies: watchedFamilies, diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index b175c43b66e3..78e6cf64b0c6 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -1133,7 +1133,7 @@ func getQualifiedTableName( func getQualifiedTableNameObj( ctx context.Context, execCfg *sql.ExecutorConfig, txn *kv.Txn, desc catalog.TableDescriptor, ) (tree.TableName, error) { - col := execCfg.CollectionFactory.MakeCollection(ctx, nil /* TemporarySchemaProvider */, nil /* monitor */) + col := execCfg.CollectionFactory.NewCollection(ctx, nil /* TemporarySchemaProvider */, nil /* monitor */) dbDesc, err := col.Direct().MustGetDatabaseDescByID(ctx, txn, desc.GetParentID()) if err != nil { return tree.TableName{}, err diff --git a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go index ef01d8087116..afc58244b3a1 100644 --- a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go +++ b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go @@ -92,7 +92,7 @@ func New( settings: cfg.Settings, targets: targets, leaseMgr: cfg.LeaseManager.(*lease.Manager), - ie: cfg.SessionBoundInternalExecutorFactory(ctx, &sessiondata.SessionData{}), + ie: cfg.InternalExecutorFactory.NewInternalExecutor(&sessiondata.SessionData{}), collectionFactory: cfg.CollectionFactory, metrics: metrics, tolerances: tolerances, diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index eb4af74cd80d..47af9b73eff5 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -718,9 +718,14 @@ func (j *Job) FractionCompleted() float32 { // sessionBoundInternalExecutorFactory for a more detailed explanation of why // this exists. func (j *Job) MakeSessionBoundInternalExecutor( - ctx context.Context, sd *sessiondata.SessionData, + sd *sessiondata.SessionData, ) sqlutil.InternalExecutor { - return j.registry.sessionBoundInternalExecutorFactory(ctx, sd) + return j.registry.internalExecutorFactory.NewInternalExecutor(sd) +} + +// GetInternalExecutorFactory returns the internal executor factory. +func (j *Job) GetInternalExecutorFactory() sqlutil.InternalExecutorFactory { + return j.registry.internalExecutorFactory } // MarkIdle marks the job as Idle. Idleness should not be toggled frequently diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 2bc90981a0a3..0ed2d228a005 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -127,7 +127,7 @@ type Registry struct { // field. Modifying the TableCollection is basically a per-query operation // and should be a per-query setting. #34304 is the issue for creating/ // improving this API. - sessionBoundInternalExecutorFactory sqlutil.SessionBoundInternalExecutorFactory + internalExecutorFactory sqlutil.InternalExecutorFactory // if non-empty, indicates path to file that prevents any job adoptions. preventAdoptionFile string @@ -226,14 +226,12 @@ func MakeRegistry( return r } -// SetSessionBoundInternalExecutorFactory sets the -// SessionBoundInternalExecutorFactory that will be used by the job registry +// SetInternalExecutorFactory sets the +// InternalExecutorFactory that will be used by the job registry // executor. We expose this separately from the constructor to avoid a circular // dependency. -func (r *Registry) SetSessionBoundInternalExecutorFactory( - factory sqlutil.SessionBoundInternalExecutorFactory, -) { - r.sessionBoundInternalExecutorFactory = factory +func (r *Registry) SetInternalExecutorFactory(factory sqlutil.InternalExecutorFactory) { + r.internalExecutorFactory = factory } // NewSpanConstrainer returns an instance of sql.SpanConstrainer as an interface{}, diff --git a/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go b/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go index 82f8e7b74297..ff6c1498a40b 100644 --- a/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go +++ b/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go @@ -879,6 +879,17 @@ type wrappedInternalExecutor struct { } } +func (ie *wrappedInternalExecutor) QueryBufferedExWithCols( + ctx context.Context, + opName string, + txn *kv.Txn, + session sessiondata.InternalExecutorOverride, + stmt string, + qargs ...interface{}, +) ([]tree.Datums, colinfo.ResultColumns, error) { + panic("unimplemented") +} + var _ sqlutil.InternalExecutor = &wrappedInternalExecutor{} func (ie *wrappedInternalExecutor) Exec( diff --git a/pkg/server/admin.go b/pkg/server/admin.go index 32de7ce2e0c0..42502559c689 100644 --- a/pkg/server/admin.go +++ b/pkg/server/admin.go @@ -1779,21 +1779,28 @@ func (s *adminServer) SetUIData( for key, val := range req.KeyValues { // Do an upsert of the key. We update each key in a separate transaction to // avoid long-running transactions and possible deadlocks. - query := `UPSERT INTO system.ui (key, value, "lastUpdated") VALUES ($1, $2, now())` - rowsAffected, err := s.server.sqlServer.internalExecutor.ExecEx( - ctx, "admin-set-ui-data", nil, /* txn */ - sessiondata.InternalExecutorOverride{ - User: username.RootUserName(), - }, - query, makeUIKey(userName, key), val) - if err != nil { - return nil, serverError(ctx, err) - } - if rowsAffected != 1 { - return nil, serverErrorf(ctx, "rows affected %d != expected %d", rowsAffected, 1) + + if err := s.server.sqlServer.internalExecutorFactory.RunWithoutTxn(ctx, func( + ctx context.Context, ie sqlutil.InternalExecutor, + ) error { + query := `UPSERT INTO system.ui (key, value, "lastUpdated") VALUES ($1, $2, now())` + rowsAffected, err := ie.ExecEx( + ctx, "admin-set-ui-data", nil, /* txn */ + sessiondata.InternalExecutorOverride{ + User: username.RootUserName(), + }, + query, makeUIKey(userName, key), val) + if err != nil { + return serverError(ctx, err) + } + if rowsAffected != 1 { + return serverErrorf(ctx, "rows affected %d != expected %d", rowsAffected, 1) + } + return nil + }); err != nil { + return nil, err } } - return &serverpb.SetUIDataResponse{}, nil } diff --git a/pkg/server/server.go b/pkg/server/server.go index 98aa504b633c..08afc206010b 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -803,6 +803,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { closedSessionCache: closedSessionCache, flowScheduler: flowScheduler, circularInternalExecutor: internalExecutor, + internalExecutorFactory: nil, // will be initialized in server.newSQLServer. circularJobRegistry: jobRegistry, jobAdoptionStopFile: jobAdoptionStopFile, protectedtsProvider: protectedtsProvider, diff --git a/pkg/server/server_internal_executor_factory_test.go b/pkg/server/server_internal_executor_factory_test.go index 7f767c6cccc8..38b6a9afd5c3 100644 --- a/pkg/server/server_internal_executor_factory_test.go +++ b/pkg/server/server_internal_executor_factory_test.go @@ -34,7 +34,7 @@ func TestInternalExecutorClearsMonitorMemory(t *testing.T) { mon := s.(*TestServer).sqlServer.internalExecutorFactoryMemMonitor ief := s.ExecutorConfig().(sql.ExecutorConfig).InternalExecutorFactory sessionData := sql.NewFakeSessionData(&s.ClusterSettings().SV) - ie := ief(ctx, sessionData) + ie := ief.NewInternalExecutor(sessionData) rows, err := ie.QueryIteratorEx(ctx, "test", nil, sessiondata.NodeUserSessionDataOverride, `SELECT 1`) require.NoError(t, err) require.Greater(t, mon.AllocBytes(), int64(0)) diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 427121f8907c..3ccc6ad8d69f 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -130,18 +130,19 @@ import ( // standalone SQLServer instances per tenant (the KV layer is shared across all // tenants). type SQLServer struct { - ambientCtx log.AmbientContext - stopper *stop.Stopper - sqlIDContainer *base.SQLIDContainer - pgServer *pgwire.Server - distSQLServer *distsql.ServerImpl - execCfg *sql.ExecutorConfig - cfg *BaseConfig - internalExecutor *sql.InternalExecutor - leaseMgr *lease.Manager - blobService *blobs.Service - tracingService *service.Service - tenantConnect kvtenant.Connector + ambientCtx log.AmbientContext + stopper *stop.Stopper + sqlIDContainer *base.SQLIDContainer + pgServer *pgwire.Server + distSQLServer *distsql.ServerImpl + execCfg *sql.ExecutorConfig + cfg *BaseConfig + internalExecutor *sql.InternalExecutor + internalExecutorFactory sqlutil.InternalExecutorFactory + leaseMgr *lease.Manager + blobService *blobs.Service + tracingService *service.Service + tenantConnect kvtenant.Connector // sessionRegistry can be queried for info on running SQL sessions. It is // shared between the sql.Server and the statusServer. sessionRegistry *sql.SessionRegistry @@ -303,6 +304,9 @@ type sqlServerArgs struct { // TODO(tbg): make this less hacky. circularInternalExecutor *sql.InternalExecutor // empty initially + // internalExecutorFactory is to initialize an internal executor. + internalExecutorFactory sqlutil.InternalExecutorFactory + // Stores and deletes expired liveness sessions. sqlLivenessProvider sqlliveness.Provider @@ -947,18 +951,18 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { ieFactoryMonitor.StartNoReserved(ctx, pgServer.SQLServer.GetBytesMonitor()) // Now that we have a pgwire.Server (which has a sql.Server), we can close a // circular dependency between the rowexec.Server and sql.Server and set - // SessionBoundInternalExecutorFactory. The same applies for setting a + // InternalExecutorFactory. The same applies for setting a // SessionBoundInternalExecutor on the job registry. - ieFactory := func( - ctx context.Context, sessionData *sessiondata.SessionData, - ) sqlutil.InternalExecutor { - ie := sql.MakeInternalExecutor(pgServer.SQLServer, internalMemMetrics, ieFactoryMonitor) - ie.SetSessionData(sessionData) - return &ie - } + ieFactory := sql.NewInternalExecutorFactory( + pgServer.SQLServer, + internalMemMetrics, + ieFactoryMonitor, + ) + + collectionFactory.SetInternalExecutorWithTxn(ieFactory) - distSQLServer.ServerConfig.SessionBoundInternalExecutorFactory = ieFactory - jobRegistry.SetSessionBoundInternalExecutorFactory(ieFactory) + distSQLServer.ServerConfig.InternalExecutorFactory = ieFactory + jobRegistry.SetInternalExecutorFactory(ieFactory) execCfg.IndexBackfiller = sql.NewIndexBackfiller(execCfg) execCfg.IndexMerger = sql.NewIndexBackfillerMergePlanner(execCfg) execCfg.IndexValidator = scdeps.NewIndexValidator( @@ -978,6 +982,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { cfg.registry.AddMetricStruct(m) } *cfg.circularInternalExecutor = sql.MakeInternalExecutor(pgServer.SQLServer, internalMemMetrics, ieFactoryMonitor) + cfg.internalExecutorFactory = ieFactory execCfg.InternalExecutor = cfg.circularInternalExecutor stmtDiagnosticsRegistry := stmtdiagnostics.NewRegistry( cfg.circularInternalExecutor, @@ -1070,7 +1075,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { cfg.db, codec, cfg.registry, - distSQLServer.ServerConfig.SessionBoundInternalExecutorFactory, + distSQLServer.ServerConfig.InternalExecutorFactory, cfg.sqlStatusServer, cfg.isMeta1Leaseholder, sqlExecutorTestingKnobs, @@ -1118,6 +1123,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { distSQLServer: distSQLServer, execCfg: execCfg, internalExecutor: cfg.circularInternalExecutor, + internalExecutorFactory: cfg.internalExecutorFactory, leaseMgr: leaseMgr, blobService: blobService, tracingService: tracingService, diff --git a/pkg/sql/alter_table.go b/pkg/sql/alter_table.go index fbca74993427..f7f6d8e7212d 100644 --- a/pkg/sql/alter_table.go +++ b/pkg/sql/alter_table.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/sql/catalog" @@ -43,6 +44,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/sql/storageparam" "github.com/cockroachdb/cockroach/pkg/sql/storageparam/tablestorageparam" @@ -558,10 +560,9 @@ func (n *alterTableNode) startExec(params runParams) error { return pgerror.Newf(pgcode.ObjectNotInPrerequisiteState, "constraint %q in the middle of being added, try again later", t.Constraint) } - if err := validateCheckInTxn( - params.ctx, ¶ms.p.semaCtx, params.ExecCfg().InternalExecutorFactory, - params.SessionData(), n.tableDesc, params.p.Txn(), ck.Expr, - ); err != nil { + if err := params.p.WithInternalExecutor(params.ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { + return validateCheckInTxn(ctx, ¶ms.p.semaCtx, params.p.SessionData(), n.tableDesc, txn, ie, ck.Expr) + }); err != nil { return err } ck.Validity = descpb.ConstraintValidity_Validated @@ -581,19 +582,12 @@ func (n *alterTableNode) startExec(params runParams) error { return pgerror.Newf(pgcode.ObjectNotInPrerequisiteState, "constraint %q in the middle of being added, try again later", t.Constraint) } - if err := validateFkInTxn( - params.ctx, - params.ExecCfg().InternalExecutorFactory, - params.p.SessionData(), - n.tableDesc, - params.p.Txn(), - params.p.Descriptors(), - name, - ); err != nil { + if err := params.p.WithInternalExecutor(params.ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { + return validateFkInTxn(ctx, n.tableDesc, txn, ie, params.p.descCollection, name) + }); err != nil { return err } foundFk.Validity = descpb.ConstraintValidity_Validated - case descpb.ConstraintTypeUnique: if constraint.Index == nil { var foundUnique *descpb.UniqueWithoutIndexConstraint @@ -610,11 +604,16 @@ func (n *alterTableNode) startExec(params runParams) error { return pgerror.Newf(pgcode.ObjectNotInPrerequisiteState, "constraint %q in the middle of being added, try again later", t.Constraint) } - if err := validateUniqueWithoutIndexConstraintInTxn( - params.ctx, params.ExecCfg().InternalExecutorFactory( - params.ctx, params.SessionData(), - ), n.tableDesc, params.p.Txn(), params.p.User(), name, - ); err != nil { + if err := params.p.WithInternalExecutor(params.ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { + return validateUniqueWithoutIndexConstraintInTxn( + params.ctx, + n.tableDesc, + txn, + ie, + params.p.User(), + name, + ) + }); err != nil { return err } foundUnique.Validity = descpb.ConstraintValidity_Validated diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index 97736aa3083c..45453d71df84 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -137,7 +137,7 @@ func (sc *SchemaChanger) getChunkSize(chunkSize int64) int64 { } // scTxnFn is the type of functions that operates using transactions in the backfiller. -type scTxnFn func(ctx context.Context, txn *kv.Txn, evalCtx *extendedEvalContext) error +type scTxnFn func(ctx context.Context, txn *kv.Txn, evalCtx *extendedEvalContext, ie sqlutil.InternalExecutor) error // historicalTxnRunner is the type of the callback used by the various // helper functions to run checks at a fixed timestamp (logically, at @@ -147,12 +147,16 @@ type historicalTxnRunner func(ctx context.Context, fn scTxnFn) error // makeFixedTimestampRunner creates a historicalTxnRunner suitable for use by the helpers. func (sc *SchemaChanger) makeFixedTimestampRunner(readAsOf hlc.Timestamp) historicalTxnRunner { runner := func(ctx context.Context, retryable scTxnFn) error { - return sc.fixedTimestampTxn(ctx, readAsOf, func( - ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, + return sc.fixedTimestampTxnWithExecutor(ctx, readAsOf, func( + ctx context.Context, + txn *kv.Txn, + sd *sessiondata.SessionData, + descriptors *descs.Collection, + ie sqlutil.InternalExecutor, ) error { // We need to re-create the evalCtx since the txn may retry. - evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, readAsOf, descriptors) - return retryable(ctx, txn, &evalCtx) + evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, sd, readAsOf, descriptors) + return retryable(ctx, txn, &evalCtx, ie) }) } return runner @@ -167,7 +171,7 @@ func (sc *SchemaChanger) makeFixedTimestampInternalExecRunner( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ) error { // We need to re-create the evalCtx since the txn may retry. - ie := sc.ieFactory(ctx, NewFakeSessionData(sc.execCfg.SV())) + ie := sc.ieFactory.NewInternalExecutor(NewFakeSessionData(sc.execCfg.SV())) return retryable(ctx, txn, ie) }) } @@ -187,6 +191,26 @@ func (sc *SchemaChanger) fixedTimestampTxn( }) } +func (sc *SchemaChanger) fixedTimestampTxnWithExecutor( + ctx context.Context, + readAsOf hlc.Timestamp, + retryable func( + ctx context.Context, + txn *kv.Txn, + sd *sessiondata.SessionData, + descriptors *descs.Collection, + ie sqlutil.InternalExecutor, + ) error, +) error { + sd := NewFakeSessionData(sc.execCfg.SV()) + return sc.txnWithExecutor(ctx, sd, func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ie sqlutil.InternalExecutor) error { + if err := txn.SetFixedTimestamp(ctx, readAsOf); err != nil { + return err + } + return retryable(ctx, txn, sd, descriptors, ie) + }) +} + // runBackfill runs the backfill for the schema changer. // // This operates over multiple goroutines concurrently and is thus not @@ -739,7 +763,7 @@ func (sc *SchemaChanger) validateConstraints( } desc := descI.(*tabledesc.Mutable) // Each check operates at the historical timestamp. - return runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, evalCtx *extendedEvalContext) error { + return runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, evalCtx *extendedEvalContext, ie sqlutil.InternalExecutor) error { // If the constraint is a check constraint that fails validation, we // need a semaContext set up that can resolve types in order to pretty // print the check expression back to the user. @@ -754,23 +778,21 @@ func (sc *SchemaChanger) validateConstraints( defer func() { collection.ReleaseAll(ctx) }() if c.IsCheck() { if err := validateCheckInTxn( - ctx, &semaCtx, sc.ieFactory, evalCtx.SessionData(), desc, txn, c.Check().Expr, + ctx, &semaCtx, evalCtx.SessionData(), desc, txn, ie, c.Check().Expr, ); err != nil { return err } } else if c.IsForeignKey() { - if err := validateFkInTxn(ctx, sc.ieFactory, evalCtx.SessionData(), desc, txn, collection, c.GetName()); err != nil { + if err := validateFkInTxn(ctx, desc, txn, ie, collection, c.GetName()); err != nil { return err } } else if c.IsUniqueWithoutIndex() { - if err := validateUniqueWithoutIndexConstraintInTxn( - ctx, sc.ieFactory(ctx, evalCtx.SessionData()), desc, txn, evalCtx.SessionData().User(), c.GetName(), - ); err != nil { + if err := validateUniqueWithoutIndexConstraintInTxn(ctx, desc, txn, ie, evalCtx.SessionData().User(), c.GetName()); err != nil { return err } } else if c.IsNotNull() { if err := validateCheckInTxn( - ctx, &semaCtx, sc.ieFactory, evalCtx.SessionData(), desc, txn, c.Check().Expr, + ctx, &semaCtx, evalCtx.SessionData(), desc, txn, ie, c.Check().Expr, ); err != nil { // TODO (lucy): This should distinguish between constraint // validation errors and other types of unexpected errors, and @@ -999,7 +1021,8 @@ func (sc *SchemaChanger) distIndexBackfill( if err != nil { return err } - evalCtx = createSchemaChangeEvalCtx(ctx, sc.execCfg, txn.ReadTimestamp(), descriptors) + sd := NewFakeSessionData(sc.execCfg.SV()) + evalCtx = createSchemaChangeEvalCtx(ctx, sc.execCfg, sd, txn.ReadTimestamp(), descriptors) planCtx = sc.distSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil, /* planner */ txn, DistributionTypeSystemTenantOnly) indexBatchSize := indexBackfillBatchSize.Get(&sc.execCfg.Settings.SV) @@ -1295,7 +1318,8 @@ func (sc *SchemaChanger) distColumnBackfill( return nil } cbw := MetadataCallbackWriter{rowResultWriter: &errOnlyResultWriter{}, fn: metaFn} - evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, txn.ReadTimestamp(), descriptors) + sd := NewFakeSessionData(sc.execCfg.SV()) + evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, sd, txn.ReadTimestamp(), descriptors) recv := MakeDistSQLReceiver( ctx, &cbw, @@ -2381,10 +2405,9 @@ func runSchemaChangesInTxn( if c.IsCheck() || c.IsNotNull() { check := &c.ConstraintToUpdateDesc().Check if check.Validity == descpb.ConstraintValidity_Validating { - if err := validateCheckInTxn( - ctx, &planner.semaCtx, planner.ExecCfg().InternalExecutorFactory, - planner.SessionData(), tableDesc, planner.txn, check.Expr, - ); err != nil { + if err := planner.WithInternalExecutor(ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { + return validateCheckInTxn(ctx, &planner.semaCtx, planner.SessionData(), tableDesc, txn, ie, check.Expr) + }); err != nil { return err } check.Validity = descpb.ConstraintValidity_Validated @@ -2406,9 +2429,17 @@ func runSchemaChangesInTxn( } else if c.IsUniqueWithoutIndex() { uwi := &c.ConstraintToUpdateDesc().UniqueWithoutIndexConstraint if uwi.Validity == descpb.ConstraintValidity_Validating { - if err := validateUniqueWithoutIndexConstraintInTxn( - ctx, planner.ExecCfg().InternalExecutor, tableDesc, planner.txn, planner.User(), c.GetName(), - ); err != nil { + + if err := planner.WithInternalExecutor(ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { + return validateUniqueWithoutIndexConstraintInTxn( + ctx, + tableDesc, + txn, + ie, + planner.User(), + c.GetName(), + ) + }); err != nil { return err } uwi.Validity = descpb.ConstraintValidity_Validated @@ -2480,48 +2511,40 @@ func runSchemaChangesInTxn( func validateCheckInTxn( ctx context.Context, semaCtx *tree.SemaContext, - ief sqlutil.SessionBoundInternalExecutorFactory, sessionData *sessiondata.SessionData, tableDesc *tabledesc.Mutable, txn *kv.Txn, + ie sqlutil.InternalExecutor, checkExpr string, ) error { var syntheticDescs []catalog.Descriptor if tableDesc.Version > tableDesc.ClusterVersion().Version { syntheticDescs = append(syntheticDescs, tableDesc) } - ie := ief(ctx, sessionData) - return ie.WithSyntheticDescriptors(syntheticDescs, func() error { - return validateCheckExpr(ctx, semaCtx, sessionData, checkExpr, tableDesc, ie, txn) - }) + + return ie.WithSyntheticDescriptors( + syntheticDescs, + func() error { + return validateCheckExpr(ctx, semaCtx, txn, sessionData, checkExpr, tableDesc, ie) + }) } -// validateFkInTxn validates foreign key constraints within the provided -// transaction. If the provided table descriptor version is newer than the -// cluster version, it will be used in the InternalExecutor that performs the -// validation query. -// -// TODO (lucy): The special case where the table descriptor version is the same -// as the cluster version only happens because the query in VALIDATE CONSTRAINT -// still runs in the user transaction instead of a step in the schema changer. -// When that's no longer true, this function should be updated. -// -// It operates entirely on the current goroutine and is thus able to -// reuse an existing kv.Txn safely. -func validateFkInTxn( +func getTargetTablesAndFk( ctx context.Context, - ief sqlutil.SessionBoundInternalExecutorFactory, - sd *sessiondata.SessionData, srcTable *tabledesc.Mutable, txn *kv.Txn, descsCol *descs.Collection, fkName string, -) error { +) ( + syntheticDescs []catalog.Descriptor, + fk *descpb.ForeignKeyConstraint, + targetTable catalog.TableDescriptor, + err error, +) { var syntheticTable catalog.TableDescriptor if srcTable.Version > srcTable.ClusterVersion().Version { syntheticTable = srcTable } - var fk *descpb.ForeignKeyConstraint for i := range srcTable.OutboundFKs { def := &srcTable.OutboundFKs[i] if def.Name == fkName { @@ -2530,23 +2553,51 @@ func validateFkInTxn( } } if fk == nil { - return errors.AssertionFailedf("foreign key %s does not exist", fkName) + return nil, nil, nil, errors.AssertionFailedf("foreign key %s does not exist", fkName) } - targetTable, err := descsCol.Direct().MustGetTableDescByID(ctx, txn, fk.ReferencedTableID) + targetTable, err = descsCol.Direct().MustGetTableDescByID(ctx, txn, fk.ReferencedTableID) if err != nil { - return err + return nil, nil, nil, err } - var syntheticDescs []catalog.Descriptor if syntheticTable != nil { syntheticDescs = append(syntheticDescs, syntheticTable) if targetTable.GetID() == syntheticTable.GetID() { targetTable = syntheticTable } } - ie := ief(ctx, sd) - return ie.WithSyntheticDescriptors(syntheticDescs, func() error { - return validateForeignKey(ctx, srcTable, targetTable, fk, ie, txn) - }) + return syntheticDescs, fk, targetTable, nil +} + +// validateFkInTxn validates foreign key constraints within the provided +// transaction. If the provided table descriptor version is newer than the +// cluster version, it will be used in the InternalExecutor that performs the +// validation query. +// +// TODO (lucy): The special case where the table descriptor version is the same +// as the cluster version only happens because the query in VALIDATE CONSTRAINT +// still runs in the user transaction instead of a step in the schema changer. +// When that's no longer true, this function should be updated. +// +// It operates entirely on the current goroutine and is thus able to +// reuse an existing kv.Txn safely. +func validateFkInTxn( + ctx context.Context, + srcTable *tabledesc.Mutable, + txn *kv.Txn, + ie sqlutil.InternalExecutor, + descsCol *descs.Collection, + fkName string, +) error { + syntheticDescs, fk, targetTable, err := getTargetTablesAndFk(ctx, srcTable, txn, descsCol, fkName) + if err != nil { + return err + } + + return ie.WithSyntheticDescriptors( + syntheticDescs, + func() error { + return validateForeignKey(ctx, srcTable, targetTable, fk, ie, txn) + }) } // validateUniqueWithoutIndexConstraintInTxn validates a unique constraint @@ -2563,9 +2614,9 @@ func validateFkInTxn( // reuse an existing kv.Txn safely. func validateUniqueWithoutIndexConstraintInTxn( ctx context.Context, - ie sqlutil.InternalExecutor, tableDesc *tabledesc.Mutable, txn *kv.Txn, + ie sqlutil.InternalExecutor, user username.SQLUsername, constraintName string, ) error { @@ -2573,7 +2624,6 @@ func validateUniqueWithoutIndexConstraintInTxn( if tableDesc.Version > tableDesc.ClusterVersion().Version { syntheticDescs = append(syntheticDescs, tableDesc) } - var uc *descpb.UniqueWithoutIndexConstraint for i := range tableDesc.UniqueWithoutIndexConstraints { def := &tableDesc.UniqueWithoutIndexConstraints[i] @@ -2586,19 +2636,21 @@ func validateUniqueWithoutIndexConstraintInTxn( return errors.AssertionFailedf("unique constraint %s does not exist", constraintName) } - return ie.WithSyntheticDescriptors(syntheticDescs, func() error { - return validateUniqueConstraint( - ctx, - tableDesc, - uc.Name, - uc.ColumnIDs, - uc.Predicate, - ie, - txn, - user, - false, /* preExisting */ - ) - }) + return ie.WithSyntheticDescriptors( + syntheticDescs, + func() error { + return validateUniqueConstraint( + ctx, + tableDesc, + uc.Name, + uc.ColumnIDs, + uc.Predicate, + ie, + txn, + user, + false, /* preExisting */ + ) + }) } // columnBackfillInTxn backfills columns for all mutation columns in diff --git a/pkg/sql/catalog/descs/collection.go b/pkg/sql/catalog/descs/collection.go index 3262c16bd430..12ea3e4e1119 100644 --- a/pkg/sql/catalog/descs/collection.go +++ b/pkg/sql/catalog/descs/collection.go @@ -39,8 +39,8 @@ import ( "github.com/cockroachdb/errors" ) -// makeCollection constructs a Collection. -func makeCollection( +// newCollection constructs a Collection. +func newCollection( ctx context.Context, leaseMgr *lease.Manager, settings *cluster.Settings, @@ -50,8 +50,8 @@ func makeCollection( virtualSchemas catalog.VirtualSchemas, temporarySchemaProvider TemporarySchemaProvider, monitor *mon.BytesMonitor, -) Collection { - return Collection{ +) *Collection { + return &Collection{ settings: settings, version: settings.Version.ActiveVersion(ctx), hydrated: hydrated, @@ -178,11 +178,16 @@ func (tc *Collection) ReleaseLeases(ctx context.Context) { func (tc *Collection) ReleaseAll(ctx context.Context) { tc.ReleaseLeases(ctx) tc.stored.reset(ctx) - tc.synthetic.reset() + tc.ResetSyntheticDescriptors() tc.deletedDescs = catalog.DescriptorIDSet{} tc.skipValidationOnWrite = false } +// ResetSyntheticDescriptors clear all syntheticDescriptors. +func (tc *Collection) ResetSyntheticDescriptors() { + tc.synthetic.reset() +} + // HasUncommittedTables returns true if the Collection contains uncommitted // tables. func (tc *Collection) HasUncommittedTables() bool { diff --git a/pkg/sql/catalog/descs/collection_test.go b/pkg/sql/catalog/descs/collection_test.go index 7024a8fce413..0fd7f38cf800 100644 --- a/pkg/sql/catalog/descs/collection_test.go +++ b/pkg/sql/catalog/descs/collection_test.go @@ -68,7 +68,7 @@ func TestCollectionWriteDescToBatch(t *testing.T) { db := s0.DB() descriptors := s0.ExecutorConfig().(sql.ExecutorConfig).CollectionFactory. - NewCollection(ctx, nil /* TemporarySchemaProvider */) + NewCollection(ctx, nil /* TemporarySchemaProvider */, nil /* Monitor */) // Note this transaction abuses the mechanisms normally required for updating // tables and is just for testing what this test intends to exercise. @@ -640,7 +640,7 @@ func TestCollectionProperlyUsesMemoryMonitoring(t *testing.T) { // Create a `Collection` with monitor hooked up. col := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig).CollectionFactory. - MakeCollection(ctx, nil /* temporarySchemaProvider */, monitor) + NewCollection(ctx, nil /* temporarySchemaProvider */, monitor) require.Equal(t, int64(0), monitor.AllocBytes()) // Read all the descriptors into `col` and assert this read will finish without error. @@ -659,7 +659,7 @@ func TestCollectionProperlyUsesMemoryMonitoring(t *testing.T) { // Repeat the process again and assert this time memory allocation will err out. col = tc.Server(0).ExecutorConfig().(sql.ExecutorConfig).CollectionFactory. - MakeCollection(ctx, nil /* temporarySchemaProvider */, monitor) + NewCollection(ctx, nil /* temporarySchemaProvider */, monitor) _, err2 := col.GetAllDescriptors(ctx, txn) require.Error(t, err2) diff --git a/pkg/sql/catalog/descs/factory.go b/pkg/sql/catalog/descs/factory.go index c77ea48d5275..3436908558f7 100644 --- a/pkg/sql/catalog/descs/factory.go +++ b/pkg/sql/catalog/descs/factory.go @@ -14,11 +14,15 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/hydrateddesc" "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/mon" ) @@ -33,6 +37,19 @@ type CollectionFactory struct { spanConfigSplitter spanconfig.Splitter spanConfigLimiter spanconfig.Limiter defaultMonitor *mon.BytesMonitor + ieFactoryWithTxn InternalExecutorFactoryWithTxn +} + +// InternalExecutorFactoryWithTxn is used to create an internal executor +// with associated extra txn state information. +// It should only be used as a field hanging off CollectionFactory. +type InternalExecutorFactoryWithTxn interface { + NewInternalExecutorWithTxn( + sd *sessiondata.SessionData, + sv *settings.Values, + txn *kv.Txn, + descCol *Collection, + ) (sqlutil.InternalExecutor, sqlutil.InternalExecutorCommitTxnFunc) } // NewCollectionFactory constructs a new CollectionFactory which holds onto @@ -72,24 +89,23 @@ func NewBareBonesCollectionFactory( } } -// MakeCollection constructs a Collection for the purposes of embedding. -func (cf *CollectionFactory) MakeCollection( +// NewCollection constructs a new Collection. +func (cf *CollectionFactory) NewCollection( ctx context.Context, temporarySchemaProvider TemporarySchemaProvider, monitor *mon.BytesMonitor, -) Collection { +) *Collection { if monitor == nil { // If an upstream monitor is not provided, the default, unlimited monitor will be used. // All downstream resource allocation/releases on this default monitor will then be no-ops. monitor = cf.defaultMonitor } - - return makeCollection(ctx, cf.leaseMgr, cf.settings, cf.codec, cf.hydrated, cf.systemDatabase, + return newCollection(ctx, cf.leaseMgr, cf.settings, cf.codec, cf.hydrated, cf.systemDatabase, cf.virtualSchemas, temporarySchemaProvider, monitor) } -// NewCollection constructs a new Collection. -func (cf *CollectionFactory) NewCollection( - ctx context.Context, temporarySchemaProvider TemporarySchemaProvider, -) *Collection { - c := cf.MakeCollection(ctx, temporarySchemaProvider, nil /* monitor */) - return &c +// SetInternalExecutorWithTxn is to set the internal executor factory hanging +// off the collection factory. +func (cf *CollectionFactory) SetInternalExecutorWithTxn( + ieFactoryWithTxn InternalExecutorFactoryWithTxn, +) { + cf.ieFactoryWithTxn = ieFactoryWithTxn } diff --git a/pkg/sql/catalog/descs/txn.go b/pkg/sql/catalog/descs/txn.go index 6226a8c679c1..ee44ec4d3a0b 100644 --- a/pkg/sql/catalog/descs/txn.go +++ b/pkg/sql/catalog/descs/txn.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/retry" @@ -40,6 +41,7 @@ var errTwoVersionInvariantViolated = errors.Errorf("two version invariant violat // // The passed transaction is pre-emptively anchored to the system config key on // the system tenant. +// Deprecated: Use cf.TxnWithExecutor(). func (cf *CollectionFactory) Txn( ctx context.Context, ie sqlutil.InternalExecutor, @@ -75,13 +77,13 @@ func (cf *CollectionFactory) Txn( for { var modifiedDescriptors []lease.IDVersion var deletedDescs catalog.DescriptorIDSet - var descsCol Collection + var descsCol *Collection if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { modifiedDescriptors = nil deletedDescs = catalog.DescriptorIDSet{} - descsCol = cf.MakeCollection(ctx, nil /* temporarySchemaProvider */, nil /* monitor */) + descsCol = cf.NewCollection(ctx, nil /* temporarySchemaProvider */, nil /* monitor */) defer descsCol.ReleaseAll(ctx) - if err := f(ctx, txn, &descsCol); err != nil { + if err := f(ctx, txn, descsCol); err != nil { return err } @@ -91,12 +93,104 @@ func (cf *CollectionFactory) Txn( modifiedDescriptors = descsCol.GetDescriptorsWithNewVersion() if err := CheckSpanCountLimit( - ctx, &descsCol, cf.spanConfigSplitter, cf.spanConfigLimiter, txn, + ctx, descsCol, cf.spanConfigSplitter, cf.spanConfigLimiter, txn, ); err != nil { return err } retryErr, err := CheckTwoVersionInvariant( - ctx, db.Clock(), ie, &descsCol, txn, nil /* onRetryBackoff */) + ctx, db.Clock(), ie, descsCol, txn, nil /* onRetryBackoff */) + if retryErr { + return errTwoVersionInvariantViolated + } + deletedDescs = descsCol.deletedDescs + return err + }); errors.Is(err, errTwoVersionInvariantViolated) { + continue + } else { + if err == nil { + err = waitForDescriptors(modifiedDescriptors, deletedDescs) + } + return err + } + } +} + +// TxnWithExecutor enables callers to run transactions with a *Collection such that all +// retrieved immutable descriptors are properly leased and all mutable +// descriptors are handled. The function deals with verifying the two version +// invariant and retrying when it is violated. Callers need not worry that they +// write mutable descriptors multiple times. The call will explicitly wait for +// the leases to drain on old versions of descriptors modified or deleted in the +// transaction; callers do not need to call lease.WaitForOneVersion. +// It also enables using internal executor to run sql queries in a txn manner. +// +// The passed transaction is pre-emptively anchored to the system config key on +// the system tenant. +func (cf *CollectionFactory) TxnWithExecutor( + ctx context.Context, + db *kv.DB, + sd *sessiondata.SessionData, + f func(ctx context.Context, txn *kv.Txn, descriptors *Collection, ie sqlutil.InternalExecutor) error, +) error { + // Waits for descriptors that were modified, skipping + // over ones that had their descriptor wiped. + waitForDescriptors := func(modifiedDescriptors []lease.IDVersion, deletedDescs catalog.DescriptorIDSet) error { + // Wait for a single version on leased descriptors. + for _, ld := range modifiedDescriptors { + waitForNoVersion := deletedDescs.Contains(ld.ID) + retryOpts := retry.Options{ + InitialBackoff: time.Millisecond, + Multiplier: 1.5, + MaxBackoff: time.Second, + } + // Detect unpublished ones. + if waitForNoVersion { + err := cf.leaseMgr.WaitForNoVersion(ctx, ld.ID, retryOpts) + if err != nil { + return err + } + } else { + _, err := cf.leaseMgr.WaitForOneVersion(ctx, ld.ID, retryOpts) + if err != nil { + return err + } + } + } + return nil + } + for { + var modifiedDescriptors []lease.IDVersion + var deletedDescs catalog.DescriptorIDSet + var descsCol *Collection + if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + modifiedDescriptors = nil + deletedDescs = catalog.DescriptorIDSet{} + descsCol = cf.NewCollection(ctx, nil /* temporarySchemaProvider */, nil /* monitor */) + defer func() { + descsCol.ReleaseAll(ctx) + }() + + ie, commitTxnFn := cf.ieFactoryWithTxn.NewInternalExecutorWithTxn(sd, &cf.settings.SV, txn, descsCol) + if err := f(ctx, txn, descsCol, ie); err != nil { + return err + } + + if err := commitTxnFn(ctx); err != nil { + return err + } + + if err := descsCol.ValidateUncommittedDescriptors(ctx, txn); err != nil { + return err + } + modifiedDescriptors = descsCol.GetDescriptorsWithNewVersion() + + if err := CheckSpanCountLimit( + ctx, descsCol, cf.spanConfigSplitter, cf.spanConfigLimiter, txn, + ); err != nil { + return err + } + retryErr, err := CheckTwoVersionInvariant( + ctx, db.Clock(), ie, descsCol, txn, nil /* onRetryBackoff */) if retryErr { return errTwoVersionInvariantViolated } diff --git a/pkg/sql/check.go b/pkg/sql/check.go index 5819ac19405c..9bbdf49d3f28 100644 --- a/pkg/sql/check.go +++ b/pkg/sql/check.go @@ -43,11 +43,11 @@ import ( func validateCheckExpr( ctx context.Context, semaCtx *tree.SemaContext, + txn *kv.Txn, sessionData *sessiondata.SessionData, exprStr string, tableDesc *tabledesc.Mutable, ie sqlutil.InternalExecutor, - txn *kv.Txn, ) error { expr, err := schemaexpr.FormatExprForDisplay(ctx, tableDesc, exprStr, semaCtx, sessionData, tree.FmtParsable) if err != nil { @@ -57,8 +57,14 @@ func validateCheckExpr( columns := tree.AsStringWithFlags(&colSelectors, tree.FmtSerializable) queryStr := fmt.Sprintf(`SELECT %s FROM [%d AS t] WHERE NOT (%s) LIMIT 1`, columns, tableDesc.GetID(), exprStr) log.Infof(ctx, "validating check constraint %q with query %q", expr, queryStr) - - rows, err := ie.QueryRow(ctx, "validate check constraint", txn, queryStr) + rows, err := ie.QueryRowEx( + ctx, + "validate check constraint", + txn, + sessiondata.InternalExecutorOverride{ + User: username.RootUserName(), + }, + queryStr) if err != nil { return err } @@ -269,7 +275,8 @@ func validateForeignKey( ) values, err := ie.QueryRowEx(ctx, "validate foreign key constraint", - txn, sessiondata.NodeUserSessionDataOverride, query) + txn, + sessiondata.NodeUserSessionDataOverride, query) if err != nil { return err } diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 74ca33d1bcc0..dfa34b64daaf 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -29,7 +29,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/server/telemetry" - "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descidgen" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -719,6 +718,7 @@ func (s *Server) SetupConn( ex := s.newConnExecutor( ctx, sdMutIterator, stmtBuf, clientComm, memMetrics, &s.Metrics, s.sqlStats.GetApplicationStats(sd.ApplicationName), + nil, /* postSetupFn */ ) return ConnectionHandler{ex}, nil } @@ -878,6 +878,10 @@ func (s *Server) newConnExecutor( memMetrics MemoryMetrics, srvMetrics *Metrics, applicationStats sqlstats.ApplicationStats, + // postSetupFn is to override certain field of a conn executor. + // It is set when conn executor is init under an internal executor + // with a not-nil txn. + postSetupFn func(ex *connExecutor), ) *connExecutor { // Create the various monitors. // The session monitors are started in activate(). @@ -981,10 +985,10 @@ func (s *Server) newConnExecutor( portals: make(map[string]PreparedPortal), } ex.extraTxnState.prepStmtsNamespaceMemAcc = ex.sessionMon.MakeBoundAccount() - ex.extraTxnState.descCollection = s.cfg.CollectionFactory.MakeCollection(ctx, descs.NewTemporarySchemaProvider(sdMutIterator.sds), ex.sessionMon) + ex.extraTxnState.descCollection = s.cfg.CollectionFactory.NewCollection(ctx, descs.NewTemporarySchemaProvider(sdMutIterator.sds), ex.sessionMon) ex.extraTxnState.txnRewindPos = -1 ex.extraTxnState.schemaChangeJobRecords = make(map[descpb.ID]*jobs.Record) - ex.extraTxnState.schemaChangerState = SchemaChangerState{ + ex.extraTxnState.schemaChangerState = &SchemaChangerState{ mode: ex.sessionData().NewSchemaChangerMode, } ex.queryCancelKey = pgwirecancel.MakeBackendKeyData(ex.rng, ex.server.cfg.NodeInfo.NodeID.SQLInstanceID()) @@ -997,76 +1001,12 @@ func (s *Server) newConnExecutor( ex.extraTxnState.hasAdminRoleCache = HasAdminRoleCache{} ex.extraTxnState.createdSequences = make(map[descpb.ID]struct{}) - ex.initPlanner(ctx, &ex.planner) - - return ex -} - -// newConnExecutorWithTxn creates a connExecutor that will execute statements -// under a higher-level txn. This connExecutor runs with a different state -// machine, much reduced from the regular one. It cannot initiate or end -// transactions (so, no BEGIN, COMMIT, ROLLBACK, no auto-commit, no automatic -// retries). -// -// If there is no error, this function also activate()s the returned -// executor, so the caller does not need to run the -// activation. However this means that run() or close() must be called -// to release resources. -func (s *Server) newConnExecutorWithTxn( - ctx context.Context, - sdMutIterator *sessionDataMutatorIterator, - stmtBuf *StmtBuf, - clientComm ClientComm, - parentMon *mon.BytesMonitor, - memMetrics MemoryMetrics, - srvMetrics *Metrics, - txn *kv.Txn, - syntheticDescs []catalog.Descriptor, - applicationStats sqlstats.ApplicationStats, -) *connExecutor { - ex := s.newConnExecutor( - ctx, - sdMutIterator, - stmtBuf, - clientComm, - memMetrics, - srvMetrics, - applicationStats, - ) - if txn.Type() == kv.LeafTxn { - // If the txn is a leaf txn it is not allowed to perform mutations. For - // sanity, set read only on the session. - ex.dataMutatorIterator.applyOnEachMutator(func(m sessionDataMutator) { - m.SetReadOnly(true) - }) + if postSetupFn != nil { + postSetupFn(ex) } - // The new transaction stuff below requires active monitors and traces, so - // we need to activate the executor now. - ex.activate(ctx, parentMon, &mon.BoundAccount{}) + ex.initPlanner(ctx, &ex.planner) - // Perform some surgery on the executor - replace its state machine and - // initialize the state. - ex.machine = fsm.MakeMachine( - BoundTxnStateTransitions, - stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.False}, - &ex.state, - ) - ex.state.resetForNewSQLTxn( - ctx, - explicitTxn, - txn.ReadTimestamp().GoTime(), - nil, /* historicalTimestamp */ - roachpb.UnspecifiedUserPriority, - tree.ReadWrite, - txn, - ex.transitionCtx, - ex.QualityOfService()) - - // Modify the Collection to match the parent executor's Collection. - // This allows the InternalExecutor to see schema changes made by the - // parent executor. - ex.extraTxnState.descCollection.SetSyntheticDescriptors(syntheticDescs) return ex } @@ -1288,8 +1228,15 @@ type connExecutor struct { // the field is accessed in connExecutor's serialize function, it should be // added to txnState behind the mutex. extraTxnState struct { + // fromOuterTxn should be set true if the conn executor is run under an + // internal executor with an outer txn, which means when the conn executor + // closes, it should not release the leases of descriptor collections or + // delete schema change job records. Instead, we leave the caller of the + // internal executor to release them. + fromOuterTxn bool + // descCollection collects descriptors used by the current transaction. - descCollection descs.Collection + descCollection *descs.Collection // jobs accumulates jobs staged for execution inside the transaction. // Staging happens when executing statements that are implemented with a @@ -1412,7 +1359,7 @@ type connExecutor struct { // statements. transactionStatementsHash util.FNV64 - schemaChangerState SchemaChangerState + schemaChangerState *SchemaChangerState // shouldCollectTxnExecutionStats specifies whether the statements in // this transaction should collect execution stats. @@ -1699,19 +1646,22 @@ func (ns *prepStmtNamespace) resetTo( // transaction event, resetExtraTxnState invokes corresponding callbacks // (e.g. onTxnFinish() and onTxnRestart()). func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) { - ex.extraTxnState.jobs = nil ex.extraTxnState.firstStmtExecuted = false ex.extraTxnState.hasAdminRoleCache = HasAdminRoleCache{} - ex.extraTxnState.schemaChangerState = SchemaChangerState{ - mode: ex.sessionData().NewSchemaChangerMode, - } - for k := range ex.extraTxnState.schemaChangeJobRecords { - delete(ex.extraTxnState.schemaChangeJobRecords, k) + if ex.extraTxnState.fromOuterTxn { + ex.extraTxnState.descCollection.ResetSyntheticDescriptors() + } else { + ex.extraTxnState.descCollection.ReleaseAll(ctx) + for k := range ex.extraTxnState.schemaChangeJobRecords { + delete(ex.extraTxnState.schemaChangeJobRecords, k) + } + ex.extraTxnState.jobs = nil + ex.extraTxnState.schemaChangerState = &SchemaChangerState{ + mode: ex.sessionData().NewSchemaChangerMode, + } } - ex.extraTxnState.descCollection.ReleaseAll(ctx) - // Close all portals. for name, p := range ex.extraTxnState.prepStmtsNamespace.portals { p.close(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name) @@ -2748,7 +2698,7 @@ func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalCo }, Tracing: &ex.sessionTracing, MemMetrics: &ex.memMetrics, - Descs: &ex.extraTxnState.descCollection, + Descs: ex.extraTxnState.descCollection, TxnModesSetter: ex, Jobs: &ex.extraTxnState.jobs, SchemaChangeJobRecords: ex.extraTxnState.schemaChangeJobRecords, @@ -2787,7 +2737,7 @@ func (ex *connExecutor) resetEvalCtx(evalCtx *extendedEvalContext, txn *kv.Txn, evalCtx.Mon = ex.state.mon evalCtx.PrepareOnly = false evalCtx.SkipNormalize = false - evalCtx.SchemaChangerState = &ex.extraTxnState.schemaChangerState + evalCtx.SchemaChangerState = ex.extraTxnState.schemaChangerState // If we are retrying due to an unsatisfiable timestamp bound which is // retriable, it means we were unable to serve the previous minimum timestamp @@ -3039,7 +2989,7 @@ func (ex *connExecutor) handleWaitingForConcurrentSchemaChanges( ctx context.Context, descID descpb.ID, ) error { if err := ex.planner.waitForDescriptorSchemaChanges( - ctx, descID, ex.extraTxnState.schemaChangerState, + ctx, descID, *ex.extraTxnState.schemaChangerState, ); err != nil { return err } @@ -3275,7 +3225,7 @@ func (ex *connExecutor) notifyStatsRefresherOfNewTables(ctx context.Context) { // runPreCommitStages is part of the new schema changer infrastructure to // mutate descriptors prior to committing a SQL transaction. func (ex *connExecutor) runPreCommitStages(ctx context.Context) error { - scs := &ex.extraTxnState.schemaChangerState + scs := ex.extraTxnState.schemaChangerState if len(scs.state.Targets) == 0 { return nil } @@ -3284,7 +3234,7 @@ func (ex *connExecutor) runPreCommitStages(ctx context.Context) error { ex.planner.User(), ex.server.cfg, ex.planner.txn, - &ex.extraTxnState.descCollection, + ex.extraTxnState.descCollection, ex.planner.EvalContext(), ex.planner.ExtendedEvalContext().Tracing.KVTracingEnabled(), scs.jobID, diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index c279c15deed2..d9f233e2bdb4 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -852,7 +852,7 @@ func (ex *connExecutor) checkDescriptorTwoVersionInvariant(ctx context.Context) if err := descs.CheckSpanCountLimit( ctx, - &ex.extraTxnState.descCollection, + ex.extraTxnState.descCollection, ex.server.cfg.SpanConfigSplitter, ex.server.cfg.SpanConfigLimiter, ex.state.mu.txn, @@ -864,7 +864,7 @@ func (ex *connExecutor) checkDescriptorTwoVersionInvariant(ctx context.Context) ctx, ex.server.cfg.Clock, ex.server.cfg.InternalExecutor, - &ex.extraTxnState.descCollection, + ex.extraTxnState.descCollection, ex.state.mu.txn, inRetryBackoff, ) @@ -978,7 +978,9 @@ func (ex *connExecutor) commitSQLTransactionInternal(ctx context.Context) error // to release the leases for them so that the schema change can proceed and // we don't block the client. if descs := ex.extraTxnState.descCollection.GetDescriptorsWithNewVersion(); descs != nil { - ex.extraTxnState.descCollection.ReleaseLeases(ctx) + if !ex.extraTxnState.fromOuterTxn { + ex.extraTxnState.descCollection.ReleaseLeases(ctx) + } } return nil } diff --git a/pkg/sql/descmetadata/metadata_updater.go b/pkg/sql/descmetadata/metadata_updater.go index 6a842390438d..ae744b64c79b 100644 --- a/pkg/sql/descmetadata/metadata_updater.go +++ b/pkg/sql/descmetadata/metadata_updater.go @@ -39,7 +39,7 @@ import ( type metadataUpdater struct { ctx context.Context txn *kv.Txn - ieFactory sqlutil.SessionBoundInternalExecutorFactory + ieFactory sqlutil.InternalExecutorFactory sessionData *sessiondata.SessionData descriptors *descs.Collection cacheEnabled bool @@ -50,7 +50,7 @@ type metadataUpdater struct { // schema objects. func NewMetadataUpdater( ctx context.Context, - ieFactory sqlutil.SessionBoundInternalExecutorFactory, + ieFactory sqlutil.InternalExecutorFactory, descriptors *descs.Collection, settings *settings.Values, txn *kv.Txn, @@ -76,7 +76,7 @@ func NewMetadataUpdater( func (mu metadataUpdater) UpsertDescriptorComment( id int64, subID int64, commentType keys.CommentType, comment string, ) error { - ie := mu.ieFactory(mu.ctx, mu.sessionData) + ie := mu.ieFactory.NewInternalExecutor(mu.sessionData) _, err := ie.ExecEx(context.Background(), fmt.Sprintf("upsert-%s-comment", commentType), mu.txn, @@ -94,7 +94,7 @@ func (mu metadataUpdater) UpsertDescriptorComment( func (mu metadataUpdater) DeleteDescriptorComment( id int64, subID int64, commentType keys.CommentType, ) error { - ie := mu.ieFactory(mu.ctx, mu.sessionData) + ie := mu.ieFactory.NewInternalExecutor(mu.sessionData) _, err := ie.ExecEx(context.Background(), fmt.Sprintf("delete-%s-comment", commentType), mu.txn, @@ -126,7 +126,7 @@ DELETE FROM system.comments _, _ = fmt.Fprintf(&buf, ", %d", id) } buf.WriteString(")") - ie := mu.ieFactory(mu.ctx, mu.sessionData) + ie := mu.ieFactory.NewInternalExecutor(mu.sessionData) _, err := ie.ExecEx(context.Background(), "delete-all-comments-for-tables", mu.txn, @@ -152,7 +152,7 @@ func (mu metadataUpdater) DeleteConstraintComment( // DeleteDatabaseRoleSettings implement scexec.DescriptorMetaDataUpdater. func (mu metadataUpdater) DeleteDatabaseRoleSettings(ctx context.Context, dbID descpb.ID) error { - ie := mu.ieFactory(mu.ctx, mu.sessionData) + ie := mu.ieFactory.NewInternalExecutor(mu.sessionData) rowsDeleted, err := ie.ExecEx(ctx, "delete-db-role-setting", mu.txn, @@ -193,7 +193,7 @@ func (mu metadataUpdater) DeleteDatabaseRoleSettings(ctx context.Context, dbID d func (mu metadataUpdater) SwapDescriptorSubComment( id int64, oldSubID int64, newSubID int64, commentType keys.CommentType, ) error { - ie := mu.ieFactory(mu.ctx, mu.sessionData) + ie := mu.ieFactory.NewInternalExecutor(mu.sessionData) _, err := ie.ExecEx(context.Background(), fmt.Sprintf("upsert-%s-comment", commentType), mu.txn, @@ -210,7 +210,7 @@ func (mu metadataUpdater) SwapDescriptorSubComment( // DeleteSchedule implement scexec.DescriptorMetadataUpdater. func (mu metadataUpdater) DeleteSchedule(ctx context.Context, scheduleID int64) error { - ie := mu.ieFactory(mu.ctx, mu.sessionData) + ie := mu.ieFactory.NewInternalExecutor(mu.sessionData) _, err := ie.ExecEx( ctx, "delete-schedule", @@ -226,7 +226,7 @@ func (mu metadataUpdater) DeleteSchedule(ctx context.Context, scheduleID int64) func (mu metadataUpdater) DeleteZoneConfig( ctx context.Context, id descpb.ID, ) (numAffected int, err error) { - ie := mu.ieFactory(mu.ctx, mu.sessionData) + ie := mu.ieFactory.NewInternalExecutor(mu.sessionData) return ie.Exec(ctx, "delete-zone", mu.txn, "DELETE FROM system.zones WHERE id = $1", id) } @@ -235,7 +235,7 @@ func (mu metadataUpdater) DeleteZoneConfig( func (mu metadataUpdater) UpsertZoneConfig( ctx context.Context, id descpb.ID, zone *zonepb.ZoneConfig, ) (numAffected int, err error) { - ie := mu.ieFactory(mu.ctx, mu.sessionData) + ie := mu.ieFactory.NewInternalExecutor(mu.sessionData) bytes, err := protoutil.Marshal(zone) if err != nil { return 0, pgerror.Wrap(err, pgcode.CheckViolation, "could not marshal zone config") diff --git a/pkg/sql/distsql/server.go b/pkg/sql/distsql/server.go index 8f810c41d007..00b8906d9117 100644 --- a/pkg/sql/distsql/server.go +++ b/pkg/sql/distsql/server.go @@ -499,7 +499,7 @@ func (ds *ServerImpl) newFlowContext( // If we weren't passed a descs.Collection, then make a new one. We are // responsible for cleaning it up and releasing any accessed descriptors // on flow cleanup. - flowCtx.Descriptors = ds.CollectionFactory.NewCollection(ctx, descs.NewTemporarySchemaProvider(evalCtx.SessionDataStack)) + flowCtx.Descriptors = ds.CollectionFactory.NewCollection(ctx, descs.NewTemporarySchemaProvider(evalCtx.SessionDataStack), nil /* monitor */) flowCtx.IsDescriptorsCleanupRequired = true flowCtx.EvalCatalogBuiltins.Init(evalCtx.Codec, evalCtx.Txn, flowCtx.Descriptors) evalCtx.CatalogBuiltins = &flowCtx.EvalCatalogBuiltins diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 6198882ebbed..b07530a75807 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1306,7 +1306,7 @@ type ExecutorConfig struct { // InternalExecutorFactory is used to create an InternalExecutor binded with // SessionData and other ExtraTxnState. // This is currently only for builtin functions where we need to execute sql. - InternalExecutorFactory sqlutil.SessionBoundInternalExecutorFactory + InternalExecutorFactory sqlutil.InternalExecutorFactory // ConsistencyChecker is to generate the results in calls to // crdb_internal.check_consistency. diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go index 1a48ccb20a02..ef9def87ab9b 100644 --- a/pkg/sql/execinfra/server_config.go +++ b/pkg/sql/execinfra/server_config.go @@ -149,10 +149,10 @@ type ServerConfig struct { // Dialer for communication between SQL nodes/pods. PodNodeDialer *nodedialer.Dialer - // SessionBoundInternalExecutorFactory is used to construct session-bound + // InternalExecutorFactory is used to construct session-bound // executors. The idea is that a higher-layer binds some of the arguments // required, so that users of ServerConfig don't have to care about them. - SessionBoundInternalExecutorFactory sqlutil.SessionBoundInternalExecutorFactory + InternalExecutorFactory sqlutil.InternalExecutorFactory ExternalStorage cloud.ExternalStorageFactory ExternalStorageFromURI cloud.ExternalStorageFromURIFactory diff --git a/pkg/sql/execstats/traceanalyzer_test.go b/pkg/sql/execstats/traceanalyzer_test.go index 7aeb41902e46..169d49a1b60c 100644 --- a/pkg/sql/execstats/traceanalyzer_test.go +++ b/pkg/sql/execstats/traceanalyzer_test.go @@ -108,7 +108,7 @@ func TestTraceAnalyzer(t *testing.T) { for _, vectorizeMode := range []sessiondatapb.VectorizeExecMode{sessiondatapb.VectorizeOff, sessiondatapb.VectorizeOn} { execCtx, finishAndCollect := tracing.ContextWithRecordingSpan(ctx, execCfg.AmbientCtx.Tracer, t.Name()) defer finishAndCollect() - ie := execCfg.InternalExecutorFactory(ctx, &sessiondata.SessionData{ + ie := execCfg.InternalExecutorFactory.NewInternalExecutor(&sessiondata.SessionData{ SessionData: sessiondatapb.SessionData{ VectorizeMode: vectorizeMode, }, diff --git a/pkg/sql/importer/import_job.go b/pkg/sql/importer/import_job.go index ef4ffa4db65d..ac7f96ca6c0e 100644 --- a/pkg/sql/importer/import_job.go +++ b/pkg/sql/importer/import_job.go @@ -1121,7 +1121,7 @@ func (r *importResumer) checkVirtualConstraints( } if err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - ie := execCfg.InternalExecutorFactory(ctx, sql.NewFakeSessionData(execCfg.SV())) + ie := execCfg.InternalExecutorFactory.NewInternalExecutor(sql.NewFakeSessionData(execCfg.SV())) return ie.WithSyntheticDescriptors([]catalog.Descriptor{desc}, func() error { return sql.RevalidateUniqueConstraintsInTable(ctx, txn, user, ie, desc) }) diff --git a/pkg/sql/index_backfiller.go b/pkg/sql/index_backfiller.go index 730d8f487b50..fa76ad9ca084 100644 --- a/pkg/sql/index_backfiller.go +++ b/pkg/sql/index_backfiller.go @@ -166,7 +166,8 @@ func (ib *IndexBackfillPlanner) plan( if err := DescsTxn(ctx, ib.execCfg, func( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ) error { - evalCtx = createSchemaChangeEvalCtx(ctx, ib.execCfg, nowTimestamp, descriptors) + sd := NewFakeSessionData(ib.execCfg.SV()) + evalCtx = createSchemaChangeEvalCtx(ctx, ib.execCfg, sd, nowTimestamp, descriptors) planCtx = ib.execCfg.DistSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn, DistributionTypeSystemTenantOnly) // TODO(ajwerner): Adopt util.ConstantWithMetamorphicTestRange for the diff --git a/pkg/sql/instrumentation.go b/pkg/sql/instrumentation.go index bf98415d2bb8..8c6b332daead 100644 --- a/pkg/sql/instrumentation.go +++ b/pkg/sql/instrumentation.go @@ -344,8 +344,7 @@ func (ih *instrumentationHelper) Finish( var bundle diagnosticsBundle if ih.collectBundle { - ie := p.extendedEvalCtx.ExecCfg.InternalExecutorFactory( - p.EvalContext().Context, + ie := p.extendedEvalCtx.ExecCfg.InternalExecutorFactory.NewInternalExecutor( p.SessionData(), ) phaseTimes := statsCollector.PhaseTimes() diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index 520cec4ea509..9b40e3707565 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -17,11 +17,16 @@ import ( "sync" "time" + "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase" "github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants" @@ -29,9 +34,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" + "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/fsm" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" @@ -72,6 +79,14 @@ type InternalExecutor struct { // // Warning: Not safe for concurrent use from multiple goroutines. syntheticDescriptors []catalog.Descriptor + + // extraTxnState is to store extra transaction state info that + // will be passed to an internal executor. It should only be set when the + // internal executor is used under a not-nil txn. + // TODO (janexing): we will deprecate this field with *connExecutor ASAP. + // An internal executor, if used with a not nil txn, should be always coupled + // with a single connExecutor which runs all passed sql statements. + extraTxnState *extraTxnState } // WithSyntheticDescriptors sets the synthetic descriptors before running the @@ -106,6 +121,52 @@ func MakeInternalExecutor( } } +// newInternalExecutorWithTxn creates an Internal Executor with txn related +// information, and also a function that can be called to commit the txn. +// This function should only be used in the implementation of +// descs.CollectionFactory's InternalExecutorFactoryWithTxn. +// TODO (janexing): This function will be soon refactored after we change +// the internal executor infrastructure with a single conn executor for all +// sql statement executions within a txn. +func newInternalExecutorWithTxn( + s *Server, + sd *sessiondata.SessionData, + txn *kv.Txn, + memMetrics MemoryMetrics, + monitor *mon.BytesMonitor, + descCol *descs.Collection, + schemaChangeJobRecords map[descpb.ID]*jobs.Record, +) (*InternalExecutor, sqlutil.InternalExecutorCommitTxnFunc) { + schemaChangerState := &SchemaChangerState{ + mode: sd.NewSchemaChangerMode, + } + ie := InternalExecutor{ + s: s, + mon: monitor, + memMetrics: memMetrics, + extraTxnState: &extraTxnState{ + txn: txn, + descCollection: descCol, + schemaChangeJobRecords: schemaChangeJobRecords, + schemaChangerState: schemaChangerState, + }, + } + ie.s.populateMinimalSessionData(sd) + ie.sessionDataStack = sessiondata.NewStack(sd) + + commitTxnFunc := func(ctx context.Context) error { + defer func() { + ie.releaseSchemaChangeJobRecords() + }() + if err := ie.commitTxn(ctx); err != nil { + return err + } + return nil + } + + return &ie, commitTxnFunc +} + // MakeInternalExecutorMemMonitor creates and starts memory monitor for an // InternalExecutor. func MakeInternalExecutorMemMonitor( @@ -128,8 +189,41 @@ func MakeInternalExecutorMemMonitor( // // SetSessionData cannot be called concurrently with query execution. func (ie *InternalExecutor) SetSessionData(sessionData *sessiondata.SessionData) { - ie.s.populateMinimalSessionData(sessionData) - ie.sessionDataStack = sessiondata.NewStack(sessionData) + if sessionData != nil { + ie.s.populateMinimalSessionData(sessionData) + ie.sessionDataStack = sessiondata.NewStack(sessionData) + } +} + +func (ie *InternalExecutor) runWithEx( + ctx context.Context, + txn *kv.Txn, + w ieResultWriter, + sd *sessiondata.SessionData, + stmtBuf *StmtBuf, + wg *sync.WaitGroup, + syncCallback func([]resWithPos), + errCallback func(error), +) error { + ex, err := ie.initConnEx(ctx, txn, w, sd, stmtBuf, syncCallback) + if err != nil { + return err + } + wg.Add(1) + go func() { + if err := ex.run(ctx, ie.mon, &mon.BoundAccount{} /*reserved*/, nil /* cancel */); err != nil { + sqltelemetry.RecordError(ctx, err, &ex.server.cfg.Settings.SV) + errCallback(err) + } + w.finish() + closeMode := normalClose + if txn != nil { + closeMode = externalTxnClose + } + ex.close(ctx, closeMode) + wg.Done() + }() + return nil } // initConnEx creates a connExecutor and runs it on a separate goroutine. It @@ -149,10 +243,8 @@ func (ie *InternalExecutor) initConnEx( w ieResultWriter, sd *sessiondata.SessionData, stmtBuf *StmtBuf, - wg *sync.WaitGroup, syncCallback func([]resWithPos), - errCallback func(error), -) { +) (*connExecutor, error) { clientComm := &internalClientComm{ w: w, // init lastDelivered below the position of the first result (0). @@ -180,6 +272,7 @@ func (ie *InternalExecutor) initConnEx( sds := sessiondata.NewStack(sd) sdMutIterator := ie.s.makeSessionDataMutatorIterator(sds, nil /* sessionDefaults */) var ex *connExecutor + var err error if txn == nil { ex = ie.s.newConnExecutor( ctx, @@ -189,38 +282,118 @@ func (ie *InternalExecutor) initConnEx( ie.memMetrics, &ie.s.InternalMetrics, applicationStats, + nil, /* postSetupFn */ ) } else { - ex = ie.s.newConnExecutorWithTxn( + ex, err = ie.newConnExecutorWithTxn( ctx, + txn, sdMutIterator, stmtBuf, clientComm, - ie.mon, - ie.memMetrics, - &ie.s.InternalMetrics, - txn, - ie.syntheticDescriptors, applicationStats, ) + if err != nil { + return nil, err + } } ex.executorType = executorTypeInternal + return ex, nil - wg.Add(1) - go func() { - if err := ex.run(ctx, ie.mon, &mon.BoundAccount{} /*reserved*/, nil /* cancel */); err != nil { - sqltelemetry.RecordError(ctx, err, &ex.server.cfg.Settings.SV) - errCallback(err) - } - w.finish() - closeMode := normalClose - if txn != nil { - closeMode = externalTxnClose +} + +// newConnExecutorWithTxn creates a connExecutor that will execute statements +// under a higher-level txn. This connExecutor runs with a different state +// machine, much reduced from the regular one. It cannot initiate or end +// transactions (so, no BEGIN, COMMIT, ROLLBACK, no auto-commit, no automatic +// retries). It may inherit the descriptor collection and txn state from the +// internal executor. +// +// If there is no error, this function also activate()s the returned +// executor, so the caller does not need to run the +// activation. However this means that run() or close() must be called +// to release resources. +// TODO (janexing): txn should be passed to ie.extraTxnState rather than +// as a parameter to this function. +func (ie *InternalExecutor) newConnExecutorWithTxn( + ctx context.Context, + txn *kv.Txn, + sdMutIterator *sessionDataMutatorIterator, + stmtBuf *StmtBuf, + clientComm ClientComm, + applicationStats sqlstats.ApplicationStats, +) (ex *connExecutor, err error) { + // If an internal executor is run with a not-nil txn, we may want to + // let it inherit the descriptor collection, schema change job records + // and job collections from the caller. + postSetupFn := func(ex *connExecutor) { + if ie.extraTxnState != nil { + if ie.extraTxnState.descCollection != nil { + ex.extraTxnState.descCollection = ie.extraTxnState.descCollection + ex.extraTxnState.fromOuterTxn = true + ex.extraTxnState.schemaChangeJobRecords = ie.extraTxnState.schemaChangeJobRecords + if ie.extraTxnState.jobs != nil { + ex.extraTxnState.jobs = *ie.extraTxnState.jobs + } + ex.extraTxnState.schemaChangerState = ie.extraTxnState.schemaChangerState + } } - ex.close(ctx, closeMode) - wg.Done() - }() + } + + ex = ie.s.newConnExecutor( + ctx, + sdMutIterator, + stmtBuf, + clientComm, + ie.memMetrics, + &ie.s.InternalMetrics, + applicationStats, + postSetupFn, + ) + + if txn.Type() == kv.LeafTxn { + // If the txn is a leaf txn it is not allowed to perform mutations. For + // sanity, set read only on the session. + ex.dataMutatorIterator.applyOnEachMutator(func(m sessionDataMutator) { + m.SetReadOnly(true) + }) + } + + // The new transaction stuff below requires active monitors and traces, so + // we need to activate the executor now. + ex.activate(ctx, ie.mon, &mon.BoundAccount{}) + + // Perform some surgery on the executor - replace its state machine and + // initialize the state, and its jobs and schema change job records if + // they are passed by the caller. + // The txn is always set as explicit, because when running in an outer txn, + // the conn executor inside an internal executor is generally not at liberty + // to commit the transaction. + // Thus, to disallow auto-commit and auto-retries, we make the txn + // here an explicit one. + ex.machine = fsm.MakeMachine( + BoundTxnStateTransitions, + stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.False}, + &ex.state, + ) + + ex.state.resetForNewSQLTxn( + ctx, + explicitTxn, + txn.ReadTimestamp().GoTime(), + nil, /* historicalTimestamp */ + roachpb.UnspecifiedUserPriority, + tree.ReadWrite, + txn, + ex.transitionCtx, + ex.QualityOfService()) + + // Modify the Collection to match the parent executor's Collection. + // This allows the InternalExecutor to see schema changes made by the + // parent executor. + ex.extraTxnState.descCollection.SetSyntheticDescriptors(ie.syntheticDescriptors) + return ex, err } type ieIteratorResult struct { @@ -740,7 +913,10 @@ func (ie *InternalExecutor) execInternal( errCallback := func(err error) { _ = rw.addResult(ctx, ieIteratorResult{err: err}) } - ie.initConnEx(ctx, txn, rw, sd, stmtBuf, &wg, syncCallback, errCallback) + err = ie.runWithEx(ctx, txn, rw, sd, stmtBuf, &wg, syncCallback, errCallback) + if err != nil { + return nil, err + } typeHints := make(tree.PlaceholderTypes, len(datums)) for i, d := range datums { @@ -837,6 +1013,42 @@ func (ie *InternalExecutor) execInternal( return r, nil } +// ReleaseSchemaChangeJobRecords is to release the schema change job records. +func (ie *InternalExecutor) releaseSchemaChangeJobRecords() { + for k := range ie.extraTxnState.schemaChangeJobRecords { + delete(ie.extraTxnState.schemaChangeJobRecords, k) + } +} + +// commitTxn is to commit the txn bound to the internal executor. +// It should only be used in CollectionFactory.TxnWithExecutor(). +func (ie *InternalExecutor) commitTxn(ctx context.Context) error { + if ie.extraTxnState == nil || ie.extraTxnState.txn == nil { + return errors.New("no txn to commit") + } + + var sd *sessiondata.SessionData + if ie.sessionDataStack != nil { + sd = ie.sessionDataStack.Top().Clone() + } else { + sd = ie.s.newSessionData(SessionArgs{}) + } + + rw := newAsyncIEResultChannel() + stmtBuf := NewStmtBuf() + + ex, err := ie.initConnEx(ctx, ie.extraTxnState.txn, rw, sd, stmtBuf, nil /* syncCallback */) + if err != nil { + return errors.Wrap(err, "cannot create conn executor to commit txn") + } + defer ex.close(ctx, externalTxnClose) + + if err := ex.commitSQLTransactionInternal(ctx); err != nil { + return err + } + return nil +} + // internalClientComm is an implementation of ClientComm used by the // InternalExecutor. Result rows are buffered in memory. type internalClientComm struct { @@ -984,3 +1196,93 @@ func (ncl *noopClientLock) RTrim(_ context.Context, pos CmdPos) { } ncl.results = ncl.results[:i] } + +// extraTxnState is to store extra transaction state info that +// will be passed to an internal executor when it's used under a txn context. +// It should not be exported from the sql package. +// TODO (janexing): we will deprecate this struct ASAP. It only exists as a +// stop-gap before we implement InternalExecutor.ConnExecutor to run all +// sql statements under a transaction. This struct is not ideal for an internal +// executor in that it may lead to surprising bugs whereby we forget to add +// fields here and keep them in sync. +type extraTxnState struct { + txn *kv.Txn + descCollection *descs.Collection + jobs *jobsCollection + schemaChangeJobRecords map[descpb.ID]*jobs.Record + schemaChangerState *SchemaChangerState +} + +// InternalExecutorFactory stored information needed to construct a new +// internal executor. +type InternalExecutorFactory struct { + server *Server + memMetrics MemoryMetrics + monitor *mon.BytesMonitor +} + +// NewInternalExecutorFactory returns a new internal executor factory. +func NewInternalExecutorFactory( + s *Server, memMetrics MemoryMetrics, monitor *mon.BytesMonitor, +) *InternalExecutorFactory { + return &InternalExecutorFactory{ + server: s, + memMetrics: memMetrics, + monitor: monitor, + } +} + +var _ sqlutil.InternalExecutorFactory = &InternalExecutorFactory{} +var _ descs.InternalExecutorFactoryWithTxn = &InternalExecutorFactory{} + +// NewInternalExecutor constructs a new internal executor. +// TODO (janexing): this should be deprecated soon. +func (ief *InternalExecutorFactory) NewInternalExecutor( + sd *sessiondata.SessionData, +) sqlutil.InternalExecutor { + ie := MakeInternalExecutor(ief.server, ief.memMetrics, ief.monitor) + ie.SetSessionData(sd) + return &ie +} + +// NewInternalExecutorWithTxn creates an internal executor with txn-related info, +// such as descriptor collection and schema change job records, etc. It should +// be called only after InternalExecutorFactory.NewInternalExecutor is already +// called to construct the InternalExecutorFactory with required server info. +// This function should only be used under CollectionFactory.TxnWithExecutor(). +func (ief *InternalExecutorFactory) NewInternalExecutorWithTxn( + sd *sessiondata.SessionData, sv *settings.Values, txn *kv.Txn, descCol *descs.Collection, +) (sqlutil.InternalExecutor, sqlutil.InternalExecutorCommitTxnFunc) { + schemaChangeJobRecords := make(map[descpb.ID]*jobs.Record) + // By default, if not given session data, we initialize a sessionData that + // would be the same as what would be created if root logged in. + // The sessionData's user can be override when calling the query + // functions of internal executor. + // TODO(janexing): since we can be running queries with a higher privilege + // than the actual user, a security boundary should be added to the error + // handling of internal executor. + if sd == nil { + sd = NewFakeSessionData(sv) + sd.UserProto = username.RootUserName().EncodeProto() + } + ie, commitTxnFunc := newInternalExecutorWithTxn( + ief.server, + sd, + txn, + ief.memMetrics, + ief.monitor, + descCol, + schemaChangeJobRecords, + ) + + return ie, commitTxnFunc +} + +// RunWithoutTxn is to create an internal executor without binding to a txn, +// and run the passed function with this internal executor. +func (ief *InternalExecutorFactory) RunWithoutTxn( + ctx context.Context, run func(ctx context.Context, ie sqlutil.InternalExecutor) error, +) error { + ie := ief.NewInternalExecutor(nil /* sessionData */) + return run(ctx, ie) +} diff --git a/pkg/sql/mvcc_backfiller.go b/pkg/sql/mvcc_backfiller.go index ed4baab5d3dd..547ee800b890 100644 --- a/pkg/sql/mvcc_backfiller.go +++ b/pkg/sql/mvcc_backfiller.go @@ -126,7 +126,8 @@ func (im *IndexBackfillerMergePlanner) plan( if err := DescsTxn(ctx, im.execCfg, func( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ) error { - evalCtx = createSchemaChangeEvalCtx(ctx, im.execCfg, txn.ReadTimestamp(), descriptors) + sd := NewFakeSessionData(im.execCfg.SV()) + evalCtx = createSchemaChangeEvalCtx(ctx, im.execCfg, sd, txn.ReadTimestamp(), descriptors) planCtx = im.execCfg.DistSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn, DistributionTypeSystemTenantOnly) diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go index ee2ddbc867d1..3ff592cd0c39 100644 --- a/pkg/sql/opt_exec_factory.go +++ b/pkg/sql/opt_exec_factory.go @@ -1165,8 +1165,7 @@ func (e *urlOutputter) finish() (url.URL, error) { func (ef *execFactory) showEnv(plan string, envOpts exec.ExplainEnvData) (exec.Node, error) { var out urlOutputter - ie := ef.planner.extendedEvalCtx.ExecCfg.InternalExecutorFactory( - ef.planner.EvalContext().Context, + ie := ef.planner.extendedEvalCtx.ExecCfg.InternalExecutorFactory.NewInternalExecutor( ef.planner.SessionData(), ) c := makeStmtEnvCollector(ef.planner.EvalContext().Context, ie.(*InternalExecutor)) diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index 2e3a92abebee..78c7c3a5f9dc 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" @@ -39,6 +40,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/upgrade" "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" @@ -332,7 +334,7 @@ func newInternalPlanner( sds := sessiondata.NewStack(sd) if params.collection == nil { - params.collection = execCfg.CollectionFactory.NewCollection(ctx, descs.NewTemporarySchemaProvider(sds)) + params.collection = execCfg.CollectionFactory.NewCollection(ctx, descs.NewTemporarySchemaProvider(sds), nil /* monitor */) } var ts time.Time @@ -889,6 +891,26 @@ func validateDescriptor(ctx context.Context, p *planner, descriptor catalog.Desc ) } +// IsActive implements the Planner interface. +func (p *planner) IsActive(ctx context.Context, key clusterversion.Key) bool { + return p.execCfg.Settings.Version.IsActive(ctx, key) +} + +// initInternalExecutor is to initialize an internal executor with a planner. +// Note that this function should only be used when using internal executor +// to run sql statement under the planner context. +func initInternalExecutor(ctx context.Context, p *planner) sqlutil.InternalExecutor { + ie := p.ExecCfg().InternalExecutorFactory.NewInternalExecutor(p.SessionData()) + ie.(*InternalExecutor).extraTxnState = &extraTxnState{ + txn: p.Txn(), + descCollection: p.Descriptors(), + jobs: p.extendedEvalCtx.Jobs, + schemaChangeJobRecords: p.extendedEvalCtx.SchemaChangeJobRecords, + schemaChangerState: p.extendedEvalCtx.SchemaChangerState, + } + return ie +} + // QueryRowEx executes the supplied SQL statement and returns a single row, or // nil if no row is found, or an error if more that one row is returned. // @@ -901,10 +923,23 @@ func (p *planner) QueryRowEx( stmt string, qargs ...interface{}, ) (tree.Datums, error) { - ie := p.ExecCfg().InternalExecutorFactory(ctx, p.SessionData()) + ie := initInternalExecutor(ctx, p) return ie.QueryRowEx(ctx, opName, p.Txn(), override, stmt, qargs...) } +// ExecEx is like Exec, but allows the caller to override some session data +// fields (e.g. the user). +func (p *planner) ExecEx( + ctx context.Context, + opName string, + override sessiondata.InternalExecutorOverride, + stmt string, + qargs ...interface{}, +) (int, error) { + ie := initInternalExecutor(ctx, p) + return ie.ExecEx(ctx, opName, p.Txn(), override, stmt, qargs...) +} + // QueryIteratorEx executes the query, returning an iterator that can be used // to get the results. If the call is successful, the returned iterator // *must* be closed. @@ -918,12 +953,59 @@ func (p *planner) QueryIteratorEx( stmt string, qargs ...interface{}, ) (eval.InternalRows, error) { - ie := p.ExecCfg().InternalExecutorFactory(ctx, p.SessionData()) + ie := initInternalExecutor(ctx, p) rows, err := ie.QueryIteratorEx(ctx, opName, p.Txn(), override, stmt, qargs...) return rows.(eval.InternalRows), err } -// IsActive implements the Planner interface. -func (p *planner) IsActive(ctx context.Context, key clusterversion.Key) bool { - return p.execCfg.Settings.Version.IsActive(ctx, key) +// QueryBufferedEx executes the supplied SQL statement and returns the resulting +// rows (meaning all of them are buffered at once). +// The fields set in session that are set override the respective fields if they +// have previously been set through SetSessionData(). +func (p *planner) QueryBufferedEx( + ctx context.Context, + opName string, + session sessiondata.InternalExecutorOverride, + stmt string, + qargs ...interface{}, +) ([]tree.Datums, error) { + ie := initInternalExecutor(ctx, p) + return ie.QueryBufferedEx(ctx, opName, p.Txn(), session, stmt, qargs...) +} + +// QueryRowExWithCols is like QueryRowEx, additionally returning the computed +// ResultColumns of the input query. +func (p *planner) QueryRowExWithCols( + ctx context.Context, + opName string, + session sessiondata.InternalExecutorOverride, + stmt string, + qargs ...interface{}, +) (tree.Datums, colinfo.ResultColumns, error) { + ie := initInternalExecutor(ctx, p) + return ie.QueryRowExWithCols(ctx, opName, p.Txn(), session, stmt, qargs...) +} + +// QueryBufferedExWithCols is like QueryBufferedEx, additionally returning the +// computed ResultColumns of the input query. +func (p *planner) QueryBufferedExWithCols( + ctx context.Context, + opName string, + session sessiondata.InternalExecutorOverride, + stmt string, + qargs ...interface{}, +) ([]tree.Datums, colinfo.ResultColumns, error) { + ie := initInternalExecutor(ctx, p) + return ie.QueryBufferedExWithCols(ctx, opName, p.Txn(), session, stmt, qargs...) +} + +// WithInternalExecutor let user run multiple sql statements within the same +// internal executor initialized under a planner context. To run single sql +// statements, please use the query functions above. +func (p *planner) WithInternalExecutor( + ctx context.Context, + run func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error, +) error { + ie := initInternalExecutor(ctx, p) + return run(ctx, p.Txn(), ie) } diff --git a/pkg/sql/resolve_oid.go b/pkg/sql/resolve_oid.go index 7aa67eb8f588..da9b820aad49 100644 --- a/pkg/sql/resolve_oid.go +++ b/pkg/sql/resolve_oid.go @@ -29,7 +29,7 @@ import ( func (p *planner) ResolveOIDFromString( ctx context.Context, resultType *types.T, toResolve *tree.DString, ) (_ *tree.DOid, errSafeToIgnore bool, _ error) { - ie := p.ExecCfg().InternalExecutorFactory(ctx, p.SessionData()) + ie := p.ExecCfg().InternalExecutorFactory.NewInternalExecutor(p.SessionData()) return resolveOID( ctx, p.Txn(), ie, @@ -41,7 +41,7 @@ func (p *planner) ResolveOIDFromString( func (p *planner) ResolveOIDFromOID( ctx context.Context, resultType *types.T, toResolve *tree.DOid, ) (_ *tree.DOid, errSafeToIgnore bool, _ error) { - ie := p.ExecCfg().InternalExecutorFactory(ctx, p.SessionData()) + ie := p.ExecCfg().InternalExecutorFactory.NewInternalExecutor(p.SessionData()) return resolveOID( ctx, p.Txn(), ie, diff --git a/pkg/sql/row/row_converter.go b/pkg/sql/row/row_converter.go index 00726598aa90..a435bdcfe94d 100644 --- a/pkg/sql/row/row_converter.go +++ b/pkg/sql/row/row_converter.go @@ -267,7 +267,7 @@ func (c *DatumRowConverter) getSequenceAnnotation( // TODO(postamar): give the eval.Context a useful interface // instead of cobbling a descs.Collection in this way. cf := descs.NewBareBonesCollectionFactory(evalCtx.Settings, evalCtx.Codec) - descsCol := cf.MakeCollection(evalCtx.Context, descs.NewTemporarySchemaProvider(evalCtx.SessionDataStack), nil /* monitor */) + descsCol := cf.NewCollection(evalCtx.Context, descs.NewTemporarySchemaProvider(evalCtx.SessionDataStack), nil /* monitor */) err := c.db.Txn(evalCtx.Context, func(ctx context.Context, txn *kv.Txn) error { seqNameToMetadata = make(map[string]*SequenceMetadata) seqIDToMetadata = make(map[descpb.ID]*SequenceMetadata) diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index 8bf624bd74e5..2c9655301a08 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -115,7 +115,7 @@ type SchemaChanger struct { clock *hlc.Clock settings *cluster.Settings execCfg *ExecutorConfig - ieFactory sqlutil.SessionBoundInternalExecutorFactory + ieFactory sqlutil.InternalExecutorFactory // mvccCompliantAddIndex is set to true early in exec if we // find that the schema change was created under the @@ -145,11 +145,7 @@ func NewSchemaChangerForTesting( execCfg: execCfg, // Note that this doesn't end up actually being session-bound but that's // good enough for testing. - ieFactory: func( - ctx context.Context, sd *sessiondata.SessionData, - ) sqlutil.InternalExecutor { - return execCfg.InternalExecutor - }, + ieFactory: execCfg.InternalExecutorFactory, metrics: NewSchemaChangerMetrics(), clock: db.Clock(), distSQLPlanner: execCfg.DistSQLPlanner, @@ -2451,10 +2447,23 @@ func (sc *SchemaChanger) txn( return err } } - return sc.execCfg.CollectionFactory.Txn(ctx, sc.execCfg.InternalExecutor, sc.db, f) } +// txnWithExecutor is to run internal executor within a txn. +func (sc *SchemaChanger) txnWithExecutor( + ctx context.Context, + sd *sessiondata.SessionData, + f func(context.Context, *kv.Txn, *descs.Collection, sqlutil.InternalExecutor) error, +) error { + if fn := sc.testingKnobs.RunBeforeDescTxn; fn != nil { + if err := fn(sc.job.ID()); err != nil { + return err + } + } + return sc.execCfg.CollectionFactory.TxnWithExecutor(ctx, sc.db, sd, f) +} + // createSchemaChangeEvalCtx creates an extendedEvalContext() to be used for backfills. // // TODO(andrei): This EvalContext() will be broken for backfills trying to use @@ -2463,11 +2472,12 @@ func (sc *SchemaChanger) txn( // used in the surrounding SQL session, so session tracing is unable // to capture schema change activity. func createSchemaChangeEvalCtx( - ctx context.Context, execCfg *ExecutorConfig, ts hlc.Timestamp, descriptors *descs.Collection, + ctx context.Context, + execCfg *ExecutorConfig, + sd *sessiondata.SessionData, + ts hlc.Timestamp, + descriptors *descs.Collection, ) extendedEvalContext { - - sd := NewFakeSessionData(execCfg.SV()) - evalCtx := extendedEvalContext{ // Make a session tracing object on-the-fly. This is OK // because it sets "enabled: false" and thus none of the @@ -2573,10 +2583,8 @@ func (r schemaChangeResumer) Resume(ctx context.Context, execCtx interface{}) er clock: p.ExecCfg().Clock, settings: p.ExecCfg().Settings, execCfg: p.ExecCfg(), - ieFactory: func(ctx context.Context, sd *sessiondata.SessionData) sqlutil.InternalExecutor { - return r.job.MakeSessionBoundInternalExecutor(ctx, sd) - }, - metrics: p.ExecCfg().SchemaChangerMetrics, + ieFactory: r.job.GetInternalExecutorFactory(), + metrics: p.ExecCfg().SchemaChangerMetrics, } opts := retry.Options{ InitialBackoff: 20 * time.Millisecond, @@ -2763,9 +2771,7 @@ func (r schemaChangeResumer) OnFailOrCancel( clock: p.ExecCfg().Clock, settings: p.ExecCfg().Settings, execCfg: p.ExecCfg(), - ieFactory: func(ctx context.Context, sd *sessiondata.SessionData) sqlutil.InternalExecutor { - return r.job.MakeSessionBoundInternalExecutor(ctx, sd) - }, + ieFactory: r.job.GetInternalExecutorFactory(), } if r.job.Payload().FinalResumeError == nil { diff --git a/pkg/sql/schemachanger/scdeps/index_validator.go b/pkg/sql/schemachanger/scdeps/index_validator.go index f211b1f4eef1..aac2e01eff1f 100644 --- a/pkg/sql/schemachanger/scdeps/index_validator.go +++ b/pkg/sql/schemachanger/scdeps/index_validator.go @@ -54,7 +54,7 @@ type indexValidator struct { db *kv.DB codec keys.SQLCodec settings *cluster.Settings - ieFactory sqlutil.SessionBoundInternalExecutorFactory + ieFactory sqlutil.InternalExecutorFactory validateForwardIndexes ValidateForwardIndexesFn validateInvertedIndexes ValidateInvertedIndexesFn newFakeSessionData NewFakeSessionDataFn @@ -103,7 +103,7 @@ func (iv indexValidator) makeHistoricalInternalExecTxnRunner() sqlutil.Historica if err != nil { return err } - return fn(ctx, validationTxn, iv.ieFactory(ctx, iv.newFakeSessionData(&iv.settings.SV))) + return fn(ctx, validationTxn, iv.ieFactory.NewInternalExecutor(iv.newFakeSessionData(&iv.settings.SV))) } } @@ -113,7 +113,7 @@ func NewIndexValidator( db *kv.DB, codec keys.SQLCodec, settings *cluster.Settings, - ieFactory sqlutil.SessionBoundInternalExecutorFactory, + ieFactory sqlutil.InternalExecutorFactory, validateForwardIndexes ValidateForwardIndexesFn, validateInvertedIndexes ValidateInvertedIndexesFn, newFakeSessionData NewFakeSessionDataFn, diff --git a/pkg/sql/sql_cursor.go b/pkg/sql/sql_cursor.go index 7a6013294883..26b9609e1646 100644 --- a/pkg/sql/sql_cursor.go +++ b/pkg/sql/sql_cursor.go @@ -48,7 +48,7 @@ func (p *planner) DeclareCursor(ctx context.Context, s *tree.DeclareCursor) (pla return nil, pgerror.Newf(pgcode.NoActiveSQLTransaction, "DECLARE CURSOR can only be used in transaction blocks") } - ie := p.ExecCfg().InternalExecutorFactory(ctx, p.SessionData()) + ie := p.ExecCfg().InternalExecutorFactory.NewInternalExecutor(p.SessionData()) cursorName := s.Name.String() if cursor := p.sqlCursors.getCursor(cursorName); cursor != nil { return nil, pgerror.Newf(pgcode.DuplicateCursor, "cursor %q already exists", cursorName) diff --git a/pkg/sql/sqlutil/internal_executor.go b/pkg/sql/sqlutil/internal_executor.go index dcf41cd57d35..c94f183b2b6c 100644 --- a/pkg/sql/sqlutil/internal_executor.go +++ b/pkg/sql/sqlutil/internal_executor.go @@ -144,6 +144,17 @@ type InternalExecutor interface { qargs ...interface{}, ) (InternalRows, error) + // QueryBufferedExWithCols is like QueryBufferedEx, additionally returning the computed + // ResultColumns of the input query. + QueryBufferedExWithCols( + ctx context.Context, + opName string, + txn *kv.Txn, + session sessiondata.InternalExecutorOverride, + stmt string, + qargs ...interface{}, + ) ([]tree.Datums, colinfo.ResultColumns, error) + // WithSyntheticDescriptors sets the synthetic descriptors before running the // the provided closure and resets them afterward. Used for queries/statements // that need to use in-memory synthetic descriptors different from descriptors @@ -196,11 +207,17 @@ type InternalRows interface { Types() colinfo.ResultColumns } -// SessionBoundInternalExecutorFactory is a function that produces a "session -// bound" internal executor. -type SessionBoundInternalExecutorFactory func( - context.Context, *sessiondata.SessionData, -) InternalExecutor +// InternalExecutorFactory is an interface that allow the creation of an +// internal executor, and run sql statement without a txn with the internal +// executor. +type InternalExecutorFactory interface { + // NewInternalExecutor constructs a new internal executor. + // TODO (janexing): this should be deprecated soon. + NewInternalExecutor(sd *sessiondata.SessionData) InternalExecutor + // RunWithoutTxn is to create an internal executor without binding to a txn, + // and run the passed function with this internal executor. + RunWithoutTxn(ctx context.Context, run func(ctx context.Context, ie InternalExecutor) error) error +} // InternalExecFn is the type of functions that operates using an internalExecutor. type InternalExecFn func(ctx context.Context, txn *kv.Txn, ie InternalExecutor) error @@ -209,3 +226,7 @@ type InternalExecFn func(ctx context.Context, txn *kv.Txn, ie InternalExecutor) // passes the fn the exported InternalExecutor instead of the whole unexported // extendedEvalContenxt, so it can be implemented outside pkg/sql. type HistoricalInternalExecTxnRunner func(ctx context.Context, fn InternalExecFn) error + +// InternalExecutorCommitTxnFunc is to commit the txn associated with an +// internal executor. +type InternalExecutorCommitTxnFunc func(ctx context.Context) error diff --git a/pkg/sql/temporary_schema.go b/pkg/sql/temporary_schema.go index 43ec8e920e58..9d8bb5631202 100644 --- a/pkg/sql/temporary_schema.go +++ b/pkg/sql/temporary_schema.go @@ -404,7 +404,7 @@ type TemporaryObjectCleaner struct { settings *cluster.Settings db *kv.DB codec keys.SQLCodec - makeSessionBoundInternalExecutor sqlutil.SessionBoundInternalExecutorFactory + makeSessionBoundInternalExecutor sqlutil.InternalExecutorFactory // statusServer gives access to the SQLStatus service. statusServer serverpb.SQLStatusServer isMeta1LeaseholderFunc isMeta1LeaseholderFunc @@ -433,7 +433,7 @@ func NewTemporaryObjectCleaner( db *kv.DB, codec keys.SQLCodec, registry *metric.Registry, - makeSessionBoundInternalExecutor sqlutil.SessionBoundInternalExecutorFactory, + makeSessionBoundInternalExecutor sqlutil.InternalExecutorFactory, statusServer serverpb.SQLStatusServer, isMeta1LeaseholderFunc isMeta1LeaseholderFunc, testingKnobs ExecutorTestingKnobs, @@ -520,7 +520,7 @@ func (c *TemporaryObjectCleaner) doTemporaryObjectCleanup( waitTimeForCreation := TempObjectWaitInterval.Get(&c.settings.SV) // Build a set of all databases with temporary objects. var allDbDescs []catalog.DatabaseDescriptor - descsCol := c.collectionFactory.NewCollection(ctx, nil /* TemporarySchemaProvider */) + descsCol := c.collectionFactory.NewCollection(ctx, nil /* TemporarySchemaProvider */, nil /* monitor */) if err := retryFunc(ctx, func() error { var err error allDbDescs, err = descsCol.GetAllDatabaseDescriptors(ctx, txn) @@ -587,7 +587,7 @@ func (c *TemporaryObjectCleaner) doTemporaryObjectCleanup( } // Clean up temporary data for inactive sessions. - ie := c.makeSessionBoundInternalExecutor(ctx, &sessiondata.SessionData{}) + ie := c.makeSessionBoundInternalExecutor.NewInternalExecutor(&sessiondata.SessionData{}) for sessionID := range sessionIDs { if _, ok := activeSessions[sessionID.Uint128]; !ok { log.Eventf(ctx, "cleaning up temporary object for session %q", sessionID) diff --git a/pkg/sql/unsplit.go b/pkg/sql/unsplit.go index c7a579b303a3..dfa7f5f3443f 100644 --- a/pkg/sql/unsplit.go +++ b/pkg/sql/unsplit.go @@ -105,7 +105,7 @@ func (n *unsplitAllNode) startExec(params runParams) error { if n.index.GetID() != n.tableDesc.GetPrimaryIndexID() { indexName = n.index.GetName() } - ie := params.p.ExecCfg().InternalExecutorFactory(params.ctx, params.SessionData()) + ie := params.p.ExecCfg().InternalExecutorFactory.NewInternalExecutor(params.SessionData()) it, err := ie.QueryIteratorEx( params.ctx, "split points query", params.p.txn, sessiondata.NoSessionDataOverride, statement,