Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

computed identifiers using pydantic model serializers #342

Merged
merged 22 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
8d23685
move digest from VO to GA4GH identifiable
ahwagner Feb 7, 2024
c63af59
add ga4gh serialize to models
ahwagner Feb 7, 2024
c2436f8
remove computed field and add get_or_create
ahwagner Feb 7, 2024
89e256c
attribute ordering fixes
ahwagner Feb 10, 2024
6b789a2
add tests for schema to pydantic matching
ahwagner Feb 10, 2024
a173cf4
update is_identifiable
ahwagner Feb 10, 2024
5e38b61
update model validations
ahwagner Feb 10, 2024
9ccef4b
refactor identifier code
ahwagner Feb 10, 2024
3674042
fix IRI behavior when serialized alone
ahwagner Feb 10, 2024
9ab2026
restore haplotype as unordered List with Serializer override
ahwagner Feb 10, 2024
f0402ea
add context control support for in-place edits
ahwagner Feb 10, 2024
f407fd0
Use LiteralSequenceExpression for ambiguous insertions that cannot be…
ehclark Feb 15, 2024
694ce86
Update VCF unit tests to match new digest logic
ehclark Feb 15, 2024
13a45ac
update digests and message structure for trx test
ahwagner Feb 16, 2024
e15837a
Revert "Use LiteralSequenceExpression for ambiguous insertions that c…
ahwagner Feb 19, 2024
1ff0695
remove unnecessary try/except
ahwagner Feb 19, 2024
96d98cd
check insertions for ambiguous novel sequence
ahwagner Feb 19, 2024
9806784
update test cassettes
ahwagner Feb 19, 2024
ae02cc5
Merge branch 'main' into issue-341
korikuzma Feb 19, 2024
b51fb6c
restore use of VOCA seed for RSL
ahwagner Feb 19, 2024
8faaf00
add TODO
ahwagner Feb 19, 2024
7cbae0d
test: fix vcf annotation test
korikuzma Feb 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
544 changes: 544 additions & 0 deletions notebooks/scratch.ipynb

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion src/ga4gh/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
from ._internal.exceptions import GA4GHError
from ._internal.identifiers import (
ga4gh_digest, ga4gh_identify, ga4gh_serialize, is_ga4gh_identifier,
parse_ga4gh_identifier, GA4GHComputeIdentifierWhen, use_ga4gh_compute_identifier_when
parse_ga4gh_identifier, VrsObjectIdentifierIs, use_ga4gh_compute_identifier_when,
CURIE_NAMESPACE, CURIE_SEP, GA4GH_PREFIX_SEP, GA4GH_IR_REGEXP, GA4GH_DIGEST_REGEXP
)
from ._internal.pydantic import (
is_pydantic_instance, is_curie_type, is_ga4gh_identifiable, is_literal, pydantic_copy
Expand Down
171 changes: 43 additions & 128 deletions src/ga4gh/core/_internal/identifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,34 +22,26 @@
from enum import IntEnum
from typing import Union, Optional
from pydantic import BaseModel, RootModel
from canonicaljson import encode_canonical_json

from .digests import sha512t24u
from .pydantic import (
is_pydantic_instance,
is_curie_type,
is_ga4gh_identifiable,
getattr_in,
get_pydantic_root,
is_pydantic_custom_type
)

from .pydantic import get_pydantic_root

__all__ = "ga4gh_digest ga4gh_identify ga4gh_serialize is_ga4gh_identifier parse_ga4gh_identifier".split()

_logger = logging.getLogger(__name__)

namespace = "ga4gh"
curie_sep = ":"
ref_sep = "."
CURIE_NAMESPACE = "ga4gh"
CURIE_SEP = ":"
GA4GH_PREFIX_SEP = "."

ga4gh_ir_regexp = re.compile(r"^ga4gh:(?P<type>[^.]+)\.(?P<digest>.+)$")
GA4GH_IR_REGEXP = re.compile(r"^ga4gh:(?P<type>[^.]+)\.(?P<digest>[0-9A-Za-z_\-]{32})$")
GA4GH_DIGEST_REGEXP = re.compile(r"^[0-9A-Za-z_\-]{32}$")

ns_w_sep = namespace + curie_sep
ns_w_sep = CURIE_NAMESPACE + CURIE_SEP


class GA4GHComputeIdentifierWhen(IntEnum):
class VrsObjectIdentifierIs(IntEnum):
"""
Defines the rule for when the `ga4gh_identify` method should compute
Defines the state for when the `ga4gh_identify` method should compute
an identifier ('id' attribute) for the specified object. The options are:
ANY (formerly ALWAYS) - Always compute the identifier (this is the default behavior)
GA4GH_INVALID (formerly INVALID) - Compute the identifier if it is missing or is present but syntactically invalid
Expand All @@ -61,8 +53,8 @@ class GA4GHComputeIdentifierWhen(IntEnum):
using `MISSING` can improve performance.
"""

ALWAYS = 0
INVALID = 1
ANY = 0
GA4GH_INVALID = 1
MISSING = 2


Expand All @@ -74,16 +66,16 @@ class use_ga4gh_compute_identifier_when(ContextDecorator):
Context manager that defines when to compute identifiers
for all operations within the context. For example:

with use_ga4gh_compute_identifier_when(GA4GHComputeIdentifierWhen.INVALID):
with use_ga4gh_compute_identifier_when(VrsObjectIdentifierIs.GA4GH_INVALID):
VCFAnnotator(...).annotate(...)

Or:

@use_ga4gh_compute_identifier_when(GA4GHComputeIdentifierWhen.INVALID)
@use_ga4gh_compute_identifier_when(VrsObjectIdentifierIs.GA4GH_INVALID)
def my_method():
"""

def __init__(self, when: GA4GHComputeIdentifierWhen):
def __init__(self, when: VrsObjectIdentifierIs):
self.when = when
self.token = None

Expand Down Expand Up @@ -125,16 +117,26 @@ def parse_ga4gh_identifier(ir):
"""

try:
return ga4gh_ir_regexp.match(str(ir)).groupdict()
return GA4GH_IR_REGEXP.match(str(ir)).groupdict()
except AttributeError as e:
raise ValueError(ir) from e


def ga4gh_identify(vro):
def ga4gh_identify(vro, in_place='default'):
"""
Return the GA4GH digest-based id for the object, as a CURIE
(string). Returns None if object is not identifiable.

This function has three options for in_place editing of vro.id:
- 'default': the standard identifier update behavior for GA4GH
identifiable objects, this mode will update the vro.id
field if the field is empty
- 'always': this will update the vro.id field any time the
identifier is computed (compute behavior is controlled by the
use_ga4gh_compute_identifier_when context)
- 'never': the vro.id field will not be edited in-place,
even when empty

TODO update example for VRS 2.0
>>> import ga4gh.vrs
>>> ival = ga4gh.vrs.models.SimpleInterval(start=44908821, end=44908822)
Expand All @@ -143,36 +145,27 @@ def ga4gh_identify(vro):
'ga4gh:VSL.u5fspwVbQ79QkX6GHLF8tXPCAXFJqRPx'

"""
if is_ga4gh_identifiable(vro):
when_rule = ga4gh_compute_identifier_when.get(GA4GHComputeIdentifierWhen.ALWAYS)
do_compute = False
ir = None
if when_rule == GA4GHComputeIdentifierWhen.ALWAYS:
if vro.is_ga4gh_identifiable():
when_rule = ga4gh_compute_identifier_when.get(VrsObjectIdentifierIs.ANY)
obj_id = None
if when_rule == VrsObjectIdentifierIs.ANY:
do_compute = True
else:
ir = getattr(vro, "id", None)
if when_rule == GA4GHComputeIdentifierWhen.MISSING:
do_compute = ir is None or ir == ""
else: # INVALID
do_compute = ir is None or ir == "" or ga4gh_ir_regexp.match(ir) is None
obj_id = getattr(vro, "id", None)
if when_rule == VrsObjectIdentifierIs.MISSING:
do_compute = obj_id is None or obj_id == ""
else: # VrsObjectIdentifierIs.GA4GH_INVALID
do_compute = not vro.has_valid_ga4gh_id()

if do_compute:
digest = ga4gh_digest(vro)
pfx = vro.ga4gh.prefix
ir = f"{namespace}{curie_sep}{pfx}{ref_sep}{digest}"
obj_id = vro.get_or_create_ga4gh_identifier(in_place)

return ir
return obj_id

return None


def _is_sequence_reference(input_obj) -> bool:
    """Determine if `input_obj` is a Sequence Reference.

    An object qualifies when the lookup of ``ga4gh.assigned.default`` on it
    (via ``getattr_in``) is truthy AND its ``type`` attribute equals
    ``"SequenceReference"``.

    :param input_obj: object to inspect
    :return: `True` if both conditions hold, `False` otherwise
    """
    # NOTE(review): getattr_in presumably walks the nested attribute path and
    # yields a falsy value when a step is missing -- confirm against its definition.
    assigned_default = getattr_in(input_obj, ["ga4gh", "assigned", "default"])

    # `and` returns one of its operands, so without bool() a falsy lookup
    # result (e.g. None) would leak out despite the `-> bool` annotation.
    # Short-circuiting is kept: `.type` is only read when the lookup is truthy.
    return bool(assigned_default and input_obj.type == "SequenceReference")


def ga4gh_digest(vro: BaseModel, do_compact=True):
def ga4gh_digest(vro: BaseModel, overwrite=False):
"""
Return the GA4GH digest for the object.

Expand All @@ -187,12 +180,10 @@ def ga4gh_digest(vro: BaseModel, do_compact=True):
'u5fspwVbQ79QkX6GHLF8tXPCAXFJqRPx'

"""
if _is_sequence_reference(vro):
digest = vro.refgetAccession.split("SQ.")[-1]
if vro.is_ga4gh_identifiable(): # Only GA4GH identifiable objects are GA4GH digestible
return vro.get_or_create_digest(overwrite)
else:
s = ga4gh_serialize(vro)
digest = sha512t24u(s)
return digest
return None


def replace_with_digest(val: dict) -> Union[str, dict]:
Expand Down Expand Up @@ -224,18 +215,7 @@ def ga4gh_serialize(obj: BaseModel) -> Optional[bytes]:
TODO find a way to output identify_all without the 'digest' fields on subobjects,
without traversing the whole tree again in collapse_identifiable_values.
"""
if _is_sequence_reference(obj):
return None

identified = identify_all(obj)
if isinstance(identified, dict):
# Replace identifiable subobjects with their digests
collapsed = collapse_identifiable_values(identified)
if "digest" in collapsed:
del collapsed["digest"]
return encode_canonical_json(collapsed)
else:
return identified.encode("utf-8")
return obj.model_dump_json().encode("utf-8")


def export_pydantic_model(obj, exclude_none=True):
Expand All @@ -252,71 +232,6 @@ def export_pydantic_model(obj, exclude_none=True):
return obj


"""
TODO: discussed making all objects digestible. If no digest keys defined,
include all fields. We first need to define keys for all model objects.
"""


def identify_all(
    input_obj: Union[BaseModel, dict, str]
) -> Optional[Union[str, dict, list]]:
    """
    Adds digests to an identifiable Pydantic object and any identifiable Pydantic
    objects in its fields, at any depth. Assumes IRIs are dereferenced.

    :param input_obj: the value to process; may be `None`, a string, a Pydantic
        custom type, a Pydantic model instance, or any other value accepted by
        `export_pydantic_model`
    :return: a single value, depending on the input:
        - `None` when `input_obj` is `None`
        - for a string starting with ``ga4gh:`` (but not ``ga4gh:SQ``), the text
          after its last ``.``
        - for a Pydantic custom type, its exported value, reduced to the digest
          part when that value is a GA4GH identifier string
        - for a Pydantic instance, its export if that already carries a non-None
          ``digest``; the text after ``SQ.`` in ``refgetAccession`` for a sequence
          reference; otherwise a dict of recursively processed fields, with a
          ``digest`` entry added when the object is GA4GH identifiable
        - for anything else, a list of recursively processed elements when the
          export is a list or set, or `input_obj` unchanged

    TODO It would be nice to have a pydantic-agnostic version of this that just takes
    a dict for input_object, and another dict that has the identifiable+keys metadata.
    Something like scrape_model_metadata can be used to generate that metadata.
    """
    if input_obj is None:
        return None

    # A bare GA4GH identifier string (other than a sequence id) is reduced
    # to whatever follows its last "."
    if (
        isinstance(input_obj, str)
        and input_obj.startswith("ga4gh:")
        and not input_obj.startswith("ga4gh:SQ")
    ):
        return input_obj.split(".")[-1]

    # Fallback: hand the input back untouched when no branch below rewrites it
    output_obj = input_obj

    if is_pydantic_custom_type(input_obj):
        val = export_pydantic_model(input_obj)
        if isinstance(val, str) and is_ga4gh_identifier(val):
            val = parse_ga4gh_identifier(val)["digest"]
        output_obj = val
    elif is_pydantic_instance(input_obj):
        exported_obj = export_pydantic_model(input_obj)
        if "digest" in exported_obj and exported_obj["digest"] is not None:
            # Already digested: use the export as-is
            output_obj = exported_obj
        elif _is_sequence_reference(input_obj):
            output_obj = exported_obj["refgetAccession"].split("SQ.")[-1]
        else:
            # Take static key set from the object, or use all fields
            include_keys = getattr_in(input_obj, ["ga4gh", "keys"])
            # TODO Add keys to each Model class
            if not include_keys:
                include_keys = exported_obj.keys()
            # Filter into a fresh list rather than calling .remove(): that
            # mutated the object returned by getattr_in in place (presumably
            # key metadata shared beyond this call -- confirm) and would
            # raise AttributeError on a dict keys view, which has no .remove().
            include_keys = [k for k in include_keys if k != "digest"]
            # Serialize each field value
            output_obj = {
                k: identify_all(getattr(input_obj, k))
                for k in include_keys
                if hasattr(input_obj, k)  # check if None?
            }
            # Assumes any obj with 'digest' should be collapsed.
            collapsed_output_obj = collapse_identifiable_values(output_obj)
            # Add a digest to the output if it is identifiable
            if is_ga4gh_identifiable(input_obj):
                # Compute digest for updated object, not re-running compaction
                output_obj["digest"] = ga4gh_digest(collapsed_output_obj, do_compact=False)
    else:
        exported_obj = export_pydantic_model(input_obj)
        # isinstance instead of an exact type() comparison, so list/set
        # subclasses are traversed as well
        if isinstance(exported_obj, (list, set)):
            output_obj = [identify_all(elem) for elem in exported_obj]
    return output_obj


# def scrape_model_metadata(obj, meta={}) -> dict:
# """
# For a Pydantic object obj, pull out .ga4gh.identifiable
Expand Down
14 changes: 13 additions & 1 deletion src/ga4gh/core/_internal/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
from typing import Any, Dict, List, Literal, Optional, Union
from enum import Enum

from pydantic import BaseModel, ConfigDict, Field, RootModel, constr
from pydantic import BaseModel, ConfigDict, Field, RootModel, constr, model_serializer
from ga4gh.core import GA4GH_IR_REGEXP


class Relation(Enum):
Expand Down Expand Up @@ -43,6 +44,17 @@ class Code(RootModel):


class IRI(RootModel):

def __hash__(self):
return self.root.__hash__()

@model_serializer(when_used='json')
def ga4gh_serialize(self):
m = GA4GH_IR_REGEXP.match(self.root)
if m is not None:
return m['digest']
return self.root

root: str = Field(
...,
json_schema_extra={'description': 'An IRI Reference (either an IRI or a relative-reference), according to `RFC3986 section 4.1 <https://datatracker.ietf.org/doc/html/rfc3986#section-4.1>` and `RFC3987 section 2.1 <https://datatracker.ietf.org/doc/html/rfc3987#section-2.1>`. MAY be a JSON Pointer as an IRI fragment, as described by `RFC6901 section 6 <https://datatracker.ietf.org/doc/html/rfc6901#section-6>`.',
Expand Down
7 changes: 3 additions & 4 deletions src/ga4gh/core/_internal/pydantic.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@ def getattr_in(obj, names) -> Any:

def is_ga4gh_identifiable(o: Any) -> bool:
"""
Determine if object is GA4GH identifiable. An object is considered
GA4GH identifiable if it contains a `ga4gh_prefix` attribute
Determine if object is a GA4GH identifiable type.

:param o: Object
:return: `True` if `o` has `ga4gh_prefix` attribute. `False` otherwise.
:return: `True` if `o` is a GA4GH Identifiable Object. `False` otherwise.
"""
return bool(getattr_in(o, ['ga4gh', 'prefix']))
return o.is_ga4gh_identifiable()


def is_literal(o: Any) -> bool:
Expand Down
Loading
Loading