Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace super(Class, self) => super() #297

Merged
merged 1 commit into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 38 additions & 26 deletions goatools/anno/gaf_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
"""

import sys
from goatools.anno.annoreader_base import AnnoReaderBase
from goatools.anno.init.reader_gaf import GafData
from goatools.anno.init.reader_gaf import InitAssc

from .annoreader_base import AnnoReaderBase
from .init.reader_gaf import GafData, InitAssc

__copyright__ = "Copyright (C) 2016-2019, DV Klopfenstein, H Tang. All rights reserved."
__author__ = "DV Klopfenstein"
Expand All @@ -17,42 +17,52 @@
class GafReader(AnnoReaderBase):
"""Reads a Gene Annotation File (GAF). Returns a Python object."""

exp_kws = {'hdr_only', 'prt', 'namespaces', 'allow_missing_symbol', 'godag'}
exp_kws = {"hdr_only", "prt", "namespaces", "allow_missing_symbol", "godag"}

def __init__(self, filename=None, **kws):
super(GafReader, self).__init__(
'gaf', filename,
godag=kws.get('godag'),
hdr_only=kws.get('hdr_only', False),
prt=kws.get('prt', sys.stdout),
namespaces=kws.get('namespaces'),
allow_missing_symbol=kws.get('allow_missing_symbol', False))
super().__init__(
"gaf",
filename,
godag=kws.get("godag"),
hdr_only=kws.get("hdr_only", False),
prt=kws.get("prt", sys.stdout),
namespaces=kws.get("namespaces"),
allow_missing_symbol=kws.get("allow_missing_symbol", False),
)

def read_gaf(self, namespace='BP', **kws):
def read_gaf(self, namespace="BP", **kws):
"""Read Gene Association File (GAF). Return associations."""
return self.get_id2gos(namespace, **kws)

@staticmethod
def wr_txt(fout_gaf, nts):
"""Write annotation namedtuples to a file in GAF format"""
pat = (
'{DB}\t{DB_ID}\t{DB_Symbol}\t{Qualifier}\t{GO_ID}\t{DB_Reference}\t'
'{Evidence_Code}\t{With_From}\t{NS}\t{DB_Name}\t{DB_Synonym}\t{DB_Type}\t'
'{Taxon}\t{Date}\t{Assigned_By}\t{Extension}\t{Gene_Product_Form_ID}\n')
sets = {'Qualifier', 'DB_Reference', 'With_From', 'DB_Name', 'DB_Synonym', 'Gene_Product_Form_ID'}
ns2a = {ns:p for p, ns in GafData.aspect2ns.items()}
with open(fout_gaf, 'w') as prt:
prt.write('!gaf-version: 2.1\n')
"{DB}\t{DB_ID}\t{DB_Symbol}\t{Qualifier}\t{GO_ID}\t{DB_Reference}\t"
"{Evidence_Code}\t{With_From}\t{NS}\t{DB_Name}\t{DB_Synonym}\t{DB_Type}\t"
"{Taxon}\t{Date}\t{Assigned_By}\t{Extension}\t{Gene_Product_Form_ID}\n"
)
sets = {
"Qualifier",
"DB_Reference",
"With_From",
"DB_Name",
"DB_Synonym",
"Gene_Product_Form_ID",
}
ns2a = {ns: p for p, ns in GafData.aspect2ns.items()}
with open(fout_gaf, "w") as prt:
prt.write("!gaf-version: 2.1\n")
for ntd in nts:
dct = ntd._asdict()
for fld in sets:
dct[fld] = '|'.join(sorted(dct[fld]))
dct['Taxon'] = '|'.join(['taxon:{T}'.format(T=t) for t in dct['Taxon']])
dct['NS'] = ns2a[dct['NS']]
dct['Date'] = dct['Date'].strftime('%Y%m%d')
dct[fld] = "|".join(sorted(dct[fld]))
dct["Taxon"] = "|".join(["taxon:{T}".format(T=t) for t in dct["Taxon"]])
dct["NS"] = ns2a[dct["NS"]]
dct["Date"] = dct["Date"].strftime("%Y%m%d")
prt.write(pat.format(**dct))
#prt.write('{NT}\n'.format(NT=ntd))
print(' {N} annotations WROTE: {GAF}'.format(N=len(nts), GAF=fout_gaf))
# prt.write('{NT}\n'.format(NT=ntd))
print(" {N} annotations WROTE: {GAF}".format(N=len(nts), GAF=fout_gaf))

def chk_associations(self, fout_err="gaf.err"):
"""Check that fields are legal in GAF"""
Expand All @@ -67,7 +77,9 @@ def has_ns(self):
def _init_associations(self, fin_gaf, **kws):
"""Read annotation file and store a list of namedtuples."""
ini = InitAssc(fin_gaf)
nts = ini.init_associations(kws['hdr_only'], kws['prt'], kws['namespaces'], kws['allow_missing_symbol'])
nts = ini.init_associations(
kws["hdr_only"], kws["prt"], kws["namespaces"], kws["allow_missing_symbol"]
)
self.hdr = ini.hdr
return nts

Expand Down
82 changes: 52 additions & 30 deletions goatools/anno/genetogo_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,39 @@

"""

