Commit

renamed
haeussma committed Mar 9, 2024
1 parent 49345df commit e4ac754
Showing 4 changed files with 163 additions and 162 deletions.
130 changes: 65 additions & 65 deletions pyeed/parsers/abstractparser.py → pyeed/fetchers/abstractfetcher.py
@@ -1,6 +1,6 @@
 import os
-import re
 import logging
+import secrets
 import logging.config
 from pathlib import Path
 from abc import ABC, abstractmethod
@@ -19,113 +19,138 @@
 
 path_config = Path(__file__).parent.parent.parent / "logging.conf"
 logging.config.fileConfig(path_config)
-logger = logging.getLogger("pyeed")
+LOGGER = logging.getLogger("pyeed")
 
 
-class DataParser(ABC):
-    def __init__(self, source: Any):
-        self.source = source
+class AbstractFetcher(ABC):
+    def __init__(self, foreign_id: str):
+        super().__init__()
+        self.foreign_id = foreign_id
 
     @abstractmethod
-    def fetch_entry(self):
+    def get(self):
         pass
 
     @abstractmethod
-    def parse_organism(self):
+    def map(self, handle: Any, cls):
         pass
 
-    @abstractmethod
-    def map(self):
-        pass
+    @staticmethod
+    def get_substitute_email() -> str:
+        return f"{secrets.token_hex(8)}@gmail.com"
+
+    @staticmethod
+    def make_chunks(input_list: list, chunk_size: int = 100) -> List[list]:
+        """
+        Splits a list into chunks of a given size.
+        """
+        if input_list is None:
+            raise ValueError("input_list cannot be None.")
+
+        if not isinstance(input_list, list):
+            raise TypeError("input_list must be a list")
+
+        return [
+            input_list[i : i + chunk_size]
+            for i in range(0, len(input_list), chunk_size)
+        ]
 
 
-class NCBIParser(DataParser):
+class NCBIProteinParser(AbstractFetcher):
 
     def map(self, cls: "ProteinInfo"):
 
         protein_info = cls(source_id=self.source.id, sequence=str(self.source.seq))
 
-        protein_info.organism = self.parse_organism()
+        protein_info.organism = self.map_organism()
         protein_info = self.map_protein(protein_info)
         protein_info = self.map_regions(protein_info)
         protein_info = self.map_sites(protein_info)
         protein_info = self.map_cds(protein_info)
 
         return protein_info
 
-    def parse_organism(self) -> Organism:
+    def map_organism(self) -> Organism:
        """
        Gets the organism name and taxonomy ID from the source data.
        Maps it to an Organism object.
        """
 
         feature = self.get_feature("source")
         if len(feature) != 1:
-            logger.debug(
+            LOGGER.debug(
                 f"Multiple features ({len(feature)}) of type `source` found for {self.source.id}: {feature}"
             )
         feature = feature[0]
 
         try:
-            taxonomy_id = feature.qualifiers["db_xref"]
+            if len(feature.qualifiers["db_xref"]) != 1:
+                LOGGER.info(
+                    f"For {self.source.id} {feature.qualifiers['db_xref']} taxonomy ID(s) were found, using the first one. Skipping organism assignment"
+                )
+                return None
+
+            taxonomy_id = feature.qualifiers["db_xref"][0]
+
+            if ":" in taxonomy_id:
+                taxonomy_id = int(taxonomy_id.split(":")[1])
+
         except KeyError:
-            logger.debug(
-                f"No taxonomy ID found for {self.source.id}: {feature[0].qualifiers}"
-            )
-            taxonomy_id = None
+            LOGGER.debug(f"No taxonomy ID found for {self.source.id}: {feature}")
+            return None
 
         try:
             organism_name = feature.qualifiers["organism"]
         except KeyError:
-            logger.debug(
+            LOGGER.debug(
                 f"No organism name found for {self.source.id}: {feature[0].qualifiers}"
             )
             organism_name = None
 
-        return Organism(name=organism_name[0], taxonomy_id=taxonomy_id[0])
+        return Organism(name=organism_name[0], taxonomy_id=taxonomy_id)
 
     def map_protein(self, protein_info: ProteinInfo):
 
         protein = self.get_feature("Protein")
         if len(protein) == 0:
-            logger.debug(
+            LOGGER.debug(
                 f"No protein feature found for {self.source.id}: {self.source.features}"
             )
 
             return protein_info
 
         if len(protein) > 1:
-            logger.debug(
+            LOGGER.debug(
                 f"Multiple features ({len(protein)}) of type `Protein` found for {self.source.id}"
             )
 
         protein = protein[0]
         try:
             protein_info.name = protein.qualifiers["product"][0]
         except KeyError:
-            logger.debug(
+            LOGGER.debug(
                 f"No protein name found for {self.source.id}: {protein.qualifiers}"
             )
             try:
                 protein_info.name = protein.qualifiers["name"][0]
             except KeyError:
-                logger.debug(
+                LOGGER.debug(
                     f"No protein name found for {self.source.id}: {protein.qualifiers}"
                 )
                 protein_info.name = None
 
         try:
             protein_info.mol_weight = protein.qualifiers["calculated_mol_wt"][0]
         except KeyError:
-            logger.debug(
+            LOGGER.debug(
                 f"No molecular weight found for {self.source.id}: {protein.qualifiers}"
             )
             protein_info.mol_weight = None
 
         try:
             protein_info.ec_number = protein.qualifiers["EC_number"][0]
         except KeyError:
-            logger.debug(
+            LOGGER.debug(
                 f"No EC number found for {self.source.id}: {protein.qualifiers}"
             )
             protein_info.ec_number = None
@@ -151,7 +176,7 @@ def map_regions(self, protein_info: ProteinInfo):
                 )
             )
         except KeyError:
-            logger.debug(
+            LOGGER.debug(
                 f"Incomplete region data found for {self.source.id}: {region.qualifiers}, skipping region"
             )
 
@@ -171,7 +196,7 @@ def map_sites(self, protein_info: ProteinInfo):
                     cross_ref=site.qualifiers["db_xref"][0],
                 )
             except KeyError:
-                logger.warning(
+                LOGGER.warning(
                     f"Incomplete site data found for {self.source.id}: {site.qualifiers}, skipping site"
                 )
 
@@ -181,14 +206,14 @@ def map_cds(self, protein_info: ProteinInfo):
 
         cds = self.get_feature("CDS")
         if len(cds) > 1:
-            logger.info(
+            LOGGER.info(
                 f"Multiple features ({len(cds)}) of type `CDS` found for {self.source.id}"
             )
 
         try:
             cds = cds[0]
         except IndexError:
-            logger.debug(f"No CDS found for {self.source.id}: {cds}")
+            LOGGER.debug(f"No CDS found for {self.source.id}: {cds}")
 
             return protein_info
 
@@ -197,7 +222,7 @@ def map_cds(self, protein_info: ProteinInfo):
                 cds.qualifiers["coded_by"][0]
             )
         except IndexError:
-            logger.debug(
+            LOGGER.debug(
                 f"No coding sequence reference found for {self.source.id}: {cds.qualifiers}"
             )
 
@@ -220,7 +245,7 @@ def get_cds_regions(coded_by: dict) -> List[DNARegion]:
         if not all(
             [reference_id == reference_ids[0] for reference_id in reference_ids]
         ):
-            logger.warning(
+            LOGGER.warning(
                 "Nucleotide sequence references are not identical: {reference_ids}"
             )
 
@@ -238,14 +263,6 @@ def get_cds_regions(coded_by: dict) -> List[DNARegion]:
 
         return region
 
-    def fetch_entry(self, identifier: str):
-        # Implementation for fetching data from NCBI
-        logger.debug(f"Fetching NCBI data for {identifier}")
-
-    def parse_data(self, data):
-        # Implementation for parsing NCBI data
-        logger.debug("Parsing NCBI data")
-
     def get_feature(self, feature_type: str) -> "Bio.SeqFeature.SeqFeature":
         return [
             feature
@@ -254,34 +271,17 @@ def get_feature(self, feature_type: str) -> "Bio.SeqFeature.SeqFeature":
         ]
 
 
-class UniProtParser(DataParser):
-
-    def parse_organism():
-        pass
-
-    def fetch_entry(self, identifier: str):
-        # Implementation for fetching data from UniProt
-        pass
-
-    def parse_data(self, data):
-        # Implementation for parsing UniProt data
+class UniProtParser(AbstractFetcher):
+
+    def map():
         pass
 
-
-class ParserFactory:
-    @staticmethod
-    def get_parser(source: str) -> DataParser:
-        parsers = {"NCBI": NCBIParser(), "UniProt": UniProtParser()}
-        parser = parsers.get(source.upper())
-        if not parser:
-            raise ValueError(f"Parser for {source} not found.")
-        return parser
-
 
 if __name__ == "__main__":
-    from pyeed.ncbi.seq_io import get_ncbi_entry
+    from pyeed.ncbi.seq_io import get_ncbi_entry, get_ncbi_taxonomy
+    from pyeed.core import Organism
 
-    entry = get_ncbi_entry("7P82_A", "protein")
+    entry = get_ncbi_taxonomy("311400")
 
-    parser = NCBIParser(entry)
-    print(parser.map(ProteinInfo))
+    parser = NCBITaxonomyParser(entry[0])
+    print(parser.map(Organism))
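
The two static helpers added to AbstractFetcher are self-contained, so their behavior can be sketched directly. A minimal sketch, assuming the renamed module is importable under its new path pyeed.fetchers.abstractfetcher; the ID values are illustrative:

from pyeed.fetchers.abstractfetcher import AbstractFetcher

# make_chunks splits a flat ID list into fixed-size batches, e.g. for
# batched Entrez requests.
ids = [str(i) for i in range(250)]
chunks = AbstractFetcher.make_chunks(ids, chunk_size=100)
assert [len(chunk) for chunk in chunks] == [100, 100, 50]

# get_substitute_email returns a random hex placeholder address, so a
# contact email can always be handed to Entrez when the caller gives none.
print(AbstractFetcher.get_substitute_email())  # e.g. '3f9c0a1b2c3d4e5f@gmail.com'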
98 changes: 98 additions & 0 deletions pyeed/fetchers/ncbitaxonomy.py
@@ -0,0 +1,98 @@
+from typing import Generator, List
+from Bio import Entrez
+from tqdm import tqdm
+
+from pyeed.core.organism import Organism
+from pyeed.fetchers.abstractfetcher import AbstractFetcher
+from pyeed.fetchers.abstractfetcher import LOGGER
+
+
+class NCBITaxonomyParser(AbstractFetcher):
+
+    def __init__(self, foreign_id: List[str], email: str = None, api_key: str = None):
+        super().__init__(foreign_id)
+        self.api_key = api_key
+        if email is None:
+            self.email = self.get_substitute_email()
+
+    def get(self):
+
+        if isinstance(self.foreign_id, list):
+            return list(self.get_multiple_ids())
+        else:
+            return list(self.get_single_id())
+
+    def make_request(self, request_string: str) -> Generator:
+        Entrez.email = self.email
+        Entrez.api_key = self.api_key
+
+        with Entrez.efetch(
+            db="taxonomy",
+            id=request_string,
+            retmode="xml",
+            api_key=self.api_key,
+        ) as handle:
+            yield handle
+
+    def get_single_id(self):
+        handle = next(self.make_request(self.foreign_id))
+        return Entrez.read(handle)
+
+    def get_multiple_ids(self):
+        with tqdm(
+            total=len(self.foreign_id),
+            desc="⬇️ Fetching taxonomy data",
+        ) as pbar:
+
+            for chunk in self.make_chunks(self.foreign_id):
+                request_string = ",".join(chunk)
+
+                for handle in self.make_request(request_string):
+
+                    pbar.update(1)
+                    yield Entrez.read(handle)
+
+    def map(self, cls: "Organism"):
+
+        tax_id = self.source.get("TaxId")
+        organism = cls(taxonomy_id=tax_id)
+
+        organism.name = self.source.get("ScientificName")
+        organism.species = self.source.get("ScientificName")
+
+        lineage = self.source.get("LineageEx")
+
+        if not lineage:
+            LOGGER.debug(f"No lineage found for {tax_id}: {self.source}")
+            return organism
+
+        for tax_rank in lineage:
+            if tax_rank.get("Rank") == "superkingdom":
+                organism.domain = tax_rank.get("ScientificName")
+            elif tax_rank.get("Rank") == "phylum":
+                organism.phylum = tax_rank.get("ScientificName")
+            elif tax_rank.get("Rank") == "class":
+                organism.tax_class = tax_rank.get("ScientificName")
+            elif tax_rank.get("Rank") == "order":
+                organism.order = tax_rank.get("ScientificName")
+            elif tax_rank.get("Rank") == "family":
+                organism.family = tax_rank.get("ScientificName")
+            elif tax_rank.get("Rank") == "genus":
+                organism.genus = tax_rank.get("ScientificName")
+            elif tax_rank.get("Rank") == "species":
+                organism.species = tax_rank.get("ScientificName")
+            elif tax_rank.get("Rank") == "kingdom":
+                organism.kingdom = tax_rank.get("ScientificName")
+            else:
+                continue
+
+        return organism
+
+
+if __name__ == "__main__":
+    single_tax_id = "9606"
+    multiple_tax_ids = ["9606"]
+
+    # print(NCBITaxonomyParser(single_tax_id).get())
+
+    print(NCBITaxonomyParser(multiple_tax_ids).get())
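
A usage sketch for the new taxonomy fetcher, following the __main__ block above. Running it assumes Biopython, tqdm, and network access to NCBI; the taxonomy IDs are illustrative:

from pyeed.fetchers.ncbitaxonomy import NCBITaxonomyParser

# A list input is routed through get_multiple_ids, which batches the IDs
# with make_chunks and yields one parsed Entrez result per request; a plain
# string would be routed through get_single_id instead.
records = NCBITaxonomyParser(["9606", "562"]).get()
print(records)

# Each parsed record can then be mapped onto an Organism via map(Organism),
# which copies TaxId and ScientificName and distributes the LineageEx ranks
# over the domain/kingdom/phylum/.../species fields.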
Empty file removed pyeed/parsers/__init__.py