Skip to content

Commit

Permalink
WIP #86 ADD liftover, FIX commit
Browse files Browse the repository at this point in the history
  • Loading branch information
eboileau committed May 21, 2024
1 parent 085edab commit 725d12b
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 93 deletions.
44 changes: 19 additions & 25 deletions server/src/scimodom/api/management.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import logging
from smtplib import SMTPException

from flask import Blueprint, request, jsonify
from flask import Blueprint, request
from flask_cors import cross_origin
from flask_jwt_extended import jwt_required

from scimodom.database.database import get_session
from scimodom.services.assembly import LiftOverError
from scimodom.services.data import (
DataService,
InstantiationError,
Expand Down Expand Up @@ -41,29 +42,19 @@ def create_project_request():
uuid = ProjectService.create_project_request(project_form)
except FileNotFoundError as exc:
logger.error(f"Failed to save the project submission form: {exc}")
return (
jsonify(
{
"message": "Failed to save the project submission form. Contact the administrator."
}
),
500,
)
return {
"message": "Failed to save the project submission form. Contact the system administrator."
}, 500

mail_service = get_mail_service()
try:
mail_service.send_project_request_notification(uuid)
except SMTPException as exc:
logger.error(f"Failed to send out notification email: {exc}")
return (
jsonify(
{
"message": f"Project form successfully submitted, but failed to send out notification email. Contact the administrator with this ID: {uuid}."
}
),
500,
)
return jsonify({"message": "OK"}), 200
return {
"message": f"Project form successfully submitted, but failed to send out notification email. Contact the system administrator with this ID: {uuid}."
}, 500
return {"message": "OK"}, 200


@management_api.route("/dataset", methods=["POST"])
Expand Down Expand Up @@ -97,21 +88,23 @@ def add_dataset():
except InstantiationError as exc:
logger.error(f"{exc}. The request was: {dataset_form}.")
return {
"message": "Invalid selection. Try again or contact the administrator."
"message": "Invalid selection. Try again or contact the system administrator."
}, 422
except Exception as exc:
logger.error(f"{exc}. The request was: {dataset_form}.")
return {"message": "Failed to create dataset. Contact the administrator."}, 500
return {
"message": "Failed to create dataset. Contact the system administrator."
}, 500

try:
data_service.create_dataset()
except DatasetHeaderError:
return {
"message": 'File upload failed. The file header does not match the value you entered for organism and/or assembly. Click "Cancel". Modify the form or the file header and start again.'
"message": 'File upload failed. Mismatch for organism and/or assembly between the file header and the selected values. Click "Cancel". Modify the form or the file and start again.'
}, 422
except DatasetExistsError as exc:
return {
"message": f"File upload failed. {str(exc).replace('Aborting transaction!', '')} If you are unsure about what happened, click \"Cancel\" and contact the administrator."
"message": f"File upload failed. {str(exc).replace('Aborting transaction!', '')} If you are unsure about what happened, click \"Cancel\" and contact the system administrator."
}, 422
except EOFError as exc:
return {"message": f"File upload failed. File {str(exc)} is empty!"}, 500
Expand All @@ -120,12 +113,13 @@ def add_dataset():
"message": f"File upload failed. The header is not conform to bedRMod specifications: {str(exc)}"
}, 500
except MissingDataError:
return {"message": "File upload failed. Too many skipped records."}, 500
except LiftOverError:
return {
"message": "File upload failed. Too many skipped records. Consult the bedRMod documentation."
"message": "Liftover failed. Check your data, or contact the system administrator."
}, 500
# liftover errors
except Exception as exc:
logger.error(exc)
return {"message": "File upload failed. Contact the administrator."}, 500
return {"message": "File upload failed. Contact the system administrator."}, 500

