Skip to content

Commit

Permalink
Add strict option to sendMessage() (#1098)
Browse files Browse the repository at this point in the history
  • Loading branch information
ehmicky committed May 26, 2024
1 parent b959030 commit e085b10
Show file tree
Hide file tree
Showing 30 changed files with 635 additions and 46 deletions.
27 changes: 21 additions & 6 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,10 @@ Split a `command` string into an array. For example, `'npm run build'` returns `

[More info.](escaping.md#user-defined-input)

### sendMessage(message)
### sendMessage(message, sendMessageOptions?)

`message`: [`Message`](ipc.md#message-type)\
`sendMessageOptions`: [`SendMessageOptions`](#sendmessageoptions)\
_Returns_: `Promise<void>`

Send a `message` to the parent process.
Expand All @@ -103,9 +104,22 @@ This requires the [`ipc`](#optionsipc) option to be `true`. The [type](ipc.md#me

[More info.](ipc.md#exchanging-messages)

#### sendMessageOptions

_Type_: `object`

#### sendMessageOptions.strict

_Type_: `boolean`\
_Default_: `false`

Throw when the other process is not receiving or listening to messages.

[More info.](ipc.md#ensure-messages-are-received)

### getOneMessage(getOneMessageOptions?)

_getOneMessageOptions_: [`GetOneMessageOptions`](#getonemessageoptions)\
`getOneMessageOptions`: [`GetOneMessageOptions`](#getonemessageoptions)\
_Returns_: [`Promise<Message>`](ipc.md#message-type)

Receive a single `message` from the parent process.
Expand Down Expand Up @@ -261,9 +275,10 @@ This is `undefined` if the subprocess failed to spawn.

[More info.](termination.md#inter-process-termination)

### subprocess.sendMessage(message)
### subprocess.sendMessage(message, sendMessageOptions)

`message`: [`Message`](ipc.md#message-type)\
`sendMessageOptions`: [`SendMessageOptions`](#sendmessageoptions)\
_Returns_: `Promise<void>`

Send a `message` to the subprocess.
Expand All @@ -274,7 +289,7 @@ This requires the [`ipc`](#optionsipc) option to be `true`. The [type](ipc.md#me

### subprocess.getOneMessage(getOneMessageOptions?)

_getOneMessageOptions_: [`GetOneMessageOptions`](#getonemessageoptions)\
`getOneMessageOptions`: [`GetOneMessageOptions`](#getonemessageoptions)\
_Returns_: [`Promise<Message>`](ipc.md#message-type)

Receive a single `message` from the subprocess.
Expand Down Expand Up @@ -485,7 +500,7 @@ Items are arrays when their corresponding `stdio` option is a [transform in obje

_Type_: [`Message[]`](ipc.md#message-type)

All the messages [sent by the subprocess](#sendmessagemessage) to the current process.
All the messages [sent by the subprocess](#sendmessagemessage-sendmessageoptions) to the current process.

This is empty unless the [`ipc`](#optionsipc) option is `true`. Also, this is empty if the [`buffer`](#optionsbuffer) option is `false`.

Expand Down Expand Up @@ -914,7 +929,7 @@ By default, this applies to both `stdout` and `stderr`, but [different values ca
_Type:_ `boolean`\
_Default:_ `true` if either the [`node`](#optionsnode) option or the [`ipcInput`](#optionsipcinput) is set, `false` otherwise

Enables exchanging messages with the subprocess using [`subprocess.sendMessage(message)`](#subprocesssendmessagemessage), [`subprocess.getOneMessage()`](#subprocessgetonemessagegetonemessageoptions) and [`subprocess.getEachMessage()`](#subprocessgeteachmessage).
Enables exchanging messages with the subprocess using [`subprocess.sendMessage(message)`](#subprocesssendmessagemessage-sendmessageoptions), [`subprocess.getOneMessage()`](#subprocessgetonemessagegetonemessageoptions) and [`subprocess.getEachMessage()`](#subprocessgeteachmessage).

The subprocess must be a Node.js file.

Expand Down
2 changes: 1 addition & 1 deletion docs/execution.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ Synchronous execution is generally discouraged as it holds the CPU and prevents
- Signal termination: [`subprocess.kill()`](api.md#subprocesskillerror), [`subprocess.pid`](api.md#subprocesspid), [`cleanup`](api.md#optionscleanup) option, [`cancelSignal`](api.md#optionscancelsignal) option, [`forceKillAfterDelay`](api.md#optionsforcekillafterdelay) option.
- Piping multiple subprocesses: [`subprocess.pipe()`](api.md#subprocesspipefile-arguments-options).
- [`subprocess.iterable()`](lines.md#progressive-splitting).
- [IPC](ipc.md): [`sendMessage()`](api.md#sendmessagemessage), [`getOneMessage()`](api.md#getonemessagegetonemessageoptions), [`getEachMessage()`](api.md#geteachmessage), [`result.ipcOutput`](output.md#any-output-type), [`ipc`](api.md#optionsipc) option, [`serialization`](api.md#optionsserialization) option, [`ipcInput`](input.md#any-input-type) option.
- [IPC](ipc.md): [`sendMessage()`](api.md#sendmessagemessage-sendmessageoptions), [`getOneMessage()`](api.md#getonemessagegetonemessageoptions), [`getEachMessage()`](api.md#geteachmessage), [`result.ipcOutput`](output.md#any-output-type), [`ipc`](api.md#optionsipc) option, [`serialization`](api.md#optionsserialization) option, [`ipcInput`](input.md#any-input-type) option.
- [`result.all`](api.md#resultall) is not interleaved.
- [`detached`](api.md#optionsdetached) option.
- The [`maxBuffer`](api.md#optionsmaxbuffer) option is always measured in bytes, not in characters, [lines](api.md#optionslines) nor [objects](transform.md#object-mode). Also, it ignores transforms and the [`encoding`](api.md#optionsencoding) option.
Expand Down
41 changes: 38 additions & 3 deletions docs/ipc.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ When the [`ipc`](api.md#optionsipc) option is `true`, the current process and su

The `ipc` option defaults to `true` when using [`execaNode()`](node.md#run-nodejs-files) or the [`node`](node.md#run-nodejs-files) option.

The current process sends messages with [`subprocess.sendMessage(message)`](api.md#subprocesssendmessagemessage) and receives them with [`subprocess.getOneMessage()`](api.md#subprocessgetonemessagegetonemessageoptions).
The current process sends messages with [`subprocess.sendMessage(message)`](api.md#subprocesssendmessagemessage-sendmessageoptions) and receives them with [`subprocess.getOneMessage()`](api.md#subprocessgetonemessagegetonemessageoptions).

The subprocess uses [`sendMessage(message)`](api.md#sendmessagemessage) and [`getOneMessage()`](api.md#getonemessagegetonemessageoptions). Those are the same methods, but imported directly from the `'execa'` module.
The subprocess uses [`sendMessage(message)`](api.md#sendmessagemessage-sendmessageoptions) and [`getOneMessage()`](api.md#getonemessagegetonemessageoptions). Those are the same methods, but imported directly from the `'execa'` module.

```js
// parent.js
Expand All @@ -24,6 +24,7 @@ const subprocess = execaNode`child.js`;
await subprocess.sendMessage('Hello from parent');
const message = await subprocess.getOneMessage();
console.log(message); // 'Hello from child'
await subprocess;
```

```js
Expand Down Expand Up @@ -91,6 +92,40 @@ for await (const message of getEachMessage()) {
}
```

## Ensure messages are received

When a message is sent by one process, the other process must receive it using [`getOneMessage()`](#exchanging-messages), [`getEachMessage()`](#listening-to-messages), or automatically with [`result.ipcOutput`](api.md#resultipcoutput). If not, that message is silently discarded.

If the [`strict: true`](api.md#sendmessageoptionsstrict) option is passed to [`subprocess.sendMessage(message)`](api.md#subprocesssendmessagemessage-sendmessageoptions) or [`sendMessage(message)`](api.md#sendmessagemessage-sendmessageoptions), an error is thrown instead. This helps identifying subtle race conditions like the following example.

```js
// main.js
import {execaNode} from 'execa';

const subprocess = execaNode`build.js`;
// This `build` message is received
await subprocess.sendMessage('build', {strict: true});
// This `lint` message is not received, so it throws
await subprocess.sendMessage('lint', {strict: true});
await subprocess;
```

```js
// build.js
import {getOneMessage} from 'execa';

// Receives the 'build' message
const task = await getOneMessage();
// The `lint` message is sent while `runTask()` is ongoing
// Therefore the `lint` message is discarded
await runTask(task);

// Does not receive the `lint` message
// Without `strict`, this would wait forever
const secondTask = await getOneMessage();
await runTask(secondTask);
```

## Retrieve all messages

The [`result.ipcOutput`](api.md#resultipcoutput) array contains all the messages sent by the subprocess. In many situations, this is simpler than using [`subprocess.getOneMessage()`](api.md#subprocessgetonemessagegetonemessageoptions) and [`subprocess.getEachMessage()`](api.md#subprocessgeteachmessage).
Expand Down Expand Up @@ -144,7 +179,7 @@ To limit messages to JSON instead, the [`serialization`](api.md#optionsserializa
```js
import {execaNode} from 'execa';

const subprocess = execaNode({serialization: 'json'})`child.js`;
await execaNode({serialization: 'json'})`child.js`;
```

## Messages order
Expand Down
2 changes: 1 addition & 1 deletion docs/typescript.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

## Available types

The following types can be imported: [`ResultPromise`](api.md#return-value), [`Subprocess`](api.md#subprocess), [`Result`](api.md#result), [`ExecaError`](api.md#execaerror), [`Options`](api.md#options), [`StdinOption`](api.md#optionsstdin), [`StdoutStderrOption`](api.md#optionsstdout), [`TemplateExpression`](api.md#execacommand) and [`Message`](api.md#subprocesssendmessagemessage).
The following types can be imported: [`ResultPromise`](api.md#return-value), [`Subprocess`](api.md#subprocess), [`Result`](api.md#result), [`ExecaError`](api.md#execaerror), [`Options`](api.md#options), [`StdinOption`](api.md#optionsstdin), [`StdoutStderrOption`](api.md#optionsstdout), [`TemplateExpression`](api.md#execacommand) and [`Message`](api.md#subprocesssendmessagemessage-sendmessageoptions).

```ts
import {
Expand Down
8 changes: 7 additions & 1 deletion lib/ipc/forward.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,17 @@ const IPC_EMITTERS = new WeakMap();
// However, unbuffering happens after one tick, so this give enough time for the caller to setup the listener on the proxy emitter first.
// See https://github.com/nodejs/node/blob/2aaeaa863c35befa2ebaa98fb7737ec84df4d8e9/lib/internal/child_process.js#L721
const forwardEvents = ({ipcEmitter, anyProcess, channel, isSubprocess}) => {
const boundOnMessage = onMessage.bind(undefined, anyProcess, ipcEmitter);
const boundOnMessage = onMessage.bind(undefined, {
anyProcess,
channel,
isSubprocess,
ipcEmitter,
});
anyProcess.on('message', boundOnMessage);
anyProcess.once('disconnect', onDisconnect.bind(undefined, {
anyProcess,
channel,
isSubprocess,
ipcEmitter,
boundOnMessage,
}));
Expand Down
31 changes: 28 additions & 3 deletions lib/ipc/get-each.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {once, on} from 'node:events';
import {validateIpcMethod, disconnect} from './validation.js';
import {validateIpcMethod, disconnect, getStrictResponseError} from './validation.js';
import {getIpcEmitter, isConnected} from './forward.js';
import {addReference, removeReference} from './reference.js';

Expand All @@ -24,14 +24,22 @@ export const loopOnMessages = ({anyProcess, channel, isSubprocess, ipc, shouldAw
addReference(channel);
const ipcEmitter = getIpcEmitter(anyProcess, channel, isSubprocess);
const controller = new AbortController();
const state = {};
stopOnDisconnect(anyProcess, ipcEmitter, controller);
abortOnStrictError({
ipcEmitter,
isSubprocess,
controller,
state,
});
return iterateOnMessages({
anyProcess,
channel,
ipcEmitter,
isSubprocess,
shouldAwait,
controller,
state,
});
};

Expand All @@ -42,12 +50,23 @@ const stopOnDisconnect = async (anyProcess, ipcEmitter, controller) => {
} catch {}
};

const iterateOnMessages = async function * ({anyProcess, channel, ipcEmitter, isSubprocess, shouldAwait, controller}) {
const abortOnStrictError = async ({ipcEmitter, isSubprocess, controller, state}) => {
try {
const [error] = await once(ipcEmitter, 'strict:error', {signal: controller.signal});
state.error = getStrictResponseError(error, isSubprocess);
controller.abort();
} catch {}
};

const iterateOnMessages = async function * ({anyProcess, channel, ipcEmitter, isSubprocess, shouldAwait, controller, state}) {
try {
for await (const [message] of on(ipcEmitter, 'message', {signal: controller.signal})) {
throwIfStrictError(state);
yield message;
}
} catch {} finally {
} catch {
throwIfStrictError(state);
} finally {
controller.abort();
removeReference(channel);

Expand All @@ -60,3 +79,9 @@ const iterateOnMessages = async function * ({anyProcess, channel, ipcEmitter, is
}
}
};

const throwIfStrictError = ({error}) => {
if (error) {
throw error;
}
};
13 changes: 12 additions & 1 deletion lib/ipc/get-one.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import {once, on} from 'node:events';
import {validateIpcMethod, throwOnEarlyDisconnect, disconnect} from './validation.js';
import {
validateIpcMethod,
throwOnEarlyDisconnect,
disconnect,
getStrictResponseError,
} from './validation.js';
import {getIpcEmitter, isConnected} from './forward.js';
import {addReference, removeReference} from './reference.js';

Expand Down Expand Up @@ -28,6 +33,7 @@ const getOneMessageAsync = async ({anyProcess, channel, isSubprocess, filter}) =
return await Promise.race([
getMessage(ipcEmitter, filter, controller),
throwOnDisconnect(ipcEmitter, isSubprocess, controller),
throwOnStrictError(ipcEmitter, isSubprocess, controller),
]);
} catch (error) {
disconnect(anyProcess);
Expand Down Expand Up @@ -55,3 +61,8 @@ const throwOnDisconnect = async (ipcEmitter, isSubprocess, {signal}) => {
await once(ipcEmitter, 'disconnect', {signal});
throwOnEarlyDisconnect(isSubprocess);
};

const throwOnStrictError = async (ipcEmitter, isSubprocess, {signal}) => {
const [error] = await once(ipcEmitter, 'strict:error', {signal});
throw getStrictResponseError(error, isSubprocess);
};
35 changes: 26 additions & 9 deletions lib/ipc/incoming.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {once} from 'node:events';
import {scheduler} from 'node:timers/promises';
import {waitForOutgoingMessages} from './outgoing.js';
import {redoAddedReferences} from './reference.js';
import {handleStrictRequest, handleStrictResponse} from './strict.js';

// By default, Node.js buffers `message` events.
// - Buffering happens when there is a `message` event is emitted but there is no handler.
Expand All @@ -21,39 +22,55 @@ import {redoAddedReferences} from './reference.js';
// The default behavior does not allow users to realize they made that mistake.
// To solve those problems, instead of buffering messages, we debounce them.
// The `message` event so it is emitted at most once per macrotask.
export const onMessage = async (anyProcess, ipcEmitter, message) => {
export const onMessage = async ({anyProcess, channel, isSubprocess, ipcEmitter}, wrappedMessage) => {
if (handleStrictResponse(wrappedMessage)) {
return;
}

if (!INCOMING_MESSAGES.has(anyProcess)) {
INCOMING_MESSAGES.set(anyProcess, []);
}

const incomingMessages = INCOMING_MESSAGES.get(anyProcess);
incomingMessages.push(message);
incomingMessages.push(wrappedMessage);

if (incomingMessages.length > 1) {
return;
}

while (incomingMessages.length > 0) {
// eslint-disable-next-line no-await-in-loop
await waitForOutgoingMessages(anyProcess);
await waitForOutgoingMessages(anyProcess, ipcEmitter);
// eslint-disable-next-line no-await-in-loop
await scheduler.yield();
ipcEmitter.emit('message', incomingMessages.shift());

// eslint-disable-next-line no-await-in-loop
const message = await handleStrictRequest({
wrappedMessage: incomingMessages[0],
anyProcess,
channel,
isSubprocess,
ipcEmitter,
});

incomingMessages.shift();
ipcEmitter.emit('message', message);
ipcEmitter.emit('message:done');
}
};

const INCOMING_MESSAGES = new WeakMap();

// If the `message` event is currently debounced, the `disconnect` event must wait for it
export const onDisconnect = async ({anyProcess, channel, ipcEmitter, boundOnMessage}) => {
export const onDisconnect = async ({anyProcess, channel, isSubprocess, ipcEmitter, boundOnMessage}) => {
const incomingMessages = INCOMING_MESSAGES.get(anyProcess);
while (incomingMessages?.length > 0) {
// eslint-disable-next-line no-await-in-loop
await once(ipcEmitter, 'message');
await once(ipcEmitter, 'message:done');
}

anyProcess.removeListener('message', boundOnMessage);
redoAddedReferences(channel);
redoAddedReferences(channel, isSubprocess);
ipcEmitter.connected = false;
ipcEmitter.emit('disconnect');
};

const INCOMING_MESSAGES = new WeakMap();
1 change: 1 addition & 0 deletions lib/ipc/methods.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export const getIpcExport = () => getIpcMethods(process, true, process.channel !
const getIpcMethods = (anyProcess, isSubprocess, ipc) => ({
sendMessage: sendMessage.bind(undefined, {
anyProcess,
channel: anyProcess.channel,
isSubprocess,
ipc,
}),
Expand Down
17 changes: 14 additions & 3 deletions lib/ipc/outgoing.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {createDeferred} from '../utils/deferred.js';
import {SUBPROCESS_OPTIONS} from '../arguments/fd-options.js';

// When `sendMessage()` is ongoing, any `message` being received waits before being emitted.
// This allows calling one or multiple `await sendMessage()` followed by `await getOneMessage()`/`await getEachMessage()`.
Expand All @@ -19,12 +20,22 @@ export const endSendMessage = ({outgoingMessages, onMessageSent}) => {
onMessageSent.resolve();
};

// Await while `sendMessage()` is ongoing
export const waitForOutgoingMessages = async anyProcess => {
while (OUTGOING_MESSAGES.get(anyProcess)?.size > 0) {
// Await while `sendMessage()` is ongoing, unless there is already a `message` listener
export const waitForOutgoingMessages = async (anyProcess, ipcEmitter) => {
while (!hasMessageListeners(anyProcess, ipcEmitter) && OUTGOING_MESSAGES.get(anyProcess)?.size > 0) {
// eslint-disable-next-line no-await-in-loop
await Promise.all(OUTGOING_MESSAGES.get(anyProcess));
}
};

const OUTGOING_MESSAGES = new WeakMap();

// Whether any `message` listener is setup
export const hasMessageListeners = (anyProcess, ipcEmitter) => ipcEmitter.listenerCount('message') > getMinListenerCount(anyProcess);

// When `buffer` is `false`, we set up a `message` listener that should be ignored.
// That listener is only meant to intercept `strict` acknowledgement responses.
const getMinListenerCount = anyProcess => SUBPROCESS_OPTIONS.has(anyProcess)
&& !SUBPROCESS_OPTIONS.get(anyProcess).options.buffer.at(-1)
? 1
: 0;
Loading

0 comments on commit e085b10

Please sign in to comment.