Skip to content

Commit

Permalink
refactor: use public serializer for ui batch recent runs
Browse files Browse the repository at this point in the history
  • Loading branch information
jason810496 committed Oct 21, 2024
1 parent b0b1469 commit 5252d7a
Show file tree
Hide file tree
Showing 11 changed files with 633 additions and 178 deletions.
237 changes: 188 additions & 49 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ paths:
content:
application/json:
schema:
$ref: '#/components/schemas/RecentDAGRunsCollectionResponse'
$ref: '#/components/schemas/RecentDAGCollectionResponse'
'422':
description: Validation Error
content:
Expand Down Expand Up @@ -2040,86 +2040,225 @@ components:
- version
title: ProviderResponse
description: Provider serializer for responses.
RecentDAGRun:
RecentDAGCollectionResponse:
properties:
start_date:
dags:
items:
$ref: '#/components/schemas/RecentDAGResponse'
type: array
title: Dags
total_entries:
type: integer
title: Total Entries
type: object
required:
- dags
- total_entries
title: RecentDAGCollectionResponse
description: Recent DAG Runs collection response serializer.
RecentDAGResponse:
properties:
dag_id:
type: string
title: Dag Id
dag_display_name:
type: string
title: Dag Display Name
is_paused:
type: boolean
title: Is Paused
is_active:
type: boolean
title: Is Active
last_parsed_time:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Start Date
end_date:
title: Last Parsed Time
last_pickled:
anyOf:
- type: string
format: date-time
- type: 'null'
title: End Date
state:
$ref: '#/components/schemas/DagRunState'
execution_date:
title: Last Pickled
last_expired:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Execution Date
data_interval_start:
title: Last Expired
scheduler_lock:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Data Interval Start
data_interval_end:
title: Scheduler Lock
pickle_id:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Data Interval End
type: object
required:
- start_date
- end_date
- state
- execution_date
- data_interval_start
- data_interval_end
title: RecentDAGRun
description: Run serializer for Recent DAG Runs.
RecentDAGRunsCollectionResponse:
properties:
recent_dag_runs:
title: Pickle Id
default_view:
anyOf:
- type: string
- type: 'null'
title: Default View
fileloc:
type: string
title: Fileloc
description:
anyOf:
- type: string
- type: 'null'
title: Description
timetable_summary:
anyOf:
- type: string
- type: 'null'
title: Timetable Summary
timetable_description:
anyOf:
- type: string
- type: 'null'
title: Timetable Description
tags:
items:
$ref: '#/components/schemas/RecentDAGRunsResponse'
$ref: '#/components/schemas/DagTagPydantic'
type: array
title: Recent Dag Runs
total_dag_ids:
title: Tags
max_active_tasks:
type: integer
title: Total Dag Ids
total_dag_runs:
title: Max Active Tasks
max_active_runs:
anyOf:
- type: integer
- type: 'null'
title: Max Active Runs
max_consecutive_failed_dag_runs:
type: integer
title: Total Dag Runs
title: Max Consecutive Failed Dag Runs
has_task_concurrency_limits:
type: boolean
title: Has Task Concurrency Limits
has_import_errors:
type: boolean
title: Has Import Errors
next_dagrun:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Next Dagrun
next_dagrun_data_interval_start:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Next Dagrun Data Interval Start
next_dagrun_data_interval_end:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Next Dagrun Data Interval End
next_dagrun_create_after:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Next Dagrun Create After
owners:
items:
type: string
type: array
title: Owners
latest_dag_runs:
items:
$ref: '#/components/schemas/RecentDAGRunResponse'
type: array
title: Latest Dag Runs
file_token:
type: string
title: File Token
description: Return file token.
readOnly: true
type: object
required:
- recent_dag_runs
- total_dag_ids
- total_dag_runs
title: RecentDAGRunsCollectionResponse
description: Recent DAG Runs collection response serializer.
RecentDAGRunsResponse:
- file_token
title: RecentDAGResponse
description: Recent DAG Runs response serializer.
RecentDAGRunResponse:
properties:
run_id:
anyOf:
- type: string
- type: 'null'
title: Run Id
dag_id:
type: string
title: Dag Id
dag_runs:
items:
$ref: '#/components/schemas/RecentDAGRun'
type: array
title: Dag Runs
logical_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Logical Date
start_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Start Date
end_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: End Date
data_interval_start:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Data Interval Start
data_interval_end:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Data Interval End
last_scheduling_decision:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Last Scheduling Decision
run_type:
$ref: '#/components/schemas/DagRunType'
state:
$ref: '#/components/schemas/DagRunState'
external_trigger:
type: boolean
title: External Trigger
triggered_by:
$ref: '#/components/schemas/DagRunTriggeredByType'
conf:
type: object
title: Conf
note:
anyOf:
- type: string
- type: 'null'
title: Note
execution_date:
type: string
format: date-time
title: Execution Date
type: object
required:
- dag_id
- dag_runs
title: RecentDAGRunsResponse
description: Recent DAG Runs response serializer.
title: RecentDAGRunResponse
description: Recent DAG Run response serializer.
SchedulerInfoSchema:
properties:
status:
Expand Down
28 changes: 13 additions & 15 deletions airflow/api_fastapi/core_api/routes/ui/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,16 @@
)
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.serializers.ui.dags import (
RecentDAGRun,
RecentDAGRunsCollectionResponse,
RecentDAGRunsResponse,
RecentDAGCollectionResponse,
RecentDAGResponse,
RecentDAGRunResponse,
)
from airflow.models import DagModel, DagRun

