Skip to content

Commit

Permalink
WIP: simiplify architecture
Browse files Browse the repository at this point in the history
  • Loading branch information
ramank775 committed Dec 9, 2023
1 parent bc4fdff commit 140dee6
Show file tree
Hide file tree
Showing 23 changed files with 1,147 additions and 1,003 deletions.
29 changes: 29 additions & 0 deletions libs/channel-service-client.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
const { HttpClient } = require('./http-client');

class ChannelServiceClient {
constructor(options) {
this._client = new HttpClient(options.channelMsEndpoint)
}

async getChannelInfo(channelId) {
const { members } = await this._client.get(`/${channelId}`);
return members;
}
}

function addChannelServiceClientOptions(cmd) {
cmd = cmd.option('--channel-ms-endpoint <channel-ms-endpoint>', 'Base url for channel service')
return cmd;
}

async function initChannelServiceClient(context) {
const { options } = context;
context.channelServiceClient = new ChannelServiceClient(options);
return context;
}

module.exports = {
ChannelServiceClient,
addOptions: addChannelServiceClientOptions,
init: initChannelServiceClient,
}
101 changes: 81 additions & 20 deletions libs/event-args/message.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ const CHANNEL_TYPE = {
GROUP: 'GROUP',
}

/**
* @typedef {Object} SerializationOption
* @property {number | null} version
* @property {string[] | null} ignore
*/

