diff --git a/.gitignore b/.gitignore index be28f5677b..4719168deb 100644 --- a/.gitignore +++ b/.gitignore @@ -5,7 +5,7 @@ node_modules/ dist/ docs/ logs/ -cache/ +tests/cache/ build/ coverage/ oibus-*.json diff --git a/src/engine/Cache.class.js b/src/engine/Cache.class.js deleted file mode 100644 index 195bd649f2..0000000000 --- a/src/engine/Cache.class.js +++ /dev/null @@ -1,566 +0,0 @@ -const fs = require('fs/promises') -const path = require('path') - -const databaseService = require('../services/database.service') -const Queue = require('../services/queue.class') -const ApiHandler = require('../north/ApiHandler.class') - -// Time between two checks of the Archive Folder -const ARCHIVE_TIMEOUT = 3600000 // one hour - -/** - * Local cache implementation to group events and store them when the communication if North is down. - */ -class Cache { - /** - * Constructor for Cache - * The Engine parameters is used for the following parameters - * cacheFolder: Value mode only: will contain all sqllite databases used to cache values - * archiveMode: File mode only: decide if the file is deleted or archived after being sent to the North. - * archiveFolder: in 'archive' mode, specifies where the file is archived. - * @constructor - * @param {BaseEngine} engine - The Engine - * @return {void} - */ - constructor(engine) { - this.engine = engine - this.logger = engine.logger - // get parameters for the cache - const { engineConfig } = engine.configService.getConfig() - const { cacheFolder, archive } = engineConfig.caching - this.archiveMode = archive.enabled - this.archiveFolder = path.resolve(archive.archiveFolder) - this.retentionDuration = (archive.retentionDuration) * 3600000 - // Create cache folder if not exists - this.cacheFolder = path.resolve(cacheFolder) - - // will contain the list of North apis - this.apis = {} - // Queuing - this.sendInProgress = {} - this.resendImmediately = {} - // manage a queue for concurrent request to write to SQL - this.queue = new Queue(engine.logger) - // Cache stats - this.cacheStats = {} - // Errored files/values database path - this.filesErrorDatabasePath = `${this.cacheFolder}/fileCache-error.db` - this.valuesErrorDatabasePath = `${this.cacheFolder}/valueCache-error.db` - - this.archiveTimeout = null - } - - /** - * Initialize an active North. - * @param {Object} activeApi - The North to initialize - * @returns {Promise} - The result - */ - async initializeApi(activeApi) { - const api = { - id: activeApi.application.id, - name: activeApi.application.name, - config: activeApi.application.caching, - canHandleValues: activeApi.canHandleValues, - canHandleFiles: activeApi.canHandleFiles, - subscribedTo: activeApi.application.subscribedTo, - } - // only initialize the db if the api can handle values - if (api.canHandleValues) { - this.logger.debug(`use db: ${this.cacheFolder}/${api.id}.db for ${api.name}`) - api.database = databaseService.createValuesDatabase(`${this.cacheFolder}/${api.id}.db`, {}) - this.logger.debug(`db count: ${databaseService.getCount(api.database)}`) - } - this.apis[api.id] = api - if (api.config?.sendInterval) { - this.resetTimeout(api, api.config.sendInterval) - } else { - this.logger.warn(`Application "${api.name}" has no sendInterval`) - } - } - - stop() { - this.logger.info('Stopping cache timers...') - Object.values(this.apis).forEach((application) => { - if (application.timeout) { - clearTimeout(application.timeout) - } - }) - if (this.archiveTimeout) { - clearTimeout(this.archiveTimeout) - } - } - - /** - * Initialize the cache by creating a database for every North application. - * also initializes an internal object for North applications - * @param {object} activeApis - The active North applications - * @return {void} - */ - async initializeApis(activeApis) { - // initialize the internal object apis with the list of north apis - const actions = Object.values(activeApis).map((activeApi) => this.initializeApi(activeApi)) - await Promise.all(actions) - } - - /** - * Initialize the cache by creating a database files and value errors. - * @return {void} - */ - async initialize() { - try { - await fs.stat(this.cacheFolder) - } catch (error) { - this.logger.info(`Creating cache folder: ${this.cacheFolder}`) - await fs.mkdir(this.cacheFolder, { recursive: true }) - } - - try { - await fs.stat(this.archiveFolder) - } catch (error) { - this.logger.info(`Creating archive folder: ${this.archiveFolder}`) - await fs.mkdir(this.archiveFolder, { recursive: true }) - } - - if (this.archiveMode) { - // refresh the archiveFolder at the beginning only if retentionDuration is different from 0 - if (this.retentionDuration > 0) { - this.refreshArchiveFolder() - } - } - - this.logger.debug(`Cache initialized with cacheFolder:${this.archiveFolder} and archiveFolder: ${this.archiveFolder}`) - this.logger.debug(`Use file dbs: ${this.cacheFolder}/fileCache.db and ${this.filesErrorDatabasePath}`) - this.filesDatabase = databaseService.createFilesDatabase(`${this.cacheFolder}/fileCache.db`) - this.filesErrorDatabase = databaseService.createFilesDatabase(this.filesErrorDatabasePath) - this.valuesErrorDatabase = databaseService.createValueErrorsDatabase(this.valuesErrorDatabasePath) - this.logger.debug(`Files db count: ${databaseService.getCount(this.filesDatabase)}`) - this.logger.debug(`Files error db count: ${databaseService.getCount(this.filesErrorDatabase)}`) - this.logger.debug(`Values error db count: ${databaseService.getCount(this.valuesErrorDatabase)}`) - } - - /** - * Check whether a North is subscribed to a South. if subscribedTo is not defined - * or an empty array, the subscription is true. - * @param {string} id - The data source id - * @param {string[]} subscribedTo - The list of Souths the North is subscribed to - * @returns {boolean} - The North is subscribed to the given South - */ - static isSubscribed(id, subscribedTo) { - if (!Array.isArray(subscribedTo) || subscribedTo.length === 0) return true - return subscribedTo.includes(id) - } - - /** - * Cache a new Value from the South for a given North - * It will store the value in every database. - * to every North application - * @param {Object} api - The North to cache the Value for - * @param {string} id - The data source id - * @param {object} values - values - * @return {object} the api object or null if api should do nothing. - */ - async cacheValuesForApi(api, id, values) { - const { id: applicationId, database, config, canHandleValues, subscribedTo } = api - // save the value in the North's queue if it is subscribed to the dataSource - if (canHandleValues && Cache.isSubscribed(id, subscribedTo)) { - // Update stats for api - this.cacheStats[applicationId] = (this.cacheStats[applicationId] || 0) + values.length - - // Queue saving values. - await this.queue.add(databaseService.saveValues, database, this.engine.activeProtocols[id]?.dataSource.name || id, values) - - // if the group size is over the groupCount => we immediately send the cache - // to the North even if the timeout is not finished. - const count = databaseService.getCount(database) - if (count >= config.groupCount) { - this.logger.trace(`groupCount reached: ${count}>=${config.groupCount}`) - return api - } - } else { - // eslint-disable-next-line max-len - this.logger.trace(`Application "${this.engine.activeApis[applicationId]?.application.name || applicationId}" is not subscribed to datasource "${this.engine.activeProtocols[id]?.dataSource.name || id}"`) - } - return null - } - - /** - * Cache a new Value from the South - * It will store the value in every database. - * to every North application (used for alarm values for example) - * @param {string} id - The data source id - * @param {object} values - The new value - * @return {void} - */ - async cacheValues(id, values) { - try { - // Update stats for datasource id - this.cacheStats[id] = (this.cacheStats[id] || 0) + values.length - - // Cache values - const actions = Object.values(this.apis).map((api) => this.cacheValuesForApi(api, id, values)) - const apisToActivate = await Promise.all(actions) - apisToActivate.forEach((apiToActivate) => { - if (apiToActivate) { - this.sendCallback(apiToActivate) - } - }) - } catch (error) { - this.logger.error(error) - } - } - - /** - * Cache the new raw file for a given North. - * @param {object} api - The North to cache the file for - * @param {string} id - The data source id - * @param {String} cachePath - The path of the raw file - * @param {number} timestamp - The timestamp the file was received - * @return {object} the api object or null if api should do nothing. - */ - async cacheFileForApi(api, id, cachePath, timestamp) { - const { name: applicationName, id: applicationId, canHandleFiles, subscribedTo } = api - if (canHandleFiles && Cache.isSubscribed(id, subscribedTo)) { - // Update stats for api - this.cacheStats[applicationName] = (this.cacheStats[applicationName] || 0) + 1 - - // Cache file - this.logger.debug(`cacheFileForApi() ${cachePath} for api: ${applicationName}`) - databaseService.saveFile(this.filesDatabase, timestamp, applicationId, cachePath) - return api - } - this.logger.trace(`datasource "${this.engine.activeProtocols[id]?.dataSource.name || id}" is not subscribed to application "${applicationName}"`) - return null - } - - /** - * Cache the new raw file. - * @param {string} id - The data source id - * @param {String} filePath - The path of the raw file - * @param {boolean} preserveFiles - Whether to preserve the file at the original location - * @return {void} - */ - async cacheFile(id, filePath, preserveFiles) { - // Update stats for datasource name - this.cacheStats[id] = (this.cacheStats[id] || 0) + 1 - - // Cache files - this.logger.debug(`cacheFile(${filePath}) from "${this.engine.activeProtocols[id]?.dataSource.name || id}", preserveFiles:${preserveFiles}`) - const timestamp = new Date().getTime() - // When compressed file is received the name looks like filename.txt.gz - const filenameInfo = path.parse(filePath) - const cacheFilename = `${filenameInfo.name}-${timestamp}${filenameInfo.ext}` - const cachePath = path.join(this.cacheFolder, cacheFilename) - - try { - // Move or copy the file into the cache folder - await this.transferFile(filePath, cachePath, preserveFiles) - - // Cache the file for every subscribed North - const actions = Object.values(this.apis).map((api) => this.cacheFileForApi(api, id, cachePath, timestamp)) - const apisToActivate = await Promise.all(actions) - // Activate sending - apisToActivate.forEach((apiToActivate) => { - if (apiToActivate) { - this.sendCallback(apiToActivate) - } - }) - } catch (error) { - this.logger.error(error) - } - } - - /** - * Transfer the file into the cache folder. - * @param {string} filePath - The file path - * @param {string} cachePath - The cache path - * @param {boolean} preserveFiles - Whether to preserve the file - * @returns {Promise<*>} - The result promise - */ - async transferFile(filePath, cachePath, preserveFiles) { - this.logger.debug(`transferFile(${filePath}) - preserveFiles:${preserveFiles}, cachePath:${cachePath}`) - - if (preserveFiles) { - await fs.copyFile(filePath, cachePath) - } else { - try { - await fs.rename(filePath, cachePath) - } catch (renameError) { - // In case of cross-device link error we copy+delete instead - if (renameError.code !== 'EXDEV') throw renameError - this.logger.debug('Cross-device link error during rename, copy+paste instead') - await fs.copyFile(filePath, cachePath) - try { - await fs.unlink(filePath) - } catch (unlinkError) { - this.logger.error(unlinkError) - } - } - } - } - - /** - * Callback function used by the timer to send the values to the given North application. - * @param {object} api - The application - * @return {void} - */ - async sendCallback(api) { - const { name, canHandleValues, canHandleFiles, config } = api - let status = ApiHandler.STATUS.SUCCESS - - this.logger.trace(`sendCallback ${name}, sendInProgress ${!!this.sendInProgress[name]}`) - - if (!this.sendInProgress[name]) { - this.sendInProgress[name] = true - this.resendImmediately[name] = false - - if (canHandleValues) { - status = await this.sendCallbackForValues(api) - } - - if (canHandleFiles) { - status = await this.sendCallbackForFiles(api) - } - - const successTimeout = this.resendImmediately[name] ? 0 : config.sendInterval - const timeout = (status === ApiHandler.STATUS.SUCCESS) ? successTimeout : config.retryInterval - this.resetTimeout(api, timeout) - - this.sendInProgress[name] = false - } else { - this.resendImmediately[name] = true - } - } - - /** - * handle the values for the callback - * @param {object} application - The application to send the values to - * @return {ApiHandler.Status} - The callback status - */ - async sendCallbackForValues(application) { - this.logger.trace(`Cache sendCallbackForValues() for ${application.name}`) - const { id, name, database, config } = application - - try { - const values = databaseService.getValuesToSend(database, config.maxSendCount) - let removed - - if (values.length) { - this.logger.trace(`Cache:sendCallbackForValues() got ${values.length} values to send to ${application.name}`) - const successCountStatus = await this.engine.handleValuesFromCache(id, values) - this.logger.trace(`Cache:handleValuesFromCache, successCountStatus: ${successCountStatus}, Application: ${application.name}`) - // If there was a logic error - if (successCountStatus === ApiHandler.STATUS.LOGIC_ERROR) { - // Add errored values into error table - this.logger.trace(`Cache:addErroredValues, add ${values.length} values to error database for ${name}`) - databaseService.saveErroredValues(this.valuesErrorDatabase, id, values) - - // Remove them from the cache table - removed = databaseService.removeSentValues(database, values) - this.logger.trace(`Cache:removeSentValues, removed: ${removed} AppId: ${application.name}`) - if (removed !== values.length) { - this.logger.debug(`Cache for ${name} can't be deleted: ${removed}/${values.length}`) - } - } - // If some values were successfully sent - if (successCountStatus > 0) { - const valuesSent = values.slice(0, successCountStatus) - removed = databaseService.removeSentValues(database, valuesSent) - this.logger.trace(`Cache:removeSentValues, removed: ${removed} AppId: ${application.name}`) - if (removed !== valuesSent.length) { - this.logger.debug(`Cache for ${name} can't be deleted: ${removed}/${valuesSent.length}`) - } - } - } else { - this.logger.trace(`no values in the db for ${name}`) - } - return ApiHandler.STATUS.SUCCESS - } catch (error) { - this.logger.error(error) - return ApiHandler.STATUS.COMMUNICATION_ERROR - } - } - - /** - * Handle files resending. - * @param {object} application - The application to send the values to - * @return {ApiHandler.Status} - The callback status - */ - async sendCallbackForFiles(application) { - const { id, name } = application - this.logger.trace(`sendCallbackForFiles() for ${name}`) - - try { - const fileToSend = databaseService.getFileToSend(this.filesDatabase, id) - - if (!fileToSend) { - this.logger.trace('sendCallbackForFiles(): no file to send') - return ApiHandler.STATUS.SUCCESS - } - - this.logger.trace(`sendCallbackForFiles() file:${fileToSend.path}`) - - try { - await fs.stat(fileToSend.path) - } catch (error) { - // file in cache does not exist on filesystem - databaseService.deleteSentFile(this.filesDatabase, id, fileToSend.path) - this.logger.error(new Error(`${fileToSend.path} not found! Removing it from db.`)) - return ApiHandler.STATUS.SUCCESS - } - this.logger.trace(`sendCallbackForFiles(${fileToSend.path}) call sendFile() ${name}`) - const status = await this.engine.sendFile(id, fileToSend.path) - switch (status) { - case ApiHandler.STATUS.SUCCESS: - this.logger.trace(`sendCallbackForFiles(${fileToSend.path}) deleteSentFile for ${name}`) - databaseService.deleteSentFile(this.filesDatabase, id, fileToSend.path) - await this.handleSentFile(fileToSend.path) - break - case ApiHandler.STATUS.LOGIC_ERROR: - this.logger.error(`sendCallbackForFiles(${fileToSend.path}) move to error database for ${name}`) - databaseService.saveFile(this.filesErrorDatabase, fileToSend.timestamp, id, fileToSend.path) - - this.logger.trace(`sendCallbackForFiles(${fileToSend.path}) deleteSentFile for ${name}`) - databaseService.deleteSentFile(this.filesDatabase, id, fileToSend.path) - break - default: - break - } - return status - } catch (error) { - this.logger.error(error) - return ApiHandler.STATUS.COMMUNICATION_ERROR - } - } - - /** - * Remove file if it was sent to all North. - * @param {string} filePath - The file - * @return {void} - */ - async handleSentFile(filePath) { - this.logger.trace(`handleSentFile(${filePath})`) - const count = databaseService.getFileCount(this.filesDatabase, filePath) - if (count === 0) { - if (this.archiveMode) { - const archivedFilename = path.basename(filePath) - const archivePath = path.join(this.archiveFolder, archivedFilename) - // Move original file into the archive folder - try { - await fs.rename(filePath, archivePath) - this.logger.info(`File ${filePath} moved to ${archivePath}`) - } catch (renameError) { - this.logger.error(renameError) - } - } else { - // Delete original file - try { - await fs.unlink(filePath) - this.logger.info(`File ${filePath} deleted`) - } catch (unlinkError) { - this.logger.error(unlinkError) - } - } - } - } - - /** - * Delete files in archiveFolder if they are older thant the retention time. - * @return {void} - */ - async refreshArchiveFolder() { - this.logger.debug('Parse archive folder to empty old files') - // if a process already occurs, it clears it - if (this.archiveTimeout) { - clearTimeout(this.archiveTimeout) - } - - const files = await fs.readdir(this.archiveFolder) - const timestamp = new Date().getTime() - if (files.length > 0) { - // eslint-disable-next-line no-restricted-syntax - for (const file of files) { - // eslint-disable-next-line no-await-in-loop - const stats = await fs.stat(path.join(this.archiveFolder, file)) - if (stats.mtimeMs + this.retentionDuration < timestamp) { - try { - // eslint-disable-next-line no-await-in-loop - await fs.unlink(path.join(this.archiveFolder, file)) - this.logger.debug(`File ${path.join(this.archiveFolder, file)} removed from archive`) - } catch (unlinkError) { - this.logger.error(unlinkError) - } - } - } - } else { - this.logger.debug(`The archive folder ${this.archiveFolder} is empty. Nothing to delete`) - } - this.archiveTimeout = setTimeout(() => { - this.refreshArchiveFolder() - }, ARCHIVE_TIMEOUT) - } - - /** - * Reset application timer. - * @param {object} application - The application - * @param {number} timeout - The timeout interval - * @return {void} - */ - resetTimeout(application, timeout) { - if (application.timeout) { - clearTimeout(application.timeout) - } - application.timeout = setTimeout(this.sendCallback.bind(this, application), timeout) - } - - /** - * Generate cache stat for APIs - * @param {string[]} apiNames - The North/South list - * @param {number[]} totalCounts - The total count list - * @param {number[]} cacheSizes - The cache size list - * @returns {*} - The stats - */ - /* eslint-disable-next-line class-methods-use-this */ - generateApiCacheStat(apiNames, totalCounts, cacheSizes, mode) { - return apiNames.map((api, i) => ({ - name: `${api.name} (${mode})`, - count: totalCounts[i] || 0, - cache: cacheSizes[i] || 0, - })) - } - - /** - * Get cache stats for APIs - * @returns {object} - Cache stats - */ - async getCacheStatsForApis() { - // Get points APIs stats - const pointApis = Object.values(this.apis).filter((api) => api.canHandleValues) - const valuesTotalCounts = pointApis.map((api) => this.cacheStats[api.name]) - const valuesCacheSizeActions = pointApis.map((api) => databaseService.getCount(api.database)) - const valuesCacheSizes = await Promise.all(valuesCacheSizeActions) - const pointApisStats = this.generateApiCacheStat(pointApis, valuesTotalCounts, valuesCacheSizes, 'points') - - // Get file APIs stats - const fileApis = Object.values(this.apis).filter((api) => api.canHandleFiles) - const filesTotalCounts = fileApis.map((api) => this.cacheStats[api.name]) - const filesCacheSizes = fileApis.map((api) => databaseService.getFileCountForNorthConnector(this.filesDatabase, api.name)) - const fileApisStats = this.generateApiCacheStat(fileApis, filesTotalCounts, filesCacheSizes, 'files') - - // Merge results - return [...pointApisStats, ...fileApisStats] - } - - /** - * Get cache stats for Protocols - * @returns {object} - Cache stats - */ - async getCacheStatsForProtocols() { - const protocols = this.engine.getActiveProtocols() - return protocols.map((protocol) => ({ - name: protocol, - count: this.cacheStats[protocol] || 0, - })) - } -} - -module.exports = Cache diff --git a/src/engine/OIBusEngine.class.js b/src/engine/OIBusEngine.class.js index 46efb01e38..76a21709a3 100644 --- a/src/engine/OIBusEngine.class.js +++ b/src/engine/OIBusEngine.class.js @@ -7,7 +7,8 @@ const humanizeDuration = require('humanize-duration') // Engine classes const BaseEngine = require('./BaseEngine.class') const HealthSignal = require('./HealthSignal.class') -const Cache = require('./Cache.class') +const MainCache = require('./cache/MainCache.class') +const Queue = require('../services/queue.class') /** * @@ -38,6 +39,9 @@ class OIBusEngine extends BaseEngine { this.addValuesCount = 0 this.addFileCount = 0 this.forwardedHealthSignalMessages = 0 + + // manage a queue for concurrent request to write to SQL + this.queue = new Queue(this.logger) } /** @@ -58,10 +62,6 @@ class OIBusEngine extends BaseEngine { this.engineName = engineConfig.engineName - // Configure the Cache - this.cache = new Cache(this) - this.cache.initialize() - this.logger.info(`Starting Engine ${this.version} architecture: ${process.arch} This platform is ${process.platform} @@ -81,7 +81,12 @@ class OIBusEngine extends BaseEngine { */ async addValues(id, values) { this.logger.trace(`Engine: Add ${values?.length} values from "${this.activeProtocols[id]?.dataSource.name || id}"`) - if (values.length) await this.cache.cacheValues(id, values) + Object.values(this.activeApis) + .forEach((api) => { + if (api.canHandleValues && api.isSubscribed(id)) { + api.cacheValues(id, values) + } + }) } /** @@ -92,47 +97,28 @@ class OIBusEngine extends BaseEngine { * @param {boolean} preserveFiles - Whether to preserve the file at the original location * @return {void} */ - addFile(id, filePath, preserveFiles) { - this.logger.trace(`Engine addFile() from "${this.activeProtocols[id]?.dataSource.name || id}" with ${filePath}`) - this.cache.cacheFile(id, filePath, preserveFiles) - } + async addFile(id, filePath, preserveFiles) { + this.logger.debug(`Engine addFile(${filePath}) from "${this.activeProtocols[id]?.dataSource.name || id}", preserveFiles:${preserveFiles}`) - /** - * Send values to a North application. - * @param {string} id - The application id - * @param {object[]} values - The values to send - * @return {Promise} - The sent status - */ - async handleValuesFromCache(id, values) { - this.logger.trace(`handleValuesFromCache() call with "${this.activeApis[id]?.application.name || id}" and ${values.length} values`) + const timestamp = new Date().getTime() + // When compressed file is received the name looks like filename.txt.gz + const filenameInfo = path.parse(filePath) + const cacheFilename = `${filenameInfo.name}-${timestamp}${filenameInfo.ext}` + const cachePath = path.join(this.getCacheFolder(), cacheFilename) - let status try { - status = await this.activeApis[id].handleValues(values) - } catch (error) { - status = error - } + // Move or copy the file into the cache folder + await MainCache.transferFile(this.logger, filePath, cachePath, preserveFiles) - return status - } - - /** - * Send file to a North application. - * @param {string} id - The application id - * @param {string} filePath - The file to send - * @return {Promise} - The sent status - */ - async sendFile(id, filePath) { - this.logger.trace(`Engine sendFile() call with "${this.activeApis[id]?.application.name || id}" and ${filePath}`) - - let status - try { - status = await this.activeApis[id].handleFile(filePath) + Object.values(this.activeApis) + .forEach((api) => { + if (api.canHandleFiles && api.isSubscribed(id)) { + api.cacheFile(cachePath, timestamp) + } + }) } catch (error) { - status = error + this.logger.error(error) } - - return status } /** @@ -212,9 +198,6 @@ class OIBusEngine extends BaseEngine { } } - // 4. Initiate the cache for every North - await this.cache.initializeApis(this.activeApis) - // 5. Initialize scan lists // Associate the scanMode to all corresponding data sources @@ -316,6 +299,13 @@ class OIBusEngine extends BaseEngine { } this.activeProtocols = {} + // Log cache data + const apisCacheStats = this.getCacheStatsForApis() + this.logger.info(`API stats: ${JSON.stringify(apisCacheStats)}`) + + const protocolsCacheStats = this.getCacheStatsForProtocols() + this.logger.info(`Protocol stats: ${JSON.stringify(protocolsCacheStats)}`) + // Stop Applications // eslint-disable-next-line no-restricted-syntax for (const application of Object.values(this.activeApis)) { @@ -325,17 +315,6 @@ class OIBusEngine extends BaseEngine { } this.activeApis = {} - // Log cache data - const apisCacheStats = await this.cache.getCacheStatsForApis() - this.logger.info(`API stats: ${JSON.stringify(apisCacheStats)}`) - - const protocolsCacheStats = await this.cache.getCacheStatsForProtocols() - this.logger.info(`Protocol stats: ${JSON.stringify(protocolsCacheStats)}`) - - // stop the cache timers - this.cache.stop() - this.cache = null - // Stop the listener this.eventEmitters['/engine/sse']?.events?.removeAllListeners() this.eventEmitters['/engine/sse']?.stream?.destroy() @@ -421,6 +400,27 @@ class OIBusEngine extends BaseEngine { }, {}) } + getCacheStatsForApis() { + // Get points APIs stats + const pointApis = Object.values(this.activeApis).filter((api) => api.canHandleValues) + const pointApisStats = pointApis.map((api) => api.getValueCacheStats()) + + // Get files APIs stats + const fileApis = Object.values(this.activeApis).filter((api) => api.canHandleFiles) + const fileApisStats = fileApis.map((api) => api.getFileCacheStats()) + + // Merge results + return [...pointApisStats, ...fileApisStats] + } + + getCacheStatsForProtocols() { + const pointProtocols = Object.values(this.activeProtocols).filter((protocol) => protocol.handlesPoints) + return pointProtocols.map((protocol) => ({ + name: protocol.dataSource.name, + count: protocol.addPointsCount || 0, + })) + } + /** * Get status information. * @returns {Object} - The status information diff --git a/src/engine/OIBusEngine.class.spec.js b/src/engine/OIBusEngine.class.spec.js index e82d455743..a2a1714838 100644 --- a/src/engine/OIBusEngine.class.spec.js +++ b/src/engine/OIBusEngine.class.spec.js @@ -3,8 +3,6 @@ const EncryptionService = require('../services/EncryptionService.class') const config = require('../config/defaultConfig.json') const ConfigService = require('../services/config.service.class') -jest.mock('./Cache.class') - // Mock EncryptionService EncryptionService.getInstance = () => ({ decryptText: (password) => password, @@ -87,9 +85,35 @@ describe('Engine', () => { data: { value: '' }, }] - engine.cache.cacheValues = jest.fn() + EncryptionService.getInstance = () => ({ + decryptText: (password) => password, + setKeyFolder: () => { + }, + checkOrCreatePrivateKey: () => { + }, + }) + + const cacheValues = jest.fn() + engine.activeApis = { + id1: { + canHandleValues: true, + isSubscribed: () => true, + cacheValues, + }, + id2: { + canHandleValues: true, + isSubscribed: () => false, + cacheValues, + }, + id3: { + canHandleValues: false, + isSubscribed: () => true, + cacheValues, + }, + } + await engine.addValues('sourceId', sampleValues) - expect(engine.cache.cacheValues) + expect(cacheValues) .toBeCalledWith('sourceId', sampleValues) }) }) diff --git a/src/engine/cache/FileCache.class.js b/src/engine/cache/FileCache.class.js new file mode 100644 index 0000000000..bae3276f9a --- /dev/null +++ b/src/engine/cache/FileCache.class.js @@ -0,0 +1,263 @@ +const fs = require('fs/promises') +const path = require('path') + +const databaseService = require('../../services/database.service') +const MainCache = require('./MainCache.class') + +// Time between two checks of the Archive Folder +const ARCHIVE_TIMEOUT = 3600000 // one hour + +/** + * Local cache implementation to group events and store them when the communication with the North is down. + */ +class FileCache extends MainCache { + constructor(api, engineCacheConfig) { + super(api) + + const { + cacheFolder, + archive, + } = engineCacheConfig + this.cacheFolder = path.resolve(cacheFolder) + this.archiveMode = archive.enabled + this.archiveFolder = path.resolve(archive.archiveFolder) + this.retentionDuration = (archive.retentionDuration) * 3600000 + this.filesErrorDatabase = null + } + + /** + * Initialize the value cache. + * @returns {Promise} - The result + */ + async initialize() { + try { + await fs.stat(this.cacheFolder) + } catch (error) { + this.logger.info(`Creating cache folder: ${this.cacheFolder}`) + await fs.mkdir(this.cacheFolder, { recursive: true }) + } + try { + await fs.stat(this.archiveFolder) + } catch (error) { + this.logger.info(`Creating archive folder: ${this.archiveFolder}`) + await fs.mkdir(this.archiveFolder, { recursive: true }) + } + + this.database = MainCache.getFilesDatabaseInstance(this.logger, this.cacheFolder) + this.filesErrorDatabase = MainCache.getFilesErrorDatabaseInstance(this.logger, this.cacheFolder) + + if (this.archiveMode) { + // refresh the archiveFolder at the beginning only if retentionDuration is different from 0 + if (this.retentionDuration > 0) { + this.refreshArchiveFolder() + } + } + + if (this.apiCacheConfig?.sendInterval) { + this.resetTimeout(this.apiCacheConfig.sendInterval) + } else { + this.logger.warn(`Application "${this.api.name}" has no sendInterval`) + } + } + + /** + * Cache a new Value from the South + * @param {String} cachePath - The path of the raw file + * @param {number} timestamp - The timestamp the file was received + * @returns {Promise} - The result + */ + async cacheFile(cachePath, timestamp) { + // Update stat + this.cacheStat = (this.cacheStat || 0) + 1 + + this.logger.debug(`cacheFileForApi() ${cachePath} for api: ${this.api.application.name}`) + databaseService.saveFile(this.database, timestamp, this.api.application.id, cachePath) + + await this.sendCallback() + } + + /** + * Callback function used by the timer to send the values to the North application. + * @returns {Promise} - The result + */ + async sendCallback() { + this.logger.trace(`sendCallback ${this.api.application.name}, sendInProgress ${!!this.sendInProgress}`) + + if (!this.sendInProgress) { + this.sendInProgress = true + this.resendImmediately = false + + const status = await this.sendFile() + + const successTimeout = this.resendImmediately ? 0 : this.apiCacheConfig.sendInterval + const timeout = (status === MainCache.STATUS.SUCCESS) ? successTimeout : this.apiCacheConfig.retryInterval + this.resetTimeout(timeout) + + this.sendInProgress = false + } else { + this.resendImmediately = true + } + } + + async sendFile() { + const { id, name } = this.api.application + this.logger.trace(`sendFile() for ${name}`) + + try { + const fileToSend = databaseService.getFileToSend(this.database, id) + + if (!fileToSend) { + this.logger.trace('sendFile(): no file to send') + return MainCache.STATUS.SUCCESS + } + + this.logger.trace(`sendFile() file:${fileToSend.path}`) + + try { + await fs.stat(fileToSend.path) + } catch (error) { + // file in cache does not exist on filesystem + databaseService.deleteSentFile(this.database, id, fileToSend.path) + this.logger.error(new Error(`${fileToSend.path} not found! Removing it from db.`)) + return MainCache.STATUS.SUCCESS + } + this.logger.trace(`sendFile(${fileToSend.path}) call sendFile() ${name}`) + const status = await this.api.handleFile(fileToSend.path) + switch (status) { + case MainCache.STATUS.SUCCESS: + this.logger.trace(`sendFile(${fileToSend.path}) deleteSentFile for ${name}`) + databaseService.deleteSentFile(this.database, id, fileToSend.path) + await this.handleSentFile(fileToSend.path) + break + case MainCache.STATUS.LOGIC_ERROR: + this.logger.error(`sendFile(${fileToSend.path}) move to error database for ${name}`) + databaseService.saveFile(this.filesErrorDatabase, fileToSend.timestamp, id, fileToSend.path) + + this.logger.trace(`sendFile(${fileToSend.path}) deleteSentFile for ${name}`) + databaseService.deleteSentFile(this.database, id, fileToSend.path) + break + default: + break + } + return status + } catch (error) { + this.logger.error(error) + return MainCache.STATUS.COMMUNICATION_ERROR + } + } + + /** + * Remove file if it was sent to all North. + * @param {string} filePath - The file + * @return {void} + */ + async handleSentFile(filePath) { + this.logger.trace(`handleSentFile(${filePath})`) + const count = databaseService.getFileCount(this.database, filePath) + if (count === 0) { + if (this.archiveMode) { + const archivedFilename = path.basename(filePath) + const archivePath = path.join(this.archiveFolder, archivedFilename) + // Move original file into the archive folder + try { + await fs.rename(filePath, archivePath) + this.logger.info(`File ${filePath} moved to ${archivePath}`) + } catch (renameError) { + this.logger.error(renameError) + } + } else { + // Delete original file + try { + await fs.unlink(filePath) + this.logger.info(`File ${filePath} deleted`) + } catch (unlinkError) { + this.logger.error(unlinkError) + } + } + } + } + + /** + * Delete files in archiveFolder if they are older thant the retention time. + * @return {void} + */ + async refreshArchiveFolder() { + this.logger.debug('Parse archive folder to empty old files') + // if a process already occurs, it clears it + if (this.archiveTimeout) { + clearTimeout(this.archiveTimeout) + } + + const files = await fs.readdir(this.archiveFolder) + const timestamp = new Date().getTime() + if (files.length > 0) { + // eslint-disable-next-line no-restricted-syntax + for (const file of files) { + // eslint-disable-next-line no-await-in-loop + const stats = await fs.stat(path.join(this.archiveFolder, file)) + if (stats.mtimeMs + this.retentionDuration < timestamp) { + try { + // eslint-disable-next-line no-await-in-loop + await fs.unlink(path.join(this.archiveFolder, file)) + this.logger.debug(`File ${path.join(this.archiveFolder, file)} removed from archive`) + } catch (unlinkError) { + this.logger.error(unlinkError) + } + } + } + } else { + this.logger.debug(`The archive folder ${this.archiveFolder} is empty. Nothing to delete`) + } + this.archiveTimeout = setTimeout(() => { + this.refreshArchiveFolder() + }, ARCHIVE_TIMEOUT) + } + + getStats(apiName) { + const cacheSize = databaseService.getFileCountForNorthConnector(this.database, apiName) + return { + name: `${this.api.application.name} (files)`, + count: this.cacheStat || 0, + cache: cacheSize || 0, + } + } + + /** + * Reset timer. + * @param {number} timeout - The timeout to wait + * @return {void} + */ + resetTimeout(timeout) { + if (this.timeout) { + clearTimeout(this.timeout) + } + this.timeout = setTimeout(this.sendCallback.bind(this), timeout) + } + + /** + * Stop the cache. + * @return {void} + */ + stop() { + if (this.timeout) { + clearTimeout(this.timeout) + } + } + + /** + * Initialize and return the file cache singleton + * @param {ApiHandler} api - The api + * @param {object} engineCacheConfig - The engine cache config + * @return {FileCache} - The file cache + */ + static async getFileCacheInstance(api, engineCacheConfig) { + if (!FileCache.fileCache) { + FileCache.fileCache = new FileCache(api, engineCacheConfig) + await FileCache.fileCache.initialize() + } + + return FileCache.fileCache + } +} + +module.exports = FileCache diff --git a/src/engine/cache/MainCache.class.js b/src/engine/cache/MainCache.class.js new file mode 100644 index 0000000000..cbe314b2f4 --- /dev/null +++ b/src/engine/cache/MainCache.class.js @@ -0,0 +1,111 @@ +const fs = require('fs/promises') +const databaseService = require('../../services/database.service') + +class MainCache { + static STATUS = { + SUCCESS: 0, + LOGIC_ERROR: -1, + COMMUNICATION_ERROR: -2, + } + + static valuesErrorDatabase = null + + static filesErrorDatabase = null + + constructor(api) { + this.api = api + this.logger = api.logger + this.apiCacheConfig = api.application.caching + this.database = null + this.timeout = null + this.sendInProgress = false + this.resendImmediately = false + this.cacheStat = 0 + } + + /** + * Initialize and return the value error database singleton + * @param {Logger} logger - The logger + * @param {string} cacheFolder - The cache folder + * @return {object} - The value error database + */ + static getValuesErrorDatabaseInstance(logger, cacheFolder) { + if (!MainCache.valuesErrorDatabase) { + const valuesErrorDatabasePath = `${cacheFolder}/valueCache-error.db` + logger.debug(`Initialize values error db: ${valuesErrorDatabasePath}`) + MainCache.valuesErrorDatabase = databaseService.createValueErrorsDatabase(valuesErrorDatabasePath) + logger.debug(`Values error db count: ${databaseService.getCount(MainCache.valuesErrorDatabase)}`) + } + + return MainCache.valuesErrorDatabase + } + + /** + * Initialize and return the file error database singleton + * @param {Logger} logger - The logger + * @param {string} cacheFolder - The cache folder + * @return {object} - The file error database + */ + static getFilesErrorDatabaseInstance(logger, cacheFolder) { + if (!MainCache.filesErrorDatabase) { + const filesErrorDatabasePath = `${cacheFolder}/fileCache-error.db` + logger.debug(`Initialize files error db: ${filesErrorDatabasePath}`) + MainCache.filesErrorDatabase = databaseService.createFilesDatabase(filesErrorDatabasePath) + logger.debug(`Files error db count: ${databaseService.getCount(MainCache.filesErrorDatabase)}`) + } + + return MainCache.filesErrorDatabase + } + + /** + * Initialize and return the file database singleton + * @param {Logger} logger - The logger + * @param {string} cacheFolder - The cache folder + * @return {object} - The file database + */ + static getFilesDatabaseInstance(logger, cacheFolder) { + if (!MainCache.filesDatabase) { + const filesDatabasePath = `${cacheFolder}/fileCache.db` + logger.debug(`Initialize files db: ${filesDatabasePath}`) + MainCache.filesDatabase = databaseService.createFilesDatabase(filesDatabasePath) + logger.debug(`Files db count: ${databaseService.getCount(MainCache.filesDatabase)}`) + } + + return MainCache.filesDatabase + } + + /** + * Transfer the file into the cache folder. + * + * @param {Logger} logger - The logger + * @param {string} filePath - The file path + * @param {string} cachePath - The cache path + * @param {boolean} preserveFiles - Whether to preserve the file + * @returns {Promise<*>} - The result promise + */ + static async transferFile(logger, filePath, cachePath, preserveFiles) { + logger.debug(`transferFile(${filePath}) - preserveFiles:${preserveFiles}, cachePath:${cachePath}`) + + if (preserveFiles) { + await fs.copyFile(filePath, cachePath) + } else { + try { + await fs.rename(filePath, cachePath) + } catch (renameError) { + // In case of cross-device link error we copy+delete instead + if (renameError.code !== 'EXDEV') { + throw renameError + } + logger.debug('Cross-device link error during rename, copy+paste instead') + await fs.copyFile(filePath, cachePath) + try { + await fs.unlink(filePath) + } catch (unlinkError) { + logger.error(unlinkError) + } + } + } + } +} + +module.exports = MainCache diff --git a/src/engine/cache/ValueCache.class.js b/src/engine/cache/ValueCache.class.js new file mode 100644 index 0000000000..25888f57fc --- /dev/null +++ b/src/engine/cache/ValueCache.class.js @@ -0,0 +1,175 @@ +const fs = require('fs/promises') +const path = require('path') + +const databaseService = require('../../services/database.service') +const MainCache = require('./MainCache.class') + +/** + * Local cache implementation to group events and store them when the communication with the North is down. + */ +class ValueCache extends MainCache { + constructor(api, queue, engineCacheConfig) { + super(api) + + this.queue = queue + const cacheFolderPath = path.resolve(engineCacheConfig.cacheFolder) + this.cacheFolder = cacheFolderPath + this.databasePath = `${cacheFolderPath}/${api.application.id}.db` + this.valuesErrorDatabase = null + } + + /** + * Initialize the value cache. + * @returns {Promise} - The result + */ + async initialize() { + try { + await fs.stat(this.cacheFolder) + } catch (error) { + this.logger.info(`Creating cache folder: ${this.cacheFolder}`) + await fs.mkdir(this.cacheFolder, { recursive: true }) + } + + this.valuesErrorDatabase = MainCache.getValuesErrorDatabaseInstance(this.logger, this.cacheFolder) + + this.logger.debug(`Use db: ${this.databasePath} for ${this.api.application.name}`) + this.database = databaseService.createValuesDatabase(this.databasePath, {}) + this.logger.debug(`Value db count: ${databaseService.getCount(this.database)}`) + + if (this.apiCacheConfig?.sendInterval) { + this.resetTimeout(this.apiCacheConfig.sendInterval) + } else { + this.logger.warn(`Application "${this.api.name}" has no sendInterval`) + } + } + + /** + * Cache a new Value from the South + * @param {string} id - The data source id + * @param {object} values - values + * @returns {Promise} - The result + */ + async cacheValues(id, values) { + // Update stat + this.cacheStat = (this.cacheStat || 0) + values.length + + await this.queue.add(databaseService.saveValues, this.database, this.api.engine.activeProtocols[id]?.dataSource.name || id, values) + // If the group size is over the groupCount => we immediately send the cache + // to the North even if the timeout is not finished. + const count = databaseService.getCount(this.database) + if (count >= this.apiCacheConfig.groupCount) { + this.logger.trace(`groupCount reached: ${count} >= ${this.apiCacheConfig.groupCount}`) + await this.sendCallback() + } + } + + /** + * Callback function used by the timer to send the values to the North application. + * @returns {Promise} - The result + */ + async sendCallback() { + this.logger.trace(`sendCallback ${this.api.application.name}, sendInProgress ${!!this.sendInProgress}`) + + if (!this.sendInProgress) { + this.sendInProgress = true + this.resendImmediately = false + + const status = await this.sendValues() + + const successTimeout = this.resendImmediately ? 0 : this.apiCacheConfig.sendInterval + const timeout = (status === MainCache.STATUS.SUCCESS) ? successTimeout : this.apiCacheConfig.retryInterval + this.resetTimeout(timeout) + + this.sendInProgress = false + } else { + this.resendImmediately = true + } + } + + /** + * Send values. + * @return {ApiHandler.Status} - The callback status + */ + async sendValues() { + const { id, name } = this.api.application + this.logger.trace(`Cache sendCallbackForValues() for ${name}`) + + try { + const values = databaseService.getValuesToSend(this.database, this.apiCacheConfig.maxSendCount) + let removed = null + + if (values.length) { + this.logger.trace(`Cache:sendCallbackForValues() got ${values.length} values to send to ${name}`) + let successCountStatus + try { + successCountStatus = await this.api.handleValues(values) + } catch (error) { + successCountStatus = error + } + this.logger.trace(`Cache:handleValuesFromCache, successCountStatus: ${successCountStatus}, Application: ${name}`) + + // If there was a logic error + if (successCountStatus === MainCache.STATUS.LOGIC_ERROR) { + // Add errored values into error table + this.logger.trace(`Cache:addErroredValues, add ${values.length} values to error database for ${name}`) + await this.queue.add(databaseService.saveErroredValues, this.valuesErrorDatabase, id, values) + + // Remove them from the cache table + removed = databaseService.removeSentValues(this.database, values) + this.logger.trace(`Cache:removeSentValues, removed: ${removed} AppId: ${name}`) + if (removed !== values.length) { + this.logger.debug(`Cache for ${name} can't be deleted: ${removed}/${values.length}`) + } + } + // If some values were successfully sent + if (successCountStatus > 0) { + const valuesSent = values.slice(0, successCountStatus) + removed = databaseService.removeSentValues(this.database, valuesSent) + this.logger.trace(`Cache:removeSentValues, removed: ${removed} AppId: ${name}`) + if (removed !== valuesSent.length) { + this.logger.debug(`Cache for ${name} can't be deleted: ${removed}/${valuesSent.length}`) + } + } + } else { + this.logger.trace(`no values in the db for ${name}`) + } + return MainCache.STATUS.SUCCESS + } catch (error) { + this.logger.error(error) + return MainCache.STATUS.COMMUNICATION_ERROR + } + } + + getStats() { + const cacheSize = databaseService.getCount(this.database) + return { + name: `${this.api.application.name} (points)`, + count: this.cacheStat || 0, + cache: cacheSize || 0, + } + } + + /** + * Reset timer. + * @param {number} timeout - The timeout to wait + * @return {void} + */ + resetTimeout(timeout) { + if (this.timeout) { + clearTimeout(this.timeout) + } + this.timeout = setTimeout(this.sendCallback.bind(this), timeout) + } + + /** + * Stop the cache. + * @return {void} + */ + stop() { + if (this.timeout) { + clearTimeout(this.timeout) + } + } +} + +module.exports = ValueCache diff --git a/src/north/AmazonS3/AmazonS3.class.spec.js b/src/north/AmazonS3/AmazonS3.class.spec.js index 82f2effc83..cf1408f1af 100644 --- a/src/north/AmazonS3/AmazonS3.class.spec.js +++ b/src/north/AmazonS3/AmazonS3.class.spec.js @@ -9,8 +9,12 @@ const { defaultConfig: config } = require('../../../tests/testConfig') const EncryptionService = require('../../services/EncryptionService.class') // Mock database service -jest.mock('../../services/database.service', () => { -}) +jest.mock('../../services/database.service', () => ({ + createValueErrorsDatabase: jest.fn(), + getCount: jest.fn(), + createValuesDatabase: jest.fn(), + createFilesDatabase: jest.fn(), +})) // Mock logger jest.mock('../../engine/logger/Logger.class') diff --git a/src/north/ApiHandler.class.js b/src/north/ApiHandler.class.js index e0014c53a3..e8d92fd4b3 100644 --- a/src/north/ApiHandler.class.js +++ b/src/north/ApiHandler.class.js @@ -3,6 +3,8 @@ const EventEmitter = require('events') const EncryptionService = require('../services/EncryptionService.class') const Logger = require('../engine/logger/Logger.class') const CertificateService = require('../services/CertificateService.class') +const ValueCache = require('../engine/cache/ValueCache.class') +const FileCache = require('../engine/cache/FileCache.class') class ApiHandler { static STATUS = { @@ -50,6 +52,8 @@ class ApiHandler { this.keyFile = null this.certFile = null this.caFile = null + this.valueCache = null + this.fileCache = null } async init() { @@ -61,6 +65,11 @@ class ApiHandler { this.certificate = new CertificateService(this.logger) await this.certificate.init(this.keyFile, this.certFile, this.caFile) this.initializeStatusData() + + const { engineConfig } = this.engine.configService.getConfig() + this.valueCache = new ValueCache(this, this.engine.queue, engineConfig.caching) + await this.valueCache.initialize() + this.fileCache = await FileCache.getFileCacheInstance(this, engineConfig.caching) } /** @@ -69,7 +78,7 @@ class ApiHandler { * @param {string} additionalInfo - connection information to display in the logger * @return {void} */ - connect(additionalInfo) { + async connect(additionalInfo) { const { name, api } = this.application this.connected = true this.updateStatusDataStream({ 'Connected at': new Date().toISOString() }) @@ -101,6 +110,7 @@ class ApiHandler { * @return {void} */ async disconnect() { + this.valueCache.stop() this.connected = false const { name, id } = this.application this.updateStatusDataStream({ 'Connected at': 'Not connected' }) @@ -126,6 +136,29 @@ class ApiHandler { this.engine.eventEmitters[`/north/${this.application.id}/sse`]?.events?.emit('data', this.statusData) } + /** + * Method called by the Engine to cache an array of values in order to cache them + * and send them to a third party application. + * @param {string} id - The data source id + * @param {object[]} values - The values to handle + * @return {Promise} - The result + */ + async cacheValues(id, values) { + if (values.length) { + await this.valueCache.cacheValues(id, values) + } + } + + /** + * Method called by the Engine to cache a file. + * @param {String} filePath - The path of the raw file + * @param {number} timestamp - The timestamp the file was received + * @return {Promise} - The result + */ + async cacheFile(filePath, timestamp) { + await this.fileCache.cacheFile(filePath, timestamp) + } + /** * Method called by the Engine to handle an array of values in order for example * to send them to a third party application. @@ -181,6 +214,25 @@ class ApiHandler { const headers = { 'Content-Type': 'application/json' } return this.engine.requestService.httpSend(this.valuesUrl, 'POST', this.authentication, this.proxy, data, headers) } + + /** + * Check whether the North is subscribed to a South. + * If subscribedTo is not defined or an empty array, the subscription is true. + * @param {string} id - The data source id + * @returns {boolean} - The North is subscribed to the given South + */ + isSubscribed(id) { + if (!Array.isArray(this.application.subscribedTo) || this.application.subscribedTo.length === 0) return true + return this.application.subscribedTo.includes(id) + } + + getValueCacheStats() { + return this.valueCache.getStats() + } + + getFileCacheStats() { + return this.fileCache.getStats(this.application.name) + } } module.exports = ApiHandler diff --git a/src/north/Console/Console.class.spec.js b/src/north/Console/Console.class.spec.js index 1d80bb41cc..abc26f39e5 100644 --- a/src/north/Console/Console.class.spec.js +++ b/src/north/Console/Console.class.spec.js @@ -7,7 +7,12 @@ jest.spyOn(global.console, 'table').mockImplementation(() => {}) jest.spyOn(process.stdout, 'write').mockImplementation(() => {}) // Mock database service -jest.mock('../../services/database.service', () => {}) +jest.mock('../../services/database.service', () => ({ + createValueErrorsDatabase: jest.fn(), + getCount: jest.fn(), + createValuesDatabase: jest.fn(), + createFilesDatabase: jest.fn(), +})) // Mock logger jest.mock('../../engine/logger/Logger.class') diff --git a/src/north/FileWriter/FileWriter.class.spec.js b/src/north/FileWriter/FileWriter.class.spec.js index 06872802e4..4ffdf259c8 100644 --- a/src/north/FileWriter/FileWriter.class.spec.js +++ b/src/north/FileWriter/FileWriter.class.spec.js @@ -5,7 +5,12 @@ const FileWriter = require('./FileWriter.class') const { defaultConfig: config } = require('../../../tests/testConfig') // Mock database service -jest.mock('../../services/database.service', () => {}) +jest.mock('../../services/database.service', () => ({ + createValueErrorsDatabase: jest.fn(), + getCount: jest.fn(), + createValuesDatabase: jest.fn(), + createFilesDatabase: jest.fn(), +})) // Mock logger jest.mock('../../engine/logger/Logger.class')