WIP #86, up to liftover, FIX rollback, test undone
eboileau committed May 17, 2024
1 parent 43f8e53 commit 085edab
Showing 6 changed files with 81 additions and 33 deletions.
22 changes: 10 additions & 12 deletions server/src/scimodom/api/management.py
@@ -13,6 +13,7 @@
     DatasetExistsError,
     DatasetHeaderError,
 )
+from scimodom.services.importer.base import MissingDataError
 from scimodom.services.importer.header import SpecsError
 from scimodom.services.project import ProjectService
 from scimodom.services.mail import get_mail_service
@@ -116,18 +117,15 @@ def add_dataset():
         return {"message": f"File upload failed. File {str(exc)} is empty!"}, 500
     except SpecsError as exc:
         return {
-            "message": f"File upload failed. File is not conform to bedRMod specifications: {str(exc)}"
+            "message": f"File upload failed. The header is not conform to bedRMod specifications: {str(exc)}"
         }, 500
+    except MissingDataError:
+        return {
+            "message": "File upload failed. Too many skipped records. Consult the bedRMod documentation."
+        }, 500
+    # liftover errors
     except Exception as exc:
-        # TODO ...
-        logger.error(f"Failed to create dataset: {exc}")
-        return (
-            jsonify({"result": "Failed to upload dataset. Contact the administrator."}),
-            500,
-        )
+        # TODO
+        # WARNING scimodom.services.annotation.annotate_data.193 | No records found for Kr6uj7QzWfLJ...
+        logger.error(exc)
+        return {"message": "File upload failed. Contact the administrator."}, 500
 
-    return jsonify({"result": "Ok"}), 200
+    return {"result": "Ok"}, 200
18 changes: 5 additions & 13 deletions server/src/scimodom/services/data.py
@@ -325,14 +325,12 @@ def create_dataset(self) -> None:
                 eufid=self._eufid,
                 title=self._title,
             )
-            # TODO if importer fails, then checkpoint does not exists...
             checkpoint = importer.header.checkpoint
             importer.header.parse_header()
-            # compare input vs. values read from file header
-            # for organism (taxa ID) and assembly
+            # compare input and header
             self.validate_imported("organism", taxa_id, importer.header.taxid)
             self.validate_imported("assembly", assembly_name, importer.header.assembly)
-            importer.header.close()  # commit
+            importer.header.close(no_commit=True)
             # add association = (EUFID, selection)
             # update self._association dict
             self._add_association()  # flush
@@ -341,11 +339,11 @@ def create_dataset(self) -> None:
                 association=self._association, seqids=seqids, no_flush=is_liftover
             )
             importer.data.parse_records()
+            importer.data.close(raise_missing=True)  # commit unless...
         except:
             checkpoint.rollback()
             raise
         else:
-            importer.data.close()  # commit unless...
             if is_liftover:
                 msg = f"Lifting over dataset from {assembly_name} to {current_assembly_name}..."
                 logger.debug(msg)
@@ -367,6 +365,7 @@ def create_dataset(self) -> None:
                 filen = assembly_service.liftover(records)
                 importer.reset_data_importer(filen)
                 importer.data.parse_records()
+                # raise missing?
                 importer.data.close()
 
                 msg = (
@@ -492,14 +491,7 @@ def _set_selection(self) -> None:
                     "Aborting transaction!"
                 )
                 raise SelectionExistsError(msg) from exc
-            query = (
-                select(
-                    Modomics.short_name,
-                )
-                .join_from(Modification, Modomics, Modification.inst_modomics)
-                .where(Modification.id == modification_id)
-            )
-            name = self._session.execute(query).scalar_one()
+            name = self._modification_id_to_name(modification_id)
             self._association[name] = selection_id[-1]
             # this cannot actually happen...
             if len(set(selection_id)) != len(selection_id):
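
Note on the rollback fix: data.close(raise_missing=True) now sits inside the try block, so a MissingDataError reaches checkpoint.rollback() and undoes everything staged since the checkpoint, including the header insert that is only flushed (see header.py below). A minimal sketch of the savepoint pattern this relies on, assuming checkpoint wraps SQLAlchemy's Session.begin_nested():

    from sqlalchemy.orm import Session

    def import_with_savepoint(session: Session, records) -> None:
        checkpoint = session.begin_nested()  # emits a SAVEPOINT
        try:
            session.add_all(records)
            session.flush()  # send SQL, but keep the transaction open
        except Exception:
            checkpoint.rollback()  # ROLLBACK TO SAVEPOINT: undo flushed work
            raise
        else:
            session.commit()  # release the savepoint and make it permanent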
31 changes: 29 additions & 2 deletions server/src/scimodom/services/importer/base.py
@@ -18,6 +18,12 @@ class MissingHeaderError(Exception):
     pass
 
 
+class MissingDataError(Exception):
+    """Exception handling for too many skipped rows."""
+
+    pass
+
+
 class BaseImporter(ABC):
     """Abstract base class for an importer. Reads data from
     file rowwise, buffer records, and perform bulk inserts.
@@ -118,6 +124,8 @@ def __init__(
         self._buffer: BaseImporter._Buffer
         self._dtypes: dict[str, dict[str, Any]] = dict()
         self._lino: int = skiprows
+        self._numrows: int = 0
+        self._validrows: int = 0
         if comment is not None and len(comment) > 1:
             raise ValueError(
                 f"Maximum length of 1 expected, got {len(comment)} for comment."
@@ -150,11 +158,29 @@ def parse_records(self) -> None:
         for line in itertools.islice(self._handle, self._skiprows, None):
             self._lino += 1
             if self._comment is not None and not line.strip().startswith(self._comment):
+                self._numrows += 1
                 self._read_line(line)
 
-    def close(self) -> None:
-        """Close handle, flush buffer, commit."""
+    def close(self, raise_missing: bool = False, threshold: float = 0.01) -> None:
+        """Close handle. Unless no_flush,
+        flush buffer, and commit. Optionally
+        raise a MissingDataError.
+
+        :param raise_missing: Raise error if too
+        many missing records
+        :type raise_missing: bool
+        :param threshold: Threshold for raising error
+        :type threshold: float
+        """
         self._handle.close()
 
+        if raise_missing:
+            skipped = self._numrows - self._validrows
+            small = True if self._numrows < 100 and skipped > 1 else False
+            large = skipped / self._numrows > threshold
+            if small or large:
+                raise MissingDataError
+
         if not self._no_flush:
             self._buffer.flush()
             self._session.commit()
@@ -225,6 +251,7 @@ def _read_line(self, line: str) -> None:
         try:
             validated = self._validate(values)
             records = self.parse_record(validated)
+            self._validrows += 1
             self._buffer.buffer_data(records)
         except ValueError as error:
             msg = f"Skipping: Failed to parse {self._filen} at row {self._lino}: {str(error)}"
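
Note on the raise_missing check in close() above: small and large inputs are treated differently. A standalone sketch of the same rule (helper name is ours, for illustration):

    def too_many_skipped(numrows: int, validrows: int, threshold: float = 0.01) -> bool:
        # Mirrors BaseImporter.close(): a small file (< 100 rows) tolerates
        # at most one skipped record; a larger file tolerates up to the
        # given fraction (1% by default). As in close(), numrows == 0 would
        # raise ZeroDivisionError.
        skipped = numrows - validrows
        small = numrows < 100 and skipped > 1
        large = skipped / numrows > threshold
        return small or large

For example, too_many_skipped(50, 48) is True (two skips in a small file), while too_many_skipped(1000, 995) is False (0.5% is under the 1% threshold).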
2 changes: 1 addition & 1 deletion server/src/scimodom/services/importer/data.py
@@ -111,7 +111,7 @@ def parse_record(self, record: dict[str, str]) -> dict[str, Any]:
             raise ValueError(f"Value {itype}: {crecord[itype]} out of range.")
         if crecord["chrom"] not in self._seqids:
             raise ValueError(
-                f"Unrecognized chrom: {crecord['chrom']}. Ignore this warning"
+                f"Unrecognized chrom: {crecord['chrom']}. Ignore this warning "
                 "for scaffolds and contigs, otherwise this could be due to misformatting!"
             )
         if crecord["strand"] not in self._constraints["strand"]:
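
Note on the one-character fix above: Python concatenates adjacent string literals at compile time with no separator, which is how "warning" and "for" fused into "warningfor" in the original message:

    # Adjacent string literals are joined verbatim; the missing trailing
    # space produced the fused word in the error message.
    msg = (
        "Ignore this warning"
        "for scaffolds and contigs"
    )
    assert msg == "Ignore this warningfor scaffolds and contigs"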
13 changes: 10 additions & 3 deletions server/src/scimodom/services/importer/header.py
@@ -79,11 +79,18 @@ def parse_header(self):
         self._parse_lines()
         self._validate_columns()
 
-    def close(self) -> None:
-        """Close handle, insert, and commit."""
+    def close(self, no_commit: bool = False) -> None:
+        """Close handle, insert, and flush or commit.
+
+        :param no_commit: Flush instead of commit
+        :type no_commit: bool
+        """
         self._handle.close()
         self._session.execute(insert(self._model), self._header)
-        self._session.commit()
+        if no_commit:
+            self._session.flush()
+        else:
+            self._session.commit()
 
     def _cast_types(self) -> None:
         """Cast column types for input."""
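
Note on no_commit above: flush() sends the pending INSERT while the transaction stays open, so the checkpoint rollback in data.py can still undo the header row; commit() would make it permanent immediately. A minimal sketch of the same branch, with hypothetical names:

    from sqlalchemy import insert
    from sqlalchemy.orm import Session

    def close_header(session: Session, model, header, no_commit: bool = False) -> None:
        # Bulk-insert the parsed header records (a list of dicts).
        session.execute(insert(model), header)
        if no_commit:
            session.flush()  # SQL sent; a later savepoint rollback can undo it
        else:
            session.commit()  # transaction closed; the insert is permanent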
28 changes: 26 additions & 2 deletions server/tests/unit/services/test_data_importer.py
@@ -4,7 +4,11 @@
 from sqlalchemy import func, select
 
 from scimodom.database.models import Data
-from scimodom.services.importer.base import BaseImporter, MissingHeaderError
+from scimodom.services.importer.base import (
+    BaseImporter,
+    MissingHeaderError,
+    MissingDataError,
+)
 from scimodom.services.importer.data import EUFDataImporter


@@ -89,7 +93,7 @@ def _get_data_with_header(fmt):
         ("minimum", "Value start: -1 out of range."),
         (
             "chrom",
-            "Unrecognized chrom: A. Ignore this warningfor scaffolds and contigs, otherwise this could be due to misformatting!",
+            "Unrecognized chrom: A. Ignore this warning for scaffolds and contigs, otherwise this could be due to misformatting!",
         ),
         ("strand", "Unrecognized strand: ."),
         ("maximum", "Value score: 200 out of range."),
@@ -276,3 +280,23 @@ def parse_record(record):
     importer = TestBaseImporter()
     assert str(exc.value) == "Maximum length of 1 expected, got 2 for comment."
     assert exc.type == ValueError
+
+
+def test_importer_missing_data(Session, EUF_specs):
+    # pass the raise_missing argument to close
+
+    format, version, specs = EUF_specs
+    handle = _get_data(EUF_specs)
+    importer = EUFDataImporter(
+        session=Session(),
+        filen="filen",
+        handle=handle,
+        association={"m6A": 1},
+        seqids=["1"],
+        specs_ver=version,
+    )
+    # warnings are emitted, this is expected, use caplog to assert them...
+    importer.parse_records()
+    with pytest.raises(MissingDataError) as exc:
+        importer.close(raise_missing=True)
+    assert exc.type == MissingDataError
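
Note on the caplog hint in the new test: the skip warnings could be asserted with pytest's caplog fixture, assuming the importer logs through the standard logging module. A hedged sketch (test name is ours):

    def test_importer_skip_warnings(Session, EUF_specs, caplog):
        format, version, specs = EUF_specs
        handle = _get_data(EUF_specs)
        importer = EUFDataImporter(
            session=Session(),
            filen="filen",
            handle=handle,
            association={"m6A": 1},
            seqids=["1"],
            specs_ver=version,
        )
        importer.parse_records()
        # caplog captures records emitted via the stdlib logging module
        assert "Skipping" in caplog.text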
