diff --git a/packages/cli/src/commands/base-command.ts b/packages/cli/src/commands/base-command.ts index 857ca231d4638..109d6a91f52ae 100644 --- a/packages/cli/src/commands/base-command.ts +++ b/packages/cli/src/commands/base-command.ts @@ -13,7 +13,7 @@ import { generateHostInstanceId } from '@/databases/utils/generators'; import * as Db from '@/db'; import { initErrorHandling } from '@/error-reporting'; import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; -import { TelemetryEventRelay } from '@/events/telemetry-event-relay'; +import { TelemetryEventRelay } from '@/events/relays/telemetry.event-relay'; import { initExpressionEvaluator } from '@/expression-evaluator'; import { ExternalHooks } from '@/external-hooks'; import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee'; diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index f5f6b2b79bc52..8c1aabf74afcf 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -6,7 +6,7 @@ import config from '@/config'; import { N8N_VERSION, inTest } from '@/constants'; import { EventMessageGeneric } from '@/eventbus/event-message-classes/event-message-generic'; import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; -import { LogStreamingEventRelay } from '@/events/log-streaming-event-relay'; +import { LogStreamingEventRelay } from '@/events/relays/log-streaming.event-relay'; import { JobProcessor } from '@/scaling/job-processor'; import { Publisher } from '@/scaling/pubsub/publisher.service'; import type { ScalingService } from '@/scaling/scaling.service'; diff --git a/packages/cli/src/decorators/redactable.ts b/packages/cli/src/decorators/redactable.ts index 51d02c5c3d6fc..e2df19daa6f35 100644 --- a/packages/cli/src/decorators/redactable.ts +++ b/packages/cli/src/decorators/redactable.ts @@ -1,5 +1,5 @@ import { RedactableError } from '@/errors/redactable.error'; -import type { UserLike } from '@/events/relay-event-map'; +import type { UserLike } from '@/events/maps/relay.event-map'; function toRedactable(userLike: UserLike) { return { diff --git a/packages/cli/src/events/__tests__/log-streaming-event-relay.test.ts b/packages/cli/src/events/__tests__/log-streaming-event-relay.test.ts index d768218950d4c..4727c8ef722fa 100644 --- a/packages/cli/src/events/__tests__/log-streaming-event-relay.test.ts +++ b/packages/cli/src/events/__tests__/log-streaming-event-relay.test.ts @@ -3,8 +3,8 @@ import type { INode, IRun, IWorkflowBase } from 'n8n-workflow'; import type { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; import { EventService } from '@/events/event.service'; -import { LogStreamingEventRelay } from '@/events/log-streaming-event-relay'; -import type { RelayEventMap } from '@/events/relay-event-map'; +import type { RelayEventMap } from '@/events/maps/relay.event-map'; +import { LogStreamingEventRelay } from '@/events/relays/log-streaming.event-relay'; import type { IWorkflowDb } from '@/interfaces'; describe('LogStreamingEventRelay', () => { diff --git a/packages/cli/src/events/__tests__/telemetry-event-relay.test.ts b/packages/cli/src/events/__tests__/telemetry-event-relay.test.ts index 9a05835205515..124afd901bfab 100644 --- a/packages/cli/src/events/__tests__/telemetry-event-relay.test.ts +++ b/packages/cli/src/events/__tests__/telemetry-event-relay.test.ts @@ -9,8 +9,8 @@ import type { ProjectRelationRepository } from '@/databases/repositories/project import type { SharedWorkflowRepository } from '@/databases/repositories/shared-workflow.repository'; import type { WorkflowRepository } from '@/databases/repositories/workflow.repository'; import { EventService } from '@/events/event.service'; -import type { RelayEventMap } from '@/events/relay-event-map'; -import { TelemetryEventRelay } from '@/events/telemetry-event-relay'; +import type { RelayEventMap } from '@/events/maps/relay.event-map'; +import { TelemetryEventRelay } from '@/events/relays/telemetry.event-relay'; import type { IWorkflowDb } from '@/interfaces'; import type { License } from '@/license'; import type { NodeTypes } from '@/node-types'; diff --git a/packages/cli/src/events/event.service.ts b/packages/cli/src/events/event.service.ts index 10ba7666efed3..b8e00ecea768e 100644 --- a/packages/cli/src/events/event.service.ts +++ b/packages/cli/src/events/event.service.ts @@ -2,11 +2,12 @@ import { Service } from 'typedi'; import { TypedEmitter } from '@/typed-emitter'; -import type { AiEventMap } from './ai-event-map'; -import type { QueueMetricsEventMap } from './queue-metrics-event-map'; -import type { RelayEventMap } from './relay-event-map'; +import type { AiEventMap } from './maps/ai.event-map'; +import type { PubSubEventMap } from './maps/pub-sub.event-map'; +import type { QueueMetricsEventMap } from './maps/queue-metrics.event-map'; +import type { RelayEventMap } from './maps/relay.event-map'; -type EventMap = RelayEventMap & QueueMetricsEventMap & AiEventMap; +type EventMap = RelayEventMap & QueueMetricsEventMap & AiEventMap & PubSubEventMap; @Service() export class EventService extends TypedEmitter {} diff --git a/packages/cli/src/events/ai-event-map.ts b/packages/cli/src/events/maps/ai.event-map.ts similarity index 100% rename from packages/cli/src/events/ai-event-map.ts rename to packages/cli/src/events/maps/ai.event-map.ts diff --git a/packages/cli/src/events/maps/pub-sub.event-map.ts b/packages/cli/src/events/maps/pub-sub.event-map.ts new file mode 100644 index 0000000000000..9237e79d13a9f --- /dev/null +++ b/packages/cli/src/events/maps/pub-sub.event-map.ts @@ -0,0 +1,104 @@ +import type { WorkerStatus, PushType } from '@n8n/api-types'; + +import type { IWorkflowDb } from '@/interfaces'; + +export type PubSubEventMap = PubSubCommandMap & PubSubWorkerResponseMap; + +export type PubSubCommandMap = { + // #region Lifecycle + + 'reload-license': never; + + 'restart-event-bus': never; + + 'reload-external-secrets-providers': never; + + // #endregion + + // #region Community packages + + 'community-package-install': { + packageName: string; + packageVersion: string; + }; + + 'community-package-update': { + packageName: string; + packageVersion: string; + }; + + 'community-package-uninstall': { + packageName: string; + }; + + // #endregion + + // #region Worker view + + 'get-worker-id': never; + + 'get-worker-status': never; + + // #endregion + + // #region Multi-main setup + + 'add-webhooks-triggers-and-pollers': { + workflowId: string; + }; + + 'remove-triggers-and-pollers': { + workflowId: string; + }; + + 'display-workflow-activation': { + workflowId: string; + }; + + 'display-workflow-deactivation': { + workflowId: string; + }; + + 'display-workflow-activation-error': { + workflowId: string; + errorMessage: string; + }; + + 'relay-execution-lifecycle-event': { + type: PushType; + args: Record; + pushRef: string; + }; + + 'clear-test-webhooks': { + webhookKey: string; + workflowEntity: IWorkflowDb; + pushRef: string; + }; + + // #endregion +}; + +export type PubSubWorkerResponseMap = { + // #region Lifecycle + + 'restart-event-bus': { + result: 'success' | 'error'; + error?: string; + }; + + 'reload-external-secrets-providers': { + result: 'success' | 'error'; + error?: string; + }; + + // #endregion + + // #region Worker view + + 'get-worker-id': never; + + 'get-worker-status': WorkerStatus; + + // #endregion +}; diff --git a/packages/cli/src/events/queue-metrics-event-map.ts b/packages/cli/src/events/maps/queue-metrics.event-map.ts similarity index 100% rename from packages/cli/src/events/queue-metrics-event-map.ts rename to packages/cli/src/events/maps/queue-metrics.event-map.ts diff --git a/packages/cli/src/events/relay-event-map.ts b/packages/cli/src/events/maps/relay.event-map.ts similarity index 99% rename from packages/cli/src/events/relay-event-map.ts rename to packages/cli/src/events/maps/relay.event-map.ts index a53a36842ebf2..a495820283067 100644 --- a/packages/cli/src/events/relay-event-map.ts +++ b/packages/cli/src/events/maps/relay.event-map.ts @@ -11,7 +11,7 @@ import type { ProjectRole } from '@/databases/entities/project-relation'; import type { GlobalRole } from '@/databases/entities/user'; import type { IWorkflowDb } from '@/interfaces'; -import type { AiEventMap } from './ai-event-map'; +import type { AiEventMap } from './ai.event-map'; export type UserLike = { id: string; diff --git a/packages/cli/src/events/event-relay.ts b/packages/cli/src/events/relays/event-relay.ts similarity index 81% rename from packages/cli/src/events/event-relay.ts rename to packages/cli/src/events/relays/event-relay.ts index 3202b69c1546c..13e7dc01be78f 100644 --- a/packages/cli/src/events/event-relay.ts +++ b/packages/cli/src/events/relays/event-relay.ts @@ -1,8 +1,7 @@ import { Service } from 'typedi'; -import type { RelayEventMap } from '@/events/relay-event-map'; - -import { EventService } from './event.service'; +import { EventService } from '@/events/event.service'; +import type { RelayEventMap } from '@/events/maps/relay.event-map'; @Service() export class EventRelay { diff --git a/packages/cli/src/events/log-streaming-event-relay.ts b/packages/cli/src/events/relays/log-streaming.event-relay.ts similarity index 98% rename from packages/cli/src/events/log-streaming-event-relay.ts rename to packages/cli/src/events/relays/log-streaming.event-relay.ts index 788e5e50c42b7..c65af2874c37c 100644 --- a/packages/cli/src/events/log-streaming-event-relay.ts +++ b/packages/cli/src/events/relays/log-streaming.event-relay.ts @@ -3,10 +3,9 @@ import { Service } from 'typedi'; import { Redactable } from '@/decorators/redactable'; import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; -import { EventRelay } from '@/events/event-relay'; -import type { RelayEventMap } from '@/events/relay-event-map'; - -import { EventService } from './event.service'; +import { EventService } from '@/events/event.service'; +import type { RelayEventMap } from '@/events/maps/relay.event-map'; +import { EventRelay } from '@/events/relays/event-relay'; @Service() export class LogStreamingEventRelay extends EventRelay { diff --git a/packages/cli/src/events/telemetry-event-relay.ts b/packages/cli/src/events/relays/telemetry.event-relay.ts similarity index 99% rename from packages/cli/src/events/telemetry-event-relay.ts rename to packages/cli/src/events/relays/telemetry.event-relay.ts index 82beb17198868..c813926bf1635 100644 --- a/packages/cli/src/events/telemetry-event-relay.ts +++ b/packages/cli/src/events/relays/telemetry.event-relay.ts @@ -12,14 +12,14 @@ import { ProjectRelationRepository } from '@/databases/repositories/project-rela import { SharedWorkflowRepository } from '@/databases/repositories/shared-workflow.repository'; import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; import { EventService } from '@/events/event.service'; -import type { RelayEventMap } from '@/events/relay-event-map'; +import type { RelayEventMap } from '@/events/maps/relay.event-map'; import { determineFinalExecutionStatus } from '@/execution-lifecycle-hooks/shared/shared-hook-functions'; import type { IExecutionTrackProperties } from '@/interfaces'; import { License } from '@/license'; import { NodeTypes } from '@/node-types'; import { EventRelay } from './event-relay'; -import { Telemetry } from '../telemetry'; +import { Telemetry } from '../../telemetry'; @Service() export class TelemetryEventRelay extends EventRelay { diff --git a/packages/cli/src/scaling/pubsub/pubsub.types.ts b/packages/cli/src/scaling/pubsub/pubsub.types.ts index 13643440fb80d..ac83659212663 100644 --- a/packages/cli/src/scaling/pubsub/pubsub.types.ts +++ b/packages/cli/src/scaling/pubsub/pubsub.types.ts @@ -1,6 +1,4 @@ -import type { PushType, WorkerStatus } from '@n8n/api-types'; - -import type { IWorkflowDb } from '@/interfaces'; +import type { PubSubCommandMap, PubSubWorkerResponseMap } from '@/events/maps/pub-sub.event-map'; import type { Resolve } from '@/utlity.types'; import type { COMMAND_PUBSUB_CHANNEL, WORKER_RESPONSE_PUBSUB_CHANNEL } from '../constants'; @@ -20,92 +18,17 @@ export namespace PubSub { // commands // ---------------------------------- - export type CommandMap = { - // #region Lifecycle - - 'reload-license': never; - - 'restart-event-bus': never; - - 'reload-external-secrets-providers': never; - - // #endregion - - // #region Community packages - - 'community-package-install': { - packageName: string; - packageVersion: string; - }; - - 'community-package-update': { - packageName: string; - packageVersion: string; - }; - - 'community-package-uninstall': { - packageName: string; - }; - - // #endregion - - // #region Worker view - - 'get-worker-id': never; - - 'get-worker-status': never; - - // #endregion - - // #region Multi-main setup - - 'add-webhooks-triggers-and-pollers': { - workflowId: string; - }; - - 'remove-triggers-and-pollers': { - workflowId: string; - }; - - 'display-workflow-activation': { - workflowId: string; - }; - - 'display-workflow-deactivation': { - workflowId: string; - }; - - 'display-workflow-activation-error': { - workflowId: string; - errorMessage: string; - }; - - 'relay-execution-lifecycle-event': { - type: PushType; - args: Record; - pushRef: string; - }; - - 'clear-test-webhooks': { - webhookKey: string; - workflowEntity: IWorkflowDb; - pushRef: string; - }; - - // #endregion - }; - - type _ToCommand = { + type _ToCommand = { senderId: string; targets?: string[]; command: CommandKey; - } & (CommandMap[CommandKey] extends never + } & (PubSubCommandMap[CommandKey] extends never ? { payload?: never } // some commands carry no payload - : { payload: CommandMap[CommandKey] }); + : { payload: PubSubCommandMap[CommandKey] }); - type ToCommand = Resolve<_ToCommand>; + type ToCommand = Resolve<_ToCommand>; - namespace Command { + namespace Commands { export type ReloadLicense = ToCommand<'reload-license'>; export type RestartEventBus = ToCommand<'restart-event-bus'>; export type ReloadExternalSecretsProviders = ToCommand<'reload-external-secrets-providers'>; @@ -125,63 +48,39 @@ export namespace PubSub { /** Command sent via the `n8n.commands` pubsub channel. */ export type Command = - | Command.ReloadLicense - | Command.RestartEventBus - | Command.ReloadExternalSecretsProviders - | Command.CommunityPackageInstall - | Command.CommunityPackageUpdate - | Command.CommunityPackageUninstall - | Command.GetWorkerId - | Command.GetWorkerStatus - | Command.AddWebhooksTriggersAndPollers - | Command.RemoveTriggersAndPollers - | Command.DisplayWorkflowActivation - | Command.DisplayWorkflowDeactivation - | Command.DisplayWorkflowActivationError - | Command.RelayExecutionLifecycleEvent - | Command.ClearTestWebhooks; + | Commands.ReloadLicense + | Commands.RestartEventBus + | Commands.ReloadExternalSecretsProviders + | Commands.CommunityPackageInstall + | Commands.CommunityPackageUpdate + | Commands.CommunityPackageUninstall + | Commands.GetWorkerId + | Commands.GetWorkerStatus + | Commands.AddWebhooksTriggersAndPollers + | Commands.RemoveTriggersAndPollers + | Commands.DisplayWorkflowActivation + | Commands.DisplayWorkflowDeactivation + | Commands.DisplayWorkflowActivationError + | Commands.RelayExecutionLifecycleEvent + | Commands.ClearTestWebhooks; // ---------------------------------- // worker responses // ---------------------------------- - export type WorkerResponseMap = { - // #region Lifecycle - - 'restart-event-bus': { - result: 'success' | 'error'; - error?: string; - }; - - 'reload-external-secrets-providers': { - result: 'success' | 'error'; - error?: string; - }; - - // #endregion - - // #region Worker view - - 'get-worker-id': never; - - 'get-worker-status': WorkerStatus; - - // #endregion - }; - - type _ToWorkerResponse = { + type _ToWorkerResponse = { workerId: string; targets?: string[]; command: WorkerResponseKey; - } & (WorkerResponseMap[WorkerResponseKey] extends never + } & (PubSubWorkerResponseMap[WorkerResponseKey] extends never ? { payload?: never } // some responses carry no payload - : { payload: WorkerResponseMap[WorkerResponseKey] }); + : { payload: PubSubWorkerResponseMap[WorkerResponseKey] }); - type ToWorkerResponse = Resolve< + type ToWorkerResponse = Resolve< _ToWorkerResponse >; - namespace WorkerResponse { + namespace WorkerResponses { export type RestartEventBus = ToWorkerResponse<'restart-event-bus'>; export type ReloadExternalSecretsProviders = ToWorkerResponse<'reload-external-secrets-providers'>; @@ -191,8 +90,8 @@ export namespace PubSub { /** Response sent via the `n8n.worker-response` pubsub channel. */ export type WorkerResponse = - | WorkerResponse.RestartEventBus - | WorkerResponse.ReloadExternalSecretsProviders - | WorkerResponse.GetWorkerId - | WorkerResponse.GetWorkerStatus; + | WorkerResponses.RestartEventBus + | WorkerResponses.ReloadExternalSecretsProviders + | WorkerResponses.GetWorkerId + | WorkerResponses.GetWorkerStatus; } diff --git a/packages/cli/src/server.ts b/packages/cli/src/server.ts index 0840714b6a54b..eeb6cdae46c63 100644 --- a/packages/cli/src/server.ts +++ b/packages/cli/src/server.ts @@ -21,7 +21,7 @@ import { CredentialsOverwrites } from '@/credentials-overwrites'; import { ControllerRegistry } from '@/decorators'; import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; import { EventService } from '@/events/event.service'; -import { LogStreamingEventRelay } from '@/events/log-streaming-event-relay'; +import { LogStreamingEventRelay } from '@/events/relays/log-streaming.event-relay'; import type { ICredentialsOverwrite } from '@/interfaces'; import { isLdapEnabled } from '@/ldap/helpers.ee'; import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials'; diff --git a/packages/cli/src/services/orchestration.service.ts b/packages/cli/src/services/orchestration.service.ts index 1ee7c2687603b..bc42d871c3fa6 100644 --- a/packages/cli/src/services/orchestration.service.ts +++ b/packages/cli/src/services/orchestration.service.ts @@ -3,9 +3,9 @@ import type { WorkflowActivateMode } from 'n8n-workflow'; import Container, { Service } from 'typedi'; import config from '@/config'; +import type { PubSubCommandMap } from '@/events/maps/pub-sub.event-map'; import { Logger } from '@/logger'; import type { Publisher } from '@/scaling/pubsub/publisher.service'; -import type { PubSub } from '@/scaling/pubsub/pubsub.types'; import type { Subscriber } from '@/scaling/pubsub/subscriber.service'; import { MultiMainSetup } from './orchestration/main/multi-main-setup.ee'; @@ -97,9 +97,9 @@ export class OrchestrationService { // pubsub // ---------------------------------- - async publish( + async publish( commandKey: CommandKey, - payload?: PubSub.CommandMap[CommandKey], + payload?: PubSubCommandMap[CommandKey], ) { if (!this.sanityCheck()) return; diff --git a/packages/cli/src/workflow-execute-additional-data.ts b/packages/cli/src/workflow-execute-additional-data.ts index bc39620c2006b..96ac4e1b1d336 100644 --- a/packages/cli/src/workflow-execute-additional-data.ts +++ b/packages/cli/src/workflow-execute-additional-data.ts @@ -40,6 +40,7 @@ import { ActiveExecutions } from '@/active-executions'; import config from '@/config'; import { CredentialsHelper } from '@/credentials-helper'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; +import type { AiEventMap, AiEventPayload } from '@/events/maps/ai.event-map'; import { ExternalHooks } from '@/external-hooks'; import type { IWorkflowExecuteProcess, @@ -53,7 +54,6 @@ import { findSubworkflowStart, isWorkflowIdValid } from '@/utils'; import * as WorkflowHelpers from '@/workflow-helpers'; import { WorkflowRepository } from './databases/repositories/workflow.repository'; -import type { AiEventMap, AiEventPayload } from './events/ai-event-map'; import { EventService } from './events/event.service'; import { restoreBinaryDataId } from './execution-lifecycle-hooks/restore-binary-data-id'; import { saveExecutionProgress } from './execution-lifecycle-hooks/save-execution-progress'; diff --git a/packages/cli/test/integration/commands/worker.cmd.test.ts b/packages/cli/test/integration/commands/worker.cmd.test.ts index 726c78537ec9e..67d3e7bab182c 100644 --- a/packages/cli/test/integration/commands/worker.cmd.test.ts +++ b/packages/cli/test/integration/commands/worker.cmd.test.ts @@ -5,7 +5,7 @@ import { BinaryDataService } from 'n8n-core'; import { Worker } from '@/commands/worker'; import config from '@/config'; import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; -import { LogStreamingEventRelay } from '@/events/log-streaming-event-relay'; +import { LogStreamingEventRelay } from '@/events/relays/log-streaming.event-relay'; import { ExternalHooks } from '@/external-hooks'; import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee'; import { License } from '@/license'; diff --git a/packages/cli/test/integration/shared/utils/test-command.ts b/packages/cli/test/integration/shared/utils/test-command.ts index 82effd1818953..d0737ddcc1e9d 100644 --- a/packages/cli/test/integration/shared/utils/test-command.ts +++ b/packages/cli/test/integration/shared/utils/test-command.ts @@ -4,7 +4,7 @@ import type { Class } from 'n8n-core'; import type { BaseCommand } from '@/commands/base-command'; import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; -import { TelemetryEventRelay } from '@/events/telemetry-event-relay'; +import { TelemetryEventRelay } from '@/events/relays/telemetry.event-relay'; import { mockInstance } from '@test/mocking'; import * as testDb from '../test-db';