RxPipeline (#6235)
* CHORE

* CHORE

* CHORE

* CHORE

* CHORE

* CHORE

* ADD pipeline test

* FIX typos

* FIX lint

* FIX do not use eval

* FIX fast tests

* FIX

* FIX stack error

* TRY fix

* FIX lint

* FIX checkpoints

* TRY Fix error

* FIX wrong checkpoints

* FIX operation on destroyed db

* ADD test logs

* FIX randomly failing foundationdb

* FIX deno writes failure

* UPDATE deno

* FIX Deno

* FIX deno 517

* FIX random-delay

* ADD docs

* FIX typos

* FIX import
pubkey committed Aug 14, 2024
1 parent 8e9ae7b commit 071bebc
Showing 19 changed files with 1,015 additions and 48 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
@@ -906,7 +906,7 @@ jobs:
- uses: denoland/setup-deno@v1
with:
# https://github.com/denoland/deno/releases
deno-version: "1.38.5"
deno-version: "1.44.1"
- name: run deno tests:dexie
run: |
sudo npm i -g cross-env
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -2,7 +2,8 @@
# RxDB Changelog

<!-- CHANGELOG NEWEST -->

- ADD [RxPipeline Plugin](https://rxdb.info/rx-pipeline.html).
- FIX (denoKV RxStorage) retry writes when db is locked.
<!-- ADD new changes here! -->

<!-- /CHANGELOG NEWEST -->
163 changes: 163 additions & 0 deletions docs-src/docs/rx-pipeline.md
@@ -0,0 +1,163 @@
# RxPipeline (beta)

The RxPipeline plugin enables you to run operations in response to writes on a collection.
Whenever a write happens on the source collection of a pipeline, a handler is called that processes the written documents and runs operations on another collection.

You could achieve similar behavior by observing the collection stream and processing data on each emit:

```ts
mySourceCollection.$.subscribe(event => {/* ...process...*/});
```

While this can work in some cases, it causes several problems that the pipeline plugin solves:
- In a RxPipeline, only the [Leading Instance](./leader-election.md) runs the operations. For example, when you have multiple browser tabs open, only one of them runs the processing, and when that tab is closed, another tab is elected leader and continues the pipeline processing.
- On sudden stops and restarts of the JavaScript process, processing continues at the correct checkpoint and does not miss any documents, even after unexpected crashes.
- Reads and writes on the destination collection are halted while the pipeline is processing. This ensures your queries only return fully processed documents and never partial results.


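To use pipelines, add the plugin to RxDB first. The import path below corresponds to the `./plugins/pipeline` export that this change adds to `package.json`:

```ts
import { addRxPlugin } from 'rxdb';
import { RxDBPipelinePlugin } from 'rxdb/plugins/pipeline';

// makes RxCollection.addPipeline() available on all collections
addRxPlugin(RxDBPipelinePlugin);
```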

## Creating a RxPipeline

Pipelines are created on top of a source [RxCollection](./rx-collection.md) and have another `RxCollection` as their destination. An `identifier` is used to identify the state of the pipeline so that different pipelines keep separate processing checkpoints. A plain JavaScript `handler` function processes the documents written to the source collection.

```ts
const pipeline = await mySourceCollection.addPipeline({
identifier: 'my-pipeline',
destination: myDestinationCollection,
handler: async (docs) => {
/**
* Here you can process the documents and run writes on
* the destination collection.
*/
for (const doc of docs) {
await myDestinationCollection.insert({
id: doc.primary,
category: doc.category
});
}
}
});
```

:::warning beta
The pipeline plugin is in **beta** mode and the API might change without a major RxDB release.
:::


## Pipeline handlers must be idempotent

Because a JavaScript process can exit at any time, for example when the user closes a browser tab, the pipeline handler function must be idempotent. This means that when it runs only partially and is started again with the same input, it must still produce the correct result.
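
A minimal sketch of an idempotent handler, reusing the hypothetical collections from the example above: by using `upsert()` instead of `insert()`, a partially completed run can safely be repeated with the same input without causing duplicate-insert conflicts.

```ts
const pipeline = await mySourceCollection.addPipeline({
    identifier: 'idempotent-example',
    destination: myDestinationCollection,
    handler: async (docs) => {
        for (const doc of docs) {
            // upsert() overwrites an existing document with the same id,
            // so re-running the handler after a crash is safe
            await myDestinationCollection.upsert({
                id: doc.primary,
                category: doc.category
            });
        }
    }
});
```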

## Pipeline handlers must not throw

Pipeline handlers must never throw. If you run operations inside of the handler that might error, you must wrap the handler's code in a `try/catch` yourself and also handle retries.
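
One possible pattern, sketched under the assumption that a failed write should simply be retried: the handler catches all errors itself and retries the whole batch after a short delay, so nothing ever escapes it.

```ts
const pipeline = await mySourceCollection.addPipeline({
    identifier: 'retrying-example',
    destination: myDestinationCollection,
    handler: async (docs) => {
        // retry the whole batch until it succeeds;
        // the handler itself never throws
        while (true) {
            try {
                for (const doc of docs) {
                    await myDestinationCollection.upsert({
                        id: doc.primary,
                        category: doc.category
                    });
                }
                return;
            } catch (err) {
                console.error('pipeline handler failed, retrying', err);
                await new Promise(res => setTimeout(res, 1000));
            }
        }
    }
});
```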

## Be careful when doing http requests in the handler

When you run HTTP requests inside of your handler, you no longer have an [offline first](./offline-first.md) application, because reads on the destination collection are blocked until all handlers have finished. While your client is offline, the destination collection therefore stays blocked for reads and writes.
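
If you do need network calls inside a handler, one way to keep the blocking bounded is to give each request a timeout. A sketch using the standard `AbortController` (the helper name is made up):

```ts
async function fetchWithTimeout(url: string, ms: number): Promise<Response> {
    const controller = new AbortController();
    // abort the request if it takes longer than `ms` milliseconds
    const timer = setTimeout(() => controller.abort(), ms);
    try {
        return await fetch(url, { signal: controller.signal });
    } finally {
        clearTimeout(timer);
    }
}
```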

## Use Cases for RxPipeline

The RxPipeline is a handy building block for different features and plugins. You can use it to aggregate data or restructure local data.

### UseCase: Re-Index data that comes from replication

Sometimes you want to [replicate](./replication.md) atomic documents over the wire, but locally you want to split these documents for better indexing. For example, you replicate email documents that have multiple receivers in a string array. While string arrays cannot be indexed, locally you need a way to query for all emails of a given receiver.
To handle this case you can set up a RxPipeline that writes the mapping into a separate collection:

```ts
const pipeline = await emailCollection.addPipeline({
identifier: 'map-email-receivers',
destination: emailByReceiverCollection,
handler: async (docs) => {
for (const doc of docs) {
// remove previous mapping
await emailByReceiverCollection.find({selector: {emailId: doc.primary}}).remove();
// add new mapping
if(!doc.deleted) {
await emailByReceiverCollection.bulkInsert(
doc.receivers.map(receiver => ({
emailId: doc.primary,
receiver: receiver
}))
);
}
}
}
});
```

With this you can efficiently query for "all emails that a person received" by running:

```ts
const mailIds = await emailByReceiverCollection.find({selector: {receiver: 'foobar@example.com'}}).exec();
```
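
For this lookup to stay fast, the destination collection's schema should index the queried field. A possible schema for `emailByReceiverCollection` is sketched below; the field names come from the example, while the composite primary key and length limits are assumptions:

```ts
const emailByReceiverSchema = {
    version: 0,
    type: 'object',
    // composite primary key derived from emailId and receiver,
    // so each mapping document gets a deterministic id
    primaryKey: {
        key: 'id',
        fields: ['emailId', 'receiver'],
        separator: '|'
    },
    properties: {
        id: { type: 'string', maxLength: 401 },
        emailId: { type: 'string', maxLength: 200 },
        receiver: { type: 'string', maxLength: 200 }
    },
    required: ['id', 'emailId', 'receiver'],
    indexes: ['receiver']
};
```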

### UseCase: Fulltext Search

You can utilize the pipeline plugin to index text data for efficient fulltext search.

```ts
const pipeline = await emailCollection.addPipeline({
identifier: 'email-fulltext-search',
destination: mailByWordCollection,
handler: async (docs) => {
for (const doc of docs) {
// remove previous mapping
await mailByWordCollection.find({selector: {emailId: doc.primary}}).remove();
// add new mapping
if(!doc.deleted) {
const words = doc.text.split(' ');
await mailByWordCollection.bulkInsert(
words.map(word => ({
emailId: doc.primary,
word: word
}))
);
}
}
}
});
```

With this you can efficiently query for "all emails that contain a given word" by running:

```ts
const mailIds = await mailByWordCollection.find({selector: {word: 'foobar'}}).exec();
```

### UseCase: Download data based on source documents

When you have to fetch data for each document of a collection from a server, you can use a pipeline to ensure that every document gets its data downloaded and no document is missed.

```ts
const pipeline = await emailCollection.addPipeline({
identifier: 'download-data',
destination: serverDataCollection,
handler: async (docs) => {
for (const doc of docs) {
const response = await fetch('https://example.com/doc/' + doc.primary);
const serverData = await response.json();
await serverDataCollection.upsert({
id: doc.primary,
data: serverData
});
}
}
});
```


## RxPipeline methods

### awaitIdle()

You can await the idleness of a pipeline with `await myRxPipeline.awaitIdle()`. This returns a promise that resolves when the pipeline has processed all pending documents and is no longer running.
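
For example, in tests or before reading aggregated results, a sketch reusing the fulltext-search pipeline from above (`newEmailDocData` is a placeholder for a valid email document):

```ts
await emailCollection.insert(newEmailDocData);
// block until the pipeline has processed all pending writes
await pipeline.awaitIdle();
// the destination collection is now fully up to date
const hits = await mailByWordCollection.find({selector: {word: 'foobar'}}).exec();
```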

### destroy()

`await myRxPipeline.destroy()` stops the pipeline so that it no longer processes writes. This is called automatically when the RxCollection or RxDatabase of the pipeline is destroyed.

### remove()

`await myRxPipeline.remove()` removes the pipeline and all metadata it has stored. Recreating the pipeline afterwards will start processing all source documents from scratch.
1 change: 1 addition & 0 deletions docs-src/sidebars.js
@@ -27,6 +27,7 @@ const sidebars = {
'rx-document',
'rx-query',
'rx-attachment',
'rx-pipeline',
{
type: 'category',
label: '💾 RxStorage',
8 changes: 7 additions & 1 deletion package.json
@@ -303,6 +303,12 @@
"import": "./dist/esm/plugins/state/index.js",
"default": "./dist/esm/plugins/state/index.js"
},
"./plugins/pipeline": {
"types": "./dist/types/plugins/pipeline/index.d.ts",
"require": "./dist/cjs/plugins/pipeline/index.js",
"import": "./dist/esm/plugins/pipeline/index.js",
"default": "./dist/esm/plugins/pipeline/index.js"
},
"./plugins/validate-ajv": {
"types": "./dist/types/plugins/validate-ajv/index.d.ts",
"require": "./dist/cjs/plugins/validate-ajv/index.js",
@@ -582,4 +588,4 @@
"webpack-cli": "5.1.4",
"webpack-dev-server": "5.0.4"
}
}
}
5 changes: 4 additions & 1 deletion src/change-event-buffer.ts
@@ -9,7 +9,10 @@ import type {
RxChangeEvent,
RxCollection
} from './types/index.d.ts';
import { appendToArray, requestIdlePromiseNoQueue } from './plugins/utils/index.ts';
import {
appendToArray,
requestIdlePromiseNoQueue
} from './plugins/utils/index.ts';

/**
* This buffer remembers previous change events
131 changes: 131 additions & 0 deletions src/plugins/pipeline/flagged-functions.ts
@@ -0,0 +1,131 @@
import { ensureNotFalsy } from '../utils/index.ts';

/**
* This is the most hacky thing we do in RxDB.
* When a pipeline "transaction" is running,
* we have to make all calls to the collection from the outside
* wait while still making it possible to run reads and writes
* from inside the transaction.
*
* We can decide where the call came from by checking the stack `new Error().stack`
* for a random "flag".
* But creating random flagged functions requires eval which we should not use.
* Instead we have a list of some flagged functions here
* that can be used and checked for in the stacktrace.
*
*
* When doing this with eval() instead it would look like:
* ```ts
* eval(`
* async function ${this.secretFunctionName}(docs){ const x = await _this.handler(docs); return x; }
* o.${this.secretFunctionName} = ${this.secretFunctionName};
* `);
* await o[this.secretFunctionName](rxDocuments);
*
* ```
*/
async function rx_pipeline_fn_1_(fn: any) {
return await fn();
}
async function rx_pipeline_fn_2_(fn: any) {
return await fn();
}
async function rx_pipeline_fn_3_(fn: any) {
return await fn();
}
async function rx_pipeline_fn_4_(fn: any) {
return await fn();
}
async function rx_pipeline_fn_5_(fn: any) {
return await fn();
}
async function rx_pipeline_fn_6_(fn: any) {
return await fn();
}
async function rx_pipeline_fn_7_(fn: any) {
return await fn();
}
async function rx_pipeline_fn_8_(fn: any) {
return await fn();
}
async function rx_pipeline_fn_9_(fn: any) {
return await fn();
}
async function rx_pipeline_fn_10_(fn: any) {
return await fn();
}
async function rx_pipeline_fn_11_(fn: any) {
return await fn();
}
async function rx_pipeline_fn_12_(fn: any) {
return await fn();
}
async function rx_pipeline_fn_13_(fn: any) {
return await fn();
}
async function rx_pipeline_fn_14_(fn: any) {
return await fn();
}
async function rx_pipeline_fn_15_(fn: any) {
return await fn();
}
async function rx_pipeline_fn_16_(fn: any) {
return await fn();
}
async function rx_pipeline_fn_17_(fn: any) {
return await fn();
}
async function rx_pipeline_fn_18_(fn: any) {
return await fn();
}
async function rx_pipeline_fn_19_(fn: any) {
return await fn();
}
async function rx_pipeline_fn_20_(fn: any) {
return await fn();
}





export const FLAGGED_FUNCTIONS = {
rx_pipeline_fn_1_,
rx_pipeline_fn_2_,
rx_pipeline_fn_3_,
rx_pipeline_fn_4_,
rx_pipeline_fn_5_,
rx_pipeline_fn_6_,
rx_pipeline_fn_7_,
rx_pipeline_fn_8_,
rx_pipeline_fn_9_,
rx_pipeline_fn_10_,
rx_pipeline_fn_11_,
rx_pipeline_fn_12_,
rx_pipeline_fn_13_,
rx_pipeline_fn_14_,
rx_pipeline_fn_15_,
rx_pipeline_fn_16_,
rx_pipeline_fn_17_,
rx_pipeline_fn_18_,
rx_pipeline_fn_19_,
rx_pipeline_fn_20_,
} as const;



const ids: (keyof typeof FLAGGED_FUNCTIONS)[] = Object.keys(FLAGGED_FUNCTIONS) as any;

export function blockFlaggedFunctionKey(): keyof typeof FLAGGED_FUNCTIONS {
/**
* If this happens and we have no more flagged keys left
* it means that too many pipeline handlers are running in parallel.
* To fix this, add more functions.
*/
const id = ensureNotFalsy(ids.pop(), 'no flagged keys left');
return id;
}

export function releaseFlaggedFunctionKey(key: keyof typeof FLAGGED_FUNCTIONS) {
ids.push(key);
}
18 changes: 18 additions & 0 deletions src/plugins/pipeline/index.ts
@@ -0,0 +1,18 @@
import type {
RxPlugin
} from '../../types/index.d.ts';
import { addPipeline } from './rx-pipeline.ts';

export type * from './types.ts';
export * from './flagged-functions.ts';
export * from './rx-pipeline.ts';

export const RxDBPipelinePlugin: RxPlugin = {
name: 'pipeline',
rxdb: true,
prototypes: {
RxCollection(proto: any) {
proto.addPipeline = addPipeline;
}
}
};