diff --git a/src/convert.ts b/src/convert.ts index 48d6ef1..bdb347d 100644 --- a/src/convert.ts +++ b/src/convert.ts @@ -8,10 +8,16 @@ import { StreamedConverter, regexp } from './utils' const reg: regexp[] = [ { from: /\\/, to: ','}, { from: /\t/g, to: ','} -] +]; + +let res + +( async () => { + res = await new StreamedConverter( + './data_pipeline_tests/data/estat_proj_19np.tsv', + reg , + false + ).toFile('./data_pipeline_tests/data/estat_proj_19np_csvCleaned.csv') + console.log(res) +})() -new StreamedConverter( - '../data_pipeline_tests/data/estat_proj_19np.tsv', - reg , - true -).toFile('../data_pipeline_tests/data/estat_proj_19np_csvCleaned.csv') \ No newline at end of file diff --git a/src/dev.ts b/src/dev.ts index dd2b777..4d37de4 100644 --- a/src/dev.ts +++ b/src/dev.ts @@ -9,7 +9,7 @@ const populationProjection_2019_2024_Ingester = new DataIngester.DecoderLecoGith ); // https://ec.europa.eu/eurostat/api/dissemination/sdmx/2.1/data/demo_pjan/?format=SDMX-CSV -const pjan = new DataIngester.DecoderLecoEurostatDataIngester('demo_pjan','/?format=SDMX-CSV') +const pjan = new DataIngester.DecoderLecoEurostatDataIngester('demo_pjan','SDMX-CSV') const runExamplePipeline = async(): Promise => { @@ -29,9 +29,8 @@ const runExamplePipeline = async(): Promise => { //runExamplePipeline() //( async () => await populationProjection_2019_2024_Ingester.run() )() ( async () => { - /* - await pjan.createDir() - console.log( await pjan.download() ) - */ - console.log((await pjan.run()).join("\n")) + const [ logDir, logDL ] = await pjan.run() + //console.log((await pjan.run()).join("\n")) + console.log('dir: '+logDir) + console.log('DL: '+logDL) })() \ No newline at end of file diff --git a/src/ingesters/DecoderLecoEurostatDataIngester.ts b/src/ingesters/DecoderLecoEurostatDataIngester.ts index c94d557..ea44936 100644 --- a/src/ingesters/DecoderLecoEurostatDataIngester.ts +++ b/src/ingesters/DecoderLecoEurostatDataIngester.ts @@ -10,6 +10,7 @@ import * as fs from "node:fs" **/ export class DecoderLecoEurostatDataIngester { static baseUrl: string = "https://ec.europa.eu/eurostat/api/dissemination/sdmx/2.1/data" + static defaultWorkdir: string = `./data_pipeline_workdir/eurostat` // static format: String = "/?format=SDMX-CSV" /** @@ -22,23 +23,23 @@ export class DecoderLecoEurostatDataIngester { throw new Error(`[DecoderLecoEurostatDataIngester] - [filePathInEurostat] second argument of constructor must not be an ampty string`); } this.filePathInEurostat = filePathInEurostat; - this.format = format - this.dataWorkDir = dataWorkDir || `./data_pipeline_workdir/eurostat` + this.format = '/?format='+format + this.dataWorkDir = dataWorkDir || DecoderLecoEurostatDataIngester.defaultWorkdir } getIngestedDataFileFolderPath(): string { - return this.dataWorkDir || `./data_pipeline_workdir/eurostat` + return this.dataWorkDir || DecoderLecoEurostatDataIngester.defaultWorkdir } async run(): Promise { const returnMsg: string[] = [ - await this.createDir(), + this.createDir(), await this.download() ] return returnMsg } - async createDir(): Promise { + createDir(): string { const folderToCreate = `${this.dataWorkDir}`; let returnMsg: string = 'pending' if (!fs.existsSync(folderToCreate)) { @@ -77,7 +78,7 @@ export class DecoderLecoEurostatDataIngester { mode: 0o666 }, ) - returnMsg = `File ${this.filePathInEurostat} has succesfully been writed to ${this.dataWorkDir}/${this.filePathInEurostat}.csv` + returnMsg = `File ${this.dataWorkDir}/${this.filePathInEurostat}.csv has succesfully been writed` } catch (err) { returnMsg = "error while writing " + this.filePathInEurostat + ": ", err throw new Error(`Failed to write File ${this.filePathInEurostat} to ${this.dataWorkDir}/${this.filePathInEurostat}.csv`, { diff --git a/src/utils/StreamedConverter.ts b/src/utils/StreamedConverter.ts index cc72f1a..15b3014 100644 --- a/src/utils/StreamedConverter.ts +++ b/src/utils/StreamedConverter.ts @@ -25,35 +25,41 @@ export class StreamedConverter { * @param rgx [ { from: /\\/, to: ','}, { from: /\t/g, to: ','}, ... ] (type: regexp[]) * @param verbose verbose mode (type: boolean) */ + constructor(protected file: string, protected rgx: regexp[], protected verbose: boolean = false) { this.file = file this.rgx = rgx this.verbose = verbose || false } - toFile(dest: string) { - const readStream: fs.ReadStream = fs.createReadStream(this.file); - const writeStream: fs.WriteStream = fs.createWriteStream(dest) - let inc: number = 0 - const start: number = Date.now() - try { - readStream.on('data', chunk => { - // process the data chunk - let data: string = chunk.toString() - this.rgx.forEach( (reg: regexp) => data = data.replace( reg.from, reg.to )) - writeStream.write(data) - inc++ - }) - - readStream.on('end', () => { - if (this.verbose) console.log( - `file has been converted completely in ${dest}\n(${inc} chunks parsed in ${Date.now() - start} ms)` - ) - return(`file has been converted completely in ${dest}\n(${inc} chunks parsed in ${Date.now() - start} ms)`) - }) + async toFile(dest: string): Promise { + return new Promise( (resolve, reject) => { + + const readStream: fs.ReadStream = fs.createReadStream(this.file); + const writeStream: fs.WriteStream = fs.createWriteStream(dest) + let inc: number = 0 + const start: number = Date.now() + + try { + readStream.on('data', chunk => { + // process the data chunk + let data: string = chunk.toString() + this.rgx.forEach( (reg: regexp) => data = data.replace( reg.from, reg.to )) + writeStream.write(data) + inc++ + }) + + readStream.on('end', () => { + if (this.verbose) console.log( + `File has been converted completely in ${dest}\n(${inc} chunks parsed in ${Date.now() - start} ms)` + ) + resolve(`File has been converted completely in ${dest}\n(${inc} chunks parsed in ${Date.now() - start} ms)`) + }) - } catch (err) { - console.log(`error: ${err}`) - } + } catch (err) { + console.log(`error: ${err}`) + reject('error: '+ err) + } + }) } } \ No newline at end of file diff --git a/tests/ingesters/DecoderLecoEurostatDataIngester.test.ts b/tests/ingesters/DecoderLecoEurostatDataIngester.test.ts index 4b0f1ff..a5ebe8c 100644 --- a/tests/ingesters/DecoderLecoEurostatDataIngester.test.ts +++ b/tests/ingesters/DecoderLecoEurostatDataIngester.test.ts @@ -30,6 +30,7 @@ describe('Testing - DecoderLecoGithubDataIngester', () => { expect(fs.existsSync(`${testDataWorkDir}/`)).toBe(true) const result = await ingester.createDir() + // Test du retour createDir expect(result.split(" ")[0]).toEqual('Skipped') // Test de la presence du diretory créé @@ -59,7 +60,9 @@ describe('Testing - DecoderLecoGithubDataIngester', () => { it('createDir shall create the directory when it doesnt exist', async () => { // Test de la presence du directory expect(fs.existsSync(`${testDataWorkDir}/`)).toBe(false) + const result = await ingester.createDir() + // Test du retour createDir expect(result.split(" ")[0]).toEqual('directory') // Test de la presence du directory @@ -84,14 +87,15 @@ describe('Testing - DecoderLecoGithubDataIngester', () => { const result = await ingester.download() // Test du retour de download() - expect(result.split(" ")[0]).toEqual("File") - + expect(result.split(" ")[0]).toEqual("File") // Test de la presence du fichier à ingest dans sa destination expect(fs.existsSync(`${testDataWorkDir}/${testFilePathInEurostat}.csv`)).toBe(true) // Test if the file isnt empty expect(fs.readFileSync(`${testDataWorkDir}/${testFilePathInEurostat}.csv`, { encoding: 'utf8', flag: 'r' }).length == 0 ).toBe(false) // Test if the file containt "404 error" as text expect(fs.readFileSync(`${testDataWorkDir}/${testFilePathInEurostat}.csv`, { encoding: 'utf8', flag: 'r' }) == "404 error" ).toBe(false) - }, 500000) + }, + // large timeout for large files + 500000) }) }) \ No newline at end of file diff --git a/tests/utils/StreamedConverter.test.ts b/tests/utils/StreamedConverter.test.ts index a39acd1..76f60ee 100644 --- a/tests/utils/StreamedConverter.test.ts +++ b/tests/utils/StreamedConverter.test.ts @@ -25,7 +25,7 @@ describe('Test de la convertion en mode stream', () => { expect(fs.existsSync(destfile)).toBe(false) - new StreamedConverter( + const res = await new StreamedConverter( sourcefile, [ { from: /\\/, to: ','}, @@ -33,10 +33,11 @@ describe('Test de la convertion en mode stream', () => { ], false ).toFile(destfile) + + // Test du retour de la methode toFile + expect(res.split(" ")[0]).toEqual("File") + // Test de l'existence du fichier converti + expect(fs.existsSync(destfile)).toBe(true) - process.nextTick( - () => { expect(fs.existsSync(destfile)).toBe(true) } - ) - expect.assertions(1) }) }) \ No newline at end of file