Merge branch 'main' into 1622-use-new-modularized-task-collection-to-simplify-task-collection-in-tests
ychiucco committed Jul 15, 2024
2 parents d3d0620 + 2bfcb56 commit 6d96a21
Showing 34 changed files with 986 additions and 870 deletions.
24 changes: 22 additions & 2 deletions CHANGELOG.md
@@ -1,10 +1,30 @@
**Note**: Numbers like (\#1234) point to closed Pull Requests on the fractal-server repository.

# 2.3.1 (unreleased)
# 2.3.2

> **WARNING**: The remove-remote-venv-folder in the SSH task collection is broken (see issue 1633). Do not deploy this version in an SSH-based `fractal-server` instance.
* API:
* Fix incorrect zipping of structured job-log folders (\#1648).

# 2.3.1

This release includes a bugfix for task names with special characters.

> **WARNING**: The remove-remote-venv-folder in the SSH task collection is broken (see issue 1633). Do not deploy this version in an SSH-based `fractal-server` instance.
* Runner:
* Improve sanitization of subfolder names (commits from 3d89d6ba104d1c6f11812bc9de5cbdff25f81aa2 to 426fa3522cf2eef90d8bd2da3b2b8a5b646b9bf4).
* API:
* Improve error message when task-collection Python is not defined (\#1640).
* Use a single endpoint for standard and SSH task collection (\#1640).
* SSH features:
* Remove remote venv folder upon failed task collection in SSH mode (\#1634).
* Remove remote venv folder upon failed task collection in SSH mode (\#1634, \#1640).
* Refactor `FractalSSH` (\#1635).
* Set `fabric.Connection.forward_agent=False` (\#1639).
* Testing:
* Improved testing of SSH task-collection API (\#1640).
* Improved testing of `FractalSSH` methods (\#1635).
* Stop testing SQLite database for V1 in CI (\#1630).

# 2.3.0
2 changes: 1 addition & 1 deletion fractal_server/__init__.py
@@ -1 +1 @@
__VERSION__ = "2.3.0"
__VERSION__ = "2.3.2"
4 changes: 1 addition & 3 deletions fractal_server/app/routes/admin/v1.py
@@ -387,9 +387,7 @@ async def download_job_logs(
# Create and return byte stream for zipped log folder
PREFIX_ZIP = Path(job.working_dir).name
zip_filename = f"{PREFIX_ZIP}_archive.zip"
byte_stream = _zip_folder_to_byte_stream(
folder=job.working_dir, zip_filename=zip_filename
)
byte_stream = _zip_folder_to_byte_stream(folder=job.working_dir)
return StreamingResponse(
iter([byte_stream.getvalue()]),
media_type="application/x-zip-compressed",
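The `_zip_folder_to_byte_stream` helper itself is not part of this diff; only its call sites change, dropping the `zip_filename` argument (the local `zip_filename` presumably still names the downloaded archive in the response headers, which are collapsed here). A minimal sketch of a helper with the new signature, assuming it zips the job folder recursively into an in-memory stream — names and details below are illustrative, not the actual fractal-server implementation:

```python
from io import BytesIO
from pathlib import Path
from zipfile import ZIP_DEFLATED, ZipFile


def _zip_folder_to_byte_stream(*, folder: str) -> BytesIO:
    """Zip `folder`, keeping its nested subfolder structure, into memory."""
    byte_stream = BytesIO()
    with ZipFile(byte_stream, mode="w", compression=ZIP_DEFLATED) as zf:
        for path in sorted(Path(folder).rglob("*")):
            if path.is_file():
                # Store paths relative to `folder`, so structured job-log
                # subfolders survive inside the archive.
                zf.write(path, arcname=path.relative_to(folder))
    return byte_stream
```

The `StreamingResponse` in each call site then wraps `byte_stream.getvalue()`, exactly as in the hunk above.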
4 changes: 1 addition & 3 deletions fractal_server/app/routes/admin/v2.py
@@ -274,9 +274,7 @@ async def download_job_logs(
# Create and return byte stream for zipped log folder
PREFIX_ZIP = Path(job.working_dir).name
zip_filename = f"{PREFIX_ZIP}_archive.zip"
byte_stream = _zip_folder_to_byte_stream(
folder=job.working_dir, zip_filename=zip_filename
)
byte_stream = _zip_folder_to_byte_stream(folder=job.working_dir)
return StreamingResponse(
iter([byte_stream.getvalue()]),
media_type="application/x-zip-compressed",
4 changes: 1 addition & 3 deletions fractal_server/app/routes/api/v1/job.py
@@ -128,9 +128,7 @@ async def download_job_logs(
# Create and return byte stream for zipped log folder
PREFIX_ZIP = Path(job.working_dir).name
zip_filename = f"{PREFIX_ZIP}_archive.zip"
byte_stream = _zip_folder_to_byte_stream(
folder=job.working_dir, zip_filename=zip_filename
)
byte_stream = _zip_folder_to_byte_stream(folder=job.working_dir)
return StreamingResponse(
iter([byte_stream.getvalue()]),
media_type="application/x-zip-compressed",
4 changes: 2 additions & 2 deletions fractal_server/app/routes/api/v1/task_collection.py
@@ -25,8 +25,8 @@
from ....security import current_active_user
from ....security import current_active_verified_user
from ....security import User
from fractal_server.string_tools import slugify_task_name_for_source
from fractal_server.tasks.utils import get_collection_log
from fractal_server.tasks.utils import slugify_task_name
from fractal_server.tasks.v1._TaskCollectPip import _TaskCollectPip
from fractal_server.tasks.v1.background_operations import (
background_collect_pip,
@@ -159,7 +159,7 @@ async def collect_tasks_pip(

# Check that tasks are not already in the DB
for new_task in task_pkg.package_manifest.task_list:
new_task_name_slug = slugify_task_name(new_task.name)
new_task_name_slug = slugify_task_name_for_source(new_task.name)
new_task_source = f"{task_pkg.package_source}:{new_task_name_slug}"
stm = select(Task).where(Task.source == new_task_source)
res = await db.execute(stm)
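Here `slugify_task_name` is replaced by `slugify_task_name_for_source`, imported from `fractal_server.string_tools` (that module is not shown in this view). Given the 2.3.1 note about task names with special characters, a plausible sketch of what such a helper does — purely an assumption about its behavior:

```python
import re


def slugify_task_name_for_source(task_name: str) -> str:
    """Turn an arbitrary task name into a string safe to embed in `source`."""
    slug = task_name.lower()
    slug = re.sub(r"[^a-z0-9]+", "-", slug)  # collapse special characters
    return slug.strip("-")


# e.g. "Cellpose Segmentation (v2)!" -> "cellpose-segmentation-v2",
# which is then appended as f"{task_pkg.package_source}:{slug}".
```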
24 changes: 8 additions & 16 deletions fractal_server/app/routes/api/v2/__init__.py
@@ -12,7 +12,6 @@
from .task import router as task_router_v2
from .task_collection import router as task_collection_router_v2
from .task_collection_custom import router as task_collection_router_v2_custom
from .task_collection_ssh import router as task_collection_router_v2_ssh
from .task_legacy import router as task_legacy_router_v2
from .workflow import router as workflow_router_v2
from .workflowtask import router as workflowtask_router_v2
@@ -30,21 +29,14 @@


settings = Inject(get_settings)
if settings.FRACTAL_RUNNER_BACKEND == "slurm_ssh":
router_api_v2.include_router(
task_collection_router_v2_ssh,
prefix="/task",
tags=["V2 Task Collection"],
)
else:
router_api_v2.include_router(
task_collection_router_v2, prefix="/task", tags=["V2 Task Collection"]
)
router_api_v2.include_router(
task_collection_router_v2_custom,
prefix="/task",
tags=["V2 Task Collection"],
)
router_api_v2.include_router(
task_collection_router_v2, prefix="/task", tags=["V2 Task Collection"]
)
router_api_v2.include_router(
task_collection_router_v2_custom,
prefix="/task",
tags=["V2 Task Collection"],
)
router_api_v2.include_router(task_router_v2, prefix="/task", tags=["V2 Task"])
router_api_v2.include_router(
task_legacy_router_v2, prefix="/task-legacy", tags=["V2 Task Legacy"]
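With this change the SSH-specific router (`task_collection_router_v2_ssh`) is no longer mounted conditionally at import time: the same task-collection router is registered for every backend, and the SSH/non-SSH split moves inside the endpoint (see the `task_collection.py` diff below). Schematically — this is not the actual fractal-server code, and the route path is illustrative:

```python
from fastapi import APIRouter, FastAPI

router = APIRouter()


@router.post("/collect/pip/")  # illustrative path
async def collect_tasks_pip() -> dict:
    backend = "slurm_ssh"  # in fractal-server this comes from the settings
    if backend == "slurm_ssh":
        return {"dispatched": "SSH background collection"}
    return {"dispatched": "local background collection"}


app = FastAPI()
app.include_router(router, prefix="/task", tags=["V2 Task Collection"])
```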
4 changes: 1 addition & 3 deletions fractal_server/app/routes/api/v2/job.py
@@ -131,9 +131,7 @@ async def download_job_logs(
# Create and return byte stream for zipped log folder
PREFIX_ZIP = Path(job.working_dir).name
zip_filename = f"{PREFIX_ZIP}_archive.zip"
byte_stream = _zip_folder_to_byte_stream(
folder=job.working_dir, zip_filename=zip_filename
)
byte_stream = _zip_folder_to_byte_stream(folder=job.working_dir)
return StreamingResponse(
iter([byte_stream.getvalue()]),
media_type="application/x-zip-compressed",
89 changes: 72 additions & 17 deletions fractal_server/app/routes/api/v2/task_collection.py
@@ -7,6 +7,7 @@
from fastapi import BackgroundTasks
from fastapi import Depends
from fastapi import HTTPException
from fastapi import Request
from fastapi import Response
from fastapi import status
from pydantic.error_wrappers import ValidationError
@@ -27,17 +28,18 @@
from ....security import current_active_user
from ....security import current_active_verified_user
from ....security import User
from fractal_server.string_tools import slugify_task_name_for_source
from fractal_server.tasks.utils import get_absolute_venv_path
from fractal_server.tasks.utils import get_collection_log
from fractal_server.tasks.utils import get_collection_path
from fractal_server.tasks.utils import slugify_task_name
from fractal_server.tasks.v2._TaskCollectPip import _TaskCollectPip
from fractal_server.tasks.v2.background_operations import (
background_collect_pip,
)
from fractal_server.tasks.v2.endpoint_operations import create_package_dir_pip
from fractal_server.tasks.v2.endpoint_operations import download_package
from fractal_server.tasks.v2.endpoint_operations import inspect_package
from fractal_server.tasks.v2.utils import get_python_interpreter_v2


router = APIRouter()
@@ -66,6 +68,7 @@ async def collect_tasks_pip(
task_collect: TaskCollectPipV2,
background_tasks: BackgroundTasks,
response: Response,
request: Request,
user: User = Depends(current_active_verified_user),
db: AsyncSession = Depends(get_async_db),
) -> CollectionStateReadV2:
@@ -76,17 +79,26 @@
of a package and the collection of tasks as advertised in the manifest.
"""

logger = set_logger(logger_name="collect_tasks_pip")
# Get settings
settings = Inject(get_settings)

# Set default python version
# Set/check python version
if task_collect.python_version is None:
settings = Inject(get_settings)
task_collect.python_version = (
settings.FRACTAL_TASKS_PYTHON_DEFAULT_VERSION
)
try:
get_python_interpreter_v2(python_version=task_collect.python_version)
except ValueError:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=(
f"Python version {task_collect.python_version} is "
"not available for Fractal task collection."
),
)

# Validate payload as _TaskCollectPip, which has more strict checks than
# TaskCollectPip
# Validate payload
try:
task_pkg = _TaskCollectPip(**task_collect.dict(exclude_unset=True))
except ValidationError as e:
@@ -95,6 +107,37 @@
detail=f"Invalid task-collection object. Original error: {e}",
)

# END of SSH/non-SSH common part

if settings.FRACTAL_RUNNER_BACKEND == "slurm_ssh":

from fractal_server.tasks.v2.background_operations_ssh import (
background_collect_pip_ssh,
)

# Construct and return state
state = CollectionStateV2(
data=dict(
status=CollectionStatusV2.PENDING, package=task_collect.package
)
)
db.add(state)
await db.commit()

background_tasks.add_task(
background_collect_pip_ssh,
state.id,
task_pkg,
request.app.state.fractal_ssh,
)

response.status_code = status.HTTP_201_CREATED
return state

# Actual non-SSH endpoint

logger = set_logger(logger_name="collect_tasks_pip")

with TemporaryDirectory() as tmpdir:
try:
# Copy or download the package wheel file to tmpdir
@@ -197,7 +240,7 @@ async def collect_tasks_pip(

# Check that tasks are not already in the DB
for new_task in task_pkg.package_manifest.task_list:
new_task_name_slug = slugify_task_name(new_task.name)
new_task_name_slug = slugify_task_name_for_source(new_task.name)
new_task_source = f"{task_pkg.package_source}:{new_task_name_slug}"
stm = select(TaskV2).where(TaskV2.source == new_task_source)
res = await db.execute(stm)
@@ -253,6 +296,7 @@ async def check_collection_status(
"""
Check status of background task collection
"""

logger = set_logger(logger_name="check_collection_status")
logger.debug(f"Querying state for state.id={state_id}")
state = await db.get(CollectionStateV2, state_id)
@@ -263,17 +307,28 @@
detail=f"No task collection info with id={state_id}",
)

# In some cases (i.e. a successful or ongoing task collection),
# state.data.log is not set; if so, we collect the current logs.
if verbose and not state.data.get("log"):
if "venv_path" not in state.data.keys():
await db.close()
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"No 'venv_path' in CollectionStateV2[{state_id}].data",
settings = Inject(get_settings)
if settings.FRACTAL_RUNNER_BACKEND == "slurm_ssh":
# FIXME SSH: add logic for when data.state["log"] is empty
pass
else:
# Non-SSH mode
# In some cases (i.e. a successful or ongoing task collection),
# state.data.log is not set; if so, we collect the current logs.
if verbose and not state.data.get("log"):
if "venv_path" not in state.data.keys():
await db.close()
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=(
f"No 'venv_path' in CollectionStateV2[{state_id}].data"
),
)
state.data["log"] = get_collection_log(
Path(state.data["venv_path"])
)
state.data["log"] = get_collection_log(Path(state.data["venv_path"]))
state.data["venv_path"] = str(state.data["venv_path"])
state.data["venv_path"] = str(state.data["venv_path"])

reset_logger_handlers(logger)
await db.close()
return state
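In the non-SSH branch above, missing logs are recovered on the fly via `get_collection_log` (from `fractal_server.tasks.utils`, not included in this diff). A guess at its behavior, assuming the collection log is a plain-text file kept inside the package's venv folder — the filename below is hypothetical:

```python
from pathlib import Path

COLLECTION_LOG_FILENAME = "collection.log"  # hypothetical filename


def get_collection_log(venv_path: Path) -> str:
    """Read the task-collection log stored under the package venv folder."""
    log_path = venv_path / COLLECTION_LOG_FILENAME
    return log_path.read_text()
```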