From 01410ef3459d462d24da3233f8aef5a878a89524 Mon Sep 17 00:00:00 2001 From: Nicolas Thouvenin Date: Fri, 6 Dec 2024 17:36:47 +0100 Subject: [PATCH] introduce [precomputedSelect] --- packages/ezsLodex/src/index.js | 3 + packages/ezsLodex/src/precomputedSelect.js | 70 ++++++++++++++++++++++ src/api/services/enrichment/enrichment.js | 6 +- 3 files changed, 78 insertions(+), 1 deletion(-) create mode 100644 packages/ezsLodex/src/precomputedSelect.js diff --git a/packages/ezsLodex/src/index.js b/packages/ezsLodex/src/index.js index 09a4ff552..36fe639ed 100644 --- a/packages/ezsLodex/src/index.js +++ b/packages/ezsLodex/src/index.js @@ -25,6 +25,7 @@ import buildContext from './buildContext'; import aggregateQuery from './aggregateQuery'; import LodexJoinQuery from './joinQuery'; import saveDocuments from './saveDocuments.js'; +import precomputedSelect from './precomputedSelect.js'; const funcs = { flattenPatch, @@ -53,6 +54,7 @@ const funcs = { aggregateQuery, LodexJoinQuery, saveDocuments, + precomputedSelect, // aliases fixFlatten: flattenPatch.flattenPatch, LodexContext: disabled.disabled, @@ -71,6 +73,7 @@ const funcs = { LodexInjectCountFrom: injectCountFrom.injectCountFrom, LodexAggregateQuery: aggregateQuery.aggregateQuery, LodexSaveDocuments: saveDocuments.saveDocuments, + LodexPrecomputedSelect: precomputedSelect.precomputedSelect, }; export default funcs; diff --git a/packages/ezsLodex/src/precomputedSelect.js b/packages/ezsLodex/src/precomputedSelect.js new file mode 100644 index 000000000..628189347 --- /dev/null +++ b/packages/ezsLodex/src/precomputedSelect.js @@ -0,0 +1,70 @@ +import set from 'lodash/set'; +import mongoDatabase from './mongoDatabase'; + +/** + * Take `Object` containing a MongoDB query and throw the result + * + * The input object must contain a `connectionStringURI` property, containing + * the connection string to MongoDB. + * + * @name LodexRunQuery + * @param {String} [collection="publishedDataset"] collection to use + * @param {Object} [filter] MongoDB filter + * @param {Object} [limit] limit the result + * @param {Object} [skip] limit the result + * @returns {Object} + */ +export const createFunction = () => + async function LodexPrecomputedSelect(data, feed) { + if (this.isLast()) { + return feed.close(); + } + const { ezs } = this; + const path = this.getParam('path', 'value'); + const precomputedName = this.getParam('name'); + const filter = this.getParam('filter', data.filter || {}); + const limit = Number(this.getParam('limit', data.limit || 1000000)); + const skip = Number(this.getParam('skip', data.skip || 0)); + const connectionStringURI = this.getParam( + 'connectionStringURI', + this.getEnv('connectionStringURI'), + ); + const db = await mongoDatabase(connectionStringURI); + + const [precomputedObject] = await db + .collection('precomputed') + .find({ name: precomputedName }) + .toArray(); + const collectionName = `pc_${precomputedObject._id.toString()}`; + + const collection = db.collection(collectionName); + let cursor = collection.find(filter); + + const total = await cursor.count(); + if (total === 0) { + return feed.send({ total: 0 }); + } + const func = (d, f, cxt) => { + if (cxt.isLast()) { + return f.close(); + } + if (!d._id) { + return f.end(); + } + delete d._id; + set(data, path, d); + f.send(data); + }; + + const stream = cursor + .skip(skip) + .limit(limit) + .stream() + .on('error', (e) => feed.stop(e)) + .pipe(ezs(func)); + await feed.flow(stream); + }; + +export default { + precomputedSelect: createFunction(), +}; diff --git a/src/api/services/enrichment/enrichment.js b/src/api/services/enrichment/enrichment.js index 88323b4b3..d04bc4c22 100644 --- a/src/api/services/enrichment/enrichment.js +++ b/src/api/services/enrichment/enrichment.js @@ -3,6 +3,7 @@ import path from 'path'; import ezs from '@ezs/core'; import progress from '../../services/progress'; import localConfig from '../../../../config.json'; +import { mongoConnectionString } from '../mongoClient'; import { ObjectId } from 'mongodb'; import from from 'from'; @@ -197,10 +198,13 @@ async function postcheck(data, feed) { const processEzsEnrichment = (entries, commands, ctx, preview = false) => { return new Promise((resolve, reject) => { + const environment = { + connectionStringURI: mongoConnectionString(ctx.tenant), + }; const values = []; from(entries) .pipe(ezs(preformat)) - .pipe(ezs('delegate', { commands }, {})) + .pipe(ezs('delegate', { commands }, environment)) .pipe(ezs(postcheck, { preview }, ctx)) .on('data', (data) => { if (data instanceof Error) {