Skip to content

Commit

Permalink
feat(core): Add command to trigger license refresh on workers (#7184)
Browse files Browse the repository at this point in the history
This PR implements the updated license SDK so that worker and webhook
instances do not auto-renew licenses any more.

Instead, they receive a `reloadLicense` command via the Redis client
that will fetch the updated license after it was saved on the main
instance

This also contains some refactoring with moving redis sub and pub
clients into the event bus directly, to prevent cyclic dependency
issues.
  • Loading branch information
flipswitchingmonkey authored Sep 17, 2023
1 parent d317e09 commit 9f797b9
Show file tree
Hide file tree
Showing 22 changed files with 293 additions and 139 deletions.
2 changes: 1 addition & 1 deletion packages/cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@
},
"dependencies": {
"@n8n/client-oauth2": "workspace:*",
"@n8n_io/license-sdk": "~2.5.1",
"@n8n_io/license-sdk": "~2.6.0",
"@oclif/command": "^1.8.16",
"@oclif/core": "^1.16.4",
"@oclif/errors": "^1.3.6",
Expand Down
40 changes: 35 additions & 5 deletions packages/cli/src/License.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import {
SETTINGS_LICENSE_CERT_KEY,
UNLIMITED_LICENSE_QUOTA,
} from './constants';
import { Service } from 'typedi';
import type { BooleanLicenseFeature, NumericLicenseFeature } from './Interfaces';
import Container, { Service } from 'typedi';
import type { BooleanLicenseFeature, N8nInstanceType, NumericLicenseFeature } from './Interfaces';
import type { RedisServicePubSubPublisher } from './services/redis/RedisServicePubSubPublisher';
import { RedisService } from './services/redis.service';

type FeatureReturnType = Partial<
{
Expand All @@ -26,18 +28,28 @@ export class License {

private manager: LicenseManager | undefined;

instanceId: string | undefined;

private redisPublisher: RedisServicePubSubPublisher;

constructor() {
this.logger = getLogger();
}

async init(instanceId: string) {
async init(instanceId: string, instanceType: N8nInstanceType = 'main') {
if (this.manager) {
return;
}

this.instanceId = instanceId;
const isMainInstance = instanceType === 'main';
const server = config.getEnv('license.serverUrl');
const autoRenewEnabled = config.getEnv('license.autoRenewEnabled');
const autoRenewEnabled = isMainInstance && config.getEnv('license.autoRenewEnabled');
const offlineMode = !isMainInstance;
const autoRenewOffset = config.getEnv('license.autoRenewOffset');
const saveCertStr = isMainInstance
? async (value: TLicenseBlock) => this.saveCertStr(value)
: async () => {};

try {
this.manager = new LicenseManager({
Expand All @@ -46,9 +58,10 @@ export class License {
productIdentifier: `n8n-${N8N_VERSION}`,
autoRenewEnabled,
autoRenewOffset,
offlineMode,
logger: this.logger,
loadCertStr: async () => this.loadCertStr(),
saveCertStr: async (value: TLicenseBlock) => this.saveCertStr(value),
saveCertStr,
deviceFingerprint: () => instanceId,
});

Expand Down Expand Up @@ -86,6 +99,15 @@ export class License {
},
['key'],
);
if (config.getEnv('executions.mode') === 'queue') {
if (!this.redisPublisher) {
this.logger.debug('Initializing Redis publisher for License Service');
this.redisPublisher = await Container.get(RedisService).getPubSubPublisher();
}
await this.redisPublisher.publishToCommandChannel({
command: 'reloadLicense',
});
}
}

async activate(activationKey: string): Promise<void> {
Expand All @@ -96,6 +118,14 @@ export class License {
await this.manager.activate(activationKey);
}

async reload(): Promise<void> {
if (!this.manager) {
return;
}
this.logger.debug('Reloading license');
await this.manager.reload();
}

async renew() {
if (!this.manager) {
return;
Expand Down
4 changes: 3 additions & 1 deletion packages/cli/src/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1470,7 +1470,9 @@ export class Server extends AbstractServer {
// ----------------------------------------

if (!eventBus.isInitialized) {
await eventBus.initialize();
await eventBus.initialize({
uniqueInstanceId: this.uniqueInstanceId,
});
}

if (this.endpointPresetCredentials !== '') {
Expand Down
6 changes: 3 additions & 3 deletions packages/cli/src/commands/BaseCommand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { initErrorHandling } from '@/ErrorReporting';
import { ExternalHooks } from '@/ExternalHooks';
import { NodeTypes } from '@/NodeTypes';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import type { IExternalHooksClass } from '@/Interfaces';
import type { IExternalHooksClass, N8nInstanceType } from '@/Interfaces';
import { InternalHooks } from '@/InternalHooks';
import { PostHogClient } from '@/posthog';
import { License } from '@/License';
Expand Down Expand Up @@ -113,9 +113,9 @@ export abstract class BaseCommand extends Command {
await this.externalHooks.init();
}

async initLicense(): Promise<void> {
async initLicense(instanceType: N8nInstanceType = 'main'): Promise<void> {
const license = Container.get(License);
await license.init(this.instanceId);
await license.init(this.instanceId, instanceType);

const activationKey = config.getEnv('license.activationKey');

Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ export class Start extends BaseCommand {
this.logger.info('Initializing n8n process');
this.activeWorkflowRunner = Container.get(ActiveWorkflowRunner);

await this.initLicense();
await this.initLicense('main');
await this.initBinaryManager();
await this.initExternalHooks();
await this.initExternalSecrets();
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/commands/webhook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ export class Webhook extends BaseCommand {
await this.initCrashJournal();
await super.init();

await this.initLicense();
await this.initLicense('webhook');
await this.initBinaryManager();
await this.initExternalHooks();
await this.initExternalSecrets();
Expand Down
4 changes: 3 additions & 1 deletion packages/cli/src/commands/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ export class Worker extends BaseCommand {
this.logger.debug(`Worker ID: ${this.uniqueInstanceId}`);
this.logger.debug('Starting n8n worker...');

await this.initLicense();
await this.initLicense('worker');
await this.initBinaryManager();
await this.initExternalHooks();
await this.initExternalSecrets();
Expand All @@ -268,6 +268,7 @@ export class Worker extends BaseCommand {
async initEventBus() {
await eventBus.initialize({
workerId: this.uniqueInstanceId,
uniqueInstanceId: this.uniqueInstanceId,
});
}

Expand Down Expand Up @@ -295,6 +296,7 @@ export class Worker extends BaseCommand {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
getWorkerCommandReceivedHandler({
uniqueInstanceId: this.uniqueInstanceId,
instanceId: this.instanceId,
redisPublisher: this.redisPublisher,
getRunningJobIds: () => Object.keys(Worker.runningJobs),
}),
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/controllers/e2e.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ export class E2EController {

private async resetLogStreaming() {
for (const id in eventBus.destinations) {
await eventBus.removeDestination(id);
await eventBus.removeDestination(id, false);
}
}

Expand Down
5 changes: 1 addition & 4 deletions packages/cli/src/controllers/orchestration.controller.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
import config from '@/config';
import { Authorized, Get, RestController } from '@/decorators';
import { OrchestrationRequest } from '@/requests';
import { Service } from 'typedi';
import { OrchestrationService } from '../services/orchestration.service';
import { OrchestrationService } from '@/services/orchestration.service';

@Authorized(['global', 'owner'])
@RestController('/orchestration')
@Service()
export class OrchestrationController {
private config = config;

constructor(private readonly orchestrationService: OrchestrationService) {}

/**
Expand Down
104 changes: 94 additions & 10 deletions packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { LoggerProxy } from 'n8n-workflow';
import { LoggerProxy, jsonParse } from 'n8n-workflow';
import type { MessageEventBusDestinationOptions } from 'n8n-workflow';
import type { DeleteResult } from 'typeorm';
import type {
Expand Down Expand Up @@ -27,9 +27,18 @@ import {
} from '../EventMessageClasses/EventMessageGeneric';
import { recoverExecutionDataFromEventLogMessages } from './recoverEvents';
import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee';
import Container from 'typedi';
import Container, { Service } from 'typedi';
import { ExecutionRepository, WorkflowRepository } from '@/databases/repositories';
import { OrchestrationService } from '../../services/orchestration.service';
import { RedisService } from '@/services/redis.service';
import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher';
import type { RedisServicePubSubSubscriber } from '@/services/redis/RedisServicePubSubSubscriber';
import {
COMMAND_REDIS_CHANNEL,
EVENT_BUS_REDIS_CHANNEL,
} from '@/services/redis/RedisServiceHelper';
import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions';
import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers';
import { messageToRedisServiceCommandObject } from '@/services/orchestration/helpers';

export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished';

Expand All @@ -41,13 +50,21 @@ export interface MessageWithCallback {
export interface MessageEventBusInitializeOptions {
skipRecoveryPass?: boolean;
workerId?: string;
uniqueInstanceId?: string;
}

@Service()
export class MessageEventBus extends EventEmitter {
private static instance: MessageEventBus;

isInitialized: boolean;

uniqueInstanceId: string;

redisPublisher: RedisServicePubSubPublisher;

redisSubscriber: RedisServicePubSubSubscriber;

logWriter: MessageEventBusLogWriter;

destinations: {
Expand Down Expand Up @@ -76,11 +93,30 @@ export class MessageEventBus extends EventEmitter {
*
* Sets `isInitialized` to `true` once finished.
*/
async initialize(options?: MessageEventBusInitializeOptions): Promise<void> {
async initialize(options: MessageEventBusInitializeOptions): Promise<void> {
if (this.isInitialized) {
return;
}

this.uniqueInstanceId = options?.uniqueInstanceId ?? '';

if (config.getEnv('executions.mode') === 'queue') {
this.redisPublisher = await Container.get(RedisService).getPubSubPublisher();
this.redisSubscriber = await Container.get(RedisService).getPubSubSubscriber();
await this.redisSubscriber.subscribeToEventLog();
await this.redisSubscriber.subscribeToCommandChannel();
this.redisSubscriber.addMessageHandler(
'MessageEventBusMessageReceiver',
async (channel: string, messageString: string) => {
if (channel === EVENT_BUS_REDIS_CHANNEL) {
await this.handleRedisEventBusMessage(messageString);
} else if (channel === COMMAND_REDIS_CHANNEL) {
await this.handleRedisCommandMessage(messageString);
}
},
);
}

LoggerProxy.debug('Initializing event bus...');

const savedEventDestinations = await Db.collections.EventDestinations.find({});
Expand All @@ -89,7 +125,7 @@ export class MessageEventBus extends EventEmitter {
try {
const destination = messageEventBusDestinationFromDb(this, destinationData);
if (destination) {
await this.addDestination(destination);
await this.addDestination(destination, false);
}
} catch (error) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
Expand Down Expand Up @@ -182,10 +218,13 @@ export class MessageEventBus extends EventEmitter {
this.isInitialized = true;
}

async addDestination(destination: MessageEventBusDestination) {
await this.removeDestination(destination.getId());
async addDestination(destination: MessageEventBusDestination, notifyWorkers: boolean = true) {
await this.removeDestination(destination.getId(), false);
this.destinations[destination.getId()] = destination;
this.destinations[destination.getId()].startListening();
if (notifyWorkers) {
await this.broadcastRestartEventbusAfterDestinationUpdate();
}
return destination;
}

Expand All @@ -199,19 +238,62 @@ export class MessageEventBus extends EventEmitter {
return result.sort((a, b) => (a.__type ?? '').localeCompare(b.__type ?? ''));
}

async removeDestination(id: string): Promise<DeleteResult | undefined> {
async removeDestination(
id: string,
notifyWorkers: boolean = true,
): Promise<DeleteResult | undefined> {
let result;
if (Object.keys(this.destinations).includes(id)) {
await this.destinations[id].close();
result = await this.destinations[id].deleteFromDb();
delete this.destinations[id];
}
if (notifyWorkers) {
await this.broadcastRestartEventbusAfterDestinationUpdate();
}
return result;
}

async handleRedisEventBusMessage(messageString: string) {
const eventData = jsonParse<AbstractEventMessageOptions>(messageString);
if (eventData) {
const eventMessage = getEventMessageObjectByType(eventData);
if (eventMessage) {
await Container.get(MessageEventBus).send(eventMessage);
}
}
return eventData;
}

async handleRedisCommandMessage(messageString: string) {
const message = messageToRedisServiceCommandObject(messageString);
if (message) {
if (
message.senderId === this.uniqueInstanceId ||
(message.targets && !message.targets.includes(this.uniqueInstanceId))
) {
LoggerProxy.debug(
`Skipping command message ${message.command} because it's not for this instance.`,
);
return message;
}
switch (message.command) {
case 'restartEventBus':
await this.restart();
default:
break;
}
return message;
}
return;
}

async broadcastRestartEventbusAfterDestinationUpdate() {
if (config.getEnv('executions.mode') === 'queue') {
await Container.get(OrchestrationService).restartEventBus();
await this.redisPublisher.publishToCommandChannel({
senderId: this.uniqueInstanceId,
command: 'restartEventBus',
});
}
}

Expand All @@ -235,6 +317,8 @@ export class MessageEventBus extends EventEmitter {
);
await this.destinations[destinationName].close();
}
await this.redisSubscriber?.unSubscribeFromCommandChannel();
await this.redisSubscriber?.unSubscribeFromEventLog();
this.isInitialized = false;
LoggerProxy.debug('EventBus shut down.');
}
Expand Down Expand Up @@ -417,4 +501,4 @@ export class MessageEventBus extends EventEmitter {
}
}

export const eventBus = MessageEventBus.getInstance();
export const eventBus = Container.get(MessageEventBus);
Loading

0 comments on commit 9f797b9

Please sign in to comment.