diff --git a/README.md b/README.md index c4c0afd73..e818a2b4d 100644 --- a/README.md +++ b/README.md @@ -976,6 +976,7 @@ request.defaults({ [linux-timeout]: http://www.sekuda.com/overriding_the_default_linux_kernel_20_second_tcp_socket_connect_timeout - `maxResponseSize` - Abort request if the response size exceeds this threshold (bytes). +- `agentIdleTimeout` - set to number of milliseconds after which the agent should be discarded for reuse --- - `localAddress` - local interface to bind for network connections. diff --git a/lib/autohttp/request.js b/lib/autohttp/request.js index 8b12466b9..84b5db875 100644 --- a/lib/autohttp/request.js +++ b/lib/autohttp/request.js @@ -66,7 +66,7 @@ class MultiProtocolRequest extends EventEmitter { ob.on('end', (...args) => this.emit('end', ...args)) ob.on('close', (...args) => this.emit('close', ...args)) ob.on('response', (...args) => this.emit('response', ...args)) - ob.once('error', (...args) => this.emit('error', ...args)) + ob.on('error', (...args) => this.emit('error', ...args)) } processQueuedOpens () { diff --git a/lib/http2/agent.js b/lib/http2/agent.js index 464f5f720..86abb3d1d 100644 --- a/lib/http2/agent.js +++ b/lib/http2/agent.js @@ -42,11 +42,16 @@ class Http2Agent extends EventEmitter { const oldRef = connection.ref const oldUnref = connection.unref + const timeoutHandler = () => { + delete connectionsMap[name] + connection.close() + } + connection.refCount = 0 connection.ref = function () { this.refCount++ oldRef.call(this) - connection.removeAllListeners('timeout') + connection.off('timeout', timeoutHandler) connection.setTimeout(0) } const connectionsMap = this.connections @@ -54,11 +59,8 @@ class Http2Agent extends EventEmitter { this.refCount-- if (this.refCount === 0) { oldUnref.call(this) - if (_options.sessionIdleTimeout) { - connection.setTimeout(_options.sessionIdleTimeout, () => { - connection.close() - delete connectionsMap[name] - }) + if (_options.timeout) { + connection.setTimeout(_options.timeout, timeoutHandler) } } } diff --git a/lib/http2/request.js b/lib/http2/request.js index 1887c4925..dc78d56fa 100644 --- a/lib/http2/request.js +++ b/lib/http2/request.js @@ -42,6 +42,7 @@ class Http2Request extends EventEmitter { this.onClose = this.onClose.bind(this) this.onResponse = this.onResponse.bind(this) this.onEnd = this.onEnd.bind(this) + this.onTimeout = this.onTimeout.bind(this) this.registerListeners = this.registerListeners.bind(this) this._flushHeaders = this._flushHeaders.bind(this) @@ -102,6 +103,7 @@ class Http2Request extends EventEmitter { this.stream.on('close', this.onClose) this.stream.on('response', this.onResponse) this.stream.on('end', this.onEnd) + this.stream.on('timeout', this.onTimeout) } onDrain (...args) { @@ -120,9 +122,14 @@ class Http2Request extends EventEmitter { this.emit('end') } + onTimeout () { + this.stream.close() + } + onClose (...args) { if (this.stream.rstCode) { // Emit error message in case of abnormal stream closure + // It is fine if the error is emitted multiple times, since the callback has checks to prevent multiple invocations this.onError(new Error(`HTTP/2 Stream closed with error code ${rstErrorCodesMap[this.stream.rstCode]}`)) } @@ -134,6 +141,7 @@ class Http2Request extends EventEmitter { this.stream.off('response', this.onResponse) this.stream.off('end', this.onEnd) this.stream.off('close', this.onClose) + this.stream.off('timeout', this.onTimeout) this.removeAllListeners() } diff --git a/request.js b/request.js index 505ceab20..c48ec646d 100644 --- a/request.js +++ b/request.js @@ -649,7 +649,7 @@ Request.prototype.init = function (options) { self.agent = false } else { try { - self.agent = self.agent || self.getNewAgent() + self.agent = self.agent || self.getNewAgent({agentIdleTimeout: options.agentIdleTimeout}) } catch (error) { // tls.createSecureContext() throws on bad options return self.emit('error', error) @@ -774,7 +774,7 @@ Request.prototype.init = function (options) { }) } -Request.prototype.getNewAgent = function () { +Request.prototype.getNewAgent = function ({agentIdleTimeout}) { var self = this var Agent = self.agentClass var options = {} @@ -900,7 +900,7 @@ Request.prototype.getNewAgent = function () { } } - if (self.pool === globalPool && !poolKey && Object.keys(options).length === 0 && self.httpModule.globalAgent) { + if (self.pool === globalPool && !poolKey && Object.keys(options).length === 0 && self.httpModule.globalAgent && typeof agentIdleTimeout !== 'number') { // not doing anything special. Use the globalAgent return self.httpModule.globalAgent } @@ -908,16 +908,19 @@ Request.prototype.getNewAgent = function () { // we're using a stored agent. Make sure it's protocol-specific poolKey = self.protocolVersion + ':' + self.uri.protocol + poolKey + let agent = self.pool[poolKey] + // generate a new agent for this setting if none yet exists - if (!self.pool[poolKey]) { - self.pool[poolKey] = new Agent(options) + if (!agent || (typeof agentIdleTimeout === 'number' && (agent.lastUsedAt || 0) + agentIdleTimeout < Date.now())) { + agent = self.pool[poolKey] = new Agent(options) // properly set maxSockets on new agents if (self.pool.maxSockets) { self.pool[poolKey].maxSockets = self.pool.maxSockets } } - return self.pool[poolKey] + agent.lastUsedAt = Date.now() + return agent } Request.prototype.start = function () { diff --git a/tests/test-agentIdleTimeout.js b/tests/test-agentIdleTimeout.js new file mode 100644 index 000000000..3c105fc7e --- /dev/null +++ b/tests/test-agentIdleTimeout.js @@ -0,0 +1,69 @@ +'use strict' + +var server = require('./server') +var request = require('../index') +var tape = require('tape') + +var s + +function createServer () { + const s = server.createServer() + + s.on('/', function (req, res) { + res.writeHead(200, { 'content-type': 'text/plain' }) + res.end() + }) + + return s +} + +tape('setup', function (t) { + s = createServer() + s.listen(0, function () { + t.end() + }) +}) + +tape('should reuse the same agent', function (t) { + const data = { + url: s.url + '/', + agentIdleTimeout: 1000 + } + + const r1 = request(data, function (err, res) { + t.equal(err, null) + t.equal(res.statusCode, 200) + const r2 = request(data, function (err) { + t.equal(err, null) + t.end() + t.equal(r1.agent.identifier, r2.agent.identifier) + }) + }) + r1.agent.identifier = '1234' +}) + +tape('should use new agent after timeout', function (t) { + const data = { + url: s.url + '/', + agentIdleTimeout: 100 + } + + const r1 = request(data, function (err, res) { + t.equal(err, null) + t.equal(res.statusCode, 200) + setTimeout(() => { + const r2 = request(data, function (err) { + t.equal(err, null) + t.end() + t.notEqual(r1.agent.identifier, r2.agent.identifier) + }) + }, 200) + }) + r1.agent.identifier = '12345' +}) + +tape('cleanup', function (t) { + s.close(function () { + t.end() + }) +}) diff --git a/tests/test-errors-http2-auto-specific.js b/tests/test-errors-http2-auto-specific.js new file mode 100644 index 000000000..dd1d6f564 --- /dev/null +++ b/tests/test-errors-http2-auto-specific.js @@ -0,0 +1,73 @@ +'use strict' + +var request = require('../index') +var tape = require('tape') +var server = require('./server') +var destroyable = require('server-destroy') + +const s = server.createHttp2Server() + +destroyable(s) + +tape('setup', function (t) { + s.listen(0, function () { + t.end() + }) +}) + +function addTest (errorCode, data = {}) { + tape('test ' + errorCode, function (t) { + s.on('/' + errorCode, function (req, res) { + if (errorCode === 0) { + res.end() + return + } + res.stream.close(errorCode) + }) + data.uri = s.url + '/' + errorCode + request( + { ...data, strictSSL: false, protocolVersion: 'auto' }, + function (err) { + if (errorCode === 0) { + t.equal(err, null) + t.end() + return + } + if (errorCode === 8) { + t.equal(err.message, `HTTP/2 Stream closed with error code NGHTTP2_CANCEL`) + t.end() + return + } + t.equal(err.message, `Stream closed with error code ${errorCodes[errorCode]}`) + t.end() + } + ) + }) +} + +const errorCodes = [ + 'NGHTTP2_NO_ERROR', + 'NGHTTP2_PROTOCOL_ERROR', + 'NGHTTP2_INTERNAL_ERROR', + 'NGHTTP2_FLOW_CONTROL_ERROR', + 'NGHTTP2_SETTINGS_TIMEOUT', + 'NGHTTP2_STREAM_CLOSED', + 'NGHTTP2_FRAME_SIZE_ERROR', + 'NGHTTP2_REFUSED_STREAM', + 'NGHTTP2_CANCEL', + 'NGHTTP2_COMPRESSION_ERROR', + 'NGHTTP2_CONNECT_ERROR', + 'NGHTTP2_ENHANCE_YOUR_CALM', + 'NGHTTP2_INADEQUATE_SECURITY', + 'NGHTTP2_HTTP_1_1_REQUIRED' +] + +for (let i = 0; i < errorCodes.length; i++) { + addTest(i) +} + +tape('cleanup', function (t) { + s.destroy(function () { + t.end() + }) +}) diff --git a/tests/test-errors-http2-specific.js b/tests/test-errors-http2-specific.js new file mode 100644 index 000000000..92f631f6f --- /dev/null +++ b/tests/test-errors-http2-specific.js @@ -0,0 +1,73 @@ +'use strict' + +var request = require('../index') +var tape = require('tape') +var server = require('./server') +var destroyable = require('server-destroy') + +const s = server.createHttp2Server() + +destroyable(s) + +tape('setup', function (t) { + s.listen(0, function () { + t.end() + }) +}) + +function addTest (errorCode, data = {}) { + tape('test ' + errorCode, function (t) { + s.on('/' + errorCode, function (req, res) { + if (errorCode === 0) { + res.end() + return + } + res.stream.close(errorCode) + }) + data.uri = s.url + '/' + errorCode + request( + { ...data, strictSSL: false, protocolVersion: 'http2' }, + function (err, resp, body) { + if (errorCode === 0) { + t.equal(err, null) + t.end() + return + } + if (errorCode === 8) { + t.equal(err.message, `HTTP/2 Stream closed with error code NGHTTP2_CANCEL`) + t.end() + return + } + t.equal(err.message, `Stream closed with error code ${errorCodes[errorCode]}`) + t.end() + } + ) + }) +} + +const errorCodes = [ + 'NGHTTP2_NO_ERROR', + 'NGHTTP2_PROTOCOL_ERROR', + 'NGHTTP2_INTERNAL_ERROR', + 'NGHTTP2_FLOW_CONTROL_ERROR', + 'NGHTTP2_SETTINGS_TIMEOUT', + 'NGHTTP2_STREAM_CLOSED', + 'NGHTTP2_FRAME_SIZE_ERROR', + 'NGHTTP2_REFUSED_STREAM', + 'NGHTTP2_CANCEL', + 'NGHTTP2_COMPRESSION_ERROR', + 'NGHTTP2_CONNECT_ERROR', + 'NGHTTP2_ENHANCE_YOUR_CALM', + 'NGHTTP2_INADEQUATE_SECURITY', + 'NGHTTP2_HTTP_1_1_REQUIRED' +] + +for (let i = 0; i < errorCodes.length; i++) { + addTest(i) +} + +tape('cleanup', function (t) { + s.destroy(function () { + t.end() + }) +}) diff --git a/tests/test-idle-timeout-http2.js b/tests/test-idle-timeout-http2.js new file mode 100644 index 000000000..f50e127e6 --- /dev/null +++ b/tests/test-idle-timeout-http2.js @@ -0,0 +1,170 @@ +'use strict' + +var server = require('./server') +var request = require('../index') +var tape = require('tape') +var destroyable = require('server-destroy') + +function checkErrCode (t, err) { + t.notEqual(err, null) + t.ok(err.code === 'ETIMEDOUT' || err.code === 'ESOCKETTIMEDOUT', + 'Error ETIMEDOUT or ESOCKETTIMEDOUT') +} + +var s = server.createHttp2Server() + +var streams = [] +s.on('/', function (req, res) { + streams.push(req.stream) + res.writeHead(200, { 'content-type': 'text/plain' }) + res.end() +}) + +destroyable(s) + +tape('setup', function (t) { + s.listen(0, function () { + t.end() + }) +}) + +tape('should reuse the same socket', function (t) { + var shouldTimeout = { + url: s.url + '/', + pool: {}, + protocolVersion: 'http2', + strictSSL: false, + agentOptions: {} + } + + var socket + request(shouldTimeout, function (err) { + t.equal(err, null) + request(shouldTimeout, function (err) { + t.equal(err, null) + t.end() + }).on('socket', function (socket_) { + t.equal(socket.identifier, socket_.identifier) + socket.identifier = undefined + }) + }).on('socket', function (socket_) { + socket = socket_ + socket.identifier = '1234' + }) +}) + +tape('create a new socket when idle timeout is less than keep alive and time b/w requests is greater than idle timeout', function (t) { + var shouldTimeout = { + url: s.url + '/', + pool: {}, + protocolVersion: 'http2', + strictSSL: false, + agentOptions: { + timeout: 1000 + } + } + + var socket + request(shouldTimeout, function (err) { + t.equal(err, null) + setTimeout(function () { + request(shouldTimeout, function (err) { + t.equal(err, null) + t.end() + }).on('socket', function () { + try { + socket.identifier // eslint-disable-line + } catch (e) { + t.equal(e.message, 'The socket has been disconnected from the Http2Session') + } + }) + }, 1100) + }).on('socket', function (socket_) { + socket = socket_ + socket.identifier = '12345' + }) +}) + +tape('create a new socket when idle timeout is greater than keep alive and time b/w requests is greater than idle timeout', function (t) { + var shouldTimeout = { + url: s.url + '/', + pool: {}, + protocolVersion: 'http2', + strictSSL: false, + agentOptions: { + timeout: 2000 + } + } + + var socket + request(shouldTimeout, function (err) { + t.equal(err, null) + setTimeout(function () { + request(shouldTimeout, function (err) { + t.equal(err, null) + t.end() + }).on('socket', function () { + try { + socket.identifier // eslint-disable-line + } catch (e) { + t.equal(e.message, 'The socket has been disconnected from the Http2Session') + } + }) + }, 2100) + }).on('socket', function (socket_) { + socket = socket_ + socket.identifier = '12345' + }) +}) + +tape('agent timeout shouldn\'t affect request timeout', (t) => { + s.on('/timeout', (req, res) => { + streams.push(req.stream) + setTimeout(() => { + if (res.stream.closed) { + return + } + res.writeHead(200, { 'content-type': 'text/plain' }) + res.end() + }, 2000) + }) + + const shouldTimeout = { + url: s.url + '/timeout', + pool: {}, + timeout: 2500, + protocolVersion: 'http2', + strictSSL: false, + agentOptions: { + timeout: 1000 + } + } + + request(shouldTimeout, function (err) { + t.equal(err, null) + setTimeout(function () { + request({ ...shouldTimeout, timeout: 1000 }, function (err) { + checkErrCode(t, err) + t.end() + }) + }, 2100) + }) +}) + +tape('cleanup', function (t) { + const sessions = [] + + streams.forEach((stream) => { + sessions.push(stream.session) + stream.destroy() + }) + + sessions.forEach((session) => { + if (!session) { return } + session.close() + }) + + s.destroy(function () { + t.end() + }) +}) diff --git a/tests/test-idle-timeout.js b/tests/test-idle-timeout.js new file mode 100644 index 000000000..0c8a3a204 --- /dev/null +++ b/tests/test-idle-timeout.js @@ -0,0 +1,112 @@ +'use strict' + +var server = require('./server') +var request = require('../index') +var tape = require('tape') + +var s + +function createServer () { + const s = server.createServer() + s.keepAliveTimeout = 5000 + + // Request that waits for 200ms + s.on('/timeout', function (req, res) { + res.writeHead(200, { 'content-type': 'text/plain' }) + res.end() + }) + + return s +} + +tape('setup', function (t) { + s = createServer() + s.listen(0, function () { + t.end() + }) +}) + +tape('should reuse the same socket', function (t) { + var shouldTimeout = { + url: s.url + '/timeout', + pool: {}, // Provide pool so global pool is not shared between tests + agentOptions: { + keepAlive: true + } + } + + var socket + request(shouldTimeout, function (err) { + t.equal(err, null) + request(shouldTimeout, function (err) { + t.equal(err, null) + t.end() + }).on('socket', function (socket_) { + t.equal(socket.identifier, socket_.identifier) + socket.identifier = undefined + }) + }).on('socket', function (socket_) { + socket = socket_ + socket.identifier = '1234' + }) +}) + +tape('create a new socket when idle timeout is less than keep alive and time b/w requests is greater than idle timeout', function (t) { + var shouldTimeout = { + url: s.url + '/timeout', + pool: {}, + agentOptions: { + keepAlive: true, + timeout: 1000 + } + } + + var socket + request(shouldTimeout, function (err) { + t.equal(err, null) + setTimeout(function () { + request(shouldTimeout, function (err) { + t.equal(err, null) + t.end() + }).on('socket', function (socket_) { + t.notEqual(socket.identifier, socket_.identifier) + }) + }, 1100) + }).on('socket', function (socket_) { + socket = socket_ + socket.identifier = '12345' + }) +}) + +tape('create a new socket when idle timeout is greater than keep alive and time b/w requests is greater than idle timeout', function (t) { + var shouldTimeout = { + url: s.url + '/timeout', + pool: {}, + agentOptions: { + keepAlive: true, + timeout: 2000 + } + } + + var socket + request(shouldTimeout, function (err) { + t.equal(err, null) + setTimeout(function () { + request(shouldTimeout, function (err) { + t.equal(err, null) + t.end() + }).on('socket', function (socket_) { + t.notEqual(socket.identifier, socket_.identifier) + }) + }, 2100) + }).on('socket', function (socket_) { + socket = socket_ + socket.identifier = '12345' + }) +}) + +tape('cleanup', function (t) { + s.close(function () { + t.end() + }) +})