Skip to content

Commit

Permalink
All tests working (resubmit tests adapted)
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasmfish committed Jan 15, 2024
1 parent 95c7bb7 commit 0c99d49
Showing 1 changed file with 73 additions and 46 deletions.
119 changes: 73 additions & 46 deletions tests/test_job_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,7 @@ def test_resubmit_jobs(self) -> None:
jsi_list = processing_mode.create_slice_jobs(
slice_params=slices, job_scheduling_information=jsi
)
job_history = {}
completion_statuses = [False, True, False, True]
for job_id, jsi, status_info, completed in zip(
job_ids, jsi_list, status_infos, completion_statuses
Expand All @@ -625,11 +626,12 @@ def test_resubmit_jobs(self) -> None:
jsi.set_job_id(job_id)
jsi.update_status_info(status_info)
jsi.set_completion_status(completed)
job_history[job_id] = jsi

js = create_js(cluster_output_dir)
js.job_history.append({jsi.job_id: jsi for jsi in jsi_list})
js.job_history.append(job_history)

success = js.resubmit_jobs(job_ids=[0, 2])
success = js.resubmit_jobs(job_ids=[0, 2], batch=0)
self.assertTrue(success)
resubmitted_output_paths = [output_paths[i] for i in [0, 2]]
for output in resubmitted_output_paths:
Expand All @@ -640,8 +642,8 @@ def test_resubmit_jobs(self) -> None:
(
"all_success",
False,
{
0: {
[
{
i: StatusInfo(
output_path=Path(f"to/somewhere_{i}"),
submit_time=datetime.now(),
Expand All @@ -653,8 +655,8 @@ def test_resubmit_jobs(self) -> None:
)
for i in range(4)
}
},
{str(i): True for i in range(4)},
],
{i: True for i in range(4)},
False,
None,
True,
Expand All @@ -663,8 +665,8 @@ def test_resubmit_jobs(self) -> None:
(
"all_failed_do_not_allow",
False,
{
0: {
[
{
i: StatusInfo(
output_path=Path(f"to/somewhere_{i}"),
submit_time=datetime.now(),
Expand All @@ -676,8 +678,8 @@ def test_resubmit_jobs(self) -> None:
)
for i in range(4)
}
},
{str(i): False for i in range(4)},
],
{i: False for i in range(4)},
False,
None,
False,
Expand All @@ -700,7 +702,7 @@ def test_resubmit_jobs(self) -> None:
for i in range(4)
}
},
{str(i): False for i in range(4)},
{i: False for i in range(4)},
True,
[0, 1, 2, 3],
True,
Expand All @@ -709,8 +711,8 @@ def test_resubmit_jobs(self) -> None:
(
"some_failed_do_allow",
True,
{
0: {
[
{
0: StatusInfo(
output_path=Path("to/somewhere_0"),
submit_time=datetime.now(),
Expand Down Expand Up @@ -748,8 +750,8 @@ def test_resubmit_jobs(self) -> None:
final_state=SLURMSTATE.COMPLETED,
),
}
},
{"0": False, "1": True, "2": False, "3": True},
],
{0: False, 1: True, 2: False, 3: True},
True,
[0, 2],
True,
Expand All @@ -758,8 +760,8 @@ def test_resubmit_jobs(self) -> None:
(
"some_failed_do_not_allow",
False,
{
0: {
[
{
0: StatusInfo(
output_path=Path("to/somewhere_0"),
submit_time=datetime.now(),
Expand Down Expand Up @@ -797,8 +799,8 @@ def test_resubmit_jobs(self) -> None:
final_state=SLURMSTATE.COMPLETED,
),
}
},
{"0": False, "1": True, "2": False, "3": True},
],
{0: False, 1: True, 2: False, 3: True},
True,
[0, 2],
True,
Expand All @@ -810,7 +812,7 @@ def test_resubmit_killed_jobs(
self,
_name,
allow_all_failed,
job_history,
job_statuses,
job_completion_status,
runs,
indices,
Expand All @@ -825,46 +827,71 @@ def test_resubmit_killed_jobs(
working_directory, cluster_output_dir
)

processing_mode.set_parameters(slices)
js = create_js(working_directory, cluster_output_dir)
jobscript = setup_runner_script(working_directory)
processing_mode = SimpleProcessingMode(jobscript)
js.jobscript_path = jobscript
js.jobscript_args = [
str(setup_jobscript(working_directory)),
"--input-path",
str(input_path),
]
js.memory = 4000
js.cores = 6
js.job_name = "test_resubmit_jobs"
js.scheduler_mode = processing_mode
for output_path, status_info in zip(output_paths, job_history[0].values()):
status_info.output = output_path
js.job_env = {"ParProcCo": "0"}
js.job_history = job_history
js.job_completion_status = job_completion_status
js.output_paths = output_paths
runner_script = setup_runner_script(working_directory)
job_script = setup_jobscript(working_directory)
processing_mode = SimpleProcessingMode(runner_script)
jsi = JobSchedulingInformation(
job_name="test_resubmit_jobs",
job_script_path=job_script,
job_script_arguments=["--input-path", str(input_path)],
job_resources=JobResources(memory=4000, cpu_cores=6),
working_directory=working_directory,
job_env={"ParProcCo": "0"},
timestamp=datetime.now(),
output_dir=cluster_output_dir,
timeout=timedelta(seconds=30),
)

jsi_list = processing_mode.create_slice_jobs(
slice_params=slices, job_scheduling_information=jsi
)

job_history = {}
for job_id, jsi in enumerate(jsi_list):
status_info = job_statuses[0][job_id]
status_info.output_path = output_paths[
job_id
] # Only works because test IDs are 0, 1, 2, 3
jsi.log_directory = status_info.output_path.parent
jsi.stdout_filename = status_info.output_path.name
jsi.sterr_filename = status_info.output_path.name + "_err"
jsi.set_job_id(job_id)
jsi.update_status_info(status_info)
jsi.set_completion_status(job_completion_status[job_id])
job_history[job_id] = jsi

js = create_js(cluster_output_dir)
js.job_history.append(job_history)

if raises_error:
with self.assertRaises(RuntimeError) as context:
js.resubmit_killed_jobs(allow_all_failed)
js.resubmit_killed_jobs(allow_all_failed=allow_all_failed)
self.assertTrue(
"All jobs failed. job_history: " in str(context.exception)
)
self.assertEqual(js.batch_number, 0)
self.assertEqual(js.get_batch_number(), 0)
return

success = js.resubmit_killed_jobs(allow_all_failed)
success = js.resubmit_killed_jobs(allow_all_failed=allow_all_failed)
self.assertEqual(success, expected_success)
self.assertEqual(js.output_paths, output_paths)
latest_batch = js.get_job_history_batch()

if runs:
self.assertEqual(js.batch_number, 1)
self.assertEqual(js.get_batch_number(), 1)

resubmitted_output_paths = [output_paths[i] for i in indices]
self.assertEqual(
list(js.get_output_paths(latest_batch.values())),
resubmitted_output_paths,
)
for output in resubmitted_output_paths:
self.assertTrue(output.is_file())
else:
self.assertEqual(js.batch_number, 0)
self.assertEqual(js.get_batch_number(), 0)
self.assertEqual(
list(js.get_output_paths(latest_batch.values())),
output_paths,
)


if __name__ == "__main__":
Expand Down

0 comments on commit 0c99d49

Please sign in to comment.