Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core): Centralize scaling mode (no-changelog) #9835

Merged
merged 34 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
8c4ba29
Initial setup
ivov Jun 21, 2024
2af8654
Remove outdated `job.id` being `undefined` comments
ivov Jun 21, 2024
c62dd67
Update comment
ivov Jun 21, 2024
e7226d9
Rename arg for accuracy
ivov Jun 21, 2024
5d31ea7
Fix reference
ivov Jun 21, 2024
5874d16
Refactor to scaling mode
ivov Jun 24, 2024
0178ac1
Update breaking changes doc
ivov Jun 24, 2024
f705749
Add `UnsupportedRedisVersionError`
ivov Jun 27, 2024
4a93183
Back to `bull`
ivov Jul 4, 2024
8fac1f9
Merge master, fix conflicts
ivov Jul 4, 2024
40ad161
Fix misresolved conflicts
ivov Jul 4, 2024
8243c3a
Update shutdown
ivov Jul 4, 2024
2e943c4
Update lockfile
ivov Jul 4, 2024
618476b
Extract `JobProcessor`
ivov Jul 5, 2024
cdb9f63
Remove lockfile from diff
ivov Jul 5, 2024
6962cb8
Merge branch 'master' into pay-1658-upgrade-to-bullmq
ivov Jul 9, 2024
5a89544
Move running jobs to `JobProcessor`
ivov Jul 9, 2024
c94b656
Separate listeners
ivov Jul 9, 2024
7060cd4
Use job processor directly in worker
ivov Jul 9, 2024
28ae137
Merge branch 'master' into pay-1658-upgrade-to-bullmq
ivov Jul 9, 2024
57cd9f7
Refactor to `ScalingService`
ivov Jul 10, 2024
a3c7620
Merge branch 'master' into pay-1658-upgrade-to-bullmq
ivov Jul 15, 2024
77c32e4
Rename `Consumer` to `Processor`
ivov Jul 15, 2024
cc18f5c
Fix import
ivov Jul 15, 2024
5c2d77e
Add `executionId` to `Job finished running` log
ivov Jul 15, 2024
362b3dd
Rename `RunningJobProps` to `RunningJob`
ivov Jul 15, 2024
211ab03
Fix stopping tests
ivov Jul 15, 2024
294efb4
Merge master, fix conflicts
ivov Jul 31, 2024
a0be4b6
Merge master, fix conflicts
ivov Aug 5, 2024
7e76a27
Apply feedback
ivov Aug 6, 2024
1225b9c
Better typing
ivov Aug 6, 2024
1792e17
Rename to `JobProcessor`
ivov Aug 6, 2024
83b14e2
Rename labels as well
ivov Aug 6, 2024
30017e6
Merge master, fix conflicts
ivov Aug 7, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions packages/cli/BREAKING-CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@

This list shows all the versions which include breaking changes and how to upgrade.

## 1.??.? @TODO: Update version

### What changed?

n8n upgraded its scaling mode from `bull@4.12.1` to `bullmq@5.8.2`.

### When is action necessary?

If you are using queue mode, before you upgrade, stop all running workflows, wait until all jobs in the queue have completed, stop all main instances, and stop all workers. Then upgrade the main instance and all workers to the new n8n version and restart them. Additionally, ensure you are running Redis version 5 or above, which supports Redis Streams.

## 1.47.0

### What changed?
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@
"axios": "1.6.7",
"basic-auth": "2.0.1",
"bcryptjs": "2.4.3",
"bull": "4.12.1",
"bullmq": "5.8.2",
"cache-manager": "5.2.3",
"callsites": "3.1.0",
"change-case": "4.1.2",
Expand Down
141 changes: 0 additions & 141 deletions packages/cli/src/Queue.ts

This file was deleted.

