diff --git a/package-lock.json b/package-lock.json index 30cb80c..e1a1b3c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@frmscoe/frms-coe-startup-lib", - "version": "2.1.0", + "version": "2.1.1", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@frmscoe/frms-coe-startup-lib", - "version": "2.1.0", + "version": "2.1.1", "license": "ISC", "dependencies": { "@frmscoe/frms-coe-lib": "2.1.2", diff --git a/package.json b/package.json index f36ea73..d05f17b 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@frmscoe/frms-coe-startup-lib", - "version": "2.1.0", + "version": "2.1.1", "description": "FRMS Center of Excellence startup package library", "main": "lib/index.js", "repository": { diff --git a/src/interfaces/iStartupService.ts b/src/interfaces/iStartupService.ts index c3c0859..e0d10b4 100644 --- a/src/interfaces/iStartupService.ts +++ b/src/interfaces/iStartupService.ts @@ -4,7 +4,12 @@ import { type ILoggerService } from '.'; import { type onMessageFunction } from '../types/onMessageFunction'; export interface IStartupService { - init: (onMessage: onMessageFunction, loggerService?: ILoggerService) => Promise; - initProducer: (loggerService?: ILoggerService) => Promise; + init: ( + onMessage: onMessageFunction, + loggerService?: ILoggerService, + parConsumerStreamNames?: string[], + parProducerStreamName?: string, + ) => Promise; + initProducer: (loggerService?: ILoggerService, parProducerStreamName?: string) => Promise; handleResponse: (response: object, subject?: string[]) => Promise; } diff --git a/src/services/natsService.ts b/src/services/natsService.ts index bbf2044..c7002e4 100644 --- a/src/services/natsService.ts +++ b/src/services/natsService.ts @@ -13,7 +13,7 @@ export class NatsService implements IStartupService { }; producerStreamName = ''; - consumerStreamName = ''; + consumerStreamName: string[] | undefined; functionName = ''; NatsConn?: NatsConnection; logger?: ILoggerService | Console; @@ -35,24 +35,30 @@ export class NatsService implements IStartupService { * @return {*} {Promise} */ - async init(onMessage: onMessageFunction, loggerService?: ILoggerService): Promise { + async init( + onMessage: onMessageFunction, + loggerService?: ILoggerService, + parConsumerStreamNames?: string[], + parProducerStreamName?: string, + ): Promise { try { // Validate additional Environmental Variables. - if (!startupConfig.consumerStreamName) { - throw new Error(`No Consumer Stream Name Provided in environmental Variable`); + if (!startupConfig.consumerStreamName && !parConsumerStreamNames?.length) { + throw new Error(`No Consumer Stream Name Provided in environmental Variable or on startup as an arguement`); } + if (parProducerStreamName) startupConfig.producerStreamName = parProducerStreamName; + if (parConsumerStreamNames) startupConfig.consumerStreamName = String(parConsumerStreamNames); - await this.initProducer(loggerService); + await this.initProducer(loggerService, parProducerStreamName); if (!this.NatsConn || !this.logger) return await Promise.resolve(false); // this promise indicates the client closed const done = this.NatsConn.closed(); // Add consumer streams - this.consumerStreamName = startupConfig.consumerStreamName; - const consumerStreamNames = this.consumerStreamName.split(','); + this.consumerStreamName = startupConfig.consumerStreamName.split(','); const subs: Subscription[] = []; - for (const consumerStream of consumerStreamNames) { + for (const consumerStream of this.consumerStreamName) { subs.push(this.NatsConn.subscribe(`${consumerStream}`, { queue: `${this.functionName}` })); } @@ -93,8 +99,8 @@ export class NatsService implements IStartupService { * @return {*} {Promise} */ - async initProducer(loggerService?: ILoggerService): Promise { - await this.validateEnvironment(); + async initProducer(loggerService?: ILoggerService, parProducerStreamName?: string): Promise { + await this.validateEnvironment(parProducerStreamName); if (loggerService) { this.logger = startupConfig.env === 'dev' || startupConfig.env === 'test' ? console : loggerService; } else { @@ -110,6 +116,7 @@ export class NatsService implements IStartupService { // Init producer streams this.producerStreamName = startupConfig.producerStreamName; + if (parProducerStreamName) this.producerStreamName = parProducerStreamName; } catch (error) { this.logger.log(`Error communicating with NATS on: ${JSON.stringify(this.server)}, with error: ${JSON.stringify(error)}`); throw error; @@ -122,9 +129,9 @@ export class NatsService implements IStartupService { return true; } - async validateEnvironment(): Promise { - if (!startupConfig.producerStreamName) { - throw new Error(`No Producer Stream Name Provided in environmental Variable`); + async validateEnvironment(parProducerStreamName?: string): Promise { + if (!startupConfig.producerStreamName && !parProducerStreamName) { + throw new Error(`No Producer Stream Name Provided in environmental Variable or on startup as an arguement`); } if (!startupConfig.serverUrl) { diff --git a/src/services/startupFactory.ts b/src/services/startupFactory.ts index aa78d1a..ee84b76 100644 --- a/src/services/startupFactory.ts +++ b/src/services/startupFactory.ts @@ -23,28 +23,33 @@ export class StartupFactory implements IStartupService { } /* eslint-disable @typescript-eslint/no-misused-promises */ - async init(onMessage: onMessageFunction, loggerService?: ILoggerService | undefined): Promise { + async init( + onMessage: onMessageFunction, + loggerService?: ILoggerService | undefined, + parConsumerStreamNames?: string[], + parProducerStreamName?: string, + ): Promise { process.on('uncaughtException', async (): Promise => { - await this.startupService.init(onMessage, loggerService); + await this.startupService.init(onMessage, loggerService, parConsumerStreamNames, parProducerStreamName); }); process.on('unhandledRejection', async (): Promise => { - await this.startupService.init(onMessage, loggerService); + await this.startupService.init(onMessage, loggerService, parConsumerStreamNames, parProducerStreamName); }); - return await this.startupService.init(onMessage, loggerService); + return await this.startupService.init(onMessage, loggerService, parConsumerStreamNames, parProducerStreamName); } - async initProducer(loggerService?: ILoggerService | undefined): Promise { + async initProducer(loggerService?: ILoggerService | undefined, parProducerStreamName?: string): Promise { process.on('uncaughtException', async (): Promise => { - await this.startupService.initProducer(loggerService); + await this.startupService.initProducer(loggerService, parProducerStreamName); }); process.on('unhandledRejection', async (): Promise => { - await this.startupService.initProducer(loggerService); + await this.startupService.initProducer(loggerService, parProducerStreamName); }); - return await this.startupService.initProducer(loggerService); + return await this.startupService.initProducer(loggerService, parProducerStreamName); } async handleResponse(response: object, subject?: string[] | undefined): Promise {