Skip to content

Commit

Permalink
fix(north): fix file cache stat eperm error
Browse files Browse the repository at this point in the history
  • Loading branch information
burgerni10 authored and Nicolas Burger committed Oct 25, 2022
1 parent c7cd651 commit 91ac4fc
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 32 deletions.
88 changes: 59 additions & 29 deletions src/engine/cache/file-cache.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,28 @@ class FileCache extends BaseCache {
await createFolder(this.fileFolder)
await createFolder(this.errorFolder)

const files = await fs.readdir(this.fileFolder)
if (files.length > 0) {
this.logger.debug(`${files.length} files in cache.`)
} else {
this.logger.debug('No files in cache.')
try {
const files = await fs.readdir(this.fileFolder)
if (files.length > 0) {
this.logger.debug(`${files.length} files in cache.`)
} else {
this.logger.debug('No files in cache.')
}
} catch (error) {
// If the folder does not exist, an error is logged but OIBus keep going to check errors and archive folders
this.logger.error(error)
}

const errorFiles = await fs.readdir(this.errorFolder)
if (errorFiles.length > 0) {
this.logger.warn(`${errorFiles.length} files in error cache.`)
} else {
this.logger.debug('No error files in cache.')
try {
const errorFiles = await fs.readdir(this.errorFolder)
if (errorFiles.length > 0) {
this.logger.warn(`${errorFiles.length} files in error cache.`)
} else {
this.logger.debug('No error files in cache.')
}
} catch (error) {
// If the folder does not exist, an error is logged but not thrown if the file cache folder is accessible
this.logger.error(error)
}

if (this.archiveFiles) {
Expand Down Expand Up @@ -122,24 +132,25 @@ class FileCache extends BaseCache {
return null
}

const sortedFiles = fileNames
.map(async (fileName) => {
// Retrieve file state to retrieve the oldest one from the cache
let fileState = null
const fileStats = []
// Get file stats one after the other
await fileNames.reduce((promise, fileName) => promise.then(
async () => {
try {
// Error triggered when a file is being written (from a South)
fileState = await fs.stat(path.resolve(this.fileFolder, fileName))
const stat = await fs.stat(path.resolve(this.fileFolder, fileName))
fileStats.push({
path: path.resolve(this.fileFolder, fileName),
timestamp: stat.mtime.getTime(),
})
} catch (error) {
// If a file is being written or corrupted, the stat method can fail
// An error is logged and the cache goes through the other files
this.logger.error(error)
}
return {
path: path.resolve(this.fileFolder, fileName),
timestamp: fileState ? fileState.mtime.getTime() : null,
}
})
// filter out files that are not completely written
.filter((file) => file.timestamp !== null)
.sort((a, b) => a.timestamp - b.timestamp)
},
), Promise.resolve())

const sortedFiles = fileStats.sort((a, b) => a.timestamp - b.timestamp)

return sortedFiles[0]
}
Expand Down Expand Up @@ -195,7 +206,13 @@ class FileCache extends BaseCache {
* @returns {Boolean} - Cache empty or not
*/
async isEmpty() {
const files = await fs.readdir(this.fileFolder)
let files = []
try {
files = await fs.readdir(this.fileFolder)
} catch (error) {
// Log an error if the folder does not exist (removed by the user while OIBus is running for example)
this.logger.error(error)
}
return files.length === 0
}

Expand All @@ -210,9 +227,16 @@ class FileCache extends BaseCache {
clearTimeout(this.archiveTimeout)
}

const files = await fs.readdir(this.archiveFolder)
const referenceDate = new Date().getTime()
let files = []
try {
files = await fs.readdir(this.archiveFolder)
} catch (error) {
// If the archive folder doest not exist (removed by the user for example), an error is logged
this.logger.error(error)
}
if (files.length > 0) {
const referenceDate = new Date().getTime()

// Map each file to a promise and remove files sequentially
await files.reduce((promise, file) => promise.then(
async () => this.removeFileIfTooOld(file, referenceDate, this.archiveFolder),
Expand All @@ -231,8 +255,14 @@ class FileCache extends BaseCache {
* @returns {Promise<void>} - The result promise
*/
async removeFileIfTooOld(filePath, referenceDate, archiveFolder) {
const stats = await fs.stat(path.join(archiveFolder, filePath))
if (stats.mtimeMs + this.retentionDuration < referenceDate) {
let stats
try {
// If a file is being written or corrupted, the stat method can fail an error is logged
stats = await fs.stat(path.join(archiveFolder, filePath))
} catch (error) {
this.logger.error(error)
}
if (stats && stats.mtimeMs + this.retentionDuration < referenceDate) {
try {
await fs.unlink(path.join(archiveFolder, filePath))
this.logger.debug(`File "${path.join(archiveFolder, filePath)}" removed from archive.`)
Expand Down
89 changes: 88 additions & 1 deletion src/engine/cache/file-cache.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,20 @@ describe('FileCache', () => {
expect(cache.refreshArchiveFolder).not.toHaveBeenCalled()
})

it('should be properly initialized with a readdir error', async () => {
cache.refreshArchiveFolder = jest.fn()
fs.readdir.mockImplementation(() => {
throw new Error('readdir error')
})
cache.retentionDuration = 0
await cache.init()

expect(logger.error).toHaveBeenCalledWith(new Error('readdir error'))
expect(logger.error).toHaveBeenCalledTimes(2)

expect(cache.refreshArchiveFolder).not.toHaveBeenCalled()
})

it('should be properly initialized without archive files', async () => {
cache.archiveFiles = false
cache.refreshArchiveFolder = jest.fn()
Expand Down Expand Up @@ -93,6 +107,21 @@ describe('FileCache', () => {
expect(logger.debug).toHaveBeenCalledWith(`File "myFile.csv" cached in "${path.resolve('myCacheFolder', 'files', 'myFile-1580608922222.csv')}".`)
})

it('should properly managed cache file error', async () => {
fs.copyFile.mockImplementation(() => {
throw new Error('copy file')
})

let error
try {
await cache.cacheFile('myFile.csv')
} catch (copyError) {
error = copyError
}

expect(error).toEqual(new Error('copy file'))
})

it('should properly retrieve file from cache', async () => {
fs.readdir.mockImplementationOnce(() => []).mockImplementation(() => ['myFile1', 'myFile2'])
fs.stat.mockImplementation(() => ({ mtime: new Date() }))
Expand All @@ -107,6 +136,33 @@ describe('FileCache', () => {
})
})

it('should properly manage error when retrieving file from cache', async () => {
fs.readdir.mockImplementation(() => {
throw new Error('readdir error')
})

const noFile = await cache.retrieveFileFromCache()
expect(noFile).toEqual(null)
expect(logger.error).toHaveBeenCalledWith(new Error('readdir error'))
})

it('should properly retrieve file from cache and manage file error', async () => {
fs.readdir.mockImplementation(() => ['myFile1', 'myFile2', 'myFile3'])
fs.stat.mockImplementationOnce(() => ({ mtime: new Date() }))
.mockImplementationOnce(() => {
throw new Error('stat error')
})
.mockImplementationOnce(() => ({ mtime: new Date('2010-02-01T02:02:02.222Z') }))

const oneFile = await cache.retrieveFileFromCache()
expect(oneFile).toEqual({
path: path.resolve(cache.fileFolder, 'myFile3'),
timestamp: 1264989722222,
})
expect(logger.error).toHaveBeenCalledTimes(1)
expect(logger.error).toHaveBeenCalledWith(new Error('stat error'))
})

it('should properly manage error files', async () => {
fs.rename.mockImplementationOnce(() => {}).mockImplementationOnce(() => {
throw new Error('rename error')
Expand Down Expand Up @@ -144,11 +200,20 @@ describe('FileCache', () => {
})

it('should check if cache is empty', async () => {
fs.readdir.mockImplementationOnce(() => []).mockImplementation(() => ['myFile1', 'myFile2'])
fs.readdir.mockImplementationOnce(() => [])
.mockImplementationOnce(() => ['myFile1', 'myFile2'])
.mockImplementationOnce(() => {
throw new Error('readdir error')
})
const empty = await cache.isEmpty()
expect(empty).toBeTruthy()
const notEmpty = await cache.isEmpty()
expect(notEmpty).toBeFalsy()

const emptyBecauseOfError = await cache.isEmpty()
expect(emptyBecauseOfError).toBeTruthy()
expect(logger.error).toHaveBeenCalledTimes(1)
expect(logger.error).toHaveBeenCalledWith(new Error('readdir error'))
})

it('should refresh archive folder', async () => {
Expand All @@ -168,6 +233,17 @@ describe('FileCache', () => {
expect(cache.removeFileIfTooOld).toHaveBeenCalledTimes(2)
})

it('should manage archive folder readdir error', async () => {
fs.readdir.mockImplementationOnce(() => {
throw new Error('readdir error')
})
cache.removeFileIfTooOld = jest.fn()

await cache.refreshArchiveFolder()
expect(cache.removeFileIfTooOld).not.toHaveBeenCalled()
expect(logger.error).toHaveBeenCalledWith(new Error('readdir error'))
})

it('should remove file if too old', async () => {
fs.stat.mockImplementationOnce(() => ({ mtimeMs: new Date('2020-02-01T02:02:02.222Z').getTime() }))
.mockImplementationOnce(() => ({ mtimeMs: new Date('2020-02-02T02:02:01.222Z').getTime() }))
Expand All @@ -189,4 +265,15 @@ describe('FileCache', () => {
await cache.removeFileIfTooOld('myOldFile.csv', new Date(), 'archiveFolder')
expect(logger.error).toHaveBeenCalledWith(new Error('unlink error'))
})

it('should log an error if a problem occur accessing the file', async () => {
fs.stat.mockImplementationOnce(() => {
throw new Error('stat error')
})

await cache.removeFileIfTooOld('myOldFile.csv', new Date(), 'archiveFolder')
expect(fs.unlink).not.toHaveBeenCalled()
expect(logger.error).toHaveBeenCalledWith(new Error('stat error'))
expect(logger.error).toHaveBeenCalledTimes(1)
})
})
4 changes: 2 additions & 2 deletions src/north/north-connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ class NorthConnector {

if (this.valuesRetryCount < this.caching.retryCount || this.shouldRetry(error)) {
this.valuesRetryCount += 1
this.logger.debug(`Retrying in ${this.caching.retryInterval}. Retry count: ${this.valuesRetryCount}`)
this.logger.debug(`Retrying in ${this.caching.retryInterval} ms. Retry count: ${this.valuesRetryCount}`)
} else {
this.logger.debug('Too many retries. Moving values to error cache...')
this.valueCache.manageErroredValues(values)
Expand Down Expand Up @@ -268,7 +268,7 @@ class NorthConnector {
this.logger.error(error)
if (this.filesRetryCount < this.caching.retryCount || this.shouldRetry(error)) {
this.filesRetryCount += 1
this.logger.debug(`Retrying in ${this.caching.retryInterval}. Retry count: ${this.filesRetryCount}`)
this.logger.debug(`Retrying in ${this.caching.retryInterval} ms. Retry count: ${this.filesRetryCount}`)
} else {
this.logger.debug('Too many retries. Moving file to error cache...')
await this.fileCache.manageErroredFiles(fileToSend.path)
Expand Down

0 comments on commit 91ac4fc

Please sign in to comment.