Skip to content

Commit

Permalink
feat(transform): Add Storage Class Support to AWS S3 (#189)
Browse files Browse the repository at this point in the history
* feat(transform): Add Storage Class Support to S3

* chore: Add Check for Valid Storage Class

* docs: Add Example for Glacier IR
  • Loading branch information
jshlbrd committed Jul 1, 2024
1 parent 1e87f6f commit 63b96be
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 25 deletions.
31 changes: 16 additions & 15 deletions build/config/substation.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -417,9 +417,9 @@
},
get(settings={}): {
local type = 'enrich_http_get',
local default = $.transform.enrich.http.default { id: $.helpers.id(type, settings)},
local default = $.transform.enrich.http.default { id: $.helpers.id(type, settings) },

type: type,
type: type,
settings: std.prune(std.mergePatch(default, $.helpers.abbv(settings))),
},
post(settings={}): {
Expand All @@ -439,9 +439,9 @@
},
get(settings={}): {
local type = 'enrich_kv_store_get',
local default = $.transform.enrich.kv_store.default {id: $.helpers.id(type, settings)},
local default = $.transform.enrich.kv_store.default { id: $.helpers.id(type, settings) },

type: type,
type: type,
settings: std.prune(std.mergePatch(default, $.helpers.abbv(settings))),
},
set(settings={}): {
Expand Down Expand Up @@ -564,10 +564,10 @@
meta: {
err(settings={}): {
local type = 'meta_err',
local default = {
local default = {
id: $.helpers.id(type, settings),
transform: null,
error_messages: null,
transform: null,
error_messages: null,
},

type: type,
Expand Down Expand Up @@ -627,9 +627,9 @@
},
switch(settings={}): {
local type = 'meta_switch',
local default = {
local default = {
id: $.helpers.id(type, settings),
cases: null
cases: null,
},

type: type,
Expand Down Expand Up @@ -696,9 +696,9 @@
},
jq(settings={}): {
local type = 'object_jq',
local default = {
local default = {
id: $.helpers.id(type, settings),
filter: null
filter: null,
},

type: type,
Expand Down Expand Up @@ -831,6 +831,7 @@
aws: $.config.aws,
retry: $.config.retry,
bucket_name: null,
storage_class: 'STANDARD',
file_path: $.file_path,
},

Expand Down Expand Up @@ -1012,7 +1013,7 @@
local type = 'string_to_upper',
local default = $.transform.string.to.default { id: $.helpers.id(type, settings) },

type: type,
type: type,
settings: std.prune(std.mergePatch(default, $.helpers.abbv(settings))),
},
snake(settings={}): {
Expand Down Expand Up @@ -1193,9 +1194,9 @@
},
secret(settings={}): {
local type = 'utility_secret',
local default = {
local default = {
id: $.helpers.id(type, settings),
secret: null
secret: null,
},

type: type,
Expand Down Expand Up @@ -1378,6 +1379,6 @@
trg: null,
batch_key: if std.objectHas(s, 'btch') then s.batch else if std.objectHas(s, 'batch_key') then s.batch_key else null,
},
id(type, settings): std.join("-", [std.md5(type)[:8], std.md5(std.toString(settings))[:8]])
id(type, settings): std.join('-', [std.md5(type)[:8], std.md5(std.toString(settings))[:8]]),
},
}
9 changes: 6 additions & 3 deletions build/config/substation_test.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ local inspector = sub.condition.format.json();
{
condition: {
number: {
equal_to: sub.condition.number.equal_to({obj: {src: src}, value: 1}),
less_than: sub.condition.number.less_than({obj: {src: src}, value: 1}),
greater_than: sub.condition.number.greater_than({obj: {src: src}, value: 1}),
equal_to: sub.condition.number.equal_to({ obj: { src: src }, value: 1 }),
less_than: sub.condition.number.less_than({ obj: { src: src }, value: 1 }),
greater_than: sub.condition.number.greater_than({ obj: { src: src }, value: 1 }),
},
},
transform: {
send: {
aws: {
s3: sub.transform.send.aws.s3({ bucket: 'my-bucket' }),
},
http: {
post: sub.transform.send.http.post({
url: 'http://localhost:8080',
Expand Down
23 changes: 23 additions & 0 deletions examples/config/transform/send/aws_s3_glacier/config.jsonnet
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// This example configures a storage class for the AWS S3 destination transform.
// The Glacier Instant Retrieval class is recommended for archival data that is
// compatible with Substation's serverless architecture; this class can be read
// directly by a Lambda function triggered by an SNS notification.
local sub = import '../../../../../build/config/substation.libsonnet';

{
concurrency: 1,
transforms: [
sub.tf.send.aws.s3({
// Glacier Instant Retrieval charges a minimum of 128KB per object, otherwise
// the other values are set to impossibly high values to ensure all events are
// written to the same file.
batch: { size: 128 * 1000, count: 1000 * 1000, duration: '60m' },
bucket_name: 'substation',
storage_class: 'GLACIER_IR', // Glacier Instant Retrieval.
// S3 objects are organized by time to the nearest hour and have a UUID filename.
file_path: { time_format: '2006/01/02/15', uuid: true, suffix: '.jsonl.gz' },
// This example formats the data as JSONL and compresses it with Gzip.
aux_tforms: sub.pattern.tf.fmt.jsonl + [sub.tf.fmt.to.gzip()],
}),
],
}
16 changes: 11 additions & 5 deletions internal/aws/s3manager/s3manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (a *UploaderAPI) IsEnabled() bool {
}

// Upload is a convenience wrapper for uploading an object to S3.
func (a *UploaderAPI) Upload(ctx aws.Context, bucket, key string, src io.Reader) (*s3manager.UploadOutput, error) {
func (a *UploaderAPI) Upload(ctx aws.Context, bucket, key, storageClass string, src io.Reader) (*s3manager.UploadOutput, error) {
// temporary file is used so that the src can have its content identified and be uploaded to S3
dst, err := os.CreateTemp("", "substation")
if err != nil {
Expand All @@ -106,11 +106,17 @@ func (a *UploaderAPI) Upload(ctx aws.Context, bucket, key string, src io.Reader)
if _, err := dst.Seek(0, 0); err != nil {
return nil, fmt.Errorf("s3manager upload bucket %s key %s: %v", bucket, key, err)
}

if storageClass == "" {
storageClass = "STANDARD"
}

input := &s3manager.UploadInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
Body: dst,
ContentType: aws.String(mediaType),
Bucket: aws.String(bucket),
Key: aws.String(key),
Body: dst,
ContentType: aws.String(mediaType),
StorageClass: aws.String(storageClass),
}

ctx = context.WithoutCancel(ctx)
Expand Down
2 changes: 1 addition & 1 deletion internal/aws/s3manager/s3manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestUpload(t *testing.T) {
}

src := strings.NewReader("foo")
resp, err := a.Upload(ctx, test.input.bucket, test.input.key, src)
resp, err := a.Upload(ctx, test.input.bucket, test.input.key, "", src)
if err != nil {
t.Fatalf("%d, unexpected error", err)
}
Expand Down
14 changes: 13 additions & 1 deletion transform/send_aws_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"encoding/json"
"fmt"
"os"
"slices"
"sync"

"github.com/aws/aws-sdk-go/service/s3"
"github.com/brexhq/substation/config"
"github.com/brexhq/substation/message"

Expand All @@ -21,6 +23,8 @@ import (
type sendAWSS3Config struct {
// BucketName is the AWS S3 bucket that data is written to.
BucketName string `json:"bucket_name"`
// StorageClass is the storage class of the object.
StorageClass string `json:"storage_class"`
// FilePath determines how the name of the uploaded object is constructed.
// See filePath.New for more information.
FilePath file.Path `json:"file_path"`
Expand All @@ -45,6 +49,10 @@ func (c *sendAWSS3Config) Validate() error {
return fmt.Errorf("bucket_name: %v", errors.ErrMissingRequiredOption)
}

if !slices.Contains(s3.StorageClass_Values(), c.StorageClass) {
return fmt.Errorf("storage_class: %v", errors.ErrInvalidOption)
}

return nil
}

Expand All @@ -58,6 +66,10 @@ func newSendAWSS3(_ context.Context, cfg config.Config) (*sendAWSS3, error) {
conf.ID = "send_aws_s3"
}

if conf.StorageClass == "" {
conf.StorageClass = "STANDARD"
}

if err := conf.Validate(); err != nil {
return nil, fmt.Errorf("transform %s: %v", conf.ID, err)
}
Expand Down Expand Up @@ -192,7 +204,7 @@ func (tf *sendAWSS3) send(ctx context.Context, key string) error {
}
defer f.Close()

if _, err := tf.client.Upload(ctx, tf.conf.BucketName, filePath, f); err != nil {
if _, err := tf.client.Upload(ctx, tf.conf.BucketName, filePath, tf.conf.StorageClass, f); err != nil {
return err
}

Expand Down

0 comments on commit 63b96be

Please sign in to comment.