diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 9f8d8852c5..e20563b20d 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -261,7 +261,7 @@ All records in the batch will be passed to this handler for processing, even if * **All records successfully processed**. We will return an empty list of item failures `{'batchItemFailures': []}` * **Partial success with some exceptions**. We will return a list of all item IDs/sequence numbers that failed processing -* **All records failed to be processed**. We will raise `BatchProcessingError` exception with a list of all exceptions raised when processing +* **All records failed to be processed**. We will throw a `FullBatchFailureError` error with a list of all the errors thrown while processing unless `throwOnFullBatchFailure` is disabled. The following sequence diagrams explain how each Batch processor behaves under different scenarios. @@ -450,6 +450,18 @@ We can automatically inject the [Lambda context](https://docs.aws.amazon.com/lam --8<-- "examples/snippets/batch/accessLambdaContext.ts" ``` +### Working with full batch failures + +By default, the `BatchProcessor` will throw a `FullBatchFailureError` if all records in the batch fail to process, we do this to reflect the failure in your operational metrics. + +When working with functions that handle batches with a small number of records, or when you use errors as a flow control mechanism, this behavior might not be desirable as your function might generate an unnaturally high number of errors. When this happens, the [Lambda service will scale down the concurrency of your function](https://docs.aws.amazon.com/lambda/latest/dg/services-sqs-errorhandling.html#services-sqs-backoff-strategy){target="_blank"}, potentially impacting performance. + +For these scenarios, you can set the `throwOnFullBatchFailure` option to `false` when calling. + +```typescript hl_lines="17" +--8<-- "examples/snippets/batch/noThrowOnFullBatchFailure.ts" +``` + ### Extending BatchProcessor You might want to bring custom logic to the existing `BatchProcessor` to slightly override how we handle successes and failures. diff --git a/examples/snippets/batch/noThrowOnFullBatchFailure.ts b/examples/snippets/batch/noThrowOnFullBatchFailure.ts new file mode 100644 index 0000000000..c5462f9ddc --- /dev/null +++ b/examples/snippets/batch/noThrowOnFullBatchFailure.ts @@ -0,0 +1,18 @@ +import { + BatchProcessor, + EventType, + processPartialResponse, +} from '@aws-lambda-powertools/batch'; +import type { SQSHandler, SQSRecord } from 'aws-lambda'; + +const processor = new BatchProcessor(EventType.SQS); + +const recordHandler = async (_record: SQSRecord): Promise => { + // Process the record +}; + +export const handler: SQSHandler = async (event, context) => + processPartialResponse(event, recordHandler, processor, { + context, + throwOnFullBatchFailure: false, + }); diff --git a/packages/batch/src/BasePartialBatchProcessor.ts b/packages/batch/src/BasePartialBatchProcessor.ts index ca913e49e9..d89f584c37 100644 --- a/packages/batch/src/BasePartialBatchProcessor.ts +++ b/packages/batch/src/BasePartialBatchProcessor.ts @@ -63,9 +63,8 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor { /** * Clean up logic to be run after processing a batch * - * If the entire batch failed, and the utility is not configured otherwise, - * this method will throw a `FullBatchFailureError` with the list of errors - * that occurred during processing. + * If the entire batch failed this method will throw a {@link FullBatchFailureError | `FullBatchFailureError`} with the list of + * errors that occurred during processing, unless the `throwOnFullBatchFailure` option is set to `false`. * * Otherwise, it will build the partial failure response based on the event type. */ @@ -74,7 +73,10 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor { return; } - if (this.entireBatchFailed()) { + if ( + this.options?.throwOnFullBatchFailure !== false && + this.entireBatchFailed() + ) { throw new FullBatchFailureError(this.errors); } diff --git a/packages/batch/src/processPartialResponse.ts b/packages/batch/src/processPartialResponse.ts index ca74e2801e..4423e1c179 100644 --- a/packages/batch/src/processPartialResponse.ts +++ b/packages/batch/src/processPartialResponse.ts @@ -42,6 +42,31 @@ import type { * }); * ``` * + * By default, if the entire batch fails, the function will throw an error. + * If you want to prevent this behavior, you can set the `throwOnFullBatchFailure` to `false` + * + * @example + * ```typescript + * import { + * BatchProcessor, + * EventType, + * processPartialResponse, + * } from '@aws-lambda-powertools/batch'; + * import type { KinesisStreamHandler, KinesisStreamRecord } from 'aws-lambda'; + * + * const processor = new BatchProcessor(EventType.KinesisDataStreams); + * + * const recordHandler = async (record: KinesisStreamRecord): Promise => { + * const payload = JSON.parse(record.kinesis.data); + * }; + * + * export const handler: KinesisStreamHandler = async (event, context) => + * processPartialResponse(event, recordHandler, processor, { + * context, + * throwOnFullBatchFailure: false + * }); + * ``` + * * @param event The event object containing the batch of records * @param recordHandler Async function to process each record from the batch * @param processor Batch processor instance to handle the batch processing diff --git a/packages/batch/src/processPartialResponseSync.ts b/packages/batch/src/processPartialResponseSync.ts index cdf349dd84..c8eca684e7 100644 --- a/packages/batch/src/processPartialResponseSync.ts +++ b/packages/batch/src/processPartialResponseSync.ts @@ -68,6 +68,30 @@ import type { * }); * ``` * + * By default, if the entire batch fails, the function will throw an error. + * If you want to prevent this behavior, you can set the `throwOnFullBatchFailure` to `false` + * + * @example + * ```typescript + * import { + * SqsFifoPartialProcessor, + * processPartialResponseSync, + * } from '@aws-lambda-powertools/batch'; + * import type { SQSRecord, SQSHandler } from 'aws-lambda'; + * + * const processor = new SqsFifoPartialProcessor(); + * + * const recordHandler = async (record: SQSRecord): Promise => { + * const payload = JSON.parse(record.body); + * }; + * + * export const handler: SQSHandler = async (event, context) => + * processPartialResponseSync(event, recordHandler, processor, { + * context, + * throwOnFullBatchFailure: false + * }); + * ``` + * * @param event The event object containing the batch of records * @param recordHandler Sync function to process each record from the batch * @param processor Batch processor instance to handle the batch processing diff --git a/packages/batch/src/types.ts b/packages/batch/src/types.ts index 51f9b78e34..a267f092b6 100644 --- a/packages/batch/src/types.ts +++ b/packages/batch/src/types.ts @@ -13,6 +13,7 @@ import { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js'; * @template T The type of the batch processor, defaults to BasePartialBatchProcessor * @property context The context object provided by the AWS Lambda runtime * @property skipGroupOnError The option to group on error during processing + * @property throwOnFullBatchFailure The option to throw an error if the entire batch fails */ type BatchProcessingOptions = { /** @@ -25,6 +26,10 @@ type BatchProcessingOptions = { * If true skip the group on error during processing. */ skipGroupOnError?: T extends SqsFifoPartialProcessor ? boolean : never; + /** + * Set this to false to prevent throwing an error if the entire batch fails. + */ + throwOnFullBatchFailure?: boolean; }; /** diff --git a/packages/batch/tests/unit/processPartialResponse.test.ts b/packages/batch/tests/unit/processPartialResponse.test.ts index 732d40ee3b..1d379e6398 100644 --- a/packages/batch/tests/unit/processPartialResponse.test.ts +++ b/packages/batch/tests/unit/processPartialResponse.test.ts @@ -15,6 +15,7 @@ import { processPartialResponse, EventType, UnexpectedBatchTypeError, + FullBatchFailureError, } from '../../src/index.js'; import type { BatchProcessingOptions, @@ -90,6 +91,59 @@ describe('Function: processPartialResponse()', () => { // Assess expect(ret).toStrictEqual({ batchItemFailures: [] }); }); + + test('Process partial response function call with asynchronous handler for full batch failure', async () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const batch = { Records: records }; + const processor = new BatchProcessor(EventType.SQS); + + // Act & Assess + await expect( + processPartialResponse(batch, asyncSqsRecordHandler, processor) + ).rejects.toThrow(FullBatchFailureError); + }); + + test('Process partial response function call with asynchronous handler for full batch failure when `throwOnFullBatchFailure` is `true`', async () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const batch = { Records: records }; + const processor = new BatchProcessor(EventType.SQS); + + // Act & Assess + await expect( + processPartialResponse(batch, asyncSqsRecordHandler, processor, { + ...options, + throwOnFullBatchFailure: true, + }) + ).rejects.toThrow(FullBatchFailureError); + }); + + test('Process partial response function call with asynchronous handler for full batch failure when `throwOnFullBatchFailure` is `false`', async () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const batch = { Records: records }; + const processor = new BatchProcessor(EventType.SQS); + + // Act + const response = await processPartialResponse( + batch, + asyncSqsRecordHandler, + processor, + { + ...options, + throwOnFullBatchFailure: false, + } + ); + + // Assess + expect(response).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: records[0].messageId }, + { itemIdentifier: records[1].messageId }, + ], + }); + }); }); describe('Process partial response function call through handler', () => { @@ -228,5 +282,74 @@ describe('Function: processPartialResponse()', () => { // Assess expect(result).toStrictEqual({ batchItemFailures: [] }); }); + + test('Process partial response through handler for full batch failure', async () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const processor = new BatchProcessor(EventType.SQS); + const event: SQSEvent = { Records: records }; + + const handler = async ( + event: SQSEvent, + _context: Context + ): Promise => { + return processPartialResponse(event, asyncSqsRecordHandler, processor); + }; + + // Act & Assess + await expect(handler(event, context)).rejects.toThrow( + FullBatchFailureError + ); + }); + + test('Process partial response through handler for full batch failure when `throwOnFullBatchFailure` is `true`', async () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const processor = new BatchProcessor(EventType.SQS); + const event: SQSEvent = { Records: records }; + + const handler = async ( + event: SQSEvent, + _context: Context + ): Promise => { + return processPartialResponse(event, asyncSqsRecordHandler, processor, { + ...options, + throwOnFullBatchFailure: true, + }); + }; + + // Act & Assess + await expect(handler(event, context)).rejects.toThrow( + FullBatchFailureError + ); + }); + + test('Process partial response through handler for full batch failure when `throwOnFullBatchFailure` is `false`', async () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const processor = new BatchProcessor(EventType.SQS); + const event: SQSEvent = { Records: records }; + + const handler = async ( + event: SQSEvent, + _context: Context + ): Promise => { + return processPartialResponse(event, asyncSqsRecordHandler, processor, { + ...options, + throwOnFullBatchFailure: false, + }); + }; + + // Act + const response = await handler(event, context); + + // Assess + expect(response).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: records[0].messageId }, + { itemIdentifier: records[1].messageId }, + ], + }); + }); }); }); diff --git a/packages/batch/tests/unit/processPartialResponseSync.test.ts b/packages/batch/tests/unit/processPartialResponseSync.test.ts index 4fe7da8609..8d4bb1e263 100644 --- a/packages/batch/tests/unit/processPartialResponseSync.test.ts +++ b/packages/batch/tests/unit/processPartialResponseSync.test.ts @@ -15,6 +15,7 @@ import { processPartialResponseSync, EventType, UnexpectedBatchTypeError, + FullBatchFailureError, } from '../../src/index.js'; import type { BatchProcessingOptions, @@ -90,6 +91,59 @@ describe('Function: processPartialResponse()', () => { // Assess expect(ret).toStrictEqual({ batchItemFailures: [] }); }); + + test('Process partial response function call with synchronous handler for full batch failure', () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const batch = { Records: records }; + const processor = new BatchProcessorSync(EventType.SQS); + + // Act & Assess + expect(() => + processPartialResponseSync(batch, sqsRecordHandler, processor) + ).toThrow(FullBatchFailureError); + }); + + test('Process partial response function call with synchronous handler for full batch failure when `throwOnFullBatchFailure` is `true`', () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const batch = { Records: records }; + const processor = new BatchProcessorSync(EventType.SQS); + + // Act & Assess + expect(() => + processPartialResponseSync(batch, sqsRecordHandler, processor, { + ...options, + throwOnFullBatchFailure: true, + }) + ).toThrow(FullBatchFailureError); + }); + + test('Process partial response function call with synchronous handler for full batch failure when `throwOnFullBatchFailure` is `false`', () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const batch = { Records: records }; + const processor = new BatchProcessorSync(EventType.SQS); + + // Act + const response = processPartialResponseSync( + batch, + sqsRecordHandler, + processor, + { + ...options, + throwOnFullBatchFailure: false, + } + ); + + // Assess + expect(response).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: records[0].messageId }, + { itemIdentifier: records[1].messageId }, + ], + }); + }); }); describe('Process partial response function call through handler', () => { @@ -224,5 +278,70 @@ describe('Function: processPartialResponse()', () => { // Assess expect(result).toStrictEqual({ batchItemFailures: [] }); }); + + test('Process partial response through handler for full batch failure', () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const processor = new BatchProcessorSync(EventType.SQS); + const event: SQSEvent = { Records: records }; + + const handler = ( + event: SQSEvent, + _context: Context + ): PartialItemFailureResponse => { + return processPartialResponseSync(event, sqsRecordHandler, processor); + }; + + // Act & Assess + expect(() => handler(event, context)).toThrow(FullBatchFailureError); + }); + + test('Process partial response through handler for full batch failure when `throwOnFullBatchFailure` is `true`', () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const processor = new BatchProcessorSync(EventType.SQS); + const event: SQSEvent = { Records: records }; + + const handler = ( + event: SQSEvent, + _context: Context + ): PartialItemFailureResponse => { + return processPartialResponseSync(event, sqsRecordHandler, processor, { + ...options, + throwOnFullBatchFailure: true, + }); + }; + + // Act & Assess + expect(() => handler(event, context)).toThrow(FullBatchFailureError); + }); + + test('Process partial response through handler for full batch failure when `throwOnFullBatchFailure` is `false`', () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const processor = new BatchProcessorSync(EventType.SQS); + const event: SQSEvent = { Records: records }; + + const handler = ( + event: SQSEvent, + _context: Context + ): PartialItemFailureResponse => { + return processPartialResponseSync(event, sqsRecordHandler, processor, { + ...options, + throwOnFullBatchFailure: false, + }); + }; + + // Act + const response = handler(event, context); + + // Assess + expect(response).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: records[0].messageId }, + { itemIdentifier: records[1].messageId }, + ], + }); + }); }); });