Skip to content

Commit

Permalink
sql: split out planning and execution monitors
Browse files Browse the repository at this point in the history
This commit cleans up a long standing tech debt of how the "planning"
and "execution" monitors were intertwined which made it so that we had
to put some hacks in place to close things in the correct order. In
particular, previously `DistSQLPlanner.Run` was returning a "cleanup"
function that had to be deferred in some cases (e.g. on the main query
path we had to defer until after the postqueries finished). That cleanup
function closed the plan before cleaning up the execution flow. It had
to happen in this particular order since the memory monitor of the
latter was used to derive some memory accounts for the planNode tree
(part of the former). We also didn't want to return flow objects
explicitly, so we returned this cleanup closure. We also had some hacks
about "ignoring the closure" of the plans for sub- and postqueries.

This commit refactors this whole mess into a much more clear structure.
The main idea is that we now store the "flow" memory monitor explicitly
on the `FlowContext`, and most of the things (like processors and
operators) use it. However, in some cases we need to have access to
a memory monitor that outlives the flow (this is what was previously
intertwined), so we introduce a method to `eval.Planner` interface to
return the "planner" monitor (which is actually a parent of the "flow"
monitor). There are two main users (builtin functions like aggregates
and row containers) that - philosophically speaking - should be using
the flow monitor, but there is no easy way to plumb it there nor are
those things closed on the flow cleanup, so these things access the
"planner" monitor.

This refactor allowed us to remove several cleanup closures, so now the
flow is cleaned up automatically at the end of `Run` method. Several
non-main query paths (like apply joins) that use `PlanAndRun`
infrastructure now explicitly close their plans (previously this was
hidden inside the now-removed cleanup closure).

Release note: None
  • Loading branch information
yuzefovich committed Oct 2, 2022
1 parent 6fcfd27 commit 961e66f
Show file tree
Hide file tree
Showing 135 changed files with 386 additions and 212 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_processor_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,6 @@ func distBackup(
defer close(progCh)
// Copy the evalCtx, as dsp.Run() might change it.
evalCtxCopy := *evalCtx
dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, nil /* finishedSetupFn */)()
dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, nil /* finishedSetupFn */)
return rowResultWriter.Err()
}
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_processor_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func distRestore(

// Copy the evalCtx, as dsp.Run() might change it.
evalCtxCopy := *evalCtx
dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, nil /* finishedSetupFn */)()
dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, nil /* finishedSetupFn */)
return rowResultWriter.Err()
})

Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/split_and_scatter_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ func TestSplitAndScatterProcessor(t *testing.T) {
Stopper: tc.Stopper(),
},
EvalCtx: &evalCtx,
Mon: evalCtx.TestingMon,
DiskMonitor: testDiskMonitor,
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func startDistChangefeed(
// Copy the evalCtx, as dsp.Run() might change it.
evalCtxCopy := *evalCtx
// p is the physical plan, recv is the distsqlreceiver
dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, finishedSetupFn)()
dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, finishedSetupFn)
return resultRows.Err()
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func newChangeAggregatorProcessor(
post *execinfrapb.PostProcessSpec,
output execinfra.RowReceiver,
) (execinfra.Processor, error) {
memMonitor := execinfra.NewMonitor(ctx, flowCtx.EvalCtx.Mon, "changeagg-mem")
memMonitor := execinfra.NewMonitor(ctx, flowCtx.Mon, "changeagg-mem")
ca := &changeAggregator{
flowCtx: flowCtx,
spec: spec,
Expand Down Expand Up @@ -822,7 +822,7 @@ func newChangeFrontierProcessor(
post *execinfrapb.PostProcessSpec,
output execinfra.RowReceiver,
) (execinfra.Processor, error) {
memMonitor := execinfra.NewMonitor(ctx, flowCtx.EvalCtx.Mon, "changefntr-mem")
memMonitor := execinfra.NewMonitor(ctx, flowCtx.Mon, "changefntr-mem")
sf, err := makeSchemaChangeFrontier(hlc.Timestamp{}, spec.TrackedSpans...)
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func TestStreamIngestionFrontierProcessor(t *testing.T) {
BulkSenderLimiter: limit.MakeConcurrentRequestLimiter("test", math.MaxInt),
},
EvalCtx: &evalCtx,
Mon: evalCtx.TestingMon,
DiskMonitor: testDiskMonitor,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,6 @@ func distStreamIngest(

// Copy the evalCtx, as dsp.Run() might change it.
evalCtxCopy := *evalCtx
dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, nil /* finishedSetupFn */)()
dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, nil /* finishedSetupFn */)
return rw.Err()
}
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,7 @@ func getStreamIngestionProcessor(
BulkSenderLimiter: limit.MakeConcurrentRequestLimiter("test", math.MaxInt),
},
EvalCtx: &evalCtx,
Mon: evalCtx.TestingMon,
DiskMonitor: testDiskMonitor,
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamproducer/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,6 @@ func streamPartition(
spec: spec,
subscribedSpans: subscribedSpans,
execCfg: execCfg,
mon: evalCtx.Mon,
mon: evalCtx.Planner.Mon(),
}, nil
}
7 changes: 4 additions & 3 deletions pkg/sql/apply_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (a *applyJoinNode) Next(params runParams) (bool, error) {
}

