Skip to content

Commit

Permalink
Merge pull request #31 from mmcdermott/30-mimic-iv-example-has-high-m…
Browse files Browse the repository at this point in the history
…emory-usage-due-to-csvgz-files

added unzipping functionality for mimic iv example
  • Loading branch information
mmcdermott authored Aug 5, 2024
2 parents 06c62b5 + 6eaecb6 commit 600d20e
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 38 deletions.
43 changes: 38 additions & 5 deletions MIMIC-IV_Example/joint_script.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ function display_help() {
echo "sharding events, splitting patients, converting to sharded events, and merging into a MEDS cohort."
echo
echo "Arguments:"
echo " MIMICIV_RAW_DIR Directory containing raw MIMIC-IV data files."
echo " MIMICIV_PREMEDS_DIR Output directory for pre-MEDS data."
echo " MIMICIV_MEDS_DIR Output directory for processed MEDS data."
echo " N_PARALLEL_WORKERS Number of parallel workers for processing."
echo " MIMICIV_RAW_DIR Directory containing raw MIMIC-IV data files."
echo " MIMICIV_PREMEDS_DIR Output directory for pre-MEDS data."
echo " MIMICIV_MEDS_DIR Output directory for processed MEDS data."
echo " N_PARALLEL_WORKERS Number of parallel workers for processing."
echo " (OPTIONAL) do_unzip=true OR do_unzip=false Optional flag to unzip csv files before processing."
echo
echo "Options:"
echo " -h, --help Display this help message and exit."
Expand All @@ -37,7 +38,39 @@ MIMICIV_PREMEDS_DIR="$2"
MIMICIV_MEDS_DIR="$3"
N_PARALLEL_WORKERS="$4"

shift 4
# Default do_unzip value
DO_UNZIP="false"

# Check if the 5th argument is either do_unzip=true or do_unzip=false
if [ $# -ge 5 ]; then
case "$5" in
do_unzip=true)
DO_UNZIP="true"
shift 5
;;
do_unzip=false)
DO_UNZIP="false"
shift 5
;;
do_unzip=*)
echo "Error: Invalid do_unzip value. Use 'do_unzip=true' or 'do_unzip=false'."
exit 1
;;
*)
# If the 5th argument is not related to do_unzip, leave it for other_args
shift 4
;;
esac
else
shift 4
fi

if [ "$DO_UNZIP" == "true" ]; then
echo "Unzipping csv files."
for file in "${MIMICIV_RAW_DIR}"/*/*.csv.gz; do gzip -d --force "$file"; done
else
echo "Skipping unzipping."
fi

echo "Running pre-MEDS conversion."
./MIMIC-IV_Example/pre_MEDS.py raw_cohort_dir="$MIMICIV_RAW_DIR" output_dir="$MIMICIV_PREMEDS_DIR"
Expand Down
56 changes: 27 additions & 29 deletions MIMIC-IV_Example/pre_MEDS.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,20 @@
#!/usr/bin/env python

"""Performs pre-MEDS data wrangling for MIMIC-IV."""
import rootutils

root = rootutils.setup_root(__file__, dotenv=True, pythonpath=True, cwd=True)

import gzip
from datetime import datetime
from functools import partial
from pathlib import Path

import hydra
import polars as pl
from loguru import logger
from omegaconf import DictConfig

from MEDS_transforms.extract.utils import get_supported_fp
from MEDS_transforms.utils import get_shard_prefix, hydra_loguru_init, write_lazyframe


def load_raw_mimic_file(fp: Path, **kwargs) -> pl.LazyFrame:
"""Load a raw MIMIC file into a Polars DataFrame.
Args:
fp: The path to the MIMIC file.
Returns:
The Polars DataFrame containing the MIMIC data.
"""

with gzip.open(fp, mode="rb") as f:
return pl.read_csv(f, infer_schema_length=100000, **kwargs).lazy()


def add_discharge_time_by_hadm_id(
df: pl.LazyFrame, discharge_time_df: pl.LazyFrame, out_column_name: str = "hadm_discharge_time"
) -> pl.LazyFrame:
Expand Down Expand Up @@ -82,14 +66,29 @@ def main(cfg: DictConfig):
raw_cohort_dir = Path(cfg.raw_cohort_dir)
MEDS_input_dir = Path(cfg.output_dir)

all_fps = list(raw_cohort_dir.glob("**/*.csv.gz"))
all_fps = list(raw_cohort_dir.glob("**/*.*"))

