Skip to content

Commit

Permalink
Make a direct query for dag_hash
Browse files Browse the repository at this point in the history
  • Loading branch information
ephraimbuddy committed Oct 4, 2024
1 parent 177aa86 commit 9c5ea0d
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# under the License.

"""
Add SDM foreignkey to DagRun.
Add SDM foreign key to DagRun, TI & TIH.
Revision ID: 4235395d5ec5
Revises: e1ff90d3efe9
Expand Down
19 changes: 9 additions & 10 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ class DagRun(Base, LoggingMixin):
Integer,
ForeignKey("serialized_dag.id", name="dag_run_serialized_dag_fkey", ondelete="SET NULL"),
)
serialized_dag = relationship("SerializedDagModel", back_populates="dag_run", lazy="select")
serialized_dag = relationship("SerializedDagModel", back_populates="dag_run")

# Remove this `if` after upgrading Sphinx-AutoAPI
if not TYPE_CHECKING and "BUILDING_AIRFLOW_DOCS" in os.environ:
Expand Down Expand Up @@ -359,13 +359,11 @@ def set_state(self, state: DagRunState) -> None:
def state(self):
return synonym("_state", descriptor=property(self.get_state, self.set_state))

@property
def dag_hash(self):
if self.serialized_dag:
return self.serialized_dag.dag_hash
# TODO: Should we avoid serialized DAG deletion since
# we can have multiple versions of same dag?
return "SerializedDAG Deleted"
@provide_session
def dag_hash(self, session: Session = NEW_SESSION):
from airflow.models.serialized_dag import SerializedDagModel as SDM

return str(session.scalar(select(SDM.dag_hash).where(SDM.id == self.serialized_dag_id)))

@provide_session
def refresh_from_db(self, session: Session = NEW_SESSION) -> None:
Expand Down Expand Up @@ -952,6 +950,7 @@ def recalculate(self) -> _UnfinishedStates:
"state=%s, external_trigger=%s, run_type=%s, "
"data_interval_start=%s, data_interval_end=%s, dag_hash=%s"
)

self.log.info(
msg,
self.dag_id,
Expand All @@ -969,7 +968,7 @@ def recalculate(self) -> _UnfinishedStates:
self.run_type,
self.data_interval_start,
self.data_interval_end,
self.dag_hash,
self.dag_hash(session),
)

with Trace.start_span_from_dagrun(dagrun=self) as span:
Expand All @@ -993,7 +992,7 @@ def recalculate(self) -> _UnfinishedStates:
"run_type": str(self.run_type),
"data_interval_start": str(self.data_interval_start),
"data_interval_end": str(self.data_interval_end),
"dag_hash": str(self.dag_hash),
"dag_hash": str(self.dag_hash(session)),
"conf": str(self.conf),
}
if span.is_recording():
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
e01b3a34963248f25786de5c6683f0152fe8592096d4568c3afc54d634e20fb8
3fe5f6994c5ccdada9ce9834d3ee314cef70c243f968aba4e399e6e1f416d720
2 changes: 1 addition & 1 deletion docs/apache-airflow/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ Here's the list of all the Database Migrations that are executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=========================+==================+===================+==============================================================+
| ``4235395d5ec5`` (head) | ``e1ff90d3efe9`` | ``3.0.0`` | Add SDM foreignkey to DagRun. |
| ``4235395d5ec5`` (head) | ``e1ff90d3efe9`` | ``3.0.0`` | Add SDM foreign key to DagRun, TI & TIH. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``e1ff90d3efe9`` | ``0d9e73a75ee4`` | ``3.0.0`` | Add serial ID to SDM. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
Expand Down
10 changes: 5 additions & 5 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3471,7 +3471,7 @@ def test_verify_integrity_if_dag_not_changed(self, dag_maker):
assert tis_count == 1

latest_dag_version = SerializedDagModel.get_latest_version_hash(dr.dag_id, session=session)
assert dr.dag_hash == latest_dag_version
assert dr.dag_hash() == latest_dag_version

session.rollback()
session.close()
Expand Down Expand Up @@ -3505,7 +3505,7 @@ def test_verify_integrity_if_dag_changed(self, dag_maker):
dr = drs[0]

dag_version_1 = SerializedDagModel.get_latest_version_hash(dr.dag_id, session=session)
assert dr.dag_hash == dag_version_1
assert dr.dag_hash() == dag_version_1
assert self.job_runner.dagbag.dags == {"test_verify_integrity_if_dag_changed": dag}
assert len(self.job_runner.dagbag.dags.get("test_verify_integrity_if_dag_changed").tasks) == 1

Expand All @@ -3522,7 +3522,7 @@ def test_verify_integrity_if_dag_changed(self, dag_maker):
drs = DagRun.find(dag_id=dag.dag_id, session=session)
assert len(drs) == 1
dr = drs[0]
assert dr.dag_hash == dag_version_2
assert dr.dag_hash() == dag_version_2
assert self.job_runner.dagbag.dags == {"test_verify_integrity_if_dag_changed": dag}
assert len(self.job_runner.dagbag.dags.get("test_verify_integrity_if_dag_changed").tasks) == 2

Expand All @@ -3538,7 +3538,7 @@ def test_verify_integrity_if_dag_changed(self, dag_maker):
assert tis_count == 2

latest_dag_version = SerializedDagModel.get_latest_version_hash(dr.dag_id, session=session)
assert dr.dag_hash == latest_dag_version
assert dr.dag_hash() == latest_dag_version

session.rollback()
session.close()
Expand Down Expand Up @@ -3572,7 +3572,7 @@ def test_verify_integrity_if_dag_disappeared(self, dag_maker, caplog):
dr = drs[0]

dag_version_1 = SerializedDagModel.get_latest_version_hash(dag_id, session=session)
assert dr.dag_hash == dag_version_1
assert dr.dag_hash() == dag_version_1
assert self.job_runner.dagbag.dags == {"test_verify_integrity_if_dag_disappeared": dag}
assert len(self.job_runner.dagbag.dags.get("test_verify_integrity_if_dag_disappeared").tasks) == 1

Expand Down

0 comments on commit 9c5ea0d

Please sign in to comment.