From 78959ca65581667d838e66b56ac03f821fd46b66 Mon Sep 17 00:00:00 2001 From: Ash Berlin-Taylor Date: Fri, 12 Oct 2018 10:13:05 +0100 Subject: [PATCH] Handle percents signs in configs for airflow run (#4029) * [AIRFLOW-3178] Don't mask defaults() function from ConfigParser ConfigParser (the base class for AirflowConfigParser) expects defaults() to be a function - so when we re-assign it to be a property some of the methods from ConfigParser no longer work. * [AIRFLOW-3178] Correctly escape percent signs when creating temp config Otherwise we have a problem when we come to use those values. * [AIRFLOW-3178] Use os.chmod instead of shelling out There's no need to run another process for a built in Python function. This also removes a possible race condition that would make temporary config file be readable by more than the airflow or run-as user The exact behaviour would depend on the umask we run under, and the primary group of our user, likely this would mean the file was readably by members of the airflow group (which in most cases would be just the airflow user). To remove any such possibility we chmod the file before we write to it --- airflow/configuration.py | 74 ++++++++------- airflow/task/task_runner/base_task_runner.py | 6 -- airflow/utils/configuration.py | 6 +- docs/security.rst | 7 ++ run_unit_tests.sh | 7 -- scripts/ci/5-run-tests.sh | 95 ++++++++++++++++++++ tests/configuration.py | 34 ++++++- 7 files changed, 179 insertions(+), 50 deletions(-) create mode 100755 scripts/ci/5-run-tests.sh diff --git a/airflow/configuration.py b/airflow/configuration.py index a9a68124cab58..b92e93f3feaa5 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -157,9 +157,9 @@ class AirflowConfigParser(ConfigParser): def __init__(self, default_config=None, *args, **kwargs): super(AirflowConfigParser, self).__init__(*args, **kwargs) - self.defaults = ConfigParser(*args, **kwargs) + self.airflow_defaults = ConfigParser(*args, **kwargs) if default_config is not None: - self.defaults.read_string(default_config) + self.airflow_defaults.read_string(default_config) self.is_validated = False @@ -249,9 +249,9 @@ def get(self, section, key, **kwargs): return option # ...then the default config - if self.defaults.has_option(section, key): + if self.airflow_defaults.has_option(section, key): return expand_env_var( - self.defaults.get(section, key, **kwargs)) + self.airflow_defaults.get(section, key, **kwargs)) else: log.warning( @@ -307,8 +307,8 @@ def remove_option(self, section, option, remove_default=True): if super(AirflowConfigParser, self).has_option(section, option): super(AirflowConfigParser, self).remove_option(section, option) - if self.defaults.has_option(section, option) and remove_default: - self.defaults.remove_option(section, option) + if self.airflow_defaults.has_option(section, option) and remove_default: + self.airflow_defaults.remove_option(section, option) def getsection(self, section): """ @@ -317,10 +317,11 @@ def getsection(self, section): :param section: section from the config :return: dict """ - if section not in self._sections and section not in self.defaults._sections: + if (section not in self._sections and + section not in self.airflow_defaults._sections): return None - _section = copy.deepcopy(self.defaults._sections[section]) + _section = copy.deepcopy(self.airflow_defaults._sections[section]) if section in self._sections: _section.update(copy.deepcopy(self._sections[section])) @@ -339,30 +340,35 @@ def getsection(self, section): _section[key] = val return _section - def as_dict(self, display_source=False, display_sensitive=False): + def as_dict( + self, display_source=False, display_sensitive=False, raw=False): """ Returns the current configuration as an OrderedDict of OrderedDicts. :param display_source: If False, the option value is returned. If True, a tuple of (option_value, source) is returned. Source is either - 'airflow.cfg' or 'default'. + 'airflow.cfg', 'default', 'env var', or 'cmd'. :type display_source: bool :param display_sensitive: If True, the values of options set by env vars and bash commands will be displayed. If False, those options are shown as '< hidden >' :type display_sensitive: bool + :param raw: Should the values be output as interpolated values, or the + "raw" form that can be fed back in to ConfigParser + :type raw: bool """ - cfg = copy.deepcopy(self.defaults._sections) - cfg.update(copy.deepcopy(self._sections)) - - # remove __name__ (affects Python 2 only) - for options in cfg.values(): - options.pop('__name__', None) - - # add source - if display_source: - for section in cfg: - for k, v in cfg[section].items(): - cfg[section][k] = (v, 'airflow config') + cfg = {} + configs = [ + ('default', self.airflow_defaults), + ('airflow.cfg', self), + ] + + for (source_name, config) in configs: + for section in config.sections(): + sect = cfg.setdefault(section, OrderedDict()) + for (k, val) in config.items(section=section, raw=raw): + if display_source: + val = (val, source_name) + sect[k] = val # add env vars and overwrite because they have priority for ev in [ev for ev in os.environ if ev.startswith('AIRFLOW__')]: @@ -370,16 +376,15 @@ def as_dict(self, display_source=False, display_sensitive=False): _, section, key = ev.split('__') opt = self._get_env_var_option(section, key) except ValueError: - opt = None - if opt: - if ( - not display_sensitive and - ev != 'AIRFLOW__CORE__UNIT_TEST_MODE'): - opt = '< hidden >' - if display_source: - opt = (opt, 'env var') - cfg.setdefault(section.lower(), OrderedDict()).update( - {key.lower(): opt}) + continue + if (not display_sensitive and ev != 'AIRFLOW__CORE__UNIT_TEST_MODE'): + opt = '< hidden >' + elif raw: + opt = opt.replace('%', '%%') + if display_source: + opt = (opt, 'env var') + cfg.setdefault(section.lower(), OrderedDict()).update( + {key.lower(): opt}) # add bash commands for (section, key) in self.as_command_stdout: @@ -388,8 +393,11 @@ def as_dict(self, display_source=False, display_sensitive=False): if not display_sensitive: opt = '< hidden >' if display_source: - opt = (opt, 'bash cmd') + opt = (opt, 'cmd') + elif raw: + opt = opt.replace('%', '%%') cfg.setdefault(section, OrderedDict()).update({key: opt}) + del cfg[section][key + '_cmd'] return cfg diff --git a/airflow/task/task_runner/base_task_runner.py b/airflow/task/task_runner/base_task_runner.py index e97cfb5f304b3..5974436d5a767 100644 --- a/airflow/task/task_runner/base_task_runner.py +++ b/airflow/task/task_runner/base_task_runner.py @@ -59,12 +59,6 @@ def __init__(self, local_task_job): # Always provide a copy of the configuration file settings cfg_path = tmp_configuration_copy() - # The following command should always work since the user doing chmod is the same - # as the one who just created the file. - subprocess.call( - ['chmod', '600', cfg_path], - close_fds=True - ) # Add sudo commands to change user if we need to. Needed to handle SubDagOperator # case using a SequentialExecutor. diff --git a/airflow/utils/configuration.py b/airflow/utils/configuration.py index 18a338c23f6ff..6a621d5fa9c18 100644 --- a/airflow/utils/configuration.py +++ b/airflow/utils/configuration.py @@ -26,16 +26,18 @@ from airflow import configuration as conf -def tmp_configuration_copy(): +def tmp_configuration_copy(chmod=0o600): """ Returns a path for a temporary file including a full copy of the configuration settings. :return: a path to a temporary file """ - cfg_dict = conf.as_dict(display_sensitive=True) + cfg_dict = conf.as_dict(display_sensitive=True, raw=True) temp_fd, cfg_path = mkstemp() with os.fdopen(temp_fd, 'w') as temp_file: + if chmod is not None: + os.fchmod(temp_fd, chmod) json.dump(cfg_dict, temp_file) return cfg_path diff --git a/docs/security.rst b/docs/security.rst index 3c328d829c03d..f8b747eb0f752 100644 --- a/docs/security.rst +++ b/docs/security.rst @@ -10,6 +10,13 @@ backends or creating your own. Be sure to checkout :doc:`api` for securing the API. +.. note:: + + Airflow uses the config parser of Python. This config parser interpolates + '%'-signs. Make sure escape any ``%`` signs in your config file (but not + environment variables) as ``%%``, otherwise Airflow might leak these + passwords on a config parser exception to a log. + Web Authentication ------------------ diff --git a/run_unit_tests.sh b/run_unit_tests.sh index 27e4d08af1416..124abff44a844 100755 --- a/run_unit_tests.sh +++ b/run_unit_tests.sh @@ -24,12 +24,6 @@ set -x export AIRFLOW_HOME=${AIRFLOW_HOME:=~/airflow} export AIRFLOW__CORE__UNIT_TEST_MODE=True -# configuration test -export AIRFLOW__TESTSECTION__TESTKEY=testvalue - -# use Airflow 2.0-style imports -export AIRFLOW_USE_NEW_IMPORTS=1 - # add test/contrib to PYTHONPATH DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" export PYTHONPATH=$PYTHONPATH:${DIR}/tests/test_utils @@ -91,4 +85,3 @@ nosetests $nose_args # To run individual tests: # nosetests tests.core:CoreTest.test_scheduler_job - diff --git a/scripts/ci/5-run-tests.sh b/scripts/ci/5-run-tests.sh new file mode 100755 index 0000000000000..8a74d824efa29 --- /dev/null +++ b/scripts/ci/5-run-tests.sh @@ -0,0 +1,95 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +set -o verbose + +if [ -z "$HADOOP_HOME" ]; then + echo "HADOOP_HOME not set - abort" >&2 + exit 1 +fi + +echo "Using ${HADOOP_DISTRO} distribution of Hadoop from ${HADOOP_HOME}" + +pwd + +echo "Using travis airflow.cfg" +DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +cp -f ${DIR}/airflow_travis.cfg ~/unittests.cfg + +ROOTDIR="$(dirname $(dirname $DIR))" +export AIRFLOW__CORE__DAGS_FOLDER="$ROOTDIR/tests/dags" + +# add test/contrib to PYTHONPATH +export PYTHONPATH=${PYTHONPATH:-$ROOTDIR/tests/test_utils} + +echo Backend: $AIRFLOW__CORE__SQL_ALCHEMY_CONN + +# environment +export AIRFLOW_HOME=${AIRFLOW_HOME:=~} +export AIRFLOW__CORE__UNIT_TEST_MODE=True + +# any argument received is overriding the default nose execution arguments: +nose_args=$@ + +# Generate the `airflow` executable if needed +which airflow > /dev/null || python setup.py develop + +# For impersonation tests on Travis, make airflow accessible to other users via the global PATH +# (which contains /usr/local/bin) +sudo ln -sf "${VIRTUAL_ENV}/bin/airflow" /usr/local/bin/ + +if [ -z "$KUBERNETES_VERSION" ]; then + echo "Initializing the DB" + yes | airflow initdb + yes | airflow resetdb +fi + +if [ -z "$nose_args" ]; then + nose_args="--with-coverage \ + --cover-erase \ + --cover-html \ + --cover-package=airflow \ + --cover-html-dir=airflow/www/static/coverage \ + --with-ignore-docstrings \ + --rednose \ + --with-timer \ + -v \ + --logging-level=DEBUG" +fi + +if [ -z "$KUBERNETES_VERSION" ]; then + # kdc init happens in setup_kdc.sh + kinit -kt ${KRB5_KTNAME} airflow +fi + +# For impersonation tests running on SQLite on Travis, make the database world readable so other +# users can update it +AIRFLOW_DB="$HOME/airflow.db" + +if [ -f "${AIRFLOW_DB}" ]; then + chmod a+rw "${AIRFLOW_DB}" + chmod g+rwx "${AIRFLOW_HOME}" +fi + +echo "Starting the unit tests with the following nose arguments: "$nose_args +nosetests $nose_args + +# To run individual tests: +# nosetests tests.core:CoreTest.test_scheduler_job diff --git a/tests/configuration.py b/tests/configuration.py index ac6f7b7db7746..08f57fd523d3b 100644 --- a/tests/configuration.py +++ b/tests/configuration.py @@ -37,38 +37,64 @@ class ConfTest(unittest.TestCase): - def setup(self): + @classmethod + def setUpClass(cls): + os.environ['AIRFLOW__TESTSECTION__TESTKEY'] = 'testvalue' + os.environ['AIRFLOW__TESTSECTION__TESTPERCENT'] = 'with%percent' configuration.load_test_config() + conf.set('core', 'percent', 'with%%inside') + + @classmethod + def tearDownClass(cls): + del os.environ['AIRFLOW__TESTSECTION__TESTKEY'] + del os.environ['AIRFLOW__TESTSECTION__TESTPERCENT'] def test_env_var_config(self): opt = conf.get('testsection', 'testkey') self.assertEqual(opt, 'testvalue') + opt = conf.get('testsection', 'testpercent') + self.assertEqual(opt, 'with%percent') + def test_conf_as_dict(self): cfg_dict = conf.as_dict() # test that configs are picked up self.assertEqual(cfg_dict['core']['unit_test_mode'], 'True') + self.assertEqual(cfg_dict['core']['percent'], 'with%inside') + # test env vars self.assertEqual(cfg_dict['testsection']['testkey'], '< hidden >') + def test_conf_as_dict_source(self): # test display_source cfg_dict = conf.as_dict(display_source=True) self.assertEqual( - cfg_dict['core']['load_examples'][1], 'airflow config') + cfg_dict['core']['load_examples'][1], 'airflow.cfg') self.assertEqual( cfg_dict['testsection']['testkey'], ('< hidden >', 'env var')) + def test_conf_as_dict_sensitive(self): # test display_sensitive cfg_dict = conf.as_dict(display_sensitive=True) self.assertEqual(cfg_dict['testsection']['testkey'], 'testvalue') + self.assertEqual(cfg_dict['testsection']['testpercent'], 'with%percent') # test display_source and display_sensitive cfg_dict = conf.as_dict(display_sensitive=True, display_source=True) self.assertEqual( cfg_dict['testsection']['testkey'], ('testvalue', 'env var')) + def test_conf_as_dict_raw(self): + # test display_sensitive + cfg_dict = conf.as_dict(raw=True, display_sensitive=True) + self.assertEqual(cfg_dict['testsection']['testkey'], 'testvalue') + + # Values with '%' in them should be escaped + self.assertEqual(cfg_dict['testsection']['testpercent'], 'with%%percent') + self.assertEqual(cfg_dict['core']['percent'], 'with%%inside') + def test_command_config(self): TEST_CONFIG = '''[test] key1 = hello @@ -104,6 +130,10 @@ def test_command_config(self): self.assertFalse(test_conf.has_option('test', 'key5')) self.assertTrue(test_conf.has_option('another', 'key6')) + cfg_dict = test_conf.as_dict(display_sensitive=True) + self.assertEqual('cmd_result', cfg_dict['test']['key2']) + self.assertNotIn('key2_cmd', cfg_dict['test']) + def test_remove_option(self): TEST_CONFIG = '''[test] key1 = hello