diff --git a/build/config/substation.libsonnet b/build/config/substation.libsonnet index a9fd38c5..3d4873a6 100644 --- a/build/config/substation.libsonnet +++ b/build/config/substation.libsonnet @@ -523,6 +523,18 @@ }, num: $.transform.number, number: { + max(settings={}): $.transform.number.maximum(settings=settings), + maximum(settings={}): { + local type = 'number_maximum', + local default = { + id: $.helpers.id(type, settings), + object: $.config.object, + value: null, + }, + + type: type, + settings: std.prune(std.mergePatch(default, $.helpers.abbv(settings))), + }, math: { default: { object: $.config.object, diff --git a/examples/config/transform/number/max/config.jsonnet b/examples/config/transform/number/max/config.jsonnet new file mode 100644 index 00000000..42b3d359 --- /dev/null +++ b/examples/config/transform/number/max/config.jsonnet @@ -0,0 +1,11 @@ +// This example uses the `number_maximum` transform to return the larger +// of two values, where one value is a constant and the other is a message. +local sub = import '../../../../../build/config/substation.libsonnet'; + +{ + concurrency: 1, + transforms: [ + sub.tf.num.max({ value: 0 }), + sub.tf.send.stdout(), + ], +} diff --git a/examples/config/transform/number/max/data.txt b/examples/config/transform/number/max/data.txt new file mode 100644 index 00000000..a5c891fc --- /dev/null +++ b/examples/config/transform/number/max/data.txt @@ -0,0 +1,4 @@ +0 +-1 +-1.1 +10 diff --git a/examples/config/transform/number/max/stdout.txt b/examples/config/transform/number/max/stdout.txt new file mode 100644 index 00000000..2360ec2f --- /dev/null +++ b/examples/config/transform/number/max/stdout.txt @@ -0,0 +1,4 @@ +0 +0 +0 +10 diff --git a/transform/number.go b/transform/number.go index 51193fec..ec02512b 100644 --- a/transform/number.go +++ b/transform/number.go @@ -8,6 +8,31 @@ import ( "github.com/brexhq/substation/internal/errors" ) +// Use this config for any Number transform that only requires a single value. +type numberValConfig struct { + Value float64 `json:"value"` + + ID string `json:"id"` + Object iconfig.Object `json:"object"` +} + +func (c *numberValConfig) Decode(in interface{}) error { + return iconfig.Decode(in, c) +} + +// 0.0 is a valid value and should not be checked. +func (c *numberValConfig) Validate() error { + if c.Object.SourceKey == "" && c.Object.TargetKey != "" { + return fmt.Errorf("object_source_key: %v", errors.ErrMissingRequiredOption) + } + + if c.Object.SourceKey != "" && c.Object.TargetKey == "" { + return fmt.Errorf("object_target_key: %v", errors.ErrMissingRequiredOption) + } + + return nil +} + type numberMathConfig struct { ID string `json:"id"` Object iconfig.Object `json:"object"` diff --git a/transform/number_maximum.go b/transform/number_maximum.go new file mode 100644 index 00000000..45f28ef4 --- /dev/null +++ b/transform/number_maximum.go @@ -0,0 +1,76 @@ +package transform + +import ( + "context" + "encoding/json" + "fmt" + "math" + + "github.com/brexhq/substation/config" + iconfig "github.com/brexhq/substation/internal/config" + "github.com/brexhq/substation/message" +) + +func newNumberMaximum(_ context.Context, cfg config.Config) (*numberMaximum, error) { + conf := numberValConfig{} + if err := iconfig.Decode(cfg.Settings, &conf); err != nil { + return nil, fmt.Errorf("transform number_maximum: %v", err) + } + + if conf.ID == "" { + conf.ID = "number_maximum" + } + + if err := conf.Validate(); err != nil { + return nil, fmt.Errorf("transform %s: %v", conf.ID, err) + } + + tf := numberMaximum{ + conf: conf, + isObject: conf.Object.SourceKey != "" && conf.Object.TargetKey != "", + } + + return &tf, nil +} + +type numberMaximum struct { + conf numberValConfig + isObject bool +} + +func (tf *numberMaximum) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { + if msg.IsControl() { + return []*message.Message{msg}, nil + } + + var value message.Value + if tf.isObject { + value = msg.GetValue(tf.conf.Object.SourceKey) + } else { + value = bytesToValue(msg.Data()) + } + + if !value.Exists() { + return []*message.Message{msg}, nil + } + + flo64 := math.Max(value.Float(), tf.conf.Value) + + if !tf.isObject { + s := numberFloat64ToString(flo64) + msg.SetData([]byte(s)) + + return []*message.Message{msg}, nil + } + + if err := msg.SetValue(tf.conf.Object.TargetKey, flo64); err != nil { + return nil, fmt.Errorf("transform %s: %v", tf.conf.ID, err) + } + + return []*message.Message{msg}, nil +} + +func (tf *numberMaximum) String() string { + b, _ := json.Marshal(tf.conf) + return string(b) +} diff --git a/transform/number_maximum_test.go b/transform/number_maximum_test.go new file mode 100644 index 00000000..1850d557 --- /dev/null +++ b/transform/number_maximum_test.go @@ -0,0 +1,158 @@ +package transform + +import ( + "context" + "reflect" + "testing" + + "github.com/brexhq/substation/config" + "github.com/brexhq/substation/message" +) + +var _ Transformer = &numberMaximum{} + +var numberMaximumTests = []struct { + name string + cfg config.Config + test []byte + expected [][]byte +}{ + // data tests + { + "data", + config.Config{ + Settings: map[string]interface{}{ + "value": 1, + }, + }, + []byte(`0`), + [][]byte{ + []byte(`1`), + }, + }, + { + "data", + config.Config{ + Settings: map[string]interface{}{ + "value": -1, + }, + }, + []byte(`0`), + [][]byte{ + []byte(`0`), + }, + }, + { + "data", + config.Config{ + Settings: map[string]interface{}{ + "value": -1.1, + }, + }, + []byte(`0.1`), + [][]byte{ + []byte(`0.1`), + }, + }, + // object tests + { + "object", + config.Config{ + Settings: map[string]interface{}{ + "object": map[string]interface{}{ + "source_key": "a", + "target_key": "a", + }, + "value": 1, + }, + }, + []byte(`{"a":0}`), + [][]byte{ + []byte(`{"a":1}`), + }, + }, + { + "object", + config.Config{ + Settings: map[string]interface{}{ + "object": map[string]interface{}{ + "source_key": "a", + "target_key": "a", + }, + "value": -1, + }, + }, + []byte(`{"a":0}`), + [][]byte{ + []byte(`{"a":0}`), + }, + }, + { + "object", + config.Config{ + Settings: map[string]interface{}{ + "object": map[string]interface{}{ + "source_key": "a", + "target_key": "a", + }, + "value": -1.1, + }, + }, + []byte(`{"a":0.1}`), + [][]byte{ + []byte(`{"a":0.1}`), + }, + }, +} + +func TestNumberMaximum(t *testing.T) { + ctx := context.TODO() + for _, test := range numberMaximumTests { + t.Run(test.name, func(t *testing.T) { + tf, err := newNumberMaximum(ctx, test.cfg) + if err != nil { + t.Fatal(err) + } + + msg := message.New().SetData(test.test) + result, err := tf.Transform(ctx, msg) + if err != nil { + t.Error(err) + } + + var data [][]byte + for _, c := range result { + data = append(data, c.Data()) + } + + if !reflect.DeepEqual(data, test.expected) { + t.Errorf("expected %s, got %s", test.expected, data) + } + }) + } +} + +func benchmarkNumberMaximum(b *testing.B, tf *numberMaximum, data []byte) { + ctx := context.TODO() + msg := message.New().SetData(data) + b.ResetTimer() + + for i := 0; i < b.N; i++ { + _, _ = tf.Transform(ctx, msg) + } +} + +func BenchmarkNumberMaximum(b *testing.B) { + for _, test := range numberMaximumTests { + tf, err := newNumberMaximum(context.TODO(), test.cfg) + if err != nil { + b.Fatal(err) + } + + b.Run(test.name, + func(b *testing.B) { + benchmarkNumberMaximum(b, tf, test.test) + }, + ) + } +} diff --git a/transform/transform.go b/transform/transform.go index 3f55f5de..ceff1e62 100644 --- a/transform/transform.go +++ b/transform/transform.go @@ -88,6 +88,8 @@ func New(ctx context.Context, cfg config.Config) (Transformer, error) { //nolint case "meta_switch": return newMetaSwitch(ctx, cfg) // Number transforms. + case "number_maximum": + return newNumberMaximum(ctx, cfg) case "number_math_addition": return newNumberMathAddition(ctx, cfg) case "number_math_division":