diff --git a/server/src/scimodom/api/management.py b/server/src/scimodom/api/management.py index 2c66abac..071c2e07 100644 --- a/server/src/scimodom/api/management.py +++ b/server/src/scimodom/api/management.py @@ -1,11 +1,12 @@ import logging from smtplib import SMTPException -from flask import Blueprint, request, jsonify +from flask import Blueprint, request from flask_cors import cross_origin from flask_jwt_extended import jwt_required from scimodom.database.database import get_session +from scimodom.services.assembly import LiftOverError from scimodom.services.data import ( DataService, InstantiationError, @@ -41,29 +42,19 @@ def create_project_request(): uuid = ProjectService.create_project_request(project_form) except FileNotFoundError as exc: logger.error(f"Failed to save the project submission form: {exc}") - return ( - jsonify( - { - "message": "Failed to save the project submission form. Contact the administrator." - } - ), - 500, - ) + return { + "message": "Failed to save the project submission form. Contact the system administrator." + }, 500 mail_service = get_mail_service() try: mail_service.send_project_request_notification(uuid) except SMTPException as exc: logger.error(f"Failed to send out notification email: {exc}") - return ( - jsonify( - { - "message": f"Project form successfully submitted, but failed to send out notification email. Contact the administrator with this ID: {uuid}." - } - ), - 500, - ) - return jsonify({"message": "OK"}), 200 + return { + "message": f"Project form successfully submitted, but failed to send out notification email. Contact the system administrator with this ID: {uuid}." + }, 500 + return {"message": "OK"}, 200 @management_api.route("/dataset", methods=["POST"]) @@ -97,21 +88,23 @@ def add_dataset(): except InstantiationError as exc: logger.error(f"{exc}. The request was: {dataset_form}.") return { - "message": "Invalid selection. Try again or contact the administrator." + "message": "Invalid selection. 
Try again or contact the system administrator." }, 422 except Exception as exc: logger.error(f"{exc}. The request was: {dataset_form}.") - return {"message": "Failed to create dataset. Contact the administrator."}, 500 + return { + "message": "Failed to create dataset. Contact the system administrator." + }, 500 try: data_service.create_dataset() except DatasetHeaderError: return { - "message": 'File upload failed. The file header does not match the value you entered for organism and/or assembly. Click "Cancel". Modify the form or the file header and start again.' + "message": 'File upload failed. Mismatch for organism and/or assembly between the file header and the selected values. Click "Cancel". Modify the form or the file and start again.' }, 422 except DatasetExistsError as exc: return { - "message": f"File upload failed. {str(exc).replace('Aborting transaction!', '')} If you are unsure about what happened, click \"Cancel\" and contact the administrator." + "message": f"File upload failed. {str(exc).replace('Aborting transaction!', '')} If you are unsure about what happened, click \"Cancel\" and contact the system administrator." }, 422 except EOFError as exc: return {"message": f"File upload failed. File {str(exc)} is empty!"}, 500 @@ -120,12 +113,13 @@ def add_dataset(): "message": f"File upload failed. The header is not conform to bedRMod specifications: {str(exc)}" }, 500 except MissingDataError: + return {"message": "File upload failed. Too many skipped records."}, 500 + except LiftOverError: return { - "message": "File upload failed. Too many skipped records. Consult the bedRMod documentation." + "message": "Liftover failed. Check your data, or contact the system administrator." }, 500 - # liftover errors except Exception as exc: logger.error(exc) - return {"message": "File upload failed. Contact the administrator."}, 500 + return {"message": "File upload failed. 
Contact the system administrator."}, 500 return {"result": "Ok"}, 200 diff --git a/server/src/scimodom/services/annotation.py b/server/src/scimodom/services/annotation.py index 62195e63..0078b738 100644 --- a/server/src/scimodom/services/annotation.py +++ b/server/src/scimodom/services/annotation.py @@ -21,6 +21,7 @@ ) from scimodom.services.assembly import AssemblyService from scimodom.services.importer import get_buffer +from scimodom.services.importer.base import MissingDataError from scimodom.utils.operations import ( write_annotation_to_bed, get_annotation_records, @@ -206,9 +207,8 @@ def annotate_data(self, eufid: str) -> None: records = self._session.execute(query).all() if len(records) == 0: - msg = f"No records found for {eufid}... " - logger.warning(msg) - return + msg = f"[Annotation] No records found for {eufid}... " + raise MissingDataError(msg) msg = f"Annotating records for EUFID {eufid}..." logger.debug(msg) @@ -224,7 +224,7 @@ def annotate_data(self, eufid: str) -> None: for record in typed_annotated_records: buffer.buffer_data(record) buffer.flush() - self._session.commit() + self._session.flush() def update_gene_cache(self, selection_ids: list[int]) -> None: """Update gene cache. diff --git a/server/src/scimodom/services/assembly.py b/server/src/scimodom/services/assembly.py index 4851be2b..107d622c 100644 --- a/server/src/scimodom/services/assembly.py +++ b/server/src/scimodom/services/assembly.py @@ -24,6 +24,13 @@ class AssemblyVersionError(Exception): pass +class LiftOverError(Exception): + """Exception for handling too many + unmapped records during liftover.""" + + pass + + class AssemblyService: """Utility class to manage assemblies. 
@@ -206,6 +213,25 @@ def get_chrom_path(organism: str, assembly: str) -> tuple[Path, str]: parent = Path(path, organism, assembly) return parent, AssemblyService.CHROM_FILE + @staticmethod + def get_seqids(organism: str, assembly: str) -> list[str]: + """Returns the chromosomes for a given assembly + as a list. Relies on get_chrom_path, as such + assembly must also match the current assembly. + + :param organism: Organism name + :type organism: str + :param assembly: Assembly name + :type assembly: str + :returns: Chromosomes + :rtype: list of str + """ + parent, filen = AssemblyService.get_chrom_path(organism, assembly) + chrom_file = Path(parent, filen) + with open(chrom_file, "r") as f: + lines = f.readlines() + return [l.split()[0] for l in lines] + def get_chain_path(self) -> tuple[Path, str]: """Construct file path (chain file) for organism. Only to (not from) current version. @@ -297,18 +323,41 @@ def create_new(self): with open(Path(parent, "release.json"), "w") as f: json.dump(release, f, indent="\t") - def liftover(self, records: list[tuple[Any, ...]]) -> str: + def liftover(self, records: list[tuple[Any, ...]], threshold: float = 0.3) -> str: """Liftover records to current assembly. Unmapped features are discarded. :param records: Records to be lifted over :type records: List of tuple of (str, ...) 
- Data records - :returns: File pointing to the liftedOver features + :param threshold: Threshold for raising LiftOverError + :type threshold: float + :returns: File pointing to the liftedOver features + :rtype: str """ parent, filen = self.get_chain_path() chain_file = Path(parent, filen).as_posix() - return liftover_to_file(records, chain_file) + lifted_file, unmapped_file = liftover_to_file(records, chain_file) + + def _count_lines(reader): + b = reader(1024 * 1024) + while b: + yield b + b = reader(1024 * 1024) + + with open(unmapped_file, "rb") as fh: + unmapped_lines = sum( + line.count(b"\n") for line in _count_lines(fh.raw.read) + ) + failed_liftover = unmapped_lines / len(records) > threshold + if failed_liftover: + raise LiftOverError + msg = ( + f"{unmapped_lines} records could not be mapped and were discarded... " + "Contact the system administrator if you have questions." + ) + logger.warning(msg) + + return lifted_file def _get_current_name(self) -> str: """Get current assembly name. 
This methods diff --git a/server/src/scimodom/services/data.py b/server/src/scimodom/services/data.py index 839ed126..0f0860e4 100644 --- a/server/src/scimodom/services/data.py +++ b/server/src/scimodom/services/data.py @@ -269,17 +269,16 @@ def create_dataset(self) -> None: """Dataset constructor.""" is_liftover: bool = False - # make sure we do not already have a dataset with - # the same combination of SMID, selection, and title + # test for duplicate dataset self._validate_entry() # instantiate AssemblyService - query = queries.query_column_where( - Assembly, ["name", "taxa_id"], filters={"id": self._assembly_id} - ) - assembly_name, taxa_id = self._session.execute(query).all()[0] - query = queries.query_column_where(Taxa, "name", filters={"id": taxa_id}) - organism_name = self._session.execute(query).scalar_one() + ( + assembly_name, + current_assembly_name, + taxa_id, + organism_name, + ) = self._query_missing_from_assembly() try: assembly_service = AssemblyService.from_id( self._session, assembly_id=self._assembly_id @@ -296,24 +295,13 @@ def create_dataset(self) -> None: raise AssemblyVersionError(msg) is_liftover = True finally: - query = queries.get_assembly_version() - version = self._session.execute(query).scalar_one() - query = queries.query_column_where( - Assembly, "name", filters={"taxa_id": taxa_id, "version": version} - ) - current_assembly_name = self._session.execute(query).scalar_one() - parent, filen = assembly_service.get_chrom_path( - organism_name, current_assembly_name - ) - chrom_file = Path(parent, filen) - with open(chrom_file, "r") as f: - lines = f.readlines() - seqids = [l.split()[0] for l in lines] + seqids = assembly_service.get_seqids(organism_name, current_assembly_name) # create EUFID self._create_eufid() # import + checkpoint = self._session.begin_nested() try: try: filen = self._filen.as_posix() # type: ignore @@ -325,12 +313,11 @@ def create_dataset(self) -> None: eufid=self._eufid, title=self._title, ) - checkpoint = 
importer.header.checkpoint importer.header.parse_header() # compare input and header self.validate_imported("organism", taxa_id, importer.header.taxid) self.validate_imported("assembly", assembly_name, importer.header.assembly) - importer.header.close(no_commit=True) + importer.header.close() # flush # add association = (EUFID, selection) # update self._association dict self._add_association() # flush @@ -339,16 +326,15 @@ def create_dataset(self) -> None: association=self._association, seqids=seqids, no_flush=is_liftover ) importer.data.parse_records() - importer.data.close(raise_missing=True) # commit unless... + importer.data.close(raise_missing=True) # flush except: checkpoint.rollback() raise else: if is_liftover: msg = f"Lifting over dataset from {assembly_name} to {current_assembly_name}..." - logger.debug(msg) + logger.info(msg) - # ... data has not been written to database yet records = importer.data.get_buffer() # https://github.com/dieterich-lab/scimodom/issues/76 # overwrite name with association, remove asociation, add association back after liftover @@ -362,24 +348,35 @@ def create_dataset(self) -> None: ) for record in records ] - filen = assembly_service.liftover(records) - importer.reset_data_importer(filen) - importer.data.parse_records() - # raise missing? - importer.data.close() + try: + lifted_file = assembly_service.liftover(records) + importer.reset_data_importer(lifted_file) + importer.data.parse_records() + importer.data.close() # flush + except: + checkpoint.rollback() + raise + + msg = "Annotating data now..." + logger.debug(msg) + + # annotate newly imported data... + annotation_service = AnnotationService(session=self._session, taxa_id=taxa_id) + try: + annotation_service.annotate_data(self._eufid) + self._session.commit() + except: + checkpoint.rollback() + raise + # ... 
update cache + annotation_service.update_gene_cache(self._selection_id) msg = ( f"Added dataset {self._eufid} to project {self._smid} with title = {self._title}, " f"and the following associations: {', '.join([f'{k}:{v}' for k, v in self._association.items()])}. " - "Annotating data now..." ) logger.debug(msg) - # annotate newly imported data, update cache - annotation_service = AnnotationService(session=self._session, taxa_id=taxa_id) - annotation_service.annotate_data(self._eufid) - annotation_service.update_gene_cache(self._selection_id) - def get_eufid(self) -> str: """Return newly created EUFID. @@ -504,7 +501,10 @@ def _set_selection(self) -> None: def _validate_entry(self) -> None: """Tentatively check if dataset already exists using - SMID, title, and selection.""" + SMID, title, and selection. + + Raises DatasetExistsError + """ for selection_id in self._selection_id: query = ( select(func.distinct(Dataset.id)) @@ -573,3 +573,25 @@ def _technology_id_to_tech(self, idx: int) -> str: """ query = select(DetectionTechnology.tech).where(DetectionTechnology.id == idx) return self._session.execute(query).scalar_one() + + def _query_missing_from_assembly(self) -> tuple[str, str, int, str]: + """Retrieve assembly-related information. 
+ + :returns: assembly name for instance and database, taxa ID and + organism name + :rtype: tuple of (str, str, int, str) + """ + query = queries.query_column_where( + Assembly, ["name", "taxa_id"], filters={"id": self._assembly_id} + ) + assembly_name, taxa_id = self._session.execute(query).one() + query = queries.query_column_where(Taxa, "name", filters={"id": taxa_id}) + organism_name = self._session.execute(query).scalar_one() + + query = queries.get_assembly_version() + version = self._session.execute(query).scalar_one() + query = queries.query_column_where( + Assembly, "name", filters={"taxa_id": taxa_id, "version": version} + ) + current_assembly_name = self._session.execute(query).scalar_one() + return assembly_name, current_assembly_name, taxa_id, organism_name diff --git a/server/src/scimodom/services/importer/base.py b/server/src/scimodom/services/importer/base.py index 1cbbd7a2..a1bbe95d 100644 --- a/server/src/scimodom/services/importer/base.py +++ b/server/src/scimodom/services/importer/base.py @@ -161,11 +161,15 @@ def parse_records(self) -> None: self._numrows += 1 self._read_line(line) - def close(self, raise_missing: bool = False, threshold: float = 0.01) -> None: + def close( + self, force: bool = False, raise_missing: bool = False, threshold: float = 0.05 + ) -> None: """Close handle. Unless no_flush, flush buffer, and commit. Optionally raise a MissingDataError. 
+ :param force: Force commit + :type force: bool :param raise_missing: Raise error if too many missing records :type raise_missing: bool @@ -174,16 +178,19 @@ def close(self, raise_missing: bool = False, threshold: float = 0.01) -> None: """ self._handle.close() - if raise_missing: - skipped = self._numrows - self._validrows - small = True if self._numrows < 100 and skipped > 1 else False - large = skipped / self._numrows > threshold - if small or large: - raise MissingDataError + skipped = self._numrows - self._validrows + absolute = int(threshold * 100) + small = True if self._numrows <= 100 and skipped > absolute else False + large = skipped / self._numrows > threshold + if (small or large) and raise_missing: + raise MissingDataError if not self._no_flush: self._buffer.flush() - self._session.commit() + if force: + self._session.commit() + else: + self._session.flush() def get_buffer(self) -> list[dict[str, Any]]: """Return buffer with records. diff --git a/server/src/scimodom/services/importer/header.py b/server/src/scimodom/services/importer/header.py index 752b0d29..b35c521f 100644 --- a/server/src/scimodom/services/importer/header.py +++ b/server/src/scimodom/services/importer/header.py @@ -55,7 +55,6 @@ def __init__( self._eufid = eufid self._title = title - self.checkpoint = self._session.begin_nested() self._model = Dataset self._sep: str = self.SPECS["header"]["delimiter"] self._tag: str = self.SPECS["header"]["comment"] @@ -79,18 +78,18 @@ def parse_header(self): self._parse_lines() self._validate_columns() - def close(self, no_commit: bool = False) -> None: + def close(self, force: bool = False) -> None: """Close handle, insert, and flush or commit. 
- :param no_commit: Flush instead of commit - :type no_commit: bool + :param force: Force commit + :type force: bool """ self._handle.close() self._session.execute(insert(self._model), self._header) - if no_commit: - self._session.flush() - else: + if force: self._session.commit() + else: + self._session.flush() def _cast_types(self) -> None: """Cast column types for input.""" diff --git a/server/src/scimodom/utils/operations.py b/server/src/scimodom/utils/operations.py index 75cbcec0..7ba92ce4 100644 --- a/server/src/scimodom/utils/operations.py +++ b/server/src/scimodom/utils/operations.py @@ -221,7 +221,7 @@ def liftover_to_file( chain_file: str, unmapped: str | None = None, chrom_id: str = "s", -) -> str: +) -> tuple[str, str]: """Liftover records. Handles conversion to BedTool, but not from, of the liftedOver features. A file is returned pointing to the liftedOver features. The unmapped ones are saved as @@ -235,8 +235,8 @@ def liftover_to_file( :type unmapped: str or None :param chrom_id: The style of chromosome IDs (default s). 
:type chrom_id: str - :returns: File with liftedOver features - :rtype: str + :returns: Files with liftedOver and unmapped features + :rtype: tuple of (str, str) """ bedtool = to_bedtool(records) result = pybedtools.BedTool._tmp() @@ -253,7 +253,7 @@ def liftover_to_file( msg = f"Process failed with {exc.stderr}" raise Exception(msg) from exc # except subprocess.TimeoutExpired as exc: - return result + return result, unmapped def _remove_filno(feature, n_fields: int = 9, is_closest: bool = False): diff --git a/server/tests/unit/services/test_header_importer.py b/server/tests/unit/services/test_header_importer.py index 3f321da3..cf642ae9 100644 --- a/server/tests/unit/services/test_header_importer.py +++ b/server/tests/unit/services/test_header_importer.py @@ -256,7 +256,7 @@ def test_importer(Session, EUF_specs): title="Title", ) importer.parse_header() - importer.close() + importer.close(force=True) with Session() as session, session.begin(): records = session.execute(select(Dataset)).scalar() assert records.id == "123456789ABC" diff --git a/server/tests/unit/services/test_importer.py b/server/tests/unit/services/test_importer.py index 0ecc96b4..ef06a827 100644 --- a/server/tests/unit/services/test_importer.py +++ b/server/tests/unit/services/test_importer.py @@ -18,6 +18,7 @@ [(True), (False)], ) def test_importer(close_handle, Session, data_path): + checkpoint = Session().begin_nested() try: importer = get_importer( filen=Path(data_path.LOC, "test.bed").as_posix(), @@ -25,7 +26,6 @@ def test_importer(close_handle, Session, data_path): eufid="123456789ABC", title="title", ) - checkpoint = importer.header.checkpoint importer.header.parse_header() if close_handle: importer.header.close() @@ -72,6 +72,7 @@ def test_importer(close_handle, Session, data_path): def test_importer_header_fail(Session, data_path): + checkpoint = Session().begin_nested() try: importer = get_importer( filen=Path(data_path.LOC, "test_header_fail.bed").as_posix(), @@ -79,7 +80,6 @@ def 
test_importer_header_fail(Session, data_path): eufid="123456789ABC", title="title", ) - checkpoint = importer.header.checkpoint importer.header.parse_header() importer.header.close() importer.init_data_importer(association={"m6A": 1}, seqids=["1"]) @@ -109,6 +109,7 @@ def test_importer_data_fail(Session, data_path): # All "bad" records are discared, e.g. out of range # values, non-numerical values where a numerical value # is expected, etc. + checkpoint = Session().begin_nested() try: importer = get_importer( filen=Path(data_path.LOC, "test_data_fail.bed").as_posix(), @@ -116,7 +117,6 @@ def test_importer_data_fail(Session, data_path): eufid="123456789ABC", title="title", ) - checkpoint = importer.header.checkpoint importer.header.parse_header() importer.header.close() importer.init_data_importer(association={"m6A": 1}, seqids=["1"])