Skip to content

Commit

Permalink
refactor(core): Move queueModeId as hostId to InstanceSettings (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ivov authored Oct 15, 2024
1 parent d3b05f1 commit 05467fd
Show file tree
Hide file tree
Showing 27 changed files with 118 additions and 122 deletions.
6 changes: 1 addition & 5 deletions packages/cli/src/abstract-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { engine as expressHandlebars } from 'express-handlebars';
import { readFile } from 'fs/promises';
import type { Server } from 'http';
import isbot from 'isbot';
import type { InstanceType } from 'n8n-core';
import { Container, Service } from 'typedi';

import config from '@/config';
Expand All @@ -22,7 +21,6 @@ import { TestWebhooks } from '@/webhooks/test-webhooks';
import { WaitingWebhooks } from '@/webhooks/waiting-webhooks';
import { createWebhookHandlerFor } from '@/webhooks/webhook-request-handler';

import { generateHostInstanceId } from './databases/utils/generators';
import { ServiceUnavailableError } from './errors/response-errors/service-unavailable.error';

@Service()
Expand Down Expand Up @@ -61,7 +59,7 @@ export abstract class AbstractServer {

readonly uniqueInstanceId: string;

constructor(instanceType: Exclude<InstanceType, 'worker'>) {
constructor() {
this.app = express();
this.app.disable('x-powered-by');

Expand All @@ -85,8 +83,6 @@ export abstract class AbstractServer {
this.endpointWebhookTest = this.globalConfig.endpoints.webhookTest;
this.endpointWebhookWaiting = this.globalConfig.endpoints.webhookWaiting;

this.uniqueInstanceId = generateHostInstanceId(instanceType);

this.logger = Container.get(Logger);
}

Expand Down
13 changes: 0 additions & 13 deletions packages/cli/src/commands/base-command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import type { AbstractServer } from '@/abstract-server';
import config from '@/config';
import { LICENSE_FEATURES, inDevelopment, inTest } from '@/constants';
import * as CrashJournal from '@/crash-journal';
import { generateHostInstanceId } from '@/databases/utils/generators';
import * as Db from '@/db';
import { getDataDeduplicationService } from '@/deduplication';
import { initErrorHandling } from '@/error-reporting';
Expand All @@ -45,8 +44,6 @@ export abstract class BaseCommand extends Command {

protected instanceSettings: InstanceSettings = Container.get(InstanceSettings);

queueModeId: string;

protected server?: AbstractServer;

protected shutdownService: ShutdownService = Container.get(ShutdownService);
Expand Down Expand Up @@ -133,16 +130,6 @@ export abstract class BaseCommand extends Command {
await Container.get(TelemetryEventRelay).init();
}

protected setInstanceQueueModeId() {
if (config.get('redis.queueModeId')) {
this.queueModeId = config.get('redis.queueModeId');
return;
}
// eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion
this.queueModeId = generateHostInstanceId(this.instanceSettings.instanceType!);
config.set('redis.queueModeId', this.queueModeId);
}

protected async stopProcess() {
// This needs to be overridden
}
Expand Down
9 changes: 2 additions & 7 deletions packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* eslint-disable @typescript-eslint/no-unsafe-call */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
import { Flags, type Config } from '@oclif/core';
import { Flags } from '@oclif/core';
import glob from 'fast-glob';
import { createReadStream, createWriteStream, existsSync } from 'fs';
import { mkdir } from 'fs/promises';
Expand Down Expand Up @@ -70,11 +70,6 @@ export class Start extends BaseCommand {

override needsCommunityPackages = true;

constructor(argv: string[], cmdConfig: Config) {
super(argv, cmdConfig);
this.setInstanceQueueModeId();
}

/**
* Opens the UI in browser
*/
Expand Down Expand Up @@ -176,7 +171,7 @@ export class Start extends BaseCommand {
if (config.getEnv('executions.mode') === 'queue') {
const scopedLogger = this.logger.withScope('scaling');
scopedLogger.debug('Starting main instance in scaling mode');
scopedLogger.debug(`Host ID: ${this.queueModeId}`);
scopedLogger.debug(`Host ID: ${this.instanceSettings.hostId}`);
}

const { flags } = await this.parse(Start);
Expand Down
15 changes: 3 additions & 12 deletions packages/cli/src/commands/webhook.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Flags, type Config } from '@oclif/core';
import { Flags } from '@oclif/core';
import { ApplicationError } from 'n8n-workflow';
import { Container } from 'typedi';

Expand All @@ -24,14 +24,6 @@ export class Webhook extends BaseCommand {

override needsCommunityPackages = true;

constructor(argv: string[], cmdConfig: Config) {
super(argv, cmdConfig);
if (this.queueModeId) {
this.logger.debug(`Webhook Instance queue mode id: ${this.queueModeId}`);
}
this.setInstanceQueueModeId();
}

/**
* Stops n8n in a graceful way.
* Make for example sure that all the webhooks from third party services
Expand Down Expand Up @@ -71,8 +63,8 @@ export class Webhook extends BaseCommand {
await this.initCrashJournal();
this.logger.debug('Crash journal initialized');

this.logger.info('Initializing n8n webhook process');
this.logger.debug(`Queue mode id: ${this.queueModeId}`);
this.logger.info('Starting n8n webhook process...');
this.logger.debug(`Host ID: ${this.instanceSettings.hostId}`);

await super.init();

Expand Down Expand Up @@ -100,7 +92,6 @@ export class Webhook extends BaseCommand {
const { ScalingService } = await import('@/scaling/scaling.service');
await Container.get(ScalingService).setupQueue();
await this.server.start();
this.logger.debug(`Webhook listener ID: ${this.server.uniqueInstanceId}`);
this.logger.info('Webhook listener waiting for requests.');

// Make sure that the process does not close
Expand Down
8 changes: 3 additions & 5 deletions packages/cli/src/commands/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ export class Worker extends BaseCommand {
super(argv, cmdConfig);

this.logger = Container.get(Logger).withScope('scaling');

this.setInstanceQueueModeId();
}

async init() {
Expand All @@ -86,7 +84,7 @@ export class Worker extends BaseCommand {
await this.initCrashJournal();

this.logger.debug('Starting n8n worker...');
this.logger.debug(`Host ID: ${this.queueModeId}`);
this.logger.debug(`Host ID: ${this.instanceSettings.hostId}`);

await this.setConcurrency();
await super.init();
Expand All @@ -111,7 +109,7 @@ export class Worker extends BaseCommand {
new EventMessageGeneric({
eventName: 'n8n.worker.started',
payload: {
workerId: this.queueModeId,
workerId: this.instanceSettings.hostId,
},
}),
);
Expand All @@ -130,7 +128,7 @@ export class Worker extends BaseCommand {

async initEventBus() {
await Container.get(MessageEventBus).initialize({
workerId: this.queueModeId,
workerId: this.instanceSettings.hostId,
});
Container.get(LogStreamingEventRelay).init();
}
Expand Down
5 changes: 0 additions & 5 deletions packages/cli/src/config/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -491,11 +491,6 @@ export const schema = {
default: 'n8n',
env: 'N8N_REDIS_KEY_PREFIX',
},
queueModeId: {
doc: 'Unique ID for this n8n instance, is usually set automatically by n8n during startup',
format: String,
default: '',
},
},

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export class WorkerMissingEncryptionKey extends ApplicationError {
'Failed to start worker because of missing encryption key.',
'Please set the `N8N_ENCRYPTION_KEY` env var when starting the worker.',
'See: https://docs.n8n.io/hosting/configuration/configuration-examples/encryption-key/',
].join(''),
].join(' '),
{ level: 'warning' },
);
}
Expand Down
20 changes: 9 additions & 11 deletions packages/cli/src/scaling/__tests__/publisher.service.test.ts
Original file line number Diff line number Diff line change
@@ -1,67 +1,65 @@
import type { Redis as SingleNodeClient } from 'ioredis';
import { mock } from 'jest-mock-extended';
import type { InstanceSettings } from 'n8n-core';

import config from '@/config';
import { generateNanoId } from '@/databases/utils/generators';
import type { RedisClientService } from '@/services/redis-client.service';
import { mockLogger } from '@test/mocking';

import { Publisher } from '../pubsub/publisher.service';
import type { PubSub } from '../pubsub/pubsub.types';

describe('Publisher', () => {
let queueModeId: string;

beforeEach(() => {
config.set('executions.mode', 'queue');
queueModeId = generateNanoId();
config.set('redis.queueModeId', queueModeId);
});

const client = mock<SingleNodeClient>();
const logger = mockLogger();
const hostId = 'main-bnxa1riryKUNHtln';
const instanceSettings = mock<InstanceSettings>({ hostId });
const redisClientService = mock<RedisClientService>({ createClient: () => client });

describe('constructor', () => {
it('should init Redis client in scaling mode', () => {
const publisher = new Publisher(logger, redisClientService);
const publisher = new Publisher(logger, redisClientService, instanceSettings);

expect(publisher.getClient()).toEqual(client);
});

it('should not init Redis client in regular mode', () => {
config.set('executions.mode', 'regular');
const publisher = new Publisher(logger, redisClientService);
const publisher = new Publisher(logger, redisClientService, instanceSettings);

expect(publisher.getClient()).toBeUndefined();
});
});

describe('shutdown', () => {
it('should disconnect Redis client', () => {
const publisher = new Publisher(logger, redisClientService);
const publisher = new Publisher(logger, redisClientService, instanceSettings);
publisher.shutdown();
expect(client.disconnect).toHaveBeenCalled();
});
});

describe('publishCommand', () => {
it('should publish command into `n8n.commands` pubsub channel', async () => {
const publisher = new Publisher(logger, redisClientService);
const publisher = new Publisher(logger, redisClientService, instanceSettings);
const msg = mock<PubSub.Command>({ command: 'reload-license' });

await publisher.publishCommand(msg);

expect(client.publish).toHaveBeenCalledWith(
'n8n.commands',
JSON.stringify({ ...msg, senderId: queueModeId, selfSend: false, debounce: true }),
JSON.stringify({ ...msg, senderId: hostId, selfSend: false, debounce: true }),
);
});
});

describe('publishWorkerResponse', () => {
it('should publish worker response into `n8n.worker-response` pubsub channel', async () => {
const publisher = new Publisher(logger, redisClientService);
const publisher = new Publisher(logger, redisClientService, instanceSettings);
const msg = mock<PubSub.WorkerResponse>({
response: 'response-to-get-worker-status',
});
Expand Down
8 changes: 4 additions & 4 deletions packages/cli/src/scaling/__tests__/subscriber.service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,30 @@ describe('Subscriber', () => {

describe('constructor', () => {
it('should init Redis client in scaling mode', () => {
const subscriber = new Subscriber(mock(), redisClientService, mock());
const subscriber = new Subscriber(mock(), redisClientService, mock(), mock());

expect(subscriber.getClient()).toEqual(client);
});

it('should not init Redis client in regular mode', () => {
config.set('executions.mode', 'regular');
const subscriber = new Subscriber(mock(), redisClientService, mock());
const subscriber = new Subscriber(mock(), redisClientService, mock(), mock());

expect(subscriber.getClient()).toBeUndefined();
});
});

describe('shutdown', () => {
it('should disconnect Redis client', () => {
const subscriber = new Subscriber(mock(), redisClientService, mock());
const subscriber = new Subscriber(mock(), redisClientService, mock(), mock());
subscriber.shutdown();
expect(client.disconnect).toHaveBeenCalled();
});
});

describe('subscribe', () => {
it('should subscribe to pubsub channel', async () => {
const subscriber = new Subscriber(mock(), redisClientService, mock());
const subscriber = new Subscriber(mock(), redisClientService, mock(), mock());

await subscriber.subscribe('n8n.commands');

Expand Down
7 changes: 4 additions & 3 deletions packages/cli/src/scaling/job-processor.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { RunningJobSummary } from '@n8n/api-types';
import { WorkflowExecute } from 'n8n-core';
import { InstanceSettings, WorkflowExecute } from 'n8n-core';
import { BINARY_ENCODING, ApplicationError, Workflow } from 'n8n-workflow';
import type { ExecutionStatus, IExecuteResponsePromiseData, IRun } from 'n8n-workflow';
import type PCancelable from 'p-cancelable';
Expand Down Expand Up @@ -33,6 +33,7 @@ export class JobProcessor {
private readonly executionRepository: ExecutionRepository,
private readonly workflowRepository: WorkflowRepository,
private readonly nodeTypes: NodeTypes,
private readonly instanceSettings: InstanceSettings,
) {
this.logger = this.logger.withScope('scaling');
}
Expand Down Expand Up @@ -120,7 +121,7 @@ export class JobProcessor {
kind: 'respond-to-webhook',
executionId,
response: this.encodeWebhookResponse(response),
workerId: config.getEnv('redis.queueModeId'),
workerId: this.instanceSettings.hostId,
};

await job.progress(msg);
Expand Down Expand Up @@ -173,7 +174,7 @@ export class JobProcessor {
const msg: JobFinishedMessage = {
kind: 'job-finished',
executionId,
workerId: config.getEnv('redis.queueModeId'),
workerId: this.instanceSettings.hostId,
};

await job.progress(msg);
Expand Down
4 changes: 3 additions & 1 deletion packages/cli/src/scaling/pubsub/publisher.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { Redis as SingleNodeClient, Cluster as MultiNodeClient } from 'ioredis';
import { InstanceSettings } from 'n8n-core';
import { Service } from 'typedi';

import config from '@/config';
Expand All @@ -20,6 +21,7 @@ export class Publisher {
constructor(
private readonly logger: Logger,
private readonly redisClientService: RedisClientService,
private readonly instanceSettings: InstanceSettings,
) {
// @TODO: Once this class is only ever initialized in scaling mode, throw in the next line instead.
if (config.getEnv('executions.mode') !== 'queue') return;
Expand Down Expand Up @@ -48,7 +50,7 @@ export class Publisher {
'n8n.commands',
JSON.stringify({
...msg,
senderId: config.getEnv('redis.queueModeId'),
senderId: this.instanceSettings.hostId,
selfSend: SELF_SEND_COMMANDS.has(msg.command),
debounce: !IMMEDIATE_COMMANDS.has(msg.command),
}),
Expand Down
3 changes: 1 addition & 2 deletions packages/cli/src/scaling/pubsub/pubsub-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { ensureError } from 'n8n-workflow';
import { Service } from 'typedi';

import { ActiveWorkflowManager } from '@/active-workflow-manager';
import config from '@/config';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { EventService } from '@/events/event.service';
Expand Down Expand Up @@ -49,7 +48,7 @@ export class PubSubHandler {
...this.commonHandlers,
'get-worker-status': async () =>
await this.publisher.publishWorkerResponse({
senderId: config.getEnv('redis.queueModeId'),
senderId: this.instanceSettings.hostId,
response: 'response-to-get-worker-status',
payload: this.workerStatusService.generateStatus(),
}),
Expand Down
Loading

0 comments on commit 05467fd

Please sign in to comment.