From 81b34677929b72cd59ab1b1f11597a1720d31020 Mon Sep 17 00:00:00 2001 From: Sumit Maheshwari Date: Mon, 12 Oct 2020 14:57:18 +0530 Subject: [PATCH] Fix issue with stale dags not being marked inactive by Scheduler --- airflow/config_templates/config.yml | 7 +++++++ airflow/jobs/scheduler_job.py | 23 +++++++++++------------ airflow/models/dag.py | 9 +++++---- tests/jobs/test_scheduler_job.py | 4 ++++ 4 files changed, 27 insertions(+), 16 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 6dfe6ec512a4c..c331b119265ee 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1709,6 +1709,13 @@ type: string example: ~ default: "False" + - name: stale_dag_cleanup_timeout + description: | + Timeout in seconds before Schduler marks a removed DAG as inactive in DB. + version_added: ~ + type: string + example: ~ + default: 600 - name: ldap description: ~ options: diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 8ab3829944794..edd4556d40272 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -1311,23 +1311,11 @@ def _execute(self) -> None: # Start after resetting orphaned tasks to avoid stressing out DB. self.processor_agent.start() - execute_start_time = timezone.utcnow() - self._run_scheduler_loop() # Stop any processors self.processor_agent.terminate() - # Verify that all files were processed, and if so, deactivate DAGs that - # haven't been touched by the scheduler as they likely have been - # deleted. - if self.processor_agent.all_files_processed: - self.log.info( - "Deactivating DAGs that haven't been touched since %s", - execute_start_time.isoformat() - ) - models.DAG.deactivate_stale_dags(execute_start_time) - self.executor.end() settings.Session.remove() # type: ignore @@ -1364,6 +1352,7 @@ def _run_scheduler_loop(self) -> None: #. Heartbeat executor #. Execute queued tasks in executor asynchronously #. Sync on the states of running tasks + #. Deactivate stale/deleted dags Following is a graphic representation of these steps. @@ -1374,6 +1363,7 @@ def _run_scheduler_loop(self) -> None: if not self.processor_agent: raise ValueError("Processor agent is not started.") is_unit_test: bool = conf.getboolean('core', 'unit_test_mode') + stale_dag_cleanup_timeout: int = conf.getint('scheduler', 'stale_dag_cleanup_timeout', 600) for loop_count in itertools.count(start=1): loop_start_time = time.time() @@ -1409,6 +1399,15 @@ def _run_scheduler_loop(self) -> None: # usage when "idle" time.sleep(self._processor_poll_interval) + # Verify that all files were processed, and if so, deactivate DAGs that + # haven't been touched by the scheduler as they likely have been deleted. + if self.processor_agent.all_files_processed: + self.log.info( + "Deactivating DAGs that haven't been touched in %s seconds", + stale_dag_cleanup_timeout + ) + models.DAG.deactivate_stale_dags(timezone.utcnow() - timedelta(seconds=stale_dag_cleanup_timeout)) + if loop_count >= self.num_runs > 0: self.log.info( "Exiting scheduler loop as requested number of runs (%d - got to %d) has been reached", diff --git a/airflow/models/dag.py b/airflow/models/dag.py index b50f2a184b0ed..12f93e97b9af3 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -1842,16 +1842,17 @@ def deactivate_stale_dags(expiration_date, session=None): :type expiration_date: datetime :return: None """ - for dag in session.query( - DagModel).filter(DagModel.last_scheduler_run < expiration_date, - DagModel.is_active).all(): + query = session.query(DagModel).filter(DagModel.last_scheduler_run < expiration_date, + DagModel.is_active) + + for dag in with_row_locks(query, of=DagModel, **skip_locked(session=session)).all(): log.info( "Deactivating DAG ID %s since it was last touched by the scheduler at %s", dag.dag_id, dag.last_scheduler_run.isoformat() ) dag.is_active = False session.merge(dag) - session.commit() + session.flush() @staticmethod @provide_session diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index b46c80d05af59..91e66f9d28d95 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -3488,6 +3488,10 @@ def test_send_sla_callbacks_to_processor_sla_with_task_slas(self): full_filepath=dag.fileloc, dag_id=dag_id ) + def test_deactivate_stale_dags(self): + """"Test if Scheduler deactivates the deleted DAGs""" + pass #TODO + @pytest.mark.xfail(reason="Work out where this goes") def test_task_with_upstream_skip_process_task_instances():