Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(idempotency): implement IdempotencyHandler #1416

Merged
merged 4 commits into from
Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 61 additions & 18 deletions packages/idempotency/src/IdempotencyHandler.ts
Original file line number Diff line number Diff line change
@@ -1,41 +1,84 @@
import type { AnyFunctionWithRecord, IdempotencyOptions } from './types';
import { IdempotencyRecordStatus } from './types';
import type {
AnyFunctionWithRecord,
IdempotencyOptions,
} from './types';
import {
IdempotencyAlreadyInProgressError,
IdempotencyInconsistentStateError,
IdempotencyItemAlreadyExistsError,
IdempotencyAlreadyInProgressError,
IdempotencyPersistenceLayerError
IdempotencyPersistenceLayerError,
} from './Exceptions';
import { IdempotencyRecord } from './persistence/IdempotencyRecord';
import { IdempotencyRecord } from './persistence';

export class IdempotencyHandler<U> {
public constructor(
private functionToMakeIdempotent: AnyFunctionWithRecord<U>,
private functionPayloadToBeHashed: Record<string, unknown>,
private functionPayloadToBeHashed: Record<string, unknown>,
private idempotencyOptions: IdempotencyOptions,
private fullFunctionPayload: Record<string, unknown>
) {}
private fullFunctionPayload: Record<string, unknown>,
) {
}

public determineResultFromIdempotencyRecord(idempotencyRecord: IdempotencyRecord): Promise<U> | U {
public determineResultFromIdempotencyRecord(
idempotencyRecord: IdempotencyRecord
): Promise<U> | U {
if (idempotencyRecord.getStatus() === IdempotencyRecordStatus.EXPIRED) {
throw new IdempotencyInconsistentStateError('Item has expired during processing and may not longer be valid.');
} else if (idempotencyRecord.getStatus() === IdempotencyRecordStatus.INPROGRESS){
throw new IdempotencyAlreadyInProgressError(`There is already an execution in progress with idempotency key: ${idempotencyRecord.idempotencyKey}`);
throw new IdempotencyInconsistentStateError(
'Item has expired during processing and may not longer be valid.'
);
} else if (
idempotencyRecord.getStatus() === IdempotencyRecordStatus.INPROGRESS
) {
if (
idempotencyRecord.inProgressExpiryTimestamp &&
idempotencyRecord.inProgressExpiryTimestamp <
new Date().getUTCMilliseconds()
) {
throw new IdempotencyInconsistentStateError(
'Item is in progress but the in progress expiry timestamp has expired.'
);
} else {
throw new IdempotencyAlreadyInProgressError(
`There is already an execution in progress with idempotency key: ${idempotencyRecord.idempotencyKey}`
);
}
} else {
// Currently recalling the method as this fulfills FR1. FR3 will address using the previously stored value https://github.com/awslabs/aws-lambda-powertools-typescript/issues/447
return this.functionToMakeIdempotent(this.fullFunctionPayload);
return this.functionToMakeIdempotent(this.fullFunctionPayload);
}
}

/**
* Main entry point for the handler
* IdempotencyInconsistentStateError can happen under rare but expected cases
* when persistent state changes in the small time between put & get requests.
* In most cases we can retry successfully on this exception.
*/
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before we merge, can we fix this return type and remove the comment?

Based on what I see, we either need to change the return type to Promise<U | undefined> or make sure that the function returns something in the catch block when the if statement is skipped.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm this is weird construct we have there. Adding undefined to the Promise would leak it further to makeIdempotentFunction and also higher to the user api, which I'd like to avoid. TS complains about missing return statement outside of the for-loop. But adding a statement makes it impossible to test, because this return statement is unreachable. I have added a throw with coverage ignore.

public async handle(): Promise<U> {

const MAX_RETRIES = 2;
for (let i = 1; i <= MAX_RETRIES; i++) {
try {
return await this.processIdempotency();
} catch (e) {
if (!(e instanceof IdempotencyAlreadyInProgressError) || i === MAX_RETRIES) {
throw e;
}
}
}
}

public async processIdempotency(): Promise<U> {
try {
await this.idempotencyOptions.persistenceStore.saveInProgress(this.functionPayloadToBeHashed);
await this.idempotencyOptions.persistenceStore.saveInProgress(
this.functionPayloadToBeHashed,
);
} catch (e) {
if (e instanceof IdempotencyItemAlreadyExistsError) {
const idempotencyRecord: IdempotencyRecord = await this.idempotencyOptions.persistenceStore.getRecord(this.functionPayloadToBeHashed);
const idempotencyRecord: IdempotencyRecord =
await this.idempotencyOptions.persistenceStore.getRecord(
this.functionPayloadToBeHashed
);

return this.determineResultFromIdempotencyRecord(idempotencyRecord);
} else {
Expand All @@ -45,4 +88,4 @@ export class IdempotencyHandler<U> {

return this.functionToMakeIdempotent(this.fullFunctionPayload);
}
}
}
9 changes: 4 additions & 5 deletions packages/idempotency/src/idempotentDecorator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@ const idempotent = function (options: IdempotencyOptions) {
descriptor.value = function(record: GenericTempRecord){
const idempotencyHandler = new IdempotencyHandler<GenericTempRecord>(childFunction, record[options.dataKeywordArgument], options, record);

return idempotencyHandler.processIdempotency();
return idempotencyHandler.handle();
};

return descriptor;
};
};
};