class MessageEvent extends IEventArg {
static #binary_resource_name = 'Message';

Expand Down Expand Up @@ -55,23 +61,22 @@ class MessageEvent extends IEventArg {
/** @type {Record<string, string>} */
_meta = {}

/** @type {string[]|null} */
_recipients = null;

/** @type {string} */
_server_id;

/** @type {Long} */
_server_timestamp;

static fromString(payload, options) {
if (!options) options = {}
const json = JSON.parse(payload);
static fromObject(payload, options) {
const message = new MessageEvent();
message._raw = payload;
message._raw_format = 'json';
message._version = json._v || 1.0;
message._version = payload._v || 1.0;
message._timestamp = getUTCTime();
if (message._version >= 2.0) {
message._id = json.id;
const { type, to, from, ephemeral, contentType, ...others } = json.head;
message._id = payload.id;
const { type, to, from, ephemeral, contentType, ...others } = payload.head;
message._source = from || options.source;
message._destination = to;
message._channel = type.toUpperCase();
Expand All @@ -85,23 +90,30 @@ class MessageEvent extends IEventArg {
message._meta[key] = `${value}`;
})
message._meta.contentType = contentType;
message._content = json.body
message._content = payload.body
} else {
message._id = json.msgId;
message._channel = (json.chatType || json.module).toUpperCase();
message._type = json.type.toUpperCase()
message._source = json.from || options.source;
message._destination = json.to;
message._id = payload.msgId;
message._channel = (payload.chatType || payload.module).toUpperCase();
message._type = payload.type.toUpperCase()
message._source = payload.from || options.source;
message._destination = payload.to;
message._content = {
text: json.text
text: payload.text
}
message._meta.chatid = json.chatId || json.chatid;
message._meta.action = json.action;
message._meta.state = json.state;
message._meta.chatid = payload.chatId || payload.chatid;
message._meta.action = payload.action;
message._meta.state = payload.state;
}
message._recipients = payload.recipients
return message;
}

static fromString(payload, options) {
if (!options) options = {}
const json = JSON.parse(payload);
return MessageEvent.fromObject(json);
}

static fromBinary(payload, options) {
if (!options) options = {}
const messageDefination = getProtoDefination(MessageEvent.#binary_resource_name);
Expand All @@ -122,6 +134,7 @@ class MessageEvent extends IEventArg {
message._ephemeral = json.ephemeral;
message._source = options.source || json.source;
message._destination = json.destination;
message._recipients = json.recipients;
message._timestamp = json.timestamp;
message._meta = json.meta || {};
message._server_id = json.serverId;
Expand All @@ -130,13 +143,19 @@ class MessageEvent extends IEventArg {
return message;
}

toString(version = 2.1) {
/**
* Convert to JSON Object
* @param {SerializationOption} options
* @returns
*/
toObject(options = {}) {
let body = this._content
if (typeof this._content === 'string') {
body = JSON.parse(this._content)
} else if (Buffer.isBuffer(this._content)) {
body = JSON.parse(this._content.toString('utf-8'))
}
const version = options?.version || 2.1;
const message = {
_v: version,
id: this._id,
Expand Down Expand Up @@ -167,10 +186,32 @@ class MessageEvent extends IEventArg {
message.module = message.head.type;
message.action = message.head.action;
message.chatType = message.head.type;

message.recipients = message._recipients;

(options?.ignore || []).forEach((prop) => {
delete message[prop]
})

return message;
}

/**
* Serialize as string
* @param {SerializationOption} options
* @returns
*/
toString(options = {}) {
const message = this.toObject(options);
return JSON.stringify(message)
}

toBinary() {
/**
* Serialize as binary
* @param {SerializationOption} options
* @returns
*/
toBinary(options = {}) {
if (this._raw && this._raw_format === 'binary')
return this._raw;

Expand All @@ -194,12 +235,16 @@ class MessageEvent extends IEventArg {
ephemeral: this._ephemeral || false,
source: this._source,
destination: this._destination,
recipients: this._recipients,
timestamp: this._timestamp,
content,
meta: this._meta,
serverId: this._server_id,
serverTimestamp: this._server_timestamp
}
(options?.ignore || []).forEach((prop) => {
delete message[prop]
})
const errorMessage = messageDefination.verify(message);
if (errorMessage) {
throw new Error(errorMessage)
Expand All @@ -222,6 +267,18 @@ class MessageEvent extends IEventArg {
return ackMessage;
}

setRecipients(recipients) {
if (!Array.isArray(recipients)) {
throw new Error('Recipients should be an array');
}
this._recipients = recipients;
this._raw = null;
}

hasRecipients() {
return Boolean(this._recipients && this._recipients.length);
}

/**
* @param {string?} id
*/
Expand Down Expand Up @@ -272,6 +329,10 @@ class MessageEvent extends IEventArg {
return this._destination;
}

get recipients() {
return this._recipients;
}

get isServerAck() {
return this.type === MESSAGE_TYPE.SERVER_ACK;
}
Expand Down
33 changes: 33 additions & 0 deletions libs/http-client.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
const axios = require('axios');

class HttpClient {
constructor(baseUrl) {
this._client = axios.create({
baseUrl,
});
}

async get(path, options) {
const response = await this._client.get(path, options)
return response.data;
}

async post(path, payload, options) {
const response = await this._client.post(path, payload, options)
return response.data;
}

async put(path, payload, options) {
const response = await this._client.put(path, payload, options)
return response.data;
}

async delete(path, payload, options) {
const response = await this._client.delete(path, payload, options);
return response.data;
}
}

module.exports = {
HttpClient
}
23 changes: 23 additions & 0 deletions libs/http-service-base.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class HttpServiceBase extends ServiceBase {
this.histDict = {};
this.httpServer = context.httpServer;
this.baseRoute = this.options.baseRoute || '';
this.internalBaseRoute = '_internal';
}

get uri() {
Expand Down Expand Up @@ -118,6 +119,28 @@ class HttpServiceBase extends ServiceBase {
});
}

addInternalRoute(uri, method, handler, options = {}) {
const path = `${this.internalBaseRoute}/${this.baseRoute}/${uri}`;
if (options && options.validate) {
options.validate.options = {
abortEarly: false
}
options.validate.failAction = (_req, _h, error) => {
const errorMessage = error.details.map(({ message }) => message).join('\n')
throw Boom.badRequest(errorMessage)
}
}
this.hapiServer.route({
method,
path,
handler: (req, res) => {
req.internal = true;
return handler(req, res)
},
options
});
}

async run() {
await super.run();
await this.hapiServer.start();
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@
"prepare": "husky install"
},
"engines": {
"node": ">16.0.0"
"node": ">=18.0.0"
},
"dependencies": {
"@hapi/boom": "^10.0.1",
"@hapi/hapi": "^21.3.2",
"aws-sdk": "^2.1454.0",
"axios": "^1.6.2",
"commander": "^11.0.0",
"firebase-admin": "^11.10.1",
"hot-shots": "^10.0.0",
Expand All @@ -28,7 +29,6 @@
"moment": "^2.29.4",
"mongodb": "^6.0.0",
"nats": "^2.16.0",
"node-fetch": "^2.6.6",
"protobufjs": "^7.2.5",
"short-uuid": "^4.2.2",
"winston": "^3.10.0",
Expand Down
1 change: 1 addition & 0 deletions proto/event-args.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ message Message {
bytes content = 8;
uint64 timestamp = 9;
map<string, string> meta = 10;
optional repeated string recipients = 11;
string serverId = 20;
uint64 serverTimestamp = 21;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const { shortuuid, getUTCTime } = require("../../helper");
const { MessageEvent, MESSAGE_TYPE, CHANNEL_TYPE } = require("../../libs/event-args");

class GroupEvent extends MessageEvent {
class ChannelEvent extends MessageEvent {
constructor(groupId, action, actor) {
super();
this._version = 3.0;
Expand Down Expand Up @@ -31,5 +31,5 @@ class GroupEvent extends MessageEvent {
}

module.exports = {
GroupEvent
ChannelEvent
}
Loading

0 comments on commit 140dee6

Please sign in to comment.