diff --git a/src/sourmash/tax/__main__.py b/src/sourmash/tax/__main__.py index fd85134690..bfaa6afca8 100644 --- a/src/sourmash/tax/__main__.py +++ b/src/sourmash/tax/__main__.py @@ -13,7 +13,7 @@ from sourmash.lca.lca_utils import display_lineage, zip_lineage from . import tax_utils -from .tax_utils import ClassificationResult, MultiLineageDB +from .tax_utils import ClassInf, MultiLineageDB usage=''' sourmash taxonomy [] - manipulate/work with taxonomy information. @@ -222,7 +222,7 @@ def genome(args): notify(f"WARNING: classifying query {sg.query_name} at desired rank {args.rank} does not meet containment threshold {args.containment_threshold}") else: status="match" - classif = ClassificationResult(sg.query_name, status, sg.rank, sg.fraction, sg.lineage, sg.query_md5, sg.query_filename, sg.f_weighted_at_rank, sg.bp_match_at_rank, sg.query_ani_at_rank) + classif = ClassInf(sg.query_name, status, sg.rank, sg.fraction, sg.lineage, sg.query_md5, sg.query_filename, sg.f_weighted_at_rank, sg.bp_match_at_rank, sg.query_ani_at_rank) classifications[args.rank].append(classif) matched_queries.add(sg.query_name) if "krona" in args.output_format: @@ -251,7 +251,7 @@ def genome(args): elif sg.fraction >= args.containment_threshold: status = "match" if status == "match": - classif = ClassificationResult(query_name=sg.query_name, status=status, rank=sg.rank, + classif = ClassInf(query_name=sg.query_name, status=status, rank=sg.rank, fraction=sg.fraction, lineage=sg.lineage, query_md5=sg.query_md5, query_filename=sg.query_filename, f_weighted_at_rank=sg.f_weighted_at_rank, bp_match_at_rank=sg.bp_match_at_rank, @@ -261,7 +261,7 @@ def genome(args): continue elif rank == "superkingdom" and status == "nomatch": status="below_threshold" - classif = ClassificationResult(query_name=sg.query_name, status=status, + classif = ClassInf(query_name=sg.query_name, status=status, rank="", fraction=0, lineage="", query_md5=sg.query_md5, query_filename=sg.query_filename, f_weighted_at_rank=sg.f_weighted_at_rank, bp_match_at_rank=sg.bp_match_at_rank, diff --git a/src/sourmash/tax/tax_utils.py b/src/sourmash/tax/tax_utils.py index f955a61c6f..2ff7a736b5 100644 --- a/src/sourmash/tax/tax_utils.py +++ b/src/sourmash/tax/tax_utils.py @@ -7,7 +7,7 @@ from collections import abc from itertools import zip_longest from typing import NamedTuple -from dataclasses import dataclass, field, replace +from dataclasses import dataclass, field, replace, asdict import gzip from sourmash import sqlite_utils, sourmash_args @@ -24,19 +24,22 @@ 'aggregate_by_lineage_at_rank', 'format_for_krona', 'write_krona', 'write_summary', 'write_classifications', 'combine_sumgather_csvs_by_lineage', 'write_lineage_sample_frac', - 'MultiLineageDB'] + 'MultiLineageDB', 'RankLineageInfo'] from sourmash.logging import notify from sourmash.sourmash_args import load_pathlist_from_file # CTB: these could probably usefully be converted into dataclasses. -QueryInfo = namedtuple("QueryInfo", "query_md5, query_filename, query_bp, query_hashes, total_weighted_hashes") -SummarizedGatherResult = namedtuple("SummarizedGatherResult", "query_name, rank, fraction, lineage, query_md5, query_filename, f_weighted_at_rank, bp_match_at_rank, query_ani_at_rank, total_weighted_hashes") -ClassificationResult = namedtuple("ClassificationResult", "query_name, status, rank, fraction, lineage, query_md5, query_filename, f_weighted_at_rank, bp_match_at_rank, query_ani_at_rank") +QInfo = namedtuple("QInfo", "query_md5, query_filename, query_bp, query_hashes, total_weighted_hashes") +SumGathInf = namedtuple("SumGathInf", "query_name, rank, fraction, lineage, query_md5, query_filename, f_weighted_at_rank, bp_match_at_rank, query_ani_at_rank, total_weighted_hashes") +ClassInf = namedtuple("ClassInf", "query_name, status, rank, fraction, lineage, query_md5, query_filename, f_weighted_at_rank, bp_match_at_rank, query_ani_at_rank") # Essential Gather column names that must be in gather_csv to allow `tax` summarization EssentialGatherColnames = ('query_name', 'name', 'f_unique_weighted', 'f_unique_to_query', 'unique_intersect_bp', 'remaining_bp', 'query_md5', 'query_filename') +RANKCODE = { "superkingdom": "D", "kingdom": "K", "phylum": "P", "class": "C", + "order": "O", "family":"F", "genus": "G", "species": "S", "unclassified": "U"} + # import lca utils as needed for now from sourmash.lca import lca_utils from sourmash.lca.lca_utils import (taxlist, display_lineage, pop_to_rank) @@ -528,7 +531,7 @@ def summarize_gather_at(rank, tax_assign, gather_results, *, skip_idents = [], query_bp = unique_intersect_bp + int(row['remaining_bp']) # store query info - query_info[query_name] = QueryInfo(query_md5=query_md5, query_filename=query_filename, query_bp=query_bp, query_hashes=query_nhashes, total_weighted_hashes=total_weighted_hashes) + query_info[query_name] = QInfo(query_md5=query_md5, query_filename=query_filename, query_bp=query_bp, query_hashes=query_nhashes, total_weighted_hashes=total_weighted_hashes) if estimate_query_ani and (not ksize or not scaled): if not set_ksize: @@ -562,7 +565,7 @@ def summarize_gather_at(rank, tax_assign, gather_results, *, skip_idents = [], sum_uniq_weighted[query_name][lineage] += f_uniq_weighted sum_uniq_bp[query_name][lineage] += unique_intersect_bp - # sort and store each as SummarizedGatherResult + # sort and store each as SumGathInf sum_uniq_to_query_sorted = [] for query_name, lineage_weights in sum_uniq_to_query.items(): qInfo = query_info[query_name] @@ -580,7 +583,7 @@ def summarize_gather_at(rank, tax_assign, gather_results, *, skip_idents = [], if estimate_query_ani: query_ani = containment_to_distance(fraction, ksize, scaled, n_unique_kmers= qInfo.query_hashes, sequence_len_bp= qInfo.query_bp).ani - sres = SummarizedGatherResult(query_name, rank, fraction, lineage, qInfo.query_md5, + sres = SumGathInf(query_name, rank, fraction, lineage, qInfo.query_md5, qInfo.query_filename, f_weighted_at_rank, bp_intersect_at_rank, query_ani, qInfo.total_weighted_hashes * scaled) sum_uniq_to_query_sorted.append(sres) else: @@ -601,7 +604,7 @@ def summarize_gather_at(rank, tax_assign, gather_results, *, skip_idents = [], if estimate_query_ani: query_ani = containment_to_distance(fraction, ksize, scaled, n_unique_kmers=qInfo.query_hashes, sequence_len_bp=qInfo.query_bp).ani - sres = SummarizedGatherResult(query_name, rank, fraction, lineage, query_md5, + sres = SumGathInf(query_name, rank, fraction, lineage, query_md5, query_filename, f_weighted_at_rank, bp_intersect_at_rank, query_ani, qInfo.total_weighted_hashes * scaled) sum_uniq_to_query_sorted.append(sres) @@ -612,7 +615,7 @@ def summarize_gather_at(rank, tax_assign, gather_results, *, skip_idents = [], if fraction > 0: f_weighted_at_rank = 1.0 - total_f_weighted bp_intersect_at_rank = qInfo.query_bp - total_bp_classified - sres = SummarizedGatherResult(query_name, rank, fraction, lineage, query_md5, + sres = SumGathInf(query_name, rank, fraction, lineage, query_md5, query_filename, f_weighted_at_rank, bp_intersect_at_rank, query_ani, qInfo.total_weighted_hashes*scaled) sum_uniq_to_query_sorted.append(sres) @@ -650,7 +653,7 @@ def make_krona_header(min_rank, *, include_strain=False): def aggregate_by_lineage_at_rank(rank_results, *, by_query=False): ''' - Aggregate list of rank SummarizedGatherResults, + Aggregate list of rank SumGathInfs, keeping query info or aggregating across queries. ''' lineage_summary = defaultdict(float) @@ -669,7 +672,7 @@ def aggregate_by_lineage_at_rank(rank_results, *, by_query=False): def format_for_krona(rank, summarized_gather): ''' - Aggregate list of SummarizedGatherResults and format for krona output + Aggregate list of SumGathInfs and format for krona output ''' num_queries=0 for res_rank, rank_results in summarized_gather.items(): @@ -719,7 +722,7 @@ def write_summary(summarized_gather, csv_fp, *, sep=',', limit_float_decimals=Fa ''' Write taxonomy-summarized gather results for each rank. ''' - header = SummarizedGatherResult._fields + header = SumGathInf._fields w = csv.DictWriter(csv_fp, header, delimiter=sep) w.writeheader() for rank, rank_results in summarized_gather.items(): @@ -771,9 +774,6 @@ def write_kreport(summarized_gather, csv_fp, *, sep='\t'): columns = ["percent_containment", "num_bp_contained", "num_bp_assigned", "rank_code", "ncbi_taxid", "sci_name"] w = csv.DictWriter(csv_fp, columns, delimiter=sep) - rankCode = { "superkingdom": "D", "kingdom": "K", "phylum": "P", "class": "C", - "order": "O", "family":"F", "genus": "G", "species": "S"} # , "": "U" - # check - are we using v4.5.0 or later gather CSVs? for rank, rank_results in summarized_gather.items(): for res in rank_results: @@ -782,7 +782,7 @@ def write_kreport(summarized_gather, csv_fp, *, sep='\t'): unclassified_written=False for rank, rank_results in summarized_gather.items(): - rcode = rankCode[rank] + rcode = RANKCODE[rank] for res in rank_results: # SummarizedGatherResults have an unclassified lineage at every rank, to facilitate reporting at a specific rank. # Here, we only need to report it once, since it will be the same fraction for all ranks @@ -811,7 +811,7 @@ def write_human_summary(summarized_gather, out_fp, display_rank): ''' Write human-readable taxonomy-summarized gather results for a specific rank. ''' - header = SummarizedGatherResult._fields + header = SumGathInf._fields found_ANI = False results = [] @@ -874,7 +874,7 @@ def write_classifications(classifications, csv_fp, *, sep=',', limit_float_decim ''' Write taxonomy-classifed gather results. ''' - header = ClassificationResult._fields + header = ClassInf._fields w = csv.DictWriter(csv_fp, header, delimiter=sep) w.writeheader() for rank, rank_results in classifications.items(): @@ -1482,3 +1482,519 @@ def load(cls, locations, **kwargs): tax_assign.add(this_tax_assign) return tax_assign + + +# strategy from: https://subscription.packtpub.com/book/programming/9781800207455/10/ch10lvl1sec01/using-dataclasses-to-simplify-working-with-csv-files +@dataclass +class GatherRow(): # all cols should match "gather_write_cols" in `search.py` + # essential columns + query_name: str + name: str # match_name + f_unique_weighted: float + f_unique_to_query: float + unique_intersect_bp: int + remaining_bp: int + query_md5: str + query_filename: str + # new essential cols: requires 4.4x + query_bp: int + ksize: int + scaled: int + + # non-essential + intersect_bp: int = None + f_orig_query: float = None + f_match: float = None + average_abund: float = None + median_abund: float = None + std_abund: float = None + filename: str = None + md5: str = None + f_match_orig: float = None + gather_result_rank: str = None + moltype: str = None + query_n_hashes: int = None + query_abundance: int = None + query_containment_ani: float = None + match_containment_ani: float = None + average_containment_ani: float = None + max_containment_ani: float = None + potential_false_negative: bool = None + n_unique_weighted_found: int = None + sum_weighted_found: int = None + total_weighted_hashes: int = None + + +@dataclass() +class QueryInfo(): + """Class for storing query information""" + query_name: str + query_md5: str + query_filename: str + query_bp: int + ksize: int + scaled: int + query_n_hashes: int = None + total_weighted_hashes: int = 0 + + def __post_init__(self): + "Initialize and cast types" + self.query_bp = int(self.query_bp) + self.ksize = int(self.ksize) + self.scaled = int(self.scaled) + self.query_n_hashes = int(self.query_n_hashes) if self.query_n_hashes else 0 + self.total_weighted_hashes = int(self.total_weighted_hashes) if self.total_weighted_hashes else 0 + + @property + def total_weighted_bp(self): + return self.total_weighted_hashes * self.scaled + +@dataclass +class TaxResult(): + raw: GatherRow + # can we get rid of these / just choose default ident hacking/slashing for future? + keep_full_identifiers: bool = False + keep_identifier_versions: bool = False + + query_name: str = field(init=False) + query_info: QueryInfo = field(init=False) + match_ident: str = field(init=False) + lineageInfo: RankLineageInfo = RankLineageInfo() + skipped_ident: bool = False + missed_ident: bool = False + match_lineage_attempted: bool = False + + def __post_init__(self): + self.get_ident() + self.query_name = self.raw.query_name # convenience + self.query_info = QueryInfo(query_name = self.raw.query_name, + query_md5=self.raw.query_md5, + query_filename = self.raw.query_filename, + query_bp = self.raw.query_bp, + query_n_hashes = self.raw.query_n_hashes, + total_weighted_hashes = self.raw.total_weighted_hashes, + ksize = self.raw.ksize, + scaled = self.raw.scaled + ) + # cast and store the imp bits + self.f_unique_to_query = float(self.raw.f_unique_to_query) + self.f_unique_weighted = float(self.raw.f_unique_weighted) + self.unique_intersect_bp = int(self.raw.unique_intersect_bp) + + def get_ident(self): + # split identifiers = split on whitespace + # keep identifiers = don't split .[12] from assembly accessions + "Hack and slash identifiers." + self.match_ident = self.raw.name + if not self.keep_full_identifiers: + self.match_ident = self.raw.name.split(' ')[0] + else: + #overrides version bc can't keep full without keeping version + self.keep_identifier_versions = True + if not self.keep_identifier_versions: + self.match_ident = self.match_ident.split('.')[0] + + + def get_match_lineage(self, tax_assignments, skip_idents=None, fail_on_missing_taxonomy=False): + if skip_idents and self.match_ident in skip_idents: + self.skipped_ident = True + else: + lin = tax_assignments.get(self.match_ident) + if lin: + self.lineageInfo = RankLineageInfo(lineage=lin) + else: + self.missed_ident=True + self.match_lineage_attempted = True + if self.missed_ident and fail_on_missing_taxonomy: + raise ValueError(f"Error: ident '{self.match_ident}' is not in the taxonomy database. Failing, as requested via --fail-on-missing-taxonomy") +# raise ValueError('Failing on missing taxonomy, as requested via --fail-on-missing-taxonomy.') + +@dataclass +class SummarizedGatherResult(): +# """Class for storing summarized lineage information""" + rank: str + fraction: float + lineage: RankLineageInfo + f_weighted_at_rank: float + bp_match_at_rank: int + query_ani_at_rank: float = None + + def __post_init__(self): + self.check_values() + + def check_values(self): + if any([self.fraction > 1, self.f_weighted_at_rank > 1]): + raise ValueError(f"Summarized fraction is > 100% of the query! This should not be possible. Please check that your input files come directly from a single gather run per query.") + # is this true for weighted too, or is that set to 0 when --ignore-abundance is used? + if any([self.fraction <=0, self.f_weighted_at_rank <= 0]): # this shouldn't actually happen, but it breaks ANI estimation, so let's check for it. + raise ValueError(f"Summarized fraction is <=0% of the query! This should not occur.") + + def set_query_ani(self, query_info): + self.query_ani_at_rank = containment_to_distance(self.fraction, query_info.ksize, query_info.scaled, + n_unique_kmers=query_info.query_n_hashes, + sequence_len_bp=query_info.query_bp).ani + + def as_summary_dict(self, query_info, limit_float=False): + sD = asdict(self) + if sD['lineage'] == (): # get rid of my by using blank RankLineageInfo() instead of () as empty lini? + sD['lineage'] = "unclassified" + else: + sD['lineage'] = self.lineage.display_lineage() # null_as_unclassified=True + sD['query_name'] = query_info.query_name + sD['query_md5'] = query_info.query_md5 + sD['query_filename'] = query_info.query_filename + sD['total_weighted_hashes'] = str(query_info.total_weighted_hashes) + sD['bp_match_at_rank'] = str(self.bp_match_at_rank) + if limit_float: + sD['fraction'] = f'{self.fraction:.3f}' + sD['f_weighted_at_rank'] = f'{self.f_weighted_at_rank:.3f}' + if self.query_ani_at_rank: + sD['query_ani_at_rank'] = f'{self.query_ani_at_rank:.3f}'#f"{self.query_ani_at_rank*100:>3.1f}%" + else: + sD['fraction'] = str(self.fraction) + sD['f_weighted_at_rank'] = str(self.f_weighted_at_rank) + + return(sD) + + def as_human_friendly_dict(self, query_info): + sD = self.as_summary_dict(query_info=query_info, limit_float=True) + sD['f_weighted_at_rank'] = f"{self.f_weighted_at_rank*100:>4.1f}%" + if self.query_ani_at_rank is not None: + sD['query_ani_at_rank'] = f"{self.query_ani_at_rank*100:>3.1f}%" + else: + sD['query_ani_at_rank'] = '- ' + return sD + + def as_kreport_dict(self, query_info): + lowest_assignment_rank = 'species' + sD = {} + sD['num_bp_assigned'] = str(0) + # total percent containment, weighted to include abundance info + sD['percent_containment'] = f'{self.f_weighted_at_rank * 100:.2f}' + sD["num_bp_contained"] = str(int(self.f_weighted_at_rank * query_info.total_weighted_hashes)) + # could make this cleaner if used empty RankLineageInfo() + #sD['lineage'] = self.lineage.display_lineage(null_as_unclassified=True) + if self.lineage != (): + this_rank = self.lineage.lowest_rank + sD['rank_code'] = RANKCODE[this_rank] + sD['sci_name'] = self.lineage.lowest_lineage_name + sD['ncbi_taxid'] = self.lineage.lowest_lineage_taxid + # the number of bp actually 'assigned' at this rank. Sourmash assigns everything + # at genome level, but since kreport traditionally doesn't include 'strain' or genome, + # it is reasonable to state that sourmash assigns at 'species' level for this. + # can be modified later. + if this_rank == lowest_assignment_rank: + sD["num_bp_assigned"] = sD["num_bp_contained"] + else: + sD['sci_name'] = 'unclassified' + sD['rank_code'] = RANKCODE['unclassified'] + sD["num_bp_assigned"] = sD["num_bp_contained"] + return sD + +@dataclass +class ClassificationResult(SummarizedGatherResult): +# """Class for storing summarized lineage information""" + status: str = field(init=False) + + def __post_init__(self): + # check for out of bounds values, default "nomatch" if no match at all + self.check_values() + self.status = 'nomatch' #None? + + def set_status(self, query_info, containment_threshold=None, ani_threshold=None): + # if any matches, use 'below_threshold' as default; set 'match' if meets threshold + if any([containment_threshold, ani_threshold]): + self.status="below_threshold" + self.set_query_ani(query_info=query_info) + if ani_threshold: # if provided, just use ani thresh, don't use containment threshold + if self.query_ani_at_rank >= ani_threshold: + self.status = 'match' + # should we switch to using weighted here? I think yes, but this would be behavior change + elif containment_threshold and self.fraction >= containment_threshold: + #elif containment_threshold and self.f_weighted_at_rank >= containment_threshold: + self.status = 'match' + + def build_krona_result(self, rank=None): + krona_classified, krona_unclassified = None, None + if rank is not None and rank == self.rank: + lin_as_list = self.lineage.display_lineage().split(';') + krona_classification = (self.f_weighted_at_rank, *lin_as_list) + krona_classified = (krona_classification) + # handle unclassified - do we want/need this? + unclassified_fraction= 1.0-self.f_weighted_at_rank + len_unclassified_lin = len(lin_as_list) + unclassifed_lin = ["unclassified"]*(len_unclassified_lin) + krona_unclassified = (unclassified_fraction, *unclassifed_lin) + return krona_classified, krona_unclassified + + +@dataclass +class QueryTaxResult(): + """Store all TaxResults for a query. Enable summarization.""" + query_info: QueryInfo # initialize with QueryInfo dataclass + + def __post_init__(self): + self.query_name = self.query_info.query_name # for convenience + self._init_taxresult_vars() + self._init_summarization_vars() + self._init_classification_results() + + def _init_taxresult_vars(self): + self.ranks = [] + self.raw_taxresults = [] + self.skipped_idents= set() + self.missed_idents = set() + self.n_missed = 0 + self.n_skipped = 0 + self.perfect_match = set() + + def _init_summarization_vars(self): + self.sum_uniq_weighted = defaultdict(lambda: defaultdict(float)) + self.sum_uniq_to_query = defaultdict(lambda: defaultdict(float)) + self.sum_uniq_bp = defaultdict(lambda: defaultdict(int)) + self.summarized_ranks = [] + self._init_summarization_results() + + def _init_summarization_results(self): + self.total_f_weighted = defaultdict(float) #0.0 + self.total_f_classified = defaultdict(float)#0.0 + self.total_bp_classified = defaultdict(int) #0 + self.summarized_lineage_results = defaultdict(list) + + def _init_classification_results(self): + self.status = 'nomatch' + self.classified_ranks = [] + self.classification_result = None + self.krona_classified = None + self.krona_unclassified = None + self.krona_header = [] + + def is_compatible(self, taxresult): + return taxresult.query_info == self.query_info + + @property + def ascending_ranks(self): + if not self.ranks: + return [] + else: + return self.ranks[::-1] + + def add_taxresult(self, taxresult): + # check that all query parameters match + if self.is_compatible(taxresult=taxresult): + if not taxresult.match_lineage_attempted: + raise ValueError("Error: Cannot add TaxResult. Please use get_match_lineage() to add taxonomic lineage information first.") + if not self.ranks: + self.ranks = taxresult.lineageInfo.ranks + if taxresult.skipped_ident: + self.n_skipped +=1 + self.skipped_idents.add(taxresult.match_ident) + elif taxresult.missed_ident: + self.n_missed +=1 + self.missed_idents.add(taxresult.match_ident) + self.raw_taxresults.append(taxresult) + else: + raise ValueError("Error: Cannot add TaxResult: query information does not match.") + + def summarize_up_ranks(self, single_rank=None, force_resummarize=False): + if self.summarized_ranks: # has already been summarized + if force_resummarize: + self._init_summarization_vars() + else: + raise ValueError("Error: already summarized using rank(s): '{', '.join(self.summarized_ranks)}'. Use 'force_resummarize=True' to reset and resummarize") + # set ranks levels to summarize + self.summarized_ranks = self.ascending_ranks + if single_rank: + if single_rank not in self.summarized_ranks: + raise ValueError(f"Error: rank '{single_rank}' not in available ranks ({', '.join(self.summarized_ranks)})") + self.summarized_ranks = [single_rank] + notify(f"Starting summarization up rank(s): {', '.join(self.summarized_ranks)} ") + for taxres in self.raw_taxresults: + lininfo = taxres.lineageInfo + if lininfo and lininfo.filled_lineage: # won't always have lineage to summarize (skipped idents, missed idents) + # notify + track perfect matches + if taxres.f_unique_to_query >= 1.0: + if taxres.match_ident not in self.perfect_match: + notify(f"WARNING: 100% match! Is query '{self.query_name}' identical to its database match, '{taxres.match_ident}'?") + self.perfect_match.add(taxres.match_ident) + # add this taxresult to summary + for rank in self.summarized_ranks: + if rank in lininfo.filled_ranks: # only store if this rank is filled. + lin_at_rank = lininfo.pop_to_rank(rank) + self.sum_uniq_weighted[rank][lin_at_rank] += taxres.f_unique_weighted + self.sum_uniq_to_query[rank][lin_at_rank] += taxres.f_unique_to_query + self.sum_uniq_bp[rank][lin_at_rank] += taxres.unique_intersect_bp + # reset ranks levels to the ones that were actually summarized + that we can access for summarized result + self.summarized_ranks = [x for x in self.summarized_ranks if x in self.sum_uniq_bp.keys()] + if single_rank and single_rank not in self.summarized_ranks: + raise ValueError(f"Error: rank '{single_rank}' was not available for any matching lineages.") + + def build_summarized_result(self, single_rank=None, force_resummarize=False): + # just reset if we've already built summarized result (avoid adding to existing)? Or write in an error/force option? + self._init_summarization_results() + # if taxresults haven't been summarized, do that first + if not self.summarized_ranks or force_resummarize: + self.summarize_up_ranks(single_rank=single_rank, force_resummarize=force_resummarize) + # catch potential error from running summarize_up_ranks separately and passing in different single_rank + if single_rank and single_rank not in self.summarized_ranks: + raise ValueError(f"Error: rank '{single_rank}' not in summarized rank(s), {','.join(self.summarized_ranks)}") + # rank loop is currently done in __main__ + for rank in self.summarized_ranks[::-1]: # reverse so that results are in descending order + sum_uniq_to_query = self.sum_uniq_to_query[rank] #should be lineage: value + # first, sort + sorted_sum_uniq_to_query = list(sum_uniq_to_query.items()) + sorted_sum_uniq_to_query.sort(key = lambda x: -x[1]) + for lineage, f_unique in sorted_sum_uniq_to_query: + # does this ever happen? do we need it? + if f_unique == 0: #no annotated results for this query. do we need to handle this differently now? + continue + f_weighted_at_rank = self.sum_uniq_weighted[rank][lineage] + bp_intersect_at_rank = self.sum_uniq_bp[rank][lineage] + sres = SummarizedGatherResult(lineage=lineage, rank=rank, + f_weighted_at_rank=f_weighted_at_rank, fraction=f_unique, + bp_match_at_rank=bp_intersect_at_rank) + sres.set_query_ani(query_info=self.query_info) + self.summarized_lineage_results[rank].append(sres) + + # NTP Note: These change by rank ONLY when doing best_only (selecting top hit at that particular rank) + # now that I pulled best_only into separate fn, these don't need to be dicts... + self.total_f_classified[rank] += f_unique + self.total_f_weighted[rank] += f_weighted_at_rank + self.total_bp_classified[rank] += bp_intersect_at_rank + + # record unclassified + lineage = () + query_ani = None + f_unique = 1.0 - self.total_f_classified[rank] + if f_unique > 0: + f_weighted_at_rank = 1.0 - self.total_f_weighted[rank] + bp_intersect_at_rank = self.query_info.query_bp - self.total_bp_classified[rank] + sres = SummarizedGatherResult(lineage=lineage, rank=rank, f_weighted_at_rank=f_weighted_at_rank, + fraction=f_unique, bp_match_at_rank=bp_intersect_at_rank, query_ani_at_rank=query_ani) + self.summarized_lineage_results[rank].append(sres) + + def build_classification_result(self, rank=None, ani_threshold=None, containment_threshold=0.1, force_resummarize=False): + if containment_threshold and not 0 <= containment_threshold <= 1: + raise ValueError(f"Containment threshold must be between 0 and 1 (input value: {containment_threshold}).") + if ani_threshold and not 0 <= ani_threshold <= 1: + raise ValueError(f"ANI threshold must be between 0 and 1 (input value: {ani_threshold}).") + self._init_classification_results() # init some fields + if not self.summarized_ranks or force_resummarize: + self.summarize_up_ranks(single_rank=rank, force_resummarize=force_resummarize) + # catch potential error from running summarize_up_ranks separately and passing in different single_rank + self.classified_ranks = self.summarized_ranks + # if a rank is provided, we need to classify ONLY using that rank + if rank: + if rank not in self.summarized_ranks: + raise ValueError(f"Error: rank '{rank}' not in summarized rank(s), {','.join(self.summarized_ranks)}") + else: + self.classified_ranks = [rank] + # CLASSIFY using summarization--> best only result. Best way = use ANI or containment threshold + for this_rank in self.classified_ranks: # ascending order or just single rank + # reset for this rank + f_weighted=0.0 + f_unique_at_rank=0.0 + bp_intersect_at_rank=0 + sum_uniq_to_query = self.sum_uniq_to_query[this_rank] + # sort the results and grab best + sorted_sum_uniq_to_query = list(sum_uniq_to_query.items()) + sorted_sum_uniq_to_query.sort(key = lambda x: -x[1]) + # select best-at-rank only + this_lineage, f_unique_at_rank = sorted_sum_uniq_to_query[0] + bp_intersect_at_rank = self.sum_uniq_bp[this_rank][this_lineage] + f_weighted = self.sum_uniq_weighted[this_rank][this_lineage] + + classif = ClassificationResult(rank=this_rank, fraction=f_unique_at_rank, lineage=this_lineage, + f_weighted_at_rank=f_weighted, bp_match_at_rank=bp_intersect_at_rank) + + classif.set_status(self.query_info, containment_threshold=containment_threshold, ani_threshold=ani_threshold) + # determine whether to move on to a higher tax rank (if avail) + if classif.status == 'match' or classif.status == "nomatch": # not sure we want/need the `nomatch` part... + break + + # store the final classification result + self.classification_result = classif + # could do this later, in __main__.py, for example + self.krona_classified, self.krona_unclassified = self.classification_result.build_krona_result(rank=rank) + self.krona_header = self.make_krona_header(min_rank = rank) + + def make_krona_header(self, min_rank): + "make header for krona output" + if min_rank is None: + return [] + if min_rank not in self.summarized_ranks: + raise ValueError(f"Rank '{min_rank}' not present in summarized ranks.") + else: + rank_index = self.ranks.index(min_rank) + return ["fraction"] + list(self.ranks[:rank_index+1]) + + def check_classification(self): + if not self.classification_result: + raise ValueError("query not classified yet.") + + def check_summarization(self): + if not self.summarized_lineage_results: + raise ValueError("lineages not summarized yet.") + + def make_human_summary(self, display_rank, classification=False): + results = [] + if classification: + self.check_classification() + display_rank_results = [self.classification_result] + else: + self.check_summarization() + display_rank_results = self.summarized_lineage_results[display_rank] + display_rank_results.sort(key=lambda res: -res.f_weighted_at_rank) + + for res in display_rank_results: + results.append(res.as_human_friendly_dict(query_info=self.query_info)) + return results + + def make_full_summary(self, classification=False, limit_float=False): + results = [] + if classification: + self.check_classification() + header= ["query_name", "status", "rank", "fraction", "lineage", + "query_md5", "query_filename", "f_weighted_at_rank", + "bp_match_at_rank", "query_ani_at_rank"] + rD = self.classification_result.as_summary_dict(query_info = self.query_info, limit_float=limit_float) + results.append(rD) + else: + self.check_summarization() + header= ["query_name", "rank", "fraction", "lineage", "query_md5", + "query_filename", "f_weighted_at_rank", "bp_match_at_rank", + "query_ani_at_rank", "total_weighted_hashes"] + + for rank in self.summarized_ranks[::-1]: + rank_results = self.summarized_lineage_results[rank] + rank_results.sort(key=lambda res: -res.f_weighted_at_rank) + for res in rank_results: + results.append(res.as_summary_dict(query_info=self.query_info, limit_float=limit_float)) + return header, results + + def make_kreport_results(self): + self.check_summarization() + if self.query_info.total_weighted_hashes == 0: + raise ValueError("ERROR: cannot produce 'kreport' format from gather results before sourmash v4.5.0") + required_ranks = set(RANKCODE.keys()) + acceptable_ranks = list(self.ranks) + ['unclassified', 'kingdom'] + if not required_ranks.issubset(set(acceptable_ranks)): + raise ValueError("ERROR: cannot produce 'kreport' format from ranks {', '.join(self.ranks)}") + kreport_results = [] + unclassified_recorded=False + # want to order results descending by rank + for rank in self.ranks: + if rank == 'strain': # no code for strain, can't include in this output afaik + continue + rank_results = self.summarized_lineage_results[rank] + for res in rank_results: + kresD = res.as_kreport_dict(self.query_info) + if kresD['sci_name'] == "unclassified": + # SummarizedGatherResults have an unclassified lineage at every rank, to facilitate reporting at a specific rank. + # Here, we only need to report it once, since it will be the same fraction for all ranks + if unclassified_recorded: + continue + else: + unclassified_recorded = True + kreport_results.append(kresD) + return(kreport_results) diff --git a/tests/test_tax_utils.py b/tests/test_tax_utils.py index 6061a561b0..832395252f 100644 --- a/tests/test_tax_utils.py +++ b/tests/test_tax_utils.py @@ -3,6 +3,7 @@ """ import pytest +from pytest import approx import os from os.path import basename import gzip @@ -14,15 +15,17 @@ write_summary, MultiLineageDB, collect_gather_csvs, check_and_load_gather_csvs, SummarizedGatherResult, ClassificationResult, + QueryInfo, GatherRow, TaxResult, QueryTaxResult, + BaseLineageInfo, RankLineageInfo, LineagePair, write_classifications, aggregate_by_lineage_at_rank, make_krona_header, format_for_krona, write_krona, combine_sumgather_csvs_by_lineage, write_lineage_sample_frac, - LineageDB, LineageDB_Sqlite) + LineageDB, LineageDB_Sqlite, + SumGathInf, ClassInf, QInfo) # import lca utils as needed for now from sourmash.lca import lca_utils -from sourmash.tax.tax_utils import LineagePair, BaseLineageInfo, RankLineageInfo # utility functions for testing def make_mini_gather_results(g_infolist, include_ksize_and_scaled=False): @@ -45,6 +48,68 @@ def make_mini_taxonomy(tax_info): return taxD +def make_GatherRow(gather_dict=None, exclude_cols=[]): + """Load artificial gather row (dict) into GatherRow class""" + # default contains just the essential cols + gatherD = {'query_name': 'q1', + 'query_md5': 'md5', + 'query_filename': 'query_fn', + 'name': 'gA', + 'f_unique_weighted': 0.2, + 'f_unique_to_query': 0.1, + 'query_bp':100, + 'unique_intersect_bp': 20, + 'remaining_bp': 1, + 'ksize': 31, + 'scaled': 1} + if gather_dict is not None: + gatherD.update(gather_dict) + for col in exclude_cols: + gatherD.pop(col) + gatherRaw = GatherRow(**gatherD) + return gatherRaw + + +def make_TaxResult(gather_dict=None, taxD=None, keep_full_ident=False, keep_ident_version=False, skip_idents=None): + """Make TaxResult from artificial gather row (dict)""" + gRow = make_GatherRow(gather_dict) + taxres = TaxResult(raw=gRow, keep_full_identifiers=keep_full_ident, keep_identifier_versions=keep_ident_version) + if taxD is not None: + taxres.get_match_lineage(tax_assignments=taxD, skip_idents=skip_idents) + return taxres + + +def make_QueryTaxResults(gather_info, taxD=None, single_query=False, keep_full_ident=False, keep_ident_version=False, + skip_idents=None, summarize=False, classify=False, classify_rank=None, c_thresh=0.1, ani_thresh=None): + """Make QueryTaxResult(s) from artificial gather information, formatted as list of gather rows (dicts)""" + gather_results = {} + this_querytaxres = None + for gather_infoD in gather_info: + taxres = make_TaxResult(gather_infoD, taxD=taxD, keep_full_ident=keep_full_ident, + keep_ident_version=keep_ident_version, skip_idents=skip_idents) + query_name = taxres.query_name + # add to matching QueryTaxResult or create new one + if not this_querytaxres or not this_querytaxres.is_compatible(taxres): + # get existing or initialize new + this_querytaxres = gather_results.get(query_name, QueryTaxResult(taxres.query_info)) + this_querytaxres.add_taxresult(taxres) +# print('missed_ident?', taxres.missed_ident) + gather_results[query_name] = this_querytaxres + if summarize: + for query_name, qres in gather_results.items(): + qres.build_summarized_result() + if classify: + for query_name, qres in gather_results.items(): + qres.build_classification_result(rank=classify_rank, containment_threshold=c_thresh, ani_threshold=ani_thresh) + # for convenience: If working with single query, just return that QueryTaxResult. + if single_query: + if len(gather_results.keys()) > 1: + raise ValueError("You passed in results for more than one query") + else: + return next(iter(gather_results.values())) + return gather_results + + ## tests def test_ascending_taxlist_1(): assert list(ascending_taxlist()) == ['strain', 'species', 'genus', 'family', 'order', 'class', 'phylum', 'superkingdom'] @@ -54,24 +119,271 @@ def test_ascending_taxlist_2(): assert list(ascending_taxlist(include_strain=False)) == ['species', 'genus', 'family', 'order', 'class', 'phylum', 'superkingdom'] +def test_QueryInfo_basic(): + "basic functionality of QueryInfo dataclass" + qInf = QueryInfo(query_name='q1', query_md5='md5', query_filename='f1',query_bp='100',query_n_hashes='10',ksize='31',scaled='10', total_weighted_hashes='200') + assert qInf.query_name == 'q1' + assert isinstance(qInf.query_n_hashes, int) + assert isinstance(qInf.ksize, int) + assert isinstance(qInf.scaled, int) + assert qInf.total_weighted_hashes == 200 + assert qInf.total_weighted_bp == 2000 + + +def test_QueryInfo_no_hash_info(): + "QueryInfo dataclass for older gather results without query_n_hashes or total_weighted_hashes" + qInf = QueryInfo(query_name='q1', query_md5='md5', query_filename='f1',query_bp='100',ksize=31,scaled=10) + assert qInf.query_name == 'q1' + assert qInf.query_n_hashes == 0 + assert qInf.total_weighted_hashes == 0 + assert qInf.total_weighted_bp == 0 + + +def test_QueryInfo_missing(): + "check that required args" + with pytest.raises(TypeError) as exc: + QueryInfo(query_name='q1', query_filename='f1',query_bp='100',query_n_hashes='10',ksize=31,scaled=10, total_weighted_hashes=200) + print(str(exc)) + assert "missing 1 required positional argument: 'query_md5'" in str(exc) + + +def test_SummarizedGatherResult(): + "basic functionality of SummarizedGatherResult dataclass" + qInf = QueryInfo(query_name='q1', query_md5='md5', query_filename='f1',query_bp='100', + query_n_hashes='10',ksize='31',scaled='10', total_weighted_hashes='200') + sgr = SummarizedGatherResult(rank="phylum", fraction=0.2, lineage=RankLineageInfo(lineage_str="a;b"), + f_weighted_at_rank=0.3, bp_match_at_rank=30) + print(sgr) + assert sgr.rank=='phylum' + sumD = sgr.as_summary_dict(query_info=qInf) + print(sumD) + assert sumD == {'rank': 'phylum', 'fraction': "0.2", 'lineage': 'a;b', 'f_weighted_at_rank': "0.3", + 'bp_match_at_rank': "30", 'query_ani_at_rank': None, 'query_name': 'q1', + 'query_md5': 'md5', 'query_filename': 'f1', 'total_weighted_hashes': "200"} + hD = sgr.as_human_friendly_dict(query_info=qInf) + print(hD) + assert hD == {'rank': 'phylum', 'fraction': '0.200', 'lineage': 'a;b', 'f_weighted_at_rank': '30.0%', + 'bp_match_at_rank': "30", 'query_ani_at_rank': '- ', 'query_name': 'q1', + 'query_md5': 'md5', 'query_filename': 'f1', 'total_weighted_hashes': "200"} + krD = sgr.as_kreport_dict(query_info=qInf) + print(krD) + assert krD == {'ncbi_taxid': None, 'sci_name': 'b', 'rank_code': 'P', 'num_bp_assigned': "0", + 'percent_containment': '30.00', 'num_bp_contained': "60"} + + +def test_SummarizedGatherResult_set_query_ani(): + "Check ANI estimation within SummarizedGatherResult dataclass" + qInf = QueryInfo(query_name='q1', query_md5='md5', query_filename='f1',query_bp='100', + query_n_hashes='10',ksize='31',scaled='10', total_weighted_hashes='200') + sgr = SummarizedGatherResult(rank="phylum", fraction=0.2, lineage=RankLineageInfo(lineage_str="a;b"), + f_weighted_at_rank=0.3, bp_match_at_rank=30) + sgr.set_query_ani(query_info=qInf) + print(sgr.query_ani_at_rank) + assert sgr.query_ani_at_rank == approx(0.949, rel=1e-3) + # ANI can be calculated with query_bp OR query_n_hashes. Remove each and check the results are identical + qInf = QueryInfo(query_name='q1', query_md5='md5', query_filename='f1',query_bp='100', + query_n_hashes=0,ksize='31',scaled='10', total_weighted_hashes='200') + sgr = SummarizedGatherResult(rank="phylum", fraction=0.2, lineage=RankLineageInfo(lineage_str="a;b"), + f_weighted_at_rank=0.3, bp_match_at_rank=30) + sgr.set_query_ani(query_info=qInf) + print(sgr.query_ani_at_rank) + assert sgr.query_ani_at_rank == approx(0.949, rel=1e-3) + # try without query_bp + qInf = QueryInfo(query_name='q1', query_md5='md5', query_filename='f1',query_bp=0, + query_n_hashes='10',ksize='31',scaled='10', total_weighted_hashes='200') + sgr = SummarizedGatherResult(rank="phylum", fraction=0.2, lineage=RankLineageInfo(lineage_str="a;b"), + f_weighted_at_rank=0.3, bp_match_at_rank=30) + sgr.set_query_ani(query_info=qInf) + print(sgr.query_ani_at_rank) + assert sgr.query_ani_at_rank == approx(0.949, rel=1e-3) + + +def test_SummarizedGatherResult_greater_than_1(): + "basic functionality of SummarizedGatherResult dataclass" + # fraction > 1 + with pytest.raises(ValueError) as exc: + SummarizedGatherResult(rank="phylum", fraction=0.3, lineage=RankLineageInfo(lineage_str="a;b"), + f_weighted_at_rank=1.2, bp_match_at_rank=30) + print(str(exc)) + assert "> 100% of the query!" in str(exc) + # f_weighted > 1 + with pytest.raises(ValueError) as exc: + SummarizedGatherResult(rank="phylum", fraction=1.2, lineage=RankLineageInfo(lineage_str="a;b"), + f_weighted_at_rank=0.3, bp_match_at_rank=30) + print(str(exc)) + assert "> 100% of the query!" in str(exc) + + +def test_SummarizedGatherResult_0_fraction(): + with pytest.raises(ValueError) as exc: + SummarizedGatherResult(rank="phylum", fraction=-.1, lineage=RankLineageInfo(lineage_str="a;b"), + f_weighted_at_rank=0.3, bp_match_at_rank=30) + err_msg = "Summarized fraction is <=0% of the query! This should not occur." + assert err_msg in str(exc) + #assert cr.status == 'nomatch' + + with pytest.raises(ValueError) as exc: + SummarizedGatherResult(rank="phylum", fraction=.1, lineage=RankLineageInfo(lineage_str="a;b"), + f_weighted_at_rank=0, bp_match_at_rank=30) + print(str(exc)) + assert err_msg in str(exc) + + +def test_SummarizedGatherResult_species_kreport(): + "basic functionality of SummarizedGatherResult dataclass" + qInf = QueryInfo(query_name='q1', query_md5='md5', query_filename='f1',query_bp='100', + query_n_hashes='10',ksize='31',scaled='10', total_weighted_hashes='200') + sgr = SummarizedGatherResult(rank="species", fraction=0.2, lineage=RankLineageInfo(lineage_str="a;b;c;d;e;f;g"), + f_weighted_at_rank=0.3, bp_match_at_rank=30) + print(sgr) + assert sgr.rank=='species' + krD = sgr.as_kreport_dict(query_info=qInf) + print(krD) + assert krD == {'ncbi_taxid': None, 'sci_name': 'g', 'rank_code': 'S', 'num_bp_assigned': "60", + 'percent_containment': '30.00', 'num_bp_contained': "60"} + + +def test_SummarizedGatherResult_summary_dict_limit_float(): + "basic functionality of SummarizedGatherResult dataclass" + qInf = QueryInfo(query_name='q1', query_md5='md5', query_filename='f1',query_bp='100', + query_n_hashes='10',ksize='31',scaled='10', total_weighted_hashes='200') + sgr = SummarizedGatherResult(rank="phylum", fraction=0.123456, lineage=RankLineageInfo(lineage_str="a;b"), + f_weighted_at_rank=0.345678, bp_match_at_rank=30) + print(sgr) + assert sgr.rank=='phylum' + sumD = sgr.as_summary_dict(query_info=qInf) + print(sumD) + assert sumD == {'rank': 'phylum', 'fraction': "0.123456", 'lineage': 'a;b', 'f_weighted_at_rank': "0.345678", + 'bp_match_at_rank': "30", 'query_ani_at_rank': None, 'query_name': 'q1', + 'query_md5': 'md5', 'query_filename': 'f1', 'total_weighted_hashes': "200"} + + sumD = sgr.as_summary_dict(query_info=qInf, limit_float=True) + print(sumD) + assert sumD == {'rank': 'phylum', 'fraction': "0.123", 'lineage': 'a;b', 'f_weighted_at_rank': "0.346", + 'bp_match_at_rank': "30", 'query_ani_at_rank': None, 'query_name': 'q1', + 'query_md5': 'md5', 'query_filename': 'f1', 'total_weighted_hashes': "200"} + + +def test_ClassificationResult(): + "basic functionality of ClassificationResult dataclass" + qInf = QueryInfo(query_name='q1', query_md5='md5', query_filename='f1',query_bp='100', + query_n_hashes='10',ksize='31',scaled='10', total_weighted_hashes='200') + cr = ClassificationResult(rank="phylum", fraction=0.2, lineage=RankLineageInfo(lineage_str="a;b"), + f_weighted_at_rank=0.3, bp_match_at_rank=30, query_ani_at_rank=0.97) + cr.set_status(query_info=qInf, containment_threshold=0.1) + assert cr.status == 'match' + print(cr.query_ani_at_rank) + assert cr.query_ani_at_rank == approx(0.949, rel=1e-3) + cr.set_status(query_info=qInf, containment_threshold=0.35) + assert cr.status == 'below_threshold' + + +def test_ClassificationResult_greater_than_1(): + "basic functionality of SummarizedGatherResult dataclass" + # fraction > 1 + with pytest.raises(ValueError) as exc: + ClassificationResult(rank="phylum", fraction=0.3, lineage=RankLineageInfo(lineage_str="a;b"), + f_weighted_at_rank=1.2, bp_match_at_rank=30) + print(str(exc)) + assert "> 100% of the query!" in str(exc) + # f_weighted > 1 + with pytest.raises(ValueError) as exc: + ClassificationResult(rank="phylum", fraction=1.2, lineage=RankLineageInfo(lineage_str="a;b"), + f_weighted_at_rank=0.3, bp_match_at_rank=30) + print(str(exc)) + assert "> 100% of the query!" in str(exc) + + +def test_ClassificationResult_0_fraction(): + with pytest.raises(ValueError) as exc: + ClassificationResult(rank="phylum", fraction=-.1, lineage=RankLineageInfo(lineage_str="a;b"), + f_weighted_at_rank=0.3, bp_match_at_rank=30) + err_msg = "Summarized fraction is <=0% of the query! This should not occur." + assert err_msg in str(exc) + #assert cr.status == 'nomatch' + + with pytest.raises(ValueError) as exc: + ClassificationResult(rank="phylum", fraction=.1, lineage=RankLineageInfo(lineage_str="a;b"), + f_weighted_at_rank=0, bp_match_at_rank=30) + print(str(exc)) + assert err_msg in str(exc) + + +def test_ClassificationResult_build_krona_result(): + qInf = QueryInfo(query_name='q1', query_md5='md5', query_filename='f1',query_bp='100', + query_n_hashes='10',ksize='31',scaled='10', total_weighted_hashes='200') + cr = ClassificationResult(rank="phylum", fraction=0.2, lineage=RankLineageInfo(lineage_str="a;b"), + f_weighted_at_rank=0.3, bp_match_at_rank=30, query_ani_at_rank=0.97) + #cr.set_status(query_info=qInf, rank='phylum') + kr, ukr = cr.build_krona_result(rank='phylum') + print(kr) + assert kr == (0.3, 'a', 'b') + print(ukr) + assert ukr == (0.7, 'unclassified', 'unclassified') + + +def test_ClassificationResult_build_krona_result_no_rank(): + qInf = QueryInfo(query_name='q1', query_md5='md5', query_filename='f1',query_bp='100', + query_n_hashes='10',ksize='31',scaled='10', total_weighted_hashes='200') + cr = ClassificationResult(rank="phylum", fraction=0.2, lineage=RankLineageInfo(lineage_str="a;b"), + f_weighted_at_rank=0.3, bp_match_at_rank=30, query_ani_at_rank=0.97) + cr.set_status(query_info=qInf, containment_threshold=0.1) + + +def test_GatherRow_old_gather(): + # gather does not contain query_name column + gA = {"name": "gA.1 name"} + with pytest.raises(TypeError) as exc: + make_GatherRow(gA, exclude_cols=['query_bp']) + print(str(exc)) + assert "__init__() missing 1 required positional argument: 'query_bp'" in str(exc) + + def test_get_ident_default(): ident = "GCF_001881345.1" n_id = get_ident(ident) assert n_id == "GCF_001881345" +def test_TaxResult_get_ident_default(): + gA = {"name": "GCF_001881345.1"} # gather result with match name as GCF_001881345.1 + taxres = make_TaxResult(gA) + print(taxres.match_ident) + assert taxres.match_ident == "GCF_001881345" + + def test_get_ident_split_but_keep_version(): - ident = "GCF_001881345.1" + ident = "GCF_001881345.1 secondname" n_id = get_ident(ident, keep_identifier_versions=True) assert n_id == "GCF_001881345.1" +def test_TaxResult_get_ident_split_but_keep_version(): + gA = {"name": "GCF_001881345.1 secondname"} + taxres = make_TaxResult(gA, keep_ident_version=True) + print("raw ident: ", taxres.raw.name) + print("keep_full?: ", taxres.keep_full_identifiers) + print("keep_version?: ",taxres.keep_identifier_versions) + print("final ident: ", taxres.match_ident) + assert taxres.match_ident == "GCF_001881345.1" + + def test_get_ident_no_split(): ident = "GCF_001881345.1 secondname" n_id = get_ident(ident, keep_full_identifiers=True) assert n_id == "GCF_001881345.1 secondname" +def test_TaxResult_get_ident_keep_full(): + gA = {"name": "GCF_001881345.1 secondname"} + taxres = make_TaxResult(gA, keep_full_ident=True) + print("raw ident: ", taxres.raw.name) + print("keep_full?: ", taxres.keep_full_identifiers) + print("keep_version?: ",taxres.keep_identifier_versions) + print("final ident: ", taxres.match_ident) + assert taxres.match_ident == "GCF_001881345.1 secondname" + + def test_collect_gather_csvs(runtmp): g_csv = utils.get_test_data('tax/test1.gather.csv') from_file = runtmp.output("tmp-from-file.txt") @@ -411,9 +723,9 @@ def test_summarize_gather_at_0(): assert cl_sum[0].f_weighted_at_rank == 0.5 assert cl_sum[0].bp_match_at_rank == 50 assert cl_sum[1].rank == 'class' - assert cl_sum[1].lineage == (LineagePair(rank='superkingdom', name='a'), - LineagePair(rank='phylum', name='b'), - LineagePair(rank='class', name='d')) + assert cl_sum[1].lineage == (LineagePair (rank='superkingdom', name='a'), + LineagePair (rank='phylum', name='b'), + LineagePair (rank='class', name='d')) assert cl_sum[1].fraction == 0.5 assert cl_sum[1].f_weighted_at_rank == 0.5 assert cl_sum[1].bp_match_at_rank == 50 @@ -438,7 +750,7 @@ def test_summarize_gather_at_1(): # superkingdom assert len(sk_sum) == 2 print("\nsuperkingdom summarized gather 0: ", sk_sum[0]) - assert sk_sum[0].lineage == (LineagePair(rank='superkingdom', name='a'),) + assert sk_sum[0].lineage == (LineagePair (rank='superkingdom', name='a'),) assert sk_sum[0].fraction == 0.7 assert sk_sum[0].bp_match_at_rank == 70 print("superkingdom summarized gather 1: ", sk_sum[1]) @@ -453,7 +765,7 @@ def test_summarize_gather_at_1(): print("phylum summarized gather 0: ", phy_sum[0]) print("phylum summarized gather 1: ", phy_sum[1]) assert len(phy_sum) == 2 - assert phy_sum[0].lineage == (LineagePair(rank='superkingdom', name='a'),LineagePair(rank='phylum', name='b')) + assert phy_sum[0].lineage == (LineagePair (rank='superkingdom', name='a'),LineagePair (rank='phylum', name='b')) assert phy_sum[0].fraction == 0.7 assert phy_sum[0].f_weighted_at_rank == 0.6 assert phy_sum[0].bp_match_at_rank == 70 @@ -466,18 +778,18 @@ def test_summarize_gather_at_1(): cl_sum, _, _ = summarize_gather_at("class", taxD, g_res, estimate_query_ani=True) assert len(cl_sum) == 3 print("class summarized gather: ", cl_sum) - assert cl_sum[0].lineage == (LineagePair(rank='superkingdom', name='a'), - LineagePair(rank='phylum', name='b'), - LineagePair(rank='class', name='c')) + assert cl_sum[0].lineage == (LineagePair (rank='superkingdom', name='a'), + LineagePair (rank='phylum', name='b'), + LineagePair (rank='class', name='c')) assert cl_sum[0].fraction == 0.6 assert cl_sum[0].f_weighted_at_rank == 0.5 assert cl_sum[0].bp_match_at_rank == 60 assert cl_sum[0].query_ani_at_rank == 0.9836567776983505 assert cl_sum[1].rank == 'class' - assert cl_sum[1].lineage == (LineagePair(rank='superkingdom', name='a'), - LineagePair(rank='phylum', name='b'), - LineagePair(rank='class', name='d')) + assert cl_sum[1].lineage == (LineagePair (rank='superkingdom', name='a'), + LineagePair (rank='phylum', name='b'), + LineagePair (rank='class', name='d')) assert cl_sum[1].fraction == 0.1 assert cl_sum[1].f_weighted_at_rank == 0.1 assert cl_sum[1].bp_match_at_rank == 10 @@ -504,7 +816,7 @@ def test_summarize_gather_at_perfect_match(): # superkingdom assert len(sk_sum) == 1 print("superkingdom summarized gather: ", sk_sum[0]) - assert sk_sum[0].lineage == (LineagePair(rank='superkingdom', name='a'),) + assert sk_sum[0].lineage == (LineagePair (rank='superkingdom', name='a'),) assert sk_sum[0].fraction == 1.0 @@ -534,15 +846,15 @@ def test_summarize_gather_at_over100percent_f_unique_to_query(): cl_sum, _, _ = summarize_gather_at("class", taxD, g_res) assert len(cl_sum) == 2 print("class summarized gather: ", cl_sum) - assert cl_sum[0].lineage == (LineagePair(rank='superkingdom', name='a'), - LineagePair(rank='phylum', name='b'), - LineagePair(rank='class', name='d')) + assert cl_sum[0].lineage == (LineagePair (rank='superkingdom', name='a'), + LineagePair (rank='phylum', name='b'), + LineagePair (rank='class', name='d')) assert cl_sum[0].fraction == 0.6 assert cl_sum[0].bp_match_at_rank == 60 assert cl_sum[1].rank == 'class' - assert cl_sum[1].lineage == (LineagePair(rank='superkingdom', name='a'), - LineagePair(rank='phylum', name='b'), - LineagePair(rank='class', name='c')) + assert cl_sum[1].lineage == (LineagePair (rank='superkingdom', name='a'), + LineagePair (rank='phylum', name='b'), + LineagePair (rank='class', name='c')) assert cl_sum[1].fraction == 0.5 assert cl_sum[1].bp_match_at_rank == 50 @@ -563,7 +875,7 @@ def test_summarize_gather_at_missing_ignore(): # superkingdom assert len(sk_sum) == 2 print("superkingdom summarized gather: ", sk_sum[0]) - assert sk_sum[0].lineage == (LineagePair(rank='superkingdom', name='a'),) + assert sk_sum[0].lineage == (LineagePair (rank='superkingdom', name='a'),) assert sk_sum[0].fraction == 0.5 assert sk_sum[0].bp_match_at_rank == 50 assert sk_sum[1].lineage == () @@ -574,7 +886,7 @@ def test_summarize_gather_at_missing_ignore(): phy_sum, _, _ = summarize_gather_at("phylum", taxD, g_res, skip_idents=['gB']) print("phylum summarized gather: ", phy_sum[0]) assert len(phy_sum) == 2 - assert phy_sum[0].lineage == (LineagePair(rank='superkingdom', name='a'),LineagePair(rank='phylum', name='b')) + assert phy_sum[0].lineage == (LineagePair (rank='superkingdom', name='a'),LineagePair (rank='phylum', name='b')) assert phy_sum[0].fraction == 0.5 assert phy_sum[0].bp_match_at_rank == 50 assert phy_sum[1].lineage == () @@ -584,9 +896,9 @@ def test_summarize_gather_at_missing_ignore(): cl_sum, _, _ = summarize_gather_at("class", taxD, g_res, skip_idents=['gB']) assert len(cl_sum) == 2 print("class summarized gather: ", cl_sum) - assert cl_sum[0].lineage == (LineagePair(rank='superkingdom', name='a'), - LineagePair(rank='phylum', name='b'), - LineagePair(rank='class', name='c')) + assert cl_sum[0].lineage == (LineagePair (rank='superkingdom', name='a'), + LineagePair (rank='phylum', name='b'), + LineagePair (rank='class', name='c')) assert cl_sum[0].fraction == 0.5 assert cl_sum[0].bp_match_at_rank == 50 assert cl_sum[1].lineage == () @@ -629,7 +941,7 @@ def test_summarize_gather_at_best_only_0(): # superkingdom assert len(sk_sum) == 1 print("superkingdom summarized gather: ", sk_sum[0]) - assert sk_sum[0].lineage == (LineagePair(rank='superkingdom', name='a'),) + assert sk_sum[0].lineage == (LineagePair (rank='superkingdom', name='a'),) assert sk_sum[0].fraction == 0.7 assert sk_sum[0].bp_match_at_rank == 70 print("superk ANI:",sk_sum[0].query_ani_at_rank) @@ -639,7 +951,7 @@ def test_summarize_gather_at_best_only_0(): phy_sum, _, _ = summarize_gather_at("phylum", taxD, g_res, best_only=True,estimate_query_ani=True) print("phylum summarized gather: ", phy_sum[0]) assert len(phy_sum) == 1 - assert phy_sum[0].lineage == (LineagePair(rank='superkingdom', name='a'),LineagePair(rank='phylum', name='b')) + assert phy_sum[0].lineage == (LineagePair (rank='superkingdom', name='a'),LineagePair (rank='phylum', name='b')) assert phy_sum[0].fraction == 0.7 assert phy_sum[0].bp_match_at_rank == 70 print("phy ANI:",phy_sum[0].query_ani_at_rank) @@ -648,9 +960,9 @@ def test_summarize_gather_at_best_only_0(): cl_sum, _, _ = summarize_gather_at("class", taxD, g_res, best_only=True, estimate_query_ani=True) assert len(cl_sum) == 1 print("class summarized gather: ", cl_sum) - assert cl_sum[0].lineage == (LineagePair(rank='superkingdom', name='a'), - LineagePair(rank='phylum', name='b'), - LineagePair(rank='class', name='c')) + assert cl_sum[0].lineage == (LineagePair (rank='superkingdom', name='a'), + LineagePair (rank='phylum', name='b'), + LineagePair (rank='class', name='c')) assert cl_sum[0].fraction == 0.6 assert cl_sum[0].bp_match_at_rank == 60 print("cl ANI:",cl_sum[0].query_ani_at_rank) @@ -674,9 +986,9 @@ def test_summarize_gather_at_best_only_equal_choose_first(): cl_sum, _, _ = summarize_gather_at("class", taxD, g_res, best_only=True) assert len(cl_sum) == 1 print("class summarized gather: ", cl_sum) - assert cl_sum[0].lineage == (LineagePair(rank='superkingdom', name='a'), - LineagePair(rank='phylum', name='b'), - LineagePair(rank='class', name='c')) + assert cl_sum[0].lineage == (LineagePair (rank='superkingdom', name='a'), + LineagePair (rank='phylum', name='b'), + LineagePair (rank='class', name='c')) assert cl_sum[0].fraction == 0.5 assert cl_sum[0].bp_match_at_rank == 50 @@ -684,17 +996,17 @@ def test_summarize_gather_at_best_only_equal_choose_first(): def test_write_summary_csv(runtmp): """test summary csv write function""" - sum_gather = {'superkingdom': [SummarizedGatherResult(query_name='queryA', rank='superkingdom', fraction=1.0, + sum_gather = {'superkingdom': [SumGathInf(query_name='queryA', rank='superkingdom', fraction=1.0, query_md5='queryA_md5', query_filename='queryA.sig', f_weighted_at_rank=1.0, bp_match_at_rank=100, - lineage=(LineagePair(rank='superkingdom', name='a'),), + lineage=(LineagePair (rank='superkingdom', name='a'),), query_ani_at_rank=None, total_weighted_hashes=0)], - 'phylum': [SummarizedGatherResult(query_name='queryA', rank='phylum', fraction=1.0, + 'phylum': [SumGathInf(query_name='queryA', rank='phylum', fraction=1.0, query_md5='queryA_md5', query_filename='queryA.sig', f_weighted_at_rank=1.0, bp_match_at_rank=100, - lineage=(LineagePair(rank='superkingdom', name='a'), - LineagePair(rank='phylum', name='b')), + lineage=(LineagePair (rank='superkingdom', name='a'), + LineagePair (rank='phylum', name='b')), query_ani_at_rank=None, total_weighted_hashes=0)]} @@ -711,9 +1023,9 @@ def test_write_summary_csv(runtmp): def test_write_classification(runtmp): """test classification csv write function""" - classif = ClassificationResult('queryA', 'match', 'phylum', 1.0, - (LineagePair(rank='superkingdom', name='a'), - LineagePair(rank='phylum', name='b')), + classif = ClassInf('queryA', 'match', 'phylum', 1.0, + (LineagePair (rank='superkingdom', name='a'), + LineagePair (rank='phylum', name='b')), 'queryA_md5', 'queryA.sig', 1.0, 100, query_ani_at_rank=None) @@ -771,7 +1083,7 @@ def test_aggregate_by_lineage_at_rank_by_query(): print("superkingdom summarized gather results:", sk_sum) assert len(sk_sum) ==4 assert sk_sum[0].query_name == "queryA" - assert sk_sum[0].lineage == (LineagePair(rank='superkingdom', name='a'),) + assert sk_sum[0].lineage == (LineagePair (rank='superkingdom', name='a'),) assert sk_sum[0].fraction == 0.9 assert sk_sum[0].bp_match_at_rank == 160 # check for unassigned for queryA @@ -781,7 +1093,7 @@ def test_aggregate_by_lineage_at_rank_by_query(): assert round(sk_sum[1].fraction,1) == 0.1 # queryB assert sk_sum[2].query_name == "queryB" - assert sk_sum[2].lineage == (LineagePair(rank='superkingdom', name='a'),) + assert sk_sum[2].lineage == (LineagePair (rank='superkingdom', name='a'),) assert sk_sum[2].fraction == 0.3 assert sk_sum[2].bp_match_at_rank == 60 # check for unassigned for queryA @@ -791,7 +1103,7 @@ def test_aggregate_by_lineage_at_rank_by_query(): assert sk_sum[3].bp_match_at_rank == 140 sk_lin_sum, query_names, num_queries = aggregate_by_lineage_at_rank(sk_sum, by_query=True) print("superkingdom lineage summary:", sk_lin_sum, '\n') - assert sk_lin_sum == {(LineagePair(rank='superkingdom', name='a'),): {'queryA': 0.9, 'queryB': 0.3}, + assert sk_lin_sum == {(LineagePair (rank='superkingdom', name='a'),): {'queryA': 0.9, 'queryB': 0.3}, (): {'queryA': 0.09999999999999998, 'queryB': 0.7}} assert num_queries == 2 assert query_names == ['queryA', 'queryB'] @@ -800,8 +1112,8 @@ def test_aggregate_by_lineage_at_rank_by_query(): print("phylum summary:", phy_sum, ']\n') phy_lin_sum, query_names, num_queries = aggregate_by_lineage_at_rank(phy_sum, by_query=True) print("phylum lineage summary:", phy_lin_sum, '\n') - assert phy_lin_sum == {(LineagePair(rank='superkingdom', name='a'), LineagePair(rank='phylum', name='b')): {'queryA': 0.5}, - (LineagePair(rank='superkingdom', name='a'), LineagePair(rank='phylum', name='c')): {'queryA': 0.4, 'queryB': 0.3}, + assert phy_lin_sum == {(LineagePair (rank='superkingdom', name='a'), LineagePair (rank='phylum', name='b')): {'queryA': 0.5}, + (LineagePair (rank='superkingdom', name='a'), LineagePair (rank='phylum', name='c')): {'queryA': 0.4, 'queryB': 0.3}, (): {'queryA': 0.09999999999999998, 'queryB': 0.7}} assert num_queries == 2 assert query_names == ['queryA', 'queryB'] @@ -908,30 +1220,30 @@ def test_write_krona(runtmp): def test_combine_sumgather_csvs_by_lineage(runtmp): # some summarized gather dicts - sum_gather1 = {'superkingdom': [SummarizedGatherResult(query_name='queryA', rank='superkingdom', fraction=0.5, + sum_gather1 = {'superkingdom': [SumGathInf(query_name='queryA', rank='superkingdom', fraction=0.5, query_md5='queryA_md5', query_filename='queryA.sig', f_weighted_at_rank=1.0, bp_match_at_rank=100, - lineage=(LineagePair(rank='superkingdom', name='a'),), + lineage=(LineagePair (rank='superkingdom', name='a'),), query_ani_at_rank=None, total_weighted_hashes=0)], - 'phylum': [SummarizedGatherResult(query_name='queryA', rank='phylum', fraction=0.5, + 'phylum': [SumGathInf(query_name='queryA', rank='phylum', fraction=0.5, query_md5='queryA_md5', query_filename='queryA.sig', f_weighted_at_rank=0.5, bp_match_at_rank=50, - lineage=(LineagePair(rank='superkingdom', name='a'), - LineagePair(rank='phylum', name='b')), + lineage=(LineagePair (rank='superkingdom', name='a'), + LineagePair (rank='phylum', name='b')), query_ani_at_rank=None, total_weighted_hashes=0)]} - sum_gather2 = {'superkingdom': [SummarizedGatherResult(query_name='queryB', rank='superkingdom', fraction=0.7, + sum_gather2 = {'superkingdom': [SumGathInf(query_name='queryB', rank='superkingdom', fraction=0.7, query_md5='queryB_md5', query_filename='queryB.sig', f_weighted_at_rank=0.7, bp_match_at_rank=70, - lineage=(LineagePair(rank='superkingdom', name='a'),), + lineage=(LineagePair (rank='superkingdom', name='a'),), query_ani_at_rank=None, total_weighted_hashes=0)], - 'phylum': [SummarizedGatherResult(query_name='queryB', rank='phylum', fraction=0.7, + 'phylum': [SumGathInf(query_name='queryB', rank='phylum', fraction=0.7, query_md5='queryB_md5', query_filename='queryB.sig', f_weighted_at_rank=0.7, bp_match_at_rank=70, - lineage=(LineagePair(rank='superkingdom', name='a'), - LineagePair(rank='phylum', name='c')), + lineage=(LineagePair (rank='superkingdom', name='a'), + LineagePair (rank='phylum', name='c')), query_ani_at_rank=None, total_weighted_hashes=0)]} @@ -1003,30 +1315,30 @@ def test_write_lineage_sample_frac_format_lineage(runtmp): def test_combine_sumgather_csvs_by_lineage_improper_rank(runtmp): # some summarized gather dicts - sum_gather1 = {'superkingdom': [SummarizedGatherResult(query_name='queryA', rank='superkingdom', fraction=0.5, + sum_gather1 = {'superkingdom': [SumGathInf(query_name='queryA', rank='superkingdom', fraction=0.5, query_md5='queryA_md5', query_filename='queryA.sig', f_weighted_at_rank=0.5, bp_match_at_rank=50, - lineage=(LineagePair(rank='superkingdom', name='a'),), + lineage=(LineagePair (rank='superkingdom', name='a'),), query_ani_at_rank=None, total_weighted_hashes=0)], - 'phylum': [SummarizedGatherResult(query_name='queryA', rank='phylum', fraction=0.5, + 'phylum': [SumGathInf(query_name='queryA', rank='phylum', fraction=0.5, query_md5='queryA_md5', query_filename='queryA.sig', f_weighted_at_rank=0.5, bp_match_at_rank=50, - lineage=(LineagePair(rank='superkingdom', name='a'), - LineagePair(rank='phylum', name='b')), + lineage=(LineagePair (rank='superkingdom', name='a'), + LineagePair (rank='phylum', name='b')), query_ani_at_rank=None, total_weighted_hashes=0)]} - sum_gather2 = {'superkingdom': [SummarizedGatherResult(query_name='queryB', rank='superkingdom', fraction=0.7, + sum_gather2 = {'superkingdom': [SumGathInf(query_name='queryB', rank='superkingdom', fraction=0.7, query_md5='queryB_md5', query_filename='queryB.sig', f_weighted_at_rank=0.7, bp_match_at_rank=70, - lineage=(LineagePair(rank='superkingdom', name='a'),), + lineage=(LineagePair (rank='superkingdom', name='a'),), query_ani_at_rank=None, total_weighted_hashes=0)], - 'phylum': [SummarizedGatherResult(query_name='queryB', rank='phylum', fraction=0.7, + 'phylum': [SumGathInf(query_name='queryB', rank='phylum', fraction=0.7, query_md5='queryB_md5', query_filename='queryB.sig', f_weighted_at_rank=0.7, bp_match_at_rank=70, - lineage=(LineagePair(rank='superkingdom', name='a'), - LineagePair(rank='phylum', name='c')), + lineage=(LineagePair (rank='superkingdom', name='a'), + LineagePair (rank='phylum', name='c')), query_ani_at_rank=None, total_weighted_hashes=0)]} @@ -1236,11 +1548,17 @@ def test_lineage_db_sql_load(runtmp): def test_LineagePair(): lin = LineagePair(rank="rank1", name='name1') print(lin) + assert lin.rank=="rank1" + assert lin.name =="name1" + assert lin.taxid==None def test_LineagePair_1(): - lin1 = LineagePair(rank="rank1", name='name1', taxid=1) - print(lin1) + lin = LineagePair(rank="rank1", name='name1', taxid=1) + assert lin.rank=="rank1" + assert lin.name =="name1" + assert lin.taxid==1 + print(lin) def test_BaseLineageInfo_init_empty(): @@ -1693,3 +2011,1040 @@ def test_lineage_at_rank_below_rank(): LineagePair(rank='class', name='c__c', taxid=None), LineagePair(rank='order', name='o__d', taxid=None), LineagePair(rank='family', name='f__f', taxid=None)) + + +def test_TaxResult_get_match_lineage_1(): + gA_tax = ("gA", "a;b;c") + taxD = make_mini_taxonomy([gA_tax]) + + gA = {"name": "gA.1 name"} + taxres = make_TaxResult(gA) + taxres.get_match_lineage(tax_assignments=taxD) + assert taxres.lineageInfo.display_lineage() == "a;b;c" + + +def test_TaxResult_get_match_lineage_skip_ident(): + gA_tax = ("gA", "a;b;c") + taxD = make_mini_taxonomy([gA_tax]) + + gA = {"name": "gA.1 name"} + taxres = make_TaxResult(gA) + taxres.get_match_lineage(tax_assignments=taxD, skip_idents=['gA']) + print("skipped_ident?: ", taxres.skipped_ident) + print("missed_ident?: ", taxres.missed_ident) + assert taxres.skipped_ident == True + assert taxres.lineageInfo.display_lineage() == "" + + +def test_TaxResult_get_match_lineage_missed_ident(): + gA_tax = ("gA.1", "a;b;c") + taxD = make_mini_taxonomy([gA_tax]) + + gA = {"name": "gA.1 name"} + taxres = make_TaxResult(gA) + taxres.get_match_lineage(tax_assignments=taxD, skip_idents=['gB']) + print("skipped_ident?: ", taxres.skipped_ident) + print("missed_ident?: ", taxres.missed_ident) + assert taxres.skipped_ident == False + assert taxres.missed_ident == True + assert taxres.lineageInfo.display_lineage() == "" + + +def test_TaxResult_get_match_lineage_missed_ident_fail_on_missing(): + gA_tax = ("gA.1", "a;b;c") + taxD = make_mini_taxonomy([gA_tax]) + + gA = {"name": "gA.1 name"} + taxres = make_TaxResult(gA) + with pytest.raises(ValueError) as exc: + taxres.get_match_lineage(tax_assignments=taxD, skip_idents=['gB'], fail_on_missing_taxonomy=True) + print(str(exc)) + assert "Error: ident 'gA' is not in the taxonomy database." in str(exc) + + +def test_QueryTaxResult(): + "basic functionality: initialize and add a taxresult" + tax_info = [("gA", "a;b;c")] + taxD = make_mini_taxonomy(tax_info=tax_info) + taxres = make_TaxResult(taxD=taxD) + # initialize + q_res = QueryTaxResult(taxres.query_info) + assert q_res.ranks == [] + assert q_res.ascending_ranks == [] + q_res.add_taxresult(taxres) + # check that new querytaxres is compatible with taxres + assert q_res.is_compatible(taxres) + # check that a few thngs were set properly and/or are not yet set. + assert q_res.query_name == "q1" + assert q_res.query_info.query_bp == 100 + assert len(q_res.raw_taxresults) == 1 + assert q_res.skipped_idents == set() + assert q_res.missed_idents == set() + assert q_res.summarized_lineage_results == {} + taxranks = ('superkingdom', 'phylum', 'class', 'order', 'family', 'genus', 'species', 'strain') + assert q_res.ranks == taxranks + assert q_res.ascending_ranks == taxranks[::-1] + + +def test_QueryTaxResult_add_incompatible(): + "initialize and try to add incompatible taxresult" + tax_info = [("gA", "a;b;c")] + taxD = make_mini_taxonomy(tax_info=tax_info) + taxres = make_TaxResult(taxD=taxD) + taxres2 = make_TaxResult({'query_name': 'q2'}, taxD=taxD) + # initialize + q_res = QueryTaxResult(taxres.query_info) + # check that new querytaxres is compatible with taxres and not taxres2 + assert q_res.is_compatible(taxres) + assert not q_res.is_compatible(taxres2) + q_res.add_taxresult(taxres) + with pytest.raises(ValueError) as exc: + q_res.add_taxresult(taxres2) + print(str(exc)) + assert "Error: Cannot add TaxResult: query information does not match." in str(exc) + + +def test_QueryTaxResult_add_without_tax_info(): + "initialize and add a taxresult with missed ident" + taxres = make_TaxResult() # do not add taxonomic info + # initialize + q_res = QueryTaxResult(taxres.query_info) + print("attempted to add lineage info?: ", taxres.match_lineage_attempted) + with pytest.raises(ValueError) as exc: + q_res.add_taxresult(taxres) + print(str(exc)) + assert "Error: Cannot add TaxResult. Please use get_match_lineage() to add taxonomic lineage information first." in str(exc) + + +def test_QueryTaxResult_add_skipped_ident(): + "initialize and add a taxresult with skipped ident" + gA_tax = ("gA", "a;b;c") + taxD = make_mini_taxonomy([gA_tax]) + taxres = make_TaxResult(taxD=taxD, skip_idents = ['gA']) +# taxres.get_match_lineage(tax_assignments=taxD, skip_idents=['gA']) + # initialize + q_res = QueryTaxResult(taxres.query_info) + q_res.add_taxresult(taxres) + assert len(q_res.skipped_idents) == 1 + assert len(q_res.raw_taxresults) == 1 + assert q_res.missed_idents == set() + assert q_res.summarized_lineage_results == {} + + +def test_QueryTaxResult_add_missed_ident(): + "initialize and add a taxresult with missed ident" + gA_tax = ("gB", "a;b;c") + taxD = make_mini_taxonomy([gA_tax]) + taxres = make_TaxResult(taxD=taxD) + # initialize + q_res = QueryTaxResult(taxres.query_info) + # add taxonomic info to taxres + q_res.add_taxresult(taxres) + assert len(q_res.missed_idents) == 1 + assert len(q_res.raw_taxresults) == 1 + assert q_res.skipped_idents == set() + assert q_res.summarized_lineage_results == {} + + +def test_QueryTaxResult_track_missed_and_skipped(): + "make sure missed and skipped idents are being tracked" + # make taxonomy + tax_info = [("gA", "a;b;c"), ("gB", "a;b;d")] + taxD = make_mini_taxonomy(tax_info=tax_info) + # make results + taxres = make_TaxResult() + taxres2 = make_TaxResult({"name": 'gB'}) # skipped + taxres3 = make_TaxResult({"name": 'gB'}) # skipped + taxres4 = make_TaxResult({"name": 'gC'}) # skipped + taxres5 = make_TaxResult({"name": 'gD'}) # missed + taxres6 = make_TaxResult({"name": 'gE'}) # missed + # initialize + q_res = QueryTaxResult(taxres.query_info) + # add taxonomic info to taxres, add to q_res + for n, tr in enumerate([taxres, taxres2, taxres3, taxres4, taxres5, taxres6]): + tr.get_match_lineage(tax_assignments=taxD, skip_idents=['gB', 'gC']) + print("num: ", n) + print("skipped?: ", tr.skipped_ident) + print("missed?: ", tr.missed_ident) + q_res.add_taxresult(tr) + assert len(q_res.raw_taxresults) == 6 + print(q_res.n_skipped) + print(q_res.n_missed) + assert q_res.n_missed == 2 + assert q_res.n_skipped == 3 + assert 'gB' in q_res.skipped_idents + assert len(q_res.skipped_idents) == 2 + assert 'gD' in q_res.missed_idents + assert q_res.summarized_lineage_results == {} + + +def test_QueryTaxResult_track_missed_and_skipped_using_fn(): + "make sure missed and skipped idents are being tracked. Same as above but use helper fn." + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB'}, {"name": 'gB'}, {"name": 'gC'}, {"name": 'gD'}, {"name": 'gE'}] + gres = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, skip_idents=['gB', 'gC']) + # should have 6 results for default query 'q1' + print(gres.keys()) + q_res = next(iter(gres.values())) + assert len(q_res.raw_taxresults) == 6 + print(q_res.n_skipped) + print(q_res.n_missed) + assert q_res.n_missed == 2 + assert q_res.n_skipped == 3 + assert 'gB' in q_res.skipped_idents + assert len(q_res.skipped_idents) == 2 + assert 'gD' in q_res.missed_idents + assert q_res.summarized_lineage_results == {} + + +def test_QueryTaxResult_summarize_up_ranks_1(): + "basic functionality: summarize up ranks" + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB'}] + gres = make_QueryTaxResults(gather_info=gather_results, taxD=taxD) + assert len(gres.keys()) == 1 + q_res = next(iter(gres.values())) + # now summarize up the ranks + q_res.summarize_up_ranks() + assert len(q_res.raw_taxresults) == 2 + #print(q_res.sum_uniq_weighted.values()) + #print(q_res.sum_uniq_weighted['superkingdom']) + assert list(q_res.sum_uniq_weighted.keys()) == ['class', 'phylum', 'superkingdom'] + assert q_res.sum_uniq_weighted['superkingdom'] == {RankLineageInfo(lineage_str="a"): approx(0.4)} + assert q_res.sum_uniq_to_query['superkingdom'] == {RankLineageInfo(lineage_str="a"): approx(0.2)} + assert q_res.sum_uniq_bp['superkingdom'] == {RankLineageInfo(lineage_str="a"): 40} + assert q_res.sum_uniq_weighted['phylum'] == {RankLineageInfo(lineage_str="a;b"): approx(0.4)} + assert q_res.sum_uniq_to_query['phylum'] == {RankLineageInfo(lineage_str="a;b"): approx(0.2)} + assert q_res.sum_uniq_bp['phylum'] == {RankLineageInfo(lineage_str="a;b"): 40} + assert q_res.sum_uniq_weighted['class'] == {RankLineageInfo(lineage_str="a;b;c"): approx(0.2), + RankLineageInfo(lineage_str="a;b;d"): approx(0.2)} + assert q_res.sum_uniq_to_query['class'] == {RankLineageInfo(lineage_str="a;b;c"): approx(0.1), + RankLineageInfo(lineage_str="a;b;d"): approx(0.1)} + assert q_res.sum_uniq_bp['class'] == {RankLineageInfo(lineage_str="a;b;c"): 20, + RankLineageInfo(lineage_str="a;b;d"): 20} + + +def test_QueryTaxResult_summarize_up_ranks_2(): + "summarize up ranks: different values" + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB','f_unique_weighted': 0.1,'f_unique_to_query': 0.05,'unique_intersect_bp': 10,}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True) + # now summarize up the ranks + q_res.summarize_up_ranks() + assert len(q_res.raw_taxresults) == 2 + print(q_res.sum_uniq_weighted.values()) + print(q_res.sum_uniq_weighted['superkingdom']) + assert q_res.sum_uniq_weighted['superkingdom'] == {RankLineageInfo(lineage_str="a"): approx(0.3)} + assert q_res.sum_uniq_to_query['superkingdom'] == {RankLineageInfo(lineage_str="a"): approx(0.15)} + assert q_res.sum_uniq_bp['superkingdom'] == {RankLineageInfo(lineage_str="a"): 30} + assert q_res.sum_uniq_weighted['phylum'] == {RankLineageInfo(lineage_str="a;b"): approx(0.3)} + assert q_res.sum_uniq_to_query['phylum'] == {RankLineageInfo(lineage_str="a;b"): approx(0.15)} + assert q_res.sum_uniq_bp['phylum'] == {RankLineageInfo(lineage_str="a;b"): 30} + assert q_res.sum_uniq_weighted['class'] == {RankLineageInfo(lineage_str="a;b;c"): approx(0.2), + RankLineageInfo(lineage_str="a;b;d"): approx(0.1)} + assert q_res.sum_uniq_to_query['class'] == {RankLineageInfo(lineage_str="a;b;c"): approx(0.1), + RankLineageInfo(lineage_str="a;b;d"): approx(0.05)} + assert q_res.sum_uniq_bp['class'] == {RankLineageInfo(lineage_str="a;b;c"): 20, + RankLineageInfo(lineage_str="a;b;d"): 10} + + + +def test_QueryTaxResult_summarize_up_ranks_missing_lineage(): + "basic functionality: summarize up ranks" + taxD = make_mini_taxonomy([("gA", "a;b;c")]) + gather_results = [{}, {"name": 'gB'}] + gres = make_QueryTaxResults(gather_info=gather_results, taxD=taxD) + assert len(gres.keys()) == 1 + q_res = next(iter(gres.values())) + # now summarize up the ranks + q_res.summarize_up_ranks() + assert len(q_res.raw_taxresults) == 2 + #print(q_res.sum_uniq_weighted.values()) + print(q_res.sum_uniq_weighted['superkingdom']) + assert q_res.sum_uniq_weighted['superkingdom'] == {RankLineageInfo(lineage_str="a"): approx(0.2)} + assert q_res.sum_uniq_to_query['superkingdom'] == {RankLineageInfo(lineage_str="a"): approx(0.1)} + assert q_res.sum_uniq_bp['superkingdom'] == {RankLineageInfo(lineage_str="a"): 20} + assert q_res.sum_uniq_weighted['phylum'] == {RankLineageInfo(lineage_str="a;b"): approx(0.2)} + assert q_res.sum_uniq_to_query['phylum'] == {RankLineageInfo(lineage_str="a;b"): approx(0.1)} + assert q_res.sum_uniq_bp['phylum'] == {RankLineageInfo(lineage_str="a;b"): 20} + assert q_res.sum_uniq_weighted['class'] == {RankLineageInfo(lineage_str="a;b;c"): approx(0.2)} + assert q_res.sum_uniq_to_query['class'] == {RankLineageInfo(lineage_str="a;b;c"): approx(0.1)} + assert q_res.sum_uniq_bp['class'] == {RankLineageInfo(lineage_str="a;b;c"): 20} + + +def test_QueryTaxResult_summarize_up_ranks_skipped_lineage(): + "basic functionality: summarize up ranks" + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB'}] + gres = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, skip_idents=['gB']) + assert len(gres.keys()) == 1 + q_res = next(iter(gres.values())) + # now summarize up the ranks + q_res.summarize_up_ranks() + assert len(q_res.raw_taxresults) == 2 + assert list(q_res.sum_uniq_weighted.keys()) == ['class', 'phylum', 'superkingdom'] + #print(q_res.sum_uniq_weighted.values()) + print(q_res.sum_uniq_weighted['superkingdom']) + assert q_res.sum_uniq_weighted['superkingdom'] == {RankLineageInfo(lineage_str="a"): approx(0.2)} + assert q_res.sum_uniq_to_query['superkingdom'] == {RankLineageInfo(lineage_str="a"): approx(0.1)} + assert q_res.sum_uniq_bp['superkingdom'] == {RankLineageInfo(lineage_str="a"): 20} + assert q_res.sum_uniq_weighted['phylum'] == {RankLineageInfo(lineage_str="a;b"): approx(0.2)} + assert q_res.sum_uniq_to_query['phylum'] == {RankLineageInfo(lineage_str="a;b"): approx(0.1)} + assert q_res.sum_uniq_bp['phylum'] == {RankLineageInfo(lineage_str="a;b"): 20} + assert q_res.sum_uniq_weighted['class'] == {RankLineageInfo(lineage_str="a;b;c"): approx(0.2)} + assert q_res.sum_uniq_to_query['class'] == {RankLineageInfo(lineage_str="a;b;c"): approx(0.1)} + assert q_res.sum_uniq_bp['class'] == {RankLineageInfo(lineage_str="a;b;c"): 20} + + +def test_QueryTaxResult_summarize_up_ranks_perfect_match(): + "summarize up ranks: different values" + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{'f_unique_to_query': 1.0}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True) + # now summarize up the ranks + q_res.summarize_up_ranks() + assert len(q_res.raw_taxresults) == 1 + print(q_res.sum_uniq_weighted.values()) + print(q_res.sum_uniq_to_query['superkingdom']) + assert list(q_res.sum_uniq_to_query['superkingdom'].values()) == [1.0] + assert 'gA' in q_res.perfect_match + + +def test_QueryTaxResult_summarize_up_ranks_already_summarized(): + "summarize up ranks: error, already summarized" + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{'f_unique_to_query': 1.0}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True) + # now summarize up the ranks + q_res.summarize_up_ranks() + with pytest.raises(ValueError) as exc: + q_res.summarize_up_ranks() + print(str(exc)) + assert "Error: already summarized" in str(exc) + + +def test_QueryTaxResult_summarize_up_ranks_already_summarized_force(): + "summarize up ranks: already summarized but force" + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB','f_unique_weighted': 0.1,'f_unique_to_query': 0.05,'unique_intersect_bp': 10,}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True) + # now summarize up the ranks + q_res.summarize_up_ranks() + q_res.summarize_up_ranks(force_resummarize=True) + assert list(q_res.sum_uniq_weighted.keys()) == ['class', 'phylum', 'superkingdom'] + + #check that all results are still good + assert len(q_res.raw_taxresults) == 2 + assert q_res.sum_uniq_weighted['superkingdom'] == {RankLineageInfo(lineage_str="a"): approx(0.3)} + assert q_res.sum_uniq_weighted['phylum'] == {RankLineageInfo(lineage_str="a;b"): approx(0.3)} + assert q_res.sum_uniq_to_query['phylum'] == {RankLineageInfo(lineage_str="a;b"): approx(0.15)} + assert q_res.sum_uniq_bp['phylum'] == {RankLineageInfo(lineage_str="a;b"): 30} + assert q_res.sum_uniq_to_query['class'] == {RankLineageInfo(lineage_str="a;b;c"): approx(0.1), + RankLineageInfo(lineage_str="a;b;d"): approx(0.05)} + assert q_res.sum_uniq_weighted['class'] == {RankLineageInfo(lineage_str="a;b;c"): approx(0.2), + RankLineageInfo(lineage_str="a;b;d"): approx(0.1)} + assert q_res.sum_uniq_bp['class'] == {RankLineageInfo(lineage_str="a;b;c"): 20, + RankLineageInfo(lineage_str="a;b;d"): 10} + + +def test_QueryTaxResult_summarize_up_ranks_single_rank(): + "summarize up ranks: different values" + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB','f_unique_weighted': 0.1,'f_unique_to_query': 0.05,'unique_intersect_bp': 10,}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True) + # now summarize up the ranks + q_res.summarize_up_ranks(single_rank='phylum') + assert len(q_res.raw_taxresults) == 2 + assert list(q_res.sum_uniq_weighted.keys()) == ['phylum'] + print(q_res.sum_uniq_weighted.keys()) + print(q_res.sum_uniq_weighted.values()) + print(q_res.sum_uniq_weighted['phylum']) + assert q_res.sum_uniq_weighted['phylum'] == {RankLineageInfo(lineage_str="a;b"): approx(0.3)} + assert list(q_res.sum_uniq_to_query['phylum'].values()) == [approx(0.15)] + assert list(q_res.sum_uniq_bp['phylum'].values()) == [30] + assert q_res.summarized_ranks == ['phylum'] + +def test_QueryTaxResult_summarize_up_ranks_single_rank_not_available(): + "summarize up ranks: different values" + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB','f_unique_weighted': 0.1,'f_unique_to_query': 0.05,'unique_intersect_bp': 10,}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True) + # now summarize up the ranks + with pytest.raises(ValueError) as exc: + q_res.summarize_up_ranks(single_rank='NotARank') + print(str(exc)) + assert "Error: rank 'NotARank' not in available ranks (strain, species, genus, family, order, class, phylum, superkingdom)" in str(exc) + + +def test_QueryTaxResult_summarize_up_ranks_single_rank_not_filled(): + "summarize up ranks: different values" + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB','f_unique_weighted': 0.1,'f_unique_to_query': 0.05,'unique_intersect_bp': 10,}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True) + # now summarize up the ranks + with pytest.raises(ValueError) as exc: + q_res.summarize_up_ranks(single_rank='species') + print(str(exc)) + assert "Error: rank 'species' was not available for any matching lineages." in str(exc) + + +def test_QueryTaxResult_build_summarized_result_1(): + "basic functionality: build summarized_result" + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB'}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True) + q_res.build_summarized_result() + print(q_res.summarized_lineage_results.keys()) + sk = [SummarizedGatherResult(rank='superkingdom', fraction=0.2, f_weighted_at_rank=0.4, + lineage=RankLineageInfo(lineage_str='a'), + bp_match_at_rank=40, query_ani_at_rank=approx(0.95, rel=1e-2)), + SummarizedGatherResult(rank='superkingdom', fraction=0.8, f_weighted_at_rank=0.6, + lineage=(), bp_match_at_rank=60, query_ani_at_rank=None)] + print(q_res.summarized_lineage_results['superkingdom']) + assert q_res.summarized_lineage_results['superkingdom'] == sk + print(q_res.summarized_lineage_results['phylum']) + phy = [SummarizedGatherResult(rank='phylum', fraction=0.2, f_weighted_at_rank=0.4, + lineage=RankLineageInfo(lineage_str='a;b'), + bp_match_at_rank=40, query_ani_at_rank=approx(0.95, rel=1e-2)), + SummarizedGatherResult(rank='phylum', fraction=0.8, f_weighted_at_rank=0.6, + lineage=(), bp_match_at_rank=60, query_ani_at_rank=None)] + assert q_res.summarized_lineage_results['phylum'] == phy + print(q_res.summarized_lineage_results['class']) + cl = [SummarizedGatherResult(rank='class', fraction=0.1, f_weighted_at_rank=0.2, + lineage=RankLineageInfo(lineage_str='a;b;c'), + bp_match_at_rank=20, query_ani_at_rank=approx(0.93, rel=1e-2)), + SummarizedGatherResult(rank='class', fraction=0.1, f_weighted_at_rank=0.2, + lineage=RankLineageInfo(lineage_str='a;b;d'), + bp_match_at_rank=20, query_ani_at_rank=approx(0.93, rel=1e-2)), + SummarizedGatherResult(rank='class', fraction=0.8, f_weighted_at_rank=0.6, + lineage=(), bp_match_at_rank=60, query_ani_at_rank=None)] + assert q_res.summarized_lineage_results['class'] == cl + + assert q_res.total_f_weighted['phylum'] == approx(0.4) + assert q_res.total_f_classified['class'] == approx(0.2) + assert q_res.total_bp_classified['superkingdom'] == 40 + + +def test_QueryTaxResult_build_summarized_result_2(): + """test two queries, build summarized result for each""" + # make mini taxonomy + gA_tax = ("gA", "a;b") + gB_tax = ("gB", "a;c") + taxD = make_mini_taxonomy([gA_tax, gB_tax]) + # make gather results + gather_results = [{'query_name': 'queryA', 'name': 'gA', 'f_unique_weighted': 0.5,'f_unique_to_query': 0.5,'unique_intersect_bp': 50}, + {'query_name': 'queryA', "name": 'gB', 'f_unique_weighted': 0.4,'f_unique_to_query': 0.3,'unique_intersect_bp': 30}, + {'query_name': 'queryB', "name": 'gB', 'f_unique_weighted': 0.3,'f_unique_to_query': 0.3,'unique_intersect_bp': 30}] + gres = make_QueryTaxResults(gather_info=gather_results, taxD=taxD) + + for query_name, q_res in gres.items(): + q_res.build_summarized_result() # summarize and build result + sk = q_res.summarized_lineage_results['superkingdom'] + phy = q_res.summarized_lineage_results['phylum'] + assert len(sk) == 2 + assert sk[0].lineage == RankLineageInfo(lineage_str="a") + print(phy) + if query_name == 'queryA': + # check superkingdom results + assert sk[0].fraction == approx(0.8) + assert sk[0].f_weighted_at_rank == approx(0.9) + assert sk[0].bp_match_at_rank == 80 + assert sk[1].fraction == approx(0.2) + assert sk[1].f_weighted_at_rank == approx(0.1) + assert sk[1].bp_match_at_rank == 20 + assert sk[1].lineage == () + # check phylum results + assert len(phy) == 3 + assert phy[0].fraction == approx(0.5) + assert phy[0].f_weighted_at_rank == approx(0.5) + assert phy[0].bp_match_at_rank == 50 + assert phy[0].lineage == RankLineageInfo(lineage_str="a;b") + assert phy[1].fraction == approx(0.3) + assert phy[1].f_weighted_at_rank == approx(0.4) + assert phy[1].bp_match_at_rank == 30 + assert phy[1].lineage == RankLineageInfo(lineage_str="a;c") + assert phy[2].fraction == approx(0.2) + assert phy[2].f_weighted_at_rank == approx(0.1) + assert phy[2].bp_match_at_rank == 20 + assert phy[2].lineage == () + if query_name == 'queryB': + # check superkingdom results + assert sk[0].fraction == approx(0.3) + assert sk[0].f_weighted_at_rank == approx(0.3) + assert sk[0].bp_match_at_rank == 30 + assert sk[1].fraction == approx(0.7) + assert sk[1].f_weighted_at_rank == approx(0.7) + assert sk[1].bp_match_at_rank == 70 + assert sk[1].lineage == () + # check phylum results + assert len(phy) == 2 + assert phy[0].fraction == approx(0.3) + assert phy[0].f_weighted_at_rank == approx(0.3) + assert phy[0].bp_match_at_rank == 30 + assert phy[0].lineage == RankLineageInfo(lineage_str="a;c") + assert phy[1].fraction == approx(0.7) + assert phy[1].f_weighted_at_rank == approx(0.7) + assert phy[1].bp_match_at_rank == 70 + assert phy[1].lineage == () + + +def test_QueryTaxResult_build_summarized_result_missing_lineage(): + "build summarized_result with missing lineage" + taxD = make_mini_taxonomy([("gA", "a;b;c")]) + gather_results = [{}, {"name": 'gB'}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True) + q_res.build_summarized_result() + print(q_res.summarized_lineage_results.keys()) + print(q_res.summarized_lineage_results['superkingdom']) + + sk = [SummarizedGatherResult(rank='superkingdom', fraction=0.1, f_weighted_at_rank=0.2, + lineage=RankLineageInfo(lineage_str="a"), + bp_match_at_rank=20, query_ani_at_rank=approx(0.928, rel=1e-2)), + SummarizedGatherResult(rank='superkingdom', fraction=0.9, lineage=(),f_weighted_at_rank=0.8, + bp_match_at_rank=80, query_ani_at_rank=None)] + assert q_res.summarized_lineage_results['superkingdom'] == sk + print(q_res.summarized_lineage_results['phylum']) + phy = [SummarizedGatherResult(rank='phylum', fraction=0.1, f_weighted_at_rank=0.2, + lineage=RankLineageInfo(lineage_str="a;b"), + bp_match_at_rank=20, query_ani_at_rank=approx(0.928, rel=1e-2)), + SummarizedGatherResult(rank='phylum', fraction=0.9, lineage=(),f_weighted_at_rank=0.8, + bp_match_at_rank=80, query_ani_at_rank=None)] + assert q_res.summarized_lineage_results['phylum'] == phy + print(q_res.summarized_lineage_results['class']) + cl = [SummarizedGatherResult(rank='class', fraction=0.1, lineage= RankLineageInfo(lineage_str="a;b;c"), + f_weighted_at_rank=0.2, bp_match_at_rank=20, query_ani_at_rank=approx(0.928, rel=1e-2)), + SummarizedGatherResult(rank='class', fraction=0.9, lineage=(), f_weighted_at_rank=0.8, + bp_match_at_rank=80, query_ani_at_rank=None)] + assert q_res.summarized_lineage_results['class'] == cl + + assert q_res.total_f_weighted['phylum'] == approx(0.2) + assert q_res.total_f_classified['class'] == approx(0.1) + assert q_res.total_bp_classified['superkingdom'] == 20 + + +def test_QueryTaxResult_build_summarized_result_skipped_lineage(): + "build summarized_result with skipped lineage" + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB'}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True, skip_idents=['gB']) + q_res.build_summarized_result() + print(q_res.summarized_lineage_results.keys()) + print(q_res.summarized_lineage_results['superkingdom']) + + sk = [SummarizedGatherResult(rank='superkingdom', fraction=0.1, f_weighted_at_rank=0.2, + lineage=RankLineageInfo(lineage_str="a"), + bp_match_at_rank=20, query_ani_at_rank=approx(0.928, rel=1e-2)), + SummarizedGatherResult(rank='superkingdom', fraction=0.9, lineage=(),f_weighted_at_rank=0.8, + bp_match_at_rank=80, query_ani_at_rank=None)] + assert q_res.summarized_lineage_results['superkingdom'] == sk + print(q_res.summarized_lineage_results['phylum']) + phy = [SummarizedGatherResult(rank='phylum', fraction=0.1, lineage=RankLineageInfo(lineage_str="a;b"), + f_weighted_at_rank=0.2, bp_match_at_rank=20, query_ani_at_rank=approx(0.928, rel=1e-2)), + SummarizedGatherResult(rank='phylum', fraction=0.9, lineage=(), f_weighted_at_rank=0.8, bp_match_at_rank=80, + query_ani_at_rank=None)] + assert q_res.summarized_lineage_results['phylum'] == phy + print(q_res.summarized_lineage_results['class']) + cl = [SummarizedGatherResult(rank='class', fraction=0.1,lineage=RankLineageInfo(lineage_str="a;b;c"), + f_weighted_at_rank=0.2, bp_match_at_rank=20, query_ani_at_rank=approx(0.928, rel=1e-2)), + SummarizedGatherResult(rank='class', fraction=0.9, lineage=(), f_weighted_at_rank=0.8, bp_match_at_rank=80, + query_ani_at_rank=None)] + assert q_res.summarized_lineage_results['class'] == cl + + assert q_res.total_f_weighted['phylum'] == approx(0.2) + assert q_res.total_f_classified['class'] == approx(0.1) + assert q_res.total_bp_classified['superkingdom'] == 20 + + +def test_QueryTaxResult_build_summarized_result_over100percent(): + "summarize up ranks: different values" + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB','f_unique_to_query': 0.95}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True) + # now summarize up the ranks + assert len(q_res.raw_taxresults) == 2 + with pytest.raises(ValueError) as exc: + q_res.build_summarized_result() + print(str(exc)) + assert "Summarized fraction is > 100% of the query! This should not be possible" in str(exc) + + +def test_build_summarized_result_rank_fail_not_available_resummarize(): + "build classification result" + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB'}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True) + q_res.summarize_up_ranks('superkingdom') + with pytest.raises(ValueError) as exc: + q_res.build_summarized_result(single_rank='order') + print(str(exc)) + assert "Error: rank 'order' not in summarized rank(s), superkingdom" in str(exc) + + +def test_build_classification_result_containment_threshold_fail(): + "classification result: improper containment threshold" + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB'}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True) + with pytest.raises(ValueError) as exc: + q_res.build_classification_result(containment_threshold=1.2) + print(str(exc)) + assert "Containment threshold must be between 0 and 1 (input value: 1.2)." in str(exc) + with pytest.raises(ValueError) as exc: + q_res.build_classification_result(containment_threshold=-.1) + print(str(exc)) + assert "Containment threshold must be between 0 and 1 (input value: -0.1)." in str(exc) + + + +def test_build_classification_result_containment_threshold(): + "basic functionality: build classification result using containment threshold" + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB'}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True) + + q_res.build_classification_result(containment_threshold=0.1) + print("classif: ", q_res.classification_result) + assert q_res.classification_result.status == 'match' + assert q_res.classification_result.rank == 'class' + assert q_res.classification_result.fraction == 0.1 + assert q_res.classification_result.lineage == RankLineageInfo(lineage_str="a;b;c") + assert q_res.classification_result.f_weighted_at_rank == 0.2 + assert q_res.classification_result.bp_match_at_rank == 20 + assert q_res.classification_result.query_ani_at_rank == approx(0.928, rel=1e-2) + + q_res.build_classification_result(containment_threshold=0.2) + print("classif: ", q_res.classification_result) + assert q_res.classification_result.status == 'match' + assert q_res.classification_result.rank == 'phylum' + assert q_res.classification_result.lineage == RankLineageInfo(lineage_str="a;b") + assert q_res.classification_result.f_weighted_at_rank == 0.4 + assert q_res.classification_result.fraction == 0.2 + assert q_res.classification_result.bp_match_at_rank == 40 + assert q_res.classification_result.query_ani_at_rank == approx(0.95, rel=1e-2) + + q_res.build_classification_result(containment_threshold=1.0) + print("classif: ", q_res.classification_result) + assert q_res.classification_result.status == 'below_threshold' + assert q_res.classification_result.rank == 'superkingdom' + assert q_res.classification_result.fraction == 0.2 + assert q_res.classification_result.lineage == RankLineageInfo(lineage_str="a") + assert q_res.classification_result.f_weighted_at_rank == 0.4 + assert q_res.classification_result.bp_match_at_rank == 40 + assert q_res.classification_result.query_ani_at_rank == approx(0.95, rel=1e-2) + + +def test_build_classification_result_ani_threshold(): + "basic functionality: build classification result" + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB'}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True) + + q_res.build_classification_result(ani_threshold=.92) + print("classif: ", q_res.classification_result) + assert q_res.classification_result.status == 'match' + assert q_res.classification_result.rank == 'class' + assert q_res.classification_result.fraction == 0.1 + assert q_res.classification_result.lineage == RankLineageInfo(lineage_str="a;b;c") + assert q_res.classification_result.f_weighted_at_rank == 0.2 + assert q_res.classification_result.bp_match_at_rank == 20 + assert q_res.classification_result.query_ani_at_rank == approx(0.928, rel=1e-2) + + q_res.build_classification_result(ani_threshold=0.94) # should classify at phylum + print("classif: ", q_res.classification_result) + assert q_res.classification_result.status == 'match' + assert q_res.classification_result.rank == 'phylum' + assert q_res.classification_result.fraction == 0.2 + assert q_res.classification_result.lineage == RankLineageInfo(lineage_str="a;b") + assert q_res.classification_result.f_weighted_at_rank == 0.4 + assert q_res.classification_result.bp_match_at_rank == 40 + assert q_res.classification_result.query_ani_at_rank == approx(0.95, rel=1e-2) + + # superk result, but doesn't meet ANI threshold + q_res.build_classification_result(ani_threshold=0.96) + print("classif: ", q_res.classification_result) + assert q_res.classification_result.status == 'below_threshold' + assert q_res.classification_result.rank == 'superkingdom' + assert q_res.classification_result.fraction == 0.2 + assert q_res.classification_result.lineage == RankLineageInfo(lineage_str="a") + assert q_res.classification_result.f_weighted_at_rank == 0.4 + assert q_res.classification_result.bp_match_at_rank == 40 + assert q_res.classification_result.query_ani_at_rank == approx(0.95, rel=1e-2) + + +def test_build_classification_result_ani_threshold_fail(): + "classification result: improper ANI threshold" + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB'}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True) + with pytest.raises(ValueError) as exc: + q_res.build_classification_result(ani_threshold=1.2) + print(str(exc)) + assert "ANI threshold must be between 0 and 1 (input value: 1.2)." in str(exc) + with pytest.raises(ValueError) as exc: + q_res.build_classification_result(ani_threshold=-.1) + print(str(exc)) + assert "ANI threshold must be between 0 and 1 (input value: -0.1)." in str(exc) + + +def test_build_classification_result_rank_fail_not_filled(): + "classification result: rank not available (wasn't filled in tax lineage matches)" + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB'}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True) + with pytest.raises(ValueError) as exc: + q_res.build_classification_result(rank='order') + print(str(exc)) + assert "Error: rank 'order' was not available for any matching lineages." in str(exc) + + +def test_build_classification_result_rank_fail_not_available_resummarize(): + "classification result: rank not available (wasn't summarized)" + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB'}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True) + q_res.summarize_up_ranks('superkingdom') + with pytest.raises(ValueError) as exc: + q_res.build_classification_result(rank='order') + print(str(exc)) + assert "Error: rank 'order' not in summarized rank(s), superkingdom" in str(exc) + + +def test_build_classification_result_rank_fail_not_available(): + "classification result: rank not available" + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB'}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True) + with pytest.raises(ValueError) as exc: + q_res.build_classification_result(rank='NotARank') + print(str(exc)) + assert "Error: rank 'NotARank' not in available ranks (strain, species, genus, family, order, class, phylum, superkingdom)" in str(exc) + + +def test_build_classification_result_rank_containment_threshold(): + "classification result - rank and containment threshold (default)" + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB'}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True) + + q_res.build_classification_result(rank='class') + print("classif: ", q_res.classification_result) + assert q_res.classification_result.status == 'match' + assert q_res.classification_result.rank == 'class' + assert q_res.classification_result.fraction == 0.1 + assert q_res.classification_result.lineage == RankLineageInfo(lineage_str="a;b;c") + assert q_res.classification_result.f_weighted_at_rank == 0.2 + assert q_res.classification_result.bp_match_at_rank == 20 + assert q_res.classification_result.query_ani_at_rank == approx(0.928, rel=1e-2) + + q_res.build_classification_result(rank='class', containment_threshold=0.4) + assert q_res.classification_result.status == 'below_threshold' + assert q_res.classification_result.rank == 'class' + assert q_res.classification_result.fraction == 0.1 + assert q_res.classification_result.lineage == RankLineageInfo(lineage_str="a;b;c") + assert q_res.classification_result.f_weighted_at_rank == 0.2 + assert q_res.classification_result.bp_match_at_rank == 20 + assert q_res.classification_result.query_ani_at_rank == approx(0.928, rel=1e-2) + + +def test_build_classification_result_rank_ani_threshold(): + "classification result with rank and ANI threshold" + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB'}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True) + + q_res.build_classification_result(rank='class', ani_threshold=0.92) + assert q_res.classification_result.status == 'match' + assert q_res.classification_result.rank == 'class' + assert q_res.classification_result.fraction == 0.1 + assert q_res.classification_result.lineage == RankLineageInfo(lineage_str="a;b;c") + assert q_res.classification_result.f_weighted_at_rank == 0.2 + assert q_res.classification_result.bp_match_at_rank == 20 + assert q_res.classification_result.query_ani_at_rank == approx(0.928, rel=1e-2) + + q_res.build_classification_result(rank='class', ani_threshold=0.95) + assert q_res.classification_result.status == 'below_threshold' + assert q_res.classification_result.rank == 'class' + assert q_res.classification_result.fraction == 0.1 + assert q_res.classification_result.lineage == RankLineageInfo(lineage_str="a;b;c") + assert q_res.classification_result.f_weighted_at_rank == 0.2 + assert q_res.classification_result.bp_match_at_rank == 20 + assert q_res.classification_result.query_ani_at_rank == approx(0.928, rel=1e-2) + + +def test_krona_classified(): + "basic functionality: build classification result using containment threshold" + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB'}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True) + q_res.build_classification_result() + assert q_res.krona_classified == None + q_res.build_classification_result(rank='phylum')#, force_resummarize=True) + print(q_res.krona_classified) + assert q_res.krona_classified == (0.4, 'a', 'b') + assert q_res.krona_unclassified == (0.6, 'unclassified', 'unclassified') + q_res.build_classification_result(rank='superkingdom') + print(q_res.krona_classified) + assert q_res.krona_classified == (0.4, 'a') + assert q_res.krona_unclassified == (0.6, 'unclassified') + # make sure this goes back to None if we reclassify without rank + q_res.build_classification_result() + assert q_res.krona_classified == None + assert q_res.krona_unclassified == None + assert q_res.krona_header == [] + + +def test_make_krona_header_basic(): + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB'}] + phy_header = ["fraction", "superkingdom", "phylum"] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True) + q_res.build_classification_result(rank='phylum') + print(q_res.krona_classified) + print(q_res.krona_header) + assert q_res.krona_header == phy_header + hd = q_res.make_krona_header('phylum') + print("header: ", hd) + assert hd == phy_header + + +def test_make_krona_header_basic_1(): + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB'}] + class_header = ["fraction", "superkingdom", "phylum", "class"] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True) + q_res.build_classification_result(rank='class') + assert q_res.krona_header == class_header + hd = q_res.make_krona_header(min_rank='class') + print("header: ", hd) + assert hd == class_header + + +def test_make_krona_header_fail(): + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB'}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True, summarize=True) + with pytest.raises(ValueError) as exc: + q_res.make_krona_header("order") + assert "Rank 'order' not present in summarized ranks." in str(exc.value) + with pytest.raises(ValueError) as exc: + q_res.make_krona_header("NotARank") + assert "Rank 'NotARank' not present in summarized ranks." in str(exc.value) + + +def test_make_human_summary(): + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB'}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True, summarize=True) + hs = q_res.make_human_summary(display_rank = "superkingdom") + print(hs) + assert hs == [{'rank': 'superkingdom', 'fraction': '0.800', 'lineage': 'unclassified', + 'f_weighted_at_rank': '60.0%', 'bp_match_at_rank': "60", 'query_ani_at_rank': '- ', + 'query_name': 'q1', 'query_md5': 'md5', 'query_filename': 'query_fn', + 'total_weighted_hashes': "0"}, + {'rank': 'superkingdom', 'fraction': '0.200', 'lineage': "a", + 'f_weighted_at_rank': '40.0%', 'bp_match_at_rank': "40", 'query_ani_at_rank': '94.9%', + 'query_name': 'q1', 'query_md5': 'md5', 'query_filename': 'query_fn', 'total_weighted_hashes': "0"}] + + +def test_make_human_summary_2(): + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB'}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True, summarize=True) + hs = q_res.make_human_summary(display_rank = "phylum") + print(hs) + assert hs == [{'rank': 'phylum', 'fraction': '0.800', 'lineage': 'unclassified', + 'f_weighted_at_rank': '60.0%', 'bp_match_at_rank': "60", 'query_ani_at_rank': '- ', + 'query_name': 'q1', 'query_md5': 'md5', 'query_filename': 'query_fn', + 'total_weighted_hashes': "0"}, + {'rank': 'phylum', 'fraction': '0.200', 'lineage': 'a;b', + 'f_weighted_at_rank': '40.0%', 'bp_match_at_rank': "40", 'query_ani_at_rank': '94.9%', + 'query_name': 'q1', 'query_md5': 'md5', 'query_filename': 'query_fn', 'total_weighted_hashes': "0"}] + + +def test_make_human_summary_classification(): + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB'}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True, classify=True, classify_rank="superkingdom") + hs = q_res.make_human_summary(display_rank = "superkingdom", classification=True) + print(hs) + assert hs == [{'rank': 'superkingdom', 'fraction': '0.200', 'lineage': 'a', + 'f_weighted_at_rank': '40.0%', 'bp_match_at_rank': "40", + 'query_ani_at_rank': '94.9%', 'status': 'match', 'query_name': 'q1', + 'query_md5': 'md5', 'query_filename': 'query_fn', 'total_weighted_hashes': "0"}] + + +def test_make_human_summary_classification_2(): + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB'}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True, classify=True, classify_rank="phylum") + hs = q_res.make_human_summary(display_rank = "phylum", classification=True) + print(hs) + assert hs == [{'rank': 'phylum', 'fraction': '0.200', 'lineage': 'a;b', + 'f_weighted_at_rank': '40.0%', 'bp_match_at_rank': "40", + 'query_ani_at_rank': '94.9%', 'status': 'match', + 'query_name': 'q1', 'query_md5': 'md5', + 'query_filename': 'query_fn', 'total_weighted_hashes': "0"}] + + +def test_make_full_summary(): + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB'}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True, summarize=True) + header, fs = q_res.make_full_summary() + assert header == ['query_name', 'rank', 'fraction', 'lineage', 'query_md5', 'query_filename', + 'f_weighted_at_rank', 'bp_match_at_rank', 'query_ani_at_rank', 'total_weighted_hashes'] + print(fs) + assert fs == [{'rank': 'superkingdom', 'fraction': '0.8', 'lineage': 'unclassified', 'f_weighted_at_rank': + '0.6', 'bp_match_at_rank': '60', 'query_ani_at_rank': None, + 'query_name': 'q1', 'query_md5': 'md5', 'query_filename': 'query_fn', + 'total_weighted_hashes': '0'}, + {'rank': 'superkingdom', 'fraction': '0.2', 'lineage': 'a', 'f_weighted_at_rank': '0.4', + 'bp_match_at_rank': '40', 'query_ani_at_rank': approx(0.949,rel=1e-3), 'query_name': 'q1', + 'query_md5': 'md5', 'query_filename': 'query_fn', 'total_weighted_hashes': '0'}, + {'rank': 'phylum', 'fraction': '0.8', 'lineage': 'unclassified', 'f_weighted_at_rank': '0.6', + 'bp_match_at_rank': '60', 'query_ani_at_rank': None, 'query_name': 'q1', 'query_md5': 'md5', + 'query_filename': 'query_fn', 'total_weighted_hashes': '0'}, + {'rank': 'phylum', 'fraction': '0.2', 'lineage': 'a;b', 'f_weighted_at_rank': '0.4', + 'bp_match_at_rank': '40', 'query_ani_at_rank': approx(0.949,rel=1e-3), 'query_name': 'q1', + 'query_md5': 'md5', 'query_filename': 'query_fn', 'total_weighted_hashes': '0'}, + {'rank': 'class', 'fraction': '0.8', 'lineage': 'unclassified', 'f_weighted_at_rank': '0.6', + 'bp_match_at_rank': '60', 'query_ani_at_rank': None, 'query_name': 'q1', 'query_md5': 'md5', + 'query_filename': 'query_fn', 'total_weighted_hashes': '0'}, + {'rank': 'class', 'fraction': '0.1', 'lineage': 'a;b;c', 'f_weighted_at_rank': '0.2', + 'bp_match_at_rank': '20', 'query_ani_at_rank': approx(0.928, rel=1e-3), + 'query_name': 'q1', 'query_md5': 'md5', 'query_filename': 'query_fn', 'total_weighted_hashes': '0'}, + {'rank': 'class', 'fraction': '0.1', 'lineage': 'a;b;d','f_weighted_at_rank': '0.2', + 'bp_match_at_rank': '20', 'query_ani_at_rank': approx(0.928, rel=1e-3), 'query_name': 'q1', + 'query_md5': 'md5', 'query_filename': 'query_fn', 'total_weighted_hashes': '0'}] + + header, fs = q_res.make_full_summary(limit_float=True) + assert header == ['query_name', 'rank', 'fraction', 'lineage', 'query_md5', 'query_filename', + 'f_weighted_at_rank', 'bp_match_at_rank', 'query_ani_at_rank', 'total_weighted_hashes'] + print(fs) + assert fs == [{'rank': 'superkingdom', 'fraction': '0.800', 'lineage': 'unclassified', 'f_weighted_at_rank': + '0.600', 'bp_match_at_rank': '60', 'query_ani_at_rank': None, + 'query_name': 'q1', 'query_md5': 'md5', 'query_filename': 'query_fn', + 'total_weighted_hashes': '0'}, + {'rank': 'superkingdom', 'fraction': '0.200', 'lineage': 'a', 'f_weighted_at_rank': '0.400', + 'bp_match_at_rank': '40', 'query_ani_at_rank': "0.949", 'query_name': 'q1', + 'query_md5': 'md5', 'query_filename': 'query_fn', 'total_weighted_hashes': '0'}, + {'rank': 'phylum', 'fraction': '0.800', 'lineage': 'unclassified', 'f_weighted_at_rank': '0.600', + 'bp_match_at_rank': '60', 'query_ani_at_rank': None, 'query_name': 'q1', 'query_md5': 'md5', + 'query_filename': 'query_fn', 'total_weighted_hashes': '0'}, + {'rank': 'phylum', 'fraction': '0.200', 'lineage': 'a;b', 'f_weighted_at_rank': '0.400', + 'bp_match_at_rank': '40', 'query_ani_at_rank': "0.949", 'query_name': 'q1', + 'query_md5': 'md5', 'query_filename': 'query_fn', 'total_weighted_hashes': '0'}, + {'rank': 'class', 'fraction': '0.800', 'lineage': 'unclassified', 'f_weighted_at_rank': '0.600', + 'bp_match_at_rank': '60', 'query_ani_at_rank': None, 'query_name': 'q1', 'query_md5': 'md5', + 'query_filename': 'query_fn', 'total_weighted_hashes': '0'}, + {'rank': 'class', 'fraction': '0.100', 'lineage': 'a;b;c', 'f_weighted_at_rank': '0.200', + 'bp_match_at_rank': '20', 'query_ani_at_rank': "0.928", + 'query_name': 'q1', 'query_md5': 'md5', 'query_filename': 'query_fn', 'total_weighted_hashes': '0'}, + {'rank': 'class', 'fraction': '0.100', 'lineage': 'a;b;d','f_weighted_at_rank': '0.200', + 'bp_match_at_rank': '20', 'query_ani_at_rank': "0.928", 'query_name': 'q1', + 'query_md5': 'md5', 'query_filename': 'query_fn', 'total_weighted_hashes': '0'}] + + +def test_make_full_summary_summarization_fail(): + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB'}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True, summarize=False) + with pytest.raises(ValueError) as exc: + q_res.make_full_summary() + print(str(exc)) + assert 'not summarized yet' in str(exc) + + +def test_make_full_summary_classification(): + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB'}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True, classify=True) + header, fs = q_res.make_full_summary(classification=True) + assert header == ["query_name", "status", "rank", "fraction", "lineage", + "query_md5", "query_filename", "f_weighted_at_rank", + "bp_match_at_rank", "query_ani_at_rank"] + print(fs) + assert fs == [{'rank': 'class', 'fraction': '0.1', 'lineage': 'a;b;c', 'f_weighted_at_rank': '0.2', + 'bp_match_at_rank': '20', 'query_ani_at_rank': approx(0.928, rel=1e-3), + 'status': 'match', 'query_name': 'q1', 'query_md5': 'md5', 'query_filename': 'query_fn', + 'total_weighted_hashes': '0'}] + + +def test_make_full_summary_classification_limit_float(): + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB'}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True, classify=True) + header, fs = q_res.make_full_summary(classification=True, limit_float=True) + assert header == ["query_name", "status", "rank", "fraction", "lineage", + "query_md5", "query_filename", "f_weighted_at_rank", + "bp_match_at_rank", "query_ani_at_rank"] + print(fs) + assert fs == [{'rank': 'class', 'fraction': '0.100', 'lineage': 'a;b;c', 'f_weighted_at_rank': '0.200', + 'bp_match_at_rank': '20', 'query_ani_at_rank': "0.928", + 'status': 'match', 'query_name': 'q1', 'query_md5': 'md5', 'query_filename': 'query_fn', + 'total_weighted_hashes': '0'}] + + +def test_make_full_summary_classification_fail(): + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB'}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True, summarize=True) + with pytest.raises(ValueError) as exc: + q_res.make_full_summary(classification=True) + print(str(exc)) + assert 'not classified yet' in str(exc) + + +def test_make_kreport_results(): + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;c;d;e;f;g")]) + #need to go down to species to check that `num_bp_assigned` is happening correctly + gather_results = [{"total_weighted_hashes":100}, {"name": 'gB', "total_weighted_hashes":100}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True, summarize=True) + krepD = q_res.make_kreport_results() + print(krepD) + assert krepD == [{'num_bp_assigned': '0', 'percent_containment': '40.00', 'num_bp_contained': '40', + 'rank_code': 'D', 'sci_name': 'a', 'ncbi_taxid': None}, + {'num_bp_assigned': '60', 'percent_containment': '60.00', 'num_bp_contained': '60', + 'sci_name': 'unclassified', 'rank_code': 'U'}, + {'num_bp_assigned': '0', 'percent_containment': '40.00', 'num_bp_contained': '40', + 'rank_code': 'P', 'sci_name': 'b', 'ncbi_taxid': None}, + {'num_bp_assigned': '0', 'percent_containment': '40.00', 'num_bp_contained': '40', + 'rank_code': 'C', 'sci_name': 'c', 'ncbi_taxid': None}, + {'num_bp_assigned': '0', 'percent_containment': '20.00', 'num_bp_contained': '20', + 'rank_code': 'O', 'sci_name': 'd', 'ncbi_taxid': None}, + {'num_bp_assigned': '0', 'percent_containment': '20.00', 'num_bp_contained': '20', + 'rank_code': 'F', 'sci_name': 'e', 'ncbi_taxid': None}, + {'num_bp_assigned': '0', 'percent_containment': '20.00', 'num_bp_contained': '20', + 'rank_code': 'G', 'sci_name': 'f', 'ncbi_taxid': None}, + {'num_bp_assigned': '20', 'percent_containment': '20.00', 'num_bp_contained': '20', + 'rank_code': 'S', 'sci_name': 'g', 'ncbi_taxid': None}] + + +def test_make_kreport_results_fail(): + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB'}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True, summarize=False) + with pytest.raises(ValueError) as exc: + q_res.make_kreport_results() + print(str(exc)) + assert 'not summarized yet' in str(exc) + + +def test_make_kreport_results_fail_pre_v450(): + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB'}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True, summarize=True) + with pytest.raises(ValueError) as exc: + q_res.make_kreport_results() + print(str(exc)) + assert "cannot produce 'kreport' format from gather results before sourmash v4.5.0" in str(exc) + + +def test_make_kreport_results_fail_pre_v450(): + taxD = make_mini_taxonomy([("gA", "a;b;c"), ("gB", "a;b;d")]) + gather_results = [{}, {"name": 'gB'}] + q_res = make_QueryTaxResults(gather_info=gather_results, taxD=taxD, single_query=True, summarize=True) + with pytest.raises(ValueError) as exc: + q_res.make_kreport_results() + print(str(exc)) + assert "cannot produce 'kreport' format from gather results before sourmash v4.5.0" in str(exc)