Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-84 Get Event Logs #43407

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/event_log_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def get_event_log(*, event_log_id: int, session: Session = NEW_SESSION) -> APIRe
return event_log_schema.dump(event_log)


@mark_fastapi_migration_done
@security.requires_access_dag("GET", DagAccessEntity.AUDIT_LOG)
@format_parameters({"limit": check_limit})
@provide_session
Expand Down
164 changes: 164 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1390,6 +1390,154 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/eventLogs/:
get:
tags:
- Event Log
summary: Get Event Logs
description: Get all Event Logs.
operationId: get_event_logs
parameters:
- name: dag_id
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Dag Id
- name: task_id
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Task Id
- name: run_id
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Run Id
- name: map_index
in: query
required: false
schema:
anyOf:
- type: integer
- type: 'null'
title: Map Index
- name: try_number
in: query
required: false
schema:
anyOf:
- type: integer
- type: 'null'
title: Try Number
- name: owner
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Owner
- name: event
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Event
- name: excluded_events
in: query
required: false
schema:
anyOf:
- type: array
items:
type: string
- type: 'null'
title: Excluded Events
- name: included_events
in: query
required: false
schema:
anyOf:
- type: array
items:
type: string
- type: 'null'
title: Included Events
- name: before
in: query
required: false
schema:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Before
- name: after
in: query
required: false
schema:
anyOf:
- type: string
format: date-time
- type: 'null'
title: After
- name: limit
in: query
required: false
schema:
type: integer
default: 100
title: Limit
- name: offset
in: query
required: false
schema:
type: integer
default: 0
title: Offset
- name: order_by
in: query
required: false
schema:
type: string
default: id
title: Order By
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/EventLogCollectionResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/monitor/health:
get:
tags:
Expand Down Expand Up @@ -3191,6 +3339,22 @@ components:
This is the set of allowable values for the ``warning_type`` field

in the DagWarning model.'
EventLogCollectionResponse:
properties:
event_logs:
items:
$ref: '#/components/schemas/EventLogResponse'
type: array
title: Event Logs
total_entries:
type: integer
title: Total Entries
type: object
required:
- event_logs
- total_entries
title: EventLogCollectionResponse
description: Event Log Collection Response.
EventLogResponse:
properties:
event_log_id:
Expand Down
97 changes: 96 additions & 1 deletion airflow/api_fastapi/core_api/routes/public/event_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,26 @@
# under the License.
from __future__ import annotations

from fastapi import Depends, HTTPException, status
from datetime import datetime

from fastapi import Depends, HTTPException, Query, status
from sqlalchemy import select
from sqlalchemy.orm import Session
from typing_extensions import Annotated

from airflow.api_fastapi.common.db.common import (
get_session,
paginated_select,
)
from airflow.api_fastapi.common.parameters import (
QueryLimit,
QueryOffset,
SortParam,
)
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.event_logs import (
EventLogCollectionResponse,
EventLogResponse,
)
from airflow.models import Log
Expand All @@ -51,3 +60,89 @@ async def get_event_log(
event_log,
from_attributes=True,
)


@event_logs_router.get(
"/",
responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]),
)
async def get_event_logs(
limit: QueryLimit,
offset: QueryOffset,
session: Annotated[Session, Depends(get_session)],
order_by: Annotated[
SortParam,
Depends(
SortParam(
[
"id", # event_log_id
"dttm", # when
"dag_id",
"task_id",
"run_id",
"event",
"execution_date", # logical_date
"owner",
"extra",
],
Log,
).dynamic_depends()
),
],
dag_id: str | None = None,
task_id: str | None = None,
run_id: str | None = None,
map_index: int | None = None,
try_number: int | None = None,
owner: str | None = None,
event: str | None = None,
excluded_events: list[str] | None = Query(None),
included_events: list[str] | None = Query(None),
before: datetime | None = None,
after: datetime | None = None,
pierrejeambrun marked this conversation as resolved.
Show resolved Hide resolved
) -> EventLogCollectionResponse:
"""Get all Event Logs."""
base_select = select(Log).group_by(Log.id)
# TODO: Refactor using the `FilterParam` class in commit `574b72e41cc5ed175a2bbf4356522589b836bb11`
if dag_id is not None:
base_select = base_select.where(Log.dag_id == dag_id)
if task_id is not None:
base_select = base_select.where(Log.task_id == task_id)
if run_id is not None:
base_select = base_select.where(Log.run_id == run_id)
if map_index is not None:
base_select = base_select.where(Log.map_index == map_index)
if try_number is not None:
base_select = base_select.where(Log.try_number == try_number)
if owner is not None:
base_select = base_select.where(Log.owner == owner)
if event is not None:
base_select = base_select.where(Log.event == event)
if excluded_events is not None:
base_select = base_select.where(Log.event.notin_(excluded_events))
if included_events is not None:
base_select = base_select.where(Log.event.in_(included_events))
if before is not None:
base_select = base_select.where(Log.dttm < before)
if after is not None:
base_select = base_select.where(Log.dttm > after)
event_logs_select, total_entries = paginated_select(
base_select,
[],
order_by,
offset,
limit,
session,
)
event_logs = session.scalars(event_logs_select).all()

return EventLogCollectionResponse(
event_logs=[
EventLogResponse.model_validate(
event_log,
from_attributes=True,
)
for event_log in event_logs
],
total_entries=total_entries,
)
7 changes: 7 additions & 0 deletions airflow/api_fastapi/core_api/serializers/event_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,10 @@ class EventLogResponse(BaseModel):
extra: str | None

model_config = ConfigDict(populate_by_name=True)


class EventLogCollectionResponse(BaseModel):
"""Event Log Collection Response."""

event_logs: list[EventLogResponse]
total_entries: int
62 changes: 62 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,68 @@ export const UseEventLogServiceGetEventLogKeyFn = (
},
queryKey?: Array<unknown>,
) => [useEventLogServiceGetEventLogKey, ...(queryKey ?? [{ eventLogId }])];
export type EventLogServiceGetEventLogsDefaultResponse = Awaited<
ReturnType<typeof EventLogService.getEventLogs>
>;
export type EventLogServiceGetEventLogsQueryResult<
TData = EventLogServiceGetEventLogsDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
export const useEventLogServiceGetEventLogsKey = "EventLogServiceGetEventLogs";
export const UseEventLogServiceGetEventLogsKeyFn = (
{
after,
before,
dagId,
event,
excludedEvents,
includedEvents,
limit,
mapIndex,
offset,
orderBy,
owner,
runId,
taskId,
tryNumber,
}: {
after?: string;
before?: string;
dagId?: string;
event?: string;
excludedEvents?: string[];
includedEvents?: string[];
limit?: number;
mapIndex?: number;
offset?: number;
orderBy?: string;
owner?: string;
runId?: string;
taskId?: string;
tryNumber?: number;
} = {},
queryKey?: Array<unknown>,
) => [
useEventLogServiceGetEventLogsKey,
...(queryKey ?? [
{
after,
before,
dagId,
event,
excludedEvents,
includedEvents,
limit,
mapIndex,
offset,
orderBy,
owner,
runId,
taskId,
tryNumber,
},
]),
];
export type MonitorServiceGetHealthDefaultResponse = Awaited<
ReturnType<typeof MonitorService.getHealth>
>;
Expand Down
Loading