Skip to content

Commit

Permalink
chore(rpc): bootstrap demo for rpc (#2741)
Browse files Browse the repository at this point in the history
  • Loading branch information
pavelfeldman authored Jun 27, 2020
1 parent 4e94bda commit e920fde
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 12 deletions.
33 changes: 33 additions & 0 deletions src/rpc/client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Copyright (c) Microsoft Corporation.
*
* Licensed under the Apache License, Version 2.0 (the 'License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import * as childProcess from 'child_process';
import * as path from 'path';
import { Connection } from './connection';
import { Transport } from './transport';

(async () => {
const spawnedProcess = childProcess.fork(path.join(__dirname, 'server'), [], { stdio: 'pipe' });
const transport = new Transport(spawnedProcess.stdin, spawnedProcess.stdout);
const connection = new Connection();
connection.onmessage = message => transport.send(message);
transport.onmessage = message => connection.send(message);

const chromium = await connection.waitForObjectWithKnownName('chromium');
const browser = await chromium.launch({ headless: false });
const page = await browser.newPage();
await page.goto('https://example.com');
})();
6 changes: 3 additions & 3 deletions src/rpc/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import { parseError } from './serializers';
export class Connection {
private _channels = new Map<string, Channel>();
private _waitingForObject = new Map<string, any>();
sendMessageToServerTransport = (message: string): void => {};
onmessage = (message: string): void => {};
private _lastId = 0;
private _callbacks = new Map<number, { resolve: (a: any) => void, reject: (a: Error) => void }>();

Expand Down Expand Up @@ -110,11 +110,11 @@ export class Connection {
const id = ++this._lastId;
const converted = { id, ...message, params: this._replaceChannelsWithGuids(message.params) };
debug('pw:channel:command')(converted);
this.sendMessageToServerTransport(JSON.stringify(converted));
this.onmessage(JSON.stringify(converted));
return new Promise((resolve, reject) => this._callbacks.set(id, { resolve, reject }));
}

dispatchMessageFromServer(message: string) {
send(message: string) {
const parsedMessage = JSON.parse(message);
const { id, guid, method, params, result, error } = parsedMessage;
if (id) {
Expand Down
10 changes: 5 additions & 5 deletions src/rpc/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,21 @@ export class Dispatcher<Type, Initializer> extends EventEmitter implements Chann
export class DispatcherScope {
readonly dispatchers = new Map<string, Dispatcher<any, any>>();
readonly dispatcherSymbol = Symbol('dispatcher');
sendMessageToClientTransport = (message: string) => {};
onmessage = (message: string) => {};

async sendMessageToClient(guid: string, method: string, params: any): Promise<any> {
this.sendMessageToClientTransport(JSON.stringify({ guid, method, params: this._replaceDispatchersWithGuids(params) }));
this.onmessage(JSON.stringify({ guid, method, params: this._replaceDispatchersWithGuids(params) }));
}

async dispatchMessageFromClient(message: string) {
async send(message: string) {
const parsedMessage = JSON.parse(message);
const { id, guid, method, params } = parsedMessage;
const dispatcher = this.dispatchers.get(guid)!;
try {
const result = await (dispatcher as any)[method](this._replaceGuidsWithDispatchers(params));
this.sendMessageToClientTransport(JSON.stringify({ id, result: this._replaceDispatchersWithGuids(result) }));
this.onmessage(JSON.stringify({ id, result: this._replaceDispatchersWithGuids(result) }));
} catch (e) {
this.sendMessageToClientTransport(JSON.stringify({ id, error: serializeError(e) }));
this.onmessage(JSON.stringify({ id, error: serializeError(e) }));
}
}

Expand Down
30 changes: 30 additions & 0 deletions src/rpc/server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Copyright (c) Microsoft Corporation.
*
* Licensed under the Apache License, Version 2.0 (the 'License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { Transport } from './transport';
import { DispatcherScope } from './dispatcher';
import { Playwright } from '../server/playwright';
import { BrowserTypeDispatcher } from './server/browserTypeDispatcher';

const dispatcherScope = new DispatcherScope();
const transport = new Transport(process.stdout, process.stdin);
transport.onmessage = message => dispatcherScope.send(message);
dispatcherScope.onmessage = message => transport.send(message);

const playwright = new Playwright(__dirname, require('../../browsers.json')['browsers']);
BrowserTypeDispatcher.from(dispatcherScope, playwright.chromium!);
BrowserTypeDispatcher.from(dispatcherScope, playwright.firefox!);
BrowserTypeDispatcher.from(dispatcherScope, playwright.webkit!);
77 changes: 77 additions & 0 deletions src/rpc/transport.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/**
* Copyright (c) Microsoft Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { helper } from '../helper';

export class Transport {
private _pipeWrite: NodeJS.WritableStream;
private _data = Buffer.from([]);
private _waitForNextTask = helper.makeWaitForNextTask();
private _closed = false;
private _bytesLeft = 0;

onmessage?: (message: any) => void;
onclose?: () => void;

constructor(pipeWrite: NodeJS.WritableStream, pipeRead: NodeJS.ReadableStream) {
this._pipeWrite = pipeWrite;
pipeRead.on('data', buffer => this._dispatch(buffer));
this.onmessage = undefined;
this.onclose = undefined;
}

send(message: any) {
if (this._closed)
throw new Error('Pipe has been closed');
const data = Buffer.from(JSON.stringify(message), 'utf-8');
const dataLength = Buffer.alloc(4);
dataLength.writeUInt32LE(data.length, 0);
this._pipeWrite.write(dataLength);
this._pipeWrite.write(data);
}

close() {
throw new Error('unimplemented');
}

_dispatch(buffer: Buffer) {
this._data = Buffer.concat([this._data, buffer]);
while (true) {
if (!this._bytesLeft && this._data.length < 4) {
// Need more data.
break;
}

if (!this._bytesLeft) {
this._bytesLeft = this._data.readUInt32LE(0);
this._data = this._data.slice(4);
}

if (!this._bytesLeft || this._data.length < this._bytesLeft) {
// Need more data.
break;
}

const message = this._data.slice(0, this._bytesLeft);
this._data = this._data.slice(this._bytesLeft);
this._bytesLeft = 0;
this._waitForNextTask(() => {
if (this.onmessage)
this.onmessage.call(null, JSON.parse(message.toString('utf-8')));
});
}
}
}
8 changes: 4 additions & 4 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,11 @@ function collect(browserNames) {
if (process.env.PWCHANNEL) {
const dispatcherScope = new DispatcherScope();
const connection = new Connection();
dispatcherScope.sendMessageToClientTransport = async message => {
setImmediate(() => connection.dispatchMessageFromServer(message));
dispatcherScope.onmessage = async message => {
setImmediate(() => connection.send(message));
};
connection.sendMessageToServerTransport = async message => {
const result = await dispatcherScope.dispatchMessageFromClient(message);
connection.onmessage = async message => {
const result = await dispatcherScope.send(message);
await new Promise(f => setImmediate(f));
return result;
};
Expand Down

0 comments on commit e920fde

Please sign in to comment.