Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collapsed qualifiers kg #268

Merged
merged 3 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 45 additions & 5 deletions Common/build_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
from Common.biolink_constants import PRIMARY_KNOWLEDGE_SOURCE, AGGREGATOR_KNOWLEDGE_SOURCES, PREDICATE, PUBLICATIONS
from Common.meta_kg import MetaKnowledgeGraphBuilder, META_KG_FILENAME, TEST_DATA_FILENAME
from Common.redundant_kg import generate_redundant_kg
from Common.collapse_qualifiers import generate_collapsed_qualifiers_kg

NODES_FILENAME = 'nodes.jsonl'
EDGES_FILENAME = 'edges.jsonl'
REDUNDANT_EDGES_FILENAME = 'redundant_edges.jsonl'
COLLAPSED_QUALIFIERS_FILENAME = 'collapsed_qualifier_edges.jsonl'


class GraphBuilder:
Expand Down Expand Up @@ -115,6 +117,49 @@ def build_graph(self, graph_id: str):
output_formats = graph_spec.graph_output_format.lower().split('+') if graph_spec.graph_output_format else []
nodes_filepath = os.path.join(graph_output_dir, NODES_FILENAME)
edges_filepath = os.path.join(graph_output_dir, EDGES_FILENAME)

if 'redundant_jsonl' in output_formats:
self.logger.info(f'Generating redundant edge KG for {graph_id}...')
redundant_filepath = edges_filepath.replace(EDGES_FILENAME, REDUNDANT_EDGES_FILENAME)
generate_redundant_kg(edges_filepath, redundant_filepath)

if 'redundant_neo4j' in output_formats:
self.logger.info(f'Generating redundant edge KG for {graph_id}...')
redundant_filepath = edges_filepath.replace(EDGES_FILENAME, REDUNDANT_EDGES_FILENAME)
generate_redundant_kg(edges_filepath, redundant_filepath)
self.logger.info(f'Starting Neo4j dump pipeline for redundant {graph_id}...')
dump_success = create_neo4j_dump(nodes_filepath=nodes_filepath,
edges_filepath=redundant_filepath,
output_directory=graph_output_dir,
graph_id=graph_id,
graph_version=graph_version,
logger=self.logger)

if dump_success:
graph_output_url = self.get_graph_output_URL(graph_id, graph_version)
graph_metadata.set_dump_url(f'{graph_output_url}graph_{graph_version}_redundant.db.dump')

if 'collapsed_qualifiers_jsonl' in output_formats:
self.logger.info(f'Generating collapsed qualifier predicates KG for {graph_id}...')
collapsed_qualifiers_filepath = edges_filepath.replace(EDGES_FILENAME, COLLAPSED_QUALIFIERS_FILENAME)
generate_collapsed_qualifiers_kg(edges_filepath, collapsed_qualifiers_filepath)

if 'collapsed_qualifiers_neo4j' in output_formats:
self.logger.info(f'Generating collapsed qualifier predicates KG for {graph_id}...')
collapsed_qualifiers_filepath = edges_filepath.replace(EDGES_FILENAME, COLLAPSED_QUALIFIERS_FILENAME)
generate_collapsed_qualifiers_kg(edges_filepath, collapsed_qualifiers_filepath)
self.logger.info(f'Starting Neo4j dump pipeline for {graph_id} with collapsed qualifiers...')
dump_success = create_neo4j_dump(nodes_filepath=nodes_filepath,
edges_filepath=collapsed_qualifiers_filepath,
output_directory=graph_output_dir,
graph_id=graph_id,
graph_version=graph_version,
logger=self.logger)

if dump_success:
graph_output_url = self.get_graph_output_URL(graph_id, graph_version)
graph_metadata.set_dump_url(f'{graph_output_url}graph_{graph_version}_collapsed_qualifiers.db.dump')

