Skip to content

Commit

Permalink
Merge pull request #1795 from fractal-analytics-platform/1774-tmp-2
Browse files: browse the repository at this point in the history
Propagate settings to job-execution background task
  • Loading branch information
tcompa committed Sep 24, 2024
2 parents 441602d + 05b99ad commit cd79cec
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 75 deletions.
35 changes: 11 additions & 24 deletions fractal_server/app/routes/api/v1/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from ....schemas.v1 import ProjectCreateV1
from ....schemas.v1 import ProjectReadV1
from ....schemas.v1 import ProjectUpdateV1
from ...aux.validate_user_settings import validate_user_settings
from ._aux_functions import _check_project_exists
from ._aux_functions import _get_dataset_check_owner
from ._aux_functions import _get_project_check_owner
Expand Down Expand Up @@ -321,25 +322,11 @@ async def apply_workflow(
),
)

# If backend is SLURM, check that the user has required attributes
backend = settings.FRACTAL_RUNNER_BACKEND
if backend == "slurm":
if not user.slurm_user:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=(
f"FRACTAL_RUNNER_BACKEND={backend}, "
f"but {user.slurm_user=}."
),
)
if not user.cache_dir:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=(
f"FRACTAL_RUNNER_BACKEND={backend}, "
f"but {user.cache_dir=}."
),
)
# Validate user settings
FRACTAL_RUNNER_BACKEND = settings.FRACTAL_RUNNER_BACKEND
user_settings = await validate_user_settings(
user=user, backend=FRACTAL_RUNNER_BACKEND, db=db
)

# Check that datasets have the right number of resources
if not input_dataset.resource_list:
Expand Down Expand Up @@ -386,7 +373,7 @@ async def apply_workflow(
)

if apply_workflow.slurm_account is not None:
if apply_workflow.slurm_account not in user.slurm_accounts:
if apply_workflow.slurm_account not in user_settings.slurm_accounts:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=(
Expand All @@ -395,8 +382,8 @@ async def apply_workflow(
),
)
else:
if len(user.slurm_accounts) > 0:
apply_workflow.slurm_account = user.slurm_accounts[0]
if len(user_settings.slurm_accounts) > 0:
apply_workflow.slurm_account = user_settings.slurm_accounts[0]

# Add new ApplyWorkflow object to DB
job = ApplyWorkflow(
Expand Down Expand Up @@ -480,8 +467,8 @@ async def apply_workflow(
output_dataset_id=output_dataset.id,
job_id=job.id,
worker_init=apply_workflow.worker_init,
slurm_user=user.slurm_user,
user_cache_dir=user.cache_dir,
slurm_user=user_settings.slurm_user,
user_cache_dir=user_settings.cache_dir,
)
request.app.state.jobsV1.append(job.id)
logger.info(
Expand Down
32 changes: 9 additions & 23 deletions fractal_server/app/routes/api/v2/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,26 +110,12 @@ async def apply_workflow(
),
)

# Validate user settings (this will eventually replace the block below,
# which checks the required user attributes)
# Validate user settings
FRACTAL_RUNNER_BACKEND = settings.FRACTAL_RUNNER_BACKEND
await validate_user_settings(
user=user, backend=settings.FRACTAL_RUNNER_BACKEND, db=db
user_settings = await validate_user_settings(
user=user, backend=FRACTAL_RUNNER_BACKEND, db=db
)

# If backend is SLURM, check that the user has required attributes
if FRACTAL_RUNNER_BACKEND == "slurm":
if not user.slurm_user:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"{FRACTAL_RUNNER_BACKEND=}, but {user.slurm_user=}.",
)
if not user.cache_dir:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"{FRACTAL_RUNNER_BACKEND=}, but {user.cache_dir=}.",
)

