diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 8dc4e5792c47d8..ab7e3cbb6c8a26 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -33,6 +33,7 @@ const { isIterable, isReadableNodeStream, isNodeStream, + isReadableFinished, } = require('internal/streams/utils'); const { AbortController } = require('internal/abort_controller'); @@ -229,7 +230,14 @@ function pipelineImpl(streams, callback, opts) { if (isNodeStream(stream)) { finishCount++; - destroys.push(destroyer(stream, reading, writing, finish)); + destroys.push(destroyer(stream, reading, writing, (err) => { + if (!err && !reading && isReadableFinished(stream, false)) { + stream.read(0); + destroyer(stream, true, writing, finish); + } else { + finish(err); + } + })); } if (i === 0) { diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 061ef923d03a59..1fc3386fc16257 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1027,7 +1027,7 @@ const tsp = require('timers/promises'); const src = new PassThrough(); const dst = new PassThrough(); pipeline(src, dst, common.mustSucceed(() => { - assert.strictEqual(dst.destroyed, false); + assert.strictEqual(dst.destroyed, true); })); src.end(); } @@ -1447,3 +1447,23 @@ const tsp = require('timers/promises'); assert.strictEqual(text, 'Hello World!'); })); } + +{ + const pipelinePromise = promisify(pipeline); + + async function run() { + const read = new Readable({ + read() {} + }); + + const duplex = new PassThrough(); + + read.push(null); + + await pipelinePromise(read, duplex); + + assert.strictEqual(duplex.destroyed, true); + } + + run(); +}