diff --git a/build/config/substation.libsonnet b/build/config/substation.libsonnet index 95a9c189..d33c8962 100644 --- a/build/config/substation.libsonnet +++ b/build/config/substation.libsonnet @@ -971,6 +971,14 @@ }, util: $.transform.utility, utility: { + control(settings={}): { + local default = { + batch: $.config.batch, + }, + + type: 'utility_control', + settings: std.prune(std.mergePatch(default, $.helpers.abbv(settings))), + }, delay(settings={}): { local default = { duration: null, diff --git a/examples/config/transform/utility/generate_ctrl/config.jsonnet b/examples/config/transform/utility/generate_ctrl/config.jsonnet new file mode 100644 index 00000000..feb1b242 --- /dev/null +++ b/examples/config/transform/utility/generate_ctrl/config.jsonnet @@ -0,0 +1,13 @@ +// This example shows how to use the `utility_control` transform to +// generate a control (ctrl) Message based on the amount of data Messages +// received by the system. ctrl Messages overrides the settings of the +// `aggregate_to_array` transform (and any other transform that supports). +local sub = import '../../../../../build/config/substation.libsonnet'; + +{ + transforms: [ + sub.tf.utility.control({ batch: { count: 2 } }), + sub.tf.aggregate.to.array({ batch: { count: 10000 } }), + sub.tf.send.stdout(), + ], +} diff --git a/examples/config/transform/utility/generate_ctrl/data.jsonl b/examples/config/transform/utility/generate_ctrl/data.jsonl new file mode 100644 index 00000000..d101df01 --- /dev/null +++ b/examples/config/transform/utility/generate_ctrl/data.jsonl @@ -0,0 +1,13 @@ +{"a":"b"} +{"c":"d"} +{"e":"f"} +{"g":"h"} +{"i":"j"} +{"k":"l"} +{"m":"n"} +{"o":"p"} +{"q":"r"} +{"s":"t"} +{"u":"v"} +{"w":"x"} +{"y":"z"} diff --git a/transform/aggregate.go b/transform/aggregate.go index a5af3dba..e4d4d3be 100644 --- a/transform/aggregate.go +++ b/transform/aggregate.go @@ -19,6 +19,10 @@ func (c *aggregateArrayConfig) Decode(in interface{}) error { } func aggToArray(data [][]byte) []byte { + if len(data) == 0 { + return nil + } + return slices.Concat([]byte("["), bytes.Join(data, []byte(",")), []byte("]")) } @@ -43,6 +47,10 @@ func (c *aggregateStrConfig) Validate() error { } func aggToStr(data [][]byte, separator []byte) []byte { + if len(data) == 0 { + return nil + } + return bytes.Join(data, separator) } diff --git a/transform/transform.go b/transform/transform.go index be136be8..29cf0618 100644 --- a/transform/transform.go +++ b/transform/transform.go @@ -173,6 +173,8 @@ func New(ctx context.Context, cfg config.Config) (Transformer, error) { //nolint case "time_to_unix_milli": return newTimeToUnixMilli(ctx, cfg) // Utility transforms. + case "utility_control": + return newUtilityControl(ctx, cfg) case "utility_delay": return newUtilityDelay(ctx, cfg) case "utility_drop": diff --git a/transform/utility_control.go b/transform/utility_control.go new file mode 100644 index 00000000..adf6a6b7 --- /dev/null +++ b/transform/utility_control.go @@ -0,0 +1,81 @@ +package transform + +import ( + "context" + "encoding/json" + "fmt" + "sync" + + "github.com/brexhq/substation/config" + "github.com/brexhq/substation/internal/aggregate" + iconfig "github.com/brexhq/substation/internal/config" + "github.com/brexhq/substation/message" +) + +type utilityControlConfig struct { + Batch iconfig.Batch `json:"batch"` +} + +func (c *utilityControlConfig) Decode(in interface{}) error { + return iconfig.Decode(in, c) +} + +func newUtilityControl(_ context.Context, cfg config.Config) (*utilityControl, error) { + conf := utilityControlConfig{} + if err := conf.Decode(cfg.Settings); err != nil { + return nil, fmt.Errorf("transform: utility_control: %v", err) + } + + agg, err := aggregate.New(aggregate.Config{ + Count: conf.Batch.Count, + Size: conf.Batch.Size, + Duration: conf.Batch.Duration, + }) + if err != nil { + return nil, fmt.Errorf("transform: utility_control: %v", err) + } + + tf := utilityControl{ + conf: conf, + agg: *agg, + } + + return &tf, nil +} + +type utilityControl struct { + conf utilityControlConfig + + mu sync.Mutex + agg aggregate.Aggregate +} + +func (tf *utilityControl) Transform(_ context.Context, msg *message.Message) ([]*message.Message, error) { + tf.mu.Lock() + defer tf.mu.Unlock() + + if msg.IsControl() { + // If a control message is received, then the aggregation is reset + // to prevent sending duplicate control messages. + tf.agg.ResetAll() + + return []*message.Message{msg}, nil + } + + if ok := tf.agg.Add("", msg.Data()); ok { + return []*message.Message{msg}, nil + } + + tf.agg.Reset("") + if ok := tf.agg.Add("", msg.Data()); !ok { + return nil, fmt.Errorf("transform: utility_control: %v", errSendBatchMisconfigured) + } + + ctrl := message.New().AsControl() + return []*message.Message{msg, ctrl}, nil +} + +func (tf *utilityControl) String() string { + b, _ := json.Marshal(tf.conf) + return string(b) +}