sql: remove EvalContext.ActiveMemAcc

This memory account was used to track allocations performed by builtins,
mainly string manipulation builtins. It was hard to use correctly, as the
account's lifetime was never particularly clear.

There were a couple of hacks in place to try to clear the memory at the
right times, but they didn't work correctly, leading to crashes that
were hard to diagnose.

Rather than try to make this work perfectly, this commit removes the
memory accounting for these builtins and replaces it with a hard cap on
the size of string allocations - set arbitrarily to 64 MB at this time.

Release note: None
jordanlewis committed Feb 21, 2019
1 parent 1387b4f commit 2b00f15
Showing 18 changed files with 56 additions and 241 deletions.
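To make the new approach concrete, here is a minimal, self-contained Go sketch of the hard cap described in the commit message. The maxAllocatedStringSize constant, the repeatWithCap helper, and the error text are illustrative assumptions, not the identifiers used in the commit; the actual change wires an equivalent length check into the string builtins and surfaces failures as a pgerror (see the CodeProgramLimitExceededError and logic test updates below).

package main

import (
	"errors"
	"fmt"
	"strings"
)

// maxAllocatedStringSize mirrors the arbitrary 64 MB cap from the commit
// message (hypothetical name; the commit may use a different identifier).
const maxAllocatedStringSize = 64 * 1024 * 1024

var errResultTooLarge = errors.New("requested length too large")

// repeatWithCap behaves like the repeat() builtin, but instead of charging a
// memory account it refuses up front to build a result larger than the cap.
func repeatWithCap(s string, count int) (string, error) {
	if count < 0 {
		count = 0
	}
	if int64(len(s))*int64(count) > maxAllocatedStringSize {
		return "", errResultTooLarge
	}
	return strings.Repeat(s, count), nil
}

func main() {
	if _, err := repeatWithCap("abc", 70000000); err != nil {
		fmt.Println(err) // requested length too large
	}
}

With a check like this, an oversized repeat() fails fast with a client-visible error instead of slowly growing a memory account whose lifetime was unclear.
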
103 changes: 3 additions & 100 deletions pkg/sql/builtin_mem_usage_test.go
@@ -17,14 +17,10 @@ package sql
import (
"context"
gosql "database/sql"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/lib/pq"
@@ -96,71 +92,14 @@ func TestAggregatesMonitorMemory(t *testing.T) {
}
}

func TestBuiltinsAccountForMemory(t *testing.T) {
defer leaktest.AfterTest(t)()

_, repeatFns := builtins.GetBuiltinProperties("repeat")
_, concatFns := builtins.GetBuiltinProperties("concat")
_, concatwsFns := builtins.GetBuiltinProperties("concat_ws")
_, lowerFns := builtins.GetBuiltinProperties("lower")

testData := []struct {
builtin tree.Overload
args tree.Datums
expectedAllocation int64
}{
{repeatFns[0],
tree.Datums{
tree.NewDString("abc"),
tree.NewDInt(123),
},
int64(3 * 123)},
{concatFns[0],
tree.Datums{
tree.NewDString("abc"),
tree.NewDString("abc"),
},
int64(3 + 3)},
{concatwsFns[0],
tree.Datums{
tree.NewDString("!"),
tree.NewDString("abc"),
tree.NewDString("abc"),
},
int64(3 + 1 + 3)},
{lowerFns[0],
tree.Datums{
tree.NewDString("ABC"),
},
int64(3)},
}

for _, test := range testData {
t.Run("", func(t *testing.T) {
evalCtx := tree.NewTestingEvalContext(cluster.MakeTestingClusterSettings())
defer evalCtx.Stop(context.Background())
defer evalCtx.ActiveMemAcc.Close(context.Background())
previouslyAllocated := evalCtx.ActiveMemAcc.Used()
_, err := test.builtin.Fn(evalCtx, test.args)
if err != nil {
t.Fatal(err)
}
deltaAllocated := evalCtx.ActiveMemAcc.Used() - previouslyAllocated
if deltaAllocated != test.expectedAllocation {
t.Errorf("Expected to allocate %d, actually allocated %d", test.expectedAllocation, deltaAllocated)
}
})
}
}

func TestEvaluatedMemoryIsChecked(t *testing.T) {
defer leaktest.AfterTest(t)()
// We select the LENGTH here and elsewhere because if we passed the result of
// REPEAT up as a result, the memory error would be caught there even if
// REPEAT was not doing its accounting.
testData := []string{
`SELECT length(repeat('abc', 300000))`,
`SELECT crdb_internal.no_constant_folding(length(repeat('abc', 300000)))`,
`SELECT length(repeat('abc', 70000000))`,
`SELECT crdb_internal.no_constant_folding(length(repeat('abc', 70000000)))`,
}

for _, statement := range testData {
@@ -172,45 +111,9 @@ func TestEvaluatedMemoryIsChecked(t *testing.T) {

if _, err := sqlDB.Exec(
statement,
); err.(*pq.Error).Code != pgerror.CodeOutOfMemoryError {
); err.(*pq.Error).Code != pgerror.CodeProgramLimitExceededError {
t.Errorf("Expected \"%s\" to OOM, but it didn't", statement)
}
})
}
}

func TestMemoryGetsFreedOnEachRow(t *testing.T) {
defer leaktest.AfterTest(t)()
// This test verifies that the memory allocated during the computation of a
// row gets freed before moving on to subsequent rows.

s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
SQLMemoryPoolSize: lowMemoryBudget,
})
defer s.Stopper().Stop(context.Background())

stringLength := 300000
numRows := 100

// Check that if this string is allocated per-row, we don't OOM.
if _, err := sqlDB.Exec(
fmt.Sprintf(
`SELECT crdb_internal.no_constant_folding(length(repeat('a', %d))) FROM generate_series(1, %d)`,
stringLength,
numRows,
),
); err != nil {
t.Fatalf("Expected statement to run successfully, but got %s", err)
}

// Ensure that if this memory is all allocated at once, we OOM.
if _, err := sqlDB.Exec(
fmt.Sprintf(
`SELECT crdb_internal.no_constant_folding(length(repeat('a', %d * %d)))`,
stringLength,
numRows,
),
); err.(*pq.Error).Code != pgerror.CodeOutOfMemoryError {
t.Fatalf("Expected statement to OOM, but it didn't")
}
}
6 changes: 0 additions & 6 deletions pkg/sql/conn_executor_exec.go
@@ -414,12 +414,6 @@ func (ex *connExecutor) execStmtInOpenState(
// contexts.
p.cancelChecker = sqlbase.NewCancelChecker(ctx)

// constantMemAcc accounts for all constant folded values that are computed
// prior to any rows being computed.
constantMemAcc := p.EvalContext().Mon.MakeBoundAccount()
p.EvalContext().ActiveMemAcc = &constantMemAcc
defer constantMemAcc.Close(ctx)

if runInParallel {
cols, err := ex.execStmtInParallel(ctx, p, queryDone)
queryDone = nil
6 changes: 0 additions & 6 deletions pkg/sql/conn_executor_prepare.go
@@ -190,12 +190,6 @@ func (ex *connExecutor) populatePrepared(
prepared := stmt.Prepared

p.extendedEvalCtx.PrepareOnly = true
p.extendedEvalCtx.ActiveMemAcc = &prepared.memAcc
// constantMemAcc accounts for all constant folded values that are computed
// prior to any rows being computed.
constantMemAcc := p.extendedEvalCtx.Mon.MakeBoundAccount()
p.extendedEvalCtx.ActiveMemAcc = &constantMemAcc
defer constantMemAcc.Close(ctx)

protoTS, err := p.isAsOf(stmt.AST, ex.server.cfg.Clock.Now() /* max */)
if err != nil {
2 changes: 0 additions & 2 deletions pkg/sql/distsql_running.go
@@ -659,8 +659,6 @@ func (dsp *DistSQLPlanner) planAndRunSubquery(
subqueryMemAccount := subqueryMonitor.MakeBoundAccount()
defer subqueryMemAccount.Close(ctx)

evalCtx.ActiveMemAcc = &subqueryMemAccount

var subqueryPlanCtx *PlanningCtx
var distributeSubquery bool
if maybeDistribute {
31 changes: 2 additions & 29 deletions pkg/sql/distsqlrun/flow.go
@@ -250,36 +250,14 @@ func (f *Flow) setupInboundStream(
return nil
}

// This RowReceiver clears its BoundAccount on every input row. This is useful
// for clearing the per-row memory account that's used for expression
// evaluation.
type accountClearingRowReceiver struct {
RowReceiver
ctx context.Context
acc *mon.BoundAccount
}

func (r *accountClearingRowReceiver) Push(
row sqlbase.EncDatumRow, meta *ProducerMetadata,
) ConsumerStatus {
r.acc.Clear(r.ctx)
return r.RowReceiver.Push(row, meta)
}

// setupOutboundStream sets up an output stream; if the stream is local, the
// RowChannel is looked up in the localStreams map; otherwise an outgoing
// mailbox is created.
func (f *Flow) setupOutboundStream(spec distsqlpb.StreamEndpointSpec) (RowReceiver, error) {
sid := spec.StreamID
switch spec.Type {
case distsqlpb.StreamEndpointSpec_SYNC_RESPONSE:
// Wrap the syncFlowConsumer in a row receiver that clears the row's memory
// account.
return &accountClearingRowReceiver{
acc: f.EvalCtx.ActiveMemAcc,
ctx: f.EvalCtx.Ctx(),
RowReceiver: f.syncFlowConsumer,
}, nil
return f.syncFlowConsumer, nil

case distsqlpb.StreamEndpointSpec_REMOTE:
outbox := newOutbox(&f.FlowCtx, spec.TargetNodeID, f.id, sid)
@@ -373,10 +351,6 @@ func (f *Flow) makeProcessor(
// Initialize any routers (the setupRouter case above) and outboxes.
types := proc.OutputTypes()
rowRecv := output.(*copyingRowReceiver).RowReceiver
clearer, ok := rowRecv.(*accountClearingRowReceiver)
if ok {
rowRecv = clearer.RowReceiver
}
switch o := rowRecv.(type) {
case router:
o.init(ctx, &f.FlowCtx, types)
@@ -687,8 +661,7 @@ func (f *Flow) Cleanup(ctx context.Context) {
if f.status == FlowFinished {
panic("flow cleanup called twice")
}
// This closes the account and monitor opened in ServerImpl.setupFlow.
f.EvalCtx.ActiveMemAcc.Close(ctx)
// This closes the monitor opened in ServerImpl.setupFlow.
f.EvalCtx.Stop(ctx)
for _, p := range f.processors {
if d, ok := p.(Releasable); ok {
17 changes: 7 additions & 10 deletions pkg/sql/distsqlrun/server.go
@@ -320,7 +320,7 @@ func (ds *ServerImpl) setupFlow(
// sp will be Finish()ed by Flow.Cleanup().
ctx = opentracing.ContextWithSpan(ctx, sp)

// The monitor and account opened here are closed in Flow.Cleanup().
// The monitor opened here is closed in Flow.Cleanup().
monitor := mon.MakeMonitor(
"flow",
mon.MemoryResource,
@@ -331,7 +331,6 @@
ds.Settings,
)
monitor.Start(ctx, parentMonitor, mon.BoundAccount{})
acc := monitor.MakeBoundAccount()

// Figure out what txn the flow needs to run in, if any.
// For local flows, the txn comes from localState.Txn. For non-local ones, we
@@ -355,7 +354,6 @@
if localState.EvalContext != nil {
evalCtx = localState.EvalContext
evalCtx.Mon = &monitor
evalCtx.ActiveMemAcc = &acc
evalCtx.Txn = txn
} else {
location, err := timeutil.TimeZoneStringToLocation(req.EvalContext.Location)
@@ -403,13 +401,12 @@
evalPlanner := &sqlbase.DummyEvalPlanner{}
sequence := &sqlbase.DummySequenceOperators{}
evalCtx = &tree.EvalContext{
Settings: ds.ServerConfig.Settings,
SessionData: sd,
ClusterID: ds.ServerConfig.ClusterID.Get(),
NodeID: nodeID,
ReCache: ds.regexpCache,
Mon: &monitor,
ActiveMemAcc: &acc,
Settings: ds.ServerConfig.Settings,
SessionData: sd,
ClusterID: ds.ServerConfig.ClusterID.Get(),
NodeID: nodeID,
ReCache: ds.regexpCache,
Mon: &monitor,
// TODO(andrei): This is wrong. Each processor should override Ctx with its
// own context.
Context: ctx,
5 changes: 0 additions & 5 deletions pkg/sql/exec_util.go
@@ -489,11 +489,6 @@ func (dc *databaseCacheHolder) updateSystemConfig(cfg *config.SystemConfig) {
func forEachRow(params runParams, p planNode, f func(tree.Datums) error) error {
next, err := p.Next(params)
for ; next; next, err = p.Next(params) {
// If we're tracking memory, clear the previous row's memory account.
if params.extendedEvalCtx.ActiveMemAcc != nil {
params.extendedEvalCtx.ActiveMemAcc.Clear(params.ctx)
}

if err := f(p.Values()); err != nil {
return err
}
6 changes: 3 additions & 3 deletions pkg/sql/logictest/testdata/logic_test/builtin_function
@@ -262,7 +262,7 @@ SELECT repeat('Pg', -1) || 'empty'
----
empty

statement error pq: repeat\(\): .* memory budget exceeded
statement error pq: repeat\(\): requested length too large
SELECT repeat('s', 9223372036854775807)

# Regression for #19035.
@@ -2012,10 +2012,10 @@ SELECT lpad('abc', 5, ''), rpad('abc', 5, '')
----
abc abc

query error memory budget exceeded
query error requested length too large
SELECT lpad('abc', 100000000000000)

query error memory budget exceeded
query error requested length too large
SELECT rpad('abc', 100000000000000)

query TT
4 changes: 0 additions & 4 deletions pkg/sql/plan_node_to_row_source.go
@@ -161,10 +161,6 @@ func (p *planNodeToRowSource) Next() (sqlbase.EncDatumRow, *distsqlrun.ProducerM
// by Nexting our source until exhaustion.
next, err := p.node.Next(p.params)
for ; next; next, err = p.node.Next(p.params) {
// If we're tracking memory, clear the previous row's memory account.
if p.params.extendedEvalCtx.ActiveMemAcc != nil {
p.params.extendedEvalCtx.ActiveMemAcc.Clear(p.params.ctx)
}
count++
}
if err != nil {
1 change: 0 additions & 1 deletion pkg/sql/plan_spans.go
@@ -155,7 +155,6 @@ func insertNodeWithValuesSpans(
// completed, so that the values don't need to be computed again during
// plan execution.
rowAcc := params.extendedEvalCtx.Mon.MakeBoundAccount()
params.extendedEvalCtx.ActiveMemAcc = &rowAcc
defer rowAcc.Close(params.ctx)

defer v.Reset(params.ctx)
9 changes: 4 additions & 5 deletions pkg/sql/planner.go
@@ -265,15 +265,11 @@ func newInternalPlanner(
p.extendedEvalCtx.Placeholders = &p.semaCtx.Placeholders
p.extendedEvalCtx.Tables = tables

acc := plannerMon.MakeBoundAccount()
p.extendedEvalCtx.ActiveMemAcc = &acc

p.queryCacheSession.Init()

return p, func() {
// Note that we capture ctx here. This is only valid as long as we create
// the context as explained at the top of the method.
acc.Close(ctx)
plannerMon.Stop(ctx)
}
}
@@ -556,8 +552,11 @@ func (p *planner) runWithDistSQL(
columns := planColumns(plan)

// Initialize a row container for the DistSQL execution engine to write into.
// The caller of this method will call Close on the returned RowContainer,
// which will close this account.
acc := planCtx.EvalContext().Mon.MakeBoundAccount()
ci := sqlbase.ColTypeInfoFromResCols(columns)
rows := rowcontainer.NewRowContainer(*p.extendedEvalCtx.ActiveMemAcc, ci, 0 /* rowCapacity */)
rows := rowcontainer.NewRowContainer(acc, ci, 0 /* rowCapacity */)
rowResultWriter := NewRowResultWriter(rows)
recv := MakeDistSQLReceiver(
ctx,
3 changes: 2 additions & 1 deletion pkg/sql/scrub.go
@@ -546,7 +546,8 @@ func scrubRunDistSQL(
columnTypes []sqlbase.ColumnType,
) (*rowcontainer.RowContainer, error) {
ci := sqlbase.ColTypeInfoFromColTypes(columnTypes)
rows := rowcontainer.NewRowContainer(*p.extendedEvalCtx.ActiveMemAcc, ci, 0 /* rowCapacity */)
acc := p.extendedEvalCtx.Mon.MakeBoundAccount()
rows := rowcontainer.NewRowContainer(acc, ci, 0 /* rowCapacity */)
rowResultWriter := NewRowResultWriter(rows)
recv := MakeDistSQLReceiver(
ctx,
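
Where results still need to be buffered (runWithDistSQL and scrubRunDistSQL above), the commit replaces the shared ActiveMemAcc with a bound account owned by the row container and released when the container is closed. The sketch below models only that ownership pattern with simplified stand-in types; it is not the real mon.BoundAccount or rowcontainer API.

package main

import "fmt"

// monitor and boundAccount are simplified stand-ins for the flow monitor and
// mon.BoundAccount; they model just the ownership pattern, not the real APIs.
type monitor struct{ inUse int64 }

type boundAccount struct {
	mon  *monitor
	used int64
}

func (m *monitor) makeBoundAccount() boundAccount { return boundAccount{mon: m} }

// grow charges n bytes to the account and its parent monitor.
func (a *boundAccount) grow(n int64) {
	a.used += n
	a.mon.inUse += n
}

// close returns everything the account holds to the parent monitor.
func (a *boundAccount) close() {
	a.mon.inUse -= a.used
	a.used = 0
}

// rowContainer owns its account: closing the container closes the account,
// mirroring the comment added to runWithDistSQL.
type rowContainer struct {
	acc  boundAccount
	rows []string
}

func newRowContainer(acc boundAccount) *rowContainer { return &rowContainer{acc: acc} }

func (rc *rowContainer) addRow(row string) {
	rc.acc.grow(int64(len(row)))
	rc.rows = append(rc.rows, row)
}

func (rc *rowContainer) close() { rc.acc.close() }

func main() {
	var m monitor
	rows := newRowContainer(m.makeBoundAccount())
	rows.addRow("hello")
	fmt.Println("bytes in use:", m.inUse) // bytes in use: 5
	rows.close()                          // caller closes the container, which closes the account
	fmt.Println("bytes in use:", m.inUse) // bytes in use: 0
}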
