[AIRFLOW-2027] Only trigger sleep in scheduler after all files have parsed
aoen committed Mar 7, 2018
1 parent 0f9f460 commit 24411e9
Showing 7 changed files with 115 additions and 85 deletions.
3 changes: 3 additions & 0 deletions UPDATING.md
@@ -186,6 +186,9 @@ indefinitely. This is only available on the command line.
#### min_file_process_interval
After how much time should an updated DAG be picked up from the filesystem.

#### min_file_parsing_loop_time
How many seconds to wait between file-parsing loops to prevent the logs from being spammed.

#### dag_dir_list_interval
How often the scheduler should relist the contents of the DAG directory. If you experience that while developing your
dags are not being picked up, have a look at this number and decrease it when necessary.
3 changes: 3 additions & 0 deletions airflow/config_templates/default_airflow.cfg
@@ -372,6 +372,9 @@ run_duration = -1
# after how much time a new DAGs should be picked up from the filesystem
min_file_process_interval = 0

# How many seconds to wait between file-parsing loops to prevent the logs from being spammed.
min_file_parsing_loop_time = 1

dag_dir_list_interval = 300

# How often should stats be printed to the logs
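For reference, the new option can be read back through Airflow's configuration API the same way the scheduler does in jobs.py below; the short script around the call is illustrative only.

from airflow import configuration as conf

# Illustrative only: read the new scheduler option exactly as the scheduler does.
# With the default of 1, the scheduler waits at least one second between
# file-parsing loops once every file has finished parsing.
loop_time = conf.getint('scheduler', 'min_file_parsing_loop_time')
print("Minimum file-parsing loop time: %s second(s)" % loop_time)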
30 changes: 19 additions & 11 deletions airflow/jobs.py
@@ -512,7 +512,8 @@ def __init__(
num_runs=-1,
file_process_interval=conf.getint('scheduler',
'min_file_process_interval'),
processor_poll_interval=1.0,
min_file_parsing_loop_time=conf.getint('scheduler',
'min_file_parsing_loop_time'),
run_duration=None,
do_pickle=False,
log=None,
@@ -527,8 +528,6 @@ def __init__(
:type subdir: unicode
:param num_runs: The number of times to try to schedule each DAG file.
-1 for unlimited within the run_duration.
:param processor_poll_interval: The number of seconds to wait between
polls of running processors
:param run_duration: how long to run (in seconds) before exiting
:type run_duration: int
:param do_pickle: once a DAG object is obtained by executing the Python
Expand All @@ -545,7 +544,6 @@ def __init__(

self.num_runs = num_runs
self.run_duration = run_duration
self._processor_poll_interval = processor_poll_interval

self.do_pickle = do_pickle
super(SchedulerJob, self).__init__(*args, **kwargs)
@@ -574,6 +572,10 @@ def __init__(
# to 3 minutes.
self.file_process_interval = file_process_interval

# Wait until at least this many seconds have passed before parsing files once all
# files have finished parsing.
self.min_file_parsing_loop_time = min_file_parsing_loop_time

self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
if run_duration is None:
self.run_duration = conf.getint('scheduler',
@@ -1521,11 +1523,16 @@ def _execute(self):
# DAGs in parallel. By processing them in separate processes,
# we can get parallelism and isolation from potentially harmful
# user code.
self.log.info("Processing files using up to %s processes at a time", self.max_threads)
self.log.info("Processing files using up to %s processes at a time",
self.max_threads)
self.log.info("Running execute loop for %s seconds", self.run_duration)
self.log.info("Processing each file at most %s times", self.num_runs)
self.log.info("Process each file at most once every %s seconds", self.file_process_interval)
self.log.info("Checking for new files in %s every %s seconds", self.subdir, self.dag_dir_list_interval)
self.log.info("Process each file at most once every %s seconds",
self.file_process_interval)
self.log.info("Wait until at least %s seconds have passed between file parsing "
"loops", self.min_file_parsing_loop_time)
self.log.info("Checking for new files in %s every %s seconds",
self.subdir, self.dag_dir_list_interval)

# Build up a list of Python files that could contain DAGs
self.log.info("Searching for files in %s", self.subdir)
@@ -1541,6 +1548,7 @@ def processor_factory(file_path):
known_file_paths,
self.max_threads,
self.file_process_interval,
self.min_file_parsing_loop_time,
self.num_runs,
processor_factory)

@@ -1684,13 +1692,13 @@ def _execute_helper(self, processor_manager):
last_stat_print_time = timezone.utcnow()

loop_end_time = time.time()
self.log.debug("Ran scheduling loop in %.2f seconds", loop_end_time - loop_start_time)
self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
time.sleep(self._processor_poll_interval)
self.log.debug("Ran scheduling loop in %.2f seconds",
loop_end_time - loop_start_time)

# Exit early for a test mode
if processor_manager.max_runs_reached():
self.log.info("Exiting loop as all files have been processed %s times", self.num_runs)
self.log.info("Exiting loop as all files have been processed %s times",
self.num_runs)
break

# Stop any processors
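Putting the jobs.py change in context, a minimal way to start a scheduler with the new knob might look like the sketch below; the DAG folder path and the chosen values are assumptions for illustration, while the keyword argument name comes from the diff above.

from airflow.jobs import SchedulerJob

# Illustrative sketch: run one scheduling pass over an assumed DAG folder,
# waiting at least five seconds between file-parsing loops.
job = SchedulerJob(
    subdir='/path/to/dags',        # assumed DAG folder for the example
    num_runs=1,                    # parse and schedule each file once
    min_file_parsing_loop_time=5,  # the setting introduced by this commit
)
job.run()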
25 changes: 21 additions & 4 deletions airflow/utils/dag_processing.py
@@ -304,6 +304,7 @@ def __init__(self,
file_paths,
parallelism,
process_file_interval,
min_file_parsing_loop_time,
max_runs,
processor_factory):
"""
@@ -317,6 +318,9 @@ def __init__(self,
:param process_file_interval: process a file at most once every this
many seconds
:type process_file_interval: float
:param min_file_parsing_loop_time: wait until at least this many seconds have
passed before parsing files once all files have finished parsing.
:type min_file_parsing_loop_time: float
:param max_runs: The number of times to parse and schedule each file. -1
for unlimited.
:type max_runs: int
@@ -332,6 +336,7 @@ def __init__(self,
self._dag_directory = dag_directory
self._max_runs = max_runs
self._process_file_interval = process_file_interval
self._min_file_parsing_loop_time = min_file_parsing_loop_time
self._processor_factory = processor_factory
# Map from file path to the processor
self._processors = {}
@@ -502,12 +507,24 @@ def heartbeat(self):
file_paths_in_progress = self._processors.keys()
now = timezone.utcnow()
file_paths_recently_processed = []

longest_parse_duration = 0
for file_path in self._file_paths:
last_finish_time = self.get_last_finish_time(file_path)
if (last_finish_time is not None and
(now - last_finish_time).total_seconds() <
self._process_file_interval):
file_paths_recently_processed.append(file_path)
if last_finish_time is not None:
duration = now - last_finish_time
longest_parse_duration = max(duration.total_seconds(),
longest_parse_duration)
if duration.total_seconds() < self._process_file_interval:
file_paths_recently_processed.append(file_path)

sleep_length = max(self._min_file_parsing_loop_time - longest_parse_duration,
0)
if sleep_length > 0:
self.log.debug("Sleeping for %.2f seconds to prevent excessive "
"logging",
sleep_length)
time.sleep(sleep_length)

files_paths_at_run_limit = [file_path
for file_path, num_runs in self._run_count.items()
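The heartbeat change above amounts to a simple throttle: find how long ago the slowest file last finished parsing, and sleep only for whatever part of min_file_parsing_loop_time is still left. A standalone sketch of that arithmetic, using hypothetical names rather than the DagFileProcessorManager internals, is:

import time

def throttle_parsing_loop(seconds_since_last_finish, min_loop_time):
    # Hypothetical helper mirroring the heartbeat logic above: the file that
    # finished longest ago determines how much of the minimum loop time has
    # already elapsed, and we only sleep for the remainder.
    longest = max(seconds_since_last_finish) if seconds_since_last_finish else 0
    sleep_length = max(min_loop_time - longest, 0)
    if sleep_length > 0:
        time.sleep(sleep_length)
    return sleep_length

# If the slowest file finished 0.3s ago and the floor is 1s, sleep about 0.7s;
# if every file finished more than 1s ago, do not sleep at all.
print(throttle_parsing_loop([0.1, 0.3], min_loop_time=1.0))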
5 changes: 1 addition & 4 deletions tests/core.py
@@ -108,10 +108,7 @@ def execute(*args, **kwargs):


class CoreTest(unittest.TestCase):
# These defaults make the test faster to run
default_scheduler_args = {"file_process_interval": 0,
"processor_poll_interval": 0.5,
"num_runs": 1}
default_scheduler_args = {"num_runs": 1}

def setUp(self):
configuration.load_test_config()