From 4dea5939f4f1875cf219175afd93d10091360140 Mon Sep 17 00:00:00 2001 From: Haibao Tang Date: Sun, 2 Jun 2024 16:46:32 -0700 Subject: [PATCH] Replace super(Class, self) => super() --- goatools/anno/gaf_reader.py | 64 ++++++++++------- goatools/anno/genetogo_reader.py | 82 +++++++++++++-------- goatools/anno/gpad_reader.py | 22 +++--- goatools/anno/idtogos_reader.py | 2 +- goatools/gosubdag/go_edges.py | 74 ++++++++----------- goatools/grouper/wr_sections.py | 120 ++++++++++++++++++++----------- goatools/obo_parser.py | 7 +- goatools/pvalcalc.py | 48 +++++++++---- 8 files changed, 249 insertions(+), 170 deletions(-) diff --git a/goatools/anno/gaf_reader.py b/goatools/anno/gaf_reader.py index 5c92326c..ecea2360 100755 --- a/goatools/anno/gaf_reader.py +++ b/goatools/anno/gaf_reader.py @@ -5,9 +5,9 @@ """ import sys -from goatools.anno.annoreader_base import AnnoReaderBase -from goatools.anno.init.reader_gaf import GafData -from goatools.anno.init.reader_gaf import InitAssc + +from .annoreader_base import AnnoReaderBase +from .init.reader_gaf import GafData, InitAssc __copyright__ = "Copyright (C) 2016-2019, DV Klopfenstein, H Tang. All rights reserved." __author__ = "DV Klopfenstein" @@ -17,18 +17,20 @@ class GafReader(AnnoReaderBase): """Reads a Gene Annotation File (GAF). Returns a Python object.""" - exp_kws = {'hdr_only', 'prt', 'namespaces', 'allow_missing_symbol', 'godag'} + exp_kws = {"hdr_only", "prt", "namespaces", "allow_missing_symbol", "godag"} def __init__(self, filename=None, **kws): - super(GafReader, self).__init__( - 'gaf', filename, - godag=kws.get('godag'), - hdr_only=kws.get('hdr_only', False), - prt=kws.get('prt', sys.stdout), - namespaces=kws.get('namespaces'), - allow_missing_symbol=kws.get('allow_missing_symbol', False)) + super().__init__( + "gaf", + filename, + godag=kws.get("godag"), + hdr_only=kws.get("hdr_only", False), + prt=kws.get("prt", sys.stdout), + namespaces=kws.get("namespaces"), + allow_missing_symbol=kws.get("allow_missing_symbol", False), + ) - def read_gaf(self, namespace='BP', **kws): + def read_gaf(self, namespace="BP", **kws): """Read Gene Association File (GAF). Return associations.""" return self.get_id2gos(namespace, **kws) @@ -36,23 +38,31 @@ def read_gaf(self, namespace='BP', **kws): def wr_txt(fout_gaf, nts): """Write namedtuples into a gaf format""" pat = ( - '{DB}\t{DB_ID}\t{DB_Symbol}\t{Qualifier}\t{GO_ID}\t{DB_Reference}\t' - '{Evidence_Code}\t{With_From}\t{NS}\t{DB_Name}\t{DB_Synonym}\t{DB_Type}\t' - '{Taxon}\t{Date}\t{Assigned_By}\t{Extension}\t{Gene_Product_Form_ID}\n') - sets = {'Qualifier', 'DB_Reference', 'With_From', 'DB_Name', 'DB_Synonym', 'Gene_Product_Form_ID'} - ns2a = {ns:p for p, ns in GafData.aspect2ns.items()} - with open(fout_gaf, 'w') as prt: - prt.write('!gaf-version: 2.1\n') + "{DB}\t{DB_ID}\t{DB_Symbol}\t{Qualifier}\t{GO_ID}\t{DB_Reference}\t" + "{Evidence_Code}\t{With_From}\t{NS}\t{DB_Name}\t{DB_Synonym}\t{DB_Type}\t" + "{Taxon}\t{Date}\t{Assigned_By}\t{Extension}\t{Gene_Product_Form_ID}\n" + ) + sets = { + "Qualifier", + "DB_Reference", + "With_From", + "DB_Name", + "DB_Synonym", + "Gene_Product_Form_ID", + } + ns2a = {ns: p for p, ns in GafData.aspect2ns.items()} + with open(fout_gaf, "w") as prt: + prt.write("!gaf-version: 2.1\n") for ntd in nts: dct = ntd._asdict() for fld in sets: - dct[fld] = '|'.join(sorted(dct[fld])) - dct['Taxon'] = '|'.join(['taxon:{T}'.format(T=t) for t in dct['Taxon']]) - dct['NS'] = ns2a[dct['NS']] - dct['Date'] = dct['Date'].strftime('%Y%m%d') + dct[fld] = "|".join(sorted(dct[fld])) + dct["Taxon"] = "|".join(["taxon:{T}".format(T=t) for t in dct["Taxon"]]) + dct["NS"] = ns2a[dct["NS"]] + dct["Date"] = dct["Date"].strftime("%Y%m%d") prt.write(pat.format(**dct)) - #prt.write('{NT}\n'.format(NT=ntd)) - print(' {N} annotations WROTE: {GAF}'.format(N=len(nts), GAF=fout_gaf)) + # prt.write('{NT}\n'.format(NT=ntd)) + print(" {N} annotations WROTE: {GAF}".format(N=len(nts), GAF=fout_gaf)) def chk_associations(self, fout_err="gaf.err"): """Check that fields are legal in GAF""" @@ -67,7 +77,9 @@ def has_ns(self): def _init_associations(self, fin_gaf, **kws): """Read annotation file and store a list of namedtuples.""" ini = InitAssc(fin_gaf) - nts = ini.init_associations(kws['hdr_only'], kws['prt'], kws['namespaces'], kws['allow_missing_symbol']) + nts = ini.init_associations( + kws["hdr_only"], kws["prt"], kws["namespaces"], kws["allow_missing_symbol"] + ) self.hdr = ini.hdr return nts diff --git a/goatools/anno/genetogo_reader.py b/goatools/anno/genetogo_reader.py index 8f1c6617..8bf646f1 100755 --- a/goatools/anno/genetogo_reader.py +++ b/goatools/anno/genetogo_reader.py @@ -5,14 +5,18 @@ """ -import sys import collections as cx +import sys + from itertools import chain -from goatools.anno.init.reader_genetogo import InitAssc -from goatools.anno.annoreader_base import AnnoReaderBase -from goatools.anno.opts import AnnoOptions -__copyright__ = "Copyright (C) 2016-present, DV Klopfenstein, H Tang. All rights reserved." +from .annoreader_base import AnnoReaderBase +from .init.reader_genetogo import InitAssc +from .opts import AnnoOptions + +__copyright__ = ( + "Copyright (C) 2016-present, DV Klopfenstein, H Tang. All rights reserved." +) __author__ = "DV Klopfenstein" @@ -20,17 +24,20 @@ class Gene2GoReader(AnnoReaderBase): """Reads a Gene Annotation File (GAF). Returns a Python object.""" - exp_kws = {'taxids', 'taxid', 'namespaces', 'godag'} + exp_kws = {"taxids", "taxid", "namespaces", "godag"} def __init__(self, filename=None, **kws): # kws: taxids or taxid - super(Gene2GoReader, self).__init__('gene2go', filename, **kws) + super().__init__("gene2go", filename, **kws) # Each taxid has a list of namedtuples - one for each line in the annotations self.taxid2asscs = self._init_taxid2asscs() def get_ns2assc(self, taxid=None, **kws): """Return given associations into 3 (BP, MF, CC) dicts, id2gos""" - return {ns:self._get_id2gos(nts, **kws) for ns, nts in self.get_ns2ntsanno(taxid).items()} + return { + ns: self._get_id2gos(nts, **kws) + for ns, nts in self.get_ns2ntsanno(taxid).items() + } def get_ns2ntsanno(self, taxid=None): """Return all associations in three (one for BP MF CC) dicts, id2gos""" @@ -63,14 +70,16 @@ def get_associations(self, taxid=None): @staticmethod def _warning_taxid(taxid): """Warn if an unexpected taxid""" - pat = ('**WARNING: NO ASSOCIATIONS FOR taxid({TAXID}). ' - 'Taxid MUST BE AN int, list of ints, OR bool') + pat = ( + "**WARNING: NO ASSOCIATIONS FOR taxid({TAXID}). " + "Taxid MUST BE AN int, list of ints, OR bool" + ) print(pat.format(TAXID=taxid)) return {} def get_id2gos_nss(self, **kws): """Return all associations in a dict, id2gos, regardless of namespace""" - taxids = self._get_taxids(kws.get('taxids'), kws.get('taxid')) + taxids = self._get_taxids(kws.get("taxids"), kws.get("taxid")) assert taxids, "NO TAXIDS FOUND" assc = list(chain.from_iterable([self.taxid2asscs[t] for t in taxids])) return self._get_id2gos(assc, **kws) @@ -78,13 +87,16 @@ def get_id2gos_nss(self, **kws): def get_name(self): """Get name using taxid""" if len(self.taxid2asscs) == 1: - return '{BASE}_{TAXID}'.format( - BASE=self.name, TAXID=next(iter(self.taxid2asscs.keys()))) - return '{BASE}_various'.format(BASE=self.name) + return "{BASE}_{TAXID}".format( + BASE=self.name, TAXID=next(iter(self.taxid2asscs.keys())) + ) + return "{BASE}_various".format(BASE=self.name) def get_taxid(self): """Return taxid, if one was provided. Other wise return True representing all taxids""" - return next(iter(self.taxid2asscs.keys())) if len(self.taxid2asscs) == 1 else True + return ( + next(iter(self.taxid2asscs.keys())) if len(self.taxid2asscs) == 1 else True + ) def has_ns(self): """Return True if namespace field, NS exists on annotation namedtuples""" @@ -96,11 +108,16 @@ def prt_counts(self, prt=sys.stdout): num_annos = sum(len(a) for a in self.taxid2asscs.values()) # 792,891 annotations for 3 taxids stored: 10090 7227 9606 cnts = self._get_counts(list(chain.from_iterable(self.taxid2asscs.values()))) - prt.write('{A:8,} annotations, {P:,} proteins/genes, {G:,} GO IDs, {N} taxids stored'.format( - A=num_annos, N=num_taxids, G=cnts['GOs'], P=cnts['geneids'])) + prt.write( + "{A:8,} annotations, {P:,} proteins/genes, {G:,} GO IDs, {N} taxids stored".format( + A=num_annos, N=num_taxids, G=cnts["GOs"], P=cnts["geneids"] + ) + ) if num_taxids < 5: - prt.write(': {Ts}'.format(Ts=' '.join(str(t) for t in sorted(self.taxid2asscs)))) - prt.write('\n') + prt.write( + ": {Ts}".format(Ts=" ".join(str(t) for t in sorted(self.taxid2asscs))) + ) + prt.write("\n") # 102,430 annotations for taxid 7227 # 323,776 annotations for taxid 9606 # 366,685 annotations for taxid 10090 @@ -108,44 +125,49 @@ def prt_counts(self, prt=sys.stdout): return for taxid, assc in self.taxid2asscs.items(): cnts = self._get_counts(assc) - prt.write('{A:8,} annotations, {P:,} proteins/genes, {G:,} GO IDs for taxid {T}\n'.format( - A=len(assc), T=taxid, G=cnts['GOs'], P=cnts['geneids'])) + prt.write( + "{A:8,} annotations, {P:,} proteins/genes, {G:,} GO IDs for taxid {T}\n".format( + A=len(assc), T=taxid, G=cnts["GOs"], P=cnts["geneids"] + ) + ) @staticmethod def _get_counts(nts): """Return the count of GO IDs and genes/proteins in a set of annotation namedtuples""" sets = cx.defaultdict(set) for ntd in nts: - sets['geneids'].add(ntd.DB_ID) - sets['GOs'].add(ntd.GO_ID) - return {'GOs':len(sets['GOs']), 'geneids':len(sets['geneids'])} + sets["geneids"].add(ntd.DB_ID) + sets["GOs"].add(ntd.GO_ID) + return {"GOs": len(sets["GOs"]), "geneids": len(sets["geneids"])} # -- taxids2asscs ------------------------------------------------------------------------- def get_taxid2asscs(self, taxids=None, **kws): """Read Gene Association File (GAF). Return data.""" # WAS: get_annotations_taxid2dct - taxid2asscs = cx.defaultdict(lambda: cx.defaultdict(lambda: cx.defaultdict(set))) + taxid2asscs = cx.defaultdict( + lambda: cx.defaultdict(lambda: cx.defaultdict(set)) + ) options = AnnoOptions(self.evobj, **kws) for taxid in self._get_taxids(taxids): nts = self.taxid2asscs[taxid] assc = self.reduce_annotations(nts, options) - taxid2asscs[taxid]['ID2GOs'] = self.get_dbid2goids(assc) - taxid2asscs[taxid]['GO2IDs'] = self.get_goid2dbids(assc) + taxid2asscs[taxid]["ID2GOs"] = self.get_dbid2goids(assc) + taxid2asscs[taxid]["GO2IDs"] = self.get_goid2dbids(assc) return taxid2asscs @staticmethod def fill_taxid2asscs(taxid2asscs_usr, taxid2asscs_ret): """Fill user taxid2asscs for backward compatibility.""" for taxid, ab_ret in taxid2asscs_ret.items(): - taxid2asscs_usr[taxid]['ID2GOs'] = ab_ret['ID2GOs'] - taxid2asscs_usr[taxid]['GO2IDs'] = ab_ret['GO2IDs'] + taxid2asscs_usr[taxid]["ID2GOs"] = ab_ret["ID2GOs"] + taxid2asscs_usr[taxid]["GO2IDs"] = ab_ret["GO2IDs"] @staticmethod def get_id2gos_all(taxid2asscs_a2b): """Get associations for all stored species taxid2asscs[taxid][ID2GOs|GO2IDs].""" id2gos_all = {} for a2b in taxid2asscs_a2b.values(): - for geneid, gos in a2b['ID2GOs'].items(): + for geneid, gos in a2b["ID2GOs"].items(): id2gos_all[geneid] = gos return id2gos_all diff --git a/goatools/anno/gpad_reader.py b/goatools/anno/gpad_reader.py index a1f88327..2e706de9 100755 --- a/goatools/anno/gpad_reader.py +++ b/goatools/anno/gpad_reader.py @@ -7,8 +7,9 @@ """ import collections as cx -from goatools.anno.annoreader_base import AnnoReaderBase -from goatools.anno.init.reader_gpad import InitAssc + +from .annoreader_base import AnnoReaderBase +from .init.reader_gpad import InitAssc __copyright__ = "Copyright (C) 2016-2019, DV Klopfenstein, H Tang. All rights reserved." __author__ = "DV Klopfenstein" @@ -17,13 +18,16 @@ class GpadReader(AnnoReaderBase): """dRead a Gene Product Association Data (GPAD) and store the data in a Python object.""" - exp_kws = {'hdr_only', 'godag', 'namespaces'} + exp_kws = {"hdr_only", "godag", "namespaces"} def __init__(self, filename=None, **kws): - super(GpadReader, self).__init__('gpad', filename, - hdr_only=kws.get('hdr_only', False), - godag=kws.get('godag'), - namespaces=kws.get('namespaces')) + super().__init__( + "gpad", + filename, + hdr_only=kws.get("hdr_only", False), + godag=kws.get("godag"), + namespaces=kws.get("namespaces"), + ) self.qty = len(self.associations) def get_relation_cnt(self): @@ -36,8 +40,8 @@ def get_relation_cnt(self): def _init_associations(self, fin_gpad, **kws): """Read annotation file and store a list of namedtuples.""" - ini = InitAssc(fin_gpad, kws['godag']) - nts = ini.init_associations(kws['hdr_only'], kws['namespaces']) + ini = InitAssc(fin_gpad, kws["godag"]) + nts = ini.init_associations(kws["hdr_only"], kws["namespaces"]) self.hdr = ini.hdr return nts diff --git a/goatools/anno/idtogos_reader.py b/goatools/anno/idtogos_reader.py index beb7cc59..da1f1c3c 100755 --- a/goatools/anno/idtogos_reader.py +++ b/goatools/anno/idtogos_reader.py @@ -17,7 +17,7 @@ class IdToGosReader(AnnoReaderBase): def __init__(self, filename=None, **kws): self.id2gos = None # ID to GO ID set as loaded from annotations file - super(IdToGosReader, self).__init__( + super().__init__( "id2gos", filename, godag=kws.get("godag"), diff --git a/goatools/gosubdag/go_edges.py b/goatools/gosubdag/go_edges.py index 2c34ae2d..75920716 100644 --- a/goatools/gosubdag/go_edges.py +++ b/goatools/gosubdag/go_edges.py @@ -22,11 +22,12 @@ def get_edgesobj(gosubdag, **kws): # get_edgesobj(go2obj, go_sources=..., traverse_child=...,) # get_edgesobj(go2obj, go_sources=..., traverse_parent=..., traverse_child=...,) edgeobj = _get_edgesobj(gosubdag, **kws) - rm_gos = kws.get('rm_gos') + rm_gos = kws.get("rm_gos") if rm_gos is not None: edgeobj.rm_gos(rm_gos) return edgeobj + def _get_edgesobj(gosubdag, **kws): """Return specfied GoSubDag initialization object.""" # Keyword args (kws): @@ -42,12 +43,13 @@ def _get_edgesobj(gosubdag, **kws): # get_edgesobj(go2obj, go_sources=..., traverse_parent=...,) # get_edgesobj(go2obj, go_sources=..., traverse_child=...,) # get_edgesobj(go2obj, go_sources=..., traverse_parent=..., traverse_child=...,) - dst_srcs_list = kws.get('dst_srcs_list', None) + dst_srcs_list = kws.get("dst_srcs_list", None) if dst_srcs_list is not None: return EdgesPath(gosubdag, dst_srcs_list) - return EdgesRelatives(gosubdag, - kws.get('traverse_parent', True), - kws.get('traverse_child', False)) + return EdgesRelatives( + gosubdag, kws.get("traverse_parent", True), kws.get("traverse_child", False) + ) + # -- Base Class ---------------------------------------------------------------- class EdgesBase(object): @@ -104,8 +106,11 @@ def chk_edges_nodes(edges, nodes, name): """Check that user specified edges have a node which exists.""" edge_nodes = set(e for es in edges for e in es) missing_nodes = edge_nodes.difference(nodes) - assert not missing_nodes, "MISSING: {GOs}\n{NM} EDGES MISSING {N} NODES (OF {T})".format( - NM=name, N=len(missing_nodes), T=len(edge_nodes), GOs=missing_nodes) + assert ( + not missing_nodes + ), "MISSING: {GOs}\n{NM} EDGES MISSING {N} NODES (OF {T})".format( + NM=name, N=len(missing_nodes), T=len(edge_nodes), GOs=missing_nodes + ) def get_c2ps(self): """Set child2parents dict for all parents used in this set of edges.""" @@ -123,7 +128,6 @@ def _getobjs_higher(self, goobj): return goobjs_higher - # -- Initialization by considering all child and/or parent relatives ----------- class EdgesRelatives(EdgesBase): """Inits GO-to-GO edges using all relatives above and/or below source GOs.""" @@ -131,7 +135,7 @@ class EdgesRelatives(EdgesBase): # pylint: disable=too-many-arguments # def __init__(self, go2obj, relationships, go_sources, traverse_parent, traverse_child): def __init__(self, gosubdag, traverse_parent, traverse_child): - super(EdgesRelatives, self).__init__(gosubdag) + super().__init__(gosubdag) # go2obj contain GO IDs in subset _gos = set(gosubdag.go2obj) assert traverse_child or traverse_parent, "NO EDGES IN GRAPH" @@ -143,15 +147,12 @@ def __init__(self, gosubdag, traverse_parent, traverse_child): rel2src2dsts = self._init_rel2src2dsts(_gos, traverse_parent) rel2dst2srcs = self._init_rel2dst2srcs(_gos, traverse_child) # Set by derived edge class - # self.edges = self._init_edges(_gos, p2cs, c2ps) self.edges = self._init_edges(p2cs, c2ps) self.edges_rel = self._init_edges_relationships(rel2src2dsts, rel2dst2srcs) assert _gos == set(self.go2obj) - # self.chk_edges() @staticmethod # Too slow to check goids_present as we go. Only minor init modes need checking. - # def _init_edges(goids_present, p2cs, c2ps): def _init_edges(p2cs, c2ps): """Get the directed edges from GO term to GO term.""" edge_from_to = [] @@ -183,12 +184,11 @@ def _init_edges_relationships(rel2src2dsts, rel2dst2srcs): edge_rel2fromto[reltype] = edge_from_to return edge_rel2fromto - # ------------------------------------------------------------------- def _init_rel2src2dsts(self, go_sources, traverse_parent): """Traverse up parents.""" if not traverse_parent or not self.relationships: return {} - rel2src2dsts = {r:defaultdict(set) for r in self.relationships} + rel2src2dsts = {r: defaultdict(set) for r in self.relationships} goids_seen = set() go2obj = self.go2obj for goid_src in go_sources: @@ -201,11 +201,9 @@ def _traverse_relationship_objs(self, rel2src2dsts, goobj_child, goids_seen): """Traverse from source GO up relationships.""" child_id = goobj_child.id goids_seen.add(child_id) - ##A self.go2obj[child_id] = goobj_child # Update goids_seen and go2obj with child alt_ids for goid_altid in goobj_child.alt_ids: goids_seen.add(goid_altid) - ##A self.go2obj[goid_altid] = goobj_child # Loop through relationships of child object for reltype, recs in goobj_child.relationship.items(): if reltype in self.relationships: @@ -214,31 +212,32 @@ def _traverse_relationship_objs(self, rel2src2dsts, goobj_child, goids_seen): rel2src2dsts[reltype][relationship_id].add(child_id) # If relationship has not been seen, traverse if relationship_id not in goids_seen: - self._traverse_relationship_objs(rel2src2dsts, relationship_obj, goids_seen) + self._traverse_relationship_objs( + rel2src2dsts, relationship_obj, goids_seen + ) - # ------------------------------------------------------------------- def _init_rel2dst2srcs(self, go_sources, traverse_child): """Traverse through reverse relationships.""" if not traverse_child or not self.relationships: return {} - rel2dst2srcs = {r:defaultdict(set) for r in self.relationships} + rel2dst2srcs = {r: defaultdict(set) for r in self.relationships} goids_seen = set() go2obj = self.go2obj for goid_src in go_sources: goobj_src = go2obj[goid_src] if goid_src not in goids_seen: - self._traverse_relationship_rev_objs(rel2dst2srcs, goobj_src, goids_seen) + self._traverse_relationship_rev_objs( + rel2dst2srcs, goobj_src, goids_seen + ) return rel2dst2srcs def _traverse_relationship_rev_objs(self, rel2dst2srcs, goobj_parent, goids_seen): """Traverse from source GO down children.""" parent_id = goobj_parent.id goids_seen.add(parent_id) - ##A self.go2obj[parent_id] = goobj_parent # Update goids_seen and go2obj with parent alt_ids for goid_altid in goobj_parent.alt_ids: goids_seen.add(goid_altid) - ##A self.go2obj[goid_altid] = goobj_parent # Loop through children for reltype, recs in goobj_parent.relationship.items(): if reltype in self.relationships: @@ -247,10 +246,10 @@ def _traverse_relationship_rev_objs(self, rel2dst2srcs, goobj_parent, goids_seen rel2dst2srcs[relrev_id].add(parent_id) # If child has not been seen, traverse if relrev_id not in goids_seen: - ##F self._traverse_relrev_objs(rel2dst2srcs, relrev_obj, go2obj, goids_seen) - self._traverse_relationship_rev_objs(rel2dst2srcs, relrev_obj, goids_seen) + self._traverse_relationship_rev_objs( + rel2dst2srcs, relrev_obj, goids_seen + ) - # ------------------------------------------------------------------- def _init_p2cs(self, go_sources, traverse_parent): """Traverse up parents.""" if not traverse_parent: @@ -261,32 +260,26 @@ def _init_p2cs(self, go_sources, traverse_parent): for goid_src in go_sources: goobj_src = go2obj[goid_src] if goid_src not in goids_seen: - ##F self._traverse_parent_objs(p2cs, goobj_src, go2obj, goids_seen) self._traverse_parent_objs(p2cs, goobj_src, goids_seen) return p2cs - ##F def _traverse_parent_objs(self, p2cs, goobj_child, go2obj, goids_seen): def _traverse_parent_objs(self, p2cs, goobj_child, goids_seen): """Traverse from source GO up parents.""" # Update public(go2obj p2cs), private(goids_seen) child_id = goobj_child.id # mark child as seen goids_seen.add(child_id) - ##A self.go2obj[child_id] = goobj_child # Update goids_seen and go2obj with child alt_ids for goid_altid in goobj_child.alt_ids: goids_seen.add(goid_altid) - ##A self.go2obj[goid_altid] = goobj_child # Loop through parents of child object for parent_obj in goobj_child.parents: parent_id = parent_obj.id p2cs[parent_id].add(child_id) # If parent has not been seen, traverse if parent_id not in goids_seen: - ##F self._traverse_parent_objs(p2cs, parent_obj, go2obj, goids_seen) self._traverse_parent_objs(p2cs, parent_obj, goids_seen) - # ------------------------------------------------------------------- def _init_c2ps(self, go_sources, traverse_child): """Traverse up children.""" if not traverse_child: @@ -297,29 +290,24 @@ def _init_c2ps(self, go_sources, traverse_child): for goid_src in go_sources: goobj_src = go2obj[goid_src] if goid_src not in goids_seen: - ##F self._traverse_child_objs(c2ps, goobj_src, go2obj, goids_seen) self._traverse_child_objs(c2ps, goobj_src, goids_seen) return c2ps - ##F def _traverse_child_objs(self, c2ps, goobj_parent, go2obj, goids_seen): def _traverse_child_objs(self, c2ps, goobj_parent, goids_seen): """Traverse from source GO down children.""" # Update public(godag.go2obj godag.c2ps), private(_seen_pids) parent_id = goobj_parent.id # mark parent as seen goids_seen.add(parent_id) - ##A self.go2obj[parent_id] = goobj_parent # Update goids_seen and go2obj with parent alt_ids for goid_altid in goobj_parent.alt_ids: goids_seen.add(goid_altid) - ##A self.go2obj[goid_altid] = goobj_parent # Loop through children for child_obj in goobj_parent.children: child_id = child_obj.id c2ps[child_id].add(parent_id) # If child has not been seen, traverse if child_id not in goids_seen: - ##F self._traverse_child_objs(c2ps, child_obj, go2obj, goids_seen) self._traverse_child_objs(c2ps, child_obj, goids_seen) @@ -328,14 +316,10 @@ class EdgesPath(EdgesBase): """Inits GO-to-GO edges using a list of (parent destination, child sources)""" def __init__(self, gosubdag, dst_srcs_list): - super(EdgesPath, self).__init__(gosubdag) + super().__init__(gosubdag) self.edges = None self.goid_all = None self._init_edges(dst_srcs_list) - # GO IDs for child->parents - # self.p2cs = self._init_p2cs(go_sources, traverse_parent) - # GO IDs for parent->children - # self.c2ps = self._init_c2ps(go_sources, traverse_child) def get_edges(self): """Get the directed edges from GO term to GO term.""" @@ -343,7 +327,8 @@ def get_edges(self): def _init_edges(self, dst_srcs_list): """Create all GO edges given a list of (dst, srcs).""" - from goatools.gosubdag.go_paths import get_paths_goobjs, paths2edges + from .go_paths import get_paths_goobjs, paths2edges + edges_all = set() goid_all = set() go2obj = self.go2obj @@ -351,10 +336,13 @@ def _init_edges(self, dst_srcs_list): go2obj_srcs = {} for goid in srcs: go2obj_srcs[goid] = go2obj[goid] - go_paths, go_all = get_paths_goobjs(go2obj_srcs.values(), go_top=dst, go2obj=go2obj) + go_paths, go_all = get_paths_goobjs( + go2obj_srcs.values(), go_top=dst, go2obj=go2obj + ) edges_all |= paths2edges(go_paths) goid_all |= go_all self.edges = [(a.id, b.id) for a, b in edges_all] self.goid_all = goid_all + # Copyright (C) 2016-2018, DV Klopfenstein, H Tang, All rights reserved. diff --git a/goatools/grouper/wr_sections.py b/goatools/grouper/wr_sections.py index 5aa90e80..69d749b4 100644 --- a/goatools/grouper/wr_sections.py +++ b/goatools/grouper/wr_sections.py @@ -1,14 +1,13 @@ """Prints a Python sections file.""" import sys -from goatools.wr_tbl import prt_txt -from goatools.grouper.tasks import SummarySec2dHdrGos +from ..wr_tbl import prt_txt +from .tasks import SummarySec2dHdrGos __copyright__ = "Copyright (C) 2016-2018, DV Klopfenstein, H Tang, All rights reserved." __author__ = "DV Klopfenstein" - class WrSectionsBase(object): """Tasks for writing a sections file.""" @@ -16,13 +15,15 @@ def __init__(self, grprobj, ver_list=None): self.ver_list = ver_list self.grprobj = grprobj self.gosubdag = grprobj.gosubdag - self.fncsortnt = self._init_fncsortnt(self.gosubdag.prt_attr['flds']) + self.fncsortnt = self._init_fncsortnt(self.gosubdag.prt_attr["flds"]) self.prtfmt = self._init_prtfmt("fmta") def prt_ver(self, prt): """Print version of GO-DAG for the GO and for GO slims.""" if self.ver_list is not None: - prt.write("# Versions:\n# {VER}\n\n".format(VER="\n# ".join(self.ver_list))) + prt.write( + "# Versions:\n# {VER}\n\n".format(VER="\n# ".join(self.ver_list)) + ) def get_sections_2dnt(self, sec2d_go): """Return a sections list containing sorted lists of namedtuples.""" @@ -57,37 +58,50 @@ def get_summary_data(self, sec2d_nt): sections.add(section_name) else: ungrouped.update(set(nt.GO for nt in nts)) - return {'grouped':grouped, 'ungrouped':ungrouped, 'sections':sections} + return {"grouped": grouped, "ungrouped": ungrouped, "sections": sections} def get_summary_str(self, sec2d_nt): """Get string describing counts of placed/unplaced GO IDs and count of sections.""" data = self.get_summary_data(sec2d_nt) return "{M} GO IDs placed into {N} sections; {U} unplaced GO IDs".format( - N=len(data['sections']), M=len(data['grouped']), U=len(data['ungrouped'])) + N=len(data["sections"]), M=len(data["grouped"]), U=len(data["ungrouped"]) + ) @staticmethod def _init_fncsortnt(flds): """Return a sort function for sorting header GO IDs found in sections.""" - if 'tinfo' in flds: - if 'D1' in flds: - return lambda ntgo: [ntgo.NS, -1*ntgo.tinfo, ntgo.depth, ntgo.D1, ntgo.alt] + if "tinfo" in flds: + if "D1" in flds: + return lambda ntgo: [ + ntgo.NS, + -1 * ntgo.tinfo, + ntgo.depth, + ntgo.D1, + ntgo.alt, + ] else: - return lambda ntgo: [ntgo.NS, -1*ntgo.tinfo, ntgo.depth, ntgo.alt] - if 'dcnt' in flds: - if 'D1' in flds: - return lambda ntgo: [ntgo.NS, -1*ntgo.dcnt, ntgo.depth, ntgo.D1, ntgo.alt] + return lambda ntgo: [ntgo.NS, -1 * ntgo.tinfo, ntgo.depth, ntgo.alt] + if "dcnt" in flds: + if "D1" in flds: + return lambda ntgo: [ + ntgo.NS, + -1 * ntgo.dcnt, + ntgo.depth, + ntgo.D1, + ntgo.alt, + ] else: - return lambda ntgo: [ntgo.NS, -1*ntgo.dcnt, ntgo.depth, ntgo.alt] + return lambda ntgo: [ntgo.NS, -1 * ntgo.dcnt, ntgo.depth, ntgo.alt] else: - return lambda ntgo: [ntgo.NS, -1*ntgo.depth, ntgo.alt] + return lambda ntgo: [ntgo.NS, -1 * ntgo.depth, ntgo.alt] class WrSectionsPy(WrSectionsBase): """Holds formatting information for printing sections into a Python file.""" def __init__(self, grprobj, ver_list=None): - super(WrSectionsPy, self).__init__(grprobj, ver_list) - self.prtfmt = self.prtfmt.replace('{GO}', ' "{GO}", ') + super().__init__(grprobj, ver_list) + self.prtfmt = self.prtfmt.replace("{GO}", ' "{GO}", ') def wr_py_sections_new(self, fout_py, doc=None): """Write the first sections file.""" @@ -99,32 +113,41 @@ def wr_py_sections(self, fout_py, sections, doc=None): if sections is None: sections = self.grprobj.get_sections_2d() sec2d_nt = self.get_sections_2dnt(sections) # lists of GO Grouper namedtuples - with open(fout_py, 'w') as prt: + with open(fout_py, "w") as prt: self._prt_py_sections(sec2d_nt, prt, doc) dat = SummarySec2dHdrGos().summarize_sec2hdrgos(sections) - sys.stdout.write(self.grprobj.fmtsum.format( - GO_DESC='hdr', SECs=len(dat['S']), GOs=len(dat['G']), - UNGRP=len(dat['U']), undesc="unused", - ACTION="WROTE:", FILE=fout_py)) + sys.stdout.write( + self.grprobj.fmtsum.format( + GO_DESC="hdr", + SECs=len(dat["S"]), + GOs=len(dat["G"]), + UNGRP=len(dat["U"]), + undesc="unused", + ACTION="WROTE:", + FILE=fout_py, + ) + ) def _prt_py_sections(self, sec2d_nt, prt=sys.stdout, doc=None): """Print sections 2-D list into a Python format list.""" if doc is None: - doc = 'Sections variable' + doc = "Sections variable" prt.write('"""{DOC}"""\n\n'.format(DOC=doc)) self.prt_ver(prt) prt.write("# pylint: disable=line-too-long\n") strcnt = self.get_summary_str(sec2d_nt) prt.write("SECTIONS = [ # {CNTS}\n".format(CNTS=strcnt)) prt.write(' # ("New Section", [\n') - prt.write(' # ]),\n') + prt.write(" # ]),\n") for section_name, nthdrgos in sec2d_nt: self._prt_py_section(prt, section_name, nthdrgos) prt.write("]\n") def _prt_py_section(self, prt, section_name, ntgos): """Print one section and its GO headers.""" - prt.write(' ("{SEC}", [ # {N} GO-headers\n'.format(SEC=section_name, N=len(ntgos))) + prt.write( + ' ("{SEC}", [ # {N} GO-headers\n'.format(SEC=section_name, N=len(ntgos)) + ) self.prt_ntgos(prt, ntgos) prt.write(" ]),\n") @@ -133,7 +156,7 @@ class WrSectionsTxt(WrSectionsBase): """Manages GO group headers and optionally sections containing GO group headers.""" def __init__(self, grprobj, ver_list=None): - super(WrSectionsTxt, self).__init__(grprobj, ver_list) + super().__init__(grprobj, ver_list) @staticmethod def prt_sections(prt, sections, prtfmt, secspc=False): @@ -156,17 +179,22 @@ def prt_info(self, prt, sections=None): for section_name, nts_flat in sections: num_nts = len(nts_flat) num_goids += num_nts - prt.write("{N:3} GO IDs in section({SEC})\n".format(N=num_nts, SEC=section_name)) + prt.write( + "{N:3} GO IDs in section({SEC})\n".format(N=num_nts, SEC=section_name) + ) prt.write("{N:3} GO IDs\n".format(N=num_goids)) def prt_goid_cnt(self, prt=sys.stdout): """Get number of hdrgos and usrgos in each section.""" for section_name, hdrgos_sec in self.grprobj.get_sections_2d(): - prt.write("{NAME} {Us:5,} {Hs:5,} {SEC}\n".format( - NAME=self.grprobj.grpname, - Us=len(self.grprobj.get_usrgos_g_hdrgos(hdrgos_sec)), - Hs=len(hdrgos_sec), - SEC=section_name)) + prt.write( + "{NAME} {Us:5,} {Hs:5,} {SEC}\n".format( + NAME=self.grprobj.grpname, + Us=len(self.grprobj.get_usrgos_g_hdrgos(hdrgos_sec)), + Hs=len(hdrgos_sec), + SEC=section_name, + ) + ) def wr_txt_grouping_gos(self): """Write one file per GO group.""" @@ -174,18 +202,19 @@ def wr_txt_grouping_gos(self): for hdrgo, usrgos in self.grprobj.hdrgo2usrgos.items(): keygos = usrgos.union([hdrgo]) fout_txt = "{BASE}.txt".format(BASE=self.grprobj.get_fout_base(hdrgo)) - with open(fout_txt, 'w') as prt: + with open(fout_txt, "w") as prt: prt_goids(keygos, prt=prt) - sys.stdout.write(" {N:5,} GO IDs WROTE: {TXT}\n".format( - N=len(keygos), TXT=fout_txt)) + sys.stdout.write( + " {N:5,} GO IDs WROTE: {TXT}\n".format(N=len(keygos), TXT=fout_txt) + ) def wr_txt_section_hdrgos(self, fout_txt, sortby=None, prt_section=True): """Write high GO IDs that are actually used to group current set of GO IDs.""" - sec2d_go = self.grprobj.get_sections_2d() # lists of GO IDs + sec2d_go = self.grprobj.get_sections_2d() # lists of GO IDs sec2d_nt = self.get_sections_2dnt(sec2d_go) # lists of GO Grouper namedtuples if sortby is None: sortby = self.fncsortnt - with open(fout_txt, 'w') as prt: + with open(fout_txt, "w") as prt: self.prt_ver(prt) prt.write("# GROUP NAME: {NAME}\n".format(NAME=self.grprobj.grpname)) for section_name, nthdrgos_actual in sec2d_nt: @@ -195,10 +224,17 @@ def wr_txt_section_hdrgos(self, fout_txt, sortby=None, prt_section=True): if prt_section: prt.write("\n") dat = SummarySec2dHdrGos().summarize_sec2hdrgos(sec2d_go) - sys.stdout.write(self.grprobj.fmtsum.format( - GO_DESC='hdr', SECs=len(dat['S']), GOs=len(dat['G']), - UNGRP=len(dat['U']), undesc="unused", - ACTION="WROTE:", FILE=fout_txt)) + sys.stdout.write( + self.grprobj.fmtsum.format( + GO_DESC="hdr", + SECs=len(dat["S"]), + GOs=len(dat["G"]), + UNGRP=len(dat["U"]), + undesc="unused", + ACTION="WROTE:", + FILE=fout_txt, + ) + ) return sec2d_nt diff --git a/goatools/obo_parser.py b/goatools/obo_parser.py index 8a7eb68e..8991adae 100755 --- a/goatools/obo_parser.py +++ b/goatools/obo_parser.py @@ -11,9 +11,8 @@ from sys import stderr, stdout from typing import Optional -from goatools.godag.obo_optional_attributes import OboOptionalAttrs -from goatools.godag.typedef import TypeDef -from goatools.godag.typedef import add_to_typedef +from .godag.obo_optional_attributes import OboOptionalAttrs +from .godag.typedef import TypeDef, add_to_typedef GraphEngines = ("pygraphviz", "pydot") @@ -313,7 +312,7 @@ def __init__( load_obsolete: bool = False, prt=stdout, ): - super(GODag, self).__init__() + super().__init__() self.version, self.data_version = self.load_obo_file( obo_file, optional_attrs, load_obsolete, prt ) diff --git a/goatools/pvalcalc.py b/goatools/pvalcalc.py index 3391b9b9..22981ab7 100755 --- a/goatools/pvalcalc.py +++ b/goatools/pvalcalc.py @@ -1,13 +1,14 @@ """Options for calculating uncorrected p-values.""" -from __future__ import print_function - -__copyright__ = "Copyright (C) 2016-2018, DV Klopfenstein, H Tang et al., All rights reserved." +__copyright__ = ( + "Copyright (C) 2016-2018, DV Klopfenstein, H Tang et al., All rights reserved." +) __author__ = "DV Klopfenstein" import collections as cx import sys + class PvalCalcBase(object): """Base class for initial p-value calculations.""" @@ -19,19 +20,22 @@ def __init__(self, name, pval_fnc, log): def calc_pvalue(self, study_count, study_n, pop_count, pop_n): """pvalues are calculated in derived classes.""" fnc_call = "calc_pvalue({SCNT}, {STOT}, {PCNT} {PTOT})".format( - SCNT=study_count, STOT=study_n, PCNT=pop_count, PTOT=pop_n) - raise Exception("NOT IMPLEMENTED: {FNC_CALL} using {FNC}.".format( - FNC_CALL=fnc_call, FNC=self.pval_fnc)) + SCNT=study_count, STOT=study_n, PCNT=pop_count, PTOT=pop_n + ) + raise NotImplementedError(f"NOT IMPLEMENTED: {fnc_call} using {self.pval_fnc}.") class FisherScipyStats(PvalCalcBase): """From the scipy stats package, use function, fisher_exact.""" - fmterr = "STUDY={A}/{B} POP={C}/{D} scnt({scnt}) stot({stot}) pcnt({pcnt}) ptot({ptot})" + fmterr = ( + "STUDY={A}/{B} POP={C}/{D} scnt({scnt}) stot({stot}) pcnt({pcnt}) ptot({ptot})" + ) def __init__(self, name, log): from scipy import stats - super(FisherScipyStats, self).__init__(name, stats.fisher_exact, log) + + super().__init__(name, stats.fisher_exact, log) def calc_pvalue(self, study_count, study_n, pop_count, pop_n): """Calculate uncorrected p-values.""" @@ -53,7 +57,15 @@ def calc_pvalue(self, study_count, study_n, pop_count, pop_n): cvar = pop_count - study_count dvar = pop_n - pop_count - bvar assert cvar >= 0, self.fmterr.format( - A=avar, B=bvar, C=cvar, D=dvar, scnt=study_count, stot=study_n, pcnt=pop_count, ptot=pop_n) + A=avar, + B=bvar, + C=cvar, + D=dvar, + scnt=study_count, + stot=study_n, + pcnt=pop_count, + ptot=pop_n, + ) # stats.fisher_exact returns oddsratio, pval_uncorrected _, p_uncorrected = self.pval_fnc([[avar, bvar], [cvar, dvar]]) return p_uncorrected @@ -62,13 +74,17 @@ def calc_pvalue(self, study_count, study_n, pop_count, pop_n): class FisherFactory(object): """Factory for choosing a fisher function.""" - options = cx.OrderedDict([ - ('fisher_scipy_stats', FisherScipyStats), - ]) + options = cx.OrderedDict( + [ + ("fisher_scipy_stats", FisherScipyStats), + ] + ) def __init__(self, **kws): - self.log = kws['log'] if 'log' in kws else sys.stdout - self.pval_fnc_name = kws["pvalcalc"] if "pvalcalc" in kws else "fisher_scipy_stats" + self.log = kws["log"] if "log" in kws else sys.stdout + self.pval_fnc_name = ( + kws["pvalcalc"] if "pvalcalc" in kws else "fisher_scipy_stats" + ) self.pval_obj = self._init_pval_obj() def _init_pval_obj(self): @@ -76,7 +92,9 @@ def _init_pval_obj(self): if self.pval_fnc_name in self.options.keys(): return self.options[self.pval_fnc_name](self.pval_fnc_name, self.log) - raise Exception("PVALUE FUNCTION({FNC}) NOT FOUND".format(FNC=self.pval_fnc_name)) + raise Exception( + "PVALUE FUNCTION({FNC}) NOT FOUND".format(FNC=self.pval_fnc_name) + ) def __str__(self): return " ".join(self.options.keys())