From 7257113e0ae28e3c5de527e45e7b88d848fac1ae Mon Sep 17 00:00:00 2001 From: Evan Morris Date: Fri, 22 Mar 2024 18:01:49 -0400 Subject: [PATCH] sources refactor - primary and aggregator knowledge sources no longer have biolink prefix in the graphs, implements ability to specify multiple chains of aggregators, aggregator lists are expected to be ordered now --- PLATER/services/util/question.py | 85 ++++++++++++++++++-------------- PLATER/tests/data/trapi1.4.json | 16 +++--- PLATER/tests/test_question.py | 37 +++++++------- 3 files changed, 76 insertions(+), 62 deletions(-) diff --git a/PLATER/services/util/question.py b/PLATER/services/util/question.py index 563f97c..b05701f 100644 --- a/PLATER/services/util/question.py +++ b/PLATER/services/util/question.py @@ -61,44 +61,56 @@ def compile_cypher(self, **kwargs): item['qualifier_type_id'] = item['qualifier_type_id'].replace('biolink:', '') return get_query(query_graph, **kwargs) - + # This function takes 'sources' results from the transpiler, converts lists of aggregator sources into the proper + # TRAPI dictionaries, and assigns the proper upstream ids to each resource. It does not currently attempt to avoid + # duplicate aggregator results, which probably shouldn't ever occur. def _construct_sources_tree(self, sources): - # if primary source and aggregator source are specified in the graph, upstream_resource_ids of all aggregator_ks - # be that source - - # if aggregator ks are coming from db, plater would add itself as aggregator and use other aggregator ids - # as upstream resources, if no aggregators are found and only primary ks is provided that would be added - # as upstream for the plater entry - formatted_sources = [] - # filter out source entries that actually have values - temp = {} + # first find the primary knowledge source, there should always be one + primary_knowledge_source = None + formatted_sources = None for source in sources: - if not source['resource_id']: - continue - temp[source['resource_role']] = temp.get(source['resource_role'], set()) - if isinstance(source["resource_id"], str): - temp[source["resource_role"]].add(source["resource_id"]) - elif isinstance(source["resource_id"], list): - for resource_id in source["resource_id"]: - temp[source["resource_role"]].add(resource_id) + if source['resource_role'] == "primary_knowledge_source": + primary_knowledge_source = source['resource_id'] + # add it to the formatted TRAPI output + formatted_sources = [{ + "resource_id": primary_knowledge_source, + "resource_role": "primary_knowledge_source" + }] + if not primary_knowledge_source: + # we could hard fail here, every edge should have a primary ks, but I haven't fixed all the tests yet + # raise KeyError(f'primary_knowledge_source missing from sources section of cypher results! ' + # f'sources: {sources}') + return [] - for resource_role in temp: - upstreams = None - if resource_role == "biolink:aggregator_knowledge_source": - upstreams = temp.get("biolink:primary_knowledge_source", None) - - formatted_sources += [ - {"resource_id": resource_id, "resource_role": resource_role.lstrip('biolink:'), "upstream_resource_ids": upstreams} - for resource_id in temp[resource_role] - ] - upstreams_for_plater_entry = temp.get("biolink:aggregator_knowledge_source") or temp.get("biolink:primary_knowledge_source") + # then find any aggregator lists + aggregator_list_sources = [] + for source in sources: + # this looks weird but the idea is that you could have a few parallel lists like: + # aggregator_knowledge_source, aggregator_knowledge_source_2, aggregator_knowledge_source_3 + if source['resource_role'].startswith("aggregator_knowledge_source"): + aggregator_list_sources.append(source) + # walk through the aggregator lists and construct the chains of provenance + terminal_aggregators = set() + for source in aggregator_list_sources: + # each aggregator list should be in order, so we can deduce the upstream chains + last_aggregator = None + for aggregator_knowledge_source in source['resource_id']: + formatted_sources.append({ + "resource_id": aggregator_knowledge_source, + "resource_role": "aggregator_knowledge_source", + "upstream_resource_ids": [last_aggregator] if last_aggregator else [primary_knowledge_source] + }) + last_aggregator = aggregator_knowledge_source + # store the last aggregator in the list, because this will be an upstream source for the plater one + terminal_aggregators.add(last_aggregator) + # add the automat infores as an aggregator, + # it will have as upstream either the primary ks or all of the furthest downstream aggregators if they exist formatted_sources.append({ - "resource_id":self.provenance, + "resource_id": self.provenance, "resource_role": "aggregator_knowledge_source", - "upstream_resource_ids": upstreams_for_plater_entry + "upstream_resource_ids": list(terminal_aggregators) if terminal_aggregators else [primary_knowledge_source] }) - return formatted_sources - + return list(formatted_sources) def format_attribute_trapi(self, kg_items, node=False): for identifier in kg_items: @@ -152,15 +164,14 @@ def format_attribute_trapi(self, kg_items, node=False): return kg_items - def transform_attributes(self, trapi_message, graph_interface: GraphInterface): + def transform_attributes(self, trapi_message): self.format_attribute_trapi(trapi_message.get('knowledge_graph', {}).get('nodes', {}), node=True) self.format_attribute_trapi(trapi_message.get('knowledge_graph', {}).get('edges', {})) for r in trapi_message.get("results", []): for node_binding_list in r["node_bindings"].values(): for node_binding in node_binding_list: - query_id = node_binding.pop('query_id', None) - if query_id != node_binding['id'] and query_id is not None: - node_binding['query_id'] = query_id + if node_binding['query_id'] is None: + del node_binding['query_id'] # add resource id for analyses in r["analyses"]: analyses["resource_id"] = self.provenance @@ -197,7 +208,7 @@ async def answer(self, graph_interface: GraphInterface): } ) results_dict = graph_interface.convert_to_dict(results) - self._question_json.update(self.transform_attributes(results_dict[0], graph_interface)) + self._question_json.update(self.transform_attributes(results_dict[0])) self._question_json = Question.apply_attribute_constraints(self._question_json) return self._question_json diff --git a/PLATER/tests/data/trapi1.4.json b/PLATER/tests/data/trapi1.4.json index c7db35a..97d0291 100644 --- a/PLATER/tests/data/trapi1.4.json +++ b/PLATER/tests/data/trapi1.4.json @@ -206,15 +206,15 @@ "predicate": "biolink:causes", "sources": [ { - "resource_role": "biolink:aggregator_knowledge_source", + "resource_role": "aggregator_knowledge_source", "resource_id": "infores:hetio" }, { - "resource_role": "biolink:primary_knowledge_source", + "resource_role": "primary_knowledge_source", "resource_id": "infores:sider" }, { - "resource_role": "biolink:aggregator_knowledge_source", + "resource_role": "aggregator_knowledge_source", "resource_id": "infores:automat-robokop" } ], @@ -233,13 +233,13 @@ "predicate": "biolink:causes", "sources": [ { - "resource_role": "biolink:aggregator_knowledge_source", + "resource_role": "aggregator_knowledge_source", "resource_id": "infores:hetio" },{ - "resource_role": "biolink:primary_knowledge_source", + "resource_role": "primary_knowledge_source", "resource_id": "infores:original_source_1" },{ - "resource_role": "biolink:aggregator_knowledge_source", + "resource_role": "aggregator_knowledge_source", "resource_id": "infores:automat-robokop" } ], @@ -262,10 +262,10 @@ "predicate": "biolink:contributes_to", "sources": [ { - "resource_role": "biolink:primary_knowledge_source", + "resource_role": "primary_knowledge_source", "resource_id": "ctd" },{ - "resource_role": "biolink:aggregator_knowledge_source", + "resource_role": "aggregator_knowledge_source", "resource_id": "infores:automat-robokop" } ], diff --git a/PLATER/tests/test_question.py b/PLATER/tests/test_question.py index d35df4b..6a3cc93 100644 --- a/PLATER/tests/test_question.py +++ b/PLATER/tests/test_question.py @@ -39,7 +39,7 @@ def test_format_attribute(): {"123123": { "attributes": [{"original_attribute_name": "some_attr", "value": "some_value"}], - "sources": [{"resource_role": "biolink:primary_knowledge_source", "resource_id":"infores:primary"}] + "sources": [{"resource_role": "primary_knowledge_source", "resource_id": "infores:primary"}] } } } @@ -55,21 +55,18 @@ def test_format_attribute(): "attribute_type_id": "biolink:Attribute", "value_type_id": "EDAM:data_0006"}, ], - "sources": [ - {"resource_role": "primary_knowledge_source", - "resource_id": "infores:primary", - "upstream_resource_ids": None}, - {"resource_role": "aggregator_knowledge_source", - "resource_id": "infores:automat.notspecified", - "upstream_resource_ids": {"infores:primary"}}, + {"resource_id": "infores:primary", + "resource_role": "primary_knowledge_source",}, + {"resource_id": "infores:automat.notspecified", + "resource_role": "aggregator_knowledge_source", + "upstream_resource_ids": ["infores:primary"]}, ]} } } } q = Question(question_json={}) - graph_interface = MOCK_GRAPH_ADAPTER() - transformed = q.transform_attributes(trapi_kg_response, graph_interface=MOCK_GRAPH_ADAPTER) + transformed = q.transform_attributes(trapi_kg_response) # test attribute_id if provided from neo4j response is preserved # test if value_type is added to default 'biolink:Attribute' @@ -96,7 +93,7 @@ def test_format_attribute(): q = Question(question_json={}) - transformed = q.transform_attributes(t2_trapi_kg_response, graph_interface=MOCK_GRAPH_ADAPTER) + transformed = q.transform_attributes(t2_trapi_kg_response) # test default attribute to be EDAM:data_0006 # test if value_type is preserved if in response from neo4j @@ -112,6 +109,10 @@ def test_format_edge_qualifiers(): "object": "NCBIGene:283871", "predicate": "biolink:affects", "subject": "PUBCHEM.COMPOUND:5311062", + "sources": [ + {"resource_role": "primary_knowledge_source", + "resource_id": "infores:primary"} + ], "attributes": [ { "attribute_type_id":"NA", @@ -136,10 +137,13 @@ def test_format_edge_qualifiers(): 'predicate': 'biolink:affects', 'subject': 'PUBCHEM.COMPOUND:5311062', 'attributes': [], - 'sources': [{'resource_id': 'infores:automat.notspecified', - 'resource_role': 'aggregator_knowledge_source', - 'upstream_resource_ids': None - }], + 'sources': [ + {"resource_role": "primary_knowledge_source", + "resource_id": "infores:primary"}, + {"resource_role": "aggregator_knowledge_source", + "resource_id": "infores:automat.notspecified", + "upstream_resource_ids": ["infores:primary"]}, + ], "qualifiers": [ { "qualifier_type_id": "biolink:qualified_predicate", @@ -158,8 +162,7 @@ def test_format_edge_qualifiers(): }} q = Question(question_json={}) - graph_interface = MOCK_GRAPH_ADAPTER() - transformed = q.transform_attributes(trapi_kg_response, graph_interface=MOCK_GRAPH_ADAPTER) + transformed = q.transform_attributes(trapi_kg_response) # test attribute_id if provided from neo4j response is preserved # test if value_type is added to default "biolink:Attribute"