diff --git a/lib/_http_outgoing.js b/lib/_http_outgoing.js index 178a3418dace0a..64e0d4ab714d79 100644 --- a/lib/_http_outgoing.js +++ b/lib/_http_outgoing.js @@ -82,6 +82,9 @@ let debug = require('internal/util/debuglog').debuglog('http', (fn) => { }); const kCorked = Symbol('corked'); +const kSocket = Symbol('kSocket'); +const kChunkedBuffer = Symbol('kChunkedBuffer'); +const kChunkedLength = Symbol('kChunkedLength'); const kUniqueHeaders = Symbol('kUniqueHeaders'); const kBytesWritten = Symbol('kBytesWritten'); const kErrored = Symbol('errored'); @@ -140,9 +143,11 @@ function OutgoingMessage(options) { this.finished = false; this._headerSent = false; this[kCorked] = 0; + this[kChunkedBuffer] = []; + this[kChunkedLength] = 0; this._closed = false; - this.socket = null; + this[kSocket] = null; this._header = null; this[kOutHeaders] = null; @@ -177,7 +182,7 @@ ObjectDefineProperty(OutgoingMessage.prototype, 'writableFinished', { return ( this.finished && this.outputSize === 0 && - (!this.socket || this.socket.writableLength === 0) + (!this[kSocket] || this[kSocket].writableLength === 0) ); }, }); @@ -192,22 +197,21 @@ ObjectDefineProperty(OutgoingMessage.prototype, 'writableObjectMode', { ObjectDefineProperty(OutgoingMessage.prototype, 'writableLength', { __proto__: null, get() { - return this.outputSize + (this.socket ? this.socket.writableLength : 0); + return this.outputSize + this[kChunkedLength] + (this[kSocket] ? this[kSocket].writableLength : 0); }, }); ObjectDefineProperty(OutgoingMessage.prototype, 'writableHighWaterMark', { __proto__: null, get() { - return this.socket ? this.socket.writableHighWaterMark : this[kHighWaterMark]; + return this[kSocket] ? this[kSocket].writableHighWaterMark : this[kHighWaterMark]; }, }); ObjectDefineProperty(OutgoingMessage.prototype, 'writableCorked', { __proto__: null, get() { - const corked = this.socket ? this.socket.writableCorked : 0; - return corked + this[kCorked]; + return this[kCorked]; }, }); @@ -235,13 +239,27 @@ ObjectDefineProperty(OutgoingMessage.prototype, '_headers', { ObjectDefineProperty(OutgoingMessage.prototype, 'connection', { __proto__: null, get: function() { - return this.socket; + return this[kSocket]; }, set: function(val) { this.socket = val; }, }); +ObjectDefineProperty(OutgoingMessage.prototype, 'socket', { + __proto__: null, + get: function() { + return this[kSocket]; + }, + set: function(val) { + for (let n = 0; n < this[kCorked]; n++) { + val?.cork(); + this[kSocket]?.uncork(); + } + this[kSocket] = val; + }, +}); + ObjectDefineProperty(OutgoingMessage.prototype, '_headerNames', { __proto__: null, get: internalUtil.deprecate(function() { @@ -299,19 +317,45 @@ OutgoingMessage.prototype._renderHeaders = function _renderHeaders() { }; OutgoingMessage.prototype.cork = function() { - if (this.socket) { - this.socket.cork(); - } else { - this[kCorked]++; + this[kCorked]++; + if (this[kSocket]) { + this[kSocket].cork(); } }; OutgoingMessage.prototype.uncork = function() { - if (this.socket) { - this.socket.uncork(); - } else if (this[kCorked]) { - this[kCorked]--; + this[kCorked]--; + if (this[kSocket]) { + this[kSocket].uncork(); + } + + if (this[kCorked] || this[kChunkedBuffer].length === 0) { + return; } + + const len = this[kChunkedLength]; + const buf = this[kChunkedBuffer]; + + assert(this.chunkedEncoding); + + let callbacks; + this._send(NumberPrototypeToString(len, 16), 'latin1', null); + this._send(crlf_buf, null, null); + for (let n = 0; n < buf.length; n += 3) { + this._send(buf[n + 0], buf[n + 1], null); + if (buf[n + 2]) { + callbacks ??= []; + callbacks.push(buf[n + 2]); + } + } + this._send(crlf_buf, null, callbacks.length ? (err) => { + for (const callback of callbacks) { + callback(err); + } + } : null); + + this[kChunkedBuffer].length = 0; + this[kChunkedLength] = 0; }; OutgoingMessage.prototype.setTimeout = function setTimeout(msecs, callback) { @@ -320,12 +364,12 @@ OutgoingMessage.prototype.setTimeout = function setTimeout(msecs, callback) { this.on('timeout', callback); } - if (!this.socket) { + if (!this[kSocket]) { this.once('socket', function socketSetTimeoutOnConnect(socket) { socket.setTimeout(msecs); }); } else { - this.socket.setTimeout(msecs); + this[kSocket].setTimeout(msecs); } return this; }; @@ -342,8 +386,8 @@ OutgoingMessage.prototype.destroy = function destroy(error) { this[kErrored] = error; - if (this.socket) { - this.socket.destroy(error); + if (this[kSocket]) { + this[kSocket].destroy(error); } else { this.once('socket', function socketDestroyOnConnect(socket) { socket.destroy(error); @@ -382,7 +426,7 @@ OutgoingMessage.prototype._send = function _send(data, encoding, callback, byteL OutgoingMessage.prototype._writeRaw = _writeRaw; function _writeRaw(data, encoding, callback, size) { - const conn = this.socket; + const conn = this[kSocket]; if (conn && conn.destroyed) { // The socket was destroyed. If we're still trying to write to it, // then we haven't gotten the 'close' event yet. @@ -938,10 +982,16 @@ function write_(msg, chunk, encoding, callback, fromEnd) { let ret; if (msg.chunkedEncoding && chunk.length !== 0) { len ??= typeof chunk === 'string' ? Buffer.byteLength(chunk, encoding) : chunk.byteLength; - msg._send(NumberPrototypeToString(len, 16), 'latin1', null); - msg._send(crlf_buf, null, null); - msg._send(chunk, encoding, null, len); - ret = msg._send(crlf_buf, null, callback); + if (msg[kCorked] && msg._headerSent) { + msg[kChunkedBuffer].push(chunk, encoding, callback); + msg[kChunkedLength] += len; + ret = msg[kChunkedLength] < msg[kHighWaterMark]; + } else { + msg._send(NumberPrototypeToString(len, 16), 'latin1', null); + msg._send(crlf_buf, null, null); + msg._send(chunk, encoding, null, len); + ret = msg._send(crlf_buf, null, callback); + } } else { ret = msg._send(chunk, encoding, callback, len); } @@ -1023,8 +1073,8 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) { return this; } - if (this.socket) { - this.socket.cork(); + if (this[kSocket]) { + this[kSocket].cork(); } write_(this, chunk, encoding, null, true); @@ -1038,8 +1088,8 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) { } return this; } else if (!this._header) { - if (this.socket) { - this.socket.cork(); + if (this[kSocket]) { + this[kSocket].cork(); } this._contentLength = 0; @@ -1063,12 +1113,13 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) { process.nextTick(finish); } - if (this.socket) { + if (this[kSocket]) { // Fully uncork connection on end(). - this.socket._writableState.corked = 1; - this.socket.uncork(); + this[kSocket]._writableState.corked = 1; + this[kSocket].uncork(); } - this[kCorked] = 0; + this[kCorked] = 1; + this.uncork(); this.finished = true; @@ -1076,8 +1127,8 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) { // everything to the socket. debug('outgoing message end.'); if (this.outputData.length === 0 && - this.socket && - this.socket._httpMessage === this) { + this[kSocket] && + this[kSocket]._httpMessage === this) { this._finish(); } @@ -1088,7 +1139,7 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) { // This function is called once all user data are flushed to the socket. // Note that it has a chance that the socket is not drained. OutgoingMessage.prototype._finish = function _finish() { - assert(this.socket); + assert(this[kSocket]); this.emit('prefinish'); }; @@ -1113,7 +1164,7 @@ OutgoingMessage.prototype._finish = function _finish() { // This function, _flush(), is called by both the Server and Client // to attempt to flush any pending messages out to the socket. OutgoingMessage.prototype._flush = function _flush() { - const socket = this.socket; + const socket = this[kSocket]; if (socket && socket.writable) { // There might be remaining data in this.output; write it out @@ -1130,11 +1181,6 @@ OutgoingMessage.prototype._flush = function _flush() { }; OutgoingMessage.prototype._flushOutput = function _flushOutput(socket) { - while (this[kCorked]) { - this[kCorked]--; - socket.cork(); - } - const outputLength = this.outputData.length; if (outputLength <= 0) return undefined;