Skip to content

Commit

Permalink
refactor(core): Simplify webhook pubsub message handler
Browse files Browse the repository at this point in the history
  • Loading branch information
ivov committed Oct 2, 2024
1 parent 3191912 commit eaef187
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 108 deletions.
10 changes: 8 additions & 2 deletions packages/cli/src/commands/webhook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import { Container } from 'typedi';

import { ActiveExecutions } from '@/active-executions';
import config from '@/config';
import { OrchestrationHandlerWebhookService } from '@/services/orchestration/webhook/orchestration.handler.webhook.service';
import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler';
import { Subscriber } from '@/scaling/pubsub/subscriber.service';
import { OrchestrationWebhookService } from '@/services/orchestration/webhook/orchestration.webhook.service';
import { WebhookServer } from '@/webhooks/webhook-server';

Expand Down Expand Up @@ -110,6 +111,11 @@ export class Webhook extends BaseCommand {

async initOrchestration() {
await Container.get(OrchestrationWebhookService).init();
await Container.get(OrchestrationHandlerWebhookService).init();

const subscriber = Container.get(Subscriber);
await subscriber.subscribe('n8n.commands');
subscriber.setCommandMessageHandler();

Container.get(PubSubHandler).init();
}
}
142 changes: 142 additions & 0 deletions packages/cli/src/scaling/__tests__/pubsub-handler.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import { mock } from 'jest-mock-extended';
import type { InstanceSettings } from 'n8n-core';

import type { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { EventService } from '@/events/event.service';
import type { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee';
import type { License } from '@/license';
import type { CommunityPackagesService } from '@/services/community-packages.service';

import { PubSubHandler } from '../pubsub/pubsub-handler';

describe('PubSubHandler', () => {
const eventService = new EventService();
const license = mock<License>();
const eventbus = mock<MessageEventBus>();
const externalSecretsManager = mock<ExternalSecretsManager>();
const communityPackagesService = mock<CommunityPackagesService>();

describe('in webhook process', () => {
const instanceSettings = mock<InstanceSettings>({ instanceType: 'webhook' });

it('should set up handlers in webhook process', () => {
// @ts-expect-error Spying on private method
const setupWebhookHandlersSpy = jest.spyOn(PubSubHandler.prototype, 'setupWebhookHandlers');

new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
).init();

expect(setupWebhookHandlersSpy).toHaveBeenCalled();
});

it('should reload license on `reload-license` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
).init();

eventService.emit('reload-license');

expect(license.reload).toHaveBeenCalled();
});

it('should restart event bus on `restart-event-bus` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
).init();

eventService.emit('restart-event-bus');

expect(eventbus.restart).toHaveBeenCalled();
});

it('should reload providers on `reload-external-secrets-providers` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
).init();

eventService.emit('reload-external-secrets-providers');

expect(externalSecretsManager.reloadAllProviders).toHaveBeenCalled();
});

it('should install community package on `community-package-install` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
).init();

eventService.emit('community-package-install', {
packageName: 'test-package',
packageVersion: '1.0.0',
});

expect(communityPackagesService.installOrUpdateNpmPackage).toHaveBeenCalledWith(
'test-package',
'1.0.0',
);
});

it('should update community package on `community-package-update` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
).init();

eventService.emit('community-package-update', {
packageName: 'test-package',
packageVersion: '1.0.0',
});

expect(communityPackagesService.installOrUpdateNpmPackage).toHaveBeenCalledWith(
'test-package',
'1.0.0',
);
});

it('should uninstall community package on `community-package-uninstall` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
).init();

eventService.emit('community-package-uninstall', {
packageName: 'test-package',
});

expect(communityPackagesService.removeNpmPackage).toHaveBeenCalledWith('test-package');
});
});
});
61 changes: 61 additions & 0 deletions packages/cli/src/scaling/pubsub/pubsub-handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { InstanceSettings } from 'n8n-core';
import { Service } from 'typedi';

import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { EventService } from '@/events/event.service';
import type { PubSubEventMap } from '@/events/maps/pub-sub.event-map';
import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee';
import { License } from '@/license';
import { CommunityPackagesService } from '@/services/community-packages.service';

/**
* Responsible for handling events emitted from messages received via a pubsub channel.
*/
@Service()
export class PubSubHandler {
constructor(
private readonly eventService: EventService,
private readonly instanceSettings: InstanceSettings,
private readonly license: License,
private readonly eventbus: MessageEventBus,
private readonly externalSecretsManager: ExternalSecretsManager,
private readonly communityPackagesService: CommunityPackagesService,
) {}

init() {
if (this.instanceSettings.instanceType === 'webhook') this.setupWebhookHandlers();
}

private setupHandlers<EventNames extends keyof PubSubEventMap>(
map: {
[EventName in EventNames]?: (event: PubSubEventMap[EventName]) => void | Promise<void>;
},
) {
for (const [eventName, handlerFn] of Object.entries(map) as Array<
[EventNames, (event: PubSubEventMap[EventNames]) => void | Promise<void>]
>) {
this.eventService.on(eventName, async (event) => {
await handlerFn(event);
});
}
}

// #region Webhook process

private setupWebhookHandlers() {
this.setupHandlers({
'reload-license': async () => await this.license.reload(),
'restart-event-bus': async () => await this.eventbus.restart(),
'reload-external-secrets-providers': async () =>
await this.externalSecretsManager.reloadAllProviders(),
'community-package-install': async ({ packageName, packageVersion }) =>
await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion),
'community-package-update': async ({ packageName, packageVersion }) =>
await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion),
'community-package-uninstall': async ({ packageName }) =>
await this.communityPackagesService.removeNpmPackage(packageName),
});
}

// #endregion
}
39 changes: 39 additions & 0 deletions packages/cli/src/scaling/pubsub/subscriber.service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import type { Redis as SingleNodeClient, Cluster as MultiNodeClient } from 'ioredis';
import debounce from 'lodash/debounce';
import { jsonParse } from 'n8n-workflow';
import { Service } from 'typedi';

import config from '@/config';
import { EventService } from '@/events/event.service';
import { Logger } from '@/logging/logger.service';
import { RedisClientService } from '@/services/redis-client.service';

Expand All @@ -21,6 +24,7 @@ export class Subscriber {
constructor(
private readonly logger: Logger,
private readonly redisClientService: RedisClientService,
private readonly eventService: EventService,
) {
// @TODO: Once this class is only ever initialized in scaling mode, throw in the next line instead.
if (config.getEnv('executions.mode') !== 'queue') return;
Expand Down Expand Up @@ -62,4 +66,39 @@ export class Subscriber {
}

// #endregion

// #region Commands

setCommandMessageHandler() {
const handlerFn = debounce((str: string) => {
const msg = this.parseCommandMessage(str);
if (msg) this.eventService.emit(msg.command, msg.payload);
}, 300);

this.setMessageHandler('n8n.commands', handlerFn);
}

private parseCommandMessage(str: string) {
const msg = jsonParse<PubSub.Command | null>(str, { fallbackValue: null });

if (!msg) {
this.logger.debug('Received invalid string via command channel', { message: str });

return null;
}

this.logger.debug('Received message via command channel', msg);

const queueModeId = config.getEnv('redis.queueModeId');

if (msg.senderId === queueModeId || (msg.targets && !msg.targets.includes(queueModeId))) {
this.logger.debug('Disregarding message - not for this instance', msg);

return null;
}

return msg;
}

// #endregion
}

This file was deleted.

This file was deleted.

0 comments on commit eaef187

Please sign in to comment.