From c7bb8c30d11c98efc8a9eb747309fe0240f5a5bf Mon Sep 17 00:00:00 2001 From: Andrea Amorosi Date: Wed, 5 Jul 2023 14:16:37 +0200 Subject: [PATCH 1/4] docs: new name --- packages/idempotency/README.md | 95 ++++++++++++++++++++-- packages/idempotency/src/makeIdempotent.ts | 2 +- 2 files changed, 89 insertions(+), 8 deletions(-) diff --git a/packages/idempotency/README.md b/packages/idempotency/README.md index 5fb4acd204..15efa33f7d 100644 --- a/packages/idempotency/README.md +++ b/packages/idempotency/README.md @@ -53,12 +53,37 @@ Next, review the IAM permissions attached to your AWS Lambda function and make s ### Function wrapper -You can make any function idempotent, and safe to retry, by wrapping it using the `makeFunctionIdempotent` higher-order function. +You can make any function idempotent, and safe to retry, by wrapping it using the `makeIdempotent` higher-order function. The function wrapper takes a reference to the function to be made idempotent as first argument, and an object with options as second argument. +When you wrap your Lambda handler function, the utility uses the content of the `event` parameter to handle the idempotency logic. + ```ts -import { makeFunctionIdempotent } from '@aws-lambda-powertools/idempotency'; +import { makeIdempotent } from '@aws-lambda-powertools/idempotency'; +import { DynamoDBPersistenceLayer } from '@aws-lambda-powertools/idempotency/dynamodb'; +import type { Context, APIGatewayProxyEvent } from 'aws-lambda'; + +const persistenceStore = new DynamoDBPersistenceLayer({ + tableName: 'idempotencyTableName', +}); + +const myHandler = async ( + event: APIGatewayProxyEvent, + _context: Context +): Promise => { + // your code goes here here +}; + +export const handler = makeIdempotent(myHandler, { + persistenceStore, +}); +``` + +You can also use the `makeIdempotent` function to wrap any other arbitrary function, not just Lambda handlers. + +```ts +import { makeIdempotent } from '@aws-lambda-powertools/idempotency'; import { DynamoDBPersistenceLayer } from '@aws-lambda-powertools/idempotency/dynamodb'; import type { Context, SQSEvent, SQSRecord } from 'aws-lambda'; @@ -70,20 +95,76 @@ const processingFunction = async (payload: SQSRecord): Promise => { // your code goes here here }; +const processIdempotently = makeIdempotent(processingFunction, { + persistenceStore, +}); + export const handler = async ( event: SQSEvent, _context: Context ): Promise => { for (const record of event.Records) { - await makeFunctionIdempotent(processingFunction, { - dataKeywordArgument: 'transactionId', - persistenceStore, - }); + await processIdempotently(record); } }; ``` -Note that we are specifying a `dataKeywordArgument` option, this tells the Idempotency utility which field(s) will be used as idempotency key. +If your function has multiple arguments, you can use the `dataIndexArgument` option to specify which argument should be used as the idempotency key. + +```ts +import { makeIdempotent } from '@aws-lambda-powertools/idempotency'; +import { DynamoDBPersistenceLayer } from '@aws-lambda-powertools/idempotency/dynamodb'; +import type { Context, SQSEvent, SQSRecord } from 'aws-lambda'; + +const persistenceStore = new DynamoDBPersistenceLayer({ + tableName: 'idempotencyTableName', +}); + +const processingFunction = async (payload: SQSRecord, customerId: string): Promise => { + // your code goes here here +}; + +const processIdempotently = makeIdempotent(processingFunction, { + persistenceStore, + // this tells the utility to use the second argument (`customerId`) as the idempotency key + dataIndexArgument: 1, +}); + +export const handler = async ( + event: SQSEvent, + _context: Context +): Promise => { + for (const record of event.Records) { + await processIdempotently(record, 'customer-123'); + } +}; +``` + +Note that you can also specify a JMESPath expression in the Idempotency config object to select a subset of the event payload as the idempotency key. This is useful when dealing with payloads that contain timestamps or request ids. + +```ts +import { makeIdempotent, IdempotencyConfig } from '@aws-lambda-powertools/idempotency'; +import { DynamoDBPersistenceLayer } from '@aws-lambda-powertools/idempotency/dynamodb'; +import type { Context, APIGatewayProxyEvent } from 'aws-lambda'; + +const persistenceStore = new DynamoDBPersistenceLayer({ + tableName: 'idempotencyTableName', +}); + +const myHandler = async ( + event: APIGatewayProxyEvent, + _context: Context +): Promise => { + // your code goes here here +}; + +export const handler = makeIdempotent(myHandler, { + persistenceStore, + config: new IdempotencyConfig({ + eventKeyJmespath: 'requestContext.identity.user', + }), +}); +``` Check the [docs](https://docs.powertools.aws.dev/lambda/typescript/latest/utilities/idempotency/) for more examples. diff --git a/packages/idempotency/src/makeIdempotent.ts b/packages/idempotency/src/makeIdempotent.ts index a44079d493..8bae3cdf6d 100644 --- a/packages/idempotency/src/makeIdempotent.ts +++ b/packages/idempotency/src/makeIdempotent.ts @@ -53,7 +53,7 @@ const isOptionsWithDataIndexArgument = ( * }; * * // we use wrapper to make processing function idempotent with DynamoDBPersistenceLayer - * const processIdempotently = makeFunctionIdempotent(processRecord, { + * const processIdempotently = makeIdempotent(processRecord, { * persistenceStore: new DynamoDBPersistenceLayer() * dataKeywordArgument: 'transactionId', // keyword argument to hash the payload and the result * }); From 2d042df1b18bc6cd6f3ad052601d1d3a16cb396c Mon Sep 17 00:00:00 2001 From: Andrea Amorosi Date: Wed, 5 Jul 2023 14:19:29 +0200 Subject: [PATCH 2/4] chore: rename e2e files --- ...ts => makeIdempotent.test.FunctionCode.ts} | 35 +++++++++---------- ...mpotent.test.ts => makeIdempotent.test.ts} | 7 ++-- .../tests/unit/makeIdempotent.test.ts | 2 +- 3 files changed, 21 insertions(+), 23 deletions(-) rename packages/idempotency/tests/e2e/{makeFunctionIdempotent.test.FunctionCode.ts => makeIdempotent.test.FunctionCode.ts} (67%) rename packages/idempotency/tests/e2e/{makeFunctionIdempotent.test.ts => makeIdempotent.test.ts} (97%) diff --git a/packages/idempotency/tests/e2e/makeFunctionIdempotent.test.FunctionCode.ts b/packages/idempotency/tests/e2e/makeIdempotent.test.FunctionCode.ts similarity index 67% rename from packages/idempotency/tests/e2e/makeFunctionIdempotent.test.FunctionCode.ts rename to packages/idempotency/tests/e2e/makeIdempotent.test.FunctionCode.ts index 8e947f7950..14fc0a7c8a 100644 --- a/packages/idempotency/tests/e2e/makeFunctionIdempotent.test.FunctionCode.ts +++ b/packages/idempotency/tests/e2e/makeIdempotent.test.FunctionCode.ts @@ -1,6 +1,6 @@ import type { Context } from 'aws-lambda'; import { DynamoDBPersistenceLayer } from '../../src/persistence/DynamoDBPersistenceLayer'; -import { makeFunctionIdempotent } from '../../src'; +import { makeIdempotent } from '../../src'; import { Logger } from '@aws-lambda-powertools/logger'; import { IdempotencyConfig } from '../../src'; @@ -34,39 +34,38 @@ const processRecord = (record: Record): string => { }; const idempotencyConfig = new IdempotencyConfig({}); - -const processIdempotently = makeFunctionIdempotent(processRecord, { +const processIdempotently = makeIdempotent(processRecord, { persistenceStore: dynamoDBPersistenceLayer, - dataKeywordArgument: 'foo', - config: idempotencyConfig, }); export const handler = async ( - _event: EventRecords, - _context: Context + event: EventRecords, + context: Context ): Promise => { - idempotencyConfig.registerLambdaContext(_context); - for (const record of _event.records) { - const result = await processIdempotently(record); + idempotencyConfig.registerLambdaContext(context); + for (const record of event.records) { + const result = processIdempotently(record); logger.info(result.toString()); } return Promise.resolve(); }; -const processIdempotentlyCustomized = makeFunctionIdempotent(processRecord, { +const idempotencyConfigWithSelection = new IdempotencyConfig({ + eventKeyJmesPath: 'foo', +}); +const processIdempotentlyCustomized = makeIdempotent(processRecord, { persistenceStore: ddbPersistenceLayerCustomized, - dataKeywordArgument: 'foo', - config: idempotencyConfig, + config: idempotencyConfigWithSelection, }); export const handlerCustomized = async ( - _event: EventRecords, - _context: Context + event: EventRecords, + context: Context ): Promise => { - idempotencyConfig.registerLambdaContext(_context); - for (const record of _event.records) { - const result = await processIdempotentlyCustomized(record); + idempotencyConfigWithSelection.registerLambdaContext(context); + for (const record of event.records) { + const result = processIdempotentlyCustomized(record); logger.info(result.toString()); } diff --git a/packages/idempotency/tests/e2e/makeFunctionIdempotent.test.ts b/packages/idempotency/tests/e2e/makeIdempotent.test.ts similarity index 97% rename from packages/idempotency/tests/e2e/makeFunctionIdempotent.test.ts rename to packages/idempotency/tests/e2e/makeIdempotent.test.ts index cd4344b024..cf017a8961 100644 --- a/packages/idempotency/tests/e2e/makeFunctionIdempotent.test.ts +++ b/packages/idempotency/tests/e2e/makeIdempotent.test.ts @@ -1,7 +1,7 @@ /** - * Test makeFunctionIdempotent + * Test makeIdempotent function * - * @group e2e/idempotency + * @group e2e/idempotency/makeIdempotent */ import { generateUniqueName, @@ -37,8 +37,7 @@ const stackName = generateUniqueName( runtime, 'makeFnIdempotent' ); -const makeFunctionIdepmpotentFile = - 'makeFunctionIdempotent.test.FunctionCode.ts'; +const makeFunctionIdepmpotentFile = 'makeIdempotent.test.FunctionCode.ts'; const app = new App(); diff --git a/packages/idempotency/tests/unit/makeIdempotent.test.ts b/packages/idempotency/tests/unit/makeIdempotent.test.ts index 0eb229e002..1fa880909e 100644 --- a/packages/idempotency/tests/unit/makeIdempotent.test.ts +++ b/packages/idempotency/tests/unit/makeIdempotent.test.ts @@ -1,5 +1,5 @@ /** - * Test Function Wrapper + * Test makeIdempotent Function Wrapper * * @group unit/idempotency/makeIdempotent */ From 149f465066e10f223e25a26190c1aabfce73c454 Mon Sep 17 00:00:00 2001 From: Andrea Amorosi Date: Sat, 8 Jul 2023 18:37:27 +0200 Subject: [PATCH 3/4] tests(idempotency): expand integration tests --- .github/workflows/run-e2e-tests.yml | 2 +- .../src/middleware/makeHandlerIdempotent.ts | 142 +++++-- .../BasePersistenceLayerInterface.ts | 3 +- ...makeHandlerIdempotent.test.FunctionCode.ts | 119 ++++++ .../tests/e2e/makeHandlerIdempotent.test.ts | 390 ++++++++++++++++++ .../e2e/makeIdempotent.test.FunctionCode.ts | 104 +++-- .../tests/e2e/makeIdempotent.test.ts | 272 ++++++++---- .../tests/helpers/idempotencyUtils.ts | 7 +- 8 files changed, 890 insertions(+), 149 deletions(-) create mode 100644 packages/idempotency/tests/e2e/makeHandlerIdempotent.test.FunctionCode.ts create mode 100644 packages/idempotency/tests/e2e/makeHandlerIdempotent.test.ts diff --git a/.github/workflows/run-e2e-tests.yml b/.github/workflows/run-e2e-tests.yml index 1950c89b84..fb4186c420 100644 --- a/.github/workflows/run-e2e-tests.yml +++ b/.github/workflows/run-e2e-tests.yml @@ -19,7 +19,7 @@ jobs: contents: read strategy: matrix: - package: [logger, metrics, tracer, parameters] + package: [logger, metrics, tracer, parameters, idempotency] version: [14, 16, 18] fail-fast: false steps: diff --git a/packages/idempotency/src/middleware/makeHandlerIdempotent.ts b/packages/idempotency/src/middleware/makeHandlerIdempotent.ts index 153e19153b..2efe0194e8 100644 --- a/packages/idempotency/src/middleware/makeHandlerIdempotent.ts +++ b/packages/idempotency/src/middleware/makeHandlerIdempotent.ts @@ -1,6 +1,9 @@ import { IdempotencyHandler } from '../IdempotencyHandler'; import { IdempotencyConfig } from '../IdempotencyConfig'; -import { cleanupMiddlewares } from '@aws-lambda-powertools/commons/lib/middleware'; +import { + cleanupMiddlewares, + IDEMPOTENCY_KEY, +} from '@aws-lambda-powertools/commons/lib/middleware'; import { IdempotencyInconsistentStateError, IdempotencyItemAlreadyExistsError, @@ -9,33 +12,92 @@ import { import { IdempotencyRecord } from '../persistence'; import { MAX_RETRIES } from '../constants'; import type { IdempotencyLambdaHandlerOptions } from '../types'; +import type { BasePersistenceLayerInterface } from '../persistence'; import { MiddlewareLikeObj, MiddyLikeRequest, JSONValue, } from '@aws-lambda-powertools/commons'; +/** + * @internal + * Utility function to get the persistence store from the request internal storage + * + * @param request The Middy request object + * @returns The persistence store from the request internal + */ +const getPersistenceStoreFromRequestInternal = ( + request: MiddyLikeRequest +): BasePersistenceLayerInterface => { + const persistenceStore = request.internal[ + `${IDEMPOTENCY_KEY}.idempotencyPersistenceStore` + ] as BasePersistenceLayerInterface; + if (!persistenceStore) { + throw new Error( + 'Idempotency persistence store not found in request internal' + ); + } + + return persistenceStore; +}; + +/** + * @internal + * Utility function to set the persistence store in the request internal storage + * + * @param request The Middy request object + * @param persistenceStore The persistence store to set in the request internal + */ +const setPersistenceStoreInRequestInternal = ( + request: MiddyLikeRequest, + persistenceStore: BasePersistenceLayerInterface +): void => { + request.internal[`${IDEMPOTENCY_KEY}.idempotencyPersistenceStore`] = + persistenceStore; +}; + +/** + * @internal + * Utility function to set a flag in the request internal storage to skip the idempotency middleware + * This is used to skip the idempotency middleware when the idempotency key is not present in the payload + * or when idempotency is disabled + * + * @param request The Middy request object + */ +const setIdempotencySkipFlag = (request: MiddyLikeRequest): void => { + request.internal[`${IDEMPOTENCY_KEY}.skip`] = true; +}; + +/** + * @internal + * Utility function to get the idempotency key from the request internal storage + * and determine if the request should skip the idempotency middleware + * + * @param request The Middy request object + * @returns Whether the idempotency middleware should be skipped + */ +const shouldSkipIdempotency = (request: MiddyLikeRequest): boolean => { + return request.internal[`${IDEMPOTENCY_KEY}.skip`] === true; +}; + /** * A middy middleware to make your Lambda Handler idempotent. * * @example * ```typescript - * import { - * makeHandlerIdempotent, - * DynamoDBPersistenceLayer, - * } from '@aws-lambda-powertools/idempotency'; + * import { makeHandlerIdempotent } from '@aws-lambda-powertools/idempotency/middleware'; + * import { DynamoDBPersistenceLayer } from '@aws-lambda-powertools/idempotency/dynamodb'; * import middy from '@middy/core'; * - * const dynamoDBPersistenceLayer = new DynamoDBPersistenceLayer({ - * tableName: 'idempotencyTable', + * const persistenceStore = new DynamoDBPersistenceLayer({ + * tableName: 'idempotencyTable', * }); * - * const lambdaHandler = async (_event: unknown, _context: unknown) => { - * //... - * }; - * - * export const handler = middy(lambdaHandler) - * .use(makeHandlerIdempotent({ persistenceStore: dynamoDBPersistenceLayer })); + * export const handler = middy( + * async (_event: unknown, _context: unknown): Promise => { + * // your code goes here + * } + * ).use(makeHandlerIdempotent({ persistenceStore: dynamoDBPersistenceLayer })); * ``` * * @param options - Options for the idempotency middleware @@ -43,17 +105,6 @@ import { const makeHandlerIdempotent = ( options: IdempotencyLambdaHandlerOptions ): MiddlewareLikeObj => { - const idempotencyConfig = options.config - ? options.config - : new IdempotencyConfig({}); - const persistenceStore = options.persistenceStore; - persistenceStore.configure({ - config: idempotencyConfig, - }); - - // keep the flag for after and onError checks - let shouldSkipIdempotency = false; - /** * Function called before the handler is executed. * @@ -76,7 +127,16 @@ const makeHandlerIdempotent = ( request: MiddyLikeRequest, retryNo = 0 ): Promise => { + const idempotencyConfig = options.config + ? options.config + : new IdempotencyConfig({}); + const persistenceStore = options.persistenceStore; + persistenceStore.configure({ + config: idempotencyConfig, + }); + if ( + !idempotencyConfig.isEnabled() || IdempotencyHandler.shouldSkipIdempotency( idempotencyConfig.eventKeyJmesPath, idempotencyConfig.throwOnNoIdempotencyKey, @@ -84,10 +144,17 @@ const makeHandlerIdempotent = ( ) ) { // set the flag to skip checks in after and onError - shouldSkipIdempotency = true; + setIdempotencySkipFlag(request); return; } + + /** + * Store the persistence store in the request internal so that it can be + * used in after and onError + */ + setPersistenceStoreInRequestInternal(request, persistenceStore); + try { await persistenceStore.saveInProgress( request.event as JSONValue, @@ -129,6 +196,7 @@ const makeHandlerIdempotent = ( } } }; + /** * Function called after the handler has executed successfully. * @@ -139,9 +207,10 @@ const makeHandlerIdempotent = ( * @param request - The Middy request object */ const after = async (request: MiddyLikeRequest): Promise => { - if (shouldSkipIdempotency) { + if (shouldSkipIdempotency(request)) { return; } + const persistenceStore = getPersistenceStoreFromRequestInternal(request); try { await persistenceStore.saveSuccess( request.event as JSONValue, @@ -164,9 +233,10 @@ const makeHandlerIdempotent = ( * @param request - The Middy request object */ const onError = async (request: MiddyLikeRequest): Promise => { - if (shouldSkipIdempotency) { + if (shouldSkipIdempotency(request)) { return; } + const persistenceStore = getPersistenceStoreFromRequestInternal(request); try { await persistenceStore.deleteRecord(request.event as JSONValue); } catch (error) { @@ -177,19 +247,11 @@ const makeHandlerIdempotent = ( } }; - if (idempotencyConfig.isEnabled()) { - return { - before, - after, - onError, - }; - } else { - return { - before: () => { - return undefined; - }, - }; - } + return { + before, + after, + onError, + }; }; export { makeHandlerIdempotent }; diff --git a/packages/idempotency/src/persistence/BasePersistenceLayerInterface.ts b/packages/idempotency/src/persistence/BasePersistenceLayerInterface.ts index ce0d68b5d7..f4e792082a 100644 --- a/packages/idempotency/src/persistence/BasePersistenceLayerInterface.ts +++ b/packages/idempotency/src/persistence/BasePersistenceLayerInterface.ts @@ -1,10 +1,11 @@ import { IdempotencyRecord } from './IdempotencyRecord'; import type { BasePersistenceLayerOptions } from '../types/BasePersistenceLayer'; +// TODO: move this to types folder interface BasePersistenceLayerInterface { configure(options?: BasePersistenceLayerOptions): void; isPayloadValidationEnabled(): boolean; - saveInProgress(data: unknown): Promise; + saveInProgress(data: unknown, remainingTimeInMillis?: number): Promise; saveSuccess(data: unknown, result: unknown): Promise; deleteRecord(data: unknown): Promise; getRecord(data: unknown): Promise; diff --git a/packages/idempotency/tests/e2e/makeHandlerIdempotent.test.FunctionCode.ts b/packages/idempotency/tests/e2e/makeHandlerIdempotent.test.FunctionCode.ts new file mode 100644 index 0000000000..510dcd399e --- /dev/null +++ b/packages/idempotency/tests/e2e/makeHandlerIdempotent.test.FunctionCode.ts @@ -0,0 +1,119 @@ +import type { Context } from 'aws-lambda'; +import { DynamoDBPersistenceLayer } from '../../src/persistence/DynamoDBPersistenceLayer'; +import { makeHandlerIdempotent } from '../../src/middleware'; +import { IdempotencyConfig } from '../../src'; +import { Logger } from '@aws-lambda-powertools/logger'; +import middy from '@middy/core'; + +const IDEMPOTENCY_TABLE_NAME = + process.env.IDEMPOTENCY_TABLE_NAME || 'table_name'; + +const dynamoDBPersistenceLayer = new DynamoDBPersistenceLayer({ + tableName: IDEMPOTENCY_TABLE_NAME, +}); +const logger = new Logger(); + +/** + * Test handler with sequential execution. + */ +export const handler = middy( + async (event: { foo: string }, context: Context) => { + logger.addContext(context); + logger.info(`foo`, { details: event.foo }); + + return event.foo; + } +).use( + makeHandlerIdempotent({ + persistenceStore: dynamoDBPersistenceLayer, + }) +); + +/** + * Test handler with parallel execution. + * + * We put a 1.5s delay in the handler to ensure that it doesn't return + * before the second call is made. This way the slowest call will be + * rejected and the fastest will be processed. + */ +export const handlerParallel = middy( + async (event: { foo: string }, context: Context) => { + logger.addContext(context); + + await new Promise((resolve) => setTimeout(resolve, 1500)); + + logger.info('Processed event', { details: event.foo }); + + return event.foo; + } +).use( + makeHandlerIdempotent({ + persistenceStore: dynamoDBPersistenceLayer, + }) +); + +/** + * Test handler with timeout and JMESPath expression to extract the + * idempotency key. + * + * We put a 0.5s delay in the handler to ensure that it will timeout + * (timeout is set to 1s). By the time the second call is made, the + * second call is made, the first idempotency record has expired. + */ +export const handlerTimeout = middy( + async (event: { foo: string; invocation: number }, context: Context) => { + logger.addContext(context); + + if (event.invocation === 0) { + await new Promise((resolve) => setTimeout(resolve, 2000)); + } + + logger.info('Processed event', { + details: event.foo, + }); + + return { + foo: event.foo, + invocation: event.invocation, + }; + } +).use( + makeHandlerIdempotent({ + persistenceStore: dynamoDBPersistenceLayer, + config: new IdempotencyConfig({ + eventKeyJmesPath: 'foo', + }), + }) +); + +/** + * Test handler with expired idempotency record. + * + * We configure the idempotency utility to expire records after 1s. + * By the time the second call is made, the first idempotency record + * has expired. The second call will be processed. We include a JMESPath + * expression to extract the idempotency key (`foo`) but we return the + * invocation number as well so that we can check that the second call + * was processed by looking at the value in the stored idempotency record. + */ +export const handlerExpired = middy( + async (event: { foo: string; invocation: number }, context: Context) => { + logger.addContext(context); + + logger.info('Processed event', { details: event.foo }); + + return { + foo: event.foo, + invocation: event.invocation, + }; + } +).use( + makeHandlerIdempotent({ + persistenceStore: dynamoDBPersistenceLayer, + config: new IdempotencyConfig({ + useLocalCache: false, + expiresAfterSeconds: 1, + eventKeyJmesPath: 'foo', + }), + }) +); diff --git a/packages/idempotency/tests/e2e/makeHandlerIdempotent.test.ts b/packages/idempotency/tests/e2e/makeHandlerIdempotent.test.ts new file mode 100644 index 0000000000..f5bef822db --- /dev/null +++ b/packages/idempotency/tests/e2e/makeHandlerIdempotent.test.ts @@ -0,0 +1,390 @@ +/** + * Test makeHandlerIdempotent middleware + * + * @group e2e/idempotency/makeHandlerIdempotent + */ +import { + generateUniqueName, + invokeFunction, + isValidRuntimeKey, +} from '../../../commons/tests/utils/e2eUtils'; +import { InvocationLogs } from '../../../commons/tests/utils/InvocationLogs'; +import { + RESOURCE_NAME_PREFIX, + SETUP_TIMEOUT, + TEARDOWN_TIMEOUT, + TEST_CASE_TIMEOUT, +} from './constants'; +import { + deployStack, + destroyStack, +} from '../../../commons/tests/utils/cdk-cli'; +import { v4 } from 'uuid'; +import { App, Stack } from 'aws-cdk-lib'; +import { DynamoDBClient } from '@aws-sdk/client-dynamodb'; +import { createHash } from 'node:crypto'; +import { ScanCommand } from '@aws-sdk/lib-dynamodb'; +import { createIdempotencyResources } from '../helpers/idempotencyUtils'; + +const runtime: string = process.env.RUNTIME || 'nodejs18x'; + +if (!isValidRuntimeKey(runtime)) { + throw new Error(`Invalid runtime key value: ${runtime}`); +} + +const uuid = v4(); +const stackName = generateUniqueName( + RESOURCE_NAME_PREFIX, + uuid, + runtime, + 'makeFnIdempotent' +); +const makeHandlerIdempotentFile = 'makeHandlerIdempotent.test.FunctionCode.ts'; + +const app = new App(); + +const ddb = new DynamoDBClient({}); +const stack = new Stack(app, stackName); + +const testDefault = 'default-sequential'; +const functionNameDefault = generateUniqueName( + RESOURCE_NAME_PREFIX, + uuid, + runtime, + `${testDefault}-fn` +); +const ddbTableNameDefault = generateUniqueName( + RESOURCE_NAME_PREFIX, + uuid, + runtime, + `${testDefault}-table` +); +createIdempotencyResources( + stack, + runtime, + ddbTableNameDefault, + makeHandlerIdempotentFile, + functionNameDefault, + 'handler' +); + +const testDefaultParallel = 'default-parallel'; +const functionNameDefaultParallel = generateUniqueName( + RESOURCE_NAME_PREFIX, + uuid, + runtime, + `${testDefaultParallel}-fn` +); +const ddbTableNameDefaultParallel = generateUniqueName( + RESOURCE_NAME_PREFIX, + uuid, + runtime, + `${testDefaultParallel}-table` +); +createIdempotencyResources( + stack, + runtime, + ddbTableNameDefaultParallel, + makeHandlerIdempotentFile, + functionNameDefaultParallel, + 'handlerParallel' +); + +const testTimeout = 'timeout'; +const functionNameTimeout = generateUniqueName( + RESOURCE_NAME_PREFIX, + uuid, + runtime, + `${testTimeout}-fn` +); +const ddbTableNameTimeout = generateUniqueName( + RESOURCE_NAME_PREFIX, + uuid, + runtime, + `${testTimeout}-table` +); +createIdempotencyResources( + stack, + runtime, + ddbTableNameTimeout, + makeHandlerIdempotentFile, + functionNameTimeout, + 'handlerTimeout', + undefined, + 2 +); + +const testExpired = 'expired'; +const functionNameExpired = generateUniqueName( + RESOURCE_NAME_PREFIX, + uuid, + runtime, + `${testExpired}-fn` +); +const ddbTableNameExpired = generateUniqueName( + RESOURCE_NAME_PREFIX, + uuid, + runtime, + `${testExpired}-table` +); +createIdempotencyResources( + stack, + runtime, + ddbTableNameExpired, + makeHandlerIdempotentFile, + functionNameExpired, + 'handlerExpired', + undefined, + 2 +); + +describe(`Idempotency E2E tests, middy middleware usage for runtime ${runtime}`, () => { + beforeAll(async () => { + await deployStack(app, stack); + }, SETUP_TIMEOUT); + + test( + 'when called twice with the same payload, it returns the same result and runs the handler once', + async () => { + // Prepare + const payload = { + foo: 'bar', + }; + const payloadHash = createHash('md5') + .update(JSON.stringify(payload)) + .digest('base64'); + + // Act + const logs = await invokeFunction( + functionNameDefault, + 2, + 'SEQUENTIAL', + payload, + false + ); + const functionLogs = logs.map((log) => log.getFunctionLogs()); + + // Assess + const idempotencyRecords = await ddb.send( + new ScanCommand({ + TableName: ddbTableNameDefault, + }) + ); + expect(idempotencyRecords.Items?.length).toEqual(1); + expect(idempotencyRecords.Items?.[0].id).toEqual( + `${functionNameDefault}#${payloadHash}` + ); + expect(idempotencyRecords.Items?.[0].data).toEqual('bar'); + expect(idempotencyRecords.Items?.[0].status).toEqual('COMPLETED'); + + // During the first invocation the handler should be called, so the logs should contain 1 log + expect(functionLogs[0]).toHaveLength(1); + // We test the content of the log as well as the presence of fields from the context, this + // ensures that the all the arguments are passed to the handler when made idempotent + expect(InvocationLogs.parseFunctionLog(functionLogs[0][0])).toEqual( + expect.objectContaining({ + message: 'foo', + details: 'bar', + function_name: functionNameDefault, + }) + ); + // During the second invocation the handler should not be called, so the logs should be empty + expect(functionLogs[1]).toHaveLength(0); + }, + TEST_CASE_TIMEOUT + ); + + test( + 'when two identical requests are sent in parallel, the handler is called only once', + async () => { + // Prepare + const payload = { + foo: 'bar', + }; + const payloadHash = createHash('md5') + .update(JSON.stringify(payload)) + .digest('base64'); + + // Act + const logs = await invokeFunction( + functionNameDefaultParallel, + 2, + 'PARALLEL', + payload, + false + ); + const functionLogs = logs.map((log) => log.getFunctionLogs()); + + // Assess + const idempotencyRecords = await ddb.send( + new ScanCommand({ + TableName: ddbTableNameDefaultParallel, + }) + ); + expect(idempotencyRecords.Items?.length).toEqual(1); + expect(idempotencyRecords.Items?.[0].id).toEqual( + `${functionNameDefaultParallel}#${payloadHash}` + ); + expect(idempotencyRecords.Items?.[0].data).toEqual('bar'); + expect(idempotencyRecords.Items?.[0].status).toEqual('COMPLETED'); + + /** + * Since the requests are sent in parallel we don't know which one will be processed first, + * however we expect that only on of them will be processed by the handler, while the other + * one will be rejected with IdempotencyAlreadyInProgressError. + * + * We filter the logs to find which one was successful and which one failed, then we check + * that they contain the expected logs. + */ + const successfulInvocationLogs = functionLogs.find( + (functionLog) => + functionLog.find((log) => log.includes('Processed event')) !== + undefined + ); + const failedInvocationLogs = functionLogs.find( + (functionLog) => + functionLog.find((log) => + log.includes('There is already an execution in progress') + ) !== undefined + ); + expect(successfulInvocationLogs).toHaveLength(1); + expect(failedInvocationLogs).toHaveLength(1); + }, + TEST_CASE_TIMEOUT + ); + + test( + 'when the function times out, the second request is processed correctly by the handler', + async () => { + // Prepare + const payload = { + foo: 'bar', + }; + const payloadHash = createHash('md5') + .update(JSON.stringify(payload.foo)) + .digest('base64'); + + // Act + const logs = await invokeFunction( + functionNameTimeout, + 2, + 'SEQUENTIAL', + payload, + true + ); + const functionLogs = logs.map((log) => log.getFunctionLogs()); + + // Assess + const idempotencyRecords = await ddb.send( + new ScanCommand({ + TableName: ddbTableNameTimeout, + }) + ); + expect(idempotencyRecords.Items?.length).toEqual(1); + expect(idempotencyRecords.Items?.[0].id).toEqual( + `${functionNameTimeout}#${payloadHash}` + ); + expect(idempotencyRecords.Items?.[0].data).toEqual({ + ...payload, + invocation: 1, + }); + expect(idempotencyRecords.Items?.[0].status).toEqual('COMPLETED'); + + // During the first invocation the function should timeout so the logs should contain 2 logs + expect(functionLogs[0]).toHaveLength(2); + expect(functionLogs[0][0]).toContain('Task timed out after'); + // During the second invocation the handler should be called and complete, so the logs should + // contain 1 log + expect(functionLogs[1]).toHaveLength(1); + expect(InvocationLogs.parseFunctionLog(functionLogs[1][0])).toEqual( + expect.objectContaining({ + message: 'Processed event', + details: 'bar', + function_name: functionNameTimeout, + }) + ); + }, + TEST_CASE_TIMEOUT + ); + + test( + 'when the idempotency record is expired, the second request is processed correctly by the handler', + async () => { + // Prepare + const payload = { + foo: 'bar', + }; + const payloadHash = createHash('md5') + .update(JSON.stringify(payload.foo)) + .digest('base64'); + + // Act + const logs = [ + ( + await invokeFunction( + functionNameExpired, + 1, + 'SEQUENTIAL', + { ...payload, invocation: 0 }, + false + ) + )[0], + ]; + // Wait for the idempotency record to expire + await new Promise((resolve) => setTimeout(resolve, 2000)); + logs.push( + ( + await invokeFunction( + functionNameExpired, + 1, + 'SEQUENTIAL', + { ...payload, invocation: 1 }, + false + ) + )[0] + ); + const functionLogs = logs.map((log) => log.getFunctionLogs()); + + // Assess + const idempotencyRecords = await ddb.send( + new ScanCommand({ + TableName: ddbTableNameExpired, + }) + ); + expect(idempotencyRecords.Items?.length).toEqual(1); + expect(idempotencyRecords.Items?.[0].id).toEqual( + `${functionNameExpired}#${payloadHash}` + ); + expect(idempotencyRecords.Items?.[0].data).toEqual({ + ...payload, + invocation: 1, + }); + expect(idempotencyRecords.Items?.[0].status).toEqual('COMPLETED'); + + // Both invocations should be successful and the logs should contain 1 log each + expect(functionLogs[0]).toHaveLength(1); + expect(InvocationLogs.parseFunctionLog(functionLogs[1][0])).toEqual( + expect.objectContaining({ + message: 'Processed event', + details: 'bar', + function_name: functionNameExpired, + }) + ); + // During the second invocation the handler should be called and complete, so the logs should + // contain 1 log + expect(functionLogs[1]).toHaveLength(1); + expect(InvocationLogs.parseFunctionLog(functionLogs[1][0])).toEqual( + expect.objectContaining({ + message: 'Processed event', + details: 'bar', + function_name: functionNameExpired, + }) + ); + }, + TEST_CASE_TIMEOUT + ); + + afterAll(async () => { + await destroyStack(app, stack); + }, TEARDOWN_TIMEOUT); +}); diff --git a/packages/idempotency/tests/e2e/makeIdempotent.test.FunctionCode.ts b/packages/idempotency/tests/e2e/makeIdempotent.test.FunctionCode.ts index 14fc0a7c8a..9786ddea0e 100644 --- a/packages/idempotency/tests/e2e/makeIdempotent.test.FunctionCode.ts +++ b/packages/idempotency/tests/e2e/makeIdempotent.test.FunctionCode.ts @@ -6,68 +6,100 @@ import { IdempotencyConfig } from '../../src'; const IDEMPOTENCY_TABLE_NAME = process.env.IDEMPOTENCY_TABLE_NAME || 'table_name'; + +// Default persistence layer const dynamoDBPersistenceLayer = new DynamoDBPersistenceLayer({ tableName: IDEMPOTENCY_TABLE_NAME, }); +// Customized persistence layer const ddbPersistenceLayerCustomized = new DynamoDBPersistenceLayer({ tableName: IDEMPOTENCY_TABLE_NAME, - dataAttr: 'dataattr', + dataAttr: 'dataAttr', keyAttr: 'customId', - expiryAttr: 'expiryattr', - statusAttr: 'statusattr', - inProgressExpiryAttr: 'inprogressexpiryattr', - staticPkValue: 'staticpkvalue', - validationKeyAttr: 'validationkeyattr', + expiryAttr: 'expiryAttr', + statusAttr: 'statusAttr', + inProgressExpiryAttr: 'inProgressExpiryAttr', + staticPkValue: 'staticPkValue', + validationKeyAttr: 'validationKeyAttr', }); -interface EventRecords { - records: Record[]; -} - const logger = new Logger(); -const processRecord = (record: Record): string => { - logger.info(`Got test event: ${JSON.stringify(record)}`); - - return 'Processing done: ' + record['foo']; -}; - +/** + * Test idempotent arbitrary function with default persistence layer configs. + */ const idempotencyConfig = new IdempotencyConfig({}); -const processIdempotently = makeIdempotent(processRecord, { - persistenceStore: dynamoDBPersistenceLayer, -}); +const processIdempotently = makeIdempotent( + (record: Record): string => { + logger.info('Got test event', { record }); + + return `Processing done: ${record.foo}`; + }, + { + persistenceStore: dynamoDBPersistenceLayer, + config: idempotencyConfig, + } +); -export const handler = async ( - event: EventRecords, +export const handlerDefault = async ( + event: { + records: Record[]; + }, context: Context ): Promise => { idempotencyConfig.registerLambdaContext(context); for (const record of event.records) { - const result = processIdempotently(record); - logger.info(result.toString()); + await processIdempotently(record); } - - return Promise.resolve(); }; +/** + * Test idempotent arbitrary function with customized persistence layer configs + * and JMESPath expression to enable payload validation. + */ const idempotencyConfigWithSelection = new IdempotencyConfig({ - eventKeyJmesPath: 'foo', -}); -const processIdempotentlyCustomized = makeIdempotent(processRecord, { - persistenceStore: ddbPersistenceLayerCustomized, - config: idempotencyConfigWithSelection, + payloadValidationJmesPath: 'foo', }); +const processIdempotentlyCustomized = makeIdempotent( + (baz: number, record: Record): Record => { + logger.info('Got test event', { baz, record }); + + return record; + }, + { + persistenceStore: ddbPersistenceLayerCustomized, + config: idempotencyConfigWithSelection, + dataIndexArgument: 1, + } +); export const handlerCustomized = async ( - event: EventRecords, + event: { + records: Record[]; + }, context: Context ): Promise => { idempotencyConfigWithSelection.registerLambdaContext(context); - for (const record of event.records) { - const result = processIdempotentlyCustomized(record); - logger.info(result.toString()); + for (const [idx, record] of event.records.entries()) { + await processIdempotentlyCustomized(idx, record); } - - return Promise.resolve(); }; + +/** + * Test idempotent Lambda handler with JMESPath expression to extract event key. + */ +export const handlerLambda = makeIdempotent( + async (event: { foo: string }, context: Context) => { + logger.addContext(context); + logger.info(`foo`, { details: event.foo }); + + return event.foo; + }, + { + persistenceStore: dynamoDBPersistenceLayer, + config: new IdempotencyConfig({ + eventKeyJmesPath: 'foo', + }), + } +); diff --git a/packages/idempotency/tests/e2e/makeIdempotent.test.ts b/packages/idempotency/tests/e2e/makeIdempotent.test.ts index cf017a8961..1906a8a16d 100644 --- a/packages/idempotency/tests/e2e/makeIdempotent.test.ts +++ b/packages/idempotency/tests/e2e/makeIdempotent.test.ts @@ -22,8 +22,9 @@ import { deployStack, destroyStack, } from '../../../commons/tests/utils/cdk-cli'; -import { GetCommand, ScanCommand } from '@aws-sdk/lib-dynamodb'; +import { ScanCommand } from '@aws-sdk/lib-dynamodb'; import { createIdempotencyResources } from '../helpers/idempotencyUtils'; +import { InvocationLogs } from '@aws-lambda-powertools/commons/tests/utils/InvocationLogs'; const runtime: string = process.env.RUNTIME || 'nodejs18x'; @@ -37,123 +38,139 @@ const stackName = generateUniqueName( runtime, 'makeFnIdempotent' ); -const makeFunctionIdepmpotentFile = 'makeIdempotent.test.FunctionCode.ts'; +const makeFunctionIdempotentFile = 'makeIdempotent.test.FunctionCode.ts'; const app = new App(); const ddb = new DynamoDBClient({ region: 'eu-west-1' }); const stack = new Stack(app, stackName); +const testDefault = 'default'; const functionNameDefault = generateUniqueName( RESOURCE_NAME_PREFIX, uuid, runtime, - 'default' + `${testDefault}-fn` +); +const ddbTableNameDefault = generateUniqueName( + RESOURCE_NAME_PREFIX, + uuid, + runtime, + `${testDefault}-table` ); -const ddbTableNameDefault = stackName + '-default-table'; createIdempotencyResources( stack, runtime, ddbTableNameDefault, - makeFunctionIdepmpotentFile, + makeFunctionIdempotentFile, functionNameDefault, - 'handler' + 'handlerDefault' ); -const functionNameCustom = generateUniqueName( +const testCustomConfig = 'customConfig'; +const functionNameCustomConfig = generateUniqueName( + RESOURCE_NAME_PREFIX, + uuid, + runtime, + `${testCustomConfig}-fn` +); +const ddbTableNameCustomConfig = generateUniqueName( RESOURCE_NAME_PREFIX, uuid, runtime, - 'custom' + `${testCustomConfig}-fn` ); -const ddbTableNameCustom = stackName + '-custom-table'; createIdempotencyResources( stack, runtime, - ddbTableNameCustom, - makeFunctionIdepmpotentFile, - functionNameCustom, + ddbTableNameCustomConfig, + makeFunctionIdempotentFile, + functionNameCustomConfig, 'handlerCustomized', 'customId' ); -const functionNameKeywordArg = generateUniqueName( +const testLambdaHandler = 'handler'; +const functionNameLambdaHandler = generateUniqueName( RESOURCE_NAME_PREFIX, uuid, runtime, - 'keywordarg' + `${testLambdaHandler}-fn` +); +const ddbTableNameLambdaHandler = generateUniqueName( + RESOURCE_NAME_PREFIX, + uuid, + runtime, + `${testLambdaHandler}-table` ); -const ddbTableNameKeywordArg = stackName + '-keywordarg-table'; createIdempotencyResources( stack, runtime, - ddbTableNameKeywordArg, - makeFunctionIdepmpotentFile, - functionNameKeywordArg, - 'handlerWithKeywordArgument' + ddbTableNameLambdaHandler, + makeFunctionIdempotentFile, + functionNameLambdaHandler, + 'handlerLambda' ); -describe('Idempotency e2e test function wrapper, default settings', () => { +describe(`Idempotency E2E tests, wrapper function usage for runtime`, () => { beforeAll(async () => { await deployStack(app, stack); }, SETUP_TIMEOUT); it( - 'when called twice, it returns the same result', + 'when called twice with the same payload, it returns the same result', async () => { + // Prepare const payload = { records: [ { id: 1, foo: 'bar' }, { id: 2, foo: 'baz' }, - { id: 3, foo: 'bar' }, + { id: 1, foo: 'bar' }, ], }; - const invokeStart = Date.now(); - await invokeFunction( + const payloadHashes = payload.records.map((record) => + createHash('md5').update(JSON.stringify(record)).digest('base64') + ); + + // Act + const logs = await invokeFunction( functionNameDefault, 2, 'SEQUENTIAL', payload, false ); + const functionLogs = logs.map((log) => log.getFunctionLogs()); - const payloadHashFirst = createHash('md5') - .update(JSON.stringify('bar')) - .digest('base64'); - const payloadHashSecond = createHash('md5') - .update(JSON.stringify('baz')) - .digest('base64'); - - const result = await ddb.send( - new ScanCommand({ TableName: ddbTableNameDefault }) - ); - expect(result?.Items?.length).toEqual(2); - - const resultFirst = await ddb.send( - new GetCommand({ + // Assess + const idempotencyRecords = await ddb.send( + new ScanCommand({ TableName: ddbTableNameDefault, - Key: { id: `${functionNameDefault}#${payloadHashFirst}` }, }) ); - expect(resultFirst?.Item?.data).toEqual('Processing done: bar'); - expect(resultFirst?.Item?.expiration).toBeGreaterThan(Date.now() / 1000); - expect(resultFirst?.Item?.in_progress_expiration).toBeGreaterThan( - invokeStart + // Since records 1 and 3 have the same payload, only 2 records should be created + expect(idempotencyRecords?.Items?.length).toEqual(2); + const idempotencyRecordsItems = idempotencyRecords.Items?.sort((a, b) => + a.expiration > b.expiration ? 1 : -1 ); - expect(resultFirst?.Item?.status).toEqual('COMPLETED'); - const resultSecond = await ddb.send( - new GetCommand({ - TableName: ddbTableNameDefault, - Key: { id: `${functionNameDefault}#${payloadHashSecond}` }, - }) - ); - expect(resultSecond?.Item?.data).toEqual('Processing done: baz'); - expect(resultSecond?.Item?.expiration).toBeGreaterThan(Date.now() / 1000); - expect(resultSecond?.Item?.in_progress_expiration).toBeGreaterThan( - invokeStart - ); - expect(resultSecond?.Item?.status).toEqual('COMPLETED'); + expect(idempotencyRecordsItems?.[0]).toStrictEqual({ + id: `${functionNameDefault}#${payloadHashes[0]}`, + data: 'Processing done: bar', + status: 'COMPLETED', + expiration: expect.any(Number), + in_progress_expiration: expect.any(Number), + }); + + expect(idempotencyRecordsItems?.[1]).toStrictEqual({ + id: `${functionNameDefault}#${payloadHashes[1]}`, + data: 'Processing done: baz', + status: 'COMPLETED', + expiration: expect.any(Number), + in_progress_expiration: expect.any(Number), + }); + + expect(functionLogs[0]).toHaveLength(2); }, TEST_CASE_TIMEOUT ); @@ -161,6 +178,7 @@ describe('Idempotency e2e test function wrapper, default settings', () => { test( 'when called with customized function wrapper, it creates ddb entry with custom attributes', async () => { + // Prepare const payload = { records: [ { id: 1, foo: 'bar' }, @@ -168,28 +186,144 @@ describe('Idempotency e2e test function wrapper, default settings', () => { { id: 3, foo: 'bar' }, ], }; - const payloadHash = createHash('md5').update('"bar"').digest('base64'); + const payloadHashes = payload.records.map((record) => + createHash('md5').update(JSON.stringify(record)).digest('base64') + ); + const validationHashes = payload.records.map((record) => + createHash('md5').update(JSON.stringify(record.foo)).digest('base64') + ); - const invocationLogsCustmozed = await invokeFunction( - functionNameCustom, + // Act + const logs = await invokeFunction( + functionNameCustomConfig, 2, 'SEQUENTIAL', payload, false ); - const result = await ddb.send( - new GetCommand({ - TableName: ddbTableNameCustom, - Key: { customId: `${functionNameCustom}#${payloadHash}` }, + const functionLogs = logs.map((log) => log.getFunctionLogs()); + + // Assess + const idempotencyRecords = await ddb.send( + new ScanCommand({ + TableName: ddbTableNameCustomConfig, + }) + ); + /** + * Each record should have a corresponding entry in the persistence store, + * if so then we sort the entries by expiry time and compare them to the + * expected values. Expiry times should be in the same order as the + * payload records. + */ + expect(idempotencyRecords.Items?.length).toEqual(3); + const idempotencyRecordsItems = idempotencyRecords.Items?.sort((a, b) => + a.expiryAttr > b.expiryAttr ? 1 : -1 + ); + + expect(idempotencyRecordsItems?.[0]).toStrictEqual({ + customId: `${functionNameCustomConfig}#${payloadHashes[0]}`, + dataAttr: payload.records[0], + statusAttr: 'COMPLETED', + expiryAttr: expect.any(Number), + inProgressExpiryAttr: expect.any(Number), + validationKeyAttr: validationHashes[0], + }); + + expect(idempotencyRecordsItems?.[1]).toStrictEqual({ + customId: `${functionNameCustomConfig}#${payloadHashes[1]}`, + dataAttr: payload.records[1], + statusAttr: 'COMPLETED', + expiryAttr: expect.any(Number), + inProgressExpiryAttr: expect.any(Number), + validationKeyAttr: validationHashes[1], + }); + + expect(idempotencyRecordsItems?.[2]).toStrictEqual({ + customId: `${functionNameCustomConfig}#${payloadHashes[2]}`, + dataAttr: payload.records[2], + statusAttr: 'COMPLETED', + expiryAttr: expect.any(Number), + inProgressExpiryAttr: expect.any(Number), + validationKeyAttr: validationHashes[2], + }); + + // During the first invocation, the processing function should have been called 3 times (once for each record) + expect(functionLogs[0]).toHaveLength(3); + expect(InvocationLogs.parseFunctionLog(functionLogs[0][0])).toEqual( + expect.objectContaining({ + baz: 0, // index of recursion in handler, assess that all function arguments are preserved + record: payload.records[0], + message: 'Got test event', + }) + ); + expect(InvocationLogs.parseFunctionLog(functionLogs[0][1])).toEqual( + expect.objectContaining({ + baz: 1, + record: payload.records[1], + message: 'Got test event', }) ); - console.log(result); - expect(result?.Item?.dataattr).toEqual('Processing done: bar'); - expect(result?.Item?.statusattr).toEqual('COMPLETED'); - expect(result?.Item?.expiryattr).toBeGreaterThan(Date.now() / 1000); - expect(invocationLogsCustmozed[0].getFunctionLogs().toString()).toContain( - 'Got test event' + expect(InvocationLogs.parseFunctionLog(functionLogs[0][2])).toEqual( + expect.objectContaining({ + baz: 2, + record: payload.records[2], + message: 'Got test event', + }) + ); + + // During the second invocation, the processing function should have been called 0 times (all records are idempotent) + expect(functionLogs[1]).toHaveLength(0); + }, + TEST_CASE_TIMEOUT + ); + + test( + 'when called twice with the same payload, it returns the same result and runs the handler once', + async () => { + // Prepare + const payload = { + foo: 'bar', + }; + const payloadHash = createHash('md5') + .update(JSON.stringify(payload.foo)) + .digest('base64'); + + // Act + const logs = await invokeFunction( + functionNameLambdaHandler, + 2, + 'SEQUENTIAL', + payload, + true + ); + const functionLogs = logs.map((log) => log.getFunctionLogs()); + + // Assess + const idempotencyRecords = await ddb.send( + new ScanCommand({ + TableName: ddbTableNameLambdaHandler, + }) + ); + expect(idempotencyRecords.Items?.length).toEqual(1); + expect(idempotencyRecords.Items?.[0].id).toEqual( + `${functionNameLambdaHandler}#${payloadHash}` + ); + expect(idempotencyRecords.Items?.[0].data).toEqual('bar'); + expect(idempotencyRecords.Items?.[0].status).toEqual('COMPLETED'); + + // During the first invocation the handler should be called, so the logs should contain 1 log + expect(functionLogs[0]).toHaveLength(1); + // We test the content of the log as well as the presence of fields from the context, this + // ensures that the all the arguments are passed to the handler when made idempotent + expect(InvocationLogs.parseFunctionLog(functionLogs[0][0])).toEqual( + expect.objectContaining({ + message: 'foo', + details: 'bar', + function_name: functionNameLambdaHandler, + }) ); + // During the second invocation the handler should not be called, so the logs should be empty + expect(functionLogs[1]).toHaveLength(0); }, TEST_CASE_TIMEOUT ); diff --git a/packages/idempotency/tests/helpers/idempotencyUtils.ts b/packages/idempotency/tests/helpers/idempotencyUtils.ts index 46e328f76a..aa3612d853 100644 --- a/packages/idempotency/tests/helpers/idempotencyUtils.ts +++ b/packages/idempotency/tests/helpers/idempotencyUtils.ts @@ -5,6 +5,7 @@ import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs'; import { TEST_RUNTIMES } from '../../../commons/tests/utils/e2eUtils'; import { BasePersistenceLayer } from '../../src/persistence'; import path from 'path'; +import { RetentionDays } from 'aws-cdk-lib/aws-logs'; export const createIdempotencyResources = ( stack: Stack, @@ -13,7 +14,8 @@ export const createIdempotencyResources = ( pathToFunction: string, functionName: string, handler: string, - ddbPkId?: string + ddbPkId?: string, + timeout?: number ): void => { const uniqueTableId = ddbTableName + v4().substring(0, 5); const ddbTable = new Table(stack, uniqueTableId, { @@ -31,12 +33,13 @@ export const createIdempotencyResources = ( runtime: TEST_RUNTIMES[runtime], functionName: functionName, entry: path.join(__dirname, `../e2e/${pathToFunction}`), - timeout: Duration.seconds(30), + timeout: Duration.seconds(timeout || 30), handler: handler, environment: { IDEMPOTENCY_TABLE_NAME: ddbTableName, POWERTOOLS_LOGGER_LOG_EVENT: 'true', }, + logRetention: RetentionDays.ONE_DAY, }); ddbTable.grantReadWriteData(nodeJsFunction); From 500306635838c2295140cd233ce56fe3fb431102 Mon Sep 17 00:00:00 2001 From: Andrea Amorosi Date: Sat, 8 Jul 2023 19:31:17 +0200 Subject: [PATCH 4/4] chore(idempotency): remove unreachable code --- packages/idempotency/src/middleware/makeHandlerIdempotent.ts | 5 ----- 1 file changed, 5 deletions(-) diff --git a/packages/idempotency/src/middleware/makeHandlerIdempotent.ts b/packages/idempotency/src/middleware/makeHandlerIdempotent.ts index 2efe0194e8..c9e750e53c 100644 --- a/packages/idempotency/src/middleware/makeHandlerIdempotent.ts +++ b/packages/idempotency/src/middleware/makeHandlerIdempotent.ts @@ -32,11 +32,6 @@ const getPersistenceStoreFromRequestInternal = ( const persistenceStore = request.internal[ `${IDEMPOTENCY_KEY}.idempotencyPersistenceStore` ] as BasePersistenceLayerInterface; - if (!persistenceStore) { - throw new Error( - 'Idempotency persistence store not found in request internal' - ); - } return persistenceStore; };