From df8b8b257d2d94905a5bcf47cc4e0514fa717e96 Mon Sep 17 00:00:00 2001 From: Brian White Date: Sun, 18 Dec 2016 21:20:29 -0500 Subject: [PATCH] http: refactor server connection handling PR-URL: https://github.com/nodejs/node/pull/6533 Reviewed-By: Matteo Collina Reviewed-By: James M Snell Reviewed-By: Fedor Indutny Reviewed-By: Benjamin Gruenbaum --- lib/_http_server.js | 440 +++++++++++++++++++++++--------------------- 1 file changed, 226 insertions(+), 214 deletions(-) diff --git a/lib/_http_server.js b/lib/_http_server.js index b32e484b4bcfa3..21113fc37a9f5b 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -264,40 +264,6 @@ exports.Server = Server; function connectionListener(socket) { - var self = this; - var outgoing = []; - var incoming = []; - var outgoingData = 0; - - function updateOutgoingData(delta) { - // `outgoingData` is an approximate amount of bytes queued through all - // inactive responses. If more data than the high watermark is queued - we - // need to pause TCP socket/HTTP parser, and wait until the data will be - // sent to the client. - outgoingData += delta; - if (socket._paused && outgoingData < socket._writableState.highWaterMark) - return socketOnDrain(); - } - - function abortIncoming() { - while (incoming.length) { - var req = incoming.shift(); - req.emit('aborted'); - req.emit('close'); - } - // abort socket._httpMessage ? - } - - function serverSocketCloseListener() { - debug('server socket close'); - // mark this parser as reusable - if (this.parser) { - freeParser(this.parser, null, this); - } - - abortIncoming(); - } - debug('SERVER new http connection'); httpSocketSetup(socket); @@ -305,18 +271,9 @@ function connectionListener(socket) { // If the user has added a listener to the server, // request, or response, then it's their responsibility. // otherwise, destroy on timeout by default - if (self.timeout) - socket.setTimeout(self.timeout); - socket.on('timeout', function() { - var req = socket.parser && socket.parser.incoming; - var reqTimeout = req && !req.complete && req.emit('timeout', socket); - var res = socket._httpMessage; - var resTimeout = res && res.emit('timeout', socket); - var serverTimeout = self.emit('timeout', socket); - - if (!reqTimeout && !resTimeout && !serverTimeout) - socket.destroy(); - }); + if (this.timeout) + socket.setTimeout(this.timeout); + socket.on('timeout', socketOnTimeout.bind(undefined, this, socket)); var parser = parsers.alloc(); parser.reinitialize(HTTPParser.REQUEST); @@ -332,17 +289,34 @@ function connectionListener(socket) { parser.maxHeaderPairs = 2000; } - socket.addListener('error', socketOnError); - socket.addListener('close', serverSocketCloseListener); - parser.onIncoming = parserOnIncoming; - socket.on('end', socketOnEnd); - socket.on('data', socketOnData); + var state = { + onData: null, + onError: null, + onEnd: null, + onClose: null, + outgoing: [], + incoming: [], + // `outgoingData` is an approximate amount of bytes queued through all + // inactive responses. If more data than the high watermark is queued - we + // need to pause TCP socket/HTTP parser, and wait until the data will be + // sent to the client. + outgoingData: 0 + }; + state.onData = socketOnData.bind(undefined, this, socket, parser, state); + state.onError = socketOnError.bind(undefined, this, socket, state); + state.onEnd = socketOnEnd.bind(undefined, this, socket, parser, state); + state.onClose = socketOnClose.bind(undefined, socket, state); + socket.on('data', state.onData); + socket.on('error', state.onError); + socket.on('end', state.onEnd); + socket.on('close', state.onClose); + parser.onIncoming = parserOnIncoming.bind(undefined, this, socket, state); // We are consuming socket, so it won't get any actual data socket.on('resume', onSocketResume); socket.on('pause', onSocketPause); - socket.on('drain', socketOnDrain); + socket.on('drain', socketOnDrain.bind(undefined, socket, state)); // Override on to unconsume on `data`, `readable` listeners socket.on = socketOnWrap; @@ -352,205 +326,243 @@ function connectionListener(socket) { parser._consumed = true; parser.consume(external); } - external = null; - parser[kOnExecute] = onParserExecute; + parser[kOnExecute] = + onParserExecute.bind(undefined, this, socket, parser, state); - // TODO(isaacs): Move all these functions out of here - function socketOnError(e) { - // Ignore further errors - this.removeListener('error', socketOnError); - this.on('error', () => {}); + socket._paused = false; +} +exports._connectionListener = connectionListener; - if (!self.emit('clientError', e, this)) - this.destroy(e); +function updateOutgoingData(socket, state, delta) { + state.outgoingData += delta; + if (socket._paused && + state.outgoingData < socket._writableState.highWaterMark) { + return socketOnDrain(socket, state); } +} - function socketOnData(d) { - assert(!socket._paused); - debug('SERVER socketOnData %d', d.length); - var ret = parser.execute(d); +function socketOnDrain(socket, state) { + var needPause = state.outgoingData > socket._writableState.highWaterMark; - onParserExecuteCommon(ret, d); + // If we previously paused, then start reading again. + if (socket._paused && !needPause) { + socket._paused = false; + if (socket.parser) + socket.parser.resume(); + socket.resume(); } +} + +function socketOnTimeout(server, socket) { + var req = socket.parser && socket.parser.incoming; + var reqTimeout = req && !req.complete && req.emit('timeout', socket); + var res = socket._httpMessage; + var resTimeout = res && res.emit('timeout', socket); + var serverTimeout = server.emit('timeout', socket); + + if (!reqTimeout && !resTimeout && !serverTimeout) + socket.destroy(); +} - function onParserExecute(ret, d) { - socket._unrefTimer(); - debug('SERVER socketOnParserExecute %d', ret); - onParserExecuteCommon(ret, undefined); +function socketOnClose(socket, state) { + debug('server socket close'); + // mark this parser as reusable + if (socket.parser) { + freeParser(socket.parser, null, socket); } - function onParserExecuteCommon(ret, d) { - if (ret instanceof Error) { - debug('parse error'); - socketOnError.call(socket, ret); - } else if (parser.incoming && parser.incoming.upgrade) { - // Upgrade or CONNECT - var bytesParsed = ret; - var req = parser.incoming; - debug('SERVER upgrade or connect', req.method); - - if (!d) - d = parser.getCurrentBuffer(); - - socket.removeListener('data', socketOnData); - socket.removeListener('end', socketOnEnd); - socket.removeListener('close', serverSocketCloseListener); - unconsume(parser, socket); - parser.finish(); - freeParser(parser, req, null); - parser = null; - - var eventName = req.method === 'CONNECT' ? 'connect' : 'upgrade'; - if (self.listenerCount(eventName) > 0) { - debug('SERVER have listener for %s', eventName); - var bodyHead = d.slice(bytesParsed, d.length); - - // TODO(isaacs): Need a way to reset a stream to fresh state - // IE, not flowing, and not explicitly paused. - socket._readableState.flowing = null; - self.emit(eventName, req, socket, bodyHead); - } else { - // Got upgrade header or CONNECT method, but have no handler. - socket.destroy(); - } - } + abortIncoming(state.incoming); +} - if (socket._paused && socket.parser) { - // onIncoming paused the socket, we should pause the parser as well - debug('pause parser'); - socket.parser.pause(); - } +function abortIncoming(incoming) { + while (incoming.length) { + var req = incoming.shift(); + req.emit('aborted'); + req.emit('close'); } + // abort socket._httpMessage ? +} - function socketOnEnd() { - var socket = this; - var ret = parser.finish(); +function socketOnEnd(server, socket, parser, state) { + var ret = parser.finish(); - if (ret instanceof Error) { - debug('parse error'); - socketOnError.call(socket, ret); - return; - } + if (ret instanceof Error) { + debug('parse error'); + state.onError(ret); + return; + } - if (!self.httpAllowHalfOpen) { - abortIncoming(); - if (socket.writable) socket.end(); - } else if (outgoing.length) { - outgoing[outgoing.length - 1]._last = true; - } else if (socket._httpMessage) { - socket._httpMessage._last = true; - } else { - if (socket.writable) socket.end(); - } + if (!server.httpAllowHalfOpen) { + abortIncoming(state.incoming); + if (socket.writable) socket.end(); + } else if (state.outgoing.length) { + state.outgoing[state.outgoing.length - 1]._last = true; + } else if (socket._httpMessage) { + socket._httpMessage._last = true; + } else { + if (socket.writable) socket.end(); } +} +function socketOnData(server, socket, parser, state, d) { + assert(!socket._paused); + debug('SERVER socketOnData %d', d.length); + var ret = parser.execute(d); - // The following callback is issued after the headers have been read on a - // new message. In this callback we setup the response object and pass it - // to the user. + onParserExecuteCommon(server, socket, parser, state, ret, d); +} - socket._paused = false; - function socketOnDrain() { - var needPause = outgoingData > socket._writableState.highWaterMark; - - // If we previously paused, then start reading again. - if (socket._paused && !needPause) { - socket._paused = false; - if (socket.parser) - socket.parser.resume(); - socket.resume(); +function onParserExecute(server, socket, parser, state, ret, d) { + socket._unrefTimer(); + debug('SERVER socketOnParserExecute %d', ret); + onParserExecuteCommon(server, socket, parser, state, ret, undefined); +} + +function socketOnError(server, socket, state, e) { + // Ignore further errors + socket.removeListener('error', state.onError); + socket.on('error', () => {}); + + if (!server.emit('clientError', e, socket)) + socket.destroy(e); +} + +function onParserExecuteCommon(server, socket, parser, state, ret, d) { + if (ret instanceof Error) { + debug('parse error'); + state.onError(ret); + } else if (parser.incoming && parser.incoming.upgrade) { + // Upgrade or CONNECT + var bytesParsed = ret; + var req = parser.incoming; + debug('SERVER upgrade or connect', req.method); + + if (!d) + d = parser.getCurrentBuffer(); + + socket.removeListener('data', state.onData); + socket.removeListener('end', state.onEnd); + socket.removeListener('close', state.onClose); + unconsume(parser, socket); + parser.finish(); + freeParser(parser, req, null); + parser = null; + + var eventName = req.method === 'CONNECT' ? 'connect' : 'upgrade'; + if (server.listenerCount(eventName) > 0) { + debug('SERVER have listener for %s', eventName); + var bodyHead = d.slice(bytesParsed, d.length); + + // TODO(isaacs): Need a way to reset a stream to fresh state + // IE, not flowing, and not explicitly paused. + socket._readableState.flowing = null; + server.emit(eventName, req, socket, bodyHead); + } else { + // Got upgrade header or CONNECT method, but have no handler. + socket.destroy(); } } - function parserOnIncoming(req, shouldKeepAlive) { - incoming.push(req); - - // If the writable end isn't consuming, then stop reading - // so that we don't become overwhelmed by a flood of - // pipelined requests that may never be resolved. - if (!socket._paused) { - var needPause = socket._writableState.needDrain || - outgoingData >= socket._writableState.highWaterMark; - if (needPause) { - socket._paused = true; - // We also need to pause the parser, but don't do that until after - // the call to execute, because we may still be processing the last - // chunk. - socket.pause(); - } - } + if (socket._paused && socket.parser) { + // onIncoming paused the socket, we should pause the parser as well + debug('pause parser'); + socket.parser.pause(); + } +} - var res = new ServerResponse(req); - res._onPendingData = updateOutgoingData; +function resOnFinish(req, res, socket, state) { + // Usually the first incoming element should be our request. it may + // be that in the case abortIncoming() was called that the incoming + // array will be empty. + assert(state.incoming.length === 0 || state.incoming[0] === req); - res.shouldKeepAlive = shouldKeepAlive; - DTRACE_HTTP_SERVER_REQUEST(req, socket); - LTTNG_HTTP_SERVER_REQUEST(req, socket); - COUNTER_HTTP_SERVER_REQUEST(); + state.incoming.shift(); - if (socket._httpMessage) { - // There are already pending outgoing res, append. - outgoing.push(res); - } else { - res.assignSocket(socket); + // if the user never called req.read(), and didn't pipe() or + // .resume() or .on('data'), then we call req._dump() so that the + // bytes will be pulled off the wire. + if (!req._consuming && !req._readableState.resumeScheduled) + req._dump(); + + res.detachSocket(socket); + + if (res._last) { + socket.destroySoon(); + } else { + // start sending the next message + var m = state.outgoing.shift(); + if (m) { + m.assignSocket(socket); } + } +} - // When we're finished writing the response, check if this is the last - // response, if so destroy the socket. - res.on('finish', resOnFinish); - function resOnFinish() { - // Usually the first incoming element should be our request. it may - // be that in the case abortIncoming() was called that the incoming - // array will be empty. - assert(incoming.length === 0 || incoming[0] === req); +// The following callback is issued after the headers have been read on a +// new message. In this callback we setup the response object and pass it +// to the user. +function parserOnIncoming(server, socket, state, req, keepAlive) { + state.incoming.push(req); + + // If the writable end isn't consuming, then stop reading + // so that we don't become overwhelmed by a flood of + // pipelined requests that may never be resolved. + if (!socket._paused) { + var needPause = socket._writableState.needDrain || + state.outgoingData >= socket._writableState.highWaterMark; + if (needPause) { + socket._paused = true; + // We also need to pause the parser, but don't do that until after + // the call to execute, because we may still be processing the last + // chunk. + socket.pause(); + } + } - incoming.shift(); + var res = new ServerResponse(req); + res._onPendingData = updateOutgoingData.bind(undefined, socket, state); - // if the user never called req.read(), and didn't pipe() or - // .resume() or .on('data'), then we call req._dump() so that the - // bytes will be pulled off the wire. - if (!req._consuming && !req._readableState.resumeScheduled) - req._dump(); + res.shouldKeepAlive = keepAlive; + DTRACE_HTTP_SERVER_REQUEST(req, socket); + LTTNG_HTTP_SERVER_REQUEST(req, socket); + COUNTER_HTTP_SERVER_REQUEST(); - res.detachSocket(socket); + if (socket._httpMessage) { + // There are already pending outgoing res, append. + state.outgoing.push(res); + } else { + res.assignSocket(socket); + } - if (res._last) { - socket.destroySoon(); - } else { - // start sending the next message - var m = outgoing.shift(); - if (m) { - m.assignSocket(socket); - } - } - } + // When we're finished writing the response, check if this is the last + // response, if so destroy the socket. + var finish = + resOnFinish.bind(undefined, req, res, socket, state); + res.on('finish', finish); + + if (req.headers.expect !== undefined && + (req.httpVersionMajor === 1 && req.httpVersionMinor === 1)) { + if (continueExpression.test(req.headers.expect)) { + res._expect_continue = true; - if (req.headers.expect !== undefined && - (req.httpVersionMajor === 1 && req.httpVersionMinor === 1)) { - if (continueExpression.test(req.headers.expect)) { - res._expect_continue = true; - - if (self.listenerCount('checkContinue') > 0) { - self.emit('checkContinue', req, res); - } else { - res.writeContinue(); - self.emit('request', req, res); - } + if (server.listenerCount('checkContinue') > 0) { + server.emit('checkContinue', req, res); } else { - if (self.listenerCount('checkExpectation') > 0) { - self.emit('checkExpectation', req, res); - } else { - res.writeHead(417); - res.end(); - } + res.writeContinue(); + server.emit('request', req, res); } } else { - self.emit('request', req, res); + if (server.listenerCount('checkExpectation') > 0) { + server.emit('checkExpectation', req, res); + } else { + res.writeHead(417); + res.end(); + } } - return false; // Not a HEAD response. (Not even a response!) + } else { + server.emit('request', req, res); } + return false; // Not a HEAD response. (Not even a response!) } -exports._connectionListener = connectionListener; function onSocketResume() { // It may seem that the socket is resumed, but this is an enemy's trick to