Skip to content

Commit

Permalink
avoid too much duplication
Browse files Browse the repository at this point in the history
  • Loading branch information
luizirber committed Feb 14, 2022
1 parent febee1a commit f016123
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 187 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
def build_native(spec):
cmd = ["cargo", "build",
"--manifest-path", "src/core/Cargo.toml",
"--features", "parallel",
# "--features", "parallel",
"--lib"]

target = "debug"
Expand Down
283 changes: 97 additions & 186 deletions src/core/src/index/revindex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};

use cfg_if::cfg_if;
use getset::{CopyGetters, Getters, Setters};
use log::{debug, info};
use nohash_hasher::BuildNoHashHasher;
Expand Down Expand Up @@ -163,107 +162,65 @@ impl RevIndex {
keep_sigs: bool,
) -> RevIndex {
// If threshold is zero, let's merge all queries and save time later
let merged_query = if let Some(qs) = queries {
if threshold == 0 {
let mut merged = qs[0].clone();
for query in &qs[1..] {
merged.merge(query).unwrap();
}
Some(merged)
} else {
None
}
} else {
None
};
let merged_query = queries.and_then(|qs| Self::merge_queries(qs, threshold));

let processed_sigs = AtomicUsize::new(0);

cfg_if! {
if #[cfg(feature = "parallel")] {
let (hash_to_color, colors) = search_sigs
.par_iter()
.enumerate()
.filter_map(|(dataset_id, filename)| {
let i = processed_sigs.fetch_add(1, Ordering::SeqCst);
if i % 1000 == 0 {
info!("Processed {} reference sigs", i);
}
#[cfg(feature = "parallel")]
let sig_iter = search_sigs.par_iter();

RevIndex::map_hashes_colors(
dataset_id,
filename,
queries,
&merged_query,
threshold,
template,
)
})
.reduce(
|| {
(
HashToColor::new(),
Colors::default(),
)
},
HashToColor::reduce_hashes_colors,
);
} else {
let (hash_to_color, colors) = search_sigs
.iter()
.enumerate()
.filter_map(|(dataset_id, filename)| {
let i = processed_sigs.fetch_add(1, Ordering::SeqCst);
if i % 1000 == 0 {
info!("Processed {} reference sigs", i);
}
RevIndex::map_hashes_colors(
dataset_id,
filename,
queries,
&merged_query,
threshold,
template,
)
})
.fold(
(
HashToColor::new(),
Colors::default(),
),
HashToColor::reduce_hashes_colors,
);
}
}
#[cfg(not(feature = "parallel"))]
let sig_iter = search_sigs.iter();

let filtered_sigs = sig_iter.enumerate().filter_map(|(dataset_id, filename)| {
let i = processed_sigs.fetch_add(1, Ordering::SeqCst);
if i % 1000 == 0 {
info!("Processed {} reference sigs", i);
}

let search_sig = Signature::from_path(&filename)
.unwrap_or_else(|_| panic!("Error processing {:?}", filename))
.swap_remove(0);

RevIndex::map_hashes_colors(
dataset_id,
&search_sig,
queries,
&merged_query,
threshold,
template,
)
});

#[cfg(feature = "parallel")]
let (hash_to_color, colors) = filtered_sigs.reduce(
|| (HashToColor::new(), Colors::default()),
HashToColor::reduce_hashes_colors,
);

#[cfg(not(feature = "parallel"))]
let (hash_to_color, colors) = filtered_sigs.fold(
(HashToColor::new(), Colors::default()),
HashToColor::reduce_hashes_colors,
);

// TODO: build this together with hash_to_idx?
// TODO: do a parallel (par_iter) and sequential (iter) version
let ref_sigs = if keep_sigs {
cfg_if! {
if #[cfg(feature = "parallel")] {
Some(
search_sigs
.par_iter()
.map(|ref_path| {
Signature::from_path(&ref_path)
.unwrap_or_else(|_| panic!("Error processing {:?}", ref_path))
.swap_remove(0)
})
.collect(),
)
} else {
Some(
search_sigs
.iter()
.map(|ref_path| {
Signature::from_path(&ref_path)
.unwrap_or_else(|_| panic!("Error processing {:?}", ref_path))
.swap_remove(0)
})
.collect(),
)
}
}
#[cfg(feature = "parallel")]
let sigs_iter = search_sigs.par_iter();

#[cfg(not(feature = "parallel"))]
let sigs_iter = search_sigs.iter();

Some(
sigs_iter
.map(|ref_path| {
Signature::from_path(&ref_path)
.unwrap_or_else(|_| panic!("Error processing {:?}", ref_path))
.swap_remove(0)
})
.collect(),
)
} else {
None
};
Expand All @@ -278,85 +235,61 @@ impl RevIndex {
}
}

