Skip to content

Commit

Permalink
feat(core): Unify application components shutdown (#8097)
Browse files Browse the repository at this point in the history
## Summary

Add `ShutdownService` and `OnShutdown` decorator for more unified way to
shutdown different components. Use this new way in the following
components:

- HTTP(S) server
- Pruning service
- Push connection
- License

---------

Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
  • Loading branch information
tomi and netroy authored Dec 22, 2023
1 parent c158ca2 commit 3a881be
Show file tree
Hide file tree
Showing 15 changed files with 412 additions and 17 deletions.
29 changes: 27 additions & 2 deletions packages/cli/src/AbstractServer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Container } from 'typedi';
import { Container, Service } from 'typedi';
import { readFile } from 'fs/promises';
import type { Server } from 'http';
import express from 'express';
Expand All @@ -9,7 +9,8 @@ import config from '@/config';
import { N8N_VERSION, inDevelopment, inTest } from '@/constants';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import * as Db from '@/Db';
import type { N8nInstanceType, IExternalHooksClass } from '@/Interfaces';
import { N8nInstanceType } from '@/Interfaces';
import type { IExternalHooksClass } from '@/Interfaces';
import { ExternalHooks } from '@/ExternalHooks';
import { send, sendErrorResponse } from '@/ResponseHelper';
import { rawBodyReader, bodyParser, corsMiddleware } from '@/middlewares';
Expand All @@ -20,7 +21,9 @@ import { webhookRequestHandler } from '@/WebhookHelpers';
import { generateHostInstanceId } from './databases/utils/generators';
import { Logger } from '@/Logger';
import { ServiceUnavailableError } from './errors/response-errors/service-unavailable.error';
import { OnShutdown } from '@/decorators/OnShutdown';

@Service()
export abstract class AbstractServer {
protected logger: Logger;

Expand Down Expand Up @@ -246,4 +249,26 @@ export abstract class AbstractServer {
await this.externalHooks.run('n8n.ready', [this, config]);
}
}

/**
* Stops the HTTP(S) server from accepting new connections. Gives all
* connections configured amount of time to finish their work and
* then closes them forcefully.
*/
@OnShutdown()
async onShutdown(): Promise<void> {
if (!this.server) {
return;
}

this.logger.debug(`Shutting down ${this.protocol} server`);

this.server.close((error) => {
if (error) {
this.logger.error(`Error while shutting down ${this.protocol} server`, { error });
}

this.logger.debug(`${this.protocol} server shut down`);
});
}
}
2 changes: 2 additions & 0 deletions packages/cli/src/ActiveWorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ import { ActivationErrorsService } from '@/ActivationErrors.service';
import { NotFoundError } from './errors/response-errors/not-found.error';
import { ActiveWorkflowsService } from '@/services/activeWorkflows.service';
import { WorkflowStaticDataService } from '@/workflows/workflowStaticData.service';
import { OnShutdown } from '@/decorators/OnShutdown';

interface QueuedActivation {
activationMode: WorkflowActivateMode;
Expand Down Expand Up @@ -664,6 +665,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
await this.addActiveWorkflows('leadershipChange');
}

@OnShutdown()
async removeAllTriggerAndPollerBasedWorkflows() {
await this.activeWorkflows.removeAllTriggerAndPollerBasedWorkflows();
}
Expand Down
13 changes: 13 additions & 0 deletions packages/cli/src/License.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import type { BooleanLicenseFeature, N8nInstanceType, NumericLicenseFeature } fr
import type { RedisServicePubSubPublisher } from './services/redis/RedisServicePubSubPublisher';
import { RedisService } from './services/redis.service';
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
import { OnShutdown } from '@/decorators/OnShutdown';

