Skip to content

Commit

Permalink
stream: complete pipeline with stdio
Browse files Browse the repository at this point in the history
stdio (stderr & stdout) should for compatibility
reasons not be closed/end():ed. However, this
causes pipeline with a stdio destination to
never finish. This commit fixes this issue at
a performance cost.

Refs: nodejs#7606

Fixes: nodejs#32363
  • Loading branch information
ronag committed Mar 19, 2020
1 parent a940143 commit 571e867
Showing 1 changed file with 24 additions and 8 deletions.
32 changes: 24 additions & 8 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
const {
ArrayIsArray,
SymbolAsyncIterator,
SymbolIterator
SymbolIterator,
Promise
} = primordials;

let eos;
Expand Down Expand Up @@ -147,13 +148,23 @@ async function pump(iterable, writable, finish) {
}
let error;
try {
for await (const chunk of iterable) {
if (!writable.write(chunk)) {
if (writable.destroyed) return;
await EE.once(writable, 'drain');
if (writable !== process.stdout && writable !== process.stderr) {
for await (const chunk of iterable) {
if (!writable.write(chunk)) {
if (writable.destroyed) return;
await EE.once(writable, 'drain');
}
}
writable.end();
} else {
for await (const chunk of iterable) {
await new Promise((resolve, reject) => {
writable.write(chunk, null, (err) => {
err ? reject(err) : resolve();
});
});
}
}
writable.end();
} catch (err) {
error = err;
} finally {
Expand Down Expand Up @@ -202,7 +213,9 @@ function pipeline(...streams) {
const reading = i < streams.length - 1;
const writing = i > 0;

if (isStream(stream)) {
if (stream === process.stdout || stream === process.stderr) {
// `pipe()` doesn't .end() stdout and stderr.
} else if (isStream(stream)) {
finishCount++;
destroys.push(destroyer(stream, reading, writing, !reading, finish));
}
Expand Down Expand Up @@ -263,7 +276,10 @@ function pipeline(...streams) {
destroys.push(destroyer(ret, false, true, true, finish));
}
} else if (isStream(stream)) {
if (isReadable(ret)) {
if (isReadable(ret) &&
stream !== process.stdout &&
stream !== process.stderr
) {
ret.pipe(stream);
} else {
ret = makeAsyncIterable(ret);
Expand Down

0 comments on commit 571e867

Please sign in to comment.