From 15321ee00583cef96bc9e1e96cd0dd784aaa9e2b Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Thu, 19 May 2022 16:46:13 -0700
Subject: [PATCH] colexec: add more redundancy to releasing disk resources

As a couple of recently found issues showed, making sure that all disk
resources are released can be tricky, since disk-backed operators can form
large graphs with multiple external operators supporting a single operation.
This commit makes the release of disk resources more bullet-proof by auditing
all users of the vectorized disk queues to make sure they are added to
`OpWithMetaInfo.ToClose`, which is closed on the flow cleanup. Since `Close`
can safely be called multiple times, this adds some redundancy, erring on the
side of caution.

In particular, the following changes are made:
- The external distinct and the external hash aggregator are now explicitly
  added to the `ToClose` slice. They should already be closed by the
  `diskSpillerBase`, but it doesn't hurt to close them explicitly.
- The window aggregator operator has been refactored so that it doesn't throw
  an error in its `Close` method - with the previous version it was possible
  to panic during the `Close` execution and possibly leak some resources.
- The signatures of the constructor methods have been adjusted to return
  `ClosableOperator` to make the need for closing more explicit.
- Each router output is now a `Closer`, and the consumer of each output is now
  responsible for closing it. Again, each output will most likely have already
  been closed by the time the consumer explicitly tries to close it, yet there
  is no harm in closing it twice.

An additional minor cleanup is the removal of the usage of an embedded context
in a couple of `Close` implementations, given that the function takes one as
an argument.
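For readers skimming the patch, below is a minimal, self-contained Go sketch of
the pattern this change leans on: a constructor returns both the root operator
and a `Closer`, the planner appends that `Closer` to a `ToClose` slice that is
closed on flow cleanup, and `Close` is guarded so that calling it more than
once (for example by a disk spiller and again by the flow cleanup) is harmless.
The names here (`Closer`, `closerHelper`, `newExternalOp`) are illustrative
stand-ins, not the actual `colexecop` types or constructors.

package main

import (
	"context"
	"fmt"
)

// Closer mirrors the shape of an operator that must release resources
// (such as disk queues) when the flow is cleaned up.
type Closer interface {
	Close(context.Context) error
}

// closerHelper makes Close idempotent: only the first call does real work.
type closerHelper struct {
	closed bool
}

// close returns true only on the first call.
func (h *closerHelper) close() bool {
	if h.closed {
		return false
	}
	h.closed = true
	return true
}

// externalOp stands in for a disk-backed operator that owns disk resources.
type externalOp struct {
	closerHelper
	name string
}

func (o *externalOp) Close(ctx context.Context) error {
	if !o.close() {
		// Already closed: nothing to do.
		return nil
	}
	fmt.Printf("%s: releasing disk resources\n", o.name)
	return nil
}

func main() {
	ctx := context.Background()

	// The planner collects everything that must be closed on flow cleanup.
	var toClose []Closer

	// A hypothetical constructor in the spirit of the adjusted signatures:
	// it returns the root operator and the Closer to register, even though
	// another component might also end up closing the same operator.
	newExternalOp := func(name string) (*externalOp, Closer) {
		op := &externalOp{name: name}
		return op, op
	}

	eha, closer := newExternalOp("external hash aggregator")
	toClose = append(toClose, closer)

	// Some other component (e.g. a disk spiller) may close the operator
	// first; the flow cleanup below then becomes a harmless no-op.
	_ = eha.Close(ctx)

	// Flow cleanup: close everything that was registered.
	for _, c := range toClose {
		if err := c.Close(ctx); err != nil {
			fmt.Println("close error:", err)
		}
	}
}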
Release note: None --- pkg/sql/colexec/colbuilder/execplan.go | 14 ++++++++++---- pkg/sql/colexec/colexecdisk/external_distinct.go | 6 +++--- .../colexecdisk/external_hash_aggregator.go | 6 +++--- .../colexec/colexecdisk/external_hash_joiner.go | 2 +- pkg/sql/colexec/colexecjoin/crossjoiner.go | 2 +- pkg/sql/colexec/colexecwindow/buffered_window.go | 13 ++++--------- .../colexecwindow/count_rows_aggregator.go | 2 +- .../colexecwindow/first_last_nth_value_tmpl.go | 2 +- pkg/sql/colexec/colexecwindow/first_value.eg.go | 2 +- pkg/sql/colexec/colexecwindow/lag.eg.go | 2 +- pkg/sql/colexec/colexecwindow/last_value.eg.go | 2 +- pkg/sql/colexec/colexecwindow/lead.eg.go | 2 +- pkg/sql/colexec/colexecwindow/lead_lag_tmpl.go | 2 +- pkg/sql/colexec/colexecwindow/nth_value.eg.go | 2 +- .../colexec/colexecwindow/relative_rank.eg.go | 16 ++++------------ .../colexec/colexecwindow/relative_rank_tmpl.go | 4 +--- .../colexecwindow/window_aggregator.eg.go | 9 +-------- .../colexecwindow/window_aggregator_tmpl.go | 9 +-------- .../colexecwindow/window_functions_test.go | 9 ++++++++- pkg/sql/colexec/columnarizer.go | 5 +---- pkg/sql/colexec/external_distinct_test.go | 12 ++++++------ pkg/sql/colexec/external_hash_aggregator_test.go | 7 +++---- pkg/sql/colexec/invariants_checker.go | 5 ++--- .../colexec/parallel_unordered_synchronizer.go | 3 +-- pkg/sql/colexecop/operator.go | 8 ++++---- pkg/sql/colflow/routers.go | 12 +++++++++--- pkg/sql/colflow/routers_test.go | 2 +- pkg/sql/colflow/vectorized_flow.go | 5 +++-- 28 files changed, 75 insertions(+), 90 deletions(-) diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index 5f18a82cfa2a..f767db8aa733 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -943,7 +943,7 @@ func NewColOperator( newAggArgs.Allocator = colmem.NewAllocator(ctx, ehaMemAccount, factory) newAggArgs.MemAccount = ehaMemAccount newAggArgs.Input = input - return colexecdisk.NewExternalHashAggregator( + eha, toClose := colexecdisk.NewExternalHashAggregator( flowCtx, args, &newAggArgs, @@ -957,6 +957,8 @@ func NewColOperator( outputUnlimitedAllocator, maxOutputBatchMemSize, ) + result.ToClose = append(result.ToClose, toClose) + return eha }, args.TestingKnobs.SpillingCallbackFn, ) @@ -1006,7 +1008,7 @@ func NewColOperator( unlimitedAllocator := colmem.NewAllocator( ctx, args.MonitorRegistry.CreateUnlimitedMemAccount(ctx, flowCtx, edOpName, spec.ProcessorID), factory, ) - return colexecdisk.NewExternalDistinct( + ed, toClose := colexecdisk.NewExternalDistinct( unlimitedAllocator, flowCtx, args, @@ -1016,6 +1018,8 @@ func NewColOperator( inMemoryUnorderedDistinct, diskAccount, ) + result.ToClose = append(result.ToClose, toClose) + return ed }, args.TestingKnobs.SpillingCallbackFn, ) @@ -1106,7 +1110,7 @@ func NewColOperator( result.makeDiskBackedSorterConstructor(ctx, flowCtx, args, opName, factory), diskAccount, ) - result.ToClose = append(result.ToClose, ehj.(colexecop.Closer)) + result.ToClose = append(result.ToClose, ehj) return ehj }, args.TestingKnobs.SpillingCallbackFn, @@ -1419,7 +1423,9 @@ func NewColOperator( } result.Root = colexecwindow.NewWindowAggregatorOperator( windowArgs, aggType, wf.Frame, &wf.Ordering, argIdxs, - aggArgs.OutputTypes[0], aggFnsAlloc, toClose) + aggArgs.OutputTypes[0], aggFnsAlloc, + ) + result.ToClose = append(result.ToClose, toClose...) 
returnType = aggArgs.OutputTypes[0] } } else { diff --git a/pkg/sql/colexec/colexecdisk/external_distinct.go b/pkg/sql/colexec/colexecdisk/external_distinct.go index 14ea6a93c200..598c8c0624c7 100644 --- a/pkg/sql/colexec/colexecdisk/external_distinct.go +++ b/pkg/sql/colexec/colexecdisk/external_distinct.go @@ -36,7 +36,7 @@ func NewExternalDistinct( createDiskBackedSorter DiskBackedSorterConstructor, inMemUnorderedDistinct colexecop.Operator, diskAcc *mon.BoundAccount, -) colexecop.Operator { +) (colexecop.Operator, colexecop.Closer) { distinctSpec := args.Spec.Core.Distinct distinctCols := distinctSpec.DistinctColumns inMemMainOpConstructor := func(partitionedInputs []*partitionerToOperator) colexecop.ResettableOperator { @@ -111,13 +111,13 @@ func NewExternalDistinct( outputOrdering := args.Spec.Core.Distinct.OutputOrdering if len(outputOrdering.Columns) == 0 { // No particular output ordering is required. - return ed + return ed, ed } // TODO(yuzefovich): the fact that we're planning an additional external // sort isn't accounted for when considering the number file descriptors to // acquire. Not urgent, but it should be fixed. maxNumberActivePartitions := calculateMaxNumberActivePartitions(flowCtx, args, numRequiredActivePartitions) - return createDiskBackedSorter(ed, inputTypes, outputOrdering.Columns, maxNumberActivePartitions) + return createDiskBackedSorter(ed, inputTypes, outputOrdering.Columns, maxNumberActivePartitions), ed } // unorderedDistinctFilterer filters out tuples that are duplicates of the diff --git a/pkg/sql/colexec/colexecdisk/external_hash_aggregator.go b/pkg/sql/colexec/colexecdisk/external_hash_aggregator.go index f7f610c0297c..dc9bd7177824 100644 --- a/pkg/sql/colexec/colexecdisk/external_hash_aggregator.go +++ b/pkg/sql/colexec/colexecdisk/external_hash_aggregator.go @@ -39,7 +39,7 @@ func NewExternalHashAggregator( diskAcc *mon.BoundAccount, outputUnlimitedAllocator *colmem.Allocator, maxOutputBatchMemSize int64, -) colexecop.Operator { +) (colexecop.Operator, colexecop.Closer) { inMemMainOpConstructor := func(partitionedInputs []*partitionerToOperator) colexecop.ResettableOperator { newAggArgs := *newAggArgs newAggArgs.Input = partitionedInputs[0] @@ -83,11 +83,11 @@ func NewExternalHashAggregator( outputOrdering := args.Spec.Core.Aggregator.OutputOrdering if len(outputOrdering.Columns) == 0 { // No particular output ordering is required. - return eha + return eha, eha } // TODO(yuzefovich): the fact that we're planning an additional external // sort isn't accounted for when considering the number file descriptors to // acquire. Not urgent, but it should be fixed. 
maxNumberActivePartitions := calculateMaxNumberActivePartitions(flowCtx, args, ehaNumRequiredActivePartitions) - return createDiskBackedSorter(eha, newAggArgs.OutputTypes, outputOrdering.Columns, maxNumberActivePartitions) + return createDiskBackedSorter(eha, newAggArgs.OutputTypes, outputOrdering.Columns, maxNumberActivePartitions), eha } diff --git a/pkg/sql/colexec/colexecdisk/external_hash_joiner.go b/pkg/sql/colexec/colexecdisk/external_hash_joiner.go index 01ea8658cb77..f024df2ff77d 100644 --- a/pkg/sql/colexec/colexecdisk/external_hash_joiner.go +++ b/pkg/sql/colexec/colexecdisk/external_hash_joiner.go @@ -68,7 +68,7 @@ func NewExternalHashJoiner( leftInput, rightInput colexecop.Operator, createDiskBackedSorter DiskBackedSorterConstructor, diskAcc *mon.BoundAccount, -) colexecop.Operator { +) colexecop.ClosableOperator { // This memory limit will restrict the size of the batches output by the // in-memory hash joiner in the main strategy as well as by the merge joiner // in the fallback strategy. diff --git a/pkg/sql/colexec/colexecjoin/crossjoiner.go b/pkg/sql/colexec/colexecjoin/crossjoiner.go index c44084571cff..87a4cd8c5319 100644 --- a/pkg/sql/colexec/colexecjoin/crossjoiner.go +++ b/pkg/sql/colexec/colexecjoin/crossjoiner.go @@ -39,7 +39,7 @@ func NewCrossJoiner( leftTypes []*types.T, rightTypes []*types.T, diskAcc *mon.BoundAccount, -) colexecop.Operator { +) colexecop.ClosableOperator { return &crossJoiner{ crossJoinerBase: newCrossJoinerBase( unlimitedAllocator, diff --git a/pkg/sql/colexec/colexecwindow/buffered_window.go b/pkg/sql/colexec/colexecwindow/buffered_window.go index a5f1cf83c27d..2e4af2d853aa 100644 --- a/pkg/sql/colexec/colexecwindow/buffered_window.go +++ b/pkg/sql/colexec/colexecwindow/buffered_window.go @@ -30,7 +30,7 @@ import ( // window function. func newBufferedWindowOperator( args *WindowArgs, windower bufferedWindower, outputColType *types.T, memoryLimit int64, -) colexecop.Operator { +) colexecop.ClosableOperator { outputTypes := make([]*types.T, len(args.InputTypes), len(args.InputTypes)+1) copy(outputTypes, args.InputTypes) outputTypes = append(outputTypes, outputColType) @@ -213,7 +213,7 @@ func (b *bufferedWindowOp) Init(ctx context.Context) { b.windower.startNewPartition() } -var _ colexecop.Operator = &bufferedWindowOp{} +var _ colexecop.ClosableOperator = &bufferedWindowOp{} func (b *bufferedWindowOp) Next() coldata.Batch { var err error @@ -343,16 +343,11 @@ func (b *bufferedWindowOp) Next() coldata.Batch { } func (b *bufferedWindowOp) Close(ctx context.Context) error { - if !b.CloserHelper.Close() || b.Ctx == nil { - // Either Close() has already been called or Init() was never called. In - // both cases there is nothing to do. + if !b.CloserHelper.Close() { return nil } - if err := b.bufferQueue.Close(ctx); err != nil { - return err - } b.windower.Close(ctx) - return nil + return b.bufferQueue.Close(ctx) } // partitionSeekerBase extracts common fields and methods for buffered windower diff --git a/pkg/sql/colexec/colexecwindow/count_rows_aggregator.go b/pkg/sql/colexec/colexecwindow/count_rows_aggregator.go index 20af8be13cab..71f830c1f3fc 100644 --- a/pkg/sql/colexec/colexecwindow/count_rows_aggregator.go +++ b/pkg/sql/colexec/colexecwindow/count_rows_aggregator.go @@ -25,7 +25,7 @@ import ( // aggregate window function. 
func NewCountRowsOperator( args *WindowArgs, frame *execinfrapb.WindowerSpec_Frame, ordering *execinfrapb.Ordering, -) colexecop.Operator { +) colexecop.ClosableOperator { // Because the buffer is potentially used multiple times per-row, it is // important to prevent it from spilling to disk if possible. For this reason, // we give the buffer half of the memory budget even though it will generally diff --git a/pkg/sql/colexec/colexecwindow/first_last_nth_value_tmpl.go b/pkg/sql/colexec/colexecwindow/first_last_nth_value_tmpl.go index 4efdea49c86f..60e14f3ffd49 100644 --- a/pkg/sql/colexec/colexecwindow/first_last_nth_value_tmpl.go +++ b/pkg/sql/colexec/colexecwindow/first_last_nth_value_tmpl.go @@ -56,7 +56,7 @@ func New_UPPERCASE_NAMEOperator( frame *execinfrapb.WindowerSpec_Frame, ordering *execinfrapb.Ordering, argIdxs []int, -) (colexecop.Operator, error) { +) (colexecop.ClosableOperator, error) { framer := newWindowFramer(args.EvalCtx, frame, ordering, args.InputTypes, args.PeersColIdx) colsToStore := framer.getColsToStore([]int{argIdxs[0]}) diff --git a/pkg/sql/colexec/colexecwindow/first_value.eg.go b/pkg/sql/colexec/colexecwindow/first_value.eg.go index 2a666afbc323..8d4e303acd7a 100644 --- a/pkg/sql/colexec/colexecwindow/first_value.eg.go +++ b/pkg/sql/colexec/colexecwindow/first_value.eg.go @@ -29,7 +29,7 @@ func NewFirstValueOperator( frame *execinfrapb.WindowerSpec_Frame, ordering *execinfrapb.Ordering, argIdxs []int, -) (colexecop.Operator, error) { +) (colexecop.ClosableOperator, error) { framer := newWindowFramer(args.EvalCtx, frame, ordering, args.InputTypes, args.PeersColIdx) colsToStore := framer.getColsToStore([]int{argIdxs[0]}) diff --git a/pkg/sql/colexec/colexecwindow/lag.eg.go b/pkg/sql/colexec/colexecwindow/lag.eg.go index 271974c3d70a..b7a02e32c396 100644 --- a/pkg/sql/colexec/colexecwindow/lag.eg.go +++ b/pkg/sql/colexec/colexecwindow/lag.eg.go @@ -25,7 +25,7 @@ import ( // should put its output (if there is no such column, a new column is appended). func NewLagOperator( args *WindowArgs, argIdx int, offsetIdx int, defaultIdx int, -) (colexecop.Operator, error) { +) (colexecop.ClosableOperator, error) { // Allow the direct-access buffer 10% of the available memory. The rest will // be given to the bufferedWindowOp queue. While it is somewhat more important // for the direct-access buffer tuples to be kept in-memory, it only has to diff --git a/pkg/sql/colexec/colexecwindow/last_value.eg.go b/pkg/sql/colexec/colexecwindow/last_value.eg.go index b6e2ef4a8cf9..721fa8001555 100644 --- a/pkg/sql/colexec/colexecwindow/last_value.eg.go +++ b/pkg/sql/colexec/colexecwindow/last_value.eg.go @@ -29,7 +29,7 @@ func NewLastValueOperator( frame *execinfrapb.WindowerSpec_Frame, ordering *execinfrapb.Ordering, argIdxs []int, -) (colexecop.Operator, error) { +) (colexecop.ClosableOperator, error) { framer := newWindowFramer(args.EvalCtx, frame, ordering, args.InputTypes, args.PeersColIdx) colsToStore := framer.getColsToStore([]int{argIdxs[0]}) diff --git a/pkg/sql/colexec/colexecwindow/lead.eg.go b/pkg/sql/colexec/colexecwindow/lead.eg.go index fe9295f334ae..408efefca6a0 100644 --- a/pkg/sql/colexec/colexecwindow/lead.eg.go +++ b/pkg/sql/colexec/colexecwindow/lead.eg.go @@ -25,7 +25,7 @@ import ( // should put its output (if there is no such column, a new column is appended). 
func NewLeadOperator( args *WindowArgs, argIdx int, offsetIdx int, defaultIdx int, -) (colexecop.Operator, error) { +) (colexecop.ClosableOperator, error) { // Allow the direct-access buffer 10% of the available memory. The rest will // be given to the bufferedWindowOp queue. While it is somewhat more important // for the direct-access buffer tuples to be kept in-memory, it only has to diff --git a/pkg/sql/colexec/colexecwindow/lead_lag_tmpl.go b/pkg/sql/colexec/colexecwindow/lead_lag_tmpl.go index 4dbf5e5f5b1c..ff985706e1d9 100644 --- a/pkg/sql/colexec/colexecwindow/lead_lag_tmpl.go +++ b/pkg/sql/colexec/colexecwindow/lead_lag_tmpl.go @@ -49,7 +49,7 @@ const _TYPE_WIDTH = 0 // should put its output (if there is no such column, a new column is appended). func New_UPPERCASE_NAMEOperator( args *WindowArgs, argIdx int, offsetIdx int, defaultIdx int, -) (colexecop.Operator, error) { +) (colexecop.ClosableOperator, error) { // Allow the direct-access buffer 10% of the available memory. The rest will // be given to the bufferedWindowOp queue. While it is somewhat more important // for the direct-access buffer tuples to be kept in-memory, it only has to diff --git a/pkg/sql/colexec/colexecwindow/nth_value.eg.go b/pkg/sql/colexec/colexecwindow/nth_value.eg.go index 1e13803a1cdd..59ef0647cbc1 100644 --- a/pkg/sql/colexec/colexecwindow/nth_value.eg.go +++ b/pkg/sql/colexec/colexecwindow/nth_value.eg.go @@ -31,7 +31,7 @@ func NewNthValueOperator( frame *execinfrapb.WindowerSpec_Frame, ordering *execinfrapb.Ordering, argIdxs []int, -) (colexecop.Operator, error) { +) (colexecop.ClosableOperator, error) { framer := newWindowFramer(args.EvalCtx, frame, ordering, args.InputTypes, args.PeersColIdx) colsToStore := framer.getColsToStore([]int{argIdxs[0]}) diff --git a/pkg/sql/colexec/colexecwindow/relative_rank.eg.go b/pkg/sql/colexec/colexecwindow/relative_rank.eg.go index 8c5abaaca414..c3b4382dd797 100644 --- a/pkg/sql/colexec/colexecwindow/relative_rank.eg.go +++ b/pkg/sql/colexec/colexecwindow/relative_rank.eg.go @@ -324,9 +324,7 @@ func (r *percentRankNoPartitionOp) Next() coldata.Batch { } func (r *percentRankNoPartitionOp) Close(ctx context.Context) error { - if !r.CloserHelper.Close() || r.Ctx == nil { - // Either Close() has already been called or Init() was never called. In - // both cases there is nothing to do. + if !r.CloserHelper.Close() { return nil } var lastErr error @@ -603,9 +601,7 @@ func (r *percentRankWithPartitionOp) Next() coldata.Batch { } func (r *percentRankWithPartitionOp) Close(ctx context.Context) error { - if !r.CloserHelper.Close() || r.Ctx == nil { - // Either Close() has already been called or Init() was never called. In - // both cases there is nothing to do. + if !r.CloserHelper.Close() { return nil } var lastErr error @@ -870,9 +866,7 @@ func (r *cumeDistNoPartitionOp) Next() coldata.Batch { } func (r *cumeDistNoPartitionOp) Close(ctx context.Context) error { - if !r.CloserHelper.Close() || r.Ctx == nil { - // Either Close() has already been called or Init() was never called. In - // both cases there is nothing to do. + if !r.CloserHelper.Close() { return nil } var lastErr error @@ -1231,9 +1225,7 @@ func (r *cumeDistWithPartitionOp) Next() coldata.Batch { } func (r *cumeDistWithPartitionOp) Close(ctx context.Context) error { - if !r.CloserHelper.Close() || r.Ctx == nil { - // Either Close() has already been called or Init() was never called. In - // both cases there is nothing to do. 
+ if !r.CloserHelper.Close() { return nil } var lastErr error diff --git a/pkg/sql/colexec/colexecwindow/relative_rank_tmpl.go b/pkg/sql/colexec/colexecwindow/relative_rank_tmpl.go index ce4439b0b975..c67254a89c93 100644 --- a/pkg/sql/colexec/colexecwindow/relative_rank_tmpl.go +++ b/pkg/sql/colexec/colexecwindow/relative_rank_tmpl.go @@ -590,9 +590,7 @@ func (r *_RELATIVE_RANK_STRINGOp) Next() coldata.Batch { } func (r *_RELATIVE_RANK_STRINGOp) Close(ctx context.Context) error { - if !r.CloserHelper.Close() || r.Ctx == nil { - // Either Close() has already been called or Init() was never called. In - // both cases there is nothing to do. + if !r.CloserHelper.Close() { return nil } var lastErr error diff --git a/pkg/sql/colexec/colexecwindow/window_aggregator.eg.go b/pkg/sql/colexec/colexecwindow/window_aggregator.eg.go index b20cf3d4b0b2..8522f38e26e8 100644 --- a/pkg/sql/colexec/colexecwindow/window_aggregator.eg.go +++ b/pkg/sql/colexec/colexecwindow/window_aggregator.eg.go @@ -22,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecagg" "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils" - "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/colexecop" "github.com/cockroachdb/cockroach/pkg/sql/colmem" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -52,8 +51,7 @@ func NewWindowAggregatorOperator( argIdxs []int, outputType *types.T, aggAlloc *colexecagg.AggregateFuncsAlloc, - closers colexecop.Closers, -) colexecop.Operator { +) colexecop.ClosableOperator { // Because the buffer is used multiple times per-row, it is important to // prevent it from spilling to disk if possible. For this reason, we give the // buffer half of the memory budget even though it will generally store less @@ -79,7 +77,6 @@ func NewWindowAggregatorOperator( outputColIdx: args.OutputColIdx, inputIdxs: inputIdxs, framer: framer, - closers: closers, vecs: make([]coldata.Vec, len(inputIdxs)), } var agg colexecagg.AggregateFunc @@ -126,7 +123,6 @@ func NewWindowAggregatorOperator( type windowAggregatorBase struct { partitionSeekerBase colexecop.CloserHelper - closers colexecop.Closers allocator *colmem.Allocator outputColIdx int @@ -178,9 +174,6 @@ func (a *windowAggregatorBase) Close(ctx context.Context) { if !a.CloserHelper.Close() { return } - if err := a.closers.Close(ctx); err != nil { - colexecerror.InternalError(err) - } a.framer.close() a.buffer.Close(ctx) } diff --git a/pkg/sql/colexec/colexecwindow/window_aggregator_tmpl.go b/pkg/sql/colexec/colexecwindow/window_aggregator_tmpl.go index 6c71db6bfcd0..5c303606251b 100644 --- a/pkg/sql/colexec/colexecwindow/window_aggregator_tmpl.go +++ b/pkg/sql/colexec/colexecwindow/window_aggregator_tmpl.go @@ -27,7 +27,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecagg" "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils" - "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/colexecop" "github.com/cockroachdb/cockroach/pkg/sql/colmem" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -57,8 +56,7 @@ func NewWindowAggregatorOperator( argIdxs []int, outputType *types.T, aggAlloc *colexecagg.AggregateFuncsAlloc, - closers colexecop.Closers, -) colexecop.Operator { +) colexecop.ClosableOperator { // Because the buffer is used multiple times per-row, it is important to // prevent it from spilling to 
disk if possible. For this reason, we give the // buffer half of the memory budget even though it will generally store less @@ -84,7 +82,6 @@ func NewWindowAggregatorOperator( outputColIdx: args.OutputColIdx, inputIdxs: inputIdxs, framer: framer, - closers: closers, vecs: make([]coldata.Vec, len(inputIdxs)), } var agg colexecagg.AggregateFunc @@ -131,7 +128,6 @@ func NewWindowAggregatorOperator( type windowAggregatorBase struct { partitionSeekerBase colexecop.CloserHelper - closers colexecop.Closers allocator *colmem.Allocator outputColIdx int @@ -183,9 +179,6 @@ func (a *windowAggregatorBase) Close(ctx context.Context) { if !a.CloserHelper.Close() { return } - if err := a.closers.Close(ctx); err != nil { - colexecerror.InternalError(err) - } a.framer.close() a.buffer.Close(ctx) } diff --git a/pkg/sql/colexec/colexecwindow/window_functions_test.go b/pkg/sql/colexec/colexecwindow/window_functions_test.go index 580b678f5510..a82ebc0ad35a 100644 --- a/pkg/sql/colexec/colexecwindow/window_functions_test.go +++ b/pkg/sql/colexec/colexecwindow/window_functions_test.go @@ -1124,6 +1124,11 @@ func BenchmarkWindowFunctions(b *testing.B) { benchMemAccount := testMemMonitor.MakeBoundAccount() defer benchMemAccount.Close(ctx) + var allClosers colexecop.Closers + defer func() { + require.NoError(b, allClosers.Close(ctx)) + }() + getWindowFn := func( fun execinfrapb.WindowerSpec_Func, source colexecop.Operator, partition, order bool, ) (op colexecop.Operator) { @@ -1222,7 +1227,9 @@ func BenchmarkWindowFunctions(b *testing.B) { op = NewWindowAggregatorOperator( args, *fun.AggregateFunc, NormalizeWindowFrame(nil), &execinfrapb.Ordering{Columns: orderingCols}, []int{arg1ColIdx}, - aggArgs.OutputTypes[0], aggFnsAlloc, toClose) + aggArgs.OutputTypes[0], aggFnsAlloc, + ) + allClosers = append(allClosers, toClose...) } else { require.Fail(b, "expected non-nil window function") } diff --git a/pkg/sql/colexec/columnarizer.go b/pkg/sql/colexec/columnarizer.go index 9029e1dd61fa..916a50509e99 100644 --- a/pkg/sql/colexec/columnarizer.go +++ b/pkg/sql/colexec/columnarizer.go @@ -261,10 +261,7 @@ func (c *Columnarizer) Next() coldata.Batch { return c.batch } -var ( - _ colexecop.DrainableOperator = &Columnarizer{} - _ colexecop.Closer = &Columnarizer{} -) +var _ colexecop.DrainableClosableOperator = &Columnarizer{} // DrainMeta is part of the colexecop.MetadataSource interface. func (c *Columnarizer) DrainMeta() []execinfrapb.ProducerMetadata { diff --git a/pkg/sql/colexec/external_distinct_test.go b/pkg/sql/colexec/external_distinct_test.go index 70d2d5afb9ee..48f968b026a3 100644 --- a/pkg/sql/colexec/external_distinct_test.go +++ b/pkg/sql/colexec/external_distinct_test.go @@ -67,9 +67,9 @@ func TestExternalDistinct(t *testing.T) { var semsToCheck []semaphore.Semaphore var outputOrdering execinfrapb.Ordering verifier := colexectestutils.UnorderedVerifier - // Check that the external distinct and the disk-backed sort - // were added as Closers. - numExpectedClosers := 2 + // Check that the disk spiller, the external distinct, and the + // disk-backed sort were added as Closers. + numExpectedClosers := 3 if tc.isOrderedOnDistinctCols { outputOrdering = convertDistinctColsToOrdering(tc.distinctCols) verifier = colexectestutils.OrderedVerifier @@ -190,9 +190,9 @@ func TestExternalDistinctSpilling(t *testing.T) { &monitorRegistry, ) require.NoError(t, err) - // Check that the external distinct and the disk-backed sort - // were added as Closers. 
- numExpectedClosers := 2 + // Check that the disk spiller, the external distinct, and the + // disk-backed sort were added as Closers. + numExpectedClosers := 3 require.Equal(t, numExpectedClosers, len(closers)) numRuns++ return distinct, nil diff --git a/pkg/sql/colexec/external_hash_aggregator_test.go b/pkg/sql/colexec/external_hash_aggregator_test.go index 154f6fb08fb9..78e7093df38b 100644 --- a/pkg/sql/colexec/external_hash_aggregator_test.go +++ b/pkg/sql/colexec/external_hash_aggregator_test.go @@ -111,10 +111,9 @@ func TestExternalHashAggregator(t *testing.T) { } var numExpectedClosers int if cfg.diskSpillingEnabled { - // The external sorter and the disk spiller should be added - // as Closers (the latter is responsible for closing the - // in-memory hash aggregator as well as the external one). - numExpectedClosers = 2 + // The external sorter, the disk spiller, and the external hash + // aggregator should be added as Closers. + numExpectedClosers = 3 if len(tc.spec.OutputOrdering.Columns) > 0 { // When the output ordering is required, we also plan // another external sort. diff --git a/pkg/sql/colexec/invariants_checker.go b/pkg/sql/colexec/invariants_checker.go index 4970e095fe00..33f46a1af22d 100644 --- a/pkg/sql/colexec/invariants_checker.go +++ b/pkg/sql/colexec/invariants_checker.go @@ -32,11 +32,10 @@ type invariantsChecker struct { metadataSource colexecop.MetadataSource } -var _ colexecop.DrainableOperator = &invariantsChecker{} -var _ colexecop.ClosableOperator = &invariantsChecker{} +var _ colexecop.DrainableClosableOperator = &invariantsChecker{} // NewInvariantsChecker creates a new invariantsChecker. -func NewInvariantsChecker(input colexecop.Operator) colexecop.DrainableOperator { +func NewInvariantsChecker(input colexecop.Operator) colexecop.DrainableClosableOperator { if !buildutil.CrdbTestBuild { colexecerror.InternalError(errors.AssertionFailedf( "an invariantsChecker is attempted to be created in non-test build", diff --git a/pkg/sql/colexec/parallel_unordered_synchronizer.go b/pkg/sql/colexec/parallel_unordered_synchronizer.go index 926985cba2e7..152374eaa5b4 100644 --- a/pkg/sql/colexec/parallel_unordered_synchronizer.go +++ b/pkg/sql/colexec/parallel_unordered_synchronizer.go @@ -111,8 +111,7 @@ type ParallelUnorderedSynchronizer struct { bufferedMeta []execinfrapb.ProducerMetadata } -var _ colexecop.DrainableOperator = &ParallelUnorderedSynchronizer{} -var _ colexecop.ClosableOperator = &ParallelUnorderedSynchronizer{} +var _ colexecop.DrainableClosableOperator = &ParallelUnorderedSynchronizer{} // ChildCount implements the execopnode.OpNode interface. func (s *ParallelUnorderedSynchronizer) ChildCount(verbose bool) int { diff --git a/pkg/sql/colexecop/operator.go b/pkg/sql/colexecop/operator.go index 99d4630100cf..e9d26d7571a2 100644 --- a/pkg/sql/colexecop/operator.go +++ b/pkg/sql/colexecop/operator.go @@ -53,10 +53,10 @@ type Operator interface { execopnode.OpNode } -// DrainableOperator is an operator that also implements DrainMeta. Next and -// DrainMeta may not be called concurrently. -type DrainableOperator interface { - Operator +// DrainableClosableOperator is a ClosableOperator that also implements +// DrainMeta. Next, DrainMeta, and Close may not be called concurrently. 
+type DrainableClosableOperator interface { + ClosableOperator MetadataSource } diff --git a/pkg/sql/colflow/routers.go b/pkg/sql/colflow/routers.go index d22111db3b70..9410cfa325b4 100644 --- a/pkg/sql/colflow/routers.go +++ b/pkg/sql/colflow/routers.go @@ -144,7 +144,7 @@ func (o *routerOutputOp) Child(nth int, verbose bool) execopnode.OpNode { return nil } -var _ colexecop.Operator = &routerOutputOp{} +var _ colexecop.DrainableClosableOperator = &routerOutputOp{} type routerOutputOpTestingKnobs struct { // blockedThreshold is the number of buffered values above which we consider @@ -282,6 +282,12 @@ func (o *routerOutputOp) DrainMeta() []execinfrapb.ProducerMetadata { return o.drainCoordinator.drainMeta() } +func (o *routerOutputOp) Close(ctx context.Context) error { + o.mu.Lock() + defer o.mu.Unlock() + return o.mu.data.Close(ctx) +} + func (o *routerOutputOp) initWithHashRouter(r *HashRouter) { o.input = r o.drainCoordinator = r @@ -468,9 +474,9 @@ func NewHashRouter( diskQueueCfg colcontainer.DiskQueueCfg, fdSemaphore semaphore.Semaphore, diskAccounts []*mon.BoundAccount, -) (*HashRouter, []colexecop.DrainableOperator) { +) (*HashRouter, []colexecop.DrainableClosableOperator) { outputs := make([]routerOutput, len(unlimitedAllocators)) - outputsAsOps := make([]colexecop.DrainableOperator, len(unlimitedAllocators)) + outputsAsOps := make([]colexecop.DrainableClosableOperator, len(unlimitedAllocators)) // unblockEventsChan is buffered to 2*numOutputs as we don't want the outputs // writing to it to block. // Unblock events only happen after a corresponding block event. Since these diff --git a/pkg/sql/colflow/routers_test.go b/pkg/sql/colflow/routers_test.go index 1f0f31385734..b25a77521fbd 100644 --- a/pkg/sql/colflow/routers_test.go +++ b/pkg/sql/colflow/routers_test.go @@ -1037,7 +1037,7 @@ func TestHashRouterRandom(t *testing.T) { colexectestutils.RunTestsWithFn(t, tu.testAllocator, []colexectestutils.Tuples{data}, nil, func(t *testing.T, inputs []colexecop.Operator) { unblockEventsChan := make(chan struct{}, 2*numOutputs) outputs := make([]routerOutput, numOutputs) - outputsAsOps := make([]colexecop.DrainableOperator, numOutputs) + outputsAsOps := make([]colexecop.DrainableClosableOperator, numOutputs) memoryLimitPerOutput := mtc.bytes / int64(len(outputs)) for i := range outputs { // Create separate monitoring infrastructure as well as diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index b3391ad085c0..0d9439ae8596 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -767,6 +767,7 @@ func (s *vectorizedFlowCreator) setupRouter( ctx, flowCtx, colexecargs.OpWithMetaInfo{ Root: op, MetadataSources: colexecop.MetadataSources{op}, + ToClose: colexecop.Closers{op}, }, outputTyps, stream, factory, nil, /* getStats */ ); err != nil { return err @@ -776,8 +777,8 @@ func (s *vectorizedFlowCreator) setupRouter( opWithMetaInfo := colexecargs.OpWithMetaInfo{ Root: op, MetadataSources: colexecop.MetadataSources{op}, - // ToClose will be closed by the hash router. - ToClose: nil, + // input.ToClose will be closed by the hash router. + ToClose: colexecop.Closers{op}, } if s.recordingStats { mons := []*mon.BytesMonitor{hashRouterMemMonitor, diskMon}