dfs_to_load = {}
seen_fps = {}

for in_fp in all_fps:
pfx = get_shard_prefix(raw_cohort_dir, in_fp)

out_fp = MEDS_input_dir / in_fp.relative_to(raw_cohort_dir)
try:
fp, read_fn = get_supported_fp(raw_cohort_dir, pfx)
except FileNotFoundError:
logger.info(f"Skipping {pfx} @ {str(in_fp.resolve())} as no compatible dataframe file was found.")
continue

if fp.suffix in [".csv", ".csv.gz"]:
read_fn = partial(read_fn, infer_schema_length=100000)

if str(fp.resolve()) in seen_fps:
continue
else:
seen_fps[str(fp.resolve())] = read_fn

out_fp = MEDS_input_dir / fp.relative_to(raw_cohort_dir)

if out_fp.is_file():
print(f"Done with {pfx}. Continuing")
Expand All @@ -99,10 +98,9 @@ def main(cfg: DictConfig):

if pfx not in FUNCTIONS:
logger.info(
f"No function needed for {pfx}: "
f"Symlinking {str(in_fp.resolve())} to {str(out_fp.resolve())}"
f"No function needed for {pfx}: " f"Symlinking {str(fp.resolve())} to {str(out_fp.resolve())}"
)
relative_in_fp = in_fp.relative_to(out_fp.resolve().parent, walk_up=True)
relative_in_fp = fp.relative_to(out_fp.resolve().parent, walk_up=True)
out_fp.symlink_to(relative_in_fp)
continue
else:
Expand All @@ -115,8 +113,8 @@ def main(cfg: DictConfig):
if not need_df:
st = datetime.now()
logger.info(f"Processing {pfx}...")
df = load_raw_mimic_file(in_fp)
logger.info(f" Loaded raw {in_fp} in {datetime.now() - st}")
df = read_fn(fp)
logger.info(f" Loaded raw {fp} in {datetime.now() - st}")
processed_df = fn(df)
write_lazyframe(processed_df, out_fp)
logger.info(f" Processed and wrote to {str(out_fp.resolve())} in {datetime.now() - st}")
Expand All @@ -125,19 +123,19 @@ def main(cfg: DictConfig):
if needed_pfx not in dfs_to_load:
dfs_to_load[needed_pfx] = {"fps": set(), "cols": set()}

dfs_to_load[needed_pfx]["fps"].add(in_fp)
dfs_to_load[needed_pfx]["fps"].add(fp)
dfs_to_load[needed_pfx]["cols"].update(needed_cols)

for df_to_load_pfx, fps_and_cols in dfs_to_load.items():
fps = fps_and_cols["fps"]
cols = list(fps_and_cols["cols"])

df_to_load_fp = raw_cohort_dir / f"{df_to_load_pfx}.csv.gz"
df_to_load_fp, df_to_load_read_fn = get_supported_fp(raw_cohort_dir, df_to_load_pfx)

st = datetime.now()

logger.info(f"Loading {str(df_to_load_fp.resolve())} for manipulating other dataframes...")
df = load_raw_mimic_file(df_to_load_fp, columns=cols)
df = read_fn(df_to_load_fp, columns=cols)
logger.info(f" Loaded in {datetime.now() - st}")

for fp in fps:
Expand All @@ -149,7 +147,7 @@ def main(cfg: DictConfig):

fp_st = datetime.now()
logger.info(f" Loading {str(fp.resolve())}...")
fp_df = load_raw_mimic_file(fp)
fp_df = seen_fps[str(fp.resolve())](fp)
logger.info(f" Loaded in {datetime.now() - fp_st}")
processed_df = fn(fp_df, df)
write_lazyframe(processed_df, out_fp)
Expand Down
3 changes: 0 additions & 3 deletions eICU_Example/pre_MEDS.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@
See the docstring of `main` for more information.
"""
import rootutils

root = rootutils.setup_root(__file__, dotenv=True, pythonpath=True, cwd=True)

import gzip
from collections.abc import Callable
Expand Down
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ dependencies = [
[tool.setuptools_scm]

[project.optional-dependencies]
examples = ["rootutils"]
dev = ["pre-commit"]
tests = ["pytest", "pytest-cov", "rootutils"]
local_parallelism = ["hydra-joblib-launcher"]
Expand Down

0 comments on commit 600d20e

Please sign in to comment.