Skip to content

Commit

Permalink
Merge branch 'main' into 1573-replace-slurm-docker-image-for-ci
Browse files Browse the repository at this point in the history
  • Loading branch information
tcompa committed Jul 18, 2024
2 parents 44b632c + 9ca3fe9 commit a9009a2
Show file tree
Hide file tree
Showing 15 changed files with 546 additions and 134 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
strategy:
matrix:
python-version: ["3.9", "3.10"]
db: ["postgres", "postgres-psycopg"]
db: ["postgres"]

services:
postgres:
Expand Down
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,20 @@
**Note**: Numbers like (\#1234) point to closed Pull Requests on the fractal-server repository.

# 2.3.4

* SSH SLURM runner:
* Refactor `compress_folder` and `extract_archive` modules, and stop using `tarfile` library (\#1641).
* API:
* Introduce `FRACTAL_API_V1_MODE=include_without_submission` to include V1 API but forbid job submission (\#1664).
* Testing:
* Do not test V1 API with `DB_ENGINE="postgres-psycopg"` (\#1667).

# 2.3.3

This release fixes an SSH-task-collection bug introduced in version 2.3.1.

* API:
* Expose new superuser-restricted endpoint `GET /api/settings/` (\#1662).
* SLURM runner:
* Make `FRACTAL_SLURM_SBATCH_SLEEP` configuration variable `float` (\#1658).
* SSH features:
Expand Down
9 changes: 9 additions & 0 deletions fractal_server/app/routes/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@
`api` module
"""
from fastapi import APIRouter
from fastapi import Depends

from ....config import get_settings
from ....syringe import Inject
from ...models.security import UserOAuth
from ...security import current_active_superuser


router_api = APIRouter()
Expand All @@ -17,3 +20,9 @@ async def alive():
alive=True,
version=settings.PROJECT_VERSION,
)


@router_api.get("/settings/")
async def view_settings(user: UserOAuth = Depends(current_active_superuser)):
settings = Inject(get_settings)
return settings.get_sanitized()
11 changes: 10 additions & 1 deletion fractal_server/app/routes/api/v1/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,18 @@ async def apply_workflow(
db: AsyncSession = Depends(get_async_db),
) -> Optional[ApplyWorkflowReadV1]:

settings = Inject(get_settings)
if settings.FRACTAL_API_V1_MODE == "include_without_submission":
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=(
"Legacy API is still accessible, "
"but the submission of legacy jobs is not available."
),
)

# Remove non-submitted V1 jobs from the app state when the list grows
# beyond a threshold
settings = Inject(get_settings)
if (
len(request.app.state.jobsV1)
> settings.FRACTAL_API_MAX_JOB_LIST_LENGTH
Expand Down
210 changes: 111 additions & 99 deletions fractal_server/app/runner/compress_folder.py
Original file line number Diff line number Diff line change
@@ -1,120 +1,132 @@
import shlex
import subprocess # nosec
"""
Wrap `tar` compression command.
This module is used both locally (in the environment where `fractal-server`
is running) and remotely (as a standalone Python module, executed over SSH).
This is a twin-module of `extract_archive.py`.
The reason for using the `tar` command via `subprocess` rather than Python
built-in `tarfile` library has to do with performance issues we observed
when handling files which were just created within a SLURM job, and in the
context of a CephFS filesystem.
"""
import shutil
import sys
import tarfile
import time
from pathlib import Path
from typing import Optional

from fractal_server.app.runner.run_subprocess import run_subprocess
from fractal_server.logger import get_logger
from fractal_server.logger import set_logger


def copy_subfolder(src: Path, dest: Path, logger_name: str):
    """
    Recursively copy `src` into `dest` by shelling out to `cp -r`.

    Args:
        src: Source folder.
        dest: Destination folder.
        logger_name: Name of the logger used for debug output.

    Returns:
        Whatever `run_subprocess` returns for the `cp` invocation.
    """
    logger = get_logger(logger_name=logger_name)
    cmd_cp = f"cp -r {src.as_posix()} {dest.as_posix()}"
    logger.debug(f"{cmd_cp=}")
    return run_subprocess(cmd=cmd_cp, logger_name=logger_name)


def create_tar_archive(
    tarfile_path: Path,
    subfolder_path_tmp_copy: Path,
    logger_name: str,
    remote_to_local: bool,
):
    """
    Build a gzipped tar archive of `subfolder_path_tmp_copy` via the `tar`
    command.

    Args:
        tarfile_path: Path of the `.tar.gz` file to create.
        subfolder_path_tmp_copy: Folder whose contents are archived.
        logger_name: Name of the logger used for debug output.
        remote_to_local: When `True`, exclude sbatch scripts and input
            pickles from the archive.
    """
    logger = get_logger(logger_name)

    # NOTE: the trailing space in the exclude string is deliberate, so that
    # the assembled command keeps its original spacing.
    exclude_options = (
        "--exclude *sbatch --exclude *_in_*.pickle "
        if remote_to_local
        else ""
    )

    cmd_tar = (
        f"tar czf {tarfile_path} "
        f"{exclude_options} "
        f"--directory={subfolder_path_tmp_copy.as_posix()} "
        "."
    )
    logger.debug(f"cmd tar:\n{cmd_tar}")
    run_subprocess(cmd=cmd_tar, logger_name=logger_name)

# COMPRESS_FOLDER_MODALITY = "python"
COMPRESS_FOLDER_MODALITY = "cp-tar-rmtree"

def remove_temp_subfolder(subfolder_path_tmp_copy: Path, logger_name: str):
    """
    Best-effort removal of the temporary folder copy.

    Any failure is logged at debug level and swallowed, since cleanup must
    never mask the outcome of the compression itself.

    Args:
        subfolder_path_tmp_copy: Folder to remove.
        logger_name: Name of the logger used for debug output.
    """
    logger = get_logger(logger_name)
    try:
        logger.debug(f"Now remove {subfolder_path_tmp_copy}")
        shutil.rmtree(subfolder_path_tmp_copy)
    except Exception as exc:
        logger.debug(f"ERROR during shutil.rmtree: {exc}")

def _filter(info: tarfile.TarInfo) -> Optional[tarfile.TarInfo]:
if info.name.endswith(".pickle"):
filename = info.name.split("/")[-1]
parts = filename.split("_")
if len(parts) == 3 and parts[1] == "in":
return None
elif len(parts) == 5 and parts[3] == "in":
return None
elif info.name.endswith("slurm_submit.sbatch"):
return None
return info

def compress_folder(
subfolder_path: Path, remote_to_local: bool = False
) -> str:
"""
Compress e.g. `/path/archive` into `/path/archive.tar.gz`
if __name__ == "__main__":
help_msg = (
"Expected use:\n"
"python -m fractal_server.app.runner.compress_folder "
"path/to/folder"
)
Note that `/path/archive.tar.gz` may already exist. In this case, it will
be overwritten.
if len(sys.argv[1:]) != 1:
raise ValueError(
"Invalid argument(s).\n" f"{help_msg}\n" f"Provided: {sys.argv=}"
)
Args:
subfolder_path: Absolute path to the folder to compress.
remote_to_local: If `True`, exclude some files from the tar.gz archive.
Returns:
Absolute path to the tar.gz archive.
"""

subfolder_path = Path(sys.argv[1])
t_0 = time.perf_counter()
print("[compress_folder.py] START")
print(f"[compress_folder.py] {COMPRESS_FOLDER_MODALITY=}")
print(f"[compress_folder.py] {subfolder_path=}")
logger_name = "compress_folder"
logger = set_logger(logger_name)

job_folder = subfolder_path.parent
logger.debug("START")
logger.debug(f"{subfolder_path=}")
parent_dir = subfolder_path.parent
subfolder_name = subfolder_path.name
tarfile_path = (job_folder / f"{subfolder_name}.tar.gz").as_posix()
print(f"[compress_folder.py] {tarfile_path=}")

if COMPRESS_FOLDER_MODALITY == "python":
raise NotImplementedError()
with tarfile.open(tarfile_path, "w:gz") as tar:
tar.add(
subfolder_path,
arcname=".", # ????
recursive=True,
filter=_filter,
)
elif COMPRESS_FOLDER_MODALITY == "cp-tar-rmtree":
import shutil
import time

subfolder_path_tmp_copy = (
subfolder_path.parent / f"{subfolder_path.name}_copy"
)
tarfile_path = (parent_dir / f"{subfolder_name}.tar.gz").as_posix()
logger.debug(f"{tarfile_path=}")

t0 = time.perf_counter()
# shutil.copytree(subfolder_path, subfolder_path_tmp_copy)
cmd_cp = (
"cp -r "
f"{subfolder_path.as_posix()} "
f"{subfolder_path_tmp_copy.as_posix()}"
)
res = subprocess.run( # nosec
shlex.split(cmd_cp),
check=True,
capture_output=True,
encoding="utf-8",
subfolder_path_tmp_copy = (
subfolder_path.parent / f"{subfolder_path.name}_copy"
)
try:
copy_subfolder(
subfolder_path, subfolder_path_tmp_copy, logger_name=logger_name
)
t1 = time.perf_counter()
print("[compress_folder.py] `cp -r` END - " f"elapsed: {t1-t0:.3f} s")

cmd_tar = (
"tar czf "
f"{tarfile_path} "
"--exclude *sbatch --exclude *_in_*.pickle "
f"--directory={subfolder_path_tmp_copy.as_posix()} "
"."
create_tar_archive(
tarfile_path,
subfolder_path_tmp_copy,
logger_name=logger_name,
remote_to_local=remote_to_local,
)
return tarfile_path

print(f"[compress_folder.py] cmd tar:\n{cmd_tar}")
t0 = time.perf_counter()
res = subprocess.run( # nosec
shlex.split(cmd_tar),
capture_output=True,
encoding="utf-8",
)
t1 = time.perf_counter()
t_1 = time.perf_counter()
print(f"[compress_folder.py] tar END - elapsed: {t1-t0:.3f} s")
except Exception as e:
logger.debug(f"ERROR: {e}")
sys.exit(1)

print(f"[compress_folder] END - elapsed {t_1 - t_0:.3f} seconds")
finally:
remove_temp_subfolder(subfolder_path_tmp_copy, logger_name=logger_name)

if res.returncode != 0:
print("[compress_folder.py] ERROR in tar")
print(f"[compress_folder.py] tar stdout:\n{res.stdout}")
print(f"[compress_folder.py] tar stderr:\n{res.stderr}")

shutil.rmtree(subfolder_path_tmp_copy)
sys.exit(1)
def main(sys_argv: list[str]):
    """
    Parse command-line arguments and invoke `compress_folder`.

    Expected usage (one positional folder path, optionally followed by the
    `--remote-to-local` flag); anything else exits with a help message.

    Args:
        sys_argv: Full argument vector, typically `sys.argv`.
    """
    help_msg = (
        "Expected use:\n"
        "python -m fractal_server.app.runner.compress_folder "
        "path/to/folder [--remote-to-local]\n"
    )
    args = sys_argv[1:]
    if len(args) == 1:
        compress_folder(subfolder_path=Path(args[0]))
    elif len(args) == 2 and args[1] == "--remote-to-local":
        compress_folder(subfolder_path=Path(args[0]), remote_to_local=True)
    else:
        # Covers both zero arguments and any unrecognized flag.
        sys.exit(f"Invalid argument.\n{help_msg}\nProvided: {sys_argv[1:]=}")

t0 = time.perf_counter()
shutil.rmtree(subfolder_path_tmp_copy)
t1 = time.perf_counter()
print(
f"[compress_folder.py] shutil.rmtree END - elapsed: {t1-t0:.3f} s"
)

t_1 = time.perf_counter()
print(f"[compress_folder] END - elapsed {t_1 - t_0:.3f} seconds")
# Script entry point: delegate all argument handling to `main`.
if __name__ == "__main__":
    main(sys.argv)
20 changes: 8 additions & 12 deletions fractal_server/app/runner/executors/slurm/ssh/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import json
import math
import sys
import tarfile
import threading
import time
from concurrent.futures import Future
Expand All @@ -38,9 +37,11 @@
from .._batching import heuristics
from ._executor_wait_thread import FractalSlurmWaitThread
from fractal_server.app.runner.components import _COMPONENT_KEY_
from fractal_server.app.runner.compress_folder import compress_folder
from fractal_server.app.runner.exceptions import JobExecutionError
from fractal_server.app.runner.exceptions import TaskExecutionError
from fractal_server.app.runner.executors.slurm.ssh._slurm_job import SlurmJob
from fractal_server.app.runner.extract_archive import extract_archive
from fractal_server.config import get_settings
from fractal_server.logger import set_logger
from fractal_server.ssh._fabric import FractalSSH
Expand Down Expand Up @@ -822,17 +823,12 @@ def _put_subfolder_sftp(self, jobs: list[SlurmJob]) -> None:

# Create compressed subfolder archive (locally)
local_subfolder = self.workflow_dir_local / subfolder_name
tarfile_name = f"{subfolder_name}.tar.gz"
tarfile_path_local = (
self.workflow_dir_local / tarfile_name
).as_posix()
tarfile_path_local = compress_folder(local_subfolder)
tarfile_name = Path(tarfile_path_local).name
logger.info(f"Subfolder archive created at {tarfile_path_local}")
tarfile_path_remote = (
self.workflow_dir_remote / tarfile_name
).as_posix()
with tarfile.open(tarfile_path_local, "w:gz") as tar:
for this_file in local_subfolder.glob("*"):
tar.add(this_file, arcname=this_file.name)
logger.info(f"Subfolder archive created at {tarfile_path_local}")

# Transfer archive
t_0_put = time.perf_counter()
Expand Down Expand Up @@ -1222,7 +1218,8 @@ def _get_subfolder_sftp(self, jobs: list[SlurmJob]) -> None:
tar_command = (
f"{self.python_remote} "
"-m fractal_server.app.runner.compress_folder "
f"{(self.workflow_dir_remote / subfolder_name).as_posix()}"
f"{(self.workflow_dir_remote / subfolder_name).as_posix()} "
"--remote-to-local"
)
stdout = self.fractal_ssh.run_command(cmd=tar_command)
print(stdout)
Expand All @@ -1240,8 +1237,7 @@ def _get_subfolder_sftp(self, jobs: list[SlurmJob]) -> None:
)

# Extract tarfile locally
with tarfile.open(tarfile_path_local) as tar:
tar.extractall(path=(self.workflow_dir_local / subfolder_name))
extract_archive(Path(tarfile_path_local))

t_1 = time.perf_counter()
logger.info("[_get_subfolder_sftp] End - " f"elapsed: {t_1-t_0:.3f} s")
Expand Down
Loading

0 comments on commit a9009a2

Please sign in to comment.