From d6ad15b0f88a05816c2fe034dd6900d28315d570 Mon Sep 17 00:00:00 2001 From: Paul Hawxby Date: Thu, 11 Aug 2022 13:29:23 +0100 Subject: [PATCH] fix: worker being killed after being spawned and other worker bugs (#13107) --- CHANGELOG.md | 2 +- .../workerRestarting.test.ts.snap | 9 + e2e/__tests__/workerRestarting.test.ts | 16 + e2e/worker-restarting/__tests__/test1.js | 10 + e2e/worker-restarting/__tests__/test2.js | 10 + e2e/worker-restarting/__tests__/test3.js | 10 + e2e/worker-restarting/package.json | 6 + packages/jest-worker/src/types.ts | 32 ++ .../src/workers/ChildProcessWorker.ts | 179 +++++--- .../src/workers/NodeThreadsWorker.ts | 72 ++-- .../jest-worker/src/workers/WorkerAbstract.ts | 145 +++++++ .../__tests__/ChildProcessWorker.test.js | 1 + .../workers/__tests__/WorkerEdgeCases.test.js | 406 +++++++++++------- .../__tests__/__fixtures__/EdgeCasesWorker.js | 2 +- 14 files changed, 644 insertions(+), 256 deletions(-) create mode 100644 e2e/__tests__/__snapshots__/workerRestarting.test.ts.snap create mode 100644 e2e/__tests__/workerRestarting.test.ts create mode 100644 e2e/worker-restarting/__tests__/test1.js create mode 100644 e2e/worker-restarting/__tests__/test2.js create mode 100644 e2e/worker-restarting/__tests__/test3.js create mode 100644 e2e/worker-restarting/package.json create mode 100644 packages/jest-worker/src/workers/WorkerAbstract.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index bc3a69b6e7cb..f5ea35996abb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ - `[jest-config]` [**BREAKING**] Make `snapshotFormat` default to `escapeString: false` and `printBasicPrototype: false` ([#13036](https://github.com/facebook/jest/pull/13036)) - `[jest-environment-jsdom]` [**BREAKING**] Upgrade to `jsdom@20` ([#13037](https://github.com/facebook/jest/pull/13037), [#13058](https://github.com/facebook/jest/pull/13058)) -- `[jest-worker]` Adds `workerIdleMemoryLimit` option which is used as a check for worker memory leaks >= Node 16.11.0 and 
recycles child workers as required. ([#13056](https://github.com/facebook/jest/pull/13056), [#13105](https://github.com/facebook/jest/pull/13105), [#13106](https://github.com/facebook/jest/pull/13106)) +- `[jest-worker]` Adds `workerIdleMemoryLimit` option which is used as a check for worker memory leaks >= Node 16.11.0 and recycles child workers as required. ([#13056](https://github.com/facebook/jest/pull/13056), [#13105](https://github.com/facebook/jest/pull/13105), [#13106](https://github.com/facebook/jest/pull/13106), [#13107](https://github.com/facebook/jest/pull/13107)) - `[pretty-format]` [**BREAKING**] Remove `ConvertAnsi` plugin in favour of `jest-serializer-ansi-escapes` ([#13040](https://github.com/facebook/jest/pull/13040)) ### Fixes diff --git a/e2e/__tests__/__snapshots__/workerRestarting.test.ts.snap b/e2e/__tests__/__snapshots__/workerRestarting.test.ts.snap new file mode 100644 index 000000000000..edf50a0f2d69 --- /dev/null +++ b/e2e/__tests__/__snapshots__/workerRestarting.test.ts.snap @@ -0,0 +1,9 @@ +// Jest Snapshot v1, https://goo.gl/fbAQLP + +exports[`all 3 test files should complete 1`] = ` +"Test Suites: 3 passed, 3 total +Tests: 3 passed, 3 total +Snapshots: 0 total +Time: <> +Ran all test suites." +`; diff --git a/e2e/__tests__/workerRestarting.test.ts b/e2e/__tests__/workerRestarting.test.ts new file mode 100644 index 000000000000..aa8cb425f77b --- /dev/null +++ b/e2e/__tests__/workerRestarting.test.ts @@ -0,0 +1,16 @@ +/** + * Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. 
+ */ + +import {extractSummary} from '../Utils'; +import runJest from '../runJest'; + +it('all 3 test files should complete', () => { + const result = runJest('worker-restarting'); + expect(result.exitCode).toBe(0); + const {summary} = extractSummary(result.stderr); + expect(summary).toMatchSnapshot(); +}); diff --git a/e2e/worker-restarting/__tests__/test1.js b/e2e/worker-restarting/__tests__/test1.js new file mode 100644 index 000000000000..3e394ec4ec61 --- /dev/null +++ b/e2e/worker-restarting/__tests__/test1.js @@ -0,0 +1,10 @@ +/** + * Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +test('basic test', () => { + expect(true).toBeTruthy(); +}); diff --git a/e2e/worker-restarting/__tests__/test2.js b/e2e/worker-restarting/__tests__/test2.js new file mode 100644 index 000000000000..3e394ec4ec61 --- /dev/null +++ b/e2e/worker-restarting/__tests__/test2.js @@ -0,0 +1,10 @@ +/** + * Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +test('basic test', () => { + expect(true).toBeTruthy(); +}); diff --git a/e2e/worker-restarting/__tests__/test3.js b/e2e/worker-restarting/__tests__/test3.js new file mode 100644 index 000000000000..3e394ec4ec61 --- /dev/null +++ b/e2e/worker-restarting/__tests__/test3.js @@ -0,0 +1,10 @@ +/** + * Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. 
+ */ + +test('basic test', () => { + expect(true).toBeTruthy(); +}); diff --git a/e2e/worker-restarting/package.json b/e2e/worker-restarting/package.json new file mode 100644 index 000000000000..3dc2bb57749f --- /dev/null +++ b/e2e/worker-restarting/package.json @@ -0,0 +1,6 @@ +{ + "jest": { + "maxWorkers": 2, + "workerIdleMemoryLimit": "1MB" + } +} diff --git a/packages/jest-worker/src/types.ts b/packages/jest-worker/src/types.ts index 302771e10b0a..a301c6b994a0 100644 --- a/packages/jest-worker/src/types.ts +++ b/packages/jest-worker/src/types.ts @@ -66,12 +66,15 @@ export interface WorkerPoolInterface { } export interface WorkerInterface { + get state(): WorkerStates; + send( request: ChildMessage, onProcessStart: OnStart, onProcessEnd: OnEnd, onCustomMessage: OnCustomMessage, ): void; + waitForExit(): Promise; forceExit(): void; @@ -83,6 +86,18 @@ export interface WorkerInterface { */ getWorkerSystemId(): number; getMemoryUsage(): Promise; + /** + * Checks to see if the child worker is actually running. + */ + isWorkerRunning(): boolean; + /** + * When the worker child is started and ready to start handling requests. + * + * @remarks + * This mostly exists to help with testing so that you don't check the status + * of things like isWorkerRunning before it actually is. + */ + waitForWorkerReady(): Promise; } export type PoolExitResult = { @@ -170,8 +185,21 @@ export type WorkerOptions = { * the raw output of the worker. */ silent?: boolean; + /** + * Used to immediately bind event handlers. + */ + on?: { + [WorkerEvents.STATE_CHANGE]: + | OnStateChangeHandler + | ReadonlyArray; + }; }; +export type OnStateChangeHandler = ( + state: WorkerStates, + oldState: WorkerStates, +) => void; + // Messages passed from the parent to the children. 
export type MessagePort = typeof EventEmitter & { @@ -265,3 +293,7 @@ export enum WorkerStates { SHUTTING_DOWN = 'shutting-down', SHUT_DOWN = 'shut-down', } + +export enum WorkerEvents { + STATE_CHANGE = 'state-change', +} diff --git a/packages/jest-worker/src/workers/ChildProcessWorker.ts b/packages/jest-worker/src/workers/ChildProcessWorker.ts index 4ef4d51b87cc..9706a0235f56 100644 --- a/packages/jest-worker/src/workers/ChildProcessWorker.ts +++ b/packages/jest-worker/src/workers/ChildProcessWorker.ts @@ -7,7 +7,6 @@ import {ChildProcess, ForkOptions, fork} from 'child_process'; import {totalmem} from 'os'; -import {PassThrough} from 'stream'; import mergeStream = require('merge-stream'); import {stdout as stdoutSupportsColor} from 'supports-color'; import { @@ -27,13 +26,14 @@ import { WorkerOptions, WorkerStates, } from '../types'; +import WorkerAbstract from './WorkerAbstract'; const SIGNAL_BASE_EXIT_CODE = 128; const SIGKILL_EXIT_CODE = SIGNAL_BASE_EXIT_CODE + 9; const SIGTERM_EXIT_CODE = SIGNAL_BASE_EXIT_CODE + 15; // How long to wait after SIGTERM before sending SIGKILL -const SIGKILL_DELAY = 500; +export const SIGKILL_DELAY = 500; /** * This class wraps the child process and provides a nice interface to @@ -53,7 +53,10 @@ const SIGKILL_DELAY = 500; * field is changed to "true", so that other workers which might encounter the * same call skip it. 
*/ -export default class ChildProcessWorker implements WorkerInterface { +export default class ChildProcessWorker + extends WorkerAbstract + implements WorkerInterface +{ private _child!: ChildProcess; private _options: WorkerOptions; @@ -62,12 +65,10 @@ export default class ChildProcessWorker implements WorkerInterface { private _onProcessEnd!: OnEnd; private _onCustomMessage!: OnCustomMessage; - private _fakeStream: PassThrough | null; private _stdout: ReturnType | null; private _stderr: ReturnType | null; - private _exitPromise: Promise; - private _resolveExitPromise!: () => void; + private _stderrBuffer: Array = []; private _memoryUsagePromise: Promise | undefined; private _resolveMemoryUsage: ((arg0: number) => void) | undefined; @@ -75,37 +76,33 @@ export default class ChildProcessWorker implements WorkerInterface { private _childIdleMemoryUsage: number | null; private _childIdleMemoryUsageLimit: number | null; private _memoryUsageCheck = false; - private _state: WorkerStates; private _childWorkerPath: string; constructor(options: WorkerOptions) { + super(options); + this._options = options; this._request = null; - this._fakeStream = null; this._stdout = null; this._stderr = null; this._childIdleMemoryUsage = null; this._childIdleMemoryUsageLimit = options.idleMemoryLimit || null; - this._exitPromise = new Promise(resolve => { - this._resolveExitPromise = resolve; - }); - this._childWorkerPath = options.childWorkerPath || require.resolve('./processChild'); - this._state = WorkerStates.STARTING; + this.state = WorkerStates.STARTING; this.initialize(); } initialize(): void { if ( - this._state === WorkerStates.OUT_OF_MEMORY || - this._state === WorkerStates.SHUTTING_DOWN || - this._state === WorkerStates.SHUT_DOWN + this.state === WorkerStates.OUT_OF_MEMORY || + this.state === WorkerStates.SHUTTING_DOWN || + this.state === WorkerStates.SHUT_DOWN ) { return; } @@ -114,9 +111,22 @@ export default class ChildProcessWorker implements WorkerInterface { 
this._child.kill('SIGKILL'); } - this._state = WorkerStates.STARTING; + this.state = WorkerStates.STARTING; const forceColor = stdoutSupportsColor ? {FORCE_COLOR: '1'} : {}; + const silent = this._options.silent ?? true; + + if (!silent) { + // NOTE: Detecting an out of memory crash is independent of idle memory usage monitoring. We want to + // monitor for a crash occurring so that it can be handled as required and so we can tell the difference + // between an OOM crash and another kind of crash. We need to do this because if a worker crashes due to + // an OOM event sometimes it isn't seen by the worker pool and it just sits there waiting for the worker + // to respond and it never will. + console.warn('Unable to detect out of memory event if silent === false'); + } + + this._stderrBuffer = []; + const options: ForkOptions = { cwd: process.cwd(), env: { @@ -128,46 +138,45 @@ export default class ChildProcessWorker implements WorkerInterface { execArgv: process.execArgv.filter(v => !/^--(debug|inspect)/.test(v)), // default to advanced serialization in order to match worker threads serialization: 'advanced', - silent: this._options.silent ?? 
true, + silent, ...this._options.forkOptions, }; - const child = fork(this._childWorkerPath, [], options); + this._child = fork(this._childWorkerPath, [], options); - if (child.stdout) { + if (this._child.stdout) { if (!this._stdout) { // We need to add a permanent stream to the merged stream to prevent it // from ending when the subprocess stream ends this._stdout = mergeStream(this._getFakeStream()); } - this._stdout.add(child.stdout); + this._stdout.add(this._child.stdout); } - if (child.stderr) { + if (this._child.stderr) { if (!this._stderr) { // We need to add a permanent stream to the merged stream to prevent it // from ending when the subprocess stream ends this._stderr = mergeStream(this._getFakeStream()); } - this._stderr.add(child.stderr); + this._stderr.add(this._child.stderr); + + this._child.stderr.on('data', this.stderrDataHandler.bind(this)); } - this._detectOutOfMemoryCrash(child); - child.on('message', this._onMessage.bind(this)); - child.on('exit', this._onExit.bind(this)); + this._child.on('message', this._onMessage.bind(this)); + this._child.on('exit', this._onExit.bind(this)); + this._child.on('disconnect', this._onDisconnect.bind(this)); - child.send([ + this._child.send([ CHILD_MESSAGE_INITIALIZE, false, this._options.workerPath, this._options.setupArgs, ]); - this._child = child; - this._state = WorkerStates.OK; - this._retries++; // If we exceeded the amount of retries, we will emulate an error reply @@ -189,44 +198,60 @@ export default class ChildProcessWorker implements WorkerInterface { // Clear the request so we don't keep executing it. 
this._request = null; } + + this.state = WorkerStates.OK; + if (this._resolveWorkerReady) { + this._resolveWorkerReady(); + } } - private _detectOutOfMemoryCrash(child: ChildProcess): void { - let stderrStr = ''; + private stderrDataHandler(chunk: any): void { + if (chunk) { + this._stderrBuffer.push(Buffer.from(chunk)); + } - const handler = (chunk: any) => { - if (this._state !== WorkerStates.OUT_OF_MEMORY) { - let str: string | undefined = undefined; + this._detectOutOfMemoryCrash(); - if (chunk instanceof Buffer) { - str = chunk.toString('utf8'); - } else if (typeof chunk === 'string') { - str = chunk; - } + if (this.state === WorkerStates.OUT_OF_MEMORY) { + this._workerReadyPromise = undefined; + this._resolveWorkerReady = undefined; - if (str) { - stderrStr += str; - } + this.killChild(); + this._shutdown(); + } + } - if (stderrStr.includes('heap out of memory')) { - this._state = WorkerStates.OUT_OF_MEMORY; + private _detectOutOfMemoryCrash(): void { + try { + const bufferStr = Buffer.concat(this._stderrBuffer).toString('utf8'); + + if ( + bufferStr.includes('heap out of memory') || + bufferStr.includes('allocation failure;') || + bufferStr.includes('Last few GCs') + ) { + if ( + this.state === WorkerStates.OK || + this.state === WorkerStates.STARTING + ) { + this.state = WorkerStates.OUT_OF_MEMORY; } } - }; - - child.stderr?.on('data', handler); + } catch (err) { + console.error('Error looking for out of memory crash', err); + } } - private _shutdown() { - this._state = WorkerStates.SHUTTING_DOWN; + private _onDisconnect() { + this._workerReadyPromise = undefined; + this._resolveWorkerReady = undefined; - // End the temporary streams so the merged streams end too - if (this._fakeStream) { - this._fakeStream.end(); - this._fakeStream = null; - } + this._detectOutOfMemoryCrash(); - this._resolveExitPromise(); + if (this.state === WorkerStates.OUT_OF_MEMORY) { + this.killChild(); + this._shutdown(); + } } private _onMessage(response: ParentMessage) { @@ -311,7 
+336,7 @@ export default class ChildProcessWorker implements WorkerInterface { this._childIdleMemoryUsage && this._childIdleMemoryUsage > limit ) { - this._state = WorkerStates.RESTARTING; + this.state = WorkerStates.RESTARTING; this.killChild(); } @@ -319,7 +344,12 @@ export default class ChildProcessWorker implements WorkerInterface { } private _onExit(exitCode: number | null) { - if (exitCode !== 0 && this._state === WorkerStates.OUT_OF_MEMORY) { + this._workerReadyPromise = undefined; + this._resolveWorkerReady = undefined; + + this._detectOutOfMemoryCrash(); + + if (exitCode !== 0 && this.state === WorkerStates.OUT_OF_MEMORY) { this._onProcessEnd( new Error('Jest worker ran out of memory and crashed'), null, @@ -331,9 +361,11 @@ export default class ChildProcessWorker implements WorkerInterface { exitCode !== null && exitCode !== SIGTERM_EXIT_CODE && exitCode !== SIGKILL_EXIT_CODE && - this._state !== WorkerStates.SHUTTING_DOWN) || - this._state === WorkerStates.RESTARTING + this.state !== WorkerStates.SHUTTING_DOWN) || + this.state === WorkerStates.RESTARTING ) { + this.state = WorkerStates.RESTARTING; + this.initialize(); if (this._request) { @@ -350,14 +382,22 @@ export default class ChildProcessWorker implements WorkerInterface { onProcessEnd: OnEnd, onCustomMessage: OnCustomMessage, ): void { + this._stderrBuffer = []; + onProcessStart(this); this._onProcessEnd = (...args) => { + const hasRequest = !!this._request; + // Clean the request to avoid sending past requests to workers that fail // while waiting for a new request (timers, unhandled rejections...) 
this._request = null; - if (this._childIdleMemoryUsageLimit && this._child.connected) { + if ( + this._childIdleMemoryUsageLimit && + this._child.connected && + hasRequest + ) { this.checkMemoryUsage(); } @@ -377,12 +417,16 @@ export default class ChildProcessWorker implements WorkerInterface { } killChild(): NodeJS.Timeout { - this._child.kill('SIGTERM'); - return setTimeout(() => this._child.kill('SIGKILL'), SIGKILL_DELAY); + // We store a reference so that there's no way we can accidentally + // kill a new worker that has been spawned. + const childToKill = this._child; + + childToKill.kill('SIGTERM'); + return setTimeout(() => childToKill.kill('SIGKILL'), SIGKILL_DELAY); } forceExit(): void { - this._state = WorkerStates.SHUTTING_DOWN; + this.state = WorkerStates.SHUTTING_DOWN; const sigkillTimeout = this.killChild(); this._exitPromise.then(() => clearTimeout(sigkillTimeout)); @@ -466,10 +510,7 @@ export default class ChildProcessWorker implements WorkerInterface { } } - private _getFakeStream() { - if (!this._fakeStream) { - this._fakeStream = new PassThrough(); - } - return this._fakeStream; + isWorkerRunning(): boolean { + return this._child.connected && !this._child.killed; } } diff --git a/packages/jest-worker/src/workers/NodeThreadsWorker.ts b/packages/jest-worker/src/workers/NodeThreadsWorker.ts index ad1bb822cd56..8735404ed243 100644 --- a/packages/jest-worker/src/workers/NodeThreadsWorker.ts +++ b/packages/jest-worker/src/workers/NodeThreadsWorker.ts @@ -6,7 +6,6 @@ */ import {totalmem} from 'os'; -import {PassThrough} from 'stream'; import {Worker} from 'worker_threads'; import mergeStream = require('merge-stream'); import { @@ -26,8 +25,12 @@ import { WorkerOptions, WorkerStates, } from '../types'; +import WorkerAbstract from './WorkerAbstract'; -export default class ExperimentalWorker implements WorkerInterface { +export default class ExperimentalWorker + extends WorkerAbstract + implements WorkerInterface +{ private _worker!: Worker; private 
_options: WorkerOptions; @@ -36,13 +39,9 @@ export default class ExperimentalWorker implements WorkerInterface { private _onProcessEnd!: OnEnd; private _onCustomMessage!: OnCustomMessage; - private _fakeStream: PassThrough | null; private _stdout: ReturnType | null; private _stderr: ReturnType | null; - private _exitPromise: Promise; - private _resolveExitPromise!: () => void; - private _memoryUsagePromise: Promise | undefined; private _resolveMemoryUsage: ((arg0: number) => void) | undefined; @@ -51,14 +50,14 @@ export default class ExperimentalWorker implements WorkerInterface { private _childIdleMemoryUsage: number | null; private _childIdleMemoryUsageLimit: number | null; private _memoryUsageCheck = false; - private _state: WorkerStates; constructor(options: WorkerOptions) { + super(options); + this._options = options; this._request = null; - this._fakeStream = null; this._stdout = null; this._stderr = null; @@ -68,19 +67,14 @@ export default class ExperimentalWorker implements WorkerInterface { this._childIdleMemoryUsage = null; this._childIdleMemoryUsageLimit = options.idleMemoryLimit || null; - this._exitPromise = new Promise(resolve => { - this._resolveExitPromise = resolve; - }); - - this._state = WorkerStates.STARTING; this.initialize(); } initialize(): void { if ( - this._state === WorkerStates.OUT_OF_MEMORY || - this._state === WorkerStates.SHUTTING_DOWN || - this._state === WorkerStates.SHUT_DOWN + this.state === WorkerStates.OUT_OF_MEMORY || + this.state === WorkerStates.SHUTTING_DOWN || + this.state === WorkerStates.SHUT_DOWN ) { return; } @@ -89,7 +83,7 @@ export default class ExperimentalWorker implements WorkerInterface { this._worker.terminate(); } - this._state = WorkerStates.STARTING; + this.state = WorkerStates.STARTING; this._worker = new Worker(this._childWorkerPath, { eval: false, @@ -157,21 +151,21 @@ export default class ExperimentalWorker implements WorkerInterface { {type: 'WorkerError'}, ]); } - } - private _shutdown() { - // End the 
permanent stream so the merged stream end too - if (this._fakeStream) { - this._fakeStream.end(); - this._fakeStream = null; + this.state = WorkerStates.OK; + if (this._resolveWorkerReady) { + this._resolveWorkerReady(); } - - this._resolveExitPromise(); } private _onError(error: Error) { if (error.message.includes('heap out of memory')) { - this._state = WorkerStates.OUT_OF_MEMORY; + this.state = WorkerStates.OUT_OF_MEMORY; + + // Threads don't behave like processes, they don't crash when they run out of + // memory. But for consistency we want them to behave like processes so we call + // terminate to simulate a crash happening that was not planned + this._worker.terminate(); } } @@ -238,7 +232,10 @@ export default class ExperimentalWorker implements WorkerInterface { } private _onExit(exitCode: number) { - if (exitCode !== 0 && this._state === WorkerStates.OUT_OF_MEMORY) { + this._workerReadyPromise = undefined; + this._resolveWorkerReady = undefined; + + if (exitCode !== 0 && this.state === WorkerStates.OUT_OF_MEMORY) { this._onProcessEnd( new Error('Jest worker ran out of memory and crashed'), null, @@ -247,9 +244,9 @@ export default class ExperimentalWorker implements WorkerInterface { this._shutdown(); } else if ( (exitCode !== 0 && - this._state !== WorkerStates.SHUTTING_DOWN && - this._state !== WorkerStates.SHUT_DOWN) || - this._state === WorkerStates.RESTARTING + this.state !== WorkerStates.SHUTTING_DOWN && + this.state !== WorkerStates.SHUT_DOWN) || + this.state === WorkerStates.RESTARTING ) { this.initialize(); @@ -266,7 +263,7 @@ export default class ExperimentalWorker implements WorkerInterface { } forceExit(): void { - this._state = WorkerStates.SHUTTING_DOWN; + this.state = WorkerStates.SHUTTING_DOWN; this._worker.terminate(); } @@ -278,11 +275,13 @@ export default class ExperimentalWorker implements WorkerInterface { ): void { onProcessStart(this); this._onProcessEnd = (...args) => { + const hasRequest = !!this._request; + // Clean the request to 
avoid sending past requests to workers that fail // while waiting for a new request (timers, unhandled rejections...) this._request = null; - if (this._childIdleMemoryUsageLimit) { + if (this._childIdleMemoryUsageLimit && hasRequest) { this.checkMemoryUsage(); } @@ -335,7 +334,7 @@ export default class ExperimentalWorker implements WorkerInterface { this._childIdleMemoryUsage && this._childIdleMemoryUsage > limit ) { - this._state = WorkerStates.RESTARTING; + this.state = WorkerStates.RESTARTING; this._worker.terminate(); } @@ -404,10 +403,7 @@ export default class ExperimentalWorker implements WorkerInterface { return this._worker.threadId; } - private _getFakeStream() { - if (!this._fakeStream) { - this._fakeStream = new PassThrough(); - } - return this._fakeStream; + isWorkerRunning(): boolean { + return this._worker.threadId >= 0; } } diff --git a/packages/jest-worker/src/workers/WorkerAbstract.ts b/packages/jest-worker/src/workers/WorkerAbstract.ts new file mode 100644 index 000000000000..957660990b4f --- /dev/null +++ b/packages/jest-worker/src/workers/WorkerAbstract.ts @@ -0,0 +1,145 @@ +/** + * Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +import {EventEmitter, PassThrough} from 'stream'; +import { + WorkerEvents, + WorkerInterface, + WorkerOptions, + WorkerStates, +} from '../types'; + +export default abstract class WorkerAbstract + extends EventEmitter + implements Pick +{ + /** + * DO NOT WRITE TO THIS DIRECTLY. + * Use this.state getter/setters so events are emitted correctly. 
+ */ + #state = WorkerStates.STARTING; + + protected _fakeStream: PassThrough | null = null; + + protected _exitPromise: Promise; + protected _resolveExitPromise!: () => void; + + protected _workerReadyPromise: Promise | undefined; + protected _resolveWorkerReady: (() => void) | undefined; + + public get state(): WorkerStates { + return this.#state; + } + protected set state(value: WorkerStates) { + if (this.#state !== value) { + const oldState = this.#state; + this.#state = value; + + this.emit(WorkerEvents.STATE_CHANGE, value, oldState); + } + } + + constructor(options: WorkerOptions) { + super(); + + if (typeof options.on === 'object') { + for (const [event, handlers] of Object.entries(options.on)) { + // Can't do Array.isArray on a ReadonlyArray. + // https://github.com/microsoft/TypeScript/issues/17002 + if (typeof handlers === 'function') { + super.on(event, handlers); + } else { + for (const handler of handlers) { + super.on(event, handler); + } + } + } + } + + this._exitPromise = new Promise(resolve => { + this._resolveExitPromise = resolve; + }); + this._exitPromise.then(() => { + this.state = WorkerStates.SHUT_DOWN; + }); + } + + /** + * Wait for the worker child process to be ready to handle requests. + * + * @returns Promise which resolves when ready. 
+ */ + public waitForWorkerReady(): Promise { + if (!this._workerReadyPromise) { + this._workerReadyPromise = new Promise((resolve, reject) => { + let settled = false; + let to: NodeJS.Timeout | undefined; + + switch (this.state) { + case WorkerStates.OUT_OF_MEMORY: + case WorkerStates.SHUTTING_DOWN: + case WorkerStates.SHUT_DOWN: + settled = true; + reject( + new Error( + `Worker state means it will never be ready: ${this.state}`, + ), + ); + break; + case WorkerStates.STARTING: + case WorkerStates.RESTARTING: + this._resolveWorkerReady = () => { + settled = true; + resolve(); + + if (to) { + clearTimeout(to); + } + }; + break; + case WorkerStates.OK: + settled = true; + resolve(); + break; + } + + if (!settled) { + to = setTimeout(() => { + if (!settled) { + reject(new Error('Timeout starting worker')); + } + }, 500); + } + }); + } + + return this._workerReadyPromise; + } + + /** + * Used to shut down the current working instance once the children have been + * killed off. Marks the worker as SHUT_DOWN (emitting a state-change event), ends the fake stream and resolves the exit promise. + */ + protected _shutdown(): void { + this.state = WorkerStates.SHUT_DOWN; + + // End the permanent stream so the merged stream end too + if (this._fakeStream) { + this._fakeStream.end(); + this._fakeStream = null; + } + + this._resolveExitPromise(); + } + + protected _getFakeStream(): PassThrough { + if (!this._fakeStream) { + this._fakeStream = new PassThrough(); + } + return this._fakeStream; + } +} diff --git a/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js b/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js index 0948ea82be57..bce2d97a3bea 100644 --- a/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js +++ b/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js @@ -430,6 +430,7 @@ it('when out of memory occurs the worker is killed and exits', async () => { 1: 0x10da153a5 node::Abort() (.cold.1) [/Users/paul/.nvm/versions/node/v16.10.0/bin/node] 2: 0x10c6f09b9 node::Abort() 
[/Users/paul/.nvm/versions/node/v16.10.0/bin/node]`, ); + forkInterface.stderr.emit('end'); forkInterface.emit('exit', null, 'SIGABRT'); diff --git a/packages/jest-worker/src/workers/__tests__/WorkerEdgeCases.test.js b/packages/jest-worker/src/workers/__tests__/WorkerEdgeCases.test.js index 8005048f48f4..0b68ea8755a6 100644 --- a/packages/jest-worker/src/workers/__tests__/WorkerEdgeCases.test.js +++ b/packages/jest-worker/src/workers/__tests__/WorkerEdgeCases.test.js @@ -11,15 +11,15 @@ import {transformFileAsync} from '@babel/core'; import { CHILD_MESSAGE_CALL, CHILD_MESSAGE_MEM_USAGE, + WorkerEvents, WorkerInterface, WorkerOptions, + WorkerStates, } from '../../types'; -import ChildProcessWorker from '../ChildProcessWorker'; +import ChildProcessWorker, {SIGKILL_DELAY} from '../ChildProcessWorker'; import ThreadsWorker from '../NodeThreadsWorker'; -// These tests appear to be slow/flaky. Allowing it to retry quite a few times -// will cut down on this noise and they're fast tests anyway. 
-jest.retryTimes(30); +jest.setTimeout(10000); const root = join('../../'); const filesToBuild = ['workers/processChild', 'workers/threadChild', 'types']; @@ -57,6 +57,15 @@ test.each(filesToBuild)('%s.js should exist', async file => { await expect(async () => await access(path)).not.toThrowError(); }); +async function closeWorkerAfter(worker, testBody) { + try { + await testBody(worker); + } finally { + worker.forceExit(); + await worker.waitForExit(); + } +} + describe.each([ { name: 'ProcessWorker', @@ -69,16 +78,9 @@ describe.each([ workerPath: threadChildWorkerPath, }, ])('$name', ({workerClass, workerPath}) => { - /** @type WorkerInterface */ - let worker; let int; afterEach(async () => { - if (worker) { - worker.forceExit(); - await worker.waitForExit(); - } - clearInterval(int); }); @@ -106,174 +108,284 @@ describe.each([ } test('should get memory usage', async () => { - worker = new workerClass({ - childWorkerPath: workerPath, - maxRetries: 0, - workerPath: join(__dirname, '__fixtures__', 'EdgeCasesWorker'), - }); - - const memoryUsagePromise = worker.getMemoryUsage(); - expect(memoryUsagePromise).toBeInstanceOf(Promise); - - expect(await memoryUsagePromise).toBeGreaterThan(0); + await closeWorkerAfter( + new workerClass({ + childWorkerPath: workerPath, + maxRetries: 0, + workerPath: join(__dirname, '__fixtures__', 'EdgeCasesWorker'), + }), + async worker => { + const memoryUsagePromise = worker.getMemoryUsage(); + expect(memoryUsagePromise).toBeInstanceOf(Promise); + + expect(await memoryUsagePromise).toBeGreaterThan(0); + }, + ); }); test('should recycle on idle limit breach', async () => { - worker = new workerClass({ - childWorkerPath: workerPath, - // There is no way this is fitting into 1000 bytes, so it should restart - // after requesting a memory usage update - idleMemoryLimit: 1000, - maxRetries: 0, - workerPath: join(__dirname, '__fixtures__', 'EdgeCasesWorker'), - }); + await closeWorkerAfter( + new workerClass({ + childWorkerPath: workerPath, 
+ // There is no way this is fitting into 1000 bytes, so it should restart + // after requesting a memory usage update + idleMemoryLimit: 1000, + maxRetries: 0, + workerPath: join(__dirname, '__fixtures__', 'EdgeCasesWorker'), + }), + async worker => { + const startSystemId = worker.getWorkerSystemId(); + expect(startSystemId).toBeGreaterThanOrEqual(0); + + worker.checkMemoryUsage(); + + await waitForChange(() => worker.getWorkerSystemId()); + + const systemId = worker.getWorkerSystemId(); + expect(systemId).toBeGreaterThanOrEqual(0); + expect(systemId).not.toEqual(startSystemId); + + await new Promise(resolve => { + setTimeout(resolve, SIGKILL_DELAY + 100); + }); - const startSystemId = worker.getWorkerSystemId(); - expect(startSystemId).toBeGreaterThanOrEqual(0); + expect(worker.isWorkerRunning()).toBeTruthy(); + }, + ); + }); - worker.checkMemoryUsage(); + describe('should automatically recycle on idle limit breach', () => { + let startPid; + let worker; + const orderOfEvents = []; + + beforeAll(() => { + worker = new workerClass({ + childWorkerPath: workerPath, + // There is no way this is fitting into 1000 bytes, so it should restart + // after requesting a memory usage update + idleMemoryLimit: 1000, + maxRetries: 0, + on: { + [WorkerEvents.STATE_CHANGE]: state => { + orderOfEvents.push(state); + }, + }, + silent: true, + workerPath: join(__dirname, '__fixtures__', 'EdgeCasesWorker'), + }); + }); - await waitForChange(() => worker.getWorkerSystemId()); + afterAll(async () => { + if (worker) { + worker.forceExit(); + await worker.waitForExit(); + } + }); - const systemId = worker.getWorkerSystemId(); - expect(systemId).toBeGreaterThanOrEqual(0); - expect(systemId).not.toEqual(startSystemId); - }); + test('initial state', async () => { + startPid = worker.getWorkerSystemId(); + expect(startPid).toBeGreaterThanOrEqual(0); + expect(worker.state).toEqual(WorkerStates.OK); - test('should automatically recycle on idle limit breach', async () => { - worker = new 
workerClass({ - childWorkerPath: workerPath, - // There is no way this is fitting into 1000 bytes, so it should restart - // after requesting a memory usage update - idleMemoryLimit: 1000, - maxRetries: 0, - workerPath: join(__dirname, '__fixtures__', 'EdgeCasesWorker'), + expect(orderOfEvents).toMatchObject(['ok']); }); - const startPid = worker.getWorkerSystemId(); - expect(startPid).toBeGreaterThanOrEqual(0); + test('new worker starts', async () => { + const onStart = jest.fn(); + const onEnd = jest.fn(); + const onCustom = jest.fn(); + + worker.send( + [CHILD_MESSAGE_CALL, true, 'safeFunction', []], + onStart, + onEnd, + onCustom, + ); + + await waitForChange(() => worker.getWorkerSystemId()); + + const endPid = worker.getWorkerSystemId(); + expect(endPid).toBeGreaterThanOrEqual(0); + expect(endPid).not.toEqual(startPid); + expect(worker.isWorkerRunning()).toBeTruthy(); + expect(worker.state).toEqual(WorkerStates.OK); + }); - const onStart = jest.fn(); - const onEnd = jest.fn(); - const onCustom = jest.fn(); + test( + 'worker continues to run after kill delay', + async () => { + await new Promise(resolve => { + setTimeout(resolve, SIGKILL_DELAY + 100); + }); - worker.send( - [CHILD_MESSAGE_CALL, true, 'safeFunction', []], - onStart, - onEnd, - onCustom, + expect(worker.state).toEqual(WorkerStates.OK); + expect(worker.isWorkerRunning()).toBeTruthy(); + }, + SIGKILL_DELAY * 3, ); - await waitForChange(() => worker.getWorkerSystemId()); - - const endPid = worker.getWorkerSystemId(); - expect(endPid).toBeGreaterThanOrEqual(0); - expect(endPid).not.toEqual(startPid); - }, 10000); - - test('should cleanly exit on crash', async () => { - const workerHeapLimit = 10; - - /** @type WorkerOptions */ - const options = { - childWorkerPath: workerPath, - maxRetries: 0, - silent: true, - workerPath: join(__dirname, '__fixtures__', 'EdgeCasesWorker'), - }; - - if (workerClass === ThreadsWorker) { - options.resourceLimits = { - codeRangeSizeMb: workerHeapLimit * 2, - 
maxOldGenerationSizeMb: workerHeapLimit, - maxYoungGenerationSizeMb: workerHeapLimit * 2, - stackSizeMb: workerHeapLimit * 2, - }; - } else if (workerClass === ChildProcessWorker) { - options.forkOptions = { - // Forcibly set the heap limit so we can crash the process easily. - execArgv: [`--max-old-space-size=${workerHeapLimit}`], + test('expected state order', () => { + expect(orderOfEvents).toMatchObject([ + 'ok', + 'restarting', + 'starting', + 'ok', + ]); + }); + }); + + describe('should cleanly exit on out of memory crash', () => { + const workerHeapLimit = 50; + + let worker; + let orderOfEvents = []; + + beforeAll(() => { + orderOfEvents = []; + + /** @type WorkerOptions */ + const options = { + childWorkerPath: workerPath, + maxRetries: 0, + on: { + [WorkerEvents.STATE_CHANGE]: state => { + orderOfEvents.push(state); + }, + }, + silent: true, + workerPath: join(__dirname, '__fixtures__', 'EdgeCasesWorker'), }; - } - worker = new workerClass(options); + if (workerClass === ThreadsWorker) { + options.resourceLimits = { + codeRangeSizeMb: workerHeapLimit * 2, + maxOldGenerationSizeMb: workerHeapLimit, + maxYoungGenerationSizeMb: workerHeapLimit * 2, + stackSizeMb: workerHeapLimit * 2, + }; + } else if (workerClass === ChildProcessWorker) { + options.forkOptions = { + // Forcibly set the heap limit so we can crash the process easily. 
+ execArgv: [`--max-old-space-size=${workerHeapLimit}`], + }; + } - const pid = worker.getWorkerSystemId(); - expect(pid).toBeGreaterThanOrEqual(0); + worker = new workerClass(options); + }); - const onStart = jest.fn(); - const onEnd = jest.fn(); - const onCustom = jest.fn(); + afterAll(async () => { + await new Promise(resolve => { + setTimeout(async () => { + if (worker) { + worker.forceExit(); + await worker.waitForExit(); + } - worker.send( - [CHILD_MESSAGE_CALL, true, 'leakMemory', []], - onStart, - onEnd, - onCustom, - ); + resolve(); + }, 500); + }); + }); - await worker.waitForExit(); - }, 15000); + test('starting state', async () => { + const startPid = worker.getWorkerSystemId(); + expect(startPid).toBeGreaterThanOrEqual(0); + }); - test('should handle regular fatal crashes', async () => { - worker = new workerClass({ - childWorkerPath: workerPath, - maxRetries: 4, - workerPath: join(__dirname, '__fixtures__', 'EdgeCasesWorker'), + test('worker ready', async () => { + await worker.waitForWorkerReady(); + expect(worker.state).toEqual(WorkerStates.OK); }); - const startPid = worker.getWorkerSystemId(); - expect(startPid).toBeGreaterThanOrEqual(0); + test('worker crashes and exits', async () => { + const onStart = jest.fn(); + const onEnd = jest.fn(); + const onCustom = jest.fn(); - const onStart = jest.fn(); - const onEnd = jest.fn(); - const onCustom = jest.fn(); + worker.send( + [CHILD_MESSAGE_CALL, true, 'leakMemory', []], + onStart, + onEnd, + onCustom, + ); - worker.send( - [CHILD_MESSAGE_CALL, true, 'fatalExitCode', []], - onStart, - onEnd, - onCustom, - ); + await worker.waitForExit(); - let pidChanges = 0; - - while (true) { - // Ideally this would use Promise.any but it's not supported in Node 14 - // so doing this instead. Essentially what we're doing is looping and - // capturing the pid every time it changes. When it stops changing the - // timeout will be hit and we should be left with a collection of all - // the pids used by the worker. 
- const newPid = await new Promise(resolve => { - const resolved = false; - - const to = setTimeout(() => { - if (!resolved) { - this.resolved = true; - resolve(undefined); - } - }, 500); + expect(worker.state).not.toEqual(WorkerStates.OK); + }); - waitForChange(() => worker.getWorkerSystemId()).then(() => { - clearTimeout(to); + test('worker stays dead', async () => { + await expect( + async () => await worker.waitForWorkerReady(), + ).rejects.toThrowError(); + expect(worker.isWorkerRunning()).toBeFalsy(); + }); - if (!resolved) { - resolve(worker.getWorkerSystemId()); - } - }); + test('expected state order', () => { + expect(orderOfEvents).toMatchObject([ + WorkerStates.OK, + WorkerStates.OUT_OF_MEMORY, + WorkerStates.SHUT_DOWN, + ]); + }); + }, 15000); + + describe('should handle regular fatal crashes', () => { + let worker; + let startedWorkers = 0; + + beforeAll(() => { + worker = new workerClass({ + childWorkerPath: workerPath, + maxRetries: 4, + on: { + [WorkerEvents.STATE_CHANGE]: state => { + if (state === WorkerStates.OK) { + startedWorkers++; + } + }, + }, + workerPath: join(__dirname, '__fixtures__', 'EdgeCasesWorker'), }); + }); - if (typeof newPid === 'number') { - pidChanges++; - } else { - break; + afterAll(async () => { + if (worker) { + worker.forceExit(); + await worker.waitForExit(); } - } + }); - // Expect the pids to be retries + 1 because it is restarted - // one last time at the end ready for the next request. 
- expect(pidChanges).toEqual(5); + test('starting state', async () => { + const startPid = worker.getWorkerSystemId(); + expect(startPid).toBeGreaterThanOrEqual(0); + }); - worker.forceExit(); + test('processes restart', async () => { + const onStart = jest.fn(); + const onEnd = jest.fn(); + const onCustom = jest.fn(); + + worker.send( + [CHILD_MESSAGE_CALL, true, 'fatalExitCode', []], + onStart, + onEnd, + onCustom, + ); + + // Give it some time to restart some workers + await new Promise(resolve => setTimeout(resolve, 4000)); + + expect(startedWorkers).toEqual(6); + + expect(worker.isWorkerRunning()).toBeTruthy(); + expect(worker.state).toEqual(WorkerStates.OK); + }); + + test('processes exits', async () => { + worker.forceExit(); + + await expect(() => worker.waitForWorkerReady()).rejects.toThrowError(); + }); }); }); diff --git a/packages/jest-worker/src/workers/__tests__/__fixtures__/EdgeCasesWorker.js b/packages/jest-worker/src/workers/__tests__/__fixtures__/EdgeCasesWorker.js index 482fd7c33508..02ca3754ae26 100644 --- a/packages/jest-worker/src/workers/__tests__/__fixtures__/EdgeCasesWorker.js +++ b/packages/jest-worker/src/workers/__tests__/__fixtures__/EdgeCasesWorker.js @@ -19,7 +19,7 @@ async function leakMemory() { ).toFixed(2)}MB at start`, ); - let i = 0; + let i = Number.MAX_SAFE_INTEGER / 2; while (true) { i++;