Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate use of v1 tasks within v2 workflows #1721

Merged
Merged
Show file tree
Hide file tree
Changes from 48 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
b530867
Deprecate legacy tasks in V2 workflows - first commit
tcompa Sep 3, 2024
4893927
Add db migration script
tcompa Sep 3, 2024
643ca87
Update `scripts/validate_db_data_with_read_schemas.py`
tcompa Sep 3, 2024
8e27eef
Update benchmarks/runner/mocks.py
tcompa Sep 3, 2024
218ff12
Do not test order of `listdir` output
tcompa Sep 3, 2024
0053a38
Add FIXMEs
tcompa Sep 3, 2024
9601cfe
Adapt tests after removing legacy tasks from V2 workflows
tcompa Sep 3, 2024
f8b7ae4
Make non-legacy workflowtaskv2 task attributes non-optional
tcompa Sep 3, 2024
2e0588f
Update db migration script
tcompa Sep 3, 2024
b31d9d2
Remove `Task.is_v2_compatible` and task-legacy endpoints
tcompa Sep 3, 2024
4afa118
Update tests
tcompa Sep 3, 2024
e2da880
Merge branch 'main' into 1712-on-hold-deprecate-use-of-v1-tasks-withi…
tcompa Sep 3, 2024
d9e97b4
Keep removing legacy-task-related tests
tcompa Sep 3, 2024
6c88926
Merge branch 'main' into 1712-on-hold-deprecate-use-of-v1-tasks-withi…
tcompa Sep 3, 2024
9df9c85
Add required `task_id` attribute in tests
tcompa Sep 3, 2024
a4eebf1
Update test_unit_schemas_v2.py
tcompa Sep 3, 2024
9ec7b6d
Merge branch 'main' into 1712-on-hold-deprecate-use-of-v1-tasks-withi…
ychiucco Sep 3, 2024
216949d
Fix some more WorkflowTaskV2Mock required attributes in tests
tcompa Sep 3, 2024
5901a2e
Merge branch '1712-on-hold-deprecate-use-of-v1-tasks-within-v2-workfl…
tcompa Sep 3, 2024
650df6a
Merge two migration scripts
tcompa Sep 4, 2024
a9644d5
Make migration script work for sqlite (ref #1722)
tcompa Sep 4, 2024
14ac70b
Merge branch 'main' into 1712-on-hold-deprecate-use-of-v1-tasks-withi…
tcompa Sep 4, 2024
5efb607
Change sqlite-motivated order of migration script statements
tcompa Sep 4, 2024
c8b38d8
Do not explicitly drop foreign-key constraint, since we are already d…
tcompa Sep 4, 2024
da6443c
Change sqlite-motivated order of migration script statements
tcompa Sep 4, 2024
1edff7b
Add info about current sqlite version to warning message
tcompa Sep 4, 2024
defd53d
Merge branch 'main' into 1712-on-hold-deprecate-use-of-v1-tasks-withi…
tcompa Sep 5, 2024
905cb95
add _type to migration
ychiucco Sep 9, 2024
def9f1c
avoit drop_constraint in sqlite
ychiucco Sep 9, 2024
02c13a8
move check up
ychiucco Sep 9, 2024
5f9848c
move check up again
ychiucco Sep 9, 2024
1cbe911
render_as_batch=True
ychiucco Sep 9, 2024
7a79d89
autogenerated review
ychiucco Sep 9, 2024
7b834d6
if branch in migration
ychiucco Sep 9, 2024
883ddee
NAMING_CONVENTION in separate file
ychiucco Sep 9, 2024
978429b
add one missing piece to migration
ychiucco Sep 9, 2024
2f16169
remove index
ychiucco Sep 9, 2024
de47a46
if branch
ychiucco Sep 9, 2024
146e31c
check for index
ychiucco Sep 9, 2024
0c6e25d
Merge branch 'main' into 1712-on-hold-deprecate-use-of-v1-tasks-withi…
ychiucco Sep 10, 2024
8330543
fix revision order
ychiucco Sep 10, 2024
a3f0690
Literal for WorkflowTaskV2.task_type
ychiucco Sep 10, 2024
e233355
rollback Literal
ychiucco Sep 10, 2024
80022a6
Update logs
tcompa Sep 10, 2024
e680495
Update migration name
tcompa Sep 10, 2024
82f395a
Update `remove_link_between_v1_and_v2_tasks` migration script for sqlite
tcompa Sep 10, 2024
0095026
Re-order migration statements
tcompa Sep 10, 2024
41fbc2c
Merge branch 'main' into 1712-on-hold-deprecate-use-of-v1-tasks-withi…
tcompa Sep 10, 2024
9a9d2b6
Clean up old explanation comment
tcompa Sep 11, 2024
e28d9ef
Update CHANGELOG [skip ci]
tcompa Sep 11, 2024
b1acbef
Merge branch 'main' into 1712-on-hold-deprecate-use-of-v1-tasks-withi…
tcompa Sep 11, 2024
32a1045
Merge branch 'main' into 1712-on-hold-deprecate-use-of-v1-tasks-withi…
tcompa Sep 11, 2024
7c81fbd
Merge branch 'main' into 1712-on-hold-deprecate-use-of-v1-tasks-withi…
tcompa Sep 13, 2024
d973896
Update CHANGELOG.md [skip ci]
tcompa Sep 13, 2024
e49247d
Update __main__.py
tcompa Sep 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 2 additions & 42 deletions benchmarks/runner/mocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,59 +69,19 @@ def _set_type(cls, value, values):
return "compound"


class TaskV1Mock(BaseModel):
id: int
name: str
command: str # str
source: str = Field(unique=True)
input_type: str
output_type: str
meta: Optional[dict[str, Any]] = Field(default_factory=dict)

@property
def parallelization_level(self) -> Optional[str]:
try:
return self.meta["parallelization_level"]
except KeyError:
return None

@property
def is_parallel(self) -> bool:
return bool(self.parallelization_level)


class WorkflowTaskV2Mock(BaseModel):
args_non_parallel: dict[str, Any] = Field(default_factory=dict)
args_parallel: dict[str, Any] = Field(default_factory=dict)
meta_non_parallel: dict[str, Any] = Field(default_factory=dict)
meta_parallel: dict[str, Any] = Field(default_factory=dict)
is_legacy_task: Optional[bool]
meta_parallel: Optional[dict[str, Any]] = Field()
meta_non_parallel: Optional[dict[str, Any]] = Field()
task: Optional[TaskV2Mock] = None
task_legacy: Optional[TaskV1Mock] = None
is_legacy_task: bool = False
task: TaskV2Mock = None
input_filters: dict[str, Any] = Field(default_factory=dict)
order: int
id: int
workflow_id: int = 0
task_legacy_id: Optional[int]
task_id: Optional[int]

@root_validator(pre=False)
def _legacy_or_not(cls, values):
is_legacy_task = values["is_legacy_task"]
task = values.get("task")
task_legacy = values.get("task_legacy")
if is_legacy_task:
if task_legacy is None or task is not None:
raise ValueError(f"Invalid WorkflowTaskV2Mock with {values=}")
values["task_legacy_id"] = task_legacy.id
else:
if task is None or task_legacy is not None:
raise ValueError(f"Invalid WorkflowTaskV2Mock with {values=}")
values["task_id"] = task.id
return values
task_id: int

@validator("input_filters", always=True)
def _default_filters(cls, value):
Expand Down
6 changes: 3 additions & 3 deletions fractal_server/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ def set_db():
alembic_args = ["-c", alembic_ini.as_posix(), "upgrade", "head"]
print(f"START: Run alembic.config, with argv={alembic_args}")
alembic.config.main(argv=alembic_args)
print("END: alembic.config")
print("END: alembic.config\n")
# Insert default group
print("START: Default group creation")
_create_first_group()
print("END: Default group creation")
print("END: Default group creation\n")
# NOTE: It will be fixed with #1739
settings = Inject(get_settings)
print("START: First user creation")
Expand All @@ -103,7 +103,7 @@ def set_db():
is_verified=True,
)
)
print("END: First user creation")
print("END: First user creation\n")


