diff --git a/lib/ipc/buffer-messages.js b/lib/ipc/buffer-messages.js index ac8bc3c986..6f0ee7bc26 100644 --- a/lib/ipc/buffer-messages.js +++ b/lib/ipc/buffer-messages.js @@ -1,6 +1,7 @@ import {checkIpcMaxBuffer} from '../io/max-buffer.js'; import {shouldLogIpc, logIpcOutput} from '../verbose/ipc.js'; import {loopOnMessages} from './get-each.js'; +import {waitForDisconnect} from './forward.js'; // Iterate through IPC messages sent by the subprocess export const waitForIpcOutput = async ({ @@ -20,6 +21,7 @@ export const waitForIpcOutput = async ({ const maxBuffer = maxBufferArray.at(-1); if (!isVerbose && !buffer) { + await waitForDisconnect(subprocess); return ipcOutput; } @@ -42,3 +44,8 @@ export const waitForIpcOutput = async ({ return ipcOutput; }; + +export const getBufferedIpcOutput = async (ipcOutputPromise, ipcOutput) => { + await Promise.allSettled([ipcOutputPromise]); + return ipcOutput; +}; diff --git a/lib/ipc/forward.js b/lib/ipc/forward.js index ee794b5d9e..d3abc0d7c1 100644 --- a/lib/ipc/forward.js +++ b/lib/ipc/forward.js @@ -1,4 +1,4 @@ -import {EventEmitter} from 'node:events'; +import {EventEmitter, once} from 'node:events'; import {onMessage, onDisconnect} from './incoming.js'; import {undoAddedReferences} from './reference.js'; @@ -48,3 +48,20 @@ export const isConnected = anyProcess => { ? anyProcess.channel !== null : ipcEmitter.connected; }; + +// Wait for `disconnect` event, including debounced messages processing during disconnection. +// But does not set up message proxying. +export const waitForDisconnect = async subprocess => { + // Unlike `once()`, this does not stop on `error` events + await new Promise(resolve => { + subprocess.once('disconnect', resolve); + }); + + const ipcEmitter = IPC_EMITTERS.get(subprocess); + if (ipcEmitter === undefined || !ipcEmitter.connected) { + return; + } + + // This never emits an `error` event + await once(ipcEmitter, 'disconnect'); +}; diff --git a/lib/resolve/wait-subprocess.js b/lib/resolve/wait-subprocess.js index 6d1eefc9af..12a274e4e5 100644 --- a/lib/resolve/wait-subprocess.js +++ b/lib/resolve/wait-subprocess.js @@ -4,7 +4,7 @@ import {throwOnTimeout} from '../terminate/timeout.js'; import {isStandardStream} from '../utils/standard-stream.js'; import {TRANSFORM_TYPES} from '../stdio/type.js'; import {getBufferedData} from '../io/contents.js'; -import {waitForIpcOutput} from '../ipc/buffer-messages.js'; +import {waitForIpcOutput, getBufferedIpcOutput} from '../ipc/buffer-messages.js'; import {sendIpcInput} from '../ipc/ipc-input.js'; import {waitForAllStream} from './all-async.js'; import {waitForStdioStreams} from './stdio.js'; @@ -61,6 +61,14 @@ export const waitForSubprocessResult = async ({ streamInfo, }); const ipcOutput = []; + const ipcOutputPromise = waitForIpcOutput({ + subprocess, + buffer, + maxBuffer, + ipc, + ipcOutput, + verboseInfo, + }); const originalPromises = waitForOriginalStreams(originalStreams, subprocess, streamInfo); const customStreamsEndPromises = waitForCustomStreamsEnd(fileDescriptors, streamInfo); @@ -71,14 +79,7 @@ export const waitForSubprocessResult = async ({ waitForSuccessfulExit(exitPromise), Promise.all(stdioPromises), allPromise, - waitForIpcOutput({ - subprocess, - buffer, - maxBuffer, - ipc, - ipcOutput, - verboseInfo, - }), + ipcOutputPromise, sendIpcInput(subprocess, ipcInput), ...originalPromises, ...customStreamsEndPromises, @@ -93,7 +94,7 @@ export const waitForSubprocessResult = async ({ exitPromise, Promise.all(stdioPromises.map(stdioPromise => getBufferedData(stdioPromise))), getBufferedData(allPromise), - ipcOutput, + getBufferedIpcOutput(ipcOutputPromise, ipcOutput), Promise.allSettled(originalPromises), Promise.allSettled(customStreamsEndPromises), ]); diff --git a/test/ipc/buffer-messages.js b/test/ipc/buffer-messages.js index 30c3c60b0c..fb22120573 100644 --- a/test/ipc/buffer-messages.js +++ b/test/ipc/buffer-messages.js @@ -22,6 +22,13 @@ const testResultNoBuffer = async (t, options) => { test('Sets empty result.ipcOutput if buffer is false', testResultNoBuffer, {buffer: false}); test('Sets empty result.ipcOutput if buffer is false, fd-specific buffer', testResultNoBuffer, {buffer: {ipc: false}}); +test('Can use IPC methods when buffer is false', async t => { + const subprocess = execa('ipc-send.js', {ipc: true, buffer: false}); + t.is(await subprocess.getOneMessage(), foobarString); + const {ipcOutput} = await subprocess; + t.deepEqual(ipcOutput, []); +}); + test('Sets empty result.ipcOutput if ipc is false', async t => { const {ipcOutput} = await execa('empty.js'); t.deepEqual(ipcOutput, []); diff --git a/test/ipc/get-each.js b/test/ipc/get-each.js index 21db99fc3b..aac2e2a06a 100644 --- a/test/ipc/get-each.js +++ b/test/ipc/get-each.js @@ -1,3 +1,4 @@ +import {scheduler} from 'node:timers/promises'; import test from 'ava'; import {execa} from '../../index.js'; import {setFixtureDirectory} from '../helpers/fixtures-directory.js'; @@ -172,11 +173,11 @@ const testCleanupListeners = async (t, buffer) => { const subprocess = execa('ipc-send.js', {ipc: true, buffer}); t.is(subprocess.listenerCount('message'), buffer ? 1 : 0); - t.is(subprocess.listenerCount('disconnect'), buffer ? 1 : 0); + t.is(subprocess.listenerCount('disconnect'), 1); const promise = iterateAllMessages(subprocess); t.is(subprocess.listenerCount('message'), 1); - t.is(subprocess.listenerCount('disconnect'), 1); + t.is(subprocess.listenerCount('disconnect'), buffer ? 1 : 2); t.deepEqual(await promise, [foobarString]); t.is(subprocess.listenerCount('message'), 0); @@ -185,3 +186,28 @@ const testCleanupListeners = async (t, buffer) => { test('Cleans up subprocess.getEachMessage() listeners, buffer false', testCleanupListeners, false); test('Cleans up subprocess.getEachMessage() listeners, buffer true', testCleanupListeners, true); + +const sendContinuousMessages = async subprocess => { + while (subprocess.connected) { + for (let index = 0; index < 10; index += 1) { + subprocess.emit('message', foobarString); + } + + // eslint-disable-next-line no-await-in-loop + await scheduler.yield(); + } +}; + +test.serial('Handles buffered messages when disconnecting', async t => { + const subprocess = execa('ipc-send-fail.js', {ipc: true, buffer: false}); + + const promise = subprocess.getOneMessage(); + subprocess.emit('message', foobarString); + t.is(await promise, foobarString); + sendContinuousMessages(subprocess); + + const {exitCode, isTerminated, ipcOutput} = await t.throwsAsync(iterateAllMessages(subprocess)); + t.is(exitCode, 1); + t.false(isTerminated); + t.deepEqual(ipcOutput, []); +}); diff --git a/test/ipc/get-one.js b/test/ipc/get-one.js index fd809a5c0d..1da69ac58d 100644 --- a/test/ipc/get-one.js +++ b/test/ipc/get-one.js @@ -114,11 +114,11 @@ const testCleanupListeners = async (t, buffer, filter) => { const subprocess = execa('ipc-send.js', {ipc: true, buffer}); t.is(subprocess.listenerCount('message'), buffer ? 1 : 0); - t.is(subprocess.listenerCount('disconnect'), buffer ? 1 : 0); + t.is(subprocess.listenerCount('disconnect'), 1); const promise = subprocess.getOneMessage({filter}); t.is(subprocess.listenerCount('message'), 1); - t.is(subprocess.listenerCount('disconnect'), 1); + t.is(subprocess.listenerCount('disconnect'), buffer ? 1 : 2); t.is(await promise, foobarString); await subprocess;