From e24697eab3d183f63e179371a065a7fbb7f0b951 Mon Sep 17 00:00:00 2001 From: Alex Boten <223565+codeboten@users.noreply.github.com> Date: Mon, 27 May 2024 10:57:08 -0700 Subject: [PATCH 1/3] [batchprocessor] use mdatagen for async metric Signed-off-by: Alex Boten <223565+codeboten@users.noreply.github.com> --- processor/batchprocessor/documentation.md | 8 +++++ .../internal/metadata/generated_telemetry.go | 30 +++++++++++++++---- processor/batchprocessor/metadata.yaml | 7 +++++ processor/batchprocessor/metrics.go | 11 ++++--- 4 files changed, 47 insertions(+), 9 deletions(-) diff --git a/processor/batchprocessor/documentation.md b/processor/batchprocessor/documentation.md index 294dc29942f..c27bb1818c6 100644 --- a/processor/batchprocessor/documentation.md +++ b/processor/batchprocessor/documentation.md @@ -30,6 +30,14 @@ Number of times the batch was sent due to a size trigger | ---- | ----------- | ---------- | --------- | | 1 | Sum | Int | true | +### processor_batch_metadata_cardinality + +Number of distinct metadata value combinations being processed + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | false | + ### processor_batch_timeout_trigger_send Number of times the batch was sent due to a timeout trigger diff --git a/processor/batchprocessor/internal/metadata/generated_telemetry.go b/processor/batchprocessor/internal/metadata/generated_telemetry.go index 6fbb2b83b94..a5ad4801b4b 100644 --- a/processor/batchprocessor/internal/metadata/generated_telemetry.go +++ b/processor/batchprocessor/internal/metadata/generated_telemetry.go @@ -3,6 +3,7 @@ package metadata import ( + "context" "errors" "go.opentelemetry.io/otel/metric" @@ -24,11 +25,13 @@ func Tracer(settings component.TelemetrySettings) trace.Tracer { // TelemetryBuilder provides an interface for components to report telemetry // as defined in metadata and user config. type TelemetryBuilder struct { - ProcessorBatchBatchSendSize metric.Int64Histogram - ProcessorBatchBatchSendSizeBytes metric.Int64Histogram - ProcessorBatchBatchSizeTriggerSend metric.Int64Counter - ProcessorBatchTimeoutTriggerSend metric.Int64Counter - level configtelemetry.Level + ProcessorBatchBatchSendSize metric.Int64Histogram + ProcessorBatchBatchSendSizeBytes metric.Int64Histogram + ProcessorBatchBatchSizeTriggerSend metric.Int64Counter + ProcessorBatchMetadataCardinality metric.Int64ObservableUpDownCounter + observeProcessorBatchMetadataCardinality func() int64 + ProcessorBatchTimeoutTriggerSend metric.Int64Counter + level configtelemetry.Level } // telemetryBuilderOption applies changes to default builder. @@ -41,6 +44,13 @@ func WithLevel(lvl configtelemetry.Level) telemetryBuilderOption { } } +// WithProcessorBatchMetadataCardinalityCallback sets callback for observable ProcessorBatchMetadataCardinality metric. 
+func WithProcessorBatchMetadataCardinalityCallback(cb func() int64) telemetryBuilderOption { + return func(builder *TelemetryBuilder) { + builder.observeProcessorBatchMetadataCardinality = cb + } +} + // NewTelemetryBuilder provides a struct with methods to update all internal telemetry // for a component func NewTelemetryBuilder(settings component.TelemetrySettings, options ...telemetryBuilderOption) (*TelemetryBuilder, error) { @@ -75,6 +85,16 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...teleme metric.WithUnit("1"), ) errs = errors.Join(errs, err) + builder.ProcessorBatchMetadataCardinality, err = meter.Int64ObservableUpDownCounter( + "processor_batch_metadata_cardinality", + metric.WithDescription("Number of distinct metadata value combinations being processed"), + metric.WithUnit("1"), + metric.WithInt64Callback(func(_ context.Context, o metric.Int64Observer) error { + o.Observe(builder.observeProcessorBatchMetadataCardinality()) + return nil + }), + ) + errs = errors.Join(errs, err) builder.ProcessorBatchTimeoutTriggerSend, err = meter.Int64Counter( "processor_batch_timeout_trigger_send", metric.WithDescription("Number of times the batch was sent due to a timeout trigger"), diff --git a/processor/batchprocessor/metadata.yaml b/processor/batchprocessor/metadata.yaml index c90e03120e0..e9cfaaed452 100644 --- a/processor/batchprocessor/metadata.yaml +++ b/processor/batchprocessor/metadata.yaml @@ -37,3 +37,10 @@ telemetry: unit: By histogram: value_type: int + processor_batch_metadata_cardinality: + enabled: true + description: Number of distinct metadata value combinations being processed + unit: 1 + sum: + value_type: int + async: true diff --git a/processor/batchprocessor/metrics.go b/processor/batchprocessor/metrics.go index 2e803809fab..37cd9f2bed5 100644 --- a/processor/batchprocessor/metrics.go +++ b/processor/batchprocessor/metrics.go @@ -66,15 +66,18 @@ func (bpt *batchProcessorTelemetry) createOtelMetrics(set component.TelemetrySet meter = noopmetric.Meter{} } - bpt.telemetryBuilder, err = metadata.NewTelemetryBuilder(set, metadata.WithLevel(bpt.level)) + bpt.telemetryBuilder, err = metadata.NewTelemetryBuilder(set, + metadata.WithLevel(bpt.level), + metadata.WithProcessorBatchMetadataCardinalityCallback(func() int64 { return int64(currentMetadataCardinality()) }), + ) errors = multierr.Append(errors, err) bpt.batchMetadataCardinality, err = meter.Int64ObservableUpDownCounter( - processorhelper.BuildCustomMetricName(typeStr, "metadata_cardinality"), - metric.WithDescription("Number of distinct metadata value combinations being processed"), + processorhelper.BuildCustomMetricName(typeStr, ""), + metric.WithDescription(""), metric.WithUnit("1"), metric.WithInt64Callback(func(_ context.Context, obs metric.Int64Observer) error { - obs.Observe(int64(currentMetadataCardinality())) + return nil }), ) From 04c71b01dd1f68c7a0b9af105fafaee5f252fccc Mon Sep 17 00:00:00 2001 From: Alex Boten <223565+codeboten@users.noreply.github.com> Date: Tue, 28 May 2024 09:22:30 -0700 Subject: [PATCH 2/3] cleanup Signed-off-by: Alex Boten <223565+codeboten@users.noreply.github.com> --- processor/batchprocessor/go.mod | 2 +- processor/batchprocessor/metrics.go | 62 ++++++----------------------- 2 files changed, 14 insertions(+), 50 deletions(-) diff --git a/processor/batchprocessor/go.mod b/processor/batchprocessor/go.mod index ba3fa1ab4a5..7a03fb04555 100644 --- a/processor/batchprocessor/go.mod +++ b/processor/batchprocessor/go.mod @@ -22,7 +22,6 @@ require ( 
go.opentelemetry.io/otel/sdk/metric v1.27.0 go.opentelemetry.io/otel/trace v1.27.0 go.uber.org/goleak v1.3.0 - go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 ) @@ -45,6 +44,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/procfs v0.15.0 // indirect + go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.25.0 // indirect golang.org/x/sys v0.20.0 // indirect golang.org/x/text v0.15.0 // indirect diff --git a/processor/batchprocessor/metrics.go b/processor/batchprocessor/metrics.go index 37cd9f2bed5..4e0df628405 100644 --- a/processor/batchprocessor/metrics.go +++ b/processor/batchprocessor/metrics.go @@ -8,15 +8,11 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" - noopmetric "go.opentelemetry.io/otel/metric/noop" - "go.uber.org/multierr" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/batchprocessor/internal/metadata" - "go.opentelemetry.io/collector/processor/processorhelper" ) type trigger int @@ -28,62 +24,30 @@ const ( ) type batchProcessorTelemetry struct { - level configtelemetry.Level detailed bool exportCtx context.Context - processorAttr []attribute.KeyValue - telemetryBuilder *metadata.TelemetryBuilder - batchMetadataCardinality metric.Int64ObservableUpDownCounter + processorAttr []attribute.KeyValue + telemetryBuilder *metadata.TelemetryBuilder } func newBatchProcessorTelemetry(set processor.CreateSettings, currentMetadataCardinality func() int) (*batchProcessorTelemetry, error) { - bpt := &batchProcessorTelemetry{ - processorAttr: []attribute.KeyValue{attribute.String(obsmetrics.ProcessorKey, set.ID.String())}, - exportCtx: context.Background(), - level: set.MetricsLevel, - detailed: set.MetricsLevel == configtelemetry.LevelDetailed, - } - - if err := bpt.createOtelMetrics(set.TelemetrySettings, currentMetadataCardinality); err != nil { - return nil, err - } - - return bpt, nil -} - -func (bpt *batchProcessorTelemetry) createOtelMetrics(set component.TelemetrySettings, currentMetadataCardinality func() int) error { - var ( - errors, err error - meter metric.Meter - ) - - // BatchProcessor are emitted starting from Normal level only. 
- if bpt.level >= configtelemetry.LevelNormal { - meter = metadata.Meter(set) - } else { - meter = noopmetric.Meter{} - } - - bpt.telemetryBuilder, err = metadata.NewTelemetryBuilder(set, - metadata.WithLevel(bpt.level), + telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings, + metadata.WithLevel(set.MetricsLevel), metadata.WithProcessorBatchMetadataCardinalityCallback(func() int64 { return int64(currentMetadataCardinality()) }), ) - errors = multierr.Append(errors, err) - - bpt.batchMetadataCardinality, err = meter.Int64ObservableUpDownCounter( - processorhelper.BuildCustomMetricName(typeStr, ""), - metric.WithDescription(""), - metric.WithUnit("1"), - metric.WithInt64Callback(func(_ context.Context, obs metric.Int64Observer) error { - return nil - }), - ) - errors = multierr.Append(errors, err) + if err != nil { + return nil, err + } - return errors + return &batchProcessorTelemetry{ + processorAttr: []attribute.KeyValue{attribute.String(obsmetrics.ProcessorKey, set.ID.String())}, + exportCtx: context.Background(), + detailed: set.MetricsLevel == configtelemetry.LevelDetailed, + telemetryBuilder: telemetryBuilder, + }, nil } func (bpt *batchProcessorTelemetry) record(trigger trigger, sent, bytes int64) { From 654d688384b2ec9046b9108634e19719d6ec3861 Mon Sep 17 00:00:00 2001 From: Alex Boten <223565+codeboten@users.noreply.github.com> Date: Mon, 27 May 2024 10:08:07 -0700 Subject: [PATCH 3/3] [batchprocessor] move tests to mdatagen This removes the custom code in the processor in favour of the generated mdatagen test code. Signed-off-by: Alex Boten <223565+codeboten@users.noreply.github.com> --- .../batchprocessor/batch_processor_test.go | 84 ++++++++++++++++--- .../internal/metadata/generated_telemetry.go | 4 +- processor/batchprocessor/metadata.yaml | 2 + 3 files changed, 78 insertions(+), 12 deletions(-) diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go index 6e8033fb387..20c1270e5e5 100644 --- a/processor/batchprocessor/batch_processor_test.go +++ b/processor/batchprocessor/batch_processor_test.go @@ -13,6 +13,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component/componenttest" @@ -165,17 +167,14 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) { } func TestBatchProcessorSentBySize(t *testing.T) { - telemetryTest(t, testBatchProcessorSentBySize) -} - -func testBatchProcessorSentBySize(t *testing.T, tel testTelemetry) { + tel := setupTestTelemetry() sizer := &ptrace.ProtoMarshaler{} sink := new(consumertest.TracesSink) cfg := createDefaultConfig().(*Config) sendBatchSize := 20 cfg.SendBatchSize = uint32(sendBatchSize) cfg.Timeout = 500 * time.Millisecond - creationSet := tel.NewProcessorCreateSettings() + creationSet := tel.NewCreateSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchTracesProcessor(creationSet, sink, cfg) require.NoError(t, err) @@ -211,11 +210,76 @@ func testBatchProcessorSentBySize(t *testing.T, tel testTelemetry) { } } - tel.assertMetrics(t, expectedMetrics{ - sendCount: float64(expectedBatchesNum), - sendSizeSum: float64(sink.SpanCount()), - sendSizeBytesSum: float64(sizeSum), - sizeTrigger: float64(expectedBatchesNum), + tel.assertMetrics(t, []metricdata.Metrics{ + { + Name: "processor_batch_batch_send_size_bytes", + 
Description: "Number of bytes in batch that was sent", + Unit: "By", + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Count: uint64(expectedBatchesNum), + Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, + 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, + 1000_000, 2000_000, 3000_000, 4000_000, 5000_000, 6000_000, 7000_000, 8000_000, 9000_000}, + BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, uint64(expectedBatchesNum), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Sum: int64(sizeSum), + Min: metricdata.NewExtrema(int64(sizeSum / expectedBatchesNum)), + Max: metricdata.NewExtrema(int64(sizeSum / expectedBatchesNum)), + }, + }, + }, + }, + { + Name: "processor_batch_batch_send_size", + Description: "Number of units in the batch", + Unit: "1", + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Count: uint64(expectedBatchesNum), + Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000}, + BucketCounts: []uint64{0, uint64(expectedBatchesNum), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Sum: int64(sink.SpanCount()), + Min: metricdata.NewExtrema(int64(sendBatchSize)), + Max: metricdata.NewExtrema(int64(sendBatchSize)), + }, + }, + }, + }, + { + Name: "processor_batch_batch_size_trigger_send", + Description: "Number of times the batch was sent due to a size trigger", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: int64(expectedBatchesNum), + Attributes: attribute.NewSet(attribute.String("processor", "batch")), + }, + }, + }, + }, + { + Name: "processor_batch_metadata_cardinality", + Description: "Number of distinct metadata value combinations being processed", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + }, + }, + }, + }, }) } diff --git a/processor/batchprocessor/internal/metadata/generated_telemetry.go b/processor/batchprocessor/internal/metadata/generated_telemetry.go index a5ad4801b4b..2289368ab0b 100644 --- a/processor/batchprocessor/internal/metadata/generated_telemetry.go +++ b/processor/batchprocessor/internal/metadata/generated_telemetry.go @@ -70,13 +70,13 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...teleme builder.ProcessorBatchBatchSendSize, err = meter.Int64Histogram( "processor_batch_batch_send_size", metric.WithDescription("Number of units in the batch"), - metric.WithUnit("1"), + metric.WithUnit("1"), metric.WithExplicitBucketBoundaries([]float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000}...), ) errs = errors.Join(errs, err) builder.ProcessorBatchBatchSendSizeBytes, err = meter.Int64Histogram( "processor_batch_batch_send_size_bytes", metric.WithDescription("Number of bytes in batch that was sent"), - 
metric.WithUnit("By"), + metric.WithUnit("By"), metric.WithExplicitBucketBoundaries([]float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000, 200000, 300000, 400000, 500000, 600000, 700000, 800000, 900000, 1e+06, 2e+06, 3e+06, 4e+06, 5e+06, 6e+06, 7e+06, 8e+06, 9e+06}...), ) errs = errors.Join(errs, err) builder.ProcessorBatchBatchSizeTriggerSend, err = meter.Int64Counter( diff --git a/processor/batchprocessor/metadata.yaml b/processor/batchprocessor/metadata.yaml index e9cfaaed452..dd47697b850 100644 --- a/processor/batchprocessor/metadata.yaml +++ b/processor/batchprocessor/metadata.yaml @@ -31,12 +31,14 @@ telemetry: unit: 1 histogram: value_type: int + bucket_boundaries: [10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000] processor_batch_batch_send_size_bytes: enabled: true description: Number of bytes in batch that was sent unit: By histogram: value_type: int + bucket_boundaries: [10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, 1000_000, 2000_000, 3000_000, 4000_000, 5000_000, 6000_000, 7000_000, 8000_000, 9000_000] processor_batch_metadata_cardinality: enabled: true description: Number of distinct metadata value combinations being processed