From e6c664ab02f1224a544e371267c4b1196b2fe56e Mon Sep 17 00:00:00 2001 From: Jane Xing Date: Mon, 8 Aug 2022 13:00:54 -0500 Subject: [PATCH 1/7] sql: skip release descs.Collection and schema change jobs records if are passed This commit adds a boolean field `fromOuterTxn` to the conn executor's extraTxnState. It's set true when the conn executor is run with a not-nil txn passed from the internal executor, and hence the collection and the job records, which are passed from the caller of the internal executor to the conn executor, should not be released when the conn executor close. Instead, we leave the caller to release them. This commit also changed the descriptor collection and schema changer state stored in conn executor's `ExtraTxnState` to pointer. We also deprecated `collectionFactory.MakeCollection()` with `collectionFactory.NewCollection()`. Release note: None --- .../cdcevent/rowfetcher_cache.go | 2 +- pkg/ccl/changefeedccl/changefeed_stmt.go | 2 +- pkg/sql/catalog/descs/collection.go | 15 ++++--- pkg/sql/catalog/descs/collection_test.go | 6 +-- pkg/sql/catalog/descs/factory.go | 17 ++----- pkg/sql/catalog/descs/txn.go | 10 ++--- pkg/sql/conn_executor.go | 44 ++++++++++++------- pkg/sql/conn_executor_exec.go | 8 ++-- pkg/sql/distsql/server.go | 2 +- pkg/sql/planner.go | 2 +- pkg/sql/row/row_converter.go | 2 +- pkg/sql/temporary_schema.go | 2 +- 12 files changed, 60 insertions(+), 52 deletions(-) diff --git a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go index 2c54634dd52e..3f5523eeadb8 100644 --- a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go +++ b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go @@ -94,7 +94,7 @@ func newRowFetcherCache( return &rowFetcherCache{ codec: codec, leaseMgr: leaseMgr, - collection: cf.NewCollection(ctx, nil /* TemporarySchemaProvider */), + collection: cf.NewCollection(ctx, nil /* TemporarySchemaProvider */, nil /* monitor */), db: db, fetchers: cache.NewUnorderedCache(defaultCacheConfig), watchedFamilies: watchedFamilies, diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index b175c43b66e3..78e6cf64b0c6 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -1133,7 +1133,7 @@ func getQualifiedTableName( func getQualifiedTableNameObj( ctx context.Context, execCfg *sql.ExecutorConfig, txn *kv.Txn, desc catalog.TableDescriptor, ) (tree.TableName, error) { - col := execCfg.CollectionFactory.MakeCollection(ctx, nil /* TemporarySchemaProvider */, nil /* monitor */) + col := execCfg.CollectionFactory.NewCollection(ctx, nil /* TemporarySchemaProvider */, nil /* monitor */) dbDesc, err := col.Direct().MustGetDatabaseDescByID(ctx, txn, desc.GetParentID()) if err != nil { return tree.TableName{}, err diff --git a/pkg/sql/catalog/descs/collection.go b/pkg/sql/catalog/descs/collection.go index 3262c16bd430..12ea3e4e1119 100644 --- a/pkg/sql/catalog/descs/collection.go +++ b/pkg/sql/catalog/descs/collection.go @@ -39,8 +39,8 @@ import ( "github.com/cockroachdb/errors" ) -// makeCollection constructs a Collection. -func makeCollection( +// newCollection constructs a Collection. +func newCollection( ctx context.Context, leaseMgr *lease.Manager, settings *cluster.Settings, @@ -50,8 +50,8 @@ func makeCollection( virtualSchemas catalog.VirtualSchemas, temporarySchemaProvider TemporarySchemaProvider, monitor *mon.BytesMonitor, -) Collection { - return Collection{ +) *Collection { + return &Collection{ settings: settings, version: settings.Version.ActiveVersion(ctx), hydrated: hydrated, @@ -178,11 +178,16 @@ func (tc *Collection) ReleaseLeases(ctx context.Context) { func (tc *Collection) ReleaseAll(ctx context.Context) { tc.ReleaseLeases(ctx) tc.stored.reset(ctx) - tc.synthetic.reset() + tc.ResetSyntheticDescriptors() tc.deletedDescs = catalog.DescriptorIDSet{} tc.skipValidationOnWrite = false } +// ResetSyntheticDescriptors clear all syntheticDescriptors. +func (tc *Collection) ResetSyntheticDescriptors() { + tc.synthetic.reset() +} + // HasUncommittedTables returns true if the Collection contains uncommitted // tables. func (tc *Collection) HasUncommittedTables() bool { diff --git a/pkg/sql/catalog/descs/collection_test.go b/pkg/sql/catalog/descs/collection_test.go index 7024a8fce413..0fd7f38cf800 100644 --- a/pkg/sql/catalog/descs/collection_test.go +++ b/pkg/sql/catalog/descs/collection_test.go @@ -68,7 +68,7 @@ func TestCollectionWriteDescToBatch(t *testing.T) { db := s0.DB() descriptors := s0.ExecutorConfig().(sql.ExecutorConfig).CollectionFactory. - NewCollection(ctx, nil /* TemporarySchemaProvider */) + NewCollection(ctx, nil /* TemporarySchemaProvider */, nil /* Monitor */) // Note this transaction abuses the mechanisms normally required for updating // tables and is just for testing what this test intends to exercise. @@ -640,7 +640,7 @@ func TestCollectionProperlyUsesMemoryMonitoring(t *testing.T) { // Create a `Collection` with monitor hooked up. col := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig).CollectionFactory. - MakeCollection(ctx, nil /* temporarySchemaProvider */, monitor) + NewCollection(ctx, nil /* temporarySchemaProvider */, monitor) require.Equal(t, int64(0), monitor.AllocBytes()) // Read all the descriptors into `col` and assert this read will finish without error. @@ -659,7 +659,7 @@ func TestCollectionProperlyUsesMemoryMonitoring(t *testing.T) { // Repeat the process again and assert this time memory allocation will err out. col = tc.Server(0).ExecutorConfig().(sql.ExecutorConfig).CollectionFactory. - MakeCollection(ctx, nil /* temporarySchemaProvider */, monitor) + NewCollection(ctx, nil /* temporarySchemaProvider */, monitor) _, err2 := col.GetAllDescriptors(ctx, txn) require.Error(t, err2) diff --git a/pkg/sql/catalog/descs/factory.go b/pkg/sql/catalog/descs/factory.go index c77ea48d5275..655b38d1b5ca 100644 --- a/pkg/sql/catalog/descs/factory.go +++ b/pkg/sql/catalog/descs/factory.go @@ -72,24 +72,15 @@ func NewBareBonesCollectionFactory( } } -// MakeCollection constructs a Collection for the purposes of embedding. -func (cf *CollectionFactory) MakeCollection( +// NewCollection constructs a new Collection. +func (cf *CollectionFactory) NewCollection( ctx context.Context, temporarySchemaProvider TemporarySchemaProvider, monitor *mon.BytesMonitor, -) Collection { +) *Collection { if monitor == nil { // If an upstream monitor is not provided, the default, unlimited monitor will be used. // All downstream resource allocation/releases on this default monitor will then be no-ops. monitor = cf.defaultMonitor } - - return makeCollection(ctx, cf.leaseMgr, cf.settings, cf.codec, cf.hydrated, cf.systemDatabase, + return newCollection(ctx, cf.leaseMgr, cf.settings, cf.codec, cf.hydrated, cf.systemDatabase, cf.virtualSchemas, temporarySchemaProvider, monitor) } - -// NewCollection constructs a new Collection. -func (cf *CollectionFactory) NewCollection( - ctx context.Context, temporarySchemaProvider TemporarySchemaProvider, -) *Collection { - c := cf.MakeCollection(ctx, temporarySchemaProvider, nil /* monitor */) - return &c -} diff --git a/pkg/sql/catalog/descs/txn.go b/pkg/sql/catalog/descs/txn.go index 6226a8c679c1..94659d833ae9 100644 --- a/pkg/sql/catalog/descs/txn.go +++ b/pkg/sql/catalog/descs/txn.go @@ -75,13 +75,13 @@ func (cf *CollectionFactory) Txn( for { var modifiedDescriptors []lease.IDVersion var deletedDescs catalog.DescriptorIDSet - var descsCol Collection + var descsCol *Collection if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { modifiedDescriptors = nil deletedDescs = catalog.DescriptorIDSet{} - descsCol = cf.MakeCollection(ctx, nil /* temporarySchemaProvider */, nil /* monitor */) + descsCol = cf.NewCollection(ctx, nil /* temporarySchemaProvider */, nil /* monitor */) defer descsCol.ReleaseAll(ctx) - if err := f(ctx, txn, &descsCol); err != nil { + if err := f(ctx, txn, descsCol); err != nil { return err } @@ -91,12 +91,12 @@ func (cf *CollectionFactory) Txn( modifiedDescriptors = descsCol.GetDescriptorsWithNewVersion() if err := CheckSpanCountLimit( - ctx, &descsCol, cf.spanConfigSplitter, cf.spanConfigLimiter, txn, + ctx, descsCol, cf.spanConfigSplitter, cf.spanConfigLimiter, txn, ); err != nil { return err } retryErr, err := CheckTwoVersionInvariant( - ctx, db.Clock(), ie, &descsCol, txn, nil /* onRetryBackoff */) + ctx, db.Clock(), ie, descsCol, txn, nil /* onRetryBackoff */) if retryErr { return errTwoVersionInvariantViolated } diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 74ca33d1bcc0..8a80040ee2f8 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -981,10 +981,10 @@ func (s *Server) newConnExecutor( portals: make(map[string]PreparedPortal), } ex.extraTxnState.prepStmtsNamespaceMemAcc = ex.sessionMon.MakeBoundAccount() - ex.extraTxnState.descCollection = s.cfg.CollectionFactory.MakeCollection(ctx, descs.NewTemporarySchemaProvider(sdMutIterator.sds), ex.sessionMon) + ex.extraTxnState.descCollection = s.cfg.CollectionFactory.NewCollection(ctx, descs.NewTemporarySchemaProvider(sdMutIterator.sds), ex.sessionMon) ex.extraTxnState.txnRewindPos = -1 ex.extraTxnState.schemaChangeJobRecords = make(map[descpb.ID]*jobs.Record) - ex.extraTxnState.schemaChangerState = SchemaChangerState{ + ex.extraTxnState.schemaChangerState = &SchemaChangerState{ mode: ex.sessionData().NewSchemaChangerMode, } ex.queryCancelKey = pgwirecancel.MakeBackendKeyData(ex.rng, ex.server.cfg.NodeInfo.NodeID.SQLInstanceID()) @@ -1288,8 +1288,15 @@ type connExecutor struct { // the field is accessed in connExecutor's serialize function, it should be // added to txnState behind the mutex. extraTxnState struct { + // fromOuterTxn should be set true if the conn executor is run under an + // internal executor with an outer txn, which means when the conn executor + // closes, it should not release the leases of descriptor collections or + // delete schema change job records. Instead, we leave the caller of the + // internal executor to release them. + fromOuterTxn bool + // descCollection collects descriptors used by the current transaction. - descCollection descs.Collection + descCollection *descs.Collection // jobs accumulates jobs staged for execution inside the transaction. // Staging happens when executing statements that are implemented with a @@ -1412,7 +1419,7 @@ type connExecutor struct { // statements. transactionStatementsHash util.FNV64 - schemaChangerState SchemaChangerState + schemaChangerState *SchemaChangerState // shouldCollectTxnExecutionStats specifies whether the statements in // this transaction should collect execution stats. @@ -1699,19 +1706,22 @@ func (ns *prepStmtNamespace) resetTo( // transaction event, resetExtraTxnState invokes corresponding callbacks // (e.g. onTxnFinish() and onTxnRestart()). func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) { - ex.extraTxnState.jobs = nil ex.extraTxnState.firstStmtExecuted = false ex.extraTxnState.hasAdminRoleCache = HasAdminRoleCache{} - ex.extraTxnState.schemaChangerState = SchemaChangerState{ - mode: ex.sessionData().NewSchemaChangerMode, - } - for k := range ex.extraTxnState.schemaChangeJobRecords { - delete(ex.extraTxnState.schemaChangeJobRecords, k) + if ex.extraTxnState.fromOuterTxn { + ex.extraTxnState.descCollection.ResetSyntheticDescriptors() + } else { + ex.extraTxnState.descCollection.ReleaseAll(ctx) + for k := range ex.extraTxnState.schemaChangeJobRecords { + delete(ex.extraTxnState.schemaChangeJobRecords, k) + } + ex.extraTxnState.jobs = nil + ex.extraTxnState.schemaChangerState = &SchemaChangerState{ + mode: ex.sessionData().NewSchemaChangerMode, + } } - ex.extraTxnState.descCollection.ReleaseAll(ctx) - // Close all portals. for name, p := range ex.extraTxnState.prepStmtsNamespace.portals { p.close(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name) @@ -2748,7 +2758,7 @@ func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalCo }, Tracing: &ex.sessionTracing, MemMetrics: &ex.memMetrics, - Descs: &ex.extraTxnState.descCollection, + Descs: ex.extraTxnState.descCollection, TxnModesSetter: ex, Jobs: &ex.extraTxnState.jobs, SchemaChangeJobRecords: ex.extraTxnState.schemaChangeJobRecords, @@ -2787,7 +2797,7 @@ func (ex *connExecutor) resetEvalCtx(evalCtx *extendedEvalContext, txn *kv.Txn, evalCtx.Mon = ex.state.mon evalCtx.PrepareOnly = false evalCtx.SkipNormalize = false - evalCtx.SchemaChangerState = &ex.extraTxnState.schemaChangerState + evalCtx.SchemaChangerState = ex.extraTxnState.schemaChangerState // If we are retrying due to an unsatisfiable timestamp bound which is // retriable, it means we were unable to serve the previous minimum timestamp @@ -3039,7 +3049,7 @@ func (ex *connExecutor) handleWaitingForConcurrentSchemaChanges( ctx context.Context, descID descpb.ID, ) error { if err := ex.planner.waitForDescriptorSchemaChanges( - ctx, descID, ex.extraTxnState.schemaChangerState, + ctx, descID, *ex.extraTxnState.schemaChangerState, ); err != nil { return err } @@ -3275,7 +3285,7 @@ func (ex *connExecutor) notifyStatsRefresherOfNewTables(ctx context.Context) { // runPreCommitStages is part of the new schema changer infrastructure to // mutate descriptors prior to committing a SQL transaction. func (ex *connExecutor) runPreCommitStages(ctx context.Context) error { - scs := &ex.extraTxnState.schemaChangerState + scs := ex.extraTxnState.schemaChangerState if len(scs.state.Targets) == 0 { return nil } @@ -3284,7 +3294,7 @@ func (ex *connExecutor) runPreCommitStages(ctx context.Context) error { ex.planner.User(), ex.server.cfg, ex.planner.txn, - &ex.extraTxnState.descCollection, + ex.extraTxnState.descCollection, ex.planner.EvalContext(), ex.planner.ExtendedEvalContext().Tracing.KVTracingEnabled(), scs.jobID, diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index c279c15deed2..d9f233e2bdb4 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -852,7 +852,7 @@ func (ex *connExecutor) checkDescriptorTwoVersionInvariant(ctx context.Context) if err := descs.CheckSpanCountLimit( ctx, - &ex.extraTxnState.descCollection, + ex.extraTxnState.descCollection, ex.server.cfg.SpanConfigSplitter, ex.server.cfg.SpanConfigLimiter, ex.state.mu.txn, @@ -864,7 +864,7 @@ func (ex *connExecutor) checkDescriptorTwoVersionInvariant(ctx context.Context) ctx, ex.server.cfg.Clock, ex.server.cfg.InternalExecutor, - &ex.extraTxnState.descCollection, + ex.extraTxnState.descCollection, ex.state.mu.txn, inRetryBackoff, ) @@ -978,7 +978,9 @@ func (ex *connExecutor) commitSQLTransactionInternal(ctx context.Context) error // to release the leases for them so that the schema change can proceed and // we don't block the client. if descs := ex.extraTxnState.descCollection.GetDescriptorsWithNewVersion(); descs != nil { - ex.extraTxnState.descCollection.ReleaseLeases(ctx) + if !ex.extraTxnState.fromOuterTxn { + ex.extraTxnState.descCollection.ReleaseLeases(ctx) + } } return nil } diff --git a/pkg/sql/distsql/server.go b/pkg/sql/distsql/server.go index 8f810c41d007..00b8906d9117 100644 --- a/pkg/sql/distsql/server.go +++ b/pkg/sql/distsql/server.go @@ -499,7 +499,7 @@ func (ds *ServerImpl) newFlowContext( // If we weren't passed a descs.Collection, then make a new one. We are // responsible for cleaning it up and releasing any accessed descriptors // on flow cleanup. - flowCtx.Descriptors = ds.CollectionFactory.NewCollection(ctx, descs.NewTemporarySchemaProvider(evalCtx.SessionDataStack)) + flowCtx.Descriptors = ds.CollectionFactory.NewCollection(ctx, descs.NewTemporarySchemaProvider(evalCtx.SessionDataStack), nil /* monitor */) flowCtx.IsDescriptorsCleanupRequired = true flowCtx.EvalCatalogBuiltins.Init(evalCtx.Codec, evalCtx.Txn, flowCtx.Descriptors) evalCtx.CatalogBuiltins = &flowCtx.EvalCatalogBuiltins diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index 2e3a92abebee..bb2dba470749 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -332,7 +332,7 @@ func newInternalPlanner( sds := sessiondata.NewStack(sd) if params.collection == nil { - params.collection = execCfg.CollectionFactory.NewCollection(ctx, descs.NewTemporarySchemaProvider(sds)) + params.collection = execCfg.CollectionFactory.NewCollection(ctx, descs.NewTemporarySchemaProvider(sds), nil /* monitor */) } var ts time.Time diff --git a/pkg/sql/row/row_converter.go b/pkg/sql/row/row_converter.go index 00726598aa90..a435bdcfe94d 100644 --- a/pkg/sql/row/row_converter.go +++ b/pkg/sql/row/row_converter.go @@ -267,7 +267,7 @@ func (c *DatumRowConverter) getSequenceAnnotation( // TODO(postamar): give the eval.Context a useful interface // instead of cobbling a descs.Collection in this way. cf := descs.NewBareBonesCollectionFactory(evalCtx.Settings, evalCtx.Codec) - descsCol := cf.MakeCollection(evalCtx.Context, descs.NewTemporarySchemaProvider(evalCtx.SessionDataStack), nil /* monitor */) + descsCol := cf.NewCollection(evalCtx.Context, descs.NewTemporarySchemaProvider(evalCtx.SessionDataStack), nil /* monitor */) err := c.db.Txn(evalCtx.Context, func(ctx context.Context, txn *kv.Txn) error { seqNameToMetadata = make(map[string]*SequenceMetadata) seqIDToMetadata = make(map[descpb.ID]*SequenceMetadata) diff --git a/pkg/sql/temporary_schema.go b/pkg/sql/temporary_schema.go index 43ec8e920e58..a31410edbb0f 100644 --- a/pkg/sql/temporary_schema.go +++ b/pkg/sql/temporary_schema.go @@ -520,7 +520,7 @@ func (c *TemporaryObjectCleaner) doTemporaryObjectCleanup( waitTimeForCreation := TempObjectWaitInterval.Get(&c.settings.SV) // Build a set of all databases with temporary objects. var allDbDescs []catalog.DatabaseDescriptor - descsCol := c.collectionFactory.NewCollection(ctx, nil /* TemporarySchemaProvider */) + descsCol := c.collectionFactory.NewCollection(ctx, nil /* TemporarySchemaProvider */, nil /* monitor */) if err := retryFunc(ctx, func() error { var err error allDbDescs, err = descsCol.GetAllDatabaseDescriptors(ctx, txn) From 32a12faa4575bbaec39b7ed8848f6437fa2c87df Mon Sep 17 00:00:00 2001 From: Jane Xing Date: Tue, 9 Aug 2022 12:30:16 -0500 Subject: [PATCH 2/7] sql/sqlutil: change InternalExecutorFactory to an interface This commit 1. renamed the original `sqlutil.SessionBoundInternalExecutorFactory` to a more general name `sqlutil.InternalExecutorFactory`, and 2. change this factory from a function to an interface, that include a `NewInternalExecutor()` method with the same logic as the original function. Release note: none --- pkg/ccl/backupccl/restore_job.go | 2 +- .../changefeedccl/schemafeed/schema_feed.go | 2 +- pkg/jobs/jobs.go | 9 ++++-- pkg/jobs/registry.go | 12 ++++---- .../server_internal_executor_factory_test.go | 2 +- pkg/server/server_sql.go | 21 ++++++-------- pkg/sql/alter_table.go | 4 +-- pkg/sql/backfill.go | 12 ++++---- pkg/sql/descmetadata/metadata_updater.go | 20 ++++++------- pkg/sql/exec_util.go | 2 +- pkg/sql/execinfra/server_config.go | 4 +-- pkg/sql/execstats/traceanalyzer_test.go | 2 +- pkg/sql/importer/import_job.go | 2 +- pkg/sql/instrumentation.go | 3 +- pkg/sql/internal.go | 29 +++++++++++++++++++ pkg/sql/opt_exec_factory.go | 3 +- pkg/sql/planner.go | 4 +-- pkg/sql/resolve_oid.go | 4 +-- pkg/sql/schema_changer.go | 18 ++++-------- .../schemachanger/scdeps/index_validator.go | 6 ++-- pkg/sql/sql_cursor.go | 2 +- pkg/sql/sqlutil/internal_executor.go | 13 +++++---- pkg/sql/temporary_schema.go | 6 ++-- pkg/sql/unsplit.go | 2 +- 24 files changed, 103 insertions(+), 81 deletions(-) diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 9c6ae9697cae..9d5344865160 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -1581,7 +1581,7 @@ func revalidateIndexes( // since our table is offline. var runner sqlutil.HistoricalInternalExecTxnRunner = func(ctx context.Context, fn sqlutil.InternalExecFn) error { return execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - ie := job.MakeSessionBoundInternalExecutor(ctx, sql.NewFakeSessionData(execCfg.SV())).(*sql.InternalExecutor) + ie := job.MakeSessionBoundInternalExecutor(sql.NewFakeSessionData(execCfg.SV())).(*sql.InternalExecutor) return fn(ctx, txn, ie) }) } diff --git a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go index ef01d8087116..afc58244b3a1 100644 --- a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go +++ b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go @@ -92,7 +92,7 @@ func New( settings: cfg.Settings, targets: targets, leaseMgr: cfg.LeaseManager.(*lease.Manager), - ie: cfg.SessionBoundInternalExecutorFactory(ctx, &sessiondata.SessionData{}), + ie: cfg.InternalExecutorFactory.NewInternalExecutor(&sessiondata.SessionData{}), collectionFactory: cfg.CollectionFactory, metrics: metrics, tolerances: tolerances, diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index eb4af74cd80d..47af9b73eff5 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -718,9 +718,14 @@ func (j *Job) FractionCompleted() float32 { // sessionBoundInternalExecutorFactory for a more detailed explanation of why // this exists. func (j *Job) MakeSessionBoundInternalExecutor( - ctx context.Context, sd *sessiondata.SessionData, + sd *sessiondata.SessionData, ) sqlutil.InternalExecutor { - return j.registry.sessionBoundInternalExecutorFactory(ctx, sd) + return j.registry.internalExecutorFactory.NewInternalExecutor(sd) +} + +// GetInternalExecutorFactory returns the internal executor factory. +func (j *Job) GetInternalExecutorFactory() sqlutil.InternalExecutorFactory { + return j.registry.internalExecutorFactory } // MarkIdle marks the job as Idle. Idleness should not be toggled frequently diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 2bc90981a0a3..0ed2d228a005 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -127,7 +127,7 @@ type Registry struct { // field. Modifying the TableCollection is basically a per-query operation // and should be a per-query setting. #34304 is the issue for creating/ // improving this API. - sessionBoundInternalExecutorFactory sqlutil.SessionBoundInternalExecutorFactory + internalExecutorFactory sqlutil.InternalExecutorFactory // if non-empty, indicates path to file that prevents any job adoptions. preventAdoptionFile string @@ -226,14 +226,12 @@ func MakeRegistry( return r } -// SetSessionBoundInternalExecutorFactory sets the -// SessionBoundInternalExecutorFactory that will be used by the job registry +// SetInternalExecutorFactory sets the +// InternalExecutorFactory that will be used by the job registry // executor. We expose this separately from the constructor to avoid a circular // dependency. -func (r *Registry) SetSessionBoundInternalExecutorFactory( - factory sqlutil.SessionBoundInternalExecutorFactory, -) { - r.sessionBoundInternalExecutorFactory = factory +func (r *Registry) SetInternalExecutorFactory(factory sqlutil.InternalExecutorFactory) { + r.internalExecutorFactory = factory } // NewSpanConstrainer returns an instance of sql.SpanConstrainer as an interface{}, diff --git a/pkg/server/server_internal_executor_factory_test.go b/pkg/server/server_internal_executor_factory_test.go index 7f767c6cccc8..38b6a9afd5c3 100644 --- a/pkg/server/server_internal_executor_factory_test.go +++ b/pkg/server/server_internal_executor_factory_test.go @@ -34,7 +34,7 @@ func TestInternalExecutorClearsMonitorMemory(t *testing.T) { mon := s.(*TestServer).sqlServer.internalExecutorFactoryMemMonitor ief := s.ExecutorConfig().(sql.ExecutorConfig).InternalExecutorFactory sessionData := sql.NewFakeSessionData(&s.ClusterSettings().SV) - ie := ief(ctx, sessionData) + ie := ief.NewInternalExecutor(sessionData) rows, err := ie.QueryIteratorEx(ctx, "test", nil, sessiondata.NodeUserSessionDataOverride, `SELECT 1`) require.NoError(t, err) require.Greater(t, mon.AllocBytes(), int64(0)) diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 427121f8907c..c3b2c1f4188a 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -91,7 +91,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slprovider" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" - "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics" "github.com/cockroachdb/cockroach/pkg/startupmigrations" @@ -947,18 +946,16 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { ieFactoryMonitor.StartNoReserved(ctx, pgServer.SQLServer.GetBytesMonitor()) // Now that we have a pgwire.Server (which has a sql.Server), we can close a // circular dependency between the rowexec.Server and sql.Server and set - // SessionBoundInternalExecutorFactory. The same applies for setting a + // InternalExecutorFactory. The same applies for setting a // SessionBoundInternalExecutor on the job registry. - ieFactory := func( - ctx context.Context, sessionData *sessiondata.SessionData, - ) sqlutil.InternalExecutor { - ie := sql.MakeInternalExecutor(pgServer.SQLServer, internalMemMetrics, ieFactoryMonitor) - ie.SetSessionData(sessionData) - return &ie - } + ieFactory := sql.NewInternalExecutorFactory( + pgServer.SQLServer, + internalMemMetrics, + ieFactoryMonitor, + ) - distSQLServer.ServerConfig.SessionBoundInternalExecutorFactory = ieFactory - jobRegistry.SetSessionBoundInternalExecutorFactory(ieFactory) + distSQLServer.ServerConfig.InternalExecutorFactory = ieFactory + jobRegistry.SetInternalExecutorFactory(ieFactory) execCfg.IndexBackfiller = sql.NewIndexBackfiller(execCfg) execCfg.IndexMerger = sql.NewIndexBackfillerMergePlanner(execCfg) execCfg.IndexValidator = scdeps.NewIndexValidator( @@ -1070,7 +1067,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { cfg.db, codec, cfg.registry, - distSQLServer.ServerConfig.SessionBoundInternalExecutorFactory, + distSQLServer.ServerConfig.InternalExecutorFactory, cfg.sqlStatusServer, cfg.isMeta1Leaseholder, sqlExecutorTestingKnobs, diff --git a/pkg/sql/alter_table.go b/pkg/sql/alter_table.go index fbca74993427..1c738b02854a 100644 --- a/pkg/sql/alter_table.go +++ b/pkg/sql/alter_table.go @@ -611,8 +611,8 @@ func (n *alterTableNode) startExec(params runParams) error { "constraint %q in the middle of being added, try again later", t.Constraint) } if err := validateUniqueWithoutIndexConstraintInTxn( - params.ctx, params.ExecCfg().InternalExecutorFactory( - params.ctx, params.SessionData(), + params.ctx, params.ExecCfg().InternalExecutorFactory.NewInternalExecutor( + params.SessionData(), ), n.tableDesc, params.p.Txn(), params.p.User(), name, ); err != nil { return err diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index 97736aa3083c..eef01b049221 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -167,7 +167,7 @@ func (sc *SchemaChanger) makeFixedTimestampInternalExecRunner( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ) error { // We need to re-create the evalCtx since the txn may retry. - ie := sc.ieFactory(ctx, NewFakeSessionData(sc.execCfg.SV())) + ie := sc.ieFactory.NewInternalExecutor(NewFakeSessionData(sc.execCfg.SV())) return retryable(ctx, txn, ie) }) } @@ -764,7 +764,7 @@ func (sc *SchemaChanger) validateConstraints( } } else if c.IsUniqueWithoutIndex() { if err := validateUniqueWithoutIndexConstraintInTxn( - ctx, sc.ieFactory(ctx, evalCtx.SessionData()), desc, txn, evalCtx.SessionData().User(), c.GetName(), + ctx, sc.ieFactory.NewInternalExecutor(evalCtx.SessionData()), desc, txn, evalCtx.SessionData().User(), c.GetName(), ); err != nil { return err } @@ -2480,7 +2480,7 @@ func runSchemaChangesInTxn( func validateCheckInTxn( ctx context.Context, semaCtx *tree.SemaContext, - ief sqlutil.SessionBoundInternalExecutorFactory, + ief sqlutil.InternalExecutorFactory, sessionData *sessiondata.SessionData, tableDesc *tabledesc.Mutable, txn *kv.Txn, @@ -2490,7 +2490,7 @@ func validateCheckInTxn( if tableDesc.Version > tableDesc.ClusterVersion().Version { syntheticDescs = append(syntheticDescs, tableDesc) } - ie := ief(ctx, sessionData) + ie := ief.NewInternalExecutor(sessionData) return ie.WithSyntheticDescriptors(syntheticDescs, func() error { return validateCheckExpr(ctx, semaCtx, sessionData, checkExpr, tableDesc, ie, txn) }) @@ -2510,7 +2510,7 @@ func validateCheckInTxn( // reuse an existing kv.Txn safely. func validateFkInTxn( ctx context.Context, - ief sqlutil.SessionBoundInternalExecutorFactory, + ief sqlutil.InternalExecutorFactory, sd *sessiondata.SessionData, srcTable *tabledesc.Mutable, txn *kv.Txn, @@ -2543,7 +2543,7 @@ func validateFkInTxn( targetTable = syntheticTable } } - ie := ief(ctx, sd) + ie := ief.NewInternalExecutor(sd) return ie.WithSyntheticDescriptors(syntheticDescs, func() error { return validateForeignKey(ctx, srcTable, targetTable, fk, ie, txn) }) diff --git a/pkg/sql/descmetadata/metadata_updater.go b/pkg/sql/descmetadata/metadata_updater.go index 6a842390438d..ae744b64c79b 100644 --- a/pkg/sql/descmetadata/metadata_updater.go +++ b/pkg/sql/descmetadata/metadata_updater.go @@ -39,7 +39,7 @@ import ( type metadataUpdater struct { ctx context.Context txn *kv.Txn - ieFactory sqlutil.SessionBoundInternalExecutorFactory + ieFactory sqlutil.InternalExecutorFactory sessionData *sessiondata.SessionData descriptors *descs.Collection cacheEnabled bool @@ -50,7 +50,7 @@ type metadataUpdater struct { // schema objects. func NewMetadataUpdater( ctx context.Context, - ieFactory sqlutil.SessionBoundInternalExecutorFactory, + ieFactory sqlutil.InternalExecutorFactory, descriptors *descs.Collection, settings *settings.Values, txn *kv.Txn, @@ -76,7 +76,7 @@ func NewMetadataUpdater( func (mu metadataUpdater) UpsertDescriptorComment( id int64, subID int64, commentType keys.CommentType, comment string, ) error { - ie := mu.ieFactory(mu.ctx, mu.sessionData) + ie := mu.ieFactory.NewInternalExecutor(mu.sessionData) _, err := ie.ExecEx(context.Background(), fmt.Sprintf("upsert-%s-comment", commentType), mu.txn, @@ -94,7 +94,7 @@ func (mu metadataUpdater) UpsertDescriptorComment( func (mu metadataUpdater) DeleteDescriptorComment( id int64, subID int64, commentType keys.CommentType, ) error { - ie := mu.ieFactory(mu.ctx, mu.sessionData) + ie := mu.ieFactory.NewInternalExecutor(mu.sessionData) _, err := ie.ExecEx(context.Background(), fmt.Sprintf("delete-%s-comment", commentType), mu.txn, @@ -126,7 +126,7 @@ DELETE FROM system.comments _, _ = fmt.Fprintf(&buf, ", %d", id) } buf.WriteString(")") - ie := mu.ieFactory(mu.ctx, mu.sessionData) + ie := mu.ieFactory.NewInternalExecutor(mu.sessionData) _, err := ie.ExecEx(context.Background(), "delete-all-comments-for-tables", mu.txn, @@ -152,7 +152,7 @@ func (mu metadataUpdater) DeleteConstraintComment( // DeleteDatabaseRoleSettings implement scexec.DescriptorMetaDataUpdater. func (mu metadataUpdater) DeleteDatabaseRoleSettings(ctx context.Context, dbID descpb.ID) error { - ie := mu.ieFactory(mu.ctx, mu.sessionData) + ie := mu.ieFactory.NewInternalExecutor(mu.sessionData) rowsDeleted, err := ie.ExecEx(ctx, "delete-db-role-setting", mu.txn, @@ -193,7 +193,7 @@ func (mu metadataUpdater) DeleteDatabaseRoleSettings(ctx context.Context, dbID d func (mu metadataUpdater) SwapDescriptorSubComment( id int64, oldSubID int64, newSubID int64, commentType keys.CommentType, ) error { - ie := mu.ieFactory(mu.ctx, mu.sessionData) + ie := mu.ieFactory.NewInternalExecutor(mu.sessionData) _, err := ie.ExecEx(context.Background(), fmt.Sprintf("upsert-%s-comment", commentType), mu.txn, @@ -210,7 +210,7 @@ func (mu metadataUpdater) SwapDescriptorSubComment( // DeleteSchedule implement scexec.DescriptorMetadataUpdater. func (mu metadataUpdater) DeleteSchedule(ctx context.Context, scheduleID int64) error { - ie := mu.ieFactory(mu.ctx, mu.sessionData) + ie := mu.ieFactory.NewInternalExecutor(mu.sessionData) _, err := ie.ExecEx( ctx, "delete-schedule", @@ -226,7 +226,7 @@ func (mu metadataUpdater) DeleteSchedule(ctx context.Context, scheduleID int64) func (mu metadataUpdater) DeleteZoneConfig( ctx context.Context, id descpb.ID, ) (numAffected int, err error) { - ie := mu.ieFactory(mu.ctx, mu.sessionData) + ie := mu.ieFactory.NewInternalExecutor(mu.sessionData) return ie.Exec(ctx, "delete-zone", mu.txn, "DELETE FROM system.zones WHERE id = $1", id) } @@ -235,7 +235,7 @@ func (mu metadataUpdater) DeleteZoneConfig( func (mu metadataUpdater) UpsertZoneConfig( ctx context.Context, id descpb.ID, zone *zonepb.ZoneConfig, ) (numAffected int, err error) { - ie := mu.ieFactory(mu.ctx, mu.sessionData) + ie := mu.ieFactory.NewInternalExecutor(mu.sessionData) bytes, err := protoutil.Marshal(zone) if err != nil { return 0, pgerror.Wrap(err, pgcode.CheckViolation, "could not marshal zone config") diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 6198882ebbed..b07530a75807 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1306,7 +1306,7 @@ type ExecutorConfig struct { // InternalExecutorFactory is used to create an InternalExecutor binded with // SessionData and other ExtraTxnState. // This is currently only for builtin functions where we need to execute sql. - InternalExecutorFactory sqlutil.SessionBoundInternalExecutorFactory + InternalExecutorFactory sqlutil.InternalExecutorFactory // ConsistencyChecker is to generate the results in calls to // crdb_internal.check_consistency. diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go index 1a48ccb20a02..ef9def87ab9b 100644 --- a/pkg/sql/execinfra/server_config.go +++ b/pkg/sql/execinfra/server_config.go @@ -149,10 +149,10 @@ type ServerConfig struct { // Dialer for communication between SQL nodes/pods. PodNodeDialer *nodedialer.Dialer - // SessionBoundInternalExecutorFactory is used to construct session-bound + // InternalExecutorFactory is used to construct session-bound // executors. The idea is that a higher-layer binds some of the arguments // required, so that users of ServerConfig don't have to care about them. - SessionBoundInternalExecutorFactory sqlutil.SessionBoundInternalExecutorFactory + InternalExecutorFactory sqlutil.InternalExecutorFactory ExternalStorage cloud.ExternalStorageFactory ExternalStorageFromURI cloud.ExternalStorageFromURIFactory diff --git a/pkg/sql/execstats/traceanalyzer_test.go b/pkg/sql/execstats/traceanalyzer_test.go index 7aeb41902e46..169d49a1b60c 100644 --- a/pkg/sql/execstats/traceanalyzer_test.go +++ b/pkg/sql/execstats/traceanalyzer_test.go @@ -108,7 +108,7 @@ func TestTraceAnalyzer(t *testing.T) { for _, vectorizeMode := range []sessiondatapb.VectorizeExecMode{sessiondatapb.VectorizeOff, sessiondatapb.VectorizeOn} { execCtx, finishAndCollect := tracing.ContextWithRecordingSpan(ctx, execCfg.AmbientCtx.Tracer, t.Name()) defer finishAndCollect() - ie := execCfg.InternalExecutorFactory(ctx, &sessiondata.SessionData{ + ie := execCfg.InternalExecutorFactory.NewInternalExecutor(&sessiondata.SessionData{ SessionData: sessiondatapb.SessionData{ VectorizeMode: vectorizeMode, }, diff --git a/pkg/sql/importer/import_job.go b/pkg/sql/importer/import_job.go index ef4ffa4db65d..ac7f96ca6c0e 100644 --- a/pkg/sql/importer/import_job.go +++ b/pkg/sql/importer/import_job.go @@ -1121,7 +1121,7 @@ func (r *importResumer) checkVirtualConstraints( } if err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - ie := execCfg.InternalExecutorFactory(ctx, sql.NewFakeSessionData(execCfg.SV())) + ie := execCfg.InternalExecutorFactory.NewInternalExecutor(sql.NewFakeSessionData(execCfg.SV())) return ie.WithSyntheticDescriptors([]catalog.Descriptor{desc}, func() error { return sql.RevalidateUniqueConstraintsInTable(ctx, txn, user, ie, desc) }) diff --git a/pkg/sql/instrumentation.go b/pkg/sql/instrumentation.go index bf98415d2bb8..8c6b332daead 100644 --- a/pkg/sql/instrumentation.go +++ b/pkg/sql/instrumentation.go @@ -344,8 +344,7 @@ func (ih *instrumentationHelper) Finish( var bundle diagnosticsBundle if ih.collectBundle { - ie := p.extendedEvalCtx.ExecCfg.InternalExecutorFactory( - p.EvalContext().Context, + ie := p.extendedEvalCtx.ExecCfg.InternalExecutorFactory.NewInternalExecutor( p.SessionData(), ) phaseTimes := statsCollector.PhaseTimes() diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index 520cec4ea509..bb38f4439cdc 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -984,3 +984,32 @@ func (ncl *noopClientLock) RTrim(_ context.Context, pos CmdPos) { } ncl.results = ncl.results[:i] } + +// InternalExecutorFactory stored information needed to construct a new +// internal executor. +type InternalExecutorFactory struct { + server *Server + memMetrics MemoryMetrics + monitor *mon.BytesMonitor +} + +// NewInternalExecutorFactory returns a new internal executor factory. +func NewInternalExecutorFactory( + s *Server, memMetrics MemoryMetrics, monitor *mon.BytesMonitor, +) *InternalExecutorFactory { + return &InternalExecutorFactory{ + server: s, + memMetrics: memMetrics, + monitor: monitor, + } +} + +// NewInternalExecutor constructs a new internal executor. +// TODO (janexing): this should be deprecated soon. +func (ief *InternalExecutorFactory) NewInternalExecutor( + sd *sessiondata.SessionData, +) sqlutil.InternalExecutor { + ie := MakeInternalExecutor(ief.server, ief.memMetrics, ief.monitor) + ie.SetSessionData(sd) + return &ie +} diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go index ee2ddbc867d1..3ff592cd0c39 100644 --- a/pkg/sql/opt_exec_factory.go +++ b/pkg/sql/opt_exec_factory.go @@ -1165,8 +1165,7 @@ func (e *urlOutputter) finish() (url.URL, error) { func (ef *execFactory) showEnv(plan string, envOpts exec.ExplainEnvData) (exec.Node, error) { var out urlOutputter - ie := ef.planner.extendedEvalCtx.ExecCfg.InternalExecutorFactory( - ef.planner.EvalContext().Context, + ie := ef.planner.extendedEvalCtx.ExecCfg.InternalExecutorFactory.NewInternalExecutor( ef.planner.SessionData(), ) c := makeStmtEnvCollector(ef.planner.EvalContext().Context, ie.(*InternalExecutor)) diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index bb2dba470749..68e7f567fe06 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -901,7 +901,7 @@ func (p *planner) QueryRowEx( stmt string, qargs ...interface{}, ) (tree.Datums, error) { - ie := p.ExecCfg().InternalExecutorFactory(ctx, p.SessionData()) + ie := p.ExecCfg().InternalExecutorFactory.NewInternalExecutor(p.SessionData()) return ie.QueryRowEx(ctx, opName, p.Txn(), override, stmt, qargs...) } @@ -918,7 +918,7 @@ func (p *planner) QueryIteratorEx( stmt string, qargs ...interface{}, ) (eval.InternalRows, error) { - ie := p.ExecCfg().InternalExecutorFactory(ctx, p.SessionData()) + ie := p.ExecCfg().InternalExecutorFactory.NewInternalExecutor(p.SessionData()) rows, err := ie.QueryIteratorEx(ctx, opName, p.Txn(), override, stmt, qargs...) return rows.(eval.InternalRows), err } diff --git a/pkg/sql/resolve_oid.go b/pkg/sql/resolve_oid.go index 7aa67eb8f588..da9b820aad49 100644 --- a/pkg/sql/resolve_oid.go +++ b/pkg/sql/resolve_oid.go @@ -29,7 +29,7 @@ import ( func (p *planner) ResolveOIDFromString( ctx context.Context, resultType *types.T, toResolve *tree.DString, ) (_ *tree.DOid, errSafeToIgnore bool, _ error) { - ie := p.ExecCfg().InternalExecutorFactory(ctx, p.SessionData()) + ie := p.ExecCfg().InternalExecutorFactory.NewInternalExecutor(p.SessionData()) return resolveOID( ctx, p.Txn(), ie, @@ -41,7 +41,7 @@ func (p *planner) ResolveOIDFromString( func (p *planner) ResolveOIDFromOID( ctx context.Context, resultType *types.T, toResolve *tree.DOid, ) (_ *tree.DOid, errSafeToIgnore bool, _ error) { - ie := p.ExecCfg().InternalExecutorFactory(ctx, p.SessionData()) + ie := p.ExecCfg().InternalExecutorFactory.NewInternalExecutor(p.SessionData()) return resolveOID( ctx, p.Txn(), ie, diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index 8bf624bd74e5..253451721420 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -115,7 +115,7 @@ type SchemaChanger struct { clock *hlc.Clock settings *cluster.Settings execCfg *ExecutorConfig - ieFactory sqlutil.SessionBoundInternalExecutorFactory + ieFactory sqlutil.InternalExecutorFactory // mvccCompliantAddIndex is set to true early in exec if we // find that the schema change was created under the @@ -145,11 +145,7 @@ func NewSchemaChangerForTesting( execCfg: execCfg, // Note that this doesn't end up actually being session-bound but that's // good enough for testing. - ieFactory: func( - ctx context.Context, sd *sessiondata.SessionData, - ) sqlutil.InternalExecutor { - return execCfg.InternalExecutor - }, + ieFactory: execCfg.InternalExecutorFactory, metrics: NewSchemaChangerMetrics(), clock: db.Clock(), distSQLPlanner: execCfg.DistSQLPlanner, @@ -2573,10 +2569,8 @@ func (r schemaChangeResumer) Resume(ctx context.Context, execCtx interface{}) er clock: p.ExecCfg().Clock, settings: p.ExecCfg().Settings, execCfg: p.ExecCfg(), - ieFactory: func(ctx context.Context, sd *sessiondata.SessionData) sqlutil.InternalExecutor { - return r.job.MakeSessionBoundInternalExecutor(ctx, sd) - }, - metrics: p.ExecCfg().SchemaChangerMetrics, + ieFactory: r.job.GetInternalExecutorFactory(), + metrics: p.ExecCfg().SchemaChangerMetrics, } opts := retry.Options{ InitialBackoff: 20 * time.Millisecond, @@ -2763,9 +2757,7 @@ func (r schemaChangeResumer) OnFailOrCancel( clock: p.ExecCfg().Clock, settings: p.ExecCfg().Settings, execCfg: p.ExecCfg(), - ieFactory: func(ctx context.Context, sd *sessiondata.SessionData) sqlutil.InternalExecutor { - return r.job.MakeSessionBoundInternalExecutor(ctx, sd) - }, + ieFactory: r.job.GetInternalExecutorFactory(), } if r.job.Payload().FinalResumeError == nil { diff --git a/pkg/sql/schemachanger/scdeps/index_validator.go b/pkg/sql/schemachanger/scdeps/index_validator.go index f211b1f4eef1..aac2e01eff1f 100644 --- a/pkg/sql/schemachanger/scdeps/index_validator.go +++ b/pkg/sql/schemachanger/scdeps/index_validator.go @@ -54,7 +54,7 @@ type indexValidator struct { db *kv.DB codec keys.SQLCodec settings *cluster.Settings - ieFactory sqlutil.SessionBoundInternalExecutorFactory + ieFactory sqlutil.InternalExecutorFactory validateForwardIndexes ValidateForwardIndexesFn validateInvertedIndexes ValidateInvertedIndexesFn newFakeSessionData NewFakeSessionDataFn @@ -103,7 +103,7 @@ func (iv indexValidator) makeHistoricalInternalExecTxnRunner() sqlutil.Historica if err != nil { return err } - return fn(ctx, validationTxn, iv.ieFactory(ctx, iv.newFakeSessionData(&iv.settings.SV))) + return fn(ctx, validationTxn, iv.ieFactory.NewInternalExecutor(iv.newFakeSessionData(&iv.settings.SV))) } } @@ -113,7 +113,7 @@ func NewIndexValidator( db *kv.DB, codec keys.SQLCodec, settings *cluster.Settings, - ieFactory sqlutil.SessionBoundInternalExecutorFactory, + ieFactory sqlutil.InternalExecutorFactory, validateForwardIndexes ValidateForwardIndexesFn, validateInvertedIndexes ValidateInvertedIndexesFn, newFakeSessionData NewFakeSessionDataFn, diff --git a/pkg/sql/sql_cursor.go b/pkg/sql/sql_cursor.go index 7a6013294883..26b9609e1646 100644 --- a/pkg/sql/sql_cursor.go +++ b/pkg/sql/sql_cursor.go @@ -48,7 +48,7 @@ func (p *planner) DeclareCursor(ctx context.Context, s *tree.DeclareCursor) (pla return nil, pgerror.Newf(pgcode.NoActiveSQLTransaction, "DECLARE CURSOR can only be used in transaction blocks") } - ie := p.ExecCfg().InternalExecutorFactory(ctx, p.SessionData()) + ie := p.ExecCfg().InternalExecutorFactory.NewInternalExecutor(p.SessionData()) cursorName := s.Name.String() if cursor := p.sqlCursors.getCursor(cursorName); cursor != nil { return nil, pgerror.Newf(pgcode.DuplicateCursor, "cursor %q already exists", cursorName) diff --git a/pkg/sql/sqlutil/internal_executor.go b/pkg/sql/sqlutil/internal_executor.go index dcf41cd57d35..d0844db34792 100644 --- a/pkg/sql/sqlutil/internal_executor.go +++ b/pkg/sql/sqlutil/internal_executor.go @@ -196,11 +196,14 @@ type InternalRows interface { Types() colinfo.ResultColumns } -// SessionBoundInternalExecutorFactory is a function that produces a "session -// bound" internal executor. -type SessionBoundInternalExecutorFactory func( - context.Context, *sessiondata.SessionData, -) InternalExecutor +// InternalExecutorFactory is an interface that allow the creation of an +// internal executor, and run sql statement without a txn with the internal +// executor. +type InternalExecutorFactory interface { + // NewInternalExecutor constructs a new internal executor. + // TODO (janexing): this should be deprecated soon. + NewInternalExecutor(sd *sessiondata.SessionData) InternalExecutor +} // InternalExecFn is the type of functions that operates using an internalExecutor. type InternalExecFn func(ctx context.Context, txn *kv.Txn, ie InternalExecutor) error diff --git a/pkg/sql/temporary_schema.go b/pkg/sql/temporary_schema.go index a31410edbb0f..9d8bb5631202 100644 --- a/pkg/sql/temporary_schema.go +++ b/pkg/sql/temporary_schema.go @@ -404,7 +404,7 @@ type TemporaryObjectCleaner struct { settings *cluster.Settings db *kv.DB codec keys.SQLCodec - makeSessionBoundInternalExecutor sqlutil.SessionBoundInternalExecutorFactory + makeSessionBoundInternalExecutor sqlutil.InternalExecutorFactory // statusServer gives access to the SQLStatus service. statusServer serverpb.SQLStatusServer isMeta1LeaseholderFunc isMeta1LeaseholderFunc @@ -433,7 +433,7 @@ func NewTemporaryObjectCleaner( db *kv.DB, codec keys.SQLCodec, registry *metric.Registry, - makeSessionBoundInternalExecutor sqlutil.SessionBoundInternalExecutorFactory, + makeSessionBoundInternalExecutor sqlutil.InternalExecutorFactory, statusServer serverpb.SQLStatusServer, isMeta1LeaseholderFunc isMeta1LeaseholderFunc, testingKnobs ExecutorTestingKnobs, @@ -587,7 +587,7 @@ func (c *TemporaryObjectCleaner) doTemporaryObjectCleanup( } // Clean up temporary data for inactive sessions. - ie := c.makeSessionBoundInternalExecutor(ctx, &sessiondata.SessionData{}) + ie := c.makeSessionBoundInternalExecutor.NewInternalExecutor(&sessiondata.SessionData{}) for sessionID := range sessionIDs { if _, ok := activeSessions[sessionID.Uint128]; !ok { log.Eventf(ctx, "cleaning up temporary object for session %q", sessionID) diff --git a/pkg/sql/unsplit.go b/pkg/sql/unsplit.go index c7a579b303a3..dfa7f5f3443f 100644 --- a/pkg/sql/unsplit.go +++ b/pkg/sql/unsplit.go @@ -105,7 +105,7 @@ func (n *unsplitAllNode) startExec(params runParams) error { if n.index.GetID() != n.tableDesc.GetPrimaryIndexID() { indexName = n.index.GetName() } - ie := params.p.ExecCfg().InternalExecutorFactory(params.ctx, params.SessionData()) + ie := params.p.ExecCfg().InternalExecutorFactory.NewInternalExecutor(params.SessionData()) it, err := ie.QueryIteratorEx( params.ctx, "split points query", params.p.txn, sessiondata.NoSessionDataOverride, statement, From 3427691c3be6574bc75caae87e1ed89f487801a7 Mon Sep 17 00:00:00 2001 From: Jane Xing Date: Tue, 9 Aug 2022 12:43:59 -0500 Subject: [PATCH 3/7] sql: modified conn executor init logic under internal executor This commit modified how a child conn executor is initialized under internal executor. We modified the logic of initializing a conn executor under internal executor. If there's a descriptor collection, txn state, job collection and schema change job passed to the internal executor, we let the child conn executor inherit them, instead of creating a new set for itself. Release note: None --- .../protectedts/ptstorage/storage_test.go | 11 + pkg/sql/conn_executor.go | 76 +------ pkg/sql/internal.go | 195 +++++++++++++++--- pkg/sql/sqlutil/internal_executor.go | 11 + 4 files changed, 200 insertions(+), 93 deletions(-) diff --git a/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go b/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go index 82f8e7b74297..ff6c1498a40b 100644 --- a/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go +++ b/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go @@ -879,6 +879,17 @@ type wrappedInternalExecutor struct { } } +func (ie *wrappedInternalExecutor) QueryBufferedExWithCols( + ctx context.Context, + opName string, + txn *kv.Txn, + session sessiondata.InternalExecutorOverride, + stmt string, + qargs ...interface{}, +) ([]tree.Datums, colinfo.ResultColumns, error) { + panic("unimplemented") +} + var _ sqlutil.InternalExecutor = &wrappedInternalExecutor{} func (ie *wrappedInternalExecutor) Exec( diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 8a80040ee2f8..dfa34b64daaf 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -29,7 +29,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/server/telemetry" - "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descidgen" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -719,6 +718,7 @@ func (s *Server) SetupConn( ex := s.newConnExecutor( ctx, sdMutIterator, stmtBuf, clientComm, memMetrics, &s.Metrics, s.sqlStats.GetApplicationStats(sd.ApplicationName), + nil, /* postSetupFn */ ) return ConnectionHandler{ex}, nil } @@ -878,6 +878,10 @@ func (s *Server) newConnExecutor( memMetrics MemoryMetrics, srvMetrics *Metrics, applicationStats sqlstats.ApplicationStats, + // postSetupFn is to override certain field of a conn executor. + // It is set when conn executor is init under an internal executor + // with a not-nil txn. + postSetupFn func(ex *connExecutor), ) *connExecutor { // Create the various monitors. // The session monitors are started in activate(). @@ -997,76 +1001,12 @@ func (s *Server) newConnExecutor( ex.extraTxnState.hasAdminRoleCache = HasAdminRoleCache{} ex.extraTxnState.createdSequences = make(map[descpb.ID]struct{}) - ex.initPlanner(ctx, &ex.planner) - - return ex -} - -// newConnExecutorWithTxn creates a connExecutor that will execute statements -// under a higher-level txn. This connExecutor runs with a different state -// machine, much reduced from the regular one. It cannot initiate or end -// transactions (so, no BEGIN, COMMIT, ROLLBACK, no auto-commit, no automatic -// retries). -// -// If there is no error, this function also activate()s the returned -// executor, so the caller does not need to run the -// activation. However this means that run() or close() must be called -// to release resources. -func (s *Server) newConnExecutorWithTxn( - ctx context.Context, - sdMutIterator *sessionDataMutatorIterator, - stmtBuf *StmtBuf, - clientComm ClientComm, - parentMon *mon.BytesMonitor, - memMetrics MemoryMetrics, - srvMetrics *Metrics, - txn *kv.Txn, - syntheticDescs []catalog.Descriptor, - applicationStats sqlstats.ApplicationStats, -) *connExecutor { - ex := s.newConnExecutor( - ctx, - sdMutIterator, - stmtBuf, - clientComm, - memMetrics, - srvMetrics, - applicationStats, - ) - if txn.Type() == kv.LeafTxn { - // If the txn is a leaf txn it is not allowed to perform mutations. For - // sanity, set read only on the session. - ex.dataMutatorIterator.applyOnEachMutator(func(m sessionDataMutator) { - m.SetReadOnly(true) - }) + if postSetupFn != nil { + postSetupFn(ex) } - // The new transaction stuff below requires active monitors and traces, so - // we need to activate the executor now. - ex.activate(ctx, parentMon, &mon.BoundAccount{}) + ex.initPlanner(ctx, &ex.planner) - // Perform some surgery on the executor - replace its state machine and - // initialize the state. - ex.machine = fsm.MakeMachine( - BoundTxnStateTransitions, - stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.False}, - &ex.state, - ) - ex.state.resetForNewSQLTxn( - ctx, - explicitTxn, - txn.ReadTimestamp().GoTime(), - nil, /* historicalTimestamp */ - roachpb.UnspecifiedUserPriority, - tree.ReadWrite, - txn, - ex.transitionCtx, - ex.QualityOfService()) - - // Modify the Collection to match the parent executor's Collection. - // This allows the InternalExecutor to see schema changes made by the - // parent executor. - ex.extraTxnState.descCollection.SetSyntheticDescriptors(syntheticDescs) return ex } diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index bb38f4439cdc..e3afb6a90d85 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -17,11 +17,15 @@ import ( "sync" "time" + "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase" "github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants" @@ -29,9 +33,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" + "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/fsm" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" @@ -72,6 +78,14 @@ type InternalExecutor struct { // // Warning: Not safe for concurrent use from multiple goroutines. syntheticDescriptors []catalog.Descriptor + + // extraTxnState is to store extra transaction state info that + // will be passed to an internal executor. It should only be set when the + // internal executor is used under a not-nil txn. + // TODO (janexing): we will deprecate this field with *connExecutor ASAP. + // An internal executor, if used with a not nil txn, should be always coupled + // with a single connExecutor which runs all passed sql statements. + extraTxnState *extraTxnState } // WithSyntheticDescriptors sets the synthetic descriptors before running the @@ -128,8 +142,41 @@ func MakeInternalExecutorMemMonitor( // // SetSessionData cannot be called concurrently with query execution. func (ie *InternalExecutor) SetSessionData(sessionData *sessiondata.SessionData) { - ie.s.populateMinimalSessionData(sessionData) - ie.sessionDataStack = sessiondata.NewStack(sessionData) + if sessionData != nil { + ie.s.populateMinimalSessionData(sessionData) + ie.sessionDataStack = sessiondata.NewStack(sessionData) + } +} + +func (ie *InternalExecutor) runWithEx( + ctx context.Context, + txn *kv.Txn, + w ieResultWriter, + sd *sessiondata.SessionData, + stmtBuf *StmtBuf, + wg *sync.WaitGroup, + syncCallback func([]resWithPos), + errCallback func(error), +) error { + ex, err := ie.initConnEx(ctx, txn, w, sd, stmtBuf, syncCallback) + if err != nil { + return err + } + wg.Add(1) + go func() { + if err := ex.run(ctx, ie.mon, &mon.BoundAccount{} /*reserved*/, nil /* cancel */); err != nil { + sqltelemetry.RecordError(ctx, err, &ex.server.cfg.Settings.SV) + errCallback(err) + } + w.finish() + closeMode := normalClose + if txn != nil { + closeMode = externalTxnClose + } + ex.close(ctx, closeMode) + wg.Done() + }() + return nil } // initConnEx creates a connExecutor and runs it on a separate goroutine. It @@ -149,10 +196,8 @@ func (ie *InternalExecutor) initConnEx( w ieResultWriter, sd *sessiondata.SessionData, stmtBuf *StmtBuf, - wg *sync.WaitGroup, syncCallback func([]resWithPos), - errCallback func(error), -) { +) (*connExecutor, error) { clientComm := &internalClientComm{ w: w, // init lastDelivered below the position of the first result (0). @@ -180,6 +225,7 @@ func (ie *InternalExecutor) initConnEx( sds := sessiondata.NewStack(sd) sdMutIterator := ie.s.makeSessionDataMutatorIterator(sds, nil /* sessionDefaults */) var ex *connExecutor + var err error if txn == nil { ex = ie.s.newConnExecutor( ctx, @@ -189,38 +235,118 @@ func (ie *InternalExecutor) initConnEx( ie.memMetrics, &ie.s.InternalMetrics, applicationStats, + nil, /* postSetupFn */ ) } else { - ex = ie.s.newConnExecutorWithTxn( + ex, err = ie.newConnExecutorWithTxn( ctx, + txn, sdMutIterator, stmtBuf, clientComm, - ie.mon, - ie.memMetrics, - &ie.s.InternalMetrics, - txn, - ie.syntheticDescriptors, applicationStats, ) + if err != nil { + return nil, err + } } ex.executorType = executorTypeInternal + return ex, nil - wg.Add(1) - go func() { - if err := ex.run(ctx, ie.mon, &mon.BoundAccount{} /*reserved*/, nil /* cancel */); err != nil { - sqltelemetry.RecordError(ctx, err, &ex.server.cfg.Settings.SV) - errCallback(err) - } - w.finish() - closeMode := normalClose - if txn != nil { - closeMode = externalTxnClose +} + +// newConnExecutorWithTxn creates a connExecutor that will execute statements +// under a higher-level txn. This connExecutor runs with a different state +// machine, much reduced from the regular one. It cannot initiate or end +// transactions (so, no BEGIN, COMMIT, ROLLBACK, no auto-commit, no automatic +// retries). It may inherit the descriptor collection and txn state from the +// internal executor. +// +// If there is no error, this function also activate()s the returned +// executor, so the caller does not need to run the +// activation. However this means that run() or close() must be called +// to release resources. +// TODO (janexing): txn should be passed to ie.extraTxnState rather than +// as a parameter to this function. +func (ie *InternalExecutor) newConnExecutorWithTxn( + ctx context.Context, + txn *kv.Txn, + sdMutIterator *sessionDataMutatorIterator, + stmtBuf *StmtBuf, + clientComm ClientComm, + applicationStats sqlstats.ApplicationStats, +) (ex *connExecutor, err error) { + // If an internal executor is run with a not-nil txn, we may want to + // let it inherit the descriptor collection, schema change job records + // and job collections from the caller. + postSetupFn := func(ex *connExecutor) { + if ie.extraTxnState != nil { + if ie.extraTxnState.descCollection != nil { + ex.extraTxnState.descCollection = ie.extraTxnState.descCollection + ex.extraTxnState.fromOuterTxn = true + ex.extraTxnState.schemaChangeJobRecords = ie.extraTxnState.schemaChangeJobRecords + if ie.extraTxnState.jobs != nil { + ex.extraTxnState.jobs = *ie.extraTxnState.jobs + } + ex.extraTxnState.schemaChangerState = ie.extraTxnState.schemaChangerState + } } - ex.close(ctx, closeMode) - wg.Done() - }() + } + + ex = ie.s.newConnExecutor( + ctx, + sdMutIterator, + stmtBuf, + clientComm, + ie.memMetrics, + &ie.s.InternalMetrics, + applicationStats, + postSetupFn, + ) + + if txn.Type() == kv.LeafTxn { + // If the txn is a leaf txn it is not allowed to perform mutations. For + // sanity, set read only on the session. + ex.dataMutatorIterator.applyOnEachMutator(func(m sessionDataMutator) { + m.SetReadOnly(true) + }) + } + + // The new transaction stuff below requires active monitors and traces, so + // we need to activate the executor now. + ex.activate(ctx, ie.mon, &mon.BoundAccount{}) + + // Perform some surgery on the executor - replace its state machine and + // initialize the state, and its jobs and schema change job records if + // they are passed by the caller. + // The txn is always set as explicit, because when running in an outer txn, + // the conn executor inside an internal executor is generally not at liberty + // to commit the transaction. + // Thus, to disallow auto-commit and auto-retries, we make the txn + // here an explicit one. + ex.machine = fsm.MakeMachine( + BoundTxnStateTransitions, + stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.False}, + &ex.state, + ) + + ex.state.resetForNewSQLTxn( + ctx, + explicitTxn, + txn.ReadTimestamp().GoTime(), + nil, /* historicalTimestamp */ + roachpb.UnspecifiedUserPriority, + tree.ReadWrite, + txn, + ex.transitionCtx, + ex.QualityOfService()) + + // Modify the Collection to match the parent executor's Collection. + // This allows the InternalExecutor to see schema changes made by the + // parent executor. + ex.extraTxnState.descCollection.SetSyntheticDescriptors(ie.syntheticDescriptors) + return ex, err } type ieIteratorResult struct { @@ -740,7 +866,10 @@ func (ie *InternalExecutor) execInternal( errCallback := func(err error) { _ = rw.addResult(ctx, ieIteratorResult{err: err}) } - ie.initConnEx(ctx, txn, rw, sd, stmtBuf, &wg, syncCallback, errCallback) + err = ie.runWithEx(ctx, txn, rw, sd, stmtBuf, &wg, syncCallback, errCallback) + if err != nil { + return nil, err + } typeHints := make(tree.PlaceholderTypes, len(datums)) for i, d := range datums { @@ -985,6 +1114,22 @@ func (ncl *noopClientLock) RTrim(_ context.Context, pos CmdPos) { ncl.results = ncl.results[:i] } +// extraTxnState is to store extra transaction state info that +// will be passed to an internal executor when it's used under a txn context. +// It should not be exported from the sql package. +// TODO (janexing): we will deprecate this struct ASAP. It only exists as a +// stop-gap before we implement InternalExecutor.ConnExecutor to run all +// sql statements under a transaction. This struct is not ideal for an internal +// executor in that it may lead to surprising bugs whereby we forget to add +// fields here and keep them in sync. +type extraTxnState struct { + txn *kv.Txn + descCollection *descs.Collection + jobs *jobsCollection + schemaChangeJobRecords map[descpb.ID]*jobs.Record + schemaChangerState *SchemaChangerState +} + // InternalExecutorFactory stored information needed to construct a new // internal executor. type InternalExecutorFactory struct { diff --git a/pkg/sql/sqlutil/internal_executor.go b/pkg/sql/sqlutil/internal_executor.go index d0844db34792..1825c8051957 100644 --- a/pkg/sql/sqlutil/internal_executor.go +++ b/pkg/sql/sqlutil/internal_executor.go @@ -144,6 +144,17 @@ type InternalExecutor interface { qargs ...interface{}, ) (InternalRows, error) + // QueryBufferedExWithCols is like QueryBufferedEx, additionally returning the computed + // ResultColumns of the input query. + QueryBufferedExWithCols( + ctx context.Context, + opName string, + txn *kv.Txn, + session sessiondata.InternalExecutorOverride, + stmt string, + qargs ...interface{}, + ) ([]tree.Datums, colinfo.ResultColumns, error) + // WithSyntheticDescriptors sets the synthetic descriptors before running the // the provided closure and resets them afterward. Used for queries/statements // that need to use in-memory synthetic descriptors different from descriptors From 8b17f6aa167a6967e2aad20f546579d0ecefbbc6 Mon Sep 17 00:00:00 2001 From: Jane Xing Date: Wed, 3 Aug 2022 11:08:16 -0500 Subject: [PATCH 4/7] sql: introduce query functions under sql.planner Currently, the internal executor always create its own descriptor collections, txn state, job collection and etc. for its conn executor, even though it's run underneath a "parent" query. These recreation can unneccesarily reduce the query efficiency in some use cases, such as when an internal executor is used under a planner context. In this case, the internal executor is expected to inherit these info from the planner, rather than creating its own. To make this rule more explicit, this commit adds a series of query functions under `sql.planner`. Each of these functions wrap both the init of an internal executor and the query execution. In this way, the internal executor always stores the info inherited from the parent planner, and will pass it to its child conn executor. fixes #69495 Release note: None --- .../testdata/benchmark_expectations | 4 +- pkg/sql/planner.go | 92 ++++++++++++++++++- 2 files changed, 89 insertions(+), 7 deletions(-) diff --git a/pkg/bench/rttanalysis/testdata/benchmark_expectations b/pkg/bench/rttanalysis/testdata/benchmark_expectations index c9d14d2b734e..320b4427a616 100644 --- a/pkg/bench/rttanalysis/testdata/benchmark_expectations +++ b/pkg/bench/rttanalysis/testdata/benchmark_expectations @@ -67,11 +67,11 @@ exp,benchmark 2,ORMQueries/has_table_privilege_1 4,ORMQueries/has_table_privilege_3 6,ORMQueries/has_table_privilege_5 -15,ORMQueries/information_schema._pg_index_position +3,ORMQueries/information_schema._pg_index_position 2,ORMQueries/pg_attribute 2,ORMQueries/pg_class 11,ORMQueries/pg_is_other_temp_schema -23,ORMQueries/pg_is_other_temp_schema_multiple_times +18,ORMQueries/pg_is_other_temp_schema_multiple_times 4,ORMQueries/pg_my_temp_schema 4,ORMQueries/pg_my_temp_schema_multiple_times 4,ORMQueries/pg_namespace diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index 68e7f567fe06..78c7c3a5f9dc 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" @@ -39,6 +40,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/upgrade" "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" @@ -889,6 +891,26 @@ func validateDescriptor(ctx context.Context, p *planner, descriptor catalog.Desc ) } +// IsActive implements the Planner interface. +func (p *planner) IsActive(ctx context.Context, key clusterversion.Key) bool { + return p.execCfg.Settings.Version.IsActive(ctx, key) +} + +// initInternalExecutor is to initialize an internal executor with a planner. +// Note that this function should only be used when using internal executor +// to run sql statement under the planner context. +func initInternalExecutor(ctx context.Context, p *planner) sqlutil.InternalExecutor { + ie := p.ExecCfg().InternalExecutorFactory.NewInternalExecutor(p.SessionData()) + ie.(*InternalExecutor).extraTxnState = &extraTxnState{ + txn: p.Txn(), + descCollection: p.Descriptors(), + jobs: p.extendedEvalCtx.Jobs, + schemaChangeJobRecords: p.extendedEvalCtx.SchemaChangeJobRecords, + schemaChangerState: p.extendedEvalCtx.SchemaChangerState, + } + return ie +} + // QueryRowEx executes the supplied SQL statement and returns a single row, or // nil if no row is found, or an error if more that one row is returned. // @@ -901,10 +923,23 @@ func (p *planner) QueryRowEx( stmt string, qargs ...interface{}, ) (tree.Datums, error) { - ie := p.ExecCfg().InternalExecutorFactory.NewInternalExecutor(p.SessionData()) + ie := initInternalExecutor(ctx, p) return ie.QueryRowEx(ctx, opName, p.Txn(), override, stmt, qargs...) } +// ExecEx is like Exec, but allows the caller to override some session data +// fields (e.g. the user). +func (p *planner) ExecEx( + ctx context.Context, + opName string, + override sessiondata.InternalExecutorOverride, + stmt string, + qargs ...interface{}, +) (int, error) { + ie := initInternalExecutor(ctx, p) + return ie.ExecEx(ctx, opName, p.Txn(), override, stmt, qargs...) +} + // QueryIteratorEx executes the query, returning an iterator that can be used // to get the results. If the call is successful, the returned iterator // *must* be closed. @@ -918,12 +953,59 @@ func (p *planner) QueryIteratorEx( stmt string, qargs ...interface{}, ) (eval.InternalRows, error) { - ie := p.ExecCfg().InternalExecutorFactory.NewInternalExecutor(p.SessionData()) + ie := initInternalExecutor(ctx, p) rows, err := ie.QueryIteratorEx(ctx, opName, p.Txn(), override, stmt, qargs...) return rows.(eval.InternalRows), err } -// IsActive implements the Planner interface. -func (p *planner) IsActive(ctx context.Context, key clusterversion.Key) bool { - return p.execCfg.Settings.Version.IsActive(ctx, key) +// QueryBufferedEx executes the supplied SQL statement and returns the resulting +// rows (meaning all of them are buffered at once). +// The fields set in session that are set override the respective fields if they +// have previously been set through SetSessionData(). +func (p *planner) QueryBufferedEx( + ctx context.Context, + opName string, + session sessiondata.InternalExecutorOverride, + stmt string, + qargs ...interface{}, +) ([]tree.Datums, error) { + ie := initInternalExecutor(ctx, p) + return ie.QueryBufferedEx(ctx, opName, p.Txn(), session, stmt, qargs...) +} + +// QueryRowExWithCols is like QueryRowEx, additionally returning the computed +// ResultColumns of the input query. +func (p *planner) QueryRowExWithCols( + ctx context.Context, + opName string, + session sessiondata.InternalExecutorOverride, + stmt string, + qargs ...interface{}, +) (tree.Datums, colinfo.ResultColumns, error) { + ie := initInternalExecutor(ctx, p) + return ie.QueryRowExWithCols(ctx, opName, p.Txn(), session, stmt, qargs...) +} + +// QueryBufferedExWithCols is like QueryBufferedEx, additionally returning the +// computed ResultColumns of the input query. +func (p *planner) QueryBufferedExWithCols( + ctx context.Context, + opName string, + session sessiondata.InternalExecutorOverride, + stmt string, + qargs ...interface{}, +) ([]tree.Datums, colinfo.ResultColumns, error) { + ie := initInternalExecutor(ctx, p) + return ie.QueryBufferedExWithCols(ctx, opName, p.Txn(), session, stmt, qargs...) +} + +// WithInternalExecutor let user run multiple sql statements within the same +// internal executor initialized under a planner context. To run single sql +// statements, please use the query functions above. +func (p *planner) WithInternalExecutor( + ctx context.Context, + run func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error, +) error { + ie := initInternalExecutor(ctx, p) + return run(ctx, p.Txn(), ie) } From 9b23132516d972bd55e4eef6b853bc6d70ba56ba Mon Sep 17 00:00:00 2001 From: Jane Xing Date: Mon, 27 Jun 2022 17:30:42 -0400 Subject: [PATCH 5/7] sql: introduce ieFactory.RunWithouTxn() and collectionFactory.TxnWithExecutor() This commit introduces two functions that allow users to run sql statements with an internal executor. We intend to limit the usage of a real internal executor only inside these functions, instead of free-floating or hanging off certain structs. In other words, we restrict the init of an internal executor. The motivation is that if an internal executor is used to run multiple sql statements in a txn manner, these executions are expected to use the same set of info (such as descriptor collections) among their conn executor. While this rule can be easily forgot by the user of internal executors. Hence we provide an interface that wraps the initialization of internal executors with the query executions, so that the users won't need to be worried. Informs: once all existing usages of the internal executors are replaced with the new interfaces proposed here, #70888 should be solved. Release note: None --- pkg/server/server_sql.go | 3 + pkg/sql/catalog/descs/factory.go | 25 ++++++ pkg/sql/catalog/descs/txn.go | 94 ++++++++++++++++++++ pkg/sql/internal.go | 128 +++++++++++++++++++++++++++ pkg/sql/sqlutil/internal_executor.go | 7 ++ 5 files changed, 257 insertions(+) diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index c3b2c1f4188a..d1dc07aa9ac4 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -91,6 +91,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slprovider" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics" "github.com/cockroachdb/cockroach/pkg/startupmigrations" @@ -954,6 +955,8 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { ieFactoryMonitor, ) + collectionFactory.SetInternalExecutorWithTxn(ieFactory) + distSQLServer.ServerConfig.InternalExecutorFactory = ieFactory jobRegistry.SetInternalExecutorFactory(ieFactory) execCfg.IndexBackfiller = sql.NewIndexBackfiller(execCfg) diff --git a/pkg/sql/catalog/descs/factory.go b/pkg/sql/catalog/descs/factory.go index 655b38d1b5ca..3436908558f7 100644 --- a/pkg/sql/catalog/descs/factory.go +++ b/pkg/sql/catalog/descs/factory.go @@ -14,11 +14,15 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/hydrateddesc" "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/mon" ) @@ -33,6 +37,19 @@ type CollectionFactory struct { spanConfigSplitter spanconfig.Splitter spanConfigLimiter spanconfig.Limiter defaultMonitor *mon.BytesMonitor + ieFactoryWithTxn InternalExecutorFactoryWithTxn +} + +// InternalExecutorFactoryWithTxn is used to create an internal executor +// with associated extra txn state information. +// It should only be used as a field hanging off CollectionFactory. +type InternalExecutorFactoryWithTxn interface { + NewInternalExecutorWithTxn( + sd *sessiondata.SessionData, + sv *settings.Values, + txn *kv.Txn, + descCol *Collection, + ) (sqlutil.InternalExecutor, sqlutil.InternalExecutorCommitTxnFunc) } // NewCollectionFactory constructs a new CollectionFactory which holds onto @@ -84,3 +101,11 @@ func (cf *CollectionFactory) NewCollection( return newCollection(ctx, cf.leaseMgr, cf.settings, cf.codec, cf.hydrated, cf.systemDatabase, cf.virtualSchemas, temporarySchemaProvider, monitor) } + +// SetInternalExecutorWithTxn is to set the internal executor factory hanging +// off the collection factory. +func (cf *CollectionFactory) SetInternalExecutorWithTxn( + ieFactoryWithTxn InternalExecutorFactoryWithTxn, +) { + cf.ieFactoryWithTxn = ieFactoryWithTxn +} diff --git a/pkg/sql/catalog/descs/txn.go b/pkg/sql/catalog/descs/txn.go index 94659d833ae9..ee44ec4d3a0b 100644 --- a/pkg/sql/catalog/descs/txn.go +++ b/pkg/sql/catalog/descs/txn.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/retry" @@ -40,6 +41,7 @@ var errTwoVersionInvariantViolated = errors.Errorf("two version invariant violat // // The passed transaction is pre-emptively anchored to the system config key on // the system tenant. +// Deprecated: Use cf.TxnWithExecutor(). func (cf *CollectionFactory) Txn( ctx context.Context, ie sqlutil.InternalExecutor, @@ -113,6 +115,98 @@ func (cf *CollectionFactory) Txn( } } +// TxnWithExecutor enables callers to run transactions with a *Collection such that all +// retrieved immutable descriptors are properly leased and all mutable +// descriptors are handled. The function deals with verifying the two version +// invariant and retrying when it is violated. Callers need not worry that they +// write mutable descriptors multiple times. The call will explicitly wait for +// the leases to drain on old versions of descriptors modified or deleted in the +// transaction; callers do not need to call lease.WaitForOneVersion. +// It also enables using internal executor to run sql queries in a txn manner. +// +// The passed transaction is pre-emptively anchored to the system config key on +// the system tenant. +func (cf *CollectionFactory) TxnWithExecutor( + ctx context.Context, + db *kv.DB, + sd *sessiondata.SessionData, + f func(ctx context.Context, txn *kv.Txn, descriptors *Collection, ie sqlutil.InternalExecutor) error, +) error { + // Waits for descriptors that were modified, skipping + // over ones that had their descriptor wiped. + waitForDescriptors := func(modifiedDescriptors []lease.IDVersion, deletedDescs catalog.DescriptorIDSet) error { + // Wait for a single version on leased descriptors. + for _, ld := range modifiedDescriptors { + waitForNoVersion := deletedDescs.Contains(ld.ID) + retryOpts := retry.Options{ + InitialBackoff: time.Millisecond, + Multiplier: 1.5, + MaxBackoff: time.Second, + } + // Detect unpublished ones. + if waitForNoVersion { + err := cf.leaseMgr.WaitForNoVersion(ctx, ld.ID, retryOpts) + if err != nil { + return err + } + } else { + _, err := cf.leaseMgr.WaitForOneVersion(ctx, ld.ID, retryOpts) + if err != nil { + return err + } + } + } + return nil + } + for { + var modifiedDescriptors []lease.IDVersion + var deletedDescs catalog.DescriptorIDSet + var descsCol *Collection + if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + modifiedDescriptors = nil + deletedDescs = catalog.DescriptorIDSet{} + descsCol = cf.NewCollection(ctx, nil /* temporarySchemaProvider */, nil /* monitor */) + defer func() { + descsCol.ReleaseAll(ctx) + }() + + ie, commitTxnFn := cf.ieFactoryWithTxn.NewInternalExecutorWithTxn(sd, &cf.settings.SV, txn, descsCol) + if err := f(ctx, txn, descsCol, ie); err != nil { + return err + } + + if err := commitTxnFn(ctx); err != nil { + return err + } + + if err := descsCol.ValidateUncommittedDescriptors(ctx, txn); err != nil { + return err + } + modifiedDescriptors = descsCol.GetDescriptorsWithNewVersion() + + if err := CheckSpanCountLimit( + ctx, descsCol, cf.spanConfigSplitter, cf.spanConfigLimiter, txn, + ); err != nil { + return err + } + retryErr, err := CheckTwoVersionInvariant( + ctx, db.Clock(), ie, descsCol, txn, nil /* onRetryBackoff */) + if retryErr { + return errTwoVersionInvariantViolated + } + deletedDescs = descsCol.deletedDescs + return err + }); errors.Is(err, errTwoVersionInvariantViolated) { + continue + } else { + if err == nil { + err = waitForDescriptors(modifiedDescriptors, deletedDescs) + } + return err + } + } +} + // CheckTwoVersionInvariant checks whether any new schema being modified written // at a version V has only valid leases at version = V - 1. A transaction retry // error as well as a boolean is returned whenever the invariant is violated. diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index e3afb6a90d85..9b40e3707565 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" @@ -120,6 +121,52 @@ func MakeInternalExecutor( } } +// newInternalExecutorWithTxn creates an Internal Executor with txn related +// information, and also a function that can be called to commit the txn. +// This function should only be used in the implementation of +// descs.CollectionFactory's InternalExecutorFactoryWithTxn. +// TODO (janexing): This function will be soon refactored after we change +// the internal executor infrastructure with a single conn executor for all +// sql statement executions within a txn. +func newInternalExecutorWithTxn( + s *Server, + sd *sessiondata.SessionData, + txn *kv.Txn, + memMetrics MemoryMetrics, + monitor *mon.BytesMonitor, + descCol *descs.Collection, + schemaChangeJobRecords map[descpb.ID]*jobs.Record, +) (*InternalExecutor, sqlutil.InternalExecutorCommitTxnFunc) { + schemaChangerState := &SchemaChangerState{ + mode: sd.NewSchemaChangerMode, + } + ie := InternalExecutor{ + s: s, + mon: monitor, + memMetrics: memMetrics, + extraTxnState: &extraTxnState{ + txn: txn, + descCollection: descCol, + schemaChangeJobRecords: schemaChangeJobRecords, + schemaChangerState: schemaChangerState, + }, + } + ie.s.populateMinimalSessionData(sd) + ie.sessionDataStack = sessiondata.NewStack(sd) + + commitTxnFunc := func(ctx context.Context) error { + defer func() { + ie.releaseSchemaChangeJobRecords() + }() + if err := ie.commitTxn(ctx); err != nil { + return err + } + return nil + } + + return &ie, commitTxnFunc +} + // MakeInternalExecutorMemMonitor creates and starts memory monitor for an // InternalExecutor. func MakeInternalExecutorMemMonitor( @@ -966,6 +1013,42 @@ func (ie *InternalExecutor) execInternal( return r, nil } +// ReleaseSchemaChangeJobRecords is to release the schema change job records. +func (ie *InternalExecutor) releaseSchemaChangeJobRecords() { + for k := range ie.extraTxnState.schemaChangeJobRecords { + delete(ie.extraTxnState.schemaChangeJobRecords, k) + } +} + +// commitTxn is to commit the txn bound to the internal executor. +// It should only be used in CollectionFactory.TxnWithExecutor(). +func (ie *InternalExecutor) commitTxn(ctx context.Context) error { + if ie.extraTxnState == nil || ie.extraTxnState.txn == nil { + return errors.New("no txn to commit") + } + + var sd *sessiondata.SessionData + if ie.sessionDataStack != nil { + sd = ie.sessionDataStack.Top().Clone() + } else { + sd = ie.s.newSessionData(SessionArgs{}) + } + + rw := newAsyncIEResultChannel() + stmtBuf := NewStmtBuf() + + ex, err := ie.initConnEx(ctx, ie.extraTxnState.txn, rw, sd, stmtBuf, nil /* syncCallback */) + if err != nil { + return errors.Wrap(err, "cannot create conn executor to commit txn") + } + defer ex.close(ctx, externalTxnClose) + + if err := ex.commitSQLTransactionInternal(ctx); err != nil { + return err + } + return nil +} + // internalClientComm is an implementation of ClientComm used by the // InternalExecutor. Result rows are buffered in memory. type internalClientComm struct { @@ -1149,6 +1232,9 @@ func NewInternalExecutorFactory( } } +var _ sqlutil.InternalExecutorFactory = &InternalExecutorFactory{} +var _ descs.InternalExecutorFactoryWithTxn = &InternalExecutorFactory{} + // NewInternalExecutor constructs a new internal executor. // TODO (janexing): this should be deprecated soon. func (ief *InternalExecutorFactory) NewInternalExecutor( @@ -1158,3 +1244,45 @@ func (ief *InternalExecutorFactory) NewInternalExecutor( ie.SetSessionData(sd) return &ie } + +// NewInternalExecutorWithTxn creates an internal executor with txn-related info, +// such as descriptor collection and schema change job records, etc. It should +// be called only after InternalExecutorFactory.NewInternalExecutor is already +// called to construct the InternalExecutorFactory with required server info. +// This function should only be used under CollectionFactory.TxnWithExecutor(). +func (ief *InternalExecutorFactory) NewInternalExecutorWithTxn( + sd *sessiondata.SessionData, sv *settings.Values, txn *kv.Txn, descCol *descs.Collection, +) (sqlutil.InternalExecutor, sqlutil.InternalExecutorCommitTxnFunc) { + schemaChangeJobRecords := make(map[descpb.ID]*jobs.Record) + // By default, if not given session data, we initialize a sessionData that + // would be the same as what would be created if root logged in. + // The sessionData's user can be override when calling the query + // functions of internal executor. + // TODO(janexing): since we can be running queries with a higher privilege + // than the actual user, a security boundary should be added to the error + // handling of internal executor. + if sd == nil { + sd = NewFakeSessionData(sv) + sd.UserProto = username.RootUserName().EncodeProto() + } + ie, commitTxnFunc := newInternalExecutorWithTxn( + ief.server, + sd, + txn, + ief.memMetrics, + ief.monitor, + descCol, + schemaChangeJobRecords, + ) + + return ie, commitTxnFunc +} + +// RunWithoutTxn is to create an internal executor without binding to a txn, +// and run the passed function with this internal executor. +func (ief *InternalExecutorFactory) RunWithoutTxn( + ctx context.Context, run func(ctx context.Context, ie sqlutil.InternalExecutor) error, +) error { + ie := ief.NewInternalExecutor(nil /* sessionData */) + return run(ctx, ie) +} diff --git a/pkg/sql/sqlutil/internal_executor.go b/pkg/sql/sqlutil/internal_executor.go index 1825c8051957..c94f183b2b6c 100644 --- a/pkg/sql/sqlutil/internal_executor.go +++ b/pkg/sql/sqlutil/internal_executor.go @@ -214,6 +214,9 @@ type InternalExecutorFactory interface { // NewInternalExecutor constructs a new internal executor. // TODO (janexing): this should be deprecated soon. NewInternalExecutor(sd *sessiondata.SessionData) InternalExecutor + // RunWithoutTxn is to create an internal executor without binding to a txn, + // and run the passed function with this internal executor. + RunWithoutTxn(ctx context.Context, run func(ctx context.Context, ie InternalExecutor) error) error } // InternalExecFn is the type of functions that operates using an internalExecutor. @@ -223,3 +226,7 @@ type InternalExecFn func(ctx context.Context, txn *kv.Txn, ie InternalExecutor) // passes the fn the exported InternalExecutor instead of the whole unexported // extendedEvalContenxt, so it can be implemented outside pkg/sql. type HistoricalInternalExecTxnRunner func(ctx context.Context, fn InternalExecFn) error + +// InternalExecutorCommitTxnFunc is to commit the txn associated with an +// internal executor. +type InternalExecutorCommitTxnFunc func(ctx context.Context) error From 9bfd4fc5f968ce95cb7fc8b6d04cbf34ececc7b5 Mon Sep 17 00:00:00 2001 From: Jane Xing Date: Mon, 27 Jun 2022 17:40:30 -0400 Subject: [PATCH 6/7] sql: refactor adminServer.setUIData with ieFactory.RunWithoutTxn() This commit provide an example to refactor the current use cases with the new internal executor interfaces. In this example, originally, the internal executor was used with a nil txn. We now replace it with ieFactory.RunWithoutTxn(). Release Note: None Release note (): --- pkg/server/admin.go | 33 ++++++++++++++++++++------------- pkg/server/server.go | 1 + pkg/server/server_sql.go | 30 ++++++++++++++++++------------ 3 files changed, 39 insertions(+), 25 deletions(-) diff --git a/pkg/server/admin.go b/pkg/server/admin.go index 32de7ce2e0c0..42502559c689 100644 --- a/pkg/server/admin.go +++ b/pkg/server/admin.go @@ -1779,21 +1779,28 @@ func (s *adminServer) SetUIData( for key, val := range req.KeyValues { // Do an upsert of the key. We update each key in a separate transaction to // avoid long-running transactions and possible deadlocks. - query := `UPSERT INTO system.ui (key, value, "lastUpdated") VALUES ($1, $2, now())` - rowsAffected, err := s.server.sqlServer.internalExecutor.ExecEx( - ctx, "admin-set-ui-data", nil, /* txn */ - sessiondata.InternalExecutorOverride{ - User: username.RootUserName(), - }, - query, makeUIKey(userName, key), val) - if err != nil { - return nil, serverError(ctx, err) - } - if rowsAffected != 1 { - return nil, serverErrorf(ctx, "rows affected %d != expected %d", rowsAffected, 1) + + if err := s.server.sqlServer.internalExecutorFactory.RunWithoutTxn(ctx, func( + ctx context.Context, ie sqlutil.InternalExecutor, + ) error { + query := `UPSERT INTO system.ui (key, value, "lastUpdated") VALUES ($1, $2, now())` + rowsAffected, err := ie.ExecEx( + ctx, "admin-set-ui-data", nil, /* txn */ + sessiondata.InternalExecutorOverride{ + User: username.RootUserName(), + }, + query, makeUIKey(userName, key), val) + if err != nil { + return serverError(ctx, err) + } + if rowsAffected != 1 { + return serverErrorf(ctx, "rows affected %d != expected %d", rowsAffected, 1) + } + return nil + }); err != nil { + return nil, err } } - return &serverpb.SetUIDataResponse{}, nil } diff --git a/pkg/server/server.go b/pkg/server/server.go index 98aa504b633c..08afc206010b 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -803,6 +803,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { closedSessionCache: closedSessionCache, flowScheduler: flowScheduler, circularInternalExecutor: internalExecutor, + internalExecutorFactory: nil, // will be initialized in server.newSQLServer. circularJobRegistry: jobRegistry, jobAdoptionStopFile: jobAdoptionStopFile, protectedtsProvider: protectedtsProvider, diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index d1dc07aa9ac4..3ccc6ad8d69f 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -130,18 +130,19 @@ import ( // standalone SQLServer instances per tenant (the KV layer is shared across all // tenants). type SQLServer struct { - ambientCtx log.AmbientContext - stopper *stop.Stopper - sqlIDContainer *base.SQLIDContainer - pgServer *pgwire.Server - distSQLServer *distsql.ServerImpl - execCfg *sql.ExecutorConfig - cfg *BaseConfig - internalExecutor *sql.InternalExecutor - leaseMgr *lease.Manager - blobService *blobs.Service - tracingService *service.Service - tenantConnect kvtenant.Connector + ambientCtx log.AmbientContext + stopper *stop.Stopper + sqlIDContainer *base.SQLIDContainer + pgServer *pgwire.Server + distSQLServer *distsql.ServerImpl + execCfg *sql.ExecutorConfig + cfg *BaseConfig + internalExecutor *sql.InternalExecutor + internalExecutorFactory sqlutil.InternalExecutorFactory + leaseMgr *lease.Manager + blobService *blobs.Service + tracingService *service.Service + tenantConnect kvtenant.Connector // sessionRegistry can be queried for info on running SQL sessions. It is // shared between the sql.Server and the statusServer. sessionRegistry *sql.SessionRegistry @@ -303,6 +304,9 @@ type sqlServerArgs struct { // TODO(tbg): make this less hacky. circularInternalExecutor *sql.InternalExecutor // empty initially + // internalExecutorFactory is to initialize an internal executor. + internalExecutorFactory sqlutil.InternalExecutorFactory + // Stores and deletes expired liveness sessions. sqlLivenessProvider sqlliveness.Provider @@ -978,6 +982,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { cfg.registry.AddMetricStruct(m) } *cfg.circularInternalExecutor = sql.MakeInternalExecutor(pgServer.SQLServer, internalMemMetrics, ieFactoryMonitor) + cfg.internalExecutorFactory = ieFactory execCfg.InternalExecutor = cfg.circularInternalExecutor stmtDiagnosticsRegistry := stmtdiagnostics.NewRegistry( cfg.circularInternalExecutor, @@ -1118,6 +1123,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { distSQLServer: distSQLServer, execCfg: execCfg, internalExecutor: cfg.circularInternalExecutor, + internalExecutorFactory: cfg.internalExecutorFactory, leaseMgr: leaseMgr, blobService: blobService, tracingService: tracingService, From afb5db4cf1afad8eba265e2238a0e175b936e79c Mon Sep 17 00:00:00 2001 From: Jane Xing Date: Mon, 27 Jun 2022 17:41:20 -0400 Subject: [PATCH 7/7] sql: refactor validation with collectionFactory.WithTxn() and planner.WithTxn() This commit is to provides example to refactor the usages of internal executor with the new interfaces. Idealy, if a planner is involved, use the query functions for `sql.planner`. Otherwise, if the query is to run with a not-nil txn, we should use collectionFactory.WithTxn(). Release note: None --- pkg/sql/alter_table.go | 37 ++++---- pkg/sql/backfill.go | 184 +++++++++++++++++++++++------------- pkg/sql/check.go | 15 ++- pkg/sql/index_backfiller.go | 3 +- pkg/sql/mvcc_backfiller.go | 3 +- pkg/sql/schema_changer.go | 24 ++++- 6 files changed, 170 insertions(+), 96 deletions(-) diff --git a/pkg/sql/alter_table.go b/pkg/sql/alter_table.go index 1c738b02854a..f7f6d8e7212d 100644 --- a/pkg/sql/alter_table.go +++ b/pkg/sql/alter_table.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/sql/catalog" @@ -43,6 +44,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/sql/storageparam" "github.com/cockroachdb/cockroach/pkg/sql/storageparam/tablestorageparam" @@ -558,10 +560,9 @@ func (n *alterTableNode) startExec(params runParams) error { return pgerror.Newf(pgcode.ObjectNotInPrerequisiteState, "constraint %q in the middle of being added, try again later", t.Constraint) } - if err := validateCheckInTxn( - params.ctx, ¶ms.p.semaCtx, params.ExecCfg().InternalExecutorFactory, - params.SessionData(), n.tableDesc, params.p.Txn(), ck.Expr, - ); err != nil { + if err := params.p.WithInternalExecutor(params.ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { + return validateCheckInTxn(ctx, ¶ms.p.semaCtx, params.p.SessionData(), n.tableDesc, txn, ie, ck.Expr) + }); err != nil { return err } ck.Validity = descpb.ConstraintValidity_Validated @@ -581,19 +582,12 @@ func (n *alterTableNode) startExec(params runParams) error { return pgerror.Newf(pgcode.ObjectNotInPrerequisiteState, "constraint %q in the middle of being added, try again later", t.Constraint) } - if err := validateFkInTxn( - params.ctx, - params.ExecCfg().InternalExecutorFactory, - params.p.SessionData(), - n.tableDesc, - params.p.Txn(), - params.p.Descriptors(), - name, - ); err != nil { + if err := params.p.WithInternalExecutor(params.ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { + return validateFkInTxn(ctx, n.tableDesc, txn, ie, params.p.descCollection, name) + }); err != nil { return err } foundFk.Validity = descpb.ConstraintValidity_Validated - case descpb.ConstraintTypeUnique: if constraint.Index == nil { var foundUnique *descpb.UniqueWithoutIndexConstraint @@ -610,11 +604,16 @@ func (n *alterTableNode) startExec(params runParams) error { return pgerror.Newf(pgcode.ObjectNotInPrerequisiteState, "constraint %q in the middle of being added, try again later", t.Constraint) } - if err := validateUniqueWithoutIndexConstraintInTxn( - params.ctx, params.ExecCfg().InternalExecutorFactory.NewInternalExecutor( - params.SessionData(), - ), n.tableDesc, params.p.Txn(), params.p.User(), name, - ); err != nil { + if err := params.p.WithInternalExecutor(params.ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { + return validateUniqueWithoutIndexConstraintInTxn( + params.ctx, + n.tableDesc, + txn, + ie, + params.p.User(), + name, + ) + }); err != nil { return err } foundUnique.Validity = descpb.ConstraintValidity_Validated diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index eef01b049221..45453d71df84 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -137,7 +137,7 @@ func (sc *SchemaChanger) getChunkSize(chunkSize int64) int64 { } // scTxnFn is the type of functions that operates using transactions in the backfiller. -type scTxnFn func(ctx context.Context, txn *kv.Txn, evalCtx *extendedEvalContext) error +type scTxnFn func(ctx context.Context, txn *kv.Txn, evalCtx *extendedEvalContext, ie sqlutil.InternalExecutor) error // historicalTxnRunner is the type of the callback used by the various // helper functions to run checks at a fixed timestamp (logically, at @@ -147,12 +147,16 @@ type historicalTxnRunner func(ctx context.Context, fn scTxnFn) error // makeFixedTimestampRunner creates a historicalTxnRunner suitable for use by the helpers. func (sc *SchemaChanger) makeFixedTimestampRunner(readAsOf hlc.Timestamp) historicalTxnRunner { runner := func(ctx context.Context, retryable scTxnFn) error { - return sc.fixedTimestampTxn(ctx, readAsOf, func( - ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, + return sc.fixedTimestampTxnWithExecutor(ctx, readAsOf, func( + ctx context.Context, + txn *kv.Txn, + sd *sessiondata.SessionData, + descriptors *descs.Collection, + ie sqlutil.InternalExecutor, ) error { // We need to re-create the evalCtx since the txn may retry. - evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, readAsOf, descriptors) - return retryable(ctx, txn, &evalCtx) + evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, sd, readAsOf, descriptors) + return retryable(ctx, txn, &evalCtx, ie) }) } return runner @@ -187,6 +191,26 @@ func (sc *SchemaChanger) fixedTimestampTxn( }) } +func (sc *SchemaChanger) fixedTimestampTxnWithExecutor( + ctx context.Context, + readAsOf hlc.Timestamp, + retryable func( + ctx context.Context, + txn *kv.Txn, + sd *sessiondata.SessionData, + descriptors *descs.Collection, + ie sqlutil.InternalExecutor, + ) error, +) error { + sd := NewFakeSessionData(sc.execCfg.SV()) + return sc.txnWithExecutor(ctx, sd, func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ie sqlutil.InternalExecutor) error { + if err := txn.SetFixedTimestamp(ctx, readAsOf); err != nil { + return err + } + return retryable(ctx, txn, sd, descriptors, ie) + }) +} + // runBackfill runs the backfill for the schema changer. // // This operates over multiple goroutines concurrently and is thus not @@ -739,7 +763,7 @@ func (sc *SchemaChanger) validateConstraints( } desc := descI.(*tabledesc.Mutable) // Each check operates at the historical timestamp. - return runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, evalCtx *extendedEvalContext) error { + return runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, evalCtx *extendedEvalContext, ie sqlutil.InternalExecutor) error { // If the constraint is a check constraint that fails validation, we // need a semaContext set up that can resolve types in order to pretty // print the check expression back to the user. @@ -754,23 +778,21 @@ func (sc *SchemaChanger) validateConstraints( defer func() { collection.ReleaseAll(ctx) }() if c.IsCheck() { if err := validateCheckInTxn( - ctx, &semaCtx, sc.ieFactory, evalCtx.SessionData(), desc, txn, c.Check().Expr, + ctx, &semaCtx, evalCtx.SessionData(), desc, txn, ie, c.Check().Expr, ); err != nil { return err } } else if c.IsForeignKey() { - if err := validateFkInTxn(ctx, sc.ieFactory, evalCtx.SessionData(), desc, txn, collection, c.GetName()); err != nil { + if err := validateFkInTxn(ctx, desc, txn, ie, collection, c.GetName()); err != nil { return err } } else if c.IsUniqueWithoutIndex() { - if err := validateUniqueWithoutIndexConstraintInTxn( - ctx, sc.ieFactory.NewInternalExecutor(evalCtx.SessionData()), desc, txn, evalCtx.SessionData().User(), c.GetName(), - ); err != nil { + if err := validateUniqueWithoutIndexConstraintInTxn(ctx, desc, txn, ie, evalCtx.SessionData().User(), c.GetName()); err != nil { return err } } else if c.IsNotNull() { if err := validateCheckInTxn( - ctx, &semaCtx, sc.ieFactory, evalCtx.SessionData(), desc, txn, c.Check().Expr, + ctx, &semaCtx, evalCtx.SessionData(), desc, txn, ie, c.Check().Expr, ); err != nil { // TODO (lucy): This should distinguish between constraint // validation errors and other types of unexpected errors, and @@ -999,7 +1021,8 @@ func (sc *SchemaChanger) distIndexBackfill( if err != nil { return err } - evalCtx = createSchemaChangeEvalCtx(ctx, sc.execCfg, txn.ReadTimestamp(), descriptors) + sd := NewFakeSessionData(sc.execCfg.SV()) + evalCtx = createSchemaChangeEvalCtx(ctx, sc.execCfg, sd, txn.ReadTimestamp(), descriptors) planCtx = sc.distSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil, /* planner */ txn, DistributionTypeSystemTenantOnly) indexBatchSize := indexBackfillBatchSize.Get(&sc.execCfg.Settings.SV) @@ -1295,7 +1318,8 @@ func (sc *SchemaChanger) distColumnBackfill( return nil } cbw := MetadataCallbackWriter{rowResultWriter: &errOnlyResultWriter{}, fn: metaFn} - evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, txn.ReadTimestamp(), descriptors) + sd := NewFakeSessionData(sc.execCfg.SV()) + evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, sd, txn.ReadTimestamp(), descriptors) recv := MakeDistSQLReceiver( ctx, &cbw, @@ -2381,10 +2405,9 @@ func runSchemaChangesInTxn( if c.IsCheck() || c.IsNotNull() { check := &c.ConstraintToUpdateDesc().Check if check.Validity == descpb.ConstraintValidity_Validating { - if err := validateCheckInTxn( - ctx, &planner.semaCtx, planner.ExecCfg().InternalExecutorFactory, - planner.SessionData(), tableDesc, planner.txn, check.Expr, - ); err != nil { + if err := planner.WithInternalExecutor(ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { + return validateCheckInTxn(ctx, &planner.semaCtx, planner.SessionData(), tableDesc, txn, ie, check.Expr) + }); err != nil { return err } check.Validity = descpb.ConstraintValidity_Validated @@ -2406,9 +2429,17 @@ func runSchemaChangesInTxn( } else if c.IsUniqueWithoutIndex() { uwi := &c.ConstraintToUpdateDesc().UniqueWithoutIndexConstraint if uwi.Validity == descpb.ConstraintValidity_Validating { - if err := validateUniqueWithoutIndexConstraintInTxn( - ctx, planner.ExecCfg().InternalExecutor, tableDesc, planner.txn, planner.User(), c.GetName(), - ); err != nil { + + if err := planner.WithInternalExecutor(ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { + return validateUniqueWithoutIndexConstraintInTxn( + ctx, + tableDesc, + txn, + ie, + planner.User(), + c.GetName(), + ) + }); err != nil { return err } uwi.Validity = descpb.ConstraintValidity_Validated @@ -2480,48 +2511,40 @@ func runSchemaChangesInTxn( func validateCheckInTxn( ctx context.Context, semaCtx *tree.SemaContext, - ief sqlutil.InternalExecutorFactory, sessionData *sessiondata.SessionData, tableDesc *tabledesc.Mutable, txn *kv.Txn, + ie sqlutil.InternalExecutor, checkExpr string, ) error { var syntheticDescs []catalog.Descriptor if tableDesc.Version > tableDesc.ClusterVersion().Version { syntheticDescs = append(syntheticDescs, tableDesc) } - ie := ief.NewInternalExecutor(sessionData) - return ie.WithSyntheticDescriptors(syntheticDescs, func() error { - return validateCheckExpr(ctx, semaCtx, sessionData, checkExpr, tableDesc, ie, txn) - }) + + return ie.WithSyntheticDescriptors( + syntheticDescs, + func() error { + return validateCheckExpr(ctx, semaCtx, txn, sessionData, checkExpr, tableDesc, ie) + }) } -// validateFkInTxn validates foreign key constraints within the provided -// transaction. If the provided table descriptor version is newer than the -// cluster version, it will be used in the InternalExecutor that performs the -// validation query. -// -// TODO (lucy): The special case where the table descriptor version is the same -// as the cluster version only happens because the query in VALIDATE CONSTRAINT -// still runs in the user transaction instead of a step in the schema changer. -// When that's no longer true, this function should be updated. -// -// It operates entirely on the current goroutine and is thus able to -// reuse an existing kv.Txn safely. -func validateFkInTxn( +func getTargetTablesAndFk( ctx context.Context, - ief sqlutil.InternalExecutorFactory, - sd *sessiondata.SessionData, srcTable *tabledesc.Mutable, txn *kv.Txn, descsCol *descs.Collection, fkName string, -) error { +) ( + syntheticDescs []catalog.Descriptor, + fk *descpb.ForeignKeyConstraint, + targetTable catalog.TableDescriptor, + err error, +) { var syntheticTable catalog.TableDescriptor if srcTable.Version > srcTable.ClusterVersion().Version { syntheticTable = srcTable } - var fk *descpb.ForeignKeyConstraint for i := range srcTable.OutboundFKs { def := &srcTable.OutboundFKs[i] if def.Name == fkName { @@ -2530,23 +2553,51 @@ func validateFkInTxn( } } if fk == nil { - return errors.AssertionFailedf("foreign key %s does not exist", fkName) + return nil, nil, nil, errors.AssertionFailedf("foreign key %s does not exist", fkName) } - targetTable, err := descsCol.Direct().MustGetTableDescByID(ctx, txn, fk.ReferencedTableID) + targetTable, err = descsCol.Direct().MustGetTableDescByID(ctx, txn, fk.ReferencedTableID) if err != nil { - return err + return nil, nil, nil, err } - var syntheticDescs []catalog.Descriptor if syntheticTable != nil { syntheticDescs = append(syntheticDescs, syntheticTable) if targetTable.GetID() == syntheticTable.GetID() { targetTable = syntheticTable } } - ie := ief.NewInternalExecutor(sd) - return ie.WithSyntheticDescriptors(syntheticDescs, func() error { - return validateForeignKey(ctx, srcTable, targetTable, fk, ie, txn) - }) + return syntheticDescs, fk, targetTable, nil +} + +// validateFkInTxn validates foreign key constraints within the provided +// transaction. If the provided table descriptor version is newer than the +// cluster version, it will be used in the InternalExecutor that performs the +// validation query. +// +// TODO (lucy): The special case where the table descriptor version is the same +// as the cluster version only happens because the query in VALIDATE CONSTRAINT +// still runs in the user transaction instead of a step in the schema changer. +// When that's no longer true, this function should be updated. +// +// It operates entirely on the current goroutine and is thus able to +// reuse an existing kv.Txn safely. +func validateFkInTxn( + ctx context.Context, + srcTable *tabledesc.Mutable, + txn *kv.Txn, + ie sqlutil.InternalExecutor, + descsCol *descs.Collection, + fkName string, +) error { + syntheticDescs, fk, targetTable, err := getTargetTablesAndFk(ctx, srcTable, txn, descsCol, fkName) + if err != nil { + return err + } + + return ie.WithSyntheticDescriptors( + syntheticDescs, + func() error { + return validateForeignKey(ctx, srcTable, targetTable, fk, ie, txn) + }) } // validateUniqueWithoutIndexConstraintInTxn validates a unique constraint @@ -2563,9 +2614,9 @@ func validateFkInTxn( // reuse an existing kv.Txn safely. func validateUniqueWithoutIndexConstraintInTxn( ctx context.Context, - ie sqlutil.InternalExecutor, tableDesc *tabledesc.Mutable, txn *kv.Txn, + ie sqlutil.InternalExecutor, user username.SQLUsername, constraintName string, ) error { @@ -2573,7 +2624,6 @@ func validateUniqueWithoutIndexConstraintInTxn( if tableDesc.Version > tableDesc.ClusterVersion().Version { syntheticDescs = append(syntheticDescs, tableDesc) } - var uc *descpb.UniqueWithoutIndexConstraint for i := range tableDesc.UniqueWithoutIndexConstraints { def := &tableDesc.UniqueWithoutIndexConstraints[i] @@ -2586,19 +2636,21 @@ func validateUniqueWithoutIndexConstraintInTxn( return errors.AssertionFailedf("unique constraint %s does not exist", constraintName) } - return ie.WithSyntheticDescriptors(syntheticDescs, func() error { - return validateUniqueConstraint( - ctx, - tableDesc, - uc.Name, - uc.ColumnIDs, - uc.Predicate, - ie, - txn, - user, - false, /* preExisting */ - ) - }) + return ie.WithSyntheticDescriptors( + syntheticDescs, + func() error { + return validateUniqueConstraint( + ctx, + tableDesc, + uc.Name, + uc.ColumnIDs, + uc.Predicate, + ie, + txn, + user, + false, /* preExisting */ + ) + }) } // columnBackfillInTxn backfills columns for all mutation columns in diff --git a/pkg/sql/check.go b/pkg/sql/check.go index 5819ac19405c..9bbdf49d3f28 100644 --- a/pkg/sql/check.go +++ b/pkg/sql/check.go @@ -43,11 +43,11 @@ import ( func validateCheckExpr( ctx context.Context, semaCtx *tree.SemaContext, + txn *kv.Txn, sessionData *sessiondata.SessionData, exprStr string, tableDesc *tabledesc.Mutable, ie sqlutil.InternalExecutor, - txn *kv.Txn, ) error { expr, err := schemaexpr.FormatExprForDisplay(ctx, tableDesc, exprStr, semaCtx, sessionData, tree.FmtParsable) if err != nil { @@ -57,8 +57,14 @@ func validateCheckExpr( columns := tree.AsStringWithFlags(&colSelectors, tree.FmtSerializable) queryStr := fmt.Sprintf(`SELECT %s FROM [%d AS t] WHERE NOT (%s) LIMIT 1`, columns, tableDesc.GetID(), exprStr) log.Infof(ctx, "validating check constraint %q with query %q", expr, queryStr) - - rows, err := ie.QueryRow(ctx, "validate check constraint", txn, queryStr) + rows, err := ie.QueryRowEx( + ctx, + "validate check constraint", + txn, + sessiondata.InternalExecutorOverride{ + User: username.RootUserName(), + }, + queryStr) if err != nil { return err } @@ -269,7 +275,8 @@ func validateForeignKey( ) values, err := ie.QueryRowEx(ctx, "validate foreign key constraint", - txn, sessiondata.NodeUserSessionDataOverride, query) + txn, + sessiondata.NodeUserSessionDataOverride, query) if err != nil { return err } diff --git a/pkg/sql/index_backfiller.go b/pkg/sql/index_backfiller.go index 730d8f487b50..fa76ad9ca084 100644 --- a/pkg/sql/index_backfiller.go +++ b/pkg/sql/index_backfiller.go @@ -166,7 +166,8 @@ func (ib *IndexBackfillPlanner) plan( if err := DescsTxn(ctx, ib.execCfg, func( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ) error { - evalCtx = createSchemaChangeEvalCtx(ctx, ib.execCfg, nowTimestamp, descriptors) + sd := NewFakeSessionData(ib.execCfg.SV()) + evalCtx = createSchemaChangeEvalCtx(ctx, ib.execCfg, sd, nowTimestamp, descriptors) planCtx = ib.execCfg.DistSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn, DistributionTypeSystemTenantOnly) // TODO(ajwerner): Adopt util.ConstantWithMetamorphicTestRange for the diff --git a/pkg/sql/mvcc_backfiller.go b/pkg/sql/mvcc_backfiller.go index ed4baab5d3dd..547ee800b890 100644 --- a/pkg/sql/mvcc_backfiller.go +++ b/pkg/sql/mvcc_backfiller.go @@ -126,7 +126,8 @@ func (im *IndexBackfillerMergePlanner) plan( if err := DescsTxn(ctx, im.execCfg, func( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ) error { - evalCtx = createSchemaChangeEvalCtx(ctx, im.execCfg, txn.ReadTimestamp(), descriptors) + sd := NewFakeSessionData(im.execCfg.SV()) + evalCtx = createSchemaChangeEvalCtx(ctx, im.execCfg, sd, txn.ReadTimestamp(), descriptors) planCtx = im.execCfg.DistSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn, DistributionTypeSystemTenantOnly) diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index 253451721420..2c9655301a08 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -2447,10 +2447,23 @@ func (sc *SchemaChanger) txn( return err } } - return sc.execCfg.CollectionFactory.Txn(ctx, sc.execCfg.InternalExecutor, sc.db, f) } +// txnWithExecutor is to run internal executor within a txn. +func (sc *SchemaChanger) txnWithExecutor( + ctx context.Context, + sd *sessiondata.SessionData, + f func(context.Context, *kv.Txn, *descs.Collection, sqlutil.InternalExecutor) error, +) error { + if fn := sc.testingKnobs.RunBeforeDescTxn; fn != nil { + if err := fn(sc.job.ID()); err != nil { + return err + } + } + return sc.execCfg.CollectionFactory.TxnWithExecutor(ctx, sc.db, sd, f) +} + // createSchemaChangeEvalCtx creates an extendedEvalContext() to be used for backfills. // // TODO(andrei): This EvalContext() will be broken for backfills trying to use @@ -2459,11 +2472,12 @@ func (sc *SchemaChanger) txn( // used in the surrounding SQL session, so session tracing is unable // to capture schema change activity. func createSchemaChangeEvalCtx( - ctx context.Context, execCfg *ExecutorConfig, ts hlc.Timestamp, descriptors *descs.Collection, + ctx context.Context, + execCfg *ExecutorConfig, + sd *sessiondata.SessionData, + ts hlc.Timestamp, + descriptors *descs.Collection, ) extendedEvalContext { - - sd := NewFakeSessionData(execCfg.SV()) - evalCtx := extendedEvalContext{ // Make a session tracing object on-the-fly. This is OK // because it sets "enabled: false" and thus none of the