Skip to content

Commit

Permalink
feat: Add Sync and Async AWS Lambda Ingest (#72)
Browse files Browse the repository at this point in the history
* feat: add lambda invoke ingest

* docs: README

* chore: add async source lambda example

* feat: microservice example

* chore: add Lambda URL to example

* refactor: json raw message
  • Loading branch information
jshlbrd committed Jan 19, 2023
1 parent cee1932 commit 141fdf5
Show file tree
Hide file tree
Showing 49 changed files with 1,317 additions and 636 deletions.
2 changes: 2 additions & 0 deletions cmd/aws/lambda/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ We recommend using one autoscaling Lambda for an entire Substation deployment, b
This app handles ingest, transform, and load (ITL) for data from these AWS services:
* [API Gateway](https://docs.aws.amazon.com/lambda/latest/dg/services-apigateway.html)
* [Kinesis Data Streams](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html)
* [Asynchronous Invocation (Lambda)](https://docs.aws.amazon.com/lambda/latest/dg/invocation-async.html)
* [Synchronous Invocation (Lambda)](https://docs.aws.amazon.com/lambda/latest/dg/invocation-sync.html)
* [S3](https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html)
* [S3 via SNS](https://docs.aws.amazon.com/AmazonS3/latest/userguide/ways-to-add-notification-config-to-bucket.html)
* [SNS](https://docs.aws.amazon.com/lambda/latest/dg/with-sns.html)
Expand Down
80 changes: 80 additions & 0 deletions cmd/aws/lambda/substation/api_gateway.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package main

import (
"context"
"fmt"
"sync"

"github.com/aws/aws-lambda-go/events"
"github.com/brexhq/substation/cmd"
"github.com/brexhq/substation/config"
"golang.org/x/sync/errgroup"
)

type gatewayMetadata struct {
Resource string `json:"resource"`
Path string `json:"path"`
Headers map[string]string `json:"headers"`
}

func gatewayHandler(ctx context.Context, request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
sub := cmd.New()

// retrieve and load configuration
cfg, err := getConfig(ctx)
if err != nil {
return events.APIGatewayProxyResponse{StatusCode: 500}, fmt.Errorf("gateway: %v", err)
}

if err := sub.SetConfig(cfg); err != nil {
return events.APIGatewayProxyResponse{StatusCode: 500}, fmt.Errorf("gateway: %v", err)
}

// maintains app state
group, ctx := errgroup.WithContext(ctx)

// load
var sinkWg sync.WaitGroup
sinkWg.Add(1)
group.Go(func() error {
return sub.Sink(ctx, &sinkWg)
})

// transform
var transformWg sync.WaitGroup
for w := 0; w < sub.Concurrency(); w++ {
transformWg.Add(1)
group.Go(func() error {
return sub.Transform(ctx, &transformWg)
})
}

// ingest
group.Go(func() error {
if len(request.Body) != 0 {
capsule := config.NewCapsule()
capsule.SetData([]byte(request.Body))
if _, err := capsule.SetMetadata(gatewayMetadata{
request.Resource,
request.Path,
request.Headers,
}); err != nil {
return fmt.Errorf("gateway handler: %v", err)
}

sub.Send(capsule)
}

sub.WaitTransform(&transformWg)
sub.WaitSink(&sinkWg)

return nil
})

// block until ITL is complete
if err := sub.Block(ctx, group); err != nil {
return events.APIGatewayProxyResponse{StatusCode: 500}, fmt.Errorf("gateway: %v", err)
}

return events.APIGatewayProxyResponse{StatusCode: 200}, nil
}
98 changes: 98 additions & 0 deletions cmd/aws/lambda/substation/kinesis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package main

import (
"context"
"fmt"
"sync"
"time"

"github.com/aws/aws-lambda-go/events"
"github.com/awslabs/kinesis-aggregation/go/deaggregator"
"github.com/brexhq/substation/cmd"
"github.com/brexhq/substation/config"
"github.com/brexhq/substation/internal/aws/kinesis"
"golang.org/x/sync/errgroup"
)

type kinesisMetadata struct {
ApproximateArrivalTimestamp time.Time `json:"approximateArrivalTimestamp"`
EventSourceArn string `json:"eventSourceArn"`
PartitionKey string `json:"partitionKey"`
SequenceNumber string `json:"sequenceNumber"`
}

func kinesisHandler(ctx context.Context, event events.KinesisEvent) error {
sub := cmd.New()

// retrieve and load configuration
cfg, err := getConfig(ctx)
if err != nil {
return fmt.Errorf("kinesis handler: %v", err)
}

if err := sub.SetConfig(cfg); err != nil {
return fmt.Errorf("kinesis handler: %v", err)
}

// maintains app state
group, ctx := errgroup.WithContext(ctx)

// load
var sinkWg sync.WaitGroup
sinkWg.Add(1)
group.Go(func() error {
return sub.Sink(ctx, &sinkWg)
})

// transform
var transformWg sync.WaitGroup
for w := 0; w < sub.Concurrency(); w++ {
transformWg.Add(1)
group.Go(func() error {
return sub.Transform(ctx, &transformWg)
})
}

// ingest
eventSourceArn := event.Records[len(event.Records)-1].EventSourceArn

group.Go(func() error {
converted := kinesis.ConvertEventsRecords(event.Records)
deaggregated, err := deaggregator.DeaggregateRecords(converted)
if err != nil {
return fmt.Errorf("kinesis handler: %v", err)
}

for _, record := range deaggregated {
select {
case <-ctx.Done():
return ctx.Err()
default:
capsule := config.NewCapsule()
capsule.SetData(record.Data)
if _, err := capsule.SetMetadata(kinesisMetadata{
*record.ApproximateArrivalTimestamp,
eventSourceArn,
*record.PartitionKey,
*record.SequenceNumber,
}); err != nil {
return fmt.Errorf("kinesis handler: %v", err)
}

sub.Send(capsule)
}
}

sub.WaitTransform(&transformWg)
sub.WaitSink(&sinkWg)

return nil
})

// block until ITL is complete
if err := sub.Block(ctx, group); err != nil {
return fmt.Errorf("kinesis handler: %v", err)
}

return nil
}
182 changes: 182 additions & 0 deletions cmd/aws/lambda/substation/lambda.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package main

import (
"context"
"encoding/json"
"fmt"
"sync"

"github.com/brexhq/substation/cmd"
"github.com/brexhq/substation/config"
"github.com/brexhq/substation/internal/errors"
"github.com/brexhq/substation/internal/service"
"golang.org/x/sync/errgroup"
)

// lambdaAsyncHandler implements ITL that is triggered by an asynchronous invocation
// of the Lambda. Read more about synchronous invocation here:
// https://docs.aws.amazon.com/lambda/latest/dg/invocation-async.html.
//
// This implementation of Substation only supports the object data handling pattern
// -- if the payload sent to the Lambda is not JSON, then the invocation will fail.
func lambdaAsyncHandler(ctx context.Context, event json.RawMessage) error {
evt, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("lambda async: %v", err)
}

sub := cmd.New()

// retrieve and load configuration
cfg, err := getConfig(ctx)
if err != nil {
return fmt.Errorf("lambda async: %v", err)
}

if err := sub.SetConfig(cfg); err != nil {
return fmt.Errorf("lambda async: %v", err)
}

// maintains app state
group, ctx := errgroup.WithContext(ctx)

// load
var sinkWg sync.WaitGroup
sinkWg.Add(1)
group.Go(func() error {
return sub.Sink(ctx, &sinkWg)
})

// transform
var transformWg sync.WaitGroup
for w := 0; w < sub.Concurrency(); w++ {
transformWg.Add(1)
group.Go(func() error {
return sub.Transform(ctx, &transformWg)
})
}

// ingest
group.Go(func() error {
capsule := config.NewCapsule()
capsule.SetData(evt)

// do not add metadata -- there is no metadata worth adding from the invocation
sub.Send(capsule)

sub.WaitTransform(&transformWg)
sub.WaitSink(&sinkWg)

return nil
})

// block until ITL is complete
if err := sub.Block(ctx, group); err != nil {
panic(err)
}

return nil
}

// errLambdaSyncMultipleItems is returned when an invocation of the lambdaSync handler
// produces multiple items, which cannot be returned.
const errLambdaSyncMultipleItems = errors.Error("transformed data into multiple items")

// lambdaSyncHandler implements ITL using a request-reply service that is triggered
// by synchronous invocation of the Lambda. Read more about synchronous invocation here:
// https://docs.aws.amazon.com/lambda/latest/dg/invocation-sync.html.
//
// This implementation of Substation has some limitations and requirements:
//
// - Only supports the object data handling pattern -- if the payload sent to the Lambda
// and the result are not JSON, then the invocation will fail
//
// - Only returns a single object -- if many objects may be returned, then they should be
// aggregated into one object using the Aggregate processor
//
// - Must use the gRPC sink configured to send data to localhost:50051 -- data is routed
// from the sink to the handler using the Substation gRPC Sink service
func lambdaSyncHandler(ctx context.Context, event json.RawMessage) (json.RawMessage, error) {
evt, err := json.Marshal(event)
if err != nil {
return nil, fmt.Errorf("lambda sync: %v", err)
}

sub := cmd.New()

// retrieve and load configuration
cfg, err := getConfig(ctx)
if err != nil {
return nil, fmt.Errorf("lambda sync: %v", err)
}

if err := sub.SetConfig(cfg); err != nil {
return nil, fmt.Errorf("lambda sync: %v", err)
}

// maintains app state
group, ctx := errgroup.WithContext(ctx)

// gRPC service, required for catching results from the sink
server := service.Server{}
server.Setup()

// deferring guarantees that the gRPC server will shutdown
defer server.Stop()

srv := &service.Sink{}
server.RegisterSink(srv)

// gRPC server runs in a goroutine to prevent blocking main
group.Go(func() error {
return server.Start("localhost:50051")
})

// load
var sinkWg sync.WaitGroup
sinkWg.Add(1)
group.Go(func() error {
return sub.Sink(ctx, &sinkWg)
})

// transform
var transformWg sync.WaitGroup
for w := 0; w < sub.Concurrency(); w++ {
transformWg.Add(1)
group.Go(func() error {
return sub.Transform(ctx, &transformWg)
})
}

// ingest
group.Go(func() error {
capsule := config.NewCapsule()
capsule.SetData(evt)

// do not add metadata -- there is no metadata worth adding from the invocation
sub.Send(capsule)

sub.WaitTransform(&transformWg)
sub.WaitSink(&sinkWg)

return nil
})

// block until ITL is complete and the gRPC stream is closed
if err := sub.Block(ctx, group); err != nil {
panic(err)
}
srv.Block()

if len(srv.Capsules) > 1 {
return nil, fmt.Errorf("lambda sync: %v", errLambdaSyncMultipleItems)
}

capsule := srv.Capsules[0]
var output json.RawMessage
if err := json.Unmarshal(capsule.Data(), &output); err != nil {
return nil, fmt.Errorf("lambda sync: %v", err)
}

return output, nil
}
Loading

0 comments on commit 141fdf5

Please sign in to comment.