From 729d81623815a428160b93e930054a1a2a5403ef Mon Sep 17 00:00:00 2001 From: Benjamin Pasero Date: Sun, 17 Oct 2021 10:40:21 +0200 Subject: [PATCH] sandbox - implement affinity and lifecycle for shared process workers (#132282) --- .../sharedProcess/sharedProcessMain.ts | 28 ++- .../files/node/diskFileSystemProvider.ts | 72 +++--- .../common/sharedProcessWorkerService.ts | 36 ++- .../electron-browser/sharedProcessWorker.ts | 20 +- .../sharedProcessWorkerMain.ts | 135 +++++++---- .../sharedProcessWorkerService.ts | 216 +++++++++++++++--- .../electron-main/sharedProcess.ts | 51 ++++- .../platform/windows/electron-main/window.ts | 2 +- .../sharedProcessWorkerWorkbenchService.ts | 3 + 9 files changed, 417 insertions(+), 146 deletions(-) diff --git a/src/vs/code/electron-browser/sharedProcess/sharedProcessMain.ts b/src/vs/code/electron-browser/sharedProcess/sharedProcessMain.ts index dc6b0da4336bf..b3e2c22751f33 100644 --- a/src/vs/code/electron-browser/sharedProcess/sharedProcessMain.ts +++ b/src/vs/code/electron-browser/sharedProcess/sharedProcessMain.ts @@ -94,13 +94,15 @@ import { ITunnelService } from 'vs/platform/remote/common/tunnel'; import { TunnelService } from 'vs/platform/remote/node/tunnelService'; import { ipcSharedProcessTunnelChannelName, ISharedProcessTunnelService } from 'vs/platform/remote/common/sharedProcessTunnelService'; import { SharedProcessTunnelService } from 'vs/platform/remote/node/sharedProcessTunnelService'; -import { ipcSharedProcessWorkerChannelName, ISharedProcessWorkerService } from 'vs/platform/sharedProcess/common/sharedProcessWorkerService'; +import { ipcSharedProcessWorkerChannelName, ISharedProcessWorkerConfiguration, ISharedProcessWorkerService } from 'vs/platform/sharedProcess/common/sharedProcessWorkerService'; import { SharedProcessWorkerService } from 'vs/platform/sharedProcess/electron-browser/sharedProcessWorkerService'; class SharedProcessMain extends Disposable { private server = this._register(new MessagePortServer()); + private sharedProcessWorkerService: ISharedProcessWorkerService | undefined = undefined; + constructor(private configuration: ISharedProcessConfiguration) { super(); @@ -112,10 +114,25 @@ class SharedProcessMain extends Disposable { private registerListeners(): void { - // Dispose on exit + // Shared process lifecycle const onExit = () => this.dispose(); process.once('exit', onExit); ipcRenderer.once('vscode:electron-main->shared-process=exit', onExit); + + // Shared process worker lifecycle + // + // We dispose the listener when the shared process is + // disposed to avoid disposing workers when the entire + // application is shutting down anyways. + // + const eventName = 'vscode:electron-main->shared-process=disposeWorker'; + const onDisposeWorker = (event: unknown, configuration: ISharedProcessWorkerConfiguration) => this.onDisposeWorker(configuration); + ipcRenderer.on(eventName, onDisposeWorker); + this._register(toDisposable(() => ipcRenderer.removeListener(eventName, onDisposeWorker))); + } + + private onDisposeWorker(configuration: ISharedProcessWorkerConfiguration): void { + this.sharedProcessWorkerService?.disposeWorker(configuration); } async open(): Promise { @@ -162,9 +179,6 @@ class SharedProcessMain extends Disposable { const mainProcessService = new MessagePortMainProcessService(this.server, mainRouter); services.set(IMainProcessService, mainProcessService); - // Worker - services.set(ISharedProcessWorkerService, new SyncDescriptor(SharedProcessWorkerService)); - // Environment const environmentService = new NativeEnvironmentService(this.configuration.args, productService); services.set(INativeEnvironmentService, environmentService); @@ -183,6 +197,10 @@ class SharedProcessMain extends Disposable { const logService = this._register(new FollowerLogService(logLevelClient, multiplexLogger)); services.set(ILogService, logService); + // Worker + this.sharedProcessWorkerService = new SharedProcessWorkerService(logService); + services.set(ISharedProcessWorkerService, this.sharedProcessWorkerService); + // Files const fileService = this._register(new FileService(logService)); services.set(IFileService, fileService); diff --git a/src/vs/platform/files/node/diskFileSystemProvider.ts b/src/vs/platform/files/node/diskFileSystemProvider.ts index 5306ead6089ba..088a6926d7612 100644 --- a/src/vs/platform/files/node/diskFileSystemProvider.ts +++ b/src/vs/platform/files/node/diskFileSystemProvider.ts @@ -10,7 +10,7 @@ import { VSBuffer } from 'vs/base/common/buffer'; import { CancellationToken } from 'vs/base/common/cancellation'; import { Emitter, Event } from 'vs/base/common/event'; import { isEqual } from 'vs/base/common/extpath'; -import { combinedDisposable, Disposable, dispose, IDisposable, toDisposable } from 'vs/base/common/lifecycle'; +import { combinedDisposable, Disposable, IDisposable, toDisposable } from 'vs/base/common/lifecycle'; import { basename, dirname, normalize } from 'vs/base/common/path'; import { isLinux, isWindows } from 'vs/base/common/platform'; import { joinPath } from 'vs/base/common/resources'; @@ -536,8 +536,6 @@ export class DiskFileSystemProvider extends Disposable implements private readonly recursiveFoldersToWatch: IWatchRequest[] = []; private recursiveWatchRequestDelayer = this._register(new ThrottledDelayer(0)); - private recursiveWatcherLogLevelListener: IDisposable | undefined; - watch(resource: URI, opts: IWatchOptions): IDisposable { if (opts.recursive) { return this.watchRecursive(resource, opts); @@ -574,6 +572,35 @@ export class DiskFileSystemProvider extends Disposable implements }); } + private doRefreshRecursiveWatchers(): void { + + // Reuse existing + if (this.recursiveWatcher) { + this.recursiveWatcher.watch(this.recursiveFoldersToWatch); + } + + // Otherwise, create new if we have folders to watch + else if (this.recursiveFoldersToWatch.length > 0) { + this.recursiveWatcher = this._register(this.createRecursiveWatcher( + this.recursiveFoldersToWatch, + changes => this._onDidChangeFile.fire(toFileChanges(changes)), + msg => { + if (msg.type === 'error') { + this._onDidWatchErrorOccur.fire(msg.message); + } + + this.logService[msg.type](msg.message); + }, + this.logService.getLevel() === LogLevel.Trace + )); + + // Apply log levels dynamicaly + this._register(this.logService.onDidChangeLogLevel(() => { + this.recursiveWatcher?.setVerboseLogging(this.logService.getLevel() === LogLevel.Trace); + })); + } + } + protected createRecursiveWatcher( folders: IWatchRequest[], onChange: (changes: IDiskFileChange[]) => void, @@ -632,35 +659,6 @@ export class DiskFileSystemProvider extends Disposable implements ); } - private doRefreshRecursiveWatchers(): void { - - // Reuse existing - if (this.recursiveWatcher) { - this.recursiveWatcher.watch(this.recursiveFoldersToWatch); - } - - // Otherwise, create new if we have folders to watch - else if (this.recursiveFoldersToWatch.length > 0) { - this.recursiveWatcher = this.createRecursiveWatcher( - this.recursiveFoldersToWatch, - changes => this._onDidChangeFile.fire(toFileChanges(changes)), - msg => { - if (msg.type === 'error') { - this._onDidWatchErrorOccur.fire(msg.message); - } - - this.logService[msg.type](msg.message); - }, - this.logService.getLevel() === LogLevel.Trace - ); - - // Apply log levels dynamicaly - this.recursiveWatcherLogLevelListener = this.logService.onDidChangeLogLevel(() => { - this.recursiveWatcher?.setVerboseLogging(this.logService.getLevel() === LogLevel.Trace); - }); - } - } - private watchNonRecursive(resource: URI): IDisposable { const watcherService = new NodeJSWatcherService( this.toFilePath(resource), @@ -741,14 +739,4 @@ export class DiskFileSystemProvider extends Disposable implements } //#endregion - - override dispose(): void { - super.dispose(); - - dispose(this.recursiveWatcher); - this.recursiveWatcher = undefined; - - dispose(this.recursiveWatcherLogLevelListener); - this.recursiveWatcherLogLevelListener = undefined; - } } diff --git a/src/vs/platform/sharedProcess/common/sharedProcessWorkerService.ts b/src/vs/platform/sharedProcess/common/sharedProcessWorkerService.ts index 56602f1ee6965..c0a3d69149a72 100644 --- a/src/vs/platform/sharedProcess/common/sharedProcessWorkerService.ts +++ b/src/vs/platform/sharedProcess/common/sharedProcessWorkerService.ts @@ -3,6 +3,7 @@ * Licensed under the MIT License. See License.txt in the project root for license information. *--------------------------------------------------------------------------------------------*/ +import { hash as hashObject } from 'vs/base/common/hash'; import { createDecorator } from 'vs/platform/instantiation/common/instantiation'; export interface ISharedProcessWorkerProcess { @@ -28,7 +29,7 @@ export interface ISharedProcessWorkerConfiguration { /** * Configuration specific for how to respond with the - * communication message port. + * communication message port to the receiver window. */ reply: { windowId: number; @@ -37,6 +38,19 @@ export interface ISharedProcessWorkerConfiguration { }; } +/** + * Converts the process configuration into a hash to + * identify processes of the same kind by taking those + * components that make the process and reply unique. + */ +export function hash(configuration: ISharedProcessWorkerConfiguration): number { + return hashObject({ + moduleId: configuration.process.moduleId, + windowId: configuration.reply.windowId, + channelId: configuration.reply.channel + }); +} + export const ISharedProcessWorkerService = createDecorator('sharedProcessWorkerService'); export const ipcSharedProcessWorkerChannelName = 'sharedProcessWorker'; @@ -46,10 +60,22 @@ export interface ISharedProcessWorkerService { readonly _serviceBrand: undefined; /** - * Forks the provided process from the passed in configuration inside - * the shared process and establishes a `MessagePort` communication - * channel that is being sent back to via the `reply` options of the - * configuration. + * Will fork a new process with the provided module identifier off the shared + * process and establishes a message port connection to that process. The other + * end of the message port connection will be sent back to the calling window + * as identified by the `reply` configuration. + * + * Requires the forked process to be AMD module that uses our IPC channel framework + * to respond to the provided `channelName` as a server. + * + * The process will be automatically terminated when the receiver window closes, + * crashes or loads/reloads. It can also explicitly be terminated by calling + * `disposeWorker`. */ createWorker(configuration: ISharedProcessWorkerConfiguration): Promise; + + /** + * Terminates the process for the provided configuration if any. + */ + disposeWorker(configuration: ISharedProcessWorkerConfiguration): Promise; } diff --git a/src/vs/platform/sharedProcess/electron-browser/sharedProcessWorker.ts b/src/vs/platform/sharedProcess/electron-browser/sharedProcessWorker.ts index e009f00d03fc2..62152c639674c 100644 --- a/src/vs/platform/sharedProcess/electron-browser/sharedProcessWorker.ts +++ b/src/vs/platform/sharedProcess/electron-browser/sharedProcessWorker.ts @@ -7,17 +7,19 @@ import { ISharedProcessWorkerConfiguration } from 'vs/platform/sharedProcess/com export enum SharedProcessWorkerMessages { - // Message Port Exchange - RequestPort = 'vscode:requestSharedProcessWorkerPort', - ReceivePort = 'vscode:receiveSharedProcessWorkerPort', + // Process + WorkerSpawn = 'vscode:shared-process->shared-process-worker=spawn', + WorkerTerminate = 'vscode:shared-process->shared-process-worker=terminate', // Lifecycle - WorkerReady = 'vscode:sharedProcessWorkerReady', + WorkerReady = 'vscode:shared-process-worker->shared-process=ready', + WorkerAck = 'vscode:shared-process-worker->shared-process=ack', // Diagnostics - WorkerTrace = 'vscode:sharedProcessWorkerTrace', - WorkerWarn = 'vscode:sharedProcessWorkerWarn', - WorkerError = 'vscode:sharedProcessWorkerError' + WorkerTrace = 'vscode:shared-process-worker->shared-process=trace', + WorkerInfo = 'vscode:shared-process-worker->shared-process=info', + WorkerWarn = 'vscode:shared-process-worker->shared-process=warn', + WorkerError = 'vscode:shared-process-worker->shared-process=error' } export interface ISharedProcessWorkerEnvironment { @@ -31,10 +33,12 @@ export interface ISharedProcessWorkerEnvironment { export interface ISharedProcessToWorkerMessage { id: string; configuration: ISharedProcessWorkerConfiguration; - environment: ISharedProcessWorkerEnvironment; + environment?: ISharedProcessWorkerEnvironment; + nonce?: string; } export interface IWorkerToSharedProcessMessage { id: string; message?: string; + nonce?: string; } diff --git a/src/vs/platform/sharedProcess/electron-browser/sharedProcessWorkerMain.ts b/src/vs/platform/sharedProcess/electron-browser/sharedProcessWorkerMain.ts index 8bd01a4235370..eb3ddb05d7e05 100644 --- a/src/vs/platform/sharedProcess/electron-browser/sharedProcessWorkerMain.ts +++ b/src/vs/platform/sharedProcess/electron-browser/sharedProcessWorkerMain.ts @@ -4,11 +4,12 @@ *--------------------------------------------------------------------------------------------*/ import { ChildProcess, fork } from 'child_process'; +import { log } from 'console'; import { VSBuffer } from 'vs/base/common/buffer'; -import { isRemoteConsoleLog, log } from 'vs/base/common/console'; +import { isRemoteConsoleLog } from 'vs/base/common/console'; import { toErrorMessage } from 'vs/base/common/errorMessage'; import { Event, Emitter } from 'vs/base/common/event'; -import { Disposable } from 'vs/base/common/lifecycle'; +import { hash } from 'vs/base/common/hash'; import { deepClone } from 'vs/base/common/objects'; import { removeDangerousEnvVariables } from 'vs/base/node/processes'; import { ISharedProcessWorkerConfiguration } from 'vs/platform/sharedProcess/common/sharedProcessWorkerService'; @@ -18,53 +19,90 @@ import { SharedProcessWorkerMessages, ISharedProcessToWorkerMessage, ISharedProc * The `create` function needs to be there by convention because * we are loaded via the `vs/base/worker/workerMain` utility. */ -export function create(): { onmessage: (message: ISharedProcessToWorkerMessage, transfer: Transferable[]) => void } { +export function create(): { onmessage: (message: ISharedProcessToWorkerMessage, transfer?: Transferable[]) => void } { + const sharedProcessWorkerMain = new SharedProcessWorkerMain(); - // Ask to receive the message channel port & config - postMessage({ id: SharedProcessWorkerMessages.RequestPort }); + // Signal we are ready + postMessage({ id: SharedProcessWorkerMessages.WorkerReady }); - // Return a message handler that awaits port and config return { - onmessage: (message, transfer) => { - switch (message.id) { - case SharedProcessWorkerMessages.ReceivePort: - if (transfer[0] instanceof MessagePort) { - Logger.trace('Received the message port and configuration'); - - try { - - // Spawn a new worker process with given configuration - const workerProcess = new SharedProcessWorkerProcess(transfer[0], message.configuration, message.environment); - workerProcess.spawn(); - - // Indicate we are ready - Logger.trace('Worker is ready'); - postMessage({ id: SharedProcessWorkerMessages.WorkerReady }); - } catch (error) { - Logger.error(`Unexpected error forking worker process: ${toErrorMessage(error)}`); - } - } - break; - - default: - Logger.warn(`Unexpected message '${message}'`); - } - } + onmessage: (message, transfer) => sharedProcessWorkerMain.onMessage(message, transfer) }; } -class SharedProcessWorkerProcess extends Disposable { +class SharedProcessWorkerMain { + + private readonly processes = new Map(); + + onMessage(message: ISharedProcessToWorkerMessage, transfer?: Transferable[]): void { + + // Handle message from shared process + switch (message.id) { + + // Spawn new process + case SharedProcessWorkerMessages.WorkerSpawn: + if (transfer && transfer[0] instanceof MessagePort && message.environment) { + this.spawn(transfer[0], message.configuration, message.environment); + } + break; + + // Terminate exisisting process + case SharedProcessWorkerMessages.WorkerTerminate: + this.terminate(message.configuration); + break; + + default: + Logger.warn(`Unexpected shared process message '${message}'`); + } + + // Acknowledge message processed if we have a nonce + if (message.nonce) { + postMessage({ + id: SharedProcessWorkerMessages.WorkerAck, + nonce: message.nonce + }); + } + } + + private spawn(port: MessagePort, configuration: ISharedProcessWorkerConfiguration, environment: ISharedProcessWorkerEnvironment): void { + try { + + // Ensure to terminate any existing process for config + this.terminate(configuration); + + // Spawn a new worker process with given configuration + const process = new SharedProcessWorkerProcess(port, configuration, environment); + process.spawn(); + + // Remember in map for lifecycle + this.processes.set(hash(configuration), process); + } catch (error) { + Logger.error(`Unexpected error forking worker process: ${toErrorMessage(error)}`); + } + } + + private terminate(configuration: ISharedProcessWorkerConfiguration): void { + const configurationHash = hash(configuration); + const process = this.processes.get(configurationHash); + if (process) { + this.processes.delete(configurationHash); + + process.kill(); + } + } +} + +class SharedProcessWorkerProcess { private child: ChildProcess | undefined = undefined; - private isDisposed = false; + private isKilled = false; constructor( private readonly port: MessagePort, private readonly configuration: ISharedProcessWorkerConfiguration, private readonly environment: ISharedProcessWorkerEnvironment ) { - super(); } spawn(): void { @@ -77,30 +115,34 @@ class SharedProcessWorkerProcess extends Disposable { { env: this.getEnv() } ); + Logger.info(`Starting worker process with pid ${this.child.pid} (type: ${this.configuration.process.type}, window: ${this.configuration.reply.windowId})`); + // Re-emit errors to outside this.child.on('error', error => Logger.warn(`Error from child process: ${toErrorMessage(error)}`)); // Handle unexpected termination this.child.on('exit', (code, signal) => { - if (this.isDisposed) { + Logger.info(`Worker process with pid ${this.child?.pid} exited with code ${code}, signal: ${signal} (type: ${this.configuration.process.type}, window: ${this.configuration.reply.windowId})`); + + if (this.isKilled) { return; } if (code !== 0 && signal !== 'SIGTERM') { - Logger.error(`Crashed with exit code ${code} and signal ${signal}`); + Logger.error(`Child process crashed with exit code ${code} and signal ${signal}`); } }); const onMessageEmitter = new Emitter(); const onRawMessage = Event.fromNodeEventEmitter(this.child, 'message', msg => msg); onRawMessage(msg => { - if (this.isDisposed) { + if (this.isKilled) { return; } // Handle remote console logs specially if (isRemoteConsoleLog(msg)) { - log(msg, `SharedProcess [worker]: `); + log(msg, `SharedProcess worker`); } // Anything else goes to the outside @@ -110,7 +152,7 @@ class SharedProcessWorkerProcess extends Disposable { }); const send = (buffer: VSBuffer) => { - if (this.isDisposed) { + if (this.isKilled) { return; } @@ -123,15 +165,13 @@ class SharedProcessWorkerProcess extends Disposable { // Re-emit messages from the process via the port const onMessage = onMessageEmitter.event; - onMessage(buffer => this.port.postMessage(buffer)); + onMessage(message => this.port.postMessage(message.buffer)); // Relay message from the port into the process this.port.onmessage = (e => send(VSBuffer.wrap(e.data))); } private getEnv(): NodeJS.ProcessEnv { - - // Build environment const env: NodeJS.ProcessEnv = { ...deepClone(process.env), VSCODE_AMD_ENTRYPOINT: this.configuration.process.moduleId, @@ -140,15 +180,16 @@ class SharedProcessWorkerProcess extends Disposable { VSCODE_PARENT_PID: String(process.pid) }; + // Sanitize environment removeDangerousEnvVariables(env); return env; } - override dispose(): void { - super.dispose(); + kill(): void { + Logger.trace('Terminating worker process'); - this.isDisposed = true; + this.isKilled = true; this.child?.kill(); } @@ -167,6 +208,10 @@ namespace Logger { postMessage({ id: SharedProcessWorkerMessages.WorkerWarn, message }); } + export function info(message: string): void { + postMessage({ id: SharedProcessWorkerMessages.WorkerInfo, message }); + } + export function trace(message: string): void { postMessage({ id: SharedProcessWorkerMessages.WorkerTrace, message }); } diff --git a/src/vs/platform/sharedProcess/electron-browser/sharedProcessWorkerService.ts b/src/vs/platform/sharedProcess/electron-browser/sharedProcessWorkerService.ts index 4c797cddf4f2a..4c6c2333f4431 100644 --- a/src/vs/platform/sharedProcess/electron-browser/sharedProcessWorkerService.ts +++ b/src/vs/platform/sharedProcess/electron-browser/sharedProcessWorkerService.ts @@ -4,15 +4,21 @@ *--------------------------------------------------------------------------------------------*/ import { ipcRenderer } from 'electron'; +import { CancellationToken, CancellationTokenSource } from 'vs/base/common/cancellation'; +import { IDisposable, toDisposable } from 'vs/base/common/lifecycle'; import { FileAccess } from 'vs/base/common/network'; +import { generateUuid } from 'vs/base/common/uuid'; import { ILogService } from 'vs/platform/log/common/log'; -import { ISharedProcessWorkerConfiguration, ISharedProcessWorkerService } from 'vs/platform/sharedProcess/common/sharedProcessWorkerService'; +import { hash, ISharedProcessWorkerConfiguration, ISharedProcessWorkerService } from 'vs/platform/sharedProcess/common/sharedProcessWorkerService'; import { SharedProcessWorkerMessages, ISharedProcessToWorkerMessage, IWorkerToSharedProcessMessage } from 'vs/platform/sharedProcess/electron-browser/sharedProcessWorker'; export class SharedProcessWorkerService implements ISharedProcessWorkerService { declare readonly _serviceBrand: undefined; + private readonly workers = new Map>(); + private readonly processes = new Map(); + constructor( @ILogService private readonly logService: ILogService ) { @@ -22,74 +28,222 @@ export class SharedProcessWorkerService implements ISharedProcessWorkerService { const workerLogId = `window: ${configuration.reply.windowId}, moduleId: ${configuration.process.moduleId}`; this.logService.trace(`SharedProcess: createWorker (${workerLogId})`); + // Ensure to dispose any existing process for config + this.disposeWorker(configuration); + + const cts = new CancellationTokenSource(); + + let worker: SharedProcessWebWorker | undefined = undefined; + let windowPort: MessagePort | undefined = undefined; + let workerPort: MessagePort | undefined = undefined; + + // Store as process for later disposal + const configurationHash = hash(configuration); + this.processes.set(configurationHash, toDisposable(() => { + + // Signal to token + cts.dispose(true); + + // Terminate process + worker?.terminate(configuration, CancellationToken.None /* we want to deliver this message */); + + // Close ports + windowPort?.close(); + workerPort?.close(); + + // Remove from processes + this.processes.delete(configurationHash); + })); + + // Acquire a worker for the configuration + worker = await this.getOrCreateWebWorker(configuration); + + if (cts.token.isCancellationRequested) { + return; + } + // Create a `MessageChannel` with 2 ports: // `windowPort`: send back to the requesting window // `workerPort`: send into a new worker to use - const { port1: windowPort, port2: workerPort } = new MessageChannel(); + const { port1, port2 } = new MessageChannel(); + windowPort = port1; + workerPort = port2; + + // Spawn in worker and pass over port + await worker.spawn(configuration, workerPort, cts.token); + + if (cts.token.isCancellationRequested) { + return; + } + + // We cannot just send the `MessagePort` through our protocol back + // because the port can only be sent via `postMessage`. So we need + // to send it through the main process to back to the window. + this.logService.trace(`SharedProcess: createWorker sending message port back to window (${workerLogId})`); + ipcRenderer.postMessage('vscode:relaySharedProcessWorkerMessageChannel', configuration, [windowPort]); + } + + private getOrCreateWebWorker(configuration: ISharedProcessWorkerConfiguration): Promise { + + // keep 1 web-worker per process module id to reduce + // the overall number of web workers while still + // keeping workers for separate processes around. + let webWorkerPromise = this.workers.get(configuration.process.moduleId); + + // create a new web worker if this is the first time + // for the given process + if (!webWorkerPromise) { + this.logService.trace(`SharedProcess: creating new web worker (${configuration.process.moduleId})`); + + const sharedProcessWorker = new SharedProcessWebWorker(configuration.process.type, this.logService); + webWorkerPromise = sharedProcessWorker.init(); + + this.workers.set(configuration.process.moduleId, webWorkerPromise); + } + + return webWorkerPromise; + } + + async disposeWorker(configuration: ISharedProcessWorkerConfiguration): Promise { + const processDisposable = this.processes.get(hash(configuration)); + if (processDisposable) { + this.logService.trace(`SharedProcess: disposeWorker (window: ${configuration.reply.windowId}, moduleId: ${configuration.process.moduleId})`); + + processDisposable.dispose(); + } + } +} + +class SharedProcessWebWorker { + + private readonly workerReady: Promise = this.doInit(); + private readonly mapMessageNonceToMessageResolve = new Map void>(); + + constructor( + private readonly type: string, + private readonly logService: ILogService + ) { + } + + async init(): Promise { + await this.workerReady; + + return this; + } + + private doInit(): Promise { + let readyResolve: (result: Worker) => void; + const readyPromise = new Promise(resolve => readyResolve = resolve); - // TODO@bpasero what is the lifecycle of workers? - // Should probably dispose on port close event? const worker = new Worker('../../../base/worker/workerMain.js', { - name: `Shared Process Worker (${workerLogId})` + name: `Shared Process Worker (${this.type})` }); worker.onerror = event => { - this.logService.error(`SharedProcess: worker error (${workerLogId})`, event.message); + this.logService.error(`SharedProcess: worker error (${this.type})`, event.message); }; worker.onmessageerror = event => { - this.logService.error(`SharedProcess: worker message error (${workerLogId})`, event); + this.logService.error(`SharedProcess: worker message error (${this.type})`, event); }; worker.onmessage = event => { - const { id, message } = event.data as IWorkerToSharedProcessMessage; + const { id, message, nonce } = event.data as IWorkerToSharedProcessMessage; switch (id) { - // Hand off configuration and port to the worker once - // we are being asked from the worker. - case SharedProcessWorkerMessages.RequestPort: - const workerMessage: ISharedProcessToWorkerMessage = { - id: SharedProcessWorkerMessages.ReceivePort, - configuration, - environment: { - bootstrapPath: FileAccess.asFileUri('bootstrap-fork', require).fsPath - } - }; - worker.postMessage(workerMessage, [workerPort]); - break; - - // Hand off the window port back when the worker is ready + // Lifecycle: Ready case SharedProcessWorkerMessages.WorkerReady: - this.logService.trace(`SharedProcess: sending message port back to window (${workerLogId})`); + readyResolve(worker); + break; - // We cannot just send the `MessagePort` through our protocol back - // because the port can only be sent via `postMessage`. So we need - // to send it through the main process to back to the window. - ipcRenderer.postMessage('vscode:relaySharedProcessWorkerMessageChannel', configuration, [windowPort]); + // Lifecycle: Ack + case SharedProcessWorkerMessages.WorkerAck: + if (nonce) { + const messageAwaiter = this.mapMessageNonceToMessageResolve.get(nonce); + if (messageAwaiter) { + this.mapMessageNonceToMessageResolve.delete(nonce); + messageAwaiter(); + } + } break; // Diagostics: trace case SharedProcessWorkerMessages.WorkerTrace: - this.logService.trace(`SharedProcess (${workerLogId}) [worker]:`, message); + this.logService.trace(`SharedProcess (${this.type}) [worker]:`, message); + break; + + // Diagostics: info + case SharedProcessWorkerMessages.WorkerInfo: + if (message) { + this.logService.info(message); // take as is + } break; // Diagostics: warn case SharedProcessWorkerMessages.WorkerWarn: - this.logService.warn(`SharedProcess (${workerLogId}) [worker]:`, message); + this.logService.warn(`SharedProcess (${this.type}) [worker]:`, message); break; // Diagnostics: error case SharedProcessWorkerMessages.WorkerError: - this.logService.error(`SharedProcess (${workerLogId}) [worker]:`, message); + this.logService.error(`SharedProcess (${this.type}) [worker]:`, message); break; + // Any other message default: - this.logService.warn(`SharedProcess: unexpected worker message (${workerLogId})`, event); + this.logService.warn(`SharedProcess: unexpected worker message (${this.type})`, event); } }; // First message triggers the load of the worker worker.postMessage('vs/platform/sharedProcess/electron-browser/sharedProcessWorkerMain'); + + return readyPromise; + } + + private async send(message: ISharedProcessToWorkerMessage, token: CancellationToken, port?: MessagePort): Promise { + const worker = await this.workerReady; + + if (token.isCancellationRequested) { + return; + } + + return new Promise(resolve => { + + // Store the awaiter for resolving when message + // is received with the given nonce + const nonce = generateUuid(); + this.mapMessageNonceToMessageResolve.set(nonce, resolve); + + // Post message into worker + const workerMessage: ISharedProcessToWorkerMessage = { ...message, nonce }; + if (port) { + worker.postMessage(workerMessage, [port]); + } else { + worker.postMessage(workerMessage); + } + }); + } + + spawn(configuration: ISharedProcessWorkerConfiguration, port: MessagePort, token: CancellationToken): Promise { + const workerMessage: ISharedProcessToWorkerMessage = { + id: SharedProcessWorkerMessages.WorkerSpawn, + configuration, + environment: { + bootstrapPath: FileAccess.asFileUri('bootstrap-fork', require).fsPath + } + }; + + return this.send(workerMessage, token, port); + } + + terminate(configuration: ISharedProcessWorkerConfiguration, token: CancellationToken): Promise { + const workerMessage: ISharedProcessToWorkerMessage = { + id: SharedProcessWorkerMessages.WorkerTerminate, + configuration + }; + + return this.send(workerMessage, token); } } diff --git a/src/vs/platform/sharedProcess/electron-main/sharedProcess.ts b/src/vs/platform/sharedProcess/electron-main/sharedProcess.ts index d70479bd19c4a..a1ca4dded760c 100644 --- a/src/vs/platform/sharedProcess/electron-main/sharedProcess.ts +++ b/src/vs/platform/sharedProcess/electron-main/sharedProcess.ts @@ -6,7 +6,7 @@ import { BrowserWindow, Event as ElectronEvent, ipcMain, IpcMainEvent, MessagePortMain } from 'electron'; import { Barrier } from 'vs/base/common/async'; import { Emitter, Event } from 'vs/base/common/event'; -import { Disposable } from 'vs/base/common/lifecycle'; +import { Disposable, DisposableStore } from 'vs/base/common/lifecycle'; import { FileAccess } from 'vs/base/common/network'; import { IProcessEnvironment } from 'vs/base/common/platform'; import { assertIsDefined } from 'vs/base/common/types'; @@ -86,17 +86,42 @@ export class SharedProcess extends Disposable implements ISharedProcess { } private onWorkerConnection(e: IpcMainEvent, configuration: ISharedProcessWorkerConfiguration): void { - this.logService.trace('SharedProcess: on vscode:relaySharedProcessWorkerMessageChannel', configuration); + this.logService.trace('SharedProcess: onWorkerConnection', configuration); - const receiver = BrowserWindow.fromId(configuration.reply.windowId)?.webContents; - if (!receiver || receiver.isDestroyed()) { - return; // ensure the sender is a valid target to send to + const disposables = new DisposableStore(); + + const disposeWorker = (reason: string) => { + this.logService.trace(`SharedProcess: disposing worker (reason: '${reason}')`, configuration); + + // Only once! + disposables.dispose(); + + // Send this into the shared process who owns workers + this.send('vscode:electron-main->shared-process=disposeWorker', configuration); + }; + + // ensure the sender is a valid target to send to + const receiverWindow = BrowserWindow.fromId(configuration.reply.windowId); + if (!receiverWindow || receiverWindow.isDestroyed() || receiverWindow.webContents.isDestroyed()) { + disposeWorker('unavailable'); + + return; + } + + // attach to lifecycle of receiver to manage worker lifecycle + disposables.add(Event.fromNodeEventEmitter(receiverWindow, 'closed')(() => disposeWorker('closed'))); + for (const webContentsEvent of [ + 'destroyed', // receiver window closed + 'render-process-gone', // receiver window crashed + 'did-start-loading' // receiver window loaded: since workers are bound to window contents, treat as disposal + ]) { + disposables.add(Event.fromNodeEventEmitter(receiverWindow.webContents, webContentsEvent)(() => disposeWorker(webContentsEvent))); } // The shared process window asks us to relay a `MessagePort` // from a shared process worker to the target window. It needs // to be send via `postMessage` to transfer the port. - receiver.postMessage(configuration.reply.channel, configuration.reply.nonce, e.ports); + receiverWindow.webContents.postMessage(configuration.reply.channel, configuration.reply.nonce, e.ports); } private onWillShutdown(): void { @@ -106,9 +131,7 @@ export class SharedProcess extends Disposable implements ISharedProcess { } // Signal exit to shared process when shutting down - if (!window.isDestroyed() && !window.webContents.isDestroyed()) { - window.webContents.send('vscode:electron-main->shared-process=exit'); - } + this.send('vscode:electron-main->shared-process=exit'); // Shut the shared process down when we are quitting // @@ -133,6 +156,16 @@ export class SharedProcess extends Disposable implements ISharedProcess { }, 0); } + private send(channel: string, ...args: any[]): void { + const window = this.window; + if (!window || window.isDestroyed() || window.webContents.isDestroyed()) { + this.logService.warn(`Sending IPC message to channel '${channel}' for shared process window that is destroyed`); + return; + } + + window.webContents.send(channel, ...args); + } + private _whenReady: Promise | undefined = undefined; whenReady(): Promise { if (!this._whenReady) { diff --git a/src/vs/platform/windows/electron-main/window.ts b/src/vs/platform/windows/electron-main/window.ts index 8f11d2e72b85e..99ae6e0ab8cbc 100644 --- a/src/vs/platform/windows/electron-main/window.ts +++ b/src/vs/platform/windows/electron-main/window.ts @@ -1291,7 +1291,7 @@ export class CodeWindow extends Disposable implements ICodeWindow { send(channel: string, ...args: any[]): void { if (this._win) { if (this._win.isDestroyed() || this._win.webContents.isDestroyed()) { - this.logService.warn(`Sending IPC message to channel ${channel} for window that is destroyed`); + this.logService.warn(`Sending IPC message to channel '${channel}' for window that is destroyed`); return; } diff --git a/src/vs/workbench/services/ipc/electron-sandbox/sharedProcessWorkerWorkbenchService.ts b/src/vs/workbench/services/ipc/electron-sandbox/sharedProcessWorkerWorkbenchService.ts index b8d74b45f2417..3784a00b593b5 100644 --- a/src/vs/workbench/services/ipc/electron-sandbox/sharedProcessWorkerWorkbenchService.ts +++ b/src/vs/workbench/services/ipc/electron-sandbox/sharedProcessWorkerWorkbenchService.ts @@ -26,6 +26,9 @@ export interface ISharedProcessWorkerWorkbenchService { * Requires the forked process to be AMD module that uses our IPC channel framework * to respond to the provided `channelName` as a server. * + * The process will be automatically terminated when the workbench window closes, + * crashes or loads/reloads. + * * @param process information around the process to fork * @param channelName the name of the channel the process will respond to */