Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: introduce new internal executor interfaces #82477

Merged
merged 7 commits into from
Aug 12, 2022
4 changes: 2 additions & 2 deletions pkg/bench/rttanalysis/testdata/benchmark_expectations
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ exp,benchmark
2,ORMQueries/has_table_privilege_1
4,ORMQueries/has_table_privilege_3
6,ORMQueries/has_table_privilege_5
15,ORMQueries/information_schema._pg_index_position
3,ORMQueries/information_schema._pg_index_position
2,ORMQueries/pg_attribute
2,ORMQueries/pg_class
11,ORMQueries/pg_is_other_temp_schema
23,ORMQueries/pg_is_other_temp_schema_multiple_times
18,ORMQueries/pg_is_other_temp_schema_multiple_times
4,ORMQueries/pg_my_temp_schema
4,ORMQueries/pg_my_temp_schema_multiple_times
4,ORMQueries/pg_namespace
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1581,7 +1581,7 @@ func revalidateIndexes(
// since our table is offline.
var runner sqlutil.HistoricalInternalExecTxnRunner = func(ctx context.Context, fn sqlutil.InternalExecFn) error {
return execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
ie := job.MakeSessionBoundInternalExecutor(ctx, sql.NewFakeSessionData(execCfg.SV())).(*sql.InternalExecutor)
ie := job.MakeSessionBoundInternalExecutor(sql.NewFakeSessionData(execCfg.SV())).(*sql.InternalExecutor)
return fn(ctx, txn, ie)
})
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func newRowFetcherCache(
return &rowFetcherCache{
codec: codec,
leaseMgr: leaseMgr,
collection: cf.NewCollection(ctx, nil /* TemporarySchemaProvider */),
collection: cf.NewCollection(ctx, nil /* TemporarySchemaProvider */, nil /* monitor */),
db: db,
fetchers: cache.NewUnorderedCache(defaultCacheConfig),
watchedFamilies: watchedFamilies,
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1133,7 +1133,7 @@ func getQualifiedTableName(
func getQualifiedTableNameObj(
ctx context.Context, execCfg *sql.ExecutorConfig, txn *kv.Txn, desc catalog.TableDescriptor,
) (tree.TableName, error) {
col := execCfg.CollectionFactory.MakeCollection(ctx, nil /* TemporarySchemaProvider */, nil /* monitor */)
col := execCfg.CollectionFactory.NewCollection(ctx, nil /* TemporarySchemaProvider */, nil /* monitor */)
dbDesc, err := col.Direct().MustGetDatabaseDescByID(ctx, txn, desc.GetParentID())
if err != nil {
return tree.TableName{}, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/schemafeed/schema_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func New(
settings: cfg.Settings,
targets: targets,
leaseMgr: cfg.LeaseManager.(*lease.Manager),
ie: cfg.SessionBoundInternalExecutorFactory(ctx, &sessiondata.SessionData{}),
ie: cfg.InternalExecutorFactory.NewInternalExecutor(&sessiondata.SessionData{}),
collectionFactory: cfg.CollectionFactory,
metrics: metrics,
tolerances: tolerances,
Expand Down
9 changes: 7 additions & 2 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,9 +718,14 @@ func (j *Job) FractionCompleted() float32 {
// sessionBoundInternalExecutorFactory for a more detailed explanation of why
// this exists.
func (j *Job) MakeSessionBoundInternalExecutor(
ctx context.Context, sd *sessiondata.SessionData,
sd *sessiondata.SessionData,
) sqlutil.InternalExecutor {
return j.registry.sessionBoundInternalExecutorFactory(ctx, sd)
return j.registry.internalExecutorFactory.NewInternalExecutor(sd)
}

// GetInternalExecutorFactory returns the internal executor factory.
func (j *Job) GetInternalExecutorFactory() sqlutil.InternalExecutorFactory {
return j.registry.internalExecutorFactory
}

// MarkIdle marks the job as Idle. Idleness should not be toggled frequently
Expand Down
12 changes: 5 additions & 7 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ type Registry struct {
// field. Modifying the TableCollection is basically a per-query operation
// and should be a per-query setting. #34304 is the issue for creating/
// improving this API.
sessionBoundInternalExecutorFactory sqlutil.SessionBoundInternalExecutorFactory
internalExecutorFactory sqlutil.InternalExecutorFactory

// if non-empty, indicates path to file that prevents any job adoptions.
preventAdoptionFile string
Expand Down Expand Up @@ -226,14 +226,12 @@ func MakeRegistry(
return r
}

// SetSessionBoundInternalExecutorFactory sets the
// SessionBoundInternalExecutorFactory that will be used by the job registry
// SetInternalExecutorFactory sets the
// InternalExecutorFactory that will be used by the job registry
// executor. We expose this separately from the constructor to avoid a circular
// dependency.
func (r *Registry) SetSessionBoundInternalExecutorFactory(
factory sqlutil.SessionBoundInternalExecutorFactory,
) {
r.sessionBoundInternalExecutorFactory = factory
func (r *Registry) SetInternalExecutorFactory(factory sqlutil.InternalExecutorFactory) {
r.internalExecutorFactory = factory
}

// NewSpanConstrainer returns an instance of sql.SpanConstrainer as an interface{},
Expand Down
11 changes: 11 additions & 0 deletions pkg/kv/kvserver/protectedts/ptstorage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,17 @@ type wrappedInternalExecutor struct {
}
}

func (ie *wrappedInternalExecutor) QueryBufferedExWithCols(
ctx context.Context,
opName string,
txn *kv.Txn,
session sessiondata.InternalExecutorOverride,
stmt string,
qargs ...interface{},
) ([]tree.Datums, colinfo.ResultColumns, error) {
panic("unimplemented")
}

var _ sqlutil.InternalExecutor = &wrappedInternalExecutor{}

func (ie *wrappedInternalExecutor) Exec(
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server_internal_executor_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestInternalExecutorClearsMonitorMemory(t *testing.T) {
mon := s.(*TestServer).sqlServer.internalExecutorFactoryMemMonitor
ief := s.ExecutorConfig().(sql.ExecutorConfig).InternalExecutorFactory
sessionData := sql.NewFakeSessionData(&s.ClusterSettings().SV)
ie := ief(ctx, sessionData)
ie := ief.NewInternalExecutor(sessionData)
rows, err := ie.QueryIteratorEx(ctx, "test", nil, sessiondata.NodeUserSessionDataOverride, `SELECT 1`)
require.NoError(t, err)
require.Greater(t, mon.AllocBytes(), int64(0))
Expand Down
22 changes: 11 additions & 11 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -947,18 +947,18 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
ieFactoryMonitor.StartNoReserved(ctx, pgServer.SQLServer.GetBytesMonitor())
// Now that we have a pgwire.Server (which has a sql.Server), we can close a
// circular dependency between the rowexec.Server and sql.Server and set
// SessionBoundInternalExecutorFactory. The same applies for setting a
// InternalExecutorFactory. The same applies for setting a
// SessionBoundInternalExecutor on the job registry.
ieFactory := func(
ctx context.Context, sessionData *sessiondata.SessionData,
) sqlutil.InternalExecutor {
ie := sql.MakeInternalExecutor(pgServer.SQLServer, internalMemMetrics, ieFactoryMonitor)
ie.SetSessionData(sessionData)
return &ie
}
ieFactory := sql.NewInternalExecutorFactory(
pgServer.SQLServer,
internalMemMetrics,
ieFactoryMonitor,
)

collectionFactory.SetInternalExecutorWithTxn(ieFactory)

distSQLServer.ServerConfig.SessionBoundInternalExecutorFactory = ieFactory
jobRegistry.SetSessionBoundInternalExecutorFactory(ieFactory)
distSQLServer.ServerConfig.InternalExecutorFactory = ieFactory
jobRegistry.SetInternalExecutorFactory(ieFactory)
execCfg.IndexBackfiller = sql.NewIndexBackfiller(execCfg)
execCfg.IndexMerger = sql.NewIndexBackfillerMergePlanner(execCfg)
execCfg.IndexValidator = scdeps.NewIndexValidator(
Expand Down Expand Up @@ -1070,7 +1070,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
cfg.db,
codec,
cfg.registry,
distSQLServer.ServerConfig.SessionBoundInternalExecutorFactory,
distSQLServer.ServerConfig.InternalExecutorFactory,
cfg.sqlStatusServer,
cfg.isMeta1Leaseholder,
sqlExecutorTestingKnobs,
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/alter_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,8 +611,8 @@ func (n *alterTableNode) startExec(params runParams) error {
"constraint %q in the middle of being added, try again later", t.Constraint)
}
if err := validateUniqueWithoutIndexConstraintInTxn(
params.ctx, params.ExecCfg().InternalExecutorFactory(
params.ctx, params.SessionData(),
params.ctx, params.ExecCfg().InternalExecutorFactory.NewInternalExecutor(
params.SessionData(),
), n.tableDesc, params.p.Txn(), params.p.User(), name,
); err != nil {
return err
Expand Down
12 changes: 6 additions & 6 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (sc *SchemaChanger) makeFixedTimestampInternalExecRunner(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
// We need to re-create the evalCtx since the txn may retry.
ie := sc.ieFactory(ctx, NewFakeSessionData(sc.execCfg.SV()))
ie := sc.ieFactory.NewInternalExecutor(NewFakeSessionData(sc.execCfg.SV()))
return retryable(ctx, txn, ie)
})
}
Expand Down Expand Up @@ -764,7 +764,7 @@ func (sc *SchemaChanger) validateConstraints(
}
} else if c.IsUniqueWithoutIndex() {
if err := validateUniqueWithoutIndexConstraintInTxn(
ctx, sc.ieFactory(ctx, evalCtx.SessionData()), desc, txn, evalCtx.SessionData().User(), c.GetName(),
ctx, sc.ieFactory.NewInternalExecutor(evalCtx.SessionData()), desc, txn, evalCtx.SessionData().User(), c.GetName(),
); err != nil {
return err
}
Expand Down Expand Up @@ -2480,7 +2480,7 @@ func runSchemaChangesInTxn(
func validateCheckInTxn(
ctx context.Context,
semaCtx *tree.SemaContext,
ief sqlutil.SessionBoundInternalExecutorFactory,
ief sqlutil.InternalExecutorFactory,
sessionData *sessiondata.SessionData,
tableDesc *tabledesc.Mutable,
txn *kv.Txn,
Expand All @@ -2490,7 +2490,7 @@ func validateCheckInTxn(
if tableDesc.Version > tableDesc.ClusterVersion().Version {
syntheticDescs = append(syntheticDescs, tableDesc)
}
ie := ief(ctx, sessionData)
ie := ief.NewInternalExecutor(sessionData)
return ie.WithSyntheticDescriptors(syntheticDescs, func() error {
return validateCheckExpr(ctx, semaCtx, sessionData, checkExpr, tableDesc, ie, txn)
})
Expand All @@ -2510,7 +2510,7 @@ func validateCheckInTxn(
// reuse an existing kv.Txn safely.
func validateFkInTxn(
ctx context.Context,
ief sqlutil.SessionBoundInternalExecutorFactory,
ief sqlutil.InternalExecutorFactory,
sd *sessiondata.SessionData,
srcTable *tabledesc.Mutable,
txn *kv.Txn,
Expand Down Expand Up @@ -2543,7 +2543,7 @@ func validateFkInTxn(
targetTable = syntheticTable
}
}
ie := ief(ctx, sd)
ie := ief.NewInternalExecutor(sd)
return ie.WithSyntheticDescriptors(syntheticDescs, func() error {
return validateForeignKey(ctx, srcTable, targetTable, fk, ie, txn)
})
Expand Down
15 changes: 10 additions & 5 deletions pkg/sql/catalog/descs/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ import (
"github.com/cockroachdb/errors"
)

// makeCollection constructs a Collection.
func makeCollection(
// newCollection constructs a Collection.
func newCollection(
ctx context.Context,
leaseMgr *lease.Manager,
settings *cluster.Settings,
Expand All @@ -50,8 +50,8 @@ func makeCollection(
virtualSchemas catalog.VirtualSchemas,
temporarySchemaProvider TemporarySchemaProvider,
monitor *mon.BytesMonitor,
) Collection {
return Collection{
) *Collection {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the make terminology is differentiated from the new terminology in this codebase and other to indicate that it's returning a struct and not a pointer. You need to rename this newCollection.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return &Collection{
settings: settings,
version: settings.Version.ActiveVersion(ctx),
hydrated: hydrated,
Expand Down Expand Up @@ -178,11 +178,16 @@ func (tc *Collection) ReleaseLeases(ctx context.Context) {
func (tc *Collection) ReleaseAll(ctx context.Context) {
tc.ReleaseLeases(ctx)
tc.stored.reset(ctx)
tc.synthetic.reset()
tc.ResetSyntheticDescriptors()
tc.deletedDescs = catalog.DescriptorIDSet{}
tc.skipValidationOnWrite = false
}

// ResetSyntheticDescriptors clear all syntheticDescriptors.
func (tc *Collection) ResetSyntheticDescriptors() {
tc.synthetic.reset()
}

// HasUncommittedTables returns true if the Collection contains uncommitted
// tables.
func (tc *Collection) HasUncommittedTables() bool {
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/catalog/descs/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestCollectionWriteDescToBatch(t *testing.T) {

db := s0.DB()
descriptors := s0.ExecutorConfig().(sql.ExecutorConfig).CollectionFactory.
NewCollection(ctx, nil /* TemporarySchemaProvider */)
NewCollection(ctx, nil /* TemporarySchemaProvider */, nil /* Monitor */)

// Note this transaction abuses the mechanisms normally required for updating
// tables and is just for testing what this test intends to exercise.
Expand Down Expand Up @@ -640,7 +640,7 @@ func TestCollectionProperlyUsesMemoryMonitoring(t *testing.T) {

// Create a `Collection` with monitor hooked up.
col := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig).CollectionFactory.
MakeCollection(ctx, nil /* temporarySchemaProvider */, monitor)
NewCollection(ctx, nil /* temporarySchemaProvider */, monitor)
require.Equal(t, int64(0), monitor.AllocBytes())

// Read all the descriptors into `col` and assert this read will finish without error.
Expand All @@ -659,7 +659,7 @@ func TestCollectionProperlyUsesMemoryMonitoring(t *testing.T) {

// Repeat the process again and assert this time memory allocation will err out.
col = tc.Server(0).ExecutorConfig().(sql.ExecutorConfig).CollectionFactory.
MakeCollection(ctx, nil /* temporarySchemaProvider */, monitor)
NewCollection(ctx, nil /* temporarySchemaProvider */, monitor)
_, err2 := col.GetAllDescriptors(ctx, txn)
require.Error(t, err2)

Expand Down
38 changes: 27 additions & 11 deletions pkg/sql/catalog/descs/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/hydrateddesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/mon"
)

Expand All @@ -33,6 +37,19 @@ type CollectionFactory struct {
spanConfigSplitter spanconfig.Splitter
spanConfigLimiter spanconfig.Limiter
defaultMonitor *mon.BytesMonitor
ieFactoryWithTxn InternalExecutorFactoryWithTxn
}

// InternalExecutorFactoryWithTxn is used to create an internal executor
// with associated extra txn state information.
// It should only be used as a field hanging off CollectionFactory.
type InternalExecutorFactoryWithTxn interface {
NewInternalExecutorWithTxn(
sd *sessiondata.SessionData,
sv *settings.Values,
txn *kv.Txn,
descCol *Collection,
) (sqlutil.InternalExecutor, sqlutil.InternalExecutorCommitTxnFunc)
}

// NewCollectionFactory constructs a new CollectionFactory which holds onto
Expand Down Expand Up @@ -72,24 +89,23 @@ func NewBareBonesCollectionFactory(
}
}

// MakeCollection constructs a Collection for the purposes of embedding.
func (cf *CollectionFactory) MakeCollection(
// NewCollection constructs a new Collection.
func (cf *CollectionFactory) NewCollection(
ctx context.Context, temporarySchemaProvider TemporarySchemaProvider, monitor *mon.BytesMonitor,
) Collection {
) *Collection {
if monitor == nil {
// If an upstream monitor is not provided, the default, unlimited monitor will be used.
// All downstream resource allocation/releases on this default monitor will then be no-ops.
monitor = cf.defaultMonitor
}

return makeCollection(ctx, cf.leaseMgr, cf.settings, cf.codec, cf.hydrated, cf.systemDatabase,
return newCollection(ctx, cf.leaseMgr, cf.settings, cf.codec, cf.hydrated, cf.systemDatabase,
cf.virtualSchemas, temporarySchemaProvider, monitor)
}

// NewCollection constructs a new Collection.
func (cf *CollectionFactory) NewCollection(
ctx context.Context, temporarySchemaProvider TemporarySchemaProvider,
) *Collection {
c := cf.MakeCollection(ctx, temporarySchemaProvider, nil /* monitor */)
return &c
// SetInternalExecutorWithTxn is to set the internal executor factory hanging
// off the collection factory.
func (cf *CollectionFactory) SetInternalExecutorWithTxn(
ieFactoryWithTxn InternalExecutorFactoryWithTxn,
) {
cf.ieFactoryWithTxn = ieFactoryWithTxn
}
Loading