// clearRightRows clears rightRows and resets rightRowsIterator. This function
// must be called before reusing rightRows and rightRowIterator.
// must be called before reusing rightRows and rightRowsIterator.
func (a *applyJoinNode) clearRightRows(params runParams) error {
if err := a.run.rightRows.Clear(params.ctx); err != nil {
return err
Expand Down Expand Up @@ -265,6 +265,7 @@ func (a *applyJoinNode) runNextRightSideIteration(params runParams, leftRow tree
func runPlanInsidePlan(
ctx context.Context, params runParams, plan *planComponents, resultWriter rowResultWriter,
) error {
defer plan.close(ctx)
recv := MakeDistSQLReceiver(
ctx, resultWriter, tree.Rows,
params.ExecCfg().RangeDescriptorCache,
Expand Down Expand Up @@ -299,7 +300,7 @@ func runPlanInsidePlan(
// Create a separate memory account for the results of the subqueries.
// Note that we intentionally defer the closure of the account until we
// return from this method (after the main query is executed).
subqueryResultMemAcc := params.p.EvalContext().Mon.MakeBoundAccount()
subqueryResultMemAcc := params.p.Mon().MakeBoundAccount()
defer subqueryResultMemAcc.Close(ctx)
if !params.p.extendedEvalCtx.ExecCfg.DistSQLPlanner.PlanAndRunSubqueries(
ctx,
Expand Down Expand Up @@ -332,7 +333,7 @@ func runPlanInsidePlan(

params.p.extendedEvalCtx.ExecCfg.DistSQLPlanner.PlanAndRun(
ctx, evalCtx, planCtx, params.p.Txn(), plan.main, recv,
)()
)
return resultWriter.Err()
}

Expand Down
14 changes: 6 additions & 8 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -1212,7 +1212,7 @@ func (sc *SchemaChanger) distIndexBackfill(
nil, /* txn - the processors manage their own transactions */
p, recv, &evalCtxCopy,
nil, /* finishedSetupFn */
)()
)
return cbw.Err()
})

Expand Down Expand Up @@ -1351,7 +1351,7 @@ func (sc *SchemaChanger) distColumnBackfill(
nil, /* txn - the processors manage their own transactions */
plan, recv, &evalCtx,
nil, /* finishedSetupFn */
)()
)
return cbw.Err()
}); err != nil {
return err
Expand Down Expand Up @@ -2673,9 +2673,8 @@ func columnBackfillInTxn(
return nil
}
var columnBackfillerMon *mon.BytesMonitor
// This is the planner's memory monitor.
if evalCtx.Mon != nil {
columnBackfillerMon = execinfra.NewMonitor(ctx, evalCtx.Mon, "local-column-backfill-mon")
if evalCtx.Planner.Mon() != nil {
columnBackfillerMon = execinfra.NewMonitor(ctx, evalCtx.Planner.Mon(), "local-column-backfill-mon")
}

rowMetrics := execCfg.GetRowMetrics(evalCtx.SessionData().Internal)
Expand Down Expand Up @@ -2717,9 +2716,8 @@ func indexBackfillInTxn(
traceKV bool,
) error {
var indexBackfillerMon *mon.BytesMonitor
// This is the planner's memory monitor.
if evalCtx.Mon != nil {
indexBackfillerMon = execinfra.NewMonitor(ctx, evalCtx.Mon, "local-index-backfill-mon")
if evalCtx.Planner.Mon() != nil {
indexBackfillerMon = execinfra.NewMonitor(ctx, evalCtx.Planner.Mon(), "local-index-backfill-mon")
}

var backfiller backfill.IndexBackfiller
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/buffer_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (c *rowContainerHelper) initMonitors(
) {
distSQLCfg := &evalContext.DistSQLPlanner.distSQLSrv.ServerConfig
c.memMonitor = execinfra.NewLimitedMonitorNoFlowCtx(
ctx, evalContext.Mon, distSQLCfg, evalContext.SessionData(),
ctx, evalContext.Planner.Mon(), distSQLCfg, evalContext.SessionData(),
redact.Sprintf("%s-limited", opName),
)
c.diskMonitor = execinfra.NewMonitor(
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/aggregators_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,7 @@ func benchmarkAggregateFunction(
ctx := context.Background()
evalCtx := eval.MakeTestingEvalContext(cluster.MakeTestingClusterSettings())
defer evalCtx.Stop(ctx)
aggMemAcc := evalCtx.Mon.MakeBoundAccount()
aggMemAcc := evalCtx.TestingMon.MakeBoundAccount()
defer aggMemAcc.Close(ctx)
evalCtx.SingleDatumAggMemAccount = &aggMemAcc
const bytesFixedLength = 8
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colexec/and_or_projection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ func TestAndOrOps(t *testing.T) {
defer evalCtx.Stop(ctx)
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Mon: evalCtx.TestingMon,
Cfg: &execinfra.ServerConfig{
Settings: st,
},
Expand Down Expand Up @@ -228,6 +229,7 @@ func benchmarkLogicalProjOp(
defer evalCtx.Stop(ctx)
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Mon: evalCtx.TestingMon,
Cfg: &execinfra.ServerConfig{
Settings: st,
},
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colexec/builtin_funcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func TestBasicBuiltinFunctions(t *testing.T) {
defer evalCtx.Stop(ctx)
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Mon: evalCtx.TestingMon,
Cfg: &execinfra.ServerConfig{
Settings: st,
},
Expand Down Expand Up @@ -129,6 +130,7 @@ func benchmarkBuiltinFunctions(b *testing.B, useSelectionVector bool, hasNulls b
defer evalCtx.Stop(ctx)
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Mon: evalCtx.TestingMon,
Cfg: &execinfra.ServerConfig{
Settings: st,
},
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colexec/case_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestCaseOp(t *testing.T) {
defer evalCtx.Stop(ctx)
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Mon: evalCtx.TestingMon,
Cfg: &execinfra.ServerConfig{
Settings: st,
},
Expand Down Expand Up @@ -106,6 +107,7 @@ func TestCaseOpRandomized(t *testing.T) {
defer evalCtx.Stop(ctx)
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Mon: evalCtx.TestingMon,
Cfg: &execinfra.ServerConfig{
Settings: st,
},
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colexec/coalesce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestCoalesceBasic(t *testing.T) {
defer evalCtx.Stop(ctx)
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Mon: evalCtx.TestingMon,
Cfg: &execinfra.ServerConfig{
Settings: st,
},
Expand Down Expand Up @@ -107,6 +108,7 @@ func TestCoalesceRandomized(t *testing.T) {
defer evalCtx.Stop(ctx)
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Mon: evalCtx.TestingMon,
Cfg: &execinfra.ServerConfig{
Settings: st,
},
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/colexec/colbuilder/execplan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,15 @@ func TestNewColOperatorExpectedTypeSchema(t *testing.T) {
txn := kv.NewTxn(ctx, s.DB(), s.NodeID())
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Mon: evalCtx.TestingMon,
Cfg: &execinfra.ServerConfig{
Settings: st,
},
Txn: txn,
NodeID: evalCtx.NodeID,
}

streamingMemAcc := evalCtx.Mon.MakeBoundAccount()
streamingMemAcc := evalCtx.TestingMon.MakeBoundAccount()
defer streamingMemAcc.Close(ctx)

desc := desctestutils.TestingGetPublicTableDescriptor(kvDB, keys.SystemSQLCodec, "test", "t")
Expand Down
10 changes: 5 additions & 5 deletions pkg/sql/colexec/colexecargs/monitor_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (r *MonitorRegistry) GetMonitors() []*mon.BytesMonitor {
// NewStreamingMemAccount creates a new memory account bound to the monitor in
// flowCtx.
func (r *MonitorRegistry) NewStreamingMemAccount(flowCtx *execinfra.FlowCtx) *mon.BoundAccount {
streamingMemAccount := flowCtx.EvalCtx.Mon.MakeBoundAccount()
streamingMemAccount := flowCtx.Mon.MakeBoundAccount()
r.accounts = append(r.accounts, &streamingMemAccount)
return &streamingMemAccount
}
Expand All @@ -63,7 +63,7 @@ func (r *MonitorRegistry) CreateMemAccountForSpillStrategy(
) (*mon.BoundAccount, redact.RedactableString) {
monitorName := r.getMemMonitorName(opName, processorID, "limited" /* suffix */)
bufferingOpMemMonitor := execinfra.NewLimitedMonitor(
ctx, flowCtx.EvalCtx.Mon, flowCtx, monitorName,
ctx, flowCtx.Mon, flowCtx, monitorName,
)
r.monitors = append(r.monitors, bufferingOpMemMonitor)
bufferingMemAccount := bufferingOpMemMonitor.MakeBoundAccount()
Expand All @@ -90,8 +90,8 @@ func (r *MonitorRegistry) CreateMemAccountForSpillStrategyWithLimit(
}
}
monitorName := r.getMemMonitorName(opName, processorID, "limited" /* suffix */)
bufferingOpMemMonitor := mon.NewMonitorInheritWithLimit(monitorName, limit, flowCtx.EvalCtx.Mon)
bufferingOpMemMonitor.StartNoReserved(ctx, flowCtx.EvalCtx.Mon)
bufferingOpMemMonitor := mon.NewMonitorInheritWithLimit(monitorName, limit, flowCtx.Mon)
bufferingOpMemMonitor.StartNoReserved(ctx, flowCtx.Mon)
r.monitors = append(r.monitors, bufferingOpMemMonitor)
bufferingMemAccount := bufferingOpMemMonitor.MakeBoundAccount()
r.accounts = append(r.accounts, &bufferingMemAccount)
Expand Down Expand Up @@ -163,7 +163,7 @@ func (r *MonitorRegistry) createUnlimitedMemAccounts(
numAccounts int,
) (*mon.BytesMonitor, []*mon.BoundAccount) {
bufferingOpUnlimitedMemMonitor := execinfra.NewMonitor(
ctx, flowCtx.EvalCtx.Mon, monitorName,
ctx, flowCtx.Mon, monitorName,
)
r.monitors = append(r.monitors, bufferingOpUnlimitedMemMonitor)
oldLen := len(r.accounts)
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colexec/colexecbase/const_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func TestConst(t *testing.T) {
defer evalCtx.Stop(ctx)
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Mon: evalCtx.TestingMon,
Cfg: &execinfra.ServerConfig{
Settings: st,
},
Expand Down Expand Up @@ -70,6 +71,7 @@ func TestConstNull(t *testing.T) {
defer evalCtx.Stop(ctx)
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Mon: evalCtx.TestingMon,
Cfg: &execinfra.ServerConfig{
Settings: st,
},
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colexec/colexecbase/ordinality_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestOrdinality(t *testing.T) {
defer evalCtx.Stop(ctx)
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Mon: evalCtx.TestingMon,
Cfg: &execinfra.ServerConfig{
Settings: st,
},
Expand Down Expand Up @@ -83,6 +84,7 @@ func BenchmarkOrdinality(b *testing.B) {
defer evalCtx.Stop(ctx)
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Mon: evalCtx.TestingMon,
Cfg: &execinfra.ServerConfig{
Settings: st,
},
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecdisk/external_sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func TestExternalSortMemoryAccounting(t *testing.T) {
defer evalCtx.Stop(ctx)
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Mon: evalCtx.TestingMon,
Cfg: &execinfra.ServerConfig{
Settings: st,
},
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colexec/colexecproj/default_cmp_op_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestDefaultCmpProjOps(t *testing.T) {
defer evalCtx.Stop(ctx)
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Mon: evalCtx.TestingMon,
Cfg: &execinfra.ServerConfig{
Settings: st,
},
Expand Down Expand Up @@ -127,6 +128,7 @@ func BenchmarkDefaultCmpProjOp(b *testing.B) {
defer evalCtx.Stop(ctx)
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Mon: evalCtx.TestingMon,
Cfg: &execinfra.ServerConfig{
Settings: st,
},
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/colexec/colexecproj/projection_ops_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func TestProjPlusInt64Int64ConstOp(t *testing.T) {
defer evalCtx.Stop(ctx)
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Mon: evalCtx.TestingMon,
Cfg: &execinfra.ServerConfig{
Settings: st,
},
Expand All @@ -68,6 +69,7 @@ func TestProjPlusInt64Int64Op(t *testing.T) {
defer evalCtx.Stop(ctx)
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Mon: evalCtx.TestingMon,
Cfg: &execinfra.ServerConfig{
Settings: st,
},
Expand All @@ -90,6 +92,7 @@ func TestProjDivFloat64Float64Op(t *testing.T) {
defer evalCtx.Stop(ctx)
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Mon: evalCtx.TestingMon,
Cfg: &execinfra.ServerConfig{
Settings: st,
},
Expand All @@ -115,6 +118,7 @@ func TestRandomComparisons(t *testing.T) {
defer evalCtx.Stop(ctx)
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Mon: evalCtx.TestingMon,
Cfg: &execinfra.ServerConfig{
Settings: st,
},
Expand Down Expand Up @@ -292,6 +296,7 @@ func BenchmarkProjOp(b *testing.B) {
defer evalCtx.Stop(ctx)
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Mon: evalCtx.TestingMon,
Cfg: &execinfra.ServerConfig{
Settings: st,
},
Expand Down
Loading

0 comments on commit 961e66f

Please sign in to comment.