diff --git a/CHANGELOG.md b/CHANGELOG.md index 9983aabc1980..a0aa903dd4ed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - `[expect]` Compare DOM nodes even if there are multiple Node classes ([#8064](https://github.com/facebook/jest/pull/8064)) - `[jest-worker]` `worker.getStdout()` can return `null` ([#8083](https://github.com/facebook/jest/pull/8083)) +- `[jest-worker]` Re-attach stdout and stderr from new processes/threads created after retries ([#8087](https://github.com/facebook/jest/pull/8087)) ### Chore & Maintenance diff --git a/packages/jest-worker/package.json b/packages/jest-worker/package.json index 83fb8cd2b7ae..f29b9756f903 100644 --- a/packages/jest-worker/package.json +++ b/packages/jest-worker/package.json @@ -17,6 +17,7 @@ "devDependencies": { "@types/merge-stream": "^1.1.2", "@types/supports-color": "^5.3.0", + "get-stream": "^4.1.0", "worker-farm": "^1.6.0" }, "engines": { diff --git a/packages/jest-worker/src/workers/ChildProcessWorker.ts b/packages/jest-worker/src/workers/ChildProcessWorker.ts index b0378ebac93d..2b540c762c7c 100644 --- a/packages/jest-worker/src/workers/ChildProcessWorker.ts +++ b/packages/jest-worker/src/workers/ChildProcessWorker.ts @@ -6,6 +6,8 @@ */ import childProcess, {ChildProcess} from 'child_process'; +import {PassThrough} from 'stream'; +import mergeStream from 'merge-stream'; import supportsColor from 'supports-color'; import { @@ -43,12 +45,18 @@ export default class ChildProcessWorker implements WorkerInterface { private _child!: ChildProcess; private _options: WorkerOptions; private _onProcessEnd!: OnEnd; + private _fakeStream: PassThrough | null; private _request: ChildMessage | null; private _retries!: number; + private _stderr: ReturnType | null; + private _stdout: ReturnType | null; constructor(options: WorkerOptions) { this._options = options; + this._fakeStream = null; this._request = null; + this._stderr = null; + this._stdout = null; this.initialize(); } @@ -68,6 +76,26 @@ export default class ChildProcessWorker implements WorkerInterface { ...this._options.forkOptions, }); + if (child.stdout) { + if (!this._stdout) { + // We need to add a permanent stream to the merged stream to prevent it + // from ending when the subprocess stream ends + this._stdout = mergeStream(this._getFakeStream()); + } + + this._stdout.add(child.stdout); + } + + if (child.stderr) { + if (!this._stderr) { + // We need to add a permanent stream to the merged stream to prevent it + // from ending when the subprocess stream ends + this._stderr = mergeStream(this._getFakeStream()); + } + + this._stderr.add(child.stderr); + } + child.on('message', this.onMessage.bind(this)); child.on('exit', this.onExit.bind(this)); @@ -79,6 +107,7 @@ export default class ChildProcessWorker implements WorkerInterface { ]); this._child = child; + this._retries++; // If we exceeded the amount of retries, we will emulate an error reply @@ -97,6 +126,14 @@ export default class ChildProcessWorker implements WorkerInterface { } } + private _shutdown() { + // End the temporary streams so the merged streams end too + if (this._fakeStream) { + this._fakeStream.end(); + this._fakeStream = null; + } + } + onMessage(response: ParentMessage) { let error; @@ -149,6 +186,8 @@ export default class ChildProcessWorker implements WorkerInterface { if (this._request) { this._child.send(this._request); } + } else { + this._shutdown(); } } @@ -170,11 +209,18 @@ export default class ChildProcessWorker implements WorkerInterface { return this._options.workerId; } - getStdout() { - return this._child.stdout; + getStdout(): NodeJS.ReadableStream | null { + return this._stdout; } - getStderr() { - return this._child.stderr; + getStderr(): NodeJS.ReadableStream | null { + return this._stderr; + } + + private _getFakeStream() { + if (!this._fakeStream) { + this._fakeStream = new PassThrough(); + } + return this._fakeStream; } } diff --git a/packages/jest-worker/src/workers/NodeThreadsWorker.ts b/packages/jest-worker/src/workers/NodeThreadsWorker.ts index 58011ba01b8e..45d748e40e8a 100644 --- a/packages/jest-worker/src/workers/NodeThreadsWorker.ts +++ b/packages/jest-worker/src/workers/NodeThreadsWorker.ts @@ -6,9 +6,11 @@ */ import path from 'path'; +import {PassThrough} from 'stream'; // ESLint doesn't know about this experimental module // eslint-disable-next-line import/no-unresolved import {Worker} from 'worker_threads'; +import mergeStream from 'merge-stream'; import { CHILD_MESSAGE_INITIALIZE, @@ -29,10 +31,17 @@ export default class ExperimentalWorker implements WorkerInterface { private _onProcessEnd!: OnEnd; private _request: ChildMessage | null; private _retries!: number; + private _stderr: ReturnType | null; + private _stdout: ReturnType | null; + private _fakeStream: PassThrough | null; constructor(options: WorkerOptions) { this._options = options; this._request = null; + this._stderr = null; + this._stdout = null; + this._fakeStream = null; + this.initialize(); } @@ -54,6 +63,26 @@ export default class ExperimentalWorker implements WorkerInterface { }, }); + if (this._worker.stdout) { + if (!this._stdout) { + // We need to add a permanent stream to the merged stream to prevent it + // from ending when the subprocess stream ends + this._stdout = mergeStream(this._getFakeStream()); + } + + this._stdout.add(this._worker.stdout); + } + + if (this._worker.stderr) { + if (!this._stderr) { + // We need to add a permanent stream to the merged stream to prevent it + // from ending when the subprocess stream ends + this._stderr = mergeStream(this._getFakeStream()); + } + + this._stderr.add(this._worker.stderr); + } + this._worker.on('message', this.onMessage.bind(this)); this._worker.on('exit', this.onExit.bind(this)); @@ -82,6 +111,14 @@ export default class ExperimentalWorker implements WorkerInterface { } } + private _shutdown() { + // End the permanent stream so the merged stream end too + if (this._fakeStream) { + this._fakeStream.end(); + this._fakeStream = null; + } + } + onMessage(response: ParentMessage) { let error; @@ -132,6 +169,8 @@ export default class ExperimentalWorker implements WorkerInterface { if (this._request) { this._worker.postMessage(this._request); } + } else { + this._shutdown(); } } @@ -154,11 +193,18 @@ export default class ExperimentalWorker implements WorkerInterface { return this._options.workerId; } - getStdout() { - return this._worker.stdout; + getStdout(): NodeJS.ReadableStream | null { + return this._stdout; } - getStderr() { - return this._worker.stderr; + getStderr(): NodeJS.ReadableStream | null { + return this._stderr; + } + + private _getFakeStream() { + if (!this._fakeStream) { + this._fakeStream = new PassThrough(); + } + return this._fakeStream; } } diff --git a/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js b/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js index 25ea5c85fe66..8b1e2fad4c35 100644 --- a/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js +++ b/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js @@ -9,6 +9,8 @@ import EventEmitter from 'events'; import supportsColor from 'supports-color'; +import getStream from 'get-stream'; +import {PassThrough} from 'stream'; import { CHILD_MESSAGE_CALL, @@ -30,8 +32,8 @@ beforeEach(() => { childProcess.fork.mockImplementation(() => { forkInterface = Object.assign(new EventEmitter(), { send: jest.fn(), - stderr: {}, - stdout: {}, + stderr: new PassThrough(), + stdout: new PassThrough(), }); return forkInterface; @@ -124,15 +126,25 @@ it('stops initializing the worker after the amount of retries is exceeded', () = expect(onProcessEnd.mock.calls[0][1]).toBe(null); }); -it('provides stdout and stderr fields from the child process', () => { +it('provides stdout and stderr from the child processes', async () => { const worker = new Worker({ forkOptions: {}, maxRetries: 3, workerPath: '/tmp/foo', }); - expect(worker.getStdout()).toBe(forkInterface.stdout); - expect(worker.getStderr()).toBe(forkInterface.stderr); + const stdout = worker.getStdout(); + const stderr = worker.getStderr(); + + forkInterface.stdout.end('Hello ', {encoding: 'utf8'}); + forkInterface.stderr.end('Jest ', {encoding: 'utf8'}); + forkInterface.emit('exit'); + forkInterface.stdout.end('World!', {encoding: 'utf8'}); + forkInterface.stderr.end('Workers!', {encoding: 'utf8'}); + forkInterface.emit('exit', 0); + + await expect(getStream(stdout)).resolves.toEqual('Hello World!'); + await expect(getStream(stderr)).resolves.toEqual('Jest Workers!'); }); it('sends the task to the child process', () => { diff --git a/packages/jest-worker/src/workers/__tests__/NodeThreadsWorker.test.js b/packages/jest-worker/src/workers/__tests__/NodeThreadsWorker.test.js index 96529d515460..998f12c1f808 100644 --- a/packages/jest-worker/src/workers/__tests__/NodeThreadsWorker.test.js +++ b/packages/jest-worker/src/workers/__tests__/NodeThreadsWorker.test.js @@ -9,6 +9,8 @@ /* eslint-disable no-new */ +import getStream from 'get-stream'; + import { CHILD_MESSAGE_CALL, CHILD_MESSAGE_INITIALIZE, @@ -24,10 +26,12 @@ beforeEach(() => { jest.mock('worker_threads', () => { const fakeClass = jest.fn(() => { const EventEmitter = require('events'); + const {PassThrough} = require('stream'); + const thread = new EventEmitter(); thread.postMessage = jest.fn(); - thread.stdout = 'stdout'; - thread.stderr = 'stderr'; + thread.stdout = new PassThrough(); + thread.stderr = new PassThrough(); return thread; }); @@ -134,15 +138,25 @@ it('stops initializing the worker after the amount of retries is exceeded', () = expect(onProcessEnd.mock.calls[0][1]).toBe(null); }); -it('provides stdout and stderr fields from the child process', () => { +it('provides stdout and stderr from the child processes', async () => { const worker = new Worker({ forkOptions: {}, maxRetries: 3, workerPath: '/tmp/foo', }); - expect(worker.getStdout()).toBe('stdout'); - expect(worker.getStderr()).toBe('stderr'); + const stdout = worker.getStdout(); + const stderr = worker.getStderr(); + + worker._worker.stdout.end('Hello ', {encoding: 'utf8'}); + worker._worker.stderr.end('Jest ', {encoding: 'utf8'}); + worker._worker.emit('exit'); + worker._worker.stdout.end('World!', {encoding: 'utf8'}); + worker._worker.stderr.end('Workers!', {encoding: 'utf8'}); + worker._worker.emit('exit', 0); + + await expect(getStream(stdout)).resolves.toEqual('Hello World!'); + await expect(getStream(stderr)).resolves.toEqual('Jest Workers!'); }); it('sends the task to the child process', () => {