Use job scheduling information to allow more flexible job submission (#77)

* initial commit figuring out the general shape of it

* a bit more progress

* fix typo

* `timestamp_ok` fix to work with JSI.start_time instead of a scheduler-wide one

* Add a timestamp value to JSI separate from the start time

* `JobController` updates

* Remove duplicate keyword argument

* Rename `SchedulerModeInterface` to `JobSlicerInterface`

* Various fixes

* `wait_all_jobs`: work correctly with per-job timeouts

* Check `job_scheduling_info.log_directory` exists and make it if not

* Add a terminate-after-total-wait-time limit to avoid waiting indefinitely for jobs stuck in a queue

* use `datetime` and `timedelta` for `StatusInfo`

* Set `time_to_dispatch` and `wall_time` with the right type

* `_wait_for_jobs` working (to some extent!)

* some progress with the job scheduler tests

* `get_output_paths`: get output paths correctly from JSIs

* `wait_for_ended`: use the correct lists of jobs

* `_report_job_info` check actual output file, as set in JSI

* `get_job_history_batch`: `batch_number=None` now returns latest batch

* use `JobSchedulingInformation.set_job_script_path`

* Add final `check_jobscript_is_readable` check before sending for submission

* Use consistent phrasing for testing simplicity

* `filter_killed_jobs`: correct typing

* `resubmit_jobs`: allow filtering by job_id

* progress with tests

* `test_job_scheduler_runs`: correct `create_js` args

* `resubmit_killed_jobs` and `get_job_history_batch`: Fixes for tests

* All tests working (resubmit tests adapted)

* Remove old comment

* Remove old variable from logging

* Remove `StatusInfo.output_path` in favour of using JSI, use stdout path to check state

* `fetch_and_update_state`: use regex to check `tres_alloc_str`

* String doesn't need to be f-string

* Replace leftover slots reference with cpus

Reformat long lines

* Set minimum Python version to 3.10, plus use Python 3.10+ style typing

* `resubmit_killed_jobs`: fix args (missing =)

* Remove unused imports

* add minimum versions to meta.yaml and environment.yaml

* Refactor job slicers

Add back methods in program wrapper and tweak job slicer implementations to
fix job controller tests, and fix job scheduler tests

* Update version number

* Fix some bugs and type annotations from mypy

Add back script names for some job slicers

* Fix bugs in memory argument handling and make token optional

* Fix logic in job scheduler waiting method

Fix wrong destination of output in pass-thru wrapper, and add more logging info

* Format imports and code

* Fix expected string in test

Add documentation on testing and rename test env var for slurm partition

* Fix up job ID check

Simplify output path in pass-thru wrapper, remove odd bits

* `get_job`: error messages needed to be f-strings

* Set fallback job script in job slicer to current working directory

---------

Co-authored-by: Peter Chang <peter.chang@diamond.ac.uk>
thomasmfish and PeterC-DLS authored Jan 31, 2024
1 parent d83439d commit 12c62e8
Showing 37 changed files with 1,775 additions and 1,344 deletions.
20 changes: 13 additions & 7 deletions .pydevproject
@@ -1,17 +1,23 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?eclipse-pydev version="1.0"?><pydev_project>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">davidia</pydev_property>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">Default</pydev_property>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python interpreter</pydev_property>
<pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH">
<path>/${PROJECT_DIR_NAME}</path>
</pydev_pathproperty>
</pydev_pathproperty>
</pydev_project>
2 changes: 1 addition & 1 deletion ParProcCo/__init__.py
@@ -1,2 +1,2 @@
version_info = (1, 1, 0)
version_info = (2, 0, 0)
__version__ = ".".join(str(c) for c in version_info)
3 changes: 1 addition & 2 deletions ParProcCo/aggregator_interface.py
@@ -1,10 +1,9 @@
from __future__ import annotations

from pathlib import Path
from typing import List


class AggregatorInterface:
def aggregate(self, aggregation_output_dir: Path, data_files: List[Path]) -> Path:
def aggregate(self, aggregation_output_dir: Path, data_files: list[Path]) -> Path:
"""Aggregates data from multiple output files into one"""
raise NotImplementedError
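
For context, a concrete aggregator subclasses this interface and writes a single combined output file. A minimal, hypothetical implementation is sketched below; the class name `ConcatAggregator` and its concatenate-text-files strategy are illustrative only and are not part of this change.

from __future__ import annotations

from pathlib import Path

from ParProcCo.aggregator_interface import AggregatorInterface


class ConcatAggregator(AggregatorInterface):
    """Hypothetical aggregator that concatenates sliced text outputs into one file."""

    def aggregate(self, aggregation_output_dir: Path, data_files: list[Path]) -> Path:
        aggregated_path = aggregation_output_dir / "aggregated_results.txt"
        with open(aggregated_path, "w") as output:
            for data_file in data_files:
                # Append each sliced job's output verbatim
                output.write(data_file.read_text())
        return aggregated_path
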
7 changes: 7 additions & 0 deletions ParProcCo/data_slicer_interface.py
@@ -0,0 +1,7 @@
from __future__ import annotations


class DataSlicerInterface:
def slice(self, number_jobs: int, stop: int | None = None) -> list[slice] | None:
"""Takes an input data file and returns a list of slice parameters."""
raise NotImplementedError
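
The new `DataSlicerInterface` (which replaces `SlicerInterface` in `JobController` below) returns a list of `slice` objects describing how the input is divided across jobs. As a purely illustrative sketch, not code from this change, a hypothetical round-robin slicer could implement it like this:

from __future__ import annotations

from ParProcCo.data_slicer_interface import DataSlicerInterface


class InterleavedSlicer(DataSlicerInterface):
    """Hypothetical slicer that deals indices out to jobs round-robin."""

    def slice(self, number_jobs: int, stop: int | None = None) -> list[slice] | None:
        if number_jobs < 1:
            return None
        # Job j processes indices j, j + number_jobs, j + 2 * number_jobs, ...
        return [slice(job, stop, number_jobs) for job in range(number_jobs)]
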
193 changes: 116 additions & 77 deletions ParProcCo/job_controller.py
@@ -2,14 +2,14 @@

import logging
import os
from datetime import timedelta
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Optional

from .data_slicer_interface import DataSlicerInterface
from .job_scheduler import JobScheduler
from .slicer_interface import SlicerInterface
from .utils import check_location, get_absolute_path
from .job_scheduling_information import JobResources, JobSchedulingInformation
from .program_wrapper import ProgramWrapper
from .utils import check_jobscript_is_readable, check_location, get_absolute_path

AGGREGATION_TIME = 60 # timeout per single file, in seconds

@@ -21,18 +21,17 @@ def __init__(
program_wrapper: ProgramWrapper,
output_dir_or_file: Path,
partition: str,
extra_properties: Optional[dict[str, str]] = None,
user_name: Optional[str] = None,
user_token: Optional[str] = None,
user_name: str | None = None,
user_token: str | None = None,
timeout: timedelta = timedelta(hours=2),
) -> None:
"""JobController is used to coordinate cluster job submissions with JobScheduler"""
"""JobController is used to coordinate cluster job submissions with
JobScheduler"""
self.url = url
self.program_wrapper = program_wrapper
self.partition = partition
self.extra_properties = extra_properties
self.output_file: Optional[Path] = None
self.cluster_output_dir: Optional[Path] = None
self.output_file: Path | None = None
self.cluster_output_dir: Path | None = None

if output_dir_or_file is not None:
logging.debug("JC output: %s", output_dir_or_file)
@@ -48,7 +47,7 @@ def __init__(
self.output_file,
)
try:
self.working_directory: Optional[Path] = check_location(os.getcwd())
self.working_directory: Path | None = check_location(os.getcwd())
except Exception:
logging.warning(
"Could not use %s as working directory on cluster so using %s",
@@ -57,39 +56,57 @@ def __init__(
)
self.working_directory = self.cluster_output_dir
logging.debug("JC working dir: %s", self.working_directory)
self.data_slicer: SlicerInterface
self.data_slicer: DataSlicerInterface
self.user_name = user_name
self.user_token = user_token
self.timeout = timeout
self.sliced_results: Optional[List[Path]] = None
self.aggregated_result: Optional[Path] = None
self.sliced_results: tuple[Path, ...] | None = None
self.aggregated_result: Path | None = None

def run(
self,
number_jobs: int,
jobscript_args: Optional[List] = None,
memory: int = 4000,
jobscript_args: list[str] | None = None,
job_name: str = "ParProcCo",
processing_job_resources: JobResources | None = None,
aggregation_job_resources: JobResources | None = None,
) -> None:
self.cluster_runner = check_location(
get_absolute_path(self.program_wrapper.get_cluster_runner_script())
)
self.cluster_runner = self.program_wrapper.get_process_script()
if self.cluster_runner is None:
raise ValueError("Processing script must be defined")
if processing_job_resources is None:
processing_job_resources = JobResources()
if aggregation_job_resources is None:
aggregation_job_resources = JobResources(
memory=processing_job_resources.memory,
cpu_cores=1,
extra_properties=processing_job_resources.extra_properties,
)
self.cluster_env = self.program_wrapper.get_environment()
logging.debug("Cluster environment is %s", self.cluster_env)
slice_params = self.program_wrapper.create_slices(number_jobs)

timestamp = datetime.now()
if jobscript_args:
jobscript_args[0] = str(
check_jobscript_is_readable(
check_location(get_absolute_path(jobscript_args[0]))
)
)
sliced_jobs_success = self._submit_sliced_jobs(
slice_params, jobscript_args, memory, job_name
number_jobs,
jobscript_args,
processing_job_resources,
job_name,
timestamp,
)

if sliced_jobs_success and self.sliced_results:
logging.info("Sliced jobs ran successfully.")
out_file: Path | None = None
if number_jobs == 1:
out_file = (
self.sliced_results[0] if len(self.sliced_results) > 0 else None
)
out_file = self.sliced_results[0] if self.sliced_results else None
else:
self._submit_aggregation_job(memory)
self._submit_aggregation_job(aggregation_job_resources, timestamp)
out_file = self.aggregated_result

if (
@@ -102,96 +119,118 @@ def run(
"Rename %s to %s: %s", out_file, renamed_file, renamed_file.exists()
)
else:
slice_params = self.program_wrapper.create_slices(number_jobs=number_jobs)
logging.error(
f"Sliced jobs failed with slice_params: {slice_params}, jobscript_args: {jobscript_args},"
f" memory: {memory}, job_name: {job_name}"
f"Sliced jobs failed with slice_params: {slice_params},"
f" jobscript_args: {jobscript_args}, job_name: {job_name}"
)
raise RuntimeError("Sliced jobs failed\n")

def _submit_sliced_jobs(
self,
slice_params: List[Optional[slice]],
jobscript_args: Optional[List],
memory: int,
number_of_jobs: int,
jobscript_args: list[str] | None,
job_resources: JobResources,
job_name: str,
timestamp: datetime,
) -> bool:
if jobscript_args is None:
jobscript_args = []

processing_mode = self.program_wrapper.processing_mode
processing_mode.set_parameters(slice_params)
assert self.cluster_runner
jsi = JobSchedulingInformation(
job_name=job_name,
job_script_path=self.cluster_runner,
job_resources=job_resources,
timeout=self.timeout,
job_script_arguments=tuple(jobscript_args),
working_directory=self.working_directory,
output_dir=self.output_file.parent if self.output_file else None,
output_filename=self.output_file.name if self.output_file else None,
log_directory=self.cluster_output_dir,
timestamp=timestamp,
)
jsi.set_job_env(self.cluster_env)

job_scheduler = JobScheduler(
self.url,
self.working_directory,
self.cluster_output_dir,
self.partition,
self.extra_properties,
self.timeout,
self.user_name,
self.user_token,
url=self.url,
partition=self.partition,
user_name=self.user_name,
user_token=self.user_token,
cluster_output_dir=self.cluster_output_dir,
)
sliced_jobs_success = job_scheduler.run(
processing_mode,
self.cluster_runner,
self.cluster_env,
memory,
processing_mode.cores,
jobscript_args,
job_name,

processing_jobs = self.program_wrapper.processing_slicer.create_slice_jobs(
jsi, self.program_wrapper.create_slices(number_jobs=number_of_jobs)
)

sliced_jobs_success = job_scheduler.run(processing_jobs)

if not sliced_jobs_success:
sliced_jobs_success = job_scheduler.resubmit_killed_jobs()

self.sliced_results = (
job_scheduler.get_output_paths() if sliced_jobs_success else None
job_scheduler.get_output_paths(processing_jobs)
if sliced_jobs_success
else None
)
return sliced_jobs_success

def _submit_aggregation_job(self, memory: int) -> None:
def _submit_aggregation_job(
self, job_resources: JobResources, timestamp: datetime
) -> None:
aggregator_path = self.program_wrapper.get_aggregate_script()
aggregating_mode = self.program_wrapper.aggregating_mode
if aggregating_mode is None or self.sliced_results is None:
aggregating_slicer = self.program_wrapper.aggregating_slicer
if aggregating_slicer is None or self.sliced_results is None:
return

aggregating_mode.set_parameters(self.sliced_results)

aggregation_args = []
if aggregator_path is not None:
aggregator_path = check_location(get_absolute_path(aggregator_path))
aggregation_args.append(aggregator_path)
aggregation_args.append(str(aggregator_path))

assert self.sliced_results is not None and self.cluster_runner
jsi = JobSchedulingInformation(
job_name=aggregating_slicer.__class__.__name__,
job_script_path=self.cluster_runner,
job_resources=job_resources,
job_script_arguments=tuple(aggregation_args),
working_directory=self.working_directory,
timeout=timedelta(seconds=AGGREGATION_TIME * len(self.sliced_results)),
output_dir=self.output_file.parent if self.output_file else None,
output_filename=self.output_file.name if self.output_file else None,
log_directory=self.cluster_output_dir,
timestamp=timestamp,
)
jsi.set_job_env(self.cluster_env)

aggregation_scheduler = JobScheduler(
self.url,
self.working_directory,
self.cluster_output_dir,
self.partition,
self.extra_properties,
timedelta(seconds=AGGREGATION_TIME * len(self.sliced_results)),
self.user_name,
self.user_token,
url=self.url,
partition=self.partition,
user_name=self.user_name,
user_token=self.user_token,
cluster_output_dir=self.cluster_output_dir,
)
aggregation_success = aggregation_scheduler.run(
aggregating_mode,
self.cluster_runner,
self.cluster_env,
memory,
aggregating_mode.cores,
aggregation_args,
aggregating_mode.__class__.__name__,

aggregation_jobs = aggregating_slicer.create_slice_jobs(
jsi, list(self.sliced_results)
)

aggregation_success = aggregation_scheduler.run(aggregation_jobs)

if not aggregation_success:
aggregation_scheduler.resubmit_killed_jobs(allow_all_failed=True)

if aggregation_success:
self.aggregated_result = aggregation_scheduler.get_output_paths()[0]
self.aggregated_result = aggregation_scheduler.get_output_paths(
aggregation_jobs
)[0]
for result in self.sliced_results:
os.remove(str(result))
else:
logging.warning(
f"Aggregated job was unsuccessful with aggregating_mode: {aggregating_mode},"
f" cluster_runner: {self.cluster_runner}, cluster_env: {self.cluster_env},"
f" aggregator_path: {aggregator_path}, aggregation_args: {aggregation_args}"
"Aggregated job was unsuccessful with aggregating_slicer:"
f" {aggregating_slicer}, cluster_runner: {self.cluster_runner},"
f" cluster_env: {self.cluster_env}, aggregator_path: {aggregator_path},"
f" aggregation_args: {aggregation_args}"
)
self.aggregated_result = None
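
To summarise the new API exercised above: per-job resources now live in `JobResources` and per-submission details in `JobSchedulingInformation`, mirroring the constructor calls in this diff. The sketch below is a rough usage illustration, not repository documentation; all concrete paths, names and values are placeholders, and the environment passed to `set_job_env` is assumed to be a plain mapping of environment variables.

from datetime import datetime, timedelta
from pathlib import Path

from ParProcCo.job_scheduling_information import JobResources, JobSchedulingInformation

# Placeholder resources; memory is assumed to be in megabytes, matching the old default of 4000
resources = JobResources(memory=8000, cpu_cores=2, extra_properties={"qos": "normal"})

jsi = JobSchedulingInformation(
    job_name="ParProcCo",
    job_script_path=Path("/path/to/runner.sh"),
    job_resources=resources,
    timeout=timedelta(hours=2),
    job_script_arguments=("--input", "data.nxs"),
    working_directory=Path("/scratch/work"),
    output_dir=Path("/scratch/results"),
    output_filename="out.nxs",
    log_directory=Path("/scratch/logs"),
    timestamp=datetime.now(),
)
jsi.set_job_env({"OMP_NUM_THREADS": "2"})  # assumed to accept a dict of environment variables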