
Merge branch 'edgargaticaCU:main' into main
bill-baumgartner authored Jun 19, 2024
2 parents 6229112 + fb4afbf commit 0f6ce42
Showing 5 changed files with 410 additions and 117 deletions.
101 changes: 81 additions & 20 deletions DAG/kgx-export-parallel.py
@@ -1,34 +1,74 @@
import os
import json
import gzip
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow import models
from airflow.contrib.operators import kubernetes_pod_operator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s_models

MYSQL_DATABASE_PASSWORD=os.environ.get('MYSQL_DATABASE_PASSWORD')
MYSQL_DATABASE_USER=os.environ.get('MYSQL_DATABASE_USER')
MYSQL_DATABASE_INSTANCE=os.environ.get('MYSQL_DATABASE_INSTANCE')
PR_BUCKET = os.environ.get('PR_BUCKET')
UNI_BUCKET = os.environ.get('UNI_BUCKET')
START_DATE=datetime(2023, 2, 23, 0, 0)
TMP_BUCKET = os.environ.get('TMP_BUCKET')
START_DATE=datetime(2023, 8, 7, 0, 0)
CHUNK_SIZE = '1000'
EVIDENCE_LIMIT = '5'
STEP_SIZE = 80000
STEP_SIZE = 75000


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': START_DATE,
    'schedule_interval': '0 23 * * *',
    'email': ['edgargaticaCU@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 0
}

def output_operations(**kwargs):
    operations_dict = {}
    with open(kwargs['edges_filename'], 'r') as infile:
        for line in infile:
            columns = line.split('\t')
            if len(columns) < 13:
                print('not enough columns')
                print(line)
                continue
            subject_namespace = columns[0].split(':')[0]
            object_namespace = columns[2].split(':')[0]
            predicate = 'no_predicate'
            if columns[1] == 'biolink:treats':
                predicate = 'treats'
            elif columns[1] == 'biolink:contributes_to':
                predicate = 'contributes_to'
            elif columns[1] == 'biolink:affects':
                if columns[3] == 'biolink:causes':
                    if columns[8] == 'activity_or_abundance' and columns[9] == 'increased':
                        predicate = 'positively_regulates'
                    elif columns[8] == 'activity_or_abundance' and columns[9] == 'decreased':
                        predicate = 'negatively_regulates'
                elif columns[3] == 'biolink:contributes_to':
                    if columns[7] == 'gain_of_function_variant_form':
                        predicate = 'gain_of_function_contributes_to'
                    elif columns[7] == 'loss_of_function_variant_form':
                        predicate = 'loss_of_function_contributes_to'
            key = subject_namespace + '_' + predicate + '_' + object_namespace
            if key in operations_dict:
                operations_dict[key] += 1
            else:
                operations_dict[key] = 1
    with open(kwargs['output_filename'], 'w') as outfile:
        x = outfile.write(json.dumps(operations_dict))

with models.DAG(dag_id='targeted-parallel', default_args=default_args,
                schedule_interval=timedelta(days=1), start_date=START_DATE, catchup=False) as dag:
with models.DAG(dag_id='targeted-parallel', default_args=default_args, catchup=True) as dag:
    filename_list = []
    export_task_list = []
    # This creates as many pods as needed to export all assertion records in groups of STEP_SIZE, which are then run in
@@ -38,46 +78,67 @@
    # rather than 15 longer tasks that run all at once.
    # TODO: the upper limit on the range needs to be the total number of assertion records
    for i in range(0, 2400000, STEP_SIZE):
        filename_list.append(f'gs://{UNI_BUCKET}/kgx/UniProt/edges_{i}_{i + STEP_SIZE}.tsv')
        export_task_list.append(kubernetes_pod_operator.KubernetesPodOperator(
        filename_list.append(f'edges_{i}_{i + STEP_SIZE}.tsv')
        export_task_list.append(KubernetesPodOperator(
            task_id=f'targeted-edges-{i}',
            name=f'parallel-{i}',
            namespace='default',
            config_file="/home/airflow/composer_kube_config",
            namespace='composer-user-workloads',
            image_pull_policy='Always',
            arguments=['-t', 'edges', '-uni', UNI_BUCKET,
            startup_timeout_seconds=1200,
            arguments=['-t', 'edges', '-uni', TMP_BUCKET,
                       '--chunk_size', CHUNK_SIZE, '--limit', EVIDENCE_LIMIT,
                       '--assertion_offset', f'{i}', '--assertion_limit', f'{STEP_SIZE}'],
            env_vars={
                'MYSQL_DATABASE_PASSWORD': MYSQL_DATABASE_PASSWORD,
                'MYSQL_DATABASE_USER': MYSQL_DATABASE_USER,
                'MYSQL_DATABASE_INSTANCE': MYSQL_DATABASE_INSTANCE,
            },
            container_resources=k8s_models.V1ResourceRequirements(
                limits={"memory": "1G", "cpu": "1000m"},
            ),
            retries=1,
            image='gcr.io/translator-text-workflow-dev/kgx-export-parallel:latest'
        ))
    export_nodes = kubernetes_pod_operator.KubernetesPodOperator(
    export_nodes = KubernetesPodOperator(
        task_id='targeted-nodes',
        name='nodes',
        namespace='default',
        config_file="/home/airflow/composer_kube_config",
        namespace='composer-user-workloads',
        image_pull_policy='Always',
        arguments=['-t', 'nodes', '-uni', UNI_BUCKET],
        arguments=['-t', 'nodes', '-uni', TMP_BUCKET],
        env_vars={
            'MYSQL_DATABASE_PASSWORD': MYSQL_DATABASE_PASSWORD,
            'MYSQL_DATABASE_USER': MYSQL_DATABASE_USER,
            'MYSQL_DATABASE_INSTANCE': MYSQL_DATABASE_INSTANCE,
        },
        image='gcr.io/translator-text-workflow-dev/kgx-export-parallel:latest')
    generate_metadata = kubernetes_pod_operator.KubernetesPodOperator(
    generate_metadata = KubernetesPodOperator(
        task_id='targeted-metadata',
        name='targeted-metadata',
        namespace='default',
        config_file="/home/airflow/composer_kube_config",
        namespace='composer-user-workloads',
        image_pull_policy='Always',
        arguments=['-t', 'metadata', '-uni', UNI_BUCKET],
        arguments=['-t', 'metadata', '-uni', TMP_BUCKET],
        image='gcr.io/translator-text-workflow-dev/kgx-export-parallel:latest')
    generate_bte_operations = PythonOperator(
        task_id='generate_bte_operations',
        python_callable=output_operations,
        provide_context=True,
        op_kwargs={'edges_filename': '/home/airflow/gcs/data/kgx-export/edges.tsv',
                   'output_filename': '/home/airflow/gcs/data/kgx-export/operations.json'},
        dag=dag)
    combine_files = BashOperator(
        task_id='targeted-compose',
        bash_command=f"gsutil compose {' '.join(filename_list)} gs://{UNI_BUCKET}/kgx/UniProt/edges.tsv")
        bash_command=f"cd /home/airflow/gcs/data/kgx-export/ && cat {' '.join(filename_list)} > edges.tsv")
    compress_edge_file = BashOperator(
        task_id='targeted-compress',
        bash_command=f"cd /home/airflow/gcs/data/kgx-export/ && gzip -f edges.tsv")
    cleanup_files = BashOperator(
        task_id='targeted-cleanup',
        bash_command=f"gsutil rm {' '.join(filename_list)} gs://{UNI_BUCKET}/kgx/UniProt/edges.tsv")
        bash_command=f"cd /home/airflow/gcs/data/kgx-export/ && rm {' '.join(filename_list)}")
    publish_files = BashOperator(
        task_id='targeted-publish',
        bash_command=f"gsutil cp gs://{TMP_BUCKET}/data/kgx-export/* gs://{UNI_BUCKET}/kgx/UniProt/")

    export_nodes >> export_task_list >> combine_files >> generate_metadata >> cleanup_files
    export_nodes >> export_task_list >> combine_files >> generate_bte_operations >> compress_edge_file >> cleanup_files >> generate_metadata >> publish_files
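The TODO comment above notes that the hard-coded upper bound of 2,400,000 in the range() call should instead be the total number of assertion records. As a minimal sketch of one way to do that, the count could be read from the same MySQL database the export pods use; the connection details, database name, and table name below are assumptions for illustration, not taken from this commit:

import os
import pymysql

def get_assertion_count() -> int:
    # Hypothetical helper: look up how many assertion records exist so the DAG
    # loop bound is not hard-coded. Host, database, and table names are assumed.
    connection = pymysql.connect(
        host=os.environ.get('MYSQL_DATABASE_INSTANCE'),
        user=os.environ.get('MYSQL_DATABASE_USER'),
        password=os.environ.get('MYSQL_DATABASE_PASSWORD'),
        database='text_mined_assertions',  # assumed database name
    )
    try:
        with connection.cursor() as cursor:
            cursor.execute('SELECT COUNT(*) FROM assertion')  # assumed table name
            (count,) = cursor.fetchone()
            return int(count)
    finally:
        connection.close()

# The loop could then be written as:
# for i in range(0, get_assertion_count(), STEP_SIZE):

Querying the database at DAG parse time has a cost of its own, so fetching the count once per run inside a task and passing it along would be a reasonable alternative.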
23 changes: 15 additions & 8 deletions exporter.py
@@ -14,17 +14,24 @@
GCP_BLOB_PREFIX = 'kgx/UniProt/'

def export_metadata(bucket):
    services.get_from_gcp(bucket, GCP_BLOB_PREFIX + 'edges.tsv', 'edges.tsv')
    """
    Generate a metadata file from previously created KGX export files
    :param bucket: the GCP storage bucket containing the KGX files
    """
    services.get_from_gcp(bucket, GCP_BLOB_PREFIX + 'edges.tsv.gz', 'edges.tsv.gz')
    services.get_from_gcp(bucket, GCP_BLOB_PREFIX + 'nodes.tsv.gz', 'nodes.tsv.gz')
    services.decompress('nodes.tsv.gz', 'nodes.tsv')
    services.generate_metadata('edges.tsv', 'nodes.tsv', 'KGE')
    services.compress('edges.tsv', 'edges.tsv.gz')
    services.upload_to_gcp(bucket, 'edges.tsv.gz', GCP_BLOB_PREFIX + 'edges.tsv.gz')
    services.generate_metadata('edges.tsv.gz', 'nodes.tsv.gz', 'KGE')
    services.upload_to_gcp(bucket, 'KGE/content_metadata.json', GCP_BLOB_PREFIX + 'content_metadata.json')
    services.upload_to_gcp(bucket, 'targeted_assertions.tar.gz', GCP_BLOB_PREFIX + 'targeted_assertions.tar.gz')


def get_valid_nodes(bucket) -> set[str]:
    """
    Retrieve the set of nodes used by a KGX nodes file
    :param bucket: the GCP storage bucket containing the KGX file
    :returns a set of node curies
    """
    services.get_from_gcp(bucket, GCP_BLOB_PREFIX + 'nodes.tsv.gz', 'nodes.tsv.gz')
    node_set = set([])
    with gzip.open('nodes.tsv.gz', 'rb') as infile:
@@ -57,7 +64,7 @@ def get_conn() -> pymysql.connections.Connection:
logging.info('Starting Main')
parser = argparse.ArgumentParser()
parser.add_argument('-t', '--target', help='the export target: edges, nodes, or metadata', required=True)
parser.add_argument('-uni', '--uniprot_bucket', help='storage bucket for UniProt data', required=True)
parser.add_argument('-uni', '--uniprot_bucket', help='storage bucket for UniProt data', required=True) # TODO: Replace with -b --bucket
parser.add_argument('-i', '--instance', help='GCP DB instance name')
parser.add_argument('-d', '--database', help='database name')
parser.add_argument('-u', '--user', help='database username')
@@ -74,7 +81,7 @@ def get_conn() -> pymysql.connections.Connection:

if args.verbose:
    logging.getLogger().setLevel(logging.DEBUG)
if args.target == 'metadata':
if args.target == 'metadata': # if we are just exporting metadata a database connection is not necessary
    export_metadata(uniprot_bucket)
else:
    session_maker = init_db(
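For reference, the metadata target changed above does not need database credentials (per the inline comment on the args.target check), so it can be exercised on its own with just the target and bucket flags from the argparse setup. A hedged invocation sketch; the bucket name is a placeholder:

import subprocess

# Illustrative only: run the exporter's metadata path directly. The -t and -uni
# flags come from the argparse arguments shown above; 'example-bucket' is a placeholder.
subprocess.run(
    ['python', 'exporter.py', '-t', 'metadata', '-uni', 'example-bucket'],
    check=True,
)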
34 changes: 34 additions & 0 deletions ops.py
@@ -0,0 +1,34 @@
import json
operations_dict = {}
with open('edges.tsv','r') as infile:
    for line in infile:
        columns = line.split('\t')
        if len(columns) < 13:
            print('not enough columns')
            print(line)
            continue
        subject_namespace = columns[0].split(':')[0]
        object_namespace = columns[2].split(':')[0]
        predicate = 'no_predicate'
        if columns[1] == 'biolink:treats':
            predicate = 'treats'
        elif columns[1] == 'biolink:contributes_to':
            predicate = 'contributes_to'
        elif columns[1] == 'biolink:affects':
            if columns[3] == 'biolink:causes':
                if columns[8] == 'activity_or_abundance' and columns[9] == 'increased':
                    predicate = 'positively_regulates'
                elif columns[8] == 'activity_or_abundance' and columns[9] == 'decreased':
                    predicate = 'negatively_regulates'
            elif columns[3] == 'biolink:contributes_to':
                if columns[7] == 'gain_of_function_variant_form':
                    predicate = 'gain_of_function_contributes_to'
                elif columns[7] == 'loss_of_function_variant_form':
                    predicate = 'loss_of_function_contributes_to'
        key = subject_namespace + '_' + predicate + '_' + object_namespace
        if key in operations_dict:
            operations_dict[key] += 1
        else:
            operations_dict[key] = 1
with open('operations.json','w') as outfile:
    x = outfile.write(json.dumps(operations_dict))
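ops.py is a standalone copy of the output_operations logic from the DAG: it tallies how many edges fall under each subject-namespace / predicate / object-namespace combination and writes the counts to operations.json. A small, illustrative way to inspect that output; the example key in the comment is hypothetical, not taken from real data:

import json

with open('operations.json', 'r') as infile:
    operations = json.load(infile)

# Keys have the form '<subject namespace>_<predicate>_<object namespace>',
# e.g. 'CHEBI_treats_MONDO' (hypothetical), mapped to edge counts.
for key, count in sorted(operations.items(), key=lambda item: item[1], reverse=True):
    print(f'{key}\t{count}')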
(The remaining two changed files in this commit are not shown here.)
