Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-6418): change stream resumes infinitely after failed aggregates #4267

Merged
merged 5 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,7 @@ export class ChangeStream<
// If the change stream has been closed explicitly, do not process error.
if (this[kClosed]) return;

if (isResumableError(changeStreamError, this.cursor.maxWireVersion)) {
if (this.cursor.id != null && isResumableError(changeStreamError, this.cursor.maxWireVersion)) {
this._endStream();

this.cursor.close().then(undefined, squashError);
Expand Down Expand Up @@ -975,7 +975,10 @@ export class ChangeStream<
throw new MongoAPIError(CHANGESTREAM_CLOSED_ERROR);
}

if (!isResumableError(changeStreamError, this.cursor.maxWireVersion)) {
if (
this.cursor.id == null ||
!isResumableError(changeStreamError, this.cursor.maxWireVersion)
) {
try {
await this.close();
} catch (error) {
Expand Down
147 changes: 147 additions & 0 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2061,6 +2061,34 @@ describe('ChangeStream resumability', function () {
}
);
});

context('when the error occurs on the aggregate command', function () {
it(
'does not resume',
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
const resumableErrorCode = 7; // Host not found
await client.db('admin').command({
configureFailPoint: 'failCommand',
mode: { times: 2 }, // Fail twice to account for retry attempt in executeOperation which is separate from the change stream's resume attempt
data: {
failCommands: ['aggregate'],
errorCode: resumableErrorCode
}
} as FailPoint);

changeStream = collection.watch([]);

await collection.insertOne({ name: 'bailey' });

const maybeError = await changeStream.next().catch(e => e);

expect(maybeError).to.be.instanceof(MongoServerError);
expect(aggregateEvents).to.have.lengthOf(2);
expect(changeStream.closed).to.be.true;
}
);
});
});

context('#hasNext', function () {
Expand Down Expand Up @@ -2225,6 +2253,34 @@ describe('ChangeStream resumability', function () {
}
);
});

context('when the error occurs on the aggregate command', function () {
it(
'does not resume',
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
const resumableErrorCode = 7; // Host not found
await client.db('admin').command({
configureFailPoint: 'failCommand',
mode: { times: 2 }, // Fail twice to account for retry attempt in executeOperation which is separate from the change stream's resume attempt
data: {
failCommands: ['aggregate'],
errorCode: resumableErrorCode
}
} as FailPoint);

changeStream = collection.watch([]);

await collection.insertOne({ name: 'bailey' });

const maybeError = await changeStream.hasNext().catch(e => e);

expect(maybeError).to.be.instanceof(MongoServerError);
expect(aggregateEvents).to.have.lengthOf(2);
expect(changeStream.closed).to.be.true;
}
);
});
});

context('#tryNext', function () {
Expand Down Expand Up @@ -2401,6 +2457,34 @@ describe('ChangeStream resumability', function () {
}
);
});

context('when the error occurs on the aggregate command', function () {
it(
'does not resume',
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
const resumableErrorCode = 7; // Host not found
await client.db('admin').command({
configureFailPoint: 'failCommand',
mode: { times: 2 }, // Fail twice to account for retry attempt in executeOperation which is separate from the change stream's resume attempt
data: {
failCommands: ['aggregate'],
errorCode: resumableErrorCode
}
} as FailPoint);

changeStream = collection.watch([]);

await collection.insertOne({ name: 'bailey' });

const maybeError = await changeStream.tryNext().catch(e => e);

expect(maybeError).to.be.instanceof(MongoServerError);
expect(aggregateEvents).to.have.lengthOf(2);
expect(changeStream.closed).to.be.true;
}
);
});
});

context('#asyncIterator', function () {
Expand Down Expand Up @@ -2551,6 +2635,40 @@ describe('ChangeStream resumability', function () {
}
);
});

