diff --git a/dynamodb.tf b/dynamodb.tf
index 8f0bfe9..c3905f4 100644
--- a/dynamodb.tf
+++ b/dynamodb.tf
@@ -147,56 +147,3 @@ resource "aws_dynamodb_table" "variant_query_responses" {
     enabled = true
   }
 }
-
-# ontology term index
-resource "aws_dynamodb_table" "ontology_terms" {
-  billing_mode = "PAY_PER_REQUEST"
-  hash_key     = "id"
-  name         = "OntoIndex"
-  tags         = var.common-tags
-
-  # this is the tab concatenated value of
-  # tableName, columnName, term
-  # this must not be repeated
-  attribute {
-    name = "id"
-    type = "S"
-  }
-
-  attribute {
-    name = "tableName"
-    type = "S"
-  }
-
-  attribute {
-    name = "tableTerms"
-    type = "S"
-  }
-
-  attribute {
-    name = "term"
-    type = "S"
-  }
-
-  # be able to query a term
-  global_secondary_index {
-    hash_key        = "term"
-    name            = "term_index"
-    projection_type = "ALL"
-  }
-
-  # be able to query a tableName
-  global_secondary_index {
-    hash_key        = "tableName"
-    name            = "table_index"
-    projection_type = "ALL"
-  }
-
-  # be able to query a terms in a table
-  # tab concatenated value of table and term
-  global_secondary_index {
-    hash_key        = "tableTerms"
-    name            = "tableterms_index"
-    projection_type = "ALL"
-  }
-}
\ No newline at end of file
diff --git a/examples/test-data/GUIDE.md b/examples/test-data/GUIDE.md
index 1e98368..a070ccc 100644
--- a/examples/test-data/GUIDE.md
+++ b/examples/test-data/GUIDE.md
@@ -1,20 +1,20 @@
 # Getting started with test data
 
-Please ensure you first upload the `chr1.vcf.gz` and `chr1.vcf.gz.tbi` files to an S3 bucket that is accessible from the sBeacon deployment account. Obtain the S3 URI for the `chr1.vcf.gz` from the uploaded desitation. Note that, both `vcf.gz` and `vcf.gz.tbi` files must have the same prefix in S3 for this to work.
+Please ensure you first upload the `chr1.vcf.gz` and `chr1.vcf.gz.tbi` files to an S3 bucket that is accessible from the sBeacon deployment account. Obtain the S3 URI of `chr1.vcf.gz` from the upload destination. Note that both `vcf.gz` and `vcf.gz.tbi` files must have the same prefix in S3 for this to work. Please also note that all the buckets you create in AWS must be in the same region as the deployment.
 
-Now edit the `submission.json` file such that they match the S3 URI of the `vcf.gz` file.
+Now edit the `submission.json` file so that it contains the S3 URI of the `vcf.gz` file.
 
 ```json
-...
+. . .
     "vcfLocations": [
         "s3:////chr1.vcf.gz"
     ]
-...
+. . .
 ```
 
 ## Data submission
 
-You can submit the data in two ways.
+You can submit this data in two ways.
 
 ### Option 1: Submission as request body
 
@@ -30,7 +30,7 @@ Alternatively, you can upload edited `submission.json` file to an S3 location ac
 }
 ```
 
-This approach is recommended for larger submissions with thousands of metadata entries.
+Option 2 is recommended for larger submissions with thousands of metadata entries.
 
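For illustration, the two submission options might be exercised from Python along these lines — a minimal sketch, not part of this change: the endpoint URL, the `/submission` path, the auth header, and the `s3Payload` key are assumptions, so substitute the values from your own deployment.

```python
# Hypothetical sketch only: endpoint URL, path, auth token and the
# "s3Payload" key are placeholders -- check your deployment's API details.
import json

import requests

API_URL = "https://<api-id>.execute-api.<region>.amazonaws.com/prod"  # placeholder
HEADERS = {"Authorization": "Bearer <access-token>"}  # placeholder auth

# Option 1: send the edited submission.json as the request body
with open("submission.json") as f:
    payload = json.load(f)
response = requests.post(f"{API_URL}/submission", json=payload, headers=HEADERS)
print(response.status_code, response.text)

# Option 2: point the API at a copy of submission.json uploaded to S3
response = requests.post(
    f"{API_URL}/submission",
    json={"s3Payload": "s3://<bucket>/submission.json"},  # hypothetical key
    headers=HEADERS,
)
print(response.status_code, response.text)
```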
 ## API testing
diff --git a/iam.tf b/iam.tf
index 34a96bf..855322b 100644
--- a/iam.tf
+++ b/iam.tf
@@ -827,26 +827,6 @@ data "aws_iam_policy_document" "dynamodb-onto-access" {
       aws_dynamodb_table.anscestor_terms.arn,
     ]
   }
-
-  statement {
-    actions = [
-      "dynamodb:Query",
-    ]
-    resources = [
-      "${aws_dynamodb_table.datasets.arn}/index/*",
-      "${aws_dynamodb_table.variant_query_responses.arn}/index/*",
-      "${aws_dynamodb_table.ontology_terms.arn}/index/*",
-    ]
-  }
-
-  statement {
-    actions = [
-      "dynamodb:Scan",
-    ]
-    resources = [
-      aws_dynamodb_table.ontology_terms.arn,
-    ]
-  }
 }
 
 # DynamoDB Ontology Related Write Access
diff --git a/lambda/indexer/lambda_function.py b/lambda/indexer/lambda_function.py
index f4f7336..41d1ad6 100644
--- a/lambda/indexer/lambda_function.py
+++ b/lambda/indexer/lambda_function.py
@@ -1,16 +1,13 @@
 from concurrent.futures import ThreadPoolExecutor, as_completed
 from collections import defaultdict
 import threading
-import urllib
-import json
 import time
-import re
 
 from smart_open import open as sopen
-import requests
 import boto3
 
-from shared.dynamodb import OntoData, Ontology, Descendants, Anscestors
+from shared.dynamodb import Descendants, Anscestors
+from shared.ontoutils import request_hierarchy
 from shared.utils import ENV_ATHENA
 from ctas_queries import QUERY as CTAS_TEMPLATE
 from generate_query_index import QUERY as INDEX_QUERY
@@ -42,36 +39,7 @@
 )
 
 
-def get_ontology_details(ontology):
-    details = None
-    try:
-        details = Ontology.get(ontology)
-    except Ontology.DoesNotExist:
-        if ontology == "SNOMED":
-            # use ontoserver
-            details = Ontology(ontology.upper())
-            details.data = json.dumps(
-                {"id": "SNOMED", "baseUri": "http://snomed.info/sct"}
-            )
-            details.save()
-        else:
-            # use ENSEMBL
-            if response := requests.get(f"{ENSEMBL_OLS}/{ontology}"):
-                response_json = response.json()
-                details = Ontology(ontology.upper())
-                details.data = json.dumps(
-                    {
-                        "id": response_json["ontologyId"].upper(),
-                        "baseUri": response_json["config"]["baseUris"][0],
-                    }
-                )
-                details.save()
-
-    # any other error must be raised
-    return details
-
-
-def get_ontologies_clusters():
+def get_ontology_terms_in_beacon():
     query = f'SELECT DISTINCT term FROM "{ENV_ATHENA.ATHENA_TERMS_TABLE}"'
 
     response = athena.start_query_execution(
@@ -83,7 +51,7 @@
     execution_id = response["QueryExecutionId"]
     await_result(execution_id)
 
-    ontology_clusters = defaultdict(set)
+    ontology_terms = list()
 
     with sopen(
         f"s3://{ENV_ATHENA.ATHENA_METADATA_BUCKET}/query-results/{execution_id}.csv"
@@ -92,125 +60,27 @@
             if n == 0:
                 continue
             term = line.strip().strip('"')
-
-            # beacon API does not allow non CURIE formatted terms
-            # however, SNOMED appears are non-CURIE prefixed terms
-            # following is to support that, however API will not ingest
-            # always submit in form SNOMED:123212
-            if re.match(r"(?i)(^SNOMED)|([0-9]+)", term):
-                ontology = "SNOMED"
-                ontology_clusters[ontology].add(term)
-            else:
-                ontology = term.split(":")[0]
-                ontology_clusters[ontology].add(term)
-
-    return ontology_clusters
+            ontology_terms.append(term)
+    return ontology_terms
 
 
-# in future, there could be an issue when descendants entries exceed 400KB
+# TODO in future, there could be an issue when descendant entries exceed 400KB
 # which means we would have roughtly 20480, 20 byte entries (unlikely?)
 # this would also mean, our SQL queries would reach the 256KB limit
 # we should be able to easily spread terms across multiple dynamodb
 # entries and have multiple queries (as recommended by AWS)
 def index_terms_tree():
-    # START subroutines
-    # subroutine for ensemble
-    def threaded_request_ensemble(term, url):
-        if response := requests.get(url):
-            response_json = response.json()
-            anscestors = set()
-            for response_term in response_json["_embedded"]["terms"]:
-                obo_id = response_term["obo_id"]
-                if obo_id:
-                    anscestors.add(obo_id)
-            return (term, anscestors)
-        else:
-            print(f"Error fetching term from Ensembl OLS {term}")
-
-    # subroutine for ontoserver
-    def threaded_request_ontoserver(term, url):
-        snomed = "SNOMED" in term.upper()
-        retries = 1
-        response = None
-        while (not response or response.status_code != 200) and retries < 10:
-            retries += 1
-            response = requests.post(
-                url,
-                json={
-                    "resourceType": "Parameters",
-                    "parameter": [
-                        {
-                            "name": "valueSet",
-                            "resource": {
-                                "resourceType": "ValueSet",
-                                "compose": {
-                                    "include": [
-                                        {
-                                            "system": "http://snomed.info/sct",
-                                            "filter": [
-                                                {
-                                                    "property": "concept",
-                                                    "op": "generalizes",
-                                                    "value": f"{term.replace('SNOMED:', '')}",
-                                                }
-                                            ],
-                                        }
-                                    ]
-                                },
-                            },
-                        }
-                    ],
-                },
-            )
-            if response.status_code == 200:
-                response_json = response.json()
-                anscestors = set()
-                for response_term in response_json["expansion"]["contains"]:
-                    anscestors.add(
-                        "SNOMED:" + response_term["code"]
-                        if snomed
-                        else response_term["code"]
-                    )
-                return (term, anscestors)
-            else:
-                time.sleep(1)
-
-        if response.status_code != 200:
-            print(f"Error fetching term from Ontoserver {term}")
-
-    # END subroutines
-
-    ontology_clusters = get_ontologies_clusters()
+    terms_in_beacon = get_ontology_terms_in_beacon()
     executor = ThreadPoolExecutor(500)
     futures = []
 
-    for ontology, terms in ontology_clusters.items():
-        if ontology == "SNOMED":
-            for term in terms:
-                # fetch only anscestors that aren't fetched yet
-                try:
-                    data = Anscestors.get(term)
-                except Anscestors.DoesNotExist:
-                    futures.append(
-                        executor.submit(threaded_request_ontoserver, term, ONTOSERVER)
-                    )
-        else:
-            for term in terms:
-                # fetch only anscestors that aren't fetched yet
-                try:
-                    data = Anscestors.get(term)
-                except Anscestors.DoesNotExist:
-                    # details will be missing if the ontology info is not in OLS
-                    if details := get_ontology_details(ontology):
-                        data = json.loads(details.data)
-                        iri = data["baseUri"] + term.split(":")[1]
-                        iri_double_encoded = urllib.parse.quote_plus(
-                            urllib.parse.quote_plus(iri)
-                        )
-                        url = f"{ENSEMBL_OLS}/{ontology}/terms/{iri_double_encoded}/hierarchicalAncestors"
-                        futures.append(
-                            executor.submit(threaded_request_ensemble, term, url)
-                        )
+    for term in terms_in_beacon:
+        try:
+            Anscestors.get(term)
+        except Anscestors.DoesNotExist:
+            futures.append(executor.submit(request_hierarchy, term, True))
+
+    # record ancestors
 
     term_anscestors = defaultdict(set)
     for future in as_completed(futures):
@@ -219,17 +89,21 @@ def threaded_request_ontoserver(term, url):
         term_anscestors[term].update(ancestors)
         term_anscestors[term].add(term)
 
+    # reverse the tree for descendant term search
     term_descendants = defaultdict(set)
 
+    # write ancestors
    with Anscestors.batch_write() as batch:
         for term, anscestors in term_anscestors.items():
             item = Anscestors(term)
             item.anscestors = anscestors
             batch.save(item)
+            # record descendants
             for anscestor in anscestors:
                 term_descendants[anscestor].add(term)
 
+    # write descendants
     with Descendants.batch_write() as batch:
         for term, descendants in term_descendants.items():
+            # if descendants are recorded, just update, else make record
@@ -353,36 +227,6 @@ def record_relations():
     await_result(response["QueryExecutionId"])
 
 
-# TODO re-evaluate the following function remove or useful?
-def onto_index():
-    response = athena.start_query_execution(
-        QueryString=ONTO_TERMS_QUERY,
-        QueryExecutionContext={"Database": ENV_ATHENA.ATHENA_METADATA_DATABASE},
-        WorkGroup=ENV_ATHENA.ATHENA_WORKGROUP,
-    )
-    execution_id = response["QueryExecutionId"]
-    await_result(execution_id)
-
-    with sopen(
-        f"s3://{ENV_ATHENA.ATHENA_METADATA_BUCKET}/query-results/{execution_id}.csv"
-    ) as s3f:
-        for n, line in enumerate(s3f):
-            if n == 0:
-                continue
-            term, tablename, colname, type, label = [
-                item.strip('"') for item in line.strip().split(",")
-            ]
-            entry = OntoData.make_index_entry(
-                term=term,
-                tableName=tablename,
-                columnName=colname,
-                type=type,
-                label=label,
-            )
-            entry.save()
-    return
-
-
 def lambda_handler(event, context):
     # CTAS this must finish before all
     threads = []
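A side note on the TODO above about DynamoDB's 400KB item limit: one way to spread a term's ancestors across multiple items, as the comment suggests, might look like the following sketch. The `AnscestorsChunk` model, its table name, and the chunk size are hypothetical and not part of this change.

```python
# Hypothetical sketch of the "spread terms across multiple dynamodb entries"
# idea from the TODO above; AnscestorsChunk and its table are illustrative only.
from pynamodb.models import Model
from pynamodb.attributes import NumberAttribute, UnicodeAttribute, UnicodeSetAttribute


class AnscestorsChunk(Model):
    class Meta:
        table_name = "AnscestorsChunks"  # placeholder table name

    term = UnicodeAttribute(hash_key=True)
    chunk = NumberAttribute(range_key=True)
    anscestors = UnicodeSetAttribute()


CHUNK_SIZE = 10_000  # keeps each item comfortably below the 400KB limit


def save_ancestors(term: str, ancestors: set):
    # write the set as numbered chunks under the same hash key
    ordered = sorted(ancestors)
    with AnscestorsChunk.batch_write() as batch:
        for i in range(0, len(ordered), CHUNK_SIZE):
            item = AnscestorsChunk(term, chunk=i // CHUNK_SIZE)
            item.anscestors = set(ordered[i : i + CHUNK_SIZE])
            batch.save(item)


def load_ancestors(term: str) -> set:
    # one Query on the hash key gathers every chunk back into a single set
    result = set()
    for item in AnscestorsChunk.query(term):
        result |= item.anscestors
    return result
```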
diff --git a/main.tf b/main.tf
index b16a97c..e5333d8 100644
--- a/main.tf
+++ b/main.tf
@@ -77,7 +77,6 @@ locals {
     DYNAMO_ONTOLOGIES_TABLE = aws_dynamodb_table.ontologies.name
     DYNAMO_ANSCESTORS_TABLE = aws_dynamodb_table.anscestor_terms.name
     DYNAMO_DESCENDANTS_TABLE = aws_dynamodb_table.descendant_terms.name
-    DYNAMO_ONTO_INDEX_TABLE = aws_dynamodb_table.ontology_terms.name
   }
   # layers
   binaries_layer = "${aws_lambda_layer_version.binaries_layer.layer_arn}:${aws_lambda_layer_version.binaries_layer.version}"
diff --git a/shared_resources/python-modules/python/shared/athena/filters.py b/shared_resources/python-modules/python/shared/athena/filters.py
index 9c02326..97a88d5 100644
--- a/shared_resources/python-modules/python/shared/athena/filters.py
+++ b/shared_resources/python-modules/python/shared/athena/filters.py
@@ -6,7 +6,10 @@
 from .dataset import Dataset
 from .cohort import Cohort
 from .run import Run
-from shared.dynamodb import Descendants, Anscestors
+from shared.ontoutils import (
+    get_term_ancestors_in_beacon,
+    get_term_descendants_in_beacon,
+)
 from shared.utils import ENV_ATHENA
 from shared.apiutils import (
     OntologyFilter,
@@ -48,24 +51,6 @@ def _get_comparison_operator(filter: Union[AlphanumericFilter, OntologyFilter]):
     return "LIKE" if filter.operator == Operator.EQUAL else "NOT LIKE"
 
 
-def _get_term_ancestors(term):
-    terms = set()
-    try:
-        terms.update(Anscestors.get(term).anscestors)
-    except Anscestors.DoesNotExist:
-        terms.add(term)
-    return terms
-
-
-def _get_term_descendants(term: str):
-    terms = set()
-    try:
-        terms.update(Descendants.get(term).descendants)
-    except Descendants.DoesNotExist:
-        terms.add(term)
-    return terms
-
-
 def entity_search_conditions(
     filters: List[Union[OntologyFilter, AlphanumericFilter, CustomFilter]],
     id_type: str,
@@ -117,12 +102,12 @@ def entity_search_conditions(
         if f.include_descendant_terms:
             # process inclusion of term descendants dependant on 'similarity'
             if f.similarity in (Similarity.HIGH or Similarity.EXACT):
-                expanded_terms = _get_term_descendants(f.id)
+                expanded_terms = get_term_descendants_in_beacon(f.id)
             else:
                 # NOTE: this simplistic similarity method not nessisarily efficient or nessisarily desirable
-                ancestors = _get_term_ancestors(f.id)
+                ancestors = get_term_ancestors_in_beacon(f.id)
                 ancestor_descendants = sorted(
-                    [_get_term_descendants(a) for a in ancestors], key=len
+                    [get_term_descendants_in_beacon(a) for a in ancestors], key=len
                 )
                 if f.similarity == Similarity.MEDIUM:
                     # all terms which have an ancestor half way up
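For reviewers of the similarity handling above: the hunk elides the `MEDIUM`/`LOW` branches, but the sorted `ancestor_descendants` list suggests a selection along these lines — a hedged illustration with made-up terms, not the repository's exact code.

```python
# Illustrative only: hypothetical HP terms, and the MEDIUM/LOW selection is a
# plausible reading of the elided branches, not the repository's exact logic.
ancestor_descendants = sorted(
    [
        {"HP:11", "HP:12"},                                      # nearest ancestor
        {"HP:10", "HP:11", "HP:12", "HP:13"},                    # grandparent
        {"HP:1", "HP:10", "HP:11", "HP:12", "HP:13", "HP:20"},   # near the root
    ],
    key=len,
)

# Similarity.MEDIUM: expand via an ancestor roughly half way up the chain
medium_terms = ancestor_descendants[len(ancestor_descendants) // 2]

# Similarity.LOW: expand via the highest ancestor, giving the broadest match
low_terms = ancestor_descendants[-1]
```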
diff --git a/shared_resources/python-modules/python/shared/dynamodb/__init__.py b/shared_resources/python-modules/python/shared/dynamodb/__init__.py
index d1de649..9873199 100644
--- a/shared_resources/python-modules/python/shared/dynamodb/__init__.py
+++ b/shared_resources/python-modules/python/shared/dynamodb/__init__.py
@@ -1,4 +1,3 @@
 from .datasets import Dataset, VcfChromosomeMap
-from .onto_index import OntoData, TableIndex, TableTermsIndex, TermIndex
 from .ontologies import Anscestors, Descendants, Ontology
 from .variant_queries import VariantQuery, VariantResponse, VariantResponseIndex, S3Location
diff --git a/shared_resources/python-modules/python/shared/dynamodb/onto_index.py b/shared_resources/python-modules/python/shared/dynamodb/onto_index.py
deleted file mode 100644
index 2b031ac..0000000
--- a/shared_resources/python-modules/python/shared/dynamodb/onto_index.py
+++ /dev/null
@@ -1,80 +0,0 @@
-import boto3
-from pynamodb.models import Model
-from pynamodb.indexes import GlobalSecondaryIndex, AllProjection
-from pynamodb.attributes import UnicodeAttribute
-
-from shared.utils import ENV_DYNAMO
-
-
-SESSION = boto3.session.Session()
-REGION = SESSION.region_name
-
-
-# Terms index
-class TermIndex(GlobalSecondaryIndex):
-    class Meta:
-        index_name = "term_index"
-        projection = AllProjection()
-        billing_mode = "PAY_PER_REQUEST"
-        region = REGION
-
-    term = UnicodeAttribute(hash_key=True)
-
-
-# TableNames
-class TableIndex(GlobalSecondaryIndex):
-    class Meta:
-        index_name = "table_index"
-        projection = AllProjection()
-        billing_mode = "PAY_PER_REQUEST"
-        region = REGION
-
-    tableName = UnicodeAttribute(hash_key=True)
-
-
-# TableNames
-class TableTermsIndex(GlobalSecondaryIndex):
-    class Meta:
-        index_name = "tableterms_index"
-        projection = AllProjection()
-        billing_mode = "PAY_PER_REQUEST"
-        region = REGION
-
-    tableTerms = UnicodeAttribute(hash_key=True)
-
-
-# ontoIndex table
-class OntoData(Model):
-    class Meta:
-        table_name = ENV_DYNAMO.DYNAMO_ONTO_INDEX_TABLE
-        region = REGION
-
-    id = UnicodeAttribute(hash_key=True)
-    tableTerms = UnicodeAttribute()
-    tableName = UnicodeAttribute()
-    columnName = UnicodeAttribute()
-    term = UnicodeAttribute()
-    label = UnicodeAttribute()
-    type = UnicodeAttribute()
-
-    termIndex = TermIndex()
-    tableIndex = TableIndex()
-    tableTermsIndex = TableTermsIndex()
-
-    @classmethod
-    def make_index_entry(cls, tableName, columnName, term, label, type):
-        id = f"{tableName}\t{columnName}\t{term}"
-        tableTerms = f"{tableName}\t{term}"
-        entry = OntoData(hash_key=id)
-        entry.tableName = tableName
-        entry.tableTerms = tableTerms
-        entry.columnName = columnName
-        entry.term = term
-        entry.label = label
-        entry.type = type
-
-        return entry
-
-
-if __name__ == "__main__":
-    pass
diff --git a/shared_resources/python-modules/python/shared/ontoutils/__init__.py b/shared_resources/python-modules/python/shared/ontoutils/__init__.py
new file mode 100644
index 0000000..cb70bad
--- /dev/null
+++ b/shared_resources/python-modules/python/shared/ontoutils/__init__.py
@@ -0,0 +1,162 @@
+import json
+import time
+import urllib.parse
+from functools import lru_cache
+
+import requests
+
+from shared.dynamodb import Ontology, Descendants, Anscestors
+
+
+ENSEMBL_OLS = "https://www.ebi.ac.uk/ols/api/ontologies"
+ONTOSERVER = "https://r4.ontoserver.csiro.au/fhir/ValueSet/$expand"
+
+
+@lru_cache()
+def get_term_ancestors_in_beacon(term):
+    terms = set()
+    try:
+        terms.update(Anscestors.get(term).anscestors)
+    except Anscestors.DoesNotExist:
+        terms.add(term)
+    return terms
+
+
+@lru_cache()
+def get_term_descendants_in_beacon(term: str):
+    terms = set()
+    try:
+        terms.update(Descendants.get(term).descendants)
+    except Descendants.DoesNotExist:
+        terms.add(term)
+    return terms
+
+
+@lru_cache()
+def get_term_all_ancestors(term: str):
+    # union rather than set.add() so the set cached by request_hierarchy is not mutated
+    term, ancestors = request_hierarchy(term, True)
+
+    return ancestors | {term}
+
+
+@lru_cache()
+def get_term_all_descendants(term: str):
+    # union rather than set.add() so the set cached by request_hierarchy is not mutated
+    term, descendants = request_hierarchy(term, False)
+
+    return descendants | {term}
+
+
+@lru_cache()
+def get_ontology_details(ontology) -> Ontology:
+    details = None
+    try:
+        details = Ontology.get(ontology)
+    except Ontology.DoesNotExist:
+        if ontology == "SNOMED":
+            # use ontoserver
+            details = Ontology(ontology.upper())
+            details.data = json.dumps(
+                {"id": "SNOMED", "baseUri": "http://snomed.info/sct"}
+            )
+            details.save()
+        else:
+            # use ENSEMBL
+            if response := requests.get(f"{ENSEMBL_OLS}/{ontology}"):
+                response_json = response.json()
+                details = Ontology(ontology.upper())
+                details.data = json.dumps(
+                    {
+                        "id": response_json["ontologyId"].upper(),
+                        "baseUri": response_json["config"]["baseUris"][0],
+                    }
+                )
+                details.save()
+
+    return details
+
+
+@lru_cache()
+def request_ontoserver_hierarchy(term: str, ancestors=True):
+    snomed = "SNOMED" in term.upper()
+    retries = 1
+    response = None
+    while (not response or response.status_code != 200) and retries < 10:
+        retries += 1
+        response = requests.post(
+            ONTOSERVER,
+            json={
+                "resourceType": "Parameters",
+                "parameter": [
+                    {
+                        "name": "valueSet",
+                        "resource": {
+                            "resourceType": "ValueSet",
+                            "compose": {
+                                "include": [
+                                    {
+                                        "system": "http://snomed.info/sct",
+                                        "filter": [
+                                            {
+                                                "property": "concept",
+                                                "op": "generalizes"
+                                                if ancestors
+                                                else "descendent-of",
+                                                "value": f"{term.replace('SNOMED:', '')}",
+                                            }
+                                        ],
+                                    }
+                                ]
+                            },
+                        },
+                    }
+                ],
+            },
+        )
+        if response.status_code == 200:
+            response_json = response.json()
+            members = set()
+            for response_term in response_json["expansion"]["contains"]:
+                members.add(
+                    "SNOMED:" + response_term["code"]
+                    if snomed
+                    else response_term["code"]
+                )
+            return (term, members)
+        else:
+            time.sleep(1)
+
+    raise Exception(f"Error fetching term from Ontoserver {term}")
+
+
+@lru_cache()
+def request_ensembl_hierarchy(term: str, ancestors=True):
+    ontology, code = term.split(":")
+    details = get_ontology_details(ontology)
+    # if no details available, it is probably not an ontology term
+    if not details:
+        return (term, set())
+
+    data = json.loads(details.data)
+    iri = data["baseUri"] + code
+    iri_double_encoded = urllib.parse.quote_plus(urllib.parse.quote_plus(iri))
+    url = f"{ENSEMBL_OLS}/{ontology}/terms/{iri_double_encoded}/{'hierarchicalAncestors' if ancestors else 'hierarchicalDescendants'}"
+
+    if response := requests.get(url):
+        response_json = response.json()
+        members = set()
+        for response_term in response_json["_embedded"]["terms"]:
+            obo_id = response_term["obo_id"]
+            if obo_id:
+                members.add(obo_id)
+        return (term, members)
+
+    raise Exception(f"Error fetching term from Ensembl OLS {term}")
+
+
+@lru_cache()
+def request_hierarchy(term, ancestors):
+    if term.startswith("SNOMED"):
+        return request_ontoserver_hierarchy(term, ancestors)
+    return request_ensembl_hierarchy(term, ancestors)
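A short usage sketch of the new `shared.ontoutils` helpers (term values are illustrative; the calls assume the DynamoDB tables are deployed and that Ontoserver and the EBI OLS are reachable):

```python
# Usage sketch for shared.ontoutils; the term values are illustrative only.
from shared.ontoutils import (
    get_term_all_ancestors,
    get_term_ancestors_in_beacon,
    request_hierarchy,
)

# SNOMED-prefixed terms are resolved via Ontoserver, everything else via OLS
term, ancestors = request_hierarchy("HP:0001250", True)

# wrapper that also includes the queried term itself in the result
all_ancestors = get_term_all_ancestors("SNOMED:271594007")

# cheap lookup against ancestors already indexed into DynamoDB by the indexer
known = get_term_ancestors_in_beacon("HP:0001250")
```

Since every helper is wrapped in `lru_cache`, repeated filter evaluations within one Lambda invocation reuse earlier DynamoDB and HTTP lookups.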