
Fix issue with stale dags not being marked inactive by Scheduler #11462

Closed
wants to merge 1 commit into from

Changes from all commits
7 changes: 7 additions & 0 deletions airflow/config_templates/config.yml
@@ -1709,6 +1709,13 @@
type: string
example: ~
default: "False"
- name: stale_dag_cleanup_timeout
description: |
Timeout in seconds before the Scheduler marks a removed DAG as inactive in the DB.
version_added: ~
type: string
example: ~
default: 600
- name: ldap
description: ~
options:
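As a usage sketch (not part of the PR): the scheduler reads this option from the [scheduler] section, so a deployment could set it in airflow.cfg or via the AIRFLOW__SCHEDULER__STALE_DAG_CLEANUP_TIMEOUT environment variable. A minimal Python read, assuming the documented default of 600 seconds:

```python
from airflow.configuration import conf

# Hypothetical read of the new option; the fallback mirrors the default of 600 seconds.
timeout_seconds = conf.getint('scheduler', 'stale_dag_cleanup_timeout', fallback=600)
```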
23 changes: 11 additions & 12 deletions airflow/jobs/scheduler_job.py
@@ -1311,23 +1311,11 @@ def _execute(self) -> None:
# Start after resetting orphaned tasks to avoid stressing out DB.
self.processor_agent.start()

execute_start_time = timezone.utcnow()

self._run_scheduler_loop()

# Stop any processors
self.processor_agent.terminate()

# Verify that all files were processed, and if so, deactivate DAGs that
# haven't been touched by the scheduler as they likely have been
# deleted.
if self.processor_agent.all_files_processed:
self.log.info(
"Deactivating DAGs that haven't been touched since %s",
execute_start_time.isoformat()
)
models.DAG.deactivate_stale_dags(execute_start_time)

self.executor.end()

settings.Session.remove() # type: ignore
@@ -1364,6 +1352,7 @@ def _run_scheduler_loop(self) -> None:
#. Heartbeat executor
#. Execute queued tasks in executor asynchronously
#. Sync on the states of running tasks
#. Deactivate stale/deleted dags

Following is a graphic representation of these steps.

@@ -1374,6 +1363,7 @@
if not self.processor_agent:
raise ValueError("Processor agent is not started.")
is_unit_test: bool = conf.getboolean('core', 'unit_test_mode')
stale_dag_cleanup_timeout: int = conf.getint('scheduler', 'stale_dag_cleanup_timeout', 600)
Member
Suggested change
stale_dag_cleanup_timeout: int = conf.getint('scheduler', 'stale_dag_cleanup_timeout', 600)
stale_dag_cleanup_timeout = timedelta(seconds=conf.getint('scheduler', 'stale_dag_cleanup_timeout', 600))

Let's not re-create the timedelta object more often than we need to.


for loop_count in itertools.count(start=1):
loop_start_time = time.time()
@@ -1409,6 +1399,15 @@ def _run_scheduler_loop(self) -> None:
# usage when "idle"
time.sleep(self._processor_poll_interval)

# Verify that all files were processed, and if so, deactivate DAGs that
# haven't been touched by the scheduler as they likely have been deleted.
if self.processor_agent.all_files_processed:
Contributor Author
@ashb do we still need to keep this check? I feel that we don't need to keep it anymore.

Member
No, not needed anymore, correct.

self.log.info(
"Deactivating DAGs that haven't been touched in %s seconds",
stale_dag_cleanup_timeout
)
models.DAG.deactivate_stale_dags(timezone.utcnow() - timedelta(seconds=stale_dag_cleanup_timeout))

if loop_count >= self.num_runs > 0:
self.log.info(
"Exiting scheduler loop as requested number of runs (%d - got to %d) has been reached",
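Combining this hunk with the reviewer's timedelta suggestion above, the per-loop cleanup amounts to roughly the following sketch (not the exact PR code; names mirror the diff):

```python
from datetime import timedelta

from airflow import models
from airflow.configuration import conf
from airflow.utils import timezone

# Build the timedelta once, before entering the scheduler loop, rather than
# re-creating it on every iteration (the reviewer's suggestion).
stale_dag_cleanup_timeout = timedelta(
    seconds=conf.getint('scheduler', 'stale_dag_cleanup_timeout', fallback=600)
)

# Then, once per iteration of _run_scheduler_loop(): any active DAG the
# scheduler has not touched within the timeout window is assumed to have been
# deleted from the DAGs folder and is marked inactive.
cutoff = timezone.utcnow() - stale_dag_cleanup_timeout
models.DAG.deactivate_stale_dags(cutoff)
```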
9 changes: 5 additions & 4 deletions airflow/models/dag.py
@@ -1842,16 +1842,17 @@ def deactivate_stale_dags(expiration_date, session=None):
:type expiration_date: datetime
:return: None
"""
for dag in session.query(
DagModel).filter(DagModel.last_scheduler_run < expiration_date,
DagModel.is_active).all():
query = session.query(DagModel).filter(DagModel.last_scheduler_run < expiration_date,
Member
Oooh crap, I don't update this column anymore on master, so I don't think this is right. But there also isn't any global setting I do update anymore.

Even SerializedDag.updated_at might not be right, since if the dag hasn't changed, that value won't be updated anymore.

Contributor Author
Ooh is it? Then should it be taken care of while doing serialization itself? Like, mark the dag inactive if it doesn't find it anymore?

DagModel.is_active)

for dag in with_row_locks(query, of=DagModel, **skip_locked(session=session)).all():
log.info(
"Deactivating DAG ID %s since it was last touched by the scheduler at %s",
dag.dag_id, dag.last_scheduler_run.isoformat()
)
dag.is_active = False
session.merge(dag)
session.commit()
session.flush()

@staticmethod
@provide_session
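The change from per-row session.merge() plus commit() to a row-locked query with a single flush() is what lets several schedulers run this cleanup concurrently without blocking each other. A standalone sketch of the pattern (hedged: with_row_locks and skip_locked are the helpers used by the HA scheduler work, and their exact location may differ by Airflow version):

```python
from airflow.models.dag import DagModel
from airflow.utils.session import provide_session
from airflow.utils.sqlalchemy import skip_locked, with_row_locks


@provide_session
def deactivate_stale_dags(expiration_date, session=None):
    """Mark DAGs untouched by the scheduler since expiration_date as inactive."""
    query = session.query(DagModel).filter(
        DagModel.last_scheduler_run < expiration_date,
        DagModel.is_active,
    )
    # SELECT ... FOR UPDATE SKIP LOCKED: rows already locked by another
    # scheduler are skipped rather than blocked on, so concurrent schedulers
    # can run the cleanup safely.
    for dag in with_row_locks(query, of=DagModel, **skip_locked(session=session)).all():
        dag.is_active = False
    session.flush()
```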
4 changes: 4 additions & 0 deletions tests/jobs/test_scheduler_job.py
@@ -3488,6 +3488,10 @@ def test_send_sla_callbacks_to_processor_sla_with_task_slas(self):
full_filepath=dag.fileloc, dag_id=dag_id
)

def test_deactivate_stale_dags(self):
""""Test if Scheduler deactivates the deleted DAGs"""
pass #TODO


@pytest.mark.xfail(reason="Work out where this goes")
def test_task_with_upstream_skip_process_task_instances():
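The test body is still a TODO in this commit. A hypothetical sketch of how it could be filled in, written as a standalone function rather than a TestSchedulerJob method, and assuming the usual DagModel and create_session helpers:

```python
from datetime import timedelta

from airflow import models
from airflow.models.dag import DagModel
from airflow.utils import timezone
from airflow.utils.session import create_session


def test_deactivate_stale_dags():
    """DAGs untouched by the scheduler past the cutoff should be marked inactive."""
    with create_session() as session:
        # A DAG whose last scheduler touch is well past the cutoff.
        session.add(
            DagModel(
                dag_id='test_deactivate_stale_dags',
                fileloc='/tmp/test_deactivate_stale_dags.py',
                is_active=True,
                last_scheduler_run=timezone.utcnow() - timedelta(hours=1),
            )
        )

    # Deactivate everything not touched in the last 600 seconds.
    models.DAG.deactivate_stale_dags(timezone.utcnow() - timedelta(seconds=600))

    with create_session() as session:
        dag = session.query(DagModel).filter(
            DagModel.dag_id == 'test_deactivate_stale_dags'
        ).one()
        assert not dag.is_active
```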