Skip to content

Commit

Permalink
fix(idempotency): skip persistence for optional idempotency key (#1507)
Browse files Browse the repository at this point in the history
* add skip idempotency step in the handler in specific cases
  • Loading branch information
Alexander Schueren committed Jun 20, 2023
1 parent 043b4ad commit b9fcef6
Show file tree
Hide file tree
Showing 17 changed files with 311 additions and 109 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"scripts": {
"init-environment": "husky install",
"test": "npm t -ws",
"test:e2e": "npm run test:e2e -ws",
"commit": "commit",
"package": "npm run package -ws",
"setup-local": "npm ci && npm run build && npm run init-environment",
Expand Down
12 changes: 3 additions & 9 deletions packages/commons/tests/utils/e2eUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* E2E utils is used by e2e tests. They are helper function that calls either CDK or SDK
* to interact with services.
*/
import { App, CfnOutput, Stack, Duration } from 'aws-cdk-lib';
import { App, CfnOutput, Duration, Stack } from 'aws-cdk-lib';
import {
NodejsFunction,
NodejsFunctionProps,
Expand Down Expand Up @@ -91,15 +91,11 @@ export const invokeFunction = async (
): Promise<InvocationLogs[]> => {
const invocationLogs: InvocationLogs[] = [];

const promiseFactory = (
index?: number,
includeIndex = true
): Promise<void> => {
const promiseFactory = (index?: number): Promise<void> => {
// in some cases we need to send a payload without the index, i.e. idempotency tests
const payloadToSend = includeIndex
? { invocation: index, ...payload }
: { ...payload };

const invokePromise = lambdaClient
.send(
new InvokeCommand({
Expand All @@ -126,9 +122,7 @@ export const invokeFunction = async (

const invocation =
invocationMode == 'PARALLEL'
? Promise.all(
promiseFactories.map((factory, index) => factory(index, includeIndex))
)
? Promise.all(promiseFactories.map((factory, index) => factory(index)))
: chainPromises(promiseFactories);
await invocation;

Expand Down
2 changes: 2 additions & 0 deletions packages/idempotency/src/IdempotencyConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class IdempotencyConfig {
/**
* Throw an error if the idempotency key is not found in the event.
* In some cases, you may want to allow the request to continue without idempotency.
* If set to false and idempotency key is not found, the request will continue without idempotency.
* @default false
*/
public throwOnNoIdempotencyKey: boolean;
/**
Expand Down
31 changes: 31 additions & 0 deletions packages/idempotency/src/IdempotencyHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
import { BasePersistenceLayer, IdempotencyRecord } from './persistence';
import { IdempotencyConfig } from './IdempotencyConfig';
import { MAX_RETRIES } from './constants';
import { search } from 'jmespath';

/**
* @internal
Expand Down Expand Up @@ -127,6 +128,17 @@ export class IdempotencyHandler<U> {
}

public async processIdempotency(): Promise<U> {
// early return if we should skip idempotency completely
if (
IdempotencyHandler.shouldSkipIdempotency(
this.idempotencyConfig.eventKeyJmesPath,
this.idempotencyConfig.throwOnNoIdempotencyKey,
this.fullFunctionPayload
)
) {
return await this.functionToMakeIdempotent(this.fullFunctionPayload);
}

try {
await this.persistenceStore.saveInProgress(
this.functionPayloadToBeHashed
Expand All @@ -146,4 +158,23 @@ export class IdempotencyHandler<U> {

return this.getFunctionResult();
}

/**
* avoid idempotency if the eventKeyJmesPath is not present in the payload and throwOnNoIdempotencyKey is false
* static so {@link makeHandlerIdempotent} middleware can use it
* TOOD: refactor so middy uses IdempotencyHandler internally wihtout reimplementing the logic
* @param eventKeyJmesPath
* @param throwOnNoIdempotencyKey
* @param fullFunctionPayload
* @private
*/
public static shouldSkipIdempotency(
eventKeyJmesPath: string,
throwOnNoIdempotencyKey: boolean,
fullFunctionPayload: Record<string, unknown>
): boolean {
return (eventKeyJmesPath &&
!throwOnNoIdempotencyKey &&
!search(fullFunctionPayload, eventKeyJmesPath)) as boolean;
}
}
24 changes: 22 additions & 2 deletions packages/idempotency/src/middleware/makeHandlerIdempotent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ import { IdempotencyHandler } from '../IdempotencyHandler';
import { IdempotencyConfig } from '../IdempotencyConfig';
import { cleanupMiddlewares } from '@aws-lambda-powertools/commons/lib/middleware';
import {
IdempotencyInconsistentStateError,
IdempotencyItemAlreadyExistsError,
IdempotencyPersistenceLayerError,
IdempotencyInconsistentStateError,
} from '../Exceptions';
import { IdempotencyRecord } from '../persistence';
import { MAX_RETRIES } from '../constants';
Expand Down Expand Up @@ -50,6 +50,9 @@ const makeHandlerIdempotent = (
config: idempotencyConfig,
});

// keep the flag for after and onError checks
let shouldSkipIdempotency = false;

/**
* Function called before the handler is executed.
*
Expand All @@ -72,6 +75,18 @@ const makeHandlerIdempotent = (
request: MiddyLikeRequest,
retryNo = 0
): Promise<unknown | void> => {
if (
IdempotencyHandler.shouldSkipIdempotency(
idempotencyConfig.eventKeyJmesPath,
idempotencyConfig.throwOnNoIdempotencyKey,
request.event as Record<string, unknown>
)
) {
// set the flag to skip checks in after and onError
shouldSkipIdempotency = true;

return;
}
try {
await persistenceStore.saveInProgress(
request.event as Record<string, unknown>,
Expand Down Expand Up @@ -114,7 +129,6 @@ const makeHandlerIdempotent = (
}
}
};

/**
* Function called after the handler has executed successfully.
*
Expand All @@ -125,6 +139,9 @@ const makeHandlerIdempotent = (
* @param request - The Middy request object
*/
const after = async (request: MiddyLikeRequest): Promise<void> => {
if (shouldSkipIdempotency) {
return;
}
try {
await persistenceStore.saveSuccess(
request.event as Record<string, unknown>,
Expand All @@ -146,6 +163,9 @@ const makeHandlerIdempotent = (
* @param request - The Middy request object
*/
const onError = async (request: MiddyLikeRequest): Promise<void> => {
if (shouldSkipIdempotency) {
return;
}
try {
await persistenceStore.deleteRecord(
request.event as Record<string, unknown>
Expand Down
7 changes: 5 additions & 2 deletions packages/idempotency/src/persistence/BasePersistenceLayer.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { createHash, Hash } from 'node:crypto';
import { search } from 'jmespath';
import { IdempotencyRecordStatus } from '../types';
import type { BasePersistenceLayerOptions } from '../types';
import { IdempotencyRecordStatus } from '../types';
import { EnvironmentVariablesService } from '../config';
import { IdempotencyRecord } from './IdempotencyRecord';
import { BasePersistenceLayerInterface } from './BasePersistenceLayerInterface';
Expand Down Expand Up @@ -176,10 +176,13 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
}

protected abstract _deleteRecord(record: IdempotencyRecord): Promise<void>;

protected abstract _getRecord(
idempotencyKey: string
): Promise<IdempotencyRecord>;

protected abstract _putRecord(record: IdempotencyRecord): Promise<void>;

protected abstract _updateRecord(record: IdempotencyRecord): Promise<void>;

private deleteFromCache(idempotencyKey: string): void {
Expand Down Expand Up @@ -294,7 +297,7 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
* Save record to local cache except for when status is `INPROGRESS`.
*
* We can't cache `INPROGRESS` records because we have no way to reflect updates
* that might happen to the record outside of the execution context of the function.
* that might happen to the record outside the execution context of the function.
*
* @param record - record to save
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { LambdaInterface } from '@aws-lambda-powertools/commons';
import { idempotentFunction, idempotentLambdaHandler } from '../../src';
import { Logger } from '../../../logger';
import { DynamoDBPersistenceLayer } from '../../src/persistence/DynamoDBPersistenceLayer';
import { IdempotencyConfig } from '../../src/';

const IDEMPOTENCY_TABLE_NAME =
process.env.IDEMPOTENCY_TABLE_NAME || 'table_name';
Expand Down Expand Up @@ -62,10 +63,25 @@ class DefaultLambda implements LambdaInterface {
_context: Context
): Promise<string> {
logger.info(`Got test event: ${JSON.stringify(_event)}`);
// sleep for 5 seconds

throw new Error('Failed');
}

@idempotentLambdaHandler({
persistenceStore: dynamoDBPersistenceLayer,
config: new IdempotencyConfig({
eventKeyJmesPath: 'idempotencyKey',
throwOnNoIdempotencyKey: false,
}),
})
public async handlerWithOptionalIdempoitencyKey(
_event: TestEvent,
_context: Context
): Promise<string> {
logger.info(`Got test event: ${JSON.stringify(_event)}`);

return 'This should not be stored in DynamoDB';
}
}

const defaultLambda = new DefaultLambda();
Expand All @@ -74,6 +90,9 @@ export const handlerCustomized =
defaultLambda.handlerCustomized.bind(defaultLambda);
export const handlerFails = defaultLambda.handlerFails.bind(defaultLambda);

export const handlerWithOptionalIdempoitencyKey =
defaultLambda.handlerWithOptionalIdempoitencyKey.bind(defaultLambda);

const logger = new Logger();

class LambdaWithKeywordArgument implements LambdaInterface {
Expand Down
40 changes: 39 additions & 1 deletion packages/idempotency/tests/e2e/idempotencyDecorator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {
destroyStack,
} from '../../../commons/tests/utils/cdk-cli';
import { LEVEL } from '../../../commons/tests/utils/InvocationLogs';
import { GetCommand } from '@aws-sdk/lib-dynamodb';
import { GetCommand, ScanCommand } from '@aws-sdk/lib-dynamodb';
import { createHash } from 'node:crypto';
import { createIdempotencyResources } from '../helpers/idempotencyUtils';

Expand Down Expand Up @@ -110,6 +110,23 @@ createIdempotencyResources(
functionNameFails,
'handlerFails'
);

const functionNameOptionalIdempotencyKey = generateUniqueName(
RESOURCE_NAME_PREFIX,
uuid,
runtime,
'optionalIdempotencyKey'
);
const ddbTableNameOptionalIdempotencyKey =
stackName + '-optional-idempotencyKey-table';
createIdempotencyResources(
stack,
runtime,
ddbTableNameOptionalIdempotencyKey,
decoratorFunctionFile,
functionNameOptionalIdempotencyKey,
'handlerWithOptionalIdempoitencyKey'
);
describe('Idempotency e2e test decorator, default settings', () => {
beforeAll(async () => {
await deployStack(app, stack);
Expand Down Expand Up @@ -285,6 +302,27 @@ describe('Idempotency e2e test decorator, default settings', () => {
TEST_CASE_TIMEOUT
);

test(
'when called with a function with optional idempotency key and thorwOnNoIdempotencyKey is false, it does not create ddb entry',
async () => {
const payload = { foo: 'baz' }; // we set eventKeyJmesPath: 'idempotencyKey' in the idempotency configuration
await invokeFunction(
functionNameOptionalIdempotencyKey,
2,
'PARALLEL',
payload,
false
);
const result = await ddb.send(
new ScanCommand({
TableName: ddbTableNameOptionalIdempotencyKey,
})
);
expect(result?.Items).toEqual([]);
},
TEST_CASE_TIMEOUT
);

afterAll(async () => {
if (!process.env.DISABLE_TEARDOWN) {
await destroyStack(app, stack);
Expand Down
38 changes: 38 additions & 0 deletions packages/idempotency/tests/unit/IdempotencyHandler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,44 @@ describe('Class IdempotencyHandler', () => {
expect(mockGetRecord).toHaveBeenCalledTimes(1);
expect(mockDetermineResultFromIdempotencyRecord).toHaveBeenCalledTimes(1);
});

test('when throwOnNoIdempotencyKey is false and the key is missing, we skip idempotency', async () => {
const idempotentHandlerSkips = new IdempotencyHandler({
functionToMakeIdempotent: mockFunctionToMakeIdempotent,
functionPayloadToBeHashed: mockFunctionPayloadToBeHashed,
persistenceStore: mockIdempotencyOptions.persistenceStore,
fullFunctionPayload: mockFullFunctionPayload,
idempotencyConfig: new IdempotencyConfig({
throwOnNoIdempotencyKey: false,
eventKeyJmesPath: 'idempotencyKey',
}),
});

const mockSaveInProgress = jest.spyOn(
mockIdempotencyOptions.persistenceStore,
'saveInProgress'
);

const mockSaveSuccessfulResult = jest.spyOn(
mockIdempotencyOptions.persistenceStore,
'saveSuccess'
);
const mockGetRecord = jest.spyOn(
mockIdempotencyOptions.persistenceStore,
'getRecord'
);

mockFunctionToMakeIdempotent.mockImplementation(() => {
return 'result';
});

await expect(idempotentHandlerSkips.processIdempotency()).resolves.toBe(
'result'
);
expect(mockSaveInProgress).toHaveBeenCalledTimes(0);
expect(mockGetRecord).toHaveBeenCalledTimes(0);
expect(mockSaveSuccessfulResult).toHaveBeenCalledTimes(0);
});
});

describe('Method: getFunctionResult', () => {
Expand Down
Loading

0 comments on commit b9fcef6

Please sign in to comment.