Skip to content

Commit

Permalink
stream: adjust src hwm when pipelining
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Nov 22, 2021
1 parent 8ee4e67 commit 662d3e8
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 23 deletions.
4 changes: 4 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1948,6 +1948,10 @@ changes:
A module method to pipe between streams and generators forwarding errors and
properly cleaning up and provide a callback when the pipeline is complete.

Pipeline will try to use the `.read()` API when available and dynamically
adjust the `highWaterMark` of each readable stream to match the destination.
If `.read()` is not available it will fallback to use `.pipe(dst)`.

```js
const { pipeline } = require('stream');
const fs = require('fs');
Expand Down
64 changes: 56 additions & 8 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -312,14 +312,7 @@ function pipelineImpl(streams, callback, opts) {
}
} else if (isNodeStream(stream)) {
if (isReadableNodeStream(ret)) {
ret.pipe(stream, { end });

// Compat. Before node v10.12.0 stdio used to throw an error so
// pipe() did/does not end() stdio destinations.
// Now they allow it but "secretly" don't close the underlying fd.
if (stream === process.stdout || stream === process.stderr) {
ret.on('end', () => stream.end());
}
pipe(ret, stream, { end });
} else {
ret = makeAsyncIterable(ret);

Expand All @@ -339,4 +332,59 @@ function pipelineImpl(streams, callback, opts) {
return ret;
}

function pipe(src, dst, opts) {
if (typeof src.read !== 'function') {
src.pipe(dst);
return;
}

src
.on('end', end)
.on('readable', pump)
.on('error', done);
dst
.on('drain', pump)
.on('error', done);

function done() {
src
.off('end', end)
.off('readable', pump)
.off('error', done);
dst
.off('drain', pump)
.off('error', done);
}

function end() {
if (opts?.end !== false) {
dst.end();
}
done();
}

const objectMode = (
src.readableObjectMode ||
src._readableState?.objectMode ||
dst.writableObjectMode ||
dst._writableState?.objectMode
);

function pump() {
if (dst.writableNeedDrain) {
return;
}

while (true) {
const n = (!objectMode && dst.writableHighwaterMark) || undefined;
const chunk = src.read(n);
if (chunk === null || !dst.write(chunk)) {
return;
}
}
}

process.nextTick(pump);
}

module.exports = { pipelineImpl, pipeline };
18 changes: 7 additions & 11 deletions test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ const tsp = require('timers/promises');
let finished = false;
const processed = [];
const expected = [
Buffer.from('a'),
Buffer.from('b'),
Buffer.from('c'),
Buffer.from('abc'),
];

const read = new Readable({
Expand Down Expand Up @@ -217,10 +215,9 @@ const tsp = require('timers/promises');
let sent = 0;
const rs = new Readable({
read() {
if (sent++ > 10) {
return;
}
rs.push('hello');
setImmediate(() => {
rs.push('hello');
});
},
destroy: common.mustCall((err, cb) => {
cb();
Expand Down Expand Up @@ -348,8 +345,7 @@ const tsp = require('timers/promises');
};

const expected = [
Buffer.from('hello'),
Buffer.from('world'),
Buffer.from('helloworld'),
];

const rs = new Readable({
Expand Down Expand Up @@ -985,7 +981,7 @@ const tsp = require('timers/promises');
// Make sure 'close' before 'end' finishes without error
// if readable has received eof.
// Ref: https://github.com/nodejs/node/issues/29699
const r = new Readable();
const r = new Readable(({ read() {} }));
const w = new Writable({
write(chunk, encoding, cb) {
cb();
Expand Down Expand Up @@ -1350,7 +1346,7 @@ const tsp = require('timers/promises');
});
const cb = common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
assert.strictEqual(res, '012345');
assert.strictEqual(res, '01234');
assert.strictEqual(w.destroyed, true);
assert.strictEqual(r.destroyed, true);
assert.strictEqual(pipelined.destroyed, true);
Expand Down
4 changes: 1 addition & 3 deletions test/parallel/test-stream-promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ assert.strictEqual(finished, promisify(stream.finished));
let finished = false;
const processed = [];
const expected = [
Buffer.from('a'),
Buffer.from('b'),
Buffer.from('c'),
Buffer.from('abc'),
];

const read = new Readable({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ class TestSource {

{
const writableStream = new WritableStream({
write: common.mustCall(2),
write: common.mustCall(),
close: common.mustCall(),
});
const writable = newStreamWritableFromWritableStream(writableStream);
Expand Down

0 comments on commit 662d3e8

Please sign in to comment.