From 2ceb26e22b670041adc4dd0efa40da5bce4110ad Mon Sep 17 00:00:00 2001 From: Alex Boten <223565+codeboten@users.noreply.github.com> Date: Wed, 29 May 2024 08:33:51 -0700 Subject: [PATCH 1/4] [chore] update batch processor tests to use generated utility This updates the tests for the batch processor. This allows us to remove a bunch of custom test code. Signed-off-by: Alex Boten <223565+codeboten@users.noreply.github.com> --- .../batchprocessor/batch_processor_test.go | 267 ++++++++++++++++-- processor/batchprocessor/metrics_test.go | 203 ------------- 2 files changed, 237 insertions(+), 233 deletions(-) delete mode 100644 processor/batchprocessor/metrics_test.go diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go index 5d3666a71c9..98a4f0db0d3 100644 --- a/processor/batchprocessor/batch_processor_test.go +++ b/processor/batchprocessor/batch_processor_test.go @@ -285,10 +285,7 @@ func TestBatchProcessorSentBySize(t *testing.T) { } func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { - telemetryTest(t, testBatchProcessorSentBySizeWithMaxSize) -} - -func testBatchProcessorSentBySizeWithMaxSize(t *testing.T, tel testTelemetry) { + tel := setupTestTelemetry() sink := new(consumertest.TracesSink) cfg := createDefaultConfig().(*Config) sendBatchSize := 20 @@ -296,7 +293,7 @@ func testBatchProcessorSentBySizeWithMaxSize(t *testing.T, tel testTelemetry) { cfg.SendBatchSize = uint32(sendBatchSize) cfg.SendBatchMaxSize = uint32(sendBatchMaxSize) cfg.Timeout = 500 * time.Millisecond - creationSet := tel.NewProcessorCreateSettings() + creationSet := tel.NewCreateSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchTracesProcessor(creationSet, sink, cfg) require.NoError(t, err) @@ -324,11 +321,97 @@ func testBatchProcessorSentBySizeWithMaxSize(t *testing.T, tel testTelemetry) { receivedTraces := sink.AllTraces() require.EqualValues(t, expectedBatchesNum, len(receivedTraces)) - tel.assertMetrics(t, expectedMetrics{ - sendCount: float64(expectedBatchesNum), - sendSizeSum: float64(sink.SpanCount()), - sizeTrigger: math.Floor(float64(totalSpans) / float64(sendBatchMaxSize)), - timeoutTrigger: 1, + // tel.assertMetrics(t, expectedMetrics{ + // sendCount: float64(expectedBatchesNum), + // sendSizeSum: float64(sink.SpanCount()), + // sizeTrigger: math.Floor(float64(totalSpans) / float64(sendBatchMaxSize)), + // timeoutTrigger: 1, + // }) + tel.assertMetrics(t, []metricdata.Metrics{ + { + Name: "processor_batch_batch_send_size_bytes", + Description: "Number of bytes in batch that was sent", + Unit: "By", + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Count: uint64(expectedBatchesNum), + Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, + 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, + 1000_000, 2000_000, 3000_000, 4000_000, 5000_000, 6000_000, 7000_000, 8000_000, 9000_000}, + BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, uint64(expectedBatchesNum), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Sum: int64(sink.SpanCount()), + Min: metricdata.NewExtrema(int64(sink.SpanCount() / expectedBatchesNum)), + Max: metricdata.NewExtrema(int64(sink.SpanCount() / expectedBatchesNum)), + }, + }, + }, + }, + { + Name: "processor_batch_batch_send_size", + Description: "Number of units in the batch", + Unit: "1", + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Count: uint64(expectedBatchesNum), + Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000}, + BucketCounts: []uint64{0, 1, uint64(expectedBatchesNum - 1), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Sum: int64(sink.SpanCount()), + Min: metricdata.NewExtrema(int64(sendBatchSize - 1)), + Max: metricdata.NewExtrema(int64(cfg.SendBatchMaxSize)), + }, + }, + }, + }, + { + Name: "processor_batch_batch_size_trigger_send", + Description: "Number of times the batch was sent due to a size trigger", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: int64(expectedBatchesNum - 1), + Attributes: attribute.NewSet(attribute.String("processor", "batch")), + }, + }, + }, + }, + { + Name: "processor_batch_timeout_trigger_send", + Description: "Number of times the batch was sent due to a timeout trigger", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet(attribute.String("processor", "batch")), + }, + }, + }, + }, + { + Name: "processor_batch_metadata_cardinality", + Description: "Number of distinct metadata value combinations being processed", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + }, + }, + }, + }, }) } @@ -459,10 +542,7 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) { } func TestBatchMetricProcessorBatchSize(t *testing.T) { - telemetryTest(t, testBatchMetricProcessorBatchSize) -} - -func testBatchMetricProcessorBatchSize(t *testing.T, tel testTelemetry) { + tel := setupTestTelemetry() sizer := &pmetric.ProtoMarshaler{} // Instantiate the batch processor with low config values to test data @@ -478,7 +558,7 @@ func testBatchMetricProcessorBatchSize(t *testing.T, tel testTelemetry) { dataPointsPerRequest := metricsPerRequest * dataPointsPerMetric sink := new(consumertest.MetricsSink) - creationSet := tel.NewProcessorCreateSettings() + creationSet := tel.NewCreateSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -509,11 +589,76 @@ func testBatchMetricProcessorBatchSize(t *testing.T, tel testTelemetry) { } } - tel.assertMetrics(t, expectedMetrics{ - sendCount: float64(expectedBatchesNum), - sendSizeSum: float64(sink.DataPointCount()), - sendSizeBytesSum: float64(size), - sizeTrigger: 20, + tel.assertMetrics(t, []metricdata.Metrics{ + { + Name: "processor_batch_batch_send_size_bytes", + Description: "Number of bytes in batch that was sent", + Unit: "By", + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Count: uint64(expectedBatchesNum), + Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, + 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, + 1000_000, 2000_000, 3000_000, 4000_000, 5000_000, 6000_000, 7000_000, 8000_000, 9000_000}, + BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, uint64(expectedBatchesNum), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Sum: int64(size), + Min: metricdata.NewExtrema(int64(size / expectedBatchesNum)), + Max: metricdata.NewExtrema(int64(size / expectedBatchesNum)), + }, + }, + }, + }, + { + Name: "processor_batch_batch_send_size", + Description: "Number of units in the batch", + Unit: "1", + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Count: uint64(expectedBatchesNum), + Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000}, + BucketCounts: []uint64{0, 0, uint64(expectedBatchesNum), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Sum: int64(sink.DataPointCount()), + Min: metricdata.NewExtrema(int64(cfg.SendBatchSize)), + Max: metricdata.NewExtrema(int64(cfg.SendBatchSize)), + }, + }, + }, + }, + { + Name: "processor_batch_batch_size_trigger_send", + Description: "Number of times the batch was sent due to a size trigger", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: int64(expectedBatchesNum), + Attributes: attribute.NewSet(attribute.String("processor", "batch")), + }, + }, + }, + }, + { + Name: "processor_batch_metadata_cardinality", + Description: "Number of distinct metadata value combinations being processed", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + }, + }, + }, + }, }) } @@ -778,10 +923,7 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) { } func TestBatchLogProcessor_BatchSize(t *testing.T) { - telemetryTest(t, testBatchLogProcessorBatchSize) -} - -func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry) { + tel := setupTestTelemetry() sizer := &plog.ProtoMarshaler{} // Instantiate the batch processor with low config values to test data @@ -795,7 +937,7 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry) { logsPerRequest := 5 sink := new(consumertest.LogsSink) - creationSet := tel.NewProcessorCreateSettings() + creationSet := tel.NewCreateSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -826,11 +968,76 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry) { } } - tel.assertMetrics(t, expectedMetrics{ - sendCount: float64(expectedBatchesNum), - sendSizeSum: float64(sink.LogRecordCount()), - sendSizeBytesSum: float64(size), - sizeTrigger: float64(expectedBatchesNum), + tel.assertMetrics(t, []metricdata.Metrics{ + { + Name: "processor_batch_batch_send_size_bytes", + Description: "Number of bytes in batch that was sent", + Unit: "By", + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Count: uint64(expectedBatchesNum), + Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, + 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, + 1000_000, 2000_000, 3000_000, 4000_000, 5000_000, 6000_000, 7000_000, 8000_000, 9000_000}, + BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, uint64(expectedBatchesNum), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Sum: int64(size), + Min: metricdata.NewExtrema(int64(size / expectedBatchesNum)), + Max: metricdata.NewExtrema(int64(size / expectedBatchesNum)), + }, + }, + }, + }, + { + Name: "processor_batch_batch_send_size", + Description: "Number of units in the batch", + Unit: "1", + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Count: uint64(expectedBatchesNum), + Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000}, + BucketCounts: []uint64{0, 0, uint64(expectedBatchesNum), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Sum: int64(sink.LogRecordCount()), + Min: metricdata.NewExtrema(int64(cfg.SendBatchSize)), + Max: metricdata.NewExtrema(int64(cfg.SendBatchSize)), + }, + }, + }, + }, + { + Name: "processor_batch_batch_size_trigger_send", + Description: "Number of times the batch was sent due to a size trigger", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: int64(expectedBatchesNum), + Attributes: attribute.NewSet(attribute.String("processor", "batch")), + }, + }, + }, + }, + { + Name: "processor_batch_metadata_cardinality", + Description: "Number of distinct metadata value combinations being processed", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + }, + }, + }, + }, }) } diff --git a/processor/batchprocessor/metrics_test.go b/processor/batchprocessor/metrics_test.go deleted file mode 100644 index 0f7f4db9165..00000000000 --- a/processor/batchprocessor/metrics_test.go +++ /dev/null @@ -1,203 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package batchprocessor - -import ( - "context" - "fmt" - "math" - "net/http" - "net/http/httptest" - "testing" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" - io_prometheus_client "github.com/prometheus/client_model/go" - "github.com/prometheus/common/expfmt" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - otelprom "go.opentelemetry.io/otel/exporters/prometheus" - sdkmetric "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/resource" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/processor" - "go.opentelemetry.io/collector/processor/processorhelper" - "go.opentelemetry.io/collector/processor/processortest" -) - -type testTelemetry struct { - promHandler http.Handler - meterProvider *sdkmetric.MeterProvider -} - -type expectedMetrics struct { - // processor_batch_batch_send_size_count - // processor_batch_batch_send_size_bytes_count - sendCount float64 - // processor_batch_batch_send_size_sum - sendSizeSum float64 - // processor_batch_batch_send_size_bytes_sum - sendSizeBytesSum float64 - // processor_batch_batch_size_trigger_send - sizeTrigger float64 - // processor_batch_batch_timeout_trigger_send - timeoutTrigger float64 -} - -func telemetryTest(t *testing.T, testFunc func(t *testing.T, tel testTelemetry)) { - testFunc(t, setupTelemetry(t)) -} - -func setupTelemetry(t *testing.T) testTelemetry { - telemetry := testTelemetry{} - - promReg := prometheus.NewRegistry() - exporter, err := otelprom.New(otelprom.WithRegisterer(promReg), otelprom.WithoutUnits(), otelprom.WithoutScopeInfo(), otelprom.WithoutCounterSuffixes()) - require.NoError(t, err) - - telemetry.meterProvider = sdkmetric.NewMeterProvider( - sdkmetric.WithResource(resource.Empty()), - sdkmetric.WithReader(exporter), - sdkmetric.WithView(batchViews()...), - ) - - telemetry.promHandler = promhttp.HandlerFor(promReg, promhttp.HandlerOpts{}) - - t.Cleanup(func() { assert.NoError(t, telemetry.meterProvider.Shutdown(context.Background())) }) - - return telemetry -} - -func (tt *testTelemetry) NewProcessorCreateSettings() processor.CreateSettings { - settings := processortest.NewNopCreateSettings() - settings.MeterProvider = tt.meterProvider - settings.ID = component.MustNewID("batch") - - return settings -} - -func (tt *testTelemetry) assertMetrics(t *testing.T, expected expectedMetrics) { - req, err := http.NewRequest(http.MethodGet, "/metrics", nil) - require.NoError(t, err) - - rr := httptest.NewRecorder() - tt.promHandler.ServeHTTP(rr, req) - - var parser expfmt.TextParser - metrics, err := parser.TextToMetricFamilies(rr.Body) - require.NoError(t, err) - - if expected.sendSizeBytesSum > 0 { - name := "processor_batch_batch_send_size_bytes" - metric := tt.getMetric(t, name, io_prometheus_client.MetricType_HISTOGRAM, metrics) - - assertFloat(t, expected.sendSizeBytesSum, metric.GetHistogram().GetSampleSum(), name) - assertFloat(t, expected.sendCount, float64(metric.GetHistogram().GetSampleCount()), name) - - tt.assertBoundaries(t, - []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, - 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, - 1000_000, 2000_000, 3000_000, 4000_000, 5000_000, 6000_000, 7000_000, 8000_000, 9000_000, math.Inf(1)}, - metric.GetHistogram(), - name, - ) - } - - if expected.sendSizeSum > 0 { - name := "processor_batch_batch_send_size" - metric := tt.getMetric(t, name, io_prometheus_client.MetricType_HISTOGRAM, metrics) - - assertFloat(t, expected.sendSizeSum, metric.GetHistogram().GetSampleSum(), name) - assertFloat(t, expected.sendCount, float64(metric.GetHistogram().GetSampleCount()), name) - - tt.assertBoundaries(t, - []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000, math.Inf(1)}, - metric.GetHistogram(), - name, - ) - } - - if expected.sizeTrigger > 0 { - name := "processor_batch_batch_size_trigger_send" - metric := tt.getMetric(t, name, io_prometheus_client.MetricType_COUNTER, metrics) - - assertFloat(t, expected.sizeTrigger, metric.GetCounter().GetValue(), name) - } - - if expected.timeoutTrigger > 0 { - name := "processor_batch_timeout_trigger_send" - metric := tt.getMetric(t, name, io_prometheus_client.MetricType_COUNTER, metrics) - - assertFloat(t, expected.timeoutTrigger, metric.GetCounter().GetValue(), name) - } -} - -func (tt *testTelemetry) assertBoundaries(t *testing.T, expected []float64, histogram *io_prometheus_client.Histogram, metric string) { - var got []float64 - for _, bucket := range histogram.GetBucket() { - got = append(got, bucket.GetUpperBound()) - } - - if len(expected) != len(got) { - assert.Failf(t, "different boundaries size", "metric '%s' expected boundaries '%x' but got '%x'", metric, expected, got) - return - } - - for i := range expected { - if math.Abs(expected[i]-got[i]) > 0.00001 { - assert.Failf(t, "unexpected boundary", "boundary for metric '%s' did not match, expected '%f' got '%f'", metric, expected[i], got[i]) - } - } - -} - -func (tt *testTelemetry) getMetric(t *testing.T, name string, mtype io_prometheus_client.MetricType, got map[string]*io_prometheus_client.MetricFamily) *io_prometheus_client.Metric { - metricFamily, ok := got[name] - require.True(t, ok, "expected metric '%s' not found", name) - require.Equal(t, mtype, metricFamily.GetType()) - - metric, err := getSingleMetric(metricFamily) - require.NoError(t, err) - - return metric -} - -func getSingleMetric(metric *io_prometheus_client.MetricFamily) (*io_prometheus_client.Metric, error) { - if l := len(metric.Metric); l != 1 { - return nil, fmt.Errorf("expected metric '%s' with one set of attributes, but found %d", metric.GetName(), l) - } - first := metric.Metric[0] - - if len(first.Label) != 1 || "processor" != first.Label[0].GetName() || "batch" != first.Label[0].GetValue() { - return nil, fmt.Errorf("expected metric '%s' with a single `batch=processor` attribute but got '%s'", metric.GetName(), first.GetLabel()) - } - - return first, nil -} - -func assertFloat(t *testing.T, expected, got float64, metric string) { - if math.Abs(expected-got) > 0.00001 { - assert.Failf(t, "unexpected metric value", "value for metric '%s' did not match, expected '%f' got '%f'", metric, expected, got) - } -} - -func batchViews() []sdkmetric.View { - return []sdkmetric.View{ - sdkmetric.NewView( - sdkmetric.Instrument{Name: processorhelper.BuildCustomMetricName("batch", "batch_send_size")}, - sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{ - Boundaries: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000}, - }}, - ), - sdkmetric.NewView( - sdkmetric.Instrument{Name: processorhelper.BuildCustomMetricName("batch", "batch_send_size_bytes")}, - sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{ - Boundaries: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, - 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, - 1000_000, 2000_000, 3000_000, 4000_000, 5000_000, 6000_000, 7000_000, 8000_000, 9000_000}, - }}, - ), - } -} From 100d2a5aadcd85a4a5fad7a2105a0fb4341b9c40 Mon Sep 17 00:00:00 2001 From: Alex Boten <223565+codeboten@users.noreply.github.com> Date: Wed, 29 May 2024 10:51:31 -0700 Subject: [PATCH 2/4] update tests Signed-off-by: Alex Boten <223565+codeboten@users.noreply.github.com> --- .../batchprocessor/batch_processor_test.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go index 98a4f0db0d3..c997a0bf96d 100644 --- a/processor/batchprocessor/batch_processor_test.go +++ b/processor/batchprocessor/batch_processor_test.go @@ -286,6 +286,7 @@ func TestBatchProcessorSentBySize(t *testing.T) { func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { tel := setupTestTelemetry() + sizer := &ptrace.ProtoMarshaler{} sink := new(consumertest.TracesSink) cfg := createDefaultConfig().(*Config) sendBatchSize := 20 @@ -304,8 +305,11 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { totalSpans := requestCount * spansPerRequest start := time.Now() + + sizeSum := 0 for requestNum := 0; requestNum < requestCount; requestNum++ { td := testdata.GenerateTraces(spansPerRequest) + sizeSum += sizer.TracesSize(td) assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) } @@ -341,8 +345,8 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, 1000_000, 2000_000, 3000_000, 4000_000, 5000_000, 6000_000, 7000_000, 8000_000, 9000_000}, - BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, uint64(expectedBatchesNum), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, - Sum: int64(sink.SpanCount()), + BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, uint64(expectedBatchesNum - 1), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Sum: int64(sizeSum), Min: metricdata.NewExtrema(int64(sink.SpanCount() / expectedBatchesNum)), Max: metricdata.NewExtrema(int64(sink.SpanCount() / expectedBatchesNum)), }, @@ -407,7 +411,8 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { IsMonotonic: false, DataPoints: []metricdata.DataPoint[int64]{ { - Value: 1, + Value: 1, + Attributes: attribute.NewSet(attribute.String("processor", "batch")), }, }, }, @@ -654,7 +659,8 @@ func TestBatchMetricProcessorBatchSize(t *testing.T) { IsMonotonic: false, DataPoints: []metricdata.DataPoint[int64]{ { - Value: 1, + Value: 1, + Attributes: attribute.NewSet(attribute.String("processor", "batch")), }, }, }, @@ -1033,7 +1039,8 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) { IsMonotonic: false, DataPoints: []metricdata.DataPoint[int64]{ { - Value: 1, + Value: 1, + Attributes: attribute.NewSet(attribute.String("processor", "batch")), }, }, }, From 9f8d6781f789393d742794f507dc04bd1e816cd4 Mon Sep 17 00:00:00 2001 From: Alex Boten <223565+codeboten@users.noreply.github.com> Date: Wed, 29 May 2024 11:04:53 -0700 Subject: [PATCH 3/4] tidy Signed-off-by: Alex Boten <223565+codeboten@users.noreply.github.com> --- processor/batchprocessor/go.mod | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/processor/batchprocessor/go.mod b/processor/batchprocessor/go.mod index 7a03fb04555..4690e1a9928 100644 --- a/processor/batchprocessor/go.mod +++ b/processor/batchprocessor/go.mod @@ -3,9 +3,6 @@ module go.opentelemetry.io/collector/processor/batchprocessor go 1.21.0 require ( - github.com/prometheus/client_golang v1.19.1 - github.com/prometheus/client_model v0.6.1 - github.com/prometheus/common v0.53.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector v0.101.0 go.opentelemetry.io/collector/component v0.101.0 @@ -16,9 +13,7 @@ require ( go.opentelemetry.io/collector/pdata/testdata v0.101.0 go.opentelemetry.io/collector/processor v0.101.0 go.opentelemetry.io/otel v1.27.0 - go.opentelemetry.io/otel/exporters/prometheus v0.49.0 go.opentelemetry.io/otel/metric v1.27.0 - go.opentelemetry.io/otel/sdk v1.27.0 go.opentelemetry.io/otel/sdk/metric v1.27.0 go.opentelemetry.io/otel/trace v1.27.0 go.uber.org/goleak v1.3.0 @@ -43,7 +38,12 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_golang v1.19.1 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.53.0 // indirect github.com/prometheus/procfs v0.15.0 // indirect + go.opentelemetry.io/otel/exporters/prometheus v0.49.0 // indirect + go.opentelemetry.io/otel/sdk v1.27.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.25.0 // indirect golang.org/x/sys v0.20.0 // indirect From e49eb285cd8e3f97e3a9b4578f2243d06e8fc1eb Mon Sep 17 00:00:00 2001 From: Alex Boten <223565+codeboten@users.noreply.github.com> Date: Wed, 29 May 2024 11:54:02 -0700 Subject: [PATCH 4/4] fix test Signed-off-by: Alex Boten <223565+codeboten@users.noreply.github.com> --- .../batchprocessor/batch_processor_test.go | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go index c997a0bf96d..2cb189c13c3 100644 --- a/processor/batchprocessor/batch_processor_test.go +++ b/processor/batchprocessor/batch_processor_test.go @@ -187,7 +187,7 @@ func TestBatchProcessorSentBySize(t *testing.T) { sizeSum := 0 for requestNum := 0; requestNum < requestCount; requestNum++ { td := testdata.GenerateTraces(spansPerRequest) - sizeSum += sizer.TracesSize(td) + assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) } @@ -203,6 +203,7 @@ func TestBatchProcessorSentBySize(t *testing.T) { receivedTraces := sink.AllTraces() require.EqualValues(t, expectedBatchesNum, len(receivedTraces)) for _, td := range receivedTraces { + sizeSum += sizer.TracesSize(td) rss := td.ResourceSpans() require.Equal(t, expectedBatchingFactor, rss.Len()) for i := 0; i < expectedBatchingFactor; i++ { @@ -309,7 +310,6 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { sizeSum := 0 for requestNum := 0; requestNum < requestCount; requestNum++ { td := testdata.GenerateTraces(spansPerRequest) - sizeSum += sizer.TracesSize(td) assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) } @@ -324,13 +324,19 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { require.Equal(t, totalSpans, sink.SpanCount()) receivedTraces := sink.AllTraces() require.EqualValues(t, expectedBatchesNum, len(receivedTraces)) + // we have to count the size after it was processed since splitTraces will cause some + // repeated ResourceSpan data to be sent through the processor + var min, max int + for _, td := range receivedTraces { + if min == 0 || sizer.TracesSize(td) < min { + min = sizer.TracesSize(td) + } + if sizer.TracesSize(td) > max { + max = sizer.TracesSize(td) + } + sizeSum += sizer.TracesSize(td) + } - // tel.assertMetrics(t, expectedMetrics{ - // sendCount: float64(expectedBatchesNum), - // sendSizeSum: float64(sink.SpanCount()), - // sizeTrigger: math.Floor(float64(totalSpans) / float64(sendBatchMaxSize)), - // timeoutTrigger: 1, - // }) tel.assertMetrics(t, []metricdata.Metrics{ { Name: "processor_batch_batch_send_size_bytes", @@ -347,8 +353,8 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { 1000_000, 2000_000, 3000_000, 4000_000, 5000_000, 6000_000, 7000_000, 8000_000, 9000_000}, BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, uint64(expectedBatchesNum - 1), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Sum: int64(sizeSum), - Min: metricdata.NewExtrema(int64(sink.SpanCount() / expectedBatchesNum)), - Max: metricdata.NewExtrema(int64(sink.SpanCount() / expectedBatchesNum)), + Min: metricdata.NewExtrema(int64(min)), + Max: metricdata.NewExtrema(int64(max)), }, }, },