Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: refactor internal executor factory to accept init function #73193

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -686,7 +687,8 @@ func (b *backupResumer) maybeNotifyScheduledJobCompletion(
err := exec.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// We cannot rely on b.job containing created_by_id because on job
// resumption the registry does not populate the resumer's CreatedByInfo.
datums, err := exec.InternalExecutor.QueryRowEx(
ie := exec.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})
datums, err := ie.QueryRowEx(
ctx,
"lookup-schedule-info",
txn,
Expand All @@ -706,7 +708,7 @@ func (b *backupResumer) maybeNotifyScheduledJobCompletion(

scheduleID := int64(tree.MustBeDInt(datums[0]))
if err := jobs.NotifyJobTermination(
ctx, env, b.job.ID(), jobStatus, b.job.Details(), scheduleID, exec.InternalExecutor, txn); err != nil {
ctx, env, b.job.ID(), jobStatus, b.job.Details(), scheduleID, ie, txn); err != nil {
return errors.Wrapf(err,
"failed to notify schedule %d of completion of job %d", scheduleID, b.job.ID())
}
Expand Down
13 changes: 8 additions & 5 deletions pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/interval"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -820,13 +821,14 @@ func backupPlanHook(

var spans []roachpb.Span
var tenants []descpb.TenantInfoWithUsage
ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})
if backupStmt.Targets != nil && backupStmt.Targets.Tenant != (roachpb.TenantID{}) {
if !p.ExecCfg().Codec.ForSystemTenant() {
return pgerror.Newf(pgcode.InsufficientPrivilege, "only the system tenant can backup other tenants")
}

tenantInfo, err := retrieveSingleTenantMetadata(
ctx, p.ExecCfg().InternalExecutor, p.ExtendedEvalContext().Txn, backupStmt.Targets.Tenant,
ctx, ie, p.ExtendedEvalContext().Txn, backupStmt.Targets.Tenant,
)
if err != nil {
return err
Expand All @@ -842,7 +844,7 @@ func backupPlanHook(
if p.ExecCfg().Codec.ForSystemTenant() && backupStmt.Coverage() == tree.AllDescriptors {
// Include all tenants.
tenants, err = retrieveAllTenantsMetadata(
ctx, p.ExecCfg().InternalExecutor, p.ExtendedEvalContext().Txn,
ctx, ie, p.ExtendedEvalContext().Txn,
)
if err != nil {
return err
Expand Down Expand Up @@ -1185,7 +1187,7 @@ func getScheduledBackupExecutionArgsFromSchedule(
ctx context.Context,
env scheduledjobs.JobSchedulerEnv,
txn *kv.Txn,
ie *sql.InternalExecutor,
ie sqlutil.InternalExecutor,
scheduleID int64,
) (*jobs.ScheduledJob, *ScheduledBackupExecutionArgs, error) {
// Load the schedule that has spawned this job.
Expand Down Expand Up @@ -1224,8 +1226,9 @@ func planSchedulePTSChaining(
return nil
}

ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})
_, args, err := getScheduledBackupExecutionArgsFromSchedule(ctx, env,
p.ExtendedEvalContext().Txn, p.ExecCfg().InternalExecutor, backupStmt.CreatedByInfo.ID)
p.ExtendedEvalContext().Txn, ie.(*sql.InternalExecutor), backupStmt.CreatedByInfo.ID)
if err != nil {
return err
}
Expand All @@ -1244,7 +1247,7 @@ func planSchedulePTSChaining(
}

_, incArgs, err := getScheduledBackupExecutionArgsFromSchedule(ctx, env,
p.ExtendedEvalContext().Txn, p.ExecCfg().InternalExecutor, args.DependentScheduleID)
p.ExtendedEvalContext().Txn, ie.(*sql.InternalExecutor), args.DependentScheduleID)
if err != nil {
// If we are unable to resolve the dependent incremental schedule (it
// could have been dropped) we do not need to perform any chaining.
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/backupccl/backup_planning_tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -76,7 +76,7 @@ func tenantMetadataFromRow(row tree.Datums) (descpb.TenantInfoWithUsage, error)
}

func retrieveSingleTenantMetadata(
ctx context.Context, ie *sql.InternalExecutor, txn *kv.Txn, tenantID roachpb.TenantID,
ctx context.Context, ie sqlutil.InternalExecutor, txn *kv.Txn, tenantID roachpb.TenantID,
) (descpb.TenantInfoWithUsage, error) {
row, err := ie.QueryRow(
ctx, "backup-lookup-tenant", txn,
Expand All @@ -96,7 +96,7 @@ func retrieveSingleTenantMetadata(
}

func retrieveAllTenantsMetadata(
ctx context.Context, ie *sql.InternalExecutor, txn *kv.Txn,
ctx context.Context, ie sqlutil.InternalExecutor, txn *kv.Txn,
) ([]descpb.TenantInfoWithUsage, error) {
rows, err := ie.QueryBuffered(
ctx, "backup-lookup-tenants", txn,
Expand Down
9 changes: 5 additions & 4 deletions pkg/ccl/backupccl/create_scheduled_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/protoreflect"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/jsonpb"
Expand Down Expand Up @@ -430,7 +431,7 @@ func doCreateBackupSchedules(
return err
}

ex := p.ExecCfg().InternalExecutor
ex := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})

unpauseOnSuccessID := jobs.InvalidScheduleID

Expand Down Expand Up @@ -509,7 +510,7 @@ func doCreateBackupSchedules(

func setDependentSchedule(
ctx context.Context,
ex *sql.InternalExecutor,
ex sqlutil.InternalExecutor,
scheduleExecutionArgs *ScheduledBackupExecutionArgs,
schedule *jobs.ScheduledJob,
dependentID int64,
Expand Down Expand Up @@ -659,8 +660,8 @@ func emitSchedule(
func checkScheduleAlreadyExists(
ctx context.Context, p sql.PlanHookState, scheduleLabel string,
) (bool, error) {

row, err := p.ExecCfg().InternalExecutor.QueryRowEx(ctx, "check-sched",
ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})
row, err := ie.QueryRowEx(ctx, "check-sched",
p.ExtendedEvalContext().Txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()},
fmt.Sprintf("SELECT count(schedule_name) FROM %s WHERE schedule_name = '%s'",
scheduledjobs.ProdJobSchedulerEnv.ScheduledJobsTableName(), scheduleLabel))
Expand Down
11 changes: 7 additions & 4 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1803,7 +1803,9 @@ func revalidateIndexes(
// since our table is offline.
var runner sqlutil.HistoricalInternalExecTxnRunner = func(ctx context.Context, fn sqlutil.InternalExecFn) error {
return execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
ie := job.MakeSessionBoundInternalExecutor(ctx, sql.NewFakeSessionData(execCfg.SV())).(*sql.InternalExecutor)
ie := job.MakeSessionBoundInternalExecutor(ctx, func(ie sqlutil.InternalExecutor) {
ie.SetSessionData(sql.NewFakeSessionData(execCfg.SV()))
})
return fn(ctx, txn, ie)
})
}
Expand Down Expand Up @@ -1895,7 +1897,8 @@ func insertStats(
}

err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := stats.InsertNewStats(ctx, execCfg.InternalExecutor, txn, latestStats); err != nil {
if err := stats.InsertNewStats(ctx, execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}),
txn, latestStats); err != nil {
return errors.Wrapf(err, "inserting stats from backup")
}
details.StatsInserted = true
Expand Down Expand Up @@ -2119,7 +2122,7 @@ func (r *restoreResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}
if details.DescriptorCoverage == tree.AllDescriptors {
// We've dropped defaultdb and postgres in the planning phase, we must
// recreate them now if the full cluster restore failed.
ie := p.ExecCfg().InternalExecutor
ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})
_, err := ie.Exec(ctx, "recreate-defaultdb", txn, "CREATE DATABASE IF NOT EXISTS defaultdb")
if err != nil {
return err
Expand Down Expand Up @@ -2665,7 +2668,7 @@ func (r *restoreResumer) restoreSystemTables(
}

func (r *restoreResumer) cleanupTempSystemTables(ctx context.Context, txn *kv.Txn) error {
executor := r.execCfg.InternalExecutor
executor := r.execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})
// Check if the temp system database has already been dropped. This can happen
// if the restore job fails after the system database has cleaned up.
checkIfDatabaseExists := "SELECT database_name FROM [SHOW DATABASES] WHERE database_name=$1"
Expand Down
6 changes: 4 additions & 2 deletions pkg/ccl/backupccl/restore_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/roleoption"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -891,7 +892,7 @@ func allocateDescriptorRewrites(
// restore.
func dropDefaultUserDBs(ctx context.Context, execCfg *sql.ExecutorConfig) error {
return sql.DescsTxn(ctx, execCfg, func(ctx context.Context, txn *kv.Txn, col *descs.Collection) error {
ie := execCfg.InternalExecutor
ie := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})
_, err := ie.Exec(ctx, "drop-defaultdb", nil, "DROP DATABASE IF EXISTS defaultdb")
if err != nil {
return err
Expand Down Expand Up @@ -1894,8 +1895,9 @@ func doRestorePlan(
if !p.ExecCfg().Codec.ForSystemTenant() {
return pgerror.Newf(pgcode.InsufficientPrivilege, "only the system tenant can restore other tenants")
}
ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})
for _, i := range tenants {
res, err := p.ExecCfg().InternalExecutor.QueryRow(
res, err := ie.QueryRow(
ctx, "restore-lookup-tenant", p.ExtendedEvalContext().Txn,
`SELECT active FROM system.tenants WHERE id = $1`, i.ID,
)
Expand Down
12 changes: 7 additions & 5 deletions pkg/ccl/backupccl/schedule_pts_chaining.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
Expand Down Expand Up @@ -56,9 +57,10 @@ func maybeUpdateSchedulePTSRecord(
}

return exec.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
ie := exec.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})
// We cannot rely on b.job containing created_by_id because on job
// resumption the registry does not populate the resumers' CreatedByInfo.
datums, err := exec.InternalExecutor.QueryRowEx(
datums, err := ie.QueryRowEx(
ctx,
"lookup-schedule-info",
txn,
Expand All @@ -77,8 +79,7 @@ func maybeUpdateSchedulePTSRecord(
}

scheduleID := int64(tree.MustBeDInt(datums[0]))
_, args, err := getScheduledBackupExecutionArgsFromSchedule(ctx, env, txn,
exec.InternalExecutor, scheduleID)
_, args, err := getScheduledBackupExecutionArgsFromSchedule(ctx, env, txn, ie, scheduleID)
if err != nil {
return errors.Wrap(err, "load scheduled job")
}
Expand Down Expand Up @@ -132,10 +133,11 @@ func manageFullBackupPTSChaining(
exec *sql.ExecutorConfig,
args *ScheduledBackupExecutionArgs,
) error {
ie := exec.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})
// Let's resolve the dependent incremental schedule as the first step. If the
// schedule has been dropped then we can avoid doing unnecessary work.
incSj, incArgs, err := getScheduledBackupExecutionArgsFromSchedule(ctx, env, txn,
exec.InternalExecutor, args.DependentScheduleID)
ie, args.DependentScheduleID)
if err != nil {
if jobs.HasScheduledJobNotFoundError(err) {
log.Warningf(ctx, "could not find dependent schedule with id %d",
Expand Down Expand Up @@ -180,7 +182,7 @@ func manageFullBackupPTSChaining(
return err
}
incSj.SetExecutionDetails(incSj.ExecutorType(), jobspb.ExecutionArguments{Args: any})
return incSj.Update(ctx, exec.InternalExecutor, txn)
return incSj.Update(ctx, ie, txn)
}

// manageIncrementalBackupPTSChaining is invoked on successful completion of an
Expand Down
9 changes: 5 additions & 4 deletions pkg/ccl/backupccl/system_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -89,7 +90,7 @@ func defaultSystemTableRestoreFunc(
txn *kv.Txn,
systemTableName, tempTableName string,
) error {
executor := execCfg.InternalExecutor
executor := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})

deleteQuery := fmt.Sprintf("DELETE FROM system.%s WHERE true", systemTableName)
opName := systemTableName + "-data-deletion"
Expand Down Expand Up @@ -117,7 +118,7 @@ func defaultSystemTableRestoreFunc(
func jobsMigrationFunc(
ctx context.Context, execCfg *sql.ExecutorConfig, txn *kv.Txn, tempTableName string,
) (err error) {
executor := execCfg.InternalExecutor
executor := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})

const statesToRevert = `('` + string(jobs.StatusRunning) + `', ` +
`'` + string(jobs.StatusPauseRequested) + `', ` +
Expand Down Expand Up @@ -191,7 +192,7 @@ func jobsRestoreFunc(
txn *kv.Txn,
systemTableName, tempTableName string,
) error {
executor := execCfg.InternalExecutor
executor := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})

// When restoring jobs, don't clear the existing table.

Expand All @@ -212,7 +213,7 @@ func settingsRestoreFunc(
txn *kv.Txn,
systemTableName, tempTableName string,
) error {
executor := execCfg.InternalExecutor
executor := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})

deleteQuery := fmt.Sprintf("DELETE FROM system.%s WHERE name <> 'version'", systemTableName)
opName := systemTableName + "-data-deletion"
Expand Down
16 changes: 9 additions & 7 deletions pkg/ccl/changefeedccl/schemafeed/schema_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,15 @@ func New(
metrics *Metrics,
) SchemaFeed {
m := &schemaFeed{
filter: schemaChangeEventFilters[events],
db: cfg.DB,
clock: cfg.DB.Clock(),
settings: cfg.Settings,
targets: targets,
leaseMgr: cfg.LeaseManager.(*lease.Manager),
ie: cfg.SessionBoundInternalExecutorFactory(ctx, &sessiondata.SessionData{}),
filter: schemaChangeEventFilters[events],
db: cfg.DB,
clock: cfg.DB.Clock(),
settings: cfg.Settings,
targets: targets,
leaseMgr: cfg.LeaseManager.(*lease.Manager),
ie: cfg.SessionBoundInternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {
ie.SetSessionData(&sessiondata.SessionData{})
}),
collectionFactory: cfg.CollectionFactory,
metrics: metrics,
}
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/importccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ go_library(
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqltelemetry",
"//pkg/sql/sqlutil",
"//pkg/sql/stats",
"//pkg/sql/types",
"//pkg/util",
Expand Down
11 changes: 7 additions & 4 deletions pkg/ccl/importccl/import_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/opt/memo"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -165,8 +166,8 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
// is in keeping with the semantics we use when creating a schema during
// sql execution. Namely, queue job in the txn which creates the schema
// desc and run once the txn has committed.
if err := p.ExecCfg().JobRegistry.Run(ctx, p.ExecCfg().InternalExecutor,
schemaMetadata.queuedSchemaJobs); err != nil {
ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})
if err := p.ExecCfg().JobRegistry.Run(ctx, ie, schemaMetadata.queuedSchemaJobs); err != nil {
return err
}

Expand Down Expand Up @@ -954,6 +955,7 @@ func (r *importResumer) publishTables(
func (r *importResumer) writeStubStatisticsForImportedTables(
ctx context.Context, execCfg *sql.ExecutorConfig, res roachpb.BulkOpSummary,
) {
ie := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})
details := r.job.Details().(jobspb.ImportDetails)
for _, tbl := range details.Tables {
if tbl.IsNew {
Expand All @@ -977,7 +979,7 @@ func (r *importResumer) writeStubStatisticsForImportedTables(
statistic.AvgSize = avgRowSize
}
// TODO(michae2): parallelize insertion of statistics.
err = stats.InsertNewStats(ctx, execCfg.InternalExecutor, nil /* txn */, statistics)
err = stats.InsertNewStats(ctx, ie, nil /* txn */, statistics)
}
if err != nil {
// Failure to create statistics should not fail the entire import.
Expand Down Expand Up @@ -1317,7 +1319,8 @@ func (r *importResumer) OnFailOrCancel(ctx context.Context, execCtx interface{})
// This would be a job to drop all the schemas, and a job to update the parent
// database descriptor.
if len(jobsToRunAfterTxnCommit) != 0 {
if err := p.ExecCfg().JobRegistry.Run(ctx, p.ExecCfg().InternalExecutor,
ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})
if err := p.ExecCfg().JobRegistry.Run(ctx, ie,
jobsToRunAfterTxnCommit); err != nil {
return errors.Wrap(err, "failed to run jobs that drop the imported schemas")
}
Expand Down
Loading