Skip to content

Commit

Permalink
feat(session-replay-browser): WebWorker support for compression (#932)
Browse files Browse the repository at this point in the history
* feat(session-replay-browser): webworkers first pass

* test(session-replay-browser): webworkers additional coverage

* fix(session-replay-browser): build fix due to typing issues

* feat(session-replay-browser): adding options, fixing build, and adding test coverage

* fix(session-replay-browser): pr fixes
  • Loading branch information
lewgordon-amplitude authored Dec 17, 2024
1 parent 20adea2 commit 4ebe04a
Show file tree
Hide file tree
Showing 12 changed files with 233 additions and 7 deletions.
10 changes: 9 additions & 1 deletion packages/plugin-session-replay-browser/rollup.config.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
import { iife, umd } from '../../scripts/build/rollup.config';
import { webWorkerPlugins } from '../session-replay-browser/rollup.config';

iife.input = umd.input;
iife.output.name = 'sessionReplay';

export default [umd, iife];
export default async () => {
const commonPlugins = await webWorkerPlugins();

iife.plugins = [...commonPlugins, ...iife.plugins];
umd.plugins = [...commonPlugins, ...umd.plugins];

return [iife, umd];
};
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ export class SessionReplayPlugin implements EnrichmentPlugin {
version: { type: 'plugin', version: VERSION },
performanceConfig: this.options.performanceConfig,
storeType: this.options.storeType,
experimental: this.options.experimental,
}).promise;
} catch (error) {
config.loggerProvider.error(`Session Replay: Failed to initialize due to ${(error as Error).message}`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,7 @@ export interface SessionReplayOptions {
performanceConfig?: SessionReplayPerformanceConfig;
storeType?: StoreType;
customSessionId?: (event: Event) => string | undefined;
experimental?: {
useWebWorker: boolean;
};
}
1 change: 1 addition & 0 deletions packages/session-replay-browser/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
"@amplitude/rrweb": "2.0.0-alpha.26",
"@amplitude/rrweb-packer": "2.0.0-alpha.26",
"@amplitude/rrweb-snapshot": "2.0.0-alpha.26",
"@rollup/plugin-replace": "^6.0.1",
"idb": "^8.0.0",
"tslib": "^2.4.1"
},
Expand Down
56 changes: 55 additions & 1 deletion packages/session-replay-browser/rollup.config.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,60 @@
import { iife, umd } from '../../scripts/build/rollup.config';


import resolve from '@rollup/plugin-node-resolve';
import replace from '@rollup/plugin-replace';
import { terser } from 'rollup-plugin-terser';
import typescript from '@rollup/plugin-typescript';
import { rollup } from 'rollup';
import path from 'node:path';

iife.input = umd.input;
iife.output.name = 'sessionReplay';

export default [umd, iife];
async function buildWebWorker() {
const input = path.join(path.dirname(new URL(import.meta.url).pathname), './src/worker/compression.ts');
const bundle = await rollup({
input,
plugins: [
resolve({
browser: true,
}),
typescript({
tsconfig: 'tsconfig.json',
// no need to output types
declaration: false,
declarationMap: false,
}),
terser(),
],
});

const { output } = await bundle.generate({
format: 'iife',
name: 'WebWorker',
inlineDynamicImports: true,
});
const webWorkerCode = output[0].code;

return webWorkerCode;
}

export async function webWorkerPlugins() {
return [
replace({
preventAssignment: true,
values: {
'replace.COMPRESSION_WEBWORKER_BODY': JSON.stringify(await buildWebWorker()),
},
}),
];
}

export default async () => {
const commonPlugins = await webWorkerPlugins();

iife.plugins = [...commonPlugins, ...iife.plugins];
umd.plugins = [...commonPlugins, ...umd.plugins];

return [iife, umd];
};
4 changes: 4 additions & 0 deletions packages/session-replay-browser/src/config/local-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export class SessionReplayLocalConfig extends Config implements ISessionReplayLo
version?: SessionReplayVersion;
storeType: StoreType;
performanceConfig?: SessionReplayPerformanceConfig;
experimental?: { useWebWorker: boolean };

constructor(apiKey: string, options: SessionReplayOptions) {
const defaultConfig = getDefaultConfig();
Expand Down Expand Up @@ -59,5 +60,8 @@ export class SessionReplayLocalConfig extends Config implements ISessionReplayLo
if (options.debugMode) {
this.debugMode = options.debugMode;
}
if (options.experimental) {
this.experimental = options.experimental;
}
}
}
4 changes: 4 additions & 0 deletions packages/session-replay-browser/src/config/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ export interface SessionReplayLocalConfig extends Config {
version?: SessionReplayVersion;
performanceConfig?: SessionReplayPerformanceConfig;
storeType: StoreType;

experimental?: {
useWebWorker: boolean;
};
}