4 changes: 2 additions & 2 deletions packages/cli/src/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,8 @@ export class Server extends AbstractServer {
setupPushHandler(restEndpoint, app);

if (config.getEnv('executions.mode') === 'queue') {
const { Queue } = await import('@/Queue');
await Container.get(Queue).init();
const { ScalingMode } = await import('@/scaling-mode/scaling-mode');
await Container.get(ScalingMode).setupQueue();
}

await handleMfaDisable();
Expand Down
14 changes: 0 additions & 14 deletions packages/cli/src/WebhookHelpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import type {
IDataObject,
IDeferredPromise,
IExecuteData,
IExecuteResponsePromiseData,
IHttpRequestMethods,
IN8nHttpFullResponse,
INode,
Expand Down Expand Up @@ -192,19 +191,6 @@ export function getWorkflowWebhooks(
return returnData;
}

export function encodeWebhookResponse(
response: IExecuteResponsePromiseData,
): IExecuteResponsePromiseData {
if (typeof response === 'object' && Buffer.isBuffer(response.body)) {
response.body = {
// eslint-disable-next-line @typescript-eslint/naming-convention
'__@N8nEncodedBuffer@__': response.body.toString(BINARY_ENCODING),
};
}

return response;
}

const normalizeFormData = <T>(values: Record<string, T | T[]>) => {
for (const key in values) {
const value = values[key];
Expand Down
31 changes: 17 additions & 14 deletions packages/cli/src/WorkflowRunner.ts
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hopefully we can get rid of all the watchdog functionality

Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import { ExecutionRepository } from '@db/repositories/execution.repository';
import { ExternalHooks } from '@/ExternalHooks';
import type { IExecutionResponse, IWorkflowExecutionDataProcess } from '@/Interfaces';
import { NodeTypes } from '@/NodeTypes';
import type { Job, JobData, JobResponse } from '@/Queue';
import { Queue } from '@/Queue';
import type { Job, JobData, JobResult } from './scaling-mode/types';
import { ScalingMode } from '@/scaling-mode/scaling-mode';
import * as WorkflowHelpers from '@/WorkflowHelpers';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import { generateFailedExecutionFromError } from '@/WorkflowHelpers';
Expand All @@ -41,7 +41,7 @@ import { EventRelay } from './eventbus/event-relay.service';

@Service()
export class WorkflowRunner {
private jobQueue: Queue;
private scalingMode: ScalingMode;

private executionsMode = config.getEnv('executions.mode');

Expand All @@ -56,7 +56,7 @@ export class WorkflowRunner {
private readonly eventRelay: EventRelay,
) {
if (this.executionsMode === 'queue') {
this.jobQueue = Container.get(Queue);
this.scalingMode = Container.get(ScalingMode);
}
}

Expand Down Expand Up @@ -384,9 +384,7 @@ export class WorkflowRunner {
let job: Job;
let hooks: WorkflowHooks;
try {
job = await this.jobQueue.add(jobData, jobOptions);

this.logger.info(`Started with job ID: ${job.id.toString()} (Execution ID: ${executionId})`);
job = await this.scalingMode.addJob(jobData, jobOptions);

hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerMain(
data.executionMode,
Expand Down Expand Up @@ -415,8 +413,7 @@ export class WorkflowRunner {
async (resolve, reject, onCancel) => {
onCancel.shouldReject = false;
onCancel(async () => {
const queue = Container.get(Queue);
await queue.stopJob(job);
await Container.get(ScalingMode).stopJob(job);

// We use "getWorkflowHooksWorkerExecuter" as "getWorkflowHooksWorkerMain" does not contain the
// "workflowExecuteAfter" which we require.
Expand All @@ -433,11 +430,17 @@ export class WorkflowRunner {
reject(error);
});

const jobData: Promise<JobResponse> = job.finished();
/**
* @TODO: Replacing `job.finished()` with `job.waitUntilFinished()` will require `QueueEvents`,
* which might be avoidable complexity. Do we need to preserve `watchDogInterval` in `bullmq`?
*/
// const jobData: Promise<JobResult> = job.finished();
// const jobData = job.waitUntilFinished(this.scalingMode.queueEvents);

const queueRecoveryInterval = config.getEnv('queue.bull.queueRecoveryInterval');

const racingPromises: Array<Promise<JobResponse>> = [jobData];
// const racingPromises: Array<Promise<JobResult>> = [jobData];
const racingPromises: Array<Promise<JobResult>> = [];

let clearWatchdogInterval;
if (queueRecoveryInterval > 0) {
Expand All @@ -455,9 +458,9 @@ export class WorkflowRunner {
************************************************ */
let watchDogInterval: NodeJS.Timeout | undefined;

const watchDog: Promise<JobResponse> = new Promise((res) => {
const watchDog: Promise<JobResult> = new Promise((res) => {
watchDogInterval = setInterval(async () => {
const currentJob = await this.jobQueue.getJob(job.id);
const currentJob = await this.scalingMode.getJob(job.id);
// When null means job is finished (not found in queue)
if (currentJob === null) {
// Mimic worker's success message
Expand All @@ -477,7 +480,7 @@ export class WorkflowRunner {
}

try {
await Promise.race(racingPromises);
// await Promise.race(racingPromises);
if (clearWatchdogInterval !== undefined) {
clearWatchdogInterval();
}
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/commands/webhook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { ApplicationError } from 'n8n-workflow';
import config from '@/config';
import { ActiveExecutions } from '@/ActiveExecutions';
import { WebhookServer } from '@/WebhookServer';
import { Queue } from '@/Queue';
import { ScalingMode } from '@/scaling-mode/scaling-mode';
import { BaseCommand } from './BaseCommand';

import { OrchestrationWebhookService } from '@/services/orchestration/webhook/orchestration.webhook.service';
Expand Down Expand Up @@ -94,7 +94,7 @@ export class Webhook extends BaseCommand {
);
}

await Container.get(Queue).init();
await Container.get(ScalingMode).setupQueue();
await this.server.start();
this.logger.debug(`Webhook listener ID: ${this.server.uniqueInstanceId}`);
this.logger.info('Webhook listener waiting for requests.');
Expand Down
Loading