From 118b7353ddeb4af799478733b305b3090fc91d9d Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Sun, 30 Jun 2024 12:05:06 +0600 Subject: [PATCH 01/10] feat: `throwOnFullBatchFailure` option for `BatchProcessingOptions` --- packages/batch/src/types.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/packages/batch/src/types.ts b/packages/batch/src/types.ts index 51f9b78e34..a267f092b6 100644 --- a/packages/batch/src/types.ts +++ b/packages/batch/src/types.ts @@ -13,6 +13,7 @@ import { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js'; * @template T The type of the batch processor, defaults to BasePartialBatchProcessor * @property context The context object provided by the AWS Lambda runtime * @property skipGroupOnError The option to group on error during processing + * @property throwOnFullBatchFailure The option to throw an error if the entire batch fails */ type BatchProcessingOptions = { /** @@ -25,6 +26,10 @@ type BatchProcessingOptions = { * If true skip the group on error during processing. */ skipGroupOnError?: T extends SqsFifoPartialProcessor ? boolean : never; + /** + * Set this to false to prevent throwing an error if the entire batch fails. + */ + throwOnFullBatchFailure?: boolean; }; /** From 9ccfb13323dc4e512aac6c5be51b009871536bb0 Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Sun, 30 Jun 2024 12:14:08 +0600 Subject: [PATCH 02/10] feat: check `throwOnFullBatchFailure` option while throwing `FullBatchFailureError` error --- packages/batch/src/BasePartialBatchProcessor.ts | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/packages/batch/src/BasePartialBatchProcessor.ts b/packages/batch/src/BasePartialBatchProcessor.ts index ca913e49e9..09f11b3d94 100644 --- a/packages/batch/src/BasePartialBatchProcessor.ts +++ b/packages/batch/src/BasePartialBatchProcessor.ts @@ -63,9 +63,9 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor { /** * Clean up logic to be run after processing a batch * - * If the entire batch failed, and the utility is not configured otherwise, - * this method will throw a `FullBatchFailureError` with the list of errors - * that occurred during processing. + * If the entire batch failed, and `throwOnFullBatchFailure` option is not explicitly + * set to `false`, this method will throw a `FullBatchFailureError` with the list of + * errors that occurred during processing. * * Otherwise, it will build the partial failure response based on the event type. */ @@ -74,7 +74,10 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor { return; } - if (this.entireBatchFailed()) { + if ( + this.options?.throwOnFullBatchFailure !== false && + this.entireBatchFailed() + ) { throw new FullBatchFailureError(this.errors); } From dba0f4474ebe21214733e67c7533eb98c21d358f Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Sun, 30 Jun 2024 12:39:04 +0600 Subject: [PATCH 03/10] test: process partial response function call with asynchronous handler for full batch failure --- .../tests/unit/processPartialResponse.test.ts | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/packages/batch/tests/unit/processPartialResponse.test.ts b/packages/batch/tests/unit/processPartialResponse.test.ts index 732d40ee3b..a055a4b811 100644 --- a/packages/batch/tests/unit/processPartialResponse.test.ts +++ b/packages/batch/tests/unit/processPartialResponse.test.ts @@ -15,6 +15,7 @@ import { processPartialResponse, EventType, UnexpectedBatchTypeError, + FullBatchFailureError, } from '../../src/index.js'; import type { BatchProcessingOptions, @@ -90,6 +91,59 @@ describe('Function: processPartialResponse()', () => { // Assess expect(ret).toStrictEqual({ batchItemFailures: [] }); }); + + test('Process partial response function call with asynchronous handler for full batch failure', async () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const batch = { Records: records }; + const processor = new BatchProcessor(EventType.SQS); + + // Act & Assess + await expect( + processPartialResponse(batch, asyncSqsRecordHandler, processor) + ).rejects.toThrow(FullBatchFailureError); + }); + + test('Process partial response function call with asynchronous handler for full batch failure when `throwOnFullBatchFailure` is `true`', async () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const batch = { Records: records }; + const processor = new BatchProcessor(EventType.SQS); + + // Act & Assess + await expect( + processPartialResponse(batch, asyncSqsRecordHandler, processor, { + ...options, + throwOnFullBatchFailure: true, + }) + ).rejects.toThrow(FullBatchFailureError); + }); + + test('Process partial response function call with asynchronous handler for full batch failure when `throwOnFullBatchFailure` is `false`', async () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const batch = { Records: records }; + const processor = new BatchProcessor(EventType.SQS); + + // Act + const response = await processPartialResponse( + batch, + asyncSqsRecordHandler, + processor, + { + ...options, + throwOnFullBatchFailure: false, + } + ); + + // Assess + expect(response).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: records[0].messageId }, + { itemIdentifier: records[1].messageId }, + ], + }); + }); }); describe('Process partial response function call through handler', () => { From 52aca96ef606b6ca38f5cf7db6730c4ec1ec24f7 Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Sun, 30 Jun 2024 12:42:30 +0600 Subject: [PATCH 04/10] test: process partial response through handler for full batch failure --- .../tests/unit/processPartialResponse.test.ts | 69 +++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/packages/batch/tests/unit/processPartialResponse.test.ts b/packages/batch/tests/unit/processPartialResponse.test.ts index a055a4b811..1d379e6398 100644 --- a/packages/batch/tests/unit/processPartialResponse.test.ts +++ b/packages/batch/tests/unit/processPartialResponse.test.ts @@ -282,5 +282,74 @@ describe('Function: processPartialResponse()', () => { // Assess expect(result).toStrictEqual({ batchItemFailures: [] }); }); + + test('Process partial response through handler for full batch failure', async () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const processor = new BatchProcessor(EventType.SQS); + const event: SQSEvent = { Records: records }; + + const handler = async ( + event: SQSEvent, + _context: Context + ): Promise => { + return processPartialResponse(event, asyncSqsRecordHandler, processor); + }; + + // Act & Assess + await expect(handler(event, context)).rejects.toThrow( + FullBatchFailureError + ); + }); + + test('Process partial response through handler for full batch failure when `throwOnFullBatchFailure` is `true`', async () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const processor = new BatchProcessor(EventType.SQS); + const event: SQSEvent = { Records: records }; + + const handler = async ( + event: SQSEvent, + _context: Context + ): Promise => { + return processPartialResponse(event, asyncSqsRecordHandler, processor, { + ...options, + throwOnFullBatchFailure: true, + }); + }; + + // Act & Assess + await expect(handler(event, context)).rejects.toThrow( + FullBatchFailureError + ); + }); + + test('Process partial response through handler for full batch failure when `throwOnFullBatchFailure` is `false`', async () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const processor = new BatchProcessor(EventType.SQS); + const event: SQSEvent = { Records: records }; + + const handler = async ( + event: SQSEvent, + _context: Context + ): Promise => { + return processPartialResponse(event, asyncSqsRecordHandler, processor, { + ...options, + throwOnFullBatchFailure: false, + }); + }; + + // Act + const response = await handler(event, context); + + // Assess + expect(response).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: records[0].messageId }, + { itemIdentifier: records[1].messageId }, + ], + }); + }); }); }); From a491971d3fbc3822fd94140dcc7bddf5f7ac8ee6 Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Sun, 30 Jun 2024 16:10:53 +0600 Subject: [PATCH 05/10] test: processPartialResponseSync for full batch failure with `throwOnFullBatchFailure` option --- .../unit/processPartialResponseSync.test.ts | 119 ++++++++++++++++++ 1 file changed, 119 insertions(+) diff --git a/packages/batch/tests/unit/processPartialResponseSync.test.ts b/packages/batch/tests/unit/processPartialResponseSync.test.ts index 4fe7da8609..8d4bb1e263 100644 --- a/packages/batch/tests/unit/processPartialResponseSync.test.ts +++ b/packages/batch/tests/unit/processPartialResponseSync.test.ts @@ -15,6 +15,7 @@ import { processPartialResponseSync, EventType, UnexpectedBatchTypeError, + FullBatchFailureError, } from '../../src/index.js'; import type { BatchProcessingOptions, @@ -90,6 +91,59 @@ describe('Function: processPartialResponse()', () => { // Assess expect(ret).toStrictEqual({ batchItemFailures: [] }); }); + + test('Process partial response function call with synchronous handler for full batch failure', () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const batch = { Records: records }; + const processor = new BatchProcessorSync(EventType.SQS); + + // Act & Assess + expect(() => + processPartialResponseSync(batch, sqsRecordHandler, processor) + ).toThrow(FullBatchFailureError); + }); + + test('Process partial response function call with synchronous handler for full batch failure when `throwOnFullBatchFailure` is `true`', () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const batch = { Records: records }; + const processor = new BatchProcessorSync(EventType.SQS); + + // Act & Assess + expect(() => + processPartialResponseSync(batch, sqsRecordHandler, processor, { + ...options, + throwOnFullBatchFailure: true, + }) + ).toThrow(FullBatchFailureError); + }); + + test('Process partial response function call with synchronous handler for full batch failure when `throwOnFullBatchFailure` is `false`', () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const batch = { Records: records }; + const processor = new BatchProcessorSync(EventType.SQS); + + // Act + const response = processPartialResponseSync( + batch, + sqsRecordHandler, + processor, + { + ...options, + throwOnFullBatchFailure: false, + } + ); + + // Assess + expect(response).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: records[0].messageId }, + { itemIdentifier: records[1].messageId }, + ], + }); + }); }); describe('Process partial response function call through handler', () => { @@ -224,5 +278,70 @@ describe('Function: processPartialResponse()', () => { // Assess expect(result).toStrictEqual({ batchItemFailures: [] }); }); + + test('Process partial response through handler for full batch failure', () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const processor = new BatchProcessorSync(EventType.SQS); + const event: SQSEvent = { Records: records }; + + const handler = ( + event: SQSEvent, + _context: Context + ): PartialItemFailureResponse => { + return processPartialResponseSync(event, sqsRecordHandler, processor); + }; + + // Act & Assess + expect(() => handler(event, context)).toThrow(FullBatchFailureError); + }); + + test('Process partial response through handler for full batch failure when `throwOnFullBatchFailure` is `true`', () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const processor = new BatchProcessorSync(EventType.SQS); + const event: SQSEvent = { Records: records }; + + const handler = ( + event: SQSEvent, + _context: Context + ): PartialItemFailureResponse => { + return processPartialResponseSync(event, sqsRecordHandler, processor, { + ...options, + throwOnFullBatchFailure: true, + }); + }; + + // Act & Assess + expect(() => handler(event, context)).toThrow(FullBatchFailureError); + }); + + test('Process partial response through handler for full batch failure when `throwOnFullBatchFailure` is `false`', () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const processor = new BatchProcessorSync(EventType.SQS); + const event: SQSEvent = { Records: records }; + + const handler = ( + event: SQSEvent, + _context: Context + ): PartialItemFailureResponse => { + return processPartialResponseSync(event, sqsRecordHandler, processor, { + ...options, + throwOnFullBatchFailure: false, + }); + }; + + // Act + const response = handler(event, context); + + // Assess + expect(response).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: records[0].messageId }, + { itemIdentifier: records[1].messageId }, + ], + }); + }); }); }); From ea8a02014d301c119ccf85676bc8c2839ebb0d8f Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Sun, 30 Jun 2024 16:22:50 +0600 Subject: [PATCH 06/10] doc: update do block for process functions --- packages/batch/src/processPartialResponse.ts | 25 +++++++++++++++++++ .../batch/src/processPartialResponseSync.ts | 24 ++++++++++++++++++ 2 files changed, 49 insertions(+) diff --git a/packages/batch/src/processPartialResponse.ts b/packages/batch/src/processPartialResponse.ts index ca74e2801e..4423e1c179 100644 --- a/packages/batch/src/processPartialResponse.ts +++ b/packages/batch/src/processPartialResponse.ts @@ -42,6 +42,31 @@ import type { * }); * ``` * + * By default, if the entire batch fails, the function will throw an error. + * If you want to prevent this behavior, you can set the `throwOnFullBatchFailure` to `false` + * + * @example + * ```typescript + * import { + * BatchProcessor, + * EventType, + * processPartialResponse, + * } from '@aws-lambda-powertools/batch'; + * import type { KinesisStreamHandler, KinesisStreamRecord } from 'aws-lambda'; + * + * const processor = new BatchProcessor(EventType.KinesisDataStreams); + * + * const recordHandler = async (record: KinesisStreamRecord): Promise => { + * const payload = JSON.parse(record.kinesis.data); + * }; + * + * export const handler: KinesisStreamHandler = async (event, context) => + * processPartialResponse(event, recordHandler, processor, { + * context, + * throwOnFullBatchFailure: false + * }); + * ``` + * * @param event The event object containing the batch of records * @param recordHandler Async function to process each record from the batch * @param processor Batch processor instance to handle the batch processing diff --git a/packages/batch/src/processPartialResponseSync.ts b/packages/batch/src/processPartialResponseSync.ts index cdf349dd84..c8eca684e7 100644 --- a/packages/batch/src/processPartialResponseSync.ts +++ b/packages/batch/src/processPartialResponseSync.ts @@ -68,6 +68,30 @@ import type { * }); * ``` * + * By default, if the entire batch fails, the function will throw an error. + * If you want to prevent this behavior, you can set the `throwOnFullBatchFailure` to `false` + * + * @example + * ```typescript + * import { + * SqsFifoPartialProcessor, + * processPartialResponseSync, + * } from '@aws-lambda-powertools/batch'; + * import type { SQSRecord, SQSHandler } from 'aws-lambda'; + * + * const processor = new SqsFifoPartialProcessor(); + * + * const recordHandler = async (record: SQSRecord): Promise => { + * const payload = JSON.parse(record.body); + * }; + * + * export const handler: SQSHandler = async (event, context) => + * processPartialResponseSync(event, recordHandler, processor, { + * context, + * throwOnFullBatchFailure: false + * }); + * ``` + * * @param event The event object containing the batch of records * @param recordHandler Sync function to process each record from the batch * @param processor Batch processor instance to handle the batch processing From c01e2d6b12c37695738684c1348499b4f543d3f6 Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Sun, 30 Jun 2024 17:10:45 +0600 Subject: [PATCH 07/10] doc: mention `throwOnFullBatchFailure` in all records failed section --- docs/utilities/batch.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 9f8d8852c5..deffc9818a 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -261,7 +261,7 @@ All records in the batch will be passed to this handler for processing, even if * **All records successfully processed**. We will return an empty list of item failures `{'batchItemFailures': []}` * **Partial success with some exceptions**. We will return a list of all item IDs/sequence numbers that failed processing -* **All records failed to be processed**. We will raise `BatchProcessingError` exception with a list of all exceptions raised when processing +* **All records failed to be processed**. We will raise `BatchProcessingError` exception with a list of all exceptions raised when processing. This exception can be bypassed if you set `throwOnFullBatchFailure` option to `false` The following sequence diagrams explain how each Batch processor behaves under different scenarios. From db1a5412f261b673e44e409ba5e180c286bad974 Mon Sep 17 00:00:00 2001 From: Andrea Amorosi Date: Thu, 4 Jul 2024 10:11:30 +0200 Subject: [PATCH 08/10] chore: add period to sentence --- docs/utilities/batch.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index deffc9818a..e46f1c1c91 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -261,7 +261,7 @@ All records in the batch will be passed to this handler for processing, even if * **All records successfully processed**. We will return an empty list of item failures `{'batchItemFailures': []}` * **Partial success with some exceptions**. We will return a list of all item IDs/sequence numbers that failed processing -* **All records failed to be processed**. We will raise `BatchProcessingError` exception with a list of all exceptions raised when processing. This exception can be bypassed if you set `throwOnFullBatchFailure` option to `false` +* **All records failed to be processed**. We will raise `BatchProcessingError` exception with a list of all exceptions raised when processing. This exception can be bypassed if you set `throwOnFullBatchFailure` option to `false`. The following sequence diagrams explain how each Batch processor behaves under different scenarios. From e9450936789fe63b82e04d8077b6db87b5a4248f Mon Sep 17 00:00:00 2001 From: Andrea Amorosi Date: Mon, 8 Jul 2024 15:34:19 +0200 Subject: [PATCH 09/10] docs(batch): added new option to dosc --- docs/utilities/batch.md | 14 +++++++++++++- .../batch/noThrowOnFullBatchFailure.ts | 18 ++++++++++++++++++ .../batch/src/BasePartialBatchProcessor.ts | 5 ++--- 3 files changed, 33 insertions(+), 4 deletions(-) create mode 100644 examples/snippets/batch/noThrowOnFullBatchFailure.ts diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index e46f1c1c91..5ecc603728 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -261,7 +261,7 @@ All records in the batch will be passed to this handler for processing, even if * **All records successfully processed**. We will return an empty list of item failures `{'batchItemFailures': []}` * **Partial success with some exceptions**. We will return a list of all item IDs/sequence numbers that failed processing -* **All records failed to be processed**. We will raise `BatchProcessingError` exception with a list of all exceptions raised when processing. This exception can be bypassed if you set `throwOnFullBatchFailure` option to `false`. +* **All records failed to be processed**. We will throw a `FullBatchFailureError` error with a list of all the errors thrown while processing unless `throwOnFullBatchFailure` is disabled. The following sequence diagrams explain how each Batch processor behaves under different scenarios. @@ -450,6 +450,18 @@ We can automatically inject the [Lambda context](https://docs.aws.amazon.com/lam --8<-- "examples/snippets/batch/accessLambdaContext.ts" ``` +### Working with full batch failures + +By default, the `BatchProcessor` will throw a `FullBatchFailureError` if all records in the batch fail to process, we do this to reflect the failure in your operational metrics. + +In some cases, for example such as when working with small batches or when using errors as flow control mechanism, this behavior might not be desired and end up negatively impacting the concurrency of your function. + +For these scenarios, you can set the `throwOnFullBatchFailure` option to `false` when calling. + +```typescript hl_lines="17" +--8<-- "examples/snippets/batch/noThrowOnFullBatchFailure.ts" +``` + ### Extending BatchProcessor You might want to bring custom logic to the existing `BatchProcessor` to slightly override how we handle successes and failures. diff --git a/examples/snippets/batch/noThrowOnFullBatchFailure.ts b/examples/snippets/batch/noThrowOnFullBatchFailure.ts new file mode 100644 index 0000000000..c5462f9ddc --- /dev/null +++ b/examples/snippets/batch/noThrowOnFullBatchFailure.ts @@ -0,0 +1,18 @@ +import { + BatchProcessor, + EventType, + processPartialResponse, +} from '@aws-lambda-powertools/batch'; +import type { SQSHandler, SQSRecord } from 'aws-lambda'; + +const processor = new BatchProcessor(EventType.SQS); + +const recordHandler = async (_record: SQSRecord): Promise => { + // Process the record +}; + +export const handler: SQSHandler = async (event, context) => + processPartialResponse(event, recordHandler, processor, { + context, + throwOnFullBatchFailure: false, + }); diff --git a/packages/batch/src/BasePartialBatchProcessor.ts b/packages/batch/src/BasePartialBatchProcessor.ts index 09f11b3d94..d89f584c37 100644 --- a/packages/batch/src/BasePartialBatchProcessor.ts +++ b/packages/batch/src/BasePartialBatchProcessor.ts @@ -63,9 +63,8 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor { /** * Clean up logic to be run after processing a batch * - * If the entire batch failed, and `throwOnFullBatchFailure` option is not explicitly - * set to `false`, this method will throw a `FullBatchFailureError` with the list of - * errors that occurred during processing. + * If the entire batch failed this method will throw a {@link FullBatchFailureError | `FullBatchFailureError`} with the list of + * errors that occurred during processing, unless the `throwOnFullBatchFailure` option is set to `false`. * * Otherwise, it will build the partial failure response based on the event type. */ From 37bd6667a7a122dfdd3d92d294d322fd5f5e7c06 Mon Sep 17 00:00:00 2001 From: Andrea Amorosi Date: Mon, 8 Jul 2024 16:57:16 +0200 Subject: [PATCH 10/10] docs(batch): rephrase paragraph --- docs/utilities/batch.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 5ecc603728..e20563b20d 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -454,7 +454,7 @@ We can automatically inject the [Lambda context](https://docs.aws.amazon.com/lam By default, the `BatchProcessor` will throw a `FullBatchFailureError` if all records in the batch fail to process, we do this to reflect the failure in your operational metrics. -In some cases, for example such as when working with small batches or when using errors as flow control mechanism, this behavior might not be desired and end up negatively impacting the concurrency of your function. +When working with functions that handle batches with a small number of records, or when you use errors as a flow control mechanism, this behavior might not be desirable as your function might generate an unnaturally high number of errors. When this happens, the [Lambda service will scale down the concurrency of your function](https://docs.aws.amazon.com/lambda/latest/dg/services-sqs-errorhandling.html#services-sqs-backoff-strategy){target="_blank"}, potentially impacting performance. For these scenarios, you can set the `throwOnFullBatchFailure` option to `false` when calling.