
Commit

Update requirements.txt
gdevanla committed Jun 24, 2022
1 parent f5f9f65 commit 337575a
Showing 9 changed files with 267 additions and 4,560 deletions.
dwh/base.py (14 changes: 7 additions & 7 deletions)
@@ -23,7 +23,7 @@
 # See https://stackoverflow.com/questions/33533148 why this is needed.
 from __future__ import annotations
 
-from typing import List
+import typing as tp
 import pandas
 
 class ObsConstraints:
@@ -36,7 +36,7 @@ class ObsConstraints:
     def __init__(
         self,
         code: str,
-        values: List[str] = None,
+        values: tp.List[str] = None,
         value_sys: str = None,
         min_value: float = None,
         max_value: float = None,
@@ -61,9 +61,9 @@ class EncounterConstraints:
 
     def __init__(
         self,
-        locationId: List[str] = None,
+        locationId: tp.List[str] = None,
         typeSystem: str = None,
-        typeCode: List[str] = None,
+        typeCode: tp.List[str] = None,
     ):
         self.location_id = locationId
         self.type_system = typeSystem
@@ -123,7 +123,7 @@ def include_obs_in_value_and_time_range(
     def include_obs_values_in_time_range(
         self,
         code: str,
-        values: List[str] = None,
+        values: tp.List[str] = None,
         min_time: str = None,
         max_time: str = None,
     ) -> PatientQuery:
@@ -148,9 +148,9 @@ def include_all_other_codes(
 
     def encounter_constraints(
         self,
-        locationId: List[str] = None,
+        locationId: tp.List[str] = None,
         typeSystem: str = None,
-        typeCode: List[str] = None,
+        typeCode: tp.List[str] = None,
     ):
         """Specifies constraints on encounters to be included.
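
For orientation, here is a minimal sketch of how the two constraint entry points touched above might be called on a PatientQuery instance. The observation code, values, and location ID are hypothetical; only the parameter names come from the diff.

    # Sketch only: assumes `pq` is a PatientQuery built elsewhere.
    # The observation code and location ID below are made-up examples.
    pq.include_obs_values_in_time_range(
        code="8310-5",
        values=["37.0", "38.5"],
        min_time="2020-01-01",
        max_time="2020-12-31",
    )
    pq.encounter_constraints(
        locationId=["example-location-uuid"],
        typeCode=["AMB"],
    )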
dwh/query_lib.py (87 changes: 42 additions & 45 deletions)
@@ -22,63 +22,60 @@
 
 # See https://stackoverflow.com/questions/33533148 why this is needed.
 from __future__ import annotations
-from enum import Enum
-from typing import List, Any, Type, Optional
-import pandas
-from pyspark import SparkConf
-from pyspark.sql import SparkSession, DataFrame
-import pyspark.sql.functions as F
-import pyspark.sql.types as T
-from query_lib_big_query import _BigQueryPatientQuery
-from query_lib_spark import _SparkPatientQuery
-from base import PatientQuery
 
 import common
+import enum
+import typing as tp
+import base
+import query_lib_big_query
+import query_lib_spark
 
 
 # This separator is used to merge date and values into one string.
-DATE_VALUE_SEPARATOR = '_SeP_'
+DATE_VALUE_SEPARATOR = "_SeP_"
 
 
-def merge_date_and_value(d: str, v: Any) -> str:
-  return '{}{}{}'.format(d, DATE_VALUE_SEPARATOR, v)
+def merge_date_and_value(d: str, v: tp.Any) -> str:
+    return "{}{}{}".format(d, DATE_VALUE_SEPARATOR, v)
 
 
-class Runner(Enum):
-  SPARK = 1
-  BIG_QUERY = 2
-  #FHIR_SERVER = 3
+class Runner(enum.Enum):
+    SPARK = 1
+    BIG_QUERY = 2
+    # FHIR_SERVER = 3
 
 
 def patient_query_factory(
     runner: Runner,
     data_source: str,
-    code_system: Optional[str] = None,
-    project_name: Optional[str] = None,
-) -> PatientQuery:
-
-  """Returns the right instance of `PatientQuery` based on `data_source`.
-  Args:
-    runner: The runner to use for making data queries
-    data_source: The definition of the source, e.g., directory containing
-      Parquet files or a BigQuery dataset.
-    project_name: The GoogleCloud project name. This field is required if
-      is used as data source.
-  Returns:
-    The created instance.
-  Raises:
-    ValueError: When the input `data_source` is malformed or not implemented.
-  """
-  if runner == Runner.SPARK:
-    return _SparkPatientQuery(data_source, code_system)
-  if runner == Runner.BIG_QUERY:
-    return _BigQueryPatientQuery(
-        project_name=project_name,
+    code_system: tp.Optional[str] = None,
+    **kwargs: tp.Dict[tp.Any, tp.Any],
+) -> base.PatientQuery:
+
+    """Returns the right instance of `PatientQuery` based on `data_source`.
+    Args:
+      runner: The runner to use for making data queries
+      data_source: The definition of the source, e.g., directory containing
+        Parquet files or a BigQuery dataset.
+      kwargs: A dictionary of runner specific parameters.
+    Returns:
+      The created instance.
+    Raises:
+      ValueError: When the input `data_source` is malformed or not implemented.
+    """
+    if runner == Runner.SPARK:
+        return query_lib_spark._SparkPatientQuery(data_source, code_system)
+    if runner == Runner.BIG_QUERY:
+        if "project_name" not in kwargs:
+            raise ValueError(
+                "'project_name' should be provided in kwargs while using "
+                "a BigQuery data source"
+            )
+        return query_lib_big_query._BigQueryPatientQuery(
+            project_name=kwargs['project_name'],
             bq_dataset=data_source,
-            code_system=code_system
+            code_system=code_system,
         )
 
-  raise ValueError('Query engine {} is not supported yet.'.format(runner))
+    raise ValueError("Query engine {} is not supported yet.".format(runner))
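
With the new factory signature, runner-specific parameters travel through **kwargs rather than dedicated arguments. A minimal sketch of both call styles follows; the directory, dataset, and project names are hypothetical.

    import query_lib

    # Spark: data_source is a directory containing Parquet files.
    spark_pq = query_lib.patient_query_factory(
        query_lib.Runner.SPARK, "./dwh_parquet", code_system="http://loinc.org")

    # BigQuery: project_name must now be passed as a keyword argument;
    # omitting it raises the ValueError introduced in this commit.
    bq_pq = query_lib.patient_query_factory(
        query_lib.Runner.BIG_QUERY, "my_dataset", project_name="my-gcp-project")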
(Diffs for the remaining 7 changed files are not shown.)

0 comments on commit 337575a
