Skip to content

Commit

Permalink
AIP-84 Get Event Logs
Browse files Browse the repository at this point in the history
  • Loading branch information
jason810496 committed Oct 27, 2024
1 parent 8918cd9 commit 169e45e
Show file tree
Hide file tree
Showing 13 changed files with 886 additions and 2 deletions.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/event_log_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def get_event_log(*, event_log_id: int, session: Session = NEW_SESSION) -> APIRe
return event_log_schema.dump(event_log)


@mark_fastapi_migration_done
@security.requires_access_dag("GET", DagAccessEntity.AUDIT_LOG)
@format_parameters({"limit": check_limit})
@provide_session
Expand Down
51 changes: 50 additions & 1 deletion airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from abc import ABC, abstractmethod
from datetime import datetime
from typing import TYPE_CHECKING, Any, Callable, Generic, List, TypeVar
from typing import TYPE_CHECKING, Any, Callable, Generic, List, Literal, TypeVar

from fastapi import Depends, HTTPException, Query
from pendulum.parsing.exceptions import ParserError
Expand Down Expand Up @@ -219,6 +219,55 @@ def inner(order_by: str = self.get_primary_key_string()) -> SortParam:
return inner


_filter_options = Literal["in", "not_in", "eq", "ne", "lt", "le", "gt", "ge"]


class FilterParam(BaseParam[T]):
"""Filter on attribute."""

def __init__(
self,
attribute: ColumnElement,
value: T | None = None,
filter_option: _filter_options = "eq",
skip_none: bool = True,
) -> None:
super().__init__(skip_none)
self.attribute: ColumnElement = attribute
self.value: T | None = value
self.filter_option: _filter_options = filter_option

def to_orm(self, select: Select) -> Select:
if self.value is None and self.skip_none:
return select

if isinstance(self.value, list):
if self.filter_option == "in":
return select.where(self.attribute.in_(self.value))
if self.filter_option == "not_in":
return select.where(self.attribute.notin_(self.value))
raise ValueError(f"Invalid filter option {self.filter_option} for list value {self.value}")

if self.filter_option == "eq":
return select.where(self.attribute == self.value)
if self.filter_option == "ne":
return select.where(self.attribute != self.value)
if self.filter_option == "lt":
return select.where(self.attribute < self.value)
if self.filter_option == "le":
return select.where(self.attribute <= self.value)
if self.filter_option == "gt":
return select.where(self.attribute > self.value)
if self.filter_option == "ge":
return select.where(self.attribute >= self.value)
raise ValueError(f"Invalid filter option {self.filter_option} for value {self.value}")

def depends(self, *args: Any, **kwargs: Any) -> Self:
raise NotImplementedError(
"Construct FilterParam directly within the router handler, depends is not implemented."
)


class _TagsFilter(BaseParam[List[str]]):
"""Filter on tags."""

Expand Down
152 changes: 152 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1532,6 +1532,142 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/eventLogs/:
get:
tags:
- Event Log
summary: Get Event Logs
description: Get all Event Logs.
operationId: get_event_logs
parameters:
- name: dag_id
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Dag Id
- name: task_id
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Task Id
- name: run_id
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Run Id
- name: map_index
in: query
required: false
schema:
anyOf:
- type: integer
- type: 'null'
title: Map Index
- name: try_number
in: query
required: false
schema:
anyOf:
- type: integer
- type: 'null'
title: Try Number
- name: owner
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Owner
- name: event
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Event
- name: excluded_events
in: query
required: false
schema:
anyOf:
- type: array
items:
type: string
- type: 'null'
title: Excluded Events
- name: included_events
in: query
required: false
schema:
anyOf:
- type: array
items:
type: string
- type: 'null'
title: Included Events
- name: before
in: query
required: false
schema:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Before
- name: after
in: query
required: false
schema:
anyOf:
- type: string
format: date-time
- type: 'null'
title: After
- name: limit
in: query
required: false
schema:
type: integer
default: 100
title: Limit
- name: offset
in: query
required: false
schema:
type: integer
default: 0
title: Offset
- name: order_by
in: query
required: false
schema:
type: string
default: id
title: Order By
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/EventLogCollectionResponse'
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
components:
schemas:
AppBuilderMenuItemResponse:
Expand Down Expand Up @@ -2462,6 +2598,22 @@ components:
title: DagTagPydantic
description: Serializable representation of the DagTag ORM SqlAlchemyModel used
by internal API.
EventLogCollectionResponse:
properties:
event_logs:
items:
$ref: '#/components/schemas/EventLogResponse'
type: array
title: Event Logs
total_entries:
type: integer
title: Total Entries
type: object
required:
- event_logs
- total_entries
title: EventLogCollectionResponse
description: Event Log Collection Response.
EventLogResponse:
properties:
event_log_id:
Expand Down
84 changes: 83 additions & 1 deletion airflow/api_fastapi/core_api/routes/public/event_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,26 @@
# under the License.
from __future__ import annotations

