Skip to content

Commit

Permalink
sources refactor - primary and aggregator knowledge sources no longer…
Browse files Browse the repository at this point in the history
… have biolink prefix in the graphs, implements ability to specify multiple chains of aggregators, aggregator lists are expected to be ordered now
  • Loading branch information
EvanDietzMorris committed Mar 22, 2024
1 parent ba4900c commit 7257113
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 62 deletions.
85 changes: 48 additions & 37 deletions PLATER/services/util/question.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,44 +61,56 @@ def compile_cypher(self, **kwargs):
item['qualifier_type_id'] = item['qualifier_type_id'].replace('biolink:', '')
return get_query(query_graph, **kwargs)


# This function takes 'sources' results from the transpiler, converts lists of aggregator sources into the proper
# TRAPI dictionaries, and assigns the proper upstream ids to each resource. It does not currently attempt to avoid
# duplicate aggregator results, which probably shouldn't ever occur.
def _construct_sources_tree(self, sources):
    """Build the TRAPI ``sources`` list for one edge from transpiler results.

    Each item of ``sources`` is a dict with a ``resource_role`` and a
    ``resource_id``. Two kinds of roles are used here:

    * ``primary_knowledge_source`` - ``resource_id`` is the primary source.
      If several entries have this role, the last one wins.
    * any role starting with ``aggregator_knowledge_source`` (for example
      ``aggregator_knowledge_source_2``) - ``resource_id`` is an ordered
      list of aggregators, furthest upstream first. Several such entries
      describe parallel provenance chains.

    The result contains, in order: the primary source, every aggregator of
    every chain (each pointing at the previous aggregator of its chain, or
    at the primary source for the first one), and finally an aggregator
    entry for ``self.provenance`` whose upstreams are the last aggregator
    of each chain, or the primary source when there are no aggregators.

    Duplicate aggregator entries across chains are not removed.

    :param sources: iterable of ``{"resource_role": ..., "resource_id": ...}``
        dicts.
    :return: list of TRAPI retrieval-source dicts, or ``[]`` when no primary
        knowledge source is present.
    """
    primary_knowledge_source = None
    aggregator_chains = []
    for source in sources:
        role = source['resource_role']
        if role == "primary_knowledge_source":
            primary_knowledge_source = source['resource_id']
        elif role.startswith("aggregator_knowledge_source"):
            # this looks weird but the idea is that you could have a few parallel lists like:
            # aggregator_knowledge_source, aggregator_knowledge_source_2, aggregator_knowledge_source_3
            chain = source['resource_id']
            if isinstance(chain, str):
                # a bare string is a chain of one; iterating it would yield characters
                chain = [chain]
            # drop missing/empty ids so they never show up as upstream resources
            chain = [aggregator for aggregator in (chain or []) if aggregator]
            if chain:
                aggregator_chains.append(chain)

    if not primary_knowledge_source:
        # we could hard fail here, every edge should have a primary ks, but not all tests provide one yet
        # raise KeyError(f'primary_knowledge_source missing from sources section of cypher results! '
        #                f'sources: {sources}')
        return []

    formatted_sources = [{
        "resource_id": primary_knowledge_source,
        "resource_role": "primary_knowledge_source"
    }]

    # walk through the aggregator lists and construct the chains of provenance;
    # each list is expected to be in order, so every aggregator's upstream is the one before it
    terminal_aggregators = []
    for chain in aggregator_chains:
        upstream = primary_knowledge_source
        for aggregator in chain:
            formatted_sources.append({
                "resource_id": aggregator,
                "resource_role": "aggregator_knowledge_source",
                "upstream_resource_ids": [upstream]
            })
            upstream = aggregator
        # the last aggregator of each chain is an upstream of this service's own entry;
        # a list (not a set) keeps the output order deterministic
        if upstream not in terminal_aggregators:
            terminal_aggregators.append(upstream)

    # add the automat infores as an aggregator,
    # it will have as upstream either the primary ks or all of the furthest downstream aggregators if they exist
    formatted_sources.append({
        "resource_id": self.provenance,
        "resource_role": "aggregator_knowledge_source",
        "upstream_resource_ids": terminal_aggregators or [primary_knowledge_source]
    })
    return formatted_sources

def format_attribute_trapi(self, kg_items, node=False):
for identifier in kg_items:
Expand Down Expand Up @@ -152,15 +164,14 @@ def format_attribute_trapi(self, kg_items, node=False):

return kg_items

def transform_attributes(self, trapi_message, graph_interface: GraphInterface):
def transform_attributes(self, trapi_message):
self.format_attribute_trapi(trapi_message.get('knowledge_graph', {}).get('nodes', {}), node=True)
self.format_attribute_trapi(trapi_message.get('knowledge_graph', {}).get('edges', {}))
for r in trapi_message.get("results", []):
for node_binding_list in r["node_bindings"].values():
for node_binding in node_binding_list:
query_id = node_binding.pop('query_id', None)
if query_id != node_binding['id'] and query_id is not None:
node_binding['query_id'] = query_id
if node_binding['query_id'] is None:
del node_binding['query_id']
# add resource id
for analyses in r["analyses"]:
analyses["resource_id"] = self.provenance
Expand Down Expand Up @@ -197,7 +208,7 @@ async def answer(self, graph_interface: GraphInterface):
}
)
results_dict = graph_interface.convert_to_dict(results)
self._question_json.update(self.transform_attributes(results_dict[0], graph_interface))
self._question_json.update(self.transform_attributes(results_dict[0]))
self._question_json = Question.apply_attribute_constraints(self._question_json)
return self._question_json

