diff --git a/packages/cli/src/License.ts b/packages/cli/src/License.ts
index feec7c5c5bd3b..f90443d48a32f 100644
--- a/packages/cli/src/License.ts
+++ b/packages/cli/src/License.ts
@@ -15,6 +15,7 @@ import Container, { Service } from 'typedi';
 import type { BooleanLicenseFeature, N8nInstanceType, NumericLicenseFeature } from './Interfaces';
 import type { RedisServicePubSubPublisher } from './services/redis/RedisServicePubSubPublisher';
 import { RedisService } from './services/redis.service';
+import { ObjectStoreService } from 'n8n-core';
 
 type FeatureReturnType = Partial<
   {
@@ -103,6 +104,18 @@ export class License {
         command: 'reloadLicense',
       });
     }
+
+    const isS3Selected = config.getEnv('binaryDataManager.mode') === 's3';
+    const isS3Available = config.getEnv('binaryDataManager.availableModes').includes('s3');
+    const isS3Licensed = _features['feat:binaryDataS3'];
+
+    if (isS3Selected && isS3Available && !isS3Licensed) {
+      this.logger.debug(
+        'License changed with no support for external storage - blocking writes on object store. To restore writes, please upgrade to a license that supports this feature.',
+      );
+
+      Container.get(ObjectStoreService).setReadonly(true);
+    }
   }
 
   async saveCertStr(value: TLicenseBlock): Promise<void> {
diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts
index a6197c50e91f6..ddde98bc89313 100644
--- a/packages/cli/src/Server.ts
+++ b/packages/cli/src/Server.ts
@@ -1446,28 +1446,39 @@ export class Server extends AbstractServer {
     // Binary data
     // ----------------------------------------
 
-    // Download binary
+    // View or download binary file
     this.app.get(
-      `/${this.restEndpoint}/data/:path`,
+      `/${this.restEndpoint}/data`,
       async (req: BinaryDataRequest, res: express.Response): Promise<void> => {
-        // TODO UM: check if this needs permission check for UM
-        const identifier = req.params.path;
+        const { id: binaryDataId, action } = req.query;
+        let { fileName, mimeType } = req.query;
+        const [mode] = binaryDataId.split(':') as ['filesystem' | 's3', string];
+
         try {
-          const binaryPath = this.binaryDataService.getPath(identifier);
-          let { mode, fileName, mimeType } = req.query;
+          const binaryPath = this.binaryDataService.getPath(binaryDataId);
+
           if (!fileName || !mimeType) {
             try {
-              const metadata = await this.binaryDataService.getMetadata(identifier);
+              const metadata = await this.binaryDataService.getMetadata(binaryDataId);
               fileName = metadata.fileName;
               mimeType = metadata.mimeType;
               res.setHeader('Content-Length', metadata.fileSize);
             } catch {}
           }
+
           if (mimeType) res.setHeader('Content-Type', mimeType);
-          if (mode === 'download') {
+
+          if (action === 'download') {
             res.setHeader('Content-Disposition', `attachment; filename="${fileName}"`);
           }
-          res.sendFile(binaryPath);
+
+          if (mode === 's3') {
+            const readStream = await this.binaryDataService.getAsStream(binaryDataId);
+            readStream.pipe(res);
+            return;
+          } else {
+            res.sendFile(binaryPath);
+          }
         } catch (error) {
           if (error instanceof FileNotFoundError) res.writeHead(404).end();
           else throw error;
diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts
index 5c38fb992f4f4..7a5daab98ec85 100644
--- a/packages/cli/src/WorkflowExecuteAdditionalData.ts
+++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts
@@ -485,7 +485,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
           workflowId: this.workflowData.id,
         });
 
-        if (this.mode === 'webhook' && config.getEnv('binaryDataManager.mode') === 'filesystem') {
+        if (this.mode === 'webhook' && config.getEnv('binaryDataManager.mode') !== 'default') {
           await restoreBinaryDataId(fullRunData, this.executionId);
         }
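// Example (sketch, not part of the diff): calling the reworked binary data endpoint.
// The `id` and `action` query params replace the old `/data/:path` route param and
// `mode` query param. The base URL, rest endpoint, and binary data ID below are
// illustrative assumptions.
const binaryDataId = 's3:workflows/123/executions/390/binary_data/69055-83c4-4493-876a-9092c4708b9b';

const url = new URL('http://localhost:5678/rest/data');
url.searchParams.set('id', binaryDataId); // `<mode>:<fileId>`, where mode is 'filesystem' or 's3'
url.searchParams.set('action', 'download'); // 'view' | 'download'; 'download' sets Content-Disposition

const res = await fetch(url); // s3 files are piped as a stream; filesystem files are sent from disk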
diff --git a/packages/cli/src/commands/BaseCommand.ts b/packages/cli/src/commands/BaseCommand.ts
index 49072a1082875..a1628adf10ae5 100644
--- a/packages/cli/src/commands/BaseCommand.ts
+++ b/packages/cli/src/commands/BaseCommand.ts
@@ -3,13 +3,13 @@ import { ExitError } from '@oclif/errors';
 import { Container } from 'typedi';
 import { LoggerProxy, ErrorReporterProxy as ErrorReporter, sleep } from 'n8n-workflow';
 import type { IUserSettings } from 'n8n-core';
-import { BinaryDataService, UserSettings } from 'n8n-core';
+import { BinaryDataService, ObjectStoreService, UserSettings } from 'n8n-core';
 import type { AbstractServer } from '@/AbstractServer';
 import { getLogger } from '@/Logger';
 import config from '@/config';
 import * as Db from '@/Db';
 import * as CrashJournal from '@/CrashJournal';
-import { inTest } from '@/constants';
+import { LICENSE_FEATURES, inTest } from '@/constants';
 import { CredentialTypes } from '@/CredentialTypes';
 import { CredentialsOverwrites } from '@/CredentialsOverwrites';
 import { initErrorHandling } from '@/ErrorReporting';
@@ -125,7 +125,119 @@ export abstract class BaseCommand extends Command {
     process.exit(1);
   }
 
+  async initObjectStoreService() {
+    const isSelected = config.getEnv('binaryDataManager.mode') === 's3';
+    const isAvailable = config.getEnv('binaryDataManager.availableModes').includes('s3');
+
+    if (!isSelected && !isAvailable) return;
+
+    if (isSelected && !isAvailable) {
+      throw new Error(
+        'External storage selected but unavailable. Please make external storage available by adding "s3" to `N8N_AVAILABLE_BINARY_DATA_MODES`.',
+      );
+    }
+
+    const isLicensed = Container.get(License).isFeatureEnabled(LICENSE_FEATURES.BINARY_DATA_S3);
+
+    if (isSelected && isAvailable && isLicensed) {
+      LoggerProxy.debug(
+        'License found for external storage - object store to init in read-write mode',
+      );
+
+      await this._initObjectStoreService();
+
+      return;
+    }
+
+    if (isSelected && isAvailable && !isLicensed) {
+      LoggerProxy.debug(
+        'No license found for external storage - object store to init with writes blocked. To enable writes, please upgrade to a license that supports this feature.',
+      );
+
+      await this._initObjectStoreService({ isReadOnly: true });
+
+      return;
+    }
+
+    if (!isSelected && isAvailable) {
+      LoggerProxy.debug(
+        'External storage unselected but available - object store to init with writes unused',
+      );
+
+      await this._initObjectStoreService();
+
+      return;
+    }
+  }
+
+  private async _initObjectStoreService(options = { isReadOnly: false }) {
+    const objectStoreService = Container.get(ObjectStoreService);
+
+    const host = config.getEnv('externalStorage.s3.host');
+
+    if (host === '') {
+      throw new Error(
+        'External storage host not configured. Please set `N8N_EXTERNAL_STORAGE_S3_HOST`.',
+      );
+    }
+
+    const bucket = {
+      name: config.getEnv('externalStorage.s3.bucket.name'),
+      region: config.getEnv('externalStorage.s3.bucket.region'),
+    };
+
+    if (bucket.name === '') {
+      throw new Error(
+        'External storage bucket name not configured. Please set `N8N_EXTERNAL_STORAGE_S3_BUCKET_NAME`.',
+      );
+    }
+
+    if (bucket.region === '') {
+      throw new Error(
+        'External storage bucket region not configured. Please set `N8N_EXTERNAL_STORAGE_S3_BUCKET_REGION`.',
+      );
+    }
+
+    const credentials = {
+      accessKey: config.getEnv('externalStorage.s3.credentials.accessKey'),
+      accessSecret: config.getEnv('externalStorage.s3.credentials.accessSecret'),
+    };
+
+    if (credentials.accessKey === '') {
+      throw new Error(
+        'External storage access key not configured. Please set `N8N_EXTERNAL_STORAGE_S3_ACCESS_KEY`.',
+      );
+    }
+
+    if (credentials.accessSecret === '') {
+      throw new Error(
+        'External storage access secret not configured. Please set `N8N_EXTERNAL_STORAGE_S3_ACCESS_SECRET`.',
+      );
+    }
+
+    LoggerProxy.debug('Initializing object store service');
+
+    try {
+      await objectStoreService.init(host, bucket, credentials);
+      objectStoreService.setReadonly(options.isReadOnly);
+
+      LoggerProxy.debug('Object store init completed');
+    } catch (e) {
+      const error = e instanceof Error ? e : new Error(`${e}`);
+
+      LoggerProxy.debug('Object store init failed', { error });
+    }
+  }
+
   async initBinaryDataService() {
+    try {
+      await this.initObjectStoreService();
+    } catch (e) {
+      const error = e instanceof Error ? e : new Error(`${e}`);
+      LoggerProxy.error(`Failed to init object store: ${error.message}`, { error });
+      process.exit(1);
+    }
+
     const binaryDataConfig = config.getEnv('binaryDataManager');
     await Container.get(BinaryDataService).init(binaryDataConfig);
   }
diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts
index 6c54ff75945f5..65590d27a1a1d 100644
--- a/packages/cli/src/config/schema.ts
+++ b/packages/cli/src/config/schema.ts
@@ -908,7 +908,7 @@ export const schema = {
       doc: 'Available modes of binary data storage, as comma separated strings',
     },
     mode: {
-      format: ['default', 'filesystem'] as const,
+      format: ['default', 'filesystem', 's3'] as const,
       default: 'default',
       env: 'N8N_DEFAULT_BINARY_DATA_MODE',
       doc: 'Storage mode for binary data',
@@ -921,6 +921,45 @@ export const schema = {
     },
   },
 
+  externalStorage: {
+    s3: {
+      host: {
+        format: String,
+        default: '',
+        env: 'N8N_EXTERNAL_STORAGE_S3_HOST',
+        doc: 'Host of the n8n bucket in S3-compatible external storage, e.g. `s3.us-east-1.amazonaws.com`',
+      },
+      bucket: {
+        name: {
+          format: String,
+          default: '',
+          env: 'N8N_EXTERNAL_STORAGE_S3_BUCKET_NAME',
+          doc: 'Name of the n8n bucket in S3-compatible external storage',
+        },
+        region: {
+          format: String,
+          default: '',
+          env: 'N8N_EXTERNAL_STORAGE_S3_BUCKET_REGION',
+          doc: 'Region of the n8n bucket in S3-compatible external storage, e.g. `us-east-1`',
+        },
+      },
+      credentials: {
+        accessKey: {
+          format: String,
+          default: '',
+          env: 'N8N_EXTERNAL_STORAGE_S3_ACCESS_KEY',
+          doc: 'Access key in S3-compatible external storage',
+        },
+        accessSecret: {
+          format: String,
+          default: '',
+          env: 'N8N_EXTERNAL_STORAGE_S3_ACCESS_SECRET',
+          doc: 'Access secret in S3-compatible external storage',
+        },
+      },
+    },
+  },
+
   deployment: {
     type: {
       format: String,
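// Example (sketch, not part of the diff): environment variables read by the new
// schema keys above. All values are placeholders.
process.env.N8N_DEFAULT_BINARY_DATA_MODE = 's3';
process.env.N8N_AVAILABLE_BINARY_DATA_MODES = 'filesystem,s3'; // must include 's3', else initObjectStoreService() throws
process.env.N8N_EXTERNAL_STORAGE_S3_HOST = 's3.us-east-1.amazonaws.com';
process.env.N8N_EXTERNAL_STORAGE_S3_BUCKET_NAME = 'my-n8n-bucket';
process.env.N8N_EXTERNAL_STORAGE_S3_BUCKET_REGION = 'us-east-1';
process.env.N8N_EXTERNAL_STORAGE_S3_ACCESS_KEY = 'my-access-key';
process.env.N8N_EXTERNAL_STORAGE_S3_ACCESS_SECRET = 'my-access-secret';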
diff --git a/packages/cli/src/constants.ts b/packages/cli/src/constants.ts
index 8df0c513208f9..dcb4ab5e50dbe 100644
--- a/packages/cli/src/constants.ts
+++ b/packages/cli/src/constants.ts
@@ -81,6 +81,7 @@ export const LICENSE_FEATURES = {
   SHOW_NON_PROD_BANNER: 'feat:showNonProdBanner',
   WORKFLOW_HISTORY: 'feat:workflowHistory',
   DEBUG_IN_EDITOR: 'feat:debugInEditor',
+  BINARY_DATA_S3: 'feat:binaryDataS3',
 } as const;
 
 export const LICENSE_QUOTAS = {
diff --git a/packages/cli/src/controllers/e2e.controller.ts b/packages/cli/src/controllers/e2e.controller.ts
index f4e7a744994cb..24b499747675c 100644
--- a/packages/cli/src/controllers/e2e.controller.ts
+++ b/packages/cli/src/controllers/e2e.controller.ts
@@ -68,6 +68,7 @@ export class E2EController {
     [LICENSE_FEATURES.SHOW_NON_PROD_BANNER]: false,
     [LICENSE_FEATURES.WORKFLOW_HISTORY]: false,
     [LICENSE_FEATURES.DEBUG_IN_EDITOR]: false,
+    [LICENSE_FEATURES.BINARY_DATA_S3]: false,
   };
 
   constructor(
diff --git a/packages/cli/src/executionLifecycleHooks/restoreBinaryDataId.ts b/packages/cli/src/executionLifecycleHooks/restoreBinaryDataId.ts
index 88d8688df7f6b..9870f365fdc48 100644
--- a/packages/cli/src/executionLifecycleHooks/restoreBinaryDataId.ts
+++ b/packages/cli/src/executionLifecycleHooks/restoreBinaryDataId.ts
@@ -1,13 +1,14 @@
 import Container from 'typedi';
 import { BinaryDataService } from 'n8n-core';
 import type { IRun } from 'n8n-workflow';
-
-export function isMissingExecutionId(binaryDataId: string) {
-  const UUID_CHAR_LENGTH = 36;
-
-  return [UUID_CHAR_LENGTH + 'filesystem:'.length, UUID_CHAR_LENGTH + 's3:'.length].some(
-    (incorrectLength) => binaryDataId.length === incorrectLength,
-  );
+import type { BinaryData } from 'n8n-core';
+
+export function isMissingExecutionId(
+  fileId: string,
+  mode: BinaryData.NonDefaultMode,
+  uuidV4CharLength = 36,
+) {
+  return mode === 'filesystem' ? uuidV4CharLength === fileId.length : fileId.includes('/temp/');
 }
 
 /**
@@ -19,6 +20,9 @@ export function isMissingExecutionId(binaryDataId: string) {
  * ```txt
  * filesystem:11869055-83c4-4493-876a-9092c4708b9b ->
  * filesystem:39011869055-83c4-4493-876a-9092c4708b9b
+ *
+ * s3:workflows/123/executions/temp/binary_data/69055-83c4-4493-876a-9092c4708b9b ->
+ * s3:workflows/123/executions/390/binary_data/69055-83c4-4493-876a-9092c4708b9b
  * ```
  */
 export async function restoreBinaryDataId(run: IRun, executionId: string) {
@@ -27,13 +31,18 @@ export async function restoreBinaryDataId(run: IRun, executionId: string) {
   const promises = Object.keys(runData).map(async (nodeName) => {
     const binaryDataId = runData[nodeName]?.[0]?.data?.main?.[0]?.[0]?.binary?.data.id;
 
-    if (!binaryDataId || !isMissingExecutionId(binaryDataId)) return;
+    if (!binaryDataId) return;
 
-    const [mode, incorrectFileId] = binaryDataId.split(':');
-    const correctFileId = `${executionId}${incorrectFileId}`;
-    const correctBinaryDataId = `${mode}:${correctFileId}`;
+    const [mode, fileId] = binaryDataId.split(':') as [BinaryData.NonDefaultMode, string];
 
-    await Container.get(BinaryDataService).rename(incorrectFileId, correctFileId);
+    if (!isMissingExecutionId(fileId, mode)) return;
+
+    const correctFileId =
+      mode === 'filesystem' ? `${executionId}${fileId}` : fileId.replace('temp', executionId);
+
+    await Container.get(BinaryDataService).rename(fileId, correctFileId);
+
+    const correctBinaryDataId = `${mode}:${correctFileId}`;
 
     // @ts-expect-error Validated at the top
     run.data.resultData.runData[nodeName][0].data.main[0][0].binary.data.id = correctBinaryDataId;
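// Worked example (not part of the diff) of the rewrite performed by
// restoreBinaryDataId() above; the IDs are taken from the hunk's doc comment.
const executionId = '390';

// filesystem mode: the execution ID is prepended to the bare UUID.
const fsFileId = '11869055-83c4-4493-876a-9092c4708b9b';
console.log(`${executionId}${fsFileId}`);
// => '39011869055-83c4-4493-876a-9092c4708b9b'

// s3 mode: the 'temp' path segment is replaced by the execution ID.
const s3FileId = 'workflows/123/executions/temp/binary_data/69055-83c4-4493-876a-9092c4708b9b';
console.log(s3FileId.replace('temp', executionId));
// => 'workflows/123/executions/390/binary_data/69055-83c4-4493-876a-9092c4708b9b'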
diff --git a/packages/cli/src/requests.ts b/packages/cli/src/requests.ts
index 00bf110aebc58..f50cdfda8c507 100644
--- a/packages/cli/src/requests.ts
+++ b/packages/cli/src/requests.ts
@@ -492,11 +492,12 @@ export declare namespace LicenseRequest {
 }
 
 export type BinaryDataRequest = AuthenticatedRequest<
-  { path: string },
+  {},
   {},
   {},
   {
-    mode: 'view' | 'download';
+    id: string;
+    action: 'view' | 'download';
     fileName?: string;
     mimeType?: string;
   }
diff --git a/packages/cli/test/integration/commands/worker.cmd.test.ts b/packages/cli/test/integration/commands/worker.cmd.test.ts
index 41b01619885e2..e171864158c09 100644
--- a/packages/cli/test/integration/commands/worker.cmd.test.ts
+++ b/packages/cli/test/integration/commands/worker.cmd.test.ts
@@ -23,6 +23,7 @@ const oclifConfig: Config.IConfig = new Config.Config({ root: __dirname });
 beforeAll(async () => {
   LoggerProxy.init(getLogger());
   config.set('executions.mode', 'queue');
+  config.set('binaryDataManager.availableModes', 'filesystem');
   mockInstance(Telemetry);
   mockInstance(PostHogClient);
   mockInstance(InternalHooks);
diff --git a/packages/cli/test/integration/shared/utils/index.ts b/packages/cli/test/integration/shared/utils/index.ts
index ee4b1e57509f3..cd5d215f60b84 100644
--- a/packages/cli/test/integration/shared/utils/index.ts
+++ b/packages/cli/test/integration/shared/utils/index.ts
@@ -74,11 +74,13 @@ export async function initNodeTypes() {
 /**
  * Initialize a BinaryDataService for test runs.
  */
-export async function initBinaryDataService() {
+export async function initBinaryDataService(mode: 'default' | 'filesystem' = 'default') {
   const binaryDataService = new BinaryDataService();
-
-  await binaryDataService.init(config.getEnv('binaryDataManager'));
-
+  await binaryDataService.init({
+    mode,
+    availableModes: [mode],
+    localStoragePath: '',
+  });
   Container.set(BinaryDataService, binaryDataService);
 }
diff --git a/packages/cli/test/unit/execution.lifecycle.test.ts b/packages/cli/test/unit/execution.lifecycle.test.ts
index f68cf34f7dc1d..b121d739caaac 100644
--- a/packages/cli/test/unit/execution.lifecycle.test.ts
+++ b/packages/cli/test/unit/execution.lifecycle.test.ts
@@ -2,6 +2,7 @@ import { restoreBinaryDataId } from '@/executionLifecycleHooks/restoreBinaryData
 import { BinaryDataService } from 'n8n-core';
 import { mockInstance } from '../integration/shared/utils/mocking';
 import type { IRun } from 'n8n-workflow';
+import config from '@/config';
 
 function toIRun(item?: object) {
   return {
@@ -27,62 +28,141 @@ function getDataId(run: IRun, kind: 'binary' | 'json') {
   return run.data.resultData.runData.myNode[0].data.main[0][0][kind].data.id;
 }
 
-describe('restoreBinaryDataId()', () => {
-  const binaryDataService = mockInstance(BinaryDataService);
+const binaryDataService = mockInstance(BinaryDataService);
 
-  beforeEach(() => {
-    jest.clearAllMocks();
-  });
+describe('on filesystem mode', () => {
+  describe('restoreBinaryDataId()', () => {
+    beforeAll(() => {
+      config.set('binaryDataManager.mode', 'filesystem');
+    });
 
-  it('should restore if binary data ID is missing execution ID', async () => {
-    const executionId = '999';
-    const incorrectFileId = 'a5c3f1ed-9d59-4155-bc68-9a370b3c51f6';
-    const run = toIRun({
-      binary: {
-        data: { id: `filesystem:${incorrectFileId}` },
-      },
+    afterEach(() => {
+      jest.clearAllMocks();
     });
 
-    await restoreBinaryDataId(run, executionId);
+    it('should restore if binary data ID is missing execution ID', async () => {
+      const executionId = '999';
+      const incorrectFileId = 'a5c3f1ed-9d59-4155-bc68-9a370b3c51f6';
+      const run = toIRun({
+        binary: {
+          data: { id: `filesystem:${incorrectFileId}` },
+        },
+      });
 
-    const correctFileId = `${executionId}${incorrectFileId}`;
-    const correctBinaryDataId = `filesystem:${correctFileId}`;
+      await restoreBinaryDataId(run, executionId);
 
-    expect(binaryDataService.rename).toHaveBeenCalledWith(incorrectFileId, correctFileId);
-    expect(getDataId(run, 'binary')).toBe(correctBinaryDataId);
-  });
+      const correctFileId = `${executionId}${incorrectFileId}`;
+      const correctBinaryDataId = `filesystem:${correctFileId}`;
 
-  it('should do nothing if binary data ID is not missing execution ID', async () => {
-    const executionId = '999';
-    const fileId = `${executionId}a5c3f1ed-9d59-4155-bc68-9a370b3c51f6`;
-    const binaryDataId = `filesystem:${fileId}`;
-    const run = toIRun({
-      binary: {
-        data: {
-          id: binaryDataId,
+      expect(binaryDataService.rename).toHaveBeenCalledWith(incorrectFileId, correctFileId);
+      expect(getDataId(run, 'binary')).toBe(correctBinaryDataId);
+    });
+
+    it('should do nothing if binary data ID is not missing execution ID', async () => {
+      const executionId = '999';
+      const fileId = `${executionId}a5c3f1ed-9d59-4155-bc68-9a370b3c51f6`;
+      const binaryDataId = `filesystem:${fileId}`;
+      const run = toIRun({
+        binary: {
+          data: {
+            id: binaryDataId,
+          },
         },
-      },
+      });
+
+      await restoreBinaryDataId(run, executionId);
+
+      expect(binaryDataService.rename).not.toHaveBeenCalled();
+      expect(getDataId(run, 'binary')).toBe(binaryDataId);
     });
 
-    await restoreBinaryDataId(run, executionId);
+    it('should do nothing if no binary data ID', async () => {
+      const executionId = '999';
+      const dataId = '123';
+      const run = toIRun({
+        json: {
+          data: { id: dataId },
+        },
+      });
 
-    expect(binaryDataService.rename).not.toHaveBeenCalled();
-    expect(getDataId(run, 'binary')).toBe(binaryDataId);
+      await restoreBinaryDataId(run, executionId);
+
+      expect(binaryDataService.rename).not.toHaveBeenCalled();
+      expect(getDataId(run, 'json')).toBe(dataId);
+    });
   });
+});
 
-  it('should do nothing if no binary data ID', async () => {
-    const executionId = '999';
-    const dataId = '123';
-    const run = toIRun({
-      json: {
-        data: { id: dataId },
-      },
+describe('on s3 mode', () => {
+  describe('restoreBinaryDataId()', () => {
+    beforeAll(() => {
+      config.set('binaryDataManager.mode', 's3');
     });
 
-    await restoreBinaryDataId(run, executionId);
+    afterEach(() => {
+      jest.clearAllMocks();
+    });
 
-    expect(binaryDataService.rename).not.toHaveBeenCalled();
-    expect(getDataId(run, 'json')).toBe(dataId);
+    it('should restore if binary data ID is missing execution ID', async () => {
+      const workflowId = '6HYhhKmJch2cYxGj';
+      const executionId = 'temp';
+      const binaryDataFileUuid = 'a5c3f1ed-9d59-4155-bc68-9a370b3c51f6';
+
+      const incorrectFileId = `workflows/${workflowId}/executions/temp/binary_data/${binaryDataFileUuid}`;
+
+      const run = toIRun({
+        binary: {
+          data: { id: `s3:${incorrectFileId}` },
+        },
+      });
+
+      await restoreBinaryDataId(run, executionId);
+
+      const correctFileId = incorrectFileId.replace('temp', executionId);
+      const correctBinaryDataId = `s3:${correctFileId}`;
+
+      expect(binaryDataService.rename).toHaveBeenCalledWith(incorrectFileId, correctFileId);
+      expect(getDataId(run, 'binary')).toBe(correctBinaryDataId);
+    });
+
+    it('should do nothing if binary data ID is not missing execution ID', async () => {
+      const workflowId = '6HYhhKmJch2cYxGj';
+      const executionId = '999';
+      const binaryDataFileUuid = 'a5c3f1ed-9d59-4155-bc68-9a370b3c51f6';
+
+      const fileId = `workflows/${workflowId}/executions/${executionId}/binary_data/${binaryDataFileUuid}`;
+
+      const binaryDataId = `s3:${fileId}`;
+
+      const run = toIRun({
+        binary: {
+          data: {
+            id: binaryDataId,
+          },
+        },
+      });
+
+      await restoreBinaryDataId(run, executionId);
+
+      expect(binaryDataService.rename).not.toHaveBeenCalled();
+      expect(getDataId(run, 'binary')).toBe(binaryDataId);
+    });
+
+    it('should do nothing if no binary data ID', async () => {
+      const executionId = '999';
+      const dataId = '123';
+
+      const run = toIRun({
+        json: {
+          data: { id: dataId },
+        },
+      });
+
+      await restoreBinaryDataId(run, executionId);
+
+      expect(binaryDataService.rename).not.toHaveBeenCalled();
+      expect(getDataId(run, 'json')).toBe(dataId);
+    });
   });
 
   it('should do nothing on itemless case', async () => {
diff --git a/packages/core/src/BinaryData/BinaryData.service.ts b/packages/core/src/BinaryData/BinaryData.service.ts
index e480907e12881..e54f24ba9d82f 100644
--- a/packages/core/src/BinaryData/BinaryData.service.ts
+++ b/packages/core/src/BinaryData/BinaryData.service.ts
@@ -1,13 +1,12 @@
 /* eslint-disable @typescript-eslint/naming-convention */
 
-import { readFile, stat } from 'fs/promises';
+import { readFile, stat } from 'node:fs/promises';
 import prettyBytes from 'pretty-bytes';
-import { Service } from 'typedi';
+import Container, { Service } from 'typedi';
 import { BINARY_ENCODING, LoggerProxy as Logger, IBinaryData } from 'n8n-workflow';
-
-import { UnknownBinaryDataManager, InvalidBinaryDataMode } from './errors';
-import { LogCatch } from '../decorators/LogCatch.decorator';
+import { UnknownBinaryDataManagerError, InvalidBinaryDataModeError } from './errors';
 import { areValidModes, toBuffer } from './utils';
+import { LogCatch } from '../decorators/LogCatch.decorator';
 
 import type { Readable } from 'stream';
 import type { BinaryData } from './types';
@@ -20,16 +19,28 @@ export class BinaryDataService {
   private managers: Record<string, BinaryData.Manager> = {};
 
   async init(config: BinaryData.Config) {
-    if (!areValidModes(config.availableModes)) throw new InvalidBinaryDataMode();
+    if (!areValidModes(config.availableModes)) {
+      throw new InvalidBinaryDataModeError();
+    }
 
     this.mode = config.mode;
 
     if (config.availableModes.includes('filesystem')) {
       const { FileSystemManager } = await import('./FileSystem.manager');
+
       this.managers.filesystem = new FileSystemManager(config.localStoragePath);
 
       await this.managers.filesystem.init();
     }
+
+    if (config.availableModes.includes('s3')) {
+      const { ObjectStoreManager } = await import('./ObjectStore.manager');
+      const { ObjectStoreService } = await import('../ObjectStore/ObjectStore.service.ee');
+
+      this.managers.s3 = new ObjectStoreManager(Container.get(ObjectStoreService));
+
+      await this.managers.s3.init();
+    }
   }
 
   @LogCatch((error) => Logger.error('Failed to copy binary data file', { error }))
@@ -242,6 +253,6 @@ export class BinaryDataService {
 
     if (manager) return manager;
 
-    throw new UnknownBinaryDataManager(mode);
+    throw new UnknownBinaryDataManagerError(mode);
   }
 }
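// Example (sketch, not part of the diff): initializing BinaryDataService with the
// new 's3' mode. The storage path is an illustrative assumption; in the CLI this
// config object comes from config.getEnv('binaryDataManager').
import Container from 'typedi';
import { BinaryDataService } from 'n8n-core';

await Container.get(BinaryDataService).init({
  mode: 's3', // now a valid BinaryData.Config mode
  availableModes: ['filesystem', 's3'], // 's3' triggers the lazy ObjectStoreManager import above
  localStoragePath: '/home/node/.n8n/binaryData',
});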
diff --git a/packages/core/src/BinaryData/FileSystem.manager.ts b/packages/core/src/BinaryData/FileSystem.manager.ts
index 62a47bdc54b38..9fa6688d678fa 100644
--- a/packages/core/src/BinaryData/FileSystem.manager.ts
+++ b/packages/core/src/BinaryData/FileSystem.manager.ts
@@ -1,18 +1,10 @@
-/**
- * @tech_debt The `workflowId` arguments on write are for compatibility with the
- * `BinaryData.Manager` interface. Unused in filesystem mode until we refactor
- * how we store binary data files in the `/binaryData` dir.
- */
-
-import { createReadStream } from 'fs';
-import fs from 'fs/promises';
-import path from 'path';
+import { createReadStream } from 'node:fs';
+import fs from 'node:fs/promises';
+import path from 'node:path';
 import { v4 as uuid } from 'uuid';
 import { jsonParse } from 'n8n-workflow';
-import { rename } from 'node:fs/promises';
-
-import { FileNotFoundError } from '../errors';
 import { ensureDirExists } from './utils';
+import { FileNotFoundError } from '../errors';
 
 import type { Readable } from 'stream';
 import type { BinaryData } from './types';
@@ -27,18 +19,36 @@ export class FileSystemManager implements BinaryData.Manager {
     await ensureDirExists(this.storagePath);
   }
 
+  async store(
+    workflowId: string,
+    executionId: string,
+    bufferOrStream: Buffer | Readable,
+    { mimeType, fileName }: BinaryData.PreWriteMetadata,
+  ) {
+    const fileId = this.toFileId(workflowId, executionId);
+    const filePath = this.resolvePath(fileId);
+
+    await fs.writeFile(filePath, bufferOrStream);
+
+    const fileSize = await this.getSize(fileId);
+
+    await this.storeMetadata(fileId, { mimeType, fileName, fileSize });
+
+    return { fileId, fileSize };
+  }
+
   getPath(fileId: string) {
     return this.resolvePath(fileId);
   }
 
   async getAsStream(fileId: string, chunkSize?: number) {
-    const filePath = this.getPath(fileId);
+    const filePath = this.resolvePath(fileId);
 
     return createReadStream(filePath, { highWaterMark: chunkSize });
   }
 
   async getAsBuffer(fileId: string) {
-    const filePath = this.getPath(fileId);
+    const filePath = this.resolvePath(fileId);
 
     try {
       return await fs.readFile(filePath);
@@ -53,30 +63,6 @@ export class FileSystemManager implements BinaryData.Manager {
     return jsonParse(await fs.readFile(filePath, { encoding: 'utf-8' }));
   }
 
-  async store(
-    _workflowId: string,
-    executionId: string,
-    bufferOrStream: Buffer | Readable,
-    { mimeType, fileName }: BinaryData.PreWriteMetadata,
-  ) {
-    const fileId = this.toFileId(executionId);
-    const filePath = this.getPath(fileId);
-
-    await fs.writeFile(filePath, bufferOrStream);
-
-    const fileSize = await this.getSize(fileId);
-
-    await this.storeMetadata(fileId, { mimeType, fileName, fileSize });
-
-    return { fileId, fileSize };
-  }
-
-  async deleteOne(fileId: string) {
-    const filePath = this.getPath(fileId);
-
-    return fs.rm(filePath);
-  }
-
   async deleteMany(ids: BinaryData.IdsForDeletion) {
     const executionIds = ids.map((o) => o.executionId);
 
@@ -95,24 +81,25 @@ export class FileSystemManager implements BinaryData.Manager {
   }
 
   async copyByFilePath(
-    _workflowId: string,
+    workflowId: string,
     executionId: string,
-    filePath: string,
+    sourcePath: string,
     { mimeType, fileName }: BinaryData.PreWriteMetadata,
   ) {
-    const newFileId = this.toFileId(executionId);
+    const targetFileId = this.toFileId(workflowId, executionId);
+    const targetPath = this.resolvePath(targetFileId);
 
-    await fs.cp(filePath, this.getPath(newFileId));
+    await fs.cp(sourcePath, targetPath);
 
-    const fileSize = await this.getSize(newFileId);
+    const fileSize = await this.getSize(targetFileId);
 
-    await this.storeMetadata(newFileId, { mimeType, fileName, fileSize });
+    await this.storeMetadata(targetFileId, { mimeType, fileName, fileSize });
 
-    return { fileId: newFileId, fileSize };
+    return { fileId: targetFileId, fileSize };
   }
 
-  async copyByFileId(_workflowId: string, executionId: string, sourceFileId: string) {
-    const targetFileId = this.toFileId(executionId);
+  async copyByFileId(workflowId: string, executionId: string, sourceFileId: string) {
+    const targetFileId = this.toFileId(workflowId, executionId);
     const sourcePath = this.resolvePath(sourceFileId);
     const targetPath = this.resolvePath(targetFileId);
 
@@ -122,12 +109,12 @@ export class FileSystemManager implements BinaryData.Manager {
   }
 
   async rename(oldFileId: string, newFileId: string) {
-    const oldPath = this.getPath(oldFileId);
-    const newPath = this.getPath(newFileId);
+    const oldPath = this.resolvePath(oldFileId);
+    const newPath = this.resolvePath(newFileId);
 
     await Promise.all([
-      rename(oldPath, newPath),
-      rename(`${oldPath}.metadata`, `${newPath}.metadata`),
+      fs.rename(oldPath, newPath),
+      fs.rename(`${oldPath}.metadata`, `${newPath}.metadata`),
     ]);
   }
 
@@ -135,7 +122,12 @@ export class FileSystemManager implements BinaryData.Manager {
   // private methods
   // ----------------------------------
 
-  private toFileId(executionId: string) {
+  /**
+   * @tech_debt The `workflowId` argument is for compatibility with the
+   * `BinaryData.Manager` interface. Unused here until we refactor
+   * how we store binary data files in the `/binaryData` dir.
+   */
+  private toFileId(_workflowId: string, executionId: string) {
     return [executionId, uuid()].join('');
   }
 
@@ -156,7 +148,7 @@ export class FileSystemManager implements BinaryData.Manager {
   }
 
   private async getSize(fileId: string) {
-    const filePath = this.getPath(fileId);
+    const filePath = this.resolvePath(fileId);
 
     try {
       const stats = await fs.stat(filePath);
diff --git a/packages/core/src/BinaryData/ObjectStore.manager.ts b/packages/core/src/BinaryData/ObjectStore.manager.ts
new file mode 100644
index 0000000000000..9a6040b1b911a
--- /dev/null
+++ b/packages/core/src/BinaryData/ObjectStore.manager.ts
@@ -0,0 +1,120 @@
+import fs from 'node:fs/promises';
+import { Service } from 'typedi';
+import { v4 as uuid } from 'uuid';
+import { toBuffer } from './utils';
+import { ObjectStoreService } from '../ObjectStore/ObjectStore.service.ee';
+
+import type { Readable } from 'node:stream';
+import type { BinaryData } from './types';
+
+@Service()
+export class ObjectStoreManager implements BinaryData.Manager {
+  constructor(private readonly objectStoreService: ObjectStoreService) {}
+
+  async init() {
+    await this.objectStoreService.checkConnection();
+  }
+
+  async store(
+    workflowId: string,
+    executionId: string,
+    bufferOrStream: Buffer | Readable,
+    metadata: BinaryData.PreWriteMetadata,
+  ) {
+    const fileId = this.toFileId(workflowId, executionId);
+    const buffer = await this.toBuffer(bufferOrStream);
+
+    await this.objectStoreService.put(fileId, buffer, metadata);
+
+    return { fileId, fileSize: buffer.length };
+  }
+
+  getPath(fileId: string) {
+    return fileId; // already full path, no transform needed
+  }
+
+  async getAsBuffer(fileId: string) {
+    return this.objectStoreService.get(fileId, { mode: 'buffer' });
+  }
+
+  async getAsStream(fileId: string) {
+    return this.objectStoreService.get(fileId, { mode: 'stream' });
+  }
+
+  async getMetadata(fileId: string): Promise<BinaryData.Metadata> {
+    const {
+      'content-length': contentLength,
+      'content-type': contentType,
+      'x-amz-meta-filename': fileName,
+    } = await this.objectStoreService.getMetadata(fileId);
+
+    const metadata: BinaryData.Metadata = { fileSize: Number(contentLength) };
+
+    if (contentType) metadata.mimeType = contentType;
+    if (fileName) metadata.fileName = fileName;
+
+    return metadata;
+  }
+
+  async copyByFileId(workflowId: string, executionId: string, sourceFileId: string) {
+    const targetFileId = this.toFileId(workflowId, executionId);
+
+    const sourceFile = await this.objectStoreService.get(sourceFileId, { mode: 'buffer' });
+
+    await this.objectStoreService.put(targetFileId, sourceFile);
+
+    return targetFileId;
+  }
+
+  /**
+   * Copy to object store the temp file written by nodes like Webhook, FTP, and SSH.
+   */
+  async copyByFilePath(
+    workflowId: string,
+    executionId: string,
+    sourcePath: string,
+    metadata: BinaryData.PreWriteMetadata,
+  ) {
+    const targetFileId = this.toFileId(workflowId, executionId);
+    const sourceFile = await fs.readFile(sourcePath);
+
+    await this.objectStoreService.put(targetFileId, sourceFile, metadata);
+
+    return { fileId: targetFileId, fileSize: sourceFile.length };
+  }
+
+  async deleteMany(ids: BinaryData.IdsForDeletion) {
+    const prefixes = ids.map(
+      ({ workflowId, executionId }) =>
+        `workflows/${workflowId}/executions/${executionId}/binary_data/`,
+    );
+
+    await Promise.all(
+      prefixes.map(async (prefix) => {
+        await this.objectStoreService.deleteMany(prefix);
+      }),
+    );
+  }
+
+  async rename(oldFileId: string, newFileId: string) {
+    const oldFile = await this.objectStoreService.get(oldFileId, { mode: 'buffer' });
+    const oldFileMetadata = await this.objectStoreService.getMetadata(oldFileId);
+
+    await this.objectStoreService.put(newFileId, oldFile, oldFileMetadata);
+    await this.objectStoreService.deleteOne(oldFileId);
+  }
+
+  // ----------------------------------
+  // private methods
+  // ----------------------------------
+
+  private toFileId(workflowId: string, executionId: string) {
+    if (!executionId) executionId = 'temp'; // missing only in edge case, see PR #7244
+
+    return `workflows/${workflowId}/executions/${executionId}/binary_data/${uuid()}`;
+  }
+
+  private async toBuffer(bufferOrStream: Buffer | Readable) {
+    return toBuffer(bufferOrStream);
+  }
+}
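// Note (not part of the diff): the key scheme produced by toFileId() above, with
// sample IDs borrowed from the tests. Because keys embed workflow and execution IDs,
// deleteMany() can clear an execution with a single prefix, and getPath() returns
// the file ID unchanged.
const fileId = 'workflows/ObogjVbqpNOQpiyV/executions/999/binary_data/71f6209b-5d48-41a2-a224-80d529d8bb32';
const deletionPrefix = 'workflows/ObogjVbqpNOQpiyV/executions/999/binary_data/';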
diff --git a/packages/core/src/BinaryData/errors.ts b/packages/core/src/BinaryData/errors.ts
index dc52875b2a768..6ff1a637c7666 100644
--- a/packages/core/src/BinaryData/errors.ts
+++ b/packages/core/src/BinaryData/errors.ts
@@ -1,12 +1,10 @@
 import { BINARY_DATA_MODES } from './utils';
 
-export class InvalidBinaryDataMode extends Error {
-  constructor() {
-    super(`Invalid binary data mode. Valid modes: ${BINARY_DATA_MODES.join(', ')}`);
-  }
+export class InvalidBinaryDataModeError extends Error {
+  message = `Invalid binary data mode. Valid modes: ${BINARY_DATA_MODES.join(', ')}`;
 }
 
-export class UnknownBinaryDataManager extends Error {
+export class UnknownBinaryDataManagerError extends Error {
   constructor(mode: string) {
     super(`No binary data manager found for: ${mode}`);
   }
diff --git a/packages/core/src/BinaryData/types.ts b/packages/core/src/BinaryData/types.ts
index bedd024a06eb8..84c38401ee0d2 100644
--- a/packages/core/src/BinaryData/types.ts
+++ b/packages/core/src/BinaryData/types.ts
@@ -4,8 +4,10 @@ import type { BINARY_DATA_MODES } from './utils';
 export namespace BinaryData {
   export type Mode = (typeof BINARY_DATA_MODES)[number];
 
+  export type NonDefaultMode = Exclude<Mode, 'default'>;
+
   export type Config = {
-    mode: 'default' | 'filesystem';
+    mode: Mode;
     availableModes: string[];
     localStoragePath: string;
   };
@@ -37,17 +39,16 @@ export namespace BinaryData {
     getAsStream(fileId: string, chunkSize?: number): Promise<Readable>;
     getMetadata(fileId: string): Promise<Metadata>;
 
+    deleteMany(ids: IdsForDeletion): Promise<void>;
+
     copyByFileId(workflowId: string, executionId: string, sourceFileId: string): Promise<string>;
     copyByFilePath(
       workflowId: string,
       executionId: string,
-      filePath: string,
+      sourcePath: string,
       metadata: PreWriteMetadata,
     ): Promise<WriteResult>;
 
-    deleteOne(fileId: string): Promise<void>;
-    deleteMany(ids: IdsForDeletion): Promise<void>;
-
     rename(oldFileId: string, newFileId: string): Promise<void>;
   }
 }
diff --git a/packages/core/src/ObjectStore/ObjectStore.service.ee.ts b/packages/core/src/ObjectStore/ObjectStore.service.ee.ts
index 98d6f67f191ce..cb6c33737338b 100644
--- a/packages/core/src/ObjectStore/ObjectStore.service.ee.ts
+++ b/packages/core/src/ObjectStore/ObjectStore.service.ee.ts
@@ -4,31 +4,56 @@ import { createHash } from 'node:crypto';
 import axios from 'axios';
 import { Service } from 'typedi';
 import { sign } from 'aws4';
-import { isStream, parseXml } from './utils';
-import { ExternalStorageRequestFailed } from './errors';
+import { isStream, parseXml, writeBlockedMessage } from './utils';
+import { LoggerProxy as Logger } from 'n8n-workflow';
 
-import type { AxiosRequestConfig, Method } from 'axios';
+import type { AxiosRequestConfig, AxiosResponse, Method } from 'axios';
 import type { Request as Aws4Options, Credentials as Aws4Credentials } from 'aws4';
-import type { ListPage, ObjectStore, RawListPage } from './types';
+import type {
+  Bucket,
+  ConfigSchemaCredentials,
+  ListPage,
+  RawListPage,
+  RequestOptions,
+} from './types';
 import type { Readable } from 'stream';
 import type { BinaryData } from '..';
 
 @Service()
 export class ObjectStoreService {
-  private credentials: Aws4Credentials;
+  private host = '';
+
+  private bucket: Bucket = { region: '', name: '' };
+
+  private credentials: Aws4Credentials = { accessKeyId: '', secretAccessKey: '' };
+
+  private isReady = false;
+
+  private isReadOnly = false;
+
+  private logger = Logger;
+
+  async init(host: string, bucket: Bucket, credentials: ConfigSchemaCredentials) {
+    this.host = host;
+    this.bucket.name = bucket.name;
+    this.bucket.region = bucket.region;
 
-  constructor(
-    private bucket: { region: string; name: string },
-    credentials: { accountId: string; secretKey: string },
-  ) {
     this.credentials = {
-      accessKeyId: credentials.accountId,
-      secretAccessKey: credentials.secretKey,
+      accessKeyId: credentials.accessKey,
+      secretAccessKey: credentials.accessSecret,
     };
+
+    await this.checkConnection();
+
+    this.setReady(true);
+  }
+
+  setReadonly(newState: boolean) {
+    this.isReadOnly = newState;
   }
 
-  get host() {
-    return `${this.bucket.name}.s3.${this.bucket.region}.amazonaws.com`;
+  setReady(newState: boolean) {
+    this.isReady = newState;
   }
 
   /**
@@ -37,7 +62,9 @@ export class ObjectStoreService {
    * @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadBucket.html
    */
   async checkConnection() {
-    return this.request('HEAD', this.host);
+    if (this.isReady) return;
+
+    return this.request('HEAD', this.host, this.bucket.name);
   }
 
   /**
@@ -46,6 +73,8 @@ export class ObjectStoreService {
    * @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
    */
   async put(filename: string, buffer: Buffer, metadata: BinaryData.PreWriteMetadata = {}) {
+    if (this.isReadOnly) return this.blockWrite(filename);
+
     const headers: Record<string, string | number> = {
       'Content-Length': buffer.length,
       'Content-MD5': createHash('md5').update(buffer).digest('base64'),
@@ -54,7 +83,9 @@ export class ObjectStoreService {
     if (metadata.fileName) headers['x-amz-meta-filename'] = metadata.fileName;
     if (metadata.mimeType) headers['Content-Type'] = metadata.mimeType;
 
-    return this.request('PUT', this.host, `/${filename}`, { headers, body: buffer });
+    const path = `/${this.bucket.name}/${filename}`;
+
+    return this.request('PUT', this.host, path, { headers, body: buffer });
   }
 
   /**
@@ -62,9 +93,11 @@ export class ObjectStoreService {
    *
    * @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
    */
-  async get(path: string, { mode }: { mode: 'buffer' }): Promise<Buffer>;
-  async get(path: string, { mode }: { mode: 'stream' }): Promise<Readable>;
-  async get(path: string, { mode }: { mode: 'stream' | 'buffer' }) {
+  async get(fileId: string, { mode }: { mode: 'buffer' }): Promise<Buffer>;
+  async get(fileId: string, { mode }: { mode: 'stream' }): Promise<Readable>;
+  async get(fileId: string, { mode }: { mode: 'stream' | 'buffer' }) {
+    const path = `${this.bucket.name}/${fileId}`;
+
     const { data } = await this.request('GET', this.host, path, {
       responseType: mode === 'buffer' ? 'arraybuffer' : 'stream',
     });
@@ -81,27 +114,31 @@ export class ObjectStoreService {
    *
    * @doc https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html
    */
-  async getMetadata(path: string) {
+  async getMetadata(fileId: string) {
     type Response = {
       headers: {
         'content-length': string;
         'content-type'?: string;
         'x-amz-meta-filename'?: string;
-      } & Record<string, string>;
+      } & BinaryData.PreWriteMetadata;
     };
 
+    const path = `${this.bucket.name}/${fileId}`;
+
     const response: Response = await this.request('HEAD', this.host, path);
 
     return response.headers;
   }
 
   /**
-   * Delete an object in the configured bucket.
+   * Delete a single object in the configured bucket.
    *
    * @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
    */
-  async deleteOne(path: string) {
-    return this.request('DELETE', this.host, `/${encodeURIComponent(path)}`);
+  async deleteOne(fileId: string) {
+    const path = `${this.bucket.name}/${fileId}`;
+
+    return this.request('DELETE', this.host, path);
   }
 
   /**
@@ -122,13 +159,13 @@ export class ObjectStoreService {
       'Content-MD5': createHash('md5').update(body).digest('base64'),
     };
 
-    return this.request('POST', this.host, '/?delete', { headers, body });
+    const path = `${this.bucket.name}/?delete`;
+
+    return this.request('POST', this.host, path, { headers, body });
   }
 
   /**
    * List objects with a common prefix in the configured bucket.
-   *
-   * @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
    */
   async list(prefix: string) {
     const items = [];
@@ -149,16 +186,18 @@ export class ObjectStoreService {
   }
 
   /**
-   * Fetch a page of objects with a common prefix in the configured bucket. Max 1000 per page.
+   * Fetch a page of objects with a common prefix in the configured bucket.
+   *
+   * Max 1000 objects per page - set by AWS.
+   *
+   * @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
    */
   async getListPage(prefix: string, nextPageToken?: string) {
-    const bucketlessHost = this.host.split('.').slice(1).join('.');
-
     const qs: Record<string, string | number> = { 'list-type': 2, prefix };
 
     if (nextPageToken) qs['continuation-token'] = nextPageToken;
 
-    const { data } = await this.request('GET', bucketlessHost, `/${this.bucket.name}`, { qs });
+    const { data } = await this.request('GET', this.host, this.bucket.name, { qs });
 
     if (typeof data !== 'string') {
       throw new TypeError(`Expected XML string but received ${typeof data}`);
@@ -193,11 +232,19 @@ export class ObjectStoreService {
     return path.concat(`?${qsParams}`);
   }
 
-  private async request(
+  private async blockWrite(filename: string): Promise<AxiosResponse> {
+    const logMessage = writeBlockedMessage(filename);
+
+    this.logger.warn(logMessage);
+
+    return { status: 403, statusText: 'Forbidden', data: logMessage, headers: {}, config: {} };
+  }
+
+  private async request<T = unknown>(
     method: Method,
     host: string,
     rawPath = '',
-    { qs, headers, body, responseType }: ObjectStore.RequestOptions = {},
+    { qs, headers, body, responseType }: RequestOptions = {},
   ) {
     const path = this.toPath(rawPath, qs);
@@ -224,9 +271,17 @@ export class ObjectStoreService {
     if (responseType) config.responseType = responseType;
 
     try {
-      return await axios.request(config);
-    } catch (error) {
-      throw new ExternalStorageRequestFailed(error, config);
+      this.logger.debug('Sending request to S3', { config });
+
+      return await axios.request<T>(config);
+    } catch (e) {
+      const error = e instanceof Error ? e : new Error(`${e}`);
+
+      const message = `Request to S3 failed: ${error.message}`;
+
+      this.logger.error(message, { config });
+
+      throw new Error(message, { cause: { error, details: config } });
     }
   }
 }
diff --git a/packages/core/src/ObjectStore/errors.ts b/packages/core/src/ObjectStore/errors.ts
deleted file mode 100644
index 81da737d77ba0..0000000000000
--- a/packages/core/src/ObjectStore/errors.ts
+++ /dev/null
@@ -1,8 +0,0 @@
-import { AxiosRequestConfig } from 'axios';
-
-export class ExternalStorageRequestFailed extends Error {
-  constructor(error: unknown, details: AxiosRequestConfig) {
-    super('Request to external object storage failed');
-    this.cause = { error, details };
-  }
-}
diff --git a/packages/core/src/ObjectStore/types.ts b/packages/core/src/ObjectStore/types.ts
index b5ac6c96d1aa7..444630e924e94 100644
--- a/packages/core/src/ObjectStore/types.ts
+++ b/packages/core/src/ObjectStore/types.ts
@@ -22,11 +22,13 @@ type Item = {
 
 export type ListPage = Omit<RawListPage, 'contents'> & { contents: Item[] };
 
-export namespace ObjectStore {
-  export type RequestOptions = {
-    qs?: Record<string, string | number>;
-    headers?: Record<string, string | number>;
-    body?: string | Buffer;
-    responseType?: ResponseType;
-  };
-}
+export type Bucket = { region: string; name: string };
+
+export type RequestOptions = {
+  qs?: Record<string, string | number>;
+  headers?: Record<string, string | number>;
+  body?: string | Buffer;
+  responseType?: ResponseType;
+};
+
+export type ConfigSchemaCredentials = { accessKey: string; accessSecret: string };
diff --git a/packages/core/src/ObjectStore/utils.ts b/packages/core/src/ObjectStore/utils.ts
index 76dcb1f076b1d..1ecad915f9091 100644
--- a/packages/core/src/ObjectStore/utils.ts
+++ b/packages/core/src/ObjectStore/utils.ts
@@ -14,3 +14,7 @@ export async function parseXml<T>(xml: string): Promise<T> {
     valueProcessors: [parseNumbers, parseBooleans],
   }) as Promise<T>;
 }
+
+export function writeBlockedMessage(filename: string) {
+  return `Request to write file "${filename}" to object storage was blocked because S3 storage is not available with your current license. Please upgrade to a license that supports this feature, or set N8N_DEFAULT_BINARY_DATA_MODE to an option other than "s3".`;
+}
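// Example (sketch, not part of the diff): the read-only gate added above. Assumes
// an already-initialized ObjectStoreService registered in the DI container.
import Container from 'typedi';
import { ObjectStoreService } from 'n8n-core';

const objectStore = Container.get(ObjectStoreService);
objectStore.setReadonly(true); // what License.ts does when 'feat:binaryDataS3' is missing

// put() now short-circuits in blockWrite(): no HTTP request is sent, and a synthetic
// 403 response carrying writeBlockedMessage() is returned instead.
const res = await objectStore.put('some/file.txt', Buffer.from('data'));
console.log(res.status); // 403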
diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts
index 9fdb093baa574..3eaecf6581d84 100644
--- a/packages/core/src/index.ts
+++ b/packages/core/src/index.ts
@@ -17,3 +17,4 @@ export * from './WorkflowExecute';
 export { NodeExecuteFunctions, UserSettings };
 export * from './errors';
 export { ObjectStoreService } from './ObjectStore/ObjectStore.service.ee';
+export { BinaryData } from './BinaryData/types';
diff --git a/packages/core/test/ObjectStore.manager.test.ts b/packages/core/test/ObjectStore.manager.test.ts
new file mode 100644
index 0000000000000..a9f23102fbaa6
--- /dev/null
+++ b/packages/core/test/ObjectStore.manager.test.ts
@@ -0,0 +1,147 @@
+import fs from 'node:fs/promises';
+import { ObjectStoreManager } from '@/BinaryData/ObjectStore.manager';
+import { ObjectStoreService } from '@/ObjectStore/ObjectStore.service.ee';
+import { isStream } from '@/ObjectStore/utils';
+import { mockInstance, toStream } from './utils';
+
+jest.mock('fs/promises');
+
+const objectStoreService = mockInstance(ObjectStoreService);
+const objectStoreManager = new ObjectStoreManager(objectStoreService);
+
+const toFileId = (workflowId: string, executionId: string, fileUuid: string) =>
+  `workflows/${workflowId}/executions/${executionId}/binary_data/${fileUuid}`;
+
+const workflowId = 'ObogjVbqpNOQpiyV';
+const executionId = '999';
+const fileUuid = '71f6209b-5d48-41a2-a224-80d529d8bb32';
+const fileId = toFileId(workflowId, executionId, fileUuid);
+const prefix = `workflows/${workflowId}/executions/${executionId}/binary_data/`;
+
+const otherWorkflowId = 'FHio8ftV6SrCAfPJ';
+const otherExecutionId = '888';
+const otherFileUuid = '71f6209b-5d48-41a2-a224-80d529d8bb33';
+const otherFileId = toFileId(otherWorkflowId, otherExecutionId, otherFileUuid);
+
+const mockBuffer = Buffer.from('Test data');
+const mockStream = toStream(mockBuffer);
+
+beforeAll(() => {
+  jest.restoreAllMocks();
+});
+
+describe('store()', () => {
+  it('should store a buffer', async () => {
+    const metadata = { mimeType: 'text/plain' };
+
+    const result = await objectStoreManager.store(workflowId, executionId, mockBuffer, metadata);
+
+    expect(result.fileId.startsWith(prefix)).toBe(true);
+    expect(result.fileSize).toBe(mockBuffer.length);
+  });
+});
+
+describe('getPath()', () => {
+  it('should return a path', async () => {
+    const path = objectStoreManager.getPath(fileId);
+
+    expect(path).toBe(fileId);
+  });
+});
+
+describe('getAsBuffer()', () => {
+  it('should return a buffer', async () => {
+    // @ts-expect-error Overload signature seemingly causing the return type to be misinferred
+    objectStoreService.get.mockResolvedValue(mockBuffer);
+
+    const result = await objectStoreManager.getAsBuffer(fileId);
+
+    expect(Buffer.isBuffer(result)).toBe(true);
+    expect(objectStoreService.get).toHaveBeenCalledWith(fileId, { mode: 'buffer' });
+  });
+});
+
+describe('getAsStream()', () => {
+  it('should return a stream', async () => {
+    objectStoreService.get.mockResolvedValue(mockStream);
+
+    const stream = await objectStoreManager.getAsStream(fileId);
+
+    expect(isStream(stream)).toBe(true);
+    expect(objectStoreService.get).toHaveBeenCalledWith(fileId, { mode: 'stream' });
+  });
+});
+
+describe('getMetadata()', () => {
+  it('should return metadata', async () => {
+    const mimeType = 'text/plain';
+    const fileName = 'file.txt';
+
+    objectStoreService.getMetadata.mockResolvedValue({
+      'content-length': '1',
+      'content-type': mimeType,
+      'x-amz-meta-filename': fileName,
+    });
+
+    const metadata = await objectStoreManager.getMetadata(fileId);
+
+    expect(metadata).toEqual(expect.objectContaining({ fileSize: 1, mimeType, fileName }));
+    expect(objectStoreService.getMetadata).toHaveBeenCalledWith(fileId);
+  });
+});
+
+describe('copyByFileId()', () => {
+  it('should copy by file ID and return the file ID', async () => {
+    const targetFileId = await objectStoreManager.copyByFileId(workflowId, executionId, fileId);
+
+    expect(targetFileId.startsWith(prefix)).toBe(true);
+    expect(objectStoreService.get).toHaveBeenCalledWith(fileId, { mode: 'buffer' });
+  });
+});
+
+describe('copyByFilePath()', () => {
+  test('should copy by file path and return the file ID and size', async () => {
+    const sourceFilePath = 'path/to/file/in/filesystem';
+    const metadata = { mimeType: 'text/plain' };
+
+    fs.readFile = jest.fn().mockResolvedValue(mockBuffer);
+
+    const result = await objectStoreManager.copyByFilePath(
+      workflowId,
+      executionId,
+      sourceFilePath,
+      metadata,
+    );
+
+    expect(result.fileId.startsWith(prefix)).toBe(true);
+    expect(fs.readFile).toHaveBeenCalledWith(sourceFilePath);
+    expect(result.fileSize).toBe(mockBuffer.length);
+  });
+});
+
+describe('deleteMany()', () => {
+  it('should delete many files by prefix', async () => {
+    const ids = [
+      { workflowId, executionId },
+      { workflowId: otherWorkflowId, executionId: otherExecutionId },
+    ];
+
+    const promise = objectStoreManager.deleteMany(ids);
+
+    await expect(promise).resolves.not.toThrow();
+
+    expect(objectStoreService.deleteMany).toHaveBeenCalledTimes(2);
+  });
+});
+
+describe('rename()', () => {
+  it('should rename a file', async () => {
+    const promise = objectStoreManager.rename(fileId, otherFileId);
+
+    await expect(promise).resolves.not.toThrow();
+
+    expect(objectStoreService.get).toHaveBeenCalledWith(fileId, { mode: 'buffer' });
+    expect(objectStoreService.getMetadata).toHaveBeenCalledWith(fileId);
+    expect(objectStoreService.deleteOne).toHaveBeenCalledWith(fileId);
+  });
+});
ObjectStoreService(); + mockAxios.request.mockResolvedValueOnce({ status: 200 }); // for checkConnection + await objectStoreService.init(mockHost, mockBucket, mockCredentials); + jest.restoreAllMocks(); +}); + +describe('checkConnection()', () => { + it('should send a HEAD request to the correct host', async () => { + mockAxios.request.mockResolvedValue({ status: 200 }); + + objectStoreService.setReady(false); + + await objectStoreService.checkConnection(); + + expect(mockAxios.request).toHaveBeenCalledWith( + expect.objectContaining({ + method: 'HEAD', + url: `https://${mockHost}/${mockBucket.name}`, + headers: expect.objectContaining({ + 'X-Amz-Content-Sha256': expect.any(String), + 'X-Amz-Date': expect.any(String), + Authorization: expect.any(String), + }), + }), + ); + }); + + it('should throw an error on request failure', async () => { + objectStoreService.setReady(false); + + mockAxios.request.mockRejectedValue(mockError); + + const promise = objectStoreService.checkConnection(); + + await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE); + }); +}); + +describe('getMetadata()', () => { + it('should send a HEAD request to the correct host and path', async () => { + mockAxios.request.mockResolvedValue({ status: 200 }); + + await objectStoreService.getMetadata(fileId); + + expect(mockAxios.request).toHaveBeenCalledWith( + expect.objectContaining({ + method: 'HEAD', + url: `${mockUrl}/${fileId}`, + headers: expect.objectContaining({ + Host: mockHost, + 'X-Amz-Content-Sha256': expect.any(String), + 'X-Amz-Date': expect.any(String), + Authorization: expect.any(String), + }), + }), + ); + }); + + it('should throw an error on request failure', async () => { + mockAxios.request.mockRejectedValue(mockError); + + const promise = objectStoreService.getMetadata(fileId); + + await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE); + }); +}); + +describe('put()', () => { + it('should send a PUT request to upload an object', async () => { + const metadata = { fileName: 'file.txt', mimeType: 'text/plain' }; + + mockAxios.request.mockResolvedValue({ status: 200 }); + + await objectStoreService.put(fileId, mockBuffer, metadata); + + expect(mockAxios.request).toHaveBeenCalledWith( + expect.objectContaining({ + method: 'PUT', + url: `${mockUrl}/${fileId}`, + headers: expect.objectContaining({ + 'Content-Length': mockBuffer.length, + 'Content-MD5': expect.any(String), + 'x-amz-meta-filename': metadata.fileName, + 'Content-Type': metadata.mimeType, + }), + data: mockBuffer, + }), + ); + }); + + it('should block if read-only', async () => { + initLogger(); + objectStoreService.setReadonly(true); + + const metadata = { fileName: 'file.txt', mimeType: 'text/plain' }; + + const promise = objectStoreService.put(fileId, mockBuffer, metadata); + + await expect(promise).resolves.not.toThrow(); + + const result = await promise; + + expect(result.status).toBe(403); + expect(result.statusText).toBe('Forbidden'); + + expect(result.data).toBe(writeBlockedMessage(fileId)); + }); + + it('should throw an error on request failure', async () => { + const metadata = { fileName: 'file.txt', mimeType: 'text/plain' }; + + mockAxios.request.mockRejectedValue(mockError); + + const promise = objectStoreService.put(fileId, mockBuffer, metadata); + + await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE); + }); +}); + +describe('get()', () => { + it('should send a GET request to download an object as a buffer', async () => { + const fileId = 'file.txt'; + + 
mockAxios.request.mockResolvedValue({ status: 200, data: Buffer.from('Test content') }); + + const result = await objectStoreService.get(fileId, { mode: 'buffer' }); + + expect(mockAxios.request).toHaveBeenCalledWith( + expect.objectContaining({ + method: 'GET', + url: `${mockUrl}/${fileId}`, + responseType: 'arraybuffer', + }), + ); + + expect(Buffer.isBuffer(result)).toBe(true); + }); + + it('should send a GET request to download an object as a stream', async () => { + mockAxios.request.mockResolvedValue({ status: 200, data: new Readable() }); + + const result = await objectStoreService.get(fileId, { mode: 'stream' }); + + expect(mockAxios.request).toHaveBeenCalledWith( + expect.objectContaining({ + method: 'GET', + url: `${mockUrl}/${fileId}`, + responseType: 'stream', + }), + ); + + expect(result instanceof Readable).toBe(true); + }); + + it('should throw an error on request failure', async () => { + mockAxios.request.mockRejectedValue(mockError); + + const promise = objectStoreService.get(fileId, { mode: 'buffer' }); + + await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE); + }); +}); + +describe('deleteOne()', () => { + it('should send a DELETE request to delete a single object', async () => { + mockAxios.request.mockResolvedValue({ status: 204 }); + + await objectStoreService.deleteOne(fileId); + + expect(mockAxios.request).toHaveBeenCalledWith( + expect.objectContaining({ + method: 'DELETE', + url: `${mockUrl}/${fileId}`, + }), + ); + }); + + it('should throw an error on request failure', async () => { + mockAxios.request.mockRejectedValue(mockError); + + const promise = objectStoreService.deleteOne(fileId); + + await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE); + }); +}); + +describe('deleteMany()', () => { + it('should send a POST request to delete multiple objects', async () => { + const prefix = 'test-dir/'; + const fileName = 'file.txt'; + + const mockList = [ + { + key: fileName, + lastModified: '2023-09-24T12:34:56Z', + eTag: 'abc123def456', + size: 456789, + storageClass: 'STANDARD', + }, + ]; + + objectStoreService.list = jest.fn().mockResolvedValue(mockList); + + mockAxios.request.mockResolvedValue({ status: 204 }); + + await objectStoreService.deleteMany(prefix); + + expect(objectStoreService.list).toHaveBeenCalledWith(prefix); + expect(mockAxios.request).toHaveBeenCalledWith( + expect.objectContaining({ + method: 'POST', + url: `${mockUrl}/?delete`, + headers: expect.objectContaining({ + 'Content-Type': 'application/xml', + 'Content-Length': expect.any(Number), + 'Content-MD5': expect.any(String), + }), + data: toDeletionXml(fileName), + }), + ); + }); + + it('should throw an error on request failure', async () => { + mockAxios.request.mockRejectedValue(mockError); + + const promise = objectStoreService.deleteMany('test-dir/'); + + await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE); + }); +}); + +describe('list()', () => { + it('should list objects with a common prefix', async () => { + const prefix = 'test-dir/'; + + const mockListPage = { + contents: [{ key: `${prefix}file1.txt` }, { key: `${prefix}file2.txt` }], + isTruncated: false, + }; + + objectStoreService.getListPage = jest.fn().mockResolvedValue(mockListPage); + + mockAxios.request.mockResolvedValue({ status: 200 }); + + const result = await objectStoreService.list(prefix); + + expect(result).toEqual(mockListPage.contents); + }); + + it('should consolidate pages', async () => { + const prefix = 'test-dir/'; + + const mockFirstListPage = { + 
contents: [{ key: `${prefix}file1.txt` }],
+			isTruncated: true,
+			nextContinuationToken: 'token1',
+		};
+
+		const mockSecondListPage = {
+			contents: [{ key: `${prefix}file2.txt` }],
+			isTruncated: false,
+		};
+
+		objectStoreService.getListPage = jest
+			.fn()
+			.mockResolvedValueOnce(mockFirstListPage)
+			.mockResolvedValueOnce(mockSecondListPage);
+
+		mockAxios.request.mockResolvedValue({ status: 200 });
+
+		const result = await objectStoreService.list(prefix);
+
+		expect(result).toEqual([...mockFirstListPage.contents, ...mockSecondListPage.contents]);
+	});
+
+	it('should throw an error on request failure', async () => {
+		mockAxios.request.mockRejectedValue(mockError);
+
+		const promise = objectStoreService.list('test-dir/');
+
+		await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
+	});
+});
diff --git a/packages/core/test/ObjectStore.test.ts b/packages/core/test/ObjectStore.test.ts
deleted file mode 100644
index 9b13b253f41e8..0000000000000
--- a/packages/core/test/ObjectStore.test.ts
+++ /dev/null
@@ -1,301 +0,0 @@
-import axios from 'axios';
-import { ObjectStoreService } from '../src/ObjectStore/ObjectStore.service.ee';
-import { Readable } from 'stream';
-
-jest.mock('axios');
-
-const mockAxios = axios as jest.Mocked<typeof axios>;
-
-const MOCK_BUCKET = { region: 'us-east-1', name: 'test-bucket' };
-const MOCK_CREDENTIALS = { accountId: 'mock-account-id', secretKey: 'mock-secret-key' };
-const FAILED_REQUEST_ERROR_MESSAGE = 'Request to external object storage failed';
-const EXPECTED_HOST = `${MOCK_BUCKET.name}.s3.${MOCK_BUCKET.region}.amazonaws.com`;
-const MOCK_S3_ERROR = new Error('Something went wrong!');
-
-const toMultipleDeletionXml = (filename: string) => `<Delete>
-<Object><Key>${filename}</Key></Object>
-</Delete>`;
-
-describe('ObjectStoreService', () => {
-	let objectStoreService: ObjectStoreService;
-
-	beforeEach(() => {
-		objectStoreService = new ObjectStoreService(MOCK_BUCKET, MOCK_CREDENTIALS);
-		jest.restoreAllMocks();
-	});
-
-	describe('checkConnection()', () => {
-		it('should send a HEAD request to the correct host', async () => {
-			mockAxios.request.mockResolvedValue({ status: 200 });
-
-			await objectStoreService.checkConnection();
-
-			expect(mockAxios.request).toHaveBeenCalledWith(
-				expect.objectContaining({
-					method: 'HEAD',
-					url: `https://${EXPECTED_HOST}/`,
-					headers: expect.objectContaining({
-						'X-Amz-Content-Sha256': expect.any(String),
-						'X-Amz-Date': expect.any(String),
-						Authorization: expect.any(String),
-					}),
-				}),
-			);
-		});
-
-		it('should throw an error on request failure', async () => {
-			mockAxios.request.mockRejectedValue(MOCK_S3_ERROR);
-
-			const promise = objectStoreService.checkConnection();
-
-			await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
-		});
-	});
-
-	describe('getMetadata()', () => {
-		it('should send a HEAD request to the correct host and path', async () => {
-			const path = 'file.txt';
-
-			mockAxios.request.mockResolvedValue({ status: 200 });
-
-			await objectStoreService.getMetadata(path);
-
-			expect(mockAxios.request).toHaveBeenCalledWith(
-				expect.objectContaining({
-					method: 'HEAD',
-					url: `https://${EXPECTED_HOST}/${path}`,
-					headers: expect.objectContaining({
-						Host: EXPECTED_HOST,
-						'X-Amz-Content-Sha256': expect.any(String),
-						'X-Amz-Date': expect.any(String),
-						Authorization: expect.any(String),
-					}),
-				}),
-			);
-		});
-
-		it('should throw an error on request failure', async () => {
-			const path = 'file.txt';
-
-			mockAxios.request.mockRejectedValue(MOCK_S3_ERROR);
-
-			const promise = objectStoreService.getMetadata(path);
-
-			await 
expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE); - }); - }); - - describe('put()', () => { - it('should send a PUT request to upload an object', async () => { - const path = 'file.txt'; - const buffer = Buffer.from('Test content'); - const metadata = { fileName: path, mimeType: 'text/plain' }; - - mockAxios.request.mockResolvedValue({ status: 200 }); - - await objectStoreService.put(path, buffer, metadata); - - expect(mockAxios.request).toHaveBeenCalledWith( - expect.objectContaining({ - method: 'PUT', - url: `https://${EXPECTED_HOST}/${path}`, - headers: expect.objectContaining({ - 'Content-Length': buffer.length, - 'Content-MD5': expect.any(String), - 'x-amz-meta-filename': metadata.fileName, - 'Content-Type': metadata.mimeType, - }), - data: buffer, - }), - ); - }); - - it('should throw an error on request failure', async () => { - const path = 'file.txt'; - const buffer = Buffer.from('Test content'); - const metadata = { fileName: path, mimeType: 'text/plain' }; - - mockAxios.request.mockRejectedValue(MOCK_S3_ERROR); - - const promise = objectStoreService.put(path, buffer, metadata); - - await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE); - }); - }); - - describe('get()', () => { - it('should send a GET request to download an object as a buffer', async () => { - const path = 'file.txt'; - - mockAxios.request.mockResolvedValue({ status: 200, data: Buffer.from('Test content') }); - - const result = await objectStoreService.get(path, { mode: 'buffer' }); - - expect(mockAxios.request).toHaveBeenCalledWith( - expect.objectContaining({ - method: 'GET', - url: `https://${EXPECTED_HOST}/${path}`, - responseType: 'arraybuffer', - }), - ); - - expect(Buffer.isBuffer(result)).toBe(true); - }); - - it('should send a GET request to download an object as a stream', async () => { - const path = 'file.txt'; - - mockAxios.request.mockResolvedValue({ status: 200, data: new Readable() }); - - const result = await objectStoreService.get(path, { mode: 'stream' }); - - expect(mockAxios.request).toHaveBeenCalledWith( - expect.objectContaining({ - method: 'GET', - url: `https://${EXPECTED_HOST}/${path}`, - responseType: 'stream', - }), - ); - - expect(result instanceof Readable).toBe(true); - }); - - it('should throw an error on request failure', async () => { - const path = 'file.txt'; - - mockAxios.request.mockRejectedValue(MOCK_S3_ERROR); - - const promise = objectStoreService.get(path, { mode: 'buffer' }); - - await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE); - }); - }); - - describe('deleteOne()', () => { - it('should send a DELETE request to delete an object', async () => { - const path = 'file.txt'; - - mockAxios.request.mockResolvedValue({ status: 204 }); - - await objectStoreService.deleteOne(path); - - expect(mockAxios.request).toHaveBeenCalledWith( - expect.objectContaining({ - method: 'DELETE', - url: `https://${EXPECTED_HOST}/${path}`, - }), - ); - }); - - it('should throw an error on request failure', async () => { - const path = 'file.txt'; - - mockAxios.request.mockRejectedValue(MOCK_S3_ERROR); - - const promise = objectStoreService.deleteOne(path); - - await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE); - }); - }); - - describe('deleteMany()', () => { - it('should send a POST request to delete multiple objects', async () => { - const prefix = 'test-dir/'; - const fileName = 'file.txt'; - - const mockList = [ - { - key: fileName, - lastModified: '2023-09-24T12:34:56Z', - eTag: 'abc123def456', - size: 456789, 
-					storageClass: 'STANDARD',
-				},
-			];
-
-			objectStoreService.list = jest.fn().mockResolvedValue(mockList);
-
-			mockAxios.request.mockResolvedValue({ status: 204 });
-
-			await objectStoreService.deleteMany(prefix);
-
-			expect(mockAxios.request).toHaveBeenCalledWith(
-				expect.objectContaining({
-					method: 'POST',
-					url: `https://${EXPECTED_HOST}/?delete`,
-					headers: expect.objectContaining({
-						'Content-Type': 'application/xml',
-						'Content-Length': expect.any(Number),
-						'Content-MD5': expect.any(String),
-					}),
-					data: toMultipleDeletionXml(fileName),
-				}),
-			);
-		});
-
-		it('should throw an error on request failure', async () => {
-			const prefix = 'test-dir/';
-
-			mockAxios.request.mockRejectedValue(MOCK_S3_ERROR);
-
-			const promise = objectStoreService.deleteMany(prefix);
-
-			await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
-		});
-	});
-
-	describe('list()', () => {
-		it('should list objects with a common prefix', async () => {
-			const prefix = 'test-dir/';
-
-			const mockListPage = {
-				contents: [{ key: `${prefix}file1.txt` }, { key: `${prefix}file2.txt` }],
-				isTruncated: false,
-			};
-
-			objectStoreService.getListPage = jest.fn().mockResolvedValue(mockListPage);
-
-			mockAxios.request.mockResolvedValue({ status: 200 });
-
-			const result = await objectStoreService.list(prefix);
-
-			expect(result).toEqual(mockListPage.contents);
-		});
-
-		it('should consolidate pages', async () => {
-			const prefix = 'test-dir/';
-
-			const mockFirstListPage = {
-				contents: [{ key: `${prefix}file1.txt` }],
-				isTruncated: true,
-				nextContinuationToken: 'token1',
-			};
-
-			const mockSecondListPage = {
-				contents: [{ key: `${prefix}file2.txt` }],
-				isTruncated: false,
-			};
-
-			objectStoreService.getListPage = jest
-				.fn()
-				.mockResolvedValueOnce(mockFirstListPage)
-				.mockResolvedValueOnce(mockSecondListPage);
-
-			mockAxios.request.mockResolvedValue({ status: 200 });
-
-			const result = await objectStoreService.list(prefix);
-
-			expect(result).toEqual([...mockFirstListPage.contents, ...mockSecondListPage.contents]);
-		});
-
-		it('should throw an error on request failure', async () => {
-			const prefix = 'test-dir/';
-
-			mockAxios.request.mockRejectedValue(MOCK_S3_ERROR);
-
-			const promise = objectStoreService.list(prefix);
-
-			await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
-		});
-	});
-});
diff --git a/packages/core/test/utils.ts b/packages/core/test/utils.ts
new file mode 100644
index 0000000000000..09e2a859bf006
--- /dev/null
+++ b/packages/core/test/utils.ts
@@ -0,0 +1,22 @@
+import { Container } from 'typedi';
+import { mock } from 'jest-mock-extended';
+import { Duplex } from 'stream';
+
+import type { DeepPartial } from 'ts-essentials';
+
+export const mockInstance = <T>(
+	constructor: new (...args: unknown[]) => T,
+	data: DeepPartial<T> | undefined = undefined,
+) => {
+	const instance = mock<T>(data);
+	Container.set(constructor, instance);
+	return instance;
+};
+
+export function toStream(buffer: Buffer) {
+	const duplexStream = new Duplex();
+	duplexStream.push(buffer);
+	duplexStream.push(null);
+
+	return duplexStream;
+}
diff --git a/packages/editor-ui/src/stores/workflows.store.ts b/packages/editor-ui/src/stores/workflows.store.ts
index 093e714905fbf..e41696f9c5d7a 100644
--- a/packages/editor-ui/src/stores/workflows.store.ts
+++ b/packages/editor-ui/src/stores/workflows.store.ts
@@ -1385,16 +1385,17 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, {
	},
	// Binary data
	getBinaryUrl(
-		dataPath: string,
-		mode: 'view' | 'download',
+		
binaryDataId: string, + action: 'view' | 'download', fileName: string, mimeType: string, ): string { const rootStore = useRootStore(); let restUrl = rootStore.getRestUrl; if (restUrl.startsWith('/')) restUrl = window.location.origin + restUrl; - const url = new URL(`${restUrl}/data/${dataPath}`); - url.searchParams.append('mode', mode); + const url = new URL(`${restUrl}/data`); + url.searchParams.append('id', binaryDataId); + url.searchParams.append('action', action); if (fileName) url.searchParams.append('fileName', fileName); if (mimeType) url.searchParams.append('mimeType', mimeType); return url.toString(); diff --git a/packages/nodes-base/test/nodes/Helpers.ts b/packages/nodes-base/test/nodes/Helpers.ts index 6735223e3fdb3..905a02ba81568 100644 --- a/packages/nodes-base/test/nodes/Helpers.ts +++ b/packages/nodes-base/test/nodes/Helpers.ts @@ -219,7 +219,11 @@ export function createTemporaryDir(prefix = 'n8n') { export async function initBinaryDataService(mode: 'default' | 'filesystem' = 'default') { const binaryDataService = new BinaryDataService(); - await binaryDataService.init({ mode: 'default', availableModes: [mode] }); + await binaryDataService.init({ + mode: 'default', + availableModes: [mode], + localStoragePath: createTemporaryDir(), + }); Container.set(BinaryDataService, binaryDataService); }
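
Reviewer note: a minimal sketch (not part of this diff) of how the two new helpers in `packages/core/test/utils.ts` are meant to compose in a test. `BinaryDataService` and its `getAsStream()` method are taken from the changes above; the stubbed content is illustrative.

```ts
import { BinaryDataService } from 'n8n-core';
import { mockInstance, toStream } from './utils';

// Register a deep mock in the typedi container, so any code resolving
// BinaryDataService via Container.get() receives this mock.
const binaryDataService = mockInstance(BinaryDataService);

// Stub a streamed download: toStream wraps a Buffer in a Duplex that
// pushes the content once and then ends the stream.
binaryDataService.getAsStream.mockResolvedValue(toStream(Buffer.from('test content')));
```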