From 04ed699000ed52605b99176aebf9b15b70203c19 Mon Sep 17 00:00:00 2001
From: Josh Liburdi
Date: Tue, 4 Jun 2024 11:13:38 -0700
Subject: [PATCH] feat(transform): Add Metric Freshness Transform (#178)

* feat(transform): Add Metric Freshness Transform

* docs(examples): Add Metric Freshness Example

* docs(transform): Update Concurrency Comment
---
 build/config/substation.libsonnet             |  10 ++
 .../utility/message_freshness/config.jsonnet  |  24 +++
 transform/transform.go                        |   2 +
 transform/utility_metric_freshness.go         | 150 ++++++++++++++++++
 4 files changed, 186 insertions(+)
 create mode 100644 examples/config/transform/utility/message_freshness/config.jsonnet
 create mode 100644 transform/utility_metric_freshness.go

diff --git a/build/config/substation.libsonnet b/build/config/substation.libsonnet
index 8f4194d1..ef8cc8ec 100644
--- a/build/config/substation.libsonnet
+++ b/build/config/substation.libsonnet
@@ -1015,6 +1015,16 @@
           type: 'utility_metric_count',
           settings: std.prune(std.mergePatch(default, $.helpers.abbv(settings))),
         },
+        freshness(settings={}): {
+          local default = {
+            threshold: null,
+            metric: $.config.metric,
+            object: $.config.object,
+          },
+
+          type: 'utility_metric_freshness',
+          settings: std.prune(std.mergePatch(default, $.helpers.abbv(settings))),
+        },
       },
       secret(settings={}): {
         local default = { secret: null },
diff --git a/examples/config/transform/utility/message_freshness/config.jsonnet b/examples/config/transform/utility/message_freshness/config.jsonnet
new file mode 100644
index 00000000..09314459
--- /dev/null
+++ b/examples/config/transform/utility/message_freshness/config.jsonnet
@@ -0,0 +1,24 @@
+// This example shows how to use the `utility_metric_freshness` transform to
+// determine if data was processed by the system within a certain time frame.
+//
+// Freshness is calculated by comparing a time value in the message to the current
+// time and determining if the difference is less than a threshold:
+// - Success: current time - timestamp < threshold
+// - Failure: current time - timestamp >= threshold
+//
+// The transform emits two metrics that describe success and failure, annotated
+// in the `FreshnessType` attribute.
+local sub = import '../../../../../build/config/substation.libsonnet';
+
+local attr = { AppName: 'example' };
+local dest = { type: 'aws_cloudwatch_embedded_metrics' };
+
+{
+  transforms: [
+    sub.transform.utility.metric.freshness({
+      threshold: '5s', // Amount of time spent in the system before considered stale.
+      object: { source_key: 'timestamp' }, // Used as the reference to determine freshness.
+      metric: { name: 'MessageFreshness', attributes: attr, destination: dest }
+    }),
+  ],
+}
diff --git a/transform/transform.go b/transform/transform.go
index 29cf0618..e4708393 100644
--- a/transform/transform.go
+++ b/transform/transform.go
@@ -185,6 +185,8 @@ func New(ctx context.Context, cfg config.Config) (Transformer, error) { //nolint
 		return newUtilityMetricBytes(ctx, cfg)
 	case "utility_metric_count":
 		return newUtilityMetricCount(ctx, cfg)
+	case "utility_metric_freshness":
+		return newUtilityMetricFreshness(ctx, cfg)
 	case "utility_secret":
 		return newUtilitySecret(ctx, cfg)
 	default:
diff --git a/transform/utility_metric_freshness.go b/transform/utility_metric_freshness.go
new file mode 100644
index 00000000..363c4ae6
--- /dev/null
+++ b/transform/utility_metric_freshness.go
@@ -0,0 +1,150 @@
+package transform
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"sync/atomic"
+	"time"
+
+	"github.com/brexhq/substation/config"
+	iconfig "github.com/brexhq/substation/internal/config"
+	"github.com/brexhq/substation/internal/errors"
+	"github.com/brexhq/substation/internal/metrics"
+	"github.com/brexhq/substation/message"
+)
+
+// utilityMetricFreshnessConfig holds the settings for the
+// utility_metric_freshness transform.
+type utilityMetricFreshnessConfig struct {
+	// Threshold is a duration string, parsed with time.ParseDuration, that
+	// the age of each message is compared against.
+	Threshold string `json:"threshold"`
+
+	Object iconfig.Object `json:"object"`
+	Metric iconfig.Metric `json:"metric"`
+}
+
+// Decode decodes in into the config using iconfig.Decode.
+func (c *utilityMetricFreshnessConfig) Decode(in interface{}) error {
+	return iconfig.Decode(in, c)
+}
+
+// Validate returns an error if the threshold or the object source key is unset.
+func (c *utilityMetricFreshnessConfig) Validate() error {
+	if c.Threshold == "" {
+		return fmt.Errorf("threshold: %v", errors.ErrMissingRequiredOption)
+	}
+
+	if c.Object.SourceKey == "" {
+		return fmt.Errorf("object_source_key: %v", errors.ErrMissingRequiredOption)
+	}
+
+	return nil
+}
+
+// newUtilityMetricFreshness decodes and validates cfg, creates the metrics
+// generator, and parses the threshold into a duration.
+func newUtilityMetricFreshness(ctx context.Context, cfg config.Config) (*utilityMetricFreshness, error) {
+	conf := utilityMetricFreshnessConfig{}
+	if err := conf.Decode(cfg.Settings); err != nil {
+		return nil, fmt.Errorf("transform: utility_metric_freshness: %v", err)
+	}
+
+	if err := conf.Validate(); err != nil {
+		return nil, fmt.Errorf("transform: utility_metric_freshness: %v", err)
+	}
+
+	m, err := metrics.New(ctx, conf.Metric.Destination)
+	if err != nil {
+		return nil, fmt.Errorf("transform: utility_metric_freshness: %v", err)
+	}
+
+	dur, err := time.ParseDuration(conf.Threshold)
+	if err != nil {
+		return nil, fmt.Errorf("transform: utility_metric_freshness: duration: %v", err)
+	}
+
+	tf := utilityMetricFreshness{
+		conf:   conf,
+		metric: m,
+		dur:    dur,
+	}
+
+	return &tf, nil
+}
+
+// utilityMetricFreshness counts messages as fresh (success) or stale (failure)
+// and reports both counts when a control message is received.
+type utilityMetricFreshness struct {
+	conf   utilityMetricFreshnessConfig
+	metric metrics.Generator
+	dur    time.Duration
+
+	// success and failure are updated by Transform with sync/atomic and must
+	// not be read or written directly.
+	success uint32
+	failure uint32
+}
+
+// Transform returns every message unchanged. Data messages increment the
+// success or failure counter based on their age; control messages emit both
+// counters as metrics and reset them.
+func (tf *utilityMetricFreshness) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) {
+	// ctrl messages are handled by only one thread, so the map
+	// updates below are safe for concurrency.
+	//
+	// NOTE(review): writing to a nil map panics, so this assumes the configured
+	// metric attributes are never nil -- confirm.
+	if msg.IsControl() {
+		// The counters are incremented atomically by data messages, so each one
+		// is read and reset in a single atomic swap. A plain read followed by a
+		// separate reset would race and could drop increments made in between.
+		success := atomic.SwapUint32(&tf.success, 0)
+		tf.conf.Metric.Attributes["FreshnessType"] = "Success"
+		if err := tf.metric.Generate(ctx, metrics.Data{
+			Name:       tf.conf.Metric.Name,
+			Value:      success,
+			Attributes: tf.conf.Metric.Attributes,
+		}); err != nil {
+			// The count was not reported, so put it back.
+			atomic.AddUint32(&tf.success, success)
+			return nil, fmt.Errorf("transform: utility_metric_freshness: %v", err)
+		}
+
+		failure := atomic.SwapUint32(&tf.failure, 0)
+		tf.conf.Metric.Attributes["FreshnessType"] = "Failure"
+		if err := tf.metric.Generate(ctx, metrics.Data{
+			Name:       tf.conf.Metric.Name,
+			Value:      failure,
+			Attributes: tf.conf.Metric.Attributes,
+		}); err != nil {
+			// The count was not reported, so put it back.
+			atomic.AddUint32(&tf.failure, failure)
+			return nil, fmt.Errorf("transform: utility_metric_freshness: %v", err)
+		}
+
+		return []*message.Message{msg}, nil
+	}
+
+	// This is a time value expected to be in nanoseconds.
+	val := msg.GetValue(tf.conf.Object.SourceKey).Int()
+	if val == 0 {
+		return []*message.Message{msg}, nil
+	}
+
+	ts := time.Unix(0, val)
+	if time.Since(ts) < tf.dur {
+		atomic.AddUint32(&tf.success, 1)
+	} else {
+		atomic.AddUint32(&tf.failure, 1)
+	}
+
+	return []*message.Message{msg}, nil
+}
+
+// String returns the transform's config encoded as JSON.
+func (tf *utilityMetricFreshness) String() string {
+	b, _ := json.Marshal(tf.conf)
+	return string(b)
+}