Skip to content

Commit

Permalink
fix fmt
Browse files: browse the repository at this point in the history
  • Loading branch information
ctb committed Aug 20, 2024
1 parent b1edd7a commit 5f9d00b
Showing 1 changed file with 90 additions and 100 deletions.
190 changes: 90 additions & 100 deletions src/fastmultigather.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,114 +69,95 @@ pub fn fastmultigather(
let skipped_paths = AtomicUsize::new(0);
let failed_paths = AtomicUsize::new(0);

#[rustfmt::skip]
query_collection
.par_iter()
.for_each(|(coll, _idx, record)| {
// increment counter of # of queries. q: could we instead use the _idx from par_iter(), or will it vary based on thread?
let _i = processed_queries.fetch_add(1, atomic::Ordering::SeqCst);
// Load query sig (downsampling happens here)
match coll.sig_from_record(record) {
Ok(query_sig) => {
let name = query_sig.name();
let prefix = name.split(' ').next().unwrap_or_default().to_string();
let location = PathBuf::new(&prefix).file_name().unwrap();
if let Some(query_mh) = query_sig.minhash() {
let mut matching_hashes =
if save_matches { Some(Vec::new()) } else { None };
let matchlist: BinaryHeap<PrefetchResult> = against
.iter()
.filter_map(|against| {
let mut mm: Option<PrefetchResult> = None;
if let Ok(overlap) = against.minhash.count_common(query_mh, false) {
if overlap >= threshold_hashes {
if save_matches {
if let Ok(intersection) =
against.minhash.intersection(query_mh)
{
matching_hashes
.as_mut()
.unwrap()
.extend(intersection.0);
}
query_collection.par_iter().for_each(|(c, _idx, record)| {
// increment counter of # of queries. q: could we instead use the _idx from par_iter(), or will it vary based on thread?
let _i = processed_queries.fetch_add(1, atomic::Ordering::SeqCst);
// Load query sig (downsampling happens here)
match c.sig_from_record(record) {
Ok(query_sig) => {
let name = query_sig.name();
let prefix = name.split(' ').next().unwrap_or_default().to_string();
let location = PathBuf::new(&prefix).file_name().unwrap();
if let Some(query_mh) = query_sig.minhash() {
let mut matching_hashes = if save_matches { Some(Vec::new()) } else { None };
let matchlist: BinaryHeap<PrefetchResult> = against
.iter()
.filter_map(|against| {
let mut mm: Option<PrefetchResult> = None;
if let Ok(overlap) = against.minhash.count_common(query_mh, false) {
if overlap >= threshold_hashes {
if save_matches {
if let Ok(intersection) =
against.minhash.intersection(query_mh)
{
matching_hashes
.as_mut()
.unwrap()
.extend(intersection.0);
}
let result = PrefetchResult {
name: against.name.clone(),
md5sum: against.md5sum.clone(),
minhash: against.minhash.clone(),
location: against.location.clone(),
overlap,
};
mm = Some(result);
}
let result = PrefetchResult {
name: against.name.clone(),
md5sum: against.md5sum.clone(),
minhash: against.minhash.clone(),
location: against.location.clone(),
overlap,
};
mm = Some(result);
}
mm
})
.collect();
if !matchlist.is_empty() {
let prefetch_output = format!("{}.prefetch.csv", location);
let gather_output = format!("{}.gather.csv", location);

// Save initial list of matches to prefetch output
write_prefetch(&query_sig, Some(prefetch_output), &matchlist).ok();

// Now, do the gather!
consume_query_by_gather(
query_sig.clone(),
scaled as u64,
matchlist,
threshold_hashes,
Some(gather_output),
)
.ok();

// Save matching hashes to .sig file if save_matches is true
if save_matches {
if let Some(hashes) = matching_hashes {
let sig_filename = format!("{}.matches.sig", name);
if let Ok(mut file) = File::create(&sig_filename) {
let unique_hashes: HashSet<u64> =
hashes.into_iter().collect();
let mut new_mh = KmerMinHash::new(
query_mh.scaled().try_into().unwrap(),
query_mh.ksize().try_into().unwrap(),
query_mh.hash_function().clone(),
query_mh.seed(),
false,
query_mh.num(),
);
new_mh
.add_many(
&unique_hashes.into_iter().collect::<Vec<_>>(),
)
.ok();
let mut signature = Signature::default();
signature.push(Sketch::MinHash(new_mh));
signature.set_filename(&name);
if let Err(e) = signature.to_writer(&mut file) {
eprintln!("Error writing signature file: {}", e);
}
} else {
eprintln!(
"Error creating signature file: {}",
sig_filename
);
}
mm
})
.collect();
if !matchlist.is_empty() {
let prefetch_output = format!("{}.prefetch.csv", location);
let gather_output = format!("{}.gather.csv", location);

// Save initial list of matches to prefetch output
write_prefetch(&query_sig, Some(prefetch_output), &matchlist).ok();

// Now, do the gather!
consume_query_by_gather(
query_sig.clone(),
scaled as u64,
matchlist,
threshold_hashes,
Some(gather_output),
)
.ok();

// Save matching hashes to .sig file if save_matches is true
if save_matches {
if let Some(hashes) = matching_hashes {
let sig_filename = format!("{}.matches.sig", name);
if let Ok(mut file) = File::create(&sig_filename) {
let unique_hashes: HashSet<u64> = hashes.into_iter().collect();
let mut new_mh = KmerMinHash::new(
query_mh.scaled().try_into().unwrap(),
query_mh.ksize().try_into().unwrap(),
query_mh.hash_function().clone(),
query_mh.seed(),
false,
query_mh.num(),
);
new_mh
.add_many(&unique_hashes.into_iter().collect::<Vec<_>>())
.ok();
let mut signature = Signature::default();
signature.push(Sketch::MinHash(new_mh));
signature.set_filename(&name);
if let Err(e) = signature.to_writer(&mut file) {
eprintln!("Error writing signature file: {}", e);
}
} else {
eprintln!("Error creating signature file: {}", sig_filename);
}
}
} else {
println!("No matches to '{}'", location);
}
} else {
// different warning here? Could not load sig from record??
eprintln!(
"WARNING: no compatible sketches in path '{}'",
record.internal_location()
);
let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst);
println!("No matches to '{}'", location);
}
}
Err(_) => {
} else {
// different warning here? Could not load sig from record??
eprintln!(
"WARNING: no compatible sketches in path '{}'",
Expand All @@ -185,7 +166,16 @@ pub fn fastmultigather(
let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst);
}
}
});
Err(_) => {
// different warning here? Could not load sig from record??
eprintln!(
"WARNING: no compatible sketches in path '{}'",
record.internal_location()
);
let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst);
}
}
});

println!(
"DONE. Processed {} queries total.",
Expand Down

0 comments on commit 5f9d00b

Please sign in to comment.