Skip to content

Commit

Permalink
fix: allele normalization + pin pydantic version
Browse files Browse the repository at this point in the history
  • Loading branch information
korikuzma committed Aug 24, 2023
1 parent 875bbf5 commit c8f5419
Show file tree
Hide file tree
Showing 4 changed files with 265 additions and 628 deletions.
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ install_requires =
jsonschema>=4.17.3
numpy
pyyaml
pydantic>=2.0.3
pydantic == 2.1.1
setup_requires =
cython
pytest-runner
Expand Down
152 changes: 131 additions & 21 deletions src/ga4gh/vrs/normalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,61 +3,171 @@
See https://vrs.ga4gh.org/en/stable/impl-guide/normalization.html
"""

import logging
from enum import IntEnum
from typing import NamedTuple, Optional, Union

from bioutils.normalize import normalize as _normalize, NormalizationMode
from ga4gh.core import is_pydantic_instance, ga4gh_digest, pydantic_copy

from ._internal import models
from .dataproxy import SequenceProxy

_logger = logging.getLogger(__name__)

class PosType(IntEnum):
"""Define the kind of position on a location"""

def _normalize_allele(input_allele, data_proxy):
INTEGER = 0
RANGE_LT_OR_EQUAL = 1
RANGE_GT_OR_EQUAL = 2


class LocationPos(NamedTuple):
"""Define Allele location pos value and type"""

value: int
pos_type: PosType


def _get_allele_location_pos(
allele_vo: models.Allele, use_start: bool = True
) -> Optional[LocationPos]:
"""Get Allele location start or end value for interval
:param allele_vo: VRS Allele object
:param use_start: `True` if using `allele_vo.location.start`. `False` if using
`allele_vo.location.end`
:return: A `LocationPos` if using integer or indefinite range. Otherwise return
`None`
"""
Converts .location.sequence into an IRI if it is a SequenceReference because it makes the code simpler.
If it started as a sequence reference, put it back as one at the end.
if use_start:
pos = allele_vo.location.start
else:
pos = allele_vo.location.end

if isinstance(pos, int):
val = pos
pos_type = PosType.INTEGER
else:
pos0_is_none = pos.root[0] is None
pos1_is_none = pos.root[1] is None

if not pos0_is_none and not pos1_is_none: # definite range
return None

val = pos.root[0] or pos.root[1]
if pos0_is_none:
pos_type = PosType.RANGE_LT_OR_EQUAL
else:
pos_type = PosType.RANGE_GT_OR_EQUAL

return LocationPos(value=val, pos_type=pos_type)


def _get_new_allele_location_pos(
new_pos_val: int, pos_type: PosType
) -> Union[int, models.Range]:
"""Get updated location pos on normalized allele
:param new_pos_val: New position after normalization
:param pos_type: Original position type used in pre-normalized VRS Allele object
:return: Updated position as integer or VRS Range object
"""
if pos_type == PosType.INTEGER:
val = new_pos_val
elif pos_type == PosType.RANGE_LT_OR_EQUAL:
val = models.Range([None, new_pos_val])
else:
val = models.Range([new_pos_val, None])
return val


def _normalize_allele(input_allele, data_proxy):
"""Normalize Allele using "fully-justified" normalization adapted from NCBI's
VOCA. Fully-justified normalization expands such ambiguous representation over the
entire region of ambiguity, resulting in an unambiguous representation that may be
readily compared with other alleles.
Does not attempt to normalize Allele's with definite ranges. Will return the
`input_allele`
"""
allele = pydantic_copy(input_allele)

# Temporarily convert SequenceReference to IRI because it makes the code simpler.
# This will be changed back to SequenceReference at the end of the method
sequence_reference = None
if isinstance(allele.location.sequence, models.SequenceReference):
sequence_reference = allele.location.sequence
allele.location.sequence = models.IRI(sequence_reference.refgetAccession)

sequence = SequenceProxy(data_proxy, allele.location.sequence.root)
# Get reference sequence and interval
ref_seq = SequenceProxy(data_proxy, allele.location.sequence.root)
start = _get_allele_location_pos(allele, use_start=True)
if start is None:
return input_allele

ival = (allele.location.start, allele.location.end)
end = _get_allele_location_pos(allele, use_start=False)
if end is None:
return input_allele

_allele_state = allele.state.type
_states_with_sequence = ["ReferenceLengthExpression", "LiteralSequenceExpression"]
if _allele_state in _states_with_sequence:
ival = (start.value, end.value)

# Get alleles (the sequences to be normalized) for _normalize
if allele.state.sequence:
alleles = (None, allele.state.sequence.root)
elif _allele_state == "RepeatedSequenceExpression" and \
allele.state.seq_expr.type in _states_with_sequence:
alleles = (None, allele.state.seq_expr.sequence.root)
else:
alleles = (None, "")

new_allele = pydantic_copy(allele)
# If one of Reference Allele Sequence or Alternate Allele Sequence is empty,
# store the length of the non-empty sequence: this is the Repeat Subunit Length
len_ref_seq = len(ref_seq[ival[0]: ival[1]])
len_alt_seq = len(alleles[1])
if not len_ref_seq and len_alt_seq:
# Insertion
repeat_subunit_len = len_alt_seq
elif len_ref_seq and not len_alt_seq:
# Deletion
repeat_subunit_len = len_ref_seq
else:
repeat_subunit_len = 0

new_allele = pydantic_copy(allele)
try:
new_ival, new_alleles = _normalize(sequence,
new_ival, new_alleles = _normalize(ref_seq,
ival,
alleles=alleles,
mode=NormalizationMode.EXPAND,
anchor_length=0)

new_allele.location.start = new_ival[0]
new_allele.location.end = new_ival[1]

if new_allele.state.type in _states_with_sequence:
new_allele.state.sequence = models.SequenceString(new_alleles[1])
new_allele.location.start = _get_new_allele_location_pos(
new_ival[0], start.pos_type
)
new_allele.location.end = _get_new_allele_location_pos(
new_ival[1], end.pos_type
)
new_ref_seq = ref_seq[new_ival[0]: new_ival[1]]

if not new_ref_seq:
# If the reference sequence is empty this is an unambiguous insertion.
# Return a new Allele with the trimmed alternate sequence as a Literal
# Sequence Expression
new_allele.state = models.LiteralSequenceExpression(
sequence=models.SequenceString(new_alleles[1])
)
else:
# Otherwise, return a new Allele using a reference length expression, using
# a Location specified by the coordinates of the new ival, a length
# specified by the length of the alternate allele, and a repeat subunit
# length
allele.state = models.ReferenceLengthExpression(
length=len(new_alleles[1]),
sequence=models.SequenceString(new_ref_seq),
repeat_subunit_length=repeat_subunit_len
)
except ValueError:
# Occurs for ref agree Alleles (when alt = ref)
pass

# Convert IRI back to SequenceReference
if sequence_reference:
new_allele.location.sequence = sequence_reference

Expand Down
Loading

0 comments on commit c8f5419

Please sign in to comment.