Skip to content

Commit

Permalink
stream: support array of streams in promises pipeline
Browse files Browse the repository at this point in the history
Fixes: #40191

PR-URL: #40193
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
Mesteery authored and targos committed Nov 4, 2021
1 parent 1743306 commit 36d3b12
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 7 deletions.
11 changes: 4 additions & 7 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -166,17 +166,14 @@ async function pump(iterable, writable, finish) {
}

function pipeline(...streams) {
const callback = once(popCallback(streams));
return pipelineImpl(streams, once(popCallback(streams)));
}

// stream.pipeline(streams, callback)
if (ArrayIsArray(streams[0]) && streams.length === 1) {
function pipelineImpl(streams, callback, opts) {
if (streams.length === 1 && ArrayIsArray(streams[0])) {
streams = streams[0];
}

return pipelineImpl(streams, callback);
}

function pipelineImpl(streams, callback, opts) {
if (streams.length < 2) {
throw new ERR_MISSING_ARGS('streams');
}
Expand Down
41 changes: 41 additions & 0 deletions test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -1406,3 +1406,44 @@ const tsp = require('timers/promises');
}));
ac.abort();
}

{
async function run() {
let finished = false;
let text = '';
const write = new Writable({
write(data, enc, cb) {
text += data;
cb();
}
});
write.on('finish', () => {
finished = true;
});

await pipelinep([Readable.from('Hello World!'), write]);
assert(finished);
assert.strictEqual(text, 'Hello World!');
}

run();
}

{
let finished = false;
let text = '';
const write = new Writable({
write(data, enc, cb) {
text += data;
cb();
}
});
write.on('finish', () => {
finished = true;
});

pipeline([Readable.from('Hello World!'), write], common.mustSucceed(() => {
assert(finished);
assert.strictEqual(text, 'Hello World!');
}));
}

0 comments on commit 36d3b12

Please sign in to comment.