Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
69394: sql: refactor sessionDataMutator as sessionDataMutatorIterator r=rafiss a=otan

Part of a body of work for SET LOCAL (refs cockroachdb#32562)

See individual commits for details.

Release justification: high pri work

Co-authored-by: Oliver Tan <otan@cockroachlabs.com>
  • Loading branch information
craig[bot] and otan committed Aug 26, 2021
2 parents d0d0749 + d7d05b2 commit 2e949fa
Show file tree
Hide file tree
Showing 15 changed files with 304 additions and 154 deletions.
3 changes: 1 addition & 2 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1791,8 +1791,7 @@ func (r *importResumer) parseBundleSchemaIfNeeded(ctx context.Context, phs inter

owner := r.job.Payload().UsernameProto.Decode()

sessionMutator := p.ExtendedEvalContext().SessionMutator
sessionMutator.SetDefaultIntSize(details.DefaultIntSize)
p.ExtendedEvalContext().SessionMutatorIterator.SetSessionDefaultIntSize(details.DefaultIntSize)

if details.ParseBundleSchema {
var span *tracing.Span
Expand Down
61 changes: 35 additions & 26 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,11 +582,13 @@ func (s *Server) SetupConn(
memMetrics MemoryMetrics,
) (ConnectionHandler, error) {
sd := s.newSessionData(args)

sds := sessiondata.NewStack(sd)
// Set the SessionData from args.SessionDefaults. This also validates the
// respective values.
sdMut := s.makeSessionDataMutator(sd, args.SessionDefaults)
if err := resetSessionVars(ctx, &sdMut); err != nil {
sdMutIterator := s.makeSessionDataMutatorIterator(sds, args.SessionDefaults)
if err := sdMutIterator.forEachMutatorError(func(m *sessionDataMutator) error {
return resetSessionVars(ctx, m)
}); err != nil {
log.Errorf(ctx, "error setting up client session: %s", err)
return ConnectionHandler{}, err
}
Expand Down Expand Up @@ -627,7 +629,7 @@ func (h ConnectionHandler) GetParamStatus(ctx context.Context, varName string) s
log.Fatalf(ctx, "programming error: status param %q must be defined session var", varName)
return ""
}
hasDefault, defVal := getSessionVarDefaultString(name, v, h.ex.dataMutator)
hasDefault, defVal := getSessionVarDefaultString(name, v, h.ex.dataMutatorIterator.sessionDataMutatorBase)
if !hasDefault {
log.Fatalf(ctx, "programming error: status param %q must have a default value", varName)
return ""
Expand Down Expand Up @@ -672,14 +674,16 @@ func (s *Server) newSessionData(args SessionArgs) *sessiondata.SessionData {
return sd
}

func (s *Server) makeSessionDataMutator(
sd *sessiondata.SessionData, defaults SessionDefaults,
) sessionDataMutator {
return sessionDataMutator{
data: sd,
defaults: defaults,
settings: s.cfg.Settings,
paramStatusUpdater: &noopParamStatusUpdater{},
func (s *Server) makeSessionDataMutatorIterator(
sds *sessiondata.Stack, defaults SessionDefaults,
) *sessionDataMutatorIterator {
return &sessionDataMutatorIterator{
sds: sds,
sessionDataMutatorBase: sessionDataMutatorBase{
defaults: defaults,
settings: s.cfg.Settings,
},
sessionDataMutatorCallbacks: sessionDataMutatorCallbacks{},
}
}

Expand Down Expand Up @@ -739,8 +743,6 @@ func (s *Server) newConnExecutor(
)

nodeIDOrZero, _ := s.cfg.NodeID.OptionalNodeID()
sdMutator := new(sessionDataMutator)
*sdMutator = s.makeSessionDataMutator(sd, sdDefaults)
ex := &connExecutor{
server: s,
metrics: srvMetrics,
Expand All @@ -749,7 +751,6 @@ func (s *Server) newConnExecutor(
mon: sessionRootMon,
sessionMon: sessionMon,
sessionDataStack: sessiondata.NewStack(sd),
dataMutator: sdMutator,
state: txnState{
mon: txnMon,
connCtx: ctx,
Expand Down Expand Up @@ -781,20 +782,23 @@ func (s *Server) newConnExecutor(

ex.state.txnAbortCount = ex.metrics.EngineMetrics.TxnAbortCount

ex.dataMutatorIterator = s.makeSessionDataMutatorIterator(
ex.sessionDataStack,
sdDefaults,
)
// The transaction_read_only variable is special; its updates need to be
// hooked-up to the executor.
sdMutator.setCurTxnReadOnly = func(val bool) {
ex.dataMutatorIterator.setCurTxnReadOnly = func(val bool) {
ex.state.readOnly = val
}

sdMutator.onTempSchemaCreation = func() {
ex.dataMutatorIterator.onTempSchemaCreation = func() {
ex.hasCreatedTemporarySchema = true
}

ex.applicationName.Store(ex.sessionData().ApplicationName)
ex.statsWriter = statsWriter
ex.statsCollector = sslocal.NewStatsCollector(statsWriter, ex.phaseTimes)
sdMutator.RegisterOnSessionDataChange("application_name", func(newName string) {
ex.dataMutatorIterator.RegisterOnSessionDataChange("application_name", func(newName string) {
ex.applicationName.Store(newName)
ex.statsWriter = ex.server.sqlStats.GetWriterForApplication(newName)
})
Expand Down Expand Up @@ -852,7 +856,9 @@ func (s *Server) newConnExecutorWithTxn(
if txn.Type() == kv.LeafTxn {
// If the txn is a leaf txn it is not allowed to perform mutations. For
// sanity, set read only on the session.
ex.dataMutator.SetReadOnly(true)
ex.dataMutatorIterator.forEachMutator(func(m *sessionDataMutator) {
m.SetReadOnly(true)
})
}

// The new transaction stuff below requires active monitors and traces, so
Expand Down Expand Up @@ -1226,9 +1232,10 @@ type connExecutor struct {

// sessionDataStack contains the user-configurable connection variables.
sessionDataStack *sessiondata.Stack
// dataMutator is nil for session-bound internal executors; we shouldn't issue
// statements that manipulate session state to an internal executor.
dataMutator *sessionDataMutator
// dataMutatorIterator is nil for session-bound internal executors; we
// shouldn't issue statements that manipulate session state to an internal
// executor.
dataMutatorIterator *sessionDataMutatorIterator

// statsWriter is a writer interface for recording per-application SQL usage
// statistics. It is maintained to represent statistics for the application
Expand Down Expand Up @@ -1477,7 +1484,9 @@ func (ex *connExecutor) Ctx() context.Context {
return ctx
}

// sessionData returns the top SessionData on the executor.
// sessionData returns the top SessionData in the executor's sessionDataStack.
// This should be how callers should reference SessionData objects, as it
// will always contain the "latest" SessionData object in the transaction.
func (ex *connExecutor) sessionData() *sessiondata.SessionData {
if ex.sessionDataStack == nil {
return nil
Expand Down Expand Up @@ -2380,7 +2389,7 @@ func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalCo
SQLStatsController: ex.server.sqlStatsController,
CompactEngineSpan: ex.server.cfg.CompactEngineSpanFunc,
},
SessionMutator: ex.dataMutator,
SessionMutatorIterator: ex.dataMutatorIterator,
VirtualSchemas: ex.server.cfg.VirtualSchemas,
Tracing: &ex.sessionTracing,
NodesStatusServer: ex.server.cfg.NodesStatusServer,
Expand Down Expand Up @@ -2462,7 +2471,7 @@ func (ex *connExecutor) initPlanner(ctx context.Context, p *planner) {

ex.initEvalCtx(ctx, &p.extendedEvalCtx, p)

p.sessionDataMutator = ex.dataMutator
p.sessionDataMutatorIterator = ex.dataMutatorIterator
p.noticeSender = nil
p.preparedStatements = ex.getPrepStmtsAccessor()

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func (ex *connExecutor) execStmtInOpenState(
stmtTS := ex.server.cfg.Clock.PhysicalTime()
ex.statsCollector.Reset(ex.statsWriter, ex.phaseTimes)
ex.resetPlanner(ctx, p, ex.state.mu.txn, stmtTS)
p.sessionDataMutator.paramStatusUpdater = res
p.sessionDataMutatorIterator.paramStatusUpdater = res
p.noticeSender = res
ih := &p.instrumentation

Expand Down
8 changes: 6 additions & 2 deletions pkg/sql/discard.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ func (p *planner) Discard(ctx context.Context, s *tree.Discard) (planNode, error
}

// RESET ALL
if err := resetSessionVars(ctx, p.sessionDataMutator); err != nil {
if err := p.sessionDataMutatorIterator.forEachMutatorError(
func(m *sessionDataMutator) error {
return resetSessionVars(ctx, m)
},
); err != nil {
return nil, err
}

Expand All @@ -46,7 +50,7 @@ func resetSessionVars(ctx context.Context, m *sessionDataMutator) error {
for _, varName := range varNames {
v := varGen[varName]
if v.Set != nil {
hasDefault, defVal := getSessionVarDefaultString(varName, v, m)
hasDefault, defVal := getSessionVarDefaultString(varName, v, m.sessionDataMutatorBase)
if hasDefault {
if err := v.Set(ctx, m, defVal); err != nil {
return err
Expand Down
126 changes: 102 additions & 24 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2391,38 +2391,108 @@ type paramStatusUpdater interface {
BufferParamStatusUpdate(string, string)
}

// noopParamStatusUpdater implements paramStatusUpdater by performing a no-op.
type noopParamStatusUpdater struct{}

var _ paramStatusUpdater = (*noopParamStatusUpdater)(nil)

func (noopParamStatusUpdater) BufferParamStatusUpdate(string, string) {}
// sessionDataMutatorBase contains elements in a sessionDataMutator
// which is the same across all SessionData elements in the sessiondata.Stack.
type sessionDataMutatorBase struct {
defaults SessionDefaults
settings *cluster.Settings
}

// sessionDataMutator is the interface used by sessionVars to change the session
// state. It mostly mutates the Session's SessionData, but not exclusively (e.g.
// see curTxnReadOnly).
type sessionDataMutator struct {
data *sessiondata.SessionData
defaults SessionDefaults
settings *cluster.Settings
// RegisterOnSessionDataChange adds a listener to execute when a change on the
// given key is made using the mutator object.
func (it *sessionDataMutatorIterator) RegisterOnSessionDataChange(key string, f func(val string)) {
if it.onSessionDataChangeListeners == nil {
it.onSessionDataChangeListeners = make(map[string][]func(val string))
}
it.onSessionDataChangeListeners[key] = append(it.onSessionDataChangeListeners[key], f)
}

// sessionDataMutatorCallbacks contains elements in a sessionDataMutator
// which are only populated when mutating the "top" sessionData element.
// It is intended for functions which should only be called once per SET
// (e.g. param status updates, which only should be sent once within
// a transaction where there may be two or more SessionData elements in
// the stack)
type sessionDataMutatorCallbacks struct {
// paramStatusUpdater is called when there is a ParamStatusUpdate.
// It can be nil, in which case nothing triggers on execution.
paramStatusUpdater paramStatusUpdater
// setCurTxnReadOnly is called when we execute SET transaction_read_only = ...
// It can be nil, in which case nothing triggers on execution.
setCurTxnReadOnly func(val bool)
// onTempSchemaCreation is called when the temporary schema is set
// on the search path (the first and only time).
// It can be nil, in which case nothing triggers on execution.
onTempSchemaCreation func()
// onSessionDataChangeListeners stores all the observers to execute when
// session data is modified, keyed by the value to change on.
onSessionDataChangeListeners map[string][]func(val string)
}

// RegisterOnSessionDataChange adds a listener to execute when a change on the
// given key is made using the mutator object.
func (m *sessionDataMutator) RegisterOnSessionDataChange(key string, f func(val string)) {
if m.onSessionDataChangeListeners == nil {
m.onSessionDataChangeListeners = make(map[string][]func(val string))
// sessionDataMutatorIterator generates sessionDataMutators which allow
// the changing of SessionData on some element inside the sessiondata Stack.
type sessionDataMutatorIterator struct {
sessionDataMutatorBase
sds *sessiondata.Stack
sessionDataMutatorCallbacks
}

// mutator returns a mutator for the given sessionData.
func (it *sessionDataMutatorIterator) mutator(
applyCallbacks bool, sd *sessiondata.SessionData,
) *sessionDataMutator {
ret := &sessionDataMutator{
data: sd,
sessionDataMutatorBase: it.sessionDataMutatorBase,
}
// We usually apply callbacks on the first element in the stack, as the txn
// rollback will always reset to the first element we touch in the stack,
// in which case it should be up-to-date by default.
if applyCallbacks {
ret.sessionDataMutatorCallbacks = it.sessionDataMutatorCallbacks
}
m.onSessionDataChangeListeners[key] = append(m.onSessionDataChangeListeners[key], f)
return ret
}

// SetSessionDefaultIntSize sets the default int size for the session.
// It is exported for use in import which is a CCL package.
func (it *sessionDataMutatorIterator) SetSessionDefaultIntSize(size int32) {
it.forEachMutator(func(m *sessionDataMutator) {
m.SetDefaultIntSize(size)
})
}

// forEachMutator iterates over each mutator over all SessionData elements
// in the stack and applies the given function to them.
// It is the equivalent of SET SESSION x = y.
func (it *sessionDataMutatorIterator) forEachMutator(applyFunc func(m *sessionDataMutator)) {
elems := it.sds.Elems()
for i, sd := range elems {
applyFunc(it.mutator(i == 0, sd))
}
}

// forEachMutatorError is the same as forEachMutator, but takes in a function
// that can return an error, erroring if any of applications error.
func (it *sessionDataMutatorIterator) forEachMutatorError(
applyFunc func(m *sessionDataMutator) error,
) error {
elems := it.sds.Elems()
for i, sd := range elems {
if err := applyFunc(it.mutator(i == 0, sd)); err != nil {
return err
}
}
return nil
}

// sessionDataMutator is the object used by sessionVars to change the session
// state. It mostly mutates the session's SessionData, but not exclusively (e.g.
// see curTxnReadOnly).
type sessionDataMutator struct {
data *sessiondata.SessionData
sessionDataMutatorBase
sessionDataMutatorCallbacks
}

func (m *sessionDataMutator) notifyOnDataChangeListeners(key string, val string) {
Expand All @@ -2431,11 +2501,17 @@ func (m *sessionDataMutator) notifyOnDataChangeListeners(key string, val string)
}
}

func (m *sessionDataMutator) bufferParamStatusUpdate(param string, status string) {
if m.paramStatusUpdater != nil {
m.paramStatusUpdater.BufferParamStatusUpdate(param, status)
}
}

// SetApplicationName sets the application name.
func (m *sessionDataMutator) SetApplicationName(appName string) {
m.data.ApplicationName = appName
m.notifyOnDataChangeListeners("application_name", appName)
m.paramStatusUpdater.BufferParamStatusUpdate("application_name", appName)
m.bufferParamStatusUpdate("application_name", appName)
}

func (m *sessionDataMutator) SetBytesEncodeFormat(val sessiondatapb.BytesEncodeFormat) {
Expand All @@ -2451,7 +2527,9 @@ func (m *sessionDataMutator) SetDatabase(dbName string) {
}

func (m *sessionDataMutator) SetTemporarySchemaName(scName string) {
m.onTempSchemaCreation()
if m.onTempSchemaCreation != nil {
m.onTempSchemaCreation()
}
m.data.SearchPath = m.data.SearchPath.WithTemporarySchemaName(scName)
}

Expand Down Expand Up @@ -2570,7 +2648,7 @@ func (m *sessionDataMutator) UpdateSearchPath(paths []string) {

func (m *sessionDataMutator) SetLocation(loc *time.Location) {
m.data.Location = loc
m.paramStatusUpdater.BufferParamStatusUpdate("TimeZone", sessionDataTimeZoneFormat(loc))
m.bufferParamStatusUpdate("TimeZone", sessionDataTimeZoneFormat(loc))
}

func (m *sessionDataMutator) SetReadOnly(val bool) {
Expand Down Expand Up @@ -2674,13 +2752,13 @@ func (m *sessionDataMutator) initSequenceCache() {
// SetIntervalStyle sets the IntervalStyle for the given session.
func (m *sessionDataMutator) SetIntervalStyle(style duration.IntervalStyle) {
m.data.DataConversionConfig.IntervalStyle = style
m.paramStatusUpdater.BufferParamStatusUpdate("IntervalStyle", strings.ToLower(style.String()))
m.bufferParamStatusUpdate("IntervalStyle", strings.ToLower(style.String()))
}

// SetDateStyle sets the DateStyle for the given session.
func (m *sessionDataMutator) SetDateStyle(style pgdate.DateStyle) {
m.data.DataConversionConfig.DateStyle = style
m.paramStatusUpdater.BufferParamStatusUpdate("DateStyle", style.SQLString())
m.bufferParamStatusUpdate("DateStyle", style.SQLString())
}

// SetIntervalStyleEnabled sets the IntervalStyleEnabled for the given session.
Expand Down
6 changes: 5 additions & 1 deletion pkg/sql/pg_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -2398,7 +2398,11 @@ https://www.postgresql.org/docs/9.5/catalog-pg-settings.html`,
globalDefVal := gen.GlobalDefault(&p.EvalContext().Settings.SV)
bootDatum = tree.NewDString(globalDefVal)
}
if hasDefault, defVal := getSessionVarDefaultString(vName, gen, p.sessionDataMutator); hasDefault {
if hasDefault, defVal := getSessionVarDefaultString(
vName,
gen,
p.sessionDataMutatorIterator.sessionDataMutatorBase,
); hasDefault {
resetDatum = tree.NewDString(defVal)
}
}
Expand Down
Loading

0 comments on commit 2e949fa

Please sign in to comment.