Merge pull request #71 from mmcdermott/fix_MIMIC_IV
Update the MIMIC-IV example to use the updated interface and installable options.
mmcdermott authored Jul 26, 2024
2 parents 853c2bc + 60b1fb8 commit 9cd2d92
Showing 8 changed files with 123 additions and 59 deletions.
18 changes: 18 additions & 0 deletions MIMIC-IV_Example/README.md
@@ -26,6 +26,9 @@ up from this one).
- [ ] Debug and remove rows with null codes! (there are a lot of them)
- [ ] Detailed validation

Note: If you use a Slurm system and launch the Hydra submitit jobs from an interactive Slurm node, you
may need to run `unset SLURM_CPU_BIND` in your terminal first to avoid errors.
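
For example, a minimal sketch (the invocation and its arguments are illustrative; see the Slurm script later in this diff):

```bash
# Clear the CPU-bind setting inherited from the interactive allocation,
# then launch the submitit-based pipeline (hypothetical invocation).
unset SLURM_CPU_BIND
./MIMIC-IV_Example/joint_script_slurm.sh \
    "$MIMICIV_RAW_DIR" "$MIMICIV_PREMEDS_DIR" "$MIMICIV_MEDS_DIR" 4
```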

## Step 0: Installation

Download this repository and install the requirements:
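
A minimal installation sketch, assuming a standard `pip`-based setup (the repository URL and PyPI package name are assumptions based on this commit's mention of installable options; check the repository README):

```bash
# Assumed installation steps; verify against the repository README.
git clone https://github.com/mmcdermott/MEDS_transforms.git
cd MEDS_transforms
pip install .  # or, if published on PyPI: pip install MEDS-transforms
```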
@@ -45,6 +48,21 @@ that page. You will need the raw `.csv.gz` files for this example. We will use
`$MIMICIV_RAW_DIR` to denote the root directory where the resulting _core data files_ are stored -- e.g.,
there should be `hosp` and `icu` subdirectories of `$MIMICIV_RAW_DIR`.
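
A quick sanity check of the expected layout (the export path is illustrative):

```bash
export MIMICIV_RAW_DIR=/path/to/mimiciv  # illustrative; point at your own download
ls "$MIMICIV_RAW_DIR"                    # should list the hosp/ and icu/ subdirectories
```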

## Step 1.5: Download MIMIC-IV Metadata files
```bash
cd $MIMICIV_RAW_DIR
wget https://raw.githubusercontent.com/MIT-LCP/mimic-code/v2.4.0/mimic-iv/concepts/concept_map/d_labitems_to_loinc.csv
wget https://raw.githubusercontent.com/MIT-LCP/mimic-code/v2.4.0/mimic-iv/concepts/concept_map/inputevents_to_rxnorm.csv
wget https://raw.githubusercontent.com/MIT-LCP/mimic-code/v2.4.0/mimic-iv/concepts/concept_map/lab_itemid_to_loinc.csv
wget https://raw.githubusercontent.com/MIT-LCP/mimic-code/v2.4.0/mimic-iv/concepts/concept_map/meas_chartevents_main.csv
wget https://raw.githubusercontent.com/MIT-LCP/mimic-code/v2.4.0/mimic-iv/concepts/concept_map/meas_chartevents_value.csv
wget https://raw.githubusercontent.com/MIT-LCP/mimic-code/v2.4.0/mimic-iv/concepts/concept_map/numerics-summary.csv
wget https://raw.githubusercontent.com/MIT-LCP/mimic-code/v2.4.0/mimic-iv/concepts/concept_map/outputevents_to_loinc.csv
wget https://raw.githubusercontent.com/MIT-LCP/mimic-code/v2.4.0/mimic-iv/concepts/concept_map/proc_datetimeevents.csv
wget https://raw.githubusercontent.com/MIT-LCP/mimic-code/v2.4.0/mimic-iv/concepts/concept_map/proc_itemid.csv
wget https://raw.githubusercontent.com/MIT-LCP/mimic-code/v2.4.0/mimic-iv/concepts/concept_map/waveforms-summary.csv
```
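
Equivalently, a compact loop over the same concept-map files (identical URLs, just less repetition):

```bash
BASE=https://raw.githubusercontent.com/MIT-LCP/mimic-code/v2.4.0/mimic-iv/concepts/concept_map
for f in d_labitems_to_loinc inputevents_to_rxnorm lab_itemid_to_loinc \
         meas_chartevents_main meas_chartevents_value numerics-summary \
         outputevents_to_loinc proc_datetimeevents proc_itemid waveforms-summary; do
    wget "$BASE/$f.csv"
done
```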

## Step 2: Run the basic MEDS ETL

This step contains several sub-steps; luckily, all of these sub-steps can be run via a single script:
`joint_script.sh` (or `joint_script_slurm.sh` for Slurm clusters), both shown below.
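
A minimal sketch of that invocation (the positional-argument order is an assumption, inferred from the `shift 4` in the Slurm variant of the script):

```bash
# Hypothetical invocation; the final argument is the parallel worker count.
./MIMIC-IV_Example/joint_script.sh \
    "$MIMICIV_RAW_DIR" "$MIMICIV_PREMEDS_DIR" "$MIMICIV_MEDS_DIR" 4
```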
44 changes: 24 additions & 20 deletions MIMIC-IV_Example/configs/event_configs.yaml
@@ -46,7 +46,7 @@ hosp/diagnoses_icd:
timestamp: col(hadm_discharge_time)
timestamp_format: "%Y-%m-%d %H:%M:%S"
_metadata:
-d_icd_diagnoses:
+hosp/d_icd_diagnoses:
description: "long_title"
parent_code: "ICD{icd_version}CM/{icd_code}" # Single strings are templates of columns.

Expand Down Expand Up @@ -85,7 +85,7 @@ hosp/hcpcsevents:
timestamp_format: "%Y-%m-%d"
_metadata:
# These are not all CPT codes, unfortunately
-d_hcpcs:
+hosp/d_hcpcs:
description: "long_description"
possibly_cpt_code: "code"

@@ -167,7 +167,7 @@ hosp/procedures_icd:
timestamp: col(chartdate)
timestamp_format: "%Y-%m-%d"
_metadata:
-d_icd_procedures:
+hosp/d_icd_procedures:
description: "long_title"
parent_code: # List of objects are string labels mapping to filters to be evaluated.
- "ICD{icd_version}Proc/{icd_code}": { icd_version: 9 }
@@ -214,10 +214,11 @@ icu/chartevents:
hadm_id: hadm_id
icustay_id: stay_id
_metadata:
-meas_chartevents_main:
-  description: ["omop_concept_name", "label"] # List of strings are columns to be collated
-  itemid: "itemid (omop_source_code)"
-  parent_code: "{omop_vocabulary_id}/{omop_concept_code}"
+# TODO: make this work even with missing valueuom
+#meas_chartevents_main:
+#  description: ["omop_concept_name", "label"] # List of strings are columns to be collated
+#  itemid: "itemid (omop_source_code)"
+#  parent_code: "{omop_vocabulary_id}/{omop_concept_code}"
# TODO: I don't know if this is necessary...
d_labitems_to_loinc:
description: ["omop_concept_name", "label"] # List of strings are columns to be collated
@@ -276,12 +277,13 @@ icu/inputevents:
order_id: orderid
link_order_id: linkorderid
numerical_value: rate
-_metadata:
-  inputevents_to_rxnorm:
-    description: ["omop_concept_name", "label"] # List of strings are columns to be collated
-    itemid: "itemid (omop_source_code)"
-    parent_code: "{omop_vocabulary_id}/{omop_concept_code}"
-    rateuom: null # A null column means this column is needed in pulling from the metadata.
+#_metadata:
+# TODO: get this working with null columns
+#inputevents_to_rxnorm:
+#  description: ["omop_concept_name", "label"] # List of strings are columns to be collated
+#  itemid: "itemid (omop_source_code)"
+#  parent_code: "{omop_vocabulary_id}/{omop_concept_code}"
+#  rateuom: null # A null column means this column is needed in pulling from the metadata.
input_end:
code:
- INFUSION_END
@@ -296,11 +298,13 @@
order_id: orderid
link_order_id: linkorderid
numerical_value: amount
-_metadata:
-  inputevents_to_rxnorm:
-    description: ["omop_concept_name", "label"] # List of strings are columns to be collated
-    itemid: "itemid (omop_source_code)"
-    parent_code: "{omop_vocabulary_id}/{omop_concept_code}"
+#_metadata:
+#  inputevents_to_rxnorm:
+#    description: ["omop_concept_name", "label"] # List of strings are columns to be collated
+#    itemid: "itemid (omop_source_code)"
+#    parent_code: "{omop_vocabulary_id}/{omop_concept_code}"
+#    statusdescription: null
+#    amountuom: null
patient_weight:
code:
- PATIENT_WEIGHT_AT_INFUSION
@@ -321,8 +325,8 @@ icu/outputevents:
icustay_id: stay_id
numerical_value: value
_metadata:
-outputevents_to_rxnorm:
+outputevents_to_loinc:
   description: ["omop_concept_name", "label"] # List of strings are columns to be collated
   itemid: "itemid (omop_source_code)"
-  valueuom: "unitname"
   parent_code: "{omop_vocabulary_id}/{omop_concept_code}"
+  valueuom: unitname
32 changes: 28 additions & 4 deletions MIMIC-IV_Example/joint_script.sh
@@ -43,34 +43,58 @@ echo "Running pre-MEDS conversion."
./MIMIC-IV_Example/pre_MEDS.py raw_cohort_dir="$MIMICIV_RAW_DIR" output_dir="$MIMICIV_PREMEDS_DIR"

echo "Running shard_events.py with $N_PARALLEL_WORKERS workers in parallel"
-./scripts/extraction/shard_events.py \
+MEDS_extract-shard_events \
--multirun \
worker="range(0,$N_PARALLEL_WORKERS)" \
hydra/launcher=joblib \
input_dir="$MIMICIV_PREMEDS_DIR" \
cohort_dir="$MIMICIV_MEDS_DIR" \
stage="shard_events" \
stage_configs.shard_events.infer_schema_length=999999999 \
event_conversion_config_fp=./MIMIC-IV_Example/configs/event_configs.yaml "$@"

echo "Splitting patients in serial"
-./scripts/extraction/split_and_shard_patients.py \
+MEDS_extract-split_and_shard_patients \
input_dir="$MIMICIV_PREMEDS_DIR" \
cohort_dir="$MIMICIV_MEDS_DIR" \
stage="split_and_shard_patients" \
event_conversion_config_fp=./MIMIC-IV_Example/configs/event_configs.yaml "$@"

echo "Converting to sharded events with $N_PARALLEL_WORKERS workers in parallel"
-./scripts/extraction/convert_to_sharded_events.py \
+MEDS_extract-convert_to_sharded_events \
--multirun \
worker="range(0,$N_PARALLEL_WORKERS)" \
hydra/launcher=joblib \
input_dir="$MIMICIV_PREMEDS_DIR" \
cohort_dir="$MIMICIV_MEDS_DIR" \
stage="convert_to_sharded_events" \
event_conversion_config_fp=./MIMIC-IV_Example/configs/event_configs.yaml "$@"

echo "Merging to a MEDS cohort with $N_PARALLEL_WORKERS workers in parallel"
-./scripts/extraction/merge_to_MEDS_cohort.py \
+MEDS_extract-merge_to_MEDS_cohort \
--multirun \
worker="range(0,$N_PARALLEL_WORKERS)" \
hydra/launcher=joblib \
input_dir="$MIMICIV_PREMEDS_DIR" \
cohort_dir="$MIMICIV_MEDS_DIR" \
stage="merge_to_MEDS_cohort"
event_conversion_config_fp=./MIMIC-IV_Example/configs/event_configs.yaml "$@"

echo "Aggregating initial code stats with $N_PARALLEL_WORKERS workers in parallel"
MEDS_transform-aggregate_code_metadata \
--config-name="extract" \
--multirun \
worker="range(0,$N_PARALLEL_WORKERS)" \
hydra/launcher=joblib \
input_dir="$MIMICIV_PREMEDS_DIR" \
cohort_dir="$MIMICIV_MEDS_DIR" \
stage="aggregate_code_metadata"
event_conversion_config_fp=./MIMIC-IV_Example/configs/event_configs.yaml "$@"

# TODO -- make this the pre-meds dir and have the pre-meds script symlink
echo "Collecting code metadata with $N_PARALLEL_WORKERS workers in parallel"
MEDS_extract-extract_code_metadata \
input_dir="$MIMICIV_RAW_DIR" \
cohort_dir="$MIMICIV_MEDS_DIR" \
stage="extract_code_metadata" \
event_conversion_config_fp=./MIMIC-IV_Example/configs/event_configs.yaml "$@"
39 changes: 34 additions & 5 deletions MIMIC-IV_Example/joint_script_slurm.sh
@@ -47,7 +47,7 @@ shift 4
echo "Running pre-MEDS conversion on one worker."
./MIMIC-IV_Example/pre_MEDS.py \
--multirun \
worker="range(0,1)" \
+worker="range(0,1)" \
hydra/launcher=submitit_slurm \
hydra.launcher.timeout_min=60 \
hydra.launcher.cpus_per_task=10 \
@@ -58,7 +58,7 @@ echo "Running pre-MEDS conversion on one worker."

echo "Trying submitit launching with $N_PARALLEL_WORKERS jobs."

-./scripts/extraction/shard_events.py \
+MEDS_extract-shard_events \
--multirun \
worker="range(0,$N_PARALLEL_WORKERS)" \
hydra/launcher=submitit_slurm \
@@ -73,7 +73,7 @@ echo "Trying submitit launching with $N_PARALLEL_WORKERS jobs."
stage=shard_events

echo "Splitting patients on one worker"
-./scripts/extraction/split_and_shard_patients.py \
+MEDS_extract-split_and_shard_patients \
--multirun \
worker="range(0,1)" \
hydra/launcher=submitit_slurm \
@@ -86,7 +86,7 @@ echo "Splitting patients on one worker"
event_conversion_config_fp=./MIMIC-IV_Example/configs/event_configs.yaml "$@"

echo "Converting to sharded events with $N_PARALLEL_WORKERS workers in parallel"
-./scripts/extraction/convert_to_sharded_events.py \
+MEDS_extract-convert_to_sharded_events \
--multirun \
worker="range(0,$N_PARALLEL_WORKERS)" \
hydra/launcher=submitit_slurm \
@@ -99,7 +99,7 @@ echo "Converting to sharded events with $N_PARALLEL_WORKERS workers in parallel"
event_conversion_config_fp=./MIMIC-IV_Example/configs/event_configs.yaml "$@"

echo "Merging to a MEDS cohort with $N_PARALLEL_WORKERS workers in parallel"
-./scripts/extraction/merge_to_MEDS_cohort.py \
+MEDS_extract-merge_to_MEDS_cohort \
--multirun \
worker="range(0,$N_PARALLEL_WORKERS)" \
hydra/launcher=submitit_slurm \
@@ -110,3 +110,32 @@ echo "Merging to a MEDS cohort with $N_PARALLEL_WORKERS workers in parallel"
input_dir="$MIMICIV_PREMEDS_DIR" \
cohort_dir="$MIMICIV_MEDS_DIR" \
event_conversion_config_fp=./MIMIC-IV_Example/configs/event_configs.yaml "$@"

echo "Aggregating initial code stats with $N_PARALLEL_WORKERS workers in parallel"
MEDS_transform-aggregate_code_metadata \
--config-name="extract" \
--multirun \
worker="range(0,$N_PARALLEL_WORKERS)" \
hydra/launcher=submitit_slurm \
hydra.launcher.timeout_min=60 \
hydra.launcher.cpus_per_task=10 \
hydra.launcher.mem_gb=50 \
hydra.launcher.partition="short" \
input_dir="$MIMICIV_PREMEDS_DIR" \
cohort_dir="$MIMICIV_MEDS_DIR" \
stage="aggregate_code_metadata"
event_conversion_config_fp=./MIMIC-IV_Example/configs/event_configs.yaml "$@"

# TODO -- make this the pre-meds dir and have the pre-meds script symlink
echo "Collecting code metadata with $N_PARALLEL_WORKERS workers in parallel"
MEDS_extract-extract_code_metadata \
--multirun \
worker="range(0,$N_PARALLEL_WORKERS)" \
hydra/launcher=submitit_slurm \
hydra.launcher.timeout_min=60 \
hydra.launcher.cpus_per_task=10 \
hydra.launcher.mem_gb=50 \
hydra.launcher.partition="short" \
input_dir="$MIMICIV_RAW_DIR" \
cohort_dir="$MIMICIV_MEDS_DIR" \
event_conversion_config_fp=./MIMIC-IV_Example/configs/event_configs.yaml "$@"
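
Because each stage above forwards `"$@"`, extra Hydra overrides appended at the call site reach every stage. As a sketch (the value is illustrative; `polling_time` is the key consumed by the reducer in `extract_code_metadata.py` below):

```bash
# Forward an extra override to all stages (illustrative value); only keys the
# script does not already set can be overridden this way without a Hydra error.
./MIMIC-IV_Example/joint_script_slurm.sh \
    "$MIMICIV_RAW_DIR" "$MIMICIV_PREMEDS_DIR" "$MIMICIV_MEDS_DIR" 4 \
    polling_time=30
```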
21 changes: 0 additions & 21 deletions MIMIC-IV_Example/sbatch_joint_script.sh

This file was deleted.

18 changes: 14 additions & 4 deletions src/MEDS_transforms/extract/extract_code_metadata.py
@@ -161,7 +161,7 @@ def extract_metadata(
code_expr, _, needed_code_cols = get_code_expr(event_cfg.pop("code"))

columns = metadata_df.collect_schema().names()
-missing_cols = (needed_cols | needed_code_cols) - set(columns)
+missing_cols = (needed_cols | needed_code_cols) - set(columns) - set(final_cols)
if missing_cols:
raise KeyError(f"Columns {missing_cols} not found in metadata columns: {columns}")

@@ -241,6 +241,7 @@ def get_events_and_metadata_by_metadata_fp(event_configs: dict | DictConfig) ->
Examples:
>>> event_configs = {
... "patient_id_col": "MRN",
... "icu/procedureevents": {
... "patient_id_col": "subject_id",
... "start": {
@@ -293,7 +294,10 @@ def get_events_and_metadata_by_metadata_fp(event_configs: dict | DictConfig) ->

out = {}

-    for event_cfgs_for_pfx in event_configs.values():
+    for file_pfx, event_cfgs_for_pfx in event_configs.items():
+        if file_pfx == "patient_id_col":
+            continue

for event_key, event_cfg in event_cfgs_for_pfx.items():
if event_key == "patient_id_col":
continue
@@ -342,6 +346,8 @@ def main(cfg: DictConfig):
event_metadata_cfgs = copy.deepcopy(event_metadata_cfgs)

metadata_fp, read_fn = get_supported_fp(raw_input_dir, input_prefix)
if metadata_fp.suffix != ".parquet":
read_fn = partial(read_fn, infer_schema_length=999999999)
out_fp = partial_metadata_dir / f"{input_prefix}.parquet"
logger.info(f"Extracting metadata from {metadata_fp} and saving to {out_fp}")

Expand All @@ -358,8 +364,12 @@ def main(cfg: DictConfig):

logger.info("Starting reduction process")

-    while not all(fp.is_file() for fp in all_out_fps):
-        logger.info("Waiting to begin reduction for all files to be written...")
+    while not all(fp.exists() for fp in all_out_fps):
+        missing_files_str = "\n".join((f" - {str(fp.resolve())}" for fp in all_out_fps if not fp.exists()))
+        logger.info(
+            "Waiting to begin reduction for all files to be written...\n"
+            f"{missing_files_str}"
+        )
time.sleep(cfg.polling_time)

start = datetime.now()
2 changes: 1 addition & 1 deletion src/MEDS_transforms/extract/parser.py
@@ -296,7 +296,7 @@ def parse_col_expr(cfg: str | list | dict[str, str] | ListConfig | DictConfig) -
return {"str": cfg}
case str():
return {"col": cfg}
-        case list() | ListConfig() if all(isinstance(x, (str, dict)) for x in cfg):
+        case list() | ListConfig() if all(isinstance(x, (str, dict, DictConfig)) for x in cfg):
return [parse_col_expr(x) for x in cfg]
case list() | ListConfig():
raise ValueError(
8 changes: 4 additions & 4 deletions src/MEDS_transforms/extract/utils.py
@@ -116,8 +116,8 @@ def get_supported_fp(root_dir: Path, file_prefix: str | Path) -> tuple[Path, Cal
... get_supported_fp(tmpdir, "test") # doctest: +NORMALIZE_WHITESPACE
Traceback (most recent call last):
...
-    FileNotFoundError: No files found with prefix: test and allowed suffixes:
-    ['.parquet', '.csv.gz', '.csv']
+    FileNotFoundError: No files found with prefix: test and allowed suffixes
+    ['.parquet', '.csv.gz', '.csv']...
"""

for suffix in list(SupportedFileFormats):
@@ -126,6 +126,6 @@ def get_supported_fp(root_dir: Path, file_prefix: str | Path) -> tuple[Path, Cal
logger.debug(f"Found file: {str(fp.resolve())}")
return fp, READERS[suffix]
raise FileNotFoundError(
f"No files found with prefix: {file_prefix} and allowed suffixes: "
f"{[x.value for x in SupportedFileFormats]}"
f"No files found with prefix: {file_prefix} and allowed suffixes "
f"{[x.value for x in SupportedFileFormats]} in root dir {str(root_dir.resolve())}"
)
