Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-5628): bulkWriteResult.insertedIds does not filter out _ids that are not actually inserted #3867

Merged
merged 13 commits into from
Sep 27, 2023
Merged
45 changes: 38 additions & 7 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,15 +197,17 @@ export class BulkWriteResult {
* Create a new BulkWriteResult instance
* @internal
*/
constructor(bulkResult: BulkResult) {
constructor(bulkResult: BulkResult, isOrdered: boolean) {
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
this.result = bulkResult;
this.insertedCount = this.result.nInserted ?? 0;
this.matchedCount = this.result.nMatched ?? 0;
this.modifiedCount = this.result.nModified ?? 0;
this.deletedCount = this.result.nRemoved ?? 0;
this.upsertedCount = this.result.upserted.length ?? 0;
this.upsertedIds = BulkWriteResult.generateIdMap(this.result.upserted);
this.insertedIds = BulkWriteResult.generateIdMap(this.result.insertedIds);
this.insertedIds = BulkWriteResult.generateIdMap(
this.getSuccessfullyInsertedIds(bulkResult, isOrdered)
);
Object.defineProperty(this, 'result', { value: this.result, enumerable: false });
}

Expand All @@ -214,6 +216,22 @@ export class BulkWriteResult {
return this.result.ok;
}

/**
* Returns document_ids that were actually inserted
* @internal
*/
private getSuccessfullyInsertedIds(bulkResult: BulkResult, isOrdered: boolean): Document[] {
if (bulkResult.writeErrors.length === 0) return bulkResult.insertedIds;

if (isOrdered) {
return bulkResult.insertedIds.slice(0, bulkResult.writeErrors[0].index);
}

return bulkResult.insertedIds.filter(
({ index }) => !bulkResult.writeErrors.some(writeError => index === writeError.index)
);
}

/** Returns the upserted id at the given index */
getUpsertedIdAt(index: number): Document | undefined {
return this.result.upserted[index];
Expand Down Expand Up @@ -479,7 +497,10 @@ function executeCommands(
callback: Callback<BulkWriteResult>
) {
if (bulkOperation.s.batches.length === 0) {
return callback(undefined, new BulkWriteResult(bulkOperation.s.bulkResult));
return callback(
undefined,
new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered)
);
}

const batch = bulkOperation.s.batches.shift() as Batch;
Expand All @@ -488,17 +509,26 @@ function executeCommands(
// Error is a driver related error not a bulk op error, return early
if (err && 'message' in err && !(err instanceof MongoWriteConcernError)) {
return callback(
new MongoBulkWriteError(err, new BulkWriteResult(bulkOperation.s.bulkResult))
new MongoBulkWriteError(
err,
new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered)
)
);
}

if (err instanceof MongoWriteConcernError) {
return handleMongoWriteConcernError(batch, bulkOperation.s.bulkResult, err, callback);
return handleMongoWriteConcernError(
batch,
bulkOperation.s.bulkResult,
bulkOperation.isOrdered,
err,
callback
);
}

// Merge the results together
mergeBatchResults(batch, bulkOperation.s.bulkResult, err, result);
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult);
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered);
if (bulkOperation.handleWriteError(callback, writeResult)) return;

// Execute the next command in line
Expand Down Expand Up @@ -572,6 +602,7 @@ function executeCommands(
function handleMongoWriteConcernError(
batch: Batch,
bulkResult: BulkResult,
isOrdered: boolean,
err: MongoWriteConcernError,
callback: Callback<BulkWriteResult>
) {
Expand All @@ -583,7 +614,7 @@ function handleMongoWriteConcernError(
message: err.result?.writeConcernError.errmsg,
code: err.result?.writeConcernError.result
},
new BulkWriteResult(bulkResult)
new BulkWriteResult(bulkResult, isOrdered)
)
);
}
Expand Down
118 changes: 117 additions & 1 deletion test/integration/crud/bulk.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
type Collection,
Long,
MongoBatchReExecutionError,
MongoBulkWriteError,
type MongoClient,
MongoDriverError,
MongoInvalidArgumentError
Expand All @@ -31,7 +32,6 @@ describe('Bulk', function () {
.createCollection('test')
.catch(() => null); // make ns exist
});

