diff --git a/packages/datastore/__tests__/outbox.test.ts b/packages/datastore/__tests__/outbox.test.ts index 3398e1effe0..2e527418676 100644 --- a/packages/datastore/__tests__/outbox.test.ts +++ b/packages/datastore/__tests__/outbox.test.ts @@ -1,83 +1,288 @@ import 'fake-indexeddb/auto'; import { - DataStore as DataStoreType, initSchema as initSchemaType, + syncClasses, + ModelInstanceCreator, } from '../src/datastore/datastore'; import { ExclusiveStorage as StorageType } from '../src/storage/storage'; - import { MutationEventOutbox } from '../src/sync/outbox'; - -import { Model, testSchema, internalTestSchema } from './helpers'; +import { ModelMerger } from '../src/sync/merger'; +import { Model as ModelType, testSchema, internalTestSchema } from './helpers'; +import { + TransformerMutationType, + createMutationInstanceFromModelOperation, +} from '../src/sync/utils'; +import { PersistentModelConstructor, InternalSchema } from '../src/types'; +import { MutationEvent } from '../src/sync/'; let initSchema: typeof initSchemaType; -// any in order to access private properties +// using to access private members let DataStore: any; -let Storage: typeof StorageType; -// let anyStorage: any; - +let Storage: StorageType; +let anyStorage: any; let outbox: MutationEventOutbox; +let merger: ModelMerger; +let modelInstanceCreator: ModelInstanceCreator; +let Model: PersistentModelConstructor; -import { PersistentModelConstructor } from '../src/types'; +const schema: InternalSchema = internalTestSchema(); -const ownSymbol = Symbol('sync'); +describe('Outbox tests', () => { + let modelId: string; -const MutationEvent = {}['MutationEvent'] as PersistentModelConstructor; + beforeAll(async () => { + // jest.resetModules(); + jest.resetAllMocks(); -outbox = new MutationEventOutbox( - internalTestSchema(), - null, - MutationEvent, - ownSymbol -); + await initializeOutbox(); -describe('Outbox tests', () => { - beforeAll(async () => { - ({ initSchema, DataStore } = require('../src/datastore/datastore')); - const classes = initSchema(testSchema()); + const newModel = new Model({ + field1: 'Some value', + dateCreated: new Date().toISOString(), + }); + + const mutationEvent = await createMutationEvent(newModel); + ({ modelId } = mutationEvent); + + await outbox.enqueue(Storage, mutationEvent); + }); + + it('Should return the create mutation from Outbox.peek', async () => { + await Storage.runExclusive(async s => { + let head = await outbox.peek(s); + const modelData: ModelType = JSON.parse(head.data); + + expect(head.modelId).toEqual(modelId); + expect(head.operation).toEqual(TransformerMutationType.CREATE); + expect(modelData.field1).toEqual('Some value'); + + const response = { + ...modelData, + _version: 1, + _lastChangedAt: Date.now(), + _deleted: false, + }; + + await processMutationResponse(s, response); + + head = await outbox.peek(s); + expect(head).toBeFalsy(); + }); + }); - const { Model } = classes as { - Model: PersistentModelConstructor; + it('Should sync the _version from a mutation response to other items with the same `id` in the queue', async () => { + const last = await DataStore.query(Model, modelId); + + const updatedModel1 = Model.copyOf(last, updated => { + updated.field1 = 'another value'; + updated.dateCreated = new Date().toISOString(); + }); + + const mutationEvent = await createMutationEvent(updatedModel1); + await outbox.enqueue(Storage, mutationEvent); + + await Storage.runExclusive(async s => { + // this mutation is now "in progress" + const head = await outbox.peek(s); + const modelData: ModelType = JSON.parse(head.data); + + expect(head.modelId).toEqual(modelId); + expect(head.operation).toEqual(TransformerMutationType.UPDATE); + expect(modelData.field1).toEqual('another value'); + + const mutationsForModel = await outbox.getForModel(s, last); + expect(mutationsForModel.length).toEqual(1); + }); + + // add 2 update mutations to the queue: + const updatedModel2 = Model.copyOf(last, updated => { + updated.field1 = 'another value2'; + updated.dateCreated = new Date().toISOString(); + }); + + await outbox.enqueue(Storage, await createMutationEvent(updatedModel2)); + + const updatedModel3 = Model.copyOf(last, updated => { + updated.field1 = 'another value3'; + updated.dateCreated = new Date().toISOString(); + }); + + await outbox.enqueue(Storage, await createMutationEvent(updatedModel3)); + + // model2 should get deleted when model3 is enqueued, so we're expecting to see + // 2 items in the queue for this Model total (including the in progress record - updatedModel1) + const mutationsForModel = await outbox.getForModel(Storage, last); + expect(mutationsForModel.length).toEqual(2); + + const [_inProgress, nextMutation] = mutationsForModel; + const modelData: ModelType = JSON.parse(nextMutation.data); + + // and the next item in the queue should be updatedModel3 + expect(modelData.field1).toEqual('another value3'); + + // response from AppSync for the first update mutation - updatedModel1: + const response = { + ...updatedModel1, + _version: (updatedModel1 as any)._version + 1, // increment version like we would expect coming back from AppSync + _lastChangedAt: Date.now(), + _deleted: false, }; - await DataStore.start(); + await Storage.runExclusive(async s => { + // process mutation response, which dequeues updatedModel1 + // and syncs its version to the remaining item in the mutation queue + await processMutationResponse(s, response); + + const inProgress = await outbox.peek(s); + const inProgressData = JSON.parse(inProgress.data); + // updatedModel3 should now be in progress with the _version from the mutation response + + expect(inProgressData.field1).toEqual('another value3'); + expect(inProgressData._version).toEqual(2); + + // response from AppSync for the second update mutation - updatedModel3: + const response2 = { + ...updatedModel3, + _version: inProgressData._version + 1, // increment version like we would expect coming back from AppSync + _lastChangedAt: Date.now(), + _deleted: false, + }; - Storage = DataStore.storage; + await processMutationResponse(s, response2); - outbox = new MutationEventOutbox( - internalTestSchema(), - null, - MutationEvent, - ownSymbol - ); + const head = await outbox.peek(s); + expect(head).toBeFalsy(); + }); }); - test('blagh', () => { - // outbox.enqueue(Storage, ) - expect(true).toBeTruthy(); + it('Should NOT sync the _version from a handled conflict mutation response', async () => { + const last = await DataStore.query(Model, modelId); + + const updatedModel1 = Model.copyOf(last, updated => { + updated.field1 = 'another value'; + updated.dateCreated = new Date().toISOString(); + }); + + const mutationEvent = await createMutationEvent(updatedModel1); + await outbox.enqueue(Storage, mutationEvent); + + await Storage.runExclusive(async s => { + // this mutation is now "in progress" + const head = await outbox.peek(s); + const modelData: ModelType = JSON.parse(head.data); + + expect(head.modelId).toEqual(modelId); + expect(head.operation).toEqual(TransformerMutationType.UPDATE); + expect(modelData.field1).toEqual('another value'); + + const mutationsForModel = await outbox.getForModel(s, last); + expect(mutationsForModel.length).toEqual(1); + }); + + // add an update mutations to the queue: + const updatedModel2 = Model.copyOf(last, updated => { + updated.field1 = 'another value2'; + updated.dateCreated = new Date().toISOString(); + }); + + await outbox.enqueue(Storage, await createMutationEvent(updatedModel2)); + + // 2 items in the queue for this Model total (including the in progress record - updatedModel1) + const mutationsForModel = await outbox.getForModel(Storage, last); + expect(mutationsForModel.length).toEqual(2); + + const [_inProgress, nextMutation] = mutationsForModel; + const modelData: ModelType = JSON.parse(nextMutation.data); + + // and the next item in the queue should be updatedModel2 + expect(modelData.field1).toEqual('another value2'); + + // response from AppSync with a handled conflict: + const response = { + ...updatedModel1, + field1: 'a different value set by another client', + _version: (updatedModel1 as any)._version + 1, // increment version like we would expect coming back from AppSync + _lastChangedAt: Date.now(), + _deleted: false, + }; + + await Storage.runExclusive(async s => { + // process mutation response, which dequeues updatedModel1 + // and syncs its version to the remaining item in the mutation queue + await processMutationResponse(s, response); + + const inProgress = await outbox.peek(s); + const inProgressData = JSON.parse(inProgress.data); + // updatedModel3 should now be in progress with the _version from the mutation response + + expect(inProgressData.field1).toEqual('another value2'); + + const oldVersion = (modelData as any)._version; + + expect(inProgressData._version).toEqual(oldVersion); + + // same response as above, + await processMutationResponse(s, response); + + const head = await outbox.peek(s); + expect(head).toBeFalsy(); + }); }); }); -const data = { - name: 'Title F - 16:22:17', - id: 'c4e457de-cfa6-49e9-84c4-48e3338ace26', - _version: 727, - _lastChangedAt: 1613596937293, - _deleted: null, -}; - -const mutationEvent = { - condition: '{}', - data: JSON.stringify(data), - id: '01EYRTP6B2R7AKMS5BJ6BRJPJS', - model: 'Todo', - modelId: 'c4e457de-cfa6-49e9-84c4-48e3338ace26', - operation: 'Update', -}; - -const response = { - id: 'c4e457de-cfa6-49e9-84c4-48e3338ace26', - name: 'Title Z - 16:29:51', - _version: 747, - _lastChangedAt: 1613597392344, - _deleted: null, -}; +// performs all the required dependency injection +// in order to have a functional Outbox without the Sync Engine +async function initializeOutbox(): Promise { + ({ initSchema, DataStore } = require('../src/datastore/datastore')); + const classes = initSchema(testSchema()); + const ownSymbol = Symbol('sync'); + + ({ Model } = classes as { + Model: PersistentModelConstructor; + }); + + const MutationEvent = syncClasses[ + 'MutationEvent' + ] as PersistentModelConstructor; + + await DataStore.start(); + + Storage = DataStore.storage; + anyStorage = Storage; + + ({ modelInstanceCreator } = anyStorage.storage); + + outbox = new MutationEventOutbox(schema, null, MutationEvent, ownSymbol); + merger = new ModelMerger(outbox, ownSymbol); +} + +async function createMutationEvent(model): Promise { + const [[originalElement, opType]] = await anyStorage.storage.save(model); + + const MutationEventConstructor = syncClasses[ + 'MutationEvent' + ] as PersistentModelConstructor; + + const modelConstructor = (Object.getPrototypeOf(originalElement) as Object) + .constructor as PersistentModelConstructor; + + return createMutationInstanceFromModelOperation( + undefined, + undefined, + opType, + modelConstructor, + originalElement, + {}, + MutationEventConstructor, + modelInstanceCreator + ); +} + +async function processMutationResponse(storage, record): Promise { + await outbox.dequeue(storage, record); + + const modelConstructor = Model as PersistentModelConstructor; + const model = modelInstanceCreator(modelConstructor, record); + + await merger.merge(storage, model); +} diff --git a/packages/datastore/src/datastore/datastore.ts b/packages/datastore/src/datastore/datastore.ts index 6fe1079483f..2e94bac06fb 100644 --- a/packages/datastore/src/datastore/datastore.ts +++ b/packages/datastore/src/datastore/datastore.ts @@ -105,12 +105,10 @@ const isValidModelConstructor = ( const namespaceResolver: NamespaceResolver = modelConstructor => modelNamespaceMap.get(modelConstructor); +// exporting for testing purposes +export let syncClasses: TypeConstructorMap; let dataStoreClasses: TypeConstructorMap; - let userClasses: TypeConstructorMap; - -let syncClasses: TypeConstructorMap; - let storageClasses: TypeConstructorMap; const initSchema = (userSchema: Schema) => { diff --git a/packages/datastore/src/sync/outbox.ts b/packages/datastore/src/sync/outbox.ts index 9a523c5f43d..1fc0f3e40ec 100644 --- a/packages/datastore/src/sync/outbox.ts +++ b/packages/datastore/src/sync/outbox.ts @@ -12,7 +12,7 @@ import { PersistentModelConstructor, QueryOne, } from '../types'; -import { SYNC } from '../util'; +import { SYNC, objectsEqual } from '../util'; import { TransformerMutationType } from './utils'; // TODO: Persist deleted ids @@ -31,7 +31,7 @@ class MutationEventOutbox { storage: Storage, mutationEvent: MutationEvent ): Promise { - storage.runExclusive(async s => { + return storage.runExclusive(async s => { const mutationEventModelDefinition = this.schema.namespaces[SYNC].models[ 'MutationEvent' ]; @@ -72,6 +72,7 @@ class MutationEventOutbox { // If no condition if (Object.keys(incomingCondition).length === 0) { + // delete all for model await s.delete(this.MutationEvent, predicate); } @@ -92,7 +93,6 @@ class MutationEventOutbox { } await storage.delete(head); - this.inProgressMutationEventId = undefined; return head; @@ -139,6 +139,8 @@ class MutationEventOutbox { return result; } + // applies _version from the AppSync mutation response to other items in the mutation queue with the same id + // see https://github.com/aws-amplify/amplify-js/pull/7354 for more details private async syncOutboxVersionsOnDequeue( storage: StorageClass, record: PersistentModel, @@ -157,7 +159,7 @@ class MutationEventOutbox { // Don't sync the version when the data in the response does not match the data // in the request, i.e., when there's a handled conflict - if (this.recordsDontMatch(incomingData, outgoingData)) { + if (!objectsEqual(incomingData, outgoingData)) { return; } @@ -197,35 +199,6 @@ class MutationEventOutbox { ) ); } - - // TODO: move to util and rename to e.g., deep compare - private recordsDontMatch( - outgoing: PersistentModel, - incoming: PersistentModel - ): boolean { - const outgoingKeys = Object.keys(outgoing); - const incomingKeys = Object.keys(incoming); - - if (outgoingKeys.length !== incomingKeys.length) { - return false; - } - - for (const key of outgoingKeys) { - const outgoingVal = outgoing[key]; - const incomingVal = incoming[key]; - - if (outgoingVal && typeof outgoingVal === 'object') { - // since we can't guarantee the order of object keys, we'll have to recursively - // call this method for values containing objects - if (this.recordsDontMatch(outgoingVal, incomingVal)) { - return true; - } - } else if (outgoingVal !== incomingVal) { - return true; - } - } - return false; - } } export { MutationEventOutbox }; diff --git a/packages/datastore/src/util.ts b/packages/datastore/src/util.ts index 81407316463..6a998f8e83e 100644 --- a/packages/datastore/src/util.ts +++ b/packages/datastore/src/util.ts @@ -434,6 +434,44 @@ export function sortCompareFunction( }; } +// deep compare any 2 objects (including arrays, Sets, and Maps) +// returns true if equal +export function objectsEqual(objA: object, objB: object): boolean { + let a = objA; + let b = objB; + + if (a instanceof Set && b instanceof Set) { + a = [...a]; + b = [...b]; + } + + // if (a instanceof Map && b instanceof Map) { + // a = Object.fromEntries(a); + // b = Object.fromEntries(b); + // } + + const aKeys = Object.keys(a); + const bKeys = Object.keys(b); + + if (aKeys.length !== bKeys.length) { + return false; + } + + for (const key of aKeys) { + const aVal = a[key]; + const bVal = b[key]; + + if (aVal && typeof aVal === 'object') { + if (!objectsEqual(aVal, bVal)) { + return false; + } + } else if (aVal !== bVal) { + return false; + } + } + return true; +} + export const isAWSDate = (val: string): boolean => { return !!/^\d{4}-\d{2}-\d{2}(Z|[+-]\d{2}:\d{2}($|:\d{2}))?$/.exec(val); };