From afab3455737c898e88d8c68e2910f7fda4700580 Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Mon, 8 May 2023 17:27:58 +0200 Subject: [PATCH] feat(processors.scale): Add scaling by factor and offset (#13227) --- plugins/processors/scale/README.md | 35 ++- plugins/processors/scale/sample.conf | 20 +- plugins/processors/scale/scale.go | 56 +++- plugins/processors/scale/scale_test.go | 397 ++++++++++++++++++++----- 4 files changed, 391 insertions(+), 117 deletions(-) diff --git a/plugins/processors/scale/README.md b/plugins/processors/scale/README.md index 17e458c16f5d4..0a33e8103e38d 100644 --- a/plugins/processors/scale/README.md +++ b/plugins/processors/scale/README.md @@ -10,11 +10,18 @@ the given output range according to this formula: \text{output\_minimum} ``` -Input fields are converted to floating point values. -If the conversion fails, those fields are ignored. +Alternatively, you can apply a factor and offset to the input according to +this formula -**Please note:** Neither the input nor the output values -are clipped to their respective ranges! +```math +\text{result}=\text{factor} \cdot \text{value} + \text{offset} +``` + +Input fields are converted to floating point values if possible. Otherwise, +fields that cannot be converted are ignored and keep their original value. + +**Please note:** Neither the input nor the output values are clipped to their + respective ranges! ## Global configuration options @@ -37,24 +44,24 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## - input_maximum: Maximum expected input value ## - output_minimum: Minimum desired output value ## - output_maximum: Maximum desired output value + ## alternatively you can specify a scaling with factor and offset + ## - factor: factor to scale the input value with + ## - offset: additive offset for value after scaling ## - fields: a list of field names (or filters) to apply this scaling to - - ## Example: Define a scaling + + ## Example: Scaling with minimum and maximum values # [processors.scale.scaling] # input_minimum = 0 # input_maximum = 1 # output_minimum = 0 # output_maximum = 100 # fields = ["temperature1", "temperature2"] - - ## Multiple scalings can be defined simultaneously - ## Example: A second scaling. + + ## Example: Scaling with factor and offset # [processors.scale.scaling] - # input_minimum = 0 - # input_maximum = 50 - # output_minimum = 50 - # output_maximum = 100 - # fields = ["humidity*"] + # factor = 10.0 + # offset = -5.0 + # fields = ["voltage*"] ``` ## Example diff --git a/plugins/processors/scale/sample.conf b/plugins/processors/scale/sample.conf index d1cee70401693..a92dcfe2f8557 100644 --- a/plugins/processors/scale/sample.conf +++ b/plugins/processors/scale/sample.conf @@ -7,21 +7,21 @@ ## - input_maximum: Maximum expected input value ## - output_minimum: Minimum desired output value ## - output_maximum: Maximum desired output value + ## alternatively you can specify a scaling with factor and offset + ## - factor: factor to scale the input value with + ## - offset: additive offset for value after scaling ## - fields: a list of field names (or filters) to apply this scaling to - - ## Example: Define a scaling + + ## Example: Scaling with minimum and maximum values # [processors.scale.scaling] # input_minimum = 0 # input_maximum = 1 # output_minimum = 0 # output_maximum = 100 # fields = ["temperature1", "temperature2"] - - ## Multiple scalings can be defined simultaneously - ## Example: A second scaling. + + ## Example: Scaling with factor and offset # [processors.scale.scaling] - # input_minimum = 0 - # input_maximum = 50 - # output_minimum = 50 - # output_maximum = 100 - # fields = ["humidity*"] + # factor = 10.0 + # offset = -5.0 + # fields = ["voltage*"] diff --git a/plugins/processors/scale/scale.go b/plugins/processors/scale/scale.go index 45b6b399e5078..6498640407560 100644 --- a/plugins/processors/scale/scale.go +++ b/plugins/processors/scale/scale.go @@ -21,14 +21,18 @@ func (*Scale) SampleConfig() string { } type Scaling struct { - InMin float64 `toml:"input_minimum"` - InMax float64 `toml:"input_maximum"` - OutMin float64 `toml:"output_minimum"` - OutMax float64 `toml:"output_maximum"` + InMin *float64 `toml:"input_minimum"` + InMax *float64 `toml:"input_maximum"` + OutMin *float64 `toml:"output_minimum"` + OutMax *float64 `toml:"output_maximum"` + Factor *float64 `toml:"factor"` + Offset *float64 `toml:"offset"` Fields []string `toml:"fields"` - factor float64 fieldFilter filter.Filter + scale float64 + shiftIn float64 + shiftOut float64 } type Scale struct { @@ -36,13 +40,36 @@ type Scale struct { Log telegraf.Logger `toml:"-"` } -func (s *Scaling) init() error { - if s.InMax == s.InMin { - return fmt.Errorf("input minimum and maximum are equal for fields %s", strings.Join(s.Fields, ",")) - } +func (s *Scaling) Init() error { + s.scale, s.shiftOut, s.shiftIn = float64(1.0), float64(0.0), float64(0.0) + allMinMaxSet := s.OutMax != nil && s.OutMin != nil && s.InMax != nil && s.InMin != nil + anyMinMaxSet := s.OutMax != nil || s.OutMin != nil || s.InMax != nil || s.InMin != nil + factorSet := s.Factor != nil || s.Offset != nil + if anyMinMaxSet && factorSet { + return fmt.Errorf("cannot use factor/offset and minimum/maximum at the same time for fields %s", strings.Join(s.Fields, ",")) + } else if anyMinMaxSet && !allMinMaxSet { + return fmt.Errorf("all minimum and maximum values need to be set for fields %s", strings.Join(s.Fields, ",")) + } else if !anyMinMaxSet && !factorSet { + return fmt.Errorf("no scaling defined for fields %s", strings.Join(s.Fields, ",")) + } else if allMinMaxSet { + if *s.InMax == *s.InMin { + return fmt.Errorf("input minimum and maximum are equal for fields %s", strings.Join(s.Fields, ",")) + } + + if *s.OutMax == *s.OutMin { + return fmt.Errorf("output minimum and maximum are equal for fields %s", strings.Join(s.Fields, ",")) + } - if s.OutMax == s.OutMin { - return fmt.Errorf("output minimum and maximum are equal for fields %s", strings.Join(s.Fields, ",")) + s.scale = (*s.OutMax - *s.OutMin) / (*s.InMax - *s.InMin) + s.shiftOut = *s.OutMin + s.shiftIn = *s.InMin + } else { + if s.Factor != nil { + s.scale = *s.Factor + } + if s.Offset != nil { + s.shiftOut = *s.Offset + } } scalingFilter, err := filter.Compile(s.Fields) @@ -51,18 +78,17 @@ func (s *Scaling) init() error { } s.fieldFilter = scalingFilter - s.factor = (s.OutMax - s.OutMin) / (s.InMax - s.InMin) return nil } // scale a float according to the input and output range func (s *Scaling) process(value float64) float64 { - return (value-s.InMin)*s.factor + s.OutMin + return s.scale*(value-s.shiftIn) + s.shiftOut } func (s *Scale) Init() error { if s.Scalings == nil { - return errors.New("no valid scalings defined") + return errors.New("no valid scaling defined") } allFields := make(map[string]bool) @@ -77,7 +103,7 @@ func (s *Scale) Init() error { } } - if err := s.Scalings[i].init(); err != nil { + if err := s.Scalings[i].Init(); err != nil { return fmt.Errorf("scaling %d: %w", i+1, err) } } diff --git a/plugins/processors/scale/scale_test.go b/plugins/processors/scale/scale_test.go index 6fb8dc13d313e..3a60008e286e7 100644 --- a/plugins/processors/scale/scale_test.go +++ b/plugins/processors/scale/scale_test.go @@ -4,36 +4,49 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" ) -func TestScaler(t *testing.T) { +type scalingValuesMinMax struct { + InMin float64 + InMax float64 + OutMin float64 + OutMax float64 + Fields []string +} + +type scalingValuesFactor struct { + Factor float64 + Offset float64 + Fields []string +} + +func TestMinMax(t *testing.T) { tests := []struct { name string - scale *Scale + scale []scalingValuesMinMax inputs []telegraf.Metric expected []telegraf.Metric }{ { name: "Field Scaling", - scale: &Scale{ - Scalings: []Scaling{ - { - InMin: -1, - InMax: 1, - OutMin: 0, - OutMax: 100, - Fields: []string{"test1", "test2"}, - }, - { - InMin: -5, - InMax: 0, - OutMin: 1, - OutMax: 9, - Fields: []string{"test3", "test4"}, - }, + scale: []scalingValuesMinMax{ + { + InMin: -1, + InMax: 1, + OutMin: 0, + OutMax: 100, + Fields: []string{"test1", "test2"}, + }, + { + InMin: -5, + InMax: 0, + OutMin: 1, + OutMax: 9, + Fields: []string{"test3", "test4"}, }, }, inputs: []telegraf.Metric{ @@ -82,16 +95,14 @@ func TestScaler(t *testing.T) { }, }, { - name: "Ignored Fileds", - scale: &Scale{ - Scalings: []Scaling{ - { - InMin: -1, - InMax: 1, - OutMin: 0, - OutMax: 100, - Fields: []string{"test1", "test2"}, - }, + name: "Ignored Fields", + scale: []scalingValuesMinMax{ + { + InMin: -1, + InMax: 1, + OutMin: 0, + OutMax: 100, + Fields: []string{"test1", "test2"}, }, }, inputs: []telegraf.Metric{ @@ -113,15 +124,13 @@ func TestScaler(t *testing.T) { }, { name: "Out of range tests", - scale: &Scale{ - Scalings: []Scaling{ - { - InMin: -1, - InMax: 1, - OutMin: 0, - OutMax: 100, - Fields: []string{"test1", "test2"}, - }, + scale: []scalingValuesMinMax{ + { + InMin: -1, + InMax: 1, + OutMin: 0, + OutMax: 100, + Fields: []string{"test1", "test2"}, }, }, inputs: []telegraf.Metric{ @@ -141,15 +150,13 @@ func TestScaler(t *testing.T) { }, { name: "Missing field Fileds", - scale: &Scale{ - Scalings: []Scaling{ - { - InMin: -1, - InMax: 1, - OutMin: 0, - OutMax: 100, - Fields: []string{"test1", "test2"}, - }, + scale: []scalingValuesMinMax{ + { + InMin: -1, + InMax: 1, + OutMin: 0, + OutMax: 100, + Fields: []string{"test1", "test2"}, }, }, inputs: []telegraf.Metric{ @@ -169,63 +176,297 @@ func TestScaler(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - tt.scale.Log = testutil.Logger{} - - require.NoError(t, tt.scale.Init()) - actual := tt.scale.Apply(tt.inputs...) + plugin := &Scale{ + Scalings: make([]Scaling, 0, len(tt.scale)), + Log: testutil.Logger{}, + } + for i := range tt.scale { + plugin.Scalings = append(plugin.Scalings, Scaling{ + InMin: &tt.scale[i].InMin, + InMax: &tt.scale[i].InMax, + OutMin: &tt.scale[i].OutMin, + OutMax: &tt.scale[i].OutMax, + Fields: tt.scale[i].Fields, + }) + } + require.NoError(t, plugin.Init()) + actual := plugin.Apply(tt.inputs...) testutil.RequireMetricsEqual(t, tt.expected, actual) }) } } -func TestErrorCases(t *testing.T) { +func TestFactor(t *testing.T) { + tests := []struct { + name string + scale []scalingValuesFactor + inputs []telegraf.Metric + expected []telegraf.Metric + }{ + { + name: "Field Scaling", + scale: []scalingValuesFactor{ + { + Factor: 50.0, + Offset: 50.0, + Fields: []string{"test1", "test2"}, + }, + { + Factor: 1.6, + Offset: 9.0, + Fields: []string{"test3", "test4"}, + }, + }, + inputs: []telegraf.Metric{ + testutil.MustMetric("Name1", map[string]string{}, + map[string]interface{}{ + "test1": int64(0), + "test2": uint64(1), + }, time.Unix(0, 0)), + testutil.MustMetric("Name2", map[string]string{}, + map[string]interface{}{ + "test1": "0.5", + "test2": float32(-0.5), + }, time.Unix(0, 0)), + testutil.MustMetric("Name3", map[string]string{}, + map[string]interface{}{ + "test3": int64(-3), + "test4": uint64(0), + }, time.Unix(0, 0)), + testutil.MustMetric("Name4", map[string]string{}, + map[string]interface{}{ + "test3": int64(-5), + "test4": float32(-0.5), + }, time.Unix(0, 0)), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("Name1", map[string]string{}, + map[string]interface{}{ + "test1": float64(50), + "test2": float64(100), + }, time.Unix(0, 0)), + testutil.MustMetric("Name2", map[string]string{}, + map[string]interface{}{ + "test1": float64(75), + "test2": float32(25), + }, time.Unix(0, 0)), + testutil.MustMetric("Name3", map[string]string{}, + map[string]interface{}{ + "test3": float64(4.2), + "test4": float64(9), + }, time.Unix(0, 0)), + testutil.MustMetric("Name4", map[string]string{}, + map[string]interface{}{ + "test3": float64(1), + "test4": float64(8.2), + }, time.Unix(0, 0)), + }, + }, + { + name: "Ignored Fields", + scale: []scalingValuesFactor{ + { + Factor: 50.0, + Offset: 50.0, + Fields: []string{"test1", "test2"}, + }, + }, + inputs: []telegraf.Metric{ + testutil.MustMetric("Name1", map[string]string{}, + map[string]interface{}{ + "test1": int64(0), + "test2": uint64(1), + "test3": int64(1), + }, time.Unix(0, 0)), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("Name1", map[string]string{}, + map[string]interface{}{ + "test1": float64(50), + "test2": float64(100), + "test3": int64(1), + }, time.Unix(0, 0)), + }, + }, + { + name: "Missing field Fields", + scale: []scalingValuesFactor{ + { + Factor: 50.0, + Offset: 50.0, + Fields: []string{"test1", "test2"}, + }, + }, + inputs: []telegraf.Metric{ + testutil.MustMetric("Name1", map[string]string{}, + map[string]interface{}{ + "test1": int64(0), + }, time.Unix(0, 0)), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("Name1", map[string]string{}, + map[string]interface{}{ + "test1": float64(50), + }, time.Unix(0, 0)), + }, + }, + { + name: "No Offset", + scale: []scalingValuesFactor{ + { + Factor: 50.0, + Fields: []string{"test1"}, + }, + }, + inputs: []telegraf.Metric{ + testutil.MustMetric("Name1", map[string]string{}, + map[string]interface{}{ + "test1": int64(1), + }, time.Unix(0, 0)), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("Name1", map[string]string{}, + map[string]interface{}{ + "test1": float64(50), + }, time.Unix(0, 0)), + }, + }, + { + name: "No Factor", + scale: []scalingValuesFactor{ + { + Offset: 50.0, + Fields: []string{"test1"}, + }, + }, + inputs: []telegraf.Metric{ + testutil.MustMetric("Name1", map[string]string{}, + map[string]interface{}{ + "test1": int64(1), + }, time.Unix(0, 0)), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("Name1", map[string]string{}, + map[string]interface{}{ + "test1": float64(51), + }, time.Unix(0, 0)), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + plugin := &Scale{ + Scalings: make([]Scaling, 0, len(tt.scale)), + Log: testutil.Logger{}, + } + for i := range tt.scale { + s := Scaling{ + Fields: tt.scale[i].Fields, + } + if tt.scale[i].Factor != 0.0 { + s.Factor = &tt.scale[i].Factor + } + if tt.scale[i].Offset != 0.0 { + s.Offset = &tt.scale[i].Offset + } + plugin.Scalings = append(plugin.Scalings, s) + } + require.NoError(t, plugin.Init()) + actual := plugin.Apply(tt.inputs...) + + testutil.RequireMetricsEqual(t, tt.expected, actual, cmpopts.EquateApprox(0, 1e-6)) + }) + } +} + +func TestErrorCasesMinMax(t *testing.T) { + a0, a1, a100 := float64(0.0), float64(1.0), float64(100.0) tests := []struct { name string - scale *Scale + scaling []Scaling + fields []string expectedErrorMsg string }{ { name: "Same input range values", - scale: &Scale{ - Scalings: []Scaling{ - { - InMin: 1, - InMax: 1, - OutMin: 0, - OutMax: 100, - Fields: []string{"test"}, - }, + scaling: []Scaling{ + { + InMin: &a1, + InMax: &a1, + OutMin: &a0, + OutMax: &a100, + Fields: []string{"test"}, }, }, + fields: []string{"test"}, expectedErrorMsg: "input minimum and maximum are equal for fields test", }, { name: "Same input range values", - scale: &Scale{ - Scalings: []Scaling{ - { - InMin: 0, - InMax: 1, - OutMin: 100, - OutMax: 100, - Fields: []string{"test"}, - }, - }, - }, - expectedErrorMsg: "output minimum and maximum are equal for fields test", + scaling: []Scaling{ + { + InMin: &a0, + InMax: &a1, + OutMin: &a100, + OutMax: &a100, + Fields: []string{"test"}, + }, + }, + fields: []string{"test"}, + expectedErrorMsg: "output minimum and maximum are equal", + }, + { + name: "Nothing set", + scaling: []Scaling{ + { + Fields: []string{"test"}, + }, + }, + fields: []string{"test"}, + expectedErrorMsg: "no scaling defined", + }, + { + name: "Partial minimum and maximum", + scaling: []Scaling{ + { + InMin: &a0, + Fields: []string{"test"}, + }, + }, + fields: []string{"test"}, + expectedErrorMsg: "all minimum and maximum values need to be set", + }, + { + name: "Mixed minimum, maximum and factor", + scaling: []Scaling{ + { + InMin: &a0, + InMax: &a1, + OutMin: &a100, + OutMax: &a100, + Factor: &a1, + Fields: []string{"test"}, + }, + }, + fields: []string{"test"}, + expectedErrorMsg: "cannot use factor/offset and minimum/maximum at the same time", }, { - name: "No scalings", - scale: &Scale{Log: testutil.Logger{}}, - expectedErrorMsg: "no valid scalings defined", + name: "No scaling", + expectedErrorMsg: "no valid scaling defined", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - tt.scale.Log = testutil.Logger{} - require.ErrorContains(t, tt.scale.Init(), tt.expectedErrorMsg) + plugin := &Scale{ + Scalings: tt.scaling, + Log: testutil.Logger{}, + } + err := plugin.Init() + require.ErrorContains(t, err, tt.expectedErrorMsg) }) } }