diff --git a/packages/cli/src/ActivationErrors.service.ts b/packages/cli/src/ActivationErrors.service.ts new file mode 100644 index 0000000000000..b693bb9ba5da0 --- /dev/null +++ b/packages/cli/src/ActivationErrors.service.ts @@ -0,0 +1,52 @@ +import { Service } from 'typedi'; +import { CacheService } from './services/cache.service'; +import { jsonParse } from 'n8n-workflow'; + +type ActivationErrors = { + [workflowId: string]: string; // error message +}; + +@Service() +export class ActivationErrorsService { + private readonly cacheKey = 'workflow-activation-errors'; + + constructor(private readonly cacheService: CacheService) {} + + async set(workflowId: string, errorMessage: string) { + const errors = await this.getAll(); + + errors[workflowId] = errorMessage; + + await this.cacheService.set(this.cacheKey, JSON.stringify(errors)); + } + + async unset(workflowId: string) { + const errors = await this.getAll(); + + if (Object.keys(errors).length === 0) return; + + delete errors[workflowId]; + + await this.cacheService.set(this.cacheKey, JSON.stringify(errors)); + } + + async get(workflowId: string) { + const errors = await this.getAll(); + + if (Object.keys(errors).length === 0) return null; + + return errors[workflowId]; + } + + async getAll() { + const errors = await this.cacheService.get(this.cacheKey); + + if (!errors) return {}; + + return jsonParse(errors); + } + + async clearAll() { + await this.cacheService.delete(this.cacheKey); + } +} diff --git a/packages/cli/src/ActiveWorkflowRunner.ts b/packages/cli/src/ActiveWorkflowRunner.ts index 4cbcda2a89ed2..a32ce7aa3d85e 100644 --- a/packages/cli/src/ActiveWorkflowRunner.ts +++ b/packages/cli/src/ActiveWorkflowRunner.ts @@ -2,8 +2,9 @@ /* eslint-disable @typescript-eslint/no-unsafe-member-access */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ -import Container, { Service } from 'typedi'; +import { Service } from 'typedi'; import { ActiveWorkflows, NodeExecuteFunctions } from 'n8n-core'; +import config from '@/config'; import type { ExecutionError, @@ -64,8 +65,8 @@ import { WebhookService } from './services/webhook.service'; import { Logger } from './Logger'; import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository'; import { WorkflowRepository } from '@db/repositories/workflow.repository'; -import config from '@/config'; -import type { MultiMainInstancePublisher } from './services/orchestration/main/MultiMainInstance.publisher.ee'; +import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; +import { ActivationErrorsService } from '@/ActivationErrors.service'; const WEBHOOK_PROD_UNREGISTERED_HINT = "The workflow must be active for a production URL to run successfully. You can activate the workflow using the toggle in the top-right of the editor. Note that unlike test URL calls, production URL calls aren't shown on the canvas (only in the executions list)"; @@ -74,15 +75,6 @@ const WEBHOOK_PROD_UNREGISTERED_HINT = export class ActiveWorkflowRunner implements IWebhookManager { activeWorkflows = new ActiveWorkflows(); - private activationErrors: { - [workflowId: string]: { - time: number; // ms - error: { - message: string; - }; - }; - } = {}; - private queuedActivations: { [workflowId: string]: { activationMode: WorkflowActivateMode; @@ -92,11 +84,6 @@ export class ActiveWorkflowRunner implements IWebhookManager { }; } = {}; - isMultiMainScenario = - config.getEnv('executions.mode') === 'queue' && config.getEnv('leaderSelection.enabled'); - - multiMainInstancePublisher: MultiMainInstancePublisher | undefined; - constructor( private readonly logger: Logger, private readonly activeExecutions: ActiveExecutions, @@ -105,17 +92,13 @@ export class ActiveWorkflowRunner implements IWebhookManager { private readonly webhookService: WebhookService, private readonly workflowRepository: WorkflowRepository, private readonly sharedWorkflowRepository: SharedWorkflowRepository, + private readonly multiMainSetup: MultiMainSetup, + private readonly activationErrorsService: ActivationErrorsService, ) {} async init() { - if (this.isMultiMainScenario) { - const { MultiMainInstancePublisher } = await import( - '@/services/orchestration/main/MultiMainInstance.publisher.ee' - ); - - this.multiMainInstancePublisher = Container.get(MultiMainInstancePublisher); - - await this.multiMainInstancePublisher.init(); + if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) { + await this.multiMainSetup.init(); } await this.addActiveWorkflows('init'); @@ -272,6 +255,8 @@ export class ActiveWorkflowRunner implements IWebhookManager { async allActiveInStorage(user?: User) { const isFullAccess = !user || user.globalRole.name === 'owner'; + const activationErrors = await this.activationErrorsService.getAll(); + if (isFullAccess) { const activeWorkflows = await this.workflowRepository.find({ select: ['id'], @@ -280,7 +265,7 @@ export class ActiveWorkflowRunner implements IWebhookManager { return activeWorkflows .map((workflow) => workflow.id) - .filter((workflowId) => !this.activationErrors[workflowId]); + .filter((workflowId) => !activationErrors[workflowId]); } const where = whereClause({ @@ -304,7 +289,7 @@ export class ActiveWorkflowRunner implements IWebhookManager { return sharings .map((sharing) => sharing.workflowId) - .filter((workflowId) => !this.activationErrors[workflowId]); + .filter((workflowId) => !activationErrors[workflowId]); } /** @@ -325,8 +310,8 @@ export class ActiveWorkflowRunner implements IWebhookManager { /** * Return error if there was a problem activating the workflow */ - getActivationError(workflowId: string) { - return this.activationErrors[workflowId]; + async getActivationError(workflowId: string) { + return this.activationErrorsService.get(workflowId); } /** @@ -612,12 +597,8 @@ export class ActiveWorkflowRunner implements IWebhookManager { // Remove the workflow as "active" void this.activeWorkflows.remove(workflowData.id); - this.activationErrors[workflowData.id] = { - time: new Date().getTime(), - error: { - message: error.message, - }, - }; + + void this.activationErrorsService.set(workflowData.id, error.message); // Run Error Workflow if defined const activationError = new WorkflowActivationError( @@ -709,15 +690,15 @@ export class ActiveWorkflowRunner implements IWebhookManager { this.logger.verbose('Finished activating workflows (startup)'); } - async addAllTriggerAndPollerBasedWorkflows() { - this.logger.debug('[Leadership change] Adding all trigger- and poller-based workflows...'); + async clearAllActivationErrors() { + await this.activationErrorsService.clearAll(); + } + async addAllTriggerAndPollerBasedWorkflows() { await this.addActiveWorkflows('leadershipChange'); } async removeAllTriggerAndPollerBasedWorkflows() { - this.logger.debug('[Leadership change] Removing all trigger- and poller-based workflows...'); - await this.activeWorkflows.removeAllTriggerAndPollerBasedWorkflows(); } @@ -750,12 +731,12 @@ export class ActiveWorkflowRunner implements IWebhookManager { let shouldAddWebhooks = true; let shouldAddTriggersAndPollers = true; - if (this.isMultiMainScenario && activationMode !== 'leadershipChange') { - shouldAddWebhooks = this.multiMainInstancePublisher?.isLeader ?? false; - shouldAddTriggersAndPollers = this.multiMainInstancePublisher?.isLeader ?? false; + if (this.multiMainSetup.isEnabled && activationMode !== 'leadershipChange') { + shouldAddWebhooks = this.multiMainSetup.isLeader; + shouldAddTriggersAndPollers = this.multiMainSetup.isLeader; } - if (this.isMultiMainScenario && activationMode === 'leadershipChange') { + if (this.multiMainSetup.isEnabled && activationMode === 'leadershipChange') { shouldAddWebhooks = false; shouldAddTriggersAndPollers = true; } @@ -795,17 +776,13 @@ export class ActiveWorkflowRunner implements IWebhookManager { const additionalData = await WorkflowExecuteAdditionalData.getBase(sharing.user.id); if (shouldAddWebhooks) { - this.logger.debug('============'); - this.logger.debug(`Adding webhooks for workflow "${dbWorkflow.display()}"`); - this.logger.debug('============'); + this.logger.debug(`Adding webhooks for workflow ${dbWorkflow.display()}`); await this.addWebhooks(workflow, additionalData, 'trigger', activationMode); } if (shouldAddTriggersAndPollers) { - this.logger.debug('============'); - this.logger.debug(`Adding triggers and pollers for workflow "${dbWorkflow.display()}"`); - this.logger.debug('============'); + this.logger.debug(`Adding triggers and pollers for workflow ${dbWorkflow.display()}`); await this.addTriggersAndPollers(dbWorkflow, workflow, { activationMode, @@ -817,21 +794,15 @@ export class ActiveWorkflowRunner implements IWebhookManager { // Workflow got now successfully activated so make sure nothing is left in the queue this.removeQueuedWorkflowActivation(workflowId); - if (this.activationErrors[workflowId]) { - delete this.activationErrors[workflowId]; - } + await this.activationErrorsService.unset(workflowId); const triggerCount = this.countTriggers(workflow, additionalData); await WorkflowsService.updateWorkflowTriggerCount(workflow.id, triggerCount); - } catch (error) { - this.activationErrors[workflowId] = { - time: new Date().getTime(), - error: { - message: error.message, - }, - }; + } catch (e) { + const error = e instanceof Error ? e : new Error(`${e}`); + await this.activationErrorsService.set(workflowId, error.message); - throw error; + throw e; } // If for example webhooks get created it sometimes has to save the @@ -950,10 +921,7 @@ export class ActiveWorkflowRunner implements IWebhookManager { ); } - if (this.activationErrors[workflowId] !== undefined) { - // If there were any activation errors delete them - delete this.activationErrors[workflowId]; - } + await this.activationErrorsService.unset(workflowId); if (this.queuedActivations[workflowId] !== undefined) { this.removeQueuedWorkflowActivation(workflowId); @@ -1016,4 +984,8 @@ export class ActiveWorkflowRunner implements IWebhookManager { }); } } + + async removeActivationError(workflowId: string) { + await this.activationErrorsService.unset(workflowId); + } } diff --git a/packages/cli/src/ExternalSecrets/ExternalSecretsManager.ee.ts b/packages/cli/src/ExternalSecrets/ExternalSecretsManager.ee.ts index 8f8879a174826..74a8609563790 100644 --- a/packages/cli/src/ExternalSecrets/ExternalSecretsManager.ee.ts +++ b/packages/cli/src/ExternalSecrets/ExternalSecretsManager.ee.ts @@ -19,7 +19,7 @@ import { import { License } from '@/License'; import { InternalHooks } from '@/InternalHooks'; import { ExternalSecretsProviders } from './ExternalSecretsProviders.ee'; -import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher'; +import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup'; @Service() export class ExternalSecretsManager { @@ -82,7 +82,7 @@ export class ExternalSecretsManager { } async broadcastReloadExternalSecretsProviders() { - await Container.get(SingleMainInstancePublisher).broadcastReloadExternalSecretsProviders(); + await Container.get(SingleMainSetup).broadcastReloadExternalSecretsProviders(); } private decryptSecretsSettings(value: string): ExternalSecretsSettings { diff --git a/packages/cli/src/Interfaces.ts b/packages/cli/src/Interfaces.ts index 8bcdb7875fe30..bd8015a483a30 100644 --- a/packages/cli/src/Interfaces.ts +++ b/packages/cli/src/Interfaces.ts @@ -297,6 +297,7 @@ export interface IDiagnosticInfo { ldap_allowed: boolean; saml_enabled: boolean; binary_data_s3: boolean; + multi_main_setup_enabled: boolean; licensePlanName?: string; licenseTenantId?: number; } @@ -468,7 +469,25 @@ export type IPushData = | PushDataNodeDescriptionUpdated | PushDataExecutionRecovered | PushDataActiveWorkflowUsersChanged - | PushDataWorkerStatusMessage; + | PushDataWorkerStatusMessage + | PushDataWorkflowActivated + | PushDataWorkflowDeactivated + | PushDataWorkflowFailedToActivate; + +type PushDataWorkflowFailedToActivate = { + data: IWorkflowFailedToActivate; + type: 'workflowFailedToActivate'; +}; + +type PushDataWorkflowActivated = { + data: IActiveWorkflowChanged; + type: 'workflowActivated'; +}; + +type PushDataWorkflowDeactivated = { + data: IActiveWorkflowChanged; + type: 'workflowDeactivated'; +}; type PushDataActiveWorkflowUsersChanged = { data: IActiveWorkflowUsersChanged; @@ -535,11 +554,24 @@ export interface IActiveWorkflowUser { lastSeen: Date; } +export interface IActiveWorkflowAdded { + workflowId: Workflow['id']; +} + export interface IActiveWorkflowUsersChanged { workflowId: Workflow['id']; activeUsers: IActiveWorkflowUser[]; } +interface IActiveWorkflowChanged { + workflowId: Workflow['id']; +} + +interface IWorkflowFailedToActivate { + workflowId: Workflow['id']; + errorMessage: string; +} + export interface IPushDataExecutionRecovered { executionId: string; } diff --git a/packages/cli/src/License.ts b/packages/cli/src/License.ts index c06f83b271dea..596695fb46d9f 100644 --- a/packages/cli/src/License.ts +++ b/packages/cli/src/License.ts @@ -16,6 +16,7 @@ import { WorkflowRepository } from '@db/repositories/workflow.repository'; import type { BooleanLicenseFeature, N8nInstanceType, NumericLicenseFeature } from './Interfaces'; import type { RedisServicePubSubPublisher } from './services/redis/RedisServicePubSubPublisher'; import { RedisService } from './services/redis.service'; +import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; type FeatureReturnType = Partial< { @@ -40,6 +41,7 @@ export class License { constructor( private readonly logger: Logger, private readonly instanceSettings: InstanceSettings, + private readonly multiMainSetup: MultiMainSetup, private readonly settingsRepository: SettingsRepository, private readonly workflowRepository: WorkflowRepository, ) {} @@ -49,6 +51,10 @@ export class License { return; } + if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) { + await this.multiMainSetup.init(); + } + const isMainInstance = instanceType === 'main'; const server = config.getEnv('license.serverUrl'); const autoRenewEnabled = isMainInstance && config.getEnv('license.autoRenewEnabled'); @@ -114,22 +120,28 @@ export class License { } async onFeatureChange(_features: TFeatures): Promise { - if (config.getEnv('executions.mode') === 'queue') { - if (config.getEnv('leaderSelection.enabled')) { - const { MultiMainInstancePublisher } = await import( - '@/services/orchestration/main/MultiMainInstance.publisher.ee' - ); + if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) { + const isMultiMainLicensed = _features[LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES] as + | boolean + | undefined; - const multiMainInstancePublisher = Container.get(MultiMainInstancePublisher); + this.multiMainSetup.setLicensed(isMultiMainLicensed ?? false); - await multiMainInstancePublisher.init(); + if (this.multiMainSetup.isEnabled && this.multiMainSetup.isFollower) { + this.logger.debug( + '[Multi-main setup] Instance is follower, skipping sending of "reloadLicense" command...', + ); + return; + } - if (multiMainInstancePublisher.isFollower) { - this.logger.debug('Instance is follower, skipping sending of reloadLicense command...'); - return; - } + if (this.multiMainSetup.isEnabled && !isMultiMainLicensed) { + this.logger.debug( + '[Multi-main setup] License changed with no support for multi-main setup - no new followers will be allowed to init. To restore multi-main setup, please upgrade to a license that supporst this feature.', + ); } + } + if (config.getEnv('executions.mode') === 'queue') { if (!this.redisPublisher) { this.logger.debug('Initializing Redis publisher for License Service'); this.redisPublisher = await Container.get(RedisService).getPubSubPublisher(); diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index c00fb4558b941..3888ea17642ac 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -233,6 +233,7 @@ export class Server extends AbstractServer { ldap_allowed: isLdapCurrentAuthenticationMethod(), saml_enabled: isSamlCurrentAuthenticationMethod(), binary_data_s3: isS3Available && isS3Selected && isS3Licensed, + multi_main_setup_enabled: config.getEnv('multiMainSetup.enabled'), licensePlanName: Container.get(License).getPlanName(), licenseTenantId: config.getEnv('license.tenantId'), }; @@ -626,7 +627,7 @@ export class Server extends AbstractServer { // Returns if the workflow with the given id had any activation errors this.app.get( `/${this.restEndpoint}/active/error/:id`, - ResponseHelper.send(async (req: WorkflowRequest.GetAllActivationErrors) => { + ResponseHelper.send(async (req: WorkflowRequest.GetActivationError) => { const { id: workflowId } = req.params; const shared = await Container.get(SharedWorkflowRepository).findOne({ diff --git a/packages/cli/src/commands/BaseCommand.ts b/packages/cli/src/commands/BaseCommand.ts index eb3f32de1fee4..a2a36b345afc0 100644 --- a/packages/cli/src/commands/BaseCommand.ts +++ b/packages/cli/src/commands/BaseCommand.ts @@ -243,21 +243,6 @@ export abstract class BaseCommand extends Command { } async initLicense(): Promise { - if (config.getEnv('executions.mode') === 'queue' && config.getEnv('leaderSelection.enabled')) { - const { MultiMainInstancePublisher } = await import( - '@/services/orchestration/main/MultiMainInstance.publisher.ee' - ); - - const multiMainInstancePublisher = Container.get(MultiMainInstancePublisher); - - await multiMainInstancePublisher.init(); - - if (multiMainInstancePublisher.isFollower) { - this.logger.debug('Instance is follower, skipping license initialization...'); - return; - } - } - const license = Container.get(License); await license.init(this.instanceType ?? 'main'); diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index aacc1f7dd907a..0e5be2ea7232c 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -26,9 +26,10 @@ import { BaseCommand } from './BaseCommand'; import { InternalHooks } from '@/InternalHooks'; import { License, FeatureNotLicensedError } from '@/License'; import { IConfig } from '@oclif/config'; -import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher'; +import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup'; import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service'; import { PruningService } from '@/services/pruning.service'; +import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; import { SettingsRepository } from '@db/repositories/settings.repository'; import { ExecutionRepository } from '@db/repositories/execution.repository'; @@ -113,18 +114,14 @@ export class Start extends BaseCommand { // Note: While this saves a new license cert to DB, the previous entitlements are still kept in memory so that the shutdown process can complete await Container.get(License).shutdown(); - if (await this.pruningService.isPruningEnabled()) { - await this.pruningService.stopPruning(); + if (this.pruningService.isPruningEnabled()) { + this.pruningService.stopPruning(); } - if (config.getEnv('leaderSelection.enabled')) { - const { MultiMainInstancePublisher } = await import( - '@/services/orchestration/main/MultiMainInstance.publisher.ee' - ); - + if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) { await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows(); - await Container.get(MultiMainInstancePublisher).destroy(); + await Container.get(MultiMainSetup).shutdown(); } await Container.get(InternalHooks).onN8nStop(); @@ -231,38 +228,42 @@ export class Start extends BaseCommand { } async initOrchestration() { - if (config.get('executions.mode') !== 'queue') return; + if (config.getEnv('executions.mode') !== 'queue') return; - if (!config.get('leaderSelection.enabled')) { - await Container.get(SingleMainInstancePublisher).init(); + // queue mode in single-main scenario + + if (!config.getEnv('multiMainSetup.enabled')) { + await Container.get(SingleMainSetup).init(); await Container.get(OrchestrationHandlerMainService).init(); return; } - // multi-main scenario - - const { MultiMainInstancePublisher } = await import( - '@/services/orchestration/main/MultiMainInstance.publisher.ee' - ); + // queue mode in multi-main scenario - const multiMainInstancePublisher = Container.get(MultiMainInstancePublisher); - - await multiMainInstancePublisher.init(); - - if ( - multiMainInstancePublisher.isLeader && - !Container.get(License).isMultipleMainInstancesLicensed() - ) { + if (!Container.get(License).isMultipleMainInstancesLicensed()) { throw new FeatureNotLicensedError(LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES); } await Container.get(OrchestrationHandlerMainService).init(); - multiMainInstancePublisher.on('leadershipChange', async () => { - if (multiMainInstancePublisher.isLeader) { + const multiMainSetup = Container.get(MultiMainSetup); + + await multiMainSetup.init(); + + multiMainSetup.on('leadershipChange', async () => { + if (multiMainSetup.isLeader) { + this.logger.debug('[Leadership change] Clearing all activation errors...'); + + await this.activeWorkflowRunner.clearAllActivationErrors(); + + this.logger.debug('[Leadership change] Adding all trigger- and poller-based workflows...'); + await this.activeWorkflowRunner.addAllTriggerAndPollerBasedWorkflows(); } else { - // only in case of leadership change without shutdown + this.logger.debug( + '[Leadership change] Removing all trigger- and poller-based workflows...', + ); + await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows(); } }); @@ -348,10 +349,7 @@ export class Start extends BaseCommand { await this.server.start(); - this.pruningService = Container.get(PruningService); - if (await this.pruningService.isPruningEnabled()) { - this.pruningService.startPruning(); - } + await this.initPruning(); // Start to get active workflows and run their triggers await this.activeWorkflowRunner.init(); @@ -390,6 +388,32 @@ export class Start extends BaseCommand { } } + async initPruning() { + this.pruningService = Container.get(PruningService); + + if (this.pruningService.isPruningEnabled()) { + this.pruningService.startPruning(); + } + + if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) { + const multiMainSetup = Container.get(MultiMainSetup); + + await multiMainSetup.init(); + + multiMainSetup.on('leadershipChange', async () => { + if (multiMainSetup.isLeader) { + if (this.pruningService.isPruningEnabled()) { + this.pruningService.startPruning(); + } + } else { + if (this.pruningService.isPruningEnabled()) { + this.pruningService.stopPruning(); + } + } + }); + } + } + async catch(error: Error) { console.log(error.stack); await this.exitWithCrash('Exiting due to an error.', error); diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts index 8ab9d0aa2233e..6fe7b5f2d4297 100644 --- a/packages/cli/src/config/schema.ts +++ b/packages/cli/src/config/schema.ts @@ -1324,24 +1324,29 @@ export const schema = { }, }, - leaderSelection: { + multiMainSetup: { + instanceType: { + doc: 'Type of instance in multi-main setup', + format: ['unset', 'leader', 'follower'] as const, + default: 'unset', // only until first leader key check + }, enabled: { - doc: 'Whether to enable leader selection for multiple main instances (license required)', + doc: 'Whether to enable multi-main setup for queue mode (license required)', format: Boolean, default: false, - env: 'N8N_LEADER_SELECTION_ENABLED', + env: 'N8N_MULTI_MAIN_SETUP_ENABLED', }, ttl: { - doc: 'Time to live in Redis for leader selection key, in seconds', + doc: 'Time to live (in seconds) for leader key in multi-main setup', format: Number, default: 10, - env: 'N8N_LEADER_SELECTION_KEY_TTL', + env: 'N8N_MULTI_MAIN_SETUP_KEY_TTL', }, interval: { - doc: 'Interval in Redis for leader selection check, in seconds', + doc: 'Interval (in seconds) for leader check in multi-main setup', format: Number, default: 3, - env: 'N8N_LEADER_SELECTION_CHECK_INTERVAL', + env: 'N8N_MULTI_MAIN_SETUP_CHECK_INTERVAL', }, }, diff --git a/packages/cli/src/controllers/orchestration.controller.ts b/packages/cli/src/controllers/orchestration.controller.ts index cbe9f285d7cfa..9506a485c28d5 100644 --- a/packages/cli/src/controllers/orchestration.controller.ts +++ b/packages/cli/src/controllers/orchestration.controller.ts @@ -1,7 +1,7 @@ import { Authorized, Post, RestController } from '@/decorators'; import { OrchestrationRequest } from '@/requests'; import { Service } from 'typedi'; -import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher'; +import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup'; import { License } from '../License'; @Authorized(['global', 'owner']) @@ -9,7 +9,7 @@ import { License } from '../License'; @Service() export class OrchestrationController { constructor( - private readonly orchestrationService: SingleMainInstancePublisher, + private readonly singleMainSetup: SingleMainSetup, private readonly licenseService: License, ) {} @@ -21,18 +21,18 @@ export class OrchestrationController { async getWorkersStatus(req: OrchestrationRequest.Get) { if (!this.licenseService.isWorkerViewLicensed()) return; const id = req.params.id; - return this.orchestrationService.getWorkerStatus(id); + return this.singleMainSetup.getWorkerStatus(id); } @Post('/worker/status') async getWorkersStatusAll() { if (!this.licenseService.isWorkerViewLicensed()) return; - return this.orchestrationService.getWorkerStatus(); + return this.singleMainSetup.getWorkerStatus(); } @Post('/worker/ids') async getWorkerIdsAll() { if (!this.licenseService.isWorkerViewLicensed()) return; - return this.orchestrationService.getWorkerIds(); + return this.singleMainSetup.getWorkerIds(); } } diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index fd37bf1a39247..e578aed95fbdb 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -32,7 +32,7 @@ import { ExecutionRepository } from '@db/repositories/execution.repository'; import { WorkflowRepository } from '@db/repositories/workflow.repository'; import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions'; import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers'; -import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher'; +import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup'; import { Logger } from '@/Logger'; import { EventDestinationsRepository } from '@db/repositories/eventDestinations.repository'; @@ -207,9 +207,7 @@ export class MessageEventBus extends EventEmitter { this.destinations[destination.getId()] = destination; this.destinations[destination.getId()].startListening(); if (notifyWorkers) { - await Container.get( - SingleMainInstancePublisher, - ).broadcastRestartEventbusAfterDestinationUpdate(); + await Container.get(SingleMainSetup).broadcastRestartEventbusAfterDestinationUpdate(); } return destination; } @@ -235,9 +233,7 @@ export class MessageEventBus extends EventEmitter { delete this.destinations[id]; } if (notifyWorkers) { - await Container.get( - SingleMainInstancePublisher, - ).broadcastRestartEventbusAfterDestinationUpdate(); + await Container.get(SingleMainSetup).broadcastRestartEventbusAfterDestinationUpdate(); } return result; } diff --git a/packages/cli/src/requests.ts b/packages/cli/src/requests.ts index 9131cc2fa33f7..a984d3277bf81 100644 --- a/packages/cli/src/requests.ts +++ b/packages/cli/src/requests.ts @@ -114,7 +114,7 @@ export declare namespace WorkflowRequest { type GetAllActive = AuthenticatedRequest; - type GetAllActivationErrors = Get; + type GetActivationError = Get; type ManualRun = AuthenticatedRequest<{}, {}, ManualRunPayload>; diff --git a/packages/cli/src/services/cache.service.ts b/packages/cli/src/services/cache.service.ts index 641e65050b77e..8bc0c563f1179 100644 --- a/packages/cli/src/services/cache.service.ts +++ b/packages/cli/src/services/cache.service.ts @@ -80,21 +80,21 @@ export class CacheService extends EventEmitter { * @param options.refreshTtl Optional ttl for the refreshFunction's set call * @param options.fallbackValue Optional value returned is cache is not hit and refreshFunction is not provided */ - async get( + async get( key: string, options: { - fallbackValue?: unknown; - refreshFunction?: (key: string) => Promise; + fallbackValue?: T; + refreshFunction?: (key: string) => Promise; refreshTtl?: number; } = {}, - ): Promise { + ): Promise { if (!key || key.length === 0) { return; } const value = await this.cache?.store.get(key); if (value !== undefined) { this.emit(this.metricsCounterEvents.cacheHit); - return value; + return value as T; } this.emit(this.metricsCounterEvents.cacheMiss); if (options.refreshFunction) { diff --git a/packages/cli/src/services/orchestration.base.service.ts b/packages/cli/src/services/orchestration.base.service.ts index 113298e786791..a2540a4a5adac 100644 --- a/packages/cli/src/services/orchestration.base.service.ts +++ b/packages/cli/src/services/orchestration.base.service.ts @@ -5,7 +5,7 @@ import config from '@/config'; import { EventEmitter } from 'node:events'; export abstract class OrchestrationService extends EventEmitter { - protected initialized = false; + protected isInitialized = false; protected queueModeId: string; @@ -36,17 +36,17 @@ export abstract class OrchestrationService extends EventEmitter { } sanityCheck(): boolean { - return this.initialized && this.isQueueMode; + return this.isInitialized && this.isQueueMode; } async init() { await this.initPublisher(); - this.initialized = true; + this.isInitialized = true; } async shutdown() { await this.redisPublisher?.destroy(); - this.initialized = false; + this.isInitialized = false; } protected async initPublisher() { diff --git a/packages/cli/src/services/orchestration/main/MultiMainInstance.publisher.ee.ts b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts similarity index 50% rename from packages/cli/src/services/orchestration/main/MultiMainInstance.publisher.ee.ts rename to packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts index 60ef60fda90f6..40a4a18da85e9 100644 --- a/packages/cli/src/services/orchestration/main/MultiMainInstance.publisher.ee.ts +++ b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts @@ -1,50 +1,61 @@ import config from '@/config'; import { Service } from 'typedi'; import { TIME } from '@/constants'; -import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher'; +import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup'; import { getRedisPrefix } from '@/services/redis/RedisServiceHelper'; -/** - * For use in main instance, in multiple main instances cluster. - */ @Service() -export class MultiMainInstancePublisher extends SingleMainInstancePublisher { +export class MultiMainSetup extends SingleMainSetup { private id = this.queueModeId; - private leaderId: string | undefined; + private isLicensed = false; + + get isEnabled() { + return ( + config.getEnv('executions.mode') === 'queue' && + config.getEnv('multiMainSetup.enabled') && + this.isLicensed + ); + } get isLeader() { - return this.id === this.leaderId; + return config.getEnv('multiMainSetup.instanceType') === 'leader'; } get isFollower() { return !this.isLeader; } + setLicensed(newState: boolean) { + this.isLicensed = newState; + } + private readonly leaderKey = getRedisPrefix() + ':main_instance_leader'; - private readonly leaderKeyTtl = config.getEnv('leaderSelection.ttl'); + private readonly leaderKeyTtl = config.getEnv('multiMainSetup.ttl'); private leaderCheckInterval: NodeJS.Timer | undefined; async init() { - if (this.initialized) return; + if (this.isInitialized) return; await this.initPublisher(); - this.initialized = true; + this.isInitialized = true; - await this.tryBecomeLeader(); + await this.tryBecomeLeader(); // prevent initial wait this.leaderCheckInterval = setInterval( async () => { await this.checkLeader(); }, - config.getEnv('leaderSelection.interval') * TIME.SECOND, + config.getEnv('multiMainSetup.interval') * TIME.SECOND, ); } - async destroy() { + async shutdown() { + if (!this.isInitialized) return; + clearInterval(this.leaderCheckInterval); if (this.isLeader) await this.redisPublisher.clear(this.leaderKey); @@ -69,12 +80,17 @@ export class MultiMainInstancePublisher extends SingleMainInstancePublisher { } else { this.logger.debug(`Leader is other instance "${leaderId}"`); - this.leaderId = leaderId; + config.set('multiMainSetup.instanceType', 'follower'); } } private async tryBecomeLeader() { - if (this.isLeader || !this.redisPublisher.redisClient) return; + if ( + config.getEnv('multiMainSetup.instanceType') === 'leader' || + !this.redisPublisher.redisClient + ) { + return; + } // this can only succeed if leadership is currently vacant const keySetSuccessfully = await this.redisPublisher.setIfNotExists(this.leaderKey, this.id); @@ -82,11 +98,36 @@ export class MultiMainInstancePublisher extends SingleMainInstancePublisher { if (keySetSuccessfully) { this.logger.debug(`Leader is now this instance "${this.id}"`); - this.leaderId = this.id; - - this.emit('leadershipChange', this.id); + config.set('multiMainSetup.instanceType', 'leader'); await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl); + + this.emit('leadershipChange', this.id); + } else { + config.set('multiMainSetup.instanceType', 'follower'); } } + + async broadcastWorkflowActiveStateChanged(payload: { + workflowId: string; + oldState: boolean; + newState: boolean; + versionId: string; + }) { + if (!this.sanityCheck()) return; + + await this.redisPublisher.publishToCommandChannel({ + command: 'workflowActiveStateChanged', + payload, + }); + } + + async broadcastWorkflowFailedToActivate(payload: { workflowId: string; errorMessage: string }) { + if (!this.sanityCheck()) return; + + await this.redisPublisher.publishToCommandChannel({ + command: 'workflowFailedToActivate', + payload, + }); + } } diff --git a/packages/cli/src/services/orchestration/main/SingleMainInstance.publisher.ts b/packages/cli/src/services/orchestration/main/SingleMainSetup.ts similarity index 90% rename from packages/cli/src/services/orchestration/main/SingleMainInstance.publisher.ts rename to packages/cli/src/services/orchestration/main/SingleMainSetup.ts index 7773ffce47582..10b020b7d8645 100644 --- a/packages/cli/src/services/orchestration/main/SingleMainInstance.publisher.ts +++ b/packages/cli/src/services/orchestration/main/SingleMainSetup.ts @@ -6,13 +6,13 @@ import { OrchestrationService } from '@/services/orchestration.base.service'; * For use in main instance, in single main instance scenario. */ @Service() -export class SingleMainInstancePublisher extends OrchestrationService { +export class SingleMainSetup extends OrchestrationService { constructor(protected readonly logger: Logger) { super(); } sanityCheck() { - return this.initialized && this.isQueueMode && this.isMainInstance; + return this.isInitialized && this.isQueueMode && this.isMainInstance; } async getWorkerStatus(id?: string) { diff --git a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts index 77d3e6f8cd01f..9a2752e5d6c8e 100644 --- a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts +++ b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts @@ -5,12 +5,17 @@ import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus'; import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee'; import { License } from '@/License'; import { Logger } from '@/Logger'; +import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; +import { Push } from '@/push'; +import { MultiMainSetup } from './MultiMainSetup.ee'; +import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; export async function handleCommandMessageMain(messageString: string) { - const queueModeId = config.get('redis.queueModeId'); - const isMainInstance = config.get('generic.instanceType') === 'main'; + const queueModeId = config.getEnv('redis.queueModeId'); + const isMainInstance = config.getEnv('generic.instanceType') === 'main'; const message = messageToRedisServiceCommandObject(messageString); const logger = Container.get(Logger); + const activeWorkflowRunner = Container.get(ActiveWorkflowRunner); if (message) { logger.debug( @@ -35,7 +40,7 @@ export async function handleCommandMessageMain(messageString: string) { return message; } - if (isMainInstance && !config.getEnv('leaderSelection.enabled')) { + if (isMainInstance && !config.getEnv('multiMainSetup.enabled')) { // at this point in time, only a single main instance is supported, thus this command _should_ never be caught currently logger.error( 'Received command to reload license via Redis, but this should not have happened and is not supported on the main instance yet.', @@ -60,6 +65,68 @@ export async function handleCommandMessageMain(messageString: string) { return message; } await Container.get(ExternalSecretsManager).reloadAllProviders(); + break; + + case 'workflowActiveStateChanged': { + if (!debounceMessageReceiver(message, 100)) { + message.payload = { result: 'debounced' }; + return message; + } + + const { workflowId, oldState, newState, versionId } = message.payload ?? {}; + + if ( + typeof workflowId !== 'string' || + typeof oldState !== 'boolean' || + typeof newState !== 'boolean' || + typeof versionId !== 'string' + ) { + break; + } + + const push = Container.get(Push); + + if (!oldState && newState) { + try { + await activeWorkflowRunner.add(workflowId, 'activate'); + push.broadcast('workflowActivated', { workflowId }); + } catch (e) { + const error = e instanceof Error ? e : new Error(`${e}`); + + await Container.get(WorkflowRepository).update(workflowId, { + active: false, + versionId, + }); + + await Container.get(MultiMainSetup).broadcastWorkflowFailedToActivate({ + workflowId, + errorMessage: error.message, + }); + } + } else if (oldState && !newState) { + await activeWorkflowRunner.remove(workflowId); + push.broadcast('workflowDeactivated', { workflowId }); + } else { + await activeWorkflowRunner.remove(workflowId); + await activeWorkflowRunner.add(workflowId, 'update'); + } + + await activeWorkflowRunner.removeActivationError(workflowId); + } + + case 'workflowFailedToActivate': { + if (!debounceMessageReceiver(message, 100)) { + message.payload = { result: 'debounced' }; + return message; + } + + const { workflowId, errorMessage } = message.payload ?? {}; + + if (typeof workflowId !== 'string' || typeof errorMessage !== 'string') break; + + Container.get(Push).broadcast('workflowFailedToActivate', { workflowId, errorMessage }); + } + default: break; } diff --git a/packages/cli/src/services/orchestration/webhook/orchestration.webhook.service.ts b/packages/cli/src/services/orchestration/webhook/orchestration.webhook.service.ts index dc9dc8172ed50..c186a9c1d87a7 100644 --- a/packages/cli/src/services/orchestration/webhook/orchestration.webhook.service.ts +++ b/packages/cli/src/services/orchestration/webhook/orchestration.webhook.service.ts @@ -4,6 +4,6 @@ import { OrchestrationService } from '../../orchestration.base.service'; @Service() export class OrchestrationWebhookService extends OrchestrationService { sanityCheck(): boolean { - return this.initialized && this.isQueueMode && this.isWebhookInstance; + return this.isInitialized && this.isQueueMode && this.isWebhookInstance; } } diff --git a/packages/cli/src/services/orchestration/worker/orchestration.worker.service.ts b/packages/cli/src/services/orchestration/worker/orchestration.worker.service.ts index 0a6fb9cff2487..f044e5403b78c 100644 --- a/packages/cli/src/services/orchestration/worker/orchestration.worker.service.ts +++ b/packages/cli/src/services/orchestration/worker/orchestration.worker.service.ts @@ -5,7 +5,7 @@ import { OrchestrationService } from '../../orchestration.base.service'; @Service() export class OrchestrationWorkerService extends OrchestrationService { sanityCheck(): boolean { - return this.initialized && this.isQueueMode && this.isWorkerInstance; + return this.isInitialized && this.isQueueMode && this.isWorkerInstance; } async publishToEventLog(message: AbstractEventMessage) { diff --git a/packages/cli/src/services/pruning.service.ts b/packages/cli/src/services/pruning.service.ts index ea3091a1b48d9..4d1060f639a56 100644 --- a/packages/cli/src/services/pruning.service.ts +++ b/packages/cli/src/services/pruning.service.ts @@ -1,4 +1,4 @@ -import Container, { Service } from 'typedi'; +import { Service } from 'typedi'; import { BinaryDataService } from 'n8n-core'; import { LessThanOrEqual, IsNull, Not, In, Brackets } from 'typeorm'; import { DateUtils } from 'typeorm/util/DateUtils'; @@ -23,16 +23,13 @@ export class PruningService { public hardDeletionTimeout: NodeJS.Timeout | undefined; - private isMultiMainScenario = - config.getEnv('executions.mode') === 'queue' && config.getEnv('leaderSelection.enabled'); - constructor( private readonly logger: Logger, private readonly executionRepository: ExecutionRepository, private readonly binaryDataService: BinaryDataService, ) {} - async isPruningEnabled() { + isPruningEnabled() { if ( !config.getEnv('executions.pruneData') || inTest || @@ -41,75 +38,60 @@ export class PruningService { return false; } - if (this.isMultiMainScenario) { - const { MultiMainInstancePublisher } = await import( - '@/services/orchestration/main/MultiMainInstance.publisher.ee' - ); - - const multiMainInstancePublisher = Container.get(MultiMainInstancePublisher); - - await multiMainInstancePublisher.init(); - - return multiMainInstancePublisher.isLeader; + if ( + config.getEnv('multiMainSetup.enabled') && + config.getEnv('multiMainSetup.instanceType') === 'follower' + ) { + return false; } return true; } /** - * @important Call only after DB connection is established and migrations have completed. + * @important Call this method only after DB migrations have completed. */ startPruning() { + this.logger.debug('[Pruning] Starting soft-deletion and hard-deletion timers'); + this.setSoftDeletionInterval(); this.scheduleHardDeletion(); } - async stopPruning() { - if (this.isMultiMainScenario) { - const { MultiMainInstancePublisher } = await import( - '@/services/orchestration/main/MultiMainInstance.publisher.ee' - ); - - const multiMainInstancePublisher = Container.get(MultiMainInstancePublisher); - - await multiMainInstancePublisher.init(); - - if (multiMainInstancePublisher.isFollower) return; - } - - this.logger.debug('Clearing soft-deletion interval and hard-deletion timeout (pruning cycle)'); + stopPruning() { + this.logger.debug('[Pruning] Removing soft-deletion and hard-deletion timers'); clearInterval(this.softDeletionInterval); clearTimeout(this.hardDeletionTimeout); } private setSoftDeletionInterval(rateMs = this.rates.softDeletion) { - const when = [(rateMs / TIME.MINUTE).toFixed(2), 'min'].join(' '); - - this.logger.debug(`Setting soft-deletion interval at every ${when} (pruning cycle)`); + const when = [rateMs / TIME.MINUTE, 'min'].join(' '); this.softDeletionInterval = setInterval( async () => this.softDeleteOnPruningCycle(), this.rates.softDeletion, ); + + this.logger.debug(`[Pruning] Soft-deletion scheduled every ${when}`); } private scheduleHardDeletion(rateMs = this.rates.hardDeletion) { - const when = [(rateMs / TIME.MINUTE).toFixed(2), 'min'].join(' '); - - this.logger.debug(`Scheduling hard-deletion for next ${when} (pruning cycle)`); + const when = [rateMs / TIME.MINUTE, 'min'].join(' '); this.hardDeletionTimeout = setTimeout( async () => this.hardDeleteOnPruningCycle(), this.rates.hardDeletion, ); + + this.logger.debug(`[Pruning] Hard-deletion scheduled for next ${when}`); } /** * Mark executions as deleted based on age and count, in a pruning cycle. */ async softDeleteOnPruningCycle() { - this.logger.debug('Starting soft-deletion of executions (pruning cycle)'); + this.logger.debug('[Pruning] Starting soft-deletion of executions'); const maxAge = config.getEnv('executions.pruneDataMaxAge'); // in h const maxCount = config.getEnv('executions.pruneDataMaxCount'); @@ -157,8 +139,11 @@ export class PruningService { .execute(); if (result.affected === 0) { - this.logger.debug('Found no executions to soft-delete (pruning cycle)'); + this.logger.debug('[Pruning] Found no executions to soft-delete'); + return; } + + this.logger.debug('[Pruning] Soft-deleted executions', { count: result.affected }); } /** @@ -187,21 +172,23 @@ export class PruningService { const executionIds = workflowIdsAndExecutionIds.map((o) => o.executionId); if (executionIds.length === 0) { - this.logger.debug('Found no executions to hard-delete (pruning cycle)'); + this.logger.debug('[Pruning] Found no executions to hard-delete'); this.scheduleHardDeletion(); return; } try { - this.logger.debug('Starting hard-deletion of executions (pruning cycle)', { + this.logger.debug('[Pruning] Starting hard-deletion of executions', { executionIds, }); await this.binaryDataService.deleteMany(workflowIdsAndExecutionIds); await this.executionRepository.delete({ id: In(executionIds) }); + + this.logger.debug('[Pruning] Hard-deleted executions', { executionIds }); } catch (error) { - this.logger.error('Failed to hard-delete executions (pruning cycle)', { + this.logger.error('[Pruning] Failed to hard-delete executions', { executionIds, error: error instanceof Error ? error.message : `${error}`, }); diff --git a/packages/cli/src/services/redis/RedisServiceCommands.ts b/packages/cli/src/services/redis/RedisServiceCommands.ts index 6fcc48276a515..4c622e3ac9389 100644 --- a/packages/cli/src/services/redis/RedisServiceCommands.ts +++ b/packages/cli/src/services/redis/RedisServiceCommands.ts @@ -6,7 +6,9 @@ export type RedisServiceCommand = | 'restartEventBus' | 'stopWorker' | 'reloadLicense' - | 'reloadExternalSecretsProviders'; + | 'reloadExternalSecretsProviders' + | 'workflowActiveStateChanged' // multi-main only + | 'workflowFailedToActivate'; // multi-main only /** * An object to be sent via Redis pub/sub from the main process to the workers. @@ -50,6 +52,14 @@ export type RedisServiceWorkerResponseObject = { | { command: 'stopWorker'; } + | { + command: 'workflowActiveStateChanged'; + payload: { + oldState: boolean; + newState: boolean; + workflowId: string; + }; + } ); export type RedisServiceCommandObject = { diff --git a/packages/cli/src/workflows/workflows.services.ts b/packages/cli/src/workflows/workflows.services.ts index 1976adb478159..b41ff747fa001 100644 --- a/packages/cli/src/workflows/workflows.services.ts +++ b/packages/cli/src/workflows/workflows.services.ts @@ -30,6 +30,7 @@ import { isStringArray, isWorkflowIdValid } from '@/utils'; import { WorkflowHistoryService } from './workflowHistory/workflowHistory.service.ee'; import { BinaryDataService } from 'n8n-core'; import { Logger } from '@/Logger'; +import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository'; import { WorkflowTagMappingRepository } from '@db/repositories/workflowTagMapping.repository'; import { ExecutionRepository } from '@db/repositories/execution.repository'; @@ -212,6 +213,8 @@ export class WorkflowsService { ); } + const oldState = shared.workflow.active; + if ( !forceSave && workflow.versionId !== '' && @@ -255,9 +258,14 @@ export class WorkflowsService { await Container.get(ExternalHooks).run('workflow.update', [workflow]); + /** + * If the workflow being updated is stored as `active`, remove it from + * active workflows in memory, and re-add it after the update. + * + * If a trigger or poller in the workflow was updated, the new value + * will take effect only on removing and re-adding. + */ if (shared.workflow.active) { - // When workflow gets saved always remove it as the triggers could have been - // changed and so the changes would not take effect await Container.get(ActiveWorkflowRunner).remove(workflowId); } @@ -364,6 +372,21 @@ export class WorkflowsService { } } + if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) { + const multiMainSetup = Container.get(MultiMainSetup); + + await multiMainSetup.init(); + + if (multiMainSetup.isEnabled) { + await Container.get(MultiMainSetup).broadcastWorkflowActiveStateChanged({ + workflowId, + oldState, + newState: updatedWorkflow.active, + versionId: shared.workflow.versionId, + }); + } + } + return updatedWorkflow; } diff --git a/packages/cli/test/integration/ActiveWorkflowRunner.test.ts b/packages/cli/test/integration/ActiveWorkflowRunner.test.ts index b069a8f839c12..d70f896aa1c7b 100644 --- a/packages/cli/test/integration/ActiveWorkflowRunner.test.ts +++ b/packages/cli/test/integration/ActiveWorkflowRunner.test.ts @@ -17,10 +17,9 @@ import { WorkflowRunner } from '@/WorkflowRunner'; import type { User } from '@db/entities/User'; import type { WebhookEntity } from '@db/entities/WebhookEntity'; import { NodeTypes } from '@/NodeTypes'; -import { MultiMainInstancePublisher } from '@/services/orchestration/main/MultiMainInstance.publisher.ee'; - -import { mockInstance } from '../shared/mocking'; import { chooseRandomly } from './shared/random'; +import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; +import { mockInstance } from '../shared/mocking'; import { setSchedulerAsLoadedNode } from './shared/utils'; import * as testDb from './shared/testDb'; import { createOwner } from './shared/db/users'; @@ -30,9 +29,13 @@ mockInstance(ActiveExecutions); mockInstance(ActiveWorkflows); mockInstance(Push); mockInstance(SecretsHelper); -mockInstance(MultiMainInstancePublisher); const webhookService = mockInstance(WebhookService); +const multiMainSetup = mockInstance(MultiMainSetup, { + isEnabled: false, + isLeader: false, + isFollower: false, +}); setSchedulerAsLoadedNode(); @@ -230,7 +233,7 @@ describe('executeErrorWorkflow()', () => { describe('add()', () => { describe('in single-main scenario', () => { - test('leader should add webhooks, triggers and pollers', async () => { + test('should add webhooks, triggers and pollers', async () => { const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES); const workflow = await createWorkflow({ active: true }, owner); @@ -252,72 +255,84 @@ describe('add()', () => { describe('in multi-main scenario', () => { describe('leader', () => { - test('on regular activation mode, leader should add webhooks only', async () => { - const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES); + describe('on non-leadership-change activation mode', () => { + test('should add webhooks only', async () => { + const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES); - jest.replaceProperty(activeWorkflowRunner, 'isMultiMainScenario', true); + const workflow = await createWorkflow({ active: true }, owner); - mockInstance(MultiMainInstancePublisher, { isLeader: true }); + jest.replaceProperty(multiMainSetup, 'isEnabled', true); + jest.replaceProperty(multiMainSetup, 'isLeader', true); - const workflow = await createWorkflow({ active: true }, owner); + const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks'); + const addTriggersAndPollersSpy = jest.spyOn( + activeWorkflowRunner, + 'addTriggersAndPollers', + ); - const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks'); - const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers'); + await activeWorkflowRunner.init(); + addWebhooksSpy.mockReset(); + addTriggersAndPollersSpy.mockReset(); - await activeWorkflowRunner.init(); - addWebhooksSpy.mockReset(); - addTriggersAndPollersSpy.mockReset(); + await activeWorkflowRunner.add(workflow.id, mode); - await activeWorkflowRunner.add(workflow.id, mode); - - expect(addWebhooksSpy).toHaveBeenCalledTimes(1); - expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1); + expect(addWebhooksSpy).toHaveBeenCalledTimes(1); + expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1); + }); }); - test('on activation via leadership change, leader should add triggers and pollers only', async () => { - const mode = 'leadershipChange'; - - jest.replaceProperty(activeWorkflowRunner, 'isMultiMainScenario', true); + describe('on leadership change activation mode', () => { + test('should add triggers and pollers only', async () => { + const mode = 'leadershipChange'; - mockInstance(MultiMainInstancePublisher, { isLeader: true }); + jest.replaceProperty(multiMainSetup, 'isEnabled', true); + jest.replaceProperty(multiMainSetup, 'isLeader', true); - const workflow = await createWorkflow({ active: true }, owner); + const workflow = await createWorkflow({ active: true }, owner); - const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks'); - const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers'); + const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks'); + const addTriggersAndPollersSpy = jest.spyOn( + activeWorkflowRunner, + 'addTriggersAndPollers', + ); - await activeWorkflowRunner.init(); - addWebhooksSpy.mockReset(); - addTriggersAndPollersSpy.mockReset(); + await activeWorkflowRunner.init(); + addWebhooksSpy.mockReset(); + addTriggersAndPollersSpy.mockReset(); - await activeWorkflowRunner.add(workflow.id, mode); + await activeWorkflowRunner.add(workflow.id, mode); - expect(addWebhooksSpy).not.toHaveBeenCalled(); - expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1); + expect(addWebhooksSpy).not.toHaveBeenCalled(); + expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1); + }); }); }); describe('follower', () => { - test('on regular activation mode, follower should not add webhooks, triggers or pollers', async () => { - const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES); - - jest.replaceProperty(activeWorkflowRunner, 'isMultiMainScenario', true); + describe('on any activation mode', () => { + test('should not add webhooks, triggers or pollers', async () => { + const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES); - mockInstance(MultiMainInstancePublisher, { isLeader: false }); + jest.replaceProperty(multiMainSetup, 'isEnabled', true); + jest.replaceProperty(multiMainSetup, 'isLeader', false); - const workflow = await createWorkflow({ active: true }, owner); + const workflow = await createWorkflow({ active: true }, owner); - const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks'); - const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers'); + const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks'); + const addTriggersAndPollersSpy = jest.spyOn( + activeWorkflowRunner, + 'addTriggersAndPollers', + ); - await activeWorkflowRunner.init(); - addWebhooksSpy.mockReset(); - addTriggersAndPollersSpy.mockReset(); + await activeWorkflowRunner.init(); + addWebhooksSpy.mockReset(); + addTriggersAndPollersSpy.mockReset(); - await activeWorkflowRunner.add(workflow.id, mode); + await activeWorkflowRunner.add(workflow.id, mode); - expect(addWebhooksSpy).not.toHaveBeenCalled(); - expect(addTriggersAndPollersSpy).not.toHaveBeenCalled(); + expect(addWebhooksSpy).not.toHaveBeenCalled(); + expect(addTriggersAndPollersSpy).not.toHaveBeenCalled(); + }); }); }); }); diff --git a/packages/cli/test/integration/commands/start.cmd.test.ts b/packages/cli/test/integration/commands/start.cmd.test.ts deleted file mode 100644 index 1bf80c3254983..0000000000000 --- a/packages/cli/test/integration/commands/start.cmd.test.ts +++ /dev/null @@ -1,55 +0,0 @@ -import * as Config from '@oclif/config'; -import { DataSource } from 'typeorm'; - -import { Start } from '@/commands/start'; -import { BaseCommand } from '@/commands/BaseCommand'; -import config from '@/config'; -import { License } from '@/License'; -import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee'; -import { MultiMainInstancePublisher } from '@/services/orchestration/main/MultiMainInstance.publisher.ee'; -import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; -import { WorkflowHistoryManager } from '@/workflows/workflowHistory/workflowHistoryManager.ee'; -import { RedisService } from '@/services/redis.service'; -import { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher'; -import { RedisServicePubSubSubscriber } from '@/services/redis/RedisServicePubSubSubscriber'; -import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service'; - -import { mockInstance } from '../../shared/mocking'; - -const oclifConfig: Config.IConfig = new Config.Config({ root: __dirname }); - -beforeAll(() => { - mockInstance(DataSource); - mockInstance(ExternalSecretsManager); - mockInstance(ActiveWorkflowRunner); - mockInstance(WorkflowHistoryManager); - mockInstance(RedisService); - mockInstance(RedisServicePubSubPublisher); - mockInstance(RedisServicePubSubSubscriber); - mockInstance(MultiMainInstancePublisher); - mockInstance(OrchestrationHandlerMainService); -}); - -afterEach(() => { - config.load(config.default); - jest.restoreAllMocks(); -}); - -test('should not init license if instance is follower in multi-main scenario', async () => { - config.set('executions.mode', 'queue'); - config.set('endpoints.disableUi', true); - config.set('leaderSelection.enabled', true); - - jest.spyOn(MultiMainInstancePublisher.prototype, 'isFollower', 'get').mockReturnValue(true); - jest.spyOn(BaseCommand.prototype, 'init').mockImplementation(async () => {}); - - const licenseMock = mockInstance(License, { - isMultipleMainInstancesLicensed: jest.fn().mockReturnValue(true), - }); - - const startCmd = new Start([], oclifConfig); - - await startCmd.init(); - - expect(licenseMock.init).not.toHaveBeenCalled(); -}); diff --git a/packages/cli/test/integration/commands/worker.cmd.test.ts b/packages/cli/test/integration/commands/worker.cmd.test.ts index 3b7567faae6e8..3092e80f6eac2 100644 --- a/packages/cli/test/integration/commands/worker.cmd.test.ts +++ b/packages/cli/test/integration/commands/worker.cmd.test.ts @@ -16,6 +16,7 @@ import { PostHogClient } from '@/posthog'; import { RedisService } from '@/services/redis.service'; import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service'; import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service'; +import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; import { mockInstance } from '../../shared/mocking'; @@ -37,6 +38,7 @@ beforeAll(async () => { mockInstance(RedisService); mockInstance(RedisServicePubSubPublisher); mockInstance(RedisServicePubSubSubscriber); + mockInstance(MultiMainSetup); }); test('worker initializes all its components', async () => { diff --git a/packages/cli/test/integration/shared/utils/index.ts b/packages/cli/test/integration/shared/utils/index.ts index 47a10fabc214f..f2b9d4e5e33e3 100644 --- a/packages/cli/test/integration/shared/utils/index.ts +++ b/packages/cli/test/integration/shared/utils/index.ts @@ -16,6 +16,7 @@ import { AUTH_COOKIE_NAME } from '@/constants'; import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials'; import { SettingsRepository } from '@db/repositories/settings.repository'; import { mockNodeTypesData } from '../../../unit/Helpers'; +import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; import { mockInstance } from '../../../shared/mocking'; export { setupTestServer } from './testServer'; @@ -28,6 +29,8 @@ export { setupTestServer } from './testServer'; * Initialize node types. */ export async function initActiveWorkflowRunner() { + mockInstance(MultiMainSetup); + const { ActiveWorkflowRunner } = await import('@/ActiveWorkflowRunner'); const workflowRunner = Container.get(ActiveWorkflowRunner); await workflowRunner.init(); diff --git a/packages/cli/test/integration/workflow.service.test.ts b/packages/cli/test/integration/workflow.service.test.ts new file mode 100644 index 0000000000000..a421ea91961de --- /dev/null +++ b/packages/cli/test/integration/workflow.service.test.ts @@ -0,0 +1,62 @@ +import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; +import * as testDb from './shared/testDb'; +import { WorkflowsService } from '@/workflows/workflows.services'; +import { mockInstance } from '../shared/mocking'; +import { Telemetry } from '@/telemetry'; +import { createOwner } from './shared/db/users'; +import { createWorkflow } from './shared/db/workflows'; + +mockInstance(Telemetry); + +const activeWorkflowRunner = mockInstance(ActiveWorkflowRunner); + +beforeAll(async () => { + await testDb.init(); +}); + +afterEach(async () => { + await testDb.truncate(['Workflow']); + jest.restoreAllMocks(); +}); + +afterAll(async () => { + await testDb.terminate(); +}); + +describe('update()', () => { + test('should remove and re-add to active workflows on `active: true` payload', async () => { + const owner = await createOwner(); + const workflow = await createWorkflow({ active: true }, owner); + + const removeSpy = jest.spyOn(activeWorkflowRunner, 'remove'); + const addSpy = jest.spyOn(activeWorkflowRunner, 'add'); + + await WorkflowsService.update(owner, workflow, workflow.id); + + expect(removeSpy).toHaveBeenCalledTimes(1); + const [removedWorkflowId] = removeSpy.mock.calls[0]; + expect(removedWorkflowId).toBe(workflow.id); + + expect(addSpy).toHaveBeenCalledTimes(1); + const [addedWorkflowId, activationMode] = addSpy.mock.calls[0]; + expect(addedWorkflowId).toBe(workflow.id); + expect(activationMode).toBe('update'); + }); + + test('should remove from active workflows on `active: false` payload', async () => { + const owner = await createOwner(); + const workflow = await createWorkflow({ active: true }, owner); + + const removeSpy = jest.spyOn(activeWorkflowRunner, 'remove'); + const addSpy = jest.spyOn(activeWorkflowRunner, 'add'); + + workflow.active = false; + await WorkflowsService.update(owner, workflow, workflow.id); + + expect(removeSpy).toHaveBeenCalledTimes(1); + const [removedWorkflowId] = removeSpy.mock.calls[0]; + expect(removedWorkflowId).toBe(workflow.id); + + expect(addSpy).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/cli/test/unit/License.test.ts b/packages/cli/test/unit/License.test.ts index e2b5551ff5b8f..7238122380f51 100644 --- a/packages/cli/test/unit/License.test.ts +++ b/packages/cli/test/unit/License.test.ts @@ -6,6 +6,7 @@ import { License } from '@/License'; import { Logger } from '@/Logger'; import { N8N_VERSION } from '@/constants'; import { mockInstance } from '../shared/mocking'; +import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; jest.mock('@n8n_io/license-sdk'); @@ -27,9 +28,10 @@ describe('License', () => { let license: License; const logger = mockInstance(Logger); const instanceSettings = mockInstance(InstanceSettings, { instanceId: MOCK_INSTANCE_ID }); + const multiMainSetup = mockInstance(MultiMainSetup); beforeEach(async () => { - license = new License(logger, instanceSettings, mock(), mock()); + license = new License(logger, instanceSettings, mock(), mock(), mock()); await license.init(); }); @@ -52,7 +54,7 @@ describe('License', () => { }); test('initializes license manager for worker', async () => { - license = new License(logger, instanceSettings, mock(), mock()); + license = new License(logger, instanceSettings, mock(), mock(), mock()); await license.init('worker'); expect(LicenseManager).toHaveBeenCalledWith({ autoRenewEnabled: false, diff --git a/packages/cli/test/unit/services/orchestration.service.test.ts b/packages/cli/test/unit/services/orchestration.service.test.ts index 6cf073f28a136..8a71d6784e4b0 100644 --- a/packages/cli/test/unit/services/orchestration.service.test.ts +++ b/packages/cli/test/unit/services/orchestration.service.test.ts @@ -1,6 +1,6 @@ import Container from 'typedi'; import config from '@/config'; -import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher'; +import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup'; import type { RedisServiceWorkerResponseObject } from '@/services/redis/RedisServiceCommands'; import { eventBus } from '@/eventbus'; import { RedisService } from '@/services/redis.service'; @@ -10,10 +10,13 @@ import { OrchestrationHandlerMainService } from '@/services/orchestration/main/o import * as helpers from '@/services/orchestration/helpers'; import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee'; import { Logger } from '@/Logger'; +import { Push } from '@/push'; +import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; import { mockInstance } from '../../shared/mocking'; -const os = Container.get(SingleMainInstancePublisher); +const os = Container.get(SingleMainSetup); const handler = Container.get(OrchestrationHandlerMainService); +mockInstance(ActiveWorkflowRunner); let queueModeId: string; @@ -33,6 +36,7 @@ const workerRestartEventbusResponse: RedisServiceWorkerResponseObject = { describe('Orchestration Service', () => { const logger = mockInstance(Logger); + mockInstance(Push); beforeAll(async () => { mockInstance(RedisService); mockInstance(ExternalSecretsManager); diff --git a/packages/editor-ui/src/Interface.ts b/packages/editor-ui/src/Interface.ts index 1f641a2c981ab..7f748234143c1 100644 --- a/packages/editor-ui/src/Interface.ts +++ b/packages/editor-ui/src/Interface.ts @@ -419,7 +419,25 @@ export type IPushData = | PushDataRemoveNodeType | PushDataTestWebhook | PushDataExecutionRecovered - | PushDataWorkerStatusMessage; + | PushDataWorkerStatusMessage + | PushDataActiveWorkflowAdded + | PushDataActiveWorkflowRemoved + | PushDataWorkflowFailedToActivate; + +type PushDataActiveWorkflowAdded = { + data: IActiveWorkflowAdded; + type: 'workflowActivated'; +}; + +type PushDataActiveWorkflowRemoved = { + data: IActiveWorkflowRemoved; + type: 'workflowDeactivated'; +}; + +type PushDataWorkflowFailedToActivate = { + data: IWorkflowFailedToActivate; + type: 'workflowFailedToActivate'; +}; type PushDataExecutionRecovered = { data: IPushDataExecutionRecovered; @@ -489,6 +507,19 @@ export interface IPushDataExecutionFinished { retryOf?: string; } +export interface IActiveWorkflowAdded { + workflowId: string; +} + +export interface IActiveWorkflowRemoved { + workflowId: string; +} + +export interface IWorkflowFailedToActivate { + workflowId: string; + errorMessage: string; +} + export interface IPushDataUnsavedExecutionFinished { executionId: string; data: { finished: true; stoppedAt: Date }; diff --git a/packages/editor-ui/src/components/WorkflowActivator.vue b/packages/editor-ui/src/components/WorkflowActivator.vue index b10796a705e2c..65edefbac4f2e 100644 --- a/packages/editor-ui/src/components/WorkflowActivator.vue +++ b/packages/editor-ui/src/components/WorkflowActivator.vue @@ -119,7 +119,7 @@ export default defineComponent({ } else { errorMessage = this.$locale.baseText( 'workflowActivator.showMessage.displayActivationError.message.errorDataNotUndefined', - { interpolate: { message: errorData.error.message } }, + { interpolate: { message: errorData } }, ); } } catch (error) { diff --git a/packages/editor-ui/src/mixins/pushConnection.ts b/packages/editor-ui/src/mixins/pushConnection.ts index 2cde57284c3c7..9b0806d289fac 100644 --- a/packages/editor-ui/src/mixins/pushConnection.ts +++ b/packages/editor-ui/src/mixins/pushConnection.ts @@ -291,6 +291,33 @@ export const pushConnection = defineComponent({ } } + if ( + receivedData.type === 'workflowFailedToActivate' && + this.workflowsStore.workflowId === receivedData.data.workflowId + ) { + this.workflowsStore.setWorkflowInactive(receivedData.data.workflowId); + this.workflowsStore.setActive(false); + + this.showError( + new Error(receivedData.data.errorMessage), + this.$locale.baseText('workflowActivator.showError.title', { + interpolate: { newStateName: 'activated' }, + }) + ':', + ); + + return true; + } + + if (receivedData.type === 'workflowActivated') { + this.workflowsStore.setWorkflowActive(receivedData.data.workflowId); + return true; + } + + if (receivedData.type === 'workflowDeactivated') { + this.workflowsStore.setWorkflowInactive(receivedData.data.workflowId); + return true; + } + if (receivedData.type === 'executionFinished' || receivedData.type === 'executionRecovered') { // The workflow finished executing let pushData: IPushDataExecutionFinished; diff --git a/packages/editor-ui/src/stores/workflows.store.ts b/packages/editor-ui/src/stores/workflows.store.ts index f6588fa30b631..cf089a66c511c 100644 --- a/packages/editor-ui/src/stores/workflows.store.ts +++ b/packages/editor-ui/src/stores/workflows.store.ts @@ -10,7 +10,6 @@ import { } from '@/constants'; import type { ExecutionsQueryFilter, - IActivationError, IExecutionDeleteFilter, IExecutionPushResponse, IExecutionResponse, @@ -364,7 +363,7 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, { }); }, - async getActivationError(id: string): Promise { + async getActivationError(id: string): Promise { const rootStore = useRootStore(); return makeRestApiRequest(rootStore.getRestApiContext, 'GET', `/active/error/${id}`); }, @@ -551,6 +550,9 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, { if (this.workflowsById[workflowId]) { this.workflowsById[workflowId].active = true; } + if (workflowId === this.workflow.id) { + this.setActive(true); + } }, setWorkflowInactive(workflowId: string): void { @@ -561,6 +563,9 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, { if (this.workflowsById[workflowId]) { this.workflowsById[workflowId].active = false; } + if (workflowId === this.workflow.id) { + this.setActive(false); + } }, async fetchActiveWorkflows(): Promise {