Skip to content

Commit

Permalink
Merge pull request #1696 from fractal-analytics-platform/1695-add-tes…
Browse files Browse the repository at this point in the history
…ts-for-fractalslurmsshexecutorshutdown

Handle early shutdown in SSH executor
  • Loading branch information
tcompa authored Jul 26, 2024
2 parents 7b0fae9 + 0c753ee commit fe74c18
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

# 2.3.7

* SSH SLURM executor:
* Handle early shutdown in SSH executor (\#1696).
* Task collection:
* Introduce a new configuration variable `FRACTAL_MAX_PIP_VERSION` to pin task-collection pip (\#1675).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from cfut import FileWaitThread

from ......logger import set_logger
from fractal_server.app.runner.exceptions import JobExecutionError

logger = set_logger(__name__)

Expand Down Expand Up @@ -48,6 +49,10 @@ def wait(self, *, job_id: str):
This method is executed on the main thread.
"""
if self.shutdown:
error_msg = "Cannot call `wait` method after executor shutdown."
logger.warning(error_msg)
raise JobExecutionError(info=error_msg)
with self.lock:
self.active_job_ids.append(job_id)

Expand Down
23 changes: 23 additions & 0 deletions fractal_server/app/runner/executors/slurm/ssh/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,12 @@ def submit(
Future representing the execution of the current SLURM job.
"""

# Do not continue if auxiliary thread was shut down
if self.wait_thread.shutdown:
error_msg = "Cannot call `submit` method after executor shutdown"
logger.warning(error_msg)
raise JobExecutionError(info=error_msg)

# Set defaults, if needed
if slurm_config is None:
slurm_config = get_default_slurm_config()
Expand Down Expand Up @@ -436,6 +442,12 @@ def map(
"""

# Do not continue if auxiliary thread was shut down
if self.wait_thread.shutdown:
error_msg = "Cannot call `map` method after executor shutdown"
logger.warning(error_msg)
raise JobExecutionError(info=error_msg)

def _result_or_cancel(fut):
"""
This function is based on the Python Standard Library 3.11.
Expand Down Expand Up @@ -867,6 +879,14 @@ def _submit_job(self, job: SlurmJob) -> tuple[Future, str]:
job: The `SlurmJob` object to submit.
"""

# Prevent calling sbatch if auxiliary thread was shut down
if self.wait_thread.shutdown:
error_msg = (
"Cannot call `_submit_job` method after executor shutdown"
)
logger.warning(error_msg)
raise JobExecutionError(info=error_msg)

# Submit job to SLURM, and get jobid
sbatch_command = f"sbatch --parsable {job.slurm_script_remote}"
pre_submission_cmds = job.slurm_config.pre_submission_commands
Expand Down Expand Up @@ -1336,6 +1356,9 @@ def shutdown(self, wait=True, *, cancel_futures=False):
the self.wait_thread thread, see _completion.
"""

# Redundantly set thread shutdown attribute to True
self.wait_thread.shutdown = True

logger.debug("Executor shutdown: start")

# Handle all job futures
Expand Down
64 changes: 64 additions & 0 deletions tests/v2/00_ssh/test_executor.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import logging
from pathlib import Path

import pytest
from devtools import debug

from fractal_server.app.runner.exceptions import JobExecutionError
from fractal_server.app.runner.executors.slurm.ssh.executor import (
FractalSlurmSSHExecutor,
) # noqa
Expand Down Expand Up @@ -111,3 +113,65 @@ def test_slurm_ssh_executor_submit_with_pre_sbatch(
debug(fut.result())

assert auxfile.exists()


def test_slurm_ssh_executor_shutdown_before_job_submission(
    fractal_ssh,
    tmp_path: Path,
    tmp777_path: Path,
    override_settings_factory,
    current_py_version: str,
):
    """
    Verify the behavior when shutdown is called before any job has started.

    For each entry point (`submit`, `map`, `wait_thread.wait` and
    `_submit_job`), a fresh executor is created and shut down right away;
    the subsequent call must raise `JobExecutionError`.

    Only the call under test is placed inside `pytest.raises`: statements
    following the raising call in that block would never run, and if the
    call stopped raising they could mask the regression by failing for an
    unrelated reason.
    """

    override_settings_factory(
        FRACTAL_SLURM_WORKER_PYTHON=f"/usr/bin/python{current_py_version}"
    )

    # Each callable receives the already-shut-down executor and performs the
    # single call that is expected to be rejected. Taking the executor as an
    # argument (rather than closing over it) keeps the callables independent
    # of the loop variable.
    calls_after_shutdown = [
        ("submit", lambda ex: ex.submit(lambda: 1)),
        ("map", lambda ex: ex.map(lambda x: 1, [1, 2, 3])),
        # `wait` declares `job_id: str`, so pass a string job ID.
        ("wait", lambda ex: ex.wait_thread.wait(job_id="1")),
        ("_submit_job", lambda ex: ex._submit_job(None)),
    ]

    for ind, (method_name, call) in enumerate(calls_after_shutdown, start=1):
        # Printed so that a failure can be attributed to a specific method.
        debug(method_name)
        with MockFractalSSHSlurmExecutor(
            workflow_dir_local=tmp_path / f"job_dir{ind}",
            workflow_dir_remote=(tmp777_path / f"remote_job_dir{ind}"),
            slurm_poll_interval=1,
            fractal_ssh=fractal_ssh,
        ) as executor:
            executor.shutdown()
            with pytest.raises(JobExecutionError) as exc_info:
                call(executor)
            debug(exc_info.value)

0 comments on commit fe74c18

Please sign in to comment.