Skip to content

Commit

Permalink
AIP-84: Migrate Extra Links endpoint to fastapi (#44277)
Browse files Browse the repository at this point in the history
* migrate extra link endpoint to fastapi

* add airflow license

* add tests

* Apply suggestions from code review

Co-authored-by: Kalyan R <kalyan.ben10@live.com>

* add teardown in test

* change async to sync

* Address PR comments and fix static checks

* Address PR comment

---------

Co-authored-by: Sneha Prabhu <snehaprabhu@Snehas-MacBook-Pro.local>
Co-authored-by: Kalyan R <kalyan.ben10@live.com>
  • Loading branch information
3 people authored Nov 25, 2024
1 parent db260b0 commit 1351b08
Show file tree
Hide file tree
Showing 13 changed files with 2,620 additions and 1,839 deletions.
2 changes: 2 additions & 0 deletions airflow/api_connexion/endpoints/extra_link_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from airflow.auth.managers.models.resource_details import DagAccessEntity
from airflow.exceptions import TaskNotFound
from airflow.utils.airflow_flask_app import get_airflow_app
from airflow.utils.api_migration import mark_fastapi_migration_done
from airflow.utils.session import NEW_SESSION, provide_session

if TYPE_CHECKING:
Expand All @@ -35,6 +36,7 @@
from airflow.models.dagbag import DagBag


@mark_fastapi_migration_done
@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
@provide_session
def get_extra_links(
Expand Down
25 changes: 25 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/extra_links.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from pydantic import RootModel


class ExtraLinksResponse(RootModel):
"""Extra Links Response."""

root: dict[str, str | None]
66 changes: 66 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2899,6 +2899,64 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/links:
get:
tags:
- Extra Links
- Task Instance
summary: Get Extra Links
description: Get extra links for task instance.
operationId: get_extra_links
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: dag_run_id
in: path
required: true
schema:
type: string
title: Dag Run Id
- name: task_id
in: path
required: true
schema:
type: string
title: Task Id
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/ExtraLinksResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/importErrors/{import_error_id}:
get:
tags:
Expand Down Expand Up @@ -6734,6 +6792,14 @@ components:
- extra
title: EventLogResponse
description: Event Log Response.
ExtraLinksResponse:
additionalProperties:
anyOf:
- type: string
- type: 'null'
type: object
title: ExtraLinksResponse
description: Extra Links Response.
FastAPIAppResponse:
properties:
app:
Expand Down
3 changes: 3 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from airflow.api_fastapi.core_api.routes.public.dag_warning import dag_warning_router
from airflow.api_fastapi.core_api.routes.public.dags import dags_router
from airflow.api_fastapi.core_api.routes.public.event_logs import event_logs_router
from airflow.api_fastapi.core_api.routes.public.extra_links import extra_links_router
from airflow.api_fastapi.core_api.routes.public.import_error import import_error_router
from airflow.api_fastapi.core_api.routes.public.job import job_router
from airflow.api_fastapi.core_api.routes.public.log import task_instances_log_router
Expand Down Expand Up @@ -61,6 +62,7 @@
authenticated_router.include_router(dag_warning_router)
authenticated_router.include_router(dags_router)
authenticated_router.include_router(event_logs_router)
authenticated_router.include_router(extra_links_router)
authenticated_router.include_router(import_error_router)
authenticated_router.include_router(job_router)
authenticated_router.include_router(plugins_router)
Expand All @@ -72,6 +74,7 @@
authenticated_router.include_router(xcom_router)
authenticated_router.include_router(task_instances_log_router)


# Include authenticated router in public router
public_router.include_router(authenticated_router)

Expand Down
85 changes: 85 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/extra_links.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from typing import TYPE_CHECKING, Annotated

from fastapi import Depends, HTTPException, Request, status
from sqlalchemy.orm import Session
from sqlalchemy.sql import select

from airflow.api_fastapi.common.db.common import get_session
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.extra_links import ExtraLinksResponse
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.exceptions import TaskNotFound

if TYPE_CHECKING:
from sqlalchemy.orm.session import Session

from airflow.models import DAG


extra_links_router = AirflowRouter(
tags=["Extra Links"], prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/links"
)


@extra_links_router.get(
"",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
tags=["Task Instance"],
)
def get_extra_links(
dag_id: str,
dag_run_id: str,
task_id: str,
session: Annotated[Session, Depends(get_session)],
request: Request,
) -> ExtraLinksResponse:
"""Get extra links for task instance."""
from airflow.models.taskinstance import TaskInstance

dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
if not dag:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG with ID = {dag_id} not found")

try:
task = dag.get_task(task_id)
except TaskNotFound:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Task with ID = {task_id} not found")

ti = session.scalar(
select(TaskInstance).where(
TaskInstance.dag_id == dag_id,
TaskInstance.run_id == dag_run_id,
TaskInstance.task_id == task_id,
)
)

if not ti:
raise HTTPException(
status.HTTP_404_NOT_FOUND,
f"DAG Run with ID = {dag_run_id} not found",
)

all_extra_link_pairs = (
(link_name, task.get_extra_links(ti, link_name)) for link_name in task.extra_links
)
all_extra_links = {link_name: link_url or None for link_name, link_url in sorted(all_extra_link_pairs)}
return ExtraLinksResponse.model_validate(all_extra_links)
Loading

0 comments on commit 1351b08

Please sign in to comment.