Skip to content

Commit

Permalink
Feature/storage-remote mode option (pubkey#4609)
Browse files Browse the repository at this point in the history
* ADD `mode` option to remote storage

* ADD default mode is storage

* FIX tests

* FIX typo
  • Loading branch information
pubkey authored Apr 6, 2023
1 parent 831410b commit ffa16e4
Show file tree
Hide file tree
Showing 16 changed files with 512 additions and 183 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
<!-- CHANGELOG NEWEST -->
- FIX `requestIdlePromise()` must run in a queue.
- ADD Export ReplicationOptions type [#4606](https://github.com/pubkey/rxdb/pull/4606)
- ADD `mode` option to remote storage
<!-- ADD new changes here! -->

<!-- /CHANGELOG NEWEST -->
Expand Down
15 changes: 8 additions & 7 deletions docs-src/rx-storage-remote.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ The remote storage plugin is used in many RxDB plugins like the [worker](./rx-st

## Usage

The remote storage communicates over a message channel which has to implement the `messages$` observable and a `send()` function on both sides.
The remote storage communicates over a message channel which has to implement the `messageChannelCreator` function which returns an object that has a `messages$` observable and a `send()` function on both sides and a `close()` function that closes the RemoteMessageChannel.


```ts
Expand All @@ -18,17 +18,18 @@ import { getRxStorageRemote } from 'rxdb/plugins/storage-remote';
const storage = getRxStorageRemote({
identifier: 'my-id',
statics: RxStorageDefaultStatics,
messages$: new Subject(),
send(msg) {
// send to remote storage
}
mode: 'storage',
messageChannelCreator: () => Promise.resolve({
messages$: new Subject(),
send(msg) {
// send to remote storage
}
})
});
const myDb = await createRxDatabase({
storage
});



// on the remote
import { getRxStorageDexie } from 'rxdb/plugins/storage-dexie';
import { exposeRxStorageRemote } from 'rxdb/plugins/storage-remote';
Expand Down
2 changes: 1 addition & 1 deletion src/custom-index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ export function getPrimaryKeyFromIndexableString(
primaryKeyLength: number
): string {
const paddedPrimaryKey = indexableString.slice(primaryKeyLength * -1);
// we can savely trim here because the primary key is not allowed to start or end with a space char.
// we can safely trim here because the primary key is not allowed to start or end with a space char.
const primaryKey = paddedPrimaryKey.trim();
return primaryKey;
}
Expand Down
46 changes: 27 additions & 19 deletions src/plugins/electron/rx-storage-ipc-renderer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type {
import {
IPC_RENDERER_KEY_PREFIX
} from './electron-helper';
import { PROMISE_RESOLVE_VOID } from '../utils';

export type RxStorageIpcRendererSettings = {
/**
Expand All @@ -21,6 +22,7 @@ export type RxStorageIpcRendererSettings = {
key: string;
statics: RxStorageStatics;
ipcRenderer: any;
mode: RxStorageRemoteSettings['mode'];
};

export type RxStorageIpcRenderer = RxStorageRemote;
Expand All @@ -32,28 +34,34 @@ export function getRxStorageIpcRenderer(
settings.key
].join('|');

const messages$ = new Subject<MessageFromRemote>();
settings.ipcRenderer.on(channelId, (_event: any, message: any) => {
messages$.next(message);
});


settings.ipcRenderer.postMessage(
channelId,
false
);

const send: RxStorageRemoteSettings['send'] = (msg) => {
settings.ipcRenderer.postMessage(
channelId,
msg
);
};
const storage = getRxStorageRemote({
identifier: 'electron-ipc-renderer',
statics: settings.statics,
messages$,
send
mode: settings.mode,
messageChannelCreator() {
const messages$ = new Subject<MessageFromRemote>();
const listener = (_event: any, message: any) => {
messages$.next(message);
};
settings.ipcRenderer.on(channelId, listener);
settings.ipcRenderer.postMessage(
channelId,
false
);
return Promise.resolve({
messages$,
send(msg) {
settings.ipcRenderer.postMessage(
channelId,
msg
);
},
close() {
settings.ipcRenderer.removeListener(channelId, listener);
return PROMISE_RESOLVE_VOID;
}
});
},
});
return storage;
}
152 changes: 50 additions & 102 deletions src/plugins/replication-websocket/websocket-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import ReconnectingWebSocket from 'reconnecting-websocket';
import IsomorphicWebSocket from 'isomorphic-ws';
import {
errorToPlainJson,
getFromMapOrCreate,
getFromMapOrThrow,
randomCouchString,
toArray
} from '../../plugins/utils';
Expand All @@ -21,17 +19,14 @@ import {
BehaviorSubject
} from 'rxjs';
import {
RxDatabase,
RxError,
RxReplicationWriteToMasterRow
} from '../../types';
import { newRxError } from '../../rx-error';

export type WebsocketWithRefCount = {
export type WebsocketClient = {
url: string;
socket: ReconnectingWebSocket;
refCount: number;
openPromise: Promise<void>;
connected$: BehaviorSubject<boolean>;
message$: Subject<any>;
error$: Subject<RxError>;
Expand All @@ -44,115 +39,68 @@ export type WebsocketWithRefCount = {
* so we directly check the correctness in RxDB to ensure that we can
* throw a helpful error.
*/
function ensureIsWebsocket(w: typeof IsomorphicWebSocket) {
export function ensureIsWebsocket(w: typeof IsomorphicWebSocket) {
const is = typeof w !== 'undefined' && !!w && w.CLOSING === 2;
if (!is) {
console.dir(w);
throw new Error('websocket not valid');
}
}

/**
* Reuse the same socket even when multiple
* collection replicate with the same server at once.
*/
export const WEBSOCKET_BY_CACHE_KEY: Map<string, WebsocketWithRefCount> = new Map();
export async function getWebSocket(
url: string,
/**
* The value of RxDatabase.token.
*/
databaseToken: string
): Promise<WebsocketWithRefCount> {
/**
* Also use the database token as cache-key
* to make it easier to test and debug
* multi-instance setups.
*/
const cacheKey = url + '|||' + databaseToken;


const has = getFromMapOrCreate(
WEBSOCKET_BY_CACHE_KEY,
cacheKey,
() => {
ensureIsWebsocket(IsomorphicWebSocket);
const wsClient = new ReconnectingWebSocket(
url,
[],
{
WebSocket: IsomorphicWebSocket
}
);

const connected$ = new BehaviorSubject<boolean>(false);
const openPromise = new Promise<void>(res => {
wsClient.onopen = () => {
connected$.next(true);
res();
};
});
wsClient.onclose = () => {
connected$.next(false);
};

const message$ = new Subject<any>();
wsClient.onmessage = (messageObj) => {
const message = JSON.parse(messageObj.data);
message$.next(message);
};

const error$ = new Subject<any>();
wsClient.onerror = (err) => {
const emitError = newRxError('RC_STREAM', {
errors: toArray(err).map((er: any) => errorToPlainJson(er)),
direction: 'pull'
});
error$.next(emitError);
};


return {
url,
socket: wsClient,
openPromise,
refCount: 1,
connected$,
message$,
error$
};
},
(value) => {
value.refCount = value.refCount + 1;
export async function createWebSocketClient(url: string): Promise<WebsocketClient> {
ensureIsWebsocket(IsomorphicWebSocket);
const wsClient = new ReconnectingWebSocket(
url,
[],
{
WebSocket: IsomorphicWebSocket
}
);
await has.openPromise;
return has;
}

export function removeWebSocketRef(
url: string,
database: RxDatabase
) {
const cacheKey = url + '|||' + database.token;
const obj = getFromMapOrThrow(WEBSOCKET_BY_CACHE_KEY, cacheKey);
obj.refCount = obj.refCount - 1;
if (obj.refCount === 0) {
WEBSOCKET_BY_CACHE_KEY.delete(cacheKey);
obj.connected$.complete();
obj.socket.close();
}
}

const connected$ = new BehaviorSubject<boolean>(false);
await new Promise<void>(res => {
wsClient.onopen = () => {
connected$.next(true);
res();
};
});
wsClient.onclose = () => {
connected$.next(false);
};

const message$ = new Subject<any>();
wsClient.onmessage = (messageObj) => {
const message = JSON.parse(messageObj.data);
message$.next(message);
};

const error$ = new Subject<any>();
wsClient.onerror = (err) => {
const emitError = newRxError('RC_STREAM', {
errors: toArray(err).map((er: any) => errorToPlainJson(er)),
direction: 'pull'
});
error$.next(emitError);
};


return {
url,
socket: wsClient,
connected$,
message$,
error$
};

}

export async function replicateWithWebsocketServer<RxDocType, CheckpointType>(
options: WebsocketClientOptions<RxDocType>
): Promise<RxReplicationState<RxDocType, CheckpointType>> {
const socketState = await getWebSocket(options.url, options.collection.database.token);
const wsClient = socketState.socket;

const messages$ = socketState.message$;
const websocketClient = await createWebSocketClient(options.url);
const wsClient = websocketClient.socket;
const messages$ = websocketClient.message$;

let requestCounter = 0;
const requestFlag = randomCouchString(10);
Expand Down Expand Up @@ -209,9 +157,9 @@ export async function replicateWithWebsocketServer<RxDocType, CheckpointType>(
}
});

socketState.error$.subscribe(err => replicationState.subjects.error.next(err));
websocketClient.error$.subscribe(err => replicationState.subjects.error.next(err));

socketState.connected$.subscribe(isConnected => {
websocketClient.connected$.subscribe(isConnected => {
if (isConnected) {
/**
* When the client goes offline and online again,
Expand All @@ -235,6 +183,6 @@ export async function replicateWithWebsocketServer<RxDocType, CheckpointType>(
}
});

options.collection.onDestroy.push(() => removeWebSocketRef(options.url, options.collection.database));
options.collection.onDestroy.push(() => websocketClient.socket.close());
return replicationState;
}
36 changes: 20 additions & 16 deletions src/plugins/storage-remote-websocket/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ import type {
WebSocket
} from 'ws';
import {
getFromMapOrThrow,
randomCouchString
PROMISE_RESOLVE_VOID,
getFromMapOrThrow
} from '../../plugins/utils';
import {
getWebSocket,
createWebSocketClient,
startSocketServer
} from '../replication-websocket';
import { exposeRxStorageRemote } from '../storage-remote/remote';
Expand Down Expand Up @@ -79,30 +79,34 @@ export function startRxStorageRemoteWebsocketServer(
};
}



export function getRxStorageRemoteWebsocket(
options: RxStorageRemoteWebsocketClientOptions
): RxStorageRemoteWebsocketClient {
const identifier = [
options.url,
'rx-remote-storage-websocket',
options.disableCache ? randomCouchString() : ''
'rx-remote-storage-websocket'
].join('');
const messages$ = new Subject<MessageFromRemote>();
const websocketClientPromise = getWebSocket(options.url, identifier);
const storage = getRxStorageRemote({
identifier,
statics: options.statics,
messages$,
send(msg) {
return websocketClientPromise
.then(websocketClient => websocketClient.socket.send(JSON.stringify(msg)));
mode: options.mode,
async messageChannelCreator() {
const messages$ = new Subject<MessageFromRemote>();
const websocketClient = await createWebSocketClient(options.url);
websocketClient.message$.subscribe(msg => messages$.next(msg));
return {
messages$,
send(msg) {
return websocketClient.socket.send(JSON.stringify(msg));
},
close() {
websocketClient.socket.close();
return PROMISE_RESOLVE_VOID;
}
};

}
});
websocketClientPromise.then((websocketClient) => {
websocketClient.message$.subscribe(msg => messages$.next(msg));
});
return storage;
}

Expand Down
Loading

0 comments on commit ffa16e4

Please sign in to comment.