Skip to content

Commit

Permalink
feat(core): Integrate object store as binary data manager (#7253)
Browse files Browse the repository at this point in the history
Depends on: #7225 | Story:
[PAY-848](https://linear.app/n8n/issue/PAY-848)

This PR integrates the object store service as a new binary data manager
for Enterprise.
  • Loading branch information
ivov authored Oct 5, 2023
1 parent e5ad1e7 commit 1a661e6
Show file tree
Hide file tree
Showing 28 changed files with 1,128 additions and 498 deletions.
13 changes: 13 additions & 0 deletions packages/cli/src/License.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import Container, { Service } from 'typedi';
import type { BooleanLicenseFeature, N8nInstanceType, NumericLicenseFeature } from './Interfaces';
import type { RedisServicePubSubPublisher } from './services/redis/RedisServicePubSubPublisher';
import { RedisService } from './services/redis.service';
import { ObjectStoreService } from 'n8n-core';

type FeatureReturnType = Partial<
{
Expand Down Expand Up @@ -103,6 +104,18 @@ export class License {
command: 'reloadLicense',
});
}

const isS3Selected = config.getEnv('binaryDataManager.mode') === 's3';
const isS3Available = config.getEnv('binaryDataManager.availableModes').includes('s3');
const isS3Licensed = _features['feat:binaryDataS3'];

if (isS3Selected && isS3Available && !isS3Licensed) {
this.logger.debug(
'License changed with no support for external storage - blocking writes on object store. To restore writes, please upgrade to a license that supports this feature.',
);

Container.get(ObjectStoreService).setReadonly(true);
}
}

