Skip to content

Commit

Permalink
[EWT-499] Airflow Upgrade to 1.10.12 [CP] from 1.10.4+twtr : 48be0f9
Browse files Browse the repository at this point in the history
*Cherry-Pick contains[TWTR][EWT-350] Reverting the last commit partially (twitter-forks#62)
  • Loading branch information
msumit authored and Ayush Sethi committed Nov 9, 2020
1 parent 4221b46 commit 8404497
Showing 1 changed file with 10 additions and 11 deletions.
21 changes: 10 additions & 11 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1461,7 +1461,6 @@ def _execute_helper(self):
while (timezone.utcnow() - execute_start_time).total_seconds() < \
self.run_duration or self.run_duration < 0:
self.log.debug("Starting Loop...")
execute_start_time = timezone.utcnow()
loop_start_time = time.time()

if self.using_sqlite:
Expand Down Expand Up @@ -1544,16 +1543,6 @@ def _execute_helper(self):
self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
time.sleep(self._processor_poll_interval)

# Verify that all files were processed, and if so, deactivate DAGs that
# haven't been touched by the scheduler as they likely have been
# deleted.
if self.processor_agent.all_files_processed:
self.log.info(
"Deactivating DAGs that haven't been touched since %s",
execute_start_time.isoformat()
)
models.DAG.deactivate_stale_dags(execute_start_time)

if self.processor_agent.done:
self.log.info("Exiting scheduler loop as all files"
" have been processed {} times".format(self.num_runs))
Expand All @@ -1571,6 +1560,16 @@ def _execute_helper(self):
self.processor_agent.terminate()
self.log.info("All DAG processors terminated")

# Verify that all files were processed, and if so, deactivate DAGs that
# haven't been touched by the scheduler as they likely have been
# deleted.
if self.processor_agent.all_files_processed:
self.log.info(
"Deactivating DAGs that haven't been touched since %s",
execute_start_time.isoformat()
)
models.DAG.deactivate_stale_dags(execute_start_time)

self.executor.end()

settings.Session.remove()
Expand Down

0 comments on commit 8404497

Please sign in to comment.