Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: fix the lifetime of portals #33423

Merged
merged 1 commit into from
Jan 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions pkg/internal/client/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,13 @@ func (m *MockTransactionalSender) AugmentMeta(context.Context, roachpb.TxnCoordM
}

// OnFinish is part of the TxnSender interface.
func (m *MockTransactionalSender) OnFinish(_ func(error)) { panic("unimplemented") }
func (m *MockTransactionalSender) OnFinish(f func(error)) {
// We accept the nil, as that's commonly used to reset a previously-set
// closure.
if f != nil {
panic("unimplemented")
}
}

// SetSystemConfigTrigger is part of the TxnSender interface.
func (m *MockTransactionalSender) SetSystemConfigTrigger() error { panic("unimplemented") }
Expand Down Expand Up @@ -346,7 +352,7 @@ func (m *MockTransactionalSender) ManualRestart(

// IsSerializablePushAndRefreshNotPossible is part of the TxnSender interface.
func (m *MockTransactionalSender) IsSerializablePushAndRefreshNotPossible() bool {
panic("unimplemented")
return false
}

// Epoch is part of the TxnSender interface.
Expand Down
137 changes: 66 additions & 71 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func (h ConnectionHandler) GetStatusParam(ctx context.Context, varName string) s
}

// ServeConn serves a client connection by reading commands from
// the stmtBuf embedded in the connHandler.
// the stmtBuf embedded in the ConnHandler.
func (s *Server) ServeConn(
ctx context.Context, h ConnectionHandler, reserved mon.BoundAccount, cancel context.CancelFunc,
) error {
Expand Down Expand Up @@ -495,10 +495,6 @@ func (s *Server) newConnExecutor(
mon: &sessionRootMon,
sessionMon: &sessionMon,
sessionData: sd,
prepStmtsNamespace: prepStmtNamespace{
prepStmts: make(map[string]prepStmtEntry),
portals: make(map[string]portalEntry),
},
state: txnState{
mon: &txnMon,
connCtx: ctx,
Expand Down Expand Up @@ -564,6 +560,14 @@ func (s *Server) newConnExecutor(
}

ex.phaseTimes[sessionInit] = timeutil.Now()
ex.extraTxnState.prepStmtsNamespace = prepStmtNamespace{
prepStmts: make(map[string]*PreparedStatement),
portals: make(map[string]*PreparedPortal),
}
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos = prepStmtNamespace{
prepStmts: make(map[string]*PreparedStatement),
portals: make(map[string]*PreparedPortal),
}
ex.extraTxnState.tables = TableCollection{
leaseMgr: s.cfg.LeaseManager,
databaseCache: s.dbCache.getDatabaseCache(),
Expand Down Expand Up @@ -744,10 +748,9 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
}

if closeType != panicClose {
// Close all statements and prepared portals by first unifying the namespaces
// and the closing what remains.
ex.commitPrepStmtNamespace(ctx)
ex.prepStmtsNamespace.resetTo(ctx, &prepStmtNamespace{})
// Close all statements and prepared portals.
ex.extraTxnState.prepStmtsNamespace.resetTo(ctx, prepStmtNamespace{})
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.resetTo(ctx, prepStmtNamespace{})
}

if ex.sessionTracing.Enabled() {
Expand Down Expand Up @@ -847,6 +850,16 @@ type connExecutor struct {
// Set via setTxnRewindPos().
txnRewindPos CmdPos

// prepStmtNamespace contains the prepared statements and portals that the
// session currently has access to.
// Portals are bound to a transaction and they're all destroyed once the
// transaction finishes.
// Prepared statements are not transactional and so it's a bit weird that
// they're part of extraTxnState, but it's convenient to put them here
// because they need the same kind of "snapshoting" as the portals (see
// prepStmtsNamespaceAtTxnRewindPos).
prepStmtsNamespace prepStmtNamespace

// prepStmtsNamespaceAtTxnRewindPos is a snapshot of the prep stmts/portals
// (ex.prepStmtsNamespace) before processing the command at position
// txnRewindPos.
Expand Down Expand Up @@ -896,10 +909,6 @@ type connExecutor struct {
// running on this connection.
parallelizeQueue ParallelizeQueue

// prepStmtNamespace contains the prepared statements and portals that the
// session currently has access to.
prepStmtsNamespace prepStmtNamespace

// mu contains of all elements of the struct that can be changed
// after initialization, and may be accessed from another thread.
mu struct {
Expand Down Expand Up @@ -957,68 +966,50 @@ func (ch *ctxHolder) unhijack() {
type prepStmtNamespace struct {
// prepStmts contains the prepared statements currently available on the
// session.
prepStmts map[string]prepStmtEntry
prepStmts map[string]*PreparedStatement
// portals contains the portals currently available on the session.
portals map[string]portalEntry
portals map[string]*PreparedPortal
}

type prepStmtEntry struct {
*PreparedStatement
portals map[string]struct{}
}

func (pe *prepStmtEntry) copy() prepStmtEntry {
cpy := prepStmtEntry{}
cpy.PreparedStatement = pe.PreparedStatement
cpy.portals = make(map[string]struct{})
for pname := range pe.portals {
cpy.portals[pname] = struct{}{}
func (ns prepStmtNamespace) String() string {
var sb strings.Builder
sb.WriteString("Prep stmts: ")
for name := range ns.prepStmts {
sb.WriteString(name + " ")
}
return cpy
}

type portalEntry struct {
*PreparedPortal
psName string
sb.WriteString("Portals: ")
for name := range ns.portals {
sb.WriteString(name + " ")
}
return sb.String()
}

// resetTo resets a namespace to equate another one (`to`). Prep stmts and portals
// that are present in ns but not in to are deallocated.
// resetTo resets a namespace to equate another one (`to`). All the receiver's
// references are release and all the to's references are duplicated.
//
// A (pointer to) empty `to` can be passed in to deallocate everything.
func (ns *prepStmtNamespace) resetTo(ctx context.Context, to *prepStmtNamespace) {
for name, ps := range ns.prepStmts {
bps, ok := to.prepStmts[name]
// If the prepared statement didn't exist before (including if a statement
// with the same name existed, but it was different), close it.
if !ok || bps.PreparedStatement != ps.PreparedStatement {
ps.close(ctx)
}
// An empty `to` can be passed in to deallocate everything.
func (ns *prepStmtNamespace) resetTo(ctx context.Context, to prepStmtNamespace) {
for name, p := range ns.prepStmts {
p.decRef(ctx)
delete(ns.prepStmts, name)
}
for name, p := range ns.portals {
bp, ok := to.portals[name]
// If the prepared statement didn't exist before (including if a statement
// with the same name existed, but it was different), close it.
if !ok || bp.PreparedPortal != p.PreparedPortal {
p.close(ctx)
}
p.decRef(ctx)
delete(ns.portals, name)
}
*ns = to.copy()
}

func (ns *prepStmtNamespace) copy() prepStmtNamespace {
var cpy prepStmtNamespace
cpy.prepStmts = make(map[string]prepStmtEntry)
for name, psEntry := range ns.prepStmts {
cpy.prepStmts[name] = psEntry.copy()
for name, ps := range to.prepStmts {
ps.incRef(ctx)
ns.prepStmts[name] = ps
}
cpy.portals = make(map[string]portalEntry)
for name, p := range ns.portals {
cpy.portals[name] = p
for name, p := range to.portals {
p.incRef(ctx)
ns.portals[name] = p
}
return cpy
}

// resetExtraTxnState resets the fields of ex.extraTxnState when a transaction
// commits, rolls back or restarts.
func (ex *connExecutor) resetExtraTxnState(
ctx context.Context, dbCacheHolder *databaseCacheHolder,
) error {
Expand All @@ -1029,6 +1020,13 @@ func (ex *connExecutor) resetExtraTxnState(
ex.extraTxnState.tables.databaseCache = dbCacheHolder.getDatabaseCache()

ex.extraTxnState.autoRetryCounter = 0

// Close all portals.
for name, p := range ex.extraTxnState.prepStmtsNamespace.portals {
p.decRef(ctx)
delete(ex.extraTxnState.prepStmtsNamespace.portals, name)
}

return nil
}

Expand Down Expand Up @@ -1179,7 +1177,7 @@ func (ex *connExecutor) run(
// ExecPortal is handled like ExecStmt, except that the placeholder info
// is taken from the portal.

portal, ok := ex.prepStmtsNamespace.portals[tcmd.Name]
portal, ok := ex.extraTxnState.prepStmtsNamespace.portals[tcmd.Name]
if !ok {
err := pgerror.NewErrorf(
pgerror.CodeInvalidCursorNameError, "unknown portal %q", tcmd.Name)
Expand Down Expand Up @@ -1426,7 +1424,7 @@ func (ex *connExecutor) updateTxnRewindPosMaybe(
case ExecStmt:
canAdvance = ex.stmtDoesntNeedRetry(tcmd.AST)
case ExecPortal:
portal := ex.prepStmtsNamespace.portals[tcmd.Name]
portal := ex.extraTxnState.prepStmtsNamespace.portals[tcmd.Name]
canAdvance = ex.stmtDoesntNeedRetry(portal.Stmt.AST)
case PrepareStmt:
canAdvance = true
Expand Down Expand Up @@ -1593,14 +1591,14 @@ func (ex *connExecutor) generateID() ClusterWideID {
// prepStmtsNamespaceAtTxnRewindPos that's not part of prepStmtsNamespace.
func (ex *connExecutor) commitPrepStmtNamespace(ctx context.Context) {
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.resetTo(
ctx, &ex.prepStmtsNamespace)
ctx, ex.extraTxnState.prepStmtsNamespace)
}

// commitPrepStmtNamespace deallocates everything in prepStmtsNamespace that's
// not part of prepStmtsNamespaceAtTxnRewindPos.
func (ex *connExecutor) rewindPrepStmtNamespace(ctx context.Context) {
ex.prepStmtsNamespace.resetTo(
ctx, &ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos)
ex.extraTxnState.prepStmtsNamespace.resetTo(
ctx, ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos)
}

// getRewindTxnCapability checks whether rewinding to the position previously
Expand Down Expand Up @@ -2194,8 +2192,8 @@ var _ preparedStatementsAccessor = connExPrepStmtsAccessor{}

// Get is part of the preparedStatementsAccessor interface.
func (ps connExPrepStmtsAccessor) Get(name string) (*PreparedStatement, bool) {
s, ok := ps.ex.prepStmtsNamespace.prepStmts[name]
return s.PreparedStatement, ok
s, ok := ps.ex.extraTxnState.prepStmtsNamespace.prepStmts[name]
return s, ok
}

// Delete is part of the preparedStatementsAccessor interface.
Expand All @@ -2210,10 +2208,7 @@ func (ps connExPrepStmtsAccessor) Delete(ctx context.Context, name string) bool

// DeleteAll is part of the preparedStatementsAccessor interface.
func (ps connExPrepStmtsAccessor) DeleteAll(ctx context.Context) {
ps.ex.prepStmtsNamespace = prepStmtNamespace{
prepStmts: make(map[string]prepStmtEntry),
portals: make(map[string]portalEntry),
}
ps.ex.extraTxnState.prepStmtsNamespace.resetTo(ctx, prepStmtNamespace{})
}

// contextStatementKey is an empty type for the handle associated with the
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func (ex *connExecutor) execStmtInOpenState(
// This is handling the SQL statement "PREPARE". See execPrepare for
// handling of the protocol-level command for preparing statements.
name := s.Name.String()
if _, ok := ex.prepStmtsNamespace.prepStmts[name]; ok {
if _, ok := ex.extraTxnState.prepStmtsNamespace.prepStmts[name]; ok {
err := pgerror.NewErrorf(
pgerror.CodeDuplicatePreparedStatementError,
"prepared statement %q already exists", name,
Expand Down Expand Up @@ -316,7 +316,7 @@ func (ex *connExecutor) execStmtInOpenState(
// Replace the `EXECUTE foo` statement with the prepared statement, and
// continue execution below.
name := s.Name.String()
ps, ok := ex.prepStmtsNamespace.prepStmts[name]
ps, ok := ex.extraTxnState.prepStmtsNamespace.prepStmts[name]
if !ok {
err := pgerror.NewErrorf(
pgerror.CodeInvalidSQLStatementNameError,
Expand All @@ -325,13 +325,13 @@ func (ex *connExecutor) execStmtInOpenState(
return makeErrEvent(err)
}
var err error
pinfo, err = fillInPlaceholders(ps.PreparedStatement, name, s.Params, ex.sessionData.SearchPath)
pinfo, err = fillInPlaceholders(ps, name, s.Params, ex.sessionData.SearchPath)
if err != nil {
return makeErrEvent(err)
}

stmt.Statement = ps.Statement
stmt.Prepared = ps.PreparedStatement
stmt.Prepared = ps
stmt.ExpectedTypes = ps.Columns
stmt.AnonymizedStr = ps.AnonymizedStr
res.ResetStmtType(ps.AST)
Expand Down
Loading