diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index a78d4833..eb959fe5 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -13,27 +13,27 @@ jobs: runs-on: ubuntu-latest steps: - name: Check out the repo - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Get the version id: get_version run: echo ::set-output name=VERSION::${GITHUB_REF/refs\/tags\//} - - name: Extract metadata (tags, labels) for Docker - id: meta - uses: docker/metadata-action@98669ae865ea3cffbcbaa878cf57c20bbf1c6c38 - with: - images: - ghcr.io/${{ github.repository }} - name: Login to ghcr - uses: docker/login-action@v1 + uses: docker/login-action@f4ef78c080cd8ba55a85445d5b36e214a81df20a with: registry: ${{ env.REGISTRY }} username: ${{ github.actor }} password: ${{ secrets.GITHUB_TOKEN }} + - name: Extract metadata (tags, labels) for Docker + id: meta + uses: docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7 + with: + images: + ghcr.io/${{ github.repository }} - name: Push to GitHub Packages - uses: docker/build-push-action@ad44023a93711e3deb337508980b4b5e9bcdc5dc + uses: docker/build-push-action@3b5e8027fcad23fda98b2e3ac259d8d67585f671 with: context: . push: true tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} - build-args: VERSION=${{ steps.get_version.outputs.VERSION }} \ No newline at end of file + build-args: VERSION=${{ steps.get_version.outputs.VERSION }} diff --git a/Common/biolink_constants.py b/Common/biolink_constants.py index 80b12438..15bd0a32 100644 --- a/Common/biolink_constants.py +++ b/Common/biolink_constants.py @@ -48,10 +48,13 @@ PREDICATE = 'predicate' PRIMARY_KNOWLEDGE_SOURCE = 'primary_knowledge_source' AGGREGATOR_KNOWLEDGE_SOURCES = 'aggregator_knowledge_source' +SUPPORTING_DATA_SOURCE = 'supporting_data_source' P_VALUE = 'p_value' ADJUSTED_P_VALUE = 'adjusted_p_value' AGENT_TYPE = 'agent_type' KNOWLEDGE_LEVEL = 'knowledge_level' +MAX_RESEARCH_PHASE = 'max_research_phase' +HAS_SUPPORTING_STUDY_RESULT = 'has_supporting_study_result' # enums for knowledge level KNOWLEDGE_ASSERTION = 'knowledge_assertion' @@ -137,6 +140,7 @@ PREDICATE, PRIMARY_KNOWLEDGE_SOURCE, AGGREGATOR_KNOWLEDGE_SOURCES, + SUPPORTING_DATA_SOURCE, PUBLICATIONS, SYNONYMS, DESCRIPTION, @@ -147,6 +151,8 @@ FDA_APPROVAL_STATUS, KNOWLEDGE_LEVEL, MECHANISM_OF_ACTION, + MAX_RESEARCH_PHASE, + HAS_SUPPORTING_STUDY_RESULT, # qualifiers ANATOMICAL_CONTEXT_QUALIFIER, CAUSAL_MECHANISM_QUALIFIER, diff --git a/Common/build_manager.py b/Common/build_manager.py index b44e1a08..0f378927 100644 --- a/Common/build_manager.py +++ b/Common/build_manager.py @@ -12,8 +12,8 @@ from Common.load_manager import SourceDataManager from Common.kgx_file_merger import KGXFileMerger from Common.neo4j_tools import create_neo4j_dump -from Common.kgxmodel import GraphSpec, SubGraphSource, DataSource, NormalizationScheme -from Common.normalization import NORMALIZATION_CODE_VERSION +from Common.kgxmodel import GraphSpec, SubGraphSource, DataSource +from Common.normalization import NORMALIZATION_CODE_VERSION, NormalizationScheme from Common.metadata import Metadata, GraphMetadata, SourceMetadata from Common.supplementation import SequenceVariantSupplementation from Common.biolink_constants import PRIMARY_KNOWLEDGE_SOURCE, AGGREGATOR_KNOWLEDGE_SOURCES, PREDICATE, PUBLICATIONS @@ -139,8 +139,7 @@ def build_dependencies(self, graph_spec: GraphSpec): subgraph_version = subgraph_source.version if self.check_for_existing_graph_dir(subgraph_id, subgraph_version): # 
load previous metadata - graph_metadata = self.get_graph_metadata(subgraph_id, subgraph_version) - subgraph_source.graph_metadata = graph_metadata.metadata + subgraph_source.graph_metadata = self.get_graph_metadata(subgraph_id, subgraph_version) elif self.current_graph_versions[subgraph_id] == subgraph_version: self.logger.warning(f'For graph {graph_spec.graph_id} subgraph dependency ' f'{subgraph_id} version {subgraph_version} is not ready. Building now...') diff --git a/Common/data_sources.py b/Common/data_sources.py index b105a82a..082cf923 100644 --- a/Common/data_sources.py +++ b/Common/data_sources.py @@ -4,6 +4,7 @@ BINDING_DB = 'BINDING-DB' CAM_KP = 'CAM-KP' CHEBI_PROPERTIES = 'CHEBIProps' +CLINICAL_TRIALS_KP = 'ClinicalTrialsKP' CORD19 = 'Cord19' CTD = 'CTD' DRUG_CENTRAL = 'DrugCentral' @@ -51,6 +52,7 @@ BINDING_DB: ("parsers.BINDING.src.loadBINDINGDB", "BINDINGDBLoader"), CAM_KP: ("parsers.camkp.src.loadCAMKP", "CAMKPLoader"), CHEBI_PROPERTIES: ("parsers.chebi.src.loadChebiProperties", "ChebiPropertiesLoader"), + CLINICAL_TRIALS_KP: ("parsers.clinicaltrials.src.loadCTKP", "CTKPLoader"), CORD19: ("parsers.cord19.src.loadCord19", "Cord19Loader"), CTD: ("parsers.CTD.src.loadCTD", "CTDLoader"), DRUG_CENTRAL: ("parsers.drugcentral.src.loaddrugcentral", "DrugCentralLoader"), diff --git a/Common/kgx_file_converter.py b/Common/kgx_file_converter.py index 304c3b1b..e2f8d4bf 100644 --- a/Common/kgx_file_converter.py +++ b/Common/kgx_file_converter.py @@ -94,7 +94,7 @@ def __determine_properties_and_types(file_path: str, required_properties: dict): for key, value in entity.items(): if value is None: property_type_counts[key]["None"] += 1 - if key in required_properties: + if key in required_properties and key != "name": print(f'WARNING: Required property ({key}) was None: {entity.items()}') raise Exception( f'None found as a value for a required property (property: {key}) in line {entity.items()}') @@ -134,7 +134,7 @@ def __determine_properties_and_types(file_path: str, required_properties: dict): # if 'None' in prop_types: # print(f'WARNING: None found as a value for property {prop}') - if prop in required_properties and (num_prop_types > 1): + if prop in required_properties and (num_prop_types > 1) and prop != "name": # TODO this should just enforce that required properties are the correct type, # instead of trying to establish the type raise Exception(f'Required property {prop} had multiple conflicting types: {type_counts.items()}') @@ -192,7 +192,10 @@ def __convert_to_csv(input_file: str, for item in quick_jsonl_file_iterator(input_file): for key in list(item.keys()): if item[key] is None: - del item[key] + if key == "name": + item["name"] = item["id"] + else: + del item[key] else: prop_type = properties[key] # convert lists into strings with an array delimiter diff --git a/Common/kgx_file_merger.py b/Common/kgx_file_merger.py index 9ceb0105..b6d54159 100644 --- a/Common/kgx_file_merger.py +++ b/Common/kgx_file_merger.py @@ -85,8 +85,10 @@ def merge_primary_sources(self, needs_on_disk_merge = False for graph_source in graph_sources: if isinstance(graph_source, SubGraphSource): - needs_on_disk_merge = True - break + for source_id in graph_source.graph_metadata.get_source_ids(): + if source_id in RESOURCE_HOGS: + needs_on_disk_merge = True + break elif graph_source.id in RESOURCE_HOGS: needs_on_disk_merge = True break diff --git a/Common/kgx_file_normalizer.py b/Common/kgx_file_normalizer.py index b0ad5457..cdd97c9e 100644 --- a/Common/kgx_file_normalizer.py +++ 
b/Common/kgx_file_normalizer.py @@ -5,25 +5,13 @@ from Common.biolink_utils import BiolinkInformationResources, INFORES_STATUS_INVALID, INFORES_STATUS_DEPRECATED from Common.biolink_constants import SEQUENCE_VARIANT, PRIMARY_KNOWLEDGE_SOURCE, AGGREGATOR_KNOWLEDGE_SOURCES, \ PUBLICATIONS, OBJECT_ID, SUBJECT_ID, PREDICATE, SUBCLASS_OF -from Common.normalization import NodeNormalizer, EdgeNormalizer, EdgeNormalizationResult +from Common.normalization import NormalizationScheme, NodeNormalizer, EdgeNormalizer, EdgeNormalizationResult, \ + NormalizationFailedError from Common.utils import LoggingUtil, chunk_iterator from Common.kgx_file_writer import KGXFileWriter -from Common.kgxmodel import NormalizationScheme from Common.merging import MemoryGraphMerger, DiskGraphMerger -class NormalizationBrokenError(Exception): - def __init__(self, error_message: str, actual_error: Exception=None): - self.error_message = error_message - self.actual_error = actual_error - - -class NormalizationFailedError(Exception): - def __init__(self, error_message: str, actual_error: Exception=None): - self.error_message = error_message - self.actual_error = actual_error - - EDGE_PROPERTIES_THAT_SHOULD_BE_SETS = {AGGREGATOR_KNOWLEDGE_SOURCES, PUBLICATIONS} NODE_NORMALIZATION_BATCH_SIZE = 1_000_000 EDGE_NORMALIZATION_BATCH_SIZE = 1_000_000 @@ -350,6 +338,7 @@ def normalize_edge_file(self): # this could happen due to rare cases of normalization splits where one node normalizes to many if edge_count > 1: edge_splits += edge_count - 1 + graph_merger.merge_edges(normalized_edges) self.logger.info(f'Processed {number_of_source_edges} edges so far...') diff --git a/Common/kgxmodel.py b/Common/kgxmodel.py index 65cb9088..d18c2f82 100644 --- a/Common/kgxmodel.py +++ b/Common/kgxmodel.py @@ -1,6 +1,7 @@ from dataclasses import dataclass from Common.biolink_constants import NAMED_THING -from Common.normalization import NORMALIZATION_CODE_VERSION +from Common.metadata import GraphMetadata +from Common.normalization import NormalizationScheme class kgxnode: def __init__(self, @@ -33,31 +34,6 @@ def __init__(self, self.properties = {} -@dataclass -class NormalizationScheme: - node_normalization_version: str = 'latest' - edge_normalization_version: str = 'latest' - normalization_code_version: str = NORMALIZATION_CODE_VERSION - strict: bool = True - conflation: bool = False - - def get_composite_normalization_version(self): - composite_normalization_version = f'{self.node_normalization_version}_' \ - f'{self.edge_normalization_version}_{self.normalization_code_version}' - if self.conflation: - composite_normalization_version += '_conflated' - if self.strict: - composite_normalization_version += '_strict' - return composite_normalization_version - - def get_metadata_representation(self): - return {'node_normalization_version': self.node_normalization_version, - 'edge_normalization_version': self.edge_normalization_version, - 'normalization_code_version': self.normalization_code_version, - 'conflation': self.conflation, - 'strict': self.strict} - - @dataclass class GraphSpec: graph_id: str @@ -91,13 +67,13 @@ class GraphSource: @dataclass class SubGraphSource(GraphSource): - graph_metadata: dict = None + graph_metadata: GraphMetadata = None def get_metadata_representation(self): return {'graph_id': self.id, 'release_version': self.version, 'merge_strategy:': self.merge_strategy, - 'graph_metadata': self.graph_metadata} + 'graph_metadata': self.graph_metadata.metadata if self.graph_metadata else None} @dataclass diff --git 
a/Common/load_manager.py b/Common/load_manager.py index 323dfe53..9ba5aa44 100644 --- a/Common/load_manager.py +++ b/Common/load_manager.py @@ -5,9 +5,8 @@ from Common.data_sources import SourceDataLoaderClassFactory, RESOURCE_HOGS, get_available_data_sources from Common.utils import LoggingUtil, GetDataPullError -from Common.kgx_file_normalizer import KGXFileNormalizer, NormalizationBrokenError, NormalizationFailedError -from Common.kgxmodel import NormalizationScheme -from Common.normalization import NodeNormalizer, EdgeNormalizer +from Common.kgx_file_normalizer import KGXFileNormalizer +from Common.normalization import NormalizationScheme, NodeNormalizer, EdgeNormalizer, NormalizationFailedError from Common.metadata import SourceMetadata from Common.loader_interface import SourceDataBrokenError, SourceDataFailedError from Common.supplementation import SequenceVariantSupplementation, SupplementationFailedError @@ -356,17 +355,6 @@ def normalize_source(self, normalization_status=SourceMetadata.STABLE, normalization_info=normalization_info) return True - except NormalizationBrokenError as broken_error: - error_message = f"{source_id} NormalizationBrokenError: {broken_error.error_message}" - if broken_error.actual_error: - error_message += f" - {broken_error.actual_error}" - self.logger.error(error_message) - source_metadata.update_normalization_metadata(parsing_version, - composite_normalization_version, - normalization_status=SourceMetadata.BROKEN, - normalization_error=error_message, - normalization_time=current_time) - return False except NormalizationFailedError as failed_error: error_message = f"{source_id} NormalizationFailedError: {failed_error.error_message}" if failed_error.actual_error: diff --git a/Common/merging.py b/Common/merging.py index d1b01d85..ce617f0f 100644 --- a/Common/merging.py +++ b/Common/merging.py @@ -19,17 +19,34 @@ def edge_key_function(edge): def entity_merging_function(entity_1, entity_2, properties_that_are_sets): - for key, value in entity_2.items(): - # TODO - make sure this is the behavior we want - - # for properties that are lists append the values - # otherwise keep the first one - if key in entity_1: - if isinstance(value, list): - entity_1[key].extend(value) - if key in properties_that_are_sets: - entity_1[key] = list(set(entity_1[key])) + # for every property of entity 2 + for key, entity_2_value in entity_2.items(): + # if entity 1 also has the property and entity_2_value is not null/empty: + # concatenate values if one is a list, otherwise ignore the property from entity 2 + if (key in entity_1) and entity_2_value: + entity_1_value = entity_1[key] + entity_1_is_list = isinstance(entity_1_value, list) + entity_2_is_list = isinstance(entity_2_value, list) + if entity_1_is_list and entity_2_is_list: + # if they're both lists just combine them + entity_1_value.extend(entity_2_value) + elif entity_1_is_list: + # if 1 is a list and 2 isn't, append the value of 2 to the list from 1 + entity_1_value.append(entity_2_value) + elif entity_2_is_list: + if entity_1_value: + # if 2 is a list and 1 has a value, add the value of 1 to the list from 2 + entity_1[key] = [entity_1_value] + entity_2_value + else: + # if 2 is a list and 1 doesn't have a value, just use the list from 2 + entity_1[key] = entity_2_value + # else: + # if neither is a list, do nothing (keep the value from 1) + if (entity_1_is_list or entity_2_is_list) and (key in properties_that_are_sets): + entity_1[key] = list(set(entity_1[key])) else: - entity_1[key] = value + # if entity 1 doesn't 
have the property, add the property from entity 2 + entity_1[key] = entity_2_value return entity_1 diff --git a/Common/metadata.py b/Common/metadata.py index ec9bfecd..9a467f7d 100644 --- a/Common/metadata.py +++ b/Common/metadata.py @@ -3,7 +3,7 @@ import json from xxhash import xxh64_hexdigest -from Common.kgxmodel import NormalizationScheme +from Common.normalization import NormalizationScheme class Metadata: @@ -122,6 +122,9 @@ def get_build_status(self): def get_graph_version(self): return self.metadata['graph_version'] + def get_source_ids(self): + return [source['source_id'] for source in self.metadata['sources']] + class SourceMetadata(Metadata): diff --git a/Common/neo4j_tools.py b/Common/neo4j_tools.py index 889db44b..0b3b69e6 100644 --- a/Common/neo4j_tools.py +++ b/Common/neo4j_tools.py @@ -37,11 +37,12 @@ def import_csv_files(self, return password_exit_code self.logger.info(f'Importing csv files to neo4j...') - neo4j_import_cmd = ["neo4j-admin", "import", f"--nodes={csv_nodes_filename}", - f"--relationships={csv_edges_filename}", + neo4j_import_cmd = ['neo4j-admin', 'database', 'import', 'full', + f'--nodes={csv_nodes_filename}', + f'--relationships={csv_edges_filename}', '--delimiter=TAB', '--array-delimiter=U+001F', - '--force'] + '--overwrite-destination=true'] import_results: subprocess.CompletedProcess = subprocess.run(neo4j_import_cmd, cwd=graph_directory, capture_output=True) @@ -60,7 +61,7 @@ def load_backup_dump(self, return password_exit_code self.logger.info(f'Loading a neo4j backup dump {dump_file_path}...') - neo4j_load_cmd = ['neo4j-admin', 'load', f'--from={dump_file_path}', '--force'] + neo4j_load_cmd = ['neo4j-admin', 'database', 'load', f'--from-path={dump_file_path}', '--overwrite-destination=true', 'neo4j'] load_results: subprocess.CompletedProcess = subprocess.run(neo4j_load_cmd, capture_output=True) self.logger.info(load_results.stdout) @@ -71,10 +72,23 @@ def load_backup_dump(self, self.logger.error(error_message) return load_results_return_code + def migrate_dump_to_neo4j_5(self): + self.logger.info(f'Migrating db dump to neo4j 5...') + neo4j_migrate_cmd = ['neo4j-admin', 'database', 'migrate', '--force-btree-indexes-to-range', 'neo4j'] + migrate_results: subprocess.CompletedProcess = subprocess.run(neo4j_migrate_cmd, + capture_output=True) + self.logger.info(migrate_results.stdout) + results_return_code = migrate_results.returncode + if results_return_code != 0: + error_message = f'Neo4j migrate subprocess error (ExitCode {results_return_code}): ' \ + f'{migrate_results.stderr.decode("UTF-8")}' + self.logger.error(error_message) + return results_return_code + def create_backup_dump(self, - dump_file_path: str = None): + dump_directory: str = None): self.logger.info(f'Creating a backup dump of the neo4j...') - neo4j_dump_cmd = ['neo4j-admin', 'dump', f'--to={dump_file_path}'] + neo4j_dump_cmd = ['neo4j-admin', 'database', 'dump', 'neo4j', f'--to-path={dump_directory}'] dump_results: subprocess.CompletedProcess = subprocess.run(neo4j_dump_cmd, capture_output=True) self.logger.info(dump_results.stdout) @@ -107,7 +121,7 @@ def __issue_neo4j_command(self, command: str): def set_initial_password(self): self.logger.info('Setting initial password for Neo4j...') - neo4j_cmd = ['neo4j-admin', 'set-initial-password', self.password] + neo4j_cmd = ['neo4j-admin', 'dbms', 'set-initial-password', self.password] neo4j_results: subprocess.CompletedProcess = subprocess.run(neo4j_cmd, capture_output=True) self.logger.info(neo4j_results.stdout) @@ -139,7 +153,7 @@ def 
add_db_indexes(self): with self.neo4j_driver.session() as session: # node name index - node_name_index_cypher = f'CREATE INDEX node_name_index FOR (n:`{NAMED_THING}`) on (n.name)' + node_name_index_cypher = f'CREATE INDEX node_name_index FOR (n:`{NAMED_THING}`) ON (n.name)' self.logger.info(f'Adding node name index on {NAMED_THING}.name') session.run(node_name_index_cypher).consume() indexes_added += 1 @@ -151,8 +165,8 @@ def add_db_indexes(self): self.logger.info(f'Adding node id indexes for node labels: {node_labels}') for node_label in node_labels: node_label_index = f'node_id_{node_label.replace(":", "_")}' - node_name_index_cypher = f'CREATE CONSTRAINT {node_label_index} ON (n:`{node_label}`) ' \ - f'ASSERT n.id IS UNIQUE' + node_name_index_cypher = f'CREATE CONSTRAINT {node_label_index} FOR (n:`{node_label}`) ' \ + f'REQUIRE n.id IS UNIQUE' session.run(node_name_index_cypher).consume() indexes_added += 1 index_names.append(node_label_index) @@ -227,7 +241,11 @@ def create_neo4j_dump(nodes_filepath: str, edges_output_file=csv_edges_file_path) if logger: logger.info(f'CSV files created for {graph_id}({graph_version})...') - graph_dump_name = f'graph_{graph_version}.db.dump' if graph_version else 'graph.db.dump' + + # would like to do the following, but apparently you can't specify a custom name for the dump now + # graph_dump_name = f'graph_{graph_version}.neo4j5.db.dump' if graph_version else 'graph.neo4j5.db.dump' + # graph_dump_file_path = os.path.join(output_directory, graph_dump_name) + graph_dump_name = 'neo4j.dump' graph_dump_file_path = os.path.join(output_directory, graph_dump_name) if os.path.exists(graph_dump_file_path): if logger: @@ -258,7 +276,7 @@ def create_neo4j_dump(nodes_filepath: str, if stop_exit_code != 0: return False - dump_exit_code = neo4j_access.create_backup_dump(graph_dump_file_path) + dump_exit_code = neo4j_access.create_backup_dump(output_directory) if dump_exit_code != 0: return False diff --git a/Common/normalization.py b/Common/normalization.py index fd43d151..39150eeb 100644 --- a/Common/normalization.py +++ b/Common/normalization.py @@ -3,6 +3,9 @@ import requests import time +from requests.adapters import HTTPAdapter, Retry +from dataclasses import dataclass + from robokop_genetics.genetics_normalization import GeneticsNormalizer from Common.biolink_constants import * from Common.utils import LoggingUtil @@ -15,6 +18,36 @@ # predicate to use when normalization fails FALLBACK_EDGE_PREDICATE = 'biolink:related_to' +@dataclass +class NormalizationScheme: + node_normalization_version: str = 'latest' + edge_normalization_version: str = 'latest' + normalization_code_version: str = NORMALIZATION_CODE_VERSION + strict: bool = True + conflation: bool = False + + def get_composite_normalization_version(self): + composite_normalization_version = f'{self.node_normalization_version}_' \ + f'{self.edge_normalization_version}_{self.normalization_code_version}' + if self.conflation: + composite_normalization_version += '_conflated' + if self.strict: + composite_normalization_version += '_strict' + return composite_normalization_version + + def get_metadata_representation(self): + return {'node_normalization_version': self.node_normalization_version, + 'edge_normalization_version': self.edge_normalization_version, + 'normalization_code_version': self.normalization_code_version, + 'conflation': self.conflation, + 'strict': self.strict} + + +class NormalizationFailedError(Exception): + def __init__(self, error_message: str, actual_error: Exception = None): + 
self.error_message = error_message + self.actual_error = actual_error + class NodeNormalizer: """ @@ -70,109 +103,95 @@ def __init__(self, self.sequence_variant_normalizer = None self.variant_node_types = None - def hit_node_norm_service(self, curies, retries=0): - resp: requests.models.Response = requests.post(f'{self.node_norm_endpoint}get_normalized_nodes', - json={'curies': curies, - 'conflate': self.conflate_node_types, - 'drug_chemical_conflate': self.conflate_node_types, - 'description': True}) + self.requests_session = self.get_normalization_requests_session() + + def hit_node_norm_service(self, curies): + resp = self.requests_session.post(f'{self.node_norm_endpoint}get_normalized_nodes', + json={'curies': curies, + 'conflate': self.conflate_node_types, + 'drug_chemical_conflate': self.conflate_node_types, + 'description': True}) if resp.status_code == 200: # if successful return the json as an object - return resp.json() - else: - error_message = f'Node norm response code: {resp.status_code}' - if resp.status_code >= 500: - # if 5xx retry 3 times - retries += 1 - if retries == 4: - error_message += ', retried 3 times, giving up..' - self.logger.error(error_message) - resp.raise_for_status() - else: - error_message += f', retrying.. (attempt {retries})' - time.sleep(retries * 3) - self.logger.error(error_message) - return self.hit_node_norm_service(curies, retries) + response_json = resp.json() + if response_json: + return response_json else: - # we should never get a legitimate 4xx response from node norm, - # crash with an error for troubleshooting - if resp.status_code == 422: - error_message += f'(curies: {curies})' - self.logger.error(error_message) - resp.raise_for_status() + error_message = f"Node Normalization service {self.node_norm_endpoint} returned 200 " \ + f"but with an empty result for (curies: {curies})" + raise NormalizationFailedError(error_message=error_message) + else: + error_message = f'Node norm response code: {resp.status_code} (curies: {curies})' + self.logger.error(error_message) + resp.raise_for_status() - def normalize_node_data(self, node_list: list, block_size: int = 1000) -> list: + def normalize_node_data(self, node_list: list, batch_size: int = 1000) -> list: """ - This method calls the NodeNormalization web service to get the normalized identifier and name of the node. - the data comes in as a node list. + This method calls the NodeNormalization web service and normalizes a list of nodes. - :param node_list: A list with items to normalize - :param block_size: the number of curies in the request + :param node_list: A list of unique nodes to normalize + :param batch_size: the number of curies to be sent to NodeNormalization at once :return: """ - self.logger.debug(f'Start of normalize_node_data. 
items: {len(node_list)}') - - # init the cache - this accumulates all the results from the node norm service - cached_node_norms: dict = {} + # look up all valid biolink node types if needed + # this is used when strict normalization is off to ensure only valid types go into the graph as NODE_TYPES + if not self.strict_normalization and not self.biolink_compliant_node_types: + biolink_lookup = EdgeNormalizer(edge_normalization_version=self.biolink_version) + self.biolink_compliant_node_types = biolink_lookup.get_valid_node_types() - # create a unique set of node ids - tmp_normalize: set = set([node['id'] for node in node_list]) + # make a list of the node ids, we used to deduplicate here, but now we expect the list to be unique ids + to_normalize: list = [node['id'] for node in node_list] - # convert the set to a list so we can iterate through it - to_normalize: list = list(tmp_normalize) - - # init the array index lower boundary + # use indexes and slice to grab batch_size sized chunks of ids from the list start_index: int = 0 - - # get the last index of the list last_index: int = len(to_normalize) - - self.logger.debug(f'{last_index} unique nodes found in this group.') - - # grab chunks of the data frame + chunks_of_ids = [] while True: if start_index < last_index: - # define the end index of the slice - end_index: int = start_index + block_size + end_index: int = start_index + batch_size - # force the end index to be the last index to insure no overflow + # force the end index to be no greater than the last index to ensure no overflow if end_index >= last_index: end_index = last_index - self.logger.debug(f'Working block {start_index} to {end_index}.') - - # collect a slice of records from the data frame - data_chunk: list = to_normalize[start_index: end_index] - - # hit the node norm api - normalization_json = self.hit_node_norm_service(curies=data_chunk) - if normalization_json: - # merge the normalization results with what we have gotten so far - cached_node_norms.update(**normalization_json) - else: - # this shouldn't happen but if the API returns an empty dict instead of nulls, - # assume none of the curies normalize - empty_responses = {curie: None for curie in data_chunk} - cached_node_norms.update(empty_responses) + # collect a slice of block_size curies from the full list + chunks_of_ids.append(to_normalize[start_index: end_index]) # move on down the list - start_index += block_size + start_index += batch_size else: break + # we should be able to do the following, but it's causing RemoteDisconnected errors with node norm + # + # hit the node norm api with the chunks of curies in parallel + # we could try to optimize the number of max_workers for ThreadPoolExecutor more specifically, + # by default python attempts to find a reasonable # based on os.cpu_count() + # with ThreadPoolExecutor() as executor: + # executor_results = executor.map(self.hit_node_norm_service, chunks_of_ids) + # + # normalization_results = list(executor_results) + # for normalization_json, ids in zip(normalization_results, chunks_of_ids): + # if not normalization_json: + # raise NormalizationFailedError(f'!!! 
Normalization json results missing for ids: {ids}') + # else: + # merge the normalization results into one dictionary + # node_normalization_results.update(**normalization_json) + + # until we can get threading working, hit node norm sequentially + node_normalization_results: dict = {} + for chunk in chunks_of_ids: + results = self.hit_node_norm_service(chunk) + node_normalization_results.update(**results) + # reset the node index node_idx = 0 # node ids that failed to normalize failed_to_normalize: list = [] - # look up valid node types if needed - if not self.strict_normalization and not self.biolink_compliant_node_types: - biolink_lookup = EdgeNormalizer(edge_normalization_version=self.biolink_version) - self.biolink_compliant_node_types = biolink_lookup.get_valid_node_types() - # for each node update the node with normalized information # store the normalized IDs in self.node_normalization_lookup for later look up while node_idx < len(node_list): @@ -217,7 +236,7 @@ def normalize_node_data(self, node_list: list, block_size: int = 1000) -> list: current_node[NODE_TYPES] = list(set(current_node[NODE_TYPES])) # did we get a response from the normalizer - current_node_normalization = cached_node_norms[current_node_id] + current_node_normalization = node_normalization_results[current_node_id] if current_node_normalization is not None: current_node_id_section = current_node_normalization['id'] @@ -341,6 +360,17 @@ def get_current_node_norm_version(self): # this shouldn't happen, raise an exception resp.raise_for_status() + @staticmethod + def get_normalization_requests_session(): + pool_maxsize = max(os.cpu_count(), 10) + s = requests.Session() + retries = Retry(total=8, + backoff_factor=1, + status_forcelist=[502, 503, 504, 403, 429]) + s.mount('https://', HTTPAdapter(max_retries=retries, pool_maxsize=pool_maxsize)) + s.mount('http://', HTTPAdapter(max_retries=retries, pool_maxsize=pool_maxsize)) + return s + class EdgeNormalizationResult: def __init__(self, @@ -398,10 +428,8 @@ def normalize_edge_data(self, """ # find the predicates that have not been normalized yet - predicates_to_normalize = set() - for edge in edge_list: - if edge[PREDICATE] not in self.edge_normalization_lookup: - predicates_to_normalize.add(edge[PREDICATE]) + predicates_to_normalize = {edge[PREDICATE] for edge in edge_list + if edge[PREDICATE] not in self.edge_normalization_lookup} # convert the set to a list so we can iterate through it predicates_to_normalize_list = list(predicates_to_normalize) diff --git a/Common/supplementation.py b/Common/supplementation.py index 9665ceb9..8a27f4f1 100644 --- a/Common/supplementation.py +++ b/Common/supplementation.py @@ -8,11 +8,10 @@ from zipfile import ZipFile from collections import defaultdict from Common.biolink_constants import * -from Common.normalization import FALLBACK_EDGE_PREDICATE +from Common.normalization import FALLBACK_EDGE_PREDICATE, NormalizationScheme from Common.utils import LoggingUtil from Common.kgx_file_writer import KGXFileWriter from Common.kgx_file_normalizer import KGXFileNormalizer -from Common.kgxmodel import NormalizationScheme SNPEFF_SO_PREDICATES = { diff --git a/Common/utils.py b/Common/utils.py index cca7257a..30ef06f1 100644 --- a/Common/utils.py +++ b/Common/utils.py @@ -264,23 +264,30 @@ def get_http_file_modified_date(self, file_url: str): self.logger.error(error_message) raise GetDataPullError(error_message) - def pull_via_http(self, url: str, data_dir: str, is_gzip=False) -> int: + def pull_via_http(self, url: str, data_dir: str, 
is_gzip=False, saved_file_name: str = None) -> int: """ gets the file from an http stream. :param url: :param data_dir: :param is_gzip: + :param saved_file_name: :return: the number of bytes read """ - # get the filename - data_file: str = url.split('/')[-1] + # is_gzip isn't used on the main branch, but it's probably on some branches or forks, + # lets throw this for a while, so it's not mysteriously removed + if is_gzip: + raise NotImplementedError(f'is_gzip is deprecated, unzip files during parsing not retrieval!') - # init the byte counter + # get the name of the file to write + data_file: str = saved_file_name if saved_file_name else url.split('/')[-1] + + # this tracks how much, if any, of the file is downloaded + # (it's not really used anymore, it could be more simple) byte_counter: int = 0 - # get the file if its not there + # check if the file exists already if not os.path.exists(os.path.join(data_dir, data_file)): self.logger.debug(f'Retrieving {url} -> {data_dir}') @@ -288,17 +295,9 @@ def pull_via_http(self, url: str, data_dir: str, is_gzip=False) -> int: hdr = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64)'} req = request.Request(url, headers=hdr) - # get the the file data handle + # get the file data handle file_data = request.urlopen(req) - # is this a gzip file - if is_gzip: - # get a handle to the data - file_data = gzip.GzipFile(fileobj=file_data) - - # strip off the .gz if exists - data_file = data_file.replace('.gz', '') - with open(os.path.join(data_dir, data_file), 'wb') as fp: # specify the buffered data block size block = 131072 diff --git a/Dockerfile b/Dockerfile index a65ba0b6..4031f26d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ # A docker container with neo4j, java and python for Data Services -FROM neo4j:4.4.10 +FROM neo4j:5.19.0-community-bullseye RUN apt-get update \ && apt-get -y install python3 \ diff --git a/docker-compose.yml b/docker-compose.yml index 49f50611..a22dd7b7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,7 +1,5 @@ -version: "3.7" services: orion: - platform: linux/amd64 build: context: . command: [python, /ORION/Common/build_manager.py, all] @@ -12,7 +10,6 @@ services: - ORION_GRAPH_SPEC - ORION_GRAPH_SPEC_URL - ORION_OUTPUT_URL - - ORION_NEO4J_PASSWORD - EDGE_NORMALIZATION_ENDPOINT - NODE_NORMALIZATION_ENDPOINT - NAME_RESOLVER_ENDPOINT diff --git a/graph_specs/ctkp-graph-spec.yaml b/graph_specs/ctkp-graph-spec.yaml new file mode 100644 index 00000000..cffefab6 --- /dev/null +++ b/graph_specs/ctkp-graph-spec.yaml @@ -0,0 +1,9 @@ +graphs: + + - graph_id: CTKP_Automat + graph_name: Clinical Trials KP + graph_description: 'The Clinical Trials KP, created and maintained by the Multiomics Provider, provides information on Clinical Trials, ultimately derived from researcher submissions to clinicaltrials.gov, via the Aggregate Analysis of Clinical Trials (AACT) database). Information on select trials includes the NCT Identifier of the trial, interventions used, diseases/conditions relevant to the trial, adverse events, etc.' 
+    graph_url: https://github.com/NCATSTranslator/Translator-All/wiki/Clinical-Trials-KP
+    output_format: neo4j
+    sources:
+      - source_id: ClinicalTrialsKP
\ No newline at end of file
diff --git a/helm/orion/renci-values.yaml b/helm/orion/renci-values.yaml
index 1b4a2e6c..4a81cd9d 100644
--- a/helm/orion/renci-values.yaml
+++ b/helm/orion/renci-values.yaml
@@ -46,8 +46,8 @@ orion:
   normalization:
     nodeNormEndpoint: https://nodenormalization-sri.renci.org/
     edgeNormEndpoint: https://bl-lookup-sri.renci.org/
-    bl_version: 4.2.0
-    outputURL: https://stars.renci.org/var/plater/bl-4.2.0/
+    bl_version: 4.2.1
+    outputURL: https://stars.renci.org/var/plater/bl-4.2.1/
   pharos:
     host: pod-host-or-ip
diff --git a/parsers/BINDING/src/loadBINDINGDB.py b/parsers/BINDING/src/loadBINDINGDB.py
index 226dd2e5..9b2b0db1 100644
--- a/parsers/BINDING/src/loadBINDINGDB.py
+++ b/parsers/BINDING/src/loadBINDINGDB.py
@@ -2,12 +2,13 @@
 import enum
 import math
 import json
+import requests
+
 from zipfile import ZipFile
-import requests as rq
-import requests.exceptions
+from requests.adapters import HTTPAdapter, Retry

 from parsers.BINDING.src.bindingdb_constraints import LOG_SCALE_AFFINITY_THRESHOLD #Change the binding affinity threshold here. Default is 10 uM Ki,Kd,EC50,orIC50
-from Common.utils import GetData
+from Common.utils import GetData, GetDataPullError
 from Common.loader_interface import SourceDataLoader
 from Common.extractor import Extractor
 from Common.biolink_constants import PUBLICATIONS, AFFINITY, AFFINITY_PARAMETER, KNOWLEDGE_LEVEL, AGENT_TYPE, KNOWLEDGE_ASSERTION, MANUAL_AGENT
@@ -78,9 +79,9 @@ def __init__(self, test_mode: bool = False, source_data_dir: str = None):
         self.bindingdb_version = self.get_latest_source_version()

         self.bindingdb_data_url = f"https://www.bindingdb.org/bind/downloads/"
-        self.BD_archive_file_name = f"BindingDB_All_{self.bindingdb_version}_tsv.zip"
-        self.BD_file_name = f"BindingDB_All_{self.bindingdb_version}.tsv"
-        self.data_files = [self.BD_archive_file_name]
+        self.bd_archive_file_name = f"BindingDB_All_{self.bindingdb_version}_tsv.zip"
+        self.bd_file_name = f"BindingDB_All.tsv"
+        self.data_files = [self.bd_archive_file_name]

     def get_latest_source_version(self) -> str:
         """
@@ -90,26 +91,35 @@ def get_latest_source_version(self) -> str:
         if self.bindingdb_version:
             return self.bindingdb_version
         try:
+            s = requests.Session()
+            retries = Retry(total=5,
+                            backoff_factor=2)
+            s.mount('https://', HTTPAdapter(max_retries=retries))
+
             ### The method below gets the database version from the html, but this may be subject to change. ###
-            binding_db_download_page_response = rq.get('https://www.bindingdb.org/rwd/bind/chemsearch/marvin/Download.jsp',)
+            binding_db_download_page_response = s.get('https://www.bindingdb.org/rwd/bind/chemsearch/marvin/Download.jsp', timeout=8)
             version_index = binding_db_download_page_response.text.index('BindingDB_All_2D_') + 17
             bindingdb_version = binding_db_download_page_response.text[version_index:version_index + 6]
-        except requests.exceptions.SSLError:
-            # currently the binding db SSL implementation is outdated/broken with the latest packages
-            self.logger.error(f'BINDING-DB had an SSL error while attempting to retrieve version. Returning default.')
-            return '202404'
+            self.bindingdb_version = bindingdb_version
+            return f"{bindingdb_version}"

-        return f"{bindingdb_version}"
+        except requests.exceptions.SSLError:
+            # BINDING-DB often has ssl related errors with the jsp page
+            error_message = f'BINDING-DB had an SSL error while attempting to retrieve version..'
+        except requests.exceptions.Timeout:
+            error_message = f'BINDING-DB timed out attempting to retrieve version...'
+        except ValueError:
+            error_message = f'BINDING-DB get_latest_source_version got a response but could not determine the version'
+        raise GetDataPullError(error_message=error_message)

     def get_data(self) -> int:
         """
         Gets the bindingdb data.
         """
+        # download the zipped data
         data_puller = GetData()
-        for source in self.data_files:
-            source_url = f"{self.bindingdb_data_url}{source}"
-            data_puller.pull_via_http(source_url, self.data_path)
+        source_url = f"{self.bindingdb_data_url}{self.bd_archive_file_name}"
+        data_puller.pull_via_http(source_url, self.data_path)
         return True

     def parse_data(self) -> dict:
@@ -123,7 +133,8 @@ def parse_data(self) -> dict:
         data_store= dict()

         columns = [[x.value,x.name] for x in BD_EDGEUMAN if x.name not in ['PMID','PUBCHEM_AID','PATENT_NUMBER','PUBCHEM_CID','UNIPROT_TARGET_CHAIN']]
-        for n,row in enumerate(generate_zipfile_rows(os.path.join(self.data_path,self.BD_archive_file_name), self.BD_file_name)):
+        zipped_data_path = os.path.join(self.data_path, self.bd_archive_file_name)
+        for n,row in enumerate(generate_zipfile_rows(zipped_data_path, self.bd_file_name)):
             if n == 0:
                 continue
             if self.test_mode:
diff --git a/parsers/Reactome/src/loadReactome.py b/parsers/Reactome/src/loadReactome.py
index dac5b697..a816269b 100755
--- a/parsers/Reactome/src/loadReactome.py
+++ b/parsers/Reactome/src/loadReactome.py
@@ -109,9 +109,12 @@ def __init__(self, test_mode: bool = False, source_data_dir: str = None):
         super().__init__(test_mode=test_mode, source_data_dir=source_data_dir)

         self.version_url: str = 'https://reactome.org/about/news'
+        # we'll rename the neo4j dump as we download it to make neo4j usage easier
+        # (community edition only allows one database, having just one named 'neo4j' helps)
         self.neo4j_dump_file = 'reactome.graphdb.dump'
+        self.saved_neo4j_dump_file = 'neo4j.dump'
         self.data_url = 'https://reactome.org/download/current/'
-        self.data_files = [self.neo4j_dump_file]
+        self.data_files = [self.saved_neo4j_dump_file]
         self.triple_file: str = 'reactomeContents_CriticalTriples.csv'
         self.triple_path = os.path.dirname(os.path.abspath(__file__))
@@ -142,15 +145,25 @@ def get_latest_source_version(self) -> str:

     def get_data(self) -> bool:
         gd: GetData = GetData(self.logger.level)
-        for dt_file in self.data_files:
-            gd.pull_via_http(f'{self.data_url}{dt_file}',
-                             self.data_path)
+        gd.pull_via_http(f'{self.data_url}{self.neo4j_dump_file}',
+                         self.data_path, saved_file_name=self.saved_neo4j_dump_file)
         return True

     def parse_data(self):
         neo4j_tools = Neo4jTools()
-        neo4j_tools.load_backup_dump(f'{self.data_path}/{self.neo4j_dump_file}')
-        neo4j_tools.start_neo4j()
+
+        neo4j_status_code = neo4j_tools.load_backup_dump(f'{self.data_path}/')
+        if neo4j_status_code:
+            raise SystemError('Neo4j failed to load the backup dump.')
+
+        neo4j_status_code = neo4j_tools.migrate_dump_to_neo4j_5()
+        if neo4j_status_code:
+            raise SystemError('Neo4j failed to migrate the dump to neo4j 5.')
+
+        neo4j_status_code = neo4j_tools.start_neo4j()
+        if neo4j_status_code:
+            raise SystemError('Neo4j failed to start.')
+
neo4j_tools.wait_for_neo4j_initialization() neo4j_driver = neo4j_tools.neo4j_driver diff --git a/parsers/clinicaltrials/src/loadCTKP.py b/parsers/clinicaltrials/src/loadCTKP.py new file mode 100644 index 00000000..4b95ba1f --- /dev/null +++ b/parsers/clinicaltrials/src/loadCTKP.py @@ -0,0 +1,222 @@ +import enum +import os +import requests +import json + +from Common.biolink_constants import * +from Common.extractor import Extractor +from Common.utils import GetData +from Common.loader_interface import SourceDataLoader +from Common.utils import GetDataPullError + + +# the data header columns the nodes files are: +class NODESDATACOLS(enum.IntEnum): + ID = 0 + NAME = 1 + CATEGORY = 2 + + +# the data header columns for the edges file are: +class EDGESDATACOLS(enum.IntEnum): + ID = 0 + SUBJECT = 1 + PREDICATE = 2 + OBJECT = 3 + SUBJECT_NAME = 4 + OBJECT_NAME = 5 + CATEGORY = 6 + KNOWLEDGE_LEVEL = 7 + AGENT_TYPE = 8 + NCTID = 9 + PHASE = 10 + PRIMARY_PURPOSE = 11 + INTERVENTION_MODEL = 12 + TIME_PERSPECTIVE = 13 + OVERALL_STATUS = 14 + START_DATE = 15 + ENROLLMENT = 16 + ENROLLMENT_TYPE = 17 + AGE_RANGE = 18 + CHILD = 19 + ADULT = 20 + OLDER_ADULT = 21 + UNII = 22 + + +class CTKPLoader(SourceDataLoader): + source_id: str = "ClinicalTrialsKP" + provenance_id: str = "infores:biothings-multiomics-clinicaltrials" + description = "The Clinical Trials KP, created and maintained by the Multiomics Provider, provides information on Clinical Trials, ultimately derived from researcher submissions to clinicaltrials.gov, via the Aggregate Analysis of Clinical Trials (AACT) database). Information on select trials includes the NCT Identifier of the trial, interventions used, diseases/conditions relevant to the trial, adverse events, etc." + source_data_url = "https://aact.ctti-clinicaltrials.org/" + license = "https://github.com/ctti-clinicaltrials/aact/blob/dev/LICENSE" + attribution = "" + parsing_version = "1.0" + + def __init__(self, test_mode: bool = False, source_data_dir: str = None): + """ + :param test_mode - sets the run into test mode + :param source_data_dir - the specific storage directory to save files in + """ + super().__init__(test_mode=test_mode, source_data_dir=source_data_dir) + + # until we can use the manifest to determine versions and source data file locations we'll hard code it + self.node_file_name = 'clinical_trials_kg_nodes_v2.2.10.tsv' + self.edge_file_name = 'clinical_trials_kg_edges_v2.2.10.tsv' + self.data_url = "https://db.systemsbiology.net/gestalt/KG/" + + # once we use the manifest, we'll rename the files while downloading and they can be called something generic + # self.node_file_name = 'nodes.tsv' + # self.edge_file_name = 'edges.tsv' + + self.data_files = [ + self.node_file_name, + self.edge_file_name + ] + + self.aact_infores = "infores:aact" + self.ctgov_infores = "infores:clinicaltrials" + self.treats_predicate = "biolink:treats" + self.source_record_url = "https://db.systemsbiology.net/gestalt/cgi-pub/KGinfo.pl?id=" + + def get_latest_source_version(self) -> str: + latest_version = "2.2.10" + # we'd like to do this but for now we're using the dev version which is not in the manifest + # latest_version = self.get_manifest()['version'] + return latest_version + + @staticmethod + def get_manifest(): + manifest_response = requests.get('https://github.com/multiomicsKP/clinical_trials_kp/blob/main/manifest.json') + if manifest_response.status_code == 200: + manifest = manifest_response.json() + return manifest + else: + manifest_response.raise_for_status() + + def 
get_data(self) -> int: + """ + manifest = self.get_manifest() + source_data_urls = manifest['dumper']['data_url'] + nodes_url = None + edges_url = None + for data_url in source_data_urls: + if 'nodes' in data_url: + nodes_url = data_url + elif 'edges' in data_url: + edges_url = data_url + if not nodes_url and edges_url: + raise GetDataPullError(f'Could not determine nodes and edges files in CTKP manifest data urls: {source_data_urls}') + data_puller = GetData() + data_puller.pull_via_http(nodes_url, self.data_path, saved_file_name=self.node_file_name) + data_puller.pull_via_http(edges_url, self.data_path, saved_file_name=self.edge_file_name) + """ + data_puller = GetData() + for source in self.data_files: + source_url = f"{self.data_url}{source}" + data_puller.pull_via_http(source_url, self.data_path) + return True + + def parse_data(self) -> dict: + """ + Parses the data file for graph nodes/edges and writes them to the KGX files. + + :return: ret_val: record counts + """ + + extractor = Extractor(file_writer=self.output_file_writer) + + # get the nodes + # it's not really necessary because normalization will overwrite the only information here (name and category) + nodes_file: str = os.path.join(self.data_path, self.node_file_name) + with open(nodes_file, 'r') as fp: + extractor.csv_extract(fp, + lambda line: line[NODESDATACOLS.ID.value], # subject id + lambda line: None, # object id + lambda line: None, # predicate + lambda line: {NAME: line[NODESDATACOLS.NAME.value], + NODE_TYPES: line[NODESDATACOLS.CATEGORY.value]}, # subject props + lambda line: {}, # object props + lambda line: {}, # edgeprops + comment_character=None, + delim='\t', + has_header_row=True) + + edges_file: str = os.path.join(self.data_path, self.edge_file_name) + with open(edges_file, 'r') as fp: + extractor.csv_extract(fp, + lambda line: line[EDGESDATACOLS.SUBJECT.value], # subject id + lambda line: line[EDGESDATACOLS.OBJECT.value], # object id + lambda line: line[EDGESDATACOLS.PREDICATE.value], # predicate + lambda line: {}, # subject props + lambda line: {}, # object props + lambda line: self.get_edge_properties(line), # edgeprops + comment_character=None, + delim='\t', + has_header_row=True) + + return extractor.load_metadata + + def get_edge_properties(self, line): + + supporting_studies = [] + pred = str(line[EDGESDATACOLS.PREDICATE.value]) + nctids = str(line[EDGESDATACOLS.NCTID.value]).split(',') + phases = str(line[EDGESDATACOLS.PHASE.value]).split(',') + status = str(line[EDGESDATACOLS.OVERALL_STATUS.value]).split(',') + enroll = str(line[EDGESDATACOLS.ENROLLMENT.value]).split(',') + en_typ = str(line[EDGESDATACOLS.ENROLLMENT_TYPE.value]).split(',') + max_phase = 0 + elevate_to_prediction = False + for nctid, phase, stat, enrollment, enrollment_type in zip(nctids, phases, status, enroll, en_typ): + if float(phase) > max_phase: + max_phase = float(phase) + try: + enrollment = int(enrollment) + except ValueError: + enrollment = -1 + + supporting_study_attributes = { + "id": nctid, + "tested_intervention": "unsure" if pred == "biolink:mentioned_in_trials_for" else "yes", + "phase": phase, + "status": stat, + "study_size": enrollment + } + # convert to TRAPI format + supporting_studies.append( + {"attribute_type_id": HAS_SUPPORTING_STUDY_RESULT, + "value": nctid, + "attributes": [{"attribute_type_id": key, + "value": value} for key, value in supporting_study_attributes.items()]}) + + # if pred == "biolink:in_clinical_trials_for" and max_phase >= 4: + # elevate_to_prediction = True + + if pred == 
self.treats_predicate: + primary_knowledge_source = self.provenance_id + aggregator_knowledge_sources = [self.aact_infores] + supporting_data_source = self.ctgov_infores + else: + primary_knowledge_source = self.ctgov_infores + aggregator_knowledge_sources = [self.aact_infores, self.provenance_id] + supporting_data_source = None + + edge_attributes = { + EDGE_ID: line[EDGESDATACOLS.ID.value], + PRIMARY_KNOWLEDGE_SOURCE: primary_knowledge_source, + AGGREGATOR_KNOWLEDGE_SOURCES: aggregator_knowledge_sources, + KNOWLEDGE_LEVEL: line[EDGESDATACOLS.KNOWLEDGE_LEVEL.value], + AGENT_TYPE: line[EDGESDATACOLS.AGENT_TYPE.value], + MAX_RESEARCH_PHASE: str(float(max_phase)), + "elevate_to_prediction": elevate_to_prediction, # this isn't in biolink so not using a constant for now + # note source_record_urls should be paired with specific knowledge sources but currently + # there's no implementation for that, just pass it as a normal attribute for now + "source_record_urls": [self.source_record_url + line[EDGESDATACOLS.ID.value]] + } + if supporting_data_source: + edge_attributes[SUPPORTING_DATA_SOURCE] = supporting_data_source + # to handle nested attributes, use the "attributes" property which supports TRAPI attributes as json strings + if supporting_studies: + edge_attributes["attributes"] = [json.dumps(study) for study in supporting_studies] + return edge_attributes diff --git a/requirements.txt b/requirements.txt index 736dc01a..519dcb40 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,18 +1,18 @@ -pandas==2.2.1 +pandas==2.2.2 requests==2.32.3 -pytest==8.1.1 -git+https://github.com/ObesityHub/robokop-genetics.git +pytest==8.2.0 +robokop-genetics==0.5.0 # intermine is on pypi but as of 6/23 it's broken for python 3.10+, this fork fixes the issue git+https://github.com/EvanDietzMorris/intermine-ws-python.git jsonlines==4.0.0 -pyyaml==6.0 -beautifulsoup4==4.11.1 +pyyaml==6.0.1 +beautifulsoup4==4.12.3 psycopg2-binary==2.9.9 -orjson==3.9.15 +orjson==3.10.3 xxhash==3.4.1 -mysql-connector-python==8.3.0 -neo4j==5.10.0 +mysql-connector-python==8.4.0 +neo4j==5.20.0 pyoxigraph==0.3.22 -curies==0.7.8 -prefixmaps==0.2.2 -bmt==1.2.1 +curies==0.7.9 +prefixmaps==0.2.4 +bmt==1.4.1 diff --git a/set_up_test_env.sh b/set_up_test_env.sh index 2cf5a754..1ef6edd3 100644 --- a/set_up_test_env.sh +++ b/set_up_test_env.sh @@ -26,8 +26,11 @@ export PYTHONPATH="$PYTHONPATH:$PWD" #The following environment variables are optional -export EDGE_NORMALIZATION_ENDPOINT=https://bl-lookup-sri.renci.org/ -export NODE_NORMALIZATION_ENDPOINT=https://nodenormalization-sri.renci.org/ -export NAME_RESOLVER_ENDPOINT=https://name-resolution-sri.renci.org/ -export ORION_OUTPUT_URL=https://localhost/ # this is currently only used to generate metadata -export BL_VERSION=4.1.6 +# export EDGE_NORMALIZATION_ENDPOINT=https://bl-lookup-sri.renci.org/ +# export NODE_NORMALIZATION_ENDPOINT=https://nodenormalization-sri.renci.org/ +# export NAME_RESOLVER_ENDPOINT=https://name-resolution-sri.renci.org/ +# export ORION_OUTPUT_URL=https://localhost/ # this is currently only used to generate metadata +# export BL_VERSION=4.2.1 + +# if you are building your own docker image and issues occur, setting the correct platform may help +# export DOCKER_PLATFORM=linux/arm64