Skip to content

Commit

Permalink
AIP-65: Add DAG versioning support (apache#42913)
Browse files Browse the repository at this point in the history
* AIP-65: Add DAG versioning support

This commit introduces versioning for DAGs

Changes:
- Introduced DagVersion model to handle versioning of DAGs.
- Added version_name field to DAG for use in tracking the dagversion by users
- Added support for version retrieval in the get_dag_source API endpoint
- Modified DAG execution logic to reference dag_version_id instead of the
dag_hash to ensure DAG runs are linked to specific versions.

Fix tests

revert RESTAPI changes

* fixup! AIP-65: Add DAG versioning support

* fixup! fixup! AIP-65: Add DAG versioning support

* fix migration

* fix test

* more test fixes

* update query count

* fix static checks

* Fix query and add created_at to dag_version table

* improve code

* Change to using UUID for primary keys

* DagCode.bulk_write_code is no longer used

* fixup! Change to using UUID for primary keys

* fix tests

* fixup! fix tests

* use uuid for version_name

* fixup! use uuid for version_name

* use row lock when writing dag version

* use row lock when writing dag version

* fixup! use row lock when writing dag version

* deactivating dag should not remove serialized dags

* save version_name as string not uuid

* Make dag_version_id unique

* fixup! Make dag_version_id unique

* Fix tests

* Use uuid7

* fix test

* fixup! fix test

* use binary=False for uuid field to fix sqlite issue

* apply suggestions from code review

* Remove unnecessary version_name on dagmodel

* Fix sqlalchemy 2 warning

* Fix conflicts

* Apply suggestions from code review

Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>

* fixup! Apply suggestions from code review

* fixup! fixup! Apply suggestions from code review

* add test for dagversion model and make version_name, number and dag_id unique

* Remove commented test as serdag can no longer disappear

* Add SQLAlchemy-utils to requirements

* mark test_dag_version.py as db_test

* make version_name nullable

* Apply suggestions from code review

* fixup! Apply suggestions from code review

* remove file_updater

* Use dag_version for creating dagruns instead of dag_version_id

* fix conflicts

* use if TYPE_CHECKING

* Add docstrings to methods

* Move getting latest serdags to SerializedDagModel
  • Loading branch information
ephraimbuddy authored Nov 5, 2024
1 parent da50242 commit 1116f28
Show file tree
Hide file tree
Showing 46 changed files with 3,049 additions and 2,366 deletions.
5 changes: 3 additions & 2 deletions airflow/api/common/trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from airflow.api_internal.internal_api_call import internal_api_call
from airflow.exceptions import DagNotFound, DagRunAlreadyExists
from airflow.models import DagBag, DagModel, DagRun
from airflow.models.dag_version import DagVersion
from airflow.utils import timezone
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import DagRunState
Expand Down Expand Up @@ -92,14 +93,14 @@ def _trigger_dag(
run_conf = None
if conf:
run_conf = conf if isinstance(conf, dict) else json.loads(conf)

dag_version = DagVersion.get_latest_version(dag.dag_id)
dag_run = dag.create_dagrun(
run_id=run_id,
execution_date=execution_date,
state=DagRunState.QUEUED,
conf=run_conf,
external_trigger=True,
dag_hash=dag_bag.dags_hash.get(dag_id),
dag_version=dag_version,
data_interval=data_interval,
triggered_by=triggered_by,
)
Expand Down
5 changes: 3 additions & 2 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
from airflow.auth.managers.models.resource_details import DagAccessEntity
from airflow.exceptions import ParamValidationError
from airflow.models import DagModel, DagRun
from airflow.models.dag_version import DagVersion
from airflow.timetables.base import DataInterval
from airflow.utils.airflow_flask_app import get_airflow_app
from airflow.utils.api_migration import mark_fastapi_migration_done
Expand Down Expand Up @@ -341,7 +342,7 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
)
else:
data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)

