Skip to content

Commit

Permalink
outbox - update predicate before deleting
Browse files Browse the repository at this point in the history
  • Loading branch information
iartemiev committed Jan 23, 2021
1 parent f55ecfc commit 01129ce
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 32 deletions.
1 change: 0 additions & 1 deletion packages/datastore/src/sync/merger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import {
PersistentModelConstructor,
} from '../types';
import { MutationEventOutbox } from './outbox';

class ModelMerger {
constructor(
private readonly outbox: MutationEventOutbox,
Expand Down
60 changes: 30 additions & 30 deletions packages/datastore/src/sync/outbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ import {
import { SYNC } from '../util';
import { TransformerMutationType } from './utils';

import { Logger } from '@aws-amplify/core';
const logger = new Logger('DataStore - Outbox');

// TODO: Persist deleted ids

class MutationEventOutbox {
Expand All @@ -31,23 +28,11 @@ class MutationEventOutbox {
mutationEvent: MutationEvent
): Promise<void> {
storage.runExclusive(async s => {
const mutationEventModelDefinition = this.schema.namespaces[SYNC].models[
'MutationEvent'
];

const predicate = ModelPredicateCreator.createFromExisting<MutationEvent>(
mutationEventModelDefinition,
c =>
c
.modelId('eq', mutationEvent.modelId)
.id('ne', this.inProgressMutationEventId)
);

const predicate = this.currentPredicate(mutationEvent);
const existing = await s.query(this.MutationEvent, predicate);
const [first] = existing;
logger.debug('enqueue');

if (first === undefined) {
logger.debug(name, { existing, mutationEvent });
await s.save(mutationEvent, undefined, this.ownSymbol);
return;
}
Expand All @@ -56,6 +41,8 @@ class MutationEventOutbox {

if (first.operation === TransformerMutationType.CREATE) {
if (incomingMutationType === TransformerMutationType.DELETE) {
// get predicate again to avoid race condition with inProgressMutationEventId
const predicate = this.currentPredicate(mutationEvent);
// delete all for model
await s.delete(this.MutationEvent, predicate);
} else {
Expand All @@ -77,27 +64,21 @@ class MutationEventOutbox {
mutationEvent
);

// TODO: delete this:
const { name } = JSON.parse(mutationEvent.data);
logger.debug(name, { existing, updated, mutationEvent });

// If no condition
if (Object.keys(incomingCondition).length === 0) {
// get predicate again to avoid race condition with inProgressMutationEventId
const predicate = this.currentPredicate(mutationEvent);
// delete all for model
await s.delete(this.MutationEvent, predicate);
}

if (updated) {
logger.debug('enqueue pre-save');
await s.save(updated, undefined, this.ownSymbol);
logger.debug('end enqueue');
return;
}

// Enqueue new one
await s.save(mutationEvent, undefined, this.ownSymbol);

logger.debug('end enqueue');
}
});
}
Expand All @@ -108,7 +89,16 @@ class MutationEventOutbox {
): Promise<MutationEvent> {
const head = await this.peek(storage);

logger.debug('Dequeue', record);
const mutationEventModelDefinition = this.schema.namespaces[SYNC].models[
'MutationEvent'
];

const predicate = ModelPredicateCreator.createFromExisting<MutationEvent>(
mutationEventModelDefinition,
c => c.modelId('eq', record.id)
);

const all = await storage.query(this.MutationEvent, predicate);

if (record) {
await this.reconcileOutboxOnDequeue(storage, record);
Expand All @@ -118,8 +108,6 @@ class MutationEventOutbox {

this.inProgressMutationEventId = undefined;

logger.debug('end Dequeue');

return head;
}

Expand Down Expand Up @@ -215,8 +203,6 @@ class MutationEventOutbox {

const outdatedMutations = await s.query(this.MutationEvent, predicate);

logger.debug('Reconcile Dequeue', outdatedMutations);

if (!outdatedMutations.length) {
return;
}
Expand All @@ -242,6 +228,20 @@ class MutationEventOutbox {
);
});
}

private currentPredicate(mutationEvent: MutationEvent) {
const mutationEventModelDefinition = this.schema.namespaces[SYNC].models[
'MutationEvent'
];

return ModelPredicateCreator.createFromExisting<MutationEvent>(
mutationEventModelDefinition,
c =>
c
.modelId('eq', mutationEvent.modelId)
.id('ne', this.inProgressMutationEventId)
);
}
}

export { MutationEventOutbox };
1 change: 0 additions & 1 deletion packages/datastore/src/sync/processors/mutation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ class MutationProcessor {

do {
try {
logger.debug('Outbox sending', tryWith);
const result = <GraphQLResult<Record<string, PersistentModel>>>(
await API.graphql(tryWith)
);
Expand Down

0 comments on commit 01129ce

Please sign in to comment.