diff --git a/MIMIC-IV_Example/joint_script.sh b/MIMIC-IV_Example/joint_script.sh index 7187b78..a98fee7 100755 --- a/MIMIC-IV_Example/joint_script.sh +++ b/MIMIC-IV_Example/joint_script.sh @@ -11,10 +11,11 @@ function display_help() { echo "sharding events, splitting patients, converting to sharded events, and merging into a MEDS cohort." echo echo "Arguments:" - echo " MIMICIV_RAW_DIR Directory containing raw MIMIC-IV data files." - echo " MIMICIV_PREMEDS_DIR Output directory for pre-MEDS data." - echo " MIMICIV_MEDS_DIR Output directory for processed MEDS data." - echo " N_PARALLEL_WORKERS Number of parallel workers for processing." + echo " MIMICIV_RAW_DIR Directory containing raw MIMIC-IV data files." + echo " MIMICIV_PREMEDS_DIR Output directory for pre-MEDS data." + echo " MIMICIV_MEDS_DIR Output directory for processed MEDS data." + echo " N_PARALLEL_WORKERS Number of parallel workers for processing." + echo " (OPTIONAL) do_unzip=true OR do_unzip=false Optional flag to unzip csv files before processing." echo echo "Options:" echo " -h, --help Display this help message and exit." @@ -37,7 +38,39 @@ MIMICIV_PREMEDS_DIR="$2" MIMICIV_MEDS_DIR="$3" N_PARALLEL_WORKERS="$4" -shift 4 +# Default do_unzip value +DO_UNZIP="false" + +# Check if the 5th argument is either do_unzip=true or do_unzip=false +if [ $# -ge 5 ]; then + case "$5" in + do_unzip=true) + DO_UNZIP="true" + shift 5 + ;; + do_unzip=false) + DO_UNZIP="false" + shift 5 + ;; + do_unzip=*) + echo "Error: Invalid do_unzip value. Use 'do_unzip=true' or 'do_unzip=false'." + exit 1 + ;; + *) + # If the 5th argument is not related to do_unzip, leave it for other_args + shift 4 + ;; + esac +else + shift 4 +fi + +if [ "$DO_UNZIP" == "true" ]; then + echo "Unzipping csv files." + for file in "${MIMICIV_RAW_DIR}"/*/*.csv.gz; do gzip -d --force "$file"; done +else + echo "Skipping unzipping." +fi echo "Running pre-MEDS conversion." 
./MIMIC-IV_Example/pre_MEDS.py raw_cohort_dir="$MIMICIV_RAW_DIR" output_dir="$MIMICIV_PREMEDS_DIR" diff --git a/MIMIC-IV_Example/pre_MEDS.py b/MIMIC-IV_Example/pre_MEDS.py index bf1d3e8..1530a8b 100755 --- a/MIMIC-IV_Example/pre_MEDS.py +++ b/MIMIC-IV_Example/pre_MEDS.py @@ -1,12 +1,9 @@ #!/usr/bin/env python """Performs pre-MEDS data wrangling for MIMIC-IV.""" -import rootutils -root = rootutils.setup_root(__file__, dotenv=True, pythonpath=True, cwd=True) - -import gzip from datetime import datetime +from functools import partial from pathlib import Path import hydra @@ -14,23 +11,10 @@ from loguru import logger from omegaconf import DictConfig +from MEDS_transforms.extract.utils import get_supported_fp from MEDS_transforms.utils import get_shard_prefix, hydra_loguru_init, write_lazyframe -def load_raw_mimic_file(fp: Path, **kwargs) -> pl.LazyFrame: - """Load a raw MIMIC file into a Polars DataFrame. - - Args: - fp: The path to the MIMIC file. - - Returns: - The Polars DataFrame containing the MIMIC data. 
- """ - - with gzip.open(fp, mode="rb") as f: - return pl.read_csv(f, infer_schema_length=100000, **kwargs).lazy() - - def add_discharge_time_by_hadm_id( df: pl.LazyFrame, discharge_time_df: pl.LazyFrame, out_column_name: str = "hadm_discharge_time" ) -> pl.LazyFrame: @@ -82,14 +66,29 @@ def main(cfg: DictConfig): raw_cohort_dir = Path(cfg.raw_cohort_dir) MEDS_input_dir = Path(cfg.output_dir) - all_fps = list(raw_cohort_dir.glob("**/*.csv.gz")) + all_fps = list(raw_cohort_dir.glob("**/*.*")) dfs_to_load = {} + seen_fps = {} for in_fp in all_fps: pfx = get_shard_prefix(raw_cohort_dir, in_fp) - out_fp = MEDS_input_dir / in_fp.relative_to(raw_cohort_dir) + try: + fp, read_fn = get_supported_fp(raw_cohort_dir, pfx) + except FileNotFoundError: + logger.info(f"Skipping {pfx} @ {str(in_fp.resolve())} as no compatible dataframe file was found.") + continue + + if fp.name.endswith((".csv", ".csv.gz")): + read_fn = partial(read_fn, infer_schema_length=100000) + + if str(fp.resolve()) in seen_fps: + continue + else: + seen_fps[str(fp.resolve())] = read_fn + + out_fp = MEDS_input_dir / fp.relative_to(raw_cohort_dir) if out_fp.is_file(): print(f"Done with {pfx}. 
Continuing") @@ -99,10 +98,9 @@ def main(cfg: DictConfig): if pfx not in FUNCTIONS: logger.info( - f"No function needed for {pfx}: " - f"Symlinking {str(in_fp.resolve())} to {str(out_fp.resolve())}" + f"No function needed for {pfx}: " f"Symlinking {str(fp.resolve())} to {str(out_fp.resolve())}" ) - relative_in_fp = in_fp.relative_to(out_fp.resolve().parent, walk_up=True) + relative_in_fp = fp.relative_to(out_fp.resolve().parent, walk_up=True) out_fp.symlink_to(relative_in_fp) continue else: @@ -115,8 +113,8 @@ def main(cfg: DictConfig): if not need_df: st = datetime.now() logger.info(f"Processing {pfx}...") - df = load_raw_mimic_file(in_fp) - logger.info(f" Loaded raw {in_fp} in {datetime.now() - st}") + df = read_fn(fp) + logger.info(f" Loaded raw {fp} in {datetime.now() - st}") processed_df = fn(df) write_lazyframe(processed_df, out_fp) logger.info(f" Processed and wrote to {str(out_fp.resolve())} in {datetime.now() - st}") @@ -125,19 +123,19 @@ def main(cfg: DictConfig): if needed_pfx not in dfs_to_load: dfs_to_load[needed_pfx] = {"fps": set(), "cols": set()} - dfs_to_load[needed_pfx]["fps"].add(in_fp) + dfs_to_load[needed_pfx]["fps"].add(fp) dfs_to_load[needed_pfx]["cols"].update(needed_cols) for df_to_load_pfx, fps_and_cols in dfs_to_load.items(): fps = fps_and_cols["fps"] cols = list(fps_and_cols["cols"]) - df_to_load_fp = raw_cohort_dir / f"{df_to_load_pfx}.csv.gz" + df_to_load_fp, df_to_load_read_fn = get_supported_fp(raw_cohort_dir, df_to_load_pfx) st = datetime.now() logger.info(f"Loading {str(df_to_load_fp.resolve())} for manipulating other dataframes...") - df = load_raw_mimic_file(df_to_load_fp, columns=cols) + df = df_to_load_read_fn(df_to_load_fp, columns=cols) logger.info(f" Loaded in {datetime.now() - st}") for fp in fps: @@ -149,7 +147,7 @@ def main(cfg: DictConfig): fp_st = datetime.now() logger.info(f" Loading {str(fp.resolve())}...") - fp_df = load_raw_mimic_file(fp) + fp_df = seen_fps[str(fp.resolve())](fp) logger.info(f" Loaded in {datetime.now() - 
fp_st}") processed_df = fn(fp_df, df) write_lazyframe(processed_df, out_fp) diff --git a/eICU_Example/pre_MEDS.py b/eICU_Example/pre_MEDS.py index 317ecb5..5ebe058 100755 --- a/eICU_Example/pre_MEDS.py +++ b/eICU_Example/pre_MEDS.py @@ -4,9 +4,6 @@ See the docstring of `main` for more information. """ -import rootutils - -root = rootutils.setup_root(__file__, dotenv=True, pythonpath=True, cwd=True) import gzip from collections.abc import Callable diff --git a/pyproject.toml b/pyproject.toml index 5bea0a8..e512e7e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,7 +23,6 @@ dependencies = [ [tool.setuptools_scm] [project.optional-dependencies] -examples = ["rootutils"] dev = ["pre-commit"] tests = ["pytest", "pytest-cov", "rootutils"] local_parallelism = ["hydra-joblib-launcher"]