Skip to content

Commit

Permalink
added streamer test, added streamer onInitUpdate, clearer logic
Browse files Browse the repository at this point in the history
  • Loading branch information
zhukaihan committed Jul 13, 2024
1 parent 7cd8adb commit ffc30a6
Show file tree
Hide file tree
Showing 8 changed files with 530 additions and 516 deletions.
2 changes: 1 addition & 1 deletion packages/node/src/local/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export class LocalEvaluationClient {

constructor(
apiKey: string,
config: LocalEvaluationConfig,
config?: LocalEvaluationConfig,
flagConfigCache?: FlagConfigCache,
httpClient: HttpClient = new FetchHttpClient(config?.httpAgent),
streamEventSourceFactory: StreamEventSourceFactory = (url, params) =>
Expand Down
28 changes: 15 additions & 13 deletions packages/node/src/local/poller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { CohortStorage } from 'src/types/cohort';

import { LocalEvaluationDefaults } from '../types/config';
import { FlagConfigCache } from '../types/flag';
import { doWithBackoff, BackoffPolicy } from '../util/backoff';
import { BackoffPolicy, doWithBackoffFailLoudly } from '../util/backoff';

import { CohortFetcher } from './cohort/fetcher';
import { FlagConfigFetcher } from './fetcher';
Expand Down Expand Up @@ -61,18 +61,20 @@ export class FlagConfigPoller
}, this.pollingIntervalMillis);

// Fetch initial flag configs and await the result.
await doWithBackoff<void>(async () => {
try {
const flagConfigs = await this.fetcher.fetch();
await super._update(flagConfigs, true, onChange);
} catch (e) {
this.logger.error(
'[Experiment] flag config initial poll failed, stopping',
e,
);
this.stop();
}
}, BACKOFF_POLICY);
try {
const flagConfigs = await doWithBackoffFailLoudly(
async () => await this.fetcher.fetch(),
BACKOFF_POLICY,
);
await super._update(flagConfigs, true, onChange);
} catch (e) {
this.logger.error(
'[Experiment] flag config initial poll failed, stopping',
e,
);
this.stop();
throw e;
}
}
}

Expand Down
30 changes: 21 additions & 9 deletions packages/node/src/local/stream-flag-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ export class SdkStreamFlagApi implements StreamFlagApi {
// Flag for whether the stream is open and retrying or closed. This is to avoid calling connect() twice.
private isClosedAndNotTrying = true;

// Callback for updating flag configs. Can be set or changed multiple times and effect immediately.
public onInitUpdate?: StreamFlagOnUpdateCallback;
// Callback for updating flag configs. Can be set or changed multiple times and effect immediately.
public onUpdate?: StreamFlagOnUpdateCallback;
// Callback for notifying user of fatal errors. Can be set or changed multiple times and effect immediately.
Expand Down Expand Up @@ -115,12 +117,16 @@ export class SdkStreamFlagApi implements StreamFlagApi {
return reject(DEFAULT_STREAM_ERR_EVENTS.DATA_UNPARSABLE);
}
// Update the callbacks.
this.api.onUpdate = (data: string) => this.handleNewMsg(data);
this.api.onUpdate = (data: string) => this.handleNewMsg(data, false);
this.api.onError = (err: StreamErrorEvent) => this.errorAndRetry(err);
// Handoff data to application. Make sure it finishes processing initial new flag configs.
await this.handleNewMsg(data);
// Resolve promise which declares client ready.
resolve();
try {
await this.handleNewMsg(data, true);
// Resolve promise which declares client ready.
resolve();
} catch {
reject();
}
};
this.api.onUpdate = dealWithFlagUpdateInOneTry;

Expand Down Expand Up @@ -230,7 +236,7 @@ export class SdkStreamFlagApi implements StreamFlagApi {
}

