diff --git a/lib/_http_common.js b/lib/_http_common.js index 297ac8f063c495..548f3496165d7c 100644 --- a/lib/_http_common.js +++ b/lib/_http_common.js @@ -157,10 +157,8 @@ function parserOnMessageComplete() { stream.push(null); } - if (parser.socket.readable) { - // force to read the next incoming message - readStart(parser.socket); - } + // force to read the next incoming message + readStart(parser.socket); } diff --git a/lib/_http_incoming.js b/lib/_http_incoming.js index cc93bfc9f15347..2f664002476aa0 100644 --- a/lib/_http_incoming.js +++ b/lib/_http_incoming.js @@ -23,7 +23,7 @@ var util = require('util'); var Stream = require('stream'); function readStart(socket) { - if (socket) + if (socket && !socket._paused && socket.readable) socket.resume(); } exports.readStart = readStart; diff --git a/lib/_http_server.js b/lib/_http_server.js index 15ef5a63873671..5a82c94dc8c5a3 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -351,6 +351,7 @@ function connectionListener(socket) { } function socketOnData(d) { + assert(!socket._paused); debug('SERVER socketOnData %d', d.length); var ret = parser.execute(d); if (ret instanceof Error) { @@ -382,6 +383,12 @@ function connectionListener(socket) { socket.destroy(); } } + + if (socket._paused) { + // onIncoming paused the socket, we should pause the parser as well + debug('pause parser'); + socket.parser.pause(); + } } function socketOnEnd() { @@ -411,9 +418,34 @@ function connectionListener(socket) { // new message. In this callback we setup the response object and pass it // to the user. + socket._paused = false; + function socketOnDrain() { + // If we previously paused, then start reading again. + if (socket._paused) { + socket._paused = false; + socket.parser.resume(); + socket.resume(); + } + } + socket.on('drain', socketOnDrain); + function parserOnIncoming(req, shouldKeepAlive) { incoming.push(req); + // If the writable end isn't consuming, then stop reading + // so that we don't become overwhelmed by a flood of + // pipelined requests that may never be resolved. + if (!socket._paused) { + var needPause = socket._writableState.needDrain; + if (needPause) { + socket._paused = true; + // We also need to pause the parser, but don't do that until after + // the call to execute, because we may still be processing the last + // chunk. + socket.pause(); + } + } + var res = new ServerResponse(req); res.shouldKeepAlive = shouldKeepAlive; diff --git a/test/simple/test-http-pipeline-flood.js b/test/simple/test-http-pipeline-flood.js new file mode 100644 index 00000000000000..a10f152f66e1e8 --- /dev/null +++ b/test/simple/test-http-pipeline-flood.js @@ -0,0 +1,113 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +var common = require('../common'); +var assert = require('assert'); + +switch (process.argv[2]) { + case undefined: + return parent(); + case 'child': + return child(); + default: + throw new Error('wtf'); +} + +function parent() { + var http = require('http'); + var bigResponse = new Buffer(10240).fill('x'); + var gotTimeout = false; + var childClosed = false; + var requests = 0; + var connections = 0; + + var server = http.createServer(function(req, res) { + requests++; + res.setHeader('content-length', bigResponse.length); + res.end(bigResponse); + }); + + server.on('connection', function(conn) { + connections++; + }); + + // kill the connection after a bit, verifying that the + // flood of requests was eventually halted. + server.setTimeout(200, function(conn) { + gotTimeout = true; + conn.destroy(); + }); + + server.listen(common.PORT, function() { + var spawn = require('child_process').spawn; + var args = [__filename, 'child']; + var child = spawn(process.execPath, args, { stdio: 'inherit' }); + child.on('close', function(code) { + assert(!code); + childClosed = true; + server.close(); + }); + }); + + process.on('exit', function() { + assert(gotTimeout); + assert(childClosed); + assert.equal(connections, 1); + // 1213 works out to be the number of requests we end up processing + // before the outgoing connection backs up and requires a drain. + // however, to avoid being unnecessarily tied to a specific magic number, + // and making the test brittle, just assert that it's "a lot", which we + // can safely assume is more than 500. + assert(requests >= 500); + console.log('ok'); + }); +} + +function child() { + var net = require('net'); + + var gotEpipe = false; + var conn = net.connect({ port: common.PORT }); + + var req = 'GET / HTTP/1.1\r\nHost: localhost:' + + common.PORT + '\r\nAccept: */*\r\n\r\n'; + + req = new Array(10241).join(req); + + conn.on('connect', function() { + write(); + }); + + conn.on('drain', write); + + conn.on('error', function(er) { + gotEpipe = true; + }); + + process.on('exit', function() { + assert(gotEpipe); + console.log('ok - child'); + }); + + function write() { + while (false !== conn.write(req, 'ascii')); + } +}