diff --git a/cloudapi/config.go b/cloudapi/config.go index 397d5a7422a..f90ce89a641 100644 --- a/cloudapi/config.go +++ b/cloudapi/config.go @@ -35,6 +35,8 @@ type Config struct { // APIVersion null.Int `json:"apiVersion" envconfig:"K6_CLOUD_API_VERSION"` APIVersion null.Int `json:"-"` + // TODO: rename the config field to align with the new time-series-based logic + // once the migration from version 1 is completed. MaxMetricSamplesPerPackage null.Int `json:"maxMetricSamplesPerPackage" envconfig:"K6_CLOUD_MAX_METRIC_SAMPLES_PER_PACKAGE"` // The time interval between periodic API calls for sending samples to the cloud ingest service. diff --git a/output/cloud/expv2/flush.go b/output/cloud/expv2/flush.go index 5d3e3ae90de..933ec23d6d8 100644 --- a/output/cloud/expv2/flush.go +++ b/output/cloud/expv2/flush.go @@ -16,8 +16,12 @@ type metricsFlusher struct { bq *bucketQ client pusher aggregationPeriodInSeconds uint32 + maxSeriesInSingleBatch int } +// Flush flushes the queued buckets, sending them to the remote Cloud service. +// If the number of collected time series exceeds the maximum batch size, then +// the flush is split into chunks. 
func (f *metricsFlusher) Flush(ctx context.Context) error { // drain the buffer buckets := f.bq.PopAll() @@ -36,10 +40,24 @@ func (f *metricsFlusher) Flush(ctx context.Context) error { msb := newMetricSetBuilder(f.referenceID, f.aggregationPeriodInSeconds) for i := 0; i < len(buckets); i++ { - msb.addTimeBucket(&buckets[i]) + msb.addTimeBucket(buckets[i]) + if len(msb.seriesIndex) < f.maxSeriesInSingleBatch { + continue + } + + // we hit the chunk size, let's flush + err := f.client.push(ctx, f.referenceID, msb.MetricSet) + if err != nil { + return err + } + msb = newMetricSetBuilder(f.referenceID, f.aggregationPeriodInSeconds) + } + + if len(msb.seriesIndex) < 1 { + return nil } - // send the MetricSet to the remote service + // send the last (or the only) MetricSet chunk to the remote service return f.client.push(ctx, f.referenceID, msb.MetricSet) } @@ -84,7 +102,7 @@ func newMetricSetBuilder(testRunID string, aggrPeriodSec uint32) metricSetBuilde } } -func (msb *metricSetBuilder) addTimeBucket(bucket *timeBucket) { +func (msb *metricSetBuilder) addTimeBucket(bucket timeBucket) { for timeSeries, sink := range bucket.Sinks { pbmetric, ok := msb.metrics[timeSeries.Metric] if !ok { diff --git a/output/cloud/expv2/flush_test.go b/output/cloud/expv2/flush_test.go index 4978abec16c..e98c24e2e6a 100644 --- a/output/cloud/expv2/flush_test.go +++ b/output/cloud/expv2/flush_test.go @@ -1,11 +1,14 @@ package expv2 import ( + "context" + "strconv" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.k6.io/k6/metrics" + "go.k6.io/k6/output/cloud/expv2/pbcloud" ) // TODO: additional case @@ -29,7 +32,7 @@ func TestMetricSetBuilderAddTimeBucket(t *testing.T) { }, } msb := newMetricSetBuilder("testrunid-123", 1) - msb.addTimeBucket(&tb) + msb.addTimeBucket(tb) assert.Contains(t, msb.metrics, m1) require.Contains(t, msb.seriesIndex, timeSeries) @@ -38,3 +41,58 @@ func TestMetricSetBuilderAddTimeBucket(t *testing.T) { require.Len(t, 
msb.MetricSet.Metrics, 1) assert.Len(t, msb.MetricSet.Metrics[0].TimeSeries, 1) } + +func TestMetricsFlusherFlushChunk(t *testing.T) { + t.Parallel() + + testCases := []struct { + series int + expFlushCalls int + }{ + {series: 5, expFlushCalls: 2}, + {series: 2, expFlushCalls: 1}, + } + + r := metrics.NewRegistry() + m1 := r.MustNewMetric("metric1", metrics.Counter) + + for _, tc := range testCases { + bq := &bucketQ{} + pm := &pusherMock{} + mf := metricsFlusher{ + bq: bq, + client: pm, + maxSeriesInSingleBatch: 3, + } + + bq.buckets = make([]timeBucket, 0, tc.series) + for i := 0; i < tc.series; i++ { + ts := metrics.TimeSeries{ + Metric: m1, + Tags: r.RootTagSet().With("key1", "val"+strconv.Itoa(i)), + } + bq.Push([]timeBucket{ + { + Time: int64(i) + 1, + Sinks: map[metrics.TimeSeries]metricValue{ + ts: &counter{Sum: float64(1)}, + }, + }, + }) + } + require.Len(t, bq.buckets, tc.series) + + err := mf.Flush(context.TODO()) + require.NoError(t, err) + assert.Equal(t, tc.expFlushCalls, pm.pushCalled) + } +} + +type pusherMock struct { + pushCalled int +} + +func (pm *pusherMock) push(_ context.Context, _ string, _ *pbcloud.MetricSet) error { + pm.pushCalled++ + return nil +} diff --git a/output/cloud/expv2/output.go b/output/cloud/expv2/output.go index f157fbf3bf2..1593deb1d84 100644 --- a/output/cloud/expv2/output.go +++ b/output/cloud/expv2/output.go @@ -86,6 +86,9 @@ func (o *Output) Start() error { bq: &o.collector.bq, client: mc, aggregationPeriodInSeconds: uint32(o.config.AggregationPeriod.TimeDuration().Seconds()), + // TODO: rename the config field to align with the new time-series-based logic + // once the migration from version 1 is completed. + maxSeriesInSingleBatch: int(o.config.MaxMetricSamplesPerPackage.Int64), } o.periodicInvoke(o.config.MetricPushInterval.TimeDuration(), o.flushMetrics)