export interface SessionReplayJoinedConfig extends SessionReplayLocalConfig {
Expand Down
44 changes: 39 additions & 5 deletions packages/session-replay-browser/src/events/event-compressor.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { getGlobalScope } from '@amplitude/analytics-client-common';
import { pack } from '@amplitude/rrweb-packer';
import type { eventWithTime } from '@amplitude/rrweb-types';
import { SessionReplayJoinedConfig } from 'src/config/types';
import { SessionReplayEventsManager } from 'src/typings/session-replay';
import { pack } from '@amplitude/rrweb-packer';
import { getGlobalScope } from '@amplitude/analytics-client-common';

interface TaskQueue {
event: eventWithTime;
Expand All @@ -18,18 +18,40 @@ export class EventCompressor {
deviceId: string | undefined;
canUseIdleCallback: boolean | undefined;
timeout: number;
worker?: Worker;

constructor(
eventsManager: SessionReplayEventsManager<'replay' | 'interaction', string>,
config: SessionReplayJoinedConfig,
deviceId: string | undefined,
workerScriptInternal?: string, // this is used for unit testing
) {
const globalScope = getGlobalScope();
this.canUseIdleCallback = globalScope && 'requestIdleCallback' in globalScope;
this.eventsManager = eventsManager;
this.config = config;
this.deviceId = deviceId;
this.timeout = config.performanceConfig?.timeout || DEFAULT_TIMEOUT;

// These two lines will be changed at compile time.
const replace: Record<string, string> = {};
// This next line is going to be ridiculously hard to cover in unit tests, ignoring.
/* istanbul ignore next */
const workerScript = replace.COMPRESSION_WEBWORKER_BODY ?? workerScriptInternal;
if (this.config.experimental?.useWebWorker && globalScope && globalScope.Worker && workerScript) {
config.loggerProvider.log('[Experimental] Enabling web worker for compression');

const worker = new Worker(URL.createObjectURL(new Blob([workerScript], { type: 'application/javascript' })));
worker.onerror = (e) => {
config.loggerProvider.error(e);
};
worker.onmessage = (e) => {
const { compressedEvent, sessionId } = e.data as Record<string, string>;
this.addCompressedEventToManager(compressedEvent, sessionId);
};

this.worker = worker;
}
}

// Schedule processing during idle time
Expand Down Expand Up @@ -86,9 +108,7 @@ export class EventCompressor {
return JSON.stringify(packedEvent);
};

public addCompressedEvent = (event: eventWithTime, sessionId: string | number) => {
const compressedEvent = this.compressEvent(event);

private addCompressedEventToManager = (compressedEvent: string, sessionId: string | number) => {
if (this.eventsManager && this.deviceId) {
this.eventsManager.addEvent({
event: { type: 'replay', data: compressedEvent },
Expand All @@ -97,4 +117,18 @@ export class EventCompressor {
});
}
};

public addCompressedEvent = (event: eventWithTime, sessionId: string | number) => {
if (this.worker) {
// This indirectly compresses the event.
this.worker.postMessage({ event, sessionId });
} else {
const compressedEvent = this.compressEvent(event);
this.addCompressedEventToManager(compressedEvent, sessionId);
}
};

public terminate = () => {
this.worker?.terminate();
};
}
4 changes: 4 additions & 0 deletions packages/session-replay-browser/src/session-replay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ export class SessionReplay implements AmplitudeSessionReplay {
}

this.eventsManager = new MultiEventManager<'replay' | 'interaction', string>(...managers);
// To prevent too many threads.
if (this.eventCompressor) {
this.eventCompressor.terminate();
}
this.eventCompressor = new EventCompressor(this.eventsManager, this.config, this.getDeviceId());

this.loggerProvider.log('Installing @amplitude/session-replay-browser.');
Expand Down
15 changes: 15 additions & 0 deletions packages/session-replay-browser/src/worker/compression.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { pack } from '@amplitude/rrweb-packer';

onmessage = (e) => {
const { event, sessionId } = e.data;

// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
const compressedEvent = JSON.stringify(pack(event));

postMessage({ compressedEvent, sessionId });
};

// added for testing
export const compressionOnMessage = onmessage;
66 changes: 66 additions & 0 deletions packages/session-replay-browser/test/event-compressor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { SessionReplayLocalConfig } from '../src/config/local-config';
import { EventCompressor } from '../src/events/event-compressor';
import { createEventsManager } from '../src/events/events-manager';
import { SessionReplayEventsManager } from '../src/typings/session-replay';
import { eventWithTime } from '@amplitude/rrweb';

const mockEvent = {
type: 4,
Expand Down Expand Up @@ -41,6 +42,9 @@ describe('EventCompressor', () => {
enabled: true,
timeout: 2000,
},
experimental: {
useWebWorker: true,
},
});

beforeEach(async () => {
Expand Down Expand Up @@ -159,4 +163,66 @@ describe('EventCompressor', () => {
// Ensure processQueue was called recursively
expect(processQueueMock).toHaveBeenCalledTimes(2);
});

test.each([true, false])('should use webworkers if script exists', async (error: boolean) => {
let postMessageMock = jest.fn();
let onMessageMock = jest.fn();
let onErrorMock = jest.fn();
let terminateMock = jest.fn();
class MockWorker {
postMessage = (e: any) => {
postMessageMock = jest.fn(() => {
this.onmessage({ data: { compressedEvent: '', sessionId: 1234 } });
});
onErrorMock = jest.fn(() => {
this.onerror(e);
});
if (error) {
onErrorMock(e);
} else {
postMessageMock(e);
}
};
onmessage = (e: any) => {
onMessageMock = jest.fn();
onMessageMock(e);
};
onerror = (e: any) => {
onErrorMock = jest.fn();
onErrorMock(e);
};
terminate = () => {
terminateMock = jest.fn();
terminateMock();
};
}

global.Worker = MockWorker as unknown as typeof global.Worker;

URL.createObjectURL = jest.fn();
eventsManager = await createEventsManager<'replay'>({
config,
type: 'replay',
storeType: 'memory',
});
eventCompressor = new EventCompressor(eventsManager, config, deviceId, 'console.log("hi")');

const testEvent: eventWithTime = {
data: {
height: 1,
width: 1,
href: 'http://localhost',
},
type: 4,
timestamp: 1,
};
const testSessionId = 1234;
eventCompressor.addCompressedEvent(testEvent, testSessionId);

expect(postMessageMock).toHaveBeenCalledTimes(error ? 0 : 1);
expect(onErrorMock).toHaveBeenCalledTimes(error ? 1 : 0);

eventCompressor.terminate();
expect(terminateMock).toHaveBeenCalled();
});
});
32 changes: 32 additions & 0 deletions packages/session-replay-browser/test/worker/compression.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { eventWithTime } from '@amplitude/rrweb';
import { compressionOnMessage } from '../../src/worker/compression';
import { pack } from '@amplitude/rrweb-packer';

describe('compression', () => {
test('should compress event', async () => {
global.postMessage = jest.fn();

const testEvent: eventWithTime = {
timestamp: 1,
type: 4,
data: {
height: 1,
width: 1,
href: 'http://localhost',
},
};

// hack to make typescript not complain
(compressionOnMessage as (_: unknown) => void)({
data: {
event: testEvent,
sessionId: 1234,
},
});

expect(global.postMessage).toHaveBeenCalledWith({
sessionId: 1234,
compressedEvent: JSON.stringify(pack(testEvent)),
});
});
});

0 comments on commit 4ebe04a

Please sign in to comment.