diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 1a895c30830..a90b3b21cfc 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -906,7 +906,7 @@ jobs:
 - uses: denoland/setup-deno@v1
   with:
     # https://github.com/denoland/deno/releases
-    deno-version: "1.38.5"
+    deno-version: "1.44.1"
 - name: run deno tests:dexie
   run: |
     sudo npm i -g cross-env
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9a55bbbfc41..fafdfe8e022 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,7 +2,8 @@
 # RxDB Changelog
-
+- ADD [RxPipeline Plugin](https://rxdb.info/rx-pipeline.html).
+- FIX (denoKV RxStorage) retry writes when db is locked.
diff --git a/docs-src/docs/rx-pipeline.md b/docs-src/docs/rx-pipeline.md
new file mode 100644
index 00000000000..312bdc7787c
--- /dev/null
+++ b/docs-src/docs/rx-pipeline.md
@@ -0,0 +1,163 @@
# RxPipeline (beta)

The RxPipeline plugin enables you to run operations depending on writes to a collection.
Whenever a write happens on the source collection of a pipeline, a handler is called to process the writes and run operations on another collection.

You could achieve a similar behavior by observing the collection stream and processing data on emits:

```ts
mySourceCollection.$.subscribe(event => {/* ...process...*/});
```

While this could work in some cases, it causes many problems that are fixed by using the pipeline plugin instead:
- In a RxPipeline, only the [Leading Instance](./leader-election.md) runs the operations. For example, when you have multiple browser tabs open, only one of them runs the processing, and when that tab is closed, another tab is elected leader and continues the pipeline processing.
- On sudden stops and restarts of the JavaScript process, the processing continues at the correct checkpoint and does not miss any documents, even after unexpected crashes.
- Reads/Writes on the destination collection are halted while the pipeline is processing. This ensures your queries only return fully processed documents and no partial results.


## Creating a RxPipeline

Pipelines are created on top of a source [RxCollection](./rx-collection.md) and have another `RxCollection` as destination. An identifier is used to identify the state of the pipeline so that different pipelines have different processing checkpoint states. A plain JavaScript function `handler` is used to process the data of the source collection writes.

```ts
const pipeline = await mySourceCollection.addPipeline({
    identifier: 'my-pipeline',
    destination: myDestinationCollection,
    handler: async (docs) => {
        /**
         * Here you can process the documents and run writes
         * on the destination collection.
         */
        for (const doc of docs) {
            await myDestinationCollection.insert({
                id: doc.primary,
                category: doc.category
            });
        }
    }
});
```

:::warning beta
The pipeline plugin is in **beta** mode and the API might change without a major RxDB release.
:::


## Pipeline handlers must be idempotent

Because a JavaScript process can exit at any time, like when the user closes a browser tab, the pipeline handler function must be idempotent. This means that when it runs only partially and is started again with the same input, it must still produce the correct result.

## Pipeline handlers must not throw

Pipeline handlers must never throw. If you run operations inside of the handler that might cause errors, you must wrap the handler's code in a `try catch` yourself and also handle retries.
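
A minimal sketch of a handler that follows both rules, reusing `mySourceCollection` and `myDestinationCollection` from the example above (the retry limit of three is an arbitrary assumption, not a recommendation):

```ts
const defensivePipeline = await mySourceCollection.addPipeline({
    identifier: 'my-defensive-pipeline',
    destination: myDestinationCollection,
    handler: async (docs) => {
        for (const doc of docs) {
            // upsert() keeps the handler idempotent: running it again
            // with the same input leads to the same destination state.
            let attempts = 0;
            while (attempts < 3) {
                try {
                    await myDestinationCollection.upsert({
                        id: doc.primary,
                        category: doc.category
                    });
                    break;
                } catch (err) {
                    // never let the error escape the handler,
                    // retry instead (here: at most 3 times)
                    attempts = attempts + 1;
                }
            }
        }
    }
});
```

Because the `upsert()` writes are idempotent, a crash between two iterations only causes already-written documents to be written again with the same content.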

## Be careful when doing http requests in the handler

When you run http requests inside of your handler, you no longer have an [offline first](./offline-first.md) application because reads to the destination collection will be blocked until all handlers have finished. While your client is offline, the destination collection is therefore blocked for reads and writes.

## Use Cases for RxPipeline

The RxPipeline is a handy building block for different features and plugins. You can use it to aggregate data or to restructure local data.

### UseCase: Re-Index data that comes from replication

Sometimes you want to [replicate](./replication.md) atomic documents over the wire but locally you want to split these documents for better indexing. For example, you replicate email documents that have multiple receivers in a string-array. While string-arrays cannot be indexed, you locally need a way to query for all emails of a given receiver.
To handle this case you can set up a RxPipeline that writes the mapping into a separate collection:

```ts
const pipeline = await emailCollection.addPipeline({
    identifier: 'map-email-receivers',
    destination: emailByReceiverCollection,
    handler: async (docs) => {
        for (const doc of docs) {
            // remove previous mapping
            await emailByReceiverCollection.find({selector: {emailId: doc.primary}}).remove();
            // add new mapping
            if(!doc.deleted) {
                await emailByReceiverCollection.bulkInsert(
                    doc.receivers.map(receiver => ({
                        emailId: doc.primary,
                        receiver: receiver
                    }))
                );
            }
        }
    }
});
```

With this you can efficiently query for "all emails that a person received" by running:

```ts
const mailIds = await emailByReceiverCollection.find({selector: {receiver: 'foobar@example.com'}}).exec();
```

### UseCase: Fulltext Search

You can utilize the pipeline plugin to index text data for efficient fulltext search.

```ts
const pipeline = await emailCollection.addPipeline({
    identifier: 'email-fulltext-search',
    destination: mailByWordCollection,
    handler: async (docs) => {
        for (const doc of docs) {
            // remove previous mapping
            await mailByWordCollection.find({selector: {emailId: doc.primary}}).remove();
            // add new mapping
            if(!doc.deleted) {
                const words = doc.text.split(' ');
                await mailByWordCollection.bulkInsert(
                    words.map(word => ({
                        emailId: doc.primary,
                        word: word
                    }))
                );
            }
        }
    }
});
```

With this you can efficiently query for "all emails that contain a given word" by running:

```ts
const mailIds = await mailByWordCollection.find({selector: {word: 'foobar'}}).exec();
```

### UseCase: Download data based on source documents

When you have to fetch data for each document of a collection from a server, you can use the pipeline to ensure all documents have their data downloaded and no document is missed.

```ts
const pipeline = await emailCollection.addPipeline({
    identifier: 'download-data',
    destination: serverDataCollection,
    handler: async (docs) => {
        for (const doc of docs) {
            const response = await fetch('https://example.com/doc/' + doc.primary);
            const serverData = await response.json();
            await serverDataCollection.upsert({
                id: doc.primary,
                data: serverData
            });
        }
    }
});
```


## RxPipeline methods

### awaitIdle()

You can await the idleness of a pipeline with `await myRxPipeline.awaitIdle()`. This returns a promise that resolves when the pipeline has processed all documents and is no longer running.

### destroy()

`await myRxPipeline.destroy()` stops the pipeline so that it no longer processes writes.
This is automatically called when the RxCollection or RxDatabase of the pipeline is destroyed.

### remove()

`await myRxPipeline.remove()` removes the pipeline and all metadata which it has stored. Recreating the pipeline afterwards will start processing all source documents from scratch.
diff --git a/docs-src/sidebars.js b/docs-src/sidebars.js
index b7f48b76098..ea96e8ab490 100644
--- a/docs-src/sidebars.js
+++ b/docs-src/sidebars.js
@@ -27,6 +27,7 @@ const sidebars = {
 'rx-document',
 'rx-query',
 'rx-attachment',
+ 'rx-pipeline',
 {
 type: 'category',
 label: '💾 RxStorage',
diff --git a/package.json b/package.json
index 02427655138..7359c9bebad 100644
--- a/package.json
+++ b/package.json
@@ -303,6 +303,12 @@
 "import": "./dist/esm/plugins/state/index.js",
 "default": "./dist/esm/plugins/state/index.js"
 },
+ "./plugins/pipeline": {
+ "types": "./dist/types/plugins/pipeline/index.d.ts",
+ "require": "./dist/cjs/plugins/pipeline/index.js",
+ "import": "./dist/esm/plugins/pipeline/index.js",
+ "default": "./dist/esm/plugins/pipeline/index.js"
+ },
 "./plugins/validate-ajv": {
 "types": "./dist/types/plugins/validate-ajv/index.d.ts",
 "require": "./dist/cjs/plugins/validate-ajv/index.js",
@@ -582,4 +588,4 @@
 "webpack-cli": "5.1.4",
 "webpack-dev-server": "5.0.4"
 }
-}
\ No newline at end of file
+}
diff --git a/src/change-event-buffer.ts b/src/change-event-buffer.ts
index fbfc98ebd80..35482187e46 100644
--- a/src/change-event-buffer.ts
+++ b/src/change-event-buffer.ts
@@ -9,7 +9,10 @@ import type {
 RxChangeEvent,
 RxCollection
} from './types/index.d.ts';
-import { appendToArray, requestIdlePromiseNoQueue } from './plugins/utils/index.ts';
+import {
+ appendToArray,
+ requestIdlePromiseNoQueue
+} from './plugins/utils/index.ts';

/**
 * This buffer rembemers previous change events
diff --git a/src/plugins/pipeline/flagged-functions.ts b/src/plugins/pipeline/flagged-functions.ts
new file mode 100644
index 00000000000..8d9bb7889e3
--- /dev/null
+++ b/src/plugins/pipeline/flagged-functions.ts
@@ -0,0 +1,131 @@
import { ensureNotFalsy } from '../utils/index.ts';

/**
 * This is the most hacky thing we do in RxDB.
 * When a pipeline "transaction" is running,
 * we have to make all calls to the collection from the outside
 * wait, while still making it possible to run reads and writes
 * from inside the transaction.
 *
 * We can decide where the call came from by checking the stack `new Error().stack`
 * for a random "flag".
 * But creating random flagged functions requires eval which we should not use.
 * Instead we have a list of some flagged functions here
 * that can be used and checked for in the stacktrace.
+ * + * + * When doing this with eval() instead it would look like: + * ```ts + * eval(` + * async function ${this.secretFunctionName}(docs){ const x = await _this.handler(docs); return x; } + * o.${this.secretFunctionName} = ${this.secretFunctionName}; + * `); + * await o[this.secretFunctionName](rxDocuments); + * + * ``` + */ +async function rx_pipeline_fn_1_(fn: any) { + return await fn(); +} +async function rx_pipeline_fn_2_(fn: any) { + return await fn(); +} +async function rx_pipeline_fn_3_(fn: any) { + return await fn(); +} +async function rx_pipeline_fn_4_(fn: any) { + return await fn(); +} +async function rx_pipeline_fn_5_(fn: any) { + return await fn(); +} +async function rx_pipeline_fn_6_(fn: any) { + return await fn(); +} +async function rx_pipeline_fn_7_(fn: any) { + return await fn(); +} +async function rx_pipeline_fn_8_(fn: any) { + return await fn(); +} +async function rx_pipeline_fn_9_(fn: any) { + return await fn(); +} +async function rx_pipeline_fn_10_(fn: any) { + return await fn(); +} +async function rx_pipeline_fn_11_(fn: any) { + return await fn(); +} +async function rx_pipeline_fn_12_(fn: any) { + return await fn(); +} +async function rx_pipeline_fn_13_(fn: any) { + return await fn(); +} +async function rx_pipeline_fn_14_(fn: any) { + return await fn(); +} +async function rx_pipeline_fn_15_(fn: any) { + return await fn(); +} +async function rx_pipeline_fn_16_(fn: any) { + return await fn(); +} +async function rx_pipeline_fn_17_(fn: any) { + return await fn(); +} +async function rx_pipeline_fn_18_(fn: any) { + return await fn(); +} +async function rx_pipeline_fn_19_(fn: any) { + return await fn(); +} +async function rx_pipeline_fn_20_(fn: any) { + return await fn(); +} + + + + + +export const FLAGGED_FUNCTIONS = { + rx_pipeline_fn_1_, + rx_pipeline_fn_2_, + rx_pipeline_fn_3_, + rx_pipeline_fn_4_, + rx_pipeline_fn_5_, + rx_pipeline_fn_6_, + rx_pipeline_fn_7_, + rx_pipeline_fn_8_, + rx_pipeline_fn_9_, + rx_pipeline_fn_10_, + rx_pipeline_fn_11_, + rx_pipeline_fn_12_, + rx_pipeline_fn_13_, + rx_pipeline_fn_14_, + rx_pipeline_fn_15_, + rx_pipeline_fn_16_, + rx_pipeline_fn_17_, + rx_pipeline_fn_18_, + rx_pipeline_fn_19_, + rx_pipeline_fn_20_, +} as const; + + + +const ids: (keyof typeof FLAGGED_FUNCTIONS)[] = Object.keys(FLAGGED_FUNCTIONS) as any; + +export function blockFlaggedFunctionKey(): keyof typeof FLAGGED_FUNCTIONS { + /** + * If this happens and we have no more flagged keys left + * it means that more pipeline handlers are running in parallel. + * To fix this, add more functions. 
+ */ + const id = ensureNotFalsy(ids.pop(), 'no flagged keys left'); + return id; +} + +export function releaseFlaggedFunctionKey(key: keyof typeof FLAGGED_FUNCTIONS) { + ids.push(key); +} diff --git a/src/plugins/pipeline/index.ts b/src/plugins/pipeline/index.ts new file mode 100644 index 00000000000..d6ace50ff0f --- /dev/null +++ b/src/plugins/pipeline/index.ts @@ -0,0 +1,18 @@ +import type { + RxPlugin +} from '../../types/index.d.ts'; +import { addPipeline } from './rx-pipeline.ts'; + +export type * from './types.ts'; +export * from './flagged-functions.ts'; +export * from './rx-pipeline.ts'; + +export const RxDBPipelinePlugin: RxPlugin = { + name: 'pipeline', + rxdb: true, + prototypes: { + RxCollection(proto: any) { + proto.addPipeline = addPipeline; + } + } +}; diff --git a/src/plugins/pipeline/rx-pipeline.ts b/src/plugins/pipeline/rx-pipeline.ts new file mode 100644 index 00000000000..ee22dd1f435 --- /dev/null +++ b/src/plugins/pipeline/rx-pipeline.ts @@ -0,0 +1,283 @@ +import { + BehaviorSubject, + Subject, + Subscription, + filter, + firstValueFrom, + race +} from 'rxjs'; +import type { + InternalStoreDocType, + RxCollection, + RxDocument, + RxDocumentData +} from '../../types'; +import type { + CheckpointDocData, + RxPipelineHandler, + RxPipelineOptions +} from './types'; +import { + PROMISE_RESOLVE_VOID, + clone, + createRevision, + ensureNotFalsy, + lastOfArray, + nameFunction, + now, + promiseWait, + randomCouchString +} from '../utils/index.ts'; +import { getChangedDocumentsSince } from '../../rx-storage-helper.ts'; +import { mapDocumentsDataToCacheDocs } from '../../doc-cache.ts'; +import { getPrimaryKeyOfInternalDocument } from '../../rx-database-internal-store.ts'; +import { FLAGGED_FUNCTIONS, blockFlaggedFunctionKey, releaseFlaggedFunctionKey } from './flagged-functions.ts'; +export const RX_PIPELINE_CHECKPOINT_CONTEXT = 'rx-pipeline-checkpoint'; + + +export class RxPipeline { + processQueue = PROMISE_RESOLVE_VOID; + subs: Subscription[] = []; + stopped: boolean = false; + + toRun = 1; + checkpointId: string; + + lastSourceDocTime = new BehaviorSubject(-1); + lastProcessedDocTime = new BehaviorSubject(0); + somethingChanged = new Subject(); + + + secretFunctionName = 'tx_fn_' + randomCouchString(10) + + waitBeforeWriteFn = async () => { + const stack = new Error().stack; + if (stack && ( + stack.includes(this.secretFunctionName) + )) { + } else { + await this.awaitIdle(); + } + } + + constructor( + public readonly identifier: string, + public readonly source: RxCollection, + public readonly destination: RxCollection, + public readonly handler: RxPipelineHandler, + public readonly batchSize = 100 + ) { + this.checkpointId = 'rx-pipeline-' + identifier; + this.source.onDestroy.push(() => this.destroy()); + this.destination.awaitBeforeReads.add(this.waitBeforeWriteFn); + this.subs.push( + this.source.database.eventBulks$.pipe( + filter(changeEventBulk => changeEventBulk.collectionName === this.source.name) + ).subscribe((bulk) => { + this.lastSourceDocTime.next(bulk.events[0].documentData._meta.lwt); + this.somethingChanged.next({}); + }) + ); + this.subs.push( + this.destination.database.internalStore + .changeStream() + .subscribe(eventBulk => { + const events = eventBulk.events; + for (let index = 0; index < events.length; index++) { + const event = events[index]; + if ( + event.documentData.context === RX_PIPELINE_CHECKPOINT_CONTEXT && + event.documentData.key === this.checkpointId + ) { + this.lastProcessedDocTime.next(event.documentData.data.lastDocTime); + 
                            this.somethingChanged.next({});
                        }
                    }
                })
        );
    }

    trigger() {
        /**
         * Do not stack up too many runs
         * so that fast writes to the source collection
         * do not block anything for too long.
         */
        if (this.toRun > 2) {
            return;
        }
        this.toRun = this.toRun + 1;

        this.processQueue = this.processQueue.then(async () => {
            this.toRun = this.toRun - 1;

            let done = false;
            while (
                !done &&
                !this.stopped &&
                !this.destination.destroyed &&
                !this.source.destroyed
            ) {
                const checkpointDoc = await getCheckpointDoc(this);
                const checkpoint = checkpointDoc ? checkpointDoc.data.checkpoint : undefined;
                const docsSinceResult = await getChangedDocumentsSince(
                    this.source.storageInstance,
                    this.batchSize,
                    checkpoint
                );

                let lastTime = checkpointDoc ? checkpointDoc.data.lastDocTime : 0;
                if (docsSinceResult.documents.length > 0) {
                    const rxDocuments = mapDocumentsDataToCacheDocs(this.source._docCache, docsSinceResult.documents);
                    const _this = this;

                    // const o: any = {};
                    // eval(`
                    // async function ${this.secretFunctionName}(docs){ const x = await _this.handler(docs); return x; }
                    // o.${this.secretFunctionName} = ${this.secretFunctionName};
                    // `);
                    // await o[this.secretFunctionName](rxDocuments);

                    const fnKey = blockFlaggedFunctionKey();
                    this.secretFunctionName = fnKey;
                    try {
                        await FLAGGED_FUNCTIONS[fnKey](() => _this.handler(rxDocuments));
                    } finally {
                        releaseFlaggedFunctionKey(fnKey);
                    }

                    lastTime = ensureNotFalsy(lastOfArray(docsSinceResult.documents))._meta.lwt;
                }
                if (!this.destination.destroyed) {
                    await setCheckpointDoc(this, { checkpoint: docsSinceResult.checkpoint, lastDocTime: lastTime }, checkpointDoc);
                }
                if (docsSinceResult.documents.length < this.batchSize) {
                    done = true;
                }
            }
        });
    }

    async awaitIdle() {
        let done = false;
        while (!done) {
            await this.processQueue;
            if (this.lastProcessedDocTime.getValue() >= this.lastSourceDocTime.getValue()) {
                done = true;
            } else {
                await firstValueFrom(this.somethingChanged);
            }
        }
    }

    async destroy() {
        this.stopped = true;
        this.destination.awaitBeforeReads.delete(this.waitBeforeWriteFn);
        this.subs.forEach(s => s.unsubscribe());
        await this.processQueue;
    }

    /**
     * Remove the pipeline and all metadata which it has stored
     */
    async remove() {
        const internalStore = this.destination.database.internalStore;
        const checkpointDoc = await getCheckpointDoc(this);
        if (checkpointDoc) {
            const newDoc: RxDocumentData = clone(checkpointDoc);
            newDoc._deleted = true;
            const writeResult = await internalStore.bulkWrite([{
                previous: checkpointDoc,
                document: newDoc,
            }], RX_PIPELINE_CHECKPOINT_CONTEXT);
            if (writeResult.error.length > 0) {
                throw writeResult.error;
            }
        }
        return this.destroy();
    }
}


export async function getCheckpointDoc(
    pipeline: RxPipeline
): Promise> | undefined> {
    const internalStore = pipeline.destination.database.internalStore;
    const checkpointId = getPrimaryKeyOfInternalDocument(
        pipeline.checkpointId,
        RX_PIPELINE_CHECKPOINT_CONTEXT
    );
    const results = await internalStore.findDocumentsById([checkpointId], false);
    const result: RxDocumentData = results[0];
    if (result) {
        return result;
    } else {
        return undefined;
    }
}

export async function setCheckpointDoc(
    pipeline: RxPipeline,
    newCheckpoint: CheckpointDocData,
    previous?: RxDocumentData
): Promise {
    const internalStore = pipeline.destination.database.internalStore;
    const newDoc: RxDocumentData> = {
        _attachments: {},
        _deleted: false,
        _meta: {
            lwt: now()
        },
        _rev: createRevision(pipeline.destination.database.token, previous),
        context: RX_PIPELINE_CHECKPOINT_CONTEXT,
        data: newCheckpoint,
        id: getPrimaryKeyOfInternalDocument(
            pipeline.checkpointId,
            RX_PIPELINE_CHECKPOINT_CONTEXT
        ),
        key: pipeline.checkpointId
    };

    const writeResult = await internalStore.bulkWrite([{
        previous,
        document: newDoc,
    }], RX_PIPELINE_CHECKPOINT_CONTEXT);
    if (writeResult.error.length > 0) {
        throw writeResult.error;
    }
}


export async function addPipeline(
    this: RxCollection,
    options: RxPipelineOptions
): Promise> {
    const pipeline = new RxPipeline(
        options.identifier,
        this,
        options.destination,
        options.handler,
        options.batchSize
    );
    const waitForLeadership = typeof options.waitForLeadership === 'undefined' ? true : options.waitForLeadership;
    const startPromise = waitForLeadership ? this.database.waitForLeadership() : PROMISE_RESOLVE_VOID;
    startPromise.then(() => {
        pipeline.trigger();
        pipeline.subs.push(
            this.database.eventBulks$.pipe(
                filter(changeEventBulk => changeEventBulk.collectionName === this.name),
                filter(bulk => {
                    if (pipeline.stopped) {
                        return false;
                    }
                    const first = bulk.events[0];
                    return !first.isLocal;
                })
            ).subscribe(() => pipeline.trigger())
        );
    });

    return pipeline;
}
diff --git a/src/plugins/pipeline/types.ts b/src/plugins/pipeline/types.ts
new file mode 100644
index 00000000000..510f2636c28
--- /dev/null
+++ b/src/plugins/pipeline/types.ts
@@ -0,0 +1,28 @@
import type {
    MaybePromise,
    RxCollection,
    RxDocument
} from '../../types';


export type RxPipelineHandler = (
    docs: RxDocument[]
) => MaybePromise;

export type RxPipelineOptions = {
    /**
     * The identifier of the pipeline. Used when
     * metadata of the pipeline is stored. Changing the identifier
     * makes the pipeline lose its checkpoint state and
     * reprocess all source documents from scratch.
     */
+ */ + identifier: string; + destination: RxCollection; + handler: RxPipelineHandler; + waitForLeadership?: boolean; + batchSize?: number; +} + + +export type CheckpointDocData = { + checkpoint: CheckpointType; + lastDocTime: number; +}; diff --git a/src/plugins/replication-graphql/index.ts b/src/plugins/replication-graphql/index.ts index 7fb2da7e973..c6a8179d330 100644 --- a/src/plugins/replication-graphql/index.ts +++ b/src/plugins/replication-graphql/index.ts @@ -257,3 +257,4 @@ export * from './helper.ts'; export * from './graphql-schema-from-rx-schema.ts'; export * from './query-builder-from-rx-schema.ts'; export * from './graphql-websocket.ts'; + diff --git a/src/plugins/storage-denokv/rx-storage-instance-denokv.ts b/src/plugins/storage-denokv/rx-storage-instance-denokv.ts index 578f9dacefa..2d280e746c8 100644 --- a/src/plugins/storage-denokv/rx-storage-instance-denokv.ts +++ b/src/plugins/storage-denokv/rx-storage-instance-denokv.ts @@ -175,8 +175,20 @@ export class RxStorageInstanceDenoKV implements RxStorageInstance< }); }); - const txResult = await tx.commit(); - if (txResult.ok) { + let txResult; + try { + txResult = await tx.commit(); + } catch (err: any) { + if ( + err.message.includes('Error code 5:') || + err.message.includes('Error code 517:') + ) { + // retry + } else { + throw err; + } + } + if (txResult && txResult.ok) { appendToArray(ret.error, categorized.errors); if (categorized.eventBulk.events.length > 0) { const lastState = ensureNotFalsy(categorized.newestRow).document; diff --git a/src/plugins/utils/utils-other.ts b/src/plugins/utils/utils-other.ts index a99bff80466..41e9af37f2f 100644 --- a/src/plugins/utils/utils-other.ts +++ b/src/plugins/utils/utils-other.ts @@ -29,3 +29,15 @@ export const RXJS_SHARE_REPLAY_DEFAULTS = { bufferSize: 1, refCount: true }; + + + +/** + * Dynamically add a name to a function + * so that it can later be found in the stack. + * @link https://stackoverflow.com/a/41854075/3443137 + */ +export function nameFunction(name: string, body: T): T { + // @ts-ignore + return { [name](...args) { return body.apply(this, args) } }[name]; +} diff --git a/src/rx-collection.ts b/src/rx-collection.ts index 0b046aa4396..ed98744d10f 100644 --- a/src/rx-collection.ts +++ b/src/rx-collection.ts @@ -103,6 +103,7 @@ import { defaultConflictHandler } from './replication-protocol/index.ts'; import { IncrementalWriteQueue } from './incremental-write.ts'; import { beforeDocumentUpdateWrite } from './rx-document.ts'; import { overwritable } from './overwritable.ts'; +import type { RxPipeline, RxPipelineOptions } from './plugins/pipeline/index.ts'; const HOOKS_WHEN = ['pre', 'post'] as const; type HookWhenType = typeof HOOKS_WHEN[number]; @@ -126,6 +127,13 @@ export class RxCollectionBase< public readonly timeouts: Set> = new Set(); public incrementalWriteQueue: IncrementalWriteQueue = {} as any; + + /** + * Before reads, all these methods are awaited. Used to "block" reads + * depending on other processes, like when the RxPipeline is running. 
+ */ + public readonly awaitBeforeReads = new Set<() => MaybePromise>(); + constructor( public database: RxDatabase, public name: string, @@ -742,6 +750,11 @@ export class RxCollectionBase< throw pluginMissing('crdt'); } + + addPipeline(_options: RxPipelineOptions): Promise> { + throw pluginMissing('pipeline'); + } + /** * HOOKS */ diff --git a/src/rx-query-single-result.ts b/src/rx-query-single-result.ts index 1e4f91f01e7..32e8a773af5 100644 --- a/src/rx-query-single-result.ts +++ b/src/rx-query-single-result.ts @@ -1,7 +1,8 @@ import { mapDocumentsDataToCacheDocs } from './doc-cache.ts'; import { now, overwriteGetterForCaching } from './plugins/utils/index.ts'; +import { newRxError } from './rx-error.ts'; +import { RxQueryBase } from './rx-query.ts'; import type { - RxCollection, RxDocument, RxDocumentData } from './types'; @@ -13,7 +14,7 @@ import type { * that initializes stuff lazily so that * we can directly work with the query results after RxQuery.exec() */ -export class RxQuerySingleResult{ +export class RxQuerySingleResult { /** * Time at which the current _result state was created. * Used to determine if the result set has changed since X @@ -22,13 +23,13 @@ export class RxQuerySingleResult{ public readonly time = now(); public readonly documents: RxDocument[]; constructor( - public readonly collection: RxCollection, + public readonly query: RxQueryBase, // only used internally, do not use outside, use this.docsData instead docsDataFromStorageInstance: RxDocumentData[], // can be overwritten for count-queries public readonly count: number, ) { - this.documents = mapDocumentsDataToCacheDocs(this.collection._docCache, docsDataFromStorageInstance); + this.documents = mapDocumentsDataToCacheDocs(this.query.collection._docCache, docsDataFromStorageInstance); } @@ -73,4 +74,29 @@ export class RxQuerySingleResult{ map ); } + + getValue(throwIfMissing?: boolean) { + const op = this.query.op; + if (op === 'count') { + return this.count; + } else if (op === 'findOne') { + // findOne()-queries emit RxDocument or null + const doc = this.documents.length === 0 ? null : this.documents[0]; + if (!doc && throwIfMissing) { + throw newRxError('QU10', { + collection: this.query.collection.name, + query: this.query.mangoQuery, + op + }); + } else { + return doc; + } + } else if (op === 'findByIds') { + return this.docsMap; + } else { + // find()-queries emit RxDocument[] + // Flat copy the array so it won't matter if the user modifies it. + return this.documents.slice(0); + } + } } diff --git a/src/rx-query.ts b/src/rx-query.ts index 7890992a16c..28d4879f1c6 100644 --- a/src/rx-query.ts +++ b/src/rx-query.ts @@ -1,6 +1,5 @@ import { BehaviorSubject, - firstValueFrom, Observable, merge } from 'rxjs'; @@ -113,7 +112,6 @@ export class RxQueryBase< } get $(): Observable { if (!this._$) { - const results$ = this.collection.$.pipe( /** * Performance shortcut. @@ -145,19 +143,7 @@ export class RxQueryBase< * depending on query type */ map((result) => { - const useResult = ensureNotFalsy(result); - if (this.op === 'count') { - return useResult.count; - } else if (this.op === 'findOne') { - // findOne()-queries emit RxDocument or null - return useResult.documents.length === 0 ? null : useResult.documents[0]; - } else if (this.op === 'findByIds') { - return useResult.docsMap; - } else { - // find()-queries emit RxDocument[] - // Flat copy the array so it won't matter if the user modifies it. 
- return useResult.documents.slice(0); - } + return ensureNotFalsy(result).getValue(); }) ); @@ -189,6 +175,7 @@ export class RxQueryBase< // time stamps on when the last full exec over the database has run // used to properly handle events that happen while the find-query is running + // TODO do we still need these properties? public _lastExecStart: number = 0; public _lastExecEnd: number = 0; @@ -214,7 +201,7 @@ export class RxQueryBase< _setResultData(newResultData: RxDocumentData[] | number | Map>): void { if (typeof newResultData === 'number') { this._result = new RxQuerySingleResult( - this.collection, + this as any, [], newResultData ); @@ -224,7 +211,7 @@ export class RxQueryBase< } const newQueryResult = new RxQuerySingleResult( - this.collection, + this as any, newResultData, newResultData.length ); @@ -295,7 +282,7 @@ export class RxQueryBase< */ public exec(throwIfMissing: true): Promise>; public exec(): Promise; - public exec(throwIfMissing?: boolean): Promise { + public async exec(throwIfMissing?: boolean): Promise { if (throwIfMissing && this.op !== 'findOne') { throw newRxError('QU9', { collection: this.collection.name, @@ -304,25 +291,14 @@ export class RxQueryBase< }); } - /** * run _ensureEqual() here, * this will make sure that errors in the query which throw inside of the RxStorage, * will be thrown at this execution context and not in the background. */ - return _ensureEqual(this as any) - .then(() => firstValueFrom(this.$)) - .then(result => { - if (!result && throwIfMissing) { - throw newRxError('QU10', { - collection: this.collection.name, - query: this.mangoQuery, - op: this.op - }); - } else { - return result; - } - }); + await _ensureEqual(this as any); + const useResult = ensureNotFalsy(this._result); + return useResult.getValue(throwIfMissing); } @@ -485,6 +461,7 @@ export class RxQueryBase< } } + export function _getDefaultQuery(): MangoQuery { return { selector: {} @@ -542,13 +519,17 @@ function _isResultsInSync(rxQuery: RxQueryBase): boolean { * to ensure it does not run in parallel * @return true if has changed, false if not */ -function _ensureEqual(rxQuery: RxQueryBase): Promise { +async function _ensureEqual(rxQuery: RxQueryBase): Promise { + if (rxQuery.collection.awaitBeforeReads.size > 0) { + await Promise.all(Array.from(rxQuery.collection.awaitBeforeReads).map(fn => fn())); + } + // Optimisation shortcut if ( rxQuery.collection.database.destroyed || _isResultsInSync(rxQuery) ) { - return PROMISE_RESOLVE_FALSE; + return false; } rxQuery._ensureEqualQueue = rxQuery._ensureEqualQueue diff --git a/test/unit.test.ts b/test/unit.test.ts index 7434045db8b..f994faf1292 100644 --- a/test/unit.test.ts +++ b/test/unit.test.ts @@ -28,8 +28,6 @@ import './unit/rx-storage-lokijs.test.ts'; import './unit/rx-storage-dexie.test.ts'; import './unit/rx-storage-remote.test.ts'; - - import './unit/instance-of-check.test.ts'; import './unit/rx-schema.test.ts'; import './unit/bug-report.test.ts'; @@ -54,6 +52,7 @@ import './unit/reactive-collection.test.ts'; import './unit/reactive-document.test.ts'; import './unit/cleanup.test.ts'; import './unit/hooks.test.ts'; +import './unit/rx-pipeline.test.ts'; import './unit/orm.test.ts'; import './unit/replication-protocol.test.ts'; import './unit/replication.test.ts'; diff --git a/test/unit/rx-pipeline.test.ts b/test/unit/rx-pipeline.test.ts new file mode 100644 index 00000000000..e9fed965e72 --- /dev/null +++ b/test/unit/rx-pipeline.test.ts @@ -0,0 +1,288 @@ +import assert from 'assert'; +import config, { describeParallel } from 
'./config.ts'; + +import { + RxDocument, + addRxPlugin, + promiseWait, + randomCouchString +} from '../../plugins/core/index.mjs'; +import { + HumanWithTimestampDocumentType +} from '../../plugins/test-utils/index.mjs'; +import { + schemaObjects, + humansCollection +} from '../../plugins/test-utils/index.mjs'; +import { RxDBPipelinePlugin } from '../../plugins/pipeline/index.mjs'; +addRxPlugin(RxDBPipelinePlugin); +import { RxDBLeaderElectionPlugin } from '../../plugins/leader-election/index.mjs'; +addRxPlugin(RxDBLeaderElectionPlugin); + +describeParallel('rx-pipeline.test.js', () => { + if ( + config.storage.name.includes('random-delay') + ) { + // TODO + return; + } + describe('basics', () => { + it('add and remove a pipeline', async () => { + const c1 = await humansCollection.create(0); + await c1.database.waitForLeadership(); + const c2 = await humansCollection.create(0); + const pipeline = await c1.addPipeline({ + destination: c2, + handler: async (docs) => { + for (const doc of docs) { + await c2.insert(schemaObjects.humanData(doc.passportId)); + } + }, + identifier: randomCouchString(10) + }); + await c1.insert(schemaObjects.humanData('foobar')); + await pipeline.destroy(); + c1.database.destroy(); + c2.database.destroy(); + }); + it('write some document depending on another', async () => { + const c1 = await humansCollection.create(0); + await c1.database.waitForLeadership(); + const c2 = await humansCollection.create(0); + await c1.addPipeline({ + destination: c2, + handler: async (docs) => { + for (const doc of docs) { + await c2.insert(schemaObjects.humanData(doc.passportId)); + } + }, + identifier: randomCouchString(10) + }); + + await c1.insert(schemaObjects.humanData('foobar')); + + /** + * Here we run the query on the destination directly after + * a write to the source. The pipeline should automatically halt + * the reads to the destination until the pipeline is idle. 
+ */ + const doc2 = await c2.findOne().exec(true); + assert.strictEqual(doc2.passportId, 'foobar'); + + await c1.database.destroy(); + await c2.database.destroy(); + }); + it('should store the transformed data to the destination', async () => { + const c1 = await humansCollection.create(0); + await c1.database.waitForLeadership(); + const c2 = await humansCollection.create(0); + const pipeline = await c1.addPipeline({ + destination: c2, + handler: async (docs) => { + for (const doc of docs) { + await c2.insert(schemaObjects.humanData(doc.passportId)); + } + }, + identifier: randomCouchString(10) + }); + await c1.insert(schemaObjects.humanData('foobar')); + await pipeline.destroy(); + c1.database.destroy(); + c2.database.destroy(); + }); + }); + describe('.awaitIdle()', () => { + it('should have updated its internal timestamps', async () => { + const c1 = await humansCollection.create(0); + await c1.database.waitForLeadership(); + const c2 = await humansCollection.create(0); + const pipeline = await c1.addPipeline({ + destination: c2, + handler: async (docs) => { + for (const doc of docs) { + await c2.insert(schemaObjects.humanData(doc.passportId)); + } + }, + identifier: randomCouchString(10) + }); + await c1.insert(schemaObjects.humanData('foobar')); + const doc2 = await c2.findOne().exec(true); + assert.strictEqual(doc2.passportId, 'foobar'); + + assert.ok(pipeline.lastSourceDocTime.getValue() > 10); + assert.ok(pipeline.lastProcessedDocTime.getValue() > 10); + + c1.database.destroy(); + c2.database.destroy(); + }); + + }); + describe('checkpoints', () => { + it('should continue from the correct checkpoint', async () => { + const dbName = randomCouchString(10); + const identifier = randomCouchString(10); + const cDestination = await humansCollection.create(0); + const c1 = await humansCollection.createHumanWithTimestamp(1, dbName); + const ids: string[] = []; + const handler = (docs: RxDocument[]) => { + for (const doc of docs) { + if (ids.includes(doc.primary)) { + throw new Error('duplicate id ' + doc.primary); + } + ids.push(doc.primary); + } + }; + await c1.addPipeline({ + destination: cDestination, + handler, + identifier + }); + await c1.insert(schemaObjects.humanWithTimestampData()); + + await cDestination.database.destroy(); + await c1.database.destroy(); + }); + }); + describe('multiInstance', () => { + if ( + !config.storage.hasMultiInstance || + config.storage.name === 'remote' // TODO + ) { + return; + } + it('should only run the pipeline at the leader', async () => { + const identifier = randomCouchString(10); + const name = randomCouchString(10); + const c1 = await humansCollection.createMultiInstance(name); + await c1.database.waitForLeadership(); + const c2 = await humansCollection.createMultiInstance(name); + const runAt: string[] = []; + await c1.addPipeline({ + destination: c2, + handler: () => { + runAt.push('c1'); + }, + identifier + }); + await c2.addPipeline({ + destination: c2, + handler: () => { + runAt.push('c2'); + }, + identifier + }); + await c1.insert(schemaObjects.humanData()); + await c2.insert(schemaObjects.humanData()); + + assert.deepStrictEqual(runAt, ['c1']); + + c1.database.destroy(); + c2.database.destroy(); + }); + it('should halt reads on other tab while pipeline is running', async () => { + const identifier = randomCouchString(10); + const name = randomCouchString(10); + const c1 = await humansCollection.createMultiInstance(name); + await c1.database.waitForLeadership(); + const c2 = await humansCollection.createMultiInstance(name); + const 
runAt: string[] = []; + await c1.addPipeline({ + destination: c2, + handler: async () => { + runAt.push('c1.1'); + await promiseWait(50); + runAt.push('c1.2'); + }, + identifier + }); + await c2.addPipeline({ + destination: c2, + handler: () => { + runAt.push('c2'); + }, + identifier + }); + await c1.insert(schemaObjects.humanData()); + runAt.push('doneInsert'); + await c2.find().exec(); + runAt.push('doneQuery'); + + assert.deepStrictEqual(runAt, ['doneInsert', 'c1.1', 'c1.2', 'doneQuery']); + + c1.database.destroy(); + c2.database.destroy(); + }); + }); + describe('transactional behavior', () => { + it('should not block reads/writes that come from inside the pipeline handler', async () => { + const c1 = await humansCollection.create(0); + await c1.database.waitForLeadership(); + const c2 = await humansCollection.create(0); + const pipeline = await c1.addPipeline({ + destination: c2, + handler: async function myHandler1(docs) { + await c2.find().exec(); + await c1.find().exec(); + + for (const doc of docs) { + await c2.insert(schemaObjects.humanData(doc.passportId)); + } + }, + identifier: randomCouchString(10) + }); + + await c1.insert(schemaObjects.humanData('foobar')); + await pipeline.awaitIdle(); + + c1.database.destroy(); + c2.database.destroy(); + }); + it('should not block reads that come from inside the pipeline handler when already cached outside', async () => { + const c1 = await humansCollection.create(0); + await c1.database.waitForLeadership(); + const c2 = await humansCollection.create(0); + + const cachedQuery = c2.find({ selector: { passportId: { $ne: 'foobar' } } }); + await cachedQuery.exec(); + + const pipeline = await c1.addPipeline({ + destination: c2, + handler: async function myHandler2() { + await cachedQuery.exec(); + }, + identifier: randomCouchString(10) + }); + await c1.insert(schemaObjects.humanData('foobar')); + await pipeline.awaitIdle(); + + c1.database.destroy(); + c2.database.destroy(); + }); + it('should be able to do writes dependent on reads', async () => { + const c1 = await humansCollection.create(0); + await c1.database.waitForLeadership(); + const c2 = await humansCollection.create(1); + + await c1.addPipeline({ + destination: c2, + handler: async function myHandler3() { + const c2Docs = await c2.find().exec(); + for (const doc of c2Docs) { + const useData = doc.toMutableJSON(true); + useData.firstName = 'foobar'; + await c2.upsert(useData); + } + }, + identifier: randomCouchString(10) + }); + await c1.insert(schemaObjects.humanData('foobar')); + + const c2After = await c2.findOne({ selector: { firstName: 'foobar' } }).exec(true); + assert.strictEqual(c2After.firstName, 'foobar'); + + + c1.database.destroy(); + c2.database.destroy(); + }); + }); +}); diff --git a/test/unit/rx-storage-implementations.test.ts b/test/unit/rx-storage-implementations.test.ts index bbc76de4cf0..6cdc04dfd4a 100644 --- a/test/unit/rx-storage-implementations.test.ts +++ b/test/unit/rx-storage-implementations.test.ts @@ -64,7 +64,8 @@ import { EXAMPLE_REVISION_2, EXAMPLE_REVISION_3, EXAMPLE_REVISION_4, - HumanDocumentType + HumanDocumentType, + isDeno } from '../../plugins/test-utils/index.mjs'; import { compressObject } from 'jsonschema-key-compression'; @@ -2129,7 +2130,7 @@ describeParallel('rx-storage-implementations.test.ts (implementation: ' + config * has to workaround any problems with that. */ it('should be able to insert and fetch many documents', async () => { - if (config.storage.name === 'denokv') { + if (isDeno) { // TODO return; }