From 4640ea24bdf50c76faacd825fd211d74fbfe604c Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Thu, 5 Mar 2020 23:18:23 +0100 Subject: [PATCH] stream: don't destroy final readable stream in pipeline If the last stream in a pipeline is still usable/readable don't destroy it to allow further composition. Fixes: https://github.com/nodejs/node/issues/32105 Backport-PR-URL: https://github.com/nodejs/node/pull/32111 PR-URL: https://github.com/nodejs/node/pull/32110 Reviewed-By: Matteo Collina Reviewed-By: Luigi Pinca --- lib/internal/streams/pipeline.js | 9 ++++-- test/parallel/test-stream-pipeline.js | 46 +++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 3 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index b03c4e1513925b..3e9915c24f26b7 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -36,7 +36,7 @@ function destroyStream(stream, err) { if (typeof stream.close === 'function') return stream.close(); } -function destroyer(stream, reading, writing, callback) { +function destroyer(stream, reading, writing, final, callback) { callback = once(callback); let destroyed = false; @@ -44,7 +44,10 @@ function destroyer(stream, reading, writing, callback) { eos(stream, { readable: reading, writable: writing }, (err) => { if (destroyed) return; destroyed = true; - destroyStream(stream, err); + const readable = stream.readable || isRequest(stream); + if (err || !final || !readable) { + destroyStream(stream, err); + } callback(err); }); @@ -176,7 +179,7 @@ function pipeline(...streams) { } function wrap(stream, reading, writing, final) { - destroys.push(destroyer(stream, reading, writing, (err) => { + destroys.push(destroyer(stream, reading, writing, final, (err) => { finish(err, final); })); } diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 6bfa1331834968..0d90f65f5a6643 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -918,6 +918,52 @@ const { promisify } = require('util'); const dst = new PassThrough({ autoDestroy: false }); pipeline(src, dst, common.mustCall(() => { assert.strictEqual(src.destroyed, true); + assert.strictEqual(dst.destroyed, false); + })); + src.end(); +} + +{ + const server = http.createServer((req, res) => { + }); + + server.listen(0, () => { + const req = http.request({ + port: server.address().port + }); + + const body = new PassThrough(); + pipeline( + body, + req, + common.mustCall((err) => { + assert(!err); + assert(!req.res); + assert(!req.aborted); + req.abort(); + server.close(); + }) + ); + body.end(); + }); +} + +{ + const src = new PassThrough(); + const dst = new PassThrough(); + pipeline(src, dst, common.mustCall((err) => { + assert(!err); + assert.strictEqual(dst.destroyed, false); + })); + src.end(); +} + +{ + const src = new PassThrough(); + const dst = new PassThrough(); + dst.readable = false; + pipeline(src, dst, common.mustCall((err) => { + assert(!err); assert.strictEqual(dst.destroyed, true); })); src.end();