// Handles new messages, parse them, and handoff to application. Retries if have parsing error.
private async handleNewMsg(data: string) {
private async handleNewMsg(data: string, isInit: boolean) {
let flagConfigs;
try {
flagConfigs = SdkStreamFlagApi.parseFlagConfigs(data);
Expand All @@ -239,11 +245,17 @@ export class SdkStreamFlagApi implements StreamFlagApi {
return;
}
// Put update outside try catch. onUpdate error doesn't mean stream error.
if (this.onUpdate) {
const updateFunc =
isInit && this.onInitUpdate ? this.onInitUpdate : this.onUpdate;
if (updateFunc) {
try {
await this.onUpdate(flagConfigs);
// eslint-disable-next-line no-empty
} catch {} // Don't care about application errors after handoff.
await updateFunc(flagConfigs);
} catch (e) {
// Only care about application errors after handoff if initing. Ensure init is success.
if (isInit) {
throw e;
}
}
}
}

Expand Down
24 changes: 8 additions & 16 deletions packages/node/src/local/streamer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,27 +67,19 @@ export class FlagConfigStreamer
this.stream.onError = (e) => {
const err = e as StreamErrorEvent;
this.logger.debug(
`[Experiment] streamer - onError, fallback to poller, err status: ${err.status}, err message: ${err.message}`,
`[Experiment] streamer - onError, fallback to poller, err status: ${err?.status}, err message: ${err?.message}, err ${err}`,
);
this.poller.start(onChange);
this.startRetryStreamInterval();
};

let isInitUpdate = true;
this.stream.onInitUpdate = async (flagConfigs) => {
this.logger.debug('[Experiment] streamer - receives updates');
await super._update(flagConfigs, true, onChange);
};
this.stream.onUpdate = async (flagConfigs) => {
this.logger.debug('[Experiment] streamer - receives updates');
if (isInitUpdate) {
isInitUpdate = false;
try {
super._update(flagConfigs, true, onChange);
} catch {
// Flag update failed on init, stop, fallback to poller.
await this.poller.start(onChange);
this.startRetryStreamInterval();
}
} else {
super._update(flagConfigs, false, onChange);
}
await super._update(flagConfigs, false, onChange);
};

try {
Expand All @@ -102,11 +94,11 @@ export class FlagConfigStreamer
libraryVersion: PACKAGE_VERSION,
});
this.poller.stop();
this.logger.debug('[Experiment] streamer - start stream success');
this.logger.debug('[Experiment] streamer - start flags stream success');
} catch (e) {
const err = e as StreamErrorEvent;
this.logger.debug(
`[Experiment] streamer - start stream failed, fallback to poller, err status: ${err.status}, err message: ${err.message}`,
`[Experiment] streamer - start stream failed, fallback to poller, err status: ${err?.status}, err message: ${err?.message}, err ${err}`,
);
await this.poller.start(onChange);
this.startRetryStreamInterval();
Expand Down
2 changes: 1 addition & 1 deletion packages/node/src/remote/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export class RemoteEvaluationClient {
* @param apiKey The environment API Key
* @param config See {@link ExperimentConfig} for config options
*/
public constructor(apiKey: string, config: RemoteEvaluationConfig) {
public constructor(apiKey: string, config?: RemoteEvaluationConfig) {
this.apiKey = apiKey;
this.config = populateRemoteConfigDefaults(config);
this.evaluationApi = new SdkEvaluationApi(
Expand Down
15 changes: 9 additions & 6 deletions packages/node/src/util/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import {
} from '..';

export const populateRemoteConfigDefaults = (
customConfig: RemoteEvaluationConfig,
customConfig?: RemoteEvaluationConfig,
): RemoteEvaluationConfig => {
const config = { ...RemoteEvaluationDefaults, ...customConfig };
const isEu = config.serverZone.toLowerCase() === EU_SERVER_URLS.name;

if (!customConfig.serverUrl) {
if (!customConfig?.serverUrl) {
config.serverUrl = isEu
? EU_SERVER_URLS.remote
: RemoteEvaluationDefaults.serverUrl;
Expand All @@ -25,22 +25,25 @@ export const populateRemoteConfigDefaults = (
};

export const populateLocalConfigDefaults = (
customConfig: LocalEvaluationConfig,
customConfig?: LocalEvaluationConfig,
): LocalEvaluationConfig => {
const config = { ...LocalEvaluationDefaults, ...customConfig };
const isEu = config.serverZone.toLowerCase() === EU_SERVER_URLS.name;

if (!customConfig.serverUrl) {
if (!customConfig?.serverUrl) {
config.serverUrl = isEu
? EU_SERVER_URLS.flags
: LocalEvaluationDefaults.serverUrl;
}
if (!customConfig.streamServerUrl) {
if (!customConfig?.streamServerUrl) {
config.streamServerUrl = isEu
? EU_SERVER_URLS.stream
: LocalEvaluationDefaults.streamServerUrl;
}
if (customConfig.cohortConfig && !customConfig.cohortConfig.cohortServerUrl) {
if (
customConfig?.cohortConfig &&
!customConfig?.cohortConfig.cohortServerUrl
) {
config.cohortConfig.cohortServerUrl = isEu
? EU_SERVER_URLS.cohort
: CohortConfigDefaults.cohortServerUrl;
Expand Down
7 changes: 6 additions & 1 deletion packages/node/test/local/flagConfigPoller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,12 @@ test('flagConfig poller initial error', async () => {
throw new Error();
});
// FLAG should be empty, as cohort failed. Poller should be stopped immediately and test exists cleanly.
await poller.start();
try {
// Should throw when init failed.
await poller.start();
fail();
// eslint-disable-next-line no-empty
} catch {}
expect(await poller.cache.getAll()).toStrictEqual({});
});

Expand Down
Loading

0 comments on commit ffc30a6

Please sign in to comment.