import datetime
import hashlib
import logging
import math
import multiprocessing
import re
import traceback
import warnings
from functools import partial
from pathlib import Path
from types import TracebackType
from typing import Any, Callable, Dict, Generator, List, Optional, Tuple, Type, Union
import pandas as pd
import requests
from dateutil.parser import parse
from requests.adapters import HTTPAdapter, Retry
from requests_cache import CachedSession
from tqdm import tqdm
from tqdm.contrib.logging import logging_redirect_tqdm
from fhir_pyrate import Ahoy
from fhir_pyrate.util import FHIRObj, string_from_column
from fhir_pyrate.util.bundle_processing_templates import flatten_data, parse_fhir_path
from fhir_pyrate.util.imports import optional_import
# Note to people from the future: this must remain an optional import. If fhirpathpy were
# imported directly and a different antlr version is installed, the version mismatch produces
# confusing errors. In such cases fhirpathpy cannot be used, but the processing functions
# can still be used.
fhirpathpy, _ = optional_import(module="fhirpathpy")
logger = logging.getLogger(__name__)
def create_key(request: requests.PreparedRequest, **kwargs: Any) -> str:
"""
Creates a unique key for each request URL.
:param request: The request to create a key for
:param kwargs: Unused, needed for compatibility with the library
:return: A string which is a hash of the request
"""
assert isinstance(request.url, str)
return hashlib.sha256(request.url.encode()).hexdigest()
class Pirate:
"""
Main class to query resources using the FHIR API.
:param base_url: The main URL where the FHIR server is located
:param auth: Either an authenticated instance of the Ahoy class, or an authenticated
requests.Session that can be used to communicate and to query resources
:param num_processes: The number of processes that should be used to run the query for the
functions that use multiprocessing
:param print_request_url: Whether the request URLs should be printed whenever we do a request
:param time_format: The time format used by the FHIR API
:param default_count: The default count of results per page used by the server
    :param cache_folder: The folder where the requests should be cached for later use;
    if None, caching is disabled
    :param cache_expiry_time: If caching is enabled, when the cached responses should expire
    :param retry_requests: You can specify a requests.adapters.Retry instance to retry
    failing requests, for example:
retry_requests=Retry(
total=3, # Retries for a total of three times
backoff_factor=0.5, # A backoff factor to apply between attempts, such that the requests
# are not run directly one after the other
status_forcelist=[500, 502, 503, 504], # HTTP status codes that we should force a retry on
allowed_methods=["GET"] # Set of uppercased HTTP method verbs that we should retry on
)
Complete set of parameters:
https://urllib3.readthedocs.io/en/stable/reference/urllib3.util.html
:param disable_multiprocessing_requests: Whether to disable multiprocessing for running the
requests with the FHIR server
:param disable_multiprocessing_build: Whether to disable multiprocessing when building the
DataFrame
:param silence_fhirpath_warning: Whether the FHIR path warning regarding already existing
expressions should be silenced
:param optional_get_params: Optional parameters that will be passed to the session's get calls
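    Example (a minimal sketch; the URL is illustrative and authentication depends on your
    server setup):
        # Anonymous access to a public test server (illustrative URL)
        search = Pirate(
            base_url="http://hapi.fhir.org/baseR4",
            auth=None,
            num_processes=1,
        )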
"""
FHIRPATH_INVALID_TOKENS = [
"div",
"mod",
"in",
"and",
"or",
"xor",
"implies",
]
def __init__(
self,
base_url: str,
auth: Optional[Union[requests.Session, Ahoy]],
num_processes: int = 1,
print_request_url: bool = False,
time_format: str = "%Y-%m-%dT%H:%M",
        default_count: Optional[int] = None,
        cache_folder: Optional[Union[str, Path]] = None,
        cache_expiry_time: Union[datetime.datetime, int] = -1,  # -1 = does not expire
        retry_requests: Optional[Retry] = None,
disable_multiprocessing_requests: bool = False,
disable_multiprocessing_build: bool = False,
silence_fhirpath_warning: bool = False,
        optional_get_params: Optional[Dict[Any, Any]] = None,
):
        # Split the URL into base, domain and application path, and normalize the trailing slash
url_search = re.search(
pattern=r"(https?:\/\/([^\/]+))([\w\.\-~\/]*)", string=base_url
)
if url_search is None:
raise ValueError(
"The given URL does not follow the validation RegEx. Was your URL "
"written correctly? If it is, please create an issue."
)
self.base_url = url_search.group(1)
self.domain = url_search.group(2)
self.fhir_app_location = (
url_search.group(3)
if len(url_search.group(3)) > 0 and url_search.group(3)[-1] == "/"
else url_search.group(3) + "/"
)
self._close_session_on_exit = False
if isinstance(auth, Ahoy):
self.session = auth.session
elif isinstance(auth, requests.Session):
self.session = auth
else:
self._close_session_on_exit = True
self.session = requests.session()
self.disable_multiprocessing_requests = disable_multiprocessing_requests
self.disable_multiprocessing_build = disable_multiprocessing_build
if num_processes == 1:
self.disable_multiprocessing_requests = True
self.disable_multiprocessing_build = True
self.caching = False
if cache_folder is not None:
# TODO: Change this to work with context managers
            session = CachedSession(
                str(Path(cache_folder) / "fhir_pyrate"),
                # Expire the responses after the given time (-1 = never expire)
                expire_after=cache_expiry_time,
                # Also cache 400 responses, so that known-failing requests are not repeated
                allowable_codes=[200, 400],
                # Only cache GET requests
                allowable_methods=["GET"],
                # Do not match this request parameter, and redact it from the cache
                ignored_parameters=["api_key"],
                # Cache a different response per language
                match_headers=["Accept-Language"],
                # In case of request errors, use stale cache data if possible
                stale_if_error=True,
                key_fn=create_key,
            )
session.auth = self.session.auth
self.session = session
self.caching = True
self.disable_multiprocessing_requests = True
logger.warning(
"Request caching and multiprocessing cannot be run together."
)
if self.disable_multiprocessing_requests and self.disable_multiprocessing_build:
self.num_processes = 1
else:
self.num_processes = num_processes
if retry_requests is not None:
self.session.mount(self.base_url, HTTPAdapter(max_retries=retry_requests))
self.optional_get_params = (
optional_get_params if optional_get_params is not None else {}
)
self._print_request_url = print_request_url
self._time_format = time_format
self._default_count = default_count
self.silence_fhirpath_warning = silence_fhirpath_warning
##############################
# MAIN FUNCTIONS #
##############################
def get_bundle_total(
self,
resource_type: str,
        request_params: Optional[Dict[str, Any]] = None,
count_entries: bool = False,
) -> Optional[int]:
"""
Perform a request to return the total amount of bundles for a query.
:param resource_type: The resource to query, e.g. Patient, DiagnosticReport
:param request_params: The parameters for the current request
:param count_entries: Whether the number of entries should be counted instead
:return: The number of entries for this bundle, either given by the total attribute or by
counting the number of entries
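        Example (illustrative; assumes `search` is an initialized Pirate instance):
            # Total number of matching Patient resources (parameters are illustrative)
            total = search.get_bundle_total(
                resource_type="Patient", request_params={"gender": "female"}
            )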
"""
return self._get_total_from_bundle(
bundle=self._get_response(
self._build_request_url(
resource_type=resource_type, request_params=request_params or {}
)
),
count_entries=count_entries,
)
def steal_bundles(
self,
resource_type: str,
        request_params: Optional[Dict[str, Any]] = None,
num_pages: int = -1,
) -> Generator[FHIRObj, None, int]:
"""
Executes a request, iterates through the result pages and returns all the bundles as a
generator.
:param resource_type: The resource to be queried, e.g. DiagnosticReport
:param request_params: The parameters for the query, e.g. _count, _id
        :param num_pages: The number of pages of bundles that should be returned; the default
        is -1 (all pages). With any other value, exactly that many pages are returned,
        assuming that many exist
:return: A Generator of FHIR bundles containing the queried information
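        Example (a minimal sketch; assumes `search` is an initialized Pirate instance):
            # Retrieve a single page of (illustrative) DiagnosticReport bundles
            bundles = search.steal_bundles(
                resource_type="DiagnosticReport",
                request_params={"_count": 100},
                num_pages=1,
            )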
"""
request_params = {} if request_params is None else request_params.copy()
with logging_redirect_tqdm():
return self._get_bundles(
resource_type=resource_type,
request_params=request_params,
num_pages=num_pages,
silence_tqdm=False,
tqdm_df_build=False,
)
def steal_bundles_to_dataframe(
self,
resource_type: str,
        request_params: Optional[Dict[str, Any]] = None,
num_pages: int = -1,
process_function: Callable[[FHIRObj], Any] = flatten_data,
        fhir_paths: Optional[List[Union[str, Tuple[str, str]]]] = None,
build_df_after_query: bool = False,
) -> Union[pd.DataFrame, Dict[str, pd.DataFrame]]:
"""
        Executes a request, iterates through the result pages, and builds a DataFrame with
        their information. The DataFrame is either built incrementally after each bundle is
        retrieved, or once after all bundles have been collected.
:param resource_type: The resource to be queried, e.g. DiagnosticReport
:param request_params: The parameters for the query, e.g. _count, _id
        :param num_pages: The number of pages of bundles that should be returned; the default
        is -1 (all pages). With any other value, exactly that many pages are returned,
        assuming that many exist
        :param process_function: The transformation function that iterates through the bundle
        entries and selects the data to store
:param fhir_paths: A list of FHIR paths (https://hl7.org/fhirpath/) to be used to build the
DataFrame, alternatively, a list of tuples can be used to specify the column name of the
future column with (column_name, fhir_path). Please refer to the `bundles_to_dataframe`
functions for notes on how to use the FHIR paths
:param build_df_after_query: Whether the DataFrame should be built after all bundles have
been collected, or whether the bundles should be transformed just after retrieving
        :return: A DataFrame per queried resource. If only one resource type is queried,
        a single DataFrame is returned; otherwise, a dictionary of (resourceType, DataFrame)
        pairs is returned.
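        Example (a sketch with illustrative FHIR paths):
            # Build a DataFrame with one row per Patient resource
            df = search.steal_bundles_to_dataframe(
                resource_type="Patient",
                fhir_paths=[("id", "Patient.id"), ("gender", "gender")],
                num_pages=1,
            )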
"""
return self._query_to_dataframe(self._get_bundles)(
resource_type=resource_type,
request_params=request_params,
num_pages=num_pages,
silence_tqdm=False,
process_function=process_function,
fhir_paths=fhir_paths,
build_df_after_query=build_df_after_query,
disable_multiprocessing_build=True,
always_return_dict=False,
)
def sail_through_search_space(
self,
resource_type: str,
time_attribute_name: str,
date_init: Union[str, datetime.date],
date_end: Union[str, datetime.date],
        request_params: Optional[Dict[str, Any]] = None,
) -> Generator[FHIRObj, None, int]:
"""
Uses the multiprocessing module to speed up some queries. The time frame is
divided into multiple time spans (as many as there are processes) and each smaller
time frame is investigated simultaneously.
:param resource_type: The resource to query, e.g. Patient, DiagnosticReport
        :param time_attribute_name: The time attribute that should be used to define the
        timespan, e.g. `started` for ImagingStudy or `date` for DiagnosticReport;
        `_lastUpdated` should work for all resource types
:param date_init: The start of the timespan for `time_attribute_name` (inclusive)
:param date_end: The end of the timespan for `time_attribute_name` (exclusive)
:param request_params: The parameters for the query, e.g. `_count`, `_id`, `_sort`
:return: A Generator containing FHIR bundles with the queried information for all timespans
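        Example (a sketch; the dates are illustrative):
            # Query all DiagnosticReports from 2021 in parallel time slices
            bundles = search.sail_through_search_space(
                resource_type="DiagnosticReport",
                time_attribute_name="date",
                date_init="2021-01-01",
                date_end="2022-01-01",
            )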
"""
return self._sail_through_search_space(
resource_type=resource_type,
time_attribute_name=time_attribute_name,
date_init=date_init,
date_end=date_end,
request_params=request_params,
tqdm_df_build=False,
)
def sail_through_search_space_to_dataframe(
self,
resource_type: str,
time_attribute_name: str,
date_init: Union[str, datetime.date],
date_end: Union[str, datetime.date],
        request_params: Optional[Dict[str, Any]] = None,
process_function: Callable[[FHIRObj], Any] = flatten_data,
        fhir_paths: Optional[List[Union[str, Tuple[str, str]]]] = None,
build_df_after_query: bool = False,
) -> Union[pd.DataFrame, Dict[str, pd.DataFrame]]:
"""
Uses the multiprocessing module to speed up some queries. The time frame is
divided into multiple time spans (as many as there are processes) and each smaller
time frame is investigated simultaneously. Finally, it builds a DataFrame with the
        information from all timespans. The DataFrame is either built incrementally after
        each bundle is retrieved, or once after all bundles have been collected.
:param resource_type: The resource to query, e.g. Patient, DiagnosticReport
        :param time_attribute_name: The time attribute that should be used to define the
        timespan, e.g. `started` for ImagingStudy or `date` for DiagnosticReport;
        `_lastUpdated` should work for all resource types
:param date_init: The start of the timespan for `time_attribute_name` (inclusive)
:param date_end: The end of the timespan for `time_attribute_name` (exclusive)
:param request_params: The parameters for the query, e.g. `_count`, `_id`, `_sort`
        :param process_function: The transformation function that iterates through the bundle
        entries and selects the data to store
:param fhir_paths: A list of FHIR paths (https://hl7.org/fhirpath/) to be used to build the
DataFrame, alternatively, a list of tuples can be used to specify the column name of the
future column with (column_name, fhir_path). Please refer to the `bundles_to_dataframe`
functions for notes on how to use the FHIR paths
:param build_df_after_query: Whether the DataFrame should be built after all bundles have
been collected, or whether the bundles should be transformed just after retrieving
        :return: A DataFrame per queried resource for all timespans. If only one resource
        type is queried, a single DataFrame is returned; otherwise, a dictionary of
        (resourceType, DataFrame) pairs is returned.
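        Example (a sketch; dates and FHIR paths are illustrative):
            df = search.sail_through_search_space_to_dataframe(
                resource_type="DiagnosticReport",
                time_attribute_name="date",
                date_init="2021-01-01",
                date_end="2022-01-01",
                fhir_paths=[("report_id", "DiagnosticReport.id"), ("status", "status")],
            )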
"""
return self._query_to_dataframe(self._sail_through_search_space)(
resource_type=resource_type,
request_params=request_params,
time_attribute_name=time_attribute_name,
date_init=date_init,
date_end=date_end,
process_function=process_function,
fhir_paths=fhir_paths,
build_df_after_query=build_df_after_query,
always_return_dict=False,
)
def trade_rows_for_bundles(
self,
df: pd.DataFrame,
resource_type: str,
df_constraints: Dict[
str, Union[Union[str, Tuple[str, str]], List[Union[str, Tuple[str, str]]]]
],
        request_params: Optional[Dict[str, Any]] = None,
num_pages: int = -1,
) -> Generator[FHIRObj, None, int]:
"""
Go through the rows of a DataFrame (with multiprocessing), run a query and retrieve
bundles for each row.
:param df: The DataFrame with the queries
:param resource_type: The resource to query, e.g. Patient, DiagnosticReport
:param df_constraints: A dictionary containing a mapping between the FHIR attributes and
the columns of the input DataFrame, e.g. {"subject" : "fhir_patient_id"}, where subject
is the FHIR attribute and fhir_patient_id is the name of the column. It is also possible
to add the system by using a tuple instead of a string, e.g. "code": (
"http://loinc.org", "loinc_code")
Possible structures:
{"code": "code_column"}
{"code": ("code_system", "code_column")}
{"date": ["init_date_column", "end_date_column"]}
{"date": [("ge", "init_date_column"), ("le", "end_date_column")]}
:param request_params: The parameters for the query, e.g. _count, _id
        :param num_pages: The number of pages of bundles that should be returned; the default
        is -1 (all pages). With any other value, exactly that many pages are returned,
        assuming that many exist
:return: A Generator containing FHIR bundles with the queried information for all rows
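        Example (a sketch; assumes `df` has a `fhir_patient_id` column):
            # One query per row, constrained on the (illustrative) patient reference
            bundles = search.trade_rows_for_bundles(
                df=df,
                resource_type="DiagnosticReport",
                df_constraints={"subject": "fhir_patient_id"},
            )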
"""
return self._trade_rows_for_bundles(
df=df,
resource_type=resource_type,
df_constraints=df_constraints,
request_params=request_params,
num_pages=num_pages,
tqdm_df_build=False,
)
def trade_rows_for_dataframe(
self,
df: pd.DataFrame,
resource_type: str,
df_constraints: Dict[
str, Union[Union[str, Tuple[str, str]], List[Union[str, Tuple[str, str]]]]
],
process_function: Callable[[FHIRObj], Any] = flatten_data,
        fhir_paths: Optional[List[Union[str, Tuple[str, str]]]] = None,
        request_params: Optional[Dict[str, Any]] = None,
num_pages: int = -1,
with_ref: bool = True,
        with_columns: Optional[List[Union[str, Tuple[str, str]]]] = None,
build_df_after_query: bool = False,
) -> Union[pd.DataFrame, Dict[str, pd.DataFrame]]:
"""
Go through the rows of a DataFrame (with multiprocessing), run a query, retrieve
bundles for each row and transform them into a DataFrame.
The DataFrames can be computed in different ways:
1. The bundles are retrieved and the DataFrame is computed straight away, which can be
obtained by setting `build_df_after_query` to False. If `with_ref` is True, then the
DataFrame is always computed like this.
2. If `build_df_after_query` is True and `with_ref` is False, then first all bundles
will be retrieved, and then they will be processed into a DataFrame.
:param df: The DataFrame with the queries
:param resource_type: The resource to query, e.g. Patient, DiagnosticReport
:param df_constraints: A dictionary containing a mapping between the FHIR attributes and
the columns of the input DataFrame, e.g. {"subject" : "fhir_patient_id"}, where subject
is the FHIR attribute and fhir_patient_id is the name of the column. It is also possible
to add the system by using a tuple instead of a string, e.g. "code": (
"http://loinc.org", "loinc_code")
Possible structures:
{"code": "code_column"}
{"code": ("code_system", "code_column")}
{"date": ["init_date_column", "end_date_column"]}
{"date": [("ge", "init_date_column"), ("le", "end_date_column")]}
:param request_params: The parameters for the query, e.g. _count, _id
        :param num_pages: The number of pages of bundles that should be returned per request;
        the default is -1 (all pages). With any other value, exactly that many pages are
        returned, assuming that many exist
:param with_ref: Whether the input columns of `df_constraints` should be added to the
output DataFrame
        :param with_columns: Additional columns from the source DataFrame that should be
        added to the output DataFrame. The columns can be specified either as a list of
        column names `[col1, col2, ...]` or as a list of tuples
        `[(new_name_for_col1, col1), (new_name_for_col2, col2), ...]`
        :param process_function: The transformation function that iterates through the bundle
        entries and selects the data to store
:param fhir_paths: A list of FHIR paths (https://hl7.org/fhirpath/) to be used to build the
DataFrame, alternatively, a list of tuples can be used to specify the column name of the
future column with (column_name, fhir_path). Please refer to the `bundles_to_dataframe`
functions for notes on how to use the FHIR paths
:param build_df_after_query: Whether the DataFrame should be built after all bundles have
been collected, or whether the bundles should be transformed just after retrieving
        :return: A DataFrame per queried resource which contains information about all rows.
        If only one resource type is queried, a single DataFrame is returned; otherwise,
        a dictionary of (resourceType, DataFrame) pairs is returned.
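        Example (a sketch; assumes `df` has a `fhir_patient_id` column):
            # Query per row and keep the input column in the output DataFrame
            result_df = search.trade_rows_for_dataframe(
                df=df,
                resource_type="DiagnosticReport",
                df_constraints={"subject": "fhir_patient_id"},
                fhir_paths=[("report_id", "DiagnosticReport.id"), ("status", "status")],
            )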
"""
with logging_redirect_tqdm():
if fhir_paths is not None:
logger.info(
f"The selected process_function {process_function.__name__} will be "
f"overwritten."
)
process_function = self._set_up_fhirpath_function(fhir_paths)
if not with_ref and not with_columns:
return self._bundles_to_dataframe(
bundles=self._trade_rows_for_bundles(
df=df,
resource_type=resource_type,
df_constraints=df_constraints,
request_params=request_params,
num_pages=num_pages,
tqdm_df_build=not build_df_after_query,
),
process_function=process_function,
build_df_after_query=build_df_after_query,
disable_multiprocessing=self.disable_multiprocessing_build,
always_return_dict=False,
)
if self.caching and self.num_processes > 1:
logger.info(
"In trade_rows_for_dataframe with multiprocessing, each row is handled by a "
"separate process, which sends the request and builds a Dataframe. "
"Since caching does not support multiprocessing, this function will now run on "
"a single process."
)
else:
logger.info(
f"Querying each row of the DataFrame with {self.num_processes} processes."
)
request_params = {} if request_params is None else request_params.copy()
adjusted_constraints = self._adjust_df_constraints(df_constraints)
req_params_per_sample = self._get_request_params_for_sample(
df=df,
request_params=request_params,
df_constraints=adjusted_constraints,
)
# Reformat the with_columns attribute such that it becomes a list of tuples
# (new_name, current_name)
# The columns to be added are either as elements in the list
# [col1, col2, ...]
# or as second argument of a tuple
# [(renamed_col_1, col1), ...]
with_columns_adjusted = (
[(col, col) if isinstance(col, str) else col for col in with_columns]
if with_columns is not None
else []
)
# Create a dictionary to rename the columns
with_columns_rename = {
col: col_rename for col_rename, col in with_columns_adjusted
}
# Also go through the df_constraints, in case they are not in the list for renaming
for _, list_of_constraints in adjusted_constraints.items():
for _, value in list_of_constraints:
if value not in with_columns_rename:
with_columns_rename[value] = value
# Prepare the inputs that will end up in the final dataframe
input_params_per_sample = [
{
**{
# The name of the parameter will be the same as the column name
# The value will be the same as the value in that column for that row
value: row[df.columns.get_loc(value)]
for _, list_of_constraints in adjusted_constraints.items()
for _, value in list_of_constraints
},
# Add other columns from with_columns
**{
col: row[df.columns.get_loc(col)]
for _, col in with_columns_adjusted
},
}
for row in df.itertuples(index=False)
]
# Add all the parameters needed by the steal_bundles function
params_per_sample = [
{
"resource_type": resource_type,
"request_params": req_sample,
"num_pages": num_pages,
"silence_tqdm": True,
}
for req_sample in req_params_per_sample
]
final_dfs: Dict[str, List[pd.DataFrame]] = {}
tqdm_text = f"Query & Build DF ({resource_type})"
if (
self.disable_multiprocessing_requests
or self.disable_multiprocessing_build
):
# If we don't want multiprocessing
for param, input_param in tqdm(
zip(params_per_sample, input_params_per_sample),
total=len(params_per_sample),
desc=tqdm_text,
):
# Get the dataframe
found_dfs = self._query_to_dataframe(self._get_bundles)(
process_function=process_function,
build_df_after_query=False,
disable_multiprocessing_build=True,
always_return_dict=True,
**param,
)
                    for found_resource_type, found_df in found_dfs.items():
                        final_dfs.setdefault(found_resource_type, [])
                        self._copy_existing_columns(
                            df=found_df,
                            input_params=input_param,
                            key_mapping=with_columns_rename,
                        )
                        final_dfs[found_resource_type].append(found_df)
else:
pool = multiprocessing.Pool(self.num_processes)
results = []
for param, input_param in tqdm(
zip(params_per_sample, input_params_per_sample),
total=len(params_per_sample),
desc=tqdm_text,
):
# Add the functions that we want to run
results.append(
(
pool.apply_async(
self._bundles_to_dataframe,
kwds=dict(
bundles=[b for b in self._get_bundles(**param)],
process_function=process_function,
build_df_after_query=False,
disable_multiprocessing=True,
always_return_dict=True,
),
),
input_param,
)
)
for async_result, input_param in results:
# Get the results and build the dataframes
found_dfs = async_result.get()
                    for found_resource_type, found_df in found_dfs.items():
                        final_dfs.setdefault(found_resource_type, [])
                        self._copy_existing_columns(
                            df=found_df,
                            input_params=input_param,
                            key_mapping=with_columns_rename,
                        )
                        final_dfs[found_resource_type].append(found_df)
pool.close()
pool.join()
dfs = {
resource_type: pd.concat(final_dfs[resource_type], ignore_index=True)
for resource_type in final_dfs
}
return list(dfs.values())[0] if len(dfs) == 1 else dfs
def bundles_to_dataframe(
self,
bundles: Union[List[FHIRObj], Generator[FHIRObj, None, int]],
process_function: Callable[[FHIRObj], Any] = flatten_data,
        fhir_paths: Optional[List[Union[str, Tuple[str, str]]]] = None,
) -> Union[pd.DataFrame, Dict[str, pd.DataFrame]]:
"""
Convert a bundle into a DataFrame using either the `flatten_data` function (default),
        FHIR paths or a custom processing function. In the case of `flatten_data` and the
        FHIR paths, each row of the DataFrame will represent a resource. With a custom
        processing function, each bundle can be handled however you please.
:param bundles: The bundles to transform
        :param process_function: The transformation function that iterates through the bundle
        entries and selects the data to store
:param fhir_paths: A list of FHIR paths (https://hl7.org/fhirpath/) to be used to build the
DataFrame, alternatively, a list of tuples can be used to specify the column name of the
future column with (column_name, fhir_path).
        :return: A DataFrame per queried resource. If only one resource type is queried,
        a single DataFrame is returned; otherwise, a dictionary of (resourceType, DataFrame)
        pairs is returned.
        **NOTE 1 on FHIR paths**: The standard also allows some primitive math operations such
        as modulus (`mod`) or integer division (`div`), and this may be problematic if fields
        of the resource use these terms as attributes. This is actually the case in many
        generated [public FHIR resources](https://hapi.fhir.org/baseDstu2/DiagnosticReport/133015).
        In such a case the term `text.div` cannot be used, and you should use a custom
        processing function instead.
        **NOTE 2 on FHIR paths**: Since it is possible to specify the column name with a tuple
        `(key, fhir_path)`, it is important to know that if the same key is used multiple times
        for different pieces of information on the same resource, the field will only be filled
        with the first occurrence that is not None.
```python
df = search.steal_bundles_to_dataframe(
resource_type="DiagnosticReport",
request_params={
"_count": 1,
"_include": "DiagnosticReport:subject",
},
# CORRECT EXAMPLE
# In this case subject.reference is None for patient, so all patients will have their Patient.id
fhir_paths=[("patient", "subject.reference"), ("patient", "Patient.id")],
# And Patient.id is None for DiagnosticReport, so they will have their subject.reference
# fhir_paths=[("patient", "Patient.id"), ("patient", "subject.reference")],
# WRONG EXAMPLE
# In this case, only the first code will be stored
# fhir_paths=[("code", "code.coding[0].code"), ("code", "code.coding[1].code")],
# CORRECT EXAMPLE
# Whenever we are working with codes, it is usually better to use the `where` argument
# and to store the values using a meaningful name
# fhir_paths=[
# ("code_abc", "code.coding.where(system = 'ABC').code"),
# ("code_def", "code.coding.where(system = 'DEF').code")
# ],
num_pages=1,
)
"""
with logging_redirect_tqdm():
if fhir_paths is not None:
logger.info(
f"The selected process_function {process_function.__name__} will be "
f"overwritten."
)
process_function = self._set_up_fhirpath_function(fhir_paths)
return self._bundles_to_dataframe(
bundles=bundles,
process_function=process_function,
build_df_after_query=True,
disable_multiprocessing=self.disable_multiprocessing_build,
always_return_dict=False,
)
@staticmethod
def smash_rows(
df: pd.DataFrame,
group_by_col: str,
separator: str = ", ",
unique: bool = False,
sort: bool = False,
sort_reverse: bool = False,
) -> pd.DataFrame:
"""
        Group a DataFrame by a certain column and summarize all results of the involved rows
        into a single cell, separated by a predefined separator.
:param df: The DataFrame to smash
:param group_by_col: The column we should use to group by
:param separator: The separator for the values
:param unique: Whether only unique values should be stored
:param sort: Whether the values should be sorted
        :param sort_reverse: Whether the values should be sorted in reverse order
        :return: A DataFrame where the rows that have been grouped are now merged into a
        single row
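        Example (illustrative DataFrame and column names):
            # Merge all report IDs belonging to the same patient into one cell
            merged_df = Pirate.smash_rows(
                df=report_df,
                group_by_col="patient_id",
                separator="; ",
                unique=True,
            )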
"""
return df.groupby(group_by_col, as_index=False).agg(
{
col: partial(
string_from_column,
separator=separator,
unique=unique,
sort=sort,
sort_reverse=sort_reverse,
)
for col in df.columns
}
)
##############################
# CONTEXT HANDLING #
##############################
def __enter__(self) -> "Pirate":
return self
def close(self) -> None:
# Only close the session if it does not come from an authentication class
if self._close_session_on_exit:
self.session.close()
def __exit__(
self,
exctype: Optional[Type[BaseException]],
excinst: Optional[BaseException],
exctb: Optional[TracebackType],
) -> None:
self.close()
##############################
# REQUEST PARAMETER HANDLING #
##############################
@staticmethod
def _concat_request_params(request_params: Dict[str, Any]) -> str:
"""
Concatenates the parameters to create a request string.
:param request_params: The parameters that should be used for the request
:return: The concatenated string for the request
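        Example (illustrative): {"_count": 100, "code": ["a", "b"]} is concatenated
        to "_count=100&code=a&code=b".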
"""
if "history" in request_params or "_id" in request_params:
if "history" in request_params:
param = "history"
else:
param = "_id"
found_param = (
request_params[param]
if not isinstance(request_params[param], List)
else next(iter(request_params[param]))
)
assert isinstance(found_param, str)
if "history" in request_params:
return f"{found_param}/_history"
else:
return found_param
params = [
f"{k}={v}"
for k, v in sorted(request_params.items())
if not isinstance(v, (list, tuple))
]
params += [
f"{k}={element}"
for k, v in sorted(request_params.items())
if isinstance(v, (list, tuple))
for element in v
]
return "&".join(params)
@staticmethod
def _adjust_df_constraints(
df_constraints: Dict[
str, Union[Union[str, Tuple[str, str]], List[Union[str, Tuple[str, str]]]]
]
) -> Dict[str, List[Tuple[str, str]]]:
"""
        Adjust the constraint dictionary to always have the same structure, which makes it
        easier to parse for other functions.
Possible structures:
{"code": "code_column"}
{"code": ("code_system", "code_column")}
{"date": ["init_date_column", "end_date_column"]}
{"date": [("ge", "init_date_column"), ("le", "end_date_column")]}
:param df_constraints: A dictionary that specifies the constraints that should be applied
during a search query and that refer to a DataFrame
:return: A standardized request dictionary
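        Example (illustrative): {"code": ("http://loinc.org", "loinc_code")} becomes
        {"code": [("http://loinc.org%7C", "loinc_code")]}, where %7C is the
        URL-encoded pipe that separates system and value.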
"""
# First make sure that everything is transformed into a dictionary of lists
df_constraints_list: Dict[str, List[Union[str, Tuple[str, str]]]] = {
fhir_identifier: (
[possible_list]
if not isinstance(possible_list, List)
else possible_list
)
for fhir_identifier, possible_list in df_constraints.items()
}
# Then handle the internal tuples
return {
fhir_identifier: [
("", column_constraint)
if isinstance(column_constraint, str)
else (
column_constraint[0]
+ (
"%7C"
if "http" in column_constraint[0]
and "%7C" not in column_constraint[0]
else ""
),
column_constraint[1],
)
for column_constraint in list_of_constraints
]
for fhir_identifier, list_of_constraints in df_constraints_list.items()
}
@staticmethod
def _get_request_params_for_sample(
df: pd.DataFrame,
request_params: Dict[str, Any],
df_constraints: Dict[str, List[Tuple[str, str]]],
) -> List[Dict[str, List[str]]]:
"""
Builds the request parameters for each sample by checking the constraint set on each row.
The resulting request parameters are given by the general `request_params` and by the
constraint given by each row. E.g. if df_constraints = {"subject": "patient_id"}, then
the resulting list will contain {"subject": row.patient_id} for each row of the DataFrame.
:param df: The DataFrame that contains the constraints that should be applied
:param request_params: The parameters for the query that do not depend on the DataFrame
:param df_constraints: A dictionary that specifies the constraints that should be applied
during a search query and that refer to a DataFrame
        :return: A list of constraint dictionaries, one for each row of the DataFrame
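        Example (illustrative): with df_constraints={"subject": [("", "patient_id")]}
        and request_params={"_count": 100}, a row with patient_id="123" yields the
        parameters {"subject": ["123"], "_count": 100}.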
"""
for _, list_of_constraints in df_constraints.items():
for _, value in list_of_constraints:
if df[value].isnull().any():
raise ValueError(
f"The column {value} contains NaN values, "
f"and thus it cannot be used to build queries."
)
return [
dict(
{
fhir_identifier: [
(modifier + str(row[df.columns.get_loc(value)]).split("/")[-1])
if fhir_identifier == "_id"
else modifier + str(row[df.columns.get_loc(value)])
for modifier, value in list_of_constraints
]
# Concatenate the given system identifier string with the desired identifier
for fhir_identifier, list_of_constraints in df_constraints.items()
},
**request_params,
)
for row in df.itertuples(index=False)
]
def _get_timespan_list(
self, date_init: str, date_end: str
) -> List[Tuple[str, str]]:
"""
Divides a timespan into equal parts according to the number of processes selected.
:param date_init: Beginning of the timespan
:param date_end: End of the timespan
:return: A list of sub periods that divide the timespan into equal parts
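        Example (illustrative, with num_processes=2 and the default time format):
            ("2021-01-01", "2021-01-03") is divided into
            [("2021-01-01T00:00", "2021-01-02T00:00"),
             ("2021-01-02T00:00", "2021-01-03T00:00")]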
"""
timespans = (
pd.date_range(date_init, date_end, periods=(self.num_processes + 1))
.strftime(self._time_format)
.tolist()
)
# Convert the list into tuples
return [(timespans[i], timespans[i + 1]) for i in range(len(timespans) - 1)]
def _return_count_from_request(
self, request_params: Dict[str, Any]
) -> Optional[int]:
"""
Return the number of expected resources per page. If count has been defined in the
request parameters, return it, otherwise choose the default count that has been given as
input to the class.
:param request_params: The parameters for the current request
:return: The number of resources that shall be returned for every page
"""
return (
int(request_params["_count"])
if "_count" in request_params
else self._default_count
)
##############################
# BUNDLE HANDLING #
##############################
def _get_response(self, request_url: str) -> Optional[FHIRObj]:
"""
        Performs the API request and returns the response as a FHIR bundle.
:param request_url: The request string
:return: A FHIR bundle
"""
try:
response = self.session.get(request_url, **self.optional_get_params)
if self._print_request_url:
tqdm.write(request_url)
response.raise_for_status()
json_response = response.json()
# If it's a bundle return it
if json_response.get("resourceType") == "Bundle":
return FHIRObj(**json_response)
# Otherwise it's a read operation (are there other options?)
# and we should convert it to a bundle for the sake of consistency
else:
return FHIRObj(
**{
"resourceType": "Bundle",
"type": "read",
"total": 1,
"entry": [
{
"full_url": request_url,
"resource": json_response,
}
],
}
)
except Exception:
# Leave this to be able to quickly see the errors
logger.error(traceback.format_exc())
return None
@staticmethod
def _get_total_from_bundle(
bundle: Optional[FHIRObj],
count_entries: bool = False,
) -> Optional[int]:
"""
Return the total attribute of a bundle or the number of entries.
:param bundle: The bundle for which we need to find the total
:param count_entries: Whether the number of entries should be counted instead
:return: The total attribute for this bundle or the total number of entries
"""
if bundle is not None:
if count_entries and bundle.entry is not None:
return len(bundle.entry)
if bundle.total is not None:
assert isinstance(bundle.total, int)
return bundle.total
return None
def _build_request_url(
self, resource_type: str, request_params: Dict[str, Any]
) -> str:
"""
Use the resource type and the request parameters to build the final request URL.
:param resource_type: The resource to call
:param request_params: The parameters for the request
:return: The URL for the request
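        Example (illustrative): resource_type="Patient" with
        request_params={"_count": 100} yields
        "<base_url><fhir_app_location>Patient?_count=100".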
"""
request_params_string = self._concat_request_params(request_params)
return (
f"{self.base_url}{self.fhir_app_location}{resource_type}"
f"{'/' if ('history' in request_params or '_id' in request_params) else '?'}{request_params_string}"
)
def _get_bundles(
self,
resource_type: str,
        request_params: Optional[Dict[str, Any]] = None,
num_pages: int = -1,
silence_tqdm: bool = False,
tqdm_df_build: bool = False,
) -> Generator[FHIRObj, None, int]:
"""
Executes a request, iterates through the result pages and returns all the bundles as a
generator.
        Additionally, some checks are performed, and the corresponding warnings are emitted:
- Whether a sorting has been defined