From d70858085f1be265a02ac5f1298efd986e7f275e Mon Sep 17 00:00:00 2001 From: Josh Liburdi Date: Thu, 7 Mar 2024 09:19:12 -0800 Subject: [PATCH] feat(transform): Add Metrics Bytes Transform (#142) * feat(transform): Add utilMetricBytes * docs(examples): Add utilMetricBytes * docs(examples): Comments --- build/config/substation.libsonnet | 8 +++ .../utility/message_bytes/config.jsonnet | 19 +++++ .../utility/message_bytes/data.jsonl | 13 ++++ transform/transform.go | 2 + transform/utility_metric_bytes.go | 71 +++++++++++++++++++ 5 files changed, 113 insertions(+) create mode 100644 examples/config/transform/utility/message_bytes/config.jsonnet create mode 100644 examples/config/transform/utility/message_bytes/data.jsonl create mode 100644 transform/utility_metric_bytes.go diff --git a/build/config/substation.libsonnet b/build/config/substation.libsonnet index 0b32657d..76f56c9c 100644 --- a/build/config/substation.libsonnet +++ b/build/config/substation.libsonnet @@ -979,6 +979,14 @@ settings: std.prune(std.mergePatch(default, $.helpers.abbv(settings))), }, metric: { + bytes(settings={}): { + local default = { + metric: $.config.metric, + }, + + type: 'utility_metric_bytes', + settings: std.prune(std.mergePatch(default, $.helpers.abbv(settings))), + }, count(settings={}): { local default = { metric: $.config.metric, diff --git a/examples/config/transform/utility/message_bytes/config.jsonnet b/examples/config/transform/utility/message_bytes/config.jsonnet new file mode 100644 index 00000000..da8604ba --- /dev/null +++ b/examples/config/transform/utility/message_bytes/config.jsonnet @@ -0,0 +1,19 @@ +// This example shows how to use the `utility_metric_bytes` transform to +// sum the amount of data received and transformed by Substation. +local sub = import '../../../../../build/config/substation.libsonnet'; + +local attr = { AppName: 'example' }; +local dest = { type: 'aws_cloudwatch_embedded_metrics' }; + +{ + transforms: [ + // If the transform is configured first, then the metric reflects + // the sum of bytes received by Substation. + sub.transform.utility.metric.bytes({ metric: { name: 'BytesReceived', attributes: attr, destination: dest } }), + // This inserts a value into the object so that the message size increases. + sub.transform.object.insert({obj: {target_key: '_'}, value: 1}), + // If the transform is configured last, then the metric reflects + // the sum of bytes transformed by Substation. + sub.transform.utility.metric.bytes({ metric: { name: 'BytesTransformed', attributes: attr, destination: dest } }), + ], +} diff --git a/examples/config/transform/utility/message_bytes/data.jsonl b/examples/config/transform/utility/message_bytes/data.jsonl new file mode 100644 index 00000000..d101df01 --- /dev/null +++ b/examples/config/transform/utility/message_bytes/data.jsonl @@ -0,0 +1,13 @@ +{"a":"b"} +{"c":"d"} +{"e":"f"} +{"g":"h"} +{"i":"j"} +{"k":"l"} +{"m":"n"} +{"o":"p"} +{"q":"r"} +{"s":"t"} +{"u":"v"} +{"w":"x"} +{"y":"z"} diff --git a/transform/transform.go b/transform/transform.go index c8b7687c..69bf4511 100644 --- a/transform/transform.go +++ b/transform/transform.go @@ -177,6 +177,8 @@ func New(ctx context.Context, cfg config.Config) (Transformer, error) { //nolint return newUtilityDrop(ctx, cfg) case "utility_err": return newUtilityErr(ctx, cfg) + case "utility_metric_bytes": + return newUtilityMetricBytes(ctx, cfg) case "utility_metric_count": return newUtilityMetricCount(ctx, cfg) case "utility_secret": diff --git a/transform/utility_metric_bytes.go b/transform/utility_metric_bytes.go new file mode 100644 index 00000000..3668d3d4 --- /dev/null +++ b/transform/utility_metric_bytes.go @@ -0,0 +1,71 @@ +package transform + +import ( + "context" + "encoding/json" + "fmt" + "sync/atomic" + + "github.com/brexhq/substation/config" + iconfig "github.com/brexhq/substation/internal/config" + "github.com/brexhq/substation/internal/metrics" + "github.com/brexhq/substation/message" +) + +type utilityMetricBytesConfig struct { + Metric iconfig.Metric `json:"metric"` +} + +func (c *utilityMetricBytesConfig) Decode(in interface{}) error { + return iconfig.Decode(in, c) +} + +func newUtilityMetricBytes(ctx context.Context, cfg config.Config) (*utilityMetricBytes, error) { + // conf gets validated when calling metrics.New. + conf := utilityMetricBytesConfig{} + if err := conf.Decode(cfg.Settings); err != nil { + return nil, fmt.Errorf("transform: utility_metric_bytes: %v", err) + } + + m, err := metrics.New(ctx, conf.Metric.Destination) + if err != nil { + return nil, fmt.Errorf("transform: utility_metric_bytes: %v", err) + } + + tf := utilityMetricBytes{ + conf: conf, + metric: m, + } + + return &tf, nil +} + +type utilityMetricBytes struct { + conf utilityMetricBytesConfig + + metric metrics.Generator + bytes uint32 +} + +func (tf *utilityMetricBytes) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { + if msg.IsControl() { + if err := tf.metric.Generate(ctx, metrics.Data{ + Name: tf.conf.Metric.Name, + Value: tf.bytes, + Attributes: tf.conf.Metric.Attributes, + }); err != nil { + return nil, fmt.Errorf("transform: utility_metric_bytes: %v", err) + } + + atomic.StoreUint32(&tf.bytes, 0) + return []*message.Message{msg}, nil + } + + atomic.AddUint32(&tf.bytes, uint32(len(msg.Data()))) + return []*message.Message{msg}, nil +} + +func (tf *utilityMetricBytes) String() string { + b, _ := json.Marshal(tf.conf) + return string(b) +}