
sql: Implement a preliminary version of multiple active portals #96358

Closed

Conversation

ZhouXing19
Collaborator

@ZhouXing19 ZhouXing19 commented Feb 1, 2023

Design doc: https://docs.google.com/document/d/1SpKTrTqc4AlGWBqBNgmyXfTweUUsrlqIaSkmaXpznA8/edit

This PR adds limited support for multiple active portals. Portals satisfying all of the following restrictions can now be paused and resumed (i.e., with other queries interleaved between their executions):

  1. Not an internal query;
  2. Read-only query;
  3. No sub-queries or post-queries.

Such a portal will only have its statement executed with a non-distributed plan.

This feature is gated by a non-public cluster setting sql.defaults.multiple_active_portals.enabled. When it's set to true, all portals that satisfy the restrictions above automatically become "pausable" when created via the pgwire Bind stmt.

The core idea of this implementation is

  1. Add a switchToAnotherPortal status to the result-consumption state machine. When we receive an ExecPortal message for a different portal, we simply return control to the connExecutor.
  2. Persist the flow, queryID, span, and instrumentationHelper for the portal, and reuse them when we re-execute the portal. This ensures we continue fetching where we left off rather than starting over.
  3. To enable 2, we need to delay the cleanup of resources until we close the portal. For this we introduce stacks of cleanup functions (see the sketch below).
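
For reference, here is a minimal, self-contained Go sketch of the cleanup-stack idea from point 3. The names (namedFunc, cleanupFuncStack, appendFunc) mirror the types used in the PR, but this is a simplified stand-alone illustration, not the actual implementation:

package main

import "fmt"

// namedFunc pairs a cleanup function with a name, which helps identify which
// cleanup step is running when debugging or logging.
type namedFunc struct {
  fName string
  f     func()
}

// cleanupFuncStack collects cleanup functions during the first execution of a
// pausable portal; they are run in LIFO order only when the portal is closed
// (or when an error forces an early cleanup).
type cleanupFuncStack struct {
  stack      []namedFunc
  isComplete bool
}

func (s *cleanupFuncStack) appendFunc(fn namedFunc) {
  s.stack = append(s.stack, fn)
}

func (s *cleanupFuncStack) run() {
  for i := len(s.stack) - 1; i >= 0; i-- {
    s.stack[i].f()
  }
  s.stack = nil
}

func main() {
  var flowCleanup cleanupFuncStack
  flowCleanup.appendFunc(namedFunc{fName: "cancel query", f: func() { fmt.Println("query canceled") }})
  flowCleanup.appendFunc(namedFunc{fName: "cleanup flow", f: func() { fmt.Println("flow cleaned up") }})
  // Deferred until the portal is closed; the flow cleanup runs before the
  // query cancellation because of the LIFO order.
  flowCleanup.run()
}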

Note that we kept the implementation of the original "un-pausable" portal, as we'd like to limit this new functionality to a small set of statements. Eventually some of the old code paths (e.g. the limitedCommandResult's lifecycle) should be replaced with the new code.

Also, we don't support distributed plans yet, as that involves much more complicated changes. See the "Start with an entirely local plan" section in the design doc. Support for this will come as a follow-up.

Epic: CRDB-17622

Release note: initial support for multiple active portals. With the cluster setting sql.defaults.multiple_active_portals.enabled set to true, portals satisfying all of the following restrictions can be executed in an interleaved manner: 1. Not an internal query; 2. Read-only query; 3. No sub-queries or post-queries. Such a portal will only have its statement executed with an entirely local plan.

@cockroach-teamcity
Member

This change is Reviewable

@ZhouXing19 ZhouXing19 changed the title from "[DNM] experimental implementation of multiple active portal" to "[DNM] experimental implementation of multiple active portals" on Feb 1, 2023
@ZhouXing19 ZhouXing19 force-pushed the multiple-active-portal-0124 branch 3 times, most recently from 2b20843 to b7e2739 on February 2, 2023 04:14
@ZhouXing19 ZhouXing19 force-pushed the multiple-active-portal-0124 branch 2 times, most recently from e429426 to f2d3280 on March 2, 2023 22:01
@ZhouXing19 ZhouXing19 force-pushed the multiple-active-portal-0124 branch 2 times, most recently from fd193bc to 3ed56a7 on March 13, 2023 03:30
@ZhouXing19 ZhouXing19 changed the title from "[DNM] experimental implementation of multiple active portals" to "sql: Implement a preliminary version of multiple active portals" on Mar 13, 2023
@ZhouXing19 ZhouXing19 marked this pull request as ready for review March 13, 2023 03:54
@ZhouXing19 ZhouXing19 requested a review from a team March 13, 2023 03:54
@ZhouXing19 ZhouXing19 requested review from a team as code owners March 13, 2023 03:54
Member

@yuzefovich yuzefovich left a comment


I haven't finished reviewing but wanted to push the comments I have so far. It's looking pretty good and nice job on splitting it out into smaller commits!

Reviewed 2 of 2 files at r1, 9 of 9 files at r2, 6 of 9 files at r3.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @rafiss and @ZhouXing19)


-- commits line 28 at r3:
(Not directly related to this commit, but) something that we should confirm / do is that in case the node is being drained, we might need to be able to clean up paused portals.

The important thing is to confirm that the paused portals will not block draining from proceeding (I think this should already be the case since in pgwire.Server.drainImpl we are canceling all sessions after some timeout, and paused portals present on this node will always have a corresponding session). This needs to be done for 23.1.

A nice-to-have is ensuring that all flows of paused portals are properly cleaned up (an example of when this might matter is if we have a query that spilled to disk - i.e. created some temp storage files on disk - without proper shutdown of the flow, those files will not be deleted). This can wait, but a TODO seems warranted (if not implemented already).


-- commits line 36 at r3:
I haven't read the code for this yet, but it'd be nice if we preserved the ability to cancel query by queryID (see connExecutor.addActiveQuery). We also need to think through whether the paused portals are shown as "active queries".


pkg/sql/conn_io.go line 663 at r2 (raw file):

		portalName string,
		implicitTxn bool,
		isForPausablePortal bool,

nit: s/isForPausablePortal/forPausablePortal/g for consistency with all other places.


pkg/sql/distsql_physical_planner.go line 884 at r3 (raw file):

}

// getPortalPauseInfo return the portal pause info if the current planner is

nit: s/return/returns/g.


pkg/sql/distsql_running.go line 1508 at r2 (raw file):

var (
	// ErrLimitedResultNotSupported is an error produced by pgwire
	// indicating an unsupported feature of row count limits was attempted.

nit: this comment is now stale, let's mention multiple active portals.


pkg/sql/distsql_running.go line 1512 at r2 (raw file):

		40195,
		"multiple active portals not supported, "+
			"please set sql.defaults.multiple_active_portals.enabled",

nit: perhaps add "to true" to make it more clear? Also, perhaps mention that the feature is currently in the preview in this message.


pkg/sql/distsql_running.go line 851 at r3 (raw file):

		// Make sure that the local flow is always cleaned up if it was created.
		// If the flow is not for retained portal, we clean the flow up here.
		// Otherwise, we delay the clean up via flowCleanup until the portal

nit: s/flowCleanup/portalPauseInfo.flowCleanup/g to be more specific since we don't have flowCleanup somewhere close to this comment.


pkg/sql/distsql_running.go line 881 at r3 (raw file):

	if planCtx.getPortalPauseInfo() != nil {
		flow.Resume(ctx, recv)

There is a lot of code in this function that we don't really need to execute when we're resuming the paused portal, so I think it might be better to lift the call to flow.Resume a little bit higher in the stack.

In particular, given that for now we don't allow any sub- nor post-queries, perhaps the best place would be to put it in the connExecutor.execWithDistSQLEngine method before (or maybe after) we create a new PlanningCtx. This Resume call will replace the PlanAndRunAll call when we're resuming the paused portal. This will allow us to bypass unnecessary code (like physical planning and different stuff in the DistSQLPlanner.Run() method), and it also seems logically sound. Thoughts?

Update: I realized that you do want to call Resume even for the very first execution with the current implementation. The only thing that you need though is for Run to not call Wait. Perhaps, we should change the signature of Flow.Run to take an additional option that would indicate whether Wait should be deferred or skipped entirely. This way we can still lift the call to Resume higher up the stack and keep the call to Run here (although the new parameter would depend on whether we have "portal pause info" or not).


pkg/sql/prepared_stmt.go line 140 at r3 (raw file):

	exhausted bool

	// pauseInfo saved info needed for the "multiple active portal" mode.

nit: s/saved info/is the saved info/g.


pkg/sql/prepared_stmt.go line 207 at r3 (raw file):

// functions are added during the first-time execution of a portal. When the
// first-time execution is finished, we mark isComplete to true.
type CleanupFuncStack struct {

nit: does this need to be exported? Ditto for NamedFunc and all the methods. I'd assume that all of the code would be inside sql package, so we don't need to export anything.


pkg/sql/prepared_stmt.go line 213 at r3 (raw file):

func initCleanupFuncStack() CleanupFuncStack {
	return CleanupFuncStack{stack: make([]NamedFunc, 0)}

nit: why do we allocate a zero-length slice here? Could we just leave unset since we're appending below?


pkg/sql/prepared_stmt.go line 242 at r3 (raw file):

}

// instrumentationHelperWrapper wrap the instrumentation helper.

nit: s/wrap/wraps/g.


pkg/sql/prepared_stmt.go line 259 at r3 (raw file):

	// an existing portal, we should use the same query id.
	queryID clusterunique.ID
	// ihWrapper stores the instrumentation helper that is should be reused for

nit: s/that is should/that should/g.


pkg/sql/prepared_stmt.go line 266 at r3 (raw file):

	// They should be called in this order:
	// flowCleanup -> execStmtCleanup -> exhaustPortal.
	// TODO(janexing): I'm not sure about which is better here. Now we hard code

Currently we define three separate stacks, or - in other words - three separate one-dimensional slices. What about the idea to have a single two-dimensional struct? Imagine something like:

type CleanupFuncStacks struct {
  stacks [][]NamedFunc
}

// newMethod should be called whenever we enter a new method that needs to defer
// some cleanup. The method's "generation" is returned.
func (s *CleanupFuncStacks) newMethod() int {
  s.stacks = append(s.stacks, []NamedFunc{})
  return len(s.stacks) - 1
}

func (s *CleanupFuncStacks) pushNewCleanup(fn NamedFunc, generation int) {
  s.stacks[generation] = append(s.stacks[generation], fn)
}

func (s *CleanupFuncStacks) Run() {
  for i := len(s.stacks) - 1; i >= 0; i-- {
    for _, fn := range s.stacks[i] {
      fn.f()
    }
  }
}

The idea is that we'd introduce a cleaner API (I typed up just a prototype, feel free to modify it or discard it completely) which will be more explicit and less error-prone.


pkg/sql/colflow/vectorized_flow.go line 299 at r3 (raw file):

func (f *vectorizedFlow) Resume(ctx context.Context, newOutput execinfra.RowReceiver) {
	f.SetNewRowSyncFlowConsumer(newOutput)
	if f.batchFlowCoordinator == nil {

Since we're using the RowReceiver interface for this (because limitedCommandResult currently doesn't implement BatchReceiver interface), f.batchFlowCoordinator is always nil, so we can simplify this function and avoid the introduction of RunImpl.


pkg/sql/colflow/vectorized_flow.go line 1363 at r3 (raw file):

	receiver execinfra.RowReceiver,
) {
	if fc, ok := r.processors[0].(*FlowCoordinator); ok {

I think we actually can assume that there is exactly one processor here and it must be FlowCoordinator, so if that's not the case, I'd return an assertion failed error.


pkg/sql/execinfra/base.go line 55 at r2 (raw file):

	// portal later. If the cluster setting sql.defaults.multiple_active_portals.enabled
	// is set to be true, we do nothing and return the control to the connExecutor.
	SwitchToAnotherPortal

nit: let's move this value higher - between NeedMoreRows and DrainRequested. The ordering here is implicit about the transitions between different statuses.


pkg/sql/execinfra/base.go line 205 at r2 (raw file):

				dst.ProducerDone()
				return
			case SwitchToAnotherPortal:

We probably need to handle this new ConsumerStatus in other places. E.g. in DrainAndForwardMetadata I think we want to return an assertion error if we see SwitchToAnotherPortal; but in RowChannel.Push it might be expected to see such status, but perhaps it also should be an assertion failure (I think we only use RowChannels if we have concurrency between processors within a flow, and for the initial version we don't allow any concurrency IIRC).


pkg/sql/flowinfra/flow.go line 91 at r3 (raw file):

	SetTxn(*kv.Txn)

	// SetNewRowSyncFlowConsumer set the new SetNewRowSyncFlowConsumer for a flow.

nit: I'd reword the first sentence as something like "SetNewRowSyncFlowConsumer updates the Flow with the new output".


pkg/sql/flowinfra/flow.go line 104 at r3 (raw file):

	Start(context.Context) error

	// Resume is similar to Run but without waiting the all flow to return and

nit: s/all flow/all goroutines/g.


pkg/sql/sem/tree/stmt.go line 132 at r2 (raw file):

	}
	switch t := stmt.(type) {
	case *Select:

What if stmt is not *Select? Currently we return true for those cases, should we be more cautious and, instead, return false by default?

@ZhouXing19
Collaborator Author

-- commits line 28 at r3:

Previously, yuzefovich (Yahor Yuzefovich) wrote…

(Not directly related to this commit, but) something that we should confirm / do is that in case the node is being drained, we might need to be able to clean up paused portals.

The important thing is to confirm that the paused portals will not block draining from proceeding (I think this should already be the case since in pgwire.Server.drainImpl we are canceling all sessions after some timeout, and paused portals present on this node will always have a corresponding session). This needs to be done for 23.1.

A nice-to-have is ensuring that all flows of paused portals are properly cleaned up (an example of when this might matter is if we have a query that spilled to disk - i.e. created some temp storage files on disk - without proper shutdown of the flow, those files will not be deleted). This can wait, but a TODO seems warranted (if not implemented already).

This is a good point. I now have the portals closed when closing the conn executor (now I call it only when the close type is normalClose and externalTxnClose but maybe it should be called whenever we close the connEx?), and the functions in the cleanup stack will be called to clean up the flow. IIUC when we're draining a node, if there's an active connection, we close the conn executor.

Do you think this is sufficient to solve this concern? Or maybe we should still explicitly cleanup the portals in pgwire.Server.drainImpl?

Collaborator Author

@ZhouXing19 ZhouXing19 left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @rafiss and @yuzefovich)


-- commits line 36 at r3:

Previously, yuzefovich (Yahor Yuzefovich) wrote…

I haven't read the code for this yet, but it'd be nice if we preserved the ability to cancel query by queryID (see connExecutor.addActiveQuery). We also need to think through whether the paused portals are shown as "active queries".

Yes, we still call addActiveQuery, but only once (during the first execution of the portal).


pkg/sql/conn_io.go line 663 at r2 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: s/isForPausablePortal/forPausablePortal/g for consistency with all other places.

Done


pkg/sql/distsql_physical_planner.go line 884 at r3 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: s/return/returns/g.

Done.


pkg/sql/distsql_running.go line 1508 at r2 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: this comment is now stale, let's mention multiple active portals.

Done.


pkg/sql/distsql_running.go line 1512 at r2 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: perhaps add "to true" to make it more clear? Also, perhaps mention that the feature is currently in the preview in this message.

Done.


pkg/sql/distsql_running.go line 851 at r3 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: s/flowCleanup/portalPauseInfo.flowCleanup/g to be more specific since we don't have flowCleanup somewhere close to this comment.

Done.


pkg/sql/distsql_running.go line 881 at r3 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

There is a lot of code in this function that we don't really need to execute when we're resuming the paused portal, so I think it might be better to lift the call to flow.Resume a little bit higher in the stack.

In particular, given that for now we don't allow any sub- nor post-queries, perhaps the best place would be to put it in the connExecutor.execWithDistSQLEngine method before (or maybe after) we create a new PlanningCtx. This Resume call will replace the PlanAndRunAll call when we're resuming the paused portal. This will allow us to bypass unnecessary code (like physical planning and different stuff in the DistSQLPlanner.Run() method), and it also seems logically sound. Thoughts?

Update: I realized that you do want to call Resume even for the very first execution with the current implementation. The only thing that you need though is for Run to not call Wait. Perhaps, we should change the signature of Flow.Run to take an additional option that would indicate whether Wait should be deferred or skipped entirely. This way we can still lift the call to Resume higher up the stack and keep the call to Run here (although the new parameter would depend on whether we have "portal pause info" or not).

Good idea. I removed the Resume function and added a noWait arg to the Run function.
Note that since we reuse the flow without re-generating the physical plan, we have to store the output types for the new receiver, so I added a new field outputTypes []*types.T to sql.portalPauseInfo.


pkg/sql/prepared_stmt.go line 140 at r3 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: s/saved info/is the saved info/g.

Done.


pkg/sql/prepared_stmt.go line 207 at r3 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: does this need to be exported? Ditto for NamedFunc and all the methods. I'd assume that all of the code would be inside sql package, so we don't need to export anything.

Done.


pkg/sql/prepared_stmt.go line 213 at r3 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: why do we allocate a zero-length slice here? Could we just leave unset since we're appending below?

Removed.


pkg/sql/prepared_stmt.go line 242 at r3 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: s/wrap/wraps/g.

Done.


pkg/sql/prepared_stmt.go line 259 at r3 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: s/that is should/that should/g.

Done.


pkg/sql/colflow/vectorized_flow.go line 299 at r3 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Since we're using the RowReceiver interface for this (because limitedCommandResult currently doesn't implement BatchReceiver interface), f.batchFlowCoordinator is always nil, so we can simplify this function and avoid the introduction of RunImpl.

I've removed it.


pkg/sql/colflow/vectorized_flow.go line 1363 at r3 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

I think we actually can assume that there is exactly one processor here and it must be FlowCoordinator, so if that's not the case, I'd return an assertion failed error.

Done.


pkg/sql/execinfra/base.go line 55 at r2 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: let's move this value higher - between NeedMoreRows and DrainRequested. The ordering here is implicit about the transitions between different statuses.

Done.


pkg/sql/execinfra/base.go line 205 at r2 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

We probably need to handle this new ConsumerStatus in other places. E.g. in DrainAndForwardMetadata I think we want to return an assertion error if we see SwitchToAnotherPortal; but in RowChannel.Push it might be expected to see such status, but perhaps it also should be an assertion failure (I think we only use RowChannels if we have concurrency between processors within a flow, and for the initial version we don't allow any concurrency IIRC).

Great point, added panic messages (as I'm not sure if we want to change the signature of the push and drain functions)


pkg/sql/flowinfra/flow.go line 91 at r3 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: I'd reword the first sentence as something like "SetNewRowSyncFlowConsumer updates the Flow with the new output".

Oops thanks. The original sentence doesn't make sense at all 😅


pkg/sql/flowinfra/flow.go line 104 at r3 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: s/all flow/all goroutines/g.

I've removed this function.


pkg/sql/sem/tree/stmt.go line 132 at r2 (raw file):
Yeah, this gate is far from ideal. I'm wondering what kinds of statements we should allow. Only SELECT? If so, how do we filter out SELECTs with potential data writes?
I've added more comments for this as well.

should we be more cautious and, instead, return false by default?

Good idea! Changed.

@ZhouXing19
Collaborator Author

Thanks for the review! I made the fixes except for the cleanupStacks struct -- I may need more time to think about the redesign.

Member

@yuzefovich yuzefovich left a comment


Reviewed 22 of 22 files at r7, 9 of 9 files at r8, 13 of 13 files at r9.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @rafiss and @ZhouXing19)


-- commits line 28 at r3:

now I call it only when the close type is normalClose and externalTxnClose but maybe it should be called whenever we close the connEx?

Hm, with panicClose I think other regular flows (i.e. not related to portals) don't get cleaned up properly either, so it seems ok to also not clean up properly for the portals. Up to you though - it won't hurt if we close all portals on any type of closure of the connExecutor.

Do you think this is sufficient to solve this concern? Or maybe we should still explicitly cleanup the portals in pgwire.Server.drainImpl?

Yes, sounds good to me.

A related question: do we want to allow the internal executor to run multiple active portals? I don't think so, so we probably should just have a check somewhere that only non-internal connExecutor attempts to run multiple active portals.


pkg/sql/distsql_running.go line 881 at r3 (raw file):

Previously, ZhouXing19 (Jane Xing) wrote…

Good idea. I removed the Resume function and added a noWait arg to the Run function.
Note that since we reuse the flow without re-generating the physical plan, we have to store the output types for the new receiver, so I added a new field outputTypes []*types.T to sql.portalPauseInfo.

Looks good!


pkg/sql/distsql_running.go line 843 at r9 (raw file):

			ctx, evalCtx, planCtx, leafInputState, flows, recv, localState, statementSQL,
		)
		if m := planCtx.getPortalPauseInfo(); m != nil {

nit: we already have m in scope, so no need to call getPortalPauseInfo for the second time, and we can just check m != nil.


pkg/sql/planner.go line 276 at r9 (raw file):

func (p *planner) resumeFlowForPausablePortal(ctx context.Context, recv *DistSQLReceiver) error {
	if !p.hasFlowForPausablePortal() {
		panic("no flow found for pausable portal")

nit: maybe return this as assertion failure error?


pkg/sql/planner.go line 284 at r9 (raw file):

		return err
	}
	p.portal.pauseInfo.flow.Run(ctx, true /* noWait */)

nit: could just use f.


pkg/sql/prepared_stmt.go line 164 at r3 (raw file):

		// portal hooks on.
		telemetry.Inc(sqltelemetry.MultipleActivePortalCounter)
		if tree.IsReadOnly(stmt.AST) && !isInternal {

Oh, this is the answer to one of my other comments - I see that here we used to require non-internal executor in the first revision. Why did this change?


pkg/sql/colflow/vectorized_flow.go line 292 at r9 (raw file):

func (f *vectorizedFlow) SetNewRowSyncFlowConsumer(receiver execinfra.RowReceiver) error {
	_ = f.FlowBase.SetNewRowSyncFlowConsumer(receiver)

nit: add a quick comment here that this method cannot return non-nil error (or add a quick error check).


pkg/sql/execinfra/base.go line 205 at r2 (raw file):

Previously, ZhouXing19 (Jane Xing) wrote…

Great point, added panic messages (as I'm not sure if we want to change the signature of the push and drain functions)

Panicking in DrainAndForwardMetadata sounds good to me, but I'm not sure about the row channel.

Row channels are only used if the query is using the old row-by-row engine, and ideally it'd support multiple active portals too, so we might want to be less strict there. How about something like this:

--- a/pkg/sql/execinfra/base.go
+++ b/pkg/sql/execinfra/base.go
@@ -457,6 +457,12 @@ func (rc *RowChannel) Push(
        switch consumerStatus {
        case NeedMoreRows:
                rc.dataChan <- RowChannelMsg{Row: row, Meta: meta}
+       case SwitchToAnotherPortal:
+               // We currently don't expect this status, so we propagate an assertion
+               // failure as metadata.
+               m := execinfrapb.GetProducerMeta()
+               m.Err = errors.AssertionFailedf("multiple active portals are not expected with the row channel")
+               rc.dataChan <- RowChannelMsg{Meta: m}
        case DrainRequested:
                // If we're draining, only forward metadata.
                if meta != nil {

This will make it so that the query will error out, but the process won't crash (if we happen to use multiple active portals and RowChannels).


pkg/sql/execinfra/base.go line 250 at r8 (raw file):

		case DrainRequested:
		case SwitchToAnotherPortal:
			panic("current consumer is drained, cannot be paused and switch to another portal")

nit: s/switch/switched/g.


pkg/sql/flowinfra/flow.go line 502 at r9 (raw file):

	var err error
	if err = f.StartInternal(ctx, otherProcs); err != nil {

I still think that having a separate Resume method in the Flow interface is the right way to go. In particular, we want to skip this StartInternal call when resuming. The implementation of Resume should be very simple - simply call headProc.Resume (see my comment below on updating the output) with the new output.


pkg/sql/rowflow/row_based_flow.go line 78 at r9 (raw file):

func (f *rowBasedFlow) SetNewRowSyncFlowConsumer(receiver execinfra.RowReceiver) error {
	return f.FlowBase.SetNewRowSyncFlowConsumer(receiver)

Unfortunately, this won't just work for the row-based flows. Here we also need to explicitly update one of the processors (like we did with the FlowCoordinator in the vectorized flow). I think we can just assume FlowBase.processors also has exactly 1 processor, and we need to update it with the new output.

This will likely be more annoying to do than in the vectorized flow case though. Here any type of processor can be the "root", and the simplest way to achieve what we need (at least for most cases) would be to add a new method to execinfra.Processor interface (like UpdateOutput) and then make ProcessorBaseNoHelper implement it while making other implementations of execinfra.Processor interface (like backfiller and csvWriter) simply return an unimplemented error.

Probably rather than introducing UpdateOutput method, we actually should introduce Resume method that would take in the updated output and would call execinfra.Run method with it. Crucially, ProcessorBaseNoHelper.Run currently calls pb.self.Start - which is expected to be called only once - so we should not call Run multiple times.

This would look something like this:

--- a/pkg/sql/execinfra/processorsbase.go
+++ b/pkg/sql/execinfra/processorsbase.go
@@ -733,6 +733,12 @@ func (pb *ProcessorBaseNoHelper) Run(ctx context.Context) {
        Run(pb.ctx, pb.self, pb.Output)
 }
 
+// Resume is part of the Processor interface.
+func (pb *ProcessorBaseNoHelper) Resume(output RowReceiver) {
+       // Use the same context that was passed on the Run call.
+       Run(pb.ctx, pb.self, output)
+}
+

Another idea that might clean things up is to not store the output RowReceiver in the processor at all, and instead pass it as argument to execinfra.Processor.Run method. I think it would be a nice cleanup that should be done in a separate commit. This way we will be able to remove "update output on flow coordinator" business entirely, and here for the row-based flows too. I'll actually type up the commit to do so myself, will send a patch shortly.


pkg/sql/sem/tree/stmt.go line 132 at r2 (raw file):

Previously, ZhouXing19 (Jane Xing) wrote…

Yeah, this gate is far from ideal. I'm wondering what kinds of statements we should allow. Only SELECT? If so, how do we filter out SELECTs with potential data writes?
I've added more comments for this as well.

should we be more cautious and, instead, return false by default?

Good idea! Changed.

The current code looks good to me, but I don't really have a good intuition for this. It might be worth asking in #sql slack channel for ideas.

@ZhouXing19
Collaborator Author

pkg/sql/prepared_stmt.go line 164 at r3 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Oh, this is the answer to one of my other comments - I see that here we used to require non-internal executor in the first revision. Why did this change?

Oh this still requires non-internal bind stmt, I put it in the later commit sql: add restrictions for pausable portals: 4197e9b#diff-7c5a26942e53c921a48fb4a83f4482df389c106e396179c1ef70eda12bd2a72fL165

I don't think we want to enable this feature for internal queries because 1. interleaving portals is not a common usage pattern for internal executors; 2. running all internal read-only queries with a local plan may bring significant performance degradation.

Collaborator Author

@ZhouXing19 ZhouXing19 left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @rafiss and @yuzefovich)


pkg/sql/distsql_running.go line 843 at r9 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: we already have m in scope, so no need to call getPortalPauseInfo for the second time, and we can just check m != nil.

Done.


pkg/sql/planner.go line 276 at r9 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: maybe return this as assertion failure error?

Done.


pkg/sql/planner.go line 284 at r9 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: could just use f.

Done.


pkg/sql/colflow/vectorized_flow.go line 292 at r9 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: add a quick comment here that this method cannot return non-nil error (or add a quick error check).

Added the error check.


pkg/sql/execinfra/base.go line 205 at r2 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Panicking in DrainAndForwardMetadata sounds good to me, but I'm not sure about the row channel.

Row channels are only used if the query is using the old row-by-row engine, and ideally it'd support multiple active portals too, so we might want to be less strict there. How about something like this:

--- a/pkg/sql/execinfra/base.go
+++ b/pkg/sql/execinfra/base.go
@@ -457,6 +457,12 @@ func (rc *RowChannel) Push(
        switch consumerStatus {
        case NeedMoreRows:
                rc.dataChan <- RowChannelMsg{Row: row, Meta: meta}
+       case SwitchToAnotherPortal:
+               // We currently don't expect this status, so we propagate an assertion
+               // failure as metadata.
+               m := execinfrapb.GetProducerMeta()
+               m.Err = errors.AssertionFailedf("multiple active portals are not expected with the row channel")
+               rc.dataChan <- RowChannelMsg{Meta: m}
        case DrainRequested:
                // If we're draining, only forward metadata.
                if meta != nil {

This will make it so that the query will error out, but the process won't crash (if we happen to use multiple active portals and RowChannels).

Nice, thanks!

I also noticed that rowexec.emitHelper() also checks the consumer's status. Do we need to add a case for SwitchToAnotherPortal there as well? It seems to be mainly used by samplerProcessor, and I'm curious in what scenarios we use it.


pkg/sql/execinfra/base.go line 250 at r8 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: s/switch/switched/g.

Done.

@ZhouXing19
Collaborator Author

pkg/sql/prepared_stmt.go line 266 at r3 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Currently we define three separate stacks, or - in other words - three separate one-dimensional slices. What about the idea to have a single two-dimensional struct? Imagine something like:

type CleanupFuncStacks struct {
  stacks [][]NamedFunc
}

// newMethod should be called whenever we enter a new method that needs to defer
// some cleanup. The method's "generation" is returned.
func (s *CleanupFuncStacks) newMethod() int {
  s.stacks = append(s.stacks, []NamedFunc{})
  return len(s.stacks) - 1
}

func (s *CleanupFuncStacks) pushNewCleanup(fn NamedFunc, generation int) {
  s.stacks[generation] = append(s.stacks[generation], fn)
}

func (s *CleanupFuncStacks) Run() {
  for i := len(s.stacks) - 1; i >= 0; i-- {
    for _, fn := range s.stacks[i] {
      fn.f()
    }
  }
}

The idea is that we'd introduce a cleaner API (I typed up just a prototype, feel free to modify it or discard it completely) which will be more explicit and less error-prone.

(I'm still thinking about the redesign, but just would like to explain the current implementation)

I like having the newMethod() return the method's generation. The concern is where to store the generation id for each function layer. I think it's needed when we get an error and exit in the middle of a portal execution.

Generally, the cleanup is expected to happen in 2 cases:

  1. Closing the portal, which can happen when the SQL transaction is committed or rolled back, the connection is closed, or the user sends a Close pgwire message
  2. When we encounter an error executing (or re-executing) a portal.

The first case is relatively easy -- just run all cleanup funcs in LIFO order.

For the second case, my current implementation is: when an error happens in a layer, run cleanup for that layer and its children layers, propagate the error to the parent layer, and let the parent clean itself up. The reason is to preserve the original cleanup order as much as possible. (But I'm unsure about the necessity of this.)

For example, if there's an error in execStmtInOpenState(), we first clean up the flow (i.e., the children layer, if the error happens after the flow is assigned), and then run all the cleanup functions added in execStmtInOpenState() (e.g., cancel query). We let the defer in execPortal finally exhaust the portal.

This requires us to always know which generation we're at when we encounter an error in a function (i.e., execPortal is 0, execStmtInOpenState is 1, DistSQLPlanner.PlanAndRunAll() is 2). But I'm unsure how to preserve the generation id after we push to the stacks.

(Sorry if this explanation is confusing -- the sql: set the clean-up steps for pausable portal commit is basically the implementation of this idea.)
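
To make the layering concrete, here is a toy, self-contained Go sketch of that error-cleanup idea (illustrative only -- the layer names mirror the description above, not the PR's actual code):

package main

import "fmt"

// layeredCleanup registers cleanup functions per call "layer" (generation).
// When an error happens in a layer, that layer and every deeper layer are
// cleaned up in LIFO order; parent layers clean themselves up via their own
// defers as the error propagates.
type layeredCleanup struct {
  layers [][]func()
}

// enterLayer starts a new generation and returns its index.
func (c *layeredCleanup) enterLayer() int {
  c.layers = append(c.layers, nil)
  return len(c.layers) - 1
}

func (c *layeredCleanup) push(gen int, f func()) {
  c.layers[gen] = append(c.layers[gen], f)
}

// runFrom runs, in LIFO order, the cleanups of layer gen and of every deeper
// layer, then discards those layers.
func (c *layeredCleanup) runFrom(gen int) {
  for i := len(c.layers) - 1; i >= gen; i-- {
    for j := len(c.layers[i]) - 1; j >= 0; j-- {
      c.layers[i][j]()
    }
  }
  c.layers = c.layers[:gen]
}

func main() {
  var c layeredCleanup
  execPortal := c.enterLayer() // generation 0
  c.push(execPortal, func() { fmt.Println("exhaust portal") })
  execStmtInOpenState := c.enterLayer() // generation 1
  c.push(execStmtInOpenState, func() { fmt.Println("cancel query") })
  planAndRun := c.enterLayer() // generation 2
  c.push(planAndRun, func() { fmt.Println("cleanup flow") })

  // Simulate an error inside execStmtInOpenState: clean up its own layer and
  // its child layer first (flow cleanup, then query cancellation)...
  c.runFrom(execStmtInOpenState)
  // ...then the deferred cleanup in execPortal exhausts the portal.
  c.runFrom(execPortal)
}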

Member

@yuzefovich yuzefovich left a comment


Alright, I made it all the way this time 😃

I'm a bit concerned by the amount of changes in this PR, in particular around some observability metrics. (If there are correctness or performance regressions, then because this code is on the main path, those regressions should be surfaced easily; however, for observability things we don't really have good testing coverage.) My perspective on this is that it seems ok if the observability of the portals' execution is poor. However, what does not seem ok is if each pausable portal execution inflates some cluster-wide metrics, making those metrics misleading.

Reviewed 21 of 21 files at r13, 13 of 13 files at r14, 4 of 4 files at r15, 6 of 6 files at r16, 2 of 2 files at r17, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @rafiss and @ZhouXing19)


pkg/sql/conn_executor.go line 1140 at r15 (raw file):

	// Close all portals, otherwise there will be leftover bytes.
	ex.extraTxnState.prepStmtsNamespace.closeAllPortals(

Should we also do this for prepStmtsNamespaceAtTxnRewindPos? It might be unnecessary (if we expect that we always call resetTo to reconcile the two namespaces), but I don't think it'll hurt if we do.


pkg/sql/conn_executor_exec.go line 213 at r15 (raw file):

			if !portal.pauseInfo.exhaustPortal.isComplete {
				portal.pauseInfo.exhaustPortal.appendFunc(namedFunc{fName: "exhaust portal", f: func() {
					if _, ok := ex.extraTxnState.prepStmtsNamespace.portals[portalName]; ok {

What is the scenario when we will get ok==false here? Consider dropping a comment about this.


pkg/sql/conn_executor_exec.go line 219 at r15 (raw file):

				portal.pauseInfo.exhaustPortal.isComplete = true
			}
			// If we encountered an error when executing a pausable, clean up the retained

nit: s/a pausable/a pausable portal/g.


pkg/sql/conn_executor_exec.go line 251 at r15 (raw file):

		}
		ev, payload, err = ex.execStmt(ctx, portal.Stmt.Statement, &portal, pinfo, stmtRes, canAutoCommit)
		// For a non-pausable portal, it is considered exhausted regardless of the \

nit: stray slash at the end of the line.


pkg/sql/conn_executor_exec.go line 273 at r15 (raw file):

// adding the function to the execStmtCleanup.
// Otherwise, perform the clean-up step within every execution.
func processCleanupFunc(portal *PreparedPortal, fName string, f func()) {

nit: consider making it more clear that this function is specific to execStmtCleanup. One option would be to rename this function, another would be to define the function within execStmtInOpenState function itself - that way we'd only see processCleanupFunc within execStmtInOpenState, so there would be no confusion.


pkg/sql/conn_executor_exec.go line 307 at r15 (raw file):

	defer func() {
		// If there's any error, do the cleanup right here.
		if (retErr != nil || payloadHasError(retPayload)) && portal != nil && portal.isPausable() {

nit: could use isPausablePortal helper here.


pkg/sql/conn_executor_exec.go line 323 at r15 (raw file):

			// flow, we will hit `span used after finished` whenever we log an event
			// when cleaning up the flow.
			// TODO(janexing): maybe we should have 2 sets of span -- one for flow

I think your current approach is correct - the processors in the flow capture the context that was passed in on the Start call (which should be called only once, on the first Flow.Run call), so we need the tracing span to outlive the flow. I'd remove this TODO.


pkg/sql/conn_executor_exec.go line 340 at r15 (raw file):

	var queryID clusterunique.ID

	if portal != nil && portal.isPausable() {

nit: could use helper here too.


pkg/sql/conn_executor_exec.go line 357 at r15 (raw file):

	isExtendedProtocol := portal != nil && portal.Stmt != nil

nit: stray line.


pkg/sql/conn_executor_exec.go line 375 at r15 (raw file):

	var cancelQuery context.CancelFunc
	ctx, cancelQuery = contextutil.WithCancel(ctx)

We should be deriving a new context here only on the first execution, so we need to store cancelQuery in the pause info as well. We also probably need to store this particular ctx that was derived on the first execution since we do examine ctx.Err() below.


pkg/sql/conn_executor_exec.go line 478 at r15 (raw file):

	if ex.executorType != executorTypeInternal {
		ex.metrics.EngineMetrics.SQLActiveStatements.Inc(1)

This needs to be synchronized with the decrement that we do in "cancel query" cleanup. We either need to increment here only on the first execution or decrement in "cancel query" cleanup func every time (i.e. do it outside of the deferred cleanup func). Given the description of this metric "Number of currently active user SQL statements" I think we should do the latter - i.e. to not consider the paused portal as active, so we want to increment here and decrement above every time.


pkg/sql/conn_executor_exec.go line 536 at r15 (raw file):

	// for EXPLAIN ANALYZE (in order to support EXPLAIN ANALYZE EXECUTE) but
	// before setting up the instrumentation helper.
	if e, ok := ast.(*tree.Execute); ok {

Do we need to make any changes in this if block? I think that the way the paused portal is resumed is via EXECUTE p, maybe with MaxRows parameter, so we should be hitting this code path on all executions. Perhaps we only want to execute this block only on the first run?


pkg/sql/conn_executor_exec.go line 569 at r15 (raw file):

	var needFinish bool
	// For pausable portal, the instrumentation helper needs to be setup only when

nit: s/to be setup/to be set up/g.


pkg/sql/conn_executor_exec.go line 586 at r15 (raw file):

	// ih should never have the same address, otherwise changing the former will
	// change the latter, and we will never be able to persist it.
	if portal != nil && portal.isPausable() {

nit: could use isPausablePortal helper here.


pkg/sql/conn_executor_exec.go line 598 at r15 (raw file):

		sql := stmt.SQL
		defer func() {
			processCleanupFunc(portal, "finish ih", func() {

nit: maybe expand ih to instrumentationHelper?


pkg/sql/conn_executor_exec.go line 615 at r15 (raw file):

	}

	if portal != nil && portal.isPausable() && !portal.pauseInfo.execStmtCleanup.isComplete {

nit: could use isPausablePortal helper here.


pkg/sql/conn_executor_exec.go line 619 at r15 (raw file):

	}

	if ex.sessionData().TransactionTimeout > 0 && !ex.implicitTxn() && ex.executorType != executorTypeInternal {

Do we need to think through how txn timeout and stmt timeout are applied? If we don't make any changes, then every time we resume the portal it gets full timeout "budget".

Perhaps this is good (or good enough for now). We might want to add some simple tests for this behavior - like use a query PREPARE p AS SELECT i, pg_sleep(1) FROM generate_series(1, 3) AS g(i); and a stmt timeout of 2 seconds - with the current implementation, if we execute this prepared statement three times with MaxRows: 1, then this portal should succeed. It might be worth checking what postgres does in such a scenario.


pkg/sql/conn_executor_exec.go line 682 at r15 (raw file):

		}
		if canAutoCommit && !isExtendedProtocol {
			retEv, retPayload = ex.handleAutoCommit(ctx, ast)

nit: consider adding a quick comment that we don't need to defer this for pausable portals because they can happen only in the extended protocol.


pkg/sql/conn_executor_exec.go line 779 at r15 (raw file):

	// don't return any event unless an error happens.

	if err := ex.handleAOST(ctx, ast); err != nil {

I feel like this either should only be done on the first execution of the portal or skipped completely (we do call handleAOST in connExecutor.prepare).


pkg/sql/conn_executor_exec.go line 1238 at r15 (raw file):

	}

	ex.extraTxnState.prepStmtsNamespace.closeAllPortals(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc)

Does postgres also close all portals on COMMIT or ROLLBACK? I didn't find the answer in the docs.


pkg/sql/conn_executor_exec.go line 1351 at r15 (raw file):

// expected that the caller will inspect res and react to query errors by
// producing an appropriate state machine event.
func (ex *connExecutor) dispatchToExecutionEngine(

I think this function will also need to be made into an additional stage of the cleanup stack. There are some steps in here that we want to perform only once (e.g. makeExecPlan) in the beginning and in the defer (e.g. recordStatementSummary and planner.curPlan.close). However, there are other steps that we do need to perform on every execution (e.g. initStatementResult), so we cannot lift the call to Flow.Resume above this method.


pkg/sql/conn_executor_exec.go line 1356 at r15 (raw file):

	stmt := planner.stmt
	ex.sessionTracing.TracePlanStart(ctx, stmt.AST.StatementTag())
	ex.statsCollector.PhaseTimes().SetSessionPhaseTime(sessionphase.PlannerStartLogicalPlan, timeutil.Now())

Currently, the phase timing will be messed up for the paused portals. It seems ok for the preview, so for now I'd leave a TODO and keep all phase-time-related things done every time.


pkg/sql/conn_executor_exec.go line 1523 at r15 (raw file):

	ex.extraTxnState.rowsWritten += stats.rowsWritten

	populateQueryLevelStatsAndRegions(ctx, planner, ex.server.cfg, &stats, &ex.cpuStatsCollector)

We'll need to be careful to aggregate query level stats across all executions.


pkg/sql/conn_executor_exec.go line 1601 at r15 (raw file):

	// Get the query-level stats.
	var flowsMetadata []*execstats.FlowsMetadata
	for _, flowInfo := range p.curPlan.distSQLFlowInfos {

For this to work correctly I think we'll need to preserve curPlan from the first execution and then update the planner with it on every execution.


pkg/sql/conn_executor_exec.go line 1437 at r16 (raw file):

	distSQLMode := ex.sessionData().DistSQLMode
	// We only allow non-distributed plan for pausable portals.
	if planner.portal != nil {

Shouldn't we also check that this portal is pausable? I.e. it currently seems like we disable DistSQL even for non-pausable ones.


pkg/sql/conn_executor_exec.go line 1927 at r16 (raw file):

				return factoryEvalCtx
			}
			// We don't sub / post queries for pausable portal. Set it back to an

nit: s/don't/don't allow/g.


pkg/sql/conn_executor_prepare.go line 499 at r13 (raw file):

) error {
	if _, ok := ex.extraTxnState.prepStmtsNamespace.portals[portalName]; ok {
		panic(errors.AssertionFailedf("portal already exists: %q", portalName))

Why are there no longer errors?


pkg/sql/conn_io.go line 273 at r16 (raw file):

	internalArgs []tree.Datum

	// isInternal is set ture when the bind stmt is from an internal executor.

nit: s/ture/to true/g.


pkg/sql/conn_io.go line 976 at r16 (raw file):

// UnsetForPausablePortal is part of the sql.RestrictedCommandResult interface.
func (r *streamingCommandResult) UnsetForPausablePortal() {

nit: maybe return an assertion failedf error for streamingCommandResult and commandResult when this method is called?


pkg/sql/distsql_running.go line 1584 at r15 (raw file):

		}
	}()
	defer planner.curPlan.close(ctx)

This should be deferred for pausable portals - we should be reusing the same plan as I mentioned above.


pkg/sql/distsql_running.go line 1609 at r15 (raw file):

	func() {
		finishedSetupFn, cleanup := getFinishedSetupFn(planner)
		defer cleanup()

This needs to be deferred.


pkg/sql/distsql_running.go line 1614 at r15 (raw file):

		)
	}()
	checkFlowForPortal := func() error {

nit: this function is called in a single place, so let's just inline it there.


pkg/sql/distsql_running.go line 1616 at r15 (raw file):

	checkFlowForPortal := func() error {
		if planCtx.getPortalPauseInfo().flow == nil {
			return errors.Newf("flow for portal %s cannot be found", planner.portal.Name)

nit: let's make this assertion failedf error.


pkg/sql/distsql_running.go line 1622 at r15 (raw file):

	if p := planCtx.getPortalPauseInfo(); p != nil {
		if checkErr := checkFlowForPortal(); checkErr != nil {

IIUC this is just a sanity check that we stored the flow. WDYT about hiding this check behind buildutil.CrdbTestBuild flag?


pkg/sql/distsql_running.go line 1626 at r15 (raw file):

				recv.commErr = errors.CombineErrors(recv.commErr, checkErr)
			} else {
				panic(checkErr)

nit: let's not panic here - just return the error.


pkg/sql/distsql_running.go line 1632 at r15 (raw file):

			p.flow.Cleanup(ctx)
		} else if !p.flowCleanup.isComplete {
			flow := planCtx.getPortalPauseInfo().flow

nit: can use p here.


pkg/sql/distsql_running.go line 1635 at r15 (raw file):

			p.flowCleanup.appendFunc(namedFunc{
				fName: "cleanup flow", f: func() {
					flow.GetFlowCtx().Mon.RelinquishAllOnReleaseBytes()

Why do we need to do RelinquishAllOnReleaseBytes here? This is surprising to me.


pkg/sql/distsql_running.go line 844 at r17 (raw file):

		)
		if m != nil {
			m.flow = flow

We should add a check here that this flow runs in a single goroutine. This will require some adjustments to the flowinfra, but it can wait, so perhaps just add something like this:

// TODO(yuzefovich): add a check that this flow runs in a single goroutine.

pkg/sql/prepared_stmt.go line 164 at r3 (raw file):

Previously, ZhouXing19 (Jane Xing) wrote…

Oh this still requires non-internal bind stmt, I put it in the later commit sql: add restrictions for pausable portals: 4197e9b#diff-7c5a26942e53c921a48fb4a83f4482df389c106e396179c1ef70eda12bd2a72fL165

I don't think we want to enable this feature for internal queries because 1. interleaving portals is not a common usage pattern for internal executors; 2. running all internal read-only queries with a local plan may bring significant performance degradation.

Ack, I agree that we should only allow this feature for non-internal queries.


pkg/sql/prepared_stmt.go line 266 at r3 (raw file):
Alright, now that I read through the whole PR, your solution seems reasonable.

For the second case, my current implementation is: when an error happens in a layer, run cleanup for that layer and its children layers, propagate the error to the parent layer, and let the parent clean itself up. The reason is to preserve the original cleanup order as much as possible. (But I'm unsure about the necessity of this.)

I agree, this is the interesting case for the cleanup. My intuition would be to defer the cleanup of all layers to a single place (assuming that "parent" layers also defer their cleanups). However, striving to simulate the current cleanup model as closely as possible seems warranted, so your approach seems good. If you do come up with a more ergonomic way to do the cleanup stack, that'd be a nice-to-have but doesn't seem required.


pkg/sql/prepared_stmt.go line 164 at r14 (raw file):

		// TODO(janexing): maybe we should also add telemetry for the stmt that the
		// portal hooks on.
		telemetry.Inc(sqltelemetry.MultipleActivePortalCounter)

The comment on the counter says

// MultipleActivePortalCounter is to be incremented every time the cluster setting
// sql.defaults.multiple_active_portals.enabled is set true.

but that's not really what is being counted right now. Right now we count the number of portals created (for all stmt types) when the setting is enabled. If we want to count the number of times the setting is set to true, we need to define an "on change" callback via enableMultipleActivePortals.SetOnChange(...).
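
For reference, a fragment sketch of that suggestion (assuming the usual on-change callback shape of cluster settings in this codebase, and that st is the cluster.Settings in scope; not the PR's actual code):

// Count how many times the setting is flipped to true, rather than counting
// portals created while the setting is enabled.
enableMultipleActivePortals.SetOnChange(&st.SV, func(ctx context.Context) {
  if enableMultipleActivePortals.Get(&st.SV) {
    telemetry.Inc(sqltelemetry.MultipleActivePortalCounter)
  }
})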


pkg/sql/prepared_stmt.go line 237 at r14 (raw file):

}

// portalPauseInfo stored info that enables the pause of a portal. After pausing

nit: s/stored/stores/.


pkg/sql/execinfra/base.go line 205 at r2 (raw file):

Previously, ZhouXing19 (Jane Xing) wrote…

Nice, thanks!

I also noticed that rowexec.emitHelper() also checks the consumer's status. Do we need to add a case for SwitchToAnotherPortal there as well? It seems to be mainly used by samplerProcessor, and I'm curious in what scenarios we use it.

Yeah, it would probably be good to add an explicit case to emitHelper too where we push an assertion failure error. Same for processProducerMessage in flowinfra/inbound.go.


pkg/sql/rowflow/row_based_flow.go line 78 at r9 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Unfortunately, this won't just work for the row-based flows. Here we also need to explicitly update one of the processors (like we did with the FlowCoordinator in the vectorized flow). I think we can just assume FlowBase.processors also has exactly 1 processor, and we need to update it with the new output.

This will likely be more annoying to do than in the vectorized flow case though. Here any type of processor can be the "root", and the simplest way to achieve what we need (at least for most cases) would be to add a new method to execinfra.Processor interface (like UpdateOutput) and then make ProcessorBaseNoHelper implement it while making other implementations of execinfra.Processor interface (like backfiller and csvWriter) simply return an unimplemented error.

Probably rather than introducing UpdateOutput method, we actually should introduce Resume method that would take in the updated output and would call execinfra.Run method with it. Crucially, ProcessorBaseNoHelper.Run currently calls pb.self.Start - which is expected to be called only once - so we should not call Run multiple times.

This would look something like this:

--- a/pkg/sql/execinfra/processorsbase.go
+++ b/pkg/sql/execinfra/processorsbase.go
@@ -733,6 +733,12 @@ func (pb *ProcessorBaseNoHelper) Run(ctx context.Context) {
        Run(pb.ctx, pb.self, pb.Output)
 }
 
+// Resume is part of the Processor interface.
+func (pb *ProcessorBaseNoHelper) Resume(output RowReceiver) {
+       // Use the same context that was passed on the Run call.
+       Run(pb.ctx, pb.self, output)
+}
+

Another idea that might clean things up is to not store the output RowReceiver in the processor at all, and instead pass it as argument to execinfra.Processor.Run method. I think it would be a nice cleanup that should be done in a separate commit. This way we will be able to remove "update output on flow coordinator" business entirely, and here for the row-based flows too. I'll actually type up the commit to do so myself, will send a patch shortly.

Ok, #98651 is now merged, so I think it should be easier to implement the suggestions in my comment above.


pkg/sql/sem/tree/stmt.go line 132 at r2 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

The current code looks good to me, but I don't really have a good intuition for this. It might be worth asking in #sql slack channel for ideas.

I was talking to Raphael earlier today, and he mentioned that we might have some builtins that modify the state of the database. For example, crdb_internal.request_statement_bundle will insert a row into a system table. This observation indicates that it's quite difficult to come up with a reliable way to check that a stmt is truly "read-only". However, given that the feature will be in preview the current check might be sufficient.

BTW why do we choose to only allow read-only queries? Is it because with writes we wouldn't be able to commit the writes until the portal is closed, increasing the contention?


pkg/sql/pgwire/testdata/pgtest/multiple_active_portals line 5 at r17 (raw file):

----

# TODO(janexing): is there a way to ignore the notice for cluster setting?

I think you can do until crdb_only ignore=NoticeResponse.


pkg/sql/pgwire/testdata/pgtest/multiple_active_portals line 370 at r17 (raw file):

----

# multiple acitve portal

nit: s/acitve/active, also this comment seems incomplete.


pkg/sql/pgwire/testdata/pgtest/multiple_active_portals line 371 at r17 (raw file):


# multiple acitve portal
until keepErrMessage

Let's also add a couple of simple queries that are not supported (like a schema change and a write query).


pkg/sql/pgwire/testdata/pgtest/multiple_active_portals line 386 at r17 (raw file):

{"Type":"DataRow","Values":[{"text":"2"}]}
{"Type":"PortalSuspended"}
{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: multiple active portals not supported, please set sql.defaults.multiple_active_portals.enabled to true. (Note this feature is in preview)"}

This error message is misleading - the cluster setting is set to true, but we refused to execute this query because it has a subquery. We should improve things here.

craig bot pushed a commit that referenced this pull request Mar 20, 2023
98353: kvserver: make some cluster settings system only r=andrewbaptist a=kvoli

Update cluster settings in the `kv/kvserver` pkg to be `SystemOnly`.
Previously, there were many cluster settings which were
`TenantWritable` or `TenantReadOnly`. These settings, even if altered
by a tenant, have no effect.

There are settings which are not updated, due to tests relying on
modifying the setting value using a non-system tenant. We ignore these
in this commit and defer to #98723 for handling them.

These settings are updated to be `SystemOnly`:

```
kv.bulk_io_write.max_rate
kv.bulk_sst.max_allowed_overage
kv.bulk_sst.target_size
kv.closed_timestamp.follower_reads_enabled
kv.log_range_and_node_events.enabled
kv.range_split.by_load_enabled
kv.range_split.load_cpu_threshold
kv.range_split.load_qps_threshold
kv.replica_stats.addsst_request_size_factor
kv.replication_reports.interval
server.shutdown.lease_transfer_wait
```

Resolves: #98347

Release note (ops change): Some KV server cluster settings are now system
only. These settings could previously be written or read by tenants;
however, writing to these settings had no effect.

99037: kvserver: skip `TestStoreRangeSplitRaceUninitializedRHS` under race/deadlock r=erikgrinaker a=erikgrinaker

The Raft groups are unable to maintain quorum when stressed under race/deadlock.

Resolves #98840.

Epic: none
Release note: None

99052: sql: add `switchToAnotherPortal` signal for result consumer r=ZhouXing19 a=ZhouXing19

This PR is part of the effort to implement the multiple active portals. (Extracted from #96358)

---

### sql/settings: add sql.pgwire.multiple_active_portals.enabled cluster setting


This commit adds a non-public `sql.pgwire.multiple_active_portals.enabled`
setting. This setting is only for a PREVIEWABLE feature.
With it set to true, all non-internal portals with read-only queries without sub/post
queries can be paused and resumed in an interleaving manner, but are executed with
a local plan.

---
### sql: add switchToAnotherPortal signal for result consumer

Previously, after pushing the result to the consumer, we only accepted the
following actions as the next step:
1. Pushing more data from the same source to the same consumer;
2. Draining or closing the pipeline.

This commit adds another option: pause the current execution, and switch to
the execution of another portal. I.e. when we receive an `ExecPortal` cmd, but
for a different portal, we do nothing and return control to the connExecutor.
This allows us to execute different portals in an interleaved manner.

Epic: CRDB-17622

Release note (sql change): add a non-public `sql.pgwire.multiple_active_portals.enabled` setting. This setting is only for a PREVIEWABLE feature. With it set to true, all non-internal portals with read-only queries without sub/post queries can be paused and resumed in an interleaving manner, but are executed with a local plan.


99062: sql: deflake `TestTrace` r=yuzefovich a=erikgrinaker

This has been seen to flake in CI:

```
=== RUN   TestTrace/ShowTraceForVectorized/TracingOff/node-1
    trace_test.go:386: expected span: "session recording", got: "pendingLeaseRequest: requesting lease"
    trace_test.go:397: remaining span: "session recording"
    trace_test.go:397: remaining span: "sql query"
    trace_test.go:397: remaining span: "sql txn"
    trace_test.go:397: remaining span: "txn coordinator send"
            --- FAIL: TestTrace/ShowTraceForVectorized/TracingOff/node-1 (0.02s)
```

There was already an exception for this span, but with a `storage.` prefix. This patch removes the prefix, and makes it match on substrings.

This flake has possibly been made worse with the introduction of a metamorphic setting to only use expiration-based leases in ecc931b.

Resolves #98971.

Epic: none
Release note: None

Co-authored-by: Austen McClernon <austen@cockroachlabs.com>
Co-authored-by: Erik Grinaker <grinaker@cockroachlabs.com>
Co-authored-by: Jane Xing <zhouxing@uchicago.edu>
Collaborator Author

@ZhouXing19 ZhouXing19 left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @rafiss and @yuzefovich)


pkg/sql/conn_executor_exec.go line 619 at r15 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

I agree that this can wait, and an issue to track this sounds good.

Made #99140 and added it to the test.


pkg/sql/conn_executor_exec.go line 1351 at r15 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

We shouldn't need optPlanningCtx after makeOptimizerPlan returns (which is done on the first iteration), so I don't think we need to reset planner.optPlanningCtx to that state on each re-execution.

I do see that we currently use planner.optPlanningCtx in instrumentationHelper.SetIndexRecommendations, which is called after the query execution has completed. I think we can call that sooner, so consider applying this diff

--- a/pkg/sql/conn_executor_exec.go
+++ b/pkg/sql/conn_executor_exec.go
@@ -1400,17 +1400,6 @@ func (ex *connExecutor) dispatchToExecutionEngine(
 
        populateQueryLevelStatsAndRegions(ctx, planner, ex.server.cfg, &stats, &ex.cpuStatsCollector)
 
-       // The transaction (from planner.txn) may already have been committed at this point,
-       // due to one-phase commit optimization or an error. Since we use that transaction
-       // on the optimizer, check if is still open before generating index recommendations.
-       if planner.txn.IsOpen() {
-               // Set index recommendations, so it can be saved on statement statistics.
-               // TODO(yuzefovich): figure out whether we want to set isInternalPlanner
-               // to true for the internal executors.
-               isInternal := ex.executorType == executorTypeInternal || planner.isInternalPlanner
-               planner.instrumentation.SetIndexRecommendations(ctx, ex.server.idxRecommendationsCache, planner, isInternal)
-       }
-
        // Record the statement summary. This also closes the plan if the
        // plan has not been closed earlier.
        stmtFingerprintID = ex.recordStatementSummary(
@@ -1709,6 +1698,12 @@ func (ex *connExecutor) makeExecPlan(ctx context.Context, planner *planner) erro
                ex.extraTxnState.numDDL++
        }
 
+       // Set index recommendations, so it can be saved on statement statistics.
+       // TODO(yuzefovich): figure out whether we want to set isInternalPlanner
+       // to true for the internal executors.
+       isInternal := ex.executorType == executorTypeInternal || planner.isInternalPlanner
+       planner.instrumentation.SetIndexRecommendations(ctx, ex.server.idxRecommendationsCache, planner, isInternal)
+
        return nil
 }
 

This moves the call to SetIndexRecommendations right after the planning is performed (which seems more appropriate than after the execution is done). This also allows us to remove the planner.txn.IsOpen() check, since the txn should always be open before the execution, and it leaves fewer things to think through for the cleanup of the dispatchToExecutionEngine method.

Nice, thanks for explaining!
Will rebase after #99081 is merged.


pkg/sql/conn_executor_exec.go line 283 at r29 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: maybe rather than defining this as a function, just store it as a boolean.

Done.


pkg/sql/conn_executor_exec.go line 289 at r29 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: do we need to pass portal as the argument?

Removed


pkg/sql/conn_executor_exec.go line 621 at r29 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

+1 to this concern, my intuition says that ihWrapper.ih = *ih should work, but I'm guessing that Jane tried it and it didn't work - it'd be good to understand why it didn't.

Sorry for the confusion -- ihWrapper.ih = *ih should work, and I have changed it back. (Previously when using the debugger I mistakenly thought that sp was not successfully copied.)


pkg/sql/conn_executor_exec.go line 632 at r29 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: does this need to be value instrumentationHelper rather than a pointer to it?

Hm, ihToFinish is a value -- ih is a pointer.


pkg/sql/conn_executor_exec.go line 1368 at r29 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: probably would be good to do this for prepStmtsNamespaceAtTxnRewindPos (and in commitSQLTransactionInternal above) for completeness sake (although likely it doesn't matter).

Hm, since this function (resetExtraTxnState()) is called when we are transitioning the txn to txnRollback or txnRestart, I'm not sure if we would like to cancel prepStmtsNamespaceAtTxnRewindPos as well. For txnRollback that seems fine, but if it's txnRestart, we're about to "auto-retry a txn" and I think we will need to use prepStmtsNamespaceAtTxnRewindPos to reset prepStmtsNamespace. (I actually tried having prepStmtsNamespaceAtTxnRewindPos.closeAllPortals() here and it caused lots of portal not found errors in the CI).


pkg/sql/conn_executor_exec.go line 1413 at r29 (raw file):

Previously, rafiss (Rafi Shamim) wrote…

could the TODO have more details that says what is wrong right now?

This is suggested by @yuzefovich. IIUC, we would like to have the phase time marked only during the first execution or when we cleanup the portal (e.g. I think sessionphase.PlannerStartLogicalPlan should be bound to the query, rather than each execution). But just wanted to confirm if I understood it correctly.


pkg/sql/conn_executor_exec.go line 1457 at r29 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

I have some minor improvements to curPlan.close logic in #98823, we'll need to rebase on top of that PR but it should make things a bit cleaner.

Yup will rebase from that PR.


pkg/sql/conn_executor_prepare.go line 496 at r30 (raw file):

Previously, rafiss (Rafi Shamim) wrote…

is this needed? why not check ex.executorType instead?

Done.


pkg/sql/distsql_running.go line 1609 at r15 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

No worries! This was merged very recently.

Done.


pkg/sql/prepared_stmt.go line 127 at r26 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Is this the correct cluster setting? Shouldn't it be sql.defaults.multiple_active_portals.enabled?

Done.


pkg/sql/prepared_stmt.go line 187 at r28 (raw file):
Thanks for the suggestions! I moved this counter as Yahor suggested.

What Rafi is suggesting is also a valid thing to count

I added a new MultipleActivePortalStmtCounter that is incremented before DistSQLPlanner.PlanAndRunAll(). I put it there because it has to be after we check if the query has sub/post queries.


pkg/sql/prepared_stmt.go line 230 at r28 (raw file):

Previously, rafiss (Rafi Shamim) wrote…

nit: this cleanupFuncStack could go into its own package, like pkg/sql/cleanupstack, but no need to do that right now.

Yup, will do it in a separate PR.


pkg/sql/prepared_stmt.go line 177 at r30 (raw file):

Previously, rafiss (Rafi Shamim) wrote…

why not check ex.executorType instead?

Done.


pkg/sql/flowinfra/flow.go line 123 at r27 (raw file):
I now store the ctx in execinfra.FlowCtx, but I'm not sure if this is correct -- I saw this comment in execinfra/processorsbase.go:

ctx and span contain the tracing state while the processor is active (i.e. hasn't been closed). Initialized using flowCtx.Ctx (which should not be otherwise used).

This seems to imply that there was a flowCtx.Ctx field, but now it's removed. If so, maybe I should directly have a ctx field in FlowBase and vectorizedFlow?

Also, this may be a dumb q, but why do we want to retain such a flow-based ctx, rather than using the execution-based ctx? More generally, how should I understand how a ctx works? I only have a vague impression that ctx is used for tracing and "canceling actions" in a cascading fashion.


pkg/sql/flowinfra/flow.go line 500 at r27 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

I think we want to use the new receiver here.

Done.


pkg/sql/pgwire/testdata/pgtest/multiple_active_portals line 232 at r31 (raw file):

Previously, rafiss (Rafi Shamim) wrote…

can you file the issue for that and put it in this comment?

Done.


pkg/sql/pgwire/testdata/pgtest/multiple_active_portals line 605 at r31 (raw file):

Previously, rafiss (Rafi Shamim) wrote…

allow what?

Added more lines.

@ZhouXing19 ZhouXing19 force-pushed the multiple-active-portal-0124 branch 2 times, most recently from df60d26 to 092ba41 Compare March 21, 2023 21:38
Member

@yuzefovich yuzefovich left a comment

Alright, first three commits look good to me, so let's extract them into a separate PR (many revisions on this PR make it harder to review).

Reviewed 2 of 3 files at r24, 7 of 9 files at r36, 27 of 27 files at r37, 12 of 12 files at r38, 6 of 6 files at r39.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @rafiss and @ZhouXing19)


pkg/sql/distsql_running.go line 839 at r39 (raw file):

	var flow flowinfra.Flow
	var err error
	if m := planCtx.getPortalPauseInfo(); m != nil && m.flow != nil {

nit: what does m stand for here 😃? Perhaps use p (for "portal") or i (for "info").


pkg/sql/planner.go line 205 at r39 (raw file):

	StmtNoConstantsWithHomeRegionEnforced string

	// portal is set when the query is from a portal.

nit: s/portal is/pausablePortal is/g and s/from a portal/from a pausable portal/g.


pkg/sql/planner.go line 268 at r39 (raw file):

// hasFlowForPausablePortal returns true if the planner is for re-executing a
// portal. We reuse the flow stored in p.portal.pauseInfo.

nit: the second sentence is now out of date.


pkg/sql/flowinfra/flow.go line 123 at r27 (raw file):

This seems to imply that there was a flowCtx.Ctx field, but now it's removed.

Yes, we used to have FlowContext.Ctx and removed it because it was confusing, so let's not introduce it back here.

If so, maybe I should directly have ctx field in FlowBase and vectorizedFlow?

Yes, this sounds good - let's add an unexported resumeCtx context.Context field to FlowBase and vectorizedFlow objects, with a comment that says that this resumeCtx is only captured for use inside of Flow.Resume implementations.

Also, this may be a dumb q, but why do we want to retain such a flow-based ctx, rather than using the execution-based ctx? More generally, how to understand a ctx work? I only have a vague impression that ctx is used for tracing and "canceling actions" in a cascading fashion.

Not a dumb question at all! The contract of the execinfra.RowSource interface (which most Processors implement) is that the context that is passed on RowSource.Start must be used throughout the lifetime of the row source. This is done precisely so that it is easy to cancel all RowSources, possibly running concurrently, by canceling that particular context.

The way I think about Processors is that their whole lifetime is spent performing a single operation (e.g. rowexec.tableReader's goal is to read a certain number of rows from some table), so it makes sense that we would use a single context.Context object to serve the corresponding request ("request" in this context is "read some rows"). Thus, we make the processors capture the context they are given on their initialization.

The initialization must only be done once (that's why I was pushing on having separate Flow.Resume and Processor.Resume methods - to avoid multiple calls to Start), so if we want to cancel the execution of the Processor.Resume method, we must cancel the context that we used in Start. That's why I'm suggesting not taking a context.Context argument in the signature - this way the interface should be more intuitive, at the cost of making the implementations a bit more complicated (by forcing FlowBase and vectorizedFlow to store the context that was passed in Run).
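
To make that contract concrete, here is a minimal, self-contained sketch (simplified stand-in types, not the actual flowinfra/execinfra code) of a flow that captures the context passed to Run and reuses it in Resume, so cancelling the original context also cancels any later resumption:

```
package main

import (
	"context"
	"fmt"
)

// RowReceiver and Flow are simplified stand-ins for the real execinfra and
// flowinfra interfaces.
type RowReceiver interface {
	Push(row string)
}

type Flow interface {
	// Run starts the flow; the flow captures ctx for its whole lifetime.
	Run(ctx context.Context, output RowReceiver)
	// Resume continues a paused flow with a new output receiver. It takes no
	// context: it reuses the one captured in Run, so cancelling that context
	// also cancels any later resumption.
	Resume(output RowReceiver)
}

type flowBase struct {
	resumeCtx context.Context // captured in Run, only used by Resume
	rows      []string
	pos       int
}

var _ Flow = (*flowBase)(nil)

func (f *flowBase) Run(ctx context.Context, output RowReceiver) {
	f.resumeCtx = ctx
	f.emit(ctx, output, 2) // emit up to 2 rows, then "pause"
}

func (f *flowBase) Resume(output RowReceiver) {
	f.emit(f.resumeCtx, output, 2)
}

func (f *flowBase) emit(ctx context.Context, output RowReceiver, limit int) {
	for n := 0; n < limit && f.pos < len(f.rows); n++ {
		if ctx.Err() != nil {
			return // the query was cancelled via the captured context
		}
		output.Push(f.rows[f.pos])
		f.pos++
	}
}

type printReceiver struct{ name string }

func (r *printReceiver) Push(row string) { fmt.Println(r.name, row) }

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	f := &flowBase{rows: []string{"a", "b", "c", "d"}}
	f.Run(ctx, &printReceiver{name: "first execution:"})
	// Later, the portal is resumed with a different result writer.
	f.Resume(&printReceiver{name: "resumed:"})
}
```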

Member

@yuzefovich yuzefovich left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @rafiss and @ZhouXing19)


pkg/sql/conn_executor.go line 3509 at r40 (raw file):

	case txnRollback:
		ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent)
		ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.closeAllPortals(

Why did this change? It'd be good to leave a comment in the code for why this difference exists.


pkg/sql/conn_executor_exec.go line 632 at r29 (raw file):

Previously, ZhouXing19 (Jane Xing) wrote…

Hm, ihToFinish is a value -- ih is a pointer.

We could make ihToFinish to be a pointer, right? Effectively, we'd have something like

ihToFinish := ih
if isPausablePortal {
  ihToFinish = &portal.pauseInfo.ihWrapper.ih
}
...

pkg/sql/conn_executor_exec.go line 1368 at r29 (raw file):

Previously, ZhouXing19 (Jane Xing) wrote…

Hm, since this function (resetExtraTxnState()) is called when we are transitioning the txn to txnRollback or txnRestart, I'm not sure if we would like to cancel prepStmtsNamespaceAtTxnRewindPos as well. For txnRollback that seems fine, but if it's txnRestart, we're about to "auto-retry a txn" and I think we will need to use prepStmtsNamespaceAtTxnRewindPos to reset prepStmtsNamespace. (I actually tried having prepStmtsNamespaceAtTxnRewindPos.closeAllPortals() here and it caused lots of portal not found errors in the CI).

Hm, ok, I can't say that I know what I'm talking about here, so whatever makes CI happy sounds good to me.


pkg/sql/conn_executor_exec.go line 1413 at r29 (raw file):

Previously, ZhouXing19 (Jane Xing) wrote…

This is suggested by @yuzefovich. IIUC, we would like to have the phase time marked only during the first execution or when we cleanup the portal (e.g. I think sessionphase.PlannerStartLogicalPlan should be bound to the query, rather than each execution). But just wanted to confirm if I understood it correctly.

We might need to add new "phases" to describe what we're seeing with pausable portals. For example, on each re-execution we'll update timing for PlannerStartExecStmt, effectively discarding the time spent on previous executions. For PlannerStartLogicalPlan it might easy to fix (like move that call into makeExecPlan), but it's probably not worth the effort for now.


pkg/sql/conn_executor_exec.go line 313 at r40 (raw file):

	var sp *tracing.Span
	if !isPausablePortal || !portal.pauseInfo.execStmtInOpenStateCleanup.isComplete {
		ctx, sp = tracing.EnsureChildSpan(ctx, ex.server.cfg.AmbientCtx.Tracer, "sql query")

I wonder whether we want to store all four contexts that we derive in execStmtInOpenState on the first execution and then overwrite ctx in scope to the appropriate one at each point (this will make it so that we ignore ctx argument of re-executions of execStmtInOpenState completely).

We derive the new contexts in four places:

  • for sql query tracing span. The span is already captured.
  • withStatement for sentry reporting
  • with cancellation function for cancelQuery. We already capture that function.
  • in instrumentationHelper.Setup. We already capture stuff about the helper.

It seems like we should store the "stack" of four contexts for each of those stages and update ctx in scope accordingly at each step. What do you think?
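
As a rough illustration of that idea (hypothetical names, not the real connExecutor code), the pause info could remember the contexts derived on the first execution and hand back the innermost one on re-executions:

```
package main

import (
	"context"
	"fmt"
)

type ctxKey string

// portalContexts is a hypothetical holder for the contexts derived during the
// first execution of execStmtInOpenState, in the order they are derived.
type portalContexts struct {
	spanCtx       context.Context // with the "sql query" tracing span
	withStmtCtx   context.Context // with the statement attached for sentry reporting
	cancelCtx     context.Context // with the cancellation function for cancelQuery
	cancelQuery   context.CancelFunc
	ihSetupCtx    context.Context // returned by instrumentationHelper.Setup
	firstExecDone bool
}

// contextForExecution derives (and records) the contexts on the first
// execution; on re-executions it simply returns the innermost context that
// was captured earlier, ignoring the ctx argument.
func (pc *portalContexts) contextForExecution(ctx context.Context) context.Context {
	if pc.firstExecDone {
		return pc.ihSetupCtx
	}
	// Stand-ins for the real derivations (tracing span, withStatement, etc.).
	pc.spanCtx = context.WithValue(ctx, ctxKey("span"), "sql query span")
	pc.withStmtCtx = context.WithValue(pc.spanCtx, ctxKey("stmt"), "SELECT ...")
	pc.cancelCtx, pc.cancelQuery = context.WithCancel(pc.withStmtCtx)
	pc.ihSetupCtx = context.WithValue(pc.cancelCtx, ctxKey("ih"), "instrumentation")
	pc.firstExecDone = true
	return pc.ihSetupCtx
}

func main() {
	var pc portalContexts
	base := context.Background()
	first := pc.contextForExecution(base)  // first execution: derives all the contexts
	second := pc.contextForExecution(base) // re-execution: reuses them
	fmt.Println(first == second)           // true
	pc.cancelQuery()                       // would run when the portal is closed
}
```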


pkg/sql/conn_executor_exec.go line 374 at r40 (raw file):

	var txnDoneAfterFunc chan struct{}

	ctx, cancelQuery := contextutil.WithCancel(ctx)

nit: this will result in deriving separate context and cancellation function on every execution even though these things won't actually be used. How about doing something like this

var cancelQuery context.CancelFunc

addActiveQuery := func() {
	ctx, cancelQuery = contextutil.WithCancel(ctx)
	ex.incrementStartedStmtCounter(ast)
	func(st *txnState) {
		st.mu.Lock()
		defer st.mu.Unlock()
		st.mu.stmtCount++
	}(&ex.state)
	ex.addActiveQuery(parserStmt, pinfo, queryID, cancelQuery)
}

...

// store ctx and cancelQuery in pauseInfo

Since addActiveQuery is called only once, we'll derive the cancellation function only once too.


pkg/sql/conn_executor_exec.go line 503 at r40 (raw file):

	stmtTS := ex.server.cfg.Clock.PhysicalTime()
	ex.statsCollector.Reset(ex.applicationStats, ex.phaseTimes)
	ex.resetPlanner(ctx, p, ex.state.mu.txn, stmtTS)

I'm concerned about this part. For some of our execution components (namely planNodes) we capture planner in runParams. We are reusing the same planner object, embedded in connExecutor, for the same connection, resetting the planner in-between different portals. runParams.planner is a pointer, so it'll be mutated when the first execution of the pausable portal is stopped.

It's hard to know upfront what - if anything - will be broken (once the planner is reset) on the second and consecutive executions. The bullet-proof solution is to "restore" the state of the planner as if during the first execution. However, that might be annoying to implement. Perhaps simply doing ex.resetPlanner here is sufficient for most things. Perhaps we'll need to audit all fields in planner and restore only those that might be used during the execution.

As the first action I suggest you try some queries that use planNode infrastructure. In particular, it powers the virtual tables, so consider running some queries on pg_catalog and information_schema virtual tables. Also, it might be good to try some queries with WITH RECURSIVE CTEs (take a look at logictest/testdata/with file for inspiration). Maybe we should disallow WITH clauses initially.


pkg/sql/conn_executor_exec.go line 801 at r40 (raw file):

	// don't return any event unless an error happens.

	// For a portal (prepared stmt), since handleAOST() is called in when

nit: s/in when/when/g.


pkg/sql/conn_executor_exec.go line 858 at r40 (raw file):

		p.pausablePortal = portal
	}
	p.cancelChecker.Reset(ctx)

I think here we want the context for which we stored cancelQuery func. See my comment above about contexts.


pkg/sql/conn_executor_exec.go line 872 at r40 (raw file):

	var stmtThresholdSpan *tracing.Span
	alreadyRecording := ex.transitionCtx.sessionTracing.Enabled()
	stmtTraceThreshold := TraceStmtThreshold.Get(&ex.planner.execCfg.Settings.SV)

I think stmt threshold tracing is broken (i.e. each re-execution will get a fresh budget and we won't include details about the previous executions, only about the last one). This can wait, but it deserves an issue.


pkg/sql/conn_executor_exec.go line 885 at r40 (raw file):

	enforceHomeRegion := p.EnforceHomeRegion()
	_, isSelectStmt := stmt.AST.(*tree.Select)
	if enforceHomeRegion && ex.state.mu.txn.IsOpen() && isSelectStmt {

This might be broken for multiple active portals. It doesn't seem important, but it's probably worth filing an issue to track that it might be broken.

@ZhouXing19
Collaborator Author

pkg/sql/conn_executor_exec.go line 503 at r40 (raw file):

As the first action I suggest you try some queries that use planNode infrastructure. In particular, it powers the virtual tables, so consider running some queries on pg_catalog and information_schema virtual tables. Also, it might be good to try some queries with WITH RECURSIVE CTEs

Yeah it's buggy when vtables and WITH RECURSIVE are involved. I'm still investigating but would like to share the test result as well: https://gist.github.com/ZhouXing19/4e807de7308cbdc5d4a50a6ead8e8ef8

craig bot pushed a commit that referenced this pull request Mar 22, 2023
98671: streamingccl: use fingerprinting in more tests r=dt a=stevendanna

This uses fingerprinting in more tests to avoid starting the destination tenant before cutting over.

Epic: none

Release note: None

98914: sqlsmith: add DELETE FROM ... USING and UPDATE ... FROM support r=yuzefovich a=yuzefovich

This commit makes it so that the sqlsmith can now generate statements of the form `DELETE FROM ... USING` and `UPDATE ... FROM`. We toss a coin every time before deciding to add another table (meaning in 50% cases these forms are not used, in 25% we have 1 extra table, etc). It also adjusts the generation of the RETURNING clause for DELETE and UPDATE to be able to pick from any of the table sources.

Fixes: #98910.

Release note: None

98917: sql: add prepared_statements_cache_size setting r=yuzefovich a=michae2

Add a new circular doubly-linked list of prepared statements to `prepStmtNamespace` which tracks the least-recently-used prepared statement. When new setting `prepared_statements_cache_size` is set, use this LRU list to automatically deallocate prepared statements.

Fixes: #97866

Epic: None

Release note (sql change): Add a new `prepared_statements_cache_size` setting which, when set to a non-zero number of bytes, causes the least-recently-used prepared statements to be automatically deallocated when prepared statement memory usage goes above the cache size. This setting can be used to avoid prepared statement leaks from long-lived connections which never `DEALLOCATE` prepared statements.
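
For illustration, a minimal sketch of the LRU idea in #98917 above, using the standard container/list package instead of the hand-rolled circular list and with made-up type names:

```
package main

import (
	"container/list"
	"fmt"
)

// preparedStmt is a stand-in for a real prepared statement.
type preparedStmt struct {
	name string
	mem  int64 // approximate memory footprint in bytes
}

// lruCache deallocates the least-recently-used prepared statements once the
// total memory goes above maxBytes (the prepared_statements_cache_size idea).
type lruCache struct {
	maxBytes int64
	curBytes int64
	order    *list.List               // front = most recently used
	elems    map[string]*list.Element // name -> element in order
}

func newLRUCache(maxBytes int64) *lruCache {
	return &lruCache{maxBytes: maxBytes, order: list.New(), elems: map[string]*list.Element{}}
}

func (c *lruCache) add(ps *preparedStmt) {
	c.elems[ps.name] = c.order.PushFront(ps)
	c.curBytes += ps.mem
	// Evict least-recently-used statements until we're under the cap.
	for c.maxBytes > 0 && c.curBytes > c.maxBytes && c.order.Len() > 1 {
		back := c.order.Back()
		evicted := back.Value.(*preparedStmt)
		c.order.Remove(back)
		delete(c.elems, evicted.name)
		c.curBytes -= evicted.mem
		fmt.Println("deallocated", evicted.name)
	}
}

// touch marks a statement as recently used (e.g. on Bind/Execute).
func (c *lruCache) touch(name string) {
	if e, ok := c.elems[name]; ok {
		c.order.MoveToFront(e)
	}
}

func main() {
	c := newLRUCache(2000)
	c.add(&preparedStmt{name: "q1", mem: 800})
	c.add(&preparedStmt{name: "q2", mem: 800})
	c.touch("q1")                              // q1 is now more recently used than q2
	c.add(&preparedStmt{name: "q3", mem: 800}) // evicts q2, the LRU entry
}
```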

99155: sql/logictest: disable stats collection for system tables earlier r=rytaft a=rytaft

This commit updates the logictests to disable stats collection for system tables before the test cluster is started. This avoids a race condition where stats might be collected on system tables before collection is disabled with `SET CLUSTER SETTING`.

This should prevent flakes for tests that show `EXPLAIN` output for queries over system tables.

Fixes #99118

Release note: None

99173: sql: enable resumption of a flow for pausable portals r=yuzefovich a=ZhouXing19

This PR is part of the implementation of multiple active portals. (Extracted from #96358)

We now introduce a `Resume()` method for flow, and when a pausable portal is being re-executed, rather than generating a new flow, we resume the persisted flow to continue the previous execution.

---
### sql: add telemetry MultipleActivePortalCounter

This commit adds a telemetry counter `MultipleActivePortalCounter` that is
incremented each time the cluster setting
`sql.pgwire.multiple_active_portals.enabled` is set to true.

---
### sql: add Resume method for flowinfra.Flow and execinfra.Processor

For pausable portals, each execution needs to resume the processor with a new
output receiver. We don't need to restart the processors, and this `Resume()`
step can be called many times after `Run()` is called.

----
### sql: reuse flow for pausable portal
To execute portals in an interleaving manner, we need to persist the flow and
queryID so that we can _continue_ fetching the result when we come back to the same
portal.

We now introduce a `pauseInfo` field in `sql.PreparedPortal` that stores this
metadata. It's set during the portal's first execution, and all following
executions reuse the flow and the queryID. This also implies that these
resources should not be cleaned up at the end of each execution.
The implementation of the clean-up steps is included in the next commit.

Also, in this commit we hang a `*PreparedPortal` off the planner; how it is
set can be seen in the next commit as well.
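
As a rough sketch of the reuse described above (heavily simplified, hypothetical types; the real code branches on planCtx.getPortalPauseInfo() as shown in the review snippet earlier in this thread):

```
package main

import (
	"context"
	"fmt"
)

// flow is a simplified stand-in for flowinfra.Flow.
type flow struct{}

func (f *flow) Run(ctx context.Context) { fmt.Println("flow set up and started") }
func (f *flow) Resume()                 { fmt.Println("persisted flow resumed") }

// portalPauseInfo keeps what a pausable portal needs across executions.
type portalPauseInfo struct {
	queryID string
	flow    *flow
}

// execPortal either sets up a new flow (first execution) or resumes the
// persisted one (re-execution), instead of planning and starting from scratch.
func execPortal(ctx context.Context, p *portalPauseInfo) {
	if p.flow == nil {
		p.queryID = "query-1" // assigned once, reused by later executions
		p.flow = &flow{}
		p.flow.Run(ctx)
		return
	}
	p.flow.Resume()
}

func main() {
	ctx := context.Background()
	info := &portalPauseInfo{}
	execPortal(ctx, info) // first execution
	execPortal(ctx, info) // re-execution reuses info.flow and info.queryID
}
```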

Epic: CRDB-17622

Release note: None


Co-authored-by: Steven Danna <danna@cockroachlabs.com>
Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
Co-authored-by: Michael Erickson <michae2@cockroachlabs.com>
Co-authored-by: Rebecca Taft <becca@cockroachlabs.com>
Co-authored-by: Jane Xing <zhouxing@uchicago.edu>
@ZhouXing19
Collaborator Author

pkg/sql/conn_executor_exec.go line 503 at r40 (raw file):
I updated the tests, and it seems that, at least with the simple statements I had with pg_catalog and information_schema, the current implementation gives the correct results (though I hit the span used after finish error again).

The one with WITH RECURSIVE did fail, and I'm stuck on further debugging. The error is due to a nil pointer when resuming a portal. The first-time execution is fine. The debugger told me it is c.batch == nil here.

Perhaps we'll need to audit all fields in planner and restore only those that might be used during the execution.

I agree, the hard part is to determine what should be retained and what should be renewed ><

I'll move on to fix other comments first, and come back to this.

@ZhouXing19
Collaborator Author

pkg/sql/conn_executor_exec.go line 503 at r40 (raw file):

Previously, ZhouXing19 (Jane Xing) wrote…

I updated the tests, and it seems that, at least with the simple statements I had with pg_catalog and information_schema, the current implementation gives the correct results (though I hit span used after finish again).

The one with WITH RECURSIVE did fail, and I'm stuck on further debugging. The error is due to a nil pointer when resuming a portal. The first-time execution is fine. Debugger told me is c.batch == nil here.

Perhaps we'll need to audit all fields in planner and restore only those that might be used during the execution.

I agree, the hard part is to determine what should be retained and what should be renewed ><

I'll move on to fix other comments first, and come back to this.

I forgot to paste the link to the updated test: https://gist.github.com/ZhouXing19/4e807de7308cbdc5d4a50a6ead8e8ef8

Member

@yuzefovich yuzefovich left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @rafiss and @ZhouXing19)


pkg/sql/conn_executor_exec.go line 503 at r40 (raw file):

Previously, ZhouXing19 (Jane Xing) wrote…

I forgot to paste the link to the updated test: https://gist.github.com/ZhouXing19/4e807de7308cbdc5d4a50a6ead8e8ef8

I haven't had a chance to take a look at this PR today, but I set up some time tomorrow for us to pair on debugging this.

@ZhouXing19 ZhouXing19 force-pushed the multiple-active-portal-0124 branch 2 times, most recently from a5210b4 to 587c203 Compare March 23, 2023 17:57
Collaborator Author

@ZhouXing19 ZhouXing19 left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @rafiss and @yuzefovich)


pkg/sql/conn_executor.go line 3509 at r40 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Why did this change? It'd be good to leave a comment in the code for why this difference exists.

Done.


pkg/sql/conn_executor_exec.go line 632 at r29 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

We could make ihToFinish to be a pointer, right? Effectively, we'd have something like

ihToFinish := ih
if isPausablePortal {
  ihToFinish = &portal.pauseInfo.ihWrapper.ih
}
...

Done.


pkg/sql/conn_executor_exec.go line 1413 at r29 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

We might need to add new "phases" to describe what we're seeing with pausable portals. For example, on each re-execution we'll update timing for PlannerStartExecStmt, effectively discarding the time spent on previous executions. For PlannerStartLogicalPlan it might easy to fix (like move that call into makeExecPlan), but it's probably not worth the effort for now.

Thanks! I made issue #99410 to track this.


pkg/sql/conn_executor_exec.go line 1500 at r29 (raw file):

Previously, rafiss (Rafi Shamim) wrote…

could the TODO have more details that says what is wrong right now?

Added comment for #99410


pkg/sql/conn_executor_exec.go line 1530 at r29 (raw file):

Previously, rafiss (Rafi Shamim) wrote…

could the TODO have more details that says what is wrong right now?

Added comment for #99410


pkg/sql/conn_executor_exec.go line 1564 at r29 (raw file):

Previously, rafiss (Rafi Shamim) wrote…

i think this makes sense as-is; it just means the current execution is done, right?

This part has been removed by 4d835ec, so it doesn't seem to be a concern now.


pkg/sql/conn_executor_exec.go line 313 at r40 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

I wonder whether we want to store all four contexts that we derive in execStmtInOpenState on the first execution and then overwrite ctx in scope to the appropriate one at each point (this will make it so that we ignore ctx argument of re-executions of execStmtInOpenState completely).

We derive the new contexts in four places:

  • for sql query tracing span. The span is already captured.
  • withStatement for sentry reporting
  • with cancellation function for cancelQuery. We already capture that function.
  • in instrumentationHelper.Setup. We already capture stuff about the helper.

It seems like we should store the "stack" of four contexts for each of those stages and update ctx in scope accordingly at each step. What do you think?

That makes sense, I made the changes. I combined the first 2 as there are no other usages of ctx between them.


pkg/sql/conn_executor_exec.go line 374 at r40 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: this will result in deriving separate context and cancellation function on every execution even though these things won't actually be used. How about doing something like this

var cancelQuery context.CancelFunc

addActiveQuery := func() {
	ctx, cancelQuery = contextutil.WithCancel(ctx)
	ex.incrementStartedStmtCounter(ast)
	func(st *txnState) {
		st.mu.Lock()
		defer st.mu.Unlock()
		st.mu.stmtCount++
	}(&ex.state)
	ex.addActiveQuery(parserStmt, pinfo, queryID, cancelQuery)
}

...

// store ctx and cancelQuery in pauseInfo

Since addActiveQuery is called only once, then we'll derive the cancellation function only once too.

Done.


pkg/sql/conn_executor_exec.go line 858 at r40 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

I think here we want the context for which we stored cancelQuery func. See my comment above about contexts.

Originally here it's using the ctx from ih.Setup(). Not sure how


pkg/sql/conn_executor_exec.go line 872 at r40 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

I think stmt threshold tracing is broken (i.e. each re-execution will get a fresh budget and we won't include details about the previous executions, only about the last one). This can wait, but it deserves an issue.

Done: #99404


pkg/sql/conn_executor_exec.go line 885 at r40 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

This might be broken for multiple active portals. It doesn't seem important, but probably an issue to track that it might be broken is worth it.

Done: #99408

@ZhouXing19
Collaborator Author

pkg/sql/conn_executor_exec.go line 858 at r40 (raw file):

Previously, ZhouXing19 (Jane Xing) wrote…

Originally here it's using the ctx from ih.Setup(). Not sure how

(I made the changes accordingly. Sorry didn't complete the sentence) I wonder why previously we're not closing the ctx bound to cancelQuery (say cancelCtx) but the one from ih.Setup() (say ihCtx). Is it because we know that ihCtx is a child ctx of cancelCtx, so when the latter is cancelled, we know that ihCtx will be closed as well, so it's basically the same for a cancelChecker? But isn't this the same for a pausable portal as well?

Member

@yuzefovich yuzefovich left a comment

I think we're pretty close now. I think I responded to all your previous comments and I left some more. Perhaps it'll be good now to open up a separate PR to make it easier to review.

Reviewed 2 of 26 files at r43, 15 of 16 files at r45, 9 of 9 files at r46, 7 of 7 files at r47, 1 of 1 files at r48, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @rafiss and @ZhouXing19)


pkg/sql/conn_executor.go line 2201 at r46 (raw file):

			)
			if portal.pauseInfo != nil {
				portal.pauseInfo.curRes = stmtRes

nit: do we need to store it? I think we will always have the correct stmt res in scope.


pkg/sql/conn_executor_exec.go line 1523 at r15 (raw file):

Previously, ZhouXing19 (Jane Xing) wrote…

I feel a bit stuck when modifying this part. I think this should be added to the cleanup stack. However, when this gets called back, we should use the instrumentation helper of the original query.
I tried ppInfo.ihWrapper.ih.CopyTo(&planner.instrumentation) here, but got use span after finish again.
My gut feeling is that, as you may have mentioned, the observability of pausable portals are pretty messed up -- I think we may need more time to think about how to reuse span and instrumentation helpers.

As we did during pair programming, it would probably be good to do something like planner.instrumentation = ppInfo.ihWrapper.ih in the pausable portal case, and I think everything else should just work (i.e. it might not be necessary to make any changes to populateQueryLevelStatsAndRegions function).


pkg/sql/conn_executor_exec.go line 1413 at r29 (raw file):

Previously, ZhouXing19 (Jane Xing) wrote…

Thanks! I made issue #99410 to track this.

sessionTracing might also be a bit broken, maybe include it into #99410 or create a separate issue.


pkg/sql/conn_executor_exec.go line 503 at r40 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

I haven't had a chance to take a look at this PR today, but I set up some time tomorrow for us to pair on debugging this.

Ok, I'm less concerned about this now that we have figured out the bug with runPlanInsidePlan and confirmed that some basic queries using virtual tables work. I think we can leave this code as is given that the feature is in preview.

However, it's still an unknown to me what (if anything) will be broken due to resetting the planner between re-executions. Consider leaving a TODO to explore this.

One approach that might be more bullet-proof would be to embed the planner object into the pause info struct, and then use that planner instance whenever we're operating with the pausable portals. pauseInfo.planner would be modified ("initialized") on the first execution of the portal, but then we will not call resetPlanner on it. This way, with every re-execution we will let the resumed flow modify our planner as the flow needs, and then we'll keep the state of the planner unchanged until the portal is resumed again. This approach will also let us remove the ihWrapper I think.
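
A tiny sketch of that idea (hypothetical and heavily simplified; the real planner has far more state):

```
package main

import "fmt"

// planner is a stand-in for sql.planner; the real one has many more fields.
type planner struct {
	stmt      string
	execCount int
}

// portalPauseInfo embeds its own planner so that re-executions keep whatever
// state the resumed flow left behind, instead of sharing connExecutor's
// planner, which gets reset between statements.
type portalPauseInfo struct {
	planner *planner
}

func execPausablePortal(info *portalPauseInfo, stmt string) {
	if info.planner == nil {
		// First execution: initialize the portal's own planner once.
		info.planner = &planner{stmt: stmt}
	}
	// Re-executions keep using the same planner without resetting it.
	info.planner.execCount++
	fmt.Printf("executing %q (execution #%d)\n", info.planner.stmt, info.planner.execCount)
}

func main() {
	info := &portalPauseInfo{}
	execPausablePortal(info, "SELECT * FROM t")
	execPausablePortal(info, "SELECT * FROM t")
}
```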


pkg/sql/conn_executor_exec.go line 858 at r40 (raw file):

Previously, ZhouXing19 (Jane Xing) wrote…

(I made the changes accordingly. Sorry didn't complete the sentence) I wonder why previously we're not closing the ctx bound to cancelQuery (say cancalCtx) but the one from ih.Setup() (say ihCtx). Is it because we know that ihCtx is the children ctx of cancelCtx, so when the latter is cancelled, we know that ihCtx will be closed as well, so it's basically the same for a cancelChecker? But isn't this the same for a pausable portal as well?

I think we can actually remove p.cancelChecker.Reset calls here completely - this is already done in resetPlanner, so the calls here are redundant.

My comment about possibly using the incorrect ctx was before your changes to preserve the context stack. I think now ctx is the correct context, so my previous comment is out of date. Still, it's probably better to just remove Reset calls here.


pkg/sql/conn_executor_exec.go line 300 at r46 (raw file):

		// This is the first defer, so it will always be called after any cleanup
		// func being added to the stack from the defers below.
		if portal != nil && portal.isPausable() && !portal.pauseInfo.execStmtInOpenStateCleanup.isComplete {

nit: can use isPausablePortal here.


pkg/sql/conn_executor_exec.go line 447 at r46 (raw file):

			// For pausable portals, we retain the query but update the result for
			// each execution. When the query context is cancelled and we're in the
			// middle of an portal execution, push the error to the current result.

nit: as I mentioned above, I think res is the current result, no?


pkg/sql/conn_executor_exec.go line 464 at r46 (raw file):

				}
				errToPush := cancelchecker.QueryCanceledError
				// For pausable portal, we can arrive this after encounter a timeout

nit: s/can arrive this after encounter/can arrive here after encountering/g.


pkg/sql/conn_executor_exec.go line 472 at r46 (raw file):

				}
				resToPushErr.SetError(errToPush)
				retPayload = eventNonRetriableErrPayload{err: cancelchecker.QueryCanceledError}

Should we use errToPush here?


pkg/sql/conn_executor_exec.go line 1624 at r46 (raw file):

	// Record the statement summary. This also closes the plan if the
	// plan has not been closed earlier.
	stmtFingerprintID = ex.recordStatementSummary(

As I mentioned earlier, this needs to be deferred and only done once. We also need to make sure planner here has the correct instrumentationHelper info.


pkg/sql/conn_io.go line 273 at r47 (raw file):

	internalArgs []tree.Datum

	// isInternal is set to ture when the bound stmt is from an internal executor.

nit: s/ture/true/g.


pkg/sql/instrumentation.go line 829 at r46 (raw file):

// CopyTo is to make a copy of the original instrumentation helper.
func (ih *instrumentationHelper) CopyTo(dst *instrumentationHelper) {

nit: I think this is no longer needed.


pkg/sql/sem/tree/stmt.go line 132 at r2 (raw file):

Previously, ZhouXing19 (Jane Xing) wrote…

BTW why do we choose to allow read-only queries? Is it because with writes we wouldn't be able to commit the writes until the portal is closed, increasing the contention?

yes, it seems more complicated to deal with the level of isolation between portals when writes are involved.
For example, I tried the UPDATE ... RETURNING stmt with Postgres:

send
Query {"String": "BEGIN; CREATE TABLE t1 (x INT); CREATE TABLE t2 (x INT); INSERT INTO t1 VALUES (1), (1), (1); INSERT INTO t2 VALUES (2), (2), (2);"}
Parse {"Name": "qq1", "Query": "UPDATE t1 SET x = 10 RETURNING x"}
Bind {"DestinationPortal": "p1", "PreparedStatement": "qq1"}
Parse {"Name": "qq2", "Query": "SELECT * FROM t1"}
Bind {"DestinationPortal": "p2", "PreparedStatement": "qq2"}
Parse {"Name": "qq3", "Query": "UPDATE t2 SET x = 11"}
Bind {"DestinationPortal": "p3", "PreparedStatement": "qq3"}
Parse {"Name": "qq4", "Query": "SELECT * FROM t2"}
Bind {"DestinationPortal": "p4", "PreparedStatement": "qq4"}
Execute {"Portal": "p1", "MaxRows": 2}
Execute {"Portal": "p2", "MaxRows": 1}
Execute {"Portal": "p3", "MaxRows": 2}
Execute {"Portal": "p4", "MaxRows": 3}
Execute {"Portal": "p1", "MaxRows": 2}
Parse {"Name": "qq5", "Query": "SELECT * FROM t1"}
Bind {"DestinationPortal": "p5", "PreparedStatement": "qq5"}
Execute {"Portal": "p5", "MaxRows": 2}
Sync
----


until
ReadyForQuery
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"BEGIN"}
{"Type":"CommandComplete","CommandTag":"CREATE TABLE"}
{"Type":"CommandComplete","CommandTag":"CREATE TABLE"}
{"Type":"CommandComplete","CommandTag":"INSERT 0 3"}
{"Type":"CommandComplete","CommandTag":"INSERT 0 3"}
{"Type":"ReadyForQuery","TxStatus":"T"}
{"Type":"ParseComplete"}
{"Type":"BindComplete"}
{"Type":"ParseComplete"}
{"Type":"BindComplete"}
{"Type":"ParseComplete"}
{"Type":"BindComplete"}
{"Type":"ParseComplete"}
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"text":"10"}]}
{"Type":"DataRow","Values":[{"text":"10"}]}
{"Type":"PortalSuspended"}
{"Type":"DataRow","Values":[{"text":"1"}]}
{"Type":"PortalSuspended"}
{"Type":"CommandComplete","CommandTag":"UPDATE 3"}
{"Type":"DataRow","Values":[{"text":"2"}]}
{"Type":"DataRow","Values":[{"text":"2"}]}
{"Type":"DataRow","Values":[{"text":"2"}]}
{"Type":"PortalSuspended"}
{"Type":"DataRow","Values":[{"text":"10"}]}
{"Type":"CommandComplete","CommandTag":"UPDATE 1"}
{"Type":"ParseComplete"}
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"text":"10"}]}
{"Type":"DataRow","Values":[{"text":"10"}]}
{"Type":"PortalSuspended"}
{"Type":"ReadyForQuery","TxStatus":"T"}

So it seems that the UPDATE ... RETURNING portal can be paused, and the actual write happens once all rows have been returned, and that can override the UPDATE declared after it but finished before it.

I wasn't sure if our implementation for query execution supports this behavior already, so I tried running it against my implementation without the IsReadOnly gate, but got this error for enforcing the local-only plan:

panic: renderNode can't be run in local mode
...
*   | github.com/cockroachdb/cockroach/pkg/sql.(*renderNode).Next
*   | 	github.com/cockroachdb/cockroach/pkg/sql/render.go:85
*   | github.com/cockroachdb/cockroach/pkg/sql.(*updateNode).BatchedNext
*   | 	github.com/cockroachdb/cockroach/pkg/sql/update.go:164
*   | github.com/cockroachdb/cockroach/pkg/sql.(*serializeNode).Next
*   | 	github.com/cockroachdb/cockroach/pkg/sql/plan_batch.go:115
*   | github.com/cockroachdb/cockroach/pkg/sql.(*planNodeToRowSource).Next
*   | 	github.com/cockroachdb/cockroach/pkg/sql/plan_node_to_row_source.go:217
*   | github.com/cockroachdb/cockroach/pkg/sql/colexec.(*Columnarizer).Next
*   | 	github.com/cockroachdb/cockroach/pkg/sql/colexec/columnarizer.go:239
*   | github.com/cockroachdb/cockroach/pkg/sql/colexec.(*invariantsChecker).Next
*   | 	github.com/cockroachdb/cockroach/pkg/sql/colexec/invariants_checker.go:93
*   | github.com/cockroachdb/cockroach/pkg/sql/colflow.(*batchInfoCollector).next
*   | 	github.com/cockroachdb/cockroach/pkg/sql/colflow/stats.go:118

Interesting. If we didn't panic, my guess is that we'd return rows 1, 1 and then get a retryable error (i.e. the app would need to discard those two rows and restart the transaction). It seems reasonable to me to allow only read-only queries initially.


pkg/sql/sem/tree/stmt.go line 127 at r47 (raw file):

// write data.
// This function is to gate the queries allowed for pausable portals.
// TODO(janexing): We should be more accurate about the stmt selection here.

I think we should get more eyes on this gate. I might have mentioned this earlier, but it'd be good to ask in the sql-team slack channel what the best way is to check that a tree.Statement is read-only. Also, it'd be good to add some tests to this method.

This is part of the implementation of multiple active portals.

To enable executing portals in an interleaved manner, we need to persist certain
resources for them, and delay their clean-up till we close the portal. Also, these
resources don't need to be set up again when we re-execute a portal.

Thus we store these cleanup steps in the `__Cleanup` function stacks in
`portalPauseInfo`, and they are called when 1. the sql txn is committed; 2. the
sql txn is rolled back; or 3. the conn executor is closed.

The cleanup functions should be called according to the original order of a normal
portal. Since a portal's execution goes through the `execPortal() ->
execStmtInOpenState() -> dispatchToExecutionEngine() -> flow.Run()` function flow,
we categorized the cleanup functions accordingly into 4 "layers": `exhaustPortal`,
`execStmtCleanup`, `dispatchToExecEngCleanup` and `flowCleanup`. The cleanup is
always LIFO, i.e. it follows the `flowCleanup -> dispatchToExecEngCleanup ->
execStmtCleanup -> exhaustPortal` order. Also, when an error happens in a layer,
we clean up that layer and the layers below it. E.g. if we encounter an error in
`execStmtInOpenState()`, we run `flowCleanup` and `dispatchToExecEngCleanup`
(the layers below) and then `execStmtCleanup` (the current layer), and return the
error to `execPortal()`, where `exhaustPortal` will eventually be called.
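
For illustration, a minimal sketch of the layered cleanup stacks described above (illustrative names only, not the actual portalPauseInfo code):

```
package main

import "fmt"

// cleanupFuncStack collects cleanup functions for one layer and runs them
// in LIFO order when the layer is torn down.
type cleanupFuncStack struct{ fns []func() }

func (s *cleanupFuncStack) push(f func()) { s.fns = append(s.fns, f) }

func (s *cleanupFuncStack) run() {
	for i := len(s.fns) - 1; i >= 0; i-- {
		s.fns[i]()
	}
	s.fns = nil
}

// portalPauseInfo holds one stack per layer; on portal close (txn commit,
// txn rollback, or connExecutor close) they run in the order
// flowCleanup -> dispatchToExecEngCleanup -> execStmtCleanup -> exhaustPortal.
type portalPauseInfo struct {
	exhaustPortal            cleanupFuncStack
	execStmtCleanup          cleanupFuncStack
	dispatchToExecEngCleanup cleanupFuncStack
	flowCleanup              cleanupFuncStack
}

func (p *portalPauseInfo) cleanupAll() {
	p.flowCleanup.run()
	p.dispatchToExecEngCleanup.run()
	p.execStmtCleanup.run()
	p.exhaustPortal.run()
}

func main() {
	var p portalPauseInfo
	p.exhaustPortal.push(func() { fmt.Println("remove portal from namespace") })
	p.execStmtCleanup.push(func() { fmt.Println("finish tracing span") })
	p.dispatchToExecEngCleanup.push(func() { fmt.Println("close the plan") })
	p.flowCleanup.push(func() { fmt.Println("clean up the flow") })
	p.cleanupAll()
}
```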

We also pass the PreparedPortal by reference to the planner in
`execStmtInOpenState()`, so that the portal's flow can be set and reused.

Release note: None
This commit adds the following restrictions for pausable portals:

1. Not an internal query
2. Read-only queries
3. No sub-queries or post-queries
4. Local plan only

This is because the current changes to the consumer-receiver model only consider
the local push-based case.

Release note: None
Collaborator Author

@ZhouXing19 ZhouXing19 left a comment

Thanks for reviewing!! I have addressed most of the comments and made #99663

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @rafiss and @yuzefovich)


pkg/sql/conn_executor.go line 2201 at r46 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: do we need to store it? I think we will always have the correct stmt res in scope.

Removed.


pkg/sql/conn_executor_exec.go line 1523 at r15 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

As we did during pair programming, it would probably be good to do something like planner.instrumentation = ppInfo.ihWrapper.ih in the pausable portal case, and I think everything else should just work (i.e. it might not be necessary to make any changes to populateQueryLevelStatsAndRegions function).

I changed it to use a shallow copy of the pausable portal's planner. I feel it might be less error-prone. Does this make sense to you?


pkg/sql/conn_executor_exec.go line 1413 at r29 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

sessionTracing might also be a bit broken, maybe include it into #99410 or create a separate issue.

Updated #99410.


pkg/sql/conn_executor_exec.go line 503 at r40 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Ok, I'm less concerned about this now that we have figured the bug with runPlanInsidePlan and confirmed that some basic queries using virtual tables work. I think we can leave this code as is given that the feature is in the preview.

However, it's still an unknown to me what (if anything) will be broken due to resetting the planner between re-executions. Consider leaving a TODO to explore this.

One approach that might be more bullet-proof would be to embed planner object into the pause info struct, and then use that planner instance whenever we're operating with the pausable portals. pauseInfo.planner would be modified ("initialized") on the first execution of the portal, but then we will not call resetPlanner on it. This way, with every re-execution we will let the resumed flow to modify our planner as the flow needs, and then we'll keep the state of the planner unchanged, until the portal is resumed again. This approach will also let us remove the ihWrapper I think.

Done, added TODO and made #99625.


pkg/sql/conn_executor_exec.go line 858 at r40 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

I think we can actually remove p.cancelChecker.Reset calls here completely - this is already done in resetPlanner, so the calls here are redundant.

My comment about possibly using the incorrect ctx was before your changes to preserve the context stack. I think now ctx is the correct context, so my previous comment is out of date. Still, it's probably better to just remove Reset calls here.

Done.


pkg/sql/conn_executor_exec.go line 447 at r46 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: as I mentioned above, I think res is the current result, no?

Removed


pkg/sql/conn_executor_exec.go line 472 at r46 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Should we use errToPush here?

Done.


pkg/sql/conn_executor_exec.go line 1624 at r46 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

As I mentioned earlier, this needs to be deferred and only done once. We also need to make sure planner here has the correct instrumentationHelper info.

Done.

@ZhouXing19
Collaborator Author

Closing this in favor of #99663.

@ZhouXing19 ZhouXing19 closed this Mar 27, 2023
craig bot pushed a commit that referenced this pull request Apr 7, 2023
99663: sql: update connExecutor logic for pausable portals r=ZhouXing19 a=ZhouXing19

This PR replaces #96358 and is part of the initial implementation of multiple active portals.

----

This PR is to add limited support for multiple active portals. Now portals satisfying all following restrictions can be paused and resumed (i.e., with other queries interleaving it):

1. Not an internal query;
2. Read-only query;
3. No sub-queries or post-queries.

And such a portal will only have the statement executed with a _non-distributed_ plan. 

This feature is gated by a session variable `multiple_active_portals_enabled`. When it's set `true`, all portals that satisfy the restrictions above will automatically become "pausable" when being created via the pgwire `Bind` stmt. 

The core idea of this implementation is 
1. Add a `switchToAnotherPortal` status to the result-consumption state machine. When we receive an `ExecPortal` message for a different portal, we simply return the control to the connExecutor. (#99052)
2. Persist `flow` `queryID` `span` and `instrumentationHelper` for the portal, and reuse it when we re-execute a portal. This is to ensure we _continue_ the fetching rather than starting all over. (#99173)
3. To enable 2, we need to delay the clean-up of resources till we close the portal. For this we introduced the stacks of cleanup functions. (This PR)

Note that we kept the implementation of the original "un-pausable" portal, as we'd like to limit this new functionality only to a small set of statements. Eventually some of them should be replaced (e.g. the limitedCommandResult's lifecycle) with the new code. 

Also, we don't support distributed plan yet, as it involves much more complicated changes. See `Start with an entirely local plan` section in the [design doc](https://docs.google.com/document/d/1SpKTrTqc4AlGWBqBNgmyXfTweUUsrlqIaSkmaXpznA8/edit). Support for this will come as a follow-up.

Epic: CRDB-17622

Release note (sql change): initial support for multiple active portals. Now with session variable `multiple_active_portals_enabled` set to true,  portals satisfying all following restrictions can be executed in an interleaving manner:  1. Not an internal query; 2. Read-only query; 3. No sub-queries or post-queries. And such a portal will only have the statement executed with an entirely local plan. 





99947: ui: small fixes to DB Console charts shown for secondary tenants r=dhartunian a=abarganier

#97995 updated the
DB Console to filter out KV-specific charts from the metrics page
when viewing DB Console as a secondary application tenant.

The PR missed a couple small details. This patch cleans those
up with the following:

- Removes KV latency charts for app tenants
- Adds a single storage graph for app tenants showing livebytes
- Removes the "Capacity" chart on the Overview dashboard for app
  tenants

Release note: none

Epic: https://cockroachlabs.atlassian.net/browse/CRDB-12100

NB: Please only review the final commit. 1st commit is being reviewed separately @ #99860

100188: changefeedccl: pubsub sink refactor to batching sink r=rickystewart a=samiskin

Epic: https://cockroachlabs.atlassian.net/browse/CRDB-13237

This change is a followup to #99086 which moves the Pubsub sink to the batching sink framework.

The changes involve:
1. Moving the Pubsub code to match the `SinkClient` interface, switching to the lower-level v1 pubsub API that lets us publish batches manually
2. Removing the extra call to json.Marshal
3. Moving to using the `pstest` package for validating results in unit tests
4. Adding topic handling to the batching sink, where batches are created per-topic (see the sketch after this list)
5. Adding a pubsub_sink_config since it can now handle Retry and Flush config settings
6. Adding metrics to the old pubsub as well, for the purpose of comparing the two versions
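
For illustration, a rough sketch of the per-topic batching idea from the list above, with made-up types rather than the actual changefeedccl SinkClient interface:

```
package main

import "fmt"

type message struct {
	topic string
	data  string
}

// batchingSink groups incoming messages into per-topic batches and flushes a
// batch once it reaches flushSize (time-based flushing omitted for brevity).
type batchingSink struct {
	flushSize int
	batches   map[string][]message
}

func newBatchingSink(flushSize int) *batchingSink {
	return &batchingSink{flushSize: flushSize, batches: map[string][]message{}}
}

func (s *batchingSink) emit(m message) {
	s.batches[m.topic] = append(s.batches[m.topic], m)
	if len(s.batches[m.topic]) >= s.flushSize {
		s.flushTopic(m.topic)
	}
}

func (s *batchingSink) flushTopic(topic string) {
	batch := s.batches[topic]
	if len(batch) == 0 {
		return
	}
	// In the real sink this would be a single publish request to the topic.
	fmt.Printf("publishing %d messages to %s\n", len(batch), topic)
	s.batches[topic] = nil
}

func (s *batchingSink) flushAll() {
	for topic := range s.batches {
		s.flushTopic(topic)
	}
}

func main() {
	sink := newBatchingSink(3)
	for i := 0; i < 7; i++ {
		sink.emit(message{topic: "orders", data: fmt.Sprintf("row-%d", i)})
	}
	sink.emit(message{topic: "customers", data: "row-0"})
	sink.flushAll()
}
```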

At default settings, this resulted in a peak of 90k messages per second on a single node, with that throughput at 27.6% CPU usage, putting it at a similar level to Kafka.

Running pubsub v2 across all of TPCC (nodes ran out of ranges at different speeds):
<img width="637" alt="Screenshot 2023-03-30 at 3 38 25 PM" src="https://user-images.githubusercontent.com/6236424/229863386-edaee27d-9762-4806-bab6-e18b8a6169d6.png">

Running pubsub v1 (barely visible, 2k messages per second) followed by v2 on tpcc.order_line (in v2 only 2 nodes ended up having ranges assigned to them):
<img width="642" alt="Screenshot 2023-04-04 at 12 53 45 PM" src="https://user-images.githubusercontent.com/6236424/229863507-1883ea45-d8ce-437b-9b9c-550afec68752.png">

In the following graphs from the cloud console, where v1 was run followed by v2, you can see how the main reason v1 was slow was that it wasn't able to batch different keys together.
<img width="574" alt="Screenshot 2023-04-04 at 12 59 51 PM" src="https://user-images.githubusercontent.com/6236424/229864083-758c0814-d53c-447e-84c3-471cf5d56c44.png">

Publish requests remained the same despite way more messages in v2
<img width="1150" alt="Screenshot 2023-04-04 at 1 46 51 PM" src="https://user-images.githubusercontent.com/6236424/229875314-6e07177e-62c4-4c15-b13f-f75e8143e011.png">



Release note (performance improvement): pubsub sink changefeeds can now support higher throughputs by enabling the changefeed.new_pubsub_sink_enabled cluster setting.

100620: pkg/server: move DataDistribution to systemAdminServer r=dhartunian a=abarganier

The DataDistribution endpoint reports replica counts by database and table. When it was built, it operated off the assumption that a range would only ever contain a single table's data within.

Now that we have coalesced ranges, a single range can span multiple tables. Unfortunately, the DataDistribution endpoint does not take this fact into account, meaning it reports garbled and inaccurate data, unless the `spanconfig.storage_coalesce_adjacent.enabled` setting is set to false (see #98820).

For secondary tenants, ranges are *always* coalesced, so this endpoint in its current state could never report meaningful data for a tenant.

Given all of this, we have decided to make this endpoint only available for the system tenant. This patch
accomplishes this by moving the endpoint away from the adminServer and into the systemAdminServer, making it effectively unimplemented for secondary tenants.

Release note: none

Informs: #97942

Co-authored-by: Jane Xing <zhouxing@uchicago.edu>
Co-authored-by: Alex Barganier <abarganier@cockroachlabs.com>
Co-authored-by: Shiranka Miskin <shiranka.miskin@gmail.com>