Skip to content
This repository has been archived by the owner on Jun 10, 2022. It is now read-only.

Commit

Permalink
Support for selective message timeouts (#379)
Browse files Browse the repository at this point in the history
  • Loading branch information
eanders-ms authored Jul 9, 2019
1 parent 39aedff commit d65a41a
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 28 deletions.
5 changes: 3 additions & 2 deletions packages/sdk/src/adapters/multipeer/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import filterEmpty from '../../utils/filterEmpty';
export type QueuedMessage = {
message: Message;
promise?: ExportedPromise;
timeoutSeconds?: number;
};

/**
Expand Down Expand Up @@ -125,15 +126,15 @@ export class Client extends EventEmitter {
}
}

public queueMessage(message: Message, promise?: ExportedPromise) {
public queueMessage(message: Message, promise?: ExportedPromise, timeoutSeconds?: number) {
const rule = Rules[message.payload.type] || MissingRule;
const beforeQueueMessageForClient = rule.client.beforeQueueMessageForClient || (() => message);
message = beforeQueueMessageForClient(this.session, this, message, promise);
if (message) {
// tslint:disable-next-line:max-line-length
log.verbose('network', `Client ${this.id.substr(0, 8)} queue id:${message.id.substr(0, 8)}, type:${message.payload.type}`);
log.verbose('network-content', JSON.stringify(message, (key, value) => filterEmpty(value)));
this.queuedMessages.push({ message, promise });
this.queuedMessages.push({ message, promise, timeoutSeconds });
}
}

Expand Down
13 changes: 9 additions & 4 deletions packages/sdk/src/adapters/multipeer/protocols/clientExecution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
*/

import { Client, ClientDesyncPreprocessor } from '..';
import { MissingRule, Rules } from '..';
import { Message } from '../../..';
import * as Protocols from '../../../protocols';
import { ExportedPromise } from '../../../utils/exportedPromise';

/**
* @hidden
Expand All @@ -20,10 +22,6 @@ export class ClientExecution extends Protocols.Protocol implements Protocols.Mid

constructor(private client: Client) {
super(client.conn);
// Set timeout a little shorter than the app/session connection, ensuring we don't
// cause an app/session message timeout - which is not a supported scenario (there
// is no reconnect).
this.timeoutSeconds = Protocols.DefaultConnectionTimeoutSeconds * 2 / 3;
this.heartbeat = new Protocols.Heartbeat(this);
this.beforeRecv = this.beforeRecv.bind(this);
// Behave like a server-side endpoint (send heartbeats, measure connection quality)
Expand All @@ -34,6 +32,13 @@ export class ClientExecution extends Protocols.Protocol implements Protocols.Mid
this.use(this);
}

/** @override */
public sendMessage(message: Message, promise?: ExportedPromise, timeoutSeconds?: number) {
// Apply timeout to messages going to the client.
const rule = Rules[message.payload.type] || MissingRule;
super.sendMessage(message, promise, rule.client.timeoutSeconds);
}

public startListening() {
super.startListening();
if (!this.heartbeatTimer) {
Expand Down
10 changes: 10 additions & 0 deletions packages/sdk/src/adapters/multipeer/protocols/clientHandshake.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
* Licensed under the MIT License.
*/

import { MissingRule, Rules } from '..';
import { Client } from '../../..';
import { Message } from '../../..';
import { Handshake } from '../../../protocols/handshake';
import { OperatingModel } from '../../../types/network/operatingModel';
import { ExportedPromise } from '../../../utils/exportedPromise';

/**
* @hidden
Expand All @@ -17,4 +20,11 @@ export class ClientHandshake extends Handshake {
constructor(private client: Client, sessionId: string) {
super(client.conn, sessionId, OperatingModel.PeerAuthoritative);
}

/** @override */
public sendMessage(message: Message, promise?: ExportedPromise, timeoutSeconds?: number) {
// Apply timeout to messages going to the client.
const rule = Rules[message.payload.type] || MissingRule;
super.sendMessage(message, promise, rule.client.timeoutSeconds);
}
}
10 changes: 10 additions & 0 deletions packages/sdk/src/adapters/multipeer/protocols/clientStartup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
*/

import { Client } from '..';
import { MissingRule, Rules } from '..';
import { Message } from '../../..';
import * as Protocols from '../../../protocols';
import * as Payloads from '../../../types/network/payloads';
import { ExportedPromise } from '../../../utils/exportedPromise';

export class ClientStartup extends Protocols.Protocol {
/** @override */
Expand All @@ -23,6 +26,13 @@ export class ClientStartup extends Protocols.Protocol {
}
}

/** @override */
public sendMessage(message: Message, promise?: ExportedPromise, timeoutSeconds?: number) {
// Apply timeout to messages going to the client.
const rule = Rules[message.payload.type] || MissingRule;
super.sendMessage(message, promise, rule.client.timeoutSeconds);
}

/**
* @hidden
*/
Expand Down
8 changes: 4 additions & 4 deletions packages/sdk/src/adapters/multipeer/protocols/clientSync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,17 @@ export class ClientSync extends Protocols.Protocol {
* @override
* Handle the outgoing message according to the synchronization rules specified for this payload.
*/
public sendMessage(message: Message, promise?: ExportedPromise) {
public sendMessage(message: Message, promise?: ExportedPromise, timeoutSeconds?: number) {
message.id = message.id || UUID();
const handling = this.handlingForMessage(message);
// tslint:disable-next-line:switch-default
switch (handling) {
case 'allow': {
super.sendMessage(message, promise);
super.sendMessage(message, promise, timeoutSeconds);
break;
}
case 'queue': {
this.client.queueMessage(message, promise);
this.client.queueMessage(message, promise, timeoutSeconds);
break;
}
case 'ignore': {
Expand Down Expand Up @@ -327,7 +327,7 @@ export class ClientSync extends Protocols.Protocol {
break;
}
for (const queuedMessage of queuedMessages) {
this.sendMessage(queuedMessage.message, queuedMessage.promise);
this.sendMessage(queuedMessage.message, queuedMessage.promise, queuedMessage.timeoutSeconds);
}
await this.drainPromises();
} while (true);
Expand Down
24 changes: 22 additions & 2 deletions packages/sdk/src/adapters/multipeer/rules.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ export type Rule = {
* Message preprocessing applied by the Client class.
*/
client: {
/**
* If non-zero, a timeout will be set for this message. If we don't receive a reply before the
* timeout expires, the client connection will be closed. Only applicable to messages expecting
* replies.
*/
timeoutSeconds: number;
/**
* Called before a message is queued for later delivery to a client.
* @param session The current session.
Expand Down Expand Up @@ -129,6 +135,7 @@ export const DefaultRule: Rule = {
after: 'allow'
},
client: {
timeoutSeconds: 0,
beforeQueueMessageForClient: (
session: Session, client: Client, message: any, promise: ExportedPromise) => {
return message;
Expand Down Expand Up @@ -585,7 +592,13 @@ export const Rules: { [id in Payloads.PayloadType]: Rule } = {
'engine2app-rpc': ClientOnlyRule,

// ========================================================================
'handshake': ClientOnlyRule,
'handshake': {
...ClientOnlyRule,
client: {
...ClientOnlyRule.client,
timeoutSeconds: 30
},
},

// ========================================================================
'handshake-complete': ClientOnlyRule,
Expand All @@ -610,6 +623,10 @@ export const Rules: { [id in Payloads.PayloadType]: Rule } = {
during: 'allow',
after: 'allow',
},
client: {
...DefaultRule.client,
timeoutSeconds: 30
}
},

// ========================================================================
Expand Down Expand Up @@ -1004,7 +1021,6 @@ export const Rules: { [id in Payloads.PayloadType]: Rule } = {
return message;
}
}

},

// ========================================================================
Expand All @@ -1015,6 +1031,10 @@ export const Rules: { [id in Payloads.PayloadType]: Rule } = {
before: 'error',
during: 'allow',
after: 'error'
},
client: {
...DefaultRule.client,
timeoutSeconds: 30
}
},

Expand Down
19 changes: 3 additions & 16 deletions packages/sdk/src/protocols/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,12 @@ import { ExportedPromise } from '../utils/exportedPromise';
import filterEmpty from '../utils/filterEmpty';
import { Middleware } from './middleware';

// tslint:disable:variable-name
/**
* The amount of time to wait for a reply message before closing the connection.
* Set to zero to disable timeouts.
*/
export let DefaultConnectionTimeoutSeconds = 30;
// tslint:enable:variable-name

/**
* @hidden
* Class to handle sending and receiving messages with a client.
*/
export class Protocol extends EventEmitter {
private middlewares: Middleware[] = [];
// tslint:disable-next-line:variable-name
private _timeoutSeconds = DefaultConnectionTimeoutSeconds;

private promise: Promise<void>;
private promiseResolve: (value?: void | PromiseLike<void>) => void;
Expand All @@ -37,9 +27,6 @@ export class Protocol extends EventEmitter {
public get promises() { return this.conn.promises; }
public get name() { return this.constructor.name; }

public get timeoutSeconds() { return this._timeoutSeconds; }
public set timeoutSeconds(value) { this._timeoutSeconds = value; }

// tslint:disable-next-line:variable-name
constructor(private _conn: Connection) {
super();
Expand Down Expand Up @@ -84,7 +71,7 @@ export class Protocol extends EventEmitter {
this.sendMessage({ payload }, promise);
}

public sendMessage(message: Message, promise?: ExportedPromise) {
public sendMessage(message: Message, promise?: ExportedPromise, timeoutSeconds?: number) {
message.id = message.id || UUID();

// Run message through all the middlewares
Expand All @@ -102,14 +89,14 @@ export class Protocol extends EventEmitter {
}

const setReplyTimeout = () => {
if (this.timeoutSeconds > 0) {
if (timeoutSeconds > 0) {
return setTimeout(() => {
// tslint:disable-next-line:max-line-length
const reason = `${this.name} timed out awaiting response for ${message.payload.type}, id:${message.id}.`;
log.error('network', reason);
this.rejectPromiseForMessage(message.id, reason);
this.conn.close();
}, this.timeoutSeconds * 1000);
}, timeoutSeconds * 1000);
}
};

Expand Down

0 comments on commit d65a41a

Please sign in to comment.