Skip to content
This repository has been archived by the owner on Jun 10, 2022. It is now read-only.

Commit

Permalink
Improvements to client join process (#155)
Browse files Browse the repository at this point in the history
* Debug in the terminal. network logging overwhelms the internal console.

* Sort the functional test menu.

* Remove position and rotation fields from rigid body.

* Lots of improvements to client join process.

* rollback server.ts

* rebase on red and resolve conflicts
  • Loading branch information
eanders-ms authored Jan 31, 2019
1 parent 993d484 commit edf03dc
Show file tree
Hide file tree
Showing 22 changed files with 394 additions and 399 deletions.
4 changes: 2 additions & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
"port": 9229,
"cwd": "${workspaceFolder}/packages/functional-tests",
"autoAttachChildProcesses": true,
"console": "internalConsole",
"internalConsoleOptions": "openOnSessionStart",
"console": "integratedTerminal",
"internalConsoleOptions": "neverOpen",
"outputCapture": "std"
}
]
Expand Down
68 changes: 46 additions & 22 deletions packages/sdk/src/adapters/multipeer/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import QueryString from 'query-string';
import * as Restify from 'restify';
import UUID from 'uuid/v4';
import * as WS from 'ws';
import { Adapter, AdapterOptions } from '..';
import { Adapter, AdapterOptions, ClientHandshake, ClientStartup } from '..';
import { Context, ParameterSet, Pipe, WebSocket } from '../../';
import * as Constants from '../../constants';
import verifyClient from '../../utils/verifyClient';
Expand Down Expand Up @@ -61,7 +61,7 @@ export class MultipeerAdapter extends Adapter {
/**
* Start the adapter listening for new incoming connections from engine clients
*/
public listen(): Promise<Restify.Server> {
public listen() {
if (!this.server) {
// If necessary, create a new web server
return new Promise<Restify.Server>((resolve) => {
Expand All @@ -78,7 +78,7 @@ export class MultipeerAdapter extends Adapter {
}
}

private async getOrCreateSession(sessionId: string, params: ParameterSet): Promise<Session> {
private async getOrCreateSession(sessionId: string, params: ParameterSet) {
let session = this.sessions[sessionId];
if (!session) {
// Create an in-memory "connection" (If the app were running remotely, we would connect
Expand All @@ -89,20 +89,20 @@ export class MultipeerAdapter extends Adapter {
sessionId,
connection: pipe.remote
});
// Start the connection update loop (todo move this)
context.internal.start();
// Startup the context.
context.internal.start().catch(() => pipe.remote.close());
// Instantiate a new session
session = this.sessions[sessionId] = new Session(
pipe.local, sessionId, this.options.peerAuthoritative);
// Handle session close
const $this = this;
session.on('close', () => delete $this.sessions[sessionId]);
// Connect the session to the context
await session.connect();
await session.connect(); // Allow exceptions to propagate.
// Pass the new context to the app
this.emitter.emit('connection', context, params);
}
return Promise.resolve(session);
return session;
}

private startListening() {
Expand All @@ -111,28 +111,52 @@ export class MultipeerAdapter extends Adapter {

// Handle WebSocket connection upgrades
wss.on('connection', async (ws: WS, request: http.IncomingMessage) => {
log.info('network', "New Multi-peer connection");
try {
log.info('network', "New Multi-peer connection");

// Read the sessionId header.
let sessionId = request.headers[Constants.HTTPHeaders.SessionID] as string || UUID();
sessionId = decodeURIComponent(sessionId);
// Read the sessionId header.
let sessionId = request.headers[Constants.HTTPHeaders.SessionID] as string || UUID();
sessionId = decodeURIComponent(sessionId);

// Parse URL parameters.
const params = QueryString.parseUrl(request.url).query;
// Parse URL parameters.
const params = QueryString.parseUrl(request.url).query;

// Get the session for the sessionId
const session = await this.getOrCreateSession(sessionId, params);
// Get the client's IP address rather than the last proxy connecting to you.
const address = forwarded(request, request.headers);

// Create a WebSocket for this connection.
const conn = new WebSocket(ws, address.ip);

// Instantiate a client for this connection.
const client = new Client(conn);

// Get the client's IP address rather than the last proxy connecting to you
const address = forwarded(request, request.headers);
// Join the client to the session.
await this.joinClientToSession(client, sessionId, params);
} catch (e) {
log.error('network', e);
ws.close();
}
});
}

const conn = new WebSocket(ws, address.ip);
private async joinClientToSession(client: Client, sessionId: string, params: QueryString.OutputParams) {
try {
// Handshake with the client.
const handshake = new ClientHandshake(client, sessionId);
await handshake.run();

// Instantiate a client for this connection
const client = new Client(conn);
// Measure the connection quality and wait for sync-request message.
const startup = new ClientStartup(client);
await startup.run();

// Join the client to the session
// Get the session for the sessionId.
const session = await this.getOrCreateSession(sessionId, params);

// Join the client to the session.
await session.join(client);
});
} catch (e) {
log.error('network', e);
client.conn.close();
}
}
}
55 changes: 26 additions & 29 deletions packages/sdk/src/adapters/multipeer/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,40 +65,37 @@ export class Client extends EventEmitter {
/**
* Syncs state with the client
*/
public join(session: Session): Promise<void> {
this._session = session;
return new Promise<void>((resolve, reject) => {
// Handshake with the client
const handshake = this._protocol = new ClientHandshake(this);
handshake.on('protocol.handshake-complete', () => {
// Sync state to the client
const sync = this._protocol = new ClientSync(this);
sync.on('protocol.sync-complete', () => {
// Join the session as a user
const execution = this._protocol = new ClientExecution(this);
execution.on('recv', (message) => this.emit('recv', this, message));
execution.startListening();
resolve();
});
sync.startListening();
});
handshake.startListening();
});
public async join(session: Session) {
try {
this._session = session;
// Sync state to the client
const sync = this._protocol = new ClientSync(this);
await sync.run();
// Join the session as a user
const execution = this._protocol = new ClientExecution(this);
execution.on('recv', (message) => this.emit('recv', this, message));
execution.startListening();
} catch (e) {
log.error('network', e);
this.leave();
}
}

public leave() {
if (this._protocol) {
this._protocol.stopListening();
this._protocol = undefined;
}
this._conn.off('close', this.leave);
this._conn.off('error', this.leave);
this._conn.close();
this._session = undefined;
this.emit('close');
try {
if (this._protocol) {
this._protocol.stopListening();
this._protocol = undefined;
}
this._conn.off('close', this.leave);
this._conn.off('error', this.leave);
this._conn.close();
this._session = undefined;
this.emit('close');
} catch { }
}

public joinedOrLeft(): Promise<void> {
public joinedOrLeft() {
if (this.protocol && this.protocol.constructor.name === "ClientExecution") {
return Promise.resolve();
}
Expand Down
12 changes: 9 additions & 3 deletions packages/sdk/src/adapters/multipeer/protocols/clientExecution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,24 @@ export class ClientExecution extends Protocols.Protocol implements Protocols.Mid
private setHeartbeatTimer(): NodeJS.Timer {
return setTimeout(async () => {
if (this.heartbeatTimer) {
await this.heartbeat.send();
this.heartbeatTimer = this.setHeartbeatTimer();
try {
await this.heartbeat.send();
this.heartbeatTimer = this.setHeartbeatTimer();
} catch {
this.client.leave();
this.resolve();
}
}
// Irregular heartbeats are a good thing in this instance.
}, 1000 * (4 + 2 * Math.random()));
}, 1000 * (5 + 5 * Math.random()));
}

public beforeRecv = (message: Message): Message => {
if (this.promises[message.replyToId]) {
// If we have a queued promise for this message, let it through
return message;
} else {
// Notify listeners we received a message.
this.emit('recv', message);
// Cancel the message
return undefined;
Expand Down
37 changes: 5 additions & 32 deletions packages/sdk/src/adapters/multipeer/protocols/clientHandshake.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,47 +3,20 @@
* Licensed under the MIT License.
*/

import { Client, MissingRule, Rules } from '..';
import { Message } from '../../..';
import { Client } from '../../..';
import { Handshake } from '../../../protocols/handshake';
import { OperatingModel } from '../../../types/network/operatingModel';
import { ExportedPromise } from '../../../utils/exportedPromise';

/**
* @hidden
*/
export class ClientHandshake extends Handshake {
/** @override */
public get name(): string { return `${this.constructor.name} client ${this.client.id}`; }

constructor(private client: Client) {
super(client.conn, client.session.sessionId, OperatingModel.PeerAuthoritative);
public get name(): string {
return `${this.constructor.name} client ${this.client.id}`;
}

/**
* @override
* Handle the outgoing message according to the handshake rules specified for this payload.
*/
public sendMessage(message: Message, promise?: ExportedPromise) {
const rule = Rules[message.payload.type] || MissingRule;
const handling = rule.handshake.during;
// tslint:disable-next-line:switch-default
switch (handling) {
case 'allow': {
super.sendMessage(message, promise);
break;
}
case 'queue': {
this.client.queueMessage(message, promise);
break;
}
case 'ignore': {
break;
}
case 'error': {
// tslint:disable-next-line:no-console
console.log(`[ERROR] ${this.name}: Invalid message for send during handshake: ${message.payload.type}`);
}
}
constructor(private client: Client, sessionId: string) {
super(client.conn, sessionId, OperatingModel.PeerAuthoritative);
}
}
29 changes: 29 additions & 0 deletions packages/sdk/src/adapters/multipeer/protocols/clientStartup.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*!
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/

import { Client } from '..';
import * as Protocols from '../../../protocols';
import * as Payloads from '../../../types/network/payloads';

export class ClientStartup extends Protocols.Protocol {
/** @override */
public get name(): string { return `${this.constructor.name} client ${this.client.id}`; }

constructor(private client: Client) {
super(client.conn);
// Behave like a server-side endpoint (send heartbeats, measure connection quality).
this.use(new Protocols.ServerPreprocessing());
}

/**
* @hidden
*/
public 'recv-sync-request' = async (payload: Payloads.SyncRequest) => {
// Do a quick measurement of connection latency.
const heartbeat = new Protocols.Heartbeat(this);
await heartbeat.runIterations(10); // Allow exceptions to propagate out.
this.resolve();
}
}
Loading

0 comments on commit edf03dc

Please sign in to comment.