Skip to content

Commit

Permalink
feat: improve request timeout handling
Browse files Browse the repository at this point in the history
  • Loading branch information
vlad-tkachenko committed Nov 2, 2023
1 parent 08c8b9b commit 14cc37a
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 44 deletions.
22 changes: 22 additions & 0 deletions src/Prxi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ interface Proxy {

export class Prxi {
private server: Server = null;
private connections = new Set<Socket>();
private logInfo: (message?: any, ...params: any[]) => void;
private logError: (message?: any, ...params: any[]) => void;

Expand Down Expand Up @@ -115,6 +116,14 @@ export class Prxi {
}
});

// keep track of all open connections
server.on('connection', (connection: Socket) => {
this.connections.add(connection);
connection.on('close', () => {
this.connections.delete(connection);
});
});

// handle upgrade action
server.on('upgrade', (req: IncomingMessage, socket: Socket, head: Buffer) => {
const requestId = id++;
Expand Down Expand Up @@ -246,8 +255,21 @@ export class Prxi {
/* istanbul ignore next */
if (this.server) {
await new Promise<void>((res, rej) => {
const timer = setTimeout(() => {
this.connections.forEach((connection: Socket) => {
try {
connection.destroy();
} catch (e) {
this.logError('Failed to destroy connection', e);
}
});
}, this.configuration.proxyRequestTimeout ?? 60 * 1000);

this.logInfo('Stopping Prxi');

this.server.close((err) => {
clearTimeout(timer);

if (err) {
this.logError('Failed to stop Prxi', err);
return rej(err);
Expand Down
65 changes: 37 additions & 28 deletions src/handlers/HttpProxyHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ import {request as httpsRequest} from 'node:https';

import { Configuration, ProxyRequestConfiguration } from "../interfaces";
import { UpstreamConfiguration } from "../interfaces/UpstreamConfiguration";
import { RequestUtils } from "../utils";

import { RequestUtils, Timer } from "../utils";

const emptyObj = {};

Expand Down Expand Up @@ -45,6 +44,7 @@ export class HttpProxyHandler {

this.logInfo(`[${requestId}] [HttpProxyHandler] Processing HTTP/HTTPS proxy request with method ${method} to ${target}${url}`);

const proxyRequestTimeout = this.configuration.proxyRequestTimeout ?? 60 * 1000;
const options: RequestOptions = {
method,
host,
Expand All @@ -58,40 +58,49 @@ export class HttpProxyHandler {
proxyConfiguration?.proxyRequestHeaders,
),
path: RequestUtils.concatPath(initialPath, url),
timeout: this.configuration.proxyRequestTimeout || 60 * 1000,
timeout: proxyRequestTimeout,
};

// setup timer to force incoming request to be destroyed after 2x of proxyRequestTimeout configuration setting
const timer = new Timer(() => {
req.destroy();
}, proxyRequestTimeout * 2);

const client = request(options);

await new Promise<void>((resolve, reject) => {
req.pipe(client);
try {
await new Promise<void>((resolve, reject) => {
req.pipe(client);

client.on('error', (err) => {
reject(err);
});
client.on('error', (err) => {
reject(err);
});

client.on('response', (response: IncomingMessage) => {
const headersToSet = RequestUtils.prepareProxyHeaders(
response.headers,
this.configuration.responseHeaders,
this.upstream.responseHeaders,
// istanbul ignore next
proxyConfiguration?.proxyResponseHeaders
);
RequestUtils.updateResponseHeaders(res, headersToSet);
client.on('response', (response: IncomingMessage) => {
const headersToSet = RequestUtils.prepareProxyHeaders(
response.headers,
this.configuration.responseHeaders,
this.upstream.responseHeaders,
// istanbul ignore next
proxyConfiguration?.proxyResponseHeaders
);
RequestUtils.updateResponseHeaders(res, headersToSet);

// istanbul ignore else
if (!res.writableEnded) {
response.on('end', () => {
this.logInfo(`[${requestId}] [HttpProxyHandler] Proxy request with method ${method} to ${host}${url} completed`);
resolve();
});
// istanbul ignore else
if (!res.writableEnded) {
response.on('end', () => {
this.logInfo(`[${requestId}] [HttpProxyHandler] Proxy request with method ${method} to ${host}${url} completed`);
resolve();
});

response.pipe(res);
} else {
resolve();
}
response.pipe(res);
} else {
resolve();
}
});
});
});
} finally {
timer.cancel();
}
}
}
19 changes: 13 additions & 6 deletions src/handlers/WebSocketProxyHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { Socket } from "node:net";

import { Configuration, ProxyRequestConfiguration } from "../interfaces";
import { UpstreamConfiguration } from "../interfaces/UpstreamConfiguration";
import { RequestUtils, WebSocketUtils } from "../utils";
import { RequestUtils, Timer, WebSocketUtils } from "../utils";

const emptyObj = {};

Expand Down Expand Up @@ -40,6 +40,13 @@ export class WebSocketProxyHandler {
head: Buffer,
proxyConfiguration?: ProxyRequestConfiguration,
): Promise<void> {
const proxyRequestTimeout = this.configuration.proxyRequestTimeout ?? 60 * 1000;

// setup timer to force incoming request to be destroyed after 2x of proxyRequestTimeout configuration setting
const timer = new Timer(() => {
req.destroy();
}, proxyRequestTimeout * 2);

try {
WebSocketProxyHandler.debug.incomingSocket = socket;
proxyConfiguration = proxyConfiguration || emptyObj;
Expand Down Expand Up @@ -71,7 +78,7 @@ export class WebSocketProxyHandler {
proxyConfiguration?.proxyRequestHeaders,
),
path: RequestUtils.concatPath(initialPath, url),
timeout: this.configuration.proxyRequestTimeout || 60 * 1000,
timeout: proxyRequestTimeout,
};

const client = request(options);
Expand All @@ -83,9 +90,6 @@ export class WebSocketProxyHandler {
socket.unshift(head);
}

// keep socket alive
WebSocketUtils.keepAlive(socket);

await new Promise<void>((resolve, reject) => {
req.pipe(client);

Expand Down Expand Up @@ -153,7 +157,9 @@ export class WebSocketProxyHandler {
reject(err);
});

// keep socket alive
// keep sockets alive
timer.cancel();
WebSocketUtils.keepAlive(socket);
WebSocketUtils.keepAlive(proxySocket);

const headersToSet = RequestUtils.prepareProxyHeaders(
Expand All @@ -167,6 +173,7 @@ export class WebSocketProxyHandler {
});
});
} finally {
timer.cancel();
delete WebSocketProxyHandler.debug.incomingSocket;
delete WebSocketProxyHandler.debug.upstreamSocket;
delete WebSocketProxyHandler.debug.upstreamRequest;
Expand Down
21 changes: 21 additions & 0 deletions src/utils/Timer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
export class Timer {
private timer;

constructor(fn: Function, delay: number) {
if (delay <= 0) {
return fn();
}

this.timer = setTimeout(fn, delay);
}

/**
* Cancel the timer
*/
public cancel(): void {
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
}
}
1 change: 1 addition & 0 deletions src/utils/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './RequestUtils';
export * from './Timer';
export * from './WebSocketUtils';
12 changes: 6 additions & 6 deletions test/HttpProxy.errors.suite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export class HttpProxyErrorSuite {

@test()
async addressNotFoundFailErrorHandler(): Promise<void> {
this.proxy = new TestProxy('non-existing-host');
this.proxy = new TestProxy({}, 'non-existing-host');
await this.proxy.start();

const result = axios.post(`${this.proxyUrl}/echo`, { test: true });
Expand All @@ -44,7 +44,7 @@ export class HttpProxyErrorSuite {
@test()
async addressNotFoundPassErrorHandler(): Promise<void> {
const customError = 'Custom Error';
this.proxy = new TestProxy('non-existing-host', async (req: IncomingMessage, res: ServerResponse, err: Error): Promise<void> => {
this.proxy = new TestProxy({}, 'non-existing-host', async (req: IncomingMessage, res: ServerResponse, err: Error): Promise<void> => {
match(err.message, /getaddrinfo .* non-existing-host/gi);

await writeJson(res, JSON.stringify({customError}));
Expand All @@ -58,7 +58,7 @@ export class HttpProxyErrorSuite {
@test()
async missingHandler(): Promise<void> {
let msg = null;
this.proxy = new TestProxy('localhost', async (req: IncomingMessage, res: ServerResponse, err?: Error) => {
this.proxy = new TestProxy({}, 'localhost', async (req: IncomingMessage, res: ServerResponse, err?: Error) => {
msg = err.message;
throw err;
}, false);
Expand All @@ -72,7 +72,7 @@ export class HttpProxyErrorSuite {
@test()
async noHandlers(): Promise<void> {
let msg = null;
this.proxy = new TestProxy('localhost', async (req: IncomingMessage, res: ServerResponse, err?: Error) => {
this.proxy = new TestProxy({}, 'localhost', async (req: IncomingMessage, res: ServerResponse, err?: Error) => {
msg = err.message;
throw err;
}, null);
Expand All @@ -85,7 +85,7 @@ export class HttpProxyErrorSuite {

@test()
async noWebSocketHandler(): Promise<void> {
this.proxy = new TestProxy('localhost', false, true, false);
this.proxy = new TestProxy({}, 'localhost', false, true, false);
await this.proxy.start();
const sio = io(`http://localhost:${TestProxy.PORT}`, {
transports: ['websocket'],
Expand All @@ -108,7 +108,7 @@ export class HttpProxyErrorSuite {

@test()
async failedWebSocketHandler(): Promise<void> {
this.proxy = new TestProxy('localhost', null, true, async () => {
this.proxy = new TestProxy({}, 'localhost', null, true, async () => {
throw new Error('test');
});
await this.proxy.start();
Expand Down
53 changes: 50 additions & 3 deletions test/HttpProxy.success.suite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { TestServer, TestProxy } from './helpers';
import axios from 'axios';
import {deepEqual, strictEqual} from 'assert';
import {io} from 'socket.io-client';
import { Configuration } from '../src';

@suite()
export class HttpProxySuccessSuite {
Expand Down Expand Up @@ -34,8 +35,8 @@ export class HttpProxySuccessSuite {
/**
* Init proxy server
*/
private async initProxy(): Promise<void> {
this.proxy = new TestProxy();
private async initProxy(configOverride: Partial<Configuration> = {}): Promise<void> {
this.proxy = new TestProxy(configOverride);
await this.proxy.start();
}

Expand All @@ -58,7 +59,7 @@ export class HttpProxySuccessSuite {
async customPath(): Promise<void> {
await this.after();
this.server = new TestServer(true, '/api');
this.proxy = new TestProxy('localhost', null, true, null, '/api');
this.proxy = new TestProxy({}, 'localhost', null, true, null, '/api');
await this.proxy.start();

await this.server.start();
Expand Down Expand Up @@ -86,6 +87,23 @@ export class HttpProxySuccessSuite {
});
}

@test()
async destroyRequestDueToTimeoutSettings(): Promise<void> {
await this.initProxy({
proxyRequestTimeout: 0,
});

let err;
try {
const resp = await axios.get(`${this.proxyUrl}/headers`);
console.log(resp);
} catch (e) {
err = e;
}

strictEqual(err.message, 'socket hang up');
}

@test()
async headersRequest(): Promise<void> {
await this.initProxy();
Expand Down Expand Up @@ -131,6 +149,35 @@ export class HttpProxySuccessSuite {
});
}

@test()
async destroyWebsocketRequestDueToTimeoutSettings(): Promise<void> {
await this.initProxy({
proxyRequestTimeout: 0,
});

const sio = io(`http://localhost:${TestProxy.PORT}`, {
transports: ['websocket'],
reconnection: false,
});

let err: Error;
await new Promise<void>((res, rej) => {
const timeout = setTimeout(() => {
sio.disconnect();
rej(new Error('Unable to connect to WS'));
}, 2000);

sio.on('connect_error', (e) => {
err = e;
sio.disconnect();
clearTimeout(timeout);
res();
});
});

strictEqual(err.message, 'websocket error');
}

@test()
async websocket(): Promise<void> {
await this.initProxy();
Expand Down
5 changes: 4 additions & 1 deletion test/helpers/TestProxy.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { IncomingMessage, ServerResponse } from 'http';
import { Duplex } from 'stream';
import { ErrorHandler, Prxi, ProxyRequest, WebSocketHandlerFunction, WebSocketHandlerConfig } from '../../src';
import { ErrorHandler, Prxi, ProxyRequest, WebSocketHandlerFunction, WebSocketHandlerConfig, Configuration } from '../../src';
import { TestServer } from './TestServer';

export class TestProxy {
public static readonly PORT = 8888;
private proxy: Prxi;

constructor(
private configOverride: Partial<Configuration> = {},
private host = 'localhost',
private customErrorHandler: ErrorHandler | false = null,
private isMatching: boolean | null = true,
Expand Down Expand Up @@ -51,6 +52,8 @@ export class TestProxy {
ResConfigLevelOverwrite: 'CONFIG-RESPONSE-OVERWRITE',
ResConfigLevelClear: null,
},

...this.configOverride
});

// start it
Expand Down

0 comments on commit 14cc37a

Please sign in to comment.