Skip to content

Commit

Permalink
Replace super(Class, self) => super() (#297)
Browse files Browse the repository at this point in the history
  • Loading branch information
tanghaibao authored Jun 3, 2024
1 parent a2b7e34 commit 01144a3
Show file tree
Hide file tree
Showing 8 changed files with 249 additions and 170 deletions.
64 changes: 38 additions & 26 deletions goatools/anno/gaf_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
"""

import sys
from goatools.anno.annoreader_base import AnnoReaderBase
from goatools.anno.init.reader_gaf import GafData
from goatools.anno.init.reader_gaf import InitAssc

from .annoreader_base import AnnoReaderBase
from .init.reader_gaf import GafData, InitAssc

__copyright__ = "Copyright (C) 2016-2019, DV Klopfenstein, H Tang. All rights reserved."
__author__ = "DV Klopfenstein"
Expand All @@ -17,42 +17,52 @@
class GafReader(AnnoReaderBase):
"""Reads a Gene Annotation File (GAF). Returns a Python object."""

exp_kws = {'hdr_only', 'prt', 'namespaces', 'allow_missing_symbol', 'godag'}
exp_kws = {"hdr_only", "prt", "namespaces", "allow_missing_symbol", "godag"}

def __init__(self, filename=None, **kws):
super(GafReader, self).__init__(
'gaf', filename,
godag=kws.get('godag'),
hdr_only=kws.get('hdr_only', False),
prt=kws.get('prt', sys.stdout),
namespaces=kws.get('namespaces'),
allow_missing_symbol=kws.get('allow_missing_symbol', False))
super().__init__(
"gaf",
filename,
godag=kws.get("godag"),
hdr_only=kws.get("hdr_only", False),
prt=kws.get("prt", sys.stdout),
namespaces=kws.get("namespaces"),
allow_missing_symbol=kws.get("allow_missing_symbol", False),
)

def read_gaf(self, namespace='BP', **kws):
def read_gaf(self, namespace="BP", **kws):
"""Read Gene Association File (GAF). Return associations."""
return self.get_id2gos(namespace, **kws)

@staticmethod
def wr_txt(fout_gaf, nts):
"""Write annotation namedtuples to a file in GAF 2.1 format"""
pat = (
'{DB}\t{DB_ID}\t{DB_Symbol}\t{Qualifier}\t{GO_ID}\t{DB_Reference}\t'
'{Evidence_Code}\t{With_From}\t{NS}\t{DB_Name}\t{DB_Synonym}\t{DB_Type}\t'
'{Taxon}\t{Date}\t{Assigned_By}\t{Extension}\t{Gene_Product_Form_ID}\n')
sets = {'Qualifier', 'DB_Reference', 'With_From', 'DB_Name', 'DB_Synonym', 'Gene_Product_Form_ID'}
ns2a = {ns:p for p, ns in GafData.aspect2ns.items()}
with open(fout_gaf, 'w') as prt:
prt.write('!gaf-version: 2.1\n')
"{DB}\t{DB_ID}\t{DB_Symbol}\t{Qualifier}\t{GO_ID}\t{DB_Reference}\t"
"{Evidence_Code}\t{With_From}\t{NS}\t{DB_Name}\t{DB_Synonym}\t{DB_Type}\t"
"{Taxon}\t{Date}\t{Assigned_By}\t{Extension}\t{Gene_Product_Form_ID}\n"
)
sets = {
"Qualifier",
"DB_Reference",
"With_From",
"DB_Name",
"DB_Synonym",
"Gene_Product_Form_ID",
}
ns2a = {ns: p for p, ns in GafData.aspect2ns.items()}
with open(fout_gaf, "w") as prt:
prt.write("!gaf-version: 2.1\n")
for ntd in nts:
dct = ntd._asdict()
for fld in sets:
dct[fld] = '|'.join(sorted(dct[fld]))
dct['Taxon'] = '|'.join(['taxon:{T}'.format(T=t) for t in dct['Taxon']])
dct['NS'] = ns2a[dct['NS']]
dct['Date'] = dct['Date'].strftime('%Y%m%d')
dct[fld] = "|".join(sorted(dct[fld]))
dct["Taxon"] = "|".join(["taxon:{T}".format(T=t) for t in dct["Taxon"]])
dct["NS"] = ns2a[dct["NS"]]
dct["Date"] = dct["Date"].strftime("%Y%m%d")
prt.write(pat.format(**dct))
#prt.write('{NT}\n'.format(NT=ntd))
print(' {N} annotations WROTE: {GAF}'.format(N=len(nts), GAF=fout_gaf))
# prt.write('{NT}\n'.format(NT=ntd))
print(" {N} annotations WROTE: {GAF}".format(N=len(nts), GAF=fout_gaf))

def chk_associations(self, fout_err="gaf.err"):
"""Check that fields are legal in GAF"""
Expand All @@ -67,7 +77,9 @@ def has_ns(self):
def _init_associations(self, fin_gaf, **kws):
"""Read annotation file and store a list of namedtuples."""
ini = InitAssc(fin_gaf)
nts = ini.init_associations(kws['hdr_only'], kws['prt'], kws['namespaces'], kws['allow_missing_symbol'])
nts = ini.init_associations(
kws["hdr_only"], kws["prt"], kws["namespaces"], kws["allow_missing_symbol"]
)
self.hdr = ini.hdr
return nts

Expand Down
82 changes: 52 additions & 30 deletions goatools/anno/genetogo_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,39 @@
"""

import sys
import collections as cx
import sys

from itertools import chain
from goatools.anno.init.reader_genetogo import InitAssc
from goatools.anno.annoreader_base import AnnoReaderBase
from goatools.anno.opts import AnnoOptions

__copyright__ = "Copyright (C) 2016-present, DV Klopfenstein, H Tang. All rights reserved."
from .annoreader_base import AnnoReaderBase
from .init.reader_genetogo import InitAssc
from .opts import AnnoOptions

__copyright__ = (
"Copyright (C) 2016-present, DV Klopfenstein, H Tang. All rights reserved."
)
__author__ = "DV Klopfenstein"


# pylint: disable=broad-except,too-few-public-methods,line-too-long
class Gene2GoReader(AnnoReaderBase):
"""Reads a gene2go annotation file. Returns a Python object."""

exp_kws = {'taxids', 'taxid', 'namespaces', 'godag'}
exp_kws = {"taxids", "taxid", "namespaces", "godag"}

def __init__(self, filename=None, **kws):
# kws: taxids or taxid
super(Gene2GoReader, self).__init__('gene2go', filename, **kws)
super().__init__("gene2go", filename, **kws)
# Each taxid has a list of namedtuples - one for each line in the annotations
self.taxid2asscs = self._init_taxid2asscs()

def get_ns2assc(self, taxid=None, **kws):
"""Return given associations into 3 (BP, MF, CC) dicts, id2gos"""
return {ns:self._get_id2gos(nts, **kws) for ns, nts in self.get_ns2ntsanno(taxid).items()}
return {
ns: self._get_id2gos(nts, **kws)
for ns, nts in self.get_ns2ntsanno(taxid).items()
}

def get_ns2ntsanno(self, taxid=None):
"""Return all associations in three (one for BP MF CC) dicts, id2gos"""
Expand Down Expand Up @@ -63,28 +70,33 @@ def get_associations(self, taxid=None):
@staticmethod
def _warning_taxid(taxid):
"""Warn if an unexpected taxid"""
pat = ('**WARNING: NO ASSOCIATIONS FOR taxid({TAXID}). '
'Taxid MUST BE AN int, list of ints, OR bool')
pat = (
"**WARNING: NO ASSOCIATIONS FOR taxid({TAXID}). "
"Taxid MUST BE AN int, list of ints, OR bool"
)
print(pat.format(TAXID=taxid))
return {}

def get_id2gos_nss(self, **kws):
"""Return all associations in a dict, id2gos, regardless of namespace"""
taxids = self._get_taxids(kws.get('taxids'), kws.get('taxid'))
taxids = self._get_taxids(kws.get("taxids"), kws.get("taxid"))
assert taxids, "NO TAXIDS FOUND"
assc = list(chain.from_iterable([self.taxid2asscs[t] for t in taxids]))
return self._get_id2gos(assc, **kws)

def get_name(self):
"""Get name using taxid"""
if len(self.taxid2asscs) == 1:
return '{BASE}_{TAXID}'.format(
BASE=self.name, TAXID=next(iter(self.taxid2asscs.keys())))
return '{BASE}_various'.format(BASE=self.name)
return "{BASE}_{TAXID}".format(
BASE=self.name, TAXID=next(iter(self.taxid2asscs.keys()))
)
return "{BASE}_various".format(BASE=self.name)

def get_taxid(self):
"""Return the taxid if exactly one is stored. Otherwise return True, representing all taxids"""
return next(iter(self.taxid2asscs.keys())) if len(self.taxid2asscs) == 1 else True
return (
next(iter(self.taxid2asscs.keys())) if len(self.taxid2asscs) == 1 else True
)

def has_ns(self):
"""Return True if namespace field, NS exists on annotation namedtuples"""
Expand All @@ -96,56 +108,66 @@ def prt_counts(self, prt=sys.stdout):
num_annos = sum(len(a) for a in self.taxid2asscs.values())
# 792,891 annotations for 3 taxids stored: 10090 7227 9606
cnts = self._get_counts(list(chain.from_iterable(self.taxid2asscs.values())))
prt.write('{A:8,} annotations, {P:,} proteins/genes, {G:,} GO IDs, {N} taxids stored'.format(
A=num_annos, N=num_taxids, G=cnts['GOs'], P=cnts['geneids']))
prt.write(
"{A:8,} annotations, {P:,} proteins/genes, {G:,} GO IDs, {N} taxids stored".format(
A=num_annos, N=num_taxids, G=cnts["GOs"], P=cnts["geneids"]
)
)
if num_taxids < 5:
prt.write(': {Ts}'.format(Ts=' '.join(str(t) for t in sorted(self.taxid2asscs))))
prt.write('\n')
prt.write(
": {Ts}".format(Ts=" ".join(str(t) for t in sorted(self.taxid2asscs)))
)
prt.write("\n")
# 102,430 annotations for taxid 7227
# 323,776 annotations for taxid 9606
# 366,685 annotations for taxid 10090
if num_taxids == 1:
return
for taxid, assc in self.taxid2asscs.items():
cnts = self._get_counts(assc)
prt.write('{A:8,} annotations, {P:,} proteins/genes, {G:,} GO IDs for taxid {T}\n'.format(
A=len(assc), T=taxid, G=cnts['GOs'], P=cnts['geneids']))
prt.write(
"{A:8,} annotations, {P:,} proteins/genes, {G:,} GO IDs for taxid {T}\n".format(
A=len(assc), T=taxid, G=cnts["GOs"], P=cnts["geneids"]
)
)

@staticmethod
def _get_counts(nts):
"""Return the count of GO IDs and genes/proteins in a set of annotation namedtuples"""
sets = cx.defaultdict(set)
for ntd in nts:
sets['geneids'].add(ntd.DB_ID)
sets['GOs'].add(ntd.GO_ID)
return {'GOs':len(sets['GOs']), 'geneids':len(sets['geneids'])}
sets["geneids"].add(ntd.DB_ID)
sets["GOs"].add(ntd.GO_ID)
return {"GOs": len(sets["GOs"]), "geneids": len(sets["geneids"])}

# -- taxids2asscs -------------------------------------------------------------------------
def get_taxid2asscs(self, taxids=None, **kws):
"""Return taxid2asscs[taxid][ID2GOs|GO2IDs] built from the stored associations."""
# WAS: get_annotations_taxid2dct
taxid2asscs = cx.defaultdict(lambda: cx.defaultdict(lambda: cx.defaultdict(set)))
taxid2asscs = cx.defaultdict(
lambda: cx.defaultdict(lambda: cx.defaultdict(set))
)
options = AnnoOptions(self.evobj, **kws)
for taxid in self._get_taxids(taxids):
nts = self.taxid2asscs[taxid]
assc = self.reduce_annotations(nts, options)
taxid2asscs[taxid]['ID2GOs'] = self.get_dbid2goids(assc)
taxid2asscs[taxid]['GO2IDs'] = self.get_goid2dbids(assc)
taxid2asscs[taxid]["ID2GOs"] = self.get_dbid2goids(assc)
taxid2asscs[taxid]["GO2IDs"] = self.get_goid2dbids(assc)
return taxid2asscs

@staticmethod
def fill_taxid2asscs(taxid2asscs_usr, taxid2asscs_ret):
"""Fill user taxid2asscs for backward compatibility."""
for taxid, ab_ret in taxid2asscs_ret.items():
taxid2asscs_usr[taxid]['ID2GOs'] = ab_ret['ID2GOs']
taxid2asscs_usr[taxid]['GO2IDs'] = ab_ret['GO2IDs']
taxid2asscs_usr[taxid]["ID2GOs"] = ab_ret["ID2GOs"]
taxid2asscs_usr[taxid]["GO2IDs"] = ab_ret["GO2IDs"]

@staticmethod
def get_id2gos_all(taxid2asscs_a2b):
"""Get associations for all stored species taxid2asscs[taxid][ID2GOs|GO2IDs]."""
id2gos_all = {}
for a2b in taxid2asscs_a2b.values():
for geneid, gos in a2b['ID2GOs'].items():
for geneid, gos in a2b["ID2GOs"].items():
id2gos_all[geneid] = gos
return id2gos_all

Expand Down
22 changes: 13 additions & 9 deletions goatools/anno/gpad_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
"""

import collections as cx
from goatools.anno.annoreader_base import AnnoReaderBase
from goatools.anno.init.reader_gpad import InitAssc

from .annoreader_base import AnnoReaderBase
from .init.reader_gpad import InitAssc

__copyright__ = "Copyright (C) 2016-2019, DV Klopfenstein, H Tang. All rights reserved."
__author__ = "DV Klopfenstein"
Expand All @@ -17,13 +18,16 @@
class GpadReader(AnnoReaderBase):
"""Read a Gene Product Association Data (GPAD) file and store the data in a Python object."""

exp_kws = {'hdr_only', 'godag', 'namespaces'}
exp_kws = {"hdr_only", "godag", "namespaces"}

def __init__(self, filename=None, **kws):
super(GpadReader, self).__init__('gpad', filename,
hdr_only=kws.get('hdr_only', False),
godag=kws.get('godag'),
namespaces=kws.get('namespaces'))
super().__init__(
"gpad",
filename,
hdr_only=kws.get("hdr_only", False),
godag=kws.get("godag"),
namespaces=kws.get("namespaces"),
)
self.qty = len(self.associations)

def get_relation_cnt(self):
Expand All @@ -36,8 +40,8 @@ def get_relation_cnt(self):

def _init_associations(self, fin_gpad, **kws):
"""Read annotation file and store a list of namedtuples."""
ini = InitAssc(fin_gpad, kws['godag'])
nts = ini.init_associations(kws['hdr_only'], kws['namespaces'])
ini = InitAssc(fin_gpad, kws["godag"])
nts = ini.init_associations(kws["hdr_only"], kws["namespaces"])
self.hdr = ini.hdr
return nts

Expand Down
2 changes: 1 addition & 1 deletion goatools/anno/idtogos_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class IdToGosReader(AnnoReaderBase):

def __init__(self, filename=None, **kws):
self.id2gos = None # ID to GO ID set as loaded from annotations file
super(IdToGosReader, self).__init__(
super().__init__(
"id2gos",
filename,
godag=kws.get("godag"),
Expand Down
Loading

0 comments on commit 01144a3

Please sign in to comment.