diff --git a/lib/_http_agent.js b/lib/_http_agent.js index 2d52ea3143b3cd..c4430e86ec3aca 100644 --- a/lib/_http_agent.js +++ b/lib/_http_agent.js @@ -34,6 +34,7 @@ const EventEmitter = require('events'); let debug = require('internal/util/debuglog').debuglog('http', (fn) => { debug = fn; }); +const { AsyncResource } = require('async_hooks'); const { async_id_symbol } = require('internal/async_hooks').symbols; const { codes: { @@ -47,6 +48,7 @@ const { validateNumber } = require('internal/validators'); const kOnKeylog = Symbol('onkeylog'); const kRequestOptions = Symbol('requestOptions'); +const kRequestAsyncResource = Symbol('requestAsyncResource'); // New Agent code. // The largest departure from the previous implementation is that @@ -127,7 +129,17 @@ function Agent(options) { const requests = this.requests[name]; if (requests && requests.length) { const req = requests.shift(); - setRequestSocket(this, req, socket); + const reqAsyncRes = req[kRequestAsyncResource]; + if (reqAsyncRes) { + // Run request within the original async context. + reqAsyncRes.runInAsyncScope(() => { + asyncResetHandle(socket); + setRequestSocket(this, req, socket); + }); + req[kRequestAsyncResource] = null; + } else { + setRequestSocket(this, req, socket); + } if (requests.length === 0) { delete this.requests[name]; } @@ -253,14 +265,7 @@ Agent.prototype.addRequest = function addRequest(req, options, port/* legacy */, const sockLen = freeLen + this.sockets[name].length; if (socket) { - // Guard against an uninitialized or user supplied Socket. - const handle = socket._handle; - if (handle && typeof handle.asyncReset === 'function') { - // Assign the handle a new asyncId and run any destroy()/init() hooks. - handle.asyncReset(new ReusedHandle(handle.getProviderType(), handle)); - socket[async_id_symbol] = handle.getAsyncId(); - } - + asyncResetHandle(socket); this.reuseSocket(socket, req); setRequestSocket(this, req, socket); this.sockets[name].push(socket); @@ -284,6 +289,8 @@ Agent.prototype.addRequest = function addRequest(req, options, port/* legacy */, // Used to create sockets for pending requests from different origin req[kRequestOptions] = options; + // Used to capture the original async context. + req[kRequestAsyncResource] = new AsyncResource('QueuedRequest'); this.requests[name].push(req); } @@ -493,6 +500,16 @@ function setRequestSocket(agent, req, socket) { socket.setTimeout(req.timeout); } +function asyncResetHandle(socket) { + // Guard against an uninitialized or user supplied Socket. + const handle = socket._handle; + if (handle && typeof handle.asyncReset === 'function') { + // Assign the handle a new asyncId and run any destroy()/init() hooks. + handle.asyncReset(new ReusedHandle(handle.getProviderType(), handle)); + socket[async_id_symbol] = handle.getAsyncId(); + } +} + module.exports = { Agent, globalAgent: new Agent() diff --git a/test/async-hooks/test-async-local-storage-http-agent.js b/test/async-hooks/test-async-local-storage-http-agent.js new file mode 100644 index 00000000000000..1de535aa709687 --- /dev/null +++ b/test/async-hooks/test-async-local-storage-http-agent.js @@ -0,0 +1,35 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const { AsyncLocalStorage } = require('async_hooks'); +const http = require('http'); + +const asyncLocalStorage = new AsyncLocalStorage(); + +const agent = new http.Agent({ + maxSockets: 1 +}); + +const N = 3; +let responses = 0; + +const server = http.createServer(common.mustCall((req, res) => { + res.end('ok'); +}, N)); + +server.listen(0, common.mustCall(() => { + const port = server.address().port; + + for (let i = 0; i < N; i++) { + asyncLocalStorage.run(i, () => { + http.get({ agent, port }, common.mustCall((res) => { + assert.strictEqual(asyncLocalStorage.getStore(), i); + if (++responses === N) { + server.close(); + agent.destroy(); + } + res.resume(); + })); + }); + } +})); diff --git a/test/async-hooks/test-http-agent-handle-reuse-parallel.js b/test/async-hooks/test-http-agent-handle-reuse-parallel.js new file mode 100644 index 00000000000000..cd73b3ed2cb61c --- /dev/null +++ b/test/async-hooks/test-http-agent-handle-reuse-parallel.js @@ -0,0 +1,92 @@ +'use strict'; +// Flags: --expose-internals +const common = require('../common'); +const initHooks = require('./init-hooks'); +const { checkInvocations } = require('./hook-checks'); +const assert = require('assert'); +const { async_id_symbol } = require('internal/async_hooks').symbols; +const http = require('http'); + +// Checks that the async resource used in init in case of a reused handle +// is not reused. Test is based on parallel\test-async-hooks-http-agent.js. + +const hooks = initHooks(); +hooks.enable(); + +const reqAsyncIds = []; +let socket; +let responses = 0; + +// Make sure a single socket is transparently reused for 2 requests. +const agent = new http.Agent({ + keepAlive: true, + keepAliveMsecs: Infinity, + maxSockets: 1 +}); + +const verifyRequest = (idx) => (res) => { + reqAsyncIds[idx] = res.socket[async_id_symbol]; + assert.ok(reqAsyncIds[idx] > 0, `${reqAsyncIds[idx]} > 0`); + if (socket) { + // Check that both requests share their socket. + assert.strictEqual(res.socket, socket); + } else { + socket = res.socket; + } + + res.on('data', common.mustCallAtLeast(() => {})); + res.on('end', common.mustCall(() => { + if (++responses === 2) { + // Clean up to let the event loop stop. + server.close(); + agent.destroy(); + } + })); +}; + +const server = http.createServer(common.mustCall((req, res) => { + req.once('data', common.mustCallAtLeast(() => { + res.writeHead(200, { 'Content-Type': 'text/plain' }); + res.write('foo'); + })); + req.on('end', common.mustCall(() => { + res.end('bar'); + })); +}, 2)).listen(0, common.mustCall(() => { + const port = server.address().port; + const payload = 'hello world'; + + // First request. + const r1 = http.request({ + agent, port, method: 'POST' + }, common.mustCall(verifyRequest(0))); + r1.end(payload); + + // Second request. Sent in parallel with the first one. + const r2 = http.request({ + agent, port, method: 'POST' + }, common.mustCall(verifyRequest(1))); + r2.end(payload); +})); + + +process.on('exit', onExit); + +function onExit() { + hooks.disable(); + hooks.sanityCheck(); + const activities = hooks.activities; + + // Verify both invocations + const first = activities.filter((x) => x.uid === reqAsyncIds[0])[0]; + checkInvocations(first, { init: 1, destroy: 1 }, 'when process exits'); + + const second = activities.filter((x) => x.uid === reqAsyncIds[1])[0]; + checkInvocations(second, { init: 1, destroy: 1 }, 'when process exits'); + + // Verify reuse handle has been wrapped + assert.strictEqual(first.type, second.type); + assert.ok(first.handle !== second.handle, 'Resource reused'); + assert.ok(first.handle === second.handle.handle, + 'Resource not wrapped correctly'); +} diff --git a/test/async-hooks/test-http-agent-handle-reuse.js b/test/async-hooks/test-http-agent-handle-reuse-serial.js similarity index 98% rename from test/async-hooks/test-http-agent-handle-reuse.js rename to test/async-hooks/test-http-agent-handle-reuse-serial.js index 4db83bec54a7bf..bbc7183d6e72ca 100644 --- a/test/async-hooks/test-http-agent-handle-reuse.js +++ b/test/async-hooks/test-http-agent-handle-reuse-serial.js @@ -7,7 +7,7 @@ const assert = require('assert'); const { async_id_symbol } = require('internal/async_hooks').symbols; const http = require('http'); -// Checks that the async resource used in init in case of a resused handle +// Checks that the async resource used in init in case of a reused handle // is not reused. Test is based on parallel\test-async-hooks-http-agent.js. const hooks = initHooks();