Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core): Simplify webhook pubsub message handler #11048

Merged
merged 2 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions packages/cli/src/commands/webhook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import { Container } from 'typedi';

import { ActiveExecutions } from '@/active-executions';
import config from '@/config';
import { OrchestrationHandlerWebhookService } from '@/services/orchestration/webhook/orchestration.handler.webhook.service';
import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler';
import { Subscriber } from '@/scaling/pubsub/subscriber.service';
import { OrchestrationWebhookService } from '@/services/orchestration/webhook/orchestration.webhook.service';
import { WebhookServer } from '@/webhooks/webhook-server';

Expand Down Expand Up @@ -110,6 +111,11 @@ export class Webhook extends BaseCommand {

async initOrchestration() {
await Container.get(OrchestrationWebhookService).init();
await Container.get(OrchestrationHandlerWebhookService).init();

const subscriber = Container.get(Subscriber);
await subscriber.subscribe('n8n.commands');
subscriber.setCommandMessageHandler();

Container.get(PubSubHandler).init();
Comment on lines +115 to +119
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const subscriber = Container.get(Subscriber);
await subscriber.subscribe('n8n.commands');
subscriber.setCommandMessageHandler();
Container.get(PubSubHandler).init();
Container.get(PubSubHandler).init();
const subscriber = Container.get(Subscriber);
subscriber.setCommandMessageHandler();
await subscriber.subscribe('n8n.commands');

Would this order also work and prevent use form missing messages?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! It's a tiny window - I'll address in the next PR 👍🏻

}
}
142 changes: 142 additions & 0 deletions packages/cli/src/scaling/__tests__/pubsub-handler.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import { mock } from 'jest-mock-extended';
import type { InstanceSettings } from 'n8n-core';

import type { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { EventService } from '@/events/event.service';
import type { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee';
import type { License } from '@/license';
import type { CommunityPackagesService } from '@/services/community-packages.service';

import { PubSubHandler } from '../pubsub/pubsub-handler';

describe('PubSubHandler', () => {
const eventService = new EventService();
const license = mock<License>();
const eventbus = mock<MessageEventBus>();
const externalSecretsManager = mock<ExternalSecretsManager>();
const communityPackagesService = mock<CommunityPackagesService>();

describe('in webhook process', () => {
const instanceSettings = mock<InstanceSettings>({ instanceType: 'webhook' });

it('should set up handlers in webhook process', () => {
// @ts-expect-error Spying on private method
const setupWebhookHandlersSpy = jest.spyOn(PubSubHandler.prototype, 'setupWebhookHandlers');

new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
).init();

expect(setupWebhookHandlersSpy).toHaveBeenCalled();
});

it('should reload license on `reload-license` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
).init();

eventService.emit('reload-license');

expect(license.reload).toHaveBeenCalled();
});

it('should restart event bus on `restart-event-bus` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
).init();

eventService.emit('restart-event-bus');

expect(eventbus.restart).toHaveBeenCalled();
});

it('should reload providers on `reload-external-secrets-providers` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
).init();

eventService.emit('reload-external-secrets-providers');

expect(externalSecretsManager.reloadAllProviders).toHaveBeenCalled();
});

it('should install community package on `community-package-install` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
).init();

eventService.emit('community-package-install', {
packageName: 'test-package',
packageVersion: '1.0.0',
});

expect(communityPackagesService.installOrUpdateNpmPackage).toHaveBeenCalledWith(
'test-package',
'1.0.0',
);
});

it('should update community package on `community-package-update` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
).init();

eventService.emit('community-package-update', {
packageName: 'test-package',
packageVersion: '1.0.0',
});

expect(communityPackagesService.installOrUpdateNpmPackage).toHaveBeenCalledWith(
'test-package',
'1.0.0',
);
});

it('should uninstall community package on `community-package-uninstall` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
).init();

eventService.emit('community-package-uninstall', {
packageName: 'test-package',
});

