Use job scheduling information to allow more flexible job submission #77

Merged
49 commits merged on Jan 31, 2024
Changes from 44 commits

Commits (49)
94e36b8
initial commit figuring out the general shape of it
thomasmfish Dec 11, 2023
6ecb041
a bit more progress
thomasmfish Dec 11, 2023
1c827d2
fix typo
thomasmfish Dec 13, 2023
5d80e1b
`timestamp_ok` fix to work with JSI.start_time instead of a scheduler…
thomasmfish Dec 13, 2023
1e23d5c
Add a timestamp value to JSI separate from the start time
thomasmfish Dec 13, 2023
185bda0
`JobController` updates
thomasmfish Dec 13, 2023
17e3687
Remove duplicate key word argument
thomasmfish Dec 13, 2023
f032b8b
Rename `SchedulerModeInterface` to `JobSlicerInterface`
thomasmfish Jan 10, 2024
e60b232
Various fixes
thomasmfish Jan 10, 2024
03a2523
`wait_all_jobs`: work correctly with per-job timeouts
thomasmfish Jan 10, 2024
144e758
Check `job_scheduling_info.log_directory` exists and make it if not
thomasmfish Jan 11, 2024
89ffe32
Add a terminate after total wait time to avoid getting stuck waiting …
thomasmfish Jan 11, 2024
34b579d
use `datetime` and `timedelta` for `StatusInfo`
thomasmfish Jan 11, 2024
f7caa18
Set `time_to_dispatch` and `wall_time` with the right type
thomasmfish Jan 11, 2024
9fdc467
`_wait_for_jobs` working (to some extent!)
thomasmfish Jan 11, 2024
2226817
some progress with the job scheduler tests
thomasmfish Jan 11, 2024
c7bea74
`get_output_paths`: get output paths correctly from JSIs
thomasmfish Jan 12, 2024
5db4ce0
`wait_for_ended`: use the correct lists of jobs
thomasmfish Jan 12, 2024
468c409
`_report_job_info` check actual output file, as set in JSI
thomasmfish Jan 12, 2024
7776238
`get_job_history_batch`: `batch_number=None` now returns latest batch
thomasmfish Jan 12, 2024
1c1712d
use `JobSchedulingInformation.set_job_script_path`
thomasmfish Jan 12, 2024
e01fe15
Add final `check_jobscript_is_readable` check before sending for subm…
thomasmfish Jan 12, 2024
aa669c9
Use consistent phrasing for testing simplicity
thomasmfish Jan 12, 2024
f37ec35
`filter_killed_jobs`: correct typing
thomasmfish Jan 12, 2024
d92e36c
`resubmit_jobs`: allow filtering by job_id
thomasmfish Jan 12, 2024
6cfafe2
progress with tests
thomasmfish Jan 12, 2024
aca8851
`test_job_scheduler_runs`: correct `create_js` args
thomasmfish Jan 15, 2024
95c7bb7
`resubmit_killed_jobs` and `get_job_history_batch`: Fixes for tests
thomasmfish Jan 15, 2024
0c99d49
All tests working (resubmit tests adapted)
thomasmfish Jan 15, 2024
7335a78
Remove old comment
thomasmfish Jan 15, 2024
e8478a1
Remove old variable from logging
thomasmfish Jan 15, 2024
3dad523
Remove `StatusInfo.output_path` in favour of using JSI, use stdout pa…
thomasmfish Jan 17, 2024
e005595
`fetch_and_update_state` use regex to check `tres_alloc_str`
thomasmfish Jan 17, 2024
5b62b2e
String doesn't need to be f-string
thomasmfish Jan 19, 2024
be20610
Replace leftover slots reference with cpus
PeterC-DLS Jan 19, 2024
22b8718
Set minimum Python version to 3.10, plus use Python 3.10+ style typing
thomasmfish Jan 19, 2024
9fb6912
`resubmit_killed_jobs`: fix args (missing =)
thomasmfish Jan 19, 2024
e02a6da
Remove unused imports
thomasmfish Jan 19, 2024
4669c02
add minimum versions to meta.yaml and environment.yaml
thomasmfish Jan 22, 2024
5ad4711
Refactor job slicers
PeterC-DLS Jan 25, 2024
a31d6b5
Update version number
PeterC-DLS Jan 26, 2024
0086c5c
Fix some bugs and type annotations from mypy
PeterC-DLS Jan 26, 2024
52e1b59
Fix bugs in memory argument handling and make token optional
PeterC-DLS Jan 28, 2024
1b59af3
Fix logic in job scheduler waiting method
PeterC-DLS Jan 28, 2024
b271520
Format imports and code
PeterC-DLS Jan 28, 2024
e7ee00e
Fix expected string in test
PeterC-DLS Jan 29, 2024
5d9b98b
Fix up job ID check
PeterC-DLS Jan 29, 2024
4e44dd5
`get_job`: error messages needed to be f-strings
thomasmfish Jan 29, 2024
c9142bb
Set fallback job script in job slicer to current working directory
PeterC-DLS Jan 30, 2024
20 changes: 13 additions & 7 deletions .pydevproject
@@ -1,17 +1,23 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?eclipse-pydev version="1.0"?><pydev_project>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">davidia</pydev_property>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">Default</pydev_property>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python interpreter</pydev_property>
<pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH">
<path>/${PROJECT_DIR_NAME}</path>
</pydev_pathproperty>
</pydev_pathproperty>
</pydev_project>
2 changes: 1 addition & 1 deletion ParProcCo/__init__.py
@@ -1,2 +1,2 @@
version_info = (1, 1, 0)
version_info = (2, 0, 0)
__version__ = ".".join(str(c) for c in version_info)
3 changes: 1 addition & 2 deletions ParProcCo/aggregator_interface.py
@@ -1,10 +1,9 @@
from __future__ import annotations

from pathlib import Path
from typing import List


class AggregatorInterface:
def aggregate(self, aggregation_output_dir: Path, data_files: List[Path]) -> Path:
def aggregate(self, aggregation_output_dir: Path, data_files: list[Path]) -> Path:
"""Aggregates data from multiple output files into one"""
raise NotImplementedError
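
For orientation, a minimal concrete aggregator might look like the sketch below. The SimpleTextAggregator class and its concatenate-everything behaviour are illustrative assumptions, not code from this PR; only the aggregate signature comes from the interface above.

from pathlib import Path

from ParProcCo.aggregator_interface import AggregatorInterface


class SimpleTextAggregator(AggregatorInterface):
    """Illustrative aggregator: concatenates sliced text outputs into one file."""

    def aggregate(self, aggregation_output_dir: Path, data_files: list[Path]) -> Path:
        aggregated = aggregation_output_dir / "aggregated.txt"
        with aggregated.open("w") as out:
            for data_file in data_files:
                # Append each sliced result to the combined output file
                out.write(data_file.read_text())
        return aggregated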
7 changes: 7 additions & 0 deletions ParProcCo/data_slicer_interface.py
@@ -0,0 +1,7 @@
from __future__ import annotations


class DataSlicerInterface:
def slice(self, number_jobs: int, stop: int | None = None) -> list[slice] | None:
"""Takes an input data file and returns a list of slice parameters."""
raise NotImplementedError
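
As a rough illustration of how the new DataSlicerInterface is meant to be subclassed, here is a hedged sketch. The even striding across jobs (job i takes every number_jobs-th frame starting at offset i) is an assumption about typical use, not taken from this PR.

from ParProcCo.data_slicer_interface import DataSlicerInterface


class EveryNthFrameSlicer(DataSlicerInterface):
    """Illustrative slicer: splits frame indices evenly across jobs."""

    def slice(self, number_jobs: int, stop: int | None = None) -> list[slice] | None:
        if number_jobs < 1:
            return None
        # Job i processes frames i, i + number_jobs, i + 2 * number_jobs, ...
        return [slice(i, stop, number_jobs) for i in range(number_jobs)]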
192 changes: 117 additions & 75 deletions ParProcCo/job_controller.py
@@ -2,14 +2,15 @@

import logging
import os
from datetime import timedelta
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Optional

from .job_scheduler import JobScheduler
from .slicer_interface import SlicerInterface
from .utils import check_location, get_absolute_path
from .job_scheduling_information import JobResources, JobSchedulingInformation
from .program_wrapper import ProgramWrapper
from .data_slicer_interface import DataSlicerInterface
from .utils import (check_jobscript_is_readable, check_location,
get_absolute_path)

AGGREGATION_TIME = 60 # timeout per single file, in seconds

@@ -21,18 +22,17 @@ def __init__(
program_wrapper: ProgramWrapper,
output_dir_or_file: Path,
partition: str,
extra_properties: Optional[dict[str, str]] = None,
user_name: Optional[str] = None,
user_token: Optional[str] = None,
user_name: str | None = None,
user_token: str | None = None,
timeout: timedelta = timedelta(hours=2),
) -> None:
"""JobController is used to coordinate cluster job submissions with JobScheduler"""
"""JobController is used to coordinate cluster job submissions with
JobScheduler"""
self.url = url
self.program_wrapper = program_wrapper
self.partition = partition
self.extra_properties = extra_properties
self.output_file: Optional[Path] = None
self.cluster_output_dir: Optional[Path] = None
self.output_file: Path | None = None
self.cluster_output_dir: Path | None = None

if output_dir_or_file is not None:
logging.debug("JC output: %s", output_dir_or_file)
@@ -48,7 +48,7 @@ def __init__(
self.output_file,
)
try:
self.working_directory: Optional[Path] = check_location(os.getcwd())
self.working_directory: Path | None = check_location(os.getcwd())
except Exception:
logging.warning(
"Could not use %s as working directory on cluster so using %s",
@@ -57,39 +57,59 @@ def __init__(
)
self.working_directory = self.cluster_output_dir
logging.debug("JC working dir: %s", self.working_directory)
self.data_slicer: SlicerInterface
self.data_slicer: DataSlicerInterface
self.user_name = user_name
self.user_token = user_token
self.timeout = timeout
self.sliced_results: Optional[List[Path]] = None
self.aggregated_result: Optional[Path] = None
self.sliced_results: tuple[Path, ...] | None = None
self.aggregated_result: Path | None = None

def run(
self,
number_jobs: int,
jobscript_args: Optional[List] = None,
memory: int = 4000,
jobscript_args: list[str] | None = None,
job_name: str = "ParProcCo",
processing_job_resources: JobResources | None = None,
aggregation_job_resources: JobResources | None = None,
) -> None:
self.cluster_runner = check_location(
get_absolute_path(self.program_wrapper.get_cluster_runner_script())
)
self.cluster_runner = self.program_wrapper.get_process_script()
if self.cluster_runner is None:
raise ValueError("Processing script must be defined")
if processing_job_resources is None:
processing_job_resources = JobResources()
if aggregation_job_resources is None:
aggregation_job_resources = JobResources(
memory=processing_job_resources.memory,
cpu_cores=1,
extra_properties=processing_job_resources.extra_properties,
)
self.cluster_env = self.program_wrapper.get_environment()
logging.debug("Cluster environment is %s", self.cluster_env)
slice_params = self.program_wrapper.create_slices(number_jobs)

timestamp = datetime.now()
if jobscript_args:
jobscript_args[0] = str(
check_jobscript_is_readable(
check_location(get_absolute_path(jobscript_args[0]))
)
)
sliced_jobs_success = self._submit_sliced_jobs(
slice_params, jobscript_args, memory, job_name
number_jobs,
jobscript_args,
processing_job_resources,
job_name,
timestamp,
)

if sliced_jobs_success and self.sliced_results:
logging.info("Sliced jobs ran successfully.")
out_file: Path | None = None
if number_jobs == 1:
out_file = (
self.sliced_results[0] if len(self.sliced_results) > 0 else None
self.sliced_results[0] if self.sliced_results else None
)
else:
self._submit_aggregation_job(memory)
self._submit_aggregation_job(aggregation_job_resources, timestamp)
out_file = self.aggregated_result

if (
@@ -102,96 +122,118 @@ def run(
"Rename %s to %s: %s", out_file, renamed_file, renamed_file.exists()
)
else:
slice_params = self.program_wrapper.create_slices(number_jobs=number_jobs)
logging.error(
f"Sliced jobs failed with slice_params: {slice_params}, jobscript_args: {jobscript_args},"
f" memory: {memory}, job_name: {job_name}"
f"Sliced jobs failed with slice_params: {slice_params},"
f" jobscript_args: {jobscript_args}, job_name: {job_name}"
)
raise RuntimeError("Sliced jobs failed\n")

def _submit_sliced_jobs(
self,
slice_params: List[Optional[slice]],
jobscript_args: Optional[List],
memory: int,
number_of_jobs: int,
jobscript_args: list[str] | None,
job_resources: JobResources,
job_name: str,
timestamp: datetime,
) -> bool:
if jobscript_args is None:
jobscript_args = []

processing_mode = self.program_wrapper.processing_mode
processing_mode.set_parameters(slice_params)
assert self.cluster_runner
jsi = JobSchedulingInformation(
job_name=job_name,
job_script_path=self.cluster_runner,
job_resources=job_resources,
timeout=self.timeout,
job_script_arguments=tuple(jobscript_args),
working_directory=self.working_directory,
output_dir=self.output_file.parent if self.output_file else None,
output_filename=self.output_file.name if self.output_file else None,
log_directory=self.cluster_output_dir,
timestamp=timestamp,
)
jsi.set_job_env(self.cluster_env)

job_scheduler = JobScheduler(
self.url,
self.working_directory,
self.cluster_output_dir,
self.partition,
self.extra_properties,
self.timeout,
self.user_name,
self.user_token,
url=self.url,
partition=self.partition,
user_name=self.user_name,
user_token=self.user_token,
cluster_output_dir=self.cluster_output_dir,
)
sliced_jobs_success = job_scheduler.run(
processing_mode,
self.cluster_runner,
self.cluster_env,
memory,
processing_mode.cores,
jobscript_args,
job_name,

processing_jobs = self.program_wrapper.processing_slicer.create_slice_jobs(
jsi, self.program_wrapper.create_slices(number_jobs=number_of_jobs)
)

sliced_jobs_success = job_scheduler.run(processing_jobs)

if not sliced_jobs_success:
sliced_jobs_success = job_scheduler.resubmit_killed_jobs()

self.sliced_results = (
job_scheduler.get_output_paths() if sliced_jobs_success else None
job_scheduler.get_output_paths(processing_jobs)
if sliced_jobs_success
else None
)
return sliced_jobs_success

def _submit_aggregation_job(self, memory: int) -> None:
def _submit_aggregation_job(
self, job_resources: JobResources, timestamp: datetime
) -> None:
aggregator_path = self.program_wrapper.get_aggregate_script()
aggregating_mode = self.program_wrapper.aggregating_mode
if aggregating_mode is None or self.sliced_results is None:
aggregating_slicer = self.program_wrapper.aggregating_slicer
if aggregating_slicer is None or self.sliced_results is None:
return

aggregating_mode.set_parameters(self.sliced_results)

aggregation_args = []
if aggregator_path is not None:
aggregator_path = check_location(get_absolute_path(aggregator_path))
aggregation_args.append(aggregator_path)
aggregation_args.append(str(aggregator_path))

assert self.sliced_results is not None and self.cluster_runner
jsi = JobSchedulingInformation(
job_name=aggregating_slicer.__class__.__name__,
job_script_path=self.cluster_runner,
job_resources=job_resources,
job_script_arguments=tuple(aggregation_args),
working_directory=self.working_directory,
timeout=timedelta(seconds=AGGREGATION_TIME * len(self.sliced_results)),
output_dir=self.output_file.parent if self.output_file else None,
output_filename=self.output_file.name if self.output_file else None,
log_directory=self.cluster_output_dir,
timestamp=timestamp,
)
jsi.set_job_env(self.cluster_env)

aggregation_scheduler = JobScheduler(
self.url,
self.working_directory,
self.cluster_output_dir,
self.partition,
self.extra_properties,
timedelta(seconds=AGGREGATION_TIME * len(self.sliced_results)),
self.user_name,
self.user_token,
url=self.url,
partition=self.partition,
user_name=self.user_name,
user_token=self.user_token,
cluster_output_dir=self.cluster_output_dir,
)
aggregation_success = aggregation_scheduler.run(
aggregating_mode,
self.cluster_runner,
self.cluster_env,
memory,
aggregating_mode.cores,
aggregation_args,
aggregating_mode.__class__.__name__,

aggregation_jobs = aggregating_slicer.create_slice_jobs(
jsi, list(self.sliced_results)
)

aggregation_success = aggregation_scheduler.run(aggregation_jobs)

if not aggregation_success:
aggregation_scheduler.resubmit_killed_jobs(allow_all_failed=True)

if aggregation_success:
self.aggregated_result = aggregation_scheduler.get_output_paths()[0]
self.aggregated_result = aggregation_scheduler.get_output_paths(
aggregation_jobs
)[0]
for result in self.sliced_results:
os.remove(str(result))
else:
logging.warning(
f"Aggregated job was unsuccessful with aggregating_mode: {aggregating_mode},"
f" cluster_runner: {self.cluster_runner}, cluster_env: {self.cluster_env},"
f" aggregator_path: {aggregator_path}, aggregation_args: {aggregation_args}"
"Aggregated job was unsuccessful with aggregating_slicer:"
f" {aggregating_slicer}, cluster_runner: {self.cluster_runner},"
f" cluster_env: {self.cluster_env}, aggregator_path: {aggregator_path},"
f" aggregation_args: {aggregation_args}"
)
self.aggregated_result = None
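
To summarise the new calling convention in this file: JobController.run now takes JobResources objects for the processing and aggregation stages instead of a bare memory argument, and extra_properties has moved into JobResources. Below is a hedged usage sketch; the URL, partition name, file paths and the pre-configured program_wrapper are assumptions for illustration, not values from this PR.

from datetime import timedelta
from pathlib import Path

from ParProcCo.job_controller import JobController
from ParProcCo.job_scheduling_information import JobResources

# program_wrapper is assumed to be a ProgramWrapper configured elsewhere
controller = JobController(
    url="https://slurm.example.org:8443",
    program_wrapper=program_wrapper,
    output_dir_or_file=Path("/scratch/results"),
    partition="cs05r",
    user_token=None,  # the token is now optional
    timeout=timedelta(hours=2),
)

controller.run(
    number_jobs=4,
    jobscript_args=["/path/to/jobscript.py", "--input", "data.nxs"],
    processing_job_resources=JobResources(memory=8000, cpu_cores=4),
    aggregation_job_resources=JobResources(memory=4000, cpu_cores=1),
)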