Skip to content

Commit

Permalink
Merge pull request #24 from frmscoe/proto-buf
Browse files Browse the repository at this point in the history
Proto buf
  • Loading branch information
cshezi authored Sep 14, 2023
2 parents 9c16a3a + 65d77f6 commit cc76482
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 51 deletions.
4 changes: 2 additions & 2 deletions __tests__/nats.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ describe('init', () => {
// Do stuff
const req = JSON.parse(reqObj as string);
req.Test = { some: 'val', another: 'one' };
const resp = JSON.stringify(req);
const resp = req;

// Done, so call response method
handleResponse(resp, []);
Expand All @@ -44,7 +44,7 @@ describe('init', () => {
// Do stuff
const req = JSON.parse(reqObj as string);
req.Test = { some: 'val', another: 'one' };
const resp = JSON.stringify(req);
const resp = req;

// Done, so call response method
handleResponse(resp, []);
Expand Down
157 changes: 122 additions & 35 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@frmscoe/frms-coe-startup-lib",
"version": "1.0.0",
"version": "2.0.0",
"description": "FRMS Center of Excellence startup package library",
"main": "lib/index.js",
"repository": {
Expand All @@ -20,7 +20,7 @@
"prepare": "husky install"
},
"dependencies": {
"@frmscoe/frms-coe-lib": "^0.4.0",
"@frmscoe/frms-coe-lib": "^1.0.0",
"arangojs": "^8.3.0",
"fast-json-stringify": "^5.8.0",
"ioredis": "^5.3.2",
Expand Down
2 changes: 1 addition & 1 deletion src/interfaces/iStartupService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ import { type onMessageFunction } from '../types/onMessageFunction';
export interface IStartupService {
init: (onMessage: onMessageFunction, loggerService?: ILoggerService) => Promise<boolean>;
initProducer: (loggerService?: ILoggerService) => Promise<boolean>;
handleResponse: (response: unknown, subject?: string[]) => Promise<void>;
handleResponse: (response: object, subject?: string[]) => Promise<void>;
}
20 changes: 11 additions & 9 deletions src/services/natsService.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { StringCodec, connect, type NatsConnection, type Subscription } from 'nats';
import { connect, type NatsConnection, type Subscription } from 'nats';
import { type ILoggerService } from '../interfaces';
import { startupConfig } from '../interfaces/iStartupConfig';
import { type onMessageFunction } from '../types/onMessageFunction';
import { type IStartupService } from '..';
import fastJson from 'fast-json-stringify';
import { messageSchema } from '@frmscoe/frms-coe-lib/lib/helpers/schemas/message';
import FRMSMessage from '@frmscoe/frms-coe-lib/lib/helpers/protobuf';

export class NatsService implements IStartupService {
server = {
Expand Down Expand Up @@ -48,7 +49,7 @@ export class NatsService implements IStartupService {
const done = this.NatsConn.closed();

// Add consumer streams
this.consumerStreamName = startupConfig.consumerStreamName; // "RuleRequest";
this.consumerStreamName = startupConfig.consumerStreamName;
const consumerStreamNames = this.consumerStreamName.split(',');
const subs: Subscription[] = [];
for (const consumerStream of consumerStreamNames) {
Expand All @@ -70,8 +71,9 @@ export class NatsService implements IStartupService {
async subscribe(subscription: Subscription, onMessage: onMessageFunction): Promise<void> {
for await (const message of subscription) {
console.debug(`${Date.now().toLocaleString()} sid:[${message?.sid}] subject:[${message.subject}]: ${message.data.length}`);
const request = message.json<string>();
await onMessage(request, this.handleResponse);
const messageDecoded = FRMSMessage.decode(message.data);
const messageObject = FRMSMessage.toObject(messageDecoded);
await onMessage(messageObject, this.handleResponse);
}
}

Expand Down Expand Up @@ -142,16 +144,16 @@ export class NatsService implements IStartupService {
*
* @return {*} {Promise<void>}
*/
async handleResponse(response: unknown, subject?: string[]): Promise<void> {
const sc = StringCodec();
const res = this.#serialise(response);
async handleResponse(response: object, subject?: string[]): Promise<void> {
const message = FRMSMessage.create(response);
const messageBuffer = FRMSMessage.encode(message).finish();

if (this.producerStreamName && this.NatsConn) {
if (!subject) {
this.NatsConn.publish(this.producerStreamName, sc.encode(res));
this.NatsConn.publish(this.producerStreamName, messageBuffer);
} else {
for (const sub of subject) {
this.NatsConn.publish(sub, sc.encode(res));
this.NatsConn.publish(sub, messageBuffer);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/services/startupFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export class StartupFactory implements IStartupService {
return await this.startupService.initProducer(loggerService);
}

async handleResponse(response: unknown, subject?: string[] | undefined): Promise<void> {
async handleResponse(response: object, subject?: string[] | undefined): Promise<void> {
await this.startupService.handleResponse(response, subject);
}
}
2 changes: 1 addition & 1 deletion src/types/onMessageFunction.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export type onMessageFunction = (reqObj: unknown, handleResponse: responseCallback) => Promise<void>;
export type responseCallback = (response: unknown, subject: string[]) => Promise<void>;
export type responseCallback = (response: object, subject: string[]) => Promise<void>;

0 comments on commit cc76482

Please sign in to comment.