From a8490ff58262f107cb04a40ce0b70dd4ce6834ab Mon Sep 17 00:00:00 2001 From: Lars Stegman Date: Wed, 11 Sep 2024 11:18:18 +0100 Subject: [PATCH 1/7] feat(processors.batch): create batch processor --- plugins/processors/all/batch.go | 5 ++ plugins/processors/batch/README.md | 56 +++++++++++++++++++++ plugins/processors/batch/batch.go | 41 +++++++++++++++ plugins/processors/batch/batch_test.go | 69 ++++++++++++++++++++++++++ plugins/processors/batch/sample.conf | 10 ++++ 5 files changed, 181 insertions(+) create mode 100644 plugins/processors/all/batch.go create mode 100644 plugins/processors/batch/README.md create mode 100644 plugins/processors/batch/batch.go create mode 100644 plugins/processors/batch/batch_test.go create mode 100644 plugins/processors/batch/sample.conf diff --git a/plugins/processors/all/batch.go b/plugins/processors/all/batch.go new file mode 100644 index 0000000000000..65d4e677bf1dd --- /dev/null +++ b/plugins/processors/all/batch.go @@ -0,0 +1,5 @@ +//go:build !custom || processors || processors.batch + +package all + +import _ "github.com/influxdata/telegraf/plugins/processors/batch" // register plugin diff --git a/plugins/processors/batch/README.md b/plugins/processors/batch/README.md new file mode 100644 index 0000000000000..3a1f0f8b06a95 --- /dev/null +++ b/plugins/processors/batch/README.md @@ -0,0 +1,56 @@ +# Batch Processor Plugin + +The batch processor batches metrics into +batches by adding a batch tag to the metrics. + +## Global configuration options + +In addition to the plugin-specific configuration settings, plugins support +additional global and plugin configuration settings. These settings are used to +modify metrics, tags, and field or create aliases and configure ordering, etc. +See the [CONFIGURATION.md][CONFIGURATION.md] for more details. + +[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins + +## Configuration + +```toml @sample.conf +## Batch metrics into separate batches by adding a tag indicating the batch index. +## Can be used to route batches of metrics to different outputs +## to parallelize writing of metrics to an output +## Metrics are distributed across batches using the round-robin scheme. +[[processors.batch]] + ## The name of the tag to use for adding the batch index + batch_tag = "my_batch" + + ## The number of batches to create + num_batches = 16 +``` + +## Example + +The example below uses these settings: + +```toml +[[processors.batch]] + ## The tag key to use for batching + batch_tag = "batch" + + ## The number of batches to create + num_batches = 3 +``` + +```diff +- temperature cpu=25 +- temperature cpu=50 +- temperature cpu=75 +- temperature cpu=25 +- temperature cpu=50 +- temperature cpu=75 ++ temperature,batch=0 cpu=25 ++ temperature,batch=1 cpu=50 ++ temperature,batch=2 cpu=75 ++ temperature,batch=0 cpu=25 ++ temperature,batch=1 cpu=50 ++ temperature,batch=2 cpu=75 +``` diff --git a/plugins/processors/batch/batch.go b/plugins/processors/batch/batch.go new file mode 100644 index 0000000000000..f23d82e143f38 --- /dev/null +++ b/plugins/processors/batch/batch.go @@ -0,0 +1,41 @@ +package batch + +import ( + _ "embed" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/processors" + "strconv" +) + +//go:embed sample.conf +var sampleConfig string + +type Batch struct { + BatchTag string `toml:"batch_tag"` + NumBatches uint64 `toml:"num_batches"` + + // the number of metrics that have been processed so far + count uint64 +} + +func (*Batch) SampleConfig() string { + return sampleConfig +} + +func (b *Batch) Apply(in ...telegraf.Metric) []telegraf.Metric { + out := make([]telegraf.Metric, 0, len(in)) + for _, m := range in { + batchId := b.count % b.NumBatches + b.count++ + m.AddTag(b.BatchTag, strconv.FormatUint(batchId, 10)) + out = append(out, m) + } + + return out +} + +func init() { + processors.Add("batch", func() telegraf.Processor { + return &Batch{} + }) +} diff --git a/plugins/processors/batch/batch_test.go b/plugins/processors/batch/batch_test.go new file mode 100644 index 0000000000000..30f9f15422649 --- /dev/null +++ b/plugins/processors/batch/batch_test.go @@ -0,0 +1,69 @@ +package batch + +import ( + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" + "testing" +) + +const batchTag = "?internal_batch_idx" + +func MakeBatching(batches uint64) *Batch { + return &Batch{ + BatchTag: batchTag, + NumBatches: batches, + } +} + +func MakeXMetrics(count int) []telegraf.Metric { + ms := make([]telegraf.Metric, 0, count) + for range count { + ms = append(ms, testutil.MockMetrics()...) + } + + return ms +} + +func requireMetricInBatch(t *testing.T, m telegraf.Metric, batch string) { + batchTagValue, ok := m.GetTag(batchTag) + require.True(t, ok) + require.Equal(t, batch, batchTagValue) +} + +func Test_SingleMetricPutInBatch0(t *testing.T) { + b := MakeBatching(1) + m := testutil.MockMetricsWithValue(1) + expectedM := testutil.MockMetricsWithValue(1) + expectedM[0].AddTag(batchTag, "0") + + res := b.Apply(m...) + testutil.RequireMetricsEqual(t, expectedM, res) +} + +func Test_MetricsSmallerThanBatchSizeAreInDifferentBatches(t *testing.T) { + b := MakeBatching(3) + ms := MakeXMetrics(2) + res := b.Apply(ms...) + requireMetricInBatch(t, res[0], "0") + requireMetricInBatch(t, res[1], "1") +} + +func Test_MetricsEqualToBatchSizeInDifferentBatches(t *testing.T) { + b := MakeBatching(3) + ms := MakeXMetrics(3) + res := b.Apply(ms...) + requireMetricInBatch(t, res[0], "0") + requireMetricInBatch(t, res[1], "1") + requireMetricInBatch(t, res[2], "2") +} + +func Test_MetricsMoreThanBatchSizeInSameBatch(t *testing.T) { + b := MakeBatching(2) + ms := MakeXMetrics(3) + res := b.Apply(ms...) + + requireMetricInBatch(t, res[0], "0") + requireMetricInBatch(t, res[1], "1") + requireMetricInBatch(t, res[2], "0") +} diff --git a/plugins/processors/batch/sample.conf b/plugins/processors/batch/sample.conf new file mode 100644 index 0000000000000..1d3ebe73a3dff --- /dev/null +++ b/plugins/processors/batch/sample.conf @@ -0,0 +1,10 @@ +## Batch metrics into separate batches by adding a tag indicating the batch index. +## Can be used to route batches of metrics to different outputs +## to parallelize writing of metrics to an output +## Metrics are distributed across batches using the round-robin scheme. +[[processors.batch]] + ## The name of the tag to use for adding the batch index + batch_tag = "my_batch" + + ## The number of batches to create + num_batches = 16 From 4209ee54e327c8f7e690281fb40e528537dbb46a Mon Sep 17 00:00:00 2001 From: Lars Stegman Date: Wed, 11 Sep 2024 13:29:01 +0100 Subject: [PATCH 2/7] feat(processors.batch): use atomic uint64 --- plugins/processors/batch/batch.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/plugins/processors/batch/batch.go b/plugins/processors/batch/batch.go index f23d82e143f38..156cf57d6f071 100644 --- a/plugins/processors/batch/batch.go +++ b/plugins/processors/batch/batch.go @@ -5,6 +5,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/processors" "strconv" + "sync/atomic" ) //go:embed sample.conf @@ -15,7 +16,7 @@ type Batch struct { NumBatches uint64 `toml:"num_batches"` // the number of metrics that have been processed so far - count uint64 + count atomic.Uint64 } func (*Batch) SampleConfig() string { @@ -25,8 +26,8 @@ func (*Batch) SampleConfig() string { func (b *Batch) Apply(in ...telegraf.Metric) []telegraf.Metric { out := make([]telegraf.Metric, 0, len(in)) for _, m := range in { - batchId := b.count % b.NumBatches - b.count++ + oldCount := b.count.Add(1) - 1 + batchId := oldCount % b.NumBatches m.AddTag(b.BatchTag, strconv.FormatUint(batchId, 10)) out = append(out, m) } From f8c0574b643915adebe391e8ffdcddb84f3bbb38 Mon Sep 17 00:00:00 2001 From: Lars Stegman Date: Wed, 11 Sep 2024 13:29:38 +0100 Subject: [PATCH 3/7] feat(processors.batch): update README.md Co-authored-by: Sven Rebhan <36194019+srebhan@users.noreply.github.com> --- plugins/processors/batch/README.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/plugins/processors/batch/README.md b/plugins/processors/batch/README.md index 3a1f0f8b06a95..8eae6d684bc69 100644 --- a/plugins/processors/batch/README.md +++ b/plugins/processors/batch/README.md @@ -1,7 +1,6 @@ # Batch Processor Plugin -The batch processor batches metrics into -batches by adding a batch tag to the metrics. +This processor groups metrics into batches by adding a batch tag. This is useful for parallel processing of metrics where downstream processors, aggregators or outputs can then select a batch using `tagpass`. ## Global configuration options From 3d637870aab87fda37b5983193860c78f489b2f2 Mon Sep 17 00:00:00 2001 From: Lars Stegman Date: Wed, 11 Sep 2024 13:53:04 +0100 Subject: [PATCH 4/7] feat(processors.batch): add option to not rebatch --- plugins/processors/batch/README.md | 8 +++++++- plugins/processors/batch/batch.go | 9 +++++++-- plugins/processors/batch/sample.conf | 4 ++++ 3 files changed, 18 insertions(+), 3 deletions(-) diff --git a/plugins/processors/batch/README.md b/plugins/processors/batch/README.md index 8eae6d684bc69..02c878c45835a 100644 --- a/plugins/processors/batch/README.md +++ b/plugins/processors/batch/README.md @@ -1,6 +1,8 @@ # Batch Processor Plugin -This processor groups metrics into batches by adding a batch tag. This is useful for parallel processing of metrics where downstream processors, aggregators or outputs can then select a batch using `tagpass`. +This processor groups metrics into batches by adding a batch tag. This is +useful for parallel processing of metrics where downstream processors, +aggregators or outputs can then select a batch using `tagpass` or `metricpass`. ## Global configuration options @@ -24,6 +26,10 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## The number of batches to create num_batches = 16 + + ## Do not assign metrics with an existing batch assignment to a + ## different batch. + # skip_existing = false ``` ## Example diff --git a/plugins/processors/batch/batch.go b/plugins/processors/batch/batch.go index 156cf57d6f071..49ca353d8f879 100644 --- a/plugins/processors/batch/batch.go +++ b/plugins/processors/batch/batch.go @@ -12,8 +12,9 @@ import ( var sampleConfig string type Batch struct { - BatchTag string `toml:"batch_tag"` - NumBatches uint64 `toml:"num_batches"` + BatchTag string `toml:"batch_tag"` + NumBatches uint64 `toml:"num_batches"` + SkipExisting bool `toml:"skip_existing"` // the number of metrics that have been processed so far count atomic.Uint64 @@ -26,6 +27,10 @@ func (*Batch) SampleConfig() string { func (b *Batch) Apply(in ...telegraf.Metric) []telegraf.Metric { out := make([]telegraf.Metric, 0, len(in)) for _, m := range in { + if b.SkipExisting && m.HasTag(b.BatchTag) { + continue + } + oldCount := b.count.Add(1) - 1 batchId := oldCount % b.NumBatches m.AddTag(b.BatchTag, strconv.FormatUint(batchId, 10)) diff --git a/plugins/processors/batch/sample.conf b/plugins/processors/batch/sample.conf index 1d3ebe73a3dff..6e189a030707a 100644 --- a/plugins/processors/batch/sample.conf +++ b/plugins/processors/batch/sample.conf @@ -8,3 +8,7 @@ ## The number of batches to create num_batches = 16 + + ## Do not assign metrics with an existing batch assignment to a + ## different batch. + # skip_existing = false From 7e841d864f5ae87452a8cda722eddb4a95bc757f Mon Sep 17 00:00:00 2001 From: Lars Stegman Date: Wed, 11 Sep 2024 14:36:26 +0100 Subject: [PATCH 5/7] feat(processors.batch): fix bug and simplify tests --- plugins/processors/batch/README.md | 9 +- plugins/processors/batch/batch.go | 1 + plugins/processors/batch/batch_test.go | 113 +++++++++++++++++-------- plugins/processors/batch/sample.conf | 3 - 4 files changed, 82 insertions(+), 44 deletions(-) diff --git a/plugins/processors/batch/README.md b/plugins/processors/batch/README.md index 02c878c45835a..32dd44b7fb73b 100644 --- a/plugins/processors/batch/README.md +++ b/plugins/processors/batch/README.md @@ -1,9 +1,11 @@ # Batch Processor Plugin -This processor groups metrics into batches by adding a batch tag. This is -useful for parallel processing of metrics where downstream processors, +This processor groups metrics into batches by adding a batch tag. This is +useful for parallel processing of metrics where downstream processors, aggregators or outputs can then select a batch using `tagpass` or `metricpass`. +Metrics are distributed across batches using the round-robin scheme. + ## Global configuration options In addition to the plugin-specific configuration settings, plugins support @@ -17,9 +19,6 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ```toml @sample.conf ## Batch metrics into separate batches by adding a tag indicating the batch index. -## Can be used to route batches of metrics to different outputs -## to parallelize writing of metrics to an output -## Metrics are distributed across batches using the round-robin scheme. [[processors.batch]] ## The name of the tag to use for adding the batch index batch_tag = "my_batch" diff --git a/plugins/processors/batch/batch.go b/plugins/processors/batch/batch.go index 49ca353d8f879..6f5c28e48fb36 100644 --- a/plugins/processors/batch/batch.go +++ b/plugins/processors/batch/batch.go @@ -28,6 +28,7 @@ func (b *Batch) Apply(in ...telegraf.Metric) []telegraf.Metric { out := make([]telegraf.Metric, 0, len(in)) for _, m := range in { if b.SkipExisting && m.HasTag(b.BatchTag) { + out = append(out, m) continue } diff --git a/plugins/processors/batch/batch_test.go b/plugins/processors/batch/batch_test.go index 30f9f15422649..a3b6f97051a23 100644 --- a/plugins/processors/batch/batch_test.go +++ b/plugins/processors/batch/batch_test.go @@ -9,30 +9,11 @@ import ( const batchTag = "?internal_batch_idx" -func MakeBatching(batches uint64) *Batch { - return &Batch{ +func Test_SingleMetricPutInBatch0(t *testing.T) { + b := &Batch{ BatchTag: batchTag, - NumBatches: batches, + NumBatches: 1, } -} - -func MakeXMetrics(count int) []telegraf.Metric { - ms := make([]telegraf.Metric, 0, count) - for range count { - ms = append(ms, testutil.MockMetrics()...) - } - - return ms -} - -func requireMetricInBatch(t *testing.T, m telegraf.Metric, batch string) { - batchTagValue, ok := m.GetTag(batchTag) - require.True(t, ok) - require.Equal(t, batch, batchTagValue) -} - -func Test_SingleMetricPutInBatch0(t *testing.T) { - b := MakeBatching(1) m := testutil.MockMetricsWithValue(1) expectedM := testutil.MockMetricsWithValue(1) expectedM[0].AddTag(batchTag, "0") @@ -42,28 +23,88 @@ func Test_SingleMetricPutInBatch0(t *testing.T) { } func Test_MetricsSmallerThanBatchSizeAreInDifferentBatches(t *testing.T) { - b := MakeBatching(3) - ms := MakeXMetrics(2) + b := &Batch{ + BatchTag: batchTag, + NumBatches: 3, + } + + ms := make([]telegraf.Metric, 0, 2) + for range cap(ms) { + ms = append(ms, testutil.MockMetrics()...) + } + res := b.Apply(ms...) - requireMetricInBatch(t, res[0], "0") - requireMetricInBatch(t, res[1], "1") + + batchTagValue, ok := res[0].GetTag(batchTag) + require.True(t, ok) + require.Equal(t, "0", batchTagValue) + + batchTagValue, ok = res[1].GetTag(batchTag) + require.True(t, ok) + require.Equal(t, "1", batchTagValue) } func Test_MetricsEqualToBatchSizeInDifferentBatches(t *testing.T) { - b := MakeBatching(3) - ms := MakeXMetrics(3) + b := &Batch{ + BatchTag: batchTag, + NumBatches: 3, + } + + ms := make([]telegraf.Metric, 0, 3) + for range cap(ms) { + ms = append(ms, testutil.MockMetrics()...) + } + res := b.Apply(ms...) - requireMetricInBatch(t, res[0], "0") - requireMetricInBatch(t, res[1], "1") - requireMetricInBatch(t, res[2], "2") + batchTagValue, ok := res[0].GetTag(batchTag) + require.True(t, ok) + require.Equal(t, "0", batchTagValue) + + batchTagValue, ok = res[1].GetTag(batchTag) + require.True(t, ok) + require.Equal(t, "1", batchTagValue) + + batchTagValue, ok = res[2].GetTag(batchTag) + require.True(t, ok) + require.Equal(t, "2", batchTagValue) } func Test_MetricsMoreThanBatchSizeInSameBatch(t *testing.T) { - b := MakeBatching(2) - ms := MakeXMetrics(3) + b := &Batch{ + BatchTag: batchTag, + NumBatches: 2, + } + + ms := make([]telegraf.Metric, 0, 3) + for range cap(ms) { + ms = append(ms, testutil.MockMetrics()...) + } + res := b.Apply(ms...) + batchTagValue, ok := res[0].GetTag(batchTag) + require.True(t, ok) + require.Equal(t, "0", batchTagValue) + + batchTagValue, ok = res[1].GetTag(batchTag) + require.True(t, ok) + require.Equal(t, "1", batchTagValue) + + batchTagValue, ok = res[2].GetTag(batchTag) + require.True(t, ok) + require.Equal(t, "0", batchTagValue) +} + +func Test_MetricWithExistingTagNotChanged(t *testing.T) { + b := &Batch{ + BatchTag: batchTag, + NumBatches: 2, + SkipExisting: true, + } - requireMetricInBatch(t, res[0], "0") - requireMetricInBatch(t, res[1], "1") - requireMetricInBatch(t, res[2], "0") + m := testutil.MockMetricsWithValue(1) + m[0].AddTag(batchTag, "4") + res := b.Apply(m...) + tv, ok := res[0].GetTag(batchTag) + require.True(t, ok) + require.Equal(t, "4", tv) } diff --git a/plugins/processors/batch/sample.conf b/plugins/processors/batch/sample.conf index 6e189a030707a..991b89ce1e8fc 100644 --- a/plugins/processors/batch/sample.conf +++ b/plugins/processors/batch/sample.conf @@ -1,7 +1,4 @@ ## Batch metrics into separate batches by adding a tag indicating the batch index. -## Can be used to route batches of metrics to different outputs -## to parallelize writing of metrics to an output -## Metrics are distributed across batches using the round-robin scheme. [[processors.batch]] ## The name of the tag to use for adding the batch index batch_tag = "my_batch" From 2d80838485f1a0bfa7f2dae759fc23c8d97e7fad Mon Sep 17 00:00:00 2001 From: Lars Stegman Date: Wed, 11 Sep 2024 15:03:35 +0100 Subject: [PATCH 6/7] feat(processors.batch): make lint happy --- plugins/processors/batch/batch.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/processors/batch/batch.go b/plugins/processors/batch/batch.go index 6f5c28e48fb36..be5527e2ea921 100644 --- a/plugins/processors/batch/batch.go +++ b/plugins/processors/batch/batch.go @@ -33,8 +33,8 @@ func (b *Batch) Apply(in ...telegraf.Metric) []telegraf.Metric { } oldCount := b.count.Add(1) - 1 - batchId := oldCount % b.NumBatches - m.AddTag(b.BatchTag, strconv.FormatUint(batchId, 10)) + batchID := oldCount % b.NumBatches + m.AddTag(b.BatchTag, strconv.FormatUint(batchID, 10)) out = append(out, m) } From b3212e8e41035467548f422b48df00e09248267f Mon Sep 17 00:00:00 2001 From: Lars Stegman Date: Wed, 11 Sep 2024 16:58:52 +0100 Subject: [PATCH 7/7] feat(processors.batch): rename num_batches to batches --- plugins/processors/batch/README.md | 4 ++-- plugins/processors/batch/batch.go | 2 +- plugins/processors/batch/sample.conf | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/plugins/processors/batch/README.md b/plugins/processors/batch/README.md index 32dd44b7fb73b..95c5bba00aeaa 100644 --- a/plugins/processors/batch/README.md +++ b/plugins/processors/batch/README.md @@ -24,7 +24,7 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. batch_tag = "my_batch" ## The number of batches to create - num_batches = 16 + batches = 16 ## Do not assign metrics with an existing batch assignment to a ## different batch. @@ -41,7 +41,7 @@ The example below uses these settings: batch_tag = "batch" ## The number of batches to create - num_batches = 3 + batches = 3 ``` ```diff diff --git a/plugins/processors/batch/batch.go b/plugins/processors/batch/batch.go index be5527e2ea921..49215d26fc072 100644 --- a/plugins/processors/batch/batch.go +++ b/plugins/processors/batch/batch.go @@ -13,7 +13,7 @@ var sampleConfig string type Batch struct { BatchTag string `toml:"batch_tag"` - NumBatches uint64 `toml:"num_batches"` + NumBatches uint64 `toml:"batches"` SkipExisting bool `toml:"skip_existing"` // the number of metrics that have been processed so far diff --git a/plugins/processors/batch/sample.conf b/plugins/processors/batch/sample.conf index 991b89ce1e8fc..b4e987726eaba 100644 --- a/plugins/processors/batch/sample.conf +++ b/plugins/processors/batch/sample.conf @@ -4,7 +4,7 @@ batch_tag = "my_batch" ## The number of batches to create - num_batches = 16 + batches = 16 ## Do not assign metrics with an existing batch assignment to a ## different batch.