From 9f76adf10ccd05a71575e139a510637492894f54 Mon Sep 17 00:00:00 2001 From: Luiz Irber Date: Wed, 6 Apr 2022 07:45:05 -0700 Subject: [PATCH 01/19] init --- src/core/src/index/revindex.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/core/src/index/revindex.rs b/src/core/src/index/revindex.rs index c39fe79482..001165d8d1 100644 --- a/src/core/src/index/revindex.rs +++ b/src/core/src/index/revindex.rs @@ -15,6 +15,7 @@ use crate::index::Index; use crate::signature::{Signature, SigsTrait}; use crate::sketch::minhash::KmerMinHash; use crate::sketch::Sketch; +use crate::storage::ZipStorage; use crate::Error; use crate::HashIntoType; @@ -108,8 +109,9 @@ pub struct RevIndex { template: Sketch, colors: Colors, - //#[serde(skip)] - //storage: Option, + + #[serde(skip)] + storage: Option, } impl RevIndex { @@ -231,7 +233,7 @@ impl RevIndex { ref_sigs, template: template.clone(), colors, - // storage: Some(InnerStorage::new(MemStorage::default())), + storage: None, } } @@ -297,7 +299,7 @@ impl RevIndex { ref_sigs: search_sigs.into(), template: template.clone(), colors, - //storage: None, + storage: None, } } From 806d5cba3851f89936ff6c36ccfc83d27f83c2e8 Mon Sep 17 00:00:00 2001 From: Luiz Irber Date: Sun, 10 Apr 2022 08:57:17 -0700 Subject: [PATCH 02/19] manifests and loading revindex from zipstorage --- Cargo.lock | 1 + src/core/Cargo.toml | 1 + src/core/src/errors.rs | 5 ++ src/core/src/index/revindex.rs | 132 ++++++++++++++++++++++++++++++++- src/core/src/lib.rs | 1 + src/core/src/manifest.rs | 46 ++++++++++++ 6 files changed, 183 insertions(+), 3 deletions(-) create mode 100644 src/core/src/manifest.rs diff --git a/Cargo.lock b/Cargo.lock index e094cd4bbf..bfb92b1c24 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1026,6 +1026,7 @@ dependencies = [ "cfg-if", "counter", "criterion", + "csv", "finch", "fixedbitset", "getrandom", diff --git a/src/core/Cargo.toml b/src/core/Cargo.toml index 09bb7357d6..7e2a7d9ef4 100644 --- a/src/core/Cargo.toml +++ b/src/core/Cargo.toml @@ -48,6 +48,7 @@ vec-collections = "0.3.4" piz = "0.4.0" memmap2 = "0.5.0" ouroboros = "0.15.0" +csv = "1.1.6" [dev-dependencies] assert_matches = "1.3.0" diff --git a/src/core/src/errors.rs b/src/core/src/errors.rs index e09b3a1701..ec66dcf9b5 100644 --- a/src/core/src/errors.rs +++ b/src/core/src/errors.rs @@ -60,6 +60,9 @@ pub enum SourmashError { #[error(transparent)] IOError(#[from] std::io::Error), + #[error(transparent)] + CsvError(#[from] csv::Error), + #[cfg(not(all(target_arch = "wasm32", target_vendor = "unknown")))] #[error(transparent)] Panic(#[from] crate::ffi::utils::Panic), @@ -104,6 +107,7 @@ pub enum SourmashErrorCode { ParseInt = 100_003, SerdeError = 100_004, NifflerError = 100_005, + CsvError = 100_006, } #[cfg(not(all(target_arch = "wasm32", target_vendor = "unknown")))] @@ -130,6 +134,7 @@ impl SourmashErrorCode { SourmashError::IOError { .. } => SourmashErrorCode::Io, SourmashError::NifflerError { .. } => SourmashErrorCode::NifflerError, SourmashError::Utf8Error { .. } => SourmashErrorCode::Utf8Error, + SourmashError::CsvError { .. } => SourmashErrorCode::CsvError, } } } diff --git a/src/core/src/index/revindex.rs b/src/core/src/index/revindex.rs index 001165d8d1..355020dd29 100644 --- a/src/core/src/index/revindex.rs +++ b/src/core/src/index/revindex.rs @@ -12,10 +12,11 @@ use rayon::prelude::*; use crate::encodings::{Color, Colors, Idx}; use crate::index::Index; +use crate::manifest::Manifest; use crate::signature::{Signature, SigsTrait}; use crate::sketch::minhash::KmerMinHash; use crate::sketch::Sketch; -use crate::storage::ZipStorage; +use crate::storage::{Storage, ZipStorage}; use crate::Error; use crate::HashIntoType; @@ -237,6 +238,107 @@ impl RevIndex { } } + pub fn from_zipstorage( + storage: ZipStorage, + template: &Sketch, + threshold: usize, + queries: Option<&[KmerMinHash]>, + keep_sigs: bool, + ) -> Result { + // If threshold is zero, let's merge all queries and save time later + let merged_query = queries.and_then(|qs| Self::merge_queries(qs, threshold)); + + let processed_sigs = AtomicUsize::new(0); + + // Load manifest from zipstorage + let manifest = Manifest::from_reader(storage.load("SOURMASH-MANIFEST.csv")?.as_slice())?; + let search_sigs: Vec<_> = manifest + .internal_locations() + .map(|l| PathBuf::from(l)) + .collect(); + + #[cfg(feature = "parallel")] + let sig_iter = search_sigs.par_iter(); + + #[cfg(not(feature = "parallel"))] + let sig_iter = search_sigs.iter(); + + let filtered_sigs = sig_iter.enumerate().filter_map(|(dataset_id, filename)| { + let i = processed_sigs.fetch_add(1, Ordering::SeqCst); + if i % 1000 == 0 { + info!("Processed {} reference sigs", i); + } + + let sig_data = storage + .load( + filename + .to_str() + .expect(format!("error converting path {:?}", filename).as_str()), + ) + .expect(format!("error loading {:?}", filename).as_str()); + let search_sig = Signature::from_reader(sig_data.as_slice()) + .unwrap_or_else(|_| panic!("Error processing {:?}", filename)) + .swap_remove(0); + + RevIndex::map_hashes_colors( + dataset_id, + &search_sig, + queries, + &merged_query, + threshold, + template, + ) + }); + + #[cfg(feature = "parallel")] + let (hash_to_color, colors) = filtered_sigs.reduce( + || (HashToColor::new(), Colors::default()), + HashToColor::reduce_hashes_colors, + ); + + #[cfg(not(feature = "parallel"))] + let (hash_to_color, colors) = filtered_sigs.fold( + (HashToColor::new(), Colors::default()), + HashToColor::reduce_hashes_colors, + ); + + // TODO: build this together with hash_to_idx? + let ref_sigs = if keep_sigs { + #[cfg(feature = "parallel")] + let sigs_iter = search_sigs.par_iter(); + + #[cfg(not(feature = "parallel"))] + let sigs_iter = search_sigs.iter(); + + Some( + sigs_iter + .map(|ref_path| { + let sig_data = + storage + .load(ref_path.to_str().expect( + format!("error converting path {:?}", ref_path).as_str(), + )) + .expect(format!("error loading {:?}", ref_path).as_str()); + Signature::from_reader(sig_data.as_slice()) + .unwrap_or_else(|_| panic!("Error processing {:?}", ref_path)) + .swap_remove(0) + }) + .collect(), + ) + } else { + None + }; + + Ok(RevIndex { + hash_to_color, + sig_files: search_sigs.into(), + ref_sigs, + template: template.clone(), + colors, + storage: Some(storage), + }) + } + fn merge_queries(qs: &[KmerMinHash], threshold: usize) -> Option { if threshold == 0 { let mut merged = qs[0].clone(); @@ -395,8 +497,20 @@ impl RevIndex { let match_sig = if let Some(refsigs) = &self.ref_sigs { &refsigs[dataset_id as usize] } else { + let mut sig = if let Some(storage) = &self.storage { + let sig_data = storage + .load( + match_path + .to_str() + .expect(format!("error converting path {:?}", match_path).as_str()), + ) + .expect(format!("error loading {:?}", match_path).as_str()); + Signature::from_reader(sig_data.as_slice())? + } else { + Signature::from_path(&match_path)? + }; // TODO: remove swap_remove - ref_match = Signature::from_path(&match_path)?.swap_remove(0); + ref_match = sig.swap_remove(0); &ref_match }; @@ -540,8 +654,20 @@ impl RevIndex { let match_sig = if let Some(refsigs) = &self.ref_sigs { &refsigs[dataset_id as usize] } else { + let mut sig = if let Some(storage) = &self.storage { + let sig_data = storage + .load( + match_path + .to_str() + .expect(format!("error converting path {:?}", match_path).as_str()), + ) + .expect(format!("error loading {:?}", match_path).as_str()); + Signature::from_reader(sig_data.as_slice())? + } else { + Signature::from_path(&match_path)? + }; // TODO: remove swap_remove - ref_match = Signature::from_path(&match_path)?.swap_remove(0); + ref_match = sig.swap_remove(0); &ref_match }; diff --git a/src/core/src/lib.rs b/src/core/src/lib.rs index 14a25ab632..889558536a 100644 --- a/src/core/src/lib.rs +++ b/src/core/src/lib.rs @@ -26,6 +26,7 @@ pub mod prelude; pub mod cmd; +pub mod manifest; pub mod signature; pub mod sketch; pub mod storage; diff --git a/src/core/src/manifest.rs b/src/core/src/manifest.rs new file mode 100644 index 0000000000..41b4abe9f8 --- /dev/null +++ b/src/core/src/manifest.rs @@ -0,0 +1,46 @@ +use std::io::Read; + +use serde::Deserialize; + +use crate::Error; + +#[derive(Debug, Deserialize)] +struct Record { + internal_location: String, + /* + md5: String, + md5short: String, + ksize: String, + moltype: String, + num: String, + scaled: String, + n_hashes: String, + with_abundance: String, + name: String, + filename: String, + */ +} + +#[derive(Debug)] +pub struct Manifest { + records: Vec, +} + +impl Manifest { + pub fn from_reader(rdr: R) -> Result { + let mut records = vec![]; + + let mut rdr = csv::ReaderBuilder::new() + .comment(Some(b'#')) + .from_reader(rdr); + for result in rdr.deserialize() { + let record: Record = result?; + records.push(record); + } + Ok(Manifest { records }) + } + + pub fn internal_locations(&self) -> impl Iterator { + self.records.iter().map(|r| r.internal_location.as_str()) + } +} From cd7ca295d1c815279413505950d860a164c0c2b8 Mon Sep 17 00:00:00 2001 From: Luiz Irber Date: Sun, 10 Apr 2022 13:23:28 -0700 Subject: [PATCH 03/19] start spliting revindex --- src/core/src/ffi/index/revindex.rs | 1 + src/core/src/index/revindex.rs | 94 +++++++++++++++++++----------- src/core/src/manifest.rs | 40 +++++++++++-- 3 files changed, 94 insertions(+), 41 deletions(-) diff --git a/src/core/src/ffi/index/revindex.rs b/src/core/src/ffi/index/revindex.rs index 3597121bce..0dd852392e 100644 --- a/src/core/src/ffi/index/revindex.rs +++ b/src/core/src/ffi/index/revindex.rs @@ -58,6 +58,7 @@ unsafe fn revindex_new_with_paths( .collect(); Some(queries_vec.as_ref()) }; + let revindex = RevIndex::new( search_sigs.as_ref(), &template, diff --git a/src/core/src/index/revindex.rs b/src/core/src/index/revindex.rs index 355020dd29..87b4000623 100644 --- a/src/core/src/index/revindex.rs +++ b/src/core/src/index/revindex.rs @@ -101,20 +101,30 @@ impl HashToColor { // https://davidkoloski.me/rkyv/ #[derive(Serialize, Deserialize)] pub struct RevIndex { + linear: LinearRevIndex, hash_to_color: HashToColor, + colors: Colors, +} - sig_files: Vec, +#[derive(Serialize, Deserialize)] +pub struct LinearRevIndex { + sig_files: Manifest, #[serde(skip)] ref_sigs: Option>, template: Sketch, - colors: Colors, #[serde(skip)] storage: Option, } +impl From for RevIndex { + fn from(v: LinearRevIndex) -> Self { + unimplemented!() + } +} + impl RevIndex { pub fn load>( index_path: P, @@ -228,13 +238,17 @@ impl RevIndex { None }; - RevIndex { - hash_to_color, + let linear = LinearRevIndex { sig_files: search_sigs.into(), - ref_sigs, template: template.clone(), - colors, + ref_sigs, storage: None, + }; + + RevIndex { + hash_to_color, + colors, + linear, } } @@ -329,13 +343,17 @@ impl RevIndex { None }; + let linear = LinearRevIndex { + sig_files: search_sigs.as_slice().into(), + template: template.clone(), + ref_sigs, + storage: Some(storage), + }; + Ok(RevIndex { hash_to_color, - sig_files: search_sigs.into(), - ref_sigs, - template: template.clone(), colors, - storage: Some(storage), + linear, }) } @@ -395,13 +413,17 @@ impl RevIndex { HashToColor::reduce_hashes_colors, ); + let linear = LinearRevIndex { + sig_files: Default::default(), + template: template.clone(), + ref_sigs: search_sigs.into(), + storage: None, + }; + RevIndex { hash_to_color, - sig_files: vec![], - ref_sigs: search_sigs.into(), - template: template.clone(), colors, - storage: None, + linear, } } @@ -464,7 +486,13 @@ impl RevIndex { for (dataset_id, size) in counter.most_common() { if size >= threshold { - matches.push(self.sig_files[dataset_id as usize].to_str().unwrap().into()); + matches.push( + self.linear.sig_files[dataset_id as usize] + .internal_location() + .to_str() + .unwrap() + .into(), + ); } else { break; }; @@ -485,19 +513,17 @@ impl RevIndex { let (dataset_id, size) = counter.most_common()[0]; match_size = if size >= threshold { size } else { break }; - let p; - let match_path = if self.sig_files.is_empty() { - p = PathBuf::new(); // TODO: Fix somehow? - &p + let match_path = if self.linear.sig_files.is_empty() { + PathBuf::new() } else { - &self.sig_files[dataset_id as usize] + self.linear.sig_files[dataset_id as usize].internal_location() }; let ref_match; - let match_sig = if let Some(refsigs) = &self.ref_sigs { + let match_sig = if let Some(refsigs) = &self.linear.ref_sigs { &refsigs[dataset_id as usize] } else { - let mut sig = if let Some(storage) = &self.storage { + let mut sig = if let Some(storage) = &self.linear.storage { let sig_data = storage .load( match_path @@ -515,7 +541,7 @@ impl RevIndex { }; let mut match_mh = None; - if let Some(Sketch::MinHash(mh)) = match_sig.select_sketch(&self.template) { + if let Some(Sketch::MinHash(mh)) = match_sig.select_sketch(&self.linear.template) { match_mh = Some(mh); } let match_mh = match_mh.expect("Couldn't find a compatible MinHash"); @@ -591,7 +617,7 @@ impl RevIndex { } pub fn template(&self) -> Sketch { - self.template.clone() + self.linear.template.clone() } // TODO: mh should be a sketch, or even a sig... @@ -642,19 +668,17 @@ impl RevIndex { for (dataset_id, size) in counter.most_common() { let match_size = if size >= threshold { size } else { break }; - let p; - let match_path = if self.sig_files.is_empty() { - p = PathBuf::new(); // TODO: Fix somehow? - &p + let match_path = if self.linear.sig_files.is_empty() { + PathBuf::new() } else { - &self.sig_files[dataset_id as usize] + self.linear.sig_files[dataset_id as usize].internal_location() }; let ref_match; - let match_sig = if let Some(refsigs) = &self.ref_sigs { + let match_sig = if let Some(refsigs) = &self.linear.ref_sigs { &refsigs[dataset_id as usize] } else { - let mut sig = if let Some(storage) = &self.storage { + let mut sig = if let Some(storage) = &self.linear.storage { let sig_data = storage .load( match_path @@ -672,7 +696,7 @@ impl RevIndex { }; let mut match_mh = None; - if let Some(Sketch::MinHash(mh)) = match_sig.select_sketch(&self.template) { + if let Some(Sketch::MinHash(mh)) = match_sig.select_sketch(&self.linear.template) { match_mh = Some(mh); } let match_mh = match_mh.unwrap(); @@ -749,15 +773,15 @@ impl<'a> Index<'a> for RevIndex { } fn len(&self) -> usize { - if let Some(refs) = &self.ref_sigs { + if let Some(refs) = &self.linear.ref_sigs { refs.len() } else { - self.sig_files.len() + self.linear.sig_files.len() } } fn signatures(&self) -> Vec { - if let Some(ref sigs) = self.ref_sigs { + if let Some(ref sigs) = self.linear.ref_sigs { sigs.to_vec() } else { unimplemented!() diff --git a/src/core/src/manifest.rs b/src/core/src/manifest.rs index 41b4abe9f8..f0ba45955b 100644 --- a/src/core/src/manifest.rs +++ b/src/core/src/manifest.rs @@ -1,11 +1,13 @@ use std::io::Read; +use std::ops::Deref; +use std::path::PathBuf; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use crate::Error; -#[derive(Debug, Deserialize)] -struct Record { +#[derive(Debug, Serialize, Deserialize)] +pub struct Record { internal_location: String, /* md5: String, @@ -13,19 +15,24 @@ struct Record { ksize: String, moltype: String, num: String, - scaled: String, - n_hashes: String, + scaled: String, n_hashes: String, with_abundance: String, name: String, filename: String, */ } -#[derive(Debug)] +#[derive(Debug, Default, Serialize, Deserialize)] pub struct Manifest { records: Vec, } +impl Record { + pub fn internal_location(&self) -> PathBuf { + self.internal_location.clone().into() + } +} + impl Manifest { pub fn from_reader(rdr: R) -> Result { let mut records = vec![]; @@ -44,3 +51,24 @@ impl Manifest { self.records.iter().map(|r| r.internal_location.as_str()) } } + +impl From<&[PathBuf]> for Manifest { + fn from(v: &[PathBuf]) -> Self { + Manifest { + records: v + .into_iter() + .map(|p| Record { + internal_location: p.to_str().unwrap().into(), + }) + .collect(), + } + } +} + +impl Deref for Manifest { + type Target = Vec; + + fn deref(&self) -> &Self::Target { + &self.records + } +} From 22ff25980f4fab865285b5bf1d860c2e9d360c1c Mon Sep 17 00:00:00 2001 From: Luiz Irber Date: Sun, 10 Apr 2022 14:40:32 -0700 Subject: [PATCH 04/19] refactor --- src/core/src/index/revindex.rs | 346 ++++++++++++++------------------- 1 file changed, 141 insertions(+), 205 deletions(-) diff --git a/src/core/src/index/revindex.rs b/src/core/src/index/revindex.rs index 87b4000623..09a801dcf0 100644 --- a/src/core/src/index/revindex.rs +++ b/src/core/src/index/revindex.rs @@ -119,9 +119,131 @@ pub struct LinearRevIndex { storage: Option, } -impl From for RevIndex { - fn from(v: LinearRevIndex) -> Self { - unimplemented!() +impl LinearRevIndex { + fn new( + sig_files: Manifest, + template: &Sketch, + keep_sigs: bool, + ref_sigs: Option>, + storage: Option, + ) -> Self { + let search_sigs: Vec<_> = sig_files + .internal_locations() + .map(|l| PathBuf::from(l)) + .collect(); + + let ref_sigs = if let Some(ref_sigs) = ref_sigs { + Some(ref_sigs) + } else { + if keep_sigs { + #[cfg(feature = "parallel")] + let sigs_iter = search_sigs.par_iter(); + + #[cfg(not(feature = "parallel"))] + let sigs_iter = search_sigs.iter(); + + Some( + sigs_iter + .map(|ref_path| { + if let Some(storage) = &storage { + let sig_data = storage + .load(ref_path.to_str().expect( + format!("error converting path {:?}", ref_path).as_str(), + )) + .expect(format!("error loading {:?}", ref_path).as_str()); + Signature::from_reader(sig_data.as_slice()) + .unwrap_or_else(|_| panic!("Error processing {:?}", ref_path)) + .swap_remove(0) + } else { + Signature::from_path(&ref_path) + .unwrap_or_else(|_| panic!("Error processing {:?}", ref_path)) + .swap_remove(0) + } + }) + .collect(), + ) + } else { + None + } + }; + + LinearRevIndex { + sig_files, + template: template.clone(), + ref_sigs, + storage, + } + } + + fn index( + self, + threshold: usize, + merged_query: Option, + queries: Option<&[KmerMinHash]>, + ) -> RevIndex { + let processed_sigs = AtomicUsize::new(0); + + let search_sigs: Vec<_> = self + .sig_files + .internal_locations() + .map(|l| PathBuf::from(l)) + .collect(); + + #[cfg(feature = "parallel")] + let sig_iter = search_sigs.par_iter(); + + #[cfg(not(feature = "parallel"))] + let sig_iter = search_sigs.iter(); + + let filtered_sigs = sig_iter.enumerate().filter_map(|(dataset_id, filename)| { + let i = processed_sigs.fetch_add(1, Ordering::SeqCst); + if i % 1000 == 0 { + info!("Processed {} reference sigs", i); + } + + let search_sig = if let Some(storage) = &self.storage { + let sig_data = storage + .load( + filename + .to_str() + .expect(format!("error converting path {:?}", filename).as_str()), + ) + .expect(format!("error loading {:?}", filename).as_str()); + + Signature::from_reader(sig_data.as_slice()) + } else { + Signature::from_path(&filename) + } + .unwrap_or_else(|_| panic!("Error processing {:?}", filename)) + .swap_remove(0); + + RevIndex::map_hashes_colors( + dataset_id, + &search_sig, + queries, + &merged_query, + threshold, + &self.template, + ) + }); + + #[cfg(feature = "parallel")] + let (hash_to_color, colors) = filtered_sigs.reduce( + || (HashToColor::new(), Colors::default()), + HashToColor::reduce_hashes_colors, + ); + + #[cfg(not(feature = "parallel"))] + let (hash_to_color, colors) = filtered_sigs.fold( + (HashToColor::new(), Colors::default()), + HashToColor::reduce_hashes_colors, + ); + + RevIndex { + hash_to_color, + colors, + linear: self, + } } } @@ -177,79 +299,8 @@ impl RevIndex { // If threshold is zero, let's merge all queries and save time later let merged_query = queries.and_then(|qs| Self::merge_queries(qs, threshold)); - let processed_sigs = AtomicUsize::new(0); - - #[cfg(feature = "parallel")] - let sig_iter = search_sigs.par_iter(); - - #[cfg(not(feature = "parallel"))] - let sig_iter = search_sigs.iter(); - - let filtered_sigs = sig_iter.enumerate().filter_map(|(dataset_id, filename)| { - let i = processed_sigs.fetch_add(1, Ordering::SeqCst); - if i % 1000 == 0 { - info!("Processed {} reference sigs", i); - } - - let search_sig = Signature::from_path(&filename) - .unwrap_or_else(|_| panic!("Error processing {:?}", filename)) - .swap_remove(0); - - RevIndex::map_hashes_colors( - dataset_id, - &search_sig, - queries, - &merged_query, - threshold, - template, - ) - }); - - #[cfg(feature = "parallel")] - let (hash_to_color, colors) = filtered_sigs.reduce( - || (HashToColor::new(), Colors::default()), - HashToColor::reduce_hashes_colors, - ); - - #[cfg(not(feature = "parallel"))] - let (hash_to_color, colors) = filtered_sigs.fold( - (HashToColor::new(), Colors::default()), - HashToColor::reduce_hashes_colors, - ); - - // TODO: build this together with hash_to_idx? - let ref_sigs = if keep_sigs { - #[cfg(feature = "parallel")] - let sigs_iter = search_sigs.par_iter(); - - #[cfg(not(feature = "parallel"))] - let sigs_iter = search_sigs.iter(); - - Some( - sigs_iter - .map(|ref_path| { - Signature::from_path(&ref_path) - .unwrap_or_else(|_| panic!("Error processing {:?}", ref_path)) - .swap_remove(0) - }) - .collect(), - ) - } else { - None - }; - - let linear = LinearRevIndex { - sig_files: search_sigs.into(), - template: template.clone(), - ref_sigs, - storage: None, - }; - - RevIndex { - hash_to_color, - colors, - linear, - } + let linear = LinearRevIndex::new(search_sigs.into(), template, keep_sigs, None, None); + linear.index(threshold, merged_query, queries) } pub fn from_zipstorage( @@ -262,8 +313,6 @@ impl RevIndex { // If threshold is zero, let's merge all queries and save time later let merged_query = queries.and_then(|qs| Self::merge_queries(qs, threshold)); - let processed_sigs = AtomicUsize::new(0); - // Load manifest from zipstorage let manifest = Manifest::from_reader(storage.load("SOURMASH-MANIFEST.csv")?.as_slice())?; let search_sigs: Vec<_> = manifest @@ -271,90 +320,15 @@ impl RevIndex { .map(|l| PathBuf::from(l)) .collect(); - #[cfg(feature = "parallel")] - let sig_iter = search_sigs.par_iter(); - - #[cfg(not(feature = "parallel"))] - let sig_iter = search_sigs.iter(); - - let filtered_sigs = sig_iter.enumerate().filter_map(|(dataset_id, filename)| { - let i = processed_sigs.fetch_add(1, Ordering::SeqCst); - if i % 1000 == 0 { - info!("Processed {} reference sigs", i); - } - - let sig_data = storage - .load( - filename - .to_str() - .expect(format!("error converting path {:?}", filename).as_str()), - ) - .expect(format!("error loading {:?}", filename).as_str()); - let search_sig = Signature::from_reader(sig_data.as_slice()) - .unwrap_or_else(|_| panic!("Error processing {:?}", filename)) - .swap_remove(0); - - RevIndex::map_hashes_colors( - dataset_id, - &search_sig, - queries, - &merged_query, - threshold, - template, - ) - }); - - #[cfg(feature = "parallel")] - let (hash_to_color, colors) = filtered_sigs.reduce( - || (HashToColor::new(), Colors::default()), - HashToColor::reduce_hashes_colors, - ); - - #[cfg(not(feature = "parallel"))] - let (hash_to_color, colors) = filtered_sigs.fold( - (HashToColor::new(), Colors::default()), - HashToColor::reduce_hashes_colors, + let linear = LinearRevIndex::new( + search_sigs.as_slice().into(), + template, + keep_sigs, + None, + Some(storage), ); - // TODO: build this together with hash_to_idx? - let ref_sigs = if keep_sigs { - #[cfg(feature = "parallel")] - let sigs_iter = search_sigs.par_iter(); - - #[cfg(not(feature = "parallel"))] - let sigs_iter = search_sigs.iter(); - - Some( - sigs_iter - .map(|ref_path| { - let sig_data = - storage - .load(ref_path.to_str().expect( - format!("error converting path {:?}", ref_path).as_str(), - )) - .expect(format!("error loading {:?}", ref_path).as_str()); - Signature::from_reader(sig_data.as_slice()) - .unwrap_or_else(|_| panic!("Error processing {:?}", ref_path)) - .swap_remove(0) - }) - .collect(), - ) - } else { - None - }; - - let linear = LinearRevIndex { - sig_files: search_sigs.as_slice().into(), - template: template.clone(), - ref_sigs, - storage: Some(storage), - }; - - Ok(RevIndex { - hash_to_color, - colors, - linear, - }) + Ok(linear.index(threshold, merged_query, queries)) } fn merge_queries(qs: &[KmerMinHash], threshold: usize) -> Option { @@ -378,53 +352,15 @@ impl RevIndex { // If threshold is zero, let's merge all queries and save time later let merged_query = queries.and_then(|qs| Self::merge_queries(qs, threshold)); - let processed_sigs = AtomicUsize::new(0); - - #[cfg(feature = "parallel")] - let sigs_iter = search_sigs.par_iter(); - #[cfg(not(feature = "parallel"))] - let sigs_iter = search_sigs.iter(); - - let filtered_sigs = sigs_iter.enumerate().filter_map(|(dataset_id, sig)| { - let i = processed_sigs.fetch_add(1, Ordering::SeqCst); - if i % 1000 == 0 { - info!("Processed {} reference sigs", i); - } - - RevIndex::map_hashes_colors( - dataset_id, - sig, - queries, - &merged_query, - threshold, - template, - ) - }); - - #[cfg(feature = "parallel")] - let (hash_to_color, colors) = filtered_sigs.reduce( - || (HashToColor::new(), Colors::default()), - HashToColor::reduce_hashes_colors, + let linear = LinearRevIndex::new( + Default::default(), + &template, + false, + search_sigs.into(), + None, ); - #[cfg(not(feature = "parallel"))] - let (hash_to_color, colors) = filtered_sigs.fold( - (HashToColor::new(), Colors::default()), - HashToColor::reduce_hashes_colors, - ); - - let linear = LinearRevIndex { - sig_files: Default::default(), - template: template.clone(), - ref_sigs: search_sigs.into(), - storage: None, - }; - - RevIndex { - hash_to_color, - colors, - linear, - } + linear.index(threshold, merged_query, queries) } fn map_hashes_colors( From 8f9101308a98b9f6c3de15de83fb266c8598b4a9 Mon Sep 17 00:00:00 2001 From: Luiz Irber Date: Sun, 10 Apr 2022 15:19:57 -0700 Subject: [PATCH 05/19] clippy --- src/core/src/index/revindex.rs | 102 +++++++++++++++------------------ src/core/src/manifest.rs | 2 +- 2 files changed, 47 insertions(+), 57 deletions(-) diff --git a/src/core/src/index/revindex.rs b/src/core/src/index/revindex.rs index 09a801dcf0..c36dcc1bc4 100644 --- a/src/core/src/index/revindex.rs +++ b/src/core/src/index/revindex.rs @@ -127,44 +127,39 @@ impl LinearRevIndex { ref_sigs: Option>, storage: Option, ) -> Self { - let search_sigs: Vec<_> = sig_files - .internal_locations() - .map(|l| PathBuf::from(l)) - .collect(); + let search_sigs: Vec<_> = sig_files.internal_locations().map(PathBuf::from).collect(); let ref_sigs = if let Some(ref_sigs) = ref_sigs { Some(ref_sigs) + } else if keep_sigs { + #[cfg(feature = "parallel")] + let sigs_iter = search_sigs.par_iter(); + + #[cfg(not(feature = "parallel"))] + let sigs_iter = search_sigs.iter(); + + Some( + sigs_iter + .map(|ref_path| { + if let Some(storage) = &storage { + let sig_data = storage + .load(ref_path.to_str().unwrap_or_else(|| { + panic!("error converting path {:?}", ref_path) + })) + .unwrap_or_else(|_| panic!("error loading {:?}", ref_path)); + Signature::from_reader(sig_data.as_slice()) + .unwrap_or_else(|_| panic!("Error processing {:?}", ref_path)) + .swap_remove(0) + } else { + Signature::from_path(&ref_path) + .unwrap_or_else(|_| panic!("Error processing {:?}", ref_path)) + .swap_remove(0) + } + }) + .collect(), + ) } else { - if keep_sigs { - #[cfg(feature = "parallel")] - let sigs_iter = search_sigs.par_iter(); - - #[cfg(not(feature = "parallel"))] - let sigs_iter = search_sigs.iter(); - - Some( - sigs_iter - .map(|ref_path| { - if let Some(storage) = &storage { - let sig_data = storage - .load(ref_path.to_str().expect( - format!("error converting path {:?}", ref_path).as_str(), - )) - .expect(format!("error loading {:?}", ref_path).as_str()); - Signature::from_reader(sig_data.as_slice()) - .unwrap_or_else(|_| panic!("Error processing {:?}", ref_path)) - .swap_remove(0) - } else { - Signature::from_path(&ref_path) - .unwrap_or_else(|_| panic!("Error processing {:?}", ref_path)) - .swap_remove(0) - } - }) - .collect(), - ) - } else { - None - } + None }; LinearRevIndex { @@ -186,7 +181,7 @@ impl LinearRevIndex { let search_sigs: Vec<_> = self .sig_files .internal_locations() - .map(|l| PathBuf::from(l)) + .map(PathBuf::from) .collect(); #[cfg(feature = "parallel")] @@ -206,9 +201,9 @@ impl LinearRevIndex { .load( filename .to_str() - .expect(format!("error converting path {:?}", filename).as_str()), + .unwrap_or_else(|| panic!("error converting path {:?}", filename)), ) - .expect(format!("error loading {:?}", filename).as_str()); + .unwrap_or_else(|_| panic!("error loading {:?}", filename)); Signature::from_reader(sig_data.as_slice()) } else { @@ -315,10 +310,7 @@ impl RevIndex { // Load manifest from zipstorage let manifest = Manifest::from_reader(storage.load("SOURMASH-MANIFEST.csv")?.as_slice())?; - let search_sigs: Vec<_> = manifest - .internal_locations() - .map(|l| PathBuf::from(l)) - .collect(); + let search_sigs: Vec<_> = manifest.internal_locations().map(PathBuf::from).collect(); let linear = LinearRevIndex::new( search_sigs.as_slice().into(), @@ -354,7 +346,7 @@ impl RevIndex { let linear = LinearRevIndex::new( Default::default(), - &template, + template, false, search_sigs.into(), None, @@ -460,13 +452,12 @@ impl RevIndex { &refsigs[dataset_id as usize] } else { let mut sig = if let Some(storage) = &self.linear.storage { - let sig_data = storage - .load( - match_path - .to_str() - .expect(format!("error converting path {:?}", match_path).as_str()), - ) - .expect(format!("error loading {:?}", match_path).as_str()); + let sig_data = + storage + .load(match_path.to_str().unwrap_or_else(|| { + panic!("error converting path {:?}", match_path) + })) + .unwrap_or_else(|_| panic!("error loading {:?}", match_path)); Signature::from_reader(sig_data.as_slice())? } else { Signature::from_path(&match_path)? @@ -615,13 +606,12 @@ impl RevIndex { &refsigs[dataset_id as usize] } else { let mut sig = if let Some(storage) = &self.linear.storage { - let sig_data = storage - .load( - match_path - .to_str() - .expect(format!("error converting path {:?}", match_path).as_str()), - ) - .expect(format!("error loading {:?}", match_path).as_str()); + let sig_data = + storage + .load(match_path.to_str().unwrap_or_else(|| { + panic!("error converting path {:?}", match_path) + })) + .unwrap_or_else(|_| panic!("error loading {:?}", match_path)); Signature::from_reader(sig_data.as_slice())? } else { Signature::from_path(&match_path)? diff --git a/src/core/src/manifest.rs b/src/core/src/manifest.rs index f0ba45955b..ed646fc66c 100644 --- a/src/core/src/manifest.rs +++ b/src/core/src/manifest.rs @@ -56,7 +56,7 @@ impl From<&[PathBuf]> for Manifest { fn from(v: &[PathBuf]) -> Self { Manifest { records: v - .into_iter() + .iter() .map(|p| Record { internal_location: p.to_str().unwrap().into(), }) From 11b469c33cb7c9b9eff165275bec632cff13af70 Mon Sep 17 00:00:00 2001 From: Luiz Irber Date: Sun, 10 Apr 2022 15:45:36 -0700 Subject: [PATCH 06/19] add test --- src/core/src/index/revindex.rs | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/src/core/src/index/revindex.rs b/src/core/src/index/revindex.rs index c36dcc1bc4..6ab06074de 100644 --- a/src/core/src/index/revindex.rs +++ b/src/core/src/index/revindex.rs @@ -774,4 +774,25 @@ mod test { //assert_eq!(index.colors.len(), 3); assert_eq!(index.colors.len(), 7); } + + #[test] + fn revindex_from_zipstorage() { + let max_hash = max_hash_for_scaled(100); + let template = Sketch::MinHash( + KmerMinHash::builder() + .num(0u32) + .ksize(57) + .hash_function(crate::encodings::HashFunctions::murmur64_protein) + .max_hash(max_hash) + .build(), + ); + let storage = ZipStorage::from_file("../../tests/test-data/prot/protein.zip") + .expect("error loading zipfile"); + let index = RevIndex::from_zipstorage(storage, &template, 0, None, false) + .expect("error building from ziptorage"); + + // TODO: search too + + assert_eq!(index.colors.len(), 3); + } } From bf15111a2bc816940aabec5ee8ddf895cb55a9c9 Mon Sep 17 00:00:00 2001 From: Luiz Irber Date: Mon, 11 Apr 2022 21:10:39 -0700 Subject: [PATCH 07/19] more linearindex --- src/core/src/index/revindex.rs | 464 ++++++++++++++++++++++++--------- 1 file changed, 346 insertions(+), 118 deletions(-) diff --git a/src/core/src/index/revindex.rs b/src/core/src/index/revindex.rs index 6ab06074de..e89f46ac75 100644 --- a/src/core/src/index/revindex.rs +++ b/src/core/src/index/revindex.rs @@ -240,6 +240,300 @@ impl LinearRevIndex { linear: self, } } + + pub fn counter_for_query(&self, query: &KmerMinHash) -> SigCounter { + let processed_sigs = AtomicUsize::new(0); + + // TODO: Some(ref_sigs) case + + let search_sigs: Vec<_> = self + .sig_files + .internal_locations() + .map(PathBuf::from) + .collect(); + + #[cfg(feature = "parallel")] + let sig_iter = search_sigs.par_iter(); + + #[cfg(not(feature = "parallel"))] + let sig_iter = search_sigs.iter(); + + let counters = sig_iter.enumerate().filter_map(|(dataset_id, filename)| { + let i = processed_sigs.fetch_add(1, Ordering::SeqCst); + if i % 1000 == 0 { + info!("Processed {} reference sigs", i); + } + + let search_sig = if let Some(storage) = &self.storage { + let sig_data = storage + .load( + filename + .to_str() + .unwrap_or_else(|| panic!("error converting path {:?}", filename)), + ) + .unwrap_or_else(|_| panic!("error loading {:?}", filename)); + + Signature::from_reader(sig_data.as_slice()) + } else { + Signature::from_path(&filename) + } + .unwrap_or_else(|_| panic!("Error processing {:?}", filename)) + .swap_remove(0); + + let mut search_mh = None; + if let Some(Sketch::MinHash(mh)) = search_sig.select_sketch(&self.template) { + search_mh = Some(mh); + }; + let search_mh = search_mh.expect("Couldn't find a compatible MinHash"); + + let (large_mh, small_mh) = if query.size() > search_mh.size() { + (query, search_mh) + } else { + (search_mh, query) + }; + + let (size, _) = small_mh + .intersection_size(large_mh) + .unwrap_or_else(|_| panic!("error computing intersection for {:?}", filename)); + + if size == 0 { + None + } else { + let mut counter: SigCounter = Default::default(); + counter[&(dataset_id as u64)] += size as usize; + Some(counter) + } + }); + + let reduce_counters = |mut a: SigCounter, b: SigCounter| { + a.extend(&b); + a + }; + + #[cfg(feature = "parallel")] + let counter = counters.reduce(|| SigCounter::new(), reduce_counters); + + #[cfg(not(feature = "parallel"))] + let counter = counters.fold(SigCounter::new(), reduce_counters); + + counter + } + + pub fn search( + &self, + counter: SigCounter, + similarity: bool, + threshold: usize, + ) -> Result, Box> { + let mut matches = vec![]; + if similarity { + unimplemented!("TODO: threshold correction") + } + + for (dataset_id, size) in counter.most_common() { + if size >= threshold { + matches.push( + self.sig_files[dataset_id as usize] + .internal_location() + .to_str() + .unwrap() + .into(), + ); + } else { + break; + }; + } + Ok(matches) + } + + fn gather_round( + &self, + dataset_id: u64, + match_size: usize, + query: &KmerMinHash, + round: usize, + ) -> Result { + let match_path = if self.sig_files.is_empty() { + PathBuf::new() + } else { + self.sig_files[dataset_id as usize].internal_location() + }; + let match_sig = self.sig_for_dataset(dataset_id as usize)?; + let result = self.stats_for_match(&match_sig, query, match_size, match_path, round)?; + Ok(result) + } + + fn sig_for_dataset(&self, dataset_id: usize) -> Result { + let match_path = if self.sig_files.is_empty() { + PathBuf::new() + } else { + self.sig_files[dataset_id as usize].internal_location() + }; + + let match_sig = if let Some(refsigs) = &self.ref_sigs { + refsigs[dataset_id as usize].clone() + } else { + let mut sig = if let Some(storage) = &self.storage { + let sig_data = storage + .load( + match_path + .to_str() + .unwrap_or_else(|| panic!("error converting path {:?}", match_path)), + ) + .unwrap_or_else(|_| panic!("error loading {:?}", match_path)); + Signature::from_reader(sig_data.as_slice())? + } else { + Signature::from_path(&match_path)? + }; + // TODO: remove swap_remove + sig.swap_remove(0) + }; + Ok(match_sig) + } + + fn stats_for_match( + &self, + match_sig: &Signature, + query: &KmerMinHash, + match_size: usize, + match_path: PathBuf, + gather_result_rank: usize, + ) -> Result { + let mut match_mh = None; + if let Some(Sketch::MinHash(mh)) = match_sig.select_sketch(&self.template) { + match_mh = Some(mh); + } + let match_mh = match_mh.expect("Couldn't find a compatible MinHash"); + + // Calculate stats + let f_orig_query = match_size as f64 / query.size() as f64; + let f_match = match_size as f64 / match_mh.size() as f64; + let filename = match_path.to_str().unwrap().into(); + let name = match_sig.name(); + let unique_intersect_bp = match_mh.scaled() as usize * match_size; + + let (intersect_orig, _) = match_mh.intersection_size(query)?; + let intersect_bp = (match_mh.scaled() as u64 * intersect_orig) as usize; + + let f_unique_to_query = intersect_orig as f64 / query.size() as f64; + let match_ = match_sig.clone(); + + // TODO: all of these + let f_unique_weighted = 0.; + let average_abund = 0; + let median_abund = 0; + let std_abund = 0; + let md5 = "".into(); + let f_match_orig = 0.; + let remaining_bp = 0; + + Ok(GatherResult { + intersect_bp, + f_orig_query, + f_match, + f_unique_to_query, + f_unique_weighted, + average_abund, + median_abund, + std_abund, + filename, + name, + md5, + match_, + f_match_orig, + unique_intersect_bp, + gather_result_rank, + remaining_bp, + }) + } + + pub fn gather( + &self, + mut counter: SigCounter, + threshold: usize, + query: &KmerMinHash, + ) -> Result, Box> { + let mut match_size = usize::max_value(); + let mut matches = vec![]; + + while match_size > threshold && !counter.is_empty() { + let (dataset_id, size) = counter.most_common()[0]; + if threshold == 0 && size == 0 { + break; + } + + match_size = if size >= threshold { + size + } else { + break; + }; + + let result = self.gather_round(dataset_id, match_size, query, matches.len())?; + + // Prepare counter for finding the next match by decrementing + // all hashes found in the current match in other datasets + // TODO: maybe par_iter? + let mut to_remove: HashSet = Default::default(); + to_remove.insert(dataset_id); + + for (dataset, value) in counter.iter_mut() { + let dataset_sig = self.sig_for_dataset(*dataset as usize)?; + let mut match_mh = None; + if let Some(Sketch::MinHash(mh)) = dataset_sig.select_sketch(&self.template) { + match_mh = Some(mh); + } + let match_mh = match_mh.expect("Couldn't find a compatible MinHash"); + + let (intersection, _) = query.intersection_size(match_mh)?; + if intersection as usize > *value { + to_remove.insert(*dataset); + } else { + *value -= intersection as usize; + }; + } + to_remove.iter().for_each(|dataset_id| { + counter.remove(&dataset_id); + }); + matches.push(result); + } + Ok(matches) + } +} + +impl<'a> Index<'a> for LinearRevIndex { + type Item = Signature; + + fn insert(&mut self, _node: Self::Item) -> Result<(), Error> { + unimplemented!() + } + + fn save>(&self, _path: P) -> Result<(), Error> { + unimplemented!() + } + + fn load>(_path: P) -> Result<(), Error> { + unimplemented!() + } + + fn len(&self) -> usize { + if let Some(refs) = &self.ref_sigs { + refs.len() + } else { + self.sig_files.len() + } + } + + fn signatures(&self) -> Vec { + if let Some(ref sigs) = self.ref_sigs { + sigs.to_vec() + } else { + unimplemented!() + } + } + + fn signature_refs(&self) -> Vec<&Self::Item> { + unimplemented!() + } } impl RevIndex { @@ -401,31 +695,22 @@ impl RevIndex { } } + pub fn counter_for_query(&self, query: &KmerMinHash) -> SigCounter { + query + .iter_mins() + .filter_map(|hash| self.hash_to_color.get(hash)) + .flat_map(|color| self.colors.indices(color)) + .cloned() + .collect() + } + pub fn search( &self, counter: SigCounter, similarity: bool, threshold: usize, ) -> Result, Box> { - let mut matches = vec![]; - if similarity { - unimplemented!("TODO: threshold correction") - } - - for (dataset_id, size) in counter.most_common() { - if size >= threshold { - matches.push( - self.linear.sig_files[dataset_id as usize] - .internal_location() - .to_str() - .unwrap() - .into(), - ); - } else { - break; - }; - } - Ok(matches) + self.linear.search(counter, similarity, threshold) } pub fn gather( @@ -440,109 +725,28 @@ impl RevIndex { while match_size > threshold && !counter.is_empty() { let (dataset_id, size) = counter.most_common()[0]; match_size = if size >= threshold { size } else { break }; - - let match_path = if self.linear.sig_files.is_empty() { - PathBuf::new() - } else { - self.linear.sig_files[dataset_id as usize].internal_location() - }; - - let ref_match; - let match_sig = if let Some(refsigs) = &self.linear.ref_sigs { - &refsigs[dataset_id as usize] - } else { - let mut sig = if let Some(storage) = &self.linear.storage { - let sig_data = - storage - .load(match_path.to_str().unwrap_or_else(|| { - panic!("error converting path {:?}", match_path) - })) - .unwrap_or_else(|_| panic!("error loading {:?}", match_path)); - Signature::from_reader(sig_data.as_slice())? - } else { - Signature::from_path(&match_path)? - }; - // TODO: remove swap_remove - ref_match = sig.swap_remove(0); - &ref_match - }; - - let mut match_mh = None; - if let Some(Sketch::MinHash(mh)) = match_sig.select_sketch(&self.linear.template) { - match_mh = Some(mh); - } - let match_mh = match_mh.expect("Couldn't find a compatible MinHash"); - - // Calculate stats - let f_orig_query = match_size as f64 / query.size() as f64; - let f_match = match_size as f64 / match_mh.size() as f64; - let filename = match_path.to_str().unwrap().into(); - let name = match_sig.name(); - let unique_intersect_bp = match_mh.scaled() as usize * match_size; - let gather_result_rank = matches.len(); - - let (intersect_orig, _) = match_mh.intersection_size(query)?; - let intersect_bp = (match_mh.scaled() as u64 * intersect_orig) as usize; - - let f_unique_to_query = intersect_orig as f64 / query.size() as f64; - let match_ = match_sig.clone(); - - // TODO: all of these - let f_unique_weighted = 0.; - let average_abund = 0; - let median_abund = 0; - let std_abund = 0; - let md5 = "".into(); - let f_match_orig = 0.; - let remaining_bp = 0; - - let result = GatherResult { - intersect_bp, - f_orig_query, - f_match, - f_unique_to_query, - f_unique_weighted, - average_abund, - median_abund, - std_abund, - filename, - name, - md5, - match_, - f_match_orig, - unique_intersect_bp, - gather_result_rank, - remaining_bp, - }; - matches.push(result); - - // Prepare counter for finding the next match by decrementing - // all hashes found in the current match in other datasets - for hash in match_mh.iter_mins() { - if let Some(color) = self.hash_to_color.get(hash) { - for dataset in self.colors.indices(color) { - counter.entry(*dataset).and_modify(|e| { - if *e > 0 { - *e -= 1 - } - }); + let result = self + .linear + .gather_round(dataset_id, match_size, query, matches.len())?; + if let Some(Sketch::MinHash(match_mh)) = + result.match_.select_sketch(&self.linear.template) + { + // Prepare counter for finding the next match by decrementing + // all hashes found in the current match in other datasets + for hash in match_mh.iter_mins() { + if let Some(color) = self.hash_to_color.get(hash) { + counter.subtract(self.colors.indices(color).cloned()); } } + counter.remove(&dataset_id); + matches.push(result); + } else { + unimplemented!() } - counter.remove(&dataset_id); } Ok(matches) } - pub fn counter_for_query(&self, query: &KmerMinHash) -> SigCounter { - query - .iter_mins() - .filter_map(|hash| self.hash_to_color.get(hash)) - .flat_map(|color| self.colors.indices(color)) - .cloned() - .collect() - } - pub fn template(&self) -> Sketch { self.linear.template.clone() } @@ -646,7 +850,7 @@ impl RevIndex { } } -#[derive(CopyGetters, Getters, Setters, Serialize, Deserialize, Debug)] +#[derive(CopyGetters, Getters, Setters, Serialize, Deserialize, Debug, PartialEq)] pub struct GatherResult { #[getset(get_copy = "pub")] intersect_bp: usize, @@ -791,8 +995,32 @@ mod test { let index = RevIndex::from_zipstorage(storage, &template, 0, None, false) .expect("error building from ziptorage"); - // TODO: search too - assert_eq!(index.colors.len(), 3); + + let query_sig = Signature::from_path( + "../../tests/test-data/prot/protein/GCA_001593925.1_ASM159392v1_protein.faa.gz.sig", + ) + .expect("Error processing query") + .swap_remove(0); + let mut query_mh = None; + if let Some(Sketch::MinHash(mh)) = query_sig.select_sketch(&template) { + query_mh = Some(mh); + } + let query_mh = query_mh.expect("Couldn't find a compatible MinHash"); + + let counter_rev = index.counter_for_query(query_mh); + let counter_lin = index.linear.counter_for_query(query_mh); + + let results_rev = index.search(counter_rev, false, 0).unwrap(); + let results_linear = index.linear.search(counter_lin, false, 0).unwrap(); + assert_eq!(results_rev, results_linear); + + let counter_rev = index.counter_for_query(query_mh); + let counter_lin = index.linear.counter_for_query(query_mh); + + let results_rev = index.gather(counter_rev, 0, query_mh).unwrap(); + let results_linear = index.linear.gather(counter_lin, 0, query_mh).unwrap(); + assert_eq!(results_rev.len(), 1); + assert_eq!(results_rev, results_linear); } } From eedd07c06f99bda95da1fa0785c14e07495f49ca Mon Sep 17 00:00:00 2001 From: Luiz Irber Date: Sat, 16 Apr 2022 09:51:48 -0700 Subject: [PATCH 08/19] init zipfilelinearindex in rust --- src/core/tests/storage.rs | 38 +++++++++++++++++ src/sourmash/index/__init__.py | 74 ++++++++++++++++++++++++---------- src/sourmash/sbt_storage.py | 2 +- 3 files changed, 92 insertions(+), 22 deletions(-) diff --git a/src/core/tests/storage.rs b/src/core/tests/storage.rs index 5a60e02fcc..f05a0bc148 100644 --- a/src/core/tests/storage.rs +++ b/src/core/tests/storage.rs @@ -42,3 +42,41 @@ fn zipstorage_list_sbts() -> Result<(), Box> { Ok(()) } + +#[cfg(feature = "parallel")] +#[test] +fn zipstorage_parallel_access() -> Result<(), Box> { + use std::io::BufReader; + + use rayon::prelude::*; + use sourmash::signature::{Signature, SigsTrait}; + use sourmash::sketch::minhash::KmerMinHash; + + let mut filename = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + filename.push("../../tests/test-data/v6.sbt.zip"); + + let zs = ZipStorage::new(filename.to_str().unwrap())?; + + let total_hashes: usize = [ + ".sbt.v3/f71e78178af9e45e6f1d87a0c53c465c", + ".sbt.v3/f0c834bc306651d2b9321fb21d3e8d8f", + ".sbt.v3/4e94e60265e04f0763142e20b52c0da1", + ".sbt.v3/6d6e87e1154e95b279e5e7db414bc37b", + ".sbt.v3/0107d767a345eff67ecdaed2ee5cd7ba", + ".sbt.v3/b59473c94ff2889eca5d7165936e64b3", + ".sbt.v3/60f7e23c24a8d94791cc7a8680c493f9", + ] + .par_iter() + .map(|path| { + let data = zs.load(path).unwrap(); + let sigs: Vec = serde_json::from_reader(&data[..]).expect("Loading error"); + sigs.iter() + .map(|v| v.sketches().iter().map(|mh| mh.size()).sum::()) + .sum::() + }) + .sum(); + + assert_eq!(total_hashes, 3500); + + Ok(()) +} diff --git a/src/sourmash/index/__init__.py b/src/sourmash/index/__init__.py index a90bc6f0ce..b5ca9a9e43 100644 --- a/src/sourmash/index/__init__.py +++ b/src/sourmash/index/__init__.py @@ -34,23 +34,54 @@ CounterGather - an ancillary class returned by the 'counter_gather()' method. """ +from __future__ import annotations + import os import sourmash from abc import abstractmethod, ABC -from collections import namedtuple, Counter - -from sourmash.search import (make_jaccard_search_query, - make_containment_query, - calc_threshold_from_bp) -from sourmash.manifest import CollectionManifest -from sourmash.logging import debug_literal -from sourmash.signature import load_signatures, save_signatures -from sourmash.minhash import (flatten_and_downsample_scaled, - flatten_and_downsample_num, - flatten_and_intersect_scaled) +from collections import Counter +from collections import defaultdict +from typing import NamedTuple, Optional, TypedDict, TYPE_CHECKING + +from ..search import (make_jaccard_search_query, + make_gather_query + calc_threshold_from_bp) +from ..manifest import CollectionManifest +from ..logging import debug_literal +from ..signature import load_signatures, save_signatures +from .._lowlevel import ffi, lib +from ..utils import RustObject, rustcall, decode_str, encode_str +from .. import SourmashSignature +from ..picklist import SignaturePicklist +from ..minhash import (flatten_and_downsample_scaled, + flatten_and_downsample_num, + flatten_and_intersect_scaled) + +if TYPE_CHECKING: + from typing_extensions import Unpack + + +class IndexSearchResult(NamedTuple): + """generic return tuple for Index.search and Index.gather""" + score: float + signature: SourmashSignature + location: str + + +class Selection(TypedDict): + ksize: Optional[int] + moltype: Optional[str] + num: Optional[int] + scaled: Optional[int] + containment: Optional[bool] + abund: Optional[bool] + picklist: Optional[SignaturePicklist] + + +# TypedDict can't have methods (it is a dict in runtime) +def _selection_as_rust(selection: Selection): + ... -# generic return tuple for Index.search and Index.gather -IndexSearchResult = namedtuple('Result', 'score, signature, location') class Index(ABC): # this will be removed soon; see sourmash#1894. @@ -308,8 +339,7 @@ def counter_gather(self, query, threshold_bp, **kwargs): return counter @abstractmethod - def select(self, ksize=None, moltype=None, scaled=None, num=None, - abund=None, containment=None): + def select(self, **kwargs: Unpack[Selection]): """Return Index containing only signatures that match requirements. Current arguments can be any or all of: @@ -409,7 +439,7 @@ def load(cls, location, filename=None): lidx = LinearIndex(si, filename=filename) return lidx - def select(self, **kwargs): + def select(self, **kwargs: Unpack[Selection]): """Return new LinearIndex containing only signatures that match req's. Does not raise ValueError, but may return an empty Index. @@ -480,7 +510,7 @@ def save(self, path): def load(cls, path): raise NotImplementedError - def select(self, **kwargs): + def select(self, **kwargs: Unpack[Selection]): """Return new object yielding only signatures that match req's. Does not raise ValueError, but may return an empty Index. @@ -495,7 +525,7 @@ def select(self, **kwargs): return LazyLinearIndex(self.db, selection_dict) -class ZipFileLinearIndex(Index): +class ZipFileLinearIndex(Index, RustObject): """\ A read-only collection of signatures in a zip file. @@ -505,6 +535,8 @@ class ZipFileLinearIndex(Index): """ is_database = True + #__dealloc_func__ = lib.zflinearindex_free + def __init__(self, storage, *, selection_dict=None, traverse_yield_all=False, manifest=None, use_manifest=True): self.storage = storage @@ -643,7 +675,7 @@ def signatures(self): if select(ss): yield ss - def select(self, **kwargs): + def select(self, **kwargs: Unpack[Selection]): "Select signatures in zip file based on ksize/moltype/etc." # if we have a manifest, run 'select' on the manifest. @@ -1054,7 +1086,7 @@ def load_from_pathlist(cls, filename): def save(self, *args): raise NotImplementedError - def select(self, **kwargs): + def select(self, **kwargs: Unpack[Selection]): "Run 'select' on the manifest." new_manifest = self.manifest.select_to_manifest(**kwargs) return MultiIndex(new_manifest, self.parent, @@ -1163,7 +1195,7 @@ def save(self, *args): def insert(self, *args): raise NotImplementedError - def select(self, **kwargs): + def select(self, **kwargs: Unpack[Selection]): "Run 'select' on the manifest." new_manifest = self.manifest.select_to_manifest(**kwargs) return StandaloneManifestIndex(new_manifest, self._location, diff --git a/src/sourmash/sbt_storage.py b/src/sourmash/sbt_storage.py index 398e11c877..e57f158f32 100644 --- a/src/sourmash/sbt_storage.py +++ b/src/sourmash/sbt_storage.py @@ -99,11 +99,11 @@ class ZipStorage(RustObject, Storage): __dealloc_func__ = lib.zipstorage_free def __init__(self, path, *, mode="r"): + path = os.path.abspath(path) if mode == "w": self.__inner = _RwZipStorage(path) else: self.__inner = None - path = os.path.abspath(path) self._objptr = rustcall(lib.zipstorage_new, to_bytes(path), len(path)) @staticmethod From a309040a2734c57836129123dde0c4198886f93e Mon Sep 17 00:00:00 2001 From: Luiz Irber Date: Sat, 16 Apr 2022 15:52:16 -0700 Subject: [PATCH 09/19] preparing ZFLinearIndex --- include/sourmash.h | 10 ++++ src/core/src/ffi/index/mod.rs | 7 +++ src/core/src/ffi/index/revindex.rs | 73 ++++++++++++++++++++++++++++-- src/core/src/ffi/manifest.rs | 9 ++++ src/core/src/ffi/mod.rs | 2 + src/core/src/ffi/picklist.rs | 9 ++++ src/core/src/index/mod.rs | 2 + src/core/src/index/revindex.rs | 2 +- src/core/src/lib.rs | 1 + src/core/src/picklist.rs | 1 + src/sourmash/index/__init__.py | 6 ++- 11 files changed, 117 insertions(+), 5 deletions(-) create mode 100644 src/core/src/ffi/manifest.rs create mode 100644 src/core/src/ffi/picklist.rs create mode 100644 src/core/src/picklist.rs diff --git a/include/sourmash.h b/include/sourmash.h index 1f116e681b..f9c2c6ded3 100644 --- a/include/sourmash.h +++ b/include/sourmash.h @@ -41,6 +41,7 @@ enum SourmashErrorCode { SOURMASH_ERROR_CODE_PARSE_INT = 100003, SOURMASH_ERROR_CODE_SERDE_ERROR = 100004, SOURMASH_ERROR_CODE_NIFFLER_ERROR = 100005, + SOURMASH_ERROR_CODE_CSV_ERROR = 100006, }; typedef uint32_t SourmashErrorCode; @@ -50,6 +51,8 @@ typedef struct SourmashHyperLogLog SourmashHyperLogLog; typedef struct SourmashKmerMinHash SourmashKmerMinHash; +typedef struct SourmashLinearIndex SourmashLinearIndex; + typedef struct SourmashNodegraph SourmashNodegraph; typedef struct SourmashRevIndex SourmashRevIndex; @@ -264,6 +267,13 @@ void kmerminhash_slice_free(uint64_t *ptr, uintptr_t insize); bool kmerminhash_track_abundance(const SourmashKmerMinHash *ptr); +void linearindex_free(SourmashLinearIndex *ptr); + +SourmashLinearIndex *linearindex_new(const SourmashZipStorage *storage_ptr, + const SourmashManifest *manifest_ptr, + const SourmashSelection *selection_ptr, + bool use_manifest); + void nodegraph_buffer_free(uint8_t *ptr, uintptr_t insize); bool nodegraph_count(SourmashNodegraph *ptr, uint64_t h); diff --git a/src/core/src/ffi/index/mod.rs b/src/core/src/ffi/index/mod.rs index 932a97b222..636c57d9c2 100644 --- a/src/core/src/ffi/index/mod.rs +++ b/src/core/src/ffi/index/mod.rs @@ -1,5 +1,6 @@ pub mod revindex; +use crate::index::Selection; use crate::signature::Signature; use crate::ffi::signature::SourmashSignature; @@ -35,3 +36,9 @@ pub unsafe extern "C" fn searchresult_signature( let result = SourmashSearchResult::as_rust(ptr); SourmashSignature::from_rust((result.1).clone()) } + +pub struct SourmashSelection; + +impl ForeignObject for SourmashSelection { + type RustObject = Selection; +} diff --git a/src/core/src/ffi/index/revindex.rs b/src/core/src/ffi/index/revindex.rs index 0dd852392e..aab548d666 100644 --- a/src/core/src/ffi/index/revindex.rs +++ b/src/core/src/ffi/index/revindex.rs @@ -1,15 +1,17 @@ use std::path::PathBuf; use std::slice; -use crate::index::revindex::RevIndex; +use crate::index::revindex::{LinearRevIndex, RevIndex}; use crate::index::Index; use crate::signature::{Signature, SigsTrait}; -use crate::sketch::minhash::KmerMinHash; +use crate::sketch::minhash::{max_hash_for_scaled, KmerMinHash}; use crate::sketch::Sketch; -use crate::ffi::index::SourmashSearchResult; +use crate::ffi::index::{SourmashSearchResult, SourmashSelection}; +use crate::ffi::manifest::SourmashManifest; use crate::ffi::minhash::SourmashKmerMinHash; use crate::ffi::signature::SourmashSignature; +use crate::ffi::storage::SourmashZipStorage; use crate::ffi::utils::{ForeignObject, SourmashStr}; pub struct SourmashRevIndex; @@ -249,3 +251,68 @@ unsafe fn revindex_signatures( Ok(Box::into_raw(b) as *mut *mut SourmashSignature) } } + +//-------------------------------------------------- + +pub struct SourmashLinearIndex; + +impl ForeignObject for SourmashLinearIndex { + type RustObject = LinearRevIndex; +} + +ffi_fn! { +unsafe fn linearindex_new( + storage_ptr: *mut SourmashZipStorage, + manifest_ptr: *mut SourmashManifest, + selection_ptr: *mut SourmashSelection, + use_manifest: bool, +) -> Result<*mut SourmashLinearIndex> { + let storage = SourmashZipStorage::into_rust(storage_ptr); + + let manifest = if manifest_ptr.is_null() { + if use_manifest { + todo!("load manifest from zipstorage") + } else { + todo!("throw error") + } + } else { + SourmashManifest::into_rust(manifest_ptr) + }; + + let _selection = if !selection_ptr.is_null() { + Some(SourmashSelection::into_rust(selection_ptr)) + } else { + None + }; + // TODO: how to extract a template? Probably from selection? + let max_hash = max_hash_for_scaled(100); + let template = Sketch::MinHash( + KmerMinHash::builder() + .num(0u32) + .ksize(57) + .hash_function(crate::encodings::HashFunctions::murmur64_protein) + .max_hash(max_hash) + .build(), + ); + + /* + def __init__(self, storage, *, selection_dict=None, + traverse_yield_all=False, manifest=None, use_manifest=True): + + sig_files: Manifest, + template: &Sketch, + keep_sigs: bool, + ref_sigs: Option>, + storage: Option, + */ + + let linear_index = LinearRevIndex::new(*manifest, &template, false, None, Some(*storage)); + + Ok(SourmashLinearIndex::from_rust(linear_index)) +} +} + +#[no_mangle] +pub unsafe extern "C" fn linearindex_free(ptr: *mut SourmashLinearIndex) { + SourmashLinearIndex::drop(ptr); +} diff --git a/src/core/src/ffi/manifest.rs b/src/core/src/ffi/manifest.rs new file mode 100644 index 0000000000..d307f7373b --- /dev/null +++ b/src/core/src/ffi/manifest.rs @@ -0,0 +1,9 @@ +use crate::manifest::Manifest; + +use crate::ffi::utils::ForeignObject; + +pub struct SourmashManifest; + +impl ForeignObject for SourmashManifest { + type RustObject = Manifest; +} diff --git a/src/core/src/ffi/mod.rs b/src/core/src/ffi/mod.rs index a67de37176..44e856001f 100644 --- a/src/core/src/ffi/mod.rs +++ b/src/core/src/ffi/mod.rs @@ -9,8 +9,10 @@ pub mod utils; pub mod cmd; pub mod hyperloglog; pub mod index; +pub mod manifest; pub mod minhash; pub mod nodegraph; +pub mod picklist; pub mod signature; pub mod storage; diff --git a/src/core/src/ffi/picklist.rs b/src/core/src/ffi/picklist.rs new file mode 100644 index 0000000000..c8a9790543 --- /dev/null +++ b/src/core/src/ffi/picklist.rs @@ -0,0 +1,9 @@ +use crate::picklist::Picklist; + +use crate::ffi::utils::ForeignObject; + +pub struct SourmashPicklist; + +impl ForeignObject for SourmashPicklist{ + type RustObject = Picklist; +} diff --git a/src/core/src/index/mod.rs b/src/core/src/index/mod.rs index 9ed78af93a..9e3e137cf5 100644 --- a/src/core/src/index/mod.rs +++ b/src/core/src/index/mod.rs @@ -29,6 +29,8 @@ use crate::Error; pub type MHBT = SBT, Signature>; +pub struct Selection; + /* FIXME: bring back after MQF works on macOS and Windows use cfg_if::cfg_if; cfg_if! { diff --git a/src/core/src/index/revindex.rs b/src/core/src/index/revindex.rs index e89f46ac75..e6cfbbc533 100644 --- a/src/core/src/index/revindex.rs +++ b/src/core/src/index/revindex.rs @@ -120,7 +120,7 @@ pub struct LinearRevIndex { } impl LinearRevIndex { - fn new( + pub fn new( sig_files: Manifest, template: &Sketch, keep_sigs: bool, diff --git a/src/core/src/lib.rs b/src/core/src/lib.rs index 889558536a..a52750535a 100644 --- a/src/core/src/lib.rs +++ b/src/core/src/lib.rs @@ -27,6 +27,7 @@ pub mod prelude; pub mod cmd; pub mod manifest; +pub mod picklist; pub mod signature; pub mod sketch; pub mod storage; diff --git a/src/core/src/picklist.rs b/src/core/src/picklist.rs new file mode 100644 index 0000000000..c6af66cc5b --- /dev/null +++ b/src/core/src/picklist.rs @@ -0,0 +1 @@ +pub struct Picklist; diff --git a/src/sourmash/index/__init__.py b/src/sourmash/index/__init__.py index b5ca9a9e43..4d5ed9bcc2 100644 --- a/src/sourmash/index/__init__.py +++ b/src/sourmash/index/__init__.py @@ -535,10 +535,13 @@ class ZipFileLinearIndex(Index, RustObject): """ is_database = True - #__dealloc_func__ = lib.zflinearindex_free + __dealloc_func__ = lib.linearindex_free def __init__(self, storage, *, selection_dict=None, traverse_yield_all=False, manifest=None, use_manifest=True): + self._objptr = rustcall(lib.linearindex_new, storage._get_objptr(), + ffi.NULL, ffi.NULL, use_manifest) + """ self.storage = storage self.selection_dict = selection_dict self.traverse_yield_all = traverse_yield_all @@ -558,6 +561,7 @@ def __init__(self, storage, *, selection_dict=None, assert not self.selection_dict, self.selection_dict if self.selection_dict: assert self.manifest is None + """ def _load_manifest(self): "Load a manifest if one exists" From 29a2e96b27128474d5e1a9f52509da22086056bb Mon Sep 17 00:00:00 2001 From: Luiz Irber Date: Wed, 20 Apr 2022 13:24:05 -0700 Subject: [PATCH 10/19] wip sigiter --- include/sourmash.h | 20 +++++- src/core/src/ffi/index/mod.rs | 22 ++++++ src/core/src/ffi/index/revindex.rs | 39 ++++++++-- src/core/src/index/revindex.rs | 15 ++++ src/core/src/manifest.rs | 4 +- src/sourmash/index/__init__.py | 110 +++++++++-------------------- src/sourmash/manifest.py | 4 ++ src/sourmash/utils.py | 7 ++ 8 files changed, 133 insertions(+), 88 deletions(-) diff --git a/include/sourmash.h b/include/sourmash.h index f9c2c6ded3..c092bec9b6 100644 --- a/include/sourmash.h +++ b/include/sourmash.h @@ -53,14 +53,20 @@ typedef struct SourmashKmerMinHash SourmashKmerMinHash; typedef struct SourmashLinearIndex SourmashLinearIndex; +typedef struct SourmashManifest SourmashManifest; + typedef struct SourmashNodegraph SourmashNodegraph; typedef struct SourmashRevIndex SourmashRevIndex; typedef struct SourmashSearchResult SourmashSearchResult; +typedef struct SourmashSelection SourmashSelection; + typedef struct SourmashSignature SourmashSignature; +typedef struct SourmashSignatureIter SourmashSignatureIter; + typedef struct SourmashZipStorage SourmashZipStorage; /** @@ -269,11 +275,17 @@ bool kmerminhash_track_abundance(const SourmashKmerMinHash *ptr); void linearindex_free(SourmashLinearIndex *ptr); -SourmashLinearIndex *linearindex_new(const SourmashZipStorage *storage_ptr, - const SourmashManifest *manifest_ptr, - const SourmashSelection *selection_ptr, +uint64_t linearindex_len(const SourmashLinearIndex *ptr); + +const SourmashManifest *linearindex_manifest(const SourmashLinearIndex *ptr); + +SourmashLinearIndex *linearindex_new(SourmashZipStorage *storage_ptr, + SourmashManifest *manifest_ptr, + SourmashSelection *selection_ptr, bool use_manifest); +SourmashSignatureIter *linearindex_signatures(const SourmashLinearIndex *ptr); + void nodegraph_buffer_free(uint8_t *ptr, uintptr_t insize); bool nodegraph_count(SourmashNodegraph *ptr, uint64_t h); @@ -397,6 +409,8 @@ void signature_set_mh(SourmashSignature *ptr, const SourmashKmerMinHash *other); void signature_set_name(SourmashSignature *ptr, const char *name); +const SourmashSignature *signatures_iter_next(SourmashSignatureIter *ptr); + SourmashSignature **signatures_load_buffer(const char *ptr, uintptr_t insize, bool _ignore_md5sum, diff --git a/src/core/src/ffi/index/mod.rs b/src/core/src/ffi/index/mod.rs index 636c57d9c2..07ccda22db 100644 --- a/src/core/src/ffi/index/mod.rs +++ b/src/core/src/ffi/index/mod.rs @@ -42,3 +42,25 @@ pub struct SourmashSelection; impl ForeignObject for SourmashSelection { type RustObject = Selection; } + +pub struct SignatureIterator { + iter: Box>, +} + +pub struct SourmashSignatureIter; + +impl ForeignObject for SourmashSignatureIter { + type RustObject = SignatureIterator; +} + +#[no_mangle] +pub unsafe extern "C" fn signatures_iter_next( + ptr: *mut SourmashSignatureIter, +) -> *const SourmashSignature { + let mut iterator = SourmashSignatureIter::into_rust(ptr); + + match iterator.iter.next() { + Some(sig) => SourmashSignature::from_rust(sig), + None => std::ptr::null(), + } +} diff --git a/src/core/src/ffi/index/revindex.rs b/src/core/src/ffi/index/revindex.rs index aab548d666..a4b7ef65a5 100644 --- a/src/core/src/ffi/index/revindex.rs +++ b/src/core/src/ffi/index/revindex.rs @@ -3,11 +3,15 @@ use std::slice; use crate::index::revindex::{LinearRevIndex, RevIndex}; use crate::index::Index; +use crate::manifest::Manifest; use crate::signature::{Signature, SigsTrait}; use crate::sketch::minhash::{max_hash_for_scaled, KmerMinHash}; use crate::sketch::Sketch; +use crate::storage::Storage; -use crate::ffi::index::{SourmashSearchResult, SourmashSelection}; +use crate::ffi::index::{ + SignatureIterator, SourmashSearchResult, SourmashSelection, SourmashSignatureIter, +}; use crate::ffi::manifest::SourmashManifest; use crate::ffi::minhash::SourmashKmerMinHash; use crate::ffi::signature::SourmashSignature; @@ -267,16 +271,17 @@ unsafe fn linearindex_new( selection_ptr: *mut SourmashSelection, use_manifest: bool, ) -> Result<*mut SourmashLinearIndex> { - let storage = SourmashZipStorage::into_rust(storage_ptr); + let storage = *SourmashZipStorage::into_rust(storage_ptr); let manifest = if manifest_ptr.is_null() { if use_manifest { - todo!("load manifest from zipstorage") + // Load manifest from zipstorage + Manifest::from_reader(storage.load("SOURMASH-MANIFEST.csv")?.as_slice())? } else { todo!("throw error") } } else { - SourmashManifest::into_rust(manifest_ptr) + *SourmashManifest::into_rust(manifest_ptr) }; let _selection = if !selection_ptr.is_null() { @@ -306,7 +311,7 @@ unsafe fn linearindex_new( storage: Option, */ - let linear_index = LinearRevIndex::new(*manifest, &template, false, None, Some(*storage)); + let linear_index = LinearRevIndex::new(manifest, &template, false, None, Some(storage)); Ok(SourmashLinearIndex::from_rust(linear_index)) } @@ -316,3 +321,27 @@ unsafe fn linearindex_new( pub unsafe extern "C" fn linearindex_free(ptr: *mut SourmashLinearIndex) { SourmashLinearIndex::drop(ptr); } + +#[no_mangle] +pub unsafe extern "C" fn linearindex_manifest( + ptr: *const SourmashLinearIndex, +) -> *const SourmashManifest { + let index = SourmashLinearIndex::as_rust(ptr); + SourmashManifest::from_rust(index.manifest()) +} + +#[no_mangle] +pub unsafe extern "C" fn linearindex_len(ptr: *const SourmashLinearIndex) -> u64 { + let index = SourmashLinearIndex::as_rust(ptr); + index.len() as u64 +} + +#[no_mangle] +pub unsafe extern "C" fn linearindex_signatures( + ptr: *const SourmashLinearIndex, +) -> *mut SourmashSignatureIter { + let index = SourmashLinearIndex::as_rust(ptr); + + let iter = Box::new(index.signatures_iter()); + SourmashSignatureIter::from_rust(SignatureIterator { iter }) +} diff --git a/src/core/src/index/revindex.rs b/src/core/src/index/revindex.rs index e6cfbbc533..4aeda3a7ee 100644 --- a/src/core/src/index/revindex.rs +++ b/src/core/src/index/revindex.rs @@ -498,6 +498,21 @@ impl LinearRevIndex { } Ok(matches) } + + pub fn manifest(&self) -> Manifest { + self.sig_files.clone() + } + + pub fn signatures_iter(&self) -> impl Iterator + '_ { + if let Some(_sigs) = &self.ref_sigs { + //sigs.iter().cloned() + todo!("this works, but need to match return types") + } else { + // FIXME temp solution, must find better one! + (0..self.sig_files.len()) + .map(move |dataset_id| self.sig_for_dataset(dataset_id).expect("error loading sig")) + } + } } impl<'a> Index<'a> for LinearRevIndex { diff --git a/src/core/src/manifest.rs b/src/core/src/manifest.rs index ed646fc66c..a7c12e863b 100644 --- a/src/core/src/manifest.rs +++ b/src/core/src/manifest.rs @@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize}; use crate::Error; -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct Record { internal_location: String, /* @@ -22,7 +22,7 @@ pub struct Record { */ } -#[derive(Debug, Default, Serialize, Deserialize)] +#[derive(Debug, Default, Serialize, Deserialize, Clone)] pub struct Manifest { records: Vec, } diff --git a/src/sourmash/index/__init__.py b/src/sourmash/index/__init__.py index 4d5ed9bcc2..c91cd42d0e 100644 --- a/src/sourmash/index/__init__.py +++ b/src/sourmash/index/__init__.py @@ -42,6 +42,7 @@ from collections import Counter from collections import defaultdict from typing import NamedTuple, Optional, TypedDict, TYPE_CHECKING +import weakref from ..search import (make_jaccard_search_query, make_gather_query @@ -539,67 +540,48 @@ class ZipFileLinearIndex(Index, RustObject): def __init__(self, storage, *, selection_dict=None, traverse_yield_all=False, manifest=None, use_manifest=True): - self._objptr = rustcall(lib.linearindex_new, storage._get_objptr(), - ffi.NULL, ffi.NULL, use_manifest) - """ - self.storage = storage - self.selection_dict = selection_dict - self.traverse_yield_all = traverse_yield_all - self.use_manifest = use_manifest + self._selection_dict = selection_dict + self._traverse_yield_all = traverse_yield_all + self._use_manifest = use_manifest + + # Taking ownership of the storage + storage_ptr = storage._take_objptr() + + manifest_ptr = ffi.NULL # do we have a manifest already? if not, try loading. if use_manifest: if manifest is not None: debug_literal('ZipFileLinearIndex using passed-in manifest') - self.manifest = manifest - else: - self._load_manifest() - else: - self.manifest = None + manifest_ptr = manifest._as_rust()._take_objptr() + + selection_ptr = ffi.NULL + self._objptr = rustcall(lib.linearindex_new, storage_ptr, + manifest_ptr, selection_ptr, use_manifest) + + """ if self.manifest is not None: assert not self.selection_dict, self.selection_dict if self.selection_dict: assert self.manifest is None """ - def _load_manifest(self): - "Load a manifest if one exists" - try: - manifest_data = self.storage.load('SOURMASH-MANIFEST.csv') - except (KeyError, FileNotFoundError): - self.manifest = None - else: - debug_literal(f'found manifest on load for {self.storage.path}') + @property + def manifest(self): + return CollectionManifest._from_rust(self._methodcall(lib.linearindex_manifest)) - # load manifest! - from io import StringIO - manifest_data = manifest_data.decode('utf-8') - manifest_fp = StringIO(manifest_data) - self.manifest = CollectionManifest.load_from_csv(manifest_fp) + @manifest.setter + def manifest(self, value): + raise NotImplementedError() def __bool__(self): "Are there any matching signatures in this zipfile? Avoid calling len." - try: - next(iter(self.signatures())) - except StopIteration: - return False - - return True + return self._methodcall(lib.linearindex_len) > 0 def __len__(self): "calculate number of signatures." - - # use manifest, if available. - m = self.manifest - if self.manifest is not None: - return len(m) - - # otherwise, iterate across all signatures. - n = 0 - for _ in self.signatures(): - n += 1 - return n + return self._methodcall(lib.linearindex_len) @property def location(self): @@ -642,42 +624,14 @@ def _signatures_with_internal(self): def signatures(self): "Load all signatures in the zip file." - selection_dict = self.selection_dict - manifest = None - if self.manifest is not None: - manifest = self.manifest - assert not selection_dict - - # yield all signatures found in manifest - for filename in manifest.locations(): - data = self.storage.load(filename) - for ss in load_signatures(data): - # in case multiple signatures are in the file, check - # to make sure we want to return each one. - if ss in manifest: - yield ss - - # no manifest! iterate. - else: - storage = self.storage - # if no manifest here, break Storage class encapsulation - # and go for all the files. (This is necessary to support - # ad-hoc zipfiles that have no manifests.) - for filename in storage._filenames(): - # should we load this file? if it ends in .sig OR force: - if filename.endswith('.sig') or \ - filename.endswith('.sig.gz') or \ - self.traverse_yield_all: - if selection_dict: - select = lambda x: select_signature(x, - **selection_dict) - else: - select = lambda x: True - - data = self.storage.load(filename) - for ss in load_signatures(data): - if select(ss): - yield ss + attached_refs = weakref.WeakKeyDictionary() + iterator = self._methodcall(lib.linearindex_signatures) + + next_sig = rustcall(lib.signatures_iter_next, iterator) + while next_sig != ffi.NULL: + attached_refs[next_sig] = iterator + yield SourmashSignature._from_objptr(next_sig) + next_sig = rustcall(lib.signatures_iter_next, iterator) def select(self, **kwargs: Unpack[Selection]): "Select signatures in zip file based on ksize/moltype/etc." diff --git a/src/sourmash/manifest.py b/src/sourmash/manifest.py index e2431dde36..c151263c7d 100644 --- a/src/sourmash/manifest.py +++ b/src/sourmash/manifest.py @@ -336,3 +336,7 @@ def to_picklist(self): picklist.pickset = set(self._md5_set) return picklist + + @staticmethod + def _from_rust(value): + raise NotImplementedError() diff --git a/src/sourmash/utils.py b/src/sourmash/utils.py index 71afc20261..acb4b73d7a 100644 --- a/src/sourmash/utils.py +++ b/src/sourmash/utils.py @@ -29,6 +29,13 @@ def _get_objptr(self): raise RuntimeError("Object is closed") return self._objptr + def _take_objptr(self): + if not self._objptr: + raise RuntimeError("Object is closed") + ret = self._objptr + self._objptr = None + return ret + def __del__(self): if self._objptr is None or self._shared: return From a456a78d201e04ed85844b6816b01eff1af61cdb Mon Sep 17 00:00:00 2001 From: Luiz Irber Date: Thu, 28 Apr 2022 17:26:50 -0700 Subject: [PATCH 11/19] Remove generic from SigStore --- src/core/src/index/bigsi.rs | 2 +- src/core/src/index/linear.rs | 32 +++++++------------ src/core/src/index/mod.rs | 36 ++++++++++----------- src/core/src/index/sbt/mhbt.rs | 4 +-- src/core/src/index/sbt/mod.rs | 57 ++++++++++++++++------------------ 5 files changed, 59 insertions(+), 72 deletions(-) diff --git a/src/core/src/index/bigsi.rs b/src/core/src/index/bigsi.rs index 0e45348fc7..ddae12f064 100644 --- a/src/core/src/index/bigsi.rs +++ b/src/core/src/index/bigsi.rs @@ -191,7 +191,7 @@ mod test { .unwrap(); let sig_data = sigs[0].clone(); - let leaf: SigStore<_> = sig_data.into(); + let leaf: SigStore = sig_data.into(); for l in datasets { bigsi.insert(l).expect("insertion error!"); diff --git a/src/core/src/index/linear.rs b/src/core/src/index/linear.rs index c6e14b64f6..680c55cc21 100644 --- a/src/core/src/index/linear.rs +++ b/src/core/src/index/linear.rs @@ -6,18 +6,18 @@ use std::path::PathBuf; use serde::{Deserialize, Serialize}; use typed_builder::TypedBuilder; -use crate::index::{Comparable, DatasetInfo, Index, SigStore}; +use crate::index::{DatasetInfo, Index, SigStore}; use crate::prelude::*; use crate::storage::{FSStorage, InnerStorage, Storage, StorageInfo}; use crate::Error; #[derive(TypedBuilder)] -pub struct LinearIndex { +pub struct LinearIndex { #[builder(default)] storage: Option, #[builder(default)] - datasets: Vec>, + datasets: Vec, } #[derive(Serialize, Deserialize)] @@ -27,15 +27,11 @@ struct LinearInfo { leaves: Vec, } -impl<'a, L> Index<'a> for LinearIndex -where - L: Clone + Comparable + 'a, - SigStore: From, -{ - type Item = L; +impl<'a> Index<'a> for LinearIndex { + type Item = Signature; //type SignatureIterator = std::slice::Iter<'a, Self::Item>; - fn insert(&mut self, node: L) -> Result<(), Error> { + fn insert(&mut self, node: Self::Item) -> Result<(), Error> { self.datasets.push(node.into()); Ok(()) } @@ -76,11 +72,7 @@ where */ } -impl LinearIndex -where - L: ToWriter, - SigStore: ReadData, -{ +impl LinearIndex { pub fn save_file>( &mut self, path: P, @@ -115,7 +107,7 @@ where .iter_mut() .map(|l| { // Trigger data loading - let _: &L = (*l).data().unwrap(); + let _: &Signature = (*l).data().unwrap(); // set storage to new one l.storage = Some(storage.clone()); @@ -137,7 +129,7 @@ where Ok(()) } - pub fn from_path>(path: P) -> Result, Error> { + pub fn from_path>(path: P) -> Result { let file = File::open(&path)?; let mut reader = BufReader::new(file); @@ -147,11 +139,11 @@ where basepath.push(path); basepath.canonicalize()?; - let linear = LinearIndex::::from_reader(&mut reader, &basepath.parent().unwrap())?; + let linear = LinearIndex::from_reader(&mut reader, &basepath.parent().unwrap())?; Ok(linear) } - pub fn from_reader(rdr: R, path: P) -> Result, Error> + pub fn from_reader(rdr: R, path: P) -> Result where R: Read, P: AsRef, @@ -171,7 +163,7 @@ where .leaves .into_iter() .map(|l| { - let mut v: SigStore = l.into(); + let mut v: SigStore = l.into(); v.storage = Some(storage.clone()); v }) diff --git a/src/core/src/index/mod.rs b/src/core/src/index/mod.rs index 9e3e137cf5..4a1d62497b 100644 --- a/src/core/src/index/mod.rs +++ b/src/core/src/index/mod.rs @@ -27,7 +27,7 @@ use crate::sketch::Sketch; use crate::storage::{InnerStorage, Storage}; use crate::Error; -pub type MHBT = SBT, Signature>; +pub type MHBT = SBT>; pub struct Selection; @@ -134,7 +134,7 @@ pub struct DatasetInfo { } #[derive(TypedBuilder, Default, Clone)] -pub struct SigStore { +pub struct SigStore { #[builder(setter(into))] filename: String, @@ -147,16 +147,16 @@ pub struct SigStore { storage: Option, #[builder(setter(into), default)] - data: OnceCell, + data: OnceCell, } -impl SigStore { +impl SigStore { pub fn name(&self) -> String { self.name.clone() } } -impl std::fmt::Debug for SigStore { +impl std::fmt::Debug for SigStore { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, @@ -166,7 +166,7 @@ impl std::fmt::Debug for SigStore { } } -impl ReadData for SigStore { +impl ReadData for SigStore { fn data(&self) -> Result<&Signature, Error> { if let Some(sig) = self.data.get() { Ok(sig) @@ -190,8 +190,8 @@ impl ReadData for SigStore { } } -impl SigStore { - pub fn count_common(&self, other: &SigStore) -> u64 { +impl SigStore { + pub fn count_common(&self, other: &SigStore) -> u64 { let ng: &Signature = self.data().unwrap(); let ong: &Signature = other.data().unwrap(); @@ -218,13 +218,13 @@ impl SigStore { } } -impl From> for Signature { - fn from(other: SigStore) -> Signature { +impl From for Signature { + fn from(other: SigStore) -> Signature { other.data.get().unwrap().to_owned() } } -impl Deref for SigStore { +impl Deref for SigStore { type Target = Signature; fn deref(&self) -> &Signature { @@ -232,8 +232,8 @@ impl Deref for SigStore { } } -impl From for SigStore { - fn from(other: Signature) -> SigStore { +impl From for SigStore { + fn from(other: Signature) -> SigStore { let name = other.name(); let filename = other.filename(); @@ -247,8 +247,8 @@ impl From for SigStore { } } -impl Comparable> for SigStore { - fn similarity(&self, other: &SigStore) -> f64 { +impl Comparable for SigStore { + fn similarity(&self, other: &SigStore) -> f64 { let ng: &Signature = self.data().unwrap(); let ong: &Signature = other.data().unwrap(); @@ -271,7 +271,7 @@ impl Comparable> for SigStore { unimplemented!() } - fn containment(&self, other: &SigStore) -> f64 { + fn containment(&self, other: &SigStore) -> f64 { let ng: &Signature = self.data().unwrap(); let ong: &Signature = other.data().unwrap(); @@ -323,8 +323,8 @@ impl Comparable for Signature { } } -impl From for SigStore { - fn from(other: DatasetInfo) -> SigStore { +impl From for SigStore { + fn from(other: DatasetInfo) -> SigStore { SigStore { filename: other.filename, name: other.name, diff --git a/src/core/src/index/sbt/mhbt.rs b/src/core/src/index/sbt/mhbt.rs index 2d4ceb3fb8..d2a51c3286 100644 --- a/src/core/src/index/sbt/mhbt.rs +++ b/src/core/src/index/sbt/mhbt.rs @@ -19,7 +19,7 @@ impl ToWriter for Nodegraph { } } -impl FromFactory> for SBT, L> { +impl FromFactory> for SBT> { fn factory(&self, name: &str) -> Result, Error> { match self.factory { Factory::GraphFactory { args: (k, t, n) } => { @@ -279,7 +279,7 @@ mod test { )?; let sig_data = sigs[0].clone(); - let leaf: SigStore<_> = sig_data.into(); + let leaf: SigStore = sig_data.into(); let results = sbt.find(search_minhashes, &leaf, 0.5)?; assert_eq!(results.len(), 1); diff --git a/src/core/src/index/sbt/mod.rs b/src/core/src/index/sbt/mod.rs index 4f4a7b82f9..67bb1396f2 100644 --- a/src/core/src/index/sbt/mod.rs +++ b/src/core/src/index/sbt/mod.rs @@ -29,7 +29,7 @@ use crate::storage::{FSStorage, InnerStorage, StorageInfo}; use crate::Error; #[derive(TypedBuilder)] -pub struct SBT { +pub struct SBT { #[builder(default = 2)] d: u32, @@ -43,7 +43,7 @@ pub struct SBT { nodes: HashMap, #[builder(default = HashMap::default())] - leaves: HashMap>, + leaves: HashMap, } const fn parent(pos: u64, d: u64) -> u64 { @@ -54,9 +54,8 @@ const fn child(parent: u64, pos: u64, d: u64) -> u64 { d * parent + pos + 1 } -impl SBT +impl SBT where - L: std::clone::Clone + Default, N: Default, { #[inline(always)] @@ -102,12 +101,10 @@ where // combine } -impl SBT, T> +impl SBT> where - T: ToWriter + Clone, U: ToWriter, Node: ReadData, - SigStore: ReadData, { fn parse_v4(rdr: &mut R) -> Result where @@ -125,7 +122,7 @@ where Ok(SBTInfo::V5(sinfo)) } - pub fn from_reader(mut rdr: R, path: P) -> Result, T>, Error> + pub fn from_reader(mut rdr: R, path: P) -> Result>, Error> where R: Read, P: AsRef, @@ -276,7 +273,7 @@ where }) } - pub fn from_path>(path: P) -> Result, T>, Error> { + pub fn from_path>(path: P) -> Result>, Error> { let file = File::open(&path)?; let mut reader = BufReader::new(file); @@ -287,7 +284,7 @@ where // TODO: canonicalize doesn't work on wasm32-wasi //basepath.canonicalize()?; - let sbt = SBT::, T>::from_reader(&mut reader, &basepath.parent().unwrap())?; + let sbt = SBT::>::from_reader(&mut reader, &basepath.parent().unwrap())?; Ok(sbt) } @@ -346,7 +343,7 @@ where .iter_mut() .map(|(n, l)| { // Trigger data loading - let _: &T = (*l).data().unwrap(); + let _: &Signature = (*l).data().unwrap(); // set storage to new one l.storage = Some(storage.clone()); @@ -369,21 +366,25 @@ where Ok(()) } - pub fn leaves(&self) -> Vec> { + pub fn leaves(&self) -> Vec { self.leaves.values().cloned().collect() } } -impl<'a, N, L> Index<'a> for SBT +impl<'a, N> Index<'a> for SBT where - N: Comparable + Comparable + Update + Debug + Default, - L: Comparable + Update + Clone + Debug + Default, - SBT: FromFactory, - SigStore: From + ReadData, + N: Comparable + Comparable + Update + Debug + Default, + Signature: Update, + SBT: FromFactory, { - type Item = L; - - fn find(&self, search_fn: F, sig: &L, threshold: f64) -> Result, Error> + type Item = Signature; + + fn find( + &self, + search_fn: F, + sig: &Self::Item, + threshold: f64, + ) -> Result, Error> where F: Fn(&dyn Comparable, &Self::Item, f64) -> bool, { @@ -415,7 +416,7 @@ where Ok(matches) } - fn insert(&mut self, dataset: L) -> Result<(), Error> { + fn insert(&mut self, dataset: Signature) -> Result<(), Error> { if self.leaves.is_empty() { // in this case the tree is empty, // just add the dataset to the first available leaf @@ -594,10 +595,7 @@ where } } -impl SigStore -where - T: ToWriter, -{ +impl SigStore { pub fn save(&self, path: &str) -> Result { if let Some(storage) = &self.storage { if let Some(data) = self.data.get() { @@ -684,7 +682,7 @@ enum SBTInfo { enum BinaryTree { Empty, Internal(Box>>>>), - Leaf(Box>>), + Leaf(Box>), } struct TreeNode { @@ -693,14 +691,11 @@ struct TreeNode { right: BinaryTree, } -pub fn scaffold( - mut datasets: Vec>, - storage: Option, -) -> SBT, Signature> +pub fn scaffold(mut datasets: Vec, storage: Option) -> SBT> where N: Clone + Default, { - let mut leaves: HashMap> = HashMap::with_capacity(datasets.len()); + let mut leaves: HashMap = HashMap::with_capacity(datasets.len()); let mut next_round = Vec::new(); From 1965d501d3dcda18cef91e71e95a83353b733ba0 Mon Sep 17 00:00:00 2001 From: Luiz Irber Date: Sat, 30 Apr 2022 18:02:01 -0700 Subject: [PATCH 12/19] more manifest --- Makefile | 1 + include/sourmash.h | 25 +++++++++ src/core/src/ffi/index/mod.rs | 33 ++++++++++-- src/core/src/ffi/index/revindex.rs | 52 ++++++++++++++++++- src/core/src/ffi/manifest.rs | 51 ++++++++++++++++++- src/core/src/ffi/storage.rs | 7 +-- src/core/src/index/mod.rs | 15 +++++- src/core/src/index/revindex.rs | 82 ++++++++++++++++++++++++++---- src/core/src/manifest.rs | 60 +++++++++++++++++++++- src/sourmash/index/__init__.py | 74 ++++++++++++++------------- src/sourmash/manifest.py | 33 +++++++++++- src/sourmash/sbt_storage.py | 1 + 12 files changed, 376 insertions(+), 58 deletions(-) diff --git a/Makefile b/Makefile index 18cb707ef5..6fc5dd2da2 100644 --- a/Makefile +++ b/Makefile @@ -29,6 +29,7 @@ include/sourmash.h: src/core/src/lib.rs \ src/core/src/ffi/hyperloglog.rs \ src/core/src/ffi/minhash.rs \ src/core/src/ffi/signature.rs \ + src/core/src/ffi/manifest.rs \ src/core/src/ffi/nodegraph.rs \ src/core/src/ffi/index/mod.rs \ src/core/src/ffi/index/revindex.rs \ diff --git a/include/sourmash.h b/include/sourmash.h index c092bec9b6..4fc4d35006 100644 --- a/include/sourmash.h +++ b/include/sourmash.h @@ -55,6 +55,8 @@ typedef struct SourmashLinearIndex SourmashLinearIndex; typedef struct SourmashManifest SourmashManifest; +typedef struct SourmashManifestRowIter SourmashManifestRowIter; + typedef struct SourmashNodegraph SourmashNodegraph; typedef struct SourmashRevIndex SourmashRevIndex; @@ -87,6 +89,10 @@ typedef struct { bool owned; } SourmashStr; +typedef struct { + uint32_t ksize; +} SourmashManifestRow; + bool computeparams_dayhoff(const SourmashComputeParameters *ptr); bool computeparams_dna(const SourmashComputeParameters *ptr); @@ -277,6 +283,8 @@ void linearindex_free(SourmashLinearIndex *ptr); uint64_t linearindex_len(const SourmashLinearIndex *ptr); +SourmashStr linearindex_location(const SourmashLinearIndex *ptr); + const SourmashManifest *linearindex_manifest(const SourmashLinearIndex *ptr); SourmashLinearIndex *linearindex_new(SourmashZipStorage *storage_ptr, @@ -284,8 +292,19 @@ SourmashLinearIndex *linearindex_new(SourmashZipStorage *storage_ptr, SourmashSelection *selection_ptr, bool use_manifest); +SourmashLinearIndex *linearindex_select(SourmashLinearIndex *ptr, + const SourmashSelection *selection_ptr); + +void linearindex_set_manifest(SourmashLinearIndex *ptr, SourmashManifest *manifest_ptr); + SourmashSignatureIter *linearindex_signatures(const SourmashLinearIndex *ptr); +const SourmashZipStorage *linearindex_storage(const SourmashLinearIndex *ptr); + +SourmashManifestRowIter *manifest_rows(const SourmashManifest *ptr); + +const SourmashManifestRow *manifest_rows_iter_next(SourmashManifestRowIter *ptr); + void nodegraph_buffer_free(uint8_t *ptr, uintptr_t insize); bool nodegraph_count(SourmashNodegraph *ptr, uint64_t h); @@ -375,6 +394,12 @@ double searchresult_score(const SourmashSearchResult *ptr); SourmashSignature *searchresult_signature(const SourmashSearchResult *ptr); +uint32_t selection_ksize(const SourmashSelection *ptr); + +SourmashSelection *selection_new(void); + +void selection_set_ksize(SourmashSelection *ptr, uint32_t new_ksize); + void signature_add_protein(SourmashSignature *ptr, const char *sequence); void signature_add_sequence(SourmashSignature *ptr, const char *sequence, bool force); diff --git a/src/core/src/ffi/index/mod.rs b/src/core/src/ffi/index/mod.rs index 07ccda22db..e31f577c28 100644 --- a/src/core/src/ffi/index/mod.rs +++ b/src/core/src/ffi/index/mod.rs @@ -1,6 +1,6 @@ pub mod revindex; -use crate::index::Selection; +use crate::index::{Selection, SigStore}; use crate::signature::Signature; use crate::ffi::signature::SourmashSignature; @@ -37,14 +37,39 @@ pub unsafe extern "C" fn searchresult_signature( SourmashSignature::from_rust((result.1).clone()) } +//================================================================ + pub struct SourmashSelection; impl ForeignObject for SourmashSelection { type RustObject = Selection; } +#[no_mangle] +pub unsafe extern "C" fn selection_new() -> *mut SourmashSelection { + SourmashSelection::from_rust(Selection::default()) +} + +#[no_mangle] +pub unsafe extern "C" fn selection_ksize(ptr: *const SourmashSelection) -> u32 { + let sel = SourmashSelection::as_rust(ptr); + if let Some(ksize) = sel.ksize() { + ksize + } else { + todo!("empty ksize case not supported yet") + } +} + +#[no_mangle] +pub unsafe extern "C" fn selection_set_ksize(ptr: *mut SourmashSelection, new_ksize: u32) { + let sel = SourmashSelection::as_rust_mut(ptr); + sel.set_ksize(new_ksize); +} + +//================================================================ +// pub struct SignatureIterator { - iter: Box>, + iter: Box>, } pub struct SourmashSignatureIter; @@ -57,10 +82,10 @@ impl ForeignObject for SourmashSignatureIter { pub unsafe extern "C" fn signatures_iter_next( ptr: *mut SourmashSignatureIter, ) -> *const SourmashSignature { - let mut iterator = SourmashSignatureIter::into_rust(ptr); + let iterator = SourmashSignatureIter::as_rust_mut(ptr); match iterator.iter.next() { - Some(sig) => SourmashSignature::from_rust(sig), + Some(sig) => SourmashSignature::from_rust(sig.into()), None => std::ptr::null(), } } diff --git a/src/core/src/ffi/index/revindex.rs b/src/core/src/ffi/index/revindex.rs index a4b7ef65a5..7b3ff9f55d 100644 --- a/src/core/src/ffi/index/revindex.rs +++ b/src/core/src/ffi/index/revindex.rs @@ -1,5 +1,6 @@ use std::path::PathBuf; use std::slice; +use std::sync::Arc; use crate::index::revindex::{LinearRevIndex, RevIndex}; use crate::index::Index; @@ -271,7 +272,7 @@ unsafe fn linearindex_new( selection_ptr: *mut SourmashSelection, use_manifest: bool, ) -> Result<*mut SourmashLinearIndex> { - let storage = *SourmashZipStorage::into_rust(storage_ptr); + let storage = Arc::try_unwrap(*SourmashZipStorage::into_rust(storage_ptr)).ok().unwrap(); let manifest = if manifest_ptr.is_null() { if use_manifest { @@ -330,12 +331,48 @@ pub unsafe extern "C" fn linearindex_manifest( SourmashManifest::from_rust(index.manifest()) } +ffi_fn! { +unsafe fn linearindex_set_manifest( + ptr: *mut SourmashLinearIndex, + manifest_ptr: *mut SourmashManifest, +) -> Result<()> { + let index = SourmashLinearIndex::as_rust_mut(ptr); + let manifest = SourmashManifest::into_rust(manifest_ptr); + + index.set_manifest(*manifest)?; + Ok(()) +} +} + #[no_mangle] pub unsafe extern "C" fn linearindex_len(ptr: *const SourmashLinearIndex) -> u64 { let index = SourmashLinearIndex::as_rust(ptr); index.len() as u64 } +#[no_mangle] +pub unsafe extern "C" fn linearindex_location(ptr: *const SourmashLinearIndex) -> SourmashStr { + let index = SourmashLinearIndex::as_rust(ptr); + match index.location() { + Some(x) => x, + None => "".into(), + } + .into() +} + +#[no_mangle] +pub unsafe extern "C" fn linearindex_storage( + ptr: *const SourmashLinearIndex, +) -> *const SourmashZipStorage { + let index = SourmashLinearIndex::as_rust(ptr); + let storage = index.storage(); + + match storage { + Some(st) => SourmashZipStorage::from_rust(st), + None => std::ptr::null::(), + } +} + #[no_mangle] pub unsafe extern "C" fn linearindex_signatures( ptr: *const SourmashLinearIndex, @@ -345,3 +382,16 @@ pub unsafe extern "C" fn linearindex_signatures( let iter = Box::new(index.signatures_iter()); SourmashSignatureIter::from_rust(SignatureIterator { iter }) } + +ffi_fn! { +unsafe fn linearindex_select( + ptr: *mut SourmashLinearIndex, + selection_ptr: *const SourmashSelection, +) -> Result<*mut SourmashLinearIndex> { + let index = SourmashLinearIndex::into_rust(ptr); + let selection = SourmashSelection::as_rust(selection_ptr); + + let new_index = index.select(selection)?; + Ok(SourmashLinearIndex::from_rust(new_index)) +} +} diff --git a/src/core/src/ffi/manifest.rs b/src/core/src/ffi/manifest.rs index d307f7373b..d856d32d41 100644 --- a/src/core/src/ffi/manifest.rs +++ b/src/core/src/ffi/manifest.rs @@ -1,4 +1,4 @@ -use crate::manifest::Manifest; +use crate::manifest::{Manifest, Record}; use crate::ffi::utils::ForeignObject; @@ -7,3 +7,52 @@ pub struct SourmashManifest; impl ForeignObject for SourmashManifest { type RustObject = Manifest; } + +pub struct ManifestRowIterator { + iter: Box>, +} + +pub struct SourmashManifestRowIter; + +impl ForeignObject for SourmashManifestRowIter { + type RustObject = ManifestRowIterator; +} + +#[no_mangle] +pub unsafe extern "C" fn manifest_rows_iter_next( + ptr: *mut SourmashManifestRowIter, +) -> *const SourmashManifestRow { + let iterator = SourmashManifestRowIter::as_rust_mut(ptr); + + match iterator.iter.next() { + Some(row) => SourmashManifestRow::from_rust(row.into()), + None => std::ptr::null(), + } +} + +#[no_mangle] +pub unsafe extern "C" fn manifest_rows( + ptr: *const SourmashManifest, +) -> *mut SourmashManifestRowIter { + let manifest = SourmashManifest::as_rust(ptr); + + let iter = Box::new(manifest.iter()); + SourmashManifestRowIter::from_rust(ManifestRowIterator { iter }) +} + +#[repr(C)] +pub struct SourmashManifestRow { + pub ksize: u32, +} + +impl ForeignObject for SourmashManifestRow { + type RustObject = SourmashManifestRow; +} + +impl From<&Record> for SourmashManifestRow { + fn from(record: &Record) -> SourmashManifestRow { + Self { + ksize: record.ksize(), + } + } +} diff --git a/src/core/src/ffi/storage.rs b/src/core/src/ffi/storage.rs index 98eca095b2..e8abcf1d51 100644 --- a/src/core/src/ffi/storage.rs +++ b/src/core/src/ffi/storage.rs @@ -1,5 +1,6 @@ use std::os::raw::c_char; use std::slice; +use std::sync::Arc; use crate::ffi::utils::{ForeignObject, SourmashStr}; use crate::prelude::*; @@ -8,7 +9,7 @@ use crate::storage::ZipStorage; pub struct SourmashZipStorage; impl ForeignObject for SourmashZipStorage { - type RustObject = ZipStorage; + type RustObject = Arc; } ffi_fn! { @@ -20,7 +21,7 @@ unsafe fn zipstorage_new(ptr: *const c_char, insize: usize) -> Result<*mut Sourm }; let zipstorage = ZipStorage::from_file(path)?; - Ok(SourmashZipStorage::from_rust(zipstorage)) + Ok(SourmashZipStorage::from_rust(Arc::new(zipstorage))) } } @@ -110,7 +111,7 @@ unsafe fn zipstorage_set_subdir( std::str::from_utf8(path)? }; - storage.set_subdir(path.to_string()); + (*Arc::get_mut(storage).unwrap()).set_subdir(path.to_string()); Ok(()) } } diff --git a/src/core/src/index/mod.rs b/src/core/src/index/mod.rs index 4a1d62497b..a3838c156c 100644 --- a/src/core/src/index/mod.rs +++ b/src/core/src/index/mod.rs @@ -29,7 +29,20 @@ use crate::Error; pub type MHBT = SBT>; -pub struct Selection; +#[derive(Default)] +pub struct Selection { + ksize: Option, +} + +impl Selection { + pub fn ksize(&self) -> Option { + self.ksize + } + + pub fn set_ksize(&mut self, ksize: u32) { + self.ksize = Some(ksize); + } +} /* FIXME: bring back after MQF works on macOS and Windows use cfg_if::cfg_if; diff --git a/src/core/src/index/revindex.rs b/src/core/src/index/revindex.rs index 4aeda3a7ee..244d1a05b1 100644 --- a/src/core/src/index/revindex.rs +++ b/src/core/src/index/revindex.rs @@ -1,6 +1,7 @@ use std::collections::{HashMap, HashSet}; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; use getset::{CopyGetters, Getters, Setters}; use log::{debug, info}; @@ -11,7 +12,7 @@ use serde::{Deserialize, Serialize}; use rayon::prelude::*; use crate::encodings::{Color, Colors, Idx}; -use crate::index::Index; +use crate::index::{Index, Selection, SigStore}; use crate::manifest::Manifest; use crate::signature::{Signature, SigsTrait}; use crate::sketch::minhash::KmerMinHash; @@ -111,12 +112,12 @@ pub struct LinearRevIndex { sig_files: Manifest, #[serde(skip)] - ref_sigs: Option>, + ref_sigs: Option>, template: Sketch, #[serde(skip)] - storage: Option, + storage: Option>, } impl LinearRevIndex { @@ -130,7 +131,7 @@ impl LinearRevIndex { let search_sigs: Vec<_> = sig_files.internal_locations().map(PathBuf::from).collect(); let ref_sigs = if let Some(ref_sigs) = ref_sigs { - Some(ref_sigs) + Some(ref_sigs.into_iter().map(|m| m.into()).collect()) } else if keep_sigs { #[cfg(feature = "parallel")] let sigs_iter = search_sigs.par_iter(); @@ -150,10 +151,12 @@ impl LinearRevIndex { Signature::from_reader(sig_data.as_slice()) .unwrap_or_else(|_| panic!("Error processing {:?}", ref_path)) .swap_remove(0) + .into() } else { Signature::from_path(&ref_path) .unwrap_or_else(|_| panic!("Error processing {:?}", ref_path)) .swap_remove(0) + .into() } }) .collect(), @@ -162,6 +165,8 @@ impl LinearRevIndex { None }; + let storage = storage.map(Arc::new); + LinearRevIndex { sig_files, template: template.clone(), @@ -241,6 +246,58 @@ impl LinearRevIndex { } } + pub fn location(&self) -> Option { + if let Some(storage) = &self.storage { + storage.path() + } else { + None + } + } + + pub fn storage(&self) -> Option> { + self.storage.clone() + } + + pub fn select(mut self, selection: &Selection) -> Result { + let manifest = self.sig_files.select_to_manifest(selection)?; + self.sig_files = manifest; + + Ok(self) + /* + # if we have a manifest, run 'select' on the manifest. + manifest = self.manifest + traverse_yield_all = self.traverse_yield_all + + if manifest is not None: + manifest = manifest.select_to_manifest(**kwargs) + return ZipFileLinearIndex(self.storage, + selection_dict=None, + traverse_yield_all=traverse_yield_all, + manifest=manifest, + use_manifest=True) + else: + # no manifest? just pass along all the selection kwargs to + # the new ZipFileLinearIndex. + + assert manifest is None + if self.selection_dict: + # combine selects... + d = dict(self.selection_dict) + for k, v in kwargs.items(): + if k in d: + if d[k] is not None and d[k] != v: + raise ValueError(f"incompatible select on '{k}'") + d[k] = v + kwargs = d + + return ZipFileLinearIndex(self.storage, + selection_dict=kwargs, + traverse_yield_all=traverse_yield_all, + manifest=None, + use_manifest=False) + */ + } + pub fn counter_for_query(&self, query: &KmerMinHash) -> SigCounter { let processed_sigs = AtomicUsize::new(0); @@ -363,7 +420,7 @@ impl LinearRevIndex { Ok(result) } - fn sig_for_dataset(&self, dataset_id: usize) -> Result { + fn sig_for_dataset(&self, dataset_id: usize) -> Result { let match_path = if self.sig_files.is_empty() { PathBuf::new() } else { @@ -371,7 +428,7 @@ impl LinearRevIndex { }; let match_sig = if let Some(refsigs) = &self.ref_sigs { - refsigs[dataset_id as usize].clone() + refsigs[dataset_id as usize].clone().into() } else { let mut sig = if let Some(storage) = &self.storage { let sig_data = storage @@ -386,7 +443,7 @@ impl LinearRevIndex { Signature::from_path(&match_path)? }; // TODO: remove swap_remove - sig.swap_remove(0) + sig.swap_remove(0).into() }; Ok(match_sig) } @@ -503,7 +560,12 @@ impl LinearRevIndex { self.sig_files.clone() } - pub fn signatures_iter(&self) -> impl Iterator + '_ { + pub fn set_manifest(&mut self, new_manifest: Manifest) -> Result<(), Error> { + self.sig_files = new_manifest; + Ok(()) + } + + pub fn signatures_iter(&self) -> impl Iterator + '_ { if let Some(_sigs) = &self.ref_sigs { //sigs.iter().cloned() todo!("this works, but need to match return types") @@ -516,7 +578,7 @@ impl LinearRevIndex { } impl<'a> Index<'a> for LinearRevIndex { - type Item = Signature; + type Item = SigStore; fn insert(&mut self, _node: Self::Item) -> Result<(), Error> { unimplemented!() @@ -927,7 +989,7 @@ impl<'a> Index<'a> for RevIndex { fn signatures(&self) -> Vec { if let Some(ref sigs) = self.linear.ref_sigs { - sigs.to_vec() + sigs.iter().map(|s| s.clone().into()).collect() } else { unimplemented!() } diff --git a/src/core/src/manifest.rs b/src/core/src/manifest.rs index a7c12e863b..ab1d20d210 100644 --- a/src/core/src/manifest.rs +++ b/src/core/src/manifest.rs @@ -4,15 +4,16 @@ use std::path::PathBuf; use serde::{Deserialize, Serialize}; +use crate::index::Selection; use crate::Error; #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Record { internal_location: String, + ksize: u32, /* md5: String, md5short: String, - ksize: String, moltype: String, num: String, scaled: String, n_hashes: String, @@ -31,6 +32,10 @@ impl Record { pub fn internal_location(&self) -> PathBuf { self.internal_location.clone().into() } + + pub fn ksize(&self) -> u32 { + self.ksize + } } impl Manifest { @@ -50,6 +55,58 @@ impl Manifest { pub fn internal_locations(&self) -> impl Iterator { self.records.iter().map(|r| r.internal_location.as_str()) } + + pub fn iter(&self) -> impl Iterator { + self.records.iter() + } + + pub fn select_to_manifest(&self, selection: &Selection) -> Result { + let rows = self.records.iter().filter(|row| { + let mut valid = false; + valid = if let Some(ksize) = selection.ksize() { + row.ksize == ksize + } else { + valid + }; + valid + }); + + Ok(Manifest { + records: rows.cloned().collect(), + }) + + /* + matching_rows = self.rows + if ksize: + matching_rows = ( row for row in matching_rows + if row['ksize'] == ksize ) + if moltype: + matching_rows = ( row for row in matching_rows + if row['moltype'] == moltype ) + if scaled or containment: + if containment and not scaled: + raise ValueError("'containment' requires 'scaled' in Index.select'") + + matching_rows = ( row for row in matching_rows + if row['scaled'] and not row['num'] ) + if num: + matching_rows = ( row for row in matching_rows + if row['num'] and not row['scaled'] ) + + if abund: + # only need to concern ourselves if abundance is _required_ + matching_rows = ( row for row in matching_rows + if row['with_abundance'] ) + + if picklist: + matching_rows = ( row for row in matching_rows + if picklist.matches_manifest_row(row) ) + + # return only the internal filenames! + for row in matching_rows: + yield row + */ + } } impl From<&[PathBuf]> for Manifest { @@ -59,6 +116,7 @@ impl From<&[PathBuf]> for Manifest { .iter() .map(|p| Record { internal_location: p.to_str().unwrap().into(), + ksize: 0, // FIXME }) .collect(), } diff --git a/src/sourmash/index/__init__.py b/src/sourmash/index/__init__.py index c91cd42d0e..562888490a 100644 --- a/src/sourmash/index/__init__.py +++ b/src/sourmash/index/__init__.py @@ -81,8 +81,27 @@ class Selection(TypedDict): # TypedDict can't have methods (it is a dict in runtime) def _selection_as_rust(selection: Selection): - ... - + ptr = lib.selection_new() + + for key, v in selection.items(): + if v is not None: + if key == "ksize": + rustcall(lib.selection_set_ksize, ptr, v) + elif key == "moltype": + ... + elif key == "num": + ... + elif key == "scaled": + ... + elif key == "containment": + ... + elif key == "abund": + ... + elif key == "picklist": + ... + else: + raise KeyError(f"Unsupported key {key} for Selection in rust") + return ptr class Index(ABC): # this will be removed soon; see sourmash#1894. @@ -573,7 +592,7 @@ def manifest(self): @manifest.setter def manifest(self, value): - raise NotImplementedError() + self._methodcall(lib.linearindex_set_manifest, value._as_rust()._take_objptr()) def __bool__(self): "Are there any matching signatures in this zipfile? Avoid calling len." @@ -585,7 +604,14 @@ def __len__(self): @property def location(self): - return self.storage.path + return self._methodcall(lib.linearindex_location) + + @property + def storage(self): + from ..sbt_storage import ZipStorage + + ptr = self._methodcall(lib.linearindex_storage) + return ZipStorage._from_objptr(ptr) def insert(self, signature): raise NotImplementedError @@ -617,7 +643,7 @@ def _signatures_with_internal(self): # should we load this file? if it ends in .sig OR we are forcing: if filename.endswith('.sig') or \ filename.endswith('.sig.gz') or \ - self.traverse_yield_all: + self._traverse_yield_all: sig_data = self.storage.load(filename) for ss in load_signatures(sig_data): yield ss, filename @@ -636,37 +662,13 @@ def signatures(self): def select(self, **kwargs: Unpack[Selection]): "Select signatures in zip file based on ksize/moltype/etc." - # if we have a manifest, run 'select' on the manifest. - manifest = self.manifest - traverse_yield_all = self.traverse_yield_all - - if manifest is not None: - manifest = manifest.select_to_manifest(**kwargs) - return ZipFileLinearIndex(self.storage, - selection_dict=None, - traverse_yield_all=traverse_yield_all, - manifest=manifest, - use_manifest=True) - else: - # no manifest? just pass along all the selection kwargs to - # the new ZipFileLinearIndex. - - assert manifest is None - if self.selection_dict: - # combine selects... - d = dict(self.selection_dict) - for k, v in kwargs.items(): - if k in d: - if d[k] is not None and d[k] != v: - raise ValueError(f"incompatible select on '{k}'") - d[k] = v - kwargs = d - - return ZipFileLinearIndex(self.storage, - selection_dict=kwargs, - traverse_yield_all=traverse_yield_all, - manifest=None, - use_manifest=False) + selection = _selection_as_rust(kwargs) + + # select consumes the current index + ptr = self._take_objptr() + ptr = rustcall(lib.linearindex_select, ptr, selection) + + return ZipFileLinearIndex._from_objptr(ptr) class CounterGather: diff --git a/src/sourmash/manifest.py b/src/sourmash/manifest.py index c151263c7d..c91be928a4 100644 --- a/src/sourmash/manifest.py +++ b/src/sourmash/manifest.py @@ -6,9 +6,13 @@ import os.path from abc import abstractmethod import itertools +from typing import TYPE_CHECKING from sourmash.picklist import SignaturePicklist +if TYPE_CHECKING: + from typing_extensions import Unpack + class BaseCollectionManifest: """ @@ -296,6 +300,7 @@ def _select(self, *, ksize=None, moltype=None, scaled=0, num=0, for row in matching_rows: yield row + #def select_to_manifest(self, **kwargs: Unpack[Selection]): def select_to_manifest(self, **kwargs): "Do a 'select' and return a new CollectionManifest object." new_rows = self._select(**kwargs) @@ -339,4 +344,30 @@ def to_picklist(self): @staticmethod def _from_rust(value): - raise NotImplementedError() + from ._lowlevel import ffi, lib + from .utils import rustcall + + iterator = rustcall(lib.manifest_rows, value) + + rows = [] + next_row = rustcall(lib.manifest_rows_iter_next, iterator) + while next_row != ffi.NULL: + + # TODO: extract row data from next_row + row = {} + row['md5'] = "" #ss.md5sum() + row['md5short'] = row['md5'][:8] + row['ksize'] = 32 #ss.minhash.ksize + row['moltype'] = "" #ss.minhash.moltype + row['num'] = 0 #ss.minhash.num + row['scaled'] = 0 #ss.minhash.scaled + row['n_hashes'] = 0 # len(ss.minhash) + row['with_abundance'] = 1 # 1 if ss.minhash.track_abundance else 0 + row['name'] = "" #ss.name + row['filename'] = "" #ss.filename + row['internal_location'] = "" #location + rows.append(row) + + next_row = rustcall(lib.manifest_rows_iter_next, iterator) + + return CollectionManifest(rows) diff --git a/src/sourmash/sbt_storage.py b/src/sourmash/sbt_storage.py index e57f158f32..50544d6786 100644 --- a/src/sourmash/sbt_storage.py +++ b/src/sourmash/sbt_storage.py @@ -97,6 +97,7 @@ def load(self, path): class ZipStorage(RustObject, Storage): __dealloc_func__ = lib.zipstorage_free + __inner = None def __init__(self, path, *, mode="r"): path = os.path.abspath(path) From c0216238a47b9c23b4b2fc91ac88a219cb24b344 Mon Sep 17 00:00:00 2001 From: Luiz Irber Date: Sat, 30 Apr 2022 19:21:41 -0700 Subject: [PATCH 13/19] 61 failing --- include/sourmash.h | 5 +++++ src/core/src/ffi/manifest.rs | 17 ++++++++++++++++- src/core/src/manifest.rs | 33 +++++++++++++++++++++++++++------ src/sourmash/manifest.py | 15 ++++++++------- 4 files changed, 56 insertions(+), 14 deletions(-) diff --git a/include/sourmash.h b/include/sourmash.h index 4fc4d35006..748d6b8f5f 100644 --- a/include/sourmash.h +++ b/include/sourmash.h @@ -91,6 +91,11 @@ typedef struct { typedef struct { uint32_t ksize; + uint8_t with_abundance; + SourmashStr md5; + SourmashStr internal_location; + SourmashStr name; + SourmashStr moltype; } SourmashManifestRow; bool computeparams_dayhoff(const SourmashComputeParameters *ptr); diff --git a/src/core/src/ffi/manifest.rs b/src/core/src/ffi/manifest.rs index d856d32d41..202622bf1a 100644 --- a/src/core/src/ffi/manifest.rs +++ b/src/core/src/ffi/manifest.rs @@ -1,6 +1,6 @@ use crate::manifest::{Manifest, Record}; -use crate::ffi::utils::ForeignObject; +use crate::ffi::utils::{ForeignObject, SourmashStr}; pub struct SourmashManifest; @@ -43,6 +43,11 @@ pub unsafe extern "C" fn manifest_rows( #[repr(C)] pub struct SourmashManifestRow { pub ksize: u32, + pub with_abundance: u8, + pub md5: SourmashStr, + pub internal_location: SourmashStr, + pub name: SourmashStr, + pub moltype: SourmashStr, } impl ForeignObject for SourmashManifestRow { @@ -53,6 +58,16 @@ impl From<&Record> for SourmashManifestRow { fn from(record: &Record) -> SourmashManifestRow { Self { ksize: record.ksize(), + with_abundance: record.with_abundance() as u8, + md5: record.md5().into(), + name: record.name().into(), + moltype: record.moltype().into(), + internal_location: record + .internal_location() + .to_str() + .unwrap() + .to_owned() + .into(), } } } diff --git a/src/core/src/manifest.rs b/src/core/src/manifest.rs index ab1d20d210..0bdc813859 100644 --- a/src/core/src/manifest.rs +++ b/src/core/src/manifest.rs @@ -11,14 +11,15 @@ use crate::Error; pub struct Record { internal_location: String, ksize: u32, - /* + with_abundance: u8, md5: String, - md5short: String, + name: String, moltype: String, + /* + md5short: String, num: String, - scaled: String, n_hashes: String, - with_abundance: String, - name: String, + scaled: String, + n_hashes: String, filename: String, */ } @@ -36,6 +37,22 @@ impl Record { pub fn ksize(&self) -> u32 { self.ksize } + + pub fn with_abundance(&self) -> bool { + self.with_abundance != 0 + } + + pub fn md5(&self) -> &str { + self.md5.as_ref() + } + + pub fn name(&self) -> &str { + self.name.as_ref() + } + + pub fn moltype(&self) -> &str { + self.moltype.as_ref() + } } impl Manifest { @@ -116,7 +133,11 @@ impl From<&[PathBuf]> for Manifest { .iter() .map(|p| Record { internal_location: p.to_str().unwrap().into(), - ksize: 0, // FIXME + ksize: 0, // FIXME + with_abundance: 0, // FIXME + md5: "".into(), // FIXME + name: "".into(), // FIXME + moltype: "".into(), // FIXME }) .collect(), } diff --git a/src/sourmash/manifest.py b/src/sourmash/manifest.py index c91be928a4..be00e2dece 100644 --- a/src/sourmash/manifest.py +++ b/src/sourmash/manifest.py @@ -345,7 +345,7 @@ def to_picklist(self): @staticmethod def _from_rust(value): from ._lowlevel import ffi, lib - from .utils import rustcall + from .utils import rustcall, decode_str iterator = rustcall(lib.manifest_rows, value) @@ -354,18 +354,19 @@ def _from_rust(value): while next_row != ffi.NULL: # TODO: extract row data from next_row + # FIXME: free mem from strings? row = {} - row['md5'] = "" #ss.md5sum() + row['md5'] = decode_str(next_row.md5) row['md5short'] = row['md5'][:8] - row['ksize'] = 32 #ss.minhash.ksize - row['moltype'] = "" #ss.minhash.moltype + row['ksize'] = next_row.ksize + row['moltype'] = decode_str(next_row.moltype) row['num'] = 0 #ss.minhash.num row['scaled'] = 0 #ss.minhash.scaled row['n_hashes'] = 0 # len(ss.minhash) - row['with_abundance'] = 1 # 1 if ss.minhash.track_abundance else 0 - row['name'] = "" #ss.name + row['with_abundance'] = next_row.with_abundance + row['name'] = decode_str(next_row.name) row['filename'] = "" #ss.filename - row['internal_location'] = "" #location + row['internal_location'] = decode_str(next_row.internal_location) rows.append(row) next_row = rustcall(lib.manifest_rows_iter_next, iterator) From e9fb35f3512d02c8c11e8bc1fce05892ad333cfe Mon Sep 17 00:00:00 2001 From: Luiz Irber Date: Mon, 2 May 2022 21:38:50 -0700 Subject: [PATCH 14/19] 34 failing --- include/sourmash.h | 8 +++++++ src/core/src/ffi/index/mod.rs | 36 ++++++++++++++++++++++++++++++ src/core/src/ffi/index/revindex.rs | 6 ++--- src/core/src/ffi/manifest.rs | 2 +- src/core/src/index/mod.rs | 19 ++++++++++++++++ src/core/src/index/revindex.rs | 19 +++++++++++----- src/core/src/manifest.rs | 18 ++++++++++++--- src/sourmash/index/__init__.py | 15 ++++++++++--- 8 files changed, 108 insertions(+), 15 deletions(-) diff --git a/include/sourmash.h b/include/sourmash.h index 748d6b8f5f..b485853f87 100644 --- a/include/sourmash.h +++ b/include/sourmash.h @@ -399,12 +399,20 @@ double searchresult_score(const SourmashSearchResult *ptr); SourmashSignature *searchresult_signature(const SourmashSearchResult *ptr); +bool selection_abund(const SourmashSelection *ptr); + uint32_t selection_ksize(const SourmashSelection *ptr); +HashFunctions selection_moltype(const SourmashSelection *ptr); + SourmashSelection *selection_new(void); +void selection_set_abund(SourmashSelection *ptr, bool new_abund); + void selection_set_ksize(SourmashSelection *ptr, uint32_t new_ksize); +void selection_set_moltype(SourmashSelection *ptr, HashFunctions new_moltype); + void signature_add_protein(SourmashSignature *ptr, const char *sequence); void signature_add_sequence(SourmashSignature *ptr, const char *sequence, bool force); diff --git a/src/core/src/ffi/index/mod.rs b/src/core/src/ffi/index/mod.rs index e31f577c28..aa6e354a5b 100644 --- a/src/core/src/ffi/index/mod.rs +++ b/src/core/src/ffi/index/mod.rs @@ -1,5 +1,6 @@ pub mod revindex; +use crate::encodings::HashFunctions; use crate::index::{Selection, SigStore}; use crate::signature::Signature; @@ -66,6 +67,41 @@ pub unsafe extern "C" fn selection_set_ksize(ptr: *mut SourmashSelection, new_ks sel.set_ksize(new_ksize); } +#[no_mangle] +pub unsafe extern "C" fn selection_abund(ptr: *const SourmashSelection) -> bool { + let sel = SourmashSelection::as_rust(ptr); + if let Some(abund) = sel.abund() { + abund + } else { + todo!("empty abund case not supported yet") + } +} + +#[no_mangle] +pub unsafe extern "C" fn selection_set_abund(ptr: *mut SourmashSelection, new_abund: bool) { + let sel = SourmashSelection::as_rust_mut(ptr); + sel.set_abund(new_abund); +} + +#[no_mangle] +pub unsafe extern "C" fn selection_moltype(ptr: *const SourmashSelection) -> HashFunctions { + let sel = SourmashSelection::as_rust(ptr); + if let Some(hash_function) = sel.moltype() { + hash_function + } else { + todo!("empty hash_function case not supported yet") + } +} + +#[no_mangle] +pub unsafe extern "C" fn selection_set_moltype( + ptr: *mut SourmashSelection, + new_moltype: HashFunctions, +) { + let sel = SourmashSelection::as_rust_mut(ptr); + sel.set_moltype(new_moltype); +} + //================================================================ // pub struct SignatureIterator { diff --git a/src/core/src/ffi/index/revindex.rs b/src/core/src/ffi/index/revindex.rs index 7b3ff9f55d..1f25c944fe 100644 --- a/src/core/src/ffi/index/revindex.rs +++ b/src/core/src/ffi/index/revindex.rs @@ -277,12 +277,12 @@ unsafe fn linearindex_new( let manifest = if manifest_ptr.is_null() { if use_manifest { // Load manifest from zipstorage - Manifest::from_reader(storage.load("SOURMASH-MANIFEST.csv")?.as_slice())? + Some(Manifest::from_reader(storage.load("SOURMASH-MANIFEST.csv")?.as_slice())?) } else { - todo!("throw error") + None } } else { - *SourmashManifest::into_rust(manifest_ptr) + Some(*SourmashManifest::into_rust(manifest_ptr)) }; let _selection = if !selection_ptr.is_null() { diff --git a/src/core/src/ffi/manifest.rs b/src/core/src/ffi/manifest.rs index 202622bf1a..815f8d83f1 100644 --- a/src/core/src/ffi/manifest.rs +++ b/src/core/src/ffi/manifest.rs @@ -61,7 +61,7 @@ impl From<&Record> for SourmashManifestRow { with_abundance: record.with_abundance() as u8, md5: record.md5().into(), name: record.name().into(), - moltype: record.moltype().into(), + moltype: record.moltype().to_string().into(), internal_location: record .internal_location() .to_str() diff --git a/src/core/src/index/mod.rs b/src/core/src/index/mod.rs index a3838c156c..f709c26b5d 100644 --- a/src/core/src/index/mod.rs +++ b/src/core/src/index/mod.rs @@ -17,6 +17,7 @@ use once_cell::sync::OnceCell; use serde::{Deserialize, Serialize}; use typed_builder::TypedBuilder; +use crate::encodings::HashFunctions; use crate::errors::ReadDataError; use crate::index::sbt::{Node, SBT}; use crate::index::search::{search_minhashes, search_minhashes_containment}; @@ -32,6 +33,8 @@ pub type MHBT = SBT>; #[derive(Default)] pub struct Selection { ksize: Option, + abund: Option, + moltype: Option, } impl Selection { @@ -42,6 +45,22 @@ impl Selection { pub fn set_ksize(&mut self, ksize: u32) { self.ksize = Some(ksize); } + + pub fn abund(&self) -> Option { + self.abund + } + + pub fn set_abund(&mut self, value: bool) { + self.abund = Some(value); + } + + pub fn moltype(&self) -> Option { + self.moltype + } + + pub fn set_moltype(&mut self, value: HashFunctions) { + self.moltype = Some(value); + } } /* FIXME: bring back after MQF works on macOS and Windows diff --git a/src/core/src/index/revindex.rs b/src/core/src/index/revindex.rs index 244d1a05b1..98b073cb01 100644 --- a/src/core/src/index/revindex.rs +++ b/src/core/src/index/revindex.rs @@ -122,17 +122,26 @@ pub struct LinearRevIndex { impl LinearRevIndex { pub fn new( - sig_files: Manifest, + sig_files: Option, template: &Sketch, keep_sigs: bool, ref_sigs: Option>, storage: Option, ) -> Self { - let search_sigs: Vec<_> = sig_files.internal_locations().map(PathBuf::from).collect(); + if ref_sigs.is_none() && sig_files.is_none() { + todo!("throw error, one need to be set"); + } let ref_sigs = if let Some(ref_sigs) = ref_sigs { Some(ref_sigs.into_iter().map(|m| m.into()).collect()) } else if keep_sigs { + let search_sigs: Vec<_> = sig_files + .as_ref() + .unwrap() + .internal_locations() + .map(PathBuf::from) + .collect(); + #[cfg(feature = "parallel")] let sigs_iter = search_sigs.par_iter(); @@ -168,7 +177,7 @@ impl LinearRevIndex { let storage = storage.map(Arc::new); LinearRevIndex { - sig_files, + sig_files: sig_files.unwrap(), template: template.clone(), ref_sigs, storage, @@ -665,7 +674,7 @@ impl RevIndex { // If threshold is zero, let's merge all queries and save time later let merged_query = queries.and_then(|qs| Self::merge_queries(qs, threshold)); - let linear = LinearRevIndex::new(search_sigs.into(), template, keep_sigs, None, None); + let linear = LinearRevIndex::new(Some(search_sigs.into()), template, keep_sigs, None, None); linear.index(threshold, merged_query, queries) } @@ -684,7 +693,7 @@ impl RevIndex { let search_sigs: Vec<_> = manifest.internal_locations().map(PathBuf::from).collect(); let linear = LinearRevIndex::new( - search_sigs.as_slice().into(), + Some(search_sigs.as_slice().into()), template, keep_sigs, None, diff --git a/src/core/src/manifest.rs b/src/core/src/manifest.rs index 0bdc813859..09e177aa89 100644 --- a/src/core/src/manifest.rs +++ b/src/core/src/manifest.rs @@ -1,9 +1,11 @@ +use std::convert::TryInto; use std::io::Read; use std::ops::Deref; use std::path::PathBuf; use serde::{Deserialize, Serialize}; +use crate::encodings::HashFunctions; use crate::index::Selection; use crate::Error; @@ -50,8 +52,8 @@ impl Record { self.name.as_ref() } - pub fn moltype(&self) -> &str { - self.moltype.as_ref() + pub fn moltype(&self) -> HashFunctions { + self.moltype.as_str().try_into().unwrap() } } @@ -79,12 +81,22 @@ impl Manifest { pub fn select_to_manifest(&self, selection: &Selection) -> Result { let rows = self.records.iter().filter(|row| { - let mut valid = false; + let mut valid = true; valid = if let Some(ksize) = selection.ksize() { row.ksize == ksize } else { valid }; + valid = if let Some(abund) = selection.abund() { + valid && row.with_abundance() == abund + } else { + valid + }; + valid = if let Some(moltype) = selection.moltype() { + valid && row.moltype() == moltype + } else { + valid + }; valid }); diff --git a/src/sourmash/index/__init__.py b/src/sourmash/index/__init__.py index 562888490a..c990e4e521 100644 --- a/src/sourmash/index/__init__.py +++ b/src/sourmash/index/__init__.py @@ -88,7 +88,16 @@ def _selection_as_rust(selection: Selection): if key == "ksize": rustcall(lib.selection_set_ksize, ptr, v) elif key == "moltype": - ... + hash_function = None + if v.lower() == "dna": + hash_function = lib.HASH_FUNCTIONS_MURMUR64_DNA + elif v.lower() == "protein": + hash_function = lib.HASH_FUNCTIONS_MURMUR64_PROTEIN + elif v.lower() == "dayhoff": + hash_function = lib.HASH_FUNCTIONS_MURMUR64_DAYHOFF + elif v.lower() == "hp": + hash_function = lib.HASH_FUNCTIONS_MURMUR64_HP + rustcall(lib.selection_set_moltype, ptr, hash_function) elif key == "num": ... elif key == "scaled": @@ -96,7 +105,7 @@ def _selection_as_rust(selection: Selection): elif key == "containment": ... elif key == "abund": - ... + rustcall(lib.selection_set_abund, ptr, bool(v)) elif key == "picklist": ... else: @@ -604,7 +613,7 @@ def __len__(self): @property def location(self): - return self._methodcall(lib.linearindex_location) + return decode_str(self._methodcall(lib.linearindex_location)) @property def storage(self): From ef38c6e956831c3e3e5636e233a6ebfa5c421b60 Mon Sep 17 00:00:00 2001 From: Luiz Irber Date: Tue, 3 May 2022 18:52:43 -0700 Subject: [PATCH 15/19] clippy --- src/core/src/index/revindex.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/src/index/revindex.rs b/src/core/src/index/revindex.rs index 98b073cb01..f8731b14d1 100644 --- a/src/core/src/index/revindex.rs +++ b/src/core/src/index/revindex.rs @@ -437,7 +437,7 @@ impl LinearRevIndex { }; let match_sig = if let Some(refsigs) = &self.ref_sigs { - refsigs[dataset_id as usize].clone().into() + refsigs[dataset_id as usize].clone() } else { let mut sig = if let Some(storage) = &self.storage { let sig_data = storage @@ -558,7 +558,7 @@ impl LinearRevIndex { }; } to_remove.iter().for_each(|dataset_id| { - counter.remove(&dataset_id); + counter.remove(dataset_id); }); matches.push(result); } From aa74ed17342ced57d2ff02e69962c47b463bac8d Mon Sep 17 00:00:00 2001 From: Luiz Irber Date: Fri, 6 May 2022 13:45:47 -0700 Subject: [PATCH 16/19] use pytest-xdist --- tox.ini | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tox.ini b/tox.ini index 6a8d681ad8..6e03be0fd9 100644 --- a/tox.ini +++ b/tox.ini @@ -44,8 +44,10 @@ commands = pytest \ --cov-config "{toxinidir}/tox.ini" \ --cov-report= \ --junitxml {toxworkdir}/junit.{envname}.xml \ + -n 4 \ {posargs:doc tests} + [testenv:pypy3] deps = pip >= 19.3.1 From e94f2f81d2efd208555164d7b9d35df72aafadd3 Mon Sep 17 00:00:00 2001 From: Luiz Irber Date: Fri, 6 May 2022 13:46:13 -0700 Subject: [PATCH 17/19] deserialize multiple bool values in manifest --- src/core/src/manifest.rs | 35 +++++++++++++++++++++++++++------- src/sourmash/index/__init__.py | 2 ++ 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/src/core/src/manifest.rs b/src/core/src/manifest.rs index 09e177aa89..ce740c638b 100644 --- a/src/core/src/manifest.rs +++ b/src/core/src/manifest.rs @@ -3,6 +3,7 @@ use std::io::Read; use std::ops::Deref; use std::path::PathBuf; +use serde::de; use serde::{Deserialize, Serialize}; use crate::encodings::HashFunctions; @@ -13,7 +14,10 @@ use crate::Error; pub struct Record { internal_location: String, ksize: u32, - with_abundance: u8, + + #[serde(deserialize_with = "to_bool")] + with_abundance: bool, + md5: String, name: String, moltype: String, @@ -26,6 +30,23 @@ pub struct Record { */ } +fn to_bool<'de, D>(deserializer: D) -> Result +where + D: de::Deserializer<'de>, +{ + match String::deserialize(deserializer)? + .to_ascii_lowercase() + .as_ref() + { + "0" | "false" => Ok(false), + "1" | "true" => Ok(true), + other => Err(de::Error::invalid_value( + de::Unexpected::Str(other), + &"0/1 or true/false are the only supported values", + )), + } +} + #[derive(Debug, Default, Serialize, Deserialize, Clone)] pub struct Manifest { records: Vec, @@ -41,7 +62,7 @@ impl Record { } pub fn with_abundance(&self) -> bool { - self.with_abundance != 0 + self.with_abundance } pub fn md5(&self) -> &str { @@ -145,11 +166,11 @@ impl From<&[PathBuf]> for Manifest { .iter() .map(|p| Record { internal_location: p.to_str().unwrap().into(), - ksize: 0, // FIXME - with_abundance: 0, // FIXME - md5: "".into(), // FIXME - name: "".into(), // FIXME - moltype: "".into(), // FIXME + ksize: 0, // FIXME + with_abundance: false, // FIXME + md5: "".into(), // FIXME + name: "".into(), // FIXME + moltype: "".into(), // FIXME }) .collect(), } diff --git a/src/sourmash/index/__init__.py b/src/sourmash/index/__init__.py index c990e4e521..ccfa27212c 100644 --- a/src/sourmash/index/__init__.py +++ b/src/sourmash/index/__init__.py @@ -601,6 +601,8 @@ def manifest(self): @manifest.setter def manifest(self, value): + if value is None: + return # FIXME: can't unset manifest in a Rust Linear Index self._methodcall(lib.linearindex_set_manifest, value._as_rust()._take_objptr()) def __bool__(self): From 6500ed9f294cc8e596f85ffad07210d6f548b133 Mon Sep 17 00:00:00 2001 From: Luiz Irber Date: Sun, 24 Jul 2022 13:02:40 -0700 Subject: [PATCH 18/19] wip picklist --- src/sourmash/index/__init__.py | 18 ++++++++++++++---- src/sourmash/picklist.py | 18 ++++++++++++++++++ 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/src/sourmash/index/__init__.py b/src/sourmash/index/__init__.py index ccfa27212c..add890060d 100644 --- a/src/sourmash/index/__init__.py +++ b/src/sourmash/index/__init__.py @@ -87,6 +87,7 @@ def _selection_as_rust(selection: Selection): if v is not None: if key == "ksize": rustcall(lib.selection_set_ksize, ptr, v) + elif key == "moltype": hash_function = None if v.lower() == "dna": @@ -97,19 +98,28 @@ def _selection_as_rust(selection: Selection): hash_function = lib.HASH_FUNCTIONS_MURMUR64_DAYHOFF elif v.lower() == "hp": hash_function = lib.HASH_FUNCTIONS_MURMUR64_HP + rustcall(lib.selection_set_moltype, ptr, hash_function) + elif key == "num": - ... + raise NotImplementedError("num") + elif key == "scaled": - ... + raise NotImplementedError("scaled") + elif key == "containment": - ... + raise NotImplementedError("containment") + elif key == "abund": rustcall(lib.selection_set_abund, ptr, bool(v)) + elif key == "picklist": - ... + picklist_ptr = v._as_rust() + rustcall(lib.selection_set_picklist, ptr, picklist_ptr) + else: raise KeyError(f"Unsupported key {key} for Selection in rust") + return ptr class Index(ABC): diff --git a/src/sourmash/picklist.py b/src/sourmash/picklist.py index 3111164221..23e4bdf74b 100644 --- a/src/sourmash/picklist.py +++ b/src/sourmash/picklist.py @@ -256,6 +256,24 @@ def filter(self, it): if self.__contains__(ss): yield ss + def _as_rust(self): + from ._lowlevel import ffi, lib + from .utils import rustcall, decode_str + + ptr = lib.picklist_new() + + rustcall(lib.picklist_set_coltype, ptr, self.coltype) + rustcall(lib.picklist_set_pickfile, ptr, self.pickfile) + rustcall(lib.picklist_set_column_name, ptr, self.column_name) + rustcall(lib.picklist_set_pickstyle, ptr, self.pickstyle) + + #self.preprocess_fn = preprocess[coltype] + #self.pickset = None + #self.found = set() + #self.n_queries = 0 + + return ptr + def passes_all_picklists(ss, picklists): "does the signature 'ss' pass all of the picklists?" From 5b65c1d571e25302ae0f56c0780c402c7c31cc8e Mon Sep 17 00:00:00 2001 From: Luiz Irber Date: Sun, 24 Jul 2022 13:51:40 -0700 Subject: [PATCH 19/19] revert extraneous changes --- src/sourmash/index/__init__.py | 26 +++++++++++++------------- src/sourmash/sbt_storage.py | 3 +-- tox.ini | 2 -- 3 files changed, 14 insertions(+), 17 deletions(-) diff --git a/src/sourmash/index/__init__.py b/src/sourmash/index/__init__.py index add890060d..9f9481cb7f 100644 --- a/src/sourmash/index/__init__.py +++ b/src/sourmash/index/__init__.py @@ -44,19 +44,19 @@ from typing import NamedTuple, Optional, TypedDict, TYPE_CHECKING import weakref -from ..search import (make_jaccard_search_query, - make_gather_query - calc_threshold_from_bp) -from ..manifest import CollectionManifest -from ..logging import debug_literal -from ..signature import load_signatures, save_signatures -from .._lowlevel import ffi, lib -from ..utils import RustObject, rustcall, decode_str, encode_str -from .. import SourmashSignature -from ..picklist import SignaturePicklist -from ..minhash import (flatten_and_downsample_scaled, - flatten_and_downsample_num, - flatten_and_intersect_scaled) +from sourmash.search import (make_jaccard_search_query, + make_containment_query, + calc_threshold_from_bp) +from sourmash.manifest import CollectionManifest +from sourmash.logging import debug_literal +from sourmash.signature import load_signatures, save_signatures +from sourmash._lowlevel import ffi, lib +from sourmash.utils import RustObject, rustcall, decode_str, encode_str +from sourmash import SourmashSignature +from sourmash.picklist import SignaturePicklist +from sourmash.minhash import (flatten_and_downsample_scaled, + flatten_and_downsample_num, + flatten_and_intersect_scaled) if TYPE_CHECKING: from typing_extensions import Unpack diff --git a/src/sourmash/sbt_storage.py b/src/sourmash/sbt_storage.py index 50544d6786..398e11c877 100644 --- a/src/sourmash/sbt_storage.py +++ b/src/sourmash/sbt_storage.py @@ -97,14 +97,13 @@ def load(self, path): class ZipStorage(RustObject, Storage): __dealloc_func__ = lib.zipstorage_free - __inner = None def __init__(self, path, *, mode="r"): - path = os.path.abspath(path) if mode == "w": self.__inner = _RwZipStorage(path) else: self.__inner = None + path = os.path.abspath(path) self._objptr = rustcall(lib.zipstorage_new, to_bytes(path), len(path)) @staticmethod diff --git a/tox.ini b/tox.ini index 6e03be0fd9..6a8d681ad8 100644 --- a/tox.ini +++ b/tox.ini @@ -44,10 +44,8 @@ commands = pytest \ --cov-config "{toxinidir}/tox.ini" \ --cov-report= \ --junitxml {toxworkdir}/junit.{envname}.xml \ - -n 4 \ {posargs:doc tests} - [testenv:pypy3] deps = pip >= 19.3.1