Skip to content

Commit

Permalink
feat: Customizable Sink Files (#93)
Browse files Browse the repository at this point in the history
* feat: init s3 path options

* refactor: remove print debug

* docs: comments

* fix: path key order

* feat: file sink, refactor filePath

* feat: add file lib

* feat: force sink file

* refactor: force sink logic

* refactor: dateformat to timeformat

* feat: unix timeformat support

* feat: dynamic file formats

* fix: text_gzip

* feat: format, compression support

* build: pkgs, configs, examples

* docs: todo style

* docs: errs

* test: file path

* refactor: backcompat, close
  • Loading branch information
jshlbrd committed Apr 18, 2023
1 parent 110253b commit bee2463
Show file tree
Hide file tree
Showing 11 changed files with 674 additions and 103 deletions.
25 changes: 22 additions & 3 deletions build/config/substation.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,16 @@
settings: { stream: null },
},
aws_s3: {
settings: { bucket: null, prefix: null, prefix_key: null },
// TODO(v1.0.0): remove legacy prefix and prefix_key
// TODO(v1.0.0): set format and compression defaults
settings: { bucket: null, prefix: null, prefix_key: null, file_path: null, file_format: null, file_compression: null },
},
aws_sqs: {
settings: { queue: null },
},
file: {
settings: { file_path: null, file_format: { type: 'json' }, file_compression: { type: 'gzip' } },
},
grpc: {
settings: { server: null, timeout: null, certificate: null },
},
Expand Down Expand Up @@ -550,8 +555,16 @@
settings: s,
},
aws_s3(settings=$.defaults.sink.aws_s3.settings): {
local s = std.mergePatch($.defaults.sink.aws_s3.settings, settings),

local s =
// if prefix or prefix_key exists, then the legacy object name style is used
// if path exists, then the new object name style is used
// TODO(v1.0.0)
if ( std.objectHas(settings, 'prefix') || std.objectHas(settings, 'prefix_key') ) || std.objectHas(settings, 'file_path')
then std.mergePatch($.defaults.sink.aws_s3.settings, settings)
// default settings for the new object name style
// this provides back compatibility with v0.8.4
else std.mergePatch({file_path: { time_format: '2006/01/02', uuid: true, extension: true }}, settings),

type: 'aws_s3',
settings: s,
},
Expand All @@ -561,6 +574,12 @@
type: 'aws_sqs',
settings: s,
},
file(settings=$.defaults.sink.file.settings): {
local s = std.mergePatch($.defaults.sink.file.settings, settings),

type: 'file',
settings: s,
},
grpc(settings=$.defaults.sink.grpc.settings): {
local s = std.mergePatch($.defaults.sink.grpc.settings, settings),

Expand Down
35 changes: 25 additions & 10 deletions cmd/development/substation/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,22 +186,37 @@ func mutateSink(cfg io.Reader, forceSink string) (*bytes.Reader, error) {
return nil, fmt.Errorf("run: %v", err)
}

var r *bytes.Reader
// removes the configured sink
oldConfig, err = json.Delete(oldConfig, "sink")
if err != nil {
return nil, fmt.Errorf("run: %v", err)
}

var newSink string

switch {
case forceSink == "stdout":
newConfig, err := json.Set(oldConfig, "sink.type", forceSink)
if err != nil {
return nil, fmt.Errorf("run: %v", err)
}
r = bytes.NewReader(newConfig)
case strings.HasPrefix(forceSink, "http://"):
return nil, fmt.Errorf("-force-sink http://* not yet implemented")
case forceSink == "stdout" || forceSink == "file":
newSink = fmt.Sprintf(`{"type": "%s"}`, forceSink)
case strings.HasPrefix(forceSink, "file://"):
newSink = fmt.Sprintf(
`{"type": "file", "settings": {"file_path": {"suffix": "%s"}}}`,
strings.TrimPrefix(forceSink, "file://"),
)
case strings.HasPrefix(forceSink, "http://") || strings.HasPrefix(forceSink, "https://"):
newSink = fmt.Sprintf(
`{"type": "http", "settings": {"url": "%s"}}`,
forceSink,
)
case strings.HasPrefix(forceSink, "s3://"):
return nil, fmt.Errorf("-force-sink s3://* not yet implemented")
default:
return nil, fmt.Errorf("%q not supported for -force-sink", forceSink)
}

return r, nil
newConfig, err := json.Set(oldConfig, "sink", newSink)
if err != nil {
return nil, fmt.Errorf("run: %v", err)
}

return bytes.NewReader(newConfig), nil
}
10 changes: 9 additions & 1 deletion examples/aws/pipeline/config/const.libsonnet
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
{
// change uuid to match the resource created by Terraform
s3_bucket: 'uuid-substation',
s3_bucket: 'uuid-substation',
// supports: json, text, data
file_format: {
type: 'json',
},
// supports: gzip, snappy, zstd
file_compression: {
type: 'gzip',
},
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,21 @@ local sub = import '../../../../../build/config/substation.libsonnet';
local const = import '../const.libsonnet';

{
// writes objects to this S3 path: {const.s3_bucket}/processed/2022/01/01/*
sink: sub.interfaces.sink.aws_s3(settings={bucket:const.s3_bucket, prefix:'processed'}),
sink: sub.interfaces.sink.aws_s3(
settings={
// change S3 bucket uuid in const to match the resource created by Terraform
bucket: const.s3_bucket,
// file path becomes processed/2006/01/02/uuid.extension
file_path: {
prefix: 'processed',
time_format: '2006/01/02',
uuid: true,
extension: true,
},
file_format: const.file_format,
file_compression: const.file_compression,
}
),
// use the transfer transform so data is not modified in transit
transform: {
type: 'transfer',
Expand Down
18 changes: 15 additions & 3 deletions examples/aws/pipeline/config/substation_raw_s3_sink/config.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,21 @@ local sub = import '../../../../../build/config/substation.libsonnet';
local const = import '../const.libsonnet';

{
// writes objects to this S3 path: uuid-substation/raw/2022/01/01/*
// change uuid to match the resource created by Terraform
sink: sub.interfaces.sink.aws_s3(settings={bucket:const.s3_bucket, prefix:'raw'}),
sink: sub.interfaces.sink.aws_s3(
settings={
// change S3 bucket uuid in const to match the resource created by Terraform
bucket: const.s3_bucket,
// file path becomes raw/2006/01/02/uuid.extension
file_path: {
prefix: 'raw',
time_format: '2006/01/02',
uuid: true,
extension: true,
},
file_format: const.file_format,
file_compression: const.file_compression,
}
),
// use the transfer transform so data is not modified in transit
transform: {
type: 'transfer',
Expand Down
9 changes: 6 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/ip2location/ip2location-go/v9 v9.5.0
github.com/itchyny/gojq v0.12.11
github.com/jshlbrd/go-aggregate v0.1.1
github.com/klauspost/compress v1.16.4
github.com/oschwald/geoip2-golang v1.8.0
github.com/oschwald/maxminddb-golang v1.10.0
github.com/sirupsen/logrus v1.9.0
Expand All @@ -25,23 +26,25 @@ require (
golang.org/x/net v0.7.0
golang.org/x/sync v0.1.0
google.golang.org/grpc v1.51.0
google.golang.org/protobuf v1.28.1
google.golang.org/protobuf v1.30.0
)

require (
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/google/go-cmp v0.5.7 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/itchyny/timefmt-go v0.1.5 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.15.13 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.43.0 // indirect
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef // indirect
gopkg.in/yaml.v2 v2.2.8 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
sigs.k8s.io/yaml v1.1.0 // indirect
)
32 changes: 21 additions & 11 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/go-jsonnet v0.19.1 h1:MORxkrG0elylUqh36R4AcSPX0oZQa9hvI3lroN+kDhs=
github.com/google/go-jsonnet v0.19.1/go.mod h1:5JVT33JVCoehdTj5Z2KJq1eIdt3Nb8PCmZ+W5D8U350=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
Expand Down Expand Up @@ -48,10 +49,11 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfC
github.com/jshlbrd/go-aggregate v0.1.1 h1:/Ty43T/IZSwLYU4LLABTDnOFRc9asCYUhcWAbTgu3l0=
github.com/jshlbrd/go-aggregate v0.1.1/go.mod h1:9MOvGvxcqZj99AZa8GeHbn5lXlxk+po9lJbRntVvYzc=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.15.13 h1:NFn1Wr8cfnenSJSA46lLq4wHCcBzKTSjnBIexDMMOV0=
github.com/klauspost/compress v1.15.13/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/klauspost/compress v1.16.4 h1:91KN02FnsOYhuunwU4ssRe8lc2JosWmizWa91B5v1PU=
github.com/klauspost/compress v1.16.4/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
Expand Down Expand Up @@ -96,10 +98,14 @@ github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5t
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
Expand All @@ -113,6 +119,7 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand All @@ -137,10 +144,11 @@ golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.5.0 h1:+bSpV5HIeWkuvgaMfI3UmKRThoTA5ODJTUd8T17NO+4=
golang.org/x/tools v0.5.0/go.mod h1:N+Kgy78s5I24c24dU8OfWNEotWjutIs8SnJvn5IDq+k=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef h1:uQ2vjV/sHTsWSqdKeLqmwitzgvjMl7o4IdtHwUDXSJY=
Expand All @@ -149,16 +157,18 @@ google.golang.org/grpc v1.51.0 h1:E1eGv1FTqoLIdnBCZufiSHgKjlqG6fKFf6pPWtMTh8U=
google.golang.org/grpc v1.51.0/go.mod h1:wgNDFcnuBGmxLKI/qn4T+m5BtEBYXJPvibbUPsAIPww=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
sigs.k8s.io/yaml v1.1.0 h1:4A07+ZFc2wgJwo8YNlQpr1rVlgUDlxXHhPJciaPY5gs=
Expand Down
Loading

0 comments on commit bee2463

Please sign in to comment.