diff --git a/CHANGELOG.md b/CHANGELOG.md index 65517067d7..d5be8b2dae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,21 @@ **Note**: Numbers like (\#1234) point to closed Pull Requests on the fractal-server repository. +# 2.5.0 + +This release removes support for including V1 tasks in V2 workflows. This comes +with changes to the database (data and metadata), to the API, and to the V2 +runner. + +* Runner: + * Deprecate running v1 tasks within v2 workflows (\#1721). +* Database: + * Remove `Task.is_v2_compatible` column (\#1721). + * For table `WorkflowTaskV2`, drop `is_legacy_task` and `task_legacy_id` columns, remove `task_legacy` ORM attribute, make `task_id` required, make `task` required (\#1721). +* API: + * Drop v1-v2-task-compatibility admin endpoint (\#1721). + * Drop `/task-legacy/` endpoint (\#1721). + * Remove legacy task code branches from `WorkflowTaskV2` CRUD endpoints (\#1721). + # 2.4.2 * Runner: @@ -18,6 +34,11 @@ This is mainly a bugfix release, re-implementing a check that was removed in 2.4 # 2.4.0 +This release introduces support for user groups, but without linking it to any +access-control rules (which will be introduced later). + +> NOTE: This release requires running the `fractalctl update-db-data` script. + * App: * Move creation of first user from application startup into `fractalctl set-db` command (\#1738, \#1748). * Add creation of default user group into `fractalctl set-db` command (\#1738). diff --git a/benchmarks/runner/mocks.py b/benchmarks/runner/mocks.py index c7da9e0eb2..20e16c34a9 100644 --- a/benchmarks/runner/mocks.py +++ b/benchmarks/runner/mocks.py @@ -69,59 +69,19 @@ def _set_type(cls, value, values): return "compound" -class TaskV1Mock(BaseModel): - id: int - name: str - command: str # str - source: str = Field(unique=True) - input_type: str - output_type: str - meta: Optional[dict[str, Any]] = Field(default_factory=dict) - - @property - def parallelization_level(self) -> Optional[str]: - try: - return self.meta["parallelization_level"] - except KeyError: - return None - - @property - def is_parallel(self) -> bool: - return bool(self.parallelization_level) - - class WorkflowTaskV2Mock(BaseModel): args_non_parallel: dict[str, Any] = Field(default_factory=dict) args_parallel: dict[str, Any] = Field(default_factory=dict) meta_non_parallel: dict[str, Any] = Field(default_factory=dict) meta_parallel: dict[str, Any] = Field(default_factory=dict) - is_legacy_task: Optional[bool] meta_parallel: Optional[dict[str, Any]] = Field() meta_non_parallel: Optional[dict[str, Any]] = Field() - task: Optional[TaskV2Mock] = None - task_legacy: Optional[TaskV1Mock] = None - is_legacy_task: bool = False + task: TaskV2Mock = None input_filters: dict[str, Any] = Field(default_factory=dict) order: int id: int workflow_id: int = 0 - task_legacy_id: Optional[int] - task_id: Optional[int] - - @root_validator(pre=False) - def _legacy_or_not(cls, values): - is_legacy_task = values["is_legacy_task"] - task = values.get("task") - task_legacy = values.get("task_legacy") - if is_legacy_task: - if task_legacy is None or task is not None: - raise ValueError(f"Invalid WorkflowTaskV2Mock with {values=}") - values["task_legacy_id"] = task_legacy.id - else: - if task is None or task_legacy is not None: - raise ValueError(f"Invalid WorkflowTaskV2Mock with {values=}") - values["task_id"] = task.id - return values + task_id: int @validator("input_filters", always=True) def _default_filters(cls, value): diff --git a/fractal_server/app/db/__init__.py 
b/fractal_server/app/db/__init__.py index e501c12af8..b0e9137b39 100644 --- a/fractal_server/app/db/__init__.py +++ b/fractal_server/app/db/__init__.py @@ -2,6 +2,7 @@ `db` module, loosely adapted from https://testdriven.io/blog/fastapi-sqlmodel/#async-sqlmodel """ +import sqlite3 from typing import AsyncGenerator from typing import Generator @@ -21,7 +22,9 @@ logger = set_logger(__name__) SQLITE_WARNING_MESSAGE = ( - "SQLite is supported (for version >=3.37) but discouraged in production. " + "SQLite is supported (supported version >=3.37, " + f"current {sqlite3.sqlite_version=}) " + "but discouraged in production. " "Given its partial support for ForeignKey constraints, " "database consistency cannot be guaranteed." ) diff --git a/fractal_server/app/models/v1/task.py b/fractal_server/app/models/v1/task.py index 0d13b40ddb..21f784c791 100644 --- a/fractal_server/app/models/v1/task.py +++ b/fractal_server/app/models/v1/task.py @@ -5,7 +5,6 @@ from pydantic import HttpUrl from sqlalchemy import Column -from sqlalchemy import sql from sqlalchemy.types import JSON from sqlmodel import Field from sqlmodel import SQLModel @@ -49,10 +48,6 @@ class Task(_TaskBaseV1, SQLModel, table=True): docs_info: Optional[str] = None docs_link: Optional[HttpUrl] = None - is_v2_compatible: bool = Field( - default=False, sa_column_kwargs={"server_default": sql.false()} - ) - @property def parallelization_level(self) -> Optional[str]: try: diff --git a/fractal_server/app/models/v2/workflowtask.py b/fractal_server/app/models/v2/workflowtask.py index 75e0f28b2a..32f64215a7 100644 --- a/fractal_server/app/models/v2/workflowtask.py +++ b/fractal_server/app/models/v2/workflowtask.py @@ -8,7 +8,6 @@ from sqlmodel import Relationship from sqlmodel import SQLModel -from ..v1.task import Task from .task import TaskV2 @@ -37,13 +36,6 @@ class Config: ) # Task - is_legacy_task: bool task_type: str - task_id: Optional[int] = Field(foreign_key="taskv2.id") - task: Optional[TaskV2] = Relationship( - sa_relationship_kwargs=dict(lazy="selectin") - ) - task_legacy_id: Optional[int] = Field(foreign_key="task.id") - task_legacy: Optional[Task] = Relationship( - sa_relationship_kwargs=dict(lazy="selectin") - ) + task_id: int = Field(foreign_key="taskv2.id") + task: TaskV2 = Relationship(sa_relationship_kwargs=dict(lazy="selectin")) diff --git a/fractal_server/app/routes/admin/v2.py b/fractal_server/app/routes/admin/v2.py index c51a751922..7fdba6f230 100644 --- a/fractal_server/app/routes/admin/v2.py +++ b/fractal_server/app/routes/admin/v2.py @@ -24,7 +24,6 @@ from ....zip_tools import _zip_folder_to_byte_stream_iterator from ...db import AsyncSession from ...db import get_async_db -from ...models.v1 import Task from ...models.v2 import JobV2 from ...models.v2 import ProjectV2 from ...models.v2 import TaskV2 @@ -281,35 +280,6 @@ async def download_job_logs( ) -class TaskCompatibility(BaseModel): - is_v2_compatible: bool - - -@router_admin_v2.patch( - "/task-v1/{task_id}/", - status_code=status.HTTP_200_OK, -) -async def flag_task_v1_as_v2_compatible( - task_id: int, - compatibility: TaskCompatibility, - user: UserOAuth = Depends(current_active_superuser), - db: AsyncSession = Depends(get_async_db), -) -> Response: - - task = await db.get(Task, task_id) - if task is None: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Task {task_id} not found", - ) - - task.is_v2_compatible = compatibility.is_v2_compatible - await db.commit() - await db.close() - - return Response(status_code=status.HTTP_200_OK) - - class 
TaskV2Minimal(BaseModel): id: int diff --git a/fractal_server/app/routes/api/v2/__init__.py b/fractal_server/app/routes/api/v2/__init__.py index 9231a17bef..cc4eaa4483 100644 --- a/fractal_server/app/routes/api/v2/__init__.py +++ b/fractal_server/app/routes/api/v2/__init__.py @@ -12,7 +12,6 @@ from .task import router as task_router_v2 from .task_collection import router as task_collection_router_v2 from .task_collection_custom import router as task_collection_router_v2_custom -from .task_legacy import router as task_legacy_router_v2 from .workflow import router as workflow_router_v2 from .workflowtask import router as workflowtask_router_v2 from fractal_server.config import get_settings @@ -38,9 +37,6 @@ tags=["V2 Task Collection"], ) router_api_v2.include_router(task_router_v2, prefix="/task", tags=["V2 Task"]) -router_api_v2.include_router( - task_legacy_router_v2, prefix="/task-legacy", tags=["V2 Task Legacy"] -) router_api_v2.include_router(workflow_router_v2, tags=["V2 Workflow"]) router_api_v2.include_router(workflowtask_router_v2, tags=["V2 WorkflowTask"]) router_api_v2.include_router(status_router_v2, tags=["V2 Status"]) diff --git a/fractal_server/app/routes/api/v2/_aux_functions.py b/fractal_server/app/routes/api/v2/_aux_functions.py index 1dc3f1532f..c3280162bf 100644 --- a/fractal_server/app/routes/api/v2/_aux_functions.py +++ b/fractal_server/app/routes/api/v2/_aux_functions.py @@ -13,7 +13,6 @@ from sqlmodel.sql.expression import SelectOfScalar from ....db import AsyncSession -from ....models.v1 import Task from ....models.v2 import DatasetV2 from ....models.v2 import JobV2 from ....models.v2 import LinkUserProjectV2 @@ -389,7 +388,6 @@ async def _workflow_insert_task( *, workflow_id: int, task_id: int, - is_legacy_task: bool = False, order: Optional[int] = None, meta_parallel: Optional[dict[str, Any]] = None, meta_non_parallel: Optional[dict[str, Any]] = None, @@ -404,7 +402,7 @@ async def _workflow_insert_task( Args: workflow_id: task_id: - is_legacy_task: + order: meta_parallel: meta_non_parallel: @@ -420,52 +418,21 @@ async def _workflow_insert_task( if order is None: order = len(db_workflow.task_list) - # Get task from db, and extract default arguments via a Task property - # method - # NOTE: this logic remains there for V1 tasks only. 
When we deprecate V1 - # tasks, we can simplify this block - if is_legacy_task is True: - db_task = await db.get(Task, task_id) - if db_task is None: - raise ValueError(f"Task {task_id} not found.") - task_type = "parallel" - - final_args_parallel = db_task.default_args_from_args_schema.copy() - final_args_non_parallel = {} - final_meta_parallel = (db_task.meta or {}).copy() - final_meta_non_parallel = {} - - else: - db_task = await db.get(TaskV2, task_id) - if db_task is None: - raise ValueError(f"TaskV2 {task_id} not found.") - task_type = db_task.type - - final_args_non_parallel = {} - final_args_parallel = {} - final_meta_parallel = (db_task.meta_parallel or {}).copy() - final_meta_non_parallel = (db_task.meta_non_parallel or {}).copy() - - # Combine arg_parallel - if args_parallel is not None: - for k, v in args_parallel.items(): - final_args_parallel[k] = v - if final_args_parallel == {}: - final_args_parallel = None - # Combine arg_non_parallel - if args_non_parallel is not None: - for k, v in args_non_parallel.items(): - final_args_non_parallel[k] = v - if final_args_non_parallel == {}: - final_args_non_parallel = None + # Get task from db + db_task = await db.get(TaskV2, task_id) + if db_task is None: + raise ValueError(f"TaskV2 {task_id} not found.") + task_type = db_task.type # Combine meta_parallel (higher priority) # and db_task.meta_parallel (lower priority) + final_meta_parallel = (db_task.meta_parallel or {}).copy() final_meta_parallel.update(meta_parallel or {}) if final_meta_parallel == {}: final_meta_parallel = None # Combine meta_non_parallel (higher priority) # and db_task.meta_non_parallel (lower priority) + final_meta_non_parallel = (db_task.meta_non_parallel or {}).copy() final_meta_non_parallel.update(meta_non_parallel or {}) if final_meta_non_parallel == {}: final_meta_non_parallel = None @@ -479,11 +446,9 @@ async def _workflow_insert_task( # Create DB entry wf_task = WorkflowTaskV2( task_type=task_type, - is_legacy_task=is_legacy_task, - task_id=(task_id if not is_legacy_task else None), - task_legacy_id=(task_id if is_legacy_task else None), - args_non_parallel=final_args_non_parallel, - args_parallel=final_args_parallel, + task_id=task_id, + args_non_parallel=args_non_parallel, + args_parallel=args_parallel, meta_parallel=final_meta_parallel, meta_non_parallel=final_meta_non_parallel, **input_filters_kwarg, diff --git a/fractal_server/app/routes/api/v2/task_legacy.py b/fractal_server/app/routes/api/v2/task_legacy.py deleted file mode 100644 index 787b420b94..0000000000 --- a/fractal_server/app/routes/api/v2/task_legacy.py +++ /dev/null @@ -1,59 +0,0 @@ -from fastapi import APIRouter -from fastapi import Depends -from fastapi import HTTPException -from fastapi import status -from sqlmodel import select - -from fractal_server.app.db import AsyncSession -from fractal_server.app.db import get_async_db -from fractal_server.app.models import UserOAuth -from fractal_server.app.models.v1 import Task as TaskV1 -from fractal_server.app.routes.auth import current_active_user -from fractal_server.app.schemas.v2 import TaskLegacyReadV2 -from fractal_server.logger import set_logger - -router = APIRouter() - -logger = set_logger(__name__) - - -@router.get("/", response_model=list[TaskLegacyReadV2]) -async def get_list_task_legacy( - args_schema: bool = True, - only_v2_compatible: bool = False, - user: UserOAuth = Depends(current_active_user), - db: AsyncSession = Depends(get_async_db), -) -> list[TaskLegacyReadV2]: - """ - Get list of available legacy tasks - """ - stm = 
select(TaskV1) - if only_v2_compatible: - stm = stm.where(TaskV1.is_v2_compatible) - res = await db.execute(stm) - task_list = res.scalars().all() - await db.close() - if args_schema is False: - for task in task_list: - setattr(task, "args_schema", None) - - return task_list - - -@router.get("/{task_id}/", response_model=TaskLegacyReadV2) -async def get_task_legacy( - task_id: int, - user: UserOAuth = Depends(current_active_user), - db: AsyncSession = Depends(get_async_db), -) -> TaskLegacyReadV2: - """ - Get info on a specific legacy task - """ - task = await db.get(TaskV1, task_id) - await db.close() - if not task: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"TaskV1[{task_id}] not found", - ) - return task diff --git a/fractal_server/app/routes/api/v2/workflow.py b/fractal_server/app/routes/api/v2/workflow.py index a3ab3bf87f..f26708a919 100644 --- a/fractal_server/app/routes/api/v2/workflow.py +++ b/fractal_server/app/routes/api/v2/workflow.py @@ -11,7 +11,6 @@ from .....logger import set_logger from ....db import AsyncSession from ....db import get_async_db -from ....models.v1 import Task as TaskV1 from ....models.v2 import JobV2 from ....models.v2 import ProjectV2 from ....models.v2 import TaskV2 @@ -242,23 +241,13 @@ async def export_worfklow( # Emit a warning when exporting a workflow with custom tasks logger = set_logger(None) for wftask in workflow.task_list: - if wftask.is_legacy_task: - if wftask.task_legacy.owner is not None: - logger.warning( - f"Custom tasks (like the one with " - f"id={wftask.task_legacy_id} and " - f"source='{wftask.task_legacy.source}') are not meant to " - "be portable; re-importing this workflow may not work as " - "expected." - ) - else: - if wftask.task.owner is not None: - logger.warning( - f"Custom tasks (like the one with id={wftask.task_id} and " - f'source="{wftask.task.source}") are not meant to be ' - "portable; re-importing this workflow may not work as " - "expected." - ) + if wftask.task.owner is not None: + logger.warning( + f"Custom tasks (like the one with id={wftask.task_id} and " + f'source="{wftask.task.source}") are not meant to be ' + "portable; re-importing this workflow may not work as " + "expected." + ) reset_logger_handlers(logger) await db.close() @@ -296,38 +285,22 @@ async def import_workflow( # Check that all required tasks are available source_to_id = {} - source_to_id_legacy = {} for wf_task in workflow.task_list: - if wf_task.is_legacy_task is True: - source = wf_task.task_legacy.source - if source not in source_to_id_legacy.keys(): - stm = select(TaskV1).where(TaskV1.source == source) - tasks_by_source = (await db.execute(stm)).scalars().all() - if len(tasks_by_source) != 1: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail=( - f"Found {len(tasks_by_source)} tasks legacy " - f"with {source=}." - ), - ) - source_to_id_legacy[source] = tasks_by_source[0].id - else: - source = wf_task.task.source - if source not in source_to_id.keys(): - stm = select(TaskV2).where(TaskV2.source == source) - tasks_by_source = (await db.execute(stm)).scalars().all() - if len(tasks_by_source) != 1: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail=( - f"Found {len(tasks_by_source)} tasks " - f"with {source=}." 
- ), - ) - source_to_id[source] = tasks_by_source[0].id + source = wf_task.task.source + if source not in source_to_id.keys(): + stm = select(TaskV2).where(TaskV2.source == source) + tasks_by_source = (await db.execute(stm)).scalars().all() + if len(tasks_by_source) != 1: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=( + f"Found {len(tasks_by_source)} tasks " + f"with {source=}." + ), + ) + source_to_id[source] = tasks_by_source[0].id # Create new Workflow (with empty task_list) db_workflow = WorkflowV2( @@ -341,15 +314,11 @@ async def import_workflow( # Insert tasks for wf_task in workflow.task_list: - if wf_task.is_legacy_task is True: - source = wf_task.task_legacy.source - task_id = source_to_id_legacy[source] - else: - source = wf_task.task.source - task_id = source_to_id[source] + source = wf_task.task.source + task_id = source_to_id[source] new_wf_task = WorkflowTaskCreateV2( - **wf_task.dict(exclude_none=True, exclude={"task", "task_legacy"}) + **wf_task.dict(exclude_none=True, exclude={"task"}) ) # Insert task await _workflow_insert_task( diff --git a/fractal_server/app/routes/api/v2/workflowtask.py b/fractal_server/app/routes/api/v2/workflowtask.py index beba04028c..12504bb953 100644 --- a/fractal_server/app/routes/api/v2/workflowtask.py +++ b/fractal_server/app/routes/api/v2/workflowtask.py @@ -9,7 +9,6 @@ from ....db import AsyncSession from ....db import get_async_db -from ....models.v1 import Task from ....models.v2 import TaskV2 from ....schemas.v2 import WorkflowTaskCreateV2 from ....schemas.v2 import WorkflowTaskReadV2 @@ -44,27 +43,14 @@ async def create_workflowtask( project_id=project_id, workflow_id=workflow_id, user_id=user.id, db=db ) - if new_task.is_legacy_task is True: - task = await db.get(Task, task_id) - if not task: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Task {task_id} not found.", - ) - if not task.is_v2_compatible: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail=f"Task {task_id} is not V2-compatible.", - ) - else: - task = await db.get(TaskV2, task_id) - if not task: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"TaskV2 {task_id} not found.", - ) + task = await db.get(TaskV2, task_id) + if not task: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"TaskV2 {task_id} not found.", + ) - if new_task.is_legacy_task is True or task.type == "parallel": + if task.type == "parallel": if ( new_task.meta_non_parallel is not None or new_task.args_non_parallel is not None @@ -74,7 +60,7 @@ async def create_workflowtask( detail=( "Cannot set `WorkflowTaskV2.meta_non_parallel` or " "`WorkflowTask.args_non_parallel` if the associated Task " - "is `parallel` (or legacy)." + "is `parallel`." 
), ) elif task.type == "non_parallel": @@ -93,7 +79,6 @@ async def create_workflowtask( workflow_task = await _workflow_insert_task( workflow_id=workflow.id, - is_legacy_task=new_task.is_legacy_task, task_id=task_id, order=new_task.order, meta_non_parallel=new_task.meta_non_parallel, @@ -182,16 +167,7 @@ async def update_workflowtask( for key, value in workflow_task_update.dict(exclude_unset=True).items(): if key == "args_parallel": # Get default arguments via a Task property method - if db_wf_task.is_legacy_task: - default_args = ( - db_wf_task.task_legacy.default_args_from_args_schema - ) - actual_args = deepcopy(default_args) - if value is not None: - for k, v in value.items(): - actual_args[k] = v - else: - actual_args = deepcopy(value) + actual_args = deepcopy(value) if not actual_args: actual_args = None setattr(db_wf_task, key, actual_args) diff --git a/fractal_server/app/runner/v2/__init__.py b/fractal_server/app/runner/v2/__init__.py index 31455cd9ba..c6c8a9f5f2 100644 --- a/fractal_server/app/runner/v2/__init__.py +++ b/fractal_server/app/runner/v2/__init__.py @@ -208,10 +208,7 @@ async def submit_workflow( # Create all tasks subfolders for order in range(job.first_task_index, job.last_task_index + 1): this_wftask = workflow.task_list[order] - if this_wftask.is_legacy_task: - task_name = this_wftask.task_legacy.name - else: - task_name = this_wftask.task.name + task_name = this_wftask.task.name subfolder_name = task_subfolder_name( order=order, task_name=task_name, diff --git a/fractal_server/app/runner/v2/_slurm_common/get_slurm_config.py b/fractal_server/app/runner/v2/_slurm_common/get_slurm_config.py index 362873cd96..06abb49e16 100644 --- a/fractal_server/app/runner/v2/_slurm_common/get_slurm_config.py +++ b/fractal_server/app/runner/v2/_slurm_common/get_slurm_config.py @@ -116,10 +116,7 @@ def get_slurm_config( slurm_dict["mem_per_task_MB"] = mem_per_task_MB # Job name - if wftask.is_legacy_task: - job_name = wftask.task_legacy.name.replace(" ", "_") - else: - job_name = wftask.task.name.replace(" ", "_") + job_name = wftask.task.name.replace(" ", "_") slurm_dict["job_name"] = job_name # Optional SLURM arguments and extra lines diff --git a/fractal_server/app/runner/v2/handle_failed_job.py b/fractal_server/app/runner/v2/handle_failed_job.py index b2deba5aac..0886743de8 100644 --- a/fractal_server/app/runner/v2/handle_failed_job.py +++ b/fractal_server/app/runner/v2/handle_failed_job.py @@ -96,15 +96,8 @@ def assemble_history_failed_job( # Part 3/B: Append failed task to history if failed_wftask is not None: - failed_wftask_dump = failed_wftask.model_dump( - exclude={"task", "task_legacy"} - ) - if failed_wftask.is_legacy_task: - failed_wftask_dump[ - "task_legacy" - ] = failed_wftask.task_legacy.model_dump() - else: - failed_wftask_dump["task"] = failed_wftask.task.model_dump() + failed_wftask_dump = failed_wftask.model_dump(exclude={"task"}) + failed_wftask_dump["task"] = failed_wftask.task.model_dump() new_history_item = dict( workflowtask=failed_wftask_dump, status=WorkflowTaskStatusTypeV2.FAILED, diff --git a/fractal_server/app/runner/v2/runner.py b/fractal_server/app/runner/v2/runner.py index c6150a71d1..5a87ac8a58 100644 --- a/fractal_server/app/runner/v2/runner.py +++ b/fractal_server/app/runner/v2/runner.py @@ -17,7 +17,6 @@ from ..filenames import HISTORY_FILENAME from ..filenames import IMAGES_FILENAME from .runner_functions import no_op_submit_setup_call -from .runner_functions import run_v1_task_parallel from .runner_functions import run_v2_task_compound from 
.runner_functions import run_v2_task_non_parallel from .runner_functions import run_v2_task_parallel @@ -53,16 +52,8 @@ def execute_tasks_v2( for wftask in wf_task_list: task = wftask.task - task_legacy = wftask.task_legacy - if wftask.is_legacy_task: - task_name = task_legacy.name - logger.debug( - f"SUBMIT {wftask.order}-th task " - f'(legacy, name="{task_name}")' - ) - else: - task_name = task.name - logger.debug(f'SUBMIT {wftask.order}-th task (name="{task_name}")') + task_name = task.name + logger.debug(f'SUBMIT {wftask.order}-th task (name="{task_name}")') # PRE TASK EXECUTION @@ -78,67 +69,53 @@ def execute_tasks_v2( filters=Filters(**pre_filters), ) # Verify that filtered images comply with task input_types - if not wftask.is_legacy_task: - for image in filtered_images: - if not match_filter(image, Filters(types=task.input_types)): - raise JobExecutionError( - "Invalid filtered image list\n" - f"Task input types: {task.input_types=}\n" - f'Image zarr_url: {image["zarr_url"]}\n' - f'Image types: {image["types"]}\n' - ) + for image in filtered_images: + if not match_filter(image, Filters(types=task.input_types)): + raise JobExecutionError( + "Invalid filtered image list\n" + f"Task input types: {task.input_types=}\n" + f'Image zarr_url: {image["zarr_url"]}\n' + f'Image types: {image["types"]}\n' + ) # TASK EXECUTION (V2) - if not wftask.is_legacy_task: - if task.type == "non_parallel": - current_task_output = run_v2_task_non_parallel( - images=filtered_images, - zarr_dir=zarr_dir, - wftask=wftask, - task=task, - workflow_dir_local=workflow_dir_local, - workflow_dir_remote=workflow_dir_remote, - executor=executor, - logger_name=logger_name, - submit_setup_call=submit_setup_call, - ) - elif task.type == "parallel": - current_task_output = run_v2_task_parallel( - images=filtered_images, - wftask=wftask, - task=task, - workflow_dir_local=workflow_dir_local, - workflow_dir_remote=workflow_dir_remote, - executor=executor, - logger_name=logger_name, - submit_setup_call=submit_setup_call, - ) - elif task.type == "compound": - current_task_output = run_v2_task_compound( - images=filtered_images, - zarr_dir=zarr_dir, - wftask=wftask, - task=task, - workflow_dir_local=workflow_dir_local, - workflow_dir_remote=workflow_dir_remote, - executor=executor, - logger_name=logger_name, - submit_setup_call=submit_setup_call, - ) - else: - raise ValueError(f"Unexpected error: Invalid {task.type=}.") - # TASK EXECUTION (V1) - else: - current_task_output = run_v1_task_parallel( + if task.type == "non_parallel": + current_task_output = run_v2_task_non_parallel( images=filtered_images, + zarr_dir=zarr_dir, wftask=wftask, - task_legacy=task_legacy, + task=task, + workflow_dir_local=workflow_dir_local, + workflow_dir_remote=workflow_dir_remote, executor=executor, logger_name=logger_name, + submit_setup_call=submit_setup_call, + ) + elif task.type == "parallel": + current_task_output = run_v2_task_parallel( + images=filtered_images, + wftask=wftask, + task=task, + workflow_dir_local=workflow_dir_local, + workflow_dir_remote=workflow_dir_remote, + executor=executor, + logger_name=logger_name, + submit_setup_call=submit_setup_call, + ) + elif task.type == "compound": + current_task_output = run_v2_task_compound( + images=filtered_images, + zarr_dir=zarr_dir, + wftask=wftask, + task=task, workflow_dir_local=workflow_dir_local, workflow_dir_remote=workflow_dir_remote, + executor=executor, + logger_name=logger_name, submit_setup_call=submit_setup_call, ) + else: + raise ValueError(f"Unexpected error: Invalid 
{task.type=}.") # POST TASK EXECUTION @@ -191,8 +168,7 @@ def execute_tasks_v2( # Update image attributes/types with task output and manifest updated_attributes.update(image["attributes"]) updated_types.update(image["types"]) - if not wftask.is_legacy_task: - updated_types.update(task.output_types) + updated_types.update(task.output_types) # Unset attributes with None value updated_attributes = { @@ -249,8 +225,7 @@ def execute_tasks_v2( if value is not None } updated_types.update(image["types"]) - if not wftask.is_legacy_task: - updated_types.update(task.output_types) + updated_types.update(task.output_types) new_image = dict( zarr_url=image["zarr_url"], origin=image["origin"], @@ -282,10 +257,7 @@ def execute_tasks_v2( ) # Find manifest ouptut types - if wftask.is_legacy_task: - types_from_manifest = {} - else: - types_from_manifest = task.output_types + types_from_manifest = task.output_types # Find task-output types if current_task_output.filters is not None: diff --git a/fractal_server/app/runner/v2/runner_functions.py b/fractal_server/app/runner/v2/runner_functions.py index e7ab2aa819..7d6fce1a4f 100644 --- a/fractal_server/app/runner/v2/runner_functions.py +++ b/fractal_server/app/runner/v2/runner_functions.py @@ -16,8 +16,6 @@ from .runner_functions_low_level import run_single_task from .task_interface import InitTaskOutput from .task_interface import TaskOutput -from .v1_compat import convert_v2_args_into_v1 -from fractal_server.app.models.v1 import Task as TaskV1 from fractal_server.app.models.v2 import TaskV2 from fractal_server.app.models.v2 import WorkflowTaskV2 from fractal_server.app.runner.components import _COMPONENT_KEY_ @@ -28,7 +26,6 @@ "run_v2_task_non_parallel", "run_v2_task_parallel", "run_v2_task_compound", - "run_v1_task_parallel", ] MAX_PARALLELIZATION_LIST_SIZE = 20_000 @@ -317,58 +314,3 @@ def run_v2_task_compound( merged_output = merge_outputs(outputs) return merged_output - - -def run_v1_task_parallel( - *, - images: list[dict[str, Any]], - task_legacy: TaskV1, - wftask: WorkflowTaskV2, - executor: Executor, - workflow_dir_local: Path, - workflow_dir_remote: Optional[Path] = None, - logger_name: Optional[str] = None, - submit_setup_call: Callable = no_op_submit_setup_call, -) -> TaskOutput: - - _check_parallelization_list_size(images) - - executor_options = _get_executor_options( - wftask=wftask, - workflow_dir_local=workflow_dir_local, - workflow_dir_remote=workflow_dir_remote, - submit_setup_call=submit_setup_call, - which_type="parallel", - ) - - list_function_kwargs = [] - for ind, image in enumerate(images): - list_function_kwargs.append( - convert_v2_args_into_v1( - kwargs_v2=dict( - zarr_url=image["zarr_url"], - **(wftask.args_parallel or {}), - ), - parallelization_level=task_legacy.parallelization_level, - ), - ) - list_function_kwargs[-1][_COMPONENT_KEY_] = _index_to_component(ind) - - results_iterator = executor.map( - functools.partial( - run_single_task, - wftask=wftask, - command=task_legacy.command, - workflow_dir_local=workflow_dir_local, - workflow_dir_remote=workflow_dir_remote, - is_task_v1=True, - ), - list_function_kwargs, - **executor_options, - ) - # Explicitly iterate over the whole list, so that all futures are waited - list(results_iterator) - - # Ignore any output metadata for V1 tasks, and return an empty object - out = TaskOutput() - return out diff --git a/fractal_server/app/runner/v2/runner_functions_low_level.py b/fractal_server/app/runner/v2/runner_functions_low_level.py index a41afa152b..8ba2d68dc1 100644 --- 
a/fractal_server/app/runner/v2/runner_functions_low_level.py +++ b/fractal_server/app/runner/v2/runner_functions_low_level.py @@ -61,7 +61,6 @@ def run_single_task( workflow_dir_local: Path, workflow_dir_remote: Optional[Path] = None, logger_name: Optional[str] = None, - is_task_v1: bool = False, ) -> dict[str, Any]: """ Runs within an executor. @@ -73,10 +72,7 @@ def run_single_task( if not workflow_dir_remote: workflow_dir_remote = workflow_dir_local - if is_task_v1: - task_name = wftask.task_legacy.name - else: - task_name = wftask.task.name + task_name = wftask.task.name component = args.pop(_COMPONENT_KEY_, None) task_files = get_task_file_paths( @@ -92,18 +88,11 @@ def run_single_task( json.dump(args, f, indent=2) # Assemble full command - if is_task_v1: - full_command = ( - f"{command} " - f"--json {task_files.args.as_posix()} " - f"--metadata-out {task_files.metadiff.as_posix()}" - ) - else: - full_command = ( - f"{command} " - f"--args-json {task_files.args.as_posix()} " - f"--out-json {task_files.metadiff.as_posix()}" - ) + full_command = ( + f"{command} " + f"--args-json {task_files.args.as_posix()} " + f"--out-json {task_files.metadiff.as_posix()}" + ) try: _call_command_wrapper( @@ -113,10 +102,7 @@ def run_single_task( except TaskExecutionError as e: e.workflow_task_order = wftask.order e.workflow_task_id = wftask.id - if wftask.is_legacy_task: - e.task_name = wftask.task_legacy.name - else: - e.task_name = wftask.task.name + e.task_name = wftask.task.name raise e try: diff --git a/fractal_server/app/runner/v2/v1_compat.py b/fractal_server/app/runner/v2/v1_compat.py deleted file mode 100644 index f9accc135b..0000000000 --- a/fractal_server/app/runner/v2/v1_compat.py +++ /dev/null @@ -1,31 +0,0 @@ -from copy import deepcopy -from pathlib import Path -from typing import Any - - -def convert_v2_args_into_v1( - kwargs_v2: dict[str, Any], - parallelization_level: str = "image", -) -> dict[str, Any]: - kwargs_v1 = deepcopy(kwargs_v2) - - zarr_url = kwargs_v1.pop("zarr_url") - input_path = Path(zarr_url).parents[3].as_posix() - image_component = zarr_url.replace(input_path, "").lstrip("/") - if parallelization_level == "image": - component = image_component - elif parallelization_level == "well": - component = str(Path(image_component).parent) - elif parallelization_level == "plate": - component = str(Path(image_component).parents[2]) - else: - raise ValueError(f"Invalid {parallelization_level=}.") - - kwargs_v1.update( - input_paths=[input_path], - output_path=input_path, - metadata={}, - component=component, - ) - - return kwargs_v1 diff --git a/fractal_server/app/schemas/v2/__init__.py b/fractal_server/app/schemas/v2/__init__.py index 560e3d7700..1660d48ef7 100644 --- a/fractal_server/app/schemas/v2/__init__.py +++ b/fractal_server/app/schemas/v2/__init__.py @@ -20,7 +20,6 @@ from .task import TaskCreateV2 # noqa F401 from .task import TaskExportV2 # noqa F401 from .task import TaskImportV2 # noqa F401 -from .task import TaskLegacyReadV2 # noqa F401 from .task import TaskReadV2 # noqa F401 from .task import TaskUpdateV2 # noqa F401 from .task_collection import CollectionStateReadV2 # noqa F401 diff --git a/fractal_server/app/schemas/v2/dumps.py b/fractal_server/app/schemas/v2/dumps.py index 8aa625ed58..1b733b064a 100644 --- a/fractal_server/app/schemas/v2/dumps.py +++ b/fractal_server/app/schemas/v2/dumps.py @@ -12,9 +12,7 @@ from pydantic import BaseModel from pydantic import Extra -from pydantic import root_validator -from fractal_server.app.schemas.v1.dumps import TaskDumpV1 
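For context on what the deleted `v1_compat` module did: `convert_v2_args_into_v1` rebuilt the V1-style `input_paths`/`component` kwargs from a V2 `zarr_url` by walking up the OME-Zarr plate layout. A minimal standalone sketch of that arithmetic (the example path is hypothetical):

```python
from copy import deepcopy
from pathlib import Path

# Hypothetical V2-style kwargs for one image in a plate.zarr/row/column/image
# layout, i.e. the layout the deleted helper assumed.
kwargs_v2 = {"zarr_url": "/data/my_plate.zarr/B/03/0", "overwrite": True}

kwargs_v1 = deepcopy(kwargs_v2)
zarr_url = kwargs_v1.pop("zarr_url")
# Three parents up from the image: /data/my_plate.zarr/B/03/0 -> /data
input_path = Path(zarr_url).parents[3].as_posix()
image_component = zarr_url.replace(input_path, "").lstrip("/")
kwargs_v1.update(
    input_paths=[input_path],
    output_path=input_path,
    metadata={},
    component=image_component,  # parallelization_level == "image"
)
print(kwargs_v1["component"])  # my_plate.zarr/B/03/0
```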
from fractal_server.images import Filters @@ -45,29 +43,10 @@ class WorkflowTaskDumpV2(BaseModel): workflow_id: int order: Optional[int] - is_legacy_task: bool - input_filters: Filters - task_id: Optional[int] - task: Optional[TaskDumpV2] - task_legacy_id: Optional[int] - task_legacy: Optional[TaskDumpV1] - - # Validators - @root_validator - def task_v1_or_v2(cls, values): - v1 = values.get("task_legacy_id") - v2 = values.get("task_id") - if ((v1 is not None) and (v2 is not None)) or ( - (v1 is None) and (v2 is None) - ): - message = "both" if (v1 and v2) else "none" - raise ValueError( - "One and only one must be provided between " - f"'task_legacy_id' and 'task_id' (you provided {message})" - ) - return values + task_id: int + task: TaskDumpV2 class WorkflowDumpV2(BaseModel, extra=Extra.forbid): diff --git a/fractal_server/app/schemas/v2/task.py b/fractal_server/app/schemas/v2/task.py index b3b650dd24..55957e5b47 100644 --- a/fractal_server/app/schemas/v2/task.py +++ b/fractal_server/app/schemas/v2/task.py @@ -11,7 +11,6 @@ from .._validators import valdictkeys from .._validators import valstr -from ..v1.task import TaskReadV1 class TaskCreateV2(BaseModel, extra=Extra.forbid): @@ -101,10 +100,6 @@ class TaskReadV2(BaseModel): output_types: dict[str, bool] -class TaskLegacyReadV2(TaskReadV1): - is_v2_compatible: bool - - class TaskUpdateV2(BaseModel): name: Optional[str] diff --git a/fractal_server/app/schemas/v2/workflowtask.py b/fractal_server/app/schemas/v2/workflowtask.py index 9303dca17e..1edd35e2d7 100644 --- a/fractal_server/app/schemas/v2/workflowtask.py +++ b/fractal_server/app/schemas/v2/workflowtask.py @@ -5,16 +5,12 @@ from pydantic import BaseModel from pydantic import Extra from pydantic import Field -from pydantic import root_validator from pydantic import validator from .._validators import valdictkeys from .._validators import valint -from ..v1.task import TaskExportV1 -from ..v1.task import TaskImportV1 from .task import TaskExportV2 from .task import TaskImportV2 -from .task import TaskLegacyReadV2 from .task import TaskReadV2 from fractal_server.images import Filters @@ -49,8 +45,6 @@ class WorkflowTaskCreateV2(BaseModel, extra=Extra.forbid): order: Optional[int] input_filters: Filters = Field(default_factory=Filters) - is_legacy_task: bool = False - # Validators _meta_non_parallel = validator("meta_non_parallel", allow_reuse=True)( valdictkeys("meta_non_parallel") @@ -88,18 +82,6 @@ def validate_args_parallel(cls, value): ) return value - @root_validator - def validate_legacy_task(cls, values): - if values["is_legacy_task"] and ( - values.get("meta_non_parallel") is not None - or values.get("args_non_parallel") is not None - ): - raise ValueError( - "If Task is legacy, 'args_non_parallel' and 'meta_non_parallel" - "must be None" - ) - return values - class WorkflowTaskReadV2(BaseModel): @@ -115,12 +97,9 @@ class WorkflowTaskReadV2(BaseModel): input_filters: Filters - is_legacy_task: bool task_type: str - task_id: Optional[int] - task: Optional[TaskReadV2] - task_legacy_id: Optional[int] - task_legacy: Optional[TaskLegacyReadV2] + task_id: int + task: TaskReadV2 class WorkflowTaskUpdateV2(BaseModel): @@ -177,9 +156,7 @@ class WorkflowTaskImportV2(BaseModel): input_filters: Optional[Filters] = None - is_legacy_task: bool = False - task: Optional[TaskImportV2] = None - task_legacy: Optional[TaskImportV1] = None + task: TaskImportV2 _meta_non_parallel = validator("meta_non_parallel", allow_reuse=True)( valdictkeys("meta_non_parallel") @@ -203,6 +180,4 @@ class 
WorkflowTaskExportV2(BaseModel):
     args_parallel: Optional[dict[str, Any]] = None
     input_filters: Filters = Field(default_factory=Filters)

-    is_legacy_task: bool = False
-    task: Optional[TaskExportV2]
-    task_legacy: Optional[TaskExportV1]
+    task: TaskExportV2
diff --git a/fractal_server/migrations/env.py b/fractal_server/migrations/env.py
index 390d177bb9..ac909554bb 100644
--- a/fractal_server/migrations/env.py
+++ b/fractal_server/migrations/env.py
@@ -7,6 +7,7 @@
 from fractal_server.app import models  # noqa
 from fractal_server.config import get_settings
+from fractal_server.migrations.naming_convention import NAMING_CONVENTION
 from fractal_server.syringe import Inject

 # this is the Alembic Config object, which provides
@@ -25,13 +26,7 @@
 # from myapp import mymodel
 # target_metadata = mymodel.Base.metadata
 target_metadata = SQLModel.metadata
-target_metadata.naming_convention = {
-    "ix": "ix_%(column_0_label)s",
-    "uq": "uq_%(table_name)s_%(column_0_name)s",
-    "ck": "ck_%(table_name)s_`%(constraint_name)s`",
-    "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
-    "pk": "pk_%(table_name)s",
-}
+target_metadata.naming_convention = NAMING_CONVENTION

 # other values from the config, defined by the needs of env.py,
 # can be acquired:
@@ -58,6 +53,7 @@ def run_migrations_offline() -> None:
         target_metadata=target_metadata,
         literal_binds=True,
         dialect_opts={"paramstyle": "named"},
+        render_as_batch=True,
     )

     with context.begin_transaction():
@@ -68,6 +64,7 @@ def do_run_migrations(connection: Connection) -> None:
     context.configure(
         connection=connection,
         target_metadata=target_metadata,
+        render_as_batch=True,
     )

     with context.begin_transaction():
diff --git a/fractal_server/migrations/naming_convention.py b/fractal_server/migrations/naming_convention.py
new file mode 100644
index 0000000000..9787bfc2e6
--- /dev/null
+++ b/fractal_server/migrations/naming_convention.py
@@ -0,0 +1,7 @@
+NAMING_CONVENTION = {
+    "ix": "ix_%(column_0_label)s",
+    "uq": "uq_%(table_name)s_%(column_0_name)s",
+    "ck": "ck_%(table_name)s_`%(constraint_name)s`",
+    "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
+    "pk": "pk_%(table_name)s",
+}
diff --git a/fractal_server/migrations/versions/091b01f51f88_add_usergroup_and_linkusergroup_table.py b/fractal_server/migrations/versions/091b01f51f88_add_usergroup_and_linkusergroup_table.py
index 45d4565fdb..e45bfcc434 100644
--- a/fractal_server/migrations/versions/091b01f51f88_add_usergroup_and_linkusergroup_table.py
+++ b/fractal_server/migrations/versions/091b01f51f88_add_usergroup_and_linkusergroup_table.py
@@ -1,4 +1,4 @@
-"""“Add_usergroup_and_linkusergroup_table”
+"""Add_usergroup_and_linkusergroup_table

 Revision ID: 091b01f51f88
 Revises: 5bf02391cfef
diff --git a/fractal_server/migrations/versions/501961cfcd85_remove_link_between_v1_and_v2_tasks_.py b/fractal_server/migrations/versions/501961cfcd85_remove_link_between_v1_and_v2_tasks_.py
new file mode 100644
index 0000000000..41db4dffe7
--- /dev/null
+++ b/fractal_server/migrations/versions/501961cfcd85_remove_link_between_v1_and_v2_tasks_.py
@@ -0,0 +1,97 @@
+"""Remove link between v1 and v2 tasks/workflowtasks tables
+
+Revision ID: 501961cfcd85
+Revises: 091b01f51f88
+Create Date: 2024-09-09 14:15:34.415926
+
+"""
+import sqlalchemy as sa
+from alembic import op
+
+from fractal_server.migrations.naming_convention import NAMING_CONVENTION
+
+# revision identifiers, used by Alembic.
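The `NAMING_CONVENTION` module added above is what lets the migration below drop an otherwise-unnamed SQLite foreign key by a predictable name. A minimal standalone sketch of the mechanism (toy tables, assuming SQLAlchemy >= 1.4):

```python
from sqlalchemy import Column, ForeignKey, Integer, MetaData, Table
from sqlalchemy.schema import CreateTable

# Same "fk" pattern as fractal_server.migrations.naming_convention
metadata = MetaData(
    naming_convention={
        "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s"
    }
)
Table("task", metadata, Column("id", Integer, primary_key=True))
wftask = Table(
    "workflowtaskv2",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("task_legacy_id", Integer, ForeignKey("task.id")),
)
# The compiled DDL names the constraint exactly as the migration expects:
#   CONSTRAINT fk_workflowtaskv2_task_legacy_id_task
#       FOREIGN KEY(task_legacy_id) REFERENCES task (id)
print(CreateTable(wftask))
```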
+revision = "d9a140db5d42" +down_revision = "091b01f51f88" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + + with op.batch_alter_table("workflowtaskv2") as batch_op: + batch_op.alter_column( + "task_id", existing_type=sa.INTEGER(), nullable=False + ) + + # NOTE: in sqlite, this `drop_constraint` only works if + # `batch_alter_table` has a `naming_convention` set. Ref + # https://alembic.sqlalchemy.org/en/latest/batch.html#dropping-unnamed-or-named-foreign-key-constraints + with op.batch_alter_table( + "workflowtaskv2", naming_convention=NAMING_CONVENTION + ) as batch_op: + batch_op.drop_constraint( + "fk_workflowtaskv2_task_legacy_id_task", type_="foreignkey" + ) + + # NOTE: in sqlite, the `drop_index` command fails if the existing table + # has zero rows, while it succeeds if there are already some rows + if op.get_bind().dialect.name == "sqlite": + import sqlite3 + import logging + + logger = logging.getLogger("alembic.runtime.migration") + logger.warning( + f"Using sqlite, with {sqlite3.version=} and " + f"{sqlite3.sqlite_version=}" + ) + logger.warning("Now drop index 'idx_workflowtaskv2_task_legacy_id'") + try: + with op.batch_alter_table("workflowtaskv2") as batch_op: + batch_op.drop_index("idx_workflowtaskv2_task_legacy_id") + except sa.exc.OperationalError: + logger.warning( + "Could not drop index; " + "this is expected, when the database is empty." + ) + logger.warning("Continue.") + + with op.batch_alter_table( + "workflowtaskv2", schema=None, naming_convention=NAMING_CONVENTION + ) as batch_op: + batch_op.drop_column("is_legacy_task") + batch_op.drop_column("task_legacy_id") + + with op.batch_alter_table("task") as batch_op: + batch_op.drop_column("is_v2_compatible") + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! 
### + with op.batch_alter_table("task", schema=None) as batch_op: + batch_op.add_column( + sa.Column( + "is_v2_compatible", + sa.BOOLEAN(), + server_default=sa.text("(false)"), + nullable=False, + ) + ) + with op.batch_alter_table("workflowtaskv2", schema=None) as batch_op: + batch_op.add_column( + sa.Column("task_legacy_id", sa.INTEGER(), nullable=True) + ) + batch_op.add_column( + sa.Column("is_legacy_task", sa.BOOLEAN(), nullable=False) + ) + batch_op.create_foreign_key( + "fk_workflowtaskv2_task_legacy_id_task", + "task", + ["task_legacy_id"], + ["id"], + ) + batch_op.alter_column( + "task_id", existing_type=sa.INTEGER(), nullable=True + ) + + # ### end Alembic commands ### diff --git a/scripts/validate_db_data_with_read_schemas.py b/scripts/validate_db_data_with_read_schemas.py index 97018d7348..0c2f7aa049 100644 --- a/scripts/validate_db_data_with_read_schemas.py +++ b/scripts/validate_db_data_with_read_schemas.py @@ -31,7 +31,6 @@ from fractal_server.app.schemas.v2 import DatasetReadV2 from fractal_server.app.schemas.v2 import JobReadV2 from fractal_server.app.schemas.v2 import ProjectReadV2 -from fractal_server.app.schemas.v2 import TaskLegacyReadV2 from fractal_server.app.schemas.v2 import TaskReadV2 from fractal_server.app.schemas.v2 import WorkflowReadV2 from fractal_server.app.schemas.v2 import WorkflowTaskReadV2 @@ -177,22 +176,12 @@ # validate task_list task_list = [] for wftask in workflow.task_list: - if wftask.is_legacy_task is True: - task_list.append( - WorkflowTaskReadV2( - **wftask.model_dump(), - task_legacy=TaskLegacyReadV2( - **wftask.task.model_dump() - ), - ) - ) - else: - task_list.append( - WorkflowTaskReadV2( - **wftask.model_dump(), - task=TaskReadV2(**wftask.task.model_dump()), - ) + task_list.append( + WorkflowTaskReadV2( + **wftask.model_dump(), + task=TaskReadV2(**wftask.task.model_dump()), ) + ) WorkflowReadV2( **workflow.model_dump(), diff --git a/tests/data/example_server_startup/cleanup.sh b/tests/data/example_server_startup/cleanup.sh index 7368e464fe..131f9e9d81 100755 --- a/tests/data/example_server_startup/cleanup.sh +++ b/tests/data/example_server_startup/cleanup.sh @@ -4,3 +4,4 @@ rm test.db rm -r Tasks rm -r Artifacts rm logs* +rm test.db-shm test.db-wal diff --git a/tests/data/import_export/workflow-v2.json b/tests/data/import_export/workflow-v2.json index d7a0855f5b..f469bcaa5d 100644 --- a/tests/data/import_export/workflow-v2.json +++ b/tests/data/import_export/workflow-v2.json @@ -13,22 +13,9 @@ "meta_parallel": {"key1": "value1"}, "meta_non_parallel": {"key2": "value2"}, "order": 0, - "is_legacy_task": false, "task": { "source": "PKG_SOURCE:dummy2" } - }, - { - "args_parallel": { - "message": "task", - "arg": "value" - }, - "meta_parallel": {"key": "value"}, - "order": 1, - "is_legacy_task": true, - "task_legacy": { - "source": "PKG_SOURCE:dummy1" - } } ] } diff --git a/tests/fixtures_server_v2.py b/tests/fixtures_server_v2.py index 114d1b8c08..4fba6d62f6 100644 --- a/tests/fixtures_server_v2.py +++ b/tests/fixtures_server_v2.py @@ -242,7 +242,6 @@ async def __workflowtask_factory( defaults = dict( workflow_id=workflow_id, task_id=task_id, - is_legacy_task=False, task_type=task.type, ) args = dict(**defaults) diff --git a/tests/no_version/test_unit_config.py b/tests/no_version/test_unit_config.py index 5650159bf4..f0de848655 100644 --- a/tests/no_version/test_unit_config.py +++ b/tests/no_version/test_unit_config.py @@ -193,6 +193,8 @@ def test_settings_check( settings_dict: dict[str, str], raises: bool, testdata_path: Path ): + 
debug(settings_dict, raises) + # Workaround to set FRACTAL_SLURM_CONFIG_FILE to a valid path, which # requires the value of testdata_path if ( diff --git a/tests/v1/02_schemas/test_unit_schemas.py b/tests/v1/02_schemas/test_unit_schemas.py index e7f5b4d6ea..de512089cc 100644 --- a/tests/v1/02_schemas/test_unit_schemas.py +++ b/tests/v1/02_schemas/test_unit_schemas.py @@ -507,7 +507,6 @@ def test_workflow_read_non_empty_task_list(): input_type="input_type", output_type="output_type", meta=dict(something="else"), - is_v2_compatible=False, ) # Create two WorkflowTaskRead wft1 = WorkflowTaskReadV1(id=1, task_id=1, workflow_id=1, task=t1) diff --git a/tests/v1/03_db/test_unit_db_models.py b/tests/v1/03_db/test_unit_db_models.py index 2f8b063beb..afe0dd16b8 100644 --- a/tests/v1/03_db/test_unit_db_models.py +++ b/tests/v1/03_db/test_unit_db_models.py @@ -70,7 +70,6 @@ async def test_tasks(db): assert db_task.args_schema_version is None assert db_task.docs_info is None assert db_task.docs_link is None - assert db_task.is_v2_compatible is False # `Task.source` has unique constraint broken_task = Task(**args) # == task1 diff --git a/tests/v2/01_schemas/test_unit_schemas_v2.py b/tests/v2/01_schemas/test_unit_schemas_v2.py index 5e773de71c..d80f1eee8f 100644 --- a/tests/v2/01_schemas/test_unit_schemas_v2.py +++ b/tests/v2/01_schemas/test_unit_schemas_v2.py @@ -6,6 +6,7 @@ from fractal_server.app.schemas.v2 import ProjectCreateV2 from fractal_server.app.schemas.v2 import TaskCollectPipV2 from fractal_server.app.schemas.v2 import TaskCreateV2 +from fractal_server.app.schemas.v2 import TaskDumpV2 from fractal_server.app.schemas.v2 import TaskUpdateV2 from fractal_server.app.schemas.v2 import WorkflowCreateV2 from fractal_server.app.schemas.v2 import WorkflowTaskCreateV2 @@ -48,17 +49,6 @@ def test_extra_on_create_models(): WorkflowTaskCreateV2(foo="bar") -def test_validate_legacy_task(): - - WorkflowTaskCreateV2(meta_non_parallel={"a": "b"}) - with pytest.raises(ValidationError): - WorkflowTaskCreateV2(is_legacy_task=True, meta_non_parallel={"a": "b"}) - - WorkflowTaskCreateV2(args_non_parallel={"a": "b"}) - with pytest.raises(ValidationError): - WorkflowTaskCreateV2(is_legacy_task=True, args_non_parallel={"a": "b"}) - - def test_dictionary_keys_validation(): args = dict( name="name", @@ -125,23 +115,14 @@ def test_workflow_task_dump(): WorkflowTaskDumpV2( id=1, workflow_id=1, - is_legacy_task=False, input_filters=Filters(), task_id=1, - ) - with pytest.raises(ValidationError, match="both"): - WorkflowTaskDumpV2( - id=1, - workflow_id=1, - is_legacy_task=False, - input_filters=Filters(), - task_id=1, - task_legacy_id=1, - ) - with pytest.raises(ValidationError, match="none"): - WorkflowTaskDumpV2( + task=TaskDumpV2( id=1, - workflow_id=1, - is_legacy_task=False, - input_filters=Filters(), - ) + name="name", + type="type", + source="source", + input_types={}, + output_types={}, + ), + ) diff --git a/tests/v2/03_api/test_api_admin.py b/tests/v2/03_api/test_api_admin.py index 6a8f7c1969..6361f3ec34 100644 --- a/tests/v2/03_api/test_api_admin.py +++ b/tests/v2/03_api/test_api_admin.py @@ -486,42 +486,6 @@ async def test_download_job_logs( assert LOG_CONTENT in actual_logs -async def test_flag_task_v2_compatible( - db, - client, - MockCurrentUser, - task_factory, -): - task = await task_factory() - assert task.is_v2_compatible is False - - async with MockCurrentUser(user_kwargs={"is_superuser": True}): - - for _ in range(2): - res = await client.patch( - f"{PREFIX}/task-v1/{task.id}/", - 
json=dict(is_v2_compatible=True), - ) - assert res.status_code == 200 - await db.refresh(task) - assert task.is_v2_compatible is True - - for _ in range(2): - res = await client.patch( - f"{PREFIX}/task-v1/{task.id}/", - json=dict(is_v2_compatible=False), - ) - assert res.status_code == 200 - await db.refresh(task) - assert task.is_v2_compatible is False - - res = await client.patch( - f"{PREFIX}/task-v1/{task.id + 100}/", - json=dict(is_v2_compatible=True), - ) - assert res.status_code == 404 - - async def test_task_query( db, client, diff --git a/tests/v2/03_api/test_api_task.py b/tests/v2/03_api/test_api_task.py index 933156cac3..fd3de9228b 100644 --- a/tests/v2/03_api/test_api_task.py +++ b/tests/v2/03_api/test_api_task.py @@ -538,79 +538,3 @@ async def test_post_same_source(client, MockCurrentUser): # POST v1_c OK res = await client.post(V1, json=task_v1_c.dict(exclude_unset=True)) assert res.status_code == 201 - - -async def test_get_legacy_task( - task_factory, task_factory_v2, client, MockCurrentUser -): - async with MockCurrentUser(): - - # Add 3 task V2 - await task_factory_v2(source="a") - await task_factory_v2(source="b") - await task_factory_v2(source="c") - - # Assert - res = await client.get("/api/v1/task/") - assert len(res.json()) == 0 - res = await client.get("/api/v2/task/") - assert len(res.json()) == 3 - res = await client.get("/api/v2/task-legacy/") - assert len(res.json()) == 0 - res = await client.get("/api/v2/task-legacy/?only_v2_available=True") - assert len(res.json()) == 0 - - # Add 2 task V1, not v2-compatibles - await task_factory(source="d") - await task_factory(source="e") - await task_factory(source="f") - - # Assert - res = await client.get("/api/v1/task/") - assert len(res.json()) == 3 - for task in res.json(): - assert "is_v2_compatible" not in task - - res = await client.get("/api/v2/task/") - assert len(res.json()) == 3 - - res = await client.get("/api/v2/task-legacy/") - assert len(res.json()) == 3 - for task in res.json(): - assert task["is_v2_compatible"] is False - - res = await client.get("/api/v2/task-legacy/?only_v2_compatible=True") - assert len(res.json()) == 0 - - # Add 2 task V1 and v2-compatibles - await task_factory(source="g", is_v2_compatible=True) - await task_factory( - source="h", is_v2_compatible=True, args_schema={"args": "schema"} - ) - - # Assert - res = await client.get("/api/v1/task/") - assert len(res.json()) == 5 - for task in res.json(): - assert "is_v2_compatible" not in task - - res = await client.get("/api/v2/task/") - assert len(res.json()) == 3 - - res = await client.get("/api/v2/task-legacy/") - assert len(res.json()) == 5 - for task in res.json(): - assert "is_v2_compatible" in task - - res = await client.get( - "/api/v2/task-legacy/?only_v2_compatible=True&args_schema=False" - ) - assert len(res.json()) == 2 - for task in res.json(): - assert task["is_v2_compatible"] is True - assert task["args_schema"] is None - res = await client.get(f"/api/v2/task-legacy/{task['id']}/") - assert res.json()["is_v2_compatible"] is True - - res = await client.get("/api/v2/task-legacy/9999999/") - assert res.status_code == 404 diff --git a/tests/v2/03_api/test_api_workflow.py b/tests/v2/03_api/test_api_workflow.py index dd6157040f..8c4ebe2c24 100644 --- a/tests/v2/03_api/test_api_workflow.py +++ b/tests/v2/03_api/test_api_workflow.py @@ -338,8 +338,6 @@ async def test_post_worfkflow_task( workflow = await get_workflow(client, project.id, wf_id) task_list = workflow["task_list"] assert len(task_list) == 4 - for wftask in task_list: - 
assert wftask["is_legacy_task"] is False assert task_list[0]["task"]["name"] == "task0" assert task_list[1]["task"]["name"] == "task0b" assert task_list[2]["task"]["name"] == "task1" @@ -688,80 +686,6 @@ async def test_patch_workflow_task( assert "Cannot patch" in res.json()["detail"] -async def test_patch_workflow_task_legacy( - MockCurrentUser, - project_factory_v2, - workflow_factory_v2, - task_factory, - client, -): - """ - GIVEN a WorkflowTask pointing to a legacy task - WHEN the endpoint to PATCH a WorkflowTask is called - THEN the WorkflowTask is updated - - In this test, we also include null non-parallel attributes in the PATCH - request body. They are not necessary or relevant, but we need to make - sure that the endpoint does not fail in this case. - """ - async with MockCurrentUser(user_kwargs=dict(is_verified=True)) as user: - # Create task - legacy_task = await task_factory(is_v2_compatible=True) - - # Create project and workflow - proj = await project_factory_v2(user) - wf = await workflow_factory_v2(project_id=proj.id) - - # Add legacy task to workflow - res = await client.post( - f"{PREFIX}/project/{proj.id}/workflow/{wf.id}/wftask/" - f"?task_id={legacy_task.id}", - json={"is_legacy_task": True}, - ) - assert res.status_code == 201 - wf_task_id = res.json()["id"] - - res = await client.post( - f"{PREFIX}/project/{proj.id}/workflow/{wf.id}/wftask/" - "?task_id=123456789", - json={"is_legacy_task": True}, - ) - assert res.status_code == 404 - - workflow = await get_workflow(client, proj.id, wf.id) - - assert workflow["task_list"][0]["args_parallel"] is None - assert workflow["task_list"][0]["args_non_parallel"] is None - assert workflow["task_list"][0]["meta_non_parallel"] is None - assert workflow["task_list"][0]["meta_parallel"] is None - - ARGS = {"c": 333, "d": 444} - META = {"executor": "cpu-low"} - INPUT_FILTERS = { - "attributes": {"a": "b", "c": "d"}, - "types": {"e": True, "f": False, "g": True}, - } - - patch_payload = dict( - args_non_parallel=None, - meta_non_parallel=None, - args_parallel=ARGS, - meta_parallel=META, - input_filters=INPUT_FILTERS, - ) - res = await client.patch( - f"{PREFIX}/project/{proj.id}/workflow/{wf.id}/" - f"wftask/{wf_task_id}/", - json=patch_payload, - ) - debug(res.json()) - assert res.status_code == 200 - - assert res.json()["args_parallel"] == ARGS - assert res.json()["meta_parallel"] == META - assert res.json()["input_filters"] == INPUT_FILTERS - - async def test_patch_workflow_task_with_args_schema( client, MockCurrentUser, project_factory_v2, task_factory_v2 ): @@ -1193,12 +1117,8 @@ async def test_import_export_workflow( for wftask in workflow_exported["task_list"]: assert "id" not in wftask assert "task_id" not in wftask - assert "task_legacy_id" not in wftask assert "workflow_id" not in wftask - if wftask["is_legacy_task"]: - assert "id" not in wftask["task_legacy"] - else: - assert "id" not in wftask["task"] + assert "id" not in wftask["task"] assert res.status_code == 200 # Check that the exported workflow is an extension of the one in the @@ -1222,56 +1142,6 @@ async def test_import_export_workflow( assert task_old == task_new -async def test_task_legacy_is_v2_compatible( - MockCurrentUser, - client, - project_factory_v2, - workflow_factory_v2, - task_factory, - task_factory_v2, -): - # Create tasks - task_v1_compatible = await task_factory( - id=1, name="task_v1-c", source="source1", is_v2_compatible=True - ) - task_v1_not_compatible = await task_factory( - id=2, name="task_v1-good", source="source2", is_v2_compatible=False 
-    )
-    task_v2 = await task_factory_v2(id=3, name="task_v2", source="source3")
-    assert task_v1_compatible.is_v2_compatible is True
-    assert task_v1_not_compatible.is_v2_compatible is False
-
-    async with MockCurrentUser() as user:
-
-        project = await project_factory_v2(user)
-        workflow = await workflow_factory_v2(project_id=project.id)
-
-        # Fail adding TaskV1 with is_v2_compatible==False
-        res = await client.post(
-            f"{PREFIX}/project/{project.id}/workflow/{workflow.id}/wftask/"
-            f"?task_id={task_v1_not_compatible.id}",
-            json=dict(is_legacy_task=True),
-        )
-        assert res.status_code == 422
-        assert "compatible" in res.json()["detail"]
-
-        # Succeed adding TaskV1 with is_v2_compatible==True
-        res = await client.post(
-            f"{PREFIX}/project/{project.id}/workflow/{workflow.id}/wftask/"
-            f"?task_id={task_v1_compatible.id}",
-            json=dict(is_legacy_task=True),
-        )
-        assert res.status_code == 201
-
-        # Succeed adding TaskV2
-        res = await client.post(
-            f"{PREFIX}/project/{project.id}/workflow/{workflow.id}/wftask/"
-            f"?task_id={task_v2.id}",
-            json=dict(),
-        )
-        assert res.status_code == 201
-
-
 async def test_export_workflow_log(
     client,
     MockCurrentUser,
diff --git a/tests/v2/03_api/test_unit_aux_functions.py b/tests/v2/03_api/test_unit_aux_functions.py
index fd6d4feb6d..1250eb0d4e 100644
--- a/tests/v2/03_api/test_unit_aux_functions.py
+++ b/tests/v2/03_api/test_unit_aux_functions.py
@@ -270,6 +270,11 @@ async def test_get_job_check_owner(
         workflow = await workflow_factory_v2(project_id=project.id)

         t = await task_factory_v2()
+
+        with pytest.raises(ValueError):
+            await _workflow_insert_task(
+                workflow_id=workflow.id, task_id=9999, db=db
+            )
         await _workflow_insert_task(
             workflow_id=workflow.id, task_id=t.id, db=db
         )
diff --git a/tests/v2/04_runner/fixtures_mocks.py b/tests/v2/04_runner/fixtures_mocks.py
index 5544067ff6..3f3075fd1d 100644
--- a/tests/v2/04_runner/fixtures_mocks.py
+++ b/tests/v2/04_runner/fixtures_mocks.py
@@ -1,8 +1,4 @@
-import json
 import logging
-import os
-import sys
-from pathlib import Path

 import pytest
 from v2_mock_models import TaskV2Mock
@@ -47,62 +43,3 @@ def fractal_tasks_mock_no_db(
         task.name: TaskV2Mock(id=_id, **task.dict())
         for _id, task in enumerate(fractal_tasks_mock_collection["task_list"])
     }
-
-
-@pytest.fixture
-def fractal_tasks_mock_venv_legacy(testdata_path, tmp_path_factory) -> dict:
-    from v2_mock_models import TaskV1Mock
-
-    basetemp = tmp_path_factory.getbasetemp()
-    venv_name = "venv_fractal_tasks_core_alpha"
-    venv_path = (basetemp / venv_name).as_posix()
-    python_bin = (basetemp / venv_name / "bin/python").as_posix()
-
-    if not os.path.isdir(venv_path):
-        logging.debug(f"venv does not exists ({venv_path=})")
-        # Create venv
-        cmd = f"{sys.executable} -m venv {venv_path}"
-        _run_cmd(cmd=cmd, label="create-venv")
-        # Install fractal-tasks-core-alpha from wheel
-        wheel_file = (
-            testdata_path
-            / "dummy_package_with_args_schemas"
-            / "dist/fractal_tasks_core_alpha-0.0.1a0-py3-none-any.whl"
-        ).as_posix()
-        cmd = f"{python_bin} -m pip install {wheel_file}"
-        _run_cmd(cmd=cmd, label="install-fractal-tasks-core-alpha")
-    else:
-        logging.info("venv already exists")
-
-    # Extract installed-package folder
-    cmd = f"{python_bin} -m pip show fractal_tasks_core_alpha"
-    out = _run_cmd(cmd=cmd, label="extract-pkg-dir")
-    location = next(
-        line for line in out.split("\n") if line.startswith("Location:")
-    )
-    location = location.replace("Location: ", "")
-    src_dir = Path(location) / "fractal_tasks_core_alpha/"
-
-    # Construct TaskV1Mock objects, and store them as a key-value pairs
-    # (indexed by their names)
-    with (src_dir / "__FRACTAL_MANIFEST__.json").open("r") as f:
-        manifest = json.load(f)
-    task_dict = {}
-    for ind, task in enumerate(manifest["task_list"]):
-
-        more_attributes = {}
-        if task["name"] == "dummy parallel":
-            more_attributes["meta"] = {"parallelization_level": "image"}
-
-        task_path = (src_dir / task["executable"]).as_posix()
-        t = TaskV1Mock(
-            id=ind,
-            name=task["name"],
-            source=task["name"].replace(" ", "_"),
-            input_type="Any",
-            output_type="Any",
-            command=f"{python_bin} {task_path}",
-            **more_attributes,
-        )
-        task_dict[t.name] = t
-    return task_dict
diff --git a/tests/v2/04_runner/test_dummy_examples.py b/tests/v2/04_runner/test_dummy_examples.py
index 7c2122180b..b9f493f283 100644
--- a/tests/v2/04_runner/test_dummy_examples.py
+++ b/tests/v2/04_runner/test_dummy_examples.py
@@ -50,6 +50,9 @@ def test_dummy_insert_single_image(
         wf_task_list=[
             WorkflowTaskV2Mock(
                 task=fractal_tasks_mock_no_db["dummy_insert_single_image"],
+                task_id=fractal_tasks_mock_no_db[
+                    "dummy_insert_single_image"
+                ].id,
                 id=0,
                 order=0,
             )
@@ -64,6 +67,9 @@ def test_dummy_insert_single_image(
         wf_task_list=[
             WorkflowTaskV2Mock(
                 task=fractal_tasks_mock_no_db["dummy_insert_single_image"],
+                task_id=fractal_tasks_mock_no_db[
+                    "dummy_insert_single_image"
+                ].id,
                 id=1,
                 order=1,
             )
@@ -80,6 +86,9 @@ def test_dummy_insert_single_image(
         wf_task_list=[
             WorkflowTaskV2Mock(
                 task=fractal_tasks_mock_no_db["dummy_insert_single_image"],
+                task_id=fractal_tasks_mock_no_db[
+                    "dummy_insert_single_image"
+                ].id,
                 args_non_parallel={
                     "full_new_image": dict(
                         zarr_url=IMAGES[0]["zarr_url"], origin="/somewhere"
@@ -116,6 +125,7 @@ def test_dummy_insert_single_image(
         wf_task_list=[
             WorkflowTaskV2Mock(
                 task=PATCHED_TASK,
+                task_id=PATCHED_TASK.id,
                 args_non_parallel={"types": {KEY: True}},
                 id=2,
                 order=2,
@@ -140,6 +150,9 @@ def test_dummy_insert_single_image(
         wf_task_list=[
             WorkflowTaskV2Mock(
                 task=fractal_tasks_mock_no_db["dummy_insert_single_image"],
+                task_id=fractal_tasks_mock_no_db[
+                    "dummy_insert_single_image"
+                ].id,
                 args_non_parallel={"fail": True},
                 id=2,
                 order=2,
@@ -158,6 +171,9 @@ def test_dummy_insert_single_image(
         wf_task_list=[
             WorkflowTaskV2Mock(
                 task=fractal_tasks_mock_no_db["dummy_insert_single_image"],
+                task_id=fractal_tasks_mock_no_db[
+                    "dummy_insert_single_image"
+                ].id,
                 args_non_parallel={"fail_2": True},
                 id=3,
                 order=3,
@@ -194,6 +210,7 @@ def test_dummy_remove_images(
         wf_task_list=[
             WorkflowTaskV2Mock(
                 task=fractal_tasks_mock_no_db["dummy_remove_images"],
+                task_id=fractal_tasks_mock_no_db["dummy_remove_images"].id,
                 id=0,
                 order=0,
             )
@@ -213,6 +230,7 @@ def test_dummy_remove_images(
         wf_task_list=[
             WorkflowTaskV2Mock(
                 task=fractal_tasks_mock_no_db["dummy_remove_images"],
+                task_id=fractal_tasks_mock_no_db["dummy_remove_images"].id,
                 id=1,
                 order=1,
                 args_non_parallel=dict(
@@ -257,6 +275,7 @@ def test_dummy_unset_attribute(
         wf_task_list=[
             WorkflowTaskV2Mock(
                 task=fractal_tasks_mock_no_db["dummy_unset_attribute"],
+                task_id=fractal_tasks_mock_no_db["dummy_unset_attribute"].id,
                 args_non_parallel=dict(attribute="key2"),
                 id=0,
                 order=0,
@@ -273,6 +292,7 @@ def test_dummy_unset_attribute(
         wf_task_list=[
             WorkflowTaskV2Mock(
                 task=fractal_tasks_mock_no_db["dummy_unset_attribute"],
+                task_id=fractal_tasks_mock_no_db["dummy_unset_attribute"].id,
                 args_non_parallel=dict(attribute="missing-attribute"),
                 id=1,
                 order=1,
@@ -304,6 +324,9 @@ def test_dummy_insert_single_image_none_attribute(
         wf_task_list=[
             WorkflowTaskV2Mock(
task=fractal_tasks_mock_no_db["dummy_insert_single_image"], + task_id=fractal_tasks_mock_no_db[ + "dummy_insert_single_image" + ].id, args_non_parallel=dict(attributes={"attribute-name": None}), id=0, order=0, @@ -334,6 +357,9 @@ def test_dummy_insert_single_image_normalization( wf_task_list=[ WorkflowTaskV2Mock( task=fractal_tasks_mock_no_db["dummy_insert_single_image"], + task_id=fractal_tasks_mock_no_db[ + "dummy_insert_single_image" + ].id, id=0, order=0, args_non_parallel={"trailing_slash": True}, @@ -372,6 +398,7 @@ def test_default_inclusion_of_images( wf_task_list=[ WorkflowTaskV2Mock( task=fractal_tasks_mock_no_db["generic_task_parallel"], + task_id=fractal_tasks_mock_no_db["generic_task_parallel"].id, id=0, order=0, ) diff --git a/tests/v2/04_runner/test_fractal_examples.py b/tests/v2/04_runner/test_fractal_examples.py index 71abd604c2..ac90971130 100644 --- a/tests/v2/04_runner/test_fractal_examples.py +++ b/tests/v2/04_runner/test_fractal_examples.py @@ -14,7 +14,6 @@ from fractal_server.app.runner.exceptions import JobExecutionError from fractal_server.images import SingleImage from fractal_server.images.tools import find_image_by_zarr_url -from fractal_server.logger import set_logger def execute_tasks_v2(wf_task_list, workflow_dir_local, **kwargs): @@ -24,14 +23,9 @@ def execute_tasks_v2(wf_task_list, workflow_dir_local, **kwargs): ) for wftask in wf_task_list: - if wftask.task is not None: - subfolder = workflow_dir_local / task_subfolder_name( - order=wftask.order, task_name=wftask.task.name - ) - else: - subfolder = workflow_dir_local / task_subfolder_name( - order=wftask.order, task_name=wftask.task_legacy.name - ) + subfolder = workflow_dir_local / task_subfolder_name( + order=wftask.order, task_name=wftask.task.name + ) logging.info(f"Now creating {subfolder.as_posix()}") subfolder.mkdir(parents=True) @@ -85,6 +79,9 @@ def test_fractal_demos_01( wf_task_list=[ WorkflowTaskV2Mock( task=fractal_tasks_mock_no_db["create_ome_zarr_compound"], + task_id=fractal_tasks_mock_no_db[ + "create_ome_zarr_compound" + ].id, args_non_parallel=dict(image_dir="/tmp/input_images"), args_parallel={}, id=0, @@ -107,6 +104,7 @@ def test_fractal_demos_01( wf_task_list=[ WorkflowTaskV2Mock( task=fractal_tasks_mock_no_db["illumination_correction"], + task_id=fractal_tasks_mock_no_db["illumination_correction"].id, args_parallel=dict(overwrite_input=True), id=1, order=1, @@ -152,6 +150,7 @@ def test_fractal_demos_01( wf_task_list=[ WorkflowTaskV2Mock( task=fractal_tasks_mock_no_db["MIP_compound"], + task_id=fractal_tasks_mock_no_db["MIP_compound"].id, args_non_parallel=dict(suffix="mip"), args_parallel={}, id=2, @@ -196,6 +195,7 @@ def test_fractal_demos_01( wf_task_list=[ WorkflowTaskV2Mock( task=fractal_tasks_mock_no_db["cellpose_segmentation"], + task_id=fractal_tasks_mock_no_db["cellpose_segmentation"].id, args_parallel={}, id=3, order=3, @@ -234,6 +234,9 @@ def test_fractal_demos_01_no_overwrite( wf_task_list=[ WorkflowTaskV2Mock( task=fractal_tasks_mock_no_db["create_ome_zarr_compound"], + task_id=fractal_tasks_mock_no_db[ + "create_ome_zarr_compound" + ].id, args_non_parallel=dict(image_dir="/tmp/input_images"), id=0, order=0, @@ -253,6 +256,7 @@ def test_fractal_demos_01_no_overwrite( wf_task_list=[ WorkflowTaskV2Mock( task=fractal_tasks_mock_no_db["illumination_correction"], + task_id=fractal_tasks_mock_no_db["illumination_correction"].id, args_parallel=dict(overwrite_input=False), id=1, order=1, @@ -329,6 +333,7 @@ def test_fractal_demos_01_no_overwrite( wf_task_list=[ 
             WorkflowTaskV2Mock(
                 task=fractal_tasks_mock_no_db["MIP_compound"],
+                task_id=fractal_tasks_mock_no_db["MIP_compound"].id,
                 args_non_parallel=dict(suffix="mip"),
                 id=2,
                 order=2,
@@ -394,6 +399,7 @@ def test_fractal_demos_01_no_overwrite(
         wf_task_list=[
             WorkflowTaskV2Mock(
                 task=fractal_tasks_mock_no_db["cellpose_segmentation"],
+                task_id=fractal_tasks_mock_no_db["cellpose_segmentation"].id,
                 id=3,
                 order=3,
             )
@@ -428,6 +434,9 @@ def test_registration_no_overwrite(
                 task=fractal_tasks_mock_no_db[
                     "create_ome_zarr_multiplex_compound"
                 ],
+                task_id=fractal_tasks_mock_no_db[
+                    "create_ome_zarr_multiplex_compound"
+                ].id,
                 args_non_parallel=dict(image_dir="/tmp/input_images"),
                 id=0,
                 order=0,
@@ -444,6 +453,9 @@ def test_registration_no_overwrite(
                 task=fractal_tasks_mock_no_db[
                     "calculate_registration_compound"
                 ],
+                task_id=fractal_tasks_mock_no_db[
+                    "calculate_registration_compound"
+                ].id,
                 args_non_parallel={"ref_acquisition": 0},
                 id=1,
                 order=1,
@@ -469,6 +481,9 @@ def test_registration_no_overwrite(
         wf_task_list=[
             WorkflowTaskV2Mock(
                 task=fractal_tasks_mock_no_db["find_registration_consensus"],
+                task_id=fractal_tasks_mock_no_db[
+                    "find_registration_consensus"
+                ].id,
                 id=2,
                 order=2,
             )
@@ -491,6 +506,9 @@ def test_registration_no_overwrite(
         wf_task_list=[
             WorkflowTaskV2Mock(
                 task=fractal_tasks_mock_no_db["apply_registration_to_image"],
+                task_id=fractal_tasks_mock_no_db[
+                    "apply_registration_to_image"
+                ].id,
                 args_parallel={"overwrite_input": False},
                 id=3,
                 order=3,
@@ -526,6 +544,9 @@ def test_registration_overwrite(
                 task=fractal_tasks_mock_no_db[
                     "create_ome_zarr_multiplex_compound"
                 ],
+                task_id=fractal_tasks_mock_no_db[
+                    "create_ome_zarr_multiplex_compound"
+                ].id,
                 args_non_parallel=dict(image_dir="/tmp/input_images"),
                 id=0,
                 order=0,
@@ -542,6 +563,9 @@ def test_registration_overwrite(
                 task=fractal_tasks_mock_no_db[
                     "calculate_registration_compound"
                 ],
+                task_id=fractal_tasks_mock_no_db[
+                    "calculate_registration_compound"
+                ].id,
                 args_non_parallel={"ref_acquisition": 0},
                 order=1,
                 id=1,
@@ -567,6 +591,9 @@ def test_registration_overwrite(
         wf_task_list=[
             WorkflowTaskV2Mock(
                 task=fractal_tasks_mock_no_db["find_registration_consensus"],
+                task_id=fractal_tasks_mock_no_db[
+                    "find_registration_consensus"
+                ].id,
                 id=2,
                 order=2,
             )
@@ -589,6 +616,9 @@ def test_registration_overwrite(
         wf_task_list=[
             WorkflowTaskV2Mock(
                 task=fractal_tasks_mock_no_db["apply_registration_to_image"],
+                task_id=fractal_tasks_mock_no_db[
+                    "apply_registration_to_image"
+                ].id,
                 args_parallel={"overwrite_input": True},
                 id=3,
                 order=3,
@@ -621,6 +651,9 @@ def test_channel_parallelization_with_overwrite(
         wf_task_list=[
             WorkflowTaskV2Mock(
                 task=fractal_tasks_mock_no_db["create_ome_zarr_compound"],
+                task_id=fractal_tasks_mock_no_db[
+                    "create_ome_zarr_compound"
+                ].id,
                 args_non_parallel=dict(image_dir="/tmp/input_images"),
                 id=0,
                 order=0,
@@ -637,6 +670,9 @@ def test_channel_parallelization_with_overwrite(
                 task=fractal_tasks_mock_no_db[
                     "illumination_correction_compound"
                 ],
+                task_id=fractal_tasks_mock_no_db[
+                    "illumination_correction_compound"
+                ].id,
                 args_non_parallel=dict(overwrite_input=True),
                 args_parallel=dict(another_argument="something"),
                 id=1,
@@ -668,6 +704,9 @@ def test_channel_parallelization_no_overwrite(
         wf_task_list=[
             WorkflowTaskV2Mock(
                 task=fractal_tasks_mock_no_db["create_ome_zarr_compound"],
+                task_id=fractal_tasks_mock_no_db[
+                    "create_ome_zarr_compound"
+                ].id,
                 args_non_parallel=dict(image_dir="/tmp/input_images"),
                 id=0,
                 order=0,
@@ -684,6 +723,9 @@ def test_channel_parallelization_no_overwrite(
task=fractal_tasks_mock_no_db[ "illumination_correction_compound" ], + task_id=fractal_tasks_mock_no_db[ + "illumination_correction_compound" + ].id, args_non_parallel=dict(overwrite_input=False), args_parallel=dict(another_argument="something"), id=1, @@ -730,6 +772,7 @@ def test_invalid_filtered_image_list( type="non_parallel", input_types=dict(invalid=True), ), + task_id=0, id=0, order=0, ) @@ -740,78 +783,3 @@ def test_invalid_filtered_image_list( **execute_tasks_v2_args, ) assert "Invalid filtered image list" in str(e.value) - - -def test_legacy_task( - tmp_path: Path, - executor: Executor, - fractal_tasks_mock_no_db, - fractal_tasks_mock_venv_legacy, - override_settings_factory, -): - """ - Run a workflow that includes V2 and V1 tasks. - """ - - # Set up logger - override_settings_factory(FRACTAL_LOGGING_LEVEL=logging.INFO) - logger_name = "test_legacy_task" - log_file_path = (tmp_path / "log").as_posix() - set_logger( - logger_name=logger_name, - log_file_path=log_file_path, - ) - - execute_tasks_v2_args = dict( - executor=executor, - workflow_dir_local=tmp_path / "job_dir", - workflow_dir_remote=tmp_path / "job_dir", - logger_name=logger_name, - ) - - zarr_dir = (tmp_path / "zarr_dir").as_posix().rstrip("/") - dataset_attrs = execute_tasks_v2( - wf_task_list=[ - WorkflowTaskV2Mock( - task=fractal_tasks_mock_no_db["create_ome_zarr_compound"], - args_non_parallel=dict(image_dir="/tmp/input_images"), - id=0, - order=0, - ) - ], - dataset=DatasetV2Mock(name="dataset", zarr_dir=zarr_dir), - **execute_tasks_v2_args, - ) - - assert _task_names_from_history(dataset_attrs["history"]) == [ - "create_ome_zarr_compound" - ] - assert dataset_attrs["filters"]["attributes"] == {} - assert dataset_attrs["filters"]["types"] == {} - _assert_image_data_exist(dataset_attrs["images"]) - assert len(dataset_attrs["images"]) == 2 - - dataset_attrs = execute_tasks_v2( - wf_task_list=[ - WorkflowTaskV2Mock( - task_legacy=fractal_tasks_mock_venv_legacy["dummy parallel"], - is_legacy_task=True, - id=1, - order=1, - ) - ], - dataset=DatasetV2Mock( - name="dataset", zarr_dir=zarr_dir, **dataset_attrs - ), - **execute_tasks_v2_args, - ) - - assert len(dataset_attrs["history"]) == 1 - assert dataset_attrs["history"][0]["status"] == "done" - assert dataset_attrs["history"][0]["workflowtask"]["is_legacy_task"] - - # Check logs - with open(log_file_path, "r") as f: - lines = f.read() - assert 'SUBMIT 1-th task (legacy, name="dummy parallel")' in lines - assert 'END 1-th task (name="dummy parallel")' in lines diff --git a/tests/v2/04_runner/test_no_images_parallelization.py b/tests/v2/04_runner/test_no_images_parallelization.py index 8d9d3a3866..b744176cc6 100644 --- a/tests/v2/04_runner/test_no_images_parallelization.py +++ b/tests/v2/04_runner/test_no_images_parallelization.py @@ -15,14 +15,9 @@ def execute_tasks_v2(wf_task_list, workflow_dir_local, **kwargs): ) for wftask in wf_task_list: - if wftask.task is not None: - subfolder = workflow_dir_local / task_subfolder_name( - order=wftask.order, task_name=wftask.task.name - ) - else: - subfolder = workflow_dir_local / task_subfolder_name( - order=wftask.order, task_name=wftask.task_legacy.name - ) + subfolder = workflow_dir_local / task_subfolder_name( + order=wftask.order, task_name=wftask.task.name + ) logging.info(f"Now creating {subfolder.as_posix()}") subfolder.mkdir(parents=True) @@ -57,6 +52,7 @@ def test_parallelize_on_no_images(tmp_path: Path, executor: Executor): id=0, source="source", ), + task_id=0, id=0, order=0, ) @@ -91,6 +87,7 @@ def 
test_parallelize_on_no_images_compound(tmp_path: Path, executor: Executor): id=0, source="source", ), + task_id=0, id=0, order=0, ) diff --git a/tests/v2/04_runner/test_unit_aux_functions_v2.py b/tests/v2/04_runner/test_unit_aux_functions_v2.py index f7fd33de1f..f3407fe560 100644 --- a/tests/v2/04_runner/test_unit_aux_functions_v2.py +++ b/tests/v2/04_runner/test_unit_aux_functions_v2.py @@ -1,5 +1,3 @@ -from pathlib import Path - import pytest from devtools import debug from pydantic import ValidationError @@ -14,7 +12,6 @@ ) from fractal_server.app.runner.v2.task_interface import InitArgsModel from fractal_server.app.runner.v2.task_interface import TaskOutput -from fractal_server.app.runner.v2.v1_compat import convert_v2_args_into_v1 def test_deduplicate_list_of_dicts(): @@ -69,58 +66,6 @@ def test_task_output(): TaskOutput(image_list_removals=["http://url.json"]) -def test_convert_v2_args_into_v1(tmp_path: Path): - kwargs_v2 = dict( - zarr_url=(tmp_path / "input_path/plate.zarr/B/03/0").as_posix(), - something="else", - metadata="this will be overwritten", - component="this will be overwritten", - ) - - # Image - kwargs_v1 = convert_v2_args_into_v1( - kwargs_v2, parallelization_level="image" - ) - PATH = (tmp_path / "input_path").as_posix() - assert kwargs_v1 == { - "input_paths": [PATH], - "output_path": PATH, - "metadata": {}, - "component": "plate.zarr/B/03/0", - "something": "else", - } - - # Well - kwargs_v1 = convert_v2_args_into_v1( - kwargs_v2, parallelization_level="well" - ) - PATH = (tmp_path / "input_path").as_posix() - assert kwargs_v1 == { - "input_paths": [PATH], - "output_path": PATH, - "metadata": {}, - "component": "plate.zarr/B/03", - "something": "else", - } - - # Plate - kwargs_v1 = convert_v2_args_into_v1( - kwargs_v2, parallelization_level="plate" - ) - PATH = (tmp_path / "input_path").as_posix() - assert kwargs_v1 == { - "input_paths": [PATH], - "output_path": PATH, - "metadata": {}, - "component": "plate.zarr", - "something": "else", - } - - # None - with pytest.raises(ValueError): - convert_v2_args_into_v1(kwargs_v2, parallelization_level=None) - - def test_cast_and_validate_functions(): _cast_and_validate_TaskOutput( diff --git a/tests/v2/04_runner/v2_mock_models.py b/tests/v2/04_runner/v2_mock_models.py index 21135ab763..9275e40d36 100644 --- a/tests/v2/04_runner/v2_mock_models.py +++ b/tests/v2/04_runner/v2_mock_models.py @@ -69,59 +69,19 @@ def _set_type(cls, value, values): return "compound" -class TaskV1Mock(BaseModel): - id: int - name: str - command: str # str - source: str = Field(unique=True) - input_type: str - output_type: str - meta: Optional[dict[str, Any]] = Field(default_factory=dict) - - @property - def parallelization_level(self) -> Optional[str]: - try: - return self.meta["parallelization_level"] - except KeyError: - return None - - @property - def is_parallel(self) -> bool: - return bool(self.parallelization_level) - - class WorkflowTaskV2Mock(BaseModel): args_non_parallel: dict[str, Any] = Field(default_factory=dict) args_parallel: dict[str, Any] = Field(default_factory=dict) meta_non_parallel: dict[str, Any] = Field(default_factory=dict) meta_parallel: dict[str, Any] = Field(default_factory=dict) - is_legacy_task: Optional[bool] meta_parallel: Optional[dict[str, Any]] = Field() meta_non_parallel: Optional[dict[str, Any]] = Field() - task: Optional[TaskV2Mock] = None - task_legacy: Optional[TaskV1Mock] = None - is_legacy_task: bool = False + task: TaskV2Mock input_filters: dict[str, Any] = Field(default_factory=dict) order: int id: int 
     workflow_id: int = 0
-    task_legacy_id: Optional[int]
-    task_id: Optional[int]
-
-    @root_validator(pre=False)
-    def _legacy_or_not(cls, values):
-        is_legacy_task = values["is_legacy_task"]
-        task = values.get("task")
-        task_legacy = values.get("task_legacy")
-        if is_legacy_task:
-            if task_legacy is None or task is not None:
-                raise ValueError(f"Invalid WorkflowTaskV2Mock with {values=}")
-            values["task_legacy_id"] = task_legacy.id
-        else:
-            if task is None or task_legacy is not None:
-                raise ValueError(f"Invalid WorkflowTaskV2Mock with {values=}")
-            values["task_id"] = task.id
-        return values
+    task_id: int

     @validator("input_filters", always=True)
     def _default_filters(cls, value):
diff --git a/tests/v2/08_full_workflow/common_functions.py b/tests/v2/08_full_workflow/common_functions.py
index e61d3788f6..f764bf0283 100644
--- a/tests/v2/08_full_workflow/common_functions.py
+++ b/tests/v2/08_full_workflow/common_functions.py
@@ -395,7 +395,6 @@ async def failing_workflow_UnknownError(
     MockCurrentUser,
     client,
     monkeypatch,
-    legacy,
     project_factory_v2,
     dataset_factory_v2,
     workflow_factory_v2,
@@ -423,18 +422,14 @@ async def failing_workflow_UnknownError(
     workflow_id = workflow.id

     # Create task
-    if legacy:
-        task = await task_factory(command="echo", is_v2_compatible=True)
-    else:
-        task = await task_factory_v2(
-            command_non_parallel="echo", type="non_parallel"
-        )
+    task = await task_factory_v2(
+        command_non_parallel="echo", type="non_parallel"
+    )

-    payload = dict(is_legacy_task=legacy)
     res = await client.post(
         f"{PREFIX}/project/{project_id}/workflow/{workflow_id}/wftask/"
         f"?task_id={task.id}",
-        json=payload,
+        json={},
     )
     assert res.status_code == 201
     workflow_task_id = res.json()["id"]
@@ -448,18 +443,11 @@ async def failing_workflow_UnknownError(
     def _raise_RuntimeError(*args, **kwargs):
         raise RuntimeError(ERROR_MSG)

-    if legacy:
-        monkeypatch.setattr(
-            fractal_server.app.runner.v2.runner,
-            "run_v1_task_parallel",
-            _raise_RuntimeError,
-        )
-    else:
-        monkeypatch.setattr(
-            fractal_server.app.runner.v2.runner,
-            "run_v2_task_non_parallel",
-            _raise_RuntimeError,
-        )
+    monkeypatch.setattr(
+        fractal_server.app.runner.v2.runner,
+        "run_v2_task_non_parallel",
+        _raise_RuntimeError,
+    )

     # EXECUTE WORKFLOW
     res = await client.post(
diff --git a/tests/v2/08_full_workflow/test_full_workflow_local_experimental_v2.py b/tests/v2/08_full_workflow/test_full_workflow_local_experimental_v2.py
index 515700ca82..f4ad722352 100644
--- a/tests/v2/08_full_workflow/test_full_workflow_local_experimental_v2.py
+++ b/tests/v2/08_full_workflow/test_full_workflow_local_experimental_v2.py
@@ -1,4 +1,3 @@
-import pytest
 from common_functions import failing_workflow_UnknownError
 from common_functions import full_workflow
 from common_functions import full_workflow_TaskExecutionError
@@ -97,9 +96,7 @@ async def test_non_executable_task_command_local(
     )


-@pytest.mark.parametrize("legacy", [False, True])
 async def test_failing_workflow_UnknownError_local(
-    legacy: bool,
     client,
     MockCurrentUser,
     tmp777_path,
@@ -124,7 +121,6 @@ async def test_failing_workflow_UnknownError_local(
         MockCurrentUser=MockCurrentUser,
         client=client,
         monkeypatch=monkeypatch,
-        legacy=legacy,
         project_factory_v2=project_factory_v2,
         dataset_factory_v2=dataset_factory_v2,
         workflow_factory_v2=workflow_factory_v2,
diff --git a/tests/v2/08_full_workflow/test_full_workflow_local_v2.py b/tests/v2/08_full_workflow/test_full_workflow_local_v2.py
index 1c2672e430..18d6293989 100644
--- a/tests/v2/08_full_workflow/test_full_workflow_local_v2.py
+++ b/tests/v2/08_full_workflow/test_full_workflow_local_v2.py
@@ -1,4 +1,3 @@
-import pytest
 from common_functions import failing_workflow_UnknownError
 from common_functions import full_workflow
 from common_functions import full_workflow_TaskExecutionError
@@ -97,9 +96,7 @@ async def test_non_executable_task_command_local(
     )


-@pytest.mark.parametrize("legacy", [False, True])
 async def test_failing_workflow_UnknownError_local(
-    legacy: bool,
     client,
     MockCurrentUser,
     tmp777_path,
@@ -124,7 +121,6 @@ async def test_failing_workflow_UnknownError_local(
         MockCurrentUser=MockCurrentUser,
         client=client,
         monkeypatch=monkeypatch,
-        legacy=legacy,
         project_factory_v2=project_factory_v2,
         dataset_factory_v2=dataset_factory_v2,
         workflow_factory_v2=workflow_factory_v2,
diff --git a/tests/v2/08_full_workflow/test_full_workflow_slurm_v2.py b/tests/v2/08_full_workflow/test_full_workflow_slurm_v2.py
index 45ba68463d..1935e20826 100644
--- a/tests/v2/08_full_workflow/test_full_workflow_slurm_v2.py
+++ b/tests/v2/08_full_workflow/test_full_workflow_slurm_v2.py
@@ -1,7 +1,6 @@
 import shlex
 import subprocess

-import pytest
 from common_functions import failing_workflow_UnknownError
 from common_functions import full_workflow
 from common_functions import full_workflow_TaskExecutionError
@@ -254,9 +253,7 @@ async def test_non_executable_task_command_slurm(
     )


-@pytest.mark.parametrize("legacy", [False, True])
 async def test_failing_workflow_UnknownError_slurm(
-    legacy: bool,
     client,
     MockCurrentUser,
     testdata_path,
@@ -288,7 +285,6 @@ async def test_failing_workflow_UnknownError_slurm(
         user_kwargs={"cache_dir": str(tmp777_path / "user_cache_dir-slurm")},
         client=client,
         monkeypatch=monkeypatch,
-        legacy=legacy,
         project_factory_v2=project_factory_v2,
         dataset_factory_v2=dataset_factory_v2,
         workflow_factory_v2=workflow_factory_v2,
diff --git a/tests/v2/09_backends/test_slurm_config.py b/tests/v2/09_backends/test_slurm_config.py
index bec3d893cf..601977c032 100644
--- a/tests/v2/09_backends/test_slurm_config.py
+++ b/tests/v2/09_backends/test_slurm_config.py
@@ -50,56 +50,29 @@ class WorkflowTaskV2Mock(BaseModel, extra=Extra.forbid):
     args_parallel: dict[str, Any] = Field(default_factory=dict)
     meta_non_parallel: dict[str, Any] = Field(default_factory=dict)
     meta_parallel: dict[str, Any] = Field(default_factory=dict)
-    is_legacy_task: Optional[bool]
     meta_parallel: Optional[dict[str, Any]] = Field()
     meta_non_parallel: Optional[dict[str, Any]] = Field()
-    task: Optional[TaskV2Mock] = None
-    task_legacy: Optional[TaskV1Mock] = None
-    is_legacy_task: bool = False
+    task: TaskV2Mock
     input_filters: dict[str, Any] = Field(default_factory=dict)
     order: int = 0
     id: int = 1
     workflow_id: int = 0
-    task_legacy_id: Optional[int]
-    task_id: Optional[int]
-
-    @root_validator(pre=False)
-    def _legacy_or_not(cls, values):
-        is_legacy_task = values["is_legacy_task"]
-        task = values.get("task")
-        task_legacy = values.get("task_legacy")
-        if is_legacy_task:
-            if task_legacy is None or task is not None:
-                raise ValueError(f"Invalid WorkflowTaskV2Mock with {values=}")
-            values["task_legacy_id"] = task_legacy.id
-        else:
-            if task is None or task_legacy is not None:
-                raise ValueError(f"Invalid WorkflowTaskV2Mock with {values=}")
-            values["task_id"] = task.id
-        return values
+    task_id: int

     @root_validator(pre=False)
     def merge_meta(cls, values):
-        if values["is_legacy_task"]:
-            task_meta = values["task"].meta
-            if task_meta:
-                values["meta"] = {
-                    **task_meta,
-                    **values["meta"],
-                }
-        else:
-            task_meta_parallel = values["task"].meta_parallel
-            if task_meta_parallel:
-                values["meta_parallel"] = {
-                    **task_meta_parallel,
-                    **values["meta_parallel"],
-                }
-            task_meta_non_parallel = values["task"].meta_non_parallel
-            if task_meta_non_parallel:
-                values["meta_non_parallel"] = {
-                    **task_meta_non_parallel,
-                    **values["meta_non_parallel"],
-                }
+        task_meta_parallel = values["task"].meta_parallel
+        if task_meta_parallel:
+            values["meta_parallel"] = {
+                **task_meta_parallel,
+                **values["meta_parallel"],
+            }
+        task_meta_non_parallel = values["task"].meta_non_parallel
+        if task_meta_non_parallel:
+            values["meta_non_parallel"] = {
+                **task_meta_non_parallel,
+                **values["meta_non_parallel"],
+            }
         return values

@@ -180,6 +153,7 @@ def test_get_slurm_config(tmp_path: Path):
     )
     mywftask = WorkflowTaskV2Mock(
         task=mytask,
+        task_id=mytask.id,
         args_non_parallel=dict(message="test"),
         meta_non_parallel=meta_non_parallel,
     )
@@ -241,6 +215,7 @@ def test_get_slurm_config_fail(tmp_path):
         get_slurm_config(
             wftask=WorkflowTaskV2Mock(
                 task=TaskV2Mock(),
+                task_id=TaskV2Mock().id,
                 meta_non_parallel={},
             ),
             config_path=config_path_valid,
@@ -258,6 +233,7 @@ def test_get_slurm_config_fail(tmp_path):
         get_slurm_config(
             wftask=WorkflowTaskV2Mock(
                 task=TaskV2Mock(),
+                task_id=TaskV2Mock().id,
                 meta_non_parallel={},
             ),
             config_path=config_path_invalid,
@@ -321,6 +297,7 @@ def test_get_slurm_config_wftask_meta_none(tmp_path):
     )
     mywftask = WorkflowTaskV2Mock(
         task=TaskV2Mock(meta_non_parallel=None),
+        task_id=TaskV2Mock(meta_non_parallel=None).id,
         args_non_parallel=dict(message="test"),
         meta_non_parallel=meta_non_parallel,
     )
@@ -362,7 +339,7 @@ def test_slurm_submit_setup(
     )

     # No account in `wftask.meta` --> OK
-    wftask = WorkflowTaskV2Mock(task=TaskV2Mock())
+    wftask = WorkflowTaskV2Mock(task=TaskV2Mock(), task_id=TaskV2Mock().id)
     slurm_config = _slurm_submit_setup(
         wftask=wftask,
         workflow_dir_local=tmp_path,
@@ -376,6 +353,7 @@ def test_slurm_submit_setup(
     wftask = WorkflowTaskV2Mock(
         meta_non_parallel=dict(key="value", account="MyFakeAccount"),
         task=TaskV2Mock(),
+        task_id=TaskV2Mock().id,
     )
     with pytest.raises(SlurmConfigError) as e:
         _slurm_submit_setup(
@@ -425,7 +403,7 @@ def test_get_slurm_config_gpu_options(tmp_path: Path):
         json.dump(slurm_config_dict, f)

     # In absence of `needs_gpu`, parameters in `gpu_slurm_config` are not used
-    mywftask = WorkflowTaskV2Mock(task=TaskV2Mock())
+    mywftask = WorkflowTaskV2Mock(task=TaskV2Mock(), task_id=TaskV2Mock().id)
     slurm_config = get_slurm_config(
         wftask=mywftask,
         config_path=config_path,
@@ -437,7 +415,9 @@ def test_get_slurm_config_gpu_options(tmp_path: Path):
     )
     # When `needs_gpu` is set, parameters in `gpu_slurm_config` are used
     mywftask = WorkflowTaskV2Mock(
-        meta_non_parallel=dict(needs_gpu=True), task=TaskV2Mock()
+        meta_non_parallel=dict(needs_gpu=True),
+        task=TaskV2Mock(),
+        task_id=TaskV2Mock().id,
     )
     slurm_config = get_slurm_config(
         wftask=mywftask,