Merge pull request #10 from mmcdermott/multirun
Multirun capabilities and other improvements
mmcdermott authored Jun 11, 2024
2 parents a77496f + 1423051 commit fd56b71
Showing 31 changed files with 2,646 additions and 177 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -38,6 +38,7 @@ repos:
rev: v2.2.0
hooks:
- id: autoflake
args: [--in-place, --remove-all-unused-imports]

# python upgrading syntax to newer version
- repo: https://github.com/asottile/pyupgrade
89 changes: 78 additions & 11 deletions MIMIC-IV_Example/README.md
@@ -33,10 +33,9 @@ Download this repository and install the requirements:
```bash
git clone git@github.com:mmcdermott/MEDS_polars_functions.git
cd MEDS_polars_functions
conda create -n MEDS python=3.12
conda activate MEDS
pip install .[examples]
```

## Step 1: Download MIMIC-IV
@@ -73,19 +72,87 @@ In practice, on a machine with 150 GB of RAM and 10 cores, this step takes less

## Step 3: Run the MEDS extraction ETL

### Running locally, serially

We will assume you want to output the final MEDS dataset into a directory we'll denote as `$MIMICIV_MEDS_DIR`.
Note that this is a different directory from the pre-MEDS directory (though, of course, they can both be
subdirectories of the same root directory).

This step proceeds in 4 parts:

1. Sub-shard the raw files. Run this command as many times simultaneously as you would like to have workers
performing this sub-sharding step. See below for how to automate this parallelism using hydra launchers.

```bash
./scripts/extraction/shard_events.py \
input_dir=$MIMICIV_PREMEDS_DIR \
cohort_dir=$MIMICIV_MEDS_DIR \
event_conversion_config_fp=./MIMIC-IV_Example/configs/event_configs.yaml
```

In practice, on a machine with 150 GB of RAM and 10 cores, this step takes approximately 20 minutes in total.

2. Extract and form the patient splits and sub-shards.

```bash
./scripts/extraction/split_and_shard_patients.py \
input_dir=$MIMICIV_PREMEDS_DIR \
cohort_dir=$MIMICIV_MEDS_DIR \
event_conversion_config_fp=./MIMIC-IV_Example/configs/event_configs.yaml
```

In practice, on a machine with 150 GB of RAM and 10 cores, this step takes less than 5 minutes in total.

3. Extract patient sub-shards and convert to MEDS events.

```bash
./scripts/extraction/convert_to_sharded_events.py \
input_dir=$MIMICIV_PREMEDS_DIR \
cohort_dir=$MIMICIV_MEDS_DIR \
event_conversion_config_fp=./MIMIC-IV_Example/configs/event_configs.yaml
```

In practice, run serially, this also takes around 20 minutes or more. However, it can be trivially
parallelized, cutting the time down by a factor of the number of workers, simply by running the command
multiple times (though this will, of course, consume more resources). If your filesystem is distributed, these
commands can also be launched as separate slurm jobs, for example. For MIMIC-IV, this level of parallelization
and performance is not necessary; however, for larger datasets, it can be.

4. Merge the MEDS events into a single file per patient sub-shard.

```bash
./scripts/extraction/merge_to_MEDS_cohort.py \
input_dir=$MIMICIV_PREMEDS_DIR \
cohort_dir=$MIMICIV_MEDS_DIR \
event_conversion_config_fp=./MIMIC-IV_Example/configs/event_configs.yaml
```
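The "just run the command again" parallelism described in step 3 can be sketched in plain bash: each worker is an identical copy of the stage command, launched in the background and awaited, with coordination presumably happening through the shared cohort directory. Here `run_stage` is a hypothetical stand-in, not one of the real extraction scripts:

```shell
# Hypothetical stand-in for a stage command such as
# ./scripts/extraction/convert_to_sharded_events.py; each invocation would
# claim and process whichever sub-shards are still unfinished.
run_stage() {
  echo "worker $1: processing unclaimed sub-shards"
}

N_WORKERS=4
for i in $(seq 1 "$N_WORKERS"); do
  run_stage "$i" &  # launch identical workers in the background
done
wait                # block until every background worker has exited
echo "stage complete"
```

Because each copy is self-sufficient, the same pattern works whether the copies are background processes, separate terminals, or separate cluster jobs.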

### Running Locally, in Parallel

This step runs the exact same commands as above, but leverages Hydra's multirun capabilities with the `joblib`
launcher. Install this package with the optional `local_parallelism` option (e.g., `pip install -e .[local_parallelism]`) and run `./MIMIC-IV_Example/joint_script.sh`. See that script for the expected arguments.
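Concretely, each `joblib`-parallelized stage is just the serial command plus Hydra's multirun flags, mirroring what `joint_script.sh` does. A sketch for the sub-sharding stage, with the same directory variables as above (run from the repository root):

```bash
./scripts/extraction/shard_events.py \
--multirun \
worker="range(0,$N_PARALLEL_WORKERS)" \
hydra/launcher=joblib \
input_dir=$MIMICIV_PREMEDS_DIR \
cohort_dir=$MIMICIV_MEDS_DIR \
event_conversion_config_fp=./MIMIC-IV_Example/configs/event_configs.yaml
```

Here `worker="range(0,$N_PARALLEL_WORKERS)"` sweeps the `worker` config value, so Hydra launches one local joblib process per worker id.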

### Running Each Step over Slurm

To use slurm, run each command with the desired number of workers using Hydra's multirun capabilities with the
`submitit_slurm` launcher. Install this package with the optional `slurm_parallelism` option. Modified
commands are shown below. Note that these stages cannot be chained in a single script, as the script would
move on to the next stage without waiting for all of a stage's slurm jobs to finish. Let
`$N_PARALLEL_WORKERS` be the desired number of workers.

1. Sub-shard the raw files.

```bash
./scripts/extraction/shard_events.py \
--multirun \
worker="range(0,$N_PARALLEL_WORKERS)" \
hydra/launcher=submitit_slurm \
hydra.launcher.timeout_min=60 \
hydra.launcher.cpus_per_task=10 \
hydra.launcher.mem_gb=50 \
hydra.launcher.name='${hydra.job.name}_${worker}' \
hydra.launcher.partition="short" \
input_dir=$MIMICIV_PREMEDS_DIR \
cohort_dir=$MIMICIV_MEDS_DIR \
event_conversion_config_fp=./MIMIC-IV_Example/configs/event_configs.yaml
```

@@ -95,8 +162,8 @@ In practice, on a machine with 150 GB of RAM and 10 cores, this step takes appro

```bash
./scripts/extraction/split_and_shard_patients.py \
input_dir=$MIMICIV_PREMEDS_DIR \
cohort_dir=$MIMICIV_MEDS_DIR \
event_conversion_config_fp=./MIMIC-IV_Example/configs/event_configs.yaml
```

@@ -106,8 +173,8 @@ In practice, on a machine with 150 GB of RAM and 10 cores, this step takes less

```bash
./scripts/extraction/convert_to_sharded_events.py \
input_dir=$MIMICIV_PREMEDS_DIR \
cohort_dir=$MIMICIV_MEDS_DIR \
event_conversion_config_fp=./MIMIC-IV_Example/configs/event_configs.yaml
```

@@ -121,8 +188,8 @@ and performance is not necessary; however, for larger datasets, it can be.

```bash
./scripts/extraction/merge_to_MEDS_cohort.py \
input_dir=$MIMICIV_PREMEDS_DIR \
cohort_dir=$MIMICIV_MEDS_DIR \
event_conversion_config_fp=./MIMIC-IV_Example/configs/event_configs.yaml
```

Empty file added MIMIC-IV_Example/__init__.py
Empty file.
76 changes: 76 additions & 0 deletions MIMIC-IV_Example/joint_script.sh
@@ -0,0 +1,76 @@
#!/usr/bin/env bash

# This makes the script fail if any internal script fails
set -e

# Function to display help message
function display_help() {
echo "Usage: $0 <MIMICIV_RAW_DIR> <MIMICIV_PREMEDS_DIR> <MIMICIV_MEDS_DIR> <N_PARALLEL_WORKERS>"
echo
echo "This script processes MIMIC-IV data through several steps, handling raw data conversion,"
echo "sharding events, splitting patients, converting to sharded events, and merging into a MEDS cohort."
echo
echo "Arguments:"
echo " MIMICIV_RAW_DIR Directory containing raw MIMIC-IV data files."
echo " MIMICIV_PREMEDS_DIR Output directory for pre-MEDS data."
echo " MIMICIV_MEDS_DIR Output directory for processed MEDS data."
echo " N_PARALLEL_WORKERS Number of parallel workers for processing."
echo
echo "Options:"
echo " -h, --help Display this help message and exit."
exit 1
}

# Check if the first parameter is '-h' or '--help'
if [[ "$1" == "-h" || "$1" == "--help" ]]; then
display_help
fi

# Check for mandatory parameters
if [ "$#" -lt 4 ]; then
echo "Error: Incorrect number of arguments provided."
display_help
fi

MIMICIV_RAW_DIR="$1"
MIMICIV_PREMEDS_DIR="$2"
MIMICIV_MEDS_DIR="$3"
N_PARALLEL_WORKERS="$4"

shift 4

echo "Running pre-MEDS conversion."
./MIMIC-IV_Example/pre_MEDS.py raw_cohort_dir="$MIMICIV_RAW_DIR" output_dir="$MIMICIV_PREMEDS_DIR"

echo "Running shard_events.py with $N_PARALLEL_WORKERS workers in parallel"
./scripts/extraction/shard_events.py \
--multirun \
worker="range(0,$N_PARALLEL_WORKERS)" \
hydra/launcher=joblib \
input_dir="$MIMICIV_PREMEDS_DIR" \
cohort_dir="$MIMICIV_MEDS_DIR" \
event_conversion_config_fp=./MIMIC-IV_Example/configs/event_configs.yaml "$@"

echo "Splitting patients in serial"
./scripts/extraction/split_and_shard_patients.py \
input_dir="$MIMICIV_PREMEDS_DIR" \
cohort_dir="$MIMICIV_MEDS_DIR" \
event_conversion_config_fp=./MIMIC-IV_Example/configs/event_configs.yaml "$@"

echo "Converting to sharded events with $N_PARALLEL_WORKERS workers in parallel"
./scripts/extraction/convert_to_sharded_events.py \
--multirun \
worker="range(0,$N_PARALLEL_WORKERS)" \
hydra/launcher=joblib \
input_dir="$MIMICIV_PREMEDS_DIR" \
cohort_dir="$MIMICIV_MEDS_DIR" \
event_conversion_config_fp=./MIMIC-IV_Example/configs/event_configs.yaml "$@"

echo "Merging to a MEDS cohort with $N_PARALLEL_WORKERS workers in parallel"
./scripts/extraction/merge_to_MEDS_cohort.py \
--multirun \
worker="range(0,$N_PARALLEL_WORKERS)" \
hydra/launcher=joblib \
input_dir="$MIMICIV_PREMEDS_DIR" \
cohort_dir="$MIMICIV_MEDS_DIR" \
event_conversion_config_fp=./MIMIC-IV_Example/configs/event_configs.yaml "$@"
112 changes: 112 additions & 0 deletions MIMIC-IV_Example/joint_script_slurm.sh
@@ -0,0 +1,112 @@
#!/usr/bin/env bash

# This makes the script fail if any internal script fails
set -e

# Function to display help message
function display_help() {
echo "Usage: $0 <MIMICIV_RAW_DIR> <MIMICIV_PREMEDS_DIR> <MIMICIV_MEDS_DIR> <N_PARALLEL_WORKERS>"
echo
echo "This script processes MIMIC-IV data through several steps, handling raw data conversion,"
echo "sharding events, splitting patients, converting to sharded events, and merging into a MEDS cohort."
echo "This script uses slurm to process the data in parallel via the 'submitit' Hydra launcher."
echo
echo "Arguments:"
echo " MIMICIV_RAW_DIR Directory containing raw MIMIC-IV data files."
echo " MIMICIV_PREMEDS_DIR Output directory for pre-MEDS data."
echo " MIMICIV_MEDS_DIR Output directory for processed MEDS data."
echo " N_PARALLEL_WORKERS Number of parallel workers for processing."
echo
echo "Options:"
echo " -h, --help Display this help message and exit."
exit 1
}

# Check if the first parameter is '-h' or '--help'
if [[ "$1" == "-h" || "$1" == "--help" ]]; then
display_help
fi

# Check for mandatory parameters
if [ "$#" -ne 4 ]; then
echo "Error: Incorrect number of arguments provided."
display_help
fi

export MIMICIV_RAW_DIR="$1"
export MIMICIV_PREMEDS_DIR="$2"
export MIMICIV_MEDS_DIR="$3"
export N_PARALLEL_WORKERS="$4"

shift 4

# Note we use `--multirun` throughout to ensure the submitit launcher is used for every stage, so that
# nothing falls back to running locally in a setting where only slurm worker nodes have
# sufficient computational resources to run the actual jobs.

# echo "Running pre-MEDS conversion on one worker."
# ./MIMIC-IV_Example/pre_MEDS.py \
# --multirun \
# worker="range(0,1)" \
# hydra/launcher=submitit_slurm \
# hydra.launcher.timeout_min=60 \
# hydra.launcher.cpus_per_task=10 \
# hydra.launcher.mem_gb=50 \
# hydra.launcher.partition="short" \
# raw_cohort_dir="$MIMICIV_RAW_DIR" \
# output_dir="$MIMICIV_PREMEDS_DIR"

echo "Trying submitit launching with $N_PARALLEL_WORKERS jobs."

./scripts/extraction/shard_events.py \
--multirun \
worker="range(0,$N_PARALLEL_WORKERS)" \
hydra/launcher=submitit_slurm \
hydra.launcher.timeout_min=60 \
hydra.launcher.cpus_per_task=10 \
hydra.launcher.mem_gb=50 \
hydra.launcher.partition="short" \
"hydra.job.env_copy=[PATH]" \
input_dir="$MIMICIV_PREMEDS_DIR" \
cohort_dir="$MIMICIV_MEDS_DIR" \
event_conversion_config_fp=./MIMIC-IV_Example/configs/event_configs.yaml \
stage=shard_events

# echo "Splitting patients on one worker"
# ./scripts/extraction/split_and_shard_patients.py \
# --multirun \
# worker="range(0,1)" \
# hydra/launcher=submitit_slurm \
# hydra.launcher.timeout_min=60 \
# hydra.launcher.cpus_per_task=10 \
# hydra.launcher.mem_gb=50 \
# hydra.launcher.partition="short" \
# input_dir="$MIMICIV_PREMEDS_DIR" \
# cohort_dir="$MIMICIV_MEDS_DIR" \
# event_conversion_config_fp=./MIMIC-IV_Example/configs/event_configs.yaml "$@"
#
# echo "Converting to sharded events with $N_PARALLEL_WORKERS workers in parallel"
# ./scripts/extraction/convert_to_sharded_events.py \
# --multirun \
# worker="range(0,$N_PARALLEL_WORKERS)" \
# hydra/launcher=submitit_slurm \
# hydra.launcher.timeout_min=60 \
# hydra.launcher.cpus_per_task=10 \
# hydra.launcher.mem_gb=50 \
# hydra.launcher.partition="short" \
# input_dir="$MIMICIV_PREMEDS_DIR" \
# cohort_dir="$MIMICIV_MEDS_DIR" \
# event_conversion_config_fp=./MIMIC-IV_Example/configs/event_configs.yaml "$@"
#
# echo "Merging to a MEDS cohort with $N_PARALLEL_WORKERS workers in parallel"
# ./scripts/extraction/merge_to_MEDS_cohort.py \
# --multirun \
# worker="range(0,$N_PARALLEL_WORKERS)" \
# hydra/launcher=submitit_slurm \
# hydra.launcher.timeout_min=60 \
# hydra.launcher.cpus_per_task=10 \
# hydra.launcher.mem_gb=50 \
# hydra.launcher.partition="short" \
# input_dir="$MIMICIV_PREMEDS_DIR" \
# cohort_dir="$MIMICIV_MEDS_DIR" \
# event_conversion_config_fp=./MIMIC-IV_Example/configs/event_configs.yaml "$@"
13 changes: 11 additions & 2 deletions MIMIC-IV_Example/pre_MEDS.py
@@ -59,7 +59,7 @@ def fix_static_data(raw_static_df: pl.LazyFrame, death_times_df: pl.LazyFrame) -

return raw_static_df.join(death_times_df, on="subject_id", how="left").select(
"subject_id",
pl.coalesce(pl.col("deathtime"), pl.col("dod")).alias("dod"),
(pl.col("anchor_year") - pl.col("anchor_age")).cast(str).alias("year_of_birth"),
"gender",
)
@@ -94,18 +94,27 @@ def main(cfg: DictConfig):
pfx = get_shard_prefix(raw_cohort_dir, in_fp)

out_fp = MEDS_input_dir / in_fp.relative_to(raw_cohort_dir)

if out_fp.is_file():
print(f"Done with {pfx}. Continuing")
continue

out_fp.parent.mkdir(parents=True, exist_ok=True)

if pfx not in FUNCTIONS:
logger.info(
f"No function needed for {pfx}: "
f"Symlinking {str(in_fp.resolve())} to {str(out_fp.resolve())}"
)
relative_in_fp = in_fp.relative_to(out_fp.resolve().parent, walk_up=True)
out_fp.symlink_to(relative_in_fp)
continue
else:
out_fp = MEDS_input_dir / f"{pfx}.parquet"
if out_fp.is_file():
print(f"Done with {pfx}. Continuing")
continue

fn, need_df = FUNCTIONS[pfx]
if not need_df:
st = datetime.now()
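The symlink change above resolves `out_fp` before taking its parent, so the relative target is computed against the link's real location. The same walk-up computation can be sketched with `os.path.relpath` (equivalent here to `Path.relative_to(..., walk_up=True)`, which requires Python 3.12) using hypothetical paths, not the real MIMIC layout:

```python
import os
from pathlib import Path

# Hypothetical stand-ins for in_fp and out_fp in the loop above.
in_fp = Path("/data/raw/hosp/admissions.csv.gz")
out_fp = Path("/data/pre_meds/hosp/admissions.csv.gz")

# Walk up from the symlink's own directory to the common ancestor, then back
# down to the target; this is a pure path computation, no filesystem access.
relative_in_fp = Path(os.path.relpath(in_fp, out_fp.parent))
print(relative_in_fp.as_posix())  # ../../raw/hosp/admissions.csv.gz
```

A relative target keeps the symlink valid if the whole data root is later moved or mounted elsewhere.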
21 changes: 21 additions & 0 deletions MIMIC-IV_Example/sbatch_joint_script.sh
@@ -0,0 +1,21 @@
#!/usr/bin/env bash
#SBATCH -c 10 # Request ten cores
#SBATCH -t 0-03:00 # Runtime in D-HH:MM format
#SBATCH -p short # Partition to run in
#SBATCH --mem=300GB # Memory total (for all cores)
#SBATCH -o MIMIC_IV_MEDS_%j_sbatch.out # File to which STDOUT will be written, including job ID (%j)
#SBATCH -e MIMIC_IV_MEDS_%j_sbatch.err # File to which STDERR will be written, including job ID (%j)

cd /n/data1/hms/dbmi/zaklab/mmd/MEDS_polars_functions || exit

MIMICIV_MEDS_DIR="$3"

LOG_DIR="$MIMICIV_MEDS_DIR/.logs"

echo "Running with saving to $LOG_DIR"

mkdir -p "$LOG_DIR"

PATH="/home/mbm47/.conda/envs/MEDS_pipelines/bin:$PATH" \
time mprof run --include-children --exit-code --output "$LOG_DIR/mprofile.dat" \
./MIMIC-IV_Example/joint_script.sh "$@" 2> "$LOG_DIR/timings.txt"
