Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): Filter out certain executions from crash recovery #9904

Merged
merged 4 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/cli/src/ActiveExecutions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,12 @@ export class ActiveExecutions {
async shutdown(cancelAll = false) {
let executionIds = Object.keys(this.activeExecutions);

if (config.getEnv('executions.mode') === 'regular') {
// removal of active executions will no longer release capacity back,
// so that throttled executions cannot resume during shutdown
this.concurrencyControl.disable();
}
Comment on lines +207 to +211
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍🏽


if (cancelAll) {
if (config.getEnv('executions.mode') === 'regular') {
await this.concurrencyControl.removeAll(this.activeExecutions);
Expand Down
5 changes: 5 additions & 0 deletions packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import { ExecutionService } from '@/executions/execution.service';
import { OwnershipService } from '@/services/ownership.service';
import { WorkflowRunner } from '@/WorkflowRunner';
import { ExecutionRecoveryService } from '@/executions/execution-recovery.service';
import { EventRelay } from '@/eventbus/event-relay.service';

// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires
const open = require('open');
Expand Down Expand Up @@ -375,6 +376,10 @@ export class Start extends BaseCommand {
projectId: project.id,
};

Container.get(EventRelay).emit('execution-started-during-bootup', {
executionId: execution.id,
});

// do not block - each execution either runs concurrently or is queued
void workflowRunner.run(data, undefined, false, execution.id);
}
Expand Down
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only change in this file is adding a dependency.

Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow';
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { IExecutingWorkflowData } from '@/Interfaces';
import type { Telemetry } from '@/telemetry';
import type { EventRelay } from '@/eventbus/event-relay.service';

describe('ConcurrencyControlService', () => {
const logger = mock<Logger>();
const executionRepository = mock<ExecutionRepository>();
const telemetry = mock<Telemetry>();
const eventRelay = mock<EventRelay>();

afterEach(() => {
config.set('executions.concurrency.productionLimit', -1);
Expand All @@ -35,7 +37,12 @@ describe('ConcurrencyControlService', () => {
/**
* Act
*/
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);

/**
* Assert
Expand All @@ -56,7 +63,7 @@ describe('ConcurrencyControlService', () => {
/**
* Act
*/
new ConcurrencyControlService(logger, executionRepository, telemetry);
new ConcurrencyControlService(logger, executionRepository, telemetry, eventRelay);
} catch (error) {
/**
* Assert
Expand All @@ -74,7 +81,12 @@ describe('ConcurrencyControlService', () => {
/**
* Act
*/
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);

/**
* Assert
Expand All @@ -92,7 +104,12 @@ describe('ConcurrencyControlService', () => {
/**
* Act
*/
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);

/**
* Act
Expand All @@ -111,7 +128,12 @@ describe('ConcurrencyControlService', () => {
/**
* Act
*/
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);

/**
* Assert
Expand All @@ -135,7 +157,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', 1);

const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
const enqueueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'enqueue');

/**
Expand All @@ -156,7 +183,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', 1);

const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
const enqueueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'enqueue');

/**
Expand All @@ -180,7 +212,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', 1);

const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
const dequeueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'dequeue');

/**
Expand All @@ -201,7 +238,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', 1);

const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
const dequeueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'dequeue');

/**
Expand All @@ -225,7 +267,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', 1);

const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove');

/**
Expand All @@ -248,7 +295,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', 1);

const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove');

/**
Expand All @@ -271,7 +323,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', 2);

const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);

jest
.spyOn(ConcurrencyQueue.prototype, 'getAll')
Expand Down Expand Up @@ -310,7 +367,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', -1);

const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
const enqueueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'enqueue');

/**
Expand All @@ -333,7 +395,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', -1);

const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
const dequeueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'dequeue');

/**
Expand All @@ -355,7 +422,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', -1);

const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove');

/**
Expand Down Expand Up @@ -385,7 +457,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', CLOUD_TEMP_PRODUCTION_LIMIT);
config.set('deployment.type', 'cloud');
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);

/**
* Act
Expand All @@ -410,7 +487,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', CLOUD_TEMP_PRODUCTION_LIMIT);
config.set('deployment.type', 'cloud');
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);

/**
* Act
Expand All @@ -437,7 +519,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', CLOUD_TEMP_PRODUCTION_LIMIT);
config.set('deployment.type', 'cloud');
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);

/**
* Act
Expand Down
9 changes: 8 additions & 1 deletion packages/cli/src/concurrency/concurrency-control.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import { ExecutionRepository } from '@/databases/repositories/execution.reposito
import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow';
import type { IExecutingWorkflowData } from '@/Interfaces';
import { Telemetry } from '@/telemetry';
import { EventRelay } from '@/eventbus/event-relay.service';

export const CLOUD_TEMP_PRODUCTION_LIMIT = 999;
export const CLOUD_TEMP_REPORTABLE_THRESHOLDS = [5, 10, 20, 50, 100, 200];

@Service()
export class ConcurrencyControlService {
private readonly isEnabled: boolean;
private isEnabled: boolean;

private readonly productionLimit: number;

Expand All @@ -28,6 +29,7 @@ export class ConcurrencyControlService {
private readonly logger: Logger,
private readonly executionRepository: ExecutionRepository,
private readonly telemetry: Telemetry,
private readonly eventRelay: EventRelay,
) {
this.productionLimit = config.getEnv('executions.concurrency.productionLimit');

Expand Down Expand Up @@ -61,6 +63,7 @@ export class ConcurrencyControlService {

this.productionQueue.on('execution-throttled', ({ executionId }: { executionId: string }) => {
this.log('Execution throttled', { executionId });
this.eventRelay.emit('execution-throttled', { executionId });
});

this.productionQueue.on('execution-released', async (executionId: string) => {
Expand Down Expand Up @@ -130,6 +133,10 @@ export class ConcurrencyControlService {
this.logger.info('Canceled enqueued executions with response promises', { executionIds });
}

disable() {
this.isEnabled = false;
}

// ----------------------------------
// private
// ----------------------------------
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/concurrency/concurrency-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export class ConcurrencyQueue extends EventEmitter {
}

getAll() {
return new Set(...this.queue.map((item) => item.executionId));
return new Set(this.queue.map((item) => item.executionId));
}

private resolveNext() {
Expand Down
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copied from existing ones. At some point we need to simplify all this.

Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { AbstractEventMessage, isEventMessageOptionsWithType } from './AbstractEventMessage';
import type { JsonObject } from 'n8n-workflow';
import { EventMessageTypeNames } from 'n8n-workflow';
import type { AbstractEventMessageOptions } from './AbstractEventMessageOptions';
import type { AbstractEventPayload } from './AbstractEventPayload';
import type { EventNamesExecutionType } from '.';

export interface EventPayloadExecution extends AbstractEventPayload {
executionId: string;
}

export interface EventMessageExecutionOptions extends AbstractEventMessageOptions {
eventName: EventNamesExecutionType;

payload?: EventPayloadExecution;
}

export class EventMessageExecution extends AbstractEventMessage {
readonly __type = EventMessageTypeNames.execution;

eventName: EventNamesExecutionType;

payload: EventPayloadExecution;

constructor(options: EventMessageExecutionOptions) {
super(options);
if (options.payload) this.setPayload(options.payload);
if (options.anonymize) {
this.anonymize();
}
}

setPayload(payload: EventPayloadExecution): this {
this.payload = payload;
return this;
}

deserialize(data: JsonObject): this {
if (isEventMessageOptionsWithType(data, this.__type)) {
this.setOptionsOrDefault(data);
if (data.payload) this.setPayload(data.payload as EventPayloadExecution);
}
return this;
}
}
Loading
Loading