Skip to content

Commit

Permalink
efficient Promise for StreamedConverter & its test
Browse files Browse the repository at this point in the history
  • Loading branch information
BorisTherin committed Apr 7, 2024
1 parent f1bc606 commit 778689a
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 49 deletions.
18 changes: 12 additions & 6 deletions src/convert.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,16 @@ import { StreamedConverter, regexp } from './utils'
const reg: regexp[] = [
{ from: /\\/, to: ','},
{ from: /\t/g, to: ','}
]
];

let res

( async () => {
res = await new StreamedConverter(
'./data_pipeline_tests/data/estat_proj_19np.tsv',
reg ,
false
).toFile('./data_pipeline_tests/data/estat_proj_19np_csvCleaned.csv')
console.log(res)
})()

new StreamedConverter(
'../data_pipeline_tests/data/estat_proj_19np.tsv',
reg ,
true
).toFile('../data_pipeline_tests/data/estat_proj_19np_csvCleaned.csv')
11 changes: 5 additions & 6 deletions src/dev.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const populationProjection_2019_2024_Ingester = new DataIngester.DecoderLecoGith
);

// https://ec.europa.eu/eurostat/api/dissemination/sdmx/2.1/data/demo_pjan/?format=SDMX-CSV
const pjan = new DataIngester.DecoderLecoEurostatDataIngester('demo_pjan','/?format=SDMX-CSV')
const pjan = new DataIngester.DecoderLecoEurostatDataIngester('demo_pjan','SDMX-CSV')