return {"result": "Ok"}, 200
8 changes: 4 additions & 4 deletions server/src/scimodom/services/annotation.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
)
from scimodom.services.assembly import AssemblyService
from scimodom.services.importer import get_buffer
from scimodom.services.importer.base import MissingDataError
from scimodom.utils.operations import (
write_annotation_to_bed,
get_annotation_records,
Expand Down Expand Up @@ -206,9 +207,8 @@ def annotate_data(self, eufid: str) -> None:
records = self._session.execute(query).all()

if len(records) == 0:
msg = f"No records found for {eufid}... "
logger.warning(msg)
return
msg = f"[Annotation] No records found for {eufid}... "
raise MissingDataError(msg)

msg = f"Annotating records for EUFID {eufid}..."
logger.debug(msg)
Expand All @@ -224,7 +224,7 @@ def annotate_data(self, eufid: str) -> None:
for record in typed_annotated_records:
buffer.buffer_data(record)
buffer.flush()
self._session.commit()
self._session.flush()

def update_gene_cache(self, selection_ids: list[int]) -> None:
"""Update gene cache.
Expand Down
55 changes: 52 additions & 3 deletions server/src/scimodom/services/assembly.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ class AssemblyVersionError(Exception):
pass


class LiftOverError(Exception):
    """Raised when too many records fail to map to the
    current assembly during liftover, i.e. when the
    fraction of unmapped records exceeds the allowed
    threshold."""


class AssemblyService:
"""Utility class to manage assemblies.
Expand Down Expand Up @@ -206,6 +213,25 @@ def get_chrom_path(organism: str, assembly: str) -> tuple[Path, str]:
parent = Path(path, organism, assembly)
return parent, AssemblyService.CHROM_FILE

@staticmethod
def get_seqids(organism: str, assembly: str) -> list[str]:
    """Return the chromosomes (seqids) for a given assembly.

    Relies on get_chrom_path; as such, the assembly must
    also match the current assembly.

    :param organism: Organism name
    :type organism: str
    :param assembly: Assembly name
    :type assembly: str
    :returns: Chromosome names (first whitespace-delimited
        field of each non-empty line of the chrom file)
    :rtype: list of str
    """
    parent, filen = AssemblyService.get_chrom_path(organism, assembly)
    chrom_file = Path(parent, filen)
    # Iterate lazily instead of readlines(); skip blank lines so a
    # trailing newline does not raise IndexError on split()[0].
    with open(chrom_file, "r") as fh:
        return [line.split()[0] for line in fh if line.strip()]

def get_chain_path(self) -> tuple[Path, str]:
"""Construct file path (chain file) for organism.
Only to (not from) current version.
Expand Down Expand Up @@ -297,18 +323,41 @@ def create_new(self):
with open(Path(parent, "release.json"), "w") as f:
json.dump(release, f, indent="\t")

def liftover(self, records: list[tuple[Any, ...]], threshold: float = 0.3) -> str:
    """Liftover records to the current assembly.
    Unmapped features are discarded.

    :param records: Records to be lifted over
    :type records: List of tuple of (str, ...) - Data records
    :param threshold: Maximum tolerated fraction of unmapped
        records before raising LiftOverError
    :type threshold: float
    :returns: File pointing to the liftedOver features
    :rtype: str
    :raises LiftOverError: If the fraction of unmapped records
        exceeds threshold
    """
    parent, filen = self.get_chain_path()
    chain_file = Path(parent, filen).as_posix()
    lifted_file, unmapped_file = liftover_to_file(records, chain_file)

    # Count newline-terminated records in the unmapped file in
    # fixed-size chunks to avoid loading the whole file into memory.
    # iter(callable, sentinel) stops cleanly at EOF (b"").
    unmapped_lines = 0
    with open(unmapped_file, "rb") as fh:
        for chunk in iter(lambda: fh.read(1024 * 1024), b""):
            unmapped_lines += chunk.count(b"\n")

    # Guard against empty input to avoid ZeroDivisionError.
    if records and unmapped_lines / len(records) > threshold:
        raise LiftOverError
    if unmapped_lines:
        msg = (
            f"{unmapped_lines} records could not be mapped and were discarded... "
            "Contact the system administrator if you have questions."
        )
        logger.warning(msg)

    return lifted_file

def _get_current_name(self) -> str:
"""Get current assembly name. This methods
Expand Down
98 changes: 60 additions & 38 deletions server/src/scimodom/services/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,17 +269,16 @@ def create_dataset(self) -> None:
"""Dataset constructor."""
is_liftover: bool = False

# make sure we do not already have a dataset with
# the same combination of SMID, selection, and title
# test for duplicate dataset
self._validate_entry()

# instantiate AssemblyService
query = queries.query_column_where(
Assembly, ["name", "taxa_id"], filters={"id": self._assembly_id}
)
assembly_name, taxa_id = self._session.execute(query).all()[0]
query = queries.query_column_where(Taxa, "name", filters={"id": taxa_id})
organism_name = self._session.execute(query).scalar_one()
(
assembly_name,
current_assembly_name,
taxa_id,
organism_name,
) = self._query_missing_from_assembly()
try:
assembly_service = AssemblyService.from_id(
self._session, assembly_id=self._assembly_id
Expand All @@ -296,24 +295,13 @@ def create_dataset(self) -> None:
raise AssemblyVersionError(msg)
is_liftover = True
finally:
query = queries.get_assembly_version()
version = self._session.execute(query).scalar_one()
query = queries.query_column_where(
Assembly, "name", filters={"taxa_id": taxa_id, "version": version}
)
current_assembly_name = self._session.execute(query).scalar_one()
parent, filen = assembly_service.get_chrom_path(
organism_name, current_assembly_name
)
chrom_file = Path(parent, filen)
with open(chrom_file, "r") as f:
lines = f.readlines()
seqids = [l.split()[0] for l in lines]
seqids = assembly_service.get_seqids(organism_name, current_assembly_name)

# create EUFID
self._create_eufid()

# import
checkpoint = self._session.begin_nested()
try:
try:
filen = self._filen.as_posix() # type: ignore
Expand All @@ -325,12 +313,11 @@ def create_dataset(self) -> None:
eufid=self._eufid,
title=self._title,
)
checkpoint = importer.header.checkpoint
importer.header.parse_header()
# compare input and header
self.validate_imported("organism", taxa_id, importer.header.taxid)
self.validate_imported("assembly", assembly_name, importer.header.assembly)
importer.header.close(no_commit=True)
importer.header.close() # flush
# add association = (EUFID, selection)
# update self._association dict
self._add_association() # flush
Expand All @@ -339,16 +326,15 @@ def create_dataset(self) -> None:
association=self._association, seqids=seqids, no_flush=is_liftover
)
importer.data.parse_records()
importer.data.close(raise_missing=True) # commit unless...
importer.data.close(raise_missing=True) # flush
except:
checkpoint.rollback()
raise
else:
if is_liftover:
msg = f"Lifting over dataset from {assembly_name} to {current_assembly_name}..."
logger.debug(msg)
logger.info(msg)

