From b1ee918fd83d74fa2a04303981d7ff96b87011c9 Mon Sep 17 00:00:00 2001 From: Warren James Date: Tue, 8 Oct 2024 16:31:46 -0400 Subject: [PATCH 1/4] fix change stream infinite resume --- src/change_stream.ts | 7 +- .../change-streams/change_stream.test.ts | 147 ++++++++++++++++++ 2 files changed, 152 insertions(+), 2 deletions(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index aa3e092022..34f92a4477 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -946,7 +946,7 @@ export class ChangeStream< // If the change stream has been closed explicitly, do not process error. if (this[kClosed]) return; - if (isResumableError(changeStreamError, this.cursor.maxWireVersion)) { + if (this.cursor.id != null && isResumableError(changeStreamError, this.cursor.maxWireVersion)) { this._endStream(); this.cursor.close().then(undefined, squashError); @@ -975,7 +975,10 @@ export class ChangeStream< throw new MongoAPIError(CHANGESTREAM_CLOSED_ERROR); } - if (!isResumableError(changeStreamError, this.cursor.maxWireVersion)) { + if ( + this.cursor.id == null || + !isResumableError(changeStreamError, this.cursor.maxWireVersion) + ) { try { await this.close(); } catch (error) { diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 64297ad512..19c9f2a051 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -2061,6 +2061,34 @@ describe('ChangeStream resumability', function () { } ); }); + + context('when the error occurs on the aggregate command', function () { + it( + 'does not resume', + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + const resumableErrorCode = 7; // Host not found + await client.db('admin').command({ + configureFailPoint: 'failCommand', + mode: { times: 2 }, // Fail twice to account for retry attempt in executeOperation which is separate from the change stream's resume attempt + data: { + failCommands: ['aggregate'], + errorCode: resumableErrorCode + } + } as FailPoint); + + changeStream = collection.watch([]); + + await collection.insertOne({ name: 'bailey' }); + + const maybeError = await changeStream.next().catch(e => e); + + expect(maybeError).to.be.instanceof(MongoServerError); + expect(aggregateEvents).to.have.lengthOf(2); + expect(changeStream.closed).to.be.true; + } + ); + }); }); context('#hasNext', function () { @@ -2225,6 +2253,34 @@ describe('ChangeStream resumability', function () { } ); }); + + context('when the error occurs on the aggregate command', function () { + it( + 'does not resume', + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + const resumableErrorCode = 7; // Host not found + await client.db('admin').command({ + configureFailPoint: 'failCommand', + mode: { times: 2 }, // Fail twice to account for retry attempt in executeOperation which is separate from the change stream's resume attempt + data: { + failCommands: ['aggregate'], + errorCode: resumableErrorCode + } + } as FailPoint); + + changeStream = collection.watch([]); + + await collection.insertOne({ name: 'bailey' }); + + const maybeError = await changeStream.hasNext().catch(e => e); + + expect(maybeError).to.be.instanceof(MongoServerError); + expect(aggregateEvents).to.have.lengthOf(2); + expect(changeStream.closed).to.be.true; + } + ); + }); }); context('#tryNext', function () { @@ -2401,6 +2457,34 @@ describe('ChangeStream resumability', function () { } ); }); + + context('when the error occurs on the aggregate command', function () { + it( + 'does not resume', + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + const resumableErrorCode = 7; // Host not found + await client.db('admin').command({ + configureFailPoint: 'failCommand', + mode: { times: 2 }, // Fail twice to account for retry attempt in executeOperation which is separate from the change stream's resume attempt + data: { + failCommands: ['aggregate'], + errorCode: resumableErrorCode + } + } as FailPoint); + + changeStream = collection.watch([]); + + await collection.insertOne({ name: 'bailey' }); + + const maybeError = await changeStream.tryNext().catch(e => e); + + expect(maybeError).to.be.instanceof(MongoServerError); + expect(aggregateEvents).to.have.lengthOf(2); + expect(changeStream.closed).to.be.true; + } + ); + }); }); context('#asyncIterator', function () { @@ -2551,6 +2635,40 @@ describe('ChangeStream resumability', function () { } ); }); + + context('when the error occurs on the aggregate command', function () { + it( + 'does not resume', + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + + const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }]; + await collection.insertMany(docs); + + const resumableErrorCode = 7; + await client.db('admin').command({ + configureFailPoint: 'failCommand', + mode: { times: 2 }, // Account for retry in executeOperation which is separate from change stream's resume + data: { + failCommands: ['aggregate'], + errorCode: resumableErrorCode + } + } as FailPoint); + + try { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const change of changeStream) { + expect.fail('Change stream produced events on an unresumable error'); + } + } catch (error) { + expect(error).to.be.instanceOf(MongoServerError); + expect(aggregateEvents).to.have.lengthOf(2); + expect(changeStream.closed).to.be.true; + } + } + ); + }); }); }); @@ -2721,6 +2839,35 @@ describe('ChangeStream resumability', function () { } ); }); + + context('when the error occurred on the aggregate', function () { + it( + 'does not resume', + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + + const resumableErrorCode = 7; + await client.db('admin').command({ + configureFailPoint: 'failCommand', + mode: { times: 2 }, // account for retry attempt in executeOperation which is separate from change stream's retry + data: { + failCommands: ['aggregate'], + errorCode: resumableErrorCode + } + } as FailPoint); + + const willBeError = once(changeStream, 'change').catch(error => error); + await collection.insertOne({ name: 'bailey' }); + + const error = await willBeError; + + expect(error).to.be.instanceOf(MongoServerError); + expect(aggregateEvents).to.have.lengthOf(2); + expect(changeStream.closed).to.be.true; + } + ); + }); }); it( From 085881437088a57c9e86e0890cf69cbd97fb1dac Mon Sep 17 00:00:00 2001 From: Warren James Date: Wed, 9 Oct 2024 11:57:37 -0400 Subject: [PATCH 2/4] fix prose test --- .../change_streams.prose.test.ts | 49 +++++++++++-------- 1 file changed, 29 insertions(+), 20 deletions(-) diff --git a/test/integration/change-streams/change_streams.prose.test.ts b/test/integration/change-streams/change_streams.prose.test.ts index 4303bb1eb6..40e3d30233 100644 --- a/test/integration/change-streams/change_streams.prose.test.ts +++ b/test/integration/change-streams/change_streams.prose.test.ts @@ -1,3 +1,5 @@ +import { promisify } from 'node:util'; + import { expect } from 'chai'; import { once } from 'events'; import * as sinon from 'sinon'; @@ -5,6 +7,7 @@ import { setTimeout } from 'timers'; import { type ChangeStream, + type Collection, type CommandFailedEvent, type CommandStartedEvent, type CommandSucceededEvent, @@ -12,6 +15,7 @@ import { isHello, LEGACY_HELLO_COMMAND, Long, + type MongoClient, MongoNetworkError, ObjectId, Timestamp @@ -840,8 +844,8 @@ describe('Change Stream prose tests', function () { // 15 - 16 removed by spec describe('Change Stream prose 17-18', function () { - let client; - let coll; + let client: MongoClient; + let coll: Collection; let startAfter; function recordEvent(events, e) { @@ -886,31 +890,36 @@ describe('Change Stream prose tests', function () { // when resuming a change stream. it('$changeStream without results must include startAfter and not resumeAfter', { metadata: { requires: { topology: 'replicaset', mongodb: '>=4.1.1' } }, - test: function (done) { + test: async function () { const events = []; client.on('commandStarted', e => recordEvent(events, e)); const changeStream = coll.watch([], { startAfter }); - this.defer(() => changeStream.close()); - - changeStream.once('change', change => { - expect(change).to.containSubset({ - operationType: 'insert', - fullDocument: { x: 2 } - }); - expect(events).to.be.an('array').with.lengthOf(3); - expect(events[0]).nested.property('$changeStream.startAfter').to.exist; - expect(events[1]).to.equal('error'); - expect(events[2]).nested.property('$changeStream.startAfter').to.exist; - done(); + changeStream.on('error', async e => { + await changeStream.close(e); }); - waitForStarted(changeStream, () => { - triggerResumableError(changeStream, () => { - events.push('error'); - coll.insertOne({ x: 2 }, { writeConcern: { w: 'majority', j: true } }); - }); + const changePromise = once(changeStream, 'change'); + await once(changeStream.cursor, 'init'); + + const stub = sinon.stub(changeStream.cursor, 'close'); + + stub.callsFake(async function () { + stub.wrappedMethod.call(this); + stub.restore(); + events.push('error'); + await coll.insertOne({ x: 2 }, { writeConcern: { w: 'majority', j: true } }); }); + + changeStream.cursorStream.emit('error', new MongoNetworkError('error triggered from test')); + + const [change] = await changePromise; + expect(change).to.containSubset({ operationType: 'insert', fullDocument: { x: 2 } }); + expect(events).to.be.an('array').with.lengthOf(3); + + expect(events[0]).nested.property('$changeStream.startAfter').to.exist; + expect(events[1]).to.equal('error'); + expect(events[2]).nested.property('$changeStream.startAfter').to.exist; } }); From 4821ed49ca63a971a311be607aabca5b6a2bbe81 Mon Sep 17 00:00:00 2001 From: Warren James Date: Wed, 9 Oct 2024 13:19:07 -0400 Subject: [PATCH 3/4] lint --- test/integration/change-streams/change_streams.prose.test.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/integration/change-streams/change_streams.prose.test.ts b/test/integration/change-streams/change_streams.prose.test.ts index 40e3d30233..60492b40d3 100644 --- a/test/integration/change-streams/change_streams.prose.test.ts +++ b/test/integration/change-streams/change_streams.prose.test.ts @@ -1,5 +1,3 @@ -import { promisify } from 'node:util'; - import { expect } from 'chai'; import { once } from 'events'; import * as sinon from 'sinon'; From 1efa29888793bd25719707dbf5b412c4638767ab Mon Sep 17 00:00:00 2001 From: Warren James Date: Wed, 9 Oct 2024 15:09:54 -0400 Subject: [PATCH 4/4] fix test --- test/integration/change-streams/change_stream.test.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 19c9f2a051..9e171f0ee6 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -2661,6 +2661,7 @@ describe('ChangeStream resumability', function () { for await (const change of changeStream) { expect.fail('Change stream produced events on an unresumable error'); } + expect.fail('Change stream did not iterate and did not throw an error'); } catch (error) { expect(error).to.be.instanceOf(MongoServerError); expect(aggregateEvents).to.have.lengthOf(2);