RUST-1488 Populate inserted_ids when an insert_many fails #761

Closed · wants to merge 5 commits
37 changes: 23 additions & 14 deletions src/coll/mod.rs
@@ -1,6 +1,12 @@
 pub mod options;

-use std::{borrow::Borrow, collections::HashSet, fmt, fmt::Debug, sync::Arc};
+use std::{
+    borrow::Borrow,
+    collections::{HashMap, HashSet},
+    fmt,
+    fmt::Debug,
+    sync::Arc,
+};

 use futures_util::{
     future,
@@ -1196,7 +1202,7 @@ where

         let mut cumulative_failure: Option<BulkWriteFailure> = None;
         let mut error_labels: HashSet<String> = Default::default();
-        let mut cumulative_result: Option<InsertManyResult> = None;
+        let mut cumulative_inserted_ids = HashMap::new();
Contributor Author:
I initially just added logic to set the inserted_ids on the cumulative_failure for failed batches, but I realized that would not correctly handle cases where there is a mix of successful and unsuccessful batches -- for example, if the first batch succeeds and the second batch fails, we would put the inserted_ids from batch 1 on the cumulative_result but the inserted_ids from batch 2 on the cumulative_failure, and then only return the failure. Given this, it seemed simplest to track the inserted_ids in one place and then set them on the InsertManyResult / BulkWriteFailure at the end.
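As a standalone illustration of that approach, here is a minimal sketch (made-up ids, not the driver's code): each batch reports ids keyed by its batch-local index, and offsetting by the number of documents already attempted yields the index in the caller's original ordering.

use std::collections::HashMap;

fn main() {
    let mut cumulative_inserted_ids: HashMap<usize, i32> = HashMap::new();
    let mut n_attempted = 0;

    // Two batches of (batch-local index, _id) pairs, as if reported per batch.
    let batches = vec![vec![(0, 10), (1, 11)], vec![(0, 12)]];
    for batch in batches {
        let batch_size = batch.len();
        for (index, id) in batch {
            // Offset the local index into the original insert_many ordering.
            cumulative_inserted_ids.insert(index + n_attempted, id);
        }
        n_attempted += batch_size;
    }

    // The doc at local index 0 of the second batch lands at cumulative index 2,
    // whether the batch succeeded or failed partway.
    assert_eq!(cumulative_inserted_ids.get(&2), Some(&12));
}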


         let mut n_attempted = 0;

@@ -1211,13 +1217,8 @@
             {
                 Ok(result) => {
                     let current_batch_size = result.inserted_ids.len();
-
-                    let cumulative_result =
-                        cumulative_result.get_or_insert_with(InsertManyResult::new);
                     for (index, id) in result.inserted_ids {
-                        cumulative_result
-                            .inserted_ids
-                            .insert(index + n_attempted, id);
+                        cumulative_inserted_ids.insert(index + n_attempted, id);
                     }

                     n_attempted += current_batch_size;
@@ -1235,6 +1236,10 @@

Contributor:
In the event that one of the batches fails with a non-BulkWriteError, it looks like we just return early without continuing. I think this is probably inconsistent with the docstring of unordered writes ("If false, when a write fails, continue with the remaining writes, if any"), but besides that, it also means we discard our cumulative idea of what has been inserted thus far. I wonder if we would instead want to add a Vec<Error> to BulkWriteFailure to account for the other errors that might occur during a multi-batch write, in order to preserve the inserted_ids map. We have something similar in Swift, though I think it's only a single error instead of an array of them.

This may get kind of tricky when deciding which errors we want to continue on with and which constitute a "terminal" error. For example, I imagine an auth error would not be something we'd want to continue attempting to insert after, but a network error would be (e.g. so that we could get a different mongos for the next batch). Maybe it's worth deferring to another ticket? Or maybe we keep the existing behavior?
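For concreteness, a rough sketch of that suggestion with stand-in types; the other_errors field is hypothetical and does not exist in the driver:

use std::collections::HashMap;

// Stand-ins for the driver's types, for illustration only.
type Bson = i32;
#[derive(Debug)]
struct Error(String);

#[derive(Debug, Default)]
struct BulkWriteFailure {
    inserted_ids: HashMap<usize, Bson>,
    // Hypothetical: non-BulkWrite errors (e.g. transient network errors)
    // hit by individual batches of an unordered write, collected instead
    // of returned early so that inserted_ids is preserved.
    other_errors: Vec<Error>,
}

fn main() {
    let mut failure = BulkWriteFailure::default();
    // Batch 1 succeeds: record its ids at their cumulative indexes.
    failure.inserted_ids.insert(0, 2);
    // Batch 2 fails with a network error: record it and keep going rather
    // than discarding what was inserted so far.
    failure
        .other_errors
        .push(Error("network error on batch 2".into()));
    println!("{:?}", failure);
}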


                         let failure_ref =
                             cumulative_failure.get_or_insert_with(BulkWriteFailure::new);
+                        for (index, id) in bw.inserted_ids {
+                            cumulative_inserted_ids.insert(index + n_attempted, id);
+                        }
+
                         if let Some(write_errors) = bw.write_errors {
                             for err in write_errors {
                                 let index = n_attempted + err.index;
@@ -1255,7 +1260,8 @@ where
                         if ordered {
                             // this will always be true since we invoked get_or_insert_with
                             // above.
-                            if let Some(failure) = cumulative_failure {
+                            if let Some(mut failure) = cumulative_failure {
+                                failure.inserted_ids = cumulative_inserted_ids;
                                 return Err(Error::new(
                                     ErrorKind::BulkWrite(failure),
                                     Some(error_labels),
@@ -1271,11 +1277,14 @@
         }

         match cumulative_failure {
-            Some(failure) => Err(Error::new(
-                ErrorKind::BulkWrite(failure),
-                Some(error_labels),
-            )),
-            None => Ok(cumulative_result.unwrap_or_else(InsertManyResult::new)),
+            Some(mut failure) => {
+                failure.inserted_ids = cumulative_inserted_ids;
Contributor:
This field is actually currently pub(crate), and the only reason the user was able to observe it was the derived Debug implementation. I did some digging and found that RUST-260 was filed to eventually expose this field properly and to stop discarding the intermediate results, and I believe the field was originally introduced for counting writes attempted rather than actually tracking inserted ids (per #377 (comment)).

Given that we've done the work of populating this field, I think we probably ought to also do RUST-260 and mark the field pub.

Regarding the Jira logistics of this, I wonder if it makes sense to rename RUST-1488 to cover the bug of the Debug implementation leaking the unset, pub(crate) inserted_ids map, and instead have this PR be titled RUST-260. Wdyt?
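A sketch of what that RUST-260 follow-up might amount to (stand-in Bson type; surrounding fields elided; the pub visibility is the proposed change, not the current API):

use std::collections::HashMap;

type Bson = i32; // stand-in for bson::Bson, for illustration

#[derive(Debug, Default)]
pub struct BulkWriteFailure {
    // ...other fields elided...
    /// The ids of the documents that were successfully inserted before the
    /// failure, keyed by their index in the original insert_many call.
    /// Currently pub(crate); RUST-260 would make it pub, as here.
    pub inserted_ids: HashMap<usize, Bson>,
}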

+                Err(Error::new(
+                    ErrorKind::BulkWrite(failure),
+                    Some(error_labels),
+                ))
+            }
+            None => Ok(InsertManyResult::new(cumulative_inserted_ids)),
         }
     }

6 changes: 2 additions & 4 deletions src/results.rs
@@ -41,10 +41,8 @@ pub struct InsertManyResult {
 }

 impl InsertManyResult {
-    pub(crate) fn new() -> Self {
-        InsertManyResult {
-            inserted_ids: HashMap::new(),
-        }
+    pub(crate) fn new(inserted_ids: HashMap<usize, Bson>) -> Self {
+        InsertManyResult { inserted_ids }
     }
 }

214 changes: 213 additions & 1 deletion src/test/coll.rs
@@ -27,7 +27,7 @@ use crate::{
         UpdateOptions,
         WriteConcern,
     },
-    results::DeleteResult,
+    results::{DeleteResult, InsertManyResult},
     runtime,
     test::{
         log_uncaptured,
@@ -1201,3 +1201,215 @@ fn assert_duplicate_key_error_with_utf8_replacement(error: &ErrorKind) {
        ),
    }
}

async fn run_inserted_ids_test(
    client: &TestClient,
    docs: &[Document],
    ordered: bool,
) -> Result<InsertManyResult> {
    let coll = client.init_db_and_coll("bulk_write_test", "test").await;
    coll.insert_one(doc! { "_id": 1 }, None).await.unwrap();

    let insert_opts = InsertManyOptions::builder().ordered(ordered).build();
    coll.insert_many(docs, insert_opts).await
}

/// Verify that when an insert_many fails, the returned BulkWriteFailure has
/// its inserted_ids correctly populated.
#[cfg_attr(feature = "tokio-runtime", tokio::test)]
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
async fn bulk_write_failure_has_inserted_ids() {
    let _guard: RwLockReadGuard<()> = LOCK.run_concurrently().await;

    let client = TestClient::new().await;

    // an ordered, single batch bulk write where the last doc generates a write error:
    // everything before the last doc should be inserted
    let docs = vec![doc! { "_id": 2 }, doc! { "_id": 1 }];
    let res = run_inserted_ids_test(&client, &docs, true);
    let err = res.await.expect_err("insert_many should fail");
    match *err.kind {
        ErrorKind::BulkWrite(failure) => {
            assert_eq!(
                failure.inserted_ids.len(),
                1,
                "one document should have been inserted"
            );
            assert!(
                failure.inserted_ids.contains_key(&0),
                "document at index 0 should have been inserted"
            );
            assert_eq!(
                failure.inserted_ids.get(&0).unwrap(),
                &Bson::Int32(2),
                "inserted document should have _id 2"
            );
        }
        _ => panic!("Expected BulkWrite error, but got: {:?}", err),
    }

    // an ordered, single batch bulk write where the first doc generates a write error:
    // nothing should be inserted
    let docs = vec![doc! { "_id": 1 }, doc! { "_id": 2 }];
    let res = run_inserted_ids_test(&client, &docs, true);
    let err = res.await.expect_err("insert_many should fail");
    match *err.kind {
        ErrorKind::BulkWrite(failure) => {
            assert_eq!(
                failure.inserted_ids.len(),
                0,
                "inserted_ids should be empty"
            );
        }
        _ => panic!("Expected BulkWrite error, but got: {:?}", err),
    }

    // an unordered, single batch bulk write where the first doc generates a write error:
    // everything after the first doc should be inserted
    let res = run_inserted_ids_test(&client, &docs, false);
    let err = res.await.expect_err("insert_many should fail");
    match *err.kind {
        ErrorKind::BulkWrite(failure) => {
            assert_eq!(
                failure.inserted_ids.len(),
                1,
                "one document should have been inserted"
            );
            assert!(
                failure.inserted_ids.contains_key(&1),
                "document at index 1 should have been inserted"
            );
            assert_eq!(
                failure.inserted_ids.get(&1).unwrap(),
                &Bson::Int32(2),
                "inserted document should have _id 2"
            );
        }
        _ => panic!("Expected BulkWrite error, but got: {:?}", err),
    }

    // an ordered, 2-batch bulk write where a document in the first batch generates a
    // write error: nothing should be inserted
    let mut docs = Vec::with_capacity(100001);
    for i in 1..100002 {
        docs.push(doc! { "_id": Bson::Int32(i) });
    }

    let res = run_inserted_ids_test(&client, &docs, true);
    let err = res.await.expect_err("insert_many should fail");
    match *err.kind {
        ErrorKind::BulkWrite(failure) => {
            assert_eq!(
                failure.inserted_ids.len(),
                0,
                "0 documents should have been inserted"
            );
        }
        _ => panic!("Expected BulkWrite error, but got: {:?}", err),
    }

    // an unordered, 2-batch bulk write where a document in the first batch generates a
    // write error: everything besides that document should be inserted
    let res = run_inserted_ids_test(&client, &docs, false);
    let err = res.await.expect_err("insert_many should fail");
    match *err.kind {
        ErrorKind::BulkWrite(failure) => {
            assert_eq!(
                failure.inserted_ids.len(),
                100000,
                "100,000 documents should have been inserted"
            );
            // docs at index 1 up to and including 100,000 should have been inserted
            for i in 1..100001 {
                assert!(
                    failure.inserted_ids.contains_key(&i),
                    "document at index {} should have been inserted",
                    i,
                );
                assert_eq!(
                    failure.inserted_ids.get(&i).unwrap(),
                    docs[i].get("_id").unwrap(),
                    "inserted_id for index {} should be {}",
                    i,
                    docs[i].get("_id").unwrap(),
                );
            }
        }
        _ => panic!("Expected BulkWrite error, but got: {:?}", err),
    }

    // an ordered, 2-batch bulk write where the second-to-last document in the second batch
    // generates a write error: everything before that document should be inserted
    let mut docs = Vec::with_capacity(100003);
    for i in 2..100003 {
        docs.push(doc! { "_id": Bson::Int32(i) });
    }
    docs.push(doc! { "_id": 1 });
    docs.push(doc! { "_id": 100003 });

    let res = run_inserted_ids_test(&client, &docs, true);
    let err = res.await.expect_err("insert_many should fail");
    match *err.kind {
        ErrorKind::BulkWrite(failure) => {
            assert_eq!(
                failure.inserted_ids.len(),
                100001,
                "100,001 documents should have been inserted"
            );
            // docs at index 0 up to and including 100,000 should have been inserted;
            // the doc at index 100,001 generates a duplicate key error
            for i in 0..100001 {
                assert!(
                    failure.inserted_ids.contains_key(&i),
                    "document at index {} should have been inserted",
                    i,
                );
                assert_eq!(
                    failure.inserted_ids.get(&i).unwrap(),
                    docs[i].get("_id").unwrap(),
                    "inserted_id for index {} should be {}",
                    i,
                    docs[i].get("_id").unwrap(),
                );
            }
        }
        _ => panic!("Expected BulkWrite error, but got: {:?}", err),
    }

    // an unordered, 2-batch bulk write where the second-to-last document in the second batch
    // generates a write error: everything besides that document should be inserted
    let res = run_inserted_ids_test(&client, &docs, false);
Contributor:
Can we add one more case where we have a 2-batch write with a write error in both batches? (A sketch of such a case follows this test.)
    let err = res.await.expect_err("insert_many should fail");
    match *err.kind {
        ErrorKind::BulkWrite(failure) => {
            assert_eq!(
                failure.inserted_ids.len(),
                100002,
                "100,002 documents should have been inserted"
            );
            // docs at index 0 up to and including 100,000 should have been inserted
            for i in 0..100001 {
                assert!(
                    failure.inserted_ids.contains_key(&i),
                    "document at index {} should have been inserted",
                    i,
                );
                assert_eq!(
                    failure.inserted_ids.get(&i).unwrap(),
                    docs[i].get("_id").unwrap(),
                    "inserted_id for index {} should be {}",
                    i,
                    docs[i].get("_id").unwrap(),
                );
            }
            // the doc at index 100,001 generates a duplicate key error; the doc at
            // index 100,002 should still be inserted
            assert_eq!(
                failure.inserted_ids.get(&100002).unwrap(),
                docs[100002].get("_id").unwrap(),
                "inserted_id for index 100,002 should be 100003",
            );
        }
        _ => panic!("Expected BulkWrite error, but got: {:?}", err),
    }
}
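A sketch of the extra case requested above, following the pattern of the tests in this file; it assumes the server's 100,000-document batch limit, and the specific _id choices are illustrative:

/// Sketch: an unordered, 2-batch bulk write where each batch contains a
/// document that generates a duplicate key error.
#[cfg_attr(feature = "tokio-runtime", tokio::test)]
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
async fn bulk_write_failure_write_errors_in_both_batches() {
    let _guard: RwLockReadGuard<()> = LOCK.run_concurrently().await;
    let client = TestClient::new().await;

    // The helper pre-inserts _id 1, so index 0 (batch 1) errors immediately;
    // the last doc (index 100,001, batch 2) duplicates _id 2 from batch 1.
    let mut docs = Vec::with_capacity(100002);
    docs.push(doc! { "_id": 1 });
    for i in 2..100002 {
        docs.push(doc! { "_id": Bson::Int32(i) });
    }
    docs.push(doc! { "_id": 2 });

    let res = run_inserted_ids_test(&client, &docs, false);
    let err = res.await.expect_err("insert_many should fail");
    match *err.kind {
        ErrorKind::BulkWrite(failure) => {
            // Everything except indexes 0 and 100,001 should be inserted.
            assert_eq!(failure.inserted_ids.len(), 100000);
            assert!(!failure.inserted_ids.contains_key(&0));
            assert!(!failure.inserted_ids.contains_key(&100001));
        }
        _ => panic!("Expected BulkWrite error, but got: {:?}", err),
    }
}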