diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 3f0188ef2e63ad..3588b828dc40c8 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -538,9 +538,9 @@ Readable.prototype.pipe = function(dest, pipeOpts) { // If the user unpiped during `dest.write()`, it is possible // to get stuck in a permanently paused state if that write // also returned false. - if (state.pipesCount === 1 && - state.pipes[0] === dest && - src.listenerCount('data') === 1 && + // => Check whether `dest` is still a piping destination. + if (((state.pipesCount === 1 && state.pipes === dest) || + (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)) && !cleanedUp) { debug('false write response, pause', src._readableState.awaitDrain); src._readableState.awaitDrain++; diff --git a/test/parallel/test-stream-pipe-await-drain.js b/test/parallel/test-stream-pipe-await-drain.js new file mode 100644 index 00000000000000..0e8d3497123538 --- /dev/null +++ b/test/parallel/test-stream-pipe-await-drain.js @@ -0,0 +1,40 @@ +'use strict'; +const common = require('../common'); +const stream = require('stream'); + +// This is very similar to test-stream-pipe-cleanup-pause.js. + +const reader = new stream.Readable(); +const writer1 = new stream.Writable(); +const writer2 = new stream.Writable(); + +// 560000 is chosen here because it is larger than the (default) highWaterMark +// and will cause `.write()` to return false +// See: https://github.com/nodejs/node/issues/5820 +const buffer = Buffer(560000); + +reader._read = function(n) {}; + +writer1._write = common.mustCall(function(chunk, encoding, cb) { + this.emit('chunk-received'); + cb(); +}, 1); +writer1.once('chunk-received', function() { + setImmediate(function() { + // This one should *not* get through to writer1 because writer2 is not + // "done" processing. + reader.push(buffer); + }); +}); + +// A "slow" consumer: +writer2._write = common.mustCall(function(chunk, encoding, cb) { + // Not calling cb here to "simulate" slow stream. + + // This should be called exactly once, since the first .write() call + // will return false. +}, 1); + +reader.pipe(writer1); +reader.pipe(writer2); +reader.push(buffer);