Skip to content

Commit

Permalink
PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
netroy committed Jul 30, 2024
1 parent 3af518d commit b8f7c48
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 31 deletions.
45 changes: 29 additions & 16 deletions packages/cli/src/TypedEmitter.ts
Original file line number Diff line number Diff line change
@@ -1,35 +1,48 @@
import { EventEmitter } from 'node:events';
import debounce from 'lodash/debounce';

type EventName = string;

type Payloads<L> = {
[E in Extract<keyof L, EventName>]: unknown;
type Payloads<ListenerMap> = {
[E in keyof ListenerMap]: unknown;
};

type Listener<P> = (payload: P) => void;
type Listener<Payload> = (payload: Payload) => void;

export class TypedEmitter<L extends Payloads<L>> extends EventEmitter {
protected debounceWait = 300;
export class TypedEmitter<ListenerMap extends Payloads<ListenerMap>> extends EventEmitter {
private debounceWait = 300; // milliseconds

override on<U extends Extract<keyof L, EventName>>(event: U, listener: Listener<L[U]>) {
return super.on(event, listener);
override on<EventName extends keyof ListenerMap & string>(
eventName: EventName,
listener: Listener<ListenerMap[EventName]>,
) {
return super.on(eventName, listener);
}

override once<U extends Extract<keyof L, EventName>>(event: U, listener: Listener<L[U]>) {
return super.once(event, listener);
override once<EventName extends keyof ListenerMap & string>(
eventName: EventName,
listener: Listener<ListenerMap[EventName]>,
) {
return super.once(eventName, listener);
}

override off<U extends Extract<keyof L, EventName>>(event: U, listener: Listener<L[U]>) {
return super.off(event, listener);
override off<EventName extends keyof ListenerMap & string>(
eventName: EventName,
listener: Listener<ListenerMap[EventName]>,
) {
return super.off(eventName, listener);
}

override emit<U extends Extract<keyof L, EventName>>(event: U, payload?: L[U]): boolean {
return super.emit(event, payload);
override emit<EventName extends keyof ListenerMap & string>(
eventName: EventName,
payload?: ListenerMap[EventName],
): boolean {
return super.emit(eventName, payload);
}

protected debouncedEmit = debounce(
<U extends Extract<keyof L, EventName>>(event: U, payload?: L[U]) => super.emit(event, payload),
<EventName extends keyof ListenerMap & string>(
eventName: EventName,
payload?: ListenerMap[EventName],
) => super.emit(eventName, payload),
this.debounceWait,
);
}
6 changes: 3 additions & 3 deletions packages/cli/src/concurrency/concurrency-queue.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { Service } from 'typedi';
import { TypedEmitter } from '@/TypedEmitter';

interface Events {
type ConcurrencyEvents = {
'execution-throttled': { executionId: string };
'execution-released': string;
'concurrency-check': { capacity: number };
}
};

@Service()
export class ConcurrencyQueue extends TypedEmitter<Events> {
export class ConcurrencyQueue extends TypedEmitter<ConcurrencyEvents> {
private readonly queue: Array<{
executionId: string;
resolve: () => void;
Expand Down
6 changes: 3 additions & 3 deletions packages/cli/src/push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import { WebSocketPush } from './websocket.push';
import type { PushResponse, SSEPushRequest, WebSocketPushRequest } from './types';
import { TypedEmitter } from '@/TypedEmitter';

interface Events {
type PushEvents = {
editorUiConnected: string;
}
};

const useWebSockets = config.getEnv('push.backend') === 'websocket';

Expand All @@ -32,7 +32,7 @@ const useWebSockets = config.getEnv('push.backend') === 'websocket';
* @emits message when a message is received from a client
*/
@Service()
export class Push extends TypedEmitter<Events> {
export class Push extends TypedEmitter<PushEvents> {
private backend = useWebSockets ? Container.get(WebSocketPush) : Container.get(SSEPush);

constructor(private readonly orchestrationService: OrchestrationService) {
Expand Down
6 changes: 3 additions & 3 deletions packages/cli/src/services/cache/cache.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ import type {
import { TIME } from '@/constants';
import { TypedEmitter } from '@/TypedEmitter';

interface Events {
type CacheEvents = {
'metrics.cache.hit': never;
'metrics.cache.miss': never;
'metrics.cache.update': never;
}
};

@Service()
export class CacheService extends TypedEmitter<Events> {
export class CacheService extends TypedEmitter<CacheEvents> {
private cache: TaggedRedisCache | TaggedMemoryCache;

async init() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSub
import { RedisClientService } from '@/services/redis/redis-client.service';
import { TypedEmitter } from '@/TypedEmitter';

interface Events {
type MultiMainEvents = {
'leader-stepdown': never;
'leader-takeover': never;
}
};

@Service()
export class MultiMainSetup extends TypedEmitter<Events> {
export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
constructor(
private readonly logger: Logger,
private readonly redisPublisher: RedisServicePubSubPublisher,
Expand Down
6 changes: 3 additions & 3 deletions packages/cli/src/services/workflow-statistics.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { Logger } from '@/Logger';
import { OwnershipService } from './ownership.service';
import { TypedEmitter } from '@/TypedEmitter';

interface Events {
type WorkflowStatisticsEvents = {
nodeFetchedData: { workflowId: string; node: INode };
workflowExecutionCompleted: { workflowData: IWorkflowBase; fullRunData: IRun };
'telemetry.onFirstProductionWorkflowSuccess': {
Expand All @@ -22,10 +22,10 @@ interface Events {
node_type: string;
node_id: string;
};
}
};

@Service()
export class WorkflowStatisticsService extends TypedEmitter<Events> {
export class WorkflowStatisticsService extends TypedEmitter<WorkflowStatisticsEvents> {
constructor(
private readonly logger: Logger,
private readonly repository: WorkflowStatisticsRepository,
Expand Down

0 comments on commit b8f7c48

Please sign in to comment.