Skip to content

Commit

Permalink
feat: Improvements to Reading and Decoding Files (#24)
Browse files Browse the repository at this point in the history
* fix: edge case for gz files

* feat: env var for reading non-text data
  • Loading branch information
jshlbrd committed Sep 9, 2022
1 parent da9de62 commit e310cb5
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 9 deletions.
21 changes: 20 additions & 1 deletion cmd/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (sub *Substation) Sink(ctx context.Context, wg *sync.WaitGroup) {
sub.DoneSignal()
}

// GetConcurrency retrieves a concurrency value from the SUBSTATION_CONCURRENCY environment variable. If the environment variable is missing, then the concurrency value is the number of CPUs on the operating system. In native Substation applications, this value determines the number of transform goroutines; if set to 1, then multi-core processing is not enabled.
// GetConcurrency retrieves a concurrency value from the SUBSTATION_CONCURRENCY environment variable. If the environment variable is missing, then the concurrency value is the number of CPUs on the host. In native Substation applications, this value determines the number of transform goroutines; if set to 1, then multi-core processing is not enabled.
func GetConcurrency() (int, error) {
if val, found := os.LookupEnv("SUBSTATION_CONCURRENCY"); found {
v, err := strconv.Atoi(val)
Expand All @@ -163,3 +163,22 @@ func GetConcurrency() (int, error) {

return runtime.NumCPU(), nil
}

/*
GetScanMethod reports which bufio scanner accessor the application should use when reading files, as configured by the SUBSTATION_SCAN_METHOD environment variable. Recognized values are:
- "bytes" (https://pkg.go.dev/bufio#Scanner.Bytes)
- "text" (https://pkg.go.dev/bufio#Scanner.Text)
When the variable is not set, or is set to any other value, the method falls back to "text".
*/
func GetScanMethod() string {
	method, ok := os.LookupEnv("SUBSTATION_SCAN_METHOD")
	if !ok {
		return "text"
	}

	switch method {
	case "bytes", "text":
		// only the two recognized methods are passed through unchanged
		return method
	default:
		return "text"
	}
}
20 changes: 18 additions & 2 deletions cmd/aws/lambda/substation/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
var sub cmd.Substation
var concurrency int
var handler string
var scanMethod string

// LambdaMissingHandler is returned when the Lambda is deployed without a configured handler.
const LambdaMissingHandler = errors.Error("LambdaMissingHandler")
Expand Down Expand Up @@ -63,6 +64,9 @@ func init() {
if err != nil {
panic(fmt.Errorf("init concurrency: %v", err))
}

// retrieves scan method from SUBSTATION_SCAN_METHOD environment variable
scanMethod = cmd.GetScanMethod()
}

type gatewayMetadata struct {
Expand Down Expand Up @@ -240,7 +244,13 @@ func s3Handler(ctx context.Context, event events.S3Event) error {
})

for scanner.Scan() {
cap.SetData([]byte(scanner.Text()))
switch scanMethod {
case "bytes":
cap.SetData(scanner.Bytes())
case "text":
cap.SetData([]byte(scanner.Text()))
}

sub.SendTransform(cap)
}
}
Expand Down Expand Up @@ -318,7 +328,13 @@ func s3SnsHandler(ctx context.Context, event events.SNSEvent) error {
})

for scanner.Scan() {
cap.SetData([]byte(scanner.Text()))
switch scanMethod {
case "bytes":
cap.SetData(scanner.Bytes())
case "text":
cap.SetData([]byte(scanner.Text()))
}

sub.SendTransform(cap)
}
}
Expand Down
13 changes: 12 additions & 1 deletion cmd/file/substation/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
)

var sub cmd.Substation
var scanMethod string

func loadConfig(f string) error {
bytes, err := ioutil.ReadFile(f)
Expand Down Expand Up @@ -50,11 +51,15 @@ func main() {
}

func file(ctx context.Context, filename string) error {
// retrieves concurrency value from SUBSTATION_CONCURRENCY environment variable
concurrency, err := cmd.GetConcurrency()
if err != nil {
return fmt.Errorf("file concurrency: %v", err)
}

// retrieves scan method from SUBSTATION_SCAN_METHOD environment variable
scanMethod = cmd.GetScanMethod()

sub.CreateChannels(concurrency)
defer sub.KillSignal()

Expand Down Expand Up @@ -95,7 +100,13 @@ func file(ctx context.Context, filename string) error {
scanner.Buffer([]byte{}, 100*1024*1024)

for scanner.Scan() {
cap.SetData([]byte(scanner.Text()))
switch scanMethod {
case "bytes":
cap.SetData(scanner.Bytes())
case "text":
cap.SetData([]byte(scanner.Text()))
}

sub.SendTransform(cap)
}

Expand Down
30 changes: 25 additions & 5 deletions internal/aws/s3manager/s3manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/http"
"os"
"strconv"
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
Expand Down Expand Up @@ -110,8 +111,7 @@ func (a *DownloaderAPI) DownloadAsScanner(ctx aws.Context, bucket, key string) (
return nil, nil
}

ct := http.DetectContentType(buf)
decoded, err := decode(buf, ct)
decoded, err := decode(buf, key)
if err != nil {
return nil, fmt.Errorf("decode bucket %s key %s: %v", bucket, key, err)
}
Expand All @@ -122,9 +122,29 @@ func (a *DownloaderAPI) DownloadAsScanner(ctx aws.Context, bucket, key string) (
return scanner, nil
}

// decode converts bytes into a decoded io.Reader.
func decode(buf []byte, contentType string) (io.Reader, error) {
switch t := contentType; t {
/*
decode converts bytes into a decoded io.Reader using two different file identification techniques:
- file extension matching
- MIME type matching
If either technique fails, then the content is returned with no decoding.
*/
func decode(buf []byte, file string) (io.Reader, error) {
s := strings.Split(file, ".")
extension := s[len(s)-1]

switch extension {
case "gz":
content, err := gzip.NewReader(bytes.NewBuffer(buf))
if err != nil {
return nil, fmt.Errorf("decode file extension %s: %v", extension, err)
}
return content, nil
}

switch contentType := http.DetectContentType(buf); contentType {
case "application/x-gzip":
content, err := gzip.NewReader(bytes.NewBuffer(buf))
if err != nil {
Expand Down

0 comments on commit e310cb5

Please sign in to comment.