diff --git a/doc/api/stream.md b/doc/api/stream.md index 4f242a5d519da2..068e076df4bc1f 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1948,6 +1948,10 @@ changes: A module method to pipe between streams and generators forwarding errors and properly cleaning up and provide a callback when the pipeline is complete. +Pipeline will try to use the `.read()` API when available and dynamically +adjust the `highWaterMark` of each readable stream to match the destination. +If `.read()` is not available it will fallback to use `.pipe(dst)`. + ```js const { pipeline } = require('stream'); const fs = require('fs'); diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index f5f489577854ac..3f69ab5df7a5d7 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -312,14 +312,7 @@ function pipelineImpl(streams, callback, opts) { } } else if (isNodeStream(stream)) { if (isReadableNodeStream(ret)) { - ret.pipe(stream, { end }); - - // Compat. Before node v10.12.0 stdio used to throw an error so - // pipe() did/does not end() stdio destinations. - // Now they allow it but "secretly" don't close the underlying fd. - if (stream === process.stdout || stream === process.stderr) { - ret.on('end', () => stream.end()); - } + pipe(ret, stream, { end }); } else { ret = makeAsyncIterable(ret); @@ -339,4 +332,66 @@ function pipelineImpl(streams, callback, opts) { return ret; } +function pipe(src, dst, opts) { + if (typeof src.read !== 'function') { + src.pipe(dst); + return; + } + + src + .on('end', end) + .on('readable', pump) + .on('error', done); + dst + .on('drain', pump) + .on('error', done); + + function done() { + src + .off('end', end) + .off('readable', pump) + .off('error', done); + dst + .off('drain', pump) + .off('error', done); + } + + function end() { + if (opts?.end !== false) { + dst.end(); + } + done(); + } + + const objectMode = ( + src.readableObjectMode || + src._readableState?.objectMode || + dst.writableObjectMode || + dst._writableState?.objectMode + ); + const rState = src._readableState; + const wState = dst._writableState; + + function pump() { + if (dst.writableNeedDrain) { + return; + } + + while (true) { + if (!objectMode) { + const n = (wState && wState.highWaterMark) ?? dst.writableHighwaterMark; + if (n && rState && rState.highWaterMark < n) { + rState.highWaterMark = n; + } + } + const chunk = src.read(); + if (chunk === null || !dst.write(chunk)) { + return; + } + } + } + + process.nextTick(pump); +} + module.exports = { pipelineImpl, pipeline }; diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index ae4e76352f3545..e8bbe586d87275 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -22,9 +22,7 @@ const tsp = require('timers/promises'); let finished = false; const processed = []; const expected = [ - Buffer.from('a'), - Buffer.from('b'), - Buffer.from('c'), + Buffer.from('abc'), ]; const read = new Readable({ @@ -217,10 +215,9 @@ const tsp = require('timers/promises'); let sent = 0; const rs = new Readable({ read() { - if (sent++ > 10) { - return; - } - rs.push('hello'); + setImmediate(() => { + rs.push('hello'); + }); }, destroy: common.mustCall((err, cb) => { cb(); @@ -348,8 +345,7 @@ const tsp = require('timers/promises'); }; const expected = [ - Buffer.from('hello'), - Buffer.from('world'), + Buffer.from('helloworld'), ]; const rs = new Readable({ @@ -985,7 +981,7 @@ const tsp = require('timers/promises'); // Make sure 'close' before 'end' finishes without error // if readable has received eof. // Ref: https://github.com/nodejs/node/issues/29699 - const r = new Readable(); + const r = new Readable(({ read() {} })); const w = new Writable({ write(chunk, encoding, cb) { cb(); @@ -1350,7 +1346,7 @@ const tsp = require('timers/promises'); }); const cb = common.mustCall((err) => { assert.strictEqual(err.name, 'AbortError'); - assert.strictEqual(res, '012345'); + assert.strictEqual(res, '01234'); assert.strictEqual(w.destroyed, true); assert.strictEqual(r.destroyed, true); assert.strictEqual(pipelined.destroyed, true); diff --git a/test/parallel/test-stream-promises.js b/test/parallel/test-stream-promises.js index 33bfa292720da1..2ceee5176a57ae 100644 --- a/test/parallel/test-stream-promises.js +++ b/test/parallel/test-stream-promises.js @@ -25,9 +25,7 @@ assert.strictEqual(finished, promisify(stream.finished)); let finished = false; const processed = []; const expected = [ - Buffer.from('a'), - Buffer.from('b'), - Buffer.from('c'), + Buffer.from('abc'), ]; const read = new Readable({ diff --git a/test/parallel/test-whatwg-webstreams-adapters-to-streamwritable.js b/test/parallel/test-whatwg-webstreams-adapters-to-streamwritable.js index 8da73b4fe9f0c3..c754e08c307d3e 100644 --- a/test/parallel/test-whatwg-webstreams-adapters-to-streamwritable.js +++ b/test/parallel/test-whatwg-webstreams-adapters-to-streamwritable.js @@ -214,7 +214,7 @@ class TestSource { { const writableStream = new WritableStream({ - write: common.mustCall(2), + write: common.mustCall(), close: common.mustCall(), }); const writable = newStreamWritableFromWritableStream(writableStream);