Skip to content

Commit

Permalink
feat(core): Support bidirectional communication between specific main…
Browse files Browse the repository at this point in the history
…s and specific workers (#10377)
  • Loading branch information
ivov authored and riascho committed Aug 26, 2024
1 parent d2bbfa0 commit 17f741d
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 25 deletions.
5 changes: 4 additions & 1 deletion packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,10 @@ export class Start extends BaseCommand {

await orchestrationService.init();

await Container.get(OrchestrationHandlerMainService).init();
await Container.get(OrchestrationHandlerMainService).initWithOptions({
queueModeId: this.queueModeId,
redisPublisher: Container.get(OrchestrationService).redisPublisher,
});

if (!orchestrationService.isMultiMainSetupEnabled) return;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { Push } from '@/push';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import { mockInstance } from '@test/mocking';
import { RedisClientService } from '@/services/redis/redis-client.service';
import type { MainResponseReceivedHandlerOptions } from '../orchestration/main/types';

const instanceSettings = Container.get(InstanceSettings);
const redisClientService = mockInstance(RedisClientService);
Expand Down Expand Up @@ -96,8 +97,9 @@ describe('Orchestration Service', () => {
test('should handle worker responses', async () => {
const response = await handleWorkerResponseMessageMain(
JSON.stringify(workerRestartEventBusResponse),
mock<MainResponseReceivedHandlerOptions>(),
);
expect(response.command).toEqual('restartEventBus');
expect(response?.command).toEqual('restartEventBus');
});

test('should handle command messages from others', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import Container from 'typedi';
import { RedisService } from './redis.service';
import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber';
import type { WorkerCommandReceivedHandlerOptions } from './orchestration/worker/types';
import type { MainResponseReceivedHandlerOptions } from './orchestration/main/types';

export abstract class OrchestrationHandlerService {
protected initialized = false;
Expand All @@ -19,7 +20,9 @@ export abstract class OrchestrationHandlerService {
this.initialized = true;
}

async initWithOptions(options: WorkerCommandReceivedHandlerOptions) {
async initWithOptions(
options: WorkerCommandReceivedHandlerOptions | MainResponseReceivedHandlerOptions,
) {
await this.initSubscriber(options);
this.initialized = true;
}
Expand All @@ -29,5 +32,7 @@ export abstract class OrchestrationHandlerService {
this.initialized = false;
}

protected abstract initSubscriber(options?: WorkerCommandReceivedHandlerOptions): Promise<void>;
protected abstract initSubscriber(
options?: WorkerCommandReceivedHandlerOptions | MainResponseReceivedHandlerOptions,
): Promise<void>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,40 @@ import Container from 'typedi';
import { Logger } from '@/Logger';
import { Push } from '../../../push';
import type { RedisServiceWorkerResponseObject } from '../../redis/RedisServiceCommands';
import { WORKER_RESPONSE_REDIS_CHANNEL } from '@/services/redis/RedisConstants';
import type { MainResponseReceivedHandlerOptions } from './types';

export async function handleWorkerResponseMessageMain(messageString: string) {
const workerResponse = jsonParse<RedisServiceWorkerResponseObject>(messageString);
if (workerResponse) {
switch (workerResponse.command) {
case 'getStatus':
const push = Container.get(Push);
push.broadcast('sendWorkerStatusMessage', {
workerId: workerResponse.workerId,
status: workerResponse.payload,
});
break;
case 'getId':
break;
default:
Container.get(Logger).debug(
`Received worker response ${workerResponse.command} from ${workerResponse.workerId}`,
);
}
export async function handleWorkerResponseMessageMain(
messageString: string,
options: MainResponseReceivedHandlerOptions,
) {
const workerResponse = jsonParse<RedisServiceWorkerResponseObject | null>(messageString, {
fallbackValue: null,
});

if (!workerResponse) {
Container.get(Logger).debug(
`Received invalid message via channel ${WORKER_RESPONSE_REDIS_CHANNEL}: "${messageString}"`,
);
return;
}

if (workerResponse.targets && !workerResponse.targets.includes(options.queueModeId)) return;

switch (workerResponse.command) {
case 'getStatus':
Container.get(Push).broadcast('sendWorkerStatusMessage', {
workerId: workerResponse.workerId,
status: workerResponse.payload,
});
break;
case 'getId':
break;
default:
Container.get(Logger).debug(
`Received worker response ${workerResponse.command} from ${workerResponse.workerId}`,
);
}

return workerResponse;
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ import { COMMAND_REDIS_CHANNEL, WORKER_RESPONSE_REDIS_CHANNEL } from '../../redi
import { handleWorkerResponseMessageMain } from './handleWorkerResponseMessageMain';
import { handleCommandMessageMain } from './handleCommandMessageMain';
import { OrchestrationHandlerService } from '../../orchestration.handler.base.service';
import type { MainResponseReceivedHandlerOptions } from './types';

@Service()
export class OrchestrationHandlerMainService extends OrchestrationHandlerService {
async initSubscriber() {
async initSubscriber(options: MainResponseReceivedHandlerOptions) {
this.redisSubscriber = await this.redisService.getPubSubSubscriber();

await this.redisSubscriber.subscribeToCommandChannel();
Expand All @@ -16,7 +17,7 @@ export class OrchestrationHandlerMainService extends OrchestrationHandlerService
'OrchestrationMessageReceiver',
async (channel: string, messageString: string) => {
if (channel === WORKER_RESPONSE_REDIS_CHANNEL) {
await handleWorkerResponseMessageMain(messageString);
await handleWorkerResponseMessageMain(messageString, options);
} else if (channel === COMMAND_REDIS_CHANNEL) {
await handleCommandMessageMain(messageString);
}
Expand Down
6 changes: 6 additions & 0 deletions packages/cli/src/services/orchestration/main/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher';

export type MainResponseReceivedHandlerOptions = {
queueModeId: string;
redisPublisher: RedisServicePubSubPublisher;
};
2 changes: 1 addition & 1 deletion packages/cli/src/services/redis/RedisServiceCommands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ export type RedisServiceWorkerResponseObject = {
workflowId: string;
};
}
);
) & { targets?: string[] };

export type RedisServiceCommandObject = {
targets?: string[];
Expand Down

0 comments on commit 17f741d

Please sign in to comment.