Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use BroadcastChannel to support sub workers #25

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
64 changes: 42 additions & 22 deletions sync-api-common/src/browser/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,42 +4,62 @@
* ------------------------------------------------------------------------------------------ */

import RAL from '../common/ral';
import { BaseServiceConnection, BaseClientConnection, Message, RequestType } from '../common/connection';
import { BaseServiceConnection, BaseClientConnection, Message, RequestType, KnownConnectionIds, BroadcastChannelName } from '../common/connection';

export class ClientConnection<Requests extends RequestType | undefined = undefined> extends BaseClientConnection<Requests> {

private readonly port: MessagePort | Worker | DedicatedWorkerGlobalScope;
private readonly channel: BroadcastChannel;

constructor(port: MessagePort | Worker | DedicatedWorkerGlobalScope) {
super();
this.port = port;
this.port.onmessage = ((event: MessageEvent<Message>) => {
this.handleMessage(event.data);
});
constructor(channelName: string = BroadcastChannelName) {
super(self.location.pathname);
this.channel = new BroadcastChannel(channelName);
this.channel.addEventListener('message', this._handleMessageEvent.bind(this));
}

dispose() {
this.channel.removeEventListener('message', this._handleMessageEvent.bind(this));
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not happy with requiring a dispose, but since Broadcast channels are global, they need to be cleaned up (don't go away on Worker shutdown)

An alternative would be to force a new channel name on each Client/Server pair, but that would be hard to reconcile because the Client is created in the worker and doesn't have access to the memory of the Server.

Additionally, the BroadcastChannel only works on the same origin. Would the same origin be used for the extension host as the workers in vscode.dev?

this.channel.close();
}

protected postMessage(sharedArrayBuffer: SharedArrayBuffer) {
this.port.postMessage(sharedArrayBuffer);
this.channel.postMessage(sharedArrayBuffer);
}

_handleMessageEvent(ev: MessageEvent) {
try {
if (ev.data.dest === this.connectionId || ev.data.dest === KnownConnectionIds.All) {
this.handleMessage(ev.data);
}
} catch (error) {
RAL().console.error(error);
}
}
}

export class ServiceConnection<RequestHandlers extends RequestType | undefined = undefined> extends BaseServiceConnection<RequestHandlers> {

private readonly port: MessagePort | Worker | DedicatedWorkerGlobalScope;
private readonly channel: BroadcastChannel;

constructor(port: MessagePort | Worker | DedicatedWorkerGlobalScope) {
super();
this.port = port;
this.port.onmessage = (async (event: MessageEvent<SharedArrayBuffer>) => {
try {
await this.handleMessage(event.data);
} catch (error) {
RAL().console.error(error);
}
});
constructor(channelName: string = BroadcastChannelName) {
super(KnownConnectionIds.Main);
this.channel = new BroadcastChannel(channelName);
this.channel.addEventListener('message', this._handleMessageEvent.bind(this));
}

dispose() {
this.channel.removeEventListener('message', this._handleMessageEvent.bind(this));
this.channel.close();
}

protected postMessage(message: Message) {
this.channel.postMessage(message);
}

protected postMessage(message: Message): void {
this.port.postMessage(message);
async _handleMessageEvent(ev: MessageEvent) {
try {
await this.handleMessage(ev.data);
} catch (error) {
RAL().console.error(error);
}
}
}
5 changes: 3 additions & 2 deletions sync-api-common/src/browser/ril.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ class TestServiceConnection<RequestHandlers extends RequestType | undefined = un
constructor(script: string, testCase?: string) {
const url = testCase !== undefined ? `${script}?toRun=${testCase}` : script;
const worker = new Worker(url);
super(worker);
super();
this.worker = worker;
}
public terminate(): Promise<number> {
this.worker.terminate();
this.dispose();
return Promise.resolve(0);
}
}
Expand Down Expand Up @@ -60,7 +61,7 @@ const _ril: RIL = Object.freeze<RIL>({
$testing: Object.freeze({
ClientConnection: Object.freeze({
create<Requests extends RequestType | undefined = undefined>() {
return new ClientConnection<Requests>(self);
return new ClientConnection<Requests>();
}
}),
ServiceConnection: Object.freeze({
Expand Down
34 changes: 29 additions & 5 deletions sync-api-common/src/common/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export type u64 = number;
export type size = u32;

export type Message = {
dest: string;
method: string;
params?: Params;
};
Expand Down Expand Up @@ -53,6 +54,8 @@ export type Params = {

export type Request = {
id: number;
src: string;
dest: string;
} & Message;

export namespace Request {
Expand Down Expand Up @@ -85,6 +88,13 @@ export type RequestType = MessageType & ({
result?: TypedArray | object | null;
});

export const BroadcastChannelName = `@vscode/sync-api/default`;

export enum KnownConnectionIds {
Main = 'main',
All = 'all'
}

class NoResult {
public static readonly kind = 0 as const;
constructor() {
Expand Down Expand Up @@ -572,6 +582,7 @@ export class RPCError extends Error {
export interface ClientConnection<Requests extends RequestType | undefined = undefined> {
readonly sendRequest: SendRequestSignatures<Requests>;
serviceReady(): Promise<void>;
dispose(): void;
}

export abstract class BaseClientConnection<Requests extends RequestType | undefined = undefined> implements ClientConnection<Requests> {
Expand All @@ -582,7 +593,7 @@ export abstract class BaseClientConnection<Requests extends RequestType | undefi
private readonly readyPromise: Promise<void>;
private readyCallbacks: PromiseCallbacks | undefined;

constructor() {
constructor(protected connectionId: string) {
this.id = 1;
this.textEncoder = RAL().TextEncoder.create();
this.textDecoder = RAL().TextDecoder.create();
Expand All @@ -592,14 +603,15 @@ export abstract class BaseClientConnection<Requests extends RequestType | undefi
}

public serviceReady(): Promise<void> {
this._sendRequest('$/checkready');
return this.readyPromise;
}

public readonly sendRequest: SendRequestSignatures<Requests> = this._sendRequest as SendRequestSignatures<Requests>;

private _sendRequest(method: string, arg1?: Params | ResultType | number, arg2?: ResultType | number, arg3?: number): { errno: 0; data: any } | { errno: RPCErrno } {
const id = this.id++;
const request: Request = { id: id, method };
const request: Request = { id: id, dest: 'main', src: this.connectionId, method };
let params: Params | undefined = undefined;
let resultType: ResultType = new NoResult();
let timeout: number | undefined = undefined;
Expand Down Expand Up @@ -658,7 +670,7 @@ export abstract class BaseClientConnection<Requests extends RequestType | undefi
raw.set(binaryData, binaryOffset);
}

// Send the shard array buffer to the other worker
// Send the shared array buffer to the other worker
const sync = new Int32Array(sharedArrayBuffer, 0, 1);
Atomics.store(sync, 0, 0);
// Send the shared array buffer to the extension host worker
Expand Down Expand Up @@ -721,6 +733,8 @@ export abstract class BaseClientConnection<Requests extends RequestType | undefi

protected abstract postMessage(sharedArrayBuffer: SharedArrayBuffer): any;

abstract dispose(): void;

protected handleMessage(message: Message): void {
if (message.method === '$/ready') {
this.readyCallbacks!.resolve(message.params);
Expand Down Expand Up @@ -763,6 +777,7 @@ type RequestHandler = {
export interface ServiceConnection<RequestHandlers extends RequestType | undefined = undefined> {
readonly onRequest: HandleRequestSignatures<RequestHandlers>;
signalReady(): void;
dispose(): void;
}

export abstract class BaseServiceConnection<RequestHandlers extends RequestType | undefined = undefined> implements ServiceConnection<RequestHandlers> {
Expand All @@ -771,8 +786,9 @@ export abstract class BaseServiceConnection<RequestHandlers extends RequestType
private readonly textEncoder: RAL.TextEncoder;
private readonly requestHandlers: Map<string, RequestHandler>;
private readonly requestResults: Map<number, TypedArray>;
private sentReady = false;

constructor() {
constructor(protected readonly connectionId: string) {
this.textDecoder = RAL().TextDecoder.create();
this.textEncoder = RAL().TextEncoder.create();
this.requestHandlers = new Map();
Expand Down Expand Up @@ -809,6 +825,11 @@ export abstract class BaseServiceConnection<RequestHandlers extends RequestType
} else {
header[HeaderIndex.errno] = RPCErrno.LazyResultFailed;
}
} else if (message.method === '$/checkready') {
// Client may not have been active when ready signal was sent. Send it again
if (this.sentReady) {
this.signalReady();
}
} else {
if (message.params?.binary === null) {
const binaryParamsLength = header[HeaderIndex.binaryParamByteLength];
Expand Down Expand Up @@ -873,9 +894,12 @@ export abstract class BaseServiceConnection<RequestHandlers extends RequestType
}

public signalReady(): void {
const notification: Notification = { method: '$/ready' };
this.sentReady = true;
const notification: Notification = { method: '$/ready', dest: KnownConnectionIds.All };
this.postMessage(notification);
}

protected abstract postMessage(message: Message): void;

abstract dispose(): void;
}
1 change: 1 addition & 0 deletions sync-api-common/src/common/test/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export type AssertionErrorData = {
operator: string;
generatedMessage: boolean;
code: string;
stack?: string;
};

export type ErrorData = {
Expand Down
4 changes: 3 additions & 1 deletion sync-api-common/src/common/test/workers/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ export async function runSingle(test: (connection: ClientConnection<TestRequests
expected: error.expected,
operator: error.operator,
generatedMessage: error.generatedMessage,
code: error.code
code: error.code,
stack: error.stack
});
} else if (error instanceof Error) {
connection.sendRequest('testing/error', {
Expand All @@ -51,5 +52,6 @@ export async function runSingle(test: (connection: ClientConnection<TestRequests
}
} finally {
connection.sendRequest('testing/done');
connection.dispose();
}
}
53 changes: 34 additions & 19 deletions sync-api-common/src/node/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,49 +3,64 @@
* Licensed under the MIT License. See License.txt in the project root for license information.
* ------------------------------------------------------------------------------------------ */

import { MessagePort, Worker } from 'worker_threads';
import { BroadcastChannel, isMainThread, threadId } from 'worker_threads';

import RAL from '../common/ral';
import { BaseServiceConnection, BaseClientConnection, Message, RequestType } from '../common/connection';
import { BaseServiceConnection, BaseClientConnection, Message, RequestType, BroadcastChannelName, KnownConnectionIds } from '../common/connection';

export class ClientConnection<Requests extends RequestType | undefined = undefined> extends BaseClientConnection<Requests> {

private readonly port: MessagePort | Worker;
private readonly channel: BroadcastChannel;

constructor(port: MessagePort | Worker) {
super();
this.port = port;
this.port.on('message', (message: Message) => {
constructor(channelName: string = BroadcastChannelName) {
super(isMainThread ? KnownConnectionIds.Main : threadId.toString());
this.channel = new BroadcastChannel(channelName);
this.channel.onmessage = (message: any) => {
try {
this.handleMessage(message);
if (message.data?.dest === this.connectionId || message.data?.dest === KnownConnectionIds.All) {
this.handleMessage(message.data);
}
} catch (error) {
RAL().console.error(error);
}
});
};
}

dispose() {
this.channel.onmessage = () => {};
this.channel.close();
}

protected postMessage(sharedArrayBuffer: SharedArrayBuffer) {
this.port.postMessage(sharedArrayBuffer);
this.channel.postMessage(sharedArrayBuffer);
}
}

export class ServiceConnection<RequestHandlers extends RequestType | undefined = undefined> extends BaseServiceConnection<RequestHandlers> {

private readonly port: MessagePort | Worker;
private readonly channel: BroadcastChannel;

constructor(port: MessagePort | Worker) {
super();
this.port = port;
this.port.on('message', async (sharedArrayBuffer: SharedArrayBuffer) => {
constructor(channelName: string = BroadcastChannelName) {
super(isMainThread ? KnownConnectionIds.Main : threadId.toString());
this.channel = new BroadcastChannel(channelName);
this.channel.onmessage = async (message: any) => {
try {
await this.handleMessage(sharedArrayBuffer);
// Skip broadcast messages that aren't SharedArrayBuffers
if (message.data?.byteLength) {
await this.handleMessage(message.data as SharedArrayBuffer);
}
} catch (error) {
RAL().console.error(error);
}
});
};
}

dispose() {
this.channel.onmessage = () => {};
this.channel.close();
}

protected postMessage(message: Message): void {
this.port.postMessage(message);
protected postMessage(message: Message) {
this.channel.postMessage(message);
}
}
6 changes: 3 additions & 3 deletions sync-api-common/src/node/ril.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for license information.
* ------------------------------------------------------------------------------------------ */
import * as path from 'path';
import { TextDecoder } from 'util';
import { parentPort, Worker } from 'worker_threads';

Expand All @@ -18,10 +17,11 @@ class TestServiceConnection<RequestHandlers extends RequestType | undefined = un
private readonly worker: Worker;
constructor(script: string, testCase?: string) {
const worker = new Worker(script, testCase !== undefined ? { argv: [testCase] } : undefined);
super(worker);
super();
this.worker = worker;
}
public terminate(): Promise<number> {
this.dispose();
return this.worker.terminate();
}
}
Expand Down Expand Up @@ -63,7 +63,7 @@ const _ril: RIL = Object.freeze<RIL>({
if (!parentPort) {
throw new Error(`No parent port defined. Shouldn't happen in test setup`);
}
return new ClientConnection<Requests>(parentPort);
return new ClientConnection<Requests>();
}
}),
ServiceConnection: Object.freeze({
Expand Down
Loading