Skip to content

Commit

Permalink
feat(batch): add async processor (#1616)
Browse files Browse the repository at this point in the history
* feat(batch): add async processor

* tests: improved unit tests

* chore: removed docstring + edited test handler
  • Loading branch information
dreamorosi committed Jul 19, 2023
1 parent 49bf172 commit 4e6c0d2
Show file tree
Hide file tree
Showing 19 changed files with 819 additions and 461 deletions.
7 changes: 3 additions & 4 deletions packages/batch/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@aws-lambda-powertools/batch",
"version": "1.10.0",
"version": "1.11.1",
"description": "The batch processing package for the Powertools for AWS Lambda (TypeScript) library.",
"author": {
"name": "Amazon Web Services",
Expand All @@ -22,8 +22,7 @@
"prepack": "node ../../.github/scripts/release_patch_package_json.js ."
},
"lint-staged": {
"*.ts": "npm run lint-fix",
"*.js": "npm run lint-fix"
"*.{js,ts}": "npm run lint-fix"
},
"homepage": "https://github.com/aws-powertools/powertools-lambda-typescript/tree/main/packages/batch#readme",
"license": "MIT-0",
Expand All @@ -50,4 +49,4 @@
"nodejs"
],
"devDependencies": {}
}
}
31 changes: 31 additions & 0 deletions packages/batch/src/AsyncBatchProcessor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { BasePartialBatchProcessor } from './BasePartialBatchProcessor';
import type { BaseRecord, FailureResponse, SuccessResponse } from './types';

/**
* Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB
*/
class AsyncBatchProcessor extends BasePartialBatchProcessor {
public async asyncProcessRecord(
record: BaseRecord
): Promise<SuccessResponse | FailureResponse> {
try {
const data = this.toBatchType(record, this.eventType);
const result = await this.handler(data, this.options);

return this.successHandler(record, result);
} catch (error) {
return this.failureHandler(record, error as Error);
}
}

/**
* Process a record with instance's handler
* @param record Batch record to be processed
* @returns response of success or failure
*/
public processRecord(_record: BaseRecord): SuccessResponse | FailureResponse {
throw new Error('Not implemented. Use asyncProcess() instead.');
}
}

