-
Notifications
You must be signed in to change notification settings - Fork 139
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(batch): Implementation of base batch processing classes (#1588)
* chore: init workspace * chore: init workspace * Initial base class implementation * Added BatchProcessor implementation, attempted fix for async * Added unit tests * Refactoring unit tests * Lint fix, updated docstrings * Added response and identifier typings * test(idempotency): improve integration tests for utility (#1591) * docs: new name * chore: rename e2e files * tests(idempotency): expand integration tests * chore(idempotency): remove unreachable code * Removed unnecessary type casting * Moved exports for handlers and factories * Updated imports, refactored randomization in factories * Refactored EventType to be const instead of enum * Refactored and added documentation for errors * Removed debugging line * chore(ci): add canary to layer deployment (#1593) * docs(idempotency): write utility docs (#1592) * docs: base docs * wip * chore: added paths to snippets tsconfig * chore: added page to docs menu * docs(idempotency): utility docs * highlights * chore: remove CDK mention * build(internal): bump semver from 5.7.1 to 5.7.2 (#1594) Bumps [semver](https://github.com/npm/node-semver) from 5.7.1 to 5.7.2. - [Release notes](https://github.com/npm/node-semver/releases) - [Changelog](https://github.com/npm/node-semver/blob/v5.7.2/CHANGELOG.md) - [Commits](npm/node-semver@v5.7.1...v5.7.2) --- updated-dependencies: - dependency-name: semver dependency-type: indirect ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * chore(idempotency): mark the utility ready public beta (#1595) * chore(idempotency): mark utility as public beta * chore: manually increment version in commons * docs(internal): update AWS SDK links to new docs (#1597) * chore(maintenance): remove parameters utility from layer bundling and layers e2e tests (#1599) * remove parameter from e2e tests * remove parameters from canary stack as well * chore(release): v1.11.1 [skip ci] * fix canary deploy in ci with correct workspace name (#1601) * chore: update layer ARN on documentation --------- Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: Andrea Amorosi <dreamorosi@gmail.com> Co-authored-by: Alexander Schueren <amelnyk@amazon.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: Release bot[bot] <aws-devax-open-source@amazon.com>
- Loading branch information
1 parent
1c8c4aa
commit 76bd7b8
Showing
11 changed files
with
969 additions
and
8 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
/** | ||
* Process batch and partially report failed items | ||
*/ | ||
import { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda'; | ||
import { | ||
BasePartialProcessor, | ||
BatchProcessingError, | ||
DATA_CLASS_MAPPING, | ||
DEFAULT_RESPONSE, | ||
EventSourceDataClassTypes, | ||
EventType, | ||
ItemIdentifier, | ||
BatchResponse, | ||
} from '.'; | ||
|
||
abstract class BasePartialBatchProcessor extends BasePartialProcessor { | ||
public COLLECTOR_MAPPING; | ||
|
||
public batchResponse: BatchResponse; | ||
|
||
public eventType: keyof typeof EventType; | ||
|
||
/** | ||
* Initializes base batch processing class | ||
* @param eventType Whether this is SQS, DynamoDB stream, or Kinesis data stream event | ||
*/ | ||
public constructor(eventType: keyof typeof EventType) { | ||
super(); | ||
this.eventType = eventType; | ||
this.batchResponse = DEFAULT_RESPONSE; | ||
this.COLLECTOR_MAPPING = { | ||
[EventType.SQS]: () => this.collectSqsFailures(), | ||
[EventType.KinesisDataStreams]: () => this.collectKinesisFailures(), | ||
[EventType.DynamoDBStreams]: () => this.collectDynamoDBFailures(), | ||
}; | ||
} | ||
|
||
/** | ||
* Report messages to be deleted in case of partial failures | ||
*/ | ||
public clean(): void { | ||
if (!this.hasMessagesToReport()) { | ||
return; | ||
} | ||
|
||
if (this.entireBatchFailed()) { | ||
throw new BatchProcessingError( | ||
'All records failed processing. ' + | ||
this.exceptions.length + | ||
' individual errors logged separately below.', | ||
this.exceptions | ||
); | ||
} | ||
|
||
const messages: ItemIdentifier[] = this.getMessagesToReport(); | ||
this.batchResponse = { batchItemFailures: messages }; | ||
} | ||
|
||
/** | ||
* Collects identifiers of failed items for a DynamoDB stream | ||
* @returns list of identifiers for failed items | ||
*/ | ||
public collectDynamoDBFailures(): ItemIdentifier[] { | ||
const failures: ItemIdentifier[] = []; | ||
|
||
for (const msg of this.failureMessages) { | ||
const msgId = (msg as DynamoDBRecord).dynamodb?.SequenceNumber; | ||
if (msgId) { | ||
failures.push({ itemIdentifier: msgId }); | ||
} | ||
} | ||
|
||
return failures; | ||
} | ||
|
||
/** | ||
* Collects identifiers of failed items for a Kinesis stream | ||
* @returns list of identifiers for failed items | ||
*/ | ||
public collectKinesisFailures(): ItemIdentifier[] { | ||
const failures: ItemIdentifier[] = []; | ||
|
||
for (const msg of this.failureMessages) { | ||
const msgId = (msg as KinesisStreamRecord).kinesis.sequenceNumber; | ||
failures.push({ itemIdentifier: msgId }); | ||
} | ||
|
||
return failures; | ||
} | ||
|
||
/** | ||
* Collects identifiers of failed items for an SQS batch | ||
* @returns list of identifiers for failed items | ||
*/ | ||
public collectSqsFailures(): ItemIdentifier[] { | ||
const failures: ItemIdentifier[] = []; | ||
|
||
for (const msg of this.failureMessages) { | ||
const msgId = (msg as SQSRecord).messageId; | ||
failures.push({ itemIdentifier: msgId }); | ||
} | ||
|
||
return failures; | ||
} | ||
|
||
/** | ||
* Determines whether all records in a batch failed to process | ||
* @returns true if all records resulted in exception results | ||
*/ | ||
public entireBatchFailed(): boolean { | ||
return this.exceptions.length == this.records.length; | ||
} | ||
|
||
/** | ||
* Collects identifiers for failed batch items | ||
* @returns formatted messages to use in batch deletion | ||
*/ | ||
public getMessagesToReport(): ItemIdentifier[] { | ||
return this.COLLECTOR_MAPPING[this.eventType](); | ||
} | ||
|
||
/** | ||
* Determines if any records failed to process | ||
* @returns true if any records resulted in exception | ||
*/ | ||
public hasMessagesToReport(): boolean { | ||
if (this.failureMessages.length != 0) { | ||
return true; | ||
} | ||
|
||
// console.debug('All ' + this.successMessages.length + ' records successfully processed'); | ||
|
||
return false; | ||
} | ||
|
||
/** | ||
* Remove results from previous execution | ||
*/ | ||
public prepare(): void { | ||
this.successMessages.length = 0; | ||
this.failureMessages.length = 0; | ||
this.exceptions.length = 0; | ||
this.batchResponse = DEFAULT_RESPONSE; | ||
} | ||
|
||
/** | ||
* @returns Batch items that failed processing, if any | ||
*/ | ||
public response(): BatchResponse { | ||
return this.batchResponse; | ||
} | ||
|
||
public toBatchType( | ||
record: EventSourceDataClassTypes, | ||
eventType: keyof typeof EventType | ||
): SQSRecord | KinesisStreamRecord | DynamoDBRecord { | ||
return DATA_CLASS_MAPPING[eventType](record); | ||
} | ||
} | ||
|
||
export { BasePartialBatchProcessor }; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
/** | ||
* Abstract class for batch processors | ||
*/ | ||
import { | ||
BaseRecord, | ||
EventSourceDataClassTypes, | ||
FailureResponse, | ||
ResultType, | ||
SuccessResponse, | ||
} from '.'; | ||
|
||
abstract class BasePartialProcessor { | ||
public exceptions: Error[]; | ||
|
||
public failureMessages: EventSourceDataClassTypes[]; | ||
|
||
public handler: CallableFunction; | ||
|
||
public records: BaseRecord[]; | ||
|
||
public successMessages: EventSourceDataClassTypes[]; | ||
|
||
/** | ||
* Initializes base processor class | ||
*/ | ||
public constructor() { | ||
this.successMessages = []; | ||
this.failureMessages = []; | ||
this.exceptions = []; | ||
this.records = []; | ||
this.handler = new Function(); | ||
} | ||
|
||
/** | ||
* Clean class instance after processing | ||
*/ | ||
public abstract clean(): void; | ||
|
||
/** | ||
* Keeps track of batch records that failed processing | ||
* @param record record that failed processing | ||
* @param exception exception that was thrown | ||
* @returns FailureResponse object with ["fail", exception, original record] | ||
*/ | ||
public failureHandler( | ||
record: EventSourceDataClassTypes, | ||
exception: Error | ||
): FailureResponse { | ||
const entry: FailureResponse = ['fail', exception.message, record]; | ||
// console.debug('Record processing exception: ' + exception.message); | ||
this.exceptions.push(exception); | ||
this.failureMessages.push(record); | ||
|
||
return entry; | ||
} | ||
|
||
/** | ||
* Prepare class instance before processing | ||
*/ | ||
public abstract prepare(): void; | ||
|
||
/** | ||
* Call instance's handler for each record | ||
* @returns List of processed records | ||
*/ | ||
public async process(): Promise<(SuccessResponse | FailureResponse)[]> { | ||
this.prepare(); | ||
|
||
const processedRecords: (SuccessResponse | FailureResponse)[] = []; | ||
for (const record of this.records) { | ||
processedRecords.push(await this.processRecord(record)); | ||
} | ||
|
||
this.clean(); | ||
|
||
return processedRecords; | ||
} | ||
|
||
/** | ||
* Process a record with the handler | ||
* @param record Record to be processed | ||
*/ | ||
public abstract processRecord( | ||
record: BaseRecord | ||
): Promise<SuccessResponse | FailureResponse>; | ||
|
||
/** | ||
* Set class instance attributes before execution | ||
* @param records List of records to be processed | ||
* @param handler CallableFunction to process entries of "records" | ||
* @returns this object | ||
*/ | ||
public register( | ||
records: BaseRecord[], | ||
handler: CallableFunction | ||
): BasePartialProcessor { | ||
this.records = records; | ||
this.handler = handler; | ||
|
||
return this; | ||
} | ||
|
||
/** | ||
* Keeps track of batch records that were processed successfully | ||
* @param record record that succeeded processing | ||
* @param result result from record handler | ||
* @returns SuccessResponse object with ["success", result, original record] | ||
*/ | ||
public successHandler( | ||
record: EventSourceDataClassTypes, | ||
result: ResultType | ||
): SuccessResponse { | ||
const entry: SuccessResponse = ['success', result, record]; | ||
this.successMessages.push(record); | ||
|
||
return entry; | ||
} | ||
} | ||
|
||
export { BasePartialProcessor }; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,31 @@ | ||
class BatchProcessor {} | ||
/** | ||
* Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB | ||
*/ | ||
import { | ||
BasePartialBatchProcessor, | ||
BaseRecord, | ||
FailureResponse, | ||
SuccessResponse, | ||
} from '.'; | ||
|
||
class BatchProcessor extends BasePartialBatchProcessor { | ||
/** | ||
* Process a record with instance's handler | ||
* @param record Batch record to be processed | ||
* @returns response of success or failure | ||
*/ | ||
public async processRecord( | ||
record: BaseRecord | ||
): Promise<SuccessResponse | FailureResponse> { | ||
try { | ||
const data = this.toBatchType(record, this.eventType); | ||
const result = await this.handler(data); | ||
|
||
return this.successHandler(record, result); | ||
} catch (e) { | ||
return this.failureHandler(record, e as Error); | ||
} | ||
} | ||
} | ||
|
||
export { BatchProcessor }; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
/** | ||
* Constants for batch processor classes | ||
*/ | ||
import { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda'; | ||
import type { BatchResponse, EventSourceDataClassTypes } from '.'; | ||
|
||
const EventType = { | ||
SQS: 'SQS', | ||
KinesisDataStreams: 'KinesisDataStreams', | ||
DynamoDBStreams: 'DynamoDBStreams', | ||
} as const; | ||
|
||
const DEFAULT_RESPONSE: BatchResponse = { | ||
batchItemFailures: [], | ||
}; | ||
|
||
const DATA_CLASS_MAPPING = { | ||
[EventType.SQS]: (record: EventSourceDataClassTypes) => record as SQSRecord, | ||
[EventType.KinesisDataStreams]: (record: EventSourceDataClassTypes) => | ||
record as KinesisStreamRecord, | ||
[EventType.DynamoDBStreams]: (record: EventSourceDataClassTypes) => | ||
record as DynamoDBRecord, | ||
}; | ||
|
||
export { EventType, DEFAULT_RESPONSE, DATA_CLASS_MAPPING }; |
Oops, something went wrong.