Skip to content

Commit

Permalink
feat(core): Switch binary filesystem mode to nested path structure (#…
Browse files Browse the repository at this point in the history
…7307)

Depends on #7253 | Story:
[PAY-863](https://linear.app/n8n/issue/PAY-863/switch-binary-filesystem-mode-to-nested-path-structure)

This PR introduces `filesystem-v2` to store binary data in the
filesystem in the same format as `s3`.
  • Loading branch information
ivov authored Oct 10, 2023
1 parent 86e7ec7 commit 0847623
Show file tree
Hide file tree
Showing 13 changed files with 321 additions and 181 deletions.
22 changes: 7 additions & 15 deletions packages/cli/src/executionLifecycleHooks/restoreBinaryDataId.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,14 @@ import { BinaryDataService } from 'n8n-core';
import type { IRun } from 'n8n-workflow';
import type { BinaryData } from 'n8n-core';

export function isMissingExecutionId(
fileId: string,
mode: BinaryData.NonDefaultMode,
uuidV4CharLength = 36,
) {
return mode === 'filesystem' ? uuidV4CharLength === fileId.length : fileId.includes('/temp/');
}

/**
* Whenever the execution ID is not available to the binary data service at the
* time of writing a binary data file, its name is missing the execution ID.
*
* This function restores the ID in the file name and run data reference.
*
* ```txt
* filesystem:11869055-83c4-4493-876a-9092c4708b9b ->
* filesystem:39011869055-83c4-4493-876a-9092c4708b9b
* filesystem-v2:workflows/123/executions/temp/binary_data/69055-83c4-4493-876a-9092c4708b9b ->
* filesystem-v2:workflows/123/executions/390/binary_data/69055-83c4-4493-876a-9092c4708b9b
*
* s3:workflows/123/executions/temp/binary_data/69055-83c4-4493-876a-9092c4708b9b ->
* s3:workflows/123/executions/390/binary_data/69055-83c4-4493-876a-9092c4708b9b
Expand All @@ -33,12 +24,13 @@ export async function restoreBinaryDataId(run: IRun, executionId: string) {

if (!binaryDataId) return;

const [mode, fileId] = binaryDataId.split(':') as [BinaryData.NonDefaultMode, string];
const [mode, fileId] = binaryDataId.split(':') as [BinaryData.StoredMode, string];

const isMissingExecutionId = fileId.includes('/temp/');

if (!isMissingExecutionId(fileId, mode)) return;
if (!isMissingExecutionId) return;

const correctFileId =
mode === 'filesystem' ? `${executionId}${fileId}` : fileId.replace('temp', executionId);
const correctFileId = fileId.replace('temp', executionId);

await Container.get(BinaryDataService).rename(fileId, correctFileId);

Expand Down
7 changes: 7 additions & 0 deletions packages/cli/src/workflows/workflows.services.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import { OwnershipService } from '@/services/ownership.service';
import { isStringArray, isWorkflowIdValid } from '@/utils';
import { isWorkflowHistoryLicensed } from './workflowHistory/workflowHistoryHelper.ee';
import { WorkflowHistoryService } from './workflowHistory/workflowHistory.service.ee';
import { BinaryDataService } from 'n8n-core';

export class WorkflowsService {
static async getSharing(
Expand Down Expand Up @@ -463,7 +464,13 @@ export class WorkflowsService {
await Container.get(ActiveWorkflowRunner).remove(workflowId);
}

const idsForDeletion = await Db.collections.Execution.find({
select: ['id'],
where: { workflowId },
}).then((rows) => rows.map(({ id: executionId }) => ({ workflowId, executionId })));

await Db.collections.Workflow.delete(workflowId);
await Container.get(BinaryDataService).deleteMany(idsForDeletion);

void Container.get(InternalHooks).onWorkflowDeleted(user, workflowId, false);
await Container.get(ExternalHooks).run('workflow.afterDelete', [workflowId]);
Expand Down
169 changes: 49 additions & 120 deletions packages/cli/test/unit/execution.lifecycle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,148 +30,77 @@ function getDataId(run: IRun, kind: 'binary' | 'json') {

const binaryDataService = mockInstance(BinaryDataService);

describe('on filesystem mode', () => {
describe('restoreBinaryDataId()', () => {
beforeAll(() => {
config.set('binaryDataManager.mode', 'filesystem');
});

afterEach(() => {
jest.clearAllMocks();
});

it('should restore if binary data ID is missing execution ID', async () => {
const executionId = '999';
const incorrectFileId = 'a5c3f1ed-9d59-4155-bc68-9a370b3c51f6';
const run = toIRun({
binary: {
data: { id: `filesystem:${incorrectFileId}` },
},
for (const mode of ['filesystem-v2', 's3'] as const) {
describe(`on ${mode} mode`, () => {
describe('restoreBinaryDataId()', () => {
beforeAll(() => {
config.set('binaryDataManager.mode', mode);
});

await restoreBinaryDataId(run, executionId);
afterEach(() => {
jest.clearAllMocks();
});

const correctFileId = `${executionId}${incorrectFileId}`;
const correctBinaryDataId = `filesystem:${correctFileId}`;
it('should restore if binary data ID is missing execution ID', async () => {
const workflowId = '6HYhhKmJch2cYxGj';
const executionId = 'temp';
const binaryDataFileUuid = 'a5c3f1ed-9d59-4155-bc68-9a370b3c51f6';

expect(binaryDataService.rename).toHaveBeenCalledWith(incorrectFileId, correctFileId);
expect(getDataId(run, 'binary')).toBe(correctBinaryDataId);
});
const incorrectFileId = `workflows/${workflowId}/executions/temp/binary_data/${binaryDataFileUuid}`;

it('should do nothing if binary data ID is not missing execution ID', async () => {
const executionId = '999';
const fileId = `${executionId}a5c3f1ed-9d59-4155-bc68-9a370b3c51f6`;
const binaryDataId = `filesystem:${fileId}`;
const run = toIRun({
binary: {
data: {
id: binaryDataId,
const run = toIRun({
binary: {
data: { id: `s3:${incorrectFileId}` },
},
},
});
});

await restoreBinaryDataId(run, executionId);
await restoreBinaryDataId(run, executionId);

expect(binaryDataService.rename).not.toHaveBeenCalled();
expect(getDataId(run, 'binary')).toBe(binaryDataId);
});
const correctFileId = incorrectFileId.replace('temp', executionId);
const correctBinaryDataId = `s3:${correctFileId}`;

it('should do nothing if no binary data ID', async () => {
const executionId = '999';
const dataId = '123';
const run = toIRun({
json: {
data: { id: dataId },
},
expect(binaryDataService.rename).toHaveBeenCalledWith(incorrectFileId, correctFileId);
expect(getDataId(run, 'binary')).toBe(correctBinaryDataId);
});

await restoreBinaryDataId(run, executionId);
it('should do nothing if binary data ID is not missing execution ID', async () => {
const workflowId = '6HYhhKmJch2cYxGj';
const executionId = '999';
const binaryDataFileUuid = 'a5c3f1ed-9d59-4155-bc68-9a370b3c51f6';

expect(binaryDataService.rename).not.toHaveBeenCalled();
expect(getDataId(run, 'json')).toBe(dataId);
});
});
});

describe('on s3 mode', () => {
describe('restoreBinaryDataId()', () => {
beforeAll(() => {
config.set('binaryDataManager.mode', 's3');
});
const fileId = `workflows/${workflowId}/executions/${executionId}/binary_data/${binaryDataFileUuid}`;

afterEach(() => {
jest.clearAllMocks();
});
const binaryDataId = `s3:${fileId}`;

it('should restore if binary data ID is missing execution ID', async () => {
const workflowId = '6HYhhKmJch2cYxGj';
const executionId = 'temp';
const binaryDataFileUuid = 'a5c3f1ed-9d59-4155-bc68-9a370b3c51f6';
const run = toIRun({
binary: {
data: {
id: binaryDataId,
},
},
});

const incorrectFileId = `workflows/${workflowId}/executions/temp/binary_data/${binaryDataFileUuid}`;
await restoreBinaryDataId(run, executionId);

const run = toIRun({
binary: {
data: { id: `s3:${incorrectFileId}` },
},
expect(binaryDataService.rename).not.toHaveBeenCalled();
expect(getDataId(run, 'binary')).toBe(binaryDataId);
});

await restoreBinaryDataId(run, executionId);

const correctFileId = incorrectFileId.replace('temp', executionId);
const correctBinaryDataId = `s3:${correctFileId}`;
it('should do nothing if no binary data ID', async () => {
const executionId = '999';
const dataId = '123';

expect(binaryDataService.rename).toHaveBeenCalledWith(incorrectFileId, correctFileId);
expect(getDataId(run, 'binary')).toBe(correctBinaryDataId);
});

it('should do nothing if binary data ID is not missing execution ID', async () => {
const workflowId = '6HYhhKmJch2cYxGj';
const executionId = '999';
const binaryDataFileUuid = 'a5c3f1ed-9d59-4155-bc68-9a370b3c51f6';

const fileId = `workflows/${workflowId}/executions/${executionId}/binary_data/${binaryDataFileUuid}`;

const binaryDataId = `s3:${fileId}`;

const run = toIRun({
binary: {
data: {
id: binaryDataId,
const run = toIRun({
json: {
data: { id: dataId },
},
},
});

await restoreBinaryDataId(run, executionId);

expect(binaryDataService.rename).not.toHaveBeenCalled();
expect(getDataId(run, 'binary')).toBe(binaryDataId);
});
});

it('should do nothing if no binary data ID', async () => {
const executionId = '999';
const dataId = '123';
await restoreBinaryDataId(run, executionId);

const run = toIRun({
json: {
data: { id: dataId },
},
expect(binaryDataService.rename).not.toHaveBeenCalled();
expect(getDataId(run, 'json')).toBe(dataId);
});

await restoreBinaryDataId(run, executionId);

expect(binaryDataService.rename).not.toHaveBeenCalled();
expect(getDataId(run, 'json')).toBe(dataId);
});
});

it('should do nothing on itemless case', async () => {
const executionId = '999';

const promise = restoreBinaryDataId(toIRun(), executionId);

await expect(promise).resolves.not.toThrow();

expect(binaryDataService.rename).not.toHaveBeenCalled();
});
});
}
18 changes: 7 additions & 11 deletions packages/core/src/BinaryData/BinaryData.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import { readFile, stat } from 'node:fs/promises';
import prettyBytes from 'pretty-bytes';
import Container, { Service } from 'typedi';
import { BINARY_ENCODING, LoggerProxy as Logger, IBinaryData } from 'n8n-workflow';
import { UnknownBinaryDataManagerError, InvalidBinaryDataModeError } from './errors';
import { areValidModes, toBuffer } from './utils';
import { UnknownManagerError, InvalidModeError } from './errors';
import { areConfigModes, toBuffer } from './utils';
import { LogCatch } from '../decorators/LogCatch.decorator';

import type { Readable } from 'stream';
Expand All @@ -14,21 +14,20 @@ import type { INodeExecutionData } from 'n8n-workflow';

@Service()
export class BinaryDataService {
private mode: BinaryData.Mode = 'default';
private mode: BinaryData.ServiceMode = 'default';

private managers: Record<string, BinaryData.Manager> = {};

async init(config: BinaryData.Config) {
if (!areValidModes(config.availableModes)) {
throw new InvalidBinaryDataModeError();
}
if (!areConfigModes(config.availableModes)) throw new InvalidModeError();

this.mode = config.mode;
this.mode = config.mode === 'filesystem' ? 'filesystem-v2' : config.mode;

if (config.availableModes.includes('filesystem')) {
const { FileSystemManager } = await import('./FileSystem.manager');

this.managers.filesystem = new FileSystemManager(config.localStoragePath);
this.managers['filesystem-v2'] = this.managers.filesystem;

await this.managers.filesystem.init();
}
Expand Down Expand Up @@ -200,9 +199,6 @@ export class BinaryDataService {
// private methods
// ----------------------------------

/**
* Create an identifier `${mode}:{fileId}` for `IBinaryData['id']`.
*/
private createBinaryDataId(fileId: string) {
return `${this.mode}:${fileId}`;
}
Expand Down Expand Up @@ -253,6 +249,6 @@ export class BinaryDataService {

if (manager) return manager;

throw new UnknownBinaryDataManagerError(mode);
throw new UnknownManagerError(mode);
}
}
Loading

0 comments on commit 0847623

Please sign in to comment.