Expand Down
16 changes: 8 additions & 8 deletions PLATER/tests/data/trapi1.4.json
Original file line number Diff line number Diff line change
Expand Up @@ -206,15 +206,15 @@
"predicate": "biolink:causes",
"sources": [
{
"resource_role": "biolink:aggregator_knowledge_source",
"resource_role": "aggregator_knowledge_source",
"resource_id": "infores:hetio"
},
{
"resource_role": "biolink:primary_knowledge_source",
"resource_role": "primary_knowledge_source",
"resource_id": "infores:sider"
},
{
"resource_role": "biolink:aggregator_knowledge_source",
"resource_role": "aggregator_knowledge_source",
"resource_id": "infores:automat-robokop"
}
],
Expand All @@ -233,13 +233,13 @@
"predicate": "biolink:causes",
"sources": [
{
"resource_role": "biolink:aggregator_knowledge_source",
"resource_role": "aggregator_knowledge_source",
"resource_id": "infores:hetio"
},{
"resource_role": "biolink:primary_knowledge_source",
"resource_role": "primary_knowledge_source",
"resource_id": "infores:original_source_1"
},{
"resource_role": "biolink:aggregator_knowledge_source",
"resource_role": "aggregator_knowledge_source",
"resource_id": "infores:automat-robokop"
}
],
Expand All @@ -262,10 +262,10 @@
"predicate": "biolink:contributes_to",
"sources": [
{
"resource_role": "biolink:primary_knowledge_source",
"resource_role": "primary_knowledge_source",
"resource_id": "ctd"
},{
"resource_role": "biolink:aggregator_knowledge_source",
"resource_role": "aggregator_knowledge_source",
"resource_id": "infores:automat-robokop"
}
],
Expand Down
37 changes: 20 additions & 17 deletions PLATER/tests/test_question.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def test_format_attribute():
{"123123":
{
"attributes": [{"original_attribute_name": "some_attr", "value": "some_value"}],
"sources": [{"resource_role": "biolink:primary_knowledge_source", "resource_id":"infores:primary"}]
"sources": [{"resource_role": "primary_knowledge_source", "resource_id": "infores:primary"}]
}
}
}
Expand All @@ -55,21 +55,18 @@ def test_format_attribute():
"attribute_type_id": "biolink:Attribute",
"value_type_id": "EDAM:data_0006"},
],

"sources": [
{"resource_role": "primary_knowledge_source",
"resource_id": "infores:primary",
"upstream_resource_ids": None},
{"resource_role": "aggregator_knowledge_source",
"resource_id": "infores:automat.notspecified",
"upstream_resource_ids": {"infores:primary"}},
{"resource_id": "infores:primary",
"resource_role": "primary_knowledge_source",},
{"resource_id": "infores:automat.notspecified",
"resource_role": "aggregator_knowledge_source",
"upstream_resource_ids": ["infores:primary"]},
]}
}
}
}
q = Question(question_json={})
graph_interface = MOCK_GRAPH_ADAPTER()
transformed = q.transform_attributes(trapi_kg_response, graph_interface=MOCK_GRAPH_ADAPTER)
transformed = q.transform_attributes(trapi_kg_response)

# test attribute_id if provided from neo4j response is preserved
# test if value_type is added to default 'biolink:Attribute'
Expand All @@ -96,7 +93,7 @@ def test_format_attribute():

q = Question(question_json={})

transformed = q.transform_attributes(t2_trapi_kg_response, graph_interface=MOCK_GRAPH_ADAPTER)
transformed = q.transform_attributes(t2_trapi_kg_response)

# test default attribute to be EDAM:data_0006
# test if value_type is preserved if in response from neo4j
Expand All @@ -112,6 +109,10 @@ def test_format_edge_qualifiers():
"object": "NCBIGene:283871",
"predicate": "biolink:affects",
"subject": "PUBCHEM.COMPOUND:5311062",
"sources": [
{"resource_role": "primary_knowledge_source",
"resource_id": "infores:primary"}
],
"attributes": [
{
"attribute_type_id":"NA",
Expand All @@ -136,10 +137,13 @@ def test_format_edge_qualifiers():
'predicate': 'biolink:affects',
'subject': 'PUBCHEM.COMPOUND:5311062',
'attributes': [],
'sources': [{'resource_id': 'infores:automat.notspecified',
'resource_role': 'aggregator_knowledge_source',
'upstream_resource_ids': None
}],
'sources': [
{"resource_role": "primary_knowledge_source",
"resource_id": "infores:primary"},
{"resource_role": "aggregator_knowledge_source",
"resource_id": "infores:automat.notspecified",
"upstream_resource_ids": ["infores:primary"]},
],
"qualifiers": [
{
"qualifier_type_id": "biolink:qualified_predicate",
Expand All @@ -158,8 +162,7 @@ def test_format_edge_qualifiers():
}}

q = Question(question_json={})
graph_interface = MOCK_GRAPH_ADAPTER()
transformed = q.transform_attributes(trapi_kg_response, graph_interface=MOCK_GRAPH_ADAPTER)
transformed = q.transform_attributes(trapi_kg_response)

# test attribute_id if provided from neo4j response is preserved
# test if value_type is added to default "biolink:Attribute"
Expand Down

0 comments on commit 7257113

Please sign in to comment.