Skip to content

Commit

Permalink
fix(NODE-5587): recursive calls to next cause memory leak (#3842)
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken authored Aug 29, 2023
1 parent 2fab06b commit f60f1b5
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 76 deletions.
139 changes: 65 additions & 74 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,11 @@ export abstract class AbstractCursor<
return this[kId] ?? undefined;
}

/** @internal */
get isDead() {
return (this[kId]?.isZero() ?? false) || this[kClosed] || this[kKilled];
}

/** @internal */
get client(): MongoClient {
return this[kClient];
Expand Down Expand Up @@ -671,7 +676,7 @@ export abstract class AbstractCursor<
return cleanupCursor(this, { error }, () => callback(error, undefined));
}

if (cursorIsDead(this)) {
if (this.isDead) {
return cleanupCursor(this, undefined, () => callback());
}

Expand Down Expand Up @@ -701,96 +706,82 @@ async function next<T>(
transform: boolean;
}
): Promise<T | null> {
const cursorId = cursor[kId];
if (cursor.closed) {
return null;
}

if (cursor[kDocuments].length !== 0) {
const doc = cursor[kDocuments].shift();
do {
if (cursor[kId] == null) {
// All cursors must operate within a session, one must be made implicitly if not explicitly provided
await promisify(cursor[kInit].bind(cursor))();
}

if (cursor[kDocuments].length !== 0) {
const doc = cursor[kDocuments].shift();

if (doc != null && transform && cursor[kTransform]) {
try {
return cursor[kTransform](doc);
} catch (error) {
await cleanupCursorAsync(cursor, { error, needsToEmitClosed: true }).catch(() => {
if (doc != null && transform && cursor[kTransform]) {
try {
return cursor[kTransform](doc);
} catch (error) {
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
// error instead.
});
throw error;
await cleanupCursorAsync(cursor, { error, needsToEmitClosed: true }).catch(() => null);
throw error;
}
}
}

return doc;
}
return doc;
}

if (cursorId == null) {
// All cursors must operate within a session, one must be made implicitly if not explicitly provided
const init = promisify(cb => cursor[kInit](cb));
await init();
return next(cursor, { blocking, transform });
}
if (cursor.isDead) {
// if the cursor is dead, we clean it up
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
// and we should surface the error
await cleanupCursorAsync(cursor, {});
return null;
}

if (cursorIsDead(cursor)) {
// if the cursor is dead, we clean it up
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
// and we should surface the error
await cleanupCursorAsync(cursor, {});
return null;
}
// otherwise need to call getMore
const batchSize = cursor[kOptions].batchSize || 1000;

// otherwise need to call getMore
const batchSize = cursor[kOptions].batchSize || 1000;
const getMore = promisify((batchSize: number, cb: Callback<Document | undefined>) =>
cursor._getMore(batchSize, cb)
);

let response: Document | undefined;
try {
response = await getMore(batchSize);
} catch (error) {
if (error) {
await cleanupCursorAsync(cursor, { error }).catch(() => {
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
// error instead.
});
try {
const response = await promisify(cursor._getMore.bind(cursor))(batchSize);

if (response) {
const cursorId =
typeof response.cursor.id === 'number'
? Long.fromNumber(response.cursor.id)
: typeof response.cursor.id === 'bigint'
? Long.fromBigInt(response.cursor.id)
: response.cursor.id;

cursor[kDocuments].pushMany(response.cursor.nextBatch);
cursor[kId] = cursorId;
}
} catch (error) {
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
// error instead.
await cleanupCursorAsync(cursor, { error }).catch(() => null);
throw error;
}
}

if (response) {
const cursorId =
typeof response.cursor.id === 'number'
? Long.fromNumber(response.cursor.id)
: typeof response.cursor.id === 'bigint'
? Long.fromBigInt(response.cursor.id)
: response.cursor.id;

cursor[kDocuments].pushMany(response.cursor.nextBatch);
cursor[kId] = cursorId;
}

if (cursorIsDead(cursor)) {
// If we successfully received a response from a cursor BUT the cursor indicates that it is exhausted,
// we intentionally clean up the cursor to release its session back into the pool before the cursor
// is iterated. This prevents a cursor that is exhausted on the server from holding
// onto a session indefinitely until the AbstractCursor is iterated.
//
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
// and we should surface the error
await cleanupCursorAsync(cursor, {});
}

if (cursor[kDocuments].length === 0 && blocking === false) {
return null;
}
if (cursor.isDead) {
// If we successfully received a response from a cursor BUT the cursor indicates that it is exhausted,
// we intentionally clean up the cursor to release its session back into the pool before the cursor
// is iterated. This prevents a cursor that is exhausted on the server from holding
// onto a session indefinitely until the AbstractCursor is iterated.
//
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
// and we should surface the error
await cleanupCursorAsync(cursor, {});
}

return next(cursor, { blocking, transform });
}
if (cursor[kDocuments].length === 0 && blocking === false) {
return null;
}
} while (!cursor.isDead || cursor[kDocuments].length !== 0);

function cursorIsDead(cursor: AbstractCursor): boolean {
const cursorId = cursor[kId];
return !!cursorId && cursorId.isZero();
return null;
}

const cleanupCursorAsync = promisify(cleanupCursor);
Expand Down
6 changes: 4 additions & 2 deletions src/cursor/find_cursor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Document } from '../bson';
import { type Document, Long } from '../bson';
import { MongoInvalidArgumentError, MongoTailableCursorError } from '../error';
import type { ExplainVerbosityLike } from '../explain';
import type { MongoClient } from '../mongo_client';
Expand Down Expand Up @@ -101,7 +101,9 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
limit && limit > 0 && numReturned + batchSize > limit ? limit - numReturned : batchSize;

if (batchSize <= 0) {
this.close().finally(() => callback());
this.close().finally(() =>
callback(undefined, { cursor: { id: Long.ZERO, nextBatch: [] } })
);
return;
}
}
Expand Down

0 comments on commit f60f1b5

Please sign in to comment.