Skip to content

Commit

Permalink
sql: pool flow allocations
Browse files Browse the repository at this point in the history
Previously, new structs for rowBasedFlow and vectorizedFlow would be
allocated upon creation. This commit creates pools for both of them.
flowinfra.Releasable interface is moved into execinfra package because
now components from rowflow, rowexec, and colflow packages implement
that.

In order to actually be able to release the flow structs, I needed to
create separate Cleanup methods (which still share most of the logic)
which allows for removal of vectorized memory monitoring logic from
the shared FlowCtx.

Release note: None
  • Loading branch information
yuzefovich committed Nov 27, 2019
1 parent a94720d commit 834e866
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 30 deletions.
57 changes: 53 additions & 4 deletions pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,32 @@ type vectorizedFlow struct {
*flowinfra.FlowBase
// operatorConcurrency is set if any operators are executed in parallel.
operatorConcurrency bool

// streamingMemAccount is the memory account that is tracking the static
// memory usage of the whole vectorized flow as well as all dynamic memory of
// the streaming components.
streamingMemAccount *mon.BoundAccount

// bufferingMemMonitors are the memory monitors of the buffering components.
bufferingMemMonitors []*mon.BytesMonitor
// bufferingMemAccounts are the memory accounts that are tracking the dynamic
// memory usage of the buffering components.
bufferingMemAccounts []*mon.BoundAccount
}

var _ flowinfra.Flow = &vectorizedFlow{}

var vectorizedFlowPool = sync.Pool{
New: func() interface{} {
return &vectorizedFlow{}
},
}

// NewVectorizedFlow creates a new vectorized flow given the flow base.
func NewVectorizedFlow(base *flowinfra.FlowBase) flowinfra.Flow {
return &vectorizedFlow{FlowBase: base}
vf := vectorizedFlowPool.Get().(*vectorizedFlow)
vf.FlowBase = base
return vf
}

// Setup is part of the flowinfra.Flow interface.
Expand All @@ -55,7 +74,7 @@ func (f *vectorizedFlow) Setup(
f.SetSpec(spec)
log.VEventf(ctx, 1, "setting up vectorize flow %s", f.ID.Short())
streamingMemAccount := f.EvalCtx.Mon.MakeBoundAccount()
f.VectorizedStreamingMemAccount = &streamingMemAccount
f.streamingMemAccount = &streamingMemAccount
recordingStats := false
if sp := opentracing.SpanFromContext(ctx); sp != nil && tracing.IsRecording(sp) {
recordingStats = true
Expand All @@ -73,11 +92,21 @@ func (f *vectorizedFlow) Setup(
_, err := creator.setupFlow(ctx, f.GetFlowCtx(), spec.Processors, &streamingMemAccount, opt)
if err == nil {
f.operatorConcurrency = creator.operatorConcurrency
f.VectorizedBufferingMemMonitors = append(f.VectorizedBufferingMemMonitors, creator.bufferingMemMonitors...)
f.VectorizedBufferingMemAccounts = append(f.VectorizedBufferingMemAccounts, creator.bufferingMemAccounts...)
f.bufferingMemMonitors = append(f.bufferingMemMonitors, creator.bufferingMemMonitors...)
f.bufferingMemAccounts = append(f.bufferingMemAccounts, creator.bufferingMemAccounts...)
log.VEventf(ctx, 1, "vectorized flow setup succeeded")
return nil
}
f.streamingMemAccount.Close(ctx)
// It is (theoretically) possible that some of the memory monitoring
// infrastructure was created even in case of an error, and we need to clean
// that up.
for _, memAcc := range creator.bufferingMemAccounts {
memAcc.Close(ctx)
}
for _, memMonitor := range creator.bufferingMemMonitors {
memMonitor.Stop(ctx)
}
log.VEventf(ctx, 1, "failed to vectorize: %s", err)
return err
}
Expand All @@ -87,6 +116,26 @@ func (f *vectorizedFlow) ConcurrentExecution() bool {
return f.operatorConcurrency || f.FlowBase.ConcurrentExecution()
}

// Release releases this vectorizedFlow back to the pool.
func (f *vectorizedFlow) Release() {
*f = vectorizedFlow{}
vectorizedFlowPool.Put(f)
}

// Cleanup is part of the Flow interface.
func (f *vectorizedFlow) Cleanup(ctx context.Context) {
// This cleans up all the memory monitoring of the vectorized flow.
f.streamingMemAccount.Close(ctx)
for _, memAcc := range f.bufferingMemAccounts {
memAcc.Close(ctx)
}
for _, memMonitor := range f.bufferingMemMonitors {
memMonitor.Stop(ctx)
}
f.FlowBase.Cleanup(ctx)
f.Release()
}

// wrapWithVectorizedStatsCollector creates a new exec.VectorizedStatsCollector
// that wraps op and connects the newly created wrapper with those
// corresponding to operators in inputs (the latter must have already been
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/execinfra/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,14 @@ func Run(ctx context.Context, src RowSource, dst RowReceiver) {
}
}

// Releasable is an interface for objects than can be Released back into a
// memory pool when finished.
type Releasable interface {
// Release allows this object to be returned to a memory pool. Objects must
// not be used after Release is called.
Release()
}

// DrainAndForwardMetadata calls src.ConsumerDone() (thus asking src for
// draining metadata) and then forwards all the metadata to dst.
//
Expand Down
13 changes: 0 additions & 13 deletions pkg/sql/execinfra/flow_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/stop"
)

