diff --git a/doc/api/errors.md b/doc/api/errors.md index dde490c29b477c..4d3829f78e7b95 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -2621,6 +2621,12 @@ or a pipeline ends non gracefully with no explicit error. An attempt was made to call [`stream.push()`][] after a `null`(EOF) had been pushed to the stream. + + +### `ERR_STREAM_UNABLE_TO_PIPE` + +An attempt was made to pipe to a closed or destroyed stream in a pipeline. + ### `ERR_STREAM_UNSHIFT_AFTER_END_EVENT` diff --git a/lib/internal/errors.js b/lib/internal/errors.js index 8ab4a63a0f7110..f78be397d5dce2 100644 --- a/lib/internal/errors.js +++ b/lib/internal/errors.js @@ -1716,6 +1716,7 @@ E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed', Error); E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError); E('ERR_STREAM_PREMATURE_CLOSE', 'Premature close', Error); E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error); +E('ERR_STREAM_UNABLE_TO_PIPE', 'Connot pipe to a closed or destroyed stream', Error); E('ERR_STREAM_UNSHIFT_AFTER_END_EVENT', 'stream.unshift() after end event', Error); E('ERR_STREAM_WRAP', 'Stream has StringDecoder set or is in objectMode', Error); diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index edf1d37f9fe3bf..bb34759b1fea12 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -23,6 +23,7 @@ const { ERR_MISSING_ARGS, ERR_STREAM_DESTROYED, ERR_STREAM_PREMATURE_CLOSE, + ERR_STREAM_UNABLE_TO_PIPE, }, } = require('internal/errors'); @@ -253,10 +254,15 @@ function pipelineImpl(streams, callback, opts) { const stream = streams[i]; const reading = i < streams.length - 1; const writing = i > 0; + const next = i + 1 < streams.length ? streams[i + 1] : null; const end = reading || opts?.end !== false; const isLastStream = i === streams.length - 1; if (isNodeStream(stream)) { + if (next !== null && (next?.closed || next?.destroyed)) { + throw new ERR_STREAM_UNABLE_TO_PIPE(); + } + if (end) { const { destroy, cleanup } = destroyer(stream, reading, writing); destroys.push(destroy); diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 8237fff33b3ac8..7e69754b36d771 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -17,6 +17,8 @@ const http = require('http'); const { promisify } = require('util'); const net = require('net'); const tsp = require('timers/promises'); +const tmpdir = require('../common/tmpdir'); +const fs = require('fs'); { let finished = false; @@ -69,6 +71,17 @@ const tsp = require('timers/promises'); }, /ERR_INVALID_ARG_TYPE/); } +tmpdir.refresh(); +{ + assert.rejects(async () => { + const read = fs.createReadStream(__filename); + const write = fs.createWriteStream(tmpdir.resolve('a')); + const close = promisify(write.close); + await close.call(write); + await pipelinep(read, write); + }, /ERR_STREAM_UNABLE_TO_PIPE/).then(common.mustCall()); +} + { const read = new Readable({ read() {}