Skip to content

Commit

Permalink
fix(core): Start WaitTracker only in the main container (n8n-io#9600)
Browse files Browse the repository at this point in the history
  • Loading branch information
netroy authored Jun 3, 2024
1 parent d6db8cb commit 08d9c9a
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 45 deletions.
10 changes: 6 additions & 4 deletions packages/cli/src/WaitTracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ export class WaitTracker {
private readonly executionRepository: ExecutionRepository,
private readonly ownershipService: OwnershipService,
private readonly workflowRunner: WorkflowRunner,
readonly orchestrationService: OrchestrationService,
) {
const { isSingleMainSetup, isLeader, multiMainSetup } = orchestrationService;
private readonly orchestrationService: OrchestrationService,
) {}

init() {
const { isSingleMainSetup, isLeader, multiMainSetup } = this.orchestrationService;

if (isSingleMainSetup) {
this.startTracking();
Expand All @@ -43,7 +45,7 @@ export class WaitTracker {
.on('leader-stepdown', () => this.stopTracking());
}

startTracking() {
private startTracking() {
this.logger.debug('Wait tracker started tracking waiting executions');

// Poll every 60 seconds a list of upcoming executions
Expand Down
2 changes: 2 additions & 0 deletions packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ export class Start extends BaseCommand {

await this.initOrchestration();
this.logger.debug('Orchestration init complete');
Container.get(WaitTracker).init();
this.logger.debug('Wait tracker init complete');
await this.initBinaryDataService();
this.logger.debug('Binary data service init complete');
await this.initExternalHooks();
Expand Down
69 changes: 28 additions & 41 deletions packages/cli/test/unit/WaitTracker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,53 +2,58 @@ import { WaitTracker } from '@/WaitTracker';
import { mock } from 'jest-mock-extended';
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { IExecutionResponse } from '@/Interfaces';
import type { OrchestrationService } from '@/services/orchestration.service';
import { OrchestrationService } from '@/services/orchestration.service';
import type { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';

jest.useFakeTimers();

describe('WaitTracker', () => {
const executionRepository = mock<ExecutionRepository>();
const orchestrationService = mock<OrchestrationService>({
isSingleMainSetup: true,
});
const multiMainSetup = mock<MultiMainSetup>();
const orchestrationService = new OrchestrationService(mock(), mock(), multiMainSetup);

const execution = mock<IExecutionResponse>({
id: '123',
waitTill: new Date(Date.now() + 1000),
});

let waitTracker: WaitTracker;
beforeEach(() => {
waitTracker = new WaitTracker(
mock(),
executionRepository,
mock(),
mock(),
orchestrationService,
);
multiMainSetup.on.mockReturnThis();
});

afterEach(() => {
jest.clearAllMocks();
});

describe('constructor()', () => {
describe('init()', () => {
it('should query DB for waiting executions', async () => {
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);

new WaitTracker(mock(), executionRepository, mock(), mock(), orchestrationService);
waitTracker.init();

expect(executionRepository.getWaitingExecutions).toHaveBeenCalledTimes(1);
});

it('if no executions to start, should do nothing', () => {
executionRepository.getWaitingExecutions.mockResolvedValue([]);

new WaitTracker(mock(), executionRepository, mock(), mock(), orchestrationService);
waitTracker.init();

expect(executionRepository.findSingleExecution).not.toHaveBeenCalled();
});

describe('if execution to start', () => {
it('if not enough time passed, should not start execution', async () => {
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
const waitTracker = new WaitTracker(
mock(),
executionRepository,
mock(),
mock(),
orchestrationService,
);
waitTracker.init();

executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
await waitTracker.getWaitingExecutions();
Expand All @@ -62,13 +67,7 @@ describe('WaitTracker', () => {

it('if enough time passed, should start execution', async () => {
executionRepository.getWaitingExecutions.mockResolvedValue([]);
const waitTracker = new WaitTracker(
mock(),
executionRepository,
mock(),
mock(),
orchestrationService,
);
waitTracker.init();

executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
await waitTracker.getWaitingExecutions();
Expand All @@ -85,13 +84,7 @@ describe('WaitTracker', () => {
describe('startExecution()', () => {
it('should query for execution to start', async () => {
executionRepository.getWaitingExecutions.mockResolvedValue([]);
const waitTracker = new WaitTracker(
mock(),
executionRepository,
mock(),
mock(),
orchestrationService,
);
waitTracker.init();

executionRepository.findSingleExecution.mockResolvedValue(execution);
waitTracker.startExecution(execution.id);
Expand All @@ -108,37 +101,31 @@ describe('WaitTracker', () => {
it('should start tracking', () => {
executionRepository.getWaitingExecutions.mockResolvedValue([]);

new WaitTracker(mock(), executionRepository, mock(), mock(), orchestrationService);
waitTracker.init();

expect(executionRepository.getWaitingExecutions).toHaveBeenCalledTimes(1);
});
});

describe('multi-main setup', () => {
it('should start tracking if leader', () => {
const orchestrationService = mock<OrchestrationService>({
isLeader: true,
isSingleMainSetup: false,
multiMainSetup: mock<MultiMainSetup>({ on: jest.fn().mockReturnThis() }),
});
jest.spyOn(orchestrationService, 'isLeader', 'get').mockReturnValue(true);
jest.spyOn(orchestrationService, 'isSingleMainSetup', 'get').mockReturnValue(false);

executionRepository.getWaitingExecutions.mockResolvedValue([]);

new WaitTracker(mock(), executionRepository, mock(), mock(), orchestrationService);
waitTracker.init();

expect(executionRepository.getWaitingExecutions).toHaveBeenCalledTimes(1);
});

it('should not start tracking if follower', () => {
const orchestrationService = mock<OrchestrationService>({
isLeader: false,
isSingleMainSetup: false,
multiMainSetup: mock<MultiMainSetup>({ on: jest.fn().mockReturnThis() }),
});
jest.spyOn(orchestrationService, 'isLeader', 'get').mockReturnValue(false);
jest.spyOn(orchestrationService, 'isSingleMainSetup', 'get').mockReturnValue(false);

executionRepository.getWaitingExecutions.mockResolvedValue([]);

new WaitTracker(mock(), executionRepository, mock(), mock(), orchestrationService);
waitTracker.init();

expect(executionRepository.getWaitingExecutions).not.toHaveBeenCalled();
});
Expand Down

0 comments on commit 08d9c9a

Please sign in to comment.