Skip to content

Commit

Permalink
refactor(core): Limit soft-deletions to pruning only (#7469)
Browse files Browse the repository at this point in the history
Based on customer feedback, we should limit soft deletions to pruning
only, to prevent executions from piling up in very high volume cases.
  • Loading branch information
ivov authored Oct 20, 2023
1 parent 3c0a166 commit 0b42d1a
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ export = {
return res.status(404).json({ message: 'Not Found' });
}

await Container.get(ExecutionRepository).softDelete(execution.id);
await Container.get(ExecutionRepository).hardDelete({
workflowId: execution.workflowId as string,
executionId: execution.id,
});

execution.id = id;

Expand Down
10 changes: 8 additions & 2 deletions packages/cli/src/WorkflowExecuteAdditionalData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,10 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
}

if (isManualMode && !saveManualExecutions && !fullRunData.waitTill) {
await Container.get(ExecutionRepository).softDelete(this.executionId);
await Container.get(ExecutionRepository).hardDelete({
workflowId: this.workflowData.id as string,
executionId: this.executionId,
});

return;
}
Expand Down Expand Up @@ -547,7 +550,10 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
this.executionId,
this.retryOf,
);
await Container.get(ExecutionRepository).softDelete(this.executionId);
await Container.get(ExecutionRepository).hardDelete({
workflowId: this.workflowData.id as string,
executionId: this.executionId,
});

return;
}
Expand Down
5 changes: 4 additions & 1 deletion packages/cli/src/WorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,10 @@ export class WorkflowRunner {
(workflowDidSucceed && saveDataSuccessExecution === 'none') ||
(!workflowDidSucceed && saveDataErrorExecution === 'none')
) {
await Container.get(ExecutionRepository).softDelete(executionId);
await Container.get(ExecutionRepository).hardDelete({
workflowId: data.workflowData.id as string,
executionId,
});
}
// eslint-disable-next-line id-denylist
} catch (err) {
Expand Down
41 changes: 30 additions & 11 deletions packages/cli/src/databases/repositories/execution.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,10 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
} min`,
);

this.intervals.softDeletion = setInterval(async () => this.prune(), this.rates.softDeletion);
this.intervals.softDeletion = setInterval(
async () => this.softDeleteOnPruningCycle(),
this.rates.softDeletion,
);
}

setHardDeletionInterval() {
Expand All @@ -136,7 +139,7 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
);

this.intervals.hardDeletion = setInterval(
async () => this.hardDelete(),
async () => this.hardDeleteOnPruningCycle(),
this.rates.hardDeletion,
);
}
Expand Down Expand Up @@ -294,6 +297,16 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
);
}

/**
* Permanently remove a single execution and its binary data.
*/
async hardDelete(ids: { workflowId: string; executionId: string }) {
return Promise.all([
this.binaryDataService.deleteMany([ids]),
this.delete({ id: ids.executionId }),
]);
}

async updateExistingExecution(executionId: string, execution: Partial<IExecutionResponse>) {
// Se isolate startedAt because it must be set when the execution starts and should never change.
// So we prevent updating it, if it's sent (it usually is and causes problems to executions that
Expand Down Expand Up @@ -467,12 +480,15 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
do {
// Delete in batches to avoid "SQLITE_ERROR: Expression tree is too large (maximum depth 1000)" error
const batch = executionIds.splice(0, this.deletionBatchSize);
await this.softDelete(batch);
await this.delete(batch);
} while (executionIds.length > 0);
}

async prune() {
Logger.verbose('Soft-deleting (pruning) execution data from database');
/**
* Mark executions as deleted based on age and count, in a pruning cycle.
*/
async softDeleteOnPruningCycle() {
Logger.verbose('Soft-deleting execution data from database (pruning cycle)');

const maxAge = config.getEnv('executions.pruneDataMaxAge'); // in h
const maxCount = config.getEnv('executions.pruneDataMaxCount');
Expand Down Expand Up @@ -520,9 +536,9 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
}

/**
* Permanently delete all soft-deleted executions and their binary data, in batches.
* Permanently remove all soft-deleted executions and their binary data, in a pruning cycle.
*/
private async hardDelete() {
private async hardDeleteOnPruningCycle() {
const date = new Date();
date.setHours(date.getHours() - config.getEnv('executions.pruneDataHardDeleteBuffer'));

Expand Down Expand Up @@ -551,9 +567,12 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {

await this.binaryDataService.deleteMany(workflowIdsAndExecutionIds);

this.logger.debug(`Hard-deleting ${executionIds.length} executions from database`, {
executionIds,
});
this.logger.debug(
`Hard-deleting ${executionIds.length} executions from database (pruning cycle)`,
{
executionIds,
},
);

// Actually delete these executions
await this.delete({ id: In(executionIds) });
Expand All @@ -569,7 +588,7 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
if (executionIds.length === this.deletionBatchSize) {
clearInterval(this.intervals.hardDeletion);

setTimeout(async () => this.hardDelete(), 1 * TIME.SECOND);
setTimeout(async () => this.hardDeleteOnPruningCycle(), 1 * TIME.SECOND);
} else {
if (this.intervals.hardDeletion) return;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ describe('ExecutionRepository.prune()', () => {
await testDb.createSuccessfulExecution(workflow),
];

await executionRepository.prune();
await executionRepository.softDeleteOnPruningCycle();

const result = await findAllExecutions();
expect(result).toEqual([
Expand All @@ -76,7 +76,7 @@ describe('ExecutionRepository.prune()', () => {
await testDb.createSuccessfulExecution(workflow),
];

await executionRepository.prune();
await executionRepository.softDeleteOnPruningCycle();

const result = await findAllExecutions();
expect(result).toEqual([
Expand All @@ -98,7 +98,7 @@ describe('ExecutionRepository.prune()', () => {
await testDb.createSuccessfulExecution(workflow),
];

await executionRepository.prune();
await executionRepository.softDeleteOnPruningCycle();

const result = await findAllExecutions();
expect(result).toEqual([
Expand All @@ -117,7 +117,7 @@ describe('ExecutionRepository.prune()', () => {
await testDb.createSuccessfulExecution(workflow),
];

await executionRepository.prune();
await executionRepository.softDeleteOnPruningCycle();

const result = await findAllExecutions();
expect(result).toEqual([
Expand Down Expand Up @@ -145,7 +145,7 @@ describe('ExecutionRepository.prune()', () => {
),
];

await executionRepository.prune();
await executionRepository.softDeleteOnPruningCycle();

const result = await findAllExecutions();
expect(result).toEqual([
Expand All @@ -169,7 +169,7 @@ describe('ExecutionRepository.prune()', () => {
await testDb.createSuccessfulExecution(workflow),
];

await executionRepository.prune();
await executionRepository.softDeleteOnPruningCycle();

const result = await findAllExecutions();
expect(result).toEqual([
Expand All @@ -188,7 +188,7 @@ describe('ExecutionRepository.prune()', () => {
])('should prune %s executions', async (status, attributes) => {
const execution = await testDb.createExecution({ status, ...attributes }, workflow);

await executionRepository.prune();
await executionRepository.softDeleteOnPruningCycle();

const result = await findAllExecutions();
expect(result).toEqual([
Expand All @@ -206,7 +206,7 @@ describe('ExecutionRepository.prune()', () => {
await testDb.createSuccessfulExecution(workflow),
];

await executionRepository.prune();
await executionRepository.softDeleteOnPruningCycle();

const result = await findAllExecutions();
expect(result).toEqual([
Expand Down

0 comments on commit 0b42d1a

Please sign in to comment.