Skip to content

Commit

Permalink
Add reference option to getOneMessage() and getEachMessage() (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ehmicky committed May 28, 2024
1 parent 3b56977 commit ff02af6
Show file tree
Hide file tree
Showing 14 changed files with 133 additions and 23 deletions.
28 changes: 26 additions & 2 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,18 @@ Ignore any `message` that returns `false`.

[More info.](ipc.md#filter-messages)

### getEachMessage()
#### getOneMessageOptions.reference

_Type_: `boolean`\
_Default_: `true`

Keep the subprocess alive while `getOneMessage()` is waiting.

[More info.](ipc.md#keeping-the-subprocess-alive)

### getEachMessage(getEachMessageOptions?)

`getEachMessageOptions`: [`GetEachMessageOptions`](#geteachmessageoptions)\
_Returns_: [`AsyncIterable<Message>`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols)

Iterate over each `message` from the parent process.
Expand All @@ -150,6 +160,19 @@ This requires the [`ipc`](#optionsipc) option to be `true`. The [type](ipc.md#me

[More info.](ipc.md#listening-to-messages)

#### getEachMessageOptions

_Type_: `object`

#### getEachMessageOptions.reference

_Type_: `boolean`\
_Default_: `true`

Keep the subprocess alive while `getEachMessage()` is waiting.

[More info.](ipc.md#keeping-the-subprocess-alive)

## Return value

_TypeScript:_ [`ResultPromise`](typescript.md)\
Expand Down Expand Up @@ -298,8 +321,9 @@ This requires the [`ipc`](#optionsipc) option to be `true`. The [type](ipc.md#me

[More info.](ipc.md#exchanging-messages)

### subprocess.getEachMessage()
### subprocess.getEachMessage(getEachMessageOptions?)

`getEachMessageOptions`: [`GetEachMessageOptions`](#geteachmessageoptions)\
_Returns_: [`AsyncIterable<Message>`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols)

Iterate over each `message` from the subprocess.
Expand Down
17 changes: 17 additions & 0 deletions docs/ipc.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,23 @@ await Promise.all([
]);
```

## Keeping the subprocess alive

By default, the subprocess is kept alive as long as [`getOneMessage()`](api.md#getonemessagegetonemessageoptions) or [`getEachMessage()`](api.md#geteachmessagegeteachmessageoptions) is waiting. This is recommended if you're sure the current process will send a message, as this prevents the subprocess from exiting too early.

However, if you don't know whether a message will be sent, this can leave the subprocess hanging forever. In that case, the [`reference: false`](api.md#geteachmessageoptionsreference) option can be set.

```js
import {getEachMessage} from 'execa';

// {type: 'gracefulExit'} is sometimes received, but not always
for await (const message of getEachMessage()) {
if (message.type === 'gracefulExit') {
gracefulExit({reference: false});
}
}
```

## Debugging

When the [`verbose`](api.md#optionsverbose) option is `'full'`, the IPC messages sent by the subprocess to the current process are [printed on the console](debugging.md#full-mode).
Expand Down
1 change: 1 addition & 0 deletions lib/ipc/buffer-messages.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export const waitForIpcOutput = async ({
isSubprocess: false,
ipc,
shouldAwait: false,
reference: true,
})) {
if (buffer) {
checkIpcMaxBuffer(subprocess, ipcOutput, maxBuffer);
Expand Down
12 changes: 7 additions & 5 deletions lib/ipc/get-each.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,25 @@ import {getIpcEmitter, isConnected} from './forward.js';
import {addReference, removeReference} from './reference.js';

// Like `[sub]process.on('message')` but promise-based
export const getEachMessage = ({anyProcess, channel, isSubprocess, ipc}) => loopOnMessages({
export const getEachMessage = ({anyProcess, channel, isSubprocess, ipc}, {reference = true} = {}) => loopOnMessages({
anyProcess,
channel,
isSubprocess,
ipc,
shouldAwait: !isSubprocess,
reference,
});

// Same but used internally
export const loopOnMessages = ({anyProcess, channel, isSubprocess, ipc, shouldAwait}) => {
export const loopOnMessages = ({anyProcess, channel, isSubprocess, ipc, shouldAwait, reference}) => {
validateIpcMethod({
methodName: 'getEachMessage',
isSubprocess,
ipc,
isConnected: isConnected(anyProcess),
});

addReference(channel);
addReference(channel, reference);
const ipcEmitter = getIpcEmitter(anyProcess, channel, isSubprocess);
const controller = new AbortController();
const state = {};
Expand All @@ -40,6 +41,7 @@ export const loopOnMessages = ({anyProcess, channel, isSubprocess, ipc, shouldAw
shouldAwait,
controller,
state,
reference,
});
};

Expand All @@ -58,7 +60,7 @@ const abortOnStrictError = async ({ipcEmitter, isSubprocess, controller, state})
} catch {}
};

const iterateOnMessages = async function * ({anyProcess, channel, ipcEmitter, isSubprocess, shouldAwait, controller, state}) {
const iterateOnMessages = async function * ({anyProcess, channel, ipcEmitter, isSubprocess, shouldAwait, controller, state, reference}) {
try {
for await (const [message] of on(ipcEmitter, 'message', {signal: controller.signal})) {
throwIfStrictError(state);
Expand All @@ -68,7 +70,7 @@ const iterateOnMessages = async function * ({anyProcess, channel, ipcEmitter, is
throwIfStrictError(state);
} finally {
controller.abort();
removeReference(channel);
removeReference(channel, reference);

if (!isSubprocess) {
disconnect(anyProcess);
Expand Down
9 changes: 5 additions & 4 deletions lib/ipc/get-one.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {getIpcEmitter, isConnected} from './forward.js';
import {addReference, removeReference} from './reference.js';

// Like `[sub]process.once('message')` but promise-based
export const getOneMessage = ({anyProcess, channel, isSubprocess, ipc}, {filter} = {}) => {
export const getOneMessage = ({anyProcess, channel, isSubprocess, ipc}, {reference = true, filter} = {}) => {
validateIpcMethod({
methodName: 'getOneMessage',
isSubprocess,
Expand All @@ -22,11 +22,12 @@ export const getOneMessage = ({anyProcess, channel, isSubprocess, ipc}, {filter}
channel,
isSubprocess,
filter,
reference,
});
};

const getOneMessageAsync = async ({anyProcess, channel, isSubprocess, filter}) => {
addReference(channel);
const getOneMessageAsync = async ({anyProcess, channel, isSubprocess, filter, reference}) => {
addReference(channel, reference);
const ipcEmitter = getIpcEmitter(anyProcess, channel, isSubprocess);
const controller = new AbortController();
try {
Expand All @@ -40,7 +41,7 @@ const getOneMessageAsync = async ({anyProcess, channel, isSubprocess, filter}) =
throw error;
} finally {
controller.abort();
removeReference(channel);
removeReference(channel, reference);
}
};

Expand Down
24 changes: 18 additions & 6 deletions lib/ipc/reference.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,23 @@
// We do not use `anyProcess.channel.ref()` since this would prevent the automatic `.channel.refCounted()` Node.js is doing.
// We keep a reference to `anyProcess.channel` since it might be `null` while `getOneMessage()` or `getEachMessage()` is still processing debounced messages.
// See https://github.com/nodejs/node/blob/2aaeaa863c35befa2ebaa98fb7737ec84df4d8e9/lib/internal/child_process.js#L547
export const addReference = channel => {
export const addReference = (channel, reference) => {
if (reference) {
addReferenceCount(channel);
}
};

const addReferenceCount = channel => {
channel.refCounted();
};

export const removeReference = channel => {
export const removeReference = (channel, reference) => {
if (reference) {
removeReferenceCount(channel);
}
};

const removeReferenceCount = channel => {
channel.unrefCounted();
};

Expand All @@ -18,15 +30,15 @@ export const removeReference = channel => {
// See https://github.com/nodejs/node/blob/1b965270a9c273d4cf70e8808e9d28b9ada7844f/lib/child_process.js#L180
export const undoAddedReferences = (channel, isSubprocess) => {
if (isSubprocess) {
removeReference(channel);
removeReference(channel);
removeReferenceCount(channel);
removeReferenceCount(channel);
}
};

// Reverse it during `disconnect`
export const redoAddedReferences = (channel, isSubprocess) => {
if (isSubprocess) {
addReference(channel);
addReference(channel);
addReferenceCount(channel);
addReferenceCount(channel);
}
};
4 changes: 4 additions & 0 deletions test-d/ipc/get-each.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,7 @@ expectType<undefined>(execa('test', {ipc: false}).getEachMessage);
expectType<undefined>(execa('test', {ipcInput: undefined}).getEachMessage);
expectType<undefined>(execa('test', {ipc: false, ipcInput: ''}).getEachMessage);

subprocess.getEachMessage({reference: true} as const);
getEachMessage({reference: true} as const);
expectError(subprocess.getEachMessage({reference: 'true'} as const));
expectError(getEachMessage({reference: 'true'} as const));
8 changes: 7 additions & 1 deletion test-d/ipc/get-one.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ expectError(await subprocess.getOneMessage({filter(message: Message<'advanced'>)
expectError(await subprocess.getOneMessage({filter: (message: Message<'json'>) => true} as const));
expectError(await subprocess.getOneMessage({filter: (message: '') => true} as const));
expectError(await subprocess.getOneMessage({filter: true} as const));
expectError(await subprocess.getOneMessage({unknownOption: true} as const));

await getOneMessage({filter: undefined} as const);
await getOneMessage({filter: (message: Message) => true} as const);
Expand All @@ -54,4 +53,11 @@ expectError(await getOneMessage({filter(message: Message) {}} as const));
expectError(await getOneMessage({filter: (message: Message<'json'>) => true} as const));
expectError(await getOneMessage({filter: (message: '') => true} as const));
expectError(await getOneMessage({filter: true} as const));

expectError(await subprocess.getOneMessage({unknownOption: true} as const));
expectError(await getOneMessage({unknownOption: true} as const));

await subprocess.getOneMessage({reference: true} as const);
await getOneMessage({reference: true} as const);
expectError(await subprocess.getOneMessage({reference: 'true'} as const));
expectError(await getOneMessage({reference: 'true'} as const));
4 changes: 4 additions & 0 deletions test/fixtures/ipc-get-ref.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/usr/bin/env node
import {getOneMessage} from '../../index.js';

getOneMessage();
4 changes: 4 additions & 0 deletions test/fixtures/ipc-get-unref.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/usr/bin/env node
import {getOneMessage} from '../../index.js';

getOneMessage({reference: false});
4 changes: 4 additions & 0 deletions test/fixtures/ipc-iterate-ref.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/usr/bin/env node
import {getEachMessage} from '../../index.js';

getEachMessage();
4 changes: 4 additions & 0 deletions test/fixtures/ipc-iterate-unref.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/usr/bin/env node
import {getEachMessage} from '../../index.js';

getEachMessage({reference: false});
14 changes: 11 additions & 3 deletions test/ipc/reference.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,21 @@ import {PARALLEL_COUNT} from '../helpers/parallel.js';

setFixtureDirectory();

const testKeepAliveSubprocess = async (t, fixtureName) => {
const testReference = async (t, fixtureName) => {
const {timedOut} = await t.throwsAsync(execa(fixtureName, {ipc: true, timeout: 1e3}));
t.true(timedOut);
};

test('exports.getOneMessage() keeps the subprocess alive', testKeepAliveSubprocess, 'ipc-echo.js');
test('exports.getEachMessage() keeps the subprocess alive', testKeepAliveSubprocess, 'ipc-iterate.js');
test('exports.getOneMessage() keeps the subprocess alive', testReference, 'ipc-get-ref.js');
test('exports.getEachMessage() keeps the subprocess alive', testReference, 'ipc-iterate-ref.js');

const testUnreference = async (t, fixtureName) => {
const {ipcOutput} = await execa(fixtureName, {ipc: true});
t.deepEqual(ipcOutput, []);
};

test('exports.getOneMessage() does not keep the subprocess alive, reference false', testUnreference, 'ipc-get-unref.js');
test('exports.getEachMessage() does not keep the subprocess alive, reference false', testUnreference, 'ipc-iterate-unref.js');

test('exports.sendMessage() keeps the subprocess alive', async t => {
const {ipcOutput} = await execa('ipc-send-repeat.js', [`${PARALLEL_COUNT}`], {ipc: true});
Expand Down
23 changes: 21 additions & 2 deletions types/ipc.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ type GetOneMessageOptions<
Ignore any `message` that returns `false`.
*/
readonly filter?: (message: Message<Serialization>) => boolean;

/**
Keep the subprocess alive while `getOneMessage()` is waiting.
@default true
*/
readonly reference?: boolean;
};

/**
Expand All @@ -65,12 +72,24 @@ This requires the `ipc` option to be `true`. The type of `message` depends on th
*/
export function getOneMessage(getOneMessageOptions?: GetOneMessageOptions<Options['serialization']>): Promise<Message>;

/**
Options to `getEachMessage()` and `subprocess.getEachMessage()`
*/
type GetEachMessageOptions = {
/**
Keep the subprocess alive while `getEachMessage()` is waiting.
@default true
*/
readonly reference?: boolean;
};

/**
Iterate over each `message` from the parent process.
This requires the `ipc` option to be `true`. The type of `message` depends on the `serialization` option.
*/
export function getEachMessage(): AsyncIterableIterator<Message>;
export function getEachMessage(getEachMessageOptions?: GetEachMessageOptions): AsyncIterableIterator<Message>;

// IPC methods in the current process
export type IpcMethods<
Expand All @@ -97,7 +116,7 @@ export type IpcMethods<
This requires the `ipc` option to be `true`. The type of `message` depends on the `serialization` option.
*/
getEachMessage(): AsyncIterableIterator<Message<Serialization>>;
getEachMessage(getEachMessageOptions?: GetEachMessageOptions): AsyncIterableIterator<Message<Serialization>>;
}
// Those methods only work if the `ipc` option is `true`.
// At runtime, they are actually defined, in order to provide with a nice error message.
Expand Down

0 comments on commit ff02af6

Please sign in to comment.