Skip to content

Commit

Permalink
fix(core): Filter out certain executions from crash recovery (n8n-io#…
Browse files Browse the repository at this point in the history
…9904)

Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
  • Loading branch information
ivov and netroy authored Jul 2, 2024
1 parent 61c20d1 commit 7044d1c
Show file tree
Hide file tree
Showing 12 changed files with 217 additions and 20 deletions.
6 changes: 6 additions & 0 deletions packages/cli/src/ActiveExecutions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,12 @@ export class ActiveExecutions {
async shutdown(cancelAll = false) {
let executionIds = Object.keys(this.activeExecutions);

if (config.getEnv('executions.mode') === 'regular') {
// removal of active executions will no longer release capacity back,
// so that throttled executions cannot resume during shutdown
this.concurrencyControl.disable();
}

if (cancelAll) {
if (config.getEnv('executions.mode') === 'regular') {
await this.concurrencyControl.removeAll(this.activeExecutions);
Expand Down
5 changes: 5 additions & 0 deletions packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import { ExecutionService } from '@/executions/execution.service';
import { OwnershipService } from '@/services/ownership.service';
import { WorkflowRunner } from '@/WorkflowRunner';
import { ExecutionRecoveryService } from '@/executions/execution-recovery.service';
import { EventRelay } from '@/eventbus/event-relay.service';

// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires
const open = require('open');
Expand Down Expand Up @@ -375,6 +376,10 @@ export class Start extends BaseCommand {
projectId: project.id,
};

Container.get(EventRelay).emit('execution-started-during-bootup', {
executionId: execution.id,
});

// do not block - each execution either runs concurrently or is queued
void workflowRunner.run(data, undefined, false, execution.id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow';
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { IExecutingWorkflowData } from '@/Interfaces';
import type { Telemetry } from '@/telemetry';
import type { EventRelay } from '@/eventbus/event-relay.service';

describe('ConcurrencyControlService', () => {
const logger = mock<Logger>();
const executionRepository = mock<ExecutionRepository>();
const telemetry = mock<Telemetry>();
const eventRelay = mock<EventRelay>();

afterEach(() => {
config.set('executions.concurrency.productionLimit', -1);
Expand All @@ -35,7 +37,12 @@ describe('ConcurrencyControlService', () => {
/**
* Act
*/
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);

/**
* Assert
Expand All @@ -56,7 +63,7 @@ describe('ConcurrencyControlService', () => {
/**
* Act
*/
new ConcurrencyControlService(logger, executionRepository, telemetry);
new ConcurrencyControlService(logger, executionRepository, telemetry, eventRelay);
} catch (error) {
/**
* Assert
Expand All @@ -74,7 +81,12 @@ describe('ConcurrencyControlService', () => {
/**
* Act
*/
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);

/**
* Assert
Expand All @@ -92,7 +104,12 @@ describe('ConcurrencyControlService', () => {
/**
* Act
*/
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);

/**
* Act
Expand All @@ -111,7 +128,12 @@ describe('ConcurrencyControlService', () => {
/**
* Act
*/
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);

/**
* Assert
Expand All @@ -135,7 +157,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', 1);

const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
const enqueueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'enqueue');

/**
Expand All @@ -156,7 +183,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', 1);

const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
const enqueueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'enqueue');

/**
Expand All @@ -180,7 +212,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', 1);

const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
const dequeueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'dequeue');

/**
Expand All @@ -201,7 +238,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', 1);

const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
const dequeueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'dequeue');

/**
Expand All @@ -225,7 +267,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', 1);

const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove');

/**
Expand All @@ -248,7 +295,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', 1);

const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove');

/**
Expand All @@ -271,7 +323,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', 2);

const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);

jest
.spyOn(ConcurrencyQueue.prototype, 'getAll')
Expand Down Expand Up @@ -310,7 +367,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', -1);

const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
const enqueueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'enqueue');

/**
Expand All @@ -333,7 +395,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', -1);

const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
const dequeueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'dequeue');

/**
Expand All @@ -355,7 +422,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', -1);

const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove');

/**
Expand Down Expand Up @@ -385,7 +457,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', CLOUD_TEMP_PRODUCTION_LIMIT);
config.set('deployment.type', 'cloud');
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);

/**
* Act
Expand All @@ -410,7 +487,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', CLOUD_TEMP_PRODUCTION_LIMIT);
config.set('deployment.type', 'cloud');
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);

/**
* Act
Expand All @@ -437,7 +519,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', CLOUD_TEMP_PRODUCTION_LIMIT);
config.set('deployment.type', 'cloud');
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);

/**
* Act
Expand Down
9 changes: 8 additions & 1 deletion packages/cli/src/concurrency/concurrency-control.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import { ExecutionRepository } from '@/databases/repositories/execution.reposito
import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow';
import type { IExecutingWorkflowData } from '@/Interfaces';
import { Telemetry } from '@/telemetry';
import { EventRelay } from '@/eventbus/event-relay.service';

export const CLOUD_TEMP_PRODUCTION_LIMIT = 999;
export const CLOUD_TEMP_REPORTABLE_THRESHOLDS = [5, 10, 20, 50, 100, 200];

@Service()
export class ConcurrencyControlService {
private readonly isEnabled: boolean;
private isEnabled: boolean;

private readonly productionLimit: number;

Expand All @@ -28,6 +29,7 @@ export class ConcurrencyControlService {
private readonly logger: Logger,
private readonly executionRepository: ExecutionRepository,
private readonly telemetry: Telemetry,
private readonly eventRelay: EventRelay,
) {
this.productionLimit = config.getEnv('executions.concurrency.productionLimit');

Expand Down Expand Up @@ -61,6 +63,7 @@ export class ConcurrencyControlService {

this.productionQueue.on('execution-throttled', ({ executionId }: { executionId: string }) => {
this.log('Execution throttled', { executionId });
this.eventRelay.emit('execution-throttled', { executionId });
});

this.productionQueue.on('execution-released', async (executionId: string) => {
Expand Down Expand Up @@ -130,6 +133,10 @@ export class ConcurrencyControlService {
this.logger.info('Canceled enqueued executions with response promises', { executionIds });
}

disable() {
this.isEnabled = false;
}

// ----------------------------------
// private
// ----------------------------------
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/concurrency/concurrency-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export class ConcurrencyQueue extends EventEmitter {
}

getAll() {
return new Set(...this.queue.map((item) => item.executionId));
return new Set(this.queue.map((item) => item.executionId));
}

private resolveNext() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { AbstractEventMessage, isEventMessageOptionsWithType } from './AbstractEventMessage';
import type { JsonObject } from 'n8n-workflow';
import { EventMessageTypeNames } from 'n8n-workflow';
import type { AbstractEventMessageOptions } from './AbstractEventMessageOptions';
import type { AbstractEventPayload } from './AbstractEventPayload';
import type { EventNamesExecutionType } from '.';

export interface EventPayloadExecution extends AbstractEventPayload {
executionId: string;
}

export interface EventMessageExecutionOptions extends AbstractEventMessageOptions {
eventName: EventNamesExecutionType;

payload?: EventPayloadExecution;
}

export class EventMessageExecution extends AbstractEventMessage {
readonly __type = EventMessageTypeNames.execution;

eventName: EventNamesExecutionType;

payload: EventPayloadExecution;

constructor(options: EventMessageExecutionOptions) {
super(options);
if (options.payload) this.setPayload(options.payload);
if (options.anonymize) {
this.anonymize();
}
}

setPayload(payload: EventPayloadExecution): this {
this.payload = payload;
return this;
}

deserialize(data: JsonObject): this {
if (isEventMessageOptionsWithType(data, this.__type)) {
this.setOptionsOrDefault(data);
if (data.payload) this.setPayload(data.payload as EventPayloadExecution);
}
return this;
}
}
Loading

0 comments on commit 7044d1c

Please sign in to comment.