Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-64: Add task instance history list endpoint #40988

Merged
merged 1 commit into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 51 additions & 2 deletions airflow/api_connexion/endpoints/task_instance_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from airflow.api_connexion.parameters import format_datetime, format_parameters
from airflow.api_connexion.schemas.task_instance_schema import (
TaskInstanceCollection,
TaskInstanceHistoryCollection,
TaskInstanceReferenceCollection,
clear_task_instance_form,
set_single_task_instance_state_form,
Expand All @@ -38,6 +39,7 @@
task_dependencies_collection_schema,
task_instance_batch_form,
task_instance_collection_schema,
task_instance_history_collection_schema,
task_instance_history_schema,
task_instance_reference_collection_schema,
task_instance_reference_schema,
Expand All @@ -49,6 +51,7 @@
from airflow.models import SlaMiss
from airflow.models.dagrun import DagRun as DR
from airflow.models.taskinstance import TaskInstance as TI, clear_task_instances
from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH
from airflow.utils.airflow_flask_app import get_airflow_app
from airflow.utils.db import get_query_count
from airflow.utils.session import NEW_SESSION, provide_session
Expand Down Expand Up @@ -769,7 +772,6 @@ def get_task_instance_try_details(
session: Session = NEW_SESSION,
) -> APIResponse:
"""Get details of a task instance try."""
from airflow.models.taskinstancehistory import TaskInstanceHistory

def _query(orm_object):
query = select(orm_object).where(
Expand All @@ -788,7 +790,7 @@ def _query(orm_object):
)
return result

result = _query(TI) or _query(TaskInstanceHistory)
result = _query(TI) or _query(TIH)
if result is None:
error_message = f"Task Instance not found for dag_id={dag_id}, run_id={dag_run_id}, task_id={task_id}, map_index={map_index}, try_number={task_try_number}."
raise NotFound("Task instance not found", detail=error_message)
Expand All @@ -814,3 +816,50 @@ def get_mapped_task_instance_try_details(
map_index=map_index,
session=session,
)


@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
@provide_session
def get_task_instance_tries(
*,
dag_id: str,
dag_run_id: str,
task_id: str,
map_index: int = -1,
session: Session = NEW_SESSION,
) -> APIResponse:
"""Get list of task instances history."""

def _query(orm_object):
query = select(orm_object).where(
orm_object.dag_id == dag_id,
orm_object.run_id == dag_run_id,
orm_object.task_id == task_id,
orm_object.map_index == map_index,
)
return query

task_instances = session.scalars(_query(TIH)).all() + session.scalars(_query(TI)).all()
return task_instance_history_collection_schema.dump(
TaskInstanceHistoryCollection(task_instances=task_instances, total_entries=len(task_instances))
)


@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
@provide_session
def get_mapped_task_instance_tries(
*,
dag_id: str,
dag_run_id: str,
task_id: str,
map_index: int,
session: Session = NEW_SESSION,
) -> APIResponse:
"""Get list of mapped task instances history."""
return get_task_instance_tries(
dag_id=dag_id,
dag_run_id=dag_run_id,
task_id=task_id,
map_index=map_index,
session=session,
)
64 changes: 64 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1751,6 +1751,69 @@ paths:
"404":
$ref: "#/components/responses/NotFound"

/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/tries:
get:
summary: List task instance tries
description: |
Get details of all task instance tries.

*New in version 2.10.0*
x-openapi-router-controller: airflow.api_connexion.endpoints.task_instance_endpoint
operationId: get_task_instance_tries
tags: [TaskInstance]
parameters:
- $ref: "#/components/parameters/DAGID"
- $ref: "#/components/parameters/DAGRunID"
- $ref: "#/components/parameters/TaskID"
- $ref: "#/components/parameters/PageLimit"
- $ref: "#/components/parameters/PageOffset"
- $ref: "#/components/parameters/OrderBy"
responses:
"200":
description: Success.
content:
application/json:
schema:
$ref: "#/components/schemas/TaskInstanceCollection"
"401":
$ref: "#/components/responses/Unauthenticated"
"403":
$ref: "#/components/responses/PermissionDenied"
"404":
$ref: "#/components/responses/NotFound"

/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/tries:
get:
summary: List mapped task instance tries
description: |
Get details of all task instance tries.

*New in version 2.10.0*
x-openapi-router-controller: airflow.api_connexion.endpoints.task_instance_endpoint
operationId: get_mapped_task_instance_tries
tags: [TaskInstance]
parameters:
- $ref: "#/components/parameters/DAGID"
- $ref: "#/components/parameters/DAGRunID"
- $ref: "#/components/parameters/TaskID"
- $ref: "#/components/parameters/MapIndex"
- $ref: "#/components/parameters/PageLimit"
- $ref: "#/components/parameters/PageOffset"
- $ref: "#/components/parameters/OrderBy"
responses:
"200":
description: Success.
content:
application/json:
schema:
$ref: "#/components/schemas/TaskInstanceCollection"
"401":
$ref: "#/components/responses/Unauthenticated"
"403":
$ref: "#/components/responses/PermissionDenied"
"404":
$ref: "#/components/responses/NotFound"

/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/tries/{task_try_number}:
get:
summary: get mapped taskinstance try
Expand Down Expand Up @@ -1781,6 +1844,7 @@ paths:
"404":
$ref: "#/components/responses/NotFound"


/variables:
get:
summary: List variables
Expand Down
15 changes: 15 additions & 0 deletions airflow/api_connexion/schemas/task_instance_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,20 @@ class TaskInstanceCollectionSchema(Schema):
total_entries = fields.Int()


class TaskInstanceHistoryCollection(NamedTuple):
"""List of task instances history with metadata."""

task_instances: list[TaskInstanceHistory | None]
total_entries: int


class TaskInstanceHistoryCollectionSchema(Schema):
"""Task instance collection schema."""

task_instances = fields.List(fields.Nested(TaskInstanceHistorySchema))
total_entries = fields.Int()


class TaskInstanceBatchFormSchema(Schema):
"""Schema for the request form passed to Task Instance Batch endpoint."""

Expand Down Expand Up @@ -279,3 +293,4 @@ class TaskDependencyCollectionSchema(Schema):
task_instance_reference_collection_schema = TaskInstanceReferenceCollectionSchema()
set_task_instance_note_form_schema = SetTaskInstanceNoteFormSchema()
task_instance_history_schema = TaskInstanceHistorySchema()
task_instance_history_collection_schema = TaskInstanceHistoryCollectionSchema()
108 changes: 108 additions & 0 deletions airflow/www/static/js/types/api-generated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,22 @@ export interface paths {
*/
get: operations["get_task_instance_try_details"];
};
"/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/tries": {
/**
* Get details of all task instance tries.
*
* *New in version 2.10.0*
*/
get: operations["get_task_instance_tries"];
};
"/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/tries": {
/**
* Get details of all task instance tries.
*
* *New in version 2.10.0*
*/
get: operations["get_mapped_task_instance_tries"];
};
"/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/tries/{task_try_number}": {
/**
* Get details of a mapped task instance try.
Expand Down Expand Up @@ -4320,6 +4336,90 @@ export interface operations {
404: components["responses"]["NotFound"];
};
};
/**
* Get details of all task instance tries.
*
* *New in version 2.10.0*
*/
get_task_instance_tries: {
parameters: {
path: {
/** The DAG ID. */
dag_id: components["parameters"]["DAGID"];
/** The DAG run ID. */
dag_run_id: components["parameters"]["DAGRunID"];
/** The task ID. */
task_id: components["parameters"]["TaskID"];
};
query: {
/** The numbers of items to return. */
limit?: components["parameters"]["PageLimit"];
/** The number of items to skip before starting to collect the result set. */
offset?: components["parameters"]["PageOffset"];
/**
* The name of the field to order the results by.
* Prefix a field name with `-` to reverse the sort order.
*
* *New in version 2.1.0*
*/
order_by?: components["parameters"]["OrderBy"];
};
};
responses: {
/** Success. */
200: {
content: {
"application/json": components["schemas"]["TaskInstanceCollection"];
};
};
401: components["responses"]["Unauthenticated"];
403: components["responses"]["PermissionDenied"];
404: components["responses"]["NotFound"];
};
};
/**
* Get details of all task instance tries.
*
* *New in version 2.10.0*
*/
get_mapped_task_instance_tries: {
parameters: {
path: {
/** The DAG ID. */
dag_id: components["parameters"]["DAGID"];
/** The DAG run ID. */
dag_run_id: components["parameters"]["DAGRunID"];
bbovenzi marked this conversation as resolved.
Show resolved Hide resolved
/** The task ID. */
task_id: components["parameters"]["TaskID"];
/** The map index. */
map_index: components["parameters"]["MapIndex"];
};
query: {
/** The numbers of items to return. */
limit?: components["parameters"]["PageLimit"];
/** The number of items to skip before starting to collect the result set. */
offset?: components["parameters"]["PageOffset"];
/**
* The name of the field to order the results by.
* Prefix a field name with `-` to reverse the sort order.
*
* *New in version 2.1.0*
*/
order_by?: components["parameters"]["OrderBy"];
};
};
responses: {
/** Success. */
200: {
content: {
"application/json": components["schemas"]["TaskInstanceCollection"];
};
};
401: components["responses"]["Unauthenticated"];
403: components["responses"]["PermissionDenied"];
404: components["responses"]["NotFound"];
};
};
/**
* Get details of a mapped task instance try.
*
Expand Down Expand Up @@ -5740,6 +5840,14 @@ export type GetTaskInstancesBatchVariables = CamelCasedPropertiesDeep<
export type GetTaskInstanceTryDetailsVariables = CamelCasedPropertiesDeep<
operations["get_task_instance_try_details"]["parameters"]["path"]
>;
export type GetTaskInstanceTriesVariables = CamelCasedPropertiesDeep<
operations["get_task_instance_tries"]["parameters"]["path"] &
operations["get_task_instance_tries"]["parameters"]["query"]
>;
export type GetMappedTaskInstanceTriesVariables = CamelCasedPropertiesDeep<
operations["get_mapped_task_instance_tries"]["parameters"]["path"] &
operations["get_mapped_task_instance_tries"]["parameters"]["query"]
>;
export type GetMappedTaskInstanceTryDetailsVariables = CamelCasedPropertiesDeep<
operations["get_mapped_task_instance_try_details"]["parameters"]["path"]
>;
Expand Down
Loading