afterEach(async function () {
const cleanup = this.configuration.newClient();
await cleanup
Expand Down Expand Up @@ -91,6 +91,7 @@ describe('Bulk', function () {
}
});
});

context('when passed a valid document list', function () {
it('insertMany should not throw a MongoInvalidArgument error when called with a valid operation', async function () {
try {
Expand All @@ -104,6 +105,121 @@ describe('Bulk', function () {
}
});
});

context('when inserting duplicate values', function () {
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
let col;

beforeEach(async function () {
const db = client.db();
col = db.collection('test');
await col.createIndex([{ a: 1 }], { unique: true, sparse: false });
});

async function assertFailsWithDuplicateFields(input, isOrdered, expectedInsertedIds) {
const error = await col.insertMany(input, { ordered: isOrdered }).catch(error => error);
expect(error).to.be.instanceOf(MongoBulkWriteError);
expect(error.result.insertedCount).to.equal(Object.keys(error.result.insertedIds).length);
expect(error.result.insertedIds).to.deep.equal(expectedInsertedIds);
}

context('when the insert is ordered', function () {
it('contains the correct insertedIds on one duplicate insert', async function () {
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
await assertFailsWithDuplicateFields(
[
{ _id: 0, a: 1 },
{ _id: 1, a: 1 }
],
true,
{ 0: 0 }
);
});

it('contains the correct insertedIds on multiple duplicate inserts', async function () {
await assertFailsWithDuplicateFields(
[
{ _id: 0, a: 1 },
{ _id: 1, a: 1 },
{ _id: 2, a: 1 },
{ _id: 3, b: 2 }
],
true,
{ 0: 0 }
);
});
});

context('when the insert is unordered', function () {
it('contains the correct insertedIds on multiple duplicate inserts', async function () {
await assertFailsWithDuplicateFields(
[
{ _id: 0, a: 1 },
{ _id: 1, a: 1 },
{ _id: 2, a: 1 },
{ _id: 3, b: 2 }
],
false,
{ 0: 0, 3: 3 }
);
});
});
});
});

describe('#bulkWrite()', function () {
context('when inserting duplicate values', function () {
let col;

beforeEach(async function () {
const db = client.db();
col = db.collection('test');
await col.createIndex([{ a: 1 }], { unique: true, sparse: false });
});

async function assertFailsWithDuplicateFields(input, isOrdered, expectedInsertedIds) {
const error = await col.bulkWrite(input, { ordered: isOrdered }).catch(error => error);
expect(error).to.be.instanceOf(MongoBulkWriteError);
expect(error.result.insertedCount).to.equal(Object.keys(error.result.insertedIds).length);
expect(error.result.insertedIds).to.deep.equal(expectedInsertedIds);
}

context('when the insert is ordered', function () {
it('contains the correct insertedIds on one duplicate insert', async function () {
await assertFailsWithDuplicateFields(
[{ insertOne: { _id: 0, a: 1 } }, { insertOne: { _id: 1, a: 1 } }],
true,
{ 0: 0 }
);
});

it('contains the correct insertedIds on multiple duplicate inserts', async function () {
await assertFailsWithDuplicateFields(
[
{ insertOne: { _id: 0, a: 1 } },
{ insertOne: { _id: 1, a: 1 } },
{ insertOne: { _id: 2, a: 1 } },
{ insertOne: { _id: 3, b: 2 } }
],
true,
{ 0: 0 }
);
});
});

context('when the insert is unordered', function () {
it('contains the correct insertedIds on multiple duplicate inserts', async function () {
await assertFailsWithDuplicateFields(
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
[
{ insertOne: { _id: 0, a: 1 } },
{ insertOne: { _id: 1, a: 1 } },
{ insertOne: { _id: 2, a: 1 } },
{ insertOne: { _id: 3, b: 2 } }
],
false,
{ 0: 0, 3: 3 }
);
});
});
});
});
});

Expand Down