Skip to content

Commit

Permalink
feat: improve event handling
Browse files Browse the repository at this point in the history
  • Loading branch information
vlad-tkachenko committed Dec 11, 2023
1 parent 8260efa commit 153744e
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 59 deletions.
23 changes: 17 additions & 6 deletions src/Prxi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ export class Prxi {
// cancel
(status: number, description: string) => {
this.logError(`[${requestId}] [Prxi] cancel websocket request with ${status}: ${description}`);
Prxi.closeSocket(req, socket, status, description, headersToSet);
this.closeSocket(req, socket, status, description, headersToSet);
},
path,
context
Expand All @@ -354,7 +354,7 @@ export class Prxi {
})
.catch(err => {
this.logError(`[${requestId}] [Prxi] Unable to handle websocket request`, err);
Prxi.closeSocket(req, socket, 500, 'Unexpected error ocurred', headersToSet);
this.closeSocket(req, socket, 500, 'Unexpected error ocurred', headersToSet);
});
} else {
/* istanbul ignore next */
Expand All @@ -366,7 +366,7 @@ export class Prxi {
this.configuration.responseHeaders,
);

Prxi.closeSocket(req, socket, 405, 'Upgrade could not be processed', headersToSet);
this.closeSocket(req, socket, 405, 'Upgrade could not be processed', headersToSet);
}
});

