sql: reuse flow for pausable portal
To execute portals in an interleaved manner, we need to persist the flow and
queryID so that we can _continue_ fetching the result when we come back to the
same portal.

We now introduce a `pauseInfo` field in `sql.PreparedPortal` that stores this
metadata. It is set during the first-time execution of a portal, and all
following executions reuse the flow and the queryID. This also implies that
these resources should not be cleaned up at the end of each execution.
The implementation of the clean-up steps is included in the next commit.

Also, in this commit we attach a `*PreparedPortal` to the planner; how it is
set can be seen in the next commit as well.
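
As an illustration only, here is a minimal, runnable Go sketch of the
reuse-or-setup decision this commit adds around `execWithDistSQLEngine` and
`DistSQLPlanner.Run`. The types below are simplified stand-ins, not the real
`flowinfra.Flow`, `PreparedPortal`, or `portalPauseInfo` APIs.

package main

import "fmt"

// Simplified stand-ins for the real types; illustrative only.
type flow struct{ resumeCount int }

func (f *flow) Run() { fmt.Println("flow: first run") }

func (f *flow) Resume() {
	f.resumeCount++
	fmt.Println("flow: resumed, count =", f.resumeCount)
}

type pauseInfo struct{ fl *flow }

type portal struct{ pause *pauseInfo }

// execute mirrors the branch added in this commit: reuse the saved flow when
// the portal already has one, otherwise set up a new flow and retain it in
// pauseInfo for later re-executions.
func execute(p *portal) {
	if p.pause != nil && p.pause.fl != nil {
		p.pause.fl.Resume()
		return
	}
	f := &flow{}
	f.Run()
	if p.pause != nil {
		// Retain the flow; its cleanup is deferred until the portal closes
		// rather than running at the end of this execution.
		p.pause.fl = f
	}
}

func main() {
	p := &portal{pause: &pauseInfo{}}
	execute(p) // first execution: sets up and saves the flow
	execute(p) // re-executions: resume the saved flow
	execute(p)
}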

Release note: None
ZhouXing19 committed Mar 21, 2023
1 parent 66acf83 commit a54d8c2
Showing 6 changed files with 214 additions and 46 deletions.
72 changes: 39 additions & 33 deletions pkg/sql/conn_executor_exec.go
@@ -1734,42 +1734,48 @@ func (ex *connExecutor) execWithDistSQLEngine(
}
defer recv.Release()

evalCtx := planner.ExtendedEvalContext()
planCtx := ex.server.cfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, planner,
planner.txn, distribute)
planCtx.stmtType = recv.stmtType
// Skip the diagram generation since on this "main" query path we can get it
// via the statement bundle.
planCtx.skipDistSQLDiagramGeneration = true
if ex.server.cfg.TestingKnobs.TestingSaveFlows != nil {
planCtx.saveFlows = ex.server.cfg.TestingKnobs.TestingSaveFlows(planner.stmt.SQL)
} else if planner.instrumentation.ShouldSaveFlows() {
planCtx.saveFlows = planCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypeMainQuery)
}
planCtx.associateNodeWithComponents = planner.instrumentation.getAssociateNodeWithComponentsFn()
planCtx.collectExecStats = planner.instrumentation.ShouldCollectExecStats()

var evalCtxFactory func(usedConcurrently bool) *extendedEvalContext
if len(planner.curPlan.subqueryPlans) != 0 ||
len(planner.curPlan.cascades) != 0 ||
len(planner.curPlan.checkPlans) != 0 {
var serialEvalCtx extendedEvalContext
ex.initEvalCtx(ctx, &serialEvalCtx, planner)
evalCtxFactory = func(usedConcurrently bool) *extendedEvalContext {
// Reuse the same object if this factory is not used concurrently.
factoryEvalCtx := &serialEvalCtx
if usedConcurrently {
factoryEvalCtx = &extendedEvalContext{}
ex.initEvalCtx(ctx, factoryEvalCtx, planner)
var err error

if planner.hasFlowForPausablePortal() {
err = planner.resumeFlowForPausablePortal(recv)
} else {
evalCtx := planner.ExtendedEvalContext()
planCtx := ex.server.cfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, planner,
planner.txn, distribute)
planCtx.stmtType = recv.stmtType
// Skip the diagram generation since on this "main" query path we can get it
// via the statement bundle.
planCtx.skipDistSQLDiagramGeneration = true
if ex.server.cfg.TestingKnobs.TestingSaveFlows != nil {
planCtx.saveFlows = ex.server.cfg.TestingKnobs.TestingSaveFlows(planner.stmt.SQL)
} else if planner.instrumentation.ShouldSaveFlows() {
planCtx.saveFlows = planCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypeMainQuery)
}
planCtx.associateNodeWithComponents = planner.instrumentation.getAssociateNodeWithComponentsFn()
planCtx.collectExecStats = planner.instrumentation.ShouldCollectExecStats()

var evalCtxFactory func(usedConcurrently bool) *extendedEvalContext
if len(planner.curPlan.subqueryPlans) != 0 ||
len(planner.curPlan.cascades) != 0 ||
len(planner.curPlan.checkPlans) != 0 {
var serialEvalCtx extendedEvalContext
ex.initEvalCtx(ctx, &serialEvalCtx, planner)
evalCtxFactory = func(usedConcurrently bool) *extendedEvalContext {
// Reuse the same object if this factory is not used concurrently.
factoryEvalCtx := &serialEvalCtx
if usedConcurrently {
factoryEvalCtx = &extendedEvalContext{}
ex.initEvalCtx(ctx, factoryEvalCtx, planner)
}
ex.resetEvalCtx(factoryEvalCtx, planner.txn, planner.ExtendedEvalContext().StmtTimestamp)
factoryEvalCtx.Placeholders = &planner.semaCtx.Placeholders
factoryEvalCtx.Annotations = &planner.semaCtx.Annotations
factoryEvalCtx.SessionID = planner.ExtendedEvalContext().SessionID
return factoryEvalCtx
}
ex.resetEvalCtx(factoryEvalCtx, planner.txn, planner.ExtendedEvalContext().StmtTimestamp)
factoryEvalCtx.Placeholders = &planner.semaCtx.Placeholders
factoryEvalCtx.Annotations = &planner.semaCtx.Annotations
factoryEvalCtx.SessionID = planner.ExtendedEvalContext().SessionID
return factoryEvalCtx
}
err = ex.server.cfg.DistSQLPlanner.PlanAndRunAll(ctx, evalCtx, planCtx, planner, recv, evalCtxFactory)
}
err := ex.server.cfg.DistSQLPlanner.PlanAndRunAll(ctx, evalCtx, planCtx, planner, recv, evalCtxFactory)
return recv.stats, err
}

9 changes: 9 additions & 0 deletions pkg/sql/distsql_physical_planner.go
@@ -883,6 +883,15 @@ func (p *PlanningCtx) IsLocal() bool {
return p.isLocal
}

// getPortalPauseInfo returns the portal pause info if the current planner is
// for a pausable portal. Otherwise, returns nil.
func (p *PlanningCtx) getPortalPauseInfo() *portalPauseInfo {
if p.planner != nil && p.planner.pausablePortal != nil && p.planner.pausablePortal.pauseInfo != nil {
return p.planner.pausablePortal.pauseInfo
}
return nil
}

// getDefaultSaveFlowsFunc returns the default function used to save physical
// plans and their diagrams. The returned function is **not** concurrency-safe.
func (p *PlanningCtx) getDefaultSaveFlowsFunc(
34 changes: 26 additions & 8 deletions pkg/sql/distsql_running.go
@@ -833,14 +833,31 @@ func (dsp *DistSQLPlanner) Run(
if planCtx.planner != nil {
statementSQL = planCtx.planner.stmt.StmtNoConstants
}
ctx, flow, err := dsp.setupFlows(
ctx, evalCtx, planCtx, leafInputState, flows, recv, localState, statementSQL,
)
// Make sure that the local flow is always cleaned up if it was created.

var flow flowinfra.Flow
var err error
if i := planCtx.getPortalPauseInfo(); i != nil && i.flow != nil {
flow = i.flow
} else {
ctx, flow, err = dsp.setupFlows(
ctx, evalCtx, planCtx, leafInputState, flows, recv, localState, statementSQL,
)
if i != nil {
i.flow = flow
i.outputTypes = plan.GetResultTypes()
}
}

if flow != nil {
defer func() {
flow.Cleanup(ctx)
}()
// Make sure that the local flow is always cleaned up if it was created.
// If the flow is not for a retained portal, we clean it up here.
// Otherwise, we delay the cleanup via portalPauseInfo.flowCleanup until
// the portal is closed.
if planCtx.getPortalPauseInfo() == nil {
defer func() {
flow.Cleanup(ctx)
}()
}
}
if err != nil {
recv.SetError(err)
@@ -863,7 +880,8 @@ func (dsp *DistSQLPlanner) Run(
return
}

flow.Run(ctx, false /* noWait */)
noWait := planCtx.getPortalPauseInfo() != nil
flow.Run(ctx, noWait)
}

// DistSQLReceiver is an execinfra.RowReceiver and execinfra.BatchReceiver that
2 changes: 1 addition & 1 deletion pkg/sql/physicalplan/aggregator_funcs_test.go
@@ -84,7 +84,7 @@ func runTestFlow(
if err != nil {
t.Fatal(err)
}
flow.Run(ctx)
flow.Run(ctx, false /* noWait */)
flow.Cleanup(ctx)

if !rowBuf.ProducerClosed() {
23 changes: 23 additions & 0 deletions pkg/sql/planner.go
@@ -52,6 +52,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/cockroachdb/redact"
"github.com/lib/pq/oid"
@@ -201,6 +202,9 @@ type planner struct {
// home region is being enforced.
StmtNoConstantsWithHomeRegionEnforced string

// pausablePortal is set when the query is from a pausable portal.
pausablePortal *PreparedPortal

instrumentation instrumentationHelper

// Contexts for different stages of planning and execution.
@@ -260,6 +264,24 @@ type planner struct {
evalCatalogBuiltins evalcatalog.Builtins
}

// hasFlowForPausablePortal returns true if the planner is for re-executing a
// portal. We reuse the flow stored in p.pausablePortal.pauseInfo.
func (p *planner) hasFlowForPausablePortal() bool {
return p.pausablePortal != nil && p.pausablePortal.pauseInfo != nil && p.pausablePortal.pauseInfo.flow != nil
}

// resumeFlowForPausablePortal is called when re-executing a portal. We reuse
// the flow with a new receiver, without re-generating the physical plan.
func (p *planner) resumeFlowForPausablePortal(recv *DistSQLReceiver) error {
if !p.hasFlowForPausablePortal() {
return errors.AssertionFailedf("no flow found for pausable portal")
}
recv.discardRows = p.instrumentation.ShouldDiscardRows()
recv.outputTypes = p.pausablePortal.pauseInfo.outputTypes
p.pausablePortal.pauseInfo.flow.Resume(recv)
return recv.commErr
}

func (evalCtx *extendedEvalContext) setSessionID(sessionID clusterunique.ID) {
evalCtx.SessionID = sessionID
}
@@ -848,6 +870,7 @@ func (p *planner) resetPlanner(
p.evalCatalogBuiltins.Init(p.execCfg.Codec, txn, p.Descriptors())
p.skipDescriptorCache = false
p.typeResolutionDbID = descpb.InvalidID
p.pausablePortal = nil
}

// GetReplicationStreamManager returns a ReplicationStreamManager.
120 changes: 116 additions & 4 deletions pkg/sql/prepared_stmt.go
@@ -15,12 +15,16 @@ import (
"time"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/opt/memo"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase"
"github.com/cockroachdb/cockroach/pkg/sql/querycache"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

// PreparedStatementOrigin is an enum representing the source of where
Expand Down Expand Up @@ -155,6 +159,9 @@ type PreparedPortal struct {
// a portal.
// See comments for PortalPausablity for more details.
portalPausablity PortalPausablity

// pauseInfo is the saved info needed for a pausable portal.
pauseInfo *portalPauseInfo
}

// makePreparedPortal creates a new PreparedPortal.
@@ -173,10 +180,11 @@ func (ex *connExecutor) makePreparedPortal(
Qargs: qargs,
OutFormats: outFormats,
}
// TODO(janexing): we added this line to avoid the unused lint error.
// Will remove it once the whole functionality of multiple active portals
// is merged.
_ = enableMultipleActivePortals.Get(&ex.server.cfg.Settings.SV)

if enableMultipleActivePortals.Get(&ex.server.cfg.Settings.SV) {
portal.pauseInfo = &portalPauseInfo{queryStats: &topLevelQueryStats{}}
portal.portalPausablity = PausablePortal
}
return portal, portal.accountForCopy(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
}

@@ -204,3 +212,107 @@ func (p *PreparedPortal) close(
func (p *PreparedPortal) size(portalName string) int64 {
return int64(uintptr(len(portalName)) + unsafe.Sizeof(p))
}

func (p *PreparedPortal) isPausable() bool {
return p.pauseInfo != nil
}

// cleanupFuncStack stores cleanup functions for a portal. The cleanup
// functions are added during the first-time execution of a portal. When the
// first-time execution is finished, we set isComplete to true.
type cleanupFuncStack struct {
stack []namedFunc
isComplete bool
}

func (n *cleanupFuncStack) appendFunc(f namedFunc) {
n.stack = append(n.stack, f)
}

func (n *cleanupFuncStack) run() {
for i := 0; i < len(n.stack); i++ {
n.stack[i].f()
}
*n = cleanupFuncStack{}
}

// namedFunc is a function with a name, which makes debugging easier. It is
// used only for the cleanup functions of a pausable portal.
type namedFunc struct {
fName string
f func()
}

// instrumentationHelperWrapper wraps the instrumentation helper.
// We need to maintain it for a paused portal.
type instrumentationHelperWrapper struct {
ih instrumentationHelper
}

// portalPauseInfo stores the info that enables pausing a portal. After pausing
// the portal, any other statement can be executed, and we can later come back
// to re-execute the portal or close it.
type portalPauseInfo struct {
// curRes is the command result of the current execution. For each execution
// we update this field. We need this when we encounter an error during
// execution, so that the error is correctly transmitted.
curRes RestrictedCommandResult
// sp stores the tracing span of the underlying statement. It is closed when
// the portal finishes.
sp *tracing.Span
// outputTypes are the types of the result columns produced by the physical plan.
// We need this because, when re-executing the portal, we reuse the flow with a
// new receiver without re-generating the physical plan.
outputTypes []*types.T
// We need to store the flow for a portal so that, when re-executing it, we
// continue from the previous execution. It lives along with the portal and
// will be cleaned up when the portal is closed.
flow flowinfra.Flow
// queryID stores the id of the query that this portal is bound to. When we
// re-execute an existing portal, we should use the same query id.
queryID clusterunique.ID
// ihWrapper stores the instrumentation helper that should be reused for
// each execution of the portal.
ihWrapper *instrumentationHelperWrapper
// cancelQueryFunc will be called to cancel the context of the query when
// the portal is closed.
cancelQueryFunc context.CancelFunc
// cancelQueryCtx is the context to be canceled when closing the portal.
cancelQueryCtx context.Context
// planTop collects the properties of the current plan being prepared.
// We reuse it when re-executing the portal.
// TODO(yuzefovich): consider refactoring distSQLFlowInfos from planTop to
// avoid storing the planTop here.
planTop planTop
// queryStats stores statistics on query execution. It is incremented for
// each execution of the portal.
queryStats *topLevelQueryStats
// The following 4 stacks store functions to call when closing the portal.
// They should be called in this order:
// flowCleanup -> dispatchToExecEngCleanup -> execStmtInOpenStateCleanup ->
// exhaustPortal.
// Each stack is defined in the closure of its corresponding function.
// When we encounter an error in any of these functions, we run the cleanup of
// this layer and its children layers and propagate the error to the parent
// layer. For example, when we encounter an error in execStmtInOpenStateCleanup(),
// we run flowCleanup -> dispatchToExecEngCleanup -> execStmtInOpenStateCleanup
// when exiting connExecutor.execStmtInOpenState(), and finally run
// exhaustPortal in connExecutor.execPortal().
exhaustPortal cleanupFuncStack
execStmtInOpenStateCleanup cleanupFuncStack
dispatchToExecEngCleanup cleanupFuncStack
flowCleanup cleanupFuncStack
}

// cleanupAll runs all the cleanup layers.
func (pm *portalPauseInfo) cleanupAll() {
pm.flowCleanup.run()
pm.dispatchToExecEngCleanup.run()
pm.execStmtInOpenStateCleanup.run()
pm.exhaustPortal.run()
}

// isQueryIDSet returns true if the query id for the portal is set.
func (pm *portalPauseInfo) isQueryIDSet() bool {
return !pm.queryID.Equal(clusterunique.ID{}.Uint128)
}
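
As a hedged aside, the following is a minimal, runnable sketch of how these
cleanup stacks are intended to be used. The registration call sites in main are
assumptions for illustration; the actual wiring into the connExecutor lands in
the next commit.

package main

import "fmt"

// Simplified copies of the types introduced above, for illustration only.
type namedFunc struct {
	fName string
	f     func()
}

type cleanupFuncStack struct {
	stack      []namedFunc
	isComplete bool
}

func (n *cleanupFuncStack) appendFunc(f namedFunc) { n.stack = append(n.stack, f) }

func (n *cleanupFuncStack) run() {
	for i := 0; i < len(n.stack); i++ {
		n.stack[i].f()
	}
	*n = cleanupFuncStack{}
}

type portalPauseInfo struct {
	exhaustPortal              cleanupFuncStack
	execStmtInOpenStateCleanup cleanupFuncStack
	dispatchToExecEngCleanup   cleanupFuncStack
	flowCleanup                cleanupFuncStack
}

// cleanupAll runs the layers in the documented order: flowCleanup ->
// dispatchToExecEngCleanup -> execStmtInOpenStateCleanup -> exhaustPortal.
func (pm *portalPauseInfo) cleanupAll() {
	pm.flowCleanup.run()
	pm.dispatchToExecEngCleanup.run()
	pm.execStmtInOpenStateCleanup.run()
	pm.exhaustPortal.run()
}

func main() {
	var pi portalPauseInfo
	// During the portal's first execution, each layer registers its cleanup work.
	pi.flowCleanup.appendFunc(namedFunc{fName: "flowCleanup", f: func() { fmt.Println("cleanup: flow") }})
	pi.dispatchToExecEngCleanup.appendFunc(namedFunc{fName: "execEngCleanup", f: func() { fmt.Println("cleanup: exec engine") }})
	pi.execStmtInOpenStateCleanup.appendFunc(namedFunc{fName: "openStateCleanup", f: func() { fmt.Println("cleanup: open state") }})
	pi.exhaustPortal.appendFunc(namedFunc{fName: "exhaustPortal", f: func() { fmt.Println("cleanup: exhaust portal") }})
	// When the portal is closed (or an error forces cleanup), the layers run
	// innermost first.
	pi.cleanupAll()
}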
