Skip to content

Commit

Permalink
distsqlrun: make windower respect the memory limits
Browse files Browse the repository at this point in the history
Previously, the windower didn't respect the memory limit testing knob and
the setting. Now this is fixed. There is one caveat though: windower
requires some amount of RAM to store its intermediate results, so if the
testing knob is lower than that, the limit is overwritten, but if the
cluster setting is insufficient, an error is returned.

Release note: None
  • Loading branch information
yuzefovich committed Sep 10, 2019
1 parent 3256e02 commit a454a44
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 52 deletions.
4 changes: 2 additions & 2 deletions pkg/sql/distsqlrun/hashjoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,14 +736,14 @@ func (h *hashJoiner) shouldEmitUnmatched(
// initStoredRows initializes a hashRowContainer and sets h.storedRows.
func (h *hashJoiner) initStoredRows() error {
if h.useTempStorage {
hrc := rowcontainer.MakeHashDiskBackedRowContainer(
hrc := rowcontainer.NewHashDiskBackedRowContainer(
&h.rows[h.storedSide],
h.evalCtx,
h.MemMonitor,
h.diskMonitor,
h.flowCtx.Cfg.TempStorage,
)
h.storedRows = &hrc
h.storedRows = hrc
} else {
hrc := rowcontainer.MakeHashMemRowContainer(&h.rows[h.storedSide])
h.storedRows = &hrc
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsqlrun/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func newJoinReader(
limitedMon.Start(ctx, flowCtx.EvalCtx.Mon, mon.BoundAccount{})
jr.MemMonitor = &limitedMon
jr.diskMonitor = NewMonitor(ctx, flowCtx.Cfg.DiskMonitor, "joinreader-disk")
drc := rowcontainer.MakeDiskBackedIndexedRowContainer(
drc := rowcontainer.NewDiskBackedIndexedRowContainer(
nil, /* ordering */
jr.desc.ColumnTypesWithMutations(returnMutations),
jr.evalCtx,
Expand Down
82 changes: 54 additions & 28 deletions pkg/sql/distsqlrun/windower.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ const (
windowerEmittingRows
)

// memRequiredByWindower indicates the minimum amount of RAM (in bytes) that
// the windower needs.
const memRequiredByWindower = 100 * 1024

// windower is the processor that performs computation of window functions
// that have the same PARTITION BY clause. It passes through all of its input
// columns and puts the output of a window function windowFn at
Expand Down Expand Up @@ -160,34 +164,9 @@ func newWindower(
evalCtx := flowCtx.NewEvalCtx()
w.inputTypes = input.OutputTypes()
ctx := evalCtx.Ctx()
memMonitor := NewMonitor(ctx, evalCtx.Mon, "windower-mem")
w.acc = memMonitor.MakeBoundAccount()
w.diskMonitor = NewMonitor(ctx, flowCtx.Cfg.DiskMonitor, "windower-disk")
if sp := opentracing.SpanFromContext(ctx); sp != nil && tracing.IsRecording(sp) {
w.input = NewInputStatCollector(w.input)
w.finishTrace = w.outputStatsToTrace
}

windowFns := spec.WindowFns
w.partitionBy = spec.PartitionBy
allRowsPartitioned := rowcontainer.MakeHashDiskBackedRowContainer(
nil, /* memRowContainer */
evalCtx,
memMonitor,
w.diskMonitor,
flowCtx.Cfg.TempStorage,
)
w.allRowsPartitioned = &allRowsPartitioned
if err := w.allRowsPartitioned.Init(
ctx,
false, /* shouldMark */
w.inputTypes,
w.partitionBy,
true, /* encodeNull */
); err != nil {
return nil, err
}

windowFns := spec.WindowFns
w.windowFns = make([]*windowFunc, 0, len(windowFns))
w.builtins = make([]tree.WindowFunc, 0, len(windowFns))
// windower passes through all of its input columns and appends an output
Expand Down Expand Up @@ -227,7 +206,7 @@ func newWindower(
evalCtx,
processorID,
output,
memMonitor,
nil, /* memMonitor */
ProcStateOpts{InputsToDrain: []RowSource{w.input},
TrailingMetaCallback: func(context.Context) []distsqlpb.ProducerMetadata {
w.close()
Expand All @@ -237,6 +216,53 @@ func newWindower(
return nil, err
}

st := flowCtx.Cfg.Settings
// Limit the memory use by creating a child monitor with a hard limit.
// windower will overflow to disk if this limit is not enough.
limit := flowCtx.Cfg.TestingKnobs.MemoryLimitBytes
if limit <= 0 {
limit = settingWorkMemBytes.Get(&st.SV)
if limit < memRequiredByWindower {
return nil, errors.Errorf(
"window functions require %d bytes of RAM but only %d are in the budget. "+
"Consider increasing sql.distsql.temp_storage.workmem setting",
memRequiredByWindower, limit)
}
} else {
if limit < memRequiredByWindower {
// The limit is set very low by the tests, but the windower requires
// some amount of RAM, so we override the limit.
limit = memRequiredByWindower
}
}
limitedMon := mon.MakeMonitorInheritWithLimit("windower-limited", limit, evalCtx.Mon)
limitedMon.Start(ctx, evalCtx.Mon, mon.BoundAccount{})
w.MemMonitor = &limitedMon
w.diskMonitor = NewMonitor(ctx, flowCtx.Cfg.DiskMonitor, "windower-disk")
w.allRowsPartitioned = rowcontainer.NewHashDiskBackedRowContainer(
nil, /* memRowContainer */
evalCtx,
w.MemMonitor,
w.diskMonitor,
flowCtx.Cfg.TempStorage,
)
if err := w.allRowsPartitioned.Init(
ctx,
false, /* shouldMark */
w.inputTypes,
w.partitionBy,
true, /* encodeNull */
); err != nil {
return nil, err
}

w.acc = w.MemMonitor.MakeBoundAccount()

if sp := opentracing.SpanFromContext(ctx); sp != nil && tracing.IsRecording(sp) {
w.input = NewInputStatCollector(w.input)
w.finishTrace = w.outputStatsToTrace
}

return w, nil
}

Expand Down Expand Up @@ -666,7 +692,7 @@ func (w *windower) computeWindowFunctions(ctx context.Context, evalCtx *tree.Eva
// w.partition will have ordering as needed by the first window function to
// be processed.
ordering := distsqlpb.ConvertToColumnOrdering(w.windowFns[w.orderOfWindowFnsProcessing[0]].ordering)
w.partition = rowcontainer.MakeDiskBackedIndexedRowContainer(
w.partition = rowcontainer.NewDiskBackedIndexedRowContainer(
ordering,
w.inputTypes,
w.evalCtx,
Expand Down
16 changes: 16 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/window
Original file line number Diff line number Diff line change
Expand Up @@ -3545,3 +3545,19 @@ SELECT *, avg(w) OVER (PARTITION BY w, z ORDER BY y) FROM wxyz ORDER BY z, w, y
----
2 10 2 0 2
4 10 2 0 4

# Test that windower respects the memory limit set via the cluster setting.
statement ok
SET CLUSTER SETTING sql.distsql.temp_storage.workmem='200KB'

statement ok
CREATE TABLE l (a INT PRIMARY KEY)

statement ok
INSERT INTO l SELECT g FROM generate_series(0,10000) g(g)

statement error memory budget exceeded
SELECT array_agg(a) OVER () FROM l LIMIT 1

statement ok
RESET CLUSTER SETTING sql.distsql.temp_storage.workmem
8 changes: 4 additions & 4 deletions pkg/sql/rowcontainer/hash_row_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,21 +765,21 @@ type HashDiskBackedRowContainer struct {

var _ HashRowContainer = &HashDiskBackedRowContainer{}

// MakeHashDiskBackedRowContainer makes a HashDiskBackedRowContainer.
// NewHashDiskBackedRowContainer makes a HashDiskBackedRowContainer.
// mrc (the first argument) can either be nil (in which case
// HashMemRowContainer will be built upon an empty MemRowContainer) or non-nil
// (in which case mrc is used as underlying MemRowContainer under
// HashMemRowContainer). The latter case is used by the hashJoiner since when
// initializing HashDiskBackedRowContainer it will have accumulated rows from
// both sides of the join in MemRowContainers, and we can reuse one of them.
func MakeHashDiskBackedRowContainer(
func NewHashDiskBackedRowContainer(
mrc *MemRowContainer,
evalCtx *tree.EvalContext,
memoryMonitor *mon.BytesMonitor,
diskMonitor *mon.BytesMonitor,
engine diskmap.Factory,
) HashDiskBackedRowContainer {
return HashDiskBackedRowContainer{
) *HashDiskBackedRowContainer {
return &HashDiskBackedRowContainer{
mrc: mrc,
evalCtx: evalCtx,
memoryMonitor: memoryMonitor,
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/rowcontainer/hash_row_container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestHashDiskBackedRowContainer(t *testing.T) {
types := sqlbase.OneIntCol
ordering := sqlbase.ColumnOrdering{{ColIdx: 0, Direction: encoding.Ascending}}

rc := MakeHashDiskBackedRowContainer(nil, &evalCtx, &memoryMonitor, &diskMonitor, tempEngine)
rc := NewHashDiskBackedRowContainer(nil, &evalCtx, &memoryMonitor, &diskMonitor, tempEngine)
err = rc.Init(
ctx,
false, /* shouldMark */
Expand Down Expand Up @@ -373,7 +373,7 @@ func TestHashDiskBackedRowContainerPreservesMatchesAndMarks(t *testing.T) {
types := []types.T{*types.Int, *types.Int}
ordering := sqlbase.ColumnOrdering{{ColIdx: 0, Direction: encoding.Ascending}}

rc := MakeHashDiskBackedRowContainer(nil, &evalCtx, &memoryMonitor, &diskMonitor, tempEngine)
rc := NewHashDiskBackedRowContainer(nil, &evalCtx, &memoryMonitor, &diskMonitor, tempEngine)
err = rc.Init(
ctx,
true, /* shouldMark */
Expand Down
14 changes: 7 additions & 7 deletions pkg/sql/rowcontainer/row_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
"fmt"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/types"
Expand Down Expand Up @@ -134,7 +132,7 @@ type MemRowContainer struct {
}

var _ heap.Interface = &MemRowContainer{}
var _ SortableRowContainer = &MemRowContainer{}
var _ IndexedRowContainer = &MemRowContainer{}

// Init initializes the MemRowContainer. The MemRowContainer uses evalCtx.Mon
// to track memory usage.
Expand Down Expand Up @@ -357,7 +355,7 @@ type DiskBackedRowContainer struct {
diskMonitor *mon.BytesMonitor
}

var _ SortableRowContainer = &DiskBackedRowContainer{}
var _ ReorderableRowContainer = &DiskBackedRowContainer{}

// Init initializes a DiskBackedRowContainer.
// Arguments:
Expand Down Expand Up @@ -481,7 +479,7 @@ func (f *DiskBackedRowContainer) UsingDisk() bool {
// memory error. Returns whether the DiskBackedRowContainer spilled to disk and
// an error if one occurred while doing so.
func (f *DiskBackedRowContainer) spillIfMemErr(ctx context.Context, err error) (bool, error) {
if code := pgerror.GetPGCode(err); code != pgcode.OutOfMemory {
if !sqlbase.IsOutOfMemoryError(err) {
return false, nil
}
if spillErr := f.SpillToDisk(ctx); spillErr != nil {
Expand Down Expand Up @@ -561,7 +559,9 @@ type DiskBackedIndexedRowContainer struct {
DisableCache bool
}

// MakeDiskBackedIndexedRowContainer creates a DiskBackedIndexedRowContainer
var _ IndexedRowContainer = &DiskBackedIndexedRowContainer{}

// NewDiskBackedIndexedRowContainer creates a DiskBackedIndexedRowContainer
// with the given engine as the underlying store that rows are stored on when
// it spills to disk.
// Arguments:
Expand All @@ -575,7 +575,7 @@ type DiskBackedIndexedRowContainer struct {
// - diskMonitor is used to monitor this container's disk usage.
// - rowCapacity (if not 0) specifies the number of rows in-memory container
// should be preallocated for.
func MakeDiskBackedIndexedRowContainer(
func NewDiskBackedIndexedRowContainer(
ordering sqlbase.ColumnOrdering,
typs []types.T,
evalCtx *tree.EvalContext,
Expand Down
16 changes: 8 additions & 8 deletions pkg/sql/rowcontainer/row_container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) {
}

func() {
rc := MakeDiskBackedIndexedRowContainer(ordering, types, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
rc := NewDiskBackedIndexedRowContainer(ordering, types, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
defer rc.Close(ctx)
mid := numRows / 2
for i := 0; i < mid; i++ {
Expand Down Expand Up @@ -493,7 +493,7 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) {
}

func() {
rc := MakeDiskBackedIndexedRowContainer(ordering, types, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
rc := NewDiskBackedIndexedRowContainer(ordering, types, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
defer rc.Close(ctx)
for _, row := range rows {
if err := rc.AddRow(ctx, row); err != nil {
Expand Down Expand Up @@ -588,7 +588,7 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) {
}

func() {
rc := MakeDiskBackedIndexedRowContainer(ordering, types, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
rc := NewDiskBackedIndexedRowContainer(ordering, types, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
defer rc.Close(ctx)
if err := rc.SpillToDisk(ctx); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -654,7 +654,7 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) {
storedTypes[len(typs)] = sqlbase.OneIntCol[0]

func() {
rc := MakeDiskBackedIndexedRowContainer(ordering, typs, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
rc := NewDiskBackedIndexedRowContainer(ordering, typs, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
defer rc.Close(ctx)
for i := 0; i < numRows; i++ {
if err := rc.AddRow(ctx, rows[i]); err != nil {
Expand Down Expand Up @@ -695,7 +695,7 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) {
storedTypes[len(typs)] = sqlbase.OneIntCol[0]

func() {
d := MakeDiskBackedIndexedRowContainer(ordering, typs, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
d := NewDiskBackedIndexedRowContainer(ordering, typs, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
defer d.Close(ctx)
if err := d.SpillToDisk(ctx); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -855,7 +855,7 @@ func BenchmarkDiskBackedIndexedRowContainer(b *testing.B) {
accessPattern := generateAccessPattern(numRows)

b.Run("InMemory", func(b *testing.B) {
rc := MakeDiskBackedIndexedRowContainer(nil, sqlbase.OneIntCol, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
rc := NewDiskBackedIndexedRowContainer(nil, sqlbase.OneIntCol, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
defer rc.Close(ctx)
for i := 0; i < len(rows); i++ {
if err := rc.AddRow(ctx, rows[i]); err != nil {
Expand All @@ -877,7 +877,7 @@ func BenchmarkDiskBackedIndexedRowContainer(b *testing.B) {
})

b.Run("OnDiskWithCache", func(b *testing.B) {
rc := MakeDiskBackedIndexedRowContainer(nil, sqlbase.OneIntCol, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
rc := NewDiskBackedIndexedRowContainer(nil, sqlbase.OneIntCol, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
defer rc.Close(ctx)
if err := rc.SpillToDisk(ctx); err != nil {
b.Fatal(err)
Expand All @@ -902,7 +902,7 @@ func BenchmarkDiskBackedIndexedRowContainer(b *testing.B) {
})

b.Run("OnDiskWithoutCache", func(b *testing.B) {
rc := MakeDiskBackedIndexedRowContainer(nil, sqlbase.OneIntCol, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
rc := NewDiskBackedIndexedRowContainer(nil, sqlbase.OneIntCol, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
defer rc.Close(ctx)
if err := rc.SpillToDisk(ctx); err != nil {
b.Fatal(err)
Expand Down

0 comments on commit a454a44

Please sign in to comment.