Skip to content
This repository has been archived by the owner on Jun 10, 2022. It is now read-only.

Improvements to client join process #155

Merged
merged 6 commits into from
Jan 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
"port": 9229,
"cwd": "${workspaceFolder}/packages/functional-tests",
"autoAttachChildProcesses": true,
"console": "internalConsole",
"internalConsoleOptions": "openOnSessionStart",
"console": "integratedTerminal",
"internalConsoleOptions": "neverOpen",
"outputCapture": "std"
}
]
Expand Down
68 changes: 46 additions & 22 deletions packages/sdk/src/adapters/multipeer/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import QueryString from 'query-string';
import * as Restify from 'restify';
import UUID from 'uuid/v4';
import * as WS from 'ws';
import { Adapter, AdapterOptions } from '..';
import { Adapter, AdapterOptions, ClientHandshake, ClientStartup } from '..';
import { Context, ParameterSet, Pipe, WebSocket } from '../../';
import * as Constants from '../../constants';
import verifyClient from '../../utils/verifyClient';
Expand Down Expand Up @@ -61,7 +61,7 @@ export class MultipeerAdapter extends Adapter {
/**
* Start the adapter listening for new incoming connections from engine clients
*/
public listen(): Promise<Restify.Server> {
public listen() {
if (!this.server) {
// If necessary, create a new web server
return new Promise<Restify.Server>((resolve) => {
Expand All @@ -78,7 +78,7 @@ export class MultipeerAdapter extends Adapter {
}
}

private async getOrCreateSession(sessionId: string, params: ParameterSet): Promise<Session> {
private async getOrCreateSession(sessionId: string, params: ParameterSet) {
let session = this.sessions[sessionId];
if (!session) {
// Create an in-memory "connection" (If the app were running remotely, we would connect
Expand All @@ -89,20 +89,20 @@ export class MultipeerAdapter extends Adapter {
sessionId,
connection: pipe.remote
});
// Start the connection update loop (todo move this)
context.internal.start();
// Startup the context.
context.internal.start().catch(() => pipe.remote.close());
// Instantiate a new session
session = this.sessions[sessionId] = new Session(
pipe.local, sessionId, this.options.peerAuthoritative);
// Handle session close
const $this = this;
session.on('close', () => delete $this.sessions[sessionId]);
// Connect the session to the context
await session.connect();
await session.connect(); // Allow exceptions to propagate.
// Pass the new context to the app
this.emitter.emit('connection', context, params);
}
return Promise.resolve(session);
return session;
}

private startListening() {
Expand All @@ -111,28 +111,52 @@ export class MultipeerAdapter extends Adapter {

// Handle WebSocket connection upgrades
wss.on('connection', async (ws: WS, request: http.IncomingMessage) => {
log.info('network', "New Multi-peer connection");
try {
log.info('network', "New Multi-peer connection");

// Read the sessionId header.
let sessionId = request.headers[Constants.HTTPHeaders.SessionID] as string || UUID();
sessionId = decodeURIComponent(sessionId);
// Read the sessionId header.
let sessionId = request.headers[Constants.HTTPHeaders.SessionID] as string || UUID();
sessionId = decodeURIComponent(sessionId);

// Parse URL parameters.
const params = QueryString.parseUrl(request.url).query;
// Parse URL parameters.
const params = QueryString.parseUrl(request.url).query;

// Get the session for the sessionId
const session = await this.getOrCreateSession(sessionId, params);
// Get the client's IP address rather than the last proxy connecting to you.
const address = forwarded(request, request.headers);

// Create a WebSocket for this connection.
const conn = new WebSocket(ws, address.ip);

// Instantiate a client for this connection.
const client = new Client(conn);

// Get the client's IP address rather than the last proxy connecting to you
const address = forwarded(request, request.headers);
// Join the client to the session.
await this.joinClientToSession(client, sessionId, params);
} catch (e) {
log.error('network', e);
ws.close();
}
});
}

const conn = new WebSocket(ws, address.ip);
private async joinClientToSession(client: Client, sessionId: string, params: QueryString.OutputParams) {
try {
// Handshake with the client.
const handshake = new ClientHandshake(client, sessionId);
await handshake.run();

// Instantiate a client for this connection
const client = new Client(conn);
// Measure the connection quality and wait for sync-request message.
const startup = new ClientStartup(client);
await startup.run();

// Join the client to the session
// Get the session for the sessionId.
const session = await this.getOrCreateSession(sessionId, params);

// Join the client to the session.
await session.join(client);
});
} catch (e) {
log.error('network', e);
client.conn.close();
}
}
}
55 changes: 26 additions & 29 deletions packages/sdk/src/adapters/multipeer/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,40 +65,37 @@ export class Client extends EventEmitter {
/**
* Syncs state with the client
*/
public join(session: Session): Promise<void> {
this._session = session;
return new Promise<void>((resolve, reject) => {
// Handshake with the client
const handshake = this._protocol = new ClientHandshake(this);
handshake.on('protocol.handshake-complete', () => {
// Sync state to the client
const sync = this._protocol = new ClientSync(this);
sync.on('protocol.sync-complete', () => {
// Join the session as a user
const execution = this._protocol = new ClientExecution(this);
execution.on('recv', (message) => this.emit('recv', this, message));
execution.startListening();
resolve();
});
sync.startListening();
});
handshake.startListening();
});
public async join(session: Session) {
try {
this._session = session;
// Sync state to the client
const sync = this._protocol = new ClientSync(this);
await sync.run();
// Join the session as a user
const execution = this._protocol = new ClientExecution(this);
execution.on('recv', (message) => this.emit('recv', this, message));
execution.startListening();
} catch (e) {
log.error('network', e);
this.leave();
}
}

public leave() {
if (this._protocol) {
this._protocol.stopListening();
this._protocol = undefined;
}
this._conn.off('close', this.leave);
this._conn.off('error', this.leave);
this._conn.close();
this._session = undefined;
this.emit('close');
try {
if (this._protocol) {
this._protocol.stopListening();
this._protocol = undefined;
}
this._conn.off('close', this.leave);
this._conn.off('error', this.leave);
this._conn.close();
this._session = undefined;
this.emit('close');
} catch { }
}

public joinedOrLeft(): Promise<void> {
public joinedOrLeft() {
if (this.protocol && this.protocol.constructor.name === "ClientExecution") {
return Promise.resolve();
}
Expand Down
12 changes: 9 additions & 3 deletions packages/sdk/src/adapters/multipeer/protocols/clientExecution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,24 @@ export class ClientExecution extends Protocols.Protocol implements Protocols.Mid
private setHeartbeatTimer(): NodeJS.Timer {
return setTimeout(async () => {
if (this.heartbeatTimer) {
await this.heartbeat.send();
this.heartbeatTimer = this.setHeartbeatTimer();
try {
await this.heartbeat.send();
this.heartbeatTimer = this.setHeartbeatTimer();
} catch {
this.client.leave();
this.resolve();
}
}
// Irregular heartbeats are a good thing in this instance.
}, 1000 * (4 + 2 * Math.random()));
}, 1000 * (5 + 5 * Math.random()));
}

public beforeRecv = (message: Message): Message => {
if (this.promises[message.replyToId]) {
// If we have a queued promise for this message, let it through
return message;
} else {
// Notify listeners we received a message.
this.emit('recv', message);
// Cancel the message
return undefined;
Expand Down
37 changes: 5 additions & 32 deletions packages/sdk/src/adapters/multipeer/protocols/clientHandshake.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,47 +3,20 @@
* Licensed under the MIT License.
*/

import { Client, MissingRule, Rules } from '..';
import { Message } from '../../..';
import { Client } from '../../..';
import { Handshake } from '../../../protocols/handshake';
import { OperatingModel } from '../../../types/network/operatingModel';
import { ExportedPromise } from '../../../utils/exportedPromise';

/**
* @hidden
*/
export class ClientHandshake extends Handshake {
/** @override */
public get name(): string { return `${this.constructor.name} client ${this.client.id}`; }

constructor(private client: Client) {
super(client.conn, client.session.sessionId, OperatingModel.PeerAuthoritative);
public get name(): string {
return `${this.constructor.name} client ${this.client.id}`;
}

/**
* @override
* Handle the outgoing message according to the handshake rules specified for this payload.
*/
public sendMessage(message: Message, promise?: ExportedPromise) {
const rule = Rules[message.payload.type] || MissingRule;
const handling = rule.handshake.during;
// tslint:disable-next-line:switch-default
switch (handling) {
case 'allow': {
super.sendMessage(message, promise);
break;
}
case 'queue': {
this.client.queueMessage(message, promise);
break;
}
case 'ignore': {
break;
}
case 'error': {
// tslint:disable-next-line:no-console
console.log(`[ERROR] ${this.name}: Invalid message for send during handshake: ${message.payload.type}`);
}
}
constructor(private client: Client, sessionId: string) {
super(client.conn, sessionId, OperatingModel.PeerAuthoritative);
}
}
29 changes: 29 additions & 0 deletions packages/sdk/src/adapters/multipeer/protocols/clientStartup.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*!
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/

import { Client } from '..';
import * as Protocols from '../../../protocols';
import * as Payloads from '../../../types/network/payloads';

export class ClientStartup extends Protocols.Protocol {
/** @override */
public get name(): string { return `${this.constructor.name} client ${this.client.id}`; }

constructor(private client: Client) {
super(client.conn);
// Behave like a server-side endpoint (send heartbeats, measure connection quality).
this.use(new Protocols.ServerPreprocessing());
}

/**
* @hidden
*/
public 'recv-sync-request' = async (payload: Payloads.SyncRequest) => {
// Do a quick measurement of connection latency.
const heartbeat = new Protocols.Heartbeat(this);
await heartbeat.runIterations(10); // Allow exceptions to propagate out.
this.resolve();
}
}
Loading