Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core): Move queueModeId as hostId to InstanceSettings #11262

Merged
merged 5 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions packages/cli/src/abstract-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { engine as expressHandlebars } from 'express-handlebars';
import { readFile } from 'fs/promises';
import type { Server } from 'http';
import isbot from 'isbot';
import type { InstanceType } from 'n8n-core';
import { Container, Service } from 'typedi';

import config from '@/config';
Expand All @@ -22,7 +21,6 @@ import { TestWebhooks } from '@/webhooks/test-webhooks';
import { WaitingWebhooks } from '@/webhooks/waiting-webhooks';
import { createWebhookHandlerFor } from '@/webhooks/webhook-request-handler';

import { generateHostInstanceId } from './databases/utils/generators';
import { ServiceUnavailableError } from './errors/response-errors/service-unavailable.error';

@Service()
Expand Down Expand Up @@ -61,7 +59,7 @@ export abstract class AbstractServer {

readonly uniqueInstanceId: string;

constructor(instanceType: Exclude<InstanceType, 'worker'>) {
constructor() {
this.app = express();
this.app.disable('x-powered-by');

Expand All @@ -85,8 +83,6 @@ export abstract class AbstractServer {
this.endpointWebhookTest = this.globalConfig.endpoints.webhookTest;
this.endpointWebhookWaiting = this.globalConfig.endpoints.webhookWaiting;

this.uniqueInstanceId = generateHostInstanceId(instanceType);

this.logger = Container.get(Logger);
}

Expand Down
13 changes: 0 additions & 13 deletions packages/cli/src/commands/base-command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import type { AbstractServer } from '@/abstract-server';
import config from '@/config';
import { LICENSE_FEATURES, inDevelopment, inTest } from '@/constants';
import * as CrashJournal from '@/crash-journal';
import { generateHostInstanceId } from '@/databases/utils/generators';
import * as Db from '@/db';
import { getDataDeduplicationService } from '@/deduplication';
import { initErrorHandling } from '@/error-reporting';
Expand All @@ -45,8 +44,6 @@ export abstract class BaseCommand extends Command {

protected instanceSettings: InstanceSettings = Container.get(InstanceSettings);

queueModeId: string;

protected server?: AbstractServer;

protected shutdownService: ShutdownService = Container.get(ShutdownService);
Expand Down Expand Up @@ -133,16 +130,6 @@ export abstract class BaseCommand extends Command {
await Container.get(TelemetryEventRelay).init();
}

protected setInstanceQueueModeId() {
if (config.get('redis.queueModeId')) {
this.queueModeId = config.get('redis.queueModeId');
return;
}
// eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion
this.queueModeId = generateHostInstanceId(this.instanceSettings.instanceType!);
config.set('redis.queueModeId', this.queueModeId);
}

protected async stopProcess() {
// This needs to be overridden
}
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ export class Start extends BaseCommand {

constructor(argv: string[], cmdConfig: Config) {
super(argv, cmdConfig);
this.setInstanceQueueModeId();
this.instanceSettings.setHostId();
}

/**
Expand Down Expand Up @@ -176,7 +176,7 @@ export class Start extends BaseCommand {
if (config.getEnv('executions.mode') === 'queue') {
const scopedLogger = this.logger.withScope('scaling');
scopedLogger.debug('Starting main instance in scaling mode');
scopedLogger.debug(`Host ID: ${this.queueModeId}`);
scopedLogger.debug(`Host ID: ${this.instanceSettings.hostId}`);
}

const { flags } = await this.parse(Start);
Expand Down
10 changes: 3 additions & 7 deletions packages/cli/src/commands/webhook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@ export class Webhook extends BaseCommand {

constructor(argv: string[], cmdConfig: Config) {
super(argv, cmdConfig);
if (this.queueModeId) {
this.logger.debug(`Webhook Instance queue mode id: ${this.queueModeId}`);
}
this.setInstanceQueueModeId();
this.instanceSettings.setHostId();
}

/**
Expand Down Expand Up @@ -71,8 +68,8 @@ export class Webhook extends BaseCommand {
await this.initCrashJournal();
this.logger.debug('Crash journal initialized');

this.logger.info('Initializing n8n webhook process');
this.logger.debug(`Queue mode id: ${this.queueModeId}`);
this.logger.info('Starting n8n webhook process...');
this.logger.debug(`Host ID: ${this.instanceSettings.hostId}`);

await super.init();

Expand Down Expand Up @@ -100,7 +97,6 @@ export class Webhook extends BaseCommand {
const { ScalingService } = await import('@/scaling/scaling.service');
await Container.get(ScalingService).setupQueue();
await this.server.start();
this.logger.debug(`Webhook listener ID: ${this.server.uniqueInstanceId}`);
this.logger.info('Webhook listener waiting for requests.');

// Make sure that the process does not close
Expand Down
8 changes: 4 additions & 4 deletions packages/cli/src/commands/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ export class Worker extends BaseCommand {

this.logger = Container.get(Logger).withScope('scaling');

this.setInstanceQueueModeId();
this.instanceSettings.setHostId();
}

async init() {
Expand All @@ -84,7 +84,7 @@ export class Worker extends BaseCommand {
await this.initCrashJournal();

this.logger.debug('Starting n8n worker...');
this.logger.debug(`Host ID: ${this.queueModeId}`);
this.logger.debug(`Host ID: ${this.instanceSettings.hostId}`);

await this.setConcurrency();
await super.init();
Expand All @@ -109,15 +109,15 @@ export class Worker extends BaseCommand {
new EventMessageGeneric({
eventName: 'n8n.worker.started',
payload: {
workerId: this.queueModeId,
workerId: this.instanceSettings.hostId,
},
}),
);
}

async initEventBus() {
await Container.get(MessageEventBus).initialize({
workerId: this.queueModeId,
workerId: this.instanceSettings.hostId,
});
Container.get(LogStreamingEventRelay).init();
}
Expand Down
5 changes: 0 additions & 5 deletions packages/cli/src/config/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -491,11 +491,6 @@ export const schema = {
default: 'n8n',
env: 'N8N_REDIS_KEY_PREFIX',
},
queueModeId: {
doc: 'Unique ID for this n8n instance, is usually set automatically by n8n during startup',
format: String,
default: '',
},
},

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export class WorkerMissingEncryptionKey extends ApplicationError {
'Failed to start worker because of missing encryption key.',
'Please set the `N8N_ENCRYPTION_KEY` env var when starting the worker.',
'See: https://docs.n8n.io/hosting/configuration/configuration-examples/encryption-key/',
].join(''),
].join(' '),
{ level: 'warning' },
);
}
Expand Down
20 changes: 9 additions & 11 deletions packages/cli/src/scaling/__tests__/publisher.service.test.ts
Original file line number Diff line number Diff line change
@@ -1,67 +1,65 @@
import type { Redis as SingleNodeClient } from 'ioredis';
import { mock } from 'jest-mock-extended';
import type { InstanceSettings } from 'n8n-core';

import config from '@/config';
import { generateNanoId } from '@/databases/utils/generators';
import type { RedisClientService } from '@/services/redis-client.service';
import { mockLogger } from '@test/mocking';

import { Publisher } from '../pubsub/publisher.service';
import type { PubSub } from '../pubsub/pubsub.types';

describe('Publisher', () => {
let queueModeId: string;

beforeEach(() => {
config.set('executions.mode', 'queue');
queueModeId = generateNanoId();
config.set('redis.queueModeId', queueModeId);
});

const client = mock<SingleNodeClient>();
const logger = mockLogger();
const hostId = 'main-bnxa1riryKUNHtln';
const instanceSettings = mock<InstanceSettings>({ hostId });
const redisClientService = mock<RedisClientService>({ createClient: () => client });

describe('constructor', () => {
it('should init Redis client in scaling mode', () => {
const publisher = new Publisher(logger, redisClientService);
const publisher = new Publisher(logger, redisClientService, instanceSettings);

expect(publisher.getClient()).toEqual(client);
});

it('should not init Redis client in regular mode', () => {
config.set('executions.mode', 'regular');
const publisher = new Publisher(logger, redisClientService);
const publisher = new Publisher(logger, redisClientService, instanceSettings);

expect(publisher.getClient()).toBeUndefined();
});
});

describe('shutdown', () => {
it('should disconnect Redis client', () => {
const publisher = new Publisher(logger, redisClientService);
const publisher = new Publisher(logger, redisClientService, instanceSettings);
publisher.shutdown();
expect(client.disconnect).toHaveBeenCalled();
});
});

describe('publishCommand', () => {
it('should publish command into `n8n.commands` pubsub channel', async () => {
const publisher = new Publisher(logger, redisClientService);
const publisher = new Publisher(logger, redisClientService, instanceSettings);
const msg = mock<PubSub.Command>({ command: 'reload-license' });

await publisher.publishCommand(msg);

expect(client.publish).toHaveBeenCalledWith(
'n8n.commands',
JSON.stringify({ ...msg, senderId: queueModeId, selfSend: false, debounce: true }),
JSON.stringify({ ...msg, senderId: hostId, selfSend: false, debounce: true }),
);
});
});

describe('publishWorkerResponse', () => {
it('should publish worker response into `n8n.worker-response` pubsub channel', async () => {
const publisher = new Publisher(logger, redisClientService);
const publisher = new Publisher(logger, redisClientService, instanceSettings);
const msg = mock<PubSub.WorkerResponse>({
response: 'response-to-get-worker-status',
});
Expand Down
8 changes: 4 additions & 4 deletions packages/cli/src/scaling/__tests__/subscriber.service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,30 @@ describe('Subscriber', () => {

describe('constructor', () => {
it('should init Redis client in scaling mode', () => {
const subscriber = new Subscriber(mock(), redisClientService, mock());
const subscriber = new Subscriber(mock(), redisClientService, mock(), mock());

expect(subscriber.getClient()).toEqual(client);
});

it('should not init Redis client in regular mode', () => {
config.set('executions.mode', 'regular');
const subscriber = new Subscriber(mock(), redisClientService, mock());
const subscriber = new Subscriber(mock(), redisClientService, mock(), mock());

expect(subscriber.getClient()).toBeUndefined();
});
});

describe('shutdown', () => {
it('should disconnect Redis client', () => {
const subscriber = new Subscriber(mock(), redisClientService, mock());
const subscriber = new Subscriber(mock(), redisClientService, mock(), mock());
subscriber.shutdown();
expect(client.disconnect).toHaveBeenCalled();
});
});

describe('subscribe', () => {
it('should subscribe to pubsub channel', async () => {
const subscriber = new Subscriber(mock(), redisClientService, mock());
const subscriber = new Subscriber(mock(), redisClientService, mock(), mock());

await subscriber.subscribe('n8n.commands');

Expand Down
7 changes: 4 additions & 3 deletions packages/cli/src/scaling/job-processor.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { RunningJobSummary } from '@n8n/api-types';
import { WorkflowExecute } from 'n8n-core';
import { InstanceSettings, WorkflowExecute } from 'n8n-core';
import { BINARY_ENCODING, ApplicationError, Workflow } from 'n8n-workflow';
import type { ExecutionStatus, IExecuteResponsePromiseData, IRun } from 'n8n-workflow';
import type PCancelable from 'p-cancelable';
Expand Down Expand Up @@ -33,6 +33,7 @@ export class JobProcessor {
private readonly executionRepository: ExecutionRepository,
private readonly workflowRepository: WorkflowRepository,
private readonly nodeTypes: NodeTypes,
private readonly instanceSettings: InstanceSettings,
) {
this.logger = this.logger.withScope('scaling');
}
Expand Down Expand Up @@ -120,7 +121,7 @@ export class JobProcessor {
kind: 'respond-to-webhook',
executionId,
response: this.encodeWebhookResponse(response),
workerId: config.getEnv('redis.queueModeId'),
workerId: this.instanceSettings.hostId,
};

await job.progress(msg);
Expand Down Expand Up @@ -173,7 +174,7 @@ export class JobProcessor {
const msg: JobFinishedMessage = {
kind: 'job-finished',
executionId,
workerId: config.getEnv('redis.queueModeId'),
workerId: this.instanceSettings.hostId,
};

await job.progress(msg);
Expand Down
4 changes: 3 additions & 1 deletion packages/cli/src/scaling/pubsub/publisher.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { Redis as SingleNodeClient, Cluster as MultiNodeClient } from 'ioredis';
import { InstanceSettings } from 'n8n-core';
import { Service } from 'typedi';

import config from '@/config';
Expand All @@ -20,6 +21,7 @@ export class Publisher {
constructor(
private readonly logger: Logger,
private readonly redisClientService: RedisClientService,
private readonly instanceSettings: InstanceSettings,
) {
// @TODO: Once this class is only ever initialized in scaling mode, throw in the next line instead.
if (config.getEnv('executions.mode') !== 'queue') return;
Expand Down Expand Up @@ -48,7 +50,7 @@ export class Publisher {
'n8n.commands',
JSON.stringify({
...msg,
senderId: config.getEnv('redis.queueModeId'),
senderId: this.instanceSettings.hostId,
selfSend: SELF_SEND_COMMANDS.has(msg.command),
debounce: !IMMEDIATE_COMMANDS.has(msg.command),
}),
Expand Down
3 changes: 1 addition & 2 deletions packages/cli/src/scaling/pubsub/pubsub-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { ensureError } from 'n8n-workflow';
import { Service } from 'typedi';

import { ActiveWorkflowManager } from '@/active-workflow-manager';
import config from '@/config';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { EventService } from '@/events/event.service';
Expand Down Expand Up @@ -49,7 +48,7 @@ export class PubSubHandler {
...this.commonHandlers,
'get-worker-status': async () =>
await this.publisher.publishWorkerResponse({
senderId: config.getEnv('redis.queueModeId'),
senderId: this.instanceSettings.hostId,
response: 'response-to-get-worker-status',
payload: this.workerStatusService.generateStatus(),
}),
Expand Down
6 changes: 4 additions & 2 deletions packages/cli/src/scaling/pubsub/subscriber.service.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { Redis as SingleNodeClient, Cluster as MultiNodeClient } from 'ioredis';
import debounce from 'lodash/debounce';
import { InstanceSettings } from 'n8n-core';
import { jsonParse } from 'n8n-workflow';
import { Service } from 'typedi';

Expand All @@ -21,6 +22,7 @@ export class Subscriber {
private readonly logger: Logger,
private readonly redisClientService: RedisClientService,
private readonly eventService: EventService,
private readonly instanceSettings: InstanceSettings,
) {
// @TODO: Once this class is only ever initialized in scaling mode, throw in the next line instead.
if (config.getEnv('executions.mode') !== 'queue') return;
Expand Down Expand Up @@ -77,12 +79,12 @@ export class Subscriber {
return null;
}

const queueModeId = config.getEnv('redis.queueModeId');
const { hostId } = this.instanceSettings;

if (
'command' in msg &&
!msg.selfSend &&
(msg.senderId === queueModeId || (msg.targets && !msg.targets.includes(queueModeId)))
(msg.senderId === hostId || (msg.targets && !msg.targets.includes(hostId)))
) {
return null;
}
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/scaling/scaling.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ export class ScalingService {
const msg: JobFailedMessage = {
kind: 'job-failed',
executionId,
workerId: config.getEnv('redis.queueModeId'),
workerId: this.instanceSettings.hostId,
errorMsg: error.message,
};

Expand Down
Loading
Loading