Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: adjust src hwm when pipelining #40751

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1948,6 +1948,10 @@ changes:
A module method to pipe between streams and generators forwarding errors and
properly cleaning up and provide a callback when the pipeline is complete.

Pipeline will try to use the `.read()` API when available and dynamically
adjust the `highWaterMark` of each readable stream to match the destination.
If `.read()` is not available it will fallback to use `.pipe(dst)`.

```js
const { pipeline } = require('stream');
const fs = require('fs');
Expand Down
71 changes: 63 additions & 8 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -312,14 +312,7 @@ function pipelineImpl(streams, callback, opts) {
}
} else if (isNodeStream(stream)) {
if (isReadableNodeStream(ret)) {
ret.pipe(stream, { end });

// Compat. Before node v10.12.0 stdio used to throw an error so
// pipe() did/does not end() stdio destinations.
// Now they allow it but "secretly" don't close the underlying fd.
if (stream === process.stdout || stream === process.stderr) {
ret.on('end', () => stream.end());
}
pipe(ret, stream, { end });
} else {
ret = makeAsyncIterable(ret);

Expand All @@ -339,4 +332,66 @@ function pipelineImpl(streams, callback, opts) {
return ret;
}

function pipe(src, dst, opts) {
if (typeof src.read !== 'function') {
src.pipe(dst);
return;
}

src
.on('end', end)
.on('readable', pump)
.on('error', done);
dst
.on('drain', pump)
.on('error', done);

function done() {
src
.off('end', end)
.off('readable', pump)
.off('error', done);
dst
.off('drain', pump)
.off('error', done);
}

function end() {
if (opts?.end !== false) {
dst.end();
}
done();
}

const objectMode = (
src.readableObjectMode ||
src._readableState?.objectMode ||
dst.writableObjectMode ||
dst._writableState?.objectMode
);
const rState = src._readableState;
const wState = dst._writableState;

function pump() {
if (dst.writableNeedDrain) {
return;
}

while (true) {
if (!objectMode) {
const n = (wState && wState.highWaterMark) ?? dst.writableHighwaterMark;
if (n && rState && rState.highWaterMark < n) {
rState.highWaterMark = n;
}
}
const chunk = src.read();
if (chunk === null || !dst.write(chunk)) {
return;
}
}
}

process.nextTick(pump);
}

module.exports = { pipelineImpl, pipeline };
18 changes: 7 additions & 11 deletions test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ const tsp = require('timers/promises');
let finished = false;
const processed = [];
const expected = [
Buffer.from('a'),
Buffer.from('b'),
Buffer.from('c'),
Buffer.from('abc'),
];

const read = new Readable({
Expand Down Expand Up @@ -217,10 +215,9 @@ const tsp = require('timers/promises');
let sent = 0;
const rs = new Readable({
read() {
if (sent++ > 10) {
return;
}
rs.push('hello');
setImmediate(() => {
rs.push('hello');
});
},
destroy: common.mustCall((err, cb) => {
cb();
Expand Down Expand Up @@ -348,8 +345,7 @@ const tsp = require('timers/promises');
};

const expected = [
Buffer.from('hello'),
Buffer.from('world'),
Buffer.from('helloworld'),
];

const rs = new Readable({
Expand Down Expand Up @@ -985,7 +981,7 @@ const tsp = require('timers/promises');
// Make sure 'close' before 'end' finishes without error
// if readable has received eof.
// Ref: https://github.com/nodejs/node/issues/29699
const r = new Readable();
const r = new Readable(({ read() {} }));
const w = new Writable({
write(chunk, encoding, cb) {
cb();
Expand Down Expand Up @@ -1350,7 +1346,7 @@ const tsp = require('timers/promises');
});
const cb = common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
assert.strictEqual(res, '012345');
assert.strictEqual(res, '01234');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a timing difference due to how streams are resumed with 'readable' event.

assert.strictEqual(w.destroyed, true);
assert.strictEqual(r.destroyed, true);
assert.strictEqual(pipelined.destroyed, true);
Expand Down
4 changes: 1 addition & 3 deletions test/parallel/test-stream-promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ assert.strictEqual(finished, promisify(stream.finished));
let finished = false;
const processed = [];
const expected = [
Buffer.from('a'),
Buffer.from('b'),
Buffer.from('c'),
Buffer.from('abc'),
];

const read = new Readable({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ class TestSource {

{
const writableStream = new WritableStream({
write: common.mustCall(2),
write: common.mustCall(),
close: common.mustCall(),
});
const writable = newStreamWritableFromWritableStream(writableStream);
Expand Down