diff --git a/packages/cli/src/WebhookHelpers.ts b/packages/cli/src/WebhookHelpers.ts index 95e8825e1a840..9342a97cda69c 100644 --- a/packages/cli/src/WebhookHelpers.ts +++ b/packages/cli/src/WebhookHelpers.ts @@ -360,11 +360,10 @@ export async function executeWebhook( NodeExecuteFunctions, executionMode, ); - Container.get(WorkflowStatisticsService).emit( - 'nodeFetchedData', - workflow.id, - workflowStartNode, - ); + Container.get(WorkflowStatisticsService).emit('nodeFetchedData', { + workflowId: workflow.id, + node: workflowStartNode, + }); } catch (err) { // Send error response to webhook caller const errorMessage = 'Workflow Webhook Error: Workflow could not be started!'; diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index e5a17de7ec26d..754cfea693c3d 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -525,17 +525,16 @@ function hookFunctionsSave(): IWorkflowExecuteHooks { ); } } finally { - workflowStatisticsService.emit( - 'workflowExecutionCompleted', - this.workflowData, + workflowStatisticsService.emit('workflowExecutionCompleted', { + workflowData: this.workflowData, fullRunData, - ); + }); } }, ], nodeFetchedData: [ async (workflowId: string, node: INode) => { - workflowStatisticsService.emit('nodeFetchedData', workflowId, node); + workflowStatisticsService.emit('nodeFetchedData', { workflowId, node }); }, ], }; @@ -636,11 +635,10 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { this.retryOf, ); } finally { - workflowStatisticsService.emit( - 'workflowExecutionCompleted', - this.workflowData, + workflowStatisticsService.emit('workflowExecutionCompleted', { + workflowData: this.workflowData, fullRunData, - ); + }); } }, async function (this: WorkflowHooks, runData: IRun): Promise { @@ -676,7 +674,7 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { ], nodeFetchedData: [ async (workflowId: string, node: INode) => { - workflowStatisticsService.emit('nodeFetchedData', workflowId, node); + workflowStatisticsService.emit('nodeFetchedData', { workflowId, node }); }, ], }; diff --git a/packages/cli/src/eventbus/event.service.ts b/packages/cli/src/eventbus/event.service.ts index 2df51af22cad1..2b16ff06ab40c 100644 --- a/packages/cli/src/eventbus/event.service.ts +++ b/packages/cli/src/eventbus/event.service.ts @@ -1,16 +1,6 @@ -import { EventEmitter } from 'node:events'; import { Service } from 'typedi'; +import { TypedEmitter } from '@/TypedEmitter'; import type { Event } from './event.types'; @Service() -export class EventService extends EventEmitter { - emit(eventName: K, arg?: Event[K]) { - super.emit(eventName, arg); - return true; - } - - on(eventName: K, handler: (arg: Event[K]) => void) { - super.on(eventName, handler); - return this; - } -} +export class EventService extends TypedEmitter {} diff --git a/packages/cli/src/push/index.ts b/packages/cli/src/push/index.ts index 647ab40acf0c5..12f7de9ac4931 100644 --- a/packages/cli/src/push/index.ts +++ b/packages/cli/src/push/index.ts @@ -1,4 +1,3 @@ -import { EventEmitter } from 'events'; import { ServerResponse } from 'http'; import type { Server } from 'http'; import type { Socket } from 'net'; @@ -17,6 +16,7 @@ import { OrchestrationService } from '@/services/orchestration.service'; import { SSEPush } from './sse.push'; import { WebSocketPush } from './websocket.push'; import type { PushResponse, SSEPushRequest, WebSocketPushRequest } from './types'; +import { TypedEmitter } from '@/TypedEmitter'; const useWebSockets = config.getEnv('push.backend') === 'websocket'; @@ -28,7 +28,9 @@ const useWebSockets = config.getEnv('push.backend') === 'websocket'; * @emits message when a message is received from a client */ @Service() -export class Push extends EventEmitter { +export class Push extends TypedEmitter<{ + editorUiConnected: string; +}> { private backend = useWebSockets ? Container.get(WebSocketPush) : Container.get(SSEPush); constructor(private readonly orchestrationService: OrchestrationService) { @@ -37,7 +39,6 @@ export class Push extends EventEmitter { handleRequest(req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) { const { - user, ws, query: { pushRef }, } = req; diff --git a/packages/cli/src/services/cache/cache.service.ts b/packages/cli/src/services/cache/cache.service.ts index 8e9a4dc95c009..93507ad4be316 100644 --- a/packages/cli/src/services/cache/cache.service.ts +++ b/packages/cli/src/services/cache/cache.service.ts @@ -1,5 +1,3 @@ -import EventEmitter from 'node:events'; - import Container, { Service } from 'typedi'; import { caching } from 'cache-manager'; import { ApplicationError, jsonStringify } from 'n8n-workflow'; @@ -10,14 +8,20 @@ import { MalformedRefreshValueError } from '@/errors/cache-errors/malformed-refr import type { TaggedRedisCache, TaggedMemoryCache, - CacheEvent, MaybeHash, Hash, } from '@/services/cache/cache.types'; import { TIME } from '@/constants'; +import { TypedEmitter } from '@/TypedEmitter'; + +interface Events { + 'metrics.cache.hit': never; + 'metrics.cache.miss': never; + 'metrics.cache.update': never; +} @Service() -export class CacheService extends EventEmitter { +export class CacheService extends TypedEmitter { private cache: TaggedRedisCache | TaggedMemoryCache; async init() { @@ -66,10 +70,6 @@ export class CacheService extends EventEmitter { await this.cache.store.reset(); } - emit(event: CacheEvent, ...args: unknown[]) { - return super.emit(event, ...args); - } - isRedis() { return this.cache.kind === 'redis'; } diff --git a/packages/cli/src/services/cache/cache.types.ts b/packages/cli/src/services/cache/cache.types.ts index 4e96b8012a799..f598e22494446 100644 --- a/packages/cli/src/services/cache/cache.types.ts +++ b/packages/cli/src/services/cache/cache.types.ts @@ -8,5 +8,3 @@ export type TaggedMemoryCache = MemoryCache & { kind: 'memory' }; export type Hash = Record; export type MaybeHash = Hash | undefined; - -export type CacheEvent = `metrics.cache.${'hit' | 'miss' | 'update'}`; diff --git a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts index 19cd14c4a87b9..30b3e0a4c17ce 100644 --- a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts +++ b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts @@ -1,4 +1,3 @@ -import { EventEmitter } from 'node:events'; import config from '@/config'; import { Service } from 'typedi'; import { TIME } from '@/constants'; @@ -6,9 +5,15 @@ import { ErrorReporterProxy as EventReporter } from 'n8n-workflow'; import { Logger } from '@/Logger'; import { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher'; import { RedisClientService } from '@/services/redis/redis-client.service'; +import { TypedEmitter } from '@/TypedEmitter'; + +interface Events { + 'leader-stepdown': never; + 'leader-takeover': never; +} @Service() -export class MultiMainSetup extends EventEmitter { +export class MultiMainSetup extends TypedEmitter { constructor( private readonly logger: Logger, private readonly redisPublisher: RedisServicePubSubPublisher, diff --git a/packages/cli/src/services/workflow-statistics.service.ts b/packages/cli/src/services/workflow-statistics.service.ts index 516732add87b4..3988e222a91b7 100644 --- a/packages/cli/src/services/workflow-statistics.service.ts +++ b/packages/cli/src/services/workflow-statistics.service.ts @@ -1,29 +1,48 @@ -import { EventEmitter } from 'events'; -import { Container, Service } from 'typedi'; +import { Service } from 'typedi'; import type { INode, IRun, IWorkflowBase } from 'n8n-workflow'; import { StatisticsNames } from '@db/entities/WorkflowStatistics'; import { WorkflowStatisticsRepository } from '@db/repositories/workflowStatistics.repository'; import { UserService } from '@/services/user.service'; import { Logger } from '@/Logger'; import { OwnershipService } from './ownership.service'; +import { TypedEmitter } from '@/TypedEmitter'; + +interface Events { + nodeFetchedData: { workflowId: string; node: INode }; + workflowExecutionCompleted: { workflowData: IWorkflowBase; fullRunData: IRun }; + 'telemetry.onFirstProductionWorkflowSuccess': { + project_id: string; + workflow_id: string; + user_id: string; + }; + 'telemetry.onFirstWorkflowDataLoad': { + user_id: string; + project_id: string; + workflow_id: string; + node_type: string; + node_id: string; + }; +} @Service() -export class WorkflowStatisticsService extends EventEmitter { +export class WorkflowStatisticsService extends TypedEmitter { constructor( private readonly logger: Logger, private readonly repository: WorkflowStatisticsRepository, private readonly ownershipService: OwnershipService, + private readonly userService: UserService, ) { super({ captureRejections: true }); if ('SKIP_STATISTICS_EVENTS' in process.env) return; this.on( 'nodeFetchedData', - async (workflowId, node) => await this.nodeFetchedData(workflowId, node), + async ({ workflowId, node }) => await this.nodeFetchedData(workflowId, node), ); this.on( 'workflowExecutionCompleted', - async (workflowData, runData) => await this.workflowExecutionCompleted(workflowData, runData), + async ({ workflowData, fullRunData }) => + await this.workflowExecutionCompleted(workflowData, fullRunData), ); } @@ -49,18 +68,18 @@ export class WorkflowStatisticsService extends EventEmitter { const upsertResult = await this.repository.upsertWorkflowStatistics(name, workflowId); if (name === StatisticsNames.productionSuccess && upsertResult === 'insert') { - const project = await Container.get(OwnershipService).getWorkflowProjectCached(workflowId); + const project = await this.ownershipService.getWorkflowProjectCached(workflowId); if (project.type === 'personal') { - const owner = await Container.get(OwnershipService).getProjectOwnerCached(project.id); + const owner = await this.ownershipService.getProjectOwnerCached(project.id); const metrics = { project_id: project.id, workflow_id: workflowId, - user_id: owner?.id, + user_id: owner!.id, }; if (owner && !owner.settings?.userActivated) { - await Container.get(UserService).updateSettings(owner.id, { + await this.userService.updateSettings(owner.id, { firstSuccessfulWorkflowId: workflowId, userActivated: true, userActivatedAt: runData.startedAt.getTime(), @@ -90,7 +109,7 @@ export class WorkflowStatisticsService extends EventEmitter { const owner = await this.ownershipService.getProjectOwnerCached(project.id); let metrics = { - user_id: owner?.id, + user_id: owner!.id, project_id: project.id, workflow_id: workflowId, node_type: node.type, @@ -111,29 +130,3 @@ export class WorkflowStatisticsService extends EventEmitter { this.emit('telemetry.onFirstWorkflowDataLoad', metrics); } } - -export declare interface WorkflowStatisticsService { - on( - event: 'nodeFetchedData', - listener: (workflowId: string | undefined | null, node: INode) => void, - ): this; - on( - event: 'workflowExecutionCompleted', - listener: (workflowData: IWorkflowBase, runData: IRun) => void, - ): this; - on( - event: 'telemetry.onFirstProductionWorkflowSuccess', - listener: (metrics: { user_id: string; workflow_id: string }) => void, - ): this; - on( - event: 'telemetry.onFirstWorkflowDataLoad', - listener: (metrics: { - user_id: string; - workflow_id: string; - node_type: string; - node_id: string; - credential_type?: string; - credential_id?: string; - }) => void, - ): this; -}