Skip to content

Commit

Permalink
added eurostat data ingester + sdmx to csv converter utils
Browse files Browse the repository at this point in the history
  • Loading branch information
Jean-Baptiste-Lasselle committed Apr 7, 2024
1 parent 96e1690 commit ab3e508
Show file tree
Hide file tree
Showing 9 changed files with 395 additions and 5 deletions.
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"dev:docs:astro": "pnpm run gen:api-docs:astro && cd documentation/astro/ && pnpm run dev",
"dev:docs:html": "pnpm run gen:api-docs && serve docs/",
"dev:docs": "pnpm run gen:api-docs && pnpm exec serve docs/",
"test": "jest",
"test": "jest --silent=false",
"test:e2e": "echo 'Run e2e Tests'",
"build": "pnpm exec tsc",
"start": "node dist/dev.js",
Expand All @@ -55,7 +55,8 @@
"dependencies": {
"commander": "^12.0.0",
"fs": "^0.0.1-security",
"nodejs-polars": "^0.10.0"
"nodejs-polars": "^0.10.0",
"redaxios": "^0.5.1"
},
"devDependencies": {
"@types/htmlparser2": "^3.10.7",
Expand Down
7 changes: 7 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

135 changes: 135 additions & 0 deletions src/ingesters/DecoderLecoEurostatDataIngester.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import * as fs from "node:fs"
import axios from 'redaxios';
// import fs from 'fs/promises';
/**
 * Download formats supported by the Eurostat SDMX 2.1 dissemination API
 * (used as the `format` query parameter of the download URL).
 */
export enum EurostatDatasetFormat {
// SDMX-CSV: the comma-separated SDMX dissemination format
SDMX_CSV = "SDMX-CSV",
// Tab-separated values
TSV = "TSV"
}
/**
 * Ingests CSV data originating from the GitHub repository https://github.com/decoderleco/deces_europe
 *
 * - Downloads the CSV dataset file,
 * - Sends the CSV dataset file to Kafka (publishes to a Kafka topic),
 * - Persists the CSV dataset file to an S3 bucket (using the local filesystem is much too uncomfortable; an S3 bucket is far better),
 * - The S3 bucket (containing the CSV dataset file) is added on a git branch in LakeFS.
 *
 **/
export class DecoderLecoEurostatDataIngester {
    /** Root of the Eurostat SDMX 2.1 dissemination API. */
    static baseUrl: string = "https://ec.europa.eu/eurostat/api/dissemination/sdmx/2.1/data"

    /**
     * @param dataset_name   Eurostat dataset identifier (e.g. "demo_pjan")
     * @param desiredFormat  download format; defaults to {@link EurostatDatasetFormat.SDMX_CSV}
     * @param dataWorkDir    local working directory; defaults to "./data_pipeline_workdir"
     */
    constructor(protected dataset_name: string, protected desiredFormat?: EurostatDatasetFormat, protected dataWorkDir?: string) {
        this.dataset_name = dataset_name;
        // '??' (not '||') so only null/undefined fall back to the defaults
        this.desiredFormat = desiredFormat ?? EurostatDatasetFormat.SDMX_CSV;
        this.dataWorkDir = dataWorkDir ?? `./data_pipeline_workdir`
    }

    /** Runs the full ingestion: ensure the work dir exists, then download the dataset. */
    async run() {
        this.createDir()
        await this.download()
    }

    /** @returns the folder (under the work dir) where this dataset's file is persisted */
    getIngestedDataFileFolderPath(): string {
        return `${this.dataWorkDir}/${this.dataset_name}`;
    }

    /**
     * @returns the full path of the persisted dataset file; file extension depends
     *          on the desired format. Returns an empty string for an unknown format
     *          (kept for backward compatibility with the original behavior).
     */
    getIngestedDataFilePath(): string {
        switch (this.desiredFormat) {
            case EurostatDatasetFormat.SDMX_CSV:
                return `${this.getIngestedDataFileFolderPath()}/${this.dataset_name}.sdmx.csv`;
            case EurostatDatasetFormat.TSV:
                return `${this.getIngestedDataFileFolderPath()}/${this.dataset_name}.tsv`;
            default:
                return ``;
        }
    }

    /** @returns the Eurostat dataset identifier */
    public getDatasetName(): string {
        return this.dataset_name
    }

    /** @returns the effective download format (defaults to SDMX-CSV) */
    public getFormat(): EurostatDatasetFormat {
        return this.desiredFormat || EurostatDatasetFormat.SDMX_CSV
    }

    /** @returns the full Eurostat API download URL for this dataset and format */
    public getDatasetDownloadUrl(): string {
        return `${DecoderLecoEurostatDataIngester.baseUrl}/${this.dataset_name}/?format=${this.desiredFormat}`;
    }

    /**
     * Creates the dataset's work folder (recursively) if it does not exist yet.
     * @throws Error (with cause) if the folder cannot be created
     */
    public createDir(): void {
        const folderToCreate = this.getIngestedDataFileFolderPath();
        if (!fs.existsSync(folderToCreate)) {
            try {
                fs.mkdirSync(folderToCreate, { recursive: true })
                console.log(`directory ${folderToCreate} created`)
            } catch (error) {
                throw new Error(`Failed to create the [${folderToCreate}] folder.`, { cause: error })
            }
        } else {
            console.info(`Skipped creating the [${folderToCreate}] folder, because it already exists.`)
        }
    }

    /**
     * Downloads the dataset from the Eurostat API and persists it to
     * {@link getIngestedDataFilePath}.
     * @throws Error (with cause) if the HTTP fetch fails or returns a non-2xx
     *         status, or if the file cannot be written
     */
    async download() {
        // Single source of truth for the URL (was previously rebuilt inline)
        const datasetDownloadUrl = this.getDatasetDownloadUrl();

        let fileData: Buffer;
        try {
            const response = await axios.get(datasetDownloadUrl, {
                // redaxios maps this to fetch's Response.arrayBuffer()
                responseType: "arrayBuffer"
            });
            // Validate the status BEFORE consuming the body into a Buffer
            if (response.status != 200 && response.status != 201) {
                throw new Error(`HTTP - ${response.statusText} - ${response.status} - An Error occured while fetching [${datasetDownloadUrl}]`)
            }
            fileData = Buffer.from(response.data);
        } catch (err) {
            console.error(err);
            throw new Error(`An Error occured while fetching [${this.dataset_name}] Eurostat Dataset dataset [${datasetDownloadUrl}]`, {
                cause: err
            })
        }

        const filePath = this.getIngestedDataFilePath();
        try {
            fs.writeFileSync(filePath, fileData.toString(),
                {
                    encoding: 'utf8',
                    flag: 'w',
                    mode: 0o666
                },
            )
            console.log(`[${this.dataset_name}] dataset has succesfully been downloaded from eurostat [${datasetDownloadUrl}], and persisted to File [${filePath}]`)
        } catch (err) {
            throw new Error(`Failed to persist [${this.dataset_name}] Eurostat dataset to [${filePath}]`, {
                cause: err
            })
        }
    }
}
1 change: 1 addition & 0 deletions src/ingesters/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from './DecoderLecoGithubDataIngester'
export * from './DecoderLecoEurostatDataIngester'
16 changes: 16 additions & 0 deletions src/utils/SdmxToCsvConverter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/**
 * Utility to convert SDMX-CSV formatted data, to CSV formatted data.
 */
export class SdmxToCsvConverter {
    /**
     * Converts SDMX-CSV formatted (string) data, to CSV formatted (string) data:
     * drops every tab-backslash pair, drops any remaining backslashes, then
     * turns the remaining tab separators into commas.
     * @param sdmxData the SDMX-CSV formatted (string) data to convert
     * @returns the CSV formatted (string) data
     */
    public static async convert(sdmxData: string): Promise<string> {
        return sdmxData
            .replace(/\t\\/g, ``)
            .replace(/\\/g, ``)
            .replace(/\t/g, `,`);
    }
}
3 changes: 2 additions & 1 deletion src/utils/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from './PolarsDataFramesUtils'
export * from './PolarsDataFramesUtils'
export * from './SdmxToCsvConverter'
106 changes: 106 additions & 0 deletions tests/ingesters/DecoderLecoEurostatDataIngester.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import * as ingesters from "../../src/ingesters"
import * as fs from 'node:fs'

const SECONDS = 1000;
const testDataWorkDir = "data_pipeline_tests"
const ingester =
    new ingesters.DecoderLecoEurostatDataIngester(`demo_pjan`, ingesters.EurostatDatasetFormat.SDMX_CSV, testDataWorkDir)

describe('Testing - DecoderLecoEurostatDataIngester', () => {

    describe('Test when the directory already exist', () => {
        // Pre-create the directory so createDir() hits its "already exists" branch
        beforeEach(() => {
            try {
                fs.mkdirSync(ingester.getIngestedDataFileFolderPath(), { recursive: true })
            } catch (err) {
                console.log(`in beforeEach creation of [${ingester.getIngestedDataFileFolderPath()}] failed`, err)
            }
        })

        // Clean up the directory after each test
        afterEach(() => {
            try {
                fs.rmSync(ingester.getIngestedDataFileFolderPath(), { recursive: true })
            } catch (err) {
                console.log(`In afterEach deletion of [${ingester.getIngestedDataFileFolderPath()}] failed`, err)
            }
        })

        it('createDir shall not create the directory when it already exist', () => {
            // The directory must exist before the call
            expect(fs.existsSync(ingester.getIngestedDataFileFolderPath())).toBe(true)
            ingester.createDir()
            // ...and must still exist (untouched) after the call
            expect(fs.existsSync(ingester.getIngestedDataFileFolderPath())).toBe(true)
        })
    })

    describe('Test when the directory doesnt exist', () => {
        // Make sure the directory is absent before each test
        beforeEach(() => {
            try {
                if (fs.existsSync(ingester.getIngestedDataFileFolderPath())) {
                    fs.rmSync(ingester.getIngestedDataFileFolderPath(), { recursive: true })
                }
            } catch (err) {
                console.log(`In beforeEach deletion of [${ingester.getIngestedDataFileFolderPath()}] failed`, err)
            }
        })

        // Clean up any directory the test created
        afterEach(() => {
            try {
                if (fs.existsSync(ingester.getIngestedDataFileFolderPath())) {
                    fs.rmSync(ingester.getIngestedDataFileFolderPath(), { recursive: true })
                }
            } catch (err) {
                console.log(`In afterEach deletion of [${ingester.getIngestedDataFileFolderPath()}] failed`, err)
            }
        })

        it('createDir shall create the directory when it doesnt exist', () => {
            // The directory must be absent before the call
            expect(fs.existsSync(ingester.getIngestedDataFileFolderPath())).toBe(false)
            ingester.createDir()
            // ...and must exist after the call
            expect(fs.existsSync(ingester.getIngestedDataFileFolderPath())).toBe(true)
        })
    })

    describe('Tests about the downloaded file', () => {
        // The following tests need the local directory to be available
        beforeEach(() => {
            ingester.createDir()
        })

        // Clean up the local directory
        afterEach(() => {
            try {
                fs.rmSync(ingester.getIngestedDataFileFolderPath(), { recursive: true })
            } catch (err) {
                console.log(`in afterEach deletion of [${ingester.getIngestedDataFilePath()}] failed`, err)
            }
        })

        it('Test if the file was downloaded', async () => {
            await ingester.download()
            // The ingested file must exist at its destination
            expect(fs.existsSync(ingester.getIngestedDataFilePath())).toBe(true)
            // The file must not be empty
            expect(fs.readFileSync(ingester.getIngestedDataFilePath(), { encoding: 'utf8', flag: 'r' }).length == 0).toBe(false)
            // The file must not contain "404 error" as text
            expect(fs.readFileSync(ingester.getIngestedDataFilePath(), { encoding: 'utf8', flag: 'r' }) == "404 error").toBe(false)
        }, 600 * SECONDS) // timeout for this specific test, raised from the default 5 seconds to 600 seconds (real network download)
    })
})
4 changes: 2 additions & 2 deletions tests/ingesters/DecoderLecoGithubDataIngester.test.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import * as projection from "../../src/ingesters"
import * as ingesters from "../../src/ingesters"
import * as fs from 'node:fs'
// ${testDataWorkDir}/${testFilePathInRepo}
// ${testDataWorkDir}/${ingester.getIngestedDataFileFolderPath()}
const testFilePathInRepo = "data/csv/deces_ireland.csv"
const testDataWorkDir = "data_pipeline_tests"
const ingester =
new projection.DecoderLecoGithubDataIngester("main", testFilePathInRepo, testDataWorkDir)
new ingesters.DecoderLecoGithubDataIngester("main", testFilePathInRepo, testDataWorkDir)

describe('Testing - DecoderLecoGithubDataIngester', () => {

Expand Down
Loading

0 comments on commit ab3e508

Please sign in to comment.