Skip to content

Commit

Permalink
fix(engine): remove engine circular dependencies in connectors
Browse files Browse the repository at this point in the history
  • Loading branch information
burgerni10 authored and Nicolas Burger committed Nov 20, 2022
1 parent 6f04975 commit 57f6389
Show file tree
Hide file tree
Showing 89 changed files with 1,543 additions and 4,449 deletions.
43 changes: 0 additions & 43 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
"@koa/multer": "3.0.0",
"@koa/router": "12.0.0",
"ads-client": "1.14.1",
"axios": "0.27.2",
"basic-auth": "2.0.1",
"better-sqlite3": "7.6.2",
"form-data": "4.0.0",
Expand Down
27 changes: 9 additions & 18 deletions src/engine/base-engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ southList.OPCHDA = require('../south/south-opchda/south-opchda')
southList.RestApi = require('../south/south-rest/south-rest')

const LoggerService = require('../service/logger/logger.service')
const { createRequestService } = require('../service/request')
const StatusService = require('../service/status.service')

/**
Expand Down Expand Up @@ -56,7 +55,6 @@ class BaseEngine {
// Variable initialized in initEngineServices
this.statusService = null
this.logger = null
this.requestService = null
}

/**
Expand All @@ -66,32 +64,25 @@ class BaseEngine {
* @returns {Promise<void>} - The result promise
*/
async initEngineServices(engineConfig, loggerScope) {
this.oibusName = engineConfig.engineName
this.defaultLogParameters = engineConfig.logParameters
this.proxies = engineConfig.proxies
this.statusService = new StatusService()
// Configure the logger
this.logger = new LoggerService(loggerScope)
this.logger.setEncryptionService(this.encryptionService)
await this.logger.changeParameters(engineConfig, {})

// Buffer delay in ms: when a South connector generates a lot of values at the same time, it may be better to accumulate them
// in a buffer before sending them to the engine
// Max buffer: if the buffer reaches this length, it will be sent to the engine immediately
// these parameters could be settings from OIBus UI
this.bufferMax = engineConfig.caching.bufferMax
this.bufferTimeoutInterval = engineConfig.caching.bufferTimeoutInterval

// Request service
this.requestService = createRequestService(this)
await this.logger.changeParameters(this.oibusName, this.defaultLogParameters)
}

