diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go index 0cfcc2474847..17df98608f20 100644 --- a/pkg/ccl/backupccl/backup_job.go +++ b/pkg/ccl/backupccl/backup_job.go @@ -36,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -686,7 +687,8 @@ func (b *backupResumer) maybeNotifyScheduledJobCompletion( err := exec.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { // We cannot rely on b.job containing created_by_id because on job // resumption the registry does not populate the resumer's CreatedByInfo. - datums, err := exec.InternalExecutor.QueryRowEx( + ie := exec.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + datums, err := ie.QueryRowEx( ctx, "lookup-schedule-info", txn, @@ -706,7 +708,7 @@ func (b *backupResumer) maybeNotifyScheduledJobCompletion( scheduleID := int64(tree.MustBeDInt(datums[0])) if err := jobs.NotifyJobTermination( - ctx, env, b.job.ID(), jobStatus, b.job.Details(), scheduleID, exec.InternalExecutor, txn); err != nil { + ctx, env, b.job.ID(), jobStatus, b.job.Details(), scheduleID, ie, txn); err != nil { return errors.Wrapf(err, "failed to notify schedule %d of completion of job %d", scheduleID, b.job.ID()) } diff --git a/pkg/ccl/backupccl/backup_planning.go b/pkg/ccl/backupccl/backup_planning.go index 738afd911bbc..07b8262d4ab4 100644 --- a/pkg/ccl/backupccl/backup_planning.go +++ b/pkg/ccl/backupccl/backup_planning.go @@ -44,6 +44,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/privilege" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/interval" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -820,13 +821,14 @@ func backupPlanHook( var spans []roachpb.Span var tenants []descpb.TenantInfoWithUsage + ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) if backupStmt.Targets != nil && backupStmt.Targets.Tenant != (roachpb.TenantID{}) { if !p.ExecCfg().Codec.ForSystemTenant() { return pgerror.Newf(pgcode.InsufficientPrivilege, "only the system tenant can backup other tenants") } tenantInfo, err := retrieveSingleTenantMetadata( - ctx, p.ExecCfg().InternalExecutor, p.ExtendedEvalContext().Txn, backupStmt.Targets.Tenant, + ctx, ie, p.ExtendedEvalContext().Txn, backupStmt.Targets.Tenant, ) if err != nil { return err @@ -842,7 +844,7 @@ func backupPlanHook( if p.ExecCfg().Codec.ForSystemTenant() && backupStmt.Coverage() == tree.AllDescriptors { // Include all tenants. tenants, err = retrieveAllTenantsMetadata( - ctx, p.ExecCfg().InternalExecutor, p.ExtendedEvalContext().Txn, + ctx, ie, p.ExtendedEvalContext().Txn, ) if err != nil { return err @@ -1185,7 +1187,7 @@ func getScheduledBackupExecutionArgsFromSchedule( ctx context.Context, env scheduledjobs.JobSchedulerEnv, txn *kv.Txn, - ie *sql.InternalExecutor, + ie sqlutil.InternalExecutor, scheduleID int64, ) (*jobs.ScheduledJob, *ScheduledBackupExecutionArgs, error) { // Load the schedule that has spawned this job. @@ -1224,8 +1226,9 @@ func planSchedulePTSChaining( return nil } + ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) _, args, err := getScheduledBackupExecutionArgsFromSchedule(ctx, env, - p.ExtendedEvalContext().Txn, p.ExecCfg().InternalExecutor, backupStmt.CreatedByInfo.ID) + p.ExtendedEvalContext().Txn, ie.(*sql.InternalExecutor), backupStmt.CreatedByInfo.ID) if err != nil { return err } @@ -1244,7 +1247,7 @@ func planSchedulePTSChaining( } _, incArgs, err := getScheduledBackupExecutionArgsFromSchedule(ctx, env, - p.ExtendedEvalContext().Txn, p.ExecCfg().InternalExecutor, args.DependentScheduleID) + p.ExtendedEvalContext().Txn, ie.(*sql.InternalExecutor), args.DependentScheduleID) if err != nil { // If we are unable to resolve the dependent incremental schedule (it // could have been dropped) we do not need to perform any chaining. diff --git a/pkg/ccl/backupccl/backup_planning_tenant.go b/pkg/ccl/backupccl/backup_planning_tenant.go index 1d88c577d0bb..a079b165646d 100644 --- a/pkg/ccl/backupccl/backup_planning_tenant.go +++ b/pkg/ccl/backupccl/backup_planning_tenant.go @@ -13,9 +13,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/errors" ) @@ -76,7 +76,7 @@ func tenantMetadataFromRow(row tree.Datums) (descpb.TenantInfoWithUsage, error) } func retrieveSingleTenantMetadata( - ctx context.Context, ie *sql.InternalExecutor, txn *kv.Txn, tenantID roachpb.TenantID, + ctx context.Context, ie sqlutil.InternalExecutor, txn *kv.Txn, tenantID roachpb.TenantID, ) (descpb.TenantInfoWithUsage, error) { row, err := ie.QueryRow( ctx, "backup-lookup-tenant", txn, @@ -96,7 +96,7 @@ func retrieveSingleTenantMetadata( } func retrieveAllTenantsMetadata( - ctx context.Context, ie *sql.InternalExecutor, txn *kv.Txn, + ctx context.Context, ie sqlutil.InternalExecutor, txn *kv.Txn, ) ([]descpb.TenantInfoWithUsage, error) { rows, err := ie.QueryBuffered( ctx, "backup-lookup-tenants", txn, diff --git a/pkg/ccl/backupccl/create_scheduled_backup.go b/pkg/ccl/backupccl/create_scheduled_backup.go index 0cf8d8f0200c..b4cccf70554a 100644 --- a/pkg/ccl/backupccl/create_scheduled_backup.go +++ b/pkg/ccl/backupccl/create_scheduled_backup.go @@ -35,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/protoreflect" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/errors" "github.com/gogo/protobuf/jsonpb" @@ -430,7 +431,7 @@ func doCreateBackupSchedules( return err } - ex := p.ExecCfg().InternalExecutor + ex := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) unpauseOnSuccessID := jobs.InvalidScheduleID @@ -509,7 +510,7 @@ func doCreateBackupSchedules( func setDependentSchedule( ctx context.Context, - ex *sql.InternalExecutor, + ex sqlutil.InternalExecutor, scheduleExecutionArgs *ScheduledBackupExecutionArgs, schedule *jobs.ScheduledJob, dependentID int64, @@ -659,8 +660,8 @@ func emitSchedule( func checkScheduleAlreadyExists( ctx context.Context, p sql.PlanHookState, scheduleLabel string, ) (bool, error) { - - row, err := p.ExecCfg().InternalExecutor.QueryRowEx(ctx, "check-sched", + ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + row, err := ie.QueryRowEx(ctx, "check-sched", p.ExtendedEvalContext().Txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()}, fmt.Sprintf("SELECT count(schedule_name) FROM %s WHERE schedule_name = '%s'", scheduledjobs.ProdJobSchedulerEnv.ScheduledJobsTableName(), scheduleLabel)) diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 8cfee1c91436..82ebab505666 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -1803,7 +1803,9 @@ func revalidateIndexes( // since our table is offline. var runner sqlutil.HistoricalInternalExecTxnRunner = func(ctx context.Context, fn sqlutil.InternalExecFn) error { return execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - ie := job.MakeSessionBoundInternalExecutor(ctx, sql.NewFakeSessionData(execCfg.SV())).(*sql.InternalExecutor) + ie := job.MakeSessionBoundInternalExecutor(ctx, func(ie sqlutil.InternalExecutor) { + ie.SetSessionData(sql.NewFakeSessionData(execCfg.SV())) + }) return fn(ctx, txn, ie) }) } @@ -1895,7 +1897,8 @@ func insertStats( } err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - if err := stats.InsertNewStats(ctx, execCfg.InternalExecutor, txn, latestStats); err != nil { + if err := stats.InsertNewStats(ctx, execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}), + txn, latestStats); err != nil { return errors.Wrapf(err, "inserting stats from backup") } details.StatsInserted = true @@ -2119,7 +2122,7 @@ func (r *restoreResumer) OnFailOrCancel(ctx context.Context, execCtx interface{} if details.DescriptorCoverage == tree.AllDescriptors { // We've dropped defaultdb and postgres in the planning phase, we must // recreate them now if the full cluster restore failed. - ie := p.ExecCfg().InternalExecutor + ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) _, err := ie.Exec(ctx, "recreate-defaultdb", txn, "CREATE DATABASE IF NOT EXISTS defaultdb") if err != nil { return err @@ -2665,7 +2668,7 @@ func (r *restoreResumer) restoreSystemTables( } func (r *restoreResumer) cleanupTempSystemTables(ctx context.Context, txn *kv.Txn) error { - executor := r.execCfg.InternalExecutor + executor := r.execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) // Check if the temp system database has already been dropped. This can happen // if the restore job fails after the system database has cleaned up. checkIfDatabaseExists := "SELECT database_name FROM [SHOW DATABASES] WHERE database_name=$1" diff --git a/pkg/ccl/backupccl/restore_planning.go b/pkg/ccl/backupccl/restore_planning.go index 6e888bd3b779..24f575ef2fbd 100644 --- a/pkg/ccl/backupccl/restore_planning.go +++ b/pkg/ccl/backupccl/restore_planning.go @@ -53,6 +53,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/roleoption" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -891,7 +892,7 @@ func allocateDescriptorRewrites( // restore. func dropDefaultUserDBs(ctx context.Context, execCfg *sql.ExecutorConfig) error { return sql.DescsTxn(ctx, execCfg, func(ctx context.Context, txn *kv.Txn, col *descs.Collection) error { - ie := execCfg.InternalExecutor + ie := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) _, err := ie.Exec(ctx, "drop-defaultdb", nil, "DROP DATABASE IF EXISTS defaultdb") if err != nil { return err @@ -1894,8 +1895,9 @@ func doRestorePlan( if !p.ExecCfg().Codec.ForSystemTenant() { return pgerror.Newf(pgcode.InsufficientPrivilege, "only the system tenant can restore other tenants") } + ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) for _, i := range tenants { - res, err := p.ExecCfg().InternalExecutor.QueryRow( + res, err := ie.QueryRow( ctx, "restore-lookup-tenant", p.ExtendedEvalContext().Txn, `SELECT active FROM system.tenants WHERE id = $1`, i.ID, ) diff --git a/pkg/ccl/backupccl/schedule_pts_chaining.go b/pkg/ccl/backupccl/schedule_pts_chaining.go index 9008c99d8095..245c2f32c742 100644 --- a/pkg/ccl/backupccl/schedule_pts_chaining.go +++ b/pkg/ccl/backupccl/schedule_pts_chaining.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/uuid" @@ -56,9 +57,10 @@ func maybeUpdateSchedulePTSRecord( } return exec.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + ie := exec.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) // We cannot rely on b.job containing created_by_id because on job // resumption the registry does not populate the resumers' CreatedByInfo. - datums, err := exec.InternalExecutor.QueryRowEx( + datums, err := ie.QueryRowEx( ctx, "lookup-schedule-info", txn, @@ -77,8 +79,7 @@ func maybeUpdateSchedulePTSRecord( } scheduleID := int64(tree.MustBeDInt(datums[0])) - _, args, err := getScheduledBackupExecutionArgsFromSchedule(ctx, env, txn, - exec.InternalExecutor, scheduleID) + _, args, err := getScheduledBackupExecutionArgsFromSchedule(ctx, env, txn, ie, scheduleID) if err != nil { return errors.Wrap(err, "load scheduled job") } @@ -132,10 +133,11 @@ func manageFullBackupPTSChaining( exec *sql.ExecutorConfig, args *ScheduledBackupExecutionArgs, ) error { + ie := exec.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) // Let's resolve the dependent incremental schedule as the first step. If the // schedule has been dropped then we can avoid doing unnecessary work. incSj, incArgs, err := getScheduledBackupExecutionArgsFromSchedule(ctx, env, txn, - exec.InternalExecutor, args.DependentScheduleID) + ie, args.DependentScheduleID) if err != nil { if jobs.HasScheduledJobNotFoundError(err) { log.Warningf(ctx, "could not find dependent schedule with id %d", @@ -180,7 +182,7 @@ func manageFullBackupPTSChaining( return err } incSj.SetExecutionDetails(incSj.ExecutorType(), jobspb.ExecutionArguments{Args: any}) - return incSj.Update(ctx, exec.InternalExecutor, txn) + return incSj.Update(ctx, ie, txn) } // manageIncrementalBackupPTSChaining is invoked on successful completion of an diff --git a/pkg/ccl/backupccl/system_schema.go b/pkg/ccl/backupccl/system_schema.go index 1501f0e78e03..dad9a9468f9d 100644 --- a/pkg/ccl/backupccl/system_schema.go +++ b/pkg/ccl/backupccl/system_schema.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" ) @@ -89,7 +90,7 @@ func defaultSystemTableRestoreFunc( txn *kv.Txn, systemTableName, tempTableName string, ) error { - executor := execCfg.InternalExecutor + executor := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) deleteQuery := fmt.Sprintf("DELETE FROM system.%s WHERE true", systemTableName) opName := systemTableName + "-data-deletion" @@ -117,7 +118,7 @@ func defaultSystemTableRestoreFunc( func jobsMigrationFunc( ctx context.Context, execCfg *sql.ExecutorConfig, txn *kv.Txn, tempTableName string, ) (err error) { - executor := execCfg.InternalExecutor + executor := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) const statesToRevert = `('` + string(jobs.StatusRunning) + `', ` + `'` + string(jobs.StatusPauseRequested) + `', ` + @@ -191,7 +192,7 @@ func jobsRestoreFunc( txn *kv.Txn, systemTableName, tempTableName string, ) error { - executor := execCfg.InternalExecutor + executor := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) // When restoring jobs, don't clear the existing table. @@ -212,7 +213,7 @@ func settingsRestoreFunc( txn *kv.Txn, systemTableName, tempTableName string, ) error { - executor := execCfg.InternalExecutor + executor := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) deleteQuery := fmt.Sprintf("DELETE FROM system.%s WHERE name <> 'version'", systemTableName) opName := systemTableName + "-data-deletion" diff --git a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go index 041f94475016..e810ca38190e 100644 --- a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go +++ b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go @@ -83,13 +83,15 @@ func New( metrics *Metrics, ) SchemaFeed { m := &schemaFeed{ - filter: schemaChangeEventFilters[events], - db: cfg.DB, - clock: cfg.DB.Clock(), - settings: cfg.Settings, - targets: targets, - leaseMgr: cfg.LeaseManager.(*lease.Manager), - ie: cfg.SessionBoundInternalExecutorFactory(ctx, &sessiondata.SessionData{}), + filter: schemaChangeEventFilters[events], + db: cfg.DB, + clock: cfg.DB.Clock(), + settings: cfg.Settings, + targets: targets, + leaseMgr: cfg.LeaseManager.(*lease.Manager), + ie: cfg.SessionBoundInternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) { + ie.SetSessionData(&sessiondata.SessionData{}) + }), collectionFactory: cfg.CollectionFactory, metrics: metrics, } diff --git a/pkg/ccl/importccl/BUILD.bazel b/pkg/ccl/importccl/BUILD.bazel index 57e5da4a1bf2..837b29a3a0ea 100644 --- a/pkg/ccl/importccl/BUILD.bazel +++ b/pkg/ccl/importccl/BUILD.bazel @@ -73,6 +73,7 @@ go_library( "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", "//pkg/sql/sqltelemetry", + "//pkg/sql/sqlutil", "//pkg/sql/stats", "//pkg/sql/types", "//pkg/util", diff --git a/pkg/ccl/importccl/import_job.go b/pkg/ccl/importccl/import_job.go index 4b4b88af13ae..fdca0bace4e9 100644 --- a/pkg/ccl/importccl/import_job.go +++ b/pkg/ccl/importccl/import_job.go @@ -41,6 +41,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/opt/memo" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -165,8 +166,8 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error { // is in keeping with the semantics we use when creating a schema during // sql execution. Namely, queue job in the txn which creates the schema // desc and run once the txn has committed. - if err := p.ExecCfg().JobRegistry.Run(ctx, p.ExecCfg().InternalExecutor, - schemaMetadata.queuedSchemaJobs); err != nil { + ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + if err := p.ExecCfg().JobRegistry.Run(ctx, ie, schemaMetadata.queuedSchemaJobs); err != nil { return err } @@ -954,6 +955,7 @@ func (r *importResumer) publishTables( func (r *importResumer) writeStubStatisticsForImportedTables( ctx context.Context, execCfg *sql.ExecutorConfig, res roachpb.BulkOpSummary, ) { + ie := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) details := r.job.Details().(jobspb.ImportDetails) for _, tbl := range details.Tables { if tbl.IsNew { @@ -977,7 +979,7 @@ func (r *importResumer) writeStubStatisticsForImportedTables( statistic.AvgSize = avgRowSize } // TODO(michae2): parallelize insertion of statistics. - err = stats.InsertNewStats(ctx, execCfg.InternalExecutor, nil /* txn */, statistics) + err = stats.InsertNewStats(ctx, ie, nil /* txn */, statistics) } if err != nil { // Failure to create statistics should not fail the entire import. @@ -1317,7 +1319,8 @@ func (r *importResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) // This would be a job to drop all the schemas, and a job to update the parent // database descriptor. if len(jobsToRunAfterTxnCommit) != 0 { - if err := p.ExecCfg().JobRegistry.Run(ctx, p.ExecCfg().InternalExecutor, + ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + if err := p.ExecCfg().JobRegistry.Run(ctx, ie, jobsToRunAfterTxnCommit); err != nil { return errors.Wrap(err, "failed to run jobs that drop the imported schemas") } diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index dac6e12927ea..8e025a52aa22 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -723,9 +723,9 @@ func (j *Job) FractionCompleted() float32 { // sessionBoundInternalExecutorFactory for a more detailed explanation of why // this exists. func (j *Job) MakeSessionBoundInternalExecutor( - ctx context.Context, sd *sessiondata.SessionData, + ctx context.Context, initInternalExecutor func(sqlutil.InternalExecutor), ) sqlutil.InternalExecutor { - return j.registry.sessionBoundInternalExecutorFactory(ctx, sd) + return j.registry.sessionBoundInternalExecutorFactory(ctx, initInternalExecutor) } func (j *Job) runInTxn( diff --git a/pkg/migration/migrationjob/migration_job.go b/pkg/migration/migrationjob/migration_job.go index 7e2c468f64bd..df6e65626816 100644 --- a/pkg/migration/migrationjob/migration_job.go +++ b/pkg/migration/migrationjob/migration_job.go @@ -61,7 +61,7 @@ func (r resumer) Resume(ctx context.Context, execCtxI interface{}) error { execCtx := execCtxI.(sql.JobExecContext) pl := r.j.Payload() cv := *pl.GetMigration().ClusterVersion - ie := execCtx.ExecCfg().InternalExecutor + ie := execCtx.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) alreadyCompleted, err := CheckIfMigrationCompleted(ctx, nil /* txn */, ie, cv) if alreadyCompleted || err != nil { @@ -88,7 +88,7 @@ func (r resumer) Resume(ctx context.Context, execCtxI interface{}) error { Settings: execCtx.ExecCfg().Settings, CollectionFactory: execCtx.ExecCfg().CollectionFactory, LeaseManager: execCtx.ExecCfg().LeaseManager, - InternalExecutor: execCtx.ExecCfg().InternalExecutor, + InternalExecutor: ie, TestingKnobs: execCtx.ExecCfg().MigrationTestingKnobs, }, r.j) default: diff --git a/pkg/server/authentication.go b/pkg/server/authentication.go index ef0743dd650c..bdba605cac4b 100644 --- a/pkg/server/authentication.go +++ b/pkg/server/authentication.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/ui" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -254,10 +255,11 @@ func (s *authenticationServer) UserLoginFromSSO( // without further normalization. username, _ := security.MakeSQLUsernameFromUserInput(reqUsername, security.UsernameValidation) + ie := s.server.sqlServer.execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) exists, canLogin, _, _, _, _, err := sql.GetUserSessionInitInfo( ctx, s.server.sqlServer.execCfg, - s.server.sqlServer.execCfg.InternalExecutor, + ie.(*sql.InternalExecutor), username, "", /* databaseName */ ) @@ -417,10 +419,11 @@ WHERE id = $1` func (s *authenticationServer) verifyPassword( ctx context.Context, username security.SQLUsername, password string, ) (valid bool, expired bool, err error) { + ie := s.server.sqlServer.execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) exists, canLogin, _, validUntil, _, pwRetrieveFn, err := sql.GetUserSessionInitInfo( ctx, s.server.sqlServer.execCfg, - s.server.sqlServer.execCfg.InternalExecutor, + ie.(*sql.InternalExecutor), username, "", /* databaseName */ ) diff --git a/pkg/server/node.go b/pkg/server/node.go index a63c735a31e1..6ab8c836ab52 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -39,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/admission" @@ -177,7 +178,7 @@ type Node struct { clusterID *base.ClusterIDContainer // UUID for Cockroach cluster Descriptor roachpb.NodeDescriptor // Node ID, network/physical topology storeCfg kvserver.StoreConfig // Config to use and pass to stores - sqlExec *sql.InternalExecutor // For event logging + sqlExec sqlutil.InternalExecutor // For event logging stores *kvserver.Stores // Access to node-local stores metrics nodeMetrics recorder *status.MetricsRecorder @@ -311,6 +312,7 @@ func bootstrapCluster( // before the ExecutorConfig is initialized). In that case, InitLogger() needs // to be called before the Node is used. func NewNode( + ctx context.Context, cfg kvserver.StoreConfig, recorder *status.MetricsRecorder, reg *metric.Registry, @@ -324,9 +326,10 @@ func NewNode( tenantUsage multitenant.TenantUsageServer, spanConfigAccessor spanconfig.KVAccessor, ) *Node { - var sqlExec *sql.InternalExecutor + var sqlExec sqlutil.InternalExecutor if execCfg != nil { - sqlExec = execCfg.InternalExecutor + ie := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + sqlExec = ie } n := &Node{ storeCfg: cfg, @@ -347,8 +350,9 @@ func NewNode( } // InitLogger needs to be called if a nil execCfg was passed to NewNode(). -func (n *Node) InitLogger(execCfg *sql.ExecutorConfig) { - n.sqlExec = execCfg.InternalExecutor +func (n *Node) InitLogger(ctx context.Context, execCfg *sql.ExecutorConfig) { + ie := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + n.sqlExec = ie } // String implements fmt.Stringer. diff --git a/pkg/server/server.go b/pkg/server/server.go index 3cc1f180c84c..26642defd750 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -80,6 +80,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/optionalnodeliveness" "github.com/cockroachdb/cockroach/pkg/sql/pgwire" _ "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scjob" // register jobs declared outside of pkg/sql + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/ts" @@ -554,7 +555,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { // The InternalExecutor will be further initialized later, as we create more // of the server's components. There's a circular dependency - many things - // need an InternalExecutor, but the InternalExecutor needs an xecutorConfig, + // need an InternalExecutor, but the InternalExecutor needs an ExecutorConfig, // which in turn needs many things. That's why everybody that needs an // InternalExecutor uses this one instance. internalExecutor := &sql.InternalExecutor{} @@ -681,7 +682,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { registry.AddMetricStruct(tenantUsage.Metrics()) node := NewNode( - storeCfg, recorder, registry, stopper, + ctx, storeCfg, recorder, registry, stopper, txnMetrics, stores, nil /* execCfg */, &rpcContext.ClusterID, gcoords.Regular.GetWorkQueue(admission.KVWork), gcoords.Stores, tenantUsage, spanConfigAccessor, @@ -803,7 +804,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { sStatus.setStmtDiagnosticsRequester(sqlServer.execCfg.StmtDiagnosticsRecorder) sStatus.baseStatusServer.sqlServer = sqlServer debugServer := debug.NewServer(st, sqlServer.pgServer.HBADebugFn(), sStatus) - node.InitLogger(sqlServer.execCfg) + node.InitLogger(ctx, sqlServer.execCfg) *lateBoundServer = Server{ nodeIDContainer: nodeIDContainer, @@ -2540,9 +2541,10 @@ func (s *Server) Decommission( // the node liveness range. Better to make the event logging best effort // than to slow down future node liveness transactions. if err := s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + ie := s.sqlServer.execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) return sql.InsertEventRecord( ctx, - s.sqlServer.execCfg.InternalExecutor, + ie.(*sql.InternalExecutor), txn, int32(s.NodeID()), /* reporting ID: the node where the event is logged */ sql.LogToSystemTable|sql.LogToDevChannelIfVerbose, /* we already call log.StructuredEvent above */ diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 80d489217d3f..112b4ebaf3f9 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -774,7 +774,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { // SessionBoundInternalExecutorFactory. The same applies for setting a // SessionBoundInternalExecutor on the job registry. ieFactory := func( - ctx context.Context, sessionData *sessiondata.SessionData, + ctx context.Context, initInternalExecutor func(ie sqlutil.InternalExecutor), ) sqlutil.InternalExecutor { ie := sql.MakeInternalExecutor( ctx, @@ -782,7 +782,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { internalMemMetrics, cfg.Settings, ) - ie.SetSessionData(sessionData) + initInternalExecutor(&ie) return &ie } @@ -806,7 +806,6 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { *cfg.circularInternalExecutor = sql.MakeInternalExecutor( ctx, pgServer.SQLServer, internalMemMetrics, cfg.Settings, ) - execCfg.InternalExecutor = cfg.circularInternalExecutor stmtDiagnosticsRegistry := stmtdiagnostics.NewRegistry( cfg.circularInternalExecutor, cfg.db, diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 7c39f7584b58..0cca12d07882 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -616,6 +616,7 @@ go_test( "//pkg/sql/sqlliveness", "//pkg/sql/sqlstats", "//pkg/sql/sqltestutils", + "//pkg/sql/sqlutil", "//pkg/sql/stats", "//pkg/sql/stmtdiagnostics", "//pkg/sql/tests", diff --git a/pkg/sql/alter_role.go b/pkg/sql/alter_role.go index 8d8493995621..36447778d824 100644 --- a/pkg/sql/alter_role.go +++ b/pkg/sql/alter_role.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sessioninit" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" "github.com/cockroachdb/errors" @@ -157,7 +158,8 @@ func (n *alterRoleNode) startExec(params runParams) error { } // Check if role exists. - row, err := params.extendedEvalCtx.ExecCfg.InternalExecutor.QueryRowEx( + ie := params.extendedEvalCtx.ExecCfg.InternalExecutorFactory(params.ctx, func(ie sqlutil.InternalExecutor) {}) + row, err := ie.QueryRowEx( params.ctx, opName, params.p.txn, @@ -220,7 +222,8 @@ func (n *alterRoleNode) startExec(params runParams) error { // Updating PASSWORD is a special case since PASSWORD lives in system.users // while the rest of the role options lives in system.role_options. - _, err = params.extendedEvalCtx.ExecCfg.InternalExecutor.Exec( + ie := params.extendedEvalCtx.ExecCfg.InternalExecutorFactory(params.ctx, func(ie sqlutil.InternalExecutor) {}) + _, err = ie.Exec( params.ctx, opName, params.p.txn, @@ -263,7 +266,8 @@ func (n *alterRoleNode) startExec(params runParams) error { } } - _, err := params.extendedEvalCtx.ExecCfg.InternalExecutor.ExecEx( + ie := params.extendedEvalCtx.ExecCfg.InternalExecutorFactory(params.ctx, func(ie sqlutil.InternalExecutor) {}) + _, err := ie.ExecEx( params.ctx, opName, params.p.txn, @@ -465,8 +469,9 @@ func (n *alterRoleSetNode) startExec(params runParams) error { upsertOrDeleteFunc := func(newSettings []string) error { var rowsAffected int var internalExecErr error + ie := params.extendedEvalCtx.ExecCfg.InternalExecutorFactory(params.ctx, func(ie sqlutil.InternalExecutor) {}) if newSettings == nil { - rowsAffected, internalExecErr = params.extendedEvalCtx.ExecCfg.InternalExecutor.ExecEx( + rowsAffected, internalExecErr = ie.ExecEx( params.ctx, opName, params.p.txn, @@ -476,7 +481,7 @@ func (n *alterRoleSetNode) startExec(params runParams) error { roleName, ) } else { - rowsAffected, internalExecErr = params.extendedEvalCtx.ExecCfg.InternalExecutor.ExecEx( + rowsAffected, internalExecErr = ie.ExecEx( params.ctx, opName, params.p.txn, @@ -553,7 +558,8 @@ func (n *alterRoleSetNode) getRoleName( return false, security.SQLUsername{}, pgerror.Newf(pgcode.InsufficientPrivilege, "cannot edit public role") } // Check if role exists. - row, err := params.extendedEvalCtx.ExecCfg.InternalExecutor.QueryRowEx( + ie := params.extendedEvalCtx.ExecCfg.InternalExecutorFactory(params.ctx, func(ie sqlutil.InternalExecutor) {}) + row, err := ie.QueryRowEx( params.ctx, opName, params.p.txn, @@ -591,7 +597,8 @@ func (n *alterRoleSetNode) makeNewSettings( `SELECT settings FROM %s WHERE database_id = $1 AND role_name = $2`, sessioninit.DatabaseRoleSettingsTableName, ) - datums, err := params.extendedEvalCtx.ExecCfg.InternalExecutor.QueryRowEx( + ie := params.extendedEvalCtx.ExecCfg.InternalExecutorFactory(params.ctx, func(ie sqlutil.InternalExecutor) {}) + datums, err := ie.QueryRowEx( params.ctx, opName, params.p.txn, diff --git a/pkg/sql/alter_table.go b/pkg/sql/alter_table.go index 97d547d92490..d10e8767f965 100644 --- a/pkg/sql/alter_table.go +++ b/pkg/sql/alter_table.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" @@ -156,6 +157,7 @@ func (n *alterTableNode) startExec(params runParams) error { "%q was not resolved as a table but is %T", resolved, resolved) } + ie := params.extendedEvalCtx.ExecCfg.InternalExecutorFactory(params.ctx, func(ie sqlutil.InternalExecutor) {}) for i, cmd := range n.n.Cmds { telemetry.Inc(cmd.TelemetryCounter()) @@ -809,7 +811,7 @@ func (n *alterTableNode) startExec(params runParams) error { "constraint %q in the middle of being added, try again later", t.Constraint) } if err := validateCheckInTxn( - params.ctx, ¶ms.p.semaCtx, params.ExecCfg().InternalExecutor, params.SessionData(), n.tableDesc, params.EvalContext().Txn, ck.Expr, + params.ctx, ¶ms.p.semaCtx, ie, params.SessionData(), n.tableDesc, params.EvalContext().Txn, ck.Expr, ); err != nil { return err } @@ -831,7 +833,7 @@ func (n *alterTableNode) startExec(params runParams) error { "constraint %q in the middle of being added, try again later", t.Constraint) } if err := validateFkInTxn( - params.ctx, params.p.LeaseMgr(), params.ExecCfg().InternalExecutor, + params.ctx, ie, n.tableDesc, params.EvalContext().Txn, name, params.EvalContext().Codec, ); err != nil { return err @@ -855,7 +857,7 @@ func (n *alterTableNode) startExec(params runParams) error { "constraint %q in the middle of being added, try again later", t.Constraint) } if err := validateUniqueWithoutIndexConstraintInTxn( - params.ctx, params.ExecCfg().InternalExecutor, n.tableDesc, params.EvalContext().Txn, name, + params.ctx, ie, n.tableDesc, params.EvalContext().Txn, name, ); err != nil { return err } @@ -1459,8 +1461,9 @@ func injectTableStats( return err } + ie := params.extendedEvalCtx.ExecCfg.InternalExecutorFactory(params.ctx, func(ie sqlutil.InternalExecutor) {}) // First, delete all statistics for the table. - if _ /* rows */, err := params.extendedEvalCtx.ExecCfg.InternalExecutor.Exec( + if _ /* rows */, err := ie.Exec( params.ctx, "delete-stats", params.EvalContext().Txn, @@ -1519,7 +1522,7 @@ func insertJSONStatistic( ) error { var ( ctx = params.ctx - ie = params.ExecCfg().InternalExecutor + ie = params.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) txn = params.EvalContext().Txn settings = params.ExecCfg().Settings ) @@ -1584,7 +1587,8 @@ func insertJSONStatistic( func (p *planner) removeColumnComment( ctx context.Context, tableID descpb.ID, columnID descpb.ColumnID, ) error { - _, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.ExecEx( + ie := p.ExtendedEvalContext().ExecCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + _, err := ie.ExecEx( ctx, "delete-column-comment", p.txn, diff --git a/pkg/sql/authorization.go b/pkg/sql/authorization.go index 04a49a1849ac..0ceee479b747 100644 --- a/pkg/sql/authorization.go +++ b/pkg/sql/authorization.go @@ -317,7 +317,7 @@ func (p *planner) MemberOfWithAdminOption( return MemberOfWithAdminOption( ctx, p.execCfg, - p.ExecCfg().InternalExecutor, + p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}), p.Descriptors(), p.Txn(), member, @@ -492,7 +492,8 @@ func (p *planner) HasRoleOption(ctx context.Context, roleOption roleoption.Optio return true, nil } - hasRolePrivilege, err := p.ExecCfg().InternalExecutor.QueryRowEx( + ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + hasRolePrivilege, err := ie.QueryRowEx( ctx, "has-role-option", p.Txn(), sessiondata.InternalExecutorOverride{User: security.RootUserName()}, fmt.Sprintf( diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index 922e72c2eed1..6ea93537de2e 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -715,7 +715,7 @@ func (sc *SchemaChanger) validateConstraints( return err } } else if c.IsForeignKey() { - if err := validateFkInTxn(ctx, sc.leaseMgr, evalCtx.SchemaChangeInternalExecutor, desc, txn, c.GetName(), evalCtx.Codec); err != nil { + if err := validateFkInTxn(ctx, evalCtx.SchemaChangeInternalExecutor, desc, txn, c.GetName(), evalCtx.Codec); err != nil { return err } } else if c.IsUniqueWithoutIndex() { @@ -2051,11 +2051,12 @@ func runSchemaChangesInTxn( // Now that the table descriptor is in a valid state with all column and index // mutations applied, it can be used for validating check/FK constraints. for _, c := range constraintAdditionMutations { + ie := planner.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) if c.IsCheck() || c.IsNotNull() { check := &c.ConstraintToUpdateDesc().Check if check.Validity == descpb.ConstraintValidity_Validating { if err := validateCheckInTxn( - ctx, &planner.semaCtx, planner.ExecCfg().InternalExecutor, planner.SessionData(), tableDesc, planner.txn, check.Expr, + ctx, &planner.semaCtx, ie, planner.SessionData(), tableDesc, planner.txn, check.Expr, ); err != nil { return err } @@ -2079,7 +2080,7 @@ func runSchemaChangesInTxn( uwi := &c.ConstraintToUpdateDesc().UniqueWithoutIndexConstraint if uwi.Validity == descpb.ConstraintValidity_Validating { if err := validateUniqueWithoutIndexConstraintInTxn( - ctx, planner.ExecCfg().InternalExecutor, tableDesc, planner.txn, c.GetName(), + ctx, ie, tableDesc, planner.txn, c.GetName(), ); err != nil { return err } @@ -2152,7 +2153,7 @@ func runSchemaChangesInTxn( func validateCheckInTxn( ctx context.Context, semaCtx *tree.SemaContext, - ie *InternalExecutor, + ie sqlutil.InternalExecutor, sessionData *sessiondata.SessionData, tableDesc *tabledesc.Mutable, txn *kv.Txn, @@ -2181,8 +2182,7 @@ func validateCheckInTxn( // reuse an existing kv.Txn safely. func validateFkInTxn( ctx context.Context, - leaseMgr *lease.Manager, - ie *InternalExecutor, + ie sqlutil.InternalExecutor, tableDesc *tabledesc.Mutable, txn *kv.Txn, fkName string, @@ -2224,7 +2224,7 @@ func validateFkInTxn( // reuse an existing kv.Txn safely. func validateUniqueWithoutIndexConstraintInTxn( ctx context.Context, - ie *InternalExecutor, + ie sqlutil.InternalExecutor, tableDesc *tabledesc.Mutable, txn *kv.Txn, constraintName string, diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go index 128145126a64..a17d6277f96f 100644 --- a/pkg/sql/catalog/lease/lease_test.go +++ b/pkg/sql/catalog/lease/lease_test.go @@ -217,7 +217,7 @@ func (t *leaseTest) node(nodeID uint32) *lease.Manager { nc, cfgCpy.DB, cfgCpy.Clock, - cfgCpy.InternalExecutor, + cfgCpy.InternalExecutorFactory(context.Background(), func(ie sqlutil.InternalExecutor) {}), cfgCpy.Settings, cfgCpy.Codec, t.leaseManagerTestingKnobs, diff --git a/pkg/sql/check.go b/pkg/sql/check.go index 109afbb655f6..5a884eabcb37 100644 --- a/pkg/sql/check.go +++ b/pkg/sql/check.go @@ -44,7 +44,7 @@ func validateCheckExpr( sessionData *sessiondata.SessionData, exprStr string, tableDesc *tabledesc.Mutable, - ie *InternalExecutor, + ie sqlutil.InternalExecutor, txn *kv.Txn, ) error { expr, err := schemaexpr.FormatExprForDisplay(ctx, tableDesc, exprStr, semaCtx, sessionData, tree.FmtParsable) @@ -237,7 +237,7 @@ func validateForeignKey( ctx context.Context, srcTable *tabledesc.Mutable, fk *descpb.ForeignKeyConstraint, - ie *InternalExecutor, + ie sqlutil.InternalExecutor, txn *kv.Txn, codec keys.SQLCodec, ) error { diff --git a/pkg/sql/comment_on_column.go b/pkg/sql/comment_on_column.go index c80d9106a457..0106a2bcf9b4 100644 --- a/pkg/sql/comment_on_column.go +++ b/pkg/sql/comment_on_column.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/privilege" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" ) @@ -60,8 +61,9 @@ func (n *commentOnColumnNode) startExec(params runParams) error { return err } + ie := params.p.extendedEvalCtx.ExecCfg.InternalExecutorFactory(params.ctx, func(ie sqlutil.InternalExecutor) {}) if n.n.Comment != nil { - _, err := params.p.extendedEvalCtx.ExecCfg.InternalExecutor.ExecEx( + _, err := ie.ExecEx( params.ctx, "set-column-comment", params.p.Txn(), @@ -75,7 +77,7 @@ func (n *commentOnColumnNode) startExec(params runParams) error { return err } } else { - _, err := params.p.extendedEvalCtx.ExecCfg.InternalExecutor.ExecEx( + _, err := ie.ExecEx( params.ctx, "delete-column-comment", params.p.Txn(), diff --git a/pkg/sql/comment_on_constraint.go b/pkg/sql/comment_on_constraint.go index 2feea4f12e21..5e61bbd47de9 100644 --- a/pkg/sql/comment_on_constraint.go +++ b/pkg/sql/comment_on_constraint.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/privilege" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" ) @@ -91,10 +92,12 @@ func (n *commentOnConstraintNode) startExec(params runParams) error { n.oid = hasher.CheckConstraintOid(n.tableDesc.GetParentID(), schema.GetName(), n.tableDesc.GetID(), constraintDesc) } + + ie := params.p.ExtendedEvalContext().ExecCfg.InternalExecutorFactory(params.ctx, func(ie sqlutil.InternalExecutor) {}) // Setting the comment to NULL is the // equivalent of deleting the comment. if n.n.Comment != nil { - _, err := params.p.extendedEvalCtx.ExecCfg.InternalExecutor.ExecEx( + _, err := ie.ExecEx( params.ctx, "set-constraint-comment", params.p.Txn(), @@ -108,7 +111,7 @@ func (n *commentOnConstraintNode) startExec(params runParams) error { return err } } else { - _, err := params.p.extendedEvalCtx.ExecCfg.InternalExecutor.ExecEx( + _, err := ie.ExecEx( params.ctx, "delete-constraint-comment", params.p.Txn(), diff --git a/pkg/sql/comment_on_database.go b/pkg/sql/comment_on_database.go index 7f9052fbe093..e38ab295839f 100644 --- a/pkg/sql/comment_on_database.go +++ b/pkg/sql/comment_on_database.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/privilege" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" ) @@ -54,8 +55,9 @@ func (p *planner) CommentOnDatabase( } func (n *commentOnDatabaseNode) startExec(params runParams) error { + ie := params.p.extendedEvalCtx.ExecCfg.InternalExecutorFactory(params.ctx, func(ie sqlutil.InternalExecutor) {}) if n.n.Comment != nil { - _, err := params.p.extendedEvalCtx.ExecCfg.InternalExecutor.ExecEx( + _, err := ie.ExecEx( params.ctx, "set-db-comment", params.p.Txn(), @@ -68,7 +70,7 @@ func (n *commentOnDatabaseNode) startExec(params runParams) error { return err } } else { - _, err := params.p.extendedEvalCtx.ExecCfg.InternalExecutor.ExecEx( + _, err := ie.ExecEx( params.ctx, "delete-db-comment", params.p.Txn(), diff --git a/pkg/sql/comment_on_index.go b/pkg/sql/comment_on_index.go index 82d5521d5373..bc974ab10700 100644 --- a/pkg/sql/comment_on_index.go +++ b/pkg/sql/comment_on_index.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/privilege" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" ) @@ -89,7 +90,8 @@ func (n *commentOnIndexNode) startExec(params runParams) error { func (p *planner) upsertIndexComment( ctx context.Context, tableID descpb.ID, indexID descpb.IndexID, comment string, ) error { - _, err := p.extendedEvalCtx.ExecCfg.InternalExecutor.ExecEx( + ie := p.extendedEvalCtx.ExecCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + _, err := ie.ExecEx( ctx, "set-index-comment", p.Txn(), @@ -106,7 +108,8 @@ func (p *planner) upsertIndexComment( func (p *planner) removeIndexComment( ctx context.Context, tableID descpb.ID, indexID descpb.IndexID, ) error { - _, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.ExecEx( + ie := p.extendedEvalCtx.ExecCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + _, err := ie.ExecEx( ctx, "delete-index-comment", p.txn, diff --git a/pkg/sql/comment_on_schema.go b/pkg/sql/comment_on_schema.go index 9e2095dd5383..e513de154ce1 100644 --- a/pkg/sql/comment_on_schema.go +++ b/pkg/sql/comment_on_schema.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/privilege" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" ) type commentOnSchemaNode struct { @@ -67,8 +68,10 @@ func (p *planner) CommentOnSchema(ctx context.Context, n *tree.CommentOnSchema) } func (n *commentOnSchemaNode) startExec(params runParams) error { + ie := params.p.extendedEvalCtx.ExecCfg.InternalExecutorFactory(params.ctx, + func(ie sqlutil.InternalExecutor) {}) if n.n.Comment != nil { - _, err := params.p.extendedEvalCtx.ExecCfg.InternalExecutor.ExecEx( + _, err := ie.ExecEx( params.ctx, "set-schema-comment", params.p.Txn(), diff --git a/pkg/sql/comment_on_table.go b/pkg/sql/comment_on_table.go index 421272b8dd27..91558dcbd73d 100644 --- a/pkg/sql/comment_on_table.go +++ b/pkg/sql/comment_on_table.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/privilege" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" ) @@ -53,8 +54,9 @@ func (p *planner) CommentOnTable(ctx context.Context, n *tree.CommentOnTable) (p } func (n *commentOnTableNode) startExec(params runParams) error { + ie := params.p.extendedEvalCtx.ExecCfg.InternalExecutorFactory(params.ctx, func(ie sqlutil.InternalExecutor) {}) if n.n.Comment != nil { - _, err := params.p.extendedEvalCtx.ExecCfg.InternalExecutor.ExecEx( + _, err := ie.ExecEx( params.ctx, "set-table-comment", params.p.Txn(), @@ -67,7 +69,7 @@ func (n *commentOnTableNode) startExec(params runParams) error { return err } } else { - _, err := params.p.extendedEvalCtx.ExecCfg.InternalExecutor.ExecEx( + _, err := ie.ExecEx( params.ctx, "delete-table-comment", params.p.Txn(), diff --git a/pkg/sql/compact_sql_stats.go b/pkg/sql/compact_sql_stats.go index 497f456d60a9..e67622dbd581 100644 --- a/pkg/sql/compact_sql_stats.go +++ b/pkg/sql/compact_sql_stats.go @@ -41,7 +41,7 @@ var _ jobs.Resumer = &sqlStatsCompactionResumer{} func (r *sqlStatsCompactionResumer) Resume(ctx context.Context, execCtx interface{}) error { log.Infof(ctx, "starting sql stats compaction job") p := execCtx.(JobExecContext) - ie := p.ExecCfg().InternalExecutor + ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) db := p.ExecCfg().DB var ( @@ -81,7 +81,7 @@ func (r *sqlStatsCompactionResumer) Resume(ctx context.Context, execCtx interfac r.st, ie, db, - ie.s.ServerMetrics.StatsMetrics.SQLStatsRemovedRows, + ie.(*InternalExecutor).s.ServerMetrics.StatsMetrics.SQLStatsRemovedRows, p.ExecCfg().SQLStatsTestingKnobs) if err = statsCompactor.DeleteOldestEntries(ctx); err != nil { return err @@ -98,7 +98,7 @@ func (r *sqlStatsCompactionResumer) Resume(ctx context.Context, execCtx interfac func (r *sqlStatsCompactionResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error { p := execCtx.(JobExecContext) execCfg := p.ExecCfg() - ie := execCfg.InternalExecutor + ie := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) return r.maybeNotifyJobTerminated(ctx, ie, execCfg, jobs.StatusFailed) } diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index d6335c871eb5..77040d5c6628 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -50,6 +50,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/sslocal" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/buildutil" @@ -340,7 +341,7 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server { cfg.SQLStatsTestingKnobs, ) reportedSQLStatsController := - reportedSQLStats.GetController(cfg.SQLStatusServer, cfg.DB, cfg.InternalExecutor) + reportedSQLStats.GetController(cfg.SQLStatusServer) memSQLStats := sslocal.New( cfg.Settings, sqlstats.MaxMemSQLStatsStmtFingerprints, @@ -2728,7 +2729,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( ex.statsCollector.PhaseTimes().SetSessionPhaseTime(sessionphase.SessionStartPostCommitJob, timeutil.Now()) if err := ex.server.cfg.JobRegistry.Run( ex.ctxHolder.connCtx, - ex.server.cfg.InternalExecutor, + ex.server.cfg.InternalExecutorFactory(ex.Ctx(), func(ie sqlutil.InternalExecutor) {}), ex.extraTxnState.jobs); err != nil { handleErr(err) } diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index a1758b6a3b4f..668ed6d683c4 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -40,6 +40,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" @@ -780,7 +781,7 @@ func (ex *connExecutor) checkDescriptorTwoVersionInvariant(ctx context.Context) retryErr, err := descs.CheckTwoVersionInvariant( ctx, ex.server.cfg.Clock, - ex.server.cfg.InternalExecutor, + ex.server.cfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}), &ex.extraTxnState.descCollection, ex.state.mu.txn, inRetryBackoff, diff --git a/pkg/sql/control_schedules.go b/pkg/sql/control_schedules.go index 3c053cb86d39..0801f90fa73e 100644 --- a/pkg/sql/control_schedules.go +++ b/pkg/sql/control_schedules.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/errors" ) @@ -63,7 +64,8 @@ func loadSchedule(params runParams, scheduleID tree.Datum) (*jobs.ScheduledJob, // Load schedule expression. This is needed for resume command, but we // also use this query to check for the schedule existence. - datums, cols, err := params.ExecCfg().InternalExecutor.QueryRowExWithCols( + ie := params.ExecCfg().InternalExecutorFactory(params.ctx, func(ie sqlutil.InternalExecutor) {}) + datums, cols, err := ie.QueryRowExWithCols( params.ctx, "load-schedule", params.EvalContext().Txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()}, @@ -91,7 +93,7 @@ func loadSchedule(params runParams, scheduleID tree.Datum) (*jobs.ScheduledJob, func updateSchedule(params runParams, schedule *jobs.ScheduledJob) error { return schedule.Update( params.ctx, - params.ExecCfg().InternalExecutor, + params.ExecCfg().InternalExecutorFactory(params.ctx, func(ie sqlutil.InternalExecutor) {}), params.EvalContext().Txn, ) } @@ -99,7 +101,8 @@ func updateSchedule(params runParams, schedule *jobs.ScheduledJob) error { // deleteSchedule deletes specified schedule. func deleteSchedule(params runParams, scheduleID int64) error { env := jobSchedulerEnv(params) - _, err := params.ExecCfg().InternalExecutor.ExecEx( + ie := params.ExecCfg().InternalExecutorFactory(params.ctx, func(ie sqlutil.InternalExecutor) {}) + _, err := ie.ExecEx( params.ctx, "delete-schedule", params.EvalContext().Txn, @@ -153,8 +156,9 @@ func (n *controlSchedulesNode) startExec(params runParams) error { return errors.Wrap(err, "failed to get scheduled job executor during drop") } if controller, ok := ex.(jobs.ScheduledJobController); ok { + ie := params.ExecCfg().InternalExecutorFactory(params.ctx, func(ie sqlutil.InternalExecutor) {}) scheduleControllerEnv := scheduledjobs.MakeProdScheduleControllerEnv( - params.ExecCfg().ProtectedTimestampProvider, params.ExecCfg().InternalExecutor) + params.ExecCfg().ProtectedTimestampProvider, ie) if err := controller.OnDrop(params.ctx, scheduleControllerEnv, scheduledjobs.ProdJobSchedulerEnv, schedule, params.extendedEvalCtx.Txn); err != nil { diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index 9ea988f19b49..a95e427b544f 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -62,6 +62,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/sslocal" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/duration" "github.com/cockroachdb/cockroach/pkg/util/errorutil" @@ -502,7 +503,8 @@ CREATE TABLE crdb_internal.table_row_statistics ( ) AS l ON l."tableID" = s."tableID" AND l.last_dt = s."createdAt" AS OF SYSTEM TIME '%s' GROUP BY s."tableID"`, statsAsOfTimeClusterMode.String(&p.ExecCfg().Settings.SV)) - statRows, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.QueryBufferedEx( + ie := p.ExtendedEvalContext().ExecCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + statRows, err := ie.QueryBufferedEx( ctx, "crdb-internal-statistics-table", nil, sessiondata.InternalExecutorOverride{User: security.RootUserName()}, query) @@ -711,7 +713,8 @@ CREATE TABLE crdb_internal.jobs ( args = append(args, p.execCfg.JobRegistry.RetryInitialDelay(), p.execCfg.JobRegistry.RetryMaxDelay()) } - it, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.QueryIteratorEx( + ie := p.ExtendedEvalContext().ExecCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + it, err := ie.QueryIteratorEx( ctx, "crdb-internal-jobs-table", p.txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()}, query, args...) @@ -3146,12 +3149,13 @@ CREATE TABLE crdb_internal.ranges_no_leases ( // getAllNames returns a map from ID to namespaceKey for every entry in // system.namespace. func (p *planner) getAllNames(ctx context.Context) (map[descpb.ID]catalog.NameKey, error) { - return getAllNames(ctx, p.txn, p.ExtendedEvalContext().ExecCfg.InternalExecutor) + ie := p.ExtendedEvalContext().ExecCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + return getAllNames(ctx, p.txn, ie) } // TestingGetAllNames is a wrapper for getAllNames. func TestingGetAllNames( - ctx context.Context, txn *kv.Txn, executor *InternalExecutor, + ctx context.Context, txn *kv.Txn, executor sqlutil.InternalExecutor, ) (map[descpb.ID]catalog.NameKey, error) { return getAllNames(ctx, txn, executor) } @@ -3159,7 +3163,7 @@ func TestingGetAllNames( // getAllNames is the testable implementation of getAllNames. // It is public so that it can be tested outside the sql package. func getAllNames( - ctx context.Context, txn *kv.Txn, executor *InternalExecutor, + ctx context.Context, txn *kv.Txn, executor sqlutil.InternalExecutor, ) (map[descpb.ID]catalog.NameKey, error) { namespace := map[descpb.ID]catalog.NameKey{} it, err := executor.QueryIterator( @@ -3243,7 +3247,8 @@ CREATE TABLE crdb_internal.zones ( // For some reason, if we use the iterator API here, "concurrent txn use // detected" error might occur, so we buffer up all zones first. - rows, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.QueryBuffered( + ie := p.ExtendedEvalContext().ExecCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + rows, err := ie.QueryBuffered( ctx, "crdb-internal-zones-table", p.txn, `SELECT id, config FROM system.zones`) if err != nil { return err @@ -4367,7 +4372,8 @@ func collectMarshaledJobMetadataMap( // Build job map with referenced job IDs. m := make(marshaledJobMetadataMap) query := `SELECT id, status, payload, progress FROM system.jobs` - it, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.QueryIteratorEx( + ie := p.ExtendedEvalContext().ExecCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + it, err := ie.QueryIteratorEx( ctx, "crdb-internal-jobs-table", p.Txn(), sessiondata.InternalExecutorOverride{User: security.RootUserName()}, query) @@ -4986,9 +4992,10 @@ CREATE TABLE crdb_internal.statement_statistics ( } execCfg := p.ExecCfg() + ie := p.ExtendedEvalContext().ExecCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) sqlStats := persistedsqlstats.New(&persistedsqlstats.Config{ Settings: execCfg.Settings, - InternalExecutor: execCfg.InternalExecutor, + InternalExecutor: ie, KvDB: execCfg.DB, SQLIDContainer: execCfg.NodeID, Knobs: execCfg.SQLStatsTestingKnobs, @@ -5138,9 +5145,10 @@ CREATE TABLE crdb_internal.transaction_statistics ( } execCfg := p.ExecCfg() + ie := p.ExtendedEvalContext().ExecCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) sqlStats := persistedsqlstats.New(&persistedsqlstats.Config{ Settings: execCfg.Settings, - InternalExecutor: execCfg.InternalExecutor, + InternalExecutor: ie, KvDB: execCfg.DB, SQLIDContainer: execCfg.NodeID, Knobs: execCfg.SQLStatsTestingKnobs, diff --git a/pkg/sql/create_role.go b/pkg/sql/create_role.go index 0fe1f3c202fa..4268b6b42b24 100644 --- a/pkg/sql/create_role.go +++ b/pkg/sql/create_role.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sessioninit" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" "github.com/cockroachdb/errors" ) @@ -151,8 +152,10 @@ func (n *CreateRoleNode) startExec(params runParams) error { hashedPassword = []byte{} } + ie := params.extendedEvalCtx.ExecCfg.InternalExecutorFactory(params.ctx, func(ie sqlutil.InternalExecutor) {}) + // Check if the user/role exists. - row, err := params.extendedEvalCtx.ExecCfg.InternalExecutor.QueryRowEx( + row, err := ie.QueryRowEx( params.ctx, opName, params.p.txn, @@ -172,7 +175,7 @@ func (n *CreateRoleNode) startExec(params runParams) error { } // TODO(richardjcai): move hashedPassword column to system.role_options. - rowsAffected, err := params.extendedEvalCtx.ExecCfg.InternalExecutor.Exec( + rowsAffected, err := ie.Exec( params.ctx, opName, params.p.txn, @@ -214,7 +217,7 @@ func (n *CreateRoleNode) startExec(params runParams) error { } } - _, err = params.extendedEvalCtx.ExecCfg.InternalExecutor.ExecEx( + _, err = ie.ExecEx( params.ctx, opName, params.p.txn, diff --git a/pkg/sql/create_stats.go b/pkg/sql/create_stats.go index ea6d189e6baf..67f5f41179ce 100644 --- a/pkg/sql/create_stats.go +++ b/pkg/sql/create_stats.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/privilege" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util" @@ -176,7 +177,8 @@ func (n *createStatsNode) startJob(ctx context.Context, resultsCh chan<- tree.Da if errors.Is(err, stats.ConcurrentCreateStatsError) { // Delete the job so users don't see it and get confused by the error. const stmt = `DELETE FROM system.jobs WHERE id = $1` - if _ /* cols */, delErr := n.p.ExecCfg().InternalExecutor.Exec( + ie := n.p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + if _ /* cols */, delErr := ie.Exec( ctx, "delete-job", nil /* txn */, stmt, jobID, ); delErr != nil { log.Warningf(ctx, "failed to delete job: %v", delErr) @@ -670,7 +672,8 @@ func checkRunningJobs(ctx context.Context, job *jobs.Job, p JobExecContext) erro if job != nil { jobID = job.ID() } - exists, err := jobs.RunningJobExists(ctx, jobID, p.ExecCfg().InternalExecutor, nil /* txn */, func(payload *jobspb.Payload) bool { + ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + exists, err := jobs.RunningJobExists(ctx, jobID, ie, nil /* txn */, func(payload *jobspb.Payload) bool { return payload.Type() == jobspb.TypeCreateStats || payload.Type() == jobspb.TypeAutoCreateStats }) diff --git a/pkg/sql/drop_database.go b/pkg/sql/drop_database.go index 64ac41f38ac9..afad67c067bf 100644 --- a/pkg/sql/drop_database.go +++ b/pkg/sql/drop_database.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sessioninit" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" "github.com/cockroachdb/errors" @@ -306,7 +307,8 @@ func (p *planner) accumulateCascadingViews( } func (p *planner) removeDbComment(ctx context.Context, dbID descpb.ID) error { - _, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.ExecEx( + ie := p.ExtendedEvalContext().ExecCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + _, err := ie.ExecEx( ctx, "delete-db-comment", p.txn, @@ -323,7 +325,8 @@ func (p *planner) removeDbRoleSettings(ctx context.Context, dbID descpb.ID) erro if !p.EvalContext().Settings.Version.IsActive(ctx, clusterversion.DatabaseRoleSettings) { return nil } - rowsDeleted, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.ExecEx( + ie := p.ExtendedEvalContext().ExecCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + rowsDeleted, err := ie.ExecEx( ctx, "delete-db-role-settings", p.txn, diff --git a/pkg/sql/drop_role.go b/pkg/sql/drop_role.go index e42ca9972844..3e9570353b0a 100644 --- a/pkg/sql/drop_role.go +++ b/pkg/sql/drop_role.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessioninit" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" "github.com/cockroachdb/errors" @@ -277,6 +278,7 @@ func (n *DropRoleNode) startExec(params runParams) error { // All safe - do the work. var numRoleMembershipsDeleted, numRoleSettingsRowsDeleted int + ie := params.extendedEvalCtx.ExecCfg.InternalExecutorFactory(params.ctx, func(ie sqlutil.InternalExecutor) {}) for normalizedUsername := range userNames { // Specifically reject special users and roles. Some (root, admin) would fail with // "privileges still exist" first. @@ -290,7 +292,7 @@ func (n *DropRoleNode) startExec(params runParams) error { } // Check if user owns any scheduled jobs. - numSchedulesRow, err := params.ExecCfg().InternalExecutor.QueryRow( + numSchedulesRow, err := ie.QueryRow( params.ctx, "check-user-schedules", params.p.txn, @@ -310,7 +312,7 @@ func (n *DropRoleNode) startExec(params runParams) error { normalizedUsername, numSchedules) } - numUsersDeleted, err := params.extendedEvalCtx.ExecCfg.InternalExecutor.Exec( + numUsersDeleted, err := ie.Exec( params.ctx, opName, params.p.txn, @@ -326,7 +328,7 @@ func (n *DropRoleNode) startExec(params runParams) error { } // Drop all role memberships involving the user/role. - rowsDeleted, err := params.extendedEvalCtx.ExecCfg.InternalExecutor.Exec( + rowsDeleted, err := ie.Exec( params.ctx, "drop-role-membership", params.p.txn, @@ -338,7 +340,7 @@ func (n *DropRoleNode) startExec(params runParams) error { } numRoleMembershipsDeleted += rowsDeleted - _, err = params.extendedEvalCtx.ExecCfg.InternalExecutor.Exec( + _, err = ie.Exec( params.ctx, opName, params.p.txn, @@ -354,7 +356,7 @@ func (n *DropRoleNode) startExec(params runParams) error { // TODO(rafi): Remove this condition in 21.2. if params.EvalContext().Settings.Version.IsActive(params.ctx, clusterversion.DatabaseRoleSettings) { - rowsDeleted, err = params.extendedEvalCtx.ExecCfg.InternalExecutor.Exec( + rowsDeleted, err = ie.Exec( params.ctx, opName, params.p.txn, diff --git a/pkg/sql/drop_schema.go b/pkg/sql/drop_schema.go index 50209267e1fa..5a05f9b3469b 100644 --- a/pkg/sql/drop_schema.go +++ b/pkg/sql/drop_schema.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" "github.com/cockroachdb/errors" ) @@ -267,7 +268,8 @@ func (p *planner) createDropSchemaJob( } func (p *planner) removeSchemaComment(ctx context.Context, schemaID descpb.ID) error { - _, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.ExecEx( + ie := p.ExtendedEvalContext().ExecCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + _, err := ie.ExecEx( ctx, "delete-schema-comment", p.txn, diff --git a/pkg/sql/drop_table.go b/pkg/sql/drop_table.go index 31acefc4ca10..fb7ced02534c 100644 --- a/pkg/sql/drop_table.go +++ b/pkg/sql/drop_table.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -618,7 +619,8 @@ func removeMatchingReferences( } func (p *planner) removeTableComments(ctx context.Context, tableDesc *tabledesc.Mutable) error { - _, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.ExecEx( + ie := p.ExtendedEvalContext().ExecCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + _, err := ie.ExecEx( ctx, "delete-table-comments", p.txn, diff --git a/pkg/sql/event_log.go b/pkg/sql/event_log.go index 8d8d6da6258d..f51808bb4c27 100644 --- a/pkg/sql/event_log.go +++ b/pkg/sql/event_log.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scpb" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -220,13 +221,14 @@ func logEventInternalForSchemaChanges( m.DescriptorID = uint32(descID) m.MutationID = uint32(mutationID) + ie := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) // Delegate the storing of the event to the regular event logic. // // We use depth=1 because the caller of this function typically // wraps the call in a db.Txn() callback, which confuses the vmodule // filtering. Easiest is to pretend the event is sourced here. return insertEventRecords( - ctx, execCfg.InternalExecutor, + ctx, ie.(*InternalExecutor), txn, int32(execCfg.NodeID.SQLInstanceID()), /* reporter ID */ 1, /* depth: use this function as origin */ @@ -287,7 +289,7 @@ func logEventInternalForSQLStatements( } return insertEventRecords(ctx, - execCfg.InternalExecutor, txn, + execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}), txn, int32(execCfg.NodeID.SQLInstanceID()), /* reporter ID */ 1+depth, /* depth */ opts, /* eventLogOptions */ @@ -343,6 +345,7 @@ func LogEventForJobs( m.DescriptorIDs = append(m.DescriptorIDs, uint32(id)) } m.Description = payload.Description + ie := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) // Delegate the storing of the event to the regular event logic. // @@ -350,7 +353,7 @@ func LogEventForJobs( // wraps the call in a db.Txn() callback, which confuses the vmodule // filtering. Easiest is to pretend the event is sourced here. return insertEventRecords( - ctx, execCfg.InternalExecutor, + ctx, ie.(*InternalExecutor), txn, int32(execCfg.NodeID.SQLInstanceID()), /* reporter ID */ 1, /* depth: use this function for vmodule filtering */ @@ -396,7 +399,7 @@ const ( // This converts to a call to insertEventRecords() with just 1 entry. func InsertEventRecord( ctx context.Context, - ex *InternalExecutor, + ex sqlutil.InternalExecutor, txn *kv.Txn, reportingID int32, dst LogEventDestination, @@ -424,7 +427,7 @@ func InsertEventRecord( // should be removed after v21.1 is released. func insertEventRecords( ctx context.Context, - ex *InternalExecutor, + ex sqlutil.InternalExecutor, txn *kv.Txn, reportingID int32, depth int, @@ -463,7 +466,8 @@ func insertEventRecords( } // If we only want to log externally and not write to the events table, early exit. - loggingToSystemTable := opts.dst.hasFlag(LogToSystemTable) && eventLogSystemTableEnabled.Get(&ex.s.cfg.Settings.SV) + ie := ex.(*InternalExecutor) + loggingToSystemTable := opts.dst.hasFlag(LogToSystemTable) && eventLogSystemTableEnabled.Get(&ie.s.cfg.Settings.SV) if !loggingToSystemTable { // Simply emit the events to their respective channels and call it a day. if opts.dst.hasFlag(LogExternally) { diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 142253e0b050..2ae7ebe16620 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1057,18 +1057,17 @@ type ExecutorConfig struct { NodesStatusServer serverpb.OptionalNodesStatusServer // SQLStatusServer gives access to a subset of the Status service and is // available when not running as a system tenant. - SQLStatusServer serverpb.SQLStatusServer - RegionsServer serverpb.RegionsServer - MetricsRecorder nodeStatusGenerator - SessionRegistry *SessionRegistry - SQLLiveness sqlliveness.Liveness - JobRegistry *jobs.Registry - VirtualSchemas *VirtualSchemaHolder - DistSQLPlanner *DistSQLPlanner - TableStatsCache *stats.TableStatisticsCache - StatsRefresher *stats.Refresher - InternalExecutor *InternalExecutor - QueryCache *querycache.C + SQLStatusServer serverpb.SQLStatusServer + RegionsServer serverpb.RegionsServer + MetricsRecorder nodeStatusGenerator + SessionRegistry *SessionRegistry + SQLLiveness sqlliveness.Liveness + JobRegistry *jobs.Registry + VirtualSchemas *VirtualSchemaHolder + DistSQLPlanner *DistSQLPlanner + TableStatsCache *stats.TableStatisticsCache + StatsRefresher *stats.Refresher + QueryCache *querycache.C SchemaChangerMetrics *SchemaChangerMetrics FeatureFlagMetrics *featureflag.DenialMetrics @@ -3062,7 +3061,8 @@ func DescsTxn( execCfg *ExecutorConfig, f func(ctx context.Context, txn *kv.Txn, col *descs.Collection) error, ) error { - return execCfg.CollectionFactory.Txn(ctx, execCfg.InternalExecutor, execCfg.DB, f) + return execCfg.CollectionFactory.Txn(ctx, execCfg.InternalExecutorFactory(ctx, + func(ie sqlutil.InternalExecutor) {}), execCfg.DB, f) } // NewRowMetrics creates a row.Metrics struct for either internal or user diff --git a/pkg/sql/execstats/BUILD.bazel b/pkg/sql/execstats/BUILD.bazel index 8cd311dabde0..8f24eeb30fce 100644 --- a/pkg/sql/execstats/BUILD.bazel +++ b/pkg/sql/execstats/BUILD.bazel @@ -36,6 +36,7 @@ go_test( "//pkg/sql/execinfrapb", "//pkg/sql/sessiondata", "//pkg/sql/sessiondatapb", + "//pkg/sql/sqlutil", "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", diff --git a/pkg/sql/execstats/traceanalyzer_test.go b/pkg/sql/execstats/traceanalyzer_test.go index d806aa39b61c..c84bfa11cea2 100644 --- a/pkg/sql/execstats/traceanalyzer_test.go +++ b/pkg/sql/execstats/traceanalyzer_test.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execstats" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -108,7 +109,7 @@ func TestTraceAnalyzer(t *testing.T) { for _, vectorizeMode := range []sessiondatapb.VectorizeExecMode{sessiondatapb.VectorizeOff, sessiondatapb.VectorizeOn} { execCtx, finishAndCollect := tracing.ContextWithRecordingSpan(ctx, execCfg.AmbientCtx.Tracer, t.Name()) defer finishAndCollect() - ie := execCfg.InternalExecutor + ie := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) ie.SetSessionData( &sessiondata.SessionData{ SessionData: sessiondatapb.SessionData{ diff --git a/pkg/sql/explain_bundle.go b/pkg/sql/explain_bundle.go index 9572f046c1c5..682b978453fa 100644 --- a/pkg/sql/explain_bundle.go +++ b/pkg/sql/explain_bundle.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/memzipper" @@ -115,7 +116,7 @@ type diagnosticsBundle struct { func buildStatementBundle( ctx context.Context, db *kv.DB, - ie *InternalExecutor, + ie sqlutil.InternalExecutor, plan *planTop, planString string, trace tracing.Recording, @@ -173,7 +174,7 @@ func (bundle *diagnosticsBundle) insert( // stmtBundleBuilder is a helper for building a statement bundle. type stmtBundleBuilder struct { db *kv.DB - ie *InternalExecutor + ie sqlutil.InternalExecutor plan *planTop trace tracing.Recording @@ -184,7 +185,7 @@ type stmtBundleBuilder struct { func makeStmtBundleBuilder( db *kv.DB, - ie *InternalExecutor, + ie sqlutil.InternalExecutor, plan *planTop, trace tracing.Recording, placeholders *tree.PlaceholderInfo, @@ -424,10 +425,10 @@ func (b *stmtBundleBuilder) finalize() (*bytes.Buffer, error) { // schema, table statistics. type stmtEnvCollector struct { ctx context.Context - ie *InternalExecutor + ie sqlutil.InternalExecutor } -func makeStmtEnvCollector(ctx context.Context, ie *InternalExecutor) stmtEnvCollector { +func makeStmtEnvCollector(ctx context.Context, ie sqlutil.InternalExecutor) stmtEnvCollector { return stmtEnvCollector{ctx: ctx, ie: ie} } diff --git a/pkg/sql/grant_role.go b/pkg/sql/grant_role.go index 40e9c5563fbd..e8da95aee984 100644 --- a/pkg/sql/grant_role.go +++ b/pkg/sql/grant_role.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" ) @@ -175,9 +176,11 @@ func (n *GrantRoleNode) startExec(params runParams) error { } var rowsAffected int + ie := params.extendedEvalCtx.ExecCfg.InternalExecutorFactory(params.ctx, + func(ie sqlutil.InternalExecutor) {}) for _, r := range n.roles { for _, m := range n.members { - affected, err := params.extendedEvalCtx.ExecCfg.InternalExecutor.ExecEx( + affected, err := ie.ExecEx( params.ctx, opName, params.p.txn, diff --git a/pkg/sql/information_schema.go b/pkg/sql/information_schema.go index d6af6dc8c520..aeb2c1ea0d5c 100755 --- a/pkg/sql/information_schema.go +++ b/pkg/sql/information_schema.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/privilege" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/sql/vtable" "github.com/cockroachdb/cockroach/pkg/util/timeutil/pgdate" @@ -2586,7 +2587,8 @@ func forEachRole( // logic test fail in 3node-tenant config with 'txn already encountered an // error' (because of the context cancellation), so we buffer all roles // first. - rows, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.QueryBuffered( + ie := p.ExtendedEvalContext().ExecCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + rows, err := ie.QueryBuffered( ctx, "read-roles", p.txn, query, ) if err != nil { @@ -2621,7 +2623,8 @@ func forEachRoleMembership( ctx context.Context, p *planner, fn func(role, member security.SQLUsername, isAdmin bool) error, ) (retErr error) { query := `SELECT "role", "member", "isAdmin" FROM system.role_members` - it, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.QueryIterator( + ie := p.ExtendedEvalContext().ExecCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + it, err := ie.QueryIterator( ctx, "read-members", p.txn, query, ) if err != nil { diff --git a/pkg/sql/instrumentation.go b/pkg/sql/instrumentation.go index a8d392c23710..aef96056891b 100644 --- a/pkg/sql/instrumentation.go +++ b/pkg/sql/instrumentation.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessionphase" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/buildutil" @@ -312,7 +313,7 @@ func (ih *instrumentationHelper) Finish( var bundle diagnosticsBundle if ih.collectBundle { - ie := p.extendedEvalCtx.ExecCfg.InternalExecutor + ie := p.extendedEvalCtx.ExecCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) phaseTimes := statsCollector.PhaseTimes() if ih.stmtDiagnosticsRecorder.IsExecLatencyConditionMet( ih.diagRequestID, ih.diagRequest, phaseTimes.GetServiceLatencyNoOverhead(), diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index f6c1a0235428..318a4a0d9cb7 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -116,8 +116,8 @@ func MakeInternalExecutor( } // SetSessionData binds the session variables that will be used by queries -// performed through this executor from now on. This creates a new session stack. -// It is recommended to use SetSessionDataStack. +// performed through this executor from now on. This creates a new session +// stack. // // SetSessionData cannot be called concurrently with query execution. func (ie *InternalExecutor) SetSessionData(sessionData *sessiondata.SessionData) { diff --git a/pkg/sql/join_token.go b/pkg/sql/join_token.go index 722af85696d3..a12606c1617a 100644 --- a/pkg/sql/join_token.go +++ b/pkg/sql/join_token.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" ) @@ -61,7 +62,8 @@ func (p *planner) CreateJoinToken(ctx context.Context) (string, error) { } expiration := timeutil.Now().Add(security.JoinTokenExpiration) err = p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - _, err = p.ExecCfg().InternalExecutor.Exec( + ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + _, err = ie.Exec( ctx, "insert-join-token", txn, "insert into system.join_tokens(id, secret, expiration) "+ "values($1, $2, $3)", diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go index 1832a11f6ca1..6de79d9d6960 100644 --- a/pkg/sql/opt_exec_factory.go +++ b/pkg/sql/opt_exec_factory.go @@ -35,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/span" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/encoding" @@ -1159,8 +1160,9 @@ func (ef *execFactory) showEnv(plan string, envOpts exec.ExplainEnvData) (exec.N ie := ef.planner.extendedEvalCtx.ExecCfg.InternalExecutorFactory( ef.planner.EvalContext().Context, - ef.planner.SessionData(), - ) + func(ie sqlutil.InternalExecutor) { + ie.SetSessionData(ef.planner.SessionData()) + }) c := makeStmtEnvCollector(ef.planner.EvalContext().Context, ie.(*InternalExecutor)) // Show the version of Cockroach running. diff --git a/pkg/sql/pg_catalog.go b/pkg/sql/pg_catalog.go index 49d5ec915fa3..d25c540ae3db 100644 --- a/pkg/sql/pg_catalog.go +++ b/pkg/sql/pg_catalog.go @@ -40,6 +40,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/sql/vtable" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -1478,7 +1479,8 @@ https://www.postgresql.org/docs/9.5/catalog-pg-depend.html`, // as a datum row, containing object id, sub id (column id in the case of // columns), comment text, and comment type (keys.FooCommentType). func getComments(ctx context.Context, p *planner) ([]tree.Datums, error) { - return p.extendedEvalCtx.ExecCfg.InternalExecutor.QueryBuffered( + ie := p.extendedEvalCtx.ExecCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + return ie.QueryBuffered( ctx, "select-comments", p.EvalContext().Txn, @@ -3144,7 +3146,8 @@ var pgCatalogDbRoleSettingTable = virtualSchemaTable{ https://www.postgresql.org/docs/13/catalog-pg-db-role-setting.html`, schema: vtable.PgCatalogDbRoleSetting, populate: func(ctx context.Context, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error { - rows, err := p.extendedEvalCtx.ExecCfg.InternalExecutor.QueryBufferedEx( + ie := p.extendedEvalCtx.ExecCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + rows, err := ie.QueryBufferedEx( ctx, "select-db-role-settings", p.EvalContext().Txn, @@ -3396,7 +3399,8 @@ https://www.postgresql.org/docs/13/catalog-pg-statistic-ext.html`, schema: vtable.PgCatalogStatisticExt, populate: func(ctx context.Context, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error { query := `SELECT "statisticID", name, "tableID", "columnIDs" FROM system.table_statistics;` - rows, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.QueryBuffered( + ie := p.ExtendedEvalContext().ExecCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + rows, err := ie.QueryBuffered( ctx, "read-statistics-objects", p.txn, query, ) if err != nil { diff --git a/pkg/sql/pgwire/BUILD.bazel b/pkg/sql/pgwire/BUILD.bazel index 5544eca765e6..ef0f1640bc12 100644 --- a/pkg/sql/pgwire/BUILD.bazel +++ b/pkg/sql/pgwire/BUILD.bazel @@ -40,6 +40,7 @@ go_library( "//pkg/sql/sem/tree", "//pkg/sql/sessiondatapb", "//pkg/sql/sqltelemetry", + "//pkg/sql/sqlutil", "//pkg/sql/types", "//pkg/util", "//pkg/util/contextutil", diff --git a/pkg/sql/pgwire/auth.go b/pkg/sql/pgwire/auth.go index 6b38cda556f2..54d1fa7b59c8 100644 --- a/pkg/sql/pgwire/auth.go +++ b/pkg/sql/pgwire/auth.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" "github.com/cockroachdb/errors" @@ -59,7 +60,7 @@ type authOptions struct { identMap *identmap.Conf // ie is the server-wide internal executor, used to // retrieve entries from system.users. - ie *sql.InternalExecutor + ie sqlutil.InternalExecutor // The following fields are only used by tests. @@ -143,7 +144,7 @@ func (c *conn) handleAuthentication( sql.GetUserSessionInitInfo( ctx, execCfg, - authOpt.ie, + authOpt.ie.(*sql.InternalExecutor), dbUser, c.sessionArgs.SessionDefaults["database"], ) diff --git a/pkg/sql/pgwire/server.go b/pkg/sql/pgwire/server.go index 2b143c263c5f..5b628188d30d 100644 --- a/pkg/sql/pgwire/server.go +++ b/pkg/sql/pgwire/server.go @@ -37,6 +37,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" @@ -710,7 +711,7 @@ func (s *Server) ServeConn(ctx context.Context, conn net.Conn, socketType Socket connType: connType, connDetails: connDetails, insecure: s.cfg.Insecure, - ie: s.execCfg.InternalExecutor, + ie: s.execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}), auth: hbaConf, identMap: identMap, testingAuthHook: testingAuthHook, diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index b9ab21449576..a0fee259eab6 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -38,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" "github.com/cockroachdb/cockroach/pkg/util/envutil" @@ -420,11 +421,12 @@ func internalExtendedEvalCtx( var indexUsageStats *idxusage.LocalIndexUsageStats var sqlStatsController tree.SQLStatsController var indexUsageStatsController tree.IndexUsageStatsController - if execCfg.InternalExecutor != nil { - if execCfg.InternalExecutor.s != nil { - indexUsageStats = execCfg.InternalExecutor.s.indexUsageStats - sqlStatsController = execCfg.InternalExecutor.s.sqlStatsController - indexUsageStatsController = execCfg.InternalExecutor.s.indexUsageStatsController + if execCfg.InternalExecutorFactory != nil { + ie := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}).(*InternalExecutor) + if ie.s != nil { + indexUsageStats = ie.s.indexUsageStats + sqlStatsController = ie.s.sqlStatsController + indexUsageStatsController = ie.s.indexUsageStatsController } else { // If the indexUsageStats is nil from the sql.Server, we create a dummy // index usage stats collector. The sql.Server in the ExecutorConfig @@ -825,7 +827,9 @@ func (p *planner) QueryRowEx( stmt string, qargs ...interface{}, ) (tree.Datums, error) { - ie := p.ExecCfg().InternalExecutorFactory(ctx, p.SessionData()) + ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) { + ie.SetSessionData(p.SessionData()) + }) return ie.QueryRowEx(ctx, opName, txn, override, stmt, qargs...) } @@ -843,7 +847,9 @@ func (p *planner) QueryIteratorEx( stmt string, qargs ...interface{}, ) (tree.InternalRows, error) { - ie := p.ExecCfg().InternalExecutorFactory(ctx, p.SessionData()) + ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) { + ie.SetSessionData(p.SessionData()) + }) rows, err := ie.QueryIteratorEx(ctx, opName, txn, override, stmt, qargs...) return rows.(tree.InternalRows), err } diff --git a/pkg/sql/privileged_accessor.go b/pkg/sql/privileged_accessor.go index 3db6d6deb73f..c195c9c2d22c 100644 --- a/pkg/sql/privileged_accessor.go +++ b/pkg/sql/privileged_accessor.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/errors" ) @@ -31,11 +32,12 @@ import ( func (p *planner) LookupNamespaceID( ctx context.Context, parentID int64, name string, ) (tree.DInt, bool, error) { + ie := p.ExtendedEvalContext().ExecCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) query := fmt.Sprintf( `SELECT id FROM [%d AS namespace] WHERE "parentID" = $1 AND "parentSchemaID" IN (0, 29) AND name = $2`, keys.NamespaceTableID, ) - r, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.QueryRowEx( + r, err := ie.QueryRowEx( ctx, "crdb-internal-get-descriptor-id", p.txn, @@ -65,8 +67,9 @@ func (p *planner) LookupZoneConfigByNamespaceID( return "", false, err } + ie := p.ExtendedEvalContext().ExecCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) const query = `SELECT config FROM system.zones WHERE id = $1` - r, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.QueryRowEx( + r, err := ie.QueryRowEx( ctx, "crdb-internal-get-zone", p.txn, diff --git a/pkg/sql/region_util.go b/pkg/sql/region_util.go index 939d6074fd39..4016f5c3f0fc 100644 --- a/pkg/sql/region_util.go +++ b/pkg/sql/region_util.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/errors" "github.com/gogo/protobuf/proto" ) @@ -70,7 +71,8 @@ func GetLiveClusterRegions(ctx context.Context, p PlanHookState) (LiveClusterReg User: security.RootUserName(), } - it, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.QueryIteratorEx( + ie := p.ExtendedEvalContext().ExecCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + it, err := ie.QueryIteratorEx( ctx, "get_live_cluster_regions", p.ExtendedEvalContext().Txn, @@ -758,7 +760,8 @@ func applyZoneConfigForMultiRegionDatabase( ) // If the new zone config is the same as a blank zone config, delete it. if newZoneConfig.Equal(zonepb.NewZoneConfig()) { - _, err = execConfig.InternalExecutor.Exec( + ie := execConfig.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + _, err = ie.Exec( ctx, "delete-zone-multiregion-database", txn, diff --git a/pkg/sql/resolve_oid.go b/pkg/sql/resolve_oid.go index 177cf5e370e4..c47b8e3ac1fc 100644 --- a/pkg/sql/resolve_oid.go +++ b/pkg/sql/resolve_oid.go @@ -29,7 +29,9 @@ import ( func (p *planner) ResolveOIDFromString( ctx context.Context, resultType *types.T, toResolve *tree.DString, ) (*tree.DOid, error) { - ie := p.ExecCfg().InternalExecutorFactory(ctx, p.SessionData()) + ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) { + ie.SetSessionData(p.SessionData()) + }) return resolveOID( ctx, p.Txn(), ie, @@ -41,7 +43,9 @@ func (p *planner) ResolveOIDFromString( func (p *planner) ResolveOIDFromOID( ctx context.Context, resultType *types.T, toResolve *tree.DOid, ) (*tree.DOid, error) { - ie := p.ExecCfg().InternalExecutorFactory(ctx, p.SessionData()) + ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) { + ie.SetSessionData(p.SessionData()) + }) return resolveOID( ctx, p.Txn(), ie, diff --git a/pkg/sql/revoke_role.go b/pkg/sql/revoke_role.go index cb70ec38647f..0ee5eaeb8cf4 100644 --- a/pkg/sql/revoke_role.go +++ b/pkg/sql/revoke_role.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" ) @@ -131,7 +132,9 @@ func (n *RevokeRoleNode) startExec(params runParams) error { "role/user %s cannot be removed from role %s or lose the ADMIN OPTION", security.RootUser, security.AdminRole) } - affected, err := params.extendedEvalCtx.ExecCfg.InternalExecutor.ExecEx( + ie := params.extendedEvalCtx.ExecCfg.InternalExecutorFactory(params.ctx, + func(ie sqlutil.InternalExecutor) {}) + affected, err := ie.ExecEx( params.ctx, opName, params.p.txn, diff --git a/pkg/sql/save_table.go b/pkg/sql/save_table.go index f2d091dbf666..7ab704bc1d19 100644 --- a/pkg/sql/save_table.go +++ b/pkg/sql/save_table.go @@ -15,6 +15,7 @@ import ( "fmt" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/errors" ) @@ -69,7 +70,9 @@ func (n *saveTableNode) startExec(params runParams) error { create.Defs = append(create.Defs, def) } - _, err := params.p.ExtendedEvalContext().ExecCfg.InternalExecutor.Exec( + ie := params.p.ExtendedEvalContext().ExecCfg.InternalExecutorFactory(params.ctx, + func(ie sqlutil.InternalExecutor) {}) + _, err := ie.Exec( params.ctx, "create save table", nil, /* txn */ @@ -82,7 +85,9 @@ func (n *saveTableNode) startExec(params runParams) error { func (n *saveTableNode) issue(params runParams) error { if v := &n.run.vals; len(v.Rows) > 0 { stmt := fmt.Sprintf("INSERT INTO %s %s", n.target.String(), v.String()) - if _, err := params.p.ExtendedEvalContext().ExecCfg.InternalExecutor.Exec( + ie := params.p.ExtendedEvalContext().ExecCfg.InternalExecutorFactory(params.ctx, + func(ie sqlutil.InternalExecutor) {}) + if _, err := ie.Exec( params.ctx, "insert into save table", nil, /* txn */ diff --git a/pkg/sql/schema_change_plan_node.go b/pkg/sql/schema_change_plan_node.go index 8dd6d5f4fabb..45f3c28cf93b 100644 --- a/pkg/sql/schema_change_plan_node.go +++ b/pkg/sql/schema_change_plan_node.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scsqldeps" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" "github.com/cockroachdb/cockroach/pkg/util/retry" @@ -94,7 +95,7 @@ func (p *planner) WaitForDescriptorSchemaChanges( } blocked := false if err := p.ExecCfg().CollectionFactory.Txn( - ctx, p.ExecCfg().InternalExecutor, p.ExecCfg().DB, + ctx, p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}), p.ExecCfg().DB, func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error { if err := txn.SetFixedTimestamp(ctx, now); err != nil { return err diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index 85ce65e56fb3..823528c4a375 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -128,9 +128,9 @@ func NewSchemaChangerForTesting( // Note that this doesn't end up actually being session-bound but that's // good enough for testing. ieFactory: func( - ctx context.Context, sd *sessiondata.SessionData, + ctx context.Context, initInternalExecutor func(sqlutil.InternalExecutor), ) sqlutil.InternalExecutor { - return execCfg.InternalExecutor + return execCfg.InternalExecutorFactory(ctx, initInternalExecutor) }, metrics: NewSchemaChangerMetrics(), clock: db.Clock(), @@ -586,8 +586,9 @@ func (sc *SchemaChanger) exec(ctx context.Context) error { // If there are any names to drain, then drain them. if len(desc.GetDrainingNames()) > 0 { + ie := sc.execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) if err := drainNamesForDescriptor( - ctx, desc.GetID(), sc.execCfg.CollectionFactory, sc.db, sc.execCfg.InternalExecutor, + ctx, desc.GetID(), sc.execCfg.CollectionFactory, sc.db, ie, sc.execCfg.Codec, sc.testingKnobs.OldNamesDrainedNotification, ); err != nil { return err @@ -1040,7 +1041,8 @@ func (sc *SchemaChanger) done(ctx context.Context) error { // descriptor updates are published. var didUpdate bool var depMutationJobs []jobspb.JobID - err := sc.execCfg.CollectionFactory.Txn(ctx, sc.execCfg.InternalExecutor, sc.db, func( + ie := sc.execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + err := sc.execCfg.CollectionFactory.Txn(ctx, ie, sc.db, func( ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, ) error { var err error @@ -1338,7 +1340,7 @@ func (sc *SchemaChanger) done(ctx context.Context) error { // If any operations was skipped because a mutation was made // redundant due to a column getting dropped later on then we should // wait for those jobs to complete before returning our result back. - if err := sc.jobRegistry.WaitForJobs(ctx, sc.execCfg.InternalExecutor, depMutationJobs); err != nil { + if err := sc.jobRegistry.WaitForJobs(ctx, ie, depMutationJobs); err != nil { return errors.Wrap(err, "A dependent transaction failed for this schema change") } @@ -1930,7 +1932,8 @@ func (*SchemaChangerTestingKnobs) ModuleTestingKnobs() {} func (sc *SchemaChanger) txn( ctx context.Context, f func(context.Context, *kv.Txn, *descs.Collection) error, ) error { - return sc.execCfg.CollectionFactory.Txn(ctx, sc.execCfg.InternalExecutor, sc.db, f) + return sc.execCfg.CollectionFactory.Txn(ctx, sc.execCfg.InternalExecutorFactory(ctx, + func(ie sqlutil.InternalExecutor) {}), sc.db, f) } // createSchemaChangeEvalCtx creates an extendedEvalContext() to be used for backfills. @@ -1949,7 +1952,9 @@ func createSchemaChangeEvalCtx( ) extendedEvalContext { sd := NewFakeSessionData(execCfg.SV()) - ie := ieFactory(ctx, sd) + ie := ieFactory(ctx, func(ex sqlutil.InternalExecutor) { + ex.SetSessionData(sd) + }) evalCtx := extendedEvalContext{ // Make a session tracing object on-the-fly. This is OK @@ -2057,8 +2062,8 @@ func (r schemaChangeResumer) Resume(ctx context.Context, execCtx interface{}) er clock: p.ExecCfg().Clock, settings: p.ExecCfg().Settings, execCfg: p.ExecCfg(), - ieFactory: func(ctx context.Context, sd *sessiondata.SessionData) sqlutil.InternalExecutor { - return r.job.MakeSessionBoundInternalExecutor(ctx, sd) + ieFactory: func(ctx context.Context, initInternalExecutor func(ie sqlutil.InternalExecutor)) sqlutil.InternalExecutor { + return r.job.MakeSessionBoundInternalExecutor(ctx, initInternalExecutor) }, metrics: p.ExecCfg().SchemaChangerMetrics, } @@ -2243,8 +2248,8 @@ func (r schemaChangeResumer) OnFailOrCancel(ctx context.Context, execCtx interfa clock: p.ExecCfg().Clock, settings: p.ExecCfg().Settings, execCfg: p.ExecCfg(), - ieFactory: func(ctx context.Context, sd *sessiondata.SessionData) sqlutil.InternalExecutor { - return r.job.MakeSessionBoundInternalExecutor(ctx, sd) + ieFactory: func(ctx context.Context, initInternalExecutor func(executor sqlutil.InternalExecutor)) sqlutil.InternalExecutor { + return r.job.MakeSessionBoundInternalExecutor(ctx, initInternalExecutor) }, } diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index c7425d36f799..efe1c7a1c0ae 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -43,6 +43,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/gcjob" "github.com/cockroachdb/cockroach/pkg/sql/sqltestutils" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/startupmigrations" @@ -98,7 +99,7 @@ func TestSchemaChangeProcess(t *testing.T) { execCfg.NodeID, execCfg.DB, execCfg.Clock, - execCfg.InternalExecutor, + execCfg.InternalExecutorFactory(context.Background(), func(ie sqlutil.InternalExecutor) {}), execCfg.Settings, execCfg.Codec, lease.ManagerTestingKnobs{}, diff --git a/pkg/sql/schemachanger/scjob/BUILD.bazel b/pkg/sql/schemachanger/scjob/BUILD.bazel index 9f73cbe91bda..9394a6c56812 100644 --- a/pkg/sql/schemachanger/scjob/BUILD.bazel +++ b/pkg/sql/schemachanger/scjob/BUILD.bazel @@ -16,6 +16,7 @@ go_library( "//pkg/sql/schemachanger/scpb", "//pkg/sql/schemachanger/scrun", "//pkg/sql/schemachanger/scsqldeps", + "//pkg/sql/sqlutil", "//pkg/util/log/eventpb", ], ) diff --git a/pkg/sql/schemachanger/scjob/job.go b/pkg/sql/schemachanger/scjob/job.go index 706f4332b2bd..4f71a1585f34 100644 --- a/pkg/sql/schemachanger/scjob/job.go +++ b/pkg/sql/schemachanger/scjob/job.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scpb" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scrun" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scsqldeps" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" ) @@ -70,7 +71,7 @@ func (n *newSchemaChangeResumer) run(ctx context.Context, execCtxI interface{}) deps := scdeps.NewJobRunDependencies( execCfg.CollectionFactory, execCfg.DB, - execCfg.InternalExecutor, + execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}), execCfg.IndexBackfiller, func(ctx context.Context, txn *kv.Txn, depth int, descID descpb.ID, metadata scpb.ElementMetadata, event eventpb.EventPayload) error { return sql.LogEventForSchemaChanger(ctx, execCtx.ExecCfg(), txn, depth, descID, metadata, event) diff --git a/pkg/sql/schemachanger/scsqldeps/index_validator.go b/pkg/sql/schemachanger/scsqldeps/index_validator.go index 2a1a414c946f..155c059a7840 100644 --- a/pkg/sql/schemachanger/scsqldeps/index_validator.go +++ b/pkg/sql/schemachanger/scsqldeps/index_validator.go @@ -75,7 +75,9 @@ func (iv indexValidator) ValidateForwardIndexes( if err != nil { return err } - return fn(ctx, validationTxn, iv.ieFactory(ctx, iv.newFakeSessionData(&iv.settings.SV))) + return fn(ctx, validationTxn, iv.ieFactory(ctx, func(ie sqlutil.InternalExecutor) { + ie.SetSessionData(iv.newFakeSessionData(&iv.settings.SV)) + })) } return iv.validateForwardIndexes(ctx, tableDesc, indexes, txnRunner, withFirstMutationPublic, gatherAllInvalid, override) } @@ -95,7 +97,9 @@ func (iv indexValidator) ValidateInvertedIndexes( if err != nil { return err } - return fn(ctx, validationTxn, iv.ieFactory(ctx, iv.newFakeSessionData(&iv.settings.SV))) + return fn(ctx, validationTxn, iv.ieFactory(ctx, func(ie sqlutil.InternalExecutor) { + ie.SetSessionData(iv.newFakeSessionData(&iv.settings.SV)) + })) } return iv.validateInvertedIndexes(ctx, iv.codec, tableDesc, indexes, txnRunner, gatherAllInvalid, override) } diff --git a/pkg/sql/scrub_constraint.go b/pkg/sql/scrub_constraint.go index 72f46ff99cf2..4a69e0b0b42b 100644 --- a/pkg/sql/scrub_constraint.go +++ b/pkg/sql/scrub_constraint.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/scrub" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" ) @@ -98,7 +99,8 @@ func (o *sqlCheckConstraintCheckOperation) Start(params runParams) error { } } - rows, err := params.extendedEvalCtx.ExecCfg.InternalExecutor.QueryBuffered( + ie := params.extendedEvalCtx.ExecCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + rows, err := ie.QueryBuffered( ctx, "check-constraint", params.p.txn, tree.AsStringWithFlags(sel, tree.FmtParsable), ) if err != nil { diff --git a/pkg/sql/scrub_fk.go b/pkg/sql/scrub_fk.go index 14e1ef4153f7..2e66e3a27857 100644 --- a/pkg/sql/scrub_fk.go +++ b/pkg/sql/scrub_fk.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/scrub" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" ) @@ -74,7 +75,8 @@ func (o *sqlForeignKeyCheckOperation) Start(params runParams) error { return err } - rows, err := params.extendedEvalCtx.ExecCfg.InternalExecutor.QueryBuffered( + ie := params.extendedEvalCtx.ExecCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + rows, err := ie.QueryBuffered( ctx, "scrub-fk", params.p.txn, checkQuery, ) if err != nil { @@ -93,7 +95,7 @@ func (o *sqlForeignKeyCheckOperation) Start(params runParams) error { if err != nil { return err } - rows, err := params.extendedEvalCtx.ExecCfg.InternalExecutor.QueryBuffered( + rows, err := ie.QueryBuffered( ctx, "scrub-fk", params.p.txn, checkNullsQuery, ) if err != nil { diff --git a/pkg/sql/scrub_index.go b/pkg/sql/scrub_index.go index 6e8756ff1912..1be4e3d3eb2d 100644 --- a/pkg/sql/scrub_index.go +++ b/pkg/sql/scrub_index.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/scrub" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" ) @@ -116,7 +117,8 @@ func (o *indexCheckOperation) Start(params runParams) error { colNames(pkColumns), colNames(otherColumns), o.tableDesc.GetID(), o.index.GetID(), ) - rows, err := params.extendedEvalCtx.ExecCfg.InternalExecutor.QueryBuffered( + ie := params.extendedEvalCtx.ExecCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + rows, err := ie.QueryBuffered( ctx, "scrub-index", params.p.txn, checkQuery, ) if err != nil { diff --git a/pkg/sql/set_cluster_setting.go b/pkg/sql/set_cluster_setting.go index af3da8cf7241..9bc6640c7f43 100644 --- a/pkg/sql/set_cluster_setting.go +++ b/pkg/sql/set_cluster_setting.go @@ -35,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/sessioninit" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" @@ -173,11 +174,12 @@ func (n *setClusterSettingNode) startExec(params runParams) error { execCfg := params.extendedEvalCtx.ExecCfg var expectedEncodedValue string if err := execCfg.DB.Txn(params.ctx, func(ctx context.Context, txn *kv.Txn) error { + ie := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) var reportedValue string if n.value == nil { reportedValue = "DEFAULT" expectedEncodedValue = n.setting.EncodedDefault() - if _, err := execCfg.InternalExecutor.ExecEx( + if _, err := ie.ExecEx( ctx, "reset-setting", txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()}, "DELETE FROM system.settings WHERE name = $1", n.name, @@ -193,7 +195,7 @@ func (n *setClusterSettingNode) startExec(params runParams) error { var prev tree.Datum _, isSetVersion := n.setting.(*settings.VersionSetting) if isSetVersion { - datums, err := execCfg.InternalExecutor.QueryRowEx( + datums, err := ie.QueryRowEx( ctx, "retrieve-prev-setting", txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()}, "SELECT value FROM system.settings WHERE name = $1", n.name, @@ -244,7 +246,7 @@ func (n *setClusterSettingNode) startExec(params runParams) error { } return execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { // Confirm if the version has actually changed on us. - datums, err := execCfg.InternalExecutor.QueryRowEx( + datums, err := ie.QueryRowEx( ctx, "retrieve-prev-setting", txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()}, "SELECT value FROM system.settings WHERE name = $1", n.name, @@ -260,7 +262,7 @@ func (n *setClusterSettingNode) startExec(params runParams) error { } // Only if the version has changed alter the setting. if versionIsDifferent { - _, err = execCfg.InternalExecutor.ExecEx( + _, err = ie.ExecEx( ctx, "update-setting", txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()}, `UPSERT INTO system.settings (name, value, "lastUpdated", "valueType") VALUES ($1, $2, now(), $3)`, @@ -276,7 +278,7 @@ func (n *setClusterSettingNode) startExec(params runParams) error { return err } } else { - if _, err = execCfg.InternalExecutor.ExecEx( + if _, err = ie.ExecEx( ctx, "update-setting", txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()}, `UPSERT INTO system.settings (name, value, "lastUpdated", "valueType") VALUES ($1, $2, now(), $3)`, diff --git a/pkg/sql/set_zone_config.go b/pkg/sql/set_zone_config.go index 10b060d268f8..ac95c20a54fc 100644 --- a/pkg/sql/set_zone_config.go +++ b/pkg/sql/set_zone_config.go @@ -33,6 +33,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/errorutil" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" @@ -1085,11 +1086,12 @@ func writeZoneConfig( func writeZoneConfigUpdate( ctx context.Context, txn *kv.Txn, execCfg *ExecutorConfig, update *zoneConfigUpdate, ) (numAffected int, _ error) { + ie := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) if update.value == nil { - return execCfg.InternalExecutor.Exec(ctx, "delete-zone", txn, + return ie.Exec(ctx, "delete-zone", txn, "DELETE FROM system.zones WHERE id = $1", update.id) } - return execCfg.InternalExecutor.Exec(ctx, "update-zone", txn, + return ie.Exec(ctx, "update-zone", txn, "UPSERT INTO system.zones (id, config) VALUES ($1, $2)", update.id, update.value) } diff --git a/pkg/sql/show_cluster_setting.go b/pkg/sql/show_cluster_setting.go index eb9e84092826..3d0c585fe501 100644 --- a/pkg/sql/show_cluster_setting.go +++ b/pkg/sql/show_cluster_setting.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/duration" @@ -54,7 +55,8 @@ func (p *planner) showVersionSetting( // The (slight ab)use of WithMaxAttempts achieves convenient context cancellation. return retry.WithMaxAttempts(ctx, retry.Options{}, math.MaxInt32, func() error { return p.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - datums, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.QueryRowEx( + ie := p.ExtendedEvalContext().ExecCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + datums, err := ie.QueryRowEx( ctx, "read-setting", txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()}, diff --git a/pkg/sql/show_create_clauses.go b/pkg/sql/show_create_clauses.go index 07e668f61814..7d1459873e87 100644 --- a/pkg/sql/show_create_clauses.go +++ b/pkg/sql/show_create_clauses.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" @@ -48,7 +49,8 @@ func selectComment(ctx context.Context, p PlanHookState, tableID descpb.ID) (tc query := fmt.Sprintf("SELECT type, object_id, sub_id, comment FROM system.comments WHERE object_id = %d", tableID) txn := p.ExtendedEvalContext().Txn - it, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.QueryIterator( + ie := p.ExtendedEvalContext().ExecCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + it, err := ie.QueryIterator( ctx, "show-tables-with-comment", txn, query) if err != nil { log.VEventf(ctx, 1, "%q", err) diff --git a/pkg/sql/show_create_schedule.go b/pkg/sql/show_create_schedule.go index 55af3f2dc337..02819d4f2d36 100644 --- a/pkg/sql/show_create_schedule.go +++ b/pkg/sql/show_create_schedule.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" ) @@ -43,13 +44,14 @@ func loadSchedules(params runParams, n *tree.ShowCreateSchedules) ([]*jobs.Sched var rows []tree.Datums var cols colinfo.ResultColumns + ie := params.ExecCfg().InternalExecutorFactory(params.ctx, func(ie sqlutil.InternalExecutor) {}) if n.ScheduleID != nil { sjID, err := strconv.Atoi(n.ScheduleID.String()) if err != nil { return nil, err } - datums, columns, err := params.ExecCfg().InternalExecutor.QueryRowExWithCols( + datums, columns, err := ie.QueryRowExWithCols( params.ctx, "load-schedules", params.EvalContext().Txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()}, @@ -61,7 +63,7 @@ func loadSchedules(params runParams, n *tree.ShowCreateSchedules) ([]*jobs.Sched rows = append(rows, datums) cols = columns } else { - datums, columns, err := params.ExecCfg().InternalExecutor.QueryBufferedExWithCols( + datums, columns, err := ie.QueryBufferedExWithCols( params.ctx, "load-schedules", params.EvalContext().Txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()}, @@ -114,7 +116,8 @@ func (p *planner) ShowCreateSchedule( } createStmtStr, err := ex.GetCreateScheduleStatement(ctx, - scheduledjobs.ProdJobSchedulerEnv, p.Txn(), sj, p.ExecCfg().InternalExecutor) + scheduledjobs.ProdJobSchedulerEnv, p.Txn(), sj, + p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})) if err != nil { return nil, err } diff --git a/pkg/sql/show_fingerprints.go b/pkg/sql/show_fingerprints.go index c84003a06ab3..5845205ac86e 100644 --- a/pkg/sql/show_fingerprints.go +++ b/pkg/sql/show_fingerprints.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/privilege" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/errors" ) @@ -155,7 +156,9 @@ func (n *showFingerprintsNode) Next(params runParams) (bool, error) { sql = sql + " AS OF SYSTEM TIME " + ts.AsOfSystemTime() } - fingerprintCols, err := params.extendedEvalCtx.ExecCfg.InternalExecutor.QueryRowEx( + ie := params.extendedEvalCtx.ExecCfg.InternalExecutorFactory(params.ctx, + func(ie sqlutil.InternalExecutor) {}) + fingerprintCols, err := ie.QueryRowEx( params.ctx, "hash-fingerprint", params.p.txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()}, diff --git a/pkg/sql/show_histogram.go b/pkg/sql/show_histogram.go index 617f25c91876..5cd2cdf78f59 100644 --- a/pkg/sql/show_histogram.go +++ b/pkg/sql/show_histogram.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -44,7 +45,8 @@ func (p *planner) ShowHistogram(ctx context.Context, n *tree.ShowHistogram) (pla columns: showHistogramColumns, constructor: func(ctx context.Context, p *planner) (planNode, error) { - row, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.QueryRowEx( + ie := p.ExtendedEvalContext().ExecCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + row, err := ie.QueryRowEx( ctx, "read-histogram", p.txn, diff --git a/pkg/sql/show_stats.go b/pkg/sql/show_stats.go index b34c1df56308..c0e2a26e7a71 100644 --- a/pkg/sql/show_stats.go +++ b/pkg/sql/show_stats.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/errorutil" @@ -67,7 +68,9 @@ func (p *planner) ShowTableStats(ctx context.Context, n *tree.ShowTableStats) (p // "handle" which can be used with SHOW HISTOGRAM. // TODO(yuzefovich): refactor the code to use the iterator API // (currently it is not possible due to a panic-catcher below). - rows, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.QueryBuffered( + ie := p.ExtendedEvalContext().ExecCfg.InternalExecutorFactory(ctx, + func(ie sqlutil.InternalExecutor) {}) + rows, err := ie.QueryBuffered( ctx, "read-table-stats", p.txn, diff --git a/pkg/sql/sqlstats/sslocal/sslocal_provider.go b/pkg/sql/sqlstats/sslocal/sslocal_provider.go index c79d7314b88d..6e0987a896c9 100644 --- a/pkg/sql/sqlstats/sslocal/sslocal_provider.go +++ b/pkg/sql/sqlstats/sslocal/sslocal_provider.go @@ -15,13 +15,11 @@ import ( "sort" "time" - "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/ssmemstorage" - "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/mon" @@ -50,9 +48,7 @@ var _ sqlstats.Provider = &SQLStats{} // GetController returns a sqlstats.Controller responsible for the current // SQLStats. -func (s *SQLStats) GetController( - server serverpb.SQLStatusServer, db *kv.DB, ie sqlutil.InternalExecutor, -) *Controller { +func (s *SQLStats) GetController(server serverpb.SQLStatusServer) *Controller { return NewController(s, server) } diff --git a/pkg/sql/sqlutil/internal_executor.go b/pkg/sql/sqlutil/internal_executor.go index fedcc0d28440..2ccea60490bb 100644 --- a/pkg/sql/sqlutil/internal_executor.go +++ b/pkg/sql/sqlutil/internal_executor.go @@ -118,6 +118,17 @@ type InternalExecutor interface { qargs ...interface{}, ) ([]tree.Datums, error) + // QueryBufferedExWithCols is like QueryBufferedEx, additionally returning the + // computed ResultColumns of the input query. + QueryBufferedExWithCols( + ctx context.Context, + opName string, + txn *kv.Txn, + session sessiondata.InternalExecutorOverride, + stmt string, + qargs ...interface{}, + ) ([]tree.Datums, colinfo.ResultColumns, error) + // QueryIterator executes the query, returning an iterator that can be used // to get the results. If the call is successful, the returned iterator // *must* be closed. @@ -158,6 +169,13 @@ type InternalExecutor interface { WithSyntheticDescriptors( descs []catalog.Descriptor, run func() error, ) error + + // SetSessionData binds the session variables that will be used by queries + // performed through this executor from now on. This creates a new session + // stack. + // + // SetSessionData cannot be called concurrently with query execution. + SetSessionData(*sessiondata.SessionData) } // InternalRows is an iterator interface that's exposed by the internal @@ -194,7 +212,7 @@ type InternalRows interface { // SessionBoundInternalExecutorFactory is a function that produces a "session // bound" internal executor. type SessionBoundInternalExecutorFactory func( - context.Context, *sessiondata.SessionData, + context.Context, func(InternalExecutor), ) InternalExecutor // InternalExecFn is the type of functions that operates using an internalExecutor. diff --git a/pkg/sql/temporary_schema.go b/pkg/sql/temporary_schema.go index 007359720d00..9975d64940b9 100644 --- a/pkg/sql/temporary_schema.go +++ b/pkg/sql/temporary_schema.go @@ -582,7 +582,9 @@ func (c *TemporaryObjectCleaner) doTemporaryObjectCleanup( } // Clean up temporary data for inactive sessions. - ie := c.makeSessionBoundInternalExecutor(ctx, &sessiondata.SessionData{}) + ie := c.makeSessionBoundInternalExecutor(ctx, func(ie sqlutil.InternalExecutor) { + ie.SetSessionData(&sessiondata.SessionData{}) + }) for sessionID := range sessionIDs { if _, ok := activeSessions[sessionID.Uint128]; !ok { log.Eventf(ctx, "cleaning up temporary object for session %q", sessionID) diff --git a/pkg/sql/tenant.go b/pkg/sql/tenant.go index d28f1634f29d..abb0a82a8b98 100644 --- a/pkg/sql/tenant.go +++ b/pkg/sql/tenant.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -81,8 +82,9 @@ func CreateTenantRecord( return err } + ie := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) // Insert into the tenant table and detect collisions. - if num, err := execCfg.InternalExecutor.ExecEx( + if num, err := ie.ExecEx( ctx, "create-tenant", txn, sessiondata.NodeUserSessionDataOverride, `INSERT INTO system.tenants (id, active, info) VALUES ($1, $2, $3)`, tenID, active, infoBytes, @@ -100,7 +102,7 @@ func CreateTenantRecord( if err != nil { return errors.Wrap(err, "marshaling tenant usage data") } - if num, err := execCfg.InternalExecutor.ExecEx( + if num, err := ie.ExecEx( ctx, "create-tenant-usage", txn, sessiondata.NodeUserSessionDataOverride, `INSERT INTO system.tenant_usage ( tenant_id, instance_id, next_instance_id, last_update, @@ -129,7 +131,8 @@ func CreateTenantRecord( func GetTenantRecord( ctx context.Context, execCfg *ExecutorConfig, txn *kv.Txn, tenID uint64, ) (*descpb.TenantInfo, error) { - row, err := execCfg.InternalExecutor.QueryRowEx( + ie := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + row, err := ie.QueryRowEx( ctx, "activate-tenant", txn, sessiondata.NodeUserSessionDataOverride, `SELECT info FROM system.tenants WHERE id = $1`, tenID, ) @@ -158,7 +161,8 @@ func updateTenantRecord( return err } - if num, err := execCfg.InternalExecutor.ExecEx( + ie := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + if num, err := ie.ExecEx( ctx, "activate-tenant", txn, sessiondata.NodeUserSessionDataOverride, `UPDATE system.tenants SET active = $2, info = $3 WHERE id = $1`, tenID, active, infoBytes, @@ -372,8 +376,9 @@ func GCTenantSync(ctx context.Context, execCfg *ExecutorConfig, info *descpb.Ten return errors.Wrap(err, "clear tenant") } + ie := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - if num, err := execCfg.InternalExecutor.ExecEx( + if num, err := ie.ExecEx( ctx, "delete-tenant", txn, sessiondata.NodeUserSessionDataOverride, `DELETE FROM system.tenants WHERE id = $1`, info.ID, ); err != nil { @@ -382,7 +387,7 @@ func GCTenantSync(ctx context.Context, execCfg *ExecutorConfig, info *descpb.Ten log.Fatalf(ctx, "unexpected number of rows affected: %d", num) } - if _, err := execCfg.InternalExecutor.ExecEx( + if _, err := ie.ExecEx( ctx, "delete-tenant-usage", txn, sessiondata.NodeUserSessionDataOverride, `DELETE FROM system.tenant_usage WHERE tenant_id = $1`, info.ID, ); err != nil { diff --git a/pkg/sql/truncate.go b/pkg/sql/truncate.go index 6e84b9c6682c..1164621a191a 100644 --- a/pkg/sql/truncate.go +++ b/pkg/sql/truncate.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" @@ -418,7 +419,8 @@ func (p *planner) copySplitPointsToNewIndexes( if preservedSplitsMultiple <= 0 { return nil } - row, err := p.execCfg.InternalExecutor.QueryRowEx( + ie := p.execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + row, err := ie.QueryRowEx( // Run as Root, since ordinary users can't select from this table. ctx, "count-active-nodes", nil, sessiondata.InternalExecutorOverride{User: security.RootUserName()}, "SELECT count(*) FROM crdb_internal.kv_node_status") @@ -551,8 +553,9 @@ func (p *planner) copySplitPointsToNewIndexes( func (p *planner) reassignIndexComments( ctx context.Context, table *tabledesc.Mutable, indexIDMapping map[descpb.IndexID]descpb.IndexID, ) error { + ie := p.extendedEvalCtx.ExecCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) // Check if there are any index comments that need to be updated. - row, err := p.extendedEvalCtx.ExecCfg.InternalExecutor.QueryRowEx( + row, err := ie.QueryRowEx( ctx, "update-table-comments", p.txn, @@ -569,7 +572,7 @@ func (p *planner) reassignIndexComments( } if int(tree.MustBeDInt(row[0])) > 0 { for old, new := range indexIDMapping { - if _, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.ExecEx( + if _, err := ie.ExecEx( ctx, "update-table-comments", p.txn, diff --git a/pkg/sql/type_change.go b/pkg/sql/type_change.go index b3f2b0a4b075..80864ebceea1 100644 --- a/pkg/sql/type_change.go +++ b/pkg/sql/type_change.go @@ -38,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/iterutil" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -252,9 +253,10 @@ func (t *typeSchemaChanger) exec(ctx context.Context) error { // If there are any names to drain, then do so. if len(typeDesc.GetDrainingNames()) > 0 { + ie := t.execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) if err := drainNamesForDescriptor( ctx, typeDesc.GetID(), t.execCfg.CollectionFactory, t.execCfg.DB, - t.execCfg.InternalExecutor, codec, nil, + ie, codec, nil, ); err != nil { return err } @@ -936,7 +938,8 @@ func (t *typeSchemaChanger) canRemoveEnumValue( User: security.RootUserName(), Database: dbDesc.GetName(), } - rows, err := t.execCfg.InternalExecutor.QueryRowEx(ctx, "count-value-usage", txn, override, query.String()) + ie := t.execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + rows, err := ie.QueryRowEx(ctx, "count-value-usage", txn, override, query.String()) if err != nil { return errors.Wrapf(err, validationErr, member.LogicalRepresentation) } @@ -1164,7 +1167,8 @@ func (t *typeSchemaChanger) canRemoveEnumValueFromArrayUsages( User: security.RootUserName(), Database: dbDesc.GetName(), } - rows, err := t.execCfg.InternalExecutor.QueryRowEx( + ie := t.execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + rows, err := ie.QueryRowEx( ctx, "count-array-type-value-usage", txn, @@ -1297,9 +1301,10 @@ func (t *typeChangeResumer) OnFailOrCancel(ctx context.Context, execCtx interfac return err } + ie := tc.execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) if err := drainNamesForDescriptor( ctx, tc.typeID, tc.execCfg.CollectionFactory, tc.execCfg.DB, - tc.execCfg.InternalExecutor, tc.execCfg.Codec, + ie, tc.execCfg.Codec, nil, /* beforeDrainNames */ ); err != nil { return err diff --git a/pkg/sql/unsplit.go b/pkg/sql/unsplit.go index 4ece5f0a3356..6079203ca57e 100644 --- a/pkg/sql/unsplit.go +++ b/pkg/sql/unsplit.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/errors" ) @@ -106,7 +107,9 @@ func (n *unsplitAllNode) startExec(params runParams) error { if n.index.GetID() != n.tableDesc.GetPrimaryIndexID() { indexName = n.index.GetName() } - ie := params.p.ExecCfg().InternalExecutorFactory(params.ctx, params.SessionData()) + ie := params.p.ExecCfg().InternalExecutorFactory(params.ctx, func(ie sqlutil.InternalExecutor) { + ie.SetSessionData(params.SessionData()) + }) it, err := ie.QueryIteratorEx( params.ctx, "split points query", params.p.txn, sessiondata.NoSessionDataOverride, statement, diff --git a/pkg/sql/user.go b/pkg/sql/user.go index d66788f013a1..9a62d0bc8eb3 100644 --- a/pkg/sql/user.go +++ b/pkg/sql/user.go @@ -379,7 +379,8 @@ var userLoginTimeout = settings.RegisterDurationSetting( // GetAllRoles returns a "set" (map) of Roles -> true. func (p *planner) GetAllRoles(ctx context.Context) (map[security.SQLUsername]bool, error) { query := `SELECT username FROM system.users` - it, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.QueryIteratorEx( + ie := p.ExtendedEvalContext().ExecCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + it, err := ie.QueryIteratorEx( ctx, "read-users", p.txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()}, query) @@ -410,7 +411,8 @@ func RoleExists( ctx context.Context, execCfg *ExecutorConfig, txn *kv.Txn, role security.SQLUsername, ) (bool, error) { query := `SELECT username FROM system.users WHERE username = $1` - row, err := execCfg.InternalExecutor.QueryRowEx( + ie := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}) + row, err := ie.QueryRowEx( ctx, "read-users", txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()}, query, role,