Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(batch): add async processor #1616

Merged
merged 3 commits into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions packages/batch/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@aws-lambda-powertools/batch",
"version": "1.10.0",
"version": "1.11.1",
"description": "The batch processing package for the Powertools for AWS Lambda (TypeScript) library.",
"author": {
"name": "Amazon Web Services",
Expand All @@ -22,8 +22,7 @@
"prepack": "node ../../.github/scripts/release_patch_package_json.js ."
},
"lint-staged": {
"*.ts": "npm run lint-fix",
"*.js": "npm run lint-fix"
"*.{js,ts}": "npm run lint-fix"
},
"homepage": "https://github.com/aws-powertools/powertools-lambda-typescript/tree/main/packages/batch#readme",
"license": "MIT-0",
Expand All @@ -50,4 +49,4 @@
"nodejs"
],
"devDependencies": {}
}
}
31 changes: 31 additions & 0 deletions packages/batch/src/AsyncBatchProcessor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { BasePartialBatchProcessor } from './BasePartialBatchProcessor';
import type { BaseRecord, FailureResponse, SuccessResponse } from './types';

/**
 * Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB
 * using an asynchronous record handler.
 *
 * Only the asynchronous path is implemented by this class: the synchronous
 * `processRecord()` always throws.
 */
class AsyncBatchProcessor extends BasePartialBatchProcessor {
  /**
   * Process a record by awaiting the instance's handler.
   *
   * Any value thrown while converting the record or running the handler is
   * routed to the failure handler instead of propagating to the caller.
   *
   * @param record Batch record to be processed
   * @returns response of success or failure
   */
  public async asyncProcessRecord(
    record: BaseRecord
  ): Promise<SuccessResponse | FailureResponse> {
    try {
      const data = this.toBatchType(record, this.eventType);
      const result = await this.handler(data, this.options);

      return this.successHandler(record, result);
    } catch (error) {
      // A handler may throw (or reject with) any value, e.g. a string.
      // The failure handler reads `exception.message`, so anything that is not
      // Error-like is wrapped; Error-like objects are passed through untouched.
      const isErrorLike =
        error instanceof Error ||
        (typeof error === 'object' &&
          error !== null &&
          typeof (error as { message?: unknown }).message === 'string');
      const exception = isErrorLike
        ? (error as Error)
        : new Error(String(error));

      return this.failureHandler(record, exception);
    }
  }

  /**
   * Synchronous processing is not supported by this processor.
   *
   * @param _record Batch record (unused)
   * @throws Error always, directing the caller to `asyncProcess()`
   */
  public processRecord(_record: BaseRecord): SuccessResponse | FailureResponse {
    throw new Error('Not implemented. Use asyncProcess() instead.');
  }
}

