From e310cb5a8e1f32e52cb695764b88d58411a94ebc Mon Sep 17 00:00:00 2001 From: Josh Liburdi Date: Fri, 9 Sep 2022 11:46:34 -0700 Subject: [PATCH] feat: Improvements to Reading and Decoding Files (#24) * fix: edge case for gz files * feat: env var for reading non-text data --- cmd/app.go | 21 +++++++++++++++++++- cmd/aws/lambda/substation/main.go | 20 +++++++++++++++++-- cmd/file/substation/main.go | 13 ++++++++++++- internal/aws/s3manager/s3manager.go | 30 ++++++++++++++++++++++++----- 4 files changed, 75 insertions(+), 9 deletions(-) diff --git a/cmd/app.go b/cmd/app.go index 54218d97..2c0efd9c 100644 --- a/cmd/app.go +++ b/cmd/app.go @@ -151,7 +151,7 @@ func (sub *Substation) Sink(ctx context.Context, wg *sync.WaitGroup) { sub.DoneSignal() } -// GetConcurrency retrieves a concurrency value from the SUBSTATION_CONCURRENCY environment variable. If the environment variable is missing, then the concurrency value is the number of CPUs on the operating system. In native Substation applications, this value determines the number of transform goroutines; if set to 1, then multi-core processing is not enabled. +// GetConcurrency retrieves a concurrency value from the SUBSTATION_CONCURRENCY environment variable. If the environment variable is missing, then the concurrency value is the number of CPUs on the host. In native Substation applications, this value determines the number of transform goroutines; if set to 1, then multi-core processing is not enabled. func GetConcurrency() (int, error) { if val, found := os.LookupEnv("SUBSTATION_CONCURRENCY"); found { v, err := strconv.Atoi(val) @@ -163,3 +163,22 @@ func GetConcurrency() (int, error) { return runtime.NumCPU(), nil } + +/* +GetScanMethod retrieves a scan method from the SUBSTATION_SCAN_METHOD environment variable. This impacts the behavior of bufio scanners that are used throughout the application to read files. 
The options for this variable are: + +- "bytes" (https://pkg.go.dev/bufio#Scanner.Bytes) + +- "text" (https://pkg.go.dev/bufio#Scanner.Text) + +If the environment variable is missing, then the default method is "text". +*/ +func GetScanMethod() string { + if val, found := os.LookupEnv("SUBSTATION_SCAN_METHOD"); found { + if val == "bytes" || val == "text" { + return val + } + } + + return "text" +} diff --git a/cmd/aws/lambda/substation/main.go b/cmd/aws/lambda/substation/main.go index c796a3e2..7ce29e8d 100644 --- a/cmd/aws/lambda/substation/main.go +++ b/cmd/aws/lambda/substation/main.go @@ -24,6 +24,7 @@ import ( var sub cmd.Substation var concurrency int var handler string +var scanMethod string // LambdaMissingHandler is returned when the Lambda is deployed without a configured handler. const LambdaMissingHandler = errors.Error("LambdaMissingHandler") @@ -63,6 +64,9 @@ func init() { if err != nil { panic(fmt.Errorf("init concurrency: %v", err)) } + + // retrieves scan method from SUBSTATION_SCAN_METHOD environment variable + scanMethod = cmd.GetScanMethod() } type gatewayMetadata struct { @@ -240,7 +244,13 @@ func s3Handler(ctx context.Context, event events.S3Event) error { }) for scanner.Scan() { - cap.SetData([]byte(scanner.Text())) + switch scanMethod { + case "bytes": + cap.SetData(scanner.Bytes()) + case "text": + cap.SetData([]byte(scanner.Text())) + } + sub.SendTransform(cap) } } @@ -318,7 +328,13 @@ func s3SnsHandler(ctx context.Context, event events.SNSEvent) error { }) for scanner.Scan() { - cap.SetData([]byte(scanner.Text())) + switch scanMethod { + case "bytes": + cap.SetData(scanner.Bytes()) + case "text": + cap.SetData([]byte(scanner.Text())) + } + sub.SendTransform(cap) } } diff --git a/cmd/file/substation/main.go b/cmd/file/substation/main.go index a1138747..eeddc867 100644 --- a/cmd/file/substation/main.go +++ b/cmd/file/substation/main.go @@ -16,6 +16,7 @@ import ( ) var sub cmd.Substation +var scanMethod string func loadConfig(f string) 
error { bytes, err := ioutil.ReadFile(f) @@ -50,11 +51,15 @@ func main() { } func file(ctx context.Context, filename string) error { + // retrieves concurrency value from SUBSTATION_CONCURRENCY environment variable concurrency, err := cmd.GetConcurrency() if err != nil { return fmt.Errorf("file concurrency: %v", err) } + // retrieves scan method from SUBSTATION_SCAN_METHOD environment variable + scanMethod = cmd.GetScanMethod() + sub.CreateChannels(concurrency) defer sub.KillSignal() @@ -95,7 +100,13 @@ func file(ctx context.Context, filename string) error { scanner.Buffer([]byte{}, 100*1024*1024) for scanner.Scan() { - cap.SetData([]byte(scanner.Text())) + switch scanMethod { + case "bytes": + cap.SetData(scanner.Bytes()) + case "text": + cap.SetData([]byte(scanner.Text())) + } + sub.SendTransform(cap) } diff --git a/internal/aws/s3manager/s3manager.go b/internal/aws/s3manager/s3manager.go index 1b509ef9..5caaf033 100644 --- a/internal/aws/s3manager/s3manager.go +++ b/internal/aws/s3manager/s3manager.go @@ -9,6 +9,7 @@ import ( "net/http" "os" "strconv" + "strings" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" @@ -110,8 +111,7 @@ func (a *DownloaderAPI) DownloadAsScanner(ctx aws.Context, bucket, key string) ( return nil, nil } - ct := http.DetectContentType(buf) - decoded, err := decode(buf, ct) + decoded, err := decode(buf, key) if err != nil { return nil, fmt.Errorf("decode bucket %s key %s: %v", bucket, key, err) } @@ -122,9 +122,29 @@ func (a *DownloaderAPI) DownloadAsScanner(ctx aws.Context, bucket, key string) ( return scanner, nil } -// decode converts bytes into a decoded io.Reader. -func decode(buf []byte, contentType string) (io.Reader, error) { - switch t := contentType; t { +/* +decode converts bytes into a decoded io.Reader using two different file identification techniques: + +- file extension matching + +- MIME type matching + +If either technique fails, then the content is returned with no decoding. 
+*/ +func decode(buf []byte, file string) (io.Reader, error) { + s := strings.Split(file, ".") + extension := s[len(s)-1] + + switch extension { + case "gz": + content, err := gzip.NewReader(bytes.NewBuffer(buf)) + if err != nil { + return nil, fmt.Errorf("decode file extension %s: %v", extension, err) + } + return content, nil + } + + switch contentType := http.DetectContentType(buf); contentType { case "application/x-gzip": content, err := gzip.NewReader(bytes.NewBuffer(buf)) if err != nil {