Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added unzipping functionality for the MIMIC-IV example #31

Merged
43 changes: 38 additions & 5 deletions MIMIC-IV_Example/joint_script.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ function display_help() {
echo "sharding events, splitting patients, converting to sharded events, and merging into a MEDS cohort."
echo
echo "Arguments:"
echo " MIMICIV_RAW_DIR Directory containing raw MIMIC-IV data files."
echo " MIMICIV_PREMEDS_DIR Output directory for pre-MEDS data."
echo " MIMICIV_MEDS_DIR Output directory for processed MEDS data."
echo " N_PARALLEL_WORKERS Number of parallel workers for processing."
echo " MIMICIV_RAW_DIR Directory containing raw MIMIC-IV data files."
echo " MIMICIV_PREMEDS_DIR Output directory for pre-MEDS data."
echo " MIMICIV_MEDS_DIR Output directory for processed MEDS data."
echo " N_PARALLEL_WORKERS Number of parallel workers for processing."
echo " (OPTIONAL) do_unzip=true OR do_unzip=false Optional flag to unzip csv files before processing."
echo
echo "Options:"
echo " -h, --help Display this help message and exit."
Expand All @@ -37,7 +38,39 @@ MIMICIV_PREMEDS_DIR="$2"
MIMICIV_MEDS_DIR="$3"
N_PARALLEL_WORKERS="$4"

shift 4
# Default do_unzip value
DO_UNZIP="false"

# Check if the 5th argument is either do_unzip=true or do_unzip=false
if [ $# -ge 5 ]; then
case "$5" in
do_unzip=true)
DO_UNZIP="true"
shift 5
;;
do_unzip=false)
DO_UNZIP="false"
shift 5
;;
do_unzip=*)
echo "Error: Invalid do_unzip value. Use 'do_unzip=true' or 'do_unzip=false'."
exit 1
;;
*)
# If the 5th argument is not related to do_unzip, leave it for other_args
shift 4
;;
esac
else
shift 4
fi

if [ "$DO_UNZIP" == "true" ]; then
echo "Unzipping csv files."
for file in "${MIMICIV_RAW_DIR}"/*/*.csv.gz; do gzip -d --force "$file"; done
mmcdermott marked this conversation as resolved.
Show resolved Hide resolved
else
echo "Skipping unzipping."
fi

echo "Running pre-MEDS conversion."
./MIMIC-IV_Example/pre_MEDS.py raw_cohort_dir="$MIMICIV_RAW_DIR" output_dir="$MIMICIV_PREMEDS_DIR"
Expand Down
56 changes: 27 additions & 29 deletions MIMIC-IV_Example/pre_MEDS.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,20 @@
#!/usr/bin/env python

"""Performs pre-MEDS data wrangling for MIMIC-IV."""
import rootutils

root = rootutils.setup_root(__file__, dotenv=True, pythonpath=True, cwd=True)

import gzip
from datetime import datetime
from functools import partial
from pathlib import Path

import hydra
import polars as pl
from loguru import logger
from omegaconf import DictConfig

from MEDS_transforms.extract.utils import get_supported_fp
from MEDS_transforms.utils import get_shard_prefix, hydra_loguru_init, write_lazyframe


def load_raw_mimic_file(fp: Path, **kwargs) -> pl.LazyFrame:
    """Read a gzip-compressed raw MIMIC CSV file and return it as a lazy frame.

    The file is opened through ``gzip`` in binary mode and handed to
    ``pl.read_csv``; the result is then converted with ``.lazy()``.

    Args:
        fp: The path to the gzip-compressed MIMIC CSV file.
        **kwargs: Extra keyword arguments forwarded unchanged to ``pl.read_csv``.

    Returns:
        A Polars ``LazyFrame`` wrapping the data read from ``fp``.
    """

    with gzip.open(fp, mode="rb") as gz_stream:
        # A large schema-inference window is requested for these raw tables.
        eager_df = pl.read_csv(gz_stream, infer_schema_length=100000, **kwargs)
    return eager_df.lazy()


def add_discharge_time_by_hadm_id(
df: pl.LazyFrame, discharge_time_df: pl.LazyFrame, out_column_name: str = "hadm_discharge_time"
) -> pl.LazyFrame:
Expand Down Expand Up @@ -82,14 +66,29 @@ def main(cfg: DictConfig):
raw_cohort_dir = Path(cfg.raw_cohort_dir)
MEDS_input_dir = Path(cfg.output_dir)

all_fps = list(raw_cohort_dir.glob("**/*.csv.gz"))
all_fps = list(raw_cohort_dir.glob("**/*.*"))

