Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core): Generalize binary data manager interface (no-changelog) #7164

Merged
merged 115 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from 103 commits
Commits
Show all changes
115 commits
Select commit Hold shift + click to select a range
29f1d11
refactor(core): Simplify executions and binary data pruning
netroy Aug 31, 2023
f7206fd
Merge branch 'master' into pay-771-implement-soft-deletion
ivov Sep 4, 2023
9c8efbc
Merge branch 'master' into pay-771-implement-soft-deletion
ivov Sep 5, 2023
05e3fef
Remove test code from service
ivov Sep 5, 2023
9dabbe8
Use time constants
ivov Sep 5, 2023
3f7de8e
Improve naming
ivov Sep 5, 2023
5583814
Use native typeorm soft deletion
ivov Sep 5, 2023
f50108a
Improve naming
ivov Sep 5, 2023
85ac53a
Make batch size class field
ivov Sep 5, 2023
1d87e9e
Cleanup
ivov Sep 5, 2023
dd16e45
Remove unused method
ivov Sep 5, 2023
619bff6
Cleanup
ivov Sep 5, 2023
8cace11
Filter out soft-deleted executions
ivov Sep 5, 2023
1fea046
Merge branch 'master' into pay-771-implement-soft-deletion
ivov Sep 5, 2023
c4ffe6d
Add tests
ivov Sep 5, 2023
738eb1b
Improve test
ivov Sep 6, 2023
259fe75
Soft-delete in single pass
ivov Sep 13, 2023
f3f5b27
Restore value
ivov Sep 13, 2023
65d17cb
Restore from debug value
ivov Sep 13, 2023
96ae7b4
Add clarifying comments
ivov Sep 13, 2023
21040b5
Update tests
ivov Sep 13, 2023
aa4d633
Add `typedi` to core
ivov Sep 13, 2023
e54becb
Update package.json
ivov Sep 13, 2023
80e7258
Turn `BinaryDataManager` into service
ivov Sep 13, 2023
3a8960f
Add `object` to binary data schema options
ivov Sep 13, 2023
64b1a96
Use service throughout core and cli
ivov Sep 13, 2023
f974b55
Update tests
ivov Sep 13, 2023
36d0243
Reduce diff
ivov Sep 13, 2023
5e209e9
Update default
ivov Sep 13, 2023
0c3a6aa
Add docline
ivov Sep 13, 2023
3d4beba
Remove redundant output types
ivov Sep 13, 2023
1bf7a3d
Remove unused type
ivov Sep 13, 2023
1152092
Rename to submanagers to client
ivov Sep 13, 2023
5627b4c
Rename `BinaryDataFileSystem` to `FileSystemClient`
ivov Sep 13, 2023
6451750
Clean up types
ivov Sep 13, 2023
7a7ad54
Move `binaryToBuffer` to manager
ivov Sep 13, 2023
cdfe2fc
Rename `BinaryDataManager` to `BinaryDataService`
ivov Sep 13, 2023
b5bc390
Unify storage path naming
ivov Sep 13, 2023
c876697
Set binary data service in nodes-base test helper
ivov Sep 13, 2023
25a796c
Rename manager in nodes-base
ivov Sep 13, 2023
538d318
More renamings
ivov Sep 13, 2023
c7cf4b1
Generalize interface
ivov Sep 13, 2023
307bb1b
Continue generalizing
ivov Sep 13, 2023
a2528d7
Fix test
ivov Sep 13, 2023
d0cd85e
Better naming
ivov Sep 13, 2023
bfdbdf5
More renamings
ivov Sep 13, 2023
25e80b9
Minor cleanup
ivov Sep 13, 2023
3476f6c
More minor cleanup
ivov Sep 13, 2023
9a25301
Add clarifying comment
ivov Sep 13, 2023
0e075c2
Speed up pruning if high volume
ivov Sep 14, 2023
e8ccf05
Merge branch 'master' into pay-771-implement-soft-deletion
ivov Sep 14, 2023
2071c7f
Remove call from hook
ivov Sep 14, 2023
be44721
Merge parent branch
ivov Sep 14, 2023
1d50b61
Refix conflict
ivov Sep 14, 2023
e042e91
Add formats to schema
ivov Sep 14, 2023
e948f8f
Add `ensureStringArray` util
ivov Sep 14, 2023
0feb926
Adjust types based on parsing
ivov Sep 14, 2023
c73f18c
Add type guard
ivov Sep 14, 2023
f8fec92
Clean up types in `init()`
ivov Sep 14, 2023
93ea57c
Fix test
ivov Sep 14, 2023
c8676be
Remove unneeded modifier
ivov Sep 14, 2023
25006e8
Cleanup
ivov Sep 14, 2023
a7156f8
Fix node tests
ivov Sep 14, 2023
e0e90b6
Remove unneeded line
ivov Sep 14, 2023
219addb
Rename constant
ivov Sep 14, 2023
d7c7f5f
Remove comment
ivov Sep 14, 2023
1ea380d
Fix tests
ivov Sep 14, 2023
a5b388b
Cleanup
ivov Sep 14, 2023
9204754
Rename error
ivov Sep 14, 2023
9597508
Improve error message
ivov Sep 14, 2023
8fde8be
Better error
ivov Sep 14, 2023
ada012e
Better message
ivov Sep 14, 2023
67e163f
Make execution ID non-nullable
ivov Sep 15, 2023
9665da1
Merge master
ivov Sep 15, 2023
12636dd
Readability improvements
ivov Sep 15, 2023
e5c8c72
Adjust types, followup to 67e163f
ivov Sep 15, 2023
8849007
Merge parent branch
ivov Sep 15, 2023
b7062e5
Fix lint
ivov Sep 15, 2023
79ecaa9
Fix lint
ivov Sep 15, 2023
02b6d86
Cleanup
ivov Sep 18, 2023
96ea9ea
Comment out cache keys
ivov Sep 18, 2023
3d76b21
Revert "Comment out cache keys"
ivov Sep 18, 2023
455c327
Rename dir to `TempBinaryData`
ivov Sep 18, 2023
65b4fb0
Rename back to `BinaryData`
ivov Sep 18, 2023
3bf3430
Merge master
ivov Sep 18, 2023
abe6d9d
Clear timers on shutdown
ivov Sep 18, 2023
2682b3d
Set timers only on main instance
ivov Sep 18, 2023
1c788fe
Also for clearing timers
ivov Sep 18, 2023
f248982
Merge parent branch
ivov Sep 18, 2023
04f4b83
Rename back to `localStoragePath`
ivov Sep 18, 2023
03264f9
Remove unused arg
ivov Sep 18, 2023
a6e33d4
Fix lint
ivov Sep 18, 2023
8823554
Add logging, refactor for readability
ivov Sep 19, 2023
a5def40
Ensure hard-deletion select includes soft-deleted rows
ivov Sep 19, 2023
77955e4
Switch `info` to `debug`
ivov Sep 19, 2023
b930e3e
Fix tests
ivov Sep 19, 2023
e91f3de
Remove redundant checks for `deletedAt` being `NULL`
ivov Sep 19, 2023
2c3704d
Fix lint
ivov Sep 19, 2023
215f58e
Fix last test
ivov Sep 19, 2023
c33d164
More missing loggers in tests
ivov Sep 19, 2023
69764f7
Add logger to even more test
ivov Sep 19, 2023
7c3e58d
Refactor logging for tests
ivov Sep 19, 2023
6fe79b8
Merge parent branch
ivov Sep 19, 2023
4f025b2
Merge master, fix conflicts
ivov Sep 20, 2023
80a4fc5
Fix misresolved conflict
ivov Sep 20, 2023
20d5ea7
Remove excess check on init
ivov Sep 20, 2023
0f84eb6
Remove `ObjectStore.manager.ts` stub
ivov Sep 21, 2023
605d02b
Revert rename to `BinaryDataManager`
ivov Sep 21, 2023
c430f22
Missing renaming in test
ivov Sep 21, 2023
0cdfb79
Rename constant
ivov Sep 22, 2023
08ce3f7
Merge master
ivov Sep 22, 2023
61c2f5a
Restore `LogCatch`
ivov Sep 22, 2023
74adb24
Add file not found error to `getSize`
ivov Sep 22, 2023
0bc2457
Add `mode` to `InvalidBinaryDataManagerError`
ivov Sep 22, 2023
d27e4c4
Delete many by execution IDs only if manager available
ivov Sep 22, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/cli/src/ActiveExecutions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { createDeferredPromise, LoggerProxy } from 'n8n-workflow';
import type { ChildProcess } from 'child_process';
import type PCancelable from 'p-cancelable';
import type {
ExecutionPayload,
IExecutingWorkflowData,
IExecutionDb,
IExecutionsCurrentSummary,
Expand All @@ -38,7 +39,7 @@ export class ActiveExecutions {
if (executionId === undefined) {
// Is a new execution so save in DB

const fullExecutionData: IExecutionDb = {
const fullExecutionData: ExecutionPayload = {
data: executionData.executionData!,
mode: executionData.executionMode,
finished: false,
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/GenericHelpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { Container } from 'typedi';
import { Like } from 'typeorm';
import config from '@/config';
import * as Db from '@/Db';
import type { ICredentialsDb, IExecutionDb, IWorkflowDb } from '@/Interfaces';
import type { ExecutionPayload, ICredentialsDb, IWorkflowDb } from '@/Interfaces';
import * as ResponseHelper from '@/ResponseHelper';
import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import type { CredentialsEntity } from '@db/entities/CredentialsEntity';
Expand Down Expand Up @@ -178,7 +178,7 @@ export async function createErrorExecution(
},
};

const fullExecutionData: IExecutionDb = {
const fullExecutionData: ExecutionPayload = {
data: executionData,
mode,
finished: false,
Expand Down
7 changes: 6 additions & 1 deletion packages/cli/src/Interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ export type ICredentialsDecryptedResponse = ICredentialsDecryptedDb;
export type SaveExecutionDataType = 'all' | 'none';

export interface IExecutionBase {
id?: string;
id: string;
mode: WorkflowExecuteMode;
startedAt: Date;
stoppedAt?: Date; // empty value means execution is still running
Expand All @@ -189,6 +189,11 @@ export interface IExecutionDb extends IExecutionBase {
workflowData?: IWorkflowBase;
}

/**
* Payload for creating or updating an execution.
*/
export type ExecutionPayload = Omit<IExecutionDb, 'id'>;

export interface IExecutionPushResponse {
executionId?: string;
waitingForWebhook?: boolean;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
import type express from 'express';

import { BinaryDataManager } from 'n8n-core';

import {
getExecutions,
getExecutionInWorkflows,
deleteExecution,
getExecutionsCount,
} from './executions.service';
import { getExecutions, getExecutionInWorkflows, getExecutionsCount } from './executions.service';
import { ActiveExecutions } from '@/ActiveExecutions';
import { authorize, validCursor } from '../../shared/middlewares/global.middleware';
import type { ExecutionRequest } from '../../../types';
import { getSharedWorkflowIds } from '../workflows/workflows.service';
import { encodeNextCursor } from '../../shared/services/pagination.service';
import { Container } from 'typedi';
import { InternalHooks } from '@/InternalHooks';
import { ExecutionRepository } from '@/databases/repositories';

export = {
deleteExecution: [
Expand All @@ -37,9 +31,7 @@ export = {
return res.status(404).json({ message: 'Not Found' });
}

await BinaryDataManager.getInstance().deleteBinaryDataByExecutionIds([execution.id!]);

await deleteExecution(execution);
await Container.get(ExecutionRepository).softDelete(execution.id);

execution.id = id;

Expand Down Expand Up @@ -111,7 +103,7 @@ export = {

const executions = await getExecutions(filters);

const newLastId = !executions.length ? '0' : (executions.slice(-1)[0].id as string);
const newLastId = !executions.length ? '0' : executions.slice(-1)[0].id;

filters.lastId = newLastId;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { DeleteResult, FindOptionsWhere } from 'typeorm';
import type { FindOptionsWhere } from 'typeorm';
import { In, Not, Raw, LessThan } from 'typeorm';
import { Container } from 'typedi';
import type { ExecutionStatus } from 'n8n-workflow';
Expand Down Expand Up @@ -109,7 +109,3 @@ export async function getExecutionInWorkflows(
unflattenData: true,
});
}

export async function deleteExecution(execution: IExecutionBase): Promise<DeleteResult> {
return Container.get(ExecutionRepository).deleteExecution(execution.id as string);
}
13 changes: 7 additions & 6 deletions packages/cli/src/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import type { RequestOptions } from 'oauth-1.0a';
import clientOAuth1 from 'oauth-1.0a';

import {
BinaryDataManager,
BinaryDataService,
Credentials,
LoadMappingOptions,
LoadNodeParameterOptions,
Expand Down Expand Up @@ -202,6 +202,8 @@ export class Server extends AbstractServer {

push: Push;

binaryDataService: BinaryDataService;

constructor() {
super('main');

Expand Down Expand Up @@ -358,12 +360,13 @@ export class Server extends AbstractServer {
this.endpointPresetCredentials = config.getEnv('credentials.overwrite.endpoint');

this.push = Container.get(Push);
this.binaryDataService = Container.get(BinaryDataService);

await super.start();
LoggerProxy.debug(`Server ID: ${this.uniqueInstanceId}`);

const cpus = os.cpus();
const binaryDataConfig = config.getEnv('binaryDataManager');
const binaryDataConfig = config.getEnv('binaryDataService');
const diagnosticInfo: IDiagnosticInfo = {
databaseType: config.getEnv('database.type'),
disableProductionWebhooksOnMainProcess: config.getEnv(
Expand Down Expand Up @@ -396,7 +399,6 @@ export class Server extends AbstractServer {
),
executions_data_prune: config.getEnv('executions.pruneData'),
executions_data_max_age: config.getEnv('executions.pruneDataMaxAge'),
executions_data_prune_timeout: config.getEnv('executions.pruneDataTimeout'),
},
deploymentType: config.getEnv('deployment.type'),
binaryDataMode: binaryDataConfig.mode,
Expand Down Expand Up @@ -1425,13 +1427,12 @@ export class Server extends AbstractServer {
async (req: BinaryDataRequest, res: express.Response): Promise<void> => {
// TODO UM: check if this needs permission check for UM
const identifier = req.params.path;
const binaryDataManager = BinaryDataManager.getInstance();
try {
const binaryPath = binaryDataManager.getBinaryPath(identifier);
const binaryPath = this.binaryDataService.getPath(identifier);
let { mode, fileName, mimeType } = req.query;
if (!fileName || !mimeType) {
try {
const metadata = await binaryDataManager.getBinaryMetadata(identifier);
const metadata = await this.binaryDataService.getMetadata(identifier);
fileName = metadata.fileName;
mimeType = metadata.mimeType;
res.setHeader('Content-Length', metadata.fileSize);
Expand Down
6 changes: 3 additions & 3 deletions packages/cli/src/WebhookHelpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import stream from 'stream';
import { promisify } from 'util';
import formidable from 'formidable';

import { BinaryDataManager, NodeExecuteFunctions } from 'n8n-core';
import { BinaryDataService, NodeExecuteFunctions } from 'n8n-core';

import type {
IBinaryData,
Expand Down Expand Up @@ -514,7 +514,7 @@ export async function executeWebhook(
const binaryData = (response.body as IDataObject)?.binaryData as IBinaryData;
if (binaryData?.id) {
res.header(response.headers);
const stream = BinaryDataManager.getInstance().getBinaryStream(binaryData.id);
const stream = Container.get(BinaryDataService).getAsStream(binaryData.id);
void pipeline(stream, res).then(() =>
responseCallback(null, { noWebhookResponse: true }),
);
Expand Down Expand Up @@ -734,7 +734,7 @@ export async function executeWebhook(
// Send the webhook response manually
res.setHeader('Content-Type', binaryData.mimeType);
if (binaryData.id) {
const stream = BinaryDataManager.getInstance().getBinaryStream(binaryData.id);
const stream = Container.get(BinaryDataService).getAsStream(binaryData.id);
await pipeline(stream, res);
} else {
res.end(Buffer.from(binaryData.data, BINARY_ENCODING));
Expand Down
98 changes: 5 additions & 93 deletions packages/cli/src/WorkflowExecuteAdditionalData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
/* eslint-disable @typescript-eslint/no-unused-vars */

/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { BinaryDataManager, UserSettings, WorkflowExecute } from 'n8n-core';
import { UserSettings, WorkflowExecute } from 'n8n-core';

import type {
IDataObject,
Expand Down Expand Up @@ -37,21 +37,16 @@ import {
} from 'n8n-workflow';

import { Container } from 'typedi';
import type { FindOptionsWhere } from 'typeorm';
import { LessThanOrEqual, In } from 'typeorm';
import { DateUtils } from 'typeorm/util/DateUtils';
import config from '@/config';
import * as Db from '@/Db';
import { ActiveExecutions } from '@/ActiveExecutions';
import { CredentialsHelper } from '@/CredentialsHelper';
import { ExternalHooks } from '@/ExternalHooks';
import type {
IExecutionDb,
IExecutionFlattedDb,
IPushDataExecutionFinished,
IWorkflowExecuteProcess,
IWorkflowExecutionDataProcess,
IWorkflowErrorData,
ExecutionPayload,
} from '@/Interfaces';
import { NodeTypes } from '@/NodeTypes';
import { Push } from '@/push';
Expand Down Expand Up @@ -184,77 +179,6 @@ export function executeErrorWorkflow(
}
}

/**
* Prunes Saved Execution which are older than configured.
* Throttled to be executed just once in configured timeframe.
* TODO: Consider moving this whole function to the repository or at least the queries
*/
let throttling = false;
async function pruneExecutionData(this: WorkflowHooks): Promise<void> {
if (!throttling) {
Logger.verbose('Pruning execution data from database');

throttling = true;
const timeout = config.getEnv('executions.pruneDataTimeout'); // in seconds
const maxAge = config.getEnv('executions.pruneDataMaxAge'); // in h
const maxCount = config.getEnv('executions.pruneDataMaxCount');
const date = new Date(); // today
date.setHours(date.getHours() - maxAge);

// date reformatting needed - see https://github.com/typeorm/typeorm/issues/2286

const utcDate = DateUtils.mixedDateToUtcDatetimeString(date);

const toPrune: Array<FindOptionsWhere<IExecutionFlattedDb>> = [
{ stoppedAt: LessThanOrEqual(utcDate) },
];

if (maxCount > 0) {
const executions = await Db.collections.Execution.find({
select: ['id'],
skip: maxCount,
take: 1,
order: { id: 'DESC' },
});

if (executions[0]) {
toPrune.push({ id: LessThanOrEqual(executions[0].id) });
}
}

try {
setTimeout(() => {
throttling = false;
}, timeout * 1000);
let executionIds: Array<IExecutionFlattedDb['id']>;
do {
executionIds = (
await Db.collections.Execution.find({
select: ['id'],
where: toPrune,
take: 100,
})
).map(({ id }) => id);
await Db.collections.Execution.delete({ id: In(executionIds) });
// Mark binary data for deletion for all executions
await BinaryDataManager.getInstance().markDataForDeletionByExecutionIds(executionIds);
} while (executionIds.length > 0);
} catch (error) {
ErrorReporter.error(error);
throttling = false;
Logger.error(
`Failed pruning execution data from database for execution ID ${this.executionId} (hookFunctionsSave)`,
{
...error,
executionId: this.executionId,
sessionId: this.sessionId,
workflowId: this.workflowData.id,
},
);
}
}
}

/**
* Returns hook functions to push data to Editor-UI
*
Expand Down Expand Up @@ -522,11 +446,6 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
workflowId: this.workflowData.id,
});

// Prune old execution data
if (config.getEnv('executions.pruneData')) {
await pruneExecutionData.call(this);
}

const isManualMode = [this.mode, parentProcessMode].includes('manual');

try {
Expand Down Expand Up @@ -554,8 +473,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
}

if (isManualMode && !saveManualExecutions && !fullRunData.waitTill) {
// Data is always saved, so we remove from database
await Container.get(ExecutionRepository).deleteExecution(this.executionId, true);
await Container.get(ExecutionRepository).softDelete(this.executionId);

return;
}
Expand Down Expand Up @@ -586,8 +504,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
this.executionId,
this.retryOf,
);
// Data is always saved, so we remove from database
await Container.get(ExecutionRepository).deleteExecution(this.executionId);
await Container.get(ExecutionRepository).softDelete(this.executionId);

return;
}
Expand Down Expand Up @@ -682,11 +599,6 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
workflowId: this.workflowData.id,
});
try {
// Prune old execution data
if (config.getEnv('executions.pruneData')) {
await pruneExecutionData.call(this);
}

if (isWorkflowIdValid(this.workflowData.id) && newStaticData) {
// Workflow is saved so update in database
try {
Expand Down Expand Up @@ -973,7 +885,7 @@ async function executeWorkflow(
// Therefore, database might not contain finished errors.
// Force an update to db as there should be no harm doing this

const fullExecutionData: IExecutionDb = {
const fullExecutionData: ExecutionPayload = {
data: fullRunData.data,
mode: fullRunData.mode,
finished: fullRunData.finished ? fullRunData.finished : false,
Expand Down
8 changes: 6 additions & 2 deletions packages/cli/src/WorkflowHelpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ import {
} from 'n8n-workflow';
import { v4 as uuid } from 'uuid';
import * as Db from '@/Db';
import type { IExecutionDb, IWorkflowErrorData, IWorkflowExecutionDataProcess } from '@/Interfaces';
import type {
ExecutionPayload,
IWorkflowErrorData,
IWorkflowExecutionDataProcess,
} from '@/Interfaces';
import { NodeTypes } from '@/NodeTypes';
// eslint-disable-next-line import/no-cycle
import { WorkflowRunner } from '@/WorkflowRunner';
Expand Down Expand Up @@ -186,7 +190,7 @@ export async function executeErrorWorkflow(
initialNode,
);

const fullExecutionData: IExecutionDb = {
const fullExecutionData: ExecutionPayload = {
data: fakeExecution.data,
mode: fakeExecution.mode,
finished: false,
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/WorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ export class WorkflowRunner {
(workflowDidSucceed && saveDataSuccessExecution === 'none') ||
(!workflowDidSucceed && saveDataErrorExecution === 'none')
) {
await Container.get(ExecutionRepository).deleteExecution(executionId);
await Container.get(ExecutionRepository).softDelete(executionId);
}
// eslint-disable-next-line id-denylist
} catch (err) {
Expand Down
Loading