Skip to content

Commit

Permalink
feat: Enhance eachAsync to process documents in batches
Browse files Browse the repository at this point in the history
  • Loading branch information
khaledosama999 committed Feb 5, 2021
1 parent 068be2f commit 37d39e2
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 10 deletions.
5 changes: 4 additions & 1 deletion docs/api/querycursor.html
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
<span class="method-type">&laquo;Function&raquo;</span> </li></ul><h5>Returns:</h5><ul><li><span class="method-type">&laquo;Promise&raquo;</span> </li></ul><div><p>Marks this cursor as closed. Will stop streaming and subsequent calls to <code>next()</code> will error.</p> </div><hr class="separate-api-elements"><h3 id="querycursor_QueryCursor-eachAsync"><a href="#querycursor_QueryCursor-eachAsync">QueryCursor.prototype.eachAsync()</a></h3><h5>Parameters</h5><ul class="params"><li class="param">fn
<span class="method-type">&laquo;Function&raquo;</span> </li><li class="param">[options]
<span class="method-type">&laquo;Object&raquo;</span> </li><ul style="margin-top: 0.5em"><li>[options.parallel]
<span class="method-type">&laquo;Number&raquo;</span> the number of promises to execute in parallel. Defaults to 1.</li></ul><li class="param">[callback]
<span class="method-type">&laquo;Number&raquo;</span> the number of promises to execute in parallel. Defaults to 1.
<span class="method-type">&laquo;Object&raquo;</span><ul style="margin-top: 0.5em"><li>[options.batchSize]
<span class="method-type">&laquo;Number&raquo;</span> The size of documents processed by the passed in function to eachAsync (works with the parallel option)</li></ul>
</li></ul><li class="param">[callback]
<span class="method-type">&laquo;Function&raquo;</span> executed when all docs have been processed</li></ul><h5>Returns:</h5><ul><li><span class="method-type">&laquo;Promise&raquo;</span> </li></ul><div><p>Execute <code>fn</code> for every document in the cursor. If <code>fn</code> returns a promise, will wait for the promise to resolve before iterating on to the next one. Returns a promise that resolves when done.</p> </div><hr class="separate-api-elements"><h3 id="querycursor_QueryCursor-map"><a href="#querycursor_QueryCursor-map">QueryCursor.prototype.map()</a></h3><h5>Parameters</h5><ul class="params"><li class="param">fn
<span class="method-type">&laquo;Function&raquo;</span> </li></ul><h5>Returns:</h5><ul><li><span class="method-type">&laquo;QueryCursor&raquo;</span> </li></ul><div><p>Registers a transform function which subsequently maps documents retrieved via the streams interface or <code>.next()</code></p>

