From 63b96be941f374bed36d37f3ea01dc0eee7449ba Mon Sep 17 00:00:00 2001 From: Josh Liburdi Date: Mon, 1 Jul 2024 13:59:19 -0700 Subject: [PATCH] feat(transform): Add Storage Class Support to AWS S3 (#189) * feat(transform): Add Storage Class Support to S3 * chore: Add Check for Valid Storage Class * docs: Add Example for Glacier IR --- build/config/substation.libsonnet | 31 ++++++++++--------- build/config/substation_test.jsonnet | 9 ++++-- .../send/aws_s3_glacier/config.jsonnet | 23 ++++++++++++++ internal/aws/s3manager/s3manager.go | 16 +++++++--- internal/aws/s3manager/s3manager_test.go | 2 +- transform/send_aws_s3.go | 14 ++++++++- 6 files changed, 70 insertions(+), 25 deletions(-) create mode 100644 examples/config/transform/send/aws_s3_glacier/config.jsonnet diff --git a/build/config/substation.libsonnet b/build/config/substation.libsonnet index 16ced0ea..a9fd38c5 100644 --- a/build/config/substation.libsonnet +++ b/build/config/substation.libsonnet @@ -417,9 +417,9 @@ }, get(settings={}): { local type = 'enrich_http_get', - local default = $.transform.enrich.http.default { id: $.helpers.id(type, settings)}, + local default = $.transform.enrich.http.default { id: $.helpers.id(type, settings) }, - type: type, + type: type, settings: std.prune(std.mergePatch(default, $.helpers.abbv(settings))), }, post(settings={}): { @@ -439,9 +439,9 @@ }, get(settings={}): { local type = 'enrich_kv_store_get', - local default = $.transform.enrich.kv_store.default {id: $.helpers.id(type, settings)}, + local default = $.transform.enrich.kv_store.default { id: $.helpers.id(type, settings) }, - type: type, + type: type, settings: std.prune(std.mergePatch(default, $.helpers.abbv(settings))), }, set(settings={}): { @@ -564,10 +564,10 @@ meta: { err(settings={}): { local type = 'meta_err', - local default = { + local default = { id: $.helpers.id(type, settings), - transform: null, - error_messages: null, + transform: null, + error_messages: null, }, type: type, @@ -627,9 +627,9 @@ }, switch(settings={}): { local type = 'meta_switch', - local default = { + local default = { id: $.helpers.id(type, settings), - cases: null + cases: null, }, type: type, @@ -696,9 +696,9 @@ }, jq(settings={}): { local type = 'object_jq', - local default = { + local default = { id: $.helpers.id(type, settings), - filter: null + filter: null, }, type: type, @@ -831,6 +831,7 @@ aws: $.config.aws, retry: $.config.retry, bucket_name: null, + storage_class: 'STANDARD', file_path: $.file_path, }, @@ -1012,7 +1013,7 @@ local type = 'string_to_upper', local default = $.transform.string.to.default { id: $.helpers.id(type, settings) }, - type: type, + type: type, settings: std.prune(std.mergePatch(default, $.helpers.abbv(settings))), }, snake(settings={}): { @@ -1193,9 +1194,9 @@ }, secret(settings={}): { local type = 'utility_secret', - local default = { + local default = { id: $.helpers.id(type, settings), - secret: null + secret: null, }, type: type, @@ -1378,6 +1379,6 @@ trg: null, batch_key: if std.objectHas(s, 'btch') then s.batch else if std.objectHas(s, 'batch_key') then s.batch_key else null, }, - id(type, settings): std.join("-", [std.md5(type)[:8], std.md5(std.toString(settings))[:8]]) + id(type, settings): std.join('-', [std.md5(type)[:8], std.md5(std.toString(settings))[:8]]), }, } diff --git a/build/config/substation_test.jsonnet b/build/config/substation_test.jsonnet index ade5344a..8e9be890 100644 --- a/build/config/substation_test.jsonnet +++ b/build/config/substation_test.jsonnet @@ -9,13 +9,16 @@ local inspector = sub.condition.format.json(); { condition: { number: { - equal_to: sub.condition.number.equal_to({obj: {src: src}, value: 1}), - less_than: sub.condition.number.less_than({obj: {src: src}, value: 1}), - greater_than: sub.condition.number.greater_than({obj: {src: src}, value: 1}), + equal_to: sub.condition.number.equal_to({ obj: { src: src }, value: 1 }), + less_than: sub.condition.number.less_than({ obj: { src: src }, value: 1 }), + greater_than: sub.condition.number.greater_than({ obj: { src: src }, value: 1 }), }, }, transform: { send: { + aws: { + s3: sub.transform.send.aws.s3({ bucket: 'my-bucket' }), + }, http: { post: sub.transform.send.http.post({ url: 'http://localhost:8080', diff --git a/examples/config/transform/send/aws_s3_glacier/config.jsonnet b/examples/config/transform/send/aws_s3_glacier/config.jsonnet new file mode 100644 index 00000000..76667c8e --- /dev/null +++ b/examples/config/transform/send/aws_s3_glacier/config.jsonnet @@ -0,0 +1,23 @@ +// This example configures a storage class for the AWS S3 destination transform. +// The Glacier Instant Retrieval class is recommended for archival data that is +// compatible with Substation's serverless architecture; this class can be read +// directly by a Lambda function triggered by an SNS notification. +local sub = import '../../../../../build/config/substation.libsonnet'; + +{ + concurrency: 1, + transforms: [ + sub.tf.send.aws.s3({ + // Glacier Instant Retrieval charges a minimum of 128KB per object, otherwise + // the other values are set to impossibly high values to ensure all events are + // written to the same file. + batch: { size: 128 * 1000, count: 1000 * 1000, duration: '60m' }, + bucket_name: 'substation', + storage_class: 'GLACIER_IR', // Glacier Instant Retrieval. + // S3 objects are organized by time to the nearest hour and have a UUID filename. + file_path: { time_format: '2006/01/02/15', uuid: true, suffix: '.jsonl.gz' }, + // This example formats the data as JSONL and compresses it with Gzip. + aux_tforms: sub.pattern.tf.fmt.jsonl + [sub.tf.fmt.to.gzip()], + }), + ], +} diff --git a/internal/aws/s3manager/s3manager.go b/internal/aws/s3manager/s3manager.go index 0be97690..9f0a89ef 100644 --- a/internal/aws/s3manager/s3manager.go +++ b/internal/aws/s3manager/s3manager.go @@ -85,7 +85,7 @@ func (a *UploaderAPI) IsEnabled() bool { } // Upload is a convenience wrapper for uploading an object to S3. -func (a *UploaderAPI) Upload(ctx aws.Context, bucket, key string, src io.Reader) (*s3manager.UploadOutput, error) { +func (a *UploaderAPI) Upload(ctx aws.Context, bucket, key, storageClass string, src io.Reader) (*s3manager.UploadOutput, error) { // temporary file is used so that the src can have its content identified and be uploaded to S3 dst, err := os.CreateTemp("", "substation") if err != nil { @@ -106,11 +106,17 @@ func (a *UploaderAPI) Upload(ctx aws.Context, bucket, key string, src io.Reader) if _, err := dst.Seek(0, 0); err != nil { return nil, fmt.Errorf("s3manager upload bucket %s key %s: %v", bucket, key, err) } + + if storageClass == "" { + storageClass = "STANDARD" + } + input := &s3manager.UploadInput{ - Bucket: aws.String(bucket), - Key: aws.String(key), - Body: dst, - ContentType: aws.String(mediaType), + Bucket: aws.String(bucket), + Key: aws.String(key), + Body: dst, + ContentType: aws.String(mediaType), + StorageClass: aws.String(storageClass), } ctx = context.WithoutCancel(ctx) diff --git a/internal/aws/s3manager/s3manager_test.go b/internal/aws/s3manager/s3manager_test.go index d0ee37e6..5ae62b43 100644 --- a/internal/aws/s3manager/s3manager_test.go +++ b/internal/aws/s3manager/s3manager_test.go @@ -106,7 +106,7 @@ func TestUpload(t *testing.T) { } src := strings.NewReader("foo") - resp, err := a.Upload(ctx, test.input.bucket, test.input.key, src) + resp, err := a.Upload(ctx, test.input.bucket, test.input.key, "", src) if err != nil { t.Fatalf("%d, unexpected error", err) } diff --git a/transform/send_aws_s3.go b/transform/send_aws_s3.go index 0bf666f6..535e1191 100644 --- a/transform/send_aws_s3.go +++ b/transform/send_aws_s3.go @@ -5,8 +5,10 @@ import ( "encoding/json" "fmt" "os" + "slices" "sync" + "github.com/aws/aws-sdk-go/service/s3" "github.com/brexhq/substation/config" "github.com/brexhq/substation/message" @@ -21,6 +23,8 @@ import ( type sendAWSS3Config struct { // BucketName is the AWS S3 bucket that data is written to. BucketName string `json:"bucket_name"` + // StorageClass is the storage class of the object. + StorageClass string `json:"storage_class"` // FilePath determines how the name of the uploaded object is constructed. // See filePath.New for more information. FilePath file.Path `json:"file_path"` @@ -45,6 +49,10 @@ func (c *sendAWSS3Config) Validate() error { return fmt.Errorf("bucket_name: %v", errors.ErrMissingRequiredOption) } + if !slices.Contains(s3.StorageClass_Values(), c.StorageClass) { + return fmt.Errorf("storage_class: %v", errors.ErrInvalidOption) + } + return nil } @@ -58,6 +66,10 @@ func newSendAWSS3(_ context.Context, cfg config.Config) (*sendAWSS3, error) { conf.ID = "send_aws_s3" } + if conf.StorageClass == "" { + conf.StorageClass = "STANDARD" + } + if err := conf.Validate(); err != nil { return nil, fmt.Errorf("transform %s: %v", conf.ID, err) } @@ -192,7 +204,7 @@ func (tf *sendAWSS3) send(ctx context.Context, key string) error { } defer f.Close() - if _, err := tf.client.Upload(ctx, tf.conf.BucketName, filePath, f); err != nil { + if _, err := tf.client.Upload(ctx, tf.conf.BucketName, filePath, tf.conf.StorageClass, f); err != nil { return err }