diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index b17b2d8bb9b9f..70bc4b70108dd 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -26,7 +26,7 @@ import type { RequestOptions } from 'oauth-1.0a'; import clientOAuth1 from 'oauth-1.0a'; import { - BinaryDataManager, + BinaryDataService, Credentials, LoadMappingOptions, LoadNodeParameterOptions, @@ -202,6 +202,8 @@ export class Server extends AbstractServer { push: Push; + binaryDataService: BinaryDataService; + constructor() { super('main'); @@ -361,6 +363,7 @@ export class Server extends AbstractServer { this.endpointPresetCredentials = config.getEnv('credentials.overwrite.endpoint'); this.push = Container.get(Push); + this.binaryDataService = Container.get(BinaryDataService); await super.start(); LoggerProxy.debug(`Server ID: ${this.uniqueInstanceId}`); @@ -1427,13 +1430,12 @@ export class Server extends AbstractServer { async (req: BinaryDataRequest, res: express.Response): Promise => { // TODO UM: check if this needs permission check for UM const identifier = req.params.path; - const binaryDataManager = BinaryDataManager.getInstance(); try { - const binaryPath = binaryDataManager.getBinaryPath(identifier); + const binaryPath = this.binaryDataService.getPath(identifier); let { mode, fileName, mimeType } = req.query; if (!fileName || !mimeType) { try { - const metadata = await binaryDataManager.getBinaryMetadata(identifier); + const metadata = await this.binaryDataService.getMetadata(identifier); fileName = metadata.fileName; mimeType = metadata.mimeType; res.setHeader('Content-Length', metadata.fileSize); diff --git a/packages/cli/src/WebhookHelpers.ts b/packages/cli/src/WebhookHelpers.ts index 427856d5f1ee1..703e4b63b800d 100644 --- a/packages/cli/src/WebhookHelpers.ts +++ b/packages/cli/src/WebhookHelpers.ts @@ -14,7 +14,7 @@ import stream from 'stream'; import { promisify } from 'util'; import formidable from 'formidable'; -import { BinaryDataManager, NodeExecuteFunctions } from 'n8n-core'; +import { BinaryDataService, NodeExecuteFunctions } from 'n8n-core'; import type { IBinaryData, @@ -514,7 +514,7 @@ export async function executeWebhook( const binaryData = (response.body as IDataObject)?.binaryData as IBinaryData; if (binaryData?.id) { res.header(response.headers); - const stream = BinaryDataManager.getInstance().getBinaryStream(binaryData.id); + const stream = Container.get(BinaryDataService).getAsStream(binaryData.id); void pipeline(stream, res).then(() => responseCallback(null, { noWebhookResponse: true }), ); @@ -734,7 +734,7 @@ export async function executeWebhook( // Send the webhook response manually res.setHeader('Content-Type', binaryData.mimeType); if (binaryData.id) { - const stream = BinaryDataManager.getInstance().getBinaryStream(binaryData.id); + const stream = Container.get(BinaryDataService).getAsStream(binaryData.id); await pipeline(stream, res); } else { res.end(Buffer.from(binaryData.data, BINARY_ENCODING)); diff --git a/packages/cli/src/WorkflowRunnerProcess.ts b/packages/cli/src/WorkflowRunnerProcess.ts index 42a59d82c1000..4266f0facf6fe 100644 --- a/packages/cli/src/WorkflowRunnerProcess.ts +++ b/packages/cli/src/WorkflowRunnerProcess.ts @@ -11,7 +11,7 @@ setDefaultResultOrder('ipv4first'); import { Container } from 'typedi'; import type { IProcessMessage } from 'n8n-core'; -import { BinaryDataManager, UserSettings, WorkflowExecute } from 'n8n-core'; +import { BinaryDataService, UserSettings, WorkflowExecute } from 'n8n-core'; import type { ExecutionError, @@ -124,7 +124,7 @@ class WorkflowRunnerProcess { await Container.get(InternalHooks).init(instanceId); const binaryDataConfig = config.getEnv('binaryDataManager'); - await BinaryDataManager.init(binaryDataConfig); + await Container.get(BinaryDataService).init(binaryDataConfig); const license = Container.get(License); await license.init(instanceId); diff --git a/packages/cli/src/commands/BaseCommand.ts b/packages/cli/src/commands/BaseCommand.ts index 039660843a674..922634b3f68af 100644 --- a/packages/cli/src/commands/BaseCommand.ts +++ b/packages/cli/src/commands/BaseCommand.ts @@ -3,7 +3,7 @@ import { ExitError } from '@oclif/errors'; import { Container } from 'typedi'; import { LoggerProxy, ErrorReporterProxy as ErrorReporter, sleep } from 'n8n-workflow'; import type { IUserSettings } from 'n8n-core'; -import { BinaryDataManager, UserSettings } from 'n8n-core'; +import { BinaryDataService, UserSettings } from 'n8n-core'; import type { AbstractServer } from '@/AbstractServer'; import { getLogger } from '@/Logger'; import config from '@/config'; @@ -105,9 +105,9 @@ export abstract class BaseCommand extends Command { process.exit(1); } - async initBinaryManager() { + async initBinaryDataService() { const binaryDataConfig = config.getEnv('binaryDataManager'); - await BinaryDataManager.init(binaryDataConfig, true); + await Container.get(BinaryDataService).init(binaryDataConfig); } async initExternalHooks() { diff --git a/packages/cli/src/commands/execute.ts b/packages/cli/src/commands/execute.ts index c708e664decbe..f50d6c7f53d51 100644 --- a/packages/cli/src/commands/execute.ts +++ b/packages/cli/src/commands/execute.ts @@ -33,7 +33,7 @@ export class Execute extends BaseCommand { async init() { await super.init(); - await this.initBinaryManager(); + await this.initBinaryDataService(); await this.initExternalHooks(); } diff --git a/packages/cli/src/commands/executeBatch.ts b/packages/cli/src/commands/executeBatch.ts index 4822747478da2..a20348c92e5da 100644 --- a/packages/cli/src/commands/executeBatch.ts +++ b/packages/cli/src/commands/executeBatch.ts @@ -180,7 +180,7 @@ export class ExecuteBatch extends BaseCommand { async init() { await super.init(); - await this.initBinaryManager(); + await this.initBinaryDataService(); await this.initExternalHooks(); } diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 0cebdb633ec8e..235c0c8144a07 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -201,7 +201,7 @@ export class Start extends BaseCommand { this.activeWorkflowRunner = Container.get(ActiveWorkflowRunner); await this.initLicense('main'); - await this.initBinaryManager(); + await this.initBinaryDataService(); await this.initExternalHooks(); await this.initExternalSecrets(); diff --git a/packages/cli/src/commands/webhook.ts b/packages/cli/src/commands/webhook.ts index 912f8a22762ce..1a681be8b6b5a 100644 --- a/packages/cli/src/commands/webhook.ts +++ b/packages/cli/src/commands/webhook.ts @@ -78,7 +78,7 @@ export class Webhook extends BaseCommand { await super.init(); await this.initLicense('webhook'); - await this.initBinaryManager(); + await this.initBinaryDataService(); await this.initExternalHooks(); await this.initExternalSecrets(); } diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index bffaea3e4a0a7..28501a90cad26 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -257,7 +257,7 @@ export class Worker extends BaseCommand { this.logger.debug('Starting n8n worker...'); await this.initLicense('worker'); - await this.initBinaryManager(); + await this.initBinaryDataService(); await this.initExternalHooks(); await this.initExternalSecrets(); await this.initEventBus(); diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts index d035de34ae2f7..65c17fd3c7634 100644 --- a/packages/cli/src/config/schema.ts +++ b/packages/cli/src/config/schema.ts @@ -2,28 +2,21 @@ import path from 'path'; import convict from 'convict'; import { UserSettings } from 'n8n-core'; import { jsonParse } from 'n8n-workflow'; +import { ensureStringArray } from './utils'; convict.addFormat({ - name: 'nodes-list', - // @ts-ignore - validate(values: string[], { env }: { env: string }): void { - try { - if (!Array.isArray(values)) { - throw new Error(); - } + name: 'json-string-array', + coerce: (rawStr: string) => + jsonParse(rawStr, { + errorMessage: `Expected this value "${rawStr}" to be valid JSON`, + }), + validate: ensureStringArray, +}); - for (const value of values) { - if (typeof value !== 'string') { - throw new Error(); - } - } - } catch (error) { - throw new TypeError(`${env} is not a valid Array of strings.`); - } - }, - coerce(rawValue: string): string[] { - return jsonParse(rawValue, { errorMessage: 'nodes-list needs to be valid JSON' }); - }, +convict.addFormat({ + name: 'comma-separated-list', + coerce: (rawStr: string) => rawStr.split(','), + validate: ensureStringArray, }); export const schema = { @@ -788,13 +781,13 @@ export const schema = { nodes: { include: { doc: 'Nodes to load', - format: 'nodes-list', + format: 'json-string-array', default: undefined, env: 'NODES_INCLUDE', }, exclude: { doc: 'Nodes not to load', - format: 'nodes-list', + format: 'json-string-array', default: undefined, env: 'NODES_EXCLUDE', }, @@ -902,7 +895,7 @@ export const schema = { binaryDataManager: { availableModes: { - format: String, + format: 'comma-separated-list', default: 'filesystem', env: 'N8N_AVAILABLE_BINARY_DATA_MODES', doc: 'Available modes of binary data storage, as comma separated strings', diff --git a/packages/cli/src/config/types.ts b/packages/cli/src/config/types.ts index 28dee1e73fba0..b5646ff5ae8fb 100644 --- a/packages/cli/src/config/types.ts +++ b/packages/cli/src/config/types.ts @@ -1,7 +1,7 @@ /* eslint-disable @typescript-eslint/naming-convention */ /* eslint-disable @typescript-eslint/no-unused-vars */ -import type { IBinaryDataConfig } from 'n8n-core'; +import type { BinaryData } from 'n8n-core'; import type { schema } from './schema'; // ----------------------------------- @@ -76,7 +76,7 @@ type ToReturnType = T extends NumericPath type ExceptionPaths = { 'queue.bull.redis': object; - binaryDataManager: IBinaryDataConfig; + binaryDataManager: BinaryData.Config; 'nodes.exclude': string[] | undefined; 'nodes.include': string[] | undefined; 'userManagement.isInstanceOwnerSetUp': boolean; diff --git a/packages/cli/src/config/utils.ts b/packages/cli/src/config/utils.ts new file mode 100644 index 0000000000000..4fee1a41108fd --- /dev/null +++ b/packages/cli/src/config/utils.ts @@ -0,0 +1,17 @@ +import type { SchemaObj } from 'convict'; + +class NotStringArrayError extends Error { + constructor(env: string) { + super(`${env} is not a string array.`); + } +} + +export const ensureStringArray = (values: string[], { env }: SchemaObj) => { + if (!env) throw new Error(`Missing env: ${env}`); + + if (!Array.isArray(values)) throw new NotStringArrayError(env); + + for (const value of values) { + if (typeof value !== 'string') throw new NotStringArrayError(env); + } +}; diff --git a/packages/cli/src/databases/repositories/execution.repository.ts b/packages/cli/src/databases/repositories/execution.repository.ts index 651e6d7bfc6d7..051d206b54dd8 100644 --- a/packages/cli/src/databases/repositories/execution.repository.ts +++ b/packages/cli/src/databases/repositories/execution.repository.ts @@ -10,7 +10,7 @@ import type { import { parse, stringify } from 'flatted'; import { LoggerProxy as Logger } from 'n8n-workflow'; import type { IExecutionsSummary, IRunExecutionData } from 'n8n-workflow'; -import { BinaryDataManager } from 'n8n-core'; +import { BinaryDataService } from 'n8n-core'; import type { ExecutionPayload, IExecutionBase, @@ -89,6 +89,7 @@ export class ExecutionRepository extends Repository { constructor( dataSource: DataSource, private readonly executionDataRepository: ExecutionDataRepository, + private readonly binaryDataService: BinaryDataService, ) { super(ExecutionEntity, dataSource.manager); @@ -520,8 +521,7 @@ export class ExecutionRepository extends Repository { }) ).map(({ id }) => id); - const binaryDataManager = BinaryDataManager.getInstance(); - await binaryDataManager.deleteBinaryDataByExecutionIds(executionIds); + await this.binaryDataService.deleteManyByExecutionIds(executionIds); this.logger.debug(`Hard-deleting ${executionIds.length} executions from database`, { executionIds, diff --git a/packages/cli/test/integration/commands/worker.cmd.test.ts b/packages/cli/test/integration/commands/worker.cmd.test.ts index d860579ee392b..2d9191c124db7 100644 --- a/packages/cli/test/integration/commands/worker.cmd.test.ts +++ b/packages/cli/test/integration/commands/worker.cmd.test.ts @@ -5,7 +5,7 @@ import { LoggerProxy } from 'n8n-workflow'; import { Telemetry } from '@/telemetry'; import { getLogger } from '@/Logger'; import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee'; -import { BinaryDataManager } from 'n8n-core'; +import { BinaryDataService } from 'n8n-core'; import { CacheService } from '@/services/cache.service'; import { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher'; import { RedisServicePubSubSubscriber } from '@/services/redis/RedisServicePubSubSubscriber'; @@ -26,7 +26,7 @@ beforeAll(async () => { mockInstance(InternalHooks); mockInstance(CacheService); mockInstance(ExternalSecretsManager); - mockInstance(BinaryDataManager); + mockInstance(BinaryDataService); mockInstance(MessageEventBus); mockInstance(LoadNodesAndCredentials); mockInstance(CredentialTypes); @@ -41,7 +41,7 @@ test('worker initializes all its components', async () => { jest.spyOn(worker, 'init'); jest.spyOn(worker, 'initLicense').mockImplementation(async () => {}); - jest.spyOn(worker, 'initBinaryManager').mockImplementation(async () => {}); + jest.spyOn(worker, 'initBinaryDataService').mockImplementation(async () => {}); jest.spyOn(worker, 'initExternalHooks').mockImplementation(async () => {}); jest.spyOn(worker, 'initExternalSecrets').mockImplementation(async () => {}); jest.spyOn(worker, 'initEventBus').mockImplementation(async () => {}); @@ -64,7 +64,7 @@ test('worker initializes all its components', async () => { expect(worker.uniqueInstanceId).toContain('worker'); expect(worker.uniqueInstanceId.length).toBeGreaterThan(15); expect(worker.initLicense).toHaveBeenCalled(); - expect(worker.initBinaryManager).toHaveBeenCalled(); + expect(worker.initBinaryDataService).toHaveBeenCalled(); expect(worker.initExternalHooks).toHaveBeenCalled(); expect(worker.initExternalSecrets).toHaveBeenCalled(); expect(worker.initEventBus).toHaveBeenCalled(); diff --git a/packages/cli/test/integration/publicApi/executions.test.ts b/packages/cli/test/integration/publicApi/executions.test.ts index 9b208548a5b05..5e1712a221912 100644 --- a/packages/cli/test/integration/publicApi/executions.test.ts +++ b/packages/cli/test/integration/publicApi/executions.test.ts @@ -27,8 +27,8 @@ beforeAll(async () => { user1 = await testDb.createUser({ globalRole: globalUserRole, apiKey: randomApiKey() }); user2 = await testDb.createUser({ globalRole: globalUserRole, apiKey: randomApiKey() }); - // TODO: mock BinaryDataManager instead - await utils.initBinaryManager(); + // TODO: mock BinaryDataService instead + await utils.initBinaryDataService(); await utils.initNodeTypes(); workflowRunner = await utils.initActiveWorkflowRunner(); diff --git a/packages/cli/test/integration/shared/utils/index.ts b/packages/cli/test/integration/shared/utils/index.ts index 660b4ada5cbc1..ee4b1e57509f3 100644 --- a/packages/cli/test/integration/shared/utils/index.ts +++ b/packages/cli/test/integration/shared/utils/index.ts @@ -1,7 +1,7 @@ import { Container } from 'typedi'; import { randomBytes } from 'crypto'; import { existsSync } from 'fs'; -import { BinaryDataManager, UserSettings } from 'n8n-core'; +import { BinaryDataService, UserSettings } from 'n8n-core'; import type { INode } from 'n8n-workflow'; import { GithubApi } from 'n8n-nodes-base/credentials/GithubApi.credentials'; import { Ftp } from 'n8n-nodes-base/credentials/Ftp.credentials'; @@ -72,11 +72,14 @@ export async function initNodeTypes() { } /** - * Initialize a BinaryManager for test runs. + * Initialize a BinaryDataService for test runs. */ -export async function initBinaryManager() { - const binaryDataConfig = config.getEnv('binaryDataManager'); - await BinaryDataManager.init(binaryDataConfig); +export async function initBinaryDataService() { + const binaryDataService = new BinaryDataService(); + + await binaryDataService.init(config.getEnv('binaryDataManager')); + + Container.set(BinaryDataService, binaryDataService); } /** diff --git a/packages/core/package.json b/packages/core/package.json index 93174a329a924..bd6d57c0fca25 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -47,8 +47,8 @@ "n8n-nodes-base": "workspace:*" }, "dependencies": { - "axios": "^0.21.1", "@n8n/client-oauth2": "workspace:*", + "axios": "^0.21.1", "concat-stream": "^2.0.0", "cron": "~1.7.2", "crypto-js": "~4.1.1", @@ -63,6 +63,7 @@ "p-cancelable": "^2.0.0", "pretty-bytes": "^5.6.0", "qs": "^6.10.1", + "typedi": "^0.10.0", "uuid": "^8.3.2" } } diff --git a/packages/core/src/BinaryData/BinaryData.service.ts b/packages/core/src/BinaryData/BinaryData.service.ts new file mode 100644 index 0000000000000..cd5b65816c9cc --- /dev/null +++ b/packages/core/src/BinaryData/BinaryData.service.ts @@ -0,0 +1,227 @@ +import { readFile, stat } from 'fs/promises'; +import concatStream from 'concat-stream'; +import prettyBytes from 'pretty-bytes'; +import { Service } from 'typedi'; +import { BINARY_ENCODING, LoggerProxy as Logger, IBinaryData } from 'n8n-workflow'; + +import { FileSystemManager } from './FileSystem.manager'; +import { InvalidBinaryDataManagerError, InvalidBinaryDataModeError, areValidModes } from './utils'; + +import type { Readable } from 'stream'; +import type { BinaryData } from './types'; +import type { INodeExecutionData } from 'n8n-workflow'; +import { LogCatch } from '../decorators/LogCatch.decorator'; + +@Service() +export class BinaryDataService { + private availableModes: BinaryData.Mode[] = []; + + private mode: BinaryData.Mode = 'default'; + + private managers: Record = {}; + + async init(config: BinaryData.Config) { + if (!areValidModes(config.availableModes)) throw new InvalidBinaryDataModeError(); + + this.availableModes = config.availableModes; + this.mode = config.mode; + + if (this.availableModes.includes('filesystem')) { + this.managers.filesystem = new FileSystemManager(config.localStoragePath); + + await this.managers.filesystem.init(); + } + } + + @LogCatch((error) => Logger.error('Failed to copy binary data file', { error })) + async copyBinaryFile(binaryData: IBinaryData, path: string, executionId: string) { + const manager = this.managers[this.mode]; + + if (!manager) { + const { size } = await stat(path); + binaryData.fileSize = prettyBytes(size); + binaryData.data = await readFile(path, { encoding: BINARY_ENCODING }); + + return binaryData; + } + + const identifier = await manager.copyByPath(path, executionId); + binaryData.id = this.createIdentifier(identifier); + binaryData.data = this.mode; // clear binary data from memory + + const fileSize = await manager.getSize(identifier); + binaryData.fileSize = prettyBytes(fileSize); + + await manager.storeMetadata(identifier, { + fileName: binaryData.fileName, + mimeType: binaryData.mimeType, + fileSize, + }); + + return binaryData; + } + + @LogCatch((error) => Logger.error('Failed to write binary data file', { error })) + async store(binaryData: IBinaryData, input: Buffer | Readable, executionId: string) { + const manager = this.managers[this.mode]; + + if (!manager) { + const buffer = await this.binaryToBuffer(input); + binaryData.data = buffer.toString(BINARY_ENCODING); + binaryData.fileSize = prettyBytes(buffer.length); + + return binaryData; + } + + const identifier = await manager.store(input, executionId); + binaryData.id = this.createIdentifier(identifier); + binaryData.data = this.mode; // clear binary data from memory + + const fileSize = await manager.getSize(identifier); + binaryData.fileSize = prettyBytes(fileSize); + + await manager.storeMetadata(identifier, { + fileName: binaryData.fileName, + mimeType: binaryData.mimeType, + fileSize, + }); + + return binaryData; + } + + async binaryToBuffer(body: Buffer | Readable) { + return new Promise((resolve) => { + if (Buffer.isBuffer(body)) resolve(body); + else body.pipe(concatStream(resolve)); + }); + } + + getAsStream(identifier: string, chunkSize?: number) { + const { mode, id } = this.splitBinaryModeFileId(identifier); + + return this.getManager(mode).getStream(id, chunkSize); + } + + async getBinaryDataBuffer(binaryData: IBinaryData) { + if (binaryData.id) return this.retrieveBinaryDataByIdentifier(binaryData.id); + + return Buffer.from(binaryData.data, BINARY_ENCODING); + } + + async retrieveBinaryDataByIdentifier(identifier: string) { + const { mode, id } = this.splitBinaryModeFileId(identifier); + + return this.getManager(mode).getBuffer(id); + } + + getPath(identifier: string) { + const { mode, id } = this.splitBinaryModeFileId(identifier); + + return this.getManager(mode).getPath(id); + } + + async getMetadata(identifier: string) { + const { mode, id } = this.splitBinaryModeFileId(identifier); + + return this.getManager(mode).getMetadata(id); + } + + async deleteManyByExecutionIds(executionIds: string[]) { + const manager = this.getManager(this.mode); + + if (!manager) return; + + await manager.deleteManyByExecutionIds(executionIds); + } + + @LogCatch((error) => + Logger.error('Failed to copy all binary data files for execution', { error }), + ) + async duplicateBinaryData(inputData: Array, executionId: string) { + if (inputData && this.managers[this.mode]) { + const returnInputData = (inputData as INodeExecutionData[][]).map( + async (executionDataArray) => { + if (executionDataArray) { + return Promise.all( + executionDataArray.map(async (executionData) => { + if (executionData.binary) { + return this.duplicateBinaryDataInExecData(executionData, executionId); + } + + return executionData; + }), + ); + } + + return executionDataArray; + }, + ); + + return Promise.all(returnInputData); + } + + return inputData as INodeExecutionData[][]; + } + + // ---------------------------------- + // private methods + // ---------------------------------- + + private createIdentifier(filename: string) { + return `${this.mode}:${filename}`; + } + + private splitBinaryModeFileId(fileId: string) { + const [mode, id] = fileId.split(':'); + + return { mode, id }; + } + + private async duplicateBinaryDataInExecData( + executionData: INodeExecutionData, + executionId: string, + ) { + const manager = this.managers[this.mode]; + + if (executionData.binary) { + const binaryDataKeys = Object.keys(executionData.binary); + const bdPromises = binaryDataKeys.map(async (key: string) => { + if (!executionData.binary) { + return { key, newId: undefined }; + } + + const binaryDataId = executionData.binary[key].id; + if (!binaryDataId) { + return { key, newId: undefined }; + } + + return manager + ?.copyByIdentifier(this.splitBinaryModeFileId(binaryDataId).id, executionId) + .then((filename) => ({ + newId: this.createIdentifier(filename), + key, + })); + }); + + return Promise.all(bdPromises).then((b) => { + return b.reduce((acc, curr) => { + if (acc.binary && curr) { + acc.binary[curr.key].id = curr.newId; + } + + return acc; + }, executionData); + }); + } + + return executionData; + } + + private getManager(mode: string) { + const manager = this.managers[mode]; + + if (manager) return manager; + + throw new InvalidBinaryDataManagerError(mode); + } +} diff --git a/packages/core/src/BinaryData/FileSystem.manager.ts b/packages/core/src/BinaryData/FileSystem.manager.ts new file mode 100644 index 0000000000000..84a86716ddbad --- /dev/null +++ b/packages/core/src/BinaryData/FileSystem.manager.ts @@ -0,0 +1,142 @@ +import { createReadStream } from 'fs'; +import fs from 'fs/promises'; +import path from 'path'; +import { v4 as uuid } from 'uuid'; +import { jsonParse } from 'n8n-workflow'; + +import { FileNotFoundError } from '../errors'; + +import type { Readable } from 'stream'; +import type { BinaryMetadata } from 'n8n-workflow'; +import type { BinaryData } from './types'; + +const EXECUTION_ID_EXTRACTOR = + /^(\w+)(?:[0-9a-fA-F]{8}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{12})$/; + +export class FileSystemManager implements BinaryData.Manager { + constructor(private storagePath: string) {} + + async init() { + await this.ensureDirExists(this.storagePath); + } + + getPath(identifier: string) { + return this.resolvePath(identifier); + } + + async getSize(identifier: string) { + const filePath = this.getPath(identifier); + + try { + const stats = await fs.stat(filePath); + return stats.size; + } catch (error) { + throw new Error('Failed to find binary data file in filesystem', { cause: error }); + } + } + + getStream(identifier: string, chunkSize?: number) { + const filePath = this.getPath(identifier); + + return createReadStream(filePath, { highWaterMark: chunkSize }); + } + + async getBuffer(identifier: string) { + const filePath = this.getPath(identifier); + + try { + return await fs.readFile(filePath); + } catch { + throw new Error(`Error finding file: ${filePath}`); + } + } + + async storeMetadata(identifier: string, metadata: BinaryMetadata) { + const filePath = this.resolvePath(`${identifier}.metadata`); + + await fs.writeFile(filePath, JSON.stringify(metadata), { encoding: 'utf-8' }); + } + + async getMetadata(identifier: string): Promise { + const filePath = this.resolvePath(`${identifier}.metadata`); + + return jsonParse(await fs.readFile(filePath, { encoding: 'utf-8' })); + } + + async store(binaryData: Buffer | Readable, executionId: string) { + const identifier = this.createIdentifier(executionId); + const filePath = this.getPath(identifier); + + await fs.writeFile(filePath, binaryData); + + return identifier; + } + + async deleteOne(identifier: string) { + const filePath = this.getPath(identifier); + + return fs.rm(filePath); + } + + async deleteManyByExecutionIds(executionIds: string[]) { + const set = new Set(executionIds); + const fileNames = await fs.readdir(this.storagePath); + const deletedIds = []; + + for (const fileName of fileNames) { + const executionId = fileName.match(EXECUTION_ID_EXTRACTOR)?.[1]; + + if (executionId && set.has(executionId)) { + const filePath = this.resolvePath(fileName); + + await Promise.all([fs.rm(filePath), fs.rm(`${filePath}.metadata`)]); + + deletedIds.push(executionId); + } + } + + return deletedIds; + } + + async copyByPath(filePath: string, executionId: string) { + const identifier = this.createIdentifier(executionId); + + await fs.cp(filePath, this.getPath(identifier)); + + return identifier; + } + + async copyByIdentifier(identifier: string, executionId: string) { + const newIdentifier = this.createIdentifier(executionId); + + await fs.copyFile(this.resolvePath(identifier), this.resolvePath(newIdentifier)); + + return newIdentifier; + } + + // ---------------------------------- + // private methods + // ---------------------------------- + + private async ensureDirExists(dir: string) { + try { + await fs.access(dir); + } catch { + await fs.mkdir(dir, { recursive: true }); + } + } + + private createIdentifier(executionId: string) { + return [executionId, uuid()].join(''); + } + + private resolvePath(...args: string[]) { + const returnPath = path.join(this.storagePath, ...args); + + if (path.relative(this.storagePath, returnPath).startsWith('..')) { + throw new FileNotFoundError('Invalid path detected'); + } + + return returnPath; + } +} diff --git a/packages/core/src/BinaryData/types.ts b/packages/core/src/BinaryData/types.ts new file mode 100644 index 0000000000000..e6bc3f6cedc43 --- /dev/null +++ b/packages/core/src/BinaryData/types.ts @@ -0,0 +1,44 @@ +import type { Readable } from 'stream'; +import type { BinaryMetadata } from 'n8n-workflow'; +import type { BINARY_DATA_MODES } from './utils'; + +export namespace BinaryData { + export type Mode = (typeof BINARY_DATA_MODES)[number]; + + export type Config = { + mode: 'default' | 'filesystem'; + availableModes: string[]; + localStoragePath: string; + }; + + export interface Manager { + init(): Promise; + + store(binaryData: Buffer | Readable, executionId: string): Promise; + getPath(identifier: string): string; + + // @TODO: Refactor to use identifier + getSize(path: string): Promise; + + getBuffer(identifier: string): Promise; + getStream(identifier: string, chunkSize?: number): Readable; + + // @TODO: Refactor out - not needed for object storage + storeMetadata(identifier: string, metadata: BinaryMetadata): Promise; + + // @TODO: Refactor out - not needed for object storage + getMetadata(identifier: string): Promise; + + // @TODO: Refactor to also use `workflowId` to support full path-like identifier: + // `workflows/{workflowId}/executions/{executionId}/binary_data/{fileId}` + copyByPath(path: string, executionId: string): Promise; + + copyByIdentifier(identifier: string, prefix: string): Promise; + + deleteOne(identifier: string): Promise; + + // @TODO: Refactor to also receive `workflowId` to support full path-like identifier: + // `workflows/{workflowId}/executions/{executionId}/binary_data/{fileId}` + deleteManyByExecutionIds(executionIds: string[]): Promise; + } +} diff --git a/packages/core/src/BinaryData/utils.ts b/packages/core/src/BinaryData/utils.ts new file mode 100644 index 0000000000000..c2bea73850f85 --- /dev/null +++ b/packages/core/src/BinaryData/utils.ts @@ -0,0 +1,25 @@ +import type { BinaryData } from './types'; + +/** + * Modes for storing binary data: + * - `default` (in memory) + * - `filesystem` (on disk) + * - `s3` (S3-compatible storage) + */ +export const BINARY_DATA_MODES = ['default', 'filesystem', 's3'] as const; + +export function areValidModes(modes: string[]): modes is BinaryData.Mode[] { + return modes.every((m) => BINARY_DATA_MODES.includes(m as BinaryData.Mode)); +} + +export class InvalidBinaryDataModeError extends Error { + constructor() { + super(`Invalid binary data mode. Valid modes: ${BINARY_DATA_MODES.join(', ')}`); + } +} + +export class InvalidBinaryDataManagerError extends Error { + constructor(mode: string) { + super('No binary data manager found for mode: ' + mode); + } +} diff --git a/packages/core/src/BinaryDataManager/FileSystem.ts b/packages/core/src/BinaryDataManager/FileSystem.ts deleted file mode 100644 index bcc40a38ec5ec..0000000000000 --- a/packages/core/src/BinaryDataManager/FileSystem.ts +++ /dev/null @@ -1,138 +0,0 @@ -import { createReadStream } from 'fs'; -import fs from 'fs/promises'; -import path from 'path'; -import { v4 as uuid } from 'uuid'; -import type { Readable } from 'stream'; -import type { BinaryMetadata } from 'n8n-workflow'; -import { jsonParse } from 'n8n-workflow'; - -import { IBinaryDataConfig } from '../Interfaces'; -import type { IBinaryDataManager } from '../Interfaces'; -import { FileNotFoundError } from '../errors'; - -const executionExtractionRegexp = - /^(\w+)(?:[0-9a-fA-F]{8}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{12})$/; - -export class BinaryDataFileSystem implements IBinaryDataManager { - private storagePath: string; - - constructor(config: IBinaryDataConfig) { - this.storagePath = config.localStoragePath; - } - - async init() { - await this.assertFolder(this.storagePath); - } - - async getFileSize(identifier: string): Promise { - const stats = await fs.stat(this.getBinaryPath(identifier)); - return stats.size; - } - - async copyBinaryFile(filePath: string, executionId: string): Promise { - const binaryDataId = this.generateFileName(executionId); - await this.copyFileToLocalStorage(filePath, binaryDataId); - return binaryDataId; - } - - async storeBinaryMetadata(identifier: string, metadata: BinaryMetadata) { - await fs.writeFile(this.getMetadataPath(identifier), JSON.stringify(metadata), { - encoding: 'utf-8', - }); - } - - async getBinaryMetadata(identifier: string): Promise { - return jsonParse(await fs.readFile(this.getMetadataPath(identifier), { encoding: 'utf-8' })); - } - - async storeBinaryData(binaryData: Buffer | Readable, executionId: string): Promise { - const binaryDataId = this.generateFileName(executionId); - await this.saveToLocalStorage(binaryData, binaryDataId); - return binaryDataId; - } - - getBinaryStream(identifier: string, chunkSize?: number): Readable { - return createReadStream(this.getBinaryPath(identifier), { highWaterMark: chunkSize }); - } - - async retrieveBinaryDataByIdentifier(identifier: string): Promise { - return this.retrieveFromLocalStorage(identifier); - } - - getBinaryPath(identifier: string): string { - return this.resolveStoragePath(identifier); - } - - getMetadataPath(identifier: string): string { - return this.resolveStoragePath(`${identifier}.metadata`); - } - - async duplicateBinaryDataByIdentifier(binaryDataId: string, prefix: string): Promise { - const newBinaryDataId = this.generateFileName(prefix); - - await fs.copyFile( - this.resolveStoragePath(binaryDataId), - this.resolveStoragePath(newBinaryDataId), - ); - return newBinaryDataId; - } - - async deleteBinaryDataByExecutionIds(executionIds: string[]): Promise { - const set = new Set(executionIds); - const fileNames = await fs.readdir(this.storagePath); - const deletedIds = []; - for (const fileName of fileNames) { - const executionId = fileName.match(executionExtractionRegexp)?.[1]; - if (executionId && set.has(executionId)) { - const filePath = this.resolveStoragePath(fileName); - await Promise.all([fs.rm(filePath), fs.rm(`${filePath}.metadata`)]); - deletedIds.push(executionId); - } - } - return deletedIds; - } - - async deleteBinaryDataByIdentifier(identifier: string): Promise { - return this.deleteFromLocalStorage(identifier); - } - - private async assertFolder(folder: string): Promise { - try { - await fs.access(folder); - } catch { - await fs.mkdir(folder, { recursive: true }); - } - } - - private generateFileName(prefix: string): string { - return [prefix, uuid()].join(''); - } - - private async deleteFromLocalStorage(identifier: string) { - return fs.rm(this.getBinaryPath(identifier)); - } - - private async copyFileToLocalStorage(source: string, identifier: string): Promise { - await fs.cp(source, this.getBinaryPath(identifier)); - } - - private async saveToLocalStorage(binaryData: Buffer | Readable, identifier: string) { - await fs.writeFile(this.getBinaryPath(identifier), binaryData); - } - - private async retrieveFromLocalStorage(identifier: string): Promise { - const filePath = this.getBinaryPath(identifier); - try { - return await fs.readFile(filePath); - } catch (e) { - throw new Error(`Error finding file: ${filePath}`); - } - } - - private resolveStoragePath(...args: string[]) { - const returnPath = path.join(this.storagePath, ...args); - if (path.relative(this.storagePath, returnPath).startsWith('..')) - throw new FileNotFoundError('Invalid path detected'); - return returnPath; - } -} diff --git a/packages/core/src/BinaryDataManager/index.ts b/packages/core/src/BinaryDataManager/index.ts deleted file mode 100644 index 677a5a73f6799..0000000000000 --- a/packages/core/src/BinaryDataManager/index.ts +++ /dev/null @@ -1,252 +0,0 @@ -import { readFile, stat } from 'fs/promises'; -import type { BinaryMetadata, INodeExecutionData } from 'n8n-workflow'; -import prettyBytes from 'pretty-bytes'; -import type { Readable } from 'stream'; -import { BINARY_ENCODING, LoggerProxy as Logger, IBinaryData } from 'n8n-workflow'; -import { IBinaryDataConfig } from '../Interfaces'; -import type { IBinaryDataManager } from '../Interfaces'; -import { BinaryDataFileSystem } from './FileSystem'; -import { binaryToBuffer } from './utils'; -import { LogCatch } from '../decorators/LogCatch.decorator'; - -export class BinaryDataManager { - static instance: BinaryDataManager | undefined; - - private managers: { - [key: string]: IBinaryDataManager; - }; - - private binaryDataMode: string; - - private availableModes: string[]; - - constructor(config: IBinaryDataConfig) { - this.binaryDataMode = config.mode; - this.availableModes = config.availableModes.split(','); - this.managers = {}; - } - - static async init(config: IBinaryDataConfig, mainManager = false): Promise { - if (BinaryDataManager.instance) { - throw new Error('Binary Data Manager already initialized'); - } - - BinaryDataManager.instance = new BinaryDataManager(config); - - if (BinaryDataManager.instance.availableModes.includes('filesystem')) { - BinaryDataManager.instance.managers.filesystem = new BinaryDataFileSystem(config); - await BinaryDataManager.instance.managers.filesystem.init(mainManager); - } - - return undefined; - } - - static getInstance(): BinaryDataManager { - if (!BinaryDataManager.instance) { - throw new Error('Binary Data Manager not initialized'); - } - - return BinaryDataManager.instance; - } - - @LogCatch((error) => Logger.error('Failed to copy binary data file', { error })) - async copyBinaryFile( - binaryData: IBinaryData, - filePath: string, - executionId: string, - ): Promise { - // If a manager handles this binary, copy over the binary file and return its reference id. - const manager = this.managers[this.binaryDataMode]; - if (manager) { - const identifier = await manager.copyBinaryFile(filePath, executionId); - // Add data manager reference id. - binaryData.id = this.generateBinaryId(identifier); - - // Prevent preserving data in memory if handled by a data manager. - binaryData.data = this.binaryDataMode; - - const fileSize = await manager.getFileSize(identifier); - binaryData.fileSize = prettyBytes(fileSize); - - await manager.storeBinaryMetadata(identifier, { - fileName: binaryData.fileName, - mimeType: binaryData.mimeType, - fileSize, - }); - } else { - const { size } = await stat(filePath); - binaryData.fileSize = prettyBytes(size); - binaryData.data = await readFile(filePath, { encoding: BINARY_ENCODING }); - } - - return binaryData; - } - - @LogCatch((error) => Logger.error('Failed to write binary data file', { error })) - async storeBinaryData( - binaryData: IBinaryData, - input: Buffer | Readable, - executionId: string, - ): Promise { - // If a manager handles this binary, return the binary data with its reference id. - const manager = this.managers[this.binaryDataMode]; - if (manager) { - const identifier = await manager.storeBinaryData(input, executionId); - - // Add data manager reference id. - binaryData.id = this.generateBinaryId(identifier); - - // Prevent preserving data in memory if handled by a data manager. - binaryData.data = this.binaryDataMode; - - const fileSize = await manager.getFileSize(identifier); - binaryData.fileSize = prettyBytes(fileSize); - - await manager.storeBinaryMetadata(identifier, { - fileName: binaryData.fileName, - mimeType: binaryData.mimeType, - fileSize, - }); - } else { - const buffer = await binaryToBuffer(input); - binaryData.data = buffer.toString(BINARY_ENCODING); - binaryData.fileSize = prettyBytes(buffer.length); - } - - return binaryData; - } - - getBinaryStream(identifier: string, chunkSize?: number): Readable { - const { mode, id } = this.splitBinaryModeFileId(identifier); - if (this.managers[mode]) { - return this.managers[mode].getBinaryStream(id, chunkSize); - } - - throw new Error('Storage mode used to store binary data not available'); - } - - async getBinaryDataBuffer(binaryData: IBinaryData): Promise { - if (binaryData.id) { - return this.retrieveBinaryDataByIdentifier(binaryData.id); - } - - return Buffer.from(binaryData.data, BINARY_ENCODING); - } - - async retrieveBinaryDataByIdentifier(identifier: string): Promise { - const { mode, id } = this.splitBinaryModeFileId(identifier); - if (this.managers[mode]) { - return this.managers[mode].retrieveBinaryDataByIdentifier(id); - } - - throw new Error('Storage mode used to store binary data not available'); - } - - getBinaryPath(identifier: string): string { - const { mode, id } = this.splitBinaryModeFileId(identifier); - if (this.managers[mode]) { - return this.managers[mode].getBinaryPath(id); - } - - throw new Error('Storage mode used to store binary data not available'); - } - - async getBinaryMetadata(identifier: string): Promise { - const { mode, id } = this.splitBinaryModeFileId(identifier); - if (this.managers[mode]) { - return this.managers[mode].getBinaryMetadata(id); - } - - throw new Error('Storage mode used to store binary data not available'); - } - - async deleteBinaryDataByExecutionIds(executionIds: string[]): Promise { - if (this.managers[this.binaryDataMode]) { - await this.managers[this.binaryDataMode].deleteBinaryDataByExecutionIds(executionIds); - } - } - - @LogCatch((error) => - Logger.error('Failed to copy all binary data files for execution', { error }), - ) - async duplicateBinaryData( - inputData: Array, - executionId: string, - ): Promise { - if (inputData && this.managers[this.binaryDataMode]) { - const returnInputData = (inputData as INodeExecutionData[][]).map( - async (executionDataArray) => { - if (executionDataArray) { - return Promise.all( - executionDataArray.map(async (executionData) => { - if (executionData.binary) { - return this.duplicateBinaryDataInExecData(executionData, executionId); - } - - return executionData; - }), - ); - } - - return executionDataArray; - }, - ); - - return Promise.all(returnInputData); - } - - return inputData as INodeExecutionData[][]; - } - - private generateBinaryId(filename: string) { - return `${this.binaryDataMode}:${filename}`; - } - - private splitBinaryModeFileId(fileId: string): { mode: string; id: string } { - const [mode, id] = fileId.split(':'); - return { mode, id }; - } - - private async duplicateBinaryDataInExecData( - executionData: INodeExecutionData, - executionId: string, - ): Promise { - const binaryManager = this.managers[this.binaryDataMode]; - - if (executionData.binary) { - const binaryDataKeys = Object.keys(executionData.binary); - const bdPromises = binaryDataKeys.map(async (key: string) => { - if (!executionData.binary) { - return { key, newId: undefined }; - } - - const binaryDataId = executionData.binary[key].id; - if (!binaryDataId) { - return { key, newId: undefined }; - } - - return binaryManager - ?.duplicateBinaryDataByIdentifier( - this.splitBinaryModeFileId(binaryDataId).id, - executionId, - ) - .then((filename) => ({ - newId: this.generateBinaryId(filename), - key, - })); - }); - - return Promise.all(bdPromises).then((b) => { - return b.reduce((acc, curr) => { - if (acc.binary && curr) { - acc.binary[curr.key].id = curr.newId; - } - - return acc; - }, executionData); - }); - } - - return executionData; - } -} diff --git a/packages/core/src/BinaryDataManager/utils.ts b/packages/core/src/BinaryDataManager/utils.ts deleted file mode 100644 index 85fb05f6870bd..0000000000000 --- a/packages/core/src/BinaryDataManager/utils.ts +++ /dev/null @@ -1,8 +0,0 @@ -import concatStream from 'concat-stream'; -import type { Readable } from 'stream'; - -export const binaryToBuffer = async (body: Buffer | Readable) => - new Promise((resolve) => { - if (Buffer.isBuffer(body)) resolve(body); - else body.pipe(concatStream(resolve)); - }); diff --git a/packages/core/src/Interfaces.ts b/packages/core/src/Interfaces.ts index f1f3f4ff7a5bd..aa723045fdcd1 100644 --- a/packages/core/src/Interfaces.ts +++ b/packages/core/src/Interfaces.ts @@ -1,9 +1,7 @@ -import type { Readable } from 'stream'; import type { IPollResponse, ITriggerResponse, IWorkflowSettings as IWorkflowSettingsWorkflow, - BinaryMetadata, ValidationResult, } from 'n8n-workflow'; @@ -34,27 +32,6 @@ export interface IWorkflowData { triggerResponses?: ITriggerResponse[]; } -export interface IBinaryDataConfig { - mode: 'default' | 'filesystem'; - availableModes: string; - localStoragePath: string; -} - -export interface IBinaryDataManager { - init(startPurger: boolean): Promise; - getFileSize(filePath: string): Promise; - copyBinaryFile(filePath: string, executionId: string): Promise; - storeBinaryMetadata(identifier: string, metadata: BinaryMetadata): Promise; - getBinaryMetadata(identifier: string): Promise; - storeBinaryData(binaryData: Buffer | Readable, executionId: string): Promise; - retrieveBinaryDataByIdentifier(identifier: string): Promise; - getBinaryPath(identifier: string): string; - getBinaryStream(identifier: string, chunkSize?: number): Readable; - deleteBinaryDataByIdentifier(identifier: string): Promise; - duplicateBinaryDataByIdentifier(binaryDataId: string, prefix: string): Promise; - deleteBinaryDataByExecutionIds(executionIds: string[]): Promise; -} - export namespace n8n { export interface PackageJson { name: string; diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index 56360c6022b49..388ae31493968 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -117,8 +117,7 @@ import type { RequestPromiseOptions } from 'request-promise-native'; import { Readable } from 'stream'; import url, { URL, URLSearchParams } from 'url'; -import { BinaryDataManager } from './BinaryDataManager'; -import { binaryToBuffer } from './BinaryDataManager/utils'; +import { BinaryDataService } from './BinaryData/BinaryData.service'; import { BINARY_DATA_STORAGE_PATH, BLOCK_FILE_ACCESS_TO_N8N_FILES, @@ -132,14 +131,15 @@ import { import { extractValue } from './ExtractValue'; import type { ExtendedValidationResult, IResponseError, IWorkflowSettings } from './Interfaces'; import { getClientCredentialsToken } from './OAuth2Helper'; -import { getSecretsProxy } from './Secrets'; -import { getUserN8nFolderPath } from './UserSettings'; import { getAllWorkflowExecutionMetadata, getWorkflowExecutionMetadata, setAllWorkflowExecutionMetadata, setWorkflowExecutionMetadata, } from './WorkflowExecutionMetadata'; +import { getSecretsProxy } from './Secrets'; +import { getUserN8nFolderPath } from './UserSettings'; +import Container from 'typedi'; axios.defaults.timeout = 300000; // Prevent axios from adding x-form-www-urlencoded headers by default @@ -774,9 +774,9 @@ export async function proxyRequestToAxios( let responseData = response.data; if (Buffer.isBuffer(responseData) || responseData instanceof Readable) { - responseData = await binaryToBuffer(responseData).then((buffer) => - buffer.toString('utf-8'), - ); + responseData = await Container.get(BinaryDataService) + .binaryToBuffer(responseData) + .then((buffer) => buffer.toString('utf-8')); } if (configObject.simple === false) { @@ -941,21 +941,21 @@ async function httpRequest( } export function getBinaryPath(binaryDataId: string): string { - return BinaryDataManager.getInstance().getBinaryPath(binaryDataId); + return Container.get(BinaryDataService).getPath(binaryDataId); } /** * Returns binary file metadata */ export async function getBinaryMetadata(binaryDataId: string): Promise { - return BinaryDataManager.getInstance().getBinaryMetadata(binaryDataId); + return Container.get(BinaryDataService).getMetadata(binaryDataId); } /** * Returns binary file stream for piping */ export function getBinaryStream(binaryDataId: string, chunkSize?: number): Readable { - return BinaryDataManager.getInstance().getBinaryStream(binaryDataId, chunkSize); + return Container.get(BinaryDataService).getAsStream(binaryDataId, chunkSize); } export function assertBinaryData( @@ -992,7 +992,7 @@ export async function getBinaryDataBuffer( inputIndex: number, ): Promise { const binaryData = inputData.main[inputIndex]![itemIndex]!.binary![propertyName]!; - return BinaryDataManager.getInstance().getBinaryDataBuffer(binaryData); + return Container.get(BinaryDataService).getBinaryDataBuffer(binaryData); } /** @@ -1008,7 +1008,7 @@ export async function setBinaryDataBuffer( binaryData: Buffer | Readable, executionId: string, ): Promise { - return BinaryDataManager.getInstance().storeBinaryData(data, binaryData, executionId); + return Container.get(BinaryDataService).store(data, binaryData, executionId); } export async function copyBinaryFile( @@ -1061,7 +1061,7 @@ export async function copyBinaryFile( returnData.fileName = path.parse(filePath).base; } - return BinaryDataManager.getInstance().copyBinaryFile(returnData, filePath, executionId); + return Container.get(BinaryDataService).copyBinaryFile(returnData, filePath, executionId); } /** @@ -2573,7 +2573,8 @@ const getBinaryHelperFunctions = ({ getBinaryPath, getBinaryStream, getBinaryMetadata, - binaryToBuffer, + binaryToBuffer: async (body: Buffer | Readable) => + Container.get(BinaryDataService).binaryToBuffer(body), prepareBinaryData: async (binaryData, filePath, mimeType) => prepareBinaryData(binaryData, executionId!, filePath, mimeType), setBinaryDataBuffer: async (data, binaryData) => @@ -2761,7 +2762,7 @@ export function getExecuteFunctions( parentWorkflowSettings: workflow.settings, }) .then(async (result) => - BinaryDataManager.getInstance().duplicateBinaryData( + Container.get(BinaryDataService).duplicateBinaryData( result, additionalData.executionId!, ), @@ -2833,7 +2834,8 @@ export function getExecuteFunctions( ); return dataProxy.getDataProxy(); }, - binaryToBuffer, + binaryToBuffer: async (body: Buffer | Readable) => + Container.get(BinaryDataService).binaryToBuffer(body), async putExecutionToWait(waitTill: Date): Promise { runExecutionData.waitTill = waitTill; if (additionalData.setExecutionStatus) { diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 2b441605eeb0d..9d6b5bfeebda3 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -2,7 +2,8 @@ import * as NodeExecuteFunctions from './NodeExecuteFunctions'; import * as UserSettings from './UserSettings'; export * from './ActiveWorkflows'; -export * from './BinaryDataManager'; +export * from './BinaryData/BinaryData.service'; +export * from './BinaryData/types'; export * from './ClassLoader'; export * from './Constants'; export * from './Credentials'; diff --git a/packages/core/test/NodeExecuteFunctions.test.ts b/packages/core/test/NodeExecuteFunctions.test.ts index b0ef257a89562..f6eefe67dfad6 100644 --- a/packages/core/test/NodeExecuteFunctions.test.ts +++ b/packages/core/test/NodeExecuteFunctions.test.ts @@ -1,4 +1,3 @@ -import { BinaryDataManager } from '@/BinaryDataManager'; import { getBinaryDataBuffer, parseIncomingMessage, @@ -16,25 +15,24 @@ import type { Workflow, WorkflowHooks, } from 'n8n-workflow'; +import { BinaryDataService } from '@/BinaryData/BinaryData.service'; import nock from 'nock'; import { tmpdir } from 'os'; import { join } from 'path'; import { initLogger } from './helpers/utils'; +import Container from 'typedi'; const temporaryDir = mkdtempSync(join(tmpdir(), 'n8n')); describe('NodeExecuteFunctions', () => { describe('test binary data helper methods', () => { - // Reset BinaryDataManager for each run. This is a dirty operation, as individual managers are not cleaned. - beforeEach(() => { - BinaryDataManager.instance = undefined; - }); - test("test getBinaryDataBuffer(...) & setBinaryDataBuffer(...) methods in 'default' mode", async () => { // Setup a 'default' binary data manager instance - await BinaryDataManager.init({ + Container.set(BinaryDataService, new BinaryDataService()); + + await Container.get(BinaryDataService).init({ mode: 'default', - availableModes: 'default', + availableModes: ['default'], localStoragePath: temporaryDir, }); @@ -80,10 +78,12 @@ describe('NodeExecuteFunctions', () => { }); test("test getBinaryDataBuffer(...) & setBinaryDataBuffer(...) methods in 'filesystem' mode", async () => { + Container.set(BinaryDataService, new BinaryDataService()); + // Setup a 'filesystem' binary data manager instance - await BinaryDataManager.init({ + await Container.get(BinaryDataService).init({ mode: 'filesystem', - availableModes: 'filesystem', + availableModes: ['filesystem'], localStoragePath: temporaryDir, }); diff --git a/packages/nodes-base/nodes/Aws/S3/test/V1/AwsS3.node.test.ts b/packages/nodes-base/nodes/Aws/S3/test/V1/AwsS3.node.test.ts index 91150c8c86d2d..c13c572ef2758 100644 --- a/packages/nodes-base/nodes/Aws/S3/test/V1/AwsS3.node.test.ts +++ b/packages/nodes-base/nodes/Aws/S3/test/V1/AwsS3.node.test.ts @@ -1,5 +1,5 @@ import nock from 'nock'; -import { getWorkflowFilenames, initBinaryDataManager, testWorkflows } from '@test/nodes/Helpers'; +import { getWorkflowFilenames, initBinaryDataService, testWorkflows } from '@test/nodes/Helpers'; const workflows = getWorkflowFilenames(__dirname); @@ -11,7 +11,7 @@ describe('Test S3 V1 Node', () => { beforeAll(async () => { jest.useFakeTimers({ doNotFake: ['nextTick'], now }); - await initBinaryDataManager(); + await initBinaryDataService(); nock.disableNetConnect(); mock = nock('https://bucket.s3.eu-central-1.amazonaws.com'); diff --git a/packages/nodes-base/nodes/Aws/S3/test/V2/AwsS3.node.test.ts b/packages/nodes-base/nodes/Aws/S3/test/V2/AwsS3.node.test.ts index b8a9a3d6c0a22..2f3b0516c8cbc 100644 --- a/packages/nodes-base/nodes/Aws/S3/test/V2/AwsS3.node.test.ts +++ b/packages/nodes-base/nodes/Aws/S3/test/V2/AwsS3.node.test.ts @@ -1,5 +1,5 @@ import nock from 'nock'; -import { getWorkflowFilenames, initBinaryDataManager, testWorkflows } from '@test/nodes/Helpers'; +import { getWorkflowFilenames, initBinaryDataService, testWorkflows } from '@test/nodes/Helpers'; const workflows = getWorkflowFilenames(__dirname); @@ -11,7 +11,7 @@ describe('Test S3 V2 Node', () => { beforeAll(async () => { jest.useFakeTimers({ doNotFake: ['nextTick'], now }); - await initBinaryDataManager(); + await initBinaryDataService(); nock.disableNetConnect(); mock = nock('https://bucket.s3.eu-central-1.amazonaws.com'); diff --git a/packages/nodes-base/nodes/Code/test/Code.node.test.ts b/packages/nodes-base/nodes/Code/test/Code.node.test.ts index 7c47a0a5632f0..da798db3a8206 100644 --- a/packages/nodes-base/nodes/Code/test/Code.node.test.ts +++ b/packages/nodes-base/nodes/Code/test/Code.node.test.ts @@ -3,7 +3,7 @@ import { NodeVM } from '@n8n/vm2'; import type { IExecuteFunctions, IWorkflowDataProxyData } from 'n8n-workflow'; import { NodeHelpers } from 'n8n-workflow'; import { normalizeItems } from 'n8n-core'; -import { testWorkflows, getWorkflowFilenames, initBinaryDataManager } from '@test/nodes/Helpers'; +import { testWorkflows, getWorkflowFilenames, initBinaryDataService } from '@test/nodes/Helpers'; import { Code } from '../Code.node'; import { ValidationError } from '../ValidationError'; @@ -11,7 +11,7 @@ describe('Test Code Node', () => { const workflows = getWorkflowFilenames(__dirname); beforeAll(async () => { - await initBinaryDataManager(); + await initBinaryDataService(); }); testWorkflows(workflows); diff --git a/packages/nodes-base/nodes/Compression/test/node/Compression.test.ts b/packages/nodes-base/nodes/Compression/test/node/Compression.test.ts index 5fbe41796346d..ea7ff18819ebd 100644 --- a/packages/nodes-base/nodes/Compression/test/node/Compression.test.ts +++ b/packages/nodes-base/nodes/Compression/test/node/Compression.test.ts @@ -5,7 +5,7 @@ import type { IDataObject } from 'n8n-workflow'; import { getResultNodeData, setup, - initBinaryDataManager, + initBinaryDataService, readJsonFileSync, } from '@test/nodes/Helpers'; import { executeWorkflow } from '@test/nodes/ExecuteWorkflow'; @@ -16,7 +16,7 @@ import os from 'node:os'; if (os.platform() !== 'win32') { describe('Execute Compression Node', () => { beforeEach(async () => { - await initBinaryDataManager(); + await initBinaryDataService(); }); const workflowData = readJsonFileSync('nodes/Compression/test/node/workflow.compression.json'); diff --git a/packages/nodes-base/nodes/Crypto/test/Crypto.test.ts b/packages/nodes-base/nodes/Crypto/test/Crypto.test.ts index ab4742b70dd62..b321d4e852b67 100644 --- a/packages/nodes-base/nodes/Crypto/test/Crypto.test.ts +++ b/packages/nodes-base/nodes/Crypto/test/Crypto.test.ts @@ -1,7 +1,7 @@ import fs from 'fs'; import fsPromises from 'fs/promises'; import { Readable } from 'stream'; -import { testWorkflows, getWorkflowFilenames, initBinaryDataManager } from '@test/nodes/Helpers'; +import { testWorkflows, getWorkflowFilenames, initBinaryDataService } from '@test/nodes/Helpers'; const workflows = getWorkflowFilenames(__dirname); @@ -13,7 +13,7 @@ describe('Test Crypto Node', () => { fs.createReadStream = () => Readable.from(Buffer.from('test')) as fs.ReadStream; beforeEach(async () => { - await initBinaryDataManager(); + await initBinaryDataService(); }); testWorkflows(workflows); diff --git a/packages/nodes-base/nodes/HttpRequest/test/binaryData/HttpRequest.test.ts b/packages/nodes-base/nodes/HttpRequest/test/binaryData/HttpRequest.test.ts index c539f4bc44273..4ef4a1720189a 100644 --- a/packages/nodes-base/nodes/HttpRequest/test/binaryData/HttpRequest.test.ts +++ b/packages/nodes-base/nodes/HttpRequest/test/binaryData/HttpRequest.test.ts @@ -4,7 +4,7 @@ import { equalityTest, workflowToTests, getWorkflowFilenames, - initBinaryDataManager, + initBinaryDataService, } from '@test/nodes/Helpers'; describe('Test Binary Data Download', () => { @@ -14,7 +14,7 @@ describe('Test Binary Data Download', () => { const baseUrl = 'https://dummy.domain'; beforeAll(async () => { - await initBinaryDataManager(); + await initBinaryDataService(); nock.disableNetConnect(); diff --git a/packages/nodes-base/nodes/HttpRequest/test/node/HttpRequest.test.ts b/packages/nodes-base/nodes/HttpRequest/test/node/HttpRequest.test.ts index 9be90520ca86d..49617225e96f6 100644 --- a/packages/nodes-base/nodes/HttpRequest/test/node/HttpRequest.test.ts +++ b/packages/nodes-base/nodes/HttpRequest/test/node/HttpRequest.test.ts @@ -1,4 +1,10 @@ -import { setup, equalityTest, workflowToTests, getWorkflowFilenames } from '@test/nodes/Helpers'; +import { + initBinaryDataService, + setup, + equalityTest, + workflowToTests, + getWorkflowFilenames, +} from '@test/nodes/Helpers'; import nock from 'nock'; @@ -8,7 +14,8 @@ describe('Test HTTP Request Node', () => { const baseUrl = 'https://dummyjson.com'; - beforeAll(() => { + beforeAll(async () => { + await initBinaryDataService(); nock.disableNetConnect(); //GET diff --git a/packages/nodes-base/nodes/ICalendar/test/node/ICalendar.test.ts b/packages/nodes-base/nodes/ICalendar/test/node/ICalendar.test.ts index f9adbf508f22f..9757cd5f2a479 100644 --- a/packages/nodes-base/nodes/ICalendar/test/node/ICalendar.test.ts +++ b/packages/nodes-base/nodes/ICalendar/test/node/ICalendar.test.ts @@ -5,13 +5,13 @@ import { getResultNodeData, setup, readJsonFileSync, - initBinaryDataManager, + initBinaryDataService, } from '@test/nodes/Helpers'; import { executeWorkflow } from '@test/nodes/ExecuteWorkflow'; describe('Execute iCalendar Node', () => { beforeEach(async () => { - await initBinaryDataManager(); + await initBinaryDataService(); }); const workflowData = readJsonFileSync('nodes/ICalendar/test/node/workflow.iCalendar.json'); diff --git a/packages/nodes-base/nodes/MoveBinaryData/test/MoveBinaryData.test.ts b/packages/nodes-base/nodes/MoveBinaryData/test/MoveBinaryData.test.ts index e0aef1e046fc1..8abf8e2db53bf 100644 --- a/packages/nodes-base/nodes/MoveBinaryData/test/MoveBinaryData.test.ts +++ b/packages/nodes-base/nodes/MoveBinaryData/test/MoveBinaryData.test.ts @@ -6,7 +6,7 @@ import path from 'path'; describe('Test Move Binary Data Node', () => { beforeEach(async () => { - await Helpers.initBinaryDataManager(); + await Helpers.initBinaryDataService(); }); const workflow = Helpers.readJsonFileSync( diff --git a/packages/nodes-base/nodes/QuickChart/test/QuickChart.node.test.ts b/packages/nodes-base/nodes/QuickChart/test/QuickChart.node.test.ts index eeaa22449341d..94ea8cc6bd688 100644 --- a/packages/nodes-base/nodes/QuickChart/test/QuickChart.node.test.ts +++ b/packages/nodes-base/nodes/QuickChart/test/QuickChart.node.test.ts @@ -6,7 +6,7 @@ import nock from 'nock'; describe('Test QuickChart Node', () => { beforeEach(async () => { - await Helpers.initBinaryDataManager(); + await Helpers.initBinaryDataService(); nock.disableNetConnect(); nock('https://quickchart.io') .persist() diff --git a/packages/nodes-base/nodes/ReadBinaryFile/test/ReadBinaryFile.test.ts b/packages/nodes-base/nodes/ReadBinaryFile/test/ReadBinaryFile.test.ts index 637b1d6b07847..8590bf61fca05 100644 --- a/packages/nodes-base/nodes/ReadBinaryFile/test/ReadBinaryFile.test.ts +++ b/packages/nodes-base/nodes/ReadBinaryFile/test/ReadBinaryFile.test.ts @@ -6,7 +6,7 @@ import path from 'path'; describe('Test Read Binary File Node', () => { beforeEach(async () => { - await Helpers.initBinaryDataManager(); + await Helpers.initBinaryDataService(); }); const workflow = Helpers.readJsonFileSync( diff --git a/packages/nodes-base/nodes/ReadBinaryFiles/test/ReadBinaryFiles.test.ts b/packages/nodes-base/nodes/ReadBinaryFiles/test/ReadBinaryFiles.test.ts index 78f021c504318..d4bf87f04533f 100644 --- a/packages/nodes-base/nodes/ReadBinaryFiles/test/ReadBinaryFiles.test.ts +++ b/packages/nodes-base/nodes/ReadBinaryFiles/test/ReadBinaryFiles.test.ts @@ -6,7 +6,7 @@ import path from 'path'; describe('Test Read Binary Files Node', () => { beforeEach(async () => { - await Helpers.initBinaryDataManager(); + await Helpers.initBinaryDataService(); }); const workflow = Helpers.readJsonFileSync( diff --git a/packages/nodes-base/nodes/ReadPdf/test/ReadPDF.test.ts b/packages/nodes-base/nodes/ReadPdf/test/ReadPDF.test.ts index cd91f69a1df06..8a394c0ce9ba4 100644 --- a/packages/nodes-base/nodes/ReadPdf/test/ReadPDF.test.ts +++ b/packages/nodes-base/nodes/ReadPdf/test/ReadPDF.test.ts @@ -1,10 +1,10 @@ -import { getWorkflowFilenames, initBinaryDataManager, testWorkflows } from '@test/nodes/Helpers'; +import { getWorkflowFilenames, initBinaryDataService, testWorkflows } from '@test/nodes/Helpers'; describe('Test Read PDF Node', () => { const workflows = getWorkflowFilenames(__dirname); beforeAll(async () => { - await initBinaryDataManager(); + await initBinaryDataService(); }); testWorkflows(workflows); diff --git a/packages/nodes-base/nodes/SpreadsheetFile/test/SpreadsheetFile.test.ts b/packages/nodes-base/nodes/SpreadsheetFile/test/SpreadsheetFile.test.ts index ab7313deda2b9..ddbe08c271611 100644 --- a/packages/nodes-base/nodes/SpreadsheetFile/test/SpreadsheetFile.test.ts +++ b/packages/nodes-base/nodes/SpreadsheetFile/test/SpreadsheetFile.test.ts @@ -6,7 +6,7 @@ import path from 'path'; describe('Execute Spreadsheet File Node', () => { beforeEach(async () => { - await Helpers.initBinaryDataManager(); + await Helpers.initBinaryDataService(); }); // replace workflow json 'Read Binary File' node's filePath to local file diff --git a/packages/nodes-base/nodes/WriteBinaryFile/test/WriteBinaryFile.test.ts b/packages/nodes-base/nodes/WriteBinaryFile/test/WriteBinaryFile.test.ts index 147d4318f3aa5..9cd6df1da5fa7 100644 --- a/packages/nodes-base/nodes/WriteBinaryFile/test/WriteBinaryFile.test.ts +++ b/packages/nodes-base/nodes/WriteBinaryFile/test/WriteBinaryFile.test.ts @@ -6,7 +6,7 @@ import path from 'path'; describe('Test Write Binary File Node', () => { beforeEach(async () => { - await Helpers.initBinaryDataManager(); + await Helpers.initBinaryDataService(); }); const temporaryDir = Helpers.createTemporaryDir(); diff --git a/packages/nodes-base/package.json b/packages/nodes-base/package.json index 1172e272d5d19..1b099321f642a 100644 --- a/packages/nodes-base/package.json +++ b/packages/nodes-base/package.json @@ -872,6 +872,7 @@ "snowflake-sdk": "^1.8.0", "ssh2-sftp-client": "^7.0.0", "tmp-promise": "^3.0.2", + "typedi": "^0.10.0", "uuid": "^8.3.2", "xlsx": "https://cdn.sheetjs.com/xlsx-0.19.3/xlsx-0.19.3.tgz", "xml2js": "^0.5.0" diff --git a/packages/nodes-base/test/nodes/Helpers.ts b/packages/nodes-base/test/nodes/Helpers.ts index d14ef99577011..6735223e3fdb3 100644 --- a/packages/nodes-base/test/nodes/Helpers.ts +++ b/packages/nodes-base/test/nodes/Helpers.ts @@ -3,7 +3,8 @@ import path from 'path'; import { tmpdir } from 'os'; import { isEmpty } from 'lodash'; import { get } from 'lodash'; -import { BinaryDataManager, Credentials, constructExecutionMetaData } from 'n8n-core'; +import { BinaryDataService, Credentials, constructExecutionMetaData } from 'n8n-core'; +import { Container } from 'typedi'; import type { CredentialLoadingDetails, ICredentialDataDecryptedObject, @@ -216,14 +217,10 @@ export function createTemporaryDir(prefix = 'n8n') { return mkdtempSync(path.join(tmpdir(), prefix)); } -export async function initBinaryDataManager(mode: 'default' | 'filesystem' = 'default') { - const temporaryDir = createTemporaryDir(); - await BinaryDataManager.init({ - mode, - availableModes: mode, - localStoragePath: temporaryDir, - }); - return temporaryDir; +export async function initBinaryDataService(mode: 'default' | 'filesystem' = 'default') { + const binaryDataService = new BinaryDataService(); + await binaryDataService.init({ mode: 'default', availableModes: [mode] }); + Container.set(BinaryDataService, binaryDataService); } const credentialTypes = new CredentialType(); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 7ae16a82e49a9..d88bff582d7c0 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -139,7 +139,7 @@ importers: dependencies: axios: specifier: ^0.21.1 - version: 0.21.4 + version: 0.21.4(debug@4.3.2) packages/@n8n_io/eslint-config: devDependencies: @@ -217,7 +217,7 @@ importers: version: 7.28.1 axios: specifier: ^0.21.1 - version: 0.21.4 + version: 0.21.4(debug@4.3.2) basic-auth: specifier: ^2.0.1 version: 2.0.1 @@ -572,7 +572,7 @@ importers: version: link:../@n8n/client-oauth2 axios: specifier: ^0.21.1 - version: 0.21.4 + version: 0.21.4(debug@4.3.2) concat-stream: specifier: ^2.0.0 version: 2.0.0 @@ -618,6 +618,9 @@ importers: qs: specifier: ^6.10.1 version: 6.11.0 + typedi: + specifier: ^0.10.0 + version: 0.10.0(patch_hash=62r6bc2crgimafeyruodhqlgo4) uuid: specifier: ^8.3.2 version: 8.3.2 @@ -835,7 +838,7 @@ importers: version: 10.2.0(vue@3.3.4) axios: specifier: ^0.21.1 - version: 0.21.4 + version: 0.21.4(debug@4.3.2) codemirror-lang-html-n8n: specifier: ^1.0.0 version: 1.0.0 @@ -1171,6 +1174,9 @@ importers: tmp-promise: specifier: ^3.0.2 version: 3.0.3 + typedi: + specifier: ^0.10.0 + version: 0.10.0(patch_hash=62r6bc2crgimafeyruodhqlgo4) uuid: specifier: ^8.3.2 version: 8.3.2 @@ -5016,7 +5022,7 @@ packages: dependencies: '@segment/loosely-validate-event': 2.0.0 auto-changelog: 1.16.4 - axios: 0.21.4 + axios: 0.21.4(debug@4.3.2) axios-retry: 3.3.1 bull: 3.29.3 lodash.clonedeep: 4.5.0 @@ -9066,18 +9072,19 @@ packages: is-retry-allowed: 2.2.0 dev: false - /axios@0.21.4: + /axios@0.21.4(debug@4.3.2): resolution: {integrity: sha512-ut5vewkiu8jjGBdqpM44XxjuCjq9LAKeHVmoVfHVzy8eHgxxq8SbAVQNovDA8mVi05kP0Ea/n/UzcSHcTJQfNg==} dependencies: - follow-redirects: 1.15.2(debug@4.3.4) + follow-redirects: 1.15.2(debug@4.3.2) transitivePeerDependencies: - debug dev: false - /axios@0.21.4(debug@4.3.2): - resolution: {integrity: sha512-ut5vewkiu8jjGBdqpM44XxjuCjq9LAKeHVmoVfHVzy8eHgxxq8SbAVQNovDA8mVi05kP0Ea/n/UzcSHcTJQfNg==} + /axios@0.27.2: + resolution: {integrity: sha512-t+yRIyySRTp/wua5xEr+z1q60QmLq8ABsS5O9Me1AsE5dfKqgnCFzwiCZZ/cGNd1lq4/7akDWMxdhVlucjmnOQ==} dependencies: follow-redirects: 1.15.2(debug@4.3.2) + form-data: 4.0.0 transitivePeerDependencies: - debug dev: false @@ -9098,11 +9105,12 @@ packages: form-data: 4.0.0 transitivePeerDependencies: - debug + dev: true /axios@1.4.0: resolution: {integrity: sha512-S4XCWMEmzvo64T9GfvQDOXgYRDJ/wsSZc7Jvdgx5u1sd0JwsuPLqb3SYmusag+edF6ziyMensPVqLTSc1PiSEA==} dependencies: - follow-redirects: 1.15.2(debug@4.3.4) + follow-redirects: 1.15.2(debug@4.3.2) form-data: 4.0.0 proxy-from-env: 1.1.0 transitivePeerDependencies: @@ -12668,6 +12676,7 @@ packages: optional: true dependencies: debug: 4.3.4(supports-color@8.1.1) + dev: true /for-each@0.3.3: resolution: {integrity: sha512-jqYfLp7mo9vIyQf8ykW2v7A+2N4QjeCeI5+Dz9XraiO1ign81wjiH7Fb9vSOWvQfNtmSa4H2RoQTrrXivdUZmw==} @@ -17997,7 +18006,7 @@ packages: resolution: {integrity: sha512-aXYe/D+28kF63W8Cz53t09ypEORz+ULeDCahdAqhVrRm2scbOXFbtnn0GGhvMpYe45grepLKuwui9KxrZ2ZuMw==} engines: {node: '>=14.17.0'} dependencies: - axios: 0.27.2(debug@4.3.4) + axios: 0.27.2 transitivePeerDependencies: - debug dev: false