diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index 273d69ab705db..6b5800f870413 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -1243,6 +1243,8 @@ paths: - $ref: "#/components/parameters/FilterDAGID" - $ref: "#/components/parameters/FilterTaskID" - $ref: "#/components/parameters/FilterRunID" + - $ref: "#/components/parameters/FilterMapIndex" + - $ref: "#/components/parameters/FilterTryNumber" - $ref: "#/components/parameters/Event" - $ref: "#/components/parameters/Owner" - $ref: "#/components/parameters/Before" @@ -5587,6 +5589,13 @@ components: type: integer description: Filter on map index for mapped task. + FilterTryNumber: + in: query + name: try_number + schema: + type: integer + description: Filter on try_number for task instance. + OrderBy: in: query name: order_by diff --git a/airflow/api_connexion/schemas/event_log_schema.py b/airflow/api_connexion/schemas/event_log_schema.py index da1fc73cc1697..bf88d1a9ce88c 100644 --- a/airflow/api_connexion/schemas/event_log_schema.py +++ b/airflow/api_connexion/schemas/event_log_schema.py @@ -37,6 +37,8 @@ class Meta: dag_id = auto_field(dump_only=True) task_id = auto_field(dump_only=True) run_id = auto_field(dump_only=True) + map_index = auto_field(dump_only=True) + try_number = auto_field(dump_only=True) event = auto_field(dump_only=True) execution_date = auto_field(dump_only=True) owner = auto_field(dump_only=True) diff --git a/airflow/migrations/versions/0149_2_10_0_add_try_number_to_audit_log.py b/airflow/migrations/versions/0149_2_10_0_add_try_number_to_audit_log.py new file mode 100644 index 0000000000000..7259082020fa9 --- /dev/null +++ b/airflow/migrations/versions/0149_2_10_0_add_try_number_to_audit_log.py @@ -0,0 +1,54 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Add try_number to audit log. + +Revision ID: 41b3bc7c0272 +Revises: ec3471c1e067 +Create Date: 2024-07-11 14:48:58.998259 + +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "41b3bc7c0272" +down_revision = "ec3471c1e067" +branch_labels = None +depends_on = None +airflow_version = "2.10.0" + + +def upgrade(): + """Apply add try_number to audit log.""" + with op.batch_alter_table("log") as batch_op: + batch_op.add_column(sa.Column("try_number", sa.Integer(), nullable=True)) + batch_op.create_index( + "idx_log_task_instance", ["dag_id", "task_id", "run_id", "map_index", "try_number"], unique=False + ) + + +def downgrade(): + """Unapply add try_number to audit log.""" + with op.batch_alter_table("log") as batch_op: + batch_op.drop_index("idx_log_task_instance") + batch_op.drop_column("try_number") diff --git a/airflow/models/log.py b/airflow/models/log.py index fbf26b23276d6..7701888268bc4 100644 --- a/airflow/models/log.py +++ b/airflow/models/log.py @@ -40,11 +40,13 @@ class Log(Base): owner = Column(String(500)) owner_display_name = Column(String(500)) extra = Column(Text) + try_number = Column(Integer) __table_args__ = ( Index("idx_log_dag", dag_id), Index("idx_log_dttm", dttm), Index("idx_log_event", event), + Index("idx_log_task_instance", dag_id, task_id, run_id, map_index, try_number), ) def __init__(self, event, task_instance=None, owner=None, owner_display_name=None, extra=None, **kwargs): @@ -59,6 +61,7 @@ def __init__(self, event, task_instance=None, owner=None, owner_display_name=Non self.task_id = task_instance.task_id self.execution_date = task_instance.execution_date self.run_id = task_instance.run_id + self.try_number = task_instance.try_number self.map_index = task_instance.map_index if getattr(task_instance, "task", None): task_owner = task_instance.task.owner @@ -73,6 +76,8 @@ def __init__(self, event, task_instance=None, owner=None, owner_display_name=Non self.run_id = kwargs["run_id"] if "map_index" in kwargs: self.map_index = kwargs["map_index"] + if "try_number" in kwargs: + self.try_number = kwargs["try_number"] self.owner = owner or task_owner self.owner_display_name = owner_display_name or None diff --git a/airflow/utils/db.py b/airflow/utils/db.py index 463b6894f5226..51eef66ea4542 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -118,7 +118,7 @@ class MappedClassProtocol(Protocol): "2.8.1": "88344c1d9134", "2.9.0": "1949afb29106", "2.9.2": "686269002441", - "2.10.0": "ec3471c1e067", + "2.10.0": "41b3bc7c0272", } diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts index 1b82d07835ab2..2a92c2c087a6d 100644 --- a/airflow/www/static/js/types/api-generated.ts +++ b/airflow/www/static/js/types/api-generated.ts @@ -2598,6 +2598,8 @@ export interface components { FilterSourceMapIndex: number; /** @description Filter on map index for mapped task. */ FilterMapIndex: number; + /** @description Filter on try_number for task instance. */ + FilterTryNumber: number; /** * @description The name of the field to order the results by. * Prefix a field name with `-` to reverse the sort order. @@ -3676,6 +3678,10 @@ export interface operations { task_id?: components["parameters"]["FilterTaskID"]; /** Returns objects matched by the Run ID. */ run_id?: components["parameters"]["FilterRunID"]; + /** Filter on map index for mapped task. */ + map_index?: components["parameters"]["FilterMapIndex"]; + /** Filter on try_number for task instance. */ + try_number?: components["parameters"]["FilterTryNumber"]; /** The name of event log. */ event?: components["parameters"]["Event"]; /** The owner's name of event log. */ diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index a3624746f5ec8..b39f4b2df5cd2 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -88c9ce0742c54f376a3c600600d06ac9a6f80a3a2cb85dfcf2472d1c77ed75db \ No newline at end of file +3afa3d98632dcfe0f585f8777d1bfcb06a4d4f1df20838ee019dc94fa05ef73d \ No newline at end of file diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg index 0328672c20a87..614baec788567 100644 --- a/docs/apache-airflow/img/airflow_erd.svg +++ b/docs/apache-airflow/img/airflow_erd.svg @@ -12,144 +12,148 @@ job - -job - -id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - -end_date - - [TIMESTAMP] - -executor_class - - [VARCHAR(500)] - -hostname - - [VARCHAR(500)] - -job_type - - [VARCHAR(30)] - -latest_heartbeat - - [TIMESTAMP] - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(20)] - -unixname - - [VARCHAR(1000)] + +job + +id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + +end_date + + [TIMESTAMP] + +executor_class + + [VARCHAR(500)] + +hostname + + [VARCHAR(500)] + +job_type + + [VARCHAR(30)] + +latest_heartbeat + + [TIMESTAMP] + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(20)] + +unixname + + [VARCHAR(1000)] slot_pool - -slot_pool - -id - - [INTEGER] - NOT NULL - -description - - [TEXT] - -include_deferred - - [BOOLEAN] - NOT NULL - -pool - - [VARCHAR(256)] - -slots - - [INTEGER] + +slot_pool + +id + + [INTEGER] + NOT NULL + +description + + [TEXT] + +include_deferred + + [BOOLEAN] + NOT NULL + +pool + + [VARCHAR(256)] + +slots + + [INTEGER] dag_priority_parsing_request - -dag_priority_parsing_request - -id - - [VARCHAR(32)] - NOT NULL - -fileloc - - [VARCHAR(2000)] - NOT NULL + +dag_priority_parsing_request + +id + + [VARCHAR(32)] + NOT NULL + +fileloc + + [VARCHAR(2000)] + NOT NULL log - -log - -id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - -dttm - - [TIMESTAMP] - -event - - [VARCHAR(60)] - -execution_date - - [TIMESTAMP] - -extra - - [TEXT] - -map_index - - [INTEGER] - -owner - - [VARCHAR(500)] - -owner_display_name - - [VARCHAR(500)] - -run_id - - [VARCHAR(250)] - -task_id - - [VARCHAR(250)] + +log + +id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + +dttm + + [TIMESTAMP] + +event + + [VARCHAR(60)] + +execution_date + + [TIMESTAMP] + +extra + + [TEXT] + +map_index + + [INTEGER] + +owner + + [VARCHAR(500)] + +owner_display_name + + [VARCHAR(500)] + +run_id + + [VARCHAR(250)] + +task_id + + [VARCHAR(250)] + +try_number + + [INTEGER] diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst index 95fe3dc591403..45ae0a7d5afb9 100644 --- a/docs/apache-airflow/migrations-ref.rst +++ b/docs/apache-airflow/migrations-ref.rst @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=================================+===================+===================+==============================================================+ -| ``ec3471c1e067`` (head) | ``05e19f3176be`` | ``2.10.0`` | Add dataset_alias_dataset_event. | +| ``41b3bc7c0272`` (head) | ``ec3471c1e067`` | ``2.10.0`` | Add try_number to audit log. | ++---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ +| ``ec3471c1e067`` | ``05e19f3176be`` | ``2.10.0`` | Add dataset_alias_dataset_event. | +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ | ``05e19f3176be`` | ``d482b7261ff9`` | ``2.10.0`` | Add dataset_alias. | +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ diff --git a/tests/api_connexion/endpoints/test_event_log_endpoint.py b/tests/api_connexion/endpoints/test_event_log_endpoint.py index 6738858ddd00f..bd910c94a2799 100644 --- a/tests/api_connexion/endpoints/test_event_log_endpoint.py +++ b/tests/api_connexion/endpoints/test_event_log_endpoint.py @@ -132,12 +132,16 @@ def test_should_respond_200(self, log_model): f"/api/v1/eventLogs/{event_log_id}", environ_overrides={"REMOTE_USER": "test"} ) assert response.status_code == 200 + data = response.json + data["try_number"] assert response.json == { "event_log_id": event_log_id, "event": "TEST_EVENT", "dag_id": "TEST_DAG_ID", "task_id": "TEST_TASK_ID", "run_id": "TEST_RUN_ID", + "map_index": -1, + "try_number": 0, "execution_date": self.default_time.isoformat(), "owner": "airflow", "when": self.default_time.isoformat(), @@ -199,6 +203,8 @@ def test_should_respond_200(self, session, create_log_model): "dag_id": "TEST_DAG_ID", "task_id": "TEST_TASK_ID", "run_id": "TEST_RUN_ID", + "map_index": -1, + "try_number": 0, "execution_date": self.default_time.isoformat(), "owner": "airflow", "when": self.default_time.isoformat(), @@ -210,6 +216,8 @@ def test_should_respond_200(self, session, create_log_model): "dag_id": "TEST_DAG_ID", "task_id": "TEST_TASK_ID", "run_id": "TEST_RUN_ID", + "map_index": -1, + "try_number": 0, "execution_date": self.default_time.isoformat(), "owner": "airflow", "when": self.default_time_2.isoformat(), @@ -221,6 +229,8 @@ def test_should_respond_200(self, session, create_log_model): "dag_id": None, "task_id": None, "run_id": None, + "map_index": None, + "try_number": None, "execution_date": None, "owner": "root", "when": self.default_time_2.isoformat(), @@ -249,6 +259,8 @@ def test_order_eventlogs_by_owner(self, create_log_model, session): "dag_id": "TEST_DAG_ID", "task_id": "TEST_TASK_ID", "run_id": "TEST_RUN_ID", + "map_index": -1, + "try_number": 0, "execution_date": self.default_time.isoformat(), "owner": "zsh", # Order by name, sort order is descending(-) "when": self.default_time_2.isoformat(), @@ -260,6 +272,8 @@ def test_order_eventlogs_by_owner(self, create_log_model, session): "dag_id": None, "task_id": None, "run_id": None, + "map_index": None, + "try_number": None, "execution_date": None, "owner": "root", "when": self.default_time_2.isoformat(), @@ -271,6 +285,8 @@ def test_order_eventlogs_by_owner(self, create_log_model, session): "dag_id": "TEST_DAG_ID", "task_id": "TEST_TASK_ID", "run_id": "TEST_RUN_ID", + "map_index": -1, + "try_number": 0, "execution_date": self.default_time.isoformat(), "owner": "airflow", "when": self.default_time.isoformat(), diff --git a/tests/api_connexion/schemas/test_event_log_schema.py b/tests/api_connexion/schemas/test_event_log_schema.py index 7a475e614604f..d4b1bf2c8eb06 100644 --- a/tests/api_connexion/schemas/test_event_log_schema.py +++ b/tests/api_connexion/schemas/test_event_log_schema.py @@ -58,6 +58,8 @@ def test_serialize(self, task_instance): "dag_id": "TEST_DAG_ID", "task_id": "TEST_TASK_ID", "run_id": "TEST_RUN_ID", + "map_index": -1, + "try_number": 0, "execution_date": self.default_time.isoformat(), "owner": "airflow", "when": self.default_time.isoformat(), @@ -82,6 +84,8 @@ def test_serialize(self, task_instance): "dag_id": "TEST_DAG_ID", "task_id": "TEST_TASK_ID", "run_id": "TEST_RUN_ID", + "map_index": -1, + "try_number": 0, "execution_date": self.default_time.isoformat(), "owner": "airflow", "when": self.default_time.isoformat(), @@ -93,6 +97,8 @@ def test_serialize(self, task_instance): "dag_id": "TEST_DAG_ID", "task_id": "TEST_TASK_ID", "run_id": "TEST_RUN_ID", + "map_index": -1, + "try_number": 0, "execution_date": self.default_time.isoformat(), "owner": "airflow", "when": self.default_time2.isoformat(),