From c920b4464dc9cedab4ecf7706e9b628cdc9f5216 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 15 Oct 2024 10:46:57 +0200 Subject: [PATCH 1/5] refactor(core): Move `queueModeId` as `hostId` to `InstanceSettings` --- packages/cli/src/abstract-server.ts | 6 +--- packages/cli/src/commands/base-command.ts | 13 --------- packages/cli/src/commands/start.ts | 4 +-- packages/cli/src/commands/webhook.ts | 10 ++----- packages/cli/src/commands/worker.ts | 8 +++--- packages/cli/src/config/schema.ts | 5 ---- .../worker-missing-encryption-key.error.ts | 2 +- .../__tests__/publisher.service.test.ts | 20 ++++++------- .../__tests__/subscriber.service.test.ts | 8 +++--- packages/cli/src/scaling/job-processor.ts | 7 +++-- .../src/scaling/pubsub/publisher.service.ts | 4 ++- .../cli/src/scaling/pubsub/pubsub-handler.ts | 3 +- .../src/scaling/pubsub/subscriber.service.ts | 6 ++-- packages/cli/src/scaling/scaling.service.ts | 2 +- .../cli/src/scaling/worker-status.service.ts | 9 ++++-- packages/cli/src/server.ts | 5 ++-- .../__tests__/orchestration.service.test.ts | 5 ---- .../cli/src/services/orchestration.service.ts | 6 +--- .../orchestration/main/multi-main-setup.ee.ts | 18 ++++++------ .../src/services/orchestration/main/types.ts | 2 +- .../services/orchestration/worker/types.ts | 2 +- packages/cli/src/webhooks/webhook-server.ts | 6 +--- .../integration/commands/worker.cmd.test.ts | 6 ++-- packages/core/package.json | 3 +- packages/core/src/InstanceSettings.ts | 21 +++++++++++++- pnpm-lock.yaml | 28 +++++++++++-------- 26 files changed, 100 insertions(+), 109 deletions(-) diff --git a/packages/cli/src/abstract-server.ts b/packages/cli/src/abstract-server.ts index 95ecaccdc5dff..4456470b865ef 100644 --- a/packages/cli/src/abstract-server.ts +++ b/packages/cli/src/abstract-server.ts @@ -5,7 +5,6 @@ import { engine as expressHandlebars } from 'express-handlebars'; import { readFile } from 'fs/promises'; import type { Server } from 'http'; import isbot from 'isbot'; -import type { InstanceType } from 'n8n-core'; import { Container, Service } from 'typedi'; import config from '@/config'; @@ -22,7 +21,6 @@ import { TestWebhooks } from '@/webhooks/test-webhooks'; import { WaitingWebhooks } from '@/webhooks/waiting-webhooks'; import { createWebhookHandlerFor } from '@/webhooks/webhook-request-handler'; -import { generateHostInstanceId } from './databases/utils/generators'; import { ServiceUnavailableError } from './errors/response-errors/service-unavailable.error'; @Service() @@ -61,7 +59,7 @@ export abstract class AbstractServer { readonly uniqueInstanceId: string; - constructor(instanceType: Exclude) { + constructor() { this.app = express(); this.app.disable('x-powered-by'); @@ -85,8 +83,6 @@ export abstract class AbstractServer { this.endpointWebhookTest = this.globalConfig.endpoints.webhookTest; this.endpointWebhookWaiting = this.globalConfig.endpoints.webhookWaiting; - this.uniqueInstanceId = generateHostInstanceId(instanceType); - this.logger = Container.get(Logger); } diff --git a/packages/cli/src/commands/base-command.ts b/packages/cli/src/commands/base-command.ts index f4d97a6a056d8..303b3cae3ed8d 100644 --- a/packages/cli/src/commands/base-command.ts +++ b/packages/cli/src/commands/base-command.ts @@ -19,7 +19,6 @@ import type { AbstractServer } from '@/abstract-server'; import config from '@/config'; import { LICENSE_FEATURES, inDevelopment, inTest } from '@/constants'; import * as CrashJournal from '@/crash-journal'; -import { generateHostInstanceId } from '@/databases/utils/generators'; import * as Db from '@/db'; import { getDataDeduplicationService } from '@/deduplication'; import { initErrorHandling } from '@/error-reporting'; @@ -45,8 +44,6 @@ export abstract class BaseCommand extends Command { protected instanceSettings: InstanceSettings = Container.get(InstanceSettings); - queueModeId: string; - protected server?: AbstractServer; protected shutdownService: ShutdownService = Container.get(ShutdownService); @@ -133,16 +130,6 @@ export abstract class BaseCommand extends Command { await Container.get(TelemetryEventRelay).init(); } - protected setInstanceQueueModeId() { - if (config.get('redis.queueModeId')) { - this.queueModeId = config.get('redis.queueModeId'); - return; - } - // eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion - this.queueModeId = generateHostInstanceId(this.instanceSettings.instanceType!); - config.set('redis.queueModeId', this.queueModeId); - } - protected async stopProcess() { // This needs to be overridden } diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 21e277a5dc9a0..49f7d8f9aeae7 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -72,7 +72,7 @@ export class Start extends BaseCommand { constructor(argv: string[], cmdConfig: Config) { super(argv, cmdConfig); - this.setInstanceQueueModeId(); + this.instanceSettings.setHostId(); } /** @@ -176,7 +176,7 @@ export class Start extends BaseCommand { if (config.getEnv('executions.mode') === 'queue') { const scopedLogger = this.logger.withScope('scaling'); scopedLogger.debug('Starting main instance in scaling mode'); - scopedLogger.debug(`Host ID: ${this.queueModeId}`); + scopedLogger.debug(`Host ID: ${this.instanceSettings.hostId}`); } const { flags } = await this.parse(Start); diff --git a/packages/cli/src/commands/webhook.ts b/packages/cli/src/commands/webhook.ts index 8c601c7ebcb97..b9eaf1f2440eb 100644 --- a/packages/cli/src/commands/webhook.ts +++ b/packages/cli/src/commands/webhook.ts @@ -26,10 +26,7 @@ export class Webhook extends BaseCommand { constructor(argv: string[], cmdConfig: Config) { super(argv, cmdConfig); - if (this.queueModeId) { - this.logger.debug(`Webhook Instance queue mode id: ${this.queueModeId}`); - } - this.setInstanceQueueModeId(); + this.instanceSettings.setHostId(); } /** @@ -71,8 +68,8 @@ export class Webhook extends BaseCommand { await this.initCrashJournal(); this.logger.debug('Crash journal initialized'); - this.logger.info('Initializing n8n webhook process'); - this.logger.debug(`Queue mode id: ${this.queueModeId}`); + this.logger.info('Starting n8n webhook process...'); + this.logger.debug(`Host ID: ${this.instanceSettings.hostId}`); await super.init(); @@ -100,7 +97,6 @@ export class Webhook extends BaseCommand { const { ScalingService } = await import('@/scaling/scaling.service'); await Container.get(ScalingService).setupQueue(); await this.server.start(); - this.logger.debug(`Webhook listener ID: ${this.server.uniqueInstanceId}`); this.logger.info('Webhook listener waiting for requests.'); // Make sure that the process does not close diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index c86cf0936fa00..f6cdd32312c48 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -69,7 +69,7 @@ export class Worker extends BaseCommand { this.logger = Container.get(Logger).withScope('scaling'); - this.setInstanceQueueModeId(); + this.instanceSettings.setHostId(); } async init() { @@ -84,7 +84,7 @@ export class Worker extends BaseCommand { await this.initCrashJournal(); this.logger.debug('Starting n8n worker...'); - this.logger.debug(`Host ID: ${this.queueModeId}`); + this.logger.debug(`Host ID: ${this.instanceSettings.hostId}`); await this.setConcurrency(); await super.init(); @@ -109,7 +109,7 @@ export class Worker extends BaseCommand { new EventMessageGeneric({ eventName: 'n8n.worker.started', payload: { - workerId: this.queueModeId, + workerId: this.instanceSettings.hostId, }, }), ); @@ -117,7 +117,7 @@ export class Worker extends BaseCommand { async initEventBus() { await Container.get(MessageEventBus).initialize({ - workerId: this.queueModeId, + workerId: this.instanceSettings.hostId, }); Container.get(LogStreamingEventRelay).init(); } diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts index 047df9341eb6c..d2bb5297d436d 100644 --- a/packages/cli/src/config/schema.ts +++ b/packages/cli/src/config/schema.ts @@ -491,11 +491,6 @@ export const schema = { default: 'n8n', env: 'N8N_REDIS_KEY_PREFIX', }, - queueModeId: { - doc: 'Unique ID for this n8n instance, is usually set automatically by n8n during startup', - format: String, - default: '', - }, }, /** diff --git a/packages/cli/src/errors/worker-missing-encryption-key.error.ts b/packages/cli/src/errors/worker-missing-encryption-key.error.ts index 88ec11877aa55..29b8dad929a82 100644 --- a/packages/cli/src/errors/worker-missing-encryption-key.error.ts +++ b/packages/cli/src/errors/worker-missing-encryption-key.error.ts @@ -7,7 +7,7 @@ export class WorkerMissingEncryptionKey extends ApplicationError { 'Failed to start worker because of missing encryption key.', 'Please set the `N8N_ENCRYPTION_KEY` env var when starting the worker.', 'See: https://docs.n8n.io/hosting/configuration/configuration-examples/encryption-key/', - ].join(''), + ].join(' '), { level: 'warning' }, ); } diff --git a/packages/cli/src/scaling/__tests__/publisher.service.test.ts b/packages/cli/src/scaling/__tests__/publisher.service.test.ts index af8ff9f0c174f..f77b6b5d5afbe 100644 --- a/packages/cli/src/scaling/__tests__/publisher.service.test.ts +++ b/packages/cli/src/scaling/__tests__/publisher.service.test.ts @@ -1,8 +1,8 @@ import type { Redis as SingleNodeClient } from 'ioredis'; import { mock } from 'jest-mock-extended'; +import type { InstanceSettings } from 'n8n-core'; import config from '@/config'; -import { generateNanoId } from '@/databases/utils/generators'; import type { RedisClientService } from '@/services/redis-client.service'; import { mockLogger } from '@test/mocking'; @@ -10,28 +10,26 @@ import { Publisher } from '../pubsub/publisher.service'; import type { PubSub } from '../pubsub/pubsub.types'; describe('Publisher', () => { - let queueModeId: string; - beforeEach(() => { config.set('executions.mode', 'queue'); - queueModeId = generateNanoId(); - config.set('redis.queueModeId', queueModeId); }); const client = mock(); const logger = mockLogger(); + const hostId = 'main-bnxa1riryKUNHtln'; + const instanceSettings = mock({ hostId }); const redisClientService = mock({ createClient: () => client }); describe('constructor', () => { it('should init Redis client in scaling mode', () => { - const publisher = new Publisher(logger, redisClientService); + const publisher = new Publisher(logger, redisClientService, instanceSettings); expect(publisher.getClient()).toEqual(client); }); it('should not init Redis client in regular mode', () => { config.set('executions.mode', 'regular'); - const publisher = new Publisher(logger, redisClientService); + const publisher = new Publisher(logger, redisClientService, instanceSettings); expect(publisher.getClient()).toBeUndefined(); }); @@ -39,7 +37,7 @@ describe('Publisher', () => { describe('shutdown', () => { it('should disconnect Redis client', () => { - const publisher = new Publisher(logger, redisClientService); + const publisher = new Publisher(logger, redisClientService, instanceSettings); publisher.shutdown(); expect(client.disconnect).toHaveBeenCalled(); }); @@ -47,21 +45,21 @@ describe('Publisher', () => { describe('publishCommand', () => { it('should publish command into `n8n.commands` pubsub channel', async () => { - const publisher = new Publisher(logger, redisClientService); + const publisher = new Publisher(logger, redisClientService, instanceSettings); const msg = mock({ command: 'reload-license' }); await publisher.publishCommand(msg); expect(client.publish).toHaveBeenCalledWith( 'n8n.commands', - JSON.stringify({ ...msg, senderId: queueModeId, selfSend: false, debounce: true }), + JSON.stringify({ ...msg, senderId: hostId, selfSend: false, debounce: true }), ); }); }); describe('publishWorkerResponse', () => { it('should publish worker response into `n8n.worker-response` pubsub channel', async () => { - const publisher = new Publisher(logger, redisClientService); + const publisher = new Publisher(logger, redisClientService, instanceSettings); const msg = mock({ response: 'response-to-get-worker-status', }); diff --git a/packages/cli/src/scaling/__tests__/subscriber.service.test.ts b/packages/cli/src/scaling/__tests__/subscriber.service.test.ts index 62834dba33e2e..4f97208b99fe9 100644 --- a/packages/cli/src/scaling/__tests__/subscriber.service.test.ts +++ b/packages/cli/src/scaling/__tests__/subscriber.service.test.ts @@ -17,14 +17,14 @@ describe('Subscriber', () => { describe('constructor', () => { it('should init Redis client in scaling mode', () => { - const subscriber = new Subscriber(mock(), redisClientService, mock()); + const subscriber = new Subscriber(mock(), redisClientService, mock(), mock()); expect(subscriber.getClient()).toEqual(client); }); it('should not init Redis client in regular mode', () => { config.set('executions.mode', 'regular'); - const subscriber = new Subscriber(mock(), redisClientService, mock()); + const subscriber = new Subscriber(mock(), redisClientService, mock(), mock()); expect(subscriber.getClient()).toBeUndefined(); }); @@ -32,7 +32,7 @@ describe('Subscriber', () => { describe('shutdown', () => { it('should disconnect Redis client', () => { - const subscriber = new Subscriber(mock(), redisClientService, mock()); + const subscriber = new Subscriber(mock(), redisClientService, mock(), mock()); subscriber.shutdown(); expect(client.disconnect).toHaveBeenCalled(); }); @@ -40,7 +40,7 @@ describe('Subscriber', () => { describe('subscribe', () => { it('should subscribe to pubsub channel', async () => { - const subscriber = new Subscriber(mock(), redisClientService, mock()); + const subscriber = new Subscriber(mock(), redisClientService, mock(), mock()); await subscriber.subscribe('n8n.commands'); diff --git a/packages/cli/src/scaling/job-processor.ts b/packages/cli/src/scaling/job-processor.ts index e11395002be75..1322beac27ff6 100644 --- a/packages/cli/src/scaling/job-processor.ts +++ b/packages/cli/src/scaling/job-processor.ts @@ -1,5 +1,5 @@ import type { RunningJobSummary } from '@n8n/api-types'; -import { WorkflowExecute } from 'n8n-core'; +import { InstanceSettings, WorkflowExecute } from 'n8n-core'; import { BINARY_ENCODING, ApplicationError, Workflow } from 'n8n-workflow'; import type { ExecutionStatus, IExecuteResponsePromiseData, IRun } from 'n8n-workflow'; import type PCancelable from 'p-cancelable'; @@ -33,6 +33,7 @@ export class JobProcessor { private readonly executionRepository: ExecutionRepository, private readonly workflowRepository: WorkflowRepository, private readonly nodeTypes: NodeTypes, + private readonly instanceSettings: InstanceSettings, ) { this.logger = this.logger.withScope('scaling'); } @@ -120,7 +121,7 @@ export class JobProcessor { kind: 'respond-to-webhook', executionId, response: this.encodeWebhookResponse(response), - workerId: config.getEnv('redis.queueModeId'), + workerId: this.instanceSettings.hostId, }; await job.progress(msg); @@ -173,7 +174,7 @@ export class JobProcessor { const msg: JobFinishedMessage = { kind: 'job-finished', executionId, - workerId: config.getEnv('redis.queueModeId'), + workerId: this.instanceSettings.hostId, }; await job.progress(msg); diff --git a/packages/cli/src/scaling/pubsub/publisher.service.ts b/packages/cli/src/scaling/pubsub/publisher.service.ts index cc25304e2c2dc..06a876accf5b2 100644 --- a/packages/cli/src/scaling/pubsub/publisher.service.ts +++ b/packages/cli/src/scaling/pubsub/publisher.service.ts @@ -1,4 +1,5 @@ import type { Redis as SingleNodeClient, Cluster as MultiNodeClient } from 'ioredis'; +import { InstanceSettings } from 'n8n-core'; import { Service } from 'typedi'; import config from '@/config'; @@ -20,6 +21,7 @@ export class Publisher { constructor( private readonly logger: Logger, private readonly redisClientService: RedisClientService, + private readonly instanceSettings: InstanceSettings, ) { // @TODO: Once this class is only ever initialized in scaling mode, throw in the next line instead. if (config.getEnv('executions.mode') !== 'queue') return; @@ -48,7 +50,7 @@ export class Publisher { 'n8n.commands', JSON.stringify({ ...msg, - senderId: config.getEnv('redis.queueModeId'), + senderId: this.instanceSettings.hostId, selfSend: SELF_SEND_COMMANDS.has(msg.command), debounce: !IMMEDIATE_COMMANDS.has(msg.command), }), diff --git a/packages/cli/src/scaling/pubsub/pubsub-handler.ts b/packages/cli/src/scaling/pubsub/pubsub-handler.ts index ca590dd2c2d92..deeed5b584dbb 100644 --- a/packages/cli/src/scaling/pubsub/pubsub-handler.ts +++ b/packages/cli/src/scaling/pubsub/pubsub-handler.ts @@ -3,7 +3,6 @@ import { ensureError } from 'n8n-workflow'; import { Service } from 'typedi'; import { ActiveWorkflowManager } from '@/active-workflow-manager'; -import config from '@/config'; import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; import { EventService } from '@/events/event.service'; @@ -49,7 +48,7 @@ export class PubSubHandler { ...this.commonHandlers, 'get-worker-status': async () => await this.publisher.publishWorkerResponse({ - senderId: config.getEnv('redis.queueModeId'), + senderId: this.instanceSettings.hostId, response: 'response-to-get-worker-status', payload: this.workerStatusService.generateStatus(), }), diff --git a/packages/cli/src/scaling/pubsub/subscriber.service.ts b/packages/cli/src/scaling/pubsub/subscriber.service.ts index 207c726370862..c2045215e0a91 100644 --- a/packages/cli/src/scaling/pubsub/subscriber.service.ts +++ b/packages/cli/src/scaling/pubsub/subscriber.service.ts @@ -1,5 +1,6 @@ import type { Redis as SingleNodeClient, Cluster as MultiNodeClient } from 'ioredis'; import debounce from 'lodash/debounce'; +import { InstanceSettings } from 'n8n-core'; import { jsonParse } from 'n8n-workflow'; import { Service } from 'typedi'; @@ -21,6 +22,7 @@ export class Subscriber { private readonly logger: Logger, private readonly redisClientService: RedisClientService, private readonly eventService: EventService, + private readonly instanceSettings: InstanceSettings, ) { // @TODO: Once this class is only ever initialized in scaling mode, throw in the next line instead. if (config.getEnv('executions.mode') !== 'queue') return; @@ -77,12 +79,12 @@ export class Subscriber { return null; } - const queueModeId = config.getEnv('redis.queueModeId'); + const { hostId } = this.instanceSettings; if ( 'command' in msg && !msg.selfSend && - (msg.senderId === queueModeId || (msg.targets && !msg.targets.includes(queueModeId))) + (msg.senderId === hostId || (msg.targets && !msg.targets.includes(hostId))) ) { return null; } diff --git a/packages/cli/src/scaling/scaling.service.ts b/packages/cli/src/scaling/scaling.service.ts index 5edf43eeac1f6..f965587263560 100644 --- a/packages/cli/src/scaling/scaling.service.ts +++ b/packages/cli/src/scaling/scaling.service.ts @@ -112,7 +112,7 @@ export class ScalingService { const msg: JobFailedMessage = { kind: 'job-failed', executionId, - workerId: config.getEnv('redis.queueModeId'), + workerId: this.instanceSettings.hostId, errorMsg: error.message, }; diff --git a/packages/cli/src/scaling/worker-status.service.ts b/packages/cli/src/scaling/worker-status.service.ts index 725cbb0ca7695..a50a1b8d2e001 100644 --- a/packages/cli/src/scaling/worker-status.service.ts +++ b/packages/cli/src/scaling/worker-status.service.ts @@ -1,19 +1,22 @@ import type { WorkerStatus } from '@n8n/api-types'; +import { InstanceSettings } from 'n8n-core'; import os from 'node:os'; import { Service } from 'typedi'; -import config from '@/config'; import { N8N_VERSION } from '@/constants'; import { JobProcessor } from './job-processor'; @Service() export class WorkerStatusService { - constructor(private readonly jobProcessor: JobProcessor) {} + constructor( + private readonly jobProcessor: JobProcessor, + private readonly instanceSettings: InstanceSettings, + ) {} generateStatus(): WorkerStatus { return { - senderId: config.getEnv('redis.queueModeId'), + senderId: this.instanceSettings.hostId, runningJobsSummary: this.jobProcessor.getRunningJobsSummary(), freeMem: os.freemem(), totalMem: os.totalmem(), diff --git a/packages/cli/src/server.ts b/packages/cli/src/server.ts index 00971d71a55cc..3cfd93054b5c9 100644 --- a/packages/cli/src/server.ts +++ b/packages/cli/src/server.ts @@ -79,8 +79,9 @@ export class Server extends AbstractServer { private readonly orchestrationService: OrchestrationService, private readonly postHogClient: PostHogClient, private readonly eventService: EventService, + private readonly instanceSettings: InstanceSettings, ) { - super('main'); + super(); this.testWebhooksEnabled = true; this.webhooksEnabled = !this.globalConfig.endpoints.disableProductionWebhooksOnMainProcess; @@ -97,7 +98,7 @@ export class Server extends AbstractServer { this.endpointPresetCredentials = this.globalConfig.credentials.overwrite.endpoint; await super.start(); - this.logger.debug(`Server ID: ${this.uniqueInstanceId}`); + this.logger.debug(`Server ID: ${this.instanceSettings.hostId}`); if (inDevelopment && process.env.N8N_DEV_RELOAD === 'true') { void this.loadNodesAndCredentials.setupHotReload(); diff --git a/packages/cli/src/services/__tests__/orchestration.service.test.ts b/packages/cli/src/services/__tests__/orchestration.service.test.ts index 6c66573047afa..0169462891a8a 100644 --- a/packages/cli/src/services/__tests__/orchestration.service.test.ts +++ b/packages/cli/src/services/__tests__/orchestration.service.test.ts @@ -23,15 +23,11 @@ redisClientService.createClient.mockReturnValue(mockRedisClient); const os = Container.get(OrchestrationService); mockInstance(ActiveWorkflowManager); -let queueModeId: string; - describe('Orchestration Service', () => { mockInstance(Push); mockInstance(ExternalSecretsManager); beforeAll(async () => { - queueModeId = config.get('redis.queueModeId'); - // @ts-expect-error readonly property instanceSettings.instanceType = 'main'; }); @@ -48,7 +44,6 @@ describe('Orchestration Service', () => { await os.init(); // @ts-expect-error Private field expect(os.publisher).toBeDefined(); - expect(queueModeId).toBeDefined(); }); describe('shouldAddWebhooks', () => { diff --git a/packages/cli/src/services/orchestration.service.ts b/packages/cli/src/services/orchestration.service.ts index a248a144eca4b..64dbd0ddae7fe 100644 --- a/packages/cli/src/services/orchestration.service.ts +++ b/packages/cli/src/services/orchestration.service.ts @@ -43,10 +43,6 @@ export class OrchestrationService { return !this.isMultiMainSetupEnabled; } - get instanceId() { - return config.getEnv('redis.queueModeId'); - } - sanityCheck() { return this.isInitialized && config.get('executions.mode') === 'queue'; } @@ -94,7 +90,7 @@ export class OrchestrationService { if (!this.sanityCheck()) return; this.logger.debug( - `[Instance ID ${this.instanceId}] Publishing command "${commandKey}"`, + `[Instance ID ${this.instanceSettings.hostId}] Publishing command "${commandKey}"`, payload, ); diff --git a/packages/cli/src/services/orchestration/main/multi-main-setup.ee.ts b/packages/cli/src/services/orchestration/main/multi-main-setup.ee.ts index 3b104cc1e3ecf..8fe922d7ce595 100644 --- a/packages/cli/src/services/orchestration/main/multi-main-setup.ee.ts +++ b/packages/cli/src/services/orchestration/main/multi-main-setup.ee.ts @@ -24,8 +24,8 @@ export class MultiMainSetup extends TypedEmitter { super(); } - get instanceId() { - return config.getEnv('redis.queueModeId'); + get hostId() { + return this.instanceSettings.hostId; } private leaderKey: string; @@ -57,16 +57,16 @@ export class MultiMainSetup extends TypedEmitter { private async checkLeader() { const leaderId = await this.publisher.get(this.leaderKey); - if (leaderId === this.instanceId) { - this.logger.debug(`[Instance ID ${this.instanceId}] Leader is this instance`); + if (leaderId === this.hostId) { + this.logger.debug(`[Instance ID ${this.hostId}] Leader is this instance`); await this.publisher.setExpiration(this.leaderKey, this.leaderKeyTtl); return; } - if (leaderId && leaderId !== this.instanceId) { - this.logger.debug(`[Instance ID ${this.instanceId}] Leader is other instance "${leaderId}"`); + if (leaderId && leaderId !== this.hostId) { + this.logger.debug(`[Instance ID ${this.hostId}] Leader is other instance "${leaderId}"`); if (this.instanceSettings.isLeader) { this.instanceSettings.markAsFollower(); @@ -81,7 +81,7 @@ export class MultiMainSetup extends TypedEmitter { if (!leaderId) { this.logger.debug( - `[Instance ID ${this.instanceId}] Leadership vacant, attempting to become leader...`, + `[Instance ID ${this.hostId}] Leadership vacant, attempting to become leader...`, ); this.instanceSettings.markAsFollower(); @@ -97,10 +97,10 @@ export class MultiMainSetup extends TypedEmitter { private async tryBecomeLeader() { // this can only succeed if leadership is currently vacant - const keySetSuccessfully = await this.publisher.setIfNotExists(this.leaderKey, this.instanceId); + const keySetSuccessfully = await this.publisher.setIfNotExists(this.leaderKey, this.hostId); if (keySetSuccessfully) { - this.logger.debug(`[Instance ID ${this.instanceId}] Leader is now this instance`); + this.logger.debug(`[Instance ID ${this.hostId}] Leader is now this instance`); this.instanceSettings.markAsLeader(); diff --git a/packages/cli/src/services/orchestration/main/types.ts b/packages/cli/src/services/orchestration/main/types.ts index 7388a55032fa5..461630d3963a3 100644 --- a/packages/cli/src/services/orchestration/main/types.ts +++ b/packages/cli/src/services/orchestration/main/types.ts @@ -1,6 +1,6 @@ import type { Publisher } from '@/scaling/pubsub/publisher.service'; export type MainResponseReceivedHandlerOptions = { - queueModeId: string; + hostId: string; publisher: Publisher; }; diff --git a/packages/cli/src/services/orchestration/worker/types.ts b/packages/cli/src/services/orchestration/worker/types.ts index d821a194b2b06..afe73622105e0 100644 --- a/packages/cli/src/services/orchestration/worker/types.ts +++ b/packages/cli/src/services/orchestration/worker/types.ts @@ -3,7 +3,7 @@ import type { RunningJobSummary } from '@n8n/api-types'; import type { Publisher } from '@/scaling/pubsub/publisher.service'; export interface WorkerCommandReceivedHandlerOptions { - queueModeId: string; + hostId: string; publisher: Publisher; getRunningJobIds: () => Array; getRunningJobsSummary: () => RunningJobSummary[]; diff --git a/packages/cli/src/webhooks/webhook-server.ts b/packages/cli/src/webhooks/webhook-server.ts index d54f39f2cf642..263375325b418 100644 --- a/packages/cli/src/webhooks/webhook-server.ts +++ b/packages/cli/src/webhooks/webhook-server.ts @@ -3,8 +3,4 @@ import { Service } from 'typedi'; import { AbstractServer } from '@/abstract-server'; @Service() -export class WebhookServer extends AbstractServer { - constructor() { - super('webhook'); - } -} +export class WebhookServer extends AbstractServer {} diff --git a/packages/cli/test/integration/commands/worker.cmd.test.ts b/packages/cli/test/integration/commands/worker.cmd.test.ts index 1840a9b17059b..8555b792e395a 100644 --- a/packages/cli/test/integration/commands/worker.cmd.test.ts +++ b/packages/cli/test/integration/commands/worker.cmd.test.ts @@ -41,10 +41,8 @@ const command = setupTestCommand(Worker); test('worker initializes all its components', async () => { config.set('executions.mode', 'regular'); // should be overridden - const worker = await command.run(); - expect(worker.queueModeId).toBeDefined(); - expect(worker.queueModeId).toContain('worker'); - expect(worker.queueModeId.length).toBeGreaterThan(15); + await command.run(); + expect(license.init).toHaveBeenCalledTimes(1); expect(binaryDataService.init).toHaveBeenCalledTimes(1); expect(externalHooks.init).toHaveBeenCalledTimes(1); diff --git a/packages/core/package.json b/packages/core/package.json index 95cf23efa676d..aec9b34891a0b 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -36,6 +36,7 @@ "@types/xml2js": "catalog:" }, "dependencies": { + "@langchain/core": "catalog:", "@n8n/client-oauth2": "workspace:*", "aws4": "1.11.0", "axios": "catalog:", @@ -45,10 +46,10 @@ "file-type": "16.5.4", "form-data": "catalog:", "lodash": "catalog:", - "@langchain/core": "catalog:", "luxon": "catalog:", "mime-types": "2.1.35", "n8n-workflow": "workspace:*", + "nanoid": "catalog:", "oauth-1.0a": "2.2.6", "p-cancelable": "2.1.1", "pretty-bytes": "5.6.0", diff --git a/packages/core/src/InstanceSettings.ts b/packages/core/src/InstanceSettings.ts index 17ccf15def546..7e831ac8d2fb7 100644 --- a/packages/core/src/InstanceSettings.ts +++ b/packages/core/src/InstanceSettings.ts @@ -1,9 +1,13 @@ import { createHash, randomBytes } from 'crypto'; import { existsSync, mkdirSync, readFileSync, writeFileSync } from 'fs'; -import { ApplicationError, jsonParse } from 'n8n-workflow'; +import { ApplicationError, jsonParse, ALPHABET } from 'n8n-workflow'; +import { customAlphabet } from 'nanoid'; +import { strict } from 'node:assert'; import path from 'path'; import { Service } from 'typedi'; +const nanoid = customAlphabet(ALPHABET, 16); + interface ReadOnlySettings { encryptionKey: string; } @@ -61,6 +65,21 @@ export class InstanceSettings { */ instanceRole: InstanceRole = 'unset'; + /** + * ID of this n8n instance in the context of scaling mode. + * + * @example 'main-bnxa1riryKUNHtln' + * @example 'worker-nDJR0FnSd2Vf6DB5' + * @example 'webhook-jxQ7AO8IzxEtfW1F' + */ + hostId = ''; + + setHostId() { + strict(this.hostId === '', 'Expected `hostId` to be unset'); + + this.hostId = `${this.instanceType}-${nanoid()}`; + } + get isLeader() { return this.instanceRole === 'leader'; } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 674809114822d..10528f347baac 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1112,6 +1112,9 @@ importers: n8n-workflow: specifier: workspace:* version: link:../workflow + nanoid: + specifier: 'catalog:' + version: 3.3.6 oauth-1.0a: specifier: 2.2.6 version: 2.2.6 @@ -2206,7 +2209,7 @@ packages: '@azure/core-http@3.0.4': resolution: {integrity: sha512-Fok9VVhMdxAFOtqiiAtg74fL0UJkt0z3D+ouUUxcRLzZNBioPRAMJFVxiWoJljYpXsRi4GDQHzQHDc9AiYaIUQ==} engines: {node: '>=14.0.0'} - deprecated: This package is no longer supported. Please migrate to use @azure/core-rest-pipeline + deprecated: deprecating as we migrated to core v2 '@azure/core-lro@2.4.0': resolution: {integrity: sha512-F65+rYkll1dpw3RGm8/SSiSj+/QkMeYDanzS/QKlM1dmuneVyXbO46C88V1MRHluLGdMP6qfD3vDRYALn0z0tQ==} @@ -5769,6 +5772,9 @@ packages: axios-retry@3.7.0: resolution: {integrity: sha512-ZTnCkJbRtfScvwiRnoVskFAfvU0UG3xNcsjwTR0mawSbIJoothxn67gKsMaNAFHRXJ1RmuLhmZBzvyXi3+9WyQ==} + axios@1.7.3: + resolution: {integrity: sha512-Ar7ND9pU99eJ9GpoGQKhKf58GpUOgnzuaB7ueNQ5BMi0p+LZ5oaEnfF999fAArcTIBwXTCHAmGcHOZJaWPq9Nw==} + axios@1.7.4: resolution: {integrity: sha512-DukmaFRnY6AzAALSH4J2M3k6PkaC+MfaAGdEERRWcC9q3/TWQwLpHR8ZRLKTdQ3aBDL64EdluRDjJqKw+BPZEw==} @@ -14925,7 +14931,7 @@ snapshots: '@n8n/localtunnel@3.0.0': dependencies: - axios: 1.7.7(debug@4.3.6) + axios: 1.7.3(debug@4.3.6) debug: 4.3.6(supports-color@8.1.1) transitivePeerDependencies: - supports-color @@ -17630,7 +17636,7 @@ snapshots: '@babel/runtime': 7.24.7 is-retry-allowed: 2.2.0 - axios@1.7.4: + axios@1.7.3(debug@4.3.6): dependencies: follow-redirects: 1.15.6(debug@4.3.6) form-data: 4.0.0 @@ -17638,7 +17644,7 @@ snapshots: transitivePeerDependencies: - debug - axios@1.7.7: + axios@1.7.4: dependencies: follow-redirects: 1.15.6(debug@4.3.6) form-data: 4.0.0 @@ -17646,7 +17652,7 @@ snapshots: transitivePeerDependencies: - debug - axios@1.7.7(debug@4.3.6): + axios@1.7.7: dependencies: follow-redirects: 1.15.6(debug@4.3.6) form-data: 4.0.0 @@ -19293,7 +19299,7 @@ snapshots: eslint-import-resolver-node@0.3.9: dependencies: - debug: 3.2.7(supports-color@8.1.1) + debug: 3.2.7(supports-color@5.5.0) is-core-module: 2.13.1 resolve: 1.22.8 transitivePeerDependencies: @@ -19318,7 +19324,7 @@ snapshots: eslint-module-utils@2.8.0(@typescript-eslint/parser@7.2.0(eslint@8.57.0)(typescript@5.6.2))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.6.1(@typescript-eslint/parser@7.2.0(eslint@8.57.0)(typescript@5.6.2))(eslint-plugin-import@2.29.1)(eslint@8.57.0))(eslint@8.57.0): dependencies: - debug: 3.2.7(supports-color@8.1.1) + debug: 3.2.7(supports-color@5.5.0) optionalDependencies: '@typescript-eslint/parser': 7.2.0(eslint@8.57.0)(typescript@5.6.2) eslint: 8.57.0 @@ -19338,7 +19344,7 @@ snapshots: array.prototype.findlastindex: 1.2.3 array.prototype.flat: 1.3.2 array.prototype.flatmap: 1.3.2 - debug: 3.2.7(supports-color@8.1.1) + debug: 3.2.7(supports-color@5.5.0) doctrine: 2.1.0 eslint: 8.57.0 eslint-import-resolver-node: 0.3.9 @@ -20136,7 +20142,7 @@ snapshots: array-parallel: 0.1.3 array-series: 0.1.5 cross-spawn: 4.0.2 - debug: 3.2.7(supports-color@8.1.1) + debug: 3.2.7(supports-color@5.5.0) transitivePeerDependencies: - supports-color @@ -23039,7 +23045,7 @@ snapshots: pdf-parse@1.1.1: dependencies: - debug: 3.2.7(supports-color@8.1.1) + debug: 3.2.7(supports-color@5.5.0) node-ensure: 0.0.0 transitivePeerDependencies: - supports-color @@ -23868,7 +23874,7 @@ snapshots: rhea@1.0.24: dependencies: - debug: 3.2.7(supports-color@8.1.1) + debug: 3.2.7(supports-color@5.5.0) transitivePeerDependencies: - supports-color From 29c4e8a277711f6d13418a09eed32b8d40e22d00 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 15 Oct 2024 12:03:17 +0200 Subject: [PATCH 2/5] Improve docline --- packages/core/src/InstanceSettings.ts | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/packages/core/src/InstanceSettings.ts b/packages/core/src/InstanceSettings.ts index 7e831ac8d2fb7..2aad0559eeb1a 100644 --- a/packages/core/src/InstanceSettings.ts +++ b/packages/core/src/InstanceSettings.ts @@ -44,6 +44,12 @@ export class InstanceSettings { private settings = this.loadOrCreate(); + /** + * Fixed ID of this n8n instance, for telemetry. + * Derived from encryption key. Do not confuse with `hostId`. + * + * @example '258fce876abf5ea60eb86a2e777e5e190ff8f3e36b5b37aafec6636c31d4d1f9' + */ readonly instanceId = this.generateInstanceId(); readonly instanceType: InstanceType; @@ -66,7 +72,8 @@ export class InstanceSettings { instanceRole: InstanceRole = 'unset'; /** - * ID of this n8n instance in the context of scaling mode. + * Transient ID of this n8n instance, for scaling mode. + * Reset on restart. Do not confuse with `instanceId`. * * @example 'main-bnxa1riryKUNHtln' * @example 'worker-nDJR0FnSd2Vf6DB5' From 4e5cc32479dd1bf61e50d42fd9dc59ed5dbd653a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 15 Oct 2024 12:21:36 +0200 Subject: [PATCH 3/5] Simplify further by removing `setHostId` --- packages/cli/src/commands/start.ts | 7 +------ packages/cli/src/commands/webhook.ts | 7 +------ packages/cli/src/commands/worker.ts | 2 -- packages/core/src/InstanceSettings.ts | 11 +++-------- 4 files changed, 5 insertions(+), 22 deletions(-) diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 49f7d8f9aeae7..a416878a1e175 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -1,6 +1,6 @@ /* eslint-disable @typescript-eslint/no-unsafe-call */ /* eslint-disable @typescript-eslint/no-unsafe-member-access */ -import { Flags, type Config } from '@oclif/core'; +import { Flags } from '@oclif/core'; import glob from 'fast-glob'; import { createReadStream, createWriteStream, existsSync } from 'fs'; import { mkdir } from 'fs/promises'; @@ -70,11 +70,6 @@ export class Start extends BaseCommand { override needsCommunityPackages = true; - constructor(argv: string[], cmdConfig: Config) { - super(argv, cmdConfig); - this.instanceSettings.setHostId(); - } - /** * Opens the UI in browser */ diff --git a/packages/cli/src/commands/webhook.ts b/packages/cli/src/commands/webhook.ts index b9eaf1f2440eb..43a9703087dcc 100644 --- a/packages/cli/src/commands/webhook.ts +++ b/packages/cli/src/commands/webhook.ts @@ -1,4 +1,4 @@ -import { Flags, type Config } from '@oclif/core'; +import { Flags } from '@oclif/core'; import { ApplicationError } from 'n8n-workflow'; import { Container } from 'typedi'; @@ -24,11 +24,6 @@ export class Webhook extends BaseCommand { override needsCommunityPackages = true; - constructor(argv: string[], cmdConfig: Config) { - super(argv, cmdConfig); - this.instanceSettings.setHostId(); - } - /** * Stops n8n in a graceful way. * Make for example sure that all the webhooks from third party services diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index f6cdd32312c48..24916d71783ac 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -68,8 +68,6 @@ export class Worker extends BaseCommand { super(argv, cmdConfig); this.logger = Container.get(Logger).withScope('scaling'); - - this.instanceSettings.setHostId(); } async init() { diff --git a/packages/core/src/InstanceSettings.ts b/packages/core/src/InstanceSettings.ts index 2aad0559eeb1a..4a050db121c0b 100644 --- a/packages/core/src/InstanceSettings.ts +++ b/packages/core/src/InstanceSettings.ts @@ -2,7 +2,6 @@ import { createHash, randomBytes } from 'crypto'; import { existsSync, mkdirSync, readFileSync, writeFileSync } from 'fs'; import { ApplicationError, jsonParse, ALPHABET } from 'n8n-workflow'; import { customAlphabet } from 'nanoid'; -import { strict } from 'node:assert'; import path from 'path'; import { Service } from 'typedi'; @@ -59,6 +58,8 @@ export class InstanceSettings { this.instanceType = ['webhook', 'worker'].includes(command) ? (command as InstanceType) : 'main'; + + this.hostId = `${this.instanceType}-${nanoid()}`; } /** @@ -79,13 +80,7 @@ export class InstanceSettings { * @example 'worker-nDJR0FnSd2Vf6DB5' * @example 'webhook-jxQ7AO8IzxEtfW1F' */ - hostId = ''; - - setHostId() { - strict(this.hostId === '', 'Expected `hostId` to be unset'); - - this.hostId = `${this.instanceType}-${nanoid()}`; - } + readonly hostId: string; get isLeader() { return this.instanceRole === 'leader'; From e6f038238b0e8ec4214b82b8f8f764ef5378456c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 15 Oct 2024 12:32:51 +0200 Subject: [PATCH 4/5] Add test --- packages/core/test/InstanceSettings.test.ts | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/packages/core/test/InstanceSettings.test.ts b/packages/core/test/InstanceSettings.test.ts index 64b6840f2fa21..7bc572b168c95 100644 --- a/packages/core/test/InstanceSettings.test.ts +++ b/packages/core/test/InstanceSettings.test.ts @@ -69,4 +69,19 @@ describe('InstanceSettings', () => { ); }); }); + + describe('constructor', () => { + it('should generate a `hostId`', () => { + const encryptionKey = 'test_key'; + process.env.N8N_ENCRYPTION_KEY = encryptionKey; + jest.spyOn(fs, 'existsSync').mockReturnValueOnce(true); + jest.spyOn(fs, 'readFileSync').mockReturnValueOnce(JSON.stringify({ encryptionKey })); + + const settings = new InstanceSettings(); + + const [instanceType, nanoid] = settings.hostId.split('-'); + expect(instanceType).toEqual('main'); + expect(nanoid).toHaveLength(16); // e.g. sDX6ZPc0bozv66zM + }); + }); }); From d86bf917f66ec928f8709bf563117401077fa2bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 15 Oct 2024 12:45:31 +0200 Subject: [PATCH 5/5] Remove getter --- .../orchestration/main/multi-main-setup.ee.ts | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/packages/cli/src/services/orchestration/main/multi-main-setup.ee.ts b/packages/cli/src/services/orchestration/main/multi-main-setup.ee.ts index 8fe922d7ce595..034a214765911 100644 --- a/packages/cli/src/services/orchestration/main/multi-main-setup.ee.ts +++ b/packages/cli/src/services/orchestration/main/multi-main-setup.ee.ts @@ -24,10 +24,6 @@ export class MultiMainSetup extends TypedEmitter { super(); } - get hostId() { - return this.instanceSettings.hostId; - } - private leaderKey: string; private readonly leaderKeyTtl = config.getEnv('multiMainSetup.ttl'); @@ -57,16 +53,18 @@ export class MultiMainSetup extends TypedEmitter { private async checkLeader() { const leaderId = await this.publisher.get(this.leaderKey); - if (leaderId === this.hostId) { - this.logger.debug(`[Instance ID ${this.hostId}] Leader is this instance`); + const { hostId } = this.instanceSettings; + + if (leaderId === hostId) { + this.logger.debug(`[Instance ID ${hostId}] Leader is this instance`); await this.publisher.setExpiration(this.leaderKey, this.leaderKeyTtl); return; } - if (leaderId && leaderId !== this.hostId) { - this.logger.debug(`[Instance ID ${this.hostId}] Leader is other instance "${leaderId}"`); + if (leaderId && leaderId !== hostId) { + this.logger.debug(`[Instance ID ${hostId}] Leader is other instance "${leaderId}"`); if (this.instanceSettings.isLeader) { this.instanceSettings.markAsFollower(); @@ -81,7 +79,7 @@ export class MultiMainSetup extends TypedEmitter { if (!leaderId) { this.logger.debug( - `[Instance ID ${this.hostId}] Leadership vacant, attempting to become leader...`, + `[Instance ID ${hostId}] Leadership vacant, attempting to become leader...`, ); this.instanceSettings.markAsFollower(); @@ -96,11 +94,13 @@ export class MultiMainSetup extends TypedEmitter { } private async tryBecomeLeader() { + const { hostId } = this.instanceSettings; + // this can only succeed if leadership is currently vacant - const keySetSuccessfully = await this.publisher.setIfNotExists(this.leaderKey, this.hostId); + const keySetSuccessfully = await this.publisher.setIfNotExists(this.leaderKey, hostId); if (keySetSuccessfully) { - this.logger.debug(`[Instance ID ${this.hostId}] Leader is now this instance`); + this.logger.debug(`[Instance ID ${hostId}] Leader is now this instance`); this.instanceSettings.markAsLeader();