Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(transform): Add Metric Freshness Transform #178

Merged
merged 3 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions build/config/substation.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,16 @@
type: 'utility_metric_count',
settings: std.prune(std.mergePatch(default, $.helpers.abbv(settings))),
},
freshness(settings={}): {
local default = {
threshold: null,
metric: $.config.metric,
object: $.config.object,
},

type: 'utility_metric_freshness',
settings: std.prune(std.mergePatch(default, $.helpers.abbv(settings))),
},
},
secret(settings={}): {
local default = { secret: null },
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// This example shows how to use the `utility_metric_freshness` transform to
// determine if data was processed by the system within a certain time frame.
//
// Freshness is calculated by comparing a time value in the message to the current
// time and determining if the difference is less than a threshold:
// - Success: current time - timestamp < threshold
// - Failure: current time - timestamp >= threshold
//
// The transform emits two metrics that describe success and failure, annotated
// in the `FreshnessType` attribute.
local sub = import '../../../../../build/config/substation.libsonnet';

local attr = { AppName: 'example' };
local dest = { type: 'aws_cloudwatch_embedded_metrics' };

{
transforms: [
sub.transform.utility.metric.freshness({
threshold: '5s', // Amount of time spent in the system before considered stale.
object: { source_key: 'timestamp' }, // Used as the reference to determine freshness.
metric: { name: 'MessageFreshness', attributes: attr, destination: dest }
}),
],
}
2 changes: 2 additions & 0 deletions transform/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ func New(ctx context.Context, cfg config.Config) (Transformer, error) { //nolint
return newUtilityMetricBytes(ctx, cfg)
case "utility_metric_count":
return newUtilityMetricCount(ctx, cfg)
case "utility_metric_freshness":
return newUtilityMetricFreshness(ctx, cfg)
case "utility_secret":
return newUtilitySecret(ctx, cfg)
default:
Expand Down
124 changes: 124 additions & 0 deletions transform/utility_metric_freshness.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package transform

import (
"context"
"encoding/json"
"fmt"
"sync/atomic"
"time"

"github.com/brexhq/substation/config"
iconfig "github.com/brexhq/substation/internal/config"
"github.com/brexhq/substation/internal/errors"
"github.com/brexhq/substation/internal/metrics"
"github.com/brexhq/substation/message"
)

type utilityMetricFreshnessConfig struct {
Threshold string `json:"threshold"`

Object iconfig.Object `json:"object"`
Metric iconfig.Metric `json:"metric"`
}

func (c *utilityMetricFreshnessConfig) Decode(in interface{}) error {
return iconfig.Decode(in, c)
}

func (c *utilityMetricFreshnessConfig) Validate() error {
if c.Threshold == "" {
return fmt.Errorf("threshold: %v", errors.ErrMissingRequiredOption)
}

if c.Object.SourceKey == "" {
return fmt.Errorf("object_source_key: %v", errors.ErrMissingRequiredOption)
}

return nil
}

func newUtilityMetricFreshness(ctx context.Context, cfg config.Config) (*utilityMetricFreshness, error) {
conf := utilityMetricFreshnessConfig{}
if err := conf.Decode(cfg.Settings); err != nil {
return nil, fmt.Errorf("transform: utility_metric_freshness: %v", err)
}

if err := conf.Validate(); err != nil {
return nil, fmt.Errorf("transform: utility_metric_freshness: %v", err)
}

m, err := metrics.New(ctx, conf.Metric.Destination)
if err != nil {
return nil, fmt.Errorf("transform: utility_metric_freshness: %v", err)
}

dur, err := time.ParseDuration(conf.Threshold)
if err != nil {
return nil, fmt.Errorf("transform: utility_metric_freshness: duration: %v", err)
}

tf := utilityMetricFreshness{
conf: conf,
metric: m,
dur: dur,
}

return &tf, nil
}

type utilityMetricFreshness struct {
conf utilityMetricFreshnessConfig
metric metrics.Generator
dur time.Duration

success uint32
failure uint32
}

func (tf *utilityMetricFreshness) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) {
// ctrl messages are handled by only one process, so the map
jshlbrd marked this conversation as resolved.
Show resolved Hide resolved
// updates below are safe for concurrency.
if msg.IsControl() {
tf.conf.Metric.Attributes["FreshnessType"] = "Success"
if err := tf.metric.Generate(ctx, metrics.Data{
Name: tf.conf.Metric.Name,
Value: tf.success,
Attributes: tf.conf.Metric.Attributes,
}); err != nil {
return nil, fmt.Errorf("transform: utility_metric_freshness: %v", err)
}

tf.conf.Metric.Attributes["FreshnessType"] = "Failure"
if err := tf.metric.Generate(ctx, metrics.Data{
Name: tf.conf.Metric.Name,
Value: tf.failure,
Attributes: tf.conf.Metric.Attributes,
}); err != nil {
return nil, fmt.Errorf("transform: utility_metric_freshness: %v", err)
}

atomic.StoreUint32(&tf.success, 0)
atomic.StoreUint32(&tf.failure, 0)
return []*message.Message{msg}, nil
}

// This is a time value expected to be in nanoseconds.
val := msg.GetValue(tf.conf.Object.SourceKey).Int()
shellcromancer marked this conversation as resolved.
Show resolved Hide resolved
if val == 0 {
return []*message.Message{msg}, nil
}

ts := time.Unix(0, val)
if time.Since(ts) < tf.dur {
atomic.AddUint32(&tf.success, 1)
} else {
atomic.AddUint32(&tf.failure, 1)
}

return []*message.Message{msg}, nil
}

func (tf *utilityMetricFreshness) String() string {
b, _ := json.Marshal(tf.conf)
return string(b)
}
Loading