Skip to content

Commit

Permalink
fix: Handling Large S3 Files (#20)
Browse files Browse the repository at this point in the history
* fix: corrects scanner regression

* refactor: speed up processing large JSON objects

* fix: halve lambda capacity
  • Loading branch information
jshlbrd committed Aug 31, 2022
1 parent be46c0b commit 2791b91
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 13 deletions.
8 changes: 2 additions & 6 deletions cmd/file/substation/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,9 @@ func file(ctx context.Context, filename string) error {
fi.ModTime(),
})

// a scanner token can be up to 100MB
scanner := bufio.NewScanner(fileHandle)

// ensures that files containing a single line can fit
// within the scanner
s := int(float64(fi.Size()) * 1.1)
buf := make([]byte, s)
scanner.Buffer(buf, s)
scanner.Buffer([]byte{}, 100*1024*1024)

for scanner.Scan() {
cap.SetData([]byte(scanner.Text()))
Expand Down
25 changes: 19 additions & 6 deletions internal/aws/s3manager/s3manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,24 @@ import (
"github.com/aws/aws-xray-sdk-go/xray"
)

// maxCapacity limits the size of tokens in bufio scanners.
var maxCapacity int

func init() {
	// Scanner tokens default to a 100MB ceiling. When running inside
	// AWS Lambda (detected via the runtime-provided memory-size env
	// var), cap tokens at half the function's configured memory so a
	// single oversized line cannot exhaust the sandbox.
	const defaultCapacity = 100 * 1024 * 1024

	maxCapacity = defaultCapacity

	val, ok := os.LookupEnv("AWS_LAMBDA_FUNCTION_MEMORY_SIZE")
	if !ok {
		return
	}

	// the env var is set by the Lambda runtime and expected to be an
	// integer number of megabytes; anything else is a fatal misconfig
	v, err := strconv.Atoi(val)
	if err != nil {
		panic(fmt.Errorf("s3manager init: %v", err))
	}

	maxCapacity = v * 1024 * 1024 / 2
}

// NewS3 returns a configured S3 client.
func NewS3() *s3.S3 {
conf := aws.NewConfig()
Expand Down Expand Up @@ -99,12 +117,7 @@ func (a *DownloaderAPI) DownloadAsScanner(ctx aws.Context, bucket, key string) (
}

scanner := bufio.NewScanner(decoded)

// ensures that files containing a single line can fit
// within the scanner
s := int(float64(size) * 1.1)
b := make([]byte, s)
scanner.Buffer(b, s)
scanner.Buffer([]byte{}, maxCapacity)

return scanner, nil
}
Expand Down
12 changes: 11 additions & 1 deletion process/expand.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,20 @@ func (p Expand) ApplyBatch(ctx context.Context, caps []config.Capsule) ([]config
// expanded:
// {"foo":"bar","quux":"corge"}
// {"baz":"qux","quux":"corge"}

// the JSON Get / Delete routine is a hack to speed up processing
// very large JSON objects, like those output by AWS CloudTrail
root := cap.Get("@this")
rootBytes, err := json.Delete([]byte(root.String()), p.InputKey)
if err != nil {
return nil, fmt.Errorf("process expand applybatch: %v", err)
}

root = json.Get(rootBytes, "@this")
result := cap.Get(p.InputKey)

newCap := config.NewCapsule()
// retains metadata from the original capsule
newCap := cap
for _, res := range result.Array() {
var err error

Expand Down

0 comments on commit 2791b91

Please sign in to comment.