From 4040ce2bf4d2d51ca226c9b7b169570b224f0990 Mon Sep 17 00:00:00 2001 From: Dan Davydov Date: Mon, 20 Aug 2018 09:14:22 -0400 Subject: [PATCH] [AIRFLOW-2895] Prevent scheduler from spamming heartbeats/logs Reverts most of AIRFLOW-2027 until the issues with it can be fixed. Closes #3747 from aoen/revert_min_file_parsing_time_commit --- README.md | 1 + UPDATING.md | 6 +++ airflow/config_templates/default_airflow.cfg | 4 +- airflow/jobs.py | 42 +++++++++++--------- airflow/utils/dag_processing.py | 25 ++---------- scripts/ci/kubernetes/kube/configmaps.yaml | 3 -- tests/utils/test_dag_processing.py | 2 - 7 files changed, 35 insertions(+), 48 deletions(-) diff --git a/README.md b/README.md index e68d26cc8cfa2..f5fe2e1bdcd4c 100644 --- a/README.md +++ b/README.md @@ -233,6 +233,7 @@ Currently **officially** using Airflow: 1. [Tile](https://tile.com/) [[@ranjanmanish](https://github.com/ranjanmanish)] 1. [Tokopedia](https://www.tokopedia.com/) [@topedmaria](https://github.com/topedmaria) 1. [Twine Labs](https://www.twinelabs.com/) [[@ivorpeles](https://github.com/ivorpeles)] +1. [Twitter](https://www.twitter.com/) [[@aoen](https://github.com/aoen)] 1. [T2 Systems](http://t2systems.com) [[@unclaimedpants](https://github.com/unclaimedpants)] 1. [Ubisoft](https://www.ubisoft.com/) [[@Walkoss](https://github.com/Walkoss)] 1. [United Airlines](https://www.united.com/) [[@ilopezfr](https://github.com/ilopezfr)] diff --git a/UPDATING.md b/UPDATING.md index 3b47882212225..d00e4f8742f26 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -5,6 +5,11 @@ assists users migrating to a new version. ## Airflow Master +### min_file_parsing_loop_time config option temporarily disabled + +The scheduler.min_file_parsing_loop_time config option has been temporarily removed due to +some bugs. + ## Airflow 1.10 Installation and upgrading requires setting `SLUGIFY_USES_TEXT_UNIDECODE=yes` in your environment or @@ -386,6 +391,7 @@ indefinitely. This is only available on the command line. After how much time should an updated DAG be picked up from the filesystem. #### min_file_parsing_loop_time +CURRENTLY DISABLED DUE TO A BUG How many seconds to wait between file-parsing loops to prevent the logs from being spammed. #### dag_dir_list_interval diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 9a15dc0d117e8..deb67bb96ca25 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -428,9 +428,7 @@ run_duration = -1 # after how much time a new DAGs should be picked up from the filesystem min_file_process_interval = 0 -# How many seconds to wait between file-parsing loops to prevent the logs from being spammed. -min_file_parsing_loop_time = 1 - +# How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes. dag_dir_list_interval = 300 # How often should stats be printed to the logs diff --git a/airflow/jobs.py b/airflow/jobs.py index e2cf830aca296..88aa643c50152 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -532,8 +532,7 @@ def __init__( num_runs=-1, file_process_interval=conf.getint('scheduler', 'min_file_process_interval'), - min_file_parsing_loop_time=conf.getint('scheduler', - 'min_file_parsing_loop_time'), + processor_poll_interval=1.0, run_duration=None, do_pickle=False, log=None, @@ -548,6 +547,8 @@ def __init__( :type subdir: unicode :param num_runs: The number of times to try to schedule each DAG file. -1 for unlimited within the run_duration. 
+ :param processor_poll_interval: The number of seconds to wait between + polls of running processors :param run_duration: how long to run (in seconds) before exiting :type run_duration: int :param do_pickle: once a DAG object is obtained by executing the Python @@ -564,6 +565,7 @@ def __init__( self.num_runs = num_runs self.run_duration = run_duration + self._processor_poll_interval = processor_poll_interval self.do_pickle = do_pickle super(SchedulerJob, self).__init__(*args, **kwargs) @@ -592,10 +594,7 @@ def __init__( # to 3 minutes. self.file_process_interval = file_process_interval - # Wait until at least this many seconds have passed before parsing files once all - # files have finished parsing. - self.min_file_parsing_loop_time = min_file_parsing_loop_time - + self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query') if run_duration is None: self.run_duration = conf.getint('scheduler', 'run_duration') @@ -1550,16 +1549,18 @@ def _execute(self): # DAGs in parallel. By processing them in separate processes, # we can get parallelism and isolation from potentially harmful # user code. - self.log.info("Processing files using up to %s processes at a time", - self.max_threads) + self.log.info( + "Processing files using up to %s processes at a time", + self.max_threads) self.log.info("Running execute loop for %s seconds", self.run_duration) self.log.info("Processing each file at most %s times", self.num_runs) - self.log.info("Process each file at most once every %s seconds", - self.file_process_interval) - self.log.info("Wait until at least %s seconds have passed between file parsing " - "loops", self.min_file_parsing_loop_time) - self.log.info("Checking for new files in %s every %s seconds", - self.subdir, self.dag_dir_list_interval) + self.log.info( + "Process each file at most once every %s seconds", + self.file_process_interval) + self.log.info( + "Checking for new files in %s every %s seconds", + self.subdir, + self.dag_dir_list_interval) # Build up a list of Python files that could contain DAGs self.log.info("Searching for files in %s", self.subdir) @@ -1575,7 +1576,6 @@ def processor_factory(file_path): known_file_paths, self.max_threads, self.file_process_interval, - self.min_file_parsing_loop_time, self.num_runs, processor_factory) @@ -1722,13 +1722,17 @@ def _execute_helper(self, processor_manager): last_stat_print_time = timezone.utcnow() loop_end_time = time.time() - self.log.debug("Ran scheduling loop in %.2f seconds", - loop_end_time - loop_start_time) + self.log.debug( + "Ran scheduling loop in %.2f seconds", + loop_end_time - loop_start_time) + self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval) + time.sleep(self._processor_poll_interval) # Exit early for a test mode if processor_manager.max_runs_reached(): - self.log.info("Exiting loop as all files have been processed %s times", - self.num_runs) + self.log.info( + "Exiting loop as all files have been processed %s times", + self.num_runs) break # Stop any processors diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py index e236397da0284..43948837065e3 100644 --- a/airflow/utils/dag_processing.py +++ b/airflow/utils/dag_processing.py @@ -326,7 +326,6 @@ def __init__(self, file_paths, parallelism, process_file_interval, - min_file_parsing_loop_time, max_runs, processor_factory): """ @@ -340,9 +339,6 @@ def __init__(self, :param process_file_interval: process a file at most once every this many seconds :type process_file_interval: float - :param 
min_file_parsing_loop_time: wait until at least this many seconds have - passed before parsing files once all files have finished parsing. - :type min_file_parsing_loop_time: float :param max_runs: The number of times to parse and schedule each file. -1 for unlimited. :type max_runs: int @@ -358,7 +354,6 @@ def __init__(self, self._dag_directory = dag_directory self._max_runs = max_runs self._process_file_interval = process_file_interval - self._min_file_parsing_loop_time = min_file_parsing_loop_time self._processor_factory = processor_factory # Map from file path to the processor self._processors = {} @@ -529,24 +524,12 @@ def heartbeat(self): file_paths_in_progress = self._processors.keys() now = timezone.utcnow() file_paths_recently_processed = [] - - longest_parse_duration = 0 for file_path in self._file_paths: last_finish_time = self.get_last_finish_time(file_path) - if last_finish_time is not None: - duration = now - last_finish_time - longest_parse_duration = max(duration.total_seconds(), - longest_parse_duration) - if duration.total_seconds() < self._process_file_interval: - file_paths_recently_processed.append(file_path) - - sleep_length = max(self._min_file_parsing_loop_time - longest_parse_duration, - 0) - if sleep_length > 0: - self.log.debug("Sleeping for %.2f seconds to prevent excessive " - "logging", - sleep_length) - time.sleep(sleep_length) + if (last_finish_time is not None and + (now - last_finish_time).total_seconds() < + self._process_file_interval): + file_paths_recently_processed.append(file_path) files_paths_at_run_limit = [file_path for file_path, num_runs in self._run_count.items() diff --git a/scripts/ci/kubernetes/kube/configmaps.yaml b/scripts/ci/kubernetes/kube/configmaps.yaml index 7b91aa2e8724c..1673c3e54b577 100644 --- a/scripts/ci/kubernetes/kube/configmaps.yaml +++ b/scripts/ci/kubernetes/kube/configmaps.yaml @@ -57,9 +57,6 @@ data: statsd_port = 8125 statsd_prefix = airflow - # How many seconds to wait between file-parsing loops to prevent the logs from being spammed. - min_file_parsing_loop_time = 1 - print_stats_interval = 30 scheduler_zombie_task_threshold = 300 max_tis_per_query = 0 diff --git a/tests/utils/test_dag_processing.py b/tests/utils/test_dag_processing.py index 7abe7efe9b35c..f29e384b8c657 100644 --- a/tests/utils/test_dag_processing.py +++ b/tests/utils/test_dag_processing.py @@ -32,7 +32,6 @@ def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self): parallelism=1, process_file_interval=1, max_runs=1, - min_file_parsing_loop_time=0, processor_factory=MagicMock().return_value) mock_processor = MagicMock() @@ -52,7 +51,6 @@ def test_set_file_paths_when_processor_file_path_is_in_new_file_paths(self): parallelism=1, process_file_interval=1, max_runs=1, - min_file_parsing_loop_time=0, processor_factory=MagicMock().return_value) mock_processor = MagicMock()
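For readers who want the behavioural change at a glance, the following is a minimal, self-contained sketch rather than Airflow's actual code: run_scheduler_loop and do_one_pass are hypothetical stand-ins, and only processor_poll_interval (with its 1.0-second default) is taken from the patch. It shows the pacing the revert adopts in airflow/jobs.py, a fixed sleep at the end of every scheduling pass, instead of the removed scheme in which min_file_parsing_loop_time drove a dynamically computed sleep inside DagFileProcessorManager.heartbeat().

import time
import logging

logging.basicConfig(level=logging.DEBUG, format="%(levelname)s %(message)s")
log = logging.getLogger("scheduler-sketch")


def run_scheduler_loop(run_duration, processor_poll_interval=1.0,
                       do_one_pass=lambda: None):
    """Run one scheduling pass per iteration, then sleep a fixed interval.

    processor_poll_interval mirrors the new SchedulerJob argument: the
    scheduler simply pauses between passes, so heartbeats and debug lines
    are emitted roughly once per interval instead of in a tight loop.
    """
    execute_start_time = time.time()
    while time.time() - execute_start_time < run_duration:
        loop_start_time = time.time()

        do_one_pass()  # stand-in for heartbeating executors and processors

        loop_end_time = time.time()
        log.debug("Ran scheduling loop in %.2f seconds",
                  loop_end_time - loop_start_time)

        # The core of the change: an unconditional, fixed-length sleep.
        log.debug("Sleeping for %.2f seconds", processor_poll_interval)
        time.sleep(processor_poll_interval)


if __name__ == "__main__":
    # With the 1.0-second default this prints about one pair of debug lines
    # per second for three seconds, which is the pacing the revert is after.
    run_scheduler_loop(run_duration=3)

The cost is at most processor_poll_interval seconds of added latency per scheduling loop; the min_file_parsing_loop_time option itself is only temporarily removed, per the UPDATING.md hunk above.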
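The heartbeat() hunk in airflow/utils/dag_processing.py likewise reduces to a plain recency filter once the min_file_parsing_loop_time bookkeeping is gone. The helper below, recently_processed, is a hypothetical stand-alone rendering of that filter (the real method reads finish times via self.get_last_finish_time and timezone.utcnow()); it keeps only the files whose last parse finished less than process_file_interval seconds ago, and it no longer tracks the longest parse duration or sleeps.

from datetime import datetime, timedelta


def recently_processed(file_paths, last_finish_times, process_file_interval,
                       now=None):
    """Return the paths whose last parse finished less than
    process_file_interval seconds ago; these are skipped this heartbeat."""
    now = now or datetime.utcnow()
    recent = []
    for file_path in file_paths:
        last_finish_time = last_finish_times.get(file_path)
        if (last_finish_time is not None and
                (now - last_finish_time).total_seconds() < process_file_interval):
            recent.append(file_path)
    return recent


if __name__ == "__main__":
    now = datetime.utcnow()
    finish_times = {
        "dags/etl.py": now - timedelta(seconds=10),         # too fresh to re-parse
        "dags/reporting.py": now - timedelta(seconds=600),  # eligible again
        "dags/new_dag.py": None,                            # never parsed yet
    }
    print(recently_processed(list(finish_times), finish_times, 180, now=now))
    # prints ['dags/etl.py']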