Skip to content

Commit

Permalink
Merge #61380 #61412
Browse files Browse the repository at this point in the history
61380: colflow: clean up vectorized stats propagation r=yuzefovich a=yuzefovich

Previously, in order to propagate execution statistics we were creating
temporary tracing spans, setting the stats on them, and finishing the
spans right away. This allowed for using (to be more precise, abusing)
the existing infrastructure. The root of the problem is that the
vectorized engine currently does not start a per-operator span when stats
collection is enabled, so we had to work around that limitation.

However, this is not how tracing spans are intended to be used, and it
introduces a performance slowdown in the hot path, so this commit
refactors the approach. Now we ensure that there is always
a tracing span available at the "root" components (either root
materializer or an outbox), so when root components are finishing the
vectorized stats collectors for their subtree of operators, there is
a span to record the stats into.

This required the following minor adjustments:
- in the materializer, attaching the stats to the tracing span is now
delegated to the drainHelper (which does so on `ConsumerDone`). Note
that the drainHelper doesn't get the recording from the span and leaves
that to the materializer (this is needed in order to avoid collecting
duplicate trace data).
- in the outbox, we now start a "remote child span" (if there is a span
in the parent context) at the beginning of the `Run` method, and we attach
the stats in `sendMetadata`.

Addresses: #59379.
Fixes: #59555.

Release justification: low-risk update to existing functionality.

Release note: None

61412: sql: clean up planNodeToRowSource r=yuzefovich a=yuzefovich

This commit removes some redundant code that was left in place by
ccc5a8a. Namely, `planNodeToRowSource`
doesn't need to track whether it was started or not now that `startExec`
is called in `Start`. This also allows us to remove the override of
`InternalClose` method.

Release justification: low-risk update to existing functionality.

Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
  • Loading branch information
craig[bot] and yuzefovich committed Mar 4, 2021
3 parents 7ddc401 + 0cf062a + 461a1d5 commit d7748a9
Show file tree
Hide file tree
Showing 25 changed files with 159 additions and 159 deletions.
1 change: 1 addition & 0 deletions pkg/sql/colexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ go_library(
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/stringarena",
"//pkg/util/tracing",
"@com_github_cockroachdb_apd_v2//:apd", # keep
"@com_github_cockroachdb_errors//:errors",
"@com_github_marusama_semaphore//:semaphore",
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func wrapRowSources(
nil, /* output */
nil, /* metadataSourcesQueue */
nil, /* toClose */
nil, /* execStatsForTrace */
nil, /* getStats */
nil, /* cancelFlow */
)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/colbuilder/execplan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestNewColOperatorExpectedTypeSchema(t *testing.T) {
nil, /* output */
nil, /* metadataSourcesQueue */
nil, /* toClose */
nil, /* execStatsForTrace */
nil, /* getStats */
nil, /* cancelFlow */
)
require.NoError(t, err)
Expand Down
31 changes: 25 additions & 6 deletions pkg/sql/colexec/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -74,6 +75,7 @@ type drainHelper struct {
sources execinfrapb.MetadataSources
ctx context.Context
bufferedMeta []execinfrapb.ProducerMetadata
getStats func() []*execinfrapb.ComponentStats
}

var _ execinfra.RowSource = &drainHelper{}
Expand All @@ -85,9 +87,12 @@ var drainHelperPool = sync.Pool{
},
}

func newDrainHelper(sources execinfrapb.MetadataSources) *drainHelper {
func newDrainHelper(
sources execinfrapb.MetadataSources, getStats func() []*execinfrapb.ComponentStats,
) *drainHelper {
d := drainHelperPool.Get().(*drainHelper)
d.sources = sources
d.getStats = getStats
return d
}

Expand Down Expand Up @@ -121,7 +126,21 @@ func (d *drainHelper) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata)
}

// ConsumerDone implements the RowSource interface.
func (d *drainHelper) ConsumerDone() {}
func (d *drainHelper) ConsumerDone() {
if d.getStats != nil {
// If getStats is non-nil, then the drainHelper is responsible for
// attaching the execution statistics to the span, yet we don't get the
// recording from the span - that is left to the materializer (more
// precisely to the embedded ProcessorBase) which is necessary in order
// to not collect same trace data twice.
if sp := tracing.SpanFromContext(d.ctx); sp != nil {
for _, s := range d.getStats() {
sp.RecordStructured(s)
}
}
d.getStats = nil
}
}

