Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core): Simplify worker pubsub message handler #11086

Merged
merged 7 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions packages/cli/src/commands/webhook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,9 @@ export class Webhook extends BaseCommand {
async initOrchestration() {
await Container.get(OrchestrationWebhookService).init();

Container.get(PubSubHandler).init();
ivov marked this conversation as resolved.
Show resolved Hide resolved
const subscriber = Container.get(Subscriber);
await subscriber.subscribe('n8n.commands');
subscriber.setCommandMessageHandler();

Container.get(PubSubHandler).init();
}
}
15 changes: 7 additions & 8 deletions packages/cli/src/commands/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import { EventMessageGeneric } from '@/eventbus/event-message-classes/event-mess
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { LogStreamingEventRelay } from '@/events/relays/log-streaming.event-relay';
import { JobProcessor } from '@/scaling/job-processor';
import { Publisher } from '@/scaling/pubsub/publisher.service';
import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler';
import { Subscriber } from '@/scaling/pubsub/subscriber.service';
import type { ScalingService } from '@/scaling/scaling.service';
import type { WorkerServerEndpointsConfig } from '@/scaling/worker-server';
import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service';
import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service';

import { BaseCommand } from './base-command';
Expand Down Expand Up @@ -128,12 +128,11 @@ export class Worker extends BaseCommand {
*/
async initOrchestration() {
await Container.get(OrchestrationWorkerService).init();
await Container.get(OrchestrationHandlerWorkerService).initWithOptions({
queueModeId: this.queueModeId,
publisher: Container.get(Publisher),
getRunningJobIds: () => this.jobProcessor.getRunningJobIds(),
getRunningJobsSummary: () => this.jobProcessor.getRunningJobsSummary(),
});

Container.get(PubSubHandler).init();
const subscriber = Container.get(Subscriber);
await subscriber.subscribe('n8n.commands');
subscriber.setCommandMessageHandler();
}

async setConcurrency() {
Expand Down
152 changes: 150 additions & 2 deletions packages/cli/src/scaling/__tests__/pubsub-handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,29 @@ import type { ExternalSecretsManager } from '@/external-secrets/external-secrets
import type { License } from '@/license';
import type { CommunityPackagesService } from '@/services/community-packages.service';

import type { Publisher } from '../pubsub/publisher.service';
import { PubSubHandler } from '../pubsub/pubsub-handler';
import type { WorkerStatus } from '../worker-status';

describe('PubSubHandler', () => {
const eventService = new EventService();
const license = mock<License>();
const eventbus = mock<MessageEventBus>();
const externalSecretsManager = mock<ExternalSecretsManager>();
const communityPackagesService = mock<CommunityPackagesService>();
const publisher = mock<Publisher>();
const workerStatus = mock<WorkerStatus>();

afterEach(() => {
eventService.removeAllListeners();
});

describe('in webhook process', () => {
const instanceSettings = mock<InstanceSettings>({ instanceType: 'webhook' });

it('should set up handlers in webhook process', () => {
// @ts-expect-error Spying on private method
const setupWebhookHandlersSpy = jest.spyOn(PubSubHandler.prototype, 'setupWebhookHandlers');
const setupHandlersSpy = jest.spyOn(PubSubHandler.prototype, 'setupHandlers');

new PubSubHandler(
eventService,
Expand All @@ -30,9 +38,18 @@ describe('PubSubHandler', () => {
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();

expect(setupWebhookHandlersSpy).toHaveBeenCalled();
expect(setupHandlersSpy).toHaveBeenCalledWith({
'reload-license': expect.any(Function),
'restart-event-bus': expect.any(Function),
'reload-external-secrets-providers': expect.any(Function),
'community-package-install': expect.any(Function),
'community-package-update': expect.any(Function),
'community-package-uninstall': expect.any(Function),
});
});

it('should reload license on `reload-license` event', () => {
Expand All @@ -43,6 +60,8 @@ describe('PubSubHandler', () => {
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();

eventService.emit('reload-license');
Expand All @@ -58,6 +77,8 @@ describe('PubSubHandler', () => {
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();

eventService.emit('restart-event-bus');
Expand All @@ -73,6 +94,8 @@ describe('PubSubHandler', () => {
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();

eventService.emit('reload-external-secrets-providers');
Expand All @@ -88,6 +111,8 @@ describe('PubSubHandler', () => {
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();

eventService.emit('community-package-install', {
Expand All @@ -109,6 +134,8 @@ describe('PubSubHandler', () => {
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();

eventService.emit('community-package-update', {
Expand All @@ -130,6 +157,8 @@ describe('PubSubHandler', () => {
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();

eventService.emit('community-package-uninstall', {
Expand All @@ -139,4 +168,123 @@ describe('PubSubHandler', () => {
expect(communityPackagesService.removeNpmPackage).toHaveBeenCalledWith('test-package');
});
});

describe('in worker process', () => {
const instanceSettings = mock<InstanceSettings>({ instanceType: 'worker' });

it('should set up handlers in worker process', () => {
// @ts-expect-error Spying on private method
const setupHandlersSpy = jest.spyOn(PubSubHandler.prototype, 'setupHandlers');

new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();

expect(setupHandlersSpy).toHaveBeenCalledWith({
'reload-license': expect.any(Function),
'restart-event-bus': expect.any(Function),
'reload-external-secrets-providers': expect.any(Function),
'community-package-install': expect.any(Function),
'community-package-update': expect.any(Function),
'community-package-uninstall': expect.any(Function),
'get-worker-status': expect.any(Function),
'get-worker-id': expect.any(Function),
});
});

it('should reload license on `reload-license` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();

eventService.emit('reload-license');

expect(license.reload).toHaveBeenCalled();
});

it('should restart event bus on `restart-event-bus` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();

eventService.emit('restart-event-bus');

expect(eventbus.restart).toHaveBeenCalled();
});

it('should reload providers on `reload-external-secrets-providers` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();

eventService.emit('reload-external-secrets-providers');

expect(externalSecretsManager.reloadAllProviders).toHaveBeenCalled();
});

it('should generate status on `get-worker-status` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();

eventService.emit('get-worker-status');

expect(workerStatus.generateStatus).toHaveBeenCalled();
});

it('should get worker ID on `get-worker-id` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();

eventService.emit('get-worker-id');

expect(publisher.publishWorkerResponse).toHaveBeenCalledWith({
workerId: expect.any(String),
command: 'get-worker-id',
});
});
});
});
75 changes: 56 additions & 19 deletions packages/cli/src/scaling/pubsub/pubsub-handler.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import { InstanceSettings } from 'n8n-core';
import { Service } from 'typedi';

import config from '@/config';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { EventService } from '@/events/event.service';
import type { PubSubEventMap } from '@/events/maps/pub-sub.event-map';
import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee';
import { License } from '@/license';
import { Publisher } from '@/scaling/pubsub/publisher.service';
import { CommunityPackagesService } from '@/services/community-packages.service';
import { assertNever } from '@/utils';

import { WorkerStatus } from '../worker-status';

/**
* Responsible for handling events emitted from messages received via a pubsub channel.
Expand All @@ -20,10 +25,37 @@ export class PubSubHandler {
private readonly eventbus: MessageEventBus,
private readonly externalSecretsManager: ExternalSecretsManager,
private readonly communityPackagesService: CommunityPackagesService,
private readonly publisher: Publisher,
private readonly workerStatus: WorkerStatus,
) {}

init() {
if (this.instanceSettings.instanceType === 'webhook') this.setupWebhookHandlers();
switch (this.instanceSettings.instanceType) {
case 'webhook':
this.setupHandlers(this.commonHandlers);
break;
case 'worker':
this.setupHandlers({
...this.commonHandlers,
'get-worker-status': async () =>
await this.publisher.publishWorkerResponse({
workerId: config.getEnv('redis.queueModeId'),
command: 'get-worker-status',
payload: this.workerStatus.generateStatus(),
}),
'get-worker-id': async () =>
await this.publisher.publishWorkerResponse({
workerId: config.getEnv('redis.queueModeId'),
command: 'get-worker-id',
}),
});
break;
case 'main':
// TODO
break;
default:
assertNever(this.instanceSettings.instanceType);
}
}

private setupHandlers<EventNames extends keyof PubSubEventMap>(
Expand All @@ -40,22 +72,27 @@ export class PubSubHandler {
}
}

// #region Webhook process

private setupWebhookHandlers() {
this.setupHandlers({
'reload-license': async () => await this.license.reload(),
'restart-event-bus': async () => await this.eventbus.restart(),
'reload-external-secrets-providers': async () =>
await this.externalSecretsManager.reloadAllProviders(),
'community-package-install': async ({ packageName, packageVersion }) =>
await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion),
'community-package-update': async ({ packageName, packageVersion }) =>
await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion),
'community-package-uninstall': async ({ packageName }) =>
await this.communityPackagesService.removeNpmPackage(packageName),
});
}

// #endregion
/** Handlers shared by webhook and worker processes. */
private commonHandlers: {
[K in keyof Pick<
PubSubEventMap,
| 'reload-license'
| 'restart-event-bus'
| 'reload-external-secrets-providers'
| 'community-package-install'
| 'community-package-update'
| 'community-package-uninstall'
>]: (event: PubSubEventMap[K]) => Promise<void>;
} = {
'reload-license': async () => await this.license.reload(),
'restart-event-bus': async () => await this.eventbus.restart(),
'reload-external-secrets-providers': async () =>
await this.externalSecretsManager.reloadAllProviders(),
'community-package-install': async ({ packageName, packageVersion }) =>
await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion),
'community-package-update': async ({ packageName, packageVersion }) =>
await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion),
'community-package-uninstall': async ({ packageName }) =>
await this.communityPackagesService.removeNpmPackage(packageName),
};
}
Loading
Loading