dag_version = DagVersion.get_latest_version(dag.dag_id)
dag_run = dag.create_dagrun(
run_type=DagRunType.MANUAL,
run_id=run_id,
Expand All @@ -350,7 +351,7 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
state=DagRunState.QUEUED,
conf=post_body.get("conf"),
external_trigger=True,
dag_hash=get_airflow_app().dag_bag.dags_hash.get(dag_id),
dag_version=dag_version,
session=session,
triggered_by=DagRunTriggeredByType.REST_API,
)
Expand Down
15 changes: 0 additions & 15 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
from airflow.models.dagwarning import DagWarning
from airflow.models.db_callback_request import DbCallbackRequest
from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
from airflow.secrets.cache import SecretCache
from airflow.stats import Stats
from airflow.traces.tracer import Trace, add_span
Expand Down Expand Up @@ -539,10 +538,6 @@ def deactivate_stale_dags(
if deactivated:
cls.logger().info("Deactivated %i DAGs which are no longer present in file.", deactivated)

for dag_id in to_deactivate:
SerializedDagModel.remove_dag(dag_id)
cls.logger().info("Deleted DAG %s in serialized_dag table", dag_id)

def _run_parsing_loop(self):
# In sync mode we want timeout=None -- wait forever until a message is received
if self._async_mode:
Expand Down Expand Up @@ -819,20 +814,10 @@ def _iter_dag_filelocs(fileloc: str) -> Iterator[str]:

dag_filelocs = {full_loc for path in self._file_paths for full_loc in _iter_dag_filelocs(path)}

from airflow.models.dagcode import DagCode

SerializedDagModel.remove_deleted_dags(
alive_dag_filelocs=dag_filelocs,
processor_subdir=self.get_dag_directory(),
)
DagModel.deactivate_deleted_dags(
dag_filelocs,
processor_subdir=self.get_dag_directory(),
)
DagCode.remove_deleted_code(
dag_filelocs,
processor_subdir=self.get_dag_directory(),
)

return True
return False
Expand Down
5 changes: 3 additions & 2 deletions airflow/example_dags/plugins/event_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,10 @@ def on_dag_run_running(dag_run: DagRun, msg: str):
"""
print("Dag run in running state")
queued_at = dag_run.queued_at
dag_hash_info = dag_run.dag_hash

print(f"Dag information Queued at: {queued_at} hash info: {dag_hash_info}")
version = dag_run.dag_version.version

print(f"Dag information Queued at: {queued_at} version: {version}")


# [END howto_listen_dagrun_running_task]
22 changes: 11 additions & 11 deletions airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@
)
from airflow.models.backfill import Backfill
from airflow.models.dag import DAG, DagModel
from airflow.models.dag_version import DagVersion
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun
from airflow.models.dagwarning import DagWarning, DagWarningType
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance
from airflow.stats import Stats
from airflow.ti_deps.dependencies_states import EXECUTION_STATES
Expand Down Expand Up @@ -1338,7 +1338,7 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
self.log.error("DAG '%s' not found in serialized_dag table", dag_model.dag_id)
continue

dag_hash = self.dagbag.dags_hash.get(dag.dag_id)
latest_dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)

data_interval = dag.get_next_data_interval(dag_model)
# Explicitly check if the DagRun already exists. This is an edge case
Expand All @@ -1358,7 +1358,7 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
data_interval=data_interval,
external_trigger=False,
session=session,
dag_hash=dag_hash,
dag_version=latest_dag_version,
creating_job_id=self.job.id,
triggered_by=DagRunTriggeredByType.TIMETABLE,
)
Expand Down Expand Up @@ -1417,7 +1417,7 @@ def _create_dag_runs_asset_triggered(
)
continue

dag_hash = self.dagbag.dags_hash.get(dag.dag_id)
latest_dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)

# Explicitly check if the DagRun already exists. This is an edge case
# where a Dag Run is created but `DagModel.next_dagrun` and `DagModel.next_dagrun_create_after`
Expand Down Expand Up @@ -1472,7 +1472,7 @@ def _create_dag_runs_asset_triggered(
state=DagRunState.QUEUED,
external_trigger=False,
session=session,
dag_hash=dag_hash,
dag_version=latest_dag_version,
creating_job_id=self.job.id,
triggered_by=DagRunTriggeredByType.ASSET,
)
Expand Down Expand Up @@ -1750,18 +1750,20 @@ def _verify_integrity_if_dag_changed(self, dag_run: DagRun, session: Session) ->
Return True if we determine that DAG still exists.
"""
latest_version = SerializedDagModel.get_latest_version_hash(dag_run.dag_id, session=session)
if dag_run.dag_hash == latest_version:
latest_dag_version = DagVersion.get_latest_version(dag_run.dag_id, session=session)
if TYPE_CHECKING:
assert latest_dag_version
if dag_run.dag_version_id == latest_dag_version.id:
self.log.debug("DAG %s not changed structure, skipping dagrun.verify_integrity", dag_run.dag_id)
return True

dag_run.dag_hash = latest_version

# Refresh the DAG
dag_run.dag = self.dagbag.get_dag(dag_id=dag_run.dag_id, session=session)
if not dag_run.dag:
return False

dag_run.dag_version = latest_dag_version

# Verify integrity also takes care of session.flush
dag_run.verify_integrity(session=session)
return True
Expand Down Expand Up @@ -2041,7 +2043,6 @@ def _cleanup_stale_dags(self, session: Session = NEW_SESSION) -> None:
In case one of DagProcessors is stopped (in case there are multiple of them
for different dag folders), its dags are never marked as inactive.
Also remove dags from SerializedDag table.
Executed on schedule only if [scheduler]standalone_dag_processor is True.
"""
self.log.debug("Checking dags not parsed within last %s seconds.", self._dag_stale_not_seen_duration)
Expand All @@ -2056,7 +2057,6 @@ def _cleanup_stale_dags(self, session: Session = NEW_SESSION) -> None:
self.log.info("Found (%d) stales dags not parsed after %s.", len(stale_dags), limit_lpt)
for dag in stale_dags:
dag.is_active = False
SerializedDagModel.remove_dag(dag_id=dag.dag_id, session=session)
session.flush()

@provide_session
Expand Down
151 changes: 151 additions & 0 deletions airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
add dag versioning.
Revision ID: 2b47dc6bc8df
Revises: d03e4a635aa3
Create Date: 2024-10-09 05:44:04.670984
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op
from sqlalchemy_utils import UUIDType

from airflow.migrations.db_types import StringID
from airflow.models.base import naming_convention
from airflow.utils import timezone
from airflow.utils.sqlalchemy import UtcDateTime

# revision identifiers, used by Alembic.
revision = "2b47dc6bc8df"
down_revision = "d03e4a635aa3"
branch_labels = None
depends_on = None
airflow_version = "3.0.0"


def upgrade():
"""Apply add dag versioning."""
op.create_table(
"dag_version",
sa.Column("id", UUIDType(binary=False), nullable=False),
sa.Column("version_number", sa.Integer(), nullable=False),
sa.Column("version_name", StringID()),
sa.Column("dag_id", StringID(), nullable=False),
sa.Column("created_at", UtcDateTime(), nullable=False, default=timezone.utcnow),
sa.ForeignKeyConstraint(
("dag_id",), ["dag.dag_id"], name=op.f("dag_version_dag_id_fkey"), ondelete="CASCADE"
),
sa.PrimaryKeyConstraint("id", name=op.f("dag_version_pkey")),
sa.UniqueConstraint("dag_id", "version_number", name="dag_id_v_name_v_number_unique_constraint"),
)
with op.batch_alter_table("dag_code", recreate="always", naming_convention=naming_convention) as batch_op:
batch_op.drop_constraint("dag_code_pkey", type_="primary")
batch_op.add_column(
sa.Column("id", UUIDType(binary=False), primary_key=True), insert_before="fileloc_hash"
)
batch_op.create_primary_key("dag_code_pkey", ["id"])
batch_op.add_column(sa.Column("dag_version_id", UUIDType(binary=False), nullable=False))
batch_op.create_foreign_key(
batch_op.f("dag_code_dag_version_id_fkey"),
"dag_version",
["dag_version_id"],
["id"],
ondelete="CASCADE",
)
batch_op.create_unique_constraint("dag_code_dag_version_id_uq", ["dag_version_id"])

with op.batch_alter_table(
"serialized_dag", recreate="always", naming_convention=naming_convention
) as batch_op:
batch_op.drop_constraint("serialized_dag_pkey", type_="primary")
batch_op.add_column(sa.Column("id", UUIDType(binary=False), primary_key=True))
batch_op.drop_index("idx_fileloc_hash")
batch_op.drop_column("fileloc_hash")
batch_op.drop_column("fileloc")
batch_op.create_primary_key("serialized_dag_pkey", ["id"])
batch_op.add_column(sa.Column("dag_version_id", UUIDType(binary=False), nullable=False))
batch_op.create_foreign_key(
batch_op.f("serialized_dag_dag_version_id_fkey"),
"dag_version",
["dag_version_id"],
["id"],
ondelete="CASCADE",
)
batch_op.create_unique_constraint("serialized_dag_dag_version_id_uq", ["dag_version_id"])

with op.batch_alter_table("task_instance", schema=None) as batch_op:
batch_op.add_column(sa.Column("dag_version_id", UUIDType(binary=False)))
batch_op.create_foreign_key(
batch_op.f("task_instance_dag_version_id_fkey"),
"dag_version",
["dag_version_id"],
["id"],
ondelete="CASCADE",
)

with op.batch_alter_table("task_instance_history", schema=None) as batch_op:
batch_op.add_column(sa.Column("dag_version_id", UUIDType(binary=False)))

with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.add_column(sa.Column("dag_version_id", UUIDType(binary=False)))
batch_op.create_foreign_key(
batch_op.f("dag_run_dag_version_id_fkey"),
"dag_version",
["dag_version_id"],
["id"],
ondelete="CASCADE",
)
batch_op.drop_column("dag_hash")


def downgrade():
"""Unapply add dag versioning."""
with op.batch_alter_table("task_instance_history", schema=None) as batch_op:
batch_op.drop_column("dag_version_id")

with op.batch_alter_table("task_instance", schema=None) as batch_op:
batch_op.drop_constraint(batch_op.f("task_instance_dag_version_id_fkey"), type_="foreignkey")
batch_op.drop_column("dag_version_id")

with op.batch_alter_table("dag_code", schema=None) as batch_op:
batch_op.drop_column("id")
batch_op.drop_constraint(batch_op.f("dag_code_dag_version_id_fkey"), type_="foreignkey")
batch_op.drop_column("dag_version_id")
batch_op.create_primary_key("dag_code_pkey", ["fileloc_hash"])

with op.batch_alter_table("serialized_dag", schema=None, naming_convention=naming_convention) as batch_op:
batch_op.drop_column("id")
batch_op.add_column(sa.Column("fileloc", sa.String(length=2000), autoincrement=False, nullable=False))
batch_op.add_column(sa.Column("fileloc_hash", sa.BIGINT(), autoincrement=False, nullable=False))
batch_op.create_index("idx_fileloc_hash", ["fileloc_hash"], unique=False)
batch_op.create_primary_key("serialized_dag_pkey", ["dag_id"])
batch_op.drop_constraint(batch_op.f("serialized_dag_dag_version_id_fkey"), type_="foreignkey")
batch_op.drop_column("dag_version_id")

with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.add_column(sa.Column("dag_hash", sa.String(length=32), autoincrement=False, nullable=True))
batch_op.drop_constraint(batch_op.f("dag_run_dag_version_id_fkey"), type_="foreignkey")
batch_op.drop_column("dag_version_id")

op.drop_table("dag_version")
1 change: 1 addition & 0 deletions airflow/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def import_all_models():

import airflow.models.asset
import airflow.models.backfill
import airflow.models.dag_version
import airflow.models.dagwarning
import airflow.models.errors
import airflow.models.serialized_dag
Expand Down
6 changes: 4 additions & 2 deletions airflow/models/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from airflow.api_connexion.exceptions import NotFound
from airflow.exceptions import AirflowException
from airflow.models.base import Base, StringID
from airflow.models.dag_version import DagVersion
from airflow.settings import json
from airflow.utils import timezone
from airflow.utils.session import create_session
Expand Down Expand Up @@ -200,7 +201,7 @@ def _create_backfill_dag_run(
)
)
return

dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)
dr = dag.create_dagrun(
triggered_by=DagRunTriggeredByType.BACKFILL,
execution_date=info.logical_date,
Expand All @@ -213,6 +214,7 @@ def _create_backfill_dag_run(
creating_job_id=None,
session=session,
backfill_id=backfill_id,
dag_version=dag_version,
)
session.add(
BackfillDagRun(
Expand Down Expand Up @@ -253,7 +255,7 @@ def _create_backfill(
from airflow.models.serialized_dag import SerializedDagModel

with create_session() as session:
serdag = session.get(SerializedDagModel, dag_id)
serdag = session.scalar(SerializedDagModel.latest_item_select_object(dag_id))
if not serdag:
raise NotFound(f"Could not find dag {dag_id}")
# todo: if dag has no schedule, raise
Expand Down
Loading

0 comments on commit 1116f28

Please sign in to comment.