expect(communityPackagesService.removeNpmPackage).toHaveBeenCalledWith('test-package');
});
});
});
10 changes: 5 additions & 5 deletions packages/cli/src/scaling/__tests__/subscriber.service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,30 @@ describe('Subscriber', () => {

describe('constructor', () => {
it('should init Redis client in scaling mode', () => {
const subscriber = new Subscriber(mock(), redisClientService);
const subscriber = new Subscriber(mock(), redisClientService, mock());

expect(subscriber.getClient()).toEqual(client);
});

it('should not init Redis client in regular mode', () => {
config.set('executions.mode', 'regular');
const subscriber = new Subscriber(mock(), redisClientService);
const subscriber = new Subscriber(mock(), redisClientService, mock());

expect(subscriber.getClient()).toBeUndefined();
});
});

describe('shutdown', () => {
it('should disconnect Redis client', () => {
const subscriber = new Subscriber(mock(), redisClientService);
const subscriber = new Subscriber(mock(), redisClientService, mock());
subscriber.shutdown();
expect(client.disconnect).toHaveBeenCalled();
});
});

describe('subscribe', () => {
it('should subscribe to pubsub channel', async () => {
const subscriber = new Subscriber(mock(), redisClientService);
const subscriber = new Subscriber(mock(), redisClientService, mock());

await subscriber.subscribe('n8n.commands');

Expand All @@ -50,7 +50,7 @@ describe('Subscriber', () => {

describe('setMessageHandler', () => {
it('should set message handler function for channel', () => {
const subscriber = new Subscriber(mock(), redisClientService);
const subscriber = new Subscriber(mock(), redisClientService, mock());
const channel = 'n8n.commands';
const handlerFn = jest.fn();

Expand Down
61 changes: 61 additions & 0 deletions packages/cli/src/scaling/pubsub/pubsub-handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { InstanceSettings } from 'n8n-core';
import { Service } from 'typedi';

import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { EventService } from '@/events/event.service';
import type { PubSubEventMap } from '@/events/maps/pub-sub.event-map';
import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee';
import { License } from '@/license';
import { CommunityPackagesService } from '@/services/community-packages.service';

/**
* Responsible for handling events emitted from messages received via a pubsub channel.
*/
@Service()
export class PubSubHandler {
constructor(
private readonly eventService: EventService,
private readonly instanceSettings: InstanceSettings,
private readonly license: License,
private readonly eventbus: MessageEventBus,
private readonly externalSecretsManager: ExternalSecretsManager,
private readonly communityPackagesService: CommunityPackagesService,
) {}

init() {
if (this.instanceSettings.instanceType === 'webhook') this.setupWebhookHandlers();
}

private setupHandlers<EventNames extends keyof PubSubEventMap>(
map: {
[EventName in EventNames]?: (event: PubSubEventMap[EventName]) => void | Promise<void>;
},
) {
for (const [eventName, handlerFn] of Object.entries(map) as Array<
[EventNames, (event: PubSubEventMap[EventNames]) => void | Promise<void>]
>) {
this.eventService.on(eventName, async (event) => {
await handlerFn(event);
});
}
}

// #region Webhook process

private setupWebhookHandlers() {
this.setupHandlers({
'reload-license': async () => await this.license.reload(),
ivov marked this conversation as resolved.
Show resolved Hide resolved
'restart-event-bus': async () => await this.eventbus.restart(),
'reload-external-secrets-providers': async () =>
await this.externalSecretsManager.reloadAllProviders(),
'community-package-install': async ({ packageName, packageVersion }) =>
await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion),
'community-package-update': async ({ packageName, packageVersion }) =>
await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion),
'community-package-uninstall': async ({ packageName }) =>
await this.communityPackagesService.removeNpmPackage(packageName),
});
}

// #endregion
}
39 changes: 39 additions & 0 deletions packages/cli/src/scaling/pubsub/subscriber.service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import type { Redis as SingleNodeClient, Cluster as MultiNodeClient } from 'ioredis';
import debounce from 'lodash/debounce';
import { jsonParse } from 'n8n-workflow';
import { Service } from 'typedi';

import config from '@/config';
import { EventService } from '@/events/event.service';
import { Logger } from '@/logging/logger.service';
import { RedisClientService } from '@/services/redis-client.service';

Expand All @@ -21,6 +24,7 @@ export class Subscriber {
constructor(
private readonly logger: Logger,
private readonly redisClientService: RedisClientService,
private readonly eventService: EventService,
) {
// @TODO: Once this class is only ever initialized in scaling mode, throw in the next line instead.
if (config.getEnv('executions.mode') !== 'queue') return;
Expand Down Expand Up @@ -62,4 +66,39 @@ export class Subscriber {
}

// #endregion

// #region Commands

setCommandMessageHandler() {
const handlerFn = debounce((str: string) => {
const msg = this.parseCommandMessage(str);
if (msg) this.eventService.emit(msg.command, msg.payload);
}, 300);

this.setMessageHandler('n8n.commands', handlerFn);
}

private parseCommandMessage(str: string) {
const msg = jsonParse<PubSub.Command | null>(str, { fallbackValue: null });

if (!msg) {
this.logger.debug('Received invalid string via command channel', { message: str });

return null;
}

this.logger.debug('Received message via command channel', msg);

const queueModeId = config.getEnv('redis.queueModeId');

if (msg.senderId === queueModeId || (msg.targets && !msg.targets.includes(queueModeId))) {
this.logger.debug('Disregarding message - not for this instance', msg);

return null;
}

return msg;
}

// #endregion
}
Loading
Loading