export { AsyncBatchProcessor };
34 changes: 15 additions & 19 deletions packages/batch/src/BasePartialBatchProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
/**
* Process batch and partially report failed items
*/
import { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda';
import {
BasePartialProcessor,
BatchProcessingError,
DATA_CLASS_MAPPING,
DEFAULT_RESPONSE,
import type {
DynamoDBRecord,
KinesisStreamRecord,
SQSRecord,
} from 'aws-lambda';
import { BasePartialProcessor } from './BasePartialProcessor';
import { DATA_CLASS_MAPPING, DEFAULT_RESPONSE, EventType } from './constants';
import { BatchProcessingError } from './errors';
import type {
EventSourceDataClassTypes,
EventType,
PartialItemFailures,
PartialItemFailureResponse,
} from '.';
PartialItemFailures,
} from './types';

/**
* Process batch and partially report failed items
*/
abstract class BasePartialBatchProcessor extends BasePartialProcessor {
public COLLECTOR_MAPPING;

Expand Down Expand Up @@ -124,13 +126,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
* @returns true if any records resulted in exception
*/
public hasMessagesToReport(): boolean {
if (this.failureMessages.length != 0) {
return true;
}

// console.debug('All ' + this.successMessages.length + ' records successfully processed');

return false;
return this.failureMessages.length != 0;
}

/**
Expand Down
58 changes: 49 additions & 9 deletions packages/batch/src/BasePartialProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
/**
* Abstract class for batch processors
*/
import {
import type {
BaseRecord,
BatchProcessingOptions,
EventSourceDataClassTypes,
FailureResponse,
ResultType,
SuccessResponse,
} from '.';
} from './types';

/**
* Abstract class for batch processors.
*/
abstract class BasePartialProcessor {
public exceptions: Error[];

Expand All @@ -34,6 +34,40 @@ abstract class BasePartialProcessor {
this.handler = new Function();
}

/**
* Call instance's handler for each record
* @returns List of processed records
*/
public async asyncProcess(): Promise<(SuccessResponse | FailureResponse)[]> {
/**
* If this is an sync processor, user should have called process instead,
* so we call the method early to throw the error early thus failing fast.
*/
if (this.constructor.name === 'BatchProcessor') {
await this.asyncProcessRecord(this.records[0]);
}
this.prepare();

const processingPromises: Promise<SuccessResponse | FailureResponse>[] =
this.records.map((record) => this.asyncProcessRecord(record));

const processedRecords: (SuccessResponse | FailureResponse)[] =
await Promise.all(processingPromises);

this.clean();

return processedRecords;
}

/**
* Process a record with an asyncronous handler
*
* @param record Record to be processed
*/
public abstract asyncProcessRecord(
record: BaseRecord
): Promise<SuccessResponse | FailureResponse>;

/**
* Clean class instance after processing
*/
Expand All @@ -50,7 +84,6 @@ abstract class BasePartialProcessor {
exception: Error
): FailureResponse {
const entry: FailureResponse = ['fail', exception.message, record];
// console.debug('Record processing exception: ' + exception.message);
this.exceptions.push(exception);
this.failureMessages.push(record);

Expand All @@ -66,12 +99,19 @@ abstract class BasePartialProcessor {
* Call instance's handler for each record
* @returns List of processed records
*/
public async process(): Promise<(SuccessResponse | FailureResponse)[]> {
public process(): (SuccessResponse | FailureResponse)[] {
/**
* If this is an async processor, user should have called processAsync instead,
* so we call the method early to throw the error early thus failing fast.
*/
if (this.constructor.name === 'AsyncBatchProcessor') {
this.processRecord(this.records[0]);
}
this.prepare();

const processedRecords: (SuccessResponse | FailureResponse)[] = [];
for (const record of this.records) {
processedRecords.push(await this.processRecord(record));
processedRecords.push(this.processRecord(record));
}

this.clean();
Expand All @@ -85,7 +125,7 @@ abstract class BasePartialProcessor {
*/
public abstract processRecord(
record: BaseRecord
): Promise<SuccessResponse | FailureResponse>;
): SuccessResponse | FailureResponse;

/**
* Set class instance attributes before execution
Expand Down
27 changes: 13 additions & 14 deletions packages/batch/src/BatchProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,29 @@
import { BasePartialBatchProcessor } from './BasePartialBatchProcessor';
import type { BaseRecord, FailureResponse, SuccessResponse } from './types';

/**
* Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB
*/
import {
BasePartialBatchProcessor,
BaseRecord,
FailureResponse,
SuccessResponse,
} from '.';

class BatchProcessor extends BasePartialBatchProcessor {
public async asyncProcessRecord(
_record: BaseRecord
): Promise<SuccessResponse | FailureResponse> {
throw new Error('Not implemented. Use process() instead.');
}

/**
* Process a record with instance's handler
* @param record Batch record to be processed
* @returns response of success or failure
*/
public async processRecord(
record: BaseRecord
): Promise<SuccessResponse | FailureResponse> {
public processRecord(record: BaseRecord): SuccessResponse | FailureResponse {
try {
const data = this.toBatchType(record, this.eventType);

const result = await this.handler(data, this.options);
const result = this.handler(data, this.options);

return this.successHandler(record, result);
} catch (e) {
return this.failureHandler(record, e as Error);
} catch (error) {
return this.failureHandler(record, error as Error);
}
}
}
Expand Down
9 changes: 5 additions & 4 deletions packages/batch/src/SqsFifoPartialProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { BatchProcessor, EventType, FailureResponse, SuccessResponse } from '.';
import { BatchProcessor } from './BatchProcessor';
import { EventType } from './constants';
import type { FailureResponse, SuccessResponse } from './types';

/**
* Process native partial responses from SQS FIFO queues
Expand All @@ -14,9 +16,8 @@ class SqsFifoPartialProcessor extends BatchProcessor {
* Call instance's handler for each record.
* When the first failed message is detected, the process is short-circuited
* And the remaining messages are reported as failed items
* TODO: change to synchronous execution if possible
*/
public async process(): Promise<(SuccessResponse | FailureResponse)[]> {
public process(): (SuccessResponse | FailureResponse)[] {
this.prepare();

const processedRecords: (SuccessResponse | FailureResponse)[] = [];
Expand All @@ -28,7 +29,7 @@ class SqsFifoPartialProcessor extends BatchProcessor {
return this.shortCircuitProcessing(currentIndex, processedRecords);
}

processedRecords.push(await this.processRecord(record));
processedRecords.push(this.processRecord(record));
currentIndex++;
}

Expand Down
38 changes: 38 additions & 0 deletions packages/batch/src/asyncProcessPartialResponse.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { BasePartialBatchProcessor } from './BasePartialBatchProcessor';
import { EventType } from './constants';
import type {
BaseRecord,
BatchProcessingOptions,
PartialItemFailureResponse,
} from './types';

/**
* Higher level function to handle batch event processing
* @param event Lambda's original event
* @param recordHandler Callable function to process each record from the batch
* @param processor Batch processor to handle partial failure cases
* @returns Lambda Partial Batch Response
*/
const asyncProcessPartialResponse = async (
event: { Records: BaseRecord[] },
recordHandler: CallableFunction,
processor: BasePartialBatchProcessor,
options?: BatchProcessingOptions
): Promise<PartialItemFailureResponse> => {
if (!event.Records) {
const eventTypes: string = Object.values(EventType).toString();
throw new Error(
'Failed to convert event to record batch for processing.\nPlease ensure batch event is a valid ' +
eventTypes +
' event.'
);
}

processor.register(event.Records, recordHandler, options);

await processor.asyncProcess();

return processor.response();
};

export { asyncProcessPartialResponse };
8 changes: 4 additions & 4 deletions packages/batch/src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/**
* Constants for batch processor classes
*/
import { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda';
import type { PartialItemFailureResponse, EventSourceDataClassTypes } from '.';
import type {
PartialItemFailureResponse,
EventSourceDataClassTypes,
} from './types';

const EventType = {
SQS: 'SQS',
Expand Down
4 changes: 0 additions & 4 deletions packages/batch/src/errors.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
/**
* Batch processing exceptions
*/

/**
* Base error type for batch processing
* All errors thrown by major failures extend this base class
Expand Down
2 changes: 2 additions & 0 deletions packages/batch/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,7 @@ export * from './types';
export * from './BasePartialProcessor';
export * from './BasePartialBatchProcessor';
export * from './BatchProcessor';
export * from './AsyncBatchProcessor';
export * from './processPartialResponse';
export * from './asyncProcessPartialResponse';
export * from './SqsFifoPartialProcessor';
14 changes: 7 additions & 7 deletions packages/batch/src/processPartialResponse.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import {
BasePartialBatchProcessor,
import { BasePartialBatchProcessor } from './BasePartialBatchProcessor';
import { EventType } from './constants';
import type {
BaseRecord,
BatchProcessingOptions,
EventType,
PartialItemFailureResponse,
} from '.';
} from './types';

/**
* Higher level function to handle batch event processing
Expand All @@ -13,12 +13,12 @@ import {
* @param processor Batch processor to handle partial failure cases
* @returns Lambda Partial Batch Response
*/
const processPartialResponse = async (
const processPartialResponse = (
event: { Records: BaseRecord[] },
recordHandler: CallableFunction,
processor: BasePartialBatchProcessor,
options?: BatchProcessingOptions
): Promise<PartialItemFailureResponse> => {
): PartialItemFailureResponse => {
if (!event.Records) {
const eventTypes: string = Object.values(EventType).toString();
throw new Error(
Expand All @@ -30,7 +30,7 @@ const processPartialResponse = async (

processor.register(event.Records, recordHandler, options);

await processor.process();
processor.process();

return processor.response();
};
Expand Down
Loading

0 comments on commit 4e6c0d2

Please sign in to comment.