Skip to content

Commit

Permalink
stream: propagate errors from src streams in async iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosc90 committed Dec 9, 2019
1 parent eac3f0a commit 82b60df
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 1 deletion.
21 changes: 20 additions & 1 deletion lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ function ReadableState(options, stream, isDuplex) {
this.buffer = new BufferList();
this.length = 0;
this.pipes = [];
this.pipeSources = [];
this.flowing = null;
this.ended = false;
this.endEmitted = false;
Expand Down Expand Up @@ -698,6 +699,9 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
}
}

if (dest._readableState)
dest._readableState.pipeSources.push(this);

state.pipes.push(dest);
debug('pipe count=%d opts=%j', state.pipes.length, pipeOpts);

Expand Down Expand Up @@ -859,6 +863,16 @@ function pipeOnDrain(src, dest) {
};
}

function unpipeSources(src, dest) {
const destState = dest._readableState;
if (!destState)
return;

const pipeSourcesIndex = destState.pipeSources.indexOf(src);
if (pipeSourcesIndex !== -1)
destState.pipeSources.splice(pipeSourcesIndex, 1);
}


Readable.prototype.unpipe = function(dest) {
const state = this._readableState;
Expand All @@ -874,11 +888,14 @@ Readable.prototype.unpipe = function(dest) {
state.pipes = [];
state.flowing = false;

for (var i = 0; i < dests.length; i++)
for (let i = 0; i < dests.length; i++) {
unpipeSources(this, dests[i]);
dests[i].emit('unpipe', this, { hasUnpiped: false });
}
return this;
}


// Try to find the right one.
const index = state.pipes.indexOf(dest);
if (index === -1)
Expand All @@ -888,6 +905,8 @@ Readable.prototype.unpipe = function(dest) {
if (state.pipes.length === 0)
state.flowing = false;

unpipeSources(this, dest);

dest.emit('unpipe', this, unpipeInfo);

return this;
Expand Down
14 changes: 14 additions & 0 deletions lib/internal/streams/async_iterator.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,18 @@ function wrapForNext(lastPromise, iter) {
};
}

function handlePipelineError(sources, current) {

if (!sources) return;

for (const stream of sources) {
const listener = (err) => current.emit('error', err);
stream.on('error', listener);
stream.on('unpipe', () => stream.off('error', listener));
handlePipelineError(stream._readableState.pipeSources, current);
}
}

const AsyncIteratorPrototype = ObjectGetPrototypeOf(
ObjectGetPrototypeOf(async function* () {}).prototype);

Expand Down Expand Up @@ -169,6 +181,8 @@ const createReadableStreamAsyncIterator = (stream) => {
});
iterator[kLastPromise] = null;

handlePipelineError(stream._readableState.pipeSources, stream);

finished(stream, { writable: false }, (err) => {
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
const reject = iterator[kLastReject];
Expand Down

0 comments on commit 82b60df

Please sign in to comment.