Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: make Transport.send() synchronous #1177

Merged
merged 6 commits into from
Mar 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/chromium/crConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,12 @@ export class CRSession extends platform.EventEmitter {
this.once = super.once;
}

send<T extends keyof Protocol.CommandParameters>(
async send<T extends keyof Protocol.CommandParameters>(
method: T,
params?: Protocol.CommandParameters[T]
): Promise<Protocol.CommandReturnValues[T]> {
if (!this._connection)
return Promise.reject(new Error(`Protocol error (${method}): Session closed. Most likely the ${this._targetType} has been closed.`));
throw new Error(`Protocol error (${method}): Session closed. Most likely the ${this._targetType} has been closed.`);
const id = this._connection._rawSend(this._sessionId, { method, params });
return new Promise((resolve, reject) => {
this._callbacks.set(id, {resolve, reject, error: new Error(), method});
Expand Down
27 changes: 11 additions & 16 deletions src/chromium/crExecutionContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,22 +72,15 @@ export class CRExecutionContext implements js.ExecutionContextDelegate {
throw new Error('Passed function is not well-serializable!');
}
}
let callFunctionOnPromise;
try {
callFunctionOnPromise = this._client.send('Runtime.callFunctionOn', {
functionDeclaration: functionText + '\n' + suffix + '\n',
executionContextId: this._contextId,
arguments: args.map(convertArgument.bind(this)),
returnByValue,
awaitPromise: true,
userGesture: true
});
} catch (err) {
if (err instanceof TypeError && err.message.startsWith('Converting circular structure to JSON'))
err.message += ' Are you passing a nested JSHandle?';
throw err;
}
const { exceptionDetails, result: remoteObject } = await callFunctionOnPromise.catch(rewriteError);

const { exceptionDetails, result: remoteObject } = await this._client.send('Runtime.callFunctionOn', {
functionDeclaration: functionText + '\n' + suffix + '\n',
executionContextId: this._contextId,
arguments: args.map(convertArgument.bind(this)),
returnByValue,
awaitPromise: true,
userGesture: true
}).catch(rewriteError);
if (exceptionDetails)
throw new Error('Evaluation failed: ' + getExceptionMessage(exceptionDetails));
return returnByValue ? valueFromRemoteObject(remoteObject) : context._createHandle(remoteObject);
Expand Down Expand Up @@ -127,6 +120,8 @@ export class CRExecutionContext implements js.ExecutionContextDelegate {

if (error.message.endsWith('Cannot find context with specified id') || error.message.endsWith('Inspected target navigated or closed') || error.message.endsWith('Execution context was destroyed.'))
throw new Error('Execution context was destroyed, most likely because of a navigation.');
if (error instanceof TypeError && error.message.startsWith('Converting circular structure to JSON'))
error.message += ' Are you passing a nested JSHandle?';
throw error;
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/firefox/ffConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export class FFConnection extends platform.EventEmitter {
return this._sessions.get(sessionId) || null;
}

send<T extends keyof Protocol.CommandParameters>(
async send<T extends keyof Protocol.CommandParameters>(
method: T,
params?: Protocol.CommandParameters[T]
): Promise<Protocol.CommandReturnValues[T]> {
Expand Down Expand Up @@ -179,12 +179,12 @@ export class FFSession extends platform.EventEmitter {
this.once = super.once;
}

send<T extends keyof Protocol.CommandParameters>(
async send<T extends keyof Protocol.CommandParameters>(
method: T,
params?: Protocol.CommandParameters[T]
): Promise<Protocol.CommandReturnValues[T]> {
if (this._disposed)
return Promise.reject(new Error(`Protocol error (${method}): Session closed. Most likely the ${this._targetType} has been closed.`));
throw new Error(`Protocol error (${method}): Session closed. Most likely the ${this._targetType} has been closed.`);
const id = this._connection.nextMessageId();
this._rawSend({method, params, id});
return new Promise((resolve, reject) => {
Expand Down
23 changes: 9 additions & 14 deletions src/firefox/ffExecutionContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,13 @@ export class FFExecutionContext implements js.ExecutionContextDelegate {
return {unserializableValue: 'NaN'};
return {value: arg};
});
let callFunctionPromise;
try {
callFunctionPromise = this._session.send('Runtime.callFunction', {
functionDeclaration: functionText,
args: protocolArgs,
returnByValue,
executionContextId: this._executionContextId
});
} catch (err) {
if (err instanceof TypeError && err.message.startsWith('Converting circular structure to JSON'))
err.message += ' Are you passing a nested JSHandle?';
throw err;
}
const payload = await callFunctionPromise.catch(rewriteError);

const payload = await this._session.send('Runtime.callFunction', {
functionDeclaration: functionText,
args: protocolArgs,
returnByValue,
executionContextId: this._executionContextId
}).catch(rewriteError);
checkException(payload.exceptionDetails);
if (returnByValue)
return deserializeValue(payload.result!);
Expand All @@ -103,6 +96,8 @@ export class FFExecutionContext implements js.ExecutionContextDelegate {
return {result: {type: 'undefined', value: undefined}};
if (error.message.includes('Failed to find execution context with id') || error.message.includes('Execution context was destroyed!'))
throw new Error('Execution context was destroyed, most likely because of a navigation.');
if (error instanceof TypeError && error.message.startsWith('Converting circular structure to JSON'))
error.message += ' Are you passing a nested JSHandle?';
throw error;
}
}
Expand Down
24 changes: 13 additions & 11 deletions src/platform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -311,22 +311,27 @@ export function makeWaitForNextTask() {
};
}

export class WebSocketTransport implements ConnectionTransport {
private _ws: WebSocket;
// 'onmessage' handler must be installed synchronously when 'onopen' callback is invoked to
// avoid missing incoming messages.
export async function connectToWebsocket<T>(url: string, onopen: (transport: ConnectionTransport) => Promise<T>): Promise<T> {
const transport = new WebSocketTransport(url);
return new Promise<T>((fulfill, reject) => {
transport._ws.addEventListener('open', async () => fulfill(await onopen(transport)));
transport._ws.addEventListener('error', event => reject(new Error('WebSocket error: ' + (event as ErrorEvent).message)));
});
}

class WebSocketTransport implements ConnectionTransport {
_ws: WebSocket;

onmessage?: (message: string) => void;
onclose?: () => void;
private _connectPromise: Promise<(Error|null)>;

constructor(url: string) {
this._ws = (isNode ? new NodeWebSocket(url, [], {
perMessageDeflate: false,
maxPayload: 256 * 1024 * 1024, // 256Mb
}) : new WebSocket(url)) as WebSocket;
this._connectPromise = new Promise(fulfill => {
this._ws.addEventListener('open', () => fulfill(null));
this._ws.addEventListener('error', event => fulfill(new Error('WebSocket error: ' + (event as ErrorEvent).message)));
});
// The 'ws' module in node sometimes sends us multiple messages in a single task.
// In Web, all IO callbacks (e.g. WebSocket callbacks)
// are dispatched into separate tasks, so there's no need
Expand All @@ -348,10 +353,7 @@ export class WebSocketTransport implements ConnectionTransport {
this._ws.addEventListener('error', () => {});
}

async send(message: string) {
const error = await this._connectPromise;
if (error)
throw error;
send(message: string) {
this._ws.send(message);
}

Expand Down
7 changes: 4 additions & 3 deletions src/server/chromium.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ export class Chromium implements BrowserType {
// We try to gracefully close to prevent crash reporting and core dumps.
// Note that it's fine to reuse the pipe transport, since
// our connection ignores kBrowserCloseMessageId.
const t = transport || new platform.WebSocketTransport(browserWSEndpoint!);
const t = transport || await platform.connectToWebsocket(browserWSEndpoint!, async transport => transport);
const message = { method: 'Browser.close', id: kBrowserCloseMessageId };
await t.send(JSON.stringify(message));
},
Expand All @@ -151,8 +151,9 @@ export class Chromium implements BrowserType {
}

async connect(options: ConnectOptions): Promise<CRBrowser> {
const transport = new platform.WebSocketTransport(options.wsEndpoint);
return CRBrowser.connect(transport, options.slowMo);
return await platform.connectToWebsocket(options.wsEndpoint, transport => {
return CRBrowser.connect(transport, options.slowMo);
});
}

executablePath(): string {
Expand Down
50 changes: 27 additions & 23 deletions src/server/firefox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,24 @@
* limitations under the License.
*/

import { FFBrowser } from '../firefox/ffBrowser';
import { BrowserFetcher, OnProgressCallback, BrowserFetcherOptions } from './browserFetcher';
import { DeviceDescriptors } from '../deviceDescriptors';
import { launchProcess, waitForLine } from './processLauncher';
import * as types from '../types';
import * as platform from '../platform';
import { kBrowserCloseMessageId } from '../firefox/ffConnection';
import * as fs from 'fs';
import * as os from 'os';
import * as path from 'path';
import * as util from 'util';
import { ConnectOptions, LaunchType } from '../browser';
import { BrowserContext } from '../browserContext';
import { DeviceDescriptors } from '../deviceDescriptors';
import { TimeoutError } from '../errors';
import { Events } from '../events';
import { FFBrowser } from '../firefox/ffBrowser';
import { kBrowserCloseMessageId } from '../firefox/ffConnection';
import { assert, helper } from '../helper';
import { LaunchOptions, BrowserArgOptions, BrowserType } from './browserType';
import { ConnectOptions, LaunchType } from '../browser';
import * as platform from '../platform';
import * as types from '../types';
import { BrowserFetcher, BrowserFetcherOptions, OnProgressCallback } from './browserFetcher';
import { BrowserServer } from './browserServer';
import { Events } from '../events';
import { ConnectionTransport } from '../transport';
import { BrowserContext } from '../browserContext';
import { BrowserArgOptions, BrowserType, LaunchOptions } from './browserType';
import { launchProcess, waitForLine } from './processLauncher';

const mkdtempAsync = platform.promisify(fs.mkdtemp);

Expand Down Expand Up @@ -62,30 +61,34 @@ export class Firefox implements BrowserType {
async launch(options?: LaunchOptions & { slowMo?: number }): Promise<FFBrowser> {
if (options && (options as any).userDataDir)
throw new Error('userDataDir option is not supported in `browserType.launch`. Use `browserType.launchPersistent` instead');
const { browserServer, transport } = await this._launchServer(options, 'local');
const browser = await FFBrowser.connect(transport!, options && options.slowMo);
const browserServer = await this._launchServer(options, 'local');
const browser = await platform.connectToWebsocket(browserServer.wsEndpoint()!, transport => {
return FFBrowser.connect(transport, options && options.slowMo);
});
// Hack: for typical launch scenario, ensure that close waits for actual process termination.
browser.close = () => browserServer.close();
(browser as any)['__server__'] = browserServer;
return browser;
}

async launchServer(options?: LaunchOptions & { port?: number }): Promise<BrowserServer> {
return (await this._launchServer(options, 'server', undefined, options && options.port)).browserServer;
return await this._launchServer(options, 'server', undefined, options && options.port);
}

async launchPersistent(userDataDir: string, options?: LaunchOptions): Promise<BrowserContext> {
const { timeout = 30000 } = options || {};
const { browserServer, transport } = await this._launchServer(options, 'persistent', userDataDir);
const browser = await FFBrowser.connect(transport!);
const browserServer = await this._launchServer(options, 'persistent', userDataDir);
const browser = await platform.connectToWebsocket(browserServer.wsEndpoint()!, transport => {
return FFBrowser.connect(transport);
});
await helper.waitWithTimeout(browser._waitForTarget(t => t.type() === 'page'), 'first page', timeout);
// Hack: for typical launch scenario, ensure that close waits for actual process termination.
const browserContext = browser._defaultContext;
browserContext.close = () => browserServer.close();
return browserContext;
}

private async _launchServer(options: LaunchOptions = {}, launchType: LaunchType, userDataDir?: string, port?: number): Promise<{ browserServer: BrowserServer, transport?: ConnectionTransport }> {
private async _launchServer(options: LaunchOptions = {}, launchType: LaunchType, userDataDir?: string, port?: number): Promise<BrowserServer> {
const {
ignoreDefaultArgs = false,
args = [],
Expand Down Expand Up @@ -142,7 +145,7 @@ export class Firefox implements BrowserType {
// We try to gracefully close to prevent crash reporting and core dumps.
// Note that it's fine to reuse the pipe transport, since
// our connection ignores kBrowserCloseMessageId.
const transport = new platform.WebSocketTransport(browserWSEndpoint);
const transport = await platform.connectToWebsocket(browserWSEndpoint, async transport => transport);
const message = { method: 'Browser.close', params: {}, id: kBrowserCloseMessageId };
await transport.send(JSON.stringify(message));
},
Expand All @@ -155,13 +158,14 @@ export class Firefox implements BrowserType {
const timeoutError = new TimeoutError(`Timed out after ${timeout} ms while trying to connect to Firefox!`);
const match = await waitForLine(launchedProcess, launchedProcess.stdout, /^Juggler listening on (ws:\/\/.*)$/, timeout, timeoutError);
const browserWSEndpoint = match[1];
browserServer = new BrowserServer(launchedProcess, gracefullyClose, launchType === 'server' ? browserWSEndpoint : null);
return { browserServer, transport: launchType === 'server' ? undefined : new platform.WebSocketTransport(browserWSEndpoint) };
browserServer = new BrowserServer(launchedProcess, gracefullyClose, browserWSEndpoint);
return browserServer;
}

async connect(options: ConnectOptions): Promise<FFBrowser> {
const transport = new platform.WebSocketTransport(options.wsEndpoint);
return FFBrowser.connect(transport, options.slowMo);
return await platform.connectToWebsocket(options.wsEndpoint, transport => {
return FFBrowser.connect(transport, options.slowMo);
});
}

executablePath(): string {
Expand Down
7 changes: 4 additions & 3 deletions src/server/webkit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,9 @@ export class WebKit implements BrowserType {
}

async connect(options: ConnectOptions): Promise<WKBrowser> {
const transport = new platform.WebSocketTransport(options.wsEndpoint);
return WKBrowser.connect(transport, options.slowMo);
return await platform.connectToWebsocket(options.wsEndpoint, transport => {
return WKBrowser.connect(transport, options.slowMo);
});
}

executablePath(): string {
Expand Down Expand Up @@ -271,7 +272,7 @@ class SequenceNumberMixer<V> {
}
}

async function wrapTransportWithWebSocket(transport: ConnectionTransport, port: number) {
function wrapTransportWithWebSocket(transport: ConnectionTransport, port: number) {
const server = new ws.Server({ port });
const guid = uuidv4();
const idMixer = new SequenceNumberMixer<{id: number, socket: ws}>();
Expand Down
15 changes: 9 additions & 6 deletions src/web.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,23 @@ import * as platform from './platform';
const connect = {
chromium: {
connect: async (url: string) => {
const transport = new platform.WebSocketTransport(url);
return ChromiumBrowser.connect(transport);
return await platform.connectToWebsocket(url, transport => {
return ChromiumBrowser.connect(transport);
});
}
},
webkit: {
connect: async (url: string) => {
const transport = new platform.WebSocketTransport(url);
return WebKitBrowser.connect(transport);
return await platform.connectToWebsocket(url, transport => {
return WebKitBrowser.connect(transport);
});
}
},
firefox: {
connect: async (url: string) => {
const transport = new platform.WebSocketTransport(url);
return FirefoxBrowser.connect(transport);
return await platform.connectToWebsocket(url, transport => {
return FirefoxBrowser.connect(transport);
});
}
}
};
Expand Down