Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use job scheduling information to allow more flexible job submission #77

Merged
merged 49 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
94e36b8
initial commit figuring out the general shape of it
thomasmfish Dec 11, 2023
6ecb041
a bit more progress
thomasmfish Dec 11, 2023
1c827d2
fix typo
thomasmfish Dec 13, 2023
5d80e1b
`timestamp_ok` fix to work with JSI.start_time instead of a scheduler…
thomasmfish Dec 13, 2023
1e23d5c
Add a timestamp value to JSI separate from the start time
thomasmfish Dec 13, 2023
185bda0
`JobController` updates
thomasmfish Dec 13, 2023
17e3687
Remove duplicate key word argument
thomasmfish Dec 13, 2023
f032b8b
Rename `SchedulerModeInterface` to `JobSlicerInterface`
thomasmfish Jan 10, 2024
e60b232
Various fixes
thomasmfish Jan 10, 2024
03a2523
`wait_all_jobs`: work correctly with per-job timeouts
thomasmfish Jan 10, 2024
144e758
Check `job_scheduling_info.log_directory` exists and make it if not
thomasmfish Jan 11, 2024
89ffe32
Add a terminate after total wait time to avoid getting stuck waiting …
thomasmfish Jan 11, 2024
34b579d
use `datetime` and `timedelta` for `StatusInfo`
thomasmfish Jan 11, 2024
f7caa18
Set `time_to_dispatch` and `wall_time` with the right type
thomasmfish Jan 11, 2024
9fdc467
`_wait_for_jobs` working (to some extent!)
thomasmfish Jan 11, 2024
2226817
some progress with the job scheduler tests
thomasmfish Jan 11, 2024
c7bea74
`get_output_paths`: get output paths correctly from JSIs
thomasmfish Jan 12, 2024
5db4ce0
`wait_for_ended`: use the correct lists of jobs
thomasmfish Jan 12, 2024
468c409
`_report_job_info` check actual output file, as set in JSI
thomasmfish Jan 12, 2024
7776238
`get_job_history_batch`: `batch_number=None` now returns latest batch
thomasmfish Jan 12, 2024
1c1712d
use `JobSchedulingInformation.set_job_script_path`
thomasmfish Jan 12, 2024
e01fe15
Add final `check_jobscript_is_readable` check before sending for subm…
thomasmfish Jan 12, 2024
aa669c9
Use consistent phrasing for testing simplicity
thomasmfish Jan 12, 2024
f37ec35
`filter_killed_jobs`: correct typing
thomasmfish Jan 12, 2024
d92e36c
`resubmit_jobs`: allow filtering by job_id
thomasmfish Jan 12, 2024
6cfafe2
progress wth tests
thomasmfish Jan 12, 2024
aca8851
`test_job_scheduler_runs`: correct `create_js` args
thomasmfish Jan 15, 2024
95c7bb7
`resubmit_killed_jobs` and `get_job_history_batch`: Fixes for tests
thomasmfish Jan 15, 2024
0c99d49
All tests working (resubmit tests adapted)
thomasmfish Jan 15, 2024
7335a78
Remove old comment
thomasmfish Jan 15, 2024
e8478a1
Remove old variable from logging
thomasmfish Jan 15, 2024
3dad523
Remove `StatusInfo.output_path` in favour of using JSI, use stdout pa…
thomasmfish Jan 17, 2024
e005595
`fetch_and_update_state` use regex to check `tres_alloc_str`
thomasmfish Jan 17, 2024
5b62b2e
String doesn't need to be f-string
thomasmfish Jan 19, 2024
be20610
Replace leftover slots reference with cpus
PeterC-DLS Jan 19, 2024
22b8718
Set minimum Python version to 3.10, plus use Python 3.10+ style typing
thomasmfish Jan 19, 2024
9fb6912
`resubmit_killed_jobs`: fix args (missing =)
thomasmfish Jan 19, 2024
e02a6da
Remove unused imports
thomasmfish Jan 19, 2024
4669c02
add minimum versions to meta.yaml and environment.yaml
thomasmfish Jan 22, 2024
5ad4711
Refactor job slicers
PeterC-DLS Jan 25, 2024
a31d6b5
Update version number
PeterC-DLS Jan 26, 2024
0086c5c
Fix some bugs and type annotations from mypy
PeterC-DLS Jan 26, 2024
52e1b59
Fix bugs in memory argument handling and make token optional
PeterC-DLS Jan 28, 2024
1b59af3
Fix logic in job scheduler waiting method
PeterC-DLS Jan 28, 2024
b271520
Format imports and code
PeterC-DLS Jan 28, 2024
e7ee00e
Fix expected string in test
PeterC-DLS Jan 29, 2024
5d9b98b
Fix up job ID check
PeterC-DLS Jan 29, 2024
4e44dd5
`get_job`: error messages needed to be f-strings
thomasmfish Jan 29, 2024
c9142bb
Set fallback job script in job slicer to current working directory
PeterC-DLS Jan 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 46 additions & 30 deletions ParProcCo/job_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,13 @@ def fetch_and_update_state(
return slurm_state

def get_output_paths(
self, job_scheduling_info_list: list[JobSchedulingInformation] | ValuesView[JobSchedulingInformation]
self,
job_scheduling_info_list: list[JobSchedulingInformation]
| ValuesView[JobSchedulingInformation],
) -> tuple[Path, ...]:
return tuple(p for p in (
jsi.get_output_path()
for jsi in job_scheduling_info_list
) if p)
return tuple(
p for p in (jsi.get_output_path() for jsi in job_scheduling_info_list) if p
)

def get_success(
self, job_scheduling_info_list: list[JobSchedulingInformation]
Expand Down Expand Up @@ -346,7 +347,9 @@ def make_job_submission(

def wait_all_jobs(
self,
job_scheduling_info_list: Sequence[JobSchedulingInformation] | ValuesView[JobSchedulingInformation],
job_scheduling_info_list: Sequence[JobSchedulingInformation]
| ValuesView[JobSchedulingInformation],
in_group: bool,
state_group: STATEGROUP,
deadline: datetime,
sleep_time: int,
Expand All @@ -355,7 +358,7 @@ def wait_all_jobs(
while len(remaining_jobs) > 0 and datetime.now() <= deadline:
for jsi in list(remaining_jobs):
current_state = self.fetch_and_update_state(jsi)
if current_state in state_group:
if (current_state in state_group) == in_group:
remaining_jobs.remove(jsi)
if len(remaining_jobs) > 0:
time.sleep(sleep_time)
Expand All @@ -375,51 +378,60 @@ def get_deadline(
allow_from_submission: bool = False,
) -> datetime | None:
# Timeout shouldn't include queue time
if job_scheduling_info.status_info is None:
status_info = job_scheduling_info.status_info
if status_info is None:
return None
elif job_scheduling_info.status_info.start_time is None:
elif status_info.start_time is None:
if allow_from_submission:
return (
job_scheduling_info.status_info.submit_time
+ job_scheduling_info.timeout
)
return status_info.submit_time + job_scheduling_info.timeout
return None
return (
job_scheduling_info.status_info.start_time + job_scheduling_info.timeout
)
return status_info.start_time + job_scheduling_info.timeout

def handle_not_started(
job_scheduling_info_list: Sequence[JobSchedulingInformation] | ValuesView[JobSchedulingInformation],
job_scheduling_info_list: Sequence[JobSchedulingInformation]
| ValuesView[JobSchedulingInformation],
check_time: timedelta,
) -> list[JobSchedulingInformation]:
# Wait for jobs to start (timeout shouldn't include queue time)
starting_jobs = list(job_scheduling_info_list)
timeout = datetime.now() + check_time
while len(job_scheduling_info_list) > 0 and datetime.now() < timeout:
for jsi in self.wait_all_jobs(
logging.debug(
"Wait for jobs (%d) to start up to %s", len(starting_jobs), timeout
)
while len(starting_jobs) > 0 and datetime.now() < timeout:
starting_jobs = self.wait_all_jobs(
starting_jobs,
False,
STATEGROUP.STARTING,
timeout,
0,
):
starting_jobs.remove(jsi)
5,
)
if len(starting_jobs) > 0:
# We want ot sleep only if there are jobs waiting to start
# We want to sleep only if there are jobs waiting to start
time.sleep(5)
logging.info("Jobs left to start: %d", len(starting_jobs))
return starting_jobs

def wait_for_ended(
job_scheduling_info_list: Sequence[JobSchedulingInformation] | ValuesView[JobSchedulingInformation],
job_scheduling_info_list: Sequence[JobSchedulingInformation]
| ValuesView[JobSchedulingInformation],
deadline: datetime,
check_time: timedelta,
) -> list[JobSchedulingInformation]:
sleep_time = int(round(check_time.total_seconds()))
logging.debug(
"Wait for ending in %d jobs up to %s with sleeps of %ss",
len(job_scheduling_info_list),
deadline,
sleep_time,
)
# Wait for jobs to complete
self.wait_all_jobs(
job_scheduling_info_list,
True,
STATEGROUP.ENDED,
deadline,
int(round(check_time.total_seconds())),
sleep_time,
)
ended_jobs = handle_ended_jobs(job_scheduling_info_list)
logging.info(
Expand All @@ -430,7 +442,8 @@ def wait_for_ended(
return ended_jobs

def handle_ended_jobs(
job_scheduling_info_list: Sequence[JobSchedulingInformation] | ValuesView[JobSchedulingInformation],
job_scheduling_info_list: Sequence[JobSchedulingInformation]
| ValuesView[JobSchedulingInformation],
) -> list[JobSchedulingInformation]:
ended_jobs = []
for job_scheduling_info in job_scheduling_info_list:
Expand All @@ -442,7 +455,8 @@ def handle_ended_jobs(
return ended_jobs

def handle_timeouts(
job_scheduling_info_list: Sequence[JobSchedulingInformation] | ValuesView[JobSchedulingInformation],
job_scheduling_info_list: Sequence[JobSchedulingInformation]
| ValuesView[JobSchedulingInformation],
) -> list[JobSchedulingInformation]:
deadlines = (
(jsi, get_deadline(jsi, allow_from_submission=False))
Expand Down Expand Up @@ -492,8 +506,8 @@ def handle_timeouts(

if not running_jobs:
logging.warning("All jobs ended before wait began")
handle_ended_jobs(ended_jobs.values())
return

try:
while datetime.now() < wait_deadline and len(running_jobs) > 0:
# Handle none started (empty deadline list)
Expand All @@ -503,7 +517,8 @@ def handle_timeouts(
for deadline in (
get_deadline(jsi, allow_from_submission=True)
for jsi in running_jobs.values()
) if deadline is not None
)
if deadline is not None
]
)
check_time = min(
Expand Down Expand Up @@ -662,7 +677,8 @@ def resubmit_killed_jobs(
failed_jobs = [
jsi
for jsi in job_scheduling_info_dict.values()
if jsi.status_info and jsi.status_info.final_state != SLURMSTATE.COMPLETED
if jsi.status_info
and jsi.status_info.final_state != SLURMSTATE.COMPLETED
]
killed_jobs = self.filter_killed_jobs(failed_jobs)
logging.info(
Expand Down
18 changes: 14 additions & 4 deletions ParProcCo/passthru_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@

from .job_slicer_interface import JobSlicerInterface
from .program_wrapper import ProgramWrapper
from .utils import (check_jobscript_is_readable, check_location,
format_timestamp, get_absolute_path)
from .utils import (
check_jobscript_is_readable,
check_location,
format_timestamp,
get_absolute_path,
)

if TYPE_CHECKING:
from pathlib import Path
Expand All @@ -29,7 +33,7 @@ def create_slice_jobs(
timestamp = format_timestamp(job_scheduling_information.timestamp)
job_scheduling_information.stdout_filename = f"out_{timestamp}"
job_scheduling_information.stderr_filename = f"err_{timestamp}"
old_args= job_scheduling_information.job_script_arguments
old_args = job_scheduling_information.job_script_arguments
job_script = str(
check_jobscript_is_readable(check_location(get_absolute_path(old_args[0])))
)
Expand All @@ -42,7 +46,13 @@ def create_slice_jobs(
str(job_scheduling_information.job_resources.cpu_cores),
)
if job_scheduling_information.output_filename:
args += ("--output", job_scheduling_information.output_filename)
args += (
"--output",
str(
job_scheduling_information.output_dir
/ job_scheduling_information.output_filename
thomasmfish marked this conversation as resolved.
Show resolved Hide resolved
),
)
if len(old_args) > 1:
args += old_args[1:]
job_scheduling_information.job_script_arguments = args
Expand Down
4 changes: 3 additions & 1 deletion scripts/ppc_cluster_submit
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,12 @@ def run_ppc(args: argparse.Namespace, script_args: list[str]) -> None:
jc.run(args.jobs, wrapper_args, "PPC-" + program,
JobResources(memory=memory, cpu_cores=args.cores,
extra_properties=extra_properties))
print("complete")
print("Jobs completed")


if __name__ == "__main__":
args, script_args = create_parser().parse_known_args()
logging.getLogger().setLevel(logging.DEBUG if args.debug else logging.INFO)
if args.debug:
logging.basicConfig(format="%(asctime)s:%(levelname)s:%(name)s:%(module)s:%(funcName)s:%(lineno)d:%(message)s")
run_ppc(args, script_args)
Loading