sql: always close planNode trees
The distsql merge introduced some confusing, incorrect contracts around
planNode tree closure. This commit restores order: planNode trees are
now unconditionally closed after a flow finishes, and when a flow is
closed it releases all of its resources except those managed by any
embedded planNode trees.

Release note: None
jordanlewis committed Aug 20, 2018
1 parent 2ba7e0c commit ccc5a8a
Showing 7 changed files with 97 additions and 64 deletions.
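Before the per-file diffs, a minimal, self-contained sketch of the contract this commit establishes. Everything in it (planNode, flow, runPlan, ignoreClose) is a toy stand-in for the real CockroachDB types, not the actual API: the planNode tree has a single owner, the owner closes it exactly once after the flow finishes, and nested executions opt out via an ignoreClose-style flag so only the top level closes the tree.

package main

import "fmt"

type planNode interface {
	Close()
}

type scanNode struct{ closed bool }

func (s *scanNode) Close() {
	if s.closed {
		panic("double close of planNode tree")
	}
	s.closed = true
	fmt.Println("planNode tree closed")
}

type flow struct{ plan planNode }

func (f *flow) run()     { fmt.Println("flow finished") }
func (f *flow) cleanup() { fmt.Println("flow-owned resources released") }

// runPlan mirrors the shape of DistSQLPlanner.Run after this commit: the
// planNode tree is closed after the flow finishes but before flow cleanup,
// unless a nested caller (a subquery or EXPLAIN ANALYZE) set ignoreClose
// and will close the tree itself.
func runPlan(f *flow, ignoreClose bool) {
	f.run()
	if !ignoreClose {
		f.plan.Close()
	}
	f.cleanup()
}

func main() {
	runPlan(&flow{plan: &scanNode{}}, false /* ignoreClose */)
}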
pkg/sql/conn_executor_exec.go (2 changes: 1 addition & 1 deletion)
@@ -756,6 +756,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
err = planner.makePlan(ctx, stmt)
enhanceErrWithCorrelation(err, isCorrelated)
}
defer planner.curPlan.close(ctx)

defer func() { planner.maybeLogStatement(ctx, "exec", res.RowsAffected(), res.Err()) }()

@@ -806,7 +807,6 @@ func (ex *connExecutor) dispatchToExecutionEngine(
ex.sessionTracing.TraceExecStart(ctx, "distributed")
err = ex.execWithDistSQLEngine(ctx, planner, stmt.AST.StatementType(), res, distributePlan)
} else {
defer planner.curPlan.close(ctx)
ex.sessionTracing.TraceExecStart(ctx, "local")
err = ex.execWithLocalEngine(ctx, planner, stmt.AST.StatementType(), res)
}
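The conn_executor_exec.go change hoists the close into a single defer placed right after plan construction, so the tree is released on the local path, the distributed path, and any early error return alike. A sketch of why that shape is safe, assuming (as the later hunks suggest) that the plan's close method is idempotent; dispatch and plan below are invented names, not the real executor:

package main

import "fmt"

type plan struct{ open bool }

// close is idempotent: a second call is a no-op.
func (p *plan) close() {
	if !p.open {
		return
	}
	p.open = false
	fmt.Println("plan closed")
}

func dispatch(distribute bool) (err error) {
	p := &plan{open: true}
	defer p.close() // covers every return below

	if distribute {
		// The distributed engine also closes the plan (before flow
		// cleanup); the deferred close then becomes a no-op.
		p.close()
		return nil
	}
	return fmt.Errorf("local path failed; plan still closed by defer")
}

func main() {
	fmt.Println(dispatch(true))
	fmt.Println(dispatch(false))
}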
pkg/sql/distinct.go (7 changes: 2 additions & 5 deletions)
@@ -229,7 +229,7 @@ func (n *distinctNode) startExec(params runParams) error {
return err
}

n.run = makeRowSourceToPlanNode(proc, nil /* forwarder */, planColumns(n))
n.run = makeRowSourceToPlanNode(proc, nil /* forwarder */, planColumns(n), nil /* originalPlanNode */)

n.run.source.Start(params.ctx)

@@ -247,11 +247,8 @@ func (n *distinctNode) Values() tree.Datums {
func (n *distinctNode) Close(ctx context.Context) {
if n.run != nil {
n.run.Close(ctx)
} else {
// If we haven't gotten around to initializing n.run yet, then we still
// need to propagate the close message to our inputs - do so directly.
n.plan.Close(ctx)
}
n.plan.Close(ctx)
}

// projectChildPropsToOnExprs takes the physical props (e.g. ordering info,
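The distinct.go fix turns Close into an unconditional chain: the wrapper in n.run, when present, owns only its own resources, so the input plan must always be closed too. A toy version of the pattern (all names invented):

package main

import "fmt"

type node interface{ Close() }

type input struct{}

func (*input) Close() { fmt.Println("input plan closed") }

type wrapper struct{}

func (*wrapper) Close() { fmt.Println("wrapper closed") }

type distinct struct {
	run  *wrapper // nil until startExec has run
	plan node
}

func (d *distinct) Close() {
	if d.run != nil {
		d.run.Close()
	}
	// Previously this lived in an else branch, so a started node never
	// closed its input. Now it always runs.
	d.plan.Close()
}

func main() {
	(&distinct{plan: &input{}}).Close()                  // never started
	(&distinct{run: &wrapper{}, plan: &input{}}).Close() // started
}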
pkg/sql/distsql_physical_planner.go (11 changes: 8 additions & 3 deletions)
@@ -480,9 +480,14 @@ type PlanningCtx struct {
NodeAddresses map[roachpb.NodeID]string

// isLocal is set to true if we're planning this query on a single node.
isLocal bool
planner *planner
stmtType tree.StatementType
isLocal bool
planner *planner
// ignoreClose, when set to true, will prevent the closing of the planner's
// current plan. The top-level query needs to close it, but everything else
// (like subqueries or EXPLAIN ANALYZE) should set this to true to avoid
// double closes of the planNode tree.
ignoreClose bool
stmtType tree.StatementType
// planDepth is set to the current depth of the planNode tree. It's used to
// keep track of whether it's valid to run a root node in a special fast path
// mode.
pkg/sql/distsql_running.go (9 changes: 9 additions & 0 deletions)
@@ -246,6 +246,12 @@ func (dsp *DistSQLPlanner) Run(
log.Fatalf(ctx, "unexpected error from syncFlow.Start(): %s "+
"The error should have gone to the consumer.", err)
}
// We need to close the planNode tree we translated into a DistSQL plan before
// flow.Cleanup, which closes memory accounts that expect to be emptied.
if planCtx.planner != nil && !planCtx.ignoreClose {
planCtx.planner.curPlan.close(ctx)
}

flow.Wait()
flow.Cleanup(ctx)
}
@@ -617,6 +623,9 @@ func (dsp *DistSQLPlanner) PlanAndRunSubqueries(
subqueryPlanCtx.planner = planner
subqueryPlanCtx.stmtType = tree.Rows
subqueryPlanCtx.validExtendedEvalCtx = true
// Don't close the top-level plan from subqueries - someone else will handle
// that.
subqueryPlanCtx.ignoreClose = true

subqueryPhysPlan, err := dsp.createPlanForNode(&subqueryPlanCtx, subqueryPlan.plan)
if err != nil {
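Two things to note in distsql_running.go: nested executions such as subqueries set ignoreClose so that only the top-level Run closes the tree, and the close in Run deliberately precedes flow.Cleanup. A sketch of why that ordering matters, using a toy account type rather than CockroachDB's real memory-accounting API; closing an account that still has bytes booked is treated as an error, and closing the planNode tree first is what returns those bytes:

package main

import "fmt"

type memAccount struct{ used int64 }

func (a *memAccount) grow(n int64) { a.used += n }
func (a *memAccount) clear()       { a.used = 0 }

// close fails if the account was not emptied first.
func (a *memAccount) close() error {
	if a.used != 0 {
		return fmt.Errorf("account closed with %d bytes still booked", a.used)
	}
	return nil
}

type plan struct{ acc *memAccount }

// Close empties the accounts the planNode tree was charging.
func (p *plan) Close() { p.acc.clear() }

func main() {
	acc := &memAccount{}
	p := &plan{acc: acc}
	acc.grow(64) // the plan buffered a row during execution

	// The order used in Run after this commit:
	p.Close()                // 1. close the planNode tree first
	fmt.Println(acc.close()) // 2. flow cleanup can now close the account: <nil>
}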
pkg/sql/explain_distsql.go (8 changes: 2 additions & 6 deletions)
@@ -63,6 +63,7 @@ func (n *explainDistSQLNode) startExec(params runParams) error {

planCtx := distSQLPlanner.NewPlanningCtx(params.ctx, params.extendedEvalCtx, params.p.txn)
planCtx.isLocal = !shouldDistributeGivenRecAndMode(recommendation, params.SessionData().DistSQLMode)
planCtx.ignoreClose = true
planCtx.planner = params.p
planCtx.stmtType = n.stmtType
planCtx.validExtendedEvalCtx = true
@@ -154,10 +155,5 @@ func (n *explainDistSQLNode) Next(runParams) (bool, error) {

func (n *explainDistSQLNode) Values() tree.Datums { return n.run.values }
func (n *explainDistSQLNode) Close(ctx context.Context) {
// If we managed to execute and we were in ANALYZE mode, our child planNode
// tree will already have been closed - so don't do anything to avoid a double
// close of that tree.
if !n.run.executedStatement {
n.plan.Close(ctx)
}
n.plan.Close(ctx)
}
pkg/sql/plan_node_to_row_source.go (102 changes: 58 additions & 44 deletions)
@@ -107,7 +107,7 @@ func (p *planNodeToRowSource) SetInput(ctx context.Context, input distsqlrun.Row
return walkPlan(ctx, p.node, planObserver{
replaceNode: func(ctx context.Context, nodeName string, plan planNode) (planNode, error) {
if plan == p.firstNotWrapped {
return makeRowSourceToPlanNode(input, p, planColumns(p.firstNotWrapped)), nil
return makeRowSourceToPlanNode(input, p, planColumns(p.firstNotWrapped), p.firstNotWrapped), nil
}
return nil, nil
},
@@ -120,12 +120,23 @@ func (p *planNodeToRowSource) OutputTypes() []sqlbase.ColumnType {

func (p *planNodeToRowSource) Start(ctx context.Context) context.Context {
p.params.ctx = ctx
if !p.started {
p.started = true
// This starts all of the nodes below this node.
if err := startExec(p.params, p.node); err != nil {
p.internalClose()
p.metadataBuf = append(p.metadataBuf, &distsqlrun.ProducerMetadata{Err: err})
return ctx
}
}
return ctx
}

func (p *planNodeToRowSource) internalClose() {
if p.running {
p.node.Close(p.params.ctx)
if p.toDrain != nil {
p.toDrain.ConsumerClosed()
}
p.running = false
p.started = true
}
@@ -140,48 +151,39 @@ func (p *planNodeToRowSource) startExec(_ runParams) error {
}

func (p *planNodeToRowSource) Next() (sqlbase.EncDatumRow, *distsqlrun.ProducerMetadata) {
if !p.started {
p.started = true
// This starts all of the nodes below this node.
if err := startExec(p.params, p.node); err != nil {
p.internalClose()
return nil, &distsqlrun.ProducerMetadata{Err: err}
}

if p.fastPath {
var count int
// If our node is a "fast path node", it means that we're set up to just
// return a row count. So trigger the fast path and return the row count as
// a row with a single column.
if fastPath, ok := p.node.(planNodeFastPath); ok {
count, ok = fastPath.FastPathResults()
if !ok {
p.internalClose()
return nil, nil
}
if p.params.extendedEvalCtx.Tracing.Enabled() {
log.VEvent(p.params.ctx, 2, "fast path completed")
}
} else {
// If we have no fast path to trigger, fall back to counting the rows
// by Nexting our source until exhaustion.
next, err := p.node.Next(p.params)
for ; next; next, err = p.node.Next(p.params) {
// If we're tracking memory, clear the previous row's memory account.
if p.params.extendedEvalCtx.ActiveMemAcc != nil {
p.params.extendedEvalCtx.ActiveMemAcc.Clear(p.params.ctx)
}
count++
}
if err != nil {
return nil, &distsqlrun.ProducerMetadata{Err: err}
if p.running && p.fastPath {
var count int
// If our node is a "fast path node", it means that we're set up to just
// return a row count. So trigger the fast path and return the row count as
// a row with a single column.
if fastPath, ok := p.node.(planNodeFastPath); ok {
count, ok = fastPath.FastPathResults()
if !ok {
p.internalClose()
return nil, nil
}
if p.params.extendedEvalCtx.Tracing.Enabled() {
log.VEvent(p.params.ctx, 2, "fast path completed")
}
} else {
// If we have no fast path to trigger, fall back to counting the rows
// by Nexting our source until exhaustion.
next, err := p.node.Next(p.params)
for ; next; next, err = p.node.Next(p.params) {
// If we're tracking memory, clear the previous row's memory account.
if p.params.extendedEvalCtx.ActiveMemAcc != nil {
p.params.extendedEvalCtx.ActiveMemAcc.Clear(p.params.ctx)
}
count++
}
if err != nil {
return nil, &distsqlrun.ProducerMetadata{Err: err}
}
p.internalClose()
// Return the row count the only way we can: as a single-column row with
// the count inside.
return sqlbase.EncDatumRow{sqlbase.EncDatum{Datum: tree.NewDInt(tree.DInt(count))}}, nil
}
p.internalClose()
// Return the row count the only way we can: as a single-column row with
// the count inside.
return sqlbase.EncDatumRow{sqlbase.EncDatum{Datum: tree.NewDInt(tree.DInt(count))}}, nil
}

if len(p.metadataBuf) > 0 {
@@ -197,7 +199,7 @@ func (p *planNodeToRowSource) Next() (sqlbase.EncDatumRow, *distsqlrun.ProducerM
}
if !valid {
p.internalClose()
return nil, nil
return nil, p.drainHelper()
}

for i, datum := range p.node.Values() {
@@ -231,14 +233,26 @@ func (p *planNodeToRowSource) Next() (sqlbase.EncDatumRow, *distsqlrun.ProducerM
}
}
}
return nil, nil
return nil, p.drainHelper()
}

func (p *planNodeToRowSource) drainHelper() *distsqlrun.ProducerMetadata {
if len(p.metadataBuf) > 0 {
var m *distsqlrun.ProducerMetadata
m, p.metadataBuf = p.metadataBuf[0], p.metadataBuf[1:]
return m
}
return nil
}

func (p *planNodeToRowSource) ConsumerDone() {
p.internalClose()
if p.toDrain != nil {
p.toDrain.ConsumerDone()
}
}

func (p *planNodeToRowSource) ConsumerClosed() {
// The consumer is done, Next() will not be called again.
p.internalClose()
}

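The restructured planNodeToRowSource moves startup into Start, buffers startup errors as metadata rather than returning them, guards internalClose with the running flag so it is safe to call from Next, ConsumerDone, and ConsumerClosed alike, and drains buffered metadata through drainHelper. A condensed, runnable model of that lifecycle, with simplified stand-ins for the distsqlrun types:

package main

import "fmt"

type metadata struct{ err error }

type adapter struct {
	started, running bool
	metadataBuf      []*metadata
	startErr         error // injected failure for the sketch
}

func (a *adapter) Start() {
	if !a.started {
		a.started = true
		a.running = true
		if a.startErr != nil { // startExec failed
			a.internalClose()
			a.metadataBuf = append(a.metadataBuf, &metadata{err: a.startErr})
		}
	}
}

// internalClose is idempotent: the running flag makes repeat calls no-ops.
func (a *adapter) internalClose() {
	if a.running {
		fmt.Println("planNode tree closed")
		a.running = false
	}
}

// drainHelper hands out buffered metadata one piece at a time.
func (a *adapter) drainHelper() *metadata {
	if len(a.metadataBuf) > 0 {
		var m *metadata
		m, a.metadataBuf = a.metadataBuf[0], a.metadataBuf[1:]
		return m
	}
	return nil
}

func (a *adapter) Next() (string, *metadata) {
	if m := a.drainHelper(); m != nil {
		return "", m
	}
	if !a.running {
		return "", nil // already closed; nothing left to produce
	}
	a.internalClose() // the sketch's plan produces no rows
	return "", a.drainHelper()
}

func main() {
	a := &adapter{startErr: fmt.Errorf("startExec failed")}
	a.Start()        // closes the tree and buffers the error
	_, m := a.Next() // the startup error arrives as metadata
	fmt.Println(m.err)
	a.internalClose() // safe: second close is a no-op
}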
pkg/sql/row_source_to_plan_node.go (22 changes: 17 additions & 5 deletions)
@@ -31,6 +31,8 @@ type rowSourceToPlanNode struct {
source distsqlrun.RowSource
forwarder metadataForwarder

originalPlanNode planNode

planCols sqlbase.ResultColumns

// Temporary variables
@@ -44,16 +46,23 @@ var _ planNode = &rowSourceToPlanNode{}
// makeRowSourceToPlanNode creates a new planNode that wraps a RowSource. It
// takes an optional metadataForwarder, which if non-nil is invoked for every
// piece of metadata this wrapper receives from the wrapped RowSource.
// It also takes an optional planNode, which is the planNode that the RowSource
// that this rowSourceToPlanNode is wrapping originally replaced. That planNode
// will be closed when this one is closed.
func makeRowSourceToPlanNode(
s distsqlrun.RowSource, forwarder metadataForwarder, planCols sqlbase.ResultColumns,
s distsqlrun.RowSource,
forwarder metadataForwarder,
planCols sqlbase.ResultColumns,
originalPlanNode planNode,
) *rowSourceToPlanNode {
row := make(tree.Datums, len(s.OutputTypes()))

return &rowSourceToPlanNode{
source: s,
datumRow: row,
forwarder: forwarder,
planCols: planCols,
source: s,
datumRow: row,
forwarder: forwarder,
planCols: planCols,
originalPlanNode: originalPlanNode,
}
}

@@ -102,4 +111,7 @@ func (r *rowSourceToPlanNode) Close(ctx context.Context) {
if r.source != nil {
r.source.ConsumerClosed()
}
if r.originalPlanNode != nil {
r.originalPlanNode.Close(ctx)
}
}
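Finally, the rowSourceToPlanNode side of the contract: the wrapper now remembers the planNode its RowSource replaced and closes it along with the source, so the replaced subtree's resources are still released. A toy version (invented types):

package main

import "fmt"

type planNode interface{ Close() }

type rowSource struct{}

func (*rowSource) ConsumerClosed() { fmt.Println("source: consumer closed") }

type replaced struct{}

func (*replaced) Close() { fmt.Println("original planNode closed") }

type rowSourceToPlanNode struct {
	source           *rowSource
	originalPlanNode planNode // optional; nil when nothing was replaced
}

func (r *rowSourceToPlanNode) Close() {
	if r.source != nil {
		r.source.ConsumerClosed()
	}
	if r.originalPlanNode != nil {
		r.originalPlanNode.Close()
	}
}

func main() {
	(&rowSourceToPlanNode{source: &rowSource{}, originalPlanNode: &replaced{}}).Close()
}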