# ... data has not been written to database yet
records = importer.data.get_buffer()
# https://github.com/dieterich-lab/scimodom/issues/76
# overwrite name with association, remove asociation, add association back after liftover
Expand All @@ -362,24 +348,35 @@ def create_dataset(self) -> None:
)
for record in records
]
filen = assembly_service.liftover(records)
importer.reset_data_importer(filen)
importer.data.parse_records()
# raise missing?
importer.data.close()
try:
lifted_file = assembly_service.liftover(records)
importer.reset_data_importer(lifted_file)
importer.data.parse_records()
importer.data.close() # flush
except:
checkpoint.rollback()
raise

msg = "Annotating data now..."
logger.debug(msg)

# annotate newly imported data...
annotation_service = AnnotationService(session=self._session, taxa_id=taxa_id)
try:
annotation_service.annotate_data(self._eufid)
self._session.commit()
except:
checkpoint.rollback()
raise
# ... update cache
annotation_service.update_gene_cache(self._selection_id)

msg = (
f"Added dataset {self._eufid} to project {self._smid} with title = {self._title}, "
f"and the following associations: {', '.join([f'{k}:{v}' for k, v in self._association.items()])}. "
"Annotating data now..."
)
logger.debug(msg)

# annotate newly imported data, update cache
annotation_service = AnnotationService(session=self._session, taxa_id=taxa_id)
annotation_service.annotate_data(self._eufid)
annotation_service.update_gene_cache(self._selection_id)

def get_eufid(self) -> str:
"""Return newly created EUFID.
Expand Down Expand Up @@ -504,7 +501,10 @@ def _set_selection(self) -> None:

def _validate_entry(self) -> None:
"""Tentatively check if dataset already exists using
SMID, title, and selection."""
SMID, title, and selection.
Raises DatasetExistsError
"""
for selection_id in self._selection_id:
query = (
select(func.distinct(Dataset.id))
Expand Down Expand Up @@ -573,3 +573,25 @@ def _technology_id_to_tech(self, idx: int) -> str:
"""
query = select(DetectionTechnology.tech).where(DetectionTechnology.id == idx)
return self._session.execute(query).scalar_one()

def _query_missing_from_assembly(self) -> tuple[str, str, int, str]:
    """Retrieve assembly-related information for this dataset.

    :returns: Assembly name for this instance, current assembly
        name for the database version, Taxa ID, and organism name
    :rtype: tuple of (str, str, int, str)
    """
    # Assembly recorded for this dataset (may differ from the
    # current database assembly, triggering a liftover upstream).
    query = queries.query_column_where(
        Assembly, ["name", "taxa_id"], filters={"id": self._assembly_id}
    )
    assembly_name, taxa_id = self._session.execute(query).one()
    query = queries.query_column_where(Taxa, "name", filters={"id": taxa_id})
    organism_name = self._session.execute(query).scalar_one()

    # Assembly matching the current database version for this organism.
    query = queries.get_assembly_version()
    version = self._session.execute(query).scalar_one()
    query = queries.query_column_where(
        Assembly, "name", filters={"taxa_id": taxa_id, "version": version}
    )
    current_assembly_name = self._session.execute(query).scalar_one()
    return assembly_name, current_assembly_name, taxa_id, organism_name
Loading

0 comments on commit 725d12b

Please sign in to comment.