From a454a44743f5735b03783e14944b36f2ece54879 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Wed, 4 Sep 2019 11:45:30 -0700 Subject: [PATCH] distsqlrun: make windower respect the memory limits Previously, the windower didn't respect the memory limit testing knob and the cluster setting. Now this is fixed. There is one caveat though: the windower requires some amount of RAM to store its intermediate results, so if the testing knob is lower than that, the limit is overridden, but if the cluster setting is insufficient, an error is returned. Release note: None --- pkg/sql/distsqlrun/hashjoiner.go | 4 +- pkg/sql/distsqlrun/joinreader.go | 2 +- pkg/sql/distsqlrun/windower.go | 82 ++++++++++++------- pkg/sql/logictest/testdata/logic_test/window | 16 ++++ pkg/sql/rowcontainer/hash_row_container.go | 8 +- .../rowcontainer/hash_row_container_test.go | 4 +- pkg/sql/rowcontainer/row_container.go | 14 ++-- pkg/sql/rowcontainer/row_container_test.go | 16 ++-- 8 files changed, 94 insertions(+), 52 deletions(-) diff --git a/pkg/sql/distsqlrun/hashjoiner.go b/pkg/sql/distsqlrun/hashjoiner.go index a57c6eed5c26..eef654b6c04e 100644 --- a/pkg/sql/distsqlrun/hashjoiner.go +++ b/pkg/sql/distsqlrun/hashjoiner.go @@ -736,14 +736,14 @@ func (h *hashJoiner) shouldEmitUnmatched( // initStoredRows initializes a hashRowContainer and sets h.storedRows. 
func (h *hashJoiner) initStoredRows() error { if h.useTempStorage { - hrc := rowcontainer.MakeHashDiskBackedRowContainer( + hrc := rowcontainer.NewHashDiskBackedRowContainer( &h.rows[h.storedSide], h.evalCtx, h.MemMonitor, h.diskMonitor, h.flowCtx.Cfg.TempStorage, ) - h.storedRows = &hrc + h.storedRows = hrc } else { hrc := rowcontainer.MakeHashMemRowContainer(&h.rows[h.storedSide]) h.storedRows = &hrc diff --git a/pkg/sql/distsqlrun/joinreader.go b/pkg/sql/distsqlrun/joinreader.go index ba5c488d440d..99776fc3bb00 100644 --- a/pkg/sql/distsqlrun/joinreader.go +++ b/pkg/sql/distsqlrun/joinreader.go @@ -243,7 +243,7 @@ func newJoinReader( limitedMon.Start(ctx, flowCtx.EvalCtx.Mon, mon.BoundAccount{}) jr.MemMonitor = &limitedMon jr.diskMonitor = NewMonitor(ctx, flowCtx.Cfg.DiskMonitor, "joinreader-disk") - drc := rowcontainer.MakeDiskBackedIndexedRowContainer( + drc := rowcontainer.NewDiskBackedIndexedRowContainer( nil, /* ordering */ jr.desc.ColumnTypesWithMutations(returnMutations), jr.evalCtx, diff --git a/pkg/sql/distsqlrun/windower.go b/pkg/sql/distsqlrun/windower.go index a3c981a1bc71..48fba36dd3a8 100644 --- a/pkg/sql/distsqlrun/windower.go +++ b/pkg/sql/distsqlrun/windower.go @@ -103,6 +103,10 @@ const ( windowerEmittingRows ) +// memRequiredByWindower indicates the minimum amount of RAM (in bytes) that +// the windower needs. +const memRequiredByWindower = 100 * 1024 + // windower is the processor that performs computation of window functions // that have the same PARTITION BY clause. 
It passes through all of its input // columns and puts the output of a window function windowFn at @@ -160,34 +164,9 @@ func newWindower( evalCtx := flowCtx.NewEvalCtx() w.inputTypes = input.OutputTypes() ctx := evalCtx.Ctx() - memMonitor := NewMonitor(ctx, evalCtx.Mon, "windower-mem") - w.acc = memMonitor.MakeBoundAccount() - w.diskMonitor = NewMonitor(ctx, flowCtx.Cfg.DiskMonitor, "windower-disk") - if sp := opentracing.SpanFromContext(ctx); sp != nil && tracing.IsRecording(sp) { - w.input = NewInputStatCollector(w.input) - w.finishTrace = w.outputStatsToTrace - } - windowFns := spec.WindowFns w.partitionBy = spec.PartitionBy - allRowsPartitioned := rowcontainer.MakeHashDiskBackedRowContainer( - nil, /* memRowContainer */ - evalCtx, - memMonitor, - w.diskMonitor, - flowCtx.Cfg.TempStorage, - ) - w.allRowsPartitioned = &allRowsPartitioned - if err := w.allRowsPartitioned.Init( - ctx, - false, /* shouldMark */ - w.inputTypes, - w.partitionBy, - true, /* encodeNull */ - ); err != nil { - return nil, err - } - + windowFns := spec.WindowFns w.windowFns = make([]*windowFunc, 0, len(windowFns)) w.builtins = make([]tree.WindowFunc, 0, len(windowFns)) // windower passes through all of its input columns and appends an output @@ -227,7 +206,7 @@ func newWindower( evalCtx, processorID, output, - memMonitor, + nil, /* memMonitor */ ProcStateOpts{InputsToDrain: []RowSource{w.input}, TrailingMetaCallback: func(context.Context) []distsqlpb.ProducerMetadata { w.close() @@ -237,6 +216,53 @@ func newWindower( return nil, err } + st := flowCtx.Cfg.Settings + // Limit the memory use by creating a child monitor with a hard limit. + // windower will overflow to disk if this limit is not enough. + limit := flowCtx.Cfg.TestingKnobs.MemoryLimitBytes + if limit <= 0 { + limit = settingWorkMemBytes.Get(&st.SV) + if limit < memRequiredByWindower { + return nil, errors.Errorf( + "window functions require %d bytes of RAM but only %d are in the budget. 
"+ + "Consider increasing sql.distsql.temp_storage.workmem setting", + memRequiredByWindower, limit) + } + } else { + if limit < memRequiredByWindower { + // The limit is set very low by the tests, but the windower requires + // some amount of RAM, so we override the limit. + limit = memRequiredByWindower + } + } + limitedMon := mon.MakeMonitorInheritWithLimit("windower-limited", limit, evalCtx.Mon) + limitedMon.Start(ctx, evalCtx.Mon, mon.BoundAccount{}) + w.MemMonitor = &limitedMon + w.diskMonitor = NewMonitor(ctx, flowCtx.Cfg.DiskMonitor, "windower-disk") + w.allRowsPartitioned = rowcontainer.NewHashDiskBackedRowContainer( + nil, /* memRowContainer */ + evalCtx, + w.MemMonitor, + w.diskMonitor, + flowCtx.Cfg.TempStorage, + ) + if err := w.allRowsPartitioned.Init( + ctx, + false, /* shouldMark */ + w.inputTypes, + w.partitionBy, + true, /* encodeNull */ + ); err != nil { + return nil, err + } + + w.acc = w.MemMonitor.MakeBoundAccount() + + if sp := opentracing.SpanFromContext(ctx); sp != nil && tracing.IsRecording(sp) { + w.input = NewInputStatCollector(w.input) + w.finishTrace = w.outputStatsToTrace + } + return w, nil } @@ -666,7 +692,7 @@ func (w *windower) computeWindowFunctions(ctx context.Context, evalCtx *tree.Eva // w.partition will have ordering as needed by the first window function to // be processed. 
ordering := distsqlpb.ConvertToColumnOrdering(w.windowFns[w.orderOfWindowFnsProcessing[0]].ordering) - w.partition = rowcontainer.MakeDiskBackedIndexedRowContainer( + w.partition = rowcontainer.NewDiskBackedIndexedRowContainer( ordering, w.inputTypes, w.evalCtx, diff --git a/pkg/sql/logictest/testdata/logic_test/window b/pkg/sql/logictest/testdata/logic_test/window index 3af555520572..4625a5c29838 100644 --- a/pkg/sql/logictest/testdata/logic_test/window +++ b/pkg/sql/logictest/testdata/logic_test/window @@ -3545,3 +3545,19 @@ SELECT *, avg(w) OVER (PARTITION BY w, z ORDER BY y) FROM wxyz ORDER BY z, w, y ---- 2 10 2 0 2 4 10 2 0 4 + +# Test that windower respects the memory limit set via the cluster setting. +statement ok +SET CLUSTER SETTING sql.distsql.temp_storage.workmem='200KB' + +statement ok +CREATE TABLE l (a INT PRIMARY KEY) + +statement ok +INSERT INTO l SELECT g FROM generate_series(0,10000) g(g) + +statement error memory budget exceeded +SELECT array_agg(a) OVER () FROM l LIMIT 1 + +statement ok +RESET CLUSTER SETTING sql.distsql.temp_storage.workmem diff --git a/pkg/sql/rowcontainer/hash_row_container.go b/pkg/sql/rowcontainer/hash_row_container.go index 2a077883dd1a..cf0af75e69fc 100644 --- a/pkg/sql/rowcontainer/hash_row_container.go +++ b/pkg/sql/rowcontainer/hash_row_container.go @@ -765,21 +765,21 @@ type HashDiskBackedRowContainer struct { var _ HashRowContainer = &HashDiskBackedRowContainer{} -// MakeHashDiskBackedRowContainer makes a HashDiskBackedRowContainer. +// NewHashDiskBackedRowContainer makes a HashDiskBackedRowContainer. // mrc (the first argument) can either be nil (in which case // HashMemRowContainer will be built upon an empty MemRowContainer) or non-nil // (in which case mrc is used as underlying MemRowContainer under // HashMemRowContainer). 
The latter case is used by the hashJoiner since when // initializing HashDiskBackedRowContainer it will have accumulated rows from // both sides of the join in MemRowContainers, and we can reuse one of them. -func MakeHashDiskBackedRowContainer( +func NewHashDiskBackedRowContainer( mrc *MemRowContainer, evalCtx *tree.EvalContext, memoryMonitor *mon.BytesMonitor, diskMonitor *mon.BytesMonitor, engine diskmap.Factory, -) HashDiskBackedRowContainer { - return HashDiskBackedRowContainer{ +) *HashDiskBackedRowContainer { + return &HashDiskBackedRowContainer{ mrc: mrc, evalCtx: evalCtx, memoryMonitor: memoryMonitor, diff --git a/pkg/sql/rowcontainer/hash_row_container_test.go b/pkg/sql/rowcontainer/hash_row_container_test.go index f53e47166ac7..168a4c5a62a4 100644 --- a/pkg/sql/rowcontainer/hash_row_container_test.go +++ b/pkg/sql/rowcontainer/hash_row_container_test.go @@ -68,7 +68,7 @@ func TestHashDiskBackedRowContainer(t *testing.T) { types := sqlbase.OneIntCol ordering := sqlbase.ColumnOrdering{{ColIdx: 0, Direction: encoding.Ascending}} - rc := MakeHashDiskBackedRowContainer(nil, &evalCtx, &memoryMonitor, &diskMonitor, tempEngine) + rc := NewHashDiskBackedRowContainer(nil, &evalCtx, &memoryMonitor, &diskMonitor, tempEngine) err = rc.Init( ctx, false, /* shouldMark */ @@ -373,7 +373,7 @@ func TestHashDiskBackedRowContainerPreservesMatchesAndMarks(t *testing.T) { types := []types.T{*types.Int, *types.Int} ordering := sqlbase.ColumnOrdering{{ColIdx: 0, Direction: encoding.Ascending}} - rc := MakeHashDiskBackedRowContainer(nil, &evalCtx, &memoryMonitor, &diskMonitor, tempEngine) + rc := NewHashDiskBackedRowContainer(nil, &evalCtx, &memoryMonitor, &diskMonitor, tempEngine) err = rc.Init( ctx, true, /* shouldMark */ diff --git a/pkg/sql/rowcontainer/row_container.go b/pkg/sql/rowcontainer/row_container.go index ca005eb6ffc4..81f0593128ff 100644 --- a/pkg/sql/rowcontainer/row_container.go +++ b/pkg/sql/rowcontainer/row_container.go @@ -16,8 +16,6 @@ import ( "fmt" 
"unsafe" - "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" - "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/sql/types" @@ -134,7 +132,7 @@ type MemRowContainer struct { } var _ heap.Interface = &MemRowContainer{} -var _ SortableRowContainer = &MemRowContainer{} +var _ IndexedRowContainer = &MemRowContainer{} // Init initializes the MemRowContainer. The MemRowContainer uses evalCtx.Mon // to track memory usage. @@ -357,7 +355,7 @@ type DiskBackedRowContainer struct { diskMonitor *mon.BytesMonitor } -var _ SortableRowContainer = &DiskBackedRowContainer{} +var _ ReorderableRowContainer = &DiskBackedRowContainer{} // Init initializes a DiskBackedRowContainer. // Arguments: @@ -481,7 +479,7 @@ func (f *DiskBackedRowContainer) UsingDisk() bool { // memory error. Returns whether the DiskBackedRowContainer spilled to disk and // an error if one occurred while doing so. func (f *DiskBackedRowContainer) spillIfMemErr(ctx context.Context, err error) (bool, error) { - if code := pgerror.GetPGCode(err); code != pgcode.OutOfMemory { + if !sqlbase.IsOutOfMemoryError(err) { return false, nil } if spillErr := f.SpillToDisk(ctx); spillErr != nil { @@ -561,7 +559,9 @@ type DiskBackedIndexedRowContainer struct { DisableCache bool } -// MakeDiskBackedIndexedRowContainer creates a DiskBackedIndexedRowContainer +var _ IndexedRowContainer = &DiskBackedIndexedRowContainer{} + +// NewDiskBackedIndexedRowContainer creates a DiskBackedIndexedRowContainer // with the given engine as the underlying store that rows are stored on when // it spills to disk. // Arguments: @@ -575,7 +575,7 @@ type DiskBackedIndexedRowContainer struct { // - diskMonitor is used to monitor this container's disk usage. // - rowCapacity (if not 0) specifies the number of rows in-memory container // should be preallocated for. 
-func MakeDiskBackedIndexedRowContainer( +func NewDiskBackedIndexedRowContainer( ordering sqlbase.ColumnOrdering, typs []types.T, evalCtx *tree.EvalContext, diff --git a/pkg/sql/rowcontainer/row_container_test.go b/pkg/sql/rowcontainer/row_container_test.go index 19d40b68a824..77d79a3115f2 100644 --- a/pkg/sql/rowcontainer/row_container_test.go +++ b/pkg/sql/rowcontainer/row_container_test.go @@ -429,7 +429,7 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) { } func() { - rc := MakeDiskBackedIndexedRowContainer(ordering, types, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */) + rc := NewDiskBackedIndexedRowContainer(ordering, types, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */) defer rc.Close(ctx) mid := numRows / 2 for i := 0; i < mid; i++ { @@ -493,7 +493,7 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) { } func() { - rc := MakeDiskBackedIndexedRowContainer(ordering, types, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */) + rc := NewDiskBackedIndexedRowContainer(ordering, types, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */) defer rc.Close(ctx) for _, row := range rows { if err := rc.AddRow(ctx, row); err != nil { @@ -588,7 +588,7 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) { } func() { - rc := MakeDiskBackedIndexedRowContainer(ordering, types, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */) + rc := NewDiskBackedIndexedRowContainer(ordering, types, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */) defer rc.Close(ctx) if err := rc.SpillToDisk(ctx); err != nil { t.Fatal(err) @@ -654,7 +654,7 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) { storedTypes[len(typs)] = sqlbase.OneIntCol[0] func() { - rc := MakeDiskBackedIndexedRowContainer(ordering, typs, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */) + rc := NewDiskBackedIndexedRowContainer(ordering, typs, 
&evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */) defer rc.Close(ctx) for i := 0; i < numRows; i++ { if err := rc.AddRow(ctx, rows[i]); err != nil { @@ -695,7 +695,7 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) { storedTypes[len(typs)] = sqlbase.OneIntCol[0] func() { - d := MakeDiskBackedIndexedRowContainer(ordering, typs, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */) + d := NewDiskBackedIndexedRowContainer(ordering, typs, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */) defer d.Close(ctx) if err := d.SpillToDisk(ctx); err != nil { t.Fatal(err) @@ -855,7 +855,7 @@ func BenchmarkDiskBackedIndexedRowContainer(b *testing.B) { accessPattern := generateAccessPattern(numRows) b.Run("InMemory", func(b *testing.B) { - rc := MakeDiskBackedIndexedRowContainer(nil, sqlbase.OneIntCol, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */) + rc := NewDiskBackedIndexedRowContainer(nil, sqlbase.OneIntCol, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */) defer rc.Close(ctx) for i := 0; i < len(rows); i++ { if err := rc.AddRow(ctx, rows[i]); err != nil { @@ -877,7 +877,7 @@ func BenchmarkDiskBackedIndexedRowContainer(b *testing.B) { }) b.Run("OnDiskWithCache", func(b *testing.B) { - rc := MakeDiskBackedIndexedRowContainer(nil, sqlbase.OneIntCol, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */) + rc := NewDiskBackedIndexedRowContainer(nil, sqlbase.OneIntCol, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */) defer rc.Close(ctx) if err := rc.SpillToDisk(ctx); err != nil { b.Fatal(err) @@ -902,7 +902,7 @@ func BenchmarkDiskBackedIndexedRowContainer(b *testing.B) { }) b.Run("OnDiskWithoutCache", func(b *testing.B) { - rc := MakeDiskBackedIndexedRowContainer(nil, sqlbase.OneIntCol, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */) + rc := NewDiskBackedIndexedRowContainer(nil, sqlbase.OneIntCol, 
&evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */) defer rc.Close(ctx) if err := rc.SpillToDisk(ctx); err != nil { b.Fatal(err)