context('when the error occurs on the aggregate command', function () {
it(
'does not resume',
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([]);

const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }];
await collection.insertMany(docs);

const resumableErrorCode = 7;
await client.db('admin').command({
configureFailPoint: 'failCommand',
mode: { times: 2 }, // Account for retry in executeOperation which is separate from change stream's resume
data: {
failCommands: ['aggregate'],
errorCode: resumableErrorCode
}
} as FailPoint);

try {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const change of changeStream) {
expect.fail('Change stream produced events on an unresumable error');
}
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
} catch (error) {
expect(error).to.be.instanceOf(MongoServerError);
expect(aggregateEvents).to.have.lengthOf(2);
expect(changeStream.closed).to.be.true;
}
}
);
});
});
});

Expand Down Expand Up @@ -2721,6 +2839,35 @@ describe('ChangeStream resumability', function () {
}
);
});

context('when the error occurred on the aggregate', function () {
it(
'does not resume',
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([]);

const resumableErrorCode = 7;
await client.db('admin').command({
configureFailPoint: 'failCommand',
mode: { times: 2 }, // account for retry attempt in executeOperation which is separate from change stream's retry
data: {
failCommands: ['aggregate'],
errorCode: resumableErrorCode
}
} as FailPoint);

const willBeError = once(changeStream, 'change').catch(error => error);
await collection.insertOne({ name: 'bailey' });

const error = await willBeError;

expect(error).to.be.instanceOf(MongoServerError);
expect(aggregateEvents).to.have.lengthOf(2);
expect(changeStream.closed).to.be.true;
}
);
});
});

it(
Expand Down
47 changes: 27 additions & 20 deletions test/integration/change-streams/change_streams.prose.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import { setTimeout } from 'timers';

import {
type ChangeStream,
type Collection,
type CommandFailedEvent,
type CommandStartedEvent,
type CommandSucceededEvent,
type Document,
isHello,
LEGACY_HELLO_COMMAND,
Long,
type MongoClient,
MongoNetworkError,
ObjectId,
Timestamp
Expand Down Expand Up @@ -840,8 +842,8 @@ describe('Change Stream prose tests', function () {
// 15 - 16 removed by spec

describe('Change Stream prose 17-18', function () {
let client;
let coll;
let client: MongoClient;
let coll: Collection;
let startAfter;

function recordEvent(events, e) {
Expand Down Expand Up @@ -886,31 +888,36 @@ describe('Change Stream prose tests', function () {
// when resuming a change stream.
it('$changeStream without results must include startAfter and not resumeAfter', {
metadata: { requires: { topology: 'replicaset', mongodb: '>=4.1.1' } },
test: function (done) {
test: async function () {
const events = [];
client.on('commandStarted', e => recordEvent(events, e));
const changeStream = coll.watch([], { startAfter });
this.defer(() => changeStream.close());

changeStream.once('change', change => {
expect(change).to.containSubset({
operationType: 'insert',
fullDocument: { x: 2 }
});

expect(events).to.be.an('array').with.lengthOf(3);
expect(events[0]).nested.property('$changeStream.startAfter').to.exist;
expect(events[1]).to.equal('error');
expect(events[2]).nested.property('$changeStream.startAfter').to.exist;
done();
changeStream.on('error', async e => {
await changeStream.close(e);
});

waitForStarted(changeStream, () => {
triggerResumableError(changeStream, () => {
events.push('error');
coll.insertOne({ x: 2 }, { writeConcern: { w: 'majority', j: true } });
});
const changePromise = once(changeStream, 'change');
await once(changeStream.cursor, 'init');

const stub = sinon.stub(changeStream.cursor, 'close');

stub.callsFake(async function () {
stub.wrappedMethod.call(this);
stub.restore();
events.push('error');
await coll.insertOne({ x: 2 }, { writeConcern: { w: 'majority', j: true } });
});

changeStream.cursorStream.emit('error', new MongoNetworkError('error triggered from test'));

const [change] = await changePromise;
expect(change).to.containSubset({ operationType: 'insert', fullDocument: { x: 2 } });
expect(events).to.be.an('array').with.lengthOf(3);

expect(events[0]).nested.property('$changeStream.startAfter').to.exist;
expect(events[1]).to.equal('error');
expect(events[2]).nested.property('$changeStream.startAfter').to.exist;
}
});

Expand Down