/// Merge all query sketches into a single one when `threshold` is zero.
///
/// With a zero containment threshold every query hash matters equally, so a
/// single merged sketch lets later lookups intersect against one MinHash
/// instead of iterating all queries. For a non-zero threshold the individual
/// queries must be kept separate, so `None` is returned.
///
/// Returns `None` when `threshold != 0` or when `qs` is empty (the original
/// indexed `qs[0]` directly and would panic on an empty slice).
fn merge_queries(qs: &[KmerMinHash], threshold: usize) -> Option<KmerMinHash> {
    if threshold != 0 {
        return None;
    }
    // `split_first` yields None on an empty slice, avoiding the qs[0] panic.
    let (first, rest) = qs.split_first()?;
    let mut merged = first.clone();
    for query in rest {
        // NOTE(review): merge failure (e.g. incompatible sketch params) still
        // panics via unwrap, matching the original behavior — confirm callers
        // guarantee compatible sketches.
        merged.merge(query).unwrap();
    }
    Some(merged)
}

pub fn new_with_sigs(
search_sigs: Vec<Signature>,
template: &Sketch,
threshold: usize,
queries: Option<&[KmerMinHash]>,
) -> RevIndex {
// If threshold is zero, let's merge all queries and save time later
let merged_query = if let Some(qs) = queries {
if threshold == 0 {
let mut merged = qs[0].clone();
for query in &qs[1..] {
merged.merge(query).unwrap();
}
Some(merged)
} else {
None
}
} else {
None
};
let merged_query = queries.and_then(|qs| Self::merge_queries(qs, threshold));

let processed_sigs = AtomicUsize::new(0);

cfg_if! {
if #[cfg(feature = "parallel")] {
let (hash_to_color, colors) = search_sigs
.par_iter()
.enumerate()
.filter_map(|(dataset_id, sig)| {
let i = processed_sigs.fetch_add(1, Ordering::SeqCst);
if i % 1000 == 0 {
info!("Processed {} reference sigs", i);
}
#[cfg(feature = "parallel")]
let sigs_iter = search_sigs.par_iter();
#[cfg(not(feature = "parallel"))]
let sigs_iter = search_sigs.iter();

RevIndex::map_hashes_colors_sigs(
dataset_id,
sig,
queries,
&merged_query,
threshold,
template,
)
})
.reduce(
|| {
(
HashToColor::new(),
Colors::default(),
)
},
HashToColor::reduce_hashes_colors,
);
} else {
let (hash_to_color, colors) = search_sigs
.iter()
.enumerate()
.filter_map(|(dataset_id, filename)| {
let i = processed_sigs.fetch_add(1, Ordering::SeqCst);
if i % 1000 == 0 {
info!("Processed {} reference sigs", i);
}
RevIndex::map_hashes_colors_sigs(
dataset_id,
filename,
queries,
&merged_query,
threshold,
template,
)
})
.fold(
(
HashToColor::new(),
Colors::default(),
),
HashToColor::reduce_hashes_colors,
);
}
}
let filtered_sigs = sigs_iter.enumerate().filter_map(|(dataset_id, sig)| {
let i = processed_sigs.fetch_add(1, Ordering::SeqCst);
if i % 1000 == 0 {
info!("Processed {} reference sigs", i);
}

RevIndex::map_hashes_colors(
dataset_id,
sig,
queries,
&merged_query,
threshold,
template,
)
});

#[cfg(feature = "parallel")]
let (hash_to_color, colors) = filtered_sigs.reduce(
|| (HashToColor::new(), Colors::default()),
HashToColor::reduce_hashes_colors,
);

#[cfg(not(feature = "parallel"))]
let (hash_to_color, colors) = filtered_sigs.fold(
(HashToColor::new(), Colors::default()),
HashToColor::reduce_hashes_colors,
);

RevIndex {
hash_to_color,
Expand All @@ -368,7 +301,7 @@ impl RevIndex {
}
}

fn map_hashes_colors_sigs(
fn map_hashes_colors(
dataset_id: usize,
search_sig: &Signature,
queries: Option<&[KmerMinHash]>,
Expand Down Expand Up @@ -414,28 +347,6 @@ impl RevIndex {
}
}

/// Load the signature stored at `filename` and feed it to the in-memory
/// hash/color mapper.
///
/// Panics if the file cannot be read or parsed (same contract as the rest of
/// the index-construction path, which treats unreadable reference sigs as
/// fatal).
fn map_hashes_colors(
    dataset_id: usize,
    filename: &Path,
    queries: Option<&[KmerMinHash]>,
    merged_query: &Option<KmerMinHash>,
    threshold: usize,
    template: &Sketch,
) -> Option<(HashToColor, Colors)> {
    // Parse every signature in the file, then take the first one.
    // swap_remove(0) hands us an owned Signature without shifting the rest.
    let mut sigs = Signature::from_path(&filename)
        .unwrap_or_else(|_| panic!("Error processing {:?}", filename));
    let search_sig = sigs.swap_remove(0);

    // Delegate the actual hash -> color bookkeeping to the sig-based variant.
    RevIndex::map_hashes_colors_sigs(
        dataset_id,
        &search_sig,
        queries,
        merged_query,
        threshold,
        template,
    )
}

pub fn search(
&self,
counter: SigCounter,
Expand Down

0 comments on commit f016123

Please sign in to comment.