if 'neo4j' in output_formats:
self.logger.info(f'Starting Neo4j dump pipeline for {graph_id}...')
dump_success = create_neo4j_dump(nodes_filepath=nodes_filepath,
Expand All @@ -128,11 +173,6 @@ def build_graph(self, graph_id: str):
graph_output_url = self.get_graph_output_URL(graph_id, graph_version)
graph_metadata.set_dump_url(f'{graph_output_url}graph_{graph_version}.db.dump')

if 'redundant_jsonl' in output_formats:
self.logger.info(f'Generating redundant edge KG for {graph_id}...')
redundant_filepath = edges_filepath.replace(EDGES_FILENAME, REDUNDANT_EDGES_FILENAME)
generate_redundant_kg(edges_filepath, redundant_filepath)

def build_dependencies(self, graph_spec: GraphSpec):
for subgraph_source in graph_spec.subgraphs:
subgraph_id = subgraph_source.id
Expand Down
92 changes: 92 additions & 0 deletions Common/collapse_qualifiers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
try:
from tqdm import tqdm
TQDM_AVAILABLE = True
except ImportError:
TQDM_AVAILABLE = False

from Common.biolink_constants import OBJECT_ASPECT_QUALIFIER, OBJECT_DIRECTION_QUALIFIER, SPECIES_CONTEXT_QUALIFIER, \
QUALIFIED_PREDICATE, PREDICATE
from Common.utils import quick_jsonl_file_iterator
from Common.kgx_file_writer import KGXFileWriter

### The goal of this script is to collapse the qualifiers, which are in edge properties, into a single statement, then replace the
### existing predicate label with the collapsed qualifier statement.


# TODO - really we should get the full list of qualifiers from Common/biolink_constants.py,
# but because we currently cannot deduce the association types of edges and/or permissible value enumerators,
# we have to hard code qualifier handling anyway, we might as well check against a smaller list
QUALIFIER_KEYS = [OBJECT_DIRECTION_QUALIFIER, OBJECT_ASPECT_QUALIFIER]
# we do have these qualifiers, but we can't do any redundancy with them, so ignore them for now:
# QUALIFIED_PREDICATE -
# SPECIES_CONTEXT_QUALIFIER -
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These comments aren't as applicable to collapse_qualifiers as they were for the redundant graph. For the redundant graph we needed to be able to look up the ancestors of the value of the qualifier, but to do that you need to know the qualifier value enum_name, and I wasn't sure how to derive that given an edge from ORION, so we hard coded what we needed at the time (which is still not ideal). In this case, we only care about qualifiers and values which are actually on the edge, so we should be able to capture them all. We can do this dynamically easily using the biolink model toolkit.

Getting the full list of qualifiers from the constants file was a bad idea anyway, they shouldn't be hard coded if the biolink model toolkit can supply the current list. I'm not sure why I said that. :)


def write_edge_no_q(edge, predicate):
    """Build a copy of ``edge`` that carries ``predicate`` and no collapsed qualifiers.

    The input edge is left untouched. On the returned shallow copy, the
    predicate property is overwritten with the string form of ``predicate``,
    and the object direction qualifier, object aspect qualifier and qualified
    predicate properties are dropped if they are present.

    :param edge: mapping of edge properties.
    :param predicate: replacement predicate; it is formatted into a string.
    :return: a new dict with the replaced predicate and without those qualifiers.
    """
    collapsed_edge = edge.copy()
    collapsed_edge[PREDICATE] = f"{predicate}"
    # These properties have been folded into the new predicate, so remove them.
    for folded_key in (OBJECT_DIRECTION_QUALIFIER, OBJECT_ASPECT_QUALIFIER, QUALIFIED_PREDICATE):
        collapsed_edge.pop(folded_key, None)
    return collapsed_edge

#
def object_direction_qualifier_semantic_adjustment(object_direction_qualifier):
    """Convert a direction qualifier value into its verb form for use in a predicate.

    Known past-tense values ('increased', 'decreased', 'upregulated',
    'downregulated') are mapped to present tense; any other value is returned
    unchanged.

    :param object_direction_qualifier: the direction qualifier value from an edge.
    :return: the converted value, or the input itself when no conversion is defined.
    """
    verb_forms = {
        'increased': 'increases',
        'decreased': 'decreases',
        'upregulated': 'upregulates',
        'downregulated': 'downregulates',
    }
    # Fall back to the original value when there is no known verb form.
    return verb_forms.get(object_direction_qualifier, object_direction_qualifier)

