diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go index 5d3666a71c9..2cb189c13c3 100644 --- a/processor/batchprocessor/batch_processor_test.go +++ b/processor/batchprocessor/batch_processor_test.go @@ -187,7 +187,7 @@ func TestBatchProcessorSentBySize(t *testing.T) { sizeSum := 0 for requestNum := 0; requestNum < requestCount; requestNum++ { td := testdata.GenerateTraces(spansPerRequest) - sizeSum += sizer.TracesSize(td) + assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) } @@ -203,6 +203,7 @@ func TestBatchProcessorSentBySize(t *testing.T) { receivedTraces := sink.AllTraces() require.EqualValues(t, expectedBatchesNum, len(receivedTraces)) for _, td := range receivedTraces { + sizeSum += sizer.TracesSize(td) rss := td.ResourceSpans() require.Equal(t, expectedBatchingFactor, rss.Len()) for i := 0; i < expectedBatchingFactor; i++ { @@ -285,10 +286,8 @@ func TestBatchProcessorSentBySize(t *testing.T) { } func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { - telemetryTest(t, testBatchProcessorSentBySizeWithMaxSize) -} - -func testBatchProcessorSentBySizeWithMaxSize(t *testing.T, tel testTelemetry) { + tel := setupTestTelemetry() + sizer := &ptrace.ProtoMarshaler{} sink := new(consumertest.TracesSink) cfg := createDefaultConfig().(*Config) sendBatchSize := 20 @@ -296,7 +295,7 @@ func testBatchProcessorSentBySizeWithMaxSize(t *testing.T, tel testTelemetry) { cfg.SendBatchSize = uint32(sendBatchSize) cfg.SendBatchMaxSize = uint32(sendBatchMaxSize) cfg.Timeout = 500 * time.Millisecond - creationSet := tel.NewProcessorCreateSettings() + creationSet := tel.NewCreateSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchTracesProcessor(creationSet, sink, cfg) require.NoError(t, err) @@ -307,6 +306,8 @@ func testBatchProcessorSentBySizeWithMaxSize(t *testing.T, tel testTelemetry) { totalSpans := requestCount * spansPerRequest start := time.Now() + + sizeSum := 0 for requestNum := 0; requestNum < requestCount; requestNum++ { td := testdata.GenerateTraces(spansPerRequest) assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) @@ -323,12 +324,105 @@ func testBatchProcessorSentBySizeWithMaxSize(t *testing.T, tel testTelemetry) { require.Equal(t, totalSpans, sink.SpanCount()) receivedTraces := sink.AllTraces() require.EqualValues(t, expectedBatchesNum, len(receivedTraces)) + // we have to count the size after it was processed since splitTraces will cause some + // repeated ResourceSpan data to be sent through the processor + var min, max int + for _, td := range receivedTraces { + if min == 0 || sizer.TracesSize(td) < min { + min = sizer.TracesSize(td) + } + if sizer.TracesSize(td) > max { + max = sizer.TracesSize(td) + } + sizeSum += sizer.TracesSize(td) + } - tel.assertMetrics(t, expectedMetrics{ - sendCount: float64(expectedBatchesNum), - sendSizeSum: float64(sink.SpanCount()), - sizeTrigger: math.Floor(float64(totalSpans) / float64(sendBatchMaxSize)), - timeoutTrigger: 1, + tel.assertMetrics(t, []metricdata.Metrics{ + { + Name: "processor_batch_batch_send_size_bytes", + Description: "Number of bytes in batch that was sent", + Unit: "By", + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Count: uint64(expectedBatchesNum), + Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, + 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, + 1000_000, 2000_000, 3000_000, 4000_000, 5000_000, 6000_000, 7000_000, 8000_000, 9000_000}, + BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, uint64(expectedBatchesNum - 1), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Sum: int64(sizeSum), + Min: metricdata.NewExtrema(int64(min)), + Max: metricdata.NewExtrema(int64(max)), + }, + }, + }, + }, + { + Name: "processor_batch_batch_send_size", + Description: "Number of units in the batch", + Unit: "1", + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Count: uint64(expectedBatchesNum), + Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000}, + BucketCounts: []uint64{0, 1, uint64(expectedBatchesNum - 1), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Sum: int64(sink.SpanCount()), + Min: metricdata.NewExtrema(int64(sendBatchSize - 1)), + Max: metricdata.NewExtrema(int64(cfg.SendBatchMaxSize)), + }, + }, + }, + }, + { + Name: "processor_batch_batch_size_trigger_send", + Description: "Number of times the batch was sent due to a size trigger", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: int64(expectedBatchesNum - 1), + Attributes: attribute.NewSet(attribute.String("processor", "batch")), + }, + }, + }, + }, + { + Name: "processor_batch_timeout_trigger_send", + Description: "Number of times the batch was sent due to a timeout trigger", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet(attribute.String("processor", "batch")), + }, + }, + }, + }, + { + Name: "processor_batch_metadata_cardinality", + Description: "Number of distinct metadata value combinations being processed", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet(attribute.String("processor", "batch")), + }, + }, + }, + }, }) } @@ -459,10 +553,7 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) { } func TestBatchMetricProcessorBatchSize(t *testing.T) { - telemetryTest(t, testBatchMetricProcessorBatchSize) -} - -func testBatchMetricProcessorBatchSize(t *testing.T, tel testTelemetry) { + tel := setupTestTelemetry() sizer := &pmetric.ProtoMarshaler{} // Instantiate the batch processor with low config values to test data @@ -478,7 +569,7 @@ func testBatchMetricProcessorBatchSize(t *testing.T, tel testTelemetry) { dataPointsPerRequest := metricsPerRequest * dataPointsPerMetric sink := new(consumertest.MetricsSink) - creationSet := tel.NewProcessorCreateSettings() + creationSet := tel.NewCreateSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -509,11 +600,77 @@ func testBatchMetricProcessorBatchSize(t *testing.T, tel testTelemetry) { } } - tel.assertMetrics(t, expectedMetrics{ - sendCount: float64(expectedBatchesNum), - sendSizeSum: float64(sink.DataPointCount()), - sendSizeBytesSum: float64(size), - sizeTrigger: 20, + tel.assertMetrics(t, []metricdata.Metrics{ + { + Name: "processor_batch_batch_send_size_bytes", + Description: "Number of bytes in batch that was sent", + Unit: "By", + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Count: uint64(expectedBatchesNum), + Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, + 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, + 1000_000, 2000_000, 3000_000, 4000_000, 5000_000, 6000_000, 7000_000, 8000_000, 9000_000}, + BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, uint64(expectedBatchesNum), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Sum: int64(size), + Min: metricdata.NewExtrema(int64(size / expectedBatchesNum)), + Max: metricdata.NewExtrema(int64(size / expectedBatchesNum)), + }, + }, + }, + }, + { + Name: "processor_batch_batch_send_size", + Description: "Number of units in the batch", + Unit: "1", + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Count: uint64(expectedBatchesNum), + Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000}, + BucketCounts: []uint64{0, 0, uint64(expectedBatchesNum), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Sum: int64(sink.DataPointCount()), + Min: metricdata.NewExtrema(int64(cfg.SendBatchSize)), + Max: metricdata.NewExtrema(int64(cfg.SendBatchSize)), + }, + }, + }, + }, + { + Name: "processor_batch_batch_size_trigger_send", + Description: "Number of times the batch was sent due to a size trigger", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: int64(expectedBatchesNum), + Attributes: attribute.NewSet(attribute.String("processor", "batch")), + }, + }, + }, + }, + { + Name: "processor_batch_metadata_cardinality", + Description: "Number of distinct metadata value combinations being processed", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet(attribute.String("processor", "batch")), + }, + }, + }, + }, }) } @@ -778,10 +935,7 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) { } func TestBatchLogProcessor_BatchSize(t *testing.T) { - telemetryTest(t, testBatchLogProcessorBatchSize) -} - -func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry) { + tel := setupTestTelemetry() sizer := &plog.ProtoMarshaler{} // Instantiate the batch processor with low config values to test data @@ -795,7 +949,7 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry) { logsPerRequest := 5 sink := new(consumertest.LogsSink) - creationSet := tel.NewProcessorCreateSettings() + creationSet := tel.NewCreateSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -826,11 +980,77 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry) { } } - tel.assertMetrics(t, expectedMetrics{ - sendCount: float64(expectedBatchesNum), - sendSizeSum: float64(sink.LogRecordCount()), - sendSizeBytesSum: float64(size), - sizeTrigger: float64(expectedBatchesNum), + tel.assertMetrics(t, []metricdata.Metrics{ + { + Name: "processor_batch_batch_send_size_bytes", + Description: "Number of bytes in batch that was sent", + Unit: "By", + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Count: uint64(expectedBatchesNum), + Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, + 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, + 1000_000, 2000_000, 3000_000, 4000_000, 5000_000, 6000_000, 7000_000, 8000_000, 9000_000}, + BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, uint64(expectedBatchesNum), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Sum: int64(size), + Min: metricdata.NewExtrema(int64(size / expectedBatchesNum)), + Max: metricdata.NewExtrema(int64(size / expectedBatchesNum)), + }, + }, + }, + }, + { + Name: "processor_batch_batch_send_size", + Description: "Number of units in the batch", + Unit: "1", + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Count: uint64(expectedBatchesNum), + Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000}, + BucketCounts: []uint64{0, 0, uint64(expectedBatchesNum), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Sum: int64(sink.LogRecordCount()), + Min: metricdata.NewExtrema(int64(cfg.SendBatchSize)), + Max: metricdata.NewExtrema(int64(cfg.SendBatchSize)), + }, + }, + }, + }, + { + Name: "processor_batch_batch_size_trigger_send", + Description: "Number of times the batch was sent due to a size trigger", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: int64(expectedBatchesNum), + Attributes: attribute.NewSet(attribute.String("processor", "batch")), + }, + }, + }, + }, + { + Name: "processor_batch_metadata_cardinality", + Description: "Number of distinct metadata value combinations being processed", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet(attribute.String("processor", "batch")), + }, + }, + }, + }, }) } diff --git a/processor/batchprocessor/go.mod b/processor/batchprocessor/go.mod index 7a03fb04555..4690e1a9928 100644 --- a/processor/batchprocessor/go.mod +++ b/processor/batchprocessor/go.mod @@ -3,9 +3,6 @@ module go.opentelemetry.io/collector/processor/batchprocessor go 1.21.0 require ( - github.com/prometheus/client_golang v1.19.1 - github.com/prometheus/client_model v0.6.1 - github.com/prometheus/common v0.53.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector v0.101.0 go.opentelemetry.io/collector/component v0.101.0 @@ -16,9 +13,7 @@ require ( go.opentelemetry.io/collector/pdata/testdata v0.101.0 go.opentelemetry.io/collector/processor v0.101.0 go.opentelemetry.io/otel v1.27.0 - go.opentelemetry.io/otel/exporters/prometheus v0.49.0 go.opentelemetry.io/otel/metric v1.27.0 - go.opentelemetry.io/otel/sdk v1.27.0 go.opentelemetry.io/otel/sdk/metric v1.27.0 go.opentelemetry.io/otel/trace v1.27.0 go.uber.org/goleak v1.3.0 @@ -43,7 +38,12 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_golang v1.19.1 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.53.0 // indirect github.com/prometheus/procfs v0.15.0 // indirect + go.opentelemetry.io/otel/exporters/prometheus v0.49.0 // indirect + go.opentelemetry.io/otel/sdk v1.27.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.25.0 // indirect golang.org/x/sys v0.20.0 // indirect diff --git a/processor/batchprocessor/metrics_test.go b/processor/batchprocessor/metrics_test.go deleted file mode 100644 index 0f7f4db9165..00000000000 --- a/processor/batchprocessor/metrics_test.go +++ /dev/null @@ -1,203 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package batchprocessor - -import ( - "context" - "fmt" - "math" - "net/http" - "net/http/httptest" - "testing" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" - io_prometheus_client "github.com/prometheus/client_model/go" - "github.com/prometheus/common/expfmt" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - otelprom "go.opentelemetry.io/otel/exporters/prometheus" - sdkmetric "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/resource" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/processor" - "go.opentelemetry.io/collector/processor/processorhelper" - "go.opentelemetry.io/collector/processor/processortest" -) - -type testTelemetry struct { - promHandler http.Handler - meterProvider *sdkmetric.MeterProvider -} - -type expectedMetrics struct { - // processor_batch_batch_send_size_count - // processor_batch_batch_send_size_bytes_count - sendCount float64 - // processor_batch_batch_send_size_sum - sendSizeSum float64 - // processor_batch_batch_send_size_bytes_sum - sendSizeBytesSum float64 - // processor_batch_batch_size_trigger_send - sizeTrigger float64 - // processor_batch_batch_timeout_trigger_send - timeoutTrigger float64 -} - -func telemetryTest(t *testing.T, testFunc func(t *testing.T, tel testTelemetry)) { - testFunc(t, setupTelemetry(t)) -} - -func setupTelemetry(t *testing.T) testTelemetry { - telemetry := testTelemetry{} - - promReg := prometheus.NewRegistry() - exporter, err := otelprom.New(otelprom.WithRegisterer(promReg), otelprom.WithoutUnits(), otelprom.WithoutScopeInfo(), otelprom.WithoutCounterSuffixes()) - require.NoError(t, err) - - telemetry.meterProvider = sdkmetric.NewMeterProvider( - sdkmetric.WithResource(resource.Empty()), - sdkmetric.WithReader(exporter), - sdkmetric.WithView(batchViews()...), - ) - - telemetry.promHandler = promhttp.HandlerFor(promReg, promhttp.HandlerOpts{}) - - t.Cleanup(func() { assert.NoError(t, telemetry.meterProvider.Shutdown(context.Background())) }) - - return telemetry -} - -func (tt *testTelemetry) NewProcessorCreateSettings() processor.CreateSettings { - settings := processortest.NewNopCreateSettings() - settings.MeterProvider = tt.meterProvider - settings.ID = component.MustNewID("batch") - - return settings -} - -func (tt *testTelemetry) assertMetrics(t *testing.T, expected expectedMetrics) { - req, err := http.NewRequest(http.MethodGet, "/metrics", nil) - require.NoError(t, err) - - rr := httptest.NewRecorder() - tt.promHandler.ServeHTTP(rr, req) - - var parser expfmt.TextParser - metrics, err := parser.TextToMetricFamilies(rr.Body) - require.NoError(t, err) - - if expected.sendSizeBytesSum > 0 { - name := "processor_batch_batch_send_size_bytes" - metric := tt.getMetric(t, name, io_prometheus_client.MetricType_HISTOGRAM, metrics) - - assertFloat(t, expected.sendSizeBytesSum, metric.GetHistogram().GetSampleSum(), name) - assertFloat(t, expected.sendCount, float64(metric.GetHistogram().GetSampleCount()), name) - - tt.assertBoundaries(t, - []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, - 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, - 1000_000, 2000_000, 3000_000, 4000_000, 5000_000, 6000_000, 7000_000, 8000_000, 9000_000, math.Inf(1)}, - metric.GetHistogram(), - name, - ) - } - - if expected.sendSizeSum > 0 { - name := "processor_batch_batch_send_size" - metric := tt.getMetric(t, name, io_prometheus_client.MetricType_HISTOGRAM, metrics) - - assertFloat(t, expected.sendSizeSum, metric.GetHistogram().GetSampleSum(), name) - assertFloat(t, expected.sendCount, float64(metric.GetHistogram().GetSampleCount()), name) - - tt.assertBoundaries(t, - []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000, math.Inf(1)}, - metric.GetHistogram(), - name, - ) - } - - if expected.sizeTrigger > 0 { - name := "processor_batch_batch_size_trigger_send" - metric := tt.getMetric(t, name, io_prometheus_client.MetricType_COUNTER, metrics) - - assertFloat(t, expected.sizeTrigger, metric.GetCounter().GetValue(), name) - } - - if expected.timeoutTrigger > 0 { - name := "processor_batch_timeout_trigger_send" - metric := tt.getMetric(t, name, io_prometheus_client.MetricType_COUNTER, metrics) - - assertFloat(t, expected.timeoutTrigger, metric.GetCounter().GetValue(), name) - } -} - -func (tt *testTelemetry) assertBoundaries(t *testing.T, expected []float64, histogram *io_prometheus_client.Histogram, metric string) { - var got []float64 - for _, bucket := range histogram.GetBucket() { - got = append(got, bucket.GetUpperBound()) - } - - if len(expected) != len(got) { - assert.Failf(t, "different boundaries size", "metric '%s' expected boundaries '%x' but got '%x'", metric, expected, got) - return - } - - for i := range expected { - if math.Abs(expected[i]-got[i]) > 0.00001 { - assert.Failf(t, "unexpected boundary", "boundary for metric '%s' did not match, expected '%f' got '%f'", metric, expected[i], got[i]) - } - } - -} - -func (tt *testTelemetry) getMetric(t *testing.T, name string, mtype io_prometheus_client.MetricType, got map[string]*io_prometheus_client.MetricFamily) *io_prometheus_client.Metric { - metricFamily, ok := got[name] - require.True(t, ok, "expected metric '%s' not found", name) - require.Equal(t, mtype, metricFamily.GetType()) - - metric, err := getSingleMetric(metricFamily) - require.NoError(t, err) - - return metric -} - -func getSingleMetric(metric *io_prometheus_client.MetricFamily) (*io_prometheus_client.Metric, error) { - if l := len(metric.Metric); l != 1 { - return nil, fmt.Errorf("expected metric '%s' with one set of attributes, but found %d", metric.GetName(), l) - } - first := metric.Metric[0] - - if len(first.Label) != 1 || "processor" != first.Label[0].GetName() || "batch" != first.Label[0].GetValue() { - return nil, fmt.Errorf("expected metric '%s' with a single `batch=processor` attribute but got '%s'", metric.GetName(), first.GetLabel()) - } - - return first, nil -} - -func assertFloat(t *testing.T, expected, got float64, metric string) { - if math.Abs(expected-got) > 0.00001 { - assert.Failf(t, "unexpected metric value", "value for metric '%s' did not match, expected '%f' got '%f'", metric, expected, got) - } -} - -func batchViews() []sdkmetric.View { - return []sdkmetric.View{ - sdkmetric.NewView( - sdkmetric.Instrument{Name: processorhelper.BuildCustomMetricName("batch", "batch_send_size")}, - sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{ - Boundaries: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000}, - }}, - ), - sdkmetric.NewView( - sdkmetric.Instrument{Name: processorhelper.BuildCustomMetricName("batch", "batch_send_size_bytes")}, - sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{ - Boundaries: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, - 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, - 1000_000, 2000_000, 3000_000, 4000_000, 5000_000, 6000_000, 7000_000, 8000_000, 9000_000}, - }}, - ), - } -}