Skip to content

Commit

Permalink
AIP-84 Get Event Logs (apache#43407)
Browse files Browse the repository at this point in the history
* AIP-84 Get Event Logs

* fix: add http execption docs for router

* refactor: remove `FilterParam` out of this PR
  • Loading branch information
jason810496 authored and ellisms committed Nov 13, 2024
1 parent 0b804db commit 37948f2
Show file tree
Hide file tree
Showing 12 changed files with 871 additions and 1 deletion.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/event_log_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def get_event_log(*, event_log_id: int, session: Session = NEW_SESSION) -> APIRe
return event_log_schema.dump(event_log)


@mark_fastapi_migration_done
@security.requires_access_dag("GET", DagAccessEntity.AUDIT_LOG)
@format_parameters({"limit": check_limit})
@provide_session
Expand Down
164 changes: 164 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1390,6 +1390,154 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/eventLogs/:
get:
tags:
- Event Log
summary: Get Event Logs
description: Get all Event Logs.
operationId: get_event_logs
parameters:
- name: dag_id
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Dag Id
- name: task_id
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Task Id
- name: run_id
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Run Id
- name: map_index
in: query
required: false
schema:
anyOf:
- type: integer
- type: 'null'
title: Map Index
- name: try_number
in: query
required: false
schema:
anyOf:
- type: integer
- type: 'null'
title: Try Number
- name: owner
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Owner
- name: event
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Event
- name: excluded_events
in: query
required: false
schema:
anyOf:
- type: array
items:
type: string
- type: 'null'
title: Excluded Events
- name: included_events
in: query
required: false
schema:
anyOf:
- type: array
items:
type: string
- type: 'null'
title: Included Events
- name: before
in: query
required: false
schema:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Before
- name: after
in: query
required: false
schema:
anyOf:
- type: string
format: date-time
- type: 'null'
title: After
- name: limit
in: query
required: false
schema:
type: integer
default: 100
title: Limit
- name: offset
in: query
required: false
schema:
type: integer
default: 0
title: Offset
- name: order_by
in: query
required: false
schema:
type: string
default: id
title: Order By
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/EventLogCollectionResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/monitor/health:
get:
tags:
Expand Down Expand Up @@ -3149,6 +3297,22 @@ components:
This is the set of allowable values for the ``warning_type`` field
in the DagWarning model.'
EventLogCollectionResponse:
properties:
event_logs:
items:
$ref: '#/components/schemas/EventLogResponse'
type: array
title: Event Logs
total_entries:
type: integer
title: Total Entries
type: object
required:
- event_logs
- total_entries
title: EventLogCollectionResponse
description: Event Log Collection Response.
EventLogResponse:
properties:
event_log_id:
Expand Down
97 changes: 96 additions & 1 deletion airflow/api_fastapi/core_api/routes/public/event_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,26 @@
# under the License.
from __future__ import annotations

from fastapi import Depends, HTTPException, status
from datetime import datetime

from fastapi import Depends, HTTPException, Query, status
from sqlalchemy import select
from sqlalchemy.orm import Session
from typing_extensions import Annotated

from airflow.api_fastapi.common.db.common import (
get_session,
paginated_select,
)
from airflow.api_fastapi.common.parameters import (
QueryLimit,
QueryOffset,
SortParam,
)
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.event_logs import (
EventLogCollectionResponse,
EventLogResponse,
)
from airflow.models import Log
Expand All @@ -51,3 +60,89 @@ async def get_event_log(
event_log,
from_attributes=True,
)


@event_logs_router.get(
"/",
responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]),
)
async def get_event_logs(
limit: QueryLimit,
offset: QueryOffset,
session: Annotated[Session, Depends(get_session)],
order_by: Annotated[
SortParam,
Depends(
SortParam(
[
"id", # event_log_id
"dttm", # when
"dag_id",
"task_id",
"run_id",
"event",
"execution_date", # logical_date
"owner",
"extra",
],
Log,
).dynamic_depends()
),
],
dag_id: str | None = None,
task_id: str | None = None,
run_id: str | None = None,
map_index: int | None = None,
try_number: int | None = None,
owner: str | None = None,
event: str | None = None,
excluded_events: list[str] | None = Query(None),
included_events: list[str] | None = Query(None),
before: datetime | None = None,
after: datetime | None = None,
) -> EventLogCollectionResponse:
"""Get all Event Logs."""
base_select = select(Log).group_by(Log.id)
# TODO: Refactor using the `FilterParam` class in commit `574b72e41cc5ed175a2bbf4356522589b836bb11`
if dag_id is not None:
base_select = base_select.where(Log.dag_id == dag_id)
if task_id is not None:
base_select = base_select.where(Log.task_id == task_id)
if run_id is not None:
base_select = base_select.where(Log.run_id == run_id)
if map_index is not None:
base_select = base_select.where(Log.map_index == map_index)
if try_number is not None:
base_select = base_select.where(Log.try_number == try_number)
if owner is not None:
base_select = base_select.where(Log.owner == owner)
if event is not None:
base_select = base_select.where(Log.event == event)
if excluded_events is not None:
base_select = base_select.where(Log.event.notin_(excluded_events))
if included_events is not None:
base_select = base_select.where(Log.event.in_(included_events))
if before is not None:
base_select = base_select.where(Log.dttm < before)
if after is not None:
base_select = base_select.where(Log.dttm > after)
event_logs_select, total_entries = paginated_select(
base_select,
[],
order_by,
offset,
limit,
session,
)
event_logs = session.scalars(event_logs_select).all()

