From 9f36e2f8c0ef89879924d3c2d44eacdbf6ee795a Mon Sep 17 00:00:00 2001
From: Tessa Pierce Ward
Date: Mon, 5 Feb 2024 10:57:56 -0800
Subject: [PATCH] MRG: core: add scaled selection to manifest; add helper functions for collection and sig/sketch usage (#2948)

This PR adds:

## New functions:

- `Collection::sig_from_record`
> when we iterate through the sketches, we have both `idx` and `record` available. I thought it would make sense to just use record directly, rather than re-getting record from index.
- `Signature::minhash`
> if there is one minhash sketch available within the sig that matches selection params, return it
- `Signature::get_sketch`
> if there is one sketch (of any type) available within the sig that matches selection params, return it. Note that since this returns the sketch enum, it still requires checking MinHash type afterwards. @luizirber is there a way to return any of the sketches directly from the same function (like the minhash function above, but more flexible)?
- `Manifest::From<&PathBuf>`
> build a `manifest` directly from a pathlist file. Added and tested, but let me know if you think we should just build a list of paths separately. I wanted this for branchwater, but am not actually using it since neither the paths nor the PathBuf loading code allows missing/failed paths.

## New selection functionality

- added `scaled` and `num` selection to manifest. For scaled, if sketch is compatible (equal scaled or can be downsampled), keep it during manifest selection. Otherwise, discard.

Tests added for each new function/added code.

Co-authored-by: Luiz Irber --- src/core/src/collection.rs | 219 +++++++++++++++++++++++++++++++++++++ src/core/src/manifest.rs | 113 ++++++++++++++++++- src/core/src/signature.rs | 141 ++++++++++++++++++++++-- 3 files changed, 465 insertions(+), 8 deletions(-) diff --git a/src/core/src/collection.rs b/src/core/src/collection.rs index c00b2fd288..8cc6129cf4 100644 --- a/src/core/src/collection.rs +++ b/src/core/src/collection.rs @@ -180,6 +180,14 @@ impl Collection { assert_eq!(sig.signatures.len(), 1); Ok(sig) } + + pub fn sig_from_record(&self, record: &Record) -> Result { + let match_path = record.internal_location().as_str(); + let selection = Selection::from_record(record)?; + let sig = self.storage.load_sig(match_path)?.select(&selection)?; + assert_eq!(sig.signatures.len(), 1); + Ok(sig) + } } impl Select for Collection { @@ -188,3 +196,214 @@ impl Select for Collection { Ok(self) } } + +#[cfg(test)] +mod test { + use camino::Utf8PathBuf as PathBuf; + use std::fs::File; + use std::io::BufReader; + + use super::Collection; + + use crate::encodings::HashFunctions; + use crate::prelude::Select; + use crate::selection::Selection; + use crate::signature::Signature; + + #[test] + fn sigstore_selection_with_downsample() { + // load test sigs + let mut filename = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + filename.push("../../tests/test-data/47+63-multisig.sig"); + let file = File::open(filename).unwrap(); + let reader = BufReader::new(file); + let sigs: Vec = serde_json::from_reader(reader).expect("Loading error"); + // create Selection object + let mut selection = Selection::default(); + selection.set_scaled(2000); + // load sigs into collection + select compatible signatures + let cl = Collection::from_sigs(sigs) + .unwrap() + .select(&selection) + .unwrap(); + // count collection length + assert_eq!(cl.len(), 6); + for (idx, _rec) in cl.iter() { + // need to pass select again here so we actually downsample + let this_sig = 
cl.sig_for_dataset(idx).unwrap().select(&selection).unwrap(); + let this_mh = this_sig.minhash().unwrap(); + assert_eq!(this_mh.scaled(), 2000); + } + } + + #[test] + fn sigstore_selection_with_downsample_too_low() { + // load test sigs + let mut filename = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + filename.push("../../tests/test-data/47+63-multisig.sig"); + let file = File::open(filename).unwrap(); + let reader = BufReader::new(file); + let sigs: Vec = serde_json::from_reader(reader).expect("Loading error"); + // create Selection object + let mut selection = Selection::default(); + selection.set_scaled(500); + // load sigs into collection + select compatible signatures + let cl = Collection::from_sigs(sigs) + .unwrap() + .select(&selection) + .unwrap(); + // no sigs should remain + assert_eq!(cl.len(), 0); + } + + #[test] + fn sigstore_selection_scaled_handle_num_sig() { + // load test sigs + let mut filename = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + // four num=500 sigs + filename.push("../../tests/test-data/genome-s11.fa.gz.sig"); + let file = File::open(filename).unwrap(); + let reader = BufReader::new(file); + let sigs: Vec = serde_json::from_reader(reader).expect("Loading error"); + assert_eq!(sigs.len(), 4); + // create Selection object + let mut selection = Selection::default(); + selection.set_scaled(1000); + // load sigs into collection + select compatible signatures + let cl = Collection::from_sigs(sigs) + .unwrap() + .select(&selection) + .unwrap(); + // no sigs should remain + assert_eq!(cl.len(), 0); + } + + #[test] + fn sigstore_selection_num() { + // load test sigs + let mut filename = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + // four num=500 sigs + filename.push("../../tests/test-data/genome-s11.fa.gz.sig"); + let file = File::open(filename).unwrap(); + let reader = BufReader::new(file); + let sigs: Vec = serde_json::from_reader(reader).expect("Loading error"); + let sigs_copy = sigs.clone(); + assert_eq!(sigs.len(), 4); + // create 
Selection object + let mut selection = Selection::default(); + selection.set_num(500); + // load sigs into collection + select compatible signatures + let cl = Collection::from_sigs(sigs) + .unwrap() + .select(&selection) + .unwrap(); + // all sigs should remain + assert_eq!(cl.len(), 4); + //now select diff num and none should remain + selection.set_num(100); + let cl2 = Collection::from_sigs(sigs_copy) + .unwrap() + .select(&selection) + .unwrap(); + assert_eq!(cl2.len(), 0); + } + + #[test] + fn sigstore_selection_num_handle_scaled_sig() { + // load test sigs + let mut filename = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + // four num=500 sigs + filename.push("../../tests/test-data/47+63-multisig.sig"); + let file = File::open(filename).unwrap(); + let reader = BufReader::new(file); + let sigs: Vec = serde_json::from_reader(reader).expect("Loading error"); + assert_eq!(sigs.len(), 6); + // create Selection object + let mut selection = Selection::default(); + selection.set_num(500); + // load sigs into collection + select compatible signatures + let cl = Collection::from_sigs(sigs) + .unwrap() + .select(&selection) + .unwrap(); + // no sigs should remain + assert_eq!(cl.len(), 0); + } + + #[test] + fn sigstore_sig_from_record() { + // load test sigs + let mut filename = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + filename.push("../../tests/test-data/47+63-multisig.sig"); + let file = File::open(filename).unwrap(); + let reader = BufReader::new(file); + let sigs: Vec = serde_json::from_reader(reader).expect("Loading error"); + // create Selection object + let mut selection = Selection::default(); + selection.set_scaled(2000); + // load sigs into collection + select compatible signatures + let cl = Collection::from_sigs(sigs) + .unwrap() + .select(&selection) + .unwrap(); + // no sigs should remain + assert_eq!(cl.len(), 6); + for (_idx, rec) in cl.iter() { + // need to pass select again here so we actually downsample + let this_sig = 
cl.sig_from_record(rec).unwrap().select(&selection).unwrap(); + let this_mh = this_sig.minhash().unwrap(); + assert_eq!(this_mh.scaled(), 2000); + } + } + + #[test] + fn sigstore_selection_moltype_zip() { + // load test sigs + let mut filename = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + filename.push("../../tests/test-data/prot/hp.zip"); + // create Selection object + let mut selection = Selection::default(); + selection.set_scaled(100); + selection.set_moltype(HashFunctions::Murmur64Hp); + // load sigs into collection + select compatible signatures + let cl = Collection::from_zipfile(&filename) + .unwrap() + .select(&selection) + .unwrap(); + // count collection length + assert_eq!(cl.len(), 2); + for (idx, _rec) in cl.iter() { + // need to pass select again here so we actually downsample + let this_sig = cl.sig_for_dataset(idx).unwrap().select(&selection).unwrap(); + let this_mh = this_sig.minhash().unwrap(); + assert_eq!(this_mh.scaled(), 100); + } + } + + #[test] + fn sigstore_selection_moltype_sig() { + // load test sigs + let mut filename = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + filename + .push("../../tests/test-data/prot/hp/GCA_001593925.1_ASM159392v1_protein.faa.gz.sig"); + let file = File::open(filename).unwrap(); + let reader = BufReader::new(file); + let sigs: Vec = serde_json::from_reader(reader).expect("Loading error"); + // create Selection object + let mut selection = Selection::default(); + selection.set_moltype(HashFunctions::Murmur64Hp); + // load sigs into collection + select compatible signatures + let cl = Collection::from_sigs(sigs) + .unwrap() + .select(&selection) + .unwrap(); + // count collection length + assert_eq!(cl.len(), 1); + for (idx, _rec) in cl.iter() { + // need to pass select again here so we actually downsample + let this_sig = cl.sig_for_dataset(idx).unwrap().select(&selection).unwrap(); + let this_mh = this_sig.minhash().unwrap(); + assert_eq!(this_mh.scaled(), 100); + } + } +} diff --git a/src/core/src/manifest.rs 
b/src/core/src/manifest.rs index 5bad8ec81b..a7ebfdfc96 100644 --- a/src/core/src/manifest.rs +++ b/src/core/src/manifest.rs @@ -1,5 +1,6 @@ use std::convert::TryInto; -use std::io::{Read, Write}; +use std::fs::File; +use std::io::{BufRead, BufReader, Read, Write}; use std::ops::Deref; use camino::Utf8PathBuf as PathBuf; @@ -200,6 +201,17 @@ impl Select for Manifest { } else { valid }; + valid = if let Some(scaled) = selection.scaled() { + // num sigs have row.scaled = 0, don't include them + valid && row.scaled != 0 && row.scaled <= scaled as u64 + } else { + valid + }; + valid = if let Some(num) = selection.num() { + valid && row.num == num + } else { + valid + }; valid }); @@ -270,6 +282,21 @@ impl From<&[PathBuf]> for Manifest { } } +impl From<&PathBuf> for Manifest { + fn from(pathlist: &PathBuf) -> Self { + let file = File::open(pathlist).unwrap_or_else(|_| panic!("Failed to open {:?}", pathlist)); + let reader = BufReader::new(file); + + let paths: Vec = reader + .lines() + .map(|line| line.unwrap_or_else(|_| panic!("Failed to read line from {:?}", pathlist))) + .map(PathBuf::from) + .collect(); + + paths.as_slice().into() + } +} + impl Deref for Manifest { type Target = Vec; @@ -277,3 +304,87 @@ impl Deref for Manifest { &self.records } } + +#[cfg(test)] +mod test { + use camino::Utf8PathBuf as PathBuf; + use std::fs::File; + use std::io::Write; + use tempfile::TempDir; + + use super::Manifest; + + #[test] + fn manifest_from_pathlist() { + let temp_dir = TempDir::new().unwrap(); + let utf8_output = PathBuf::from_path_buf(temp_dir.path().to_path_buf()) + .expect("Path should be valid UTF-8"); + let mut filename = utf8_output.join("sig-pathlist.txt"); + //convert to camino utf8pathbuf + filename = PathBuf::from(filename); + // build sig filenames + let base_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + let test_sigs = vec![ + "../../tests/test-data/47.fa.sig", + "../../tests/test-data/63.fa.sig", + ]; + + let full_paths: Vec<_> = test_sigs + 
.into_iter() + .map(|sig| base_path.join(sig)) + .collect(); + + // write a file in test directory with a filename on each line + let mut pathfile = File::create(&filename).unwrap(); + for sigfile in &full_paths { + writeln!(pathfile, "{}", sigfile).unwrap(); + } + + // load into manifest + let manifest = Manifest::from(&filename); + assert_eq!(manifest.len(), 2); + } + + #[test] + #[should_panic(expected = "Failed to open \"no-exist\"")] + fn manifest_from_pathlist_nonexistent_file() { + let filename = PathBuf::from("no-exist"); + let _manifest = Manifest::from(&filename); + } + + #[test] + #[should_panic] + fn manifest_from_pathlist_badfile() { + let temp_dir = TempDir::new().unwrap(); + let utf8_output = PathBuf::from_path_buf(temp_dir.path().to_path_buf()) + .expect("Path should be valid UTF-8"); + let mut filename = utf8_output.join("sig-pathlist.txt"); + //convert to camino utf8pathbuf + filename = PathBuf::from(filename); + + let mut pathfile = File::create(&filename).unwrap(); + write!(pathfile, "Valid line\n").unwrap(); + pathfile.write_all(&[0xED, 0xA0, 0x80]).unwrap(); // invalid UTF-8 + + // load into manifest + let _manifest = Manifest::from(&filename); + } + + #[test] + #[should_panic] + fn manifest_from_paths_badpath() { + let base_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + let test_sigs = vec![ + PathBuf::from("no-exist"), + PathBuf::from("../../tests/test-data/63.fa.sig"), + ]; + + let full_paths: Vec = test_sigs + .into_iter() + .map(|sig| base_path.join(sig)) + .collect(); + + // load into manifest + let _manifest = Manifest::from(&full_paths[..]); // pass full_paths as a slice + } +} diff --git a/src/core/src/signature.rs b/src/core/src/signature.rs index a75eb6c3f8..381d45c643 100644 --- a/src/core/src/signature.rs +++ b/src/core/src/signature.rs @@ -19,6 +19,7 @@ use typed_builder::TypedBuilder; use crate::encodings::{aa_to_dayhoff, aa_to_hp, revcomp, to_aa, HashFunctions, VALID}; use crate::prelude::*; use crate::selection::{Select, 
Selection}; +use crate::sketch::minhash::KmerMinHash; use crate::sketch::Sketch; use crate::Error; use crate::HashIntoType; @@ -534,6 +535,39 @@ impl Signature { None } + // return single corresponding sketch + pub fn get_sketch(&self) -> Option<&Sketch> { + if self.signatures.len() != 1 { + if self.signatures.len() > 1 { + todo!("Multiple sketches found! Please run select first."); + } + return None; + } + self.signatures.iter().find(|sk| { + matches!( + sk, + Sketch::MinHash(_) | Sketch::LargeMinHash(_) | Sketch::HyperLogLog(_) + ) + }) + } + + // return minhash directly + pub fn minhash(&self) -> Option<&KmerMinHash> { + if self.signatures.len() != 1 { + if self.signatures.len() > 1 { + todo!("Multiple sketches found! Please run select first."); + } + return None; + } + self.signatures.iter().find_map(|sk| { + if let Sketch::MinHash(mh) = sk { + Some(mh) + } else { + None + } + }) + } + pub fn from_path>(path: P) -> Result, Error> { let mut reader = io::BufReader::new(File::open(path)?); Signature::from_reader(&mut reader) @@ -772,13 +806,16 @@ impl Select for Signature { valid }; // keep compatible scaled if applicable - if let Some(sel_scaled) = selection.scaled() { - valid = if let Sketch::MinHash(mh) = s { - valid && mh.scaled() <= sel_scaled as u64 - } else { - valid - }; - } + valid = if let Some(sel_scaled) = selection.scaled() { + match s { + Sketch::MinHash(mh) => valid && mh.scaled() <= sel_scaled as u64, + // TODO: test LargeMinHash + // Sketch::LargeMinHash(lmh) => valid && lmh.scaled() <= sel_scaled as u64, + _ => valid, // other sketch types or invalid cases + } + } else { + valid // if selection.scaled() is None, keep prior valid + }; /* valid = if let Some(abund) = selection.abund() { valid && *s.with_abundance() == abund @@ -798,6 +835,7 @@ impl Select for Signature { // downsample the retained sketches if needed. 
if let Some(sel_scaled) = selection.scaled() { for sketch in self.signatures.iter_mut() { + // TODO: also account for LargeMinHash if let Sketch::MinHash(mh) = sketch { if (mh.scaled() as u32) < sel_scaled { *sketch = Sketch::MinHash(mh.downsample_scaled(sel_scaled as u64)?); @@ -1002,6 +1040,95 @@ mod test { } } + #[test] + fn load_minhash_from_signature() { + let mut filename = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + filename.push("../../tests/test-data/47.fa.sig"); + + let file = File::open(filename).unwrap(); + let reader = BufReader::new(file); + let sigs: Vec = serde_json::from_reader(reader).expect("Loading error"); + + assert_eq!(sigs.len(), 1); + + let sig = sigs.get(0).unwrap(); + let mh = sig.minhash().unwrap(); + assert_eq!(mh.scaled(), 1000); + } + + #[test] + fn load_single_sketch_from_signature() { + let mut filename = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + filename.push("../../tests/test-data/47.fa.sig"); + + let file = File::open(filename).unwrap(); + let reader = BufReader::new(file); + let sigs: Vec = serde_json::from_reader(reader).expect("Loading error"); + + assert_eq!(sigs.len(), 1); + + let sig = sigs.get(0).unwrap(); + let mhdirect = sig.minhash().unwrap(); + let sketch = sig.get_sketch().unwrap(); + if let Sketch::MinHash(mh) = sketch { + assert_eq!(mh.scaled(), 1000); + assert_eq!(mhdirect, mh); // should be the same + } else { + // error + assert!(false); + } + } + + #[test] + #[should_panic] + fn get_sketch_multisketch_panic() { + let mut filename = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + filename.push("../../tests/test-data/47.fa.sig"); + + let file = File::open(filename).unwrap(); + let reader = BufReader::new(file); + let sigs: Vec = serde_json::from_reader(reader).expect("Loading error"); + + assert_eq!(sigs.len(), 1); + + let sig = sigs.get(0).unwrap(); + let mut mhdirect = sig.minhash().unwrap().clone(); + // change slightly and push into new_sig + mhdirect.add_sequence(b"ATGGA", false).unwrap(); + let new_sketch = 
Sketch::MinHash(mhdirect.clone()); + let mut new_sig = sig.clone(); + new_sig.push(new_sketch); + // check there are now two sketches in new_sig + assert_eq!(new_sig.signatures.len(), 2); + + let _ = new_sig.get_sketch(); + } + + #[test] + #[should_panic] + fn load_minhash_multisketch_panic() { + let mut filename = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + filename.push("../../tests/test-data/47.fa.sig"); + + let file = File::open(filename).unwrap(); + let reader = BufReader::new(file); + let sigs: Vec = serde_json::from_reader(reader).expect("Loading error"); + + assert_eq!(sigs.len(), 1); + + let sig = sigs.get(0).unwrap(); + let mut mhdirect = sig.minhash().unwrap().clone(); + // change slightly and push into new_sig + mhdirect.add_sequence(b"ATGGA", false).unwrap(); + let new_sketch = Sketch::MinHash(mhdirect.clone()); + let mut new_sig = sig.clone(); + new_sig.push(new_sketch); + // check there are now two sketches in new_sig + assert_eq!(new_sig.signatures.len(), 2); + + let _ = new_sig.minhash(); + } + #[test] fn selection_with_downsample() { let mut filename = PathBuf::from(env!("CARGO_MANIFEST_DIR"));