fix(NODE-5627): BulkWriteResult.insertedIds includes ids that were not inserted (#3870)
aditi-khare-mongoDB authored Sep 29, 2023 · 1 parent 6f67539 · commit d766ae2
Showing 2 changed files with 154 additions and 7 deletions.
45 changes: 38 additions & 7 deletions src/bulk/common.ts
@@ -195,18 +195,36 @@ export class BulkWriteResult {
* Create a new BulkWriteResult instance
* @internal
*/
constructor(bulkResult: BulkResult) {
constructor(bulkResult: BulkResult, isOrdered: boolean) {
this.result = bulkResult;
this.insertedCount = this.result.nInserted ?? 0;
this.matchedCount = this.result.nMatched ?? 0;
this.modifiedCount = this.result.nModified ?? 0;
this.deletedCount = this.result.nRemoved ?? 0;
this.upsertedCount = this.result.upserted.length ?? 0;
this.upsertedIds = BulkWriteResult.generateIdMap(this.result.upserted);
this.insertedIds = BulkWriteResult.generateIdMap(this.result.insertedIds);
this.insertedIds = BulkWriteResult.generateIdMap(
this.getSuccessfullyInsertedIds(bulkResult, isOrdered)
);
Object.defineProperty(this, 'result', { value: this.result, enumerable: false });
}

/**
* Returns document_ids that were actually inserted
* @internal
*/
private getSuccessfullyInsertedIds(bulkResult: BulkResult, isOrdered: boolean): Document[] {
if (bulkResult.writeErrors.length === 0) return bulkResult.insertedIds;

if (isOrdered) {
return bulkResult.insertedIds.slice(0, bulkResult.writeErrors[0].index);
}

return bulkResult.insertedIds.filter(
({ index }) => !bulkResult.writeErrors.some(writeError => index === writeError.index)
);
}

/** Evaluates to true if the bulk operation correctly executes */
get ok(): number {
return this.result.ok;
@@ -533,7 +551,10 @@ function executeCommands(
callback: Callback<BulkWriteResult>
) {
if (bulkOperation.s.batches.length === 0) {
return callback(undefined, new BulkWriteResult(bulkOperation.s.bulkResult));
return callback(
undefined,
new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered)
);
}

const batch = bulkOperation.s.batches.shift() as Batch;
@@ -542,17 +563,26 @@
// Error is a driver related error not a bulk op error, return early
if (err && 'message' in err && !(err instanceof MongoWriteConcernError)) {
return callback(
new MongoBulkWriteError(err, new BulkWriteResult(bulkOperation.s.bulkResult))
new MongoBulkWriteError(
err,
new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered)
)
);
}

if (err instanceof MongoWriteConcernError) {
return handleMongoWriteConcernError(batch, bulkOperation.s.bulkResult, err, callback);
return handleMongoWriteConcernError(
batch,
bulkOperation.s.bulkResult,
bulkOperation.isOrdered,
err,
callback
);
}

// Merge the results together
mergeBatchResults(batch, bulkOperation.s.bulkResult, err, result);
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult);
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered);
if (bulkOperation.handleWriteError(callback, writeResult)) return;

// Execute the next command in line
@@ -626,6 +656,7 @@ function executeCommands(
function handleMongoWriteConcernError(
batch: Batch,
bulkResult: BulkResult,
isOrdered: boolean,
err: MongoWriteConcernError,
callback: Callback<BulkWriteResult>
) {
@@ -637,7 +668,7 @@ function handleMongoWriteConcernError(
message: err.result?.writeConcernError.errmsg,
code: err.result?.writeConcernError.result
},
new BulkWriteResult(bulkResult)
new BulkWriteResult(bulkResult, isOrdered)
)
);
}
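
The heart of the change is the getSuccessfullyInsertedIds helper added above. Below is a minimal standalone sketch of the same selection rule, using simplified stand-in types (InsertedId, WriteErrorLike) in place of the driver's actual BulkResult and Document types:

// A minimal sketch of the selection rule, not the driver's actual types.
interface InsertedId {
  index: number; // position of the insert within the bulk operation
  _id: unknown;  // the _id assigned to that document
}

interface WriteErrorLike {
  index: number; // position of the operation that failed
}

function successfullyInsertedIds(
  insertedIds: InsertedId[],
  writeErrors: WriteErrorLike[],
  isOrdered: boolean
): InsertedId[] {
  // No write errors: every attempted insert succeeded.
  if (writeErrors.length === 0) return insertedIds;

  // Ordered bulk writes stop at the first failure, so only ids before
  // that failure's index can have been inserted.
  if (isOrdered) return insertedIds.slice(0, writeErrors[0].index);

  // Unordered bulk writes attempt every operation; drop only the ids
  // whose operation index shows up in a write error.
  return insertedIds.filter(
    ({ index }) => !writeErrors.some(writeError => writeError.index === index)
  );
}

// Ordered example: index 1 fails, so only index 0 counts as inserted.
console.log(
  successfullyInsertedIds(
    [{ index: 0, _id: 0 }, { index: 1, _id: 1 }, { index: 2, _id: 2 }],
    [{ index: 1 }],
    true
  )
); // [{ index: 0, _id: 0 }]

The ordered branch assumes inserted ids are recorded in operation order, so slicing at the first error's index keeps exactly the ids attempted before the failure; the unordered branch instead drops any id whose operation index appears among the write errors.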
116 changes: 116 additions & 0 deletions test/integration/crud/bulk.test.ts
@@ -5,6 +5,7 @@ import {
type Collection,
Long,
MongoBatchReExecutionError,
MongoBulkWriteError,
type MongoClient,
MongoDriverError,
MongoInvalidArgumentError
@@ -104,6 +105,121 @@ describe('Bulk', function () {
}
});
});

context('when inserting duplicate values', function () {
let col;

beforeEach(async function () {
const db = client.db();
col = db.collection('test');
await col.createIndex([{ a: 1 }], { unique: true, sparse: false });
});

async function assertFailsWithDuplicateFields(input, isOrdered, expectedInsertedIds) {
const error = await col.insertMany(input, { ordered: isOrdered }).catch(error => error);
expect(error).to.be.instanceOf(MongoBulkWriteError);
expect(error.result.insertedCount).to.equal(Object.keys(error.result.insertedIds).length);
expect(error.result.insertedIds).to.deep.equal(expectedInsertedIds);
}

context('when the insert is ordered', function () {
it('contains the correct insertedIds on one duplicate insert', async function () {
await assertFailsWithDuplicateFields(
[
{ _id: 0, a: 1 },
{ _id: 1, a: 1 }
],
true,
{ 0: 0 }
);
});

it('contains the correct insertedIds on multiple duplicate inserts', async function () {
await assertFailsWithDuplicateFields(
[
{ _id: 0, a: 1 },
{ _id: 1, a: 1 },
{ _id: 2, a: 1 },
{ _id: 3, b: 2 }
],
true,
{ 0: 0 }
);
});
});

context('when the insert is unordered', function () {
it('contains the correct insertedIds on multiple duplicate inserts', async function () {
await assertFailsWithDuplicateFields(
[
{ _id: 0, a: 1 },
{ _id: 1, a: 1 },
{ _id: 2, a: 1 },
{ _id: 3, b: 2 }
],
false,
{ 0: 0, 3: 3 }
);
});
});
});
});

describe('#bulkWrite()', function () {
context('when inserting duplicate values', function () {
let col;

beforeEach(async function () {
const db = client.db();
col = db.collection('test');
await col.createIndex([{ a: 1 }], { unique: true, sparse: false });
});

async function assertFailsWithDuplicateFields(input, isOrdered, expectedInsertedIds) {
const error = await col.bulkWrite(input, { ordered: isOrdered }).catch(error => error);
expect(error).to.be.instanceOf(MongoBulkWriteError);
expect(error.result.insertedCount).to.equal(Object.keys(error.result.insertedIds).length);
expect(error.result.insertedIds).to.deep.equal(expectedInsertedIds);
}

context('when the insert is ordered', function () {
it('contains the correct insertedIds on one duplicate insert', async function () {
await assertFailsWithDuplicateFields(
[{ insertOne: { _id: 0, a: 1 } }, { insertOne: { _id: 1, a: 1 } }],
true,
{ 0: 0 }
);
});

it('contains the correct insertedIds on multiple duplicate inserts', async function () {
await assertFailsWithDuplicateFields(
[
{ insertOne: { _id: 0, a: 1 } },
{ insertOne: { _id: 1, a: 1 } },
{ insertOne: { _id: 2, a: 1 } },
{ insertOne: { _id: 3, b: 2 } }
],
true,
{ 0: 0 }
);
});
});

context('when the insert is unordered', function () {
it('contains the correct insertedIds on multiple duplicate inserts', async function () {
await assertFailsWithDuplicateFields(
[
{ insertOne: { _id: 0, a: 1 } },
{ insertOne: { _id: 1, a: 1 } },
{ insertOne: { _id: 2, a: 1 } },
{ insertOne: { _id: 3, b: 2 } }
],
false,
{ 0: 0, 3: 3 }
);
});
});
});
});
});
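
From an application's point of view, the change is visible on the MongoBulkWriteError thrown by a partially failing bulk insert. A hedged usage sketch follows; the connection string, database, and collection names are illustrative:

import { MongoBulkWriteError, MongoClient } from 'mongodb';

async function run(): Promise<void> {
  const client = new MongoClient('mongodb://localhost:27017');
  const col = client.db('test').collection('test');
  await col.createIndex({ a: 1 }, { unique: true });

  try {
    await col.insertMany(
      [
        { _id: 0, a: 1 },
        { _id: 1, a: 1 }, // duplicate key on the unique index
        { _id: 2, b: 2 }
      ],
      { ordered: false }
    );
  } catch (error) {
    if (error instanceof MongoBulkWriteError) {
      // With this fix, only the documents that were actually written
      // appear here: { '0': 0, '2': 2 } rather than all three _ids.
      console.log(error.result.insertedIds);
      console.log(error.result.insertedCount); // 2
    }
  } finally {
    await client.close();
  }
}

run().catch(console.error);

With an ordered insert, the same error would instead report only the ids attempted before the first failure.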

