Skip to content

Commit

Permalink
feat(processors.batch): fix bug and simplify tests
Browse files Browse the repository at this point in the history
  • Loading branch information
LarsStegman committed Sep 11, 2024
1 parent 3d63787 commit d94682e
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 42 deletions.
5 changes: 2 additions & 3 deletions plugins/processors/batch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ This processor groups metrics into batches by adding a batch tag. This is
useful for parallel processing of metrics where downstream processors,
aggregators or outputs can then select a batch using `tagpass` or `metricpass`.

Metrics are distributed across batches using the round-robin scheme.

## Global configuration options <!-- @/docs/includes/plugin_config.md -->

In addition to the plugin-specific configuration settings, plugins support
Expand All @@ -17,9 +19,6 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.

```toml @sample.conf
## Batch metrics into separate batches by adding a tag indicating the batch index.
## Can be used to route batches of metrics to different outputs
## to parallelize writing of metrics to an output
## Metrics are distributed across batches using the round-robin scheme.
[[processors.batch]]
## The name of the tag to use for adding the batch index
batch_tag = "my_batch"
Expand Down
1 change: 1 addition & 0 deletions plugins/processors/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func (b *Batch) Apply(in ...telegraf.Metric) []telegraf.Metric {
out := make([]telegraf.Metric, 0, len(in))
for _, m := range in {
if b.SkipExisting && m.HasTag(b.BatchTag) {
out = append(out, m)
continue
}

Expand Down
113 changes: 77 additions & 36 deletions plugins/processors/batch/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,11 @@ import (

const batchTag = "?internal_batch_idx"

func MakeBatching(batches uint64) *Batch {
return &Batch{
func Test_SingleMetricPutInBatch0(t *testing.T) {
b := &Batch{
BatchTag: batchTag,
NumBatches: batches,
NumBatches: 1,
}
}

func MakeXMetrics(count int) []telegraf.Metric {
ms := make([]telegraf.Metric, 0, count)
for range count {
ms = append(ms, testutil.MockMetrics()...)
}

return ms
}

func requireMetricInBatch(t *testing.T, m telegraf.Metric, batch string) {
batchTagValue, ok := m.GetTag(batchTag)
require.True(t, ok)
require.Equal(t, batch, batchTagValue)
}

func Test_SingleMetricPutInBatch0(t *testing.T) {
b := MakeBatching(1)
m := testutil.MockMetricsWithValue(1)
expectedM := testutil.MockMetricsWithValue(1)
expectedM[0].AddTag(batchTag, "0")
Expand All @@ -42,28 +23,88 @@ func Test_SingleMetricPutInBatch0(t *testing.T) {
}

func Test_MetricsSmallerThanBatchSizeAreInDifferentBatches(t *testing.T) {
b := MakeBatching(3)
ms := MakeXMetrics(2)
b := &Batch{
BatchTag: batchTag,
NumBatches: 3,
}

ms := make([]telegraf.Metric, 0, 2)
for range cap(ms) {
ms = append(ms, testutil.MockMetrics()...)
}

res := b.Apply(ms...)
requireMetricInBatch(t, res[0], "0")
requireMetricInBatch(t, res[1], "1")

batchTagValue, ok := res[0].GetTag(batchTag)
require.True(t, ok)
require.Equal(t, "0", batchTagValue)

batchTagValue, ok = res[1].GetTag(batchTag)
require.True(t, ok)
require.Equal(t, "1", batchTagValue)
}

func Test_MetricsEqualToBatchSizeInDifferentBatches(t *testing.T) {
b := MakeBatching(3)
ms := MakeXMetrics(3)
b := &Batch{
BatchTag: batchTag,
NumBatches: 3,
}

ms := make([]telegraf.Metric, 0, 3)
for range cap(ms) {
ms = append(ms, testutil.MockMetrics()...)
}

res := b.Apply(ms...)
requireMetricInBatch(t, res[0], "0")
requireMetricInBatch(t, res[1], "1")
requireMetricInBatch(t, res[2], "2")
batchTagValue, ok := res[0].GetTag(batchTag)
require.True(t, ok)
require.Equal(t, "0", batchTagValue)

batchTagValue, ok = res[1].GetTag(batchTag)
require.True(t, ok)
require.Equal(t, "1", batchTagValue)

batchTagValue, ok = res[2].GetTag(batchTag)
require.True(t, ok)
require.Equal(t, "2", batchTagValue)
}

func Test_MetricsMoreThanBatchSizeInSameBatch(t *testing.T) {
b := MakeBatching(2)
ms := MakeXMetrics(3)
b := &Batch{
BatchTag: batchTag,
NumBatches: 2,
}

ms := make([]telegraf.Metric, 0, 3)
for range cap(ms) {
ms = append(ms, testutil.MockMetrics()...)
}

res := b.Apply(ms...)
batchTagValue, ok := res[0].GetTag(batchTag)
require.True(t, ok)
require.Equal(t, "0", batchTagValue)

batchTagValue, ok = res[1].GetTag(batchTag)
require.True(t, ok)
require.Equal(t, "1", batchTagValue)

batchTagValue, ok = res[2].GetTag(batchTag)
require.True(t, ok)
require.Equal(t, "0", batchTagValue)
}

func Test_MetricWithExistingTagNotChanged(t *testing.T) {
b := &Batch{
BatchTag: batchTag,
NumBatches: 2,
SkipExisting: true,
}

requireMetricInBatch(t, res[0], "0")
requireMetricInBatch(t, res[1], "1")
requireMetricInBatch(t, res[2], "0")
m := testutil.MockMetricsWithValue(1)
m[0].AddTag(batchTag, "4")
res := b.Apply(m...)
tv, ok := res[0].GetTag(batchTag)
require.True(t, ok)
require.Equal(t, "4", tv)
}
3 changes: 0 additions & 3 deletions plugins/processors/batch/sample.conf
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
## Batch metrics into separate batches by adding a tag indicating the batch index.
## Can be used to route batches of metrics to different outputs
## to parallelize writing of metrics to an output
## Metrics are distributed across batches using the round-robin scheme.
[[processors.batch]]
## The name of the tag to use for adding the batch index
batch_tag = "my_batch"
Expand Down

0 comments on commit d94682e

Please sign in to comment.