Skip to content

Commit

Permalink
Update code review
Browse files Browse the repository at this point in the history
  • Loading branch information
pierrejeambrun committed Oct 25, 2024
1 parent 4f16e2f commit c58a7e7
Show file tree
Hide file tree
Showing 11 changed files with 569 additions and 122 deletions.
216 changes: 183 additions & 33 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ paths:
description: Get recent DAG runs.
operationId: recent_dag_runs
parameters:
- name: dag_runs_limit
in: query
required: false
schema:
type: integer
default: 10
title: Dag Runs Limit
- name: limit
in: query
required: false
Expand Down Expand Up @@ -156,7 +163,7 @@ paths:
content:
application/json:
schema:
$ref: '#/components/schemas/RecentDAGCollectionResponse'
$ref: '#/components/schemas/DAGWithLatestDagRunsCollectionResponse'
'422':
description: Validation Error
content:
Expand Down Expand Up @@ -2183,6 +2190,181 @@ components:
- total_entries
title: DAGTagCollectionResponse
description: DAG Tags Collection serializer for responses.
DAGWithLatestDagRunsCollectionResponse:
properties:
total_entries:
type: integer
title: Total Entries
dags:
items:
$ref: '#/components/schemas/DAGWithLatestDagRunsResponse'
type: array
title: Dags
type: object
required:
- total_entries
- dags
title: DAGWithLatestDagRunsCollectionResponse
description: DAG with latest dag runs collection response serializer.
DAGWithLatestDagRunsResponse:
properties:
dag_id:
type: string
title: Dag Id
dag_display_name:
type: string
title: Dag Display Name
is_paused:
type: boolean
title: Is Paused
is_active:
type: boolean
title: Is Active
last_parsed_time:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Last Parsed Time
last_pickled:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Last Pickled
last_expired:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Last Expired
scheduler_lock:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Scheduler Lock
pickle_id:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Pickle Id
default_view:
anyOf:
- type: string
- type: 'null'
title: Default View
fileloc:
type: string
title: Fileloc
description:
anyOf:
- type: string
- type: 'null'
title: Description
timetable_summary:
anyOf:
- type: string
- type: 'null'
title: Timetable Summary
timetable_description:
anyOf:
- type: string
- type: 'null'
title: Timetable Description
tags:
items:
$ref: '#/components/schemas/DagTagPydantic'
type: array
title: Tags
max_active_tasks:
type: integer
title: Max Active Tasks
max_active_runs:
anyOf:
- type: integer
- type: 'null'
title: Max Active Runs
max_consecutive_failed_dag_runs:
type: integer
title: Max Consecutive Failed Dag Runs
has_task_concurrency_limits:
type: boolean
title: Has Task Concurrency Limits
has_import_errors:
type: boolean
title: Has Import Errors
next_dagrun:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Next Dagrun
next_dagrun_data_interval_start:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Next Dagrun Data Interval Start
next_dagrun_data_interval_end:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Next Dagrun Data Interval End
next_dagrun_create_after:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Next Dagrun Create After
owners:
items:
type: string
type: array
title: Owners
latest_dag_runs:
items:
$ref: '#/components/schemas/DAGRunResponse'
type: array
title: Latest Dag Runs
file_token:
type: string
title: File Token
description: Return file token.
readOnly: true
type: object
required:
- dag_id
- dag_display_name
- is_paused
- is_active
- last_parsed_time
- last_pickled
- last_expired
- scheduler_lock
- pickle_id
- default_view
- fileloc
- description
- timetable_summary
- timetable_description
- tags
- max_active_tasks
- max_active_runs
- max_consecutive_failed_dag_runs
- has_task_concurrency_limits
- has_import_errors
- next_dagrun
- next_dagrun_data_interval_start
- next_dagrun_data_interval_end
- next_dagrun_create_after
- owners
- latest_dag_runs
- file_token
title: DAGWithLatestDagRunsResponse
description: DAG with latest dag runs response serializer.
DagProcessorInfoSchema:
properties:
status:
Expand Down Expand Up @@ -2563,38 +2745,6 @@ components:
- version
title: ProviderResponse
description: Provider serializer for responses.
RecentDAGCollectionResponse:
properties:
total_entries:
type: integer
title: Total Entries
dags:
items:
$ref: '#/components/schemas/RecentDAGResponse'
type: array
title: Dags
type: object
required:
- total_entries
- dags
title: RecentDAGCollectionResponse
description: Recent DAG Runs collection response serializer.
RecentDAGResponse:
properties:
dag_id:
type: string
title: Dag Id
latest_dag_runs:
items:
$ref: '#/components/schemas/DAGRunResponse'
type: array
title: Latest Dag Runs
type: object
required:
- dag_id
- latest_dag_runs
title: RecentDAGResponse
description: Recent DAG Runs response serializer.
SchedulerInfoSchema:
properties:
status:
Expand Down
45 changes: 24 additions & 21 deletions airflow/api_fastapi/core_api/routes/ui/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@
)
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.serializers.dag_run import DAGRunResponse
from airflow.api_fastapi.core_api.serializers.dags import DAGResponse
from airflow.api_fastapi.core_api.serializers.ui.dags import (
RecentDAGCollectionResponse,
RecentDAGResponse,
DAGWithLatestDagRunsCollectionResponse,
DAGWithLatestDagRunsResponse,
)
from airflow.models import DagModel, DagRun

