Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MRG: Use RankLineageInfo to simplify reading lineages #2467

Merged
merged 11 commits into from
Feb 13, 2023
118 changes: 58 additions & 60 deletions src/sourmash/tax/tax_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ class BaseLineageInfo:
optional:
lineage: tuple or list of LineagePair
lineage_str: `;`- or `,`-separated string of names
lineage_dict: dictionary of {rank: name}

If no lineage information is provided, result will be a BaseLineageInfo
with provided ranks and no lineage names.
Expand All @@ -63,7 +62,6 @@ class BaseLineageInfo:
ranks: tuple() # require ranks
lineage: tuple = None # tuple of LineagePairs
lineage_str: str = field(default=None, compare=False) # ';'- or ','-separated str of lineage names
lineage_dict: dict = field(default=None, compare=False) # dict of rank: name

def __post_init__(self):
"Initialize according to passed values"
Expand All @@ -74,8 +72,6 @@ def __post_init__(self):
self._init_from_lineage_tuples()
elif self.lineage_str is not None:
self._init_from_lineage_str()
elif self.lineage_dict is not None:
self._init_from_lineage_dict()
else:
self._init_empty()

Expand Down Expand Up @@ -170,37 +166,6 @@ def _init_from_lineage_tuples(self):
object.__setattr__(self, "lineage", tuple(new_lineage))
object.__setattr__(self, "filled_ranks", filled_ranks)

def _init_from_lineage_dict(self):
'initialize from lineage dict, e.g. from gather csv, allowing empty ranks and reordering if necessary'
if not isinstance(self.lineage_dict, (dict)):
raise ValueError(f"{self.lineage_dict} is not dictionary")
# first, initialize_empty
new_lineage = []
# build empty lineage
for rank in self.ranks:
new_lineage.append(LineagePair(rank=rank))
# now add input information in correct spots. This corrects for order and allows empty values.
for rank, info in self.lineage_dict.items():
try:
rank_idx = self.rank_index(rank)
except ValueError as e:
raise ValueError(f"Rank '{rank}' not present in {', '.join(self.ranks)}") from e

name, taxid = None, None
if isinstance(info, dict):
if 'name' in info.keys():
name = info['name']
if 'taxid' in info.keys():
taxid = info['taxid']
elif isinstance(info, str):
name = info
new_lineage[rank_idx] = LineagePair(rank=rank, name=name, taxid=taxid)
# build list of filled ranks
filled_ranks = [a.rank for a in new_lineage if a.name]
# set lineage and filled_ranks
object.__setattr__(self, "lineage", tuple(new_lineage))
object.__setattr__(self, "filled_ranks", filled_ranks)

def _init_from_lineage_str(self):
"""
Turn a ; or ,-separated set of lineages into a list of LineagePair objs.
Expand Down Expand Up @@ -329,6 +294,7 @@ class RankLineageInfo(BaseLineageInfo):
and will not be used or compared in any other class methods.
"""
ranks: tuple = ('superkingdom', 'phylum', 'class', 'order', 'family', 'genus', 'species', 'strain')
lineage_dict: dict = field(default=None, compare=False) # dict of rank: name

def __post_init__(self):
"Initialize according to passed values"
Expand All @@ -344,6 +310,53 @@ def __post_init__(self):
elif self.ranks:
self._init_empty()

def _init_from_lineage_dict(self):
"""
Initialize from lineage dict, e.g. from lineages csv.
Use NCBI taxids if available as '|'-separated 'taxpath' column.
Allows empty ranks/extra columns and reordering if necessary
"""
null_names = set(['[Blank]', 'na', 'null', 'NA', ''])
if not isinstance(self.lineage_dict, (dict)):
raise ValueError(f"{self.lineage_dict} is not dictionary")
new_lineage = []
taxpath=[]
# build empty lineage and taxpath
for rank in self.ranks:
new_lineage.append(LineagePair(rank=rank))

# check for NCBI taxpath information
taxpath_str = self.lineage_dict.get('taxpath', [])
if taxpath_str:
taxpath = taxpath_str.split('|')
if len(taxpath) > len(self.ranks):
raise ValueError(f"Number of NCBI taxids ({len(taxpath)}) exceeds number of ranks ({len(self.ranks)})")

