Skip to content

Commit

Permalink
fix(core): Disconnect Redis after pausing queue during worker shutdown (
Browse files Browse the repository at this point in the history
n8n-io#9928)

Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
  • Loading branch information
ivov and netroy authored Jul 4, 2024
1 parent e5c3247 commit c82579b
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 15 deletions.
10 changes: 5 additions & 5 deletions packages/cli/src/Queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
} from 'n8n-workflow';
import { ActiveExecutions } from '@/ActiveExecutions';
import config from '@/config';
import { HIGHEST_PRIORITY, OnShutdown } from './decorators/OnShutdown';

export type JobId = Bull.JobId;
export type Job = Bull.Job<JobData>;
Expand Down Expand Up @@ -108,11 +109,10 @@ export class Queue {
return await this.jobQueue.client.ping();
}

async pause({
isLocal,
doNotWaitActive,
}: { isLocal?: boolean; doNotWaitActive?: boolean } = {}): Promise<void> {
return await this.jobQueue.pause(isLocal, doNotWaitActive);
@OnShutdown(HIGHEST_PRIORITY)
// Stop accepting new jobs, `doNotWaitActive` allows reporting progress
async pause(): Promise<void> {
return await this.jobQueue?.pause(true, true);
}

getBullObjectInstance(): JobQueue {
Expand Down
3 changes: 0 additions & 3 deletions packages/cli/src/commands/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@ export class Worker extends BaseCommand {
async stopProcess() {
this.logger.info('Stopping n8n...');

// Stop accepting new jobs, `doNotWaitActive` allows reporting progress
await Worker.jobQueue.pause({ isLocal: true, doNotWaitActive: true });

try {
await this.externalHooks?.run('n8n.stop', []);

Expand Down
6 changes: 5 additions & 1 deletion packages/cli/src/decorators/OnShutdown.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ import { Container } from 'typedi';
import { ApplicationError } from 'n8n-workflow';
import { type ServiceClass, ShutdownService } from '@/shutdown/Shutdown.service';

export const LOWEST_PRIORITY = 0;
export const DEFAULT_PRIORITY = 100;
export const HIGHEST_PRIORITY = 200;

/**
* Decorator that registers a method as a shutdown hook. The method will
* be called when the application is shutting down.
Expand All @@ -22,7 +26,7 @@ import { type ServiceClass, ShutdownService } from '@/shutdown/Shutdown.service'
* ```
*/
export const OnShutdown =
(priority = 100): MethodDecorator =>
(priority = DEFAULT_PRIORITY): MethodDecorator =>
(prototype, propertyKey, descriptor) => {
const serviceClass = prototype.constructor as ServiceClass;
const methodName = String(propertyKey);
Expand Down
4 changes: 0 additions & 4 deletions packages/cli/src/services/redis/RedisServiceBaseClasses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ class RedisServiceBase {
}
this.redisClient = this.redisClientService.createClient({ type });

this.redisClient.on('close', () => {
this.logger.warn('Redis unavailable - trying to reconnect...');
});

this.redisClient.on('error', (error) => {
if (!String(error).includes('ECONNREFUSED')) {
this.logger.warn('Error with Redis: ', error);
Expand Down
6 changes: 4 additions & 2 deletions packages/cli/src/services/redis/redis-client.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Logger } from '@/Logger';
import ioRedis from 'ioredis';
import type { Cluster, RedisOptions } from 'ioredis';
import type { RedisClientType } from './RedisServiceBaseClasses';
import { OnShutdown } from '@/decorators/OnShutdown';
import { LOWEST_PRIORITY, OnShutdown } from '@/decorators/OnShutdown';

@Service()
export class RedisClientService {
Expand All @@ -23,7 +23,7 @@ export class RedisClientService {
return client;
}

@OnShutdown()
@OnShutdown(LOWEST_PRIORITY)
disconnectClients() {
for (const client of this.clients) {
client.disconnect();
Expand Down Expand Up @@ -144,6 +144,8 @@ export class RedisClientService {
}
}

this.logger.warn('Redis unavailable - trying to reconnect...');

return RETRY_INTERVAL;
};
}
Expand Down
1 change: 1 addition & 0 deletions packages/cli/test/integration/shared/utils/testCommand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export const setupTestCommand = <T extends BaseCommand>(Command: Class<T>) => {

// mock SIGINT/SIGTERM registration
process.once = jest.fn();
process.exit = jest.fn() as never;

beforeAll(async () => {
await testDb.init();
Expand Down

0 comments on commit c82579b

Please sign in to comment.