Skip to content

Commit

Permalink
[EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 48be0f9
Browse files Browse the repository at this point in the history
[TWTR][EWT-350] Reverting the last commit partially (twitter-forks#62)
  • Loading branch information
msumit authored and Ayush Sethi committed Dec 21, 2020
1 parent 4ea1c3c commit 840dc0c
Showing 1 changed file with 10 additions and 11 deletions.
21 changes: 10 additions & 11 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1442,7 +1442,6 @@ def _execute_helper(self):
while (timezone.utcnow() - execute_start_time).total_seconds() < \
self.run_duration or self.run_duration < 0:
self.log.debug("Starting Loop...")
execute_start_time = timezone.utcnow()
loop_start_time = time.time()

if self.using_sqlite:
Expand Down Expand Up @@ -1526,16 +1525,6 @@ def _execute_helper(self):
self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
time.sleep(self._processor_poll_interval)

# Verify that all files were processed, and if so, deactivate DAGs that
# haven't been touched by the scheduler as they likely have been
# deleted.
if self.processor_agent.all_files_processed:
self.log.info(
"Deactivating DAGs that haven't been touched since %s",
execute_start_time.isoformat()
)
models.DAG.deactivate_stale_dags(execute_start_time)

if self.processor_agent.done:
self.log.info("Exiting scheduler loop as all files"
" have been processed {} times".format(self.num_runs))
Expand All @@ -1553,6 +1542,16 @@ def _execute_helper(self):
self.processor_agent.terminate()
self.log.info("All DAG processors terminated")

# Verify that all files were processed, and if so, deactivate DAGs that
# haven't been touched by the scheduler as they likely have been
# deleted.
if self.processor_agent.all_files_processed:
self.log.info(
"Deactivating DAGs that haven't been touched since %s",
execute_start_time.isoformat()
)
models.DAG.deactivate_stale_dags(execute_start_time)

self.executor.end()

settings.Session.remove()
Expand Down

0 comments on commit 840dc0c

Please sign in to comment.