Merge #42809
42809: sql: pool some allocations during flow setup r=yuzefovich a=yuzefovich

**flowinfra: slightly tweak the setup of processors in the flow**

Previously, the last processor ('headProc') was removed from
f.processors before flow.startInternal so that it could run in the same
goroutine as the one doing the setup. This was problematic: if that
processor implements the 'Releasable' interface, it would never be
returned to the pool on flow cleanup. Now the head processor stays in
f.processors, and only the remaining processors are handed to
startInternal.
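
A minimal sketch of the fixed pattern, using stand-in types rather than the
real flowinfra API (processor, flow, and printProc are all illustrative): the
head processor stays in the flow's processor slice, so a cleanup pass can
still release it, and only the remaining processors get their own goroutines.

```go
package main

import (
	"fmt"
	"sync"
)

// processor is a stand-in for execinfra.Processor.
type processor interface{ run() }

type printProc struct{ name string }

func (p *printProc) run() { fmt.Println("running", p.name) }

type flow struct {
	processors []processor
	wg         sync.WaitGroup
}

// run starts every processor except the last on its own goroutine and runs
// the last one ("headProc") in the caller's goroutine. Note that f.processors
// is left intact, so cleanup can still see (and release) every processor.
func (f *flow) run() {
	head := f.processors[len(f.processors)-1]
	others := f.processors[:len(f.processors)-1]
	for _, p := range others {
		f.wg.Add(1)
		go func(p processor) {
			defer f.wg.Done()
			p.run()
		}(p)
	}
	head.run()
	f.wg.Wait()
}

func main() {
	f := &flow{processors: []processor{
		&printProc{"p1"}, &printProc{"p2"}, &printProc{"head"},
	}}
	f.run()
}
```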

Addresses: #42770.

Release note: None

**sql: pool flow allocations**

Previously, new rowBasedFlow and vectorizedFlow structs were allocated
each time a flow was created. This commit introduces sync.Pools for both
of them. The flowinfra.Releasable interface is moved into the execinfra
package because components from the rowflow, rowexec, and colflow
packages now implement it.

In order to actually be able to release the flow structs, I needed to
create separate Cleanup methods for the two flow types (which still
share most of the logic); this in turn allows the vectorized memory
monitoring logic to be removed from the shared FlowCtx. The resulting
pattern is sketched below.
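
A self-contained sketch of that pattern with toy types (flowBase and rowFlow
are stand-ins, not the actual CockroachDB types): a sync.Pool hands out flow
structs, Cleanup performs the shared teardown, and Release zeroes the struct
and returns it to the pool as the very last step, since the object must not
be touched afterwards.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// flowBase stands in for flowinfra.FlowBase.
type flowBase struct{ name string }

func (b *flowBase) cleanup(context.Context) { fmt.Println("shared cleanup:", b.name) }

// rowFlow stands in for rowBasedFlow / vectorizedFlow.
type rowFlow struct {
	base *flowBase
}

var rowFlowPool = sync.Pool{
	New: func() interface{} { return &rowFlow{} },
}

// newRowFlow reuses a pooled struct instead of allocating a fresh one.
func newRowFlow(base *flowBase) *rowFlow {
	f := rowFlowPool.Get().(*rowFlow)
	f.base = base
	return f
}

// release zeroes the struct (so the pool retains no stale references) and
// returns it to the pool. The flow must not be used after this call.
func (f *rowFlow) release() {
	*f = rowFlow{}
	rowFlowPool.Put(f)
}

// cleanup runs the shared base logic first and releases the struct last.
func (f *rowFlow) cleanup(ctx context.Context) {
	f.base.cleanup(ctx)
	f.release()
}

func main() {
	f := newRowFlow(&flowBase{name: "q1"})
	f.cleanup(context.Background())
}
```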

Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
craig[bot] and yuzefovich committed Nov 27, 2019
2 parents 8f21cb5 + 834e866 commit 91b92a6
Showing 6 changed files with 96 additions and 38 deletions.
57 changes: 53 additions & 4 deletions pkg/sql/colflow/vectorized_flow.go
@@ -39,13 +39,32 @@ type vectorizedFlow struct {
*flowinfra.FlowBase
// operatorConcurrency is set if any operators are executed in parallel.
operatorConcurrency bool

// streamingMemAccount is the memory account that is tracking the static
// memory usage of the whole vectorized flow as well as all dynamic memory of
// the streaming components.
streamingMemAccount *mon.BoundAccount

// bufferingMemMonitors are the memory monitors of the buffering components.
bufferingMemMonitors []*mon.BytesMonitor
// bufferingMemAccounts are the memory accounts that are tracking the dynamic
// memory usage of the buffering components.
bufferingMemAccounts []*mon.BoundAccount
}

var _ flowinfra.Flow = &vectorizedFlow{}

var vectorizedFlowPool = sync.Pool{
New: func() interface{} {
return &vectorizedFlow{}
},
}

// NewVectorizedFlow creates a new vectorized flow given the flow base.
func NewVectorizedFlow(base *flowinfra.FlowBase) flowinfra.Flow {
- return &vectorizedFlow{FlowBase: base}
+ vf := vectorizedFlowPool.Get().(*vectorizedFlow)
+ vf.FlowBase = base
+ return vf
}

// Setup is part of the flowinfra.Flow interface.
@@ -55,7 +74,7 @@ func (f *vectorizedFlow) Setup(
f.SetSpec(spec)
log.VEventf(ctx, 1, "setting up vectorize flow %s", f.ID.Short())
streamingMemAccount := f.EvalCtx.Mon.MakeBoundAccount()
- f.VectorizedStreamingMemAccount = &streamingMemAccount
+ f.streamingMemAccount = &streamingMemAccount
recordingStats := false
if sp := opentracing.SpanFromContext(ctx); sp != nil && tracing.IsRecording(sp) {
recordingStats = true
@@ -73,11 +92,21 @@
_, err := creator.setupFlow(ctx, f.GetFlowCtx(), spec.Processors, &streamingMemAccount, opt)
if err == nil {
f.operatorConcurrency = creator.operatorConcurrency
- f.VectorizedBufferingMemMonitors = append(f.VectorizedBufferingMemMonitors, creator.bufferingMemMonitors...)
- f.VectorizedBufferingMemAccounts = append(f.VectorizedBufferingMemAccounts, creator.bufferingMemAccounts...)
+ f.bufferingMemMonitors = append(f.bufferingMemMonitors, creator.bufferingMemMonitors...)
+ f.bufferingMemAccounts = append(f.bufferingMemAccounts, creator.bufferingMemAccounts...)
log.VEventf(ctx, 1, "vectorized flow setup succeeded")
return nil
}
f.streamingMemAccount.Close(ctx)
// It is (theoretically) possible that some of the memory monitoring
// infrastructure was created even in case of an error, and we need to clean
// that up.
for _, memAcc := range creator.bufferingMemAccounts {
memAcc.Close(ctx)
}
for _, memMonitor := range creator.bufferingMemMonitors {
memMonitor.Stop(ctx)
}
log.VEventf(ctx, 1, "failed to vectorize: %s", err)
return err
}
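
One note on the ordering in this error path: accounts are closed before their
parent monitors are stopped, which matches my understanding of the util/mon
contract (a monitor generally expects its accounts to be closed by the time it
stops). A toy model of that invariant, with stand-in types rather than the
real mon API:

```go
package main

import "fmt"

// monitor and account are toy stand-ins for mon.BytesMonitor and
// mon.BoundAccount; the real types live in pkg/util/mon.
type monitor struct {
	name string
	open int // accounts created but not yet closed
}

type account struct{ m *monitor }

func (m *monitor) makeAccount() *account { m.open++; return &account{m: m} }

func (a *account) close() { a.m.open-- }

// stop models the requirement that all accounts are closed first.
func (m *monitor) stop() {
	if m.open != 0 {
		panic(fmt.Sprintf("monitor %q stopped with %d open accounts", m.name, m.open))
	}
	fmt.Println("stopped", m.name)
}

func main() {
	m := &monitor{name: "buffering"}
	acc := m.makeAccount()
	acc.close() // close accounts first, as in the error path above...
	m.stop()    // ...then stop their monitors
}
```
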
@@ -87,6 +116,26 @@ func (f *vectorizedFlow) ConcurrentExecution() bool {
return f.operatorConcurrency || f.FlowBase.ConcurrentExecution()
}

// Release releases this vectorizedFlow back to the pool.
func (f *vectorizedFlow) Release() {
*f = vectorizedFlow{}
vectorizedFlowPool.Put(f)
}

// Cleanup is part of the Flow interface.
func (f *vectorizedFlow) Cleanup(ctx context.Context) {
// This cleans up all the memory monitoring of the vectorized flow.
f.streamingMemAccount.Close(ctx)
for _, memAcc := range f.bufferingMemAccounts {
memAcc.Close(ctx)
}
for _, memMonitor := range f.bufferingMemMonitors {
memMonitor.Stop(ctx)
}
f.FlowBase.Cleanup(ctx)
f.Release()
}

// wrapWithVectorizedStatsCollector creates a new exec.VectorizedStatsCollector
// that wraps op and connects the newly created wrapper with those
// corresponding to operators in inputs (the latter must have already been
8 changes: 8 additions & 0 deletions pkg/sql/execinfra/base.go
@@ -188,6 +188,14 @@ func Run(ctx context.Context, src RowSource, dst RowReceiver) {
}
}

// Releasable is an interface for objects that can be Released back into a
// memory pool when finished.
type Releasable interface {
// Release allows this object to be returned to a memory pool. Objects must
// not be used after Release is called.
Release()
}
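
To make the contract concrete, here is a hypothetical implementer (not taken
from the real rowexec code): a pooled processor whose Release resets it for
reuse. One deliberate detail: the slice is truncated rather than dropped, so
its allocated capacity survives the round trip through the pool.

```go
package main

import "sync"

// Releasable mirrors the interface above.
type Releasable interface{ Release() }

// joiner is a hypothetical pooled processor.
type joiner struct {
	rows []int
}

var joinerPool = sync.Pool{
	New: func() interface{} { return &joiner{} },
}

func newJoiner() *joiner {
	return joinerPool.Get().(*joiner)
}

// Release implements Releasable. The rows slice is truncated rather than
// dropped so that its capacity is reused on the next Get.
func (j *joiner) Release() {
	j.rows = j.rows[:0]
	joinerPool.Put(j)
}

var _ Releasable = &joiner{}

func main() {
	j := newJoiner()
	j.rows = append(j.rows, 1, 2, 3)
	j.Release() // j must not be used after this point
}
```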

// DrainAndForwardMetadata calls src.ConsumerDone() (thus asking src for
// draining metadata) and then forwards all the metadata to dst.
//
13 changes: 0 additions & 13 deletions pkg/sql/execinfra/flow_context.go
@@ -18,7 +18,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/stop"
)

@@ -60,18 +59,6 @@ type FlowCtx struct {

// Local is true if this flow is being run as part of a local-only query.
Local bool

- // VectorizedStreamingMemAccount is the memory account that is tracking the
- // static memory usage of the whole vectorized flow as well as all dynamic
- // memory of the streaming vectorized components.
- VectorizedStreamingMemAccount *mon.BoundAccount
-
- // VectorizedBufferingMemMonitors are the memory monitors of the buffering
- // vectorized components.
- VectorizedBufferingMemMonitors []*mon.BytesMonitor
- // VectorizedBufferingMemAccounts are the memory accounts that are tracking
- // the dynamic memory usage of the buffering vectorized components.
- VectorizedBufferingMemAccounts []*mon.BoundAccount
}

// NewEvalCtx returns a modifiable copy of the FlowCtx's EvalContext.
32 changes: 12 additions & 20 deletions pkg/sql/flowinfra/flow.go
@@ -282,10 +282,12 @@ func (f *FlowBase) GetLocalProcessors() []execinfra.LocalProcessor {
// set. A new context is derived and returned, and it must be used when this
// method returns so that all components running in their own goroutines could
// listen for a cancellation on the same context.
- func (f *FlowBase) startInternal(ctx context.Context, doneFn func()) (context.Context, error) {
+ func (f *FlowBase) startInternal(
+ ctx context.Context, processors []execinfra.Processor, doneFn func(),
+ ) (context.Context, error) {
f.doneFn = doneFn
log.VEventf(
- ctx, 1, "starting (%d processors, %d startables)", len(f.processors), len(f.startables),
+ ctx, 1, "starting (%d processors, %d startables)", len(processors), len(f.startables),
)

ctx, f.ctxCancel = contextutil.WithCancel(ctx)
@@ -315,14 +317,14 @@ func (f *FlowBase) startInternal(ctx context.Context, doneFn func()) (context.Context, error) {
for _, s := range f.startables {
s.Start(ctx, &f.waitGroup, f.ctxCancel)
}
- for i := 0; i < len(f.processors); i++ {
+ for i := 0; i < len(processors); i++ {
f.waitGroup.Add(1)
go func(i int) {
- f.processors[i].Run(ctx)
+ processors[i].Run(ctx)
f.waitGroup.Done()
}(i)
}
- f.startedGoroutines = len(f.startables) > 0 || len(f.processors) > 0 || !f.IsLocal()
+ f.startedGoroutines = len(f.startables) > 0 || len(processors) > 0 || !f.IsLocal()
return ctx, nil
}

@@ -338,7 +340,7 @@ func (f *FlowBase) IsVectorized() bool {

// Start is part of the Flow interface.
func (f *FlowBase) Start(ctx context.Context, doneFn func()) error {
- if _, err := f.startInternal(ctx, doneFn); err != nil {
+ if _, err := f.startInternal(ctx, f.processors, doneFn); err != nil {
// For sync flows, the error goes to the consumer.
if f.syncFlowConsumer != nil {
f.syncFlowConsumer.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: err})
@@ -360,10 +362,10 @@ func (f *FlowBase) Run(ctx context.Context, doneFn func()) error {
return errors.AssertionFailedf("no processors in flow")
}
headProc = f.processors[len(f.processors)-1]
- f.processors = f.processors[:len(f.processors)-1]
+ otherProcs := f.processors[:len(f.processors)-1]

var err error
- if ctx, err = f.startInternal(ctx, doneFn); err != nil {
+ if ctx, err = f.startInternal(ctx, otherProcs, doneFn); err != nil {
// For sync flows, the error goes to the consumer.
if f.syncFlowConsumer != nil {
f.syncFlowConsumer.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: err})
@@ -417,22 +419,13 @@ type Releasable interface {
}

// Cleanup is part of the Flow interface.
+ // NOTE: this implements only the shared clean up logic between row-based and
+ // vectorized flows.
func (f *FlowBase) Cleanup(ctx context.Context) {
if f.status == FlowFinished {
panic("flow cleanup called twice")
}

- // This cleans up all the memory monitoring of the vectorized flow.
- if f.VectorizedStreamingMemAccount != nil {
- f.VectorizedStreamingMemAccount.Close(ctx)
- }
- for _, memAcc := range f.VectorizedBufferingMemAccounts {
- memAcc.Close(ctx)
- }
- for _, memMon := range f.VectorizedBufferingMemMonitors {
- memMon.Stop(ctx)
- }

// This closes the monitor opened in ServerImpl.setupFlow.
f.EvalCtx.Stop(ctx)
for _, p := range f.processors {
@@ -451,7 +444,6 @@ func (f *FlowBase) Cleanup(ctx context.Context) {
f.status = FlowFinished
f.ctxCancel()
f.doneFn()
f.doneFn = nil
sp.Finish()
}

1 change: 1 addition & 0 deletions pkg/sql/rowexec/tablereader.go
@@ -59,6 +59,7 @@ type tableReader struct {
var _ execinfra.Processor = &tableReader{}
var _ execinfra.RowSource = &tableReader{}
var _ execinfrapb.MetadataSource = &tableReader{}
+ var _ execinfra.Releasable = &tableReader{}
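
The added var _ line is the standard Go compile-time interface assertion: the
build fails if the type ever stops implementing the interface. A minimal
illustration of the idiom, with a made-up type:

```go
package main

// Releasable is a local stand-in for execinfra.Releasable.
type Releasable interface{ Release() }

type reader struct{}

func (r *reader) Release() {}

// Compile-time assertion: the build breaks if *reader ever stops
// implementing Releasable.
var _ Releasable = &reader{}

func main() {}
```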

const tableReaderProcName = "table reader"

23 changes: 22 additions & 1 deletion pkg/sql/rowflow/row_based_flow.go
@@ -13,6 +13,7 @@ package rowflow
import (
"context"
"fmt"
"sync"

"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
@@ -31,9 +32,17 @@ type rowBasedFlow struct {

var _ flowinfra.Flow = &rowBasedFlow{}

var rowBasedFlowPool = sync.Pool{
New: func() interface{} {
return &rowBasedFlow{}
},
}

// NewRowBasedFlow returns a row based flow using base as its FlowBase.
func NewRowBasedFlow(base *flowinfra.FlowBase) flowinfra.Flow {
- return &rowBasedFlow{FlowBase: base}
+ rbf := rowBasedFlowPool.Get().(*rowBasedFlow)
+ rbf.FlowBase = base
+ return rbf
}

// Setup is part of the flowinfra.Flow interface.
@@ -385,6 +394,18 @@ func (f *rowBasedFlow) setupRouter(spec *execinfrapb.OutputRouterSpec) (router,
return makeRouter(spec, streams)
}

// Release releases this rowBasedFlow back to the pool.
func (f *rowBasedFlow) Release() {
*f = rowBasedFlow{}
rowBasedFlowPool.Put(f)
}

// Cleanup is part of the Flow interface.
func (f *rowBasedFlow) Cleanup(ctx context.Context) {
f.FlowBase.Cleanup(ctx)
f.Release()
}

type copyingRowReceiver struct {
execinfra.RowReceiver
alloc sqlbase.EncDatumRowAlloc
