diff --git a/build/config/substation.libsonnet b/build/config/substation.libsonnet index 76f56c9c..e2136435 100644 --- a/build/config/substation.libsonnet +++ b/build/config/substation.libsonnet @@ -1148,8 +1148,7 @@ condition=$.cnd.meta.negate({ inspector: $.cnd.fmt.json() }), transform=$.tf.util.drop(), ), - $.tf.agg.to.arr(), - $.tf.arr.join({ separator: '\n' }), + $.tf.agg.to.string({ separator: '\n' }), $.tf.str.append({ suffix: '\n' }), ], }, diff --git a/transform/aggregate.go b/transform/aggregate.go index a6ea96e3..a5af3dba 100644 --- a/transform/aggregate.go +++ b/transform/aggregate.go @@ -3,10 +3,10 @@ package transform import ( "bytes" "fmt" + "slices" iconfig "github.com/brexhq/substation/internal/config" "github.com/brexhq/substation/internal/errors" - "github.com/brexhq/substation/message" ) type aggregateArrayConfig struct { @@ -18,17 +18,8 @@ func (c *aggregateArrayConfig) Decode(in interface{}) error { return iconfig.Decode(in, c) } -func aggToArray(data [][]byte) ([]byte, error) { - msg := message.New() - - for _, d := range data { - if err := msg.SetValue("array.-1", d); err != nil { - return nil, err - } - } - - b := msg.GetValue("array") - return b.Bytes(), nil +func aggToArray(data [][]byte) []byte { + return slices.Concat([]byte("["), bytes.Join(data, []byte(",")), []byte("]")) } type aggregateStrConfig struct { diff --git a/transform/aggregate_to_array.go b/transform/aggregate_to_array.go index 15bf4e2a..4e302986 100644 --- a/transform/aggregate_to_array.go +++ b/transform/aggregate_to_array.go @@ -47,23 +47,19 @@ func (tf *aggregateToArray) Transform(ctx context.Context, msg *message.Message) tf.mu.Lock() defer tf.mu.Unlock() - //nolint: nestif // ignore nesting complexity if msg.IsControl() { var output []*message.Message for _, items := range tf.agg.GetAll() { - agg, err := aggToArray(items.Get()) - if err != nil { - return nil, fmt.Errorf("transform: aggregate_to_array: %v", err) - } + array := aggToArray(items.Get()) outMsg := message.New() if tf.hasObjTrg { - if err := outMsg.SetValue(tf.conf.Object.TargetKey, agg); err != nil { + if err := outMsg.SetValue(tf.conf.Object.TargetKey, array); err != nil { return nil, fmt.Errorf("transform: aggregate_to_array: %v", err) } } else { - outMsg.SetData(agg) + outMsg.SetData(array) } output = append(output, outMsg) @@ -80,18 +76,15 @@ func (tf *aggregateToArray) Transform(ctx context.Context, msg *message.Message) return nil, nil } - agg, err := aggToArray(tf.agg.Get(key)) - if err != nil { - return nil, fmt.Errorf("transform: aggregate_to_array: %v", err) - } + array := aggToArray(tf.agg.Get(key)) outMsg := message.New() if tf.hasObjTrg { - if err := outMsg.SetValue(tf.conf.Object.TargetKey, agg); err != nil { + if err := outMsg.SetValue(tf.conf.Object.TargetKey, array); err != nil { return nil, fmt.Errorf("transform: aggregate_to_array: %v", err) } } else { - outMsg.SetData(agg) + outMsg.SetData(array) } // If data cannot be added after reset, then the batch is misconfgured.