Skip to content

Commit

Permalink
Migrate the public endpoint Delete DAG to FastAPI (apache#42914)
Browse files Browse the repository at this point in the history
* Migrate the public endpoint Delete DAG to FastAPI

* Refactor tests
  • Loading branch information
omkar-foss authored and Lorin committed Oct 17, 2024
1 parent 94c14ae commit dabc4f2
Show file tree
Hide file tree
Showing 9 changed files with 336 additions and 82 deletions.
2 changes: 2 additions & 0 deletions airflow/api_connexion/endpoints/dag_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
from airflow.api_connexion.types import APIResponse, UpdateMask


@mark_fastapi_migration_done
@security.requires_access_dag("GET")
@provide_session
def get_dag(
Expand Down Expand Up @@ -215,6 +216,7 @@ def patch_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_pat
return dags_collection_schema.dump(DAGCollection(dags=dags, total_entries=total_entries))


@mark_fastapi_migration_done
@security.requires_access_dag("DELETE")
@action_logging
@provide_session
Expand Down
49 changes: 49 additions & 0 deletions airflow/api_fastapi/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,55 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
delete:
tags:
- DAG
summary: Delete Dag
description: Delete the specific DAG.
operationId: delete_dag
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
responses:
'200':
description: Successful Response
content:
application/json:
schema: {}
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Bad Request
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unprocessable Entity
/public/dags/{dag_id}/details:
get:
tags:
Expand Down
19 changes: 18 additions & 1 deletion airflow/api_fastapi/views/public/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@

from __future__ import annotations

from fastapi import Depends, HTTPException, Query, Request
from fastapi import Depends, HTTPException, Query, Request, Response
from sqlalchemy import update
from sqlalchemy.orm import Session
from typing_extensions import Annotated

from airflow.api.common import delete_dag as delete_dag_module
from airflow.api_fastapi.db.common import (
get_session,
paginated_select,
Expand All @@ -48,6 +49,7 @@
DAGResponse,
)
from airflow.api_fastapi.views.router import AirflowRouter
from airflow.exceptions import AirflowException, DagNotFound
from airflow.models import DAG, DagModel

dags_router = AirflowRouter(tags=["DAG"], prefix="/dags")
Expand Down Expand Up @@ -204,3 +206,18 @@ async def patch_dags(
dags=[DAGResponse.model_validate(dag, from_attributes=True) for dag in dags],
total_entries=total_entries,
)


@dags_router.delete("/{dag_id}", responses=create_openapi_http_exception_doc([400, 401, 403, 404, 422]))
async def delete_dag(
dag_id: str,
session: Annotated[Session, Depends(get_session)],
) -> Response:
"""Delete the specific DAG."""
try:
delete_dag_module.delete_dag(dag_id, session=session)
except DagNotFound:
raise HTTPException(404, f"Dag with id: {dag_id} was not found")
except AirflowException:
raise HTTPException(409, f"Task instances of dag with id: '{dag_id}' are still running")
return Response(status_code=204)
3 changes: 3 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ export type DagServicePatchDagMutationResult = Awaited<
export type VariableServicePatchVariableMutationResult = Awaited<
ReturnType<typeof VariableService.patchVariable>
>;
export type DagServiceDeleteDagMutationResult = Awaited<
ReturnType<typeof DagService.deleteDag>
>;
export type ConnectionServiceDeleteConnectionMutationResult = Awaited<
ReturnType<typeof ConnectionService.deleteConnection>
>;
Expand Down
37 changes: 37 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,43 @@ export const useVariableServicePatchVariable = <
}) as unknown as Promise<TData>,
...options,
});
/**
* Delete Dag
* Delete the specific DAG.
* @param data The data for the request.
* @param data.dagId
* @returns unknown Successful Response
* @throws ApiError
*/
export const useDagServiceDeleteDag = <
TData = Common.DagServiceDeleteDagMutationResult,
TError = unknown,
TContext = unknown,
>(
options?: Omit<
UseMutationOptions<
TData,
TError,
{
dagId: string;
},
TContext
>,
"mutationFn"
>,
) =>
useMutation<
TData,
TError,
{
dagId: string;
},
TContext
>({
mutationFn: ({ dagId }) =>
DagService.deleteDag({ dagId }) as unknown as Promise<TData>,
...options,
});
/**
* Delete Connection
* Delete a connection entry.
Expand Down
29 changes: 29 additions & 0 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import type {
GetDagResponse,
PatchDagData,
PatchDagResponse,
DeleteDagData,
DeleteDagResponse,
GetDagDetailsData,
GetDagDetailsResponse,
DeleteConnectionData,
Expand Down Expand Up @@ -234,6 +236,33 @@ export class DagService {
});
}

/**
* Delete Dag
* Delete the specific DAG.
* @param data The data for the request.
* @param data.dagId
* @returns unknown Successful Response
* @throws ApiError
*/
public static deleteDag(
data: DeleteDagData,
): CancelablePromise<DeleteDagResponse> {
return __request(OpenAPI, {
method: "DELETE",
url: "/public/dags/{dag_id}",
path: {
dag_id: data.dagId,
},
errors: {
400: "Bad Request",
401: "Unauthorized",
403: "Forbidden",
404: "Not Found",
422: "Unprocessable Entity",
},
});
}

/**
* Get Dag Details
* Get details of DAG.
Expand Down
35 changes: 35 additions & 0 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,12 @@ export type PatchDagData = {

export type PatchDagResponse = DAGResponse;

export type DeleteDagData = {
dagId: string;
};

export type DeleteDagResponse = unknown;

export type GetDagDetailsData = {
dagId: string;
};
Expand Down Expand Up @@ -525,6 +531,35 @@ export type $OpenApiTs = {
422: HTTPValidationError;
};
};
delete: {
req: DeleteDagData;
res: {
/**
* Successful Response
*/
200: unknown;
/**
* Bad Request
*/
400: HTTPExceptionResponse;
/**
* Unauthorized
*/
401: HTTPExceptionResponse;
/**
* Forbidden
*/
403: HTTPExceptionResponse;
/**
* Not Found
*/
404: HTTPExceptionResponse;
/**
* Unprocessable Entity
*/
422: HTTPExceptionResponse;
};
};
};
"/public/dags/{dag_id}/details": {
get: {
Expand Down
2 changes: 1 addition & 1 deletion airflow/utils/api_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
RT = TypeVar("RT")


def mark_fastapi_migration_done(function: Callable[PS, RT]) -> Callable[PS, RT]:
def mark_fastapi_migration_done(function: Callable[..., RT]) -> Callable[..., RT]:
"""
Mark an endpoint as migrated over to the new FastAPI API.
Expand Down
Loading

0 comments on commit dabc4f2

Please sign in to comment.