Skip to content

Commit

Permalink
introduce [precomputedSelect]
Browse files Browse the repository at this point in the history
  • Loading branch information
touv committed Dec 6, 2024
1 parent 99d23e5 commit 01410ef
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 1 deletion.
3 changes: 3 additions & 0 deletions packages/ezsLodex/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import buildContext from './buildContext';
import aggregateQuery from './aggregateQuery';
import LodexJoinQuery from './joinQuery';
import saveDocuments from './saveDocuments.js';
import precomputedSelect from './precomputedSelect.js';

const funcs = {
flattenPatch,
Expand Down Expand Up @@ -53,6 +54,7 @@ const funcs = {
aggregateQuery,
LodexJoinQuery,
saveDocuments,
precomputedSelect,
// aliases
fixFlatten: flattenPatch.flattenPatch,
LodexContext: disabled.disabled,
Expand All @@ -71,6 +73,7 @@ const funcs = {
LodexInjectCountFrom: injectCountFrom.injectCountFrom,
LodexAggregateQuery: aggregateQuery.aggregateQuery,
LodexSaveDocuments: saveDocuments.saveDocuments,
LodexPrecomputedSelect: precomputedSelect.precomputedSelect,
};

export default funcs;
Expand Down
70 changes: 70 additions & 0 deletions packages/ezsLodex/src/precomputedSelect.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import set from 'lodash/set';
import mongoDatabase from './mongoDatabase';

/**
* Take `Object` containing a MongoDB query and throw the result
*
* The input object must contain a `connectionStringURI` property, containing
* the connection string to MongoDB.
*
* @name LodexRunQuery
* @param {String} [collection="publishedDataset"] collection to use
* @param {Object} [filter] MongoDB filter
* @param {Object} [limit] limit the result
* @param {Object} [skip] limit the result
* @returns {Object}
*/
export const createFunction = () =>
async function LodexPrecomputedSelect(data, feed) {
if (this.isLast()) {
return feed.close();
}
const { ezs } = this;
const path = this.getParam('path', 'value');
const precomputedName = this.getParam('name');
const filter = this.getParam('filter', data.filter || {});
const limit = Number(this.getParam('limit', data.limit || 1000000));
const skip = Number(this.getParam('skip', data.skip || 0));
const connectionStringURI = this.getParam(
'connectionStringURI',
this.getEnv('connectionStringURI'),
);
const db = await mongoDatabase(connectionStringURI);

const [precomputedObject] = await db
.collection('precomputed')
.find({ name: precomputedName })
.toArray();
const collectionName = `pc_${precomputedObject._id.toString()}`;

const collection = db.collection(collectionName);
let cursor = collection.find(filter);

const total = await cursor.count();
if (total === 0) {
return feed.send({ total: 0 });
}
const func = (d, f, cxt) => {
if (cxt.isLast()) {
return f.close();
}
if (!d._id) {
return f.end();
}
delete d._id;
set(data, path, d);
f.send(data);
};

const stream = cursor
.skip(skip)
.limit(limit)
.stream()
.on('error', (e) => feed.stop(e))
.pipe(ezs(func));
await feed.flow(stream);
};

export default {
precomputedSelect: createFunction(),
};
6 changes: 5 additions & 1 deletion src/api/services/enrichment/enrichment.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import path from 'path';
import ezs from '@ezs/core';
import progress from '../../services/progress';
import localConfig from '../../../../config.json';
import { mongoConnectionString } from '../mongoClient';

import { ObjectId } from 'mongodb';
import from from 'from';
Expand Down Expand Up @@ -197,10 +198,13 @@ async function postcheck(data, feed) {

const processEzsEnrichment = (entries, commands, ctx, preview = false) => {
return new Promise((resolve, reject) => {
const environment = {
connectionStringURI: mongoConnectionString(ctx.tenant),
};
const values = [];
from(entries)
.pipe(ezs(preformat))
.pipe(ezs('delegate', { commands }, {}))
.pipe(ezs('delegate', { commands }, environment))
.pipe(ezs(postcheck, { preview }, ctx))
.on('data', (data) => {
if (data instanceof Error) {
Expand Down

0 comments on commit 01410ef

Please sign in to comment.