async saveCertStr(value: TLicenseBlock): Promise<void> {
Expand Down
29 changes: 20 additions & 9 deletions packages/cli/src/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1446,28 +1446,39 @@ export class Server extends AbstractServer {
// Binary data
// ----------------------------------------

// Download binary
// View or download binary file
this.app.get(
`/${this.restEndpoint}/data/:path`,
`/${this.restEndpoint}/data`,
async (req: BinaryDataRequest, res: express.Response): Promise<void> => {
// TODO UM: check if this needs permission check for UM
const identifier = req.params.path;
const { id: binaryDataId, action } = req.query;
let { fileName, mimeType } = req.query;
const [mode] = binaryDataId.split(':') as ['filesystem' | 's3', string];

try {
const binaryPath = this.binaryDataService.getPath(identifier);
let { mode, fileName, mimeType } = req.query;
const binaryPath = this.binaryDataService.getPath(binaryDataId);

if (!fileName || !mimeType) {
try {
const metadata = await this.binaryDataService.getMetadata(identifier);
const metadata = await this.binaryDataService.getMetadata(binaryDataId);
fileName = metadata.fileName;
mimeType = metadata.mimeType;
res.setHeader('Content-Length', metadata.fileSize);
} catch {}
}

if (mimeType) res.setHeader('Content-Type', mimeType);
if (mode === 'download') {

if (action === 'download') {
res.setHeader('Content-Disposition', `attachment; filename="${fileName}"`);
}
res.sendFile(binaryPath);

if (mode === 's3') {
const readStream = await this.binaryDataService.getAsStream(binaryDataId);
readStream.pipe(res);
return;
} else {
res.sendFile(binaryPath);
}
} catch (error) {
if (error instanceof FileNotFoundError) res.writeHead(404).end();
else throw error;
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/WorkflowExecuteAdditionalData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
workflowId: this.workflowData.id,
});

if (this.mode === 'webhook' && config.getEnv('binaryDataManager.mode') === 'filesystem') {
if (this.mode === 'webhook' && config.getEnv('binaryDataManager.mode') !== 'default') {
await restoreBinaryDataId(fullRunData, this.executionId);
}

Expand Down
116 changes: 114 additions & 2 deletions packages/cli/src/commands/BaseCommand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ import { ExitError } from '@oclif/errors';
import { Container } from 'typedi';
import { LoggerProxy, ErrorReporterProxy as ErrorReporter, sleep } from 'n8n-workflow';
import type { IUserSettings } from 'n8n-core';
import { BinaryDataService, UserSettings } from 'n8n-core';
import { BinaryDataService, ObjectStoreService, UserSettings } from 'n8n-core';
import type { AbstractServer } from '@/AbstractServer';
import { getLogger } from '@/Logger';
import config from '@/config';
import * as Db from '@/Db';
import * as CrashJournal from '@/CrashJournal';
import { inTest } from '@/constants';
import { LICENSE_FEATURES, inTest } from '@/constants';
import { CredentialTypes } from '@/CredentialTypes';
import { CredentialsOverwrites } from '@/CredentialsOverwrites';
import { initErrorHandling } from '@/ErrorReporting';
Expand Down Expand Up @@ -125,7 +125,119 @@ export abstract class BaseCommand extends Command {
process.exit(1);
}

async initObjectStoreService() {
const isSelected = config.getEnv('binaryDataManager.mode') === 's3';
const isAvailable = config.getEnv('binaryDataManager.availableModes').includes('s3');

if (!isSelected && !isAvailable) return;

if (isSelected && !isAvailable) {
throw new Error(
'External storage selected but unavailable. Please make external storage available by adding "s3" to `N8N_AVAILABLE_BINARY_DATA_MODES`.',
);
}

const isLicensed = Container.get(License).isFeatureEnabled(LICENSE_FEATURES.BINARY_DATA_S3);

if (isSelected && isAvailable && isLicensed) {
LoggerProxy.debug(
'License found for external storage - object store to init in read-write mode',
);

await this._initObjectStoreService();

return;
}

if (isSelected && isAvailable && !isLicensed) {
LoggerProxy.debug(
'No license found for external storage - object store to init with writes blocked. To enable writes, please upgrade to a license that supports this feature.',
);

await this._initObjectStoreService({ isReadOnly: true });

return;
}

if (!isSelected && isAvailable) {
LoggerProxy.debug(
'External storage unselected but available - object store to init with writes unused',
);

await this._initObjectStoreService();

return;
}
}

private async _initObjectStoreService(options = { isReadOnly: false }) {
const objectStoreService = Container.get(ObjectStoreService);

const host = config.getEnv('externalStorage.s3.host');

if (host === '') {
throw new Error(
'External storage host not configured. Please set `N8N_EXTERNAL_STORAGE_S3_HOST`.',
);
}

const bucket = {
name: config.getEnv('externalStorage.s3.bucket.name'),
region: config.getEnv('externalStorage.s3.bucket.region'),
};

if (bucket.name === '') {
throw new Error(
'External storage bucket name not configured. Please set `N8N_EXTERNAL_STORAGE_S3_BUCKET_NAME`.',
);
}

if (bucket.region === '') {
throw new Error(
'External storage bucket region not configured. Please set `N8N_EXTERNAL_STORAGE_S3_BUCKET_REGION`.',
);
}

const credentials = {
accessKey: config.getEnv('externalStorage.s3.credentials.accessKey'),
accessSecret: config.getEnv('externalStorage.s3.credentials.accessSecret'),
};

if (credentials.accessKey === '') {
throw new Error(
'External storage access key not configured. Please set `N8N_EXTERNAL_STORAGE_S3_ACCESS_KEY`.',
);
}

if (credentials.accessSecret === '') {
throw new Error(
'External storage access secret not configured. Please set `N8N_EXTERNAL_STORAGE_S3_ACCESS_SECRET`.',
);
}

LoggerProxy.debug('Initializing object store service');

try {
await objectStoreService.init(host, bucket, credentials);
objectStoreService.setReadonly(options.isReadOnly);

LoggerProxy.debug('Object store init completed');
} catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`);

LoggerProxy.debug('Object store init failed', { error });
}
}

async initBinaryDataService() {
try {
await this.initObjectStoreService();
} catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`);
LoggerProxy.error(`Failed to init object store: ${error.message}`, { error });
process.exit(1);
}

const binaryDataConfig = config.getEnv('binaryDataManager');
await Container.get(BinaryDataService).init(binaryDataConfig);
}
Expand Down
41 changes: 40 additions & 1 deletion packages/cli/src/config/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -908,7 +908,7 @@ export const schema = {
doc: 'Available modes of binary data storage, as comma separated strings',
},
mode: {
format: ['default', 'filesystem'] as const,
format: ['default', 'filesystem', 's3'] as const,
default: 'default',
env: 'N8N_DEFAULT_BINARY_DATA_MODE',
doc: 'Storage mode for binary data',
Expand All @@ -921,6 +921,45 @@ export const schema = {
},
},

externalStorage: {
s3: {
host: {
format: String,
default: '',
env: 'N8N_EXTERNAL_STORAGE_S3_HOST',
doc: 'Host of the n8n bucket in S3-compatible external storage, e.g. `s3.us-east-1.amazonaws.com`',
},
bucket: {
name: {
format: String,
default: '',
env: 'N8N_EXTERNAL_STORAGE_S3_BUCKET_NAME',
doc: 'Name of the n8n bucket in S3-compatible external storage',
},
region: {
format: String,
default: '',
env: 'N8N_EXTERNAL_STORAGE_S3_BUCKET_REGION',
doc: 'Region of the n8n bucket in S3-compatible external storage, e.g. `us-east-1`',
},
},
credentials: {
accessKey: {
format: String,
default: '',
env: 'N8N_EXTERNAL_STORAGE_S3_ACCESS_KEY',
doc: 'Access key in S3-compatible external storage',
},
accessSecret: {
format: String,
default: '',
env: 'N8N_EXTERNAL_STORAGE_S3_ACCESS_SECRET',
doc: 'Access secret in S3-compatible external storage',
},
},
},
},

deployment: {
type: {
format: String,
Expand Down
1 change: 1 addition & 0 deletions packages/cli/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ export const LICENSE_FEATURES = {
SHOW_NON_PROD_BANNER: 'feat:showNonProdBanner',
WORKFLOW_HISTORY: 'feat:workflowHistory',
DEBUG_IN_EDITOR: 'feat:debugInEditor',
BINARY_DATA_S3: 'feat:binaryDataS3',
} as const;

export const LICENSE_QUOTAS = {
Expand Down
1 change: 1 addition & 0 deletions packages/cli/src/controllers/e2e.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ export class E2EController {
[LICENSE_FEATURES.SHOW_NON_PROD_BANNER]: false,
[LICENSE_FEATURES.WORKFLOW_HISTORY]: false,
[LICENSE_FEATURES.DEBUG_IN_EDITOR]: false,
[LICENSE_FEATURES.BINARY_DATA_S3]: false,
};

constructor(
Expand Down
33 changes: 21 additions & 12 deletions packages/cli/src/executionLifecycleHooks/restoreBinaryDataId.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import Container from 'typedi';
import { BinaryDataService } from 'n8n-core';
import type { IRun } from 'n8n-workflow';

export function isMissingExecutionId(binaryDataId: string) {
const UUID_CHAR_LENGTH = 36;

return [UUID_CHAR_LENGTH + 'filesystem:'.length, UUID_CHAR_LENGTH + 's3:'.length].some(
(incorrectLength) => binaryDataId.length === incorrectLength,
);
import type { BinaryData } from 'n8n-core';

export function isMissingExecutionId(
fileId: string,
mode: BinaryData.NonDefaultMode,
uuidV4CharLength = 36,
) {
return mode === 'filesystem' ? uuidV4CharLength === fileId.length : fileId.includes('/temp/');
}

/**
Expand All @@ -19,6 +20,9 @@ export function isMissingExecutionId(binaryDataId: string) {
* ```txt
* filesystem:11869055-83c4-4493-876a-9092c4708b9b ->
* filesystem:39011869055-83c4-4493-876a-9092c4708b9b
*
* s3:workflows/123/executions/temp/binary_data/69055-83c4-4493-876a-9092c4708b9b ->
* s3:workflows/123/executions/390/binary_data/69055-83c4-4493-876a-9092c4708b9b
* ```
*/
export async function restoreBinaryDataId(run: IRun, executionId: string) {
Expand All @@ -27,13 +31,18 @@ export async function restoreBinaryDataId(run: IRun, executionId: string) {
const promises = Object.keys(runData).map(async (nodeName) => {
const binaryDataId = runData[nodeName]?.[0]?.data?.main?.[0]?.[0]?.binary?.data.id;

if (!binaryDataId || !isMissingExecutionId(binaryDataId)) return;
if (!binaryDataId) return;

const [mode, incorrectFileId] = binaryDataId.split(':');
const correctFileId = `${executionId}${incorrectFileId}`;
const correctBinaryDataId = `${mode}:${correctFileId}`;
const [mode, fileId] = binaryDataId.split(':') as [BinaryData.NonDefaultMode, string];

await Container.get(BinaryDataService).rename(incorrectFileId, correctFileId);
if (!isMissingExecutionId(fileId, mode)) return;

const correctFileId =
mode === 'filesystem' ? `${executionId}${fileId}` : fileId.replace('temp', executionId);

await Container.get(BinaryDataService).rename(fileId, correctFileId);

const correctBinaryDataId = `${mode}:${correctFileId}`;

// @ts-expect-error Validated at the top
run.data.resultData.runData[nodeName][0].data.main[0][0].binary.data.id = correctBinaryDataId;
Expand Down
5 changes: 3 additions & 2 deletions packages/cli/src/requests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -492,11 +492,12 @@ export declare namespace LicenseRequest {
}

export type BinaryDataRequest = AuthenticatedRequest<
{ path: string },
{},
{},
{},
{
mode: 'view' | 'download';
id: string;
action: 'view' | 'download';
fileName?: string;
mimeType?: string;
}
Expand Down
1 change: 1 addition & 0 deletions packages/cli/test/integration/commands/worker.cmd.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const oclifConfig: Config.IConfig = new Config.Config({ root: __dirname });
beforeAll(async () => {
LoggerProxy.init(getLogger());
config.set('executions.mode', 'queue');
config.set('binaryDataManager.availableModes', 'filesystem');
mockInstance(Telemetry);
mockInstance(PostHogClient);
mockInstance(InternalHooks);
Expand Down
10 changes: 6 additions & 4 deletions packages/cli/test/integration/shared/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,13 @@ export async function initNodeTypes() {
/**
* Initialize a BinaryDataService for test runs.
*/
export async function initBinaryDataService() {
export async function initBinaryDataService(mode: 'default' | 'filesystem' = 'default') {
const binaryDataService = new BinaryDataService();

await binaryDataService.init(config.getEnv('binaryDataManager'));

await binaryDataService.init({
mode,
availableModes: [mode],
localStoragePath: '',
});
Container.set(BinaryDataService, binaryDataService);
}

Expand Down
Loading

0 comments on commit 1a661e6

Please sign in to comment.