import sys
import collections as cx
import sys

from itertools import chain
from goatools.anno.init.reader_genetogo import InitAssc
from goatools.anno.annoreader_base import AnnoReaderBase
from goatools.anno.opts import AnnoOptions

__copyright__ = "Copyright (C) 2016-present, DV Klopfenstein, H Tang. All rights reserved."
from .annoreader_base import AnnoReaderBase
from .init.reader_genetogo import InitAssc
from .opts import AnnoOptions

__copyright__ = (
"Copyright (C) 2016-present, DV Klopfenstein, H Tang. All rights reserved."
)
__author__ = "DV Klopfenstein"


# pylint: disable=broad-except,too-few-public-methods,line-too-long
class Gene2GoReader(AnnoReaderBase):
"""Reads a gene2go annotation file. Returns a Python object."""

exp_kws = {'taxids', 'taxid', 'namespaces', 'godag'}
exp_kws = {"taxids", "taxid", "namespaces", "godag"}

def __init__(self, filename=None, **kws):
# kws: taxids or taxid
super(Gene2GoReader, self).__init__('gene2go', filename, **kws)
super().__init__("gene2go", filename, **kws)
# Each taxid has a list of namedtuples - one for each line in the annotations
self.taxid2asscs = self._init_taxid2asscs()

def get_ns2assc(self, taxid=None, **kws):
"""Return the associations as a dict of id2gos dicts, one per namespace (BP, MF, CC)"""
return {ns:self._get_id2gos(nts, **kws) for ns, nts in self.get_ns2ntsanno(taxid).items()}
return {
ns: self._get_id2gos(nts, **kws)
for ns, nts in self.get_ns2ntsanno(taxid).items()
}

def get_ns2ntsanno(self, taxid=None):
"""Return all associations in three (one for BP MF CC) dicts, id2gos"""
Expand Down Expand Up @@ -63,28 +70,33 @@ def get_associations(self, taxid=None):
@staticmethod
def _warning_taxid(taxid):
"""Warn if an unexpected taxid"""
pat = ('**WARNING: NO ASSOCIATIONS FOR taxid({TAXID}). '
'Taxid MUST BE AN int, list of ints, OR bool')
pat = (
"**WARNING: NO ASSOCIATIONS FOR taxid({TAXID}). "
"Taxid MUST BE AN int, list of ints, OR bool"
)
print(pat.format(TAXID=taxid))
return {}

