Merge #39100
39100: exec: cancel the flow context when materializer is done r=yuzefovich a=yuzefovich

Previously, shutting down a flow was somewhat fragile because some
goroutines could remain blocked after the materializer finished its
execution. Now we cancel the shared flow context, and as long as all
of the infrastructure listens to it, it will be shut down
appropriately.

Fixes: #39029.
Fixes: #39092.
Fixes: #38919.

Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
craig[bot] and yuzefovich committed Aug 2, 2019
2 parents ca9184d + 25f4530 commit c9a4021
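
The approach described in the commit message boils down to a standard Go pattern: every long-lived component of the flow selects on the shared context's Done channel, and the root of the tree cancels that context when it finishes. The following standalone sketch is not code from this commit; all names are illustrative and only the cancellation pattern is shown.

package main

import (
	"context"
	"fmt"
	"sync"
)

// startComponent launches a goroutine that blocks until the shared flow
// context is canceled, mimicking infrastructure (outboxes, inboxes, routers)
// that listens on the flow's Done channel to know when to shut down.
func startComponent(ctx context.Context, wg *sync.WaitGroup, name string) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		<-ctx.Done() // unblocks once the flow context is canceled
		fmt.Printf("%s shut down: %v\n", name, ctx.Err())
	}()
}

func main() {
	// The flow owns a single cancelable context shared by all components.
	flowCtx, cancelFlow := context.WithCancel(context.Background())

	var wg sync.WaitGroup
	startComponent(flowCtx, &wg, "outbox")
	startComponent(flowCtx, &wg, "inbox")

	// When the root materializer is done, it cancels the flow context,
	// which unblocks every component still listening on it.
	cancelFlow()
	wg.Wait()
}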
Showing 15 changed files with 651 additions and 97 deletions.
33 changes: 32 additions & 1 deletion pkg/sql/distsqlrun/column_exec_setup.go
@@ -16,6 +16,7 @@ import (
"math"
"reflect"
"sync"
"sync/atomic"

"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
@@ -79,6 +80,7 @@ func wrapRowSource(
nil, /* output */
nil, /* metadataSourcesQueue */
nil, /* outputStatsToTrace */
nil, /* cancelFlow */
)
if err != nil {
return nil, err
@@ -971,6 +973,8 @@ type flowCreatorHelper interface {
accumulateAsyncComponent(runFn)
// addMaterializer adds a materializer to the flow.
addMaterializer(*materializer)
// getCancelFlowFn returns a flow cancellation function.
getCancelFlowFn() context.CancelFunc
}

// vectorizedFlowCreator performs all the setup of vectorized flows. Depending
@@ -988,6 +992,11 @@ type vectorizedFlowCreator struct {
syncFlowConsumer RowReceiver
nodeDialer *nodedialer.Dialer
flowID distsqlpb.FlowID

// numOutboxes counts how many exec.Outboxes have been set up on this node.
// It must be accessed atomically.
numOutboxes int32
materializerAdded bool
}

func newVectorizedFlowCreator(
@@ -1021,8 +1030,21 @@ func (s *vectorizedFlowCreator) setupRemoteOutputStream(
if err != nil {
return err
}
atomic.AddInt32(&s.numOutboxes, 1)
run := func(ctx context.Context, cancelFn context.CancelFunc) {
outbox.Run(ctx, s.nodeDialer, stream.TargetNodeID, s.flowID, stream.StreamID, cancelFn)
atomic.AddInt32(&s.numOutboxes, -1)
// When the last Outbox on this node exits, we want to make sure that
// everything is shutdown; namely, we need to call cancelFn if:
// - it is the last Outbox
// - there is no root materializer on this node (if it were, it would take
// care of the cancellation itself)
// - cancelFn is non-nil (it can be nil in tests).
// Calling cancelFn will cancel the context that all infrastructure on this
// node is listening on, so it will shut everything down.
if atomic.LoadInt32(&s.numOutboxes) == 0 && !s.materializerAdded && cancelFn != nil {
cancelFn()
}
}
s.accumulateAsyncComponent(run)
return nil
@@ -1274,13 +1296,15 @@ func (s *vectorizedFlowCreator) setupOutput(
// further appends without overwriting.
append([]distsqlpb.MetadataSource(nil), metadataSourcesQueue...),
outputStatsToTrace,
s.getCancelFlowFn(),
)
if err != nil {
return nil, err
}
metadataSourcesQueue = metadataSourcesQueue[:0]
s.vectorizedStatsCollectorsQueue = s.vectorizedStatsCollectorsQueue[:0]
s.addMaterializer(proc)
s.materializerAdded = true
default:
return nil, errors.Errorf("unsupported output stream type %s", outputStream.Type)
}
@@ -1453,6 +1477,10 @@ func (r *vectorizedFlowCreatorHelper) addMaterializer(m *materializer) {
r.f.processors[0] = m
}

func (r *vectorizedFlowCreatorHelper) getCancelFlowFn() context.CancelFunc {
return r.f.ctxCancel
}

func (f *Flow) setupVectorizedFlow(ctx context.Context, acc *mon.BoundAccount) error {
recordingStats := false
if sp := opentracing.SpanFromContext(ctx); sp != nil && tracing.IsRecording(sp) {
@@ -1494,7 +1522,10 @@ func (r *noopFlowCreatorHelper) checkInboundStreamID(sid distsqlpb.StreamID) err

func (r *noopFlowCreatorHelper) accumulateAsyncComponent(runFn) {}

func (r *noopFlowCreatorHelper) addMaterializer(m *materializer) {
func (r *noopFlowCreatorHelper) addMaterializer(*materializer) {}

func (r *noopFlowCreatorHelper) getCancelFlowFn() context.CancelFunc {
return nil
}

// SupportsVectorized checks whether flow is supported by the vectorized engine
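The comment in setupRemoteOutputStream above describes when the last outbox should cancel the flow context: it must be the last one to exit, there must be no root materializer on the node, and the cancellation function must be non-nil. A minimal standalone sketch of that counting pattern follows; the variable names are illustrative stand-ins, not the real vectorizedFlowCreator fields, and the decrement-and-check is folded into a single atomic call.

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	flowCtx, cancelFlow := context.WithCancel(context.Background())

	// Simplified stand-ins for the creator state: how many outboxes were
	// planned on this node and whether a root materializer was added.
	var numOutboxes int32
	materializerAdded := false

	const outboxes = 3
	atomic.AddInt32(&numOutboxes, outboxes) // all outboxes are registered at setup time

	var wg sync.WaitGroup
	for i := 0; i < outboxes; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// ... a real outbox would stream batches here ...
			// The last outbox to exit cancels the flow context, but only when
			// there is no root materializer on the node to do it instead.
			if atomic.AddInt32(&numOutboxes, -1) == 0 && !materializerAdded {
				cancelFlow()
				fmt.Printf("outbox %d canceled the flow context\n", id)
			}
		}(i)
	}

	wg.Wait()
	<-flowCtx.Done() // everything listening on flowCtx is now unblocked
}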
3 changes: 2 additions & 1 deletion pkg/sql/distsqlrun/columnar_utils_test.go
@@ -50,7 +50,7 @@ func verifyColOperator(
defer diskMonitor.Stop(ctx)
flowCtx := &FlowCtx{
EvalCtx: &evalCtx,
Settings: cluster.MakeTestingClusterSettings(),
Settings: st,
TempStorage: tempEngine,
diskMonitor: diskMonitor,
}
@@ -99,6 +99,7 @@ func verifyColOperator(
nil, /* output */
nil, /* metadataSourcesQueue */
nil, /* outputStatsToTrace */
nil, /* cancelFlow */
)
if err != nil {
return err
7 changes: 3 additions & 4 deletions pkg/sql/distsqlrun/flow.go
@@ -32,7 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
)

// FlowCtx encompasses the contexts needed for various flow components.
@@ -497,6 +497,8 @@ func (f *Flow) setupProcessors(ctx context.Context, inputSyncs [][]RowSource) er

func (f *Flow) setup(ctx context.Context, spec *distsqlpb.FlowSpec) error {
f.spec = spec
ctx, f.ctxCancel = contextutil.WithCancel(ctx)
f.ctxDone = ctx.Done()

if f.EvalCtx.SessionData.Vectorize != sessiondata.VectorizeOff {
log.VEventf(ctx, 1, "setting up vectorize flow %d with setting %s", f.id, f.EvalCtx.SessionData.Vectorize)
@@ -530,9 +532,6 @@ func (f *Flow) startInternal(ctx context.Context, doneFn func()) error {
ctx, 1, "starting (%d processors, %d startables)", len(f.processors), len(f.startables),
)

ctx, f.ctxCancel = contextutil.WithCancel(ctx)
f.ctxDone = ctx.Done()

// Only register the flow if there will be inbound stream connections that
// need to look up this flow in the flow registry.
if !f.isLocal() {
59 changes: 54 additions & 5 deletions pkg/sql/distsqlrun/materializer.go
@@ -51,18 +51,41 @@ type materializer struct {
// adapter.
outputRow sqlbase.EncDatumRow
outputMetadata *distsqlpb.ProducerMetadata

// ctxCancel will cancel the context that is passed to the input (which will
// pass it down further). This allows for the cancellation of the tree rooted
// at this materializer when it is closed.
ctxCancel context.CancelFunc
// cancelFlow will cancel the context of the flow (i.e. if non-nil, it is
// Flow.ctxCancel).
cancelFlow context.CancelFunc
}

const materializerProcName = "materializer"

// newMaterializer creates a new materializer processor which processes the
// columnar data coming from input to return it as rows.
// Arguments:
// - typs is the output types scheme.
// - metadataSourcesQueue are all of the metadata sources that are planned on
// the same node as the materializer and that need to be drained.
// - outputStatsToTrace (when tracing is enabled) finishes the stats.
// - cancelFlow is the context cancellation function that cancels the context
// of the flow (i.e. it is Flow.ctxCancel). It should only be non-nil in case
// of a root materializer (i.e. not when we're wrapping a row source).
func newMaterializer(
flowCtx *FlowCtx,
processorID int32,
input exec.Operator,
typs []types.T,
// TODO(yuzefovich): I feel like we should remove outputToInputColIdx
// argument since it's always {0, 1, ..., len(typs)-1}.
outputToInputColIdx []int,
post *distsqlpb.PostProcessSpec,
output RowReceiver,
metadataSourcesQueue []distsqlpb.MetadataSource,
outputStatsToTrace func(),
cancelFlow context.CancelFunc,
) (*materializer, error) {
m := &materializer{
input: input,
@@ -84,20 +107,31 @@ func newMaterializer(
for _, src := range metadataSourcesQueue {
trailingMeta = append(trailingMeta, src.DrainMeta(ctx)...)
}
m.InternalClose()
return trailingMeta
},
},
); err != nil {
return nil, err
}
m.finishTrace = outputStatsToTrace
m.cancelFlow = cancelFlow
return m, nil
}

func (m *materializer) Start(ctx context.Context) context.Context {
m.input.Init()
m.Ctx = ctx
return ctx
ctx = m.ProcessorBase.StartInternal(ctx, materializerProcName)
// In general case, ctx that is passed is related to the "flow context" that
// will be canceled by m.cancelFlow. However, in some cases (like when there
// is a subquery), it appears as if the subquery flow context is not related
// to the flow context of the main query, so calling m.cancelFlow will not
// shutdown the subquery tree. To work around this, we always use another
// context and get another cancellation function, and we will trigger both
// upon exit from the materializer.
// TODO(yuzefovich): figure out what is the problem here.
m.Ctx, m.ctxCancel = context.WithCancel(ctx)
return m.Ctx
}

// nextAdapter calls next() and saves the returned results in m. For internal
@@ -157,9 +191,24 @@ func (m *materializer) Next() (sqlbase.EncDatumRow, *distsqlpb.ProducerMetadata)
return m.outputRow, m.outputMetadata
}

func (m *materializer) InternalClose() bool {
if m.ProcessorBase.InternalClose() {
m.ctxCancel()
if m.cancelFlow != nil {
m.cancelFlow()
}
return true
}
return false
}

func (m *materializer) ConsumerDone() {
// Materializer will move into 'draining' state, and after all the metadata
// has been drained - as part of TrailingMetaCallback - InternalClose() will
// be called which will cancel the flow.
m.MoveToDraining(nil /* err */)
}

func (m *materializer) ConsumerClosed() {
// TODO(yuzefovich): this seems like a hack to me, but in order to close an
// Inbox, we need to drain it.
m.trailingMetaCallback(m.Ctx)
m.InternalClose()
}
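
The materializer changes above layer two cancellation functions: ctxCancel for the context derived in Start (so the tree rooted at the materializer can be shut down even when the incoming context is unrelated to the flow context, as with a subquery), and the optional flow-wide cancelFlow, both triggered once by InternalClose. A rough standalone sketch of that layering, with purely illustrative names and none of the real ProcessorBase machinery, might look like this:

package main

import (
	"context"
	"fmt"
)

// root mimics the materializer's shutdown wiring: it owns a cancel function
// for the context it derives in start and, optionally, one for the whole
// flow. Both are triggered exactly once when the root is closed.
type root struct {
	closed     bool
	ctxCancel  context.CancelFunc // cancels the context derived in start
	cancelFlow context.CancelFunc // cancels the flow context; nil when wrapping a row source
}

// start derives a child context from the incoming one. Even if the incoming
// context is not related to the flow context (as with a subquery), the tree
// rooted here can still be shut down via ctxCancel.
func (r *root) start(ctx context.Context) context.Context {
	ctx, r.ctxCancel = context.WithCancel(ctx)
	return ctx
}

// close cancels the derived context and, when present, the flow context.
func (r *root) close() {
	if r.closed {
		return
	}
	r.closed = true
	r.ctxCancel()
	if r.cancelFlow != nil {
		r.cancelFlow()
	}
}

func main() {
	flowCtx, cancelFlow := context.WithCancel(context.Background())
	r := &root{cancelFlow: cancelFlow}
	derived := r.start(flowCtx)
	r.close()
	fmt.Println("derived:", derived.Err(), "| flow:", flowCtx.Err())
}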
3 changes: 3 additions & 0 deletions pkg/sql/distsqlrun/materializer_test.go
@@ -58,6 +58,7 @@ func TestColumnarizeMaterialize(t *testing.T) {
nil, /* output */
nil, /* metadataSourcesQueue */
nil, /* outputStatsToTrace */
nil, /* cancelFlow */
)
if err != nil {
t.Fatal(err)
@@ -143,6 +144,7 @@ func TestMaterializeTypes(t *testing.T) {
nil, /* output */
nil, /* metadataSourcesQueue */
nil, /* outputStatsToTrace */
nil, /* cancelFlow */
)
if err != nil {
t.Fatal(err)
@@ -197,6 +199,7 @@ func BenchmarkColumnarizeMaterialize(b *testing.B) {
nil, /* output */
nil, /* metadataSourcesQueue */
nil, /* outputStatsToTrace */
nil, /* cancelFlow */
)
if err != nil {
b.Fatal(err)
2 changes: 2 additions & 0 deletions pkg/sql/distsqlrun/vectorized_error_propagation_test.go
@@ -60,6 +60,7 @@ func TestVectorizedErrorPropagation(t *testing.T) {
nil, /* output */
nil, /* metadataSourceQueue */
nil, /* outputStatsToTrace */
nil, /* cancelFlow */
)
if err != nil {
t.Fatal(err)
@@ -120,6 +121,7 @@ func TestNonVectorizedErrorPropagation(t *testing.T) {
nil, /* output */
nil, /* metadataSourceQueue */
nil, /* outputStatsToTrace */
nil, /* cancelFlow */
)
if err != nil {
t.Fatal(err)