return EventLogCollectionResponse(
event_logs=[
EventLogResponse.model_validate(
event_log,
from_attributes=True,
)
for event_log in event_logs
],
total_entries=total_entries,
)
7 changes: 7 additions & 0 deletions airflow/api_fastapi/core_api/serializers/event_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,10 @@ class EventLogResponse(BaseModel):
extra: str | None

model_config = ConfigDict(populate_by_name=True)


class EventLogCollectionResponse(BaseModel):
"""Event Log Collection Response."""

event_logs: list[EventLogResponse]
total_entries: int
62 changes: 62 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,68 @@ export const UseEventLogServiceGetEventLogKeyFn = (
},
queryKey?: Array<unknown>,
) => [useEventLogServiceGetEventLogKey, ...(queryKey ?? [{ eventLogId }])];
export type EventLogServiceGetEventLogsDefaultResponse = Awaited<
ReturnType<typeof EventLogService.getEventLogs>
>;
export type EventLogServiceGetEventLogsQueryResult<
TData = EventLogServiceGetEventLogsDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
export const useEventLogServiceGetEventLogsKey = "EventLogServiceGetEventLogs";
export const UseEventLogServiceGetEventLogsKeyFn = (
{
after,
before,
dagId,
event,
excludedEvents,
includedEvents,
limit,
mapIndex,
offset,
orderBy,
owner,
runId,
taskId,
tryNumber,
}: {
after?: string;
before?: string;
dagId?: string;
event?: string;
excludedEvents?: string[];
includedEvents?: string[];
limit?: number;
mapIndex?: number;
offset?: number;
orderBy?: string;
owner?: string;
runId?: string;
taskId?: string;
tryNumber?: number;
} = {},
queryKey?: Array<unknown>,
) => [
useEventLogServiceGetEventLogsKey,
...(queryKey ?? [
{
after,
before,
dagId,
event,
excludedEvents,
includedEvents,
limit,
mapIndex,
offset,
orderBy,
owner,
runId,
taskId,
tryNumber,
},
]),
];
export type MonitorServiceGetHealthDefaultResponse = Awaited<
ReturnType<typeof MonitorService.getHealth>
>;
Expand Down
Loading

0 comments on commit 37948f2

Please sign in to comment.