Skip to content

Commit

Permalink
chore(websocket): extract common socket part (#2506)
Browse files Browse the repository at this point in the history
  • Loading branch information
pavelfeldman authored Jun 10, 2020
1 parent 1bb3365 commit 903de25
Show file tree
Hide file tree
Showing 7 changed files with 328 additions and 326 deletions.
39 changes: 5 additions & 34 deletions src/server/browserServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,42 +16,13 @@

import { ChildProcess } from 'child_process';
import { EventEmitter } from 'events';

export class WebSocketWrapper {
readonly wsEndpoint: string;
private _bindings: (Map<any, any> | Set<any>)[];

constructor(wsEndpoint: string, bindings: (Map<any, any>|Set<any>)[]) {
this.wsEndpoint = wsEndpoint;
this._bindings = bindings;
}

async checkLeaks() {
let counter = 0;
return new Promise((fulfill, reject) => {
const check = () => {
const filtered = this._bindings.filter(entry => entry.size);
if (!filtered.length) {
fulfill();
return;
}

if (++counter >= 50) {
reject(new Error('Web socket leak ' + filtered.map(entry => [...entry.keys()].join(':')).join('|')));
return;
}
setTimeout(check, 100);
};
check();
});
}
}
import { WebSocketServer } from './webSocketServer';

export class BrowserServer extends EventEmitter {
private _process: ChildProcess;
private _gracefullyClose: () => Promise<void>;
private _kill: () => Promise<void>;
_webSocketWrapper: WebSocketWrapper | null = null;
_webSocketServer: WebSocketServer | null = null;

constructor(process: ChildProcess, gracefullyClose: () => Promise<void>, kill: () => Promise<void>) {
super();
Expand All @@ -65,7 +36,7 @@ export class BrowserServer extends EventEmitter {
}

wsEndpoint(): string {
return this._webSocketWrapper ? this._webSocketWrapper.wsEndpoint : '';
return this._webSocketServer ? this._webSocketServer.wsEndpoint : '';
}

async kill(): Promise<void> {
Expand All @@ -77,8 +48,8 @@ export class BrowserServer extends EventEmitter {
}

async _checkLeaks(): Promise<void> {
if (this._webSocketWrapper)
await this._webSocketWrapper.checkLeaks();
if (this._webSocketServer)
await this._webSocketServer.checkLeaks();
}

async _closeOrKill(timeout: number): Promise<void> {
Expand Down
7 changes: 4 additions & 3 deletions src/server/browserType.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import * as os from 'os';
import * as path from 'path';
import * as util from 'util';
import { BrowserContext, PersistentContextOptions, verifyProxySettings, validateBrowserContextOptions } from '../browserContext';
import { BrowserServer, WebSocketWrapper } from './browserServer';
import { BrowserServer } from './browserServer';
import * as browserPaths from '../install/browserPaths';
import { Logger, InnerLogger } from '../logger';
import { ConnectionTransport, WebSocketTransport } from '../transport';
Expand All @@ -31,6 +31,7 @@ import { PipeTransport } from './pipeTransport';
import { Progress, runAbortableTask } from '../progress';
import { ProxySettings } from '../types';
import { TimeoutSettings } from '../timeoutSettings';
import { WebSocketServer } from './webSocketServer';

export type FirefoxUserPrefsOptions = {
firefoxUserPrefs?: { [key: string]: string | number | boolean },
Expand Down Expand Up @@ -150,7 +151,7 @@ export abstract class BrowserTypeBase implements BrowserType {
const logger = new InnerLogger(options.logger);
return runAbortableTask(async progress => {
const { browserServer, transport } = await this._launchServer(progress, options, false, logger);
browserServer._webSocketWrapper = this._wrapTransportWithWebSocket(transport, logger, port);
browserServer._webSocketServer = this._startWebSocketServer(transport, logger, port);
return browserServer;
}, logger, TimeoutSettings.timeout(options));
}
Expand Down Expand Up @@ -248,7 +249,7 @@ export abstract class BrowserTypeBase implements BrowserType {

abstract _defaultArgs(options: LaunchOptionsBase, isPersistent: boolean, userDataDir: string): string[];
abstract _connectToTransport(transport: ConnectionTransport, options: BrowserOptions): Promise<BrowserBase>;
abstract _wrapTransportWithWebSocket(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketWrapper;
abstract _startWebSocketServer(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketServer;
abstract _amendEnvironment(env: Env, userDataDir: string, executable: string, browserArguments: string[]): Env;
abstract _attemptToGracefullyCloseBrowser(transport: ConnectionTransport): void;
}
Expand Down
166 changes: 76 additions & 90 deletions src/server/chromium.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@
*/

import * as path from 'path';
import { helper, assert, getFromENV, logPolitely } from '../helper';
import { assert, getFromENV, logPolitely } from '../helper';
import { CRBrowser } from '../chromium/crBrowser';
import * as ws from 'ws';
import { Env } from './processLauncher';
import { kBrowserCloseMessageId } from '../chromium/crConnection';
import { LaunchOptionsBase, BrowserTypeBase, processBrowserArgOptions } from './browserType';
import { WebSocketWrapper } from './browserServer';
import { ConnectionTransport, ProtocolRequest } from '../transport';
import { InnerLogger, logError } from '../logger';
import { ConnectionTransport, ProtocolRequest, ProtocolResponse } from '../transport';
import { InnerLogger } from '../logger';
import { BrowserDescriptor } from '../install/browserPaths';
import { CRDevTools } from '../debug/crDevTools';
import * as debugSupport from '../debug/debugSupport';
import { BrowserOptions } from '../browser';
import { WebSocketServer } from './webSocketServer';

export class Chromium extends BrowserTypeBase {
private _devtools: CRDevTools | undefined;
Expand Down Expand Up @@ -73,8 +73,8 @@ export class Chromium extends BrowserTypeBase {
transport.send(message);
}

_wrapTransportWithWebSocket(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketWrapper {
return wrapTransportWithWebSocket(transport, logger, port);
_startWebSocketServer(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketServer {
return startWebSocketServer(transport, logger, port);
}

_defaultArgs(options: LaunchOptionsBase, isPersistent: boolean, userDataDir: string): string[] {
Expand Down Expand Up @@ -132,21 +132,17 @@ type SessionData = {
parent?: string,
};

function wrapTransportWithWebSocket(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketWrapper {
const server = new ws.Server({ port });
const guid = helper.guid();

function startWebSocketServer(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketServer {
const awaitingBrowserTarget = new Map<number, ws>();
const sessionToData = new Map<string, SessionData>();
const socketToBrowserSession = new Map<ws, { sessionId?: string, queue?: ProtocolRequest[] }>();
let lastSequenceNumber = 1;

function addSession(sessionId: string, socket: ws, parentSessionId?: string) {
sessionToData.set(sessionId, {
socket,
children: new Set(),
isBrowserSession: !parentSessionId,
parent: parentSessionId
parent: parentSessionId,
});
if (parentSessionId)
sessionToData.get(parentSessionId)!.children.add(sessionId);
Expand All @@ -161,111 +157,101 @@ function wrapTransportWithWebSocket(transport: ConnectionTransport, logger: Inne
sessionToData.delete(sessionId);
}

transport.onmessage = message => {
if (typeof message.id === 'number' && awaitingBrowserTarget.has(message.id)) {
const freshSocket = awaitingBrowserTarget.get(message.id)!;
awaitingBrowserTarget.delete(message.id);
const server = new WebSocketServer(transport, logger, port, {
onBrowserResponse(seqNum: number, source: ws, message: ProtocolResponse) {
if (awaitingBrowserTarget.has(seqNum)) {
const freshSocket = awaitingBrowserTarget.get(seqNum)!;
awaitingBrowserTarget.delete(seqNum);

const sessionId = message.result.sessionId;
if (freshSocket.readyState !== ws.CLOSED && freshSocket.readyState !== ws.CLOSING) {
const { queue } = socketToBrowserSession.get(freshSocket)!;
for (const item of queue!) {
item.sessionId = sessionId;
transport.send(item);
const sessionId = message.result.sessionId;
if (freshSocket.readyState !== ws.CLOSED && freshSocket.readyState !== ws.CLOSING) {
const { queue } = socketToBrowserSession.get(freshSocket)!;
for (const item of queue!) {
item.sessionId = sessionId;
server.sendMessageToBrowser(item, source);
}
socketToBrowserSession.set(freshSocket, { sessionId });
addSession(sessionId, freshSocket);
} else {
server.sendMessageToBrowserOneWay('Target.detachFromTarget', { sessionId });
socketToBrowserSession.delete(freshSocket);
}
socketToBrowserSession.set(freshSocket, { sessionId });
addSession(sessionId, freshSocket);
} else {
transport.send({
id: ++lastSequenceNumber,
method: 'Target.detachFromTarget',
params: { sessionId }
});
socketToBrowserSession.delete(freshSocket);
return;
}
return;
}

// At this point everything we care about has sessionId.
if (!message.sessionId)
return;
if (message.id === -1)
return;

const data = sessionToData.get(message.sessionId);
if (data && data.socket.readyState !== ws.CLOSING) {
if (message.method === 'Target.attachedToTarget')
addSession(message.params.sessionId, data.socket, message.sessionId);
if (message.method === 'Target.detachedFromTarget')
removeSession(message.params.sessionId);
// Strip session ids from the browser sessions.
if (data.isBrowserSession)
delete message.sessionId;
data.socket.send(JSON.stringify(message));
}
};
// At this point everything we care about has sessionId.
if (!message.sessionId)
return;

transport.onclose = () => {
for (const socket of socketToBrowserSession.keys()) {
socket.removeListener('close', (socket as any).__closeListener);
socket.close(undefined, 'Browser disconnected');
}
server.close();
transport.onmessage = undefined;
transport.onclose = undefined;
};
const data = sessionToData.get(message.sessionId);
if (data && data.socket.readyState !== ws.CLOSING) {
if (data.isBrowserSession)
delete message.sessionId;
data.socket.send(JSON.stringify(message));
}
},

server.on('connection', (socket: ws, req) => {
if (req.url !== '/' + guid) {
socket.close();
return;
}
socketToBrowserSession.set(socket, { queue: [] });
onBrowserNotification(message: ProtocolResponse) {
// At this point everything we care about has sessionId.
if (!message.sessionId)
return;

transport.send({
id: ++lastSequenceNumber,
method: 'Target.attachToBrowserTarget',
params: {}
});
awaitingBrowserTarget.set(lastSequenceNumber, socket);
const data = sessionToData.get(message.sessionId);
if (data && data.socket.readyState !== ws.CLOSING) {
if (message.method === 'Target.attachedToTarget')
addSession(message.params.sessionId, data.socket, message.sessionId);
if (message.method === 'Target.detachedFromTarget')
removeSession(message.params.sessionId);
// Strip session ids from the browser sessions.
if (data.isBrowserSession)
delete message.sessionId;
data.socket.send(JSON.stringify(message));
}
},

onClientAttached(socket: ws) {
socketToBrowserSession.set(socket, { queue: [] });

socket.on('message', (message: string) => {
const parsedMessage = JSON.parse(Buffer.from(message).toString()) as ProtocolRequest;
const seqNum = server.sendMessageToBrowser({
id: -1, // Proxy-initiated request.
method: 'Target.attachToBrowserTarget',
params: {}
}, socket);
awaitingBrowserTarget.set(seqNum, socket);
},

onClientRequest(socket: ws, message: ProtocolRequest) {
// If message has sessionId, pass through.
if (parsedMessage.sessionId) {
transport.send(parsedMessage);
if (message.sessionId) {
server.sendMessageToBrowser(message, socket);
return;
}

// If message has no sessionId, look it up.
const session = socketToBrowserSession.get(socket)!;
if (session.sessionId) {
// We have it, use it.
parsedMessage.sessionId = session.sessionId;
transport.send(parsedMessage);
message.sessionId = session.sessionId;
server.sendMessageToBrowser(message, socket);
return;
}
// Pending session id, queue the message.
session.queue!.push(parsedMessage);
});
session.queue!.push(message);
},

socket.on('error', logError(logger));

socket.on('close', (socket as any).__closeListener = () => {
onClientDetached(socket: ws) {
const session = socketToBrowserSession.get(socket);
if (!session || !session.sessionId)
return;
removeSession(session.sessionId);
socketToBrowserSession.delete(socket);
transport.send({
id: ++lastSequenceNumber,
method: 'Target.detachFromTarget',
params: { sessionId: session.sessionId }
});
});
server.sendMessageToBrowserOneWay('Target.detachFromTarget', { sessionId: session.sessionId });
}
});

const address = server.address();
const wsEndpoint = typeof address === 'string' ? `${address}/${guid}` : `ws://127.0.0.1:${address.port}/${guid}`;
return new WebSocketWrapper(wsEndpoint, [awaitingBrowserTarget, sessionToData, socketToBrowserSession]);
return server;
}


Expand Down
Loading

0 comments on commit 903de25

Please sign in to comment.