Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MRG: add error-exit in fastmultigather against rocksdb for failed gather #392

Merged
merged 2 commits into from
Jul 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/mastiff_manygather.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub fn mastiff_manygather(
let processed_sigs = AtomicUsize::new(0);
let skipped_paths = AtomicUsize::new(0);
let failed_paths = AtomicUsize::new(0);
let failed_gathers = AtomicUsize::new(0);

let send = query_collection
.par_iter()
Expand Down Expand Up @@ -122,6 +123,7 @@ pub fn mastiff_manygather(
}
} else {
eprintln!("Error gathering matches: {:?}", matches.err());
let _ = failed_gathers.fetch_add(1, atomic::Ordering::SeqCst);
}
} else {
eprintln!(
Expand All @@ -147,13 +149,17 @@ pub fn mastiff_manygather(
.flatten()
.try_for_each_with(send, |s, m| s.send(m));

let mut do_fail = false;

// do some cleanup and error handling -
if let Err(e) = send {
eprintln!("Unable to send internal data: {:?}", e);
do_fail = true;
}

if let Err(e) = thrd.join() {
eprintln!("Unable to join internal thread: {:?}", e);
do_fail = true;
}

// done!
Expand All @@ -162,6 +168,7 @@ pub fn mastiff_manygather(

let skipped_paths = skipped_paths.load(atomic::Ordering::SeqCst);
let failed_paths = failed_paths.load(atomic::Ordering::SeqCst);
let failed_gathers = failed_gathers.load(atomic::Ordering::SeqCst);

if skipped_paths > 0 {
eprintln!(
Expand All @@ -175,6 +182,17 @@ pub fn mastiff_manygather(
failed_paths
);
}
if failed_gathers > 0 {
eprintln!(
"ERROR: {} failed gathers. See error messages above.",
failed_gathers
);
do_fail = true;
}

if do_fail {
bail!("Unresolvable errors found; results cannot be trusted. Quitting.");
}

Ok(())
}
48 changes: 48 additions & 0 deletions src/python/tests/test_multigather.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
import pytest
import pandas
import shutil

import sourmash
from . import sourmash_tst_utils as utils
Expand Down Expand Up @@ -1142,3 +1143,50 @@ def test_nonindexed_full_vs_sourmash_gather(runtmp):
fmg_total_weighted_hashes= set(gather_df['total_weighted_hashes'])
g_total_weighted_hashes = set(sourmash_gather_df['total_weighted_hashes'])
assert fmg_total_weighted_hashes == g_total_weighted_hashes == set([73489])


def test_rocksdb_no_sigs(runtmp, capfd):
# make sure fastmultigather error-exits if a gather fails.
query = get_test_data('SRR606249.sig.gz')

sig2 = get_test_data('2.fa.sig.gz')
sig47 = get_test_data('47.fa.sig.gz')
sig63 = get_test_data('63.fa.sig.gz')
shutil.copyfile(sig2, runtmp.output('2.fa.sig.gz'))
shutil.copyfile(sig47, runtmp.output('47.fa.sig.gz'))
shutil.copyfile(sig63, runtmp.output('63.fa.sig.gz'))

query_list = runtmp.output('query.txt')
make_file_list(query_list, [query])
against_list = runtmp.output('against.txt')
make_file_list(against_list, ["2.fa.sig.gz",
"47.fa.sig.gz",
"63.fa.sig.gz"])

# index!
runtmp.sourmash('scripts', 'index', against_list,
'-o', 'subdir/against.rocksdb')

# remove the external storage out from under the rocksdb.
# this will make gather fail.
os.unlink(runtmp.output('2.fa.sig.gz'))
os.unlink(runtmp.output('47.fa.sig.gz'))
os.unlink(runtmp.output('63.fa.sig.gz'))

g_output = runtmp.output('zzz.csv')

with pytest.raises(utils.SourmashCommandFailed):
runtmp.sourmash('scripts', 'fastmultigather', query_list,
'subdir/against.rocksdb', '-s', '100000', '-t', '0',
'-o', g_output,
in_location=runtmp.output(''))

print(runtmp.last_result.out)
print(runtmp.last_result.err)

captured = capfd.readouterr()
print(captured.err)

assert "Error gathering matches:" in captured.err
assert "ERROR: 1 failed gathers. See error messages above." in captured.err
assert "Unresolvable errors found; results cannot be trusted. Quitting." in captured.err