diff --git a/lib/_http_client.js b/lib/_http_client.js index c7956d6eb57fb1..cea02eefa30763 100644 --- a/lib/_http_client.js +++ b/lib/_http_client.js @@ -30,7 +30,6 @@ const { _checkIsHttpToken: checkIsHttpToken, debug, freeParser, - httpSocketSetup, parsers, HTTPParser, prepareError, @@ -40,7 +39,7 @@ const Agent = require('_http_agent'); const { Buffer } = require('buffer'); const { defaultTriggerAsyncIdScope } = require('internal/async_hooks'); const { URL, urlToOptions, searchParamsSymbol } = require('internal/url'); -const { kOutHeaders, ondrain } = require('internal/http'); +const { kOutHeaders, kNeedDrain } = require('internal/http'); const { connResetException, codes } = require('internal/errors'); const { ERR_HTTP_HEADERS_SENT, @@ -335,6 +334,14 @@ function emitAbortNT() { this.emit('abort'); } +function ondrain() { + const msg = this._httpMessage; + if (msg && !msg.finished && msg[kNeedDrain]) { + msg[kNeedDrain] = false; + msg.emit('drain'); + } +} + function socketCloseListener() { const socket = this; const req = socket._httpMessage; @@ -649,9 +656,6 @@ function tickOnSocket(req, socket) { socket.parser = parser; socket._httpMessage = req; - // Setup "drain" propagation. - httpSocketSetup(socket); - // Propagate headers limit from request object to parser if (typeof req.maxHeadersCount === 'number') { parser.maxHeaderPairs = req.maxHeadersCount << 1; @@ -663,6 +667,7 @@ function tickOnSocket(req, socket) { socket.on('data', socketOnData); socket.on('end', socketOnEnd); socket.on('close', socketCloseListener); + socket.on('drain', ondrain); if ( req.timeout !== undefined || diff --git a/lib/_http_common.js b/lib/_http_common.js index b3aeb0721ae58e..c271ec54eb3ab2 100644 --- a/lib/_http_common.js +++ b/lib/_http_common.js @@ -31,7 +31,6 @@ const { methods, HTTPParser } = internalBinding('http_parser') : internalBinding('http_parser_llhttp'); const FreeList = require('internal/freelist'); -const { ondrain } = require('internal/http'); const incoming = require('_http_incoming'); const { IncomingMessage, @@ -201,12 +200,6 @@ function freeParser(parser, req, socket) { } } - -function httpSocketSetup(socket) { - socket.removeListener('drain', ondrain); - socket.on('drain', ondrain); -} - const tokenRegExp = /^[\^_`a-zA-Z\-0-9!#$%&'*+.|~]+$/; /** * Verifies that the given val is a valid HTTP token @@ -253,7 +246,6 @@ module.exports = { CRLF: '\r\n', debug, freeParser, - httpSocketSetup, methods, parsers, kIncomingMessage, diff --git a/lib/_http_outgoing.js b/lib/_http_outgoing.js index 2d2bbaa0e72e9c..0fd99460870cd8 100644 --- a/lib/_http_outgoing.js +++ b/lib/_http_outgoing.js @@ -27,7 +27,7 @@ const { getDefaultHighWaterMark } = require('internal/streams/state'); const assert = require('internal/assert'); const Stream = require('stream'); const internalUtil = require('internal/util'); -const { kOutHeaders, utcDate } = require('internal/http'); +const { kOutHeaders, utcDate, kNeedDrain } = require('internal/http'); const { Buffer } = require('buffer'); const common = require('_http_common'); const checkIsHttpToken = common._checkIsHttpToken; @@ -96,6 +96,7 @@ function OutgoingMessage() { this._contentLength = null; this._hasBody = true; this._trailer = ''; + this[kNeedDrain] = false; this.finished = false; this._headerSent = false; @@ -582,7 +583,10 @@ Object.defineProperty(OutgoingMessage.prototype, 'writableEnded', { const crlf_buf = Buffer.from('\r\n'); OutgoingMessage.prototype.write = function write(chunk, encoding, callback) { - return write_(this, chunk, encoding, callback, false); + const ret = write_(this, chunk, encoding, callback, false); + if (!ret) + this[kNeedDrain] = true; + return ret; }; function write_(msg, chunk, encoding, callback, fromEnd) { @@ -782,8 +786,8 @@ OutgoingMessage.prototype._flush = function _flush() { if (this.finished) { // This is a queue to the server or client to bring in the next this. this._finish(); - } else if (ret) { - // This is necessary to prevent https from breaking + } else if (ret && this[kNeedDrain]) { + this[kNeedDrain] = false; this.emit('drain'); } } diff --git a/lib/_http_server.js b/lib/_http_server.js index 82e9fcf7ba938c..68492f3eaaeec0 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -32,7 +32,6 @@ const { CRLF, continueExpression, chunkExpression, - httpSocketSetup, kIncomingMessage, HTTPParser, _checkInvalidHeaderChar: checkInvalidHeaderChar, @@ -41,7 +40,7 @@ const { const { OutgoingMessage } = require('_http_outgoing'); const { kOutHeaders, - ondrain, + kNeedDrain, nowDate, emitStatistics } = require('internal/http'); @@ -363,8 +362,6 @@ function connectionListener(socket) { function connectionListenerInternal(server, socket) { debug('SERVER new http connection'); - httpSocketSetup(socket); - // Ensure that the server property of the socket is correctly set. // See https://github.com/nodejs/node/issues/13435 if (socket.server === null) @@ -459,6 +456,12 @@ function socketOnDrain(socket, state) { socket.parser.resume(); socket.resume(); } + + const msg = socket._httpMessage; + if (msg && !msg.finished && msg[kNeedDrain]) { + msg[kNeedDrain] = false; + msg.emit('drain'); + } } function socketOnTimeout() { @@ -585,7 +588,6 @@ function onParserExecuteCommon(server, socket, parser, state, ret, d) { socket.removeListener('end', state.onEnd); socket.removeListener('close', state.onClose); socket.removeListener('drain', state.onDrain); - socket.removeListener('drain', ondrain); socket.removeListener('error', socketOnError); unconsume(parser, socket); parser.finish(); diff --git a/lib/internal/http.js b/lib/internal/http.js index 440d8afa2d3182..fcfd2c91f22d33 100644 --- a/lib/internal/http.js +++ b/lib/internal/http.js @@ -28,10 +28,6 @@ function resetCache() { utcCache = undefined; } -function ondrain() { - if (this._httpMessage) this._httpMessage.emit('drain'); -} - class HttpRequestTiming extends PerformanceEntry { constructor(statistics) { super(); @@ -50,7 +46,7 @@ function emitStatistics(statistics) { module.exports = { kOutHeaders: Symbol('kOutHeaders'), - ondrain, + kNeedDrain: Symbol('kNeedDrain'), nowDate, utcDate, emitStatistics