From 0bd905c468b5434cb5959cb4c8a869ff5d706556 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Fri, 4 Oct 2024 09:53:24 +0200 Subject: [PATCH 1/6] refactor(core): Simplify worker pubsub message handler --- packages/cli/src/commands/worker.ts | 15 +- .../scaling/__tests__/pubsub-handler.test.ts | 132 +++++++++++++++ .../cli/src/scaling/pubsub/pubsub-handler.ts | 37 +++++ packages/cli/src/scaling/worker-status.ts | 43 +++++ .../worker/handle-command-message-worker.ts | 153 ------------------ .../orchestration.handler.worker.service.ts | 22 --- .../integration/commands/worker.cmd.test.ts | 3 - 7 files changed, 219 insertions(+), 186 deletions(-) create mode 100644 packages/cli/src/scaling/worker-status.ts delete mode 100644 packages/cli/src/services/orchestration/worker/handle-command-message-worker.ts delete mode 100644 packages/cli/src/services/orchestration/worker/orchestration.handler.worker.service.ts diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 8c1aabf74afcf..6345db6763795 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -8,10 +8,10 @@ import { EventMessageGeneric } from '@/eventbus/event-message-classes/event-mess import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; import { LogStreamingEventRelay } from '@/events/relays/log-streaming.event-relay'; import { JobProcessor } from '@/scaling/job-processor'; -import { Publisher } from '@/scaling/pubsub/publisher.service'; +import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler'; +import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import type { ScalingService } from '@/scaling/scaling.service'; import type { WorkerServerEndpointsConfig } from '@/scaling/worker-server'; -import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service'; import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service'; import { BaseCommand } from './base-command'; @@ -128,12 +128,11 @@ export class Worker extends BaseCommand { */ async initOrchestration() { await Container.get(OrchestrationWorkerService).init(); - await Container.get(OrchestrationHandlerWorkerService).initWithOptions({ - queueModeId: this.queueModeId, - publisher: Container.get(Publisher), - getRunningJobIds: () => this.jobProcessor.getRunningJobIds(), - getRunningJobsSummary: () => this.jobProcessor.getRunningJobsSummary(), - }); + + Container.get(PubSubHandler).init(); + const subscriber = Container.get(Subscriber); + await subscriber.subscribe('n8n.commands'); + subscriber.setCommandMessageHandler(); } async setConcurrency() { diff --git a/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts b/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts index c637b2faf9e2d..a90cfd6d52b0e 100644 --- a/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts +++ b/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts @@ -7,7 +7,9 @@ import type { ExternalSecretsManager } from '@/external-secrets/external-secrets import type { License } from '@/license'; import type { CommunityPackagesService } from '@/services/community-packages.service'; +import type { Publisher } from '../pubsub/publisher.service'; import { PubSubHandler } from '../pubsub/pubsub-handler'; +import type { WorkerStatus } from '../worker-status'; describe('PubSubHandler', () => { const eventService = new EventService(); @@ -15,6 +17,12 @@ describe('PubSubHandler', () => { const eventbus = mock(); const externalSecretsManager = mock(); const communityPackagesService = mock(); + const publisher = mock(); + const workerStatus = mock(); + + afterEach(() => { + eventService.removeAllListeners(); + }); describe('in webhook process', () => { const instanceSettings = mock({ instanceType: 'webhook' }); @@ -30,6 +38,8 @@ describe('PubSubHandler', () => { eventbus, externalSecretsManager, communityPackagesService, + publisher, + workerStatus, ).init(); expect(setupWebhookHandlersSpy).toHaveBeenCalled(); @@ -43,6 +53,8 @@ describe('PubSubHandler', () => { eventbus, externalSecretsManager, communityPackagesService, + publisher, + workerStatus, ).init(); eventService.emit('reload-license'); @@ -58,6 +70,8 @@ describe('PubSubHandler', () => { eventbus, externalSecretsManager, communityPackagesService, + publisher, + workerStatus, ).init(); eventService.emit('restart-event-bus'); @@ -73,6 +87,8 @@ describe('PubSubHandler', () => { eventbus, externalSecretsManager, communityPackagesService, + publisher, + workerStatus, ).init(); eventService.emit('reload-external-secrets-providers'); @@ -88,6 +104,8 @@ describe('PubSubHandler', () => { eventbus, externalSecretsManager, communityPackagesService, + publisher, + workerStatus, ).init(); eventService.emit('community-package-install', { @@ -109,6 +127,8 @@ describe('PubSubHandler', () => { eventbus, externalSecretsManager, communityPackagesService, + publisher, + workerStatus, ).init(); eventService.emit('community-package-update', { @@ -130,6 +150,8 @@ describe('PubSubHandler', () => { eventbus, externalSecretsManager, communityPackagesService, + publisher, + workerStatus, ).init(); eventService.emit('community-package-uninstall', { @@ -139,4 +161,114 @@ describe('PubSubHandler', () => { expect(communityPackagesService.removeNpmPackage).toHaveBeenCalledWith('test-package'); }); }); + + describe('in worker process', () => { + const instanceSettings = mock({ instanceType: 'worker' }); + + it('should set up handlers in worker process', () => { + // @ts-expect-error Spying on private method + const setupWorkerHandlersSpy = jest.spyOn(PubSubHandler.prototype, 'setupWorkerHandlers'); + + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatus, + ).init(); + + expect(setupWorkerHandlersSpy).toHaveBeenCalled(); + }); + + it('should reload license on `reload-license` event', () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatus, + ).init(); + + eventService.emit('reload-license'); + + expect(license.reload).toHaveBeenCalled(); + }); + + it('should restart event bus on `restart-event-bus` event', () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatus, + ).init(); + + eventService.emit('restart-event-bus'); + + expect(eventbus.restart).toHaveBeenCalled(); + }); + + it('should reload providers on `reload-external-secrets-providers` event', () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatus, + ).init(); + + eventService.emit('reload-external-secrets-providers'); + + expect(externalSecretsManager.reloadAllProviders).toHaveBeenCalled(); + }); + + it('should generate status on `get-worker-status` event', () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatus, + ).init(); + + eventService.emit('get-worker-status'); + + expect(workerStatus.generateStatus).toHaveBeenCalled(); + }); + + it('should get worker ID on `get-worker-id` event', () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatus, + ).init(); + + eventService.emit('get-worker-id'); + + expect(publisher.publishWorkerResponse).toHaveBeenCalledWith({ + workerId: expect.any(String), + command: 'get-worker-id', + }); + }); + }); }); diff --git a/packages/cli/src/scaling/pubsub/pubsub-handler.ts b/packages/cli/src/scaling/pubsub/pubsub-handler.ts index 8b7a91e4ddb09..c2168f44fab6e 100644 --- a/packages/cli/src/scaling/pubsub/pubsub-handler.ts +++ b/packages/cli/src/scaling/pubsub/pubsub-handler.ts @@ -1,13 +1,17 @@ import { InstanceSettings } from 'n8n-core'; import { Service } from 'typedi'; +import config from '@/config'; import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; import { EventService } from '@/events/event.service'; import type { PubSubEventMap } from '@/events/maps/pub-sub.event-map'; import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee'; import { License } from '@/license'; +import { Publisher } from '@/scaling/pubsub/publisher.service'; import { CommunityPackagesService } from '@/services/community-packages.service'; +import { WorkerStatus } from '../worker-status'; + /** * Responsible for handling events emitted from messages received via a pubsub channel. */ @@ -20,10 +24,13 @@ export class PubSubHandler { private readonly eventbus: MessageEventBus, private readonly externalSecretsManager: ExternalSecretsManager, private readonly communityPackagesService: CommunityPackagesService, + private readonly publisher: Publisher, + private readonly workerStatus: WorkerStatus, ) {} init() { if (this.instanceSettings.instanceType === 'webhook') this.setupWebhookHandlers(); + if (this.instanceSettings.instanceType === 'worker') this.setupWorkerHandlers(); } private setupHandlers( @@ -58,4 +65,34 @@ export class PubSubHandler { } // #endregion + + // #region Worker process + + private setupWorkerHandlers() { + this.setupHandlers({ + 'reload-license': async () => await this.license.reload(), + 'restart-event-bus': async () => await this.eventbus.restart(), + 'reload-external-secrets-providers': async () => + await this.externalSecretsManager.reloadAllProviders(), + 'community-package-install': async ({ packageName, packageVersion }) => + await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion), + 'community-package-update': async ({ packageName, packageVersion }) => + await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion), + 'community-package-uninstall': async ({ packageName }) => + await this.communityPackagesService.removeNpmPackage(packageName), + 'get-worker-status': async () => + await this.publisher.publishWorkerResponse({ + workerId: config.getEnv('redis.queueModeId'), + command: 'get-worker-status', + payload: this.workerStatus.generateStatus(), + }), + 'get-worker-id': async () => + await this.publisher.publishWorkerResponse({ + workerId: config.getEnv('redis.queueModeId'), + command: 'get-worker-id', + }), + }); + } + + // #endregion } diff --git a/packages/cli/src/scaling/worker-status.ts b/packages/cli/src/scaling/worker-status.ts new file mode 100644 index 0000000000000..cddccc7e1f47a --- /dev/null +++ b/packages/cli/src/scaling/worker-status.ts @@ -0,0 +1,43 @@ +import os from 'node:os'; +import { Service } from 'typedi'; + +import config from '@/config'; +import { N8N_VERSION } from '@/constants'; + +import { JobProcessor } from './job-processor'; + +@Service() +export class WorkerStatus { + constructor(private readonly jobProcessor: JobProcessor) {} + + generateStatus() { + return { + workerId: config.getEnv('redis.queueModeId'), + runningJobsSummary: this.jobProcessor.getRunningJobsSummary(), + freeMem: os.freemem(), + totalMem: os.totalmem(), + uptime: process.uptime(), + loadAvg: os.loadavg(), + cpus: this.getOsCpuString(), + arch: os.arch(), + platform: os.platform(), + hostname: os.hostname(), + interfaces: Object.values(os.networkInterfaces()).flatMap((interfaces) => + (interfaces ?? [])?.map((net) => ({ + family: net.family, + address: net.address, + internal: net.internal, + })), + ), + version: N8N_VERSION, + }; + } + + private getOsCpuString() { + const cpus = os.cpus(); + + if (cpus.length === 0) return 'no CPU info'; + + return `${cpus.length}x ${cpus[0].model} - speed: ${cpus[0].speed}`; + } +} diff --git a/packages/cli/src/services/orchestration/worker/handle-command-message-worker.ts b/packages/cli/src/services/orchestration/worker/handle-command-message-worker.ts deleted file mode 100644 index ae11ac96fed71..0000000000000 --- a/packages/cli/src/services/orchestration/worker/handle-command-message-worker.ts +++ /dev/null @@ -1,153 +0,0 @@ -import { jsonParse } from 'n8n-workflow'; -import os from 'node:os'; -import Container from 'typedi'; - -import { N8N_VERSION } from '@/constants'; -import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; -import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee'; -import { License } from '@/license'; -import { Logger } from '@/logging/logger.service'; -import { COMMAND_PUBSUB_CHANNEL } from '@/scaling/constants'; -import type { PubSub } from '@/scaling/pubsub/pubsub.types'; -import { CommunityPackagesService } from '@/services/community-packages.service'; - -import type { WorkerCommandReceivedHandlerOptions } from './types'; -import { debounceMessageReceiver, getOsCpuString } from '../helpers'; - -// eslint-disable-next-line complexity -export async function getWorkerCommandReceivedHandler( - messageString: string, - options: WorkerCommandReceivedHandlerOptions, -) { - if (!messageString) return; - - const logger = Container.get(Logger); - let message: PubSub.Command; - try { - message = jsonParse(messageString); - } catch { - logger.debug( - `Received invalid message via channel ${COMMAND_PUBSUB_CHANNEL}: "${messageString}"`, - ); - return; - } - if (message) { - logger.debug( - `RedisCommandHandler(worker): Received command message ${message.command} from ${message.senderId}`, - ); - if (message.targets && !message.targets.includes(options.queueModeId)) { - return; // early return if the message is not for this worker - } - switch (message.command) { - case 'get-worker-status': - if (!debounceMessageReceiver(message, 500)) return; - await options.publisher.publishWorkerResponse({ - workerId: options.queueModeId, - command: 'get-worker-status', - payload: { - workerId: options.queueModeId, - runningJobsSummary: options.getRunningJobsSummary(), - freeMem: os.freemem(), - totalMem: os.totalmem(), - uptime: process.uptime(), - loadAvg: os.loadavg(), - cpus: getOsCpuString(), - arch: os.arch(), - platform: os.platform(), - hostname: os.hostname(), - interfaces: Object.values(os.networkInterfaces()).flatMap((interfaces) => - (interfaces ?? [])?.map((net) => ({ - family: net.family, - address: net.address, - internal: net.internal, - })), - ), - version: N8N_VERSION, - }, - }); - break; - case 'get-worker-id': - if (!debounceMessageReceiver(message, 500)) return; - await options.publisher.publishWorkerResponse({ - workerId: options.queueModeId, - command: 'get-worker-id', - }); - break; - case 'restart-event-bus': - if (!debounceMessageReceiver(message, 500)) return; - try { - await Container.get(MessageEventBus).restart(); - await options.publisher.publishWorkerResponse({ - workerId: options.queueModeId, - command: 'restart-event-bus', - payload: { - result: 'success', - }, - }); - } catch (error) { - await options.publisher.publishWorkerResponse({ - workerId: options.queueModeId, - command: 'restart-event-bus', - payload: { - result: 'error', - error: (error as Error).message, - }, - }); - } - break; - case 'reload-external-secrets-providers': - if (!debounceMessageReceiver(message, 500)) return; - try { - await Container.get(ExternalSecretsManager).reloadAllProviders(); - await options.publisher.publishWorkerResponse({ - workerId: options.queueModeId, - command: 'reload-external-secrets-providers', - payload: { - result: 'success', - }, - }); - } catch (error) { - await options.publisher.publishWorkerResponse({ - workerId: options.queueModeId, - command: 'reload-external-secrets-providers', - payload: { - result: 'error', - error: (error as Error).message, - }, - }); - } - break; - case 'community-package-install': - case 'community-package-update': - case 'community-package-uninstall': - if (!debounceMessageReceiver(message, 500)) return; - const { packageName } = message.payload; - const communityPackagesService = Container.get(CommunityPackagesService); - if (message.command === 'community-package-uninstall') { - await communityPackagesService.removeNpmPackage(packageName); - } else { - await communityPackagesService.installOrUpdateNpmPackage( - packageName, - message.payload.packageVersion, - ); - } - break; - case 'reload-license': - if (!debounceMessageReceiver(message, 500)) return; - await Container.get(License).reload(); - break; - default: - if ( - message.command === 'relay-execution-lifecycle-event' || - message.command === 'clear-test-webhooks' - ) { - break; // meant only for main - } - - logger.debug( - `Received unknown command via channel ${COMMAND_PUBSUB_CHANNEL}: "${message.command}"`, - ); - break; - } - } -} diff --git a/packages/cli/src/services/orchestration/worker/orchestration.handler.worker.service.ts b/packages/cli/src/services/orchestration/worker/orchestration.handler.worker.service.ts deleted file mode 100644 index 06113d7344c53..0000000000000 --- a/packages/cli/src/services/orchestration/worker/orchestration.handler.worker.service.ts +++ /dev/null @@ -1,22 +0,0 @@ -import { Service } from 'typedi'; - -import { Subscriber } from '@/scaling/pubsub/subscriber.service'; - -import { getWorkerCommandReceivedHandler } from './handle-command-message-worker'; -import type { WorkerCommandReceivedHandlerOptions } from './types'; -import { OrchestrationHandlerService } from '../../orchestration.handler.base.service'; - -@Service() -export class OrchestrationHandlerWorkerService extends OrchestrationHandlerService { - constructor(private readonly subscriber: Subscriber) { - super(); - } - - async initSubscriber(options: WorkerCommandReceivedHandlerOptions) { - await this.subscriber.subscribe('n8n.commands'); - - this.subscriber.setMessageHandler('n8n.commands', async (message: string) => { - await getWorkerCommandReceivedHandler(message, options); - }); - } -} diff --git a/packages/cli/test/integration/commands/worker.cmd.test.ts b/packages/cli/test/integration/commands/worker.cmd.test.ts index 67d3e7bab182c..5e0503ac0f7cf 100644 --- a/packages/cli/test/integration/commands/worker.cmd.test.ts +++ b/packages/cli/test/integration/commands/worker.cmd.test.ts @@ -11,7 +11,6 @@ import { ExternalSecretsManager } from '@/external-secrets/external-secrets-mana import { License } from '@/license'; import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials'; import { ScalingService } from '@/scaling/scaling.service'; -import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service'; import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service'; import { setupTestCommand } from '@test-integration/utils/test-command'; @@ -26,7 +25,6 @@ const externalSecretsManager = mockInstance(ExternalSecretsManager); const license = mockInstance(License, { loadCertStr: async () => '' }); const messageEventBus = mockInstance(MessageEventBus); const logStreamingEventRelay = mockInstance(LogStreamingEventRelay); -const orchestrationHandlerWorkerService = mockInstance(OrchestrationHandlerWorkerService); const scalingService = mockInstance(ScalingService); const orchestrationWorkerService = mockInstance(OrchestrationWorkerService); @@ -47,6 +45,5 @@ test('worker initializes all its components', async () => { expect(scalingService.setupWorker).toHaveBeenCalledTimes(1); expect(logStreamingEventRelay.init).toHaveBeenCalledTimes(1); expect(orchestrationWorkerService.init).toHaveBeenCalledTimes(1); - expect(orchestrationHandlerWorkerService.initWithOptions).toHaveBeenCalledTimes(1); expect(messageEventBus.send).toHaveBeenCalledTimes(1); }); From d46b40591809662b63201e85a868d93c781428d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Fri, 4 Oct 2024 10:31:57 +0200 Subject: [PATCH 2/6] Fix order in webhook command --- packages/cli/src/commands/webhook.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/cli/src/commands/webhook.ts b/packages/cli/src/commands/webhook.ts index a0f9e10f8010a..5a5d656c8ca9d 100644 --- a/packages/cli/src/commands/webhook.ts +++ b/packages/cli/src/commands/webhook.ts @@ -112,10 +112,9 @@ export class Webhook extends BaseCommand { async initOrchestration() { await Container.get(OrchestrationWebhookService).init(); + Container.get(PubSubHandler).init(); const subscriber = Container.get(Subscriber); await subscriber.subscribe('n8n.commands'); subscriber.setCommandMessageHandler(); - - Container.get(PubSubHandler).init(); } } From 55463757cdaf1b7a0e36b7432f169ec9782f7ed7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Fri, 4 Oct 2024 14:49:02 +0200 Subject: [PATCH 3/6] Fix test --- packages/cli/test/integration/commands/worker.cmd.test.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/cli/test/integration/commands/worker.cmd.test.ts b/packages/cli/test/integration/commands/worker.cmd.test.ts index 72acc9cf68c01..585d64cfb47a7 100644 --- a/packages/cli/test/integration/commands/worker.cmd.test.ts +++ b/packages/cli/test/integration/commands/worker.cmd.test.ts @@ -11,6 +11,7 @@ import { ExternalSecretsManager } from '@/external-secrets/external-secrets-mana import { License } from '@/license'; import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials'; import { Publisher } from '@/scaling/pubsub/publisher.service'; +import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import { ScalingService } from '@/scaling/scaling.service'; import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service'; import { setupTestCommand } from '@test-integration/utils/test-command'; @@ -29,6 +30,7 @@ const logStreamingEventRelay = mockInstance(LogStreamingEventRelay); const scalingService = mockInstance(ScalingService); const orchestrationWorkerService = mockInstance(OrchestrationWorkerService); mockInstance(Publisher); +mockInstance(Subscriber); const command = setupTestCommand(Worker); From e6741b7e0a6baa667f125db09f18a01bb934a9b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Mon, 7 Oct 2024 15:24:32 +0200 Subject: [PATCH 4/6] Add exhaustive switch --- packages/cli/src/scaling/pubsub/pubsub-handler.ts | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/packages/cli/src/scaling/pubsub/pubsub-handler.ts b/packages/cli/src/scaling/pubsub/pubsub-handler.ts index c2168f44fab6e..17b85a2a811fb 100644 --- a/packages/cli/src/scaling/pubsub/pubsub-handler.ts +++ b/packages/cli/src/scaling/pubsub/pubsub-handler.ts @@ -1,4 +1,5 @@ import { InstanceSettings } from 'n8n-core'; +import { ApplicationError } from 'n8n-workflow'; import { Service } from 'typedi'; import config from '@/config'; @@ -29,8 +30,18 @@ export class PubSubHandler { ) {} init() { - if (this.instanceSettings.instanceType === 'webhook') this.setupWebhookHandlers(); - if (this.instanceSettings.instanceType === 'worker') this.setupWorkerHandlers(); + switch (this.instanceSettings.instanceType) { + case 'webhook': + this.setupWebhookHandlers(); + break; + case 'worker': + this.setupWorkerHandlers(); + break; + default: + throw new ApplicationError('PubSubHandler found unsupported process type', { + extra: { type: this.instanceSettings.instanceType }, + }); + } } private setupHandlers( From 989c2901a00078b2012f76f9fa3853fee0324d27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Mon, 7 Oct 2024 15:44:46 +0200 Subject: [PATCH 5/6] Make exhaustiveness check at type level --- packages/cli/src/scaling/pubsub/pubsub-handler.ts | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/packages/cli/src/scaling/pubsub/pubsub-handler.ts b/packages/cli/src/scaling/pubsub/pubsub-handler.ts index 17b85a2a811fb..981f65006e666 100644 --- a/packages/cli/src/scaling/pubsub/pubsub-handler.ts +++ b/packages/cli/src/scaling/pubsub/pubsub-handler.ts @@ -1,5 +1,4 @@ import { InstanceSettings } from 'n8n-core'; -import { ApplicationError } from 'n8n-workflow'; import { Service } from 'typedi'; import config from '@/config'; @@ -10,6 +9,7 @@ import { ExternalSecretsManager } from '@/external-secrets/external-secrets-mana import { License } from '@/license'; import { Publisher } from '@/scaling/pubsub/publisher.service'; import { CommunityPackagesService } from '@/services/community-packages.service'; +import { assertNever } from '@/utils'; import { WorkerStatus } from '../worker-status'; @@ -37,10 +37,11 @@ export class PubSubHandler { case 'worker': this.setupWorkerHandlers(); break; + case 'main': + // TODO + break; default: - throw new ApplicationError('PubSubHandler found unsupported process type', { - extra: { type: this.instanceSettings.instanceType }, - }); + assertNever(this.instanceSettings.instanceType); } } From b920fd6977d58c8e075ad1cbec4785e6d611a6d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Mon, 7 Oct 2024 15:52:28 +0200 Subject: [PATCH 6/6] Deduplicate --- .../scaling/__tests__/pubsub-handler.test.ts | 24 ++++- .../cli/src/scaling/pubsub/pubsub-handler.ts | 88 ++++++++----------- 2 files changed, 58 insertions(+), 54 deletions(-) diff --git a/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts b/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts index a90cfd6d52b0e..0cf8d5ef480a6 100644 --- a/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts +++ b/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts @@ -29,7 +29,7 @@ describe('PubSubHandler', () => { it('should set up handlers in webhook process', () => { // @ts-expect-error Spying on private method - const setupWebhookHandlersSpy = jest.spyOn(PubSubHandler.prototype, 'setupWebhookHandlers'); + const setupHandlersSpy = jest.spyOn(PubSubHandler.prototype, 'setupHandlers'); new PubSubHandler( eventService, @@ -42,7 +42,14 @@ describe('PubSubHandler', () => { workerStatus, ).init(); - expect(setupWebhookHandlersSpy).toHaveBeenCalled(); + expect(setupHandlersSpy).toHaveBeenCalledWith({ + 'reload-license': expect.any(Function), + 'restart-event-bus': expect.any(Function), + 'reload-external-secrets-providers': expect.any(Function), + 'community-package-install': expect.any(Function), + 'community-package-update': expect.any(Function), + 'community-package-uninstall': expect.any(Function), + }); }); it('should reload license on `reload-license` event', () => { @@ -167,7 +174,7 @@ describe('PubSubHandler', () => { it('should set up handlers in worker process', () => { // @ts-expect-error Spying on private method - const setupWorkerHandlersSpy = jest.spyOn(PubSubHandler.prototype, 'setupWorkerHandlers'); + const setupHandlersSpy = jest.spyOn(PubSubHandler.prototype, 'setupHandlers'); new PubSubHandler( eventService, @@ -180,7 +187,16 @@ describe('PubSubHandler', () => { workerStatus, ).init(); - expect(setupWorkerHandlersSpy).toHaveBeenCalled(); + expect(setupHandlersSpy).toHaveBeenCalledWith({ + 'reload-license': expect.any(Function), + 'restart-event-bus': expect.any(Function), + 'reload-external-secrets-providers': expect.any(Function), + 'community-package-install': expect.any(Function), + 'community-package-update': expect.any(Function), + 'community-package-uninstall': expect.any(Function), + 'get-worker-status': expect.any(Function), + 'get-worker-id': expect.any(Function), + }); }); it('should reload license on `reload-license` event', () => { diff --git a/packages/cli/src/scaling/pubsub/pubsub-handler.ts b/packages/cli/src/scaling/pubsub/pubsub-handler.ts index 981f65006e666..267cae6977dc2 100644 --- a/packages/cli/src/scaling/pubsub/pubsub-handler.ts +++ b/packages/cli/src/scaling/pubsub/pubsub-handler.ts @@ -32,10 +32,23 @@ export class PubSubHandler { init() { switch (this.instanceSettings.instanceType) { case 'webhook': - this.setupWebhookHandlers(); + this.setupHandlers(this.commonHandlers); break; case 'worker': - this.setupWorkerHandlers(); + this.setupHandlers({ + ...this.commonHandlers, + 'get-worker-status': async () => + await this.publisher.publishWorkerResponse({ + workerId: config.getEnv('redis.queueModeId'), + command: 'get-worker-status', + payload: this.workerStatus.generateStatus(), + }), + 'get-worker-id': async () => + await this.publisher.publishWorkerResponse({ + workerId: config.getEnv('redis.queueModeId'), + command: 'get-worker-id', + }), + }); break; case 'main': // TODO @@ -59,52 +72,27 @@ export class PubSubHandler { } } - // #region Webhook process - - private setupWebhookHandlers() { - this.setupHandlers({ - 'reload-license': async () => await this.license.reload(), - 'restart-event-bus': async () => await this.eventbus.restart(), - 'reload-external-secrets-providers': async () => - await this.externalSecretsManager.reloadAllProviders(), - 'community-package-install': async ({ packageName, packageVersion }) => - await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion), - 'community-package-update': async ({ packageName, packageVersion }) => - await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion), - 'community-package-uninstall': async ({ packageName }) => - await this.communityPackagesService.removeNpmPackage(packageName), - }); - } - - // #endregion - - // #region Worker process - - private setupWorkerHandlers() { - this.setupHandlers({ - 'reload-license': async () => await this.license.reload(), - 'restart-event-bus': async () => await this.eventbus.restart(), - 'reload-external-secrets-providers': async () => - await this.externalSecretsManager.reloadAllProviders(), - 'community-package-install': async ({ packageName, packageVersion }) => - await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion), - 'community-package-update': async ({ packageName, packageVersion }) => - await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion), - 'community-package-uninstall': async ({ packageName }) => - await this.communityPackagesService.removeNpmPackage(packageName), - 'get-worker-status': async () => - await this.publisher.publishWorkerResponse({ - workerId: config.getEnv('redis.queueModeId'), - command: 'get-worker-status', - payload: this.workerStatus.generateStatus(), - }), - 'get-worker-id': async () => - await this.publisher.publishWorkerResponse({ - workerId: config.getEnv('redis.queueModeId'), - command: 'get-worker-id', - }), - }); - } - - // #endregion + /** Handlers shared by webhook and worker processes. */ + private commonHandlers: { + [K in keyof Pick< + PubSubEventMap, + | 'reload-license' + | 'restart-event-bus' + | 'reload-external-secrets-providers' + | 'community-package-install' + | 'community-package-update' + | 'community-package-uninstall' + >]: (event: PubSubEventMap[K]) => Promise; + } = { + 'reload-license': async () => await this.license.reload(), + 'restart-event-bus': async () => await this.eventbus.restart(), + 'reload-external-secrets-providers': async () => + await this.externalSecretsManager.reloadAllProviders(), + 'community-package-install': async ({ packageName, packageVersion }) => + await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion), + 'community-package-update': async ({ packageName, packageVersion }) => + await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion), + 'community-package-uninstall': async ({ packageName }) => + await this.communityPackagesService.removeNpmPackage(packageName), + }; }