diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go
index 6e8033fb387..20c1270e5e5 100644
--- a/processor/batchprocessor/batch_processor_test.go
+++ b/processor/batchprocessor/batch_processor_test.go
@@ -13,6 +13,8 @@ import (
 
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
+    "go.opentelemetry.io/otel/attribute"
+    "go.opentelemetry.io/otel/sdk/metric/metricdata"
 
     "go.opentelemetry.io/collector/client"
     "go.opentelemetry.io/collector/component/componenttest"
@@ -165,17 +167,14 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) {
 }
 
 func TestBatchProcessorSentBySize(t *testing.T) {
-    telemetryTest(t, testBatchProcessorSentBySize)
-}
-
-func testBatchProcessorSentBySize(t *testing.T, tel testTelemetry) {
+    tel := setupTestTelemetry()
     sizer := &ptrace.ProtoMarshaler{}
     sink := new(consumertest.TracesSink)
     cfg := createDefaultConfig().(*Config)
     sendBatchSize := 20
     cfg.SendBatchSize = uint32(sendBatchSize)
     cfg.Timeout = 500 * time.Millisecond
-    creationSet := tel.NewProcessorCreateSettings()
+    creationSet := tel.NewCreateSettings()
     creationSet.MetricsLevel = configtelemetry.LevelDetailed
     batcher, err := newBatchTracesProcessor(creationSet, sink, cfg)
     require.NoError(t, err)
@@ -211,11 +210,76 @@ func testBatchProcessorSentBySize(t *testing.T, tel testTelemetry) {
         }
     }
 
-    tel.assertMetrics(t, expectedMetrics{
-        sendCount:        float64(expectedBatchesNum),
-        sendSizeSum:      float64(sink.SpanCount()),
-        sendSizeBytesSum: float64(sizeSum),
-        sizeTrigger:      float64(expectedBatchesNum),
+    tel.assertMetrics(t, []metricdata.Metrics{
+        {
+            Name:        "processor_batch_batch_send_size_bytes",
+            Description: "Number of bytes in batch that was sent",
+            Unit:        "By",
+            Data: metricdata.Histogram[int64]{
+                Temporality: metricdata.CumulativeTemporality,
+                DataPoints: []metricdata.HistogramDataPoint[int64]{
+                    {
+                        Attributes: attribute.NewSet(attribute.String("processor", "batch")),
+                        Count:      uint64(expectedBatchesNum),
+                        Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000,
+                            100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000,
+                            1000_000, 2000_000, 3000_000, 4000_000, 5000_000, 6000_000, 7000_000, 8000_000, 9000_000},
+                        BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, uint64(expectedBatchesNum), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
+                        Sum:          int64(sizeSum),
+                        Min:          metricdata.NewExtrema(int64(sizeSum / expectedBatchesNum)),
+                        Max:          metricdata.NewExtrema(int64(sizeSum / expectedBatchesNum)),
+                    },
+                },
+            },
+        },
+        {
+            Name:        "processor_batch_batch_send_size",
+            Description: "Number of units in the batch",
+            Unit:        "1",
+            Data: metricdata.Histogram[int64]{
+                Temporality: metricdata.CumulativeTemporality,
+                DataPoints: []metricdata.HistogramDataPoint[int64]{
+                    {
+                        Attributes:   attribute.NewSet(attribute.String("processor", "batch")),
+                        Count:        uint64(expectedBatchesNum),
+                        Bounds:       []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000},
+                        BucketCounts: []uint64{0, uint64(expectedBatchesNum), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
+                        Sum:          int64(sink.SpanCount()),
+                        Min:          metricdata.NewExtrema(int64(sendBatchSize)),
+                        Max:          metricdata.NewExtrema(int64(sendBatchSize)),
+                    },
+                },
+            },
+        },
+        {
+            Name:        "processor_batch_batch_size_trigger_send",
+            Description: "Number of times the batch was sent due to a size trigger",
+            Unit:        "1",
+            Data: metricdata.Sum[int64]{
+                Temporality: metricdata.CumulativeTemporality,
+                IsMonotonic: true,
+                DataPoints: []metricdata.DataPoint[int64]{
+                    {
+                        Value:      int64(expectedBatchesNum),
+                        Attributes: attribute.NewSet(attribute.String("processor", "batch")),
+                    },
+                },
+            },
+        },
+        {
+            Name:        "processor_batch_metadata_cardinality",
+            Description: "Number of distinct metadata value combinations being processed",
+            Unit:        "1",
+            Data: metricdata.Sum[int64]{
+                Temporality: metricdata.CumulativeTemporality,
+                IsMonotonic: false,
+                DataPoints: []metricdata.DataPoint[int64]{
+                    {
+                        Value: 1,
+                    },
+                },
+            },
+        },
     })
 }
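The test now asserts complete `metricdata.Metrics` values (name, description, unit, temporality, bounds, and per-bucket counts) instead of four loose float sums. `setupTestTelemetry` and `tel.assertMetrics` are collector-internal helpers not shown in this diff; the sketch below, which assumes only the public `go.opentelemetry.io/otel/sdk/metric` API and a `ManualReader` wired into the telemetry settings, shows how such an assertion helper can be built:

```go
package telemetrytest

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
	"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
)

// assertMetrics is an illustrative stand-in for the collector's helper: it
// collects once from a ManualReader and checks every expected metric by name.
func assertMetrics(t *testing.T, reader *sdkmetric.ManualReader, expected []metricdata.Metrics) {
	var rm metricdata.ResourceMetrics
	require.NoError(t, reader.Collect(context.Background(), &rm))

	// Index collected metrics by name so expectations are order-independent.
	got := map[string]metricdata.Metrics{}
	for _, sm := range rm.ScopeMetrics {
		for _, m := range sm.Metrics {
			got[m.Name] = m
		}
	}
	for _, want := range expected {
		m, ok := got[want.Name]
		require.True(t, ok, "missing metric %q", want.Name)
		// IgnoreTimestamp keeps the comparison stable across collection times.
		metricdatatest.AssertEqual(t, want, m, metricdatatest.IgnoreTimestamp())
	}
}
```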
diff --git a/processor/batchprocessor/documentation.md b/processor/batchprocessor/documentation.md
index 294dc29942f..c27bb1818c6 100644
--- a/processor/batchprocessor/documentation.md
+++ b/processor/batchprocessor/documentation.md
@@ -30,6 +30,14 @@ Number of times the batch was sent due to a size trigger
 | ---- | ----------- | ---------- | --------- |
 | 1 | Sum | Int | true |
 
+### processor_batch_metadata_cardinality
+
+Number of distinct metadata value combinations being processed
+
+| Unit | Metric Type | Value Type | Monotonic |
+| ---- | ----------- | ---------- | --------- |
+| 1 | Sum | Int | false |
+
 ### processor_batch_timeout_trigger_send
 
 Number of times the batch was sent due to a timeout trigger
diff --git a/processor/batchprocessor/go.mod b/processor/batchprocessor/go.mod
index ba3fa1ab4a5..7a03fb04555 100644
--- a/processor/batchprocessor/go.mod
+++ b/processor/batchprocessor/go.mod
@@ -22,7 +22,6 @@ require (
     go.opentelemetry.io/otel/sdk/metric v1.27.0
     go.opentelemetry.io/otel/trace v1.27.0
     go.uber.org/goleak v1.3.0
-    go.uber.org/multierr v1.11.0
     go.uber.org/zap v1.27.0
 )
 
@@ -45,6 +44,7 @@ require (
     github.com/modern-go/reflect2 v1.0.2 // indirect
     github.com/pmezard/go-difflib v1.0.0 // indirect
     github.com/prometheus/procfs v0.15.0 // indirect
+    go.uber.org/multierr v1.11.0 // indirect
     golang.org/x/net v0.25.0 // indirect
     golang.org/x/sys v0.20.0 // indirect
     golang.org/x/text v0.15.0 // indirect
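The generated builder that follows registers `processor_batch_metadata_cardinality` as an asynchronous instrument: the SDK pulls the current value through a callback on every collection cycle, so nothing is recorded on the hot path. A self-contained sketch of that pattern, mirroring the registration in the diff below (the `metadataTracker` type and function names are illustrative, not part of this change):

```go
package telemetry

import (
	"context"
	"sync"

	"go.opentelemetry.io/otel/metric"
)

// metadataTracker stands in for the processor's per-metadata batcher map;
// only the cardinality accessor matters to the metric.
type metadataTracker struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func (mt *metadataTracker) cardinality() int64 {
	mt.mu.Lock()
	defer mt.mu.Unlock()
	return int64(len(mt.seen))
}

// registerCardinalityMetric attaches the callback at instrument creation via
// metric.WithInt64Callback, so no separate meter.RegisterCallback is needed.
func registerCardinalityMetric(meter metric.Meter, mt *metadataTracker) (metric.Int64ObservableUpDownCounter, error) {
	return meter.Int64ObservableUpDownCounter(
		"processor_batch_metadata_cardinality",
		metric.WithDescription("Number of distinct metadata value combinations being processed"),
		metric.WithUnit("1"),
		metric.WithInt64Callback(func(_ context.Context, o metric.Int64Observer) error {
			// Invoked by the SDK at collection time; reports the current
			// cardinality rather than accumulating increments.
			o.Observe(mt.cardinality())
			return nil
		}),
	)
}
```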
diff --git a/processor/batchprocessor/internal/metadata/generated_telemetry.go b/processor/batchprocessor/internal/metadata/generated_telemetry.go
index 6fbb2b83b94..2289368ab0b 100644
--- a/processor/batchprocessor/internal/metadata/generated_telemetry.go
+++ b/processor/batchprocessor/internal/metadata/generated_telemetry.go
@@ -3,6 +3,7 @@
 package metadata
 
 import (
+    "context"
     "errors"
 
     "go.opentelemetry.io/otel/metric"
@@ -24,11 +25,13 @@ func Tracer(settings component.TelemetrySettings) trace.Tracer {
 // TelemetryBuilder provides an interface for components to report telemetry
 // as defined in metadata and user config.
 type TelemetryBuilder struct {
-    ProcessorBatchBatchSendSize        metric.Int64Histogram
-    ProcessorBatchBatchSendSizeBytes   metric.Int64Histogram
-    ProcessorBatchBatchSizeTriggerSend metric.Int64Counter
-    ProcessorBatchTimeoutTriggerSend   metric.Int64Counter
-    level                              configtelemetry.Level
+    ProcessorBatchBatchSendSize              metric.Int64Histogram
+    ProcessorBatchBatchSendSizeBytes         metric.Int64Histogram
+    ProcessorBatchBatchSizeTriggerSend       metric.Int64Counter
+    ProcessorBatchMetadataCardinality        metric.Int64ObservableUpDownCounter
+    observeProcessorBatchMetadataCardinality func() int64
+    ProcessorBatchTimeoutTriggerSend         metric.Int64Counter
+    level                                    configtelemetry.Level
 }
 
 // telemetryBuilderOption applies changes to default builder.
@@ -41,6 +44,13 @@ func WithLevel(lvl configtelemetry.Level) telemetryBuilderOption {
     }
 }
 
+// WithProcessorBatchMetadataCardinalityCallback sets callback for observable ProcessorBatchMetadataCardinality metric.
+func WithProcessorBatchMetadataCardinalityCallback(cb func() int64) telemetryBuilderOption {
+    return func(builder *TelemetryBuilder) {
+        builder.observeProcessorBatchMetadataCardinality = cb
+    }
+}
+
 // NewTelemetryBuilder provides a struct with methods to update all internal telemetry
 // for a component
 func NewTelemetryBuilder(settings component.TelemetrySettings, options ...telemetryBuilderOption) (*TelemetryBuilder, error) {
@@ -60,13 +70,13 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...teleme
     builder.ProcessorBatchBatchSendSize, err = meter.Int64Histogram(
         "processor_batch_batch_send_size",
         metric.WithDescription("Number of units in the batch"),
-        metric.WithUnit("1"),
+        metric.WithUnit("1"), metric.WithExplicitBucketBoundaries([]float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000}...),
     )
     errs = errors.Join(errs, err)
     builder.ProcessorBatchBatchSendSizeBytes, err = meter.Int64Histogram(
         "processor_batch_batch_send_size_bytes",
         metric.WithDescription("Number of bytes in batch that was sent"),
-        metric.WithUnit("By"),
+        metric.WithUnit("By"), metric.WithExplicitBucketBoundaries([]float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000, 200000, 300000, 400000, 500000, 600000, 700000, 800000, 900000, 1e+06, 2e+06, 3e+06, 4e+06, 5e+06, 6e+06, 7e+06, 8e+06, 9e+06}...),
     )
     errs = errors.Join(errs, err)
     builder.ProcessorBatchBatchSizeTriggerSend, err = meter.Int64Counter(
@@ -75,6 +85,16 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...teleme
         metric.WithUnit("1"),
     )
     errs = errors.Join(errs, err)
+    builder.ProcessorBatchMetadataCardinality, err = meter.Int64ObservableUpDownCounter(
+        "processor_batch_metadata_cardinality",
+        metric.WithDescription("Number of distinct metadata value combinations being processed"),
+        metric.WithUnit("1"),
+        metric.WithInt64Callback(func(_ context.Context, o metric.Int64Observer) error {
+            o.Observe(builder.observeProcessorBatchMetadataCardinality())
+            return nil
+        }),
+    )
+    errs = errors.Join(errs, err)
     builder.ProcessorBatchTimeoutTriggerSend, err = meter.Int64Counter(
         "processor_batch_timeout_trigger_send",
         metric.WithDescription("Number of times the batch was sent due to a timeout trigger"),
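`WithProcessorBatchMetadataCardinalityCallback` is the standard Go functional-options pattern: each option is a closure that mutates the builder before any instrument is created, which is how a component-supplied callback reaches the observable instrument's closure. Reduced to its skeleton (illustrative names only, not the generated code):

```go
package telemetry

// builder mirrors the generated TelemetryBuilder's callback plumbing.
type builder struct {
	observeCardinality func() int64
}

// option corresponds to the generated telemetryBuilderOption type.
type option func(*builder)

// withCardinalityCallback is the analogue of
// WithProcessorBatchMetadataCardinalityCallback.
func withCardinalityCallback(cb func() int64) option {
	return func(b *builder) { b.observeCardinality = cb }
}

func newBuilder(opts ...option) *builder {
	b := &builder{}
	// Options run before instruments are created, so an instrument callback
	// built afterwards closes over the component-supplied function. Note the
	// generated code invokes the callback unconditionally; omitting the
	// option would leave it nil.
	for _, opt := range opts {
		opt(b)
	}
	return b
}
```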
diff --git a/processor/batchprocessor/metadata.yaml b/processor/batchprocessor/metadata.yaml
index c90e03120e0..dd47697b850 100644
--- a/processor/batchprocessor/metadata.yaml
+++ b/processor/batchprocessor/metadata.yaml
@@ -31,9 +31,18 @@ telemetry:
       unit: 1
       histogram:
         value_type: int
+        bucket_boundaries: [10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000]
     processor_batch_batch_send_size_bytes:
       enabled: true
       description: Number of bytes in batch that was sent
       unit: By
       histogram:
         value_type: int
+        bucket_boundaries: [10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, 1000_000, 2000_000, 3000_000, 4000_000, 5000_000, 6000_000, 7000_000, 8000_000, 9000_000]
+    processor_batch_metadata_cardinality:
+      enabled: true
+      description: Number of distinct metadata value combinations being processed
+      unit: 1
+      sum:
+        value_type: int
+        async: true
diff --git a/processor/batchprocessor/metrics.go b/processor/batchprocessor/metrics.go
index 2e803809fab..4e0df628405 100644
--- a/processor/batchprocessor/metrics.go
+++ b/processor/batchprocessor/metrics.go
@@ -8,15 +8,11 @@ import (
 
     "go.opentelemetry.io/otel/attribute"
     "go.opentelemetry.io/otel/metric"
-    noopmetric "go.opentelemetry.io/otel/metric/noop"
-    "go.uber.org/multierr"
 
-    "go.opentelemetry.io/collector/component"
     "go.opentelemetry.io/collector/config/configtelemetry"
     "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
     "go.opentelemetry.io/collector/processor"
     "go.opentelemetry.io/collector/processor/batchprocessor/internal/metadata"
-    "go.opentelemetry.io/collector/processor/processorhelper"
 )
 
 type trigger int
@@ -28,59 +24,30 @@ const (
 )
 
 type batchProcessorTelemetry struct {
-    level    configtelemetry.Level
     detailed bool
 
     exportCtx context.Context
 
-    processorAttr            []attribute.KeyValue
-    telemetryBuilder         *metadata.TelemetryBuilder
-    batchMetadataCardinality metric.Int64ObservableUpDownCounter
+    processorAttr    []attribute.KeyValue
+    telemetryBuilder *metadata.TelemetryBuilder
 }
 
 func newBatchProcessorTelemetry(set processor.CreateSettings, currentMetadataCardinality func() int) (*batchProcessorTelemetry, error) {
-    bpt := &batchProcessorTelemetry{
-        processorAttr: []attribute.KeyValue{attribute.String(obsmetrics.ProcessorKey, set.ID.String())},
-        exportCtx:     context.Background(),
-        level:         set.MetricsLevel,
-        detailed:      set.MetricsLevel == configtelemetry.LevelDetailed,
-    }
-
-    if err := bpt.createOtelMetrics(set.TelemetrySettings, currentMetadataCardinality); err != nil {
-        return nil, err
-    }
-
-    return bpt, nil
-}
-
-func (bpt *batchProcessorTelemetry) createOtelMetrics(set component.TelemetrySettings, currentMetadataCardinality func() int) error {
-    var (
-        errors, err error
-        meter       metric.Meter
+    telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings,
+        metadata.WithLevel(set.MetricsLevel),
+        metadata.WithProcessorBatchMetadataCardinalityCallback(func() int64 { return int64(currentMetadataCardinality()) }),
     )
-    // BatchProcessor are emitted starting from Normal level only.
-    if bpt.level >= configtelemetry.LevelNormal {
-        meter = metadata.Meter(set)
-    } else {
-        meter = noopmetric.Meter{}
+    if err != nil {
+        return nil, err
     }
-
-    bpt.telemetryBuilder, err = metadata.NewTelemetryBuilder(set, metadata.WithLevel(bpt.level))
-    errors = multierr.Append(errors, err)
-
-    bpt.batchMetadataCardinality, err = meter.Int64ObservableUpDownCounter(
-        processorhelper.BuildCustomMetricName(typeStr, "metadata_cardinality"),
-        metric.WithDescription("Number of distinct metadata value combinations being processed"),
-        metric.WithUnit("1"),
-        metric.WithInt64Callback(func(_ context.Context, obs metric.Int64Observer) error {
-            obs.Observe(int64(currentMetadataCardinality()))
-            return nil
-        }),
-    )
-    errors = multierr.Append(errors, err)
-
-    return errors
+    return &batchProcessorTelemetry{
+        processorAttr:    []attribute.KeyValue{attribute.String(obsmetrics.ProcessorKey, set.ID.String())},
+        exportCtx:        context.Background(),
+        detailed:         set.MetricsLevel == configtelemetry.LevelDetailed,
+        telemetryBuilder: telemetryBuilder,
+    }, nil
 }
 
 func (bpt *batchProcessorTelemetry) record(trigger trigger, sent, bytes int64) {
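The diff ends at the signature of `record`, but the remaining change follows from the new struct: every instrument is now reached through `telemetryBuilder`, and the hand-rolled observable counter is gone. A hedged sketch of how the body plausibly looks after the migration (the trigger constant names come from the elided `const` block and are assumptions, as is the `detailed` gating):

```go
// Sketch only: the diff truncates before record's body, so this is an
// assumption about its post-migration shape, not the actual implementation.
func (bpt *batchProcessorTelemetry) record(trigger trigger, sent, bytes int64) {
	attrs := metric.WithAttributes(bpt.processorAttr...)
	switch trigger {
	case triggerBatchSize: // assumed constant name
		bpt.telemetryBuilder.ProcessorBatchBatchSizeTriggerSend.Add(bpt.exportCtx, 1, attrs)
	case triggerTimeout: // assumed constant name
		bpt.telemetryBuilder.ProcessorBatchTimeoutTriggerSend.Add(bpt.exportCtx, 1, attrs)
	}
	bpt.telemetryBuilder.ProcessorBatchBatchSendSize.Record(bpt.exportCtx, sent, attrs)
	if bpt.detailed {
		// Byte size is presumably only computed and recorded at detailed
		// metrics level, matching the detailed flag kept on the struct.
		bpt.telemetryBuilder.ProcessorBatchBatchSendSizeBytes.Record(bpt.exportCtx, bytes, attrs)
	}
}
```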