Skip to content

Commit

Permalink
creation zip function
Browse files Browse the repository at this point in the history
  • Loading branch information
JulienMattiussi committed Oct 23, 2023
1 parent 364922d commit 80c4dcd
Showing 1 changed file with 67 additions and 6 deletions.
73 changes: 67 additions & 6 deletions src/api/services/precomputed/precomputed.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
// TODO: the precomputing task will be implemented in the next card.

import ezs from '@ezs/core';
//import ezs from '@ezs/core';
import progress from '../progress';
import localConfig from '../../../../config.json';
import tar from 'tar-stream';
import fs from 'fs';
import { pipeline } from 'stream';
import { promisify } from 'util';
import { createGzip } from 'zlib';

import from from 'from';
//import from from 'from';
import {
PENDING as PRECOMPUTED_PENDING,
IN_PROGRESS,
Expand All @@ -19,7 +23,7 @@ import getLogger from '../logger';

// Number of dataset entries loaded per batch; defaults to 10 when
// `precomputedBatchSize` is not set in config.json.
const { precomputedBatchSize: BATCH_SIZE = 10 } = localConfig;

const getSourceData = async (ctx, sourceColumns) => {
/*const getSourceData = async (ctx, sourceColumns) => {
const excerptLines = await ctx.dataset.getExcerpt(
sourceColumns
? {
Expand All @@ -37,7 +41,7 @@ const getSourceData = async (ctx, sourceColumns) => {
} catch {
return sourceData;
}
};
};*/

export const getPrecomputedDataPreview = async ctx => {
const { sourceColumns } = ctx.request.body;
Expand Down Expand Up @@ -71,7 +75,7 @@ export const getPrecomputedDataPreview = async ctx => {
return result;
};

// Compiles an ezs rule script and returns the result of calling `.get()` on it.
const createEzsRuleCommands = rule => ezs.compileScript(rule).get();
//const createEzsRuleCommands = rule => ezs.compileScript(rule).get();

export const getSourceError = error => {
const sourceError = error?.sourceError;
Expand All @@ -81,7 +85,7 @@ export const getSourceError = error => {
return error;
};

function preformat(data, feed) {
/*function preformat(data, feed) {
if (this.isLast()) {
return feed.close();
}
Expand Down Expand Up @@ -130,6 +134,60 @@ const processEzsEnrichment = (entries, commands, ctx, preview = false) => {
.on('end', () => resolve(values))
.on('error', error => reject(error));
});
};*/

/**
 * Packs every dataset entry into a tar archive, gzips it and writes it to
 * `./webservice_temp/__entry_<tenant>_<timestamp>.tar.gz`.
 *
 * Archive layout:
 *   - `data/f<10-digit entry index>.json`: one JSON file per dataset entry
 *   - `manifest.json`: dates, item count, processing time, version, generator
 *
 * @param {object} precomputed - Precomputed description (currently unused;
 *     kept so the caller's signature does not change).
 * @param {object} ctx - Context; reads `ctx.dataset`, `ctx.job` and `ctx.tenant`.
 * @returns {Promise<object>} The tar pack stream, already fully written to disk.
 * @throws {CancelWorkerError} When `ctx.job` is missing or no longer active.
 */
const processZippedData = async (precomputed, ctx) => {
    const initDate = new Date();
    const pack = tar.pack();

    const dataSetSize = await ctx.dataset.count();
    for (
        let indexDataset = 0;
        indexDataset < dataSetSize;
        indexDataset += BATCH_SIZE
    ) {
        // Checked once per batch so a cancellation interrupts the export.
        if (!(await ctx.job?.isActive())) {
            throw new CancelWorkerError('Job has been canceled');
        }
        const entries = await ctx.dataset
            .find()
            .skip(indexDataset)
            .limit(BATCH_SIZE)
            .toArray();

        // tar-stream's entry()/finalize() do not return promises, so there is
        // nothing to await; entries are buffered in the pack until it is piped.
        entries.forEach((entry, offset) => {
            // Name each file after the entry's own position in the dataset,
            // not the batch offset, so that names are unique in the archive.
            const entryIndex = String(indexDataset + offset).padStart(10, '0');
            pack.entry(
                { name: `data/f${entryIndex}.json` },
                JSON.stringify(entry),
            );
        });
    }

    const endDate = new Date();
    pack.entry(
        { name: 'manifest.json' },
        JSON.stringify({
            // toUTCString() yields the same text as the deprecated toGMTString().
            creationDate: endDate.toUTCString(),
            updateDate: endDate.toUTCString(),
            itemsCounter: dataSetSize,
            processingMSTime: endDate - initDate,
            version: '1',
            generator: 'lodex',
        }),
    );

    pack.finalize();

    const pipe = promisify(pipeline);
    const fileName = `./webservice_temp/__entry_${
        ctx.tenant
    }_${Date.now().toString()}.tar.gz`;
    // Gzip the tar stream so the content matches the `.tar.gz` extension.
    await pipe(pack, createGzip(), fs.createWriteStream(fileName));

    return pack;
};

export const processPrecomputed = async (precomputed, ctx) => {
Expand All @@ -138,6 +196,8 @@ export const processPrecomputed = async (precomputed, ctx) => {
let errorCount = 0;

const room = `${ctx.tenant}-precomputed-job-${ctx.job.id}`;

const data = processZippedData(precomputed, ctx);
/*const commands = createEzsRuleCommands(precomputed.rule);
const dataSetSize = await ctx.dataset.count();
for (let index = 0; index < dataSetSize; index += BATCH_SIZE) {
Expand Down Expand Up @@ -239,6 +299,7 @@ export const processPrecomputed = async (precomputed, ctx) => {
}
}
}*/

await ctx.precomputed.updateStatus(precomputed._id, FINISHED, {
errorCount,
});
Expand Down

0 comments on commit 80c4dcd

Please sign in to comment.