From 4084605e8d8880b1814a08b8824038c7187f62fb Mon Sep 17 00:00:00 2001 From: Gosha Date: Tue, 28 Nov 2023 12:07:43 +0200 Subject: [PATCH] feat: add ws health indicator --- apps/ws/src/health/health.controller.ts | 5 ++- apps/ws/src/health/health.module.ts | 5 +++ apps/ws/src/socket/services/index.ts | 1 + .../services/ws-health-indicator.service.ts | 25 +++++++++++++ .../external-services-route.usecase.ts | 35 ++++++++++++------- apps/ws/src/socket/ws.gateway.ts | 16 +++++++-- .../application-generic/src/health/index.ts | 1 + 7 files changed, 71 insertions(+), 17 deletions(-) create mode 100644 apps/ws/src/socket/services/ws-health-indicator.service.ts diff --git a/apps/ws/src/health/health.controller.ts b/apps/ws/src/health/health.controller.ts index b3bb529a0f5..115395e0638 100644 --- a/apps/ws/src/health/health.controller.ts +++ b/apps/ws/src/health/health.controller.ts @@ -3,13 +3,15 @@ import { HealthCheck, HealthCheckResult, HealthCheckService, HttpHealthIndicator import { DalServiceHealthIndicator, WebSocketsQueueServiceHealthIndicator } from '@novu/application-generic'; import { version } from '../../package.json'; +import { WSHealthIndicator } from '../socket/services'; @Controller('v1/health-check') export class HealthController { constructor( private healthCheckService: HealthCheckService, private dalHealthIndicator: DalServiceHealthIndicator, - private webSocketsQueueHealthIndicator: WebSocketsQueueServiceHealthIndicator + private webSocketsQueueHealthIndicator: WebSocketsQueueServiceHealthIndicator, + private wsHealthIndicator: WSHealthIndicator ) {} @Get() @@ -18,6 +20,7 @@ export class HealthController { const result = await this.healthCheckService.check([ async () => this.dalHealthIndicator.isHealthy(), async () => this.webSocketsQueueHealthIndicator.isHealthy(), + async () => this.wsHealthIndicator.isHealthy(), async () => { return { apiVersion: { diff --git a/apps/ws/src/health/health.module.ts b/apps/ws/src/health/health.module.ts index 4f570ccfbb0..e613d53072d 100644 --- a/apps/ws/src/health/health.module.ts +++ b/apps/ws/src/health/health.module.ts @@ -4,9 +4,14 @@ import { QueuesModule } from '@novu/application-generic'; import { HealthController } from './health.controller'; import { SharedModule } from '../shared/shared.module'; +import { WSGateway } from '../socket/ws.gateway'; +import { WSHealthIndicator } from '../socket/services'; + +const PROVIDERS = [WSHealthIndicator, WSGateway]; @Module({ imports: [TerminusModule, SharedModule, QueuesModule], + providers: PROVIDERS, controllers: [HealthController], }) export class HealthModule {} diff --git a/apps/ws/src/socket/services/index.ts b/apps/ws/src/socket/services/index.ts index b03b9f0b02d..9c5f11999e9 100644 --- a/apps/ws/src/socket/services/index.ts +++ b/apps/ws/src/socket/services/index.ts @@ -1 +1,2 @@ export { WebSocketWorker } from './web-socket.worker'; +export { WSHealthIndicator } from './ws-health-indicator.service'; diff --git a/apps/ws/src/socket/services/ws-health-indicator.service.ts b/apps/ws/src/socket/services/ws-health-indicator.service.ts new file mode 100644 index 00000000000..f9be8125eb9 --- /dev/null +++ b/apps/ws/src/socket/services/ws-health-indicator.service.ts @@ -0,0 +1,25 @@ +import { HealthIndicator, HealthIndicatorResult } from '@nestjs/terminus'; +import { Injectable } from '@nestjs/common'; + +import { IHealthIndicator } from '@novu/application-generic'; + +import { WSGateway } from '../ws.gateway'; + +@Injectable() +export class WSHealthIndicator extends HealthIndicator implements IHealthIndicator { + private INDICATOR_KEY = 'ws'; + + constructor(private wsGateway: WSGateway) { + super(); + } + + async isHealthy(): Promise { + const status = !!this.wsGateway.server; + + return this.getStatus(this.INDICATOR_KEY, status); + } + + isActive(): Promise { + return this.isHealthy(); + } +} diff --git a/apps/ws/src/socket/usecases/external-services-route/external-services-route.usecase.ts b/apps/ws/src/socket/usecases/external-services-route/external-services-route.usecase.ts index 786550e38d4..cff06088138 100644 --- a/apps/ws/src/socket/usecases/external-services-route/external-services-route.usecase.ts +++ b/apps/ws/src/socket/usecases/external-services-route/external-services-route.usecase.ts @@ -15,18 +15,21 @@ export class ExternalServicesRoute { public async execute(command: ExternalServicesRouteCommand) { const isOnline = await this.connectionExist(command); - if (isOnline) { - if (command.event === WebSocketEventEnum.RECEIVED) { - await this.processReceivedEvent(command); - } - - if (command.event === WebSocketEventEnum.UNSEEN) { - await this.sendUnseenCountChange(command); - } - - if (command.event === WebSocketEventEnum.UNREAD) { - await this.sendUnreadCountChange(command); - } + + if (!isOnline) { + return; + } + + if (command.event === WebSocketEventEnum.RECEIVED) { + await this.processReceivedEvent(command); + } + + if (command.event === WebSocketEventEnum.UNSEEN) { + await this.sendUnseenCountChange(command); + } + + if (command.event === WebSocketEventEnum.UNREAD) { + await this.sendUnreadCountChange(command); } } @@ -127,7 +130,13 @@ export class ExternalServicesRoute { } } - private async connectionExist(command: ExternalServicesRouteCommand) { + private async connectionExist(command: ExternalServicesRouteCommand): Promise { + if (!this.wsGateway.server) { + Logger.error('No sw server found, unable to check if connection exists', LOG_CONTEXT); + + return; + } + return !!(await this.wsGateway.server.sockets.in(command.userId).fetchSockets()).length; } } diff --git a/apps/ws/src/socket/ws.gateway.ts b/apps/ws/src/socket/ws.gateway.ts index 91f880f75ea..f6f451db552 100644 --- a/apps/ws/src/socket/ws.gateway.ts +++ b/apps/ws/src/socket/ws.gateway.ts @@ -1,17 +1,21 @@ const nr = require('newrelic'); - +import { JwtService } from '@nestjs/jwt'; import { OnGatewayConnection, OnGatewayDisconnect, WebSocketGateway, WebSocketServer } from '@nestjs/websockets'; +import { Logger } from '@nestjs/common'; import { Server, Socket } from 'socket.io'; -import { JwtService } from '@nestjs/jwt'; + import { ISubscriberJwt, ObservabilityBackgroundTransactionEnum } from '@novu/shared'; + import { SubscriberOnlineService } from '../shared/subscriber-online'; +const LOG_CONTEXT = 'WSGateway'; + @WebSocketGateway() export class WSGateway implements OnGatewayConnection, OnGatewayDisconnect { constructor(private jwtService: JwtService, private subscriberOnlineService: SubscriberOnlineService) {} @WebSocketServer() - server: Server; + server: Server | null; async handleDisconnect(connection: Socket) { // eslint-disable-next-line @typescript-eslint/no-this-alias @@ -112,6 +116,12 @@ export class WSGateway implements OnGatewayConnection, OnGatewayDisconnect { // eslint-disable-next-line @typescript-eslint/no-explicit-any async sendMessage(userId: string, event: string, data: any) { + if (!this.server) { + Logger.error('No sw server available to send message', LOG_CONTEXT); + + return; + } + this.server.to(userId).emit(event, data); } diff --git a/packages/application-generic/src/health/index.ts b/packages/application-generic/src/health/index.ts index b86313e3d85..d474ff32cff 100644 --- a/packages/application-generic/src/health/index.ts +++ b/packages/application-generic/src/health/index.ts @@ -7,3 +7,4 @@ export * from './standard-queue.health-indicator'; export * from './web-sockets-queue.health-indicator'; export * from './workflow-queue.health-indicator'; export * from './subscriber-process-queue.health-indicator'; +export * from './health-indicator.interface';