Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Robaina committed Sep 17, 2023
1 parent a9c6d82 commit 359fae1
Show file tree
Hide file tree
Showing 3 changed files with 338 additions and 107 deletions.
340 changes: 264 additions & 76 deletions src/pynteny/hmm.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,25 @@

import logging
import os
import shutil
import sys
from typing import Callable
from collections import defaultdict
from pathlib import Path
import tempfile

import pandas as pd
from Bio import SearchIO

import pynteny.wrappers as wrappers
from pynteny.utils import extract_tar_file, flatten_directory, is_tar_file, list_tar_dir
from pynteny.utils import (
extract_tar_file,
flatten_directory,
is_tar_file,
list_tar_dir,
download_file,
extract_gz_file,
split_hmms,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -120,78 +129,56 @@ def get_HMMER_tables(
return hmm_hits


class PGAP:
"""Tools to parse PGAP hmm database metadata"""
class HMMDatabase:
"""Base class for HMM databases"""

def __init__(self, meta_file: Path):
"""Initialize class PGAP
def __init__(
self,
database_directory: Path,
metadata_file: Path,
metadata_columns: list[str] = None,
) -> None:
"""Initialize class HMMDatabase
Args:
meta_file (Path): path to PGAP's metadata file.
database_directory (Path): path to directory containing HMM database
with individual files for each HMM.
metadata_file (Path): path to metadata file.
metadata_columns (list[str], optional): list of metadata columns to be
used. Defaults to None (all columns).
"""
meta_file = Path(meta_file)
meta = pd.read_csv(
str(meta_file),
sep="\t",
usecols=[
"#ncbi_accession",
"gene_symbol",
"label",
"product_name",
"ec_numbers",
],
self._data_dir = Path(database_directory)
self._metadata_file = Path(metadata_file)
self._meta = pd.read_csv(
self._metadata_file, sep="\t", usecols=metadata_columns
)
# meta = meta[
# ["#ncbi_accession", "gene_symbol", "label", "product_name", "ec_numbers"]
# ]
self._meta = meta
self._meta_file = meta_file

@staticmethod
def extract_PGAP_to_directory(pgap_tar: Path, output_dir: Path) -> None:
"""Extract PGAP hmm database (tar.gz) to given directory
@property
def meta(self) -> pd.DataFrame:
"""Return PFAM metadata
Args:
pgap_tar (Path): path to compressed PGAP database.
output_dir (Path): path to output directory.
Returns:
pd.DataFrame: PFAM metadata
"""
pgap_tar = Path(pgap_tar)
if not is_tar_file(pgap_tar):
logger.warning(f"{pgap_tar} is not a tar file. Skipping extraction")
sys.exit(1)
logger.info("Extracting hmm files to target directory")
output_dir = Path(output_dir)
if output_dir.exists():
shutil.rmtree(output_dir)
extract_tar_file(tar_file=pgap_tar, dest_dir=output_dir)
flatten_directory(output_dir)

def remove_missing_HMMs_from_metadata(
self, hmm_dir: Path, output_file: Path = None
) -> None:
"""Remove HMMs from metadata that are not in HMM directory
return self._meta

Args:
hmm_dir (Path): path to directory containing PGAP database.
output_file (Path, optional): path to output file. Defaults to None.
@property
def meta_file(self) -> Path:
"""Return path to metadata file
Returns:
Path: metadata file
"""
hmm_dir = Path(hmm_dir)
if output_file is None:
output_file = self._meta.parent / f"{self._meta.stem}_missing_hmms.tsv"
else:
output_file = Path(output_file)
if is_tar_file(hmm_dir):
hmm_file_names = [
Path(hmm_file).stem.strip() for hmm_file in list_tar_dir(hmm_dir)
]
else:
hmm_file_names = [hmm_file.stem.strip() for hmm_file in hmm_dir.iterdir()]
not_found = set()
for i, row in self._meta.iterrows():
if row["#ncbi_accession"].strip() not in hmm_file_names:
not_found.add(i)
self._meta = self._meta.drop(not_found)
self._meta.to_csv(output_file, sep="\t", index=False)
return self._metadata_file

@property
def data_dir(self) -> Path:
"""Return data directory
Returns:
Path: data directory
"""
return self._data_dir

def get_HMM_names_by_gene_symbol(self, gene_symbol: str) -> list[str]:
"""Try to retrieve HMM by its gene symbol, more
Expand All @@ -204,14 +191,9 @@ def get_HMM_names_by_gene_symbol(self, gene_symbol: str) -> list[str]:
list[str]: list of HMM names matching gene symbol.
"""
meta = self._meta # .dropna(subset=["gene_symbol", "label"], axis=0)
return meta[
(
(meta.gene_symbol == gene_symbol)
|
# (meta.label.str.contains(gene_id))
(meta.label == gene_symbol)
)
]["#ncbi_accession"].values.tolist()
return meta[((meta.gene_symbol == gene_symbol) | (meta.label == gene_symbol))][
"accession"
].values.tolist()

def get_HMM_group_for_gene_symbol(self, gene_symbol: str) -> str:
"""Get HMMs corresponding to gene symbol in PGAP metadata.
Expand Down Expand Up @@ -241,8 +223,8 @@ def get_HMM_gene_ID(self, hmm_name: str) -> list[str]:
Returns:
list[str]: list of gene symbols matching given HMM.
"""
meta = self._meta.dropna(subset=["#ncbi_accession"], axis=0)
return meta[meta["#ncbi_accession"] == hmm_name]["gene_symbol"].values.tolist()
meta = self._meta.dropna(subset=["accession"], axis=0)
return meta[meta["accession"] == hmm_name]["gene_symbol"].values.tolist()

def get_meta_info_for_HMM(self, hmm_name: str) -> dict:
"""Get meta info for given hmm.
Expand All @@ -253,13 +235,219 @@ def get_meta_info_for_HMM(self, hmm_name: str) -> dict:
Returns:
dict: metadata of provided HMM.
"""
meta = self._meta.dropna(subset=["#ncbi_accession"], axis=0).applymap(
meta = self._meta.dropna(subset=["accession"], axis=0).applymap(
lambda x: x if not pd.isna(x) else ""
)
metadata = {
k: list(v.values())[0] if list(v.values())[0] else "undef"
for k, v in meta[meta["#ncbi_accession"] == hmm_name].to_dict().items()
for k, v in meta[meta["accession"] == hmm_name].to_dict().items()
}
if not metadata:
logger.warning(f"No metadata for HMM: {hmm_name}")
return metadata


class PGAP(HMMDatabase):
"""Tools to parse PGAP hmm database metadata"""

def __init__(self):
"""Initialize class PGAP"""
super().__init__()
self._meta = self._meta.rename(columns={"#ncbi_accession": "accession"})
self._meta = self.remove_missing_HMMs_from_metadata(meta_outfile=None)

def remove_missing_HMMs_from_metadata(self, meta_outfile: Path = None) -> None:
"""Remove HMMs from metadata that are not in HMM directory
Args:
hmm_dir (Path): path to directory containing PGAP database.
meta_outfile (Path, optional): path to output file. Defaults to None.
"""
logger.info("Removing missing HMMs from PGAP metadata")
hmm_dir = self._database_dir
if meta_outfile is None:
meta_outfile = (
self._metadata_file.parent
/ f"{self._metadata_file.stem}_preprocessed.tsv"
)
else:
meta_outfile = Path(meta_outfile)
if is_tar_file(hmm_dir):
hmm_file_names = [
Path(hmm_file).stem.strip() for hmm_file in list_tar_dir(hmm_dir)
]
else:
hmm_file_names = [hmm_file.stem.strip() for hmm_file in hmm_dir.iterdir()]
not_found = set()
for i, row in self._meta.iterrows():
if row["accession"].strip() not in hmm_file_names:
not_found.add(i)
self._meta = self._meta.drop(not_found)
self._metadata_file = meta_outfile
self._meta.to_csv(meta_outfile, sep="\t", index=False)


class PFAM(HMMDatabase):
"""Tools to preprocess the PFAM-A hmm database"""

def __init__(self):
"""Initialize class PFAM"""
super().__init__()

@classmethod
def from_gz_file(
cls, hmm_gz_file: Path, hmm_outdir: Path = None, meta_outfile: Path = None
) -> PFAM:
"""Initialize PFAM class from hmm file
Args:
hmm_gz_file (Path): path to hmm gz file.
hmm_outdir (Path): path to output directory to store individual HMM files.
meta_outfile (Path, optional): path to metadata output file. Defaults to None.
"""
hmm_gz_file = Path(hmm_gz_file)
if hmm_outdir is not None:
hmm_outdir = Path(hmm_outdir)
else:
hmm_outdir = hmm_gz_file.parent / "pfam_hmms"
if meta_outfile is not None:
meta_outfile = Path(meta_outfile)
else:
meta_outfile = hmm_outdir / f"{hmm_gz_file.stem}_meta.tsv"
with tempfile.NamedTemporaryFile() as temp:
extract_gz_file(hmm_gz_file, temp.name)
split_hmms(temp.name, hmm_outdir)
cls.construct_meta_file(hmm_outdir, meta_outfile)
return cls(hmm_outdir, meta_outfile)

def construct_meta_file(self, meta_outfile: Path = None) -> None:
"""Construct metadata file from individual HMM files.
Args:
meta_outfile (Path): path to metadata file.
"""
logger.info("Constructing metadata file for PFAM-A database")
if meta_outfile is None:
meta_outfile = self._database_dir / "PFAM_meta.tsv"
else:
meta_outfile = Path(meta_outfile)
hmm_meta_lines = ["accession\tgene_symbol\tdescription\tlength\n"]
for hmm_file in self._database_dir.glob("*.hmm"):
with open(hmm_file, "r") as f:
hmm_text = f.read()

acc_code = [
entry for entry in hmm_text.split("\n") if entry.startswith("ACC")
]
name = [entry for entry in hmm_text.split("\n") if entry.startswith("NAME")]
description = [
entry for entry in hmm_text.split("\n") if entry.startswith("DESC")
]
length = [
entry for entry in hmm_text.split("\n") if entry.startswith("LENG")
]
name = name[0].split()[1] if name else "Unspecified"
description = description[0].split()[1] if description else "Unspecified"
length = length[0].split()[1] if length else "Unspecified"

if acc_code:
acc_name = acc_code[0].split()[1]
hmm_meta_lines.append(f"{acc_name}\t{name}\t{description}\t{length}\n")
with open(meta_outfile, "w+") as f:
f.writelines(hmm_meta_lines)
self._metadata_file = meta_outfile
self._meta = pd.read_csv(meta_outfile, sep="\t")


class Downloader:
"""Tools to download and preprocess HMM databases"""

def __init__(self, download_dir: Path):
"""Initialize class Downloader
Args:
output_dir (Path): path to output directory.
"""
self._download_dir = Path(download_dir)
if self._download_dir.exists():
logger.warning(
f"{self._download_dir} already exists. Downloader may overwrite files."
)

def download_pgap(self, unpack: bool = False) -> None:
"""Download PGAP database
Args:
unpack (bool, optional): if True then PGAP database will be extracted
"""

data_url = "https://ftp.ncbi.nlm.nih.gov/hmm/current/hmm_PGAP.HMM.tgz"
meta_url = "https://ftp.ncbi.nlm.nih.gov/hmm/current/hmm_PGAP.tsv"
logger.info("Downloading PGAP database")
try:
PGAP_file = self._download_dir / "hmm_PGAP.HMM.tgz"
meta_file = self._download_dir / "hmm_PGAP.tsv"
download_file(data_url, PGAP_file)
download_file(meta_url, meta_file)
except Exception:
logger.exception(
"Failed to download PGAP database. Please check your internet connection."
)
sys.exit(1)
if unpack:
self.extract_pgap_to_directory(PGAP_file)
logger.info("Database downloaded successfully\n")

def download_pfam(self, unpack: bool = False) -> None:
"""Download PFAM database
Args:
unpack (bool, optional): if True then PFAM database will be extracted
"""
pfam_file = self.download_dir / "Pfam-A.gz"
# hmm_outdir = self._output_dir.parent / "pfam_hmms"
# meta_outfile = hmm_outdir / f"{pfam_file.stem}_meta.tsv"
logger.info("Downloading PFAM-A hmm database")
try:
url = (
"https://ftp.ebi.ac.uk/pub/databases/Pfam/current_release/Pfam-A.hmm.gz"
)
download_file(url, pfam_file)
except Exception:
logger.exception(
"Failed to download PFAM-A database. Please check your internet connection."
)
sys.exit(1)
if unpack:
self.extract_pfam_to_directory(pfam_file)
logger.info("Database downloaded successfully")

def extract_pgap_to_directory(self, pgap_tar: Path) -> None:
"""Extract PGAP hmm database (tar.gz) to downlaod directory
Args:
pgap_tar (Path): path to compressed PGAP database.
"""
pgap_tar = Path(pgap_tar)
if not is_tar_file(pgap_tar):
logger.warning(f"{pgap_tar} is not a tar file. Skipping extraction")
return
logger.info("Extracting hmm files to target directory")
extract_tar_file(pgap_tar, self._download_dir)
flatten_directory(self._download_dir)
logger.info("PGAP database unpacked successfully")

def extract_pfam_to_directory(self, pfam_gz: Path) -> None:
"""Extract PFAM hmm database (gz) to downlaod directory
Args:
pfam_gz (Path): path to compressed PFAM database.
"""
pfam_gz = Path(pfam_gz)
if not pfam_gz.is_file():
logger.warning(f"{pfam_gz} is not a file. Skipping extraction")
return
logger.info("Extracting hmm files to target directory")
extract_gz_file(pfam_gz, self._download_dir)
flatten_directory(self.download_dir)
logger.info("PGAP database unpacked successfully")
Loading

0 comments on commit 359fae1

Please sign in to comment.