Skip to content

Commit

Permalink
feat: close proxy http/1 connection when request closes before the re…
Browse files Browse the repository at this point in the history
…sponse
  • Loading branch information
vlad-tkachenko committed Jan 27, 2024
1 parent 800da8f commit 015433b
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 92 deletions.
151 changes: 81 additions & 70 deletions src/handlers/HttpProxyHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,87 +79,98 @@ export class HttpProxyHandler {
}

const client = request(options);
let processed = false;

await new Promise<void>((resolve, reject) => {
req.pipe(client);
req.socket.once('close', () => {
if (processed) return;

client.once('error', (err) => {
this.log.error(context, `Proxy request failed`, err, {
class: HttpProxyHandler.LOG_CLASS,
method,
target,
path: url,
});
client.destroy(new Error('Request closed by the client'));
});

reject(err);
});
try {
await new Promise<void>((resolve, reject) => {
req.pipe(client);

client.once('error', (err) => {
this.log.error(context, `Proxy request failed`, err, {
class: HttpProxyHandler.LOG_CLASS,
method,
target,
path: url,
});

client.once('response', (response: IncomingMessage) => {
this.log.debug(context, `Response received`,{
class: HttpProxyHandler.LOG_CLASS,
method,
target,
path: url,
responseStatusCode: response.statusCode,
reject(err);
});

if (isKeepAliveRequest) {
client.setTimeout(0);
}

// map status code
res.statusCode = response.statusCode;

const headersToSet = RequestUtils.prepareProxyHeaders(
response.headers,
this.configuration.responseHeaders,
this.upstream.responseHeaders,
// istanbul ignore next
proxyConfiguration?.proxyResponseHeaders
);

const next = () => {
RequestUtils.updateResponseHeaders(res, headersToSet);

// istanbul ignore else
if (!res.writableEnded) {
response.once('end', () => {
this.log.debug(context, `Proxy request completed`,{
class: HttpProxyHandler.LOG_CLASS,
method,
target,
path: url,
responseStatusCode: response.statusCode,
client.once('response', (response: IncomingMessage) => {
this.log.debug(context, `Response received`,{
class: HttpProxyHandler.LOG_CLASS,
method,
target,
path: url,
responseStatusCode: response.statusCode,
});

if (isKeepAliveRequest) {
client.setTimeout(0);
}

// map status code
res.statusCode = response.statusCode;

const headersToSet = RequestUtils.prepareProxyHeaders(
response.headers,
this.configuration.responseHeaders,
this.upstream.responseHeaders,
// istanbul ignore next
proxyConfiguration?.proxyResponseHeaders
);

const next = () => {
RequestUtils.updateResponseHeaders(res, headersToSet);

// istanbul ignore else
if (!res.writableEnded) {
response.once('end', () => {
this.log.debug(context, `Proxy request completed`,{
class: HttpProxyHandler.LOG_CLASS,
method,
target,
path: url,
responseStatusCode: response.statusCode,
});
resolve();
});

response.pipe(res);
} else {
resolve();
});
}
}

response.pipe(res);
/* istanbul ignore else */
if (proxyConfiguration && proxyConfiguration.onBeforeResponse) {
callOptionalPromiseFunction(
() => proxyConfiguration.onBeforeResponse(res, headersToSet, context),
() => next(),
(err) => {
this.log.error(context, 'onBeforeResponse function failed', err, {
class: HttpProxyHandler.LOG_CLASS,
method,
target,
path: url,
});

reject(err);
}
)
} else {
resolve();
next();
}
}

/* istanbul ignore else */
if (proxyConfiguration && proxyConfiguration.onBeforeResponse) {
callOptionalPromiseFunction(
() => proxyConfiguration.onBeforeResponse(res, headersToSet, context),
() => next(),
(err) => {
this.log.error(context, 'onBeforeResponse function failed', err, {
class: HttpProxyHandler.LOG_CLASS,
method,
target,
path: url,
});

reject(err);
}
)
} else {
next();
}
});
});
});
} finally {
processed = true;
}
}
}
19 changes: 19 additions & 0 deletions test/HttpProxy.success.suite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,25 @@ abstract class BaseHttpProxySuccessSuite {
await this.proxy.start();
}

@test()
async closeOpenProxyRequest(): Promise<void> {
await this.initProxy();

const controller = new AbortController();
let error: Error;
const promise = new FetchHelpers(this.mode, this.secure).get(`${this.proxyUrl}/hold`, {}, controller).catch(
err => error = err
)

await new Promise<void>((res) => setTimeout(() => {
controller.abort();
res();
}, 20));
await promise;

strictEqual(error.message, 'This operation was aborted');
}

@test()
async echoRequest(): Promise<void> {
let c: Record<string, any>;
Expand Down
70 changes: 48 additions & 22 deletions test/helpers/FetchHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { connect, constants } from 'node:http2';
import path = require('node:path');

export class FetchHelpers {
constructor(private mode: 'HTTP' | 'HTTP2', private secure: boolean, private repeat = 1, private delayBetweenRepeats = 0) {}
constructor(private mode: 'HTTP' | 'HTTP2', private secure: boolean, private repeat = 1, private delayBetweenRepeats = 0) { }

public fixUrl(url: string): string {
if (!this.secure) {
Expand All @@ -16,21 +16,22 @@ export class FetchHelpers {
* Make GET request
* @param url
* @param headers
* @param controller
* @returns
*/
async get(url: string, headers: Record<string, string> = {}): Promise<{
async get(url: string, headers: Record<string, string> = {}, controller?: AbortController): Promise<{
data: any,
headers: Record<string, string>,
}> {
url = this.fixUrl(url);
console.log(`-> [${this.mode}] Making GET request to ${url}`);

if (this.mode === 'HTTP') {
return await this.getHttp1(url, headers);
return await this.getHttp1(url, headers, controller);
}

if (this.mode === 'HTTP2') {
return await this.getHttp2(url, headers);
return await this.getHttp2(url, headers, controller);
}

throw new Error(`Unable to make GET request for unhandled mode ${this.mode}`);
Expand All @@ -40,29 +41,33 @@ export class FetchHelpers {
* Make HTTP/1.1 GET request
* @param url
* @param headers
* @param controller
* @returns
*/
private async getHttp1(url: string, headers: Record<string, string>): Promise<{
private async getHttp1(url: string, headers: Record<string, string>, controller?: AbortController): Promise<{
data: any,
headers: Record<string, string>,
}> {
return await this.makeHttp1Request('GET', url, headers);
return await this.makeHttp1Request('GET', url, headers, null, controller);
}

/**
* Make HTTP/2 GET request
* @param url
* @param headers
* @returns
*/
private async getHttp2(url: string, headers: Record<string, string>): Promise<{
/**
* Make HTTP/2 GET request
* @param url
* @param headers
* @param controller
* @returns
*/
private async getHttp2(url: string, headers: Record<string, string>, controller?: AbortController): Promise<{
data: any,
headers: Record<string, string>,
}> {
return await this.makeHttp2Request(
constants.HTTP2_METHOD_GET,
url,
headers,
null,
controller,
);
}

Expand All @@ -71,21 +76,22 @@ export class FetchHelpers {
* @param url
* @param data
* @param headers
* @param controller
* @returns
*/
async post(url: string, data: unknown, headers: Record<string, string> = {}): Promise<{
async post(url: string, data: unknown, headers: Record<string, string> = {}, controller?: AbortController): Promise<{
data: any,
headers: Record<string, string>,
}> {
url = this.fixUrl(url);
console.log(`-> [${this.mode}] Making POST request to ${url}`);

if (this.mode === 'HTTP') {
return await this.postHttp1(url, data, headers);
return await this.postHttp1(url, data, headers, controller);
}

if (this.mode === 'HTTP2') {
return await this.postHttp2(url, data, headers);
return await this.postHttp2(url, data, headers, controller);
}

throw new Error(`Unable to make POST request for unhandled mode ${this.mode}`);
Expand All @@ -96,39 +102,49 @@ export class FetchHelpers {
* @param url
* @param data
* @param headers
* @param AbortController
* @returns
*/
private async postHttp1(url: string, data: unknown, headers: Record<string, string>): Promise<{
private async postHttp1(url: string, data: unknown, headers: Record<string, string>, controller?: AbortController): Promise<{
data: any,
headers: Record<string, string>,
}> {
return await this.makeHttp1Request('POST', url, headers, data);
return await this.makeHttp1Request(
'POST',
url,
headers,
data,
controller,
);
}

/**
* Make HTTP/2 POST request
* @param url
* @param data
* @param headers
* @param controller
* @returns
*/
private async postHttp2(url: string, data: unknown, headers: Record<string, string>): Promise<{
private async postHttp2(url: string, data: unknown, headers: Record<string, string>, controller?: AbortController): Promise<{
data: any,
headers: Record<string, string>,
}> {
return await this.makeHttp2Request(
constants.HTTP2_METHOD_POST,
url,
headers,
data
data,
controller,
);
}

private async makeHttp1Request(method: string, url: string, headers: Record<string, string>, data?: unknown): Promise<any> {
private async makeHttp1Request(method: string, url: string, headers: Record<string, string>, data?: unknown, controller?: AbortController): Promise<any> {
try {
const makeRequest = async () => {
const response = await fetch(url, {
method,
signal: controller?.signal,
headers: {
'Connection': 'close',
'content-type': 'application/json',
Expand Down Expand Up @@ -164,7 +180,7 @@ export class FetchHelpers {
}
}

private async makeHttp2Request(method: string, url: string, headers: Record<string, string>, data?: unknown): Promise<any> {
private async makeHttp2Request(method: string, url: string, headers: Record<string, string>, data?: unknown, controller?: AbortController): Promise<any> {
const buffer = data ? Buffer.from(JSON.stringify(data)) : undefined;

return new Promise<any>((res, rej) => {
Expand Down Expand Up @@ -194,6 +210,12 @@ export class FetchHelpers {
...headers,
});

let aborted = false;
controller?.signal?.addEventListener('abort', () => {
aborted = true;
req.close();
})

let responseHeaders: Record<string, string> = {};
req.once('response', (headers, flags) => {
for (const header of Object.keys(headers)) {
Expand All @@ -217,6 +239,10 @@ export class FetchHelpers {
client.close();

try {
if (aborted && !data) {
return rej(new Error('This operation was aborted'));
}

res({
data: data ? JSON.parse(data) : undefined,
headers: responseHeaders,
Expand Down
Loading

0 comments on commit 015433b

Please sign in to comment.