Skip to content

Commit

Permalink
Added failsafe in case producer is busy
Browse files Browse the repository at this point in the history
  • Loading branch information
rpanic committed Nov 7, 2023
1 parent 7dded84 commit 7f7b687
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 6 deletions.
53 changes: 48 additions & 5 deletions packages/common/src/events/EventEmitter.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,69 @@
import { EventsRecord } from "./EventEmittingComponent";
import _ from "lodash";

type ListenersHolder<Events extends EventsRecord> = {
// eslint-disable-next-line putout/putout
[key in keyof Events]?: ((...args: Events[key]) => void)[];
[key in keyof Events]?: {
id: number;
listener: (...args: Events[key]) => void;
}[];
};

export class EventEmitter<Events extends EventsRecord> {
private readonly listeners: ListenersHolder<Events> = {};

private counter = 0;

// Fields used for offSelf()
private currentListenerId: number | undefined = undefined;

private currentListenerEventName: keyof Events | undefined = undefined;

public emit(event: keyof Events, ...parameters: Events[typeof event]) {
const listeners = this.listeners[event];
if(listeners !== undefined) {
if (listeners !== undefined) {
this.currentListenerEventName = event;

listeners.forEach((listener) => {
listener(...parameters);
this.currentListenerId = listener.id;

listener.listener(...parameters);

this.currentListenerId = undefined;
});
this.currentListenerEventName = undefined;
}
}

public on<Key extends keyof Events>(
event: Key,
listener: (...args: Events[Key]) => void
) {
(this.listeners[event] ??= []).push(listener);
): number {
// eslint-disable-next-line no-multi-assign
const id = (this.counter += 1);
(this.listeners[event] ??= []).push({
id,
listener,
});
return id;
}

// TODO Improve to be thread-safe
public offSelf() {
if (
this.currentListenerEventName !== undefined &&
this.currentListenerId !== undefined
) {
this.off(this.currentListenerEventName, this.currentListenerId);
}
}

public off<Key extends keyof Events>(event: Key, id: number) {
const listeners = this.listeners[event];
if (listeners !== undefined) {
this.listeners[event] = listeners.filter(
(listener) => listener.id !== id
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,21 @@ export class AutomaticBlockTrigger
// eslint-disable-next-line @typescript-eslint/no-misused-promises
this.mempool.events.on("transactionAdded", async () => {
log.info("Transaction received, creating block...");
await this.unprovenProducerModule.tryProduceUnprovenBlock();
const block = await this.unprovenProducerModule.tryProduceUnprovenBlock();

// In case the block producer was busy, we need to re-trigger production
// as soon as the previous production was finished
if (block === undefined) {
this.unprovenProducerModule.events.on(
"unprovenBlockProduced",
async () => {
// eslint-disable-next-line max-len
// Make sure this comes before await, because otherwise we have a race condition
this.unprovenProducerModule.events.offSelf();
await this.unprovenProducerModule.tryProduceUnprovenBlock();
}
);
}
});
}
}

0 comments on commit 7f7b687

Please sign in to comment.