dags_router = AirflowRouter(prefix="/dags", tags=["Dags"])


@dags_router.get("/recent_dag_runs", include_in_schema=False)
@dags_router.get("/recent_dag_runs", include_in_schema=False, response_model_exclude_none=True)
async def recent_dag_runs(
limit: QueryLimit,
offset: QueryOffset,
Expand All @@ -61,7 +61,7 @@ async def recent_dag_runs(
last_dag_run_state: QueryLastDagRunStateFilter,
dag_runs_limit: QueryLimit,
session: Annotated[Session, Depends(get_session)],
) -> RecentDAGRunsCollectionResponse:
) -> RecentDAGCollectionResponse:
"""Get recent DAG runs."""
recent_runs_subquery = (
select(
Expand Down Expand Up @@ -91,8 +91,6 @@ async def recent_dag_runs(
DagRun.data_interval_end,
)
.join(DagModel, DagModel.dag_id == recent_runs_subquery.c.dag_id)
# `last_run_state` and `last_run_start_date` query params of`get_dags` endpoint
# needs `DagRun` to order_by
.join(
DagRun,
and_(
Expand All @@ -110,8 +108,9 @@ async def recent_dag_runs(
DagRun.data_interval_start,
DagRun.data_interval_end,
)
.order_by(recent_runs_subquery.c.execution_date.desc())
)
dags_with_recent_dag_runs_select_filter, total_entries = paginated_select(
dags_with_recent_dag_runs_select_filter, _ = paginated_select(
dags_with_recent_dag_runs_select,
[only_active, paused, dag_id_pattern, dag_display_name_pattern, tags, owners, last_dag_run_state],
None,
Expand All @@ -127,14 +126,13 @@ async def recent_dag_runs(
dag_runs_by_dag_id[dag_id] = []
dag_runs_by_dag_id[dag_id].append(row)

return RecentDAGRunsCollectionResponse(
total_dag_runs=total_entries,
total_dag_ids=len(dag_runs_by_dag_id),
recent_dag_runs=[
RecentDAGRunsResponse(
return RecentDAGCollectionResponse(
total_entries=len(dag_runs_by_dag_id),
dags=[
RecentDAGResponse(
dag_id=dag_id,
dag_runs=[
RecentDAGRun(
latest_dag_runs=[
RecentDAGRunResponse( # type: ignore
start_date=dag_run.start_date,
end_date=dag_run.end_date,
state=dag_run.state,
Expand Down
35 changes: 35 additions & 0 deletions airflow/api_fastapi/core_api/serializers/optional.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from typing import Any

from pydantic import BaseModel


class OptionalModel(BaseModel):
"""A Pydantic model that makes all fields optional."""

@classmethod
def __pydantic_init_subclass__(cls, **kwargs: Any) -> None:
super().__pydantic_init_subclass__(**kwargs)

for field in cls.model_fields.values():
field.default = None

cls.model_rebuild(force=True)
Loading

0 comments on commit 5252d7a

Please sign in to comment.