diff --git a/src/rpc/client.ts b/src/rpc/client.ts new file mode 100644 index 0000000000000..64d6298546165 --- /dev/null +++ b/src/rpc/client.ts @@ -0,0 +1,33 @@ +/** + * Copyright (c) Microsoft Corporation. + * + * Licensed under the Apache License, Version 2.0 (the 'License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as childProcess from 'child_process'; +import * as path from 'path'; +import { Connection } from './connection'; +import { Transport } from './transport'; + +(async () => { + const spawnedProcess = childProcess.fork(path.join(__dirname, 'server'), [], { stdio: 'pipe' }); + const transport = new Transport(spawnedProcess.stdin, spawnedProcess.stdout); + const connection = new Connection(); + connection.onmessage = message => transport.send(message); + transport.onmessage = message => connection.send(message); + + const chromium = await connection.waitForObjectWithKnownName('chromium'); + const browser = await chromium.launch({ headless: false }); + const page = await browser.newPage(); + await page.goto('https://example.com'); +})(); diff --git a/src/rpc/connection.ts b/src/rpc/connection.ts index 613c82e52871c..d15eb35ed71bd 100644 --- a/src/rpc/connection.ts +++ b/src/rpc/connection.ts @@ -34,7 +34,7 @@ import { parseError } from './serializers'; export class Connection { private _channels = new Map(); private _waitingForObject = new Map(); - sendMessageToServerTransport = (message: string): void => {}; + onmessage = (message: string): void => {}; private _lastId = 0; private _callbacks = new Map void, reject: (a: Error) => void }>(); @@ -110,11 +110,11 @@ export class Connection { const id = ++this._lastId; const converted = { id, ...message, params: this._replaceChannelsWithGuids(message.params) }; debug('pw:channel:command')(converted); - this.sendMessageToServerTransport(JSON.stringify(converted)); + this.onmessage(JSON.stringify(converted)); return new Promise((resolve, reject) => this._callbacks.set(id, { resolve, reject })); } - dispatchMessageFromServer(message: string) { + send(message: string) { const parsedMessage = JSON.parse(message); const { id, guid, method, params, result, error } = parsedMessage; if (id) { diff --git a/src/rpc/dispatcher.ts b/src/rpc/dispatcher.ts index 547cc28cb55d3..81015994d9a7c 100644 --- a/src/rpc/dispatcher.ts +++ b/src/rpc/dispatcher.ts @@ -44,21 +44,21 @@ export class Dispatcher extends EventEmitter implements Chann export class DispatcherScope { readonly dispatchers = new Map>(); readonly dispatcherSymbol = Symbol('dispatcher'); - sendMessageToClientTransport = (message: string) => {}; + onmessage = (message: string) => {}; async sendMessageToClient(guid: string, method: string, params: any): Promise { - this.sendMessageToClientTransport(JSON.stringify({ guid, method, params: this._replaceDispatchersWithGuids(params) })); + this.onmessage(JSON.stringify({ guid, method, params: this._replaceDispatchersWithGuids(params) })); } - async dispatchMessageFromClient(message: string) { + async send(message: string) { const parsedMessage = JSON.parse(message); const { id, guid, method, params } = parsedMessage; const dispatcher = this.dispatchers.get(guid)!; try { const result = await (dispatcher as any)[method](this._replaceGuidsWithDispatchers(params)); - this.sendMessageToClientTransport(JSON.stringify({ id, result: this._replaceDispatchersWithGuids(result) })); + this.onmessage(JSON.stringify({ id, result: this._replaceDispatchersWithGuids(result) })); } catch (e) { - this.sendMessageToClientTransport(JSON.stringify({ id, error: serializeError(e) })); + this.onmessage(JSON.stringify({ id, error: serializeError(e) })); } } diff --git a/src/rpc/server.ts b/src/rpc/server.ts new file mode 100644 index 0000000000000..1918200093db1 --- /dev/null +++ b/src/rpc/server.ts @@ -0,0 +1,30 @@ +/** + * Copyright (c) Microsoft Corporation. + * + * Licensed under the Apache License, Version 2.0 (the 'License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Transport } from './transport'; +import { DispatcherScope } from './dispatcher'; +import { Playwright } from '../server/playwright'; +import { BrowserTypeDispatcher } from './server/browserTypeDispatcher'; + +const dispatcherScope = new DispatcherScope(); +const transport = new Transport(process.stdout, process.stdin); +transport.onmessage = message => dispatcherScope.send(message); +dispatcherScope.onmessage = message => transport.send(message); + +const playwright = new Playwright(__dirname, require('../../browsers.json')['browsers']); +BrowserTypeDispatcher.from(dispatcherScope, playwright.chromium!); +BrowserTypeDispatcher.from(dispatcherScope, playwright.firefox!); +BrowserTypeDispatcher.from(dispatcherScope, playwright.webkit!); diff --git a/src/rpc/transport.ts b/src/rpc/transport.ts new file mode 100644 index 0000000000000..c28557099c463 --- /dev/null +++ b/src/rpc/transport.ts @@ -0,0 +1,77 @@ +/** + * Copyright (c) Microsoft Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { helper } from '../helper'; + +export class Transport { + private _pipeWrite: NodeJS.WritableStream; + private _data = Buffer.from([]); + private _waitForNextTask = helper.makeWaitForNextTask(); + private _closed = false; + private _bytesLeft = 0; + + onmessage?: (message: any) => void; + onclose?: () => void; + + constructor(pipeWrite: NodeJS.WritableStream, pipeRead: NodeJS.ReadableStream) { + this._pipeWrite = pipeWrite; + pipeRead.on('data', buffer => this._dispatch(buffer)); + this.onmessage = undefined; + this.onclose = undefined; + } + + send(message: any) { + if (this._closed) + throw new Error('Pipe has been closed'); + const data = Buffer.from(JSON.stringify(message), 'utf-8'); + const dataLength = Buffer.alloc(4); + dataLength.writeUInt32LE(data.length, 0); + this._pipeWrite.write(dataLength); + this._pipeWrite.write(data); + } + + close() { + throw new Error('unimplemented'); + } + + _dispatch(buffer: Buffer) { + this._data = Buffer.concat([this._data, buffer]); + while (true) { + if (!this._bytesLeft && this._data.length < 4) { + // Need more data. + break; + } + + if (!this._bytesLeft) { + this._bytesLeft = this._data.readUInt32LE(0); + this._data = this._data.slice(4); + } + + if (!this._bytesLeft || this._data.length < this._bytesLeft) { + // Need more data. + break; + } + + const message = this._data.slice(0, this._bytesLeft); + this._data = this._data.slice(this._bytesLeft); + this._bytesLeft = 0; + this._waitForNextTask(() => { + if (this.onmessage) + this.onmessage.call(null, JSON.parse(message.toString('utf-8'))); + }); + } + } +} diff --git a/test/test.js b/test/test.js index e0006a0509d85..3fd242ae63bc2 100644 --- a/test/test.js +++ b/test/test.js @@ -105,11 +105,11 @@ function collect(browserNames) { if (process.env.PWCHANNEL) { const dispatcherScope = new DispatcherScope(); const connection = new Connection(); - dispatcherScope.sendMessageToClientTransport = async message => { - setImmediate(() => connection.dispatchMessageFromServer(message)); + dispatcherScope.onmessage = async message => { + setImmediate(() => connection.send(message)); }; - connection.sendMessageToServerTransport = async message => { - const result = await dispatcherScope.dispatchMessageFromClient(message); + connection.onmessage = async message => { + const result = await dispatcherScope.send(message); await new Promise(f => setImmediate(f)); return result; };