diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index 97f168faee0..94e1a5a4fb5 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -220,6 +220,11 @@ export abstract class AbstractCursor< return this[kId] ?? undefined; } + /** @internal */ + get isDead() { + return (this[kId]?.isZero() ?? false) || this[kClosed] || this[kKilled]; + } + /** @internal */ get client(): MongoClient { return this[kClient]; @@ -671,7 +676,7 @@ export abstract class AbstractCursor< return cleanupCursor(this, { error }, () => callback(error, undefined)); } - if (cursorIsDead(this)) { + if (this.isDead) { return cleanupCursor(this, undefined, () => callback()); } @@ -701,96 +706,82 @@ async function next( transform: boolean; } ): Promise { - const cursorId = cursor[kId]; if (cursor.closed) { return null; } - if (cursor[kDocuments].length !== 0) { - const doc = cursor[kDocuments].shift(); + do { + if (cursor[kId] == null) { + // All cursors must operate within a session, one must be made implicitly if not explicitly provided + await promisify(cursor[kInit].bind(cursor))(); + } + + if (cursor[kDocuments].length !== 0) { + const doc = cursor[kDocuments].shift(); - if (doc != null && transform && cursor[kTransform]) { - try { - return cursor[kTransform](doc); - } catch (error) { - await cleanupCursorAsync(cursor, { error, needsToEmitClosed: true }).catch(() => { + if (doc != null && transform && cursor[kTransform]) { + try { + return cursor[kTransform](doc); + } catch (error) { // `cleanupCursorAsync` should never throw, but if it does we want to throw the original // error instead. - }); - throw error; + await cleanupCursorAsync(cursor, { error, needsToEmitClosed: true }).catch(() => null); + throw error; + } } - } - return doc; - } + return doc; + } - if (cursorId == null) { - // All cursors must operate within a session, one must be made implicitly if not explicitly provided - const init = promisify(cb => cursor[kInit](cb)); - await init(); - return next(cursor, { blocking, transform }); - } + if (cursor.isDead) { + // if the cursor is dead, we clean it up + // cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver + // and we should surface the error + await cleanupCursorAsync(cursor, {}); + return null; + } - if (cursorIsDead(cursor)) { - // if the cursor is dead, we clean it up - // cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver - // and we should surface the error - await cleanupCursorAsync(cursor, {}); - return null; - } + // otherwise need to call getMore + const batchSize = cursor[kOptions].batchSize || 1000; - // otherwise need to call getMore - const batchSize = cursor[kOptions].batchSize || 1000; - const getMore = promisify((batchSize: number, cb: Callback) => - cursor._getMore(batchSize, cb) - ); - - let response: Document | undefined; - try { - response = await getMore(batchSize); - } catch (error) { - if (error) { - await cleanupCursorAsync(cursor, { error }).catch(() => { - // `cleanupCursorAsync` should never throw, but if it does we want to throw the original - // error instead. - }); + try { + const response = await promisify(cursor._getMore.bind(cursor))(batchSize); + + if (response) { + const cursorId = + typeof response.cursor.id === 'number' + ? Long.fromNumber(response.cursor.id) + : typeof response.cursor.id === 'bigint' + ? Long.fromBigInt(response.cursor.id) + : response.cursor.id; + + cursor[kDocuments].pushMany(response.cursor.nextBatch); + cursor[kId] = cursorId; + } + } catch (error) { + // `cleanupCursorAsync` should never throw, but if it does we want to throw the original + // error instead. + await cleanupCursorAsync(cursor, { error }).catch(() => null); throw error; } - } - - if (response) { - const cursorId = - typeof response.cursor.id === 'number' - ? Long.fromNumber(response.cursor.id) - : typeof response.cursor.id === 'bigint' - ? Long.fromBigInt(response.cursor.id) - : response.cursor.id; - - cursor[kDocuments].pushMany(response.cursor.nextBatch); - cursor[kId] = cursorId; - } - - if (cursorIsDead(cursor)) { - // If we successfully received a response from a cursor BUT the cursor indicates that it is exhausted, - // we intentionally clean up the cursor to release its session back into the pool before the cursor - // is iterated. This prevents a cursor that is exhausted on the server from holding - // onto a session indefinitely until the AbstractCursor is iterated. - // - // cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver - // and we should surface the error - await cleanupCursorAsync(cursor, {}); - } - if (cursor[kDocuments].length === 0 && blocking === false) { - return null; - } + if (cursor.isDead) { + // If we successfully received a response from a cursor BUT the cursor indicates that it is exhausted, + // we intentionally clean up the cursor to release its session back into the pool before the cursor + // is iterated. This prevents a cursor that is exhausted on the server from holding + // onto a session indefinitely until the AbstractCursor is iterated. + // + // cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver + // and we should surface the error + await cleanupCursorAsync(cursor, {}); + } - return next(cursor, { blocking, transform }); -} + if (cursor[kDocuments].length === 0 && blocking === false) { + return null; + } + } while (!cursor.isDead || cursor[kDocuments].length !== 0); -function cursorIsDead(cursor: AbstractCursor): boolean { - const cursorId = cursor[kId]; - return !!cursorId && cursorId.isZero(); + return null; } const cleanupCursorAsync = promisify(cleanupCursor); diff --git a/src/cursor/find_cursor.ts b/src/cursor/find_cursor.ts index b6dfa62e36f..95c1d53fe60 100644 --- a/src/cursor/find_cursor.ts +++ b/src/cursor/find_cursor.ts @@ -1,4 +1,4 @@ -import type { Document } from '../bson'; +import { type Document, Long } from '../bson'; import { MongoInvalidArgumentError, MongoTailableCursorError } from '../error'; import type { ExplainVerbosityLike } from '../explain'; import type { MongoClient } from '../mongo_client'; @@ -101,7 +101,9 @@ export class FindCursor extends AbstractCursor { limit && limit > 0 && numReturned + batchSize > limit ? limit - numReturned : batchSize; if (batchSize <= 0) { - this.close().finally(() => callback()); + this.close().finally(() => + callback(undefined, { cursor: { id: Long.ZERO, nextBatch: [] } }) + ); return; } }