Skip to content

Commit

Permalink
AIP 84: Migrate GET ASSET EVENTS legacy API to fast API (#43881)
Browse files Browse the repository at this point in the history
* AIP-84: Migrating GET Assets to fastAPI

* matching response to legacy

* Adding unit tests - part 1

* Update airflow/api_fastapi/common/parameters.py

Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>

* fixing the dag_ids filter

* fixing the dag_ids filter

* Adding unit tests - part 2

* fixing unit tests & updating parameter type

* review comments pierre

* fixing last commit

* fixing unit tests

* migrating get assets events endpoint to fastapi

* fixing test response

* Adding tests for filtering

* address review comments

* fixing test parametrize

* address review comments

* address review comments

* removing http 401 and 403 as its now added in  root router in #43932

---------

Co-authored-by: Amogh <amoghrajesh1999@gmail.com>
Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
  • Loading branch information
3 people authored Nov 14, 2024
1 parent b3362f8 commit 9a364ac
Show file tree
Hide file tree
Showing 13 changed files with 1,124 additions and 6 deletions.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/asset_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ def get_assets(
return asset_collection_schema.dump(AssetCollection(assets=assets, total_entries=total_entries))


@mark_fastapi_migration_done
@security.requires_access_asset("GET")
@provide_session
@format_parameters({"limit": check_limit})
Expand Down
97 changes: 96 additions & 1 deletion airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

from airflow.api_connexion.endpoints.task_instance_endpoint import _convert_ti_states
from airflow.models import Base, Connection
from airflow.models.asset import AssetModel, DagScheduleAssetReference, TaskOutletAssetReference
from airflow.models.asset import AssetEvent, AssetModel, DagScheduleAssetReference, TaskOutletAssetReference
from airflow.models.dag import DagModel, DagTag
from airflow.models.dagrun import DagRun
from airflow.models.dagwarning import DagWarning, DagWarningType
Expand Down Expand Up @@ -440,6 +440,86 @@ def to_orm(self, select: Select) -> Select:
)


class _AssetIdFilter(BaseParam[int]):
"""Filter on asset_id."""

def __init__(self, attribute: ColumnElement, skip_none: bool = True) -> None:
super().__init__(skip_none=skip_none)
self.attribute = attribute

def to_orm(self, select: Select) -> Select:
if self.value is None and self.skip_none:
return select
return select.where(self.attribute == self.value)

def depends(self, asset_id: int | None = None) -> _AssetIdFilter:
return self.set_value(asset_id)


class _SourceDagIdFilter(BaseParam[str]):
"""Filter on source_dag_id."""

def __init__(self, attribute: ColumnElement, skip_none: bool = True) -> None:
super().__init__(skip_none=skip_none)
self.attribute = attribute

def to_orm(self, select: Select) -> Select:
if self.value is None and self.skip_none:
return select
return select.where(self.attribute == self.value)

def depends(self, source_dag_id: str | None = None) -> _SourceDagIdFilter:
return self.set_value(source_dag_id)


class _SourceTaskIdFilter(BaseParam[str]):
"""Filter on source_task_id."""

def __init__(self, attribute: ColumnElement, skip_none: bool = True) -> None:
super().__init__(skip_none=skip_none)
self.attribute = attribute

def to_orm(self, select: Select) -> Select:
if self.value is None and self.skip_none:
return select
return select.where(self.attribute == self.value)

def depends(self, source_task_id: str | None = None) -> _SourceTaskIdFilter:
return self.set_value(source_task_id)


class _SourceRunIdFilter(BaseParam[str]):
"""filter on source_run_id."""

def __init__(self, attribute: ColumnElement, skip_none: bool = True) -> None:
super().__init__(skip_none=skip_none)
self.attribute = attribute

def to_orm(self, select: Select) -> Select:
if self.value is None and self.skip_none:
return select
return select.where(self.attribute == self.value)

def depends(self, source_run_id: str | None = None) -> _SourceRunIdFilter:
return self.set_value(source_run_id)


class _SourceMapIndexFilter(BaseParam[int]):
"""Filter on source_map_index."""

def __init__(self, attribute: ColumnElement, skip_none: bool = True) -> None:
super().__init__(skip_none=skip_none)
self.attribute = attribute

def to_orm(self, select: Select) -> Select:
if self.value is None and self.skip_none:
return select
return select.where(self.attribute == self.value)

def depends(self, source_map_index: int | None = None) -> _SourceMapIndexFilter:
return self.set_value(source_map_index)


class Range(BaseModel, Generic[T]):
"""Range with a lower and upper bound."""

Expand Down Expand Up @@ -537,3 +617,18 @@ def depends_float(
QueryAssetDagIdPatternSearch = Annotated[
_DagIdAssetReferenceFilter, Depends(_DagIdAssetReferenceFilter().depends)
]
QueryAssetIdFilter = Annotated[_AssetIdFilter, Depends(_AssetIdFilter(AssetEvent.asset_id).depends)]


QuerySourceDagIdFilter = Annotated[
_SourceDagIdFilter, Depends(_SourceDagIdFilter(AssetEvent.source_dag_id).depends)
]
QuerySourceTaskIdFilter = Annotated[
_SourceTaskIdFilter, Depends(_SourceTaskIdFilter(AssetEvent.source_task_id).depends)
]
QuerySourceRunIdFilter = Annotated[
_SourceRunIdFilter, Depends(_SourceRunIdFilter(AssetEvent.source_run_id).depends)
]
QuerySourceMapIndexFilter = Annotated[
_SourceMapIndexFilter, Depends(_SourceMapIndexFilter(AssetEvent.source_map_index).depends)
]
37 changes: 36 additions & 1 deletion airflow/api_fastapi/core_api/datamodels/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from datetime import datetime

from pydantic import BaseModel
from pydantic import BaseModel, Field


class DagScheduleAssetReference(BaseModel):
Expand Down Expand Up @@ -64,3 +64,38 @@ class AssetCollectionResponse(BaseModel):

assets: list[AssetResponse]
total_entries: int


class DagRunAssetReference(BaseModel):
"""DAGRun serializer for asset responses."""

run_id: str
dag_id: str
execution_date: datetime = Field(alias="logical_date")
start_date: datetime
end_date: datetime
state: str
data_interval_start: datetime
data_interval_end: datetime


class AssetEventResponse(BaseModel):
"""Asset event serializer for responses."""

id: int
asset_id: int
uri: str
extra: dict | None = None
source_task_id: str | None = None
source_dag_id: str | None = None
source_run_id: str | None = None
source_map_index: int
created_dagruns: list[DagRunAssetReference]
timestamp: datetime


class AssetEventCollectionResponse(BaseModel):
"""Asset event collection response."""

asset_events: list[AssetEventResponse]
total_entries: int
Loading

0 comments on commit 9a364ac

Please sign in to comment.