Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-23.1: sql: update connExecutor logic for pausable portals #101026

Merged
merged 8 commits into from
Apr 10, 2023
1 change: 0 additions & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,6 @@ sql.multiple_modifications_of_table.enabled boolean false if true, allow stateme
sql.multiregion.drop_primary_region.enabled boolean true allows dropping the PRIMARY REGION of a database if it is the last region
sql.notices.enabled boolean true enable notices in the server/client protocol being sent
sql.optimizer.uniqueness_checks_for_gen_random_uuid.enabled boolean false if enabled, uniqueness checks may be planned for mutations of UUID columns updated with gen_random_uuid(); otherwise, uniqueness is assumed due to near-zero collision probability
sql.pgwire.multiple_active_portals.enabled boolean false if true, portals with read-only SELECT query without sub/post queries can be executed in interleaving manner, but with local execution plan
sql.schema.telemetry.recurrence string @weekly cron-tab recurrence for SQL schema telemetry job
sql.show_ranges_deprecated_behavior.enabled boolean true if set, SHOW RANGES and crdb_internal.ranges{_no_leases} behave with deprecated pre-v23.1 semantics. NB: the new SHOW RANGES interface has richer WITH options than pre-v23.1 SHOW RANGES.
sql.spatial.experimental_box2d_comparison_operators.enabled boolean false enables the use of certain experimental box2d comparison operators
Expand Down
1 change: 0 additions & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@
<tr><td><div id="setting-sql-multiregion-drop-primary-region-enabled" class="anchored"><code>sql.multiregion.drop_primary_region.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>allows dropping the PRIMARY REGION of a database if it is the last region</td></tr>
<tr><td><div id="setting-sql-notices-enabled" class="anchored"><code>sql.notices.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>enable notices in the server/client protocol being sent</td></tr>
<tr><td><div id="setting-sql-optimizer-uniqueness-checks-for-gen-random-uuid-enabled" class="anchored"><code>sql.optimizer.uniqueness_checks_for_gen_random_uuid.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if enabled, uniqueness checks may be planned for mutations of UUID columns updated with gen_random_uuid(); otherwise, uniqueness is assumed due to near-zero collision probability</td></tr>
<tr><td><div id="setting-sql-pgwire-multiple-active-portals-enabled" class="anchored"><code>sql.pgwire.multiple_active_portals.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if true, portals with read-only SELECT query without sub/post queries can be executed in interleaving manner, but with local execution plan</td></tr>
<tr><td><div id="setting-sql-schema-telemetry-recurrence" class="anchored"><code>sql.schema.telemetry.recurrence</code></div></td><td>string</td><td><code>@weekly</code></td><td>cron-tab recurrence for SQL schema telemetry job</td></tr>
<tr><td><div id="setting-sql-show-ranges-deprecated-behavior-enabled" class="anchored"><code>sql.show_ranges_deprecated_behavior.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, SHOW RANGES and crdb_internal.ranges{_no_leases} behave with deprecated pre-v23.1 semantics. NB: the new SHOW RANGES interface has richer WITH options than pre-v23.1 SHOW RANGES.</td></tr>
<tr><td><div id="setting-sql-spatial-experimental-box2d-comparison-operators-enabled" class="anchored"><code>sql.spatial.experimental_box2d_comparison_operators.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>enables the use of certain experimental box2d comparison operators</td></tr>
Expand Down
1 change: 0 additions & 1 deletion pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,6 @@ go_library(
"//pkg/sql/sqlstats/insights",
"//pkg/sql/sqlstats/persistedsqlstats",
"//pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil",
"//pkg/sql/sqltelemetry",
"//pkg/sql/stats",
"//pkg/sql/stmtdiagnostics",
"//pkg/sql/syntheticprivilege",
Expand Down
8 changes: 0 additions & 8 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/settingswatcher"
"github.com/cockroachdb/cockroach/pkg/server/status"
"github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/server/tracedumper"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -103,7 +102,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slprovider"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics"
"github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilegecache"
Expand Down Expand Up @@ -1353,12 +1351,6 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
vmoduleSetting.SetOnChange(&cfg.Settings.SV, fn)
fn(ctx)

sql.EnableMultipleActivePortals.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) {
if sql.EnableMultipleActivePortals.Get(&cfg.Settings.SV) {
telemetry.Inc(sqltelemetry.MultipleActivePortalCounter)
}
})

