From d730cc6b967b6ee853ebf5f6b18f1ead9c1bcb55 Mon Sep 17 00:00:00 2001 From: Josh Liburdi Date: Thu, 21 Mar 2024 07:54:21 -0700 Subject: [PATCH] perf(transform): Improve AggregateToArray Throughput (#150) * perf(transform): Increase aggregateToArray Throughput * build(config): Update JSON Lines Pattern --- build/config/substation.libsonnet | 3 +-- transform/aggregate.go | 15 +++------------ transform/aggregate_to_array.go | 19 ++++++------------- 3 files changed, 10 insertions(+), 27 deletions(-) diff --git a/build/config/substation.libsonnet b/build/config/substation.libsonnet index 76f56c9c..e2136435 100644 --- a/build/config/substation.libsonnet +++ b/build/config/substation.libsonnet @@ -1148,8 +1148,7 @@ condition=$.cnd.meta.negate({ inspector: $.cnd.fmt.json() }), transform=$.tf.util.drop(), ), - $.tf.agg.to.arr(), - $.tf.arr.join({ separator: '\n' }), + $.tf.agg.to.string({ separator: '\n' }), $.tf.str.append({ suffix: '\n' }), ], }, diff --git a/transform/aggregate.go b/transform/aggregate.go index a6ea96e3..a5af3dba 100644 --- a/transform/aggregate.go +++ b/transform/aggregate.go @@ -3,10 +3,10 @@ package transform import ( "bytes" "fmt" + "slices" iconfig "github.com/brexhq/substation/internal/config" "github.com/brexhq/substation/internal/errors" - "github.com/brexhq/substation/message" ) type aggregateArrayConfig struct { @@ -18,17 +18,8 @@ func (c *aggregateArrayConfig) Decode(in interface{}) error { return iconfig.Decode(in, c) } -func aggToArray(data [][]byte) ([]byte, error) { - msg := message.New() - - for _, d := range data { - if err := msg.SetValue("array.-1", d); err != nil { - return nil, err - } - } - - b := msg.GetValue("array") - return b.Bytes(), nil +func aggToArray(data [][]byte) []byte { + return slices.Concat([]byte("["), bytes.Join(data, []byte(",")), []byte("]")) } type aggregateStrConfig struct { diff --git a/transform/aggregate_to_array.go b/transform/aggregate_to_array.go index 15bf4e2a..4e302986 100644 --- a/transform/aggregate_to_array.go +++ b/transform/aggregate_to_array.go @@ -47,23 +47,19 @@ func (tf *aggregateToArray) Transform(ctx context.Context, msg *message.Message) tf.mu.Lock() defer tf.mu.Unlock() - //nolint: nestif // ignore nesting complexity if msg.IsControl() { var output []*message.Message for _, items := range tf.agg.GetAll() { - agg, err := aggToArray(items.Get()) - if err != nil { - return nil, fmt.Errorf("transform: aggregate_to_array: %v", err) - } + array := aggToArray(items.Get()) outMsg := message.New() if tf.hasObjTrg { - if err := outMsg.SetValue(tf.conf.Object.TargetKey, agg); err != nil { + if err := outMsg.SetValue(tf.conf.Object.TargetKey, array); err != nil { return nil, fmt.Errorf("transform: aggregate_to_array: %v", err) } } else { - outMsg.SetData(agg) + outMsg.SetData(array) } output = append(output, outMsg) @@ -80,18 +76,15 @@ func (tf *aggregateToArray) Transform(ctx context.Context, msg *message.Message) return nil, nil } - agg, err := aggToArray(tf.agg.Get(key)) - if err != nil { - return nil, fmt.Errorf("transform: aggregate_to_array: %v", err) - } + array := aggToArray(tf.agg.Get(key)) outMsg := message.New() if tf.hasObjTrg { - if err := outMsg.SetValue(tf.conf.Object.TargetKey, agg); err != nil { + if err := outMsg.SetValue(tf.conf.Object.TargetKey, array); err != nil { return nil, fmt.Errorf("transform: aggregate_to_array: %v", err) } } else { - outMsg.SetData(agg) + outMsg.SetData(array) } // If data cannot be added after reset, then the batch is misconfgured.