diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index ba69c673cc6b20..703b30b19b3025 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -19,6 +19,7 @@ const { ERR_INVALID_ARG_TYPE, ERR_INVALID_RETURN_VALUE, ERR_MISSING_ARGS, + ERR_STREAM_DESTROYED, }, AbortError, } = require('internal/errors'); @@ -32,10 +33,8 @@ const { isIterable, isReadableNodeStream, isNodeStream, - isReadableFinished, } = require('internal/streams/utils'); const { AbortController } = require('internal/abort_controller'); -const console = require('console'); let PassThrough; let Readable; @@ -84,9 +83,7 @@ async function* fromReadable(val) { yield* Readable.prototype[SymbolAsyncIterator].call(val); } -async function pump(iterable, writable, finish, opts) { - const end = opts?.end !== false; - +async function pump(iterable, writable, finish, { end }) { let error; let onresolve = null; @@ -208,13 +205,16 @@ function pipelineImpl(streams, callback, opts) { const writing = i > 0; const end = reading || opts?.end !== false; - if (isNodeStream(stream) && end) { - destroys.push(destroyer(stream, reading, writing)); - } - if (isNodeStream(stream)) { + if (end) { + destroys.push(destroyer(stream, reading, writing)); + } eos(stream, { readable: reading, writable: writing }, (err) => { - if (err && err.name !== 'AbortError' && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { + if ( + err && + err.name !== 'AbortError' && + err.code !== 'ERR_STREAM_PREMATURE_CLOSE' + ) { finish(err); } }); @@ -310,16 +310,14 @@ function pipelineImpl(streams, callback, opts) { return ret; } -function pipe(src, dst, finish, opts) { - const end = opts?.end !== false; - - src.pipe(dst, { end }) +function pipe(src, dst, finish, { end }) { + src.pipe(dst, { end }); if (end) { // Compat. Before node v10.12.0 stdio used to throw an error so // pipe() did/does not end() stdio destinations. // Now they allow it but "secretly" don't close the underlying fd. - src.on('end', () => dst.end()); + src.once('end', () => dst.end()); } if (!end) {