[AIRFLOW-2895] Prevent scheduler from spamming heartbeats/logs
Reverts most of AIRFLOW-2027 until the issues with it can be fixed.

Closes apache#3747 from aoen/revert_min_file_parsing_time_commit
aoen authored and ashb committed Oct 22, 2018
1 parent 948b09d commit 4040ce2
Showing 7 changed files with 35 additions and 48 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -233,6 +233,7 @@ Currently **officially** using Airflow:
1. [Tile](https://tile.com/) [[@ranjanmanish](https://github.com/ranjanmanish)]
1. [Tokopedia](https://www.tokopedia.com/) [[@topedmaria](https://github.com/topedmaria)]
1. [Twine Labs](https://www.twinelabs.com/) [[@ivorpeles](https://github.com/ivorpeles)]
1. [Twitter](https://www.twitter.com/) [[@aoen](https://github.com/aoen)]
1. [T2 Systems](http://t2systems.com) [[@unclaimedpants](https://github.com/unclaimedpants)]
1. [Ubisoft](https://www.ubisoft.com/) [[@Walkoss](https://github.com/Walkoss)]
1. [United Airlines](https://www.united.com/) [[@ilopezfr](https://github.com/ilopezfr)]
6 changes: 6 additions & 0 deletions UPDATING.md
@@ -5,6 +5,11 @@ assists users migrating to a new version.

## Airflow Master

### min_file_parsing_loop_time config option temporarily disabled

The `scheduler.min_file_parsing_loop_time` config option has been temporarily removed due to
some bugs.
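
Since the scheduler simply stops reading the option, existing deployments only need to delete the line from their airflow.cfg. Below is a minimal pre-upgrade check; it is a sketch only, assuming `airflow.configuration.conf` exposes the standard `ConfigParser` interface (`has_option`), with the option name taken from the note above.

```python
# Sketch: warn if the removed scheduler option is still present in airflow.cfg.
# Assumes conf behaves like a ConfigParser (has_option), as AirflowConfigParser does.
from airflow.configuration import conf

REMOVED_OPTIONS = [("scheduler", "min_file_parsing_loop_time")]

def warn_on_removed_options():
    for section, key in REMOVED_OPTIONS:
        if conf.has_option(section, key):
            print("Warning: [{section}] {key} is no longer read by the scheduler; "
                  "it can be removed from airflow.cfg".format(section=section, key=key))

if __name__ == "__main__":
    warn_on_removed_options()
```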

## Airflow 1.10

Installation and upgrading requires setting `SLUGIFY_USES_TEXT_UNIDECODE=yes` in your environment or
@@ -386,6 +391,7 @@ indefinitely. This is only available on the command line.
After how much time an updated DAG should be picked up from the filesystem.

#### min_file_parsing_loop_time
CURRENTLY DISABLED DUE TO A BUG
How many seconds to wait between file-parsing loops to prevent the logs from being spammed.

#### dag_dir_list_interval
4 changes: 1 addition & 3 deletions airflow/config_templates/default_airflow.cfg
@@ -428,9 +428,7 @@ run_duration = -1
# after how much time a new DAG should be picked up from the filesystem
min_file_process_interval = 0

# How many seconds to wait between file-parsing loops to prevent the logs from being spammed.
min_file_parsing_loop_time = 1

# How often (in seconds) to scan the DAGs directory for new files. Defaults to 5 minutes.
dag_dir_list_interval = 300

# How often should stats be printed to the logs
42 changes: 23 additions & 19 deletions airflow/jobs.py
@@ -532,8 +532,7 @@ def __init__(
num_runs=-1,
file_process_interval=conf.getint('scheduler',
'min_file_process_interval'),
min_file_parsing_loop_time=conf.getint('scheduler',
'min_file_parsing_loop_time'),
processor_poll_interval=1.0,
run_duration=None,
do_pickle=False,
log=None,
@@ -548,6 +547,8 @@ def __init__(
:type subdir: unicode
:param num_runs: The number of times to try to schedule each DAG file.
-1 for unlimited within the run_duration.
:param processor_poll_interval: The number of seconds to wait between
polls of running processors
:param run_duration: how long to run (in seconds) before exiting
:type run_duration: int
:param do_pickle: once a DAG object is obtained by executing the Python
@@ -564,6 +565,7 @@ def __init__(

self.num_runs = num_runs
self.run_duration = run_duration
self._processor_poll_interval = processor_poll_interval

self.do_pickle = do_pickle
super(SchedulerJob, self).__init__(*args, **kwargs)
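
The replacement knob is the new `processor_poll_interval` constructor argument shown above, which defaults to 1.0 second. A hedged usage sketch follows; the `subdir` path and `num_runs` value are illustrative, not taken from this commit.

```python
# Sketch: run a scheduler that sleeps half a second between scheduling loops.
# processor_poll_interval is the argument introduced in this commit; run() comes
# from BaseJob.
from airflow.jobs import SchedulerJob

job = SchedulerJob(
    subdir="/path/to/dags",       # illustrative DAGs folder
    num_runs=1,                   # parse each file at most once, then exit
    processor_poll_interval=0.5,  # seconds to sleep after each scheduling loop
)
job.run()
```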
@@ -592,10 +594,7 @@ def __init__(
# to 3 minutes.
self.file_process_interval = file_process_interval

# Wait until at least this many seconds have passed before parsing files once all
# files have finished parsing.
self.min_file_parsing_loop_time = min_file_parsing_loop_time

self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
if run_duration is None:
self.run_duration = conf.getint('scheduler',
'run_duration')
@@ -1550,16 +1549,18 @@ def _execute(self):
# DAGs in parallel. By processing them in separate processes,
# we can get parallelism and isolation from potentially harmful
# user code.
self.log.info("Processing files using up to %s processes at a time",
self.max_threads)
self.log.info(
"Processing files using up to %s processes at a time",
self.max_threads)
self.log.info("Running execute loop for %s seconds", self.run_duration)
self.log.info("Processing each file at most %s times", self.num_runs)
self.log.info("Process each file at most once every %s seconds",
self.file_process_interval)
self.log.info("Wait until at least %s seconds have passed between file parsing "
"loops", self.min_file_parsing_loop_time)
self.log.info("Checking for new files in %s every %s seconds",
self.subdir, self.dag_dir_list_interval)
self.log.info(
"Process each file at most once every %s seconds",
self.file_process_interval)
self.log.info(
"Checking for new files in %s every %s seconds",
self.subdir,
self.dag_dir_list_interval)

# Build up a list of Python files that could contain DAGs
self.log.info("Searching for files in %s", self.subdir)
@@ -1575,7 +1576,6 @@ def processor_factory(file_path):
known_file_paths,
self.max_threads,
self.file_process_interval,
self.min_file_parsing_loop_time,
self.num_runs,
processor_factory)

@@ -1722,13 +1722,17 @@ def _execute_helper(self, processor_manager):
last_stat_print_time = timezone.utcnow()

loop_end_time = time.time()
self.log.debug("Ran scheduling loop in %.2f seconds",
loop_end_time - loop_start_time)
self.log.debug(
"Ran scheduling loop in %.2f seconds",
loop_end_time - loop_start_time)
self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
time.sleep(self._processor_poll_interval)

# Exit early for a test mode
if processor_manager.max_runs_reached():
self.log.info("Exiting loop as all files have been processed %s times",
self.num_runs)
self.log.info(
"Exiting loop as all files have been processed %s times",
self.num_runs)
break

# Stop any processors
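
Taken together with the `dag_processing.py` change below, loop throttling now lives in one place: the scheduler times each pass and then sleeps a fixed `processor_poll_interval`, instead of the file-processing manager computing a sleep from parse durations. A minimal sketch of that pattern, with `run_one_scheduling_pass` standing in for the real loop body:

```python
# Sketch of the fixed-interval throttle _execute_helper uses after this commit:
# time the pass, log the duration, then sleep a constant poll interval.
import logging
import time

log = logging.getLogger(__name__)

def scheduling_loop(run_one_scheduling_pass, processor_poll_interval=1.0, max_loops=3):
    for _ in range(max_loops):
        loop_start_time = time.time()
        run_one_scheduling_pass()
        loop_end_time = time.time()
        log.debug("Ran scheduling loop in %.2f seconds",
                  loop_end_time - loop_start_time)
        log.debug("Sleeping for %.2f seconds", processor_poll_interval)
        time.sleep(processor_poll_interval)
```

Usage: `scheduling_loop(lambda: None, processor_poll_interval=0.5)` runs three throttled passes.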
25 changes: 4 additions & 21 deletions airflow/utils/dag_processing.py
@@ -326,7 +326,6 @@ def __init__(self,
file_paths,
parallelism,
process_file_interval,
min_file_parsing_loop_time,
max_runs,
processor_factory):
"""
@@ -340,9 +339,6 @@ def __init__(self,
:param process_file_interval: process a file at most once every this
many seconds
:type process_file_interval: float
:param min_file_parsing_loop_time: wait until at least this many seconds have
passed before parsing files once all files have finished parsing.
:type min_file_parsing_loop_time: float
:param max_runs: The number of times to parse and schedule each file. -1
for unlimited.
:type max_runs: int
@@ -358,7 +354,6 @@ def __init__(self,
self._dag_directory = dag_directory
self._max_runs = max_runs
self._process_file_interval = process_file_interval
self._min_file_parsing_loop_time = min_file_parsing_loop_time
self._processor_factory = processor_factory
# Map from file path to the processor
self._processors = {}
@@ -529,24 +524,12 @@ def heartbeat(self):
file_paths_in_progress = self._processors.keys()
now = timezone.utcnow()
file_paths_recently_processed = []

longest_parse_duration = 0
for file_path in self._file_paths:
last_finish_time = self.get_last_finish_time(file_path)
if last_finish_time is not None:
duration = now - last_finish_time
longest_parse_duration = max(duration.total_seconds(),
longest_parse_duration)
if duration.total_seconds() < self._process_file_interval:
file_paths_recently_processed.append(file_path)

sleep_length = max(self._min_file_parsing_loop_time - longest_parse_duration,
0)
if sleep_length > 0:
self.log.debug("Sleeping for %.2f seconds to prevent excessive "
"logging",
sleep_length)
time.sleep(sleep_length)
if (last_finish_time is not None and
(now - last_finish_time).total_seconds() <
self._process_file_interval):
file_paths_recently_processed.append(file_path)

files_paths_at_run_limit = [file_path
for file_path, num_runs in self._run_count.items()
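
With the manager-side sleep removed, `heartbeat()` only has to decide which files finished parsing too recently to be handed out again. A standalone sketch of that filter, using a hypothetical `last_finish_times` dict in place of `get_last_finish_time`:

```python
# Sketch of the simplified check: a file is "recently processed" if it finished
# parsing less than process_file_interval seconds ago.
from datetime import datetime, timedelta

def recently_processed(file_paths, last_finish_times, process_file_interval, now=None):
    now = now or datetime.utcnow()
    recent = []
    for file_path in file_paths:
        last_finish_time = last_finish_times.get(file_path)
        if (last_finish_time is not None and
                (now - last_finish_time).total_seconds() < process_file_interval):
            recent.append(file_path)
    return recent

# Example: only a.py finished within the 30-second interval.
now = datetime.utcnow()
finish_times = {"a.py": now - timedelta(seconds=10), "b.py": now - timedelta(seconds=90)}
print(recently_processed(["a.py", "b.py"], finish_times, 30, now=now))  # ['a.py']
```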
3 changes: 0 additions & 3 deletions scripts/ci/kubernetes/kube/configmaps.yaml
@@ -57,9 +57,6 @@ data:
statsd_port = 8125
statsd_prefix = airflow
# How many seconds to wait between file-parsing loops to prevent the logs from being spammed.
min_file_parsing_loop_time = 1
print_stats_interval = 30
scheduler_zombie_task_threshold = 300
max_tis_per_query = 0
2 changes: 0 additions & 2 deletions tests/utils/test_dag_processing.py
@@ -32,7 +32,6 @@ def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
parallelism=1,
process_file_interval=1,
max_runs=1,
min_file_parsing_loop_time=0,
processor_factory=MagicMock().return_value)

mock_processor = MagicMock()
@@ -52,7 +51,6 @@ def test_set_file_paths_when_processor_file_path_is_in_new_file_paths(self):
parallelism=1,
process_file_interval=1,
max_runs=1,
min_file_parsing_loop_time=0,
processor_factory=MagicMock().return_value)

mock_processor = MagicMock()
