From 6fc6d6fc7bf64f342478bd49261114bc905733e5 Mon Sep 17 00:00:00 2001 From: Lukas Hroch Date: Wed, 9 Oct 2024 11:42:57 +0100 Subject: [PATCH] fix(scheduler): update scheduled jobs mgmt (V4-1288) --- .../api/src/http/routers/admin/task.router.ts | 11 +- apps/api/src/jobs/job.ts | 6 +- .../core/queues/jobs-queue-handler.ts | 2 +- .../core/queues/tasks-queue-handler.ts | 165 +++++------------- packages/common/src/types/jobs.ts | 2 +- 5 files changed, 55 insertions(+), 131 deletions(-) diff --git a/apps/api/src/http/routers/admin/task.router.ts b/apps/api/src/http/routers/admin/task.router.ts index 6ad517dc59..78022df5ba 100644 --- a/apps/api/src/http/routers/admin/task.router.ts +++ b/apps/api/src/http/routers/admin/task.router.ts @@ -41,7 +41,7 @@ export function task() { await uniqueMiddleware(body.name, { req }); const task = await Task.create(body); - await req.scope.cradle.scheduler.tasks.addJob(task); + await req.scope.cradle.scheduler.tasks.updateTaskInQueue(task); return { status: 201, body: task }; }, @@ -53,7 +53,7 @@ export function task() { if (!task) throw new NotFoundError(); - const bullJob = await req.scope.cradle.scheduler.tasks.getRepeatableJobById(taskId); + const bullJob = await req.scope.cradle.scheduler.tasks.getScheduledJobById(taskId); return { status: 200, body: { ...task.get(), bullJob } }; }, @@ -68,9 +68,8 @@ export function task() { throw new NotFoundError(); await task.update(body); - await req.scope.cradle.scheduler.tasks.updateJob(task); - - const bullJob = await req.scope.cradle.scheduler.tasks.getRepeatableJobById(taskId); + await req.scope.cradle.scheduler.tasks.updateTaskInQueue(task); + const bullJob = await req.scope.cradle.scheduler.tasks.getScheduledJobById(taskId); return { status: 200, body: { ...task.get(), bullJob } }; }, @@ -82,8 +81,8 @@ export function task() { if (!task) throw new NotFoundError(); + await req.scope.cradle.scheduler.tasks.removeTaskFromQueue(task); await task.destroy(); - await req.scope.cradle.scheduler.tasks.removeJob(task); return { status: 204, body: undefined }; }, diff --git a/apps/api/src/jobs/job.ts b/apps/api/src/jobs/job.ts index c18c958cd8..fbc1301455 100644 --- a/apps/api/src/jobs/job.ts +++ b/apps/api/src/jobs/job.ts @@ -26,7 +26,7 @@ export default abstract class Job { this.logger = logger; } - protected init(job: BullJob>): void { + protected init(job: BullJob>): void { const { id, data: { params }, @@ -48,11 +48,11 @@ export default abstract class Job { * To be implemented by each job * * @abstract - * @param {BullJob>} job + * @param {BullJob>} job * @returns {Promise} * @memberof Job */ - abstract run(job: BullJob>): Promise; + abstract run(job: BullJob>): Promise; /** * Get current progress diff --git a/apps/api/src/services/core/queues/jobs-queue-handler.ts b/apps/api/src/services/core/queues/jobs-queue-handler.ts index 89561fb903..34e2752aa5 100644 --- a/apps/api/src/services/core/queues/jobs-queue-handler.ts +++ b/apps/api/src/services/core/queues/jobs-queue-handler.ts @@ -198,7 +198,7 @@ export default class JobsQueueHandler extends QueueHandler { private async queueJob(job: DbJob, options: JobsOptions = {}) { const { id, type, params } = job; - const bullJob = await this.queue.add(type, { params }, { ...options, jobId: `db-${id}` }); + const bullJob = await this.queue.add(type, { type, params }, { ...options, jobId: `db-${id}` }); this.logger.debug(`Queue ${this.name}: Job ${id} | ${type} queued.`); diff --git a/apps/api/src/services/core/queues/tasks-queue-handler.ts b/apps/api/src/services/core/queues/tasks-queue-handler.ts index c84b431e1c..7cc11bbd06 100644 --- a/apps/api/src/services/core/queues/tasks-queue-handler.ts +++ b/apps/api/src/services/core/queues/tasks-queue-handler.ts @@ -5,7 +5,6 @@ import type { IoC } from '@intake24/api/ioc'; import type { Job } from '@intake24/api/jobs'; import type { JobData } from '@intake24/common/types'; import ioc from '@intake24/api/ioc'; -import { sleep } from '@intake24/api/util'; import { Task } from '@intake24/db'; import { QueueHandler } from './queue-handler'; @@ -63,172 +62,98 @@ export default class TasksQueueHandler extends QueueHandler { this.workers.push(worker); - await this.clearRepeatableJobs(); - await this.loadRepeatableJobs(); + await this.clearScheduledJobs(); + await this.loadScheduledJobs(); this.logger.info(`${this.name} has been loaded.`); } async processor(job: BullJob) { - const { id, name } = job; + const { id, data: { type } } = job; if (!id) { this.logger.error(`Queue ${this.name}: Job ID missing.`); return; } - const newJob = ioc.resolve>(name); + const newJob = ioc.resolve>(type); await newJob.run(job); } - async getRepeatableJobById(id: string) { - const jobs = await this.queue.getRepeatableJobs(); + private async clearScheduledJobs() { + const repeatableJobs = await this.queue.getJobSchedulers(); - return jobs.find(job => job.id?.replace('db-', '') === id); + await Promise.all(repeatableJobs.map(job => this.queue.removeJobScheduler(job.key))); } - /** - * Remove repeatable job(s) from queue - * - * @private - * @param {string} [id] - * @memberof TasksQueueHandler - */ - private async clearRepeatableJobs(id?: string) { - const repeatableJobs = await this.queue.getRepeatableJobs(); + private createJobParams(task: Task) { + const { name, job, cron, params } = task; - for (const job of repeatableJobs) { - if (id && job.id?.replace('db-', '') !== id) - continue; - - await this.queue.removeRepeatableByKey(job.key); - } + return { + name, + data: { type: job, params }, + opts: { pattern: cron }, + }; } - /** - * Load repeatable jobs from DB to the queue - * - * @private - * @memberof TasksQueueHandler - */ - private async loadRepeatableJobs() { + private async loadScheduledJobs() { const tasks = await Task.findAll({ where: { active: true } }); - for (const task of tasks) - await this.addJob(task); + for (const task of tasks) { + const { name, data, opts } = this.createJobParams(task); + await this.queue.upsertJobScheduler(`db-${task.id}`, opts, { name, data }); + } } - /** - * Push job into the queue - * - * @private - * @param {Task} task - * @memberof TasksQueueHandler - */ - private async queueJob(task: Task) { - const { id, job, cron, params } = task; + private async getScheduledJobByKey(key: string) { + const jobs = await this.queue.getJobSchedulers(); - return await this.queue.add(job, { params }, { repeat: { pattern: cron }, jobId: `db-${id}` }); + return jobs.find(job => job.key === key); } - /** - * Remove job from queue - * - * @private - * @param {Task} task - * @memberof TasksQueueHandler - */ - private async dequeueJob(task: Task) { - this.clearRepeatableJobs(task.id); + async getScheduledJobById(id: string) { + return await this.getScheduledJobByKey(`db-${id}`); } - /** - * Add task's job to queue - * - * @param {Task} task - * @memberof TasksQueueHandler - */ - public async addJob(task: Task) { - const { id, name, active } = task; - - if (!active) { - this.logger.warn(`Queue ${this.name}: Task (ID: ${id}, Name: ${name}) not set as active.`); - return; + private async removeScheduledJobById(id: string) { + const job = await this.getScheduledJobById(id); + if (!job) { + this.logger.debug(`Queue ${this.name}: Scheduled task (ID: ${id}) not in queue.`); + return undefined; } - await this.queueJob(task); - - this.logger.debug(`Queue ${this.name}: Task (ID: ${id}, Name: ${name}) added.`); + return await this.queue.removeJobScheduler(job.key); } - /** - * Update task's job to queue - * - * @param {Task} task - * @memberof TasksQueueHandler - */ - public async updateJob(task: Task) { - const { id, name, active } = task; + public async updateTaskInQueue(task: Task) { + const { id, active } = task; - await this.dequeueJob(task); - - /* - * Bullmq bug - * When repeatable job removed right away and new job pushed in, job entry doesn't end up in redis store - * Though queue.add returns correct job entry - * Workaround: simple sleep/wait for few ms solves it for now - */ - await sleep(20); + if (!active) { + await this.removeScheduledJobById(id); + return; + } - if (active) - await this.queueJob(task); + const { name, data, opts } = this.createJobParams(task); + const job = await this.queue.upsertJobScheduler(`db-${id}`, opts, { name, data }); this.logger.debug(`Queue ${this.name}: Task (ID: ${id}, Name: ${name}) updated.`); - } - - /** - * Remove task's job from queue - * - * @param {Task} task - * @memberof TasksQueueHandler - */ - public async removeJob(task: Task) { - const { id, name } = task; - - await this.dequeueJob(task); - this.logger.debug(`Queue ${this.name}: Task (ID: ${id}, Name: ${name}) removed.`); + return job; } - /** - * Add task's job to queue - * - * @param {Task} task - * @memberof TasksQueueHandler - */ - public async runJob(task: Task) { - const { id, name, job, params } = task; + public async removeTaskFromQueue(task: Task) { + const { id, name } = task; - await this.queue.add(job, { params }, { delay: 500 }); + await this.removeScheduledJobById(id); - this.logger.debug(`Queue ${this.name}: Task (ID: ${id}, Name: ${name}) queued.`); + this.logger.debug(`Queue ${this.name}: Task (ID: ${id}, Name: ${name}) updated.`); } - /** - * Pause all scheduled tasks in queue - * - * @memberof TasksQueueHandler - */ public async pauseAll() { - await this.clearRepeatableJobs(); + await this.clearScheduledJobs(); } - /** - * Resume all scheduled tasks in queue - * - * @memberof TasksQueueHandler - */ public async resumeAll() { - await this.loadRepeatableJobs(); + await this.loadScheduledJobs(); } } diff --git a/packages/common/src/types/jobs.ts b/packages/common/src/types/jobs.ts index 315e1f4818..21f0a95b54 100644 --- a/packages/common/src/types/jobs.ts +++ b/packages/common/src/types/jobs.ts @@ -19,7 +19,7 @@ export const repeatableBullJob = z.object({ export type RepeatableBullJob = z.infer; -export type JobData = { params: T }; +export type JobData = { type: T; params: JobParams[T] }; export const redisStoreTypes = ['cache', 'rateLimiter', 'session'] as const; export type RedisStoreType = (typeof redisStoreTypes)[number];