Skip to content

Commit

Permalink
Added a fix for #182
Browse files Browse the repository at this point in the history
  • Loading branch information
mmcdermott committed Aug 30, 2024
1 parent 942613a commit 6fa26dc
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 19 deletions.
12 changes: 6 additions & 6 deletions MIMIC-IV_Example/slurm_runner.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ parallelize:
shard_events:
parallelize:
launcher_params:
timeout_min: 60
timeout_min: 50
cpus_per_task: 10
mem_gb: 40
partition: "short"
Expand All @@ -22,25 +22,25 @@ split_and_shard_subjects:
convert_to_sharded_events:
parallelize:
launcher_params:
timeout_min: 60
timeout_min: 10
cpus_per_task: 10
mem_gb: 25
partition: "short"

merge_to_MEDS_cohort:
parallelize:
launcher_params:
timeout_min: 60
timeout_min: 15
cpus_per_task: 10
mem_gb: 25
mem_gb: 85
partition: "short"

extract_code_metadata:
parallelize:
launcher_params:
timeout_min: 60
timeout_min: 10
cpus_per_task: 10
mem_gb: 10
mem_gb: 25
partition: "short"

finalize_MEDS_metadata:
Expand Down
1 change: 1 addition & 0 deletions src/MEDS_transforms/configs/_extract.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defaults:
- stage_configs:
- shard_events
- split_and_shard_subjects
- convert_to_sharded_events
- merge_to_MEDS_cohort
- extract_code_metadata
- finalize_MEDS_metadata
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
convert_to_sharded_events:
do_dedup_text_and_numeric: True
49 changes: 36 additions & 13 deletions src/MEDS_transforms/extract/convert_to_sharded_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ def get_code_expr(code_field: str | list | ListConfig) -> tuple[pl.Expr, pl.Expr
return code_expr, code_null_filter_expr, needed_cols


def extract_event(df: pl.LazyFrame, event_cfg: dict[str, str | None]) -> pl.LazyFrame:
def extract_event(
df: pl.LazyFrame, event_cfg: dict[str, str | None], do_dedup_text_and_numeric: bool = False,
) -> pl.LazyFrame:
"""Extracts a single event dataframe from the raw data.
Args:
Expand Down Expand Up @@ -123,6 +125,8 @@ def extract_event(df: pl.LazyFrame, event_cfg: dict[str, str | None]) -> pl.Lazy
possible, these additional columns should conform to the conventions of the MEDS data schema ---
e.g., primary numeric values associated with the event should be named `"numeric_value"` in
the output MEDS data (and thus have the key `"numeric_value"` in the `event_cfg` dictionary).
do_dedup_text_and_numeric: If true, the result will ensure that the `text_value` column is dropped if
it is simply a string version of the `numeric_value` column.
Returns:
A DataFrame containing the event data extracted from the raw data, containing only unique rows across
Expand Down Expand Up @@ -150,25 +154,27 @@ def extract_event(df: pl.LazyFrame, event_cfg: dict[str, str | None]) -> pl.Lazy
... "code_modifier": ["1", "2", "3", "4"],
... "time": ["2021-01-01", "2021-01-02", "2021-01-03", "2021-01-04"],
... "numeric_value": [1, 2, 3, 4],
... "woo_text": ["1", "2", "3/10", "4.24"],
... })
>>> event_cfg = {
... "code": ["FOO", "col(code)", "col(code_modifier)"],
... "time": "col(time)",
... "time_format": "%Y-%m-%d",
... "numeric_value": "numeric_value",
... "text_value": "woo_text",
... }
>>> extract_event(raw_data, event_cfg)
>>> extract_event(raw_data, event_cfg, do_dedup_text_and_numeric=True)
shape: (4, 4)
┌────────────┬───────────┬─────────────────────┬───────────────┐
│ subject_id ┆ code ┆ time ┆ numeric_value │
│ --- ┆ --- ┆ --- ┆ --- │
│ i64 ┆ str ┆ datetime[μs] ┆ i64 │
╞════════════╪═══════════╪═════════════════════╪═══════════════╡
│ 1 ┆ FOO//A//1 ┆ 2021-01-01 00:00:00 ┆ 1 │
│ 1 ┆ FOO//B//2 ┆ 2021-01-02 00:00:00 ┆ 2 │
│ 2 ┆ FOO//C//3 ┆ 2021-01-03 00:00:00 ┆ 3 │
│ 2 ┆ FOO//D//4 ┆ 2021-01-04 00:00:00 ┆ 4 │
└────────────┴───────────┴─────────────────────┴───────────────┘
┌────────────┬───────────┬─────────────────────┬───────────────┬────────────
│ subject_id ┆ code ┆ time ┆ numeric_value ┆ text_value
│ --- ┆ --- ┆ --- ┆ --- ┆ ---
│ i64 ┆ str ┆ datetime[μs] ┆ i64 ┆ str
╞════════════╪═══════════╪═════════════════════╪═══════════════╪════════════
│ 1 ┆ FOO//A//1 ┆ 2021-01-01 00:00:00 ┆ 1 ┆ null
│ 1 ┆ FOO//B//2 ┆ 2021-01-02 00:00:00 ┆ 2 ┆ null
│ 2 ┆ FOO//C//3 ┆ 2021-01-03 00:00:00 ┆ 3 ┆ 3/10
│ 2 ┆ FOO//D//4 ┆ 2021-01-04 00:00:00 ┆ 4 ┆ 4.24
└────────────┴───────────┴─────────────────────┴───────────────┴────────────
>>> data_with_nulls = pl.DataFrame({
... "subject_id": [1, 1, 2, 2],
... "code": ["A", None, "C", "D"],
Expand Down Expand Up @@ -484,6 +490,20 @@ def extract_event(df: pl.LazyFrame, event_cfg: dict[str, str | None]) -> pl.Lazy

event_exprs[k] = col

has_numeric = "numeric_value" in event_exprs
has_text = "text_value" in event_exprs

if do_dedup_text_and_numeric and has_numeric and has_text:
text_expr = event_exprs["text_value"]
num_expr = event_exprs["numeric_value"]
event_exprs["text_value"] = (
pl.when(text_expr.cast(pl.Float32, strict=False) == num_expr.cast(pl.Float32))
.then(pl.lit(None, pl.String))
.otherwise(text_expr)
)

if "numeric_value" in event_exprs and "text_value" in event_exprs:

if code_null_filter_expr is not None:
logger.info(f"Filtering out rows with null codes via {code_null_filter_expr}")
df = df.filter(code_null_filter_expr)
Expand Down Expand Up @@ -656,7 +676,9 @@ def convert_to_events(
for event_name, event_cfg in event_cfgs.items():
try:
logger.info(f"Building computational graph for extracting {event_name}")
event_dfs.append(extract_event(df, event_cfg))
event_dfs.append(extract_event(
df, event_cfg, do_dedup_text_and_numeric=do_dedup_text_and_numeric,
))
except Exception as e:
raise ValueError(f"Error extracting event {event_name}: {e}") from e

Expand Down Expand Up @@ -731,6 +753,7 @@ def compute_fn(df: pl.LazyFrame) -> pl.LazyFrame:
return convert_to_events(
df.filter(pl.col("subject_id").is_in(typed_subjects)),
event_cfgs=copy.deepcopy(event_cfgs),
do_dedup_text_and_numeric=cfg.stage_cfg.get("do_dedup_text_and_numeric", False),
)
except Exception as e:
raise ValueError(
Expand Down

0 comments on commit 6fa26dc

Please sign in to comment.