Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(transform): Add Storage Class Support to AWS S3 #189

Merged
merged 4 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions build/config/substation.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -417,9 +417,9 @@
},
get(settings={}): {
local type = 'enrich_http_get',
local default = $.transform.enrich.http.default { id: $.helpers.id(type, settings)},
local default = $.transform.enrich.http.default { id: $.helpers.id(type, settings) },

type: type,
type: type,
settings: std.prune(std.mergePatch(default, $.helpers.abbv(settings))),
},
post(settings={}): {
Expand All @@ -439,9 +439,9 @@
},
get(settings={}): {
local type = 'enrich_kv_store_get',
local default = $.transform.enrich.kv_store.default {id: $.helpers.id(type, settings)},
local default = $.transform.enrich.kv_store.default { id: $.helpers.id(type, settings) },

type: type,
type: type,
settings: std.prune(std.mergePatch(default, $.helpers.abbv(settings))),
},
set(settings={}): {
Expand Down Expand Up @@ -564,10 +564,10 @@
meta: {
err(settings={}): {
local type = 'meta_err',
local default = {
local default = {
id: $.helpers.id(type, settings),
transform: null,
error_messages: null,
transform: null,
error_messages: null,
},

type: type,
Expand Down Expand Up @@ -627,9 +627,9 @@
},
switch(settings={}): {
local type = 'meta_switch',
local default = {
local default = {
id: $.helpers.id(type, settings),
cases: null
cases: null,
},

type: type,
Expand Down Expand Up @@ -696,9 +696,9 @@
},
jq(settings={}): {
local type = 'object_jq',
local default = {
local default = {
id: $.helpers.id(type, settings),
filter: null
filter: null,
},

type: type,
Expand Down Expand Up @@ -831,6 +831,7 @@
aws: $.config.aws,
retry: $.config.retry,
bucket_name: null,
storage_class: 'STANDARD',
file_path: $.file_path,
},

Expand Down Expand Up @@ -1012,7 +1013,7 @@
local type = 'string_to_upper',
local default = $.transform.string.to.default { id: $.helpers.id(type, settings) },

type: type,
type: type,
settings: std.prune(std.mergePatch(default, $.helpers.abbv(settings))),
},
snake(settings={}): {
Expand Down Expand Up @@ -1193,9 +1194,9 @@
},
secret(settings={}): {
local type = 'utility_secret',
local default = {
local default = {
id: $.helpers.id(type, settings),
secret: null
secret: null,
},

type: type,
Expand Down Expand Up @@ -1378,6 +1379,6 @@
trg: null,
batch_key: if std.objectHas(s, 'btch') then s.batch else if std.objectHas(s, 'batch_key') then s.batch_key else null,
},
id(type, settings): std.join("-", [std.md5(type)[:8], std.md5(std.toString(settings))[:8]])
id(type, settings): std.join('-', [std.md5(type)[:8], std.md5(std.toString(settings))[:8]]),
},
}
9 changes: 6 additions & 3 deletions build/config/substation_test.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ local inspector = sub.condition.format.json();
{
condition: {
number: {
equal_to: sub.condition.number.equal_to({obj: {src: src}, value: 1}),
less_than: sub.condition.number.less_than({obj: {src: src}, value: 1}),
greater_than: sub.condition.number.greater_than({obj: {src: src}, value: 1}),
equal_to: sub.condition.number.equal_to({ obj: { src: src }, value: 1 }),
less_than: sub.condition.number.less_than({ obj: { src: src }, value: 1 }),
greater_than: sub.condition.number.greater_than({ obj: { src: src }, value: 1 }),
},
},
transform: {
send: {
aws: {
s3: sub.transform.send.aws.s3({ bucket: 'my-bucket' }),
},
http: {
post: sub.transform.send.http.post({
url: 'http://localhost:8080',
Expand Down
23 changes: 23 additions & 0 deletions examples/config/transform/send/aws_s3_glacier/config.jsonnet
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// This example configures a storage class for the AWS S3 destination transform.
// The Glacier Instant Retrieval class is recommended for archival data that is
// compatible with Substation's serverless architecture; this class can be read
// directly by a Lambda function triggered by an SNS notification.
local sub = import '../../../../../build/config/substation.libsonnet';

{
concurrency: 1,
transforms: [
sub.tf.send.aws.s3({
// Glacier Instant Retrieval charges a minimum of 128KB per object, otherwise
// the other values are set to impossibly high values to ensure all events are
// written to the same file.
batch: { size: 128 * 1000, count: 1000 * 1000, duration: '60m' },
bucket_name: 'substation',
storage_class: 'GLACIER_IR', // Glacier Instant Retrieval.
// S3 objects are organized by time to the nearest hour and have a UUID filename.
file_path: { time_format: '2006/01/02/15', uuid: true, suffix: '.jsonl.gz' },
// This example formats the data as JSONL and compresses it with Gzip.
aux_tforms: sub.pattern.tf.fmt.jsonl + [sub.tf.fmt.to.gzip()],
}),
],
}
16 changes: 11 additions & 5 deletions internal/aws/s3manager/s3manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (a *UploaderAPI) IsEnabled() bool {
}

// Upload is a convenience wrapper for uploading an object to S3.
func (a *UploaderAPI) Upload(ctx aws.Context, bucket, key string, src io.Reader) (*s3manager.UploadOutput, error) {
func (a *UploaderAPI) Upload(ctx aws.Context, bucket, key, storageClass string, src io.Reader) (*s3manager.UploadOutput, error) {
// temporary file is used so that the src can have its content identified and be uploaded to S3
dst, err := os.CreateTemp("", "substation")
if err != nil {
Expand All @@ -106,11 +106,17 @@ func (a *UploaderAPI) Upload(ctx aws.Context, bucket, key string, src io.Reader)
if _, err := dst.Seek(0, 0); err != nil {
return nil, fmt.Errorf("s3manager upload bucket %s key %s: %v", bucket, key, err)
}

if storageClass == "" {
storageClass = "STANDARD"
}

input := &s3manager.UploadInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
Body: dst,
ContentType: aws.String(mediaType),
Bucket: aws.String(bucket),
Key: aws.String(key),
Body: dst,
ContentType: aws.String(mediaType),
StorageClass: aws.String(storageClass),
}

ctx = context.WithoutCancel(ctx)
Expand Down
2 changes: 1 addition & 1 deletion internal/aws/s3manager/s3manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestUpload(t *testing.T) {
}

src := strings.NewReader("foo")
resp, err := a.Upload(ctx, test.input.bucket, test.input.key, src)
resp, err := a.Upload(ctx, test.input.bucket, test.input.key, "", src)
if err != nil {
t.Fatalf("%d, unexpected error", err)
}
Expand Down
14 changes: 13 additions & 1 deletion transform/send_aws_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"encoding/json"
"fmt"
"os"
"slices"
"sync"

"github.com/aws/aws-sdk-go/service/s3"
"github.com/brexhq/substation/config"
"github.com/brexhq/substation/message"

Expand All @@ -21,6 +23,8 @@ import (
type sendAWSS3Config struct {
// BucketName is the AWS S3 bucket that data is written to.
BucketName string `json:"bucket_name"`
// StorageClass is the storage class of the object.
StorageClass string `json:"storage_class"`
// FilePath determines how the name of the uploaded object is constructed.
// See filePath.New for more information.
FilePath file.Path `json:"file_path"`
Expand All @@ -45,6 +49,10 @@ func (c *sendAWSS3Config) Validate() error {
return fmt.Errorf("bucket_name: %v", errors.ErrMissingRequiredOption)
}

if !slices.Contains(s3.StorageClass_Values(), c.StorageClass) {
return fmt.Errorf("storage_class: %v", errors.ErrInvalidOption)
}

return nil
}

Expand All @@ -58,6 +66,10 @@ func newSendAWSS3(_ context.Context, cfg config.Config) (*sendAWSS3, error) {
conf.ID = "send_aws_s3"
}

if conf.StorageClass == "" {
conf.StorageClass = "STANDARD"
}

if err := conf.Validate(); err != nil {
return nil, fmt.Errorf("transform %s: %v", conf.ID, err)
}
Expand Down Expand Up @@ -192,7 +204,7 @@ func (tf *sendAWSS3) send(ctx context.Context, key string) error {
}
defer f.Close()

if _, err := tf.client.Upload(ctx, tf.conf.BucketName, filePath, f); err != nil {
if _, err := tf.client.Upload(ctx, tf.conf.BucketName, filePath, tf.conf.StorageClass, f); err != nil {
return err
}

Expand Down
Loading