Skip to content

Commit

Permalink
feat: Add Initial Support for Application Metrics (#25)
Browse files Browse the repository at this point in the history
* feat: init application metrics

* feat: add metrics to Terraform example

* refactor: move metrics to transforms

* docs: added comment on logging
  • Loading branch information
jshlbrd committed Sep 14, 2022
1 parent e310cb5 commit 30f103d
Show file tree
Hide file tree
Showing 18 changed files with 225 additions and 10 deletions.
1 change: 1 addition & 0 deletions cmd/aws/lambda/substation/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ func sqsHandler(ctx context.Context, event events.SQSEvent) error {
msg.Md5OfBody,
msg.Attributes,
})

sub.SendTransform(cap)
}

Expand Down
2 changes: 2 additions & 0 deletions cmd/file/substation/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func file(ctx context.Context, filename string) error {
scanner := bufio.NewScanner(fileHandle)
scanner.Buffer([]byte{}, 100*1024*1024)

var count int
for scanner.Scan() {
switch scanMethod {
case "bytes":
Expand All @@ -108,6 +109,7 @@ func file(ctx context.Context, filename string) error {
}

sub.SendTransform(cap)
count++
}

sub.TransformSignal()
Expand Down
1 change: 1 addition & 0 deletions examples/aws/terraform/example_lambda_processor.tf
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ module "lambda_example_processor" {
"AWS_APPCONFIG_EXTENSION_PREFETCH_LIST" : "/applications/substation/environments/prod/configurations/substation_example_processor"
"SUBSTATION_HANDLER" : "KINESIS"
"SUBSTATION_DEBUG" : 1
"SUBSTATION_METRICS" : "AWS_CLOUDWATCH_EMBEDDED_METRICS"
}
tags = {
owner = "example"
Expand Down
1 change: 1 addition & 0 deletions examples/aws/terraform/example_lambda_sink_dynamodb.tf
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ module "lambda_example_dynamodb_sink" {
"AWS_APPCONFIG_EXTENSION_PREFETCH_LIST" : "/applications/substation/environments/prod/configurations/substation_example_dynamodb_sink"
"SUBSTATION_HANDLER" : "KINESIS"
"SUBSTATION_DEBUG" : 1
"SUBSTATION_METRICS" : "AWS_CLOUDWATCH_EMBEDDED_METRICS"
}
tags = {
owner = "example"
Expand Down
1 change: 1 addition & 0 deletions examples/aws/terraform/example_lambda_sink_processed_s3.tf
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ module "lambda_example_processed_s3_sink" {
"AWS_APPCONFIG_EXTENSION_PREFETCH_LIST" : "/applications/substation/environments/prod/configurations/substation_example_processed_s3_sink"
"SUBSTATION_HANDLER" : "KINESIS"
"SUBSTATION_DEBUG" : 1
"SUBSTATION_METRICS" : "AWS_CLOUDWATCH_EMBEDDED_METRICS"
}
tags = {
owner = "example"
Expand Down
1 change: 1 addition & 0 deletions examples/aws/terraform/example_lambda_sink_raw_s3.tf
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ module "lambda_example_raw_s3_sink" {
"AWS_APPCONFIG_EXTENSION_PREFETCH_LIST" : "/applications/substation/environments/prod/configurations/substation_example_raw_s3_sink"
"SUBSTATION_HANDLER" : "KINESIS"
"SUBSTATION_DEBUG" : 1
"SUBSTATION_METRICS" : "AWS_CLOUDWATCH_EMBEDDED_METRICS"
}
tags = {
owner = "example"
Expand Down
1 change: 1 addition & 0 deletions examples/aws/terraform/example_lambda_source_gateway.tf
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ module "lambda_example_gateway_source" {
"AWS_APPCONFIG_EXTENSION_PREFETCH_LIST" : "/applications/substation/environments/prod/configurations/substation_example_gateway_source"
"SUBSTATION_HANDLER" : "GATEWAY"
"SUBSTATION_DEBUG" : 1
"SUBSTATION_METRICS" : "AWS_CLOUDWATCH_EMBEDDED_METRICS"
}
tags = {
owner = "example"
Expand Down
1 change: 1 addition & 0 deletions examples/aws/terraform/example_lambda_source_s3.tf
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ module "lambda_example_s3_source" {
"AWS_APPCONFIG_EXTENSION_PREFETCH_LIST" : "/applications/substation/environments/prod/configurations/substation_example_s3_source"
"SUBSTATION_HANDLER" : "S3"
"SUBSTATION_DEBUG" : 1
"SUBSTATION_METRICS" : "AWS_CLOUDWATCH_EMBEDDED_METRICS"
}
tags = {
owner = "example"
Expand Down
1 change: 1 addition & 0 deletions examples/aws/terraform/example_lambda_source_sns.tf
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ module "lambda_example_sns_source" {
"AWS_APPCONFIG_EXTENSION_PREFETCH_LIST" : "/applications/substation/environments/prod/configurations/substation_example_sns_source"
"SUBSTATION_HANDLER" : "SNS"
"SUBSTATION_DEBUG" : 1
"SUBSTATION_METRICS" : "AWS_CLOUDWATCH_EMBEDDED_METRICS"
}
tags = {
owner = "example"
Expand Down
1 change: 1 addition & 0 deletions examples/aws/terraform/example_lambda_source_sqs.tf
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ module "lambda_example_sqs_source" {
"AWS_APPCONFIG_EXTENSION_PREFETCH_LIST" : "/applications/substation/environments/prod/configurations/substation_example_sqs_source"
"SUBSTATION_HANDLER" : "SQS"
"SUBSTATION_DEBUG" : 1
"SUBSTATION_METRICS" : "AWS_CLOUDWATCH_EMBEDDED_METRICS"
}
tags = {
owner = "example"
Expand Down
5 changes: 5 additions & 0 deletions internal/metrics/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# metrics

Contains interfaces and methods for generating application metrics and sending them to external services. Metrics can be generated anywhere in the application and optionally sent to a single external service. The naming convention for metric names and attributes is PascalCase, also known as upper camel case (e.g. UpperCamelCase).

Information for each metrics generator is available in the [GoDoc](https://pkg.go.dev/github.com/brexhq/substation/internal/metrics).
82 changes: 82 additions & 0 deletions internal/metrics/aws_cloudwatch_embedded_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package metrics

import (
"context"
"fmt"
"time"

"github.com/brexhq/substation/internal/json"
)

// AWSCloudWatchEmbeddedMetrics creates a metric in the AWS Embedded Metrics
// Format and writes it to standard output. This is the preferred method for
// generating metrics from AWS Lambda functions. Read more about the Embedded
// Metrics Format specification here:
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html.
type AWSCloudWatchEmbeddedMetrics struct{}

/*
Generate creates a metric with the AWSCloudWatchEmbeddedMetrics metrics generator.
All Attributes in the metrics.Data struct are inserted as CloudWatch Metrics
dimensions; if the generator is invoked from an AWS Lambda function, then the
function name is automatically added as a dimension. This method creates a JSON
object with the structure shown below, where references are filled in from the
metrics.Data struct:

	{
		"_aws": {
			"Timestamp": $currentTime,
			"CloudWatchMetrics": [
				{
					"Namespace": $metricsApplication,
					"Dimensions": [
						[
							$data.Attributes.key
						]
					],
					"Metrics": [
						{
							"Name": $data.Name
						}
					]
				}
			]
		},
		$data.Attributes.key: $data.Attributes.value,
		$data.Name: $data.Value
	}
*/
func (m AWSCloudWatchEmbeddedMetrics) Generate(ctx context.Context, data Data) (err error) {
	emf := []byte{}

	// default values for CloudWatch metrics from Substation applications.
	// if the metrics are generated from AWS Lambda, then the function name is
	// automatically tagged as a dimension below.
	emf, err = json.Set(emf, "_aws.Timestamp", time.Now().UnixMilli())
	if err != nil {
		return fmt.Errorf("metrics aws_cloudwatch_embedded_metrics: %w", err)
	}

	emf, err = json.Set(emf, "_aws.CloudWatchMetrics.0.Namespace", metricsApplication)
	if err != nil {
		return fmt.Errorf("metrics aws_cloudwatch_embedded_metrics: %w", err)
	}

	if metricsAWSLambdaFunctionName != "" {
		attr := map[string]string{"FunctionName": metricsAWSLambdaFunctionName}
		data.AddAttributes(attr)
	}

	// each attribute becomes both a CloudWatch dimension and a top-level
	// key-value pair in the EMF document. map iteration order is random, so
	// dimension order is not stable across calls.
	// NOTE(review): the "-1.-1" path relies on the internal json.Set append
	// semantics — confirm each key lands in the intended dimension set.
	for key, val := range data.Attributes {
		emf, err = json.Set(emf, "_aws.CloudWatchMetrics.0.Dimensions.-1.-1", key)
		if err != nil {
			return fmt.Errorf("metrics aws_cloudwatch_embedded_metrics: %w", err)
		}

		emf, err = json.Set(emf, key, val)
		if err != nil {
			return fmt.Errorf("metrics aws_cloudwatch_embedded_metrics: %w", err)
		}
	}

	emf, err = json.Set(emf, "_aws.CloudWatchMetrics.0.Metrics.0.Name", data.Name)
	if err != nil {
		return fmt.Errorf("metrics aws_cloudwatch_embedded_metrics: %w", err)
	}

	emf, err = json.Set(emf, data.Name, data.Value)
	if err != nil {
		return fmt.Errorf("metrics aws_cloudwatch_embedded_metrics: %w", err)
	}

	// logging EMF to standard out in AWS Lambda automatically sends metrics to CloudWatch
	fmt.Println(string(emf))

	return nil
}
87 changes: 87 additions & 0 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package metrics

import (
"context"
"fmt"
"os"

"github.com/brexhq/substation/internal/errors"
)

// unsupportedDestination is returned when an unsupported Metrics destination is referenced in Factory.
const unsupportedDestination = errors.Error("unsupportedDestination")

// metricsDestination and metricsApplication are referenced across all metrics
// generators; both are populated once in init.
var metricsDestination string
var metricsApplication string

// metricsAWSLambdaFunctionName is used when generating metrics from AWS Lambda
// functions; it is set from the AWS_LAMBDA_FUNCTION_NAME environment variable in init.
var metricsAWSLambdaFunctionName string

// init configures the package from the environment: SUBSTATION_METRICS selects
// the destination returned by Factory (metrics are disabled when it is unset),
// and the AWS Lambda function name is captured for automatic tagging.
func init() {
	// os.Getenv returns "" for unset variables, which leaves metrics disabled.
	metricsDestination = os.Getenv("SUBSTATION_METRICS")

	metricsApplication = "Substation"

	// set automatically by the AWS Lambda runtime; empty elsewhere.
	metricsAWSLambdaFunctionName = os.Getenv("AWS_LAMBDA_FUNCTION_NAME")
}

// Data describes a single metric that can be delivered to an external service.
type Data struct {
	// Attributes holds contextual key-value pairs for the metric (e.g.,
	// identifiers, tags); services that accept key-value pairs receive this
	// map directly.
	Attributes map[string]string

	// Name is a short UpperCamelCase identifier for the metric, passed
	// directly to the external service.
	Name string

	// Value is the metric data point; it is converted to the correct data
	// type before being sent to the external service.
	Value interface{}
}

// AddAttributes merges the provided key-value pairs into the metric's
// Attributes, lazily allocating the map on first use.
func (d *Data) AddAttributes(attr map[string]string) {
	if d.Attributes == nil {
		d.Attributes = make(map[string]string, len(attr))
	}

	for k, v := range attr {
		d.Attributes[k] = v
	}
}

// Generator is an interface for creating a metric and sending it to an
// external service. Implementations receive the metric as a Data value and
// return an error when the metric cannot be created.
type Generator interface {
	Generate(context.Context, Data) error
}

// Generate is a convenience function that encapsulates the Factory and creates
// a metric. It is a no-op when the SUBSTATION_METRICS environment variable is
// not set (metricsDestination is empty).
func Generate(ctx context.Context, data Data) error {
	// metrics are disabled across the application
	if metricsDestination == "" {
		return nil
	}

	gen, err := Factory(metricsDestination)
	if err != nil {
		return err
	}

	return gen.Generate(ctx, data)
}

// Factory returns a configured metrics Generator. This is the recommended
// method for retrieving ready-to-use Generators. Unknown destinations return
// an error that wraps unsupportedDestination, so callers can match it with
// errors.Is.
func Factory(destination string) (Generator, error) {
	switch destination {
	case "AWS_CLOUDWATCH_EMBEDDED_METRICS":
		var m AWSCloudWatchEmbeddedMetrics
		return m, nil
	default:
		// %w (rather than %v) keeps the sentinel matchable via errors.Is.
		return nil, fmt.Errorf("metrics destination %s: %w", destination, unsupportedDestination)
	}
}
5 changes: 3 additions & 2 deletions internal/sink/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,16 @@ func (sink *DynamoDB) Send(ctx context.Context, ch chan config.Capsule, kill cha
// PutItem err returns metadata
return fmt.Errorf("sink dynamodb: %v", err)
}

count++
}
}
}

log.WithField(
"count", count,
).WithField(
"table", sink.Table,
).WithField(
"count", count,
).Debug("put items into DynamoDB")

return nil
Expand Down
8 changes: 4 additions & 4 deletions internal/sink/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@ func (sink *Kinesis) Send(ctx context.Context, ch chan config.Capsule, kill chan
}

log.WithField(
"count", buffer[aggregationKey].Count,
).WithField(
"stream", sink.Stream,
).WithField(
"partition_key", aggPK,
).WithField(
"count", buffer[aggregationKey].Count,
).Debug("put records into Kinesis")

buffer[aggregationKey].New()
Expand All @@ -123,11 +123,11 @@ func (sink *Kinesis) Send(ctx context.Context, ch chan config.Capsule, kill chan
}

log.WithField(
"count", buffer[aggregationKey].Count,
).WithField(
"stream", sink.Stream,
).WithField(
"partition_key", aggPK,
).WithField(
"count", buffer[aggregationKey].Count,
).Debug("put records into Kinesis")
}

Expand Down
8 changes: 4 additions & 4 deletions internal/sink/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ func (sink *S3) Send(ctx context.Context, ch chan config.Capsule, kill chan stru
}

log.WithField(
"count", buffer[prefix].Count(),
).WithField(
"bucket", sink.Bucket,
).WithField(
"key", key,
).WithField(
"count", buffer[prefix].Count(),
).Debug("uploaded data to S3")

buffer[prefix].Reset()
Expand Down Expand Up @@ -135,11 +135,11 @@ func (sink *S3) Send(ctx context.Context, ch chan config.Capsule, kill chan stru
}

log.WithField(
"count", count,
).WithField(
"bucket", sink.Bucket,
).WithField(
"key", key,
).WithField(
"count", buffer[prefix].Count(),
).Debug("uploaded data to S3")
}

Expand Down
15 changes: 15 additions & 0 deletions internal/transform/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/brexhq/substation/config"
"github.com/brexhq/substation/internal/metrics"
"github.com/brexhq/substation/process"
)

Expand Down Expand Up @@ -48,6 +49,7 @@ func (transform *Batch) Transform(ctx context.Context, in <-chan config.Capsule,
return err
}

var received int
// read encapsulated data from the input channel into a batch
batch := make([]config.Capsule, 0, 10)
for cap := range in {
Expand All @@ -56,6 +58,7 @@ func (transform *Batch) Transform(ctx context.Context, in <-chan config.Capsule,
return nil
default:
batch = append(batch, cap)
received++
}
}

Expand All @@ -65,6 +68,7 @@ func (transform *Batch) Transform(ctx context.Context, in <-chan config.Capsule,
return err
}

var sent int
// write the processed, encapsulated data to the output channel
// if a signal is received on the kill channel, then this is interrupted
for _, cap := range batch {
Expand All @@ -73,8 +77,19 @@ func (transform *Batch) Transform(ctx context.Context, in <-chan config.Capsule,
return nil
default:
out <- cap
sent++
}
}

metrics.Generate(ctx, metrics.Data{
Name: "CapsulesReceived",
Value: received,
})

metrics.Generate(ctx, metrics.Data{
Name: "CapsulesSent",
Value: sent,
})

return nil
}
Loading

0 comments on commit 30f103d

Please sign in to comment.