Skip to content

Commit

Permalink
fix(@aws-amplify/datastore): fix concurrent updates
Browse files Browse the repository at this point in the history
  • Loading branch information
iartemiev committed Feb 2, 2021
1 parent 651c3b2 commit d939a1c
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 18 deletions.
1 change: 0 additions & 1 deletion packages/datastore/__tests__/DataStore.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import 'fake-indexeddb/auto';
import FDBCursor from 'fake-indexeddb/build/FDBCursor';
import { decodeTime } from 'ulid';
import uuidValidate from 'uuid-validate';
import Observable from 'zen-observable-ts';
Expand Down
1 change: 0 additions & 1 deletion packages/datastore/src/sync/merger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import {
PersistentModelConstructor,
} from '../types';
import { MutationEventOutbox } from './outbox';

class ModelMerger {
constructor(
private readonly outbox: MutationEventOutbox,
Expand Down
141 changes: 127 additions & 14 deletions packages/datastore/src/sync/outbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,9 @@ class MutationEventOutbox {
mutationEvent: MutationEvent
): Promise<void> {
storage.runExclusive(async s => {
const mutationEventModelDefinition = this.schema.namespaces[SYNC].models[
'MutationEvent'
];

const predicate = ModelPredicateCreator.createFromExisting<MutationEvent>(
mutationEventModelDefinition,
c =>
c
.modelId('eq', mutationEvent.modelId)
.id('ne', this.inProgressMutationEventId)
);

const [first] = await s.query(this.MutationEvent, predicate);
const predicate = this.currentPredicate(mutationEvent);
const existing = await s.query(this.MutationEvent, predicate);
const [first] = existing;

if (first === undefined) {
await s.save(mutationEvent, undefined, this.ownSymbol);
Expand All @@ -51,6 +41,8 @@ class MutationEventOutbox {

if (first.operation === TransformerMutationType.CREATE) {
if (incomingMutationType === TransformerMutationType.DELETE) {
// get predicate again to avoid race condition with inProgressMutationEventId
const predicate = this.currentPredicate(mutationEvent);
// delete all for model
await s.delete(this.MutationEvent, predicate);
} else {
Expand All @@ -67,21 +59,51 @@ class MutationEventOutbox {
const { condition: incomingConditionJSON } = mutationEvent;
const incomingCondition = JSON.parse(incomingConditionJSON);

const updated = await this.reconcileOutboxOnEnqueue(
existing,
mutationEvent
);

// If no condition
if (Object.keys(incomingCondition).length === 0) {
// get predicate again to avoid race condition with inProgressMutationEventId
const predicate = this.currentPredicate(mutationEvent);
// delete all for model
await s.delete(this.MutationEvent, predicate);
}

if (updated) {
await s.save(updated, undefined, this.ownSymbol);
return;
}

// Enqueue new one
await s.save(mutationEvent, undefined, this.ownSymbol);
}
});
}

public async dequeue(storage: StorageFacade): Promise<MutationEvent> {
public async dequeue(
storage: Storage,
record?: PersistentModel
): Promise<MutationEvent> {
const head = await this.peek(storage);

const mutationEventModelDefinition = this.schema.namespaces[SYNC].models[
'MutationEvent'
];

const predicate = ModelPredicateCreator.createFromExisting<MutationEvent>(
mutationEventModelDefinition,
c => c.modelId('eq', record.id)
);

const all = await storage.query(this.MutationEvent, predicate);

if (record) {
await this.reconcileOutboxOnDequeue(storage, record);
}

await storage.delete(head);

this.inProgressMutationEventId = undefined;
Expand Down Expand Up @@ -129,6 +151,97 @@ class MutationEventOutbox {

return result;
}

private async reconcileOutboxOnEnqueue(
existing: MutationEvent[],
mutationEvent: MutationEvent
): Promise<MutationEvent | undefined> {
const { _version, _lastChangedAt } = existing.reduce(
(acc, cur) => {
const oldData = JSON.parse(cur.data);
const { _version: lastVersion } = acc;
const { _version: _v, _lastChangedAt: _lCA } = oldData;

if (_v > lastVersion) {
return { _version: _v, _lastChangedAt: _lCA };
}

return acc;
},
{
_version: 0,
_lastChangedAt: 0,
}
);

const currentData = JSON.parse(mutationEvent.data);
const currentVersion = currentData._version;

if (currentVersion < _version) {
const newData = { ...currentData, _version, _lastChangedAt };
const newMutation = new this.MutationEvent({
...mutationEvent,
data: JSON.stringify(newData),
});
return newMutation;
}
}

private async reconcileOutboxOnDequeue(
storage: Storage,
record: PersistentModel
): Promise<void> {
storage.runExclusive(async s => {
const mutationEventModelDefinition = this.schema.namespaces[SYNC].models[
'MutationEvent'
];

const predicate = ModelPredicateCreator.createFromExisting<MutationEvent>(
mutationEventModelDefinition,
c => c.modelId('eq', record.id).id('ne', this.inProgressMutationEventId)
);

const outdatedMutations = await s.query(this.MutationEvent, predicate);

if (!outdatedMutations.length) {
return;
}

const { _version, _lastChangedAt } = record;

const reconciledMutations = outdatedMutations.map(m => {
const oldData = JSON.parse(m.data);

const newData = { ...oldData, _version, _lastChangedAt };

return this.MutationEvent.copyOf(m, draft => {
draft.data = JSON.stringify(newData);
});
});

await s.delete(this.MutationEvent, predicate);

await Promise.all(
reconciledMutations.map(
async m => await s.save(m, undefined, this.ownSymbol)
)
);
});
}

private currentPredicate(mutationEvent: MutationEvent) {
const mutationEventModelDefinition = this.schema.namespaces[SYNC].models[
'MutationEvent'
];

return ModelPredicateCreator.createFromExisting<MutationEvent>(
mutationEventModelDefinition,
c =>
c
.modelId('eq', mutationEvent.modelId)
.id('ne', this.inProgressMutationEventId)
);
}
}

export { MutationEventOutbox };
2 changes: 1 addition & 1 deletion packages/datastore/src/sync/processors/mutation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ class MutationProcessor {
}

const record = result.data[opName];
await this.outbox.dequeue(this.storage);
await this.outbox.dequeue(this.storage, record);

const hasMore = (await this.outbox.peek(this.storage)) !== undefined;

Expand Down
3 changes: 2 additions & 1 deletion scripts/build.js
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ function buildES6(typeScriptCompiler) {
lib: ['dom', 'es2017', 'esnext.asynciterable', 'es2018.asyncgenerator'],
downlevelIteration: true,
jsx: jsx,
sourceMap: true,
inlineSourceMap: true,
inlineSources: true,
target: 'es5',
module: 'es2015',
moduleResolution: 'node',
Expand Down

0 comments on commit d939a1c

Please sign in to comment.