From 8404497be9d97a905b81a631c02aa5b50f508e73 Mon Sep 17 00:00:00 2001 From: Sumit Maheshwari Date: Tue, 20 Oct 2020 18:32:46 +0530 Subject: [PATCH] [EWT-499] Airflow Upgrade to 1.10.12 [CP] from 1.10.4+twtr : 48be0f91269d946dab3358b030a69a7c26e12499 *Cherry-Pick contains[TWTR][EWT-350] Reverting the last commit partially (#62) --- airflow/jobs/scheduler_job.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 86ab8c83ab3f6..51c6da276d17c 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -1461,7 +1461,6 @@ def _execute_helper(self): while (timezone.utcnow() - execute_start_time).total_seconds() < \ self.run_duration or self.run_duration < 0: self.log.debug("Starting Loop...") - execute_start_time = timezone.utcnow() loop_start_time = time.time() if self.using_sqlite: @@ -1544,16 +1543,6 @@ def _execute_helper(self): self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval) time.sleep(self._processor_poll_interval) - # Verify that all files were processed, and if so, deactivate DAGs that - # haven't been touched by the scheduler as they likely have been - # deleted. - if self.processor_agent.all_files_processed: - self.log.info( - "Deactivating DAGs that haven't been touched since %s", - execute_start_time.isoformat() - ) - models.DAG.deactivate_stale_dags(execute_start_time) - if self.processor_agent.done: self.log.info("Exiting scheduler loop as all files" " have been processed {} times".format(self.num_runs)) @@ -1571,6 +1560,16 @@ def _execute_helper(self): self.processor_agent.terminate() self.log.info("All DAG processors terminated") + # Verify that all files were processed, and if so, deactivate DAGs that + # haven't been touched by the scheduler as they likely have been + # deleted. + if self.processor_agent.all_files_processed: + self.log.info( + "Deactivating DAGs that haven't been touched since %s", + execute_start_time.isoformat() + ) + models.DAG.deactivate_stale_dags(execute_start_time) + self.executor.end() settings.Session.remove()