From ffa16e4533c374c2f78492171a135707c5fc75c2 Mon Sep 17 00:00:00 2001 From: Daniel Meyer <8926560+pubkey@users.noreply.github.com> Date: Thu, 6 Apr 2023 03:37:33 +0200 Subject: [PATCH] Feature/storage-remote mode option (#4609) * ADD `mode` option to remote storage * ADD default mode is storage * FIX tests * FIX typo --- CHANGELOG.md | 1 + docs-src/rx-storage-remote.md | 15 +- src/custom-index.ts | 2 +- .../electron/rx-storage-ipc-renderer.ts | 46 ++-- .../replication-websocket/websocket-client.ts | 152 ++++------- src/plugins/storage-remote-websocket/index.ts | 36 +-- src/plugins/storage-remote-websocket/types.ts | 10 +- .../storage-remote/message-channel-cache.ts | 80 ++++++ .../storage-remote/rx-storage-remote.ts | 52 +++- .../storage-remote/storage-remote-types.ts | 33 ++- src/plugins/utils/utils-map.ts | 9 + src/plugins/utils/utils-other.ts | 9 - src/plugins/utils/utils-promise.ts | 5 +- src/rx-storage-helper.ts | 2 +- test/unit/config.ts | 6 +- test/unit/rx-storage-remote.test.ts | 237 +++++++++++++++++- 16 files changed, 512 insertions(+), 183 deletions(-) create mode 100644 src/plugins/storage-remote/message-channel-cache.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index e14e98669e0..74eae986098 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - FIX `requestIdlePromise()` must run in a queue. - ADD Export ReplicationOptions type [#4606](https://github.com/pubkey/rxdb/pull/4606) +- ADD `mode` option to remote storage diff --git a/docs-src/rx-storage-remote.md b/docs-src/rx-storage-remote.md index c7d1df7d2fd..dfca568b1dd 100644 --- a/docs-src/rx-storage-remote.md +++ b/docs-src/rx-storage-remote.md @@ -8,7 +8,7 @@ The remote storage plugin is used in many RxDB plugins like the [worker](./rx-st ## Usage -The remote storage communicates over a message channel which has to implement the `messages$` observable and a `send()` function on both sides. +The remote storage communicates over a message channel which has to implement the `messageChannelCreator` function which returns an object that has a `messages$` observable and a `send()` function on both sides and a `close()` function that closes the RemoteMessageChannel. ```ts @@ -18,17 +18,18 @@ import { getRxStorageRemote } from 'rxdb/plugins/storage-remote'; const storage = getRxStorageRemote({ identifier: 'my-id', statics: RxStorageDefaultStatics, - messages$: new Subject(), - send(msg) { - // send to remote storage - } + mode: 'storage', + messageChannelCreator: () => Promise.resolve({ + messages$: new Subject(), + send(msg) { + // send to remote storage + } + }) }); const myDb = await createRxDatabase({ storage }); - - // on the remote import { getRxStorageDexie } from 'rxdb/plugins/storage-dexie'; import { exposeRxStorageRemote } from 'rxdb/plugins/storage-remote'; diff --git a/src/custom-index.ts b/src/custom-index.ts index 8937494ed7f..aacfc529c46 100644 --- a/src/custom-index.ts +++ b/src/custom-index.ts @@ -177,7 +177,7 @@ export function getPrimaryKeyFromIndexableString( primaryKeyLength: number ): string { const paddedPrimaryKey = indexableString.slice(primaryKeyLength * -1); - // we can savely trim here because the primary key is not allowed to start or end with a space char. + // we can safely trim here because the primary key is not allowed to start or end with a space char. const primaryKey = paddedPrimaryKey.trim(); return primaryKey; } diff --git a/src/plugins/electron/rx-storage-ipc-renderer.ts b/src/plugins/electron/rx-storage-ipc-renderer.ts index c87af123255..939e8a021a9 100644 --- a/src/plugins/electron/rx-storage-ipc-renderer.ts +++ b/src/plugins/electron/rx-storage-ipc-renderer.ts @@ -11,6 +11,7 @@ import type { import { IPC_RENDERER_KEY_PREFIX } from './electron-helper'; +import { PROMISE_RESOLVE_VOID } from '../utils'; export type RxStorageIpcRendererSettings = { /** @@ -21,6 +22,7 @@ export type RxStorageIpcRendererSettings = { key: string; statics: RxStorageStatics; ipcRenderer: any; + mode: RxStorageRemoteSettings['mode']; }; export type RxStorageIpcRenderer = RxStorageRemote; @@ -32,28 +34,34 @@ export function getRxStorageIpcRenderer( settings.key ].join('|'); - const messages$ = new Subject(); - settings.ipcRenderer.on(channelId, (_event: any, message: any) => { - messages$.next(message); - }); - - - settings.ipcRenderer.postMessage( - channelId, - false - ); - - const send: RxStorageRemoteSettings['send'] = (msg) => { - settings.ipcRenderer.postMessage( - channelId, - msg - ); - }; const storage = getRxStorageRemote({ identifier: 'electron-ipc-renderer', statics: settings.statics, - messages$, - send + mode: settings.mode, + messageChannelCreator() { + const messages$ = new Subject(); + const listener = (_event: any, message: any) => { + messages$.next(message); + }; + settings.ipcRenderer.on(channelId, listener); + settings.ipcRenderer.postMessage( + channelId, + false + ); + return Promise.resolve({ + messages$, + send(msg) { + settings.ipcRenderer.postMessage( + channelId, + msg + ); + }, + close() { + settings.ipcRenderer.removeListener(channelId, listener); + return PROMISE_RESOLVE_VOID; + } + }); + }, }); return storage; } diff --git a/src/plugins/replication-websocket/websocket-client.ts b/src/plugins/replication-websocket/websocket-client.ts index db45d088822..e1589e1ac98 100644 --- a/src/plugins/replication-websocket/websocket-client.ts +++ b/src/plugins/replication-websocket/websocket-client.ts @@ -8,8 +8,6 @@ import ReconnectingWebSocket from 'reconnecting-websocket'; import IsomorphicWebSocket from 'isomorphic-ws'; import { errorToPlainJson, - getFromMapOrCreate, - getFromMapOrThrow, randomCouchString, toArray } from '../../plugins/utils'; @@ -21,17 +19,14 @@ import { BehaviorSubject } from 'rxjs'; import { - RxDatabase, RxError, RxReplicationWriteToMasterRow } from '../../types'; import { newRxError } from '../../rx-error'; -export type WebsocketWithRefCount = { +export type WebsocketClient = { url: string; socket: ReconnectingWebSocket; - refCount: number; - openPromise: Promise; connected$: BehaviorSubject; message$: Subject; error$: Subject; @@ -44,7 +39,7 @@ export type WebsocketWithRefCount = { * so we directly check the correctness in RxDB to ensure that we can * throw a helpful error. */ -function ensureIsWebsocket(w: typeof IsomorphicWebSocket) { +export function ensureIsWebsocket(w: typeof IsomorphicWebSocket) { const is = typeof w !== 'undefined' && !!w && w.CLOSING === 2; if (!is) { console.dir(w); @@ -52,107 +47,60 @@ function ensureIsWebsocket(w: typeof IsomorphicWebSocket) { } } -/** - * Reuse the same socket even when multiple - * collection replicate with the same server at once. - */ -export const WEBSOCKET_BY_CACHE_KEY: Map = new Map(); -export async function getWebSocket( - url: string, - /** - * The value of RxDatabase.token. - */ - databaseToken: string -): Promise { - /** - * Also use the database token as cache-key - * to make it easier to test and debug - * multi-instance setups. - */ - const cacheKey = url + '|||' + databaseToken; - - - const has = getFromMapOrCreate( - WEBSOCKET_BY_CACHE_KEY, - cacheKey, - () => { - ensureIsWebsocket(IsomorphicWebSocket); - const wsClient = new ReconnectingWebSocket( - url, - [], - { - WebSocket: IsomorphicWebSocket - } - ); - - const connected$ = new BehaviorSubject(false); - const openPromise = new Promise(res => { - wsClient.onopen = () => { - connected$.next(true); - res(); - }; - }); - wsClient.onclose = () => { - connected$.next(false); - }; - - const message$ = new Subject(); - wsClient.onmessage = (messageObj) => { - const message = JSON.parse(messageObj.data); - message$.next(message); - }; - - const error$ = new Subject(); - wsClient.onerror = (err) => { - const emitError = newRxError('RC_STREAM', { - errors: toArray(err).map((er: any) => errorToPlainJson(er)), - direction: 'pull' - }); - error$.next(emitError); - }; - - return { - url, - socket: wsClient, - openPromise, - refCount: 1, - connected$, - message$, - error$ - }; - }, - (value) => { - value.refCount = value.refCount + 1; +export async function createWebSocketClient(url: string): Promise { + ensureIsWebsocket(IsomorphicWebSocket); + const wsClient = new ReconnectingWebSocket( + url, + [], + { + WebSocket: IsomorphicWebSocket } ); - await has.openPromise; - return has; -} - -export function removeWebSocketRef( - url: string, - database: RxDatabase -) { - const cacheKey = url + '|||' + database.token; - const obj = getFromMapOrThrow(WEBSOCKET_BY_CACHE_KEY, cacheKey); - obj.refCount = obj.refCount - 1; - if (obj.refCount === 0) { - WEBSOCKET_BY_CACHE_KEY.delete(cacheKey); - obj.connected$.complete(); - obj.socket.close(); - } -} + const connected$ = new BehaviorSubject(false); + await new Promise(res => { + wsClient.onopen = () => { + connected$.next(true); + res(); + }; + }); + wsClient.onclose = () => { + connected$.next(false); + }; + + const message$ = new Subject(); + wsClient.onmessage = (messageObj) => { + const message = JSON.parse(messageObj.data); + message$.next(message); + }; + + const error$ = new Subject(); + wsClient.onerror = (err) => { + const emitError = newRxError('RC_STREAM', { + errors: toArray(err).map((er: any) => errorToPlainJson(er)), + direction: 'pull' + }); + error$.next(emitError); + }; + + + return { + url, + socket: wsClient, + connected$, + message$, + error$ + }; +} export async function replicateWithWebsocketServer( options: WebsocketClientOptions ): Promise> { - const socketState = await getWebSocket(options.url, options.collection.database.token); - const wsClient = socketState.socket; - - const messages$ = socketState.message$; + const websocketClient = await createWebSocketClient(options.url); + const wsClient = websocketClient.socket; + const messages$ = websocketClient.message$; let requestCounter = 0; const requestFlag = randomCouchString(10); @@ -209,9 +157,9 @@ export async function replicateWithWebsocketServer( } }); - socketState.error$.subscribe(err => replicationState.subjects.error.next(err)); + websocketClient.error$.subscribe(err => replicationState.subjects.error.next(err)); - socketState.connected$.subscribe(isConnected => { + websocketClient.connected$.subscribe(isConnected => { if (isConnected) { /** * When the client goes offline and online again, @@ -235,6 +183,6 @@ export async function replicateWithWebsocketServer( } }); - options.collection.onDestroy.push(() => removeWebSocketRef(options.url, options.collection.database)); + options.collection.onDestroy.push(() => websocketClient.socket.close()); return replicationState; } diff --git a/src/plugins/storage-remote-websocket/index.ts b/src/plugins/storage-remote-websocket/index.ts index 9b1b85041a2..6e0a95d4335 100644 --- a/src/plugins/storage-remote-websocket/index.ts +++ b/src/plugins/storage-remote-websocket/index.ts @@ -3,11 +3,11 @@ import type { WebSocket } from 'ws'; import { - getFromMapOrThrow, - randomCouchString + PROMISE_RESOLVE_VOID, + getFromMapOrThrow } from '../../plugins/utils'; import { - getWebSocket, + createWebSocketClient, startSocketServer } from '../replication-websocket'; import { exposeRxStorageRemote } from '../storage-remote/remote'; @@ -79,30 +79,34 @@ export function startRxStorageRemoteWebsocketServer( }; } - - export function getRxStorageRemoteWebsocket( options: RxStorageRemoteWebsocketClientOptions ): RxStorageRemoteWebsocketClient { const identifier = [ options.url, - 'rx-remote-storage-websocket', - options.disableCache ? randomCouchString() : '' + 'rx-remote-storage-websocket' ].join(''); - const messages$ = new Subject(); - const websocketClientPromise = getWebSocket(options.url, identifier); const storage = getRxStorageRemote({ identifier, statics: options.statics, - messages$, - send(msg) { - return websocketClientPromise - .then(websocketClient => websocketClient.socket.send(JSON.stringify(msg))); + mode: options.mode, + async messageChannelCreator() { + const messages$ = new Subject(); + const websocketClient = await createWebSocketClient(options.url); + websocketClient.message$.subscribe(msg => messages$.next(msg)); + return { + messages$, + send(msg) { + return websocketClient.socket.send(JSON.stringify(msg)); + }, + close() { + websocketClient.socket.close(); + return PROMISE_RESOLVE_VOID; + } + }; + } }); - websocketClientPromise.then((websocketClient) => { - websocketClient.message$.subscribe(msg => messages$.next(msg)); - }); return storage; } diff --git a/src/plugins/storage-remote-websocket/types.ts b/src/plugins/storage-remote-websocket/types.ts index 4c3ac9873c6..b30a61b74ca 100644 --- a/src/plugins/storage-remote-websocket/types.ts +++ b/src/plugins/storage-remote-websocket/types.ts @@ -5,7 +5,8 @@ import type { ServerOptions, ClientOptions } from 'ws'; import type { RxDatabase, RxStorage, RxStorageStatics } from '../../types'; import type { CustomRequestHandler, - RxStorageRemoteExposeType + RxStorageRemoteExposeType, + RxStorageRemoteSettings } from '../storage-remote/storage-remote-types'; import { RxStorageRemote } from '../storage-remote'; @@ -23,12 +24,7 @@ export type RxStorageRemoteWebsocketServerState = { export type RxStorageRemoteWebsocketClientOptions = ClientOptions & { statics: RxStorageStatics; url: string; - /** - * By default, sockets are cached and reused by url. - * You can disable this behavior by setting reuseSocketConnection=false - * This can be useful in tests to simulate multiple clients. - */ - disableCache?: boolean; + mode: RxStorageRemoteSettings['mode']; }; export type RxStorageRemoteWebsocketClient = RxStorageRemote; diff --git a/src/plugins/storage-remote/message-channel-cache.ts b/src/plugins/storage-remote/message-channel-cache.ts new file mode 100644 index 00000000000..d1dde5c8b7b --- /dev/null +++ b/src/plugins/storage-remote/message-channel-cache.ts @@ -0,0 +1,80 @@ +import { + PROMISE_RESOLVE_VOID, + getFromMapOrCreate, + getFromMapOrThrow +} from '../utils'; +import { + RemoteMessageChannel, + RxStorageRemoteSettings +} from './storage-remote-types'; + +export type RemoteMessageChannelCacheItem = { + identifier: string; + cacheKey: string; + messageChannel: Promise; + refCount: number; +}; + +export const MESSAGE_CHANNEL_CACHE_BY_IDENTIFIER = new Map>(); +const CACHE_ITEM_BY_MESSAGE_CHANNEL = new WeakMap(); + +function getMessageChannelCache( + identifier: string +) { + return getFromMapOrCreate( + MESSAGE_CHANNEL_CACHE_BY_IDENTIFIER, + identifier, + () => new Map() + ); +} + +export function getMessageChannel( + settings: RxStorageRemoteSettings, + cacheKeys: string[] +): Promise { + const cacheKey = getCacheKey(settings, cacheKeys); + const cacheItem = getFromMapOrCreate( + getMessageChannelCache(settings.identifier), + cacheKey, + () => { + const newCacheItem: RemoteMessageChannelCacheItem = { + identifier: settings.identifier, + cacheKey, + refCount: 1, + messageChannel: settings.messageChannelCreator() + .then((messageChannel) => { + CACHE_ITEM_BY_MESSAGE_CHANNEL.set(messageChannel, newCacheItem); + return messageChannel; + }), + }; + return newCacheItem; + }, + (existingCacheItem) => { + existingCacheItem.refCount = existingCacheItem.refCount + 1; + } + ); + return cacheItem.messageChannel; +} + + +export function closeMessageChannel( + messageChannel: RemoteMessageChannel +): Promise { + const cacheItem = getFromMapOrThrow(CACHE_ITEM_BY_MESSAGE_CHANNEL, messageChannel); + cacheItem.refCount = cacheItem.refCount - 1; + if (cacheItem.refCount === 0) { + getMessageChannelCache(cacheItem.identifier).delete(cacheItem.cacheKey); + return messageChannel.close(); + } else { + return PROMISE_RESOLVE_VOID; + } +} + +function getCacheKey( + settings: RxStorageRemoteSettings, + cacheKeys: string[] +): string { + cacheKeys = cacheKeys.slice(0); + cacheKeys.unshift(settings.identifier); + return cacheKeys.join('||'); +} diff --git a/src/plugins/storage-remote/rx-storage-remote.ts b/src/plugins/storage-remote/rx-storage-remote.ts index 5a20d9e6d60..2e2f5c9a5fb 100644 --- a/src/plugins/storage-remote/rx-storage-remote.ts +++ b/src/plugins/storage-remote/rx-storage-remote.ts @@ -31,14 +31,13 @@ import type { RxStorageRemoteInternals, RxStorageRemoteSettings } from './storage-remote-types'; - - +import { closeMessageChannel, getMessageChannel } from './message-channel-cache'; export class RxStorageRemote implements RxStorage { public readonly statics: RxStorageStatics; public readonly name: string = 'remote'; - private requestIdSeed: string = randomCouchString(10); + private seed: string = randomCouchString(10); private lastRequestId: number = 0; constructor( public readonly settings: RxStorageRemoteSettings @@ -48,7 +47,7 @@ export class RxStorageRemote implements RxStorage public getRequestId() { const newId = this.lastRequestId++; - return this.requestIdSeed + '|' + newId; + return this.seed + '|' + newId; } async createStorageInstance( @@ -56,11 +55,29 @@ export class RxStorageRemote implements RxStorage ): Promise> { const connectionId = 'c|' + this.getRequestId(); + const cacheKeys: string[] = [ + 'mode-' + this.settings.mode + ]; + switch (this.settings.mode) { + case 'collection': + cacheKeys.push('collection-' + params.collectionName); + // eslint-disable-next-line no-fallthrough + case 'database': + cacheKeys.push('database-' + params.databaseName); + // eslint-disable-next-line no-fallthrough + case 'storage': + cacheKeys.push('seed-' + this.seed); + } + const messageChannel = await getMessageChannel( + this.settings, + cacheKeys + ); + const requestId = this.getRequestId(); - const waitForOkPromise = firstValueFrom(this.settings.messages$.pipe( + const waitForOkPromise = firstValueFrom(messageChannel.messages$.pipe( filter(msg => msg.answerTo === requestId) )); - this.settings.send({ + messageChannel.send({ connectionId, method: 'create', requestId, @@ -71,6 +88,7 @@ export class RxStorageRemote implements RxStorage if (waitForOkResult.error) { throw new Error('could not create instance ' + JSON.stringify(waitForOkResult.error)); } + return new RxStorageInstanceRemote( this, params.databaseName, @@ -78,19 +96,21 @@ export class RxStorageRemote implements RxStorage params.schema, { params, - connectionId + connectionId, + messageChannel }, params.options ); } async customRequest(data: In): Promise { + const messageChannel = await this.settings.messageChannelCreator(); const requestId = this.getRequestId(); const connectionId = 'custom|request|' + requestId; - const waitForAnswerPromise = firstValueFrom(this.settings.messages$.pipe( + const waitForAnswerPromise = firstValueFrom(messageChannel.messages$.pipe( filter(msg => msg.answerTo === requestId) )); - this.settings.send({ + messageChannel.send({ connectionId, method: 'custom', requestId, @@ -98,13 +118,16 @@ export class RxStorageRemote implements RxStorage }); const response = await waitForAnswerPromise; if (response.error) { + await messageChannel.close(); throw new Error('could not run customRequest(): ' + JSON.stringify({ data, error: response.error })); } else { + await messageChannel.close(); return response.return; } + } } @@ -124,7 +147,7 @@ export class RxStorageInstanceRemote implements RxStorageInstance ) { - this.messages$ = this.storage.settings.messages$.pipe( + this.messages$ = this.internals.messageChannel.messages$.pipe( filter(msg => msg.connectionId === this.internals.connectionId) ); this.subs.push( @@ -155,7 +178,7 @@ export class RxStorageInstanceRemote implements RxStorageInstance implements RxStorageInstance sub.unsubscribe()); this.changes$.complete(); await this.requestRemote('close', []); + await closeMessageChannel(this.internals.messageChannel); } async remove(): Promise { this.closed = true; await this.requestRemote('remove', []); + await closeMessageChannel(this.internals.messageChannel); } conflictResultionTasks(): Observable> { return this.conflicts$; @@ -223,5 +248,8 @@ export class RxStorageInstanceRemote implements RxStorageInstance; + close(): Promise; +}; + export type RxStorageRemoteSettings = { identifier: string; statics: RxStorageStatics; - send(msg: MessageToRemote): void; - messages$: Observable; + /** + * There are different modes + * that determine how many message channels are used. + * These modes can have different performance patterns. + * + * [default='storage'] + */ + mode?: + // create exactly one RemoteMessageChannel and reuse that everywhere. + | 'one' + // storage: create one RemoteMessageChannel per call to getRxStorage...() + | 'storage' + // database: create one RemoteMessageChannel for each database + | 'database' + // collection: create one RemoteMessageChannel for each collection + | 'collection'; + messageChannelCreator: () => Promise; }; export type RxStorageRemoteInternals = { params: RxStorageInstanceCreationParams; connectionId: string; + messageChannel: RemoteMessageChannel; }; export type RxStorageRemoteExposeSettingsBase = { diff --git a/src/plugins/utils/utils-map.ts b/src/plugins/utils/utils-map.ts index 808374c5373..1b83ce89c1c 100644 --- a/src/plugins/utils/utils-map.ts +++ b/src/plugins/utils/utils-map.ts @@ -1,5 +1,14 @@ + +export function getFromMapOrThrow(map: Map | WeakMap, key: K): V { + const val = map.get(key); + if (typeof val === 'undefined') { + throw new Error('missing value from map ' + key); + } + return val; +} + export function getFromMapOrCreate( map: Map | WeakMap, index: MapIndex, diff --git a/src/plugins/utils/utils-other.ts b/src/plugins/utils/utils-other.ts index 1d5596e9385..0db5861398e 100644 --- a/src/plugins/utils/utils-other.ts +++ b/src/plugins/utils/utils-other.ts @@ -16,15 +16,6 @@ export function ensureInteger(obj: unknown): number { return obj as number; } - -export function getFromMapOrThrow(map: Map | WeakMap, key: K): V { - const val = map.get(key); - if (typeof val === 'undefined') { - throw new Error('missing value from map ' + key); - } - return val; -} - /** * Using shareReplay() without settings will not unsubscribe * if there are no more subscribers. diff --git a/src/plugins/utils/utils-promise.ts b/src/plugins/utils/utils-promise.ts index 681f0b4ea4c..861bf5a52cf 100644 --- a/src/plugins/utils/utils-promise.ts +++ b/src/plugins/utils/utils-promise.ts @@ -18,7 +18,10 @@ export function toPromise(maybePromise: Promise | T): Promise { } } - +/** + * Reusing resolved promises has a better + * performance than creating new ones each time. + */ export const PROMISE_RESOLVE_TRUE: Promise = Promise.resolve(true); export const PROMISE_RESOLVE_FALSE: Promise = Promise.resolve(false); export const PROMISE_RESOLVE_NULL: Promise = Promise.resolve(null); diff --git a/src/rx-storage-helper.ts b/src/rx-storage-helper.ts index 929de29990a..524052a94e9 100644 --- a/src/rx-storage-helper.ts +++ b/src/rx-storage-helper.ts @@ -295,7 +295,7 @@ export function categorizeBulkWriteRows( documentInDb }; errors[id as any] = err; - break; + continue; } // handle attachments data diff --git a/test/unit/config.ts b/test/unit/config.ts index d40bf85cf59..b9a58a0822d 100644 --- a/test/unit/config.ts +++ b/test/unit/config.ts @@ -242,14 +242,16 @@ export function setDefaultStorage(storageKey: string) { getStorage: () => { return getRxStorageRemoteWebsocket({ statics: RxStorageDefaultStatics, - url: 'ws://localhost:18007' + url: 'ws://localhost:18007', + mode: 'storage' }); }, getPerformanceStorage() { return { storage: getRxStorageRemoteWebsocket({ statics: RxStorageDefaultStatics, - url: 'ws://localhost:18007' + url: 'ws://localhost:18007', + mode: 'storage' }), description: 'remote+dexie+fake-indexeddb' }; diff --git a/test/unit/rx-storage-remote.test.ts b/test/unit/rx-storage-remote.test.ts index 973987c6c7d..6cb15b94ec4 100644 --- a/test/unit/rx-storage-remote.test.ts +++ b/test/unit/rx-storage-remote.test.ts @@ -3,11 +3,12 @@ import assert from 'assert'; import config from './config'; import { - RxStorageDefaultStatics + RxStorageDefaultStatics, createRxDatabase, fillWithDefaultSettings, randomCouchString } from '../../'; import { nextPort } from '../helper/port-manager'; import * as humansCollections from '../helper/humans-collection'; import * as schemaObjects from '../helper/schema-objects'; +import * as schemas from '../helper/schemas'; import { getRxStorageRemoteWebsocket, startRxStorageRemoteWebsocketServer @@ -44,7 +45,8 @@ config.parallel('rx-storage-remote.test.ts', () => { 0, undefined, false, false, getRxStorageRemoteWebsocket({ statics: RxStorageDefaultStatics, - url: 'ws://localhost:' + port + url: 'ws://localhost:' + port, + mode: 'storage' }) ); const cols = [colServer, colClient]; @@ -63,6 +65,232 @@ config.parallel('rx-storage-remote.test.ts', () => { await colServer.database.destroy(); }); }); + describe('mode setting with RemoteMessageChannel reuse', () => { + const getStorage = (port: number) => getRxStorageRemoteWebsocket({ + statics: RxStorageDefaultStatics, + url: 'ws://localhost:' + port, + mode: 'one' + }); + it('mode: one', async () => { + const port = await nextPort(); + const colServer = await humansCollections.create(0, undefined, false, false, getRxStorageMemory()); + const server = await startRxStorageRemoteWebsocketServer({ + port, + database: colServer.database + }); + assert.ok(server); + + const storageInstanceA = await getStorage(port).createStorageInstance({ + databaseInstanceToken: randomCouchString(10), + databaseName: randomCouchString(10), + collectionName: 'human', + devMode: true, + multiInstance: false, + options: {}, + schema: fillWithDefaultSettings(schemas.human) + }); + const storageInstanceB = await getStorage(port).createStorageInstance({ + databaseInstanceToken: randomCouchString(10), + databaseName: randomCouchString(10), + collectionName: 'human', + devMode: true, + multiInstance: false, + options: {}, + schema: fillWithDefaultSettings(schemas.human) + }); + + assert.strictEqual( + storageInstanceA.internals.messageChannel, + storageInstanceB.internals.messageChannel + ); + + await storageInstanceA.close(); + await storageInstanceB.close(); + await colServer.database.destroy(); + }); + it('mode: storage', async () => { + const port = await nextPort(); + const colServer = await humansCollections.create(0, undefined, false, false, getRxStorageMemory()); + const server = await startRxStorageRemoteWebsocketServer({ + port, + database: colServer.database + }); + assert.ok(server); + + const storage = getRxStorageRemoteWebsocket({ + statics: RxStorageDefaultStatics, + url: 'ws://localhost:' + port, + mode: 'storage' + }); + const storageInstanceA = await storage.createStorageInstance({ + databaseInstanceToken: randomCouchString(10), + databaseName: randomCouchString(10), + collectionName: 'human', + devMode: true, + multiInstance: false, + options: {}, + schema: fillWithDefaultSettings(schemas.human) + }); + const storageInstanceB = await storage.createStorageInstance({ + databaseInstanceToken: randomCouchString(10), + databaseName: randomCouchString(10), + collectionName: 'human', + devMode: true, + multiInstance: false, + options: {}, + schema: fillWithDefaultSettings(schemas.human) + }); + const storageInstanceOther = await getStorage(port).createStorageInstance({ + databaseInstanceToken: randomCouchString(10), + databaseName: randomCouchString(10), + collectionName: 'human', + devMode: true, + multiInstance: false, + options: {}, + schema: fillWithDefaultSettings(schemas.human) + }); + + assert.strictEqual( + storageInstanceA.internals.messageChannel, + storageInstanceB.internals.messageChannel + ); + assert.notStrictEqual( + storageInstanceA.internals.messageChannel, + storageInstanceOther.internals.messageChannel + ); + + await storageInstanceA.close(); + await storageInstanceB.close(); + await storageInstanceOther.close(); + await colServer.database.destroy(); + }); + it('mode: database', async () => { + const port = await nextPort(); + const colServer = await humansCollections.create(0, undefined, false, false, getRxStorageMemory()); + const server = await startRxStorageRemoteWebsocketServer({ + port, + database: colServer.database + }); + assert.ok(server); + + const storage = getRxStorageRemoteWebsocket({ + statics: RxStorageDefaultStatics, + url: 'ws://localhost:' + port, + mode: 'database' + }); + const databaseName = randomCouchString(10); + const storageInstanceA = await storage.createStorageInstance({ + databaseInstanceToken: randomCouchString(10), + databaseName, + collectionName: 'human', + devMode: true, + multiInstance: false, + options: {}, + schema: fillWithDefaultSettings(schemas.human) + }); + const storageInstanceB = await storage.createStorageInstance({ + databaseInstanceToken: randomCouchString(10), + databaseName, + collectionName: 'human', + devMode: true, + multiInstance: false, + options: {}, + schema: fillWithDefaultSettings(schemas.human) + }); + const storageInstanceOther = await storage.createStorageInstance({ + databaseInstanceToken: randomCouchString(10), + databaseName: randomCouchString(10), + collectionName: 'human', + devMode: true, + multiInstance: false, + options: {}, + schema: fillWithDefaultSettings(schemas.human) + }); + + assert.strictEqual( + storageInstanceA.internals.messageChannel, + storageInstanceB.internals.messageChannel + ); + assert.notStrictEqual( + storageInstanceA.internals.messageChannel, + storageInstanceOther.internals.messageChannel + ); + + await storageInstanceA.close(); + await storageInstanceB.close(); + await storageInstanceOther.close(); + await colServer.database.destroy(); + }); + it('mode: collection', async () => { + const port = await nextPort(); + + const database = await createRxDatabase({ + name: randomCouchString(10), + storage: getRxStorageMemory(), + }); + await database.addCollections({ + one: { + schema: schemas.human + }, + two: { + schema: schemas.human + } + }); + const server = await startRxStorageRemoteWebsocketServer({ + port, + database + }); + assert.ok(server); + + const storage = getRxStorageRemoteWebsocket({ + statics: RxStorageDefaultStatics, + url: 'ws://localhost:' + port, + mode: 'collection' + }); + const databaseName = randomCouchString(10); + const storageInstanceA = await storage.createStorageInstance({ + databaseInstanceToken: randomCouchString(10), + databaseName, + collectionName: 'one', + devMode: true, + multiInstance: false, + options: {}, + schema: fillWithDefaultSettings(schemas.human) + }); + const storageInstanceB = await storage.createStorageInstance({ + databaseInstanceToken: randomCouchString(10), + databaseName, + collectionName: 'one', + devMode: true, + multiInstance: false, + options: {}, + schema: fillWithDefaultSettings(schemas.human) + }); + const storageInstanceOther = await storage.createStorageInstance({ + databaseInstanceToken: randomCouchString(10), + databaseName, + collectionName: 'two', + devMode: true, + multiInstance: false, + options: {}, + schema: fillWithDefaultSettings(schemas.human) + }); + + assert.strictEqual( + storageInstanceA.internals.messageChannel, + storageInstanceB.internals.messageChannel + ); + assert.notStrictEqual( + storageInstanceA.internals.messageChannel, + storageInstanceOther.internals.messageChannel + ); + + await storageInstanceA.close(); + await storageInstanceB.close(); + await storageInstanceOther.close(); + await database.destroy(); + }); + }); describe('custom requests', () => { it('should send the message and get the answer', async () => { const port = await nextPort(); @@ -80,7 +308,8 @@ config.parallel('rx-storage-remote.test.ts', () => { assert.ok(server); const clientStorage = getRxStorageRemoteWebsocket({ statics: RxStorageDefaultStatics, - url: 'ws://localhost:' + port + url: 'ws://localhost:' + port, + mode: 'storage' }); const result = await clientStorage.customRequest('foobar'); @@ -112,7 +341,7 @@ config.parallel('rx-storage-remote.test.ts', () => { const clientStorage = getRxStorageRemoteWebsocket({ statics: RxStorageDefaultStatics, url: 'ws://localhost:' + port, - disableCache: true + mode: 'storage' }); const result = await clientStorage.customRequest({ identifier: 'idx-' + idx