Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Review SLURM configuration and include new configuration option for pre-sbatch commands and gpus #1678

Merged
merged 32 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
5b2d25b
Fix f-string in log
tcompa Jul 23, 2024
8d8d9bf
Improve check of `gpu_slurm_config=None`
tcompa Jul 23, 2024
1d59771
Merge ssh/non-ssh redundant copies of `get_slurm_config.py`
tcompa Jul 23, 2024
57a2977
Rename v1 test
tcompa Jul 23, 2024
d0466d4
Move get_slurm_config module to v2 scope
tcompa Jul 23, 2024
779e2d3
Remove obsolete directory arguments from `get_slurm_config` in v2
tcompa Jul 23, 2024
c3672dd
Fix typo in tests (paralell->parallel)
tcompa Jul 23, 2024
1774d88
Remove obsolete argument from `get_slurm_config` call
tcompa Jul 23, 2024
c4cecb5
Add test_slurm_config for v2
tcompa Jul 23, 2024
060680e
Rename `app.runner.v2._slurm` into `app.runner.v2._slurm_sudo`
tcompa Jul 23, 2024
08ec232
Rename `app.runner.v2._slurm` into `app.runner.v2._slurm_sudo`
tcompa Jul 23, 2024
8879240
Add init file for `_slurm_common` subpackage
tcompa Jul 23, 2024
baafe51
Simplify conditional check in slurm config
tcompa Jul 23, 2024
3bf72b1
Include `pre_submission_commands` in slurm config models
tcompa Jul 23, 2024
b819297
Include logic for pre-submission commands in ssh executor (ref #1565)
tcompa Jul 23, 2024
cbbbd70
Add `test_slurm_ssh_executor_submit_with_pre_sbatch`
tcompa Jul 23, 2024
5d0e6c3
Add `test_run_command_session_persistence`
tcompa Jul 23, 2024
9da7006
Use `pre_submission_commands` attribute name consistently
tcompa Jul 24, 2024
69998d5
Include SLURM `gpus` parameter (ref #1679)
tcompa Jul 24, 2024
00da10b
Fix behavior for `needs_gpu=False` in slurm-config
tcompa Jul 24, 2024
7621d83
Add `test_get_slurm_config_gpu_options`
tcompa Jul 24, 2024
320f1a9
Print warning when using `pre_submission_commands` in non-SSH SLURM e…
tcompa Jul 24, 2024
a3d698f
Introduce `FractalSSH.write_remote_file`
tcompa Jul 24, 2024
c29d5e6
Introduce test for `FractalSSH.write_remote_file`
tcompa Jul 24, 2024
4999de3
Rename variable in fixture
tcompa Jul 24, 2024
e9f0e3f
Simplify error message
tcompa Jul 24, 2024
37ebade
Fix comment
tcompa Jul 24, 2024
168ccb8
Update CHANGELOG [skip ci]
tcompa Jul 24, 2024
6a117c1
Fix comment [skip ci]
tcompa Jul 24, 2024
bda4cd0
Update CHANGELOG [skip ci]
tcompa Jul 24, 2024
fc467b9
Drop obsolete test
tcompa Jul 24, 2024
de4b037
Cover additional branch in `write_remote_file` method test
tcompa Jul 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions benchmarks/runner/benchmark_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ def mock_venv(tmp_path: str) -> dict:
args[
"command_non_parallel"
] = f"{python} {src_dir / task['executable_non_parallel']}"
args["meta_non_paralell"] = task.get("meta_non_paralell")
args["meta_non_parallel"] = task.get("meta_non_parallel")
if task.get("executable_parallel"):
args[
"command_parallel"
] = f"{python} {src_dir / task['executable_parallel']}"
args["meta_paralell"] = task.get("meta_paralell")
args["meta_parallel"] = task.get("meta_parallel")

t = TaskV2Mock(
id=ind,
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/runner/mocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ class TaskV2Mock(BaseModel):

command_non_parallel: Optional[str] = None
command_parallel: Optional[str] = None
meta_non_paralell: Optional[dict[str, Any]] = Field(default_factory=dict)
meta_paralell: Optional[dict[str, Any]] = Field(default_factory=dict)
meta_non_parallel: Optional[dict[str, Any]] = Field(default_factory=dict)
meta_parallel: Optional[dict[str, Any]] = Field(default_factory=dict)
type: Optional[str]

@root_validator(pre=False)
Expand Down
6 changes: 6 additions & 0 deletions fractal_server/app/runner/executors/slurm/_slurm_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class _SlurmConfigSet(BaseModel, extra=Extra.forbid):
time: Optional[str]
account: Optional[str]
extra_lines: Optional[list[str]]
pre_submission_commands: Optional[list[str]]


class _BatchingConfigSet(BaseModel, extra=Extra.forbid):
Expand Down Expand Up @@ -240,6 +241,8 @@ class SlurmConfig(BaseModel, extra=Extra.forbid):
Key-value pairs to be included as `export`-ed variables in SLURM
submission script, after prepending values with the user's cache
directory.
pre_submission_commands: List of commands to be prepended to the sbatch
command.
"""

# Required SLURM parameters (note that the integer attributes are those
Expand Down Expand Up @@ -274,6 +277,9 @@ class SlurmConfig(BaseModel, extra=Extra.forbid):
target_num_jobs: int
max_num_jobs: int

# FIXME
pre_submission_cmds: list[str] = Field(default_factory=list)

def _sorted_extra_lines(self) -> list[str]:
"""
Return a copy of `self.extra_lines`, where lines starting with
Expand Down
21 changes: 18 additions & 3 deletions fractal_server/app/runner/executors/slurm/ssh/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -869,15 +869,30 @@

# Submit job to SLURM, and get jobid
sbatch_command = f"sbatch --parsable {job.slurm_script_remote}"
sbatch_stdout = self.fractal_ssh.run_command(
cmd=sbatch_command,
)
pre_submission_cmds = job.slurm_config.pre_submission_cmds
if len(pre_submission_cmds) == 0:
sbatch_stdout = self.fractal_ssh.run_command(cmd=sbatch_command)
else:
logger.warning(f"Now using {pre_submission_cmds=}")
script_lines = pre_submission_cmds + [sbatch_command]
script_content = "\n".join(script_lines)
script_path_remote = (
f"{job.slurm_script_remote.as_posix()}_wrapper.sh"
)
with self.fractal_ssh.sftp().open(
filename=script_path_remote, mode="w"
) as f:
f.write(script_content)
cmd = f"bash {script_path_remote}"
sbatch_stdout = self.fractal_ssh.run_command(cmd=cmd)

# Extract SLURM job ID from stdout
try:
stdout = sbatch_stdout.strip("\n")
jobid = int(stdout)
except ValueError as e:
if len(pre_submission_cmds) > 0:
logger.warning(f"I used {pre_submission_cmds=}")

Check notice on line 895 in fractal_server/app/runner/executors/slurm/ssh/executor.py

View workflow job for this annotation

GitHub Actions / Coverage

Missing coverage

Missing coverage on lines 894-895
error_msg = (
f"Submit command `{sbatch_command}` returned "
f"`{stdout=}` which cannot be cast to an integer "
Expand Down
2 changes: 1 addition & 1 deletion fractal_server/app/runner/v2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
from ._local_experimental import (
process_workflow as local_experimental_process_workflow,
)
from ._slurm import process_workflow as slurm_sudo_process_workflow
from ._slurm_ssh import process_workflow as slurm_ssh_process_workflow
from ._slurm_sudo import process_workflow as slurm_sudo_process_workflow
from .handle_failed_job import assemble_filters_failed_job
from .handle_failed_job import assemble_history_failed_job
from .handle_failed_job import assemble_images_failed_job
Expand Down
182 changes: 0 additions & 182 deletions fractal_server/app/runner/v2/_slurm/get_slurm_config.py

This file was deleted.

Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

def get_slurm_config(
wftask: WorkflowTaskV2,
workflow_dir_local: Path,
workflow_dir_remote: Path,
which_type: Literal["non_parallel", "parallel"],
config_path: Optional[Path] = None,
) -> SlurmConfig:
Expand All @@ -43,13 +41,6 @@ def get_slurm_config(
wftask:
WorkflowTask for which the SLURM configuration is to be
prepared.
workflow_dir_local:
Server-owned directory to store all task-execution-related relevant
files (inputs, outputs, errors, and all meta files related to the
job execution). Note: users cannot write directly to this folder.
workflow_dir_remote:
User-side directory with the same scope as `workflow_dir_local`,
and where a user can write.
config_path:
Path of a Fractal SLURM configuration file; if `None`, use
`FRACTAL_SLURM_CONFIG_FILE` variable from settings.
Expand Down Expand Up @@ -99,13 +90,13 @@ def get_slurm_config(
# 1. This block of definitions takes priority over other definitions from
# slurm_env which are not under the `needs_gpu` subgroup
# 2. This block of definitions has lower priority than whatever comes next
# (i.e. from WorkflowTask.meta).
# (i.e. from WorkflowTask.meta_parallel).
if wftask_meta is not None:
needs_gpu = wftask_meta.get("needs_gpu", False)
else:
needs_gpu = False
logger.debug(f"[get_slurm_config] {needs_gpu=}")
if needs_gpu and slurm_env.gpu_slurm_config is not None: # FIXME
if needs_gpu is not None:
for key, value in slurm_env.gpu_slurm_config.dict(
exclude_unset=True, exclude={"mem"}
).items():
Expand Down
6 changes: 3 additions & 3 deletions fractal_server/app/runner/v2/_slurm_ssh/_submit_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
from typing import Literal

from ...task_files import get_task_file_paths
from .get_slurm_config import get_slurm_config
from fractal_server.app.models.v2 import WorkflowTaskV2
from fractal_server.app.runner.v2._slurm_common.get_slurm_config import (
get_slurm_config,
)


def _slurm_submit_setup(
Expand Down Expand Up @@ -62,8 +64,6 @@ def _slurm_submit_setup(
# Get SlurmConfig object
slurm_config = get_slurm_config(
wftask=wftask,
workflow_dir_local=workflow_dir_local,
workflow_dir_remote=workflow_dir_remote,
which_type=which_type,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
from typing import Literal

from ...task_files import get_task_file_paths
from .get_slurm_config import get_slurm_config
from fractal_server.app.models.v2 import WorkflowTaskV2
from fractal_server.app.runner.v2._slurm_common.get_slurm_config import (
get_slurm_config,
)


def _slurm_submit_setup(
Expand Down Expand Up @@ -62,8 +64,6 @@ def _slurm_submit_setup(
# Get SlurmConfig object
slurm_config = get_slurm_config(
wftask=wftask,
workflow_dir_local=workflow_dir_local,
workflow_dir_remote=workflow_dir_remote,
which_type=which_type,
)

Expand Down
20 changes: 20 additions & 0 deletions tests/v2/00_ssh/test_FractalSSH.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,3 +258,23 @@ def test_remove_folder_input_validation():
safe_root="/actual_root",
)
print(e.value)


def test_run_command_session_persistence(
    fractal_ssh: FractalSSH, tmp777_path: Path
):
    """
    Verify that `FractalSSH.run_command` does not persist shell state
    across calls.

    The first script exports `MYVAR` and echoes it within the same shell,
    so its stdout is "123". The second script only echoes `MYVAR`; since
    each `run_command` invocation runs in a fresh session, the variable is
    no longer set and the output is empty.
    """

    # Script 1: export a variable and read it back in the same shell.
    script1 = tmp777_path / "set_var.sh"
    with script1.open("w") as f:
        f.write("export MYVAR=123\necho $MYVAR")
    # Script 2: read the variable only (no export).
    script2 = tmp777_path / "read_var.sh"
    with script2.open("w") as f:
        f.write("echo $MYVAR")

    # Within one command, the exported variable is visible.
    stdout = fractal_ssh.run_command(cmd=f"bash {script1.as_posix()}")
    assert stdout.strip("\n") == "123"
    # In a subsequent command, the variable is gone: no session persistence.
    stdout = fractal_ssh.run_command(cmd=f"bash {script2.as_posix()}")
    assert stdout.strip("\n") == ""
Loading
Loading