Skip to content

Commit

Permalink
stream: Fixes missing 'unpipe' event
Browse files Browse the repository at this point in the history
Currently when the destination emits an 'error', 'finish' or 'close'
event the pipe calls unpipe to emit 'unpipe' and trigger the clean up
of all it's listeners.
When the source emits an 'end' event without {end: false} it calls
end() on the destination leading it to emit a 'close', this will again
lead to the pipe calling unpipe. However the source emitting an 'end'
event along side {end: false} is the only time the cleanup gets ran
directly without unpipe being called. This fixes that so the 'unpipe'
event does get emitted and cleanup in turn gets ran by that event.

Fixes: nodejs#11837
PR-URL: nodejs#11876
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
  • Loading branch information
zaide-chris authored and mcollina committed May 2, 2017
1 parent b1d3f59 commit 544c415
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 2 deletions.
4 changes: 2 additions & 2 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
dest !== process.stdout &&
dest !== process.stderr;

var endFn = doEnd ? onend : cleanup;
var endFn = doEnd ? onend : unpipe;
if (state.endEmitted)
process.nextTick(endFn);
else
Expand Down Expand Up @@ -530,7 +530,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
dest.removeListener('error', onerror);
dest.removeListener('unpipe', onunpipe);
src.removeListener('end', onend);
src.removeListener('end', cleanup);
src.removeListener('end', unpipe);
src.removeListener('data', ondata);

cleanedUp = true;
Expand Down
87 changes: 87 additions & 0 deletions test/parallel/test-stream-unpipe-event.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const {Writable, Readable} = require('stream');
class NullWriteable extends Writable {
_write(chunk, encoding, callback) {
return callback();
}
}
class QuickEndReadable extends Readable {
_read() {
this.push(null);
}
}
class NeverEndReadable extends Readable {
_read() {}
}

function noop() {}

{
const dest = new NullWriteable();
const src = new QuickEndReadable();
dest.on('pipe', common.mustCall(noop));
dest.on('unpipe', common.mustCall(noop));
src.pipe(dest);
setImmediate(() => {
assert.strictEqual(src._readableState.pipesCount, 0);
});
}

{
const dest = new NullWriteable();
const src = new NeverEndReadable();
dest.on('pipe', common.mustCall(noop));
dest.on('unpipe', common.mustNotCall('unpipe should not have been emitted'));
src.pipe(dest);
setImmediate(() => {
assert.strictEqual(src._readableState.pipesCount, 1);
});
}

{
const dest = new NullWriteable();
const src = new NeverEndReadable();
dest.on('pipe', common.mustCall(noop));
dest.on('unpipe', common.mustCall(noop));
src.pipe(dest);
src.unpipe(dest);
setImmediate(() => {
assert.strictEqual(src._readableState.pipesCount, 0);
});
}

{
const dest = new NullWriteable();
const src = new QuickEndReadable();
dest.on('pipe', common.mustCall(noop));
dest.on('unpipe', common.mustCall(noop));
src.pipe(dest, {end: false});
setImmediate(() => {
assert.strictEqual(src._readableState.pipesCount, 0);
});
}

{
const dest = new NullWriteable();
const src = new NeverEndReadable();
dest.on('pipe', common.mustCall(noop));
dest.on('unpipe', common.mustNotCall('unpipe should not have been emitted'));
src.pipe(dest, {end: false});
setImmediate(() => {
assert.strictEqual(src._readableState.pipesCount, 1);
});
}

{
const dest = new NullWriteable();
const src = new NeverEndReadable();
dest.on('pipe', common.mustCall(noop));
dest.on('unpipe', common.mustCall(noop));
src.pipe(dest, {end: false});
src.unpipe(dest);
setImmediate(() => {
assert.strictEqual(src._readableState.pipesCount, 0);
});
}

0 comments on commit 544c415

Please sign in to comment.