Skip to content

Commit

Permalink
fix(core): Split event bus controller into community and ee (#7107)
Browse files Browse the repository at this point in the history
  • Loading branch information
flipswitchingmonkey authored Sep 5, 2023
1 parent 6aa7b93 commit 011ee2e
Show file tree
Hide file tree
Showing 7 changed files with 566 additions and 465 deletions.
2 changes: 2 additions & 0 deletions packages/cli/src/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ import * as WebhookHelpers from '@/WebhookHelpers';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import { toHttpNodeParameters } from '@/CurlConverterHelper';
import { EventBusController } from '@/eventbus/eventBus.controller';
import { EventBusControllerEE } from '@/eventbus/eventBus.controller.ee';
import { isLogStreamingEnabled } from '@/eventbus/MessageEventBus/MessageEventBusHelper';
import { licenseController } from './license/license.controller';
import { Push, setupPushServer, setupPushHandler } from '@/push';
Expand Down Expand Up @@ -508,6 +509,7 @@ export class Server extends AbstractServer {

const controllers: object[] = [
new EventBusController(),
new EventBusControllerEE(),
new AuthController(config, logger, internalHooks, mfaService, userService, postHog),
new OwnerController(
config,
Expand Down
132 changes: 132 additions & 0 deletions packages/cli/src/eventbus/eventBus.controller.ee.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import express from 'express';
import { eventBus } from './MessageEventBus/MessageEventBus';
import {
isMessageEventBusDestinationSentryOptions,
MessageEventBusDestinationSentry,
} from './MessageEventBusDestination/MessageEventBusDestinationSentry.ee';
import {
isMessageEventBusDestinationSyslogOptions,
MessageEventBusDestinationSyslog,
} from './MessageEventBusDestination/MessageEventBusDestinationSyslog.ee';
import { MessageEventBusDestinationWebhook } from './MessageEventBusDestination/MessageEventBusDestinationWebhook.ee';
import { BadRequestError } from '@/ResponseHelper';
import type {
MessageEventBusDestinationWebhookOptions,
MessageEventBusDestinationOptions,
} from 'n8n-workflow';
import { MessageEventBusDestinationTypeNames } from 'n8n-workflow';
import { RestController, Get, Post, Delete, Authorized } from '@/decorators';
import type { MessageEventBusDestination } from './MessageEventBusDestination/MessageEventBusDestination.ee';
import type { DeleteResult } from 'typeorm';
import { AuthenticatedRequest } from '@/requests';
import { logStreamingLicensedMiddleware } from './middleware/logStreamingEnabled.middleware.ee';

// ----------------------------------------
// TypeGuards
// ----------------------------------------

const isWithIdString = (candidate: unknown): candidate is { id: string } => {
const o = candidate as { id: string };
if (!o) return false;
return o.id !== undefined;
};

const isMessageEventBusDestinationWebhookOptions = (
candidate: unknown,
): candidate is MessageEventBusDestinationWebhookOptions => {
const o = candidate as MessageEventBusDestinationWebhookOptions;
if (!o) return false;
return o.url !== undefined;
};

const isMessageEventBusDestinationOptions = (
candidate: unknown,
): candidate is MessageEventBusDestinationOptions => {
const o = candidate as MessageEventBusDestinationOptions;
if (!o) return false;
return o.__type !== undefined;
};

// ----------------------------------------
// Controller
// ----------------------------------------

@Authorized()
@RestController('/eventbus')
export class EventBusControllerEE {
// ----------------------------------------
// Destinations
// ----------------------------------------

@Get('/destination', { middlewares: [logStreamingLicensedMiddleware] })
async getDestination(req: express.Request): Promise<MessageEventBusDestinationOptions[]> {
if (isWithIdString(req.query)) {
return eventBus.findDestination(req.query.id);
} else {
return eventBus.findDestination();
}
}

@Authorized(['global', 'owner'])
@Post('/destination', { middlewares: [logStreamingLicensedMiddleware] })
async postDestination(req: AuthenticatedRequest): Promise<any> {
let result: MessageEventBusDestination | undefined;
if (isMessageEventBusDestinationOptions(req.body)) {
switch (req.body.__type) {
case MessageEventBusDestinationTypeNames.sentry:
if (isMessageEventBusDestinationSentryOptions(req.body)) {
result = await eventBus.addDestination(
new MessageEventBusDestinationSentry(eventBus, req.body),
);
}
break;
case MessageEventBusDestinationTypeNames.webhook:
if (isMessageEventBusDestinationWebhookOptions(req.body)) {
result = await eventBus.addDestination(
new MessageEventBusDestinationWebhook(eventBus, req.body),
);
}
break;
case MessageEventBusDestinationTypeNames.syslog:
if (isMessageEventBusDestinationSyslogOptions(req.body)) {
result = await eventBus.addDestination(
new MessageEventBusDestinationSyslog(eventBus, req.body),
);
}
break;
default:
throw new BadRequestError(
`Body is missing ${req.body.__type} options or type ${req.body.__type} is unknown`,
);
}
if (result) {
await result.saveToDb();
return {
...result.serialize(),
eventBusInstance: undefined,
};
}
throw new BadRequestError('There was an error adding the destination');
}
throw new BadRequestError('Body is not configuring MessageEventBusDestinationOptions');
}

@Get('/testmessage', { middlewares: [logStreamingLicensedMiddleware] })
async sendTestMessage(req: express.Request): Promise<boolean> {
if (isWithIdString(req.query)) {
return eventBus.testDestination(req.query.id);
}
return false;
}

@Authorized(['global', 'owner'])
@Delete('/destination', { middlewares: [logStreamingLicensedMiddleware] })
async deleteDestination(req: AuthenticatedRequest): Promise<DeleteResult | undefined> {
if (isWithIdString(req.query)) {
return eventBus.removeDestination(req.query.id);
} else {
throw new BadRequestError('Query is missing id');
}
}
}
119 changes: 3 additions & 116 deletions packages/cli/src/eventbus/eventBus.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,66 +6,28 @@ import type { EventMessageWorkflowOptions } from './EventMessageClasses/EventMes
import { EventMessageWorkflow } from './EventMessageClasses/EventMessageWorkflow';
import type { EventMessageReturnMode } from './MessageEventBus/MessageEventBus';
import { eventBus } from './MessageEventBus/MessageEventBus';
import {
isMessageEventBusDestinationSentryOptions,
MessageEventBusDestinationSentry,
} from './MessageEventBusDestination/MessageEventBusDestinationSentry.ee';
import {
isMessageEventBusDestinationSyslogOptions,
MessageEventBusDestinationSyslog,
} from './MessageEventBusDestination/MessageEventBusDestinationSyslog.ee';
import { MessageEventBusDestinationWebhook } from './MessageEventBusDestination/MessageEventBusDestinationWebhook.ee';
import type { EventMessageTypes, FailedEventSummary } from './EventMessageClasses';
import { eventNamesAll } from './EventMessageClasses';
import type { EventMessageAuditOptions } from './EventMessageClasses/EventMessageAudit';
import { EventMessageAudit } from './EventMessageClasses/EventMessageAudit';
import { BadRequestError } from '@/ResponseHelper';
import type {
MessageEventBusDestinationWebhookOptions,
MessageEventBusDestinationOptions,
IRunExecutionData,
} from 'n8n-workflow';
import { MessageEventBusDestinationTypeNames, EventMessageTypeNames } from 'n8n-workflow';
import type { IRunExecutionData } from 'n8n-workflow';
import { EventMessageTypeNames } from 'n8n-workflow';
import type { EventMessageNodeOptions } from './EventMessageClasses/EventMessageNode';
import { EventMessageNode } from './EventMessageClasses/EventMessageNode';
import { recoverExecutionDataFromEventLogMessages } from './MessageEventBus/recoverEvents';
import { RestController, Get, Post, Delete, Authorized } from '@/decorators';
import type { MessageEventBusDestination } from './MessageEventBusDestination/MessageEventBusDestination.ee';
import type { DeleteResult } from 'typeorm';
import { AuthenticatedRequest } from '@/requests';
import { RestController, Get, Post, Authorized } from '@/decorators';

// ----------------------------------------
// TypeGuards
// ----------------------------------------

const isWithIdString = (candidate: unknown): candidate is { id: string } => {
const o = candidate as { id: string };
if (!o) return false;
return o.id !== undefined;
};

const isWithQueryString = (candidate: unknown): candidate is { query: string } => {
const o = candidate as { query: string };
if (!o) return false;
return o.query !== undefined;
};

const isMessageEventBusDestinationWebhookOptions = (
candidate: unknown,
): candidate is MessageEventBusDestinationWebhookOptions => {
const o = candidate as MessageEventBusDestinationWebhookOptions;
if (!o) return false;
return o.url !== undefined;
};

const isMessageEventBusDestinationOptions = (
candidate: unknown,
): candidate is MessageEventBusDestinationOptions => {
const o = candidate as MessageEventBusDestinationOptions;
if (!o) return false;
return o.__type !== undefined;
};

// ----------------------------------------
// Controller
// ----------------------------------------
Expand Down Expand Up @@ -158,81 +120,6 @@ export class EventBusController {
return msg;
}

// ----------------------------------------
// Destinations
// ----------------------------------------

@Get('/destination')
async getDestination(req: express.Request): Promise<MessageEventBusDestinationOptions[]> {
if (isWithIdString(req.query)) {
return eventBus.findDestination(req.query.id);
} else {
return eventBus.findDestination();
}
}

@Authorized(['global', 'owner'])
@Post('/destination')
async postDestination(req: AuthenticatedRequest): Promise<any> {
let result: MessageEventBusDestination | undefined;
if (isMessageEventBusDestinationOptions(req.body)) {
switch (req.body.__type) {
case MessageEventBusDestinationTypeNames.sentry:
if (isMessageEventBusDestinationSentryOptions(req.body)) {
result = await eventBus.addDestination(
new MessageEventBusDestinationSentry(eventBus, req.body),
);
}
break;
case MessageEventBusDestinationTypeNames.webhook:
if (isMessageEventBusDestinationWebhookOptions(req.body)) {
result = await eventBus.addDestination(
new MessageEventBusDestinationWebhook(eventBus, req.body),
);
}
break;
case MessageEventBusDestinationTypeNames.syslog:
if (isMessageEventBusDestinationSyslogOptions(req.body)) {
result = await eventBus.addDestination(
new MessageEventBusDestinationSyslog(eventBus, req.body),
);
}
break;
default:
throw new BadRequestError(
`Body is missing ${req.body.__type} options or type ${req.body.__type} is unknown`,
);
}
if (result) {
await result.saveToDb();
return {
...result.serialize(),
eventBusInstance: undefined,
};
}
throw new BadRequestError('There was an error adding the destination');
}
throw new BadRequestError('Body is not configuring MessageEventBusDestinationOptions');
}

@Get('/testmessage')
async sendTestMessage(req: express.Request): Promise<boolean> {
if (isWithIdString(req.query)) {
return eventBus.testDestination(req.query.id);
}
return false;
}

@Authorized(['global', 'owner'])
@Delete('/destination')
async deleteDestination(req: AuthenticatedRequest): Promise<DeleteResult | undefined> {
if (isWithIdString(req.query)) {
return eventBus.removeDestination(req.query.id);
} else {
throw new BadRequestError('Query is missing id');
}
}

// ----------------------------------------
// Utilities
// ----------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import type { RequestHandler } from 'express';
import Container from 'typedi';
import { License } from '../../License';

export function islogStreamingLicensed(): boolean {
return Container.get(License).isLogStreamingEnabled();
}

export const logStreamingLicensedMiddleware: RequestHandler = (req, res, next) => {
if (islogStreamingLicensed()) {
next();
} else {
res.status(403).json({ status: 'error', message: 'Unauthorized' });
}
};
Loading

0 comments on commit 011ee2e

Please sign in to comment.