Skip to content

Commit

Permalink
Revert "Revert lazy init in storage"
Browse files Browse the repository at this point in the history
This reverts commit 57ba406.
  • Loading branch information
EmilianoSanchez committed Oct 18, 2024
1 parent d59aea5 commit c270290
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 34 deletions.
1 change: 1 addition & 0 deletions src/sdkFactory/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ export function sdkFactory(params: ISdkFactoryParams): SplitIO.ICsSDK | SplitIO.
// We will just log and allow for the SDK to end up throwing an SDK_TIMEOUT event for devs to handle.
validateAndTrackApiKey(log, settings.core.authorizationKey);
readiness.init();
storage.init && storage.init();
uniqueKeysTracker && uniqueKeysTracker.start();
syncManager && syncManager.start();
signalListener && signalListener.start();
Expand Down
2 changes: 1 addition & 1 deletion src/storages/inRedis/RedisAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const DEFAULT_OPTIONS = {
const DEFAULT_LIBRARY_OPTIONS = {
enableOfflineQueue: false,
connectTimeout: DEFAULT_OPTIONS.connectionTimeout,
lazyConnect: false
lazyConnect: false // @TODO true to avoid side-effects on instantiation
};

interface IRedisCommand {
Expand Down
4 changes: 4 additions & 0 deletions src/storages/pluggable/__tests__/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ describe('PLUGGABLE STORAGE', () => {
test('creates a storage instance', async () => {
const storageFactory = PluggableStorage({ prefix, wrapper: wrapperMock });
const storage = storageFactory(internalSdkParams);
storage.init();

assertStorageInterface(storage); // the instance must implement the storage interface
expect(wrapperMock.connect).toBeCalledTimes(1); // wrapper connect method should be called once when storage is created
Expand Down Expand Up @@ -74,6 +75,7 @@ describe('PLUGGABLE STORAGE', () => {
test('creates a storage instance for partial consumer mode (events and impressions cache in memory)', async () => {
const storageFactory = PluggableStorage({ prefix, wrapper: wrapperMock });
const storage = storageFactory({ ...internalSdkParams, settings: { ...internalSdkParams.settings, mode: CONSUMER_PARTIAL_MODE } });
storage.init();

assertStorageInterface(storage);
expect(wrapperMock.connect).toBeCalledTimes(1);
Expand Down Expand Up @@ -102,6 +104,7 @@ describe('PLUGGABLE STORAGE', () => {
// Create storage instance. Wrapper is pollute but doesn't have filter query key, so it should clear the cache
await new Promise(resolve => {
storage = storageFactory({ onReadyCb: resolve, settings: { ...fullSettings, mode: undefined } });
storage.init();
});

// Assert that expected caches are present
Expand All @@ -121,6 +124,7 @@ describe('PLUGGABLE STORAGE', () => {
// Create storage instance. This time the wrapper has the current filter query key, so it should not clear the cache
await new Promise(resolve => {
storage = storageFactory({ onReadyCb: resolve, settings: { ...fullSettings, mode: undefined } });
storage.init();
});

// Assert that cache was not cleared
Expand Down
71 changes: 38 additions & 33 deletions src/storages/pluggable/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { IPluggableStorageWrapper, IStorageAsync, IStorageAsyncFactory, IStorageFactoryParams, ITelemetryCacheAsync } from '../types';
import { IPluggableStorageWrapper, IStorageAsyncFactory, IStorageFactoryParams, ITelemetryCacheAsync } from '../types';

import { KeyBuilderSS } from '../KeyBuilderSS';
import { SplitsCachePluggable } from './SplitsCachePluggable';
Expand Down Expand Up @@ -62,11 +62,12 @@ export function PluggableStorage(options: PluggableStorageOptions): IStorageAsyn

const prefix = validatePrefix(options.prefix);

function PluggableStorageFactory(params: IStorageFactoryParams): IStorageAsync {
function PluggableStorageFactory(params: IStorageFactoryParams) {
const { onReadyCb, settings, settings: { log, mode, sync: { impressionsMode }, scheduler: { impressionsQueueSize, eventsQueueSize } } } = params;
const metadata = metadataBuilder(settings);
const keys = new KeyBuilderSS(prefix, metadata);
const wrapper = wrapperAdapter(log, options.wrapper);
let connectPromise: Promise<void>;

const isSyncronizer = mode === undefined; // If mode is not defined, the synchronizer is running
const isPartialConsumer = mode === CONSUMER_PARTIAL_MODE;
Expand All @@ -89,35 +90,6 @@ export function PluggableStorage(options: PluggableStorageOptions): IStorageAsyn
new UniqueKeysCachePluggable(log, keys.buildUniqueKeysKey(), wrapper) :
undefined;

// Connects to wrapper and emits SDK_READY event on main client
const connectPromise = wrapper.connect().then(() => {
if (isSyncronizer) {
// In standalone or producer mode, clear storage if SDK key or feature flag filter has changed
return wrapper.get(keys.buildHashKey()).then((hash) => {
const currentHash = getStorageHash(settings);
if (hash !== currentHash) {
log.info(LOG_PREFIX + 'Storage HASH has changed (SDK key, flags filter criteria or flags spec version was modified). Clearing cache');
return wrapper.getKeysByPrefix(`${keys.prefix}.`).then(storageKeys => {
return Promise.all(storageKeys.map(storageKey => wrapper.del(storageKey)));
}).then(() => wrapper.set(keys.buildHashKey(), currentHash));
}
}).then(() => {
onReadyCb();
});
} else {
// Start periodic flush of async storages if not running synchronizer (producer mode)
if (impressionCountsCache && (impressionCountsCache as ImpressionCountsCachePluggable).start) (impressionCountsCache as ImpressionCountsCachePluggable).start();
if (uniqueKeysCache && (uniqueKeysCache as UniqueKeysCachePluggable).start) (uniqueKeysCache as UniqueKeysCachePluggable).start();
if (telemetry && (telemetry as ITelemetryCacheAsync).recordConfig) (telemetry as ITelemetryCacheAsync).recordConfig();

onReadyCb();
}
}).catch((e) => {
e = e || new Error('Error connecting wrapper');
onReadyCb(e);
return e; // Propagate error for shared clients
});

return {
splits: new SplitsCachePluggable(log, keys, wrapper, settings.sync.__splitFiltersValidation),
segments: new SegmentsCachePluggable(log, keys, wrapper),
Expand All @@ -127,6 +99,39 @@ export function PluggableStorage(options: PluggableStorageOptions): IStorageAsyn
telemetry,
uniqueKeys: uniqueKeysCache,

init() {
if (connectPromise) return connectPromise;

// Connects to wrapper and emits SDK_READY event on main client
return connectPromise = wrapper.connect().then(() => {
if (isSyncronizer) {
// In standalone or producer mode, clear storage if SDK key or feature flag filter has changed
return wrapper.get(keys.buildHashKey()).then((hash) => {
const currentHash = getStorageHash(settings);
if (hash !== currentHash) {
log.info(LOG_PREFIX + 'Storage HASH has changed (SDK key, flags filter criteria or flags spec version was modified). Clearing cache');
return wrapper.getKeysByPrefix(`${keys.prefix}.`).then(storageKeys => {
return Promise.all(storageKeys.map(storageKey => wrapper.del(storageKey)));
}).then(() => wrapper.set(keys.buildHashKey(), currentHash));
}
}).then(() => {
onReadyCb();
});
} else {
// Start periodic flush of async storages if not running synchronizer (producer mode)
if (impressionCountsCache && (impressionCountsCache as ImpressionCountsCachePluggable).start) (impressionCountsCache as ImpressionCountsCachePluggable).start();
if (uniqueKeysCache && (uniqueKeysCache as UniqueKeysCachePluggable).start) (uniqueKeysCache as UniqueKeysCachePluggable).start();
if (telemetry && (telemetry as ITelemetryCacheAsync).recordConfig) (telemetry as ITelemetryCacheAsync).recordConfig();

onReadyCb();
}
}).catch((e) => {
e = e || new Error('Error connecting wrapper');
onReadyCb(e);
return e; // Propagate error for shared clients
});
},

// Stop periodic flush and disconnect the underlying storage
destroy() {
return Promise.all(isSyncronizer ? [] : [
Expand All @@ -136,8 +141,8 @@ export function PluggableStorage(options: PluggableStorageOptions): IStorageAsyn
},

// emits SDK_READY event on shared clients and returns a reference to the storage
shared(_, onReadyCb) {
connectPromise.then(onReadyCb);
shared(_: string, onReadyCb: (error?: any) => void) {
this.init().then(onReadyCb);

return {
...this,
Expand Down
1 change: 1 addition & 0 deletions src/storages/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ export interface IStorageBase<
events: TEventsCache,
telemetry?: TTelemetryCache,
uniqueKeys?: TUniqueKeysCache,
init?: () => void | Promise<void>,
destroy(): void | Promise<void>,
shared?: (matchingKey: string, onReadyCb: (error?: any) => void) => this
}
Expand Down

0 comments on commit c270290

Please sign in to comment.