Skip to content

Commit

Permalink
Improve buffer: false behavior with IPC (#1092)
Browse files Browse the repository at this point in the history
  • Loading branch information
ehmicky committed May 24, 2024
1 parent 1bda998 commit 76d637a
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 15 deletions.
7 changes: 7 additions & 0 deletions lib/ipc/buffer-messages.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {checkIpcMaxBuffer} from '../io/max-buffer.js';
import {shouldLogIpc, logIpcOutput} from '../verbose/ipc.js';
import {loopOnMessages} from './get-each.js';
import {waitForDisconnect} from './forward.js';

// Iterate through IPC messages sent by the subprocess
export const waitForIpcOutput = async ({
Expand All @@ -20,6 +21,7 @@ export const waitForIpcOutput = async ({
const maxBuffer = maxBufferArray.at(-1);

if (!isVerbose && !buffer) {
await waitForDisconnect(subprocess);
return ipcOutput;
}

Expand All @@ -42,3 +44,8 @@ export const waitForIpcOutput = async ({

return ipcOutput;
};

export const getBufferedIpcOutput = async (ipcOutputPromise, ipcOutput) => {
await Promise.allSettled([ipcOutputPromise]);
return ipcOutput;
};
19 changes: 18 additions & 1 deletion lib/ipc/forward.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {EventEmitter} from 'node:events';
import {EventEmitter, once} from 'node:events';
import {onMessage, onDisconnect} from './incoming.js';
import {undoAddedReferences} from './reference.js';

Expand Down Expand Up @@ -48,3 +48,20 @@ export const isConnected = anyProcess => {
? anyProcess.channel !== null
: ipcEmitter.connected;
};

// Wait for `disconnect` event, including debounced messages processing during disconnection.
// But does not set up message proxying.
export const waitForDisconnect = async subprocess => {
// Unlike `once()`, this does not stop on `error` events
await new Promise(resolve => {
subprocess.once('disconnect', resolve);
});

const ipcEmitter = IPC_EMITTERS.get(subprocess);
if (ipcEmitter === undefined || !ipcEmitter.connected) {
return;
}

// This never emits an `error` event
await once(ipcEmitter, 'disconnect');
};
21 changes: 11 additions & 10 deletions lib/resolve/wait-subprocess.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {throwOnTimeout} from '../terminate/timeout.js';
import {isStandardStream} from '../utils/standard-stream.js';
import {TRANSFORM_TYPES} from '../stdio/type.js';
import {getBufferedData} from '../io/contents.js';
import {waitForIpcOutput} from '../ipc/buffer-messages.js';
import {waitForIpcOutput, getBufferedIpcOutput} from '../ipc/buffer-messages.js';
import {sendIpcInput} from '../ipc/ipc-input.js';
import {waitForAllStream} from './all-async.js';
import {waitForStdioStreams} from './stdio.js';
Expand Down Expand Up @@ -61,6 +61,14 @@ export const waitForSubprocessResult = async ({
streamInfo,
});
const ipcOutput = [];
const ipcOutputPromise = waitForIpcOutput({
subprocess,
buffer,
maxBuffer,
ipc,
ipcOutput,
verboseInfo,
});
const originalPromises = waitForOriginalStreams(originalStreams, subprocess, streamInfo);
const customStreamsEndPromises = waitForCustomStreamsEnd(fileDescriptors, streamInfo);

Expand All @@ -71,14 +79,7 @@ export const waitForSubprocessResult = async ({
waitForSuccessfulExit(exitPromise),
Promise.all(stdioPromises),
allPromise,
waitForIpcOutput({
subprocess,
buffer,
maxBuffer,
ipc,
ipcOutput,
verboseInfo,
}),
ipcOutputPromise,
sendIpcInput(subprocess, ipcInput),
...originalPromises,
...customStreamsEndPromises,
Expand All @@ -93,7 +94,7 @@ export const waitForSubprocessResult = async ({
exitPromise,
Promise.all(stdioPromises.map(stdioPromise => getBufferedData(stdioPromise))),
getBufferedData(allPromise),
ipcOutput,
getBufferedIpcOutput(ipcOutputPromise, ipcOutput),
Promise.allSettled(originalPromises),
Promise.allSettled(customStreamsEndPromises),
]);
Expand Down
7 changes: 7 additions & 0 deletions test/ipc/buffer-messages.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ const testResultNoBuffer = async (t, options) => {
test('Sets empty result.ipcOutput if buffer is false', testResultNoBuffer, {buffer: false});
test('Sets empty result.ipcOutput if buffer is false, fd-specific buffer', testResultNoBuffer, {buffer: {ipc: false}});

test('Can use IPC methods when buffer is false', async t => {
const subprocess = execa('ipc-send.js', {ipc: true, buffer: false});
t.is(await subprocess.getOneMessage(), foobarString);
const {ipcOutput} = await subprocess;
t.deepEqual(ipcOutput, []);
});

test('Sets empty result.ipcOutput if ipc is false', async t => {
const {ipcOutput} = await execa('empty.js');
t.deepEqual(ipcOutput, []);
Expand Down
30 changes: 28 additions & 2 deletions test/ipc/get-each.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import {scheduler} from 'node:timers/promises';
import test from 'ava';
import {execa} from '../../index.js';
import {setFixtureDirectory} from '../helpers/fixtures-directory.js';
Expand Down Expand Up @@ -172,11 +173,11 @@ const testCleanupListeners = async (t, buffer) => {
const subprocess = execa('ipc-send.js', {ipc: true, buffer});

t.is(subprocess.listenerCount('message'), buffer ? 1 : 0);
t.is(subprocess.listenerCount('disconnect'), buffer ? 1 : 0);
t.is(subprocess.listenerCount('disconnect'), 1);

const promise = iterateAllMessages(subprocess);
t.is(subprocess.listenerCount('message'), 1);
t.is(subprocess.listenerCount('disconnect'), 1);
t.is(subprocess.listenerCount('disconnect'), buffer ? 1 : 2);
t.deepEqual(await promise, [foobarString]);

t.is(subprocess.listenerCount('message'), 0);
Expand All @@ -185,3 +186,28 @@ const testCleanupListeners = async (t, buffer) => {

test('Cleans up subprocess.getEachMessage() listeners, buffer false', testCleanupListeners, false);
test('Cleans up subprocess.getEachMessage() listeners, buffer true', testCleanupListeners, true);

const sendContinuousMessages = async subprocess => {
while (subprocess.connected) {
for (let index = 0; index < 10; index += 1) {
subprocess.emit('message', foobarString);
}

// eslint-disable-next-line no-await-in-loop
await scheduler.yield();
}
};

test.serial('Handles buffered messages when disconnecting', async t => {
const subprocess = execa('ipc-send-fail.js', {ipc: true, buffer: false});

const promise = subprocess.getOneMessage();
subprocess.emit('message', foobarString);
t.is(await promise, foobarString);
sendContinuousMessages(subprocess);

const {exitCode, isTerminated, ipcOutput} = await t.throwsAsync(iterateAllMessages(subprocess));
t.is(exitCode, 1);
t.false(isTerminated);
t.deepEqual(ipcOutput, []);
});
4 changes: 2 additions & 2 deletions test/ipc/get-one.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,11 @@ const testCleanupListeners = async (t, buffer, filter) => {
const subprocess = execa('ipc-send.js', {ipc: true, buffer});

t.is(subprocess.listenerCount('message'), buffer ? 1 : 0);
t.is(subprocess.listenerCount('disconnect'), buffer ? 1 : 0);
t.is(subprocess.listenerCount('disconnect'), 1);

const promise = subprocess.getOneMessage({filter});
t.is(subprocess.listenerCount('message'), 1);
t.is(subprocess.listenerCount('disconnect'), 1);
t.is(subprocess.listenerCount('disconnect'), buffer ? 1 : 2);

t.is(await promise, foobarString);
await subprocess;
Expand Down

0 comments on commit 76d637a

Please sign in to comment.