From b6bf6d0285c0410efadee30d5104a600606aa1ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Fri, 18 Oct 2024 13:34:54 +0200 Subject: [PATCH] fix(core): Prevent workers from recovering executions on startup Context: https://n8nio.slack.com/archives/C069HS026UF/p1729251084171939 --- .../message-event-bus/message-event-bus.ts | 2 +- .../execution-recovery.service.test.ts | 25 +++++++++++++++++++ .../executions/execution-recovery.service.ts | 4 +-- 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts b/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts index 3cf5a5a5d0572..a7deaa2286d1e 100644 --- a/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts +++ b/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts @@ -144,7 +144,7 @@ export class MessageEventBus extends EventEmitter { // if we are in queue mode, running jobs may still be running on a worker despite the main process // crashing, so we can't just mark them as crashed - if (config.get('executions.mode') !== 'queue') { + if (config.get('executions.mode') === 'regular') { const dbUnfinishedExecutionIds = ( await this.executionRepository.find({ where: { diff --git a/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts b/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts index de7c27a8e8341..10d4bcf328864 100644 --- a/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts +++ b/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts @@ -54,6 +54,31 @@ describe('ExecutionRecoveryService', () => { }); describe('recoverFromLogs', () => { + describe('if not main', () => { + test('should do nothing', async () => { + const executionRecoveryService = new ExecutionRecoveryService( + mock(), + mock({ instanceType: 'worker' }), + push, + executionRepository, + mock(), + ); + // @ts-expect-error Private method + const amendSpy = jest.spyOn(executionRecoveryService, 'amend'); + const messages = setupMessages('123', 'Some workflow'); + + /** + * Act + */ + await executionRecoveryService.recoverFromLogs('123', messages); + + /** + * Assert + */ + expect(amendSpy).not.toHaveBeenCalled(); + }); + }); + describe('if follower', () => { test('should do nothing', async () => { /** diff --git a/packages/cli/src/executions/execution-recovery.service.ts b/packages/cli/src/executions/execution-recovery.service.ts index 33576d1368d0a..403f32a78cee2 100644 --- a/packages/cli/src/executions/execution-recovery.service.ts +++ b/packages/cli/src/executions/execution-recovery.service.ts @@ -12,7 +12,7 @@ import { EventService } from '@/events/event.service'; import type { IExecutionResponse } from '@/interfaces'; import { Logger } from '@/logging/logger.service'; import { Push } from '@/push'; -import { getWorkflowHooksMain } from '@/workflow-execute-additional-data'; // @TODO: Dependency cycle +import { getWorkflowHooksMain } from '@/workflow-execute-additional-data'; import type { EventMessageTypes } from '../eventbus/event-message-classes'; @@ -33,7 +33,7 @@ export class ExecutionRecoveryService { * Recover key properties of a truncated execution using event logs. */ async recoverFromLogs(executionId: string, messages: EventMessageTypes[]) { - if (this.instanceSettings.isFollower) return; + if (this.instanceSettings.instanceType !== 'main' || this.instanceSettings.isFollower) return; const amendedExecution = await this.amend(executionId, messages);