From a8776d76f0348705d202ac703413e23f297a5a6c Mon Sep 17 00:00:00 2001 From: Daniel Meyer <8926560+pubkey@users.noreply.github.com> Date: Thu, 30 Mar 2023 16:36:03 +0200 Subject: [PATCH] REFACTOR Map cache code (#4604) * REFACTOR * ADD getPrimaryKeyFromIndexableString() * FIX lint --- CHANGELOG.md | 2 + src/custom-index.ts | 10 + src/doc-cache.ts | 4 +- src/event-reduce.ts | 106 ++++++----- src/incremental-write.ts | 6 +- src/plugin-helpers.ts | 22 +-- src/plugins/backup/index.ts | 15 +- src/plugins/key-compression/index.ts | 123 ++++++------ src/plugins/leader-election/index.ts | 13 +- src/plugins/migration/index.ts | 21 +-- src/plugins/migration/migration-state.ts | 15 +- .../replication-graphql/graphql-websocket.ts | 40 ++-- .../replication-websocket/websocket-client.ts | 101 +++++----- .../replication-websocket/websocket-server.ts | 25 +-- src/plugins/replication/index.ts | 11 +- src/plugins/storage-dexie/dexie-helper.ts | 77 ++++---- src/plugins/storage-lokijs/lokijs-helper.ts | 177 +++++++++--------- src/plugins/utils/index.ts | 1 + src/plugins/utils/utils-map.ts | 17 ++ src/plugins/utils/utils-object.ts | 2 - src/plugins/utils/utils-other.ts | 16 -- src/query-cache.ts | 64 ++++--- src/rx-document-prototype-merge.ts | 13 +- test/unit/custom-index.test.ts | 51 +++-- 24 files changed, 491 insertions(+), 441 deletions(-) create mode 100644 src/plugins/utils/utils-map.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index fe114f879ce..fd94008931e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,8 @@ +- ADD method `getPrimaryKeyFromIndexableString()` +- REFACTOR utils for `Map` and `WeakMap` caching diff --git a/src/custom-index.ts b/src/custom-index.ts index 4032eee1e43..250af4edcc4 100644 --- a/src/custom-index.ts +++ b/src/custom-index.ts @@ -169,6 +169,16 @@ export function getIndexStringLength( } +export function getPrimaryKeyFromIndexableString( + indexableString: string, + primaryKeyLength: number +): string { + const paddedPrimaryKey = indexableString.slice(primaryKeyLength * -1); + const primaryKey = paddedPrimaryKey.trimEnd(); + return primaryKey; +} + + export function getNumberIndexString( parsedLengths: ParsedLengths, fieldValue: number diff --git a/src/doc-cache.ts b/src/doc-cache.ts index 356af16c090..fdf193ce61c 100644 --- a/src/doc-cache.ts +++ b/src/doc-cache.ts @@ -4,7 +4,7 @@ import type { RxDocumentData } from './types'; import { - getFromMapOrFill, + getFromMapOrCreate, getFromMapOrThrow, getHeightOfRevision } from './plugins/utils'; @@ -109,7 +109,7 @@ export class DocumentCache { public getCachedRxDocument(docData: RxDocumentData): RxDocument { const docId: string = (docData as any)[this.primaryPath]; const revisionHeight = getHeightOfRevision(docData._rev); - const cacheItem = getFromMapOrFill>( + const cacheItem = getFromMapOrCreate>( this.cacheItemByDocId, docId, () => getNewCacheItem(docData) diff --git a/src/event-reduce.ts b/src/event-reduce.ts index 579f2439faa..ef8096cb1c8 100644 --- a/src/event-reduce.ts +++ b/src/event-reduce.ts @@ -19,7 +19,8 @@ import { rxChangeEventToEventReduceChangeEvent } from './rx-change-event'; import { arrayFilterNotEmpty, clone, - ensureNotFalsy + ensureNotFalsy, + getFromMapOrCreate } from './plugins/utils'; import { getQueryMatcher, getSortComparator, normalizeMangoQuery } from './rx-query-helper'; @@ -51,63 +52,64 @@ export const RXQUERY_QUERY_PARAMS_CACHE: WeakMap> = ne export function getQueryParams( rxQuery: RxQuery ): QueryParams { - if (!RXQUERY_QUERY_PARAMS_CACHE.has(rxQuery)) { - const collection = rxQuery.collection; - const normalizedMangoQuery = normalizeMangoQuery( - collection.storageInstance.schema, - clone(rxQuery.mangoQuery) - ); - const primaryKey = collection.schema.primaryPath; + return getFromMapOrCreate( + RXQUERY_QUERY_PARAMS_CACHE, + rxQuery, + () => { + const collection = rxQuery.collection; + const normalizedMangoQuery = normalizeMangoQuery( + collection.storageInstance.schema, + clone(rxQuery.mangoQuery) + ); + const primaryKey = collection.schema.primaryPath; - /** - * Create a custom sort comparator - * that uses the hooks to ensure - * we send for example compressed documents to be sorted by compressed queries. - */ - const sortComparator = getSortComparator( - collection.schema.jsonSchema, - normalizedMangoQuery - ); + /** + * Create a custom sort comparator + * that uses the hooks to ensure + * we send for example compressed documents to be sorted by compressed queries. + */ + const sortComparator = getSortComparator( + collection.schema.jsonSchema, + normalizedMangoQuery + ); - const useSortComparator: DeterministicSortComparator = (docA: RxDocType, docB: RxDocType) => { - const sortComparatorData = { - docA, - docB, - rxQuery + const useSortComparator: DeterministicSortComparator = (docA: RxDocType, docB: RxDocType) => { + const sortComparatorData = { + docA, + docB, + rxQuery + }; + return sortComparator(sortComparatorData.docA, sortComparatorData.docB); }; - return sortComparator(sortComparatorData.docA, sortComparatorData.docB); - }; - /** - * Create a custom query matcher - * that uses the hooks to ensure - * we send for example compressed documents to match compressed queries. - */ - const queryMatcher = getQueryMatcher( - collection.schema.jsonSchema, - normalizedMangoQuery - ); - const useQueryMatcher: QueryMatcher> = (doc: RxDocumentData) => { - const queryMatcherData = { - doc, - rxQuery + /** + * Create a custom query matcher + * that uses the hooks to ensure + * we send for example compressed documents to match compressed queries. + */ + const queryMatcher = getQueryMatcher( + collection.schema.jsonSchema, + normalizedMangoQuery + ); + const useQueryMatcher: QueryMatcher> = (doc: RxDocumentData) => { + const queryMatcherData = { + doc, + rxQuery + }; + return queryMatcher(queryMatcherData.doc); }; - return queryMatcher(queryMatcherData.doc); - }; - const ret: QueryParams = { - primaryKey: rxQuery.collection.schema.primaryPath as any, - skip: normalizedMangoQuery.skip, - limit: normalizedMangoQuery.limit, - sortFields: getSortFieldsOfQuery(primaryKey, normalizedMangoQuery) as string[], - sortComparator: useSortComparator, - queryMatcher: useQueryMatcher - }; - RXQUERY_QUERY_PARAMS_CACHE.set(rxQuery, ret); - return ret; - } else { - return RXQUERY_QUERY_PARAMS_CACHE.get(rxQuery) as QueryParams; - } + const ret: QueryParams = { + primaryKey: rxQuery.collection.schema.primaryPath as any, + skip: normalizedMangoQuery.skip, + limit: normalizedMangoQuery.limit, + sortFields: getSortFieldsOfQuery(primaryKey, normalizedMangoQuery) as string[], + sortComparator: useSortComparator, + queryMatcher: useQueryMatcher + }; + return ret; + } + ); } diff --git a/src/incremental-write.ts b/src/incremental-write.ts index 2c8f37d058a..09ff6c9d641 100644 --- a/src/incremental-write.ts +++ b/src/incremental-write.ts @@ -17,7 +17,7 @@ import type { import { clone, ensureNotFalsy, - getFromMapOrFill, + getFromMapOrCreate, getFromMapOrThrow, parseRevision, stripMetaDataFromDocument @@ -62,7 +62,7 @@ export class IncrementalWriteQueue { modifier: IncrementalWriteModifier ): Promise> { const docId: string = lastKnownDocumentState[this.primaryPath] as any; - const ar = getFromMapOrFill(this.queueByDocId, docId, () => []); + const ar = getFromMapOrCreate(this.queueByDocId, docId, () => []); const ret = new Promise>((resolve, reject) => { const item: IncrementalWriteQueueItem = { lastKnownDocumentState, @@ -157,7 +157,7 @@ export class IncrementalWriteQueue { const isConflict = isBulkWriteConflictError(error); if (isConflict) { // had conflict -> retry afterwards - const ar = getFromMapOrFill(this.queueByDocId, docId, () => []); + const ar = getFromMapOrCreate(this.queueByDocId, docId, () => []); /** * Add the items back to this.queueByDocId * by maintaining the original order. diff --git a/src/plugin-helpers.ts b/src/plugin-helpers.ts index 337dbaf59ad..fbea3f52b31 100644 --- a/src/plugin-helpers.ts +++ b/src/plugin-helpers.ts @@ -21,7 +21,7 @@ import type { import { defaultHashSha256, flatClone, - getFromMapOrThrow, + getFromMapOrCreate, requestIdleCallbackIfAvailable } from './plugins/utils'; import { BehaviorSubject, firstValueFrom } from 'rxjs'; @@ -60,21 +60,21 @@ export function wrappedValidateStorageFactory( */ validatorKey: string ): WrappedStorageFunction { - if (!VALIDATOR_CACHE_BY_VALIDATOR_KEY.has(validatorKey)) { - VALIDATOR_CACHE_BY_VALIDATOR_KEY.set(validatorKey, new Map()); - } - const VALIDATOR_CACHE = getFromMapOrThrow(VALIDATOR_CACHE_BY_VALIDATOR_KEY, validatorKey); + const VALIDATOR_CACHE = getFromMapOrCreate( + VALIDATOR_CACHE_BY_VALIDATOR_KEY, + validatorKey, + () => new Map() + ); function initValidator( schema: RxJsonSchema ): ValidatorFunction { const hash = defaultHashSha256(JSON.stringify(schema)); - if (!VALIDATOR_CACHE.has(hash)) { - const validator = getValidator(schema); - VALIDATOR_CACHE.set(hash, validator); - return validator; - } - return getFromMapOrThrow(VALIDATOR_CACHE, hash); + return getFromMapOrCreate( + VALIDATOR_CACHE, + hash, + () => getValidator(schema) + ); } return (args) => { diff --git a/src/plugins/backup/index.ts b/src/plugins/backup/index.ts index 066f81374b4..96381e0c652 100644 --- a/src/plugins/backup/index.ts +++ b/src/plugins/backup/index.ts @@ -10,7 +10,6 @@ import { filter, map } from 'rxjs/operators'; -import { newRxError } from '../../rx-error'; import type { BackupOptions, RxBackupWriteEvent, @@ -20,7 +19,7 @@ import type { RxPlugin } from '../../types'; import { - getFromMapOrThrow, + getFromMapOrCreate, PROMISE_RESOLVE_FALSE, PROMISE_RESOLVE_TRUE, PROMISE_RESOLVE_VOID @@ -85,13 +84,11 @@ export async function backupSingleDocument( const BACKUP_STATES_BY_DB: WeakMap = new WeakMap(); function addToBackupStates(db: RxDatabase, state: RxBackupState) { - if (!BACKUP_STATES_BY_DB.has(db)) { - BACKUP_STATES_BY_DB.set(db, []); - } - const ar = getFromMapOrThrow(BACKUP_STATES_BY_DB, db); - if (!ar) { - throw newRxError('SNH'); - } + const ar = getFromMapOrCreate( + BACKUP_STATES_BY_DB, + db, + () => [] + ); ar.push(state); } diff --git a/src/plugins/key-compression/index.ts b/src/plugins/key-compression/index.ts index af9eff73fa1..81366428f1b 100644 --- a/src/plugins/key-compression/index.ts +++ b/src/plugins/key-compression/index.ts @@ -34,6 +34,7 @@ import type { } from '../../types'; import { flatClone, + getFromMapOrCreate, isMaybeReadonlyArray } from '../../plugins/utils'; @@ -63,69 +64,71 @@ export function getCompressionStateByRxJsonSchema( */ overwritable.deepFreezeWhenDevMode(schema); - let compressionState = COMPRESSION_STATE_BY_SCHEMA.get(schema); - if (!compressionState) { - const compressionSchema: KeyCompressionJsonSchema = flatClone(schema) as any; - delete (compressionSchema as any).primaryKey; + return getFromMapOrCreate( + COMPRESSION_STATE_BY_SCHEMA, + schema, + () => { + const compressionSchema: KeyCompressionJsonSchema = flatClone(schema) as any; + delete (compressionSchema as any).primaryKey; + + const table = createCompressionTable( + compressionSchema, + DEFAULT_COMPRESSION_FLAG, + [ + /** + * Do not compress the primary field + * for easier debugging. + */ + getPrimaryFieldOfPrimaryKey(schema.primaryKey), + '_rev', + '_attachments', + '_deleted', + '_meta' + ] + ); + + delete (compressionSchema as any).primaryKey; + const compressedSchema: RxJsonSchema = createCompressedJsonSchema( + table, + compressionSchema + ) as RxJsonSchema; + + // also compress primary key + if (typeof schema.primaryKey !== 'string') { + const composedPrimary: CompositePrimaryKey = schema.primaryKey; + const newComposedPrimary: CompositePrimaryKey = { + key: compressedPath(table, composedPrimary.key as string), + fields: composedPrimary.fields.map(field => compressedPath(table, field as string)), + separator: composedPrimary.separator + }; + compressedSchema.primaryKey = newComposedPrimary; + } else { + compressedSchema.primaryKey = compressedPath(table, schema.primaryKey); + } - const table = createCompressionTable( - compressionSchema, - DEFAULT_COMPRESSION_FLAG, - [ - /** - * Do not compress the primary field - * for easier debugging. - */ - getPrimaryFieldOfPrimaryKey(schema.primaryKey), - '_rev', - '_attachments', - '_deleted', - '_meta' - ] - ); - - delete (compressionSchema as any).primaryKey; - const compressedSchema: RxJsonSchema = createCompressedJsonSchema( - table, - compressionSchema - ) as RxJsonSchema; - - // also compress primary key - if (typeof schema.primaryKey !== 'string') { - const composedPrimary: CompositePrimaryKey = schema.primaryKey; - const newComposedPrimary: CompositePrimaryKey = { - key: compressedPath(table, composedPrimary.key as string), - fields: composedPrimary.fields.map(field => compressedPath(table, field as string)), - separator: composedPrimary.separator - }; - compressedSchema.primaryKey = newComposedPrimary; - } else { - compressedSchema.primaryKey = compressedPath(table, schema.primaryKey); - } + /** + * the key compression module does not know about indexes + * in the schema, so we have to also compress them here. + */ + if (schema.indexes) { + const newIndexes = schema.indexes.map(idx => { + if (isMaybeReadonlyArray(idx)) { + return idx.map(subIdx => compressedPath(table, subIdx)); + } else { + return compressedPath(table, idx); + } + }); + compressedSchema.indexes = newIndexes; + } - /** - * the key compression module does not know about indexes - * in the schema, so we have to also compress them here. - */ - if (schema.indexes) { - const newIndexes = schema.indexes.map(idx => { - if (isMaybeReadonlyArray(idx)) { - return idx.map(subIdx => compressedPath(table, subIdx)); - } else { - return compressedPath(table, idx); - } - }); - compressedSchema.indexes = newIndexes; + const compressionState = { + table, + schema, + compressedSchema + }; + return compressionState; } - - compressionState = { - table, - schema, - compressedSchema - }; - COMPRESSION_STATE_BY_SCHEMA.set(schema, compressionState); - } - return compressionState; + ); } export function wrappedKeyCompressionStorage( diff --git a/src/plugins/leader-election/index.ts b/src/plugins/leader-election/index.ts index adda9b124f9..347246f89db 100644 --- a/src/plugins/leader-election/index.ts +++ b/src/plugins/leader-election/index.ts @@ -16,7 +16,7 @@ import type { RxDatabase, RxPlugin } from '../../types'; -import { PROMISE_RESOLVE_TRUE } from '../utils'; +import { PROMISE_RESOLVE_TRUE, getFromMapOrCreate } from '../utils'; const LEADER_ELECTORS_OF_DB: WeakMap = new WeakMap(); const LEADER_ELECTOR_BY_BROADCAST_CHANNEL: WeakMap = new WeakMap(); @@ -27,12 +27,11 @@ const LEADER_ELECTOR_BY_BROADCAST_CHANNEL: WeakMap createLeaderElection(broadcastChannel) + ); } /** diff --git a/src/plugins/migration/index.ts b/src/plugins/migration/index.ts index f8cbfacfed5..f68c29404e1 100644 --- a/src/plugins/migration/index.ts +++ b/src/plugins/migration/index.ts @@ -12,7 +12,7 @@ import type { RxDatabase, AllMigrationStates } from '../../types'; -import { PROMISE_RESOLVE_FALSE, RXJS_SHARE_REPLAY_DEFAULTS } from '../../plugins/utils'; +import { getFromMapOrCreate, PROMISE_RESOLVE_FALSE, RXJS_SHARE_REPLAY_DEFAULTS } from '../../plugins/utils'; import { mustMigrate, DataMigrator @@ -43,17 +43,14 @@ export const RxDBMigrationPlugin: RxPlugin = { }, RxCollection: (proto: any) => { proto.getDataMigrator = function (this: RxCollection): DataMigrator { - if (!DATA_MIGRATOR_BY_COLLECTION.has(this)) { - DATA_MIGRATOR_BY_COLLECTION.set( - this, - new DataMigrator( - this.asRxCollection, - this.migrationStrategies - ) - ); - - } - return DATA_MIGRATOR_BY_COLLECTION.get(this) as any; + return getFromMapOrCreate( + DATA_MIGRATOR_BY_COLLECTION, + this, + () => new DataMigrator( + this.asRxCollection, + this.migrationStrategies + ) + ); }; proto.migrationNeeded = function (this: RxCollection) { if (this.schema.version === 0) { diff --git a/src/plugins/migration/migration-state.ts b/src/plugins/migration/migration-state.ts index 39c982eb15b..63fad8f063b 100644 --- a/src/plugins/migration/migration-state.ts +++ b/src/plugins/migration/migration-state.ts @@ -7,7 +7,7 @@ import type { RxCollection, RxDatabase } from '../../types'; -import { ensureNotFalsy } from '../../plugins/utils'; +import { getFromMapOrCreate } from '../../plugins/utils'; export type MigrationStateWithCollection = { collection: RxCollection; @@ -17,14 +17,11 @@ export type MigrationStateWithCollection = { export const DATA_MIGRATION_STATE_SUBJECT_BY_DATABASE = new WeakMap[]>>(); export function getMigrationStateByDatabase(database: RxDatabase): BehaviorSubject[]> { - if (!DATA_MIGRATION_STATE_SUBJECT_BY_DATABASE.has(database)) { - DATA_MIGRATION_STATE_SUBJECT_BY_DATABASE.set( - database, - new BehaviorSubject[]>([]) - ); - } - const subject = DATA_MIGRATION_STATE_SUBJECT_BY_DATABASE.get(database); - return ensureNotFalsy(subject); + return getFromMapOrCreate( + DATA_MIGRATION_STATE_SUBJECT_BY_DATABASE, + database, + () => new BehaviorSubject[]>([]) + ); } /** diff --git a/src/plugins/replication-graphql/graphql-websocket.ts b/src/plugins/replication-graphql/graphql-websocket.ts index 6ba90799b81..0deb71cda54 100644 --- a/src/plugins/replication-graphql/graphql-websocket.ts +++ b/src/plugins/replication-graphql/graphql-websocket.ts @@ -1,5 +1,5 @@ import { Client, createClient } from 'graphql-ws'; -import { getFromMapOrThrow } from '../../plugins/utils'; +import { getFromMapOrCreate, getFromMapOrThrow } from '../../plugins/utils'; import ws from 'isomorphic-ws'; const { WebSocket: IsomorphicWebSocket } = ws; @@ -17,23 +17,27 @@ export function getGraphQLWebSocket( url: string, headers?: { [k: string]: string; } ): Client { - let has = GRAPHQL_WEBSOCKET_BY_URL.get(url); - if (!has) { - const wsClient = createClient({ - url, - shouldRetry: () => true, - webSocketImpl: IsomorphicWebSocket, - connectionParams: headers ? { headers } : undefined, - }); - has = { - url, - socket: wsClient, - refCount: 1 - }; - GRAPHQL_WEBSOCKET_BY_URL.set(url, has); - } else { - has.refCount = has.refCount + 1; - } + + const has = getFromMapOrCreate( + GRAPHQL_WEBSOCKET_BY_URL, + url, + () => { + const wsClient = createClient({ + url, + shouldRetry: () => true, + webSocketImpl: IsomorphicWebSocket, + connectionParams: headers ? { headers } : undefined, + }); + return { + url, + socket: wsClient, + refCount: 1 + }; + }, + (value) => { + value.refCount = value.refCount + 1; + } + ); return has.socket; } diff --git a/src/plugins/replication-websocket/websocket-client.ts b/src/plugins/replication-websocket/websocket-client.ts index 94f7ef49825..db45d088822 100644 --- a/src/plugins/replication-websocket/websocket-client.ts +++ b/src/plugins/replication-websocket/websocket-client.ts @@ -8,6 +8,7 @@ import ReconnectingWebSocket from 'reconnecting-websocket'; import IsomorphicWebSocket from 'isomorphic-ws'; import { errorToPlainJson, + getFromMapOrCreate, getFromMapOrThrow, randomCouchString, toArray @@ -70,59 +71,61 @@ export async function getWebSocket( */ const cacheKey = url + '|||' + databaseToken; - let has = WEBSOCKET_BY_CACHE_KEY.get(cacheKey); - if (!has) { - ensureIsWebsocket(IsomorphicWebSocket); - const wsClient = new ReconnectingWebSocket( - url, - [], - { - WebSocket: IsomorphicWebSocket - } - ); - const connected$ = new BehaviorSubject(false); - const openPromise = new Promise(res => { - wsClient.onopen = () => { - connected$.next(true); - res(); - }; - }); - wsClient.onclose = () => { - connected$.next(false); - }; - - const message$ = new Subject(); - wsClient.onmessage = (messageObj) => { - const message = JSON.parse(messageObj.data); - message$.next(message); - }; - - const error$ = new Subject(); - wsClient.onerror = (err) => { - const emitError = newRxError('RC_STREAM', { - errors: toArray(err).map((er: any) => errorToPlainJson(er)), - direction: 'pull' + const has = getFromMapOrCreate( + WEBSOCKET_BY_CACHE_KEY, + cacheKey, + () => { + ensureIsWebsocket(IsomorphicWebSocket); + const wsClient = new ReconnectingWebSocket( + url, + [], + { + WebSocket: IsomorphicWebSocket + } + ); + + const connected$ = new BehaviorSubject(false); + const openPromise = new Promise(res => { + wsClient.onopen = () => { + connected$.next(true); + res(); + }; }); - error$.next(emitError); - }; - - - has = { - url, - socket: wsClient, - openPromise, - refCount: 1, - connected$, - message$, - error$ - }; - WEBSOCKET_BY_CACHE_KEY.set(cacheKey, has); - } else { - has.refCount = has.refCount + 1; - } + wsClient.onclose = () => { + connected$.next(false); + }; + + const message$ = new Subject(); + wsClient.onmessage = (messageObj) => { + const message = JSON.parse(messageObj.data); + message$.next(message); + }; + + const error$ = new Subject(); + wsClient.onerror = (err) => { + const emitError = newRxError('RC_STREAM', { + errors: toArray(err).map((er: any) => errorToPlainJson(er)), + direction: 'pull' + }); + error$.next(emitError); + }; + return { + url, + socket: wsClient, + openPromise, + refCount: 1, + connected$, + message$, + error$ + }; + }, + (value) => { + value.refCount = value.refCount + 1; + } + ); await has.openPromise; return has; } diff --git a/src/plugins/replication-websocket/websocket-server.ts b/src/plugins/replication-websocket/websocket-server.ts index 5def1faf072..5c0e38b0795 100644 --- a/src/plugins/replication-websocket/websocket-server.ts +++ b/src/plugins/replication-websocket/websocket-server.ts @@ -14,7 +14,7 @@ import type { } from './websocket-types'; import { rxStorageInstanceToReplicationHandler } from '../../replication-protocol'; import { - PROMISE_RESOLVE_VOID + PROMISE_RESOLVE_VOID, getFromMapOrCreate } from '../../plugins/utils'; import { Subject } from 'rxjs'; @@ -70,16 +70,19 @@ export function startWebsocketServer(options: WebsocketServerOptions): Websocket if (!database.collections[collectionName]) { throw new Error('collection ' + collectionName + ' does not exist'); } - let handler = replicationHandlerByCollection.get(collectionName); - if (!handler) { - const collection = database.collections[collectionName]; - handler = rxStorageInstanceToReplicationHandler( - collection.storageInstance, - collection.conflictHandler, - database.token - ); - replicationHandlerByCollection.set(collectionName, handler); - } + + const handler = getFromMapOrCreate( + replicationHandlerByCollection, + collectionName, + () => { + const collection = database.collections[collectionName]; + return rxStorageInstanceToReplicationHandler( + collection.storageInstance, + collection.conflictHandler, + database.token + ); + } + ); return handler; } diff --git a/src/plugins/replication/index.ts b/src/plugins/replication/index.ts index 41619c8bcea..e037a24a7f4 100644 --- a/src/plugins/replication/index.ts +++ b/src/plugins/replication/index.ts @@ -34,6 +34,7 @@ import { ensureNotFalsy, errorToPlainJson, flatClone, + getFromMapOrCreate, PROMISE_RESOLVE_FALSE, PROMISE_RESOLVE_TRUE, toArray @@ -93,11 +94,11 @@ export class RxReplicationState { public retryTime?: number, public autoStart?: boolean, ) { - let replicationStates = REPLICATION_STATE_BY_COLLECTION.get(collection); - if (!replicationStates) { - replicationStates = []; - REPLICATION_STATE_BY_COLLECTION.set(collection, replicationStates); - } + const replicationStates = getFromMapOrCreate( + REPLICATION_STATE_BY_COLLECTION, + collection, + () => [] + ); replicationStates.push(this); // stop the replication when the collection gets destroyed diff --git a/src/plugins/storage-dexie/dexie-helper.ts b/src/plugins/storage-dexie/dexie-helper.ts index 8c8b95b4fa3..7cfffcb7f1d 100644 --- a/src/plugins/storage-dexie/dexie-helper.ts +++ b/src/plugins/storage-dexie/dexie-helper.ts @@ -5,7 +5,7 @@ import type { } from '../../types'; import { Dexie } from 'dexie'; import { DexieSettings } from '../../types'; -import { flatClone, toArray } from '../utils'; +import { flatClone, getFromMapOrCreate, toArray } from '../utils'; import { newRxError } from '../../rx-error'; import { getPrimaryFieldOfPrimaryKey, @@ -31,45 +31,48 @@ export function getDexieDbWithTables( ): DexieStorageInternals { const primaryPath = getPrimaryFieldOfPrimaryKey(schema.primaryKey); const dexieDbName = 'rxdb-dexie-' + databaseName + '--' + schema.version + '--' + collectionName; - let state = DEXIE_STATE_DB_BY_NAME.get(dexieDbName); - if (!state) { - state = (async () => { - /** - * IndexedDB was not designed for dynamically adding tables on the fly, - * so we create one dexie database per RxDB storage instance. - * @link https://github.com/dexie/Dexie.js/issues/684#issuecomment-373224696 - */ - const useSettings = flatClone(settings); - useSettings.autoOpen = false; - const dexieDb = new Dexie(dexieDbName, useSettings); - const dexieStoresSettings = { - [DEXIE_DOCS_TABLE_NAME]: getDexieStoreSchema(schema), - [DEXIE_CHANGES_TABLE_NAME]: '++sequence, id', + + const state = getFromMapOrCreate( + DEXIE_STATE_DB_BY_NAME, + dexieDbName, + () => { + const value = (async () => { /** - * Instead of adding {deleted: false} to every query we run over the document store, - * we move deleted documents into a separate store where they can only be queried - * by primary key. - * This increases performance because it is way easier for the query planner to select - * a good index and we also do not have to add the _deleted field to every index. - * - * We also need the [_meta.lwt+' + primaryPath + '] index for getChangedDocumentsSince() + * IndexedDB was not designed for dynamically adding tables on the fly, + * so we create one dexie database per RxDB storage instance. + * @link https://github.com/dexie/Dexie.js/issues/684#issuecomment-373224696 */ - [DEXIE_DELETED_DOCS_TABLE_NAME]: primaryPath + ',_meta.lwt,[_meta.lwt+' + primaryPath + ']' - }; - - dexieDb.version(1).stores(dexieStoresSettings); - await dexieDb.open(); - return { - dexieDb, - dexieTable: (dexieDb as any)[DEXIE_DOCS_TABLE_NAME], - dexieDeletedTable: (dexieDb as any)[DEXIE_DELETED_DOCS_TABLE_NAME] - }; - })(); - - DEXIE_STATE_DB_BY_NAME.set(dexieDbName, state); - REF_COUNT_PER_DEXIE_DB.set(state, 0); - } + const useSettings = flatClone(settings); + useSettings.autoOpen = false; + const dexieDb = new Dexie(dexieDbName, useSettings); + const dexieStoresSettings = { + [DEXIE_DOCS_TABLE_NAME]: getDexieStoreSchema(schema), + [DEXIE_CHANGES_TABLE_NAME]: '++sequence, id', + /** + * Instead of adding {deleted: false} to every query we run over the document store, + * we move deleted documents into a separate store where they can only be queried + * by primary key. + * This increases performance because it is way easier for the query planner to select + * a good index and we also do not have to add the _deleted field to every index. + * + * We also need the [_meta.lwt+' + primaryPath + '] index for getChangedDocumentsSince() + */ + [DEXIE_DELETED_DOCS_TABLE_NAME]: primaryPath + ',_meta.lwt,[_meta.lwt+' + primaryPath + ']' + }; + dexieDb.version(1).stores(dexieStoresSettings); + await dexieDb.open(); + return { + dexieDb, + dexieTable: (dexieDb as any)[DEXIE_DOCS_TABLE_NAME], + dexieDeletedTable: (dexieDb as any)[DEXIE_DELETED_DOCS_TABLE_NAME] + }; + })(); + DEXIE_STATE_DB_BY_NAME.set(dexieDbName, state); + REF_COUNT_PER_DEXIE_DB.set(state, 0); + return value; + } + ); return state; } diff --git a/src/plugins/storage-lokijs/lokijs-helper.ts b/src/plugins/storage-lokijs/lokijs-helper.ts index a8c9c439e9d..bab76ac46c5 100644 --- a/src/plugins/storage-lokijs/lokijs-helper.ts +++ b/src/plugins/storage-lokijs/lokijs-helper.ts @@ -19,6 +19,7 @@ import { import { ensureNotFalsy, flatClone, + getFromMapOrCreate, getProperty, promiseWait, randomCouchString @@ -88,98 +89,102 @@ export function getLokiDatabase( databaseName: string, databaseSettings: LokiDatabaseSettings ): Promise { - let databaseState: Promise | undefined = LOKI_DATABASE_STATE_BY_NAME.get(databaseName); - if (!databaseState) { - /** - * We assume that as soon as an adapter is passed, - * the database has to be persistent. - */ - const hasPersistence: boolean = !!databaseSettings.adapter; - databaseState = (async () => { - let persistenceMethod = hasPersistence ? 'adapter' : 'memory'; - if (databaseSettings.persistenceMethod) { - persistenceMethod = databaseSettings.persistenceMethod; - } - const useSettings = Object.assign( - // defaults - { - autoload: hasPersistence, - persistenceMethod, - verbose: true - }, - databaseSettings, - // overwrites - { - /** - * RxDB uses its custom load and save handling - * so we disable the LokiJS save/load handlers. - */ - autoload: false, - autosave: false, - throttledSaves: false - } - ); - const database = new lokijs( - databaseName + '.db', - flatClone(useSettings) - ); - const lokiSaveQueue = new LokiSaveQueue( - database, - useSettings - ); - /** - * Wait until all data is loaded from persistence adapter. - * Wrap the loading into the saveQueue to ensure that when many - * collections are created at the same time, the load-calls do not interfere - * with each other and cause error logs. - */ - if (hasPersistence) { - const loadDatabasePromise = new Promise((res, rej) => { - try { - database.loadDatabase({ - recursiveWait: false - }, (err) => { - if (useSettings.autoloadCallback) { - useSettings.autoloadCallback(err); - } - if (err) { - rej(err); - } else { - res(); - } - }); - } catch (err) { - rej(err); - } - }); - lokiSaveQueue.saveQueue = lokiSaveQueue.saveQueue.then(() => loadDatabasePromise); - await loadDatabasePromise; - } + return getFromMapOrCreate( + LOKI_DATABASE_STATE_BY_NAME, + databaseName, + () => { /** - * Autosave database on process end + * We assume that as soon as an adapter is passed, + * the database has to be persistent. */ - const unloads: AddReturn[] = []; - if (hasPersistence) { - unloads.push( - unloadAdd(() => lokiSaveQueue.run()) + const hasPersistence: boolean = !!databaseSettings.adapter; + const databaseState = (async () => { + let persistenceMethod = hasPersistence ? 'adapter' : 'memory'; + if (databaseSettings.persistenceMethod) { + persistenceMethod = databaseSettings.persistenceMethod; + } + const useSettings = Object.assign( + // defaults + { + autoload: hasPersistence, + persistenceMethod, + verbose: true + }, + databaseSettings, + // overwrites + { + /** + * RxDB uses its custom load and save handling + * so we disable the LokiJS save/load handlers. + */ + autoload: false, + autosave: false, + throttledSaves: false + } + ); + const database = new lokijs( + databaseName + '.db', + flatClone(useSettings) + ); + const lokiSaveQueue = new LokiSaveQueue( + database, + useSettings ); - } - const state: LokiDatabaseState = { - database, - databaseSettings: useSettings, - saveQueue: lokiSaveQueue, - collections: {}, - unloads - }; - - return state; - })(); - LOKI_DATABASE_STATE_BY_NAME.set(databaseName, databaseState); - } - return databaseState; + /** + * Wait until all data is loaded from persistence adapter. + * Wrap the loading into the saveQueue to ensure that when many + * collections are created at the same time, the load-calls do not interfere + * with each other and cause error logs. + */ + if (hasPersistence) { + const loadDatabasePromise = new Promise((res, rej) => { + try { + database.loadDatabase({ + recursiveWait: false + }, (err) => { + if (useSettings.autoloadCallback) { + useSettings.autoloadCallback(err); + } + if (err) { + rej(err); + } else { + res(); + } + }); + } catch (err) { + rej(err); + } + }); + lokiSaveQueue.saveQueue = lokiSaveQueue.saveQueue.then(() => loadDatabasePromise); + await loadDatabasePromise; + } + + /** + * Autosave database on process end + */ + const unloads: AddReturn[] = []; + if (hasPersistence) { + unloads.push( + unloadAdd(() => lokiSaveQueue.run()) + ); + } + + const state: LokiDatabaseState = { + database, + databaseSettings: useSettings, + saveQueue: lokiSaveQueue, + collections: {}, + unloads + }; + + return state; + })(); + return databaseState; + } + ); } export async function closeLokiCollections( diff --git a/src/plugins/utils/index.ts b/src/plugins/utils/index.ts index 43ae867ed54..a8eb4035d31 100644 --- a/src/plugins/utils/index.ts +++ b/src/plugins/utils/index.ts @@ -9,6 +9,7 @@ export * from './utils-string'; export * from './utils-object-deep-equal'; export * from './utils-object-dot-prop'; export * from './utils-object'; +export * from './utils-map'; export * from './utils-error'; export * from './utils-time'; export * from './utils-regex'; diff --git a/src/plugins/utils/utils-map.ts b/src/plugins/utils/utils-map.ts new file mode 100644 index 00000000000..808374c5373 --- /dev/null +++ b/src/plugins/utils/utils-map.ts @@ -0,0 +1,17 @@ + + +export function getFromMapOrCreate( + map: Map | WeakMap, + index: MapIndex, + creator: () => MapValue, + ifWasThere?: (value: MapValue) => void +): MapValue { + let value = map.get(index); + if (typeof value === 'undefined') { + value = creator(); + map.set(index, value); + } else if (ifWasThere) { + ifWasThere(value); + } + return value; +} diff --git a/src/plugins/utils/utils-object.ts b/src/plugins/utils/utils-object.ts index 4d806b44378..fdbb96115a3 100644 --- a/src/plugins/utils/utils-object.ts +++ b/src/plugins/utils/utils-object.ts @@ -226,5 +226,3 @@ export function stringifyFilter(key: string, value: any) { } return value; } - - diff --git a/src/plugins/utils/utils-other.ts b/src/plugins/utils/utils-other.ts index abcbe953b00..1d5596e9385 100644 --- a/src/plugins/utils/utils-other.ts +++ b/src/plugins/utils/utils-other.ts @@ -25,22 +25,6 @@ export function getFromMapOrThrow(map: Map | WeakMap, key: K return val; } -export function getFromMapOrFill( - map: Map | WeakMap, - key: K, - fillerFunction: () => V -): V { - let value = map.get(key); - if (!value) { - value = fillerFunction(); - map.set(key, value); - } - return value; -} - - - - /** * Using shareReplay() without settings will not unsubscribe * if there are no more subscribers. diff --git a/src/query-cache.ts b/src/query-cache.ts index 1c9342dec66..c3dac0b5d5f 100644 --- a/src/query-cache.ts +++ b/src/query-cache.ts @@ -8,6 +8,7 @@ import type { RxCollection } from './types'; import { + getFromMapOrCreate, nextTick, now, requestIdlePromise @@ -23,10 +24,11 @@ export class QueryCache { */ getByQuery(rxQuery: RxQuery): RxQuery { const stringRep = rxQuery.toString(); - if (!this._map.has(stringRep)) { - this._map.set(stringRep, rxQuery); - } - return this._map.get(stringRep) as RxQuery; + return getFromMapOrCreate( + this._map, + stringRep, + () => rxQuery + ); } } @@ -67,36 +69,36 @@ export const defaultCacheReplacementPolicyMonad: ( _collection: RxCollection, queryCache: QueryCache ) => { - if (queryCache._map.size < tryToKeepMax) { - return; - } + if (queryCache._map.size < tryToKeepMax) { + return; + } - const minUnExecutedLifetime = now() - unExecutedLifetime; - const maybeUncache: RxQuery[] = []; - - const queriesInCache = Array.from(queryCache._map.values()); - for (const rxQuery of queriesInCache) { - // filter out queries with subscribers - if (countRxQuerySubscribers(rxQuery) > 0) { - continue; - } - // directly uncache queries that never executed and are older then unExecutedLifetime - if (rxQuery._lastEnsureEqual === 0 && rxQuery._creationTime < minUnExecutedLifetime) { - uncacheRxQuery(queryCache, rxQuery); - continue; - } - maybeUncache.push(rxQuery); - } + const minUnExecutedLifetime = now() - unExecutedLifetime; + const maybeUncache: RxQuery[] = []; + + const queriesInCache = Array.from(queryCache._map.values()); + for (const rxQuery of queriesInCache) { + // filter out queries with subscribers + if (countRxQuerySubscribers(rxQuery) > 0) { + continue; + } + // directly uncache queries that never executed and are older then unExecutedLifetime + if (rxQuery._lastEnsureEqual === 0 && rxQuery._creationTime < minUnExecutedLifetime) { + uncacheRxQuery(queryCache, rxQuery); + continue; + } + maybeUncache.push(rxQuery); + } - const mustUncache = maybeUncache.length - tryToKeepMax; - if (mustUncache <= 0) { - return; - } + const mustUncache = maybeUncache.length - tryToKeepMax; + if (mustUncache <= 0) { + return; + } - const sortedByLastUsage = maybeUncache.sort((a, b) => a._lastEnsureEqual - b._lastEnsureEqual); - const toRemove = sortedByLastUsage.slice(0, mustUncache); - toRemove.forEach(rxQuery => uncacheRxQuery(queryCache, rxQuery)); -}; + const sortedByLastUsage = maybeUncache.sort((a, b) => a._lastEnsureEqual - b._lastEnsureEqual); + const toRemove = sortedByLastUsage.slice(0, mustUncache); + toRemove.forEach(rxQuery => uncacheRxQuery(queryCache, rxQuery)); + }; export const defaultCacheReplacementPolicy: RxCacheReplacementPolicy = defaultCacheReplacementPolicyMonad( diff --git a/src/rx-document-prototype-merge.ts b/src/rx-document-prototype-merge.ts index 4da04a7bd02..5725d685eed 100644 --- a/src/rx-document-prototype-merge.ts +++ b/src/rx-document-prototype-merge.ts @@ -21,6 +21,7 @@ import { runPluginHooks } from './hooks'; import { overwritable } from './overwritable'; +import { getFromMapOrCreate } from './plugins/utils'; const constructorForCollection = new WeakMap(); @@ -78,13 +79,13 @@ export function getDocumentPrototype( export function getRxDocumentConstructor( rxCollection: RxCollection ) { - if (!constructorForCollection.has(rxCollection)) { - const ret = createRxDocumentConstructor( + return getFromMapOrCreate( + constructorForCollection, + rxCollection, + () => createRxDocumentConstructor( getDocumentPrototype(rxCollection as any) - ); - constructorForCollection.set(rxCollection, ret); - } - return constructorForCollection.get(rxCollection); + ) + ); } /** diff --git a/test/unit/custom-index.test.ts b/test/unit/custom-index.test.ts index 7f669c3d451..07d51b8fa65 100644 --- a/test/unit/custom-index.test.ts +++ b/test/unit/custom-index.test.ts @@ -14,7 +14,9 @@ import { getStartIndexStringFromUpperBound, fillWithDefaultSettings, now, - getIndexStringLength + getIndexStringLength, + getPrimaryKeyFromIndexableString, + ensureNotFalsy } from '../../'; import { EXAMPLE_REVISION_1 } from '../helper/revisions'; import config from './config'; @@ -255,20 +257,39 @@ config.parallel('custom-index.test.ts', () => { }); }); describe('.getIndexStringLength()', () => { - - [ - ['id', 'num'], - ['bool', 'id', 'num'] - ].forEach(index => { - const length = getIndexStringLength( - schema, - index - ); - const indexString = getIndexableStringMonad( - schema, - index - )(getIndexTestDoc({ bool: true })); - assert.strictEqual(indexString.length, length); + it('get the correct length', () => { + [ + ['num', 'id'], + ['bool', 'num', 'id'] + ].forEach(index => { + const length = getIndexStringLength( + schema, + index + ); + const indexString = getIndexableStringMonad( + schema, + index + )(getIndexTestDoc({ bool: true })); + assert.strictEqual(indexString.length, length); + }); + }); + }); + describe('.getPrimaryKeyFromIndexableString()', () => { + it('get the correct id', () => { + [ + ['num', 'id'], + ['bool', 'num', 'id'] + ].forEach(index => { + const indexString = getIndexableStringMonad( + schema, + index + )(getIndexTestDoc({ id: 'foobar' })); + const id = getPrimaryKeyFromIndexableString( + indexString, + ensureNotFalsy(schema.properties.id.maxLength) + ); + assert.strictEqual(id, 'foobar'); + }); }); }); describe('.getStartIndexStringFromLowerBound()', () => {