From 903de2582a944aae1f48c7b58d183f7b8df6c065 Mon Sep 17 00:00:00 2001 From: Pavel Feldman Date: Wed, 10 Jun 2020 16:33:27 -0700 Subject: [PATCH] chore(websocket): extract common socket part (#2506) --- src/server/browserServer.ts | 39 +------- src/server/browserType.ts | 7 +- src/server/chromium.ts | 166 ++++++++++++++++------------------ src/server/firefox.ts | 157 +++++++++++++------------------- src/server/webSocketServer.ts | 139 ++++++++++++++++++++++++++++ src/server/webkit.ts | 129 +++++++++----------------- src/transport.ts | 17 ---- 7 files changed, 328 insertions(+), 326 deletions(-) create mode 100644 src/server/webSocketServer.ts diff --git a/src/server/browserServer.ts b/src/server/browserServer.ts index 7be8779c076c0..1d348efdc20ad 100644 --- a/src/server/browserServer.ts +++ b/src/server/browserServer.ts @@ -16,42 +16,13 @@ import { ChildProcess } from 'child_process'; import { EventEmitter } from 'events'; - -export class WebSocketWrapper { - readonly wsEndpoint: string; - private _bindings: (Map | Set)[]; - - constructor(wsEndpoint: string, bindings: (Map|Set)[]) { - this.wsEndpoint = wsEndpoint; - this._bindings = bindings; - } - - async checkLeaks() { - let counter = 0; - return new Promise((fulfill, reject) => { - const check = () => { - const filtered = this._bindings.filter(entry => entry.size); - if (!filtered.length) { - fulfill(); - return; - } - - if (++counter >= 50) { - reject(new Error('Web socket leak ' + filtered.map(entry => [...entry.keys()].join(':')).join('|'))); - return; - } - setTimeout(check, 100); - }; - check(); - }); - } -} +import { WebSocketServer } from './webSocketServer'; export class BrowserServer extends EventEmitter { private _process: ChildProcess; private _gracefullyClose: () => Promise; private _kill: () => Promise; - _webSocketWrapper: WebSocketWrapper | null = null; + _webSocketServer: WebSocketServer | null = null; constructor(process: ChildProcess, gracefullyClose: () => Promise, kill: () => Promise) { super(); @@ -65,7 +36,7 @@ export class BrowserServer extends EventEmitter { } wsEndpoint(): string { - return this._webSocketWrapper ? this._webSocketWrapper.wsEndpoint : ''; + return this._webSocketServer ? this._webSocketServer.wsEndpoint : ''; } async kill(): Promise { @@ -77,8 +48,8 @@ export class BrowserServer extends EventEmitter { } async _checkLeaks(): Promise { - if (this._webSocketWrapper) - await this._webSocketWrapper.checkLeaks(); + if (this._webSocketServer) + await this._webSocketServer.checkLeaks(); } async _closeOrKill(timeout: number): Promise { diff --git a/src/server/browserType.ts b/src/server/browserType.ts index 737a101a085d1..4849f32e01457 100644 --- a/src/server/browserType.ts +++ b/src/server/browserType.ts @@ -19,7 +19,7 @@ import * as os from 'os'; import * as path from 'path'; import * as util from 'util'; import { BrowserContext, PersistentContextOptions, verifyProxySettings, validateBrowserContextOptions } from '../browserContext'; -import { BrowserServer, WebSocketWrapper } from './browserServer'; +import { BrowserServer } from './browserServer'; import * as browserPaths from '../install/browserPaths'; import { Logger, InnerLogger } from '../logger'; import { ConnectionTransport, WebSocketTransport } from '../transport'; @@ -31,6 +31,7 @@ import { PipeTransport } from './pipeTransport'; import { Progress, runAbortableTask } from '../progress'; import { ProxySettings } from '../types'; import { TimeoutSettings } from '../timeoutSettings'; +import { WebSocketServer } from './webSocketServer'; export type FirefoxUserPrefsOptions = { firefoxUserPrefs?: { [key: string]: string | number | boolean }, @@ -150,7 +151,7 @@ export abstract class BrowserTypeBase implements BrowserType { const logger = new InnerLogger(options.logger); return runAbortableTask(async progress => { const { browserServer, transport } = await this._launchServer(progress, options, false, logger); - browserServer._webSocketWrapper = this._wrapTransportWithWebSocket(transport, logger, port); + browserServer._webSocketServer = this._startWebSocketServer(transport, logger, port); return browserServer; }, logger, TimeoutSettings.timeout(options)); } @@ -248,7 +249,7 @@ export abstract class BrowserTypeBase implements BrowserType { abstract _defaultArgs(options: LaunchOptionsBase, isPersistent: boolean, userDataDir: string): string[]; abstract _connectToTransport(transport: ConnectionTransport, options: BrowserOptions): Promise; - abstract _wrapTransportWithWebSocket(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketWrapper; + abstract _startWebSocketServer(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketServer; abstract _amendEnvironment(env: Env, userDataDir: string, executable: string, browserArguments: string[]): Env; abstract _attemptToGracefullyCloseBrowser(transport: ConnectionTransport): void; } diff --git a/src/server/chromium.ts b/src/server/chromium.ts index 4b4578cd292c0..29221dba21633 100644 --- a/src/server/chromium.ts +++ b/src/server/chromium.ts @@ -16,19 +16,19 @@ */ import * as path from 'path'; -import { helper, assert, getFromENV, logPolitely } from '../helper'; +import { assert, getFromENV, logPolitely } from '../helper'; import { CRBrowser } from '../chromium/crBrowser'; import * as ws from 'ws'; import { Env } from './processLauncher'; import { kBrowserCloseMessageId } from '../chromium/crConnection'; import { LaunchOptionsBase, BrowserTypeBase, processBrowserArgOptions } from './browserType'; -import { WebSocketWrapper } from './browserServer'; -import { ConnectionTransport, ProtocolRequest } from '../transport'; -import { InnerLogger, logError } from '../logger'; +import { ConnectionTransport, ProtocolRequest, ProtocolResponse } from '../transport'; +import { InnerLogger } from '../logger'; import { BrowserDescriptor } from '../install/browserPaths'; import { CRDevTools } from '../debug/crDevTools'; import * as debugSupport from '../debug/debugSupport'; import { BrowserOptions } from '../browser'; +import { WebSocketServer } from './webSocketServer'; export class Chromium extends BrowserTypeBase { private _devtools: CRDevTools | undefined; @@ -73,8 +73,8 @@ export class Chromium extends BrowserTypeBase { transport.send(message); } - _wrapTransportWithWebSocket(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketWrapper { - return wrapTransportWithWebSocket(transport, logger, port); + _startWebSocketServer(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketServer { + return startWebSocketServer(transport, logger, port); } _defaultArgs(options: LaunchOptionsBase, isPersistent: boolean, userDataDir: string): string[] { @@ -132,21 +132,17 @@ type SessionData = { parent?: string, }; -function wrapTransportWithWebSocket(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketWrapper { - const server = new ws.Server({ port }); - const guid = helper.guid(); - +function startWebSocketServer(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketServer { const awaitingBrowserTarget = new Map(); const sessionToData = new Map(); const socketToBrowserSession = new Map(); - let lastSequenceNumber = 1; function addSession(sessionId: string, socket: ws, parentSessionId?: string) { sessionToData.set(sessionId, { socket, children: new Set(), isBrowserSession: !parentSessionId, - parent: parentSessionId + parent: parentSessionId, }); if (parentSessionId) sessionToData.get(parentSessionId)!.children.add(sessionId); @@ -161,77 +157,76 @@ function wrapTransportWithWebSocket(transport: ConnectionTransport, logger: Inne sessionToData.delete(sessionId); } - transport.onmessage = message => { - if (typeof message.id === 'number' && awaitingBrowserTarget.has(message.id)) { - const freshSocket = awaitingBrowserTarget.get(message.id)!; - awaitingBrowserTarget.delete(message.id); + const server = new WebSocketServer(transport, logger, port, { + onBrowserResponse(seqNum: number, source: ws, message: ProtocolResponse) { + if (awaitingBrowserTarget.has(seqNum)) { + const freshSocket = awaitingBrowserTarget.get(seqNum)!; + awaitingBrowserTarget.delete(seqNum); - const sessionId = message.result.sessionId; - if (freshSocket.readyState !== ws.CLOSED && freshSocket.readyState !== ws.CLOSING) { - const { queue } = socketToBrowserSession.get(freshSocket)!; - for (const item of queue!) { - item.sessionId = sessionId; - transport.send(item); + const sessionId = message.result.sessionId; + if (freshSocket.readyState !== ws.CLOSED && freshSocket.readyState !== ws.CLOSING) { + const { queue } = socketToBrowserSession.get(freshSocket)!; + for (const item of queue!) { + item.sessionId = sessionId; + server.sendMessageToBrowser(item, source); + } + socketToBrowserSession.set(freshSocket, { sessionId }); + addSession(sessionId, freshSocket); + } else { + server.sendMessageToBrowserOneWay('Target.detachFromTarget', { sessionId }); + socketToBrowserSession.delete(freshSocket); } - socketToBrowserSession.set(freshSocket, { sessionId }); - addSession(sessionId, freshSocket); - } else { - transport.send({ - id: ++lastSequenceNumber, - method: 'Target.detachFromTarget', - params: { sessionId } - }); - socketToBrowserSession.delete(freshSocket); + return; } - return; - } - // At this point everything we care about has sessionId. - if (!message.sessionId) - return; + if (message.id === -1) + return; - const data = sessionToData.get(message.sessionId); - if (data && data.socket.readyState !== ws.CLOSING) { - if (message.method === 'Target.attachedToTarget') - addSession(message.params.sessionId, data.socket, message.sessionId); - if (message.method === 'Target.detachedFromTarget') - removeSession(message.params.sessionId); - // Strip session ids from the browser sessions. - if (data.isBrowserSession) - delete message.sessionId; - data.socket.send(JSON.stringify(message)); - } - }; + // At this point everything we care about has sessionId. + if (!message.sessionId) + return; - transport.onclose = () => { - for (const socket of socketToBrowserSession.keys()) { - socket.removeListener('close', (socket as any).__closeListener); - socket.close(undefined, 'Browser disconnected'); - } - server.close(); - transport.onmessage = undefined; - transport.onclose = undefined; - }; + const data = sessionToData.get(message.sessionId); + if (data && data.socket.readyState !== ws.CLOSING) { + if (data.isBrowserSession) + delete message.sessionId; + data.socket.send(JSON.stringify(message)); + } + }, - server.on('connection', (socket: ws, req) => { - if (req.url !== '/' + guid) { - socket.close(); - return; - } - socketToBrowserSession.set(socket, { queue: [] }); + onBrowserNotification(message: ProtocolResponse) { + // At this point everything we care about has sessionId. + if (!message.sessionId) + return; - transport.send({ - id: ++lastSequenceNumber, - method: 'Target.attachToBrowserTarget', - params: {} - }); - awaitingBrowserTarget.set(lastSequenceNumber, socket); + const data = sessionToData.get(message.sessionId); + if (data && data.socket.readyState !== ws.CLOSING) { + if (message.method === 'Target.attachedToTarget') + addSession(message.params.sessionId, data.socket, message.sessionId); + if (message.method === 'Target.detachedFromTarget') + removeSession(message.params.sessionId); + // Strip session ids from the browser sessions. + if (data.isBrowserSession) + delete message.sessionId; + data.socket.send(JSON.stringify(message)); + } + }, + + onClientAttached(socket: ws) { + socketToBrowserSession.set(socket, { queue: [] }); - socket.on('message', (message: string) => { - const parsedMessage = JSON.parse(Buffer.from(message).toString()) as ProtocolRequest; + const seqNum = server.sendMessageToBrowser({ + id: -1, // Proxy-initiated request. + method: 'Target.attachToBrowserTarget', + params: {} + }, socket); + awaitingBrowserTarget.set(seqNum, socket); + }, + + onClientRequest(socket: ws, message: ProtocolRequest) { // If message has sessionId, pass through. - if (parsedMessage.sessionId) { - transport.send(parsedMessage); + if (message.sessionId) { + server.sendMessageToBrowser(message, socket); return; } @@ -239,33 +234,24 @@ function wrapTransportWithWebSocket(transport: ConnectionTransport, logger: Inne const session = socketToBrowserSession.get(socket)!; if (session.sessionId) { // We have it, use it. - parsedMessage.sessionId = session.sessionId; - transport.send(parsedMessage); + message.sessionId = session.sessionId; + server.sendMessageToBrowser(message, socket); return; } // Pending session id, queue the message. - session.queue!.push(parsedMessage); - }); + session.queue!.push(message); + }, - socket.on('error', logError(logger)); - - socket.on('close', (socket as any).__closeListener = () => { + onClientDetached(socket: ws) { const session = socketToBrowserSession.get(socket); if (!session || !session.sessionId) return; removeSession(session.sessionId); socketToBrowserSession.delete(socket); - transport.send({ - id: ++lastSequenceNumber, - method: 'Target.detachFromTarget', - params: { sessionId: session.sessionId } - }); - }); + server.sendMessageToBrowserOneWay('Target.detachFromTarget', { sessionId: session.sessionId }); + } }); - - const address = server.address(); - const wsEndpoint = typeof address === 'string' ? `${address}/${guid}` : `ws://127.0.0.1:${address.port}/${guid}`; - return new WebSocketWrapper(wsEndpoint, [awaitingBrowserTarget, sessionToData, socketToBrowserSession]); + return server; } diff --git a/src/server/firefox.ts b/src/server/firefox.ts index 91efcc03fc31d..3ae72138e5986 100644 --- a/src/server/firefox.ts +++ b/src/server/firefox.ts @@ -21,14 +21,13 @@ import * as path from 'path'; import * as ws from 'ws'; import { FFBrowser } from '../firefox/ffBrowser'; import { kBrowserCloseMessageId } from '../firefox/ffConnection'; -import { helper } from '../helper'; -import { WebSocketWrapper } from './browserServer'; import { LaunchOptionsBase, BrowserTypeBase, processBrowserArgOptions, FirefoxUserPrefsOptions } from './browserType'; import { Env } from './processLauncher'; -import { ConnectionTransport, SequenceNumberMixer } from '../transport'; -import { InnerLogger, logError } from '../logger'; +import { ConnectionTransport, ProtocolResponse, ProtocolRequest } from '../transport'; +import { InnerLogger } from '../logger'; import { BrowserOptions } from '../browser'; import { BrowserDescriptor } from '../install/browserPaths'; +import { WebSocketServer } from './webSocketServer'; export class Firefox extends BrowserTypeBase { constructor(packagePath: string, browser: BrowserDescriptor) { @@ -53,8 +52,8 @@ export class Firefox extends BrowserTypeBase { transport.send(message); } - _wrapTransportWithWebSocket(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketWrapper { - return wrapTransportWithWebSocket(transport, logger, port); + _startWebSocketServer(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketServer { + return startWebSocketServer(transport, logger, port); } _defaultArgs(options: LaunchOptionsBase & FirefoxUserPrefsOptions, isPersistent: boolean, userDataDir: string): string[] { @@ -108,39 +107,36 @@ export class Firefox extends BrowserTypeBase { } } -function wrapTransportWithWebSocket(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketWrapper { - const server = new ws.Server({ port }); - const guid = helper.guid(); - const idMixer = new SequenceNumberMixer<{id: number, socket: ws}>(); +type SessionData = { + socket: ws, +}; + +function startWebSocketServer(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketServer { const pendingBrowserContextCreations = new Set(); const pendingBrowserContextDeletions = new Map(); const browserContextIds = new Map(); - const sessionToSocket = new Map(); - const sockets = new Set(); + const sessionToData = new Map(); + + function removeSession(sessionId: string): SessionData | undefined { + const data = sessionToData.get(sessionId); + if (!data) + return; + sessionToData.delete(sessionId); + return data; + } - transport.onmessage = message => { - if (typeof message.id === 'number') { + const server = new WebSocketServer(transport, logger, port, { + onBrowserResponse(seqNum: number, source: ws, message: ProtocolResponse) { // Process command response. - const seqNum = message.id; - const value = idMixer.take(seqNum); - if (!value) - return; - const { id, socket } = value; - - if (socket.readyState === ws.CLOSING) { - if (pendingBrowserContextCreations.has(id)) { - transport.send({ - id: ++SequenceNumberMixer._lastSequenceNumber, - method: 'Browser.removeBrowserContext', - params: { browserContextId: message.result.browserContextId } - }); - } + if (source.readyState === ws.CLOSING || source.readyState === ws.CLOSED) { + if (pendingBrowserContextCreations.has(seqNum)) + server.sendMessageToBrowserOneWay('Browser.removeBrowserContext', { browserContextId: message.result.browserContextId }); return; } if (pendingBrowserContextCreations.has(seqNum)) { // Browser.createBrowserContext response -> establish context attribution. - browserContextIds.set(message.result.browserContextId, socket); + browserContextIds.set(message.result.browserContextId, source); pendingBrowserContextCreations.delete(seqNum); } @@ -151,88 +147,59 @@ function wrapTransportWithWebSocket(transport: ConnectionTransport, logger: Inne pendingBrowserContextDeletions.delete(seqNum); } - message.id = id; - socket.send(JSON.stringify(message)); + source.send(JSON.stringify(message)); return; - } - - // Process notification response. - const { method, params, sessionId } = message; - if (sessionId) { - const socket = sessionToSocket.get(sessionId); - if (!socket || socket.readyState === ws.CLOSING) { - // Drop unattributed messages on the floor. + }, + + onBrowserNotification(message: ProtocolResponse) { + // Process notification response. + const { method, params, sessionId } = message; + if (sessionId) { + const data = sessionToData.get(sessionId); + if (!data || data.socket.readyState === ws.CLOSING) { + // Drop unattributed messages on the floor. + return; + } + data.socket.send(JSON.stringify(message)); return; } - socket.send(JSON.stringify(message)); - return; - } - if (method === 'Browser.attachedToTarget') { - const socket = browserContextIds.get(params.targetInfo.browserContextId); - if (!socket || socket.readyState === ws.CLOSING) { - // Drop unattributed messages on the floor. + if (method === 'Browser.attachedToTarget') { + const socket = browserContextIds.get(params.targetInfo.browserContextId); + if (!socket || socket.readyState === ws.CLOSING) { + // Drop unattributed messages on the floor. + return; + } + sessionToData.set(params.sessionId, { socket }); + socket.send(JSON.stringify(message)); return; } - sessionToSocket.set(params.sessionId, socket); - socket.send(JSON.stringify(message)); - return; - } - if (method === 'Browser.detachedFromTarget') { - const socket = sessionToSocket.get(params.sessionId); - sessionToSocket.delete(params.sessionId); - if (socket && socket.readyState !== ws.CLOSING) - socket.send(JSON.stringify(message)); - return; - } - }; + if (method === 'Browser.detachedFromTarget') { + const data = removeSession(params.sessionId); + if (data && data.socket.readyState !== ws.CLOSING) + data.socket.send(JSON.stringify(message)); + return; + } + }, - transport.onclose = () => { - for (const socket of sockets) { - socket.removeListener('close', (socket as any).__closeListener); - socket.close(undefined, 'Browser disconnected'); - } - server.close(); - transport.onmessage = undefined; - transport.onclose = undefined; - }; - - server.on('connection', (socket: ws, req) => { - if (req.url !== '/' + guid) { - socket.close(); - return; - } - sockets.add(socket); + onClientAttached() {}, - socket.on('message', (message: string) => { - const parsedMessage = JSON.parse(Buffer.from(message).toString()); - const { id, method, params } = parsedMessage; - const seqNum = idMixer.generate({ id, socket }); - transport.send({ ...parsedMessage, id: seqNum }); + onClientRequest(socket: ws, message: ProtocolRequest) { + const { method, params } = message; + const seqNum = server.sendMessageToBrowser(message, socket); if (method === 'Browser.createBrowserContext') pendingBrowserContextCreations.add(seqNum); if (method === 'Browser.removeBrowserContext') pendingBrowserContextDeletions.set(seqNum, params.browserContextId); - }); + }, - socket.on('error', logError(logger)); - - socket.on('close', (socket as any).__closeListener = () => { + onClientDetached(socket: ws) { for (const [browserContextId, s] of browserContextIds) { if (s === socket) { - transport.send({ - id: ++SequenceNumberMixer._lastSequenceNumber, - method: 'Browser.removeBrowserContext', - params: { browserContextId } - }); + server.sendMessageToBrowserOneWay('Browser.removeBrowserContext', { browserContextId }); browserContextIds.delete(browserContextId); } } - sockets.delete(socket); - }); + } }); - - const address = server.address(); - const wsEndpoint = typeof address === 'string' ? `${address}/${guid}` : `ws://127.0.0.1:${address.port}/${guid}`; - return new WebSocketWrapper(wsEndpoint, - [pendingBrowserContextCreations, pendingBrowserContextDeletions, browserContextIds, sessionToSocket, sockets]); + return server; } diff --git a/src/server/webSocketServer.ts b/src/server/webSocketServer.ts new file mode 100644 index 0000000000000..3b20fb90eb539 --- /dev/null +++ b/src/server/webSocketServer.ts @@ -0,0 +1,139 @@ +/** + * Copyright (c) Microsoft Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { IncomingMessage } from 'http'; +import * as ws from 'ws'; +import { helper } from '../helper'; +import { InnerLogger, logError } from '../logger'; +import { ConnectionTransport, ProtocolRequest, ProtocolResponse } from '../transport'; + +export interface WebSocketServerDelegate { + onClientAttached(socket: ws): void; + onClientRequest(socket: ws, message: ProtocolRequest): void; + onClientDetached(socket: ws): void; + onBrowserNotification(message: ProtocolResponse): void; + onBrowserResponse(seqNum: number, source: ws, message: ProtocolResponse): void; +} + +export class WebSocketServer { + private _transport: ConnectionTransport; + private _logger: InnerLogger; + private _server: ws.Server; + private _guid: string; + readonly wsEndpoint: string; + private _bindings: (Map | Set)[] = []; + private _lastSeqNum = 0; + private _delegate: WebSocketServerDelegate; + private _sockets = new Set(); + private _pendingRequests = new Map(); + + constructor(transport: ConnectionTransport, logger: InnerLogger, port: number, delegate: WebSocketServerDelegate) { + this._guid = helper.guid(); + this._transport = transport; + this._logger = logger; + this._server = new ws.Server({ port }); + this._delegate = delegate; + transport.onmessage = message => this._browserMessage(message); + transport.onclose = () => this._browserClosed(); + this._server.on('connection', (socket: ws, req) => this._clientAttached(socket, req)); + const address = this._server.address(); + this.wsEndpoint = typeof address === 'string' ? `${address}/${this._guid}` : `ws://127.0.0.1:${address.port}/${this._guid}`; + } + + addBindings(bindings: (Map|Set)[]) { + this._bindings.push(...bindings); + } + + sendMessageToBrowser(message: ProtocolRequest, source: ws): number { + const seqNum = ++this._lastSeqNum; + this._pendingRequests.set(seqNum, { message, source }); + this._transport.send({ ...message, id: seqNum }); + return seqNum; + } + + sendMessageToBrowserOneWay(method: string, params: any) { + this._transport.send({ id: ++this._lastSeqNum, method, params }); + } + + close() { + this._server.close(); + } + + private _browserMessage(message: ProtocolResponse) { + const seqNum = message.id; + if (typeof seqNum === 'number') { + const request = this._pendingRequests.get(seqNum); + if (!request) + return; + this._pendingRequests.delete(seqNum); + message.id = request.message.id; + if (request.source) + this._delegate.onBrowserResponse(seqNum, request.source, message); + } else { + this._delegate.onBrowserNotification(message); + } + } + + private _browserClosed() { + this._transport.onmessage = undefined; + this._transport.onclose = undefined; + for (const socket of this._sockets) { + socket.removeAllListeners('close'); + socket.close(undefined, 'Browser disconnected'); + } + this._server.close(); + } + + private _clientAttached(socket: ws, req: IncomingMessage) { + if (req.url !== '/' + this._guid) { + socket.close(); + return; + } + + this._sockets.add(socket); + this._delegate.onClientAttached(socket); + socket.on('message', (message: string) => { + const parsedMessage = JSON.parse(Buffer.from(message).toString()) as ProtocolRequest; + this._delegate.onClientRequest(socket, parsedMessage); + }); + socket.on('error', logError(this._logger)); + socket.on('close', () => { + this._delegate.onClientDetached(socket); + this._sockets.delete(socket); + }); + } + + + async checkLeaks() { + let counter = 0; + return new Promise((fulfill, reject) => { + const check = () => { + const filtered = this._bindings.filter(entry => entry.size); + if (!filtered.length) { + fulfill(); + return; + } + + if (++counter >= 50) { + reject(new Error('Web socket leak ' + filtered.map(entry => [...entry.keys()].join(':')).join('|'))); + return; + } + setTimeout(check, 100); + }; + check(); + }); + } +} diff --git a/src/server/webkit.ts b/src/server/webkit.ts index 75db50400de1b..dd12561a8ad06 100644 --- a/src/server/webkit.ts +++ b/src/server/webkit.ts @@ -18,15 +18,15 @@ import { WKBrowser } from '../webkit/wkBrowser'; import { Env } from './processLauncher'; import * as path from 'path'; -import { helper } from '../helper'; import { kBrowserCloseMessageId } from '../webkit/wkConnection'; import { LaunchOptionsBase, BrowserTypeBase, processBrowserArgOptions } from './browserType'; -import { ConnectionTransport, SequenceNumberMixer } from '../transport'; +import { ConnectionTransport, ProtocolResponse, ProtocolRequest } from '../transport'; import * as ws from 'ws'; -import { WebSocketWrapper } from './browserServer'; -import { InnerLogger, logError } from '../logger'; +import { InnerLogger } from '../logger'; import { BrowserOptions } from '../browser'; import { BrowserDescriptor } from '../install/browserPaths'; +import { WebSocketServer } from './webSocketServer'; +import { assert } from '../helper'; export class WebKit extends BrowserTypeBase { constructor(packagePath: string, browser: BrowserDescriptor) { @@ -45,8 +45,8 @@ export class WebKit extends BrowserTypeBase { transport.send({method: 'Playwright.close', params: {}, id: kBrowserCloseMessageId}); } - _wrapTransportWithWebSocket(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketWrapper { - return wrapTransportWithWebSocket(transport, logger, port); + _startWebSocketServer(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketServer { + return startWebSocketServer(transport, logger, port); } _defaultArgs(options: LaunchOptionsBase, isPersistent: boolean, userDataDir: string): string[] { @@ -88,114 +88,69 @@ export class WebKit extends BrowserTypeBase { } } -function wrapTransportWithWebSocket(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketWrapper { - const server = new ws.Server({ port }); - const guid = helper.guid(); - const idMixer = new SequenceNumberMixer<{id: number, socket: ws}>(); +function startWebSocketServer(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketServer { const pendingBrowserContextCreations = new Set(); const pendingBrowserContextDeletions = new Map(); const browserContextIds = new Map(); - const sockets = new Set(); - transport.onmessage = message => { - if (typeof message.id === 'number') { - if (message.id === -9999) - return; - // Process command response. - const value = idMixer.take(message.id); - if (!value) - return; - const { id, socket } = value; - - if (socket.readyState === ws.CLOSED || socket.readyState === ws.CLOSING) { - if (pendingBrowserContextCreations.has(id)) { - transport.send({ - id: ++SequenceNumberMixer._lastSequenceNumber, - method: 'Playwright.deleteContext', - params: { browserContextId: message.result.browserContextId } - }); - } + const server = new WebSocketServer(transport, logger, port, { + onBrowserResponse(seqNum: number, source: ws, message: ProtocolResponse) { + if (source.readyState === ws.CLOSED || source.readyState === ws.CLOSING) { + if (pendingBrowserContextCreations.has(seqNum)) + server.sendMessageToBrowserOneWay('Playwright.deleteContext', { browserContextId: message.result.browserContextId }); return; } - if (pendingBrowserContextCreations.has(message.id)) { + if (pendingBrowserContextCreations.has(seqNum)) { // Browser.createContext response -> establish context attribution. - browserContextIds.set(message.result.browserContextId, socket); - pendingBrowserContextCreations.delete(message.id); + browserContextIds.set(message.result.browserContextId, source); + pendingBrowserContextCreations.delete(seqNum); } - const deletedContextId = pendingBrowserContextDeletions.get(message.id); + const deletedContextId = pendingBrowserContextDeletions.get(seqNum); if (deletedContextId) { // Browser.deleteContext response -> remove context attribution. browserContextIds.delete(deletedContextId); - pendingBrowserContextDeletions.delete(message.id); + pendingBrowserContextDeletions.delete(seqNum); } - message.id = id; - socket.send(JSON.stringify(message)); + source.send(JSON.stringify(message)); return; - } - - // Every notification either has a browserContextId top-level field or - // has a browserContextId parameter. - const { params, browserContextId } = message; - const contextId = browserContextId || params.browserContextId; - const socket = browserContextIds.get(contextId); - if (!socket || socket.readyState === ws.CLOSING) { - // Drop unattributed messages on the floor. - return; - } - socket.send(JSON.stringify(message)); - }; + }, + + onBrowserNotification(message: ProtocolResponse) { + // Process notification response. + const { params, browserContextId } = message; + const contextId = browserContextId || params.browserContextId; + assert(contextId); + const socket = browserContextIds.get(contextId); + if (!socket || socket.readyState === ws.CLOSING) { + // Drop unattributed messages on the floor. + return; + } + socket.send(JSON.stringify(message)); + }, - transport.onclose = () => { - for (const socket of sockets) { - socket.removeListener('close', (socket as any).__closeListener); - socket.close(undefined, 'Browser disconnected'); - } - server.close(); - transport.onmessage = undefined; - transport.onclose = undefined; - }; - - server.on('connection', (socket: ws, req) => { - if (req.url !== '/' + guid) { - socket.close(); - return; - } - sockets.add(socket); + onClientAttached(socket: ws) { + }, - socket.on('message', (message: string) => { - const parsedMessage = JSON.parse(Buffer.from(message).toString()); - const { id, method, params } = parsedMessage; - const seqNum = idMixer.generate({ id, socket }); - transport.send({ ...parsedMessage, id: seqNum }); + onClientRequest(socket: ws, message: ProtocolRequest) { + const { method, params } = message; + const seqNum = server.sendMessageToBrowser(message, socket); if (method === 'Playwright.createContext') pendingBrowserContextCreations.add(seqNum); if (method === 'Playwright.deleteContext') pendingBrowserContextDeletions.set(seqNum, params.browserContextId); - }); + }, - socket.on('error', logError(logger)); - - socket.on('close', (socket as any).__closeListener = () => { + onClientDetached(socket: ws) { for (const [browserContextId, s] of browserContextIds) { if (s === socket) { - transport.send({ - id: ++SequenceNumberMixer._lastSequenceNumber, - method: 'Playwright.deleteContext', - params: { browserContextId } - }); + server.sendMessageToBrowserOneWay('Playwright.deleteContext', { browserContextId }); browserContextIds.delete(browserContextId); } } - sockets.delete(socket); - }); + } }); - - const address = server.address(); - const wsEndpoint = typeof address === 'string' ? `${address}/${guid}` : `ws://127.0.0.1:${address.port}/${guid}`; - - return new WebSocketWrapper(wsEndpoint, - [pendingBrowserContextCreations, pendingBrowserContextDeletions, browserContextIds, sockets]); + return server; } diff --git a/src/transport.ts b/src/transport.ts index fb225146f92a5..af7e6e34ccbf2 100644 --- a/src/transport.ts +++ b/src/transport.ts @@ -195,23 +195,6 @@ export class WebSocketTransport implements ConnectionTransport { } } -export class SequenceNumberMixer { - static _lastSequenceNumber = 1; - private _values = new Map(); - - generate(value: V): number { - const sequenceNumber = ++SequenceNumberMixer._lastSequenceNumber; - this._values.set(sequenceNumber, value); - return sequenceNumber; - } - - take(sequenceNumber: number): V | undefined { - const value = this._values.get(sequenceNumber); - this._values.delete(sequenceNumber); - return value; - } -} - export class InterceptingTransport implements ConnectionTransport { private readonly _delegate: ConnectionTransport; private _interceptor: (message: ProtocolRequest) => ProtocolRequest;