Skip to content

Commit

Permalink
knowledge graph (#39)
Browse files Browse the repository at this point in the history
* knowledge graph initial commit

* error handling

* test for neo4j connection

* fix bug

* fix typo in test

* add code to populate knowledge graph

* fix bug in knowledge graph querying in job

* use correct terms in KG query

* hopefully faster date censoring

* fix bug in new date censoring, add test

* fix some more progress reporting bugs

* cache full query including synonyms

* make queries faster, remove debug code, neo4j etc

* retrieve PMIDs as strings

* handle synonyms and match by all words

* fix crash

* fix kg directionality

* comment out neo4j tests for now

* endpoint to cancel a job

Co-authored-by: rmillikin <rmillikin@morgridge.org>
  • Loading branch information
rmillikin and rmillikin authored Sep 21, 2022
1 parent 9e7f577 commit 5a0d1ed
Show file tree
Hide file tree
Showing 15 changed files with 459 additions and 137 deletions.
1 change: 1 addition & 0 deletions .env
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# change this to a directory on your local machine to store pubmed articles
PUBMED_DIR=/path/to/pubmed/folder
NEO4J_DIR=/path/to/neo4j/folder

# password hash (password is 'password' by default; to change it, you need
# to generate a hash yourself using bcrypt and put it here)
Expand Down
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ project/tests/htmlcov
./.env
*Icon*
**/Index/**
**venv**
**venv**
src/tests/test_data/indexer/neo4j**
13 changes: 13 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,18 @@ services:
networks:
- fast_km-network

neo4j:
image: neo4j
environment:
NEO4J_AUTH: neo4j/mypass
volumes:
- ${NEO4J_DIR}/data:/data
- ${NEO4J_DIR}/logs:/logs
ports:
- "7474:7474"
- "7687:7687"
networks:
- fast_km-network

networks:
fast_km-network:
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ rq-dashboard==0.6.1
scipy==1.7.1
flask-bcrypt==0.7.1
pymongo==4.0.1
Werkzeug==2.0.2
Werkzeug==2.0.2
neo4j==4.4.5
py2neo==2021.2.3
137 changes: 104 additions & 33 deletions src/indexing/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@
from indexing.abstract_catalog import AbstractCatalog

delim = '\t'
logical_or = '/' # supports '/' to mean 'or'
logical_and = '&' # supports '&' to mean 'and'
mongo_cache = None

class Index():
def __init__(self, pubmed_abstract_dir: str):
# caches
self._query_cache = dict()
self._token_cache = dict()
self._date_censored_query_cache = dict()
self._n_articles_by_pub_year = dict()
_connect_to_mongo()

Expand All @@ -26,48 +29,59 @@ def __init__(self, pubmed_abstract_dir: str):
self._abstract_catalog = util.get_abstract_catalog(pubmed_abstract_dir)
self._byte_offsets = dict()
self._publication_years = dict()
self._date_censored_pmids = dict()
self._init_byte_info()
self._open_connection()

def close_connection(self) -> None:
self.connection.close()
self.file_obj.close()

def query_index(self, query: str) -> 'set[int]':
query = query.lower().strip()

if query in self._query_cache:
return self._query_cache[query]
def construct_abstract_set(self, term: str) -> set:
# TODO: support parenthesis for allowing OR and AND at the same time?
# e.g., "(cancer/carcinoma) & BRCA1"

term = sanitize_term(term)
is_cached, pmid_set = self.check_caches_for_term(term)

if is_cached:
return pmid_set

if logical_or in term:
terms = term.split(logical_or)
pmid_set = set()
for synonym in terms:
pmid_set.update(self._query_index(synonym))
elif logical_and in term:
terms = term.split(logical_and)
pmid_set = self._query_index(terms[0])
for t in terms[1:]:
pmid_set.intersection_update(self._query_index(t))
else:
result = _check_mongo_for_query(query)
if not isinstance(result, type(None)):
self._query_cache[query] = result
return result
pmid_set = self._query_index(term)

tokens = util.get_tokens(query)
if len(pmid_set) < 10000:
_place_in_mongo(term, pmid_set)
self._query_cache[term] = pmid_set

if len(tokens) > 100:
raise ValueError("Query must have <=100 words")
if not tokens:
return set()
return pmid_set

result = self._query_disk(tokens)
def censor_by_year(self, pmids: 'set[int]', censor_year: int, term: str) -> 'set[int]':
if censor_year not in self._date_censored_pmids:
censored_set = set()

if len(result) < 10000 or len(tokens) > 1:
_place_in_mongo(query, result)
for pmid, year in self._publication_years.items():
if year <= censor_year:
censored_set.add(pmid)
self._date_censored_pmids[censor_year] = censored_set

self._query_cache[query] = result
if (term, censor_year) in self._date_censored_query_cache:
return self._date_censored_query_cache[(term, censor_year)]

date_censored_pmid_set = self._date_censored_pmids[censor_year] & pmids
self._date_censored_query_cache[(term, censor_year)] = date_censored_pmid_set

return result

def censor_by_year(self, pmids: 'set[int]', censor_year: int) -> 'set[int]':
censored_set = set()

for pmid in pmids:
if self._publication_years[pmid] <= censor_year:
censored_set.add(pmid)

return censored_set
return date_censored_pmid_set

def n_articles(self, censor_year = math.inf) -> int:
"""Returns the number of indexed abstracts, given an optional
Expand Down Expand Up @@ -97,6 +111,44 @@ def decache_token(self, token: str):
if ltoken in self._query_cache:
del self._query_cache[ltoken]

def check_caches_for_term(self, term: str):
if term in self._query_cache:
# check RAM cache
return (True, self._query_cache[term])
else:
# check mongoDB cache
result = _check_mongo_for_query(term)
if not isinstance(result, type(None)):
self._query_cache[term] = result
return (True, result)

return (False, None)

def _query_index(self, query: str) -> 'set[int]':
    """Resolve one (non-compound) query term to its set of PMIDs.

    Consults the RAM/MongoDB caches first; on a miss, tokenizes the
    sanitized term, searches the on-disk index, and stores the result
    back into both caches before returning it.
    """
    query = util.sanitize_text(query)

    cached, pmids = self.check_caches_for_term(query)
    if cached:
        return pmids

    words = util.get_tokens(query)

    # Empty queries match nothing; overlong ones are reported and
    # dropped rather than raised, so one bad term cannot kill a job.
    if not words:
        return set()
    if len(words) > 100:
        print("Query failed, must have <=100 words; query was " + query)
        return set()

    pmids = self._query_disk(words)

    # Persist small result sets (and any multi-word query) to MongoDB.
    if len(pmids) < 10000 or len(words) > 1:
        _place_in_mongo(query, pmids)

    self._query_cache[query] = pmids
    return pmids

def _open_connection(self) -> None:
if not os.path.exists(self._bin_path):
print('warning: index does not exist and needs to be built')
Expand Down Expand Up @@ -217,7 +269,26 @@ def _check_if_mongo_should_be_refreshed(self, terms_to_check: 'list[str]' = ['fe

return False

def _intersect_dict_keys(dicts: 'list[dict]'):
def sanitize_term(term: str) -> str:
    """Normalize a query term into its canonical cache-key form.

    Compound terms (subterms joined by '/' for OR or '&' for AND) have
    each subterm sanitized individually and are then sorted, so that
    logically identical compound queries map to the same string. The
    OR separator takes precedence when both separators are present.
    """
    if logical_or not in term and logical_and not in term:
        # Simple term: sanitize as a whole.
        return util.sanitize_text(term)

    # '/' (OR) wins when both separators appear.
    joiner = logical_or if logical_or in term else logical_and
    parts = sorted(util.sanitize_text(part) for part in term.split(joiner))
    return str.join(joiner, parts)

def _intersect_dict_keys(dicts: 'list[dict]') -> None:
lowest_n_keys = sorted(dicts, key=lambda x: len(x))[0]
key_intersect = set(lowest_n_keys.keys())

Expand All @@ -232,7 +303,7 @@ def _intersect_dict_keys(dicts: 'list[dict]'):

return key_intersect

def _connect_to_mongo():
def _connect_to_mongo() -> None:
# TODO: set expiration time for cached items (72h, etc.?)
# mongo_cache.create_index('query', unique=True) #expireafterseconds=72 * 60 * 60,
global mongo_cache
Expand All @@ -246,7 +317,7 @@ def _connect_to_mongo():
print('warning: could not find a MongoDB instance to use as a query cache')
mongo_cache = None

def _check_mongo_for_query(query: str):
def _check_mongo_for_query(query: str) -> bool:
if not isinstance(mongo_cache, type(None)):
try:
result = mongo_cache.find_one({'query': query})
Expand All @@ -261,7 +332,7 @@ def _check_mongo_for_query(query: str):
else:
return None

def _place_in_mongo(query, result):
def _place_in_mongo(query: str, result: 'set[int]') -> None:
if not isinstance(mongo_cache, type(None)):
try:
mongo_cache.insert_one({'query': query, 'result': list(result)})
Expand All @@ -277,7 +348,7 @@ def _place_in_mongo(query, result):
else:
pass

def _empty_mongo():
def _empty_mongo() -> None:
    """Delete every cached query result from the MongoDB cache.

    No-op when no MongoDB instance was found at startup
    (i.e. ``mongo_cache`` is ``None``).
    """
    # Idiomatic None test instead of isinstance(mongo_cache, type(None)).
    if mongo_cache is not None:
        result = mongo_cache.delete_many({})
        print('mongodb cache cleared, ' + str(result.deleted_count) + ' items were deleted')
3 changes: 3 additions & 0 deletions src/indexing/km_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ def get_tokens(text: str) -> 'list[str]':
tokens = tokenizer.tokenize(l_text)
return tokens

def sanitize_text(text: str) -> str:
    """Return *text* tokenized via ``get_tokens`` and re-joined with single spaces.

    Produces a whitespace-normalized form of the input suitable for use
    as a canonical query/cache key.
    """
    # ' '.join(...) is the idiomatic form of str.join(' ', ...).
    return ' '.join(get_tokens(text))

def get_index_dir(abstracts_dir: str) -> str:
return os.path.join(abstracts_dir, 'Index')

Expand Down
Loading

0 comments on commit 5a0d1ed

Please sign in to comment.