# Check that no other job with the same dataset_id is SUBMITTED
stm = (
select(JobV2)
Expand All @@ -147,7 +133,7 @@ async def apply_workflow(
)

if job_create.slurm_account is not None:
if job_create.slurm_account not in user.slurm_accounts:
if job_create.slurm_account not in user_settings.slurm_accounts:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=(
Expand All @@ -156,8 +142,8 @@ async def apply_workflow(
),
)
else:
if len(user.slurm_accounts) > 0:
job_create.slurm_account = user.slurm_accounts[0]
if len(user_settings.slurm_accounts) > 0:
job_create.slurm_account = user_settings.slurm_accounts[0]

# Add new Job object to DB
job = JobV2(
Expand Down Expand Up @@ -231,7 +217,7 @@ async def apply_workflow(
WORKFLOW_DIR_REMOTE = WORKFLOW_DIR_LOCAL
elif FRACTAL_RUNNER_BACKEND == "slurm":
WORKFLOW_DIR_REMOTE = (
Path(user.cache_dir) / f"{WORKFLOW_DIR_LOCAL.name}"
Path(user_settings.cache_dir) / f"{WORKFLOW_DIR_LOCAL.name}"
)
elif FRACTAL_RUNNER_BACKEND == "slurm_ssh":
WORKFLOW_DIR_REMOTE = (
Expand All @@ -251,8 +237,8 @@ async def apply_workflow(
dataset_id=dataset.id,
job_id=job.id,
worker_init=job.worker_init,
slurm_user=user.slurm_user,
user_cache_dir=user.cache_dir,
slurm_user=user_settings.slurm_user,
user_cache_dir=user_settings.cache_dir,
fractal_ssh=request.app.state.fractal_ssh,
)
request.app.state.jobsV2.append(job.id)
Expand Down
16 changes: 10 additions & 6 deletions fractal_server/app/routes/aux/validate_user_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@
from fractal_server.app.db import AsyncSession
from fractal_server.app.models import UserOAuth
from fractal_server.app.models import UserSettings
from fractal_server.app.routes.api.v2._aux_functions import logger
from fractal_server.logger import set_logger
from fractal_server.user_settings import SlurmSshUserSettings
from fractal_server.user_settings import SlurmSudoUserSettings

logger = set_logger(__name__)


async def validate_user_settings(
*, user: UserOAuth, backend: str, db: AsyncSession
):
) -> UserSettings:
"""
FIXME docstring
"""
Expand All @@ -28,14 +30,14 @@ async def validate_user_settings(
user_settings = await db.get(UserSettings, user.user_settings_id)

if backend == "slurm_ssh":
UserSettingsModel = SlurmSshUserSettings
UserSettingsValidationModel = SlurmSshUserSettings
elif backend == "slurm":
UserSettingsModel = SlurmSudoUserSettings
UserSettingsValidationModel = SlurmSudoUserSettings
else:
UserSettingsModel = BaseModel
UserSettingsValidationModel = BaseModel

try:
UserSettingsModel(**user_settings.model_dump())
UserSettingsValidationModel(**user_settings.model_dump())
except ValidationError as e:
error_msg = (
"User settings are not valid for "
Expand All @@ -47,3 +49,5 @@ async def validate_user_settings(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=error_msg,
)

return user_settings
10 changes: 6 additions & 4 deletions tests/v1/04_api/test_project_apply_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ async def test_project_apply_missing_user_attributes(
)
debug(res.json())
assert res.status_code == 422
assert "user.cache_dir=None" in res.json()["detail"]
assert "User settings are not valid" in res.json()["detail"]

user.cache_dir = "/tmp"
user.slurm_user = None
Expand All @@ -316,7 +316,7 @@ async def test_project_apply_missing_user_attributes(
)
debug(res.json())
assert res.status_code == 422
assert "user.slurm_user=None" in res.json()["detail"]
assert "User settings are not valid" in res.json()["detail"]


async def test_project_apply_missing_resources(
Expand Down Expand Up @@ -576,7 +576,8 @@ async def test_project_apply_slurm_account(

SLURM_LIST = ["foo", "bar", "rab", "oof"]
async with MockCurrentUser(
user_kwargs={"slurm_accounts": SLURM_LIST, "is_verified": True}
user_kwargs={"is_verified": True},
user_settings_dict={"slurm_accounts": SLURM_LIST},
) as user2:
project = await project_factory(user2)
dataset = await dataset_factory(
Expand All @@ -595,7 +596,7 @@ async def test_project_apply_slurm_account(
)

# User has a non empty SLURM accounts list
assert user2.slurm_accounts == SLURM_LIST
assert user2.settings.slurm_accounts == SLURM_LIST

# If no slurm_account is provided, we use the first one of the list

Expand All @@ -615,6 +616,7 @@ async def test_project_apply_slurm_account(
f"&output_dataset_id={dataset.id}",
json=dict(slurm_account=account),
)
debug(res.json())
assert res.status_code == 202
assert res.json()["slurm_account"] == account

Expand Down
47 changes: 31 additions & 16 deletions tests/v1/07_full_workflow/test_full_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from fractal_server.app.runner.filenames import WORKFLOW_LOG_FILENAME
from fractal_server.app.runner.v1 import _backends

from tests.fixtures_slurm import SLURM_USER

PREFIX = "/api/v1"

Expand Down Expand Up @@ -57,16 +57,19 @@ async def test_full_workflow(

debug(f"Testing with {backend=}")
user_kwargs = {"is_verified": True}
user_settings_dict = {}
if backend == "slurm":
request.getfixturevalue("monkey_slurm")
request.getfixturevalue("relink_python_interpreter_v1")
user_cache_dir = str(tmp777_path / f"user_cache_dir-{backend}")
from tests.fixtures_slurm import SLURM_USER

user_kwargs["cache_dir"] = user_cache_dir
user_kwargs["slurm_user"] = SLURM_USER
user_settings_dict["cache_dir"] = user_cache_dir
user_settings_dict["slurm_user"] = SLURM_USER

async with MockCurrentUser(user_kwargs=user_kwargs) as user:
async with MockCurrentUser(
user_kwargs=user_kwargs,
user_settings_dict=user_settings_dict,
) as user:
debug(user)

project = await project_factory(user)
Expand Down Expand Up @@ -266,13 +269,17 @@ async def test_failing_workflow_UnknownError(

debug(f"Testing with {backend=}")
user_kwargs = {"is_verified": True}
user_settings_dict = {}
if backend == "slurm":
request.getfixturevalue("monkey_slurm")
request.getfixturevalue("relink_python_interpreter_v1")
user_cache_dir = str(tmp777_path / f"user_cache_dir-{backend}")
user_kwargs["cache_dir"] = user_cache_dir
user_settings_dict["cache_dir"] = user_cache_dir
user_settings_dict["slurm_user"] = SLURM_USER

async with MockCurrentUser(user_kwargs=user_kwargs) as user:
async with MockCurrentUser(
user_kwargs=user_kwargs, user_settings_dict=user_settings_dict
) as user:
# Create project, dataset, resource
project = await project_factory(user)
project_id = project.id
Expand Down Expand Up @@ -361,13 +368,17 @@ async def test_failing_workflow_TaskExecutionError(

debug(f"Testing with {backend=}")
user_kwargs = {"is_verified": True}
user_settings_dict = {}
if backend == "slurm":
request.getfixturevalue("monkey_slurm")
request.getfixturevalue("relink_python_interpreter_v1")
user_cache_dir = str(tmp777_path / f"user_cache_dir-{backend}")
user_kwargs["cache_dir"] = user_cache_dir
user_settings_dict["cache_dir"] = user_cache_dir
user_settings_dict["slurm_user"] = SLURM_USER

async with MockCurrentUser(user_kwargs=user_kwargs) as user:
async with MockCurrentUser(
user_kwargs=user_kwargs, user_settings_dict=user_settings_dict
) as user:
# Create project, dataset, resource
project = await project_factory(user)
project_id = project.id
Expand Down Expand Up @@ -513,7 +524,6 @@ async def test_failing_workflow_JobExecutionError_slurm(
resource_factory,
tmp_path,
):
from tests.fixtures_slurm import SLURM_USER

override_settings_factory(
FRACTAL_RUNNER_BACKEND="slurm",
Expand All @@ -522,10 +532,11 @@ async def test_failing_workflow_JobExecutionError_slurm(
)

user_cache_dir = str(tmp777_path / "user_cache_dir")
user_kwargs = dict(
cache_dir=user_cache_dir, is_verified=True, slurm_user=SLURM_USER
)
async with MockCurrentUser(user_kwargs=user_kwargs) as user:
user_kwargs = dict(is_verified=True)
user_settings_dict = dict(cache_dir=user_cache_dir, slurm_user=SLURM_USER)
async with MockCurrentUser(
user_kwargs=user_kwargs, user_settings_dict=user_settings_dict
) as user:
project = await project_factory(user)
project_id = project.id
input_dataset = await dataset_factory(
Expand Down Expand Up @@ -904,13 +915,17 @@ async def test_non_executable_task_command(

debug(f"Testing with {backend=}")
user_kwargs = {"is_verified": True}
user_settings_dict = {}
if backend == "slurm":
request.getfixturevalue("monkey_slurm")
request.getfixturevalue("relink_python_interpreter_v1")
user_cache_dir = str(tmp777_path / f"user_cache_dir-{backend}")
user_kwargs["cache_dir"] = user_cache_dir
user_settings_dict["cache_dir"] = user_cache_dir
user_settings_dict["slurm_user"] = SLURM_USER

async with MockCurrentUser(user_kwargs=user_kwargs) as user:
async with MockCurrentUser(
user_kwargs=user_kwargs, user_settings_dict=user_settings_dict
) as user:
# Create task
task = await task_factory(
name="invalid-task-command",
Expand Down
5 changes: 3 additions & 2 deletions tests/v2/03_api/test_api_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,8 @@ async def test_project_apply_slurm_account(

SLURM_LIST = ["foo", "bar", "rab", "oof"]
async with MockCurrentUser(
user_kwargs={"slurm_accounts": SLURM_LIST, "is_verified": True}
user_kwargs={"is_verified": True},
user_settings_dict={"slurm_accounts": SLURM_LIST},
) as user2:
project = await project_factory_v2(user2)
dataset = await dataset_factory_v2(
Expand All @@ -423,7 +424,7 @@ async def test_project_apply_slurm_account(
)

# User has a non empty SLURM accounts list
assert user2.slurm_accounts == SLURM_LIST
assert user2.settings.slurm_accounts == SLURM_LIST

# If no slurm_account is provided, we use the first one of the list

Expand Down

0 comments on commit cd79cec

Please sign in to comment.