Commit b757bd8
Temporarily revert dag versioning changes (apache#43730)
* Revert "Delete the Serialized Dag and DagCode before DagVersion migration (apache#43700)"

This reverts commit 438f71d.

* Revert "AIP-65: Add DAG versioning support (apache#42913)"

This reverts commit 1116f28.
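A revert of this shape can be reproduced locally with plain git, newest commit first so the second revert applies cleanly (a sketch; the short hashes are taken from the messages above):

    git revert 438f71d   # undo "Delete the Serialized Dag and DagCode before DagVersion migration"
    git revert 1116f28   # undo "AIP-65: Add DAG versioning support"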
potiuk authored Nov 6, 2024
1 parent 26eaeed · commit b757bd8
Showing 46 changed files with 2,366 additions and 3,056 deletions.
5 changes: 2 additions & 3 deletions airflow/api/common/trigger_dag.py
@@ -25,7 +25,6 @@
 from airflow.api_internal.internal_api_call import internal_api_call
 from airflow.exceptions import DagNotFound, DagRunAlreadyExists
 from airflow.models import DagBag, DagModel, DagRun
-from airflow.models.dag_version import DagVersion
 from airflow.utils import timezone
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.state import DagRunState
@@ -93,14 +92,14 @@ def _trigger_dag(
     run_conf = None
     if conf:
         run_conf = conf if isinstance(conf, dict) else json.loads(conf)
-    dag_version = DagVersion.get_latest_version(dag.dag_id)
+
     dag_run = dag.create_dagrun(
         run_id=run_id,
         execution_date=execution_date,
         state=DagRunState.QUEUED,
         conf=run_conf,
         external_trigger=True,
-        dag_version=dag_version,
+        dag_hash=dag_bag.dags_hash.get(dag_id),
         data_interval=data_interval,
         triggered_by=triggered_by,
     )
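With the revert in place, run-to-definition bookkeeping goes back to the pre-AIP-65 scheme: the DagBag keeps a dags_hash mapping of dag_id to a digest of the serialized DAG, and that digest is stamped onto each new DagRun. A minimal sketch of the idea, assuming the digest is an MD5 over the sorted JSON payload (the helper name here is hypothetical; the real value is computed inside SerializedDagModel when a DAG is serialized):

    import hashlib
    import json

    def compute_dag_hash(serialized_dag: dict) -> str:
        # Digest of the serialized DAG payload; if two runs carry the
        # same hash, the DAG structure has not changed between them.
        payload = json.dumps(serialized_dag, sort_keys=True).encode("utf-8")
        return hashlib.md5(payload).hexdigest()

    # Illustrative mapping of the kind DagBag.dags_hash holds:
    dags_hash = {"example_dag": compute_dag_hash({"dag_id": "example_dag", "tasks": []})}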
5 changes: 2 additions & 3 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -61,7 +61,6 @@
 from airflow.auth.managers.models.resource_details import DagAccessEntity
 from airflow.exceptions import ParamValidationError
 from airflow.models import DagModel, DagRun
-from airflow.models.dag_version import DagVersion
 from airflow.timetables.base import DataInterval
 from airflow.utils.airflow_flask_app import get_airflow_app
 from airflow.utils.api_migration import mark_fastapi_migration_done
@@ -342,7 +341,7 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
         )
     else:
         data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)
-    dag_version = DagVersion.get_latest_version(dag.dag_id)
+
     dag_run = dag.create_dagrun(
         run_type=DagRunType.MANUAL,
         run_id=run_id,
@@ -351,7 +350,7 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
         state=DagRunState.QUEUED,
         conf=post_body.get("conf"),
         external_trigger=True,
-        dag_version=dag_version,
+        dag_hash=get_airflow_app().dag_bag.dags_hash.get(dag_id),
         session=session,
         triggered_by=DagRunTriggeredByType.REST_API,
     )
15 changes: 15 additions & 0 deletions airflow/dag_processing/manager.py
@@ -50,6 +50,7 @@
 from airflow.models.dagwarning import DagWarning
 from airflow.models.db_callback_request import DbCallbackRequest
 from airflow.models.errors import ParseImportError
+from airflow.models.serialized_dag import SerializedDagModel
 from airflow.secrets.cache import SecretCache
 from airflow.stats import Stats
 from airflow.traces.tracer import Trace, add_span
@@ -538,6 +539,10 @@ def deactivate_stale_dags(
         if deactivated:
             cls.logger().info("Deactivated %i DAGs which are no longer present in file.", deactivated)

+        for dag_id in to_deactivate:
+            SerializedDagModel.remove_dag(dag_id)
+            cls.logger().info("Deleted DAG %s in serialized_dag table", dag_id)
+
     def _run_parsing_loop(self):
         # In sync mode we want timeout=None -- wait forever until a message is received
         if self._async_mode:
@@ -814,10 +819,20 @@ def _iter_dag_filelocs(fileloc: str) -> Iterator[str]:

             dag_filelocs = {full_loc for path in self._file_paths for full_loc in _iter_dag_filelocs(path)}

+            from airflow.models.dagcode import DagCode
+
+            SerializedDagModel.remove_deleted_dags(
+                alive_dag_filelocs=dag_filelocs,
+                processor_subdir=self.get_dag_directory(),
+            )
             DagModel.deactivate_deleted_dags(
                 dag_filelocs,
                 processor_subdir=self.get_dag_directory(),
             )
+            DagCode.remove_deleted_code(
+                dag_filelocs,
+                processor_subdir=self.get_dag_directory(),
+            )

             return True
         return False
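The restored calls above all follow one pattern: prune rows whose source file is no longer on disk, scoped to this processor's subdirectory. A rough sketch of what a cleanup like SerializedDagModel.remove_deleted_dags boils down to (a simplification under stated assumptions; the real method also works with fileloc hashes and logs what it removes):

    from sqlalchemy import delete
    from sqlalchemy.orm import Session

    from airflow.models.serialized_dag import SerializedDagModel

    def remove_deleted_dags_sketch(
        session: Session, alive_dag_filelocs: set[str], processor_subdir: str
    ) -> None:
        # Delete serialized rows whose backing file has disappeared from
        # the DAG folder handled by this processor.
        session.execute(
            delete(SerializedDagModel).where(
                SerializedDagModel.fileloc.notin_(alive_dag_filelocs),
                SerializedDagModel.processor_subdir == processor_subdir,
            )
        )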
5 changes: 2 additions & 3 deletions airflow/example_dags/plugins/event_listener.py
@@ -164,10 +164,9 @@ def on_dag_run_running(dag_run: DagRun, msg: str):
     """
     print("Dag run in running state")
     queued_at = dag_run.queued_at
+    dag_hash_info = dag_run.dag_hash

-    version = dag_run.dag_version.version
-
-    print(f"Dag information Queued at: {queued_at} version: {version}")
+    print(f"Dag information Queued at: {queued_at} hash info: {dag_hash_info}")


 # [END howto_listen_dagrun_running_task]
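For context, on_dag_run_running is an ordinary listener hook; a minimal sketch of how such a listener is declared and registered through a plugin (the module layout and names here are illustrative, not part of this commit):

    # dagrun_listener.py -- illustrative listener module
    from airflow.listeners import hookimpl
    from airflow.models.dagrun import DagRun

    @hookimpl
    def on_dag_run_running(dag_run: DagRun, msg: str):
        # Mirrors the reverted example: read the pre-AIP-65 dag_hash field.
        print(f"Dag information Queued at: {dag_run.queued_at} hash info: {dag_run.dag_hash}")

    # plugin.py -- registers the listener module with Airflow
    from airflow.plugins_manager import AirflowPlugin
    import dagrun_listener

    class DagRunListenerPlugin(AirflowPlugin):
        name = "dagrun_listener_plugin"
        listeners = [dagrun_listener]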
22 changes: 11 additions & 11 deletions airflow/jobs/scheduler_job_runner.py
@@ -54,10 +54,10 @@
 )
 from airflow.models.backfill import Backfill
 from airflow.models.dag import DAG, DagModel
-from airflow.models.dag_version import DagVersion
 from airflow.models.dagbag import DagBag
 from airflow.models.dagrun import DagRun
 from airflow.models.dagwarning import DagWarning, DagWarningType
+from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance
 from airflow.stats import Stats
 from airflow.ti_deps.dependencies_states import EXECUTION_STATES
@@ -1338,7 +1338,7 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -> None:
                 self.log.error("DAG '%s' not found in serialized_dag table", dag_model.dag_id)
                 continue

-            latest_dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)
+            dag_hash = self.dagbag.dags_hash.get(dag.dag_id)

             data_interval = dag.get_next_data_interval(dag_model)
             # Explicitly check if the DagRun already exists. This is an edge case
@@ -1358,7 +1358,7 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -> None:
                     data_interval=data_interval,
                     external_trigger=False,
                     session=session,
-                    dag_version=latest_dag_version,
+                    dag_hash=dag_hash,
                     creating_job_id=self.job.id,
                     triggered_by=DagRunTriggeredByType.TIMETABLE,
                 )
@@ -1417,7 +1417,7 @@ def _create_dag_runs_asset_triggered(
                 )
                 continue

-            latest_dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)
+            dag_hash = self.dagbag.dags_hash.get(dag.dag_id)

             # Explicitly check if the DagRun already exists. This is an edge case
             # where a Dag Run is created but `DagModel.next_dagrun` and `DagModel.next_dagrun_create_after`
@@ -1472,7 +1472,7 @@ def _create_dag_runs_asset_triggered(
                     state=DagRunState.QUEUED,
                     external_trigger=False,
                     session=session,
-                    dag_version=latest_dag_version,
+                    dag_hash=dag_hash,
                     creating_job_id=self.job.id,
                     triggered_by=DagRunTriggeredByType.ASSET,
                 )
@@ -1750,20 +1750,18 @@ def _verify_integrity_if_dag_changed(self, dag_run: DagRun, session: Session) -> bool:
         Return True if we determine that DAG still exists.
         """
-        latest_dag_version = DagVersion.get_latest_version(dag_run.dag_id, session=session)
-        if TYPE_CHECKING:
-            assert latest_dag_version
-        if dag_run.dag_version_id == latest_dag_version.id:
+        latest_version = SerializedDagModel.get_latest_version_hash(dag_run.dag_id, session=session)
+        if dag_run.dag_hash == latest_version:
             self.log.debug("DAG %s not changed structure, skipping dagrun.verify_integrity", dag_run.dag_id)
             return True

+        dag_run.dag_hash = latest_version
+
         # Refresh the DAG
         dag_run.dag = self.dagbag.get_dag(dag_id=dag_run.dag_id, session=session)
         if not dag_run.dag:
             return False

-        dag_run.dag_version = latest_dag_version
-
         # Verify integrity also takes care of session.flush
         dag_run.verify_integrity(session=session)
         return True
@@ -2043,6 +2041,7 @@ def _cleanup_stale_dags(self, session: Session = NEW_SESSION) -> None:
         In case one of DagProcessors is stopped (in case there are multiple of them
         for different dag folders), its dags are never marked as inactive.
+        Also remove dags from SerializedDag table.
         Executed on schedule only if [scheduler]standalone_dag_processor is True.
         """
         self.log.debug("Checking dags not parsed within last %s seconds.", self._dag_stale_not_seen_duration)
@@ -2057,6 +2056,7 @@ def _cleanup_stale_dags(self, session: Session = NEW_SESSION) -> None:
         self.log.info("Found (%d) stales dags not parsed after %s.", len(stale_dags), limit_lpt)
         for dag in stale_dags:
             dag.is_active = False
+            SerializedDagModel.remove_dag(dag_id=dag.dag_id, session=session)
         session.flush()

     @provide_session
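The integrity check restored above hinges on SerializedDagModel.get_latest_version_hash: compare the hash stored on the run with the hash of the current serialized row, and only rebuild task instances when they differ. A sketch of what that lookup amounts to (an approximation, assuming the pre-versioning schema with one serialized row per dag_id):

    from sqlalchemy import select
    from sqlalchemy.orm import Session

    from airflow.models.serialized_dag import SerializedDagModel

    def get_latest_version_hash_sketch(dag_id: str, session: Session) -> str | None:
        # One serialized row per dag_id, so a filtered select returns the
        # current structural hash, or None if the DAG was never serialized.
        return session.scalar(
            select(SerializedDagModel.dag_hash).where(SerializedDagModel.dag_id == dag_id)
        )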
158 changes: 0 additions & 158 deletions airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py

This file was deleted.

1 change: 0 additions & 1 deletion airflow/models/__init__.py
@@ -57,7 +57,6 @@ def import_all_models():

     import airflow.models.asset
     import airflow.models.backfill
-    import airflow.models.dag_version
     import airflow.models.dagwarning
     import airflow.models.errors
     import airflow.models.serialized_dag
6 changes: 2 additions & 4 deletions airflow/models/backfill.py
@@ -43,7 +43,6 @@
 from airflow.api_connexion.exceptions import NotFound
 from airflow.exceptions import AirflowException
 from airflow.models.base import Base, StringID
-from airflow.models.dag_version import DagVersion
 from airflow.settings import json
 from airflow.utils import timezone
 from airflow.utils.session import create_session
@@ -201,7 +200,7 @@ def _create_backfill_dag_run(
                 )
             )
             return
-    dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)
+
     dr = dag.create_dagrun(
         triggered_by=DagRunTriggeredByType.BACKFILL,
         execution_date=info.logical_date,
@@ -214,7 +213,6 @@ def _create_backfill_dag_run(
         creating_job_id=None,
         session=session,
         backfill_id=backfill_id,
-        dag_version=dag_version,
     )
     session.add(
         BackfillDagRun(
@@ -255,7 +253,7 @@ def _create_backfill(
     from airflow.models.serialized_dag import SerializedDagModel

     with create_session() as session:
-        serdag = session.scalar(SerializedDagModel.latest_item_select_object(dag_id))
+        serdag = session.get(SerializedDagModel, dag_id)
         if not serdag:
             raise NotFound(f"Could not find dag {dag_id}")
         # todo: if dag has no schedule, raise
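The last hunk is a consequence of the schema going back to one serialized row per DAG: with dag_id as the primary key again, a plain primary-key lookup suffices, where the versioned schema needed a "latest row" select instead. A usage sketch (the dag_id is illustrative):

    from airflow.models.serialized_dag import SerializedDagModel
    from airflow.utils.session import create_session

    with create_session() as session:
        # Primary-key lookup: valid again because dag_id uniquely
        # identifies the serialized row after the revert.
        serdag = session.get(SerializedDagModel, "example_dag_id")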