Skip to content

Commit

Permalink
Revert "AIP-84: Migrating GET queued asset events for DAG to fastAPI (a…
Browse files Browse the repository at this point in the history
…pache#43934)"

This reverts commit 3917730.
  • Loading branch information
potiuk committed Nov 16, 2024
1 parent 2db0d11 commit a42cf9c
Show file tree
Hide file tree
Showing 13 changed files with 17 additions and 507 deletions.
1 change: 0 additions & 1 deletion airflow/api_connexion/endpoints/asset_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@ def delete_dag_asset_queued_event(
)


@mark_fastapi_migration_done
@security.requires_access_asset("GET")
@security.requires_access_dag("GET")
@provide_session
Expand Down
26 changes: 2 additions & 24 deletions airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from abc import ABC, abstractmethod
from datetime import datetime
from typing import TYPE_CHECKING, Annotated, Any, Callable, Generic, List, Optional, TypeVar, Union, overload
from typing import TYPE_CHECKING, Annotated, Any, Callable, Generic, List, Optional, TypeVar

from fastapi import Depends, HTTPException, Query
from pendulum.parsing.exceptions import ParserError
Expand Down Expand Up @@ -409,27 +409,6 @@ def _safe_parse_datetime(date_to_check: str) -> datetime:
"""
if not date_to_check:
raise ValueError(f"{date_to_check} cannot be None.")
return _safe_parse_datetime_optional(date_to_check)


@overload
def _safe_parse_datetime_optional(date_to_check: str) -> datetime: ...


@overload
def _safe_parse_datetime_optional(date_to_check: None) -> None: ...


def _safe_parse_datetime_optional(date_to_check: str | None) -> datetime | None:
"""
Parse datetime and raise error for invalid dates.
Allow None values.
:param date_to_check: the string value to be parsed
"""
if date_to_check is None:
return None
try:
return timezone.parse(date_to_check, strict=True)
except (TypeError, ParserError):
Expand Down Expand Up @@ -635,8 +614,7 @@ def depends_float(


# Common Safe DateTime
DateTimeQuery = Annotated[datetime, AfterValidator(_safe_parse_datetime)]
OptionalDateTimeQuery = Annotated[Union[datetime, None], AfterValidator(_safe_parse_datetime_optional)]
DateTimeQuery = Annotated[str, AfterValidator(_safe_parse_datetime)]

# DAG
QueryLimit = Annotated[LimitFilter, Depends(LimitFilter().depends)]
Expand Down
15 changes: 0 additions & 15 deletions airflow/api_fastapi/core_api/datamodels/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,21 +101,6 @@ class AssetEventCollectionResponse(BaseModel):
total_entries: int


class QueuedEventResponse(BaseModel):
"""Queued Event serializer for responses.."""

uri: str
dag_id: str
created_at: datetime


class QueuedEventCollectionResponse(BaseModel):
"""Queued Event Collection serializer for responses."""

queued_events: list[QueuedEventResponse]
total_entries: int


class CreateAssetEventsBody(BaseModel):
"""Create asset events request."""

Expand Down
102 changes: 5 additions & 97 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,12 @@ paths:
required: true
schema:
type: string
format: date-time
title: Start Date
- name: end_date
in: query
required: true
schema:
type: string
format: date-time
title: End Date
responses:
'200':
Expand Down Expand Up @@ -172,7 +170,7 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/assets:
/public/assets/:
get:
tags:
- Asset
Expand Down Expand Up @@ -348,19 +346,18 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/events:
post:
tags:
- Asset
summary: Create Asset Event
description: Create asset events.
operationId: create_asset_event
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/CreateAssetEventsBody'
required: true
responses:
'200':
description: Successful Response
Expand All @@ -369,23 +366,23 @@ paths:
schema:
$ref: '#/components/schemas/AssetEventResponse'
'401':
description: Unauthorized
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
description: Forbidden
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
description: Not Found
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
Expand Down Expand Up @@ -437,60 +434,6 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/assets/queuedEvent:
get:
tags:
- Asset
summary: Get Dag Asset Queued Events
description: Get queued asset events for a DAG.
operationId: get_dag_asset_queued_events
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: before
in: query
required: false
schema:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Before
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/QueuedEventCollectionResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/backfills/:
get:
tags:
Expand Down Expand Up @@ -5787,41 +5730,6 @@ components:
- version
title: ProviderResponse
description: Provider serializer for responses.
QueuedEventCollectionResponse:
properties:
queued_events:
items:
$ref: '#/components/schemas/QueuedEventResponse'
type: array
title: Queued Events
total_entries:
type: integer
title: Total Entries
type: object
required:
- queued_events
- total_entries
title: QueuedEventCollectionResponse
description: Queued Event Collection serializer for responses.
QueuedEventResponse:
properties:
uri:
type: string
title: Uri
dag_id:
type: string
title: Dag Id
created_at:
type: string
format: date-time
title: Created At
type: object
required:
- uri
- dag_id
- created_at
title: QueuedEventResponse
description: Queued Event serializer for responses..
ReprocessBehavior:
type: string
enum:
Expand Down
79 changes: 5 additions & 74 deletions airflow/api_fastapi/core_api/routes/public/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

from __future__ import annotations

from datetime import datetime
from typing import Annotated

from fastapi import Depends, HTTPException, status
Expand All @@ -26,7 +25,6 @@

from airflow.api_fastapi.common.db.common import get_session, paginated_select
from airflow.api_fastapi.common.parameters import (
OptionalDateTimeQuery,
QueryAssetDagIdPatternSearch,
QueryAssetIdFilter,
QueryLimit,
Expand All @@ -45,41 +43,18 @@
AssetEventResponse,
AssetResponse,
CreateAssetEventsBody,
QueuedEventCollectionResponse,
QueuedEventResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.assets import Asset
from airflow.assets.manager import asset_manager
from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel
from airflow.models.asset import AssetEvent, AssetModel
from airflow.utils import timezone

assets_router = AirflowRouter(tags=["Asset"])


def _generate_queued_event_where_clause(
*,
dag_id: str | None = None,
uri: str | None = None,
before: datetime | None = None,
) -> list:
"""Get AssetDagRunQueue where clause."""
where_clause = []
if dag_id is not None:
where_clause.append(AssetDagRunQueue.target_dag_id == dag_id)
if uri is not None:
where_clause.append(
AssetDagRunQueue.asset_id.in_(
select(AssetModel.id).where(AssetModel.uri == uri),
),
)
if before is not None:
where_clause.append(AssetDagRunQueue.created_at < before)
return where_clause
assets_router = AirflowRouter(tags=["Asset"], prefix="/assets")


@assets_router.get(
"/assets",
"/",
responses=create_openapi_http_exception_doc([401, 403, 404]),
)
def get_assets(
Expand Down Expand Up @@ -114,7 +89,7 @@ def get_assets(


@assets_router.get(
"/assets/events",
"/events",
responses=create_openapi_http_exception_doc([404]),
)
def get_asset_events(
Expand Down Expand Up @@ -190,7 +165,7 @@ def create_asset_event(


@assets_router.get(
"/assets/{uri:path}",
"/{uri:path}",
responses=create_openapi_http_exception_doc([401, 403, 404]),
)
def get_asset(
Expand All @@ -208,47 +183,3 @@ def get_asset(
raise HTTPException(status.HTTP_404_NOT_FOUND, f"The Asset with uri: `{uri}` was not found")

return AssetResponse.model_validate(asset, from_attributes=True)


@assets_router.get(
"/dags/{dag_id}/assets/queuedEvent",
responses=create_openapi_http_exception_doc(
[
status.HTTP_404_NOT_FOUND,
]
),
)
def get_dag_asset_queued_events(
dag_id: str,
session: Annotated[Session, Depends(get_session)],
before: OptionalDateTimeQuery = None,
) -> QueuedEventCollectionResponse:
"""Get queued asset events for a DAG."""
where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before)
query = (
select(AssetDagRunQueue, AssetModel.uri)
.join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id)
.where(*where_clause)
)

dag_asset_queued_events_select, total_entries = paginated_select(
query,
[],
)
adrqs = session.execute(dag_asset_queued_events_select).all()

if not adrqs:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with dag_id: `{dag_id}` was not found")

queued_events = [
QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri)
for adrq, uri in adrqs
]

return QueuedEventCollectionResponse(
queued_events=[
QueuedEventResponse.model_validate(queued_event, from_attributes=True)
for queued_event in queued_events
],
total_entries=total_entries,
)
Loading

0 comments on commit a42cf9c

Please sign in to comment.