Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): Add Job Summary to Worker response #7360

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions packages/cli/src/commands/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import { EventMessageGeneric } from '@/eventbus/EventMessageClasses/EventMessage
import { IConfig } from '@oclif/config';
import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service';
import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service';
import type { WorkerJobStatusSummary } from '../services/orchestration/worker/types';

export class Worker extends BaseCommand {
static description = '\nStarts a n8n worker';
Expand All @@ -56,6 +57,10 @@ export class Worker extends BaseCommand {
[key: string]: PCancelable<IRun>;
} = {};

static runningJobsSummary: {
[jobId: string]: WorkerJobStatusSummary;
} = {};

static jobQueue: JobQueue;

redisSubscriber: RedisServicePubSubSubscriber;
Expand Down Expand Up @@ -232,11 +237,22 @@ export class Worker extends BaseCommand {
}

Worker.runningJobs[job.id] = workflowRun;
Worker.runningJobsSummary[job.id] = {
jobId: job.id.toString(),
executionId,
workflowId: fullExecutionData.workflowId ?? '',
workflowName: fullExecutionData.workflowData.name,
mode: fullExecutionData.mode,
ivov marked this conversation as resolved.
Show resolved Hide resolved
startedAt: fullExecutionData.startedAt,
retryOf: fullExecutionData.retryOf ?? '',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
retryOf: fullExecutionData.retryOf ?? '',
retryOf: fullExecutionData.retryOf ?? null,

status: fullExecutionData.status,
};

// Wait till the execution is finished
await workflowRun;

delete Worker.runningJobs[job.id];
delete Worker.runningJobsSummary[job.id];

// do NOT call workflowExecuteAfter hook here, since it is being called from processSuccessExecution()
// already!
Expand Down Expand Up @@ -305,6 +321,7 @@ export class Worker extends BaseCommand {
instanceId: this.instanceId,
redisPublisher: Container.get(OrchestrationWorkerService).redisPublisher,
getRunningJobIds: () => Object.keys(Worker.runningJobs),
getRunningJobsSummary: () => Object.values(Worker.runningJobsSummary),
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import Container from 'typedi';
import type { WorkerCommandReceivedHandlerOptions } from './orchestration/worker/handleCommandMessageWorker';
import { RedisService } from './redis.service';
import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber';
import type { WorkerCommandReceivedHandlerOptions } from './orchestration/worker/types';

export abstract class OrchestrationHandlerService {
protected initialized = false;
Expand Down
7 changes: 7 additions & 0 deletions packages/cli/src/services/orchestration/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { LoggerProxy, jsonParse } from 'n8n-workflow';
import type { RedisServiceCommandObject } from '../redis/RedisServiceCommands';
import { COMMAND_REDIS_CHANNEL } from '../redis/RedisServiceHelper';
import * as os from 'os';

export interface RedisServiceCommandLastReceived {
[date: string]: Date;
Expand Down Expand Up @@ -31,3 +32,9 @@ export function debounceMessageReceiver(message: RedisServiceCommandObject, time
lastReceived[message.command] = now;
return true;
}

export function getOsCpuString(): string {
const cpus = os.cpus();
if (cpus.length === 0) return 'no CPU info';
return `${cpus.length}x ${cpus[0].model} - speed: ${cpus[0].speed}`;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ export async function handleWorkerResponseMessageMain(messageString: string) {
const workerResponse = jsonParse<RedisServiceWorkerResponseObject>(messageString);
if (workerResponse) {
// TODO: Handle worker response
LoggerProxy.debug('Received worker response', workerResponse);
LoggerProxy.debug(
`Received worker response ${workerResponse.command} from ${workerResponse.workerId}`,
);
}
return workerResponse;
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,13 @@
import { jsonParse, LoggerProxy } from 'n8n-workflow';
import type { RedisServiceCommandObject } from '@/services/redis/RedisServiceCommands';
import { COMMAND_REDIS_CHANNEL } from '@/services/redis/RedisServiceHelper';
import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher';
import * as os from 'os';
import Container from 'typedi';
import { License } from '@/License';
import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
import { debounceMessageReceiver } from '../helpers';

export interface WorkerCommandReceivedHandlerOptions {
queueModeId: string;
instanceId: string;
redisPublisher: RedisServicePubSubPublisher;
getRunningJobIds: () => string[];
}
import { debounceMessageReceiver, getOsCpuString } from '../helpers';
import type { WorkerCommandReceivedHandlerOptions } from './types';

export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHandlerOptions) {
return async (channel: string, messageString: string) => {
Expand Down Expand Up @@ -45,11 +38,12 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa
payload: {
workerId: options.queueModeId,
runningJobs: options.getRunningJobIds(),
runningJobsSummary: options.getRunningJobsSummary(),
freeMem: os.freemem(),
totalMem: os.totalmem(),
uptime: process.uptime(),
loadAvg: os.loadavg(),
cpus: os.cpus().map((cpu) => `${cpu.model} - speed: ${cpu.speed}`),
cpus: getOsCpuString(),
arch: os.arch(),
platform: os.platform(),
hostname: os.hostname(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Service } from 'typedi';
import { OrchestrationHandlerService } from '../../orchestration.handler.base.service';
import type { WorkerCommandReceivedHandlerOptions } from './handleCommandMessageWorker';
import { getWorkerCommandReceivedHandler } from './handleCommandMessageWorker';
import type { WorkerCommandReceivedHandlerOptions } from './types';

@Service()
export class OrchestrationHandlerWorkerService extends OrchestrationHandlerService {
Expand Down
21 changes: 21 additions & 0 deletions packages/cli/src/services/orchestration/worker/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import type { ExecutionStatus, WorkflowExecuteMode } from 'n8n-workflow';
import type { RedisServicePubSubPublisher } from '../../redis/RedisServicePubSubPublisher';

export interface WorkerCommandReceivedHandlerOptions {
queueModeId: string;
instanceId: string;
redisPublisher: RedisServicePubSubPublisher;
getRunningJobIds: () => string[];
getRunningJobsSummary: () => WorkerJobStatusSummary[];
}

export interface WorkerJobStatusSummary {
jobId: string;
executionId: string;
retryOf?: string;
startedAt: Date;
mode: WorkflowExecuteMode;
workflowName: string;
workflowId: string;
status: ExecutionStatus;
}
5 changes: 4 additions & 1 deletion packages/cli/src/services/redis/RedisServiceCommands.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import type { WorkerJobStatusSummary } from '../orchestration/worker/types';

export type RedisServiceCommand =
| 'getStatus'
| 'getId'
Expand Down Expand Up @@ -29,11 +31,12 @@ export type RedisServiceWorkerResponseObject = {
payload: {
workerId: string;
runningJobs: string[];
runningJobsSummary: WorkerJobStatusSummary[];
freeMem: number;
totalMem: number;
uptime: number;
loadAvg: number[];
cpus: string[];
cpus: string;
arch: string;
platform: NodeJS.Platform;
hostname: string;
Expand Down
Loading