# now add rank information in correct spots. This corrects for order and allows empty ranks and extra dict keys
for key, val in self.lineage_dict.items():
name, taxid = None, None
try:
rank, name = key, val
rank_idx = self.rank_index(rank)
except ValueError:
continue # ignore dictionary entries (columns) that don't match a rank

if taxpath:
try:
taxid = taxpath[rank_idx]
except IndexError:
taxid = None
# filter null
if name is not None and name.strip() in null_names:
name = None
new_lineage[rank_idx] = LineagePair(rank=rank, name=name, taxid=taxid)

# build list of filled ranks
filled_ranks = [a.rank for a in new_lineage if a.name]
# set lineage and filled_ranks
object.__setattr__(self, "lineage", tuple(new_lineage))
object.__setattr__(self, "filled_ranks", filled_ranks)


def get_ident(ident, *,
keep_full_identifiers=False, keep_identifier_versions=False):
Expand Down Expand Up @@ -754,15 +767,14 @@ def load(cls, filename, *, delimiter=',', force=False,
else:
header_str = ",".join([repr(x) for x in header])
raise ValueError(f'No taxonomic identifiers found; headers are {header_str}')

# is "strain" an available rank?
if "strain" in header:
include_strain=True
load_taxids=False
if 'taxpath' in header:
load_taxids=True

# check that all ranks are in header
ranks = list(lca_utils.taxlist(include_strain=include_strain))
ranks = list(RankLineageInfo().taxlist)
if not include_strain:
ranks.remove('strain')
if not set(ranks).issubset(header):
# for now, just raise err if not all ranks are present.
# in future, we can define `ranks` differently if desired
Expand All @@ -777,40 +789,26 @@ def load(cls, filename, *, delimiter=',', force=False,
# now parse and load lineages
for n, row in enumerate(r):
num_rows += 1
lineage = []
taxid=None
# read row into a lineage pair
if load_taxids:
taxpath = row['taxpath'].split('|')
for n, rank in enumerate(lca_utils.taxlist(include_strain=include_strain)):
lin = row[rank]
if load_taxids:
taxid = taxpath[n]
lineage.append(LineagePair(rank, name=lin, taxid=taxid))
# read lineage from row dictionary
lineageInfo = RankLineageInfo(lineage_dict=row)
# get identifier
ident = row[identifier]

# fold, spindle, and mutilate ident?
ident = get_ident(ident,
keep_full_identifiers=keep_full_identifiers,
keep_identifier_versions=keep_identifier_versions)

# clean lineage of null names, replace with 'unassigned'
lineage = [ (lin.rank, lca_utils.filter_null(lin.name), lin.taxid) for lin in lineage ]
lineage = [ LineagePair(a, b, c) for (a, b, c) in lineage ]

# remove end nulls
while lineage and lineage[-1].name == 'unassigned':
lineage = lineage[:-1]

# store lineage tuple
lineage = lineageInfo.filled_lineage
if lineage:
# check duplicates
if ident in assignments:
if assignments[ident] != tuple(lineage):
if assignments[ident] != lineage:
if not force:
raise ValueError(f"multiple lineages for identifier {ident}")
else:
assignments[ident] = tuple(lineage)
assignments[ident] = lineage

if lineage[-1].rank == 'species':
n_species += 1
Expand Down
126 changes: 76 additions & 50 deletions tests/test_tax_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1074,46 +1074,6 @@ def test_BaseLineageInfo_init_lca_lineage_tups():
assert taxinf.zip_lineage()== ['a', '', 'b']


def test_BaseLineageInfo_init_lineage_dict_fail():
ranks=["A", "B", "C"]
lin_tups = (LineagePair(rank="A", name='a'), LineagePair(rank="C", name='b'))
with pytest.raises(ValueError) as exc:
taxinf = BaseLineageInfo(ranks=ranks, lineage_dict=lin_tups)
print(str(exc))

assert "is not dictionary" in str(exc)


def test_BaseLineageInfo_init_lineage_dict ():
x = {'rank1': 'name1', 'rank2': 'name2'}
taxinf = BaseLineageInfo(lineage_dict=x, ranks=["rank1", "rank2"])
print("ranks: ", taxinf.ranks)
print("lineage: ", taxinf.lineage)
print("zipped lineage: ", taxinf.zip_lineage())
assert taxinf.zip_lineage()== ['name1', 'name2']


def test_BaseLineageInfo_init_lineage_dict_withtaxid():
x = {'rank1': {'name': 'name1', 'taxid': 1}, 'rank2': {'name':'name2', 'taxid': 2}}
taxinf = BaseLineageInfo(lineage_dict=x, ranks=["rank1", "rank2"])
print("ranks: ", taxinf.ranks)
print("lineage: ", taxinf.lineage)
print("zipped lineage: ", taxinf.zip_lineage())
assert taxinf.zip_lineage()== ['name1', 'name2']
assert taxinf.zip_taxid()== ['1', '2']
assert taxinf.lowest_lineage_taxid == 2
assert taxinf.lowest_lineage_name == "name2"


def test_BaseLineageInfo_init_lineage_str_lineage_dict_test_eq():
x = "a;b;c"
ranks=["A", "B", "C"]
rankD = {"A": "a", "B": "b", "C": "c"}
lin1 = BaseLineageInfo(lineage_str=x, ranks=ranks)
lin2 = BaseLineageInfo(lineage_dict=rankD, ranks=ranks)
assert lin1 == lin2


def test_BaseLineageInfo_init_no_ranks():
x = "a;b;c"
rankD = {"superkingdom": "a", "phylum": "b", "class": "c"}
Expand All @@ -1122,10 +1082,6 @@ def test_BaseLineageInfo_init_no_ranks():
BaseLineageInfo(lineage_str=x)
print(exc)
assert "__init__() missing 1 required positional argument: 'ranks'" in str(exc)
with pytest.raises(TypeError) as exc:
BaseLineageInfo(lineage_dict=rankD)
print(exc)
assert "__init__() missing 1 required positional argument: 'ranks'" in str(exc)
with pytest.raises(TypeError) as exc:
BaseLineageInfo(lineage=lin_tups)
print(exc)
Expand All @@ -1140,10 +1096,6 @@ def test_BaseLineageInfo_init_with_wrong_ranks():
BaseLineageInfo(lineage=lin_tups, ranks=ranks)
print(str(exc))
assert "Rank 'rank1' not present in A, B, C" in str(exc)
with pytest.raises(ValueError) as exc:
BaseLineageInfo(lineage_dict=linD, ranks=ranks)
print(str(exc))
assert "Rank 'rank1' not present in A, B, C" in str(exc)


def test_BaseLineageInfo_init_not_lineagepair():
Expand Down Expand Up @@ -1187,14 +1139,55 @@ def test_RankLineageInfo_init_lineage_tups():
assert taxinf.zip_lineage()== ['a', 'b', '', '', '', '', '', '']


def test_RankLineageInfo_init_lineage_dict_fail():
ranks=["A", "B", "C"]
lin_tups = (LineagePair(rank="A", name='a'), LineagePair(rank="C", name='b'))
with pytest.raises(ValueError) as exc:
taxinf = RankLineageInfo(ranks=ranks, lineage_dict=lin_tups)
print(str(exc))

assert "is not dictionary" in str(exc)


def test_RankLineageInfo_init_lineage_dict():
x = {'rank1': 'name1', 'rank2': 'name2'}
taxinf = RankLineageInfo(lineage_dict=x, ranks=["rank1", "rank2"])
print("ranks: ", taxinf.ranks)
print("lineage: ", taxinf.lineage)
print("zipped lineage: ", taxinf.zip_lineage())
assert taxinf.zip_lineage()== ['name1', 'name2']


def test_RankLineageInfo_init_lineage_dict_default_ranks():
x = {"superkingdom":'a',"phylum":'b'}
taxinf = RankLineageInfo(lineage_dict=x)
print(taxinf.lineage)
print(taxinf.lineage_str)
assert taxinf.zip_lineage()== ['a', 'b', '', '', '', '', '', '']


def test_RankLineageInfo_init_lineage_dict_withtaxpath():
x = {'rank1': 'name1', 'rank2': 'name2', 'taxpath': "1|2"}
taxinf = RankLineageInfo(lineage_dict=x, ranks=["rank1", "rank2"])
print("ranks: ", taxinf.ranks)
print("lineage: ", taxinf.lineage)
print("zipped lineage: ", taxinf.zip_lineage())
print("zipped taxids: ", taxinf.zip_taxid())
assert taxinf.zip_lineage()== ['name1', 'name2']
assert taxinf.zip_taxid()== ['1', '2']
assert taxinf.lowest_lineage_taxid == "2"
assert taxinf.lowest_lineage_name == "name2"


def test_RankLineageInfo_init_lineage_str_lineage_dict_test_eq():
x = "a;b;c"
ranks=["A", "B", "C"]
rankD = {"A": "a", "B": "b", "C": "c"}
lin1 = RankLineageInfo(lineage_str=x, ranks=ranks)
lin2 = RankLineageInfo(lineage_dict=rankD, ranks=ranks)
assert lin1 == lin2


def test_RankLineageInfo_init_lineage_dict_missing_rank():
x = {'superkingdom': 'name1', 'class': 'name2'}
taxinf = RankLineageInfo(lineage_dict=x)
Expand All @@ -1205,8 +1198,8 @@ def test_RankLineageInfo_init_lineage_dict_missing_rank():
assert taxinf.zip_lineage(truncate_empty=True)== ['name1', '', 'name2']


def test_RankLineageInfo_init_lineage_dict_missing_rank_withtaxid():
x = {'superkingdom': {'name': 'name1', 'taxid': 1}, 'class': {'name':'name2', 'taxid': 2}}
def test_RankLineageInfo_init_lineage_dict_missing_rank_with_taxpath():
x = {'superkingdom': 'name1', 'class': 'name2', 'taxpath': '1||2'}
taxinf = RankLineageInfo(lineage_dict=x)
print("ranks: ", taxinf.ranks)
print("lineage: ", taxinf.lineage)
Expand All @@ -1215,6 +1208,39 @@ def test_RankLineageInfo_init_lineage_dict_missing_rank_withtaxid():
assert taxinf.zip_taxid()== ['1', '', '2', '', '', '', '', '']


def test_RankLineageInfo_init_lineage_dict_name_taxpath_mismatch():
# If there's no name, we don't report the taxpath, because lineage is not "filled".
# Is this desired behavior?
x = {'superkingdom': 'name1', 'taxpath': '1||2'}
taxinf = RankLineageInfo(lineage_dict=x)
print("ranks: ", taxinf.ranks)
print("lineage: ", taxinf.lineage)
print("zipped lineage: ", taxinf.zip_lineage())
assert taxinf.zip_lineage()== ['name1', '', '', '', '', '', '', '']
assert taxinf.zip_taxid()== ['1', '', '', '', '', '', '', '']


def test_RankLineageInfo_init_lineage_dict_name_taxpath_missing_taxids():
# If there's no name, we don't report the taxpath, because lineage is not "filled".
# Is this desired behavior?
x = {'superkingdom': 'name1', 'phylum': "name2", "class": "name3", 'taxpath': '|2'}
taxinf = RankLineageInfo(lineage_dict=x)
print("ranks: ", taxinf.ranks)
print("lineage: ", taxinf.lineage)
print("zipped lineage: ", taxinf.zip_lineage())
print("zipped taxids: ", taxinf.zip_taxid())
assert taxinf.zip_lineage()== ['name1', 'name2', 'name3', '', '', '', '', '']
assert taxinf.zip_taxid()== ['', '2', '', '', '', '', '', '']


def test_RankLineageInfo_init_lineage_dict_taxpath_too_long():
x = {'superkingdom': 'name1', 'class': 'name2', 'taxpath': '1||2||||||||||'}
with pytest.raises(ValueError) as exc:
RankLineageInfo(lineage_dict=x)
print(str(exc))
assert f"Number of NCBI taxids (13) exceeds number of ranks (8)" in str(exc)


def test_RankLineageInfo_init_lineage_str_lineage_dict_test_eq():
x = "a;b;c"
rankD = {"superkingdom": "a", "phylum": "b", "class": "c"}
Expand Down