Skip to content

Commit

Permalink
Revert "stream: prevent 'end' to be emitted after 'error'"
Browse files Browse the repository at this point in the history
This reverts commit 0857790.

PR-URL: #20449
Fixes: #20334
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
  • Loading branch information
mscdex authored and MylesBorins committed May 8, 2018
1 parent aef4f80 commit c06821a
Show file tree
Hide file tree
Showing 17 changed files with 20 additions and 76 deletions.
16 changes: 5 additions & 11 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,6 @@ function ReadableState(options, stream, isDuplex) {
this.endEmitted = false;
this.reading = false;

// Flipped if an 'error' is emitted.
this.errorEmitted = false;

// a flag to be able to tell if the event 'readable'/'data' is emitted
// immediately, or on a later tick. We set this to true at first, because
// any actions that shouldn't happen until "later" should generally also
Expand Down Expand Up @@ -1072,23 +1069,20 @@ function fromList(n, state) {
function endReadable(stream) {
var state = stream._readableState;

debug('endReadable', state.endEmitted, state.errorEmitted);
if (!state.endEmitted && !state.errorEmitted) {
debug('endReadable', state.endEmitted);
if (!state.endEmitted) {
state.ended = true;
process.nextTick(endReadableNT, state, stream);
}
}

function endReadableNT(state, stream) {
debug('endReadableNT', state.endEmitted, state.length, state.errorEmitted);
debug('endReadableNT', state.endEmitted, state.length);

// Check that we didn't get one last unshift.
if (!state.endEmitted && state.length === 0) {
state.endEmitted = true;
stream.readable = false;

if (!state.errorEmitted) {
state.endEmitted = true;
stream.emit('end');
}
stream.emit('end');
}
}
10 changes: 0 additions & 10 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -424,22 +424,12 @@ function onwriteError(stream, state, sync, er, cb) {
// this can emit finish, and it will always happen
// after error
process.nextTick(finishMaybe, stream, state);

// needed for duplex, fixes https://github.com/nodejs/node/issues/6083
if (stream._readableState) {
stream._readableState.errorEmitted = true;
}
stream._writableState.errorEmitted = true;
stream.emit('error', er);
} else {
// the caller expect this to happen before if
// it is async
cb(er);

// needed for duplex, fixes https://github.com/nodejs/node/issues/6083
if (stream._readableState) {
stream._readableState.errorEmitted = true;
}
stream._writableState.errorEmitted = true;
stream.emit('error', er);
// this can emit finish, but finish must
Expand Down
14 changes: 2 additions & 12 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,10 @@ function destroy(err, cb) {
this._writableState.destroyed;

if (readableDestroyed || writableDestroyed) {
const readableErrored = this._readableState &&
this._readableState.errorEmitted;
const writableErrored = this._writableState &&
this._writableState.errorEmitted;

if (cb) {
cb(err);
} else if (err && !readableErrored && !writableErrored) {
} else if (err &&
(!this._writableState || !this._writableState.errorEmitted)) {
process.nextTick(emitErrorNT, this, err);
}
return this;
Expand All @@ -36,11 +32,6 @@ function destroy(err, cb) {
this._destroy(err || null, (err) => {
if (!cb && err) {
process.nextTick(emitErrorAndCloseNT, this, err);

if (this._readableState) {
this._readableState.errorEmitted = true;
}

if (this._writableState) {
this._writableState.errorEmitted = true;
}
Expand Down Expand Up @@ -74,7 +65,6 @@ function undestroy() {
this._readableState.reading = false;
this._readableState.ended = false;
this._readableState.endEmitted = false;
this._readableState.errorEmitted = false;
}

if (this._writableState) {
Expand Down
1 change: 1 addition & 0 deletions test/parallel/test-http2-client-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ const Countdown = require('../common/countdown');
});

req.resume();
req.on('end', common.mustCall());
req.on('close', common.mustCall(() => server.close()));
}));
}
Expand Down
1 change: 1 addition & 0 deletions test/parallel/test-http2-client-onconnect-errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ function runTest(test) {
});
}

req.on('end', common.mustCall());
req.on('close', common.mustCall(() => {
client.destroy();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,5 @@ server.listen(0, common.mustCall(() => {

req.on('response', common.mustNotCall());
req.resume();
req.on('end', common.mustCall());
}));
2 changes: 2 additions & 0 deletions test/parallel/test-http2-compat-serverresponse-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ server.listen(0, common.mustCall(() => {
req.on('close', common.mustCall(() => countdown.dec()));

req.resume();
req.on('end', common.mustCall());
}

{
Expand All @@ -77,5 +78,6 @@ server.listen(0, common.mustCall(() => {
req.on('close', common.mustCall(() => countdown.dec()));

req.resume();
req.on('end', common.mustCall());
}
}));
1 change: 1 addition & 0 deletions test/parallel/test-http2-max-concurrent-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ server.listen(0, common.mustCall(() => {
req.on('aborted', common.mustCall());
req.on('response', common.mustNotCall());
req.resume();
req.on('end', common.mustCall());
req.on('close', common.mustCall(() => countdown.dec()));
req.on('error', common.expectsError({
code: 'ERR_HTTP2_STREAM_ERROR',
Expand Down
1 change: 1 addition & 0 deletions test/parallel/test-http2-misused-pseudoheaders.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ server.listen(0, common.mustCall(() => {

req.on('response', common.mustCall());
req.resume();
req.on('end', common.mustCall());
req.on('close', common.mustCall(() => {
server.close();
client.close();
Expand Down
1 change: 1 addition & 0 deletions test/parallel/test-http2-multi-content-length.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ server.listen(0, common.mustCall(() => {
// header to be set for non-payload bearing requests...
const req = client.request({ 'content-length': 1 });
req.resume();
req.on('end', common.mustCall());
req.on('close', common.mustCall(() => countdown.dec()));
req.on('error', common.expectsError({
code: 'ERR_HTTP2_STREAM_ERROR',
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-http2-respond-file-fd-invalid.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ server.listen(0, () => {
req.on('response', common.mustCall());
req.on('error', common.mustCall(errorCheck));
req.on('data', common.mustNotCall());
req.on('close', common.mustCall(() => {
req.on('end', common.mustCall(() => {
assert.strictEqual(req.rstCode, NGHTTP2_INTERNAL_ERROR);
client.close();
server.close();
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-http2-respond-nghttperrors.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ function runTest(test) {
req.resume();
req.end();

req.on('close', common.mustCall(() => {
req.on('end', common.mustCall(() => {
client.close();

if (!tests.length) {
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-http2-respond-with-fd-errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ function runTest(test) {
req.resume();
req.end();

req.on('close', common.mustCall(() => {
req.on('end', common.mustCall(() => {
client.close();

if (!tests.length) {
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-http2-server-shutdown-before-respond.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,5 @@ server.on('listening', common.mustCall(() => {
}));
req.resume();
req.on('data', common.mustNotCall());
req.on('close', common.mustCall(() => server.close()));
req.on('end', common.mustCall(() => server.close()));
}));
1 change: 1 addition & 0 deletions test/parallel/test-http2-server-socket-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,5 @@ server.on('listening', common.mustCall(() => {

req.on('aborted', common.mustCall());
req.resume();
req.on('end', common.mustCall());
}));
24 changes: 0 additions & 24 deletions test/parallel/test-stream-duplex-error-write.js

This file was deleted.

15 changes: 0 additions & 15 deletions test/parallel/test-stream-readable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -189,18 +189,3 @@ const { inherits } = require('util');
read.push('hi');
read.on('data', common.mustNotCall());
}

{
// double error case
const read = new Readable({
read() {}
});

read.on('close', common.mustCall());
read.on('error', common.mustCall());

read.destroy(new Error('kaboom 1'));
read.destroy(new Error('kaboom 2'));
assert.strictEqual(read._readableState.errorEmitted, true);
assert.strictEqual(read.destroyed, true);
}

0 comments on commit c06821a

Please sign in to comment.