Skip to content

Commit

Permalink
stream: manual destroy IncomingRequest on pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
RafaelGSS committed Dec 10, 2021
1 parent 32f7218 commit e13d9ef
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 1 deletion.
9 changes: 9 additions & 0 deletions lib/_http_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ const onResponseFinishChannel = dc.channel('http.server.response.finish');
const kServerResponse = Symbol('ServerResponse');
const kServerResponseStatistics = Symbol('ServerResponseStatistics');

const {
kManualDestroy,
kPipelineStream
} = require('internal/streams/utils');

const {
hasObserver,
} = require('internal/perf/observe');
Expand Down Expand Up @@ -234,6 +239,10 @@ function onServerResponseClose() {
}
}

ServerResponse.prototype[kPipelineStream] = function() {
this[kManualDestroy] = true;
};

ServerResponse.prototype.assignSocket = function assignSocket(socket) {
assert(!socket._httpMessage);
socket._httpMessage = this;
Expand Down
10 changes: 9 additions & 1 deletion lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ const {
isIterable,
isReadableNodeStream,
isNodeStream,
kManualDestroy,
kPipelineStream,
} = require('internal/streams/utils');
const { AbortController } = require('internal/abort_controller');

Expand All @@ -52,7 +54,9 @@ function destroyer(stream, reading, writing) {
return (err) => {
if (finished) return;
finished = true;
destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe'));
if (stream[kManualDestroy] !== true) {
destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe'));
}
};
}

Expand Down Expand Up @@ -205,6 +209,10 @@ function pipelineImpl(streams, callback, opts) {
const writing = i > 0;
const end = reading || opts?.end !== false;

if (stream[kPipelineStream]) {
stream[kPipelineStream]();
}

if (isNodeStream(stream)) {
if (end) {
destroys.push(destroyer(stream, reading, writing));
Expand Down
4 changes: 4 additions & 0 deletions lib/internal/streams/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ const {

const kDestroyed = Symbol('kDestroyed');
const kIsDisturbed = Symbol('kIsDisturbed');
const kManualDestroy = Symbol('kManualDestroy');
const kPipelineStream = Symbol('kPipelineStream');

function isReadableNodeStream(obj, strict = false) {
return !!(
Expand Down Expand Up @@ -247,6 +249,8 @@ function isDisturbed(stream) {

module.exports = {
kDestroyed,
kManualDestroy,
kPipelineStream,
isDisturbed,
kIsDisturbed,
isClosed,
Expand Down
110 changes: 110 additions & 0 deletions test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const http = require('http');
const { promisify } = require('util');
const net = require('net');
const tsp = require('timers/promises');
const fs = require('fs');

{
let finished = false;
Expand Down Expand Up @@ -1512,3 +1513,112 @@ const tsp = require('timers/promises');
assert.strictEqual(s.destroyed, true);
}));
}

{
const r = new Readable({
read() {}
});
r.push('hello');
r.push('world');
r.push(null);
let res = '';
const w = new Writable({
write(chunk, encoding, callback) {
res += chunk;
callback();
}
});
pipeline([r, w], common.mustCall((err) => {
assert.ok(r.destroyed)
assert.ok(w.destroyed)
assert.ok(!err);
assert.strictEqual(res, 'helloworld');
}));
}

{
const r = new Readable({
read() {}
});
r.push('hello');
r.push('world');
r.push(null);
let res = '';
const w = new Writable({
write(chunk, encoding, callback) {
res += chunk;
callback();
}
});
pipeline([r, w], common.mustCall((err) => {
assert.ok(r.destroyed)
assert.ok(w.destroyed)
assert.ok(!err);
assert.strictEqual(res, 'helloworld');
}));
}

// When occurs an error in the pipeline the IncomingRequest should not destroy the connection automatically.
{
const server = http.createServer(common.mustCall((req, res) => {
const r = fs.createReadStream('./notfound');
pipeline(r, res, common.mustCall((err) => {
assert.ok(res.destroyed === false);
assert.ok(r.destroyed);
assert.strictEqual(err.code, 'ENOENT');
assert.strictEqual(err.message,
'ENOENT: no such file or directory, ' +
'open \'./notfound\'');
res.end(err.message);
}));
}));

server.listen(0, common.mustCall(() => {
http.request({
port: server.address().port
}, common.mustCall((res) => {
res.setEncoding('utf8');
let responseData = '';
res.on('data', (chunk) => { responseData += chunk; });
res.on('end', common.mustCall(() => {
assert.strictEqual(responseData,
'ENOENT: no such file or directory, ' +
'open \'./notfound\'');
setImmediate(() => {
res.destroy();
server.close();
});
}));
})).end();
}));
}

// Should close the IncomingRequest stream automatically when no error occurs
{
const server = http.createServer(common.mustCall((req, res) => {
const r = fs.createReadStream(__filename);
pipeline(r, res, common.mustCall((err) => {
assert.ok(res.destroyed);
assert.ok(r.destroyed);
assert.ok(err === undefined)
}));
}));

server.listen(0, common.mustCall(() => {
http.request({
port: server.address().port
}, common.mustCall((res) => {
res.setEncoding('utf8');
let responseData = '';
res.on('data', (chunk) => { responseData += chunk; });
res.on('end', common.mustCall(() => {
const data = fs.readFileSync(__filename)
assert.strictEqual(responseData, data.toString());
setImmediate(() => {
res.destroy();
server.close();
});
}));
})).end();
}));
}

0 comments on commit e13d9ef

Please sign in to comment.