(batchprocessor): Add tracing support #11324

Closed · wants to merge 19 commits
25 changes: 25 additions & 0 deletions .chloggen/batch-tracing.yaml
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: batchprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add tracing instrumentation to the batch processor component.

# One or more tracking issues or pull requests related to the change
issues: [11308]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
9 changes: 9 additions & 0 deletions processor/batchprocessor/README.md
@@ -114,3 +114,12 @@ metadata-key values.

The number of batch processors currently in use is exported as the
`otelcol_processor_batch_metadata_cardinality` metric.

## Tracing support

This processor creates a span named `batch_processor/export` for each
exported batch. If the batch contains data from a single parent
context, that context is used and a child span is created. If the
batch contains data from multiple parent contexts, a new root span is
created and span links are added in both directions to maintain
connectivity between the involved traces.
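The behavior above can be sketched with the OpenTelemetry Go API calls the new code relies on (`Tracer.Start`, `trace.SpanFromContext`, `Span.AddLink`). The helper below is illustrative only, not part of the processor; its name and signature are invented for the example:

```go
package batchtracing

import (
	"context"

	"go.opentelemetry.io/otel/trace"
)

// startExportSpan is a hypothetical helper illustrating the README behavior.
// Given the parent contexts that contributed to a batch, it either starts a
// child span (single parent) or starts a new root span and links it in both
// directions with every contributing span (multiple parents).
func startExportSpan(tracer trace.Tracer, parents []context.Context) (context.Context, trace.Span) {
	if len(parents) == 1 {
		// Single contributor: reuse its context, so the export span
		// becomes a child of the producer's span.
		return tracer.Start(parents[0], "batch_processor/export")
	}

	// Multiple contributors: start a new root span, then link it with each
	// contributing span so that sampling on either side still leaves a
	// navigable connection between the traces.
	ctx, export := tracer.Start(context.Background(), "batch_processor/export")
	for _, parent := range parents {
		producer := trace.SpanFromContext(parent)
		producer.AddLink(trace.Link{SpanContext: export.SpanContext()})
		export.AddLink(trace.Link{SpanContext: producer.SpanContext()})
	}
	return ctx, export
}
```

In the actual change, `sendItems` makes this decision based on the shard's pending list and records links for every contributing context, including the single-parent case.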
177 changes: 149 additions & 28 deletions processor/batchprocessor/batch_processor.go
@@ -14,6 +14,7 @@ import (
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"go.opentelemetry.io/collector/client"
@@ -24,6 +25,7 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/batchprocessor/internal/metadata"
)

// errTooManyBatchers is returned when the MetadataCardinalityLimit has been reached.
@@ -63,6 +65,9 @@ type batchProcessor struct {

// batcher will be either *singletonBatcher or *multiBatcher
batcher batcher

// tracer is the configured tracer
tracer trace.Tracer
}

// batcher describes a *singletonBatcher or *multiBatcher.
@@ -93,11 +98,30 @@ type shard struct {
timer *time.Timer

// newItem is used to receive data items from producers.
newItem chan any
newItem chan dataItem

// batch is an in-flight data item containing one of the
// underlying data types.
batch batch

// pending describes the contributors to the current batch.
pending []pendingItem
}

// pendingItem is stored parallel to a pending batch and records how
// many items the waiter submitted, used to match trace contexts with
// batches. It is also used inside sendItems() to represent a
// partially complete batch.
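// For example (illustrative): if one producer submits 3 spans and a
// second submits 5 spans before the shard flushes, the pending slice
// holds two entries, one per parent context, with numItems of 3 and 5
// respectively.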
type pendingItem struct {
parentCtx context.Context
numItems int
}

// dataItem is exchanged between the waiter and the batching process;
// it carries the item's data and the waiter's parent context.
type dataItem struct {
data any
parentCtx context.Context
}

// batch is an interface generalizing the individual signal types.
@@ -112,7 +136,7 @@ type batch interface {
itemCount() int

// add item to the current batch, returning the number of items added
add(item any)
add(item any) int

// sizeBytes counts the OTLP encoding size of the batch
sizeBytes(item any) int
@@ -140,6 +164,7 @@ func newBatchProcessor(set processor.Settings, cfg *Config, batchFunc func() bat
shutdownC: make(chan struct{}, 1),
metadataKeys: mks,
metadataLimit: int(cfg.MetadataCardinalityLimit),
tracer: metadata.Tracer(set.TelemetrySettings),
}
if len(bp.metadataKeys) == 0 {
bp.batcher = &singleShardBatcher{
@@ -168,7 +193,7 @@ func (bp *batchProcessor) newShard(md map[string][]string) *shard {
})
b := &shard{
processor: bp,
newItem: make(chan any, runtime.NumCPU()),
newItem: make(chan dataItem, runtime.NumCPU()),
exportCtx: exportCtx,
batch: bp.batchFunc(),
}
@@ -228,9 +253,11 @@ func (b *shard) startLoop() {
}
return
case item := <-b.newItem:
if item == nil {
if item.data == nil {
continue
}
// Important invariant: processItem() must return with fewer pending
// items than the configured send batch size.
b.processItem(item)
case <-timerCh:
if b.batch.itemCount() > 0 {
Expand All @@ -241,8 +268,20 @@ func (b *shard) startLoop() {
}
}

func (b *shard) processItem(item any) {
b.batch.add(item)
func (b *shard) processItem(item dataItem) {
totalItems := b.batch.add(item.data)

b.pending = append(b.pending, pendingItem{
parentCtx: item.parentCtx,
numItems: totalItems,
})

// The call to flushItems() is necessary to maintain the invariant that
// after this call returns, the pending data is less than a full batch.
b.flushItems()
}

func (b *shard) flushItems() {
sent := false
for b.batch.itemCount() > 0 && (!b.hasTimer() || b.batch.itemCount() >= b.processor.sendBatchSize) {
sent = true
@@ -272,20 +311,105 @@ func (b *shard) resetTimer() {
}

func (b *shard) sendItems(trigger trigger) {
// Note: because of the invariant stated for processItem(), we know the
// number of current waiters exceeds the batch by at most one entry.
// Therefore, we can enumerate the possibilities.
//
// 1. len(b.pending) == 1 where the item count of element[0] is <= batch size
// 2. len(b.pending) == 1 where the item count of element[0] is > batch size
// 3. len(b.pending) == N where N>1 and the item count in elements[0:N-1] is < batch size
//
// Importantly, in all cases, the batch will include a portion
// of data from all contexts in the pending slice. In case 2
// there is always a single residual pendingItem. In case 3 there
// may or may not be a single residual pendingItem.
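//
// For example (illustrative, assuming sendBatchSize=100 and no maximum
// batch size): if one producer contributed 80 spans and a second
// contributed 40, the flushed batch contains data from both parent
// contexts, so a new root span with links is used (case 3). If a
// single producer contributed all 120 spans, the flushed data belongs
// to exactly one context and a child span is used instead (case 2).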

pendingSize := b.batch.itemCount()
sent, req := b.batch.splitBatch(b.exportCtx, b.processor.sendBatchMaxSize)

err := b.batch.export(b.exportCtx, req)
var rootCtx context.Context

// If the portion being sent belongs to the first item in the pending list,
// then there is a single context we can use.
if sent <= b.pending[0].numItems {
rootCtx = b.pending[0].parentCtx
} else {
rootCtx = b.exportCtx
}
ctx, parent := b.processor.tracer.Start(rootCtx, "batch_processor/export")
defer parent.End()

// Note: linking spans in both directions. This could be
// inferred by the trace backend, but this adds helpful
// information in cases where sampling may break links. See
// https://github.com/open-telemetry/opentelemetry-specification/issues/1877
// Note that there is a possibility that the span has already
// ended (if EarlyReturn or context canceled), in which case
// this becomes a no-op.
parentSpanContext := parent.SpanContext()
for _, pending := range b.pending {
span := trace.SpanFromContext(pending.parentCtx)
span.AddLink(trace.Link{SpanContext: parentSpanContext})
parent.AddLink(trace.Link{SpanContext: span.SpanContext()})
}

err := b.batch.export(ctx, req)

// Remember the last pending parent context, we may need it.
finalParent := b.pending[len(b.pending)-1].parentCtx
// Clear the array to permit GC of finished contexts while
// re-using the slice instead of a new allocation.
for i := range b.pending {
b.pending[i] = pendingItem{}
}
// There is either one or zero items left in the pending slice,
// according to the invariant discussed above.
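// For example (illustrative): if 120 items were pending and 100 were
// just sent, a single entry remains, attributing the 20 residual items
// to the most recent parent context.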
if pendingSize == sent {
// The batch is fully sent, pending slice is clear.
b.pending = b.pending[:0]
} else {
// The batch was partially sent; the pending slice keeps one residual item.
b.pending = b.pending[:1]
b.pending[0].parentCtx = finalParent
b.pending[0].numItems = pendingSize - sent
}

if err != nil {
b.processor.logger.Warn("Sender failed", zap.Error(err))
return
}

var bytes int
if b.processor.telemetry.detailed {
bytes = b.batch.sizeBytes(req)
}
b.processor.telemetry.record(trigger, int64(sent), int64(bytes))
}

func (b *shard) consumeBatch(ctx context.Context, data any) error {
var itemCount int
switch telem := data.(type) {
case ptrace.Traces:
itemCount = telem.SpanCount()
case pmetric.Metrics:
itemCount = telem.DataPointCount()
case plog.Logs:
itemCount = telem.LogRecordCount()
}

if itemCount == 0 {
return nil
}

item := dataItem{
data: data,
parentCtx: ctx,
}

b.newItem <- item
return nil
}

// singleShardBatcher is used when metadataKeys is empty, to avoid the
// additional lock and map operations used in multiBatcher.
type singleShardBatcher struct {
@@ -299,9 +423,8 @@ func (sb *singleShardBatcher) start(context.Context) error {
return nil
}

func (sb *singleShardBatcher) consume(_ context.Context, data any) error {
sb.single.newItem <- data
return nil
func (sb *singleShardBatcher) consume(ctx context.Context, data any) error {
return sb.single.consumeBatch(ctx, data)
}

func (sb *singleShardBatcher) currentMetadataCardinality() int {
@@ -362,8 +485,7 @@ func (mb *multiShardBatcher) consume(ctx context.Context, data any) error {
}
mb.lock.Unlock()
}
b.(*shard).newItem <- data
return nil
return b.(*shard).consumeBatch(ctx, data)
}

func (mb *multiShardBatcher) currentMetadataCardinality() int {
@@ -414,15 +536,14 @@ func newBatchTraces(nextConsumer consumer.Traces) *batchTraces {
}

// add updates current batchTraces by adding new TraceData object
func (bt *batchTraces) add(item any) {
func (bt *batchTraces) add(item any) int {
td := item.(ptrace.Traces)
newSpanCount := td.SpanCount()
if newSpanCount == 0 {
return
if newSpanCount != 0 {
bt.spanCount += newSpanCount
td.ResourceSpans().MoveAndAppendTo(bt.traceData.ResourceSpans())
}

bt.spanCount += newSpanCount
td.ResourceSpans().MoveAndAppendTo(bt.traceData.ResourceSpans())
return newSpanCount
}

func (bt *batchTraces) sizeBytes(data any) int {
@@ -495,15 +616,15 @@ func (bm *batchMetrics) itemCount() int {
return bm.dataPointCount
}

func (bm *batchMetrics) add(item any) {
func (bm *batchMetrics) add(item any) int {
md := item.(pmetric.Metrics)

newDataPointCount := md.DataPointCount()
if newDataPointCount == 0 {
return
if newDataPointCount != 0 {
bm.dataPointCount += newDataPointCount
md.ResourceMetrics().MoveAndAppendTo(bm.metricData.ResourceMetrics())
}
bm.dataPointCount += newDataPointCount
md.ResourceMetrics().MoveAndAppendTo(bm.metricData.ResourceMetrics())
return newDataPointCount
}

type batchLogs struct {
@@ -547,13 +668,13 @@ func (bl *batchLogs) itemCount() int {
return bl.logCount
}

func (bl *batchLogs) add(item any) {
func (bl *batchLogs) add(item any) int {
ld := item.(plog.Logs)

newLogsCount := ld.LogRecordCount()
if newLogsCount == 0 {
return
if newLogsCount != 0 {
bl.logCount += newLogsCount
ld.ResourceLogs().MoveAndAppendTo(bl.logData.ResourceLogs())
}
bl.logCount += newLogsCount
ld.ResourceLogs().MoveAndAppendTo(bl.logData.ResourceLogs())
return newLogsCount
}
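As a rough way to see the resulting span topology, the same linking pattern can be exercised against the OpenTelemetry SDK's in-memory exporter. This sketch is self-contained and does not build the collector or the batch processor; the `Demo` function and producer span names are invented for illustration:

```go
package batchtracing

import (
	"context"
	"fmt"

	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	"go.opentelemetry.io/otel/sdk/trace/tracetest"
	"go.opentelemetry.io/otel/trace"
)

// Demo starts two producer spans and one root export span, links them the way
// the processor does for a multi-parent batch, and prints the link counts
// recorded by the in-memory exporter.
func Demo() {
	exporter := tracetest.NewInMemoryExporter()
	tp := sdktrace.NewTracerProvider(sdktrace.WithSyncer(exporter))
	tracer := tp.Tracer("demo")

	_, spanA := tracer.Start(context.Background(), "producer-a")
	_, spanB := tracer.Start(context.Background(), "producer-b")

	// Multi-parent case: a new root span, linked in both directions.
	_, export := tracer.Start(context.Background(), "batch_processor/export")
	for _, producer := range []trace.Span{spanA, spanB} {
		producer.AddLink(trace.Link{SpanContext: export.SpanContext()})
		export.AddLink(trace.Link{SpanContext: producer.SpanContext()})
	}

	export.End()
	spanA.End()
	spanB.End()

	// Expect the export span to carry two links and each producer one.
	for _, stub := range exporter.GetSpans() {
		fmt.Printf("%s: %d link(s)\n", stub.Name, len(stub.Links))
	}
}
```

Running it should show the export span carrying two links and each producer span carrying one, which is the connectivity the README section above describes for the multi-parent case.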