Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add stream support #42

Merged
merged 14 commits into from
Mar 15, 2024
3 changes: 2 additions & 1 deletion packages/node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"dependencies": {
"@amplitude/analytics-node": "^1.3.4",
"@amplitude/analytics-types": "^1.3.1",
"@amplitude/experiment-core": "^0.7.2"
"@amplitude/experiment-core": "^0.7.2",
zhukaihan marked this conversation as resolved.
Show resolved Hide resolved
"eventsource": "^2.0.2"
}
}
43 changes: 33 additions & 10 deletions packages/node/src/local/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ import * as amplitude from '@amplitude/analytics-node';
import {
EvaluationEngine,
EvaluationFlag,
StreamEventSourceFactory,
topologicalSort,
} from '@amplitude/experiment-core';
import EventSource from 'eventsource';

import { Assignment, AssignmentService } from '../assignment/assignment';
import { InMemoryAssignmentFilter } from '../assignment/assignment-filter';
Expand All @@ -30,6 +32,8 @@ import {
import { InMemoryFlagConfigCache } from './cache';
import { FlagConfigFetcher } from './fetcher';
import { FlagConfigPoller } from './poller';
import { FlagConfigStreamer } from './streamer';
import { FlagConfigUpdater } from './updater';

/**
* Experiment client for evaluating variants for a user locally.
Expand All @@ -38,7 +42,7 @@ import { FlagConfigPoller } from './poller';
export class LocalEvaluationClient {
private readonly logger: Logger;
private readonly config: LocalEvaluationConfig;
private readonly poller: FlagConfigPoller;
private readonly updater: FlagConfigUpdater;
private readonly assignmentService: AssignmentService;
private readonly evaluation: EvaluationEngine;

Expand All @@ -54,6 +58,8 @@ export class LocalEvaluationClient {
config: LocalEvaluationConfig,
flagConfigCache?: FlagConfigCache,
httpClient: HttpClient = new FetchHttpClient(config?.httpAgent),
streamEventSourceFactory: StreamEventSourceFactory = (url, params) =>
new EventSource(url, params),
) {
this.config = { ...LocalEvaluationDefaults, ...config };
const fetcher = new FlagConfigFetcher(
Expand All @@ -67,12 +73,27 @@ export class LocalEvaluationClient {
this.config.bootstrap,
);
this.logger = new ConsoleLogger(this.config.debug);
this.poller = new FlagConfigPoller(
fetcher,
this.cache,
this.config.flagConfigPollingIntervalMillis,
this.config.debug,
);
this.updater = this.config.getFlagConfigUpdateWithStream
? new FlagConfigStreamer(
apiKey,
fetcher,
this.cache,
streamEventSourceFactory,
this.config.flagConfigPollingIntervalMillis,
this.config.streamConnTimeoutMillis,
this.config.streamFlagConnTimeoutMillis,
this.config.streamFlagTryAttempts,
this.config.streamFlagTryDelayMillis,
this.config.retryStreamFlagDelayMillis,
this.config.streamServerUrl,
this.config.debug,
)
: new FlagConfigPoller(
fetcher,
this.cache,
this.config.flagConfigPollingIntervalMillis,
this.config.debug,
);
if (this.config.assignmentConfig) {
this.config.assignmentConfig = {
...AssignmentConfigDefaults,
Expand Down Expand Up @@ -157,8 +178,10 @@ export class LocalEvaluationClient {
*
* Calling this function while the poller is already running does nothing.
*/
public async start(): Promise<void> {
return await this.poller.start();
public async start(
cb: (cache: FlagConfigCache) => Promise<void>,
): Promise<void> {
return await this.updater.start(cb);
}

/**
Expand All @@ -167,6 +190,6 @@ export class LocalEvaluationClient {
* Calling this function while the poller is not running will do nothing.
*/
public stop(): void {
return this.poller.stop();
return this.updater.stop();
}
}
3 changes: 2 additions & 1 deletion packages/node/src/local/poller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { ConsoleLogger } from '../util/logger';
import { Logger } from '../util/logger';

import { FlagConfigFetcher } from './fetcher';
import { FlagConfigUpdater } from './updater';

const BACKOFF_POLICY: BackoffPolicy = {
attempts: 5,
Expand All @@ -13,7 +14,7 @@ const BACKOFF_POLICY: BackoffPolicy = {
scalar: 1,
};

export class FlagConfigPoller {
export class FlagConfigPoller implements FlagConfigUpdater {
private readonly logger: Logger;
private readonly pollingIntervalMillis: number;

Expand Down
173 changes: 173 additions & 0 deletions packages/node/src/local/streamer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
import {
SdkStreamFlagApi,
StreamErrorEvent,
StreamEventSourceFactory,
} from '@amplitude/experiment-core';

import { version as PACKAGE_VERSION } from '../../gen/version';
import { LocalEvaluationDefaults } from '../types/config';
import { FlagConfigCache } from '../types/flag';
import { ConsoleLogger } from '../util/logger';
import { Logger } from '../util/logger';

import { FlagConfigFetcher } from './fetcher';
import { FlagConfigPoller } from './poller';

export class FlagConfigStreamer {
zhukaihan marked this conversation as resolved.
Show resolved Hide resolved
private readonly logger: Logger;

private readonly poller: FlagConfigPoller;
private readonly stream: SdkStreamFlagApi;
private readonly retryStreamFlagDelayMillis: number;

private streamRetryInterval?: NodeJS.Timeout;

public readonly cache: FlagConfigCache;

constructor(
apiKey: string,
fetcher: FlagConfigFetcher,
cache: FlagConfigCache,
streamEventSourceFactory: StreamEventSourceFactory,
pollingIntervalMillis = LocalEvaluationDefaults.flagConfigPollingIntervalMillis,
streamConnTimeoutMillis = LocalEvaluationDefaults.streamConnTimeoutMillis,
streamFlagConnTimeoutMillis = LocalEvaluationDefaults.streamFlagConnTimeoutMillis,
streamFlagTryAttempts = LocalEvaluationDefaults.streamFlagTryAttempts,
streamFlagTryDelayMillis = LocalEvaluationDefaults.streamFlagTryDelayMillis,
retryStreamFlagDelayMillis = LocalEvaluationDefaults.retryStreamFlagDelayMillis,
serverUrl: string = LocalEvaluationDefaults.serverUrl,
debug = false,
) {
this.logger = new ConsoleLogger(debug);
this.logger.debug('[Experiment] streamer - init');
this.cache = cache;
this.poller = new FlagConfigPoller(
fetcher,
cache,
pollingIntervalMillis,
debug,
);
this.stream = new SdkStreamFlagApi(
apiKey,
serverUrl,
streamEventSourceFactory,
streamConnTimeoutMillis,
streamFlagConnTimeoutMillis,
streamFlagTryAttempts,
streamFlagTryDelayMillis,
);
this.retryStreamFlagDelayMillis = retryStreamFlagDelayMillis;
}

/**
* Fetch initial flag configurations and start polling for updates.
*
* You must call this function to begin polling for flag config updates.
* The promise returned by this function is resolved when the initial call
* to fetch the flag configuration completes.
*
* Calling this function while the poller is already running does nothing.
*/
public async start(
onChange?: (cache: FlagConfigCache) => Promise<void>,
): Promise<void> {
this.stream.onError = (e) => {
const err = e as StreamErrorEvent;
this.logger.debug(
`[Experiment] streamer - onError, fallback to poller, err status: ${err.status}, err message: ${err.message}`,
);
this.poller.start(onChange);
this.startRetryStreamInterval();
};

this.stream.onUpdate = async (flagConfigs) => {
this.logger.debug('[Experiment] streamer - receives updates');
let changed = false;
if (onChange) {
const current = await this.cache.getAll();
if (!Object.is(current, flagConfigs)) {
changed = true;
}
}
await this.cache.clear();
await this.cache.putAll(flagConfigs);
if (changed) {
await onChange(this.cache);
}
};

try {
// Clear retry timeout. If stream isn't connected, we're trying now.
// If stream is connected, timeout will be undefined and connect will do nothing.
this.clearRetryStreamInterval();
// stream connect error will be raised, not through calling onError.
// So onError won't be called.
// If close is called during connect, connect will return success. No sideeffects here.
await this.stream.connect({
libraryName: 'experiment-node-server',
libraryVersion: PACKAGE_VERSION,
});
this.poller.stop();
this.logger.debug('[Experiment] streamer - start stream success');
} catch (e) {
const err = e as StreamErrorEvent;
this.logger.debug(
`[Experiment] streamer - start stream failed, fallback to poller, err status: ${err.status}, err message: ${err.message}`,
);
await this.poller.start(onChange);
this.startRetryStreamInterval();
}
}

/**
* Stop polling for flag configurations.
*
* Calling this function while the poller is not running will do nothing.
*/
public stop(): void {
this.logger.debug('[Experiment] streamer - stop');
this.clearRetryStreamInterval();
this.poller.stop();
this.stream.close();
}

/**
* Force a flag config fetch and cache the update with an optional callback
* which gets called if the flag configs change in any way.
*
* @param onChange optional callback which will get called if the flag configs
* in the cache have changed.
*/
public async update(
onChange?: (cache: FlagConfigCache) => Promise<void>,
): Promise<void> {
this.poller.update(onChange);
}

// Retry stream after a while.
private startRetryStreamInterval() {
this.clearRetryStreamInterval();
this.streamRetryInterval = setInterval(() => {
this.logger.debug('[Experiment] streamer - retry stream');
this.stream
.connect()
.then(() => {
this.logger.debug('[Experiment] streamer - retry stream success');
// Clear interval.
this.clearRetryStreamInterval();
// Stop poller.
this.poller.stop();
})
// eslint-disable-next-line @typescript-eslint/no-empty-function
.catch(() => {});
}, this.retryStreamFlagDelayMillis);
}

// Clear retry interval.
private clearRetryStreamInterval() {
if (this.streamRetryInterval) {
clearInterval(this.streamRetryInterval);
this.streamRetryInterval = undefined;
}
}
}
26 changes: 26 additions & 0 deletions packages/node/src/local/updater.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { FlagConfigCache } from '..';

export interface FlagConfigUpdater {
/**
* Fetch initial flag configurations and start watching for updates.
*
* You must call this function to begin watching for flag config updates.
* The promise returned by this function is resolved when the initial call
* to fetch the flag configuration completes.
*/
start(onChange?: (cache: FlagConfigCache) => Promise<void>): Promise<void>;

/**
* Stop updating flag configurations.
*/
stop(): void;

/**
* Force a flag config fetch and cache the update with an optional callback
* which gets called if the flag configs change in any way.
*
* @param onChange optional callback which will get called if the flag configs
* in the cache have changed.
*/
update(onChange?: (cache: FlagConfigCache) => Promise<void>): Promise<void>;
}
45 changes: 45 additions & 0 deletions packages/node/src/types/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,44 @@ export type LocalEvaluationConfig = {
* evaluation.
*/
assignmentConfig?: AssignmentConfig;

/**
* To use streaming API or polling. With streaming, flag config updates are
* received immediately, no polling is necessary. If stream fails, it will
* fallback to polling automatically.
*/
getFlagConfigUpdateWithStream?: boolean;

/**
* The stream server endpoint from which to stream data.
*/
streamServerUrl?: string;

/**
* To use with streaming. The timeout for connecting an server-side event stream. Aka, the timeout for http connection.
*/
streamConnTimeoutMillis?: number;

/**
* To use with streaming. The timeout for a single attempt of establishing a valid stream of flag configs.
* This includes streamConnTimeoutMillis and time for receiving initial flag configs.
*/
streamFlagConnTimeoutMillis?: number;

/**
* To use with streaming. The number attempts to connect before declaring streaming fatal error.
*/
streamFlagTryAttempts?: number;

/**
* To use with streaming. The delay between attempts to connect.
*/
streamFlagTryDelayMillis?: number;

/**
* To use with streaming. The delay to retry streaming after stream fatal error and fallbacked to poller.
*/
retryStreamFlagDelayMillis?: number;
};

export type AssignmentConfig = {
Expand Down Expand Up @@ -181,6 +219,13 @@ export const LocalEvaluationDefaults: LocalEvaluationConfig = {
bootstrap: {},
flagConfigPollingIntervalMillis: 30000,
httpAgent: null,
getFlagConfigUpdateWithStream: false,
streamServerUrl: 'https://stream.lab.amplitude.com',
streamConnTimeoutMillis: 1000,
streamFlagConnTimeoutMillis: 1000,
streamFlagTryAttempts: 2,
streamFlagTryDelayMillis: 1000,
retryStreamFlagDelayMillis: 15000,
zhukaihan marked this conversation as resolved.
Show resolved Hide resolved
};

export const AssignmentConfigDefaults: Omit<AssignmentConfig, 'apiKey'> = {
Expand Down
Loading
Loading