From 68b37fa78dcd6961b432ed1b529295ccf7ea025b Mon Sep 17 00:00:00 2001 From: Rob Cameron Date: Wed, 21 Aug 2024 17:23:29 -0700 Subject: [PATCH] Adds more tests! Refactor rw-jobs-worker (#11335) Closes #11336 --- .../src/bins/__tests__/rw-jobs-worker.test.ts | 113 +++++++++++++++++- .../jobs/src/bins/__tests__/rw-jobs.test.ts | 36 +++++- packages/jobs/src/bins/rw-jobs-worker.ts | 61 +++++----- packages/jobs/src/core/JobManager.ts | 5 +- .../jobs/src/core/__tests__/Executor.test.ts | 2 +- 5 files changed, 180 insertions(+), 37 deletions(-) diff --git a/packages/jobs/src/bins/__tests__/rw-jobs-worker.test.ts b/packages/jobs/src/bins/__tests__/rw-jobs-worker.test.ts index 22640a07f84d..90ed66b351e4 100644 --- a/packages/jobs/src/bins/__tests__/rw-jobs-worker.test.ts +++ b/packages/jobs/src/bins/__tests__/rw-jobs-worker.test.ts @@ -1,8 +1,111 @@ -import { describe, expect, it } from 'vitest' +import { beforeEach, describe, expect, it, vi } from 'vitest' -describe('worker', () => { - // TODO: Consider adding tests - it('placeholder', () => { - expect(true).toBeTruthy() +import { MockAdapter, mockLogger } from '../../core/__tests__/mocks.js' +import { JobManager } from '../../core/JobManager.js' +import { Worker } from '../../core/Worker.js' +import { getWorker, processName } from '../rw-jobs-worker.js' + +vi.mock('@redwoodjs/cli-helpers/loadEnvFiles', () => { + return { + loadEnvFiles: () => {}, + } +}) + +const mocks = vi.hoisted(() => { + return { + loadJobsManager: vi.fn(), + } +}) + +vi.mock('../../loaders.js', () => { + return { + loadJobsManager: mocks.loadJobsManager, + } +}) + +describe('processName', () => { + it('sets the process title for a single queue', () => { + const title = processName({ id: 1, queues: 'default' }) + + expect(title).toEqual('rw-jobs-worker.default.1') + }) + + it('sets the process title for an array of queues', () => { + const title = processName({ id: 1, queues: ['default', 'email'] }) + + expect(title).toEqual('rw-jobs-worker.default-email.1') + }) +}) + +describe('getWorker', () => { + beforeEach(() => { + vi.resetAllMocks() + }) + + it('returns an instance of Worker', async () => { + mocks.loadJobsManager.mockImplementation( + () => + new JobManager({ + adapters: { + test: new MockAdapter(), + }, + logger: mockLogger, + queues: ['default'], + workers: [ + { + adapter: 'test', + logger: mockLogger, + queue: '*', + count: 1, + }, + ], + }), + ) + + const worker = await getWorker({ + index: 0, + id: 0, + workoff: false, + clear: false, + }) + + expect(worker).toBeInstanceOf(Worker) + }) + + it('calls getWorker on the manager with the proper values', async () => { + const mockAdapter = new MockAdapter() + mocks.loadJobsManager.mockImplementation( + () => + new JobManager({ + adapters: { + test: mockAdapter, + }, + logger: mockLogger, + queues: ['default'], + workers: [ + { + adapter: 'test', + logger: mockLogger, + queue: '*', + count: 1, + }, + ], + }), + ) + const spy = vi.spyOn(JobManager.prototype, 'createWorker') + + await getWorker({ + index: 0, + id: 0, + workoff: false, + clear: false, + }) + + expect(spy).toHaveBeenCalledWith({ + index: 0, + workoff: false, + clear: false, + processName: 'rw-jobs-worker.*.0', + }) }) }) diff --git a/packages/jobs/src/bins/__tests__/rw-jobs.test.ts b/packages/jobs/src/bins/__tests__/rw-jobs.test.ts index d7f0670fa08e..9129c778200c 100644 --- a/packages/jobs/src/bins/__tests__/rw-jobs.test.ts +++ b/packages/jobs/src/bins/__tests__/rw-jobs.test.ts @@ -22,13 +22,40 @@ vi.mock('node:child_process', () => { }) describe('buildNumWorkers()', () => { - it('turns an array of counts config into an array of arrays', () => { + it('turns a single worker config into an array of arrays', () => { + const config = [ + { + count: 1, + }, + ] + + const result = buildNumWorkers(config) + + expect(result).toEqual([[0, 0]]) + }) + + it('turns a single worker config with more than 1 count an array of arrays', () => { const config = [ { count: 2, }, + ] + + const result = buildNumWorkers(config) + + expect(result).toEqual([ + [0, 0], + [0, 1], + ]) + }) + + it('turns multiple worker configs into an array of arrays', () => { + const config = [ { - count: 1, + count: 2, + }, + { + count: 3, }, ] @@ -38,6 +65,8 @@ describe('buildNumWorkers()', () => { [0, 0], [0, 1], [1, 0], + [1, 1], + [1, 2], ]) }) }) @@ -55,6 +84,7 @@ describe('startWorkers()', () => { startWorkers({ numWorkers: [[0, 0]], logger: mockLogger }) + // single worker only expect(mocks.fork).toHaveBeenCalledWith( expect.stringContaining('rw-jobs-worker.js'), ['--index', '0', '--id', '0'], @@ -79,11 +109,13 @@ describe('startWorkers()', () => { logger: mockLogger, }) + // first worker expect(mocks.fork).toHaveBeenCalledWith( expect.stringContaining('rw-jobs-worker.js'), ['--index', '0', '--id', '0'], expect.any(Object), ) + // second worker expect(mocks.fork).toHaveBeenCalledWith( expect.stringContaining('rw-jobs-worker.js'), ['--index', '0', '--id', '1'], diff --git a/packages/jobs/src/bins/rw-jobs-worker.ts b/packages/jobs/src/bins/rw-jobs-worker.ts index c2354da174ef..f313ef5890e2 100755 --- a/packages/jobs/src/bins/rw-jobs-worker.ts +++ b/packages/jobs/src/bins/rw-jobs-worker.ts @@ -3,7 +3,6 @@ // The process that actually starts an instance of Worker to process jobs. // Can be run independently with `yarn rw-jobs-worker` but by default is forked // by `yarn rw-jobs` and either monitored, or detached to run independently. -import console from 'node:console' import process from 'node:process' import { hideBin } from 'yargs/helpers' @@ -11,11 +10,10 @@ import yargs from 'yargs/yargs' import { loadEnvFiles } from '@redwoodjs/cli-helpers/loadEnvFiles' -import { DEFAULT_LOGGER, PROCESS_TITLE_PREFIX } from '../consts.js' +import { PROCESS_TITLE_PREFIX } from '../consts.js' import type { Worker } from '../core/Worker.js' import { WorkerConfigIndexNotFoundError } from '../errors.js' import { loadJobsManager } from '../loaders.js' -import type { BasicLogger } from '../types.js' loadEnvFiles() @@ -49,28 +47,22 @@ const parseArgs = (argv: string[]) => { .help().argv } -const setProcessTitle = ({ +export const processName = ({ id, - queue, + queues, }: { id: number - queue: string | string[] + queues: string | string[] }) => { - process.title = `${PROCESS_TITLE_PREFIX}.${[queue].flat().join('-')}.${id}` + return `${PROCESS_TITLE_PREFIX}.${[queues].flat().join('-')}.${id}` } -const setupSignals = ({ - worker, - logger, -}: { - worker: Worker - logger: BasicLogger -}) => { +const setupSignals = ({ worker }: { worker: Worker }) => { // if the parent itself receives a ctrl-c it'll pass that to the workers. // workers will exit gracefully by setting `forever` to `false` which will tell // it not to pick up a new job when done with the current one process.on('SIGINT', () => { - logger.warn( + worker.logger.warn( `[${process.title}] SIGINT received at ${new Date().toISOString()}, finishing work...`, ) worker.forever = false @@ -80,16 +72,24 @@ const setupSignals = ({ // instead in which case we exit immediately no matter what state the worker is // in process.on('SIGTERM', () => { - logger.warn( + worker.logger.warn( `[${process.title}] SIGTERM received at ${new Date().toISOString()}, exiting now!`, ) process.exit(0) }) } -const main = async () => { - const { index, id, clear, workoff } = await parseArgs(process.argv) - +export const getWorker = async ({ + index, + id, + clear, + workoff, +}: { + index: number + id: number + clear: boolean + workoff: boolean +}) => { let manager try { @@ -104,20 +104,27 @@ const main = async () => { throw new WorkerConfigIndexNotFoundError(index) } - const logger = workerConfig.logger ?? manager.logger ?? DEFAULT_LOGGER - logger.warn( - `[${process.title}] Starting work at ${new Date().toISOString()}...`, - ) + return manager.createWorker({ + index, + clear, + workoff, + processName: processName({ id, queues: workerConfig.queue }), + }) +} + +const main = async () => { + const { index, id, clear, workoff } = await parseArgs(process.argv) + + const worker = await getWorker({ index, id, clear, workoff }) - setProcessTitle({ id, queue: workerConfig.queue }) + process.title = processName({ id, queues: worker.queues }) - const worker = manager.createWorker({ index, clear, workoff }) worker.run().then(() => { - logger.info(`[${process.title}] Worker finished, shutting down.`) + worker.logger.info(`[${process.title}] Worker finished, shutting down.`) process.exit(0) }) - setupSignals({ worker, logger }) + setupSignals({ worker }) } // Don't actaully run the worker if we're in a test environment diff --git a/packages/jobs/src/core/JobManager.ts b/packages/jobs/src/core/JobManager.ts index 70dd10152492..927068b873f4 100644 --- a/packages/jobs/src/core/JobManager.ts +++ b/packages/jobs/src/core/JobManager.ts @@ -18,6 +18,7 @@ export interface CreateWorkerArgs { index: number workoff: WorkerOptions['workoff'] clear: WorkerOptions['clear'] + processName: string } export class JobManager< @@ -61,7 +62,7 @@ export class JobManager< return jobDefinition as Job } - createWorker({ index, workoff, clear }: CreateWorkerArgs) { + createWorker({ index, workoff, clear, processName }: CreateWorkerArgs) { const config = this.workers[index] const adapter = this.adapters[config.adapter] if (!adapter) { @@ -75,7 +76,7 @@ export class JobManager< maxRuntime: config.maxRuntime, sleepDelay: config.sleepDelay, deleteFailedJobs: config.deleteFailedJobs, - processName: process.title, + processName, queues: [config.queue].flat(), workoff, clear, diff --git a/packages/jobs/src/core/__tests__/Executor.test.ts b/packages/jobs/src/core/__tests__/Executor.test.ts index fb09b82b9dfb..83dbd4162214 100644 --- a/packages/jobs/src/core/__tests__/Executor.test.ts +++ b/packages/jobs/src/core/__tests__/Executor.test.ts @@ -13,7 +13,7 @@ const mocks = vi.hoisted(() => { } }) -vi.mock('../../loaders', () => { +vi.mock('../../loaders.js', () => { return { loadJob: mocks.loadJob, }