Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add in all Monarch KG edge properties on ingest. #260

Merged
merged 2 commits into from
Nov 21, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions parsers/monarchkg/src/loadMonarchKG.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
import os
import tarfile
import orjson
import requests

from Common.loader_interface import SourceDataLoader
from Common.kgxmodel import kgxedge
from Common.biolink_constants import *
from Common.utils import GetData
from Common.utils import GetData, GetDataPullError


##############
Expand All @@ -29,7 +30,7 @@ def __init__(self, test_mode: bool = False, source_data_dir: str = None):

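# the data is pulled from the /latest/ release directory and get_latest_source_version reads the
# version from the metadata.yaml in that same directory; NOTE(review): a release published between
# the two requests could still create a version mismatch - confirm this is acceptable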
self.data_url = 'https://data.monarchinitiative.org/monarch-kg-dev/2024-03-18/'
self.data_url = 'https://data.monarchinitiative.org/monarch-kg-dev/latest/'
self.monarch_graph_archive = 'monarch-kg.jsonl.tar.gz'
self.monarch_edge_file_archive_path = 'monarch-kg_edges.jsonl'
self.data_files = [self.monarch_graph_archive]
Expand Down Expand Up @@ -63,9 +64,17 @@ def __init__(self, test_mode: bool = False, source_data_dir: str = None):
}

def get_latest_source_version(self) -> str:
    """
    Gets the name of the latest Monarch KG version from the release metadata.

    Requests metadata.yaml from the /latest/ release directory and returns the text
    that follows the 'kg-version:' key. The file is scanned line by line rather than
    parsed as YAML; if the key appears on more than one line the last one wins.

    :return: the version string found after 'kg-version:'
    :raises GetDataPullError: if the metadata can't be retrieved, the server returns an
        HTTP error status, or no non-empty 'kg-version' entry is present
    """
    metadata_url = "https://data.monarchinitiative.org/monarch-kg-dev/latest/metadata.yaml"
    version_key = "kg-version:"
    latest_version = None
    try:
        # requests has no default timeout; without one a stalled connection blocks forever
        metadata_yaml: requests.Response = requests.get(metadata_url, timeout=60)
        # fail on 4xx/5xx responses instead of scanning an error page for the key
        metadata_yaml.raise_for_status()
        for line in metadata_yaml.text.splitlines():
            stripped_line = line.strip()
            # match the key at the start of the line so keys that merely end in
            # 'kg-version:' are not picked up by accident
            if stripped_line.startswith(version_key):
                latest_version = stripped_line[len(version_key):].strip()
        # an empty value is as unusable as a missing key
        if not latest_version:
            raise ValueError("Cannot find 'kg-version' in Monarch KG metadata yaml.")
    except Exception as e:
        # every failure is reported as GetDataPullError, the exception this method has
        # always raised; the original exception is chained to keep the traceback
        raise GetDataPullError(error_message=f'Unable to determine latest version for Monarch KG: {e}') from e
    return latest_version

def get_data(self) -> bool:
Expand All @@ -85,6 +94,10 @@ def parse_data(self) -> dict:
skipped_ignore_knowledge_source = 0
skipped_undesired_predicate = 0
full_tar_path = os.path.join(self.data_path, self.monarch_graph_archive)
protected_edge_labels = [SUBJECT_ID, OBJECT_ID, PREDICATE,PRIMARY_KNOWLEDGE_SOURCE,
AGGREGATOR_KNOWLEDGE_SOURCES, KNOWLEDGE_LEVEL, AGENT_TYPE,
PUBLICATIONS, "biolink:primary_knowledge_source", "biolink:aggregator_knowledge_source"]

with tarfile.open(full_tar_path, 'r') as tar_files:
with tar_files.extractfile(self.monarch_edge_file_archive_path) as edges_file:
for line in edges_file:
Expand Down Expand Up @@ -116,13 +129,14 @@ def parse_data(self) -> dict:
KNOWLEDGE_LEVEL: monarch_edge[KNOWLEDGE_LEVEL] if KNOWLEDGE_LEVEL in monarch_edge else NOT_PROVIDED,
AGENT_TYPE: monarch_edge[AGENT_TYPE] if AGENT_TYPE in monarch_edge else NOT_PROVIDED
}

if monarch_edge[PUBLICATIONS]:
edge_properties[PUBLICATIONS] = monarch_edge[PUBLICATIONS]

for edge_attribute in monarch_edge:
if '_qualifier' in edge_attribute and monarch_edge[edge_attribute]:
if edge_attribute not in protected_edge_labels and monarch_edge[edge_attribute]:
edge_properties[edge_attribute] = monarch_edge[edge_attribute]
elif edge_attribute == QUALIFIED_PREDICATE and monarch_edge[QUALIFIED_PREDICATE]:
edge_properties[QUALIFIED_PREDICATE] = monarch_edge[QUALIFIED_PREDICATE]

output_edge = kgxedge(
subject_id=subject_id,
predicate=predicate,
Expand Down
Loading