// ConsumerClosed implements the RowSource interface.
func (d *drainHelper) ConsumerClosed() {}
Expand Down Expand Up @@ -153,7 +172,8 @@ var materializerEmptyPostProcessSpec = &execinfrapb.PostProcessSpec{}
// - typs is the output types scheme.
// - metadataSourcesQueue are all of the metadata sources that are planned on
// the same node as the Materializer and that need to be drained.
// - outputStatsToTrace (when tracing is enabled) finishes the stats.
// - getStats (when tracing is enabled) returns all of the execution statistics
// of operators which the materializer is responsible for.
// - cancelFlow should return the context cancellation function that cancels
// the context of the flow (i.e. it is Flow.ctxCancel). It should only be
// non-nil in case of a root Materializer (i.e. not when we're wrapping a row
Expand All @@ -168,15 +188,15 @@ func NewMaterializer(
output execinfra.RowReceiver,
metadataSourcesQueue []execinfrapb.MetadataSource,
toClose []colexecop.Closer,
execStatsForTrace func() *execinfrapb.ComponentStats,
getStats func() []*execinfrapb.ComponentStats,
cancelFlow func() context.CancelFunc,
) (*Materializer, error) {
m := materializerPool.Get().(*Materializer)
*m = Materializer{
ProcessorBase: m.ProcessorBase,
input: input,
typs: typs,
drainHelper: newDrainHelper(metadataSourcesQueue),
drainHelper: newDrainHelper(metadataSourcesQueue, getStats),
converter: colconv.NewAllVecToDatumConverter(len(typs)),
row: make(rowenc.EncDatumRow, len(typs)),
closers: toClose,
Expand Down Expand Up @@ -209,7 +229,6 @@ func NewMaterializer(
return nil, err
}
m.AddInputToDrain(m.drainHelper)
m.ExecStatsForTrace = execStatsForTrace
m.cancelFlow = cancelFlow
return m, nil
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/colexec/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestColumnarizeMaterialize(t *testing.T) {
nil, /* output */
nil, /* metadataSourcesQueue */
nil, /* toClose */
nil, /* execStatsForTrace */
nil, /* getStats */
nil, /* cancelFlow */
)
if err != nil {
Expand Down Expand Up @@ -154,7 +154,7 @@ func BenchmarkMaterializer(b *testing.B) {
nil, /* output */
nil, /* metadataSourcesQueue */
nil, /* toClose */
nil, /* execStatsForTrace */
nil, /* getStats */
nil, /* cancelFlow */
)
if err != nil {
Expand Down Expand Up @@ -209,7 +209,7 @@ func TestMaterializerNextErrorAfterConsumerDone(t *testing.T) {
nil, /* output */
[]execinfrapb.MetadataSource{metadataSource},
nil, /* toClose */
nil, /* execStatsForTrace */
nil, /* getStats */
nil, /* cancelFlow */
)
require.NoError(t, err)
Expand Down Expand Up @@ -258,7 +258,7 @@ func BenchmarkColumnarizeMaterialize(b *testing.B) {
nil, /* output */
nil, /* metadataSourcesQueue */
nil, /* toClose */
nil, /* execStatsForTrace */
nil, /* getStats */
nil, /* cancelFlow */
)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/types_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestSQLTypesIntegration(t *testing.T) {
output,
nil, /* metadataSourcesQueue */
nil, /* toClose */
nil, /* execStatsForTrace */
nil, /* getStats */
nil, /* cancelFlow */
)
require.NoError(t, err)
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/colflow/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ go_library(
"//pkg/util/randutil",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_marusama_semaphore//:semaphore",
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colflow/colrpc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
"//pkg/util/log",
"//pkg/util/log/logcrash",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"@com_github_apache_arrow_go_arrow//array",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
Expand Down
32 changes: 20 additions & 12 deletions pkg/sql/colflow/colrpc/colrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,10 @@ func TestOutboxInbox(t *testing.T) {

outboxMemAcc := testMemMonitor.MakeBoundAccount()
defer outboxMemAcc.Close(ctx)
outbox, err :=
NewOutbox(colmem.NewAllocator(ctx, &outboxMemAcc, coldata.StandardColumnFactory), input, typs, nil /* metadataSource */, nil /* toClose */)
outbox, err := NewOutbox(
colmem.NewAllocator(ctx, &outboxMemAcc, coldata.StandardColumnFactory),
input, typs, nil /* metadataSource */, nil /* toClose */, nil, /* getStats */
)
require.NoError(t, err)

inboxMemAcc := testMemMonitor.MakeBoundAccount()
Expand Down Expand Up @@ -499,13 +501,15 @@ func TestOutboxInboxMetadataPropagation(t *testing.T) {
if tc.overrideExpectedMetadata != nil {
expectedMetadata = tc.overrideExpectedMetadata
}
outbox, err := NewOutbox(colmem.NewAllocator(ctx, &outboxMemAcc, coldata.StandardColumnFactory), input, typs, []execinfrapb.MetadataSource{
execinfrapb.CallbackMetadataSource{
DrainMetaCb: func(context.Context) []execinfrapb.ProducerMetadata {
return expectedMetadata
outbox, err := NewOutbox(
colmem.NewAllocator(ctx, &outboxMemAcc, coldata.StandardColumnFactory),
input, typs, []execinfrapb.MetadataSource{
execinfrapb.CallbackMetadataSource{
DrainMetaCb: func(context.Context) []execinfrapb.ProducerMetadata {
return expectedMetadata
},
},
},
}, nil /* toClose */)
}, nil /* toClose */, nil /* getStats */)
require.NoError(t, err)

inboxMemAcc := testMemMonitor.MakeBoundAccount()
Expand Down Expand Up @@ -577,8 +581,10 @@ func BenchmarkOutboxInbox(b *testing.B) {

outboxMemAcc := testMemMonitor.MakeBoundAccount()
defer outboxMemAcc.Close(ctx)
outbox, err :=
NewOutbox(colmem.NewAllocator(ctx, &outboxMemAcc, coldata.StandardColumnFactory), input, typs, nil /* metadataSources */, nil /* toClose */)
outbox, err := NewOutbox(
colmem.NewAllocator(ctx, &outboxMemAcc, coldata.StandardColumnFactory),
input, typs, nil /* metadataSources */, nil /* toClose */, nil, /* getStats */
)
require.NoError(b, err)

inboxMemAcc := testMemMonitor.MakeBoundAccount()
Expand Down Expand Up @@ -639,8 +645,10 @@ func TestOutboxStreamIDPropagation(t *testing.T) {

outboxMemAcc := testMemMonitor.MakeBoundAccount()
defer outboxMemAcc.Close(ctx)
outbox, err :=
NewOutbox(colmem.NewAllocator(ctx, &outboxMemAcc, coldata.StandardColumnFactory), input, typs, nil /* metadataSources */, nil /* toClose */)
outbox, err := NewOutbox(
colmem.NewAllocator(ctx, &outboxMemAcc, coldata.StandardColumnFactory),
input, typs, nil /* metadataSources */, nil /* toClose */, nil, /* getStats */
)
require.NoError(t, err)

outboxDone := make(chan struct{})
Expand Down
22 changes: 22 additions & 0 deletions pkg/sql/colflow/colrpc/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
)
Expand Down Expand Up @@ -60,18 +61,28 @@ type Outbox struct {
msg *execinfrapb.ProducerMessage
}

span *tracing.Span
// getStats, when non-nil, returns all of the execution statistics of the
// operators that are in the same tree as this Outbox. The stats will be
// added into the span as Structured payload and returned to the gateway as
// execinfrapb.ProducerMetadata.
getStats func() []*execinfrapb.ComponentStats

// A copy of Run's caller ctx, with no StreamID tag.
// Used to pass a clean context to the input.Next.
runnerCtx context.Context
}

// NewOutbox creates a new Outbox.
// - getStats, when non-nil, returns all of the execution statistics of the
// operators that are in the same tree as this Outbox.
func NewOutbox(
allocator *colmem.Allocator,
input colexecop.Operator,
typs []*types.T,
metadataSources []execinfrapb.MetadataSource,
toClose []colexecop.Closer,
getStats func() []*execinfrapb.ComponentStats,
) (*Outbox, error) {
c, err := colserde.NewArrowBatchConverter(typs)
if err != nil {
Expand All @@ -90,6 +101,7 @@ func NewOutbox(
serializer: s,
metadataSources: metadataSources,
closers: toClose,
getStats: getStats,
}
o.scratch.buf = &bytes.Buffer{}
o.scratch.msg = &execinfrapb.ProducerMessage{}
Expand Down Expand Up @@ -127,6 +139,11 @@ func (o *Outbox) Run(
cancelFn context.CancelFunc,
connectionTimeout time.Duration,
) {
ctx, o.span = execinfra.ProcessorSpan(ctx, "outbox")
if o.span != nil {
defer o.span.Finish()
}

o.runnerCtx = ctx
ctx = logtags.AddTag(ctx, "streamID", streamID)
log.VEventf(ctx, 2, "Outbox Dialing %s", nodeID)
Expand Down Expand Up @@ -276,6 +293,11 @@ func (o *Outbox) sendMetadata(ctx context.Context, stream flowStreamClient, errT
msg.Data.Metadata, execinfrapb.LocalMetaToRemoteProducerMeta(ctx, execinfrapb.ProducerMetadata{Err: errToSend}),
)
}
if o.span != nil && o.getStats != nil {
for _, s := range o.getStats() {
o.span.RecordStructured(s)
}
}
if trace := execinfra.GetTraceData(ctx); trace != nil {
msg.Data.Metadata = append(msg.Data.Metadata, execinfrapb.RemoteProducerMetadata{
Value: &execinfrapb.RemoteProducerMetadata_TraceData_{
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colflow/colrpc/outbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestOutboxCatchesPanics(t *testing.T) {
typs = []*types.T{types.Int}
rpcLayer = makeMockFlowStreamRPCLayer()
)
outbox, err := NewOutbox(testAllocator, input, typs, nil /* metadataSources */, nil /* toClose */)
outbox, err := NewOutbox(testAllocator, input, typs, nil /* metadataSources */, nil /* toClose */, nil /* getStats */)
require.NoError(t, err)

// This test relies on the fact that BatchBuffer panics when there are no
Expand Down Expand Up @@ -98,7 +98,7 @@ func TestOutboxDrainsMetadataSources(t *testing.T) {
return nil
},
},
}, nil /* toClose */)
}, nil /* toClose */, nil /* getStats */)
if err != nil {
return nil, nil, err
}
Expand Down
Loading

0 comments on commit d7748a9

Please sign in to comment.