From 80ef781b89630a2b96a94d1c0a9fccb0a1221227 Mon Sep 17 00:00:00 2001
From: Jane Xing
Date: Tue, 21 Mar 2023 14:31:17 -0700
Subject: [PATCH 1/3] sql: add telemetry `MultipleActivePortalCounter`

This commit adds a telemetry counter `MultipleActivePortalCounter` that is
incremented each time the cluster setting
`sql.pgwire.multiple_active_portals.enabled` is set to true.

Release note: None
---
 pkg/server/BUILD.bazel           | 1 +
 pkg/server/server_sql.go         | 8 ++++++++
 pkg/sql/exec_util.go             | 2 +-
 pkg/sql/pgwire/command_result.go | 4 ++--
 pkg/sql/sqltelemetry/pgwire.go   | 4 ++++
 5 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel
index c0063d1d35b9..e22e270f7724 100644
--- a/pkg/server/BUILD.bazel
+++ b/pkg/server/BUILD.bazel
@@ -249,6 +249,7 @@ go_library(
         "//pkg/sql/sqlstats/insights",
         "//pkg/sql/sqlstats/persistedsqlstats",
         "//pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil",
+        "//pkg/sql/sqltelemetry",
         "//pkg/sql/stats",
         "//pkg/sql/stmtdiagnostics",
         "//pkg/sql/syntheticprivilege",
diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go
index 717c1c00f271..f950d7de9f61 100644
--- a/pkg/server/server_sql.go
+++ b/pkg/server/server_sql.go
@@ -61,6 +61,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/server/settingswatcher"
 	"github.com/cockroachdb/cockroach/pkg/server/status"
 	"github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher"
+	"github.com/cockroachdb/cockroach/pkg/server/telemetry"
 	"github.com/cockroachdb/cockroach/pkg/server/tracedumper"
 	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -103,6 +104,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slprovider"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
+	"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
 	"github.com/cockroachdb/cockroach/pkg/sql/stats"
 	"github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics"
 	"github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilegecache"
@@ -1342,6 +1344,12 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
 	vmoduleSetting.SetOnChange(&cfg.Settings.SV, fn)
 	fn(ctx)

+	sql.EnableMultipleActivePortals.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) {
+		if sql.EnableMultipleActivePortals.Get(&cfg.Settings.SV) {
+			telemetry.Inc(sqltelemetry.MultipleActivePortalCounter)
+		}
+	})
+
 	return &SQLServer{
 		ambientCtx: cfg.BaseConfig.AmbientCtx,
 		stopper:    cfg.stopper,
diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go
index eff8772830ce..7e0d7a55a580 100644
--- a/pkg/sql/exec_util.go
+++ b/pkg/sql/exec_util.go
@@ -733,7 +733,7 @@ var overrideAlterPrimaryRegionInSuperRegion = settings.RegisterBoolSetting(
 	false,
 ).WithPublic()

-var enableMultipleActivePortals = settings.RegisterBoolSetting(
+var EnableMultipleActivePortals = settings.RegisterBoolSetting(
 	settings.TenantWritable,
 	"sql.pgwire.multiple_active_portals.enabled",
 	"if true, portals with read-only SELECT query without sub/post queries "+
diff --git a/pkg/sql/pgwire/command_result.go b/pkg/sql/pgwire/command_result.go
index 521dbf098cbb..31da52ca745c 100644
--- a/pkg/sql/pgwire/command_result.go
+++ b/pkg/sql/pgwire/command_result.go
@@ -470,8 +470,8 @@ func (r *limitedCommandResult) AddRow(ctx context.Context, row tree.Datums) erro
 		r.reachedLimit = true
 		return sql.ErrPortalLimitHasBeenReached
 	} else {
-		// TODO(janexing): we keep this part as for general portals, we would like
-		// to keep the execution logic to avoid bring too many bugs. Eventually
+		// TODO(janexing): we keep using the logic from before we added
+		// multiple-active-portals support to avoid bringing in too many bugs. Eventually
 		// we should remove them and use the "return the control to connExecutor"
 		// logic for all portals.
 		r.seenTuples = 0
diff --git a/pkg/sql/sqltelemetry/pgwire.go b/pkg/sql/sqltelemetry/pgwire.go
index 8715134a85ea..18a7ef68e867 100644
--- a/pkg/sql/sqltelemetry/pgwire.go
+++ b/pkg/sql/sqltelemetry/pgwire.go
@@ -68,3 +68,7 @@ var CloseRequestCounter = telemetry.GetCounterOnce("pgwire.command.close")
 // FlushRequestCounter is to be incremented every time a flush request
 // is made.
 var FlushRequestCounter = telemetry.GetCounterOnce("pgwire.command.flush")
+
+// MultipleActivePortalCounter is to be incremented every time the cluster
+// setting sql.pgwire.multiple_active_portals.enabled is set to true.
+var MultipleActivePortalCounter = telemetry.GetCounterOnce("pgwire.multiple_active_portals")

From 2a25067152390bad96dad5b20a1aca6077aa1038 Mon Sep 17 00:00:00 2001
From: Jane Xing
Date: Mon, 20 Mar 2023 10:21:59 -0700
Subject: [PATCH 2/3] sql: add Resume method for flowinfra.Flow and
 execinfra.Processor

For pausable portals, each execution needs to resume the processor with a new
output receiver. We don't need to restart the processors, and this `Resume()`
step can be called multiple times after `Run()` has been called.

Release note: None
---
 pkg/sql/backfill/mvcc_index_merger.go         |  5 +++
 pkg/sql/colflow/vectorized_flow.go            | 23 ++++++++--
 .../vectorized_panic_propagation_test.go      |  2 +-
 pkg/sql/distsql_running.go                    |  2 +-
 pkg/sql/execinfra/processorsbase.go           | 20 ++++++++-
 pkg/sql/flowinfra/flow.go                     | 45 +++++++++++++++++--
 pkg/sql/flowinfra/server_test.go              |  4 +-
 pkg/sql/importer/exportcsv.go                 |  5 +++
 pkg/sql/importer/exportparquet.go             |  5 +++
 pkg/sql/physicalplan/aggregator_funcs_test.go |  2 +-
 pkg/sql/rowexec/backfiller.go                 |  5 +++
 pkg/sql/rowexec/indexbackfiller.go            |  5 +++
 12 files changed, 111 insertions(+), 12 deletions(-)

diff --git a/pkg/sql/backfill/mvcc_index_merger.go b/pkg/sql/backfill/mvcc_index_merger.go
index 993214808b42..7ca84e51171d 100644
--- a/pkg/sql/backfill/mvcc_index_merger.go
+++ b/pkg/sql/backfill/mvcc_index_merger.go
@@ -495,6 +495,11 @@ func (ibm *IndexBackfillMerger) shrinkBoundAccount(ctx context.Context, shrinkBy
 	ibm.muBoundAccount.boundAccount.Shrink(ctx, shrinkBy)
 }

+// Resume is part of the execinfra.Processor interface.
+func (ibm *IndexBackfillMerger) Resume(output execinfra.RowReceiver) {
+	panic("not implemented")
+}
+
 // NewIndexBackfillMerger creates a new IndexBackfillMerger.
 func NewIndexBackfillMerger(
 	ctx context.Context, flowCtx *execinfra.FlowCtx, spec execinfrapb.IndexBackfillMergerSpec,
diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go
index 7100bb9b1b43..687cebf6753e 100644
--- a/pkg/sql/colflow/vectorized_flow.go
+++ b/pkg/sql/colflow/vectorized_flow.go
@@ -288,16 +288,33 @@ func (f *vectorizedFlow) Setup(
 	return ctx, opChains, nil
 }

+// Resume is part of the Flow interface.
+func (f *vectorizedFlow) Resume(recv execinfra.RowReceiver) {
+	if f.batchFlowCoordinator != nil {
+		recv.Push(
+			nil, /* row */
+			&execinfrapb.ProducerMetadata{
+				Err: errors.AssertionFailedf(
+					"batchFlowCoordinator should be nil for vectorizedFlow",
+				)})
+		recv.ProducerDone()
+		return
+	}
+	f.FlowBase.Resume(recv)
+}
+
 // Run is part of the Flow interface.
-func (f *vectorizedFlow) Run(ctx context.Context) {
+func (f *vectorizedFlow) Run(ctx context.Context, noWait bool) {
 	if f.batchFlowCoordinator == nil {
 		// If we didn't create a BatchFlowCoordinator, then we have a processor
 		// as the root, so we run this flow with the default implementation.
-		f.FlowBase.Run(ctx)
+		f.FlowBase.Run(ctx, noWait)
 		return
 	}

-	defer f.Wait()
+	if !noWait {
+		defer f.Wait()
+	}

 	if err := f.StartInternal(ctx, nil /* processors */, nil /* outputs */); err != nil {
 		f.GetRowSyncFlowConsumer().Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: err})
diff --git a/pkg/sql/distsql/vectorized_panic_propagation_test.go b/pkg/sql/distsql/vectorized_panic_propagation_test.go
index 72b0bac25508..0d5c58c796f6 100644
--- a/pkg/sql/distsql/vectorized_panic_propagation_test.go
+++ b/pkg/sql/distsql/vectorized_panic_propagation_test.go
@@ -86,5 +86,5 @@ func TestNonVectorizedPanicDoesntHangServer(t *testing.T) {
 		}),
 	)

-	require.Panics(t, func() { flow.Run(ctx) })
+	require.Panics(t, func() { flow.Run(ctx, false /* noWait */) })
 }
diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go
index f3ecd03002b4..a92e9e338a26 100644
--- a/pkg/sql/distsql_running.go
+++ b/pkg/sql/distsql_running.go
@@ -863,7 +863,7 @@ func (dsp *DistSQLPlanner) Run(
 		return
 	}

-	flow.Run(ctx)
+	flow.Run(ctx, false /* noWait */)
 }

 // DistSQLReceiver is an execinfra.RowReceiver and execinfra.BatchReceiver that
diff --git a/pkg/sql/execinfra/processorsbase.go b/pkg/sql/execinfra/processorsbase.go
index b6391f0b5596..8707f57eed3f 100644
--- a/pkg/sql/execinfra/processorsbase.go
+++ b/pkg/sql/execinfra/processorsbase.go
@@ -46,8 +46,18 @@ type Processor interface {
 	// and the vectorized engines).
 	MustBeStreaming() bool

-	// Run is the main loop of the processor.
+	// Run is the main loop of the processor. It can be called only once
+	// throughout the processor's lifetime.
 	Run(context.Context, RowReceiver)
+
+	// Resume resumes the execution of the processor with a new receiver. It
+	// can be called multiple times, but only after Run() has already been called.
+	//
+	// Currently this is only used by pausable portals.
+	//
+	// NB: this method doesn't take a context as a parameter because the context
+	// was already captured in Run().
+	Resume(output RowReceiver)
 }

 // DoesNotUseTxn is an interface implemented by some processors to mark that
@@ -727,6 +737,14 @@ func (pb *ProcessorBaseNoHelper) Run(ctx context.Context, output RowReceiver) {
 	Run(pb.ctx, pb.self, output)
 }

+// Resume is part of the Processor interface.
+func (pb *ProcessorBaseNoHelper) Resume(output RowReceiver) {
+	if output == nil {
+		panic("processor output is not provided for emitting rows")
+	}
+	Run(pb.ctx, pb.self, output)
+}
+
 // ProcStateOpts contains fields used by the ProcessorBase's family of functions
 // that deal with draining and trailing metadata: the ProcessorBase implements
 // generic useful functionality that needs to call back into the Processor.
diff --git a/pkg/sql/flowinfra/flow.go b/pkg/sql/flowinfra/flow.go
index 6ab652ab2525..6a14ccda81d0 100644
--- a/pkg/sql/flowinfra/flow.go
+++ b/pkg/sql/flowinfra/flow.go
@@ -105,8 +105,23 @@ type Flow interface {
 	// It is assumed that rowSyncFlowConsumer is set, so all errors encountered
 	// when running this flow are sent to it.
 	//
+	// noWait is set to true when the flow is bound to a pausable portal. With
+	// it set, the function returns without waiting for all goroutines to finish.
For a + // pausable portal we will persist this flow and reuse it when re-executing + // the portal. The flow will be cleaned when the portal is closed, rather than + // when each portal execution finishes. + // // The caller needs to call f.Cleanup(). - Run(context.Context) + Run(ctx context.Context, noWait bool) + + // Resume continues running the Flow after it has been paused with the new + // output receiver. The Flow is expected to have exactly one processor. + // It is called when resuming a paused portal. + // The lifecycle of a flow for a pausable portal is: + // - flow.Run(ctx, true /* noWait */) (only once) + // - flow.Resume() (for all re-executions of the portal) + // - flow.Cleanup() (only once) + Resume(recv execinfra.RowReceiver) // Wait waits for all the goroutines for this flow to exit. If the context gets // canceled before all goroutines exit, it calls f.cancel(). @@ -169,6 +184,9 @@ type Flow interface { type FlowBase struct { execinfra.FlowCtx + // resumeCtx is only captured for using inside of Flow.Resume() implementations. + resumeCtx context.Context + flowRegistry *FlowRegistry // processors contains a subset of the processors in the flow - the ones that @@ -488,9 +506,29 @@ func (f *FlowBase) Start(ctx context.Context) error { return f.StartInternal(ctx, f.processors, f.outputs) } +// Resume is part of the Flow interface. +func (f *FlowBase) Resume(recv execinfra.RowReceiver) { + if len(f.processors) != 1 || len(f.outputs) != 1 { + recv.Push( + nil, /* row */ + &execinfrapb.ProducerMetadata{ + Err: errors.AssertionFailedf( + "length of both the processor and the output must be 1", + )}) + recv.ProducerDone() + return + } + + f.outputs[0] = recv + log.VEventf(f.resumeCtx, 1, "resuming %T in the flow's goroutine", f.processors[0]) + f.processors[0].Resume(recv) +} + // Run is part of the Flow interface. -func (f *FlowBase) Run(ctx context.Context) { - defer f.Wait() +func (f *FlowBase) Run(ctx context.Context, noWait bool) { + if !noWait { + defer f.Wait() + } if len(f.processors) == 0 { f.rowSyncFlowConsumer.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: errors.AssertionFailedf("no processors in flow")}) @@ -509,6 +547,7 @@ func (f *FlowBase) Run(ctx context.Context) { f.rowSyncFlowConsumer.ProducerDone() return } + f.resumeCtx = ctx log.VEventf(ctx, 1, "running %T in the flow's goroutine", headProc) headProc.Run(ctx, headOutput) } diff --git a/pkg/sql/flowinfra/server_test.go b/pkg/sql/flowinfra/server_test.go index c5e6bed05615..8a2aad3106b7 100644 --- a/pkg/sql/flowinfra/server_test.go +++ b/pkg/sql/flowinfra/server_test.go @@ -173,7 +173,7 @@ func runLocalFlow( if err != nil { return nil, err } - flow.Run(flowCtx) + flow.Run(flowCtx, false /* noWait */) flow.Cleanup(flowCtx) if !rowBuf.ProducerClosed() { @@ -210,7 +210,7 @@ func runLocalFlowTenant( if err != nil { return nil, err } - flow.Run(flowCtx) + flow.Run(flowCtx, false /* noWait */) flow.Cleanup(flowCtx) if !rowBuf.ProducerClosed() { diff --git a/pkg/sql/importer/exportcsv.go b/pkg/sql/importer/exportcsv.go index 2fd31bb16587..2e0902a6130f 100644 --- a/pkg/sql/importer/exportcsv.go +++ b/pkg/sql/importer/exportcsv.go @@ -310,6 +310,11 @@ func (sp *csvWriter) Run(ctx context.Context, output execinfra.RowReceiver) { ctx, output, err, func(context.Context, execinfra.RowReceiver) {} /* pushTrailingMeta */, sp.input) } +// Resume is part of the execinfra.Processor interface. 
+func (sp *csvWriter) Resume(output execinfra.RowReceiver) {
+	panic("not implemented")
+}
+
 func init() {
 	rowexec.NewCSVWriterProcessor = newCSVWriterProcessor
 }
diff --git a/pkg/sql/importer/exportparquet.go b/pkg/sql/importer/exportparquet.go
index 8f77b3038b1d..6bc5572b4001 100644
--- a/pkg/sql/importer/exportparquet.go
+++ b/pkg/sql/importer/exportparquet.go
@@ -867,6 +867,11 @@ func (sp *parquetWriterProcessor) Run(ctx context.Context, output execinfra.RowR
 	ctx, output, err, func(context.Context, execinfra.RowReceiver) {} /* pushTrailingMeta */, sp.input)
 }

+// Resume is part of the execinfra.Processor interface.
+func (sp *parquetWriterProcessor) Resume(output execinfra.RowReceiver) {
+	panic("not implemented")
+}
+
 func init() {
 	rowexec.NewParquetWriterProcessor = newParquetWriterProcessor
 }
diff --git a/pkg/sql/physicalplan/aggregator_funcs_test.go b/pkg/sql/physicalplan/aggregator_funcs_test.go
index 07d085be7b98..185a1c77192b 100644
--- a/pkg/sql/physicalplan/aggregator_funcs_test.go
+++ b/pkg/sql/physicalplan/aggregator_funcs_test.go
@@ -84,7 +84,7 @@ func runTestFlow(
 	if err != nil {
 		t.Fatal(err)
 	}

-	flow.Run(ctx)
+	flow.Run(ctx, false /* noWait */)
 	flow.Cleanup(ctx)

 	if !rowBuf.ProducerClosed() {
diff --git a/pkg/sql/rowexec/backfiller.go b/pkg/sql/rowexec/backfiller.go
index b28efd96fc00..f9536b772807 100644
--- a/pkg/sql/rowexec/backfiller.go
+++ b/pkg/sql/rowexec/backfiller.go
@@ -276,3 +276,8 @@ func SetResumeSpansInJob(
 	details.ResumeSpanList[mutationIdx].ResumeSpans = spans
 	return job.WithTxn(txn).SetDetails(ctx, details)
 }
+
+// Resume is part of the execinfra.Processor interface.
+func (b *backfiller) Resume(output execinfra.RowReceiver) {
+	panic("not implemented")
+}
diff --git a/pkg/sql/rowexec/indexbackfiller.go b/pkg/sql/rowexec/indexbackfiller.go
index 7a705cc5f698..0a9cf97e6c95 100644
--- a/pkg/sql/rowexec/indexbackfiller.go
+++ b/pkg/sql/rowexec/indexbackfiller.go
@@ -438,3 +438,8 @@ func (ib *indexBackfiller) buildIndexEntryBatch(
 	return key, entries, memUsedBuildingBatch, nil
 }
+
+// Resume is part of the execinfra.Processor interface.
+func (ib *indexBackfiller) Resume(output execinfra.RowReceiver) {
+	panic("not implemented")
+}

From a79adbabdb7a50b38edb9773a5309abcbebea76e Mon Sep 17 00:00:00 2001
From: Jane Xing
Date: Sun, 12 Mar 2023 21:29:29 -0400
Subject: [PATCH 3/3] sql: reuse flow for pausable portal

To execute portals in an interleaving manner, we need to persist the flow and
queryID so that we can _continue_ fetching the result when we come back to the
same portal.

We now introduce a `pauseInfo` field in `sql.PreparedPortal` that stores this
metadata. It is set during the first-time execution of the portal, and all
following executions reuse the flow and the queryID. This also implies that
these resources should not be cleaned up at the end of each execution.
The implementation of the clean-up steps is included in the next commit.

Also, in this commit we hang a `*PreparedPortal` off the planner; how it is set
can be seen in the next commit as well.
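For illustration, with multiple active portals enabled a client can interleave
portal executions inside an explicit transaction roughly like this (a
hypothetical pgwire message sequence; the portal names are made up):

  Bind "p1"                   -- create portal p1
  Bind "p2"                   -- create portal p2
  Execute "p1" (max 10 rows)  -- first execution: plan p1 and run its flow without waiting
  Execute "p2" (max 10 rows)  -- p1 stays paused while p2 runs
  Execute "p1" (max 10 rows)  -- re-execution: reuse p1's flow and queryID via pauseInfo
  Close "p1", Close "p2"      -- the retained flows are cleaned up only here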
Release note: None --- pkg/sql/conn_executor_exec.go | 72 ++++++++++++++++------------- pkg/sql/distsql_physical_planner.go | 9 ++++ pkg/sql/distsql_running.go | 34 ++++++++++---- pkg/sql/planner.go | 23 +++++++++ pkg/sql/prepared_stmt.go | 28 +++++++++-- 5 files changed, 121 insertions(+), 45 deletions(-) diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 4f349ece15b7..09f4094e9cd5 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -1734,42 +1734,48 @@ func (ex *connExecutor) execWithDistSQLEngine( } defer recv.Release() - evalCtx := planner.ExtendedEvalContext() - planCtx := ex.server.cfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, planner, - planner.txn, distribute) - planCtx.stmtType = recv.stmtType - // Skip the diagram generation since on this "main" query path we can get it - // via the statement bundle. - planCtx.skipDistSQLDiagramGeneration = true - if ex.server.cfg.TestingKnobs.TestingSaveFlows != nil { - planCtx.saveFlows = ex.server.cfg.TestingKnobs.TestingSaveFlows(planner.stmt.SQL) - } else if planner.instrumentation.ShouldSaveFlows() { - planCtx.saveFlows = planCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypeMainQuery) - } - planCtx.associateNodeWithComponents = planner.instrumentation.getAssociateNodeWithComponentsFn() - planCtx.collectExecStats = planner.instrumentation.ShouldCollectExecStats() - - var evalCtxFactory func(usedConcurrently bool) *extendedEvalContext - if len(planner.curPlan.subqueryPlans) != 0 || - len(planner.curPlan.cascades) != 0 || - len(planner.curPlan.checkPlans) != 0 { - var serialEvalCtx extendedEvalContext - ex.initEvalCtx(ctx, &serialEvalCtx, planner) - evalCtxFactory = func(usedConcurrently bool) *extendedEvalContext { - // Reuse the same object if this factory is not used concurrently. - factoryEvalCtx := &serialEvalCtx - if usedConcurrently { - factoryEvalCtx = &extendedEvalContext{} - ex.initEvalCtx(ctx, factoryEvalCtx, planner) + var err error + + if planner.hasFlowForPausablePortal() { + err = planner.resumeFlowForPausablePortal(recv) + } else { + evalCtx := planner.ExtendedEvalContext() + planCtx := ex.server.cfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, planner, + planner.txn, distribute) + planCtx.stmtType = recv.stmtType + // Skip the diagram generation since on this "main" query path we can get it + // via the statement bundle. + planCtx.skipDistSQLDiagramGeneration = true + if ex.server.cfg.TestingKnobs.TestingSaveFlows != nil { + planCtx.saveFlows = ex.server.cfg.TestingKnobs.TestingSaveFlows(planner.stmt.SQL) + } else if planner.instrumentation.ShouldSaveFlows() { + planCtx.saveFlows = planCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypeMainQuery) + } + planCtx.associateNodeWithComponents = planner.instrumentation.getAssociateNodeWithComponentsFn() + planCtx.collectExecStats = planner.instrumentation.ShouldCollectExecStats() + + var evalCtxFactory func(usedConcurrently bool) *extendedEvalContext + if len(planner.curPlan.subqueryPlans) != 0 || + len(planner.curPlan.cascades) != 0 || + len(planner.curPlan.checkPlans) != 0 { + var serialEvalCtx extendedEvalContext + ex.initEvalCtx(ctx, &serialEvalCtx, planner) + evalCtxFactory = func(usedConcurrently bool) *extendedEvalContext { + // Reuse the same object if this factory is not used concurrently. 
+ factoryEvalCtx := &serialEvalCtx + if usedConcurrently { + factoryEvalCtx = &extendedEvalContext{} + ex.initEvalCtx(ctx, factoryEvalCtx, planner) + } + ex.resetEvalCtx(factoryEvalCtx, planner.txn, planner.ExtendedEvalContext().StmtTimestamp) + factoryEvalCtx.Placeholders = &planner.semaCtx.Placeholders + factoryEvalCtx.Annotations = &planner.semaCtx.Annotations + factoryEvalCtx.SessionID = planner.ExtendedEvalContext().SessionID + return factoryEvalCtx } - ex.resetEvalCtx(factoryEvalCtx, planner.txn, planner.ExtendedEvalContext().StmtTimestamp) - factoryEvalCtx.Placeholders = &planner.semaCtx.Placeholders - factoryEvalCtx.Annotations = &planner.semaCtx.Annotations - factoryEvalCtx.SessionID = planner.ExtendedEvalContext().SessionID - return factoryEvalCtx } + err = ex.server.cfg.DistSQLPlanner.PlanAndRunAll(ctx, evalCtx, planCtx, planner, recv, evalCtxFactory) } - err := ex.server.cfg.DistSQLPlanner.PlanAndRunAll(ctx, evalCtx, planCtx, planner, recv, evalCtxFactory) return recv.stats, err } diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 3dab75fe8129..656b9056875c 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -883,6 +883,15 @@ func (p *PlanningCtx) IsLocal() bool { return p.isLocal } +// getPortalPauseInfo returns the portal pause info if the current planner is +// for a pausable portal. Otherwise, returns nil. +func (p *PlanningCtx) getPortalPauseInfo() *portalPauseInfo { + if p.planner != nil && p.planner.pausablePortal != nil && p.planner.pausablePortal.pauseInfo != nil { + return p.planner.pausablePortal.pauseInfo + } + return nil +} + // getDefaultSaveFlowsFunc returns the default function used to save physical // plans and their diagrams. The returned function is **not** concurrency-safe. func (p *PlanningCtx) getDefaultSaveFlowsFunc( diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index a92e9e338a26..5e590e3b8bf0 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -833,14 +833,31 @@ func (dsp *DistSQLPlanner) Run( if planCtx.planner != nil { statementSQL = planCtx.planner.stmt.StmtNoConstants } - ctx, flow, err := dsp.setupFlows( - ctx, evalCtx, planCtx, leafInputState, flows, recv, localState, statementSQL, - ) - // Make sure that the local flow is always cleaned up if it was created. + + var flow flowinfra.Flow + var err error + if i := planCtx.getPortalPauseInfo(); i != nil && i.flow != nil { + flow = i.flow + } else { + ctx, flow, err = dsp.setupFlows( + ctx, evalCtx, planCtx, leafInputState, flows, recv, localState, statementSQL, + ) + if i != nil { + i.flow = flow + i.outputTypes = plan.GetResultTypes() + } + } + if flow != nil { - defer func() { - flow.Cleanup(ctx) - }() + // Make sure that the local flow is always cleaned up if it was created. + // If the flow is not for retained portal, we clean the flow up here. + // Otherwise, we delay the clean up via portalPauseInfo.flowCleanup until + // the portal is closed. 
+ if planCtx.getPortalPauseInfo() == nil { + defer func() { + flow.Cleanup(ctx) + }() + } } if err != nil { recv.SetError(err) @@ -863,7 +880,8 @@ func (dsp *DistSQLPlanner) Run( return } - flow.Run(ctx, false /* noWait */) + noWait := planCtx.getPortalPauseInfo() != nil + flow.Run(ctx, noWait) } // DistSQLReceiver is an execinfra.RowReceiver and execinfra.BatchReceiver that diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index 036f8249dcda..a41eb6081a21 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -52,6 +52,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" "github.com/cockroachdb/redact" "github.com/lib/pq/oid" @@ -201,6 +202,9 @@ type planner struct { // home region is being enforced. StmtNoConstantsWithHomeRegionEnforced string + // pausablePortal is set when the query is from a pausable portal. + pausablePortal *PreparedPortal + instrumentation instrumentationHelper // Contexts for different stages of planning and execution. @@ -260,6 +264,24 @@ type planner struct { evalCatalogBuiltins evalcatalog.Builtins } +// hasFlowForPausablePortal returns true if the planner is for re-executing a +// portal. We reuse the flow stored in p.pausablePortal.pauseInfo. +func (p *planner) hasFlowForPausablePortal() bool { + return p.pausablePortal != nil && p.pausablePortal.pauseInfo != nil && p.pausablePortal.pauseInfo.flow != nil +} + +// resumeFlowForPausablePortal is called when re-executing a portal. We reuse +// the flow with a new receiver, without re-generating the physical plan. +func (p *planner) resumeFlowForPausablePortal(recv *DistSQLReceiver) error { + if !p.hasFlowForPausablePortal() { + return errors.AssertionFailedf("no flow found for pausable portal") + } + recv.discardRows = p.instrumentation.ShouldDiscardRows() + recv.outputTypes = p.pausablePortal.pauseInfo.outputTypes + p.pausablePortal.pauseInfo.flow.Resume(recv) + return recv.commErr +} + func (evalCtx *extendedEvalContext) setSessionID(sessionID clusterunique.ID) { evalCtx.SessionID = sessionID } @@ -848,6 +870,7 @@ func (p *planner) resetPlanner( p.evalCatalogBuiltins.Init(p.execCfg.Codec, txn, p.Descriptors()) p.skipDescriptorCache = false p.typeResolutionDbID = descpb.InvalidID + p.pausablePortal = nil } // GetReplicationStreamManager returns a ReplicationStreamManager. diff --git a/pkg/sql/prepared_stmt.go b/pkg/sql/prepared_stmt.go index 5ad4bbd872c6..7a0c67cc051e 100644 --- a/pkg/sql/prepared_stmt.go +++ b/pkg/sql/prepared_stmt.go @@ -15,10 +15,12 @@ import ( "time" "unsafe" + "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" "github.com/cockroachdb/cockroach/pkg/sql/opt/memo" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase" "github.com/cockroachdb/cockroach/pkg/sql/querycache" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" ) @@ -155,6 +157,9 @@ type PreparedPortal struct { // a portal. // See comments for PortalPausablity for more details. portalPausablity PortalPausablity + + // pauseInfo is the saved info needed for a pausable portal. + pauseInfo *portalPauseInfo } // makePreparedPortal creates a new PreparedPortal. 
@@ -173,10 +178,11 @@ func (ex *connExecutor) makePreparedPortal(
 		Qargs:      qargs,
 		OutFormats: outFormats,
 	}
-	// TODO(janexing): we added this line to avoid the unused lint error.
-	// Will remove it once the whole functionality of multple active portals
-	// is merged.
-	_ = enableMultipleActivePortals.Get(&ex.server.cfg.Settings.SV)
+
+	if EnableMultipleActivePortals.Get(&ex.server.cfg.Settings.SV) {
+		portal.pauseInfo = &portalPauseInfo{}
+		portal.portalPausablity = PausablePortal
+	}
 	return portal, portal.accountForCopy(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
 }

@@ -204,3 +210,17 @@ func (p *PreparedPortal) close(
 func (p *PreparedPortal) size(portalName string) int64 {
 	return int64(uintptr(len(portalName)) + unsafe.Sizeof(p))
 }
+
+// portalPauseInfo stores the info needed to pause a portal. After a portal is
+// paused, any other statement can be executed, and then the same portal can be
+// re-executed or closed.
+type portalPauseInfo struct {
+	// outputTypes are the types of the result columns produced by the physical
+	// plan. We need this because when re-executing the portal, we reuse the flow
+	// with a new receiver, without re-generating the physical plan.
+	outputTypes []*types.T
+	// We need to store the flow for a portal so that when re-executing it, we
+	// continue from the previous execution. It lives along with the portal, and
+	// will be cleaned up when the portal is closed.
+	flow flowinfra.Flow
+}
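As a reading aid (not part of the patch, and not CockroachDB code), here is a
minimal, self-contained Go sketch of the lifecycle the new Flow/Processor
contract describes: Run is called once with noWait set, Resume is called once
per re-execution of the portal, and Cleanup runs only when the portal is
closed. The toyFlow type and its rows are made up for illustration.

package main

import "fmt"

// toyFlow is a stand-in for flowinfra.Flow; it only models the pausable-portal
// lifecycle (Run once with noWait, Resume many times, Cleanup once).
type toyFlow struct {
	next int
	rows []string
}

// Run starts the flow. With noWait set, it returns without waiting for the
// flow to finish, mirroring Flow.Run(ctx, true /* noWait */).
func (f *toyFlow) Run(noWait bool) {
	fmt.Printf("flow started (noWait=%t)\n", noWait)
}

// Resume emits up to limit more rows, standing in for Flow.Resume(recv) being
// called with a fresh receiver on each re-execution of the portal.
func (f *toyFlow) Resume(limit int) {
	for i := 0; i < limit && f.next < len(f.rows); i++ {
		fmt.Println("row:", f.rows[f.next])
		f.next++
	}
}

// Cleanup releases the flow's resources, mirroring Flow.Cleanup(ctx), which
// for a pausable portal is deferred until the portal is closed.
func (f *toyFlow) Cleanup() { fmt.Println("flow cleaned up") }

func main() {
	f := &toyFlow{rows: []string{"a", "b", "c", "d", "e"}}
	f.Run(true) // first execution of the portal: start the flow, don't wait
	f.Resume(2) // re-execution of the portal: fetch two more rows
	f.Resume(2) // another re-execution
	f.Cleanup() // the portal is closed: only now is the flow cleaned up
}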