From 44ce6ff05d3abb68a7e199d7684b1c6ab2975114 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rub=C3=A9n=20Norte?= Date: Fri, 8 Mar 2019 14:50:42 +0000 Subject: [PATCH 1/4] Reattach stdout and stderr from new processes after retries --- packages/jest-worker/package.json | 1 + .../src/workers/ChildProcessWorker.ts | 28 ++++++++++++++++--- .../src/workers/NodeThreadsWorker.ts | 28 ++++++++++++++++--- .../__tests__/ChildProcessWorker.test.js | 21 ++++++++++---- .../__tests__/NodeThreadsWorker.test.js | 23 +++++++++++---- 5 files changed, 83 insertions(+), 18 deletions(-) diff --git a/packages/jest-worker/package.json b/packages/jest-worker/package.json index 83fb8cd2b7ae..f29b9756f903 100644 --- a/packages/jest-worker/package.json +++ b/packages/jest-worker/package.json @@ -17,6 +17,7 @@ "devDependencies": { "@types/merge-stream": "^1.1.2", "@types/supports-color": "^5.3.0", + "get-stream": "^4.1.0", "worker-farm": "^1.6.0" }, "engines": { diff --git a/packages/jest-worker/src/workers/ChildProcessWorker.ts b/packages/jest-worker/src/workers/ChildProcessWorker.ts index b0378ebac93d..67df7a7e1329 100644 --- a/packages/jest-worker/src/workers/ChildProcessWorker.ts +++ b/packages/jest-worker/src/workers/ChildProcessWorker.ts @@ -6,6 +6,7 @@ */ import childProcess, {ChildProcess} from 'child_process'; +import mergeStream from 'merge-stream'; import supportsColor from 'supports-color'; import { @@ -45,10 +46,14 @@ export default class ChildProcessWorker implements WorkerInterface { private _onProcessEnd!: OnEnd; private _request: ChildMessage | null; private _retries!: number; + private _stderr: ReturnType | null; + private _stdout: ReturnType | null; constructor(options: WorkerOptions) { this._options = options; this._request = null; + this._stdout = null; + this._stderr = null; this.initialize(); } @@ -68,6 +73,20 @@ export default class ChildProcessWorker implements WorkerInterface { ...this._options.forkOptions, }); + if (child.stdout) { + if (!this._stdout) { + this._stdout = mergeStream(); + } + this._stdout.add(child.stdout); + } + + if (child.stderr) { + if (!this._stderr) { + this._stderr = mergeStream(); + } + this._stderr.add(child.stderr); + } + child.on('message', this.onMessage.bind(this)); child.on('exit', this.onExit.bind(this)); @@ -79,6 +98,7 @@ export default class ChildProcessWorker implements WorkerInterface { ]); this._child = child; + this._retries++; // If we exceeded the amount of retries, we will emulate an error reply @@ -170,11 +190,11 @@ export default class ChildProcessWorker implements WorkerInterface { return this._options.workerId; } - getStdout() { - return this._child.stdout; + getStdout(): NodeJS.ReadableStream | null { + return this._stdout; } - getStderr() { - return this._child.stderr; + getStderr(): NodeJS.ReadableStream | null { + return this._stderr; } } diff --git a/packages/jest-worker/src/workers/NodeThreadsWorker.ts b/packages/jest-worker/src/workers/NodeThreadsWorker.ts index 58011ba01b8e..7f8ab3838656 100644 --- a/packages/jest-worker/src/workers/NodeThreadsWorker.ts +++ b/packages/jest-worker/src/workers/NodeThreadsWorker.ts @@ -9,6 +9,7 @@ import path from 'path'; // ESLint doesn't know about this experimental module // eslint-disable-next-line import/no-unresolved import {Worker} from 'worker_threads'; +import mergeStream from 'merge-stream'; import { CHILD_MESSAGE_INITIALIZE, @@ -29,10 +30,15 @@ export default class ExperimentalWorker implements WorkerInterface { private _onProcessEnd!: OnEnd; private _request: ChildMessage | null; private _retries!: number; + private _stderr: ReturnType | null; + private _stdout: ReturnType | null; constructor(options: WorkerOptions) { this._options = options; this._request = null; + this._stdout = null; + this._stderr = null; + this.initialize(); } @@ -54,6 +60,20 @@ export default class ExperimentalWorker implements WorkerInterface { }, }); + if (this._worker.stdout) { + if (!this._stdout) { + this._stdout = mergeStream(); + } + this._stdout.add(this._worker.stdout); + } + + if (this._worker.stderr) { + if (!this._stderr) { + this._stderr = mergeStream(); + } + this._stderr.add(this._worker.stderr); + } + this._worker.on('message', this.onMessage.bind(this)); this._worker.on('exit', this.onExit.bind(this)); @@ -154,11 +174,11 @@ export default class ExperimentalWorker implements WorkerInterface { return this._options.workerId; } - getStdout() { - return this._worker.stdout; + getStdout(): NodeJS.ReadableStream | null { + return this._stdout; } - getStderr() { - return this._worker.stderr; + getStderr(): NodeJS.ReadableStream | null { + return this._stderr; } } diff --git a/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js b/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js index 25ea5c85fe66..afef3185f7df 100644 --- a/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js +++ b/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js @@ -9,6 +9,8 @@ import EventEmitter from 'events'; import supportsColor from 'supports-color'; +import getStream from 'get-stream'; +import {PassThrough} from 'stream'; import { CHILD_MESSAGE_CALL, @@ -30,8 +32,8 @@ beforeEach(() => { childProcess.fork.mockImplementation(() => { forkInterface = Object.assign(new EventEmitter(), { send: jest.fn(), - stderr: {}, - stdout: {}, + stderr: new PassThrough(), + stdout: new PassThrough(), }); return forkInterface; @@ -124,15 +126,24 @@ it('stops initializing the worker after the amount of retries is exceeded', () = expect(onProcessEnd.mock.calls[0][1]).toBe(null); }); -it('provides stdout and stderr fields from the child process', () => { +it('provides stdout and stderr from the child processes', async () => { const worker = new Worker({ forkOptions: {}, maxRetries: 3, workerPath: '/tmp/foo', }); - expect(worker.getStdout()).toBe(forkInterface.stdout); - expect(worker.getStderr()).toBe(forkInterface.stderr); + const stdout = worker.getStdout(); + const stderr = worker.getStderr(); + + forkInterface.stdout.end('Hello ', {encoding: 'utf8'}); + forkInterface.stderr.end('Jest ', {encoding: 'utf8'}); + forkInterface.emit('exit'); + forkInterface.stdout.end('World!', {encoding: 'utf8'}); + forkInterface.stderr.end('Workers!', {encoding: 'utf8'}); + + await expect(getStream(stdout)).resolves.toEqual('Hello World!'); + await expect(getStream(stderr)).resolves.toEqual('Jest Workers!'); }); it('sends the task to the child process', () => { diff --git a/packages/jest-worker/src/workers/__tests__/NodeThreadsWorker.test.js b/packages/jest-worker/src/workers/__tests__/NodeThreadsWorker.test.js index 96529d515460..fa380bce7541 100644 --- a/packages/jest-worker/src/workers/__tests__/NodeThreadsWorker.test.js +++ b/packages/jest-worker/src/workers/__tests__/NodeThreadsWorker.test.js @@ -9,6 +9,8 @@ /* eslint-disable no-new */ +import getStream from 'get-stream'; + import { CHILD_MESSAGE_CALL, CHILD_MESSAGE_INITIALIZE, @@ -24,10 +26,12 @@ beforeEach(() => { jest.mock('worker_threads', () => { const fakeClass = jest.fn(() => { const EventEmitter = require('events'); + const {PassThrough} = require('stream'); + const thread = new EventEmitter(); thread.postMessage = jest.fn(); - thread.stdout = 'stdout'; - thread.stderr = 'stderr'; + thread.stdout = new PassThrough(); + thread.stderr = new PassThrough(); return thread; }); @@ -134,15 +138,24 @@ it('stops initializing the worker after the amount of retries is exceeded', () = expect(onProcessEnd.mock.calls[0][1]).toBe(null); }); -it('provides stdout and stderr fields from the child process', () => { +it('provides stdout and stderr from the child processes', async () => { const worker = new Worker({ forkOptions: {}, maxRetries: 3, workerPath: '/tmp/foo', }); - expect(worker.getStdout()).toBe('stdout'); - expect(worker.getStderr()).toBe('stderr'); + const stdout = worker.getStdout(); + const stderr = worker.getStderr(); + + worker._worker.stdout.end('Hello ', {encoding: 'utf8'}); + worker._worker.stderr.end('Jest ', {encoding: 'utf8'}); + worker._worker.emit('exit'); + worker._worker.stdout.end('World!', {encoding: 'utf8'}); + worker._worker.stderr.end('Workers!', {encoding: 'utf8'}); + + await expect(getStream(stdout)).resolves.toEqual('Hello World!'); + await expect(getStream(stderr)).resolves.toEqual('Jest Workers!'); }); it('sends the task to the child process', () => { From 928927144a9860aa42a72fb9a6c2a61424c9abad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rub=C3=A9n=20Norte?= Date: Fri, 8 Mar 2019 14:55:32 +0000 Subject: [PATCH 2/4] Add changelog entry --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9983aabc1980..a0aa903dd4ed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - `[expect]` Compare DOM nodes even if there are multiple Node classes ([#8064](https://github.com/facebook/jest/pull/8064)) - `[jest-worker]` `worker.getStdout()` can return `null` ([#8083](https://github.com/facebook/jest/pull/8083)) +- `[jest-worker]` Re-attach stdout and stderr from new processes/threads created after retries ([#8087](https://github.com/facebook/jest/pull/8087)) ### Chore & Maintenance From 01332e8797786c9c12f5391662cc0de39df443e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rub=C3=A9n=20Norte?= Date: Fri, 8 Mar 2019 15:17:05 +0000 Subject: [PATCH 3/4] Add minor style corrections --- packages/jest-worker/src/workers/ChildProcessWorker.ts | 4 +++- packages/jest-worker/src/workers/NodeThreadsWorker.ts | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/jest-worker/src/workers/ChildProcessWorker.ts b/packages/jest-worker/src/workers/ChildProcessWorker.ts index 67df7a7e1329..46061e502c10 100644 --- a/packages/jest-worker/src/workers/ChildProcessWorker.ts +++ b/packages/jest-worker/src/workers/ChildProcessWorker.ts @@ -52,8 +52,8 @@ export default class ChildProcessWorker implements WorkerInterface { constructor(options: WorkerOptions) { this._options = options; this._request = null; - this._stdout = null; this._stderr = null; + this._stdout = null; this.initialize(); } @@ -77,6 +77,7 @@ export default class ChildProcessWorker implements WorkerInterface { if (!this._stdout) { this._stdout = mergeStream(); } + this._stdout.add(child.stdout); } @@ -84,6 +85,7 @@ export default class ChildProcessWorker implements WorkerInterface { if (!this._stderr) { this._stderr = mergeStream(); } + this._stderr.add(child.stderr); } diff --git a/packages/jest-worker/src/workers/NodeThreadsWorker.ts b/packages/jest-worker/src/workers/NodeThreadsWorker.ts index 7f8ab3838656..d87801a66ee9 100644 --- a/packages/jest-worker/src/workers/NodeThreadsWorker.ts +++ b/packages/jest-worker/src/workers/NodeThreadsWorker.ts @@ -36,8 +36,8 @@ export default class ExperimentalWorker implements WorkerInterface { constructor(options: WorkerOptions) { this._options = options; this._request = null; - this._stdout = null; this._stderr = null; + this._stdout = null; this.initialize(); } @@ -64,6 +64,7 @@ export default class ExperimentalWorker implements WorkerInterface { if (!this._stdout) { this._stdout = mergeStream(); } + this._stdout.add(this._worker.stdout); } @@ -71,6 +72,7 @@ export default class ExperimentalWorker implements WorkerInterface { if (!this._stderr) { this._stderr = mergeStream(); } + this._stderr.add(this._worker.stderr); } From fc4025b4b97d99b44b38916073d18c4ae325f47a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rub=C3=A9n=20Norte?= Date: Fri, 8 Mar 2019 16:24:33 +0000 Subject: [PATCH 4/4] Prevent worker stdout/stderr from ending when the first stream ends --- .../src/workers/ChildProcessWorker.ts | 28 +++++++++++++++++-- .../src/workers/NodeThreadsWorker.ts | 28 +++++++++++++++++-- .../__tests__/ChildProcessWorker.test.js | 1 + .../__tests__/NodeThreadsWorker.test.js | 1 + 4 files changed, 54 insertions(+), 4 deletions(-) diff --git a/packages/jest-worker/src/workers/ChildProcessWorker.ts b/packages/jest-worker/src/workers/ChildProcessWorker.ts index 46061e502c10..2b540c762c7c 100644 --- a/packages/jest-worker/src/workers/ChildProcessWorker.ts +++ b/packages/jest-worker/src/workers/ChildProcessWorker.ts @@ -6,6 +6,7 @@ */ import childProcess, {ChildProcess} from 'child_process'; +import {PassThrough} from 'stream'; import mergeStream from 'merge-stream'; import supportsColor from 'supports-color'; @@ -44,6 +45,7 @@ export default class ChildProcessWorker implements WorkerInterface { private _child!: ChildProcess; private _options: WorkerOptions; private _onProcessEnd!: OnEnd; + private _fakeStream: PassThrough | null; private _request: ChildMessage | null; private _retries!: number; private _stderr: ReturnType | null; @@ -51,6 +53,7 @@ export default class ChildProcessWorker implements WorkerInterface { constructor(options: WorkerOptions) { this._options = options; + this._fakeStream = null; this._request = null; this._stderr = null; this._stdout = null; @@ -75,7 +78,9 @@ export default class ChildProcessWorker implements WorkerInterface { if (child.stdout) { if (!this._stdout) { - this._stdout = mergeStream(); + // We need to add a permanent stream to the merged stream to prevent it + // from ending when the subprocess stream ends + this._stdout = mergeStream(this._getFakeStream()); } this._stdout.add(child.stdout); @@ -83,7 +88,9 @@ export default class ChildProcessWorker implements WorkerInterface { if (child.stderr) { if (!this._stderr) { - this._stderr = mergeStream(); + // We need to add a permanent stream to the merged stream to prevent it + // from ending when the subprocess stream ends + this._stderr = mergeStream(this._getFakeStream()); } this._stderr.add(child.stderr); @@ -119,6 +126,14 @@ export default class ChildProcessWorker implements WorkerInterface { } } + private _shutdown() { + // End the temporary streams so the merged streams end too + if (this._fakeStream) { + this._fakeStream.end(); + this._fakeStream = null; + } + } + onMessage(response: ParentMessage) { let error; @@ -171,6 +186,8 @@ export default class ChildProcessWorker implements WorkerInterface { if (this._request) { this._child.send(this._request); } + } else { + this._shutdown(); } } @@ -199,4 +216,11 @@ export default class ChildProcessWorker implements WorkerInterface { getStderr(): NodeJS.ReadableStream | null { return this._stderr; } + + private _getFakeStream() { + if (!this._fakeStream) { + this._fakeStream = new PassThrough(); + } + return this._fakeStream; + } } diff --git a/packages/jest-worker/src/workers/NodeThreadsWorker.ts b/packages/jest-worker/src/workers/NodeThreadsWorker.ts index d87801a66ee9..45d748e40e8a 100644 --- a/packages/jest-worker/src/workers/NodeThreadsWorker.ts +++ b/packages/jest-worker/src/workers/NodeThreadsWorker.ts @@ -6,6 +6,7 @@ */ import path from 'path'; +import {PassThrough} from 'stream'; // ESLint doesn't know about this experimental module // eslint-disable-next-line import/no-unresolved import {Worker} from 'worker_threads'; @@ -32,12 +33,14 @@ export default class ExperimentalWorker implements WorkerInterface { private _retries!: number; private _stderr: ReturnType | null; private _stdout: ReturnType | null; + private _fakeStream: PassThrough | null; constructor(options: WorkerOptions) { this._options = options; this._request = null; this._stderr = null; this._stdout = null; + this._fakeStream = null; this.initialize(); } @@ -62,7 +65,9 @@ export default class ExperimentalWorker implements WorkerInterface { if (this._worker.stdout) { if (!this._stdout) { - this._stdout = mergeStream(); + // We need to add a permanent stream to the merged stream to prevent it + // from ending when the subprocess stream ends + this._stdout = mergeStream(this._getFakeStream()); } this._stdout.add(this._worker.stdout); @@ -70,7 +75,9 @@ export default class ExperimentalWorker implements WorkerInterface { if (this._worker.stderr) { if (!this._stderr) { - this._stderr = mergeStream(); + // We need to add a permanent stream to the merged stream to prevent it + // from ending when the subprocess stream ends + this._stderr = mergeStream(this._getFakeStream()); } this._stderr.add(this._worker.stderr); @@ -104,6 +111,14 @@ export default class ExperimentalWorker implements WorkerInterface { } } + private _shutdown() { + // End the permanent stream so the merged stream end too + if (this._fakeStream) { + this._fakeStream.end(); + this._fakeStream = null; + } + } + onMessage(response: ParentMessage) { let error; @@ -154,6 +169,8 @@ export default class ExperimentalWorker implements WorkerInterface { if (this._request) { this._worker.postMessage(this._request); } + } else { + this._shutdown(); } } @@ -183,4 +200,11 @@ export default class ExperimentalWorker implements WorkerInterface { getStderr(): NodeJS.ReadableStream | null { return this._stderr; } + + private _getFakeStream() { + if (!this._fakeStream) { + this._fakeStream = new PassThrough(); + } + return this._fakeStream; + } } diff --git a/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js b/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js index afef3185f7df..8b1e2fad4c35 100644 --- a/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js +++ b/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js @@ -141,6 +141,7 @@ it('provides stdout and stderr from the child processes', async () => { forkInterface.emit('exit'); forkInterface.stdout.end('World!', {encoding: 'utf8'}); forkInterface.stderr.end('Workers!', {encoding: 'utf8'}); + forkInterface.emit('exit', 0); await expect(getStream(stdout)).resolves.toEqual('Hello World!'); await expect(getStream(stderr)).resolves.toEqual('Jest Workers!'); diff --git a/packages/jest-worker/src/workers/__tests__/NodeThreadsWorker.test.js b/packages/jest-worker/src/workers/__tests__/NodeThreadsWorker.test.js index fa380bce7541..998f12c1f808 100644 --- a/packages/jest-worker/src/workers/__tests__/NodeThreadsWorker.test.js +++ b/packages/jest-worker/src/workers/__tests__/NodeThreadsWorker.test.js @@ -153,6 +153,7 @@ it('provides stdout and stderr from the child processes', async () => { worker._worker.emit('exit'); worker._worker.stdout.end('World!', {encoding: 'utf8'}); worker._worker.stderr.end('Workers!', {encoding: 'utf8'}); + worker._worker.emit('exit', 0); await expect(getStream(stdout)).resolves.toEqual('Hello World!'); await expect(getStream(stderr)).resolves.toEqual('Jest Workers!');