type FeatureReturnType = Partial<
{
Expand All @@ -30,6 +31,8 @@ export class License {

private redisPublisher: RedisServicePubSubPublisher;

private isShuttingDown = false;

constructor(
private readonly logger: Logger,
private readonly instanceSettings: InstanceSettings,
Expand All @@ -40,6 +43,11 @@ export class License {

async init(instanceType: N8nInstanceType = 'main') {
if (this.manager) {
this.logger.warn('License manager already initialized or shutting down');
return;
}
if (this.isShuttingDown) {
this.logger.warn('License manager already shutting down');
return;
}

Expand Down Expand Up @@ -191,7 +199,12 @@ export class License {
await this.manager.renew();
}

@OnShutdown()
async shutdown() {
// Shut down License manager to unclaim any floating entitlements
// Note: While this saves a new license cert to DB, the previous entitlements are still kept in memory so that the shutdown process can complete
this.isShuttingDown = true;

if (!this.manager) {
return;
}
Expand Down
3 changes: 2 additions & 1 deletion packages/cli/src/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
/* eslint-disable @typescript-eslint/no-unused-vars */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { Container, Service } from 'typedi';
import assert from 'assert';
import { exec as callbackExec } from 'child_process';
import { access as fsAccess } from 'fs/promises';
Expand Down Expand Up @@ -84,7 +85,6 @@ import { handleLdapInit, isLdapEnabled } from './Ldap/helpers';
import { AbstractServer } from './AbstractServer';
import { PostHogClient } from './posthog';
import { eventBus } from './eventbus';
import { Container } from 'typedi';
import { InternalHooks } from './InternalHooks';
import { License } from './License';
import { getStatusUsingPreviousExecutionStatusMethod } from './executions/executionHelpers';
Expand Down Expand Up @@ -124,6 +124,7 @@ import { PasswordUtility } from './services/password.utility';

const exec = promisify(callbackExec);

@Service()
export class Server extends AbstractServer {
private endpointPresetCredentials: string;

Expand Down
2 changes: 2 additions & 0 deletions packages/cli/src/WebhookServer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { Service } from 'typedi';
import { AbstractServer } from '@/AbstractServer';

@Service()
export class WebhookServer extends AbstractServer {
constructor() {
super('webhook');
Expand Down
9 changes: 5 additions & 4 deletions packages/cli/src/commands/BaseCommand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager
import { initExpressionEvaluator } from '@/ExpressionEvaluator';
import { generateHostInstanceId } from '@db/utils/generators';
import { WorkflowHistoryManager } from '@/workflows/workflowHistory/workflowHistoryManager.ee';
import { ShutdownService } from '@/shutdown/Shutdown.service';

export abstract class BaseCommand extends Command {
protected logger = Container.get(Logger);
Expand All @@ -38,7 +39,7 @@ export abstract class BaseCommand extends Command {

protected server?: AbstractServer;

protected isShuttingDown = false;
protected shutdownService: ShutdownService = Container.get(ShutdownService);

/**
* How long to wait for graceful shutdown before force killing the process.
Expand Down Expand Up @@ -309,7 +310,7 @@ export abstract class BaseCommand extends Command {

private onTerminationSignal(signal: string) {
return async () => {
if (this.isShuttingDown) {
if (this.shutdownService.isShuttingDown()) {
this.logger.info(`Received ${signal}. Already shutting down...`);
return;
}
Expand All @@ -323,9 +324,9 @@ export abstract class BaseCommand extends Command {
}, this.gracefulShutdownTimeoutInS * 1000);

this.logger.info(`Received ${signal}. Shutting down...`);
this.isShuttingDown = true;
this.shutdownService.shutdown();

await this.stopProcess();
await Promise.all([this.stopProcess(), this.shutdownService.waitForShutdown()]);

clearTimeout(forceShutdownTimer);
};
Expand Down
10 changes: 1 addition & 9 deletions packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export class Start extends BaseCommand {

protected activeWorkflowRunner: ActiveWorkflowRunner;

protected server = new Server();
protected server = Container.get(Server);

private pruningService: PruningService;

Expand Down Expand Up @@ -101,14 +101,6 @@ export class Start extends BaseCommand {

await this.externalHooks?.run('n8n.stop', []);

// Shut down License manager to unclaim any floating entitlements
// Note: While this saves a new license cert to DB, the previous entitlements are still kept in memory so that the shutdown process can complete
await Container.get(License).shutdown();

if (this.pruningService.isPruningEnabled()) {
this.pruningService.stopPruning();
}

if (Container.get(MultiMainSetup).isEnabled) {
await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows();

Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/commands/webhook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export class Webhook extends BaseCommand {
help: flags.help({ char: 'h' }),
};

protected server = new WebhookServer();
protected server = Container.get(WebhookServer);

constructor(argv: string[], cmdConfig: IConfig) {
super(argv, cmdConfig);
Expand Down
38 changes: 38 additions & 0 deletions packages/cli/src/decorators/OnShutdown.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { Container } from 'typedi';
import { ApplicationError } from 'n8n-workflow';
import { type ServiceClass, ShutdownService } from '@/shutdown/Shutdown.service';

/**
* Decorator that registers a method as a shutdown hook. The method will
* be called when the application is shutting down.
*
* Priority is used to determine the order in which the hooks are called.
*
* NOTE: Requires also @Service() decorator to be used on the class.
*
* @example
* ```ts
* @Service()
* class MyClass {
* @OnShutdown()
* async shutdown() {
* // Will be called when the app is shutting down
* }
* }
* ```
*/
export const OnShutdown =
(priority = 100): MethodDecorator =>
(prototype, propertyKey, descriptor) => {
const serviceClass = prototype.constructor as ServiceClass;
const methodName = String(propertyKey);
// TODO: assert that serviceClass is decorated with @Service
if (typeof descriptor?.value === 'function') {
Container.get(ShutdownService).register(priority, { serviceClass, methodName });
} else {
const name = `${serviceClass.name}.${methodName}()`;
throw new ApplicationError(
`${name} must be a method on ${serviceClass.name} to use "OnShutdown"`,
);
}
};
13 changes: 13 additions & 0 deletions packages/cli/src/push/abstract.push.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,17 @@ export abstract class AbstractPush<T> extends EventEmitter {

this.sendToSessions(type, data, userSessionIds);
}

/**
* Closes all push existing connections
*/
closeAllConnections() {
for (const sessionId in this.connections) {
// Signal the connection that we want to close it.
// We are not removing the sessions here because it should be
// the implementation's responsibility to do so once the connection
// has actually closed.
this.close(this.connections[sessionId]);
}
}
}
6 changes: 6 additions & 0 deletions packages/cli/src/push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { WebSocketPush } from './websocket.push';
import type { PushResponse, SSEPushRequest, WebSocketPushRequest } from './types';
import type { IPushDataType } from '@/Interfaces';
import type { User } from '@db/entities/User';
import { OnShutdown } from '@/decorators/OnShutdown';

const useWebSockets = config.getEnv('push.backend') === 'websocket';

Expand Down Expand Up @@ -70,6 +71,11 @@ export class Push extends EventEmitter {
sendToUsers<D>(type: IPushDataType, data: D, userIds: Array<User['id']>) {
this.backend.sendToUsers(type, data, userIds);
}

@OnShutdown()
onShutdown(): void {
this.backend.closeAllConnections();
}
}

export const setupPushServer = (restEndpoint: string, server: Server, app: Application) => {
Expand Down
14 changes: 14 additions & 0 deletions packages/cli/src/services/pruning.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { ExecutionRepository } from '@db/repositories/execution.repository';
import { Logger } from '@/Logger';
import { ExecutionEntity } from '@db/entities/ExecutionEntity';
import { jsonStringify } from 'n8n-workflow';
import { OnShutdown } from '@/decorators/OnShutdown';

@Service()
export class PruningService {
Expand All @@ -24,6 +25,8 @@ export class PruningService {

public hardDeletionTimeout: NodeJS.Timeout | undefined;

private isShuttingDown = false;

constructor(
private readonly logger: Logger,
private readonly executionRepository: ExecutionRepository,
Expand Down Expand Up @@ -54,6 +57,11 @@ export class PruningService {
* @important Call this method only after DB migrations have completed.
*/
startPruning() {
if (this.isShuttingDown) {
this.logger.warn('[Pruning] Cannot start pruning while shutting down');
return;
}

this.logger.debug('[Pruning] Starting soft-deletion and hard-deletion timers');

this.setSoftDeletionInterval();
Expand Down Expand Up @@ -158,6 +166,12 @@ export class PruningService {
this.logger.debug('[Pruning] Soft-deleted executions', { count: result.affected });
}

@OnShutdown()
shutdown(): void {
this.isShuttingDown = true;
this.stopPruning();
}

/**
* Permanently remove all soft-deleted executions and their binary data, in a pruning cycle.
* @return Delay in ms after which the next cycle should be started
Expand Down
Loading

0 comments on commit 3a881be

Please sign in to comment.