from fastapi import Depends, HTTPException
from datetime import datetime

from fastapi import Depends, HTTPException, Query
from sqlalchemy import select
from sqlalchemy.orm import Session
from typing_extensions import Annotated

from airflow.api_fastapi.common.db.common import (
get_session,
paginated_select,
)
from airflow.api_fastapi.common.parameters import (
FilterParam,
QueryLimit,
QueryOffset,
SortParam,
)
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.serializers.event_logs import (
EventLogCollectionResponse,
EventLogResponse,
)
from airflow.models import Log
Expand All @@ -45,3 +55,75 @@ async def get_event_log(
event_log,
from_attributes=True,
)


@event_logs_router.get("/")
async def get_event_logs(
limit: QueryLimit,
offset: QueryOffset,
session: Annotated[Session, Depends(get_session)],
order_by: Annotated[
SortParam,
Depends(
SortParam(
[
"id", # event_log_id
"dttm", # when
"dag_id",
"task_id",
"run_id",
"event",
"execution_date", # logical_date
"owner",
"extra",
],
Log,
).dynamic_depends()
),
],
dag_id: str | None = None,
task_id: str | None = None,
run_id: str | None = None,
map_index: int | None = None,
try_number: int | None = None,
owner: str | None = None,
event: str | None = None,
excluded_events: list[str] | None = Query(None),
included_events: list[str] | None = Query(None),
before: datetime | None = None,
after: datetime | None = None,
) -> EventLogCollectionResponse:
"""Get all Event Logs."""
base_select = select(Log).group_by(Log.id)
event_logs_select, total_entries = paginated_select(
base_select,
[
FilterParam(Log.dag_id, dag_id),
FilterParam(Log.task_id, task_id),
FilterParam(Log.run_id, run_id),
FilterParam(Log.map_index, map_index),
FilterParam(Log.event, event),
FilterParam(Log.try_number, try_number),
FilterParam(Log.owner, owner),
FilterParam(Log.event, excluded_events, "not_in"),
FilterParam(Log.event, included_events, "in"),
FilterParam(Log.dttm, before, "lt"),
FilterParam(Log.dttm, after, "gt"),
],
order_by,
offset,
limit,
session,
)
event_logs = session.scalars(event_logs_select).all()

return EventLogCollectionResponse(
event_logs=[
EventLogResponse.model_validate(
event_log,
from_attributes=True,
)
for event_log in event_logs
],
total_entries=total_entries,
)
7 changes: 7 additions & 0 deletions airflow/api_fastapi/core_api/serializers/event_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,10 @@ class EventLogResponse(BaseModel):
extra: str | None

model_config = ConfigDict(populate_by_name=True)


class EventLogCollectionResponse(BaseModel):
"""Event Log Collection Response."""

event_logs: list[EventLogResponse]
total_entries: int
Loading

0 comments on commit 169e45e

Please sign in to comment.