return &SQLServer{
ambientCtx: cfg.BaseConfig.AmbientCtx,
stopper: cfg.stopper,
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/apply_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,10 @@ func runPlanInsidePlan(
// Make a copy of the EvalContext so it can be safely modified.
evalCtx := params.p.ExtendedEvalContextCopy()
plannerCopy := *params.p
// If we reach this part when re-executing a pausable portal, we won't want to
// resume the flow bound to it. The inner-plan should have its own lifecycle
// for its flow.
plannerCopy.pausablePortal = nil
distributePlan := getPlanDistribution(
ctx, plannerCopy.Descriptors().HasUncommittedTypes(),
plannerCopy.SessionData().DistSQLMode, plan.main,
Expand Down
93 changes: 79 additions & 14 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1138,6 +1138,14 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
txnEvType = txnRollback
}

// Close all portals, otherwise there will be leftover bytes.
ex.extraTxnState.prepStmtsNamespace.closeAllPortals(
ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.closeAllPortals(
ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)

if closeType == normalClose {
// We'll cleanup the SQL txn by creating a non-retriable (commit:true) event.
// This event is guaranteed to be accepted in every state.
Expand Down Expand Up @@ -1760,6 +1768,26 @@ func (ns *prepStmtNamespace) touchLRUEntry(name string) {
ns.addLRUEntry(name, 0)
}

func (ns *prepStmtNamespace) closeAllPortals(
ctx context.Context, prepStmtsNamespaceMemAcc *mon.BoundAccount,
) {
for name, p := range ns.portals {
p.close(ctx, prepStmtsNamespaceMemAcc, name)
delete(ns.portals, name)
}
}

func (ns *prepStmtNamespace) closeAllPausablePortals(
ctx context.Context, prepStmtsNamespaceMemAcc *mon.BoundAccount,
) {
for name, p := range ns.portals {
if p.pauseInfo != nil {
p.close(ctx, prepStmtsNamespaceMemAcc, name)
delete(ns.portals, name)
}
}
}

// MigratablePreparedStatements returns a mapping of all prepared statements.
func (ns *prepStmtNamespace) MigratablePreparedStatements() []sessiondatapb.MigratableSession_PreparedStatement {
ret := make([]sessiondatapb.MigratableSession_PreparedStatement, 0, len(ns.prepStmts))
Expand Down Expand Up @@ -1836,10 +1864,7 @@ func (ns *prepStmtNamespace) resetTo(
for name := range ns.prepStmtsLRU {
delete(ns.prepStmtsLRU, name)
}
for name, p := range ns.portals {
p.close(ctx, prepStmtsNamespaceMemAcc, name)
delete(ns.portals, name)
}
ns.closeAllPortals(ctx, prepStmtsNamespaceMemAcc)

for name, ps := range to.prepStmts {
ps.incRef(ctx)
Expand Down Expand Up @@ -1880,10 +1905,9 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) {
}

// Close all portals.
for name, p := range ex.extraTxnState.prepStmtsNamespace.portals {
p.close(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
delete(ex.extraTxnState.prepStmtsNamespace.portals, name)
}
ex.extraTxnState.prepStmtsNamespace.closeAllPortals(
ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)

// Close all cursors.
if err := ex.extraTxnState.sqlCursors.closeAll(false /* errorOnWithHold */); err != nil {
Expand All @@ -1894,10 +1918,9 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) {

switch ev.eventType {
case txnCommit, txnRollback:
for name, p := range ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.portals {
p.close(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
delete(ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.portals, name)
}
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.closeAllPortals(
ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
ex.extraTxnState.savepoints.clear()
ex.onTxnFinish(ctx, ev)
case txnRestart:
Expand Down Expand Up @@ -2044,7 +2067,6 @@ func (ex *connExecutor) run(
return err
}
}

}

// errDrainingComplete is returned by execCmd when the connExecutor previously got
Expand Down Expand Up @@ -2116,7 +2138,7 @@ func (ex *connExecutor) execCmd() (retErr error) {
(tcmd.LastInBatchBeforeShowCommitTimestamp ||
tcmd.LastInBatch || !implicitTxnForBatch)
ev, payload, err = ex.execStmt(
ctx, tcmd.Statement, nil /* prepared */, nil /* pinfo */, stmtRes, canAutoCommit,
ctx, tcmd.Statement, nil /* portal */, nil /* pinfo */, stmtRes, canAutoCommit,
)

return err
Expand Down Expand Up @@ -2173,6 +2195,12 @@ func (ex *connExecutor) execCmd() (retErr error) {
Values: portal.Qargs,
}

// If this is the first-time execution of a portal without a limit set,
// it means all rows will be exhausted, so no need to pause this portal.
if tcmd.Limit == 0 && portal.pauseInfo != nil && portal.pauseInfo.curRes == nil {
portal.pauseInfo = nil
}

stmtRes := ex.clientComm.CreateStatementResult(
portal.Stmt.AST,
// The client is using the extended protocol, so no row description is
Expand All @@ -2186,6 +2214,9 @@ func (ex *connExecutor) execCmd() (retErr error) {
ex.implicitTxn(),
portal.portalPausablity,
)
if portal.pauseInfo != nil {
portal.pauseInfo.curRes = stmtRes
}
res = stmtRes

// In the extended protocol, autocommit is not always allowed. The postgres
Expand All @@ -2204,6 +2235,8 @@ func (ex *connExecutor) execCmd() (retErr error) {
// - ex.statsCollector merely contains a copy of the times, that
// was created when the statement started executing (via the
// reset() method).
// TODO(sql-sessions): fix the phase time for pausable portals.
// https://github.com/cockroachdb/cockroach/issues/99410
ex.statsCollector.PhaseTimes().SetSessionPhaseTime(sessionphase.SessionQueryServiced, timeutil.Now())
if err != nil {
return err
Expand Down Expand Up @@ -2313,9 +2346,26 @@ func (ex *connExecutor) execCmd() (retErr error) {

var advInfo advanceInfo

// We close all pausable portals when we encounter err payload, otherwise
// there will be leftover bytes.
shouldClosePausablePortals := func(payload fsm.EventPayload) bool {
switch payload.(type) {
case eventNonRetriableErrPayload, eventRetriableErrPayload:
return true
default:
return false
}
}

// If an event was generated, feed it to the state machine.
if ev != nil {
var err error
if shouldClosePausablePortals(payload) {
// We need this as otherwise, there'll be leftover bytes when
// txnState.finishSQLTxn() is being called, as the underlying resources of
// pausable portals hasn't been cleared yet.
ex.extraTxnState.prepStmtsNamespace.closeAllPausablePortals(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc)
}
advInfo, err = ex.txnStateTransitionsApplyWrapper(ev, payload, res, pos)
if err != nil {
return err
Expand Down Expand Up @@ -2366,6 +2416,16 @@ func (ex *connExecutor) execCmd() (retErr error) {
res.SetError(pe.errorCause())
}
}
// For a pausable portal, we don't log the affected rows until we close the
// portal. However, we update the result for each execution. Thus, we need
// to accumulate the number of affected rows before closing the result.
if tcmd, ok := cmd.(*ExecPortal); ok {
if portal, ok := ex.extraTxnState.prepStmtsNamespace.portals[tcmd.Name]; ok {
if portal.pauseInfo != nil {
portal.pauseInfo.dispatchToExecutionEngine.rowsAffected += res.(RestrictedCommandResult).RowsAffected()
}
}
}
res.Close(ctx, stateToTxnStatusIndicator(ex.machine.CurState()))
} else {
res.Discard()
Expand Down Expand Up @@ -3598,6 +3658,11 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
fallthrough
case txnRollback:
ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent)
// Since we're doing a complete rollback, there's no need to keep the
// prepared stmts for a txn rewind.
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.closeAllPortals(
ex.Ctx(), &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
case txnRestart:
// In addition to resetting the extraTxnState, the restart event may
// also need to reset the sqlliveness.Session.
Expand Down
Loading