Skip to content

Commit

Permalink
AIP-84 Migrate public endpoint Clear Task Instances to FastAPI (#44220)
Browse files Browse the repository at this point in the history
* Migrate public endpoint Clear Task Instances to FastAPI

* Remove unused alias choice, route trailing slash

* Make changes as per feedback from Pierre

* Add total_entries to response

* Remove logical_date from clear tis response
  • Loading branch information
omkar-foss authored Nov 22, 2024
1 parent cf26561 commit 00fd540
Show file tree
Hide file tree
Showing 12 changed files with 1,264 additions and 26 deletions.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/task_instance_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ def get_task_instances_batch(session: Session = NEW_SESSION) -> APIResponse:
)


@mark_fastapi_migration_done
@security.requires_access_dag("PUT", DagAccessEntity.TASK_INSTANCE)
@action_logging
@provide_session
Expand Down
11 changes: 9 additions & 2 deletions airflow/api_fastapi/common/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,14 @@
from enum import Enum
from typing import Annotated

from pydantic import AfterValidator, AliasGenerator, AwareDatetime, BaseModel, BeforeValidator, ConfigDict
from pydantic import (
AfterValidator,
AliasGenerator,
AwareDatetime,
BaseModel,
BeforeValidator,
ConfigDict,
)

from airflow.utils import timezone

Expand All @@ -29,7 +36,7 @@


def _validate_timedelta_field(td: timedelta | None) -> TimeDelta | None:
"""Validate the execution_timeout property."""
"""Validate the timedelta field and return it."""
if td is None:
return None
return TimeDelta(
Expand Down
55 changes: 54 additions & 1 deletion airflow/api_fastapi/core_api/datamodels/task_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from __future__ import annotations

from datetime import datetime
from typing import Annotated
from typing import Annotated, Any

from pydantic import (
AliasPath,
Expand All @@ -27,6 +27,8 @@
ConfigDict,
Field,
NonNegativeInt,
ValidationError,
model_validator,
)

from airflow.api_fastapi.core_api.datamodels.job import JobResponse
Expand Down Expand Up @@ -150,3 +152,54 @@ class TaskInstanceHistoryCollectionResponse(BaseModel):

task_instances: list[TaskInstanceHistoryResponse]
total_entries: int


class ClearTaskInstancesBody(BaseModel):
"""Request body for Clear Task Instances endpoint."""

dry_run: bool = True
start_date: AwareDatetime | None = None
end_date: AwareDatetime | None = None
only_failed: bool = True
only_running: bool = False
reset_dag_runs: bool = False
task_ids: list[str] | None = None
dag_run_id: str | None = None
include_upstream: bool = False
include_downstream: bool = False
include_future: bool = False
include_past: bool = False

@model_validator(mode="before")
@classmethod
def validate_model(cls, data: Any) -> Any:
"""Validate clear task instance form."""
if data.get("only_failed") and data.get("only_running"):
raise ValidationError("only_failed and only_running both are set to True")
if data.get("start_date") and data.get("end_date"):
if data.get("start_date") > data.get("end_date"):
raise ValidationError("end_date is sooner than start_date")
if data.get("start_date") and data.get("end_date") and data.get("dag_run_id"):
raise ValidationError("Exactly one of dag_run_id or (start_date and end_date) must be provided")
if data.get("start_date") and data.get("dag_run_id"):
raise ValidationError("Exactly one of dag_run_id or start_date must be provided")
if data.get("end_date") and data.get("dag_run_id"):
raise ValidationError("Exactly one of dag_run_id or end_date must be provided")
if isinstance(data.get("task_ids"), list) and len(data.get("task_ids")) < 1:
raise ValidationError("task_ids list should have at least 1 element.")
return data


class TaskInstanceReferenceResponse(BaseModel):
"""Task Instance Reference serializer for responses."""

task_id: str
dag_run_id: str = Field(validation_alias="run_id")
dag_id: str


class TaskInstanceReferenceCollectionResponse(BaseModel):
"""Task Instance Reference collection serializer for responses."""

task_instances: list[TaskInstanceReferenceResponse]
total_entries: int
146 changes: 146 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4227,6 +4227,57 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/clearTaskInstances:
post:
tags:
- Task Instance
summary: Post Clear Task Instances
description: Clear task instances.
operationId: post_clear_task_instances
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/ClearTaskInstancesBody'
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/TaskInstanceReferenceCollectionResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/tasks:
get:
tags:
Expand Down Expand Up @@ -5123,6 +5174,67 @@ components:
- status
title: BaseInfoSchema
description: Base status field for metadatabase and scheduler.
ClearTaskInstancesBody:
properties:
dry_run:
type: boolean
title: Dry Run
default: true
start_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Start Date
end_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: End Date
only_failed:
type: boolean
title: Only Failed
default: true
only_running:
type: boolean
title: Only Running
default: false
reset_dag_runs:
type: boolean
title: Reset Dag Runs
default: false
task_ids:
anyOf:
- items:
type: string
type: array
- type: 'null'
title: Task Ids
dag_run_id:
anyOf:
- type: string
- type: 'null'
title: Dag Run Id
include_upstream:
type: boolean
title: Include Upstream
default: false
include_downstream:
type: boolean
title: Include Downstream
default: false
include_future:
type: boolean
title: Include Future
default: false
include_past:
type: boolean
title: Include Past
default: false
type: object
title: ClearTaskInstancesBody
description: Request body for Clear Task Instances endpoint.
Config:
properties:
sections:
Expand Down Expand Up @@ -7049,6 +7161,40 @@ components:
- executor_config
title: TaskInstanceHistoryResponse
description: TaskInstanceHistory serializer for responses.
TaskInstanceReferenceCollectionResponse:
properties:
task_instances:
items:
$ref: '#/components/schemas/TaskInstanceReferenceResponse'
type: array
title: Task Instances
total_entries:
type: integer
title: Total Entries
type: object
required:
- task_instances
- total_entries
title: TaskInstanceReferenceCollectionResponse
description: Task Instance Reference collection serializer for responses.
TaskInstanceReferenceResponse:
properties:
task_id:
type: string
title: Task Id
dag_run_id:
type: string
title: Dag Run Id
dag_id:
type: string
title: Dag Id
type: object
required:
- task_id
- dag_run_id
- dag_id
title: TaskInstanceReferenceResponse
description: Task Instance Reference serializer for responses.
TaskInstanceResponse:
properties:
id:
Expand Down
Loading

0 comments on commit 00fd540

Please sign in to comment.