Expand All @@ -59,9 +60,9 @@ async def recent_dag_runs(
only_active: QueryOnlyActiveFilter,
paused: QueryPausedFilter,
last_dag_run_state: QueryLastDagRunStateFilter,
dag_runs_limit: QueryLimit,
session: Annotated[Session, Depends(get_session)],
) -> RecentDAGCollectionResponse:
dag_runs_limit: int = 10,
) -> DAGWithLatestDagRunsCollectionResponse:
"""Get recent DAG runs."""
recent_runs_subquery = (
select(
Expand All @@ -80,7 +81,7 @@ async def recent_dag_runs(
dags_with_recent_dag_runs_select = (
select(
DagRun,
DagModel.dag_id,
DagModel,
recent_runs_subquery.c.execution_date,
)
.join(DagModel, DagModel.dag_id == recent_runs_subquery.c.dag_id)
Expand All @@ -91,7 +92,7 @@ async def recent_dag_runs(
DagRun.execution_date == recent_runs_subquery.c.execution_date,
),
)
.where(recent_runs_subquery.c.rank <= dag_runs_limit.value)
.where(recent_runs_subquery.c.rank <= dag_runs_limit)
.group_by(
DagModel.dag_id,
recent_runs_subquery.c.execution_date,
Expand All @@ -107,24 +108,26 @@ async def recent_dag_runs(
offset,
limit,
)
dags_with_recent_dag_runs = session.scalars(dags_with_recent_dag_runs_select_filter).all()
dags_with_recent_dag_runs = session.execute(dags_with_recent_dag_runs_select_filter)
# aggregate rows by dag_id
dag_runs_by_dag_id: dict[str, list] = {}
dag_runs_by_dag_id: dict[str, DAGWithLatestDagRunsResponse] = {}

for row in dags_with_recent_dag_runs:
dag_id = row.dag_id
dag_run, dag, *_ = row
dag_id = dag.dag_id
dag_run_response = DAGRunResponse.model_validate(dag_run, from_attributes=True)
if dag_id not in dag_runs_by_dag_id:
dag_runs_by_dag_id[dag_id] = []
dag_runs_by_dag_id[dag_id].append(row)
dag_response = DAGResponse.model_validate(dag, from_attributes=True)
dag_runs_by_dag_id[dag_id] = DAGWithLatestDagRunsResponse.model_validate(
{
**dag_response.dict(),
"latest_dag_runs": [dag_run_response],
}
)
else:
dag_runs_by_dag_id[dag_id].latest_dag_runs.append(dag_run_response)

return RecentDAGCollectionResponse(
return DAGWithLatestDagRunsCollectionResponse(
total_entries=len(dag_runs_by_dag_id),
dags=[
RecentDAGResponse(
dag_id=dag_id,
latest_dag_runs=[
DAGRunResponse.model_validate(dag_run, from_attributes=True) for dag_run in dag_runs
],
)
for dag_id, dag_runs in dag_runs_by_dag_id.items()
],
dags=list(dag_runs_by_dag_id.values()),
)
12 changes: 6 additions & 6 deletions airflow/api_fastapi/core_api/serializers/ui/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@
from pydantic import BaseModel

from airflow.api_fastapi.core_api.serializers.dag_run import DAGRunResponse
from airflow.api_fastapi.core_api.serializers.dags import DAGResponse


class RecentDAGResponse(BaseModel):
"""Recent DAG Runs response serializer."""
class DAGWithLatestDagRunsResponse(DAGResponse):
"""DAG with latest dag runs response serializer."""

dag_id: str
latest_dag_runs: list[DAGRunResponse]


class RecentDAGCollectionResponse(BaseModel):
"""Recent DAG Runs collection response serializer."""
class DAGWithLatestDagRunsCollectionResponse(BaseModel):
"""DAG with latest dag runs collection response serializer."""

total_entries: int
dags: list[RecentDAGResponse] # type: ignore
dags: list[DAGWithLatestDagRunsResponse]
3 changes: 3 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ export const UseDagsServiceRecentDagRunsKeyFn = (
{
dagDisplayNamePattern,
dagIdPattern,
dagRunsLimit,
lastDagRunState,
limit,
offset,
Expand All @@ -77,6 +78,7 @@ export const UseDagsServiceRecentDagRunsKeyFn = (
}: {
dagDisplayNamePattern?: string;
dagIdPattern?: string;
dagRunsLimit?: number;
lastDagRunState?: DagRunState;
limit?: number;
offset?: number;
Expand All @@ -92,6 +94,7 @@ export const UseDagsServiceRecentDagRunsKeyFn = (
{
dagDisplayNamePattern,
dagIdPattern,
dagRunsLimit,
lastDagRunState,
limit,
offset,
Expand Down
Loading

0 comments on commit c58a7e7

Please sign in to comment.