Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MRG: fix gather memory usage issue by not accumulating GatherResult #2962

Merged
merged 6 commits into from
Jan 31, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 107 additions & 72 deletions src/sourmash/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import os.path
import sys
import shutil
import io

import screed
from .compare import (compare_all_pairs, compare_serial_containment,
Expand Down Expand Up @@ -829,7 +830,7 @@
## ok! now do gather -
notify("Doing gather to generate minimum metagenome cover.")

found = []
found = 0

Check warning on line 833 in src/sourmash/commands.py

View check run for this annotation

Codecov / codecov/patch

src/sourmash/commands.py#L833

Added line #L833 was not covered by tests
weighted_missed = 1
is_abundance = query.minhash.track_abundance and not args.ignore_abundance
orig_query_mh = query.minhash
Expand All @@ -845,39 +846,71 @@
screen_width = _get_screen_width()
sum_f_uniq_found = 0.
result = None
for result in gather_iter:
sum_f_uniq_found += result.f_unique_to_query

if not len(found): # first result? print header.
if is_abundance:
print_results("")
print_results("overlap p_query p_match avg_abund")
print_results("--------- ------- ------- ---------")
else:
print_results("")
print_results("overlap p_query p_match")
print_results("--------- ------- -------")
### open output handles as needed for (1) saving CSV (2) saving matches

# save matching signatures?
if args.save_matches:
notify(f"saving all matches to '{args.save_matches}'")
save_sig_obj = SaveSignaturesToLocation(args.save_matches)
save_sig = save_sig_obj.__enter__()

Check warning on line 856 in src/sourmash/commands.py

View check run for this annotation

Codecov / codecov/patch

src/sourmash/commands.py#L854-L856

Added lines #L854 - L856 were not covered by tests
else:
save_sig_obj = None
save_sig = None

Check warning on line 859 in src/sourmash/commands.py

View check run for this annotation

Codecov / codecov/patch

src/sourmash/commands.py#L858-L859

Added lines #L858 - L859 were not covered by tests

# save CSV?
csv_outfp = io.StringIO()
csv_writer = None

Check warning on line 863 in src/sourmash/commands.py

View check run for this annotation

Codecov / codecov/patch

src/sourmash/commands.py#L862-L863

Added lines #L862 - L863 were not covered by tests

# print interim result & save in `found` list for later use
pct_query = '{:.1f}%'.format(result.f_unique_weighted*100)
pct_genome = '{:.1f}%'.format(result.f_match*100)
try:

Check warning on line 865 in src/sourmash/commands.py

View check run for this annotation

Codecov / codecov/patch

src/sourmash/commands.py#L865

Added line #L865 was not covered by tests
for result in gather_iter:
found += 1
sum_f_uniq_found += result.f_unique_to_query

Check warning on line 868 in src/sourmash/commands.py

View check run for this annotation

Codecov / codecov/patch

src/sourmash/commands.py#L867-L868

Added lines #L867 - L868 were not covered by tests

if is_abundance:
name = result.match._display_name(screen_width - 41)
average_abund ='{:.1f}'.format(result.average_abund)
print_results('{:9} {:>7} {:>7} {:>9} {}',
format_bp(result.intersect_bp), pct_query, pct_genome,
average_abund, name)
else:
name = result.match._display_name(screen_width - 31)
print_results('{:9} {:>7} {:>7} {}',
format_bp(result.intersect_bp), pct_query, pct_genome,
name)
found.append(result)
if found == 1: # first result? print header.
if is_abundance:
print_results("")
print_results("overlap p_query p_match avg_abund")
print_results("--------- ------- ------- ---------")

Check warning on line 874 in src/sourmash/commands.py

View check run for this annotation

Codecov / codecov/patch

src/sourmash/commands.py#L872-L874

Added lines #L872 - L874 were not covered by tests
else:
print_results("")
print_results("overlap p_query p_match")
print_results("--------- ------- -------")

Check warning on line 878 in src/sourmash/commands.py

View check run for this annotation

Codecov / codecov/patch

src/sourmash/commands.py#L876-L878

Added lines #L876 - L878 were not covered by tests

if args.num_results and len(found) >= args.num_results:
break

# print interim result; `found` is a running count of matches, not a list
pct_query = '{:.1f}%'.format(result.f_unique_weighted*100)
pct_genome = '{:.1f}%'.format(result.f_match*100)

Check warning on line 883 in src/sourmash/commands.py

View check run for this annotation

Codecov / codecov/patch

src/sourmash/commands.py#L882-L883

Added lines #L882 - L883 were not covered by tests

if is_abundance:
name = result.match._display_name(screen_width - 41)
average_abund ='{:.1f}'.format(result.average_abund)
print_results('{:9} {:>7} {:>7} {:>9} {}',

Check warning on line 888 in src/sourmash/commands.py

View check run for this annotation

Codecov / codecov/patch

src/sourmash/commands.py#L886-L888

Added lines #L886 - L888 were not covered by tests
format_bp(result.intersect_bp), pct_query, pct_genome,
average_abund, name)
else:
name = result.match._display_name(screen_width - 31)
print_results('{:9} {:>7} {:>7} {}',

Check warning on line 893 in src/sourmash/commands.py

View check run for this annotation

Codecov / codecov/patch

src/sourmash/commands.py#L892-L893

Added lines #L892 - L893 were not covered by tests
format_bp(result.intersect_bp), pct_query, pct_genome,
name)

# write out CSV
if args.output:
if csv_writer is None:
csv_writer = result.init_dictwriter(csv_outfp)
result.write(csv_writer)

Check warning on line 901 in src/sourmash/commands.py

View check run for this annotation

Codecov / codecov/patch

src/sourmash/commands.py#L900-L901

Added lines #L900 - L901 were not covered by tests

# save matches?
if save_sig is not None:
save_sig.add(result.match)

Check warning on line 905 in src/sourmash/commands.py

View check run for this annotation

Codecov / codecov/patch

src/sourmash/commands.py#L905

Added line #L905 was not covered by tests

if args.num_results and found >= args.num_results:
break

Check warning on line 908 in src/sourmash/commands.py

View check run for this annotation

Codecov / codecov/patch

src/sourmash/commands.py#L908

Added line #L908 was not covered by tests
finally:
if save_sig_obj:
save_sig_obj.close()
save_sig_obj = None
save_sig = None

Check warning on line 913 in src/sourmash/commands.py

View check run for this annotation

Codecov / codecov/patch

src/sourmash/commands.py#L911-L913

Added lines #L911 - L913 were not covered by tests

# report on thresholding -
if gather_iter.query:
Expand All @@ -886,8 +919,8 @@

# basic reporting:
if found:
print_results(f'\nfound {len(found)} matches total;')
if len(found) == args.num_results:
print_results(f'\nfound {found} matches total;')

Check warning on line 922 in src/sourmash/commands.py

View check run for this annotation

Codecov / codecov/patch

src/sourmash/commands.py#L922

Added line #L922 was not covered by tests
if found == args.num_results:
print_results(f'(truncated gather because --num-results={args.num_results})')
else:
display_bp = format_bp(args.threshold_bp)
Expand All @@ -908,18 +941,7 @@
# save CSV?
if (found and args.output) or args.create_empty_results:
with FileOutputCSV(args.output) as fp:
w = None
for result in found:
if w is None:
w = result.init_dictwriter(fp)
result.write(w)

# save matching signatures?
if found and args.save_matches:
notify(f"saving all matches to '{args.save_matches}'")
with SaveSignaturesToLocation(args.save_matches) as save_sig:
for sr in found:
save_sig.add(sr.match)
fp.write(csv_outfp.getvalue())

Check warning on line 944 in src/sourmash/commands.py

View check run for this annotation

Codecov / codecov/patch

src/sourmash/commands.py#L944

Added line #L944 was not covered by tests

# save unassigned hashes?
if args.output_unassigned:
Expand Down Expand Up @@ -1027,7 +1049,7 @@
noident_mh.remove_many(union_found)
ident_mh.add_many(union_found)

found = []
found = 0

Check warning on line 1052 in src/sourmash/commands.py

View check run for this annotation

Codecov / codecov/patch

src/sourmash/commands.py#L1052

Added line #L1052 was not covered by tests
weighted_missed = 1
is_abundance = query.minhash.track_abundance and not args.ignore_abundance
orig_query_mh = query.minhash
Expand All @@ -1040,9 +1062,32 @@
screen_width = _get_screen_width()
sum_f_uniq_found = 0.
result = None

query_filename = query.filename

Check warning on line 1066 in src/sourmash/commands.py

View check run for this annotation

Codecov / codecov/patch

src/sourmash/commands.py#L1066

Added line #L1066 was not covered by tests
if not query_filename:
# use md5sum if query.filename not properly set
query_filename = query.md5sum()

Check warning on line 1069 in src/sourmash/commands.py

View check run for this annotation

Codecov / codecov/patch

src/sourmash/commands.py#L1069

Added line #L1069 was not covered by tests

output_base = os.path.basename(query_filename)

Check warning on line 1071 in src/sourmash/commands.py

View check run for this annotation

Codecov / codecov/patch

src/sourmash/commands.py#L1071

Added line #L1071 was not covered by tests
if args.output_dir:
output_base = os.path.join(args.output_dir, output_base)
output_csv = output_base + '.csv'

Check warning on line 1074 in src/sourmash/commands.py

View check run for this annotation

Codecov / codecov/patch

src/sourmash/commands.py#L1073-L1074

Added lines #L1073 - L1074 were not covered by tests

output_matches = output_base + '.matches.sig'
save_sig_obj = SaveSignaturesToLocation(output_matches)
save_sig = save_sig_obj.__enter__()
notify(f"saving all matching signatures to '{output_matches}'")

Check warning on line 1079 in src/sourmash/commands.py

View check run for this annotation

Codecov / codecov/patch

src/sourmash/commands.py#L1076-L1079

Added lines #L1076 - L1079 were not covered by tests

# track matches
notify(f'saving all CSV matches to "{output_csv}"')
csv_out_obj = FileOutputCSV(output_csv)
csv_outfp = csv_out_obj.__enter__()
csv_writer = None

Check warning on line 1085 in src/sourmash/commands.py

View check run for this annotation

Codecov / codecov/patch

src/sourmash/commands.py#L1082-L1085

Added lines #L1082 - L1085 were not covered by tests

for result in gather_iter:
found += 1

Check warning on line 1088 in src/sourmash/commands.py

View check run for this annotation

Codecov / codecov/patch

src/sourmash/commands.py#L1088

Added line #L1088 was not covered by tests
sum_f_uniq_found += result.f_unique_to_query
if not len(found): # first result? print header.
if found == 1: # first result? print header.
if is_abundance:
print_results("")
print_results("overlap p_query p_match avg_abund")
Expand All @@ -1068,7 +1113,13 @@
print_results('{:9} {:>7} {:>7} {}',
format_bp(result.intersect_bp), pct_query, pct_genome,
name)
found.append(result)

## @CTB: write this result to the CSV as soon as it is found
if csv_writer is None:
csv_writer = result.init_dictwriter(csv_outfp)
result.write(csv_writer)

Check warning on line 1120 in src/sourmash/commands.py

View check run for this annotation

Codecov / codecov/patch

src/sourmash/commands.py#L1119-L1120

Added lines #L1119 - L1120 were not covered by tests

save_sig.add(result.match)

Check warning on line 1122 in src/sourmash/commands.py

View check run for this annotation

Codecov / codecov/patch

src/sourmash/commands.py#L1122

Added line #L1122 was not covered by tests

# check for size estimation accuracy, which impacts ANI estimation
if not size_may_be_inaccurate and result.size_may_be_inaccurate:
Expand All @@ -1080,7 +1131,14 @@
notify(f'found less than {format_bp(args.threshold_bp)} in common. => exiting')

# basic reporting
print_results('\nfound {} matches total;', len(found))
print_results('\nfound {} matches total;', found)

Check warning on line 1134 in src/sourmash/commands.py

View check run for this annotation

Codecov / codecov/patch

src/sourmash/commands.py#L1134

Added line #L1134 was not covered by tests

# close the matches and CSV output handles
save_sig_obj.close()
save_sig_obj = save_sig = None

Check warning on line 1138 in src/sourmash/commands.py

View check run for this annotation

Codecov / codecov/patch

src/sourmash/commands.py#L1137-L1138

Added lines #L1137 - L1138 were not covered by tests

csv_out_obj.close()
csv_out_obj = csv_outfp = csv_writer = None

Check warning on line 1141 in src/sourmash/commands.py

View check run for this annotation

Codecov / codecov/patch

src/sourmash/commands.py#L1140-L1141

Added lines #L1140 - L1141 were not covered by tests

if is_abundance and result:
p_covered = result.sum_weighted_found / result.total_weighted_hashes
Expand All @@ -1090,33 +1148,10 @@
print_results(f'the recovered matches hit {sum_f_uniq_found*100:.1f}% of the query k-mers (unweighted).')
print_results('')

if not found:
if found == 0:
notify('nothing found... skipping.')
continue

query_filename = query.filename
if not query_filename:
# use md5sum if query.filename not properly set
query_filename = query.md5sum()

output_base = os.path.basename(query_filename)
if args.output_dir:
output_base = os.path.join(args.output_dir, output_base)
output_csv = output_base + '.csv'

notify(f'saving all CSV matches to "{output_csv}"')
w = None
with FileOutputCSV(output_csv) as fp:
for result in found:
if w is None:
w = result.init_dictwriter(fp)
result.write(w)

output_matches = output_base + '.matches.sig'
with SaveSignaturesToLocation(output_matches) as save_sig:
notify(f"saving all matching signatures to '{output_matches}'")
save_sig.add_many([ r.match for r in found ])

output_unassigned = output_base + '.unassigned.sig'
with open(output_unassigned, 'wt') as fp:
remaining_query = gather_iter.query
Expand All @@ -1129,7 +1164,7 @@
abund_query_mh = remaining_query.minhash.inflate(orig_query_mh)
remaining_query.minhash = abund_query_mh

if not found:
if found == 0:
notify('nothing found - entire query signature unassigned.')
elif not remaining_query:
notify('no unassigned hashes! not saving.')
Expand Down
Loading