Expand Down Expand Up @@ -455,9 +455,20 @@ export class Prxi {
* @param description
* @param headers
*/
private static closeSocket(req: IncomingMessage, socket: Socket, status: number, message: string, headers: OutgoingHttpHeaders): void {
socket.write(WebSocketUtils.prepareRawHeadersString(`HTTP/${req.httpVersion} ${status} ${message}`, headers));
socket.destroy();
private closeSocket(req: IncomingMessage, socket: Socket, status: number, message: string, headers: OutgoingHttpHeaders): void {
try {
socket.write(WebSocketUtils.prepareRawHeadersString(`HTTP/${req.httpVersion} ${status} ${message}`, headers), (err) => {
/* istanbul ignore next */
if (err) {
this.logError(`Prxi can't write upon socket closure`, err);
}

socket.destroy();
});
} catch (e) {
/* istanbul ignore next */
this.logError(`Prxi can't close socket`, e);
}
}

/**
Expand Down
36 changes: 16 additions & 20 deletions src/handlers/Http2ProxyHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ export class Http2ProxyHandler {
connection = connect(target);
this.connections.set(session, connection);

connection.on('close', () => {
connection.once('close', () => {
this.closeConnection(session, connection);
});

session.on('close', () => {
session.once('close', () => {
this.closeConnection(session, connection);
});

Expand Down Expand Up @@ -114,21 +114,21 @@ export class Http2ProxyHandler {
reject(err);
});

proxyReq.on('response', (headers, flags) => {
const headersToSet = RequestUtils.prepareProxyHeaders(
headers,
this.configuration.responseHeaders,
this.upstream.responseHeaders,
// istanbul ignore next
proxyConfiguration?.proxyResponseHeaders
);

/* istanbul ignore else */
if (proxyConfiguration && proxyConfiguration.onBeforeResponse) {
proxyConfiguration.onBeforeResponse(null, headersToSet, context);
}

proxyReq.once('response', (headers, flags) => {
try {
const headersToSet = RequestUtils.prepareProxyHeaders(
headers,
this.configuration.responseHeaders,
this.upstream.responseHeaders,
// istanbul ignore next
proxyConfiguration?.proxyResponseHeaders
);

/* istanbul ignore else */
if (proxyConfiguration && proxyConfiguration.onBeforeResponse) {
proxyConfiguration.onBeforeResponse(null, headersToSet, context);
}

/* istanbul ignore else */
if (!stream.closed) {
stream.respond(headersToSet);
Expand All @@ -142,10 +142,6 @@ export class Http2ProxyHandler {
}
});

proxyReq.on('error', (err) => {
this.logError(`[${requestId}] [Http2ProxyHandler] HTTP/2 stream error`, err);
});

proxyReq.once('end', () => {
resolve();
});
Expand Down
6 changes: 3 additions & 3 deletions src/handlers/HttpProxyHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ export class HttpProxyHandler {
await new Promise<void>((resolve, reject) => {
req.pipe(client);

client.on('error', (err) => {
client.once('error', (err) => {
this.logInfo(`[${requestId}] [HttpProxyHandler] Proxy request failed for method ${method} to ${host}:${port}${url}, error: ${err.message}`);
reject(err);
});

client.on('response', (response: IncomingMessage) => {
client.once('response', (response: IncomingMessage) => {
this.logInfo(`[${requestId}] [HttpProxyHandler] Response received for method ${method} to ${host}:${port}${url}, status code ${response.statusCode}`);
if (isKeepAliveRequest) {
client.setTimeout(0);
Expand All @@ -104,7 +104,7 @@ export class HttpProxyHandler {

// istanbul ignore else
if (!res.writableEnded) {
response.on('end', () => {
response.once('end', () => {
this.logInfo(`[${requestId}] [HttpProxyHandler] Proxy request with method ${method} to ${host}:${port}${url} completed`);
resolve();
});
Expand Down
16 changes: 6 additions & 10 deletions src/handlers/WebSocketProxyHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,14 @@ export class WebSocketProxyHandler {
req.pipe(client);

let ps: Socket = null;
client.on('error', (err) => {
client.once('error', (err) => {
// istanbul ignore next
ps?.destroy();
socket.end();

reject(err);
});

client.on('response', (res: IncomingMessage) => {
client.once('response', (res: IncomingMessage) => {
this.logInfo(`[${requestId}] [WebSocketProxyHandler] Received response`);

// istanbul ignore else
Expand All @@ -126,21 +125,20 @@ export class WebSocketProxyHandler {
}
});

client.on('upgrade', (proxyResponse: IncomingMessage, proxySocket: Socket, proxyHead: Buffer) => {
client.once('upgrade', (proxyResponse: IncomingMessage, proxySocket: Socket, proxyHead: Buffer) => {
WebSocketProxyHandler.debug.upstreamSocket = proxySocket;

ps = proxySocket;
this.logInfo(`[${requestId}] [WebSocketProxyHandler] Upgrade received`);

proxySocket.on('error', (err) => {
proxySocket.once('error', (err) => {
this.logError(`[${requestId}] [WebSocketProxyHandler] ProxySocket error`, err);
ps.destroy();
socket.end();

reject(err);
});

proxySocket.on('end', () => {
proxySocket.once('end', () => {
this.logInfo(`[${requestId}] [WebSocketProxyHandler] ProxySocket end`);
resolve();
});
Expand All @@ -152,11 +150,9 @@ export class WebSocketProxyHandler {
}

// end proxy socket when incoming fails
socket.on('error', (err) => {
socket.once('error', (err) => {
this.logError(`[${requestId}] [WebSocketProxyHandler] Socket error`, err);

ps.destroy();
socket.end();

reject(err);
});
Expand Down
24 changes: 12 additions & 12 deletions test/HttpProxy.errors.suite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ abstract class BaseHttpProxyErrorSuite {
});

const err = await assertReject(new Promise<void>((res, rej) => {
sio.on('connect_error', (err) => {
sio.once('connect_error', (err) => {
rej(err);
});

Expand Down Expand Up @@ -222,7 +222,7 @@ abstract class BaseHttpProxyErrorSuite {
rej(new Error('Unable to connect to WS'));
}, 2000);

sio.on('connect_error', (err) => {
sio.once('connect_error', (err) => {
clearTimeout(t);
rej(err);
});
Expand Down Expand Up @@ -255,16 +255,16 @@ abstract class BaseHttpProxyErrorSuite {
rej(new Error('Unable to connect to WS'));
}, 2000);

sio.on('connect', () => {
sio.once('connect', () => {
WebSocketProxyHandler.debug.upstreamRequest.emit('error', new Error('Upstream fake error'));
});

sio.on('disconnect', (reason) => {
sio.once('disconnect', (reason) => {
rej(new Error(reason));
})
}));

strictEqual(err.message, `transport close`);
strictEqual(err.message, `transport error`);
}

@test()
Expand All @@ -291,16 +291,16 @@ abstract class BaseHttpProxyErrorSuite {
rej(new Error('Unable to connect to WS'));
}, 2000);

sio.on('connect', () => {
sio.once('connect', () => {
WebSocketProxyHandler.debug.upstreamSocket.emit('error', new Error('Proxy fake error'));
});

sio.on('disconnect', (reason) => {
sio.once('disconnect', (reason) => {
rej(new Error(reason));
})
}));

strictEqual(err.message, `transport close`);
strictEqual(err.message, `transport error`);
}

@test()
Expand All @@ -327,16 +327,16 @@ abstract class BaseHttpProxyErrorSuite {
rej(new Error('Unable to connect to WS'));
}, 2000);

sio.on('connect', () => {
sio.once('connect', () => {
WebSocketProxyHandler.debug.incomingSocket.emit('error', new Error('Proxy fake error'));
});

sio.on('disconnect', (reason) => {
sio.once('disconnect', (reason) => {
rej(new Error(reason));
})
}));

strictEqual(err.message, `transport close`);
strictEqual(err.message, `transport error`);
}

@test()
Expand All @@ -363,7 +363,7 @@ abstract class BaseHttpProxyErrorSuite {
});

const err = await assertReject(new Promise<void>((res, rej) => {
sio.on('connect_error', (err) => {
sio.once('connect_error', (err) => {
rej(err);
});

Expand Down
8 changes: 4 additions & 4 deletions test/HttpProxy.success.suite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,12 +254,12 @@ abstract class BaseHttpProxySuccessSuite {
rej(new Error('Unable to connect to WS'));
}, 2000);

sio.on('connect_error', (err) => {
sio.once('connect_error', (err) => {
console.error('connection error', err);
});

sio.on('connect', () => {
sio.on('echo', (msg: string) => {
sio.once('connect', () => {
sio.once('echo', (msg: string) => {
received = msg;
sio.disconnect();
clearTimeout(timeout);
Expand Down Expand Up @@ -319,7 +319,7 @@ abstract class BaseHttpProxySuccessSuite {
rej(new Error('Unable to connect to WS'));
}, 2000);

sio.on('connect_error', (err) => {
sio.once('connect_error', (err) => {
rej(err);
});
}));
Expand Down
2 changes: 1 addition & 1 deletion test/helpers/FetchHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ export class FetchHelpers {
const { origin, pathname, search } = new URL(url);
let client = connect(origin);

client.on('close', () => {
client.once('close', () => {
console.log(`-> Connection closed (${count + 1} / ${this.repeat})`);
})

Expand Down
6 changes: 3 additions & 3 deletions test/helpers/TestServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ export class TestServer {
data += chunk;
});

stream.on('end', () => {
stream.once('end', () => {
process(JSON.parse(data));
});
} else {
Expand All @@ -166,7 +166,7 @@ export class TestServer {
socket.emit('echo', msg);
});

socket.on('disconnect', () => {
socket.once('disconnect', () => {
console.log('Socket.IO disconnected');
});

Expand Down Expand Up @@ -223,7 +223,7 @@ export class TestServer {
req.on('data', chunk => {
chunks.push(Buffer.from(chunk));
})
req.on('end', () => {
req.once('end', () => {
writeJson(res, Buffer.concat(chunks).toString('utf-8'));
})
}
Expand Down

0 comments on commit 153744e

Please sign in to comment.