/**
* Add new values from a South connector to the Engine.
* The Engine will forward the values to the Cache.
* @param {String} id - The South connector id
* @param {String} southId - The South connector id
* @param {Object[]} values - Array of values
* @returns {Promise<void>} - The result promise
*/
async addValues(id, values) {
this.logger.warn(`addValues() should be surcharged. Called with South "${id}" and ${values.length} values.`)
async addValues(southId, values) {
this.logger.warn(`addValues() should be surcharged. Called with South "${southId}" and ${values.length} values.`)
}

/**
Expand Down Expand Up @@ -133,7 +124,7 @@ class BaseEngine {
try {
const SouthConnector = this.installedSouthConnectors[configuration.type]
if (SouthConnector) {
return new SouthConnector(configuration, this)
return new SouthConnector(configuration, this.addValues.bind(this), this.addFile.bind(this))
}
this.logger.error(`South connector for "${configuration.name}" is not found: ${configuration.type}`)
} catch (error) {
Expand Down Expand Up @@ -163,7 +154,7 @@ class BaseEngine {
try {
const NorthConnector = this.installedNorthConnectors[configuration.type]
if (NorthConnector) {
return new NorthConnector(configuration, this)
return new NorthConnector(configuration, this.proxies)
}
this.logger.error(`North connector for "${configuration.name}" is not found: ${configuration.type}`)
} catch (error) {
Expand Down
5 changes: 0 additions & 5 deletions src/engine/base-engine.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,6 @@ describe('BaseEngine', () => {
await engine.initEngineServices(config.engine, 'base')
})

it('should be properly initialized', () => {
expect(engine.bufferMax).toEqual(config.engine.caching.bufferMax)
expect(engine.bufferTimeoutInterval).toEqual(config.engine.caching.bufferTimeoutInterval)
})

it('should warn when calling add values', async () => {
const sampleValues = [{
timestamp: 'today',
Expand Down
18 changes: 0 additions & 18 deletions src/engine/cache/base-cache.js

This file was deleted.

14 changes: 5 additions & 9 deletions src/engine/cache/file-cache.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
const fs = require('node:fs/promises')
const path = require('node:path')

const BaseCache = require('./base-cache')
const { createFolder } = require('../../service/utils')

// Time between two checks of the Archive Folder
Expand All @@ -14,7 +13,7 @@ const FILE_FOLDER = 'files'
/**
* Local cache implementation to group events and store them when the communication with the North is down.
*/
class FileCache extends BaseCache {
class FileCache {
/**
* @param {String} northId - The North ID connector
* @param {Logger} logger - The logger
Expand All @@ -30,12 +29,9 @@ class FileCache extends BaseCache {
archiveFiles,
retentionDuration,
) {
super(
northId,
logger,
baseFolder,
)

this.northId = northId
this.logger = logger
this.baseFolder = baseFolder
this.archiveFiles = archiveFiles
// Convert from hours to ms to compare with mtimeMs (file modified time in ms)
this.retentionDuration = retentionDuration * 3600000
Expand Down Expand Up @@ -203,7 +199,7 @@ class FileCache extends BaseCache {

/**
* Check if the file cache is empty or not
* @returns {Boolean} - Cache empty or not
* @returns {Promise<Boolean>} - Cache empty or not
*/
async isEmpty() {
let files = []
Expand Down
19 changes: 17 additions & 2 deletions src/engine/cache/value-cache.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,30 @@
const path = require('node:path')

const databaseService = require('../../service/database.service')
const BaseCache = require('./base-cache')

const VALUES_DB_FILE_NAME = 'values.db'
const VALUES_ERROR_DB_FILE_NAME = 'values-error.db'

/**
* Local cache implementation to group events and store them when the communication with the North is down.
*/
class ValueCache extends BaseCache {
class ValueCache {
/**
* @param {String} northId - The North ID connector
* @param {Logger} logger - The logger
* @param {String} baseFolder - The North cache folder
* @return {void}
*/
constructor(
northId,
logger,
baseFolder,
) {
this.northId = northId
this.logger = logger
this.baseFolder = baseFolder
}

/**
* Create databases and folders
* @returns {Promise<void>} - The result promise
Expand Down
60 changes: 49 additions & 11 deletions src/engine/health-signal.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
const { httpSend, createProxyAgent, addAuthenticationToHeaders } = require('../service/utils')

/**
* Class HealthSignal - sends health signal to a remote host or into the logs
*/
Expand All @@ -21,7 +23,7 @@ class HealthSignal {
this.http.proxy = Array.isArray(engineConfig.proxies) ? engineConfig.proxies.find(({ name }) => name === this.http.proxy) : null
this.httpTimer = null
this.loggingTimer = null
this.engineName = engineConfig.engineName
this.oibusName = engineConfig.engineName
}

/**
Expand Down Expand Up @@ -58,17 +60,35 @@ class HealthSignal {
*/
async sendHttpSignal() {
const healthStatus = this.prepareStatus(this.http.verbose)
healthStatus.id = this.engineName
healthStatus.id = this.oibusName
try {
const data = JSON.stringify(healthStatus)
const headers = { 'Content-Type': 'application/json' }
await this.engine.requestService.httpSend(
let proxyAgent
if (this.http.proxy) {
proxyAgent = createProxyAgent(
this.http.proxy.protocol,
this.http.proxy.host,
this.http.proxy.port,
this.http.proxy.username,
await this.engine.encryptionService.decryptText(this.http.proxy.password),
)
}
if (this.http.authentication) {
addAuthenticationToHeaders(
headers,
this.http.authentication.type,
this.http.authentication.key,
await this.engine.encryptionService.decryptText(this.http.authentication.secret),
)
}
await httpSend(
`${this.http.host}${this.http.endpoint}`,
'POST',
this.http.authentication,
this.http.proxy,
data,
headers,
data,
10,
proxyAgent,
)
this.logger.debug('HTTP health signal sent successfully.')
} catch (error) {
Expand Down Expand Up @@ -107,15 +127,33 @@ class HealthSignal {
const stringData = JSON.stringify(data)
if (this.http.enabled) {
this.logger.trace(`Forwarding health signal to "${this.http.host}".`)
const headers = { 'Content-Type': 'application/json' }
this.logger.info(stringData)
await this.engine.requestService.httpSend(
const headers = { 'Content-Type': 'application/json' }
let proxyAgent
if (this.http.proxy) {
proxyAgent = createProxyAgent(
this.http.proxy.protocol,
this.http.proxy.host,
this.http.proxy.port,
this.http.proxy.username,
await this.engine.encryptionService.decryptText(this.http.proxy.password),
)
}
if (this.http.authentication) {
addAuthenticationToHeaders(
headers,
this.http.authentication.type,
this.http.authentication.key,
await this.engine.encryptionService.decryptText(this.http.authentication.secret),
)
}
await httpSend(
`${this.http.host}${this.http.endpoint}`,
'POST',
this.http.authentication,
this.http.proxy,
stringData,
headers,
stringData,
10,
proxyAgent,
)
this.logger.trace(`Health signal successfully forwarded to "${this.http.host}".`)
} else {
Expand Down
Loading

0 comments on commit 57f6389

Please sign in to comment.