From 8994a7a5d5ebc61362c828eaab1857d8b5a394df Mon Sep 17 00:00:00 2001 From: Tamo Date: Wed, 25 Sep 2024 18:31:39 +0200 Subject: [PATCH 1/2] Accept a closure to abort the indexing process --- src/error.rs | 4 +++ src/tests/writer.rs | 34 ++++++++++++++++++++++ src/writer.rs | 71 +++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 106 insertions(+), 3 deletions(-) diff --git a/src/error.rs b/src/error.rs index db14e071..fe0935c2 100644 --- a/src/error.rs +++ b/src/error.rs @@ -51,6 +51,10 @@ pub enum Error { #[error("The trees have not been built after an update on index {0}")] NeedBuild(u16), + /// Returned iff the closure given to the builder's `cancel` method returned true. + #[error("The corresponding build process has been cancelled")] + BuildCancelled, + /// Internal error #[error("Internal error: {mode}({item}) is missing in index `{index}`")] MissingKey { diff --git a/src/tests/writer.rs b/src/tests/writer.rs index 2dd069dc..e8bffe4e 100644 --- a/src/tests/writer.rs +++ b/src/tests/writer.rs @@ -1,4 +1,7 @@ +use std::sync::atomic::{AtomicUsize, Ordering}; + use heed::EnvOpenOptions; +use insta::assert_snapshot; use rand::seq::SliceRandom; use rand::Rng; @@ -1042,3 +1045,34 @@ fn prepare_changing_distance() { assert!(writer.need_build(&wtxn).unwrap(), "because an item has been updated"); writer.builder(&mut rng).build(&mut wtxn).unwrap(); } + +#[test] +fn cancel_indexing_process() { + let handle = create_database::(); + let mut rng = rng(); + let mut wtxn = handle.env.write_txn().unwrap(); + let writer = Writer::new(handle.database, 0, 2); + writer.add_item(&mut wtxn, 0, &[0.0, 0.0]).unwrap(); + // Cancel straight away + let err = writer.builder(&mut rng).cancel(|| true).build(&mut wtxn).unwrap_err(); + assert_snapshot!(err, @"The corresponding build process has been cancelled"); + + // Do not cancel at all + writer.builder(&mut rng).cancel(|| false).build(&mut wtxn).unwrap(); + + // Cancel after being called a few times + let writer = Writer::new(handle.database, 0, 2); 
+ for i in 0..100 { + writer.add_item(&mut wtxn, i, &[i as f32, 1.1]).unwrap(); + } + let cpt = AtomicUsize::new(0); + let err = writer + .builder(&mut rng) + .cancel(|| { + let prev = cpt.fetch_add(1, Ordering::Relaxed); + prev > 5 + }) + .build(&mut wtxn) + .unwrap_err(); + assert_snapshot!(err, @"The corresponding build process has been cancelled"); +} diff --git a/src/writer.rs b/src/writer.rs index eaf64c7d..b3c77809 100644 --- a/src/writer.rs +++ b/src/writer.rs @@ -31,13 +31,20 @@ use crate::{ pub struct ArroyBuilder<'a, D: Distance, R: Rng + SeedableRng> { writer: &'a Writer, rng: &'a mut R, - inner: BuildOption, + inner: BuildOption<'a>, } /// The options available when building the arroy database. -struct BuildOption { +struct BuildOption<'a> { n_trees: Option, split_after: Option, + cancel: Box bool + 'a + Sync + Send>, +} + +impl Default for BuildOption<'_> { + fn default() -> Self { + Self { n_trees: None, split_after: None, cancel: Box::new(|| false) } + } } impl<'a, D: Distance, R: Rng + SeedableRng> ArroyBuilder<'a, D, R> { @@ -79,6 +86,41 @@ impl<'a, D: Distance, R: Rng + SeedableRng> ArroyBuilder<'a, D, R> { self } + /// Provide a closure that can cancel the indexing process early if needed. + /// There is no guarantee on when the process is going to cancel itself, but + /// arroy will try to stop as soon as possible once the closure returns `true`. + /// + /// Since the closure is not mutable and will be called from multiple threads + /// at the same time it's encouraged to make it quick to execute. A common + /// way to use it is to fetch an `AtomicBool` inside it that can be set + /// from another thread without lock. 
+ /// + /// # Example + /// + /// ```no_run + /// # use arroy::{Writer, distances::Euclidean}; + /// # let (writer, wtxn): (Writer, heed::RwTxn) = todo!(); + /// use rand::rngs::StdRng; + /// use rand::SeedableRng; + /// use std::sync::atomic::{AtomicBool, Ordering}; + /// + /// let stops_after = AtomicBool::new(false); + /// + /// // Cancel the task after one minute + /// std::thread::spawn(|| { + /// let one_minute = std::time::Duration::from_secs(60); + /// std::thread::sleep(one_minute); + /// stops_after.store(true, Ordering::Relaxed); + /// }); + /// + /// let mut rng = StdRng::seed_from_u64(92); + /// writer.builder(&mut rng).cancel(|| stops_after.load(Ordering::Relaxed)).build(&mut wtxn); + /// ``` + pub fn cancel(&mut self, cancel: impl Fn() -> bool + 'a + Sync + Send) -> &mut Self { + self.inner.cancel = Box::new(cancel); + self + } + /// Generates a forest of `n_trees` trees. /// /// More trees give higher precision when querying at the cost of more disk usage. @@ -334,7 +376,7 @@ impl Writer { /// Returns an [`ArroyBuilder`] to configure the available options to build the database. 
pub fn builder<'a, R: Rng + SeedableRng>(&'a self, rng: &'a mut R) -> ArroyBuilder<'a, D, R> { - ArroyBuilder { writer: self, rng, inner: BuildOption { n_trees: None, split_after: None } } + ArroyBuilder { writer: self, rng, inner: BuildOption::default() } } fn build( @@ -345,6 +387,10 @@ impl Writer { ) -> Result<()> { log::debug!("started preprocessing the items..."); + if (options.cancel)() { + return Err(Error::BuildCancelled); + } + D::preprocess(wtxn, |wtxn| { Ok(self .database @@ -353,6 +399,10 @@ impl Writer { .remap_key_type::()) })?; + if (options.cancel)() { + return Err(Error::BuildCancelled); + } + let item_indices = self.item_indices(wtxn)?; let n_items = item_indices.len(); @@ -415,6 +465,10 @@ impl Writer { log::debug!("Getting a reference to your {} items...", n_items); + if (options.cancel)() { + return Err(Error::BuildCancelled); + } + let used_node_ids = self.used_tree_node(wtxn)?; let nb_tree_nodes = used_node_ids.len(); @@ -483,6 +537,7 @@ impl Writer { log::debug!("Deleting the extraneous trees if there is some..."); self.delete_extra_trees( wtxn, + options, &mut roots, options.n_trees, concurrent_node_ids.used(), @@ -565,6 +620,9 @@ impl Writer { to_delete: &RoaringBitmap, tmp_nodes: &mut TmpNodes>, ) -> Result<(NodeId, RoaringBitmap)> { + if (opt.cancel)() { + return Err(Error::BuildCancelled); + } match current_node.mode { NodeMode::Item => { // We were called on a specific item, we should create a descendants node @@ -768,6 +826,9 @@ impl Writer { item_indices: &RoaringBitmap, tmp_nodes: &mut TmpNodes>, ) -> Result { + if (opt.cancel)() { + return Err(Error::BuildCancelled); + } if item_indices.len() == 1 { return Ok(NodeId::item(item_indices.min().unwrap())); } @@ -839,6 +900,7 @@ impl Writer { fn delete_extra_trees( &self, wtxn: &mut RwTxn, + opt: &BuildOption, roots: &mut Vec, nb_trees: Option, nb_tree_nodes: u64, @@ -868,6 +930,9 @@ impl Writer { log::debug!("Deleting {} trees", to_delete.len()); for tree in to_delete { + if 
(opt.cancel)() { + return Err(Error::BuildCancelled); + } self.delete_tree(wtxn, NodeId::tree(tree))?; } } From d1b64edd8e9945a825d3758a5f30f81a626164b0 Mon Sep 17 00:00:00 2001 From: Tamo Date: Wed, 25 Sep 2024 18:41:49 +0200 Subject: [PATCH 2/2] try to make the ci happy --- src/tests/writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tests/writer.rs b/src/tests/writer.rs index e8bffe4e..ccfe5fec 100644 --- a/src/tests/writer.rs +++ b/src/tests/writer.rs @@ -1048,7 +1048,7 @@ fn prepare_changing_distance() { #[test] fn cancel_indexing_process() { - let handle = create_database::(); + let handle = create_database::(); let mut rng = rng(); let mut wtxn = handle.env.write_txn().unwrap(); let writer = Writer::new(handle.database, 0, 2);