Skip to content

Commit

Permalink
http: invoke callback with ERR_STREAM_DESTROYED if the socket is dest…
Browse files Browse the repository at this point in the history
…royed

Fixes: #36673
Refs: #29227 (comment)
  • Loading branch information
Lxxyx committed Jan 12, 2021
1 parent ec794f9 commit 2a0ecd6
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 75 deletions.
129 changes: 54 additions & 75 deletions lib/_http_outgoing.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ const { Buffer } = require('buffer');
const common = require('_http_common');
const checkIsHttpToken = common._checkIsHttpToken;
const checkInvalidHeaderChar = common._checkInvalidHeaderChar;
const {
defaultTriggerAsyncIdScope,
symbols: { async_id_symbol }
} = require('internal/async_hooks');
const {
codes: {
ERR_HTTP_HEADERS_SENT,
Expand Down Expand Up @@ -341,17 +337,21 @@ OutgoingMessage.prototype._send = function _send(data, encoding, callback) {
OutgoingMessage.prototype._writeRaw = _writeRaw;
function _writeRaw(data, encoding, callback) {
const conn = this.socket;
if (conn && conn.destroyed) {
// The socket was destroyed. If we're still trying to write to it,
// then we haven't gotten the 'close' event yet.
return false;
}

if (typeof encoding === 'function') {
callback = encoding;
encoding = null;
}

if (conn?.destroyed) {
if (typeof callback === 'function') {
process.nextTick(callback, new ERR_STREAM_DESTROYED('write'));
}
// The socket was destroyed. If we're still trying to write to it,
// then we haven't gotten the 'close' event yet.
return false;
}

if (conn && conn._httpMessage === this && conn.writable) {
// There might be pending data in the this.output buffer.
if (this.outputData.length) {
Expand Down Expand Up @@ -689,23 +689,6 @@ OutgoingMessage.prototype.write = function write(chunk, encoding, callback) {
return ret;
};

function onError(msg, err, callback) {
const triggerAsyncId = msg.socket ? msg.socket[async_id_symbol] : undefined;
defaultTriggerAsyncIdScope(triggerAsyncId,
process.nextTick,
emitErrorNt,
msg,
err,
callback);
}

function emitErrorNt(msg, err, callback) {
callback(err);
if (typeof msg.emit === 'function' && !msg._closed) {
msg.emit('error', err);
}
}

function write_(msg, chunk, encoding, callback, fromEnd) {
if (typeof callback !== 'function')
callback = nop;
Expand All @@ -730,11 +713,8 @@ function write_(msg, chunk, encoding, callback, fromEnd) {
}

if (err) {
if (!msg.destroyed) {
onError(msg, err, callback);
} else {
process.nextTick(callback, err);
}
process.nextTick(callback, err);
msg.destroy(err);
return false;
}

Expand Down Expand Up @@ -804,62 +784,65 @@ OutgoingMessage.prototype.addTrailers = function addTrailers(headers) {
}
};

function onFinish(outmsg) {
if (outmsg && outmsg.socket && outmsg.socket._hadError) return;
outmsg.emit('finish');
function onFinish(err) {
if (err || this.socket?._hadError) return;
this.emit('finish');
}

OutgoingMessage.prototype.end = function end(chunk, encoding, callback) {
OutgoingMessage.prototype.end = function end(chunk, encoding, cb) {
if (typeof chunk === 'function') {
callback = chunk;
cb = chunk;
chunk = null;
encoding = null;
} else if (typeof encoding === 'function') {
callback = encoding;
cb = encoding;
encoding = null;
}

if (chunk) {
if (this.finished) {
onError(this,
new ERR_STREAM_WRITE_AFTER_END(),
typeof callback !== 'function' ? nop : callback);
return this;
}
if (this.socket) {
this.socket.cork();
}

if (this.socket) {
this.socket.cork();
}
if (chunk !== null && chunk !== undefined)
this.write(chunk, encoding);

write_(this, chunk, encoding, null, true);
} else if (this.finished) {
if (typeof callback === 'function') {
if (!this.writableFinished) {
this.on('finish', callback);
} else {
callback(new ERR_STREAM_ALREADY_FINISHED('end'));
}
let err;
if (!this.finished) {
if (!this._header) {
this._contentLength = 0;
this._implicitHeader();
}
return this;
} else if (!this._header) {
if (this.socket) {
this.socket.cork();

const finish = FunctionPrototypeBind(onFinish, this);

if (this._hasBody && this.chunkedEncoding) {
this._send('0\r\n' + this._trailer + '\r\n', 'latin1', finish);
} else {
// Force a flush, HACK.
this._send('', 'latin1', finish);
}

this._contentLength = 0;
this._implicitHeader();
this.finished = true; // aka. WritableState.ended
} else if (this.writableFinished) {
err = new ERR_STREAM_ALREADY_FINISHED('end');
} else if (this.destroyed) {
err = new ERR_STREAM_DESTROYED('end');
}

if (typeof callback === 'function')
this.once('finish', callback);

const finish = FunctionPrototypeBind(onFinish, undefined, this);
if (typeof cb === 'function') {
if (err || this.writableFinished) {
process.nextTick(cb, err);
} else {
// TODO (fix): What if error? See kOnFinished in writable.js.
this.once('finish', cb);
}
}

if (this._hasBody && this.chunkedEncoding) {
this._send('0\r\n' + this._trailer + '\r\n', 'latin1', finish);
} else {
// Force a flush, HACK.
this._send('', 'latin1', finish);
if (err) {
if (this.socket) {
this.socket.uncork();
}
return this;
}

if (this.socket) {
Expand All @@ -869,14 +852,10 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) {
}
this[kCorked] = 0;

this.finished = true;

// There is the first message on the outgoing queue, and we've sent
// everything to the socket.
debug('outgoing message end.');
if (this.outputData.length === 0 &&
this.socket &&
this.socket._httpMessage === this) {
if (this.outputData.length === 0 && this.socket?._httpMessage === this) {
this._finish();
}

Expand Down
61 changes: 61 additions & 0 deletions test/parallel/test-http-outgoing-socket-destroyed.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
'use strict';

const common = require('../common');
const { createServer, request } = require('http');

{
const server = createServer((req, res) => {
server.close();

req.socket.destroy();

res.write('hello', common.expectsError({
code: 'ERR_STREAM_DESTROYED'
}));
});

server.listen(0, common.mustCall(() => {
const req = request({
host: 'localhost',
port: server.address().port
});

req.on('response', common.mustNotCall());
req.on('error', common.expectsError({
code: 'ECONNRESET'
}));

req.end();
}));
}

{
const server = createServer((req, res) => {
res.write('hello');
req.resume();

const onError = common.expectsError({
code: 'ERR_STREAM_DESTROYED'
});

res.on('close', () => {
res.write('world', common.mustCall((err) => {
onError(err);
server.close();
}));
});
});

server.listen(0, common.mustCall(() => {
const req = request({
host: 'localhost',
port: server.address().port
});

req.on('response', common.mustCall((res) => {
res.socket.destroy();
}));

req.end();
}));
}

0 comments on commit 2a0ecd6

Please sign in to comment.