Skip to content

Commit

Permalink
feat(cache): create specific archive service for north connectors
Browse files Browse the repository at this point in the history
  • Loading branch information
burgerni10 authored and Nicolas Burger committed Nov 20, 2022
1 parent e10a8e2 commit 069e0f1
Show file tree
Hide file tree
Showing 17 changed files with 327 additions and 247 deletions.
1 change: 1 addition & 0 deletions src/north/north-amazon-s3/north-amazon-s3.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ jest.mock('../../service/certificate.service')
jest.mock('../../service/encryption.service', () => ({ getInstance: () => ({ decryptText: (password) => password }) }))
jest.mock('../../service/cache/value-cache.service')
jest.mock('../../service/cache/file-cache.service')
jest.mock('../../service/cache/archive.service')

const proxies = [
{
Expand Down
13 changes: 11 additions & 2 deletions src/north/north-connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const ValueCache = require('../service/cache/value-cache.service')
const FileCache = require('../service/cache/file-cache.service')
const { createFolder } = require('../service/utils')
const { createProxyAgent } = require('../service/http-request-static-functions')
const ArchiveService = require('../service/cache/archive.service')

/**
* Class NorthConnector : provides general attributes and methods for north connectors.
Expand Down Expand Up @@ -106,10 +107,17 @@ class NorthConnector {
this.id,
this.logger,
this.baseFolder,
)
await this.fileCache.start()

this.archiveService = new ArchiveService(
this.id,
this.logger,
this.baseFolder,
this.cacheSettings.archive.enabled,
this.cacheSettings.archive.retentionDuration,
)
await this.fileCache.start()
await this.archiveService.start()

this.certificate = new CertificateService(this.logger)
await this.certificate.init(this.keyFile, this.certFile, this.caFile)
Expand Down Expand Up @@ -148,6 +156,7 @@ class NorthConnector {
}
await this.fileCache.stop()
await this.valueCache.stop()
await this.archiveService.stop()
this.logger.debug(`North connector ${this.id} stopped.`)
}

Expand Down Expand Up @@ -219,7 +228,7 @@ class NorthConnector {
try {
this.logger.debug(`Handling file "${fileToSend.path}".`)
await this.handleFile(fileToSend.path)
await this.fileCache.removeFileFromCache(fileToSend.path, this.cacheSettings.archive.enabled)
await this.archiveService.archiveOrRemoveFile(fileToSend.path)
this.numberOfSentFiles += 1
this.statusService.updateStatusDataStream({
'Last uploaded file': fileToSend.path,
Expand Down
18 changes: 8 additions & 10 deletions src/north/north-connector.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ jest.mock('../service/utils')
jest.mock('../service/http-request-static-functions')
jest.mock('../service/cache/value-cache.service')
jest.mock('../service/cache/file-cache.service')
jest.mock('../service/cache/archive.service')

// Method used to flush promises called in setTimeout
const flushPromises = () => new Promise(jest.requireActual('timers').setImmediate)
Expand Down Expand Up @@ -106,9 +107,9 @@ describe('NorthConnector', () => {
expect(clearTimeoutSpy).toHaveBeenCalledTimes(1)
expect(north.fileCache.stop).toHaveBeenCalledTimes(1)
expect(north.valueCache.stop).toHaveBeenCalledTimes(1)
expect(north.archiveService.stop).toHaveBeenCalledTimes(1)

clearTimeoutSpy.mockClear()
north.valuesTimeout = null
north.filesTimeout = null
await north.stop()
expect(clearTimeoutSpy).toHaveBeenCalledTimes(0)
Expand Down Expand Up @@ -149,11 +150,10 @@ describe('NorthConnector', () => {
})

it('should retry to send files if it fails', async () => {
clearTimeout(north.valuesTimeout)
clearTimeout(north.filesTimeout)
const fileToSend = { path: 'myFile' }
north.fileCache.retrieveFileFromCache = jest.fn(() => fileToSend)
north.fileCache.removeFileFromCache = jest.fn()
north.archiveService.archiveOrRemoveFile = jest.fn()
north.handleFile = jest.fn().mockImplementationOnce(() => {
throw new Error('handleFile error 1')
}).mockImplementationOnce(() => {
Expand All @@ -170,35 +170,33 @@ describe('NorthConnector', () => {

expect(north.handleFile).toHaveBeenCalledWith(fileToSend.path)
expect(north.handleFile).toHaveBeenCalledTimes(3)
expect(north.fileCache.removeFileFromCache).toHaveBeenCalledTimes(0)
expect(north.archiveService.archiveOrRemoveFile).toHaveBeenCalledTimes(0)
expect(north.fileCache.manageErroredFiles).toHaveBeenCalledTimes(1)
expect(north.fileCache.manageErroredFiles).toHaveBeenCalledWith(fileToSend.path)
})

it('should successfully send files', async () => {
clearTimeout(north.valuesTimeout)
clearTimeout(north.filesTimeout)
const fileToSend = { path: 'myFile' }
north.fileCache.retrieveFileFromCache = jest.fn(() => fileToSend)
north.fileCache.removeFileFromCache = jest.fn()
north.archiveService.archiveOrRemoveFile = jest.fn()
north.handleFile = jest.fn()

await north.retrieveFromCacheAndSendFile()
jest.advanceTimersByTime(configuration.caching.sendInterval)
await flushPromises()
expect(north.handleFile).toHaveBeenCalledWith(fileToSend.path)
expect(north.handleFile).toHaveBeenCalledTimes(2)
expect(north.fileCache.removeFileFromCache).toHaveBeenCalledTimes(2)
expect(north.fileCache.removeFileFromCache).toHaveBeenCalledWith(fileToSend.path, configuration.caching.archive.enabled)
expect(north.archiveService.archiveOrRemoveFile).toHaveBeenCalledTimes(2)
expect(north.archiveService.archiveOrRemoveFile).toHaveBeenCalledWith(fileToSend.path)
expect(north.fileCache.manageErroredFiles).toHaveBeenCalledTimes(0)
})

it('should send file immediately', async () => {
clearTimeout(north.valuesTimeout)
clearTimeout(north.filesTimeout)
const fileToSend = { path: 'myFile' }
north.fileCache.retrieveFileFromCache = jest.fn(() => fileToSend)
north.fileCache.removeFileFromCache = jest.fn()
north.archiveService.archiveOrRemoveFile = jest.fn()
north.resetFilesTimeout = jest.fn()
// handle file takes twice the sending interval time
const promiseToResolve = new Promise((resolve) => {
Expand Down
1 change: 1 addition & 0 deletions src/north/north-console/north-console.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ jest.mock('../../service/certificate.service')
jest.mock('../../service/encryption.service', () => ({ getInstance: () => ({ decryptText: (password) => password }) }))
jest.mock('../../service/cache/value-cache.service')
jest.mock('../../service/cache/file-cache.service')
jest.mock('../../service/cache/archive.service')

const nowDateString = '2020-02-02T02:02:02.222Z'
let configuration = null
Expand Down
1 change: 1 addition & 0 deletions src/north/north-csv-to-http/north-csv-to-http.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ jest.mock('../../service/certificate.service')
jest.mock('../../service/encryption.service', () => ({ getInstance: () => ({ decryptText: (password) => password }) }))
jest.mock('../../service/cache/value-cache.service')
jest.mock('../../service/cache/file-cache.service')
jest.mock('../../service/cache/archive.service')
jest.mock('../../service/utils')
jest.mock('../../service/http-request-static-functions')

Expand Down
1 change: 1 addition & 0 deletions src/north/north-file-writer/north-file-writer.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ jest.mock('../../service/certificate.service')
jest.mock('../../service/encryption.service', () => ({ getInstance: () => ({ decryptText: (password) => password }) }))
jest.mock('../../service/cache/value-cache.service')
jest.mock('../../service/cache/file-cache.service')
jest.mock('../../service/cache/archive.service')

const nowDateString = '2020-02-02T02:02:02.222Z'
let configuration = null
Expand Down
1 change: 1 addition & 0 deletions src/north/north-influx-db/north-influx-db.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ jest.mock('../../service/certificate.service')
jest.mock('../../service/encryption.service', () => ({ getInstance: () => ({ decryptText: (password) => password }) }))
jest.mock('../../service/cache/value-cache.service')
jest.mock('../../service/cache/file-cache.service')
jest.mock('../../service/cache/archive.service')
jest.mock('../../service/utils')
jest.mock('../../service/http-request-static-functions')

Expand Down
1 change: 1 addition & 0 deletions src/north/north-mongo-db/north-mongo-db.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ jest.mock('../../service/certificate.service')
jest.mock('../../service/encryption.service', () => ({ getInstance: () => ({ decryptText: (password) => password }) }))
jest.mock('../../service/cache/value-cache.service')
jest.mock('../../service/cache/file-cache.service')
jest.mock('../../service/cache/archive.service')

const nowDateString = '2020-02-02T02:02:02.222Z'
const values = [
Expand Down
1 change: 1 addition & 0 deletions src/north/north-mqtt/north-mqtt.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ jest.mock('../../service/status.service')
jest.mock('../../service/encryption.service', () => ({ getInstance: () => ({ decryptText: (password) => password }) }))
jest.mock('../../service/cache/value-cache.service')
jest.mock('../../service/cache/file-cache.service')
jest.mock('../../service/cache/archive.service')

// Mock certificate service
const CertificateService = jest.mock('../../service/certificate.service')
Expand Down
1 change: 1 addition & 0 deletions src/north/north-oianalytics/north-oianalytics.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ jest.mock('../../service/certificate.service')
jest.mock('../../service/encryption.service', () => ({ getInstance: () => ({ decryptText: (password) => password }) }))
jest.mock('../../service/cache/value-cache.service')
jest.mock('../../service/cache/file-cache.service')
jest.mock('../../service/cache/archive.service')
jest.mock('../../service/utils')
jest.mock('../../service/http-request-static-functions')

Expand Down
1 change: 1 addition & 0 deletions src/north/north-oiconnect/north-oiconnect.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ jest.mock('../../service/certificate.service')
jest.mock('../../service/encryption.service', () => ({ getInstance: () => ({ decryptText: (password) => password }) }))
jest.mock('../../service/cache/value-cache.service')
jest.mock('../../service/cache/file-cache.service')
jest.mock('../../service/cache/archive.service')
jest.mock('../../service/utils')
jest.mock('../../service/http-request-static-functions')

Expand Down
1 change: 1 addition & 0 deletions src/north/north-timescale-db/north-timescale-db.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ jest.mock('../../service/certificate.service')
jest.mock('../../service/encryption.service', () => ({ getInstance: () => ({ decryptText: (password) => password }) }))
jest.mock('../../service/cache/value-cache.service')
jest.mock('../../service/cache/file-cache.service')
jest.mock('../../service/cache/archive.service')

const nowDateString = '2020-02-02T02:02:02.222Z'
const values = [
Expand Down
1 change: 1 addition & 0 deletions src/north/north-watsy/north-watsy.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ jest.mock('../../service/certificate.service')
jest.mock('../../service/encryption.service', () => ({ getInstance: () => ({ decryptText: (password) => password }) }))
jest.mock('../../service/cache/value-cache.service')
jest.mock('../../service/cache/file-cache.service')
jest.mock('../../service/cache/archive.service')

let configuration = null
let north = null
Expand Down
147 changes: 147 additions & 0 deletions src/service/cache/archive.service.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
const fs = require('node:fs/promises')
const path = require('node:path')

const { createFolder } = require('../utils')

// Time between two checks of the Archive Folder
const ARCHIVE_TIMEOUT = 3600000 // one hour
const ARCHIVE_FOLDER = 'archive'

/**
* Archive service used to archive sent file and check periodically the archive folder to remove old files
* Once a file is sent by a North connector, the archiveOrRemoveFile is called by the connector to manage the file
*/
class ArchiveService {
/**
* @param {String} northId - The North ID connector
* @param {Logger} logger - The logger
* @param {String} baseFolder - The North cache folder generated as north-connectorId. This base folder can
* be in data-stream or history-query folder depending on the connector use case
* @param {Boolean} enabled - If the archive mode for this North connector is enabled
* @param {Number} retentionDuration - File retention duration in archive folder (in hours)
* @return {void}
*/
constructor(
northId,
logger,
baseFolder,
enabled,
retentionDuration,
) {
this.northId = northId
this.logger = logger
this.baseFolder = baseFolder
this.enabled = enabled
// Convert from hours to ms to compare with mtimeMs (file modified time in ms)
this.retentionDuration = retentionDuration * 3600000
this.archiveFolder = path.resolve(this.baseFolder, ARCHIVE_FOLDER)
this.archiveTimeout = null
}

/**
* Create folders and activate archive cleanup if needed
* @returns {Promise<void>} - The result promise
*/
async start() {
await createFolder(this.archiveFolder)
// refresh the archiveFolder at the beginning only if retentionDuration is different from 0
if (this.enabled && this.retentionDuration > 0) {
this.refreshArchiveFolder()
}
}

/**
* Stop the archive timeout and close the databases
* @return {void}
*/
async stop() {
if (this.archiveTimeout) {
clearTimeout(this.archiveTimeout)
}
}

/**
* Remove file from North connector cache and place it to archive folder if enabled.
* @param {String} filePathInCache - The file to remove
* @return {Promise<void>} - The result promise
*/
async archiveOrRemoveFile(filePathInCache) {
if (this.enabled) {
const filenameInfo = path.parse(filePathInCache)
const archivePath = path.join(this.archiveFolder, filenameInfo.base)
// Move cache file into the archive folder
try {
await fs.rename(filePathInCache, archivePath)
this.logger.debug(`File "${filePathInCache}" moved to "${archivePath}".`)
} catch (renameError) {
this.logger.error(renameError)
}
} else {
// Delete original file
try {
await fs.unlink(filePathInCache)
this.logger.debug(`File "${filePathInCache}" removed from disk.`)
} catch (unlinkError) {
this.logger.error(unlinkError)
}
}
}

/**
* Delete files in archiveFolder if they are older thant the retention time.
* @return {void}
*/
async refreshArchiveFolder() {
this.logger.debug('Parse archive folder to remove old files.')
// If a timeout already runs, clear it
if (this.archiveTimeout) {
clearTimeout(this.archiveTimeout)
}

let files = []
try {
files = await fs.readdir(this.archiveFolder)
} catch (error) {
// If the archive folder doest not exist (removed by the user for example), an error is logged
this.logger.error(error)
}
if (files.length > 0) {
const referenceDate = new Date().getTime()

// Map each file to a promise and remove files sequentially
await files.reduce((promise, file) => promise.then(
async () => this.removeFileIfTooOld(file, referenceDate, this.archiveFolder),
), Promise.resolve())
} else {
this.logger.debug(`The archive folder "${this.archiveFolder}" is empty. Nothing to delete.`)
}
this.archiveTimeout = setTimeout(this.refreshArchiveFolder.bind(this), ARCHIVE_TIMEOUT)
}

/**
* Check the modified time of a file and remove it if older than the retention duration
* @param {String} filePath - The path of the file to remove
* @param {Number} referenceDate - The reference date (in ms)
* @param {String} archiveFolder - The archive folder
* @returns {Promise<void>} - The result promise
*/
async removeFileIfTooOld(filePath, referenceDate, archiveFolder) {
let stats
try {
// If a file is being written or corrupted, the stat method can fail an error is logged
stats = await fs.stat(path.join(archiveFolder, filePath))
} catch (error) {
this.logger.error(error)
}
if (stats && stats.mtimeMs + this.retentionDuration < referenceDate) {
try {
await fs.unlink(path.join(archiveFolder, filePath))
this.logger.debug(`File "${path.join(archiveFolder, filePath)}" removed from archive.`)
} catch (unlinkError) {
this.logger.error(unlinkError)
}
}
}
}

module.exports = ArchiveService
Loading

0 comments on commit 069e0f1

Please sign in to comment.