diff --git a/packages/datastore/src/sync/merger.ts b/packages/datastore/src/sync/merger.ts index ca26bcd2d02..c90038d3cd7 100644 --- a/packages/datastore/src/sync/merger.ts +++ b/packages/datastore/src/sync/merger.ts @@ -5,7 +5,6 @@ import { PersistentModelConstructor, } from '../types'; import { MutationEventOutbox } from './outbox'; - class ModelMerger { constructor( private readonly outbox: MutationEventOutbox, diff --git a/packages/datastore/src/sync/outbox.ts b/packages/datastore/src/sync/outbox.ts index aed7adbb65d..eedadd7951c 100644 --- a/packages/datastore/src/sync/outbox.ts +++ b/packages/datastore/src/sync/outbox.ts @@ -11,9 +11,6 @@ import { import { SYNC } from '../util'; import { TransformerMutationType } from './utils'; -import { Logger } from '@aws-amplify/core'; -const logger = new Logger('DataStore - Outbox'); - // TODO: Persist deleted ids class MutationEventOutbox { @@ -31,23 +28,11 @@ class MutationEventOutbox { mutationEvent: MutationEvent ): Promise { storage.runExclusive(async s => { - const mutationEventModelDefinition = this.schema.namespaces[SYNC].models[ - 'MutationEvent' - ]; - - const predicate = ModelPredicateCreator.createFromExisting( - mutationEventModelDefinition, - c => - c - .modelId('eq', mutationEvent.modelId) - .id('ne', this.inProgressMutationEventId) - ); - + const predicate = this.currentPredicate(mutationEvent); const existing = await s.query(this.MutationEvent, predicate); const [first] = existing; - logger.debug('enqueue'); + if (first === undefined) { - logger.debug(name, { existing, mutationEvent }); await s.save(mutationEvent, undefined, this.ownSymbol); return; } @@ -56,6 +41,8 @@ class MutationEventOutbox { if (first.operation === TransformerMutationType.CREATE) { if (incomingMutationType === TransformerMutationType.DELETE) { + // get predicate again to avoid race condition with inProgressMutationEventId + const predicate = this.currentPredicate(mutationEvent); // delete all for model await s.delete(this.MutationEvent, predicate); } else { @@ -77,27 +64,21 @@ class MutationEventOutbox { mutationEvent ); - // TODO: delete this: - const { name } = JSON.parse(mutationEvent.data); - logger.debug(name, { existing, updated, mutationEvent }); - // If no condition if (Object.keys(incomingCondition).length === 0) { + // get predicate again to avoid race condition with inProgressMutationEventId + const predicate = this.currentPredicate(mutationEvent); // delete all for model await s.delete(this.MutationEvent, predicate); } if (updated) { - logger.debug('enqueue pre-save'); await s.save(updated, undefined, this.ownSymbol); - logger.debug('end enqueue'); return; } // Enqueue new one await s.save(mutationEvent, undefined, this.ownSymbol); - - logger.debug('end enqueue'); } }); } @@ -108,7 +89,16 @@ class MutationEventOutbox { ): Promise { const head = await this.peek(storage); - logger.debug('Dequeue', record); + const mutationEventModelDefinition = this.schema.namespaces[SYNC].models[ + 'MutationEvent' + ]; + + const predicate = ModelPredicateCreator.createFromExisting( + mutationEventModelDefinition, + c => c.modelId('eq', record.id) + ); + + const all = await storage.query(this.MutationEvent, predicate); if (record) { await this.reconcileOutboxOnDequeue(storage, record); @@ -118,8 +108,6 @@ class MutationEventOutbox { this.inProgressMutationEventId = undefined; - logger.debug('end Dequeue'); - return head; } @@ -215,8 +203,6 @@ class MutationEventOutbox { const outdatedMutations = await s.query(this.MutationEvent, predicate); - logger.debug('Reconcile Dequeue', outdatedMutations); - if (!outdatedMutations.length) { return; } @@ -242,6 +228,20 @@ class MutationEventOutbox { ); }); } + + private currentPredicate(mutationEvent: MutationEvent) { + const mutationEventModelDefinition = this.schema.namespaces[SYNC].models[ + 'MutationEvent' + ]; + + return ModelPredicateCreator.createFromExisting( + mutationEventModelDefinition, + c => + c + .modelId('eq', mutationEvent.modelId) + .id('ne', this.inProgressMutationEventId) + ); + } } export { MutationEventOutbox }; diff --git a/packages/datastore/src/sync/processors/mutation.ts b/packages/datastore/src/sync/processors/mutation.ts index 4360915526d..2035f6d214c 100644 --- a/packages/datastore/src/sync/processors/mutation.ts +++ b/packages/datastore/src/sync/processors/mutation.ts @@ -214,7 +214,6 @@ class MutationProcessor { do { try { - logger.debug('Outbox sending', tryWith); const result = >>( await API.graphql(tryWith) );