def get_id2gos_nss(self, **kws):
"""Return all associations in a dict, id2gos, regardless of namespace"""
taxids = self._get_taxids(kws.get('taxids'), kws.get('taxid'))
taxids = self._get_taxids(kws.get("taxids"), kws.get("taxid"))
assert taxids, "NO TAXIDS FOUND"
assc = list(chain.from_iterable([self.taxid2asscs[t] for t in taxids]))
return self._get_id2gos(assc, **kws)

def get_name(self):
"""Get name using taxid"""
if len(self.taxid2asscs) == 1:
return '{BASE}_{TAXID}'.format(
BASE=self.name, TAXID=next(iter(self.taxid2asscs.keys())))
return '{BASE}_various'.format(BASE=self.name)
return "{BASE}_{TAXID}".format(
BASE=self.name, TAXID=next(iter(self.taxid2asscs.keys()))
)
return "{BASE}_various".format(BASE=self.name)

def get_taxid(self):
"""Return taxid, if one was provided. Otherwise return True, representing all taxids"""
return next(iter(self.taxid2asscs.keys())) if len(self.taxid2asscs) == 1 else True
return (
next(iter(self.taxid2asscs.keys())) if len(self.taxid2asscs) == 1 else True
)

def has_ns(self):
"""Return True if namespace field, NS exists on annotation namedtuples"""
Expand All @@ -96,56 +108,66 @@ def prt_counts(self, prt=sys.stdout):
num_annos = sum(len(a) for a in self.taxid2asscs.values())
# 792,891 annotations for 3 taxids stored: 10090 7227 9606
cnts = self._get_counts(list(chain.from_iterable(self.taxid2asscs.values())))
prt.write('{A:8,} annotations, {P:,} proteins/genes, {G:,} GO IDs, {N} taxids stored'.format(
A=num_annos, N=num_taxids, G=cnts['GOs'], P=cnts['geneids']))
prt.write(
"{A:8,} annotations, {P:,} proteins/genes, {G:,} GO IDs, {N} taxids stored".format(
A=num_annos, N=num_taxids, G=cnts["GOs"], P=cnts["geneids"]
)
)
if num_taxids < 5:
prt.write(': {Ts}'.format(Ts=' '.join(str(t) for t in sorted(self.taxid2asscs))))
prt.write('\n')
prt.write(
": {Ts}".format(Ts=" ".join(str(t) for t in sorted(self.taxid2asscs)))
)
prt.write("\n")
# 102,430 annotations for taxid 7227
# 323,776 annotations for taxid 9606
# 366,685 annotations for taxid 10090
if num_taxids == 1:
return
for taxid, assc in self.taxid2asscs.items():
cnts = self._get_counts(assc)
prt.write('{A:8,} annotations, {P:,} proteins/genes, {G:,} GO IDs for taxid {T}\n'.format(
A=len(assc), T=taxid, G=cnts['GOs'], P=cnts['geneids']))
prt.write(
"{A:8,} annotations, {P:,} proteins/genes, {G:,} GO IDs for taxid {T}\n".format(
A=len(assc), T=taxid, G=cnts["GOs"], P=cnts["geneids"]
)
)

@staticmethod
def _get_counts(nts):
"""Return the count of GO IDs and genes/proteins in a set of annotation namedtuples"""
sets = cx.defaultdict(set)
for ntd in nts:
sets['geneids'].add(ntd.DB_ID)
sets['GOs'].add(ntd.GO_ID)
return {'GOs':len(sets['GOs']), 'geneids':len(sets['geneids'])}
sets["geneids"].add(ntd.DB_ID)
sets["GOs"].add(ntd.GO_ID)
return {"GOs": len(sets["GOs"]), "geneids": len(sets["geneids"])}

