From 458807138ebadb8a7662fb847049cb96ede83573 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Wed, 24 Jul 2024 22:28:07 +0200 Subject: [PATCH 1/5] refactor(core): Use type-safe event emitters (no-changelog) --- packages/cli/src/TypedEmitter.ts | 35 +++++++++++++++++++ .../concurrency-control.service.ts | 6 ++-- .../cli/src/concurrency/concurrency-queue.ts | 14 ++++---- 3 files changed, 44 insertions(+), 11 deletions(-) create mode 100644 packages/cli/src/TypedEmitter.ts diff --git a/packages/cli/src/TypedEmitter.ts b/packages/cli/src/TypedEmitter.ts new file mode 100644 index 0000000000000..7aaa885835108 --- /dev/null +++ b/packages/cli/src/TypedEmitter.ts @@ -0,0 +1,35 @@ +import { EventEmitter } from 'node:events'; +import debounce from 'lodash/debounce'; + +type EventName = string; + +type Payloads = { + [E in Extract]: unknown; +}; + +type Listener

= (payload: P) => void; + +export class TypedEmitter> extends EventEmitter { + protected debounceWait = 300; + + override on>(event: U, listener: Listener) { + return super.on(event, listener); + } + + override once>(event: U, listener: Listener) { + return super.once(event, listener); + } + + override off>(event: U, listener: Listener) { + return super.off(event, listener); + } + + override emit>(event: U, payload?: L[U]): boolean { + return super.emit(event, payload); + } + + protected debouncedEmit = debounce( + >(event: U, payload?: L[U]) => super.emit(event, payload), + this.debounceWait, + ); +} diff --git a/packages/cli/src/concurrency/concurrency-control.service.ts b/packages/cli/src/concurrency/concurrency-control.service.ts index cf249c51722eb..c562448379fef 100644 --- a/packages/cli/src/concurrency/concurrency-control.service.ts +++ b/packages/cli/src/concurrency/concurrency-control.service.ts @@ -53,7 +53,7 @@ export class ConcurrencyControlService { this.isEnabled = true; - this.productionQueue.on('concurrency-check', ({ capacity }: { capacity: number }) => { + this.productionQueue.on('concurrency-check', ({ capacity }) => { if (this.shouldReport(capacity)) { void this.telemetry.track('User hit concurrency limit', { threshold: CLOUD_TEMP_PRODUCTION_LIMIT - capacity, @@ -61,12 +61,12 @@ export class ConcurrencyControlService { } }); - this.productionQueue.on('execution-throttled', ({ executionId }: { executionId: string }) => { + this.productionQueue.on('execution-throttled', ({ executionId }) => { this.log('Execution throttled', { executionId }); this.eventService.emit('execution-throttled', { executionId }); }); - this.productionQueue.on('execution-released', async (executionId: string) => { + this.productionQueue.on('execution-released', async (executionId) => { this.log('Execution released', { executionId }); await this.executionRepository.resetStartedAt(executionId); }); diff --git a/packages/cli/src/concurrency/concurrency-queue.ts b/packages/cli/src/concurrency/concurrency-queue.ts index 90c62d2efc21e..c0212282112b5 100644 --- a/packages/cli/src/concurrency/concurrency-queue.ts +++ b/packages/cli/src/concurrency/concurrency-queue.ts @@ -1,9 +1,12 @@ import { Service } from 'typedi'; -import { EventEmitter } from 'node:events'; -import debounce from 'lodash/debounce'; +import { TypedEmitter } from '@/TypedEmitter'; @Service() -export class ConcurrencyQueue extends EventEmitter { +export class ConcurrencyQueue extends TypedEmitter<{ + 'execution-throttled': { executionId: string }; + 'execution-released': string; + 'concurrency-check': { capacity: number }; +}> { private readonly queue: Array<{ executionId: string; resolve: () => void; @@ -63,9 +66,4 @@ export class ConcurrencyQueue extends EventEmitter { resolve(); } - - private debouncedEmit = debounce( - (event: string, payload: object) => this.emit(event, payload), - 300, - ); } From 5478022eb4201225d98c05780af485e4123bbee0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Mon, 29 Jul 2024 19:37:20 +0200 Subject: [PATCH 2/5] convert more emitters --- packages/cli/src/WebhookHelpers.ts | 9 ++- .../cli/src/WorkflowExecuteAdditionalData.ts | 18 +++-- packages/cli/src/eventbus/event.service.ts | 14 +--- packages/cli/src/push/index.ts | 7 +- .../cli/src/services/cache/cache.service.ts | 16 ++--- .../cli/src/services/cache/cache.types.ts | 2 - .../orchestration/main/MultiMainSetup.ee.ts | 9 ++- .../services/workflow-statistics.service.ts | 65 +++++++++---------- 8 files changed, 62 insertions(+), 78 deletions(-) diff --git a/packages/cli/src/WebhookHelpers.ts b/packages/cli/src/WebhookHelpers.ts index 95e8825e1a840..9342a97cda69c 100644 --- a/packages/cli/src/WebhookHelpers.ts +++ b/packages/cli/src/WebhookHelpers.ts @@ -360,11 +360,10 @@ export async function executeWebhook( NodeExecuteFunctions, executionMode, ); - Container.get(WorkflowStatisticsService).emit( - 'nodeFetchedData', - workflow.id, - workflowStartNode, - ); + Container.get(WorkflowStatisticsService).emit('nodeFetchedData', { + workflowId: workflow.id, + node: workflowStartNode, + }); } catch (err) { // Send error response to webhook caller const errorMessage = 'Workflow Webhook Error: Workflow could not be started!'; diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index e5a17de7ec26d..754cfea693c3d 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -525,17 +525,16 @@ function hookFunctionsSave(): IWorkflowExecuteHooks { ); } } finally { - workflowStatisticsService.emit( - 'workflowExecutionCompleted', - this.workflowData, + workflowStatisticsService.emit('workflowExecutionCompleted', { + workflowData: this.workflowData, fullRunData, - ); + }); } }, ], nodeFetchedData: [ async (workflowId: string, node: INode) => { - workflowStatisticsService.emit('nodeFetchedData', workflowId, node); + workflowStatisticsService.emit('nodeFetchedData', { workflowId, node }); }, ], }; @@ -636,11 +635,10 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { this.retryOf, ); } finally { - workflowStatisticsService.emit( - 'workflowExecutionCompleted', - this.workflowData, + workflowStatisticsService.emit('workflowExecutionCompleted', { + workflowData: this.workflowData, fullRunData, - ); + }); } }, async function (this: WorkflowHooks, runData: IRun): Promise { @@ -676,7 +674,7 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { ], nodeFetchedData: [ async (workflowId: string, node: INode) => { - workflowStatisticsService.emit('nodeFetchedData', workflowId, node); + workflowStatisticsService.emit('nodeFetchedData', { workflowId, node }); }, ], }; diff --git a/packages/cli/src/eventbus/event.service.ts b/packages/cli/src/eventbus/event.service.ts index 2df51af22cad1..2b16ff06ab40c 100644 --- a/packages/cli/src/eventbus/event.service.ts +++ b/packages/cli/src/eventbus/event.service.ts @@ -1,16 +1,6 @@ -import { EventEmitter } from 'node:events'; import { Service } from 'typedi'; +import { TypedEmitter } from '@/TypedEmitter'; import type { Event } from './event.types'; @Service() -export class EventService extends EventEmitter { - emit(eventName: K, arg?: Event[K]) { - super.emit(eventName, arg); - return true; - } - - on(eventName: K, handler: (arg: Event[K]) => void) { - super.on(eventName, handler); - return this; - } -} +export class EventService extends TypedEmitter {} diff --git a/packages/cli/src/push/index.ts b/packages/cli/src/push/index.ts index 647ab40acf0c5..12f7de9ac4931 100644 --- a/packages/cli/src/push/index.ts +++ b/packages/cli/src/push/index.ts @@ -1,4 +1,3 @@ -import { EventEmitter } from 'events'; import { ServerResponse } from 'http'; import type { Server } from 'http'; import type { Socket } from 'net'; @@ -17,6 +16,7 @@ import { OrchestrationService } from '@/services/orchestration.service'; import { SSEPush } from './sse.push'; import { WebSocketPush } from './websocket.push'; import type { PushResponse, SSEPushRequest, WebSocketPushRequest } from './types'; +import { TypedEmitter } from '@/TypedEmitter'; const useWebSockets = config.getEnv('push.backend') === 'websocket'; @@ -28,7 +28,9 @@ const useWebSockets = config.getEnv('push.backend') === 'websocket'; * @emits message when a message is received from a client */ @Service() -export class Push extends EventEmitter { +export class Push extends TypedEmitter<{ + editorUiConnected: string; +}> { private backend = useWebSockets ? Container.get(WebSocketPush) : Container.get(SSEPush); constructor(private readonly orchestrationService: OrchestrationService) { @@ -37,7 +39,6 @@ export class Push extends EventEmitter { handleRequest(req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) { const { - user, ws, query: { pushRef }, } = req; diff --git a/packages/cli/src/services/cache/cache.service.ts b/packages/cli/src/services/cache/cache.service.ts index 8e9a4dc95c009..93507ad4be316 100644 --- a/packages/cli/src/services/cache/cache.service.ts +++ b/packages/cli/src/services/cache/cache.service.ts @@ -1,5 +1,3 @@ -import EventEmitter from 'node:events'; - import Container, { Service } from 'typedi'; import { caching } from 'cache-manager'; import { ApplicationError, jsonStringify } from 'n8n-workflow'; @@ -10,14 +8,20 @@ import { MalformedRefreshValueError } from '@/errors/cache-errors/malformed-refr import type { TaggedRedisCache, TaggedMemoryCache, - CacheEvent, MaybeHash, Hash, } from '@/services/cache/cache.types'; import { TIME } from '@/constants'; +import { TypedEmitter } from '@/TypedEmitter'; + +interface Events { + 'metrics.cache.hit': never; + 'metrics.cache.miss': never; + 'metrics.cache.update': never; +} @Service() -export class CacheService extends EventEmitter { +export class CacheService extends TypedEmitter { private cache: TaggedRedisCache | TaggedMemoryCache; async init() { @@ -66,10 +70,6 @@ export class CacheService extends EventEmitter { await this.cache.store.reset(); } - emit(event: CacheEvent, ...args: unknown[]) { - return super.emit(event, ...args); - } - isRedis() { return this.cache.kind === 'redis'; } diff --git a/packages/cli/src/services/cache/cache.types.ts b/packages/cli/src/services/cache/cache.types.ts index 4e96b8012a799..f598e22494446 100644 --- a/packages/cli/src/services/cache/cache.types.ts +++ b/packages/cli/src/services/cache/cache.types.ts @@ -8,5 +8,3 @@ export type TaggedMemoryCache = MemoryCache & { kind: 'memory' }; export type Hash = Record; export type MaybeHash = Hash | undefined; - -export type CacheEvent = `metrics.cache.${'hit' | 'miss' | 'update'}`; diff --git a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts index 19cd14c4a87b9..30b3e0a4c17ce 100644 --- a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts +++ b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts @@ -1,4 +1,3 @@ -import { EventEmitter } from 'node:events'; import config from '@/config'; import { Service } from 'typedi'; import { TIME } from '@/constants'; @@ -6,9 +5,15 @@ import { ErrorReporterProxy as EventReporter } from 'n8n-workflow'; import { Logger } from '@/Logger'; import { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher'; import { RedisClientService } from '@/services/redis/redis-client.service'; +import { TypedEmitter } from '@/TypedEmitter'; + +interface Events { + 'leader-stepdown': never; + 'leader-takeover': never; +} @Service() -export class MultiMainSetup extends EventEmitter { +export class MultiMainSetup extends TypedEmitter { constructor( private readonly logger: Logger, private readonly redisPublisher: RedisServicePubSubPublisher, diff --git a/packages/cli/src/services/workflow-statistics.service.ts b/packages/cli/src/services/workflow-statistics.service.ts index 516732add87b4..3988e222a91b7 100644 --- a/packages/cli/src/services/workflow-statistics.service.ts +++ b/packages/cli/src/services/workflow-statistics.service.ts @@ -1,29 +1,48 @@ -import { EventEmitter } from 'events'; -import { Container, Service } from 'typedi'; +import { Service } from 'typedi'; import type { INode, IRun, IWorkflowBase } from 'n8n-workflow'; import { StatisticsNames } from '@db/entities/WorkflowStatistics'; import { WorkflowStatisticsRepository } from '@db/repositories/workflowStatistics.repository'; import { UserService } from '@/services/user.service'; import { Logger } from '@/Logger'; import { OwnershipService } from './ownership.service'; +import { TypedEmitter } from '@/TypedEmitter'; + +interface Events { + nodeFetchedData: { workflowId: string; node: INode }; + workflowExecutionCompleted: { workflowData: IWorkflowBase; fullRunData: IRun }; + 'telemetry.onFirstProductionWorkflowSuccess': { + project_id: string; + workflow_id: string; + user_id: string; + }; + 'telemetry.onFirstWorkflowDataLoad': { + user_id: string; + project_id: string; + workflow_id: string; + node_type: string; + node_id: string; + }; +} @Service() -export class WorkflowStatisticsService extends EventEmitter { +export class WorkflowStatisticsService extends TypedEmitter { constructor( private readonly logger: Logger, private readonly repository: WorkflowStatisticsRepository, private readonly ownershipService: OwnershipService, + private readonly userService: UserService, ) { super({ captureRejections: true }); if ('SKIP_STATISTICS_EVENTS' in process.env) return; this.on( 'nodeFetchedData', - async (workflowId, node) => await this.nodeFetchedData(workflowId, node), + async ({ workflowId, node }) => await this.nodeFetchedData(workflowId, node), ); this.on( 'workflowExecutionCompleted', - async (workflowData, runData) => await this.workflowExecutionCompleted(workflowData, runData), + async ({ workflowData, fullRunData }) => + await this.workflowExecutionCompleted(workflowData, fullRunData), ); } @@ -49,18 +68,18 @@ export class WorkflowStatisticsService extends EventEmitter { const upsertResult = await this.repository.upsertWorkflowStatistics(name, workflowId); if (name === StatisticsNames.productionSuccess && upsertResult === 'insert') { - const project = await Container.get(OwnershipService).getWorkflowProjectCached(workflowId); + const project = await this.ownershipService.getWorkflowProjectCached(workflowId); if (project.type === 'personal') { - const owner = await Container.get(OwnershipService).getProjectOwnerCached(project.id); + const owner = await this.ownershipService.getProjectOwnerCached(project.id); const metrics = { project_id: project.id, workflow_id: workflowId, - user_id: owner?.id, + user_id: owner!.id, }; if (owner && !owner.settings?.userActivated) { - await Container.get(UserService).updateSettings(owner.id, { + await this.userService.updateSettings(owner.id, { firstSuccessfulWorkflowId: workflowId, userActivated: true, userActivatedAt: runData.startedAt.getTime(), @@ -90,7 +109,7 @@ export class WorkflowStatisticsService extends EventEmitter { const owner = await this.ownershipService.getProjectOwnerCached(project.id); let metrics = { - user_id: owner?.id, + user_id: owner!.id, project_id: project.id, workflow_id: workflowId, node_type: node.type, @@ -111,29 +130,3 @@ export class WorkflowStatisticsService extends EventEmitter { this.emit('telemetry.onFirstWorkflowDataLoad', metrics); } } - -export declare interface WorkflowStatisticsService { - on( - event: 'nodeFetchedData', - listener: (workflowId: string | undefined | null, node: INode) => void, - ): this; - on( - event: 'workflowExecutionCompleted', - listener: (workflowData: IWorkflowBase, runData: IRun) => void, - ): this; - on( - event: 'telemetry.onFirstProductionWorkflowSuccess', - listener: (metrics: { user_id: string; workflow_id: string }) => void, - ): this; - on( - event: 'telemetry.onFirstWorkflowDataLoad', - listener: (metrics: { - user_id: string; - workflow_id: string; - node_type: string; - node_id: string; - credential_type?: string; - credential_id?: string; - }) => void, - ): this; -} From 76908cd710878c85bb14361dd4504d129f2575a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Mon, 29 Jul 2024 19:43:57 +0200 Subject: [PATCH 3/5] consistent code style --- packages/cli/src/concurrency/concurrency-queue.ts | 8 +++++--- packages/cli/src/push/index.ts | 8 +++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/packages/cli/src/concurrency/concurrency-queue.ts b/packages/cli/src/concurrency/concurrency-queue.ts index c0212282112b5..7ef77569f45ab 100644 --- a/packages/cli/src/concurrency/concurrency-queue.ts +++ b/packages/cli/src/concurrency/concurrency-queue.ts @@ -1,12 +1,14 @@ import { Service } from 'typedi'; import { TypedEmitter } from '@/TypedEmitter'; -@Service() -export class ConcurrencyQueue extends TypedEmitter<{ +interface Events { 'execution-throttled': { executionId: string }; 'execution-released': string; 'concurrency-check': { capacity: number }; -}> { +} + +@Service() +export class ConcurrencyQueue extends TypedEmitter { private readonly queue: Array<{ executionId: string; resolve: () => void; diff --git a/packages/cli/src/push/index.ts b/packages/cli/src/push/index.ts index 12f7de9ac4931..544a4f6a72eae 100644 --- a/packages/cli/src/push/index.ts +++ b/packages/cli/src/push/index.ts @@ -18,6 +18,10 @@ import { WebSocketPush } from './websocket.push'; import type { PushResponse, SSEPushRequest, WebSocketPushRequest } from './types'; import { TypedEmitter } from '@/TypedEmitter'; +interface Events { + editorUiConnected: string; +} + const useWebSockets = config.getEnv('push.backend') === 'websocket'; /** @@ -28,9 +32,7 @@ const useWebSockets = config.getEnv('push.backend') === 'websocket'; * @emits message when a message is received from a client */ @Service() -export class Push extends TypedEmitter<{ - editorUiConnected: string; -}> { +export class Push extends TypedEmitter { private backend = useWebSockets ? Container.get(WebSocketPush) : Container.get(SSEPush); constructor(private readonly orchestrationService: OrchestrationService) { From 3af518def809b1e049aefe8a98f28bc6e8b24468 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Mon, 29 Jul 2024 19:51:02 +0200 Subject: [PATCH 4/5] fix unit tests --- .../cli/test/unit/services/workflow-statistics.service.test.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/cli/test/unit/services/workflow-statistics.service.test.ts b/packages/cli/test/unit/services/workflow-statistics.service.test.ts index da1338f8eb764..9bd9864bb0446 100644 --- a/packages/cli/test/unit/services/workflow-statistics.service.test.ts +++ b/packages/cli/test/unit/services/workflow-statistics.service.test.ts @@ -48,6 +48,7 @@ describe('WorkflowStatisticsService', () => { mock(), new WorkflowStatisticsRepository(dataSource, globalConfig), ownershipService, + userService, ); const onFirstProductionWorkflowSuccess = jest.fn(); From b8f7c4849ad9c9df33392aa1ee07c4c653fff527 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Tue, 30 Jul 2024 12:01:43 +0200 Subject: [PATCH 5/5] PR feedback --- packages/cli/src/TypedEmitter.ts | 45 ++++++++++++------- .../cli/src/concurrency/concurrency-queue.ts | 6 +-- packages/cli/src/push/index.ts | 6 +-- .../cli/src/services/cache/cache.service.ts | 6 +-- .../orchestration/main/MultiMainSetup.ee.ts | 6 +-- .../services/workflow-statistics.service.ts | 6 +-- 6 files changed, 44 insertions(+), 31 deletions(-) diff --git a/packages/cli/src/TypedEmitter.ts b/packages/cli/src/TypedEmitter.ts index 7aaa885835108..176aa9584c907 100644 --- a/packages/cli/src/TypedEmitter.ts +++ b/packages/cli/src/TypedEmitter.ts @@ -1,35 +1,48 @@ import { EventEmitter } from 'node:events'; import debounce from 'lodash/debounce'; -type EventName = string; - -type Payloads = { - [E in Extract]: unknown; +type Payloads = { + [E in keyof ListenerMap]: unknown; }; -type Listener

= (payload: P) => void; +type Listener = (payload: Payload) => void; -export class TypedEmitter> extends EventEmitter { - protected debounceWait = 300; +export class TypedEmitter> extends EventEmitter { + private debounceWait = 300; // milliseconds - override on>(event: U, listener: Listener) { - return super.on(event, listener); + override on( + eventName: EventName, + listener: Listener, + ) { + return super.on(eventName, listener); } - override once>(event: U, listener: Listener) { - return super.once(event, listener); + override once( + eventName: EventName, + listener: Listener, + ) { + return super.once(eventName, listener); } - override off>(event: U, listener: Listener) { - return super.off(event, listener); + override off( + eventName: EventName, + listener: Listener, + ) { + return super.off(eventName, listener); } - override emit>(event: U, payload?: L[U]): boolean { - return super.emit(event, payload); + override emit( + eventName: EventName, + payload?: ListenerMap[EventName], + ): boolean { + return super.emit(eventName, payload); } protected debouncedEmit = debounce( - >(event: U, payload?: L[U]) => super.emit(event, payload), + ( + eventName: EventName, + payload?: ListenerMap[EventName], + ) => super.emit(eventName, payload), this.debounceWait, ); } diff --git a/packages/cli/src/concurrency/concurrency-queue.ts b/packages/cli/src/concurrency/concurrency-queue.ts index 7ef77569f45ab..1b578b5a8ef1d 100644 --- a/packages/cli/src/concurrency/concurrency-queue.ts +++ b/packages/cli/src/concurrency/concurrency-queue.ts @@ -1,14 +1,14 @@ import { Service } from 'typedi'; import { TypedEmitter } from '@/TypedEmitter'; -interface Events { +type ConcurrencyEvents = { 'execution-throttled': { executionId: string }; 'execution-released': string; 'concurrency-check': { capacity: number }; -} +}; @Service() -export class ConcurrencyQueue extends TypedEmitter { +export class ConcurrencyQueue extends TypedEmitter { private readonly queue: Array<{ executionId: string; resolve: () => void; diff --git a/packages/cli/src/push/index.ts b/packages/cli/src/push/index.ts index 544a4f6a72eae..a946348430133 100644 --- a/packages/cli/src/push/index.ts +++ b/packages/cli/src/push/index.ts @@ -18,9 +18,9 @@ import { WebSocketPush } from './websocket.push'; import type { PushResponse, SSEPushRequest, WebSocketPushRequest } from './types'; import { TypedEmitter } from '@/TypedEmitter'; -interface Events { +type PushEvents = { editorUiConnected: string; -} +}; const useWebSockets = config.getEnv('push.backend') === 'websocket'; @@ -32,7 +32,7 @@ const useWebSockets = config.getEnv('push.backend') === 'websocket'; * @emits message when a message is received from a client */ @Service() -export class Push extends TypedEmitter { +export class Push extends TypedEmitter { private backend = useWebSockets ? Container.get(WebSocketPush) : Container.get(SSEPush); constructor(private readonly orchestrationService: OrchestrationService) { diff --git a/packages/cli/src/services/cache/cache.service.ts b/packages/cli/src/services/cache/cache.service.ts index 93507ad4be316..75dad03b49d26 100644 --- a/packages/cli/src/services/cache/cache.service.ts +++ b/packages/cli/src/services/cache/cache.service.ts @@ -14,14 +14,14 @@ import type { import { TIME } from '@/constants'; import { TypedEmitter } from '@/TypedEmitter'; -interface Events { +type CacheEvents = { 'metrics.cache.hit': never; 'metrics.cache.miss': never; 'metrics.cache.update': never; -} +}; @Service() -export class CacheService extends TypedEmitter { +export class CacheService extends TypedEmitter { private cache: TaggedRedisCache | TaggedMemoryCache; async init() { diff --git a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts index 30b3e0a4c17ce..37705f12be07f 100644 --- a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts +++ b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts @@ -7,13 +7,13 @@ import { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSub import { RedisClientService } from '@/services/redis/redis-client.service'; import { TypedEmitter } from '@/TypedEmitter'; -interface Events { +type MultiMainEvents = { 'leader-stepdown': never; 'leader-takeover': never; -} +}; @Service() -export class MultiMainSetup extends TypedEmitter { +export class MultiMainSetup extends TypedEmitter { constructor( private readonly logger: Logger, private readonly redisPublisher: RedisServicePubSubPublisher, diff --git a/packages/cli/src/services/workflow-statistics.service.ts b/packages/cli/src/services/workflow-statistics.service.ts index 3988e222a91b7..9311ef885dfbd 100644 --- a/packages/cli/src/services/workflow-statistics.service.ts +++ b/packages/cli/src/services/workflow-statistics.service.ts @@ -7,7 +7,7 @@ import { Logger } from '@/Logger'; import { OwnershipService } from './ownership.service'; import { TypedEmitter } from '@/TypedEmitter'; -interface Events { +type WorkflowStatisticsEvents = { nodeFetchedData: { workflowId: string; node: INode }; workflowExecutionCompleted: { workflowData: IWorkflowBase; fullRunData: IRun }; 'telemetry.onFirstProductionWorkflowSuccess': { @@ -22,10 +22,10 @@ interface Events { node_type: string; node_id: string; }; -} +}; @Service() -export class WorkflowStatisticsService extends TypedEmitter { +export class WorkflowStatisticsService extends TypedEmitter { constructor( private readonly logger: Logger, private readonly repository: WorkflowStatisticsRepository,