From 6cfd6ea76c86d9e1dfced75d8c33e1ee70a592fd Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Tue, 1 Oct 2024 07:59:17 -0700 Subject: [PATCH 01/14] (batchprocessor) Add tracing support --- processor/batchprocessor/batch_processor.go | 189 ++++++++++++++++++-- 1 file changed, 177 insertions(+), 12 deletions(-) diff --git a/processor/batchprocessor/batch_processor.go b/processor/batchprocessor/batch_processor.go index 3ad07597bec..e2fe48456b8 100644 --- a/processor/batchprocessor/batch_processor.go +++ b/processor/batchprocessor/batch_processor.go @@ -14,6 +14,7 @@ import ( "time" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "go.opentelemetry.io/collector/client" @@ -24,6 +25,7 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/batchprocessor/internal/metadata" ) // errTooManyBatchers is returned when the MetadataCardinalityLimit has been reached. @@ -63,6 +65,9 @@ type batchProcessor struct { // batcher will be either *singletonBatcher or *multiBatcher batcher batcher + + // tracer is the configured tracer + tracer trace.Tracer } // batcher is describes a *singletonBatcher or *multiBatcher. @@ -93,11 +98,34 @@ type shard struct { timer *time.Timer // newItem is used to receive data items from producers. - newItem chan any + newItem chan dataItem // batch is an in-flight data item containing one of the // underlying data types. batch batch + + // pending describes the contributors to the current batch. + pending []pendingItem + + // totalSent counts the number of items processed by the + // shard in its lifetime. + totalSent uint64 +} + +// pendingItem is stored parallel to a pending batch and records +// how many items the waiter submitted, used to ensure the correct +// response count is returned to each waiter. It is also used +// inside sendItems() to represent a partially complete batch. +type pendingItem struct { + parentCtx context.Context + numItems int +} + +// dataItem is exchanged between the waiter and the batching process +// includes the pendingItem and its data. +type dataItem struct { + data any + pendingItem } // batch is an interface generalizing the individual signal types. 
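To make the pendingItem accounting concrete before the sendItems() changes below, here is a minimal sketch of how a batch boundary divides pending contributions (illustrative only, not part of the patch; splitPending and its plain-integer representation are hypothetical):

// splitPending mirrors the accounting sendItems() performs over b.pending,
// reduced to plain item counts: earlier waiters are consumed first, and at
// most one waiter is split across the batch boundary.
func splitPending(pending []int, batchSize int) (sent, remaining []int) {
	for _, n := range pending {
		switch {
		case batchSize == 0:
			// No room left: this waiter's items stay pending.
			remaining = append(remaining, n)
		case n <= batchSize:
			// This waiter's items fit entirely in the batch.
			sent = append(sent, n)
			batchSize -= n
		default:
			// This waiter straddles the boundary: part sent, part pending.
			sent = append(sent, batchSize)
			remaining = append(remaining, n-batchSize)
			batchSize = 0
		}
	}
	return sent, remaining
}

For example, splitPending([]int{60, 60, 40}, 100) yields [60 40] sent and [20 40] remaining: the second waiter straddles the boundary, which is why a partially sent pendingItem must retain its parentCtx for the next batch.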
@@ -140,6 +168,7 @@ func newBatchProcessor(set processor.Settings, cfg *Config, batchFunc func() bat shutdownC: make(chan struct{}, 1), metadataKeys: mks, metadataLimit: int(cfg.MetadataCardinalityLimit), + tracer: set.TelemetrySettings.TracerProvider.Tracer(metadata.ScopeName), } if len(bp.metadataKeys) == 0 { bp.batcher = &singleShardBatcher{ @@ -168,7 +197,7 @@ func (bp *batchProcessor) newShard(md map[string][]string) *shard { }) b := &shard{ processor: bp, - newItem: make(chan any, runtime.NumCPU()), + newItem: make(chan dataItem, runtime.NumCPU()), exportCtx: exportCtx, batch: bp.batchFunc(), } @@ -228,7 +257,7 @@ func (b *shard) startLoop() { } return case item := <-b.newItem: - if item == nil { + if item.data == nil { continue } b.processItem(item) @@ -241,8 +270,21 @@ func (b *shard) startLoop() { } } -func (b *shard) processItem(item any) { - b.batch.add(item) +func (b *shard) processItem(item dataItem) { + before := b.batch.itemCount() + b.batch.add(item.data) + after := b.batch.itemCount() + + totalItems := after - before + b.pending = append(b.pending, pendingItem{ + parentCtx: item.parentCtx, + numItems: totalItems, + }) + + b.flushItems() +} + +func (b *shard) flushItems() { sent := false for b.batch.itemCount() > 0 && (!b.hasTimer() || b.batch.itemCount() >= b.processor.sendBatchSize) { sent = true @@ -274,8 +316,76 @@ func (b *shard) resetTimer() { func (b *shard) sendItems(trigger trigger) { sent, req := b.batch.splitBatch(b.exportCtx, b.processor.sendBatchMaxSize) - err := b.batch.export(b.exportCtx, req) - if err != nil { + var thisBatch []pendingItem + + numItemsBefore := b.totalSent + numItemsAfter := b.totalSent + uint64(sent) + + // The current batch can contain items from several different + // producers. Update pending to correctly track contexts + // included in the current batch. + for len(b.pending) > 0 && numItemsBefore < numItemsAfter { + if numItemsBefore+uint64(b.pending[0].numItems) > numItemsAfter { + // Waiter only had some items in the current batch + partialSent := int(numItemsAfter - numItemsBefore) + numItemsBefore = numItemsAfter + b.pending[0].numItems -= partialSent + thisBatch = append(thisBatch, pendingItem{ + numItems: partialSent, + parentCtx: b.pending[0].parentCtx, + }) + } else { + // This item will be completely processed. + numItemsBefore += uint64(b.pending[0].numItems) + thisBatch = append(thisBatch, pendingItem{ + numItems: b.pending[0].numItems, + parentCtx: b.pending[0].parentCtx, + }) + + // Shift the pending array, to allow it to be re-used. + copy(b.pending[0:len(b.pending)-1], b.pending[1:]) + b.pending = b.pending[:len(b.pending)-1] + } + } + + b.totalSent = numItemsAfter + + var err error + + var parentSpan trace.Span + var parent context.Context + isSingleCtx := allSameContext(thisBatch) + + // If incoming requests are sufficiently large, there + // will be one context, in which case no need to create a new + // root span. + if isSingleCtx { + parent = thisBatch[0].parentCtx + parent, parentSpan = b.processor.tracer.Start(parent, "batch_processor/export") + } else { + spans := parentSpans(thisBatch) + + links := make([]trace.Link, len(spans)) + for i, span := range spans { + links[i] = trace.Link{SpanContext: span.SpanContext()} + } + parent, parentSpan = b.processor.tracer.Start(b.exportCtx, "batch_processor/export", trace.WithLinks(links...)) + + // Note: linking in the opposite direction. + // This could be inferred by the trace + // backend, but this adds helpful information + // in cases where sampling may break links. 
+ // See https://github.com/open-telemetry/opentelemetry-specification/issues/1877 + for _, span := range spans { + span.AddLink(trace.Link{SpanContext: parentSpan.SpanContext()}) + } + } + // Note: call End() before returning to caller contexts, otherwise + // trace-based tests will not recognize unfinished spans when the test + // terminates. + parentSpan.End() + + if err = b.batch.export(parent, req); err != nil { b.processor.logger.Warn("Sender failed", zap.Error(err)) return } @@ -286,6 +396,63 @@ func (b *shard) sendItems(trigger trigger) { b.processor.telemetry.record(trigger, int64(sent), int64(bytes)) } +// parentSpans computes the set of distinct span contexts and returns +// the corresponding set of active span objects. +func parentSpans(x []pendingItem) []trace.Span { + var spans []trace.Span + unique := make(map[context.Context]bool) + for i := range x { + _, ok := unique[x[i].parentCtx] + if ok { + continue + } + + unique[x[i].parentCtx] = true + + spans = append(spans, trace.SpanFromContext(x[i].parentCtx)) + } + + return spans +} + +// allSameContext is a helper function to check if a slice of contexts +// contains more than one unique context. +func allSameContext(x []pendingItem) bool { + for idx := range x[1:] { + if x[idx].parentCtx != x[0].parentCtx { + return false + } + } + return true +} + +func (b *shard) consumeBatch(ctx context.Context, data any) error { + var itemCount int + switch telem := data.(type) { + case ptrace.Traces: + itemCount = telem.SpanCount() + case pmetric.Metrics: + itemCount = telem.DataPointCount() + case plog.Logs: + itemCount = telem.LogRecordCount() + } + + if itemCount == 0 { + return nil + } + + item := dataItem{ + data: data, + pendingItem: pendingItem{ + parentCtx: ctx, + numItems: itemCount, + }, + } + + b.newItem <- item + return nil +} + // singleShardBatcher is used when metadataKeys is empty, to avoid the // additional lock and map operations used in multiBatcher. type singleShardBatcher struct { @@ -299,9 +466,8 @@ func (sb *singleShardBatcher) start(context.Context) error { return nil } -func (sb *singleShardBatcher) consume(_ context.Context, data any) error { - sb.single.newItem <- data - return nil +func (sb *singleShardBatcher) consume(ctx context.Context, data any) error { + return sb.single.consumeBatch(ctx, data) } func (sb *singleShardBatcher) currentMetadataCardinality() int { @@ -362,8 +528,7 @@ func (mb *multiShardBatcher) consume(ctx context.Context, data any) error { } mb.lock.Unlock() } - b.(*shard).newItem <- data - return nil + return b.(*shard).consumeBatch(ctx, data) } func (mb *multiShardBatcher) currentMetadataCardinality() int { From 4d4572ed7515375053c59f32f959318078a1116d Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Tue, 1 Oct 2024 08:18:47 -0700 Subject: [PATCH 02/14] tests, chlog, tidy, readme --- .chloggen/batch-tracing.yaml | 25 ++ processor/batchprocessor/README.md | 9 + processor/batchprocessor/batch_processor.go | 1 + .../batchprocessor/batch_processor_test.go | 229 ++++++++++++++++++ processor/batchprocessor/go.mod | 7 +- processor/batchprocessor/go.sum | 18 ++ 6 files changed, 288 insertions(+), 1 deletion(-) create mode 100644 .chloggen/batch-tracing.yaml diff --git a/.chloggen/batch-tracing.yaml b/.chloggen/batch-tracing.yaml new file mode 100644 index 00000000000..82cb61b76f7 --- /dev/null +++ b/.chloggen/batch-tracing.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. 
+ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: batchprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add tracing instrumentation to the batch processor component. + +# One or more tracking issues or pull requests related to the change +issues: [11308] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/processor/batchprocessor/README.md b/processor/batchprocessor/README.md index 36781d3931c..a61139ea43a 100644 --- a/processor/batchprocessor/README.md +++ b/processor/batchprocessor/README.md @@ -114,3 +114,12 @@ metadata-key values. The number of batch processors currently in use is exported as the `otelcol_processor_batch_metadata_cardinality` metric. + +## Tracing support + +This processor is traced using a span name `batch_processor/export` +for each exported batch. If the batch consists of a single parent +context, the context will be used and a child span created. If the +batch consists of multiple parent contexts, then a new root span is +created and Span links are created in both directions to maintain +connectivity between the involved traces. diff --git a/processor/batchprocessor/batch_processor.go b/processor/batchprocessor/batch_processor.go index e2fe48456b8..5f6672d05ed 100644 --- a/processor/batchprocessor/batch_processor.go +++ b/processor/batchprocessor/batch_processor.go @@ -344,6 +344,7 @@ func (b *shard) sendItems(trigger trigger) { // Shift the pending array, to allow it to be re-used. 
copy(b.pending[0:len(b.pending)-1], b.pending[1:]) + b.pending[len(b.pending)-1] = pendingItem{} b.pending = b.pending[:len(b.pending)-1] } } diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go index 8ab9aca8565..86c1c78a309 100644 --- a/processor/batchprocessor/batch_processor_test.go +++ b/processor/batchprocessor/batch_processor_test.go @@ -13,22 +13,58 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric/metricdata" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/collector/client" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pdata/testdata" + "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processortest" ) +// sendTraces asynchronously sends a batch of trace data. +func sendTraces(ctx context.Context, t *testing.T, batcher processor.Traces, wg *sync.WaitGroup, td ptrace.Traces) { + wg.Add(1) + go func() { + defer wg.Done() + assert.NoError(t, batcher.ConsumeTraces(ctx, td)) + }() +} + +// sendMetrics asynchronously sends a batch of metrics data. +func sendMetrics(ctx context.Context, t *testing.T, batcher processor.Metrics, wg *sync.WaitGroup, md pmetric.Metrics) { + wg.Add(1) + go func() { + defer wg.Done() + assert.NoError(t, batcher.ConsumeMetrics(ctx, md)) + }() +} + +// sendLogs asynchronously sends a batch of log data. 
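+// Like sendTraces and sendMetrics above, it returns before the data is
+// consumed; tests wait on the WaitGroup and then rely on Shutdown to flush.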
+func sendLogs(ctx context.Context, t *testing.T, batcher processor.Logs, wg *sync.WaitGroup, ld plog.Logs) { + wg.Add(1) + go func() { + defer wg.Done() + assert.NoError(t, batcher.ConsumeLogs(ctx, ld)) + }() +} + func TestProcessorShutdown(t *testing.T) { factory := NewFactory() @@ -1392,3 +1428,196 @@ func TestBatchSplitOnly(t *testing.T) { require.Equal(t, maxBatch, ld.LogRecordCount()) } } + +func TestBatchProcessorUnbrokenParentContextSingle(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.SendBatchSize = 100 + cfg.SendBatchMaxSize = 100 + cfg.Timeout = 3 * time.Second + requestCount := 10 + spansPerRequest := 5249 + exp := tracetest.NewInMemoryExporter() + tp := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(exp), + ) + otel.SetTracerProvider(tp) + tracer := tp.Tracer("otel") + bg, rootSp := tracer.Start(context.Background(), "test_parent") + + createSet := exporter.Settings{ + ID: component.MustNewID("test_exporter"), + TelemetrySettings: componenttest.NewNopTelemetrySettings(), + } + + createSet.TelemetrySettings.TracerProvider = tp + + opt := exporterhelper.WithQueue(exporterhelper.QueueSettings{ + Enabled: false, + }) + next, err := exporterhelper.NewTracesExporter(bg, createSet, Config{}, func(context.Context, ptrace.Traces) error { return nil }, opt) + require.NoError(t, err) + + processorSet := processortest.NewNopSettings() + processorSet.MetricsLevel = configtelemetry.LevelDetailed + processorSet.TracerProvider = tp + bp, err := newBatchTracesProcessor(processorSet, next, cfg) + require.NoError(t, err) + require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost())) + + sentResourceSpans := ptrace.NewTraces().ResourceSpans() + var wg sync.WaitGroup + for requestNum := 0; requestNum < requestCount; requestNum++ { + td := testdata.GenerateTraces(spansPerRequest) + spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans() + for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ { + spans.At(spanIndex).SetName(getTestSpanName(requestNum, spanIndex)) + } + td.ResourceSpans().At(0).CopyTo(sentResourceSpans.AppendEmpty()) + sendTraces(bg, t, bp, &wg, td) + } + wg.Wait() + rootSp.End() + + // Because the callers return early, shutdown is the only way to + // wait for the batch processor to flush. + require.NoError(t, bp.Shutdown(context.Background())) + + // need to flush tracerprovider + tp.ForceFlush(bg) + td := exp.GetSpans() + numBatches := float64(spansPerRequest*requestCount) / float64(cfg.SendBatchMaxSize) + assert.Len(t, td, 2*int(math.Ceil(numBatches))+1) + for _, span := range td { + switch span.Name { + case "batch_processor/export": + // more test below + case "exporter/test_exporter/traces": + continue + case "test_parent": + continue + default: + t.Error("unexpected span name:", span.Name) + } + // confirm parent is rootSp + assert.Equal(t, span.Parent, rootSp.SpanContext()) + } + + require.NoError(t, tp.Shutdown(context.Background())) +} + +func TestBatchProcessorUnbrokenParentContextMultiple(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.SendBatchSize = 100 + cfg.SendBatchMaxSize = 100 + cfg.Timeout = 3 * time.Second + requestCount := 50 + // keep spansPerRequest small to ensure multiple contexts end up in the same batch. 
+ spansPerRequest := 5 + exp := tracetest.NewInMemoryExporter() + tp := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(exp), + ) + otel.SetTracerProvider(tp) + tracer := tp.Tracer("otel") + bg := context.Background() + + createSet := exporter.Settings{ + ID: component.MustNewID("test_exporter"), + TelemetrySettings: componenttest.NewNopTelemetrySettings(), + } + createSet.TelemetrySettings.TracerProvider = tp + opt := exporterhelper.WithQueue(exporterhelper.QueueSettings{ + Enabled: false, + }) + next, err := exporterhelper.NewTracesExporter(bg, createSet, Config{}, func(context.Context, ptrace.Traces) error { return nil }, opt) + require.NoError(t, err) + + processorSet := processortest.NewNopSettings() + processorSet.MetricsLevel = configtelemetry.LevelDetailed + processorSet.TracerProvider = tp + bp, err := newBatchTracesProcessor(processorSet, next, cfg) + require.NoError(t, err) + require.NoError(t, bp.Start(bg, componenttest.NewNopHost())) + + var endLater []trace.Span + mkCtx := func() context.Context { + ctx, span := tracer.Start(bg, "test_context") + endLater = append(endLater, span) + return ctx + } + callCtxs := []context.Context{ + mkCtx(), + mkCtx(), + mkCtx(), + } + + sentResourceSpans := ptrace.NewTraces().ResourceSpans() + var wg sync.WaitGroup + for requestNum := 0; requestNum < requestCount; requestNum++ { + num := requestNum % len(callCtxs) + td := testdata.GenerateTraces(spansPerRequest) + spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans() + for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ { + spans.At(spanIndex).SetName(getTestSpanName(requestNum, spanIndex)) + } + td.ResourceSpans().At(0).CopyTo(sentResourceSpans.AppendEmpty()) + sendTraces(callCtxs[num], t, bp, &wg, td) + } + wg.Wait() + + // Because the callers return early, shutdown is the only way to + // wait for the batch processor to flush. + require.NoError(t, bp.Shutdown(context.Background())) + + // Flush and reset the internal traces exporter. + tp.ForceFlush(bg) + td := exp.GetSpans() + exp.Reset() + + // Expect 2 spans per batch, one exporter and one batch processor. 
+ numBatches := float64(spansPerRequest*requestCount) / float64(cfg.SendBatchMaxSize) + assert.Len(t, td, 2*int(math.Ceil(numBatches))) + + var expectSpanCtxs []trace.SpanContext + for _, span := range endLater { + expectSpanCtxs = append(expectSpanCtxs, span.SpanContext()) + } + for _, span := range td { + switch span.Name { + case "batch_processor/export": + // more test below + case "exporter/test_exporter/traces": + continue + default: + t.Error("unexpected span name:", span.Name) + } + assert.Len(t, span.Links, len(callCtxs)) + + var haveSpanCtxs []trace.SpanContext + for _, link := range span.Links { + haveSpanCtxs = append(haveSpanCtxs, link.SpanContext) + } + + assert.ElementsMatch(t, expectSpanCtxs, haveSpanCtxs) + } + + // End the parent spans + for _, span := range endLater { + span.End() + } + + tp.ForceFlush(bg) + td = exp.GetSpans() + + assert.Len(t, td, len(callCtxs)) + for _, span := range td { + switch span.Name { + case "test_context": + default: + t.Error("unexpected span name:", span.Name) + } + assert.NotEmpty(t, span.Links) + } + + require.NoError(t, tp.Shutdown(context.Background())) +} diff --git a/processor/batchprocessor/go.mod b/processor/batchprocessor/go.mod index 3966c3d18bb..f495e6e49b5 100644 --- a/processor/batchprocessor/go.mod +++ b/processor/batchprocessor/go.mod @@ -10,11 +10,13 @@ require ( go.opentelemetry.io/collector/confmap v1.16.0 go.opentelemetry.io/collector/consumer v0.110.0 go.opentelemetry.io/collector/consumer/consumertest v0.110.0 + go.opentelemetry.io/collector/exporter v0.110.0 go.opentelemetry.io/collector/pdata v1.16.0 go.opentelemetry.io/collector/pdata/testdata v0.110.0 go.opentelemetry.io/collector/processor v0.110.0 go.opentelemetry.io/otel v1.30.0 go.opentelemetry.io/otel/metric v1.30.0 + go.opentelemetry.io/otel/sdk v1.30.0 go.opentelemetry.io/otel/sdk/metric v1.30.0 go.opentelemetry.io/otel/trace v1.30.0 go.uber.org/goleak v1.3.0 @@ -22,6 +24,7 @@ require ( ) require ( + github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect @@ -38,12 +41,14 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/collector/component/componentstatus v0.110.0 // indirect + go.opentelemetry.io/collector/config/configretry v1.16.0 // indirect go.opentelemetry.io/collector/consumer/consumerprofiles v0.110.0 // indirect + go.opentelemetry.io/collector/extension v0.110.0 // indirect + go.opentelemetry.io/collector/extension/experimental/storage v0.110.0 // indirect go.opentelemetry.io/collector/internal/globalsignal v0.110.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.110.0 // indirect go.opentelemetry.io/collector/pipeline v0.110.0 // indirect go.opentelemetry.io/collector/processor/processorprofiles v0.110.0 // indirect - go.opentelemetry.io/otel/sdk v1.30.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.28.0 // indirect golang.org/x/sys v0.25.0 // indirect diff --git a/processor/batchprocessor/go.sum b/processor/batchprocessor/go.sum index a66d321512b..e34f7442f53 100644 --- a/processor/batchprocessor/go.sum +++ b/processor/batchprocessor/go.sum @@ -1,3 +1,5 @@ +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -48,6 +50,22 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/collector/component/componentprofiles v0.110.0 h1:YH43aYKPYfnC0TqgI+WlbHsJkTvQPO3ImJybK3oyxQ8= +go.opentelemetry.io/collector/component/componentprofiles v0.110.0/go.mod h1:ZDVFaOhCt6ce2u/HHwqxoV5f+8P2dh0Xut8laYRu4+o= +go.opentelemetry.io/collector/config/configretry v1.16.0 h1:GUjZO3uc35vJyMeccUUY2ozYSD8jLGR2Dt7d0NQN0C8= +go.opentelemetry.io/collector/config/configretry v1.16.0/go.mod h1:KvQF5cfphq1rQm1dKR4eLDNQYw6iI2fY72NMZVa+0N0= +go.opentelemetry.io/collector/exporter v0.110.0 h1:9XIzyk/xlNuSCfwEebJO9uiAlC4hjwhUSZbYv4JAXEE= +go.opentelemetry.io/collector/exporter v0.110.0/go.mod h1:Nr3aSDaak4j8tOCRqp4gUhsYloXwnhZnQ/sz0Qqb+yY= +go.opentelemetry.io/collector/exporter/exporterprofiles v0.110.0 h1:zq3RDDYX7jKTNEJgFbbfAtjeOtMU+doabpZzIyRoRv0= +go.opentelemetry.io/collector/exporter/exporterprofiles v0.110.0/go.mod h1:dUMXYGiNnjaRvD120peFUe6XlJhk8LqbQq2C6sXBkgY= +go.opentelemetry.io/collector/extension v0.110.0 h1:AYFk57W25f7xOo3I6pV0rWNWVtOLZsW+lzFCctnvCkU= +go.opentelemetry.io/collector/extension v0.110.0/go.mod h1:zD/pw9o83SFyn/DCbBdBcH0eUPyGtYgpMSAOqotFYRc= +go.opentelemetry.io/collector/extension/experimental/storage v0.110.0 h1:G1xkNGiBkdSrdhhU5VLE9+y7sZ5fU1/CHps92KSYDLc= +go.opentelemetry.io/collector/extension/experimental/storage v0.110.0/go.mod h1:0XFrIUcbqjsSycNI6Vu7ndMnjSkglMnD2YtUl2ZrzIU= +go.opentelemetry.io/collector/receiver v0.110.0 h1:uv+mCadEpWT7yoRvMil1bY44aZbZ7y4oAqsjvypf+t4= +go.opentelemetry.io/collector/receiver v0.110.0/go.mod h1:rTNskk6R+8bU4dlAB1IgdwkIiBl44+C6qcvdyarAyF0= +go.opentelemetry.io/collector/receiver/receiverprofiles v0.110.0 h1:QDbKYVQFlQJfo05qS8O0zyZghxeGmxlVUKIuIJQST6U= +go.opentelemetry.io/collector/receiver/receiverprofiles v0.110.0/go.mod h1:DsNqyNWfax62zb1y2ek2ERzrEAiaJocSfc+QLtHNnxI= go.opentelemetry.io/otel v1.30.0 h1:F2t8sK4qf1fAmY9ua4ohFS/K+FUuOPemHUIXHtktrts= go.opentelemetry.io/otel v1.30.0/go.mod h1:tFw4Br9b7fOS+uEao81PJjVMjW/5fvNCbpsDIXqP0pc= go.opentelemetry.io/otel/metric v1.30.0 h1:4xNulvn9gjzo4hjg+wzIKG7iNFEaBMX00Qd4QIZs7+w= From 194e027f9edfd40ca84df8bda122cf276f1ea3a0 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Tue, 1 Oct 2024 08:22:16 -0700 Subject: [PATCH 03/14] edit --- processor/batchprocessor/README.md | 2 +- processor/batchprocessor/batch_processor.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/processor/batchprocessor/README.md b/processor/batchprocessor/README.md index a61139ea43a..a3e62855bb1 100644 --- a/processor/batchprocessor/README.md +++ b/processor/batchprocessor/README.md @@ -117,7 +117,7 @@ The number of batch processors currently in use is exported as the ## Tracing support -This processor is traced using a span name `batch_processor/export` +This processor is traced using a span named `batch_processor/export` for each exported batch. If the batch consists of a single parent context, the context will be used and a child span created. 
If the batch consists of multiple parent contexts, then a new root span is diff --git a/processor/batchprocessor/batch_processor.go b/processor/batchprocessor/batch_processor.go index 5f6672d05ed..3ddd2330916 100644 --- a/processor/batchprocessor/batch_processor.go +++ b/processor/batchprocessor/batch_processor.go @@ -112,10 +112,10 @@ type shard struct { totalSent uint64 } -// pendingItem is stored parallel to a pending batch and records -// how many items the waiter submitted, used to ensure the correct -// response count is returned to each waiter. It is also used -// inside sendItems() to represent a partially complete batch. +// pendingItem is stored parallel to a pending batch and records how +// many items the waiter submitted, used to match trace contexts with +// batches. It is also used inside sendItems() to represent a +// partially complete batch. type pendingItem struct { parentCtx context.Context numItems int From 80acd00925535bcde48fc695b3c0193bf69e50bd Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Tue, 1 Oct 2024 11:25:13 -0700 Subject: [PATCH 04/14] test all signals --- .../batchprocessor/batch_processor_test.go | 172 +++++++++++++----- 1 file changed, 122 insertions(+), 50 deletions(-) diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go index 86c1c78a309..c34fb7e2115 100644 --- a/processor/batchprocessor/batch_processor_test.go +++ b/processor/batchprocessor/batch_processor_test.go @@ -1429,13 +1429,99 @@ func TestBatchSplitOnly(t *testing.T) { } } +type ( + testSender func(ctx context.Context, t *testing.T, bp *batchProcessor, wg *sync.WaitGroup, cnt int) + + testCreate func(ctx context.Context, cfg *Config, procSet processor.Settings, exportSet exporter.Settings) *batchProcessor + + testProfile struct { + sender testSender + create testCreate + } +) + +func traceTestProfiles(t *testing.T) map[string]testProfile { + exportOpts := exporterhelper.WithQueue( + exporterhelper.QueueConfig{ + Enabled: false, + }, + ) + + return map[string]testProfile{ + "traces": testProfile{ + sender: func(ctx context.Context, t *testing.T, bp *batchProcessor, wg *sync.WaitGroup, cnt int) { + sendTraces(ctx, t, bp, wg, testdata.GenerateTraces(cnt)) + }, + create: func(ctx context.Context, cfg *Config, procSet processor.Settings, exportSet exporter.Settings) *batchProcessor { + next, err := exporterhelper.NewTracesExporter( + ctx, + exportSet, + *cfg, + func(context.Context, ptrace.Traces) error { return nil }, + exportOpts, + ) + require.NoError(t, err) + bp, err := newBatchTracesProcessor(procSet, next, cfg) + require.NoError(t, err) + return bp + }, + }, + "metrics": testProfile{ + sender: func(ctx context.Context, t *testing.T, bp *batchProcessor, wg *sync.WaitGroup, cnt int) { + // Note that GenerateMetrics creates two points per requested point, + // so we halve the number and are required to use even counts. 
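+				// For example, cnt == 100 becomes GenerateMetrics(50), which
+				// still contributes 100 data points to the batch.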
+ sendMetrics(ctx, t, bp, wg, testdata.GenerateMetrics(cnt/2)) + }, + create: func(ctx context.Context, cfg *Config, procSet processor.Settings, exportSet exporter.Settings) *batchProcessor { + next, err := exporterhelper.NewMetricsExporter( + ctx, + exportSet, + *cfg, + func(context.Context, pmetric.Metrics) error { return nil }, + exportOpts, + ) + require.NoError(t, err) + bp, err := newBatchMetricsProcessor(procSet, next, cfg) + require.NoError(t, err) + return bp + }, + }, + "logs": testProfile{ + sender: func(ctx context.Context, t *testing.T, bp *batchProcessor, wg *sync.WaitGroup, cnt int) { + sendLogs(ctx, t, bp, wg, testdata.GenerateLogs(cnt)) + }, + create: func(ctx context.Context, cfg *Config, procSet processor.Settings, exportSet exporter.Settings) *batchProcessor { + next, err := exporterhelper.NewLogsExporter( + ctx, + exportSet, + *cfg, + func(context.Context, plog.Logs) error { return nil }, + exportOpts, + ) + require.NoError(t, err) + bp, err := newBatchLogsProcessor(procSet, next, cfg) + require.NoError(t, err) + return bp + }, + }, + } +} + func TestBatchProcessorUnbrokenParentContextSingle(t *testing.T) { + for signal, profile := range traceTestProfiles(t) { + t.Run(signal, func(t *testing.T) { + testBatchProcessorUnbrokenParentContextSingle(t, signal, profile) + }) + } +} + +func testBatchProcessorUnbrokenParentContextSingle(t *testing.T, signal string, profile testProfile) { cfg := createDefaultConfig().(*Config) cfg.SendBatchSize = 100 cfg.SendBatchMaxSize = 100 cfg.Timeout = 3 * time.Second requestCount := 10 - spansPerRequest := 5249 + itemsPerRequest := 5248 // Has to be even, see comment re: GenerateMetrics() above exp := tracetest.NewInMemoryExporter() tp := sdktrace.NewTracerProvider( sdktrace.WithBatcher(exp), @@ -1444,36 +1530,23 @@ func TestBatchProcessorUnbrokenParentContextSingle(t *testing.T) { tracer := tp.Tracer("otel") bg, rootSp := tracer.Start(context.Background(), "test_parent") - createSet := exporter.Settings{ + exportSet := exporter.Settings{ ID: component.MustNewID("test_exporter"), TelemetrySettings: componenttest.NewNopTelemetrySettings(), } - createSet.TelemetrySettings.TracerProvider = tp + exportSet.TelemetrySettings.TracerProvider = tp - opt := exporterhelper.WithQueue(exporterhelper.QueueSettings{ - Enabled: false, - }) - next, err := exporterhelper.NewTracesExporter(bg, createSet, Config{}, func(context.Context, ptrace.Traces) error { return nil }, opt) - require.NoError(t, err) + procSet := processortest.NewNopSettings() + procSet.MetricsLevel = configtelemetry.LevelDetailed + procSet.TracerProvider = tp - processorSet := processortest.NewNopSettings() - processorSet.MetricsLevel = configtelemetry.LevelDetailed - processorSet.TracerProvider = tp - bp, err := newBatchTracesProcessor(processorSet, next, cfg) - require.NoError(t, err) + bp := profile.create(bg, cfg, procSet, exportSet) require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost())) - sentResourceSpans := ptrace.NewTraces().ResourceSpans() var wg sync.WaitGroup for requestNum := 0; requestNum < requestCount; requestNum++ { - td := testdata.GenerateTraces(spansPerRequest) - spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans() - for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ { - spans.At(spanIndex).SetName(getTestSpanName(requestNum, spanIndex)) - } - td.ResourceSpans().At(0).CopyTo(sentResourceSpans.AppendEmpty()) - sendTraces(bg, t, bp, &wg, td) + profile.sender(bg, t, bp, &wg, itemsPerRequest) } wg.Wait() rootSp.End() @@ -1484,14 
+1557,15 @@ func TestBatchProcessorUnbrokenParentContextSingle(t *testing.T) { // need to flush tracerprovider tp.ForceFlush(bg) - td := exp.GetSpans() - numBatches := float64(spansPerRequest*requestCount) / float64(cfg.SendBatchMaxSize) - assert.Len(t, td, 2*int(math.Ceil(numBatches))+1) - for _, span := range td { + + traces := exp.GetSpans() + numBatches := float64(itemsPerRequest*requestCount) / float64(cfg.SendBatchMaxSize) + assert.Len(t, traces, 2*int(math.Ceil(numBatches))+1) + for _, span := range traces { switch span.Name { case "batch_processor/export": // more test below - case "exporter/test_exporter/traces": + case "exporter/test_exporter/" + signal: continue case "test_parent": continue @@ -1506,13 +1580,22 @@ func TestBatchProcessorUnbrokenParentContextSingle(t *testing.T) { } func TestBatchProcessorUnbrokenParentContextMultiple(t *testing.T) { + for signal, profile := range traceTestProfiles(t) { + t.Run(signal, func(t *testing.T) { + testBatchProcessorUnbrokenParentContextMultiple(t, signal, profile) + }) + } +} + +func testBatchProcessorUnbrokenParentContextMultiple(t *testing.T, signal string, profile testProfile) { cfg := createDefaultConfig().(*Config) cfg.SendBatchSize = 100 cfg.SendBatchMaxSize = 100 cfg.Timeout = 3 * time.Second requestCount := 50 - // keep spansPerRequest small to ensure multiple contexts end up in the same batch. - spansPerRequest := 5 + // keep itemsPerRequest small to ensure multiple contexts end up in the same batch. + // this number has to be even, see comment on GenerateMetrics above. + itemsPerRequest := 6 exp := tracetest.NewInMemoryExporter() tp := sdktrace.NewTracerProvider( sdktrace.WithBatcher(exp), @@ -1521,22 +1604,18 @@ func TestBatchProcessorUnbrokenParentContextMultiple(t *testing.T) { tracer := tp.Tracer("otel") bg := context.Background() - createSet := exporter.Settings{ + exportSet := exporter.Settings{ ID: component.MustNewID("test_exporter"), TelemetrySettings: componenttest.NewNopTelemetrySettings(), } - createSet.TelemetrySettings.TracerProvider = tp - opt := exporterhelper.WithQueue(exporterhelper.QueueSettings{ - Enabled: false, - }) - next, err := exporterhelper.NewTracesExporter(bg, createSet, Config{}, func(context.Context, ptrace.Traces) error { return nil }, opt) - require.NoError(t, err) + exportSet.TelemetrySettings.TracerProvider = tp + + procSet := processortest.NewNopSettings() + procSet.MetricsLevel = configtelemetry.LevelDetailed + procSet.TracerProvider = tp + + bp := profile.create(bg, cfg, procSet, exportSet) - processorSet := processortest.NewNopSettings() - processorSet.MetricsLevel = configtelemetry.LevelDetailed - processorSet.TracerProvider = tp - bp, err := newBatchTracesProcessor(processorSet, next, cfg) - require.NoError(t, err) require.NoError(t, bp.Start(bg, componenttest.NewNopHost())) var endLater []trace.Span @@ -1551,17 +1630,10 @@ func TestBatchProcessorUnbrokenParentContextMultiple(t *testing.T) { mkCtx(), } - sentResourceSpans := ptrace.NewTraces().ResourceSpans() var wg sync.WaitGroup for requestNum := 0; requestNum < requestCount; requestNum++ { num := requestNum % len(callCtxs) - td := testdata.GenerateTraces(spansPerRequest) - spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans() - for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ { - spans.At(spanIndex).SetName(getTestSpanName(requestNum, spanIndex)) - } - td.ResourceSpans().At(0).CopyTo(sentResourceSpans.AppendEmpty()) - sendTraces(callCtxs[num], t, bp, &wg, td) + profile.sender(callCtxs[num], t, bp, &wg, 
itemsPerRequest) } wg.Wait() @@ -1575,7 +1647,7 @@ func TestBatchProcessorUnbrokenParentContextMultiple(t *testing.T) { exp.Reset() // Expect 2 spans per batch, one exporter and one batch processor. - numBatches := float64(spansPerRequest*requestCount) / float64(cfg.SendBatchMaxSize) + numBatches := float64(itemsPerRequest*requestCount) / float64(cfg.SendBatchMaxSize) assert.Len(t, td, 2*int(math.Ceil(numBatches))) var expectSpanCtxs []trace.SpanContext @@ -1586,7 +1658,7 @@ func TestBatchProcessorUnbrokenParentContextMultiple(t *testing.T) { switch span.Name { case "batch_processor/export": // more test below - case "exporter/test_exporter/traces": + case "exporter/test_exporter/" + signal: continue default: t.Error("unexpected span name:", span.Name) From 5cdd3daccf41cbea774177275b98fd577a75261e Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Tue, 1 Oct 2024 11:29:47 -0700 Subject: [PATCH 05/14] Update processor/batchprocessor/batch_processor.go Co-authored-by: Bogdan Drutu --- processor/batchprocessor/batch_processor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/batchprocessor/batch_processor.go b/processor/batchprocessor/batch_processor.go index 3ddd2330916..8effb6c5574 100644 --- a/processor/batchprocessor/batch_processor.go +++ b/processor/batchprocessor/batch_processor.go @@ -168,7 +168,7 @@ func newBatchProcessor(set processor.Settings, cfg *Config, batchFunc func() bat shutdownC: make(chan struct{}, 1), metadataKeys: mks, metadataLimit: int(cfg.MetadataCardinalityLimit), - tracer: set.TelemetrySettings.TracerProvider.Tracer(metadata.ScopeName), + tracer: metadata.Tracer(set.TelemetrySettings), } if len(bp.metadataKeys) == 0 { bp.batcher = &singleShardBatcher{ From 2488de6a23234e7c25d36b5c73898270875dff1f Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Tue, 1 Oct 2024 12:37:06 -0700 Subject: [PATCH 06/14] simplified --- processor/batchprocessor/batch_processor.go | 35 +++++-------------- .../batchprocessor/batch_processor_test.go | 19 +++++++--- 2 files changed, 24 insertions(+), 30 deletions(-) diff --git a/processor/batchprocessor/batch_processor.go b/processor/batchprocessor/batch_processor.go index 8effb6c5574..c748b488518 100644 --- a/processor/batchprocessor/batch_processor.go +++ b/processor/batchprocessor/batch_processor.go @@ -355,12 +355,11 @@ func (b *shard) sendItems(trigger trigger) { var parentSpan trace.Span var parent context.Context - isSingleCtx := allSameContext(thisBatch) // If incoming requests are sufficiently large, there // will be one context, in which case no need to create a new // root span. - if isSingleCtx { + if len(thisBatch) == 1 { parent = thisBatch[0].parentCtx parent, parentSpan = b.processor.tracer.Start(parent, "batch_processor/export") } else { @@ -372,11 +371,14 @@ func (b *shard) sendItems(trigger trigger) { } parent, parentSpan = b.processor.tracer.Start(b.exportCtx, "batch_processor/export", trace.WithLinks(links...)) - // Note: linking in the opposite direction. - // This could be inferred by the trace - // backend, but this adds helpful information - // in cases where sampling may break links. - // See https://github.com/open-telemetry/opentelemetry-specification/issues/1877 + // Note: linking in the opposite direction. This + // could be inferred by the trace backend, but this + // adds helpful information in cases where sampling + // may break links. 
See + // https://github.com/open-telemetry/opentelemetry-specification/issues/1877 + // Note that there is a possibility that the span has + // already ended (if EarlyReturn or context canceled), + // in which case this becomes a no-op. for _, span := range spans { span.AddLink(trace.Link{SpanContext: parentSpan.SpanContext()}) } @@ -401,32 +403,13 @@ func (b *shard) sendItems(trigger trigger) { // the corresponding set of active span objects. func parentSpans(x []pendingItem) []trace.Span { var spans []trace.Span - unique := make(map[context.Context]bool) for i := range x { - _, ok := unique[x[i].parentCtx] - if ok { - continue - } - - unique[x[i].parentCtx] = true - spans = append(spans, trace.SpanFromContext(x[i].parentCtx)) } return spans } -// allSameContext is a helper function to check if a slice of contexts -// contains more than one unique context. -func allSameContext(x []pendingItem) bool { - for idx := range x[1:] { - if x[idx].parentCtx != x[0].parentCtx { - return false - } - } - return true -} - func (b *shard) consumeBatch(ctx context.Context, data any) error { var itemCount int switch telem := data.(type) { diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go index c34fb7e2115..543bd509d34 100644 --- a/processor/batchprocessor/batch_processor_test.go +++ b/processor/batchprocessor/batch_processor_test.go @@ -1520,8 +1520,10 @@ func testBatchProcessorUnbrokenParentContextSingle(t *testing.T, signal string, cfg.SendBatchSize = 100 cfg.SendBatchMaxSize = 100 cfg.Timeout = 3 * time.Second + // Make itemsPerRequest match the batch size and it means single-batch exports, + // which avoid creating a new root span. requestCount := 10 - itemsPerRequest := 5248 // Has to be even, see comment re: GenerateMetrics() above + itemsPerRequest := 100 exp := tracetest.NewInMemoryExporter() tp := sdktrace.NewTracerProvider( sdktrace.WithBatcher(exp), @@ -1594,8 +1596,9 @@ func testBatchProcessorUnbrokenParentContextMultiple(t *testing.T, signal string cfg.Timeout = 3 * time.Second requestCount := 50 // keep itemsPerRequest small to ensure multiple contexts end up in the same batch. - // this number has to be even, see comment on GenerateMetrics above. - itemsPerRequest := 6 + // this number has to be even, see comment on GenerateMetrics above. Has to evenly + // divide batch size for the test logic below. 
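+	// With 50 requests of 10 items each, the 500 items form exactly five
+	// 100-item batches, so every batch mixes items from all three rotating
+	// parent contexts.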
+ itemsPerRequest := 10 exp := tracetest.NewInMemoryExporter() tp := sdktrace.NewTracerProvider( sdktrace.WithBatcher(exp), @@ -1654,6 +1657,7 @@ func testBatchProcessorUnbrokenParentContextMultiple(t *testing.T, signal string for _, span := range endLater { expectSpanCtxs = append(expectSpanCtxs, span.SpanContext()) } + linksTotal := 0 for _, span := range td { switch span.Name { case "batch_processor/export": @@ -1663,16 +1667,23 @@ func testBatchProcessorUnbrokenParentContextMultiple(t *testing.T, signal string default: t.Error("unexpected span name:", span.Name) } - assert.Len(t, span.Links, len(callCtxs)) + linksTotal += len(span.Links) var haveSpanCtxs []trace.SpanContext + uniqSpanCtxs := make(map[trace.SpanID]struct{}) for _, link := range span.Links { + if _, ok := uniqSpanCtxs[link.SpanContext.SpanID()]; ok { + continue + } + uniqSpanCtxs[link.SpanContext.SpanID()] = struct{}{} haveSpanCtxs = append(haveSpanCtxs, link.SpanContext) } assert.ElementsMatch(t, expectSpanCtxs, haveSpanCtxs) } + assert.Equal(t, requestCount, linksTotal) + // End the parent spans for _, span := range endLater { span.End() From b12c78ae557f3048020d6ddd19261f631189b21f Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Tue, 1 Oct 2024 12:52:14 -0700 Subject: [PATCH 07/14] one more simplification --- processor/batchprocessor/batch_processor.go | 62 +++++++++------------ 1 file changed, 26 insertions(+), 36 deletions(-) diff --git a/processor/batchprocessor/batch_processor.go b/processor/batchprocessor/batch_processor.go index c748b488518..d057dfdbfb1 100644 --- a/processor/batchprocessor/batch_processor.go +++ b/processor/batchprocessor/batch_processor.go @@ -106,10 +106,6 @@ type shard struct { // pending describes the contributors to the current batch. pending []pendingItem - - // totalSent counts the number of items processed by the - // shard in its lifetime. - totalSent uint64 } // pendingItem is stored parallel to a pending batch and records how @@ -140,7 +136,7 @@ type batch interface { itemCount() int // add item to the current batch - add(item any) + add(item any) int // sizeBytes counts the OTLP encoding size of the batch sizeBytes(item any) int @@ -271,11 +267,8 @@ func (b *shard) startLoop() { } func (b *shard) processItem(item dataItem) { - before := b.batch.itemCount() - b.batch.add(item.data) - after := b.batch.itemCount() + totalItems := b.batch.add(item.data) - totalItems := after - before b.pending = append(b.pending, pendingItem{ parentCtx: item.parentCtx, numItems: totalItems, @@ -318,25 +311,25 @@ func (b *shard) sendItems(trigger trigger) { var thisBatch []pendingItem - numItemsBefore := b.totalSent - numItemsAfter := b.totalSent + uint64(sent) + // numToSend equals the number of points being sent not yet + // incorporated into thisBatch. It is decremented as the pending + // items are assembled. + numToSend := sent // The current batch can contain items from several different // producers. Update pending to correctly track contexts // included in the current batch. 
- for len(b.pending) > 0 && numItemsBefore < numItemsAfter { - if numItemsBefore+uint64(b.pending[0].numItems) > numItemsAfter { + for len(b.pending) > 0 && numToSend > 0 { + if b.pending[0].numItems > numToSend { // Waiter only had some items in the current batch - partialSent := int(numItemsAfter - numItemsBefore) - numItemsBefore = numItemsAfter - b.pending[0].numItems -= partialSent + b.pending[0].numItems -= numToSend thisBatch = append(thisBatch, pendingItem{ - numItems: partialSent, + numItems: numToSend, parentCtx: b.pending[0].parentCtx, }) } else { // This item will be completely processed. - numItemsBefore += uint64(b.pending[0].numItems) + numToSend -= b.pending[0].numItems thisBatch = append(thisBatch, pendingItem{ numItems: b.pending[0].numItems, parentCtx: b.pending[0].parentCtx, @@ -349,8 +342,6 @@ func (b *shard) sendItems(trigger trigger) { } } - b.totalSent = numItemsAfter - var err error var parentSpan trace.Span @@ -563,15 +554,14 @@ func newBatchTraces(nextConsumer consumer.Traces) *batchTraces { } // add updates current batchTraces by adding new TraceData object -func (bt *batchTraces) add(item any) { +func (bt *batchTraces) add(item any) int { td := item.(ptrace.Traces) newSpanCount := td.SpanCount() - if newSpanCount == 0 { - return + if newSpanCount != 0 { + bt.spanCount += newSpanCount + td.ResourceSpans().MoveAndAppendTo(bt.traceData.ResourceSpans()) } - - bt.spanCount += newSpanCount - td.ResourceSpans().MoveAndAppendTo(bt.traceData.ResourceSpans()) + return newSpanCount } func (bt *batchTraces) sizeBytes(data any) int { @@ -644,15 +634,15 @@ func (bm *batchMetrics) itemCount() int { return bm.dataPointCount } -func (bm *batchMetrics) add(item any) { +func (bm *batchMetrics) add(item any) int { md := item.(pmetric.Metrics) newDataPointCount := md.DataPointCount() - if newDataPointCount == 0 { - return + if newDataPointCount != 0 { + bm.dataPointCount += newDataPointCount + md.ResourceMetrics().MoveAndAppendTo(bm.metricData.ResourceMetrics()) } - bm.dataPointCount += newDataPointCount - md.ResourceMetrics().MoveAndAppendTo(bm.metricData.ResourceMetrics()) + return newDataPointCount } type batchLogs struct { @@ -696,13 +686,13 @@ func (bl *batchLogs) itemCount() int { return bl.logCount } -func (bl *batchLogs) add(item any) { +func (bl *batchLogs) add(item any) int { ld := item.(plog.Logs) newLogsCount := ld.LogRecordCount() - if newLogsCount == 0 { - return + if newLogsCount != 0 { + bl.logCount += newLogsCount + ld.ResourceLogs().MoveAndAppendTo(bl.logData.ResourceLogs()) } - bl.logCount += newLogsCount - ld.ResourceLogs().MoveAndAppendTo(bl.logData.ResourceLogs()) + return newLogsCount } From e7fe96d9bc9af1a2a498334f30a1742f8ca40e29 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Tue, 1 Oct 2024 12:53:33 -0700 Subject: [PATCH 08/14] lint --- processor/batchprocessor/batch_processor_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go index 543bd509d34..ae16e6200f1 100644 --- a/processor/batchprocessor/batch_processor_test.go +++ b/processor/batchprocessor/batch_processor_test.go @@ -1448,7 +1448,7 @@ func traceTestProfiles(t *testing.T) map[string]testProfile { ) return map[string]testProfile{ - "traces": testProfile{ + "traces": { sender: func(ctx context.Context, t *testing.T, bp *batchProcessor, wg *sync.WaitGroup, cnt int) { sendTraces(ctx, t, bp, wg, testdata.GenerateTraces(cnt)) }, @@ -1466,7 +1466,7 @@ func 
traceTestProfiles(t *testing.T) map[string]testProfile { return bp }, }, - "metrics": testProfile{ + "metrics": { sender: func(ctx context.Context, t *testing.T, bp *batchProcessor, wg *sync.WaitGroup, cnt int) { // Note that GenerateMetrics creates two points per requested point, // so we halve the number and are required to use even counts. @@ -1486,7 +1486,7 @@ func traceTestProfiles(t *testing.T) map[string]testProfile { return bp }, }, - "logs": testProfile{ + "logs": { sender: func(ctx context.Context, t *testing.T, bp *batchProcessor, wg *sync.WaitGroup, cnt int) { sendLogs(ctx, t, bp, wg, testdata.GenerateLogs(cnt)) }, From 7d2594678e2330de0f2adf597e7de399180b0b65 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Tue, 1 Oct 2024 12:57:57 -0700 Subject: [PATCH 09/14] crosslink --- processor/batchprocessor/go.mod | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/processor/batchprocessor/go.mod b/processor/batchprocessor/go.mod index f495e6e49b5..b2468d2a547 100644 --- a/processor/batchprocessor/go.mod +++ b/processor/batchprocessor/go.mod @@ -93,3 +93,17 @@ replace go.opentelemetry.io/collector/processor/processorprofiles => ../processo replace go.opentelemetry.io/collector/pipeline => ../../pipeline replace go.opentelemetry.io/collector/internal/globalsignal => ../../internal/globalsignal + +replace go.opentelemetry.io/collector/extension => ../../extension + +replace go.opentelemetry.io/collector/config/configretry => ../../config/configretry + +replace go.opentelemetry.io/collector/exporter/exporterprofiles => ../../exporter/exporterprofiles + +replace go.opentelemetry.io/collector/extension/experimental/storage => ../../extension/experimental/storage + +replace go.opentelemetry.io/collector/receiver/receiverprofiles => ../../receiver/receiverprofiles + +replace go.opentelemetry.io/collector/receiver => ../../receiver + +replace go.opentelemetry.io/collector/exporter => ../../exporter From ff78f2292e74b18ab4537702d5f220d0944169bb Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Tue, 1 Oct 2024 13:06:47 -0700 Subject: [PATCH 10/14] whitespace --- processor/batchprocessor/batch_processor_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go index ae16e6200f1..bbe3cc06924 100644 --- a/processor/batchprocessor/batch_processor_test.go +++ b/processor/batchprocessor/batch_processor_test.go @@ -13,7 +13,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric/metricdata" From ce923b46ba15528aa2fb02080b0ba081e08e6c40 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Tue, 1 Oct 2024 13:07:25 -0700 Subject: [PATCH 11/14] gosum --- processor/batchprocessor/go.sum | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/processor/batchprocessor/go.sum b/processor/batchprocessor/go.sum index e34f7442f53..5df87556447 100644 --- a/processor/batchprocessor/go.sum +++ b/processor/batchprocessor/go.sum @@ -50,22 +50,6 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.opentelemetry.io/collector/component/componentprofiles v0.110.0 
h1:YH43aYKPYfnC0TqgI+WlbHsJkTvQPO3ImJybK3oyxQ8= -go.opentelemetry.io/collector/component/componentprofiles v0.110.0/go.mod h1:ZDVFaOhCt6ce2u/HHwqxoV5f+8P2dh0Xut8laYRu4+o= -go.opentelemetry.io/collector/config/configretry v1.16.0 h1:GUjZO3uc35vJyMeccUUY2ozYSD8jLGR2Dt7d0NQN0C8= -go.opentelemetry.io/collector/config/configretry v1.16.0/go.mod h1:KvQF5cfphq1rQm1dKR4eLDNQYw6iI2fY72NMZVa+0N0= -go.opentelemetry.io/collector/exporter v0.110.0 h1:9XIzyk/xlNuSCfwEebJO9uiAlC4hjwhUSZbYv4JAXEE= -go.opentelemetry.io/collector/exporter v0.110.0/go.mod h1:Nr3aSDaak4j8tOCRqp4gUhsYloXwnhZnQ/sz0Qqb+yY= -go.opentelemetry.io/collector/exporter/exporterprofiles v0.110.0 h1:zq3RDDYX7jKTNEJgFbbfAtjeOtMU+doabpZzIyRoRv0= -go.opentelemetry.io/collector/exporter/exporterprofiles v0.110.0/go.mod h1:dUMXYGiNnjaRvD120peFUe6XlJhk8LqbQq2C6sXBkgY= -go.opentelemetry.io/collector/extension v0.110.0 h1:AYFk57W25f7xOo3I6pV0rWNWVtOLZsW+lzFCctnvCkU= -go.opentelemetry.io/collector/extension v0.110.0/go.mod h1:zD/pw9o83SFyn/DCbBdBcH0eUPyGtYgpMSAOqotFYRc= -go.opentelemetry.io/collector/extension/experimental/storage v0.110.0 h1:G1xkNGiBkdSrdhhU5VLE9+y7sZ5fU1/CHps92KSYDLc= -go.opentelemetry.io/collector/extension/experimental/storage v0.110.0/go.mod h1:0XFrIUcbqjsSycNI6Vu7ndMnjSkglMnD2YtUl2ZrzIU= -go.opentelemetry.io/collector/receiver v0.110.0 h1:uv+mCadEpWT7yoRvMil1bY44aZbZ7y4oAqsjvypf+t4= -go.opentelemetry.io/collector/receiver v0.110.0/go.mod h1:rTNskk6R+8bU4dlAB1IgdwkIiBl44+C6qcvdyarAyF0= -go.opentelemetry.io/collector/receiver/receiverprofiles v0.110.0 h1:QDbKYVQFlQJfo05qS8O0zyZghxeGmxlVUKIuIJQST6U= -go.opentelemetry.io/collector/receiver/receiverprofiles v0.110.0/go.mod h1:DsNqyNWfax62zb1y2ek2ERzrEAiaJocSfc+QLtHNnxI= go.opentelemetry.io/otel v1.30.0 h1:F2t8sK4qf1fAmY9ua4ohFS/K+FUuOPemHUIXHtktrts= go.opentelemetry.io/otel v1.30.0/go.mod h1:tFw4Br9b7fOS+uEao81PJjVMjW/5fvNCbpsDIXqP0pc= go.opentelemetry.io/otel/metric v1.30.0 h1:4xNulvn9gjzo4hjg+wzIKG7iNFEaBMX00Qd4QIZs7+w= From 27cf98010822460d98effc205c7f240ed11c4aa4 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Wed, 2 Oct 2024 12:32:50 -0700 Subject: [PATCH 12/14] use fewer slices --- processor/batchprocessor/batch_processor.go | 150 +++++++++----------- 1 file changed, 67 insertions(+), 83 deletions(-) diff --git a/processor/batchprocessor/batch_processor.go b/processor/batchprocessor/batch_processor.go index d057dfdbfb1..5a316b8dcc4 100644 --- a/processor/batchprocessor/batch_processor.go +++ b/processor/batchprocessor/batch_processor.go @@ -120,8 +120,8 @@ type pendingItem struct { // dataItem is exchanged between the waiter and the batching process // includes the pendingItem and its data. type dataItem struct { - data any - pendingItem + data any + parentCtx context.Context } // batch is an interface generalizing the individual signal types. @@ -256,6 +256,8 @@ func (b *shard) startLoop() { if item.data == nil { continue } + // Important invariant. processItem() must return with the pending + // number of items less than the minimum batch size. b.processItem(item) case <-timerCh: if b.batch.itemCount() > 0 { @@ -274,6 +276,8 @@ func (b *shard) processItem(item dataItem) { numItems: totalItems, }) + // The call to flushItems() is necessary to maintain the invariant that + // after this call returns, the pending data is less than a full batch. 
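+	// For example, with sendBatchSize == sendBatchMaxSize == 100 and the
+	// flush timer running: if 90 items are pending and a producer submits
+	// 35 more, flushItems() exports one 100-item batch and leaves 25 items
+	// pending, restoring the invariant.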
	b.flushItems()
 }
 
@@ -307,82 +311,76 @@ func (b *shard) resetTimer() {
 }
 
 func (b *shard) sendItems(trigger trigger) {
+	// Note: because of the invariant stated for processItem(), we know the
+	// number of current waiters exceeds the batch by at most one entry.
+	// Therefore, we can enumerate the possibilities.
+	//
+	// 1. len(b.pending) == 1 where the item count of element[0] is <= batch size
+	// 2. len(b.pending) == 1 where the item count of element[0] is > batch size
+	// 3. len(b.pending) == N where N>1 and the item count in elements[0:N-1] is < batch size
+	//
+	// Importantly, in all cases, the batch will include a portion
+	// of data from all contexts in the pending slice. In case 2
+	// there is always a single residual pendingItem. In case 3 there
+	// may or may not be a single residual pendingItem.
+
+	pendingSize := b.batch.itemCount()
 	sent, req := b.batch.splitBatch(b.exportCtx, b.processor.sendBatchMaxSize)
 
-	var thisBatch []pendingItem
+	var rootCtx context.Context
 
-	// numToSend equals the number of points being sent not yet
-	// incorporated into thisBatch. It is decremented as the pending
-	// items are assembled.
-	numToSend := sent
+	// If the portion being sent belongs to the first item in the pending list,
+	// then there is a single context we can use.
+	if sent <= b.pending[0].numItems {
+		rootCtx = b.pending[0].parentCtx
+	} else {
+		rootCtx = b.exportCtx
+	}
+	ctx, parent := b.processor.tracer.Start(rootCtx, "batch_processor/export")
+	defer parent.End()
+
+	// Note: linking spans in both directions. This could be
+	// inferred by the trace backend, but this adds helpful
+	// information in cases where sampling may break links. See
+	// https://github.com/open-telemetry/opentelemetry-specification/issues/1877
+	// Note that there is a possibility that the span has already
+	// ended (if EarlyReturn or context canceled), in which case
+	// this becomes a no-op.
+	parentSpanContext := parent.SpanContext()
+	for _, pending := range b.pending {
+		span := trace.SpanFromContext(pending.parentCtx)
+		span.AddLink(trace.Link{SpanContext: parentSpanContext})
+		parent.AddLink(trace.Link{SpanContext: span.SpanContext()})
+	}
 
-	// The current batch can contain items from several different
-	// producers. Update pending to correctly track contexts
-	// included in the current batch.
-	for len(b.pending) > 0 && numToSend > 0 {
-		if b.pending[0].numItems > numToSend {
-			// Waiter only had some items in the current batch
-			b.pending[0].numItems -= numToSend
-			thisBatch = append(thisBatch, pendingItem{
-				numItems:  numToSend,
-				parentCtx: b.pending[0].parentCtx,
-			})
-		} else {
-			// This item will be completely processed.
-			numToSend -= b.pending[0].numItems
-			thisBatch = append(thisBatch, pendingItem{
-				numItems:  b.pending[0].numItems,
-				parentCtx: b.pending[0].parentCtx,
-			})
-
-			// Shift the pending array, to allow it to be re-used.
-			copy(b.pending[0:len(b.pending)-1], b.pending[1:])
-			b.pending[len(b.pending)-1] = pendingItem{}
-			b.pending = b.pending[:len(b.pending)-1]
-		}
-	}
+	err := b.batch.export(ctx, req)
 
-	var err error
+	// Remember the last pending parent context, we may need it.
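+	// (When pendingSize > sent, the final waiter was split across the batch
+	// boundary and its context becomes b.pending[0].parentCtx below.)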
+	finalParent := b.pending[len(b.pending)-1].parentCtx
+	// Clear the array to permit GC of finished contexts while
+	// re-using the slice instead of a new allocation.
+	for i := range b.pending {
+		b.pending[i] = pendingItem{}
+	}
+	// There is either one or zero items left in the pending slice,
+	// according to the invariant discussed above.
+	if pendingSize == sent {
+		// The batch is fully sent, pending slice is clear.
+		b.pending = b.pending[:0]
+	} else {
+		// The batch was only partially sent; the pending slice keeps one item.
+		b.pending = b.pending[:1]
+		b.pending[0].parentCtx = finalParent
+		b.pending[0].numItems = pendingSize - sent
+	}
 
-	if err = b.batch.export(parent, req); err != nil {
+	if err != nil {
 		b.processor.logger.Warn("Sender failed", zap.Error(err))
+		// TODO: it seems incorrect not to call telemetry.record()
+		// for errors. Yes?
 		return
 	}
+
 	var bytes int
 	if b.processor.telemetry.detailed {
 		bytes = b.batch.sizeBytes(req)
 	}
 	b.processor.telemetry.record(trigger, int64(sent), int64(bytes))
 }
 
-// parentSpans computes the set of distinct span contexts and returns
-// the corresponding set of active span objects.
-func parentSpans(x []pendingItem) []trace.Span { - var spans []trace.Span - for i := range x { - spans = append(spans, trace.SpanFromContext(x[i].parentCtx)) - } - - return spans -} - func (b *shard) consumeBatch(ctx context.Context, data any) error { var itemCount int switch telem := data.(type) { @@ -417,11 +404,8 @@ func (b *shard) consumeBatch(ctx context.Context, data any) error { } item := dataItem{ - data: data, - pendingItem: pendingItem{ - parentCtx: ctx, - numItems: itemCount, - }, + data: data, + parentCtx: ctx, } b.newItem <- item From 8c2c5eb442f6f24bd52e377121013a86d57cb9de Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Thu, 17 Oct 2024 15:11:51 -0700 Subject: [PATCH 13/14] crosslink again --- processor/batchprocessor/go.mod | 2 -- 1 file changed, 2 deletions(-) diff --git a/processor/batchprocessor/go.mod b/processor/batchprocessor/go.mod index f40198df66c..14615763397 100644 --- a/processor/batchprocessor/go.mod +++ b/processor/batchprocessor/go.mod @@ -92,8 +92,6 @@ replace go.opentelemetry.io/collector/processor/processorprofiles => ../processo replace go.opentelemetry.io/collector/pipeline => ../../pipeline -replace go.opentelemetry.io/collector/internal/globalsignal => ../../internal/globalsignal - replace go.opentelemetry.io/collector/extension => ../../extension replace go.opentelemetry.io/collector/config/configretry => ../../config/configretry From e52747a5f3bdb45e3e866d416a67aaf4e34429db Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Fri, 18 Oct 2024 11:17:40 -0700 Subject: [PATCH 14/14] lose the TODO --- processor/batchprocessor/batch_processor.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/processor/batchprocessor/batch_processor.go b/processor/batchprocessor/batch_processor.go index 3015de7710a..ecf8d7f00e5 100644 --- a/processor/batchprocessor/batch_processor.go +++ b/processor/batchprocessor/batch_processor.go @@ -376,8 +376,6 @@ func (b *shard) sendItems(trigger trigger) { if err != nil { b.processor.logger.Warn("Sender failed", zap.Error(err)) - // TODO: it seems incorrect not to call telemetry.record() - // for errors. Yes? return }
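Taken together, the series produces the following span topology for a batch that mixes multiple parent contexts (a summary sketch of the behavior exercised by TestBatchProcessorUnbrokenParentContextMultiple; span names match the code above):

// test_context (producer A) --link--> batch_processor/export
// test_context (producer B) --link--> batch_processor/export
// batch_processor/export    --link--> each contributing test_context span
// batch_processor/export    --child-> exporter/test_exporter/<signal>
//
// When a batch has a single parent context, batch_processor/export is also
// started as a direct child of that producer's span rather than as a new
// root, while the reciprocal links are still recorded.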