Skip to content

Commit

Permalink
Merge pull request #28 from frmscoe/route-network-map
Browse files Browse the repository at this point in the history
feat: route from networkmap option
  • Loading branch information
cshezi authored Dec 1, 2023
2 parents b297a20 + 82c36c9 commit 8d0886d
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 26 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@frmscoe/frms-coe-startup-lib",
"version": "2.1.0",
"version": "2.1.1",
"description": "FRMS Center of Excellence startup package library",
"main": "lib/index.js",
"repository": {
Expand Down
9 changes: 7 additions & 2 deletions src/interfaces/iStartupService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ import { type ILoggerService } from '.';
import { type onMessageFunction } from '../types/onMessageFunction';

export interface IStartupService {
init: (onMessage: onMessageFunction, loggerService?: ILoggerService) => Promise<boolean>;
initProducer: (loggerService?: ILoggerService) => Promise<boolean>;
init: (
onMessage: onMessageFunction,
loggerService?: ILoggerService,
parConsumerStreamNames?: string[],
parProducerStreamName?: string,
) => Promise<boolean>;
initProducer: (loggerService?: ILoggerService, parProducerStreamName?: string) => Promise<boolean>;
handleResponse: (response: object, subject?: string[]) => Promise<void>;
}
33 changes: 20 additions & 13 deletions src/services/natsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export class NatsService implements IStartupService {
};

producerStreamName = '';
consumerStreamName = '';
consumerStreamName: string[] | undefined;
functionName = '';
NatsConn?: NatsConnection;
logger?: ILoggerService | Console;
Expand All @@ -35,24 +35,30 @@ export class NatsService implements IStartupService {
* @return {*} {Promise<boolean>}
*/

async init(onMessage: onMessageFunction, loggerService?: ILoggerService): Promise<boolean> {
async init(
onMessage: onMessageFunction,
loggerService?: ILoggerService,
parConsumerStreamNames?: string[],
parProducerStreamName?: string,
): Promise<boolean> {
try {
// Validate additional Environmental Variables.
if (!startupConfig.consumerStreamName) {
throw new Error(`No Consumer Stream Name Provided in environmental Variable`);
if (!startupConfig.consumerStreamName && !parConsumerStreamNames?.length) {
throw new Error(`No Consumer Stream Name Provided in environmental Variable or on startup as an arguement`);
}
if (parProducerStreamName) startupConfig.producerStreamName = parProducerStreamName;
if (parConsumerStreamNames) startupConfig.consumerStreamName = String(parConsumerStreamNames);

await this.initProducer(loggerService);
await this.initProducer(loggerService, parProducerStreamName);
if (!this.NatsConn || !this.logger) return await Promise.resolve(false);

// this promise indicates the client closed
const done = this.NatsConn.closed();

// Add consumer streams
this.consumerStreamName = startupConfig.consumerStreamName;
const consumerStreamNames = this.consumerStreamName.split(',');
this.consumerStreamName = startupConfig.consumerStreamName.split(',');
const subs: Subscription[] = [];
for (const consumerStream of consumerStreamNames) {
for (const consumerStream of this.consumerStreamName) {
subs.push(this.NatsConn.subscribe(`${consumerStream}`, { queue: `${this.functionName}` }));
}

Expand Down Expand Up @@ -93,8 +99,8 @@ export class NatsService implements IStartupService {
* @return {*} {Promise<boolean>}
*/

async initProducer(loggerService?: ILoggerService): Promise<boolean> {
await this.validateEnvironment();
async initProducer(loggerService?: ILoggerService, parProducerStreamName?: string): Promise<boolean> {
await this.validateEnvironment(parProducerStreamName);
if (loggerService) {
this.logger = startupConfig.env === 'dev' || startupConfig.env === 'test' ? console : loggerService;
} else {
Expand All @@ -110,6 +116,7 @@ export class NatsService implements IStartupService {

// Init producer streams
this.producerStreamName = startupConfig.producerStreamName;
if (parProducerStreamName) this.producerStreamName = parProducerStreamName;
} catch (error) {
this.logger.log(`Error communicating with NATS on: ${JSON.stringify(this.server)}, with error: ${JSON.stringify(error)}`);
throw error;
Expand All @@ -122,9 +129,9 @@ export class NatsService implements IStartupService {
return true;
}

async validateEnvironment(): Promise<void> {
if (!startupConfig.producerStreamName) {
throw new Error(`No Producer Stream Name Provided in environmental Variable`);
async validateEnvironment(parProducerStreamName?: string): Promise<void> {
if (!startupConfig.producerStreamName && !parProducerStreamName) {
throw new Error(`No Producer Stream Name Provided in environmental Variable or on startup as an arguement`);
}

if (!startupConfig.serverUrl) {
Expand Down
21 changes: 13 additions & 8 deletions src/services/startupFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,33 @@ export class StartupFactory implements IStartupService {
}

/* eslint-disable @typescript-eslint/no-misused-promises */
async init(onMessage: onMessageFunction, loggerService?: ILoggerService | undefined): Promise<boolean> {
async init(
onMessage: onMessageFunction,
loggerService?: ILoggerService | undefined,
parConsumerStreamNames?: string[],
parProducerStreamName?: string,
): Promise<boolean> {
process.on('uncaughtException', async (): Promise<void> => {
await this.startupService.init(onMessage, loggerService);
await this.startupService.init(onMessage, loggerService, parConsumerStreamNames, parProducerStreamName);
});

process.on('unhandledRejection', async (): Promise<void> => {
await this.startupService.init(onMessage, loggerService);
await this.startupService.init(onMessage, loggerService, parConsumerStreamNames, parProducerStreamName);
});

return await this.startupService.init(onMessage, loggerService);
return await this.startupService.init(onMessage, loggerService, parConsumerStreamNames, parProducerStreamName);
}

async initProducer(loggerService?: ILoggerService | undefined): Promise<boolean> {
async initProducer(loggerService?: ILoggerService | undefined, parProducerStreamName?: string): Promise<boolean> {
process.on('uncaughtException', async (): Promise<void> => {
await this.startupService.initProducer(loggerService);
await this.startupService.initProducer(loggerService, parProducerStreamName);
});

process.on('unhandledRejection', async (): Promise<void> => {
await this.startupService.initProducer(loggerService);
await this.startupService.initProducer(loggerService, parProducerStreamName);
});

return await this.startupService.initProducer(loggerService);
return await this.startupService.initProducer(loggerService, parProducerStreamName);
}

async handleResponse(response: object, subject?: string[] | undefined): Promise<void> {
Expand Down

0 comments on commit 8d0886d

Please sign in to comment.