Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-84: Migrate Extra Links endpoint to fastapi #44277

Merged
merged 10 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions airflow/api_connexion/endpoints/extra_link_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from airflow.auth.managers.models.resource_details import DagAccessEntity
from airflow.exceptions import TaskNotFound
from airflow.utils.airflow_flask_app import get_airflow_app
from airflow.utils.api_migration import mark_fastapi_migration_done
from airflow.utils.session import NEW_SESSION, provide_session

if TYPE_CHECKING:
Expand All @@ -35,6 +36,7 @@
from airflow.models.dagbag import DagBag


@mark_fastapi_migration_done
@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
@provide_session
def get_extra_links(
Expand Down
25 changes: 25 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/extra_links.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from pydantic import RootModel


class ExtraLinksResponse(RootModel):
prabhusneha marked this conversation as resolved.
Show resolved Hide resolved
"""Extra Links Response."""

root: dict[str, str | None]
66 changes: 66 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2899,6 +2899,64 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/links:
get:
tags:
- Extra Links
- Task Instance
summary: Get Extra Links
description: Get extra links for task instance.
operationId: get_extra_links
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: dag_run_id
in: path
required: true
schema:
type: string
title: Dag Run Id
- name: task_id
in: path
required: true
schema:
type: string
title: Task Id
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/ExtraLinksResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/importErrors/{import_error_id}:
get:
tags:
Expand Down Expand Up @@ -6734,6 +6792,14 @@ components:
- extra
title: EventLogResponse
description: Event Log Response.
ExtraLinksResponse:
additionalProperties:
anyOf:
- type: string
- type: 'null'
type: object
title: ExtraLinksResponse
description: Extra Links Response.
FastAPIAppResponse:
properties:
app:
Expand Down
3 changes: 3 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from airflow.api_fastapi.core_api.routes.public.dag_warning import dag_warning_router
from airflow.api_fastapi.core_api.routes.public.dags import dags_router
from airflow.api_fastapi.core_api.routes.public.event_logs import event_logs_router
from airflow.api_fastapi.core_api.routes.public.extra_links import extra_links_router
from airflow.api_fastapi.core_api.routes.public.import_error import import_error_router
from airflow.api_fastapi.core_api.routes.public.job import job_router
from airflow.api_fastapi.core_api.routes.public.log import task_instances_log_router
Expand Down Expand Up @@ -61,6 +62,7 @@
authenticated_router.include_router(dag_warning_router)
authenticated_router.include_router(dags_router)
authenticated_router.include_router(event_logs_router)
authenticated_router.include_router(extra_links_router)
prabhusneha marked this conversation as resolved.
Show resolved Hide resolved
authenticated_router.include_router(import_error_router)
authenticated_router.include_router(job_router)
authenticated_router.include_router(plugins_router)
Expand All @@ -72,6 +74,7 @@
authenticated_router.include_router(xcom_router)
authenticated_router.include_router(task_instances_log_router)


# Include authenticated router in public router
public_router.include_router(authenticated_router)

Expand Down
85 changes: 85 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/extra_links.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# Licensed to the Apache Software Foundation (ASF) under one
prabhusneha marked this conversation as resolved.
Show resolved Hide resolved
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from typing import TYPE_CHECKING, Annotated

from fastapi import Depends, HTTPException, Request, status
from sqlalchemy.orm import Session
from sqlalchemy.sql import select

from airflow.api_fastapi.common.db.common import get_session
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.extra_links import ExtraLinksResponse
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.exceptions import TaskNotFound
prabhusneha marked this conversation as resolved.
Show resolved Hide resolved

if TYPE_CHECKING:
from sqlalchemy.orm.session import Session

from airflow.models import DAG


extra_links_router = AirflowRouter(
tags=["Extra Links"], prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/links"
prabhusneha marked this conversation as resolved.
Show resolved Hide resolved
)


@extra_links_router.get(
"",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
tags=["Task Instance"],
)
def get_extra_links(
dag_id: str,
dag_run_id: str,
task_id: str,
session: Annotated[Session, Depends(get_session)],
request: Request,
) -> ExtraLinksResponse:
"""Get extra links for task instance."""
from airflow.models.taskinstance import TaskInstance

dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
if not dag:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG with ID = {dag_id} not found")

try:
task = dag.get_task(task_id)
except TaskNotFound:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Task with ID = {task_id} not found")

ti = session.scalar(
select(TaskInstance).where(
TaskInstance.dag_id == dag_id,
TaskInstance.run_id == dag_run_id,
TaskInstance.task_id == task_id,
)
)

if not ti:
raise HTTPException(
status.HTTP_404_NOT_FOUND,
f"DAG Run with ID = {dag_run_id} not found",
)

all_extra_link_pairs = (
(link_name, task.get_extra_links(ti, link_name)) for link_name in task.extra_links
)
all_extra_links = {link_name: link_url or None for link_name, link_url in sorted(all_extra_link_pairs)}
return ExtraLinksResponse.model_validate(all_extra_links)
Loading