export { AsyncBatchProcessor };
34 changes: 15 additions & 19 deletions packages/batch/src/BasePartialBatchProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
/**
* Process batch and partially report failed items
*/
import { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda';
import {
BasePartialProcessor,
BatchProcessingError,
DATA_CLASS_MAPPING,
DEFAULT_RESPONSE,
import type {
DynamoDBRecord,
KinesisStreamRecord,
SQSRecord,
} from 'aws-lambda';
import { BasePartialProcessor } from './BasePartialProcessor';
import { DATA_CLASS_MAPPING, DEFAULT_RESPONSE, EventType } from './constants';
import { BatchProcessingError } from './errors';
import type {
EventSourceDataClassTypes,
EventType,
PartialItemFailures,
PartialItemFailureResponse,
} from '.';
PartialItemFailures,
} from './types';

/**
* Process batch and partially report failed items
*/
abstract class BasePartialBatchProcessor extends BasePartialProcessor {
public COLLECTOR_MAPPING;

Expand Down Expand Up @@ -124,13 +126,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
* @returns true if any records resulted in exception
*/
public hasMessagesToReport(): boolean {
if (this.failureMessages.length != 0) {
return true;
}

// console.debug('All ' + this.successMessages.length + ' records successfully processed');

return false;
return this.failureMessages.length != 0;
}

/**
Expand Down
58 changes: 49 additions & 9 deletions packages/batch/src/BasePartialProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
/**
* Abstract class for batch processors
*/
import {
import type {
BaseRecord,
BatchProcessingOptions,
EventSourceDataClassTypes,
FailureResponse,
ResultType,
SuccessResponse,
} from '.';
} from './types';

/**
* Abstract class for batch processors.
*/
abstract class BasePartialProcessor {
public exceptions: Error[];

Expand All @@ -34,6 +34,40 @@ abstract class BasePartialProcessor {
this.handler = new Function();
}

/**
 * Run `asyncProcessRecord` for every entry in `this.records`, between a
 * `prepare()` and a `clean()` call.
 *
 * All per-record promises are created up front and awaited together, so the
 * resolved list is in the same order as `this.records`.
 *
 * @returns List of processed records
 */
public async asyncProcess(): Promise<(SuccessResponse | FailureResponse)[]> {
  /**
   * If this is a sync processor, the user should have called process instead,
   * so the record method is invoked up front to surface its error early and
   * fail fast.
   */
  const isSyncProcessor = this.constructor.name === 'BatchProcessor';
  if (isSyncProcessor) {
    await this.asyncProcessRecord(this.records[0]);
  }

  this.prepare();

  const results: (SuccessResponse | FailureResponse)[] = await Promise.all(
    this.records.map(
      (item): Promise<SuccessResponse | FailureResponse> =>
        this.asyncProcessRecord(item)
    )
  );

  this.clean();

  return results;
}

/**
 * Process a record with an asynchronous handler
 *
 * @param record Record to be processed
 * @returns Promise resolving to the success or failure response for the record
 */
public abstract asyncProcessRecord(
record: BaseRecord
): Promise<SuccessResponse | FailureResponse>;

/**
* Clean class instance after processing
*/
Expand All @@ -50,7 +84,6 @@ abstract class BasePartialProcessor {
exception: Error
): FailureResponse {
const entry: FailureResponse = ['fail', exception.message, record];
// console.debug('Record processing exception: ' + exception.message);
this.exceptions.push(exception);
this.failureMessages.push(record);

Expand All @@ -66,12 +99,19 @@ abstract class BasePartialProcessor {
* Call instance's handler for each record
* @returns List of processed records
*/
public async process(): Promise<(SuccessResponse | FailureResponse)[]> {
public process(): (SuccessResponse | FailureResponse)[] {
/**
* If this is an async processor, user should have called asyncProcess instead,
* so we call the method early to throw the error early thus failing fast.
*/
if (this.constructor.name === 'AsyncBatchProcessor') {
this.processRecord(this.records[0]);
}
this.prepare();

const processedRecords: (SuccessResponse | FailureResponse)[] = [];
for (const record of this.records) {
processedRecords.push(await this.processRecord(record));
processedRecords.push(this.processRecord(record));
}

this.clean();
Expand All @@ -85,7 +125,7 @@ abstract class BasePartialProcessor {
*/
public abstract processRecord(
record: BaseRecord
): Promise<SuccessResponse | FailureResponse>;
): SuccessResponse | FailureResponse;

/**
* Set class instance attributes before execution
Expand Down
27 changes: 13 additions & 14 deletions packages/batch/src/BatchProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,29 @@
import { BasePartialBatchProcessor } from './BasePartialBatchProcessor';
import type { BaseRecord, FailureResponse, SuccessResponse } from './types';

/**
* Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB
*/
import {
BasePartialBatchProcessor,
BaseRecord,
FailureResponse,
SuccessResponse,
} from '.';

class BatchProcessor extends BasePartialBatchProcessor {
public async asyncProcessRecord(
_record: BaseRecord
): Promise<SuccessResponse | FailureResponse> {
throw new Error('Not implemented. Use process() instead.');
}

/**
* Process a record with instance's handler
* @param record Batch record to be processed
* @returns response of success or failure
*/
public async processRecord(
record: BaseRecord
): Promise<SuccessResponse | FailureResponse> {
public processRecord(record: BaseRecord): SuccessResponse | FailureResponse {
try {
const data = this.toBatchType(record, this.eventType);

const result = await this.handler(data, this.options);
const result = this.handler(data, this.options);

return this.successHandler(record, result);
} catch (e) {
return this.failureHandler(record, e as Error);
} catch (error) {
return this.failureHandler(record, error as Error);
}
}
}
Expand Down
9 changes: 5 additions & 4 deletions packages/batch/src/SqsFifoPartialProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { BatchProcessor, EventType, FailureResponse, SuccessResponse } from '.';
import { BatchProcessor } from './BatchProcessor';
import { EventType } from './constants';
import type { FailureResponse, SuccessResponse } from './types';

/**
* Process native partial responses from SQS FIFO queues
Expand All @@ -14,9 +16,8 @@ class SqsFifoPartialProcessor extends BatchProcessor {
* Call instance's handler for each record.
* When the first failed message is detected, the process is short-circuited
* And the remaining messages are reported as failed items
* TODO: change to synchronous execution if possible
*/
public async process(): Promise<(SuccessResponse | FailureResponse)[]> {
public process(): (SuccessResponse | FailureResponse)[] {
this.prepare();

const processedRecords: (SuccessResponse | FailureResponse)[] = [];
Expand All @@ -28,7 +29,7 @@ class SqsFifoPartialProcessor extends BatchProcessor {
return this.shortCircuitProcessing(currentIndex, processedRecords);
}

processedRecords.push(await this.processRecord(record));
processedRecords.push(this.processRecord(record));
currentIndex++;
}

Expand Down
38 changes: 38 additions & 0 deletions packages/batch/src/asyncProcessPartialResponse.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { BasePartialBatchProcessor } from './BasePartialBatchProcessor';
import { EventType } from './constants';
import type {
BaseRecord,
BatchProcessingOptions,
PartialItemFailureResponse,
} from './types';

/**
 * Higher level function to handle batch event processing with an
 * asynchronous record handler.
 *
 * Registers the event's records, the handler and the options on the given
 * processor, awaits `processor.asyncProcess()`, and returns
 * `processor.response()`.
 *
 * @param event Lambda's original event
 * @param recordHandler Callable function to process each record from the batch
 * @param processor Batch processor to handle partial failure cases
 * @param options Optional batch processing options forwarded to the processor
 * @returns Lambda Partial Batch Response
 * @throws Error when the event has no `Records` property
 */
const asyncProcessPartialResponse = async (
  event: { Records: BaseRecord[] },
  recordHandler: CallableFunction,
  processor: BasePartialBatchProcessor,
  options?: BatchProcessingOptions
): Promise<PartialItemFailureResponse> => {
  if (event.Records) {
    processor.register(event.Records, recordHandler, options);

    await processor.asyncProcess();

    return processor.response();
  }

  // No records on the event: report which event types are supported.
  const supportedEventTypes = Object.values(EventType).join(',');
  throw new Error(
    `Failed to convert event to record batch for processing.\nPlease ensure batch event is a valid ${supportedEventTypes} event.`
  );
};

export { asyncProcessPartialResponse };
8 changes: 4 additions & 4 deletions packages/batch/src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/**
dreamorosi marked this conversation as resolved.
Show resolved Hide resolved
* Constants for batch processor classes
*/
import { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda';
import type { PartialItemFailureResponse, EventSourceDataClassTypes } from '.';
import type {
PartialItemFailureResponse,
EventSourceDataClassTypes,
} from './types';

const EventType = {
SQS: 'SQS',
Expand Down
4 changes: 0 additions & 4 deletions packages/batch/src/errors.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
/**
* Batch processing exceptions
*/

/**
* Base error type for batch processing
* All errors thrown by major failures extend this base class
Expand Down
2 changes: 2 additions & 0 deletions packages/batch/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,7 @@ export * from './types';
export * from './BasePartialProcessor';
export * from './BasePartialBatchProcessor';
export * from './BatchProcessor';
export * from './AsyncBatchProcessor';
export * from './processPartialResponse';
export * from './asyncProcessPartialResponse';
export * from './SqsFifoPartialProcessor';
14 changes: 7 additions & 7 deletions packages/batch/src/processPartialResponse.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import {
BasePartialBatchProcessor,
import { BasePartialBatchProcessor } from './BasePartialBatchProcessor';
import { EventType } from './constants';
import type {
BaseRecord,
BatchProcessingOptions,
EventType,
PartialItemFailureResponse,
} from '.';
} from './types';

/**
* Higher level function to handle batch event processing
Expand All @@ -13,12 +13,12 @@ import {
* @param processor Batch processor to handle partial failure cases
* @returns Lambda Partial Batch Response
*/
const processPartialResponse = async (
const processPartialResponse = (
event: { Records: BaseRecord[] },
recordHandler: CallableFunction,
processor: BasePartialBatchProcessor,
options?: BatchProcessingOptions
): Promise<PartialItemFailureResponse> => {
): PartialItemFailureResponse => {
if (!event.Records) {
const eventTypes: string = Object.values(EventType).toString();
throw new Error(
Expand All @@ -30,7 +30,7 @@ const processPartialResponse = async (

processor.register(event.Records, recordHandler, options);

await processor.process();
processor.process();

return processor.response();
};
Expand Down
4 changes: 2 additions & 2 deletions packages/batch/src/types.ts
dreamorosi marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ type RecordValue = unknown;
type BaseRecord = { [key: string]: RecordValue } | EventSourceDataClassTypes;

type ResultType = unknown;
type SuccessResponse = [string, ResultType, EventSourceDataClassTypes];
type SuccessResponse = ['success', ResultType, EventSourceDataClassTypes];

type FailureResponse = [string, string, EventSourceDataClassTypes];
type FailureResponse = ['fail', string, EventSourceDataClassTypes];

type PartialItemFailures = { itemIdentifier: string };
type PartialItemFailureResponse = { batchItemFailures: PartialItemFailures[] };
Expand Down
Loading