Skip to content

Commit

Permalink
bugfix: log errors from event handlers
Browse files Browse the repository at this point in the history
Add log when event handler faced an error
before the app crash

Refs #409
  • Loading branch information
Sikora00 committed Sep 14, 2021
1 parent 85dd27d commit 927b45a
Showing 1 changed file with 28 additions and 6 deletions.
34 changes: 28 additions & 6 deletions src/event-bus.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { Injectable, OnModuleDestroy, Type } from '@nestjs/common';
import { Logger } from '@nestjs/common/services/logger.service';
import { ModuleRef } from '@nestjs/core';
import { Observable, Subscription } from 'rxjs';
import { filter } from 'rxjs/operators';
import { from, Observable, Subscription, throwError } from 'rxjs';
import { catchError, filter, mergeMap } from 'rxjs/operators';
import { isFunction } from 'util';
import { CommandBus } from './command-bus';
import { EVENTS_HANDLER_METADATA, SAGA_METADATA } from './decorators/constants';
Expand All @@ -24,10 +25,12 @@ export type EventHandlerType<EventBase extends IEvent = IEvent> = Type<
@Injectable()
export class EventBus<EventBase extends IEvent = IEvent>
extends ObservableBus<EventBase>
implements IEventBus<EventBase>, OnModuleDestroy {
implements IEventBus<EventBase>, OnModuleDestroy
{
protected getEventName: (event: EventBase) => string;
protected readonly subscriptions: Subscription[];

private readonly logger = new Logger('EventBus');
private _publisher: IEventPublisher<EventBase>;

constructor(
Expand Down Expand Up @@ -65,7 +68,14 @@ export class EventBus<EventBase extends IEvent = IEvent>

bind(handler: IEventHandler<EventBase>, name: string) {
const stream$ = name ? this.ofEventName(name) : this.subject$;
const subscription = stream$.subscribe((event) => handler.handle(event));
const subscription = stream$
.pipe(mergeMap((event) => from(handler.handle(event))))
.subscribe({
error: (error) => {
this.logger.error(`${handler.constructor.name} produced an error.`);
throw error;
},
});
this.subscriptions.push(subscription);
}

Expand Down Expand Up @@ -115,8 +125,20 @@ export class EventBus<EventBase extends IEvent = IEvent>
}

const subscription = stream$
.pipe(filter((e) => !!e))
.subscribe((command) => this.commandBus.execute(command));
.pipe(
filter((e) => !!e),
mergeMap((command) =>
from(this.commandBus.execute(command)).pipe(
catchError((error) => {
this.logger.error(
`${command.constructor.name} produced an error.`,
);
return throwError(() => error);
}),
),
),
)
.subscribe();

this.subscriptions.push(subscription);
}
Expand Down

0 comments on commit 927b45a

Please sign in to comment.