Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): Coordinate manual workflow activation and deactivation in multi-main scenario #7643

Merged
merged 65 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
2af1a85
Handle workflow activation if leader
ivov Nov 7, 2023
80983ee
Centralize into `MultiMainSetup`
ivov Nov 7, 2023
cd1a6d4
Broadcast workflow update event
ivov Nov 7, 2023
b6dcaa4
Fix license test
ivov Nov 7, 2023
783df6a
Merge master
ivov Nov 7, 2023
8dd24d3
Fix pruning test
ivov Nov 7, 2023
a484907
Add missing mocks
ivov Nov 7, 2023
dda9007
Clean up tests
ivov Nov 8, 2023
9a75e07
Merge master
ivov Nov 8, 2023
2a27ce2
Inform FE of activation and deactivation
ivov Nov 8, 2023
5b0dc90
Fix orchestration test
ivov Nov 8, 2023
c7b6b24
Merge master, fix conflicts
ivov Nov 8, 2023
248935d
Fix some tests
ivov Nov 8, 2023
7d37083
Simplify init in `BaseCommand`
ivov Nov 8, 2023
467e3ed
Remove leftover logs
ivov Nov 8, 2023
64d3d3b
Remove unneeded handlers
ivov Nov 8, 2023
4829b0d
Fix webhooks tests
ivov Nov 8, 2023
b1adae5
Refactor pruning in multi-main scenario
ivov Nov 8, 2023
08dbd28
Fix `Start` command test
ivov Nov 8, 2023
735d30d
Merge master
ivov Nov 9, 2023
44bf302
Fix tests finally
ivov Nov 9, 2023
91cad72
Add more missing mocks
ivov Nov 9, 2023
d68d20c
Break dep cycle
ivov Nov 9, 2023
2d064e2
Fix `webhooks.api.test.ts`
ivov Nov 9, 2023
086e522
Fix relabeling on destructuring
ivov Nov 9, 2023
7424434
Refactor to use `workflowActiveStateChanged`
ivov Nov 9, 2023
0484402
Forgotten removal
ivov Nov 9, 2023
5ba2d2e
Coordinate activation errors
ivov Nov 9, 2023
59386f0
Fix tests
ivov Nov 9, 2023
10af7f6
Refactor to global config
ivov Nov 9, 2023
d5e6679
Remove unneeded leader check
ivov Nov 10, 2023
0b72bc1
Merge master, fix conflicts
ivov Nov 11, 2023
c4f5ee6
Fix misresolved conflict
ivov Nov 11, 2023
38754c7
Prefix context for pruning log messages
ivov Nov 11, 2023
7750ede
Stop pruning if follower
ivov Nov 11, 2023
6cf5904
Implement activation errors in Redis
ivov Nov 13, 2023
b1b8f18
Fix lint
ivov Nov 13, 2023
ea9eb70
Fix import
ivov Nov 13, 2023
0d7728c
Remove TODOs
ivov Nov 13, 2023
103e639
Remove unused method and type
ivov Nov 13, 2023
b9a6948
Better naming
ivov Nov 13, 2023
e05083b
Call `toCacheKey` only if needed
ivov Nov 13, 2023
b82e2cc
Fix lint
ivov Nov 13, 2023
7dd7416
Add guard for init
ivov Nov 13, 2023
2355251
Better naming
ivov Nov 13, 2023
98d22cd
Rename service file
ivov Nov 13, 2023
cca423b
Simplify error creation
ivov Nov 13, 2023
6a1064b
Add more guards
ivov Nov 13, 2023
3a494d9
Cleanup
ivov Nov 13, 2023
aade6a7
Tighten guard
ivov Nov 13, 2023
9c44e0b
Add clarifying comment
ivov Nov 13, 2023
55a33bf
More cleanup
ivov Nov 13, 2023
f9b2972
Better naming
ivov Nov 13, 2023
f060297
Add defensive `init`
ivov Nov 13, 2023
90337b6
Add guard to `WorkflowService`
ivov Nov 13, 2023
5c7d2a9
Cleanup
ivov Nov 13, 2023
4a0071d
Replace `ActivationError` with string
ivov Nov 15, 2023
0580647
Add missing check
ivov Nov 15, 2023
88c8952
Fix lint
ivov Nov 15, 2023
7435dd1
Ensure also followers init license
ivov Nov 15, 2023
401c02e
Consolidate activation errors into single object
ivov Nov 15, 2023
2d09363
Remove no longer relevant test
ivov Nov 15, 2023
31c9bae
Add telemetry
ivov Nov 16, 2023
c23f7e8
fix(core): Account for activation error caused by follower `main` ins…
ivov Nov 16, 2023
a3147a9
Address feedback
ivov Nov 17, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions packages/cli/src/ActivationErrors.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { Service } from 'typedi';
import { CacheService } from './services/cache.service';
import { jsonParse } from 'n8n-workflow';

