From 9ddc8b409d9d84264829661e39e3c1526ae4176c Mon Sep 17 00:00:00 2001 From: Greg Furman Date: Sat, 29 Jun 2024 17:08:02 +0200 Subject: [PATCH 1/2] bento-lambda: Include local testing guide and some more information on errors --- website/docs/guides/serverless/lambda.md | 72 ++++++++++++++++++++++-- 1 file changed, 66 insertions(+), 6 deletions(-) diff --git a/website/docs/guides/serverless/lambda.md b/website/docs/guides/serverless/lambda.md index 699a0c96d..f2643826f 100644 --- a/website/docs/guides/serverless/lambda.md +++ b/website/docs/guides/serverless/lambda.md @@ -91,11 +91,9 @@ error unless the output fails. This means that errors that occur within your processors will not result in the handler failing, which will instead return the final state of the message. -In the next major version release (V4) this will change and the handler will -fail if messages have encountered an uncaught error during execution. However, -in the meantime it is possible to configure your output to use the new -[`reject` output][output.reject] in order to trigger a handler error on -processor errors: +The handler will fail if messages have encountered an uncaught error during execution. +However, it is also possible to configure your output to use the [`reject` output][output.reject] +in order to trigger a handler error on processor errors: ```yaml output: @@ -109,6 +107,11 @@ output: reject: "processing failed due to: ${! error() }" ``` +:::caution +If you are using [partial batch responses](https://docs.aws.amazon.com/lambda/latest/dg/services-sqs-errorhandling.html#services-sqs-batchfailurereporting) then +throwing an exception will cause the entire batch to be considered a failure. +::: + ### Running a combination It's possible to configure pipelines that send messages to third party @@ -154,7 +157,7 @@ aws lambda create-function \ There is also an example [SAM template][sam-template] and [Terraform resource][tf-example] in the repo to copy from. -### provided.al2 on amd64 +### provided.al2 on arm64 Grab an archive labelled `bento-lambda-al2` for `arm64` from the [releases page][releases] page and then create your function (AWS CLI v2 only): @@ -194,6 +197,63 @@ go build github.com/warpstreamlabs/bento/cmd/serverless/bento-lambda zip bento-lambda.zip bento-lambda ``` +## Local testing +A quick guide on using the [LocalStack AWS emulator](https://docs.localstack.cloud/overview/) to test your `bento-lambda` locally. + +### Installation +The quickest way to get up-and-running is using the [LocalStack Docker image](https://docs.localstack.cloud/getting-started/installation/#docker): +```sh +docker run \ + --rm -it \ + -p 127.0.0.1:4566:4566 \ + -p 127.0.0.1:4510-4559:4510-4559 \ + -v /var/run/docker.sock:/var/run/docker.sock \ + localstack/localstack +``` + +### Using the AWS CLI v2 + +First, configure AWS CLI to connect to LocalStack: +```sh +export AWS_ACCESS_KEY_ID="test" +export AWS_SECRET_ACCESS_KEY="test" +export AWS_DEFAULT_REGION="us-east-1" +export AWS_ENDPOINT_URL=http://localhost:4566 # Unset this to create resource in AWS +``` + +### Creation and Invocation Examples + +With the container running and CLI configured, we can go ahead with creating and invoking our Bento Lambda using [Lambda on LocalStack](https://docs.localstack.cloud/user-guide/aws/lambda/). + +Running the example in [go1.x on x86_64](#go1x-on-x86_64) can be easily achieved by including the specified endpoint flag. + +```sh +LAMBDA_ENV=`cat yourconfig.yaml | jq -csR {Variables:{BENTO_CONFIG:.}}` +aws lambda create-function \ + --runtime go1.x \ + --handler bento-lambda \ + --role 'arn:aws:iam::000000000000:role/bento-example-role' \ + --zip-file fileb://bento-lambda.zip \ + --environment "$LAMBDA_ENV" \ + --function-name bento-example +``` + +Invocation can be done the same way: +```sh +aws lambda invoke \ + --function-name bento-example \ + --cli-binary-format raw-in-base64-out \ + --payload '{"your":"document"}' \ + out.txt && cat out.txt && rm out.txt +``` + +The LocalStack logs should then include: +```sh +localstack.request.aws : AWS lambda.CreateFunction => 201 +... +localstack.request.aws : AWS lambda.Invoke => 200 +``` + [releases]: https://github.com/warpstreamlabs/bento/releases [sam-template]: https://github.com/warpstreamlabs/bento/tree/main/resources/serverless/lambda/bento-lambda-sam.yaml [tf-example]: https://github.com/warpstreamlabs/bento/tree/main/resources/serverless/lambda/bento-lambda.tf From 3b983ed1415baf9e764b33d358f800c90bbec8e3 Mon Sep 17 00:00:00 2001 From: Greg Furman Date: Sat, 29 Jun 2024 17:08:42 +0200 Subject: [PATCH 2/2] bento-lambda: Add handler exception and error tests --- internal/serverless/handler_test.go | 98 +++++++++++++++++++++++++++++ 1 file changed, 98 insertions(+) diff --git a/internal/serverless/handler_test.go b/internal/serverless/handler_test.go index 5ecf18cff..b1eb83833 100644 --- a/internal/serverless/handler_test.go +++ b/internal/serverless/handler_test.go @@ -202,3 +202,101 @@ output: t.Error(err) } } + +func TestHandlerError(t *testing.T) { + var results [][]byte + var resMut sync.Mutex + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + resMut.Lock() + defer resMut.Unlock() + + resBytes, err := io.ReadAll(r.Body) + if err != nil { + t.Fatal(err) + } + results = append(results, resBytes) + + _, _ = w.Write([]byte("success")) + })) + defer ts.Close() + + conf, err := testutil.ConfigFromYAML(` +pipeline: + processors: + - bloblang: throw("error") + +output: + sync_response: {} +`) + require.NoError(t, err) + + h, err := NewHandler(conf) + if err != nil { + t.Fatal(err) + } + + var res any + if res, err = h.Handle(context.Background(), map[string]any{"foo": "bar"}); err != nil { + t.Fatal(err) + } + + if exp, act := map[string]any{"foo": "bar"}, res; !reflect.DeepEqual(exp, act) { + t.Errorf("Wrong sync response: %v != %v", exp, act) + } + if len(results) != 0 { + t.Error("Wrong sync response: results should be empty") + } + + if err = h.Close(time.Second * 10); err != nil { + t.Error(err) + } +} + +func TestHandlerErrorOnReject(t *testing.T) { + var results [][]byte + var resMut sync.Mutex + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + resMut.Lock() + defer resMut.Unlock() + + resBytes, err := io.ReadAll(r.Body) + if err != nil { + t.Fatal(err) + } + results = append(results, resBytes) + + _, _ = w.Write([]byte("success")) + })) + defer ts.Close() + + conf, err := testutil.ConfigFromYAML(` +pipeline: + processors: + - mapping: throw("error") + +output: + reject_errored: + drop: {} +`) + require.NoError(t, err) + + h, err := NewHandler(conf) + if err != nil { + t.Fatal(err) + } + + var res any + res, err = h.Handle(context.Background(), map[string]any{"foo": "bar"}) + + if err == nil { + t.Fatalf("expected processing error") + } + + if res != nil { + t.Errorf("expected %v, got %v", nil, res) + } + + if err = h.Close(time.Second * 10); err != nil { + t.Error(err) + } +}