diff --git a/pkg/ccl/backupccl/backup_processor_planning.go b/pkg/ccl/backupccl/backup_processor_planning.go
index 2e989efa4a6c..a93e691b3a07 100644
--- a/pkg/ccl/backupccl/backup_processor_planning.go
+++ b/pkg/ccl/backupccl/backup_processor_planning.go
@@ -209,6 +209,6 @@ func distBackup(
 	defer close(progCh)
 	// Copy the evalCtx, as dsp.Run() might change it.
 	evalCtxCopy := *evalCtx
-	dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, nil /* finishedSetupFn */)()
+	dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, nil /* finishedSetupFn */)
 	return rowResultWriter.Err()
 }
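
Note: the dropped trailing `()` here, and at every `dsp.Run(...)` call site below, follows from a signature change to `DistSQLPlanner.Run` (see the `pkg/sql/distsql_running.go` hunks further down): `Run` no longer returns a cleanup function and instead cleans the flow up itself via `defer` before returning. A minimal sketch of the shape of that change, using stand-in types rather than the real planner:

    package main

    import "fmt"

    // flow stands in for flowinfra.Flow; only cleanup matters here.
    type flow struct{ name string }

    func (f *flow) run()     { fmt.Println("running", f.name) }
    func (f *flow) cleanup() { fmt.Println("cleaning up", f.name) }

    // runOld mirrors the old contract: the caller receives a cleanup func
    // and must remember the trailing () -- hence dsp.Run(...)().
    func runOld() (cleanup func()) {
    	f := &flow{name: "old"}
    	f.run()
    	return f.cleanup
    }

    // runNew mirrors the new contract: Run owns the flow's lifetime and
    // cleans it up before returning, so call sites drop the trailing ().
    func runNew() {
    	f := &flow{name: "new"}
    	defer f.cleanup()
    	f.run()
    }

    func main() {
    	runOld()() // old call shape
    	runNew()   // new call shape
    }
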
diff --git a/pkg/ccl/backupccl/restore_processor_planning.go b/pkg/ccl/backupccl/restore_processor_planning.go
index 3b4880170a8a..73e9bc6dc004 100644
--- a/pkg/ccl/backupccl/restore_processor_planning.go
+++ b/pkg/ccl/backupccl/restore_processor_planning.go
@@ -278,7 +278,7 @@ func distRestore(
 		// Copy the evalCtx, as dsp.Run() might change it.
 		evalCtxCopy := *evalCtx
-		dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, nil /* finishedSetupFn */)()
+		dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, nil /* finishedSetupFn */)
 		return rowResultWriter.Err()
 	})
diff --git a/pkg/ccl/backupccl/split_and_scatter_processor_test.go b/pkg/ccl/backupccl/split_and_scatter_processor_test.go
index 8c459cd8a42e..6f51ccf1df38 100644
--- a/pkg/ccl/backupccl/split_and_scatter_processor_test.go
+++ b/pkg/ccl/backupccl/split_and_scatter_processor_test.go
@@ -237,6 +237,7 @@ func TestSplitAndScatterProcessor(t *testing.T) {
 				Stopper: tc.Stopper(),
 			},
 			EvalCtx:     &evalCtx,
+			Mon:         evalCtx.TestingMon,
 			DiskMonitor: testDiskMonitor,
 		}
diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go
index 3952e4729121..fcc431e48c1d 100644
--- a/pkg/ccl/changefeedccl/changefeed_dist.go
+++ b/pkg/ccl/changefeedccl/changefeed_dist.go
@@ -300,7 +300,7 @@ func startDistChangefeed(
 		// Copy the evalCtx, as dsp.Run() might change it.
 		evalCtxCopy := *evalCtx
 		// p is the physical plan, recv is the distsqlreceiver
-		dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, finishedSetupFn)()
+		dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, finishedSetupFn)
 		return resultRows.Err()
 	}
diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go
index 85fc7a163964..97fc22fc3fa4 100644
--- a/pkg/ccl/changefeedccl/changefeed_processors.go
+++ b/pkg/ccl/changefeedccl/changefeed_processors.go
@@ -133,7 +133,7 @@ func newChangeAggregatorProcessor(
 	post *execinfrapb.PostProcessSpec,
 	output execinfra.RowReceiver,
 ) (execinfra.Processor, error) {
-	memMonitor := execinfra.NewMonitor(ctx, flowCtx.EvalCtx.Mon, "changeagg-mem")
+	memMonitor := execinfra.NewMonitor(ctx, flowCtx.Mon, "changeagg-mem")
 	ca := &changeAggregator{
 		flowCtx: flowCtx,
 		spec:    spec,
@@ -822,7 +822,7 @@ func newChangeFrontierProcessor(
 	post *execinfrapb.PostProcessSpec,
 	output execinfra.RowReceiver,
 ) (execinfra.Processor, error) {
-	memMonitor := execinfra.NewMonitor(ctx, flowCtx.EvalCtx.Mon, "changefntr-mem")
+	memMonitor := execinfra.NewMonitor(ctx, flowCtx.Mon, "changefntr-mem")
 	sf, err := makeSchemaChangeFrontier(hlc.Timestamp{}, spec.TrackedSpans...)
 	if err != nil {
 		return nil, err
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go
index 18f10045f088..2bdde050b156 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go
@@ -77,6 +77,7 @@ func TestStreamIngestionFrontierProcessor(t *testing.T) {
 			BulkSenderLimiter: limit.MakeConcurrentRequestLimiter("test", math.MaxInt),
 		},
 		EvalCtx:     &evalCtx,
+		Mon:         evalCtx.TestingMon,
 		DiskMonitor: testDiskMonitor,
 	}
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go
index 35eac67df053..83ef0b2885b8 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go
@@ -154,6 +154,6 @@ func distStreamIngest(
 	// Copy the evalCtx, as dsp.Run() might change it.
 	evalCtxCopy := *evalCtx
-	dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, nil /* finishedSetupFn */)()
+	dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, nil /* finishedSetupFn */)
 	return rw.Err()
 }
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go
index efa8a3b03d5e..a04cb9f25eee 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go
@@ -602,6 +602,7 @@ func getStreamIngestionProcessor(
 			BulkSenderLimiter: limit.MakeConcurrentRequestLimiter("test", math.MaxInt),
 		},
 		EvalCtx:     &evalCtx,
+		Mon:         evalCtx.TestingMon,
 		DiskMonitor: testDiskMonitor,
 	}
diff --git a/pkg/ccl/streamingccl/streamproducer/event_stream.go b/pkg/ccl/streamingccl/streamproducer/event_stream.go
index 568d6c0528cb..a6aaf392b069 100644
--- a/pkg/ccl/streamingccl/streamproducer/event_stream.go
+++ b/pkg/ccl/streamingccl/streamproducer/event_stream.go
@@ -543,6 +543,6 @@ func streamPartition(
 		spec:            spec,
 		subscribedSpans: subscribedSpans,
 		execCfg:         execCfg,
-		mon:             evalCtx.Mon,
+		mon:             evalCtx.Planner.Mon(),
 	}, nil
 }
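
Note: `evalCtx.Mon` is going away as a field on the eval context; consumers like `streamPartition` above now reach the planner's monitor through the `eval.Planner` interface, which (per the `faketreeeval` hunks below) grows a `Mon()` method. A rough sketch of that indirection, with simplified stand-in types:

    package main

    import "fmt"

    // bytesMonitor stands in for util/mon.BytesMonitor.
    type bytesMonitor struct{ name string }

    // planner mirrors the relevant slice of eval.Planner: the monitor is
    // reached through the planner instead of living on the eval context.
    type planner interface {
    	Mon() *bytesMonitor
    }

    // dummyPlanner mirrors faketreeeval.DummyEvalPlanner, which now carries
    // a Monitor field so flows without a real planner still have one.
    type dummyPlanner struct{ Monitor *bytesMonitor }

    func (p *dummyPlanner) Mon() *bytesMonitor { return p.Monitor }

    type evalContext struct {
    	Planner planner // no Mon field anymore
    }

    func main() {
    	evalCtx := evalContext{Planner: &dummyPlanner{Monitor: &bytesMonitor{name: "flow"}}}
    	// Call sites that used to write evalCtx.Mon now write:
    	fmt.Println(evalCtx.Planner.Mon().name)
    }
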
diff --git a/pkg/sql/apply_join.go b/pkg/sql/apply_join.go
index f7d76f070bd4..bec0225d8db4 100644
--- a/pkg/sql/apply_join.go
+++ b/pkg/sql/apply_join.go
@@ -228,7 +228,7 @@ func (a *applyJoinNode) Next(params runParams) (bool, error) {
 }
 
 // clearRightRows clears rightRows and resets rightRowsIterator. This function
-// must be called before reusing rightRows and rightRowIterator.
+// must be called before reusing rightRows and rightRowsIterator.
 func (a *applyJoinNode) clearRightRows(params runParams) error {
 	if err := a.run.rightRows.Clear(params.ctx); err != nil {
 		return err
@@ -265,6 +265,7 @@ func (a *applyJoinNode) runNextRightSideIteration(params runParams, leftRow tree
 func runPlanInsidePlan(
 	ctx context.Context, params runParams, plan *planComponents, resultWriter rowResultWriter,
 ) error {
+	defer plan.close(ctx)
 	recv := MakeDistSQLReceiver(
 		ctx, resultWriter, tree.Rows,
 		params.ExecCfg().RangeDescriptorCache,
@@ -299,7 +300,7 @@ func runPlanInsidePlan(
 		// Create a separate memory account for the results of the subqueries.
 		// Note that we intentionally defer the closure of the account until we
 		// return from this method (after the main query is executed).
-		subqueryResultMemAcc := params.p.EvalContext().Mon.MakeBoundAccount()
+		subqueryResultMemAcc := params.p.Mon().MakeBoundAccount()
 		defer subqueryResultMemAcc.Close(ctx)
 		if !params.p.extendedEvalCtx.ExecCfg.DistSQLPlanner.PlanAndRunSubqueries(
 			ctx,
@@ -332,7 +333,7 @@ func runPlanInsidePlan(
 	params.p.extendedEvalCtx.ExecCfg.DistSQLPlanner.PlanAndRun(
 		ctx, evalCtx, planCtx, params.p.Txn(), plan.main, recv,
-	)()
+	)
 	return resultWriter.Err()
 }
diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go
index 1c806d6e9bc4..4c7195a2a517 100644
--- a/pkg/sql/backfill.go
+++ b/pkg/sql/backfill.go
@@ -1212,7 +1212,7 @@ func (sc *SchemaChanger) distIndexBackfill(
 			nil, /* txn - the processors manage their own transactions */
 			p, recv, &evalCtxCopy,
 			nil, /* finishedSetupFn */
-		)()
+		)
 		return cbw.Err()
 	})
@@ -1351,7 +1351,7 @@ func (sc *SchemaChanger) distColumnBackfill(
 			nil, /* txn - the processors manage their own transactions */
 			plan, recv, &evalCtx,
 			nil, /* finishedSetupFn */
-		)()
+		)
 		return cbw.Err()
 	}); err != nil {
 		return err
@@ -2673,9 +2673,8 @@ func columnBackfillInTxn(
 		return nil
 	}
 	var columnBackfillerMon *mon.BytesMonitor
-	// This is the planner's memory monitor.
-	if evalCtx.Mon != nil {
-		columnBackfillerMon = execinfra.NewMonitor(ctx, evalCtx.Mon, "local-column-backfill-mon")
+	if evalCtx.Planner.Mon() != nil {
+		columnBackfillerMon = execinfra.NewMonitor(ctx, evalCtx.Planner.Mon(), "local-column-backfill-mon")
 	}
 
 	rowMetrics := execCfg.GetRowMetrics(evalCtx.SessionData().Internal)
@@ -2717,9 +2716,8 @@ func indexBackfillInTxn(
 	traceKV bool,
 ) error {
 	var indexBackfillerMon *mon.BytesMonitor
-	// This is the planner's memory monitor.
-	if evalCtx.Mon != nil {
-		indexBackfillerMon = execinfra.NewMonitor(ctx, evalCtx.Mon, "local-index-backfill-mon")
+	if evalCtx.Planner.Mon() != nil {
+		indexBackfillerMon = execinfra.NewMonitor(ctx, evalCtx.Planner.Mon(), "local-index-backfill-mon")
 	}
 
 	var backfiller backfill.IndexBackfiller
diff --git a/pkg/sql/buffer_util.go b/pkg/sql/buffer_util.go
index 0c9212c76323..d8f29d347d1d 100644
--- a/pkg/sql/buffer_util.go
+++ b/pkg/sql/buffer_util.go
@@ -82,7 +82,7 @@ func (c *rowContainerHelper) initMonitors(
 ) {
 	distSQLCfg := &evalContext.DistSQLPlanner.distSQLSrv.ServerConfig
 	c.memMonitor = execinfra.NewLimitedMonitorNoFlowCtx(
-		ctx, evalContext.Mon, distSQLCfg, evalContext.SessionData(),
+		ctx, evalContext.Planner.Mon(), distSQLCfg, evalContext.SessionData(),
 		redact.Sprintf("%s-limited", opName),
 	)
 	c.diskMonitor = execinfra.NewMonitor(
diff --git a/pkg/sql/colexec/aggregators_test.go b/pkg/sql/colexec/aggregators_test.go
index 7d7d816e07e1..0c3cb90fb68c 100644
--- a/pkg/sql/colexec/aggregators_test.go
+++ b/pkg/sql/colexec/aggregators_test.go
@@ -1003,7 +1003,7 @@ func benchmarkAggregateFunction(
 	ctx := context.Background()
 	evalCtx := eval.MakeTestingEvalContext(cluster.MakeTestingClusterSettings())
 	defer evalCtx.Stop(ctx)
-	aggMemAcc := evalCtx.Mon.MakeBoundAccount()
+	aggMemAcc := evalCtx.TestingMon.MakeBoundAccount()
 	defer aggMemAcc.Close(ctx)
 	evalCtx.SingleDatumAggMemAccount = &aggMemAcc
 	const bytesFixedLength = 8
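
Note: tests built on `eval.MakeTestingEvalContext` lose `evalCtx.Mon` too; the diff consistently switches them to what appears to be a test-only `TestingMon` monitor on the eval context, both for bound accounts (as in `benchmarkAggregateFunction` above) and for the new `FlowCtx.Mon` field in the many `flowCtx` literals below. The recurring setup shape, sketched with stand-in types:

    package main

    import "fmt"

    // bytesMonitor and testEvalContext stand in for mon.BytesMonitor and
    // eval.Context; TestingMon mirrors the test-only monitor this diff uses.
    type bytesMonitor struct{ name string }

    type testEvalContext struct{ TestingMon *bytesMonitor }

    // flowCtx mirrors the relevant slice of execinfra.FlowCtx.
    type flowCtx struct {
    	EvalCtx *testEvalContext
    	Mon     *bytesMonitor // new field: the flow's own monitor
    }

    func newTestFlowCtx(evalCtx *testEvalContext) *flowCtx {
    	return &flowCtx{
    		EvalCtx: evalCtx,
    		// Tests parent flow-level allocations on the testing monitor now
    		// that the eval context no longer exposes Mon directly.
    		Mon: evalCtx.TestingMon,
    	}
    }

    func main() {
    	evalCtx := &testEvalContext{TestingMon: &bytesMonitor{name: "test-mon"}}
    	fmt.Println(newTestFlowCtx(evalCtx).Mon.name)
    }
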
diff --git a/pkg/sql/colexec/and_or_projection_test.go b/pkg/sql/colexec/and_or_projection_test.go
index 198a1c1d2e78..456d82a6a4c0 100644
--- a/pkg/sql/colexec/and_or_projection_test.go
+++ b/pkg/sql/colexec/and_or_projection_test.go
@@ -167,6 +167,7 @@ func TestAndOrOps(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
@@ -228,6 +229,7 @@ func benchmarkLogicalProjOp(
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
diff --git a/pkg/sql/colexec/builtin_funcs_test.go b/pkg/sql/colexec/builtin_funcs_test.go
index 15b1db776935..40f8ef584b63 100644
--- a/pkg/sql/colexec/builtin_funcs_test.go
+++ b/pkg/sql/colexec/builtin_funcs_test.go
@@ -44,6 +44,7 @@ func TestBasicBuiltinFunctions(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
@@ -129,6 +130,7 @@ func benchmarkBuiltinFunctions(b *testing.B, useSelectionVector bool, hasNulls b
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
diff --git a/pkg/sql/colexec/case_test.go b/pkg/sql/colexec/case_test.go
index 7cf1b6afe320..6b26124d2317 100644
--- a/pkg/sql/colexec/case_test.go
+++ b/pkg/sql/colexec/case_test.go
@@ -40,6 +40,7 @@ func TestCaseOp(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
@@ -106,6 +107,7 @@ func TestCaseOpRandomized(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
diff --git a/pkg/sql/colexec/coalesce_test.go b/pkg/sql/colexec/coalesce_test.go
index 1626942d0a83..bae0a6e260cf 100644
--- a/pkg/sql/colexec/coalesce_test.go
+++ b/pkg/sql/colexec/coalesce_test.go
@@ -40,6 +40,7 @@ func TestCoalesceBasic(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
@@ -107,6 +108,7 @@ func TestCoalesceRandomized(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
diff --git a/pkg/sql/colexec/colbuilder/execplan_test.go b/pkg/sql/colexec/colbuilder/execplan_test.go
index abc7b4e49225..aaf9e9042539 100644
--- a/pkg/sql/colexec/colbuilder/execplan_test.go
+++ b/pkg/sql/colexec/colbuilder/execplan_test.go
@@ -77,6 +77,7 @@ func TestNewColOperatorExpectedTypeSchema(t *testing.T) {
 	txn := kv.NewTxn(ctx, s.DB(), s.NodeID())
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
@@ -84,7 +85,7 @@ func TestNewColOperatorExpectedTypeSchema(t *testing.T) {
 		NodeID: evalCtx.NodeID,
 	}
 
-	streamingMemAcc := evalCtx.Mon.MakeBoundAccount()
+	streamingMemAcc := evalCtx.TestingMon.MakeBoundAccount()
 	defer streamingMemAcc.Close(ctx)
 
 	desc := desctestutils.TestingGetPublicTableDescriptor(kvDB, keys.SystemSQLCodec, "test", "t")
diff --git a/pkg/sql/colexec/colexecargs/monitor_registry.go b/pkg/sql/colexec/colexecargs/monitor_registry.go
index 9b15f0494f9d..14533c52aeaf 100644
--- a/pkg/sql/colexec/colexecargs/monitor_registry.go
+++ b/pkg/sql/colexec/colexecargs/monitor_registry.go
@@ -36,7 +36,7 @@ func (r *MonitorRegistry) GetMonitors() []*mon.BytesMonitor {
 // NewStreamingMemAccount creates a new memory account bound to the monitor in
 // flowCtx.
 func (r *MonitorRegistry) NewStreamingMemAccount(flowCtx *execinfra.FlowCtx) *mon.BoundAccount {
-	streamingMemAccount := flowCtx.EvalCtx.Mon.MakeBoundAccount()
+	streamingMemAccount := flowCtx.Mon.MakeBoundAccount()
 	r.accounts = append(r.accounts, &streamingMemAccount)
 	return &streamingMemAccount
 }
@@ -63,7 +63,7 @@ func (r *MonitorRegistry) CreateMemAccountForSpillStrategy(
 ) (*mon.BoundAccount, redact.RedactableString) {
 	monitorName := r.getMemMonitorName(opName, processorID, "limited" /* suffix */)
 	bufferingOpMemMonitor := execinfra.NewLimitedMonitor(
-		ctx, flowCtx.EvalCtx.Mon, flowCtx, monitorName,
+		ctx, flowCtx.Mon, flowCtx, monitorName,
 	)
 	r.monitors = append(r.monitors, bufferingOpMemMonitor)
 	bufferingMemAccount := bufferingOpMemMonitor.MakeBoundAccount()
@@ -90,8 +90,8 @@ func (r *MonitorRegistry) CreateMemAccountForSpillStrategyWithLimit(
 		}
 	}
 	monitorName := r.getMemMonitorName(opName, processorID, "limited" /* suffix */)
-	bufferingOpMemMonitor := mon.NewMonitorInheritWithLimit(monitorName, limit, flowCtx.EvalCtx.Mon)
-	bufferingOpMemMonitor.StartNoReserved(ctx, flowCtx.EvalCtx.Mon)
+	bufferingOpMemMonitor := mon.NewMonitorInheritWithLimit(monitorName, limit, flowCtx.Mon)
+	bufferingOpMemMonitor.StartNoReserved(ctx, flowCtx.Mon)
 	r.monitors = append(r.monitors, bufferingOpMemMonitor)
 	bufferingMemAccount := bufferingOpMemMonitor.MakeBoundAccount()
 	r.accounts = append(r.accounts, &bufferingMemAccount)
@@ -163,7 +163,7 @@ func (r *MonitorRegistry) createUnlimitedMemAccounts(
 	numAccounts int,
 ) (*mon.BytesMonitor, []*mon.BoundAccount) {
 	bufferingOpUnlimitedMemMonitor := execinfra.NewMonitor(
-		ctx, flowCtx.EvalCtx.Mon, monitorName,
+		ctx, flowCtx.Mon, monitorName,
 	)
 	r.monitors = append(r.monitors, bufferingOpUnlimitedMemMonitor)
 	oldLen := len(r.accounts)
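
Note: all four registry helpers above now parent on `flowCtx.Mon` instead of digging through the eval context; the registry's job is unchanged: create monitors and accounts on demand and release them together. A simplified sketch of that bookkeeping pattern (stand-in types, not the real `colexecargs` API):

    package main

    import "fmt"

    // monitor and account stand in for mon.BytesMonitor and mon.BoundAccount.
    type monitor struct{ name string }

    func (m *monitor) stop() { fmt.Println("stopped", m.name) }

    type account struct{ parent *monitor }

    func (a *account) close() { fmt.Println("closed account on", a.parent.name) }

    // monitorRegistry mirrors the shape of colexecargs.MonitorRegistry: it
    // records everything it hands out so a single close releases it all.
    type monitorRegistry struct {
    	monitors []*monitor
    	accounts []*account
    }

    // newStreamingAccount parents the account on the flow's monitor, the
    // analogue of flowCtx.Mon in this diff.
    func (r *monitorRegistry) newStreamingAccount(flowMon *monitor) *account {
    	acc := &account{parent: flowMon}
    	r.accounts = append(r.accounts, acc)
    	return acc
    }

    func (r *monitorRegistry) close() {
    	for _, a := range r.accounts {
    		a.close()
    	}
    	for _, m := range r.monitors {
    		m.stop()
    	}
    }

    func main() {
    	flowMon := &monitor{name: "flow"}
    	var reg monitorRegistry
    	defer reg.close()
    	_ = reg.newStreamingAccount(flowMon)
    }
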
diff --git a/pkg/sql/colexec/colexecbase/const_test.go b/pkg/sql/colexec/colexecbase/const_test.go
index 8b3157ff7205..ca803db9204d 100644
--- a/pkg/sql/colexec/colexecbase/const_test.go
+++ b/pkg/sql/colexec/colexecbase/const_test.go
@@ -33,6 +33,7 @@ func TestConst(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
@@ -70,6 +71,7 @@ func TestConstNull(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
diff --git a/pkg/sql/colexec/colexecbase/ordinality_test.go b/pkg/sql/colexec/colexecbase/ordinality_test.go
index 90d81e8d4503..48bfebdf283e 100644
--- a/pkg/sql/colexec/colexecbase/ordinality_test.go
+++ b/pkg/sql/colexec/colexecbase/ordinality_test.go
@@ -37,6 +37,7 @@ func TestOrdinality(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
@@ -83,6 +84,7 @@ func BenchmarkOrdinality(b *testing.B) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
diff --git a/pkg/sql/colexec/colexecdisk/external_sort_test.go b/pkg/sql/colexec/colexecdisk/external_sort_test.go
index 30456f55f95e..9b0128afefb0 100644
--- a/pkg/sql/colexec/colexecdisk/external_sort_test.go
+++ b/pkg/sql/colexec/colexecdisk/external_sort_test.go
@@ -56,6 +56,7 @@ func TestExternalSortMemoryAccounting(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
diff --git a/pkg/sql/colexec/colexecproj/default_cmp_op_test.go b/pkg/sql/colexec/colexecproj/default_cmp_op_test.go
index 57487efea3fe..4dbc1cabb19f 100644
--- a/pkg/sql/colexec/colexecproj/default_cmp_op_test.go
+++ b/pkg/sql/colexec/colexecproj/default_cmp_op_test.go
@@ -36,6 +36,7 @@ func TestDefaultCmpProjOps(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
@@ -127,6 +128,7 @@ func BenchmarkDefaultCmpProjOp(b *testing.B) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
diff --git a/pkg/sql/colexec/colexecproj/projection_ops_test.go b/pkg/sql/colexec/colexecproj/projection_ops_test.go
index 35f5453313a1..9cfd90b9f053 100644
--- a/pkg/sql/colexec/colexecproj/projection_ops_test.go
+++ b/pkg/sql/colexec/colexecproj/projection_ops_test.go
@@ -46,6 +46,7 @@ func TestProjPlusInt64Int64ConstOp(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
@@ -68,6 +69,7 @@ func TestProjPlusInt64Int64Op(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
@@ -90,6 +92,7 @@ func TestProjDivFloat64Float64Op(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
@@ -115,6 +118,7 @@ func TestRandomComparisons(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
@@ -292,6 +296,7 @@ func BenchmarkProjOp(b *testing.B) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
diff --git a/pkg/sql/colexec/colexecwindow/window_functions_test.go b/pkg/sql/colexec/colexecwindow/window_functions_test.go
index ee42805c7e97..0f8bada7c8be 100644
--- a/pkg/sql/colexec/colexecwindow/window_functions_test.go
+++ b/pkg/sql/colexec/colexecwindow/window_functions_test.go
@@ -53,6 +53,7 @@ func TestWindowFunctions(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
diff --git a/pkg/sql/colexec/columnarizer_test.go b/pkg/sql/colexec/columnarizer_test.go
index cbad992aa0dd..1367acda838c 100644
--- a/pkg/sql/colexec/columnarizer_test.go
+++ b/pkg/sql/colexec/columnarizer_test.go
@@ -49,6 +49,7 @@ func TestColumnarizerResetsInternalBatch(t *testing.T) {
 	flowCtx := &execinfra.FlowCtx{
 		Cfg:     &execinfra.ServerConfig{Settings: st},
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 	}
 
 	c := NewBufferingColumnarizerForTests(testAllocator, flowCtx, 0, input)
@@ -80,6 +81,7 @@ func TestColumnarizerDrainsAndClosesInput(t *testing.T) {
 	flowCtx := &execinfra.FlowCtx{
 		Cfg:     &execinfra.ServerConfig{Settings: st},
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 	}
 
 	for _, tc := range []struct {
@@ -139,6 +141,7 @@ func BenchmarkColumnarize(b *testing.B) {
 	flowCtx := &execinfra.FlowCtx{
 		Cfg:     &execinfra.ServerConfig{Settings: st},
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 	}
 
 	b.SetBytes(int64(nRows * nCols * int(memsize.Int64)))
diff --git a/pkg/sql/colexec/crossjoiner_test.go b/pkg/sql/colexec/crossjoiner_test.go
index 8b5bd983deb9..1cf541393307 100644
--- a/pkg/sql/colexec/crossjoiner_test.go
+++ b/pkg/sql/colexec/crossjoiner_test.go
@@ -373,6 +373,7 @@ func TestCrossJoiner(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
@@ -417,6 +418,7 @@ func BenchmarkCrossJoiner(b *testing.B) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
diff --git a/pkg/sql/colexec/default_agg_test.go b/pkg/sql/colexec/default_agg_test.go
index c3864ad74df0..d91a271a9f3f 100644
--- a/pkg/sql/colexec/default_agg_test.go
+++ b/pkg/sql/colexec/default_agg_test.go
@@ -133,7 +133,7 @@ func TestDefaultAggregateFunc(t *testing.T) {
 	evalCtx := eval.MakeTestingEvalContext(cluster.MakeTestingClusterSettings())
 	defer evalCtx.Stop(context.Background())
-	aggMemAcc := evalCtx.Mon.MakeBoundAccount()
+	aggMemAcc := evalCtx.TestingMon.MakeBoundAccount()
 	defer aggMemAcc.Close(context.Background())
 	evalCtx.SingleDatumAggMemAccount = &aggMemAcc
 	semaCtx := tree.MakeSemaContext()
diff --git a/pkg/sql/colexec/external_distinct_test.go b/pkg/sql/colexec/external_distinct_test.go
index 48f968b026a3..dd63203c891d 100644
--- a/pkg/sql/colexec/external_distinct_test.go
+++ b/pkg/sql/colexec/external_distinct_test.go
@@ -45,6 +45,7 @@ func TestExternalDistinct(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
@@ -110,6 +111,7 @@ func TestExternalDistinctSpilling(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
@@ -264,6 +266,7 @@ func BenchmarkExternalDistinct(b *testing.B) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
diff --git a/pkg/sql/colexec/external_hash_aggregator_test.go b/pkg/sql/colexec/external_hash_aggregator_test.go
index 527bf37f45e7..9dd784a694fc 100644
--- a/pkg/sql/colexec/external_hash_aggregator_test.go
+++ b/pkg/sql/colexec/external_hash_aggregator_test.go
@@ -43,6 +43,7 @@ func TestExternalHashAggregator(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
@@ -171,6 +172,7 @@ func BenchmarkExternalHashAggregator(b *testing.B) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
diff --git a/pkg/sql/colexec/external_hash_joiner_test.go b/pkg/sql/colexec/external_hash_joiner_test.go
index 6c5dc68a249a..88e434b9bcf3 100644
--- a/pkg/sql/colexec/external_hash_joiner_test.go
+++ b/pkg/sql/colexec/external_hash_joiner_test.go
@@ -44,6 +44,7 @@ func TestExternalHashJoiner(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
@@ -119,6 +120,7 @@ func TestExternalHashJoinerFallbackToSortMergeJoin(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 			TestingKnobs: execinfra.TestingKnobs{
@@ -198,6 +200,7 @@ func BenchmarkExternalHashJoiner(b *testing.B) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
diff --git a/pkg/sql/colexec/external_sort_test.go b/pkg/sql/colexec/external_sort_test.go
index 87a2e1905660..fc608c77f218 100644
--- a/pkg/sql/colexec/external_sort_test.go
+++ b/pkg/sql/colexec/external_sort_test.go
@@ -44,6 +44,7 @@ func TestExternalSort(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
@@ -126,6 +127,7 @@ func TestExternalSortRandomized(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
@@ -226,6 +228,7 @@ func BenchmarkExternalSort(b *testing.B) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
diff --git a/pkg/sql/colexec/hashjoiner_test.go b/pkg/sql/colexec/hashjoiner_test.go
index dc5bf14e826f..8fc5975caccf 100644
--- a/pkg/sql/colexec/hashjoiner_test.go
+++ b/pkg/sql/colexec/hashjoiner_test.go
@@ -1015,6 +1015,7 @@ func TestHashJoiner(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg:     &execinfra.ServerConfig{Settings: st},
 	}
 	var monitorRegistry colexecargs.MonitorRegistry
@@ -1141,6 +1142,7 @@ func TestHashJoinerProjection(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
diff --git a/pkg/sql/colexec/if_expr_test.go b/pkg/sql/colexec/if_expr_test.go
index c29b0d7f4032..9fc4d75fb2bc 100644
--- a/pkg/sql/colexec/if_expr_test.go
+++ b/pkg/sql/colexec/if_expr_test.go
@@ -39,6 +39,7 @@ func TestIfExprBasic(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
@@ -103,6 +104,7 @@ func TestIfExprRandomized(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
diff --git a/pkg/sql/colexec/is_null_ops_test.go b/pkg/sql/colexec/is_null_ops_test.go
index 540d3cd729fc..a108cc3d3967 100644
--- a/pkg/sql/colexec/is_null_ops_test.go
+++ b/pkg/sql/colexec/is_null_ops_test.go
@@ -36,6 +36,7 @@ func TestIsNullProjOp(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
@@ -142,6 +143,7 @@ func TestIsNullSelOp(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
diff --git a/pkg/sql/colexec/materializer_test.go b/pkg/sql/colexec/materializer_test.go
index 8043af20fa85..d655f018fb9c 100644
--- a/pkg/sql/colexec/materializer_test.go
+++ b/pkg/sql/colexec/materializer_test.go
@@ -56,6 +56,7 @@ func TestColumnarizeMaterialize(t *testing.T) {
 	flowCtx := &execinfra.FlowCtx{
 		Cfg:     &execinfra.ServerConfig{Settings: st},
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 	}
 
 	c := NewBufferingColumnarizerForTests(testAllocator, flowCtx, 0, input)
@@ -100,6 +101,7 @@ func BenchmarkMaterializer(b *testing.B) {
 	flowCtx := &execinfra.FlowCtx{
 		Cfg:     &execinfra.ServerConfig{Settings: st},
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 	}
 
 	rng, _ := randutil.NewTestRand()
@@ -183,6 +185,7 @@ func TestMaterializerNextErrorAfterConsumerDone(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 	}
 
 	m := NewMaterializer(
@@ -224,6 +227,7 @@ func BenchmarkColumnarizeMaterialize(b *testing.B) {
 	flowCtx := &execinfra.FlowCtx{
 		Cfg:     &execinfra.ServerConfig{Settings: st},
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 	}
 
 	c := NewBufferingColumnarizerForTests(testAllocator, flowCtx, 0, input)
diff --git a/pkg/sql/colexec/mergejoiner_test.go b/pkg/sql/colexec/mergejoiner_test.go
index 2b453bab1fac..67833f42e7e6 100644
--- a/pkg/sql/colexec/mergejoiner_test.go
+++ b/pkg/sql/colexec/mergejoiner_test.go
@@ -1663,6 +1663,7 @@ func TestMergeJoiner(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
diff --git a/pkg/sql/colexec/select_in_test.go b/pkg/sql/colexec/select_in_test.go
index 5aefb818f508..809f16ec655e 100644
--- a/pkg/sql/colexec/select_in_test.go
+++ b/pkg/sql/colexec/select_in_test.go
@@ -164,6 +164,7 @@ func TestProjectInInt64(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
diff --git a/pkg/sql/colexec/types_integration_test.go b/pkg/sql/colexec/types_integration_test.go
index 9161cf7059ee..42f917e32562 100644
--- a/pkg/sql/colexec/types_integration_test.go
+++ b/pkg/sql/colexec/types_integration_test.go
@@ -51,6 +51,7 @@ func TestSQLTypesIntegration(t *testing.T) {
 	defer diskMonitor.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
diff --git a/pkg/sql/colexec/values_test.go b/pkg/sql/colexec/values_test.go
index 1413bf117b90..27955d875008 100644
--- a/pkg/sql/colexec/values_test.go
+++ b/pkg/sql/colexec/values_test.go
@@ -136,6 +136,7 @@ func BenchmarkValues(b *testing.B) {
 	flowCtx := execinfra.FlowCtx{
 		Cfg:     &execinfra.ServerConfig{Settings: st},
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 	}
 
 	post := execinfrapb.PostProcessSpec{}
diff --git a/pkg/sql/colflow/colbatch_scan_test.go b/pkg/sql/colflow/colbatch_scan_test.go
index 944cf1431992..fb95dfb81295 100644
--- a/pkg/sql/colflow/colbatch_scan_test.go
+++ b/pkg/sql/colflow/colbatch_scan_test.go
@@ -66,6 +66,7 @@ func TestColBatchScanMeta(t *testing.T) {
 	leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.NodeID(), leafInputState)
 	flowCtx := execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings: st,
 		},
@@ -160,6 +161,7 @@ func BenchmarkColBatchScan(b *testing.B) {
 	flowCtx := execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg:     &execinfra.ServerConfig{Settings: s.ClusterSettings()},
 		Txn:     kv.NewTxn(ctx, s.DB(), s.NodeID()),
 		NodeID:  evalCtx.NodeID,
diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go
index 475b265910d3..c516e4d821f7 100644
--- a/pkg/sql/colflow/vectorized_flow.go
+++ b/pkg/sql/colflow/vectorized_flow.go
@@ -492,7 +492,7 @@ func (s *vectorizedFlowCreator) makeGetStatsFnForOutbox(
 		result = append(result, &execinfrapb.ComponentStats{
 			Component: execinfrapb.FlowComponentID(originSQLInstanceID, flowCtx.ID),
 			FlowStats: execinfrapb.FlowStats{
-				MaxMemUsage:  optional.MakeUint(uint64(flowCtx.EvalCtx.Mon.MaximumBytes())),
+				MaxMemUsage:  optional.MakeUint(uint64(flowCtx.Mon.MaximumBytes())),
 				MaxDiskUsage: optional.MakeUint(uint64(flowCtx.DiskMonitor.MaximumBytes())),
 			},
 		})
diff --git a/pkg/sql/colflow/vectorized_flow_shutdown_test.go b/pkg/sql/colflow/vectorized_flow_shutdown_test.go
index 38236cc0de3c..96e9a9e87252 100644
--- a/pkg/sql/colflow/vectorized_flow_shutdown_test.go
+++ b/pkg/sql/colflow/vectorized_flow_shutdown_test.go
@@ -154,6 +154,7 @@ func TestVectorizedFlowShutdown(t *testing.T) {
 	defer evalCtx.Stop(ctxLocal)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg:     &execinfra.ServerConfig{Settings: st},
 	}
 	rng, _ := randutil.NewTestRand()
diff --git a/pkg/sql/colflow/vectorized_flow_space_test.go b/pkg/sql/colflow/vectorized_flow_space_test.go
index da3382c5496a..ebe865aad305 100644
--- a/pkg/sql/colflow/vectorized_flow_space_test.go
+++ b/pkg/sql/colflow/vectorized_flow_space_test.go
@@ -45,6 +45,7 @@ func TestVectorizeInternalMemorySpaceError(t *testing.T) {
 		},
 		DiskMonitor: testDiskMonitor,
 		EvalCtx:     &evalCtx,
+		Mon:         evalCtx.TestingMon,
 	}
 
 	oneInput := []execinfrapb.InputSyncSpec{
@@ -121,6 +122,7 @@ func TestVectorizeAllocatorSpaceError(t *testing.T) {
 		},
 		DiskMonitor: testDiskMonitor,
 		EvalCtx:     &evalCtx,
+		Mon:         evalCtx.TestingMon,
 	}
 	var monitorRegistry colexecargs.MonitorRegistry
 	defer monitorRegistry.Close(ctx)
diff --git a/pkg/sql/colflow/vectorized_flow_test.go b/pkg/sql/colflow/vectorized_flow_test.go
index 85d710aee968..8c21c5969902 100644
--- a/pkg/sql/colflow/vectorized_flow_test.go
+++ b/pkg/sql/colflow/vectorized_flow_test.go
@@ -231,6 +231,7 @@ func TestDrainOnlyInputDAG(t *testing.T) {
 		FlowCtx: execinfra.FlowCtx{
 			Cfg:     &execinfra.ServerConfig{},
 			EvalCtx: &evalCtx,
+			Mon:     evalCtx.TestingMon,
 			NodeID:  base.TestingIDContainer,
 		},
 	}
@@ -281,6 +282,7 @@ func TestVectorizedFlowTempDirectory(t *testing.T) {
 			},
 		},
 		EvalCtx:     &evalCtx,
+		Mon:         evalCtx.TestingMon,
 		NodeID:      base.TestingIDContainer,
 		DiskMonitor: execinfra.NewTestDiskMonitor(ctx, st),
 	},
diff --git a/pkg/sql/colflow/vectorized_meta_propagation_test.go b/pkg/sql/colflow/vectorized_meta_propagation_test.go
index e1e1f0442fa1..24dd1a1aeb06 100644
--- a/pkg/sql/colflow/vectorized_meta_propagation_test.go
+++ b/pkg/sql/colflow/vectorized_meta_propagation_test.go
@@ -64,6 +64,7 @@ func TestVectorizedMetaPropagation(t *testing.T) {
 	flowCtx := execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg:     &execinfra.ServerConfig{Settings: cluster.MakeTestingClusterSettings()},
 	}
diff --git a/pkg/sql/colflow/vectorized_panic_propagation_test.go b/pkg/sql/colflow/vectorized_panic_propagation_test.go
index 722979ba9757..9ab24d10ca6d 100644
--- a/pkg/sql/colflow/vectorized_panic_propagation_test.go
+++ b/pkg/sql/colflow/vectorized_panic_propagation_test.go
@@ -43,6 +43,7 @@ func TestVectorizedInternalPanic(t *testing.T) {
 	flowCtx := execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg:     &execinfra.ServerConfig{Settings: cluster.MakeTestingClusterSettings()},
 	}
@@ -79,6 +80,7 @@ func TestNonVectorizedPanicPropagation(t *testing.T) {
 	flowCtx := execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg:     &execinfra.ServerConfig{Settings: cluster.MakeTestingClusterSettings()},
 	}
diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go
index a10c7cd3ce08..91ead9cc1415 100644
--- a/pkg/sql/conn_executor.go
+++ b/pkg/sql/conn_executor.go
@@ -2815,7 +2815,6 @@ func (ex *connExecutor) resetEvalCtx(evalCtx *extendedEvalContext, txn *kv.Txn,
 	evalCtx.IVarContainer = nil
 	evalCtx.Context.Context = ex.Ctx()
 	evalCtx.Txn = txn
-	evalCtx.Mon = ex.state.mon
 	evalCtx.PrepareOnly = false
 	evalCtx.SkipNormalize = false
 	evalCtx.SchemaChangerState = ex.extraTxnState.schemaChangerState
@@ -2881,7 +2880,7 @@ func (ex *connExecutor) initPlanner(ctx context.Context, p *planner) {
 func (ex *connExecutor) resetPlanner(
 	ctx context.Context, p *planner, txn *kv.Txn, stmtTS time.Time,
 ) {
-	p.resetPlanner(ctx, txn, stmtTS, ex.sessionData())
+	p.resetPlanner(ctx, txn, stmtTS, ex.sessionData(), ex.state.mon)
 	ex.resetEvalCtx(&p.extendedEvalCtx, txn, stmtTS)
 }
diff --git a/pkg/sql/copyshim.go b/pkg/sql/copyshim.go
index cc833f9454e1..51d0b19b2bfb 100644
--- a/pkg/sql/copyshim.go
+++ b/pkg/sql/copyshim.go
@@ -91,13 +91,15 @@ func RunCopyFrom(
 		return -1, err
 	}
 
+	mon := execinfra.NewTestMemMonitor(ctx, execCfg.Settings)
+
 	// TODO(cucaroach): test open transaction and implicit txn, this will require
 	// a real client side/over the wire copy implementation logictest can use.
 	txnOpt := copyTxnOpt{txn: txn}
 	txnOpt.resetPlanner = func(ctx context.Context, p *planner, txn *kv.Txn, txnTS time.Time, stmtTS time.Time) {
 		p.cancelChecker.Reset(ctx)
 		p.optPlanningCtx.init(p)
-		p.resetPlanner(ctx, txn, stmtTS, p.sessionDataMutatorIterator.sds.Top())
+		p.resetPlanner(ctx, txn, stmtTS, p.sessionDataMutatorIterator.sds.Top(), mon)
 		p.extendedEvalCtx.Context.Txn = txn
 	}
 	p, cleanup := newInternalPlanner("copytest",
@@ -134,7 +136,6 @@ func RunCopyFrom(
 		rd: bufio.NewReader(bytes.NewReader(buf)),
 	}
 	rows := 0
-	mon := execinfra.NewTestMemMonitor(ctx, execCfg.Settings)
 	c, err := newCopyMachine(ctx, conn, stmt.AST.(*tree.CopyFrom), p, txnOpt, mon,
 		func(ctx context.Context, p *planner, res RestrictedCommandResult) error {
 			err := dsp.ExecLocalAll(ctx, execCfg, p, res)
diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go
index ebcdc2bf3982..85450466e364 100644
--- a/pkg/sql/crdb_internal.go
+++ b/pkg/sql/crdb_internal.go
@@ -4933,7 +4933,7 @@ CREATE TABLE crdb_internal.invalid_objects (
 		}
 		descs := c.OrderedDescriptors()
 		// Collect all marshaled job metadata and account for its memory usage.
-		acct := p.EvalContext().Mon.MakeBoundAccount()
+		acct := p.Mon().MakeBoundAccount()
 		defer acct.Close(ctx)
 		jmg, err := collectMarshaledJobMetadataMap(ctx, p, &acct, descs)
 		if err != nil {
@@ -5548,7 +5548,7 @@ CREATE TABLE crdb_internal.cluster_statement_statistics (
 	generator: func(ctx context.Context, p *planner, db catalog.DatabaseDescriptor, stopper *stop.Stopper) (virtualTableGenerator, cleanupFunc, error) {
 		// TODO(azhng): we want to eventually implement memory accounting within the
 		// RPC handlers. See #69032.
-		acc := p.extendedEvalCtx.Mon.MakeBoundAccount()
+		acc := p.Mon().MakeBoundAccount()
 		defer acc.Close(ctx)
 
 		// Perform RPC fanout.
@@ -5779,7 +5779,7 @@ CREATE TABLE crdb_internal.cluster_transaction_statistics (
 	generator: func(ctx context.Context, p *planner, db catalog.DatabaseDescriptor, stopper *stop.Stopper) (virtualTableGenerator, cleanupFunc, error) {
 		// TODO(azhng): we want to eventually implement memory accounting within the
 		// RPC handlers. See #69032.
-		acc := p.extendedEvalCtx.Mon.MakeBoundAccount()
+		acc := p.Mon().MakeBoundAccount()
 		defer acc.Close(ctx)
 
 		// Perform RPC fanout.
@@ -5987,7 +5987,7 @@ CREATE TABLE crdb_internal.transaction_contention_events (
 		}
 
 		// Account for memory used by the RPC fanout.
-		acc := p.extendedEvalCtx.Mon.MakeBoundAccount()
+		acc := p.Mon().MakeBoundAccount()
 		defer acc.Close(ctx)
 
 		resp, err := p.extendedEvalCtx.SQLStatusServer.TransactionContentionEvents(
diff --git a/pkg/sql/delete.go b/pkg/sql/delete.go
index a273bf5682db..00134b08cd2f 100644
--- a/pkg/sql/delete.go
+++ b/pkg/sql/delete.go
@@ -70,7 +70,7 @@ func (d *deleteNode) startExec(params runParams) error {
 
 	if d.run.rowsNeeded {
 		d.run.td.rows = rowcontainer.NewRowContainer(
-			params.EvalContext().Mon.MakeBoundAccount(),
+			params.p.Mon().MakeBoundAccount(),
 			colinfo.ColTypeInfoFromResCols(d.columns))
 	}
 	return d.run.td.init(params.ctx, params.p.txn, params.EvalContext(), &params.EvalContext().Settings.SV)
diff --git a/pkg/sql/distsql/columnar_utils_test.go b/pkg/sql/distsql/columnar_utils_test.go
index 0b0340aee5a1..e978807f5f5b 100644
--- a/pkg/sql/distsql/columnar_utils_test.go
+++ b/pkg/sql/distsql/columnar_utils_test.go
@@ -98,6 +98,7 @@ func verifyColOperator(t *testing.T, args verifyColOperatorArgs) error {
 	defer diskMonitor.Stop(ctx)
 	flowCtx := &execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg: &execinfra.ServerConfig{
 			Settings:    st,
 			TempStorage: tempEngine,
@@ -127,7 +128,7 @@ func verifyColOperator(t *testing.T, args verifyColOperatorArgs) error {
 		return errors.New("processor is unexpectedly not a RowSource")
 	}
 
-	acc := evalCtx.Mon.MakeBoundAccount()
+	acc := evalCtx.TestingMon.MakeBoundAccount()
 	defer acc.Close(ctx)
 	testAllocator := colmem.NewAllocator(ctx, &acc, coldataext.NewExtendedColumnFactory(&evalCtx))
 	columnarizers := make([]colexecop.Operator, len(args.inputs))
diff --git a/pkg/sql/distsql/server.go b/pkg/sql/distsql/server.go
index 4e374b72ca0e..9ebd54fbfd9a 100644
--- a/pkg/sql/distsql/server.go
+++ b/pkg/sql/distsql/server.go
@@ -325,15 +325,12 @@ func (ds *ServerImpl) setupFlow(
 	// state once the flow cleans up. Note that we could have made a copy of
 	// the whole evalContext, but that isn't free, so we choose to restore
 	// the original state in order to avoid performance regressions.
-	origMon := evalCtx.Mon
 	origTxn := evalCtx.Txn
 	oldOnFlowCleanup := onFlowCleanup
 	onFlowCleanup = func() {
-		evalCtx.Mon = origMon
 		evalCtx.Txn = origTxn
 		oldOnFlowCleanup()
 	}
-	evalCtx.Mon = monitor
 	if localState.MustUseLeafTxn() {
 		var err error
 		leafTxn, err = makeLeaf()
@@ -370,13 +367,12 @@ func (ds *ServerImpl) setupFlow(
 			NodeID:  ds.ServerConfig.NodeID,
 			Codec:   ds.ServerConfig.Codec,
 			ReCache: ds.regexpCache,
-			Mon:      monitor,
 			Locality: ds.ServerConfig.Locality,
 			Tracer:   ds.ServerConfig.Tracer,
 			// Most processors will override this Context with their own context in
 			// ProcessorBase. StartInternal().
 			Context:            ctx,
-			Planner:            &faketreeeval.DummyEvalPlanner{},
+			Planner:            &faketreeeval.DummyEvalPlanner{Monitor: monitor},
 			PrivilegedAccessor: &faketreeeval.DummyPrivilegedAccessor{},
 			SessionAccessor:    &faketreeeval.DummySessionAccessor{},
 			ClientNoticeSender: &faketreeeval.DummyClientNoticeSender{},
@@ -396,7 +392,8 @@ func (ds *ServerImpl) setupFlow(
 	// Create the FlowCtx for the flow.
 	flowCtx := ds.newFlowContext(
-		ctx, req.Flow.FlowID, evalCtx, makeLeaf, req.TraceKV, req.CollectStats, localState, req.Flow.Gateway == ds.NodeID.SQLInstanceID(),
+		ctx, req.Flow.FlowID, evalCtx, monitor, makeLeaf, req.TraceKV,
+		req.CollectStats, localState, req.Flow.Gateway == ds.NodeID.SQLInstanceID(),
 	)
 
 	// req always contains the desired vectorize mode, regardless of whether we
@@ -469,6 +466,7 @@ func (ds *ServerImpl) newFlowContext(
 	ctx context.Context,
 	id execinfrapb.FlowID,
 	evalCtx *eval.Context,
+	monitor *mon.BytesMonitor,
 	makeLeafTxn func() (*kv.Txn, error),
 	traceKV bool,
 	collectStats bool,
@@ -481,6 +479,7 @@ func (ds *ServerImpl) newFlowContext(
 		Cfg:         &ds.ServerConfig,
 		ID:          id,
 		EvalCtx:     evalCtx,
+		Mon:         monitor,
 		Txn:         evalCtx.Txn,
 		MakeLeafTxn: makeLeafTxn,
 		NodeID:      ds.ServerConfig.NodeID,
diff --git a/pkg/sql/distsql/vectorized_panic_propagation_test.go b/pkg/sql/distsql/vectorized_panic_propagation_test.go
index a1abed54165e..77c9a778459f 100644
--- a/pkg/sql/distsql/vectorized_panic_propagation_test.go
+++ b/pkg/sql/distsql/vectorized_panic_propagation_test.go
@@ -40,6 +40,7 @@ func TestNonVectorizedPanicDoesntHangServer(t *testing.T) {
 	flowCtx := execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
 		Cfg:     &execinfra.ServerConfig{Settings: cluster.MakeTestingClusterSettings()},
 	}
 	base := flowinfra.NewFlowBase(
diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go
index d9af9a0c12a2..f45e17c75668 100644
--- a/pkg/sql/distsql_physical_planner.go
+++ b/pkg/sql/distsql_physical_planner.go
@@ -752,14 +752,9 @@ type PlanningCtx struct {
 	infra physicalplan.PhysicalInfrastructure
 
 	// isLocal is set to true if we're planning this query on a single node.
-	isLocal bool
-	planner *planner
-	// ignoreClose, when set to true, will prevent the closing of the planner's
-	// current plan. Only the top-level query needs to close it, but everything
-	// else (like sub- and postqueries, or EXPLAIN ANALYZE) should set this to
-	// true to avoid double closes of the planNode tree.
-	ignoreClose bool
-	stmtType    tree.StatementReturnType
+	isLocal  bool
+	planner  *planner
+	stmtType tree.StatementReturnType
 	// planDepth is set to the current depth of the planNode tree. It's used to
 	// keep track of whether it's valid to run a root node in a special fast path
 	// mode.
@@ -843,7 +838,8 @@ func (p *PlanningCtx) getDefaultSaveFlowsFunc(
 		var explainVec []string
 		var explainVecVerbose []string
 		if planner.instrumentation.collectBundle && planner.curPlan.flags.IsSet(planFlagVectorized) {
-			flowCtx := newFlowCtxForExplainPurposes(p, planner)
+			flowCtx, cleanup := newFlowCtxForExplainPurposes(ctx, p, planner)
+			defer cleanup()
 			getExplain := func(verbose bool) []string {
 				explain, cleanup, err := colflow.ExplainVec(
 					ctx, flowCtx, flows, p.infra.LocalProcessors, opChains,
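
Note: deleting `ignoreClose` is the flip side of the `Run` change: now that `Run` never closes the planner's current plan, there is exactly one owner, and `PlanAndRunAll` / `runPlanInsidePlan` simply defer the close at the top (see their hunks elsewhere in this diff). A sketch of the ownership simplification, with stand-in types:

    package main

    import "fmt"

    type plan struct{ closed bool }

    func (p *plan) close() { p.closed = true; fmt.Println("plan closed") }

    // Old shape: Run conditionally closed the plan unless ignoreClose was
    // set, so sub- and postqueries had to remember to set it to avoid
    // double closes of the planNode tree.
    // New shape: exactly one owner closes the plan, once, after the main
    // query, subqueries, and postqueries are all done.
    func planAndRunAll(p *plan, runMain, runPostqueries func()) {
    	defer p.close() // single, unconditional owner
    	runMain()
    	runPostqueries() // postqueries may still access the plan tree
    }

    func main() {
    	planAndRunAll(&plan{},
    		func() { fmt.Println("main query") },
    		func() { fmt.Println("postquery") })
    }
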
diff --git a/pkg/sql/distsql_plan_ctas.go b/pkg/sql/distsql_plan_ctas.go
index 97ded5e0ce0c..a4f9df99ede7 100644
--- a/pkg/sql/distsql_plan_ctas.go
+++ b/pkg/sql/distsql_plan_ctas.go
@@ -55,5 +55,5 @@ func PlanAndRunCTAS(
 	// Make copy of evalCtx as Run might modify it.
 	evalCtxCopy := planner.ExtendedEvalContextCopy()
 	dsp.FinalizePlan(planCtx, physPlan)
-	dsp.Run(ctx, planCtx, txn, physPlan, recv, evalCtxCopy, nil /* finishedSetupFn */)()
+	dsp.Run(ctx, planCtx, txn, physPlan, recv, evalCtxCopy, nil /* finishedSetupFn */)
 }
diff --git a/pkg/sql/distsql_plan_stats.go b/pkg/sql/distsql_plan_stats.go
index 840c59035fa6..174db8d7242d 100644
--- a/pkg/sql/distsql_plan_stats.go
+++ b/pkg/sql/distsql_plan_stats.go
@@ -307,6 +307,6 @@ func (dsp *DistSQLPlanner) planAndRunCreateStats(
 	)
 	defer recv.Release()
 
-	dsp.Run(ctx, planCtx, txn, physPlan, recv, evalCtx, nil /* finishedSetupFn */)()
+	dsp.Run(ctx, planCtx, txn, physPlan, recv, evalCtx, nil /* finishedSetupFn */)
 	return resultWriter.Err()
 }
diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go
index 252be593c792..ecf9e1826322 100644
--- a/pkg/sql/distsql_running.go
+++ b/pkg/sql/distsql_running.go
@@ -447,7 +447,7 @@ func (dsp *DistSQLPlanner) setupFlows(
 		// former has the corresponding writer set.
 		batchReceiver = recv
 	}
-	ctx, flow, opChains, firstErr := dsp.distSQLSrv.SetupLocalSyncFlow(ctx, evalCtx.Mon, &setupReq, recv, batchReceiver, localState)
+	ctx, flow, opChains, firstErr := dsp.distSQLSrv.SetupLocalSyncFlow(ctx, evalCtx.Planner.Mon(), &setupReq, recv, batchReceiver, localState)
 
 	// Now wait for all the flows to be scheduled on remote nodes. Note that we
 	// are not waiting for the flows themselves to complete.
@@ -482,10 +482,6 @@ const clientRejectedMsg string = "client rejected when attempting to run DistSQL
 //   mutated.
 // - finishedSetupFn, if non-nil, is called synchronously after all the
 //   processors have successfully started up.
-//
-// It returns a non-nil (although it can be a noop when an error is
-// encountered) cleanup function that must be called in order to release the
-// resources.
 func (dsp *DistSQLPlanner) Run(
@@ -494,9 +490,7 @@ func (dsp *DistSQLPlanner) Run(
 	recv *DistSQLReceiver,
 	evalCtx *extendedEvalContext,
 	finishedSetupFn func(),
-) (cleanup func()) {
-	cleanup = func() {}
-
+) {
 	flows := plan.GenerateFlowSpecs()
 	defer func() {
 		for _, flowSpec := range flows {
 		}
 	}()
 	if _, ok := flows[dsp.gatewaySQLInstanceID]; !ok {
 		recv.SetError(errors.Errorf("expected to find gateway flow"))
-		return cleanup
+		return
 	}
 
 	var (
@@ -584,13 +578,13 @@ func (dsp *DistSQLPlanner) Run(
 		if err != nil {
 			log.Infof(ctx, "%s: %s", clientRejectedMsg, err)
 			recv.SetError(err)
-			return cleanup
+			return
 		}
 		if tis == nil {
 			recv.SetError(errors.AssertionFailedf(
 				"leafInputState is nil when txn is non-nil and we must use the leaf txn",
 			))
-			return cleanup
+			return
 		}
 		leafInputState = tis
 	}
@@ -655,13 +649,13 @@ func (dsp *DistSQLPlanner) Run(
 	)
 	// Make sure that the local flow is always cleaned up if it was created.
 	if flow != nil {
-		cleanup = func() {
+		defer func() {
 			flow.Cleanup(ctx)
-		}
+		}()
 	}
 	if err != nil {
 		recv.SetError(err)
-		return cleanup
+		return
 	}
 
 	if finishedSetupFn != nil {
@@ -675,7 +669,7 @@ func (dsp *DistSQLPlanner) Run(
 	if planCtx.saveFlows != nil {
 		if err := planCtx.saveFlows(flows, opChains); err != nil {
 			recv.SetError(err)
-			return cleanup
+			return
 		}
 	}
@@ -688,30 +682,11 @@ func (dsp *DistSQLPlanner) Run(
 	if txn != nil && !localState.MustUseLeafTxn() && flow.ConcurrentTxnUse() {
 		recv.SetError(errors.AssertionFailedf(
 			"unexpected concurrency for a flow that was forced to be planned locally"))
-		return cleanup
+		return
 	}
 
 	// TODO(radu): this should go through the flow scheduler.
 	flow.Run(ctx, func() {})
-
-	// TODO(yuzefovich): it feels like this closing should happen after
-	// PlanAndRun. We should refactor this and get rid off ignoreClose field.
-	if planCtx.planner != nil && !planCtx.ignoreClose {
-		// planCtx can change before the cleanup function is executed, so we make
-		// a copy of the planner and bind it to the function.
-		curPlan := &planCtx.planner.curPlan
-		return func() {
-			// We need to close the planNode tree we translated into a DistSQL plan
-			// before flow.Cleanup, which closes memory accounts that expect to be
-			// emptied.
-			curPlan.close(ctx)
-			flow.Cleanup(ctx)
-		}
-	}
-
-	// ignoreClose is set to true meaning that someone else will handle the
-	// closing of the current plan, so we simply clean up the flow.
-	return cleanup
 }
 
 // DistSQLReceiver is an execinfra.RowReceiver and execinfra.BatchReceiver that
@@ -1313,11 +1288,12 @@ func (dsp *DistSQLPlanner) PlanAndRunAll(
 	recv *DistSQLReceiver,
 	evalCtxFactory func() *extendedEvalContext,
 ) error {
+	defer planner.curPlan.close(ctx)
 	if len(planner.curPlan.subqueryPlans) != 0 {
 		// Create a separate memory account for the results of the subqueries.
 		// Note that we intentionally defer the closure of the account until we
 		// return from this method (after the main query is executed).
-		subqueryResultMemAcc := planner.EvalContext().Mon.MakeBoundAccount()
+		subqueryResultMemAcc := planner.Mon().MakeBoundAccount()
 		defer subqueryResultMemAcc.Close(ctx)
 		if !dsp.PlanAndRunSubqueries(
 			ctx, planner, evalCtxFactory, planner.curPlan.subqueryPlans, recv, &subqueryResultMemAcc,
@@ -1329,14 +1305,9 @@ func (dsp *DistSQLPlanner) PlanAndRunAll(
 		}
 	}
 	recv.discardRows = planner.instrumentation.ShouldDiscardRows()
-	// We pass in whether or not we wanted to distribute this plan, which tells
-	// the planner whether or not to plan remote table readers.
-	cleanup := dsp.PlanAndRun(
+	dsp.PlanAndRun(
 		ctx, evalCtx, planCtx, planner.txn, planner.curPlan.main, recv,
 	)
-	// Note that we're not cleaning up right away because postqueries might
-	// need to have access to the main query tree.
-	defer cleanup()
 	if recv.commErr != nil || recv.resultWriter.Err() != nil {
 		return recv.commErr
 	}
@@ -1418,7 +1389,7 @@ func (dsp *DistSQLPlanner) planAndRunSubquery(
 		noteworthyMemoryUsageBytes, dsp.distSQLSrv.Settings,
 	)
-	subqueryMonitor.StartNoReserved(ctx, evalCtx.Mon)
+	subqueryMonitor.StartNoReserved(ctx, evalCtx.Planner.Mon())
 	defer subqueryMonitor.Stop(ctx)
 
 	subqueryMemAccount := subqueryMonitor.MakeBoundAccount()
@@ -1440,9 +1411,6 @@ func (dsp *DistSQLPlanner) planAndRunSubquery(
 	}
 	subqueryPlanCtx.traceMetadata = planner.instrumentation.traceMetadata
 	subqueryPlanCtx.collectExecStats = planner.instrumentation.ShouldCollectExecStats()
-	// Don't close the top-level plan from subqueries - someone else will handle
-	// that.
-	subqueryPlanCtx.ignoreClose = true
 	subqueryPhysPlan, physPlanCleanup, err := dsp.createPhysPlan(ctx, subqueryPlanCtx, subqueryPlan.plan)
 	defer physPlanCleanup()
 	if err != nil {
@@ -1470,7 +1438,7 @@ func (dsp *DistSQLPlanner) planAndRunSubquery(
 	subqueryRowReceiver := NewRowResultWriter(&rows)
 	subqueryRecv.resultWriter = subqueryRowReceiver
 	subqueryPlans[planIdx].started = true
-	dsp.Run(ctx, subqueryPlanCtx, planner.txn, subqueryPhysPlan, subqueryRecv, evalCtx, nil /* finishedSetupFn */)()
+	dsp.Run(ctx, subqueryPlanCtx, planner.txn, subqueryPhysPlan, subqueryRecv, evalCtx, nil /* finishedSetupFn */)
 	if err := subqueryRowReceiver.Err(); err != nil {
 		return err
 	}
@@ -1575,16 +1543,6 @@ func (dsp *DistSQLPlanner) planAndRunSubquery(
 // while using that resultWriter), the error is also stored in
 // DistSQLReceiver.commErr. That can be tested to see if a client session needs
 // to be closed.
-//
-// It returns a non-nil (although it can be a noop when an error is
-// encountered) cleanup function that must be called once the planTop AST is no
-// longer needed and can be closed. Note that this function also cleans up the
-// flow which is unfortunate but is caused by the sharing of memory monitors
-// between planning and execution - cleaning up the flow wants to close the
-// monitor, but it cannot do so because the AST needs to live longer and still
-// uses the same monitor. That's why we end up in a situation that in order to
-// clean up the flow, we need to close the AST first, but we can only do that
-// after PlanAndRun returns.
 func (dsp *DistSQLPlanner) PlanAndRun(
@@ -1592,27 +1550,18 @@ func (dsp *DistSQLPlanner) PlanAndRun(
 	txn *kv.Txn,
 	plan planMaybePhysical,
 	recv *DistSQLReceiver,
-) (cleanup func()) {
+) {
 	log.VEventf(ctx, 2, "creating DistSQL plan with isLocal=%v", planCtx.isLocal)
 
 	physPlan, physPlanCleanup, err := dsp.createPhysPlan(ctx, planCtx, plan)
+	defer physPlanCleanup()
 	if err != nil {
 		recv.SetError(err)
-		return func() {
-			// Make sure to close the current plan in case of a physical
-			// planning error. Usually, this is done in runCleanup() below, but
-			// we won't get to that point, so we have to do so here.
-			planCtx.planner.curPlan.close(ctx)
-			physPlanCleanup()
-		}
+		return
 	}
 	dsp.finalizePlanWithRowCount(planCtx, physPlan, planCtx.planner.curPlan.mainRowCount)
 	recv.expectedRowsRead = int64(physPlan.TotalEstimatedScannedRows)
-	runCleanup := dsp.Run(ctx, planCtx, txn, physPlan, recv, evalCtx, nil /* finishedSetupFn */)
-	return func() {
-		runCleanup()
-		physPlanCleanup()
-	}
+	dsp.Run(ctx, planCtx, txn, physPlan, recv, evalCtx, nil /* finishedSetupFn */)
 }
 
 // PlanAndRunCascadesAndChecks runs any cascade and check queries.
@@ -1772,7 +1721,7 @@ func (dsp *DistSQLPlanner) planAndRunPostquery(
 		noteworthyMemoryUsageBytes, dsp.distSQLSrv.Settings,
 	)
-	postqueryMonitor.StartNoReserved(ctx, evalCtx.Mon)
+	postqueryMonitor.StartNoReserved(ctx, evalCtx.Planner.Mon())
 	defer postqueryMonitor.Stop(ctx)
 
 	postqueryMemAccount := postqueryMonitor.MakeBoundAccount()
@@ -1787,7 +1736,6 @@ func (dsp *DistSQLPlanner) planAndRunPostquery(
 	}
 	postqueryPlanCtx := dsp.NewPlanningCtx(ctx, evalCtx, planner, planner.txn, distribute)
 	postqueryPlanCtx.stmtType = tree.Rows
-	postqueryPlanCtx.ignoreClose = true
 	// Postqueries are only executed on the main query path where we skip the
 	// diagram generation.
 	postqueryPlanCtx.skipDistSQLDiagramGeneration = true
@@ -1811,6 +1759,6 @@ func (dsp *DistSQLPlanner) planAndRunPostquery(
 	postqueryResultWriter := &errOnlyResultWriter{}
 	postqueryRecv.resultWriter = postqueryResultWriter
 	postqueryRecv.batchWriter = postqueryResultWriter
-	dsp.Run(ctx, postqueryPlanCtx, planner.txn, postqueryPhysPlan, postqueryRecv, evalCtx, nil /* finishedSetupFn */)()
+	dsp.Run(ctx, postqueryPlanCtx, planner.txn, postqueryPhysPlan, postqueryRecv, evalCtx, nil /* finishedSetupFn */)
 	return postqueryRecv.resultWriter.Err()
 }
diff --git a/pkg/sql/distsql_running_test.go b/pkg/sql/distsql_running_test.go
index 841ba5526967..c186de95085a 100644
--- a/pkg/sql/distsql_running_test.go
+++ b/pkg/sql/distsql_running_test.go
@@ -170,6 +170,7 @@ func TestDistSQLRunningInAbortedTxn(t *testing.T) {
 		if err := p.makeOptimizerPlan(ctx); err != nil {
 			t.Fatal(err)
 		}
+		defer p.curPlan.close(ctx)
 
 		evalCtx := p.ExtendedEvalContext()
 		// We need distribute = true so that executing the plan involves marshaling
@@ -181,7 +182,7 @@ func TestDistSQLRunningInAbortedTxn(t *testing.T) {
 
 		execCfg.DistSQLPlanner.PlanAndRun(
 			ctx, evalCtx, planCtx, txn, p.curPlan.main, recv,
-		)()
+		)
 		return rw.Err()
 	})
 	if err != nil {
diff --git a/pkg/sql/execinfra/flow_context.go b/pkg/sql/execinfra/flow_context.go
index 6a0cc5fd0f62..64ec654c7b14 100644
--- a/pkg/sql/execinfra/flow_context.go
+++ b/pkg/sql/execinfra/flow_context.go
@@ -50,6 +50,8 @@ type FlowCtx struct {
 	// cores of the processors that need it.
 	EvalCtx *eval.Context
 
+	Mon *mon.BytesMonitor
+
 	// The transaction in which kv operations performed by processors in the flow
 	// must be performed. Processors in the Flow will use this txn concurrently.
 	// This field is generally not nil, except for flows that don't run in a
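
Note: this new field is the pivot of the whole diff: the flow's memory monitor now lives on `FlowCtx` itself, so `setupFlow` no longer has to temporarily overwrite (and later restore) `evalCtx.Mon`. A sketch of the resulting parent/child monitor hierarchy, with hypothetical names:

    package main

    import "fmt"

    // monitor is a stand-in for mon.BytesMonitor with a parent link.
    type monitor struct {
    	name   string
    	parent *monitor
    }

    func newChildMonitor(parent *monitor, name string) *monitor {
    	return &monitor{name: name, parent: parent}
    }

    // flowCtx mirrors the new execinfra.FlowCtx field.
    type flowCtx struct {
    	Mon *monitor // the flow's own monitor, set once at flow setup
    }

    func main() {
    	root := &monitor{name: "sql-root"}
    	flowMon := newChildMonitor(root, "flow") // opened in setupFlow
    	fc := &flowCtx{Mon: flowMon}
    	// Processors now parent their monitors on fc.Mon rather than on a
    	// monitor smuggled through the eval context.
    	procMon := newChildMonitor(fc.Mon, "changeagg-mem")
    	fmt.Println(procMon.parent.name) // "flow"
    }
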
diff --git a/pkg/sql/explain_vec.go b/pkg/sql/explain_vec.go
index d7a2fd93bcac..ff576cf17d7d 100644
--- a/pkg/sql/explain_vec.go
+++ b/pkg/sql/explain_vec.go
@@ -12,6 +12,7 @@ package sql
 
 import (
 	"context"
+	"math"
 
 	"github.com/cockroachdb/cockroach/pkg/sql/colflow"
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
@@ -62,7 +63,8 @@ func (n *explainVecNode) startExec(params runParams) error {
 	distSQLPlanner.finalizePlanWithRowCount(planCtx, physPlan, n.plan.mainRowCount)
 	flows := physPlan.GenerateFlowSpecs()
-	flowCtx := newFlowCtxForExplainPurposes(planCtx, params.p)
+	flowCtx, cleanup := newFlowCtxForExplainPurposes(params.ctx, planCtx, params.p)
+	defer cleanup()
 
 	// We want to get the vectorized plan which would be executed with the
 	// current 'vectorize' option. If 'vectorize' is set to 'off', then the
@@ -85,10 +87,26 @@ func (n *explainVecNode) startExec(params runParams) error {
 	return nil
 }
 
-func newFlowCtxForExplainPurposes(planCtx *PlanningCtx, p *planner) *execinfra.FlowCtx {
+func newFlowCtxForExplainPurposes(
+	ctx context.Context, planCtx *PlanningCtx, p *planner,
+) (_ *execinfra.FlowCtx, cleanup func()) {
+	monitor := mon.NewMonitor(
+		"explain", /* name */
+		mon.MemoryResource,
+		nil,           /* curCount */
+		nil,           /* maxHist */
+		-1,            /* increment */
+		math.MaxInt64, /* noteworthy */
+		p.execCfg.Settings,
+	)
+	monitor.StartNoReserved(ctx, p.Mon())
+	cleanup = func() {
+		monitor.Stop(ctx)
+	}
 	return &execinfra.FlowCtx{
 		NodeID:  planCtx.EvalContext().NodeID,
 		EvalCtx: planCtx.EvalContext(),
+		Mon:     monitor,
 		Cfg: &execinfra.ServerConfig{
 			Settings:         p.execCfg.Settings,
 			LogicalClusterID: p.DistSQLPlanner().distSQLSrv.ServerConfig.LogicalClusterID,
@@ -97,7 +115,7 @@ func newFlowCtxForExplainPurposes(planCtx *PlanningCtx, p *planner) *execinfra.F
 		},
 		Descriptors: p.Descriptors(),
 		DiskMonitor: &mon.BytesMonitor{},
-	}
+	}, cleanup
 }
 
 func newPlanningCtxForExplainPurposes(
@@ -112,7 +130,6 @@ func newPlanningCtxForExplainPurposes(
 	}
 	planCtx := distSQLPlanner.NewPlanningCtx(params.ctx, params.extendedEvalCtx, params.p, params.p.txn, distribute)
-	planCtx.ignoreClose = true
 	planCtx.planner.curPlan.subqueryPlans = subqueryPlans
 	for i := range planCtx.planner.curPlan.subqueryPlans {
 		p := &planCtx.planner.curPlan.subqueryPlans[i]
diff --git a/pkg/sql/faketreeeval/BUILD.bazel b/pkg/sql/faketreeeval/BUILD.bazel
index 77df33d5096d..9675da46001c 100644
--- a/pkg/sql/faketreeeval/BUILD.bazel
+++ b/pkg/sql/faketreeeval/BUILD.bazel
@@ -23,6 +23,7 @@ go_library(
         "//pkg/sql/sessiondatapb",
         "//pkg/sql/types",
         "//pkg/util/errorutil/unimplemented",
+        "//pkg/util/mon",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_lib_pq//oid",
     ],
diff --git a/pkg/sql/faketreeeval/evalctx.go b/pkg/sql/faketreeeval/evalctx.go
index e67c4a1643a4..a498b5bb20c7 100644
--- a/pkg/sql/faketreeeval/evalctx.go
+++ b/pkg/sql/faketreeeval/evalctx.go
@@ -31,6 +31,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
 	"github.com/cockroachdb/cockroach/pkg/sql/types"
 	"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
+	"github.com/cockroachdb/cockroach/pkg/util/mon"
 	"github.com/cockroachdb/errors"
 	"github.com/lib/pq/oid"
 )
@@ -157,7 +158,9 @@ func (so *DummyRegionOperator) ResetMultiRegionZoneConfigsForDatabase(
 
 // DummyEvalPlanner implements the eval.Planner interface by returning
 // errors.
-type DummyEvalPlanner struct{}
+type DummyEvalPlanner struct {
+	Monitor *mon.BytesMonitor
+}
 
 // ResolveOIDFromString is part of the Planner interface.
 func (ep *DummyEvalPlanner) ResolveOIDFromString(
@@ -293,6 +296,11 @@ func (*DummyEvalPlanner) RepairTTLScheduledJobForTable(ctx context.Context, tabl
 	return errors.WithStack(errEvalPlanner)
 }
 
+// Mon is part of the eval.Planner interface.
+func (ep *DummyEvalPlanner) Mon() *mon.BytesMonitor {
+	return ep.Monitor
+}
+
 // ExecutorConfig is part of the Planner interface.
func (*DummyEvalPlanner) ExecutorConfig() interface{} { return nil diff --git a/pkg/sql/flowinfra/flow.go b/pkg/sql/flowinfra/flow.go index c3b529f13fdc..3fe19cab2825 100644 --- a/pkg/sql/flowinfra/flow.go +++ b/pkg/sql/flowinfra/flow.go @@ -527,17 +527,23 @@ func (f *FlowBase) Cleanup(ctx context.Context) { f.sp.RecordStructured(&execinfrapb.ComponentStats{ Component: execinfrapb.FlowComponentID(f.NodeID.SQLInstanceID(), f.FlowCtx.ID), FlowStats: execinfrapb.FlowStats{ - MaxMemUsage: optional.MakeUint(uint64(f.FlowCtx.EvalCtx.Mon.MaximumBytes())), + MaxMemUsage: optional.MakeUint(uint64(f.FlowCtx.Mon.MaximumBytes())), MaxDiskUsage: optional.MakeUint(uint64(f.FlowCtx.DiskMonitor.MaximumBytes())), }, }) } } - // This closes the disk monitor opened in newFlowCtx. - f.DiskMonitor.Stop(ctx) - // This closes the monitor opened in ServerImpl.setupFlow. - f.EvalCtx.Stop(ctx) + // This closes the disk monitor opened in newFlowContext as well as the + // memory monitor opened in ServerImpl.setupFlow. + if r := recover(); r != nil { + f.DiskMonitor.EmergencyStop(ctx) + f.Mon.EmergencyStop(ctx) + panic(r) + } else { + f.DiskMonitor.Stop(ctx) + f.Mon.Stop(ctx) + } for _, p := range f.processors { if d, ok := p.(execreleasable.Releasable); ok { d.Release() diff --git a/pkg/sql/flowinfra/outbox.go b/pkg/sql/flowinfra/outbox.go index 4e97bd04d4e0..99e2c742e8e6 100644 --- a/pkg/sql/flowinfra/outbox.go +++ b/pkg/sql/flowinfra/outbox.go @@ -288,7 +288,7 @@ func (m *Outbox) mainLoop(ctx context.Context) error { // since it's a flow level stat. However, due to the row exec engine infrastructure, it is too // complicated to attach this to a flow level span. If the row exec engine gets removed, getting // maxMemUsage from streamStats should be removed as well. - m.stats.FlowStats.MaxMemUsage.Set(uint64(m.flowCtx.EvalCtx.Mon.MaximumBytes())) + m.stats.FlowStats.MaxMemUsage.Set(uint64(m.flowCtx.Mon.MaximumBytes())) m.stats.FlowStats.MaxDiskUsage.Set(uint64(m.flowCtx.DiskMonitor.MaximumBytes())) } span.RecordStructured(&m.stats) diff --git a/pkg/sql/flowinfra/outbox_test.go b/pkg/sql/flowinfra/outbox_test.go index 991cc29dc0be..e5e8eb95ea0d 100644 --- a/pkg/sql/flowinfra/outbox_test.go +++ b/pkg/sql/flowinfra/outbox_test.go @@ -72,6 +72,7 @@ func TestOutbox(t *testing.T) { dialer := nodedialer.New(clientRPC, staticAddressResolver(addr)) flowCtx := execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, ID: execinfrapb.FlowID{UUID: uuid.MakeV4()}, Cfg: &execinfra.ServerConfig{ Settings: st, @@ -237,6 +238,7 @@ func TestOutboxInitializesStreamBeforeReceivingAnyRows(t *testing.T) { dialer := nodedialer.New(clientRPC, staticAddressResolver(addr)) flowCtx := execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, ID: execinfrapb.FlowID{UUID: uuid.MakeV4()}, Cfg: &execinfra.ServerConfig{ Settings: st, @@ -309,6 +311,7 @@ func TestOutboxClosesWhenConsumerCloses(t *testing.T) { dialer := nodedialer.New(clientRPC, staticAddressResolver(addr)) flowCtx := execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, ID: execinfrapb.FlowID{UUID: uuid.MakeV4()}, Cfg: &execinfra.ServerConfig{ Settings: st, @@ -386,6 +389,7 @@ func TestOutboxCancelsFlowOnError(t *testing.T) { dialer := nodedialer.New(clientRPC, staticAddressResolver(addr)) flowCtx := execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, ID: execinfrapb.FlowID{UUID: uuid.MakeV4()}, Cfg: &execinfra.ServerConfig{ Settings: st, @@ -441,6 +445,7 @@ func TestOutboxUnblocksProducers(t *testing.T) { defer evalCtx.Stop(ctx) flowCtx := 
execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, ID: execinfrapb.FlowID{UUID: uuid.MakeV4()}, Cfg: &execinfra.ServerConfig{ Settings: st, @@ -516,6 +521,7 @@ func BenchmarkOutbox(b *testing.B) { dialer := nodedialer.New(clientRPC, staticAddressResolver(addr)) flowCtx := execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, ID: execinfrapb.FlowID{UUID: uuid.MakeV4()}, Cfg: &execinfra.ServerConfig{ Settings: st, diff --git a/pkg/sql/flowinfra/server_test.go b/pkg/sql/flowinfra/server_test.go index 95128edc3ce9..5be6d945f186 100644 --- a/pkg/sql/flowinfra/server_test.go +++ b/pkg/sql/flowinfra/server_test.go @@ -169,7 +169,7 @@ func runLocalFlow( evalCtx := eval.MakeTestingEvalContext(s.ClusterSettings()) defer evalCtx.Stop(ctx) var rowBuf distsqlutils.RowBuffer - flowCtx, flow, _, err := s.DistSQLServer().(*distsql.ServerImpl).SetupLocalSyncFlow(ctx, evalCtx.Mon, req, &rowBuf, nil /* batchOutput */, distsql.LocalState{}) + flowCtx, flow, _, err := s.DistSQLServer().(*distsql.ServerImpl).SetupLocalSyncFlow(ctx, evalCtx.TestingMon, req, &rowBuf, nil /* batchOutput */, distsql.LocalState{}) if err != nil { return nil, err } @@ -206,7 +206,7 @@ func runLocalFlowTenant( evalCtx := eval.MakeTestingEvalContext(s.ClusterSettings()) defer evalCtx.Stop(ctx) var rowBuf distsqlutils.RowBuffer - flowCtx, flow, _, err := s.DistSQLServer().(*distsql.ServerImpl).SetupLocalSyncFlow(ctx, evalCtx.Mon, req, &rowBuf, nil /* batchOutput */, distsql.LocalState{}) + flowCtx, flow, _, err := s.DistSQLServer().(*distsql.ServerImpl).SetupLocalSyncFlow(ctx, evalCtx.TestingMon, req, &rowBuf, nil /* batchOutput */, distsql.LocalState{}) if err != nil { return nil, err } diff --git a/pkg/sql/importer/import_processor_planning.go b/pkg/sql/importer/import_processor_planning.go index 70886b48bd36..37b979f677d2 100644 --- a/pkg/sql/importer/import_processor_planning.go +++ b/pkg/sql/importer/import_processor_planning.go @@ -261,7 +261,7 @@ func distImport( // Copy the evalCtx, as dsp.Run() might change it. 
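Two details in the FlowBase.Cleanup hunk above are worth spelling out: the flow-wide memory high-water mark is now read from the flow-level monitor rather than the eval context's, and shutdown distinguishes the orderly path (Stop asserts every account was closed) from panic unwinding (EmergencyStop skips that assertion so the original panic is not masked). A condensed sketch of the pattern, with memMon and diskMon as hypothetical stand-ins for f.Mon and f.DiskMonitor:

package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/mon"
)

// cleanupMonitors records peak usage and then stops both monitors. It is
// meant to be deferred directly, so recover() observes any panic from the
// surrounding function.
func cleanupMonitors(ctx context.Context, memMon, diskMon *mon.BytesMonitor) {
	// Peak usage must be read before Stop; in flow.go it feeds FlowStats.
	maxMem, maxDisk := memMon.MaximumBytes(), diskMon.MaximumBytes()
	_, _ = maxMem, maxDisk
	if r := recover(); r != nil {
		// EmergencyStop skips the "all accounts closed" assertion so the
		// original panic is re-thrown instead of being masked.
		diskMon.EmergencyStop(ctx)
		memMon.EmergencyStop(ctx)
		panic(r)
	}
	diskMon.Stop(ctx)
	memMon.Stop(ctx)
}

This only works as defer cleanupMonitors(ctx, memMon, diskMon); recover returns non-nil only when invoked directly from the deferred function.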
evalCtxCopy := *evalCtx - dsp.Run(ctx, planCtx, nil, p, recv, &evalCtxCopy, nil /* finishedSetupFn */)() + dsp.Run(ctx, planCtx, nil, p, recv, &evalCtxCopy, nil /* finishedSetupFn */) return rowResultWriter.Err() }) diff --git a/pkg/sql/importer/import_processor_test.go b/pkg/sql/importer/import_processor_test.go index 664f731f990b..eade356105f7 100644 --- a/pkg/sql/importer/import_processor_test.go +++ b/pkg/sql/importer/import_processor_test.go @@ -236,6 +236,7 @@ func TestImportIgnoresProcessedFiles(t *testing.T) { evalCtx := eval.MakeTestingEvalContext(cluster.MakeTestingClusterSettings()) flowCtx := &execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ Settings: &cluster.Settings{}, ExternalStorage: externalStorageFactory, @@ -336,6 +337,7 @@ func TestImportHonorsResumePosition(t *testing.T) { evalCtx := eval.MakeTestingEvalContext(cluster.MakeTestingClusterSettings()) flowCtx := &execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ Settings: &cluster.Settings{}, ExternalStorage: externalStorageFactory, @@ -463,6 +465,7 @@ func TestImportHandlesDuplicateKVs(t *testing.T) { evalCtx := eval.MakeTestingEvalContext(cluster.MakeTestingClusterSettings()) flowCtx := &execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ Settings: &cluster.Settings{}, ExternalStorage: externalStorageFactory, diff --git a/pkg/sql/index_backfiller.go b/pkg/sql/index_backfiller.go index fa76ad9ca084..7563ca5ca621 100644 --- a/pkg/sql/index_backfiller.go +++ b/pkg/sql/index_backfiller.go @@ -202,7 +202,7 @@ func (ib *IndexBackfillPlanner) plan( ) defer recv.Release() evalCtxCopy := evalCtx - ib.execCfg.DistSQLPlanner.Run(ctx, planCtx, nil, p, recv, &evalCtxCopy, nil)() + ib.execCfg.DistSQLPlanner.Run(ctx, planCtx, nil, p, recv, &evalCtxCopy, nil) return cbw.Err() }, nil } diff --git a/pkg/sql/insert.go b/pkg/sql/insert.go index e89eacc33acd..e6b0faa69073 100644 --- a/pkg/sql/insert.go +++ b/pkg/sql/insert.go @@ -89,7 +89,7 @@ func (r *insertRun) initRowContainer(params runParams, columns colinfo.ResultCol return } r.ti.rows = rowcontainer.NewRowContainer( - params.EvalContext().Mon.MakeBoundAccount(), + params.p.Mon().MakeBoundAccount(), colinfo.ColTypeInfoFromResCols(columns), ) diff --git a/pkg/sql/mvcc_backfiller.go b/pkg/sql/mvcc_backfiller.go index 547ee800b890..a066cc4ab6aa 100644 --- a/pkg/sql/mvcc_backfiller.go +++ b/pkg/sql/mvcc_backfiller.go @@ -162,7 +162,7 @@ func (im *IndexBackfillerMergePlanner) plan( nil, /* txn - the processors manage their own transactions */ p, recv, &evalCtxCopy, nil, /* finishedSetupFn */ - )() + ) return cbw.Err() }, nil } diff --git a/pkg/sql/opt/memo/memo_test.go b/pkg/sql/opt/memo/memo_test.go index 56c1f43a8966..bb5a181c95dd 100644 --- a/pkg/sql/opt/memo/memo_test.go +++ b/pkg/sql/opt/memo/memo_test.go @@ -146,6 +146,12 @@ func TestMemoIsStale(t *testing.T) { // Initialize context with starting values. evalCtx := eval.MakeTestingEvalContext(cluster.MakeTestingClusterSettings()) evalCtx.SessionData().Database = "t" + // MakeTestingEvalContext created a fake planner that can only provide the + // memory monitor and will encounter a nil-pointer error when other methods + // are accessed. In this test, the GetDatabaseSurvivalGoal method will be + // called; it can handle a nil planner but cannot handle a planner whose + // GetMultiregionConfig is nil, so we nil out the planner.
+ evalCtx.Planner = nil var o xform.Optimizer opttestutils.BuildQuery(t, &o, catalog, &evalCtx, "SELECT a, b+1 FROM abcview WHERE c='foo'") diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index 26751e334d27..1f9767e3c359 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -176,6 +176,8 @@ type planner struct { // a SQL session. isInternalPlanner bool + monitor *mon.BytesMonitor + // Corresponding Statement for this query. stmt Statement @@ -362,6 +364,7 @@ func newInternalPlanner( -1, /* increment */ noteworthyInternalMemoryUsageBytes, execCfg.Settings) plannerMon.StartNoReserved(ctx, execCfg.RootMemoryMonitor) + p.monitor = plannerMon smi := &sessionDataMutatorIterator{ sds: sds, @@ -375,7 +378,7 @@ func newInternalPlanner( sessionDataMutatorCallbacks: sessionDataMutatorCallbacks{}, } - p.extendedEvalCtx = internalExtendedEvalCtx(ctx, sds, params.collection, txn, ts, ts, execCfg, plannerMon) + p.extendedEvalCtx = internalExtendedEvalCtx(ctx, sds, params.collection, txn, ts, ts, execCfg) p.extendedEvalCtx.Planner = p p.extendedEvalCtx.PrivilegedAccessor = p p.extendedEvalCtx.SessionAccessor = p @@ -440,7 +443,6 @@ func internalExtendedEvalCtx( txnTimestamp time.Time, stmtTimestamp time.Time, execCfg *ExecutorConfig, - plannerMon *mon.BytesMonitor, ) extendedEvalContext { evalContextTestingKnobs := execCfg.EvalContextTestingKnobs @@ -474,7 +476,6 @@ func internalExtendedEvalCtx( TxnImplicit: true, TxnIsSingleStmt: true, Context: ctx, - Mon: plannerMon, TestingKnobs: evalContextTestingKnobs, StmtTimestamp: stmtTimestamp, TxnTimestamp: txnTimestamp, @@ -516,6 +517,11 @@ func (p *planner) Descriptors() *descs.Collection { return p.extendedEvalCtx.Descs } +// Mon is part of the eval.Planner interface. +func (p *planner) Mon() *mon.BytesMonitor { + return p.monitor +} + // ExecCfg implements the PlanHookState interface. 
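With the monitor now stored on the planner and exposed through Mon(), allocations that used to charge evalCtx.Mon take bound accounts from the planner's monitor instead, as the insert.go hunk above does. A sketch of the expected accounting discipline; reserveRowMemory, planMon, and rowSize are illustrative names, not from this patch:

package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/mon"
)

// reserveRowMemory grows a bound account taken from the planner's monitor
// before buffering a row. planMon stands in for (*planner).Mon().
func reserveRowMemory(
	ctx context.Context, planMon *mon.BytesMonitor, rowSize int64,
) (*mon.BoundAccount, error) {
	acc := planMon.MakeBoundAccount()
	if err := acc.Grow(ctx, rowSize); err != nil {
		// The budget was exceeded; release and surface the error.
		acc.Close(ctx)
		return nil, err
	}
	return &acc, nil
}

The caller is responsible for Close on the returned account; a Grow error is how the session's memory budget surfaces to the query.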
func (p *planner) ExecCfg() *ExecutorConfig { return p.extendedEvalCtx.ExecCfg @@ -1009,11 +1015,16 @@ func (p *planner) WithInternalExecutor( } func (p *planner) resetPlanner( - ctx context.Context, txn *kv.Txn, stmtTS time.Time, sd *sessiondata.SessionData, + ctx context.Context, + txn *kv.Txn, + stmtTS time.Time, + sd *sessiondata.SessionData, + plannerMon *mon.BytesMonitor, ) { p.txn = txn p.stmt = Statement{} p.instrumentation = instrumentationHelper{} + p.monitor = plannerMon p.cancelChecker.Reset(ctx) diff --git a/pkg/sql/rowcontainer/datum_row_container_test.go b/pkg/sql/rowcontainer/datum_row_container_test.go index 50ec769d99cd..8b8b69763035 100644 --- a/pkg/sql/rowcontainer/datum_row_container_test.go +++ b/pkg/sql/rowcontainer/datum_row_container_test.go @@ -20,11 +20,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" ) func TestRowContainer(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() for _, numCols := range []int{0, 1, 2, 3, 5, 10, 15} { @@ -78,6 +80,7 @@ func TestRowContainer(t *testing.T) { func TestRowContainerAtOutOfRange(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() st := cluster.MakeTestingClusterSettings() @@ -105,6 +108,7 @@ func TestRowContainerAtOutOfRange(t *testing.T) { func TestRowContainerZeroCols(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() st := cluster.MakeTestingClusterSettings() @@ -153,6 +157,9 @@ func TestRowContainerZeroCols(t *testing.T) { } func BenchmarkRowContainerAt(b *testing.B) { + defer leaktest.AfterTest(b)() + defer log.Scope(b).Close(b) + const numCols = 3 const numRows = 1024 diff --git a/pkg/sql/rowcontainer/disk_row_container_test.go b/pkg/sql/rowcontainer/disk_row_container_test.go index 500bfa760c93..02daff2e3f27 100644 --- a/pkg/sql/rowcontainer/disk_row_container_test.go +++ b/pkg/sql/rowcontainer/disk_row_container_test.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/require" @@ -65,6 +66,7 @@ func compareRows( func TestDiskRowContainer(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() st := cluster.MakeTestingClusterSettings() @@ -108,6 +110,7 @@ func TestDiskRowContainer(t *testing.T) { rng := rand.New(rand.NewSource(timeutil.Now().UnixNano())) evalCtx := eval.MakeTestingEvalContext(st) + defer evalCtx.Stop(ctx) diskMonitor := mon.NewMonitor( "test-disk", mon.DiskResource, @@ -368,6 +371,7 @@ func makeUniqueRows( func TestDiskRowContainerDiskFull(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() st := cluster.MakeTestingClusterSettings() @@ -406,6 +410,7 @@ func TestDiskRowContainerDiskFull(t *testing.T) { func TestDiskRowContainerFinalIterator(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() st := cluster.MakeTestingClusterSettings() @@ -536,6 +541,7 @@ func 
TestDiskRowContainerFinalIterator(t *testing.T) { func TestDiskRowContainerUnsafeReset(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() st := cluster.MakeTestingClusterSettings() diff --git a/pkg/sql/rowcontainer/hash_row_container_test.go b/pkg/sql/rowcontainer/hash_row_container_test.go index b2c1c507bf6e..cd14befaf050 100644 --- a/pkg/sql/rowcontainer/hash_row_container_test.go +++ b/pkg/sql/rowcontainer/hash_row_container_test.go @@ -28,11 +28,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" ) func TestHashDiskBackedRowContainer(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() st := cluster.MakeTestingClusterSettings() @@ -318,6 +320,7 @@ func TestHashDiskBackedRowContainer(t *testing.T) { func TestHashDiskBackedRowContainerPreservesMatchesAndMarks(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() st := cluster.MakeTestingClusterSettings() diff --git a/pkg/sql/rowcontainer/numbered_row_container_test.go b/pkg/sql/rowcontainer/numbered_row_container_test.go index 758fe3c162a0..24f4807d5175 100644 --- a/pkg/sql/rowcontainer/numbered_row_container_test.go +++ b/pkg/sql/rowcontainer/numbered_row_container_test.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/stretchr/testify/require" @@ -51,6 +52,7 @@ func newTestDiskMonitor(ctx context.Context, st *cluster.Settings) *mon.BytesMon // Tests the de-duping functionality of DiskBackedNumberedRowContainer. func TestNumberedRowContainerDeDuping(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() st := cluster.MakeTestingClusterSettings() @@ -142,6 +144,7 @@ func TestNumberedRowContainerDeDuping(t *testing.T) { // elsewhere. func TestNumberedRowContainerIteratorCaching(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() st := cluster.MakeTestingClusterSettings() @@ -242,6 +245,7 @@ func TestNumberedRowContainerIteratorCaching(t *testing.T) { // DiskBackedIndexedRowContainer return the same results. func TestCompareNumberedAndIndexedRowContainers(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) rng, _ := randutil.NewTestRand() @@ -553,6 +557,9 @@ func accessPatternForBenchmarkIterations(totalAccesses int, accessPattern [][]in } func BenchmarkNumberedContainerIteratorCaching(b *testing.B) { + defer leaktest.AfterTest(b)() + defer log.Scope(b).Close(b) + const numRows = 10000 ctx := context.Background() diff --git a/pkg/sql/rowcontainer/row_container.go b/pkg/sql/rowcontainer/row_container.go index ccec3909193b..454e35ec6ba1 100644 --- a/pkg/sql/rowcontainer/row_container.go +++ b/pkg/sql/rowcontainer/row_container.go @@ -158,12 +158,12 @@ type MemRowContainer struct { var _ heap.Interface = &MemRowContainer{} var _ IndexedRowContainer = &MemRowContainer{} -// Init initializes the MemRowContainer. 
The MemRowContainer uses evalCtx.Mon -// to track memory usage. +// Init initializes the MemRowContainer. The MemRowContainer uses the planner's +// monitor to track memory usage. func (mc *MemRowContainer) Init( ordering colinfo.ColumnOrdering, types []*types.T, evalCtx *eval.Context, ) { - mc.InitWithMon(ordering, types, evalCtx, evalCtx.Mon) + mc.InitWithMon(ordering, types, evalCtx, evalCtx.Planner.Mon()) } // InitWithMon initializes the MemRowContainer with an explicit monitor. Only diff --git a/pkg/sql/rowcontainer/row_container_test.go b/pkg/sql/rowcontainer/row_container_test.go index 2cadc65ddd6b..35122e85bc7a 100644 --- a/pkg/sql/rowcontainer/row_container_test.go +++ b/pkg/sql/rowcontainer/row_container_test.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -76,6 +77,7 @@ func verifyRows( // the memory accounting. func TestRowContainerReplaceMax(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() rng, _ := randutil.NewTestRand() @@ -131,6 +133,7 @@ func TestRowContainerReplaceMax(t *testing.T) { func TestRowContainerIterators(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() st := cluster.MakeTestingClusterSettings() @@ -187,6 +190,7 @@ func TestRowContainerIterators(t *testing.T) { func TestDiskBackedRowContainer(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() st := cluster.MakeTestingClusterSettings() @@ -342,6 +346,7 @@ func TestDiskBackedRowContainer(t *testing.T) { func TestDiskBackedRowContainerDeDuping(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() st := cluster.MakeTestingClusterSettings() @@ -461,6 +466,7 @@ func verifyOrdering( func TestDiskBackedIndexedRowContainer(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() st := cluster.MakeTestingClusterSettings() @@ -920,6 +926,9 @@ func generateAccessPattern(numRows int) []int { } func BenchmarkDiskBackedIndexedRowContainer(b *testing.B) { + defer leaktest.AfterTest(b)() + defer log.Scope(b).Close(b) + const numCols = 1 const numRows = 100000 diff --git a/pkg/sql/rowenc/encoded_datum_test.go b/pkg/sql/rowenc/encoded_datum_test.go index 1cc75655558d..6e8c1686b127 100644 --- a/pkg/sql/rowenc/encoded_datum_test.go +++ b/pkg/sql/rowenc/encoded_datum_test.go @@ -735,7 +735,7 @@ func TestEncDatumFingerprintMemory(t *testing.T) { ctx := context.Background() evalCtx := eval.MakeTestingEvalContext(cluster.MakeTestingClusterSettings()) defer evalCtx.Stop(ctx) - memAcc := evalCtx.Mon.MakeBoundAccount() + memAcc := evalCtx.TestingMon.MakeBoundAccount() defer memAcc.Close(ctx) var da tree.DatumAlloc for _, c := range testCases { diff --git a/pkg/sql/rowexec/aggregator.go b/pkg/sql/rowexec/aggregator.go index b80d85761cb1..4ac11602237e 100644 --- a/pkg/sql/rowexec/aggregator.go +++ b/pkg/sql/rowexec/aggregator.go @@ -97,7 +97,7 @@ func (ag *aggregatorBase) init( output execinfra.RowReceiver, trailingMetaCallback func() []execinfrapb.ProducerMetadata, ) error { - memMonitor := execinfra.NewMonitor(ctx, flowCtx.EvalCtx.Mon, 
"aggregator-mem") + memMonitor := execinfra.NewMonitor(ctx, flowCtx.Mon, "aggregator-mem") if execstats.ShouldCollectStats(ctx, flowCtx.CollectStats) { input = newInputStatCollector(input) ag.ExecStatsForTrace = ag.execStatsForTrace diff --git a/pkg/sql/rowexec/aggregator_test.go b/pkg/sql/rowexec/aggregator_test.go index 3a1ff77e4f80..af31f5f29571 100644 --- a/pkg/sql/rowexec/aggregator_test.go +++ b/pkg/sql/rowexec/aggregator_test.go @@ -426,6 +426,7 @@ func BenchmarkAggregation(b *testing.B) { flowCtx := &execinfra.FlowCtx{ Cfg: &execinfra.ServerConfig{Settings: st}, EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, } for _, aggFunc := range aggFuncs { @@ -480,6 +481,7 @@ func BenchmarkCountRows(b *testing.B) { flowCtx := &execinfra.FlowCtx{ Cfg: &execinfra.ServerConfig{Settings: st}, EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, } b.SetBytes(int64(8 * numRows * numCols)) @@ -507,6 +509,7 @@ func BenchmarkGrouping(b *testing.B) { flowCtx := &execinfra.FlowCtx{ Cfg: &execinfra.ServerConfig{Settings: st}, EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, } spec := &execinfrapb.AggregatorSpec{ GroupCols: []uint32{0}, @@ -558,6 +561,7 @@ func benchmarkAggregationWithGrouping(b *testing.B, numOrderedCols int) { flowCtx := &execinfra.FlowCtx{ Cfg: &execinfra.ServerConfig{Settings: st}, EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, } for _, aggFunc := range aggFuncs { diff --git a/pkg/sql/rowexec/distinct.go b/pkg/sql/rowexec/distinct.go index 2a675da3c7b5..00f3c2ecf661 100644 --- a/pkg/sql/rowexec/distinct.go +++ b/pkg/sql/rowexec/distinct.go @@ -99,7 +99,7 @@ func newDistinct( } } - memMonitor := execinfra.NewMonitor(ctx, flowCtx.EvalCtx.Mon, "distinct-mem") + memMonitor := execinfra.NewMonitor(ctx, flowCtx.Mon, "distinct-mem") d := &distinct{ input: input, memAcc: memMonitor.MakeBoundAccount(), diff --git a/pkg/sql/rowexec/distinct_test.go b/pkg/sql/rowexec/distinct_test.go index d4fd2ffc043a..8ffddc6f4230 100644 --- a/pkg/sql/rowexec/distinct_test.go +++ b/pkg/sql/rowexec/distinct_test.go @@ -278,6 +278,7 @@ func TestDistinct(t *testing.T) { flowCtx := execinfra.FlowCtx{ Cfg: &execinfra.ServerConfig{Settings: st}, EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, } d, err := newDistinct(context.Background(), &flowCtx, 0 /* processorID */, &ds, in, &execinfrapb.PostProcessSpec{}, out) @@ -327,6 +328,7 @@ func benchmarkDistinct(b *testing.B, orderedColumns []uint32) { flowCtx := &execinfra.FlowCtx{ Cfg: &execinfra.ServerConfig{Settings: st}, EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, } spec := &execinfrapb.DistinctSpec{ DistinctColumns: []uint32{0, 1}, diff --git a/pkg/sql/rowexec/filterer_test.go b/pkg/sql/rowexec/filterer_test.go index 08ba7b3e9bcd..39228e8371ab 100644 --- a/pkg/sql/rowexec/filterer_test.go +++ b/pkg/sql/rowexec/filterer_test.go @@ -84,6 +84,7 @@ func TestFilterer(t *testing.T) { flowCtx := execinfra.FlowCtx{ Cfg: &execinfra.ServerConfig{Settings: st}, EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, } spec := execinfrapb.FiltererSpec{ Filter: execinfrapb.Expression{Expr: c.filter}, @@ -126,6 +127,7 @@ func BenchmarkFilterer(b *testing.B) { flowCtx := &execinfra.FlowCtx{ Cfg: &execinfra.ServerConfig{Settings: st}, EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, } post := &execinfrapb.PostProcessSpec{} disposer := &rowDisposer{} diff --git a/pkg/sql/rowexec/hashjoiner.go b/pkg/sql/rowexec/hashjoiner.go index e862d615a6e5..855493b12ae2 100644 --- a/pkg/sql/rowexec/hashjoiner.go +++ b/pkg/sql/rowexec/hashjoiner.go @@ -144,7 +144,7 @@ func newHashJoiner( // Limit the memory use by creating 
a child monitor with a hard limit. // The hashJoiner will overflow to disk if this limit is not enough. - h.MemMonitor = execinfra.NewLimitedMonitor(ctx, flowCtx.EvalCtx.Mon, flowCtx, "hashjoiner-limited") + h.MemMonitor = execinfra.NewLimitedMonitor(ctx, flowCtx.Mon, flowCtx, "hashjoiner-limited") h.diskMonitor = execinfra.NewMonitor(ctx, flowCtx.DiskMonitor, "hashjoiner-disk") h.hashTable = rowcontainer.NewHashDiskBackedRowContainer( h.EvalCtx, h.MemMonitor, h.diskMonitor, h.FlowCtx.Cfg.TempStorage, diff --git a/pkg/sql/rowexec/hashjoiner_test.go b/pkg/sql/rowexec/hashjoiner_test.go index 7af1c1769e49..ac77fdb64d6b 100644 --- a/pkg/sql/rowexec/hashjoiner_test.go +++ b/pkg/sql/rowexec/hashjoiner_test.go @@ -1046,6 +1046,7 @@ func TestHashJoiner(t *testing.T) { out := &distsqlutils.RowBuffer{} flowCtx := execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ Settings: st, TempStorage: tempEngine, @@ -1130,6 +1131,7 @@ func TestHashJoinerError(t *testing.T) { out := &distsqlutils.RowBuffer{} flowCtx := execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ Settings: st, TempStorage: tempEngine, @@ -1278,6 +1280,7 @@ func TestHashJoinerDrain(t *testing.T) { }, DiskMonitor: diskMonitor, EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, } post := execinfrapb.PostProcessSpec{Projection: true, OutputColumns: outCols} @@ -1410,6 +1413,7 @@ func TestHashJoinerDrainAfterBuildPhaseError(t *testing.T) { }, DiskMonitor: diskMonitor, EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, } post := execinfrapb.PostProcessSpec{Projection: true, OutputColumns: outCols} @@ -1460,6 +1464,7 @@ func BenchmarkHashJoiner(b *testing.B) { defer diskMonitor.Stop(ctx) flowCtx := &execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ Settings: st, }, diff --git a/pkg/sql/rowexec/inverted_filterer.go b/pkg/sql/rowexec/inverted_filterer.go index 9f3e6f0b54b7..96fdd9dd78e4 100644 --- a/pkg/sql/rowexec/inverted_filterer.go +++ b/pkg/sql/rowexec/inverted_filterer.go @@ -110,7 +110,7 @@ func newInvertedFilterer( } // Initialize memory monitor and row container for input rows. - ifr.MemMonitor = execinfra.NewLimitedMonitor(ctx, flowCtx.EvalCtx.Mon, flowCtx, "inverted-filterer-limited") + ifr.MemMonitor = execinfra.NewLimitedMonitor(ctx, flowCtx.Mon, flowCtx, "inverted-filterer-limited") ifr.diskMonitor = execinfra.NewMonitor(ctx, flowCtx.DiskMonitor, "inverted-filterer-disk") ifr.rc = rowcontainer.NewDiskBackedNumberedRowContainer( true, /* deDup */ diff --git a/pkg/sql/rowexec/inverted_joiner.go b/pkg/sql/rowexec/inverted_joiner.go index 1158ca00b120..215da8553183 100644 --- a/pkg/sql/rowexec/inverted_joiner.go +++ b/pkg/sql/rowexec/inverted_joiner.go @@ -297,7 +297,7 @@ func newInvertedJoiner( LockWaitPolicy: spec.LockingWaitPolicy, LockTimeout: flowCtx.EvalCtx.SessionData().LockTimeout, Alloc: &ij.alloc, - MemMonitor: flowCtx.EvalCtx.Mon, + MemMonitor: flowCtx.Mon, Spec: &spec.FetchSpec, TraceKV: flowCtx.TraceKV, ForceProductionKVBatchSize: flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, @@ -317,7 +317,7 @@ func newInvertedJoiner( ij.spanBuilder.InitWithFetchSpec(flowCtx.EvalCtx, flowCtx.Codec(), &ij.fetchSpec) // Initialize memory monitors and row container for index rows. 
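The joiner hunks above all instantiate the same template for disk-backed processors: a limited child of the flow's memory monitor, whose exhaustion triggers spilling, paired with an unlimited child of the flow's disk monitor. A minimal sketch of the pairing, under the hypothetical helper name spillMonitors:

package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
	"github.com/cockroachdb/cockroach/pkg/util/mon"
)

// spillMonitors derives the pair of monitors a disk-backed processor needs:
// a limited child of the flow's memory monitor and an unlimited child of
// the flow's disk monitor. The monitor names are illustrative.
func spillMonitors(
	ctx context.Context, flowCtx *execinfra.FlowCtx,
) (memMon, diskMon *mon.BytesMonitor) {
	memMon = execinfra.NewLimitedMonitor(ctx, flowCtx.Mon, flowCtx, "example-limited")
	diskMon = execinfra.NewMonitor(ctx, flowCtx.DiskMonitor, "example-disk")
	return memMon, diskMon
}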
- ij.MemMonitor = execinfra.NewLimitedMonitor(ctx, flowCtx.EvalCtx.Mon, flowCtx, "invertedjoiner-limited") + ij.MemMonitor = execinfra.NewLimitedMonitor(ctx, flowCtx.Mon, flowCtx, "invertedjoiner-limited") ij.diskMonitor = execinfra.NewMonitor(ctx, flowCtx.DiskMonitor, "invertedjoiner-disk") ij.indexRows = rowcontainer.NewDiskBackedNumberedRowContainer( true, /* deDup */ diff --git a/pkg/sql/rowexec/inverted_joiner_test.go b/pkg/sql/rowexec/inverted_joiner_test.go index fe56fe1103c2..ef8ec009566a 100644 --- a/pkg/sql/rowexec/inverted_joiner_test.go +++ b/pkg/sql/rowexec/inverted_joiner_test.go @@ -636,6 +636,7 @@ func TestInvertedJoiner(t *testing.T) { defer evalCtx.Stop(ctx) flowCtx := execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ Settings: st, TempStorage: tempEngine, @@ -777,6 +778,7 @@ func TestInvertedJoinerDrain(t *testing.T) { leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.NodeID(), leafInputState) flowCtx := execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ Settings: st, TempStorage: tempEngine, diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index 6c160564f77c..d9e0e661f9bd 100644 --- a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -440,9 +440,9 @@ func newJoinReader( // Initialize memory monitors and bound account for data structures in the joinReader. jr.MemMonitor = mon.NewMonitorInheritWithLimit( - "joinreader-mem" /* name */, memoryLimit, flowCtx.EvalCtx.Mon, + "joinreader-mem" /* name */, memoryLimit, flowCtx.Mon, ) - jr.MemMonitor.StartNoReserved(ctx, flowCtx.EvalCtx.Mon) + jr.MemMonitor.StartNoReserved(ctx, flowCtx.Mon) jr.memAcc = jr.MemMonitor.MakeBoundAccount() if err := jr.initJoinReaderStrategy(ctx, flowCtx, rightTypes, readerType); err != nil { @@ -488,9 +488,9 @@ func newJoinReader( // We need to use an unlimited monitor for the streamer's budget since // the streamer itself is responsible for staying under the limit. 
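The streamer budget set up just below is the inverse case: the child monitor gets an effectively unlimited budget because the streamer polices its own memory use, so the monitor provides attribution rather than enforcement. A hedged sketch, with parent standing in for flowCtx.Mon and selfLimitedBudget as an illustrative name:

package example

import (
	"context"
	"math"

	"github.com/cockroachdb/cockroach/pkg/util/mon"
)

// selfLimitedBudget builds an "unlimited" child monitor for a component
// that enforces its own memory limit, then carves a bound account out of
// it for that component to charge.
func selfLimitedBudget(ctx context.Context, parent *mon.BytesMonitor) mon.BoundAccount {
	unlimited := mon.NewMonitorInheritWithLimit(
		"example-unlimited" /* name */, math.MaxInt64, parent,
	)
	unlimited.StartNoReserved(ctx, parent)
	return unlimited.MakeBoundAccount()
}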
jr.streamerInfo.unlimitedMemMonitor = mon.NewMonitorInheritWithLimit( - "joinreader-streamer-unlimited" /* name */, math.MaxInt64, flowCtx.EvalCtx.Mon, + "joinreader-streamer-unlimited" /* name */, math.MaxInt64, flowCtx.Mon, ) - jr.streamerInfo.unlimitedMemMonitor.StartNoReserved(ctx, flowCtx.EvalCtx.Mon) + jr.streamerInfo.unlimitedMemMonitor.StartNoReserved(ctx, flowCtx.Mon) jr.streamerInfo.budgetAcc = jr.streamerInfo.unlimitedMemMonitor.MakeBoundAccount() jr.streamerInfo.txnKVStreamerMemAcc = jr.streamerInfo.unlimitedMemMonitor.MakeBoundAccount() // The index joiner can rely on the streamer to maintain the input ordering, @@ -546,7 +546,7 @@ func newJoinReader( LockWaitPolicy: spec.LockingWaitPolicy, LockTimeout: flowCtx.EvalCtx.SessionData().LockTimeout, Alloc: &jr.alloc, - MemMonitor: flowCtx.EvalCtx.Mon, + MemMonitor: flowCtx.Mon, Spec: &spec.FetchSpec, TraceKV: flowCtx.TraceKV, ForceProductionKVBatchSize: flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, diff --git a/pkg/sql/rowexec/joinreader_test.go b/pkg/sql/rowexec/joinreader_test.go index a9f5b5f33ab1..a165a9891385 100644 --- a/pkg/sql/rowexec/joinreader_test.go +++ b/pkg/sql/rowexec/joinreader_test.go @@ -1096,6 +1096,7 @@ func TestJoinReader(t *testing.T) { defer evalCtx.Stop(ctx) flowCtx := execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ Settings: st, TempStorage: tempEngine, @@ -1276,6 +1277,7 @@ CREATE TABLE test.t (a INT, s STRING, INDEX (a, s))`); err != nil { defer diskMonitor.Stop(ctx) flowCtx := execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ Settings: st, TempStorage: tempEngine, @@ -1391,6 +1393,7 @@ func TestJoinReaderDrain(t *testing.T) { flowCtx := execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ Settings: st, TempStorage: tempEngine, @@ -1680,6 +1683,7 @@ func benchmarkJoinReader(b *testing.B, bc JRBenchConfig) { diskMonitor = execinfra.NewTestDiskMonitor(ctx, st) flowCtx = execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ Settings: st, }, @@ -1944,6 +1948,7 @@ func BenchmarkJoinReaderLookupStress(b *testing.B) { diskMonitor = execinfra.NewTestDiskMonitor(ctx, st) flowCtx = execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ Settings: st, }, diff --git a/pkg/sql/rowexec/mergejoiner.go b/pkg/sql/rowexec/mergejoiner.go index e5611933ec6d..9b192fdbfbfb 100644 --- a/pkg/sql/rowexec/mergejoiner.go +++ b/pkg/sql/rowexec/mergejoiner.go @@ -88,7 +88,7 @@ func newMergeJoiner( return nil, err } - m.MemMonitor = execinfra.NewMonitor(ctx, flowCtx.EvalCtx.Mon, "mergejoiner-mem") + m.MemMonitor = execinfra.NewMonitor(ctx, flowCtx.Mon, "mergejoiner-mem") var err error m.streamMerger, err = makeStreamMerger( diff --git a/pkg/sql/rowexec/mergejoiner_test.go b/pkg/sql/rowexec/mergejoiner_test.go index 8df92a9e2eca..1a735928fd7e 100644 --- a/pkg/sql/rowexec/mergejoiner_test.go +++ b/pkg/sql/rowexec/mergejoiner_test.go @@ -725,6 +725,7 @@ func TestMergeJoiner(t *testing.T) { flowCtx := execinfra.FlowCtx{ Cfg: &execinfra.ServerConfig{Settings: st}, EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, } post := execinfrapb.PostProcessSpec{Projection: true, OutputColumns: c.outCols} @@ -831,6 +832,7 @@ func TestConsumerClosed(t *testing.T) { flowCtx := execinfra.FlowCtx{ Cfg: &execinfra.ServerConfig{Settings: st}, EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, } post := execinfrapb.PostProcessSpec{Projection: true, 
OutputColumns: outCols} m, err := newMergeJoiner(context.Background(), &flowCtx, 0 /* processorID */, &spec, leftInput, rightInput, &post, out) @@ -856,6 +858,7 @@ func BenchmarkMergeJoiner(b *testing.B) { flowCtx := &execinfra.FlowCtx{ Cfg: &execinfra.ServerConfig{Settings: st}, EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, } spec := &execinfrapb.MergeJoinerSpec{ diff --git a/pkg/sql/rowexec/noop_test.go b/pkg/sql/rowexec/noop_test.go index fbd440d81673..edc28ef6bd99 100644 --- a/pkg/sql/rowexec/noop_test.go +++ b/pkg/sql/rowexec/noop_test.go @@ -36,6 +36,7 @@ func BenchmarkNoop(b *testing.B) { flowCtx := &execinfra.FlowCtx{ Cfg: &execinfra.ServerConfig{Settings: st}, EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, } post := &execinfrapb.PostProcessSpec{} disposer := &rowDisposer{} diff --git a/pkg/sql/rowexec/ordinality_test.go b/pkg/sql/rowexec/ordinality_test.go index 2bc2c5521dcb..b010d60e0689 100644 --- a/pkg/sql/rowexec/ordinality_test.go +++ b/pkg/sql/rowexec/ordinality_test.go @@ -116,6 +116,7 @@ func TestOrdinality(t *testing.T) { flowCtx := execinfra.FlowCtx{ Cfg: &execinfra.ServerConfig{Settings: st}, EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, } d, err := newOrdinalityProcessor(context.Background(), &flowCtx, 0 /* processorID */, &os, in, &execinfrapb.PostProcessSpec{}, out) @@ -164,6 +165,7 @@ func BenchmarkOrdinality(b *testing.B) { flowCtx := &execinfra.FlowCtx{ Cfg: &execinfra.ServerConfig{Settings: st}, EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, } spec := &execinfrapb.OrdinalitySpec{} diff --git a/pkg/sql/rowexec/processor_utils_test.go b/pkg/sql/rowexec/processor_utils_test.go index a53afc20f45b..6dbc7d040289 100644 --- a/pkg/sql/rowexec/processor_utils_test.go +++ b/pkg/sql/rowexec/processor_utils_test.go @@ -45,6 +45,7 @@ func DefaultProcessorTestConfig() ProcessorTestConfig { FlowCtx: &execinfra.FlowCtx{ Cfg: &execinfra.ServerConfig{Settings: st}, EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, }, } } diff --git a/pkg/sql/rowexec/processors_test.go b/pkg/sql/rowexec/processors_test.go index af2e38453daf..5b8a1b4c404b 100644 --- a/pkg/sql/rowexec/processors_test.go +++ b/pkg/sql/rowexec/processors_test.go @@ -323,6 +323,7 @@ func TestProcessorBaseContext(t *testing.T) { flowCtx := &execinfra.FlowCtx{ Cfg: &execinfra.ServerConfig{Settings: st}, EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, } defer flowCtx.EvalCtx.Stop(ctx) diff --git a/pkg/sql/rowexec/project_set_test.go b/pkg/sql/rowexec/project_set_test.go index 6293200d1461..d6f5383baf4e 100644 --- a/pkg/sql/rowexec/project_set_test.go +++ b/pkg/sql/rowexec/project_set_test.go @@ -164,6 +164,7 @@ func BenchmarkProjectSet(b *testing.B) { flowCtx := execinfra.FlowCtx{ Cfg: &execinfra.ServerConfig{Settings: st}, EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, } in := distsqlutils.NewRowBuffer(c.inputTypes, c.input, distsqlutils.RowBufferArgs{}) diff --git a/pkg/sql/rowexec/sample_aggregator.go b/pkg/sql/rowexec/sample_aggregator.go index 599f7f4c3965..f597be7ce925 100644 --- a/pkg/sql/rowexec/sample_aggregator.go +++ b/pkg/sql/rowexec/sample_aggregator.go @@ -111,7 +111,7 @@ func newSampleAggregator( // Limit the memory use by creating a child monitor with a hard limit. // The processor will disable histogram collection if this limit is not // enough. 
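The sampler comments here describe graceful degradation rather than spilling: when the limited budget runs out, the processor stops collecting histograms instead of failing the statistics job. A sketch of that policy at the account level; growOrDegrade and its caller protocol are illustrative, not part of this patch:

package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/mon"
)

// growOrDegrade illustrates the "disable the feature instead of failing"
// policy: on a budget error we release what we hold and tell the caller to
// stop collecting, keeping the statistics job alive.
func growOrDegrade(ctx context.Context, acc *mon.BoundAccount, n int64) bool {
	if err := acc.Grow(ctx, n); err != nil {
		acc.Clear(ctx)
		return false // degrade: histogram collection is switched off
	}
	return true
}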
- memMonitor := execinfra.NewLimitedMonitor(ctx, flowCtx.EvalCtx.Mon, flowCtx, "sample-aggregator-mem") + memMonitor := execinfra.NewLimitedMonitor(ctx, flowCtx.Mon, flowCtx, "sample-aggregator-mem") rankCol := len(input.OutputTypes()) - 8 s := &sampleAggregator{ spec: spec, diff --git a/pkg/sql/rowexec/sample_aggregator_test.go b/pkg/sql/rowexec/sample_aggregator_test.go index 9455535a7d5b..305a3ebdfe84 100644 --- a/pkg/sql/rowexec/sample_aggregator_test.go +++ b/pkg/sql/rowexec/sample_aggregator_test.go @@ -70,6 +70,7 @@ func runSampleAggregator( ) { flowCtx := execinfra.FlowCtx{ EvalCtx: evalCtx, + Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ Settings: st, DB: kvDB, diff --git a/pkg/sql/rowexec/sampler.go b/pkg/sql/rowexec/sampler.go index 1dff9c3e5b6d..5e89b80a08f7 100644 --- a/pkg/sql/rowexec/sampler.go +++ b/pkg/sql/rowexec/sampler.go @@ -117,7 +117,7 @@ func newSamplerProcessor( // Limit the memory use by creating a child monitor with a hard limit. // The processor will disable histogram collection if this limit is not // enough. - memMonitor := execinfra.NewLimitedMonitor(ctx, flowCtx.EvalCtx.Mon, flowCtx, "sampler-mem") + memMonitor := execinfra.NewLimitedMonitor(ctx, flowCtx.Mon, flowCtx, "sampler-mem") s := &samplerProcessor{ flowCtx: flowCtx, input: input, diff --git a/pkg/sql/rowexec/sampler_test.go b/pkg/sql/rowexec/sampler_test.go index 60472faaee24..dabf46a85988 100644 --- a/pkg/sql/rowexec/sampler_test.go +++ b/pkg/sql/rowexec/sampler_test.go @@ -67,6 +67,7 @@ func runSampler( flowCtx := execinfra.FlowCtx{ Cfg: &execinfra.ServerConfig{Settings: st}, EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, } // Override the default memory limit. If memLimitBytes is small but // non-zero, the processor will hit this limit and disable sampling. @@ -356,6 +357,7 @@ func TestSamplerSketch(t *testing.T) { Settings: st, }, EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, } spec := &execinfrapb.SamplerSpec{ diff --git a/pkg/sql/rowexec/sorter.go b/pkg/sql/rowexec/sorter.go index 9e874e457161..3472139c6148 100644 --- a/pkg/sql/rowexec/sorter.go +++ b/pkg/sql/rowexec/sorter.go @@ -61,7 +61,7 @@ func (s *sorterBase) init( // Limit the memory use by creating a child monitor with a hard limit. // The processor will overflow to disk if this limit is not enough. 
- memMonitor := execinfra.NewLimitedMonitor(ctx, flowCtx.EvalCtx.Mon, flowCtx, redact.Sprintf("%s-limited", processorName)) + memMonitor := execinfra.NewLimitedMonitor(ctx, flowCtx.Mon, flowCtx, redact.Sprintf("%s-limited", processorName)) if err := s.ProcessorBase.Init( ctx, self, post, input.OutputTypes(), flowCtx, processorID, output, memMonitor, opts, ); err != nil { diff --git a/pkg/sql/rowexec/sorter_test.go b/pkg/sql/rowexec/sorter_test.go index ecffd498f4e1..93186c5d4ab0 100644 --- a/pkg/sql/rowexec/sorter_test.go +++ b/pkg/sql/rowexec/sorter_test.go @@ -277,6 +277,7 @@ func TestSorter(t *testing.T) { defer diskMonitor.Stop(ctx) flowCtx := execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ Settings: cluster.MakeTestingClusterSettings(), TempStorage: tempEngine, @@ -351,6 +352,7 @@ func TestSortInvalidLimit(t *testing.T) { defer diskMonitor.Stop(ctx) flowCtx := execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ Settings: st, }, @@ -400,6 +402,7 @@ func BenchmarkSortAll(b *testing.B) { defer diskMonitor.Stop(ctx) flowCtx := execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ Settings: st, }, @@ -443,6 +446,7 @@ func BenchmarkSortLimit(b *testing.B) { defer diskMonitor.Stop(ctx) flowCtx := execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ Settings: st, }, @@ -491,6 +495,7 @@ func BenchmarkSortChunks(b *testing.B) { defer diskMonitor.Stop(ctx) flowCtx := execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ Settings: st, }, diff --git a/pkg/sql/rowexec/tablereader.go b/pkg/sql/rowexec/tablereader.go index a6afeb899541..c029e8dc709d 100644 --- a/pkg/sql/rowexec/tablereader.go +++ b/pkg/sql/rowexec/tablereader.go @@ -156,7 +156,7 @@ func newTableReader( LockWaitPolicy: spec.LockingWaitPolicy, LockTimeout: flowCtx.EvalCtx.SessionData().LockTimeout, Alloc: &tr.alloc, - MemMonitor: flowCtx.EvalCtx.Mon, + MemMonitor: flowCtx.Mon, Spec: &spec.FetchSpec, TraceKV: flowCtx.TraceKV, ForceProductionKVBatchSize: flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, diff --git a/pkg/sql/rowexec/tablereader_test.go b/pkg/sql/rowexec/tablereader_test.go index 06d30d704ab7..05e9fa650b05 100644 --- a/pkg/sql/rowexec/tablereader_test.go +++ b/pkg/sql/rowexec/tablereader_test.go @@ -129,6 +129,7 @@ func TestTableReader(t *testing.T) { defer evalCtx.Stop(ctx) flowCtx := execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ Settings: st, RangeCache: rangecache.NewRangeCache(s.ClusterSettings(), nil, @@ -224,6 +225,7 @@ ALTER TABLE t EXPERIMENTAL_RELOCATE VALUES (ARRAY[2], 1), (ARRAY[1], 2), (ARRAY[ flowCtx := execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ Settings: st, RangeCache: tc.Server(0).DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache(), @@ -329,6 +331,7 @@ func TestTableReaderDrain(t *testing.T) { leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.NodeID(), leafInputState) flowCtx := execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ Settings: st, }, @@ -377,6 +380,7 @@ func TestLimitScans(t *testing.T) { evalCtx.TestingKnobs.ForceProductionValues = true flowCtx := execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ Settings: st, RangeCache: rangecache.NewRangeCache(s.ClusterSettings(), nil, @@ -490,6 +494,7 @@ func 
BenchmarkTableReader(b *testing.B) { tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, keys.SystemSQLCodec, "test", tableName) flowCtx := execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ Settings: st, RangeCache: rangecache.NewRangeCache(s.ClusterSettings(), nil, diff --git a/pkg/sql/rowexec/utils_test.go b/pkg/sql/rowexec/utils_test.go index 665974a76079..fe2dffc2cb8d 100644 --- a/pkg/sql/rowexec/utils_test.go +++ b/pkg/sql/rowexec/utils_test.go @@ -57,6 +57,7 @@ func runProcessorTest( flowCtx := execinfra.FlowCtx{ Cfg: &execinfra.ServerConfig{Settings: st, Stopper: stopper, DistSender: distSender}, EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, Txn: txn, } diff --git a/pkg/sql/rowexec/values_test.go b/pkg/sql/rowexec/values_test.go index fe7ef24e9453..c15ff403ed11 100644 --- a/pkg/sql/rowexec/values_test.go +++ b/pkg/sql/rowexec/values_test.go @@ -49,6 +49,7 @@ func TestValuesProcessor(t *testing.T) { flowCtx := execinfra.FlowCtx{ Cfg: &execinfra.ServerConfig{Settings: st}, EvalCtx: evalCtx, + Mon: evalCtx.TestingMon, } v, err := newValuesProcessor(context.Background(), &flowCtx, 0 /* processorID */, &spec, &execinfrapb.PostProcessSpec{}, out) @@ -107,6 +108,7 @@ func BenchmarkValuesProcessor(b *testing.B) { flowCtx := execinfra.FlowCtx{ Cfg: &execinfra.ServerConfig{Settings: st}, EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, } post := execinfrapb.PostProcessSpec{} output := rowDisposer{} diff --git a/pkg/sql/rowexec/windower.go b/pkg/sql/rowexec/windower.go index 19f43e7aadb8..95f52f44dff5 100644 --- a/pkg/sql/rowexec/windower.go +++ b/pkg/sql/rowexec/windower.go @@ -123,8 +123,8 @@ func newWindower( // production. limit = memRequiredByWindower } - limitedMon := mon.NewMonitorInheritWithLimit("windower-limited", limit, evalCtx.Mon) - limitedMon.StartNoReserved(ctx, evalCtx.Mon) + limitedMon := mon.NewMonitorInheritWithLimit("windower-limited", limit, flowCtx.Mon) + limitedMon.StartNoReserved(ctx, flowCtx.Mon) w.acc = limitedMon.MakeBoundAccount() // If we have aggregate builtins that aggregate a single datum, we want // them to reuse the same shared memory account with the windower. 
Notably, diff --git a/pkg/sql/rowexec/windower_test.go b/pkg/sql/rowexec/windower_test.go index 58e102439c74..8d8ad05fb789 100644 --- a/pkg/sql/rowexec/windower_test.go +++ b/pkg/sql/rowexec/windower_test.go @@ -61,6 +61,7 @@ func TestWindowerAccountingForResults(t *testing.T) { flowCtx := &execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ Settings: st, TempStorage: tempEngine, @@ -208,6 +209,7 @@ func BenchmarkWindower(b *testing.B) { flowCtx := &execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ Settings: st, }, diff --git a/pkg/sql/rowexec/zigzagjoiner.go b/pkg/sql/rowexec/zigzagjoiner.go index 7611e95ae5d7..65a2299478bd 100644 --- a/pkg/sql/rowexec/zigzagjoiner.go +++ b/pkg/sql/rowexec/zigzagjoiner.go @@ -468,7 +468,7 @@ func (z *zigzagJoiner) setupInfo( LockWaitPolicy: spec.LockingWaitPolicy, LockTimeout: flowCtx.EvalCtx.SessionData().LockTimeout, Alloc: &info.alloc, - MemMonitor: flowCtx.EvalCtx.Mon, + MemMonitor: flowCtx.Mon, Spec: &spec.FetchSpec, TraceKV: flowCtx.TraceKV, ForceProductionKVBatchSize: flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, diff --git a/pkg/sql/rowexec/zigzagjoiner_test.go b/pkg/sql/rowexec/zigzagjoiner_test.go index 680cc2f6b94e..5a5e156a3094 100644 --- a/pkg/sql/rowexec/zigzagjoiner_test.go +++ b/pkg/sql/rowexec/zigzagjoiner_test.go @@ -700,6 +700,7 @@ func TestZigzagJoiner(t *testing.T) { defer evalCtx.Stop(ctx) flowCtx := execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{Settings: st}, Txn: kv.NewTxn(ctx, s.DB(), s.NodeID()), } @@ -774,6 +775,7 @@ func TestZigzagJoinerDrain(t *testing.T) { leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.NodeID(), leafInputState) flowCtx := execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{Settings: s.ClusterSettings()}, Txn: leafTxn, } diff --git a/pkg/sql/rowflow/routers.go b/pkg/sql/rowflow/routers.go index 5851f0b604fe..cc8d59df3d34 100644 --- a/pkg/sql/rowflow/routers.go +++ b/pkg/sql/rowflow/routers.go @@ -272,7 +272,7 @@ func (rb *routerBase) init(ctx context.Context, flowCtx *execinfra.FlowCtx, type // to take the mutex. evalCtx := flowCtx.NewEvalCtx() rb.outputs[i].memoryMonitor = execinfra.NewLimitedMonitor( - ctx, evalCtx.Mon, flowCtx, + ctx, flowCtx.Mon, flowCtx, redact.Sprintf("router-limited-%d", rb.outputs[i].streamID), ) rb.outputs[i].diskMonitor = execinfra.NewMonitor( @@ -283,7 +283,7 @@ func (rb *routerBase) init(ctx context.Context, flowCtx *execinfra.FlowCtx, type // to fallback to disk if a memory budget error is encountered when // we're popping rows from the row container into the row buffer. 
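The routers.go hunk above derives one limited monitor per router output, so memory blame attaches to a specific stream, and builds the monitor name with redact.Sprintf so the stream ID survives log redaction. A minimal sketch of the per-stream derivation under the hypothetical name perStreamMonitor:

package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
	"github.com/cockroachdb/cockroach/pkg/util/mon"
	"github.com/cockroachdb/redact"
)

// perStreamMonitor derives one limited monitor per router output so that
// memory usage is attributed to the specific stream.
func perStreamMonitor(
	ctx context.Context, flowCtx *execinfra.FlowCtx, streamID int,
) *mon.BytesMonitor {
	return execinfra.NewLimitedMonitor(
		ctx, flowCtx.Mon, flowCtx,
		redact.Sprintf("example-limited-%d", streamID),
	)
}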
rb.outputs[i].rowBufToPushFromMon = execinfra.NewMonitor( - ctx, evalCtx.Mon, redact.Sprintf("router-unlimited-%d", rb.outputs[i].streamID), + ctx, flowCtx.Mon, redact.Sprintf("router-unlimited-%d", rb.outputs[i].streamID), ) memAcc := rb.outputs[i].rowBufToPushFromMon.MakeBoundAccount() rb.outputs[i].rowBufToPushFromAcc = &memAcc diff --git a/pkg/sql/rowflow/routers_test.go b/pkg/sql/rowflow/routers_test.go index c4c72098f256..b80a567bd8ed 100644 --- a/pkg/sql/rowflow/routers_test.go +++ b/pkg/sql/rowflow/routers_test.go @@ -67,6 +67,7 @@ func setupRouter( Settings: st, }, EvalCtx: evalCtx, + Mon: evalCtx.TestingMon, DiskMonitor: diskMonitor, } r.init(ctx, &flowCtx, inputTypes) @@ -676,6 +677,7 @@ func TestRouterBlocks(t *testing.T) { Settings: st, }, EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, DiskMonitor: diskMonitor, } router.init(ctx, &flowCtx, colTypes) @@ -786,6 +788,7 @@ func TestRouterDiskSpill(t *testing.T) { defer evalCtx.Stop(ctx) flowCtx := execinfra.FlowCtx{ EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ Settings: st, TempStorage: tempEngine, @@ -799,10 +802,10 @@ func TestRouterDiskSpill(t *testing.T) { // memErrorWhenConsumingRows indicates whether we expect an OOM error to // occur when we're consuming rows from the row channel. By default, it // will occur because routerOutput derives a memory monitor for the row - // buffer from evalCtx.Mon which has a limit, and we're going to consume - // rows after the spilling has occurred (meaning that evalCtx.Mon reached - // its limit). In order for this to not happen we will create a separate - // memory account. + // buffer from evalCtx.TestingMon which has a limit, and we're going to + // consume rows after the spilling has occurred (meaning that + // evalCtx.TestingMon reached its limit). In order for this to not happen we + // will create a separate memory account. for _, memErrorWhenConsumingRows := range []bool{false, true} { var ( rowChan execinfra.RowChannel diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index d3b13dfcd0d0..98bb8ae68e94 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -340,7 +340,7 @@ func (sc *SchemaChanger) backfillQueryIntoTable( // subqueries. Note that we intentionally defer the closure of // the account until we return from this method (after the main // query is executed). 
- subqueryResultMemAcc := localPlanner.EvalContext().Mon.MakeBoundAccount() + subqueryResultMemAcc := localPlanner.Mon().MakeBoundAccount() defer subqueryResultMemAcc.Close(ctx) if !sc.distSQLPlanner.PlanAndRunSubqueries( ctx, localPlanner, localPlanner.ExtendedEvalContextCopy, diff --git a/pkg/sql/sem/builtins/aggregate_builtins.go b/pkg/sql/sem/builtins/aggregate_builtins.go index f80d91a7b303..a57d4e19bebd 100644 --- a/pkg/sql/sem/builtins/aggregate_builtins.go +++ b/pkg/sql/sem/builtins/aggregate_builtins.go @@ -511,7 +511,7 @@ var aggregates = map[string]builtinDefinition{ params []*types.T, evalCtx *eval.Context, arguments tree.Datums, ) eval.AggregateFunc { return &stMakeLineAgg{ - acc: evalCtx.Mon.MakeBoundAccount(), + acc: evalCtx.Planner.Mon().MakeBoundAccount(), } }, infoBuilder{ @@ -785,7 +785,7 @@ func makeSTUnionBuiltin() builtinDefinition { params []*types.T, evalCtx *eval.Context, arguments tree.Datums, ) eval.AggregateFunc { return &stUnionAgg{ - acc: evalCtx.Mon.MakeBoundAccount(), + acc: evalCtx.Planner.Mon().MakeBoundAccount(), } }, infoBuilder{ @@ -975,7 +975,7 @@ type stCollectAgg struct { func newSTCollectAgg(_ []*types.T, evalCtx *eval.Context, _ tree.Datums) eval.AggregateFunc { return &stCollectAgg{ - acc: evalCtx.Mon.MakeBoundAccount(), + acc: evalCtx.Planner.Mon().MakeBoundAccount(), } } @@ -1308,7 +1308,7 @@ const ( // will be used by the new struct which will operate in "shared" mode func makeSingleDatumAggregateBase(evalCtx *eval.Context) singleDatumAggregateBase { if evalCtx.SingleDatumAggMemAccount == nil { - newAcc := evalCtx.Mon.MakeBoundAccount() + newAcc := evalCtx.Planner.Mon().MakeBoundAccount() return singleDatumAggregateBase{ mode: nonSharedSingleDatumAggregateBaseMode, acc: &newAcc, @@ -1435,7 +1435,7 @@ type arrayAggregate struct { func newArrayAggregate(params []*types.T, evalCtx *eval.Context, _ tree.Datums) eval.AggregateFunc { return &arrayAggregate{ arr: tree.NewDArray(params[0]), - acc: evalCtx.Mon.MakeBoundAccount(), + acc: evalCtx.Planner.Mon().MakeBoundAccount(), } } @@ -4517,7 +4517,7 @@ func newPercentileDiscAggregate( ) eval.AggregateFunc { return &percentileDiscAggregate{ arr: tree.NewDArray(params[1]), - acc: evalCtx.Mon.MakeBoundAccount(), + acc: evalCtx.Planner.Mon().MakeBoundAccount(), } } @@ -4615,7 +4615,7 @@ func newPercentileContAggregate( ) eval.AggregateFunc { return &percentileContAggregate{ arr: tree.NewDArray(params[1]), - acc: evalCtx.Mon.MakeBoundAccount(), + acc: evalCtx.Planner.Mon().MakeBoundAccount(), } } diff --git a/pkg/sql/sem/builtins/generator_builtins.go b/pkg/sql/sem/builtins/generator_builtins.go index 3ccac51bbfa4..b4a89d644752 100644 --- a/pkg/sql/sem/builtins/generator_builtins.go +++ b/pkg/sql/sem/builtins/generator_builtins.go @@ -2058,7 +2058,7 @@ type spanKeyIterator struct { func newSpanKeyIterator(evalCtx *eval.Context, span roachpb.Span) *spanKeyIterator { return &spanKeyIterator{ - acc: evalCtx.Mon.MakeBoundAccount(), + acc: evalCtx.Planner.Mon().MakeBoundAccount(), span: span, } } @@ -2162,7 +2162,7 @@ func makeRangeKeyIterator(ctx *eval.Context, args tree.Datums) (eval.ValueGenera rangeID := roachpb.RangeID(tree.MustBeDInt(args[0])) return &rangeKeyIterator{ spanKeyIterator: spanKeyIterator{ - acc: ctx.Mon.MakeBoundAccount(), + acc: ctx.Planner.Mon().MakeBoundAccount(), }, rangeID: rangeID, }, nil @@ -2475,7 +2475,7 @@ func makeShowCreateAllSchemasGenerator( return &showCreateAllSchemasGenerator{ evalPlanner: ctx.Planner, dbName: dbName, - acc: ctx.Mon.MakeBoundAccount(), + acc: 
ctx.Planner.Mon().MakeBoundAccount(), }, nil } @@ -2631,7 +2631,7 @@ func makeShowCreateAllTablesGenerator( return &showCreateAllTablesGenerator{ evalPlanner: ctx.Planner, dbName: dbName, - acc: ctx.Mon.MakeBoundAccount(), + acc: ctx.Planner.Mon().MakeBoundAccount(), sessionData: ctx.SessionData(), }, nil } @@ -2712,6 +2712,6 @@ func makeShowCreateAllTypesGenerator( return &showCreateAllTypesGenerator{ evalPlanner: ctx.Planner, dbName: dbName, - acc: ctx.Mon.MakeBoundAccount(), + acc: ctx.Planner.Mon().MakeBoundAccount(), }, nil } diff --git a/pkg/sql/sem/eval/context.go b/pkg/sql/sem/eval/context.go index bf336d86fcdf..b787c8609048 100644 --- a/pkg/sql/sem/eval/context.go +++ b/pkg/sql/sem/eval/context.go @@ -175,7 +175,11 @@ type Context struct { TestingKnobs TestingKnobs - Mon *mon.BytesMonitor + // TestingMon is a memory monitor that should be only used in tests. In + // production code consider using either the monitor of the planner or of + // the flow. + // TODO(yuzefovich): remove this. + TestingMon *mon.BytesMonitor // SingleDatumAggMemAccount is a memory account that all aggregate builtins // that store a single datum will share to account for the memory needed to @@ -309,7 +313,8 @@ func MakeTestingEvalContextWithMon(st *cluster.Settings, monitor *mon.BytesMonit NodeID: base.TestingIDContainer, } monitor.Start(context.Background(), nil /* pool */, mon.NewStandaloneBudget(math.MaxInt64)) - ctx.Mon = monitor + ctx.TestingMon = monitor + ctx.Planner = &fakePlannerWithMonitor{monitor: monitor} ctx.Context = context.TODO() now := timeutil.Now() ctx.SetTxnTimestamp(now) @@ -317,6 +322,16 @@ func MakeTestingEvalContextWithMon(st *cluster.Settings, monitor *mon.BytesMonit return ctx } +type fakePlannerWithMonitor struct { + Planner + monitor *mon.BytesMonitor +} + +// Mon is part of the Planner interface. +func (p *fakePlannerWithMonitor) Mon() *mon.BytesMonitor { + return p.monitor +} + // SessionData returns the SessionData the current EvalCtx should use to eval. func (ec *Context) SessionData() *sessiondata.SessionData { if ec.SessionDataStack == nil { @@ -368,10 +383,10 @@ func NewTestingEvalContext(st *cluster.Settings) *Context { // Stop closes out the EvalContext and must be called once it is no longer in use. func (ec *Context) Stop(c context.Context) { if r := recover(); r != nil { - ec.Mon.EmergencyStop(c) + ec.TestingMon.EmergencyStop(c) panic(r) } else { - ec.Mon.Stop(c) + ec.TestingMon.Stop(c) } } diff --git a/pkg/sql/sem/eval/deps.go b/pkg/sql/sem/eval/deps.go index c22e20a3f0ce..3a8003111748 100644 --- a/pkg/sql/sem/eval/deps.go +++ b/pkg/sql/sem/eval/deps.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/lib/pq/oid" ) @@ -199,6 +200,9 @@ type Planner interface { TypeResolver tree.FunctionReferenceResolver + // Mon returns the Planner's monitor. 
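fakePlannerWithMonitor above leans on interface embedding: embedding eval.Planner satisfies the whole interface while only Mon gets a real implementation, and calling any other method panics on the nil embedded interface, which is exactly the hazard the memo_test.go comment earlier in this patch works around. A standalone sketch of the trick; stubPlanner is an illustrative name:

package example

import (
	"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
	"github.com/cockroachdb/cockroach/pkg/util/mon"
)

// stubPlanner satisfies eval.Planner wholesale via the embedded interface,
// while only Mon has a real implementation. Any other method call hits the
// nil embedded interface and panics, keeping tests honest about what they
// actually use.
type stubPlanner struct {
	eval.Planner
	monitor *mon.BytesMonitor
}

// Mon is part of the eval.Planner interface.
func (p *stubPlanner) Mon() *mon.BytesMonitor { return p.monitor }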
diff --git a/pkg/sql/sem/eval/eval_test.go b/pkg/sql/sem/eval/eval_test.go
index a5a3518731d0..a849a98f4892 100644
--- a/pkg/sql/sem/eval/eval_test.go
+++ b/pkg/sql/sem/eval/eval_test.go
@@ -193,7 +193,7 @@ func TestTimeConversion(t *testing.T) {
 
 	for _, test := range tests {
 		ctx := eval.NewTestingEvalContext(cluster.MakeTestingClusterSettings())
-		defer ctx.Mon.Stop(context.Background())
+		defer ctx.TestingMon.Stop(context.Background())
 		exprStr := fmt.Sprintf("experimental_strptime('%s', '%s')", test.start, test.format)
 		expr, err := parser.ParseExpr(exprStr)
 		if err != nil {
diff --git a/pkg/sql/sem/eval/eval_test/eval_test.go b/pkg/sql/sem/eval/eval_test/eval_test.go
index 8d7ed4db1158..586ab6b909cc 100644
--- a/pkg/sql/sem/eval/eval_test/eval_test.go
+++ b/pkg/sql/sem/eval/eval_test/eval_test.go
@@ -145,6 +145,7 @@ func TestEval(t *testing.T) {
 	flowCtx := &execinfra.FlowCtx{
 		Cfg:     &execinfra.ServerConfig{Settings: st},
 		EvalCtx: evalCtx,
+		Mon:     evalCtx.TestingMon,
 	}
 	memMonitor := execinfra.NewTestMemMonitor(ctx, st)
 	defer memMonitor.Stop(ctx)
diff --git a/pkg/sql/sem/eval/like_test.go b/pkg/sql/sem/eval/like_test.go
index 6fedfdc109c3..2e31071c539e 100644
--- a/pkg/sql/sem/eval/like_test.go
+++ b/pkg/sql/sem/eval/like_test.go
@@ -171,7 +171,7 @@ func benchmarkLike(b *testing.B, ctx *Context, caseInsensitive bool) {
 
 func BenchmarkLikeWithCache(b *testing.B) {
 	ctx := NewTestingEvalContext(cluster.MakeTestingClusterSettings())
 	ctx.ReCache = tree.NewRegexpCache(len(benchmarkLikePatterns))
-	defer ctx.Mon.Stop(context.Background())
+	defer ctx.TestingMon.Stop(context.Background())
 	benchmarkLike(b, ctx, false)
 }
 
@@ -186,7 +186,7 @@ func BenchmarkLikeWithoutCache(b *testing.B) {
 
 func BenchmarkILikeWithCache(b *testing.B) {
 	ctx := NewTestingEvalContext(cluster.MakeTestingClusterSettings())
 	ctx.ReCache = tree.NewRegexpCache(len(benchmarkLikePatterns))
-	defer ctx.Mon.Stop(context.Background())
+	defer ctx.TestingMon.Stop(context.Background())
 	benchmarkLike(b, ctx, true)
 }
diff --git a/pkg/sql/sem/normalize/constant_eval_test.go b/pkg/sql/sem/normalize/constant_eval_test.go
index 4c4cd19d969e..d71bce8272b8 100644
--- a/pkg/sql/sem/normalize/constant_eval_test.go
+++ b/pkg/sql/sem/normalize/constant_eval_test.go
@@ -43,7 +43,7 @@ func TestConstantEvalArrayComparison(t *testing.T) {
 	}
 
 	ctx := eval.NewTestingEvalContext(cluster.MakeTestingClusterSettings())
-	defer ctx.Mon.Stop(context.Background())
+	defer ctx.TestingMon.Stop(context.Background())
 	c := normalize.MakeConstantEvalVisitor(ctx)
 	expr, _ = tree.WalkExpr(&c, typedExpr)
 	if err := c.Err(); err != nil {
diff --git a/pkg/sql/sem/normalize/normalize_test.go b/pkg/sql/sem/normalize/normalize_test.go
index c1a7d23ac4fd..796b6024339e 100644
--- a/pkg/sql/sem/normalize/normalize_test.go
+++ b/pkg/sql/sem/normalize/normalize_test.go
@@ -236,7 +236,7 @@ func TestNormalizeExpr(t *testing.T) {
 		}
 		rOrig := typedExpr.String()
 		ctx := eval.NewTestingEvalContext(cluster.MakeTestingClusterSettings())
-		defer ctx.Mon.Stop(context.Background())
+		defer ctx.TestingMon.Stop(context.Background())
 		r, err := normalize.Expr(ctx, typedExpr)
 		if err != nil {
 			t.Fatalf("%s: %v", d.expr, err)
diff --git a/pkg/sql/sem/tree/compare_test.go b/pkg/sql/sem/tree/compare_test.go
index 43be02d04409..75a1e0501fe3 100644
--- a/pkg/sql/sem/tree/compare_test.go
+++ b/pkg/sql/sem/tree/compare_test.go
@@ -56,7 +56,7 @@ func TestEvalComparisonExprCaching(t *testing.T) {
 			Right: tree.NewDString(d.right),
 		}
 		ctx := eval.NewTestingEvalContext(cluster.MakeTestingClusterSettings())
-		defer ctx.Mon.Stop(context.Background())
+		defer ctx.TestingMon.Stop(context.Background())
 		ctx.ReCache = tree.NewRegexpCache(8)
 		typedExpr, err := tree.TypeCheck(context.Background(), expr, nil, types.Any)
 		if err != nil {
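The test updates in this stretch are mechanical renames (`ctx.Mon` becomes `ctx.TestingMon`), but the eval_test.go hunk above also shows the new wiring processor tests need: the test monitor is threaded through the new `FlowCtx.Mon` field alongside `EvalCtx`. A condensed sketch assembled from those hunks (the test name is hypothetical; all calls and fields appear in this patch):

```go
package example_test

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
	"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
)

func TestMonitorWiring(t *testing.T) {
	ctx := context.Background()
	st := cluster.MakeTestingClusterSettings()
	evalCtx := eval.NewTestingEvalContext(st)
	defer evalCtx.TestingMon.Stop(ctx)

	// The same test monitor now backs both the eval context and the flow, so
	// processors that read flowCtx.Mon keep working in tests.
	flowCtx := &execinfra.FlowCtx{
		Cfg:     &execinfra.ServerConfig{Settings: st},
		EvalCtx: evalCtx,
		Mon:     evalCtx.TestingMon,
	}
	_ = flowCtx
}
```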
diff --git a/pkg/sql/sem/tree/expr_test.go b/pkg/sql/sem/tree/expr_test.go
index 136da18af882..d80d4d151c7f 100644
--- a/pkg/sql/sem/tree/expr_test.go
+++ b/pkg/sql/sem/tree/expr_test.go
@@ -175,7 +175,7 @@ func TestExprString(t *testing.T) {
 		}
 		// Compare the normalized expressions.
 		ctx := eval.NewTestingEvalContext(cluster.MakeTestingClusterSettings())
-		defer ctx.Mon.Stop(context.Background())
+		defer ctx.TestingMon.Stop(context.Background())
 		normalized, err := normalize.Expr(ctx, typedExpr)
 		if err != nil {
 			t.Fatalf("%s: %v", exprStr, err)
diff --git a/pkg/sql/spool.go b/pkg/sql/spool.go
index 5b41c044da6b..50cabbb08937 100644
--- a/pkg/sql/spool.go
+++ b/pkg/sql/spool.go
@@ -45,7 +45,7 @@ func (s *spoolNode) startExec(params runParams) error {
 	}
 
 	s.rows = rowcontainer.NewRowContainer(
-		params.EvalContext().Mon.MakeBoundAccount(),
+		params.p.Mon().MakeBoundAccount(),
 		colinfo.ColTypeInfoFromResCols(planColumns(s.source)),
 	)
 
diff --git a/pkg/sql/tablewriter_upsert_opt.go b/pkg/sql/tablewriter_upsert_opt.go
index 05fb2f31e201..df325ac96131 100644
--- a/pkg/sql/tablewriter_upsert_opt.go
+++ b/pkg/sql/tablewriter_upsert_opt.go
@@ -110,7 +110,7 @@ func (tu *optTableUpserter) init(
 	if tu.rowsNeeded {
 		tu.resultRow = make(tree.Datums, len(tu.returnCols))
 		tu.rows = rowcontainer.NewRowContainer(
-			evalCtx.Mon.MakeBoundAccount(),
+			evalCtx.Planner.Mon().MakeBoundAccount(),
 			colinfo.ColTypeInfoFromColumns(tu.returnCols),
 		)
 
diff --git a/pkg/sql/testutils.go b/pkg/sql/testutils.go
index 971b59e36f4c..7f8ffd390b74 100644
--- a/pkg/sql/testutils.go
+++ b/pkg/sql/testutils.go
@@ -131,6 +131,7 @@ func (dsp *DistSQLPlanner) Exec(
 	if err := p.makeOptimizerPlan(ctx); err != nil {
 		return err
 	}
+	defer p.curPlan.close(ctx)
 	rw := NewCallbackResultWriter(func(ctx context.Context, row tree.Datums) error {
 		return nil
 	})
@@ -157,7 +158,7 @@ func (dsp *DistSQLPlanner) Exec(
 		distributionType)
 	planCtx.stmtType = recv.stmtType
 
-	dsp.PlanAndRun(ctx, evalCtx, planCtx, p.txn, p.curPlan.main, recv)()
+	dsp.PlanAndRun(ctx, evalCtx, planCtx, p.txn, p.curPlan.main, recv)
 	return rw.Err()
 }
diff --git a/pkg/sql/ttl/ttljob/ttljob.go b/pkg/sql/ttl/ttljob/ttljob.go
index ad37c6369613..842a1aae4536 100644
--- a/pkg/sql/ttl/ttljob/ttljob.go
+++ b/pkg/sql/ttl/ttljob/ttljob.go
@@ -313,7 +313,7 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
 			// Copy the evalCtx, as dsp.Run() might change it.
 			evalCtxCopy := *evalCtx
-			cleanup := distSQLPlanner.Run(
+			distSQLPlanner.Run(
 				ctx,
 				planCtx,
 				nil, /* txn */
@@ -322,7 +322,6 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
 				&evalCtxCopy,
 				nil, /* finishedSetupFn */
 			)
-			defer cleanup()
 			return metadataCallbackWriter.Err()
 		}()
 
diff --git a/pkg/sql/update.go b/pkg/sql/update.go
index c28d66dc42df..25dcfb2cce84 100644
--- a/pkg/sql/update.go
+++ b/pkg/sql/update.go
@@ -127,7 +127,7 @@ func (u *updateNode) startExec(params runParams) error {
 
 	if u.run.rowsNeeded {
 		u.run.tu.rows = rowcontainer.NewRowContainer(
-			params.EvalContext().Mon.MakeBoundAccount(),
+			params.p.Mon().MakeBoundAccount(),
 			colinfo.ColTypeInfoFromResCols(u.columns),
 		)
 	}
diff --git a/pkg/sql/values.go b/pkg/sql/values.go
index 6e5e7e04bda4..dc994baee5fb 100644
--- a/pkg/sql/values.go
+++ b/pkg/sql/values.go
@@ -42,7 +42,7 @@ func (p *planner) newContainerValuesNode(columns colinfo.ResultColumns, capacity
 		columns: columns,
 		valuesRun: valuesRun{
 			rows: rowcontainer.NewRowContainerWithCapacity(
-				p.EvalContext().Mon.MakeBoundAccount(),
+				p.Mon().MakeBoundAccount(),
 				colinfo.ColTypeInfoFromResCols(columns),
 				capacity,
 			),
@@ -68,7 +68,7 @@ func (n *valuesNode) startExec(params runParams) error {
 	// from other planNodes), so its expressions need evaluating.
 	// This may run subqueries.
 	n.rows = rowcontainer.NewRowContainerWithCapacity(
-		params.extendedEvalCtx.Mon.MakeBoundAccount(),
+		params.p.Mon().MakeBoundAccount(),
 		colinfo.ColTypeInfoFromResCols(n.columns),
 		len(n.tuples),
 	)
diff --git a/pkg/sql/virtual_table.go b/pkg/sql/virtual_table.go
index 1147e79bc6b0..49a2212a2357 100644
--- a/pkg/sql/virtual_table.go
+++ b/pkg/sql/virtual_table.go
@@ -258,7 +258,7 @@ var _ rowPusher = &vTableLookupJoinNode{}
 func (v *vTableLookupJoinNode) startExec(params runParams) error {
 	v.run.keyCtx = constraint.KeyContext{EvalCtx: params.EvalContext()}
 	v.run.rows = rowcontainer.NewRowContainer(
-		params.EvalContext().Mon.MakeBoundAccount(),
+		params.p.Mon().MakeBoundAccount(),
 		colinfo.ColTypeInfoFromResCols(v.columns),
 	)
 	v.run.indexKeyDatums = make(tree.Datums, len(v.columns))
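The ttljob.go and testutils.go hunks show the call-site effect of the other half of this patch: `dsp.Run` and `dsp.PlanAndRun` no longer return a cleanup closure, so the trailing `()` (or the saved `cleanup` variable plus `defer cleanup()`) disappears, and callers that still need to release plan resources do so explicitly, as with the added `defer p.curPlan.close(ctx)`. A toy illustration of the two contracts; the names are illustrative, not from the patch:

```go
package main

import "fmt"

// runWithCleanup mimics the old contract: the caller receives a closure and
// must remember to invoke it, hence call sites like Run(...)().
func runWithCleanup() func() {
	fmt.Println("flow runs")
	return func() { fmt.Println("plan resources released") }
}

// run mimics the new contract: cleanup happens inside the call (or is
// deferred by the caller on the plan itself), so the trailing () goes away.
func run() {
	defer fmt.Println("plan resources released")
	fmt.Println("flow runs")
}

func main() {
	runWithCleanup()() // old style: the second () is easy to forget
	run()              // new style: nothing for the caller to invoke
}
```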