From 7b26a7a6210114b6d37aba1b873f9a8bb0cec577 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Fri, 22 Dec 2023 17:49:15 +0100 Subject: [PATCH] refactor(core): Move `typeorm` operators from `PruningService` to `ExecutionRepository` (no-changelog) (#8145) Follow-up to https://github.com/n8n-io/n8n/pull/8143 --- .../repositories/execution.repository.ts | 84 +++++++++++++++++- packages/cli/src/services/pruning.service.ts | 86 +++---------------- 2 files changed, 93 insertions(+), 77 deletions(-) diff --git a/packages/cli/src/databases/repositories/execution.repository.ts b/packages/cli/src/databases/repositories/execution.repository.ts index ad9ac71e7244c..67b3d0de4bf34 100644 --- a/packages/cli/src/databases/repositories/execution.repository.ts +++ b/packages/cli/src/databases/repositories/execution.repository.ts @@ -1,5 +1,14 @@ import { Service } from 'typedi'; -import { DataSource, In, LessThanOrEqual, MoreThanOrEqual, Repository } from 'typeorm'; +import { + Brackets, + DataSource, + In, + IsNull, + LessThanOrEqual, + MoreThanOrEqual, + Not, + Repository, +} from 'typeorm'; import { DateUtils } from 'typeorm/util/DateUtils'; import type { FindManyOptions, @@ -434,4 +443,77 @@ export class ExecutionRepository extends Repository { }, }).then((executions) => executions.map(({ id }) => id)); } + + async softDeletePrunableExecutions() { + const maxAge = config.getEnv('executions.pruneDataMaxAge'); // in h + const maxCount = config.getEnv('executions.pruneDataMaxCount'); + + // Find ids of all executions that were stopped longer that pruneDataMaxAge ago + const date = new Date(); + date.setHours(date.getHours() - maxAge); + + const toPrune: Array> = [ + // date reformatting needed - see https://github.com/typeorm/typeorm/issues/2286 + { stoppedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)) }, + ]; + + if (maxCount > 0) { + const executions = await this.find({ + select: ['id'], + skip: maxCount, + take: 1, + order: { id: 'DESC' }, + }); + + if (executions[0]) { + toPrune.push({ id: LessThanOrEqual(executions[0].id) }); + } + } + + const [timeBasedWhere, countBasedWhere] = toPrune; + + return this.createQueryBuilder() + .update(ExecutionEntity) + .set({ deletedAt: new Date() }) + .where({ + deletedAt: IsNull(), + // Only mark executions as deleted if they are in an end state + status: Not(In(['new', 'running', 'waiting'])), + }) + .andWhere( + new Brackets((qb) => + countBasedWhere + ? qb.where(timeBasedWhere).orWhere(countBasedWhere) + : qb.where(timeBasedWhere), + ), + ) + .execute(); + } + + async hardDeleteSoftDeletedExecutions() { + const date = new Date(); + date.setHours(date.getHours() - config.getEnv('executions.pruneDataHardDeleteBuffer')); + + const workflowIdsAndExecutionIds = ( + await this.find({ + select: ['workflowId', 'id'], + where: { + deletedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)), + }, + take: this.hardDeletionBatchSize, + + /** + * @important This ensures soft-deleted executions are included, + * else `@DeleteDateColumn()` at `deletedAt` will exclude them. + */ + withDeleted: true, + }) + ).map(({ id: executionId, workflowId }) => ({ workflowId, executionId })); + + return workflowIdsAndExecutionIds; + } + + async deleteByIds(executionIds: string[]) { + return this.delete({ id: In(executionIds) }); + } } diff --git a/packages/cli/src/services/pruning.service.ts b/packages/cli/src/services/pruning.service.ts index 10b7d589c5605..f2b142bf1f488 100644 --- a/packages/cli/src/services/pruning.service.ts +++ b/packages/cli/src/services/pruning.service.ts @@ -1,14 +1,9 @@ import { Service } from 'typedi'; import { BinaryDataService } from 'n8n-core'; -import type { FindOptionsWhere } from 'typeorm'; -import { Brackets, In, IsNull, LessThanOrEqual, Not } from 'typeorm'; -import { DateUtils } from 'typeorm/util/DateUtils'; - import { inTest, TIME } from '@/constants'; import config from '@/config'; import { ExecutionRepository } from '@db/repositories/execution.repository'; import { Logger } from '@/Logger'; -import { ExecutionEntity } from '@db/entities/ExecutionEntity'; import { jsonStringify } from 'n8n-workflow'; import { OnShutdown } from '@/decorators/OnShutdown'; @@ -113,50 +108,7 @@ export class PruningService { async softDeleteOnPruningCycle() { this.logger.debug('[Pruning] Starting soft-deletion of executions'); - const maxAge = config.getEnv('executions.pruneDataMaxAge'); // in h - const maxCount = config.getEnv('executions.pruneDataMaxCount'); - - // Find ids of all executions that were stopped longer that pruneDataMaxAge ago - const date = new Date(); - date.setHours(date.getHours() - maxAge); - - const toPrune: Array> = [ - // date reformatting needed - see https://github.com/typeorm/typeorm/issues/2286 - { stoppedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)) }, - ]; - - if (maxCount > 0) { - const executions = await this.executionRepository.find({ - select: ['id'], - skip: maxCount, - take: 1, - order: { id: 'DESC' }, - }); - - if (executions[0]) { - toPrune.push({ id: LessThanOrEqual(executions[0].id) }); - } - } - - const [timeBasedWhere, countBasedWhere] = toPrune; - - const result = await this.executionRepository - .createQueryBuilder() - .update(ExecutionEntity) - .set({ deletedAt: new Date() }) - .where({ - deletedAt: IsNull(), - // Only mark executions as deleted if they are in an end state - status: Not(In(['new', 'running', 'waiting'])), - }) - .andWhere( - new Brackets((qb) => - countBasedWhere - ? qb.where(timeBasedWhere).orWhere(countBasedWhere) - : qb.where(timeBasedWhere), - ), - ) - .execute(); + const result = await this.executionRepository.softDeletePrunableExecutions(); if (result.affected === 0) { this.logger.debug('[Pruning] Found no executions to soft-delete'); @@ -177,40 +129,22 @@ export class PruningService { * @return Delay in ms after which the next cycle should be started */ private async hardDeleteOnPruningCycle() { - const date = new Date(); - date.setHours(date.getHours() - config.getEnv('executions.pruneDataHardDeleteBuffer')); - - const workflowIdsAndExecutionIds = ( - await this.executionRepository.find({ - select: ['workflowId', 'id'], - where: { - deletedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)), - }, - take: this.hardDeletionBatchSize, - - /** - * @important This ensures soft-deleted executions are included, - * else `@DeleteDateColumn()` at `deletedAt` will exclude them. - */ - withDeleted: true, - }) - ).map(({ id: executionId, workflowId }) => ({ workflowId, executionId })); - - const executionIds = workflowIdsAndExecutionIds.map((o) => o.executionId); + const ids = await this.executionRepository.hardDeleteSoftDeletedExecutions(); + + const executionIds = ids.map((o) => o.executionId); if (executionIds.length === 0) { this.logger.debug('[Pruning] Found no executions to hard-delete'); + return this.rates.hardDeletion; } try { - this.logger.debug('[Pruning] Starting hard-deletion of executions', { - executionIds, - }); + this.logger.debug('[Pruning] Starting hard-deletion of executions', { executionIds }); - await this.binaryDataService.deleteMany(workflowIdsAndExecutionIds); + await this.binaryDataService.deleteMany(ids); - await this.executionRepository.delete({ id: In(executionIds) }); + await this.executionRepository.deleteByIds(executionIds); this.logger.debug('[Pruning] Hard-deleted executions', { executionIds }); } catch (error) { @@ -225,7 +159,7 @@ export class PruningService { * to prevent high concurrency from causing duplicate deletions. */ const isHighVolume = executionIds.length >= this.hardDeletionBatchSize; - const rate = isHighVolume ? 1 * TIME.SECOND : this.rates.hardDeletion; - return rate; + + return isHighVolume ? 1 * TIME.SECOND : this.rates.hardDeletion; } }