dfs_to_load = {}
seen_fps = {}

for in_fp in all_fps:
pfx = get_shard_prefix(raw_cohort_dir, in_fp)

out_fp = MEDS_input_dir / in_fp.relative_to(raw_cohort_dir)
try:
fp, read_fn = get_supported_fp(raw_cohort_dir, pfx)
except FileNotFoundError:
logger.info(f"Skipping {pfx} @ {str(in_fp.resolve())} as no compatible dataframe file was found.")
continue

if fp.suffix in [".csv", ".csv.gz"]:
read_fn = partial(read_fn, infer_schema_length=100000)

if str(fp.resolve()) in seen_fps:
continue
else:
seen_fps[str(fp.resolve())] = read_fn

out_fp = MEDS_input_dir / fp.relative_to(raw_cohort_dir)

if out_fp.is_file():
print(f"Done with {pfx}. Continuing")
Expand All @@ -99,10 +98,9 @@ def main(cfg: DictConfig):

if pfx not in FUNCTIONS:
logger.info(
f"No function needed for {pfx}: "
f"Symlinking {str(in_fp.resolve())} to {str(out_fp.resolve())}"
f"No function needed for {pfx}: " f"Symlinking {str(fp.resolve())} to {str(out_fp.resolve())}"
)
relative_in_fp = in_fp.relative_to(out_fp.resolve().parent, walk_up=True)
relative_in_fp = fp.relative_to(out_fp.resolve().parent, walk_up=True)
out_fp.symlink_to(relative_in_fp)
continue
else:
Expand All @@ -115,8 +113,8 @@ def main(cfg: DictConfig):
if not need_df:
st = datetime.now()
logger.info(f"Processing {pfx}...")
df = load_raw_mimic_file(in_fp)
logger.info(f" Loaded raw {in_fp} in {datetime.now() - st}")
df = read_fn(fp)
logger.info(f" Loaded raw {fp} in {datetime.now() - st}")
processed_df = fn(df)
write_lazyframe(processed_df, out_fp)
logger.info(f" Processed and wrote to {str(out_fp.resolve())} in {datetime.now() - st}")
Expand All @@ -125,19 +123,19 @@ def main(cfg: DictConfig):
if needed_pfx not in dfs_to_load:
dfs_to_load[needed_pfx] = {"fps": set(), "cols": set()}

dfs_to_load[needed_pfx]["fps"].add(in_fp)
dfs_to_load[needed_pfx]["fps"].add(fp)
dfs_to_load[needed_pfx]["cols"].update(needed_cols)

for df_to_load_pfx, fps_and_cols in dfs_to_load.items():
fps = fps_and_cols["fps"]
cols = list(fps_and_cols["cols"])

df_to_load_fp = raw_cohort_dir / f"{df_to_load_pfx}.csv.gz"
df_to_load_fp, df_to_load_read_fn = get_supported_fp(raw_cohort_dir, df_to_load_pfx)

st = datetime.now()

logger.info(f"Loading {str(df_to_load_fp.resolve())} for manipulating other dataframes...")
df = load_raw_mimic_file(df_to_load_fp, columns=cols)
df = read_fn(df_to_load_fp, columns=cols)
logger.info(f" Loaded in {datetime.now() - st}")

for fp in fps:
Expand All @@ -149,7 +147,7 @@ def main(cfg: DictConfig):

fp_st = datetime.now()
logger.info(f" Loading {str(fp.resolve())}...")
fp_df = load_raw_mimic_file(fp)
fp_df = seen_fps[str(fp.resolve())](fp)
logger.info(f" Loaded in {datetime.now() - fp_st}")
processed_df = fn(fp_df, df)
write_lazyframe(processed_df, out_fp)
Expand Down
3 changes: 0 additions & 3 deletions eICU_Example/pre_MEDS.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@

See the docstring of `main` for more information.
"""
import rootutils

root = rootutils.setup_root(__file__, dotenv=True, pythonpath=True, cwd=True)

import gzip
from collections.abc import Callable
Expand Down
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ dependencies = [
[tool.setuptools_scm]

[project.optional-dependencies]
examples = ["rootutils"]
dev = ["pre-commit"]
tests = ["pytest", "pytest-cov", "rootutils"]
local_parallelism = ["hydra-joblib-launcher"]
Expand Down
Loading