Skip to content

Commit

Permalink
Refactor reference counting with IPC (#1089)
Browse files Browse the repository at this point in the history
  • Loading branch information
ehmicky authored May 24, 2024
1 parent 97b17e6 commit 18e607c
Show file tree
Hide file tree
Showing 10 changed files with 96 additions and 31 deletions.
1 change: 1 addition & 0 deletions lib/ipc/buffer-messages.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export const waitForIpcOutput = async ({

for await (const message of loopOnMessages({
anyProcess: subprocess,
channel: subprocess.channel,
isSubprocess: false,
ipc,
shouldAwait: false,
Expand Down
20 changes: 15 additions & 5 deletions lib/ipc/forward.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {undoAddedReferences} from './reference.js';
// Forward the `message` and `disconnect` events from the process and subprocess to a proxy emitter.
// This prevents the `error` event from stopping IPC.
// This also allows debouncing the `message` event.
export const getIpcEmitter = (anyProcess, isSubprocess) => {
export const getIpcEmitter = (anyProcess, channel, isSubprocess) => {
if (IPC_EMITTERS.has(anyProcess)) {
return IPC_EMITTERS.get(anyProcess);
}
Expand All @@ -15,7 +15,12 @@ export const getIpcEmitter = (anyProcess, isSubprocess) => {
const ipcEmitter = new EventEmitter();
ipcEmitter.connected = true;
IPC_EMITTERS.set(anyProcess, ipcEmitter);
forwardEvents(ipcEmitter, anyProcess, isSubprocess);
forwardEvents({
ipcEmitter,
anyProcess,
channel,
isSubprocess,
});
return ipcEmitter;
};

Expand All @@ -24,11 +29,16 @@ const IPC_EMITTERS = new WeakMap();
// The `message` and `disconnect` events are buffered in the subprocess until the first listener is setup.
// However, unbuffering happens after one tick, so this give enough time for the caller to setup the listener on the proxy emitter first.
// See https://github.com/nodejs/node/blob/2aaeaa863c35befa2ebaa98fb7737ec84df4d8e9/lib/internal/child_process.js#L721
const forwardEvents = (ipcEmitter, anyProcess, isSubprocess) => {
const forwardEvents = ({ipcEmitter, anyProcess, channel, isSubprocess}) => {
const boundOnMessage = onMessage.bind(undefined, anyProcess, ipcEmitter);
anyProcess.on('message', boundOnMessage);
anyProcess.once('disconnect', onDisconnect.bind(undefined, {anyProcess, ipcEmitter, boundOnMessage}));
undoAddedReferences(anyProcess, isSubprocess);
anyProcess.once('disconnect', onDisconnect.bind(undefined, {
anyProcess,
channel,
ipcEmitter,
boundOnMessage,
}));
undoAddedReferences(channel, isSubprocess);
};

// Check whether there might still be some `message` events to receive
Expand Down
14 changes: 8 additions & 6 deletions lib/ipc/get-each.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,30 @@ import {getIpcEmitter, isConnected} from './forward.js';
import {addReference, removeReference} from './reference.js';

// Like `[sub]process.on('message')` but promise-based
export const getEachMessage = ({anyProcess, isSubprocess, ipc}) => loopOnMessages({
export const getEachMessage = ({anyProcess, channel, isSubprocess, ipc}) => loopOnMessages({
anyProcess,
channel,
isSubprocess,
ipc,
shouldAwait: !isSubprocess,
});

// Same but used internally
export const loopOnMessages = ({anyProcess, isSubprocess, ipc, shouldAwait}) => {
export const loopOnMessages = ({anyProcess, channel, isSubprocess, ipc, shouldAwait}) => {
validateIpcMethod({
methodName: 'getEachMessage',
isSubprocess,
ipc,
isConnected: isConnected(anyProcess),
});

addReference(anyProcess);
const ipcEmitter = getIpcEmitter(anyProcess, isSubprocess);
addReference(channel);
const ipcEmitter = getIpcEmitter(anyProcess, channel, isSubprocess);
const controller = new AbortController();
stopOnDisconnect(anyProcess, ipcEmitter, controller);
return iterateOnMessages({
anyProcess,
channel,
ipcEmitter,
isSubprocess,
shouldAwait,
Expand All @@ -40,14 +42,14 @@ const stopOnDisconnect = async (anyProcess, ipcEmitter, controller) => {
} catch {}
};

const iterateOnMessages = async function * ({anyProcess, ipcEmitter, isSubprocess, shouldAwait, controller}) {
const iterateOnMessages = async function * ({anyProcess, channel, ipcEmitter, isSubprocess, shouldAwait, controller}) {
try {
for await (const [message] of on(ipcEmitter, 'message', {signal: controller.signal})) {
yield message;
}
} catch {} finally {
controller.abort();
removeReference(anyProcess);
removeReference(channel);

if (!isSubprocess) {
disconnect(anyProcess);
Expand Down
17 changes: 11 additions & 6 deletions lib/ipc/get-one.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,25 @@ import {getIpcEmitter, isConnected} from './forward.js';
import {addReference, removeReference} from './reference.js';

// Like `[sub]process.once('message')` but promise-based
export const getOneMessage = ({anyProcess, isSubprocess, ipc}, {filter} = {}) => {
export const getOneMessage = ({anyProcess, channel, isSubprocess, ipc}, {filter} = {}) => {
validateIpcMethod({
methodName: 'getOneMessage',
isSubprocess,
ipc,
isConnected: isConnected(anyProcess),
});

return getOneMessageAsync(anyProcess, isSubprocess, filter);
return getOneMessageAsync({
anyProcess,
channel,
isSubprocess,
filter,
});
};

const getOneMessageAsync = async (anyProcess, isSubprocess, filter) => {
addReference(anyProcess);
const ipcEmitter = getIpcEmitter(anyProcess, isSubprocess);
const getOneMessageAsync = async ({anyProcess, channel, isSubprocess, filter}) => {
addReference(channel);
const ipcEmitter = getIpcEmitter(anyProcess, channel, isSubprocess);
const controller = new AbortController();
try {
return await Promise.race([
Expand All @@ -26,7 +31,7 @@ const getOneMessageAsync = async (anyProcess, isSubprocess, filter) => {
]);
} finally {
controller.abort();
removeReference(anyProcess);
removeReference(channel);
}
};

Expand Down
4 changes: 3 additions & 1 deletion lib/ipc/incoming.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {once} from 'node:events';
import {scheduler} from 'node:timers/promises';
import {waitForOutgoingMessages} from './outgoing.js';
import {redoAddedReferences} from './reference.js';

// Debounce the `message` event so it is emitted at most once per macrotask.
// This allows users to call `await getOneMessage()`/`getEachMessage()` multiple times in a row.
Expand Down Expand Up @@ -28,14 +29,15 @@ export const onMessage = async (anyProcess, ipcEmitter, message) => {
const INCOMING_MESSAGES = new WeakMap();

// If the `message` event is currently debounced, the `disconnect` event must wait for it
export const onDisconnect = async ({anyProcess, ipcEmitter, boundOnMessage}) => {
export const onDisconnect = async ({anyProcess, channel, ipcEmitter, boundOnMessage}) => {
const incomingMessages = INCOMING_MESSAGES.get(anyProcess);
while (incomingMessages?.length > 0) {
// eslint-disable-next-line no-await-in-loop
await once(ipcEmitter, 'message');
}

anyProcess.removeListener('message', boundOnMessage);
redoAddedReferences(channel);
ipcEmitter.connected = false;
ipcEmitter.emit('disconnect');
};
15 changes: 13 additions & 2 deletions lib/ipc/methods.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,25 @@ const getIpcMethods = (anyProcess, isSubprocess, ipc) => {
const anyProcessSend = anyProcess.send === undefined
? undefined
: promisify(anyProcess.send.bind(anyProcess));
const {channel} = anyProcess;
return {
sendMessage: sendMessage.bind(undefined, {
anyProcess,
anyProcessSend,
isSubprocess,
ipc,
}),
getOneMessage: getOneMessage.bind(undefined, {anyProcess, isSubprocess, ipc}),
getEachMessage: getEachMessage.bind(undefined, {anyProcess, isSubprocess, ipc}),
getOneMessage: getOneMessage.bind(undefined, {
anyProcess,
channel,
isSubprocess,
ipc,
}),
getEachMessage: getEachMessage.bind(undefined, {
anyProcess,
channel,
isSubprocess,
ipc,
}),
};
};
27 changes: 19 additions & 8 deletions lib/ipc/reference.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,32 @@
// By default, Node.js keeps the subprocess alive while it has a `message` or `disconnect` listener.
// We replicate the same logic for the events that we proxy.
// This ensures the subprocess is kept alive while `sendMessage()`, `getOneMessage()` and `getEachMessage()` are ongoing.
// This ensures the subprocess is kept alive while `getOneMessage()` and `getEachMessage()` are ongoing.
// This is not a problem with `sendMessage()` since Node.js handles that method automatically.
// We do not use `anyProcess.channel.ref()` since this would prevent the automatic `.channel.refCounted()` Node.js is doing.
// We keep a reference to `anyProcess.channel` since it might be `null` while `getOneMessage()` or `getEachMessage()` is still processing debounced messages.
// See https://github.com/nodejs/node/blob/2aaeaa863c35befa2ebaa98fb7737ec84df4d8e9/lib/internal/child_process.js#L547
export const addReference = anyProcess => {
anyProcess.channel?.refCounted();
export const addReference = channel => {
channel.refCounted();
};

export const removeReference = anyProcess => {
anyProcess.channel?.unrefCounted();
export const removeReference = channel => {
channel.unrefCounted();
};

// To proxy events, we setup some global listeners on the `message` and `disconnect` events.
// Those should not keep the subprocess alive, so we remove the automatic counting that Node.js is doing.
// See https://github.com/nodejs/node/blob/1b965270a9c273d4cf70e8808e9d28b9ada7844f/lib/child_process.js#L180
export const undoAddedReferences = (anyProcess, isSubprocess) => {
export const undoAddedReferences = (channel, isSubprocess) => {
if (isSubprocess) {
removeReference(anyProcess);
removeReference(anyProcess);
removeReference(channel);
removeReference(channel);
}
};

// Reverse it during `disconnect`
export const redoAddedReferences = (channel, isSubprocess) => {
if (isSubprocess) {
addReference(channel);
addReference(channel);
}
};
3 changes: 0 additions & 3 deletions lib/ipc/send.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import {
disconnect,
} from './validation.js';
import {startSendMessage, endSendMessage} from './outgoing.js';
import {addReference, removeReference} from './reference.js';

// Like `[sub]process.send()` but promise-based.
// We do not `await subprocess` during `.sendMessage()` nor `.getOneMessage()` since those methods are transient.
Expand All @@ -28,7 +27,6 @@ export const sendMessage = ({anyProcess, anyProcessSend, isSubprocess, ipc}, mes
};

const sendMessageAsync = async ({anyProcess, anyProcessSend, isSubprocess, message}) => {
addReference(anyProcess);
const outgoingMessagesState = startSendMessage(anyProcess);
try {
await anyProcessSend(message);
Expand All @@ -39,6 +37,5 @@ const sendMessageAsync = async ({anyProcess, anyProcessSend, isSubprocess, messa
throw error;
} finally {
endSendMessage(outgoingMessagesState);
removeReference(anyProcess);
}
};
7 changes: 7 additions & 0 deletions test/fixtures/ipc-disconnect-get.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/usr/bin/env node
import process from 'node:process';
import {getOneMessage} from '../../index.js';

process.disconnect();
console.log(process.channel);
await getOneMessage();
19 changes: 19 additions & 0 deletions test/ipc/reference.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,22 @@ test('process.once("disconnect") does not keep the subprocess alive, after getOn
t.deepEqual(ipcOutput, ['.']);
t.is(stdout, '.');
});

test('Can call subprocess.disconnect() right away', async t => {
const subprocess = execa('ipc-send.js', {ipc: true});
subprocess.disconnect();
t.is(subprocess.channel, null);

await t.throwsAsync(subprocess.getOneMessage(), {
message: /subprocess.getOneMessage\(\) could not complete/,
});
await t.throwsAsync(subprocess, {
message: /Error: sendMessage\(\) cannot be used/,
});
});

test('Can call process.disconnect() right away', async t => {
const {stdout, stderr} = await t.throwsAsync(execa('ipc-disconnect-get.js', {ipc: true}));
t.is(stdout, 'null');
t.true(stderr.includes('Error: getOneMessage() cannot be used'));
});

0 comments on commit 18e607c

Please sign in to comment.