Skip to content

Commit

Permalink
Merge pull request #1797 from fractal-analytics-platform/propagate-ss…
Browse files Browse the repository at this point in the history
…h-settings

Propagate user's settings into SSH background tasks
  • Loading branch information
ychiucco committed Sep 24, 2024
2 parents 5c25412 + 9c3b95e commit 5a51d26
Show file tree
Hide file tree
Showing 12 changed files with 52 additions and 70 deletions.
2 changes: 1 addition & 1 deletion docs/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ an incremental migration script, as in
```
$ export SQLITE_PATH=some-test.db
$ rm some-test.db
$ poetry run fractalctl set-db
$ poetry run fractalctl set-db --skip-init-data
$ poetry run alembic revision --autogenerate -m "Some migration message"
# UserWarning: SQLite is partially supported but discouraged in production environment.SQLite offers partial support for ForeignKey constraints. As such, consistency of the database cannot be guaranteed.
Expand Down
13 changes: 8 additions & 5 deletions fractal_server/app/routes/api/v2/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,7 @@ async def apply_workflow(
)
elif FRACTAL_RUNNER_BACKEND == "slurm_ssh":
WORKFLOW_DIR_REMOTE = (
Path(settings.FRACTAL_SLURM_SSH_WORKING_BASE_DIR)
/ f"{WORKFLOW_DIR_LOCAL.name}"
Path(user_settings.ssh_jobs_dir) / f"{WORKFLOW_DIR_LOCAL.name}"
)

# Update job folders in the db
Expand All @@ -234,20 +233,24 @@ async def apply_workflow(
# Use appropriate FractalSSH object
if settings.FRACTAL_RUNNER_BACKEND == "slurm_ssh":
ssh_credentials = dict(
user=settings.FRACTAL_SLURM_SSH_USER,
host=settings.FRACTAL_SLURM_SSH_HOST,
key_path=settings.FRACTAL_SLURM_SSH_PRIVATE_KEY_PATH,
user=user_settings.ssh_username,
host=user_settings.ssh_host,
key_path=user_settings.ssh_private_key_path,
)
fractal_ssh_list = request.app.state.fractal_ssh_list
fractal_ssh = fractal_ssh_list.get(**ssh_credentials)
else:
fractal_ssh = None

# Expunge user settings from db, to use in background task
db.expunge(user_settings)

background_tasks.add_task(
submit_workflow,
workflow_id=workflow.id,
dataset_id=dataset.id,
job_id=job.id,
user_settings=user_settings,
worker_init=job.worker_init,
slurm_user=user_settings.slurm_user,
user_cache_dir=user_settings.cache_dir,
Expand Down
9 changes: 5 additions & 4 deletions fractal_server/app/routes/api/v2/task_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ async def collect_tasks_pip(
)

# Validate user settings (backend-specific)
await validate_user_settings(
user_settings = await validate_user_settings(
user=user, backend=settings.FRACTAL_RUNNER_BACKEND, db=db
)

Expand All @@ -131,9 +131,9 @@ async def collect_tasks_pip(

# Use appropriate FractalSSH object
ssh_credentials = dict(
user=settings.FRACTAL_SLURM_SSH_USER,
host=settings.FRACTAL_SLURM_SSH_HOST,
key_path=settings.FRACTAL_SLURM_SSH_PRIVATE_KEY_PATH,
user=user_settings.ssh_username,
host=user_settings.ssh_host,
key_path=user_settings.ssh_private_key_path,
)
fractal_ssh_list = request.app.state.fractal_ssh_list
fractal_ssh = fractal_ssh_list.get(**ssh_credentials)
Expand All @@ -143,6 +143,7 @@ async def collect_tasks_pip(
state.id,
task_pkg,
fractal_ssh,
user_settings.ssh_tasks_dir,
)

response.status_code = status.HTTP_201_CREATED
Expand Down
12 changes: 5 additions & 7 deletions fractal_server/app/runner/v2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from .handle_failed_job import assemble_history_failed_job
from .handle_failed_job import assemble_images_failed_job
from fractal_server import __VERSION__
from fractal_server.app.models import UserSettings

_backends = {}
_backends["local"] = local_process_workflow
Expand Down Expand Up @@ -76,6 +77,7 @@ async def submit_workflow(
workflow_id: int,
dataset_id: int,
job_id: int,
user_settings: UserSettings,
worker_init: Optional[str] = None,
slurm_user: Optional[str] = None,
user_cache_dir: Optional[str] = None,
Expand Down Expand Up @@ -196,8 +198,7 @@ async def submit_workflow(
elif FRACTAL_RUNNER_BACKEND == "slurm_ssh":
# Folder creation is deferred to _process_workflow
WORKFLOW_DIR_REMOTE = (
Path(settings.FRACTAL_SLURM_SSH_WORKING_BASE_DIR)
/ WORKFLOW_DIR_LOCAL.name
Path(user_settings.ssh_jobs_dir) / WORKFLOW_DIR_LOCAL.name
)
else:
logger.error(
Expand Down Expand Up @@ -270,11 +271,8 @@ async def submit_workflow(
logger.debug(f"slurm_account: {job.slurm_account}")
logger.debug(f"worker_init: {worker_init}")
elif FRACTAL_RUNNER_BACKEND == "slurm_ssh":
logger.debug(f"ssh_host: {settings.FRACTAL_SLURM_SSH_HOST}")
logger.debug(f"ssh_user: {settings.FRACTAL_SLURM_SSH_USER}")
logger.debug(
f"base dir: {settings.FRACTAL_SLURM_SSH_WORKING_BASE_DIR}"
)
logger.debug(f"ssh_user: {user_settings.ssh_username}")
logger.debug(f"base dir: {user_settings.ssh_tasks_dir}")
logger.debug(f"worker_init: {worker_init}")
logger.debug(f"job.id: {job.id}")
logger.debug(f"job.working_dir: {job.working_dir}")
Expand Down
16 changes: 0 additions & 16 deletions fractal_server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -631,22 +631,6 @@ def check_runner(self) -> None:
raise FractalConfigurationError(
f"Must set FRACTAL_SLURM_WORKER_PYTHON when {info}"
)
if self.FRACTAL_SLURM_SSH_USER is None:
raise FractalConfigurationError(
f"Must set FRACTAL_SLURM_SSH_USER when {info}"
)
if self.FRACTAL_SLURM_SSH_HOST is None:
raise FractalConfigurationError(
f"Must set FRACTAL_SLURM_SSH_HOST when {info}"
)
if self.FRACTAL_SLURM_SSH_PRIVATE_KEY_PATH is None:
raise FractalConfigurationError(
f"Must set FRACTAL_SLURM_SSH_PRIVATE_KEY_PATH when {info}"
)
if self.FRACTAL_SLURM_SSH_WORKING_BASE_DIR is None:
raise FractalConfigurationError(
f"Must set FRACTAL_SLURM_SSH_WORKING_BASE_DIR when {info}"
)

from fractal_server.app.runner.executors.slurm._slurm_config import ( # noqa: E501
load_slurm_config_file,
Expand Down
19 changes: 14 additions & 5 deletions fractal_server/tasks/v2/background_operations_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def _customize_and_run_template(
tmpdir: str,
logger_name: str,
fractal_ssh: FractalSSH,
tasks_base_dir: str,
) -> str:
"""
Customize one of the template bash scripts, transfer it to the remote host
Expand All @@ -72,7 +73,6 @@ def _customize_and_run_template(
"""
logger = get_logger(logger_name)
logger.debug(f"_customize_and_run_template {script_filename} - START")
settings = Inject(get_settings)

# Read template
template_path = templates_folder / script_filename
Expand All @@ -88,7 +88,7 @@ def _customize_and_run_template(

# Transfer script to remote host
script_path_remote = os.path.join(
settings.FRACTAL_SLURM_SSH_WORKING_BASE_DIR,
tasks_base_dir,
f"script_{abs(hash(tmpdir))}{script_filename}",
)
logger.debug(f"Now transfer {script_path_local=} over SSH.")
Expand All @@ -111,6 +111,7 @@ def background_collect_pip_ssh(
state_id: int,
task_pkg: _TaskCollectPip,
fractal_ssh: FractalSSH,
tasks_base_dir: str,
) -> None:
"""
Collect a task package over SSH
Expand All @@ -121,6 +122,13 @@ def background_collect_pip_ssh(
NOTE: by making this function sync, it will run within a thread - due to
starlette/fastapi handling of background tasks (see
https://github.com/encode/starlette/blob/master/starlette/background.py).
Arguments:
state_id:
task_pkg:
fractal_ssh:
tasks_base_dir:
"""

# Work within a temporary folder, where also logs will be placed
Expand All @@ -140,7 +148,6 @@ def background_collect_pip_ssh(
with next(get_sync_db()) as db:
try:
# Prepare replacements for task-collection scripts
settings = Inject(get_settings)
python_bin = get_python_interpreter_v2(
python_version=task_pkg.python_version
)
Expand All @@ -163,11 +170,12 @@ def background_collect_pip_ssh(
f"{install_string}=={task_pkg.package_version}"
)
package_env_dir = (
Path(settings.FRACTAL_SLURM_SSH_WORKING_BASE_DIR)
Path(tasks_base_dir)
/ ".fractal"
/ f"{task_pkg.package_name}{package_version}"
).as_posix()
logger.debug(f"{package_env_dir=}")
settings = Inject(get_settings)
replacements = [
("__PACKAGE_NAME__", task_pkg.package_name),
("__PACKAGE_ENV_DIR__", package_env_dir),
Expand All @@ -186,6 +194,7 @@ def background_collect_pip_ssh(
tmpdir=tmpdir,
logger_name=LOGGER_NAME,
fractal_ssh=fractal_ssh,
tasks_base_dir=tasks_base_dir,
)

fractal_ssh.check_connection()
Expand Down Expand Up @@ -332,7 +341,7 @@ def background_collect_pip_ssh(
)
fractal_ssh.remove_folder(
folder=package_env_dir,
safe_root=settings.FRACTAL_SLURM_SSH_WORKING_BASE_DIR, # noqa: E501
safe_root=tasks_base_dir,
)
logger.info(
f"Deleted remoted folder {package_env_dir}"
Expand Down
7 changes: 0 additions & 7 deletions tests/no_version/test_unit_lifespan.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from fractal_server.main import lifespan
from fractal_server.ssh._fabric import FractalSSHList
from fractal_server.syringe import Inject
from tests.fixtures_slurm import SLURM_USER


async def test_app_with_lifespan(
Expand Down Expand Up @@ -196,12 +195,6 @@ async def test_lifespan_slurm_ssh(
override_settings_factory(
FRACTAL_RUNNER_BACKEND="slurm_ssh",
FRACTAL_SLURM_WORKER_PYTHON="/not/relevant",
FRACTAL_SLURM_SSH_HOST=slurmlogin_ip,
FRACTAL_SLURM_SSH_USER=SLURM_USER,
FRACTAL_SLURM_SSH_PRIVATE_KEY_PATH=ssh_keys["private"],
FRACTAL_SLURM_SSH_WORKING_BASE_DIR=(
tmp777_path / "artifacts"
).as_posix(),
FRACTAL_SLURM_CONFIG_FILE=testdata_path / "slurm_config.json",
)
app = FastAPI()
Expand Down
5 changes: 3 additions & 2 deletions tests/v2/00_ssh/test_task_collection_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ async def test_task_collection_ssh(
setting_overrides = {
"FRACTAL_SLURM_WORKER_PYTHON": f"/usr/bin/python{current_py_version}",
PY_KEY: f"/usr/bin/python{current_py_version}",
"FRACTAL_SLURM_SSH_WORKING_BASE_DIR": remote_basedir,
"FRACTAL_MAX_PIP_VERSION": CURRENT_FRACTAL_MAX_PIP_VERSION,
}
override_settings_factory(**setting_overrides)
Expand All @@ -52,6 +51,7 @@ async def test_task_collection_ssh(
state_id=state.id,
task_pkg=task_pkg,
fractal_ssh=fractal_ssh,
tasks_base_dir=remote_basedir,
)
await db.refresh(state)
debug(state)
Expand Down Expand Up @@ -80,6 +80,7 @@ async def test_task_collection_ssh(
state_id=state.id,
task_pkg=task_pkg,
fractal_ssh=fractal_ssh,
tasks_base_dir=remote_basedir,
)

# Check that the second collection failed, since folder already exists
Expand Down Expand Up @@ -112,7 +113,6 @@ async def test_task_collection_ssh_failure(
setting_overrides = {
"FRACTAL_SLURM_WORKER_PYTHON": f"/usr/bin/python{current_py_version}",
PY_KEY: f"/usr/bin/python{current_py_version}",
"FRACTAL_SLURM_SSH_WORKING_BASE_DIR": remote_basedir,
}
override_settings_factory(**setting_overrides)

Expand All @@ -131,6 +131,7 @@ async def test_task_collection_ssh_failure(
state_id=state.id,
task_pkg=task_pkg,
fractal_ssh=fractal_ssh,
tasks_base_dir=remote_basedir,
)

await db.refresh(state)
Expand Down
16 changes: 6 additions & 10 deletions tests/v2/03_api/test_api_task_collection_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ async def test_task_collection_ssh_from_pypi(
fractal_ssh = fractal_ssh_list.get(**credentials)

# Define and create remote working directory
WORKING_BASE_DIR = (tmp777_path / "working_dir").as_posix()
fractal_ssh.mkdir(folder=WORKING_BASE_DIR)
TASKS_BASE_DIR = (tmp777_path / "tasks").as_posix()
fractal_ssh.mkdir(folder=TASKS_BASE_DIR)

# Assign FractalSSH object to app state
app.state.fractal_ssh_list = fractal_ssh_list
Expand All @@ -46,19 +46,15 @@ async def test_task_collection_ssh_from_pypi(
"FRACTAL_TASKS_PYTHON_DEFAULT_VERSION": current_py_version,
PY_KEY: f"/usr/bin/python{current_py_version}",
"FRACTAL_RUNNER_BACKEND": "slurm_ssh",
"FRACTAL_SLURM_SSH_HOST": slurmlogin_ip,
"FRACTAL_SLURM_SSH_USER": SLURM_USER,
"FRACTAL_SLURM_SSH_PRIVATE_KEY_PATH": ssh_keys["private"],
"FRACTAL_SLURM_SSH_WORKING_BASE_DIR": WORKING_BASE_DIR,
}
override_settings_factory(**settings_overrides)

user_settings_dict = dict(
ssh_host=slurmlogin_ip,
ssh_username=SLURM_USER,
ssh_private_key_path=ssh_keys["private"],
ssh_tasks_dir=(tmp777_path / "tasks").as_posix(),
ssh_jobs_dir=(tmp777_path / "artifacts").as_posix(),
ssh_tasks_dir=TASKS_BASE_DIR,
ssh_jobs_dir=(tmp777_path / "jobs").as_posix(),
)

async with MockCurrentUser(
Expand Down Expand Up @@ -89,7 +85,7 @@ async def test_task_collection_ssh_from_pypi(
assert data["status"] == CollectionStatusV2.OK
assert f"fractal-tasks-core=={PACKAGE_VERSION}" in data["freeze"]
remote_folder = (
Path(WORKING_BASE_DIR)
Path(TASKS_BASE_DIR)
/ ".fractal"
/ f"fractal-tasks-core{PACKAGE_VERSION}"
).as_posix()
Expand Down Expand Up @@ -119,7 +115,7 @@ async def test_task_collection_ssh_from_pypi(
assert "No matching distribution found" in data["log"]
assert f"fractal-tasks-core=={PACKAGE_VERSION}" in data["log"]
remote_folder = (
Path(WORKING_BASE_DIR)
Path(TASKS_BASE_DIR)
/ ".fractal"
/ f"fractal-tasks-core{PACKAGE_VERSION}"
).as_posix()
Expand Down
7 changes: 6 additions & 1 deletion tests/v2/04_runner/test_background_task.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from devtools import debug

from fractal_server.app.models import UserSettings
from fractal_server.app.models.v2 import JobV2
from fractal_server.app.routes.api.v2._aux_functions import (
_workflow_insert_task,
Expand Down Expand Up @@ -45,7 +46,10 @@ async def test_submit_workflow_failure(
db.expunge_all()

await submit_workflow(
workflow_id=workflow.id, dataset_id=dataset.id, job_id=job.id
workflow_id=workflow.id,
dataset_id=dataset.id,
job_id=job.id,
user_settings=UserSettings(),
)

job = await db.get(JobV2, job.id)
Expand Down Expand Up @@ -88,6 +92,7 @@ async def test_mkdir_error(
dataset_id=dataset.id,
job_id=job.id,
user_cache_dir=(tmp_path / "xxx").as_posix(),
user_settings=UserSettings(),
)

await db.close()
Expand Down
4 changes: 4 additions & 0 deletions tests/v2/04_runner/test_unit_submit_workflow.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from fractal_server.app.models import UserSettings
from fractal_server.app.routes.api.v2._aux_functions import (
_workflow_insert_task,
)
Expand Down Expand Up @@ -29,6 +30,7 @@ async def test_fail_submit_workflows_wrong_IDs(
workflow_id=workflow.id,
dataset_id=dataset.id,
job_id=9999999,
user_settings=UserSettings(),
)

job = await job_factory_v2(
Expand All @@ -42,6 +44,7 @@ async def test_fail_submit_workflows_wrong_IDs(
workflow_id=9999999,
dataset_id=9999999,
job_id=job.id,
user_settings=UserSettings(),
)
await db.refresh(job)
assert job.status == JobStatusTypeV2.FAILED
Expand Down Expand Up @@ -80,6 +83,7 @@ async def test_fail_submit_workflows_wrong_backend(
workflow_id=workflow.id,
dataset_id=dataset.id,
job_id=job.id,
user_settings=UserSettings(),
)
await db.refresh(job)
assert "Invalid FRACTAL_RUNNER_BACKEND" in job.log
Loading

0 comments on commit 5a51d26

Please sign in to comment.