-
Notifications
You must be signed in to change notification settings - Fork 121
/
mimic_querier.py
113 lines (92 loc) · 4.04 KB
/
mimic_querier.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import copy, psycopg2, pandas as pd
# TODO(mmd): Where should this go?
# TODO(mmd): Rename
# TODO(mmd): eliminate try/except. Just use conditionals.
def get_values_by_name_from_df_column_or_index(data_df, colname):
""" Easily get values for named field, whether a column or an index
Returns
-------
values : 1D array
"""
try:
values = data_df[colname]
except KeyError as e:
if colname in data_df.index.names:
values = data_df.index.get_level_values(colname)
else:
raise e
return values
# TODO(mmd): Maybe make context manager?
class MIMIC_Querier():
def __init__(
self,
exclusion_criteria_template_vars={},
query_args={}, # passed wholesale to psycopg2.connect
schema_name='public,mimiciii'
):
""" A class to facilitate repeated Queries to a MIMIC psql database """
self.exclusion_criteria_template_vars = {}
self.query_args = query_args
self.schema_name = schema_name
self.connected = False
self.connection, self.cursor = None, None
# TODO(mmd): this isn't really doing exclusion criteria. Should maybe also absorb 'WHERE' clause...
def add_exclusion_criteria_from_df(self, df, columns=[]):
self.exclusion_criteria_template_vars.update({
c: "','".join(
set([str(v) for v in get_values_by_name_from_df_column_or_index(df, c)])
) for c in columns
})
def clear_exclusion_criteria(self): self.exclusion_criteria_template_vars = {}
def close(self):
if not self.connected: return
self.connection.close()
self.cursor.close() # TODO(mmd): Maybe don't actually need this to stay open?
self.connected = False
def connect(self):
self.close()
self.connection = psycopg2.connect(**self.query_args)
self.cursor = self.connection.cursor()
self.cursor.execute('SET search_path TO %s' % self.schema_name)
self.connected = True
def query(self, query_string=None, query_file=None, extra_template_vars={}):
assert query_string is not None or query_file is not None, "Must pass a query!"
assert query_string is None or query_file is None, "Must only pass one query!"
self.connect()
if query_string is None:
with open(query_file, mode='r') as f: query_string = f.read()
template_vars = copy.copy(self.exclusion_criteria_template_vars)
template_vars.update(extra_template_vars)
query_string = query_string.format(**template_vars)
out = pd.read_sql_query(query_string, self.connection)
self.close()
return out
def add_exclusion_criteria_from_df(self, df, columns=[]):
self.exclusion_criteria_template_vars.update({
c: "','".join(
set([str(v) for v in get_values_by_name_from_df_column_or_index(df, c)])
) for c in columns
})
def close(self):
if not self.connected: return
self.connection.close()
self.cursor.close() # TODO(mmd): Maybe don't actually need this to stay open?
self.connected = False
def connect(self):
self.close()
self.connection = psycopg2.connect(**self.query_args)
self.cursor = self.connection.cursor()
self.cursor.execute('SET search_path TO %s' % self.schema_name)
self.connected = True
def query(self, query_string=None, query_file=None, extra_template_vars={}):
assert query_string is not None or query_file is not None, "Must pass a query!"
assert query_string is None or query_file is None, "Must only pass one query!"
self.connect()
if query_string is None:
with open(query_file, mode='r') as f: query_string = f.read()
template_vars = copy.copy(self.exclusion_criteria_template_vars)
template_vars.update(extra_template_vars)
query_string = query_string.format(**template_vars)
out = pd.read_sql_query(query_string, self.connection)
self.close()
return out