diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel
index c0063d1d35b9..e22e270f7724 100644
--- a/pkg/server/BUILD.bazel
+++ b/pkg/server/BUILD.bazel
@@ -249,6 +249,7 @@ go_library(
         "//pkg/sql/sqlstats/insights",
         "//pkg/sql/sqlstats/persistedsqlstats",
         "//pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil",
+        "//pkg/sql/sqltelemetry",
         "//pkg/sql/stats",
         "//pkg/sql/stmtdiagnostics",
         "//pkg/sql/syntheticprivilege",
diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go
index 717c1c00f271..f950d7de9f61 100644
--- a/pkg/server/server_sql.go
+++ b/pkg/server/server_sql.go
@@ -61,6 +61,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/server/settingswatcher"
 	"github.com/cockroachdb/cockroach/pkg/server/status"
 	"github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher"
+	"github.com/cockroachdb/cockroach/pkg/server/telemetry"
 	"github.com/cockroachdb/cockroach/pkg/server/tracedumper"
 	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -103,6 +104,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slprovider"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
+	"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
 	"github.com/cockroachdb/cockroach/pkg/sql/stats"
 	"github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics"
 	"github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilegecache"
@@ -1342,6 +1344,12 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
 	vmoduleSetting.SetOnChange(&cfg.Settings.SV, fn)
 	fn(ctx)
 
+	sql.EnableMultipleActivePortals.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) {
+		if sql.EnableMultipleActivePortals.Get(&cfg.Settings.SV) {
+			telemetry.Inc(sqltelemetry.MultipleActivePortalCounter)
+		}
+	})
+
 	return &SQLServer{
 		ambientCtx: cfg.BaseConfig.AmbientCtx,
 		stopper:    cfg.stopper,
diff --git a/pkg/sql/backfill/mvcc_index_merger.go b/pkg/sql/backfill/mvcc_index_merger.go
index 993214808b42..7ca84e51171d 100644
--- a/pkg/sql/backfill/mvcc_index_merger.go
+++ b/pkg/sql/backfill/mvcc_index_merger.go
@@ -495,6 +495,11 @@ func (ibm *IndexBackfillMerger) shrinkBoundAccount(ctx context.Context, shrinkBy
 	ibm.muBoundAccount.boundAccount.Shrink(ctx, shrinkBy)
 }
 
+// Resume is part of the execinfra.Processor interface.
+func (ibm *IndexBackfillMerger) Resume(output execinfra.RowReceiver) {
+	panic("not implemented")
+}
+
 // NewIndexBackfillMerger creates a new IndexBackfillMerger.
 func NewIndexBackfillMerger(
 	ctx context.Context, flowCtx *execinfra.FlowCtx, spec execinfrapb.IndexBackfillMergerSpec,
diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go
index 7100bb9b1b43..687cebf6753e 100644
--- a/pkg/sql/colflow/vectorized_flow.go
+++ b/pkg/sql/colflow/vectorized_flow.go
@@ -288,16 +288,33 @@ func (f *vectorizedFlow) Setup(
 	return ctx, opChains, nil
 }
 
+// Resume is part of the Flow interface.
+func (f *vectorizedFlow) Resume(recv execinfra.RowReceiver) {
+	if f.batchFlowCoordinator != nil {
+		recv.Push(
+			nil, /* row */
+			&execinfrapb.ProducerMetadata{
+				Err: errors.AssertionFailedf(
+					"batchFlowCoordinator should be nil for vectorizedFlow",
+				)})
+		recv.ProducerDone()
+		return
+	}
+	f.FlowBase.Resume(recv)
+}
+
 // Run is part of the Flow interface.
-func (f *vectorizedFlow) Run(ctx context.Context) {
+func (f *vectorizedFlow) Run(ctx context.Context, noWait bool) {
 	if f.batchFlowCoordinator == nil {
 		// If we didn't create a BatchFlowCoordinator, then we have a processor
 		// as the root, so we run this flow with the default implementation.
-		f.FlowBase.Run(ctx)
+		f.FlowBase.Run(ctx, noWait)
 		return
 	}
 
-	defer f.Wait()
+	if !noWait {
+		defer f.Wait()
+	}
 
 	if err := f.StartInternal(ctx, nil /* processors */, nil /* outputs */); err != nil {
 		f.GetRowSyncFlowConsumer().Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: err})
diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go
index 4f349ece15b7..09f4094e9cd5 100644
--- a/pkg/sql/conn_executor_exec.go
+++ b/pkg/sql/conn_executor_exec.go
@@ -1734,42 +1734,48 @@ func (ex *connExecutor) execWithDistSQLEngine(
 	}
 	defer recv.Release()
 
-	evalCtx := planner.ExtendedEvalContext()
-	planCtx := ex.server.cfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, planner,
-		planner.txn, distribute)
-	planCtx.stmtType = recv.stmtType
-	// Skip the diagram generation since on this "main" query path we can get it
-	// via the statement bundle.
-	planCtx.skipDistSQLDiagramGeneration = true
-	if ex.server.cfg.TestingKnobs.TestingSaveFlows != nil {
-		planCtx.saveFlows = ex.server.cfg.TestingKnobs.TestingSaveFlows(planner.stmt.SQL)
-	} else if planner.instrumentation.ShouldSaveFlows() {
-		planCtx.saveFlows = planCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypeMainQuery)
-	}
-	planCtx.associateNodeWithComponents = planner.instrumentation.getAssociateNodeWithComponentsFn()
-	planCtx.collectExecStats = planner.instrumentation.ShouldCollectExecStats()
-
-	var evalCtxFactory func(usedConcurrently bool) *extendedEvalContext
-	if len(planner.curPlan.subqueryPlans) != 0 ||
-		len(planner.curPlan.cascades) != 0 ||
-		len(planner.curPlan.checkPlans) != 0 {
-		var serialEvalCtx extendedEvalContext
-		ex.initEvalCtx(ctx, &serialEvalCtx, planner)
-		evalCtxFactory = func(usedConcurrently bool) *extendedEvalContext {
-			// Reuse the same object if this factory is not used concurrently.
-			factoryEvalCtx := &serialEvalCtx
-			if usedConcurrently {
-				factoryEvalCtx = &extendedEvalContext{}
-				ex.initEvalCtx(ctx, factoryEvalCtx, planner)
+	var err error
+
+	if planner.hasFlowForPausablePortal() {
+		err = planner.resumeFlowForPausablePortal(recv)
+	} else {
+		evalCtx := planner.ExtendedEvalContext()
+		planCtx := ex.server.cfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, planner,
+			planner.txn, distribute)
+		planCtx.stmtType = recv.stmtType
+		// Skip the diagram generation since on this "main" query path we can get it
+		// via the statement bundle.
+		planCtx.skipDistSQLDiagramGeneration = true
+		if ex.server.cfg.TestingKnobs.TestingSaveFlows != nil {
+			planCtx.saveFlows = ex.server.cfg.TestingKnobs.TestingSaveFlows(planner.stmt.SQL)
+		} else if planner.instrumentation.ShouldSaveFlows() {
+			planCtx.saveFlows = planCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypeMainQuery)
+		}
+		planCtx.associateNodeWithComponents = planner.instrumentation.getAssociateNodeWithComponentsFn()
+		planCtx.collectExecStats = planner.instrumentation.ShouldCollectExecStats()
+
+		var evalCtxFactory func(usedConcurrently bool) *extendedEvalContext
+		if len(planner.curPlan.subqueryPlans) != 0 ||
+			len(planner.curPlan.cascades) != 0 ||
+			len(planner.curPlan.checkPlans) != 0 {
+			var serialEvalCtx extendedEvalContext
+			ex.initEvalCtx(ctx, &serialEvalCtx, planner)
+			evalCtxFactory = func(usedConcurrently bool) *extendedEvalContext {
+				// Reuse the same object if this factory is not used concurrently.
+				factoryEvalCtx := &serialEvalCtx
+				if usedConcurrently {
+					factoryEvalCtx = &extendedEvalContext{}
+					ex.initEvalCtx(ctx, factoryEvalCtx, planner)
+				}
+				ex.resetEvalCtx(factoryEvalCtx, planner.txn, planner.ExtendedEvalContext().StmtTimestamp)
+				factoryEvalCtx.Placeholders = &planner.semaCtx.Placeholders
+				factoryEvalCtx.Annotations = &planner.semaCtx.Annotations
+				factoryEvalCtx.SessionID = planner.ExtendedEvalContext().SessionID
+				return factoryEvalCtx
 			}
-			ex.resetEvalCtx(factoryEvalCtx, planner.txn, planner.ExtendedEvalContext().StmtTimestamp)
-			factoryEvalCtx.Placeholders = &planner.semaCtx.Placeholders
-			factoryEvalCtx.Annotations = &planner.semaCtx.Annotations
-			factoryEvalCtx.SessionID = planner.ExtendedEvalContext().SessionID
-			return factoryEvalCtx
-		}
+		err = ex.server.cfg.DistSQLPlanner.PlanAndRunAll(ctx, evalCtx, planCtx, planner, recv, evalCtxFactory)
 	}
-	err := ex.server.cfg.DistSQLPlanner.PlanAndRunAll(ctx, evalCtx, planCtx, planner, recv, evalCtxFactory)
 
 	return recv.stats, err
 }
diff --git a/pkg/sql/distsql/vectorized_panic_propagation_test.go b/pkg/sql/distsql/vectorized_panic_propagation_test.go
index 72b0bac25508..0d5c58c796f6 100644
--- a/pkg/sql/distsql/vectorized_panic_propagation_test.go
+++ b/pkg/sql/distsql/vectorized_panic_propagation_test.go
@@ -86,5 +86,5 @@ func TestNonVectorizedPanicDoesntHangServer(t *testing.T) {
 		}),
 	)
 
-	require.Panics(t, func() { flow.Run(ctx) })
+	require.Panics(t, func() { flow.Run(ctx, false /* noWait */) })
 }
diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go
index 3dab75fe8129..656b9056875c 100644
--- a/pkg/sql/distsql_physical_planner.go
+++ b/pkg/sql/distsql_physical_planner.go
@@ -883,6 +883,15 @@ func (p *PlanningCtx) IsLocal() bool {
 	return p.isLocal
 }
 
+// getPortalPauseInfo returns the portal pause info if the current planner is
+// for a pausable portal. Otherwise, returns nil.
+func (p *PlanningCtx) getPortalPauseInfo() *portalPauseInfo {
+	if p.planner != nil && p.planner.pausablePortal != nil && p.planner.pausablePortal.pauseInfo != nil {
+		return p.planner.pausablePortal.pauseInfo
+	}
+	return nil
+}
+
 // getDefaultSaveFlowsFunc returns the default function used to save physical
 // plans and their diagrams. The returned function is **not** concurrency-safe.
 func (p *PlanningCtx) getDefaultSaveFlowsFunc(
diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go
index f3ecd03002b4..5e590e3b8bf0 100644
--- a/pkg/sql/distsql_running.go
+++ b/pkg/sql/distsql_running.go
@@ -833,14 +833,31 @@ func (dsp *DistSQLPlanner) Run(
 	if planCtx.planner != nil {
 		statementSQL = planCtx.planner.stmt.StmtNoConstants
 	}
-	ctx, flow, err := dsp.setupFlows(
-		ctx, evalCtx, planCtx, leafInputState, flows, recv, localState, statementSQL,
-	)
-	// Make sure that the local flow is always cleaned up if it was created.
+
+	var flow flowinfra.Flow
+	var err error
+	if i := planCtx.getPortalPauseInfo(); i != nil && i.flow != nil {
+		flow = i.flow
+	} else {
+		ctx, flow, err = dsp.setupFlows(
+			ctx, evalCtx, planCtx, leafInputState, flows, recv, localState, statementSQL,
+		)
+		if i != nil {
+			i.flow = flow
+			i.outputTypes = plan.GetResultTypes()
+		}
+	}
+
 	if flow != nil {
-		defer func() {
-			flow.Cleanup(ctx)
-		}()
+		// Make sure that the local flow is always cleaned up if it was created.
+		// If the flow is not for a retained portal, we clean it up here.
+		// Otherwise, we delay the cleanup via portalPauseInfo.flowCleanup until
+		// the portal is closed.
+		if planCtx.getPortalPauseInfo() == nil {
+			defer func() {
+				flow.Cleanup(ctx)
+			}()
+		}
 	}
 	if err != nil {
 		recv.SetError(err)
@@ -863,7 +880,8 @@ func (dsp *DistSQLPlanner) Run(
 		return
 	}
 
-	flow.Run(ctx)
+	noWait := planCtx.getPortalPauseInfo() != nil
+	flow.Run(ctx, noWait)
 }
 
 // DistSQLReceiver is an execinfra.RowReceiver and execinfra.BatchReceiver that
diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go
index eff8772830ce..7e0d7a55a580 100644
--- a/pkg/sql/exec_util.go
+++ b/pkg/sql/exec_util.go
@@ -733,7 +733,7 @@ var overrideAlterPrimaryRegionInSuperRegion = settings.RegisterBoolSetting(
 	false,
 ).WithPublic()
 
-var enableMultipleActivePortals = settings.RegisterBoolSetting(
+var EnableMultipleActivePortals = settings.RegisterBoolSetting(
 	settings.TenantWritable,
 	"sql.pgwire.multiple_active_portals.enabled",
 	"if true, portals with read-only SELECT query without sub/post queries "+
diff --git a/pkg/sql/execinfra/processorsbase.go b/pkg/sql/execinfra/processorsbase.go
index b6391f0b5596..8707f57eed3f 100644
--- a/pkg/sql/execinfra/processorsbase.go
+++ b/pkg/sql/execinfra/processorsbase.go
@@ -46,8 +46,18 @@ type Processor interface {
 	// and the vectorized engines).
 	MustBeStreaming() bool
 
-	// Run is the main loop of the processor.
+	// Run is the main loop of the processor. It can be called only once
+	// throughout the processor's lifetime.
 	Run(context.Context, RowReceiver)
+
+	// Resume resumes the execution of the processor with a new receiver. It
+	// can be called multiple times, but only after Run() has already been called.
+	//
+	// Currently it is only used by pausable portals.
+	//
+	// NB: this method doesn't take a context parameter because the context
+	// was already captured in Run().
+	Resume(output RowReceiver)
 }
 
 // DoesNotUseTxn is an interface implemented by some processors to mark that
@@ -727,6 +737,14 @@ func (pb *ProcessorBaseNoHelper) Run(ctx context.Context, output RowReceiver) {
 		Run(pb.ctx, pb.self, output)
 }
 
+// Resume is part of the Processor interface.
+func (pb *ProcessorBaseNoHelper) Resume(output RowReceiver) {
+	if output == nil {
+		panic("processor output is not provided for emitting rows")
+	}
+	Run(pb.ctx, pb.self, output)
+}
+
 // ProcStateOpts contains fields used by the ProcessorBase's family of functions
 // that deal with draining and trailing metadata: the ProcessorBase implements
 // generic useful functionality that needs to call back into the Processor.
diff --git a/pkg/sql/flowinfra/flow.go b/pkg/sql/flowinfra/flow.go
index 6ab652ab2525..6a14ccda81d0 100644
--- a/pkg/sql/flowinfra/flow.go
+++ b/pkg/sql/flowinfra/flow.go
@@ -105,8 +105,23 @@ type Flow interface {
 	// It is assumed that rowSyncFlowConsumer is set, so all errors encountered
 	// when running this flow are sent to it.
 	//
+	// noWait is set to true when the flow is bound to a pausable portal. When set,
+	// the function returns without waiting for all goroutines to finish. For a
+	// pausable portal we will persist this flow and reuse it when re-executing
+	// the portal. The flow will be cleaned up when the portal is closed, rather
+	// than when each portal execution finishes.
+	//
 	// The caller needs to call f.Cleanup().
-	Run(context.Context)
+	Run(ctx context.Context, noWait bool)
+
+	// Resume continues running the Flow with a new output receiver after it
+	// has been paused. The Flow is expected to have exactly one processor.
+	// It is called when resuming a paused portal.
+	// The lifecycle of a flow for a pausable portal is:
+	// - flow.Run(ctx, true /* noWait */) (only once)
+	// - flow.Resume() (for all re-executions of the portal)
+	// - flow.Cleanup() (only once)
+	Resume(recv execinfra.RowReceiver)
 
 	// Wait waits for all the goroutines for this flow to exit. If the context gets
 	// canceled before all goroutines exit, it calls f.cancel().
@@ -169,6 +184,9 @@ type Flow interface {
 type FlowBase struct {
 	execinfra.FlowCtx
 
+	// resumeCtx is captured only for use inside Flow.Resume() implementations.
+	resumeCtx context.Context
+
 	flowRegistry *FlowRegistry
 
 	// processors contains a subset of the processors in the flow - the ones that
@@ -488,9 +506,29 @@ func (f *FlowBase) Start(ctx context.Context) error {
 	return f.StartInternal(ctx, f.processors, f.outputs)
 }
 
+// Resume is part of the Flow interface.
+func (f *FlowBase) Resume(recv execinfra.RowReceiver) {
+	if len(f.processors) != 1 || len(f.outputs) != 1 {
+		recv.Push(
+			nil, /* row */
+			&execinfrapb.ProducerMetadata{
+				Err: errors.AssertionFailedf(
+					"length of both the processor and the output must be 1",
+				)})
+		recv.ProducerDone()
+		return
+	}
+
+	f.outputs[0] = recv
+	log.VEventf(f.resumeCtx, 1, "resuming %T in the flow's goroutine", f.processors[0])
+	f.processors[0].Resume(recv)
+}
+
 // Run is part of the Flow interface.
-func (f *FlowBase) Run(ctx context.Context) {
-	defer f.Wait()
+func (f *FlowBase) Run(ctx context.Context, noWait bool) {
+	if !noWait {
+		defer f.Wait()
+	}
 
 	if len(f.processors) == 0 {
 		f.rowSyncFlowConsumer.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: errors.AssertionFailedf("no processors in flow")})
@@ -509,6 +547,7 @@ func (f *FlowBase) Run(ctx context.Context) {
 		f.rowSyncFlowConsumer.ProducerDone()
 		return
 	}
+	f.resumeCtx = ctx
 	log.VEventf(ctx, 1, "running %T in the flow's goroutine", headProc)
 	headProc.Run(ctx, headOutput)
 }
diff --git a/pkg/sql/flowinfra/server_test.go b/pkg/sql/flowinfra/server_test.go
index c5e6bed05615..8a2aad3106b7 100644
--- a/pkg/sql/flowinfra/server_test.go
+++ b/pkg/sql/flowinfra/server_test.go
@@ -173,7 +173,7 @@ func runLocalFlow(
 	if err != nil {
 		return nil, err
 	}
-	flow.Run(flowCtx)
+	flow.Run(flowCtx, false /* noWait */)
 	flow.Cleanup(flowCtx)
 
 	if !rowBuf.ProducerClosed() {
@@ -210,7 +210,7 @@ func runLocalFlowTenant(
 	if err != nil {
 		return nil, err
 	}
-	flow.Run(flowCtx)
+	flow.Run(flowCtx, false /* noWait */)
 	flow.Cleanup(flowCtx)
 
 	if !rowBuf.ProducerClosed() {
diff --git a/pkg/sql/importer/exportcsv.go b/pkg/sql/importer/exportcsv.go
index 2fd31bb16587..2e0902a6130f 100644
--- a/pkg/sql/importer/exportcsv.go
+++ b/pkg/sql/importer/exportcsv.go
@@ -310,6 +310,11 @@ func (sp *csvWriter) Run(ctx context.Context, output execinfra.RowReceiver) {
 		ctx, output, err, func(context.Context, execinfra.RowReceiver) {} /* pushTrailingMeta */, sp.input)
 }
 
+// Resume is part of the execinfra.Processor interface.
+func (sp *csvWriter) Resume(output execinfra.RowReceiver) {
+	panic("not implemented")
+}
+
 func init() {
 	rowexec.NewCSVWriterProcessor = newCSVWriterProcessor
 }
diff --git a/pkg/sql/importer/exportparquet.go b/pkg/sql/importer/exportparquet.go
index 8f77b3038b1d..6bc5572b4001 100644
--- a/pkg/sql/importer/exportparquet.go
+++ b/pkg/sql/importer/exportparquet.go
@@ -867,6 +867,11 @@ func (sp *parquetWriterProcessor) Run(ctx context.Context, output execinfra.RowR
 		ctx, output, err, func(context.Context, execinfra.RowReceiver) {} /* pushTrailingMeta */, sp.input)
 }
 
+// Resume is part of the execinfra.Processor interface.
+func (sp *parquetWriterProcessor) Resume(output execinfra.RowReceiver) {
+	panic("not implemented")
+}
+
 func init() {
 	rowexec.NewParquetWriterProcessor = newParquetWriterProcessor
 }
diff --git a/pkg/sql/pgwire/command_result.go b/pkg/sql/pgwire/command_result.go
index 521dbf098cbb..31da52ca745c 100644
--- a/pkg/sql/pgwire/command_result.go
+++ b/pkg/sql/pgwire/command_result.go
@@ -470,8 +470,8 @@ func (r *limitedCommandResult) AddRow(ctx context.Context, row tree.Datums) erro
 		r.reachedLimit = true
 		return sql.ErrPortalLimitHasBeenReached
 	} else {
-		// TODO(janexing): we keep this part as for general portals, we would like
-		// to keep the execution logic to avoid bring too many bugs. Eventually
+		// TODO(janexing): we keep using the logic from before we added
+		// multiple-active-portals support to avoid introducing too many bugs. Eventually
 		// we should remove them and use the "return the control to connExecutor"
 		// logic for all portals.
 		r.seenTuples = 0
diff --git a/pkg/sql/physicalplan/aggregator_funcs_test.go b/pkg/sql/physicalplan/aggregator_funcs_test.go
index 07d085be7b98..185a1c77192b 100644
--- a/pkg/sql/physicalplan/aggregator_funcs_test.go
+++ b/pkg/sql/physicalplan/aggregator_funcs_test.go
@@ -84,7 +84,7 @@ func runTestFlow(
 	if err != nil {
 		t.Fatal(err)
 	}
-	flow.Run(ctx)
+	flow.Run(ctx, false /* noWait */)
 	flow.Cleanup(ctx)
 
 	if !rowBuf.ProducerClosed() {
diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go
index 036f8249dcda..a41eb6081a21 100644
--- a/pkg/sql/planner.go
+++ b/pkg/sql/planner.go
@@ -52,6 +52,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/envutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/mon"
+	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/logtags"
 	"github.com/cockroachdb/redact"
 	"github.com/lib/pq/oid"
@@ -201,6 +202,9 @@ type planner struct {
 	// home region is being enforced.
 	StmtNoConstantsWithHomeRegionEnforced string
 
+	// pausablePortal is set when the query is from a pausable portal.
+	pausablePortal *PreparedPortal
+
 	instrumentation instrumentationHelper
 
 	// Contexts for different stages of planning and execution.
@@ -260,6 +264,24 @@ type planner struct {
 	evalCatalogBuiltins evalcatalog.Builtins
 }
 
+// hasFlowForPausablePortal returns true if the planner is for re-executing a
+// portal. We reuse the flow stored in p.pausablePortal.pauseInfo.
+func (p *planner) hasFlowForPausablePortal() bool {
+	return p.pausablePortal != nil && p.pausablePortal.pauseInfo != nil && p.pausablePortal.pauseInfo.flow != nil
+}
+
+// resumeFlowForPausablePortal is called when re-executing a portal. We reuse
+// the flow with a new receiver, without re-generating the physical plan.
+func (p *planner) resumeFlowForPausablePortal(recv *DistSQLReceiver) error {
+	if !p.hasFlowForPausablePortal() {
+		return errors.AssertionFailedf("no flow found for pausable portal")
+	}
+	recv.discardRows = p.instrumentation.ShouldDiscardRows()
+	recv.outputTypes = p.pausablePortal.pauseInfo.outputTypes
+	p.pausablePortal.pauseInfo.flow.Resume(recv)
+	return recv.commErr
+}
+
 func (evalCtx *extendedEvalContext) setSessionID(sessionID clusterunique.ID) {
 	evalCtx.SessionID = sessionID
 }
@@ -848,6 +870,7 @@ func (p *planner) resetPlanner(
 	p.evalCatalogBuiltins.Init(p.execCfg.Codec, txn, p.Descriptors())
 	p.skipDescriptorCache = false
 	p.typeResolutionDbID = descpb.InvalidID
+	p.pausablePortal = nil
 }
 
 // GetReplicationStreamManager returns a ReplicationStreamManager.
diff --git a/pkg/sql/prepared_stmt.go b/pkg/sql/prepared_stmt.go
index 5ad4bbd872c6..7a0c67cc051e 100644
--- a/pkg/sql/prepared_stmt.go
+++ b/pkg/sql/prepared_stmt.go
@@ -15,10 +15,12 @@ import (
 	"time"
 	"unsafe"
 
+	"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
 	"github.com/cockroachdb/cockroach/pkg/sql/opt/memo"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase"
 	"github.com/cockroachdb/cockroach/pkg/sql/querycache"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+	"github.com/cockroachdb/cockroach/pkg/sql/types"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/mon"
 )
@@ -155,6 +157,9 @@ type PreparedPortal struct {
 	// a portal.
 	// See comments for PortalPausablity for more details.
 	portalPausablity PortalPausablity
+
+	// pauseInfo is the saved info needed for a pausable portal.
+	pauseInfo *portalPauseInfo
 }
 
 // makePreparedPortal creates a new PreparedPortal.
@@ -173,10 +178,11 @@ func (ex *connExecutor) makePreparedPortal(
 		Qargs:      qargs,
 		OutFormats: outFormats,
 	}
-	// TODO(janexing): we added this line to avoid the unused lint error.
-	// Will remove it once the whole functionality of multple active portals
-	// is merged.
-	_ = enableMultipleActivePortals.Get(&ex.server.cfg.Settings.SV)
+
+	if EnableMultipleActivePortals.Get(&ex.server.cfg.Settings.SV) {
+		portal.pauseInfo = &portalPauseInfo{}
+		portal.portalPausablity = PausablePortal
+	}
 	return portal, portal.accountForCopy(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
 }
 
@@ -204,3 +210,17 @@ func (p *PreparedPortal) close(
 func (p *PreparedPortal) size(portalName string) int64 {
 	return int64(uintptr(len(portalName)) + unsafe.Sizeof(p))
 }
+
+// portalPauseInfo stores the info needed to pause a portal. After a portal is
+// paused, other statements can be executed before the portal is re-executed
+// or closed.
+type portalPauseInfo struct {
+	// outputTypes are the types of the result columns produced by the physical plan.
+	// We need these because, when re-executing the portal, we reuse the flow
+	// with a new receiver, but do not re-generate the physical plan.
+	outputTypes []*types.T
+	// We need to store the flow for a portal so that when re-executing it, we
+	// continue from the previous execution. It lives along with the portal, and
+	// will be cleaned up when the portal is closed.
+	flow flowinfra.Flow
+}
diff --git a/pkg/sql/rowexec/backfiller.go b/pkg/sql/rowexec/backfiller.go
index b28efd96fc00..f9536b772807 100644
--- a/pkg/sql/rowexec/backfiller.go
+++ b/pkg/sql/rowexec/backfiller.go
@@ -276,3 +276,8 @@ func SetResumeSpansInJob(
 	details.ResumeSpanList[mutationIdx].ResumeSpans = spans
 	return job.WithTxn(txn).SetDetails(ctx, details)
 }
+
+// Resume is part of the execinfra.Processor interface.
+func (b *backfiller) Resume(output execinfra.RowReceiver) {
+	panic("not implemented")
+}
diff --git a/pkg/sql/rowexec/indexbackfiller.go b/pkg/sql/rowexec/indexbackfiller.go
index 7a705cc5f698..0a9cf97e6c95 100644
--- a/pkg/sql/rowexec/indexbackfiller.go
+++ b/pkg/sql/rowexec/indexbackfiller.go
@@ -438,3 +438,8 @@ func (ib *indexBackfiller) buildIndexEntryBatch(
 
 	return key, entries, memUsedBuildingBatch, nil
 }
+
+// Resume is part of the execinfra.Processor interface.
+func (ib *indexBackfiller) Resume(output execinfra.RowReceiver) {
+	panic("not implemented")
+}
diff --git a/pkg/sql/sqltelemetry/pgwire.go b/pkg/sql/sqltelemetry/pgwire.go
index 8715134a85ea..18a7ef68e867 100644
--- a/pkg/sql/sqltelemetry/pgwire.go
+++ b/pkg/sql/sqltelemetry/pgwire.go
@@ -68,3 +68,7 @@ var CloseRequestCounter = telemetry.GetCounterOnce("pgwire.command.close")
 // FlushRequestCounter is to be incremented every time a flush request
 // is made.
 var FlushRequestCounter = telemetry.GetCounterOnce("pgwire.command.flush")
+
+// MultipleActivePortalCounter is to be incremented every time the cluster setting
+// sql.pgwire.multiple_active_portals.enabled is set to true.
+var MultipleActivePortalCounter = telemetry.GetCounterOnce("pgwire.multiple_active_portals")
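
Note: the lifecycle that the new Flow.Resume contract documents (Run exactly once with noWait=true, Resume once per re-execution of the portal, Cleanup exactly once when the portal is closed) can be illustrated with a small standalone sketch. The program below is not part of the patch and does not use the CockroachDB APIs; toyFlow, rowSink, and printSink are hypothetical stand-ins introduced only to make the ordering of calls concrete.

package main

import (
	"context"
	"fmt"
)

// rowSink is a hypothetical stand-in for execinfra.RowReceiver; each portal
// execution gets its own receiver.
type rowSink interface {
	push(row string)
	done()
}

// printSink prints whatever is pushed to it.
type printSink struct{ name string }

func (s *printSink) push(row string) { fmt.Printf("%s received %s\n", s.name, row) }
func (s *printSink) done()           { fmt.Printf("%s: producer done\n", s.name) }

// toyFlow models the documented contract: Run is called exactly once (with
// noWait=true a real flow would return without waiting for its goroutines),
// Resume may be called many times afterwards with a fresh receiver, and
// Cleanup is called exactly once at the end.
type toyFlow struct {
	ctx  context.Context
	next int
}

func (f *toyFlow) Run(ctx context.Context, noWait bool) {
	f.ctx = ctx // captured for later Resume calls, like FlowBase.resumeCtx
	_ = noWait
}

func (f *toyFlow) Resume(out rowSink) {
	// Stop early if the context captured in Run has been canceled.
	if f.ctx.Err() != nil {
		out.done()
		return
	}
	// Continue from where the previous execution left off.
	f.next++
	out.push(fmt.Sprintf("row-%d", f.next))
	out.done()
}

func (f *toyFlow) Cleanup() { fmt.Println("flow cleaned up") }

func main() {
	f := &toyFlow{}
	f.Run(context.Background(), true /* noWait */) // once, on the first execution of the portal
	f.Resume(&printSink{name: "execution #1"})     // each re-execution supplies a new receiver
	f.Resume(&printSink{name: "execution #2"})
	f.Cleanup() // once, when the portal is closed
}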