const runExamplePipeline = async(): Promise<pl.DataFrame> => {

Expand All @@ -29,9 +29,8 @@ const runExamplePipeline = async(): Promise<pl.DataFrame> => {
//runExamplePipeline()
//( async () => await populationProjection_2019_2024_Ingester.run() )()
( async () => {
/*
await pjan.createDir()
console.log( await pjan.download() )
*/
console.log((await pjan.run()).join("\n"))
const [ logDir, logDL ] = await pjan.run()
//console.log((await pjan.run()).join("\n"))
console.log('dir: '+logDir)
console.log('DL: '+logDL)
})()
13 changes: 7 additions & 6 deletions src/ingesters/DecoderLecoEurostatDataIngester.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import * as fs from "node:fs"
**/
export class DecoderLecoEurostatDataIngester {
static baseUrl: string = "https://ec.europa.eu/eurostat/api/dissemination/sdmx/2.1/data"
static defaultWorkdir: string = `./data_pipeline_workdir/eurostat`
// static format: String = "/?format=SDMX-CSV"

/**
Expand All @@ -22,23 +23,23 @@ export class DecoderLecoEurostatDataIngester {
throw new Error(`[DecoderLecoEurostatDataIngester] - [filePathInEurostat] second argument of constructor must not be an ampty string`);
}
this.filePathInEurostat = filePathInEurostat;
this.format = format
this.dataWorkDir = dataWorkDir || `./data_pipeline_workdir/eurostat`
this.format = '/?format='+format
this.dataWorkDir = dataWorkDir || DecoderLecoEurostatDataIngester.defaultWorkdir
}

getIngestedDataFileFolderPath(): string {
return this.dataWorkDir || `./data_pipeline_workdir/eurostat`
return this.dataWorkDir || DecoderLecoEurostatDataIngester.defaultWorkdir
}

async run(): Promise<string[]> {
const returnMsg: string[] = [
await this.createDir(),
this.createDir(),
await this.download()
]
return returnMsg
}

async createDir(): Promise<string> {
createDir(): string {
const folderToCreate = `${this.dataWorkDir}`;
let returnMsg: string = 'pending'
if (!fs.existsSync(folderToCreate)) {
Expand Down Expand Up @@ -77,7 +78,7 @@ export class DecoderLecoEurostatDataIngester {
mode: 0o666
},
)
returnMsg = `File ${this.filePathInEurostat} has succesfully been writed to ${this.dataWorkDir}/${this.filePathInEurostat}.csv`
returnMsg = `File ${this.dataWorkDir}/${this.filePathInEurostat}.csv has succesfully been writed`
} catch (err) {
returnMsg = "error while writing " + this.filePathInEurostat + ": ", err
throw new Error(`Failed to write File ${this.filePathInEurostat} to ${this.dataWorkDir}/${this.filePathInEurostat}.csv`, {
Expand Down
52 changes: 29 additions & 23 deletions src/utils/StreamedConverter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,35 +25,41 @@ export class StreamedConverter {
* @param rgx [ { from: /\\/, to: ','}, { from: /\t/g, to: ','}, ... ] (type: regexp[])
* @param verbose verbose mode (type: boolean)
*/

constructor(protected file: string, protected rgx: regexp[], protected verbose: boolean = false) {
this.file = file
this.rgx = rgx
this.verbose = verbose || false
}

toFile(dest: string) {
const readStream: fs.ReadStream = fs.createReadStream(this.file);
const writeStream: fs.WriteStream = fs.createWriteStream(dest)
let inc: number = 0
const start: number = Date.now()
try {
readStream.on('data', chunk => {
// process the data chunk
let data: string = chunk.toString()
this.rgx.forEach( (reg: regexp) => data = data.replace( reg.from, reg.to ))
writeStream.write(data)
inc++
})

readStream.on('end', () => {
if (this.verbose) console.log(
`file has been converted completely in ${dest}\n(${inc} chunks parsed in ${Date.now() - start} ms)`
)
return(`file has been converted completely in ${dest}\n(${inc} chunks parsed in ${Date.now() - start} ms)`)
})
async toFile(dest: string): Promise<string> {
return new Promise( (resolve, reject) => {

const readStream: fs.ReadStream = fs.createReadStream(this.file);
const writeStream: fs.WriteStream = fs.createWriteStream(dest)
let inc: number = 0
const start: number = Date.now()

try {
readStream.on('data', chunk => {
// process the data chunk
let data: string = chunk.toString()
this.rgx.forEach( (reg: regexp) => data = data.replace( reg.from, reg.to ))
writeStream.write(data)
inc++
})

readStream.on('end', () => {
if (this.verbose) console.log(
`File has been converted completely in ${dest}\n(${inc} chunks parsed in ${Date.now() - start} ms)`
)
resolve(`File has been converted completely in ${dest}\n(${inc} chunks parsed in ${Date.now() - start} ms)`)
})

} catch (err) {
console.log(`error: ${err}`)
}
} catch (err) {
console.log(`error: ${err}`)
reject('error: '+ err)
}
})
}
}
10 changes: 7 additions & 3 deletions tests/ingesters/DecoderLecoEurostatDataIngester.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ describe('Testing - DecoderLecoGithubDataIngester', () => {
expect(fs.existsSync(`${testDataWorkDir}/`)).toBe(true)

const result = await ingester.createDir()

// Test du retour createDir
expect(result.split(" ")[0]).toEqual('Skipped')
// Test de la presence du diretory créé
Expand Down Expand Up @@ -59,7 +60,9 @@ describe('Testing - DecoderLecoGithubDataIngester', () => {
it('createDir shall create the directory when it doesnt exist', async () => {
// Test de la presence du directory
expect(fs.existsSync(`${testDataWorkDir}/`)).toBe(false)

const result = await ingester.createDir()

// Test du retour createDir
expect(result.split(" ")[0]).toEqual('directory')
// Test de la presence du directory
Expand All @@ -84,14 +87,15 @@ describe('Testing - DecoderLecoGithubDataIngester', () => {
const result = await ingester.download()

// Test du retour de download()
expect(result.split(" ")[0]).toEqual("File")

expect(result.split(" ")[0]).toEqual("File")
// Test de la presence du fichier à ingest dans sa destination
expect(fs.existsSync(`${testDataWorkDir}/${testFilePathInEurostat}.csv`)).toBe(true)
// Test that the file isn't empty
expect(fs.readFileSync(`${testDataWorkDir}/${testFilePathInEurostat}.csv`, { encoding: 'utf8', flag: 'r' }).length == 0 ).toBe(false)
// Test that the file content is not the text "404 error"
expect(fs.readFileSync(`${testDataWorkDir}/${testFilePathInEurostat}.csv`, { encoding: 'utf8', flag: 'r' }) == "404 error" ).toBe(false)
}, 500000)
},
// large timeout for large files
500000)
})
})
11 changes: 6 additions & 5 deletions tests/utils/StreamedConverter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,19 @@ describe('Test de la convertion en mode stream', () => {

expect(fs.existsSync(destfile)).toBe(false)

new StreamedConverter(
const res = await new StreamedConverter(
sourcefile,
[
{ from: /\\/, to: ','},
{ from: /\t/g, to: ','}
],
false
).toFile(destfile)

// Test du retour de la methode toFile
expect(res.split(" ")[0]).toEqual("File")
// Test de l'existence du fichier converti
expect(fs.existsSync(destfile)).toBe(true)

process.nextTick(
() => { expect(fs.existsSync(destfile)).toBe(true) }
)
expect.assertions(1)
})
})

0 comments on commit 778689a

Please sign in to comment.