From 33827e3f6864872f357da6f9566f3c90b138c4ef Mon Sep 17 00:00:00 2001 From: Ben Noordhuis Date: Tue, 3 Nov 2015 19:06:50 +0100 Subject: [PATCH] cluster: remove handles when disconnecting worker Due to the race window between the master's "disconnect" message and the worker's "handle received" message, connections sometimes got stuck in the pending handles queue when calling `worker.disconnect()` in the master process. The observable effect from the client's perspective was a TCP or HTTP connection that simply stalled. This commit fixes that by closing open handles in the master when the "disconnect" message is sent. Fixes: https://github.com/nodejs/node/issues/3551 PR-URL: https://github.com/nodejs/node/pull/3677 Reviewed-By: Colin Ihrig Reviewed-By: Fedor Indutny Reviewed-By: James M Snell --- lib/cluster.js | 49 +++++++------- lib/internal/cluster.js | 4 ++ node.gyp | 1 + .../test-cluster-disconnect-handles.js | 65 +++++++++++++++++++ 4 files changed, 96 insertions(+), 23 deletions(-) create mode 100644 lib/internal/cluster.js create mode 100644 test/parallel/test-cluster-disconnect-handles.js diff --git a/lib/cluster.js b/lib/cluster.js index fbad0b08212384..550a17bed9fc8d 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -217,7 +217,7 @@ function masterInit() { // Keyed on address:port:etc. When a worker dies, we walk over the handles // and remove() the worker from each one. remove() may do a linear scan // itself so we might end up with an O(n*m) operation. Ergo, FIXME. - var handles = {}; + const handles = require('internal/cluster').handles; var initialized = false; cluster.setupMaster = function(options) { @@ -308,6 +308,26 @@ function masterInit() { var ids = 0; + function removeWorker(worker) { + assert(worker); + + delete cluster.workers[worker.id]; + + if (Object.keys(cluster.workers).length === 0) { + assert(Object.keys(handles).length === 0, 'Resource leak detected.'); + intercom.emit('disconnect'); + } + } + + function removeHandlesForWorker(worker) { + assert(worker); + + for (var key in handles) { + var handle = handles[key]; + if (handle.remove(worker)) delete handles[key]; + } + } + cluster.fork = function(env) { cluster.setupMaster(); const id = ++ids; @@ -319,26 +339,6 @@ function masterInit() { worker.on('message', this.emit.bind(this, 'message')); - function removeWorker(worker) { - assert(worker); - - delete cluster.workers[worker.id]; - - if (Object.keys(cluster.workers).length === 0) { - assert(Object.keys(handles).length === 0, 'Resource leak detected.'); - intercom.emit('disconnect'); - } - } - - function removeHandlesForWorker(worker) { - assert(worker); - - for (var key in handles) { - var handle = handles[key]; - if (handle.remove(worker)) delete handles[key]; - } - } - worker.process.once('exit', function(exitCode, signalCode) { /* * Remove the worker from the workers list only @@ -404,6 +404,8 @@ function masterInit() { Worker.prototype.disconnect = function() { this.suicide = true; send(this, { act: 'disconnect' }); + removeHandlesForWorker(this); + removeWorker(this); }; Worker.prototype.destroy = function(signo) { @@ -490,11 +492,12 @@ function masterInit() { cluster.emit('listening', worker, info); } - // Server in worker is closing, remove from list. + // Server in worker is closing, remove from list. The handle may have been + // removed by a prior call to removeHandlesForWorker() so guard against that. function close(worker, message) { var key = message.key; var handle = handles[key]; - if (handle.remove(worker)) delete handles[key]; + if (handle && handle.remove(worker)) delete handles[key]; } function send(worker, message, handle, cb) { diff --git a/lib/internal/cluster.js b/lib/internal/cluster.js new file mode 100644 index 00000000000000..8380ea7482c670 --- /dev/null +++ b/lib/internal/cluster.js @@ -0,0 +1,4 @@ +'use strict'; + +// Used in tests. +exports.handles = {}; diff --git a/node.gyp b/node.gyp index e63e750414ee60..0e8fd576f0735a 100644 --- a/node.gyp +++ b/node.gyp @@ -70,6 +70,7 @@ 'lib/vm.js', 'lib/zlib.js', 'lib/internal/child_process.js', + 'lib/internal/cluster.js', 'lib/internal/freelist.js', 'lib/internal/linkedlist.js', 'lib/internal/module.js', diff --git a/test/parallel/test-cluster-disconnect-handles.js b/test/parallel/test-cluster-disconnect-handles.js new file mode 100644 index 00000000000000..5fa98844537eb8 --- /dev/null +++ b/test/parallel/test-cluster-disconnect-handles.js @@ -0,0 +1,65 @@ +/* eslint-disable no-debugger */ +// Flags: --expose_internals +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const cluster = require('cluster'); +const net = require('net'); + +const Protocol = require('_debugger').Protocol; + +if (common.isWindows) { + console.log('1..0 # Skipped: SCHED_RR not reliable on Windows'); + return; +} + +cluster.schedulingPolicy = cluster.SCHED_RR; + +// Worker sends back a "I'm here" message, then immediately suspends +// inside the debugger. The master connects to the debug agent first, +// connects to the TCP server second, then disconnects the worker and +// unsuspends it again. The ultimate goal of this tortured exercise +// is to make sure the connection is still sitting in the master's +// pending handle queue. +if (cluster.isMaster) { + const handles = require('internal/cluster').handles; + // FIXME(bnoordhuis) lib/cluster.js scans the execArgv arguments for + // debugger flags and renumbers any port numbers it sees starting + // from the default port 5858. Add a '.' that circumvents the + // scanner but is ignored by atoi(3). Heinous hack. + cluster.setupMaster({ execArgv: [`--debug=${common.PORT}.`] }); + const worker = cluster.fork(); + worker.on('message', common.mustCall(message => { + assert.strictEqual(Array.isArray(message), true); + assert.strictEqual(message[0], 'listening'); + const address = message[1]; + const host = address.address; + const debugClient = net.connect({ host, port: common.PORT }); + const protocol = new Protocol(); + debugClient.setEncoding('utf8'); + debugClient.on('data', data => protocol.execute(data)); + debugClient.once('connect', common.mustCall(() => { + protocol.onResponse = common.mustCall(res => { + protocol.onResponse = () => {}; + const conn = net.connect({ host, port: address.port }); + conn.once('connect', common.mustCall(() => { + conn.destroy(); + assert.notDeepStrictEqual(handles, {}); + worker.disconnect(); + assert.deepStrictEqual(handles, {}); + const req = protocol.serialize({ command: 'continue' }); + debugClient.write(req); + })); + }); + })); + })); + process.on('exit', () => assert.deepStrictEqual(handles, {})); +} else { + const server = net.createServer(socket => socket.pipe(socket)); + server.listen(() => { + process.send(['listening', server.address()]); + debugger; + }); + process.on('disconnect', process.exit); +}