diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 0ea21c18b36e5..e26829d8bc12b 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -38,6 +38,7 @@ import { EventMessageGeneric } from '@/eventbus/EventMessageClasses/EventMessage import { IConfig } from '@oclif/config'; import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service'; import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service'; +import type { WorkerJobStatusSummary } from '../services/orchestration/worker/types'; export class Worker extends BaseCommand { static description = '\nStarts a n8n worker'; @@ -56,6 +57,10 @@ export class Worker extends BaseCommand { [key: string]: PCancelable; } = {}; + static runningJobsSummary: { + [jobId: string]: WorkerJobStatusSummary; + } = {}; + static jobQueue: JobQueue; redisSubscriber: RedisServicePubSubSubscriber; @@ -232,11 +237,22 @@ export class Worker extends BaseCommand { } Worker.runningJobs[job.id] = workflowRun; + Worker.runningJobsSummary[job.id] = { + jobId: job.id.toString(), + executionId, + workflowId: fullExecutionData.workflowId ?? '', + workflowName: fullExecutionData.workflowData.name, + mode: fullExecutionData.mode, + startedAt: fullExecutionData.startedAt, + retryOf: fullExecutionData.retryOf ?? '', + status: fullExecutionData.status, + }; // Wait till the execution is finished await workflowRun; delete Worker.runningJobs[job.id]; + delete Worker.runningJobsSummary[job.id]; // do NOT call workflowExecuteAfter hook here, since it is being called from processSuccessExecution() // already! @@ -305,6 +321,7 @@ export class Worker extends BaseCommand { instanceId: this.instanceId, redisPublisher: Container.get(OrchestrationWorkerService).redisPublisher, getRunningJobIds: () => Object.keys(Worker.runningJobs), + getRunningJobsSummary: () => Object.values(Worker.runningJobsSummary), }); } diff --git a/packages/cli/src/services/orchestration.handler.base.service.ts b/packages/cli/src/services/orchestration.handler.base.service.ts index 933ac0dd40612..6a706434e43bb 100644 --- a/packages/cli/src/services/orchestration.handler.base.service.ts +++ b/packages/cli/src/services/orchestration.handler.base.service.ts @@ -1,7 +1,7 @@ import Container from 'typedi'; -import type { WorkerCommandReceivedHandlerOptions } from './orchestration/worker/handleCommandMessageWorker'; import { RedisService } from './redis.service'; import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber'; +import type { WorkerCommandReceivedHandlerOptions } from './orchestration/worker/types'; export abstract class OrchestrationHandlerService { protected initialized = false; diff --git a/packages/cli/src/services/orchestration/helpers.ts b/packages/cli/src/services/orchestration/helpers.ts index 4928a6feb1b7d..0c8fe74ae3e08 100644 --- a/packages/cli/src/services/orchestration/helpers.ts +++ b/packages/cli/src/services/orchestration/helpers.ts @@ -1,6 +1,7 @@ import { LoggerProxy, jsonParse } from 'n8n-workflow'; import type { RedisServiceCommandObject } from '../redis/RedisServiceCommands'; import { COMMAND_REDIS_CHANNEL } from '../redis/RedisServiceHelper'; +import * as os from 'os'; export interface RedisServiceCommandLastReceived { [date: string]: Date; @@ -31,3 +32,9 @@ export function debounceMessageReceiver(message: RedisServiceCommandObject, time lastReceived[message.command] = now; return true; } + +export function getOsCpuString(): string { + const cpus = os.cpus(); + if (cpus.length === 0) return 'no CPU info'; + return `${cpus.length}x ${cpus[0].model} - speed: ${cpus[0].speed}`; +} diff --git a/packages/cli/src/services/orchestration/main/handleWorkerResponseMessageMain.ts b/packages/cli/src/services/orchestration/main/handleWorkerResponseMessageMain.ts index 2d5251dfb5c10..3096f90f4f050 100644 --- a/packages/cli/src/services/orchestration/main/handleWorkerResponseMessageMain.ts +++ b/packages/cli/src/services/orchestration/main/handleWorkerResponseMessageMain.ts @@ -5,7 +5,9 @@ export async function handleWorkerResponseMessageMain(messageString: string) { const workerResponse = jsonParse(messageString); if (workerResponse) { // TODO: Handle worker response - LoggerProxy.debug('Received worker response', workerResponse); + LoggerProxy.debug( + `Received worker response ${workerResponse.command} from ${workerResponse.workerId}`, + ); } return workerResponse; } diff --git a/packages/cli/src/services/orchestration/worker/handleCommandMessageWorker.ts b/packages/cli/src/services/orchestration/worker/handleCommandMessageWorker.ts index 9fa68311f674a..2b47ce8099dc5 100644 --- a/packages/cli/src/services/orchestration/worker/handleCommandMessageWorker.ts +++ b/packages/cli/src/services/orchestration/worker/handleCommandMessageWorker.ts @@ -1,20 +1,13 @@ import { jsonParse, LoggerProxy } from 'n8n-workflow'; import type { RedisServiceCommandObject } from '@/services/redis/RedisServiceCommands'; import { COMMAND_REDIS_CHANNEL } from '@/services/redis/RedisServiceHelper'; -import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher'; import * as os from 'os'; import Container from 'typedi'; import { License } from '@/License'; import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus'; import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee'; -import { debounceMessageReceiver } from '../helpers'; - -export interface WorkerCommandReceivedHandlerOptions { - queueModeId: string; - instanceId: string; - redisPublisher: RedisServicePubSubPublisher; - getRunningJobIds: () => string[]; -} +import { debounceMessageReceiver, getOsCpuString } from '../helpers'; +import type { WorkerCommandReceivedHandlerOptions } from './types'; export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHandlerOptions) { return async (channel: string, messageString: string) => { @@ -45,11 +38,12 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa payload: { workerId: options.queueModeId, runningJobs: options.getRunningJobIds(), + runningJobsSummary: options.getRunningJobsSummary(), freeMem: os.freemem(), totalMem: os.totalmem(), uptime: process.uptime(), loadAvg: os.loadavg(), - cpus: os.cpus().map((cpu) => `${cpu.model} - speed: ${cpu.speed}`), + cpus: getOsCpuString(), arch: os.arch(), platform: os.platform(), hostname: os.hostname(), diff --git a/packages/cli/src/services/orchestration/worker/orchestration.handler.worker.service.ts b/packages/cli/src/services/orchestration/worker/orchestration.handler.worker.service.ts index 111bdb97de9ba..cf866e06ecc14 100644 --- a/packages/cli/src/services/orchestration/worker/orchestration.handler.worker.service.ts +++ b/packages/cli/src/services/orchestration/worker/orchestration.handler.worker.service.ts @@ -1,7 +1,7 @@ import { Service } from 'typedi'; import { OrchestrationHandlerService } from '../../orchestration.handler.base.service'; -import type { WorkerCommandReceivedHandlerOptions } from './handleCommandMessageWorker'; import { getWorkerCommandReceivedHandler } from './handleCommandMessageWorker'; +import type { WorkerCommandReceivedHandlerOptions } from './types'; @Service() export class OrchestrationHandlerWorkerService extends OrchestrationHandlerService { diff --git a/packages/cli/src/services/orchestration/worker/types.ts b/packages/cli/src/services/orchestration/worker/types.ts new file mode 100644 index 0000000000000..d95a3c5da3065 --- /dev/null +++ b/packages/cli/src/services/orchestration/worker/types.ts @@ -0,0 +1,21 @@ +import type { ExecutionStatus, WorkflowExecuteMode } from 'n8n-workflow'; +import type { RedisServicePubSubPublisher } from '../../redis/RedisServicePubSubPublisher'; + +export interface WorkerCommandReceivedHandlerOptions { + queueModeId: string; + instanceId: string; + redisPublisher: RedisServicePubSubPublisher; + getRunningJobIds: () => string[]; + getRunningJobsSummary: () => WorkerJobStatusSummary[]; +} + +export interface WorkerJobStatusSummary { + jobId: string; + executionId: string; + retryOf?: string; + startedAt: Date; + mode: WorkflowExecuteMode; + workflowName: string; + workflowId: string; + status: ExecutionStatus; +} diff --git a/packages/cli/src/services/redis/RedisServiceCommands.ts b/packages/cli/src/services/redis/RedisServiceCommands.ts index 602b5646f51bd..8423c3a0f749f 100644 --- a/packages/cli/src/services/redis/RedisServiceCommands.ts +++ b/packages/cli/src/services/redis/RedisServiceCommands.ts @@ -1,3 +1,5 @@ +import type { WorkerJobStatusSummary } from '../orchestration/worker/types'; + export type RedisServiceCommand = | 'getStatus' | 'getId' @@ -29,11 +31,12 @@ export type RedisServiceWorkerResponseObject = { payload: { workerId: string; runningJobs: string[]; + runningJobsSummary: WorkerJobStatusSummary[]; freeMem: number; totalMem: number; uptime: number; loadAvg: number[]; - cpus: string[]; + cpus: string; arch: string; platform: NodeJS.Platform; hostname: string;