From 3e1e31a63d75a50674106e8d25d6e8d2863461f7 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 19 Nov 2021 14:03:01 +0100 Subject: [PATCH 1/5] stream: allow readable to end early without error --- lib/internal/streams/pipeline.js | 119 ++++++++++++++------------ test/parallel/test-stream-pipeline.js | 28 +++++- 2 files changed, 91 insertions(+), 56 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index f5f489577854ac..d0f8ccf83c8e02 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -33,16 +33,13 @@ const { isIterable, isReadableNodeStream, isNodeStream, - isReadableFinished, } = require('internal/streams/utils'); const { AbortController } = require('internal/abort_controller'); let PassThrough; let Readable; -function destroyer(stream, reading, writing, callback) { - callback = once(callback); - +function destroyer(stream, reading, writing) { let finished = false; stream.on('close', () => { finished = true; @@ -50,35 +47,12 @@ function destroyer(stream, reading, writing, callback) { eos(stream, { readable: reading, writable: writing }, (err) => { finished = !err; - - const rState = stream._readableState; - if ( - err && - err.code === 'ERR_STREAM_PREMATURE_CLOSE' && - reading && - (rState && rState.ended && !rState.errored && !rState.errorEmitted) - ) { - // Some readable streams will emit 'close' before 'end'. However, since - // this is on the readable side 'end' should still be emitted if the - // stream has been ended and no error emitted. This should be allowed in - // favor of backwards compatibility. Since the stream is piped to a - // destination this should not result in any observable difference. - // We don't need to check if this is a writable premature close since - // eos will only fail with premature close on the reading side for - // duplex streams. - stream - .once('end', callback) - .once('error', callback); - } else { - callback(err); - } }); return (err) => { if (finished) return; finished = true; - destroyImpl.destroyer(stream, err); - callback(err || new ERR_STREAM_DESTROYED('pipe')); + destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe')); }; } @@ -109,7 +83,7 @@ async function* fromReadable(val) { yield* Readable.prototype[SymbolAsyncIterator].call(val); } -async function pump(iterable, writable, finish, opts) { +async function pump(iterable, writable, finish, { end }) { let error; let onresolve = null; @@ -153,7 +127,7 @@ async function pump(iterable, writable, finish, opts) { } } - if (opts?.end !== false) { + if (end) { writable.end(); } @@ -233,18 +207,19 @@ function pipelineImpl(streams, callback, opts) { if (isNodeStream(stream)) { if (end) { - finishCount++; - destroys.push(destroyer(stream, reading, writing, (err) => { - if (!err && !reading && isReadableFinished(stream, false)) { - stream.read(0); - destroyer(stream, true, writing, finish); - } else { - finish(err); - } - })); - } else { - stream.on('error', finish); + destroys.push(destroyer(stream, reading, writing)); } + + // Catch stream errors that occur after pipe/pump has completed. + stream.on('error', (err) => { + if ( + err && + err.name !== 'AbortError' && + err.code !== 'ERR_STREAM_PREMATURE_CLOSE' + ) { + finish(err); + } + }); } if (i === 0) { @@ -286,6 +261,7 @@ function pipelineImpl(streams, callback, opts) { // second use. const then = ret?.then; if (typeof then === 'function') { + finishCount++; then.call(ret, (val) => { value = val; @@ -293,8 +269,10 @@ function pipelineImpl(streams, callback, opts) { if (end) { pt.end(); } + process.nextTick(finish); }, (err) => { pt.destroy(err); + process.nextTick(finish, err); }, ); } else if (isIterable(ret, true)) { @@ -307,24 +285,18 @@ function pipelineImpl(streams, callback, opts) { ret = pt; - finishCount++; - destroys.push(destroyer(ret, false, true, finish)); + destroys.push(destroyer(ret, false, true)); } } else if (isNodeStream(stream)) { if (isReadableNodeStream(ret)) { - ret.pipe(stream, { end }); - - // Compat. Before node v10.12.0 stdio used to throw an error so - // pipe() did/does not end() stdio destinations. - // Now they allow it but "secretly" don't close the underlying fd. - if (stream === process.stdout || stream === process.stderr) { - ret.on('end', () => stream.end()); - } - } else { - ret = makeAsyncIterable(ret); - + finishCount += 2; + pipe(ret, stream, finish, { end }); + } else if (isIterable(ret)) { finishCount++; pump(ret, stream, finish, { end }); + } else { + throw new ERR_INVALID_ARG_TYPE( + 'val', ['Readable', 'Iterable', 'AsyncIterable'], ret); } ret = stream; } else { @@ -339,4 +311,43 @@ function pipelineImpl(streams, callback, opts) { return ret; } +function pipe(src, dst, finish, { end }) { + src.pipe(dst, { end }); + + if (end) { + // Compat. Before node v10.12.0 stdio used to throw an error so + // pipe() did/does not end() stdio destinations. + // Now they allow it but "secretly" don't close the underlying fd. + src.once('end', () => dst.end()); + } + + if (!end) { + finish(); + } + + eos(src, { readable: true, writable: false }, (err) => { + const rState = src._readableState; + if ( + err && + err.code === 'ERR_STREAM_PREMATURE_CLOSE' && + (rState && rState.ended && !rState.errored && !rState.errorEmitted) + ) { + // Some readable streams will emit 'close' before 'end'. However, since + // this is on the readable side 'end' should still be emitted if the + // stream has been ended and no error emitted. This should be allowed in + // favor of backwards compatibility. Since the stream is piped to a + // destination this should not result in any observable difference. + // We don't need to check if this is a writable premature close since + // eos will only fail with premature close on the reading side for + // duplex streams. + src + .once('end', finish) + .once('error', finish); + } else { + finish(err); + } + }); + eos(dst, { readable: false, writable: true }, finish); +} + module.exports = { pipelineImpl, pipeline }; diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index ae4e76352f3545..ee284277915b40 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1027,7 +1027,7 @@ const tsp = require('timers/promises'); const src = new PassThrough(); const dst = new PassThrough(); pipeline(src, dst, common.mustSucceed(() => { - assert.strictEqual(dst.destroyed, true); + assert.strictEqual(dst.destroyed, false); })); src.end(); } @@ -1462,7 +1462,7 @@ const tsp = require('timers/promises'); await pipelinePromise(read, duplex); - assert.strictEqual(duplex.destroyed, true); + assert.strictEqual(duplex.destroyed, false); } run().then(common.mustCall()); @@ -1488,3 +1488,27 @@ const tsp = require('timers/promises'); run().then(common.mustCall()); } + +{ + const s = new PassThrough({ objectMode: true }); + pipeline(async function*() { + await Promise.resolve(); + yield 'hello'; + yield 'world'; + yield 'world'; + }, s, async function(source) { + let ret = ''; + let n = 0; + for await (const chunk of source) { + if (n++ > 1) { + return; + } + ret += chunk; + } + return ret; + }, common.mustCall((err, val) => { + assert.strictEqual(err, undefined); + assert.strictEqual(val, 'helloworld'); + assert.strictEqual(s.destroyed, true); + })); +} From 5b0262bb22ca81c92f3bcddabddabb4befff53ea Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 4 Dec 2021 21:38:29 +0100 Subject: [PATCH 2/5] Update lib/internal/streams/pipeline.js Co-authored-by: Luigi Pinca --- lib/internal/streams/pipeline.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index d0f8ccf83c8e02..ff643e0c5eb15a 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -319,9 +319,7 @@ function pipe(src, dst, finish, { end }) { // pipe() did/does not end() stdio destinations. // Now they allow it but "secretly" don't close the underlying fd. src.once('end', () => dst.end()); - } - - if (!end) { + } else (!end) { finish(); } From 691e73524a9064768be98a5ba78ea6f3afb90b2d Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 4 Dec 2021 21:39:01 +0100 Subject: [PATCH 3/5] Update lib/internal/streams/pipeline.js --- lib/internal/streams/pipeline.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index ff643e0c5eb15a..e3435f734be051 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -319,7 +319,7 @@ function pipe(src, dst, finish, { end }) { // pipe() did/does not end() stdio destinations. // Now they allow it but "secretly" don't close the underlying fd. src.once('end', () => dst.end()); - } else (!end) { + } else { finish(); } From f0ec29b81563fd93c5951d046c10f804e48f060f Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 4 Dec 2021 23:13:22 +0100 Subject: [PATCH 4/5] fixup --- test/parallel/test-stream-pipeline.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index ee284277915b40..bf1cb84ecda341 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1501,7 +1501,7 @@ const tsp = require('timers/promises'); let n = 0; for await (const chunk of source) { if (n++ > 1) { - return; + break; } ret += chunk; } From cf85105f2968f33eaf7365700b957f1452fc8461 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 5 Dec 2021 10:16:07 +0100 Subject: [PATCH 5/5] fixup: nextTick callback --- lib/internal/streams/pipeline.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index e3435f734be051..623973ff316453 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -194,7 +194,7 @@ function pipelineImpl(streams, callback, opts) { ac.abort(); if (final) { - callback(error, value); + process.nextTick(callback, error, value); } }