diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 86ab8c83ab3f6..51c6da276d17c 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -1461,7 +1461,6 @@ def _execute_helper(self): while (timezone.utcnow() - execute_start_time).total_seconds() < \ self.run_duration or self.run_duration < 0: self.log.debug("Starting Loop...") - execute_start_time = timezone.utcnow() loop_start_time = time.time() if self.using_sqlite: @@ -1544,16 +1543,6 @@ def _execute_helper(self): self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval) time.sleep(self._processor_poll_interval) - # Verify that all files were processed, and if so, deactivate DAGs that - # haven't been touched by the scheduler as they likely have been - # deleted. - if self.processor_agent.all_files_processed: - self.log.info( - "Deactivating DAGs that haven't been touched since %s", - execute_start_time.isoformat() - ) - models.DAG.deactivate_stale_dags(execute_start_time) - if self.processor_agent.done: self.log.info("Exiting scheduler loop as all files" " have been processed {} times".format(self.num_runs)) @@ -1571,6 +1560,16 @@ def _execute_helper(self): self.processor_agent.terminate() self.log.info("All DAG processors terminated") + # Verify that all files were processed, and if so, deactivate DAGs that + # haven't been touched by the scheduler as they likely have been + # deleted. + if self.processor_agent.all_files_processed: + self.log.info( + "Deactivating DAGs that haven't been touched since %s", + execute_start_time.isoformat() + ) + models.DAG.deactivate_stale_dags(execute_start_time) + self.executor.end() settings.Session.remove()