Skip to content

Commit

Permalink
Merge #69338
Browse files Browse the repository at this point in the history
69338: sessiondata: introduce sessiondata.Stack and thread to EvalContext and connExecutor r=rafiss a=otan

See individual commits for details. This one is needed ASAP to prevent merge conflicts.
Part 1 of a series of PRs to merge SET LOCAL.
Refs: #32562

Release justification: low risk, high pri change.


Co-authored-by: Oliver Tan <otan@cockroachlabs.com>
  • Loading branch information
craig[bot] and otan committed Aug 25, 2021
2 parents 336f9c5 + 7e83c05 commit d42e428
Show file tree
Hide file tree
Showing 87 changed files with 457 additions and 339 deletions.
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,9 @@ func TestAvroSchema(t *testing.T) {
require.NoError(t, err)

for _, row := range rows {
evalCtx := &tree.EvalContext{SessionData: &sessiondata.SessionData{}}
evalCtx := &tree.EvalContext{
SessionDataStack: sessiondata.NewStack(&sessiondata.SessionData{}),
}
serialized, err := origSchema.textualFromRow(row)
require.NoError(t, err)
roundtripped, err := roundtrippedSchema.rowFromTextual(serialized)
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/importccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ func benchmarkConvertToKVs(b *testing.B, g workload.Generator) {
wc := importccl.NewWorkloadKVConverter(
0, tableDesc, t.InitialRows, 0, t.InitialRows.NumBatches, kvCh)
evalCtx := &tree.EvalContext{
SessionData: &sessiondata.SessionData{},
Codec: keys.SystemSQLCodec,
SessionDataStack: sessiondata.NewStack(&sessiondata.SessionData{}),
Codec: keys.SystemSQLCodec,
}
return wc.Worker(ctx, evalCtx)
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/importccl/import_table_creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func MakeSimpleTableDescriptor(
evalCtx := tree.EvalContext{
Context: ctx,
Sequence: &importSequenceOperators{},
SessionData: &sessiondata.SessionData{},
SessionDataStack: sessiondata.NewStack(&sessiondata.SessionData{}),
ClientNoticeSender: &faketreeeval.DummyClientNoticeSender{},
Settings: st,
}
Expand All @@ -226,7 +226,7 @@ func MakeSimpleTableDescriptor(
affected,
semaCtx,
&evalCtx,
evalCtx.SessionData, /* sessionData */
evalCtx.SessionData(), /* sessionData */
tree.PersistencePermanent,
// We need to bypass the LOCALITY on non multi-region check here because
// we cannot access the database region config at import level.
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ func (p *parallelImporter) importWorker(
if err != nil {
return err
}
if conv.EvalCtx.SessionData == nil {
if conv.EvalCtx.SessionData() == nil {
panic("uninitialized session data")
}

Expand Down
8 changes: 5 additions & 3 deletions pkg/ccl/importccl/testutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,11 @@ func descForTable(
}

var testEvalCtx = &tree.EvalContext{
SessionData: &sessiondata.SessionData{
Location: time.UTC,
},
SessionDataStack: sessiondata.NewStack(
&sessiondata.SessionData{
Location: time.UTC,
},
),
StmtTimestamp: timeutil.Unix(100000000, 0),
Settings: cluster.MakeTestingClusterSettings(),
Codec: keys.SystemSQLCodec,
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/workloadccl/format/sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ func ToSSTable(t workload.Table, tableID descpb.ID, ts time.Time) ([]byte, error
g.GoCtx(func(ctx context.Context) error {
defer close(kvCh)
evalCtx := &tree.EvalContext{
SessionData: &sessiondata.SessionData{},
Codec: keys.SystemSQLCodec,
SessionDataStack: sessiondata.NewStack(&sessiondata.SessionData{}),
Codec: keys.SystemSQLCodec,
}
return wc.Worker(ctx, evalCtx)
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/alter_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1112,7 +1112,7 @@ func (p *planner) AlterDatabasePlacement(
return nil, err
}

if !p.EvalContext().SessionData.PlacementEnabled {
if !p.EvalContext().SessionData().PlacementEnabled {
return nil, errors.WithHint(pgerror.New(
pgcode.FeatureNotSupported,
"ALTER DATABASE PLACEMENT requires that the session setting "+
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/alter_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (n *alterIndexNode) startExec(params runParams) error {
"cannot ALTER INDEX PARTITION BY on an index which already has implicit column partitioning",
)
}
allowImplicitPartitioning := params.p.EvalContext().SessionData.ImplicitColumnPartitioningEnabled ||
allowImplicitPartitioning := params.p.EvalContext().SessionData().ImplicitColumnPartitioningEnabled ||
n.tableDesc.IsLocalityRegionalByRow()
alteredIndexDesc := n.index.IndexDescDeepCopy()
newImplicitCols, newPartitioning, err := CreatePartitioning(
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/alter_primary_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (p *planner) AlterPrimaryKey(
}

if alterPKNode.Sharded != nil {
if !p.EvalContext().SessionData.HashShardedIndexesEnabled {
if !p.EvalContext().SessionData().HashShardedIndexesEnabled {
return hashShardedIndexesDisabledError
}
if alterPKNode.Interleave != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/alter_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ func (n *alterTableNode) startExec(params runParams) error {
newPrimaryIndexDesc,
t.PartitionBy,
nil, /* allowedNewColumnNames */
params.p.EvalContext().SessionData.ImplicitColumnPartitioningEnabled ||
params.p.EvalContext().SessionData().ImplicitColumnPartitioningEnabled ||
n.tableDesc.IsLocalityRegionalByRow(),
)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -2486,7 +2486,7 @@ func indexTruncateInTxn(
for done := false; !done; done = sp.Key == nil {
rd := row.MakeDeleter(
execCfg.Codec, tableDesc, nil /* requestedCols */, &execCfg.Settings.SV,
evalCtx.SessionData.Internal,
evalCtx.SessionData().Internal,
)
td := tableDeleter{rd: rd, alloc: alloc}
if err := td.init(ctx, txn, evalCtx); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk(
row.UpdaterOnlyColumns,
&cb.alloc,
&cb.evalCtx.Settings.SV,
cb.evalCtx.SessionData.Internal,
cb.evalCtx.SessionData().Internal,
)
if err != nil {
return roachpb.Key{}, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/buffer_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (c *rowContainerHelper) init(
) {
distSQLCfg := &evalContext.DistSQLPlanner.distSQLSrv.ServerConfig
c.memMonitor = execinfra.NewLimitedMonitorNoFlowCtx(
evalContext.Context, evalContext.Mon, distSQLCfg, evalContext.SessionData,
evalContext.Context, evalContext.Mon, distSQLCfg, evalContext.SessionData(),
fmt.Sprintf("%s-limited", opName),
)
c.diskMonitor = execinfra.NewMonitor(evalContext.Context, distSQLCfg.ParentDiskMonitor, fmt.Sprintf("%s-disk", opName))
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/catalog/schemaexpr/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func newNameResolver(
// unresolved names replaced with IndexedVars.
func (nr *nameResolver) resolveNames(expr tree.Expr) (tree.Expr, error) {
var v NameResolutionVisitor
return ResolveNamesUsingVisitor(&v, expr, nr.source, *nr.ivarHelper, nr.evalCtx.SessionData.SearchPath)
return ResolveNamesUsingVisitor(&v, expr, nr.source, *nr.ivarHelper, nr.evalCtx.SessionData().SearchPath)
}

// addColumn adds a new column to the nameResolver so that it can be resolved in
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ func (r opResult) createAndWrapRowSource(
return errors.New("processorConstructor is nil")
}
log.VEventf(ctx, 1, "planning a row-execution processor in the vectorized flow: %v", causeToWrap)
if err := canWrap(flowCtx.EvalCtx.SessionData.VectorizeMode, spec); err != nil {
if err := canWrap(flowCtx.EvalCtx.SessionData().VectorizeMode, spec); err != nil {
log.VEventf(ctx, 1, "planning a wrapped processor failed: %v", err)
// Return the original error for why we don't support this spec
// natively since it is more interesting.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colfetcher/cfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -1654,7 +1654,7 @@ func initCFetcher(

if err := fetcher.Init(
flowCtx.Codec(), allocator, args.memoryLimit, args.reverse, args.lockingStrength,
args.lockingWaitPolicy, flowCtx.EvalCtx.SessionData.LockTimeout, tableArgs,
args.lockingWaitPolicy, flowCtx.EvalCtx.SessionData().LockTimeout, tableArgs,
); err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -1177,7 +1177,7 @@ func (s *vectorizedFlowCreator) setupFlow(
err = errors.Wrapf(err, "unable to vectorize execution plan")
return
}
if flowCtx.EvalCtx.SessionData.TestingVectorizeInjectPanics {
if flowCtx.EvalCtx.SessionData().TestingVectorizeInjectPanics {
result.Root = newPanicInjector(result.Root)
}
if util.CrdbTestBuild {
Expand Down
76 changes: 42 additions & 34 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ func (h ConnectionHandler) GetUnqualifiedIntSize() *types.T {
if h.ex != nil {
// The executor will be nil in certain testing situations where
// no server is actually present.
size = h.ex.sessionData.DefaultIntSize
size = h.ex.sessionData().DefaultIntSize
}
return parser.NakedIntTypeFromDefaultIntSize(size)
}
Expand Down Expand Up @@ -742,14 +742,14 @@ func (s *Server) newConnExecutor(
sdMutator := new(sessionDataMutator)
*sdMutator = s.makeSessionDataMutator(sd, sdDefaults)
ex := &connExecutor{
server: s,
metrics: srvMetrics,
stmtBuf: stmtBuf,
clientComm: clientComm,
mon: sessionRootMon,
sessionMon: sessionMon,
sessionData: sd,
dataMutator: sdMutator,
server: s,
metrics: srvMetrics,
stmtBuf: stmtBuf,
clientComm: clientComm,
mon: sessionRootMon,
sessionMon: sessionMon,
sessionDataStack: sessiondata.NewStack(sd),
dataMutator: sdMutator,
state: txnState{
mon: txnMon,
connCtx: ctx,
Expand Down Expand Up @@ -791,7 +791,7 @@ func (s *Server) newConnExecutor(
ex.hasCreatedTemporarySchema = true
}

ex.applicationName.Store(ex.sessionData.ApplicationName)
ex.applicationName.Store(ex.sessionData().ApplicationName)
ex.statsWriter = statsWriter
ex.statsCollector = sslocal.NewStatsCollector(statsWriter, ex.phaseTimes)
sdMutator.RegisterOnSessionDataChange("application_name", func(newName string) {
Expand Down Expand Up @@ -1224,8 +1224,8 @@ type connExecutor struct {
hasAdminRoleCache HasAdminRoleCache
}

// sessionData contains the user-configurable connection variables.
sessionData *sessiondata.SessionData
// sessionDataStack contains the user-configurable connection variables.
sessionDataStack *sessiondata.Stack
// dataMutator is nil for session-bound internal executors; we shouldn't issue
// statements that manipulate session state to an internal executor.
dataMutator *sessionDataMutator
Expand Down Expand Up @@ -1425,7 +1425,7 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) err
ex.extraTxnState.jobs = nil
ex.extraTxnState.hasAdminRoleCache = HasAdminRoleCache{}
ex.extraTxnState.schemaChangerState = SchemaChangerState{
mode: ex.sessionData.NewSchemaChangerMode,
mode: ex.sessionData().NewSchemaChangerMode,
}

for k := range ex.extraTxnState.schemaChangeJobRecords {
Expand Down Expand Up @@ -1477,6 +1477,14 @@ func (ex *connExecutor) Ctx() context.Context {
return ctx
}

// sessionData returns the top SessionData on the executor.
func (ex *connExecutor) sessionData() *sessiondata.SessionData {
if ex.sessionDataStack == nil {
return nil
}
return ex.sessionDataStack.Top()
}

// activate engages the use of resources that must be cleaned up
// afterwards. after activate() completes, the close() method must be
// called.
Expand All @@ -1499,11 +1507,11 @@ func (ex *connExecutor) activate(
// Enable the trace if configured.
if traceSessionEventLogEnabled.Get(&ex.server.cfg.Settings.SV) {
remoteStr := "<admin>"
if ex.sessionData.RemoteAddr != nil {
remoteStr = ex.sessionData.RemoteAddr.String()
if ex.sessionData().RemoteAddr != nil {
remoteStr = ex.sessionData().RemoteAddr.String()
}
ex.eventLog = trace.NewEventLog(
fmt.Sprintf("sql session [%s]", ex.sessionData.User()), remoteStr)
fmt.Sprintf("sql session [%s]", ex.sessionData().User()), remoteStr)
}

ex.activated = true
Expand Down Expand Up @@ -1637,8 +1645,8 @@ func (ex *connExecutor) execCmd(ctx context.Context) error {
NeedRowDesc,
pos,
nil, /* formatCodes */
ex.sessionData.DataConversionConfig,
ex.sessionData.GetLocation(),
ex.sessionData().DataConversionConfig,
ex.sessionData().GetLocation(),
0, /* limit */
"", /* portalName */
ex.implicitTxn(),
Expand Down Expand Up @@ -1706,8 +1714,8 @@ func (ex *connExecutor) execCmd(ctx context.Context) error {
// needed.
DontNeedRowDesc,
pos, portal.OutFormats,
ex.sessionData.DataConversionConfig,
ex.sessionData.GetLocation(),
ex.sessionData().DataConversionConfig,
ex.sessionData().GetLocation(),
tcmd.Limit,
portalName,
ex.implicitTxn(),
Expand Down Expand Up @@ -2299,7 +2307,7 @@ func txnPriorityToProto(mode tree.UserPriority) roachpb.UserPriority {

func (ex *connExecutor) txnPriorityWithSessionDefault(mode tree.UserPriority) roachpb.UserPriority {
if mode == tree.UnspecifiedUserPriority {
mode = tree.UserPriority(ex.sessionData.DefaultTxnPriority)
mode = tree.UserPriority(ex.sessionData().DefaultTxnPriority)
}
return txnPriorityToProto(mode)
}
Expand All @@ -2308,7 +2316,7 @@ func (ex *connExecutor) readWriteModeWithSessionDefault(
mode tree.ReadWriteMode,
) tree.ReadWriteMode {
if mode == tree.UnspecifiedReadWriteMode {
if ex.sessionData.DefaultTxnReadOnly {
if ex.sessionData().DefaultTxnReadOnly {
return tree.ReadOnly
}
return tree.ReadWrite
Expand All @@ -2327,7 +2335,7 @@ var followerReadTimestampExpr = &tree.FuncExpr{

func (ex *connExecutor) asOfClauseWithSessionDefault(expr tree.AsOfClause) tree.AsOfClause {
if expr.Expr == nil {
if ex.sessionData.DefaultTxnUseFollowerReads {
if ex.sessionData().DefaultTxnUseFollowerReads {
return tree.AsOfClause{Expr: followerReadTimestampExpr}
}
return tree.AsOfClause{}
Expand All @@ -2345,7 +2353,7 @@ func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalCo
ex.memMetrics,
ex.server.cfg.Settings,
)
ie.SetSessionData(ex.sessionData)
ie.SetSessionData(ex.sessionData())

*evalCtx = extendedEvalContext{
EvalContext: tree.EvalContext{
Expand All @@ -2357,7 +2365,7 @@ func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalCo
Tenant: p,
JoinTokenCreator: p,
PreparedStatementState: &ex.extraTxnState.prepStmtsNamespace,
SessionData: ex.sessionData,
SessionDataStack: ex.sessionDataStack,
Settings: ex.server.cfg.Settings,
TestingKnobs: ex.server.cfg.EvalContextTestingKnobs,
ClusterID: ex.server.cfg.ClusterID(),
Expand Down Expand Up @@ -2472,14 +2480,14 @@ func (ex *connExecutor) resetPlanner(
p.cancelChecker.Reset(ctx)

p.semaCtx = tree.MakeSemaContext()
p.semaCtx.SearchPath = ex.sessionData.SearchPath
p.semaCtx.IntervalStyleEnabled = ex.sessionData.IntervalStyleEnabled
p.semaCtx.DateStyleEnabled = ex.sessionData.DateStyleEnabled
p.semaCtx.SearchPath = ex.sessionData().SearchPath
p.semaCtx.IntervalStyleEnabled = ex.sessionData().IntervalStyleEnabled
p.semaCtx.DateStyleEnabled = ex.sessionData().DateStyleEnabled
p.semaCtx.Annotations = nil
p.semaCtx.TypeResolver = p
p.semaCtx.TableNameResolver = p
p.semaCtx.DateStyle = ex.sessionData.GetDateStyle()
p.semaCtx.IntervalStyle = ex.sessionData.GetIntervalStyle()
p.semaCtx.DateStyle = ex.sessionData().GetDateStyle()
p.semaCtx.IntervalStyle = ex.sessionData().GetIntervalStyle()

ex.resetEvalCtx(&p.extendedEvalCtx, txn, stmtTS)

Expand Down Expand Up @@ -2692,7 +2700,7 @@ func (ex *connExecutor) cancelSession() {

// user is part of the registrySession interface.
func (ex *connExecutor) user() security.SQLUsername {
return ex.sessionData.User()
return ex.sessionData().User()
}

// serialize is part of the registrySession interface.
Expand Down Expand Up @@ -2768,12 +2776,12 @@ func (ex *connExecutor) serialize() serverpb.Session {
}

remoteStr := "<admin>"
if ex.sessionData.RemoteAddr != nil {
remoteStr = ex.sessionData.RemoteAddr.String()
if ex.sessionData().RemoteAddr != nil {
remoteStr = ex.sessionData().RemoteAddr.String()
}

return serverpb.Session{
Username: ex.sessionData.User().Normalized(),
Username: ex.sessionData().User().Normalized(),
ClientAddress: remoteStr,
ApplicationName: ex.applicationName.Load().(string),
Start: ex.phaseTimes.GetSessionPhaseTime(sessionphase.SessionInit).UTC(),
Expand Down
Loading

0 comments on commit d42e428

Please sign in to comment.