IMPROVE performance of categorizeBulkWriteRows
pubkey committed Apr 8, 2023
1 parent e18fd9b commit 63c1ab2
Showing 3 changed files with 118 additions and 35 deletions.
80 changes: 45 additions & 35 deletions src/rx-storage-helper.ts
@@ -181,8 +181,9 @@ export function categorizeBulkWriteRows<RxDocType>(
const bulkUpdateDocs: BulkWriteRowProcessed<RxDocType>[] = [];
const errors: ById<RxStorageWriteError<RxDocType>> = {};
const changedDocumentIds: RxDocType[StringKeys<RxDocType>][] = [];
+ const eventBulkId = randomCouchString(10);
const eventBulk: EventBulk<RxStorageChangeEvent<RxDocumentData<RxDocType>>, any> = {
- id: randomCouchString(10),
+ id: eventBulkId,
events: [],
checkpoint: null,
context
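The hunk above sets up the main optimization: the random id that was previously generated inline for eventBulk.id is kept in eventBulkId, so the row loop below can reuse it as the prefix of every per-event key instead of deriving keys from storage metadata. A minimal sketch of that pattern (standalone illustration, not part of the diff; it assumes randomCouchString is importable from the package root, as the new test file below does; makeEventKey is a hypothetical helper mirroring the rewritten getUniqueDeterministicEventKey further down):

import { randomCouchString } from 'rxdb';

// One random string per bulk; every event key derives from it cheaply.
const eventBulkId = randomCouchString(10);
const makeEventKey = (rowId: number, docId: string, rev: string) =>
    eventBulkId + '|' + rowId + '|' + docId + '|' + rev;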
@@ -207,13 +208,18 @@ export function categorizeBulkWriteRows<RxDocType>(
const startTime = now();

const docsByIdIsMap = typeof docsInDb.get === 'function';
+ const hasDocsInDb = docsByIdIsMap ? (docsInDb as Map<any, any>).size > 0 : Object.keys(docsInDb).length > 0;

let newestRow: BulkWriteRowProcessed<RxDocType> | undefined;

const rowAmount = bulkWriteRows.length;
- for (let i = 0; i < rowAmount; i++) {
- const writeRow = bulkWriteRows[i];
- const id = writeRow.document[primaryPath];
- const documentInDb = docsByIdIsMap ? (docsInDb as any).get(id) : (docsInDb as any)[id];
+ for (let rowId = 0; rowId < rowAmount; rowId++) {
+ const writeRow = bulkWriteRows[rowId];
+ const docId = writeRow.document[primaryPath] as string;
+ let documentInDb: RxDocumentData<RxDocType> | false = false;
+ if (hasDocsInDb) {
+ documentInDb = docsByIdIsMap ? (docsInDb as any).get(docId) : (docsInDb as any)[docId];
+ }
let attachmentError: RxStorageWriteErrorAttachment<RxDocType> | undefined;

if (!documentInDb) {
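This hunk also adds a fast path for empty stores: hasDocsInDb is computed once before the loop, so when docsInDb contains nothing (the common case when bulk-inserting into a fresh collection) the per-row Map.get or property lookup is skipped. A standalone sketch of the guard, with the shape check and emptiness check folded into one hypothetical helper for brevity (the commit itself hoists both checks out of the loop):

// lookupDoc is a hypothetical helper, not part of the diff.
function lookupDoc<T>(
    docsInDb: Map<string, T> | Record<string, T>,
    docId: string
): T | false {
    const isMap = typeof (docsInDb as Map<string, T>).get === 'function';
    const hasDocs = isMap
        ? (docsInDb as Map<string, T>).size > 0
        : Object.keys(docsInDb).length > 0;
    if (!hasDocs) {
        return false; // empty store: skip the lookup entirely
    }
    const found = isMap
        ? (docsInDb as Map<string, T>).get(docId)
        : (docsInDb as Record<string, T>)[docId];
    return found === undefined ? false : found;
}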
@@ -228,16 +234,16 @@ export function categorizeBulkWriteRows<RxDocType>(
!(attachmentData as RxAttachmentWriteData).data
) {
attachmentError = {
- documentId: id as any,
+ documentId: docId,
isError: true,
status: 510,
writeRow,
attachmentId
};
- errors[id as any] = attachmentError;
+ errors[docId] = attachmentError;
} else {
attachmentsAdd.push({
- documentId: id as any,
+ documentId: docId,
attachmentId,
attachmentData: attachmentData as any
});
@@ -259,10 +265,15 @@ export function categorizeBulkWriteRows<RxDocType>(
}

if (!insertedIsDeleted) {
- changedDocumentIds.push(id);
+ changedDocumentIds.push(docId as any);
eventBulk.events.push({
- eventId: getUniqueDeterministicEventKey(storageInstance, primaryPath as any, writeRow),
- documentId: id as any,
+ eventId: getUniqueDeterministicEventKey(
+ eventBulkId,
+ rowId,
+ docId,
+ writeRow
+ ),
+ documentId: docId,
operation: 'INSERT',
documentData: hasAttachments ? stripAttachmentsDataFromDocument(writeRow.document) : writeRow.document as any,
previousDocumentData: hasAttachments && writeRow.previous ? stripAttachmentsDataFromDocument(writeRow.previous) : writeRow.previous as any,
@@ -290,11 +301,11 @@ export function categorizeBulkWriteRows<RxDocType>(
const err: RxStorageWriteError<RxDocType> = {
isError: true,
status: 409,
- documentId: id as any,
+ documentId: docId,
writeRow: writeRow,
documentInDb
};
- errors[id as any] = err;
+ errors[docId] = err;
continue;
}

@@ -311,7 +322,7 @@ export function categorizeBulkWriteRows<RxDocType>(
.keys(writeRow.previous._attachments)
.forEach(attachmentId => {
attachmentsRemove.push({
- documentId: id as any,
+ documentId: docId,
attachmentId
});
});
@@ -327,8 +338,8 @@ export function categorizeBulkWriteRows<RxDocType>(
!(attachmentData as RxAttachmentWriteData).data
) {
attachmentError = {
- documentId: id as any,
- documentInDb,
+ documentId: docId,
+ documentInDb: documentInDb as any,
isError: true,
status: 510,
writeRow,
@@ -344,7 +355,7 @@ export function categorizeBulkWriteRows<RxDocType>(
const previousAttachmentData = writeRow.previous ? writeRow.previous._attachments[attachmentId] : undefined;
if (!previousAttachmentData) {
attachmentsAdd.push({
- documentId: id as any,
+ documentId: docId,
attachmentId,
attachmentData: attachmentData as any
});
@@ -359,7 +370,7 @@ export function categorizeBulkWriteRows<RxDocType>(
previousAttachmentData.digest !== newDigest
) {
attachmentsUpdate.push({
- documentId: id as any,
+ documentId: docId,
attachmentId,
attachmentData: attachmentData as RxAttachmentWriteData
});
@@ -371,7 +382,7 @@ export function categorizeBulkWriteRows<RxDocType>(
}

if (attachmentError) {
- errors[id as any] = attachmentError;
+ errors[docId] = attachmentError;
} else {
bulkUpdateDocs.push(updatedRow);
if (
@@ -403,11 +414,16 @@ export function categorizeBulkWriteRows<RxDocType>(
throw newRxError('SNH', { args: { writeRow } });
}

- changedDocumentIds.push(id);
+ changedDocumentIds.push(docId as any);
eventBulk.events.push({
- eventId: getUniqueDeterministicEventKey(storageInstance, primaryPath as any, writeRow),
- documentId: id as any,
- documentData: ensureNotFalsy(eventDocumentData),
+ eventId: getUniqueDeterministicEventKey(
+ eventBulkId,
+ rowId,
+ docId,
+ writeRow
+ ),
+ documentId: docId,
+ documentData: eventDocumentData as RxDocumentData<RxDocType>,
previousDocumentData: previousEventDocumentData,
operation: operation,
startTime,
@@ -485,22 +501,16 @@ export function flatCloneDocWithMeta<RxDocType>(

/**
* Each event is labeled with the id
- * to make it easy to filter out duplicates.
+ * to make it easy to filter out duplicates
+ * even on flattened eventBulks
*/
export function getUniqueDeterministicEventKey(
- storageInstance: RxStorageInstance<any, any, any>,
- primaryPath: string,
+ eventBulkId: string,
+ rowId: number,
+ docId: string,
writeRow: BulkWriteRow<any>
): string {
- const docId = writeRow.document[primaryPath];
- const binaryValues: boolean[] = [
- !!writeRow.previous,
- (writeRow.previous && writeRow.previous._deleted),
- !!writeRow.document._deleted
- ];
- const binary = binaryValues.map(v => v ? '1' : '0').join('');
- const eventKey = storageInstance.databaseName + '|' + storageInstance.collectionName + '|' + docId + '|' + '|' + binary + '|' + writeRow.document._rev;
- return eventKey;
+ return eventBulkId + '|' + rowId + '|' + docId + '|' + writeRow.document._rev;
}
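The rewritten getUniqueDeterministicEventKey is where much of the saving comes from: the old key concatenated databaseName, collectionName and a per-row string of boolean deleted-state flags, while the new one reuses the bulk-wide random id plus values already at hand in the loop, and as the comment above notes it stays unique even when event bulks are flattened. A usage sketch with made-up values (only writeRow.document._rev is read from the last argument):

const key = getUniqueDeterministicEventKey(
    'abcdefghij', // eventBulkId: generated once per bulk
    0,            // rowId: position of the row inside the bulk
    'doc-1',      // docId
    { document: { _rev: '1-abc' } } as any
);
// key === 'abcdefghij|0|doc-1|1-abc'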


1 change: 1 addition & 0 deletions test/unit.test.ts
@@ -19,6 +19,7 @@ import './unit/query-planner.test';
*/
import './unit/rx-storage-implementations.test';
import './unit/rx-storage-query-correctness.test';
+ import './unit/rx-storage-helper.test';

import './unit/rx-storage-lokijs.test';
import './unit/rx-storage-dexie.test';
72 changes: 72 additions & 0 deletions test/unit/rx-storage-helper.test.ts
@@ -0,0 +1,72 @@

import config from './config';
import * as schemaObjects from '../helper/schema-objects';
import {
randomCouchString,
now,
fillWithDefaultSettings,
categorizeBulkWriteRows,
getPrimaryFieldOfPrimaryKey,
BulkWriteRow
} from '../../';
import * as schemas from '../helper/schemas';
import {
EXAMPLE_REVISION_1
} from '../helper/revisions';
import assert from 'assert';


const testContext = 'rx-storage-helper.test.ts';

config.parallel('rx-storage-helper.test.ts', () => {
describe('.categorizeBulkWriteRows()', () => {
it('performance', async () => {

const instance = await config.storage.getStorage().createStorageInstance({
databaseInstanceToken: randomCouchString(10),
databaseName: randomCouchString(10),
collectionName: randomCouchString(10),
schema: fillWithDefaultSettings(schemas.human),
options: {},
multiInstance: false,
devMode: true
});
const primaryPath = getPrimaryFieldOfPrimaryKey(schemas.human.primaryKey);
const amount = config.isFastMode() ? 100 : 10000;
const writeRows: BulkWriteRow<schemas.HumanDocumentType>[] = new Array(amount).fill(0).map(() => {
const document = Object.assign(
schemaObjects.human(),
{
_deleted: false,
_rev: EXAMPLE_REVISION_1,
_attachments: {},
_meta: {
lwt: now()
}
}
);
return { document };
});

const startTime = performance.now();

categorizeBulkWriteRows(
instance,
primaryPath,
new Map(),
writeRows,
testContext
);

const endTime = performance.now();
const time = endTime - startTime;

// console.log('time ' + time);
// process.exit();


assert.ok(time);
instance.close();
});
});
});
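The new test asserts only that the measured time is truthy, so it is a smoke benchmark for manual runs (see the commented-out console.log) rather than a strict regression gate. Its timing pattern can be distilled into a small helper, sketched here as a hypothetical addition assuming the measured function is synchronous:

// Hypothetical helper mirroring the test's performance.now() timing.
function measureSyncMs(fn: () => void): number {
    const start = performance.now();
    fn();
    return performance.now() - start;
}
// e.g. const ms = measureSyncMs(() => categorizeBulkWriteRows(/* ... */));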
