diff --git a/CHANGELOG.md b/CHANGELOG.md index d914d23..894a829 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,3 +13,9 @@ and this project adheres to ### Added - Support `consistentRead` option on `get` API + +## 1.5.0 - 2021-10-27 + +### Added + +- Support optimistic locking for `put`, `update` and `delete` APIs diff --git a/README.md b/README.md index d8b8ad2..1d1584b 100644 --- a/README.md +++ b/README.md @@ -137,6 +137,43 @@ const { total } = await myDocumentDao.decr( ); ``` +**Optimistic Locking with Version Numbers** + +For callers who wish to enable an optimistic locking strategy there are two +available toggles: + +1. Provide the attribute you wish to be used to store the version number. This + will enable optimistic locking on the following operations: `put`, `update`, + and `delete`. + + Writes for documents that do not have a version number attribute will + initialize the version number to 1. All subsequent writes will need to + provide the current version number. If an out-of-date version number is + supplied, an error will be thrown. + + Example of Dao constructed with optimistic locking enabled. + + ``` + const dao = new DynamoDbDao({ + tableName, + documentClient, + optimisticLockingAttribute: 'version', + }); + ``` + +2. If you wish to ignore optimistic locking for a save operation, specify + `ignoreOptimisticLocking: true` in the options on your `put`, `update`, or + `delete`. + +NOTE: Optimistic locking is NOT supported for `batchWrite` or `batchPut` +operations. Consuming those APIs for data models that do have optimistic locking +enabled may clobber your version data and could produce undesirable effects for +other callers. + +This was modeled after the +[Java Dynamo client](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DynamoDBMapper.OptimisticLocking.html) +implementation. + ## Developing The test setup requires that [docker-compose]() be installed. To run the tests, diff --git a/package.json b/package.json index 658f97c..a07e709 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@jupiterone/dynamodb-dao", - "version": "1.4.0", + "version": "1.5.0", "description": "DynamoDB Data Access Object (DAO) helper library", "main": "index.js", "types": "index.d.ts", diff --git a/src/index.test.ts b/src/index.test.ts index 9073ec1..a02cadd 100644 --- a/src/index.test.ts +++ b/src/index.test.ts @@ -559,6 +559,82 @@ test('#generateUpdateParams should generate both update and remove params for do } }); +test('#generateUpdateParams should increment the version number', () => { + { + const options = { + tableName: 'blah2', + key: { + HashKey: 'abc', + }, + data: { + a: 123, + b: 'abc', + c: undefined, + lockVersion: 1, + }, + optimisticLockVersionAttribute: 'lockVersion', + }; + + expect(generateUpdateParams(options)).toEqual({ + TableName: options.tableName, + Key: options.key, + ReturnValues: 'ALL_NEW', + ConditionExpression: '#lockVersion = :lockVersion', + UpdateExpression: + 'add #lockVersion :lockVersionInc set #a0 = :a0, #a1 = :a1 remove #a2', + ExpressionAttributeNames: { + '#a0': 'a', + '#a1': 'b', + '#a2': 'c', + '#lockVersion': 'lockVersion', + }, + ExpressionAttributeValues: { + ':a0': options.data.a, + ':a1': options.data.b, + ':lockVersionInc': 1, + ':lockVersion': 1, + }, + }); + } +}); + +test('#generateUpdateParams should increment the version number even when not supplied', () => { + { + const options = { + tableName: 'blah3', + key: { + HashKey: 'abc', + }, + data: { + a: 123, + b: 'abc', + c: undefined, + }, + optimisticLockVersionAttribute: 'lockVersion', + }; + + expect(generateUpdateParams(options)).toEqual({ + TableName: options.tableName, + Key: options.key, + ReturnValues: 'ALL_NEW', + UpdateExpression: + 'add #lockVersion :lockVersionInc set #a0 = :a0, #a1 = :a1 remove #a2', + ConditionExpression: 'attribute_not_exists(lockVersion)', + ExpressionAttributeNames: { + '#a0': 'a', + '#a1': 'b', + '#a2': 'c', + '#lockVersion': 'lockVersion', + }, + ExpressionAttributeValues: { + ':a0': options.data.a, + ':a1': options.data.b, + ':lockVersionInc': 1, + }, + }); + } +}); + test(`#queryUntilLimitReached should call #query if "filterExpression" not provided`, async () => { const keyConditionExpression = 'id = :id'; const attributeValues = { id: uuid() }; diff --git a/src/index.ts b/src/index.ts index 674ddb7..ac5918c 100644 --- a/src/index.ts +++ b/src/index.ts @@ -120,9 +120,57 @@ export interface ConditionalOptions { attributeValues?: AttributeValues; } -export type PutOptions = ConditionalOptions; -export type UpdateOptions = ConditionalOptions; -export type DeleteOptions = ConditionalOptions; +export interface SaveBehavior { + optimisticLockVersionAttribute?: string; + optimisticLockVersionIncrement?: number; +} + +export interface MutateBehavior { + ignoreOptimisticLocking?: boolean; +} + +export type PutOptions = ConditionalOptions & MutateBehavior; +export type UpdateOptions = ConditionalOptions & MutateBehavior; +export type DeleteOptions = ConditionalOptions & MutateBehavior; + +export interface BuildOptimisticLockOptionsInput extends ConditionalOptions { + versionAttribute: string; + versionAttributeValue: any; +} + +export function buildOptimisticLockOptions( + options: BuildOptimisticLockOptionsInput +): ConditionalOptions { + const { versionAttribute, versionAttributeValue } = options; + let { conditionExpression, attributeNames, attributeValues } = options; + + const lockExpression = versionAttributeValue + ? `#${versionAttribute} = :${versionAttribute}` + : `attribute_not_exists(${versionAttribute})`; + + conditionExpression = conditionExpression + ? `(${conditionExpression}) AND ${lockExpression}` + : lockExpression; + + if (versionAttributeValue) { + attributeNames = { + ...attributeNames, + [`#${versionAttribute}`]: versionAttribute, + }; + attributeValues = { + ...attributeValues, + [`:${versionAttribute}`]: versionAttributeValue, + }; + } + + return { + conditionExpression, + attributeNames, + attributeValues, + }; +} + +type DataModelAsMap = { [key: string]: any }; export interface GenerateUpdateParamsInput extends UpdateOptions { tableName: string; @@ -131,9 +179,10 @@ export interface GenerateUpdateParamsInput extends UpdateOptions { } export function generateUpdateParams( - options: GenerateUpdateParamsInput + options: GenerateUpdateParamsInput & SaveBehavior ): DocumentClient.UpdateItemInput { const setExpressions: string[] = []; + const addExpressions: string[] = []; const removeExpressions: string[] = []; const expressionAttributeNameMap: AttributeNames = {}; const expressionAttributeValueMap: AttributeValues = {}; @@ -142,15 +191,41 @@ export function generateUpdateParams( tableName, key, data, - conditionExpression, attributeNames, attributeValues, + optimisticLockVersionAttribute: versionAttribute, + optimisticLockVersionIncrement: versionInc, + ignoreOptimisticLocking: ignoreLocking = false, } = options; + let conditionExpression = options.conditionExpression; + + if (versionAttribute) { + addExpressions.push(`#${versionAttribute} :${versionAttribute}Inc`); + expressionAttributeNameMap[`#${versionAttribute}`] = versionAttribute; + expressionAttributeValueMap[`:${versionAttribute}Inc`] = versionInc ?? 1; + + if (!ignoreLocking) { + ({ conditionExpression } = buildOptimisticLockOptions({ + versionAttribute, + versionAttributeValue: (data as DataModelAsMap)[versionAttribute], + conditionExpression, + })); + expressionAttributeValueMap[`:${versionAttribute}`] = ( + data as DataModelAsMap + )[versionAttribute]; + } + } + const keys = Object.keys(options.data).sort(); for (let i = 0; i < keys.length; i++) { const name = keys[i]; + if (name === versionAttribute) { + // versionAttribute is a special case and should always be handled + // explicitly as above with the supplied value ignored + continue; + } const valueName = `:a${i}`; const attributeName = `#a${i}`; @@ -178,11 +253,13 @@ export function generateUpdateParams( ? 'remove ' + removeExpressions.join(', ') : undefined; + const addString = + addExpressions.length > 0 ? 'add ' + addExpressions.join(', ') : undefined; return { TableName: tableName, Key: key, ConditionExpression: conditionExpression, - UpdateExpression: [setString, removeString] + UpdateExpression: [addString, setString, removeString] .filter((val) => val !== undefined) .join(' '), ExpressionAttributeNames: { @@ -197,9 +274,10 @@ export function generateUpdateParams( }; } -interface DynamoDbDaoInput { +export interface DynamoDbDaoInput { tableName: string; documentClient: DocumentClient; + optimisticLockingAttribute?: keyof NumberPropertiesInType; } function invalidCursorError(cursor: string): Error { @@ -261,10 +339,12 @@ export type NumberPropertiesInType = Pick< export default class DynamoDbDao { public readonly tableName: string; public readonly documentClient: DocumentClient; + public readonly optimisticLockingAttribute?: keyof NumberPropertiesInType; - constructor(options: DynamoDbDaoInput) { + constructor(options: DynamoDbDaoInput) { this.tableName = options.tableName; this.documentClient = options.documentClient; + this.optimisticLockingAttribute = options.optimisticLockingAttribute; } /** @@ -292,16 +372,30 @@ export default class DynamoDbDao { */ async delete( key: KeySchema, - options: DeleteOptions = {} + options: DeleteOptions = {}, + data: Partial = {} ): Promise { + let { attributeNames, attributeValues, conditionExpression } = options; + + if (this.optimisticLockingAttribute && !options.ignoreOptimisticLocking) { + const versionAttribute = this.optimisticLockingAttribute.toString(); + ({ attributeNames, attributeValues, conditionExpression } = + buildOptimisticLockOptions({ + versionAttribute, + versionAttributeValue: (data as DataModelAsMap)[versionAttribute], + conditionExpression: conditionExpression, + attributeNames, + attributeValues, + })); + } const { Attributes: attributes } = await this.documentClient .delete({ TableName: this.tableName, Key: key, ReturnValues: 'ALL_OLD', - ConditionExpression: options.conditionExpression, - ExpressionAttributeNames: options.attributeNames, - ExpressionAttributeValues: options.attributeValues, + ConditionExpression: conditionExpression, + ExpressionAttributeNames: attributeNames, + ExpressionAttributeValues: attributeValues, }) .promise(); @@ -312,13 +406,36 @@ export default class DynamoDbDao { * Creates/Updates an item in the table */ async put(data: DataModel, options: PutOptions = {}): Promise { + let { conditionExpression, attributeNames, attributeValues } = options; + if (this.optimisticLockingAttribute) { + // Must cast data to avoid tripping the linter, otherwise, it'll complain + // about expression of type 'string' can't be used to index type 'unknown' + const dataAsMap = data as DataModelAsMap; + const versionAttribute = this.optimisticLockingAttribute.toString(); + + if (!options.ignoreOptimisticLocking) { + ({ conditionExpression, attributeNames, attributeValues } = + buildOptimisticLockOptions({ + versionAttribute, + versionAttributeValue: dataAsMap[versionAttribute], + conditionExpression, + attributeNames, + attributeValues, + })); + } + + dataAsMap[versionAttribute] = dataAsMap[versionAttribute] + ? dataAsMap[versionAttribute] + 1 + : 1; + } + await this.documentClient .put({ TableName: this.tableName, Item: data, - ConditionExpression: options.conditionExpression, - ExpressionAttributeNames: options.attributeNames, - ExpressionAttributeValues: options.attributeValues, + ConditionExpression: conditionExpression, + ExpressionAttributeNames: attributeNames, + ExpressionAttributeValues: attributeValues, }) .promise(); return data; @@ -337,6 +454,8 @@ export default class DynamoDbDao { key, data, ...updateOptions, + optimisticLockVersionAttribute: + this.optimisticLockingAttribute?.toString(), }); const { Attributes: attributes } = await this.documentClient .update(params) diff --git a/test/delete.locking.test.ts b/test/delete.locking.test.ts new file mode 100644 index 0000000..7e93590 --- /dev/null +++ b/test/delete.locking.test.ts @@ -0,0 +1,95 @@ +import { v4 as uuid } from 'uuid'; +import TestContext from './helpers/TestContext'; + +let context: TestContext; + +beforeAll(async () => { + context = await TestContext.setup(true); +}); + +afterAll(() => { + if (context) { + return context.teardown(); + } +}); + +test('should require version number to remove item from the table', async () => { + const { dao } = context; + + const key = { id: uuid() }; + const updateData = { test: uuid(), newField: uuid() }; + + // put data into dynamodb, which should set the version number + await dao.update(key, updateData); + await dao.update(key, { ...updateData, version: 1 }); + + // ensure it exists + const item = await dao.get(key); + expect(item).toEqual({ + ...key, + ...updateData, + version: 2, + }); + + await dao.delete(key, undefined, { + version: 2, + }); + + expect(await dao.get(key)).toBeUndefined(); +}); + +test('should error when version number is missing when removing item from the table', async () => { + const { dao } = context; + + const key = { id: uuid() }; + const updateData = { test: uuid(), newField: uuid() }; + + // put data into dynamodb, which should set the version number + await dao.update(key, updateData); + await dao.update(key, { ...updateData, version: 1 }); + + await expect(async () => { + await dao.delete(key); + }).rejects.toThrow('The conditional request failed'); +}); + +test('should error when version number is old when removing item from the table', async () => { + const { dao } = context; + + const key = { id: uuid() }; + const updateData = { test: uuid(), newField: uuid() }; + + // put data into dynamodb, which should set the version number + await dao.update(key, updateData); + await dao.update(key, { ...updateData, version: 1 }); + + await expect(async () => { + await dao.delete(key, undefined, { + version: 1, + }); + }).rejects.toThrow('The conditional request failed'); +}); + +test('should not require version number to remove item from the table when ignore flag is set', async () => { + const { dao } = context; + + const key = { id: uuid() }; + const updateData = { test: uuid(), newField: uuid() }; + + // put data into dynamodb, which should set the version number + await dao.update(key, updateData); + + // ensure it exists + const item = await dao.get(key); + expect(item).toEqual({ + ...key, + ...updateData, + version: 1, + }); + + await dao.delete(key, { ignoreOptimisticLocking: true }); + + // ensure it deleted + const deletedItem = await dao.get(key); + expect(deletedItem).toEqual(undefined); +}); diff --git a/test/helpers/TestContext.ts b/test/helpers/TestContext.ts index 97f492a..024a0c3 100644 --- a/test/helpers/TestContext.ts +++ b/test/helpers/TestContext.ts @@ -2,7 +2,7 @@ import { DynamoDB } from 'aws-sdk'; import { v4 as uuid } from 'uuid'; -import DynamoDbDao from '../../src'; +import DynamoDbDao, { DynamoDbDaoInput } from '../../src'; const { DYNAMODB_ENDPOINT = 'http://localhost:8000' } = process.env; @@ -43,7 +43,9 @@ export default class TestContext { this.dao = dao; } - static async setup(): Promise { + static async setup( + useOptimisticLocking: boolean = false + ): Promise { const tableName = uuid(); const indexName = uuid(); @@ -98,7 +100,8 @@ export default class TestContext { const dao = new DynamoDbDao({ tableName, documentClient, - }); + optimisticLockingAttribute: useOptimisticLocking ? 'version' : undefined, + } as DynamoDbDaoInput); return new TestContext(tableName, indexName, dao); } diff --git a/test/put.locking.test.ts b/test/put.locking.test.ts new file mode 100644 index 0000000..f066709 --- /dev/null +++ b/test/put.locking.test.ts @@ -0,0 +1,146 @@ +import { v4 as uuid } from 'uuid'; +import TestContext, { documentClient } from './helpers/TestContext'; + +let context: TestContext; + +beforeAll(async () => { + context = await TestContext.setup(true); +}); + +afterAll(() => { + if (context) { + return context.teardown(); + } +}); + +test('should add version number on first put', async () => { + const { tableName, dao } = context; + + const key = { id: uuid() }; + + const input = { + ...key, + test: uuid(), + }; + + // put data into dynamodb + await dao.put(input); + + // ensure it exists + const { Item: item } = await documentClient + .get({ + TableName: tableName, + Key: key, + }) + .promise(); + + expect(item).toEqual({ + ...input, + version: 1, + }); +}); + +test('should throw error if version number is not supplied on second update', async () => { + const { tableName, dao } = context; + + const key = { id: uuid() }; + + const input = { + ...key, + test: uuid(), + }; + + // put data into dynamodb + await dao.put(input); + + expect(1).toEqual(1); + + // ensure it exists + const { Item: item } = await documentClient + .get({ + TableName: tableName, + Key: key, + }) + .promise(); + + expect(item).toEqual({ + ...input, + version: 1, + }); + + await expect(async () => { + await dao.put({ + ...key, + test: uuid(), + }); + }).rejects.toThrow('The conditional request failed'); +}); + +test('should allow multiple puts if version number is incremented', async () => { + const { tableName, dao } = context; + + const key = { id: uuid() }; + + const input = { + ...key, + test: uuid(), + }; + + // put data into dynamodb + await dao.put(input); + await dao.put({ + ...input, + version: 1, + }); + + // ensure it exists + const { Item: item } = await documentClient + .get({ + TableName: tableName, + Key: key, + }) + .promise(); + + expect(item).toEqual({ + ...input, + version: 2, + }); +}); + +test('should allow multiple puts if version number is incremented when multiple conditions exist', async () => { + const { tableName, dao } = context; + + const key = { id: uuid() }; + + const input = { + ...key, + test: uuid(), + }; + + // put data into dynamodb + await dao.put(input); + await dao.put( + { + ...input, + version: 1, + }, + { + attributeNames: { '#test': 'test' }, + attributeValues: { ':test': input.test, ':test2': '2' }, + conditionExpression: '#test = :test or #test = :test2', + } + ); + + // ensure it exists + const { Item: item } = await documentClient + .get({ + TableName: tableName, + Key: key, + }) + .promise(); + + expect(item).toEqual({ + ...input, + version: 2, + }); +}); diff --git a/test/update.locking.test.ts b/test/update.locking.test.ts new file mode 100644 index 0000000..952b89c --- /dev/null +++ b/test/update.locking.test.ts @@ -0,0 +1,124 @@ +import { v4 as uuid } from 'uuid'; +import TestContext, { documentClient } from './helpers/TestContext'; + +let context: TestContext; + +beforeAll(async () => { + context = await TestContext.setup(true); +}); + +afterAll(() => { + if (context) { + return context.teardown(); + } +}); + +let key: any; +let item: any; + +beforeEach(async () => { + const { tableName } = context; + + key = { id: uuid() }; + + const input = { + ...key, + test: uuid(), + }; + + // put data into dynamodb + await documentClient + .put({ + TableName: tableName, + Item: input, + }) + .promise(); + + // ensure it exists + const { Item: storedItem } = await documentClient + .get({ + TableName: tableName, + Key: key, + }) + .promise(); + + // eslint-disable-next-line jest/no-standalone-expect + expect(storedItem).toEqual(input); + item = storedItem; +}); + +test('should set the version number to 1 on first update', async () => { + const { tableName, dao } = context; + const updateData = { test: uuid(), newField: uuid() }; + await dao.update(key, updateData); + + const { Item: updatedItem } = await documentClient + .get({ + TableName: tableName, + Key: key, + }) + .promise(); + + expect(updatedItem).toEqual({ + ...item, + ...updateData, + version: 1, + }); +}); + +test('should increment the version number by 1 on subsequent updates', async () => { + const { tableName, dao } = context; + const updateData = { test: uuid(), newField: uuid() }; + await dao.update(key, updateData); + await dao.update(key, { ...updateData, version: 1 }); + await dao.update(key, { ...updateData, version: 2 }); + + const { Item: updatedItem } = await documentClient + .get({ + TableName: tableName, + Key: key, + }) + .promise(); + + expect(updatedItem).toEqual({ + ...item, + ...updateData, + version: 3, + }); +}); + +test('should error if update does not supply the correct version number', async () => { + const { dao } = context; + const updateData = { test: uuid(), newField: uuid() }; + // sets the initial version to 1 + await dao.update(key, updateData); + + await expect(async () => { + // doesn't supply a version, throws error + await dao.update(key, updateData); + }).rejects.toThrow('The conditional request failed'); +}); + +test('should allow update without correct version number if ignore flag is set', async () => { + const { tableName, dao } = context; + const updateData = { test: uuid(), newField: uuid() }; + // sets the initial version to 1 + await dao.update(key, updateData); + // Still increments the version + await dao.update(key, updateData, { + ignoreOptimisticLocking: true, + }); + + const { Item: updatedItem } = await documentClient + .get({ + TableName: tableName, + Key: key, + }) + .promise(); + + expect(updatedItem).toEqual({ + ...item, + ...updateData, + version: 2, + }); +}); diff --git a/test/update.test.ts b/test/update.test.ts index df7c24f..f2e3d41 100644 --- a/test/update.test.ts +++ b/test/update.test.ts @@ -1,7 +1,6 @@ -import TestContext, { documentClient } from './helpers/TestContext'; import { v4 as uuid } from 'uuid'; - import reservedWords from './fixtures/reservedWords'; +import TestContext, { documentClient } from './helpers/TestContext'; let context: TestContext;