Skip to content

Commit

Permalink
Merge pull request #62 from gregfurman/serverless-house-keeping
Browse files Browse the repository at this point in the history
Serverless housekeeping: Local testing guide, small docs fixes, and some handler tests for errors
  • Loading branch information
gregfurman committed Jul 3, 2024
2 parents f18aa09 + 3b983ed commit e8d1e92
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 6 deletions.
98 changes: 98 additions & 0 deletions internal/serverless/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,101 @@ output:
t.Error(err)
}
}

func TestHandlerError(t *testing.T) {
var results [][]byte
var resMut sync.Mutex
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
resMut.Lock()
defer resMut.Unlock()

resBytes, err := io.ReadAll(r.Body)
if err != nil {
t.Fatal(err)
}
results = append(results, resBytes)

_, _ = w.Write([]byte("success"))
}))
defer ts.Close()

conf, err := testutil.ConfigFromYAML(`
pipeline:
processors:
- bloblang: throw("error")
output:
sync_response: {}
`)
require.NoError(t, err)

h, err := NewHandler(conf)
if err != nil {
t.Fatal(err)
}

var res any
if res, err = h.Handle(context.Background(), map[string]any{"foo": "bar"}); err != nil {
t.Fatal(err)
}

if exp, act := map[string]any{"foo": "bar"}, res; !reflect.DeepEqual(exp, act) {
t.Errorf("Wrong sync response: %v != %v", exp, act)
}
if len(results) != 0 {
t.Error("Wrong sync response: results should be empty")
}

if err = h.Close(time.Second * 10); err != nil {
t.Error(err)
}
}

func TestHandlerErrorOnReject(t *testing.T) {
var results [][]byte
var resMut sync.Mutex
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
resMut.Lock()
defer resMut.Unlock()

resBytes, err := io.ReadAll(r.Body)
if err != nil {
t.Fatal(err)
}
results = append(results, resBytes)

_, _ = w.Write([]byte("success"))
}))
defer ts.Close()

conf, err := testutil.ConfigFromYAML(`
pipeline:
processors:
- mapping: throw("error")
output:
reject_errored:
drop: {}
`)
require.NoError(t, err)

h, err := NewHandler(conf)
if err != nil {
t.Fatal(err)
}

var res any
res, err = h.Handle(context.Background(), map[string]any{"foo": "bar"})

if err == nil {
t.Fatalf("expected processing error")
}

if res != nil {
t.Errorf("expected %v, got %v", nil, res)
}

if err = h.Close(time.Second * 10); err != nil {
t.Error(err)
}
}
72 changes: 66 additions & 6 deletions website/docs/guides/serverless/lambda.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,9 @@ error unless the output fails. This means that errors that occur within your
processors will not result in the handler failing, which will instead return the
final state of the message.

In the next major version release (V4) this will change and the handler will
fail if messages have encountered an uncaught error during execution. However,
in the meantime it is possible to configure your output to use the new
[`reject` output][output.reject] in order to trigger a handler error on
processor errors:
The handler will fail if messages have encountered an uncaught error during execution.
However, it is also possible to configure your output to use the [`reject` output][output.reject]
in order to trigger a handler error on processor errors:

```yaml
output:
Expand All @@ -109,6 +107,11 @@ output:
reject: "processing failed due to: ${! error() }"
```
:::caution
If you are using [partial batch responses](https://docs.aws.amazon.com/lambda/latest/dg/services-sqs-errorhandling.html#services-sqs-batchfailurereporting) then
throwing an exception will cause the entire batch to be considered a failure.
:::
### Running a combination
It's possible to configure pipelines that send messages to third party
Expand Down Expand Up @@ -154,7 +157,7 @@ aws lambda create-function \
There is also an example [SAM template][sam-template] and
[Terraform resource][tf-example] in the repo to copy from.

### provided.al2 on amd64
### provided.al2 on arm64

Grab an archive labelled `bento-lambda-al2` for `arm64` from the [releases page][releases]
page and then create your function (AWS CLI v2 only):
Expand Down Expand Up @@ -194,6 +197,63 @@ go build github.com/warpstreamlabs/bento/cmd/serverless/bento-lambda
zip bento-lambda.zip bento-lambda
```

## Local testing
A quick guide on using the [LocalStack AWS emulator](https://docs.localstack.cloud/overview/) to test your `bento-lambda` locally.

### Installation
The quickest way to get up-and-running is using the [LocalStack Docker image](https://docs.localstack.cloud/getting-started/installation/#docker):
```sh
docker run \
--rm -it \
-p 127.0.0.1:4566:4566 \
-p 127.0.0.1:4510-4559:4510-4559 \
-v /var/run/docker.sock:/var/run/docker.sock \
localstack/localstack
```

### Using the AWS CLI v2

First, configure AWS CLI to connect to LocalStack:
```sh
export AWS_ACCESS_KEY_ID="test"
export AWS_SECRET_ACCESS_KEY="test"
export AWS_DEFAULT_REGION="us-east-1"
export AWS_ENDPOINT_URL=http://localhost:4566 # Unset this to create resource in AWS
```

### Creation and Invocation Examples

With the container running and CLI configured, we can go ahead with creating and invoking our Bento Lambda using [Lambda on LocalStack](https://docs.localstack.cloud/user-guide/aws/lambda/).

Running the example in [go1.x on x86_64](#go1x-on-x86_64) can be easily achieved by including the specified endpoint flag.

```sh
LAMBDA_ENV=`cat yourconfig.yaml | jq -csR {Variables:{BENTO_CONFIG:.}}`
aws lambda create-function \
--runtime go1.x \
--handler bento-lambda \
--role 'arn:aws:iam::000000000000:role/bento-example-role' \
--zip-file fileb://bento-lambda.zip \
--environment "$LAMBDA_ENV" \
--function-name bento-example
```

Invocation can be done the same way:
```sh
aws lambda invoke \
--function-name bento-example \
--cli-binary-format raw-in-base64-out \
--payload '{"your":"document"}' \
out.txt && cat out.txt && rm out.txt
```

The LocalStack logs should then include:
```sh
localstack.request.aws : AWS lambda.CreateFunction => 201
...
localstack.request.aws : AWS lambda.Invoke => 200
```

[releases]: https://github.com/warpstreamlabs/bento/releases
[sam-template]: https://github.com/warpstreamlabs/bento/tree/main/resources/serverless/lambda/bento-lambda-sam.yaml
[tf-example]: https://github.com/warpstreamlabs/bento/tree/main/resources/serverless/lambda/bento-lambda.tf
Expand Down

0 comments on commit e8d1e92

Please sign in to comment.