Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cluster: remove handles when disconnecting worker #3677

Merged
merged 1 commit into from
Nov 6, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 26 additions & 23 deletions lib/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ function masterInit() {
// Keyed on address:port:etc. When a worker dies, we walk over the handles
// and remove() the worker from each one. remove() may do a linear scan
// itself so we might end up with an O(n*m) operation. Ergo, FIXME.
var handles = {};
const handles = require('internal/cluster').handles;

var initialized = false;
cluster.setupMaster = function(options) {
Expand Down Expand Up @@ -308,6 +308,26 @@ function masterInit() {

var ids = 0;

function removeWorker(worker) {
assert(worker);

delete cluster.workers[worker.id];

if (Object.keys(cluster.workers).length === 0) {
assert(Object.keys(handles).length === 0, 'Resource leak detected.');
intercom.emit('disconnect');
}
}

function removeHandlesForWorker(worker) {
assert(worker);

for (var key in handles) {
var handle = handles[key];
if (handle.remove(worker)) delete handles[key];
}
}

cluster.fork = function(env) {
cluster.setupMaster();
const id = ++ids;
Expand All @@ -319,26 +339,6 @@ function masterInit() {

worker.on('message', this.emit.bind(this, 'message'));

function removeWorker(worker) {
assert(worker);

delete cluster.workers[worker.id];

if (Object.keys(cluster.workers).length === 0) {
assert(Object.keys(handles).length === 0, 'Resource leak detected.');
intercom.emit('disconnect');
}
}

function removeHandlesForWorker(worker) {
assert(worker);

for (var key in handles) {
var handle = handles[key];
if (handle.remove(worker)) delete handles[key];
}
}

worker.process.once('exit', function(exitCode, signalCode) {
/*
* Remove the worker from the workers list only
Expand Down Expand Up @@ -404,6 +404,8 @@ function masterInit() {
Worker.prototype.disconnect = function() {
this.suicide = true;
send(this, { act: 'disconnect' });
removeHandlesForWorker(this);
removeWorker(this);
};

Worker.prototype.destroy = function(signo) {
Expand Down Expand Up @@ -490,11 +492,12 @@ function masterInit() {
cluster.emit('listening', worker, info);
}

// Server in worker is closing, remove from list.
// Server in worker is closing, remove from list. The handle may have been
// removed by a prior call to removeHandlesForWorker() so guard against that.
function close(worker, message) {
var key = message.key;
var handle = handles[key];
if (handle.remove(worker)) delete handles[key];
if (handle && handle.remove(worker)) delete handles[key];
}

function send(worker, message, handle, cb) {
Expand Down
4 changes: 4 additions & 0 deletions lib/internal/cluster.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
'use strict';

// Used in tests.
exports.handles = {};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Time for Map?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 change: 1 addition & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
'lib/vm.js',
'lib/zlib.js',
'lib/internal/child_process.js',
'lib/internal/cluster.js',
'lib/internal/freelist.js',
'lib/internal/linkedlist.js',
'lib/internal/module.js',
Expand Down
65 changes: 65 additions & 0 deletions test/parallel/test-cluster-disconnect-handles.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/* eslint-disable no-debugger */
// Flags: --expose_internals
'use strict';

const common = require('../common');
const assert = require('assert');
const cluster = require('cluster');
const net = require('net');

const Protocol = require('_debugger').Protocol;

if (common.isWindows) {
console.log('1..0 # Skipped: SCHED_RR not reliable on Windows');
return;
}

cluster.schedulingPolicy = cluster.SCHED_RR;

// Worker sends back a "I'm here" message, then immediately suspends
// inside the debugger. The master connects to the debug agent first,
// connects to the TCP server second, then disconnects the worker and
// unsuspends it again. The ultimate goal of this tortured exercise
// is to make sure the connection is still sitting in the master's
// pending handle queue.
if (cluster.isMaster) {
const handles = require('internal/cluster').handles;
// FIXME(bnoordhuis) lib/cluster.js scans the execArgv arguments for
// debugger flags and renumbers any port numbers it sees starting
// from the default port 5858. Add a '.' that circumvents the
// scanner but is ignored by atoi(3). Heinous hack.
cluster.setupMaster({ execArgv: [`--debug=${common.PORT}.`] });
const worker = cluster.fork();
worker.on('message', common.mustCall(message => {
assert.strictEqual(Array.isArray(message), true);
assert.strictEqual(message[0], 'listening');
const address = message[1];
const host = address.address;
const debugClient = net.connect({ host, port: common.PORT });
const protocol = new Protocol();
debugClient.setEncoding('utf8');
debugClient.on('data', data => protocol.execute(data));
debugClient.once('connect', common.mustCall(() => {
protocol.onResponse = common.mustCall(res => {
protocol.onResponse = () => {};
const conn = net.connect({ host, port: address.port });
conn.once('connect', common.mustCall(() => {
conn.destroy();
assert.notDeepStrictEqual(handles, {});
worker.disconnect();
assert.deepStrictEqual(handles, {});
const req = protocol.serialize({ command: 'continue' });
debugClient.write(req);
}));
});
}));
}));
process.on('exit', () => assert.deepStrictEqual(handles, {}));
} else {
const server = net.createServer(socket => socket.pipe(socket));
server.listen(() => {
process.send(['listening', server.address()]);
debugger;
});
process.on('disconnect', process.exit);
}