From 011ee2e04b62e0182e9d6787064dea70654cb4ab Mon Sep 17 00:00:00 2001 From: Michael Auerswald Date: Tue, 5 Sep 2023 13:32:09 +0200 Subject: [PATCH] fix(core): Split event bus controller into community and ee (#7107) --- packages/cli/src/Server.ts | 2 + .../src/eventbus/eventBus.controller.ee.ts | 132 ++++++ .../cli/src/eventbus/eventBus.controller.ts | 119 +----- .../logStreamingEnabled.middleware.ee.ts | 15 + .../cli/test/integration/eventbus.ee.test.ts | 391 ++++++++++++++++++ .../cli/test/integration/eventbus.test.ts | 370 +---------------- .../integration/shared/utils/testServer.ts | 2 + 7 files changed, 566 insertions(+), 465 deletions(-) create mode 100644 packages/cli/src/eventbus/eventBus.controller.ee.ts create mode 100644 packages/cli/src/eventbus/middleware/logStreamingEnabled.middleware.ee.ts create mode 100644 packages/cli/test/integration/eventbus.ee.test.ts diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index 7500ef17c0bd7..3d8f8de2aefa1 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -132,6 +132,7 @@ import * as WebhookHelpers from '@/WebhookHelpers'; import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'; import { toHttpNodeParameters } from '@/CurlConverterHelper'; import { EventBusController } from '@/eventbus/eventBus.controller'; +import { EventBusControllerEE } from '@/eventbus/eventBus.controller.ee'; import { isLogStreamingEnabled } from '@/eventbus/MessageEventBus/MessageEventBusHelper'; import { licenseController } from './license/license.controller'; import { Push, setupPushServer, setupPushHandler } from '@/push'; @@ -508,6 +509,7 @@ export class Server extends AbstractServer { const controllers: object[] = [ new EventBusController(), + new EventBusControllerEE(), new AuthController(config, logger, internalHooks, mfaService, userService, postHog), new OwnerController( config, diff --git a/packages/cli/src/eventbus/eventBus.controller.ee.ts b/packages/cli/src/eventbus/eventBus.controller.ee.ts new file mode 100644 index 0000000000000..67077725e737e --- /dev/null +++ b/packages/cli/src/eventbus/eventBus.controller.ee.ts @@ -0,0 +1,132 @@ +/* eslint-disable @typescript-eslint/no-explicit-any */ +import express from 'express'; +import { eventBus } from './MessageEventBus/MessageEventBus'; +import { + isMessageEventBusDestinationSentryOptions, + MessageEventBusDestinationSentry, +} from './MessageEventBusDestination/MessageEventBusDestinationSentry.ee'; +import { + isMessageEventBusDestinationSyslogOptions, + MessageEventBusDestinationSyslog, +} from './MessageEventBusDestination/MessageEventBusDestinationSyslog.ee'; +import { MessageEventBusDestinationWebhook } from './MessageEventBusDestination/MessageEventBusDestinationWebhook.ee'; +import { BadRequestError } from '@/ResponseHelper'; +import type { + MessageEventBusDestinationWebhookOptions, + MessageEventBusDestinationOptions, +} from 'n8n-workflow'; +import { MessageEventBusDestinationTypeNames } from 'n8n-workflow'; +import { RestController, Get, Post, Delete, Authorized } from '@/decorators'; +import type { MessageEventBusDestination } from './MessageEventBusDestination/MessageEventBusDestination.ee'; +import type { DeleteResult } from 'typeorm'; +import { AuthenticatedRequest } from '@/requests'; +import { logStreamingLicensedMiddleware } from './middleware/logStreamingEnabled.middleware.ee'; + +// ---------------------------------------- +// TypeGuards +// ---------------------------------------- + +const isWithIdString = (candidate: unknown): candidate is { id: string } => { + const o = candidate as { id: string }; + if (!o) return false; + return o.id !== undefined; +}; + +const isMessageEventBusDestinationWebhookOptions = ( + candidate: unknown, +): candidate is MessageEventBusDestinationWebhookOptions => { + const o = candidate as MessageEventBusDestinationWebhookOptions; + if (!o) return false; + return o.url !== undefined; +}; + +const isMessageEventBusDestinationOptions = ( + candidate: unknown, +): candidate is MessageEventBusDestinationOptions => { + const o = candidate as MessageEventBusDestinationOptions; + if (!o) return false; + return o.__type !== undefined; +}; + +// ---------------------------------------- +// Controller +// ---------------------------------------- + +@Authorized() +@RestController('/eventbus') +export class EventBusControllerEE { + // ---------------------------------------- + // Destinations + // ---------------------------------------- + + @Get('/destination', { middlewares: [logStreamingLicensedMiddleware] }) + async getDestination(req: express.Request): Promise { + if (isWithIdString(req.query)) { + return eventBus.findDestination(req.query.id); + } else { + return eventBus.findDestination(); + } + } + + @Authorized(['global', 'owner']) + @Post('/destination', { middlewares: [logStreamingLicensedMiddleware] }) + async postDestination(req: AuthenticatedRequest): Promise { + let result: MessageEventBusDestination | undefined; + if (isMessageEventBusDestinationOptions(req.body)) { + switch (req.body.__type) { + case MessageEventBusDestinationTypeNames.sentry: + if (isMessageEventBusDestinationSentryOptions(req.body)) { + result = await eventBus.addDestination( + new MessageEventBusDestinationSentry(eventBus, req.body), + ); + } + break; + case MessageEventBusDestinationTypeNames.webhook: + if (isMessageEventBusDestinationWebhookOptions(req.body)) { + result = await eventBus.addDestination( + new MessageEventBusDestinationWebhook(eventBus, req.body), + ); + } + break; + case MessageEventBusDestinationTypeNames.syslog: + if (isMessageEventBusDestinationSyslogOptions(req.body)) { + result = await eventBus.addDestination( + new MessageEventBusDestinationSyslog(eventBus, req.body), + ); + } + break; + default: + throw new BadRequestError( + `Body is missing ${req.body.__type} options or type ${req.body.__type} is unknown`, + ); + } + if (result) { + await result.saveToDb(); + return { + ...result.serialize(), + eventBusInstance: undefined, + }; + } + throw new BadRequestError('There was an error adding the destination'); + } + throw new BadRequestError('Body is not configuring MessageEventBusDestinationOptions'); + } + + @Get('/testmessage', { middlewares: [logStreamingLicensedMiddleware] }) + async sendTestMessage(req: express.Request): Promise { + if (isWithIdString(req.query)) { + return eventBus.testDestination(req.query.id); + } + return false; + } + + @Authorized(['global', 'owner']) + @Delete('/destination', { middlewares: [logStreamingLicensedMiddleware] }) + async deleteDestination(req: AuthenticatedRequest): Promise { + if (isWithIdString(req.query)) { + return eventBus.removeDestination(req.query.id); + } else { + throw new BadRequestError('Query is missing id'); + } + } +} diff --git a/packages/cli/src/eventbus/eventBus.controller.ts b/packages/cli/src/eventbus/eventBus.controller.ts index 123f9c33ad952..50d2f84525240 100644 --- a/packages/cli/src/eventbus/eventBus.controller.ts +++ b/packages/cli/src/eventbus/eventBus.controller.ts @@ -6,66 +6,28 @@ import type { EventMessageWorkflowOptions } from './EventMessageClasses/EventMes import { EventMessageWorkflow } from './EventMessageClasses/EventMessageWorkflow'; import type { EventMessageReturnMode } from './MessageEventBus/MessageEventBus'; import { eventBus } from './MessageEventBus/MessageEventBus'; -import { - isMessageEventBusDestinationSentryOptions, - MessageEventBusDestinationSentry, -} from './MessageEventBusDestination/MessageEventBusDestinationSentry.ee'; -import { - isMessageEventBusDestinationSyslogOptions, - MessageEventBusDestinationSyslog, -} from './MessageEventBusDestination/MessageEventBusDestinationSyslog.ee'; -import { MessageEventBusDestinationWebhook } from './MessageEventBusDestination/MessageEventBusDestinationWebhook.ee'; import type { EventMessageTypes, FailedEventSummary } from './EventMessageClasses'; import { eventNamesAll } from './EventMessageClasses'; import type { EventMessageAuditOptions } from './EventMessageClasses/EventMessageAudit'; import { EventMessageAudit } from './EventMessageClasses/EventMessageAudit'; import { BadRequestError } from '@/ResponseHelper'; -import type { - MessageEventBusDestinationWebhookOptions, - MessageEventBusDestinationOptions, - IRunExecutionData, -} from 'n8n-workflow'; -import { MessageEventBusDestinationTypeNames, EventMessageTypeNames } from 'n8n-workflow'; +import type { IRunExecutionData } from 'n8n-workflow'; +import { EventMessageTypeNames } from 'n8n-workflow'; import type { EventMessageNodeOptions } from './EventMessageClasses/EventMessageNode'; import { EventMessageNode } from './EventMessageClasses/EventMessageNode'; import { recoverExecutionDataFromEventLogMessages } from './MessageEventBus/recoverEvents'; -import { RestController, Get, Post, Delete, Authorized } from '@/decorators'; -import type { MessageEventBusDestination } from './MessageEventBusDestination/MessageEventBusDestination.ee'; -import type { DeleteResult } from 'typeorm'; -import { AuthenticatedRequest } from '@/requests'; +import { RestController, Get, Post, Authorized } from '@/decorators'; // ---------------------------------------- // TypeGuards // ---------------------------------------- -const isWithIdString = (candidate: unknown): candidate is { id: string } => { - const o = candidate as { id: string }; - if (!o) return false; - return o.id !== undefined; -}; - const isWithQueryString = (candidate: unknown): candidate is { query: string } => { const o = candidate as { query: string }; if (!o) return false; return o.query !== undefined; }; -const isMessageEventBusDestinationWebhookOptions = ( - candidate: unknown, -): candidate is MessageEventBusDestinationWebhookOptions => { - const o = candidate as MessageEventBusDestinationWebhookOptions; - if (!o) return false; - return o.url !== undefined; -}; - -const isMessageEventBusDestinationOptions = ( - candidate: unknown, -): candidate is MessageEventBusDestinationOptions => { - const o = candidate as MessageEventBusDestinationOptions; - if (!o) return false; - return o.__type !== undefined; -}; - // ---------------------------------------- // Controller // ---------------------------------------- @@ -158,81 +120,6 @@ export class EventBusController { return msg; } - // ---------------------------------------- - // Destinations - // ---------------------------------------- - - @Get('/destination') - async getDestination(req: express.Request): Promise { - if (isWithIdString(req.query)) { - return eventBus.findDestination(req.query.id); - } else { - return eventBus.findDestination(); - } - } - - @Authorized(['global', 'owner']) - @Post('/destination') - async postDestination(req: AuthenticatedRequest): Promise { - let result: MessageEventBusDestination | undefined; - if (isMessageEventBusDestinationOptions(req.body)) { - switch (req.body.__type) { - case MessageEventBusDestinationTypeNames.sentry: - if (isMessageEventBusDestinationSentryOptions(req.body)) { - result = await eventBus.addDestination( - new MessageEventBusDestinationSentry(eventBus, req.body), - ); - } - break; - case MessageEventBusDestinationTypeNames.webhook: - if (isMessageEventBusDestinationWebhookOptions(req.body)) { - result = await eventBus.addDestination( - new MessageEventBusDestinationWebhook(eventBus, req.body), - ); - } - break; - case MessageEventBusDestinationTypeNames.syslog: - if (isMessageEventBusDestinationSyslogOptions(req.body)) { - result = await eventBus.addDestination( - new MessageEventBusDestinationSyslog(eventBus, req.body), - ); - } - break; - default: - throw new BadRequestError( - `Body is missing ${req.body.__type} options or type ${req.body.__type} is unknown`, - ); - } - if (result) { - await result.saveToDb(); - return { - ...result.serialize(), - eventBusInstance: undefined, - }; - } - throw new BadRequestError('There was an error adding the destination'); - } - throw new BadRequestError('Body is not configuring MessageEventBusDestinationOptions'); - } - - @Get('/testmessage') - async sendTestMessage(req: express.Request): Promise { - if (isWithIdString(req.query)) { - return eventBus.testDestination(req.query.id); - } - return false; - } - - @Authorized(['global', 'owner']) - @Delete('/destination') - async deleteDestination(req: AuthenticatedRequest): Promise { - if (isWithIdString(req.query)) { - return eventBus.removeDestination(req.query.id); - } else { - throw new BadRequestError('Query is missing id'); - } - } - // ---------------------------------------- // Utilities // ---------------------------------------- diff --git a/packages/cli/src/eventbus/middleware/logStreamingEnabled.middleware.ee.ts b/packages/cli/src/eventbus/middleware/logStreamingEnabled.middleware.ee.ts new file mode 100644 index 0000000000000..621fe3a5b4903 --- /dev/null +++ b/packages/cli/src/eventbus/middleware/logStreamingEnabled.middleware.ee.ts @@ -0,0 +1,15 @@ +import type { RequestHandler } from 'express'; +import Container from 'typedi'; +import { License } from '../../License'; + +export function islogStreamingLicensed(): boolean { + return Container.get(License).isLogStreamingEnabled(); +} + +export const logStreamingLicensedMiddleware: RequestHandler = (req, res, next) => { + if (islogStreamingLicensed()) { + next(); + } else { + res.status(403).json({ status: 'error', message: 'Unauthorized' }); + } +}; diff --git a/packages/cli/test/integration/eventbus.ee.test.ts b/packages/cli/test/integration/eventbus.ee.test.ts new file mode 100644 index 0000000000000..e06e10f6c0a5d --- /dev/null +++ b/packages/cli/test/integration/eventbus.ee.test.ts @@ -0,0 +1,391 @@ +import config from '@/config'; +import axios from 'axios'; +import syslog from 'syslog-client'; +import { v4 as uuid } from 'uuid'; +import type { SuperAgentTest } from 'supertest'; +import * as utils from './shared/utils'; +import * as testDb from './shared/testDb'; +import type { Role } from '@db/entities/Role'; +import type { User } from '@db/entities/User'; +import type { + MessageEventBusDestinationSentryOptions, + MessageEventBusDestinationSyslogOptions, + MessageEventBusDestinationWebhookOptions, +} from 'n8n-workflow'; +import { + defaultMessageEventBusDestinationSentryOptions, + defaultMessageEventBusDestinationSyslogOptions, + defaultMessageEventBusDestinationWebhookOptions, +} from 'n8n-workflow'; +import { eventBus } from '@/eventbus'; +import { EventMessageGeneric } from '@/eventbus/EventMessageClasses/EventMessageGeneric'; +import type { MessageEventBusDestinationSyslog } from '@/eventbus/MessageEventBusDestination/MessageEventBusDestinationSyslog.ee'; +import type { MessageEventBusDestinationWebhook } from '@/eventbus/MessageEventBusDestination/MessageEventBusDestinationWebhook.ee'; +import type { MessageEventBusDestinationSentry } from '@/eventbus/MessageEventBusDestination/MessageEventBusDestinationSentry.ee'; +import { EventMessageAudit } from '@/eventbus/EventMessageClasses/EventMessageAudit'; +import type { EventNamesTypes } from '@/eventbus/EventMessageClasses'; + +jest.unmock('@/eventbus/MessageEventBus/MessageEventBus'); +jest.mock('axios'); +const mockedAxios = axios as jest.Mocked; +jest.mock('syslog-client'); +const mockedSyslog = syslog as jest.Mocked; + +let globalOwnerRole: Role; +let owner: User; +let authOwnerAgent: SuperAgentTest; + +const testSyslogDestination: MessageEventBusDestinationSyslogOptions = { + ...defaultMessageEventBusDestinationSyslogOptions, + id: 'b88038f4-0a89-4e94-89a9-658dfdb74539', + protocol: 'udp', + label: 'Test Syslog', + enabled: false, + subscribedEvents: ['n8n.test.message', 'n8n.audit.user.updated'], +}; + +const testWebhookDestination: MessageEventBusDestinationWebhookOptions = { + ...defaultMessageEventBusDestinationWebhookOptions, + id: '88be6560-bfb4-455c-8aa1-06971e9e5522', + url: 'http://localhost:3456', + method: 'POST', + label: 'Test Webhook', + enabled: false, + subscribedEvents: ['n8n.test.message', 'n8n.audit.user.updated'], +}; + +const testSentryDestination: MessageEventBusDestinationSentryOptions = { + ...defaultMessageEventBusDestinationSentryOptions, + id: '450ca04b-87dd-4837-a052-ab3a347a00e9', + dsn: 'http://localhost:3000', + label: 'Test Sentry', + enabled: false, + subscribedEvents: ['n8n.test.message', 'n8n.audit.user.updated'], +}; + +async function confirmIdInAll(id: string) { + const sent = await eventBus.getEventsAll(); + expect(sent.length).toBeGreaterThan(0); + expect(sent.find((msg) => msg.id === id)).toBeTruthy(); +} + +async function confirmIdSent(id: string) { + const sent = await eventBus.getEventsSent(); + expect(sent.length).toBeGreaterThan(0); + expect(sent.find((msg) => msg.id === id)).toBeTruthy(); +} + +const testServer = utils.setupTestServer({ + endpointGroups: ['eventBus'], + enabledFeatures: ['feat:logStreaming'], +}); + +beforeAll(async () => { + globalOwnerRole = await testDb.getGlobalOwnerRole(); + owner = await testDb.createUser({ globalRole: globalOwnerRole }); + authOwnerAgent = testServer.authAgentFor(owner); + + mockedSyslog.createClient.mockImplementation(() => new syslog.Client()); + + await utils.initEncryptionKey(); + config.set('eventBus.logWriter.logBaseName', 'n8n-test-logwriter'); + config.set('eventBus.logWriter.keepLogCount', 1); + + await eventBus.initialize(); +}); + +afterAll(async () => { + jest.mock('@/eventbus/MessageEventBus/MessageEventBus'); + await eventBus.close(); +}); + +test('should have a running logwriter process', () => { + const thread = eventBus.logWriter.worker; + expect(thread).toBeDefined(); +}); + +test('should have logwriter log messages', async () => { + const testMessage = new EventMessageGeneric({ + eventName: 'n8n.test.message' as EventNamesTypes, + id: uuid(), + }); + await eventBus.send(testMessage); + await new Promise((resolve) => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + eventBus.logWriter.worker?.once('message', async (msg: { command: string; data: any }) => { + expect(msg.command).toBe('appendMessageToLog'); + expect(msg.data).toBe(true); + await confirmIdInAll(testMessage.id); + resolve(true); + }); + }); +}); + +describe('GET /eventbus/destination', () => { + test('should fail due to missing authentication', async () => { + const response = await testServer.authlessAgent.get('/eventbus/destination'); + expect(response.statusCode).toBe(401); + }); + + test('all returned destinations should exist in eventbus', async () => { + const response = await authOwnerAgent.get('/eventbus/destination'); + expect(response.statusCode).toBe(200); + + const data = response.body.data; + expect(data).toBeTruthy(); + expect(Array.isArray(data)).toBeTruthy(); + + for (let index = 0; index < data.length; index++) { + const destination = data[index]; + const foundDestinations = await eventBus.findDestination(destination.id); + expect(Array.isArray(foundDestinations)).toBeTruthy(); + expect(foundDestinations.length).toBe(1); + expect(foundDestinations[0].label).toBe(destination.label); + } + }); +}); + +describe('POST /eventbus/destination', () => { + test('create syslog destination', async () => { + const response = await authOwnerAgent.post('/eventbus/destination').send(testSyslogDestination); + expect(response.statusCode).toBe(200); + }); + + test('create sentry destination', async () => { + const response = await authOwnerAgent.post('/eventbus/destination').send(testSentryDestination); + expect(response.statusCode).toBe(200); + }); + + test('create webhook destination', async () => { + const response = await authOwnerAgent + .post('/eventbus/destination') + .send(testWebhookDestination); + expect(response.statusCode).toBe(200); + }); +}); + +// this test (presumably the mocking) is causing the test suite to randomly fail +// eslint-disable-next-line n8n-local-rules/no-skipped-tests +test.skip('should send message to syslog', async () => { + const testMessage = new EventMessageGeneric({ + eventName: 'n8n.test.message' as EventNamesTypes, + id: uuid(), + }); + + const syslogDestination = eventBus.destinations[ + testSyslogDestination.id! + ] as MessageEventBusDestinationSyslog; + + syslogDestination.enable(); + + const mockedSyslogClientLog = jest.spyOn(syslogDestination.client, 'log'); + mockedSyslogClientLog.mockImplementation((_m, _options, _cb) => { + eventBus.confirmSent(testMessage, { + id: syslogDestination.id, + name: syslogDestination.label, + }); + return syslogDestination.client; + }); + + await eventBus.send(testMessage); + await new Promise((resolve) => { + eventBus.logWriter.worker?.on( + 'message', + async function handler001(msg: { command: string; data: any }) { + if (msg.command === 'appendMessageToLog') { + await confirmIdInAll(testMessage.id); + } else if (msg.command === 'confirmMessageSent') { + await confirmIdSent(testMessage.id); + expect(mockedSyslogClientLog).toHaveBeenCalled(); + syslogDestination.disable(); + eventBus.logWriter.worker?.removeListener('message', handler001); + resolve(true); + } + }, + ); + }); +}); + +// eslint-disable-next-line n8n-local-rules/no-skipped-tests +test.skip('should confirm send message if there are no subscribers', async () => { + const testMessageUnsubscribed = new EventMessageGeneric({ + eventName: 'n8n.test.unsub' as EventNamesTypes, + id: uuid(), + }); + + const syslogDestination = eventBus.destinations[ + testSyslogDestination.id! + ] as MessageEventBusDestinationSyslog; + + syslogDestination.enable(); + + await eventBus.send(testMessageUnsubscribed); + + await new Promise((resolve) => { + eventBus.logWriter.worker?.on( + 'message', + async function handler002(msg: { command: string; data: any }) { + if (msg.command === 'appendMessageToLog') { + await confirmIdInAll(testMessageUnsubscribed.id); + } else if (msg.command === 'confirmMessageSent') { + await confirmIdSent(testMessageUnsubscribed.id); + syslogDestination.disable(); + eventBus.logWriter.worker?.removeListener('message', handler002); + resolve(true); + } + }, + ); + }); +}); + +test('should anonymize audit message to syslog ', async () => { + const testAuditMessage = new EventMessageAudit({ + eventName: 'n8n.audit.user.updated', + payload: { + _secret: 'secret', + public: 'public', + }, + id: uuid(), + }); + + const syslogDestination = eventBus.destinations[ + testSyslogDestination.id! + ] as MessageEventBusDestinationSyslog; + + syslogDestination.enable(); + + const mockedSyslogClientLog = jest.spyOn(syslogDestination.client, 'log'); + mockedSyslogClientLog.mockImplementation((m, _options, _cb) => { + const o = JSON.parse(m); + expect(o).toHaveProperty('payload'); + expect(o.payload).toHaveProperty('_secret'); + syslogDestination.anonymizeAuditMessages + ? expect(o.payload._secret).toBe('*') + : expect(o.payload._secret).toBe('secret'); + expect(o.payload).toHaveProperty('public'); + expect(o.payload.public).toBe('public'); + return syslogDestination.client; + }); + + syslogDestination.anonymizeAuditMessages = true; + await eventBus.send(testAuditMessage); + await new Promise((resolve) => { + eventBus.logWriter.worker?.on( + 'message', + async function handler005(msg: { command: string; data: any }) { + if (msg.command === 'appendMessageToLog') { + const sent = await eventBus.getEventsAll(); + await confirmIdInAll(testAuditMessage.id); + expect(mockedSyslogClientLog).toHaveBeenCalled(); + eventBus.logWriter.worker?.removeListener('message', handler005); + resolve(true); + } + }, + ); + }); + + syslogDestination.anonymizeAuditMessages = false; + await eventBus.send(testAuditMessage); + await new Promise((resolve) => { + eventBus.logWriter.worker?.on( + 'message', + async function handler006(msg: { command: string; data: any }) { + if (msg.command === 'appendMessageToLog') { + const sent = await eventBus.getEventsAll(); + await confirmIdInAll(testAuditMessage.id); + expect(mockedSyslogClientLog).toHaveBeenCalled(); + syslogDestination.disable(); + eventBus.logWriter.worker?.removeListener('message', handler006); + resolve(true); + } + }, + ); + }); +}); + +test('should send message to webhook ', async () => { + const testMessage = new EventMessageGeneric({ + eventName: 'n8n.test.message' as EventNamesTypes, + id: uuid(), + }); + + const webhookDestination = eventBus.destinations[ + testWebhookDestination.id! + ] as MessageEventBusDestinationWebhook; + + webhookDestination.enable(); + + mockedAxios.post.mockResolvedValue({ status: 200, data: { msg: 'OK' } }); + mockedAxios.request.mockResolvedValue({ status: 200, data: { msg: 'OK' } }); + + await eventBus.send(testMessage); + await new Promise((resolve) => { + eventBus.logWriter.worker?.on( + 'message', + async function handler003(msg: { command: string; data: any }) { + if (msg.command === 'appendMessageToLog') { + await confirmIdInAll(testMessage.id); + } else if (msg.command === 'confirmMessageSent') { + await confirmIdSent(testMessage.id); + expect(mockedAxios.request).toHaveBeenCalled(); + webhookDestination.disable(); + eventBus.logWriter.worker?.removeListener('message', handler003); + resolve(true); + } + }, + ); + }); +}); + +test('should send message to sentry ', async () => { + const testMessage = new EventMessageGeneric({ + eventName: 'n8n.test.message' as EventNamesTypes, + id: uuid(), + }); + + const sentryDestination = eventBus.destinations[ + testSentryDestination.id! + ] as MessageEventBusDestinationSentry; + + sentryDestination.enable(); + + const mockedSentryCaptureMessage = jest.spyOn(sentryDestination.sentryClient!, 'captureMessage'); + mockedSentryCaptureMessage.mockImplementation((_m, _level, _hint, _scope) => { + eventBus.confirmSent(testMessage, { + id: sentryDestination.id, + name: sentryDestination.label, + }); + return testMessage.id; + }); + + await eventBus.send(testMessage); + await new Promise((resolve) => { + eventBus.logWriter.worker?.on( + 'message', + async function handler004(msg: { command: string; data: any }) { + if (msg.command === 'appendMessageToLog') { + await confirmIdInAll(testMessage.id); + } else if (msg.command === 'confirmMessageSent') { + await confirmIdSent(testMessage.id); + expect(mockedSentryCaptureMessage).toHaveBeenCalled(); + sentryDestination.disable(); + eventBus.logWriter.worker?.removeListener('message', handler004); + resolve(true); + } + }, + ); + }); +}); + +test('DELETE /eventbus/destination delete all destinations by id', async () => { + const existingDestinationIds = [...Object.keys(eventBus.destinations)]; + + await Promise.all( + existingDestinationIds.map(async (id) => { + const response = await authOwnerAgent.del('/eventbus/destination').query({ id }); + expect(response.statusCode).toBe(200); + }), + ); + + expect(Object.keys(eventBus.destinations).length).toBe(0); +}); diff --git a/packages/cli/test/integration/eventbus.test.ts b/packages/cli/test/integration/eventbus.test.ts index 8f76fa146c45a..c6512152684f9 100644 --- a/packages/cli/test/integration/eventbus.test.ts +++ b/packages/cli/test/integration/eventbus.test.ts @@ -1,123 +1,28 @@ -import config from '@/config'; -import axios from 'axios'; -import syslog from 'syslog-client'; -import { v4 as uuid } from 'uuid'; import type { SuperAgentTest } from 'supertest'; import * as utils from './shared/utils/'; import * as testDb from './shared/testDb'; import type { Role } from '@db/entities/Role'; import type { User } from '@db/entities/User'; -import type { - MessageEventBusDestinationSentryOptions, - MessageEventBusDestinationSyslogOptions, - MessageEventBusDestinationWebhookOptions, -} from 'n8n-workflow'; -import { - defaultMessageEventBusDestinationSentryOptions, - defaultMessageEventBusDestinationSyslogOptions, - defaultMessageEventBusDestinationWebhookOptions, -} from 'n8n-workflow'; -import { eventBus } from '@/eventbus'; -import { EventMessageGeneric } from '@/eventbus/EventMessageClasses/EventMessageGeneric'; -import type { MessageEventBusDestinationSyslog } from '@/eventbus/MessageEventBusDestination/MessageEventBusDestinationSyslog.ee'; -import type { MessageEventBusDestinationWebhook } from '@/eventbus/MessageEventBusDestination/MessageEventBusDestinationWebhook.ee'; -import type { MessageEventBusDestinationSentry } from '@/eventbus/MessageEventBusDestination/MessageEventBusDestinationSentry.ee'; -import { EventMessageAudit } from '@/eventbus/EventMessageClasses/EventMessageAudit'; -import type { EventNamesTypes } from '@/eventbus/EventMessageClasses'; -jest.unmock('@/eventbus/MessageEventBus/MessageEventBus'); -jest.mock('axios'); -const mockedAxios = axios as jest.Mocked; -jest.mock('syslog-client'); -const mockedSyslog = syslog as jest.Mocked; +/** + * NOTE: due to issues with mocking the MessageEventBus in multiple tests running in parallel, + * the event bus tests are run in the eventbus.ee.test.ts file + * The tests in this file are only checking endpoint permissions. + */ let globalOwnerRole: Role; let owner: User; let authOwnerAgent: SuperAgentTest; -const testSyslogDestination: MessageEventBusDestinationSyslogOptions = { - ...defaultMessageEventBusDestinationSyslogOptions, - id: 'b88038f4-0a89-4e94-89a9-658dfdb74539', - protocol: 'udp', - label: 'Test Syslog', - enabled: false, - subscribedEvents: ['n8n.test.message', 'n8n.audit.user.updated'], -}; - -const testWebhookDestination: MessageEventBusDestinationWebhookOptions = { - ...defaultMessageEventBusDestinationWebhookOptions, - id: '88be6560-bfb4-455c-8aa1-06971e9e5522', - url: 'http://localhost:3456', - method: 'POST', - label: 'Test Webhook', - enabled: false, - subscribedEvents: ['n8n.test.message', 'n8n.audit.user.updated'], -}; - -const testSentryDestination: MessageEventBusDestinationSentryOptions = { - ...defaultMessageEventBusDestinationSentryOptions, - id: '450ca04b-87dd-4837-a052-ab3a347a00e9', - dsn: 'http://localhost:3000', - label: 'Test Sentry', - enabled: false, - subscribedEvents: ['n8n.test.message', 'n8n.audit.user.updated'], -}; - -async function confirmIdInAll(id: string) { - const sent = await eventBus.getEventsAll(); - expect(sent.length).toBeGreaterThan(0); - expect(sent.find((msg) => msg.id === id)).toBeTruthy(); -} - -async function confirmIdSent(id: string) { - const sent = await eventBus.getEventsSent(); - expect(sent.length).toBeGreaterThan(0); - expect(sent.find((msg) => msg.id === id)).toBeTruthy(); -} - const testServer = utils.setupTestServer({ endpointGroups: ['eventBus'], - enabledFeatures: ['feat:logStreaming'], + enabledFeatures: [], // do not enable logstreaming }); beforeAll(async () => { globalOwnerRole = await testDb.getGlobalOwnerRole(); owner = await testDb.createUser({ globalRole: globalOwnerRole }); authOwnerAgent = testServer.authAgentFor(owner); - - mockedSyslog.createClient.mockImplementation(() => new syslog.Client()); - - await utils.initEncryptionKey(); - config.set('eventBus.logWriter.logBaseName', 'n8n-test-logwriter'); - config.set('eventBus.logWriter.keepLogCount', 1); - - await eventBus.initialize(); -}); - -afterAll(async () => { - jest.mock('@/eventbus/MessageEventBus/MessageEventBus'); - await eventBus.close(); -}); - -test('should have a running logwriter process', () => { - const thread = eventBus.logWriter.worker; - expect(thread).toBeDefined(); -}); - -test('should have logwriter log messages', async () => { - const testMessage = new EventMessageGeneric({ - eventName: 'n8n.test.message' as EventNamesTypes, - id: uuid(), - }); - await eventBus.send(testMessage); - await new Promise((resolve) => { - eventBus.logWriter.worker?.once('message', async (msg: { command: string; data: any }) => { - expect(msg.command).toBe('appendMessageToLog'); - expect(msg.data).toBe(true); - await confirmIdInAll(testMessage.id); - resolve(true); - }); - }); }); describe('GET /eventbus/destination', () => { @@ -126,265 +31,32 @@ describe('GET /eventbus/destination', () => { expect(response.statusCode).toBe(401); }); - test('all returned destinations should exist in eventbus', async () => { + test('should fail due to missing license when authenticated', async () => { const response = await authOwnerAgent.get('/eventbus/destination'); - expect(response.statusCode).toBe(200); - - const data = response.body.data; - expect(data).toBeTruthy(); - expect(Array.isArray(data)).toBeTruthy(); - - for (let index = 0; index < data.length; index++) { - const destination = data[index]; - const foundDestinations = await eventBus.findDestination(destination.id); - expect(Array.isArray(foundDestinations)).toBeTruthy(); - expect(foundDestinations.length).toBe(1); - expect(foundDestinations[0].label).toBe(destination.label); - } + expect(response.statusCode).toBe(403); }); }); describe('POST /eventbus/destination', () => { - test('create syslog destination', async () => { - const response = await authOwnerAgent.post('/eventbus/destination').send(testSyslogDestination); - expect(response.statusCode).toBe(200); - }); - - test('create sentry destination', async () => { - const response = await authOwnerAgent.post('/eventbus/destination').send(testSentryDestination); - expect(response.statusCode).toBe(200); - }); - - test('create webhook destination', async () => { - const response = await authOwnerAgent - .post('/eventbus/destination') - .send(testWebhookDestination); - expect(response.statusCode).toBe(200); - }); -}); - -// this test (presumably the mocking) is causing the test suite to randomly fail -// eslint-disable-next-line n8n-local-rules/no-skipped-tests -test.skip('should send message to syslog', async () => { - const testMessage = new EventMessageGeneric({ - eventName: 'n8n.test.message' as EventNamesTypes, - id: uuid(), - }); - - const syslogDestination = eventBus.destinations[ - testSyslogDestination.id! - ] as MessageEventBusDestinationSyslog; - - syslogDestination.enable(); - - const mockedSyslogClientLog = jest.spyOn(syslogDestination.client, 'log'); - mockedSyslogClientLog.mockImplementation((_m, _options, _cb) => { - eventBus.confirmSent(testMessage, { - id: syslogDestination.id, - name: syslogDestination.label, - }); - return syslogDestination.client; - }); - - await eventBus.send(testMessage); - await new Promise((resolve) => { - eventBus.logWriter.worker?.on( - 'message', - async function handler001(msg: { command: string; data: any }) { - if (msg.command === 'appendMessageToLog') { - await confirmIdInAll(testMessage.id); - } else if (msg.command === 'confirmMessageSent') { - await confirmIdSent(testMessage.id); - expect(mockedSyslogClientLog).toHaveBeenCalled(); - syslogDestination.disable(); - eventBus.logWriter.worker?.removeListener('message', handler001); - resolve(true); - } - }, - ); - }); -}); - -// eslint-disable-next-line n8n-local-rules/no-skipped-tests -test.skip('should confirm send message if there are no subscribers', async () => { - const testMessageUnsubscribed = new EventMessageGeneric({ - eventName: 'n8n.test.unsub' as EventNamesTypes, - id: uuid(), - }); - - const syslogDestination = eventBus.destinations[ - testSyslogDestination.id! - ] as MessageEventBusDestinationSyslog; - - syslogDestination.enable(); - - await eventBus.send(testMessageUnsubscribed); - - await new Promise((resolve) => { - eventBus.logWriter.worker?.on( - 'message', - async function handler002(msg: { command: string; data: any }) { - if (msg.command === 'appendMessageToLog') { - await confirmIdInAll(testMessageUnsubscribed.id); - } else if (msg.command === 'confirmMessageSent') { - await confirmIdSent(testMessageUnsubscribed.id); - syslogDestination.disable(); - eventBus.logWriter.worker?.removeListener('message', handler002); - resolve(true); - } - }, - ); - }); -}); - -test('should anonymize audit message to syslog ', async () => { - const testAuditMessage = new EventMessageAudit({ - eventName: 'n8n.audit.user.updated', - payload: { - _secret: 'secret', - public: 'public', - }, - id: uuid(), - }); - - const syslogDestination = eventBus.destinations[ - testSyslogDestination.id! - ] as MessageEventBusDestinationSyslog; - - syslogDestination.enable(); - - const mockedSyslogClientLog = jest.spyOn(syslogDestination.client, 'log'); - mockedSyslogClientLog.mockImplementation((m, _options, _cb) => { - const o = JSON.parse(m); - expect(o).toHaveProperty('payload'); - expect(o.payload).toHaveProperty('_secret'); - syslogDestination.anonymizeAuditMessages - ? expect(o.payload._secret).toBe('*') - : expect(o.payload._secret).toBe('secret'); - expect(o.payload).toHaveProperty('public'); - expect(o.payload.public).toBe('public'); - return syslogDestination.client; - }); - - syslogDestination.anonymizeAuditMessages = true; - await eventBus.send(testAuditMessage); - await new Promise((resolve) => { - eventBus.logWriter.worker?.on( - 'message', - async function handler005(msg: { command: string; data: any }) { - if (msg.command === 'appendMessageToLog') { - const sent = await eventBus.getEventsAll(); - await confirmIdInAll(testAuditMessage.id); - expect(mockedSyslogClientLog).toHaveBeenCalled(); - eventBus.logWriter.worker?.removeListener('message', handler005); - resolve(true); - } - }, - ); - }); - - syslogDestination.anonymizeAuditMessages = false; - await eventBus.send(testAuditMessage); - await new Promise((resolve) => { - eventBus.logWriter.worker?.on( - 'message', - async function handler006(msg: { command: string; data: any }) { - if (msg.command === 'appendMessageToLog') { - const sent = await eventBus.getEventsAll(); - await confirmIdInAll(testAuditMessage.id); - expect(mockedSyslogClientLog).toHaveBeenCalled(); - syslogDestination.disable(); - eventBus.logWriter.worker?.removeListener('message', handler006); - resolve(true); - } - }, - ); - }); -}); - -test('should send message to webhook ', async () => { - const testMessage = new EventMessageGeneric({ - eventName: 'n8n.test.message' as EventNamesTypes, - id: uuid(), + test('should fail due to missing authentication', async () => { + const response = await testServer.authlessAgent.post('/eventbus/destination'); + expect(response.statusCode).toBe(401); }); - const webhookDestination = eventBus.destinations[ - testWebhookDestination.id! - ] as MessageEventBusDestinationWebhook; - - webhookDestination.enable(); - - mockedAxios.post.mockResolvedValue({ status: 200, data: { msg: 'OK' } }); - mockedAxios.request.mockResolvedValue({ status: 200, data: { msg: 'OK' } }); - - await eventBus.send(testMessage); - await new Promise((resolve) => { - eventBus.logWriter.worker?.on( - 'message', - async function handler003(msg: { command: string; data: any }) { - if (msg.command === 'appendMessageToLog') { - await confirmIdInAll(testMessage.id); - } else if (msg.command === 'confirmMessageSent') { - await confirmIdSent(testMessage.id); - expect(mockedAxios.request).toHaveBeenCalled(); - webhookDestination.disable(); - eventBus.logWriter.worker?.removeListener('message', handler003); - resolve(true); - } - }, - ); + test('should fail due to missing license when authenticated', async () => { + const response = await authOwnerAgent.post('/eventbus/destination'); + expect(response.statusCode).toBe(403); }); }); -test('should send message to sentry ', async () => { - const testMessage = new EventMessageGeneric({ - eventName: 'n8n.test.message' as EventNamesTypes, - id: uuid(), - }); - - const sentryDestination = eventBus.destinations[ - testSentryDestination.id! - ] as MessageEventBusDestinationSentry; - - sentryDestination.enable(); - - const mockedSentryCaptureMessage = jest.spyOn(sentryDestination.sentryClient!, 'captureMessage'); - mockedSentryCaptureMessage.mockImplementation((_m, _level, _hint, _scope) => { - eventBus.confirmSent(testMessage, { - id: sentryDestination.id, - name: sentryDestination.label, - }); - return testMessage.id; +describe('DELETE /eventbus/destination', () => { + test('should fail due to missing authentication', async () => { + const response = await testServer.authlessAgent.del('/eventbus/destination'); + expect(response.statusCode).toBe(401); }); - await eventBus.send(testMessage); - await new Promise((resolve) => { - eventBus.logWriter.worker?.on( - 'message', - async function handler004(msg: { command: string; data: any }) { - if (msg.command === 'appendMessageToLog') { - await confirmIdInAll(testMessage.id); - } else if (msg.command === 'confirmMessageSent') { - await confirmIdSent(testMessage.id); - expect(mockedSentryCaptureMessage).toHaveBeenCalled(); - sentryDestination.disable(); - eventBus.logWriter.worker?.removeListener('message', handler004); - resolve(true); - } - }, - ); + test('should fail due to missing license when authenticated', async () => { + const response = await authOwnerAgent.del('/eventbus/destination'); + expect(response.statusCode).toBe(403); }); }); - -test('DELETE /eventbus/destination delete all destinations by id', async () => { - const existingDestinationIds = [...Object.keys(eventBus.destinations)]; - - await Promise.all( - existingDestinationIds.map(async (id) => { - const response = await authOwnerAgent.del('/eventbus/destination').query({ id }); - expect(response.statusCode).toBe(200); - }), - ); - - expect(Object.keys(eventBus.destinations).length).toBe(0); -}); diff --git a/packages/cli/test/integration/shared/utils/testServer.ts b/packages/cli/test/integration/shared/utils/testServer.ts index b00273eb66958..2ea593305b2af 100644 --- a/packages/cli/test/integration/shared/utils/testServer.ts +++ b/packages/cli/test/integration/shared/utils/testServer.ts @@ -43,6 +43,7 @@ import { Push } from '@/push'; import { setSamlLoginEnabled } from '@/sso/saml/samlHelpers'; import { SamlController } from '@/sso/saml/routes/saml.controller.ee'; import { EventBusController } from '@/eventbus/eventBus.controller'; +import { EventBusControllerEE } from '@/eventbus/eventBus.controller.ee'; import { License } from '@/License'; import { SourceControlController } from '@/environments/sourceControl/sourceControl.controller.ee'; @@ -206,6 +207,7 @@ export const setupTestServer = ({ break; case 'eventBus': registerController(app, config, new EventBusController()); + registerController(app, config, new EventBusControllerEE()); break; case 'auth': registerController(