Skip to content

Commit

Permalink
Adds more tests! Refactor rw-jobs-worker (#11335)
Browse files Browse the repository at this point in the history
Closes #11336
  • Loading branch information
cannikin committed Aug 22, 2024
1 parent 78f01a2 commit 68b37fa
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 37 deletions.
113 changes: 108 additions & 5 deletions packages/jobs/src/bins/__tests__/rw-jobs-worker.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,111 @@
import { describe, expect, it } from 'vitest'
import { beforeEach, describe, expect, it, vi } from 'vitest'

describe('worker', () => {
// TODO: Consider adding tests
it('placeholder', () => {
expect(true).toBeTruthy()
import { MockAdapter, mockLogger } from '../../core/__tests__/mocks.js'
import { JobManager } from '../../core/JobManager.js'
import { Worker } from '../../core/Worker.js'
import { getWorker, processName } from '../rw-jobs-worker.js'

vi.mock('@redwoodjs/cli-helpers/loadEnvFiles', () => {
return {
loadEnvFiles: () => {},
}
})

const mocks = vi.hoisted(() => {
return {
loadJobsManager: vi.fn(),
}
})

vi.mock('../../loaders.js', () => {
return {
loadJobsManager: mocks.loadJobsManager,
}
})

describe('processName', () => {
it('sets the process title for a single queue', () => {
const title = processName({ id: 1, queues: 'default' })

expect(title).toEqual('rw-jobs-worker.default.1')
})

it('sets the process title for an array of queues', () => {
const title = processName({ id: 1, queues: ['default', 'email'] })

expect(title).toEqual('rw-jobs-worker.default-email.1')
})
})

describe('getWorker', () => {
beforeEach(() => {
vi.resetAllMocks()
})

it('returns an instance of Worker', async () => {
mocks.loadJobsManager.mockImplementation(
() =>
new JobManager({
adapters: {
test: new MockAdapter(),
},
logger: mockLogger,
queues: ['default'],
workers: [
{
adapter: 'test',
logger: mockLogger,
queue: '*',
count: 1,
},
],
}),
)

const worker = await getWorker({
index: 0,
id: 0,
workoff: false,
clear: false,
})

expect(worker).toBeInstanceOf(Worker)
})

it('calls getWorker on the manager with the proper values', async () => {
const mockAdapter = new MockAdapter()
mocks.loadJobsManager.mockImplementation(
() =>
new JobManager({
adapters: {
test: mockAdapter,
},
logger: mockLogger,
queues: ['default'],
workers: [
{
adapter: 'test',
logger: mockLogger,
queue: '*',
count: 1,
},
],
}),
)
const spy = vi.spyOn(JobManager.prototype, 'createWorker')

await getWorker({
index: 0,
id: 0,
workoff: false,
clear: false,
})

expect(spy).toHaveBeenCalledWith({
index: 0,
workoff: false,
clear: false,
processName: 'rw-jobs-worker.*.0',
})
})
})
36 changes: 34 additions & 2 deletions packages/jobs/src/bins/__tests__/rw-jobs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,40 @@ vi.mock('node:child_process', () => {
})

describe('buildNumWorkers()', () => {
it('turns an array of counts config into an array of arrays', () => {
it('turns a single worker config into an array of arrays', () => {
const config = [
{
count: 1,
},
]

const result = buildNumWorkers(config)

expect(result).toEqual([[0, 0]])
})

it('turns a single worker config with more than 1 count an array of arrays', () => {
const config = [
{
count: 2,
},
]

const result = buildNumWorkers(config)

expect(result).toEqual([
[0, 0],
[0, 1],
])
})

it('turns multiple worker configs into an array of arrays', () => {
const config = [
{
count: 1,
count: 2,
},
{
count: 3,
},
]

Expand All @@ -38,6 +65,8 @@ describe('buildNumWorkers()', () => {
[0, 0],
[0, 1],
[1, 0],
[1, 1],
[1, 2],
])
})
})
Expand All @@ -55,6 +84,7 @@ describe('startWorkers()', () => {

startWorkers({ numWorkers: [[0, 0]], logger: mockLogger })

// single worker only
expect(mocks.fork).toHaveBeenCalledWith(
expect.stringContaining('rw-jobs-worker.js'),
['--index', '0', '--id', '0'],
Expand All @@ -79,11 +109,13 @@ describe('startWorkers()', () => {
logger: mockLogger,
})

// first worker
expect(mocks.fork).toHaveBeenCalledWith(
expect.stringContaining('rw-jobs-worker.js'),
['--index', '0', '--id', '0'],
expect.any(Object),
)
// second worker
expect(mocks.fork).toHaveBeenCalledWith(
expect.stringContaining('rw-jobs-worker.js'),
['--index', '0', '--id', '1'],
Expand Down
61 changes: 34 additions & 27 deletions packages/jobs/src/bins/rw-jobs-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,17 @@
// The process that actually starts an instance of Worker to process jobs.
// Can be run independently with `yarn rw-jobs-worker` but by default is forked
// by `yarn rw-jobs` and either monitored, or detached to run independently.
import console from 'node:console'
import process from 'node:process'

import { hideBin } from 'yargs/helpers'
import yargs from 'yargs/yargs'

import { loadEnvFiles } from '@redwoodjs/cli-helpers/loadEnvFiles'

import { DEFAULT_LOGGER, PROCESS_TITLE_PREFIX } from '../consts.js'
import { PROCESS_TITLE_PREFIX } from '../consts.js'
import type { Worker } from '../core/Worker.js'
import { WorkerConfigIndexNotFoundError } from '../errors.js'
import { loadJobsManager } from '../loaders.js'
import type { BasicLogger } from '../types.js'

loadEnvFiles()

Expand Down Expand Up @@ -49,28 +47,22 @@ const parseArgs = (argv: string[]) => {
.help().argv
}

const setProcessTitle = ({
export const processName = ({
id,
queue,
queues,
}: {
id: number
queue: string | string[]
queues: string | string[]
}) => {
process.title = `${PROCESS_TITLE_PREFIX}.${[queue].flat().join('-')}.${id}`
return `${PROCESS_TITLE_PREFIX}.${[queues].flat().join('-')}.${id}`
}

const setupSignals = ({
worker,
logger,
}: {
worker: Worker
logger: BasicLogger
}) => {
const setupSignals = ({ worker }: { worker: Worker }) => {
// if the parent itself receives a ctrl-c it'll pass that to the workers.
// workers will exit gracefully by setting `forever` to `false` which will tell
// it not to pick up a new job when done with the current one
process.on('SIGINT', () => {
logger.warn(
worker.logger.warn(
`[${process.title}] SIGINT received at ${new Date().toISOString()}, finishing work...`,
)
worker.forever = false
Expand All @@ -80,16 +72,24 @@ const setupSignals = ({
// instead in which case we exit immediately no matter what state the worker is
// in
process.on('SIGTERM', () => {
logger.warn(
worker.logger.warn(
`[${process.title}] SIGTERM received at ${new Date().toISOString()}, exiting now!`,
)
process.exit(0)
})
}

const main = async () => {
const { index, id, clear, workoff } = await parseArgs(process.argv)

export const getWorker = async ({
index,
id,
clear,
workoff,
}: {
index: number
id: number
clear: boolean
workoff: boolean
}) => {
let manager

try {
Expand All @@ -104,20 +104,27 @@ const main = async () => {
throw new WorkerConfigIndexNotFoundError(index)
}

const logger = workerConfig.logger ?? manager.logger ?? DEFAULT_LOGGER
logger.warn(
`[${process.title}] Starting work at ${new Date().toISOString()}...`,
)
return manager.createWorker({
index,
clear,
workoff,
processName: processName({ id, queues: workerConfig.queue }),
})
}

const main = async () => {
const { index, id, clear, workoff } = await parseArgs(process.argv)

const worker = await getWorker({ index, id, clear, workoff })

setProcessTitle({ id, queue: workerConfig.queue })
process.title = processName({ id, queues: worker.queues })

const worker = manager.createWorker({ index, clear, workoff })
worker.run().then(() => {
logger.info(`[${process.title}] Worker finished, shutting down.`)
worker.logger.info(`[${process.title}] Worker finished, shutting down.`)
process.exit(0)
})

setupSignals({ worker, logger })
setupSignals({ worker })
}

// Don't actaully run the worker if we're in a test environment
Expand Down
5 changes: 3 additions & 2 deletions packages/jobs/src/core/JobManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export interface CreateWorkerArgs {
index: number
workoff: WorkerOptions['workoff']
clear: WorkerOptions['clear']
processName: string
}

export class JobManager<
Expand Down Expand Up @@ -61,7 +62,7 @@ export class JobManager<
return jobDefinition as Job<TQueues, TArgs>
}

createWorker({ index, workoff, clear }: CreateWorkerArgs) {
createWorker({ index, workoff, clear, processName }: CreateWorkerArgs) {
const config = this.workers[index]
const adapter = this.adapters[config.adapter]
if (!adapter) {
Expand All @@ -75,7 +76,7 @@ export class JobManager<
maxRuntime: config.maxRuntime,
sleepDelay: config.sleepDelay,
deleteFailedJobs: config.deleteFailedJobs,
processName: process.title,
processName,
queues: [config.queue].flat(),
workoff,
clear,
Expand Down
2 changes: 1 addition & 1 deletion packages/jobs/src/core/__tests__/Executor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const mocks = vi.hoisted(() => {
}
})

vi.mock('../../loaders', () => {
vi.mock('../../loaders.js', () => {
return {
loadJob: mocks.loadJob,
}
Expand Down

0 comments on commit 68b37fa

Please sign in to comment.