Refactoring and performance improvements #71

Merged · 3 commits · May 8, 2023

Changes from all commits
53 changes: 0 additions & 53 deletions dynamodb.tf
@@ -147,56 +147,3 @@ resource "aws_dynamodb_table" "variant_query_responses" {
    enabled = true
  }
}
-
-# ontology term index
-resource "aws_dynamodb_table" "ontology_terms" {
-  billing_mode = "PAY_PER_REQUEST"
-  hash_key     = "id"
-  name         = "OntoIndex"
-  tags         = var.common-tags
-
-  # this is the tab concatenated value of
-  # tableName, columnName, term
-  # this must not be repeated
-  attribute {
-    name = "id"
-    type = "S"
-  }
-
-  attribute {
-    name = "tableName"
-    type = "S"
-  }
-
-  attribute {
-    name = "tableTerms"
-    type = "S"
-  }
-
-  attribute {
-    name = "term"
-    type = "S"
-  }
-
-  # be able to query a term
-  global_secondary_index {
-    hash_key        = "term"
-    name            = "term_index"
-    projection_type = "ALL"
-  }
-
-  # be able to query a tableName
-  global_secondary_index {
-    hash_key        = "tableName"
-    name            = "table_index"
-    projection_type = "ALL"
-  }
-
-  # be able to query a terms in a table
-  # tab concatenated value of table and term
-  global_secondary_index {
-    hash_key        = "tableTerms"
-    name            = "tableterms_index"
-    projection_type = "ALL"
-  }
-}
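
Note on the removed design: the `OntoIndex` table keyed each entry on the tab-concatenated `tableName`, `columnName`, `term` string, with one GSI per access pattern. Below is a minimal boto3 sketch of that removed design, assuming the table and index names shown above; `make_onto_index_id` is an illustrative helper, not code from this repository.

```python
import boto3

dynamodb = boto3.client("dynamodb")


def make_onto_index_id(table_name: str, column_name: str, term: str) -> str:
    # The removed resource documents the primary key as the
    # tab-concatenated value of tableName, columnName, term,
    # which must be unique across the table.
    return "\t".join([table_name, column_name, term])


def query_by_term(term: str) -> dict:
    # The removed "term_index" GSI hashed on the bare term, so every
    # index entry for one ontology term came back from a single Query.
    return dynamodb.query(
        TableName="OntoIndex",
        IndexName="term_index",
        KeyConditionExpression="#t = :term",
        ExpressionAttributeNames={"#t": "term"},
        ExpressionAttributeValues={":term": {"S": term}},
    )
```
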
12 changes: 6 additions & 6 deletions examples/test-data/GUIDE.md
@@ -1,20 +1,20 @@
# Getting started with test data

-Please ensure you first upload the `chr1.vcf.gz` and `chr1.vcf.gz.tbi` files to an S3 bucket that is accessible from the sBeacon deployment account. Obtain the S3 URI for the `chr1.vcf.gz` from the uploaded desitation. Note that, both `vcf.gz` and `vcf.gz.tbi` files must have the same prefix in S3 for this to work.
+Please ensure you first upload the `chr1.vcf.gz` and `chr1.vcf.gz.tbi` files to an S3 bucket that is accessible from the sBeacon deployment account. Obtain the S3 URI for the `chr1.vcf.gz` from the uploaded destination. Note that both `vcf.gz` and `vcf.gz.tbi` files must have the same prefix in S3 for this to work. Please note that all the buckets you create in AWS must be in the same region as the deployment.

-Now edit the `submission.json` file such that they match the S3 URI of the `vcf.gz` file.
+Now edit the `submission.json` file using the S3 URI of the `vcf.gz` file.

```json
-...
+. . .
  "vcfLocations": [
    "s3://<bucket>/<prefix>/chr1.vcf.gz"
  ]
-...
+. . .
```

## Data submission

-You can submit the data in two ways.
+You can submit this data in two ways.

### Option 1: Submission as request body

@@ -30,7 +30,7 @@ Alternatively, you can upload edited `submission.json` file to an S3 location ac
}
```

-This approach is recommended for larger submissions with thousands of metadata entries.
+Option 2 is recommended for larger submissions with thousands of metadata entries.

## API testing

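
For reference, a minimal sketch of Option 1 in Python, assuming a deployed sBeacon API; the endpoint URL below is a placeholder and authentication is deployment-specific:

```python
import json

import requests

# Placeholder: substitute the submission endpoint of your sBeacon deployment.
SUBMIT_URL = "https://<api-id>.execute-api.<region>.amazonaws.com/<stage>/submission"

# Option 1: the edited submission.json goes straight into the request body.
with open("submission.json") as f:
    payload = json.load(f)

response = requests.post(SUBMIT_URL, json=payload)
response.raise_for_status()
print(response.json())
```

For Option 2, the request body would instead reference the S3 URI of the uploaded `submission.json`, as in the guide's JSON snippet.
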
20 changes: 0 additions & 20 deletions iam.tf
@@ -827,26 +827,6 @@ data "aws_iam_policy_document" "dynamodb-onto-access" {
      aws_dynamodb_table.anscestor_terms.arn,
    ]
  }
-
-  statement {
-    actions = [
-      "dynamodb:Query",
-    ]
-    resources = [
-      "${aws_dynamodb_table.datasets.arn}/index/*",
-      "${aws_dynamodb_table.variant_query_responses.arn}/index/*",
-      "${aws_dynamodb_table.ontology_terms.arn}/index/*",
-    ]
-  }
-
-  statement {
-    actions = [
-      "dynamodb:Scan",
-    ]
-    resources = [
-      aws_dynamodb_table.ontology_terms.arn,
-    ]
-  }
}

# DynamoDB Ontology Related Write Access
194 changes: 19 additions & 175 deletions lambda/indexer/lambda_function.py
@@ -1,16 +1,13 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
from collections import defaultdict
import threading
-import urllib
-import json
-import time
-import re

from smart_open import open as sopen
import requests
import boto3

-from shared.dynamodb import OntoData, Ontology, Descendants, Anscestors
+from shared.dynamodb import Descendants, Anscestors
+from shared.ontoutils import request_hierarchy
from shared.utils import ENV_ATHENA
from ctas_queries import QUERY as CTAS_TEMPLATE
from generate_query_index import QUERY as INDEX_QUERY
@@ -42,36 +39,7 @@
)


-def get_ontology_details(ontology):
-    details = None
-    try:
-        details = Ontology.get(ontology)
-    except Ontology.DoesNotExist:
-        if ontology == "SNOMED":
-            # use ontoserver
-            details = Ontology(ontology.upper())
-            details.data = json.dumps(
-                {"id": "SNOMED", "baseUri": "http://snomed.info/sct"}
-            )
-            details.save()
-        else:
-            # use ENSEMBL
-            if response := requests.get(f"{ENSEMBL_OLS}/{ontology}"):
-                response_json = response.json()
-                details = Ontology(ontology.upper())
-                details.data = json.dumps(
-                    {
-                        "id": response_json["ontologyId"].upper(),
-                        "baseUri": response_json["config"]["baseUris"][0],
-                    }
-                )
-                details.save()
-
-    # any other error must be raised
-    return details
-
-
-def get_ontologies_clusters():
+def get_ontologie_terms_in_beacon():
    query = f'SELECT DISTINCT term FROM "{ENV_ATHENA.ATHENA_TERMS_TABLE}"'

    response = athena.start_query_execution(
@@ -83,7 +51,7 @@ def get_ontologies_clusters():
    execution_id = response["QueryExecutionId"]
    await_result(execution_id)

-    ontology_clusters = defaultdict(set)
+    ontology_terms = list()

    with sopen(
        f"s3://{ENV_ATHENA.ATHENA_METADATA_BUCKET}/query-results/{execution_id}.csv"
@@ -92,125 +60,27 @@ def get_ontologies_clusters():
            if n == 0:
                continue
            term = line.strip().strip('"')
-
-            # beacon API does not allow non CURIE formatted terms
-            # however, SNOMED appears are non-CURIE prefixed terms
-            # following is to support that, however API will not ingest
-            # always submit in form SNOMED:123212
-            if re.match(r"(?i)(^SNOMED)|([0-9]+)", term):
-                ontology = "SNOMED"
-                ontology_clusters[ontology].add(term)
-            else:
-                ontology = term.split(":")[0]
-                ontology_clusters[ontology].add(term)
-
-    return ontology_clusters
+            ontology_terms.append(term)
+    return ontology_terms


-# in future, there could be an issue when descendants entries exceed 400KB
+# TODO in future, there could be an issue when descendants entries exceed 400KB
# which means we would have roughly 20480, 20 byte entries (unlikely?)
# this would also mean, our SQL queries would reach the 256KB limit
# we should be able to easily spread terms across multiple dynamodb
# entries and have multiple queries (as recommended by AWS)
def index_terms_tree():
-    # START subroutines
-    # subroutine for ensemble
-    def threaded_request_ensemble(term, url):
-        if response := requests.get(url):
-            response_json = response.json()
-            anscestors = set()
-            for response_term in response_json["_embedded"]["terms"]:
-                obo_id = response_term["obo_id"]
-                if obo_id:
-                    anscestors.add(obo_id)
-            return (term, anscestors)
-        else:
-            print(f"Error fetching term from Ensembl OLS {term}")
-
-    # subroutine for ontoserver
-    def threaded_request_ontoserver(term, url):
-        snomed = "SNOMED" in term.upper()
-        retries = 1
-        response = None
-        while (not response or response.status_code != 200) and retries < 10:
-            retries += 1
-            response = requests.post(
-                url,
-                json={
-                    "resourceType": "Parameters",
-                    "parameter": [
-                        {
-                            "name": "valueSet",
-                            "resource": {
-                                "resourceType": "ValueSet",
-                                "compose": {
-                                    "include": [
-                                        {
-                                            "system": "http://snomed.info/sct",
-                                            "filter": [
-                                                {
-                                                    "property": "concept",
-                                                    "op": "generalizes",
-                                                    "value": f"{term.replace('SNOMED:', '')}",
-                                                }
-                                            ],
-                                        }
-                                    ]
-                                },
-                            },
-                        }
-                    ],
-                },
-            )
-            if response.status_code == 200:
-                response_json = response.json()
-                anscestors = set()
-                for response_term in response_json["expansion"]["contains"]:
-                    anscestors.add(
-                        "SNOMED:" + response_term["code"]
-                        if snomed
-                        else response_term["code"]
-                    )
-                return (term, anscestors)
-            else:
-                time.sleep(1)
-
-        if response.status_code != 200:
-            print(f"Error fetching term from Ontoserver {term}")
-
-    # END subroutines
-
-    ontology_clusters = get_ontologies_clusters()
+    terms_in_beacon = get_ontologie_terms_in_beacon()
    executor = ThreadPoolExecutor(500)
    futures = []

-    for ontology, terms in ontology_clusters.items():
-        if ontology == "SNOMED":
-            for term in terms:
-                # fetch only anscestors that aren't fetched yet
-                try:
-                    data = Anscestors.get(term)
-                except Anscestors.DoesNotExist:
-                    futures.append(
-                        executor.submit(threaded_request_ontoserver, term, ONTOSERVER)
-                    )
-        else:
-            for term in terms:
-                # fetch only anscestors that aren't fetched yet
-                try:
-                    data = Anscestors.get(term)
-                except Anscestors.DoesNotExist:
-                    # details will be missing if the ontology info is not in OLS
-                    if details := get_ontology_details(ontology):
-                        data = json.loads(details.data)
-                        iri = data["baseUri"] + term.split(":")[1]
-                        iri_double_encoded = urllib.parse.quote_plus(
-                            urllib.parse.quote_plus(iri)
-                        )
-                        url = f"{ENSEMBL_OLS}/{ontology}/terms/{iri_double_encoded}/hierarchicalAncestors"
-                        futures.append(
-                            executor.submit(threaded_request_ensemble, term, url)
-                        )
+    for term in terms_in_beacon:
+        try:
+            Anscestors.get(term)
+        except Anscestors.DoesNotExist:
+            futures.append(executor.submit(request_hierarchy, term, True))

+    # record ancestors
    term_anscestors = defaultdict(set)

    for future in as_completed(futures):
@@ -219,17 +89,21 @@ def threaded_request_ontoserver(term, url):
            term_anscestors[term].update(ancestors)
            term_anscestors[term].add(term)

+    # reverse the tree for descendent term search
    term_descendants = defaultdict(set)

+    # write ancestors
    with Anscestors.batch_write() as batch:
        for term, anscestors in term_anscestors.items():
            item = Anscestors(term)
            item.anscestors = anscestors
            batch.save(item)

+            # record descendents
            for anscestor in anscestors:
                term_descendants[anscestor].add(term)

+    # write descendents
    with Descendants.batch_write() as batch:
        for term, descendants in term_descendants.items():
            # if descendants are recorded, just update, else make record
@@ -353,36 +227,6 @@ def record_relations():
    await_result(response["QueryExecutionId"])


-# TODO re-evaluate the following function remove or useful?
-def onto_index():
-    response = athena.start_query_execution(
-        QueryString=ONTO_TERMS_QUERY,
-        QueryExecutionContext={"Database": ENV_ATHENA.ATHENA_METADATA_DATABASE},
-        WorkGroup=ENV_ATHENA.ATHENA_WORKGROUP,
-    )
-    execution_id = response["QueryExecutionId"]
-    await_result(execution_id)
-
-    with sopen(
-        f"s3://{ENV_ATHENA.ATHENA_METADATA_BUCKET}/query-results/{execution_id}.csv"
-    ) as s3f:
-        for n, line in enumerate(s3f):
-            if n == 0:
-                continue
-            term, tablename, colname, type, label = [
-                item.strip('"') for item in line.strip().split(",")
-            ]
-            entry = OntoData.make_index_entry(
-                term=term,
-                tableName=tablename,
-                columnName=colname,
-                type=type,
-                label=label,
-            )
-            entry.save()
-    return
-
-
def lambda_handler(event, context):
    # CTAS this must finish before all
    threads = []
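
The retained indexer logic collects each term's ancestor set, then inverts that mapping to build the descendant tree before writing both DynamoDB tables. A self-contained sketch of the inversion, using illustrative HP terms:

```python
from collections import defaultdict

# Illustrative ancestor sets keyed by term; each term is its own ancestor,
# matching `term_anscestors[term].add(term)` in the indexer above.
term_anscestors = {
    "HP:0000118": {"HP:0000118", "HP:0000001"},
    "HP:0000924": {"HP:0000924", "HP:0000118", "HP:0000001"},
}

# Reverse the tree for descendant term search, exactly as the indexer
# does before batch-writing the Descendants table.
term_descendants = defaultdict(set)
for term, anscestors in term_anscestors.items():
    for anscestor in anscestors:
        term_descendants[anscestor].add(term)

# HP:0000118 now maps to itself and its descendant HP:0000924.
assert term_descendants["HP:0000118"] == {"HP:0000118", "HP:0000924"}
```
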
1 change: 0 additions & 1 deletion main.tf
@@ -77,7 +77,6 @@ locals {
    DYNAMO_ONTOLOGIES_TABLE = aws_dynamodb_table.ontologies.name
    DYNAMO_ANSCESTORS_TABLE = aws_dynamodb_table.anscestor_terms.name
    DYNAMO_DESCENDANTS_TABLE = aws_dynamodb_table.descendant_terms.name
-    DYNAMO_ONTO_INDEX_TABLE = aws_dynamodb_table.ontology_terms.name
  }
  # layers
  binaries_layer = "${aws_lambda_layer_version.binaries_layer.layer_arn}:${aws_lambda_layer_version.binaries_layer.version}"