diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index 5f18a82cfa2a..f767db8aa733 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -943,7 +943,7 @@ func NewColOperator( newAggArgs.Allocator = colmem.NewAllocator(ctx, ehaMemAccount, factory) newAggArgs.MemAccount = ehaMemAccount newAggArgs.Input = input - return colexecdisk.NewExternalHashAggregator( + eha, toClose := colexecdisk.NewExternalHashAggregator( flowCtx, args, &newAggArgs, @@ -957,6 +957,8 @@ func NewColOperator( outputUnlimitedAllocator, maxOutputBatchMemSize, ) + result.ToClose = append(result.ToClose, toClose) + return eha }, args.TestingKnobs.SpillingCallbackFn, ) @@ -1006,7 +1008,7 @@ func NewColOperator( unlimitedAllocator := colmem.NewAllocator( ctx, args.MonitorRegistry.CreateUnlimitedMemAccount(ctx, flowCtx, edOpName, spec.ProcessorID), factory, ) - return colexecdisk.NewExternalDistinct( + ed, toClose := colexecdisk.NewExternalDistinct( unlimitedAllocator, flowCtx, args, @@ -1016,6 +1018,8 @@ func NewColOperator( inMemoryUnorderedDistinct, diskAccount, ) + result.ToClose = append(result.ToClose, toClose) + return ed }, args.TestingKnobs.SpillingCallbackFn, ) @@ -1106,7 +1110,7 @@ func NewColOperator( result.makeDiskBackedSorterConstructor(ctx, flowCtx, args, opName, factory), diskAccount, ) - result.ToClose = append(result.ToClose, ehj.(colexecop.Closer)) + result.ToClose = append(result.ToClose, ehj) return ehj }, args.TestingKnobs.SpillingCallbackFn, @@ -1419,7 +1423,9 @@ func NewColOperator( } result.Root = colexecwindow.NewWindowAggregatorOperator( windowArgs, aggType, wf.Frame, &wf.Ordering, argIdxs, - aggArgs.OutputTypes[0], aggFnsAlloc, toClose) + aggArgs.OutputTypes[0], aggFnsAlloc, + ) + result.ToClose = append(result.ToClose, toClose...) returnType = aggArgs.OutputTypes[0] } } else { diff --git a/pkg/sql/colexec/colexecdisk/external_distinct.go b/pkg/sql/colexec/colexecdisk/external_distinct.go index 14ea6a93c200..598c8c0624c7 100644 --- a/pkg/sql/colexec/colexecdisk/external_distinct.go +++ b/pkg/sql/colexec/colexecdisk/external_distinct.go @@ -36,7 +36,7 @@ func NewExternalDistinct( createDiskBackedSorter DiskBackedSorterConstructor, inMemUnorderedDistinct colexecop.Operator, diskAcc *mon.BoundAccount, -) colexecop.Operator { +) (colexecop.Operator, colexecop.Closer) { distinctSpec := args.Spec.Core.Distinct distinctCols := distinctSpec.DistinctColumns inMemMainOpConstructor := func(partitionedInputs []*partitionerToOperator) colexecop.ResettableOperator { @@ -111,13 +111,13 @@ func NewExternalDistinct( outputOrdering := args.Spec.Core.Distinct.OutputOrdering if len(outputOrdering.Columns) == 0 { // No particular output ordering is required. - return ed + return ed, ed } // TODO(yuzefovich): the fact that we're planning an additional external // sort isn't accounted for when considering the number file descriptors to // acquire. Not urgent, but it should be fixed. maxNumberActivePartitions := calculateMaxNumberActivePartitions(flowCtx, args, numRequiredActivePartitions) - return createDiskBackedSorter(ed, inputTypes, outputOrdering.Columns, maxNumberActivePartitions) + return createDiskBackedSorter(ed, inputTypes, outputOrdering.Columns, maxNumberActivePartitions), ed } // unorderedDistinctFilterer filters out tuples that are duplicates of the diff --git a/pkg/sql/colexec/colexecdisk/external_hash_aggregator.go b/pkg/sql/colexec/colexecdisk/external_hash_aggregator.go index f7f610c0297c..dc9bd7177824 100644 --- a/pkg/sql/colexec/colexecdisk/external_hash_aggregator.go +++ b/pkg/sql/colexec/colexecdisk/external_hash_aggregator.go @@ -39,7 +39,7 @@ func NewExternalHashAggregator( diskAcc *mon.BoundAccount, outputUnlimitedAllocator *colmem.Allocator, maxOutputBatchMemSize int64, -) colexecop.Operator { +) (colexecop.Operator, colexecop.Closer) { inMemMainOpConstructor := func(partitionedInputs []*partitionerToOperator) colexecop.ResettableOperator { newAggArgs := *newAggArgs newAggArgs.Input = partitionedInputs[0] @@ -83,11 +83,11 @@ func NewExternalHashAggregator( outputOrdering := args.Spec.Core.Aggregator.OutputOrdering if len(outputOrdering.Columns) == 0 { // No particular output ordering is required. - return eha + return eha, eha } // TODO(yuzefovich): the fact that we're planning an additional external // sort isn't accounted for when considering the number file descriptors to // acquire. Not urgent, but it should be fixed. maxNumberActivePartitions := calculateMaxNumberActivePartitions(flowCtx, args, ehaNumRequiredActivePartitions) - return createDiskBackedSorter(eha, newAggArgs.OutputTypes, outputOrdering.Columns, maxNumberActivePartitions) + return createDiskBackedSorter(eha, newAggArgs.OutputTypes, outputOrdering.Columns, maxNumberActivePartitions), eha } diff --git a/pkg/sql/colexec/colexecdisk/external_hash_joiner.go b/pkg/sql/colexec/colexecdisk/external_hash_joiner.go index 01ea8658cb77..f024df2ff77d 100644 --- a/pkg/sql/colexec/colexecdisk/external_hash_joiner.go +++ b/pkg/sql/colexec/colexecdisk/external_hash_joiner.go @@ -68,7 +68,7 @@ func NewExternalHashJoiner( leftInput, rightInput colexecop.Operator, createDiskBackedSorter DiskBackedSorterConstructor, diskAcc *mon.BoundAccount, -) colexecop.Operator { +) colexecop.ClosableOperator { // This memory limit will restrict the size of the batches output by the // in-memory hash joiner in the main strategy as well as by the merge joiner // in the fallback strategy. diff --git a/pkg/sql/colexec/colexecjoin/crossjoiner.go b/pkg/sql/colexec/colexecjoin/crossjoiner.go index c44084571cff..87a4cd8c5319 100644 --- a/pkg/sql/colexec/colexecjoin/crossjoiner.go +++ b/pkg/sql/colexec/colexecjoin/crossjoiner.go @@ -39,7 +39,7 @@ func NewCrossJoiner( leftTypes []*types.T, rightTypes []*types.T, diskAcc *mon.BoundAccount, -) colexecop.Operator { +) colexecop.ClosableOperator { return &crossJoiner{ crossJoinerBase: newCrossJoinerBase( unlimitedAllocator, diff --git a/pkg/sql/colexec/colexecwindow/buffered_window.go b/pkg/sql/colexec/colexecwindow/buffered_window.go index a5f1cf83c27d..2e4af2d853aa 100644 --- a/pkg/sql/colexec/colexecwindow/buffered_window.go +++ b/pkg/sql/colexec/colexecwindow/buffered_window.go @@ -30,7 +30,7 @@ import ( // window function. func newBufferedWindowOperator( args *WindowArgs, windower bufferedWindower, outputColType *types.T, memoryLimit int64, -) colexecop.Operator { +) colexecop.ClosableOperator { outputTypes := make([]*types.T, len(args.InputTypes), len(args.InputTypes)+1) copy(outputTypes, args.InputTypes) outputTypes = append(outputTypes, outputColType) @@ -213,7 +213,7 @@ func (b *bufferedWindowOp) Init(ctx context.Context) { b.windower.startNewPartition() } -var _ colexecop.Operator = &bufferedWindowOp{} +var _ colexecop.ClosableOperator = &bufferedWindowOp{} func (b *bufferedWindowOp) Next() coldata.Batch { var err error @@ -343,16 +343,11 @@ func (b *bufferedWindowOp) Next() coldata.Batch { } func (b *bufferedWindowOp) Close(ctx context.Context) error { - if !b.CloserHelper.Close() || b.Ctx == nil { - // Either Close() has already been called or Init() was never called. In - // both cases there is nothing to do. + if !b.CloserHelper.Close() { return nil } - if err := b.bufferQueue.Close(ctx); err != nil { - return err - } b.windower.Close(ctx) - return nil + return b.bufferQueue.Close(ctx) } // partitionSeekerBase extracts common fields and methods for buffered windower diff --git a/pkg/sql/colexec/colexecwindow/count_rows_aggregator.go b/pkg/sql/colexec/colexecwindow/count_rows_aggregator.go index 20af8be13cab..71f830c1f3fc 100644 --- a/pkg/sql/colexec/colexecwindow/count_rows_aggregator.go +++ b/pkg/sql/colexec/colexecwindow/count_rows_aggregator.go @@ -25,7 +25,7 @@ import ( // aggregate window function. func NewCountRowsOperator( args *WindowArgs, frame *execinfrapb.WindowerSpec_Frame, ordering *execinfrapb.Ordering, -) colexecop.Operator { +) colexecop.ClosableOperator { // Because the buffer is potentially used multiple times per-row, it is // important to prevent it from spilling to disk if possible. For this reason, // we give the buffer half of the memory budget even though it will generally diff --git a/pkg/sql/colexec/colexecwindow/first_last_nth_value_tmpl.go b/pkg/sql/colexec/colexecwindow/first_last_nth_value_tmpl.go index 4efdea49c86f..60e14f3ffd49 100644 --- a/pkg/sql/colexec/colexecwindow/first_last_nth_value_tmpl.go +++ b/pkg/sql/colexec/colexecwindow/first_last_nth_value_tmpl.go @@ -56,7 +56,7 @@ func New_UPPERCASE_NAMEOperator( frame *execinfrapb.WindowerSpec_Frame, ordering *execinfrapb.Ordering, argIdxs []int, -) (colexecop.Operator, error) { +) (colexecop.ClosableOperator, error) { framer := newWindowFramer(args.EvalCtx, frame, ordering, args.InputTypes, args.PeersColIdx) colsToStore := framer.getColsToStore([]int{argIdxs[0]}) diff --git a/pkg/sql/colexec/colexecwindow/first_value.eg.go b/pkg/sql/colexec/colexecwindow/first_value.eg.go index 2a666afbc323..8d4e303acd7a 100644 --- a/pkg/sql/colexec/colexecwindow/first_value.eg.go +++ b/pkg/sql/colexec/colexecwindow/first_value.eg.go @@ -29,7 +29,7 @@ func NewFirstValueOperator( frame *execinfrapb.WindowerSpec_Frame, ordering *execinfrapb.Ordering, argIdxs []int, -) (colexecop.Operator, error) { +) (colexecop.ClosableOperator, error) { framer := newWindowFramer(args.EvalCtx, frame, ordering, args.InputTypes, args.PeersColIdx) colsToStore := framer.getColsToStore([]int{argIdxs[0]}) diff --git a/pkg/sql/colexec/colexecwindow/lag.eg.go b/pkg/sql/colexec/colexecwindow/lag.eg.go index 271974c3d70a..b7a02e32c396 100644 --- a/pkg/sql/colexec/colexecwindow/lag.eg.go +++ b/pkg/sql/colexec/colexecwindow/lag.eg.go @@ -25,7 +25,7 @@ import ( // should put its output (if there is no such column, a new column is appended). func NewLagOperator( args *WindowArgs, argIdx int, offsetIdx int, defaultIdx int, -) (colexecop.Operator, error) { +) (colexecop.ClosableOperator, error) { // Allow the direct-access buffer 10% of the available memory. The rest will // be given to the bufferedWindowOp queue. While it is somewhat more important // for the direct-access buffer tuples to be kept in-memory, it only has to diff --git a/pkg/sql/colexec/colexecwindow/last_value.eg.go b/pkg/sql/colexec/colexecwindow/last_value.eg.go index b6e2ef4a8cf9..721fa8001555 100644 --- a/pkg/sql/colexec/colexecwindow/last_value.eg.go +++ b/pkg/sql/colexec/colexecwindow/last_value.eg.go @@ -29,7 +29,7 @@ func NewLastValueOperator( frame *execinfrapb.WindowerSpec_Frame, ordering *execinfrapb.Ordering, argIdxs []int, -) (colexecop.Operator, error) { +) (colexecop.ClosableOperator, error) { framer := newWindowFramer(args.EvalCtx, frame, ordering, args.InputTypes, args.PeersColIdx) colsToStore := framer.getColsToStore([]int{argIdxs[0]}) diff --git a/pkg/sql/colexec/colexecwindow/lead.eg.go b/pkg/sql/colexec/colexecwindow/lead.eg.go index fe9295f334ae..408efefca6a0 100644 --- a/pkg/sql/colexec/colexecwindow/lead.eg.go +++ b/pkg/sql/colexec/colexecwindow/lead.eg.go @@ -25,7 +25,7 @@ import ( // should put its output (if there is no such column, a new column is appended). func NewLeadOperator( args *WindowArgs, argIdx int, offsetIdx int, defaultIdx int, -) (colexecop.Operator, error) { +) (colexecop.ClosableOperator, error) { // Allow the direct-access buffer 10% of the available memory. The rest will // be given to the bufferedWindowOp queue. While it is somewhat more important // for the direct-access buffer tuples to be kept in-memory, it only has to diff --git a/pkg/sql/colexec/colexecwindow/lead_lag_tmpl.go b/pkg/sql/colexec/colexecwindow/lead_lag_tmpl.go index 4dbf5e5f5b1c..ff985706e1d9 100644 --- a/pkg/sql/colexec/colexecwindow/lead_lag_tmpl.go +++ b/pkg/sql/colexec/colexecwindow/lead_lag_tmpl.go @@ -49,7 +49,7 @@ const _TYPE_WIDTH = 0 // should put its output (if there is no such column, a new column is appended). func New_UPPERCASE_NAMEOperator( args *WindowArgs, argIdx int, offsetIdx int, defaultIdx int, -) (colexecop.Operator, error) { +) (colexecop.ClosableOperator, error) { // Allow the direct-access buffer 10% of the available memory. The rest will // be given to the bufferedWindowOp queue. While it is somewhat more important // for the direct-access buffer tuples to be kept in-memory, it only has to diff --git a/pkg/sql/colexec/colexecwindow/nth_value.eg.go b/pkg/sql/colexec/colexecwindow/nth_value.eg.go index 1e13803a1cdd..59ef0647cbc1 100644 --- a/pkg/sql/colexec/colexecwindow/nth_value.eg.go +++ b/pkg/sql/colexec/colexecwindow/nth_value.eg.go @@ -31,7 +31,7 @@ func NewNthValueOperator( frame *execinfrapb.WindowerSpec_Frame, ordering *execinfrapb.Ordering, argIdxs []int, -) (colexecop.Operator, error) { +) (colexecop.ClosableOperator, error) { framer := newWindowFramer(args.EvalCtx, frame, ordering, args.InputTypes, args.PeersColIdx) colsToStore := framer.getColsToStore([]int{argIdxs[0]}) diff --git a/pkg/sql/colexec/colexecwindow/relative_rank.eg.go b/pkg/sql/colexec/colexecwindow/relative_rank.eg.go index 8c5abaaca414..c3b4382dd797 100644 --- a/pkg/sql/colexec/colexecwindow/relative_rank.eg.go +++ b/pkg/sql/colexec/colexecwindow/relative_rank.eg.go @@ -324,9 +324,7 @@ func (r *percentRankNoPartitionOp) Next() coldata.Batch { } func (r *percentRankNoPartitionOp) Close(ctx context.Context) error { - if !r.CloserHelper.Close() || r.Ctx == nil { - // Either Close() has already been called or Init() was never called. In - // both cases there is nothing to do. + if !r.CloserHelper.Close() { return nil } var lastErr error @@ -603,9 +601,7 @@ func (r *percentRankWithPartitionOp) Next() coldata.Batch { } func (r *percentRankWithPartitionOp) Close(ctx context.Context) error { - if !r.CloserHelper.Close() || r.Ctx == nil { - // Either Close() has already been called or Init() was never called. In - // both cases there is nothing to do. + if !r.CloserHelper.Close() { return nil } var lastErr error @@ -870,9 +866,7 @@ func (r *cumeDistNoPartitionOp) Next() coldata.Batch { } func (r *cumeDistNoPartitionOp) Close(ctx context.Context) error { - if !r.CloserHelper.Close() || r.Ctx == nil { - // Either Close() has already been called or Init() was never called. In - // both cases there is nothing to do. + if !r.CloserHelper.Close() { return nil } var lastErr error @@ -1231,9 +1225,7 @@ func (r *cumeDistWithPartitionOp) Next() coldata.Batch { } func (r *cumeDistWithPartitionOp) Close(ctx context.Context) error { - if !r.CloserHelper.Close() || r.Ctx == nil { - // Either Close() has already been called or Init() was never called. In - // both cases there is nothing to do. + if !r.CloserHelper.Close() { return nil } var lastErr error diff --git a/pkg/sql/colexec/colexecwindow/relative_rank_tmpl.go b/pkg/sql/colexec/colexecwindow/relative_rank_tmpl.go index ce4439b0b975..c67254a89c93 100644 --- a/pkg/sql/colexec/colexecwindow/relative_rank_tmpl.go +++ b/pkg/sql/colexec/colexecwindow/relative_rank_tmpl.go @@ -590,9 +590,7 @@ func (r *_RELATIVE_RANK_STRINGOp) Next() coldata.Batch { } func (r *_RELATIVE_RANK_STRINGOp) Close(ctx context.Context) error { - if !r.CloserHelper.Close() || r.Ctx == nil { - // Either Close() has already been called or Init() was never called. In - // both cases there is nothing to do. + if !r.CloserHelper.Close() { return nil } var lastErr error diff --git a/pkg/sql/colexec/colexecwindow/window_aggregator.eg.go b/pkg/sql/colexec/colexecwindow/window_aggregator.eg.go index b20cf3d4b0b2..8522f38e26e8 100644 --- a/pkg/sql/colexec/colexecwindow/window_aggregator.eg.go +++ b/pkg/sql/colexec/colexecwindow/window_aggregator.eg.go @@ -22,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecagg" "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils" - "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/colexecop" "github.com/cockroachdb/cockroach/pkg/sql/colmem" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -52,8 +51,7 @@ func NewWindowAggregatorOperator( argIdxs []int, outputType *types.T, aggAlloc *colexecagg.AggregateFuncsAlloc, - closers colexecop.Closers, -) colexecop.Operator { +) colexecop.ClosableOperator { // Because the buffer is used multiple times per-row, it is important to // prevent it from spilling to disk if possible. For this reason, we give the // buffer half of the memory budget even though it will generally store less @@ -79,7 +77,6 @@ func NewWindowAggregatorOperator( outputColIdx: args.OutputColIdx, inputIdxs: inputIdxs, framer: framer, - closers: closers, vecs: make([]coldata.Vec, len(inputIdxs)), } var agg colexecagg.AggregateFunc @@ -126,7 +123,6 @@ func NewWindowAggregatorOperator( type windowAggregatorBase struct { partitionSeekerBase colexecop.CloserHelper - closers colexecop.Closers allocator *colmem.Allocator outputColIdx int @@ -178,9 +174,6 @@ func (a *windowAggregatorBase) Close(ctx context.Context) { if !a.CloserHelper.Close() { return } - if err := a.closers.Close(ctx); err != nil { - colexecerror.InternalError(err) - } a.framer.close() a.buffer.Close(ctx) } diff --git a/pkg/sql/colexec/colexecwindow/window_aggregator_tmpl.go b/pkg/sql/colexec/colexecwindow/window_aggregator_tmpl.go index 6c71db6bfcd0..5c303606251b 100644 --- a/pkg/sql/colexec/colexecwindow/window_aggregator_tmpl.go +++ b/pkg/sql/colexec/colexecwindow/window_aggregator_tmpl.go @@ -27,7 +27,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecagg" "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils" - "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/colexecop" "github.com/cockroachdb/cockroach/pkg/sql/colmem" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -57,8 +56,7 @@ func NewWindowAggregatorOperator( argIdxs []int, outputType *types.T, aggAlloc *colexecagg.AggregateFuncsAlloc, - closers colexecop.Closers, -) colexecop.Operator { +) colexecop.ClosableOperator { // Because the buffer is used multiple times per-row, it is important to // prevent it from spilling to disk if possible. For this reason, we give the // buffer half of the memory budget even though it will generally store less @@ -84,7 +82,6 @@ func NewWindowAggregatorOperator( outputColIdx: args.OutputColIdx, inputIdxs: inputIdxs, framer: framer, - closers: closers, vecs: make([]coldata.Vec, len(inputIdxs)), } var agg colexecagg.AggregateFunc @@ -131,7 +128,6 @@ func NewWindowAggregatorOperator( type windowAggregatorBase struct { partitionSeekerBase colexecop.CloserHelper - closers colexecop.Closers allocator *colmem.Allocator outputColIdx int @@ -183,9 +179,6 @@ func (a *windowAggregatorBase) Close(ctx context.Context) { if !a.CloserHelper.Close() { return } - if err := a.closers.Close(ctx); err != nil { - colexecerror.InternalError(err) - } a.framer.close() a.buffer.Close(ctx) } diff --git a/pkg/sql/colexec/colexecwindow/window_functions_test.go b/pkg/sql/colexec/colexecwindow/window_functions_test.go index 580b678f5510..a82ebc0ad35a 100644 --- a/pkg/sql/colexec/colexecwindow/window_functions_test.go +++ b/pkg/sql/colexec/colexecwindow/window_functions_test.go @@ -1124,6 +1124,11 @@ func BenchmarkWindowFunctions(b *testing.B) { benchMemAccount := testMemMonitor.MakeBoundAccount() defer benchMemAccount.Close(ctx) + var allClosers colexecop.Closers + defer func() { + require.NoError(b, allClosers.Close(ctx)) + }() + getWindowFn := func( fun execinfrapb.WindowerSpec_Func, source colexecop.Operator, partition, order bool, ) (op colexecop.Operator) { @@ -1222,7 +1227,9 @@ func BenchmarkWindowFunctions(b *testing.B) { op = NewWindowAggregatorOperator( args, *fun.AggregateFunc, NormalizeWindowFrame(nil), &execinfrapb.Ordering{Columns: orderingCols}, []int{arg1ColIdx}, - aggArgs.OutputTypes[0], aggFnsAlloc, toClose) + aggArgs.OutputTypes[0], aggFnsAlloc, + ) + allClosers = append(allClosers, toClose...) } else { require.Fail(b, "expected non-nil window function") } diff --git a/pkg/sql/colexec/columnarizer.go b/pkg/sql/colexec/columnarizer.go index 9029e1dd61fa..916a50509e99 100644 --- a/pkg/sql/colexec/columnarizer.go +++ b/pkg/sql/colexec/columnarizer.go @@ -261,10 +261,7 @@ func (c *Columnarizer) Next() coldata.Batch { return c.batch } -var ( - _ colexecop.DrainableOperator = &Columnarizer{} - _ colexecop.Closer = &Columnarizer{} -) +var _ colexecop.DrainableClosableOperator = &Columnarizer{} // DrainMeta is part of the colexecop.MetadataSource interface. func (c *Columnarizer) DrainMeta() []execinfrapb.ProducerMetadata { diff --git a/pkg/sql/colexec/external_distinct_test.go b/pkg/sql/colexec/external_distinct_test.go index 70d2d5afb9ee..48f968b026a3 100644 --- a/pkg/sql/colexec/external_distinct_test.go +++ b/pkg/sql/colexec/external_distinct_test.go @@ -67,9 +67,9 @@ func TestExternalDistinct(t *testing.T) { var semsToCheck []semaphore.Semaphore var outputOrdering execinfrapb.Ordering verifier := colexectestutils.UnorderedVerifier - // Check that the external distinct and the disk-backed sort - // were added as Closers. - numExpectedClosers := 2 + // Check that the disk spiller, the external distinct, and the + // disk-backed sort were added as Closers. + numExpectedClosers := 3 if tc.isOrderedOnDistinctCols { outputOrdering = convertDistinctColsToOrdering(tc.distinctCols) verifier = colexectestutils.OrderedVerifier @@ -190,9 +190,9 @@ func TestExternalDistinctSpilling(t *testing.T) { &monitorRegistry, ) require.NoError(t, err) - // Check that the external distinct and the disk-backed sort - // were added as Closers. - numExpectedClosers := 2 + // Check that the disk spiller, the external distinct, and the + // disk-backed sort were added as Closers. + numExpectedClosers := 3 require.Equal(t, numExpectedClosers, len(closers)) numRuns++ return distinct, nil diff --git a/pkg/sql/colexec/external_hash_aggregator_test.go b/pkg/sql/colexec/external_hash_aggregator_test.go index 154f6fb08fb9..78e7093df38b 100644 --- a/pkg/sql/colexec/external_hash_aggregator_test.go +++ b/pkg/sql/colexec/external_hash_aggregator_test.go @@ -111,10 +111,9 @@ func TestExternalHashAggregator(t *testing.T) { } var numExpectedClosers int if cfg.diskSpillingEnabled { - // The external sorter and the disk spiller should be added - // as Closers (the latter is responsible for closing the - // in-memory hash aggregator as well as the external one). - numExpectedClosers = 2 + // The external sorter, the disk spiller, and the external hash + // aggregator should be added as Closers. + numExpectedClosers = 3 if len(tc.spec.OutputOrdering.Columns) > 0 { // When the output ordering is required, we also plan // another external sort. diff --git a/pkg/sql/colexec/invariants_checker.go b/pkg/sql/colexec/invariants_checker.go index 4970e095fe00..33f46a1af22d 100644 --- a/pkg/sql/colexec/invariants_checker.go +++ b/pkg/sql/colexec/invariants_checker.go @@ -32,11 +32,10 @@ type invariantsChecker struct { metadataSource colexecop.MetadataSource } -var _ colexecop.DrainableOperator = &invariantsChecker{} -var _ colexecop.ClosableOperator = &invariantsChecker{} +var _ colexecop.DrainableClosableOperator = &invariantsChecker{} // NewInvariantsChecker creates a new invariantsChecker. -func NewInvariantsChecker(input colexecop.Operator) colexecop.DrainableOperator { +func NewInvariantsChecker(input colexecop.Operator) colexecop.DrainableClosableOperator { if !buildutil.CrdbTestBuild { colexecerror.InternalError(errors.AssertionFailedf( "an invariantsChecker is attempted to be created in non-test build", diff --git a/pkg/sql/colexec/parallel_unordered_synchronizer.go b/pkg/sql/colexec/parallel_unordered_synchronizer.go index 926985cba2e7..152374eaa5b4 100644 --- a/pkg/sql/colexec/parallel_unordered_synchronizer.go +++ b/pkg/sql/colexec/parallel_unordered_synchronizer.go @@ -111,8 +111,7 @@ type ParallelUnorderedSynchronizer struct { bufferedMeta []execinfrapb.ProducerMetadata } -var _ colexecop.DrainableOperator = &ParallelUnorderedSynchronizer{} -var _ colexecop.ClosableOperator = &ParallelUnorderedSynchronizer{} +var _ colexecop.DrainableClosableOperator = &ParallelUnorderedSynchronizer{} // ChildCount implements the execopnode.OpNode interface. func (s *ParallelUnorderedSynchronizer) ChildCount(verbose bool) int { diff --git a/pkg/sql/colexecop/operator.go b/pkg/sql/colexecop/operator.go index 99d4630100cf..e9d26d7571a2 100644 --- a/pkg/sql/colexecop/operator.go +++ b/pkg/sql/colexecop/operator.go @@ -53,10 +53,10 @@ type Operator interface { execopnode.OpNode } -// DrainableOperator is an operator that also implements DrainMeta. Next and -// DrainMeta may not be called concurrently. -type DrainableOperator interface { - Operator +// DrainableClosableOperator is a ClosableOperator that also implements +// DrainMeta. Next, DrainMeta, and Close may not be called concurrently. +type DrainableClosableOperator interface { + ClosableOperator MetadataSource } diff --git a/pkg/sql/colflow/routers.go b/pkg/sql/colflow/routers.go index d22111db3b70..9410cfa325b4 100644 --- a/pkg/sql/colflow/routers.go +++ b/pkg/sql/colflow/routers.go @@ -144,7 +144,7 @@ func (o *routerOutputOp) Child(nth int, verbose bool) execopnode.OpNode { return nil } -var _ colexecop.Operator = &routerOutputOp{} +var _ colexecop.DrainableClosableOperator = &routerOutputOp{} type routerOutputOpTestingKnobs struct { // blockedThreshold is the number of buffered values above which we consider @@ -282,6 +282,12 @@ func (o *routerOutputOp) DrainMeta() []execinfrapb.ProducerMetadata { return o.drainCoordinator.drainMeta() } +func (o *routerOutputOp) Close(ctx context.Context) error { + o.mu.Lock() + defer o.mu.Unlock() + return o.mu.data.Close(ctx) +} + func (o *routerOutputOp) initWithHashRouter(r *HashRouter) { o.input = r o.drainCoordinator = r @@ -468,9 +474,9 @@ func NewHashRouter( diskQueueCfg colcontainer.DiskQueueCfg, fdSemaphore semaphore.Semaphore, diskAccounts []*mon.BoundAccount, -) (*HashRouter, []colexecop.DrainableOperator) { +) (*HashRouter, []colexecop.DrainableClosableOperator) { outputs := make([]routerOutput, len(unlimitedAllocators)) - outputsAsOps := make([]colexecop.DrainableOperator, len(unlimitedAllocators)) + outputsAsOps := make([]colexecop.DrainableClosableOperator, len(unlimitedAllocators)) // unblockEventsChan is buffered to 2*numOutputs as we don't want the outputs // writing to it to block. // Unblock events only happen after a corresponding block event. Since these diff --git a/pkg/sql/colflow/routers_test.go b/pkg/sql/colflow/routers_test.go index 1f0f31385734..b25a77521fbd 100644 --- a/pkg/sql/colflow/routers_test.go +++ b/pkg/sql/colflow/routers_test.go @@ -1037,7 +1037,7 @@ func TestHashRouterRandom(t *testing.T) { colexectestutils.RunTestsWithFn(t, tu.testAllocator, []colexectestutils.Tuples{data}, nil, func(t *testing.T, inputs []colexecop.Operator) { unblockEventsChan := make(chan struct{}, 2*numOutputs) outputs := make([]routerOutput, numOutputs) - outputsAsOps := make([]colexecop.DrainableOperator, numOutputs) + outputsAsOps := make([]colexecop.DrainableClosableOperator, numOutputs) memoryLimitPerOutput := mtc.bytes / int64(len(outputs)) for i := range outputs { // Create separate monitoring infrastructure as well as diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index b3391ad085c0..0d9439ae8596 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -767,6 +767,7 @@ func (s *vectorizedFlowCreator) setupRouter( ctx, flowCtx, colexecargs.OpWithMetaInfo{ Root: op, MetadataSources: colexecop.MetadataSources{op}, + ToClose: colexecop.Closers{op}, }, outputTyps, stream, factory, nil, /* getStats */ ); err != nil { return err @@ -776,8 +777,8 @@ func (s *vectorizedFlowCreator) setupRouter( opWithMetaInfo := colexecargs.OpWithMetaInfo{ Root: op, MetadataSources: colexecop.MetadataSources{op}, - // ToClose will be closed by the hash router. - ToClose: nil, + // input.ToClose will be closed by the hash router. + ToClose: colexecop.Closers{op}, } if s.recordingStats { mons := []*mon.BytesMonitor{hashRouterMemMonitor, diskMon}