type ActivationErrors = {
[workflowId: string]: string; // error message
};

@Service()
export class ActivationErrorsService {
private readonly cacheKey = 'workflow-activation-errors';

constructor(private readonly cacheService: CacheService) {}

async set(workflowId: string, errorMessage: string) {
const errors = await this.getAll();

errors[workflowId] = errorMessage;

await this.cacheService.set(this.cacheKey, JSON.stringify(errors));
ivov marked this conversation as resolved.
Show resolved Hide resolved
}

async unset(workflowId: string) {
const errors = await this.getAll();

if (Object.keys(errors).length === 0) return;

delete errors[workflowId];

await this.cacheService.set(this.cacheKey, JSON.stringify(errors));
}

async get(workflowId: string) {
const errors = await this.getAll();

if (Object.keys(errors).length === 0) return null;

return errors[workflowId];
}

async getAll() {
const errors = await this.cacheService.get<string>(this.cacheKey);

if (!errors) return {};

return jsonParse<ActivationErrors>(errors);
}

async clearAll() {
await this.cacheService.delete(this.cacheKey);
}
}
100 changes: 36 additions & 64 deletions packages/cli/src/ActiveWorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */

import Container, { Service } from 'typedi';
import { Service } from 'typedi';
import { ActiveWorkflows, NodeExecuteFunctions } from 'n8n-core';
import config from '@/config';