def object_aspect_qualifier_semantic_adjustment(object_aspect_qualifier):
    """Append the preposition that lets an aspect qualifier read naturally in a predicate.

    Molecular-interaction aspects get the suffix ``"_with"`` (e.g.
    ``molecular_interaction_with``); every other aspect gets ``"_of"``
    (e.g. ``activity_of``).

    :param object_aspect_qualifier: the aspect qualifier value from an edge (a string).
    :return: the aspect value with ``"_with"`` or ``"_of"`` appended.
    """
    # TODO check if other object aspect qualifiers besides molecular interaction need to be treated differently.
    # NOTE: the previous check compared split('_')[-1] with 'molecular_interaction'.
    # A piece produced by splitting on '_' can never contain an underscore, so
    # that comparison was always False and "_with" was never used.
    is_molecular_interaction = (
        object_aspect_qualifier == 'molecular_interaction'
        or object_aspect_qualifier.endswith('_molecular_interaction')
    )
    suffix = "_with" if is_molecular_interaction else "_of"
    return object_aspect_qualifier + suffix

def generate_collapsed_qualifiers_kg(infile, edges_file_path):

with KGXFileWriter(edges_output_file_path=edges_file_path) as kgx_file_writer:
for edge in tqdm(quick_jsonl_file_iterator(infile)) if TQDM_AVAILABLE else quick_jsonl_file_iterator(infile):

try:
edge_predicate = edge['predicate']
except KeyError:
print(f"Collapsed Qualifiers Graph Failed - missing predicate on edge: {edge}")
break

# qualifiers = check_qualifier(edge) <- it would be better to do something like this but because we're not
# handling other qualifiers anyway it's faster to just do the following:
qualifiers = [qualifier for qualifier in QUALIFIER_KEYS if qualifier in edge]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of using this hard coded list, we can do something like the following (except you would want to instantiate the toolkit outside of the loop):

from Common.biolink_utils import get_biolink_model_toolkit
bmt = get_biolink_model_toolkit()
qualifiers = {key:value for key, value in edge.items() if bmt.is_qualifier(key)}

Then you could do something like the following, with a function that does your semantic transformations where applicable
for qualifier, qualifier_value in qualifiers.items():
qualifier_statement += semantic_adjustment(qualifier, qualifier_value)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestions. I've added a new commit that handles all of the qualifiers. There are still some hard-coded decisions involved with the current qualifiers, but it will print a warning if new qualifiers that aren't handled here are ever found in edges.jsonl files in the future.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looping through the qualifiers once you determine them like I mentioned would be better because it's more efficient (it would only attempt to handle the qualifiers that are actually there instead of checking if every possible qualifier is on every edge), and cleaner/more maintainable, mostly because the current implementation violates the DRY principle.

It would also allow you to check against fewer things per qualifier using if/elif and remove the counting part:
if qualifier_key == qualifier_type_x:
...
elif qualifier_key == qualifier_type_y:
...
else:
print(qualifier_key not supported)

All that being said, it looks like it should all work and won't break or affect other parts of ORION, so it's fine with me to merge in if you'd like.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be even better to do a lookup of functions or something like that:

semantic_adjustments = {
qualifier_key_1: some_semantic_adjustment_function,
qualifier_key_2: some_other_semantic_adjustment_function,
}

if qualifier_key in semantic_adjustments:
adjusted_qualifier = semantic_adjustments[qualifier_key](qualifier_key, qualifier_value)


qualifier_statement = ""

object_direction_qualifier_exists = False
# The following crafts a new collapsed qualifier statement to replace the edge predicate, but needs to do some semantic adjustment.
if OBJECT_DIRECTION_QUALIFIER in qualifiers:
object_direction_qualifier_exists = True
qualifier_statement+= object_direction_qualifier_semantic_adjustment(edge[OBJECT_DIRECTION_QUALIFIER])

if OBJECT_ASPECT_QUALIFIER in qualifiers:
if object_direction_qualifier_exists == True:
qualifier_statement+= "_"
else: # Currently, we'll just say "affects_something" if no direction is specified.
qualifier_statement+= "affects_"
qualifier_statement+= object_aspect_qualifier_semantic_adjustment(edge[OBJECT_ASPECT_QUALIFIER])

edges_to_write = []

# Either rewrite the original edge if no qualifier collapsing happened, or rewrite with new predicate from qualifier_statement.
if qualifier_statement != "":
edges_to_write.append(write_edge_no_q(edge, qualifier_statement))
else:
edges_to_write.append(edge)

kgx_file_writer.write_normalized_edges(edges_to_write)
Loading