From 4e6c0d2fb0abe613ea0b76c55cd9b7378ec6306d Mon Sep 17 00:00:00 2001 From: Andrea Amorosi Date: Wed, 19 Jul 2023 19:10:03 +0200 Subject: [PATCH] feat(batch): add async processor (#1616) * feat(batch): add async processor * tests: improved unit tests * chore: removed docstring + edited test handler --- packages/batch/package.json | 7 +- packages/batch/src/AsyncBatchProcessor.ts | 31 ++ .../batch/src/BasePartialBatchProcessor.ts | 34 +- packages/batch/src/BasePartialProcessor.ts | 58 +++- packages/batch/src/BatchProcessor.ts | 27 +- packages/batch/src/SqsFifoPartialProcessor.ts | 9 +- .../batch/src/asyncProcessPartialResponse.ts | 38 +++ packages/batch/src/constants.ts | 8 +- packages/batch/src/errors.ts | 4 - packages/batch/src/index.ts | 2 + packages/batch/src/processPartialResponse.ts | 14 +- packages/batch/src/types.ts | 7 +- packages/batch/tests/helpers/factories.ts | 11 +- packages/batch/tests/helpers/handlers.ts | 26 +- .../tests/unit/AsyncBatchProcessor.test.ts | 296 ++++++++++++++++++ .../batch/tests/unit/BatchProcessor.test.ts | 291 +++-------------- .../unit/SqsFifoPartialProcessor.test.ts | 69 +--- .../unit/asyncProcessPartialResponse.test.ts | 231 ++++++++++++++ .../tests/unit/processPartialResponse.test.ts | 117 +++---- 19 files changed, 819 insertions(+), 461 deletions(-) create mode 100644 packages/batch/src/AsyncBatchProcessor.ts create mode 100644 packages/batch/src/asyncProcessPartialResponse.ts create mode 100644 packages/batch/tests/unit/AsyncBatchProcessor.test.ts create mode 100644 packages/batch/tests/unit/asyncProcessPartialResponse.test.ts diff --git a/packages/batch/package.json b/packages/batch/package.json index 44e9b37964..4ed9288751 100644 --- a/packages/batch/package.json +++ b/packages/batch/package.json @@ -1,6 +1,6 @@ { "name": "@aws-lambda-powertools/batch", - "version": "1.10.0", + "version": "1.11.1", "description": "The batch processing package for the Powertools for AWS Lambda (TypeScript) library.", "author": { "name": "Amazon Web Services", @@ -22,8 +22,7 @@ "prepack": "node ../../.github/scripts/release_patch_package_json.js ." 
}, "lint-staged": { - "*.ts": "npm run lint-fix", - "*.js": "npm run lint-fix" + "*.{js,ts}": "npm run lint-fix" }, "homepage": "https://github.com/aws-powertools/powertools-lambda-typescript/tree/main/packages/batch#readme", "license": "MIT-0", @@ -50,4 +49,4 @@ "nodejs" ], "devDependencies": {} -} +} \ No newline at end of file diff --git a/packages/batch/src/AsyncBatchProcessor.ts b/packages/batch/src/AsyncBatchProcessor.ts new file mode 100644 index 0000000000..781c7f1c79 --- /dev/null +++ b/packages/batch/src/AsyncBatchProcessor.ts @@ -0,0 +1,31 @@ +import { BasePartialBatchProcessor } from './BasePartialBatchProcessor'; +import type { BaseRecord, FailureResponse, SuccessResponse } from './types'; + +/** + * Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB + */ +class AsyncBatchProcessor extends BasePartialBatchProcessor { + public async asyncProcessRecord( + record: BaseRecord + ): Promise { + try { + const data = this.toBatchType(record, this.eventType); + const result = await this.handler(data, this.options); + + return this.successHandler(record, result); + } catch (error) { + return this.failureHandler(record, error as Error); + } + } + + /** + * Process a record with instance's handler + * @param record Batch record to be processed + * @returns response of success or failure + */ + public processRecord(_record: BaseRecord): SuccessResponse | FailureResponse { + throw new Error('Not implemented. Use asyncProcess() instead.'); + } +} + +export { AsyncBatchProcessor }; diff --git a/packages/batch/src/BasePartialBatchProcessor.ts b/packages/batch/src/BasePartialBatchProcessor.ts index 806331d6fc..d4cfd7e9ce 100644 --- a/packages/batch/src/BasePartialBatchProcessor.ts +++ b/packages/batch/src/BasePartialBatchProcessor.ts @@ -1,18 +1,20 @@ -/** - * Process batch and partially report failed items - */ -import { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda'; -import { - BasePartialProcessor, - BatchProcessingError, - DATA_CLASS_MAPPING, - DEFAULT_RESPONSE, +import type { + DynamoDBRecord, + KinesisStreamRecord, + SQSRecord, +} from 'aws-lambda'; +import { BasePartialProcessor } from './BasePartialProcessor'; +import { DATA_CLASS_MAPPING, DEFAULT_RESPONSE, EventType } from './constants'; +import { BatchProcessingError } from './errors'; +import type { EventSourceDataClassTypes, - EventType, - PartialItemFailures, PartialItemFailureResponse, -} from '.'; + PartialItemFailures, +} from './types'; +/** + * Process batch and partially report failed items + */ abstract class BasePartialBatchProcessor extends BasePartialProcessor { public COLLECTOR_MAPPING; @@ -124,13 +126,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor { * @returns true if any records resulted in exception */ public hasMessagesToReport(): boolean { - if (this.failureMessages.length != 0) { - return true; - } - - // console.debug('All ' + this.successMessages.length + ' records successfully processed'); - - return false; + return this.failureMessages.length != 0; } /** diff --git a/packages/batch/src/BasePartialProcessor.ts b/packages/batch/src/BasePartialProcessor.ts index e1200a75f7..ecd62c29b0 100644 --- a/packages/batch/src/BasePartialProcessor.ts +++ b/packages/batch/src/BasePartialProcessor.ts @@ -1,15 +1,15 @@ -/** - * Abstract class for batch processors - */ -import { +import type { BaseRecord, BatchProcessingOptions, EventSourceDataClassTypes, FailureResponse, ResultType, SuccessResponse, -} from '.'; +} from './types'; +/** + * Abstract 
class for batch processors. + */ abstract class BasePartialProcessor { public exceptions: Error[]; @@ -34,6 +34,40 @@ abstract class BasePartialProcessor { this.handler = new Function(); } + /** + * Call instance's handler for each record + * @returns List of processed records + */ + public async asyncProcess(): Promise<(SuccessResponse | FailureResponse)[]> { + /** + * If this is a sync processor, the user should have called process() instead, + * so we call the method early to throw the error and fail fast. + */ + if (this.constructor.name === 'BatchProcessor') { + await this.asyncProcessRecord(this.records[0]); + } + this.prepare(); + + const processingPromises: Promise<SuccessResponse | FailureResponse>[] = + this.records.map((record) => this.asyncProcessRecord(record)); + + const processedRecords: (SuccessResponse | FailureResponse)[] = + await Promise.all(processingPromises); + + this.clean(); + + return processedRecords; + } + + /** + * Process a record with an asynchronous handler + * + * @param record Record to be processed + */ + public abstract asyncProcessRecord( + record: BaseRecord + ): Promise<SuccessResponse | FailureResponse>; + /** * Clean class instance after processing */ @@ -50,7 +84,6 @@ abstract class BasePartialProcessor { exception: Error ): FailureResponse { const entry: FailureResponse = ['fail', exception.message, record]; - // console.debug('Record processing exception: ' + exception.message); this.exceptions.push(exception); this.failureMessages.push(record); @@ -66,12 +99,19 @@ abstract class BasePartialProcessor { * Call instance's handler for each record + * @returns List of processed records */ - public async process(): Promise<(SuccessResponse | FailureResponse)[]> { + public process(): (SuccessResponse | FailureResponse)[] { + /** + * If this is an async processor, the user should have called asyncProcess() instead, + * so we call the method early to throw the error and fail fast. + */ + if (this.constructor.name === 'AsyncBatchProcessor') { + this.processRecord(this.records[0]); + } this.prepare(); const processedRecords: (SuccessResponse | FailureResponse)[] = []; for (const record of this.records) { - processedRecords.push(await this.processRecord(record)); + processedRecords.push(this.processRecord(record)); } this.clean(); @@ -85,7 +125,7 @@ */ public abstract processRecord( record: BaseRecord - ): Promise<SuccessResponse | FailureResponse>; + ): SuccessResponse | FailureResponse; /** * Set class instance attributes before execution diff --git a/packages/batch/src/BatchProcessor.ts b/packages/batch/src/BatchProcessor.ts index 40d8a2fcd8..3d2a75a8da 100644 --- a/packages/batch/src/BatchProcessor.ts +++ b/packages/batch/src/BatchProcessor.ts @@ -1,30 +1,29 @@ +import { BasePartialBatchProcessor } from './BasePartialBatchProcessor'; +import type { BaseRecord, FailureResponse, SuccessResponse } from './types'; + /** * Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB */ -import { - BasePartialBatchProcessor, - BaseRecord, - FailureResponse, - SuccessResponse, -} from '.'; - class BatchProcessor extends BasePartialBatchProcessor { + public async asyncProcessRecord( + _record: BaseRecord + ): Promise<SuccessResponse | FailureResponse> { + throw new Error('Not implemented. 
Use process() instead.'); + } + /** * Process a record with instance's handler * @param record Batch record to be processed * @returns response of success or failure */ - public async processRecord( - record: BaseRecord - ): Promise<SuccessResponse | FailureResponse> { + public processRecord(record: BaseRecord): SuccessResponse | FailureResponse { try { const data = this.toBatchType(record, this.eventType); - - const result = await this.handler(data, this.options); + const result = this.handler(data, this.options); return this.successHandler(record, result); - } catch (e) { - return this.failureHandler(record, e as Error); + } catch (error) { + return this.failureHandler(record, error as Error); } } } diff --git a/packages/batch/src/SqsFifoPartialProcessor.ts b/packages/batch/src/SqsFifoPartialProcessor.ts index 703476c6c5..0c10993273 100644 --- a/packages/batch/src/SqsFifoPartialProcessor.ts +++ b/packages/batch/src/SqsFifoPartialProcessor.ts @@ -1,4 +1,6 @@ -import { BatchProcessor, EventType, FailureResponse, SuccessResponse } from '.'; +import { BatchProcessor } from './BatchProcessor'; +import { EventType } from './constants'; +import type { FailureResponse, SuccessResponse } from './types'; /** * Process native partial responses from SQS FIFO queues @@ -14,9 +16,8 @@ class SqsFifoPartialProcessor extends BatchProcessor { * Call instance's handler for each record. * When the first failed message is detected, the process is short-circuited * And the remaining messages are reported as failed items - * TODO: change to synchronous execution if possible */ - public async process(): Promise<(SuccessResponse | FailureResponse)[]> { + public process(): (SuccessResponse | FailureResponse)[] { this.prepare(); const processedRecords: (SuccessResponse | FailureResponse)[] = []; @@ -28,7 +29,7 @@ class SqsFifoPartialProcessor extends BatchProcessor { return this.shortCircuitProcessing(currentIndex, processedRecords); } - processedRecords.push(await this.processRecord(record)); + processedRecords.push(this.processRecord(record)); currentIndex++; } diff --git a/packages/batch/src/asyncProcessPartialResponse.ts b/packages/batch/src/asyncProcessPartialResponse.ts new file mode 100644 index 0000000000..eee584ed1f --- /dev/null +++ b/packages/batch/src/asyncProcessPartialResponse.ts @@ -0,0 +1,38 @@ +import { BasePartialBatchProcessor } from './BasePartialBatchProcessor'; +import { EventType } from './constants'; +import type { + BaseRecord, + BatchProcessingOptions, + PartialItemFailureResponse, +} from './types'; + +/** + * Higher level function to handle batch event processing + * @param event Lambda's original event + * @param recordHandler Callable function to process each record from the batch + * @param processor Batch processor to handle partial failure cases + * @returns Lambda Partial Batch Response + */ +const asyncProcessPartialResponse = async ( + event: { Records: BaseRecord[] }, + recordHandler: CallableFunction, + processor: BasePartialBatchProcessor, + options?: BatchProcessingOptions +): Promise<PartialItemFailureResponse> => { + if (!event.Records) { + const eventTypes: string = Object.values(EventType).toString(); + throw new Error( + 'Failed to convert event to record batch for processing.\nPlease ensure batch event is a valid ' + + eventTypes + + ' event.' 
+ ); + } + + processor.register(event.Records, recordHandler, options); + + await processor.asyncProcess(); + + return processor.response(); +}; + +export { asyncProcessPartialResponse }; diff --git a/packages/batch/src/constants.ts b/packages/batch/src/constants.ts index b707a79ca5..02437e356c 100644 --- a/packages/batch/src/constants.ts +++ b/packages/batch/src/constants.ts @@ -1,8 +1,8 @@ -/** - * Constants for batch processor classes - */ import { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda'; -import type { PartialItemFailureResponse, EventSourceDataClassTypes } from '.'; +import type { + PartialItemFailureResponse, + EventSourceDataClassTypes, +} from './types'; const EventType = { SQS: 'SQS', diff --git a/packages/batch/src/errors.ts b/packages/batch/src/errors.ts index f319166602..ed5bd4fc9e 100644 --- a/packages/batch/src/errors.ts +++ b/packages/batch/src/errors.ts @@ -1,7 +1,3 @@ -/** - * Batch processing exceptions - */ - /** * Base error type for batch processing * All errors thrown by major failures extend this base class diff --git a/packages/batch/src/index.ts b/packages/batch/src/index.ts index 82abac172a..96f931823d 100644 --- a/packages/batch/src/index.ts +++ b/packages/batch/src/index.ts @@ -4,5 +4,7 @@ export * from './types'; export * from './BasePartialProcessor'; export * from './BasePartialBatchProcessor'; export * from './BatchProcessor'; +export * from './AsyncBatchProcessor'; export * from './processPartialResponse'; +export * from './asyncProcessPartialResponse'; export * from './SqsFifoPartialProcessor'; diff --git a/packages/batch/src/processPartialResponse.ts b/packages/batch/src/processPartialResponse.ts index 15abaa10af..d09e7be6b9 100644 --- a/packages/batch/src/processPartialResponse.ts +++ b/packages/batch/src/processPartialResponse.ts @@ -1,10 +1,10 @@ -import { - BasePartialBatchProcessor, +import { BasePartialBatchProcessor } from './BasePartialBatchProcessor'; +import { EventType } from './constants'; +import type { BaseRecord, BatchProcessingOptions, - EventType, PartialItemFailureResponse, -} from '.'; +} from './types'; /** * Higher level function to handle batch event processing @@ -13,12 +13,12 @@ import { * @param processor Batch processor to handle partial failure cases * @returns Lambda Partial Batch Response */ -const processPartialResponse = async ( +const processPartialResponse = ( event: { Records: BaseRecord[] }, recordHandler: CallableFunction, processor: BasePartialBatchProcessor, options?: BatchProcessingOptions -): Promise => { +): PartialItemFailureResponse => { if (!event.Records) { const eventTypes: string = Object.values(EventType).toString(); throw new Error( @@ -30,7 +30,7 @@ const processPartialResponse = async ( processor.register(event.Records, recordHandler, options); - await processor.process(); + processor.process(); return processor.response(); }; diff --git a/packages/batch/src/types.ts b/packages/batch/src/types.ts index add07642fa..17ce3633c7 100644 --- a/packages/batch/src/types.ts +++ b/packages/batch/src/types.ts @@ -1,6 +1,3 @@ -/** - * Types for batch processing utility - */ import { Context, DynamoDBRecord, @@ -21,9 +18,9 @@ type RecordValue = unknown; type BaseRecord = { [key: string]: RecordValue } | EventSourceDataClassTypes; type ResultType = unknown; -type SuccessResponse = [string, ResultType, EventSourceDataClassTypes]; +type SuccessResponse = ['success', ResultType, EventSourceDataClassTypes]; -type FailureResponse = [string, string, EventSourceDataClassTypes]; +type FailureResponse 
= ['fail', string, EventSourceDataClassTypes]; type PartialItemFailures = { itemIdentifier: string }; type PartialItemFailureResponse = { batchItemFailures: PartialItemFailures[] }; diff --git a/packages/batch/tests/helpers/factories.ts b/packages/batch/tests/helpers/factories.ts index b55e401474..7df6110742 100644 --- a/packages/batch/tests/helpers/factories.ts +++ b/packages/batch/tests/helpers/factories.ts @@ -1,10 +1,13 @@ -import { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda'; -import { randomInt } from 'crypto'; -import { v4 } from 'uuid'; +import type { + DynamoDBRecord, + KinesisStreamRecord, + SQSRecord, +} from 'aws-lambda'; +import { randomInt, randomUUID } from 'node:crypto'; const sqsRecordFactory = (body: string): SQSRecord => { return { - messageId: v4(), + messageId: randomUUID(), receiptHandle: 'AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a', body: body, attributes: { diff --git a/packages/batch/tests/helpers/handlers.ts b/packages/batch/tests/helpers/handlers.ts index e2862730c3..3a6d17b76a 100644 --- a/packages/batch/tests/helpers/handlers.ts +++ b/packages/batch/tests/helpers/handlers.ts @@ -1,5 +1,9 @@ -import { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda'; -import { BatchProcessingOptions } from '../../src'; +import type { + DynamoDBRecord, + KinesisStreamRecord, + SQSRecord, +} from 'aws-lambda'; +import type { BatchProcessingOptions } from '../../src/types'; const sqsRecordHandler = (record: SQSRecord): string => { const body = record.body; @@ -76,6 +80,23 @@ const handlerWithContext = ( return record.body; }; +const asyncHandlerWithContext = async ( + record: SQSRecord, + options: BatchProcessingOptions +): Promise => { + const context = options.context; + + try { + if (context.getRemainingTimeInMillis() == 0) { + throw Error('No time remaining.'); + } + } catch (e) { + throw Error('Context possibly malformed. 
Displaying context:\n' + context); + } + + return Promise.resolve(record.body); +}; + export { sqsRecordHandler, asyncSqsRecordHandler, @@ -84,4 +105,5 @@ export { dynamodbRecordHandler, asyncDynamodbRecordHandler, handlerWithContext, + asyncHandlerWithContext, }; diff --git a/packages/batch/tests/unit/AsyncBatchProcessor.test.ts b/packages/batch/tests/unit/AsyncBatchProcessor.test.ts new file mode 100644 index 0000000000..9079a1c464 --- /dev/null +++ b/packages/batch/tests/unit/AsyncBatchProcessor.test.ts @@ -0,0 +1,296 @@ +/** + * Test AsyncBatchProcessor class + * + * @group unit/batch/class/asyncBatchProcessor + */ +import type { Context } from 'aws-lambda'; +import { helloworldContext as dummyContext } from '../../../commons/src/samples/resources/contexts'; +import { AsyncBatchProcessor } from '../../src/AsyncBatchProcessor'; +import { EventType } from '../../src/constants'; +import { BatchProcessingError } from '../../src/errors'; +import type { BatchProcessingOptions } from '../../src/types'; +import { + dynamodbRecordFactory, + kinesisRecordFactory, + sqsRecordFactory, +} from '../helpers/factories'; +import { + asyncDynamodbRecordHandler, + asyncKinesisRecordHandler, + asyncSqsRecordHandler, + asyncHandlerWithContext, +} from '../helpers/handlers'; + +describe('Class: AsyncBatchProcessor', () => { + const ENVIRONMENT_VARIABLES = process.env; + const options: BatchProcessingOptions = { context: dummyContext }; + + beforeEach(() => { + jest.clearAllMocks(); + jest.resetModules(); + process.env = { ...ENVIRONMENT_VARIABLES }; + }); + + afterAll(() => { + process.env = ENVIRONMENT_VARIABLES; + }); + + describe('Asynchronously processing SQS Records', () => { + test('Batch processing SQS records with no failures', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new AsyncBatchProcessor(EventType.SQS); + + // Act + processor.register(records, asyncSqsRecordHandler); + const processedMessages = await processor.asyncProcess(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', firstRecord.body, firstRecord], + ['success', secondRecord.body, secondRecord], + ]); + }); + + test('Batch processing SQS records with some failures', async () => { + // Prepare + const firstRecord = sqsRecordFactory('failure'); + const secondRecord = sqsRecordFactory('success'); + const thirdRecord = sqsRecordFactory('fail'); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new AsyncBatchProcessor(EventType.SQS); + + // Act + processor.register(records, asyncSqsRecordHandler); + const processedMessages = await processor.asyncProcess(); + + // Assess + expect(processedMessages[1]).toStrictEqual([ + 'success', + secondRecord.body, + secondRecord, + ]); + expect(processor.failureMessages.length).toBe(2); + expect(processor.response()).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: firstRecord.messageId }, + { itemIdentifier: thirdRecord.messageId }, + ], + }); + }); + + test('Batch processing SQS records with all failures', async () => { + // Prepare + const firstRecord = sqsRecordFactory('failure'); + const secondRecord = sqsRecordFactory('failure'); + const thirdRecord = sqsRecordFactory('fail'); + + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new AsyncBatchProcessor(EventType.SQS); + + // Act + processor.register(records, asyncSqsRecordHandler); + + // Assess + await 
expect(processor.asyncProcess()).rejects.toThrowError( + BatchProcessingError + ); + }); + }); + + describe('Asynchronously processing Kinesis Records', () => { + test('Batch processing Kinesis records with no failures', async () => { + // Prepare + const firstRecord = kinesisRecordFactory('success'); + const secondRecord = kinesisRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new AsyncBatchProcessor(EventType.KinesisDataStreams); + + // Act + processor.register(records, asyncKinesisRecordHandler); + const processedMessages = await processor.asyncProcess(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', firstRecord.kinesis.data, firstRecord], + ['success', secondRecord.kinesis.data, secondRecord], + ]); + }); + + test('Batch processing Kinesis records with some failures', async () => { + // Prepare + const firstRecord = kinesisRecordFactory('failure'); + const secondRecord = kinesisRecordFactory('success'); + const thirdRecord = kinesisRecordFactory('fail'); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new AsyncBatchProcessor(EventType.KinesisDataStreams); + + // Act + processor.register(records, asyncKinesisRecordHandler); + const processedMessages = await processor.asyncProcess(); + + // Assess + expect(processedMessages[1]).toStrictEqual([ + 'success', + secondRecord.kinesis.data, + secondRecord, + ]); + expect(processor.failureMessages.length).toBe(2); + expect(processor.response()).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: firstRecord.kinesis.sequenceNumber }, + { itemIdentifier: thirdRecord.kinesis.sequenceNumber }, + ], + }); + }); + + test('Batch processing Kinesis records with all failures', async () => { + // Prepare + const firstRecord = kinesisRecordFactory('failure'); + const secondRecord = kinesisRecordFactory('failure'); + const thirdRecord = kinesisRecordFactory('fail'); + + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new AsyncBatchProcessor(EventType.KinesisDataStreams); + + // Act + processor.register(records, asyncKinesisRecordHandler); + + // Assess + await expect(processor.asyncProcess()).rejects.toThrowError( + BatchProcessingError + ); + }); + }); + + describe('Asynchronously processing DynamoDB Records', () => { + test('Batch processing DynamoDB records with no failures', async () => { + // Prepare + const firstRecord = dynamodbRecordFactory('success'); + const secondRecord = dynamodbRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new AsyncBatchProcessor(EventType.DynamoDBStreams); + + // Act + processor.register(records, asyncDynamodbRecordHandler); + const processedMessages = await processor.asyncProcess(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', firstRecord.dynamodb?.NewImage?.Message, firstRecord], + ['success', secondRecord.dynamodb?.NewImage?.Message, secondRecord], + ]); + }); + + test('Batch processing DynamoDB records with some failures', async () => { + // Prepare + const firstRecord = dynamodbRecordFactory('failure'); + const secondRecord = dynamodbRecordFactory('success'); + const thirdRecord = dynamodbRecordFactory('fail'); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new AsyncBatchProcessor(EventType.DynamoDBStreams); + + // Act + processor.register(records, asyncDynamodbRecordHandler); + const processedMessages = await processor.asyncProcess(); + + // Assess + 
expect(processedMessages[1]).toStrictEqual([ + 'success', + secondRecord.dynamodb?.NewImage?.Message, + secondRecord, + ]); + expect(processor.failureMessages.length).toBe(2); + expect(processor.response()).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: firstRecord.dynamodb?.SequenceNumber }, + { itemIdentifier: thirdRecord.dynamodb?.SequenceNumber }, + ], + }); + }); + + test('Batch processing DynamoDB records with all failures', async () => { + // Prepare + const firstRecord = dynamodbRecordFactory('failure'); + const secondRecord = dynamodbRecordFactory('failure'); + const thirdRecord = dynamodbRecordFactory('fail'); + + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new AsyncBatchProcessor(EventType.DynamoDBStreams); + + // Act + processor.register(records, asyncDynamodbRecordHandler); + + // Assess + await expect(processor.asyncProcess()).rejects.toThrowError( + BatchProcessingError + ); + }); + }); + + describe('Batch processing with Lambda context', () => { + test('Batch processing when context is provided and handler accepts', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new AsyncBatchProcessor(EventType.SQS); + + // Act + processor.register(records, asyncHandlerWithContext, options); + const processedMessages = await processor.asyncProcess(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', firstRecord.body, firstRecord], + ['success', secondRecord.body, secondRecord], + ]); + }); + + test('Batch processing when context is provided and handler does not accept', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new AsyncBatchProcessor(EventType.SQS); + + // Act + processor.register(records, asyncSqsRecordHandler, options); + const processedMessages = await processor.asyncProcess(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', firstRecord.body, firstRecord], + ['success', secondRecord.body, secondRecord], + ]); + }); + + test('Batch processing when malformed context is provided and handler attempts to use', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new AsyncBatchProcessor(EventType.SQS); + const badContext = { foo: 'bar' }; + const badOptions = { context: badContext as unknown as Context }; + + // Act + processor.register(records, asyncHandlerWithContext, badOptions); + await expect(() => processor.asyncProcess()).rejects.toThrowError( + BatchProcessingError + ); + }); + }); + + test('When calling the sync process method, it should throw an error', () => { + // Prepare + const processor = new AsyncBatchProcessor(EventType.SQS); + + // Act & Assess + expect(() => processor.process()).toThrowError( + 'Not implemented. Use asyncProcess() instead.' 
+ ); + }); +}); diff --git a/packages/batch/tests/unit/BatchProcessor.test.ts b/packages/batch/tests/unit/BatchProcessor.test.ts index 36529ee533..5be28271d2 100644 --- a/packages/batch/tests/unit/BatchProcessor.test.ts +++ b/packages/batch/tests/unit/BatchProcessor.test.ts @@ -3,29 +3,23 @@ * * @group unit/batch/class/batchprocessor */ - -import { - BatchProcessingError, - BatchProcessingOptions, - BatchProcessor, - EventType, -} from '../../src'; +import type { Context } from 'aws-lambda'; +import { helloworldContext as dummyContext } from '../../../commons/src/samples/resources/contexts'; +import { BatchProcessor } from '../../src/BatchProcessor'; +import { EventType } from '../../src/constants'; +import { BatchProcessingError } from '../../src/errors'; +import type { BatchProcessingOptions } from '../../src/types'; import { - sqsRecordFactory, - kinesisRecordFactory, dynamodbRecordFactory, -} from '../../tests/helpers/factories'; + kinesisRecordFactory, + sqsRecordFactory, +} from '../helpers/factories'; import { - sqsRecordHandler, - asyncSqsRecordHandler, - kinesisRecordHandler, - asyncKinesisRecordHandler, dynamodbRecordHandler, - asyncDynamodbRecordHandler, handlerWithContext, -} from '../../tests/helpers/handlers'; -import { helloworldContext as dummyContext } from '../../../commons/src/samples/resources/contexts'; -import { Context } from 'aws-lambda'; + kinesisRecordHandler, + sqsRecordHandler, +} from '../helpers/handlers'; describe('Class: BatchProcessor', () => { const ENVIRONMENT_VARIABLES = process.env; @@ -42,7 +36,7 @@ describe('Class: BatchProcessor', () => { }); describe('Synchronously processing SQS Records', () => { - test('Batch processing SQS records with no failures', async () => { + test('Batch processing SQS records with no failures', () => { // Prepare const firstRecord = sqsRecordFactory('success'); const secondRecord = sqsRecordFactory('success'); @@ -51,7 +45,7 @@ describe('Class: BatchProcessor', () => { // Act processor.register(records, sqsRecordHandler); - const processedMessages = await processor.process(); + const processedMessages = processor.process(); // Assess expect(processedMessages).toStrictEqual([ @@ -60,7 +54,7 @@ describe('Class: BatchProcessor', () => { ]); }); - test('Batch processing SQS records with some failures', async () => { + test('Batch processing SQS records with some failures', () => { // Prepare const firstRecord = sqsRecordFactory('failure'); const secondRecord = sqsRecordFactory('success'); @@ -70,7 +64,7 @@ describe('Class: BatchProcessor', () => { // Act processor.register(records, sqsRecordHandler); - const processedMessages = await processor.process(); + const processedMessages = processor.process(); // Assess expect(processedMessages[1]).toStrictEqual([ @@ -87,7 +81,7 @@ describe('Class: BatchProcessor', () => { }); }); - test('Batch processing SQS records with all failures', async () => { + test('Batch processing SQS records with all failures', () => { // Prepare const firstRecord = sqsRecordFactory('failure'); const secondRecord = sqsRecordFactory('failure'); @@ -98,79 +92,12 @@ describe('Class: BatchProcessor', () => { // Act & Assess processor.register(records, sqsRecordHandler); - await expect(processor.process()).rejects.toThrowError( - BatchProcessingError - ); - }); - }); - - describe('Asynchronously processing SQS Records', () => { - test('Batch processing SQS records with no failures', async () => { - // Prepare - const firstRecord = sqsRecordFactory('success'); - const secondRecord = sqsRecordFactory('success'); 
- const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.SQS); - - // Act - processor.register(records, asyncSqsRecordHandler); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages).toStrictEqual([ - ['success', firstRecord.body, firstRecord], - ['success', secondRecord.body, secondRecord], - ]); - }); - - test('Batch processing SQS records with some failures', async () => { - // Prepare - const firstRecord = sqsRecordFactory('failure'); - const secondRecord = sqsRecordFactory('success'); - const thirdRecord = sqsRecordFactory('fail'); - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessor(EventType.SQS); - - // Act - processor.register(records, asyncSqsRecordHandler); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages[1]).toStrictEqual([ - 'success', - secondRecord.body, - secondRecord, - ]); - expect(processor.failureMessages.length).toBe(2); - expect(processor.response()).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: firstRecord.messageId }, - { itemIdentifier: thirdRecord.messageId }, - ], - }); - }); - - test('Batch processing SQS records with all failures', async () => { - // Prepare - const firstRecord = sqsRecordFactory('failure'); - const secondRecord = sqsRecordFactory('failure'); - const thirdRecord = sqsRecordFactory('fail'); - - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessor(EventType.SQS); - - // Act - processor.register(records, asyncSqsRecordHandler); - - // Assess - await expect(processor.process()).rejects.toThrowError( - BatchProcessingError - ); + expect(() => processor.process()).toThrowError(BatchProcessingError); }); }); describe('Synchronously processing Kinesis Records', () => { - test('Batch processing Kinesis records with no failures', async () => { + test('Batch processing Kinesis records with no failures', () => { // Prepare const firstRecord = kinesisRecordFactory('success'); const secondRecord = kinesisRecordFactory('success'); @@ -179,7 +106,7 @@ describe('Class: BatchProcessor', () => { // Act processor.register(records, kinesisRecordHandler); - const processedMessages = await processor.process(); + const processedMessages = processor.process(); // Assess expect(processedMessages).toStrictEqual([ @@ -188,7 +115,7 @@ describe('Class: BatchProcessor', () => { ]); }); - test('Batch processing Kinesis records with some failures', async () => { + test('Batch processing Kinesis records with some failures', () => { // Prepare const firstRecord = kinesisRecordFactory('failure'); const secondRecord = kinesisRecordFactory('success'); @@ -198,7 +125,7 @@ describe('Class: BatchProcessor', () => { // Act processor.register(records, kinesisRecordHandler); - const processedMessages = await processor.process(); + const processedMessages = processor.process(); // Assess expect(processedMessages[1]).toStrictEqual([ @@ -215,7 +142,7 @@ describe('Class: BatchProcessor', () => { }); }); - test('Batch processing Kinesis records with all failures', async () => { + test('Batch processing Kinesis records with all failures', () => { const firstRecord = kinesisRecordFactory('failure'); const secondRecord = kinesisRecordFactory('failure'); const thirdRecord = kinesisRecordFactory('fail'); @@ -227,79 +154,12 @@ describe('Class: BatchProcessor', () => { processor.register(records, kinesisRecordHandler); // Assess - await 
expect(processor.process()).rejects.toThrowError( - BatchProcessingError - ); - }); - }); - - describe('Asynchronously processing Kinesis Records', () => { - test('Batch processing Kinesis records with no failures', async () => { - // Prepare - const firstRecord = kinesisRecordFactory('success'); - const secondRecord = kinesisRecordFactory('success'); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.KinesisDataStreams); - - // Act - processor.register(records, asyncKinesisRecordHandler); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages).toStrictEqual([ - ['success', firstRecord.kinesis.data, firstRecord], - ['success', secondRecord.kinesis.data, secondRecord], - ]); - }); - - test('Batch processing Kinesis records with some failures', async () => { - // Prepare - const firstRecord = kinesisRecordFactory('failure'); - const secondRecord = kinesisRecordFactory('success'); - const thirdRecord = kinesisRecordFactory('fail'); - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessor(EventType.KinesisDataStreams); - - // Act - processor.register(records, asyncKinesisRecordHandler); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages[1]).toStrictEqual([ - 'success', - secondRecord.kinesis.data, - secondRecord, - ]); - expect(processor.failureMessages.length).toBe(2); - expect(processor.response()).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: firstRecord.kinesis.sequenceNumber }, - { itemIdentifier: thirdRecord.kinesis.sequenceNumber }, - ], - }); - }); - - test('Batch processing Kinesis records with all failures', async () => { - // Prepare - const firstRecord = kinesisRecordFactory('failure'); - const secondRecord = kinesisRecordFactory('failure'); - const thirdRecord = kinesisRecordFactory('fail'); - - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessor(EventType.KinesisDataStreams); - - // Act - processor.register(records, asyncKinesisRecordHandler); - - // Assess - await expect(processor.process()).rejects.toThrowError( - BatchProcessingError - ); + expect(() => processor.process()).toThrowError(BatchProcessingError); }); }); describe('Synchronously processing DynamoDB Records', () => { - test('Batch processing DynamoDB records with no failures', async () => { + test('Batch processing DynamoDB records with no failures', () => { // Prepare const firstRecord = dynamodbRecordFactory('success'); const secondRecord = dynamodbRecordFactory('success'); @@ -308,7 +168,7 @@ describe('Class: BatchProcessor', () => { // Act processor.register(records, dynamodbRecordHandler); - const processedMessages = await processor.process(); + const processedMessages = processor.process(); // Assess expect(processedMessages).toStrictEqual([ @@ -317,7 +177,7 @@ describe('Class: BatchProcessor', () => { ]); }); - test('Batch processing DynamoDB records with some failures', async () => { + test('Batch processing DynamoDB records with some failures', () => { // Prepare const firstRecord = dynamodbRecordFactory('failure'); const secondRecord = dynamodbRecordFactory('success'); @@ -327,7 +187,7 @@ describe('Class: BatchProcessor', () => { // Act processor.register(records, dynamodbRecordHandler); - const processedMessages = await processor.process(); + const processedMessages = processor.process(); // Assess expect(processedMessages[1]).toStrictEqual([ @@ -344,7 +204,7 @@ describe('Class: 
BatchProcessor', () => { }); }); - test('Batch processing DynamoDB records with all failures', async () => { + test('Batch processing DynamoDB records with all failures', () => { // Prepare const firstRecord = dynamodbRecordFactory('failure'); const secondRecord = dynamodbRecordFactory('failure'); @@ -357,79 +217,12 @@ describe('Class: BatchProcessor', () => { processor.register(records, dynamodbRecordHandler); // Assess - await expect(processor.process()).rejects.toThrowError( - BatchProcessingError - ); - }); - }); - - describe('Asynchronously processing DynamoDB Records', () => { - test('Batch processing DynamoDB records with no failures', async () => { - // Prepare - const firstRecord = dynamodbRecordFactory('success'); - const secondRecord = dynamodbRecordFactory('success'); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.DynamoDBStreams); - - // Act - processor.register(records, asyncDynamodbRecordHandler); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages).toStrictEqual([ - ['success', firstRecord.dynamodb?.NewImage?.Message, firstRecord], - ['success', secondRecord.dynamodb?.NewImage?.Message, secondRecord], - ]); - }); - - test('Batch processing DynamoDB records with some failures', async () => { - // Prepare - const firstRecord = dynamodbRecordFactory('failure'); - const secondRecord = dynamodbRecordFactory('success'); - const thirdRecord = dynamodbRecordFactory('fail'); - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessor(EventType.DynamoDBStreams); - - // Act - processor.register(records, asyncDynamodbRecordHandler); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages[1]).toStrictEqual([ - 'success', - secondRecord.dynamodb?.NewImage?.Message, - secondRecord, - ]); - expect(processor.failureMessages.length).toBe(2); - expect(processor.response()).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: firstRecord.dynamodb?.SequenceNumber }, - { itemIdentifier: thirdRecord.dynamodb?.SequenceNumber }, - ], - }); - }); - - test('Batch processing DynamoDB records with all failures', async () => { - // Prepare - const firstRecord = dynamodbRecordFactory('failure'); - const secondRecord = dynamodbRecordFactory('failure'); - const thirdRecord = dynamodbRecordFactory('fail'); - - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessor(EventType.DynamoDBStreams); - - // Act - processor.register(records, asyncDynamodbRecordHandler); - - // Assess - await expect(processor.process()).rejects.toThrowError( - BatchProcessingError - ); + expect(() => processor.process()).toThrowError(BatchProcessingError); }); }); describe('Batch processing with Lambda context', () => { - test('Batch processing when context is provided and handler accepts', async () => { + test('Batch processing when context is provided and handler accepts', () => { // Prepare const firstRecord = sqsRecordFactory('success'); const secondRecord = sqsRecordFactory('success'); @@ -438,7 +231,7 @@ describe('Class: BatchProcessor', () => { // Act processor.register(records, handlerWithContext, options); - const processedMessages = await processor.process(); + const processedMessages = processor.process(); // Assess expect(processedMessages).toStrictEqual([ @@ -447,7 +240,7 @@ describe('Class: BatchProcessor', () => { ]); }); - test('Batch processing when context is provided and handler does not accept', 
async () => { + test('Batch processing when context is provided and handler does not accept', () => { // Prepare const firstRecord = sqsRecordFactory('success'); const secondRecord = sqsRecordFactory('success'); @@ -456,7 +249,7 @@ describe('Class: BatchProcessor', () => { // Act processor.register(records, sqsRecordHandler, options); - const processedMessages = await processor.process(); + const processedMessages = processor.process(); // Assess expect(processedMessages).toStrictEqual([ @@ -465,7 +258,7 @@ describe('Class: BatchProcessor', () => { ]); }); - test('Batch processing when malformed context is provided and handler attempts to use', async () => { + test('Batch processing when malformed context is provided and handler attempts to use', () => { // Prepare const firstRecord = sqsRecordFactory('success'); const secondRecord = sqsRecordFactory('success'); @@ -476,9 +269,17 @@ describe('Class: BatchProcessor', () => { // Act processor.register(records, handlerWithContext, badOptions); - await expect(processor.process()).rejects.toThrowError( - BatchProcessingError - ); + expect(() => processor.process()).toThrowError(BatchProcessingError); }); }); + + test('When calling the async process method, it should throw an error', async () => { + // Prepare + const processor = new BatchProcessor(EventType.SQS); + + // Act & Assess + await expect(() => processor.asyncProcess()).rejects.toThrow( + 'Not implemented. Use process() instead.' + ); + }); }); diff --git a/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts index b49e3fbdf1..564886b1d8 100644 --- a/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts +++ b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts @@ -3,13 +3,9 @@ * * @group unit/batch/class/sqsfifobatchprocessor */ - import { SqsFifoPartialProcessor, processPartialResponse } from '../../src'; -import { sqsRecordFactory } from '../../tests/helpers/factories'; -import { - asyncSqsRecordHandler, - sqsRecordHandler, -} from '../../tests/helpers/handlers'; +import { sqsRecordFactory } from '../helpers/factories'; +import { sqsRecordHandler } from '../helpers/handlers'; describe('Class: SqsFifoBatchProcessor', () => { const ENVIRONMENT_VARIABLES = process.env; @@ -25,7 +21,7 @@ describe('Class: SqsFifoBatchProcessor', () => { }); describe('Synchronous SQS FIFO batch processing', () => { - test('SQS FIFO Batch processor with no failures', async () => { + test('SQS FIFO Batch processor with no failures', () => { // Prepare const firstRecord = sqsRecordFactory('success'); const secondRecord = sqsRecordFactory('success'); @@ -33,17 +29,13 @@ describe('Class: SqsFifoBatchProcessor', () => { const processor = new SqsFifoPartialProcessor(); // Act - const result = await processPartialResponse( - event, - sqsRecordHandler, - processor - ); + const result = processPartialResponse(event, sqsRecordHandler, processor); // Assess expect(result['batchItemFailures']).toStrictEqual([]); }); - test('SQS FIFO Batch processor with failures', async () => { + test('SQS FIFO Batch processor with failures', () => { // Prepare const firstRecord = sqsRecordFactory('success'); const secondRecord = sqsRecordFactory('fail'); @@ -52,56 +44,7 @@ describe('Class: SqsFifoBatchProcessor', () => { const processor = new SqsFifoPartialProcessor(); // Act - const result = await processPartialResponse( - event, - sqsRecordHandler, - processor - ); - - // Assess - expect(result['batchItemFailures'].length).toBe(2); - 
expect(result['batchItemFailures'][0]['itemIdentifier']).toBe( - secondRecord.messageId - ); - expect(result['batchItemFailures'][1]['itemIdentifier']).toBe( - thirdRecord.messageId - ); - }); - }); - - describe('Asynchronous SQS FIFO batch processing', () => { - test('SQS FIFO Batch processor with no failures', async () => { - // Prepare - const firstRecord = sqsRecordFactory('success'); - const secondRecord = sqsRecordFactory('success'); - const event = { Records: [firstRecord, secondRecord] }; - const processor = new SqsFifoPartialProcessor(); - - // Act - const result = await processPartialResponse( - event, - asyncSqsRecordHandler, - processor - ); - - // Assess - expect(result['batchItemFailures']).toStrictEqual([]); - }); - - test('SQS FIFO Batch processor with failures', async () => { - // Prepare - const firstRecord = sqsRecordFactory('success'); - const secondRecord = sqsRecordFactory('fail'); - const thirdRecord = sqsRecordFactory('success'); - const event = { Records: [firstRecord, secondRecord, thirdRecord] }; - const processor = new SqsFifoPartialProcessor(); - - // Act - const result = await processPartialResponse( - event, - asyncSqsRecordHandler, - processor - ); + const result = processPartialResponse(event, sqsRecordHandler, processor); // Assess expect(result['batchItemFailures'].length).toBe(2); diff --git a/packages/batch/tests/unit/asyncProcessPartialResponse.test.ts b/packages/batch/tests/unit/asyncProcessPartialResponse.test.ts new file mode 100644 index 0000000000..fde15ccf42 --- /dev/null +++ b/packages/batch/tests/unit/asyncProcessPartialResponse.test.ts @@ -0,0 +1,231 @@ +/** + * Test asyncProcessPartialResponse function + * + * @group unit/batch/function/asyncProcesspartialresponse + */ +import type { + Context, + DynamoDBStreamEvent, + KinesisStreamEvent, + SQSEvent, +} from 'aws-lambda'; +import { helloworldContext as dummyContext } from '../../../commons/src/samples/resources/contexts'; +import { Custom as dummyEvent } from '../../../commons/src/samples/resources/events'; +import { AsyncBatchProcessor, asyncProcessPartialResponse } from '../../src'; +import { EventType } from '../../src/constants'; +import type { + BatchProcessingOptions, + PartialItemFailureResponse, +} from '../../src/types'; +import { + dynamodbRecordFactory, + kinesisRecordFactory, + sqsRecordFactory, +} from '../helpers/factories'; +import { + asyncDynamodbRecordHandler, + asyncHandlerWithContext, + asyncKinesisRecordHandler, + asyncSqsRecordHandler, +} from '../helpers/handlers'; + +describe('Function: processPartialResponse()', () => { + const ENVIRONMENT_VARIABLES = process.env; + const context = dummyContext; + const options: BatchProcessingOptions = { context: dummyContext }; + + beforeEach(() => { + jest.clearAllMocks(); + jest.resetModules(); + process.env = { ...ENVIRONMENT_VARIABLES }; + }); + + afterAll(() => { + process.env = ENVIRONMENT_VARIABLES; + }); + + describe('Process partial response function call tests', () => { + test('Process partial response function call with asynchronous handler', async () => { + // Prepare + const records = [ + sqsRecordFactory('success'), + sqsRecordFactory('success'), + ]; + const batch = { Records: records }; + const processor = new AsyncBatchProcessor(EventType.SQS); + + // Act + const ret = await asyncProcessPartialResponse( + batch, + asyncSqsRecordHandler, + processor + ); + + // Assess + expect(ret).toStrictEqual({ batchItemFailures: [] }); + }); + + test('Process partial response function call with context provided', async () => { + 
// Prepare + const records = [ + sqsRecordFactory('success'), + sqsRecordFactory('success'), + ]; + const batch = { Records: records }; + const processor = new AsyncBatchProcessor(EventType.SQS); + + // Act + const ret = await asyncProcessPartialResponse( + batch, + asyncHandlerWithContext, + processor, + options + ); + + // Assess + expect(ret).toStrictEqual({ batchItemFailures: [] }); + }); + }); + + describe('Process partial response function call through handler', () => { + test('Process partial response through handler with SQS event', async () => { + // Prepare + const records = [ + sqsRecordFactory('success'), + sqsRecordFactory('success'), + ]; + const processor = new AsyncBatchProcessor(EventType.SQS); + const event: SQSEvent = { Records: records }; + + const handler = async ( + event: SQSEvent, + _context: Context + ): Promise => { + return asyncProcessPartialResponse( + event, + asyncSqsRecordHandler, + processor + ); + }; + + // Act + const result = await handler(event, context); + + // Assess + expect(result).toStrictEqual({ batchItemFailures: [] }); + }); + + test('Process partial response through handler with Kinesis event', async () => { + // Prepare + const records = [ + kinesisRecordFactory('success'), + kinesisRecordFactory('success'), + ]; + const processor = new AsyncBatchProcessor(EventType.KinesisDataStreams); + const event: KinesisStreamEvent = { Records: records }; + + const handler = async ( + event: KinesisStreamEvent, + _context: Context + ): Promise => { + return await asyncProcessPartialResponse( + event, + asyncKinesisRecordHandler, + processor + ); + }; + + // Act + const result = await handler(event, context); + + // Assess + expect(result).toStrictEqual({ batchItemFailures: [] }); + }); + + test('Process partial response through handler with DynamoDB event', async () => { + // Prepare + const records = [ + dynamodbRecordFactory('success'), + dynamodbRecordFactory('success'), + ]; + const processor = new AsyncBatchProcessor(EventType.DynamoDBStreams); + const event: DynamoDBStreamEvent = { Records: records }; + + const handler = async ( + event: DynamoDBStreamEvent, + _context: Context + ): Promise => { + return await asyncProcessPartialResponse( + event, + asyncDynamodbRecordHandler, + processor + ); + }; + + // Act + const result = await handler(event, context); + + // Assess + expect(result).toStrictEqual({ batchItemFailures: [] }); + }); + + test('Process partial response through handler for SQS records with incorrect event type', async () => { + // Prepare + const processor = new AsyncBatchProcessor(EventType.SQS); + const event = dummyEvent; + const eventTypes: string = Object.values(EventType).toString(); + + const handler = async ( + event: SQSEvent, + _context: Context + ): Promise => { + return await asyncProcessPartialResponse( + event, + asyncSqsRecordHandler, + processor + ); + }; + + // Act & Assess + await expect(() => + handler(event as unknown as SQSEvent, context) + ).rejects.toThrowError( + new Error( + 'Failed to convert event to record batch for processing.\nPlease ensure batch event is a valid ' + + eventTypes + + ' event.' 
+ ) + ); + }); + + test('Process partial response through handler with context provided', async () => { + // Prepare + const records = [ + sqsRecordFactory('success'), + sqsRecordFactory('success'), + ]; + const processor = new AsyncBatchProcessor(EventType.SQS); + const event: SQSEvent = { Records: records }; + + const handler = async ( + event: SQSEvent, + context: Context + ): Promise => { + const options: BatchProcessingOptions = { context: context }; + + return await asyncProcessPartialResponse( + event, + asyncHandlerWithContext, + processor, + options + ); + }; + + // Act + const result = await handler(event, context); + + // Assess + expect(result).toStrictEqual({ batchItemFailures: [] }); + }); + }); +}); diff --git a/packages/batch/tests/unit/processPartialResponse.test.ts b/packages/batch/tests/unit/processPartialResponse.test.ts index 1d3c4fb844..3de2edcce3 100644 --- a/packages/batch/tests/unit/processPartialResponse.test.ts +++ b/packages/batch/tests/unit/processPartialResponse.test.ts @@ -3,34 +3,31 @@ * * @group unit/batch/function/processpartialresponse */ - -import { +import type { Context, DynamoDBStreamEvent, KinesisStreamEvent, SQSEvent, } from 'aws-lambda'; -import { +import { helloworldContext as dummyContext } from '../../../commons/src/samples/resources/contexts'; +import { Custom as dummyEvent } from '../../../commons/src/samples/resources/events'; +import { BatchProcessor, processPartialResponse } from '../../src'; +import { EventType } from '../../src/constants'; +import type { BatchProcessingOptions, - BatchProcessor, - EventType, PartialItemFailureResponse, - processPartialResponse, -} from '../../src'; +} from '../../src/types'; import { dynamodbRecordFactory, kinesisRecordFactory, sqsRecordFactory, -} from '../../tests/helpers/factories'; +} from '../helpers/factories'; import { - asyncSqsRecordHandler, dynamodbRecordHandler, handlerWithContext, kinesisRecordHandler, sqsRecordHandler, -} from '../../tests/helpers/handlers'; -import { helloworldContext as dummyContext } from '../../../commons/src/samples/resources/contexts'; -import { Custom as dummyEvent } from '../../../commons/src/samples/resources/events'; +} from '../helpers/handlers'; describe('Function: processPartialResponse()', () => { const ENVIRONMENT_VARIABLES = process.env; @@ -48,27 +45,7 @@ describe('Function: processPartialResponse()', () => { }); describe('Process partial response function call tests', () => { - test('Process partial response function call with synchronous handler', async () => { - // Prepare - const records = [ - sqsRecordFactory('success'), - sqsRecordFactory('success'), - ]; - const batch = { Records: records }; - const processor = new BatchProcessor(EventType.SQS); - - // Act - const ret = await processPartialResponse( - batch, - sqsRecordHandler, - processor - ); - - // Assess - expect(ret).toStrictEqual({ batchItemFailures: [] }); - }); - - test('Process partial response function call with asynchronous handler', async () => { + test('Process partial response function call with synchronous handler', () => { // Prepare const records = [ sqsRecordFactory('success'), @@ -78,17 +55,13 @@ describe('Function: processPartialResponse()', () => { const processor = new BatchProcessor(EventType.SQS); // Act - const ret = await processPartialResponse( - batch, - asyncSqsRecordHandler, - processor - ); + const ret = processPartialResponse(batch, sqsRecordHandler, processor); // Assess expect(ret).toStrictEqual({ batchItemFailures: [] }); }); - test('Process partial response function 
call with context provided', async () => { + test('Process partial response function call with context provided', () => { // Prepare const records = [ sqsRecordFactory('success'), @@ -98,7 +71,7 @@ describe('Function: processPartialResponse()', () => { const processor = new BatchProcessor(EventType.SQS); // Act - const ret = await processPartialResponse( + const ret = processPartialResponse( batch, handlerWithContext, processor, @@ -111,7 +84,7 @@ describe('Function: processPartialResponse()', () => { }); describe('Process partial response function call through handler', () => { - test('Process partial response through handler with SQS event', async () => { + test('Process partial response through handler with SQS event', () => { // Prepare const records = [ sqsRecordFactory('success'), @@ -120,21 +93,21 @@ describe('Function: processPartialResponse()', () => { const processor = new BatchProcessor(EventType.SQS); const event: SQSEvent = { Records: records }; - const handler = async ( + const handler = ( event: SQSEvent, _context: Context - ): Promise => { - return await processPartialResponse(event, sqsRecordHandler, processor); + ): PartialItemFailureResponse => { + return processPartialResponse(event, sqsRecordHandler, processor); }; // Act - const result = await handler(event, context); + const result = handler(event, context); // Assess expect(result).toStrictEqual({ batchItemFailures: [] }); }); - test('Process partial response through handler with Kinesis event', async () => { + test('Process partial response through handler with Kinesis event', () => { // Prepare const records = [ kinesisRecordFactory('success'), @@ -143,25 +116,21 @@ describe('Function: processPartialResponse()', () => { const processor = new BatchProcessor(EventType.KinesisDataStreams); const event: KinesisStreamEvent = { Records: records }; - const handler = async ( + const handler = ( event: KinesisStreamEvent, _context: Context - ): Promise => { - return await processPartialResponse( - event, - kinesisRecordHandler, - processor - ); + ): PartialItemFailureResponse => { + return processPartialResponse(event, kinesisRecordHandler, processor); }; // Act - const result = await handler(event, context); + const result = handler(event, context); // Assess expect(result).toStrictEqual({ batchItemFailures: [] }); }); - test('Process partial response through handler with DynamoDB event', async () => { + test('Process partial response through handler with DynamoDB event', () => { // Prepare const records = [ dynamodbRecordFactory('success'), @@ -170,41 +139,35 @@ describe('Function: processPartialResponse()', () => { const processor = new BatchProcessor(EventType.DynamoDBStreams); const event: DynamoDBStreamEvent = { Records: records }; - const handler = async ( + const handler = ( event: DynamoDBStreamEvent, _context: Context - ): Promise => { - return await processPartialResponse( - event, - dynamodbRecordHandler, - processor - ); + ): PartialItemFailureResponse => { + return processPartialResponse(event, dynamodbRecordHandler, processor); }; // Act - const result = await handler(event, context); + const result = handler(event, context); // Assess expect(result).toStrictEqual({ batchItemFailures: [] }); }); - test('Process partial response through handler for SQS records with incorrect event type', async () => { + test('Process partial response through handler for SQS records with incorrect event type', () => { // Prepare const processor = new BatchProcessor(EventType.SQS); const event = dummyEvent; const eventTypes: 
string = Object.values(EventType).toString(); - const handler = async ( + const handler = ( event: SQSEvent, _context: Context - ): Promise => { - return await processPartialResponse(event, sqsRecordHandler, processor); + ): PartialItemFailureResponse => { + return processPartialResponse(event, sqsRecordHandler, processor); }; // Act & Assess - await expect( - handler(event as unknown as SQSEvent, context) - ).rejects.toThrowError( + expect(() => handler(event as unknown as SQSEvent, context)).toThrowError( new Error( 'Failed to convert event to record batch for processing.\nPlease ensure batch event is a valid ' + eventTypes + @@ -213,7 +176,7 @@ describe('Function: processPartialResponse()', () => { ); }); - test('Process partial response through handler with context provided', async () => { + test('Process partial response through handler with context provided', () => { // Prepare const records = [ sqsRecordFactory('success'), @@ -222,13 +185,13 @@ describe('Function: processPartialResponse()', () => { const processor = new BatchProcessor(EventType.SQS); const event: SQSEvent = { Records: records }; - const handler = async ( + const handler = ( event: SQSEvent, - _context: Context - ): Promise => { - const options: BatchProcessingOptions = { context: _context }; + context: Context + ): PartialItemFailureResponse => { + const options: BatchProcessingOptions = { context: context }; - return await processPartialResponse( + return processPartialResponse( event, handlerWithContext, processor, @@ -237,7 +200,7 @@ describe('Function: processPartialResponse()', () => { }; // Act - const result = await handler(event, context); + const result = handler(event, context); // Assess expect(result).toStrictEqual({ batchItemFailures: [] });
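A minimal usage sketch, not part of the diff: it shows how the new `AsyncBatchProcessor` and `asyncProcessPartialResponse` added by this PR could be wired into an SQS Lambda handler, mirroring the pattern exercised in `asyncProcessPartialResponse.test.ts`. The record handler body, the `setTimeout` stand-in for real asynchronous work, and the assumption that these symbols are importable from the `@aws-lambda-powertools/batch` package root are illustrative only.

```ts
import type { Context, SQSEvent, SQSRecord } from 'aws-lambda';
import {
  AsyncBatchProcessor,
  EventType,
  asyncProcessPartialResponse,
} from '@aws-lambda-powertools/batch';
import type { PartialItemFailureResponse } from '@aws-lambda-powertools/batch';

// One processor per execution environment, reused across invocations
const processor = new AsyncBatchProcessor(EventType.SQS);

// Illustrative async record handler: awaits some work per record and
// throws to flag the record as a partial failure
const recordHandler = async (record: SQSRecord): Promise<string> => {
  // setTimeout stands in for a real downstream call (assumption)
  await new Promise((resolve) => setTimeout(resolve, 10));
  if (record.body.includes('fail')) {
    throw new Error(`Could not process message ${record.messageId}`);
  }

  return record.body;
};

export const handler = async (
  event: SQSEvent,
  context: Context
): Promise<PartialItemFailureResponse> => {
  // register() + asyncProcess() + response() are wrapped by the helper;
  // the { context } option is only needed if the record handler reads it
  return asyncProcessPartialResponse(event, recordHandler, processor, {
    context,
  });
};
```

The same shape applies to the other supported sources: construct the processor with `EventType.KinesisDataStreams` or `EventType.DynamoDBStreams`, as the new unit tests do.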