From 431e06d4a90110f4eb2ae6612dbf98efec6a1111 Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Thu, 6 Oct 2022 17:11:14 -0400 Subject: [PATCH 1/5] populate inserted_ids when bulk write failures occur --- src/coll/mod.rs | 4 +++ src/test/coll.rs | 85 +++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 88 insertions(+), 1 deletion(-) diff --git a/src/coll/mod.rs b/src/coll/mod.rs index ed9bfee0f..eb391a48d 100644 --- a/src/coll/mod.rs +++ b/src/coll/mod.rs @@ -1235,6 +1235,10 @@ where let failure_ref = cumulative_failure.get_or_insert_with(BulkWriteFailure::new); + for (index, id) in bw.inserted_ids { + failure_ref.inserted_ids.insert(index + n_attempted, id); + } + if let Some(write_errors) = bw.write_errors { for err in write_errors { let index = n_attempted + err.index; diff --git a/src/test/coll.rs b/src/test/coll.rs index 49daa77bc..c6ddd7db5 100644 --- a/src/test/coll.rs +++ b/src/test/coll.rs @@ -27,7 +27,7 @@ use crate::{ UpdateOptions, WriteConcern, }, - results::DeleteResult, + results::{DeleteResult, InsertManyResult}, runtime, test::{ log_uncaptured, @@ -1201,3 +1201,86 @@ fn assert_duplicate_key_error_with_utf8_replacement(error: &ErrorKind) { ), } } + +async fn run_inserted_ids_test( + client: &TestClient, + docs: Vec, + ordered: bool, +) -> Result { + let coll = client.init_db_and_coll("bulk_write_test", "test").await; + coll.insert_one(doc! { "_id": 1}, None).await.unwrap(); + + let insert_opts = InsertManyOptions::builder().ordered(ordered).build(); + coll.insert_many(docs, insert_opts).await +} + +/// Verify that when an insert_many fails, the returned BulkWriteFailure has +/// its inserted_ids correctly populated. +#[cfg_attr(feature = "tokio-runtime", tokio::test)] +#[cfg_attr(feature = "async-std-runtime", async_std::test)] +async fn bulk_write_failure_has_inserted_ids() { + let _guard: RwLockReadGuard<()> = LOCK.run_concurrently().await; + + let client = TestClient::new().await; + + let res = run_inserted_ids_test(&client, [doc! { "_id": 1}, doc! { "_id": 2}].to_vec(), true); + let err = res.await.expect_err("insert_many should fail"); + match *err.kind { + ErrorKind::BulkWrite(failure) => { + assert_eq!( + failure.inserted_ids.len(), + 0, + "inserted_ids should be empty" + ); + } + _ => panic!("Expected BulkWrite error, but got: {:?}", err), + } + + let res = run_inserted_ids_test(&client, [doc! { "_id": 2}, doc! { "_id": 1}].to_vec(), true); + let err = res.await.expect_err("insert_many should fail"); + match *err.kind { + ErrorKind::BulkWrite(failure) => { + assert_eq!( + failure.inserted_ids.len(), + 1, + "one document should have been inserted" + ); + assert!( + failure.inserted_ids.contains_key(&0), + "document at index 0 should have been inserted" + ); + assert_eq!( + failure.inserted_ids.get(&0).unwrap(), + &Bson::Int32(2), + "inserted document should have _id 2" + ); + } + _ => panic!("Expected BulkWrite error, but got: {:?}", err), + } + + let res = run_inserted_ids_test( + &client, + [doc! { "_id": 1}, doc! { "_id": 2}].to_vec(), + false, + ); + let err = res.await.expect_err("insert_many should fail"); + match *err.kind { + ErrorKind::BulkWrite(failure) => { + assert_eq!( + failure.inserted_ids.len(), + 1, + "one document should have been inserted" + ); + assert!( + failure.inserted_ids.contains_key(&1), + "document at index 1 should have been inserted" + ); + assert_eq!( + failure.inserted_ids.get(&1).unwrap(), + &Bson::Int32(2), + "inserted document should have _id 2" + ); + } + _ => panic!("Expected BulkWrite error, but got: {:?}", err), + } +} From 243a54dfca3cdd2638c9a649cb61ed0daba7e133 Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Thu, 6 Oct 2022 18:25:23 -0400 Subject: [PATCH 2/5] add tests for and fix collating inserted_ids across batches --- src/coll/mod.rs | 35 +++++----- src/results.rs | 6 +- src/test/coll.rs | 167 +++++++++++++++++++++++++++++++++++++++++------ 3 files changed, 170 insertions(+), 38 deletions(-) diff --git a/src/coll/mod.rs b/src/coll/mod.rs index eb391a48d..e20350aab 100644 --- a/src/coll/mod.rs +++ b/src/coll/mod.rs @@ -1,6 +1,12 @@ pub mod options; -use std::{borrow::Borrow, collections::HashSet, fmt, fmt::Debug, sync::Arc}; +use std::{ + borrow::Borrow, + collections::{HashMap, HashSet}, + fmt, + fmt::Debug, + sync::Arc, +}; use futures_util::{ future, @@ -1196,7 +1202,7 @@ where let mut cumulative_failure: Option = None; let mut error_labels: HashSet = Default::default(); - let mut cumulative_result: Option = None; + let mut cumulative_inserted_ids = HashMap::new(); let mut n_attempted = 0; @@ -1211,13 +1217,8 @@ where { Ok(result) => { let current_batch_size = result.inserted_ids.len(); - - let cumulative_result = - cumulative_result.get_or_insert_with(InsertManyResult::new); for (index, id) in result.inserted_ids { - cumulative_result - .inserted_ids - .insert(index + n_attempted, id); + cumulative_inserted_ids.insert(index + n_attempted, id); } n_attempted += current_batch_size; @@ -1236,7 +1237,7 @@ where let failure_ref = cumulative_failure.get_or_insert_with(BulkWriteFailure::new); for (index, id) in bw.inserted_ids { - failure_ref.inserted_ids.insert(index + n_attempted, id); + cumulative_inserted_ids.insert(index + n_attempted, id); } if let Some(write_errors) = bw.write_errors { @@ -1259,7 +1260,8 @@ where if ordered { // this will always be true since we invoked get_or_insert_with // above. - if let Some(failure) = cumulative_failure { + if let Some(mut failure) = cumulative_failure { + failure.inserted_ids = cumulative_inserted_ids; return Err(Error::new( ErrorKind::BulkWrite(failure), Some(error_labels), @@ -1275,11 +1277,14 @@ where } match cumulative_failure { - Some(failure) => Err(Error::new( - ErrorKind::BulkWrite(failure), - Some(error_labels), - )), - None => Ok(cumulative_result.unwrap_or_else(InsertManyResult::new)), + Some(mut failure) => { + failure.inserted_ids = cumulative_inserted_ids; + Err(Error::new( + ErrorKind::BulkWrite(failure), + Some(error_labels), + )) + } + None => Ok(InsertManyResult::new(cumulative_inserted_ids)), } } diff --git a/src/results.rs b/src/results.rs index 5245fbca5..3367ae213 100644 --- a/src/results.rs +++ b/src/results.rs @@ -41,10 +41,8 @@ pub struct InsertManyResult { } impl InsertManyResult { - pub(crate) fn new() -> Self { - InsertManyResult { - inserted_ids: HashMap::new(), - } + pub(crate) fn new(inserted_ids: HashMap) -> Self { + InsertManyResult { inserted_ids } } } diff --git a/src/test/coll.rs b/src/test/coll.rs index c6ddd7db5..d75394e11 100644 --- a/src/test/coll.rs +++ b/src/test/coll.rs @@ -1204,7 +1204,7 @@ fn assert_duplicate_key_error_with_utf8_replacement(error: &ErrorKind) { async fn run_inserted_ids_test( client: &TestClient, - docs: Vec, + docs: &Vec, ordered: bool, ) -> Result { let coll = client.init_db_and_coll("bulk_write_test", "test").await; @@ -1223,7 +1223,35 @@ async fn bulk_write_failure_has_inserted_ids() { let client = TestClient::new().await; - let res = run_inserted_ids_test(&client, [doc! { "_id": 1}, doc! { "_id": 2}].to_vec(), true); + // an ordered, single batch bulk write where the last doc generates a write error: + // everything before the last doc should be inserted + let docs = vec![doc! { "_id": 2}, doc! { "_id": 1}]; + let res = run_inserted_ids_test(&client, &docs, true); + let err = res.await.expect_err("insert_many should fail"); + match *err.kind { + ErrorKind::BulkWrite(failure) => { + assert_eq!( + failure.inserted_ids.len(), + 1, + "one document should have been inserted" + ); + assert!( + failure.inserted_ids.contains_key(&0), + "document at index 0 should have been inserted" + ); + assert_eq!( + failure.inserted_ids.get(&0).unwrap(), + &Bson::Int32(2), + "inserted document should have _id 2" + ); + } + _ => panic!("Expected BulkWrite error, but got: {:?}", err), + } + + // an ordered, single batch bulk write where the first doc generates a write error: + // nothing should be inserted + let docs = vec![doc! { "_id": 1}, doc! { "_id": 2}]; + let res = run_inserted_ids_test(&client, &docs, true); let err = res.await.expect_err("insert_many should fail"); match *err.kind { ErrorKind::BulkWrite(failure) => { @@ -1236,7 +1264,9 @@ async fn bulk_write_failure_has_inserted_ids() { _ => panic!("Expected BulkWrite error, but got: {:?}", err), } - let res = run_inserted_ids_test(&client, [doc! { "_id": 2}, doc! { "_id": 1}].to_vec(), true); + // an unordered, single batch bulk write where the first doc generates a write error: + // everything after the first doc should be inserted + let res = run_inserted_ids_test(&client, &docs, false); let err = res.await.expect_err("insert_many should fail"); match *err.kind { ErrorKind::BulkWrite(failure) => { @@ -1246,11 +1276,11 @@ async fn bulk_write_failure_has_inserted_ids() { "one document should have been inserted" ); assert!( - failure.inserted_ids.contains_key(&0), - "document at index 0 should have been inserted" + failure.inserted_ids.contains_key(&1), + "document at index 1 should have been inserted" ); assert_eq!( - failure.inserted_ids.get(&0).unwrap(), + failure.inserted_ids.get(&1).unwrap(), &Bson::Int32(2), "inserted document should have _id 2" ); @@ -1258,28 +1288,127 @@ async fn bulk_write_failure_has_inserted_ids() { _ => panic!("Expected BulkWrite error, but got: {:?}", err), } - let res = run_inserted_ids_test( - &client, - [doc! { "_id": 1}, doc! { "_id": 2}].to_vec(), - false, - ); + // an ordered, 2-batch bulk write where a document in the first batch generates write error: + // nothing should be inserted + let mut docs = Vec::with_capacity(100001); + for i in 1..100002 { + docs.push(doc! { "_id": Bson::Int32(i) }); + } + + let res = run_inserted_ids_test(&client, &docs, true); let err = res.await.expect_err("insert_many should fail"); match *err.kind { ErrorKind::BulkWrite(failure) => { assert_eq!( failure.inserted_ids.len(), - 1, - "one document should have been inserted" + 0, + "0 documents should have been inserted" ); - assert!( - failure.inserted_ids.contains_key(&1), - "document at index 1 should have been inserted" + } + _ => panic!("Expected BulkWrite error, but got: {:?}", err), + } + + // an unordered, 2-batch bulk write where a document in the first batch generates a write error: + // everything besides that document should be inserted + let res = run_inserted_ids_test(&client, &docs, false); + let err = res.await.expect_err("insert_many should fail"); + match *err.kind { + ErrorKind::BulkWrite(failure) => { + assert_eq!( + failure.inserted_ids.len(), + 100000, + "100,000 documents should have been inserted" + ); + // docs at index 1 up to and including 100,000 should have been inserted + for i in 1..100001 { + assert!( + failure.inserted_ids.contains_key(&i), + "document at index {} should have been inserted", + i, + ); + assert_eq!( + failure.inserted_ids.get(&i).unwrap(), + docs[i].get("_id").unwrap(), + "inserted_id for index {} should be {}", + i, + docs[i].get("_id").unwrap(), + ) + } + } + _ => panic!("Expected BulkWrite error, but got: {:?}", err), + } + + // an ordered, 2-batch bulk write where the second-to-last document in the second batch + // generates a write error: everything before that document should be inserted + let mut docs = Vec::with_capacity(100002); + for i in 2..100003 { + docs.push(doc! { "_id": Bson::Int32(i) }); + } + docs.push(doc! { "_id": 1 }); + docs.push(doc! { "_id": 100003 }); + + let res = run_inserted_ids_test(&client, &docs, true); + let err = res.await.expect_err("insert_many should fail"); + match *err.kind { + ErrorKind::BulkWrite(failure) => { + assert_eq!( + failure.inserted_ids.len(), + 100001, + "100001 documents should have been inserted" ); + // docs at index 0 up to and including 100,000 should have been inserted; + // doc at index 100,001 generates a duplicate key error + for i in 0..100001 { + assert!( + failure.inserted_ids.contains_key(&i), + "document at index {} should have been inserted", + i, + ); + assert_eq!( + failure.inserted_ids.get(&i).unwrap(), + docs[i].get("_id").unwrap(), + "inserted_id for index {} should be {}", + i, + docs[i].get("_id").unwrap(), + ) + } + } + _ => panic!("Expected BulkWrite error, but got: {:?}", err), + } + + // an unordered, 2-batch bulk write where the second-to-last document in the second batch + // generates a write error: everything besides that document should be inserted + let res = run_inserted_ids_test(&client, &docs, false); + let err = res.await.expect_err("insert_many should fail"); + match *err.kind { + ErrorKind::BulkWrite(failure) => { assert_eq!( - failure.inserted_ids.get(&1).unwrap(), - &Bson::Int32(2), - "inserted document should have _id 2" + failure.inserted_ids.len(), + 100002, + "100002 documents should have been inserted" ); + // docs at index 0 up to and including 100,000 should have been inserted + for i in 0..100001 { + assert!( + failure.inserted_ids.contains_key(&i), + "document at index {} should have been inserted", + i, + ); + assert_eq!( + failure.inserted_ids.get(&i).unwrap(), + docs[i].get("_id").unwrap(), + "inserted_id for index {} should be {}", + i, + docs[i].get("_id").unwrap(), + ); + // doc at index 100,001 generates a duplicate key error; doc at index + // 100,002 should be inserted + assert_eq!( + failure.inserted_ids.get(&100002).unwrap(), + docs[100002].get("_id").unwrap(), + "inserted_id for index 1000002 should be 100003", + ); + } } _ => panic!("Expected BulkWrite error, but got: {:?}", err), } From fecf8442a8e3468f8ea4ac33d94e75dbd757494c Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Thu, 6 Oct 2022 18:37:14 -0400 Subject: [PATCH 3/5] Update coll.rs --- src/test/coll.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/test/coll.rs b/src/test/coll.rs index d75394e11..6b3c476b6 100644 --- a/src/test/coll.rs +++ b/src/test/coll.rs @@ -1290,6 +1290,7 @@ async fn bulk_write_failure_has_inserted_ids() { // an ordered, 2-batch bulk write where a document in the first batch generates write error: // nothing should be inserted + // note: these numbers were chosen because maxWriteBatchSize is 100,000 let mut docs = Vec::with_capacity(100001); for i in 1..100002 { docs.push(doc! { "_id": Bson::Int32(i) }); From 2185bcdd60e94d1232228b321868dbc3fb91ec43 Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Fri, 7 Oct 2022 09:22:00 -0400 Subject: [PATCH 4/5] fix clippy and simplify assertions --- src/test/coll.rs | 77 +++++++++++++++++++----------------------------- 1 file changed, 31 insertions(+), 46 deletions(-) diff --git a/src/test/coll.rs b/src/test/coll.rs index 6b3c476b6..3fa2c4a65 100644 --- a/src/test/coll.rs +++ b/src/test/coll.rs @@ -1321,19 +1321,14 @@ async fn bulk_write_failure_has_inserted_ids() { "100,000 documents should have been inserted" ); // docs at index 1 up to and including 100,000 should have been inserted - for i in 1..100001 { - assert!( - failure.inserted_ids.contains_key(&i), - "document at index {} should have been inserted", - i, - ); - assert_eq!( - failure.inserted_ids.get(&i).unwrap(), - docs[i].get("_id").unwrap(), - "inserted_id for index {} should be {}", - i, - docs[i].get("_id").unwrap(), - ) + for (i, doc) in docs.iter().enumerate().take(100001).skip(1) { + match failure.inserted_ids.get(&i) { + Some(doc_id) => { + let expected_id = doc.get("_id").unwrap(); + assert_eq!(doc_id, expected_id, "Doc at index {} did not have expected _id", i); + }, + None => panic!("Document at index {} should have been inserted", i), + } } } _ => panic!("Expected BulkWrite error, but got: {:?}", err), @@ -1359,19 +1354,14 @@ async fn bulk_write_failure_has_inserted_ids() { ); // docs at index 0 up to and including 100,000 should have been inserted; // doc at index 100,001 generates a duplicate key error - for i in 0..100001 { - assert!( - failure.inserted_ids.contains_key(&i), - "document at index {} should have been inserted", - i, - ); - assert_eq!( - failure.inserted_ids.get(&i).unwrap(), - docs[i].get("_id").unwrap(), - "inserted_id for index {} should be {}", - i, - docs[i].get("_id").unwrap(), - ) + for (i, doc) in docs.iter().enumerate().take(100001) { + match failure.inserted_ids.get(&i) { + Some(doc_id) => { + let expected_id = doc.get("_id").unwrap(); + assert_eq!(doc_id, expected_id, "Doc at index {} did not have expected _id", i); + }, + None => panic!("Document at index {} should have been inserted", i), + } } } _ => panic!("Expected BulkWrite error, but got: {:?}", err), @@ -1389,27 +1379,22 @@ async fn bulk_write_failure_has_inserted_ids() { "100002 documents should have been inserted" ); // docs at index 0 up to and including 100,000 should have been inserted - for i in 0..100001 { - assert!( - failure.inserted_ids.contains_key(&i), - "document at index {} should have been inserted", - i, - ); - assert_eq!( - failure.inserted_ids.get(&i).unwrap(), - docs[i].get("_id").unwrap(), - "inserted_id for index {} should be {}", - i, - docs[i].get("_id").unwrap(), - ); - // doc at index 100,001 generates a duplicate key error; doc at index - // 100,002 should be inserted - assert_eq!( - failure.inserted_ids.get(&100002).unwrap(), - docs[100002].get("_id").unwrap(), - "inserted_id for index 1000002 should be 100003", - ); + for (i, doc) in docs.iter().enumerate().take(100001) { + match failure.inserted_ids.get(&i) { + Some(doc_id) => { + let expected_id = doc.get("_id").unwrap(); + assert_eq!(doc_id, expected_id, "Doc at index {} did not have expected _id", i); + }, + None => panic!("Document at index {} should have been inserted", i), + } } + // doc at index 100,001 generates a duplicate key error; doc at index + // 100,002 should be inserted + assert_eq!( + failure.inserted_ids.get(&100002).unwrap(), + docs[100002].get("_id").unwrap(), + "inserted_id for index 1000002 should be 100003", + ); } _ => panic!("Expected BulkWrite error, but got: {:?}", err), } From 57c7f85826f5d4acf00952794d7f7ccec20b2530 Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Fri, 7 Oct 2022 16:41:47 -0400 Subject: [PATCH 5/5] fmt --- src/test/coll.rs | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/src/test/coll.rs b/src/test/coll.rs index 3fa2c4a65..c4b46bece 100644 --- a/src/test/coll.rs +++ b/src/test/coll.rs @@ -1325,8 +1325,12 @@ async fn bulk_write_failure_has_inserted_ids() { match failure.inserted_ids.get(&i) { Some(doc_id) => { let expected_id = doc.get("_id").unwrap(); - assert_eq!(doc_id, expected_id, "Doc at index {} did not have expected _id", i); - }, + assert_eq!( + doc_id, expected_id, + "Doc at index {} did not have expected _id", + i + ); + } None => panic!("Document at index {} should have been inserted", i), } } @@ -1358,8 +1362,12 @@ async fn bulk_write_failure_has_inserted_ids() { match failure.inserted_ids.get(&i) { Some(doc_id) => { let expected_id = doc.get("_id").unwrap(); - assert_eq!(doc_id, expected_id, "Doc at index {} did not have expected _id", i); - }, + assert_eq!( + doc_id, expected_id, + "Doc at index {} did not have expected _id", + i + ); + } None => panic!("Document at index {} should have been inserted", i), } } @@ -1383,8 +1391,12 @@ async fn bulk_write_failure_has_inserted_ids() { match failure.inserted_ids.get(&i) { Some(doc_id) => { let expected_id = doc.get("_id").unwrap(); - assert_eq!(doc_id, expected_id, "Doc at index {} did not have expected _id", i); - }, + assert_eq!( + doc_id, expected_id, + "Doc at index {} did not have expected _id", + i + ); + } None => panic!("Document at index {} should have been inserted", i), } }