Expand Down
14 changes: 8 additions & 6 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2225,12 +2225,13 @@ declare module 'mongoose' {
close(callback: (err: CallbackError) => void): void;

/**
* Execute `fn` for every document in the cursor. If `fn` returns a promise,
* Execute `fn` for every document(s) in the cursor. If batchSize is provided
* `fn` will be executed for each batch of documents. If `fn` returns a promise,
* will wait for the promise to resolve before iterating on to the next one.
* Returns a promise that resolves when done.
*/
eachAsync(fn: (doc: DocType) => any, options?: { parallel?: number }): Promise<void>;
eachAsync(fn: (doc: DocType) => any, options?: { parallel?: number }, cb?: (err: CallbackError) => void): void;
eachAsync(fn: (doc: DocType| [DocType]) => any, options?: { parallel?: number, batchSize?: number }): Promise<void>;
eachAsync(fn: (doc: DocType| [DocType]) => any, options?: { parallel?: number, batchSize?: number }, cb?: (err: CallbackError) => void): void;

/**
* Registers a transform function which subsequently maps documents retrieved
Expand Down Expand Up @@ -2393,12 +2394,13 @@ declare module 'mongoose' {
close(callback: (err: CallbackError) => void): void;

/**
* Execute `fn` for every document in the cursor. If `fn` returns a promise,
* Execute `fn` for every document(s) in the cursor. If batchSize is provided
* `fn` will be executed for each batch of documents. If `fn` returns a promise,
* will wait for the promise to resolve before iterating on to the next one.
* Returns a promise that resolves when done.
*/
eachAsync(fn: (doc: any) => any, options?: { parallel?: number }): Promise<void>;
eachAsync(fn: (doc: any) => any, options?: { parallel?: number }, cb?: (err: CallbackError) => void): void;
eachAsync(fn: (doc: any) => any, options?: { parallel?: number, batchSize?: number }): Promise<void>;
eachAsync(fn: (doc: any) => any, options?: { parallel?: number, batchSize?: number }, cb?: (err: CallbackError) => void): void;

/**
* Registers a transform function which subsequently maps documents retrieved
Expand Down
29 changes: 26 additions & 3 deletions lib/helpers/cursor/eachAsync.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const promiseOrCallback = require('../promiseOrCallback');

module.exports = function eachAsync(next, fn, options, callback) {
const parallel = options.parallel || 1;
const batchSize = options.batchSize;
const enqueue = asyncQueue();

return promiseOrCallback(callback, cb => {
Expand All @@ -32,6 +33,7 @@ module.exports = function eachAsync(next, fn, options, callback) {
let drained = false;
let handleResultsInProgress = 0;
let currentDocumentIndex = 0;
let documentsBatch = [];

let error = null;
for (let i = 0; i < parallel; ++i) {
Expand All @@ -57,6 +59,8 @@ module.exports = function eachAsync(next, fn, options, callback) {
if (handleResultsInProgress <= 0) {
finalCallback(null);
}
else if (batchSize && documentsBatch.length)
handleNextResult(documentsBatch, currentDocumentIndex++, handleNextResultCallBack);
return done();
}

Expand All @@ -66,8 +70,25 @@ module.exports = function eachAsync(next, fn, options, callback) {
// make sure we know that we still have a result to handle re: #8422
process.nextTick(() => done());

handleNextResult(doc, currentDocumentIndex++, function(err) {
--handleResultsInProgress;
if (batchSize) {
documentsBatch.push(doc);
}

// If the current documents size is less than the provided patch size don't process the documents yet
if (batchSize && documentsBatch.length !== batchSize) {
setTimeout(() => enqueue(fetch), 0);
return;
}

const docsToProcess = batchSize ? documentsBatch : doc;

function handleNextResultCallBack(err) {
if (batchSize) {
handleResultsInProgress -= documentsBatch.length;
documentsBatch = [];
}
else
--handleResultsInProgress;
if (err != null) {
error = err;
return finalCallback(err);
Expand All @@ -77,7 +98,9 @@ module.exports = function eachAsync(next, fn, options, callback) {
}

setTimeout(() => enqueue(fetch), 0);
});
}

handleNextResult(docsToProcess, currentDocumentIndex++, handleNextResultCallBack);
});
}
}
Expand Down
75 changes: 75 additions & 0 deletions test/helpers/cursor.eachAsync.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,79 @@ describe('eachAsync()', function() {
then(() => eachAsync(next, fn, { parallel: 2 })).
then(() => assert.equal(numDone, max));
});

it('it processes the documents in batches successfully', () => {
const batchSize = 3;
let numberOfDocuments = 0;
const maxNumberOfDocuments = 9;
let numberOfBatchesProcessed = 0;

function next(cb) {
setTimeout(() => {
if (++numberOfDocuments > maxNumberOfDocuments) {
cb(null, null);
}
return cb(null, { id: numberOfDocuments });
}, 0);
}

const fn = (docs, index) => {
assert.equal(docs.length, batchSize);
assert.equal(index, numberOfBatchesProcessed++);
};

return eachAsync(next, fn, { batchSize });
});

it('it processes the documents in batches even if the batch size % document count is not zero successfully', () => {
const batchSize = 3;
let numberOfDocuments = 0;
const maxNumberOfDocuments = 10;
let numberOfBatchesProcessed = 0;

function next(cb) {
setTimeout(() => {
if (++numberOfDocuments > maxNumberOfDocuments) {
cb(null, null);
}
return cb(null, { id: numberOfDocuments });
}, 0);
}

const fn = (docs, index) => {
assert.equal(index, numberOfBatchesProcessed++);
if (index == 3) {
assert.equal(docs.length, 1);
}
else {
assert.equal(docs.length, batchSize);
}
};

return eachAsync(next, fn, { batchSize });
});

it('it processes the documents in batches with the parallel option provided', () => {
const batchSize = 3;
const parallel = 3;
let numberOfDocuments = 0;
const maxNumberOfDocuments = 9;
let numberOfBatchesProcessed = 0;

function next(cb) {
setTimeout(() => {
if (++numberOfDocuments > maxNumberOfDocuments) {
cb(null, null);
}
return cb(null, { id: numberOfDocuments });
}, 0);
}

const fn = (docs, index) => {
assert.equal(index, numberOfBatchesProcessed++);
assert.equal(docs.length, batchSize);
};

return eachAsync(next, fn, { batchSize, parallel });
});
});

0 comments on commit 37d39e2

Please sign in to comment.