From 14abd334e8827b7e8d412a56a228dbded00a4623 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU <68415893+jason810496@users.noreply.github.com> Date: Fri, 25 Oct 2024 18:08:42 +0800 Subject: [PATCH] AIP-84: Add UI batch recent dag runs endpoint (#43204) * AIP-84 | add UI batch recent dag runs endpoint * AIP-84 | add UI batch recent dag runs * refactor: use public serializer for ui batch recent runs * fix: add trigger_by for dag run in public test_dag * refactor: use DAGRunResponse from public endpoint * Update code review --------- Co-authored-by: pierrejeambrun --- .../core_api/openapi/v1-generated.yaml | 272 +++++++++++++++++ .../core_api/routes/ui/__init__.py | 2 + .../api_fastapi/core_api/routes/ui/dags.py | 133 ++++++++ .../core_api/serializers/ui/__init__.py | 16 + .../core_api/serializers/ui/dags.py | 36 +++ airflow/ui/openapi-gen/queries/common.ts | 51 ++++ airflow/ui/openapi-gen/queries/prefetch.ts | 71 +++++ airflow/ui/openapi-gen/queries/queries.ts | 80 +++++ airflow/ui/openapi-gen/queries/suspense.ts | 80 +++++ .../ui/openapi-gen/requests/schemas.gen.ts | 283 ++++++++++++++++++ .../ui/openapi-gen/requests/services.gen.ts | 45 +++ airflow/ui/openapi-gen/requests/types.gen.ts | 74 +++++ .../core_api/routes/public/test_dags.py | 4 +- .../core_api/routes/ui/test_dags.py | 104 +++++++ 14 files changed, 1250 insertions(+), 1 deletion(-) create mode 100644 airflow/api_fastapi/core_api/routes/ui/dags.py create mode 100644 airflow/api_fastapi/core_api/serializers/ui/__init__.py create mode 100644 airflow/api_fastapi/core_api/serializers/ui/dags.py create mode 100644 tests/api_fastapi/core_api/routes/ui/test_dags.py diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 76c28214100f8..a6ac7ac79d28d 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -73,6 +73,103 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /ui/dags/recent_dag_runs: + get: + tags: + - Dags + summary: Recent Dag Runs + description: Get recent DAG runs. + operationId: recent_dag_runs + parameters: + - name: dag_runs_limit + in: query + required: false + schema: + type: integer + default: 10 + title: Dag Runs Limit + - name: limit + in: query + required: false + schema: + type: integer + default: 100 + title: Limit + - name: offset + in: query + required: false + schema: + type: integer + default: 0 + title: Offset + - name: tags + in: query + required: false + schema: + type: array + items: + type: string + title: Tags + - name: owners + in: query + required: false + schema: + type: array + items: + type: string + title: Owners + - name: dag_id_pattern + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + title: Dag Id Pattern + - name: dag_display_name_pattern + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + title: Dag Display Name Pattern + - name: only_active + in: query + required: false + schema: + type: boolean + default: true + title: Only Active + - name: paused + in: query + required: false + schema: + anyOf: + - type: boolean + - type: 'null' + title: Paused + - name: last_dag_run_state + in: query + required: false + schema: + anyOf: + - $ref: '#/components/schemas/DagRunState' + - type: 'null' + title: Last Dag Run State + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/DAGWithLatestDagRunsCollectionResponse' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/dags/: get: tags: @@ -2093,6 +2190,181 @@ components: - total_entries title: DAGTagCollectionResponse description: DAG Tags Collection serializer for responses. + DAGWithLatestDagRunsCollectionResponse: + properties: + total_entries: + type: integer + title: Total Entries + dags: + items: + $ref: '#/components/schemas/DAGWithLatestDagRunsResponse' + type: array + title: Dags + type: object + required: + - total_entries + - dags + title: DAGWithLatestDagRunsCollectionResponse + description: DAG with latest dag runs collection response serializer. + DAGWithLatestDagRunsResponse: + properties: + dag_id: + type: string + title: Dag Id + dag_display_name: + type: string + title: Dag Display Name + is_paused: + type: boolean + title: Is Paused + is_active: + type: boolean + title: Is Active + last_parsed_time: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Last Parsed Time + last_pickled: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Last Pickled + last_expired: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Last Expired + scheduler_lock: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Scheduler Lock + pickle_id: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Pickle Id + default_view: + anyOf: + - type: string + - type: 'null' + title: Default View + fileloc: + type: string + title: Fileloc + description: + anyOf: + - type: string + - type: 'null' + title: Description + timetable_summary: + anyOf: + - type: string + - type: 'null' + title: Timetable Summary + timetable_description: + anyOf: + - type: string + - type: 'null' + title: Timetable Description + tags: + items: + $ref: '#/components/schemas/DagTagPydantic' + type: array + title: Tags + max_active_tasks: + type: integer + title: Max Active Tasks + max_active_runs: + anyOf: + - type: integer + - type: 'null' + title: Max Active Runs + max_consecutive_failed_dag_runs: + type: integer + title: Max Consecutive Failed Dag Runs + has_task_concurrency_limits: + type: boolean + title: Has Task Concurrency Limits + has_import_errors: + type: boolean + title: Has Import Errors + next_dagrun: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Next Dagrun + next_dagrun_data_interval_start: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Next Dagrun Data Interval Start + next_dagrun_data_interval_end: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Next Dagrun Data Interval End + next_dagrun_create_after: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Next Dagrun Create After + owners: + items: + type: string + type: array + title: Owners + latest_dag_runs: + items: + $ref: '#/components/schemas/DAGRunResponse' + type: array + title: Latest Dag Runs + file_token: + type: string + title: File Token + description: Return file token. + readOnly: true + type: object + required: + - dag_id + - dag_display_name + - is_paused + - is_active + - last_parsed_time + - last_pickled + - last_expired + - scheduler_lock + - pickle_id + - default_view + - fileloc + - description + - timetable_summary + - timetable_description + - tags + - max_active_tasks + - max_active_runs + - max_consecutive_failed_dag_runs + - has_task_concurrency_limits + - has_import_errors + - next_dagrun + - next_dagrun_data_interval_start + - next_dagrun_data_interval_end + - next_dagrun_create_after + - owners + - latest_dag_runs + - file_token + title: DAGWithLatestDagRunsResponse + description: DAG with latest dag runs response serializer. DagProcessorInfoSchema: properties: status: diff --git a/airflow/api_fastapi/core_api/routes/ui/__init__.py b/airflow/api_fastapi/core_api/routes/ui/__init__.py index 9cd16fcdd16b3..b7ebf9c5c46fc 100644 --- a/airflow/api_fastapi/core_api/routes/ui/__init__.py +++ b/airflow/api_fastapi/core_api/routes/ui/__init__.py @@ -18,9 +18,11 @@ from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.routes.ui.assets import assets_router +from airflow.api_fastapi.core_api.routes.ui.dags import dags_router from airflow.api_fastapi.core_api.routes.ui.dashboard import dashboard_router ui_router = AirflowRouter(prefix="/ui") ui_router.include_router(assets_router) ui_router.include_router(dashboard_router) +ui_router.include_router(dags_router) diff --git a/airflow/api_fastapi/core_api/routes/ui/dags.py b/airflow/api_fastapi/core_api/routes/ui/dags.py new file mode 100644 index 0000000000000..665373734bb90 --- /dev/null +++ b/airflow/api_fastapi/core_api/routes/ui/dags.py @@ -0,0 +1,133 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from fastapi import Depends +from sqlalchemy import and_, func, select +from sqlalchemy.orm import Session +from typing_extensions import Annotated + +from airflow.api_fastapi.common.db.common import ( + get_session, + paginated_select, +) +from airflow.api_fastapi.common.parameters import ( + QueryDagDisplayNamePatternSearch, + QueryDagIdPatternSearch, + QueryLastDagRunStateFilter, + QueryLimit, + QueryOffset, + QueryOnlyActiveFilter, + QueryOwnersFilter, + QueryPausedFilter, + QueryTagsFilter, +) +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.core_api.serializers.dag_run import DAGRunResponse +from airflow.api_fastapi.core_api.serializers.dags import DAGResponse +from airflow.api_fastapi.core_api.serializers.ui.dags import ( + DAGWithLatestDagRunsCollectionResponse, + DAGWithLatestDagRunsResponse, +) +from airflow.models import DagModel, DagRun + +dags_router = AirflowRouter(prefix="/dags", tags=["Dags"]) + + +@dags_router.get("/recent_dag_runs", include_in_schema=False, response_model_exclude_none=True) +async def recent_dag_runs( + limit: QueryLimit, + offset: QueryOffset, + tags: QueryTagsFilter, + owners: QueryOwnersFilter, + dag_id_pattern: QueryDagIdPatternSearch, + dag_display_name_pattern: QueryDagDisplayNamePatternSearch, + only_active: QueryOnlyActiveFilter, + paused: QueryPausedFilter, + last_dag_run_state: QueryLastDagRunStateFilter, + session: Annotated[Session, Depends(get_session)], + dag_runs_limit: int = 10, +) -> DAGWithLatestDagRunsCollectionResponse: + """Get recent DAG runs.""" + recent_runs_subquery = ( + select( + DagRun.dag_id, + DagRun.execution_date, + func.rank() + .over( + partition_by=DagRun.dag_id, + order_by=DagRun.execution_date.desc(), + ) + .label("rank"), + ) + .order_by(DagRun.execution_date.desc()) + .subquery() + ) + dags_with_recent_dag_runs_select = ( + select( + DagRun, + DagModel, + recent_runs_subquery.c.execution_date, + ) + .join(DagModel, DagModel.dag_id == recent_runs_subquery.c.dag_id) + .join( + DagRun, + and_( + DagRun.dag_id == DagModel.dag_id, + DagRun.execution_date == recent_runs_subquery.c.execution_date, + ), + ) + .where(recent_runs_subquery.c.rank <= dag_runs_limit) + .group_by( + DagModel.dag_id, + recent_runs_subquery.c.execution_date, + DagRun.execution_date, + DagRun.id, + ) + .order_by(recent_runs_subquery.c.execution_date.desc()) + ) + dags_with_recent_dag_runs_select_filter, _ = paginated_select( + dags_with_recent_dag_runs_select, + [only_active, paused, dag_id_pattern, dag_display_name_pattern, tags, owners, last_dag_run_state], + None, + offset, + limit, + ) + dags_with_recent_dag_runs = session.execute(dags_with_recent_dag_runs_select_filter) + # aggregate rows by dag_id + dag_runs_by_dag_id: dict[str, DAGWithLatestDagRunsResponse] = {} + + for row in dags_with_recent_dag_runs: + dag_run, dag, *_ = row + dag_id = dag.dag_id + dag_run_response = DAGRunResponse.model_validate(dag_run, from_attributes=True) + if dag_id not in dag_runs_by_dag_id: + dag_response = DAGResponse.model_validate(dag, from_attributes=True) + dag_runs_by_dag_id[dag_id] = DAGWithLatestDagRunsResponse.model_validate( + { + **dag_response.dict(), + "latest_dag_runs": [dag_run_response], + } + ) + else: + dag_runs_by_dag_id[dag_id].latest_dag_runs.append(dag_run_response) + + return DAGWithLatestDagRunsCollectionResponse( + total_entries=len(dag_runs_by_dag_id), + dags=list(dag_runs_by_dag_id.values()), + ) diff --git a/airflow/api_fastapi/core_api/serializers/ui/__init__.py b/airflow/api_fastapi/core_api/serializers/ui/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow/api_fastapi/core_api/serializers/ui/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/api_fastapi/core_api/serializers/ui/dags.py b/airflow/api_fastapi/core_api/serializers/ui/dags.py new file mode 100644 index 0000000000000..f985ce99a9725 --- /dev/null +++ b/airflow/api_fastapi/core_api/serializers/ui/dags.py @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from pydantic import BaseModel + +from airflow.api_fastapi.core_api.serializers.dag_run import DAGRunResponse +from airflow.api_fastapi.core_api.serializers.dags import DAGResponse + + +class DAGWithLatestDagRunsResponse(DAGResponse): + """DAG with latest dag runs response serializer.""" + + latest_dag_runs: list[DAGRunResponse] + + +class DAGWithLatestDagRunsCollectionResponse(BaseModel): + """DAG with latest dag runs collection response serializer.""" + + total_entries: int + dags: list[DAGWithLatestDagRunsResponse] diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index b12c133b6c2ea..5fa46e4e91f09 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -6,6 +6,7 @@ import { ConnectionService, DagRunService, DagService, + DagsService, DashboardService, MonitorService, PluginService, @@ -54,6 +55,56 @@ export const UseDashboardServiceHistoricalMetricsKeyFn = ( useDashboardServiceHistoricalMetricsKey, ...(queryKey ?? [{ endDate, startDate }]), ]; +export type DagsServiceRecentDagRunsDefaultResponse = Awaited< + ReturnType +>; +export type DagsServiceRecentDagRunsQueryResult< + TData = DagsServiceRecentDagRunsDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useDagsServiceRecentDagRunsKey = "DagsServiceRecentDagRuns"; +export const UseDagsServiceRecentDagRunsKeyFn = ( + { + dagDisplayNamePattern, + dagIdPattern, + dagRunsLimit, + lastDagRunState, + limit, + offset, + onlyActive, + owners, + paused, + tags, + }: { + dagDisplayNamePattern?: string; + dagIdPattern?: string; + dagRunsLimit?: number; + lastDagRunState?: DagRunState; + limit?: number; + offset?: number; + onlyActive?: boolean; + owners?: string[]; + paused?: boolean; + tags?: string[]; + } = {}, + queryKey?: Array, +) => [ + useDagsServiceRecentDagRunsKey, + ...(queryKey ?? [ + { + dagDisplayNamePattern, + dagIdPattern, + dagRunsLimit, + lastDagRunState, + limit, + offset, + onlyActive, + owners, + paused, + tags, + }, + ]), +]; export type DagServiceGetDagsDefaultResponse = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 3f681a4a13b60..72b4376751f7f 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -6,6 +6,7 @@ import { ConnectionService, DagRunService, DagService, + DagsService, DashboardService, MonitorService, PluginService, @@ -62,6 +63,76 @@ export const prefetchUseDashboardServiceHistoricalMetrics = ( }), queryFn: () => DashboardService.historicalMetrics({ endDate, startDate }), }); +/** + * Recent Dag Runs + * Get recent DAG runs. + * @param data The data for the request. + * @param data.dagRunsLimit + * @param data.limit + * @param data.offset + * @param data.tags + * @param data.owners + * @param data.dagIdPattern + * @param data.dagDisplayNamePattern + * @param data.onlyActive + * @param data.paused + * @param data.lastDagRunState + * @returns DAGWithLatestDagRunsCollectionResponse Successful Response + * @throws ApiError + */ +export const prefetchUseDagsServiceRecentDagRuns = ( + queryClient: QueryClient, + { + dagDisplayNamePattern, + dagIdPattern, + dagRunsLimit, + lastDagRunState, + limit, + offset, + onlyActive, + owners, + paused, + tags, + }: { + dagDisplayNamePattern?: string; + dagIdPattern?: string; + dagRunsLimit?: number; + lastDagRunState?: DagRunState; + limit?: number; + offset?: number; + onlyActive?: boolean; + owners?: string[]; + paused?: boolean; + tags?: string[]; + } = {}, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseDagsServiceRecentDagRunsKeyFn({ + dagDisplayNamePattern, + dagIdPattern, + dagRunsLimit, + lastDagRunState, + limit, + offset, + onlyActive, + owners, + paused, + tags, + }), + queryFn: () => + DagsService.recentDagRuns({ + dagDisplayNamePattern, + dagIdPattern, + dagRunsLimit, + lastDagRunState, + limit, + offset, + onlyActive, + owners, + paused, + tags, + }), + }); /** * Get Dags * Get all DAGs. diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index e3942ad84e086..ce319bc6cd676 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -11,6 +11,7 @@ import { ConnectionService, DagRunService, DagService, + DagsService, DashboardService, MonitorService, PluginService, @@ -86,6 +87,85 @@ export const useDashboardServiceHistoricalMetrics = < DashboardService.historicalMetrics({ endDate, startDate }) as TData, ...options, }); +/** + * Recent Dag Runs + * Get recent DAG runs. + * @param data The data for the request. + * @param data.dagRunsLimit + * @param data.limit + * @param data.offset + * @param data.tags + * @param data.owners + * @param data.dagIdPattern + * @param data.dagDisplayNamePattern + * @param data.onlyActive + * @param data.paused + * @param data.lastDagRunState + * @returns DAGWithLatestDagRunsCollectionResponse Successful Response + * @throws ApiError + */ +export const useDagsServiceRecentDagRuns = < + TData = Common.DagsServiceRecentDagRunsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagDisplayNamePattern, + dagIdPattern, + dagRunsLimit, + lastDagRunState, + limit, + offset, + onlyActive, + owners, + paused, + tags, + }: { + dagDisplayNamePattern?: string; + dagIdPattern?: string; + dagRunsLimit?: number; + lastDagRunState?: DagRunState; + limit?: number; + offset?: number; + onlyActive?: boolean; + owners?: string[]; + paused?: boolean; + tags?: string[]; + } = {}, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseDagsServiceRecentDagRunsKeyFn( + { + dagDisplayNamePattern, + dagIdPattern, + dagRunsLimit, + lastDagRunState, + limit, + offset, + onlyActive, + owners, + paused, + tags, + }, + queryKey, + ), + queryFn: () => + DagsService.recentDagRuns({ + dagDisplayNamePattern, + dagIdPattern, + dagRunsLimit, + lastDagRunState, + limit, + offset, + onlyActive, + owners, + paused, + tags, + }) as TData, + ...options, + }); /** * Get Dags * Get all DAGs. diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index eb91e8f1ba936..cd7bc95fa5b59 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -6,6 +6,7 @@ import { ConnectionService, DagRunService, DagService, + DagsService, DashboardService, MonitorService, PluginService, @@ -75,6 +76,85 @@ export const useDashboardServiceHistoricalMetricsSuspense = < DashboardService.historicalMetrics({ endDate, startDate }) as TData, ...options, }); +/** + * Recent Dag Runs + * Get recent DAG runs. + * @param data The data for the request. + * @param data.dagRunsLimit + * @param data.limit + * @param data.offset + * @param data.tags + * @param data.owners + * @param data.dagIdPattern + * @param data.dagDisplayNamePattern + * @param data.onlyActive + * @param data.paused + * @param data.lastDagRunState + * @returns DAGWithLatestDagRunsCollectionResponse Successful Response + * @throws ApiError + */ +export const useDagsServiceRecentDagRunsSuspense = < + TData = Common.DagsServiceRecentDagRunsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagDisplayNamePattern, + dagIdPattern, + dagRunsLimit, + lastDagRunState, + limit, + offset, + onlyActive, + owners, + paused, + tags, + }: { + dagDisplayNamePattern?: string; + dagIdPattern?: string; + dagRunsLimit?: number; + lastDagRunState?: DagRunState; + limit?: number; + offset?: number; + onlyActive?: boolean; + owners?: string[]; + paused?: boolean; + tags?: string[]; + } = {}, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseDagsServiceRecentDagRunsKeyFn( + { + dagDisplayNamePattern, + dagIdPattern, + dagRunsLimit, + lastDagRunState, + limit, + offset, + onlyActive, + owners, + paused, + tags, + }, + queryKey, + ), + queryFn: () => + DagsService.recentDagRuns({ + dagDisplayNamePattern, + dagIdPattern, + dagRunsLimit, + lastDagRunState, + limit, + offset, + onlyActive, + owners, + paused, + tags, + }) as TData, + ...options, + }); /** * Get Dags * Get all DAGs. diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index b940646a69a77..a30712d02acea 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -1122,6 +1122,289 @@ export const $DAGTagCollectionResponse = { description: "DAG Tags Collection serializer for responses.", } as const; +export const $DAGWithLatestDagRunsCollectionResponse = { + properties: { + total_entries: { + type: "integer", + title: "Total Entries", + }, + dags: { + items: { + $ref: "#/components/schemas/DAGWithLatestDagRunsResponse", + }, + type: "array", + title: "Dags", + }, + }, + type: "object", + required: ["total_entries", "dags"], + title: "DAGWithLatestDagRunsCollectionResponse", + description: "DAG with latest dag runs collection response serializer.", +} as const; + +export const $DAGWithLatestDagRunsResponse = { + properties: { + dag_id: { + type: "string", + title: "Dag Id", + }, + dag_display_name: { + type: "string", + title: "Dag Display Name", + }, + is_paused: { + type: "boolean", + title: "Is Paused", + }, + is_active: { + type: "boolean", + title: "Is Active", + }, + last_parsed_time: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Last Parsed Time", + }, + last_pickled: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Last Pickled", + }, + last_expired: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Last Expired", + }, + scheduler_lock: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Scheduler Lock", + }, + pickle_id: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Pickle Id", + }, + default_view: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Default View", + }, + fileloc: { + type: "string", + title: "Fileloc", + }, + description: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Description", + }, + timetable_summary: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Timetable Summary", + }, + timetable_description: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Timetable Description", + }, + tags: { + items: { + $ref: "#/components/schemas/DagTagPydantic", + }, + type: "array", + title: "Tags", + }, + max_active_tasks: { + type: "integer", + title: "Max Active Tasks", + }, + max_active_runs: { + anyOf: [ + { + type: "integer", + }, + { + type: "null", + }, + ], + title: "Max Active Runs", + }, + max_consecutive_failed_dag_runs: { + type: "integer", + title: "Max Consecutive Failed Dag Runs", + }, + has_task_concurrency_limits: { + type: "boolean", + title: "Has Task Concurrency Limits", + }, + has_import_errors: { + type: "boolean", + title: "Has Import Errors", + }, + next_dagrun: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Next Dagrun", + }, + next_dagrun_data_interval_start: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Next Dagrun Data Interval Start", + }, + next_dagrun_data_interval_end: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Next Dagrun Data Interval End", + }, + next_dagrun_create_after: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Next Dagrun Create After", + }, + owners: { + items: { + type: "string", + }, + type: "array", + title: "Owners", + }, + latest_dag_runs: { + items: { + $ref: "#/components/schemas/DAGRunResponse", + }, + type: "array", + title: "Latest Dag Runs", + }, + file_token: { + type: "string", + title: "File Token", + description: "Return file token.", + readOnly: true, + }, + }, + type: "object", + required: [ + "dag_id", + "dag_display_name", + "is_paused", + "is_active", + "last_parsed_time", + "last_pickled", + "last_expired", + "scheduler_lock", + "pickle_id", + "default_view", + "fileloc", + "description", + "timetable_summary", + "timetable_description", + "tags", + "max_active_tasks", + "max_active_runs", + "max_consecutive_failed_dag_runs", + "has_task_concurrency_limits", + "has_import_errors", + "next_dagrun", + "next_dagrun_data_interval_start", + "next_dagrun_data_interval_end", + "next_dagrun_create_after", + "owners", + "latest_dag_runs", + "file_token", + ], + title: "DAGWithLatestDagRunsResponse", + description: "DAG with latest dag runs response serializer.", +} as const; + export const $DagProcessorInfoSchema = { properties: { status: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index bfe1d2e39d361..08e93457c4f67 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -7,6 +7,8 @@ import type { NextRunAssetsResponse, HistoricalMetricsData, HistoricalMetricsResponse, + RecentDagRunsData, + RecentDagRunsResponse, GetDagsData, GetDagsResponse, PatchDagsData, @@ -111,6 +113,49 @@ export class DashboardService { } } +export class DagsService { + /** + * Recent Dag Runs + * Get recent DAG runs. + * @param data The data for the request. + * @param data.dagRunsLimit + * @param data.limit + * @param data.offset + * @param data.tags + * @param data.owners + * @param data.dagIdPattern + * @param data.dagDisplayNamePattern + * @param data.onlyActive + * @param data.paused + * @param data.lastDagRunState + * @returns DAGWithLatestDagRunsCollectionResponse Successful Response + * @throws ApiError + */ + public static recentDagRuns( + data: RecentDagRunsData = {}, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/ui/dags/recent_dag_runs", + query: { + dag_runs_limit: data.dagRunsLimit, + limit: data.limit, + offset: data.offset, + tags: data.tags, + owners: data.owners, + dag_id_pattern: data.dagIdPattern, + dag_display_name_pattern: data.dagDisplayNamePattern, + only_active: data.onlyActive, + paused: data.paused, + last_dag_run_state: data.lastDagRunState, + }, + errors: { + 422: "Validation Error", + }, + }); + } +} + export class DagService { /** * Get Dags diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 1bca12c04e777..174190f9f493a 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -206,6 +206,50 @@ export type DAGTagCollectionResponse = { total_entries: number; }; +/** + * DAG with latest dag runs collection response serializer. + */ +export type DAGWithLatestDagRunsCollectionResponse = { + total_entries: number; + dags: Array; +}; + +/** + * DAG with latest dag runs response serializer. + */ +export type DAGWithLatestDagRunsResponse = { + dag_id: string; + dag_display_name: string; + is_paused: boolean; + is_active: boolean; + last_parsed_time: string | null; + last_pickled: string | null; + last_expired: string | null; + scheduler_lock: string | null; + pickle_id: string | null; + default_view: string | null; + fileloc: string; + description: string | null; + timetable_summary: string | null; + timetable_description: string | null; + tags: Array; + max_active_tasks: number; + max_active_runs: number | null; + max_consecutive_failed_dag_runs: number; + has_task_concurrency_limits: boolean; + has_import_errors: boolean; + next_dagrun: string | null; + next_dagrun_data_interval_start: string | null; + next_dagrun_data_interval_end: string | null; + next_dagrun_create_after: string | null; + owners: Array; + latest_dag_runs: Array; + /** + * Return file token. + */ + readonly file_token: string; +}; + /** * Schema for DagProcessor info. */ @@ -474,6 +518,21 @@ export type HistoricalMetricsData = { export type HistoricalMetricsResponse = HistoricalMetricDataResponse; +export type RecentDagRunsData = { + dagDisplayNamePattern?: string | null; + dagIdPattern?: string | null; + dagRunsLimit?: number; + lastDagRunState?: DagRunState | null; + limit?: number; + offset?: number; + onlyActive?: boolean; + owners?: Array; + paused?: boolean | null; + tags?: Array; +}; + +export type RecentDagRunsResponse = DAGWithLatestDagRunsCollectionResponse; + export type GetDagsData = { dagDisplayNamePattern?: string | null; dagIdPattern?: string | null; @@ -696,6 +755,21 @@ export type $OpenApiTs = { }; }; }; + "/ui/dags/recent_dag_runs": { + get: { + req: RecentDagRunsData; + res: { + /** + * Successful Response + */ + 200: DAGWithLatestDagRunsCollectionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; "/public/dags/": { get: { req: GetDagsData; diff --git a/tests/api_fastapi/core_api/routes/public/test_dags.py b/tests/api_fastapi/core_api/routes/public/test_dags.py index 0d3b8abbdec8c..03253dbfa7a7a 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dags.py +++ b/tests/api_fastapi/core_api/routes/public/test_dags.py @@ -26,7 +26,7 @@ from airflow.operators.empty import EmptyOperator from airflow.utils.session import provide_session from airflow.utils.state import DagRunState, TaskInstanceState -from airflow.utils.types import DagRunType +from airflow.utils.types import DagRunTriggeredByType, DagRunType from tests_common.test_utils.db import clear_db_dags, clear_db_runs, clear_db_serialized_dags @@ -73,6 +73,7 @@ def _create_deactivated_paused_dag(self, session=None): start_date=datetime(2018, 1, 1, 12, 0, 0, tzinfo=timezone.utc), run_type=DagRunType.SCHEDULED, state=DagRunState.FAILED, + triggered_by=DagRunTriggeredByType.TEST, ) dagrun_success = DagRun( @@ -82,6 +83,7 @@ def _create_deactivated_paused_dag(self, session=None): start_date=datetime(2019, 1, 1, 12, 0, 0, tzinfo=timezone.utc), run_type=DagRunType.MANUAL, state=DagRunState.SUCCESS, + triggered_by=DagRunTriggeredByType.TEST, ) session.add(dag_model) diff --git a/tests/api_fastapi/core_api/routes/ui/test_dags.py b/tests/api_fastapi/core_api/routes/ui/test_dags.py new file mode 100644 index 0000000000000..7258476bf163f --- /dev/null +++ b/tests/api_fastapi/core_api/routes/ui/test_dags.py @@ -0,0 +1,104 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from datetime import datetime, timezone + +import pendulum +import pytest + +from airflow.models import DagRun +from airflow.utils.session import provide_session +from airflow.utils.state import DagRunState +from airflow.utils.types import DagRunTriggeredByType, DagRunType + +from tests.api_fastapi.core_api.routes.public.test_dags import ( + DAG1_ID, + DAG2_ID, + DAG3_ID, + DAG4_ID, + DAG5_ID, + TestDagEndpoint as TestPublicDagEndpoint, +) + +pytestmark = pytest.mark.db_test + + +class TestRecentDagRuns(TestPublicDagEndpoint): + @pytest.fixture(autouse=True) + @provide_session + def setup_dag_runs(self, session=None) -> None: + # Create DAG Runs + for dag_id in [DAG1_ID, DAG2_ID, DAG3_ID, DAG4_ID, DAG5_ID]: + dag_runs_count = 5 if dag_id in [DAG1_ID, DAG2_ID] else 2 + for i in range(dag_runs_count): + start_date = datetime(2021 + i, 1, 1, 0, 0, 0, tzinfo=timezone.utc) + dag_run = DagRun( + dag_id=dag_id, + run_id=f"run_id_{i+1}", + run_type=DagRunType.MANUAL, + start_date=start_date, + execution_date=start_date, + state=(DagRunState.FAILED if i % 2 == 0 else DagRunState.SUCCESS), + triggered_by=DagRunTriggeredByType.TEST, + ) + dag_run.end_date = dag_run.start_date + pendulum.duration(hours=1) + session.add(dag_run) + session.commit() + + @pytest.mark.parametrize( + "query_params, expected_ids,expected_total_dag_runs", + [ + # Filters + ({}, [DAG1_ID, DAG2_ID], 11), + ({"limit": 1}, [DAG1_ID], 2), + ({"offset": 1}, [DAG1_ID, DAG2_ID], 11), + ({"tags": ["example"]}, [DAG1_ID], 6), + ({"only_active": False}, [DAG1_ID, DAG2_ID, DAG3_ID], 15), + ({"paused": True, "only_active": False}, [DAG3_ID], 4), + ({"paused": False}, [DAG1_ID, DAG2_ID], 11), + ({"owners": ["airflow"]}, [DAG1_ID, DAG2_ID], 11), + ({"owners": ["test_owner"], "only_active": False}, [DAG3_ID], 4), + ({"last_dag_run_state": "success", "only_active": False}, [DAG1_ID, DAG2_ID, DAG3_ID], 6), + ({"last_dag_run_state": "failed", "only_active": False}, [DAG1_ID, DAG2_ID, DAG3_ID], 9), + # Search + ({"dag_id_pattern": "1"}, [DAG1_ID], 6), + ({"dag_display_name_pattern": "test_dag2"}, [DAG2_ID], 5), + ], + ) + def test_recent_dag_runs(self, test_client, query_params, expected_ids, expected_total_dag_runs): + response = test_client.get("/ui/dags/recent_dag_runs", params=query_params) + assert response.status_code == 200 + body = response.json() + assert body["total_entries"] == len(expected_ids) + required_dag_run_key = [ + "run_id", + "dag_id", + "state", + "logical_date", + ] + for recent_dag_runs in body["dags"]: + dag_runs = recent_dag_runs["latest_dag_runs"] + # check date ordering + previous_execution_date = None + for dag_run in dag_runs: + # validate the response + for key in required_dag_run_key: + assert key in dag_run + if previous_execution_date: + assert previous_execution_date > dag_run["logical_date"] + previous_execution_date = dag_run["logical_date"]