From 0847623f85192232d129778ab4295be3cd685877 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 10 Oct 2023 10:06:06 +0200 Subject: [PATCH] feat(core): Switch binary filesystem mode to nested path structure (#7307) Depends on #7253 | Story: [PAY-863](https://linear.app/n8n/issue/PAY-863/switch-binary-filesystem-mode-to-nested-path-structure) This PR introduces `filesystem-v2` to store binary data in the filesystem in the same format as `s3`. --- .../restoreBinaryDataId.ts | 22 +-- .../cli/src/workflows/workflows.services.ts | 7 + .../cli/test/unit/execution.lifecycle.test.ts | 169 +++++------------ .../core/src/BinaryData/BinaryData.service.ts | 18 +- .../core/src/BinaryData/FileSystem.manager.ts | 46 ++++- packages/core/src/BinaryData/errors.ts | 8 +- packages/core/src/BinaryData/types.ts | 26 ++- packages/core/src/BinaryData/utils.ts | 20 +- packages/core/src/index.ts | 2 +- packages/core/test/FileSystem.manager.test.ts | 172 ++++++++++++++++++ .../core/test/NodeExecuteFunctions.test.ts | 4 +- .../core/test/ObjectStore.manager.test.ts | 5 +- packages/core/test/utils.ts | 3 + 13 files changed, 321 insertions(+), 181 deletions(-) create mode 100644 packages/core/test/FileSystem.manager.test.ts diff --git a/packages/cli/src/executionLifecycleHooks/restoreBinaryDataId.ts b/packages/cli/src/executionLifecycleHooks/restoreBinaryDataId.ts index 9870f365fdc48..66c5c91b4f2f7 100644 --- a/packages/cli/src/executionLifecycleHooks/restoreBinaryDataId.ts +++ b/packages/cli/src/executionLifecycleHooks/restoreBinaryDataId.ts @@ -3,23 +3,14 @@ import { BinaryDataService } from 'n8n-core'; import type { IRun } from 'n8n-workflow'; import type { BinaryData } from 'n8n-core'; -export function isMissingExecutionId( - fileId: string, - mode: BinaryData.NonDefaultMode, - uuidV4CharLength = 36, -) { - return mode === 'filesystem' ? uuidV4CharLength === fileId.length : fileId.includes('/temp/'); -} - /** * Whenever the execution ID is not available to the binary data service at the * time of writing a binary data file, its name is missing the execution ID. - * * This function restores the ID in the file name and run data reference. * * ```txt - * filesystem:11869055-83c4-4493-876a-9092c4708b9b -> - * filesystem:39011869055-83c4-4493-876a-9092c4708b9b + * filesystem-v2:workflows/123/executions/temp/binary_data/69055-83c4-4493-876a-9092c4708b9b -> + * filesystem-v2:workflows/123/executions/390/binary_data/69055-83c4-4493-876a-9092c4708b9b * * s3:workflows/123/executions/temp/binary_data/69055-83c4-4493-876a-9092c4708b9b -> * s3:workflows/123/executions/390/binary_data/69055-83c4-4493-876a-9092c4708b9b @@ -33,12 +24,13 @@ export async function restoreBinaryDataId(run: IRun, executionId: string) { if (!binaryDataId) return; - const [mode, fileId] = binaryDataId.split(':') as [BinaryData.NonDefaultMode, string]; + const [mode, fileId] = binaryDataId.split(':') as [BinaryData.StoredMode, string]; + + const isMissingExecutionId = fileId.includes('/temp/'); - if (!isMissingExecutionId(fileId, mode)) return; + if (!isMissingExecutionId) return; - const correctFileId = - mode === 'filesystem' ? 
`${executionId}${fileId}` : fileId.replace('temp', executionId); + const correctFileId = fileId.replace('temp', executionId); await Container.get(BinaryDataService).rename(fileId, correctFileId); diff --git a/packages/cli/src/workflows/workflows.services.ts b/packages/cli/src/workflows/workflows.services.ts index 44a3ef3fcf0a1..c6115a1f3de07 100644 --- a/packages/cli/src/workflows/workflows.services.ts +++ b/packages/cli/src/workflows/workflows.services.ts @@ -35,6 +35,7 @@ import { OwnershipService } from '@/services/ownership.service'; import { isStringArray, isWorkflowIdValid } from '@/utils'; import { isWorkflowHistoryLicensed } from './workflowHistory/workflowHistoryHelper.ee'; import { WorkflowHistoryService } from './workflowHistory/workflowHistory.service.ee'; +import { BinaryDataService } from 'n8n-core'; export class WorkflowsService { static async getSharing( @@ -463,7 +464,13 @@ export class WorkflowsService { await Container.get(ActiveWorkflowRunner).remove(workflowId); } + const idsForDeletion = await Db.collections.Execution.find({ + select: ['id'], + where: { workflowId }, + }).then((rows) => rows.map(({ id: executionId }) => ({ workflowId, executionId }))); + await Db.collections.Workflow.delete(workflowId); + await Container.get(BinaryDataService).deleteMany(idsForDeletion); void Container.get(InternalHooks).onWorkflowDeleted(user, workflowId, false); await Container.get(ExternalHooks).run('workflow.afterDelete', [workflowId]); diff --git a/packages/cli/test/unit/execution.lifecycle.test.ts b/packages/cli/test/unit/execution.lifecycle.test.ts index b121d739caaac..611c10496d914 100644 --- a/packages/cli/test/unit/execution.lifecycle.test.ts +++ b/packages/cli/test/unit/execution.lifecycle.test.ts @@ -30,148 +30,77 @@ function getDataId(run: IRun, kind: 'binary' | 'json') { const binaryDataService = mockInstance(BinaryDataService); -describe('on filesystem mode', () => { - describe('restoreBinaryDataId()', () => { - beforeAll(() => { - config.set('binaryDataManager.mode', 'filesystem'); - }); - - afterEach(() => { - jest.clearAllMocks(); - }); - - it('should restore if binary data ID is missing execution ID', async () => { - const executionId = '999'; - const incorrectFileId = 'a5c3f1ed-9d59-4155-bc68-9a370b3c51f6'; - const run = toIRun({ - binary: { - data: { id: `filesystem:${incorrectFileId}` }, - }, +for (const mode of ['filesystem-v2', 's3'] as const) { + describe(`on ${mode} mode`, () => { + describe('restoreBinaryDataId()', () => { + beforeAll(() => { + config.set('binaryDataManager.mode', mode); }); - await restoreBinaryDataId(run, executionId); + afterEach(() => { + jest.clearAllMocks(); + }); - const correctFileId = `${executionId}${incorrectFileId}`; - const correctBinaryDataId = `filesystem:${correctFileId}`; + it('should restore if binary data ID is missing execution ID', async () => { + const workflowId = '6HYhhKmJch2cYxGj'; + const executionId = 'temp'; + const binaryDataFileUuid = 'a5c3f1ed-9d59-4155-bc68-9a370b3c51f6'; - expect(binaryDataService.rename).toHaveBeenCalledWith(incorrectFileId, correctFileId); - expect(getDataId(run, 'binary')).toBe(correctBinaryDataId); - }); + const incorrectFileId = `workflows/${workflowId}/executions/temp/binary_data/${binaryDataFileUuid}`; - it('should do nothing if binary data ID is not missing execution ID', async () => { - const executionId = '999'; - const fileId = `${executionId}a5c3f1ed-9d59-4155-bc68-9a370b3c51f6`; - const binaryDataId = `filesystem:${fileId}`; - const run = toIRun({ - binary: { - data: { - id: 
binaryDataId, + const run = toIRun({ + binary: { + data: { id: `s3:${incorrectFileId}` }, }, - }, - }); + }); - await restoreBinaryDataId(run, executionId); + await restoreBinaryDataId(run, executionId); - expect(binaryDataService.rename).not.toHaveBeenCalled(); - expect(getDataId(run, 'binary')).toBe(binaryDataId); - }); + const correctFileId = incorrectFileId.replace('temp', executionId); + const correctBinaryDataId = `s3:${correctFileId}`; - it('should do nothing if no binary data ID', async () => { - const executionId = '999'; - const dataId = '123'; - const run = toIRun({ - json: { - data: { id: dataId }, - }, + expect(binaryDataService.rename).toHaveBeenCalledWith(incorrectFileId, correctFileId); + expect(getDataId(run, 'binary')).toBe(correctBinaryDataId); }); - await restoreBinaryDataId(run, executionId); + it('should do nothing if binary data ID is not missing execution ID', async () => { + const workflowId = '6HYhhKmJch2cYxGj'; + const executionId = '999'; + const binaryDataFileUuid = 'a5c3f1ed-9d59-4155-bc68-9a370b3c51f6'; - expect(binaryDataService.rename).not.toHaveBeenCalled(); - expect(getDataId(run, 'json')).toBe(dataId); - }); - }); -}); - -describe('on s3 mode', () => { - describe('restoreBinaryDataId()', () => { - beforeAll(() => { - config.set('binaryDataManager.mode', 's3'); - }); + const fileId = `workflows/${workflowId}/executions/${executionId}/binary_data/${binaryDataFileUuid}`; - afterEach(() => { - jest.clearAllMocks(); - }); + const binaryDataId = `s3:${fileId}`; - it('should restore if binary data ID is missing execution ID', async () => { - const workflowId = '6HYhhKmJch2cYxGj'; - const executionId = 'temp'; - const binaryDataFileUuid = 'a5c3f1ed-9d59-4155-bc68-9a370b3c51f6'; + const run = toIRun({ + binary: { + data: { + id: binaryDataId, + }, + }, + }); - const incorrectFileId = `workflows/${workflowId}/executions/temp/binary_data/${binaryDataFileUuid}`; + await restoreBinaryDataId(run, executionId); - const run = toIRun({ - binary: { - data: { id: `s3:${incorrectFileId}` }, - }, + expect(binaryDataService.rename).not.toHaveBeenCalled(); + expect(getDataId(run, 'binary')).toBe(binaryDataId); }); - await restoreBinaryDataId(run, executionId); - - const correctFileId = incorrectFileId.replace('temp', executionId); - const correctBinaryDataId = `s3:${correctFileId}`; + it('should do nothing if no binary data ID', async () => { + const executionId = '999'; + const dataId = '123'; - expect(binaryDataService.rename).toHaveBeenCalledWith(incorrectFileId, correctFileId); - expect(getDataId(run, 'binary')).toBe(correctBinaryDataId); - }); - - it('should do nothing if binary data ID is not missing execution ID', async () => { - const workflowId = '6HYhhKmJch2cYxGj'; - const executionId = '999'; - const binaryDataFileUuid = 'a5c3f1ed-9d59-4155-bc68-9a370b3c51f6'; - - const fileId = `workflows/${workflowId}/executions/${executionId}/binary_data/${binaryDataFileUuid}`; - - const binaryDataId = `s3:${fileId}`; - - const run = toIRun({ - binary: { - data: { - id: binaryDataId, + const run = toIRun({ + json: { + data: { id: dataId }, }, - }, - }); - - await restoreBinaryDataId(run, executionId); - - expect(binaryDataService.rename).not.toHaveBeenCalled(); - expect(getDataId(run, 'binary')).toBe(binaryDataId); - }); + }); - it('should do nothing if no binary data ID', async () => { - const executionId = '999'; - const dataId = '123'; + await restoreBinaryDataId(run, executionId); - const run = toIRun({ - json: { - data: { id: dataId }, - }, + 
expect(binaryDataService.rename).not.toHaveBeenCalled(); + expect(getDataId(run, 'json')).toBe(dataId); }); - - await restoreBinaryDataId(run, executionId); - - expect(binaryDataService.rename).not.toHaveBeenCalled(); - expect(getDataId(run, 'json')).toBe(dataId); }); }); - - it('should do nothing on itemless case', async () => { - const executionId = '999'; - - const promise = restoreBinaryDataId(toIRun(), executionId); - - await expect(promise).resolves.not.toThrow(); - - expect(binaryDataService.rename).not.toHaveBeenCalled(); - }); -}); +} diff --git a/packages/core/src/BinaryData/BinaryData.service.ts b/packages/core/src/BinaryData/BinaryData.service.ts index e54f24ba9d82f..393e42b4d6f8a 100644 --- a/packages/core/src/BinaryData/BinaryData.service.ts +++ b/packages/core/src/BinaryData/BinaryData.service.ts @@ -4,8 +4,8 @@ import { readFile, stat } from 'node:fs/promises'; import prettyBytes from 'pretty-bytes'; import Container, { Service } from 'typedi'; import { BINARY_ENCODING, LoggerProxy as Logger, IBinaryData } from 'n8n-workflow'; -import { UnknownBinaryDataManagerError, InvalidBinaryDataModeError } from './errors'; -import { areValidModes, toBuffer } from './utils'; +import { UnknownManagerError, InvalidModeError } from './errors'; +import { areConfigModes, toBuffer } from './utils'; import { LogCatch } from '../decorators/LogCatch.decorator'; import type { Readable } from 'stream'; @@ -14,21 +14,20 @@ import type { INodeExecutionData } from 'n8n-workflow'; @Service() export class BinaryDataService { - private mode: BinaryData.Mode = 'default'; + private mode: BinaryData.ServiceMode = 'default'; private managers: Record = {}; async init(config: BinaryData.Config) { - if (!areValidModes(config.availableModes)) { - throw new InvalidBinaryDataModeError(); - } + if (!areConfigModes(config.availableModes)) throw new InvalidModeError(); - this.mode = config.mode; + this.mode = config.mode === 'filesystem' ? 'filesystem-v2' : config.mode; if (config.availableModes.includes('filesystem')) { const { FileSystemManager } = await import('./FileSystem.manager'); this.managers.filesystem = new FileSystemManager(config.localStoragePath); + this.managers['filesystem-v2'] = this.managers.filesystem; await this.managers.filesystem.init(); } @@ -200,9 +199,6 @@ export class BinaryDataService { // private methods // ---------------------------------- - /** - * Create an identifier `${mode}:{fileId}` for `IBinaryData['id']`. 
- */ private createBinaryDataId(fileId: string) { return `${this.mode}:${fileId}`; } @@ -253,6 +249,6 @@ export class BinaryDataService { if (manager) return manager; - throw new UnknownBinaryDataManagerError(mode); + throw new UnknownManagerError(mode); } } diff --git a/packages/core/src/BinaryData/FileSystem.manager.ts b/packages/core/src/BinaryData/FileSystem.manager.ts index 9fa6688d678fa..a6264338d5283 100644 --- a/packages/core/src/BinaryData/FileSystem.manager.ts +++ b/packages/core/src/BinaryData/FileSystem.manager.ts @@ -3,7 +3,7 @@ import fs from 'node:fs/promises'; import path from 'node:path'; import { v4 as uuid } from 'uuid'; import { jsonParse } from 'n8n-workflow'; -import { ensureDirExists } from './utils'; +import { assertDir } from './utils'; import { FileNotFoundError } from '../errors'; import type { Readable } from 'stream'; @@ -16,7 +16,7 @@ export class FileSystemManager implements BinaryData.Manager { constructor(private storagePath: string) {} async init() { - await ensureDirExists(this.storagePath); + await assertDir(this.storagePath); } async store( @@ -28,6 +28,8 @@ export class FileSystemManager implements BinaryData.Manager { const fileId = this.toFileId(workflowId, executionId); const filePath = this.resolvePath(fileId); + await assertDir(path.dirname(filePath)); + await fs.writeFile(filePath, bufferOrStream); const fileSize = await this.getSize(fileId); @@ -64,6 +66,10 @@ export class FileSystemManager implements BinaryData.Manager { } async deleteMany(ids: BinaryData.IdsForDeletion) { + if (ids.length === 0) return; + + // binary files stored in single dir - `filesystem` + const executionIds = ids.map((o) => o.executionId); const set = new Set(executionIds); @@ -78,6 +84,18 @@ export class FileSystemManager implements BinaryData.Manager { await Promise.all([fs.rm(filePath), fs.rm(`${filePath}.metadata`)]); } } + + // binary files stored in nested dirs - `filesystem-v2` + + const binaryDataDirs = ids.map(({ workflowId, executionId }) => + this.resolvePath(`workflows/${workflowId}/executions/${executionId}/binary_data/`), + ); + + await Promise.all( + binaryDataDirs.map(async (dir) => { + await fs.rm(dir, { recursive: true }); + }), + ); } async copyByFilePath( @@ -89,6 +107,8 @@ export class FileSystemManager implements BinaryData.Manager { const targetFileId = this.toFileId(workflowId, executionId); const targetPath = this.resolvePath(targetFileId); + await assertDir(path.dirname(targetPath)); + await fs.cp(sourcePath, targetPath); const fileSize = await this.getSize(targetFileId); @@ -103,6 +123,8 @@ export class FileSystemManager implements BinaryData.Manager { const sourcePath = this.resolvePath(sourceFileId); const targetPath = this.resolvePath(targetFileId); + await assertDir(path.dirname(targetPath)); + await fs.copyFile(sourcePath, targetPath); return targetFileId; @@ -112,10 +134,17 @@ export class FileSystemManager implements BinaryData.Manager { const oldPath = this.resolvePath(oldFileId); const newPath = this.resolvePath(newFileId); + await assertDir(path.dirname(newPath)); + await Promise.all([ fs.rename(oldPath, newPath), fs.rename(`${oldPath}.metadata`, `${newPath}.metadata`), ]); + + const [tempDirParent] = oldPath.split('/temp/'); + const tempDir = path.join(tempDirParent, 'temp'); + + await fs.rm(tempDir, { recursive: true }); } // ---------------------------------- @@ -123,12 +152,15 @@ export class FileSystemManager implements BinaryData.Manager { // ---------------------------------- /** - * @tech_debt The `workflowId` argument is for 
compatibility with the - * `BinaryData.Manager` interface. Unused here until we refactor - * how we store binary data files in the `/binaryData` dir. + * Generate an ID for a binary data file. + * + * The legacy ID format `{executionId}{uuid}` for `filesystem` mode is + * no longer used on write, only when reading old stored execution data. */ - private toFileId(_workflowId: string, executionId: string) { - return [executionId, uuid()].join(''); + private toFileId(workflowId: string, executionId: string) { + if (!executionId) executionId = 'temp'; // missing only in edge case, see PR #7244 + + return `workflows/${workflowId}/executions/${executionId}/binary_data/${uuid()}`; } private resolvePath(...args: string[]) { diff --git a/packages/core/src/BinaryData/errors.ts b/packages/core/src/BinaryData/errors.ts index 6ff1a637c7666..31b3adc1e4599 100644 --- a/packages/core/src/BinaryData/errors.ts +++ b/packages/core/src/BinaryData/errors.ts @@ -1,10 +1,10 @@ -import { BINARY_DATA_MODES } from './utils'; +import { CONFIG_MODES } from './utils'; -export class InvalidBinaryDataModeError extends Error { - message = `Invalid binary data mode. Valid modes: ${BINARY_DATA_MODES.join(', ')}`; +export class InvalidModeError extends Error { + message = `Invalid binary data mode. Valid modes: ${CONFIG_MODES.join(', ')}`; } -export class UnknownBinaryDataManagerError extends Error { +export class UnknownManagerError extends Error { constructor(mode: string) { super(`No binary data manager found for: ${mode}`); } diff --git a/packages/core/src/BinaryData/types.ts b/packages/core/src/BinaryData/types.ts index 84c38401ee0d2..2067d90c276c0 100644 --- a/packages/core/src/BinaryData/types.ts +++ b/packages/core/src/BinaryData/types.ts @@ -1,14 +1,30 @@ import type { Readable } from 'stream'; -import type { BINARY_DATA_MODES } from './utils'; export namespace BinaryData { - export type Mode = (typeof BINARY_DATA_MODES)[number]; + type LegacyMode = 'filesystem'; - export type NonDefaultMode = Exclude; + type UpgradedMode = 'filesystem-v2'; + + /** + * Binary data mode selectable by user via env var config. + */ + export type ConfigMode = 'default' | 'filesystem' | 's3'; + + /** + * Binary data mode used internally by binary data service. User-selected + * legacy modes are replaced with upgraded modes. + */ + export type ServiceMode = Exclude | UpgradedMode; + + /** + * Binary data mode in binary data ID in stored execution data. Both legacy + * and upgraded modes may be present, except default in-memory mode. 
+ */ + export type StoredMode = Exclude; export type Config = { - mode: Mode; - availableModes: string[]; + mode: ConfigMode; + availableModes: ConfigMode[]; localStoragePath: string; }; diff --git a/packages/core/src/BinaryData/utils.ts b/packages/core/src/BinaryData/utils.ts index 96eeb9fbfe5b9..14a9b457886c0 100644 --- a/packages/core/src/BinaryData/utils.ts +++ b/packages/core/src/BinaryData/utils.ts @@ -3,23 +3,19 @@ import type { Readable } from 'node:stream'; import type { BinaryData } from './types'; import concatStream from 'concat-stream'; -/** - * Modes for storing binary data: - * - `default` (in memory) - * - `filesystem` (on disk) - * - `s3` (S3-compatible storage) - */ -export const BINARY_DATA_MODES = ['default', 'filesystem', 's3'] as const; +export const CONFIG_MODES = ['default', 'filesystem', 's3'] as const; -export function areValidModes(modes: string[]): modes is BinaryData.Mode[] { - return modes.every((m) => BINARY_DATA_MODES.includes(m as BinaryData.Mode)); +const STORED_MODES = ['filesystem', 'filesystem-v2', 's3'] as const; + +export function areConfigModes(modes: string[]): modes is BinaryData.ConfigMode[] { + return modes.every((m) => CONFIG_MODES.includes(m as BinaryData.ConfigMode)); } -export function isValidNonDefaultMode(mode: string): mode is BinaryData.NonDefaultMode { - return BINARY_DATA_MODES.filter((m) => m !== 'default').includes(mode as BinaryData.Mode); +export function isStoredMode(mode: string): mode is BinaryData.StoredMode { + return STORED_MODES.includes(mode as BinaryData.StoredMode); } -export async function ensureDirExists(dir: string) { +export async function assertDir(dir: string) { try { await fs.access(dir); } catch { diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 3b39ec9bad709..0325dcfdb8276 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -18,4 +18,4 @@ export { NodeExecuteFunctions, UserSettings }; export * from './errors'; export { ObjectStoreService } from './ObjectStore/ObjectStore.service.ee'; export { BinaryData } from './BinaryData/types'; -export { isValidNonDefaultMode } from './BinaryData/utils'; +export { isStoredMode as isValidNonDefaultMode } from './BinaryData/utils'; diff --git a/packages/core/test/FileSystem.manager.test.ts b/packages/core/test/FileSystem.manager.test.ts new file mode 100644 index 0000000000000..3f72a40a54081 --- /dev/null +++ b/packages/core/test/FileSystem.manager.test.ts @@ -0,0 +1,172 @@ +import path from 'node:path'; +import fs from 'node:fs'; +import fsp from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { FileSystemManager } from '@/BinaryData/FileSystem.manager'; +import { isStream } from '@/ObjectStore/utils'; +import { toFileId, toStream } from './utils'; + +jest.mock('fs'); +jest.mock('fs/promises'); + +const storagePath = tmpdir(); + +const fsManager = new FileSystemManager(storagePath); + +const toFullFilePath = (fileId: string) => path.join(storagePath, fileId); + +const workflowId = 'ObogjVbqpNOQpiyV'; +const executionId = '999'; +const fileUuid = '71f6209b-5d48-41a2-a224-80d529d8bb32'; +const fileId = toFileId(workflowId, executionId, fileUuid); + +const otherWorkflowId = 'FHio8ftV6SrCAfPJ'; +const otherExecutionId = '888'; +const otherFileUuid = '71f6209b-5d48-41a2-a224-80d529d8bb33'; +const otherFileId = toFileId(otherWorkflowId, otherExecutionId, otherFileUuid); + +const mockBuffer = Buffer.from('Test data'); +const mockStream = toStream(mockBuffer); + +afterAll(() => { + jest.restoreAllMocks(); +}); + 
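+// `fs` and `fs/promises` are mocked above, so these tests never hit the real
+// filesystem. The file IDs built via `toFileId` follow the nested `filesystem-v2`
+// layout `workflows/{workflowId}/executions/{executionId}/binary_data/{uuid}`.
+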
+describe('store()', () => { + it('should store a buffer', async () => { + const metadata = { mimeType: 'text/plain' }; + + const result = await fsManager.store(workflowId, executionId, mockBuffer, metadata); + + expect(result.fileSize).toBe(mockBuffer.length); + }); +}); + +describe('getPath()', () => { + it('should return a path', async () => { + const filePath = fsManager.getPath(fileId); + + expect(filePath).toBe(toFullFilePath(fileId)); + }); +}); + +describe('getAsBuffer()', () => { + it('should return a buffer', async () => { + fsp.readFile = jest.fn().mockResolvedValue(mockBuffer); + + const result = await fsManager.getAsBuffer(fileId); + + expect(Buffer.isBuffer(result)).toBe(true); + expect(fsp.readFile).toHaveBeenCalledWith(toFullFilePath(fileId)); + }); +}); + +describe('getAsStream()', () => { + it('should return a stream', async () => { + fs.createReadStream = jest.fn().mockReturnValue(mockStream); + + const stream = await fsManager.getAsStream(fileId); + + expect(isStream(stream)).toBe(true); + expect(fs.createReadStream).toHaveBeenCalledWith(toFullFilePath(fileId), { + highWaterMark: undefined, + }); + }); +}); + +describe('getMetadata()', () => { + it('should return metadata', async () => { + const mimeType = 'text/plain'; + const fileName = 'file.txt'; + + fsp.readFile = jest.fn().mockResolvedValue( + JSON.stringify({ + fileSize: 1, + mimeType, + fileName, + }), + ); + + const metadata = await fsManager.getMetadata(fileId); + + expect(metadata).toEqual(expect.objectContaining({ fileSize: 1, mimeType, fileName })); + }); +}); + +describe('copyByFileId()', () => { + it('should copy by file ID and return the file ID', async () => { + fsp.copyFile = jest.fn().mockResolvedValue(undefined); + + // @ts-expect-error - private method + jest.spyOn(fsManager, 'toFileId').mockReturnValue(otherFileId); + + const targetFileId = await fsManager.copyByFileId(workflowId, executionId, fileId); + + const sourcePath = toFullFilePath(fileId); + const targetPath = toFullFilePath(targetFileId); + + expect(fsp.copyFile).toHaveBeenCalledWith(sourcePath, targetPath); + }); +}); + +describe('copyByFilePath()', () => { + test('should copy by file path and return the file ID and size', async () => { + const sourceFilePath = tmpdir(); + const metadata = { mimeType: 'text/plain' }; + + // @ts-expect-error - private method + jest.spyOn(fsManager, 'toFileId').mockReturnValue(otherFileId); + + // @ts-expect-error - private method + jest.spyOn(fsManager, 'getSize').mockReturnValue(mockBuffer.length); + + const targetPath = toFullFilePath(otherFileId); + + fsp.cp = jest.fn().mockResolvedValue(undefined); + + const result = await fsManager.copyByFilePath( + workflowId, + executionId, + sourceFilePath, + metadata, + ); + + expect(fsp.cp).toHaveBeenCalledWith(sourceFilePath, targetPath); + expect(result.fileSize).toBe(mockBuffer.length); + }); +}); + +describe('deleteMany()', () => { + it('should delete many files by workflow ID and execution ID', async () => { + const ids = [ + { workflowId, executionId }, + { workflowId: otherWorkflowId, executionId: otherExecutionId }, + ]; + + fsp.rm = jest.fn().mockResolvedValue(undefined); + + const promise = fsManager.deleteMany(ids); + + await expect(promise).resolves.not.toThrow(); + + expect(fsp.rm).toHaveBeenCalledTimes(2); + }); +}); + +describe('rename()', () => { + it('should rename a file', async () => { + fsp.rename = jest.fn().mockResolvedValue(undefined); + fsp.rm = jest.fn().mockResolvedValue(undefined); + + const promise = fsManager.rename(fileId, 
otherFileId); + + const oldPath = toFullFilePath(fileId); + const newPath = toFullFilePath(otherFileId); + + await expect(promise).resolves.not.toThrow(); + + expect(fsp.rename).toHaveBeenCalledTimes(2); + expect(fsp.rename).toHaveBeenCalledWith(oldPath, newPath); + expect(fsp.rename).toHaveBeenCalledWith(`${oldPath}.metadata`, `${newPath}.metadata`); + }); +}); diff --git a/packages/core/test/NodeExecuteFunctions.test.ts b/packages/core/test/NodeExecuteFunctions.test.ts index 5009159921def..c186d9d08bf7d 100644 --- a/packages/core/test/NodeExecuteFunctions.test.ts +++ b/packages/core/test/NodeExecuteFunctions.test.ts @@ -102,12 +102,12 @@ describe('NodeExecuteFunctions', () => { ); // Expect our return object to contain the name of the configured data manager. - expect(setBinaryDataBufferResponse.data).toEqual('filesystem'); + expect(setBinaryDataBufferResponse.data).toEqual('filesystem-v2'); // Ensure that the input data was successfully persisted to disk. expect( readFileSync( - `${temporaryDir}/${setBinaryDataBufferResponse.id?.replace('filesystem:', '')}`, + `${temporaryDir}/${setBinaryDataBufferResponse.id?.replace('filesystem-v2:', '')}`, ), ).toEqual(inputData); diff --git a/packages/core/test/ObjectStore.manager.test.ts b/packages/core/test/ObjectStore.manager.test.ts index a9f23102fbaa6..79fa51910934f 100644 --- a/packages/core/test/ObjectStore.manager.test.ts +++ b/packages/core/test/ObjectStore.manager.test.ts @@ -2,16 +2,13 @@ import fs from 'node:fs/promises'; import { ObjectStoreManager } from '@/BinaryData/ObjectStore.manager'; import { ObjectStoreService } from '@/ObjectStore/ObjectStore.service.ee'; import { isStream } from '@/ObjectStore/utils'; -import { mockInstance, toStream } from './utils'; +import { mockInstance, toFileId, toStream } from './utils'; jest.mock('fs/promises'); const objectStoreService = mockInstance(ObjectStoreService); const objectStoreManager = new ObjectStoreManager(objectStoreService); -const toFileId = (workflowId: string, executionId: string, fileUuid: string) => - `workflows/${workflowId}/executions/${executionId}/binary_data/${fileUuid}`; - const workflowId = 'ObogjVbqpNOQpiyV'; const executionId = '999'; const fileUuid = '71f6209b-5d48-41a2-a224-80d529d8bb32'; diff --git a/packages/core/test/utils.ts b/packages/core/test/utils.ts index 09e2a859bf006..9dd142ebd5f9c 100644 --- a/packages/core/test/utils.ts +++ b/packages/core/test/utils.ts @@ -20,3 +20,6 @@ export function toStream(buffer: Buffer) { return duplexStream; } + +export const toFileId = (workflowId: string, executionId: string, fileUuid: string) => + `workflows/${workflowId}/executions/${executionId}/binary_data/${fileUuid}`;
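
For readers skimming this patch: a binary data ID now has the shape `{mode}:workflows/{workflowId}/executions/{executionId}/binary_data/{uuid}`, with `temp` standing in for the execution ID until it is known. The sketch below illustrates the restoration step performed by `restoreBinaryDataId`; the helper names `parseBinaryDataId` and `restoreExecutionId` are hypothetical and exist only for illustration, they are not part of the patch.

```ts
// Illustrative only. Modes that can appear in a stored binary data ID after this
// patch: legacy `filesystem`, the new `filesystem-v2`, and `s3`.
type StoredMode = 'filesystem' | 'filesystem-v2' | 's3';

// Hypothetical helper: split `<mode>:<fileId>` into its two parts.
function parseBinaryDataId(binaryDataId: string) {
	const [mode, fileId] = binaryDataId.split(':') as [StoredMode, string];
	return { mode, fileId };
}

// Hypothetical helper mirroring restoreBinaryDataId: swap the `temp` placeholder
// segment for the real execution ID once it is known.
function restoreExecutionId(binaryDataId: string, executionId: string) {
	const { mode, fileId } = parseBinaryDataId(binaryDataId);

	if (!fileId.includes('/temp/')) return binaryDataId; // already has an execution ID

	return `${mode}:${fileId.replace('temp', executionId)}`;
}

// restoreExecutionId('filesystem-v2:workflows/123/executions/temp/binary_data/abc', '390')
// -> 'filesystem-v2:workflows/123/executions/390/binary_data/abc'
```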