import type {
ExecutionError,
Expand Down Expand Up @@ -64,8 +65,8 @@ import { WebhookService } from './services/webhook.service';
import { Logger } from './Logger';
import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository';
import { WorkflowRepository } from '@db/repositories/workflow.repository';
import config from '@/config';
import type { MultiMainInstancePublisher } from './services/orchestration/main/MultiMainInstance.publisher.ee';
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
import { ActivationErrorsService } from '@/ActivationErrors.service';

const WEBHOOK_PROD_UNREGISTERED_HINT =
"The workflow must be active for a production URL to run successfully. You can activate the workflow using the toggle in the top-right of the editor. Note that unlike test URL calls, production URL calls aren't shown on the canvas (only in the executions list)";
Expand All @@ -74,15 +75,6 @@ const WEBHOOK_PROD_UNREGISTERED_HINT =
export class ActiveWorkflowRunner implements IWebhookManager {
activeWorkflows = new ActiveWorkflows();

private activationErrors: {
[workflowId: string]: {
time: number; // ms
error: {
message: string;
};
};
} = {};

private queuedActivations: {
[workflowId: string]: {
activationMode: WorkflowActivateMode;
Expand All @@ -92,11 +84,6 @@ export class ActiveWorkflowRunner implements IWebhookManager {
};
} = {};

isMultiMainScenario =
config.getEnv('executions.mode') === 'queue' && config.getEnv('leaderSelection.enabled');

multiMainInstancePublisher: MultiMainInstancePublisher | undefined;

constructor(
private readonly logger: Logger,
private readonly activeExecutions: ActiveExecutions,
Expand All @@ -105,17 +92,13 @@ export class ActiveWorkflowRunner implements IWebhookManager {
private readonly webhookService: WebhookService,
private readonly workflowRepository: WorkflowRepository,
private readonly sharedWorkflowRepository: SharedWorkflowRepository,
private readonly multiMainSetup: MultiMainSetup,
private readonly activationErrorsService: ActivationErrorsService,
) {}

async init() {
if (this.isMultiMainScenario) {
const { MultiMainInstancePublisher } = await import(
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
);

this.multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);

await this.multiMainInstancePublisher.init();
if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) {
await this.multiMainSetup.init();
krynble marked this conversation as resolved.
Show resolved Hide resolved
}

await this.addActiveWorkflows('init');
Expand Down Expand Up @@ -272,6 +255,8 @@ export class ActiveWorkflowRunner implements IWebhookManager {
async allActiveInStorage(user?: User) {
const isFullAccess = !user || user.globalRole.name === 'owner';

const activationErrors = await this.activationErrorsService.getAll();

if (isFullAccess) {
const activeWorkflows = await this.workflowRepository.find({
select: ['id'],
Expand All @@ -280,7 +265,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {

return activeWorkflows
.map((workflow) => workflow.id)
.filter((workflowId) => !this.activationErrors[workflowId]);
.filter((workflowId) => !activationErrors[workflowId]);
}

const where = whereClause({
Expand All @@ -304,7 +289,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {

return sharings
.map((sharing) => sharing.workflowId)
.filter((workflowId) => !this.activationErrors[workflowId]);
.filter((workflowId) => !activationErrors[workflowId]);
}

/**
Expand All @@ -325,8 +310,8 @@ export class ActiveWorkflowRunner implements IWebhookManager {
/**
* Return error if there was a problem activating the workflow
*/
getActivationError(workflowId: string) {
return this.activationErrors[workflowId];
async getActivationError(workflowId: string) {
return this.activationErrorsService.get(workflowId);
}

/**
Expand Down Expand Up @@ -612,12 +597,8 @@ export class ActiveWorkflowRunner implements IWebhookManager {
// Remove the workflow as "active"

void this.activeWorkflows.remove(workflowData.id);
this.activationErrors[workflowData.id] = {
time: new Date().getTime(),
error: {
message: error.message,
},
};

void this.activationErrorsService.set(workflowData.id, error.message);

// Run Error Workflow if defined
const activationError = new WorkflowActivationError(
Expand Down Expand Up @@ -709,15 +690,15 @@ export class ActiveWorkflowRunner implements IWebhookManager {
this.logger.verbose('Finished activating workflows (startup)');
}

async addAllTriggerAndPollerBasedWorkflows() {
this.logger.debug('[Leadership change] Adding all trigger- and poller-based workflows...');
async clearAllActivationErrors() {
await this.activationErrorsService.clearAll();
}

async addAllTriggerAndPollerBasedWorkflows() {
await this.addActiveWorkflows('leadershipChange');
}

async removeAllTriggerAndPollerBasedWorkflows() {
this.logger.debug('[Leadership change] Removing all trigger- and poller-based workflows...');

await this.activeWorkflows.removeAllTriggerAndPollerBasedWorkflows();
}

Expand Down Expand Up @@ -750,12 +731,12 @@ export class ActiveWorkflowRunner implements IWebhookManager {
let shouldAddWebhooks = true;
let shouldAddTriggersAndPollers = true;

if (this.isMultiMainScenario && activationMode !== 'leadershipChange') {
shouldAddWebhooks = this.multiMainInstancePublisher?.isLeader ?? false;
shouldAddTriggersAndPollers = this.multiMainInstancePublisher?.isLeader ?? false;
if (this.multiMainSetup.isEnabled && activationMode !== 'leadershipChange') {
shouldAddWebhooks = this.multiMainSetup.isLeader;
shouldAddTriggersAndPollers = this.multiMainSetup.isLeader;
}

if (this.isMultiMainScenario && activationMode === 'leadershipChange') {
if (this.multiMainSetup.isEnabled && activationMode === 'leadershipChange') {
shouldAddWebhooks = false;
shouldAddTriggersAndPollers = true;
}
Expand Down Expand Up @@ -795,17 +776,13 @@ export class ActiveWorkflowRunner implements IWebhookManager {
const additionalData = await WorkflowExecuteAdditionalData.getBase(sharing.user.id);

if (shouldAddWebhooks) {
this.logger.debug('============');
this.logger.debug(`Adding webhooks for workflow "${dbWorkflow.display()}"`);
this.logger.debug('============');
this.logger.debug(`Adding webhooks for workflow ${dbWorkflow.display()}`);

await this.addWebhooks(workflow, additionalData, 'trigger', activationMode);
}

if (shouldAddTriggersAndPollers) {
this.logger.debug('============');
this.logger.debug(`Adding triggers and pollers for workflow "${dbWorkflow.display()}"`);
this.logger.debug('============');
this.logger.debug(`Adding triggers and pollers for workflow ${dbWorkflow.display()}`);

await this.addTriggersAndPollers(dbWorkflow, workflow, {
activationMode,
Expand All @@ -817,21 +794,15 @@ export class ActiveWorkflowRunner implements IWebhookManager {
// Workflow got now successfully activated so make sure nothing is left in the queue
this.removeQueuedWorkflowActivation(workflowId);

if (this.activationErrors[workflowId]) {
delete this.activationErrors[workflowId];
}
await this.activationErrorsService.unset(workflowId);

const triggerCount = this.countTriggers(workflow, additionalData);
await WorkflowsService.updateWorkflowTriggerCount(workflow.id, triggerCount);
} catch (error) {
this.activationErrors[workflowId] = {
time: new Date().getTime(),
error: {
message: error.message,
},
};
} catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`);
await this.activationErrorsService.set(workflowId, error.message);

throw error;
throw e;
}

// If for example webhooks get created it sometimes has to save the
Expand Down Expand Up @@ -950,10 +921,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
);
}

if (this.activationErrors[workflowId] !== undefined) {
// If there were any activation errors delete them
delete this.activationErrors[workflowId];
}
await this.activationErrorsService.unset(workflowId);

if (this.queuedActivations[workflowId] !== undefined) {
this.removeQueuedWorkflowActivation(workflowId);
Expand Down Expand Up @@ -1016,4 +984,8 @@ export class ActiveWorkflowRunner implements IWebhookManager {
});
}
}

async removeActivationError(workflowId: string) {
await this.activationErrorsService.unset(workflowId);
}
}
4 changes: 2 additions & 2 deletions packages/cli/src/ExternalSecrets/ExternalSecretsManager.ee.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import {
import { License } from '@/License';
import { InternalHooks } from '@/InternalHooks';
import { ExternalSecretsProviders } from './ExternalSecretsProviders.ee';
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';

@Service()
export class ExternalSecretsManager {
Expand Down Expand Up @@ -82,7 +82,7 @@ export class ExternalSecretsManager {
}

async broadcastReloadExternalSecretsProviders() {
await Container.get(SingleMainInstancePublisher).broadcastReloadExternalSecretsProviders();
await Container.get(SingleMainSetup).broadcastReloadExternalSecretsProviders();
}

private decryptSecretsSettings(value: string): ExternalSecretsSettings {
Expand Down
34 changes: 33 additions & 1 deletion packages/cli/src/Interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ export interface IDiagnosticInfo {
ldap_allowed: boolean;
saml_enabled: boolean;
binary_data_s3: boolean;
multi_main_setup_enabled: boolean;
licensePlanName?: string;
licenseTenantId?: number;
}
Expand Down Expand Up @@ -468,7 +469,25 @@ export type IPushData =
| PushDataNodeDescriptionUpdated
| PushDataExecutionRecovered
| PushDataActiveWorkflowUsersChanged
| PushDataWorkerStatusMessage;
| PushDataWorkerStatusMessage
| PushDataWorkflowActivated
| PushDataWorkflowDeactivated
| PushDataWorkflowFailedToActivate;

type PushDataWorkflowFailedToActivate = {
data: IWorkflowFailedToActivate;
type: 'workflowFailedToActivate';
};

type PushDataWorkflowActivated = {
data: IActiveWorkflowChanged;
type: 'workflowActivated';
};

type PushDataWorkflowDeactivated = {
data: IActiveWorkflowChanged;
type: 'workflowDeactivated';
};

type PushDataActiveWorkflowUsersChanged = {
data: IActiveWorkflowUsersChanged;
Expand Down Expand Up @@ -535,11 +554,24 @@ export interface IActiveWorkflowUser {
lastSeen: Date;
}

export interface IActiveWorkflowAdded {
workflowId: Workflow['id'];
}

export interface IActiveWorkflowUsersChanged {
workflowId: Workflow['id'];
activeUsers: IActiveWorkflowUser[];
}

interface IActiveWorkflowChanged {
workflowId: Workflow['id'];
}

interface IWorkflowFailedToActivate {
workflowId: Workflow['id'];
errorMessage: string;
}

export interface IPushDataExecutionRecovered {
executionId: string;
}
Expand Down
Loading
Loading