Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(batch): add option to not throw FullBatchFailureError when the entire batch fails #2711

Merged
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion docs/utilities/batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ All records in the batch will be passed to this handler for processing, even if

* **All records successfully processed**. We will return an empty list of item failures `{'batchItemFailures': []}`
* **Partial success with some exceptions**. We will return a list of all item IDs/sequence numbers that failed processing
* **All records failed to be processed**. We will raise `BatchProcessingError` exception with a list of all exceptions raised when processing
* **All records failed to be processed**. We will throw a `FullBatchFailureError` error with a list of all the errors thrown while processing unless `throwOnFullBatchFailure` is disabled.

The following sequence diagrams explain how each Batch processor behaves under different scenarios.

Expand Down Expand Up @@ -450,6 +450,18 @@ We can automatically inject the [Lambda context](https://docs.aws.amazon.com/lam
--8<-- "examples/snippets/batch/accessLambdaContext.ts"
```

### Working with full batch failures

By default, the `BatchProcessor` will throw a `FullBatchFailureError` if all records in the batch fail to process, we do this to reflect the failure in your operational metrics.

In some cases, for example such as when working with small batches or when using errors as flow control mechanism, this behavior might not be desired and end up negatively impacting the concurrency of your function.
dreamorosi marked this conversation as resolved.
Show resolved Hide resolved

For these scenarios, you can set the `throwOnFullBatchFailure` option to `false` when calling.

```typescript hl_lines="17"
--8<-- "examples/snippets/batch/noThrowOnFullBatchFailure.ts"
```

### Extending BatchProcessor

You might want to bring custom logic to the existing `BatchProcessor` to slightly override how we handle successes and failures.
Expand Down
18 changes: 18 additions & 0 deletions examples/snippets/batch/noThrowOnFullBatchFailure.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import {
BatchProcessor,
EventType,
processPartialResponse,
} from '@aws-lambda-powertools/batch';
import type { SQSHandler, SQSRecord } from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS);

const recordHandler = async (_record: SQSRecord): Promise<void> => {
// Process the record
};

export const handler: SQSHandler = async (event, context) =>
processPartialResponse(event, recordHandler, processor, {
context,
throwOnFullBatchFailure: false,
});
10 changes: 6 additions & 4 deletions packages/batch/src/BasePartialBatchProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,8 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
/**
* Clean up logic to be run after processing a batch
*
* If the entire batch failed, and the utility is not configured otherwise,
* this method will throw a `FullBatchFailureError` with the list of errors
* that occurred during processing.
* If the entire batch failed this method will throw a {@link FullBatchFailureError | `FullBatchFailureError`} with the list of
* errors that occurred during processing, unless the `throwOnFullBatchFailure` option is set to `false`.
*
* Otherwise, it will build the partial failure response based on the event type.
*/
Expand All @@ -74,7 +73,10 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
return;
}

if (this.entireBatchFailed()) {
if (
this.options?.throwOnFullBatchFailure !== false &&
this.entireBatchFailed()
) {
throw new FullBatchFailureError(this.errors);
}

Expand Down
25 changes: 25 additions & 0 deletions packages/batch/src/processPartialResponse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,31 @@ import type {
* });
* ```
*
* By default, if the entire batch fails, the function will throw an error.
* If you want to prevent this behavior, you can set the `throwOnFullBatchFailure` to `false`
*
* @example
* ```typescript
* import {
* BatchProcessor,
* EventType,
* processPartialResponse,
* } from '@aws-lambda-powertools/batch';
* import type { KinesisStreamHandler, KinesisStreamRecord } from 'aws-lambda';
*
* const processor = new BatchProcessor(EventType.KinesisDataStreams);
*
* const recordHandler = async (record: KinesisStreamRecord): Promise<void> => {
* const payload = JSON.parse(record.kinesis.data);
* };
*
* export const handler: KinesisStreamHandler = async (event, context) =>
* processPartialResponse(event, recordHandler, processor, {
* context,
* throwOnFullBatchFailure: false
* });
* ```
*
* @param event The event object containing the batch of records
* @param recordHandler Async function to process each record from the batch
* @param processor Batch processor instance to handle the batch processing
Expand Down
24 changes: 24 additions & 0 deletions packages/batch/src/processPartialResponseSync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,30 @@ import type {
* });
* ```
*
* By default, if the entire batch fails, the function will throw an error.
* If you want to prevent this behavior, you can set the `throwOnFullBatchFailure` to `false`
*
* @example
* ```typescript
* import {
* SqsFifoPartialProcessor,
* processPartialResponseSync,
* } from '@aws-lambda-powertools/batch';
* import type { SQSRecord, SQSHandler } from 'aws-lambda';
*
* const processor = new SqsFifoPartialProcessor();
*
* const recordHandler = async (record: SQSRecord): Promise<void> => {
* const payload = JSON.parse(record.body);
* };
*
* export const handler: SQSHandler = async (event, context) =>
* processPartialResponseSync(event, recordHandler, processor, {
* context,
* throwOnFullBatchFailure: false
* });
* ```
*
* @param event The event object containing the batch of records
* @param recordHandler Sync function to process each record from the batch
* @param processor Batch processor instance to handle the batch processing
Expand Down
5 changes: 5 additions & 0 deletions packages/batch/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js';
* @template T The type of the batch processor, defaults to BasePartialBatchProcessor
* @property context The context object provided by the AWS Lambda runtime
* @property skipGroupOnError The option to group on error during processing
* @property throwOnFullBatchFailure The option to throw an error if the entire batch fails
*/
type BatchProcessingOptions<T = BasePartialBatchProcessor> = {
/**
Expand All @@ -25,6 +26,10 @@ type BatchProcessingOptions<T = BasePartialBatchProcessor> = {
* If true skip the group on error during processing.
*/
skipGroupOnError?: T extends SqsFifoPartialProcessor ? boolean : never;
/**
* Set this to false to prevent throwing an error if the entire batch fails.
*/
throwOnFullBatchFailure?: boolean;
};

/**
Expand Down
123 changes: 123 additions & 0 deletions packages/batch/tests/unit/processPartialResponse.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
processPartialResponse,
EventType,
UnexpectedBatchTypeError,
FullBatchFailureError,
} from '../../src/index.js';
import type {
BatchProcessingOptions,
Expand Down Expand Up @@ -90,6 +91,59 @@ describe('Function: processPartialResponse()', () => {
// Assess
expect(ret).toStrictEqual({ batchItemFailures: [] });
});

test('Process partial response function call with asynchronous handler for full batch failure', async () => {
// Prepare
const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')];
const batch = { Records: records };
const processor = new BatchProcessor(EventType.SQS);

// Act & Assess
await expect(
processPartialResponse(batch, asyncSqsRecordHandler, processor)
).rejects.toThrow(FullBatchFailureError);
});

test('Process partial response function call with asynchronous handler for full batch failure when `throwOnFullBatchFailure` is `true`', async () => {
// Prepare
const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')];
const batch = { Records: records };
const processor = new BatchProcessor(EventType.SQS);

// Act & Assess
await expect(
processPartialResponse(batch, asyncSqsRecordHandler, processor, {
...options,
throwOnFullBatchFailure: true,
})
).rejects.toThrow(FullBatchFailureError);
});

test('Process partial response function call with asynchronous handler for full batch failure when `throwOnFullBatchFailure` is `false`', async () => {
// Prepare
const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')];
const batch = { Records: records };
const processor = new BatchProcessor(EventType.SQS);

// Act
const response = await processPartialResponse(
batch,
asyncSqsRecordHandler,
processor,
{
...options,
throwOnFullBatchFailure: false,
}
);

// Assess
expect(response).toStrictEqual({
batchItemFailures: [
{ itemIdentifier: records[0].messageId },
{ itemIdentifier: records[1].messageId },
],
});
});
});

describe('Process partial response function call through handler', () => {
Expand Down Expand Up @@ -228,5 +282,74 @@ describe('Function: processPartialResponse()', () => {
// Assess
expect(result).toStrictEqual({ batchItemFailures: [] });
});

test('Process partial response through handler for full batch failure', async () => {
// Prepare
const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')];
const processor = new BatchProcessor(EventType.SQS);
const event: SQSEvent = { Records: records };

const handler = async (
event: SQSEvent,
_context: Context
): Promise<PartialItemFailureResponse> => {
return processPartialResponse(event, asyncSqsRecordHandler, processor);
};

// Act & Assess
await expect(handler(event, context)).rejects.toThrow(
FullBatchFailureError
);
});

test('Process partial response through handler for full batch failure when `throwOnFullBatchFailure` is `true`', async () => {
// Prepare
const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')];
const processor = new BatchProcessor(EventType.SQS);
const event: SQSEvent = { Records: records };

const handler = async (
event: SQSEvent,
_context: Context
): Promise<PartialItemFailureResponse> => {
return processPartialResponse(event, asyncSqsRecordHandler, processor, {
...options,
throwOnFullBatchFailure: true,
});
};

// Act & Assess
await expect(handler(event, context)).rejects.toThrow(
FullBatchFailureError
);
});

test('Process partial response through handler for full batch failure when `throwOnFullBatchFailure` is `false`', async () => {
// Prepare
const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')];
const processor = new BatchProcessor(EventType.SQS);
const event: SQSEvent = { Records: records };

const handler = async (
event: SQSEvent,
_context: Context
): Promise<PartialItemFailureResponse> => {
return processPartialResponse(event, asyncSqsRecordHandler, processor, {
...options,
throwOnFullBatchFailure: false,
});
};

// Act
const response = await handler(event, context);

// Assess
expect(response).toStrictEqual({
batchItemFailures: [
{ itemIdentifier: records[0].messageId },
{ itemIdentifier: records[1].messageId },
],
});
});
});
});
Loading