diff --git a/packages/cli/src/Interfaces.ts b/packages/cli/src/Interfaces.ts index 9a1fdc367f4b5..73b1e99ec6559 100644 --- a/packages/cli/src/Interfaces.ts +++ b/packages/cli/src/Interfaces.ts @@ -42,7 +42,7 @@ import type { WorkflowRepository } from '@db/repositories/workflow.repository'; import type { ExternalHooks } from './ExternalHooks'; import type { LICENSE_FEATURES, LICENSE_QUOTAS } from './constants'; import type { WorkflowWithSharingsAndCredentials } from './workflows/workflows.types'; -import type { WorkerJobStatusSummary } from './services/orchestration/worker/types'; +import type { RunningJobSummary } from './scaling/types'; import type { Scope } from '@n8n/permissions'; export interface ICredentialsTypeData { @@ -420,7 +420,7 @@ export interface IPushDataWorkerStatusMessage { export interface IPushDataWorkerStatusPayload { workerId: string; - runningJobsSummary: WorkerJobStatusSummary[]; + runningJobsSummary: RunningJobSummary[]; freeMem: number; totalMem: number; uptime: number; diff --git a/packages/cli/src/Queue.ts b/packages/cli/src/Queue.ts deleted file mode 100644 index 11cfed839bee7..0000000000000 --- a/packages/cli/src/Queue.ts +++ /dev/null @@ -1,166 +0,0 @@ -import type Bull from 'bull'; -import Container, { Service } from 'typedi'; -import { - ApplicationError, - BINARY_ENCODING, - type IDataObject, - type ExecutionError, - type IExecuteResponsePromiseData, -} from 'n8n-workflow'; -import { ActiveExecutions } from '@/ActiveExecutions'; -import config from '@/config'; -import { OnShutdown } from './decorators/OnShutdown'; -import { HIGHEST_SHUTDOWN_PRIORITY } from './constants'; - -export type JobId = Bull.JobId; -export type Job = Bull.Job; -export type JobQueue = Bull.Queue; - -export interface JobData { - executionId: string; - loadStaticData: boolean; -} - -export interface JobResponse { - success: boolean; - error?: ExecutionError; -} - -export interface WebhookResponse { - executionId: string; - response: IExecuteResponsePromiseData; -} - -@Service() -export class Queue { - private jobQueue: JobQueue; - - /** - * The number of jobs a single server can process concurrently - * Any worker that wants to process executions must first set this to a non-zero value - */ - private concurrency = 0; - - setConcurrency(concurrency: number) { - this.concurrency = concurrency; - // This sets the max event listeners on the jobQueue EventEmitter to prevent the logs getting flooded with MaxListenersExceededWarning - // see: https://github.com/OptimalBits/bull/blob/develop/lib/job.js#L497-L521 - this.jobQueue.setMaxListeners( - 4 + // `close` - 2 + // `error` - 2 + // `global:progress` - concurrency * 2, // 2 global events for every call to `job.finished()` - ); - } - - constructor(private activeExecutions: ActiveExecutions) {} - - async init() { - const { default: Bull } = await import('bull'); - const { RedisClientService } = await import('@/services/redis/redis-client.service'); - - const redisClientService = Container.get(RedisClientService); - - const bullPrefix = config.getEnv('queue.bull.prefix'); - const prefix = redisClientService.toValidPrefix(bullPrefix); - - this.jobQueue = new Bull('jobs', { - prefix, - settings: config.get('queue.bull.settings'), - createClient: (type) => redisClientService.createClient({ type: `${type}(bull)` }), - }); - - this.jobQueue.on('global:progress', (_jobId, progress: WebhookResponse) => { - this.activeExecutions.resolveResponsePromise( - progress.executionId, - this.decodeWebhookResponse(progress.response), - ); - }); - } - - async findRunningJobBy({ executionId }: { executionId: string }) { - const activeOrWaitingJobs = await this.getJobs(['active', 'waiting']); - - return activeOrWaitingJobs.find(({ data }) => data.executionId === executionId) ?? null; - } - - decodeWebhookResponse(response: IExecuteResponsePromiseData): IExecuteResponsePromiseData { - if ( - typeof response === 'object' && - typeof response.body === 'object' && - (response.body as IDataObject)['__@N8nEncodedBuffer@__'] - ) { - response.body = Buffer.from( - (response.body as IDataObject)['__@N8nEncodedBuffer@__'] as string, - BINARY_ENCODING, - ); - } - - return response; - } - - async add(jobData: JobData, jobOptions: object): Promise { - return await this.jobQueue.add(jobData, jobOptions); - } - - async getJob(jobId: JobId): Promise { - return await this.jobQueue.getJob(jobId); - } - - async getJobs(jobTypes: Bull.JobStatus[]): Promise { - return await this.jobQueue.getJobs(jobTypes); - } - - /** - * Get IDs of executions that are currently in progress in the queue. - */ - async getInProgressExecutionIds() { - const inProgressJobs = await this.getJobs(['active', 'waiting']); - - return new Set(inProgressJobs.map((job) => job.data.executionId)); - } - - async process(fn: Bull.ProcessCallbackFunction): Promise { - return await this.jobQueue.process(this.concurrency, fn); - } - - async ping(): Promise { - return await this.jobQueue.client.ping(); - } - - @OnShutdown(HIGHEST_SHUTDOWN_PRIORITY) - // Stop accepting new jobs, `doNotWaitActive` allows reporting progress - async pause(): Promise { - return await this.jobQueue?.pause(true, true); - } - - getBullObjectInstance(): JobQueue { - if (this.jobQueue === undefined) { - // if queue is not initialized yet throw an error, since we do not want to hand around an undefined queue - throw new ApplicationError('Queue is not initialized yet!'); - } - return this.jobQueue; - } - - /** - * - * @param job A Job instance - * @returns boolean true if we were able to securely stop the job - */ - async stopJob(job: Job): Promise { - if (await job.isActive()) { - // Job is already running so tell it to stop - await job.progress(-1); - return true; - } - // Job did not get started yet so remove from queue - try { - await job.remove(); - return true; - } catch (e) { - await job.progress(-1); - } - - return false; - } -} diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index 0eb2040e7f1bb..777472b99fa42 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -211,8 +211,8 @@ export class Server extends AbstractServer { setupPushHandler(restEndpoint, app); if (config.getEnv('executions.mode') === 'queue') { - const { Queue } = await import('@/Queue'); - await Container.get(Queue).init(); + const { ScalingService } = await import('@/scaling/scaling.service'); + await Container.get(ScalingService).setupQueue(); } await handleMfaDisable(); diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts index 2f7976a073b30..ac5f586b0ad43 100644 --- a/packages/cli/src/WorkflowRunner.ts +++ b/packages/cli/src/WorkflowRunner.ts @@ -28,8 +28,8 @@ import { ExecutionRepository } from '@db/repositories/execution.repository'; import { ExternalHooks } from '@/ExternalHooks'; import type { IExecutionResponse, IWorkflowExecutionDataProcess } from '@/Interfaces'; import { NodeTypes } from '@/NodeTypes'; -import type { Job, JobData, JobResponse } from '@/Queue'; -import { Queue } from '@/Queue'; +import type { Job, JobData, JobResult } from '@/scaling/types'; +import { ScalingService } from '@/scaling/scaling.service'; import * as WorkflowHelpers from '@/WorkflowHelpers'; import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'; import { generateFailedExecutionFromError } from '@/WorkflowHelpers'; @@ -40,7 +40,7 @@ import { EventService } from './events/event.service'; @Service() export class WorkflowRunner { - private jobQueue: Queue; + private readonly scalingService: ScalingService; private executionsMode = config.getEnv('executions.mode'); @@ -55,7 +55,7 @@ export class WorkflowRunner { private readonly eventService: EventService, ) { if (this.executionsMode === 'queue') { - this.jobQueue = Container.get(Queue); + this.scalingService = Container.get(ScalingService); } } @@ -375,9 +375,7 @@ export class WorkflowRunner { let job: Job; let hooks: WorkflowHooks; try { - job = await this.jobQueue.add(jobData, jobOptions); - - this.logger.info(`Started with job ID: ${job.id.toString()} (Execution ID: ${executionId})`); + job = await this.scalingService.addJob(jobData, jobOptions); hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerMain( data.executionMode, @@ -406,8 +404,7 @@ export class WorkflowRunner { async (resolve, reject, onCancel) => { onCancel.shouldReject = false; onCancel(async () => { - const queue = Container.get(Queue); - await queue.stopJob(job); + await Container.get(ScalingService).stopJob(job); // We use "getWorkflowHooksWorkerExecuter" as "getWorkflowHooksWorkerMain" does not contain the // "workflowExecuteAfter" which we require. @@ -424,11 +421,11 @@ export class WorkflowRunner { reject(error); }); - const jobData: Promise = job.finished(); + const jobData: Promise = job.finished(); const queueRecoveryInterval = config.getEnv('queue.bull.queueRecoveryInterval'); - const racingPromises: Array> = [jobData]; + const racingPromises: Array> = [jobData]; let clearWatchdogInterval; if (queueRecoveryInterval > 0) { @@ -446,9 +443,9 @@ export class WorkflowRunner { ************************************************ */ let watchDogInterval: NodeJS.Timeout | undefined; - const watchDog: Promise = new Promise((res) => { + const watchDog: Promise = new Promise((res) => { watchDogInterval = setInterval(async () => { - const currentJob = await this.jobQueue.getJob(job.id); + const currentJob = await this.scalingService.getJob(job.id); // When null means job is finished (not found in queue) if (currentJob === null) { // Mimic worker's success message diff --git a/packages/cli/src/commands/webhook.ts b/packages/cli/src/commands/webhook.ts index 2c8532ec76342..505e2e216170d 100644 --- a/packages/cli/src/commands/webhook.ts +++ b/packages/cli/src/commands/webhook.ts @@ -4,8 +4,8 @@ import { ApplicationError } from 'n8n-workflow'; import config from '@/config'; import { ActiveExecutions } from '@/ActiveExecutions'; +import { ScalingService } from '@/scaling/scaling.service'; import { WebhookServer } from '@/webhooks/WebhookServer'; -import { Queue } from '@/Queue'; import { BaseCommand } from './BaseCommand'; import { OrchestrationWebhookService } from '@/services/orchestration/webhook/orchestration.webhook.service'; @@ -96,7 +96,7 @@ export class Webhook extends BaseCommand { ); } - await Container.get(Queue).init(); + await Container.get(ScalingService).setupQueue(); await this.server.start(); this.logger.debug(`Webhook listener ID: ${this.server.uniqueInstanceId}`); this.logger.info('Webhook listener waiting for requests.'); diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 5681b417273d6..18f039029c020 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -2,21 +2,13 @@ import { Container } from 'typedi'; import { Flags, type Config } from '@oclif/core'; import express from 'express'; import http from 'http'; -import type PCancelable from 'p-cancelable'; -import { WorkflowExecute } from 'n8n-core'; -import type { ExecutionStatus, IExecuteResponsePromiseData, INodeTypes, IRun } from 'n8n-workflow'; -import { Workflow, sleep, ApplicationError } from 'n8n-workflow'; +import { sleep, ApplicationError } from 'n8n-workflow'; import * as Db from '@/Db'; import * as ResponseHelper from '@/ResponseHelper'; -import * as WebhookHelpers from '@/webhooks/WebhookHelpers'; -import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'; import config from '@/config'; -import type { Job, JobId, JobResponse, WebhookResponse } from '@/Queue'; -import { Queue } from '@/Queue'; +import { ScalingService } from '@/scaling/scaling.service'; import { N8N_VERSION, inTest } from '@/constants'; -import { ExecutionRepository } from '@db/repositories/execution.repository'; -import { WorkflowRepository } from '@db/repositories/workflow.repository'; import type { ICredentialsOverwrite } from '@/Interfaces'; import { CredentialsOverwrites } from '@/CredentialsOverwrites'; import { rawBodyReader, bodyParser } from '@/middlewares'; @@ -25,10 +17,9 @@ import type { RedisServicePubSubSubscriber } from '@/services/redis/RedisService import { EventMessageGeneric } from '@/eventbus/EventMessageClasses/EventMessageGeneric'; import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service'; import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service'; -import type { WorkerJobStatusSummary } from '@/services/orchestration/worker/types'; import { ServiceUnavailableError } from '@/errors/response-errors/service-unavailable.error'; import { BaseCommand } from './BaseCommand'; -import { MaxStalledCountError } from '@/errors/max-stalled-count.error'; +import { JobProcessor } from '@/scaling/job-processor'; import { LogStreamingEventRelay } from '@/events/log-streaming-event-relay'; export class Worker extends BaseCommand { @@ -44,15 +35,17 @@ export class Worker extends BaseCommand { }), }; - static runningJobs: { - [key: string]: PCancelable; - } = {}; + /** + * How many jobs this worker may run concurrently. + * + * Taken from env var `N8N_CONCURRENCY_PRODUCTION_LIMIT` if set to a value + * other than -1, else taken from `--concurrency` flag. + */ + concurrency: number; - static runningJobsSummary: { - [jobId: string]: WorkerJobStatusSummary; - } = {}; + scalingService: ScalingService; - static jobQueue: Queue; + jobProcessor: JobProcessor; redisSubscriber: RedisServicePubSubSubscriber; @@ -73,12 +66,12 @@ export class Worker extends BaseCommand { // Wait for active workflow executions to finish let count = 0; - while (Object.keys(Worker.runningJobs).length !== 0) { + while (this.jobProcessor.getRunningJobIds().length !== 0) { if (count++ % 4 === 0) { const waitLeft = Math.ceil((hardStopTimeMs - Date.now()) / 1000); this.logger.info( `Waiting for ${ - Object.keys(Worker.runningJobs).length + Object.keys(this.jobProcessor.getRunningJobIds()).length } active executions to finish... (max wait ${waitLeft} more seconds)`, ); } @@ -92,143 +85,6 @@ export class Worker extends BaseCommand { await this.exitSuccessFully(); } - async runJob(job: Job, nodeTypes: INodeTypes): Promise { - const { executionId, loadStaticData } = job.data; - const executionRepository = Container.get(ExecutionRepository); - const fullExecutionData = await executionRepository.findSingleExecution(executionId, { - includeData: true, - unflattenData: true, - }); - - if (!fullExecutionData) { - this.logger.error( - `Worker failed to find data of execution "${executionId}" in database. Cannot continue.`, - { executionId }, - ); - throw new ApplicationError( - 'Unable to find data of execution in database. Aborting execution.', - { extra: { executionId } }, - ); - } - const workflowId = fullExecutionData.workflowData.id; - - this.logger.info( - `Start job: ${job.id} (Workflow ID: ${workflowId} | Execution: ${executionId})`, - ); - await executionRepository.updateStatus(executionId, 'running'); - - let { staticData } = fullExecutionData.workflowData; - if (loadStaticData) { - const workflowData = await Container.get(WorkflowRepository).findOne({ - select: ['id', 'staticData'], - where: { - id: workflowId, - }, - }); - if (workflowData === null) { - this.logger.error( - 'Worker execution failed because workflow could not be found in database.', - { workflowId, executionId }, - ); - throw new ApplicationError('Workflow could not be found', { extra: { workflowId } }); - } - staticData = workflowData.staticData; - } - - const workflowSettings = fullExecutionData.workflowData.settings ?? {}; - - let workflowTimeout = workflowSettings.executionTimeout ?? config.getEnv('executions.timeout'); // initialize with default - - let executionTimeoutTimestamp: number | undefined; - if (workflowTimeout > 0) { - workflowTimeout = Math.min(workflowTimeout, config.getEnv('executions.maxTimeout')); - executionTimeoutTimestamp = Date.now() + workflowTimeout * 1000; - } - - const workflow = new Workflow({ - id: workflowId, - name: fullExecutionData.workflowData.name, - nodes: fullExecutionData.workflowData.nodes, - connections: fullExecutionData.workflowData.connections, - active: fullExecutionData.workflowData.active, - nodeTypes, - staticData, - settings: fullExecutionData.workflowData.settings, - }); - - const additionalData = await WorkflowExecuteAdditionalData.getBase( - undefined, - undefined, - executionTimeoutTimestamp, - ); - additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerExecuter( - fullExecutionData.mode, - job.data.executionId, - fullExecutionData.workflowData, - { - retryOf: fullExecutionData.retryOf as string, - }, - ); - - additionalData.hooks.hookFunctions.sendResponse = [ - async (response: IExecuteResponsePromiseData): Promise => { - const progress: WebhookResponse = { - executionId, - response: WebhookHelpers.encodeWebhookResponse(response), - }; - await job.progress(progress); - }, - ]; - - additionalData.executionId = executionId; - - additionalData.setExecutionStatus = (status: ExecutionStatus) => { - // Can't set the status directly in the queued worker, but it will happen in InternalHook.onWorkflowPostExecute - this.logger.debug(`Queued worker execution status for ${executionId} is "${status}"`); - }; - - let workflowExecute: WorkflowExecute; - let workflowRun: PCancelable; - if (fullExecutionData.data !== undefined) { - workflowExecute = new WorkflowExecute( - additionalData, - fullExecutionData.mode, - fullExecutionData.data, - ); - workflowRun = workflowExecute.processRunExecutionData(workflow); - } else { - // Execute all nodes - // Can execute without webhook so go on - workflowExecute = new WorkflowExecute(additionalData, fullExecutionData.mode); - workflowRun = workflowExecute.run(workflow); - } - - Worker.runningJobs[job.id] = workflowRun; - Worker.runningJobsSummary[job.id] = { - jobId: job.id.toString(), - executionId, - workflowId: fullExecutionData.workflowId ?? '', - workflowName: fullExecutionData.workflowData.name, - mode: fullExecutionData.mode, - startedAt: fullExecutionData.startedAt, - retryOf: fullExecutionData.retryOf ?? '', - status: fullExecutionData.status, - }; - - // Wait till the execution is finished - await workflowRun; - - delete Worker.runningJobs[job.id]; - delete Worker.runningJobsSummary[job.id]; - - // do NOT call workflowExecuteAfter hook here, since it is being called from processSuccessExecution() - // already! - - return { - success: true, - }; - } - constructor(argv: string[], cmdConfig: Config) { super(argv, cmdConfig); @@ -256,6 +112,7 @@ export class Worker extends BaseCommand { this.logger.debug('Starting n8n worker...'); this.logger.debug(`Queue mode id: ${this.queueModeId}`); + await this.setConcurrency(); await super.init(); await this.initLicense(); @@ -268,8 +125,7 @@ export class Worker extends BaseCommand { this.logger.debug('External secrets init complete'); await this.initEventBus(); this.logger.debug('Event bus init complete'); - await this.initQueue(); - this.logger.debug('Queue init complete'); + await this.initScalingService(); await this.initOrchestration(); this.logger.debug('Orchestration init complete'); @@ -301,80 +157,27 @@ export class Worker extends BaseCommand { await Container.get(OrchestrationHandlerWorkerService).initWithOptions({ queueModeId: this.queueModeId, redisPublisher: Container.get(OrchestrationWorkerService).redisPublisher, - getRunningJobIds: () => Object.keys(Worker.runningJobs), - getRunningJobsSummary: () => Object.values(Worker.runningJobsSummary), + getRunningJobIds: () => this.jobProcessor.getRunningJobIds(), + getRunningJobsSummary: () => this.jobProcessor.getRunningJobsSummary(), }); } - async initQueue() { + async setConcurrency() { const { flags } = await this.parse(Worker); - const redisConnectionTimeoutLimit = config.getEnv('queue.bull.redis.timeoutThreshold'); - - this.logger.debug( - `Opening Redis connection to listen to messages with timeout ${redisConnectionTimeoutLimit}`, - ); - - Worker.jobQueue = Container.get(Queue); - await Worker.jobQueue.init(); - this.logger.debug('Queue singleton ready'); - const envConcurrency = config.getEnv('executions.concurrency.productionLimit'); - const concurrency = envConcurrency !== -1 ? envConcurrency : flags.concurrency; - Worker.jobQueue.setConcurrency(concurrency); - void Worker.jobQueue.process(async (job) => await this.runJob(job, this.nodeTypes)); + this.concurrency = envConcurrency !== -1 ? envConcurrency : flags.concurrency; + } - Worker.jobQueue.getBullObjectInstance().on('global:progress', (jobId: JobId, progress) => { - // Progress of a job got updated which does get used - // to communicate that a job got canceled. + async initScalingService() { + this.scalingService = Container.get(ScalingService); - if (progress === -1) { - // Job has to get canceled - if (Worker.runningJobs[jobId] !== undefined) { - // Job is processed by current worker so cancel - Worker.runningJobs[jobId].cancel(); - delete Worker.runningJobs[jobId]; - } - } - }); + await this.scalingService.setupQueue(); - let lastTimer = 0; - let cumulativeTimeout = 0; - Worker.jobQueue.getBullObjectInstance().on('error', (error: Error) => { - if (error.toString().includes('ECONNREFUSED')) { - const now = Date.now(); - if (now - lastTimer > 30000) { - // Means we had no timeout at all or last timeout was temporary and we recovered - lastTimer = now; - cumulativeTimeout = 0; - } else { - cumulativeTimeout += now - lastTimer; - lastTimer = now; - if (cumulativeTimeout > redisConnectionTimeoutLimit) { - this.logger.error( - `Unable to connect to Redis after ${redisConnectionTimeoutLimit}. Exiting process.`, - ); - process.exit(1); - } - } - this.logger.warn('Redis unavailable - trying to reconnect...'); - } else if (error.toString().includes('Error initializing Lua scripts')) { - // This is a non-recoverable error - // Happens when worker starts and Redis is unavailable - // Even if Redis comes back online, worker will be zombie - this.logger.error('Error initializing worker.'); - process.exit(2); - } else { - this.logger.error('Error from queue: ', error); - - if (error.message.includes('job stalled more than maxStalledCount')) { - throw new MaxStalledCountError(error); - } + this.scalingService.setupWorker(this.concurrency); - throw error; - } - }); + this.jobProcessor = Container.get(JobProcessor); } async setupHealthMonitor() { @@ -410,7 +213,7 @@ export class Worker extends BaseCommand { // if it loses the connection to redis try { // Redis ping - await Worker.jobQueue.ping(); + await this.scalingService.pingQueue(); } catch (e) { this.logger.error('No Redis connection!', e as Error); const error = new ServiceUnavailableError('No Redis connection!'); @@ -476,18 +279,16 @@ export class Worker extends BaseCommand { } async run() { - const { flags } = await this.parse(Worker); - this.logger.info('\nn8n worker is now ready'); this.logger.info(` * Version: ${N8N_VERSION}`); - this.logger.info(` * Concurrency: ${flags.concurrency}`); + this.logger.info(` * Concurrency: ${this.concurrency}`); this.logger.info(''); if (config.getEnv('queue.health.active')) { await this.setupHealthMonitor(); } - if (process.stdout.isTTY) { + if (!inTest && process.stdout.isTTY) { process.stdin.setRawMode(true); process.stdin.resume(); process.stdin.setEncoding('utf8'); diff --git a/packages/cli/src/errors/max-stalled-count.error.ts b/packages/cli/src/errors/max-stalled-count.error.ts index 6715de0ade837..653ca18eacac7 100644 --- a/packages/cli/src/errors/max-stalled-count.error.ts +++ b/packages/cli/src/errors/max-stalled-count.error.ts @@ -1,7 +1,7 @@ import { ApplicationError } from 'n8n-workflow'; /** - * See https://github.com/OptimalBits/bull/blob/60fa88f08637f0325639988a3f054880a04ce402/docs/README.md?plain=1#L133 + * @docs https://docs.bullmq.io/guide/workers/stalled-jobs */ export class MaxStalledCountError extends ApplicationError { constructor(cause: Error) { diff --git a/packages/cli/src/executions/__tests__/execution.service.test.ts b/packages/cli/src/executions/__tests__/execution.service.test.ts index 04c266a4d071e..879ee707cdc8d 100644 --- a/packages/cli/src/executions/__tests__/execution.service.test.ts +++ b/packages/cli/src/executions/__tests__/execution.service.test.ts @@ -6,14 +6,15 @@ import { AbortedExecutionRetryError } from '@/errors/aborted-execution-retry.err import { MissingExecutionStopError } from '@/errors/missing-execution-stop.error'; import type { ActiveExecutions } from '@/ActiveExecutions'; import type { IExecutionResponse } from '@/Interfaces'; -import type { Job, Queue } from '@/Queue'; +import type { ScalingService } from '@/scaling/scaling.service'; import type { WaitTracker } from '@/WaitTracker'; import type { ExecutionRepository } from '@/databases/repositories/execution.repository'; import type { ExecutionRequest } from '@/executions/execution.types'; import type { ConcurrencyControlService } from '@/concurrency/concurrency-control.service'; +import type { Job } from '@/scaling/types'; describe('ExecutionService', () => { - const queue = mock(); + const scalingService = mock(); const activeExecutions = mock(); const executionRepository = mock(); const waitTracker = mock(); @@ -22,7 +23,7 @@ describe('ExecutionService', () => { const executionService = new ExecutionService( mock(), mock(), - queue, + scalingService, activeExecutions, executionRepository, mock(), @@ -211,7 +212,7 @@ describe('ExecutionService', () => { expect(concurrencyControl.remove).not.toHaveBeenCalled(); expect(waitTracker.stopExecution).not.toHaveBeenCalled(); - expect(queue.stopJob).not.toHaveBeenCalled(); + expect(scalingService.stopJob).not.toHaveBeenCalled(); }); }); @@ -224,7 +225,8 @@ describe('ExecutionService', () => { const execution = mock({ id: '123', status: 'running' }); executionRepository.findSingleExecution.mockResolvedValue(execution); waitTracker.has.mockReturnValue(false); - queue.findRunningJobBy.mockResolvedValue(mock()); + const job = mock({ data: { executionId: '123' } }); + scalingService.findJobsByStatus.mockResolvedValue([job]); executionRepository.stopDuringRun.mockResolvedValue(mock()); /** @@ -237,8 +239,8 @@ describe('ExecutionService', () => { */ expect(waitTracker.stopExecution).not.toHaveBeenCalled(); expect(activeExecutions.stopExecution).toHaveBeenCalled(); - expect(queue.findRunningJobBy).toBeCalledWith({ executionId: execution.id }); - expect(queue.stopJob).toHaveBeenCalled(); + expect(scalingService.findJobsByStatus).toHaveBeenCalled(); + expect(scalingService.stopJob).toHaveBeenCalled(); expect(executionRepository.stopDuringRun).toHaveBeenCalled(); }); @@ -250,7 +252,8 @@ describe('ExecutionService', () => { const execution = mock({ id: '123', status: 'waiting' }); executionRepository.findSingleExecution.mockResolvedValue(execution); waitTracker.has.mockReturnValue(true); - queue.findRunningJobBy.mockResolvedValue(mock()); + const job = mock({ data: { executionId: '123' } }); + scalingService.findJobsByStatus.mockResolvedValue([job]); executionRepository.stopDuringRun.mockResolvedValue(mock()); /** @@ -262,9 +265,8 @@ describe('ExecutionService', () => { * Assert */ expect(waitTracker.stopExecution).toHaveBeenCalledWith(execution.id); - expect(activeExecutions.stopExecution).toHaveBeenCalled(); - expect(queue.findRunningJobBy).toBeCalledWith({ executionId: execution.id }); - expect(queue.stopJob).toHaveBeenCalled(); + expect(scalingService.findJobsByStatus).toHaveBeenCalled(); + expect(scalingService.stopJob).toHaveBeenCalled(); expect(executionRepository.stopDuringRun).toHaveBeenCalled(); }); }); diff --git a/packages/cli/src/executions/execution-recovery.service.ts b/packages/cli/src/executions/execution-recovery.service.ts index 435145545ab16..d691df1b3ce36 100644 --- a/packages/cli/src/executions/execution-recovery.service.ts +++ b/packages/cli/src/executions/execution-recovery.service.ts @@ -135,9 +135,11 @@ export class ExecutionRecoveryService { return waitMs; } - const { Queue } = await import('@/Queue'); + const { ScalingService } = await import('@/scaling/scaling.service'); - const queuedIds = await Container.get(Queue).getInProgressExecutionIds(); + const runningJobs = await Container.get(ScalingService).findJobsByStatus(['active', 'waiting']); + + const queuedIds = new Set(runningJobs.map((job) => job.data.executionId)); if (queuedIds.size === 0) { this.logger.debug('[Recovery] Completed queue recovery check, no dangling executions'); diff --git a/packages/cli/src/executions/execution.service.ts b/packages/cli/src/executions/execution.service.ts index 0823bde36fe7a..849d06c968c7e 100644 --- a/packages/cli/src/executions/execution.service.ts +++ b/packages/cli/src/executions/execution.service.ts @@ -24,7 +24,7 @@ import type { IWorkflowExecutionDataProcess, } from '@/Interfaces'; import { NodeTypes } from '@/NodeTypes'; -import { Queue } from '@/Queue'; +import { ScalingService } from '@/scaling/scaling.service'; import type { ExecutionRequest, ExecutionSummaries, StopResult } from './execution.types'; import { WorkflowRunner } from '@/WorkflowRunner'; import type { IGetExecutionsQueryFilter } from '@db/repositories/execution.repository'; @@ -85,7 +85,7 @@ export class ExecutionService { constructor( private readonly globalConfig: GlobalConfig, private readonly logger: Logger, - private readonly queue: Queue, + private readonly scalingService: ScalingService, private readonly activeExecutions: ActiveExecutions, private readonly executionRepository: ExecutionRepository, private readonly workflowRepository: WorkflowRepository, @@ -471,10 +471,12 @@ export class ExecutionService { this.waitTracker.stopExecution(execution.id); } - const job = await this.queue.findRunningJobBy({ executionId: execution.id }); + const jobs = await this.scalingService.findJobsByStatus(['active', 'waiting']); + + const job = jobs.find(({ data }) => data.executionId === execution.id); if (job) { - await this.queue.stopJob(job); + await this.scalingService.stopJob(job); } else { this.logger.debug('Job to stop not in queue', { executionId: execution.id }); } diff --git a/packages/cli/src/scaling/constants.ts b/packages/cli/src/scaling/constants.ts new file mode 100644 index 0000000000000..8ef5f716b17aa --- /dev/null +++ b/packages/cli/src/scaling/constants.ts @@ -0,0 +1,3 @@ +export const QUEUE_NAME = 'jobs'; + +export const JOB_TYPE_NAME = 'job'; diff --git a/packages/cli/src/scaling/job-processor.ts b/packages/cli/src/scaling/job-processor.ts new file mode 100644 index 0000000000000..6057a1937db76 --- /dev/null +++ b/packages/cli/src/scaling/job-processor.ts @@ -0,0 +1,182 @@ +import { Service } from 'typedi'; +import { BINARY_ENCODING, ApplicationError, Workflow } from 'n8n-workflow'; +import { WorkflowExecute } from 'n8n-core'; +import { Logger } from '@/Logger'; +import config from '@/config'; +import { ExecutionRepository } from '@/databases/repositories/execution.repository'; +import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; +import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'; +import { NodeTypes } from '@/NodeTypes'; +import type { ExecutionStatus, IExecuteResponsePromiseData, IRun } from 'n8n-workflow'; +import type { Job, JobId, JobResult, RunningJob, RunningJobSummary } from './types'; +import type PCancelable from 'p-cancelable'; + +/** + * Responsible for processing jobs from the queue, i.e. running enqueued executions. + */ +@Service() +export class JobProcessor { + private readonly runningJobs: { [jobId: JobId]: RunningJob } = {}; + + constructor( + private readonly logger: Logger, + private readonly executionRepository: ExecutionRepository, + private readonly workflowRepository: WorkflowRepository, + private readonly nodeTypes: NodeTypes, + ) {} + + async processJob(job: Job): Promise { + const { executionId, loadStaticData } = job.data; + + const execution = await this.executionRepository.findSingleExecution(executionId, { + includeData: true, + unflattenData: true, + }); + + if (!execution) { + this.logger.error('[JobProcessor] Failed to find execution data', { executionId }); + throw new ApplicationError('Failed to find execution data. Aborting execution.', { + extra: { executionId }, + }); + } + + const workflowId = execution.workflowData.id; + + this.logger.info(`[JobProcessor] Starting job ${job.id} (execution ${executionId})`); + + await this.executionRepository.updateStatus(executionId, 'running'); + + let { staticData } = execution.workflowData; + + if (loadStaticData) { + const workflowData = await this.workflowRepository.findOne({ + select: ['id', 'staticData'], + where: { id: workflowId }, + }); + + if (workflowData === null) { + this.logger.error('[JobProcessor] Failed to find workflow', { workflowId, executionId }); + throw new ApplicationError('Failed to find workflow', { extra: { workflowId } }); + } + + staticData = workflowData.staticData; + } + + const workflowSettings = execution.workflowData.settings ?? {}; + + let workflowTimeout = workflowSettings.executionTimeout ?? config.getEnv('executions.timeout'); + + let executionTimeoutTimestamp: number | undefined; + + if (workflowTimeout > 0) { + workflowTimeout = Math.min(workflowTimeout, config.getEnv('executions.maxTimeout')); + executionTimeoutTimestamp = Date.now() + workflowTimeout * 1000; + } + + const workflow = new Workflow({ + id: workflowId, + name: execution.workflowData.name, + nodes: execution.workflowData.nodes, + connections: execution.workflowData.connections, + active: execution.workflowData.active, + nodeTypes: this.nodeTypes, + staticData, + settings: execution.workflowData.settings, + }); + + const additionalData = await WorkflowExecuteAdditionalData.getBase( + undefined, + undefined, + executionTimeoutTimestamp, + ); + + additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerExecuter( + execution.mode, + job.data.executionId, + execution.workflowData, + { retryOf: execution.retryOf as string }, + ); + + additionalData.hooks.hookFunctions.sendResponse = [ + async (response: IExecuteResponsePromiseData): Promise => { + await job.progress({ + kind: 'respond-to-webhook', + executionId, + response: this.encodeWebhookResponse(response), + }); + }, + ]; + + additionalData.executionId = executionId; + + additionalData.setExecutionStatus = (status: ExecutionStatus) => { + // Can't set the status directly in the queued worker, but it will happen in InternalHook.onWorkflowPostExecute + this.logger.debug( + `[JobProcessor] Queued worker execution status for ${executionId} is "${status}"`, + ); + }; + + let workflowExecute: WorkflowExecute; + let workflowRun: PCancelable; + if (execution.data !== undefined) { + workflowExecute = new WorkflowExecute(additionalData, execution.mode, execution.data); + workflowRun = workflowExecute.processRunExecutionData(workflow); + } else { + // Execute all nodes + // Can execute without webhook so go on + workflowExecute = new WorkflowExecute(additionalData, execution.mode); + workflowRun = workflowExecute.run(workflow); + } + + const runningJob: RunningJob = { + run: workflowRun, + executionId, + workflowId: execution.workflowId, + workflowName: execution.workflowData.name, + mode: execution.mode, + startedAt: execution.startedAt, + retryOf: execution.retryOf ?? '', + status: execution.status, + }; + + this.runningJobs[job.id] = runningJob; + + await workflowRun; + + delete this.runningJobs[job.id]; + + this.logger.debug('[JobProcessor] Job finished running', { jobId: job.id, executionId }); + + /** + * @important Do NOT call `workflowExecuteAfter` hook here. + * It is being called from processSuccessExecution() already. + */ + + return { success: true }; + } + + stopJob(jobId: JobId) { + this.runningJobs[jobId]?.run.cancel(); + delete this.runningJobs[jobId]; + } + + getRunningJobIds(): JobId[] { + return Object.keys(this.runningJobs); + } + + getRunningJobsSummary(): RunningJobSummary[] { + return Object.values(this.runningJobs).map(({ run, ...summary }) => summary); + } + + private encodeWebhookResponse( + response: IExecuteResponsePromiseData, + ): IExecuteResponsePromiseData { + if (typeof response === 'object' && Buffer.isBuffer(response.body)) { + response.body = { + '__@N8nEncodedBuffer@__': response.body.toString(BINARY_ENCODING), + }; + } + + return response; + } +} diff --git a/packages/cli/src/scaling/scaling.service.ts b/packages/cli/src/scaling/scaling.service.ts new file mode 100644 index 0000000000000..c52a82e8a0da6 --- /dev/null +++ b/packages/cli/src/scaling/scaling.service.ts @@ -0,0 +1,213 @@ +import Container, { Service } from 'typedi'; +import { ApplicationError, BINARY_ENCODING } from 'n8n-workflow'; +import { ActiveExecutions } from '@/ActiveExecutions'; +import config from '@/config'; +import { Logger } from '@/Logger'; +import { MaxStalledCountError } from '@/errors/max-stalled-count.error'; +import { HIGHEST_SHUTDOWN_PRIORITY } from '@/constants'; +import { OnShutdown } from '@/decorators/OnShutdown'; +import { JOB_TYPE_NAME, QUEUE_NAME } from './constants'; +import { JobProcessor } from './job-processor'; +import type { JobQueue, Job, JobData, JobOptions, JobMessage, JobStatus, JobId } from './types'; +import type { IExecuteResponsePromiseData } from 'n8n-workflow'; + +@Service() +export class ScalingService { + private queue: JobQueue; + + private readonly instanceType = config.getEnv('generic.instanceType'); + + constructor( + private readonly logger: Logger, + private readonly activeExecutions: ActiveExecutions, + private readonly jobProcessor: JobProcessor, + ) {} + + // #region Lifecycle + + async setupQueue() { + const { default: BullQueue } = await import('bull'); + const { RedisClientService } = await import('@/services/redis/redis-client.service'); + const service = Container.get(RedisClientService); + + const bullPrefix = config.getEnv('queue.bull.prefix'); + const prefix = service.toValidPrefix(bullPrefix); + + this.queue = new BullQueue(QUEUE_NAME, { + prefix, + settings: config.get('queue.bull.settings'), + createClient: (type) => service.createClient({ type: `${type}(bull)` }), + }); + + this.registerListeners(); + + this.logger.debug('[ScalingService] Queue setup completed'); + } + + setupWorker(concurrency: number) { + this.assertWorker(); + + void this.queue.process( + JOB_TYPE_NAME, + concurrency, + async (job: Job) => await this.jobProcessor.processJob(job), + ); + + this.logger.debug('[ScalingService] Worker setup completed'); + } + + @OnShutdown(HIGHEST_SHUTDOWN_PRIORITY) + async pauseQueue() { + await this.queue.pause(true, true); + + this.logger.debug('[ScalingService] Queue paused'); + } + + async pingQueue() { + await this.queue.client.ping(); + } + + // #endregion + + // #region Jobs + + async addJob(jobData: JobData, jobOptions: JobOptions) { + const { executionId } = jobData; + + const job = await this.queue.add(JOB_TYPE_NAME, jobData, jobOptions); + + this.logger.info(`[ScalingService] Added job ${job.id} (execution ${executionId})`); + + return job; + } + + async getJob(jobId: JobId) { + return await this.queue.getJob(jobId); + } + + async findJobsByStatus(statuses: JobStatus[]) { + return await this.queue.getJobs(statuses); + } + + async stopJob(job: Job) { + const props = { jobId: job.id, executionId: job.data.executionId }; + + try { + if (await job.isActive()) { + await job.progress({ kind: 'abort-job' }); + this.logger.debug('[ScalingService] Stopped active job', props); + return true; + } + + await job.remove(); + this.logger.debug('[ScalingService] Stopped inactive job', props); + return true; + } catch (error: unknown) { + await job.progress({ kind: 'abort-job' }); + this.logger.error('[ScalingService] Failed to stop job', { ...props, error }); + return false; + } + } + + // #endregion + + // #region Listeners + + private registerListeners() { + this.queue.on('global:progress', (_jobId: JobId, msg: JobMessage) => { + if (msg.kind === 'respond-to-webhook') { + const { executionId, response } = msg; + this.activeExecutions.resolveResponsePromise( + executionId, + this.decodeWebhookResponse(response), + ); + } + }); + + this.queue.on('global:progress', (jobId: JobId, msg: JobMessage) => { + if (msg.kind === 'abort-job') { + this.jobProcessor.stopJob(jobId); + } + }); + + let latestAttemptTs = 0; + let cumulativeTimeoutMs = 0; + + const MAX_TIMEOUT_MS = config.getEnv('queue.bull.redis.timeoutThreshold'); + const RESET_LENGTH_MS = 30_000; + + this.queue.on('error', (error: Error) => { + this.logger.error('[ScalingService] Queue errored', { error }); + + /** + * On Redis connection failure, try to reconnect. On every failed attempt, + * increment a cumulative timeout - if this exceeds a limit, exit the + * process. Reset the cumulative timeout if >30s between retries. + */ + if (error.message.includes('ECONNREFUSED')) { + const nowTs = Date.now(); + if (nowTs - latestAttemptTs > RESET_LENGTH_MS) { + latestAttemptTs = nowTs; + cumulativeTimeoutMs = 0; + } else { + cumulativeTimeoutMs += nowTs - latestAttemptTs; + latestAttemptTs = nowTs; + if (cumulativeTimeoutMs > MAX_TIMEOUT_MS) { + this.logger.error('[ScalingService] Redis unavailable after max timeout'); + this.logger.error('[ScalingService] Exiting process...'); + process.exit(1); + } + } + + this.logger.warn('[ScalingService] Redis unavailable - retrying to connect...'); + return; + } + + if ( + this.instanceType === 'worker' && + error.message.includes('job stalled more than maxStalledCount') + ) { + throw new MaxStalledCountError(error); + } + + /** + * Non-recoverable error on worker start with Redis unavailable. + * Even if Redis recovers, worker will remain unable to process jobs. + */ + if ( + this.instanceType === 'worker' && + error.message.includes('Error initializing Lua scripts') + ) { + this.logger.error('[ScalingService] Fatal error initializing worker', { error }); + this.logger.error('[ScalingService] Exiting process...'); + process.exit(1); + } + + throw error; + }); + } + + // #endregion + + private decodeWebhookResponse( + response: IExecuteResponsePromiseData, + ): IExecuteResponsePromiseData { + if ( + typeof response === 'object' && + typeof response.body === 'object' && + response.body !== null && + '__@N8nEncodedBuffer@__' in response.body && + typeof response.body['__@N8nEncodedBuffer@__'] === 'string' + ) { + response.body = Buffer.from(response.body['__@N8nEncodedBuffer@__'], BINARY_ENCODING); + } + + return response; + } + + private assertWorker() { + if (this.instanceType === 'worker') return; + + throw new ApplicationError('This method must be called on a `worker` instance'); + } +} diff --git a/packages/cli/src/scaling/types.ts b/packages/cli/src/scaling/types.ts new file mode 100644 index 0000000000000..55d49b8c48487 --- /dev/null +++ b/packages/cli/src/scaling/types.ts @@ -0,0 +1,55 @@ +import type { + ExecutionError, + ExecutionStatus, + IExecuteResponsePromiseData, + IRun, + WorkflowExecuteMode as WorkflowExecutionMode, +} from 'n8n-workflow'; +import type Bull from 'bull'; +import type PCancelable from 'p-cancelable'; + +export type JobQueue = Bull.Queue; + +export type Job = Bull.Job; + +export type JobId = Job['id']; + +export type JobData = { + executionId: string; + loadStaticData: boolean; +}; + +export type JobResult = { + success: boolean; + error?: ExecutionError; +}; + +export type JobStatus = Bull.JobStatus; + +export type JobOptions = Bull.JobOptions; + +/** Message sent by worker to queue or by queue to worker. */ +export type JobMessage = RepondToWebhookMessage | AbortJobMessage; + +export type RepondToWebhookMessage = { + kind: 'respond-to-webhook'; + executionId: string; + response: IExecuteResponsePromiseData; +}; + +export type AbortJobMessage = { + kind: 'abort-job'; +}; + +export type RunningJob = { + executionId: string; + workflowId: string; + workflowName: string; + mode: WorkflowExecutionMode; + startedAt: Date; + retryOf: string; + status: ExecutionStatus; + run: PCancelable; +}; + +export type RunningJobSummary = Omit; diff --git a/packages/cli/src/services/orchestration/worker/types.ts b/packages/cli/src/services/orchestration/worker/types.ts index 351c56394a31c..84c515466e2c2 100644 --- a/packages/cli/src/services/orchestration/worker/types.ts +++ b/packages/cli/src/services/orchestration/worker/types.ts @@ -1,11 +1,12 @@ import type { ExecutionStatus, WorkflowExecuteMode } from 'n8n-workflow'; import type { RedisServicePubSubPublisher } from '../../redis/RedisServicePubSubPublisher'; +import type { RunningJobSummary } from '@/scaling/types'; export interface WorkerCommandReceivedHandlerOptions { queueModeId: string; redisPublisher: RedisServicePubSubPublisher; - getRunningJobIds: () => string[]; - getRunningJobsSummary: () => WorkerJobStatusSummary[]; + getRunningJobIds: () => Array; + getRunningJobsSummary: () => RunningJobSummary[]; } export interface WorkerJobStatusSummary { diff --git a/packages/cli/src/webhooks/WebhookHelpers.ts b/packages/cli/src/webhooks/WebhookHelpers.ts index 8a98c74047985..f090a7ea7e318 100644 --- a/packages/cli/src/webhooks/WebhookHelpers.ts +++ b/packages/cli/src/webhooks/WebhookHelpers.ts @@ -20,7 +20,6 @@ import type { IDataObject, IDeferredPromise, IExecuteData, - IExecuteResponsePromiseData, IHttpRequestMethods, IN8nHttpFullResponse, INode, @@ -190,18 +189,6 @@ export function getWorkflowWebhooks( return returnData; } -export function encodeWebhookResponse( - response: IExecuteResponsePromiseData, -): IExecuteResponsePromiseData { - if (typeof response === 'object' && Buffer.isBuffer(response.body)) { - response.body = { - '__@N8nEncodedBuffer@__': response.body.toString(BINARY_ENCODING), - }; - } - - return response; -} - const normalizeFormData = (values: Record) => { for (const key in values) { const value = values[key]; diff --git a/packages/cli/test/integration/commands/worker.cmd.test.ts b/packages/cli/test/integration/commands/worker.cmd.test.ts index 54c15d381de95..205a60307c643 100644 --- a/packages/cli/test/integration/commands/worker.cmd.test.ts +++ b/packages/cli/test/integration/commands/worker.cmd.test.ts @@ -1,5 +1,4 @@ import { BinaryDataService } from 'n8n-core'; -import { mock } from 'jest-mock-extended'; import { Worker } from '@/commands/worker'; import config from '@/config'; @@ -11,7 +10,7 @@ import { OrchestrationHandlerWorkerService } from '@/services/orchestration/work import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service'; import { License } from '@/License'; import { ExternalHooks } from '@/ExternalHooks'; -import { type JobQueue, Queue } from '@/Queue'; +import { ScalingService } from '@/scaling/scaling.service'; import { setupTestCommand } from '@test-integration/utils/testCommand'; import { mockInstance } from '../../shared/mocking'; @@ -28,12 +27,10 @@ const license = mockInstance(License); const messageEventBus = mockInstance(MessageEventBus); const logStreamingEventRelay = mockInstance(LogStreamingEventRelay); const orchestrationHandlerWorkerService = mockInstance(OrchestrationHandlerWorkerService); -const queue = mockInstance(Queue); +const scalingService = mockInstance(ScalingService); const orchestrationWorkerService = mockInstance(OrchestrationWorkerService); const command = setupTestCommand(Worker); -queue.getBullObjectInstance.mockReturnValue(mock({ on: jest.fn() })); - test('worker initializes all its components', async () => { const worker = await command.run(); @@ -45,9 +42,9 @@ test('worker initializes all its components', async () => { expect(externalHooks.init).toHaveBeenCalledTimes(1); expect(externalSecretsManager.init).toHaveBeenCalledTimes(1); expect(messageEventBus.initialize).toHaveBeenCalledTimes(1); + expect(scalingService.setupQueue).toHaveBeenCalledTimes(1); + expect(scalingService.setupWorker).toHaveBeenCalledTimes(1); expect(logStreamingEventRelay.init).toHaveBeenCalledTimes(1); - expect(queue.init).toHaveBeenCalledTimes(1); - expect(queue.process).toHaveBeenCalledTimes(1); expect(orchestrationWorkerService.init).toHaveBeenCalledTimes(1); expect(orchestrationHandlerWorkerService.initWithOptions).toHaveBeenCalledTimes(1); expect(messageEventBus.send).toHaveBeenCalledTimes(1);