Skip to content

Commit

Permalink
Corrected a small parallelism issue in reshard.
Browse files Browse the repository at this point in the history
  • Loading branch information
mmcdermott committed Aug 10, 2024
1 parent 5e47f0c commit e0d1ecb
Showing 1 changed file with 10 additions and 8 deletions.
18 changes: 10 additions & 8 deletions src/MEDS_transforms/reshard_to_split.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"""Utilities for re-sharding a MEDS cohort to subsharded splits."""

import json
import time
from collections import defaultdict
from pathlib import Path

Expand Down Expand Up @@ -96,21 +97,22 @@ def main(cfg: DictConfig):

logger.info("Merging sub-shards")
for subshard_name, subshard_dir in new_shards_iter:
in_dir = subshard_dir
in_fps = subshard_fps[subshard_name]
out_fp = subshard_dir.with_suffix(".parquet")
if out_fp.is_file():
if cfg.do_overwrite:
logger.warning(f"Overwriting {str(out_fp.resolve())}")
out_fp.unlink()
else:
raise FileExistsError(f"{str(out_fp.resolve())} already exists.")

logger.info(f"Merging {subshard_dir}/**/*.parquet into {str(out_fp.resolve())}")

if not subshard_fps:
raise ValueError(f"No subshards found for {subshard_name}!")

in_dir = subshard_dir
in_fps = subshard_fps[subshard_name]
if out_fp.is_file():
logger.info(f"Output file {str(out_fp.resolve())} already exists. Skipping.")
continue

while not (all(fp.is_file() for fp in in_fps) or out_fp.is_file()):
logger.info("Waiting to begin merging for all sub-shard files to be written...")
time.sleep(cfg.polling_time)

def read_fn(fp: Path) -> pl.LazyFrame:
return pl.concat([pl.scan_parquet(fp, glob=False) for fp in in_fps], how="diagonal_relaxed").sort(
Expand Down

0 comments on commit e0d1ecb

Please sign in to comment.