From 9c5ea0dea8ac1716aa6614253de63f14533fa892 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Fri, 4 Oct 2024 20:36:03 +0100 Subject: [PATCH] Make a direct query for dag_hash --- ...0037_3_0_0_add_sdm_foreignkey_to_dagrun.py | 2 +- airflow/models/dagrun.py | 19 +++++++++---------- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- docs/apache-airflow/migrations-ref.rst | 2 +- tests/jobs/test_scheduler_job.py | 10 +++++----- 5 files changed, 17 insertions(+), 18 deletions(-) diff --git a/airflow/migrations/versions/0037_3_0_0_add_sdm_foreignkey_to_dagrun.py b/airflow/migrations/versions/0037_3_0_0_add_sdm_foreignkey_to_dagrun.py index 6c99ee9f6a81c..8ae5c714bd609 100644 --- a/airflow/migrations/versions/0037_3_0_0_add_sdm_foreignkey_to_dagrun.py +++ b/airflow/migrations/versions/0037_3_0_0_add_sdm_foreignkey_to_dagrun.py @@ -17,7 +17,7 @@ # under the License. """ -Add SDM foreignkey to DagRun. +Add SDM foreign key to DagRun, TI & TIH. Revision ID: 4235395d5ec5 Revises: e1ff90d3efe9 diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index edfe3703e676d..20ecd170db7c9 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -159,7 +159,7 @@ class DagRun(Base, LoggingMixin): Integer, ForeignKey("serialized_dag.id", name="dag_run_serialized_dag_fkey", ondelete="SET NULL"), ) - serialized_dag = relationship("SerializedDagModel", back_populates="dag_run", lazy="select") + serialized_dag = relationship("SerializedDagModel", back_populates="dag_run") # Remove this `if` after upgrading Sphinx-AutoAPI if not TYPE_CHECKING and "BUILDING_AIRFLOW_DOCS" in os.environ: @@ -359,13 +359,11 @@ def set_state(self, state: DagRunState) -> None: def state(self): return synonym("_state", descriptor=property(self.get_state, self.set_state)) - @property - def dag_hash(self): - if self.serialized_dag: - return self.serialized_dag.dag_hash - # TODO: Should we avoid serialized DAG deletion since - # we can have multiple versions of same dag? - return "SerializedDAG Deleted" + @provide_session + def dag_hash(self, session: Session = NEW_SESSION): + from airflow.models.serialized_dag import SerializedDagModel as SDM + + return str(session.scalar(select(SDM.dag_hash).where(SDM.id == self.serialized_dag_id))) @provide_session def refresh_from_db(self, session: Session = NEW_SESSION) -> None: @@ -952,6 +950,7 @@ def recalculate(self) -> _UnfinishedStates: "state=%s, external_trigger=%s, run_type=%s, " "data_interval_start=%s, data_interval_end=%s, dag_hash=%s" ) + self.log.info( msg, self.dag_id, @@ -969,7 +968,7 @@ def recalculate(self) -> _UnfinishedStates: self.run_type, self.data_interval_start, self.data_interval_end, - self.dag_hash, + self.dag_hash(session), ) with Trace.start_span_from_dagrun(dagrun=self) as span: @@ -993,7 +992,7 @@ def recalculate(self) -> _UnfinishedStates: "run_type": str(self.run_type), "data_interval_start": str(self.data_interval_start), "data_interval_end": str(self.data_interval_end), - "dag_hash": str(self.dag_hash), + "dag_hash": str(self.dag_hash(session)), "conf": str(self.conf), } if span.is_recording(): diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index e9ec48807f946..48830b1ddaff8 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -e01b3a34963248f25786de5c6683f0152fe8592096d4568c3afc54d634e20fb8 \ No newline at end of file +3fe5f6994c5ccdada9ce9834d3ee314cef70c243f968aba4e399e6e1f416d720 \ No newline at end of file diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst index 6901f8aee9918..067cbb6f619e6 100644 --- a/docs/apache-airflow/migrations-ref.rst +++ b/docs/apache-airflow/migrations-ref.rst @@ -39,7 +39,7 @@ Here's the list of all the Database Migrations that are executed via when you ru +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=========================+==================+===================+==============================================================+ -| ``4235395d5ec5`` (head) | ``e1ff90d3efe9`` | ``3.0.0`` | Add SDM foreignkey to DagRun. | +| ``4235395d5ec5`` (head) | ``e1ff90d3efe9`` | ``3.0.0`` | Add SDM foreign key to DagRun, TI & TIH. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``e1ff90d3efe9`` | ``0d9e73a75ee4`` | ``3.0.0`` | Add serial ID to SDM. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index c8b40f6af7e24..ac7910afbc579 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -3471,7 +3471,7 @@ def test_verify_integrity_if_dag_not_changed(self, dag_maker): assert tis_count == 1 latest_dag_version = SerializedDagModel.get_latest_version_hash(dr.dag_id, session=session) - assert dr.dag_hash == latest_dag_version + assert dr.dag_hash() == latest_dag_version session.rollback() session.close() @@ -3505,7 +3505,7 @@ def test_verify_integrity_if_dag_changed(self, dag_maker): dr = drs[0] dag_version_1 = SerializedDagModel.get_latest_version_hash(dr.dag_id, session=session) - assert dr.dag_hash == dag_version_1 + assert dr.dag_hash() == dag_version_1 assert self.job_runner.dagbag.dags == {"test_verify_integrity_if_dag_changed": dag} assert len(self.job_runner.dagbag.dags.get("test_verify_integrity_if_dag_changed").tasks) == 1 @@ -3522,7 +3522,7 @@ def test_verify_integrity_if_dag_changed(self, dag_maker): drs = DagRun.find(dag_id=dag.dag_id, session=session) assert len(drs) == 1 dr = drs[0] - assert dr.dag_hash == dag_version_2 + assert dr.dag_hash() == dag_version_2 assert self.job_runner.dagbag.dags == {"test_verify_integrity_if_dag_changed": dag} assert len(self.job_runner.dagbag.dags.get("test_verify_integrity_if_dag_changed").tasks) == 2 @@ -3538,7 +3538,7 @@ def test_verify_integrity_if_dag_changed(self, dag_maker): assert tis_count == 2 latest_dag_version = SerializedDagModel.get_latest_version_hash(dr.dag_id, session=session) - assert dr.dag_hash == latest_dag_version + assert dr.dag_hash() == latest_dag_version session.rollback() session.close() @@ -3572,7 +3572,7 @@ def test_verify_integrity_if_dag_disappeared(self, dag_maker, caplog): dr = drs[0] dag_version_1 = SerializedDagModel.get_latest_version_hash(dag_id, session=session) - assert dr.dag_hash == dag_version_1 + assert dr.dag_hash() == dag_version_1 assert self.job_runner.dagbag.dags == {"test_verify_integrity_if_dag_disappeared": dag} assert len(self.job_runner.dagbag.dags.get("test_verify_integrity_if_dag_disappeared").tasks) == 1