diff --git a/src/north/north-amazon-s3/north-amazon-s3.spec.js b/src/north/north-amazon-s3/north-amazon-s3.spec.js index a5d7e77ca6..e76e986a62 100644 --- a/src/north/north-amazon-s3/north-amazon-s3.spec.js +++ b/src/north/north-amazon-s3/north-amazon-s3.spec.js @@ -20,6 +20,7 @@ jest.mock('../../service/certificate.service') jest.mock('../../service/encryption.service', () => ({ getInstance: () => ({ decryptText: (password) => password }) })) jest.mock('../../service/cache/value-cache.service') jest.mock('../../service/cache/file-cache.service') +jest.mock('../../service/cache/archive.service') const proxies = [ { diff --git a/src/north/north-connector.js b/src/north/north-connector.js index 7b81b59187..2c7f17a5c7 100644 --- a/src/north/north-connector.js +++ b/src/north/north-connector.js @@ -8,6 +8,7 @@ const ValueCache = require('../service/cache/value-cache.service') const FileCache = require('../service/cache/file-cache.service') const { createFolder } = require('../service/utils') const { createProxyAgent } = require('../service/http-request-static-functions') +const ArchiveService = require('../service/cache/archive.service') /** * Class NorthConnector : provides general attributes and methods for north connectors. @@ -106,10 +107,17 @@ class NorthConnector { this.id, this.logger, this.baseFolder, + ) + await this.fileCache.start() + + this.archiveService = new ArchiveService( + this.id, + this.logger, + this.baseFolder, this.cacheSettings.archive.enabled, this.cacheSettings.archive.retentionDuration, ) - await this.fileCache.start() + await this.archiveService.start() this.certificate = new CertificateService(this.logger) await this.certificate.init(this.keyFile, this.certFile, this.caFile) @@ -148,6 +156,7 @@ class NorthConnector { } await this.fileCache.stop() await this.valueCache.stop() + await this.archiveService.stop() this.logger.debug(`North connector ${this.id} stopped.`) } @@ -219,7 +228,7 @@ class NorthConnector { try { this.logger.debug(`Handling file "${fileToSend.path}".`) await this.handleFile(fileToSend.path) - await this.fileCache.removeFileFromCache(fileToSend.path, this.cacheSettings.archive.enabled) + await this.archiveService.archiveOrRemoveFile(fileToSend.path) this.numberOfSentFiles += 1 this.statusService.updateStatusDataStream({ 'Last uploaded file': fileToSend.path, diff --git a/src/north/north-connector.spec.js b/src/north/north-connector.spec.js index 1e58d92a76..6b020b7060 100644 --- a/src/north/north-connector.spec.js +++ b/src/north/north-connector.spec.js @@ -15,6 +15,7 @@ jest.mock('../service/utils') jest.mock('../service/http-request-static-functions') jest.mock('../service/cache/value-cache.service') jest.mock('../service/cache/file-cache.service') +jest.mock('../service/cache/archive.service') // Method used to flush promises called in setTimeout const flushPromises = () => new Promise(jest.requireActual('timers').setImmediate) @@ -106,9 +107,9 @@ describe('NorthConnector', () => { expect(clearTimeoutSpy).toHaveBeenCalledTimes(1) expect(north.fileCache.stop).toHaveBeenCalledTimes(1) expect(north.valueCache.stop).toHaveBeenCalledTimes(1) + expect(north.archiveService.stop).toHaveBeenCalledTimes(1) clearTimeoutSpy.mockClear() - north.valuesTimeout = null north.filesTimeout = null await north.stop() expect(clearTimeoutSpy).toHaveBeenCalledTimes(0) @@ -149,11 +150,10 @@ describe('NorthConnector', () => { }) it('should retry to send files if it fails', async () => { - clearTimeout(north.valuesTimeout) clearTimeout(north.filesTimeout) const fileToSend = { path: 'myFile' } north.fileCache.retrieveFileFromCache = jest.fn(() => fileToSend) - north.fileCache.removeFileFromCache = jest.fn() + north.archiveService.archiveOrRemoveFile = jest.fn() north.handleFile = jest.fn().mockImplementationOnce(() => { throw new Error('handleFile error 1') }).mockImplementationOnce(() => { @@ -170,17 +170,16 @@ describe('NorthConnector', () => { expect(north.handleFile).toHaveBeenCalledWith(fileToSend.path) expect(north.handleFile).toHaveBeenCalledTimes(3) - expect(north.fileCache.removeFileFromCache).toHaveBeenCalledTimes(0) + expect(north.archiveService.archiveOrRemoveFile).toHaveBeenCalledTimes(0) expect(north.fileCache.manageErroredFiles).toHaveBeenCalledTimes(1) expect(north.fileCache.manageErroredFiles).toHaveBeenCalledWith(fileToSend.path) }) it('should successfully send files', async () => { - clearTimeout(north.valuesTimeout) clearTimeout(north.filesTimeout) const fileToSend = { path: 'myFile' } north.fileCache.retrieveFileFromCache = jest.fn(() => fileToSend) - north.fileCache.removeFileFromCache = jest.fn() + north.archiveService.archiveOrRemoveFile = jest.fn() north.handleFile = jest.fn() await north.retrieveFromCacheAndSendFile() @@ -188,17 +187,16 @@ describe('NorthConnector', () => { await flushPromises() expect(north.handleFile).toHaveBeenCalledWith(fileToSend.path) expect(north.handleFile).toHaveBeenCalledTimes(2) - expect(north.fileCache.removeFileFromCache).toHaveBeenCalledTimes(2) - expect(north.fileCache.removeFileFromCache).toHaveBeenCalledWith(fileToSend.path, configuration.caching.archive.enabled) + expect(north.archiveService.archiveOrRemoveFile).toHaveBeenCalledTimes(2) + expect(north.archiveService.archiveOrRemoveFile).toHaveBeenCalledWith(fileToSend.path) expect(north.fileCache.manageErroredFiles).toHaveBeenCalledTimes(0) }) it('should send file immediately', async () => { - clearTimeout(north.valuesTimeout) clearTimeout(north.filesTimeout) const fileToSend = { path: 'myFile' } north.fileCache.retrieveFileFromCache = jest.fn(() => fileToSend) - north.fileCache.removeFileFromCache = jest.fn() + north.archiveService.archiveOrRemoveFile = jest.fn() north.resetFilesTimeout = jest.fn() // handle file takes twice the sending interval time const promiseToResolve = new Promise((resolve) => { diff --git a/src/north/north-console/north-console.spec.js b/src/north/north-console/north-console.spec.js index 9c53cb58fa..02d66cf939 100644 --- a/src/north/north-console/north-console.spec.js +++ b/src/north/north-console/north-console.spec.js @@ -17,6 +17,7 @@ jest.mock('../../service/certificate.service') jest.mock('../../service/encryption.service', () => ({ getInstance: () => ({ decryptText: (password) => password }) })) jest.mock('../../service/cache/value-cache.service') jest.mock('../../service/cache/file-cache.service') +jest.mock('../../service/cache/archive.service') const nowDateString = '2020-02-02T02:02:02.222Z' let configuration = null diff --git a/src/north/north-csv-to-http/north-csv-to-http.spec.js b/src/north/north-csv-to-http/north-csv-to-http.spec.js index 783cf8547b..72bcf49704 100644 --- a/src/north/north-csv-to-http/north-csv-to-http.spec.js +++ b/src/north/north-csv-to-http/north-csv-to-http.spec.js @@ -20,6 +20,7 @@ jest.mock('../../service/certificate.service') jest.mock('../../service/encryption.service', () => ({ getInstance: () => ({ decryptText: (password) => password }) })) jest.mock('../../service/cache/value-cache.service') jest.mock('../../service/cache/file-cache.service') +jest.mock('../../service/cache/archive.service') jest.mock('../../service/utils') jest.mock('../../service/http-request-static-functions') diff --git a/src/north/north-file-writer/north-file-writer.spec.js b/src/north/north-file-writer/north-file-writer.spec.js index 6cf2f83e5d..10cd5f3c6b 100644 --- a/src/north/north-file-writer/north-file-writer.spec.js +++ b/src/north/north-file-writer/north-file-writer.spec.js @@ -14,6 +14,7 @@ jest.mock('../../service/certificate.service') jest.mock('../../service/encryption.service', () => ({ getInstance: () => ({ decryptText: (password) => password }) })) jest.mock('../../service/cache/value-cache.service') jest.mock('../../service/cache/file-cache.service') +jest.mock('../../service/cache/archive.service') const nowDateString = '2020-02-02T02:02:02.222Z' let configuration = null diff --git a/src/north/north-influx-db/north-influx-db.spec.js b/src/north/north-influx-db/north-influx-db.spec.js index fee61019b8..97a76f7073 100644 --- a/src/north/north-influx-db/north-influx-db.spec.js +++ b/src/north/north-influx-db/north-influx-db.spec.js @@ -12,6 +12,7 @@ jest.mock('../../service/certificate.service') jest.mock('../../service/encryption.service', () => ({ getInstance: () => ({ decryptText: (password) => password }) })) jest.mock('../../service/cache/value-cache.service') jest.mock('../../service/cache/file-cache.service') +jest.mock('../../service/cache/archive.service') jest.mock('../../service/utils') jest.mock('../../service/http-request-static-functions') diff --git a/src/north/north-mongo-db/north-mongo-db.spec.js b/src/north/north-mongo-db/north-mongo-db.spec.js index f80b87fd54..e794326180 100644 --- a/src/north/north-mongo-db/north-mongo-db.spec.js +++ b/src/north/north-mongo-db/north-mongo-db.spec.js @@ -15,6 +15,7 @@ jest.mock('../../service/certificate.service') jest.mock('../../service/encryption.service', () => ({ getInstance: () => ({ decryptText: (password) => password }) })) jest.mock('../../service/cache/value-cache.service') jest.mock('../../service/cache/file-cache.service') +jest.mock('../../service/cache/archive.service') const nowDateString = '2020-02-02T02:02:02.222Z' const values = [ diff --git a/src/north/north-mqtt/north-mqtt.spec.js b/src/north/north-mqtt/north-mqtt.spec.js index bc76ad69ba..7671dad710 100644 --- a/src/north/north-mqtt/north-mqtt.spec.js +++ b/src/north/north-mqtt/north-mqtt.spec.js @@ -15,6 +15,7 @@ jest.mock('../../service/status.service') jest.mock('../../service/encryption.service', () => ({ getInstance: () => ({ decryptText: (password) => password }) })) jest.mock('../../service/cache/value-cache.service') jest.mock('../../service/cache/file-cache.service') +jest.mock('../../service/cache/archive.service') // Mock certificate service const CertificateService = jest.mock('../../service/certificate.service') diff --git a/src/north/north-oianalytics/north-oianalytics.spec.js b/src/north/north-oianalytics/north-oianalytics.spec.js index 5cb280dec3..5c2cf2bd6a 100644 --- a/src/north/north-oianalytics/north-oianalytics.spec.js +++ b/src/north/north-oianalytics/north-oianalytics.spec.js @@ -15,6 +15,7 @@ jest.mock('../../service/certificate.service') jest.mock('../../service/encryption.service', () => ({ getInstance: () => ({ decryptText: (password) => password }) })) jest.mock('../../service/cache/value-cache.service') jest.mock('../../service/cache/file-cache.service') +jest.mock('../../service/cache/archive.service') jest.mock('../../service/utils') jest.mock('../../service/http-request-static-functions') diff --git a/src/north/north-oiconnect/north-oiconnect.spec.js b/src/north/north-oiconnect/north-oiconnect.spec.js index 2cd161daaa..ad0cf2bff8 100644 --- a/src/north/north-oiconnect/north-oiconnect.spec.js +++ b/src/north/north-oiconnect/north-oiconnect.spec.js @@ -14,6 +14,7 @@ jest.mock('../../service/certificate.service') jest.mock('../../service/encryption.service', () => ({ getInstance: () => ({ decryptText: (password) => password }) })) jest.mock('../../service/cache/value-cache.service') jest.mock('../../service/cache/file-cache.service') +jest.mock('../../service/cache/archive.service') jest.mock('../../service/utils') jest.mock('../../service/http-request-static-functions') diff --git a/src/north/north-timescale-db/north-timescale-db.spec.js b/src/north/north-timescale-db/north-timescale-db.spec.js index 2d02ecee21..4fe2def2cd 100644 --- a/src/north/north-timescale-db/north-timescale-db.spec.js +++ b/src/north/north-timescale-db/north-timescale-db.spec.js @@ -15,6 +15,7 @@ jest.mock('../../service/certificate.service') jest.mock('../../service/encryption.service', () => ({ getInstance: () => ({ decryptText: (password) => password }) })) jest.mock('../../service/cache/value-cache.service') jest.mock('../../service/cache/file-cache.service') +jest.mock('../../service/cache/archive.service') const nowDateString = '2020-02-02T02:02:02.222Z' const values = [ diff --git a/src/north/north-watsy/north-watsy.spec.js b/src/north/north-watsy/north-watsy.spec.js index f80917027f..3dc925e1d9 100644 --- a/src/north/north-watsy/north-watsy.spec.js +++ b/src/north/north-watsy/north-watsy.spec.js @@ -24,6 +24,7 @@ jest.mock('../../service/certificate.service') jest.mock('../../service/encryption.service', () => ({ getInstance: () => ({ decryptText: (password) => password }) })) jest.mock('../../service/cache/value-cache.service') jest.mock('../../service/cache/file-cache.service') +jest.mock('../../service/cache/archive.service') let configuration = null let north = null diff --git a/src/service/cache/archive.service.js b/src/service/cache/archive.service.js new file mode 100644 index 0000000000..12c5a622c3 --- /dev/null +++ b/src/service/cache/archive.service.js @@ -0,0 +1,147 @@ +const fs = require('node:fs/promises') +const path = require('node:path') + +const { createFolder } = require('../utils') + +// Time between two checks of the Archive Folder +const ARCHIVE_TIMEOUT = 3600000 // one hour +const ARCHIVE_FOLDER = 'archive' + +/** + * Archive service used to archive sent file and check periodically the archive folder to remove old files + * Once a file is sent by a North connector, the archiveOrRemoveFile is called by the connector to manage the file + */ +class ArchiveService { + /** + * @param {String} northId - The North ID connector + * @param {Logger} logger - The logger + * @param {String} baseFolder - The North cache folder generated as north-connectorId. This base folder can + * be in data-stream or history-query folder depending on the connector use case + * @param {Boolean} enabled - If the archive mode for this North connector is enabled + * @param {Number} retentionDuration - File retention duration in archive folder (in hours) + * @return {void} + */ + constructor( + northId, + logger, + baseFolder, + enabled, + retentionDuration, + ) { + this.northId = northId + this.logger = logger + this.baseFolder = baseFolder + this.enabled = enabled + // Convert from hours to ms to compare with mtimeMs (file modified time in ms) + this.retentionDuration = retentionDuration * 3600000 + this.archiveFolder = path.resolve(this.baseFolder, ARCHIVE_FOLDER) + this.archiveTimeout = null + } + + /** + * Create folders and activate archive cleanup if needed + * @returns {Promise} - The result promise + */ + async start() { + await createFolder(this.archiveFolder) + // refresh the archiveFolder at the beginning only if retentionDuration is different from 0 + if (this.enabled && this.retentionDuration > 0) { + this.refreshArchiveFolder() + } + } + + /** + * Stop the archive timeout and close the databases + * @return {void} + */ + async stop() { + if (this.archiveTimeout) { + clearTimeout(this.archiveTimeout) + } + } + + /** + * Remove file from North connector cache and place it to archive folder if enabled. + * @param {String} filePathInCache - The file to remove + * @return {Promise} - The result promise + */ + async archiveOrRemoveFile(filePathInCache) { + if (this.enabled) { + const filenameInfo = path.parse(filePathInCache) + const archivePath = path.join(this.archiveFolder, filenameInfo.base) + // Move cache file into the archive folder + try { + await fs.rename(filePathInCache, archivePath) + this.logger.debug(`File "${filePathInCache}" moved to "${archivePath}".`) + } catch (renameError) { + this.logger.error(renameError) + } + } else { + // Delete original file + try { + await fs.unlink(filePathInCache) + this.logger.debug(`File "${filePathInCache}" removed from disk.`) + } catch (unlinkError) { + this.logger.error(unlinkError) + } + } + } + + /** + * Delete files in archiveFolder if they are older thant the retention time. + * @return {void} + */ + async refreshArchiveFolder() { + this.logger.debug('Parse archive folder to remove old files.') + // If a timeout already runs, clear it + if (this.archiveTimeout) { + clearTimeout(this.archiveTimeout) + } + + let files = [] + try { + files = await fs.readdir(this.archiveFolder) + } catch (error) { + // If the archive folder doest not exist (removed by the user for example), an error is logged + this.logger.error(error) + } + if (files.length > 0) { + const referenceDate = new Date().getTime() + + // Map each file to a promise and remove files sequentially + await files.reduce((promise, file) => promise.then( + async () => this.removeFileIfTooOld(file, referenceDate, this.archiveFolder), + ), Promise.resolve()) + } else { + this.logger.debug(`The archive folder "${this.archiveFolder}" is empty. Nothing to delete.`) + } + this.archiveTimeout = setTimeout(this.refreshArchiveFolder.bind(this), ARCHIVE_TIMEOUT) + } + + /** + * Check the modified time of a file and remove it if older than the retention duration + * @param {String} filePath - The path of the file to remove + * @param {Number} referenceDate - The reference date (in ms) + * @param {String} archiveFolder - The archive folder + * @returns {Promise} - The result promise + */ + async removeFileIfTooOld(filePath, referenceDate, archiveFolder) { + let stats + try { + // If a file is being written or corrupted, the stat method can fail an error is logged + stats = await fs.stat(path.join(archiveFolder, filePath)) + } catch (error) { + this.logger.error(error) + } + if (stats && stats.mtimeMs + this.retentionDuration < referenceDate) { + try { + await fs.unlink(path.join(archiveFolder, filePath)) + this.logger.debug(`File "${path.join(archiveFolder, filePath)}" removed from archive.`) + } catch (unlinkError) { + this.logger.error(unlinkError) + } + } + } +} + +module.exports = ArchiveService diff --git a/src/service/cache/archive.service.spec.js b/src/service/cache/archive.service.spec.js new file mode 100644 index 0000000000..18d2ed515e --- /dev/null +++ b/src/service/cache/archive.service.spec.js @@ -0,0 +1,148 @@ +const path = require('node:path') +const fs = require('node:fs/promises') + +const ArchiveService = require('./archive.service') + +const { createFolder } = require('../utils') + +jest.mock('node:fs/promises') + +// Mock logger +const logger = { + error: jest.fn(), + warn: jest.fn(), + info: jest.fn(), + debug: jest.fn(), + trace: jest.fn(), +} +jest.mock('../../service/utils') +// Method used to flush promises called in setTimeout +const flushPromises = () => new Promise(jest.requireActual('timers').setImmediate) +const nowDateString = '2020-02-02T02:02:02.222Z' +let archiveService = null +describe('ArchiveService', () => { + beforeEach(async () => { + jest.resetAllMocks() + jest.useFakeTimers().setSystemTime(new Date(nowDateString)) + + archiveService = new ArchiveService('northId', logger, 'myCacheFolder', true, 1) + }) + + it('should be properly initialized with archive enabled', async () => { + archiveService.refreshArchiveFolder = jest.fn() + await archiveService.start() + expect(archiveService.northId).toEqual('northId') + expect(archiveService.baseFolder).toEqual('myCacheFolder') + expect(createFolder).toHaveBeenCalledWith(path.resolve('myCacheFolder', 'archive')) + + expect(archiveService.refreshArchiveFolder).toHaveBeenCalledTimes(1) + }) + + it('should be properly initialized with archive disabled', async () => { + archiveService.enabled = false + archiveService.refreshArchiveFolder = jest.fn() + await archiveService.start() + expect(archiveService.northId).toEqual('northId') + expect(archiveService.baseFolder).toEqual('myCacheFolder') + expect(createFolder).toHaveBeenCalledWith(path.resolve('myCacheFolder', 'archive')) + + expect(archiveService.refreshArchiveFolder).not.toHaveBeenCalled() + }) + + it('should properly stop', () => { + const clearTimeoutSpy = jest.spyOn(global, 'clearTimeout') + + archiveService.stop() + expect(clearTimeoutSpy).not.toHaveBeenCalled() + + archiveService.archiveTimeout = 1 + archiveService.stop() + expect(clearTimeoutSpy).toHaveBeenCalledTimes(1) + }) + + it('should properly move file from cache to archive folder', async () => { + fs.rename.mockImplementationOnce(() => {}).mockImplementationOnce(() => { + throw new Error('rename error') + }) + + await archiveService.archiveOrRemoveFile('myFile.csv') + expect(logger.debug).toHaveBeenCalledWith(`File "myFile.csv" moved to "${path.resolve(archiveService.archiveFolder, 'myFile.csv')}".`) + + await archiveService.archiveOrRemoveFile('myFile.csv') + expect(logger.error).toHaveBeenCalledWith(new Error('rename error')) + }) + + it('should properly remove file from cache', async () => { + archiveService.enabled = false + fs.unlink.mockImplementationOnce(() => {}).mockImplementationOnce(() => { + throw new Error('unlink error') + }) + + await archiveService.archiveOrRemoveFile('myFile.csv') + expect(logger.debug).toHaveBeenCalledWith('File "myFile.csv" removed from disk.') + + await archiveService.archiveOrRemoveFile('myFile.csv') + expect(logger.error).toHaveBeenCalledWith(new Error('unlink error')) + }) + + it('should refresh archive folder', async () => { + const clearTimeoutSpy = jest.spyOn(global, 'clearTimeout') + + fs.readdir.mockImplementationOnce(() => []).mockImplementation(() => ['myFile1', 'myFile2']) + archiveService.removeFileIfTooOld = jest.fn() + + await archiveService.refreshArchiveFolder() + expect(clearTimeoutSpy).not.toHaveBeenCalled() + expect(logger.debug).toHaveBeenCalledWith('Parse archive folder to remove old files.') + expect(logger.debug).toHaveBeenCalledWith(`The archive folder "${path.resolve('myCacheFolder', 'archive')}" is empty. Nothing to delete.`) + + jest.advanceTimersByTime(3600000) + await flushPromises() + + expect(archiveService.removeFileIfTooOld).toHaveBeenCalledTimes(2) + }) + + it('should manage archive folder readdir error', async () => { + fs.readdir.mockImplementationOnce(() => { + throw new Error('readdir error') + }) + archiveService.removeFileIfTooOld = jest.fn() + + await archiveService.refreshArchiveFolder() + expect(archiveService.removeFileIfTooOld).not.toHaveBeenCalled() + expect(logger.error).toHaveBeenCalledWith(new Error('readdir error')) + }) + + it('should remove file if too old', async () => { + fs.stat.mockImplementationOnce(() => ({ mtimeMs: new Date('2020-02-01T02:02:02.222Z').getTime() })) + .mockImplementationOnce(() => ({ mtimeMs: new Date('2020-02-02T02:02:01.222Z').getTime() })) + + await archiveService.removeFileIfTooOld('myOldFile.csv', new Date(), 'archiveFolder') + expect(fs.unlink).toHaveBeenCalledWith(path.join('archiveFolder', 'myOldFile.csv')) + expect(logger.debug).toHaveBeenCalledWith(`File "${path.join('archiveFolder', 'myOldFile.csv')}" removed from archive.`) + await archiveService.removeFileIfTooOld('myNewFile.csv', new Date(), 'archiveFolder') + expect(logger.debug).toHaveBeenCalledTimes(1) + expect(fs.unlink).toHaveBeenCalledTimes(1) + }) + + it('should log an error if can not remove old file', async () => { + fs.stat.mockImplementationOnce(() => ({ mtimeMs: new Date('2020-02-01T02:02:02.222Z').getTime() })) + fs.unlink.mockImplementationOnce(() => { + throw new Error('unlink error') + }) + + await archiveService.removeFileIfTooOld('myOldFile.csv', new Date(), 'archiveFolder') + expect(logger.error).toHaveBeenCalledWith(new Error('unlink error')) + }) + + it('should log an error if a problem occur accessing the file', async () => { + fs.stat.mockImplementationOnce(() => { + throw new Error('stat error') + }) + + await archiveService.removeFileIfTooOld('myOldFile.csv', new Date(), 'archiveFolder') + expect(fs.unlink).not.toHaveBeenCalled() + expect(logger.error).toHaveBeenCalledWith(new Error('stat error')) + expect(logger.error).toHaveBeenCalledTimes(1) + }) +}) diff --git a/src/service/cache/file-cache.service.js b/src/service/cache/file-cache.service.js index 4159fd3d20..c7659403c8 100644 --- a/src/service/cache/file-cache.service.js +++ b/src/service/cache/file-cache.service.js @@ -3,11 +3,6 @@ const path = require('node:path') const { createFolder } = require('../utils') -// Time between two checks of the Archive Folder -const ARCHIVE_TIMEOUT = 3600000 // one hour - -const ARCHIVE_FOLDER = 'archive' - const FILE_FOLDER = 'files' const ERROR_FOLDER = 'files-errors' @@ -20,32 +15,22 @@ class FileCacheService { * @param {Logger} logger - The logger * @param {String} baseFolder - The North cache folder generated as north-connectorId. This base folder can * be in data-stream or history-query folder depending on the connector use case - * @param {Boolean} archiveFiles - If the archive mode for this North connector is enabled - * @param {Number} retentionDuration - File retention duration in archive folder (in hours) * @return {void} */ constructor( northId, logger, baseFolder, - archiveFiles, - retentionDuration, ) { this.northId = northId this.logger = logger this.baseFolder = baseFolder - this.archiveFiles = archiveFiles - // Convert from hours to ms to compare with mtimeMs (file modified time in ms) - this.retentionDuration = retentionDuration * 3600000 - this.archiveFolder = path.resolve(this.baseFolder, ARCHIVE_FOLDER) this.fileFolder = path.resolve(this.baseFolder, FILE_FOLDER) this.errorFolder = path.resolve(this.baseFolder, ERROR_FOLDER) - - this.archiveTimeout = null } /** - * Create databases, folders and activate archive cleanup if needed + * Create folders and check errors files * @returns {Promise} - The result promise */ async start() { @@ -75,14 +60,6 @@ class FileCacheService { // If the folder does not exist, an error is logged but not thrown if the file cache folder is accessible this.logger.error(error) } - - if (this.archiveFiles) { - await createFolder(this.archiveFolder) - // refresh the archiveFolder at the beginning only if retentionDuration is different from 0 - if (this.retentionDuration > 0) { - this.refreshArchiveFolder() - } - } } /** @@ -90,9 +67,7 @@ class FileCacheService { * @return {void} */ stop() { - if (this.archiveTimeout) { - clearTimeout(this.archiveTimeout) - } + this.logger.info('stopping file cache') } /** @@ -171,34 +146,6 @@ class FileCacheService { } } - /** - * Remove file from North connector cache and place it to archive folder if required. - * @param {String} filePathInCache - The file to remove - * @param {Boolean} archiveFile - If the file must be archived or not - * @return {Promise} - The result promise - */ - async removeFileFromCache(filePathInCache, archiveFile) { - if (archiveFile) { - const filenameInfo = path.parse(filePathInCache) - const archivePath = path.join(this.archiveFolder, filenameInfo.base) - // Move cache file into the archive folder - try { - await fs.rename(filePathInCache, archivePath) - this.logger.debug(`File "${filePathInCache}" moved to "${archivePath}".`) - } catch (renameError) { - this.logger.error(renameError) - } - } else { - // Delete original file - try { - await fs.unlink(filePathInCache) - this.logger.debug(`File "${filePathInCache}" removed from disk.`) - } catch (unlinkError) { - this.logger.error(unlinkError) - } - } - } - /** * Check if the file cache is empty or not * @returns {Promise} - Cache empty or not @@ -213,62 +160,6 @@ class FileCacheService { } return files.length === 0 } - - /** - * Delete files in archiveFolder if they are older thant the retention time. - * @return {void} - */ - async refreshArchiveFolder() { - this.logger.debug('Parse archive folder to remove old files.') - // If a timeout already runs, clear it - if (this.archiveTimeout) { - clearTimeout(this.archiveTimeout) - } - - let files = [] - try { - files = await fs.readdir(this.archiveFolder) - } catch (error) { - // If the archive folder doest not exist (removed by the user for example), an error is logged - this.logger.error(error) - } - if (files.length > 0) { - const referenceDate = new Date().getTime() - - // Map each file to a promise and remove files sequentially - await files.reduce((promise, file) => promise.then( - async () => this.removeFileIfTooOld(file, referenceDate, this.archiveFolder), - ), Promise.resolve()) - } else { - this.logger.debug(`The archive folder "${this.archiveFolder}" is empty. Nothing to delete.`) - } - this.archiveTimeout = setTimeout(this.refreshArchiveFolder.bind(this), ARCHIVE_TIMEOUT) - } - - /** - * Check the modified time of a file and remove it if older than the retention duration - * @param {String} filePath - The path of the file to remove - * @param {Number} referenceDate - The reference date (in ms) - * @param {String} archiveFolder - The archive folder - * @returns {Promise} - The result promise - */ - async removeFileIfTooOld(filePath, referenceDate, archiveFolder) { - let stats - try { - // If a file is being written or corrupted, the stat method can fail an error is logged - stats = await fs.stat(path.join(archiveFolder, filePath)) - } catch (error) { - this.logger.error(error) - } - if (stats && stats.mtimeMs + this.retentionDuration < referenceDate) { - try { - await fs.unlink(path.join(archiveFolder, filePath)) - this.logger.debug(`File "${path.join(archiveFolder, filePath)}" removed from archive.`) - } catch (unlinkError) { - this.logger.error(unlinkError) - } - } - } } module.exports = FileCacheService diff --git a/src/service/cache/file-cache.service.spec.js b/src/service/cache/file-cache.service.spec.js index cb1c067dcf..f6fbba20a9 100644 --- a/src/service/cache/file-cache.service.spec.js +++ b/src/service/cache/file-cache.service.spec.js @@ -16,8 +16,6 @@ const logger = { trace: jest.fn(), } jest.mock('../../service/utils') -// Method used to flush promises called in setTimeout -const flushPromises = () => new Promise(jest.requireActual('timers').setImmediate) const nowDateString = '2020-02-02T02:02:02.222Z' let cache = null describe('FileCache', () => { @@ -29,74 +27,37 @@ describe('FileCache', () => { }) it('should be properly initialized with files in cache', async () => { - cache.refreshArchiveFolder = jest.fn() fs.readdir.mockImplementation(() => ['file1']) await cache.start() expect(cache.northId).toEqual('northId') expect(cache.baseFolder).toEqual('myCacheFolder') expect(createFolder).toHaveBeenCalledWith(path.resolve('myCacheFolder', 'files')) expect(createFolder).toHaveBeenCalledWith(path.resolve('myCacheFolder', 'files-errors')) - expect(createFolder).toHaveBeenCalledWith(path.resolve('myCacheFolder', 'archive')) expect(logger.debug).toHaveBeenCalledWith('1 files in cache.') expect(logger.warn).toHaveBeenCalledWith('1 files in error cache.') - - expect(cache.refreshArchiveFolder).toHaveBeenCalledTimes(1) }) it('should be properly initialized without files in cache', async () => { - cache.refreshArchiveFolder = jest.fn() fs.readdir.mockImplementation(() => []) - cache.retentionDuration = 0 await cache.start() expect(cache.northId).toEqual('northId') expect(cache.baseFolder).toEqual('myCacheFolder') expect(createFolder).toHaveBeenCalledWith(path.resolve('myCacheFolder', 'files')) expect(createFolder).toHaveBeenCalledWith(path.resolve('myCacheFolder', 'files-errors')) - expect(createFolder).toHaveBeenCalledWith(path.resolve('myCacheFolder', 'archive')) expect(logger.debug).toHaveBeenCalledWith('No files in cache.') expect(logger.debug).toHaveBeenCalledWith('No error files in cache.') - - expect(cache.refreshArchiveFolder).not.toHaveBeenCalled() }) it('should be properly initialized with a readdir error', async () => { - cache.refreshArchiveFolder = jest.fn() fs.readdir.mockImplementation(() => { throw new Error('readdir error') }) - cache.retentionDuration = 0 await cache.start() expect(logger.error).toHaveBeenCalledWith(new Error('readdir error')) expect(logger.error).toHaveBeenCalledTimes(2) - - expect(cache.refreshArchiveFolder).not.toHaveBeenCalled() - }) - - it('should be properly initialized without archive files', async () => { - cache.archiveFiles = false - cache.refreshArchiveFolder = jest.fn() - fs.readdir.mockImplementation(() => []) - cache.retentionDuration = 0 - await cache.start() - expect(createFolder).toHaveBeenCalledWith(path.resolve('myCacheFolder', 'files')) - expect(createFolder).toHaveBeenCalledWith(path.resolve('myCacheFolder', 'files-errors')) - expect(createFolder).toHaveBeenCalledTimes(2) - - expect(cache.refreshArchiveFolder).not.toHaveBeenCalled() - }) - - it('should properly stop', () => { - const clearTimeoutSpy = jest.spyOn(global, 'clearTimeout') - - cache.stop() - expect(clearTimeoutSpy).not.toHaveBeenCalled() - - cache.archiveTimeout = 1 - cache.stop() - expect(clearTimeoutSpy).toHaveBeenCalledTimes(1) }) it('should properly cache file', async () => { @@ -175,30 +136,6 @@ describe('FileCache', () => { expect(logger.error).toHaveBeenCalledWith(new Error('rename error')) }) - it('should properly move file from cache to archive folder', async () => { - fs.rename.mockImplementationOnce(() => {}).mockImplementationOnce(() => { - throw new Error('rename error') - }) - - await cache.removeFileFromCache('myFile.csv', true) - expect(logger.debug).toHaveBeenCalledWith(`File "myFile.csv" moved to "${path.resolve(cache.archiveFolder, 'myFile.csv')}".`) - - await cache.removeFileFromCache('myFile.csv', true) - expect(logger.error).toHaveBeenCalledWith(new Error('rename error')) - }) - - it('should properly remove file from cache', async () => { - fs.unlink.mockImplementationOnce(() => {}).mockImplementationOnce(() => { - throw new Error('unlink error') - }) - - await cache.removeFileFromCache('myFile.csv', false) - expect(logger.debug).toHaveBeenCalledWith('File "myFile.csv" removed from disk.') - - await cache.removeFileFromCache('myFile.csv', false) - expect(logger.error).toHaveBeenCalledWith(new Error('unlink error')) - }) - it('should check if cache is empty', async () => { fs.readdir.mockImplementationOnce(() => []) .mockImplementationOnce(() => ['myFile1', 'myFile2']) @@ -215,65 +152,4 @@ describe('FileCache', () => { expect(logger.error).toHaveBeenCalledTimes(1) expect(logger.error).toHaveBeenCalledWith(new Error('readdir error')) }) - - it('should refresh archive folder', async () => { - const clearTimeoutSpy = jest.spyOn(global, 'clearTimeout') - - fs.readdir.mockImplementationOnce(() => []).mockImplementation(() => ['myFile1', 'myFile2']) - cache.removeFileIfTooOld = jest.fn() - - await cache.refreshArchiveFolder() - expect(clearTimeoutSpy).not.toHaveBeenCalled() - expect(logger.debug).toHaveBeenCalledWith('Parse archive folder to remove old files.') - expect(logger.debug).toHaveBeenCalledWith(`The archive folder "${path.resolve('myCacheFolder', 'archive')}" is empty. Nothing to delete.`) - - jest.advanceTimersByTime(3600000) - await flushPromises() - - expect(cache.removeFileIfTooOld).toHaveBeenCalledTimes(2) - }) - - it('should manage archive folder readdir error', async () => { - fs.readdir.mockImplementationOnce(() => { - throw new Error('readdir error') - }) - cache.removeFileIfTooOld = jest.fn() - - await cache.refreshArchiveFolder() - expect(cache.removeFileIfTooOld).not.toHaveBeenCalled() - expect(logger.error).toHaveBeenCalledWith(new Error('readdir error')) - }) - - it('should remove file if too old', async () => { - fs.stat.mockImplementationOnce(() => ({ mtimeMs: new Date('2020-02-01T02:02:02.222Z').getTime() })) - .mockImplementationOnce(() => ({ mtimeMs: new Date('2020-02-02T02:02:01.222Z').getTime() })) - - await cache.removeFileIfTooOld('myOldFile.csv', new Date(), 'archiveFolder') - expect(fs.unlink).toHaveBeenCalledWith(path.join('archiveFolder', 'myOldFile.csv')) - expect(logger.debug).toHaveBeenCalledWith(`File "${path.join('archiveFolder', 'myOldFile.csv')}" removed from archive.`) - await cache.removeFileIfTooOld('myNewFile.csv', new Date(), 'archiveFolder') - expect(logger.debug).toHaveBeenCalledTimes(1) - expect(fs.unlink).toHaveBeenCalledTimes(1) - }) - - it('should log an error if can not remove old file', async () => { - fs.stat.mockImplementationOnce(() => ({ mtimeMs: new Date('2020-02-01T02:02:02.222Z').getTime() })) - fs.unlink.mockImplementationOnce(() => { - throw new Error('unlink error') - }) - - await cache.removeFileIfTooOld('myOldFile.csv', new Date(), 'archiveFolder') - expect(logger.error).toHaveBeenCalledWith(new Error('unlink error')) - }) - - it('should log an error if a problem occur accessing the file', async () => { - fs.stat.mockImplementationOnce(() => { - throw new Error('stat error') - }) - - await cache.removeFileIfTooOld('myOldFile.csv', new Date(), 'archiveFolder') - expect(fs.unlink).not.toHaveBeenCalled() - expect(logger.error).toHaveBeenCalledWith(new Error('stat error')) - expect(logger.error).toHaveBeenCalledTimes(1) - }) })