Expand Down Expand Up @@ -60,18 +59,6 @@ type FlowCtx struct {

// Local is true if this flow is being run as part of a local-only query.
Local bool

// VectorizedStreamingMemAccount is the memory account that is tracking the
// static memory usage of the whole vectorized flow as well as all dynamic
// memory of the streaming vectorized components.
VectorizedStreamingMemAccount *mon.BoundAccount

// VectorizedBufferingMemMonitors are the memory monitors of the buffering
// vectorized components.
VectorizedBufferingMemMonitors []*mon.BytesMonitor
// VectorizedBufferingMemAccounts are the memory accounts that are tracking
// the dynamic memory usage of the buffering vectorized components.
VectorizedBufferingMemAccounts []*mon.BoundAccount
}

// NewEvalCtx returns a modifiable copy of the FlowCtx's EvalContext.
Expand Down
14 changes: 2 additions & 12 deletions pkg/sql/flowinfra/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,22 +419,13 @@ type Releasable interface {
}

// Cleanup is part of the Flow interface.
// NOTE: this implements only the shared clean up logic between row-based and
// vectorized flows.
func (f *FlowBase) Cleanup(ctx context.Context) {
if f.status == FlowFinished {
panic("flow cleanup called twice")
}

// This cleans up all the memory monitoring of the vectorized flow.
if f.VectorizedStreamingMemAccount != nil {
f.VectorizedStreamingMemAccount.Close(ctx)
}
for _, memAcc := range f.VectorizedBufferingMemAccounts {
memAcc.Close(ctx)
}
for _, memMon := range f.VectorizedBufferingMemMonitors {
memMon.Stop(ctx)
}

// This closes the monitor opened in ServerImpl.setupFlow.
f.EvalCtx.Stop(ctx)
for _, p := range f.processors {
Expand All @@ -453,7 +444,6 @@ func (f *FlowBase) Cleanup(ctx context.Context) {
f.status = FlowFinished
f.ctxCancel()
f.doneFn()
f.doneFn = nil
sp.Finish()
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sql/rowexec/tablereader.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type tableReader struct {
var _ execinfra.Processor = &tableReader{}
var _ execinfra.RowSource = &tableReader{}
var _ execinfrapb.MetadataSource = &tableReader{}
var _ execinfra.Releasable = &tableReader{}

const tableReaderProcName = "table reader"

Expand Down
23 changes: 22 additions & 1 deletion pkg/sql/rowflow/row_based_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package rowflow
import (
"context"
"fmt"
"sync"

"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
Expand All @@ -31,9 +32,17 @@ type rowBasedFlow struct {

var _ flowinfra.Flow = &rowBasedFlow{}

var rowBasedFlowPool = sync.Pool{
New: func() interface{} {
return &rowBasedFlow{}
},
}

// NewRowBasedFlow returns a row based flow using base as its FlowBase.
func NewRowBasedFlow(base *flowinfra.FlowBase) flowinfra.Flow {
return &rowBasedFlow{FlowBase: base}
rbf := rowBasedFlowPool.Get().(*rowBasedFlow)
rbf.FlowBase = base
return rbf
}

// Setup if part of the flowinfra.Flow interface.
Expand Down Expand Up @@ -385,6 +394,18 @@ func (f *rowBasedFlow) setupRouter(spec *execinfrapb.OutputRouterSpec) (router,
return makeRouter(spec, streams)
}

// Release releases this rowBasedFlow back to the pool.
func (f *rowBasedFlow) Release() {
*f = rowBasedFlow{}
rowBasedFlowPool.Put(f)
}

// Cleanup is part of the Flow interface.
func (f *rowBasedFlow) Cleanup(ctx context.Context) {
f.FlowBase.Cleanup(ctx)
f.Release()
}

type copyingRowReceiver struct {
execinfra.RowReceiver
alloc sqlbase.EncDatumRowAlloc
Expand Down

0 comments on commit 834e866

Please sign in to comment.