Skip to content

Commit

Permalink
Add filter option to getOneMessage() (#1076)
Browse files Browse the repository at this point in the history
  • Loading branch information
ehmicky committed May 20, 2024
1 parent ae9b360 commit 571e052
Show file tree
Hide file tree
Showing 15 changed files with 215 additions and 64 deletions.
20 changes: 17 additions & 3 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ This requires the [`ipc`](#optionsipc) option to be `true`. The [type](ipc.md#me

[More info.](ipc.md#exchanging-messages)

### getOneMessage()
### getOneMessage(getOneMessageOptions?)

_getOneMessageOptions_: [`GetOneMessageOptions`](#getonemessageoptions)\
_Returns_: [`Promise<Message>`](ipc.md#message-type)

Receive a single `message` from the parent process.
Expand All @@ -113,6 +114,18 @@ This requires the [`ipc`](#optionsipc) option to be `true`. The [type](ipc.md#me

[More info.](ipc.md#exchanging-messages)

#### getOneMessageOptions

_Type_: `object`

#### getOneMessageOptions.filter

_Type_: [`(Message) => boolean`](ipc.md#message-type)

Ignore any `message` that returns `false`.

[More info.](ipc.md#filter-messages)

### getEachMessage()

_Returns_: [`AsyncIterable<Message>`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols)
Expand Down Expand Up @@ -259,8 +272,9 @@ This requires the [`ipc`](#optionsipc) option to be `true`. The [type](ipc.md#me

[More info.](ipc.md#exchanging-messages)

### subprocess.getOneMessage()
### subprocess.getOneMessage(getOneMessageOptions?)

_getOneMessageOptions_: [`GetOneMessageOptions`](#getonemessageoptions)\
_Returns_: [`Promise<Message>`](ipc.md#message-type)

Receive a single `message` from the subprocess.
Expand Down Expand Up @@ -900,7 +914,7 @@ By default, this applies to both `stdout` and `stderr`, but [different values ca
_Type:_ `boolean`\
_Default:_ `true` if either the [`node`](#optionsnode) option or the [`ipcInput`](#optionsipcinput) is set, `false` otherwise

Enables exchanging messages with the subprocess using [`subprocess.sendMessage(message)`](#subprocesssendmessagemessage), [`subprocess.getOneMessage()`](#subprocessgetonemessage) and [`subprocess.getEachMessage()`](#subprocessgeteachmessage).
Enables exchanging messages with the subprocess using [`subprocess.sendMessage(message)`](#subprocesssendmessagemessage), [`subprocess.getOneMessage()`](#subprocessgetonemessagegetonemessageoptions) and [`subprocess.getEachMessage()`](#subprocessgeteachmessage).

The subprocess must be a Node.js file.

Expand Down
2 changes: 1 addition & 1 deletion docs/execution.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ Synchronous execution is generally discouraged as it holds the CPU and prevents
- Signal termination: [`subprocess.kill()`](api.md#subprocesskillerror), [`subprocess.pid`](api.md#subprocesspid), [`cleanup`](api.md#optionscleanup) option, [`cancelSignal`](api.md#optionscancelsignal) option, [`forceKillAfterDelay`](api.md#optionsforcekillafterdelay) option.
- Piping multiple subprocesses: [`subprocess.pipe()`](api.md#subprocesspipefile-arguments-options).
- [`subprocess.iterable()`](lines.md#progressive-splitting).
- [IPC](ipc.md): [`sendMessage()`](api.md#sendmessagemessage), [`getOneMessage()`](api.md#getonemessage), [`getEachMessage()`](api.md#geteachmessage), [`result.ipcOutput`](output.md#any-output-type), [`ipc`](api.md#optionsipc) option, [`serialization`](api.md#optionsserialization) option, [`ipcInput`](input.md#any-input-type) option.
- [IPC](ipc.md): [`sendMessage()`](api.md#sendmessagemessage), [`getOneMessage()`](api.md#getonemessagegetonemessageoptions), [`getEachMessage()`](api.md#geteachmessage), [`result.ipcOutput`](output.md#any-output-type), [`ipc`](api.md#optionsipc) option, [`serialization`](api.md#optionsserialization) option, [`ipcInput`](input.md#any-input-type) option.
- [`result.all`](api.md#resultall) is not interleaved.
- [`detached`](api.md#optionsdetached) option.
- The [`maxBuffer`](api.md#optionsmaxbuffer) option is always measured in bytes, not in characters, [lines](api.md#optionslines) nor [objects](transform.md#object-mode). Also, it ignores transforms and the [`encoding`](api.md#optionsencoding) option.
Expand Down
2 changes: 1 addition & 1 deletion docs/input.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ await execa({stdin: 'inherit'})`npm run scaffold`;
## Any input type
If the subprocess [uses Node.js](node.md), [almost any type](ipc.md#message-type) can be passed to the subprocess using the [`ipcInput`](ipc.md#send-an-initial-message) option. The subprocess retrieves that input using [`getOneMessage()`](api.md#getonemessage).
If the subprocess [uses Node.js](node.md), [almost any type](ipc.md#message-type) can be passed to the subprocess using the [`ipcInput`](ipc.md#send-an-initial-message) option. The subprocess retrieves that input using [`getOneMessage()`](api.md#getonemessagegetonemessageoptions).
```js
// main.js
Expand Down
28 changes: 25 additions & 3 deletions docs/ipc.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ When the [`ipc`](api.md#optionsipc) option is `true`, the current process and su

The `ipc` option defaults to `true` when using [`execaNode()`](node.md#run-nodejs-files) or the [`node`](node.md#run-nodejs-files) option.

The current process sends messages with [`subprocess.sendMessage(message)`](api.md#subprocesssendmessagemessage) and receives them with [`subprocess.getOneMessage()`](api.md#subprocessgetonemessage). The subprocess uses [`sendMessage(message)`](api.md#sendmessagemessage) and [`getOneMessage()`](api.md#getonemessage) instead.
The current process sends messages with [`subprocess.sendMessage(message)`](api.md#subprocesssendmessagemessage) and receives them with [`subprocess.getOneMessage()`](api.md#subprocessgetonemessagegetonemessageoptions). The subprocess uses [`sendMessage(message)`](api.md#sendmessagemessage) and [`getOneMessage()`](api.md#getonemessagegetonemessageoptions) instead.

```js
// parent.js
Expand All @@ -33,7 +33,7 @@ console.log(await getOneMessage()); // 'Hello from parent'

## Listening to messages

[`subprocess.getOneMessage()`](api.md#subprocessgetonemessage) and [`getOneMessage()`](api.md#getonemessage) read a single message. To listen to multiple messages in a row, [`subprocess.getEachMessage()`](api.md#subprocessgeteachmessage) and [`getEachMessage()`](api.md#geteachmessage) should be used instead.
[`subprocess.getOneMessage()`](api.md#subprocessgetonemessagegetonemessageoptions) and [`getOneMessage()`](api.md#getonemessagegetonemessageoptions) read a single message. To listen to multiple messages in a row, [`subprocess.getEachMessage()`](api.md#subprocessgeteachmessage) and [`getEachMessage()`](api.md#geteachmessage) should be used instead.

[`subprocess.getEachMessage()`](api.md#subprocessgeteachmessage) waits for the subprocess to end (even when using [`break`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/break) or [`return`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/return)). It throws if the subprocess [fails](api.md#result). This means you do not need to `await` the subprocess' [promise](execution.md#result).

Expand Down Expand Up @@ -67,9 +67,29 @@ for await (const message of getEachMessage()) {
}
```

## Filter messages

```js
import {getOneMessage} from 'execa';

const startMessage = await getOneMessage({
filter: message => message.type === 'start',
});
```

```js
import {getEachMessage} from 'execa';

for await (const message of getEachMessage()) {
if (message.type === 'start') {
// ...
}
}
```

## Retrieve all messages

The [`result.ipcOutput`](api.md#resultipcoutput) array contains all the messages sent by the subprocess. In many situations, this is simpler than using [`subprocess.getOneMessage()`](api.md#subprocessgetonemessage) and [`subprocess.getEachMessage()`](api.md#subprocessgeteachmessage).
The [`result.ipcOutput`](api.md#resultipcoutput) array contains all the messages sent by the subprocess. In many situations, this is simpler than using [`subprocess.getOneMessage()`](api.md#subprocessgetonemessagegetonemessageoptions) and [`subprocess.getEachMessage()`](api.md#subprocessgeteachmessage).

```js
// main.js
Expand Down Expand Up @@ -118,6 +138,8 @@ By default, messages are serialized using [`structuredClone()`](https://develope
To limit messages to JSON instead, the [`serialization`](api.md#optionsserialization) option can be set to `'json'`.

```js
import {execaNode} from 'execa';

const subprocess = execaNode({serialization: 'json'})`child.js`;
```

Expand Down
31 changes: 24 additions & 7 deletions lib/ipc/get-one.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {once} from 'node:events';
import {once, on} from 'node:events';
import {
validateIpcOption,
validateConnection,
Expand All @@ -7,27 +7,31 @@ import {
} from './validation.js';

// Like `[sub]process.once('message')` but promise-based
export const getOneMessage = ({anyProcess, isSubprocess, ipc}) => {
export const getOneMessage = ({anyProcess, isSubprocess, ipc}, {filter} = {}) => {
const methodName = 'getOneMessage';
validateIpcOption(methodName, isSubprocess, ipc);
validateConnection(methodName, isSubprocess, anyProcess.channel !== null);

return onceMessage(anyProcess, isSubprocess, methodName);
return onceMessage({
anyProcess,
isSubprocess,
methodName,
filter,
});
};

const onceMessage = async (anyProcess, isSubprocess, methodName) => {
const onceMessage = async ({anyProcess, isSubprocess, methodName, filter}) => {
const controller = new AbortController();
try {
const [message] = await Promise.race([
once(anyProcess, 'message', {signal: controller.signal}),
return await Promise.race([
getMessage(anyProcess, filter, controller),
throwOnDisconnect({
anyProcess,
isSubprocess,
methodName,
controller,
}),
]);
return message;
} catch (error) {
disconnect(anyProcess);
throw error;
Expand All @@ -36,6 +40,19 @@ const onceMessage = async (anyProcess, isSubprocess, methodName) => {
}
};

const getMessage = async (anyProcess, filter, {signal}) => {
if (filter === undefined) {
const [message] = await once(anyProcess, 'message', {signal});
return message;
}

for await (const [message] of on(anyProcess, 'message', {signal})) {
if (filter(message)) {
return message;
}
}
};

const throwOnDisconnect = async ({anyProcess, isSubprocess, methodName, controller: {signal}}) => {
await once(anyProcess, 'disconnect', {signal});
throwOnEarlyDisconnect(methodName, isSubprocess);
Expand Down
11 changes: 5 additions & 6 deletions test-d/ipc/get-each.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,6 @@ import {
type Options,
} from '../../index.js';

for await (const message of getEachMessage()) {
expectType<Message>(message);
}

expectError(getEachMessage(''));

const subprocess = execa('test', {ipc: true});

for await (const message of subprocess.getEachMessage()) {
Expand All @@ -22,7 +16,12 @@ for await (const message of execa('test', {ipc: true, serialization: 'json'}).ge
expectType<Message<'json'>>(message);
}

for await (const message of getEachMessage()) {
expectType<Message>(message);
}

expectError(subprocess.getEachMessage(''));
expectError(getEachMessage(''));

execa('test', {ipcInput: ''}).getEachMessage();
execa('test', {ipcInput: '' as Message}).getEachMessage();
Expand Down
37 changes: 32 additions & 5 deletions test-d/ipc/get-one.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import {
type Options,
} from '../../index.js';

expectType<Promise<Message>>(getOneMessage());
expectError(await getOneMessage(''));

const subprocess = execa('test', {ipc: true});
expectType<Message<'advanced'>>(await subprocess.getOneMessage());
expectType<Message<'json'>>(await execa('test', {ipc: true, serialization: 'json'}).getOneMessage());
expectType<Promise<Message<'advanced'>>>(subprocess.getOneMessage());
const jsonSubprocess = execa('test', {ipc: true, serialization: 'json'});
expectType<Promise<Message<'json'>>>(jsonSubprocess.getOneMessage());
expectType<Promise<Message>>(getOneMessage());

expectError(await subprocess.getOneMessage(''));
expectError(await getOneMessage(''));

await execa('test', {ipcInput: ''}).getOneMessage();
await execa('test', {ipcInput: '' as Message}).getOneMessage();
Expand All @@ -26,3 +26,30 @@ expectType<undefined>(execa('test', {}).getOneMessage);
expectType<undefined>(execa('test', {ipc: false}).getOneMessage);
expectType<undefined>(execa('test', {ipcInput: undefined}).getOneMessage);
expectType<undefined>(execa('test', {ipc: false, ipcInput: ''}).getOneMessage);

await subprocess.getOneMessage({filter: undefined} as const);
await subprocess.getOneMessage({filter: (message: Message<'advanced'>) => true} as const);
await jsonSubprocess.getOneMessage({filter: (message: Message<'json'>) => true} as const);
await jsonSubprocess.getOneMessage({filter: (message: Message<'advanced'>) => true} as const);
await subprocess.getOneMessage({filter: (message: Message<'advanced'> | bigint) => true} as const);
await subprocess.getOneMessage({filter: () => true} as const);
expectError(await subprocess.getOneMessage({filter: (message: Message<'advanced'>) => ''} as const));
// eslint-disable-next-line @typescript-eslint/no-empty-function
expectError(await subprocess.getOneMessage({filter(message: Message<'advanced'>) {}} as const));
expectError(await subprocess.getOneMessage({filter: (message: Message<'json'>) => true} as const));
expectError(await subprocess.getOneMessage({filter: (message: '') => true} as const));
expectError(await subprocess.getOneMessage({filter: true} as const));
expectError(await subprocess.getOneMessage({unknownOption: true} as const));

await getOneMessage({filter: undefined} as const);
await getOneMessage({filter: (message: Message) => true} as const);
await getOneMessage({filter: (message: Message<'advanced'>) => true} as const);
await getOneMessage({filter: (message: Message | bigint) => true} as const);
await getOneMessage({filter: () => true} as const);
expectError(await getOneMessage({filter: (message: Message) => ''} as const));
// eslint-disable-next-line @typescript-eslint/no-empty-function
expectError(await getOneMessage({filter(message: Message) {}} as const));
expectError(await getOneMessage({filter: (message: Message<'json'>) => true} as const));
expectError(await getOneMessage({filter: (message: '') => true} as const));
expectError(await getOneMessage({filter: true} as const));
expectError(await getOneMessage({unknownOption: true} as const));
11 changes: 6 additions & 5 deletions test-d/ipc/send.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@ import {
type Options,
} from '../../index.js';

const subprocess = execa('test', {ipc: true});
expectType<void>(await subprocess.sendMessage(''));
expectType<Promise<void>>(sendMessage(''));

expectError(await subprocess.sendMessage());
expectError(await sendMessage());
expectError(await subprocess.sendMessage(undefined));
expectError(await sendMessage(undefined));
expectError(await subprocess.sendMessage(0n));
expectError(await sendMessage(0n));
expectError(await subprocess.sendMessage(Symbol('test')));
expectError(await sendMessage(Symbol('test')));

const subprocess = execa('test', {ipc: true});
expectType<void>(await subprocess.sendMessage(''));

expectError(await subprocess.sendMessage());

await execa('test', {ipcInput: ''}).sendMessage('');
await execa('test', {ipcInput: '' as Message}).sendMessage('');
await execa('test', {} as Options).sendMessage?.('');
Expand Down
5 changes: 5 additions & 0 deletions test/fixtures/ipc-echo-filter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/usr/bin/env node
import {sendMessage, getOneMessage} from '../../index.js';
import {foobarArray} from '../helpers/input.js';

await sendMessage(await getOneMessage(({filter: message => message === foobarArray[1]})));
8 changes: 8 additions & 0 deletions test/fixtures/ipc-echo-twice-filter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/usr/bin/env node
import {sendMessage, getOneMessage} from '../../index.js';
import {alwaysPass} from '../helpers/ipc.js';

const message = await getOneMessage({filter: alwaysPass});
const secondMessagePromise = getOneMessage({filter: alwaysPass});
await sendMessage(message);
await sendMessage(await secondMessagePromise);
11 changes: 11 additions & 0 deletions test/fixtures/ipc-process-error-filter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/usr/bin/env node
import process from 'node:process';
import {getOneMessage} from '../../index.js';
import {foobarString} from '../helpers/input.js';
import {alwaysPass} from '../helpers/ipc.js';

const cause = new Error(foobarString);
await Promise.all([
getOneMessage({filter: alwaysPass}),
process.emit('error', cause),
]);
2 changes: 2 additions & 0 deletions test/helpers/ipc.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ export const iterateAllMessages = async subprocess => {

return messages;
};

export const alwaysPass = () => true;
11 changes: 11 additions & 0 deletions test/ipc/buffer-messages.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,14 @@ test('Sets empty error.ipcOutput, sync', t => {
const {ipcOutput} = t.throws(() => execaSync('fail.js'));
t.deepEqual(ipcOutput, []);
});

test('"error" event interrupts result.ipcOutput', async t => {
const subprocess = execa('ipc-echo-twice.js', {ipcInput: foobarString});
t.is(await subprocess.getOneMessage(), foobarString);

const cause = new Error(foobarString);
subprocess.emit('error', cause);
const error = await t.throwsAsync(subprocess);
t.is(error.cause, cause);
t.deepEqual(error.ipcOutput, [foobarString]);
});
Loading

0 comments on commit 571e052

Please sign in to comment.