diff --git a/build/config/substation.libsonnet b/build/config/substation.libsonnet index 3d4873a6..1f06246a 100644 --- a/build/config/substation.libsonnet +++ b/build/config/substation.libsonnet @@ -535,6 +535,18 @@ type: type, settings: std.prune(std.mergePatch(default, $.helpers.abbv(settings))), }, + min(settings={}): $.transform.number.minimum(settings=settings), + minimum(settings={}): { + local type = 'number_minimum', + local default = { + id: $.helpers.id(type, settings), + object: $.config.object, + value: null, + }, + + type: type, + settings: std.prune(std.mergePatch(default, $.helpers.abbv(settings))), + }, math: { default: { object: $.config.object, @@ -1364,6 +1376,13 @@ $.tf.str.append({ suffix: '\n' }), ], }, + num: $.pattern.transform.number, + number: { + clamp(source_key, target_key, min, max): [ + $.tf.number.maximum({ object: { source_key: source_key, target_key: target_key }, value: min }), + $.tf.number.minimum({ object: { source_key: target_key, target_key: target_key }, value: max }), + ], + }, }, }, // Utility functions that can be used in conditions and transforms. diff --git a/examples/config/transform/number/clamp/config.jsonnet b/examples/config/transform/number/clamp/config.jsonnet new file mode 100644 index 00000000..0733b599 --- /dev/null +++ b/examples/config/transform/number/clamp/config.jsonnet @@ -0,0 +1,11 @@ +// This example uses the `number.clamp` pattern to return a value that is +// constrained to a range, where the range is defined by two constants. +local sub = import '../../../../../build/config/substation.libsonnet'; + +{ + concurrency: 1, + // Use `null` for object keys to operate on the entire message. + transforms: sub.pattern.tf.num.clamp(null, null, 0, 100) + [ + sub.tf.send.stdout(), + ], +} diff --git a/examples/config/transform/number/clamp/data.txt b/examples/config/transform/number/clamp/data.txt new file mode 100644 index 00000000..7779e6cb --- /dev/null +++ b/examples/config/transform/number/clamp/data.txt @@ -0,0 +1,3 @@ +-1 +101 +50 diff --git a/examples/config/transform/number/clamp/stdout.txt b/examples/config/transform/number/clamp/stdout.txt new file mode 100644 index 00000000..e7e321ee --- /dev/null +++ b/examples/config/transform/number/clamp/stdout.txt @@ -0,0 +1,3 @@ +100 +0 +50 diff --git a/examples/config/transform/number/min/config.jsonnet b/examples/config/transform/number/min/config.jsonnet new file mode 100644 index 00000000..672d7245 --- /dev/null +++ b/examples/config/transform/number/min/config.jsonnet @@ -0,0 +1,11 @@ +// This example uses the `number_minimum` transform to return the smaller +// of two values, where one value is a constant and the other is a message. +local sub = import '../../../../../build/config/substation.libsonnet'; + +{ + concurrency: 1, + transforms: [ + sub.tf.num.min({ value: 0 }), + sub.tf.send.stdout(), + ], +} diff --git a/examples/config/transform/number/min/data.txt b/examples/config/transform/number/min/data.txt new file mode 100644 index 00000000..a5c891fc --- /dev/null +++ b/examples/config/transform/number/min/data.txt @@ -0,0 +1,4 @@ +0 +-1 +-1.1 +10 diff --git a/examples/config/transform/number/min/stdout.txt b/examples/config/transform/number/min/stdout.txt new file mode 100644 index 00000000..f27880a1 --- /dev/null +++ b/examples/config/transform/number/min/stdout.txt @@ -0,0 +1,4 @@ +0 +-1 +-1.1 +0 diff --git a/transform/number_minimum.go b/transform/number_minimum.go new file mode 100644 index 00000000..124f1d93 --- /dev/null +++ b/transform/number_minimum.go @@ -0,0 +1,76 @@ +package transform + +import ( + "context" + "encoding/json" + "fmt" + "math" + + "github.com/brexhq/substation/config" + iconfig "github.com/brexhq/substation/internal/config" + "github.com/brexhq/substation/message" +) + +func newNumberMinimum(_ context.Context, cfg config.Config) (*numberMinimum, error) { + conf := numberValConfig{} + if err := iconfig.Decode(cfg.Settings, &conf); err != nil { + return nil, fmt.Errorf("transform number_minimum: %v", err) + } + + if conf.ID == "" { + conf.ID = "number_minimum" + } + + if err := conf.Validate(); err != nil { + return nil, fmt.Errorf("transform %s: %v", conf.ID, err) + } + + tf := numberMinimum{ + conf: conf, + isObject: conf.Object.SourceKey != "" && conf.Object.TargetKey != "", + } + + return &tf, nil +} + +type numberMinimum struct { + conf numberValConfig + isObject bool +} + +func (tf *numberMinimum) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { + if msg.IsControl() { + return []*message.Message{msg}, nil + } + + var value message.Value + if tf.isObject { + value = msg.GetValue(tf.conf.Object.SourceKey) + } else { + value = bytesToValue(msg.Data()) + } + + if !value.Exists() { + return []*message.Message{msg}, nil + } + + flo64 := math.Min(value.Float(), tf.conf.Value) + + if !tf.isObject { + s := numberFloat64ToString(flo64) + msg.SetData([]byte(s)) + + return []*message.Message{msg}, nil + } + + if err := msg.SetValue(tf.conf.Object.TargetKey, flo64); err != nil { + return nil, fmt.Errorf("transform %s: %v", tf.conf.ID, err) + } + + return []*message.Message{msg}, nil +} + +func (tf *numberMinimum) String() string { + b, _ := json.Marshal(tf.conf) + return string(b) +} diff --git a/transform/number_minimum_test.go b/transform/number_minimum_test.go new file mode 100644 index 00000000..d45d649e --- /dev/null +++ b/transform/number_minimum_test.go @@ -0,0 +1,158 @@ +package transform + +import ( + "context" + "reflect" + "testing" + + "github.com/brexhq/substation/config" + "github.com/brexhq/substation/message" +) + +var _ Transformer = &numberMinimum{} + +var numberMinimumTests = []struct { + name string + cfg config.Config + test []byte + expected [][]byte +}{ + // data tests + { + "data", + config.Config{ + Settings: map[string]interface{}{ + "value": 1, + }, + }, + []byte(`0`), + [][]byte{ + []byte(`0`), + }, + }, + { + "data", + config.Config{ + Settings: map[string]interface{}{ + "value": -1, + }, + }, + []byte(`0`), + [][]byte{ + []byte(`-1`), + }, + }, + { + "data", + config.Config{ + Settings: map[string]interface{}{ + "value": -1.1, + }, + }, + []byte(`0.1`), + [][]byte{ + []byte(`-1.1`), + }, + }, + // object tests + { + "object", + config.Config{ + Settings: map[string]interface{}{ + "object": map[string]interface{}{ + "source_key": "a", + "target_key": "a", + }, + "value": 1, + }, + }, + []byte(`{"a":0}`), + [][]byte{ + []byte(`{"a":0}`), + }, + }, + { + "object", + config.Config{ + Settings: map[string]interface{}{ + "object": map[string]interface{}{ + "source_key": "a", + "target_key": "a", + }, + "value": -1, + }, + }, + []byte(`{"a":0}`), + [][]byte{ + []byte(`{"a":-1}`), + }, + }, + { + "object", + config.Config{ + Settings: map[string]interface{}{ + "object": map[string]interface{}{ + "source_key": "a", + "target_key": "a", + }, + "value": -1.1, + }, + }, + []byte(`{"a":0.1}`), + [][]byte{ + []byte(`{"a":-1.1}`), + }, + }, +} + +func TestNumberMinimum(t *testing.T) { + ctx := context.TODO() + for _, test := range numberMinimumTests { + t.Run(test.name, func(t *testing.T) { + tf, err := newNumberMinimum(ctx, test.cfg) + if err != nil { + t.Fatal(err) + } + + msg := message.New().SetData(test.test) + result, err := tf.Transform(ctx, msg) + if err != nil { + t.Error(err) + } + + var data [][]byte + for _, c := range result { + data = append(data, c.Data()) + } + + if !reflect.DeepEqual(data, test.expected) { + t.Errorf("expected %s, got %s", test.expected, data) + } + }) + } +} + +func benchmarkNumberMinimum(b *testing.B, tf *numberMinimum, data []byte) { + ctx := context.TODO() + msg := message.New().SetData(data) + b.ResetTimer() + + for i := 0; i < b.N; i++ { + _, _ = tf.Transform(ctx, msg) + } +} + +func BenchmarkNumberMinimum(b *testing.B) { + for _, test := range numberMinimumTests { + tf, err := newNumberMinimum(context.TODO(), test.cfg) + if err != nil { + b.Fatal(err) + } + + b.Run(test.name, + func(b *testing.B) { + benchmarkNumberMinimum(b, tf, test.test) + }, + ) + } +} diff --git a/transform/transform.go b/transform/transform.go index ceff1e62..13129d79 100644 --- a/transform/transform.go +++ b/transform/transform.go @@ -90,6 +90,8 @@ func New(ctx context.Context, cfg config.Config) (Transformer, error) { //nolint // Number transforms. case "number_maximum": return newNumberMaximum(ctx, cfg) + case "number_minimum": + return newNumberMinimum(ctx, cfg) case "number_math_addition": return newNumberMathAddition(ctx, cfg) case "number_math_division":