diff --git a/DAG/kgx-export-parallel.py b/DAG/kgx-export-parallel.py index fa60f21..f8103cd 100644 --- a/DAG/kgx-export-parallel.py +++ b/DAG/kgx-export-parallel.py @@ -1,34 +1,74 @@ import os +import json import gzip +from airflow.operators.python_operator import PythonOperator from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta from airflow import models -from airflow.contrib.operators import kubernetes_pod_operator +from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import ( + KubernetesPodOperator, +) +from kubernetes.client import models as k8s_models MYSQL_DATABASE_PASSWORD=os.environ.get('MYSQL_DATABASE_PASSWORD') MYSQL_DATABASE_USER=os.environ.get('MYSQL_DATABASE_USER') MYSQL_DATABASE_INSTANCE=os.environ.get('MYSQL_DATABASE_INSTANCE') PR_BUCKET = os.environ.get('PR_BUCKET') UNI_BUCKET = os.environ.get('UNI_BUCKET') -START_DATE=datetime(2023, 2, 23, 0, 0) +TMP_BUCKET = os.environ.get('TMP_BUCKET') +START_DATE=datetime(2023, 8, 7, 0, 0) CHUNK_SIZE = '1000' EVIDENCE_LIMIT = '5' -STEP_SIZE = 80000 +STEP_SIZE = 75000 default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': START_DATE, + 'schedule_interval': '0 23 * * *', 'email': ['edgargaticaCU@gmail.com'], - 'email_on_failure': False, - 'email_on_retry': False, + 'email_on_failure': True, + 'email_on_retry': True, 'retries': 0 } +def output_operations(**kwargs): + operations_dict = {} + with open(kwargs['edges_filename'], 'r') as infile: + for line in infile: + columns = line.split('\t') + if len(columns) < 13: + print('not enough columns') + print(line) + continue + subject_namespace = columns[0].split(':')[0] + object_namespace = columns[2].split(':')[0] + predicate = 'no_predicate' + if columns[1] == 'biolink:treats': + predicate = 'treats' + elif columns[1] == 'biolink:contributes_to': + predicate = 'contributes_to' + elif columns[1] == 'biolink:affects': + if columns[3] == 'biolink:causes': + if columns[8] == 'activity_or_abundance' and columns[9] == 'increased': + predicate = 'positively_regulates' + elif columns[8] == 'activity_or_abundance' and columns[9] == 'decreased': + predicate = 'negatively_regulates' + elif columns[3] == 'biolink:contributes_to': + if columns[7] == 'gain_of_function_variant_form': + predicate = 'gain_of_function_contributes_to' + elif columns[7] == 'loss_of_function_variant_form': + predicate = 'loss_of_function_contributes_to' + key = subject_namespace + '_' + predicate + '_' + object_namespace + if key in operations_dict: + operations_dict[key] += 1 + else: + operations_dict[key] = 1 + with open(kwargs['output_filename'], 'w') as outfile: + x = outfile.write(json.dumps(operations_dict)) -with models.DAG(dag_id='targeted-parallel', default_args=default_args, - schedule_interval=timedelta(days=1), start_date=START_DATE, catchup=False) as dag: +with models.DAG(dag_id='targeted-parallel', default_args=default_args, catchup=True) as dag: filename_list = [] export_task_list = [] # This creates as many pods as needed to export all assertion records in groups of STEP_SIZE, which are then run in @@ -38,13 +78,15 @@ # rather than 15 longer tasks that run all at once. 
# TODO: the upper limit on the range needs to be the total number of assertion records for i in range(0, 2400000, STEP_SIZE): - filename_list.append(f'gs://{UNI_BUCKET}/kgx/UniProt/edges_{i}_{i + STEP_SIZE}.tsv') - export_task_list.append(kubernetes_pod_operator.KubernetesPodOperator( + filename_list.append(f'edges_{i}_{i + STEP_SIZE}.tsv') + export_task_list.append(KubernetesPodOperator( task_id=f'targeted-edges-{i}', name=f'parallel-{i}', - namespace='default', + config_file="/home/airflow/composer_kube_config", + namespace='composer-user-workloads', image_pull_policy='Always', - arguments=['-t', 'edges', '-uni', UNI_BUCKET, + startup_timeout_seconds=1200, + arguments=['-t', 'edges', '-uni', TMP_BUCKET, '--chunk_size', CHUNK_SIZE, '--limit', EVIDENCE_LIMIT, '--assertion_offset', f'{i}', '--assertion_limit', f'{STEP_SIZE}'], env_vars={ @@ -52,32 +94,51 @@ 'MYSQL_DATABASE_USER': MYSQL_DATABASE_USER, 'MYSQL_DATABASE_INSTANCE': MYSQL_DATABASE_INSTANCE, }, + container_resources=k8s_models.V1ResourceRequirements( + limits={"memory": "1G", "cpu": "1000m"}, + ), + retries=1, image='gcr.io/translator-text-workflow-dev/kgx-export-parallel:latest' )) - export_nodes = kubernetes_pod_operator.KubernetesPodOperator( + export_nodes = KubernetesPodOperator( task_id='targeted-nodes', name='nodes', - namespace='default', + config_file="/home/airflow/composer_kube_config", + namespace='composer-user-workloads', image_pull_policy='Always', - arguments=['-t', 'nodes', '-uni', UNI_BUCKET], + arguments=['-t', 'nodes', '-uni', TMP_BUCKET], env_vars={ 'MYSQL_DATABASE_PASSWORD': MYSQL_DATABASE_PASSWORD, 'MYSQL_DATABASE_USER': MYSQL_DATABASE_USER, 'MYSQL_DATABASE_INSTANCE': MYSQL_DATABASE_INSTANCE, }, image='gcr.io/translator-text-workflow-dev/kgx-export-parallel:latest') - generate_metadata = kubernetes_pod_operator.KubernetesPodOperator( + generate_metadata = KubernetesPodOperator( task_id='targeted-metadata', name='targeted-metadata', - namespace='default', + config_file="/home/airflow/composer_kube_config", + namespace='composer-user-workloads', image_pull_policy='Always', - arguments=['-t', 'metadata', '-uni', UNI_BUCKET], + arguments=['-t', 'metadata', '-uni', TMP_BUCKET], image='gcr.io/translator-text-workflow-dev/kgx-export-parallel:latest') + generate_bte_operations = PythonOperator( + task_id='generate_bte_operations', + python_callable=output_operations, + provide_context=True, + op_kwargs={'edges_filename': '/home/airflow/gcs/data/kgx-export/edges.tsv', + 'output_filename': '/home/airflow/gcs/data/kgx-export/operations.json'}, + dag=dag) combine_files = BashOperator( task_id='targeted-compose', - bash_command=f"gsutil compose {' '.join(filename_list)} gs://{UNI_BUCKET}/kgx/UniProt/edges.tsv") + bash_command=f"cd /home/airflow/gcs/data/kgx-export/ && cat {' '.join(filename_list)} > edges.tsv") + compress_edge_file = BashOperator( + task_id='targeted-compress', + bash_command=f"cd /home/airflow/gcs/data/kgx-export/ && gzip -f edges.tsv") cleanup_files = BashOperator( task_id='targeted-cleanup', - bash_command=f"gsutil rm {' '.join(filename_list)} gs://{UNI_BUCKET}/kgx/UniProt/edges.tsv") + bash_command=f"cd /home/airflow/gcs/data/kgx-export/ && rm {' '.join(filename_list)}") + publish_files = BashOperator( + task_id='targeted-publish', + bash_command=f"gsutil cp gs://{TMP_BUCKET}/data/kgx-export/* gs://{UNI_BUCKET}/kgx/UniProt/") - export_nodes >> export_task_list >> combine_files >> generate_metadata >> cleanup_files + export_nodes >> export_task_list >> combine_files >> generate_bte_operations >> 
compress_edge_file >> cleanup_files >> generate_metadata >> publish_files diff --git a/exporter.py b/exporter.py index 589769c..ed9ab9f 100644 --- a/exporter.py +++ b/exporter.py @@ -14,17 +14,24 @@ GCP_BLOB_PREFIX = 'kgx/UniProt/' def export_metadata(bucket): - services.get_from_gcp(bucket, GCP_BLOB_PREFIX + 'edges.tsv', 'edges.tsv') + """ + Generate a metadata file from previously created KGX export files + + :param bucket: the GCP storage bucket containing the KGX files + """ + services.get_from_gcp(bucket, GCP_BLOB_PREFIX + 'edges.tsv.gz', 'edges.tsv.gz') services.get_from_gcp(bucket, GCP_BLOB_PREFIX + 'nodes.tsv.gz', 'nodes.tsv.gz') - services.decompress('nodes.tsv.gz', 'nodes.tsv') - services.generate_metadata('edges.tsv', 'nodes.tsv', 'KGE') - services.compress('edges.tsv', 'edges.tsv.gz') - services.upload_to_gcp(bucket, 'edges.tsv.gz', GCP_BLOB_PREFIX + 'edges.tsv.gz') + services.generate_metadata('edges.tsv.gz', 'nodes.tsv.gz', 'KGE') services.upload_to_gcp(bucket, 'KGE/content_metadata.json', GCP_BLOB_PREFIX + 'content_metadata.json') - services.upload_to_gcp(bucket, 'targeted_assertions.tar.gz', GCP_BLOB_PREFIX + 'targeted_assertions.tar.gz') def get_valid_nodes(bucket) -> set[str]: + """ + Retrieve the set of nodes used by a KGX nodes file + + :param bucket: the GCP storage bucket containing the KGX file + :returns a set of node curies + """ services.get_from_gcp(bucket, GCP_BLOB_PREFIX + 'nodes.tsv.gz', 'nodes.tsv.gz') node_set = set([]) with gzip.open('nodes.tsv.gz', 'rb') as infile: @@ -57,7 +64,7 @@ def get_conn() -> pymysql.connections.Connection: logging.info('Starting Main') parser = argparse.ArgumentParser() parser.add_argument('-t', '--target', help='the export target: edges, nodes, or metadata', required=True) - parser.add_argument('-uni', '--uniprot_bucket', help='storage bucket for UniProt data', required=True) + parser.add_argument('-uni', '--uniprot_bucket', help='storage bucket for UniProt data', required=True) # TODO: Replace with -b --bucket parser.add_argument('-i', '--instance', help='GCP DB instance name') parser.add_argument('-d', '--database', help='database name') parser.add_argument('-u', '--user', help='database username') @@ -74,7 +81,7 @@ def get_conn() -> pymysql.connections.Connection: if args.verbose: logging.getLogger().setLevel(logging.DEBUG) - if args.target == 'metadata': + if args.target == 'metadata': # if we are just exporting metadata a database connection is not necessary export_metadata(uniprot_bucket) else: session_maker = init_db( diff --git a/ops.py b/ops.py new file mode 100644 index 0000000..96da1e3 --- /dev/null +++ b/ops.py @@ -0,0 +1,34 @@ +import json +operations_dict = {} +with open('edges.tsv','r') as infile: + for line in infile: + columns = line.split('\t') + if len(columns) < 13: + print('not enough columns') + print(line) + continue + subject_namespace = columns[0].split(':')[0] + object_namespace = columns[2].split(':')[0] + predicate = 'no_predicate' + if columns[1] == 'biolink:treats': + predicate = 'treats' + elif columns[1] == 'biolink:contributes_to': + predicate = 'contributes_to' + elif columns[1] == 'biolink:affects': + if columns[3] == 'biolink:causes': + if columns[8] == 'activity_or_abundance' and columns[9] == 'increased': + predicate = 'positively_regulates' + elif columns[8] == 'activity_or_abundance' and columns[9] == 'decreased': + predicate = 'negatively_regulates' + elif columns[3] == 'biolink:contributes_to': + if columns[7] == 'gain_of_function_variant_form': + predicate = 
'gain_of_function_contributes_to' + elif columns[7] == 'loss_of_function_variant_form': + predicate = 'loss_of_function_contributes_to' + key = subject_namespace + '_' + predicate + '_' + object_namespace + if key in operations_dict: + operations_dict[key] += 1 + else: + operations_dict[key] = 1 +with open('operations.json','w') as outfile: + x = outfile.write(json.dumps(operations_dict)) \ No newline at end of file diff --git a/services.py b/services.py index 08fec0e..5864a1a 100644 --- a/services.py +++ b/services.py @@ -253,8 +253,15 @@ def get_score(row): def get_assertion_json(rows): - semmed_count = sum([row['semmed_flag'] for row in rows]) + # semmed_count = sum([row['semmed_flag'] for row in rows]) row1 = rows[0] + supporting_publications = [] + for row in rows: + document_id = row['document_id'] + if document_id.startswith('PMC') and ':' not in document_id: + supporting_publications.append(document_id.replace('PMC', 'PMC:')) + else: + supporting_publications.append(document_id) attributes_list = [ { "attribute_type_id": "biolink:primary_knowledge_source", @@ -269,37 +276,40 @@ def get_assertion_json(rows): "attribute_source": "infores:text-mining-provider-targeted" }, { - "attribute_type_id": "biolink:has_evidence_count", + "attribute_type_id": "biolink:evidence_count", "value": row1['evidence_count'], "value_type_id": "biolink:EvidenceCount", "attribute_source": "infores:text-mining-provider-targeted" }, { - "attribute_type_id": "biolink:tmkp_confidence_score", + "attribute_type_id": "biolink:extraction_confidence_score", "value": get_aggregate_score(rows), "value_type_id": "biolink:ConfidenceLevel", "attribute_source": "infores:text-mining-provider-targeted" }, { - "attribute_type_id": "biolink:supporting_document", - "value": '|'.join([row['document_id'] for row in rows]), - "value_type_id": "biolink:Publication", + "attribute_type_id": "biolink:publications", + "value": supporting_publications, + "value_type_id": "biolink:Uriorcurie", "attribute_source": "infores:pubmed" } ] - if semmed_count > 0: - attributes_list.append({ - "attribute_type_id": "biolink:semmed_agreement_count", - "value": semmed_count, - "value_type_id": "SIO:000794", - "attribute_source": "infores:text-mining-provider-targeted" - }) + # if semmed_count > 0: + # attributes_list.append({ + # "attribute_type_id": "biolink:semmed_agreement_count", + # "value": semmed_count, + # "value_type_id": "SIO:000794", + # "attribute_source": "infores:text-mining-provider-targeted" + # }) for row in rows: attributes_list.append(get_evidence_json(row)) return json.dumps(attributes_list) def get_evidence_json(row): + document_id = row['document_id'] + if document_id.startswith('PMC') and ':' not in document_id: + document_id = document_id.replace('PMC', 'PMC:') nested_attributes = [ { "attribute_type_id": "biolink:supporting_text", @@ -308,9 +318,9 @@ def get_evidence_json(row): "attribute_source": "infores:text-mining-provider-targeted" }, { - "attribute_type_id": "biolink:supporting_document", - "value": row['document_id'], - "value_type_id": "biolink:Publication", + "attribute_type_id": "biolink:publications", + "value": document_id, + "value_type_id": "biolink:Uriorcurie", "value_url": f"https://pubmed.ncbi.nlm.nih.gov/{str(row['document_id']).split(':')[-1]}/", "attribute_source": "infores:pubmed" }, @@ -339,26 +349,26 @@ def get_evidence_json(row): "attribute_source": "infores:text-mining-provider-targeted " } ] - if row['document_year']: + if row['document_year_published']: nested_attributes.append( { 
"attribute_type_id": "biolink:supporting_document_year", - "value": row['document_year'], + "value": row['document_year_published'], "value_type_id": "UO:0000036", "attribute_source": "infores:pubmed" } ) - if row['semmed_flag'] == 1: - nested_attributes.append( - { - "attribute_type_id": "biolink:agrees_with_data_source", - "value": "infores:semmeddb", - "value_type_id": "biolink:InformationResource", - "attribute_source": "infores:text-mining-provider-targeted" - } - ) + # if row['semmed_flag'] == 1: + # nested_attributes.append( + # { + # "attribute_type_id": "biolink:agrees_with_data_source", + # "value": "infores:semmeddb", + # "value_type_id": "biolink:InformationResource", + # "attribute_source": "infores:text-mining-provider-targeted" + # } + # ) return { - "attribute_type_id": "biolink:supporting_study_result", + "attribute_type_id": "biolink:has_supporting_study_result", "value": f"tmkp:{row['evidence_id']}", "value_type_id": "biolink:TextMiningResult", "value_url": f"https://tmui.text-mining-kp.org/evidence/{row['evidence_id']}", @@ -373,14 +383,26 @@ def get_edge(rows, predicate): logging.debug(f'No relevant rows for predicate {predicate}') return None row1 = relevant_rows[0] - if (row1['object_curie'].startswith('PR:') and not row1['object_uniprot']) or \ - (row1['subject_curie'].startswith('PR:') and not row1['subject_uniprot']): + if row1['object_curie'].startswith('PR:') or row1['subject_curie'].startswith('PR:'): logging.debug(f"Could not get uniprot for pr curie ({row1['object_curie']}|{row1['subject_curie']})") return None - sub = row1['subject_uniprot'] if row1['subject_uniprot'] else row1['subject_curie'] - obj = row1['object_uniprot'] if row1['object_uniprot'] else row1['object_curie'] + sub = row1['subject_curie'] + obj = row1['object_curie'] + # if (row1['object_curie'].startswith('PR:') and not row1['object_uniprot']) or \ + # (row1['subject_curie'].startswith('PR:') and not row1['subject_uniprot']): + # logging.debug(f"Could not get uniprot for pr curie ({row1['object_curie']}|{row1['subject_curie']})") + # return None + # sub = row1['subject_uniprot'] if row1['subject_uniprot'] else row1['subject_curie'] + # obj = row1['object_uniprot'] if row1['object_uniprot'] else row1['object_curie'] supporting_study_results = '|'.join([f"tmkp:{row['evidence_id']}" for row in relevant_rows]) - supporting_publications = '|'.join([row['document_id'] for row in relevant_rows]) + supporting_publications = [] + for row in relevant_rows: + document_id = row['document_id'] + if document_id.startswith('PMC') and ':' not in document_id: + supporting_publications.append(document_id.replace('PMC', 'PMC:')) + else: + supporting_publications.append(document_id) + supporting_publications_string = '|'.join(supporting_publications) qualified_predicate = '' subject_aspect_qualifier = '' subject_direction_qualifier = '' @@ -401,14 +423,18 @@ def get_edge(rows, predicate): qualified_predicate = 'biolink:causes' object_aspect_qualifier = 'activity_or_abundance' object_direction_qualifier = 'decreased' + elif predicate == 'biolink:treats': + predicate = 'biolink:treats_or_applied_or_studied_to_treat' elif predicate == 'biolink:gain_of_function_contributes_to': - predicate = 'biolink:affects' - qualified_predicate = 'biolink:contributes_to' - subject_form_or_variant_qualifier = 'gain_of_function_variant_form' + # predicate = 'biolink:affects' + # qualified_predicate = 'biolink:contributes_to' + # subject_form_or_variant_qualifier = 'gain_of_function_variant_form' + return None elif predicate == 
'biolink:loss_of_function_contributes_to': - predicate = 'biolink:affects' - qualified_predicate = 'biolink:contributes_to' - subject_form_or_variant_qualifier = 'loss_of_function_variant_form' + # predicate = 'biolink:affects' + # qualified_predicate = 'biolink:contributes_to' + # subject_form_or_variant_qualifier = 'loss_of_function_variant_form' + return None return [sub, predicate, obj, qualified_predicate, subject_aspect_qualifier, subject_direction_qualifier, subject_part_qualifier, subject_form_or_variant_qualifier, @@ -416,13 +442,37 @@ def get_edge(rows, predicate): object_part_qualifier, object_form_or_variant_qualifier, anatomical_context_qualifier, row1['assertion_id'], row1['association_curie'], get_aggregate_score(relevant_rows), - supporting_study_results, supporting_publications, get_assertion_json(relevant_rows)] + supporting_study_results, supporting_publications_string, get_assertion_json(relevant_rows)] def write_edges(edge_dict, nodes, output_filename): logging.info("Starting edge output") skipped_assertions = set([]) with open(output_filename, 'a') as outfile: + for assertion, rows in edge_dict.items(): + row1 = rows[0] + # sub = row1['subject_uniprot'] if row1['subject_uniprot'] else row1['subject_curie'] + # obj = row1['object_uniprot'] if row1['object_uniprot'] else row1['object_curie'] + sub = row1['subject_curie'] + obj = row1['object_curie'] + if sub not in nodes or obj not in nodes: + continue + predicates = set([row['predicate_curie'] for row in rows]) + for predicate in predicates: + edge = get_edge(rows, predicate) + if not edge: + skipped_assertions.add(assertion) + continue + line = '\t'.join(str(val) for val in edge) + '\n' + throwaway_value = outfile.write(line) + outfile.flush() + logging.info(f'{len(skipped_assertions)} distinct assertions were skipped') + logging.info("Edge output complete") + +def write_edges_gzip(edge_dict, nodes, output_filename): + logging.info("Starting edge output") + skipped_assertions = set([]) + with gzip.open(output_filename, 'ab') as outfile: for assertion, rows in edge_dict.items(): row1 = rows[0] sub = row1['subject_uniprot'] if row1['subject_uniprot'] else row1['subject_curie'] @@ -435,18 +485,38 @@ def write_edges(edge_dict, nodes, output_filename): if not edge: skipped_assertions.add(assertion) continue - line = '\t'.join(str(val) for val in edge) + '\n' + line = b'\t'.join(bytes(str(val), encoding='utf-8') for val in edge) + b'\n' throwaway_value = outfile.write(line) outfile.flush() logging.info(f'{len(skipped_assertions)} distinct assertions were skipped') logging.info("Edge output complete") +def generate_edges(edge_dict, nodes): + logging.info("Starting edge output") + skipped_assertions = set([]) + for assertion, rows in edge_dict.items(): + row1 = rows[0] + sub = row1['subject_uniprot'] if row1['subject_uniprot'] else row1['subject_curie'] + obj = row1['object_uniprot'] if row1['object_uniprot'] else row1['object_curie'] + if sub not in nodes or obj not in nodes: + continue + predicates = set([row['predicate_curie'] for row in rows]) + for predicate in predicates: + edge = get_edge(rows, predicate) + if not edge: + skipped_assertions.add(assertion) + continue + yield '\t'.join(str(val) for val in edge) + '\n' + logging.info(f'{len(skipped_assertions)} distinct assertions were skipped') + logging.info("Edge output complete") + def compress(infile, outfile): with open(infile, 'rb') as textfile: with gzip.open(outfile, 'wb') as gzfile: shutil.copyfileobj(textfile, gzfile) + def decompress(infile, outfile): with 
gzip.open(infile, 'rb') as gzfile: with open(outfile, 'wb') as textfile: @@ -469,24 +539,18 @@ def generate_metadata(edgefile, nodefile, outdir): edge_file = os.path.join(outdir, "edges.tsv") metadata_file = os.path.join(outdir, "content_metadata.json") - node_lines = open(nodefile, 'r').readlines() - nodes = [node_line.strip().split('\t') for node_line in node_lines] + node_lines = gzip.open(nodefile, 'rb').readlines() + nodes = [node_line.decode().strip().split('\t') for node_line in node_lines] curies = [node[0] for node in nodes] for node in nodes: node_metadata_dict = update_node_metadata(node, node_metadata_dict, PRIMARY_KNOWLEDGE_SOURCE) normalized_nodes = get_normalized_nodes(curies) - with open(node_file, 'w') as outfile: - tmp = outfile.write('\t'.join(node_headers) + '\n') - for line in node_lines: - tmp = outfile.write(line) edge_metadata_dict = {} - with open(edgefile) as infile: - with open(edge_file, 'w') as outfile: - tmp = outfile.write('\t'.join(edge_headers) + '\n') - for line in infile: - tmp = outfile.write(line) - edge_metadata_dict = update_edge_metadata(line.split('\t'), edge_metadata_dict, normalized_nodes, PRIMARY_KNOWLEDGE_SOURCE) + with gzip.open(edgefile, 'rb') as infile: + for line in infile: + cols = line.decode().split('\t') + edge_metadata_dict = update_edge_metadata(cols, edge_metadata_dict, normalized_nodes, PRIMARY_KNOWLEDGE_SOURCE) metadata_dict = { "nodes": node_metadata_dict, "edges": list(edge_metadata_dict.values()) @@ -494,5 +558,5 @@ def generate_metadata(edgefile, nodefile, outdir): logging.info("Writing metadata file") with open(metadata_file, 'w') as outfile: outfile.write(json.dumps(metadata_dict)) - logging.info("Creating tarball") - shutil.make_archive('targeted_assertions', 'gztar', root_dir=outdir) + # logging.info("Creating tarball") + # shutil.make_archive('targeted_assertions', 'gztar', root_dir=outdir) diff --git a/targeted.py b/targeted.py index 11303c1..c5bb965 100644 --- a/targeted.py +++ b/targeted.py @@ -2,17 +2,60 @@ import logging import math -import sqlalchemy from sqlalchemy import text from sqlalchemy.orm import Session +from sqlalchemy import Column, String, Integer +from sqlalchemy.orm import declarative_base import services +Model = declarative_base(name='Model') ROW_BATCH_SIZE = 10000 HUMAN_TAXON = 'NCBITaxon:9606' ORIGINAL_KNOWLEDGE_SOURCE = "infores:text-mining-provider-targeted" EXCLUDED_FIG_CURIES = ['DRUGBANK:DB10633', 'PR:000006421', 'PR:000008147', 'PR:000009005', 'PR:000031137', 'PR:Q04746', 'PR:Q04746', 'PR:Q7XZU3'] +EXCLUDE_LIST = ['CHEBI:35222', 'CHEBI:23888', 'CHEBI:36080', 'PR:000003944', 'PR:000011336', 'CL:0000000', + 'PR:000000001', 'HP:0045088', 'HP:0001259', 'HP:0041092', 'HP:0031796', 'HP:0011011', 'HP:0001056', + 'HP:0011010', 'MONDO:0021141', 'MONDO:0021152', 'HP:0000005', 'HP:0000005', 'MONDO:0017169', + 'MONDO:0024497', 'MONDO:0000605', 'HP:0040285', 'HP:0025304', 'HP:0030645', 'HP:0025279', + 'HP:0003676', 'HP:0030649', 'HP:0012835', 'HP:0003674', 'HP:0020034', 'HP:0002019', 'HP:0040282', + 'HP:0040279', 'HP:0040279', 'HP:0032322', 'HP:0030645', 'HP:0011009', 'HP:0012829', 'HP:0030645', + 'HP:0031375', 'HP:0030650', 'HP:0011009', 'HP:0012824', 'HP:0012828', 'HP:0012828', 'HP:0025287', + 'HP:0025145', 'HP:0003676', 'HP:0003676', 'HP:0030645', 'MONDO:0005070', 'HP:0002664', 'MONDO:0021178', + 'MONDO:0021137', 'MONDO:0002254', 'MONDO:0021136', 'HP:0012838', 'HP:0003680', 'HP:0031915', + 'HP:0012837', 'HP:0040282', 'HP:0040279', 'HP:0040279', 'HP:0012840', 'HP:0410291', 'HP:0012830', + 'HP:0025275', 
'HP:0012831', 'HP:0012831', 'HP:0030646', 'MONDO:0021137', 'HP:0040279', 'HP:0040282', + 'HP:0040282', 'HP:0040279', 'HP:0040282', 'HP:0040282', 'HP:0003680', 'HP:0012838', 'HP:0012834', + 'HP:0200034', 'HP:0012825', 'HP:0040283', 'HP:0012824', 'HP:0012828', 'HP:0012828', 'HP:0100754', + 'HP:0032320', 'HP:0030212', 'HP:0012826', 'HP:0003680', 'CHEBI:15377', 'DRUGBANK:DB09145', + 'DRUGBANK:DB10632'] + +class Evidence(Model): + __tablename__ = 'evidence' + evidence_id = Column(String(65), primary_key=True) + assertion_id = Column(String(65)) + document_id = Column(String(45)) + sentence = Column(String(2000)) + subject_entity_id = Column(String(65)) + object_entity_id = Column(String(65)) + document_zone = Column(String(45)) + document_publication_type = Column(String(100)) + document_year_published = Column(Integer) + superseded_by = Column(String(20)) + + def __init__(self, evidence_id, assertion_id, document_id, sentence, subject_entity_id, object_entity_id, + document_zone, document_publication_type, document_year_published, superseded_by): + self.evidence_id = evidence_id + self.assertion_id = assertion_id + self.document_id = document_id + self.sentence = sentence + self.subject_entity_id = subject_entity_id + self.object_entity_id = object_entity_id + self.document_zone = document_zone + self.document_publication_type = document_publication_type + self.document_year_published = document_year_published + self.superseded_by = superseded_by def get_node_data(session: Session, use_uniprot: bool = False) -> (list[str], dict[str, dict]): @@ -26,21 +69,11 @@ def get_node_data(session: Session, use_uniprot: bool = False) -> (list[str], di """ logging.info("Getting node data") logging.info(f"Mode: {'UniProt' if use_uniprot else 'PR'}") - if use_uniprot: - curies = [row[0] for row in session.query(sqlalchemy.text('DISTINCT IFNULL(uniprot, subject_curie) as curie ' - 'FROM assertion LEFT JOIN pr_to_uniprot ON ' - 'subject_curie = pr AND ' - f'taxon = "{HUMAN_TAXON}"')).all()] - curies.extend([row[0] for row in session.query(sqlalchemy.text('DISTINCT IFNULL(uniprot, object_curie) as ' - 'curie FROM assertion LEFT JOIN pr_to_uniprot ' - f'ON object_curie = pr AND ' - f'taxon = "{HUMAN_TAXON}"')).all()]) - else: - curies = [row[0] for row in session.query(sqlalchemy.text('DISTINCT subject_curie FROM assertion')).all()] - curies.extend([row[0] for row in session.query(sqlalchemy.text('DISTINCT object_curie FROM assertion')).all()]) + curies = [row[0] for row in session.query(text('DISTINCT subject_curie FROM targeted.assertion')).all()] + curies.extend([row[0] for row in session.query(text('DISTINCT object_curie FROM targeted.assertion')).all()]) curies = list(set(curies)) logging.info(f'node curies retrieved and uniquified ({len(curies)})') - curies = [curie for curie in curies if curie not in EXCLUDED_FIG_CURIES] + curies = [curie for curie in curies if curie not in EXCLUDED_FIG_CURIES and curie not in EXCLUDE_LIST] if use_uniprot: curies = [curie for curie in curies if not curie.startswith('PR:')] normalized_nodes = services.get_normalized_nodes(curies) @@ -69,49 +102,63 @@ def write_nodes(curies: list[str], normalize_dict: dict[str, dict], output_filen return metadata_dict -# TODO: Join this query to the new feedback schema def get_assertion_ids(session, limit=600000, offset=0): - id_query = text('SELECT assertion_id FROM assertion WHERE assertion_id NOT IN ' + """ + Get the assertion ids to be exported in this run + + :param session: the database session + :param limit: limit for assertion 
query + :param offset: offset for assertion query + :returns a list of assertion ids + """ + id_query = text('SELECT assertion_id FROM targeted.assertion WHERE assertion_id NOT IN ' '(SELECT DISTINCT(assertion_id) ' - 'FROM evaluation INNER JOIN evidence ' - 'ON evidence.evidence_id = evaluation.evidence_id ' - 'WHERE overall_correct = 0 OR subject_correct = 0 ' - 'OR object_correct = 0 OR predicate_correct = 0) ' + 'FROM assertion_evidence_feedback af ' + 'INNER JOIN evidence_feedback_answer ef ' + 'INNER JOIN evidence e ON e.evidence_id = af.evidence_id ' + 'INNER JOIN evidence_version ev ON ev.evidence_id = e.evidence_id ' + 'WHERE ef.prompt_text = \'Assertion Correct\' AND ef.response = 0 AND ev.version = 2) ' 'AND subject_curie NOT IN :ex1 AND object_curie NOT IN :ex2 ' + 'AND subject_curie NOT IN :ex3 AND object_curie NOT IN :ex4 ' 'ORDER BY assertion_id ' 'LIMIT :limit OFFSET :offset' ) return [row[0] for row in session.execute(id_query, { 'ex1': EXCLUDED_FIG_CURIES, 'ex2': EXCLUDED_FIG_CURIES, + 'ex3': EXCLUDE_LIST, + 'ex4': EXCLUDE_LIST, 'limit': limit, 'offset': offset })] def get_edge_data(session: Session, id_list, chunk_size=1000, edge_limit=5) -> list[str]: + """ + Generate edge data for the given list of ids + :param session: the database session + :param id_list: the list of assertion ids + :param chunk_size: the number of edge rows to yield at a time + :param edge_limit: the maximum number of evidence records to return for each edge + :returns edge data for up to chunk_size assertion ids from id_list with up to edge_limit supporting evidence records + """ logging.info(f'\nStarting edge data gathering\nChunk Size: {chunk_size}\nEdge Limit: {edge_limit}\n') logging.info(f'Total Assertions: {len(id_list)}.') logging.info(f'Partition count: {math.ceil(len(id_list) / chunk_size)}') main_query = text( 'SELECT a.assertion_id, e.evidence_id, a.association_curie, e.predicate_curie, ' - 'a.subject_curie, su.uniprot AS subject_uniprot, a.object_curie, ou.uniprot AS object_uniprot, ' + 'a.subject_curie, a.object_curie, ' 'si.idf AS subject_idf, oi.idf AS object_idf, ' - 'e.document_id, e.document_zone, e.document_year, e.score, ' - 'e.sentence, e.subject_span, e.subject_text, e.object_span, e.object_text, ' - '(SELECT COUNT(1) FROM top_evidences t2 ' - 'WHERE t2.assertion_id = a.assertion_id AND t2.predicate_curie = e.predicate_curie) AS evidence_count, ' - 'IF(e.tm_id IS NULL, 0, 1) AS semmed_flag ' - 'FROM assertion a ' - 'INNER JOIN LATERAL ' - '(SELECT * FROM top_evidences te LEFT JOIN tm_semmed ts ON ts.tm_id = te.evidence_id ' - f'WHERE te.assertion_id = a.assertion_id ORDER BY ts.semmed_id IS NULL LIMIT {edge_limit}) AS e ' - 'ON a.assertion_id = e.assertion_id ' - f'LEFT JOIN pr_to_uniprot su ON a.subject_curie = su.pr AND su.taxon = "{HUMAN_TAXON}" ' - f'LEFT JOIN pr_to_uniprot ou ON a.object_curie = ou.pr AND ou.taxon = "{HUMAN_TAXON}" ' + 'e.document_id, e.document_zone, e.document_year_published, e.score, ' + 'e.sentence, e.subject_span, e.subject_covered_text, e.object_span, e.object_covered_text, ' + '(SELECT COUNT(1) FROM targeted.evidence t2 ' + 'WHERE t2.assertion_id = a.assertion_id AND t2.predicate_curie = e.predicate_curie) AS evidence_count ' + 'FROM targeted.assertion a INNER JOIN LATERAL ' + '(SELECT * FROM targeted.evidence te WHERE te.assertion_id = a.assertion_id AND te.document_zone <> \'REF\' ' + f'ORDER BY te.score DESC LIMIT {edge_limit}) AS e ON a.assertion_id = e.assertion_id ' 'LEFT JOIN concept_idf si ON a.subject_curie = si.concept_curie ' 'LEFT JOIN 
concept_idf oi ON a.object_curie = oi.concept_curie ' - 'WHERE a.assertion_id IN :ids AND e.document_zone <> "REF" AND e.superseded_by IS NULL ' + 'WHERE a.assertion_id IN :ids ' 'ORDER BY a.assertion_id' ) for i in range(0, len(id_list), chunk_size): @@ -120,6 +167,61 @@ def get_edge_data(session: Session, id_list, chunk_size=1000, edge_limit=5) -> l yield [row for row in session.execute(main_query, {'ids': id_list[i:slice_end]})] +def get_superseded_chunk(session: Session) -> list[tuple[str, str]]: + """ + Gets up to 10000 evidence records where the PubMed document is superseded by a PMC document + :param session: the database session + :returns a list of tuples with the evidence record id and the PMC document ID that supersedes it. + """ + logging.info("get_superseded_chunk") + query_text = text(""" + SELECT e1.evidence_id, e2.document_id + FROM assertion a1 + INNER JOIN evidence e1 ON (e1.assertion_id = a1.assertion_id) + INNER JOIN top_evidence_scores es1 ON (es1.evidence_id = e1.evidence_id) + INNER JOIN pubmed_to_pmc t ON t.pmid = e1.document_id + INNER JOIN evidence e2 ON (e2.document_id = t.pmcid) + INNER JOIN top_evidence_scores es2 ON (es2.evidence_id = e2.evidence_id) + INNER JOIN assertion a2 ON (a2.assertion_id = e2.assertion_id) + WHERE + e1.document_id LIKE 'PMID%' + AND e1.superseded_by IS NULL + AND e1.document_id IN (SELECT pmid FROM pubmed_to_pmc) + AND e1.sentence = e2.sentence + AND e1.document_zone = e2.document_zone + AND a1.subject_curie = a2.subject_curie + AND a1.object_curie = a2.object_curie + AND es1.predicate_curie = es2.predicate_curie + LIMIT 10000 + """) + eids = set([]) + ids_list = [] + for row in session.execute(query_text): + eid = row['evidence_id'] + did = row['document_id'] + if eid not in eids: + ids_list.append((eid, did)) + eids.add(eid) + logging.info(len(ids_list)) + return ids_list + + +def update_superseded_by(session: Session, ids_list: list[tuple[str, str]]) -> None: + logging.info("starting update function") + mappings = [] + for ids in ids_list: + mappings.append({ + 'evidence_id': ids[0], + 'superseded_by': ids[1] + }) + # print(mappings) + logging.info('about to update: ' + str(len(mappings))) + session.bulk_update_mappings(Evidence, mappings) + logging.info('bulk update created') + session.commit() + logging.info('update committed') + + # This is a simple transformation to group all evidence that belongs to the same assertion and make lookups possible. 
def create_edge_dict(edge_data): edge_dict = {} @@ -131,7 +233,31 @@ def create_edge_dict(edge_data): return edge_dict +def uniquify_edge_dict(edge_dict): + for assertion_id in edge_dict.keys(): + evidence_list = edge_dict[assertion_id] + if len(evidence_list) < 2: + continue + index_dict = {} + score_dict = {} + for ev in evidence_list: + composite_key = f"{ev['assertion_id']}_{ev['evidence_id']}_{ev['document_id']}_{ev['sentence']}" + score = float(ev['score']) + if composite_key in index_dict.keys() and composite_key in score_dict.keys(): + if score > score_dict[composite_key]: + index_dict[composite_key] = evidence_list.index(ev) + score_dict[composite_key] = score + else: + index_dict[composite_key] = evidence_list.index(ev) + score_dict[composite_key] = score + new_evidence_list = [] + for index in index_dict.values(): + new_evidence_list.append(evidence_list[index]) + edge_dict[assertion_id] = new_evidence_list + + def export_nodes(session: Session, bucket: str, blob_prefix: str): + logging.info("Exporting Nodes") (node_curies, normal_dict) = get_node_data(session, use_uniprot=True) node_metadata = write_nodes(node_curies, normal_dict, 'nodes.tsv.gz') services.upload_to_gcp(bucket, 'nodes.tsv.gz', f'{blob_prefix}nodes.tsv.gz') @@ -157,5 +283,6 @@ def export_edges(session: Session, nodes: set, bucket: str, blob_prefix: str, for rows in get_edge_data(session, id_list, chunk_size, edge_limit): logging.info(f'Processing the next {len(rows)} rows') edge_dict = create_edge_dict(rows) + uniquify_edge_dict(edge_dict) services.write_edges(edge_dict, nodes, output_filename) services.upload_to_gcp(bucket, output_filename, f'{blob_prefix}{output_filename}')
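
Reviewer note (not part of the diff): the `uniquify_edge_dict` helper added in targeted.py collapses duplicate evidence rows for an assertion down to the single highest-scoring row per (assertion_id, evidence_id, document_id, sentence) composite key before `services.write_edges` runs. Below is a minimal, standalone sketch of that idea for reviewers; the plain-dict rows and the `dedupe_evidence` name are hypothetical stand-ins for the SQLAlchemy result rows and the in-place helper, intended only to illustrate the deduplication behavior, not to replace the PR's implementation.

# Standalone illustration of the evidence-deduplication idea behind uniquify_edge_dict.
# Rows are plain dicts here (hypothetical stand-ins for SQLAlchemy result rows).
def dedupe_evidence(evidence_list):
    best_by_key = {}
    for ev in evidence_list:
        key = (ev['assertion_id'], ev['evidence_id'], ev['document_id'], ev['sentence'])
        score = float(ev['score'])
        # Keep only the highest-scoring row for each composite key.
        if key not in best_by_key or score > float(best_by_key[key]['score']):
            best_by_key[key] = ev
    return list(best_by_key.values())

if __name__ == '__main__':
    rows = [
        {'assertion_id': 'a1', 'evidence_id': 'e1', 'document_id': 'PMC:123', 'sentence': 's', 'score': 0.70},
        {'assertion_id': 'a1', 'evidence_id': 'e1', 'document_id': 'PMC:123', 'sentence': 's', 'score': 0.90},
        {'assertion_id': 'a1', 'evidence_id': 'e2', 'document_id': 'PMID:456', 'sentence': 't', 'score': 0.50},
    ]
    # Two rows remain: the 0.90 duplicate wins over the 0.70 one.
    print(dedupe_evidence(rows))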