From bec25942e80e835f974244674a237784df1d3cff Mon Sep 17 00:00:00 2001 From: AveryQi115 Date: Fri, 12 Jan 2024 14:36:14 -0500 Subject: [PATCH 1/5] diskann init Signed-off-by: AveryQi115 --- crates/service/src/index/indexing/diskann.rs | 157 +++++++++++++++++++ crates/service/src/index/indexing/mod.rs | 19 +++ 2 files changed, 176 insertions(+) create mode 100644 crates/service/src/index/indexing/diskann.rs diff --git a/crates/service/src/index/indexing/diskann.rs b/crates/service/src/index/indexing/diskann.rs new file mode 100644 index 000000000..345c4af5d --- /dev/null +++ b/crates/service/src/index/indexing/diskann.rs @@ -0,0 +1,157 @@ +use super::AbstractIndexing; +use crate::algorithms::diskann::DiskANN; +use crate::algorithms::quantization::QuantizationOptions; +use crate::index::segments::growing::GrowingSegment; +use crate::index::segments::sealed::SealedSearchGucs; +use crate::index::segments::sealed::SealedSegment; +use crate::index::IndexOptions; +use crate::prelude::*; +use serde::{Deserialize, Serialize}; +use std::path::PathBuf; +use std::sync::Arc; +use validator::Validate; + +#[derive(Debug, Clone, Serialize, Deserialize, Validate)] +#[serde(deny_unknown_fields)] +pub struct DiskANNIndexingOptions { + #[serde(default = "DiskANNIndexingOptions::default_index_path_prefix")] + pub index_path_prefix: String, + + #[serde(default = "DiskANNIndexingOptions::default_data_path")] + pub data_path: String, + + // DRAM budget in GB for searching the index to set the compressed level + // for data while search happens + + //bound on the memory footprint of the index at search time in GB. Once built, + // the index will use up only the specified RAM limit, the rest will reside on disk. + // This will dictate how aggressively we compress the data vectors to store in memory. + // Larger will yield better performance at search time. For an n point index, to use + // b byte PQ compressed representation in memory, use `B = ((n * b) / 2^30 + (250000*(4*R + sizeof(T)*ndim)) / 2^30)`. + // The second term in the summation is to allow some buffer for caching about 250,000 nodes from the graph in memory while serving. + // If you are not sure about this term, add 0.25GB to the first term. + #[serde(default = "DiskANNIndexingOptions::default_search_DRAM_budget")] + pub search_DRAM_budget: u32, + + // DRAM budget in GB for building the index + // Limit on the memory allowed for building the index in GB. + // If you specify a value less than what is required to build the index + // in one pass, the index is built using a divide and conquer approach so + // that sub-graphs will fit in the RAM budget. The sub-graphs are overlayed + // to build the overall index. This approach can be upto 1.5 times slower than + // building the index in one shot. Allocate as much memory as your RAM allows. + #[serde(default = "DiskANNIndexingOptions::default_build_DRAM_budget")] + pub build_DRAM_budget: u32, + + #[serde(default = "DiskANNIndexingOptions::default_num_threads")] + pub num_threads: u32, + + // R in the paper + #[serde(default = "DiskANNIndexingOptions::default_max_degree")] + pub max_degree: u32, + + // L in the paper + #[serde(default = "DiskANNIndexingOptions::default_Lbuild")] + pub max_degree: u32, + + // TODO: QD (quantized dimension) + // TODO: codebook prefix + // TODO: PQ disk bytes (compressed bytes on SSD; 0 for no compression) + // TODO: append reorder data (include full precision data in the index; use only in conjunction with compressed data on SSD) + // TODO: build_PQ_bytes + // TODO: use opq + // TODO: label file (for filtered diskANN) + // TODO: universal label (for filtered diskANN) + // TODO: filtered Lbuild (for filtered diskANN) + // TODO: filter threshold (for filtered diskANN) + // TODO: label type (for filtered diskANN) + + #[serde(default)] + #[validate] + pub quantization: QuantizationOptions, +} + +impl DiskANNIndexingOptions { + fn default_index_path_prefix() -> String { + "DiskANN_index".to_string() + } + fn default_data_path() -> u32 { + "DiskANN_data".to_string() + } + fn default_search_DRAM_budget() -> u32 { + 1 + } + fn default_build_DRAM_budget() -> u32 { + 1 + } + fn default_num_threads() -> usize { + match std::thread::available_parallelism() { + Ok(threads) => (threads.get() as f64).sqrt() as _, + Err(_) => 1, + } + } + fn default_max_degree() -> u32 { + 64 + } + fn default_Lbuild() -> u32 { + 100 + } +} + +impl Default for DiskANNIndexingOptions { + fn default() -> Self { + Self { + index_path_prefix: Self::default_index_path_prefix(), + data_path: Self::default_data_path(), + search_DRAM_budget: Self::default_search_DRAM_budget(), + build_DRAM_budget: Self::default_build_DRAM_budget(), + num_threads: Self::default_num_threads(), + max_degree: Self::default_max_degree(), + Lbuild: Self::default_Lbuild(), + quantization: Default::default(), + } + } +} + +pub struct DiskANNIndexing { + raw: Ivf, +} + +impl AbstractIndexing for IvfIndexing { + fn create( + path: PathBuf, + options: IndexOptions, + sealed: Vec>>, + growing: Vec>>, + ) -> Self { + let raw = Ivf::create(path, options, sealed, growing); + Self { raw } + } + + fn open(path: PathBuf, options: IndexOptions) -> Self { + let raw = Ivf::open(path, options); + Self { raw } + } + + fn len(&self) -> u32 { + self.raw.len() + } + + fn vector(&self, i: u32) -> &[S::Scalar] { + self.raw.vector(i) + } + + fn payload(&self, i: u32) -> Payload { + self.raw.payload(i) + } + + fn search( + &self, + k: usize, + vector: &[S::Scalar], + gucs: SealedSearchGucs, + filter: &mut impl Filter, + ) -> Heap { + self.raw.search(k, vector, gucs.ivf_nprob, filter) + } +} diff --git a/crates/service/src/index/indexing/mod.rs b/crates/service/src/index/indexing/mod.rs index 9b2468d87..274a79199 100644 --- a/crates/service/src/index/indexing/mod.rs +++ b/crates/service/src/index/indexing/mod.rs @@ -1,10 +1,12 @@ pub mod flat; pub mod hnsw; pub mod ivf; +pub mod diskann; use self::flat::{FlatIndexing, FlatIndexingOptions}; use self::hnsw::{HnswIndexing, HnswIndexingOptions}; use self::ivf::{IvfIndexing, IvfIndexingOptions}; +use self::diskann::{DiskANNIndexing, DiskANNIndexingOptions}; use super::segments::growing::GrowingSegment; use super::segments::sealed::SealedSegment; use super::IndexOptions; @@ -24,6 +26,7 @@ pub enum IndexingOptions { Flat(FlatIndexingOptions), Ivf(IvfIndexingOptions), Hnsw(HnswIndexingOptions), + DiskANN(DiskANNIndexingOptions), } impl IndexingOptions { @@ -45,6 +48,12 @@ impl IndexingOptions { }; x } + pub fn unwrap_diskann(self) -> DiskANNIndexingOptions { + let IndexingOptions::DiskANN(x) = self else { + unreachable!() + }; + x + } } impl Default for IndexingOptions { @@ -59,6 +68,7 @@ impl Validate for IndexingOptions { Self::Flat(x) => x.validate(), Self::Ivf(x) => x.validate(), Self::Hnsw(x) => x.validate(), + Self::DiskANN(x) => x.validate(), } } } @@ -92,6 +102,7 @@ pub enum DynamicIndexing { Flat(FlatIndexing), Ivf(IvfIndexing), Hnsw(HnswIndexing), + DiskANN(DiskANNIndexing), } impl DynamicIndexing { @@ -111,6 +122,9 @@ impl DynamicIndexing { IndexingOptions::Hnsw(_) => { Self::Hnsw(HnswIndexing::create(path, options, sealed, growing)) } + IndexingOptions::DiskANN(_) => { + Self::DiskANN(DiskANNIndexing::create(path, options, sealed, growing)) + } } } @@ -119,6 +133,7 @@ impl DynamicIndexing { IndexingOptions::Flat(_) => Self::Flat(FlatIndexing::open(path, options)), IndexingOptions::Ivf(_) => Self::Ivf(IvfIndexing::open(path, options)), IndexingOptions::Hnsw(_) => Self::Hnsw(HnswIndexing::open(path, options)), + IndexingOptions::DiskANN(_) => Self::DiskANN(DiskANNIndexing::open(path, options)), } } @@ -127,6 +142,7 @@ impl DynamicIndexing { DynamicIndexing::Flat(x) => x.len(), DynamicIndexing::Ivf(x) => x.len(), DynamicIndexing::Hnsw(x) => x.len(), + DynamicIndexing::DiskANN(x) => x.len(), } } @@ -135,6 +151,7 @@ impl DynamicIndexing { DynamicIndexing::Flat(x) => x.vector(i), DynamicIndexing::Ivf(x) => x.vector(i), DynamicIndexing::Hnsw(x) => x.vector(i), + DynamicIndexing::DiskANN(x) => x.vector(i), } } @@ -143,6 +160,7 @@ impl DynamicIndexing { DynamicIndexing::Flat(x) => x.payload(i), DynamicIndexing::Ivf(x) => x.payload(i), DynamicIndexing::Hnsw(x) => x.payload(i), + DynamicIndexing::DiskANN(x) => x.payload(i), } } @@ -156,6 +174,7 @@ impl DynamicIndexing { DynamicIndexing::Flat(x) => x.basic(vector, opts, filter), DynamicIndexing::Ivf(x) => x.basic(vector, opts, filter), DynamicIndexing::Hnsw(x) => x.basic(vector, opts, filter), + DynamicIndexing::DiskANN(x) => x.basic(vector, opts, filter), } } From 1f9842bc1eba42b49005ab75500e26c6c566d437 Mon Sep 17 00:00:00 2001 From: AveryQi115 Date: Mon, 15 Jan 2024 15:21:52 -0500 Subject: [PATCH 2/5] diskann: in memory vamana based on current segments Signed-off-by: AveryQi115 --- crates/service/src/algorithms/diskann.rs | 564 +++++++++++++++++++ crates/service/src/algorithms/mod.rs | 1 + crates/service/src/index/indexing/diskann.rs | 147 +++-- crates/service/src/index/indexing/mod.rs | 1 + crates/service/src/index/mod.rs | 2 + 5 files changed, 656 insertions(+), 59 deletions(-) create mode 100644 crates/service/src/algorithms/diskann.rs diff --git a/crates/service/src/algorithms/diskann.rs b/crates/service/src/algorithms/diskann.rs new file mode 100644 index 000000000..8d6af325f --- /dev/null +++ b/crates/service/src/algorithms/diskann.rs @@ -0,0 +1,564 @@ +use super::raw::Raw; +use crate::index::segments::growing::GrowingSegment; +use crate::index::segments::sealed::SealedSegment; +use crate::index::{IndexOptions, SearchOptions, VectorOptions}; +use crate::prelude::*; +use crate::utils::dir_ops::sync_dir; +use crate::utils::mmap_array::MmapArray; +use crate::utils::element_heap::ElementHeap; +use rayon::prelude::{IntoParallelIterator, ParallelIterator}; +use rand::distributions::Uniform; +use rand::Rng; +use std::cmp::Reverse; +use std::collections::BinaryHeap; +use std::fs::create_dir; +use std::collections::{BTreeMap, HashSet}; +use rand::prelude::SliceRandom; +use parking_lot::{RwLock, RwLockWriteGuard}; +use std::path::PathBuf; +use std::sync::Arc; + +pub struct DiskANN { + mmap: DiskANNMmap, +} + +impl DiskANN { + pub fn create( + path: PathBuf, + options: IndexOptions, + sealed: Vec>>, + growing: Vec>>, + ) -> Self { + create_dir(&path).unwrap(); + let ram = make(path.clone(), sealed, growing, options.clone()); + let mmap = save(ram, path.clone()); + sync_dir(&path); + Self { mmap } + } + pub fn open(path: PathBuf, options: IndexOptions) -> Self { + let mmap = load(path, options.clone()); + Self { mmap } + } + + pub fn len(&self) -> u32 { + self.mmap.raw.len() + } + + pub fn vector(&self, i: u32) -> &[S::Scalar] { + self.mmap.raw.vector(i) + } + + pub fn payload(&self, i: u32) -> Payload { + self.mmap.raw.payload(i) + } + + pub fn basic( + &self, + vector: &[S::Scalar], + opts: &SearchOptions, + filter: impl Filter, + ) -> BinaryHeap> { + basic(&self.mmap, vector, opts.disk_ann_k, filter) + } +} + +unsafe impl Send for DiskANN {} +unsafe impl Sync for DiskANN {} + +pub struct VertexWithDistance { + pub id: u32, + pub distance: F32, +} + +impl VertexWithDistance { + pub fn new(id: u32, distance: F32) -> Self { + Self { id, distance } + } +} + +impl PartialEq for VertexWithDistance { + fn eq(&self, other: &Self) -> bool { + self.distance.eq(&other.distance) + } +} + +impl Eq for VertexWithDistance {} + +impl PartialOrd for VertexWithDistance { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.distance.cmp(&other.distance)) + } +} + +impl Ord for VertexWithDistance { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.distance.cmp(&other.distance) + } +} + +pub struct SearchState { + pub visited: HashSet, + candidates: BTreeMap, + heap: BinaryHeap>, + heap_visited: HashSet, + l: usize, + k: usize, +} + +impl SearchState { + /// Creates a new search state. + pub(crate) fn new(k: usize, l: usize) -> Self { + Self { + visited: HashSet::new(), + candidates: BTreeMap::new(), + heap: BinaryHeap::new(), + heap_visited: HashSet::new(), + k, + l, + } + } + + /// Return the next unvisited vertex. + fn pop(&mut self) -> Option { + while let Some(vertex) = self.heap.pop() { + if !self.candidates.contains_key(&vertex.0.distance) { + // The vertex has been removed from the candidate lists, + // from [`push()`]. + continue; + } + + self.visited.insert(vertex.0.id); + return Some(vertex.0.id); + } + + None + } + + /// Push a new (unvisited) vertex into the search state. + fn push(&mut self, vertex_id: u32, distance: F32) { + assert!(!self.visited.contains(&vertex_id)); + self.heap_visited.insert(vertex_id); + self.heap + .push(Reverse(VertexWithDistance::new(vertex_id, distance))); + self.candidates.insert(distance, vertex_id); + if self.candidates.len() > self.l { + self.candidates.pop_last(); + } + } + + /// Mark a vertex as visited. + fn visit(&mut self, vertex_id: u32) { + self.visited.insert(vertex_id); + } + + // Returns true if the vertex has been visited. + fn is_visited(&self, vertex_id: u32) -> bool { + self.visited.contains(&vertex_id) || self.heap_visited.contains(&vertex_id) + } +} + +struct VertexNeighbor{ + neighbors: Vec, +} + +// DiskANNRam is for constructing the index +// it stores the intermediate structure when constructing +// the index and these data are stored in memory +pub struct DiskANNRam { + raw: Arc>, + // quantization: Quantization, + vertexs: Vec>, + /// the entry for the entire graph, the closet vector to centroid + medoid: u32, + dims: u16, + max_degree: u32, + alpha: f32, + l_build: u32, +} + +pub struct DiskANNMmap { + raw: Arc>, + neighbors: MmapArray, + neighbor_offset: MmapArray, + medoid: MmapArray, + r: u32, + alpha: f32, + l: u32, +} + +impl DiskANNRam{ + fn _init_graph(&self, n: u32, mut rng: impl Rng) { + let distribution = Uniform::new(0, n); + for i in 0..n { + let mut neighbor_ids: HashSet = HashSet::new(); + if self.max_degree < n { + while neighbor_ids.len() < self.max_degree as usize { + let neighbor_id = rng.sample(distribution); + if neighbor_id != i { + neighbor_ids.insert(neighbor_id); + } + } + } else { + neighbor_ids = (0..n).collect(); + } + + self._set_neighbors(i, &neighbor_ids); + } + } + + fn _set_neighbors( + &self, + vertex_index: u32, + neighbor_ids: &HashSet, + ) { + assert!(neighbor_ids.len() <= self.max_degree as usize); + assert!((vertex_index as usize) < self.vertexs.len()); + + let mut vertex = self.vertexs[vertex_index as usize].write(); + vertex.neighbors.clear(); + for item in neighbor_ids.iter() { + vertex.neighbors.push(*item); + } + } + + fn _set_neighbors_with_write_guard( + &self, + neighbor_ids: &HashSet, + guard: &RwLockWriteGuard, + ) { + assert!(neighbor_ids.len() <= self.max_degree as usize); + (*guard).neighbors.clear(); + for item in neighbor_ids.iter() { + (*guard).neighbors.push(*item); + } + } + + fn _get_neighbors( + &self, + vertex_index: u32, + ) -> VertexNeighbor { + let vertex = self.vertexs[vertex_index as usize].read(); + *vertex + } + + fn _find_medoid(&self, n: u32) -> u32 { + let centroid = self._compute_centroid(n); + let centroid_arr: &[S::Scalar] = ¢roid; + + let mut medoid_index = 0; + let mut min_dis = F32::infinity(); + for i in 0..n { + let dis = S::distance(centroid_arr, self.raw.vector(i)); + if dis < min_dis { + min_dis = dis; + medoid_index = i; + } + } + medoid_index + } + + fn _compute_centroid(&self, n: u32) -> Vec { + let dim = self.dims as usize; + let mut sum = vec![0_f32; dim]; + for i in 0..n { + let vec = self.raw.vector(i); + for j in 0..dim { + sum[j] += vec[j].to_f32(); + } + } + + let collection: Vec = sum + .iter() + .map(|v| S::Scalar::from_f32((*v / n as f32) as f32)) + .collect(); + collection + } + + // r and l leave here for multiple pass extension + fn _one_pass(&self, n: u32, alpha: f32, r: u32, l: u32, mut rng: impl Rng) { + let mut ids = (0..n).collect::>(); + ids.shuffle(&mut rng); + + ids.into_par_iter() + .for_each(|id| self.search_and_prune_for_one_vertex(id, alpha, r, l)); + } + + fn search_and_prune_for_one_vertex(&self, id: u32, alpha: f32, r: u32, l: u32) { + let query = self.raw.vector(id); + let mut state = self._greedy_search(self.medoid, query, 1, l as usize); + state.visited.remove(&id); // in case visited has id itself + let mut new_neighbor_ids: HashSet = HashSet::new(); + { + let mut guard = self.vertexs[id as usize].write(); + let neighbor_ids : Vec = (*guard).neighbors; + state.visited.extend(neighbor_ids.iter().map(|x| *x)); + let neighbor_ids = self._robust_prune(id, state.visited, alpha, r); + let neighbor_ids: HashSet = neighbor_ids.into_iter().collect(); + self._set_neighbors_with_write_guard(&neighbor_ids, &mut guard); + new_neighbor_ids = neighbor_ids; + } + + for &neighbor_id in new_neighbor_ids.iter() { + { + let mut guard = self.vertexs[neighbor_id as usize].write(); + let old_neighbors : Vec = (*guard).neighbors; + let mut old_neighbors: HashSet = + old_neighbors.iter().map(|x| *x).collect(); + old_neighbors.insert(id); + if old_neighbors.len() > r as usize { + // need robust prune + let new_neighbors = self._robust_prune(neighbor_id, old_neighbors, alpha, r); + let new_neighbors: HashSet = new_neighbors.into_iter().collect(); + self._set_neighbors_with_write_guard(&new_neighbors, &mut guard); + } else { + self._set_neighbors_with_write_guard(&old_neighbors, &mut guard); + } + } + } + } + + fn _greedy_search( + &self, + start: u32, + query: &[S::Scalar], + k: usize, + search_size: usize, + ) -> SearchState { + let mut state = SearchState::new(k, search_size); + + let dist = S::distance(query, self.raw.vector(start)); + state.push(start, dist); + while let Some(id) = state.pop() { + // only pop id in the search list but not visited + state.visit(id); + { + let neighbor_ids = self._get_neighbors(id).neighbors; + for neighbor_id in neighbor_ids { + if state.is_visited(neighbor_id) { + continue; + } + + let dist = S::distance(query, self.raw.vector(neighbor_id)); + state.push(neighbor_id, dist); // push and retain closet l nodes + } + } + } + + state + } + + fn _robust_prune(&self, id: u32, mut visited: HashSet, alpha: f32, r: u32) -> Vec { + let mut heap: BinaryHeap = visited + .iter() + .map(|v| { + let dist = S::distance(self.raw.vector(id), self.raw.vector(*v)); + VertexWithDistance { + id: *v, + distance: dist, + } + }) + .collect(); + + let mut new_neighbor_ids: Vec = vec![]; + while !visited.is_empty() { + if let Some(mut p) = heap.pop() { + while !visited.contains(&p.id) { + match heap.pop() { + Some(value) => { + p = value; + } + None => { + return new_neighbor_ids; + } + } + } + new_neighbor_ids.push(p.id); + if new_neighbor_ids.len() >= r as usize { + break; + } + let mut to_remove: HashSet = HashSet::new(); + for pv in visited.iter() { + let dist_prime = S::distance(self.raw.vector(p.id), self.raw.vector(*pv)); + let dist_query = S::distance(self.raw.vector(id), self.raw.vector(*pv)); + if F32::from(alpha) * dist_prime <= dist_query { + to_remove.insert(*pv); + } + } + for pv in to_remove.iter() { + visited.remove(pv); + } + } else { + return new_neighbor_ids; + } + } + new_neighbor_ids + } +} + +impl DiskANNMmap{ + fn _get_neighbors(self, id: u32) -> Vec{ + let start = self.neighbor_offset[id as usize]; + let end = self.neighbor_offset[id as usize + 1]; + self.neighbors[start..end].to_vec() + } +} + +unsafe impl Send for DiskANNMmap {} +unsafe impl Sync for DiskANNMmap {} + +fn calculate_offsets(iter: impl Iterator) -> impl Iterator { + let mut offset = 0usize; + let mut iter = std::iter::once(0).chain(iter); + std::iter::from_fn(move || { + let x = iter.next()?; + offset += x; + Some(offset) + }) +} + +pub fn make( + path: PathBuf, + sealed: Vec>>, + growing: Vec>>, + options: IndexOptions, +) -> DiskANNRam { + let idx_opts = options.indexing.clone().unwrap_diskann(); + let raw = Arc::new(Raw::create( + path.join("raw"), + options.clone(), + sealed, + growing, + )); + + let n = raw.len(); + let r = idx_opts.max_degree; + let VectorOptions { dims, .. } = options.vector; + + let vertexs: Vec> = (0..n).map(|_| { + RwLock::new(VertexNeighbor { neighbors: Vec::new() }) + }).collect(); + + let medoid = 0; + + let mut new_vamana = DiskANNRam:: { + raw, + vertexs, + medoid, + dims, + max_degree: idx_opts.max_degree, + alpha: idx_opts.alpha, + l_build: idx_opts.l_build, + }; + + // 1. init graph with r random neighbors for each node + let rng = rand::thread_rng(); + new_vamana._init_graph(n, rng.clone()); + + // 2. find medoid + new_vamana.medoid = new_vamana._find_medoid(n); + + // 3. iterate pass + new_vamana._one_pass(n, 1.0, r, idx_opts.l_build, rng.clone()); + + new_vamana._one_pass(n, idx_opts.alpha, r, idx_opts.max_degree, rng.clone()); + + new_vamana +} + +pub fn save(ram: DiskANNRam, path: PathBuf) -> DiskANNMmap { + + let neighbors_iter = ram.vertexs.iter() + .flat_map(|vertex| { + let vertex = vertex.read(); + vertex.neighbors.iter().cloned() + }); + + // Create the neighbors array using MmapArray::create. + let neighbors = MmapArray::create(path.join("neighbors"), neighbors_iter); + + // Create an iterator for the size of each neighbor list. + let neighbor_offset_iter = { + let iter = ram.vertexs.iter() + .map(|vertex| { + let vertex = vertex.read(); + vertex.neighbors.len() + }); + calculate_offsets(iter) + }; + + // Create the neighbor_size array using MmapArray::create. + let neighbor_offset = MmapArray::create(path.join("neighbor_offset"), neighbor_offset_iter); + + let medoid_vec = vec![ram.medoid]; + let medoid = MmapArray::create(path.join("medoid"), medoid_vec.into_iter()); + + DiskANNMmap{ + raw: ram.raw, + neighbors, + neighbor_offset, + medoid, + r: ram.max_degree, + alpha: ram.alpha, + l: ram.l_build + } +} + +pub fn load(path: PathBuf, options: IndexOptions) -> DiskANNMmap { + let idx_opts = options.indexing.clone().unwrap_diskann(); + let raw = Arc::new(Raw::open(path.join("raw"), options.clone())); + let neighbors = MmapArray::open(path.join("neighbors")); + let neighbor_offset = MmapArray::open(path.join("neighbor_offset")); + let medoid = MmapArray::open(path.join("medoid")); + assert!(medoid.len() == 1); + + DiskANNMmap{ + raw, + neighbors, + neighbor_offset, + medoid, + r: idx_opts.max_degree, + alpha: idx_opts.alpha, + l: idx_opts.l_build + } +} + +pub fn basic( + mmap: &DiskANNMmap, + vector: &[S::Scalar], + k: u32, + mut filter: impl Filter, +) -> BinaryHeap> { + let mut state = SearchState::new(k as usize, mmap.l as usize); + + let start = mmap.medoid[0]; + let dist = S::distance(vector, mmap.raw.vector(start)); + state.push(start, dist); + while let Some(id) = state.pop() { + // only pop id in the search list but not visited + state.visit(id); + { + let neighbor_ids = mmap._get_neighbors(id); + for neighbor_id in neighbor_ids { + if state.is_visited(neighbor_id) { + continue; + } + + let payload = mmap.raw.payload(neighbor_id); + + if filter.check(payload) { + let dist = S::distance(vector, mmap.raw.vector(neighbor_id)); + state.push(neighbor_id, dist); // push and retain closet l nodes + } + } + } + } + + let mut results = ElementHeap::new(k as usize); + for (distance, id) in state.candidates { + results.push(Element { + distance: distance, + payload: mmap.raw.payload(id), + }); + } + results.into_reversed_heap() +} diff --git a/crates/service/src/algorithms/mod.rs b/crates/service/src/algorithms/mod.rs index a3c5ffd52..215435c99 100644 --- a/crates/service/src/algorithms/mod.rs +++ b/crates/service/src/algorithms/mod.rs @@ -4,3 +4,4 @@ pub mod hnsw; pub mod ivf; pub mod quantization; pub mod raw; +pub mod diskann; \ No newline at end of file diff --git a/crates/service/src/index/indexing/diskann.rs b/crates/service/src/index/indexing/diskann.rs index 345c4af5d..74a6ef3bd 100644 --- a/crates/service/src/index/indexing/diskann.rs +++ b/crates/service/src/index/indexing/diskann.rs @@ -2,46 +2,58 @@ use super::AbstractIndexing; use crate::algorithms::diskann::DiskANN; use crate::algorithms::quantization::QuantizationOptions; use crate::index::segments::growing::GrowingSegment; -use crate::index::segments::sealed::SealedSearchGucs; use crate::index::segments::sealed::SealedSegment; use crate::index::IndexOptions; +use crate::index::SearchOptions; use crate::prelude::*; use serde::{Deserialize, Serialize}; use std::path::PathBuf; use std::sync::Arc; use validator::Validate; +use std::cmp::Reverse; +use std::collections::BinaryHeap; #[derive(Debug, Clone, Serialize, Deserialize, Validate)] #[serde(deny_unknown_fields)] pub struct DiskANNIndexingOptions { - #[serde(default = "DiskANNIndexingOptions::default_index_path_prefix")] - pub index_path_prefix: String, - - #[serde(default = "DiskANNIndexingOptions::default_data_path")] - pub data_path: String, - - // DRAM budget in GB for searching the index to set the compressed level - // for data while search happens + // TODO(avery): Referenced from Microsoft.DiskANN algorithm, One design is to + // leave the definition of memory usage to users and estimate the required + // memory and the given memory to decide the quantization options. + // + // Current design is to let users define the ratio of PQ for in memory index. + // + // Besides, it is hard to estimate current memory usage as sealed segment and + // growing segement are passed to RawMmap and RawRam. Different from the direct + // calculation of the vector layout. + + // #[serde(default = "DiskANNIndexingOptions::default_index_path_prefix")] + // pub index_path_prefix: PathBuf, + + // #[serde(default = "DiskANNIndexingOptions::default_data_path")] + // pub data_path: PathBuf, + + // // DRAM budget in GB for searching the index to set the compressed level + // // for data while search happens - //bound on the memory footprint of the index at search time in GB. Once built, - // the index will use up only the specified RAM limit, the rest will reside on disk. - // This will dictate how aggressively we compress the data vectors to store in memory. - // Larger will yield better performance at search time. For an n point index, to use - // b byte PQ compressed representation in memory, use `B = ((n * b) / 2^30 + (250000*(4*R + sizeof(T)*ndim)) / 2^30)`. - // The second term in the summation is to allow some buffer for caching about 250,000 nodes from the graph in memory while serving. - // If you are not sure about this term, add 0.25GB to the first term. - #[serde(default = "DiskANNIndexingOptions::default_search_DRAM_budget")] - pub search_DRAM_budget: u32, - - // DRAM budget in GB for building the index - // Limit on the memory allowed for building the index in GB. - // If you specify a value less than what is required to build the index - // in one pass, the index is built using a divide and conquer approach so - // that sub-graphs will fit in the RAM budget. The sub-graphs are overlayed - // to build the overall index. This approach can be upto 1.5 times slower than - // building the index in one shot. Allocate as much memory as your RAM allows. - #[serde(default = "DiskANNIndexingOptions::default_build_DRAM_budget")] - pub build_DRAM_budget: u32, + // // bound on the memory footprint of the index at search time in GB. Once built, + // // the index will use up only the specified RAM limit, the rest will reside on disk. + // // This will dictate how aggressively we compress the data vectors to store in memory. + // // Larger will yield better performance at search time. For an n point index, to use + // // b byte PQ compressed representation in memory, use `B = ((n * b) / 2^30 + (250000*(4*R + sizeof(T)*ndim)) / 2^30)`. + // // The second term in the summation is to allow some buffer for caching about 250,000 nodes from the graph in memory while serving. + // // If you are not sure about this term, add 0.25GB to the first term. + // #[serde(default = "DiskANNIndexingOptions::default_search_DRAM_budget")] + // pub search_DRAM_budget: u32, + + // // DRAM budget in GB for building the index + // // Limit on the memory allowed for building the index in GB. + // // If you specify a value less than what is required to build the index + // // in one pass, the index is built using a divide and conquer approach so + // // that sub-graphs will fit in the RAM budget. The sub-graphs are overlayed + // // to build the overall index. This approach can be upto 1.5 times slower than + // // building the index in one shot. Allocate as much memory as your RAM allows. + // #[serde(default = "DiskANNIndexingOptions::default_build_DRAM_budget")] + // pub build_DRAM_budget: u32, #[serde(default = "DiskANNIndexingOptions::default_num_threads")] pub num_threads: u32, @@ -51,8 +63,12 @@ pub struct DiskANNIndexingOptions { pub max_degree: u32, // L in the paper - #[serde(default = "DiskANNIndexingOptions::default_Lbuild")] - pub max_degree: u32, + #[serde(default = "DiskANNIndexingOptions::default_l_build")] + pub l_build: u32, + + // alpha in the paper, slack factor + #[serde(default = "DiskANNIndexingOptions::default_alpha")] + pub alpha: f32, // TODO: QD (quantized dimension) // TODO: codebook prefix @@ -72,19 +88,20 @@ pub struct DiskANNIndexingOptions { } impl DiskANNIndexingOptions { - fn default_index_path_prefix() -> String { - "DiskANN_index".to_string() - } - fn default_data_path() -> u32 { - "DiskANN_data".to_string() - } - fn default_search_DRAM_budget() -> u32 { - 1 - } - fn default_build_DRAM_budget() -> u32 { - 1 - } - fn default_num_threads() -> usize { + // fn default_index_path_prefix() -> PathBuf { + // "DiskANN_index".to_string().into() + // } + // fn default_data_path() -> PathBuf { + // "DiskANN_data".to_string().into() + // } + // fn default_search_DRAM_budget() -> u32 { + // 1 + // } + // fn default_build_DRAM_budget() -> u32 { + // 1 + // } + + fn default_num_threads() -> u32 { match std::thread::available_parallelism() { Ok(threads) => (threads.get() as f64).sqrt() as _, Err(_) => 1, @@ -93,43 +110,47 @@ impl DiskANNIndexingOptions { fn default_max_degree() -> u32 { 64 } - fn default_Lbuild() -> u32 { + fn default_l_build() -> u32 { 100 } + fn default_alpha() -> f32 { + 1.2 + } } impl Default for DiskANNIndexingOptions { fn default() -> Self { Self { - index_path_prefix: Self::default_index_path_prefix(), - data_path: Self::default_data_path(), - search_DRAM_budget: Self::default_search_DRAM_budget(), - build_DRAM_budget: Self::default_build_DRAM_budget(), + // index_path_prefix: Self::default_index_path_prefix(), + // data_path: Self::default_data_path(), + // search_DRAM_budget: Self::default_search_DRAM_budget(), + // build_DRAM_budget: Self::default_build_DRAM_budget(), num_threads: Self::default_num_threads(), max_degree: Self::default_max_degree(), - Lbuild: Self::default_Lbuild(), + l_build: Self::default_l_build(), + alpha: Self::default_alpha(), quantization: Default::default(), } } } pub struct DiskANNIndexing { - raw: Ivf, + raw: DiskANN, } -impl AbstractIndexing for IvfIndexing { +impl AbstractIndexing for DiskANNIndexing { fn create( path: PathBuf, options: IndexOptions, sealed: Vec>>, growing: Vec>>, ) -> Self { - let raw = Ivf::create(path, options, sealed, growing); + let raw = DiskANN::create(path, options, sealed, growing); Self { raw } } fn open(path: PathBuf, options: IndexOptions) -> Self { - let raw = Ivf::open(path, options); + let raw = DiskANN::open(path, options); Self { raw } } @@ -145,13 +166,21 @@ impl AbstractIndexing for IvfIndexing { self.raw.payload(i) } - fn search( + fn basic( &self, - k: usize, vector: &[S::Scalar], - gucs: SealedSearchGucs, - filter: &mut impl Filter, - ) -> Heap { - self.raw.search(k, vector, gucs.ivf_nprob, filter) + opts: &SearchOptions, + filter: impl Filter, + ) -> BinaryHeap> { + self.raw.search() + } + + fn vbase<'a>( + &'a self, + vector: &'a [S::Scalar], + opts: &'a SearchOptions, + filter: impl Filter + 'a, + ) -> (Vec, Box<(dyn Iterator + 'a)>) { + unimplemented!("DiskANN does not support vbase mode") } } diff --git a/crates/service/src/index/indexing/mod.rs b/crates/service/src/index/indexing/mod.rs index 274a79199..41667a910 100644 --- a/crates/service/src/index/indexing/mod.rs +++ b/crates/service/src/index/indexing/mod.rs @@ -188,6 +188,7 @@ impl DynamicIndexing { DynamicIndexing::Flat(x) => x.vbase(vector, opts, filter), DynamicIndexing::Ivf(x) => x.vbase(vector, opts, filter), DynamicIndexing::Hnsw(x) => x.vbase(vector, opts, filter), + DynamicIndexing::DiskANN(x) => x.vbase(vector, opts, filter), } } } diff --git a/crates/service/src/index/mod.rs b/crates/service/src/index/mod.rs index 0baad255f..c7f0b3562 100644 --- a/crates/service/src/index/mod.rs +++ b/crates/service/src/index/mod.rs @@ -68,6 +68,8 @@ pub struct SearchOptions { pub hnsw_ef_search: usize, #[validate(range(min = 1, max = 1_000_000))] pub ivf_nprobe: u32, + #[validate(range(min = 1, max = 65535))] + pub disk_ann_k: u32, } #[derive(Debug, Serialize, Deserialize)] From fc2d2b2494289da51231fe6584e08f05a2e09357 Mon Sep 17 00:00:00 2001 From: AveryQi115 Date: Mon, 15 Jan 2024 15:48:31 -0500 Subject: [PATCH 3/5] fix and format Signed-off-by: AveryQi115 --- crates/service/src/algorithms/diskann.rs | 86 ++++++++++---------- crates/service/src/algorithms/mod.rs | 2 +- crates/service/src/index/indexing/diskann.rs | 21 +++-- crates/service/src/index/indexing/mod.rs | 4 +- 4 files changed, 56 insertions(+), 57 deletions(-) diff --git a/crates/service/src/algorithms/diskann.rs b/crates/service/src/algorithms/diskann.rs index 8d6af325f..6766db0f7 100644 --- a/crates/service/src/algorithms/diskann.rs +++ b/crates/service/src/algorithms/diskann.rs @@ -4,17 +4,17 @@ use crate::index::segments::sealed::SealedSegment; use crate::index::{IndexOptions, SearchOptions, VectorOptions}; use crate::prelude::*; use crate::utils::dir_ops::sync_dir; -use crate::utils::mmap_array::MmapArray; use crate::utils::element_heap::ElementHeap; -use rayon::prelude::{IntoParallelIterator, ParallelIterator}; +use crate::utils::mmap_array::MmapArray; +use parking_lot::{RwLock, RwLockWriteGuard}; use rand::distributions::Uniform; +use rand::prelude::SliceRandom; use rand::Rng; +use rayon::prelude::{IntoParallelIterator, ParallelIterator}; use std::cmp::Reverse; use std::collections::BinaryHeap; -use std::fs::create_dir; use std::collections::{BTreeMap, HashSet}; -use rand::prelude::SliceRandom; -use parking_lot::{RwLock, RwLockWriteGuard}; +use std::fs::create_dir; use std::path::PathBuf; use std::sync::Arc; @@ -157,12 +157,12 @@ impl SearchState { } } -struct VertexNeighbor{ +struct VertexNeighbor { neighbors: Vec, } // DiskANNRam is for constructing the index -// it stores the intermediate structure when constructing +// it stores the intermediate structure when constructing // the index and these data are stored in memory pub struct DiskANNRam { raw: Arc>, @@ -186,7 +186,7 @@ pub struct DiskANNMmap { l: u32, } -impl DiskANNRam{ +impl DiskANNRam { fn _init_graph(&self, n: u32, mut rng: impl Rng) { let distribution = Uniform::new(0, n); for i in 0..n { @@ -206,11 +206,7 @@ impl DiskANNRam{ } } - fn _set_neighbors( - &self, - vertex_index: u32, - neighbor_ids: &HashSet, - ) { + fn _set_neighbors(&self, vertex_index: u32, neighbor_ids: &HashSet) { assert!(neighbor_ids.len() <= self.max_degree as usize); assert!((vertex_index as usize) < self.vertexs.len()); @@ -224,7 +220,7 @@ impl DiskANNRam{ fn _set_neighbors_with_write_guard( &self, neighbor_ids: &HashSet, - guard: &RwLockWriteGuard, + guard: &mut RwLockWriteGuard, ) { assert!(neighbor_ids.len() <= self.max_degree as usize); (*guard).neighbors.clear(); @@ -233,12 +229,11 @@ impl DiskANNRam{ } } - fn _get_neighbors( - &self, - vertex_index: u32, - ) -> VertexNeighbor { + fn _get_neighbors(&self, vertex_index: u32) -> VertexNeighbor { let vertex = self.vertexs[vertex_index as usize].read(); - *vertex + VertexNeighbor { + neighbors: vertex.neighbors.clone(), + } } fn _find_medoid(&self, n: u32) -> u32 { @@ -290,7 +285,7 @@ impl DiskANNRam{ let mut new_neighbor_ids: HashSet = HashSet::new(); { let mut guard = self.vertexs[id as usize].write(); - let neighbor_ids : Vec = (*guard).neighbors; + let neighbor_ids = &(*guard).neighbors; state.visited.extend(neighbor_ids.iter().map(|x| *x)); let neighbor_ids = self._robust_prune(id, state.visited, alpha, r); let neighbor_ids: HashSet = neighbor_ids.into_iter().collect(); @@ -301,9 +296,8 @@ impl DiskANNRam{ for &neighbor_id in new_neighbor_ids.iter() { { let mut guard = self.vertexs[neighbor_id as usize].write(); - let old_neighbors : Vec = (*guard).neighbors; - let mut old_neighbors: HashSet = - old_neighbors.iter().map(|x| *x).collect(); + let old_neighbors = &(*guard).neighbors; + let mut old_neighbors: HashSet = old_neighbors.iter().map(|x| *x).collect(); old_neighbors.insert(id); if old_neighbors.len() > r as usize { // need robust prune @@ -395,8 +389,8 @@ impl DiskANNRam{ } } -impl DiskANNMmap{ - fn _get_neighbors(self, id: u32) -> Vec{ +impl DiskANNMmap { + fn _get_neighbors(&self, id: u32) -> Vec { let start = self.neighbor_offset[id as usize]; let end = self.neighbor_offset[id as usize + 1]; self.neighbors[start..end].to_vec() @@ -434,9 +428,13 @@ pub fn make( let r = idx_opts.max_degree; let VectorOptions { dims, .. } = options.vector; - let vertexs: Vec> = (0..n).map(|_| { - RwLock::new(VertexNeighbor { neighbors: Vec::new() }) - }).collect(); + let vertexs: Vec> = (0..n) + .map(|_| { + RwLock::new(VertexNeighbor { + neighbors: Vec::new(), + }) + }) + .collect(); let medoid = 0; @@ -461,25 +459,27 @@ pub fn make( new_vamana._one_pass(n, 1.0, r, idx_opts.l_build, rng.clone()); new_vamana._one_pass(n, idx_opts.alpha, r, idx_opts.max_degree, rng.clone()); - + new_vamana } pub fn save(ram: DiskANNRam, path: PathBuf) -> DiskANNMmap { - - let neighbors_iter = ram.vertexs.iter() - .flat_map(|vertex| { - let vertex = vertex.read(); - vertex.neighbors.iter().cloned() - }); + let neighbors_iter = ram.vertexs.iter().flat_map(|vertex| { + let vertex = vertex.read(); + vertex + .neighbors + .iter() + .copied() + .collect::>() + .into_iter() + }); // Create the neighbors array using MmapArray::create. let neighbors = MmapArray::create(path.join("neighbors"), neighbors_iter); // Create an iterator for the size of each neighbor list. - let neighbor_offset_iter = { - let iter = ram.vertexs.iter() - .map(|vertex| { + let neighbor_offset_iter = { + let iter = ram.vertexs.iter().map(|vertex| { let vertex = vertex.read(); vertex.neighbors.len() }); @@ -492,14 +492,14 @@ pub fn save(ram: DiskANNRam, path: PathBuf) -> DiskANNMmap { let medoid_vec = vec![ram.medoid]; let medoid = MmapArray::create(path.join("medoid"), medoid_vec.into_iter()); - DiskANNMmap{ + DiskANNMmap { raw: ram.raw, neighbors, neighbor_offset, medoid, r: ram.max_degree, alpha: ram.alpha, - l: ram.l_build + l: ram.l_build, } } @@ -510,15 +510,15 @@ pub fn load(path: PathBuf, options: IndexOptions) -> DiskANNMmap { let neighbor_offset = MmapArray::open(path.join("neighbor_offset")); let medoid = MmapArray::open(path.join("medoid")); assert!(medoid.len() == 1); - - DiskANNMmap{ + + DiskANNMmap { raw, neighbors, neighbor_offset, medoid, r: idx_opts.max_degree, alpha: idx_opts.alpha, - l: idx_opts.l_build + l: idx_opts.l_build, } } diff --git a/crates/service/src/algorithms/mod.rs b/crates/service/src/algorithms/mod.rs index 215435c99..9c20f5655 100644 --- a/crates/service/src/algorithms/mod.rs +++ b/crates/service/src/algorithms/mod.rs @@ -1,7 +1,7 @@ pub mod clustering; +pub mod diskann; pub mod flat; pub mod hnsw; pub mod ivf; pub mod quantization; pub mod raw; -pub mod diskann; \ No newline at end of file diff --git a/crates/service/src/index/indexing/diskann.rs b/crates/service/src/index/indexing/diskann.rs index 74a6ef3bd..273260b44 100644 --- a/crates/service/src/index/indexing/diskann.rs +++ b/crates/service/src/index/indexing/diskann.rs @@ -7,11 +7,11 @@ use crate::index::IndexOptions; use crate::index::SearchOptions; use crate::prelude::*; use serde::{Deserialize, Serialize}; +use std::cmp::Reverse; +use std::collections::BinaryHeap; use std::path::PathBuf; use std::sync::Arc; use validator::Validate; -use std::cmp::Reverse; -use std::collections::BinaryHeap; #[derive(Debug, Clone, Serialize, Deserialize, Validate)] #[serde(deny_unknown_fields)] @@ -21,8 +21,8 @@ pub struct DiskANNIndexingOptions { // memory and the given memory to decide the quantization options. // // Current design is to let users define the ratio of PQ for in memory index. - // - // Besides, it is hard to estimate current memory usage as sealed segment and + // + // Besides, it is hard to estimate current memory usage as sealed segment and // growing segement are passed to RawMmap and RawRam. Different from the direct // calculation of the vector layout. @@ -32,16 +32,16 @@ pub struct DiskANNIndexingOptions { // #[serde(default = "DiskANNIndexingOptions::default_data_path")] // pub data_path: PathBuf, - // // DRAM budget in GB for searching the index to set the compressed level + // // DRAM budget in GB for searching the index to set the compressed level // // for data while search happens - + // // bound on the memory footprint of the index at search time in GB. Once built, // // the index will use up only the specified RAM limit, the rest will reside on disk. // // This will dictate how aggressively we compress the data vectors to store in memory. - // // Larger will yield better performance at search time. For an n point index, to use + // // Larger will yield better performance at search time. For an n point index, to use // // b byte PQ compressed representation in memory, use `B = ((n * b) / 2^30 + (250000*(4*R + sizeof(T)*ndim)) / 2^30)`. // // The second term in the summation is to allow some buffer for caching about 250,000 nodes from the graph in memory while serving. - // // If you are not sure about this term, add 0.25GB to the first term. + // // If you are not sure about this term, add 0.25GB to the first term. // #[serde(default = "DiskANNIndexingOptions::default_search_DRAM_budget")] // pub search_DRAM_budget: u32, @@ -54,7 +54,6 @@ pub struct DiskANNIndexingOptions { // // building the index in one shot. Allocate as much memory as your RAM allows. // #[serde(default = "DiskANNIndexingOptions::default_build_DRAM_budget")] // pub build_DRAM_budget: u32, - #[serde(default = "DiskANNIndexingOptions::default_num_threads")] pub num_threads: u32, @@ -81,7 +80,6 @@ pub struct DiskANNIndexingOptions { // TODO: filtered Lbuild (for filtered diskANN) // TODO: filter threshold (for filtered diskANN) // TODO: label type (for filtered diskANN) - #[serde(default)] #[validate] pub quantization: QuantizationOptions, @@ -172,9 +170,10 @@ impl AbstractIndexing for DiskANNIndexing { opts: &SearchOptions, filter: impl Filter, ) -> BinaryHeap> { - self.raw.search() + self.raw.basic(vector, opts, filter) } + #[allow(unused_variables)] fn vbase<'a>( &'a self, vector: &'a [S::Scalar], diff --git a/crates/service/src/index/indexing/mod.rs b/crates/service/src/index/indexing/mod.rs index 41667a910..e3b1e3de5 100644 --- a/crates/service/src/index/indexing/mod.rs +++ b/crates/service/src/index/indexing/mod.rs @@ -1,12 +1,12 @@ +pub mod diskann; pub mod flat; pub mod hnsw; pub mod ivf; -pub mod diskann; +use self::diskann::{DiskANNIndexing, DiskANNIndexingOptions}; use self::flat::{FlatIndexing, FlatIndexingOptions}; use self::hnsw::{HnswIndexing, HnswIndexingOptions}; use self::ivf::{IvfIndexing, IvfIndexingOptions}; -use self::diskann::{DiskANNIndexing, DiskANNIndexingOptions}; use super::segments::growing::GrowingSegment; use super::segments::sealed::SealedSegment; use super::IndexOptions; From fb499e407e00658b12fdb605b4151484a5c47658 Mon Sep 17 00:00:00 2001 From: AveryQi115 Date: Mon, 15 Jan 2024 15:55:21 -0500 Subject: [PATCH 4/5] fix warning Signed-off-by: AveryQi115 --- crates/service/src/algorithms/diskann.rs | 27 ++++++------------------ 1 file changed, 6 insertions(+), 21 deletions(-) diff --git a/crates/service/src/algorithms/diskann.rs b/crates/service/src/algorithms/diskann.rs index 6766db0f7..d8941226f 100644 --- a/crates/service/src/algorithms/diskann.rs +++ b/crates/service/src/algorithms/diskann.rs @@ -102,18 +102,16 @@ pub struct SearchState { heap: BinaryHeap>, heap_visited: HashSet, l: usize, - k: usize, } impl SearchState { /// Creates a new search state. - pub(crate) fn new(k: usize, l: usize) -> Self { + pub(crate) fn new(l: usize) -> Self { Self { visited: HashSet::new(), candidates: BTreeMap::new(), heap: BinaryHeap::new(), heap_visited: HashSet::new(), - k, l, } } @@ -172,7 +170,6 @@ pub struct DiskANNRam { medoid: u32, dims: u16, max_degree: u32, - alpha: f32, l_build: u32, } @@ -181,8 +178,6 @@ pub struct DiskANNMmap { neighbors: MmapArray, neighbor_offset: MmapArray, medoid: MmapArray, - r: u32, - alpha: f32, l: u32, } @@ -278,9 +273,10 @@ impl DiskANNRam { .for_each(|id| self.search_and_prune_for_one_vertex(id, alpha, r, l)); } + #[allow(unused_assignments)] fn search_and_prune_for_one_vertex(&self, id: u32, alpha: f32, r: u32, l: u32) { let query = self.raw.vector(id); - let mut state = self._greedy_search(self.medoid, query, 1, l as usize); + let mut state = self._greedy_search(self.medoid, query, l as usize); state.visited.remove(&id); // in case visited has id itself let mut new_neighbor_ids: HashSet = HashSet::new(); { @@ -311,14 +307,8 @@ impl DiskANNRam { } } - fn _greedy_search( - &self, - start: u32, - query: &[S::Scalar], - k: usize, - search_size: usize, - ) -> SearchState { - let mut state = SearchState::new(k, search_size); + fn _greedy_search(&self, start: u32, query: &[S::Scalar], search_size: usize) -> SearchState { + let mut state = SearchState::new(search_size); let dist = S::distance(query, self.raw.vector(start)); state.push(start, dist); @@ -444,7 +434,6 @@ pub fn make( medoid, dims, max_degree: idx_opts.max_degree, - alpha: idx_opts.alpha, l_build: idx_opts.l_build, }; @@ -497,8 +486,6 @@ pub fn save(ram: DiskANNRam, path: PathBuf) -> DiskANNMmap { neighbors, neighbor_offset, medoid, - r: ram.max_degree, - alpha: ram.alpha, l: ram.l_build, } } @@ -516,8 +503,6 @@ pub fn load(path: PathBuf, options: IndexOptions) -> DiskANNMmap { neighbors, neighbor_offset, medoid, - r: idx_opts.max_degree, - alpha: idx_opts.alpha, l: idx_opts.l_build, } } @@ -528,7 +513,7 @@ pub fn basic( k: u32, mut filter: impl Filter, ) -> BinaryHeap> { - let mut state = SearchState::new(k as usize, mmap.l as usize); + let mut state = SearchState::new(mmap.l as usize); let start = mmap.medoid[0]; let dist = S::distance(vector, mmap.raw.vector(start)); From 54ba5876c06a9ea03e13047029296cffb120f14b Mon Sep 17 00:00:00 2001 From: AveryQi115 Date: Mon, 15 Jan 2024 16:12:57 -0500 Subject: [PATCH 5/5] typo fix Signed-off-by: AveryQi115 --- crates/service/src/algorithms/diskann.rs | 21 ++++++++------------ crates/service/src/index/indexing/diskann.rs | 4 ++-- 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/crates/service/src/algorithms/diskann.rs b/crates/service/src/algorithms/diskann.rs index d8941226f..973878430 100644 --- a/crates/service/src/algorithms/diskann.rs +++ b/crates/service/src/algorithms/diskann.rs @@ -218,9 +218,9 @@ impl DiskANNRam { guard: &mut RwLockWriteGuard, ) { assert!(neighbor_ids.len() <= self.max_degree as usize); - (*guard).neighbors.clear(); + guard.neighbors.clear(); for item in neighbor_ids.iter() { - (*guard).neighbors.push(*item); + guard.neighbors.push(*item); } } @@ -281,8 +281,8 @@ impl DiskANNRam { let mut new_neighbor_ids: HashSet = HashSet::new(); { let mut guard = self.vertexs[id as usize].write(); - let neighbor_ids = &(*guard).neighbors; - state.visited.extend(neighbor_ids.iter().map(|x| *x)); + let neighbor_ids = &guard.neighbors; + state.visited.extend(neighbor_ids.iter().copied()); let neighbor_ids = self._robust_prune(id, state.visited, alpha, r); let neighbor_ids: HashSet = neighbor_ids.into_iter().collect(); self._set_neighbors_with_write_guard(&neighbor_ids, &mut guard); @@ -292,8 +292,8 @@ impl DiskANNRam { for &neighbor_id in new_neighbor_ids.iter() { { let mut guard = self.vertexs[neighbor_id as usize].write(); - let old_neighbors = &(*guard).neighbors; - let mut old_neighbors: HashSet = old_neighbors.iter().map(|x| *x).collect(); + let old_neighbors = &guard.neighbors; + let mut old_neighbors: HashSet = old_neighbors.iter().copied().collect(); old_neighbors.insert(id); if old_neighbors.len() > r as usize { // need robust prune @@ -455,12 +455,7 @@ pub fn make( pub fn save(ram: DiskANNRam, path: PathBuf) -> DiskANNMmap { let neighbors_iter = ram.vertexs.iter().flat_map(|vertex| { let vertex = vertex.read(); - vertex - .neighbors - .iter() - .copied() - .collect::>() - .into_iter() + vertex.neighbors.to_vec().into_iter() }); // Create the neighbors array using MmapArray::create. @@ -541,7 +536,7 @@ pub fn basic( let mut results = ElementHeap::new(k as usize); for (distance, id) in state.candidates { results.push(Element { - distance: distance, + distance, payload: mmap.raw.payload(id), }); } diff --git a/crates/service/src/index/indexing/diskann.rs b/crates/service/src/index/indexing/diskann.rs index 273260b44..15651c82a 100644 --- a/crates/service/src/index/indexing/diskann.rs +++ b/crates/service/src/index/indexing/diskann.rs @@ -23,7 +23,7 @@ pub struct DiskANNIndexingOptions { // Current design is to let users define the ratio of PQ for in memory index. // // Besides, it is hard to estimate current memory usage as sealed segment and - // growing segement are passed to RawMmap and RawRam. Different from the direct + // growing segments are passed to RawMmap and RawRam. Different from the direct // calculation of the vector layout. // #[serde(default = "DiskANNIndexingOptions::default_index_path_prefix")] @@ -49,7 +49,7 @@ pub struct DiskANNIndexingOptions { // // Limit on the memory allowed for building the index in GB. // // If you specify a value less than what is required to build the index // // in one pass, the index is built using a divide and conquer approach so - // // that sub-graphs will fit in the RAM budget. The sub-graphs are overlayed + // // that sub-graphs will fit in the RAM budget. The sub-graphs are overlaid // // to build the overall index. This approach can be upto 1.5 times slower than // // building the index in one shot. Allocate as much memory as your RAM allows. // #[serde(default = "DiskANNIndexingOptions::default_build_DRAM_budget")]