export { idempotent };

2 changes: 1 addition & 1 deletion packages/idempotency/src/makeFunctionIdempotent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const makeFunctionIdempotent = function <U>(
const wrappedFn: AnyIdempotentFunction<U> = function (record: GenericTempRecord): Promise<U> {
const idempotencyHandler: IdempotencyHandler<U> = new IdempotencyHandler<U>(fn, record[options.dataKeywordArgument], options, record);

return idempotencyHandler.processIdempotency();
return idempotencyHandler.handle();
};

return wrappedFn;
Expand Down
178 changes: 178 additions & 0 deletions packages/idempotency/tests/unit/IdempotencyHandler.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/**
* Test Idempotency Handler
*
* @group unit/idempotency/IdempotencyHandler
*/

import {
IdempotencyAlreadyInProgressError, IdempotencyInconsistentStateError,
IdempotencyItemAlreadyExistsError,
IdempotencyPersistenceLayerError
} from '../../src/Exceptions';
import { IdempotencyOptions, IdempotencyRecordStatus } from '../../src/types';
import { BasePersistenceLayer, IdempotencyRecord } from '../../src/persistence';
import { IdempotencyHandler } from '../../src/IdempotencyHandler';

class PersistenceLayerTestClass extends BasePersistenceLayer {
protected _deleteRecord = jest.fn();
protected _getRecord = jest.fn();
protected _putRecord = jest.fn();
protected _updateRecord = jest.fn();
}

const mockFunctionToMakeIdempotent = jest.fn();
const mockFunctionPayloadToBeHashed = {};
const mockIdempotencyOptions: IdempotencyOptions = {
persistenceStore: new PersistenceLayerTestClass(),
dataKeywordArgument: 'testingKey'
};
const mockFullFunctionPayload = {};

const idempotentHandler = new IdempotencyHandler(
mockFunctionToMakeIdempotent,
mockFunctionPayloadToBeHashed,
mockIdempotencyOptions,
mockFullFunctionPayload,
);

describe('Class IdempotencyHandler', () => {
beforeEach(() => jest.resetAllMocks());

describe('Method: determineResultFromIdempotencyRecord', () => {
test('when record is in progress and within expiry window, it rejects with IdempotencyAlreadyInProgressError', async () => {

const stubRecord = new IdempotencyRecord({
idempotencyKey: 'idempotencyKey',
expiryTimestamp: Date.now() + 1000, // should be in the future
inProgressExpiryTimestamp: 0, // less than current time in milliseconds
responseData: { responseData: 'responseData' },
payloadHash: 'payloadHash',
status: IdempotencyRecordStatus.INPROGRESS
});

expect(stubRecord.isExpired()).toBe(false);
expect(stubRecord.getStatus()).toBe(IdempotencyRecordStatus.INPROGRESS);

try {
await idempotentHandler.determineResultFromIdempotencyRecord(stubRecord);
} catch (e) {
expect(e).toBeInstanceOf(IdempotencyAlreadyInProgressError);
}
});

test('when record is in progress and outside expiry window, it rejects with IdempotencyInconsistentStateError', async () => {

const stubRecord = new IdempotencyRecord({
idempotencyKey: 'idempotencyKey',
expiryTimestamp: Date.now() + 1000, // should be in the future
inProgressExpiryTimestamp: new Date().getUTCMilliseconds() - 1000, // should be in the past
responseData: { responseData: 'responseData' },
payloadHash: 'payloadHash',
status: IdempotencyRecordStatus.INPROGRESS
});

expect(stubRecord.isExpired()).toBe(false);
expect(stubRecord.getStatus()).toBe(IdempotencyRecordStatus.INPROGRESS);

try {
await idempotentHandler.determineResultFromIdempotencyRecord(stubRecord);
} catch (e) {
expect(e).toBeInstanceOf(IdempotencyInconsistentStateError);
}
});

test('when record is expired, it rejects with IdempotencyInconsistentStateError', async () => {

const stubRecord = new IdempotencyRecord({
idempotencyKey: 'idempotencyKey',
expiryTimestamp: new Date().getUTCMilliseconds() - 1000, // should be in the past
inProgressExpiryTimestamp: 0, // less than current time in milliseconds
responseData: { responseData: 'responseData' },
payloadHash: 'payloadHash',
status: IdempotencyRecordStatus.EXPIRED
});

expect(stubRecord.isExpired()).toBe(true);
expect(stubRecord.getStatus()).toBe(IdempotencyRecordStatus.EXPIRED);

try {
await idempotentHandler.determineResultFromIdempotencyRecord(stubRecord);
} catch (e) {
expect(e).toBeInstanceOf(IdempotencyInconsistentStateError);
}
});
});

describe('Method: handle', () => {

afterAll(() => jest.restoreAllMocks()); // restore processIdempotency for other tests

test('when IdempotencyAlreadyInProgressError is thrown, it retries two times', async () => {
const mockProcessIdempotency = jest.spyOn(IdempotencyHandler.prototype, 'processIdempotency').mockRejectedValue(new IdempotencyAlreadyInProgressError('There is already an execution in progress'));
await expect(
idempotentHandler.handle()
).rejects.toThrow(IdempotencyAlreadyInProgressError);
expect(mockProcessIdempotency).toHaveBeenCalledTimes(2);
});

test('when non IdempotencyAlreadyInProgressError is thrown, it rejects', async () => {

const mockProcessIdempotency = jest.spyOn(IdempotencyHandler.prototype, 'processIdempotency').mockRejectedValue(new Error('Some other error'));

await expect(
idempotentHandler.handle()
).rejects.toThrow(Error);
expect(mockProcessIdempotency).toHaveBeenCalledTimes(1);
});

});

describe('Method: processIdempotency', () => {

test('when persistenceStore saves successfuly, it resolves', async () => {
const mockSaveInProgress = jest.spyOn(mockIdempotencyOptions.persistenceStore, 'saveInProgress').mockResolvedValue();

mockFunctionToMakeIdempotent.mockImplementation(() => Promise.resolve('result'));

await expect(
idempotentHandler.processIdempotency()
).resolves.toBe('result');
expect(mockSaveInProgress).toHaveBeenCalledTimes(1);
});

test('when persistences store throws any error, it wraps the error to IdempotencyPersistencesLayerError', async () => {
const mockSaveInProgress = jest.spyOn(mockIdempotencyOptions.persistenceStore, 'saveInProgress').mockRejectedValue(new Error('Some error'));
const mockDetermineResultFromIdempotencyRecord = jest.spyOn(IdempotencyHandler.prototype, 'determineResultFromIdempotencyRecord').mockResolvedValue('result');

await expect(
idempotentHandler.processIdempotency()
).rejects.toThrow(IdempotencyPersistenceLayerError);
expect(mockSaveInProgress).toHaveBeenCalledTimes(1);
expect(mockDetermineResultFromIdempotencyRecord).toHaveBeenCalledTimes(0);
});

test('when idempotency item already exists, it returns the existing record', async () => {
const mockSaveInProgress = jest.spyOn(mockIdempotencyOptions.persistenceStore, 'saveInProgress').mockRejectedValue(new IdempotencyItemAlreadyExistsError('There is already an execution in progress'));

const stubRecord = new IdempotencyRecord({
idempotencyKey: 'idempotencyKey',
expiryTimestamp: 0,
inProgressExpiryTimestamp: 0,
responseData: { responseData: 'responseData' },
payloadHash: 'payloadHash',
status: IdempotencyRecordStatus.INPROGRESS
});
const mockGetRecord = jest.spyOn(mockIdempotencyOptions.persistenceStore, 'getRecord').mockImplementation(() => Promise.resolve(stubRecord));
const mockDetermineResultFromIdempotencyRecord = jest.spyOn(IdempotencyHandler.prototype, 'determineResultFromIdempotencyRecord').mockResolvedValue('result');

await expect(
idempotentHandler.processIdempotency()
).resolves.toBe('result');
expect(mockSaveInProgress).toHaveBeenCalledTimes(1);
expect(mockGetRecord).toHaveBeenCalledTimes(1);
expect(mockDetermineResultFromIdempotencyRecord).toHaveBeenCalledTimes(1);
});
});

});

Loading