make parallel/sequential more maintainable
luizirber committed Dec 15, 2020
1 parent 7228937 commit c52ab78
Showing 1 changed file with 154 additions and 160 deletions.
src/core/src/index/greyhound.rs (314 changes: 154 additions & 160 deletions)
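The change gates the parallel and sequential pipelines behind cfg_if! instead of keeping two hand-maintained copies: rayon's par_iter() is used when the "parallel" feature is enabled, a plain iter() otherwise. A minimal standalone sketch of that pattern follows; double_all and its u64 items are illustrative stand-ins, not sourmash code, and it assumes the cfg-if crate plus an optional rayon dependency behind a "parallel" feature, as in this repository.

use cfg_if::cfg_if;

cfg_if! {
    if #[cfg(feature = "parallel")] {
        use rayon::prelude::*;

        // Parallel variant: rayon's par_iter fans the map out across threads.
        fn double_all(items: &[u64]) -> Vec<u64> {
            items.par_iter().map(|x| x * 2).collect()
        }
    } else {
        // Sequential variant: the same pipeline on a plain iterator.
        fn double_all(items: &[u64]) -> Vec<u64> {
            items.iter().map(|x| x * 2).collect()
        }
    }
}

fn main() {
    assert_eq!(double_all(&[1, 2, 3]), vec![2, 4, 6]);
}

Note that the two branches still duplicate the pipeline body, just as the ref_sigs block below does; the larger maintainability win in this commit is extracting the shared map and reduce steps, sketched after the diff.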
@@ -2,6 +2,7 @@ use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};

use cfg_if::cfg_if;
use getset::{CopyGetters, Getters, Setters};
use log::info;
use nohash_hasher::BuildNoHashHasher;
@@ -74,17 +75,33 @@ impl RevIndex {
RevIndex::hash_to_color(&search_sigs, queries, merged_query, threshold, template);

// TODO: build this together with hash_to_idx?
// TODO: do a parallel (par_iter) and sequential (iter) version
let ref_sigs = if keep_sigs {
Some(
search_sigs
.par_iter()
.map(|ref_path| {
Signature::from_path(&ref_path)
.unwrap_or_else(|_| panic!("Error processing {:?}", ref_path))
.swap_remove(0)
})
.collect(),
)
cfg_if! {
if #[cfg(feature = "parallel")] {
Some(
search_sigs
.par_iter()
.map(|ref_path| {
Signature::from_path(&ref_path)
.unwrap_or_else(|_| panic!("Error processing {:?}", ref_path))
.swap_remove(0)
})
.collect(),
)
} else {
Some(
search_sigs
.iter()
.map(|ref_path| {
Signature::from_path(&ref_path)
.unwrap_or_else(|_| panic!("Error processing {:?}", ref_path))
.swap_remove(0)
})
.collect(),
)
}
}
} else {
None
};
@@ -108,7 +125,7 @@ impl RevIndex {
) -> (HashToColor, Colors) {
let processed_sigs = AtomicUsize::new(0);

let (hashes, mut colors) = search_sigs
search_sigs
.par_iter()
.enumerate()
.filter_map(|(dataset_id, filename)| {
@@ -117,52 +134,14 @@ impl RevIndex {
info!("Processed {} reference sigs", i);
}

let mut search_mh = None;
let search_sig = Signature::from_path(&filename)
.unwrap_or_else(|_| panic!("Error processing {:?}", filename))
.swap_remove(0);
if let Some(sketch) = search_sig.select_sketch(&template) {
if let Sketch::MinHash(mh) = sketch {
search_mh = Some(mh);
}
}
let search_mh = search_mh.unwrap();

let mut hash_to_color = HashToColor::with_hasher(BuildNoHashHasher::default());
let mut colors = Colors::default();
let color = colors.update(None, &[dataset_id as Idx]).unwrap();

let mut add_to = |matched_hashes: Vec<u64>, intersection| {
if !matched_hashes.is_empty() || intersection > threshold as u64 {
matched_hashes.into_iter().for_each(|hash| {
hash_to_color.insert(hash, color);
});
}
};

if let Some(qs) = queries {
if let Some(ref merged) = merged_query {
let (matched_hashes, intersection) =
merged.intersection(search_mh).unwrap();
add_to(matched_hashes, intersection);
} else {
for query in qs {
let (matched_hashes, intersection) =
query.intersection(search_mh).unwrap();
add_to(matched_hashes, intersection);
}
}
} else {
let matched = search_mh.mins();
let size = matched.len() as u64;
add_to(matched, size);
};

if hash_to_color.is_empty() {
None
} else {
Some((hash_to_color, colors))
}
RevIndex::map_hashes_colors(
dataset_id,
filename,
queries,
&merged_query,
threshold,
template,
)
})
.reduce(
|| {
@@ -171,122 +150,137 @@ impl RevIndex {
Colors::default(),
)
},
|a, b| {
let ((small_hashes, small_colors), (mut large_hashes, mut large_colors)) =
if a.0.len() > b.0.len() {
(b, a)
} else {
(a, b)
};

small_hashes.into_iter().for_each(|(hash, color)| {
let ids: Vec<_> = small_colors.indices(&color).cloned().collect();

let entry = large_hashes.entry(hash).or_insert_with(|| {
// In this case the hash was not present yet, so
// we need to recreate the same color from small_colors
// in large_colors.
let new_color = large_colors.update(None, ids.as_slice()).unwrap();
assert_eq!(new_color, color);
new_color
});

if *entry != color {
let new_color =
large_colors.update(Some(*entry), ids.as_slice()).unwrap();
*entry = new_color;
}
});

// Doing this outside reduce (at the end) would use more memory (since it
// accumulates unused colors), but wouldn't iterate over all
// hashes/colors so frequently. For now it stays here to
// save memory.
let used_colors: HashSet<_> = large_hashes.values().collect();
large_colors.retain(|color, _| used_colors.contains(color));

(large_hashes, large_colors)
},
);

(hashes, colors)
RevIndex::reduce_hashes_colors,
)
}

#[cfg(not(feature = "parallel"))]
fn hash_to_color(search_sigs: &[PathBuf], threshold: usize) -> HashToColor {
fn hash_to_color(
search_sigs: &[PathBuf],
queries: Option<&[KmerMinHash]>,
merged_query: Option<KmerMinHash>,
threshold: usize,
template: &Sketch,
) -> (HashToColor, Colors) {
let processed_sigs = AtomicUsize::new(0);

let hash_to_color = search_sigs
.par_iter()
search_sigs
.iter()
.enumerate()
.filter_map(|(dataset_id, filename)| {
let i = processed_sigs.fetch_add(1, Ordering::SeqCst);
if i % 1000 == 0 {
info!("Processed {} reference sigs", i);
}
RevIndex::map_hashes_colors(
dataset_id,
filename,
queries,
&merged_query,
threshold,
template,
)
})
.fold(
(
HashToColor::with_hasher(BuildNoHashHasher::default()),
Colors::default(),
),
RevIndex::reduce_hashes_colors,
)
}

let mut search_mh = None;
let search_sig = Signature::from_path(&filename)
.unwrap_or_else(|_| panic!("Error processing {:?}", filename))
.swap_remove(0);
if let Some(sketch) = search_sig.select_sketch(&template) {
if let Sketch::MinHash(mh) = sketch {
search_mh = Some(mh);
}
}
let search_mh = search_mh.unwrap();

let mut hash_to_idx = HashToIdx::with_hasher(BuildNoHashHasher::default());
let mut add_to = |matched_hashes: Vec<u64>, intersection| {
if !matched_hashes.is_empty() || intersection > threshold as u64 {
matched_hashes.into_iter().for_each(|hash| {
let mut dataset_ids = HashSet::new();
dataset_ids.insert(dataset_id);
hash_to_idx.insert(hash, dataset_ids);
});
}
};

if let Some(qs) = queries {
if let Some(ref merged) = merged_query {
let (matched_hashes, intersection) =
merged.intersection(search_mh).unwrap();
add_to(matched_hashes, intersection);
} else {
for query in qs {
let (matched_hashes, intersection) =
query.intersection(search_mh).unwrap();
add_to(matched_hashes, intersection);
}
}
} else {
let matched = search_mh.mins();
let size = matched.len() as u64;
add_to(matched, size);
};

if hash_to_idx.is_empty() {
None
} else {
Some(hash_to_idx)
fn map_hashes_colors(
dataset_id: usize,
filename: &PathBuf,
queries: Option<&[KmerMinHash]>,
merged_query: &Option<KmerMinHash>,
threshold: usize,
template: &Sketch,
) -> Option<(HashToColor, Colors)> {
let mut search_mh = None;
let search_sig = Signature::from_path(&filename)
.unwrap_or_else(|_| panic!("Error processing {:?}", filename))
.swap_remove(0);
if let Some(sketch) = search_sig.select_sketch(&template) {
if let Sketch::MinHash(mh) = sketch {
search_mh = Some(mh);
}
}
let search_mh = search_mh.unwrap();

let mut hash_to_color = HashToColor::with_hasher(BuildNoHashHasher::default());
let mut colors = Colors::default();
let color = colors.update(None, &[dataset_id as Idx]).unwrap();

let mut add_to = |matched_hashes: Vec<u64>, intersection| {
if !matched_hashes.is_empty() || intersection > threshold as u64 {
matched_hashes.into_iter().for_each(|hash| {
hash_to_color.insert(hash, color);
});
}
};

if let Some(qs) = queries {
if let Some(ref merged) = merged_query {
let (matched_hashes, intersection) = merged.intersection(search_mh).unwrap();
add_to(matched_hashes, intersection);
} else {
for query in qs {
let (matched_hashes, intersection) = query.intersection(search_mh).unwrap();
add_to(matched_hashes, intersection);
}
})
.reduce(
|| HashToIdx::with_hasher(BuildNoHashHasher::default()),
|a, b| {
let (small, mut large) = if a.len() > b.len() { (b, a) } else { (a, b) };

small.into_iter().for_each(|(hash, ids)| {
let entry = large.entry(hash).or_insert_with(HashSet::new);
for id in ids {
entry.insert(id);
}
});

large
},
);
hash_to_color
}
} else {
let matched = search_mh.mins();
let size = matched.len() as u64;
add_to(matched, size);
};

if hash_to_color.is_empty() {
None
} else {
Some((hash_to_color, colors))
}
}

fn reduce_hashes_colors(
a: (HashToColor, Colors),
b: (HashToColor, Colors),
) -> (HashToColor, Colors) {
let ((small_hashes, small_colors), (mut large_hashes, mut large_colors)) =
if a.0.len() > b.0.len() {
(b, a)
} else {
(a, b)
};

small_hashes.into_iter().for_each(|(hash, color)| {
let ids: Vec<_> = small_colors.indices(&color).cloned().collect();

let entry = large_hashes.entry(hash).or_insert_with(|| {
// In this case the hash was not present yet, so
// we need to recreate the same color from small_colors
// in large_colors.
let new_color = large_colors.update(None, ids.as_slice()).unwrap();
assert_eq!(new_color, color);
new_color
});

if *entry != color {
let new_color = large_colors.update(Some(*entry), ids.as_slice()).unwrap();
*entry = new_color;
}
});

// Doing this outside reduce (at the end) would use more memory (since it
// accumulates unused colors), but wouldn't iterate over all
// hashes/colors so frequently. For now it stays here to
// save memory.
let used_colors: HashSet<_> = large_hashes.values().collect();
large_colors.retain(|color, _| used_colors.contains(color));

(large_hashes, large_colors)
}

pub fn search(
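The heart of the refactor is that both hash_to_color variants now delegate to the same two helpers: map_hashes_colors for the per-signature work and reduce_hashes_colors for merging partial results, so the parallel path reads par_iter().filter_map(...).reduce(...) while the sequential path reads iter().filter_map(...).fold(...). A standalone sketch of that shape follows, using toy word-count types rather than sourmash's; map_one, merge, count, and the "parallel" feature wiring are illustrative assumptions, not this crate's API.

use std::collections::HashMap;

#[cfg(feature = "parallel")]
use rayon::prelude::*;

type Counts = HashMap<String, usize>;

// Shared "map" step: per-item work; returning None filters the item out,
// like map_hashes_colors returning None when nothing matched.
fn map_one(line: &str) -> Option<Counts> {
    if line.is_empty() {
        return None;
    }
    let mut counts = Counts::new();
    for word in line.split_whitespace() {
        *counts.entry(word.to_string()).or_insert(0) += 1;
    }
    Some(counts)
}

// Shared "reduce" step: always merge the smaller map into the larger one,
// mirroring the small/large swap in reduce_hashes_colors.
fn merge(a: Counts, b: Counts) -> Counts {
    let (small, mut large) = if a.len() > b.len() { (b, a) } else { (a, b) };
    for (word, n) in small {
        *large.entry(word).or_insert(0) += n;
    }
    large
}

fn count(lines: &[&str]) -> Counts {
    // rayon's reduce takes an identity closure it can call once per thread;
    // the sequential fold takes a single starting value instead.
    #[cfg(feature = "parallel")]
    let result = lines.par_iter().filter_map(|l| map_one(l)).reduce(Counts::new, merge);

    #[cfg(not(feature = "parallel"))]
    let result = lines.iter().filter_map(|l| map_one(l)).fold(Counts::new(), merge);

    result
}

Because merge is associative, rayon is free to combine partial maps in any order, which is the property the commit's reduce_hashes_colors relies on as well.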