def update_db_data():
Expand Down
5 changes: 4 additions & 1 deletion fractal_server/app/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
`db` module, loosely adapted from
https://testdriven.io/blog/fastapi-sqlmodel/#async-sqlmodel
"""
import sqlite3
from typing import AsyncGenerator
from typing import Generator

Expand All @@ -21,7 +22,9 @@
logger = set_logger(__name__)

SQLITE_WARNING_MESSAGE = (
"SQLite is supported (for version >=3.37) but discouraged in production. "
"SQLite is supported (supported version >=3.37, "
f"current {sqlite3.sqlite_version=}) "
"but discouraged in production. "
"Given its partial support for ForeignKey constraints, "
"database consistency cannot be guaranteed."
)
Expand Down
5 changes: 0 additions & 5 deletions fractal_server/app/models/v1/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from pydantic import HttpUrl
from sqlalchemy import Column
from sqlalchemy import sql
from sqlalchemy.types import JSON
from sqlmodel import Field
from sqlmodel import SQLModel
Expand Down Expand Up @@ -49,10 +48,6 @@ class Task(_TaskBaseV1, SQLModel, table=True):
docs_info: Optional[str] = None
docs_link: Optional[HttpUrl] = None

is_v2_compatible: bool = Field(
default=False, sa_column_kwargs={"server_default": sql.false()}
)

@property
def parallelization_level(self) -> Optional[str]:
try:
Expand Down
12 changes: 2 additions & 10 deletions fractal_server/app/models/v2/workflowtask.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from sqlmodel import Relationship
from sqlmodel import SQLModel

from ..v1.task import Task
from .task import TaskV2


Expand Down Expand Up @@ -37,13 +36,6 @@ class Config:
)

# Task
is_legacy_task: bool
task_type: str
task_id: Optional[int] = Field(foreign_key="taskv2.id")
task: Optional[TaskV2] = Relationship(
sa_relationship_kwargs=dict(lazy="selectin")
)
task_legacy_id: Optional[int] = Field(foreign_key="task.id")
task_legacy: Optional[Task] = Relationship(
sa_relationship_kwargs=dict(lazy="selectin")
)
task_id: int = Field(foreign_key="taskv2.id")
task: TaskV2 = Relationship(sa_relationship_kwargs=dict(lazy="selectin"))
30 changes: 0 additions & 30 deletions fractal_server/app/routes/admin/v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from ....zip_tools import _zip_folder_to_byte_stream_iterator
from ...db import AsyncSession
from ...db import get_async_db
from ...models.v1 import Task
from ...models.v2 import JobV2
from ...models.v2 import ProjectV2
from ...models.v2 import TaskV2
Expand Down Expand Up @@ -281,35 +280,6 @@ async def download_job_logs(
)


class TaskCompatibility(BaseModel):
is_v2_compatible: bool


@router_admin_v2.patch(
"/task-v1/{task_id}/",
status_code=status.HTTP_200_OK,
)
async def flag_task_v1_as_v2_compatible(
task_id: int,
compatibility: TaskCompatibility,
user: UserOAuth = Depends(current_active_superuser),
db: AsyncSession = Depends(get_async_db),
) -> Response:

task = await db.get(Task, task_id)
if task is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Task {task_id} not found",
)

task.is_v2_compatible = compatibility.is_v2_compatible
await db.commit()
await db.close()

return Response(status_code=status.HTTP_200_OK)


class TaskV2Minimal(BaseModel):

id: int
Expand Down
4 changes: 0 additions & 4 deletions fractal_server/app/routes/api/v2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from .task import router as task_router_v2
from .task_collection import router as task_collection_router_v2
from .task_collection_custom import router as task_collection_router_v2_custom
from .task_legacy import router as task_legacy_router_v2
from .workflow import router as workflow_router_v2
from .workflowtask import router as workflowtask_router_v2
from fractal_server.config import get_settings
Expand All @@ -38,9 +37,6 @@
tags=["V2 Task Collection"],
)
router_api_v2.include_router(task_router_v2, prefix="/task", tags=["V2 Task"])
router_api_v2.include_router(
task_legacy_router_v2, prefix="/task-legacy", tags=["V2 Task Legacy"]
)
router_api_v2.include_router(workflow_router_v2, tags=["V2 Workflow"])
router_api_v2.include_router(workflowtask_router_v2, tags=["V2 WorkflowTask"])
router_api_v2.include_router(status_router_v2, tags=["V2 Status"])
51 changes: 10 additions & 41 deletions fractal_server/app/routes/api/v2/_aux_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from sqlmodel.sql.expression import SelectOfScalar

from ....db import AsyncSession
from ....models.v1 import Task
from ....models.v2 import DatasetV2
from ....models.v2 import JobV2
from ....models.v2 import LinkUserProjectV2
Expand Down Expand Up @@ -389,7 +388,6 @@ async def _workflow_insert_task(
*,
workflow_id: int,
task_id: int,
is_legacy_task: bool = False,
order: Optional[int] = None,
meta_parallel: Optional[dict[str, Any]] = None,
meta_non_parallel: Optional[dict[str, Any]] = None,
Expand All @@ -404,7 +402,7 @@ async def _workflow_insert_task(
Args:
workflow_id:
task_id:
is_legacy_task:

order:
meta_parallel:
meta_non_parallel:
Expand All @@ -424,40 +422,13 @@ async def _workflow_insert_task(
# method
# NOTE: this logic remains there for V1 tasks only. When we deprecate V1
# tasks, we can simplify this block
if is_legacy_task is True:
db_task = await db.get(Task, task_id)
if db_task is None:
raise ValueError(f"Task {task_id} not found.")
task_type = "parallel"

final_args_parallel = db_task.default_args_from_args_schema.copy()
final_args_non_parallel = {}
final_meta_parallel = (db_task.meta or {}).copy()
final_meta_non_parallel = {}
db_task = await db.get(TaskV2, task_id)
if db_task is None:
raise ValueError(f"TaskV2 {task_id} not found.")
task_type = db_task.type

else:
db_task = await db.get(TaskV2, task_id)
if db_task is None:
raise ValueError(f"TaskV2 {task_id} not found.")
task_type = db_task.type

final_args_non_parallel = {}
final_args_parallel = {}
final_meta_parallel = (db_task.meta_parallel or {}).copy()
final_meta_non_parallel = (db_task.meta_non_parallel or {}).copy()

# Combine arg_parallel
if args_parallel is not None:
for k, v in args_parallel.items():
final_args_parallel[k] = v
if final_args_parallel == {}:
final_args_parallel = None
# Combine arg_non_parallel
if args_non_parallel is not None:
for k, v in args_non_parallel.items():
final_args_non_parallel[k] = v
if final_args_non_parallel == {}:
final_args_non_parallel = None
final_meta_parallel = (db_task.meta_parallel or {}).copy()
final_meta_non_parallel = (db_task.meta_non_parallel or {}).copy()

# Combine meta_parallel (higher priority)
# and db_task.meta_parallel (lower priority)
Expand All @@ -479,11 +450,9 @@ async def _workflow_insert_task(
# Create DB entry
wf_task = WorkflowTaskV2(
task_type=task_type,
is_legacy_task=is_legacy_task,
task_id=(task_id if not is_legacy_task else None),
task_legacy_id=(task_id if is_legacy_task else None),
args_non_parallel=final_args_non_parallel,
args_parallel=final_args_parallel,
task_id=task_id,
args_non_parallel=args_non_parallel,
args_parallel=args_parallel,
meta_parallel=final_meta_parallel,
meta_non_parallel=final_meta_non_parallel,
**input_filters_kwarg,
Expand Down
59 changes: 0 additions & 59 deletions fractal_server/app/routes/api/v2/task_legacy.py

This file was deleted.

Loading
Loading