Skip to content

Commit

Permalink
feat(transform): Add Metric Freshness Transform (#178)
Browse files Browse the repository at this point in the history
* feat(transform): Add Metric Freshness Transform

* docs(examples): Add Metric Freshness Example

* docs(transform): Update Concurrency Comment
  • Loading branch information
jshlbrd committed Jun 4, 2024
1 parent 93b64cd commit 04ed699
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 0 deletions.
10 changes: 10 additions & 0 deletions build/config/substation.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,16 @@
type: 'utility_metric_count',
settings: std.prune(std.mergePatch(default, $.helpers.abbv(settings))),
},
freshness(settings={}): {
local default = {
threshold: null,
metric: $.config.metric,
object: $.config.object,
},

type: 'utility_metric_freshness',
settings: std.prune(std.mergePatch(default, $.helpers.abbv(settings))),
},
},
secret(settings={}): {
local default = { secret: null },
Expand Down
24 changes: 24 additions & 0 deletions examples/config/transform/utility/message_freshness/config.jsonnet
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// This example shows how to use the `utility_metric_freshness` transform to
// determine if data was processed by the system within a certain time frame.
//
// Freshness is calculated by comparing a time value in the message to the current
// time and determining if the difference is less than a threshold:
// - Success: current time - timestamp < threshold
// - Failure: current time - timestamp >= threshold
//
// The transform emits two metrics that describe success and failure, annotated
// in the `FreshnessType` attribute.
local sub = import '../../../../../build/config/substation.libsonnet';

local attr = { AppName: 'example' };
local dest = { type: 'aws_cloudwatch_embedded_metrics' };

{
transforms: [
sub.transform.utility.metric.freshness({
threshold: '5s', // Amount of time spent in the system before considered stale.
object: { source_key: 'timestamp' }, // Used as the reference to determine freshness.
metric: { name: 'MessageFreshness', attributes: attr, destination: dest }
}),
],
}
2 changes: 2 additions & 0 deletions transform/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ func New(ctx context.Context, cfg config.Config) (Transformer, error) { //nolint
return newUtilityMetricBytes(ctx, cfg)
case "utility_metric_count":
return newUtilityMetricCount(ctx, cfg)
case "utility_metric_freshness":
return newUtilityMetricFreshness(ctx, cfg)
case "utility_secret":
return newUtilitySecret(ctx, cfg)
default:
Expand Down
124 changes: 124 additions & 0 deletions transform/utility_metric_freshness.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package transform

import (
"context"
"encoding/json"
"fmt"
"sync/atomic"
"time"

"github.com/brexhq/substation/config"
iconfig "github.com/brexhq/substation/internal/config"
"github.com/brexhq/substation/internal/errors"
"github.com/brexhq/substation/internal/metrics"
"github.com/brexhq/substation/message"
)

type utilityMetricFreshnessConfig struct {
Threshold string `json:"threshold"`

Object iconfig.Object `json:"object"`
Metric iconfig.Metric `json:"metric"`
}

func (c *utilityMetricFreshnessConfig) Decode(in interface{}) error {
return iconfig.Decode(in, c)
}

func (c *utilityMetricFreshnessConfig) Validate() error {
if c.Threshold == "" {
return fmt.Errorf("threshold: %v", errors.ErrMissingRequiredOption)
}

if c.Object.SourceKey == "" {
return fmt.Errorf("object_source_key: %v", errors.ErrMissingRequiredOption)
}

return nil
}

func newUtilityMetricFreshness(ctx context.Context, cfg config.Config) (*utilityMetricFreshness, error) {
conf := utilityMetricFreshnessConfig{}
if err := conf.Decode(cfg.Settings); err != nil {
return nil, fmt.Errorf("transform: utility_metric_freshness: %v", err)
}

if err := conf.Validate(); err != nil {
return nil, fmt.Errorf("transform: utility_metric_freshness: %v", err)
}

m, err := metrics.New(ctx, conf.Metric.Destination)
if err != nil {
return nil, fmt.Errorf("transform: utility_metric_freshness: %v", err)
}

dur, err := time.ParseDuration(conf.Threshold)
if err != nil {
return nil, fmt.Errorf("transform: utility_metric_freshness: duration: %v", err)
}

tf := utilityMetricFreshness{
conf: conf,
metric: m,
dur: dur,
}

return &tf, nil
}

type utilityMetricFreshness struct {
conf utilityMetricFreshnessConfig
metric metrics.Generator
dur time.Duration

success uint32
failure uint32
}

func (tf *utilityMetricFreshness) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) {
// ctrl messages are handled by only one thread, so the map
// updates below are safe for concurrency.
if msg.IsControl() {
tf.conf.Metric.Attributes["FreshnessType"] = "Success"
if err := tf.metric.Generate(ctx, metrics.Data{
Name: tf.conf.Metric.Name,
Value: tf.success,
Attributes: tf.conf.Metric.Attributes,
}); err != nil {
return nil, fmt.Errorf("transform: utility_metric_freshness: %v", err)
}

tf.conf.Metric.Attributes["FreshnessType"] = "Failure"
if err := tf.metric.Generate(ctx, metrics.Data{
Name: tf.conf.Metric.Name,
Value: tf.failure,
Attributes: tf.conf.Metric.Attributes,
}); err != nil {
return nil, fmt.Errorf("transform: utility_metric_freshness: %v", err)
}

atomic.StoreUint32(&tf.success, 0)
atomic.StoreUint32(&tf.failure, 0)
return []*message.Message{msg}, nil
}

// This is a time value expected to be in nanoseconds.
val := msg.GetValue(tf.conf.Object.SourceKey).Int()
if val == 0 {
return []*message.Message{msg}, nil
}

ts := time.Unix(0, val)
if time.Since(ts) < tf.dur {
atomic.AddUint32(&tf.success, 1)
} else {
atomic.AddUint32(&tf.failure, 1)
}

return []*message.Message{msg}, nil
}

func (tf *utilityMetricFreshness) String() string {
b, _ := json.Marshal(tf.conf)
return string(b)
}

0 comments on commit 04ed699

Please sign in to comment.