# -- taxids2asscs -------------------------------------------------------------------------
def get_taxid2asscs(self, taxids=None, **kws):
"""Return taxid2asscs[taxid][ID2GOs|GO2IDs] for the requested taxids."""
# WAS: get_annotations_taxid2dct
taxid2asscs = cx.defaultdict(lambda: cx.defaultdict(lambda: cx.defaultdict(set)))
taxid2asscs = cx.defaultdict(
lambda: cx.defaultdict(lambda: cx.defaultdict(set))
)
options = AnnoOptions(self.evobj, **kws)
for taxid in self._get_taxids(taxids):
nts = self.taxid2asscs[taxid]
assc = self.reduce_annotations(nts, options)
taxid2asscs[taxid]['ID2GOs'] = self.get_dbid2goids(assc)
taxid2asscs[taxid]['GO2IDs'] = self.get_goid2dbids(assc)
taxid2asscs[taxid]["ID2GOs"] = self.get_dbid2goids(assc)
taxid2asscs[taxid]["GO2IDs"] = self.get_goid2dbids(assc)
return taxid2asscs

@staticmethod
def fill_taxid2asscs(taxid2asscs_usr, taxid2asscs_ret):
"""Fill user taxid2asscs for backward compatibility."""
for taxid, ab_ret in taxid2asscs_ret.items():
taxid2asscs_usr[taxid]['ID2GOs'] = ab_ret['ID2GOs']
taxid2asscs_usr[taxid]['GO2IDs'] = ab_ret['GO2IDs']
taxid2asscs_usr[taxid]["ID2GOs"] = ab_ret["ID2GOs"]
taxid2asscs_usr[taxid]["GO2IDs"] = ab_ret["GO2IDs"]

@staticmethod
def get_id2gos_all(taxid2asscs_a2b):
"""Get associations for all stored species taxid2asscs[taxid][ID2GOs|GO2IDs]."""
id2gos_all = {}
for a2b in taxid2asscs_a2b.values():
for geneid, gos in a2b['ID2GOs'].items():
for geneid, gos in a2b["ID2GOs"].items():
id2gos_all[geneid] = gos
return id2gos_all

Expand Down
22 changes: 13 additions & 9 deletions goatools/anno/gpad_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
"""

import collections as cx
from goatools.anno.annoreader_base import AnnoReaderBase
from goatools.anno.init.reader_gpad import InitAssc

from .annoreader_base import AnnoReaderBase
from .init.reader_gpad import InitAssc

__copyright__ = "Copyright (C) 2016-2019, DV Klopfenstein, H Tang. All rights reserved."
__author__ = "DV Klopfenstein"
Expand All @@ -17,13 +18,16 @@
class GpadReader(AnnoReaderBase):
"""Read a Gene Product Association Data (GPAD) file and store the data in a Python object."""

exp_kws = {'hdr_only', 'godag', 'namespaces'}
exp_kws = {"hdr_only", "godag", "namespaces"}

def __init__(self, filename=None, **kws):
super(GpadReader, self).__init__('gpad', filename,
hdr_only=kws.get('hdr_only', False),
godag=kws.get('godag'),
namespaces=kws.get('namespaces'))
super().__init__(
"gpad",
filename,
hdr_only=kws.get("hdr_only", False),
godag=kws.get("godag"),
namespaces=kws.get("namespaces"),
)
self.qty = len(self.associations)

def get_relation_cnt(self):
Expand All @@ -36,8 +40,8 @@ def get_relation_cnt(self):

def _init_associations(self, fin_gpad, **kws):
"""Read annotation file and store a list of namedtuples."""
ini = InitAssc(fin_gpad, kws['godag'])
nts = ini.init_associations(kws['hdr_only'], kws['namespaces'])
ini = InitAssc(fin_gpad, kws["godag"])
nts = ini.init_associations(kws["hdr_only"], kws["namespaces"])
self.hdr = ini.hdr
return nts

Expand Down
2 changes: 1 addition & 1 deletion goatools/anno/idtogos_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class IdToGosReader(AnnoReaderBase):

def __init__(self, filename=None, **kws):
self.id2gos = None # ID to GO ID set as loaded from annotations file
super(IdToGosReader, self).__init__(
super().__init__(
"id2gos",
filename,
godag=kws.get("godag"),
Expand Down
Loading