diff --git a/src/api/services/precomputed/precomputed.js b/src/api/services/precomputed/precomputed.js
index 82043f823..a8006bf4a 100644
--- a/src/api/services/precomputed/precomputed.js
+++ b/src/api/services/precomputed/precomputed.js
@@ -1,10 +1,14 @@
 //TODO - Precomputing Task will be coded in next card
-import ezs from '@ezs/core';
+//import ezs from '@ezs/core';
 import progress from '../progress';
 import localConfig from '../../../../config.json';
+import tar from 'tar-stream';
+import fs from 'fs';
+import { pipeline } from 'stream';
+import { promisify } from 'util';
-import from from 'from';
+//import from from 'from';
 import {
     PENDING as PRECOMPUTED_PENDING,
     IN_PROGRESS,
@@ -19,7 +23,7 @@ import getLogger from '../logger';
 
 const { precomputedBatchSize: BATCH_SIZE = 10 } = localConfig;
 
-const getSourceData = async (ctx, sourceColumns) => {
+/*const getSourceData = async (ctx, sourceColumns) => {
     const excerptLines = await ctx.dataset.getExcerpt(
         sourceColumns
             ? {
@@ -37,7 +41,7 @@ const getSourceData = async (ctx, sourceColumns) => {
     } catch {
         return sourceData;
     }
-};
+};*/
 
 export const getPrecomputedDataPreview = async ctx => {
     const { sourceColumns } = ctx.request.body;
@@ -71,7 +75,7 @@ export const getPrecomputedDataPreview = async ctx => {
     return result;
 };
 
-const createEzsRuleCommands = rule => ezs.compileScript(rule).get();
+//const createEzsRuleCommands = rule => ezs.compileScript(rule).get();
 
 export const getSourceError = error => {
     const sourceError = error?.sourceError;
@@ -81,7 +85,7 @@ export const getSourceError = error => {
     return error;
 };
 
-function preformat(data, feed) {
+/*function preformat(data, feed) {
     if (this.isLast()) {
         return feed.close();
     }
@@ -130,6 +134,60 @@ const processEzsEnrichment = (entries, commands, ctx, preview = false) => {
             .on('end', () => resolve(values))
             .on('error', error => reject(error));
     });
-};
+};*/
+
+const processZippedData = async (precomputed, ctx) => {
+    const initDate = new Date();
+    const pack = tar.pack();
+
+    const dataSetSize = await ctx.dataset.count();
+    for (
+        let indexDataset = 0;
+        indexDataset < dataSetSize;
+        indexDataset += BATCH_SIZE
+    ) {
+        if (!(await ctx.job?.isActive())) {
+            throw new CancelWorkerError('Job has been canceled');
+        }
+        const entries = await ctx.dataset
+            .find()
+            .skip(indexDataset)
+            .limit(BATCH_SIZE)
+            .toArray();
+
+        for (const [index, entry] of entries.entries()) {
+            await pack.entry(
+                {
+                    name: `data/${'f' +
+                        (indexDataset + index).toString().padStart(10, '0')}.json`,
+                },
+                JSON.stringify(entry),
+            );
+        }
+    }
+
+    const endDate = new Date();
+    await pack.entry(
+        { name: `manifest.json` },
+        JSON.stringify({
+            creationDate: endDate.toUTCString(),
+            updateDate: endDate.toUTCString(),
+            itemsCounter: dataSetSize,
+            processingMSTime: endDate - initDate,
+            version: '1',
+            generator: 'lodex',
+        }),
+    );
+
+    await pack.finalize();
+
+    const pipe = promisify(pipeline);
+    const fileName = `./webservice_temp/__entry_${
+        ctx.tenant
+    }_${Date.now().toString()}.tar.gz`;
+    await pipe(pack, fs.createWriteStream(fileName));
+
+    return pack;
+};
 
 export const processPrecomputed = async (precomputed, ctx) => {
@@ -138,6 +196,8 @@ export const processPrecomputed = async (precomputed, ctx) => {
 
     let errorCount = 0;
     const room = `${ctx.tenant}-precomputed-job-${ctx.job.id}`;
+
+    const data = await processZippedData(precomputed, ctx);
     /*const commands = createEzsRuleCommands(precomputed.rule);
     const dataSetSize = await ctx.dataset.count();
     for (let index = 0; index < dataSetSize; index += BATCH_SIZE) {
@@ -239,6 +299,7 @@ export const processPrecomputed = async (precomputed, ctx) => {
         }
     }
 }*/
+
     await ctx.precomputed.updateStatus(precomputed._id, FINISHED, {
         errorCount,
     });
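
Note on the new archive step (a reviewer-style sketch, not part of the patch): `tar.pack()` from `tar-stream` emits an *uncompressed* tar stream, so the file written by `processZippedData` is plain tar even though it is named `.tar.gz`. Below is a minimal sketch of how the pipeline could gzip on write, and how the resulting archive (entries under `data/` plus `manifest.json`) could be read back for verification; the helper names `writeArchive` and `readManifest` are hypothetical:

```js
import fs from 'fs';
import zlib from 'zlib';
import tar from 'tar-stream';
import { pipeline } from 'stream';
import { promisify } from 'util';

const pipe = promisify(pipeline);

// Hypothetical: gzip the pack stream so the output matches its
// `.tar.gz` extension. `pack` is the tar-stream pack built above.
async function writeArchive(pack, fileName) {
    await pipe(pack, zlib.createGzip(), fs.createWriteStream(fileName));
}

// Hypothetical: gunzip the archive and return the parsed manifest.json,
// mirroring the layout produced by processZippedData.
async function readManifest(fileName) {
    const extract = tar.extract();
    const manifestFound = new Promise((resolve, reject) => {
        let manifest = null;
        extract.on('entry', (header, stream, next) => {
            const chunks = [];
            stream.on('data', chunk => chunks.push(chunk));
            stream.on('end', () => {
                if (header.name === 'manifest.json') {
                    manifest = JSON.parse(Buffer.concat(chunks).toString());
                }
                next(); // move on to the next tar entry
            });
        });
        extract.on('finish', () => resolve(manifest));
        extract.on('error', reject);
    });
    await pipe(fs.createReadStream(fileName), zlib.createGunzip(), extract);
    return manifestFound;
}
```

If the uncompressed stream is intentional, the lighter fix is to keep the current pipeline as-is and drop the `.gz` suffix from the generated file name.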