Skip to content

Commit

Permalink
feat: improve request timeout handling
Browse files Browse the repository at this point in the history
  • Loading branch information
vlad-tkachenko committed Nov 4, 2023
1 parent 14cc37a commit 5069f60
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 136 deletions.
31 changes: 11 additions & 20 deletions src/Prxi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@ interface Proxy {

export class Prxi {
private server: Server = null;
private connections = new Set<Socket>();
private logInfo: (message?: any, ...params: any[]) => void;
private logError: (message?: any, ...params: any[]) => void;

constructor(private configuration: Configuration) {
// set default values
/* istanbul ignore next */
configuration.proxyRequestTimeout = configuration.proxyRequestTimeout ?? 60 * 1000;

const {logInfo, logError} = this.configuration;
this.logInfo = (msg) => {
// istanbul ignore next
Expand Down Expand Up @@ -69,7 +72,7 @@ export class Prxi {

let id = 0;
// create server
const server = this.server = createServer((req: IncomingMessage, res: ServerResponse) => {
const server = createServer((req: IncomingMessage, res: ServerResponse) => {
const requestId = id++;
const path = RequestUtils.getPath(req);

Expand Down Expand Up @@ -118,10 +121,7 @@ export class Prxi {

// keep track of all open connections
server.on('connection', (connection: Socket) => {
this.connections.add(connection);
connection.on('close', () => {
this.connections.delete(connection);
});
connection.setTimeout(this.configuration.proxyRequestTimeout);
});

// handle upgrade action
Expand Down Expand Up @@ -172,6 +172,7 @@ export class Prxi {
// start listening on incoming connections
await new Promise<void>(res => {
server.listen(port, hostname, () => {
this.server = server;
this.logInfo(`Prxi started listening on ${hostname}:${port}`);
res();
});
Expand Down Expand Up @@ -252,24 +253,14 @@ export class Prxi {
* Stop proxy service if running
*/
public async stop(): Promise<void> {
const server = this.server;

/* istanbul ignore next */
if (this.server) {
if (server) {
await new Promise<void>((res, rej) => {
const timer = setTimeout(() => {
this.connections.forEach((connection: Socket) => {
try {
connection.destroy();
} catch (e) {
this.logError('Failed to destroy connection', e);
}
});
}, this.configuration.proxyRequestTimeout ?? 60 * 1000);

this.logInfo('Stopping Prxi');

this.server.close((err) => {
clearTimeout(timer);

server.close((err) => {
if (err) {
this.logError('Failed to stop Prxi', err);
return rej(err);
Expand Down
64 changes: 27 additions & 37 deletions src/handlers/HttpProxyHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {request as httpsRequest} from 'node:https';

import { Configuration, ProxyRequestConfiguration } from "../interfaces";
import { UpstreamConfiguration } from "../interfaces/UpstreamConfiguration";
import { RequestUtils, Timer } from "../utils";
import { RequestUtils } from "../utils";

const emptyObj = {};

Expand Down Expand Up @@ -44,7 +44,6 @@ export class HttpProxyHandler {

this.logInfo(`[${requestId}] [HttpProxyHandler] Processing HTTP/HTTPS proxy request with method ${method} to ${target}${url}`);

const proxyRequestTimeout = this.configuration.proxyRequestTimeout ?? 60 * 1000;
const options: RequestOptions = {
method,
host,
Expand All @@ -58,49 +57,40 @@ export class HttpProxyHandler {
proxyConfiguration?.proxyRequestHeaders,
),
path: RequestUtils.concatPath(initialPath, url),
timeout: proxyRequestTimeout,
timeout: this.configuration.proxyRequestTimeout,
};

// setup timer to force incoming request to be destroyed after 2x of proxyRequestTimeout configuration setting
const timer = new Timer(() => {
req.destroy();
}, proxyRequestTimeout * 2);

const client = request(options);

try {
await new Promise<void>((resolve, reject) => {
req.pipe(client);

client.on('error', (err) => {
reject(err);
});
await new Promise<void>((resolve, reject) => {
req.pipe(client);

client.on('response', (response: IncomingMessage) => {
const headersToSet = RequestUtils.prepareProxyHeaders(
response.headers,
this.configuration.responseHeaders,
this.upstream.responseHeaders,
// istanbul ignore next
proxyConfiguration?.proxyResponseHeaders
);
RequestUtils.updateResponseHeaders(res, headersToSet);
client.on('error', (err) => {
reject(err);
});

// istanbul ignore else
if (!res.writableEnded) {
response.on('end', () => {
this.logInfo(`[${requestId}] [HttpProxyHandler] Proxy request with method ${method} to ${host}${url} completed`);
resolve();
});
client.on('response', (response: IncomingMessage) => {
const headersToSet = RequestUtils.prepareProxyHeaders(
response.headers,
this.configuration.responseHeaders,
this.upstream.responseHeaders,
// istanbul ignore next
proxyConfiguration?.proxyResponseHeaders
);
RequestUtils.updateResponseHeaders(res, headersToSet);

response.pipe(res);
} else {
// istanbul ignore else
if (!res.writableEnded) {
response.on('end', () => {
this.logInfo(`[${requestId}] [HttpProxyHandler] Proxy request with method ${method} to ${host}${url} completed`);
resolve();
}
});
});

response.pipe(res);
} else {
resolve();
}
});
} finally {
timer.cancel();
}
});
}
}
13 changes: 2 additions & 11 deletions src/handlers/WebSocketProxyHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { Socket } from "node:net";

import { Configuration, ProxyRequestConfiguration } from "../interfaces";
import { UpstreamConfiguration } from "../interfaces/UpstreamConfiguration";
import { RequestUtils, Timer, WebSocketUtils } from "../utils";
import { RequestUtils, WebSocketUtils } from "../utils";

const emptyObj = {};

Expand Down Expand Up @@ -40,13 +40,6 @@ export class WebSocketProxyHandler {
head: Buffer,
proxyConfiguration?: ProxyRequestConfiguration,
): Promise<void> {
const proxyRequestTimeout = this.configuration.proxyRequestTimeout ?? 60 * 1000;

// setup timer to force incoming request to be destroyed after 2x of proxyRequestTimeout configuration setting
const timer = new Timer(() => {
req.destroy();
}, proxyRequestTimeout * 2);

try {
WebSocketProxyHandler.debug.incomingSocket = socket;
proxyConfiguration = proxyConfiguration || emptyObj;
Expand Down Expand Up @@ -78,7 +71,7 @@ export class WebSocketProxyHandler {
proxyConfiguration?.proxyRequestHeaders,
),
path: RequestUtils.concatPath(initialPath, url),
timeout: proxyRequestTimeout,
timeout: this.configuration.proxyRequestTimeout,
};

const client = request(options);
Expand Down Expand Up @@ -158,7 +151,6 @@ export class WebSocketProxyHandler {
});

// keep sockets alive
timer.cancel();
WebSocketUtils.keepAlive(socket);
WebSocketUtils.keepAlive(proxySocket);

Expand All @@ -173,7 +165,6 @@ export class WebSocketProxyHandler {
});
});
} finally {
timer.cancel();
delete WebSocketProxyHandler.debug.incomingSocket;
delete WebSocketProxyHandler.debug.upstreamSocket;
delete WebSocketProxyHandler.debug.upstreamRequest;
Expand Down
21 changes: 0 additions & 21 deletions src/utils/Timer.ts

This file was deleted.

1 change: 0 additions & 1 deletion src/utils/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
export * from './RequestUtils';
export * from './Timer';
export * from './WebSocketUtils';
46 changes: 0 additions & 46 deletions test/HttpProxy.success.suite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,23 +87,6 @@ export class HttpProxySuccessSuite {
});
}

@test()
async destroyRequestDueToTimeoutSettings(): Promise<void> {
await this.initProxy({
proxyRequestTimeout: 0,
});

let err;
try {
const resp = await axios.get(`${this.proxyUrl}/headers`);
console.log(resp);
} catch (e) {
err = e;
}

strictEqual(err.message, 'socket hang up');
}

@test()
async headersRequest(): Promise<void> {
await this.initProxy();
Expand Down Expand Up @@ -149,35 +132,6 @@ export class HttpProxySuccessSuite {
});
}

@test()
async destroyWebsocketRequestDueToTimeoutSettings(): Promise<void> {
await this.initProxy({
proxyRequestTimeout: 0,
});

const sio = io(`http://localhost:${TestProxy.PORT}`, {
transports: ['websocket'],
reconnection: false,
});

let err: Error;
await new Promise<void>((res, rej) => {
const timeout = setTimeout(() => {
sio.disconnect();
rej(new Error('Unable to connect to WS'));
}, 2000);

sio.on('connect_error', (e) => {
err = e;
sio.disconnect();
clearTimeout(timeout);
res();
});
});

strictEqual(err.message, 'websocket error');
}

@test()
async websocket(): Promise<void> {
await this.initProxy();
Expand Down

0 comments on commit 5069f60

Please sign in to comment.