From 2a73d7892c1a52a09bf23cff374d5a0bebfcb6a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 13 Aug 2024 13:12:55 +0200 Subject: [PATCH] feat(core): Support bidirectional communication between specific mains and specific workers --- packages/cli/src/commands/start.ts | 5 +- .../orchestration.handler.base.service.ts | 9 +++- .../main/handleWorkerResponseMessageMain.ts | 51 ++++++++++++------- .../orchestration.handler.main.service.ts | 5 +- .../src/services/orchestration/main/types.ts | 6 +++ .../services/redis/RedisServiceCommands.ts | 2 +- 6 files changed, 54 insertions(+), 24 deletions(-) create mode 100644 packages/cli/src/services/orchestration/main/types.ts diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 0ebfbbc1a6dae..3ca068ea908ba 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -240,7 +240,10 @@ export class Start extends BaseCommand { await orchestrationService.init(); - await Container.get(OrchestrationHandlerMainService).init(); + await Container.get(OrchestrationHandlerMainService).initWithOptions({ + queueModeId: this.queueModeId, + redisPublisher: Container.get(OrchestrationService).redisPublisher, + }); if (!orchestrationService.isMultiMainSetupEnabled) return; diff --git a/packages/cli/src/services/orchestration.handler.base.service.ts b/packages/cli/src/services/orchestration.handler.base.service.ts index 6a706434e43bb..b2507151ce410 100644 --- a/packages/cli/src/services/orchestration.handler.base.service.ts +++ b/packages/cli/src/services/orchestration.handler.base.service.ts @@ -2,6 +2,7 @@ import Container from 'typedi'; import { RedisService } from './redis.service'; import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber'; import type { WorkerCommandReceivedHandlerOptions } from './orchestration/worker/types'; +import type { MainResponseReceivedHandlerOptions } from './orchestration/main/types'; export abstract class OrchestrationHandlerService { protected initialized = false; @@ -19,7 +20,9 @@ export abstract class OrchestrationHandlerService { this.initialized = true; } - async initWithOptions(options: WorkerCommandReceivedHandlerOptions) { + async initWithOptions( + options: WorkerCommandReceivedHandlerOptions | MainResponseReceivedHandlerOptions, + ) { await this.initSubscriber(options); this.initialized = true; } @@ -29,5 +32,7 @@ export abstract class OrchestrationHandlerService { this.initialized = false; } - protected abstract initSubscriber(options?: WorkerCommandReceivedHandlerOptions): Promise; + protected abstract initSubscriber( + options?: WorkerCommandReceivedHandlerOptions | MainResponseReceivedHandlerOptions, + ): Promise; } diff --git a/packages/cli/src/services/orchestration/main/handleWorkerResponseMessageMain.ts b/packages/cli/src/services/orchestration/main/handleWorkerResponseMessageMain.ts index 47a1a080192f6..4e60d53741333 100644 --- a/packages/cli/src/services/orchestration/main/handleWorkerResponseMessageMain.ts +++ b/packages/cli/src/services/orchestration/main/handleWorkerResponseMessageMain.ts @@ -3,25 +3,40 @@ import Container from 'typedi'; import { Logger } from '@/Logger'; import { Push } from '../../../push'; import type { RedisServiceWorkerResponseObject } from '../../redis/RedisServiceCommands'; +import { WORKER_RESPONSE_REDIS_CHANNEL } from '@/services/redis/RedisConstants'; +import type { MainResponseReceivedHandlerOptions } from './types'; -export async function handleWorkerResponseMessageMain(messageString: string) { - const workerResponse = jsonParse(messageString); - if (workerResponse) { - switch (workerResponse.command) { - case 'getStatus': - const push = Container.get(Push); - push.broadcast('sendWorkerStatusMessage', { - workerId: workerResponse.workerId, - status: workerResponse.payload, - }); - break; - case 'getId': - break; - default: - Container.get(Logger).debug( - `Received worker response ${workerResponse.command} from ${workerResponse.workerId}`, - ); - } +export async function handleWorkerResponseMessageMain( + messageString: string, + options: MainResponseReceivedHandlerOptions, +) { + const workerResponse = jsonParse(messageString, { + fallbackValue: null, + }); + + if (!workerResponse) { + Container.get(Logger).debug( + `Received invalid message via channel ${WORKER_RESPONSE_REDIS_CHANNEL}: "${messageString}"`, + ); + return; } + + if (workerResponse.targets && !workerResponse.targets.includes(options.queueModeId)) return; + + switch (workerResponse.command) { + case 'getStatus': + Container.get(Push).broadcast('sendWorkerStatusMessage', { + workerId: workerResponse.workerId, + status: workerResponse.payload, + }); + break; + case 'getId': + break; + default: + Container.get(Logger).debug( + `Received worker response ${workerResponse.command} from ${workerResponse.workerId}`, + ); + } + return workerResponse; } diff --git a/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts b/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts index 6cc86c9f51eec..983c39fd3333d 100644 --- a/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts +++ b/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts @@ -3,10 +3,11 @@ import { COMMAND_REDIS_CHANNEL, WORKER_RESPONSE_REDIS_CHANNEL } from '../../redi import { handleWorkerResponseMessageMain } from './handleWorkerResponseMessageMain'; import { handleCommandMessageMain } from './handleCommandMessageMain'; import { OrchestrationHandlerService } from '../../orchestration.handler.base.service'; +import type { MainResponseReceivedHandlerOptions } from './types'; @Service() export class OrchestrationHandlerMainService extends OrchestrationHandlerService { - async initSubscriber() { + async initSubscriber(options: MainResponseReceivedHandlerOptions) { this.redisSubscriber = await this.redisService.getPubSubSubscriber(); await this.redisSubscriber.subscribeToCommandChannel(); @@ -16,7 +17,7 @@ export class OrchestrationHandlerMainService extends OrchestrationHandlerService 'OrchestrationMessageReceiver', async (channel: string, messageString: string) => { if (channel === WORKER_RESPONSE_REDIS_CHANNEL) { - await handleWorkerResponseMessageMain(messageString); + await handleWorkerResponseMessageMain(messageString, options); } else if (channel === COMMAND_REDIS_CHANNEL) { await handleCommandMessageMain(messageString); } diff --git a/packages/cli/src/services/orchestration/main/types.ts b/packages/cli/src/services/orchestration/main/types.ts new file mode 100644 index 0000000000000..d189d7cdf7b13 --- /dev/null +++ b/packages/cli/src/services/orchestration/main/types.ts @@ -0,0 +1,6 @@ +import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher'; + +export type MainResponseReceivedHandlerOptions = { + queueModeId: string; + redisPublisher: RedisServicePubSubPublisher; +}; diff --git a/packages/cli/src/services/redis/RedisServiceCommands.ts b/packages/cli/src/services/redis/RedisServiceCommands.ts index a8ae41c11390b..b7786adad31ef 100644 --- a/packages/cli/src/services/redis/RedisServiceCommands.ts +++ b/packages/cli/src/services/redis/RedisServiceCommands.ts @@ -94,7 +94,7 @@ export type RedisServiceWorkerResponseObject = { workflowId: string; }; } -); +) & { targets?: string[] }; export type RedisServiceCommandObject = { targets?: string[];