Migrate from drmaa2 to Slurm
VictoriaBeilsten-Edmands committed Aug 31, 2023
1 parent 70b0eea commit f78ed2d
Showing 19 changed files with 1,221 additions and 355 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
@@ -16,7 +16,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.9", "3.10"]
python-version: ["3.10"]

steps:
- uses: actions/checkout@v2
57 changes: 38 additions & 19 deletions ParProcCo/job_controller.py
@@ -15,17 +15,20 @@
class JobController:
def __init__(
self,
url: str,
program_wrapper: ProgramWrapper,
output_dir_or_file: Path,
project: str,
queue: str,
cluster_resources: Optional[dict[str, str]] = None,
version: str = "v0.0.38",
user_name: Optional[str] = None,
user_token: Optional[str] = None,
timeout: timedelta = timedelta(hours=2),
):
"""JobController is used to coordinate cluster job submissions with JobScheduler"""
self.url = url
self.program_wrapper = program_wrapper
self.output_file: Optional[Path] = None
self.cluster_output_dir: Optional[Path] = None

if output_dir_or_file is not None:
logging.debug("JC output: %s", output_dir_or_file)
if output_dir_or_file.is_dir():
@@ -34,25 +34,35 @@ def __init__(
output_dir = output_dir_or_file.parent
self.output_file = output_dir_or_file
self.cluster_output_dir = check_location(output_dir)
logging.debug("JC cluster output: %s; file %s", self.cluster_output_dir, self.output_file)
logging.debug(
"JC cluster output: %s; file %s",
self.cluster_output_dir,
self.output_file,
)
try:
self.working_directory: Optional[Path] = check_location(os.getcwd())
except Exception:
logging.warning(
f"Could not use %s as working directory on cluster so using %s", os.getcwd(), self.cluster_output_dir
"Could not use %s as working directory on cluster so using %s",
os.getcwd(),
self.cluster_output_dir,
)
self.working_directory = self.cluster_output_dir
logging.debug("JC working dir: %s", self.working_directory)
self.data_slicer: SlicerInterface
self.project = project
self.queue = queue
self.cluster_resources = cluster_resources
self.version = version
self.user_name = user_name if user_name else os.environ["USER"]
self.user_token = user_token if user_token else os.environ["SLURM_JWT"]
self.timeout = timeout
self.sliced_results: Optional[List[Path]] = None
self.aggregated_result: Optional[Path] = None

def run(
self, number_jobs: int, jobscript_args: Optional[List] = None, memory: str = "4G", job_name: str = "ParProcCo"
self,
number_jobs: int,
jobscript_args: Optional[List] = None,
memory: str = 4000,
job_name: str = "ParProcCo",
) -> None:
self.cluster_runner = check_location(get_absolute_path(self.program_wrapper.get_cluster_runner_script()))
self.cluster_env = self.program_wrapper.get_environment()
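The new user_name and user_token parameters default to the USER and SLURM_JWT environment variables when not supplied. Below is a minimal sketch of how a caller might populate SLURM_JWT before constructing JobController, assuming a cluster with Slurm's JWT authentication enabled; the lifespan value is an arbitrary example, not something this commit sets.

    import os
    import subprocess

    # Illustrative only: on clusters configured with auth/jwt, `scontrol token`
    # prints a line of the form "SLURM_JWT=<token>".
    token_line = subprocess.run(
        ["scontrol", "token", "lifespan=7200"],
        capture_output=True,
        text=True,
        check=True,
    ).stdout.strip()
    os.environ["SLURM_JWT"] = token_line.split("=", 1)[1]
    # USER is normally set by the login shell; JobController falls back to it
    # when no user_name argument is given.
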
@@ -76,10 +76,14 @@ def run(
f"Sliced jobs failed with slice_params: {slice_params}, jobscript_args: {jobscript_args},"
f" memory: {memory}, job_name: {job_name}"
)
raise RuntimeError(f"Sliced jobs failed\n")
raise RuntimeError("Sliced jobs failed\n")

def _run_sliced_jobs(
self, slice_params: List[Optional[slice]], jobscript_args: Optional[List], memory: str, job_name: str
self,
slice_params: List[Optional[slice]],
jobscript_args: Optional[List],
memory: str,
job_name: str,
) -> bool:
if jobscript_args is None:
jobscript_args = []
@@ -88,12 +88,13 @@ def _run_sliced_jobs(
processing_mode.set_parameters(slice_params)

job_scheduler = JobScheduler(
self.url,
self.working_directory,
self.cluster_output_dir,
self.project,
self.queue,
self.cluster_resources,
self.timeout,
self.version,
self.user_name,
self.user_token,
)
sliced_jobs_success = job_scheduler.run(
processing_mode,
@@ -125,12 +125,13 @@ def _run_aggregation_job(self, memory: str) -> None:
aggregation_args.append(aggregator_path)

aggregation_scheduler = JobScheduler(
self.url,
self.working_directory,
self.cluster_output_dir,
self.project,
self.queue,
self.cluster_resources,
self.timeout,
self.version,
self.user_name,
self.user_token,
)
aggregation_success = aggregation_scheduler.run(
aggregating_mode,
@@ -141,7 +141,6 @@ def _run_aggregation_job(self, memory: str) -> None:
aggregation_args,
aggregating_mode.__class__.__name__,
)

if not aggregation_success:
aggregation_scheduler.rerun_killed_jobs(allow_all_failed=True)

@@ -152,6 +152,7 @@ def _run_aggregation_job(self, memory: str) -> None:
else:
logging.warning(
f"Aggregated job was unsuccessful with aggregating_mode: {aggregating_mode},"
f" cluster_runner: {self.cluster_runner}, cluster_env: {self.cluster_env}, aggregator_path: {aggregator_path}, aggregation_args: {aggregation_args}"
f" cluster_runner: {self.cluster_runner}, cluster_env: {self.cluster_env},"
f" aggregator_path: {aggregator_path}, aggregation_args: {aggregation_args}"
)
self.aggregated_result = None
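
Taken together, the changes to this file point at JobController submitting through the Slurm REST API rather than drmaa2: the constructor gains url, version, user_name and user_token, and JobScheduler now appears to receive those in place of the project, queue and cluster_resources arguments. Below is a rough usage sketch under that reading; the endpoint, wrapper and paths are placeholders rather than values from the commit.

    from datetime import timedelta
    from pathlib import Path

    from ParProcCo.job_controller import JobController
    from ParProcCo.program_wrapper import ProgramWrapper  # import path assumed

    # Building a ProgramWrapper is application-specific; this placeholder
    # stands in for whatever wrapper the calling package provides.
    wrapper: ProgramWrapper = ...

    controller = JobController(
        url="https://slurm-rest.example.org:6820",  # Slurm REST endpoint (placeholder)
        program_wrapper=wrapper,
        output_dir_or_file=Path("/scratch/processed"),
        version="v0.0.38",       # REST API version, matching the new default
        user_name=None,          # falls back to $USER
        user_token=None,         # falls back to $SLURM_JWT
        timeout=timedelta(hours=2),
    )
    # memory now defaults to 4000, replacing the old "4G" string default.
    controller.run(number_jobs=4, jobscript_args=[], memory=4000, job_name="ParProcCo")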