diff --git a/airflow/configuration.py b/airflow/configuration.py index a9a68124cab58..b92e93f3feaa5 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -157,9 +157,9 @@ class AirflowConfigParser(ConfigParser): def __init__(self, default_config=None, *args, **kwargs): super(AirflowConfigParser, self).__init__(*args, **kwargs) - self.defaults = ConfigParser(*args, **kwargs) + self.airflow_defaults = ConfigParser(*args, **kwargs) if default_config is not None: - self.defaults.read_string(default_config) + self.airflow_defaults.read_string(default_config) self.is_validated = False @@ -249,9 +249,9 @@ def get(self, section, key, **kwargs): return option # ...then the default config - if self.defaults.has_option(section, key): + if self.airflow_defaults.has_option(section, key): return expand_env_var( - self.defaults.get(section, key, **kwargs)) + self.airflow_defaults.get(section, key, **kwargs)) else: log.warning( @@ -307,8 +307,8 @@ def remove_option(self, section, option, remove_default=True): if super(AirflowConfigParser, self).has_option(section, option): super(AirflowConfigParser, self).remove_option(section, option) - if self.defaults.has_option(section, option) and remove_default: - self.defaults.remove_option(section, option) + if self.airflow_defaults.has_option(section, option) and remove_default: + self.airflow_defaults.remove_option(section, option) def getsection(self, section): """ @@ -317,10 +317,11 @@ def getsection(self, section): :param section: section from the config :return: dict """ - if section not in self._sections and section not in self.defaults._sections: + if (section not in self._sections and + section not in self.airflow_defaults._sections): return None - _section = copy.deepcopy(self.defaults._sections[section]) + _section = copy.deepcopy(self.airflow_defaults._sections[section]) if section in self._sections: _section.update(copy.deepcopy(self._sections[section])) @@ -339,30 +340,35 @@ def getsection(self, section): _section[key] = val return _section - def as_dict(self, display_source=False, display_sensitive=False): + def as_dict( + self, display_source=False, display_sensitive=False, raw=False): """ Returns the current configuration as an OrderedDict of OrderedDicts. :param display_source: If False, the option value is returned. If True, a tuple of (option_value, source) is returned. Source is either - 'airflow.cfg' or 'default'. + 'airflow.cfg', 'default', 'env var', or 'cmd'. :type display_source: bool :param display_sensitive: If True, the values of options set by env vars and bash commands will be displayed. If False, those options are shown as '< hidden >' :type display_sensitive: bool + :param raw: Should the values be output as interpolated values, or the + "raw" form that can be fed back in to ConfigParser + :type raw: bool """ - cfg = copy.deepcopy(self.defaults._sections) - cfg.update(copy.deepcopy(self._sections)) - - # remove __name__ (affects Python 2 only) - for options in cfg.values(): - options.pop('__name__', None) - - # add source - if display_source: - for section in cfg: - for k, v in cfg[section].items(): - cfg[section][k] = (v, 'airflow config') + cfg = {} + configs = [ + ('default', self.airflow_defaults), + ('airflow.cfg', self), + ] + + for (source_name, config) in configs: + for section in config.sections(): + sect = cfg.setdefault(section, OrderedDict()) + for (k, val) in config.items(section=section, raw=raw): + if display_source: + val = (val, source_name) + sect[k] = val # add env vars and overwrite because they have priority for ev in [ev for ev in os.environ if ev.startswith('AIRFLOW__')]: @@ -370,16 +376,15 @@ def as_dict(self, display_source=False, display_sensitive=False): _, section, key = ev.split('__') opt = self._get_env_var_option(section, key) except ValueError: - opt = None - if opt: - if ( - not display_sensitive and - ev != 'AIRFLOW__CORE__UNIT_TEST_MODE'): - opt = '< hidden >' - if display_source: - opt = (opt, 'env var') - cfg.setdefault(section.lower(), OrderedDict()).update( - {key.lower(): opt}) + continue + if (not display_sensitive and ev != 'AIRFLOW__CORE__UNIT_TEST_MODE'): + opt = '< hidden >' + elif raw: + opt = opt.replace('%', '%%') + if display_source: + opt = (opt, 'env var') + cfg.setdefault(section.lower(), OrderedDict()).update( + {key.lower(): opt}) # add bash commands for (section, key) in self.as_command_stdout: @@ -388,8 +393,11 @@ def as_dict(self, display_source=False, display_sensitive=False): if not display_sensitive: opt = '< hidden >' if display_source: - opt = (opt, 'bash cmd') + opt = (opt, 'cmd') + elif raw: + opt = opt.replace('%', '%%') cfg.setdefault(section, OrderedDict()).update({key: opt}) + del cfg[section][key + '_cmd'] return cfg diff --git a/airflow/task/task_runner/base_task_runner.py b/airflow/task/task_runner/base_task_runner.py index e97cfb5f304b3..5974436d5a767 100644 --- a/airflow/task/task_runner/base_task_runner.py +++ b/airflow/task/task_runner/base_task_runner.py @@ -59,12 +59,6 @@ def __init__(self, local_task_job): # Always provide a copy of the configuration file settings cfg_path = tmp_configuration_copy() - # The following command should always work since the user doing chmod is the same - # as the one who just created the file. - subprocess.call( - ['chmod', '600', cfg_path], - close_fds=True - ) # Add sudo commands to change user if we need to. Needed to handle SubDagOperator # case using a SequentialExecutor. diff --git a/airflow/utils/configuration.py b/airflow/utils/configuration.py index 18a338c23f6ff..6a621d5fa9c18 100644 --- a/airflow/utils/configuration.py +++ b/airflow/utils/configuration.py @@ -26,16 +26,18 @@ from airflow import configuration as conf -def tmp_configuration_copy(): +def tmp_configuration_copy(chmod=0o600): """ Returns a path for a temporary file including a full copy of the configuration settings. :return: a path to a temporary file """ - cfg_dict = conf.as_dict(display_sensitive=True) + cfg_dict = conf.as_dict(display_sensitive=True, raw=True) temp_fd, cfg_path = mkstemp() with os.fdopen(temp_fd, 'w') as temp_file: + if chmod is not None: + os.fchmod(temp_fd, chmod) json.dump(cfg_dict, temp_file) return cfg_path diff --git a/docs/security.rst b/docs/security.rst index 3c328d829c03d..f8b747eb0f752 100644 --- a/docs/security.rst +++ b/docs/security.rst @@ -10,6 +10,13 @@ backends or creating your own. Be sure to checkout :doc:`api` for securing the API. +.. note:: + + Airflow uses the config parser of Python. This config parser interpolates + '%'-signs. Make sure escape any ``%`` signs in your config file (but not + environment variables) as ``%%``, otherwise Airflow might leak these + passwords on a config parser exception to a log. + Web Authentication ------------------ diff --git a/run_unit_tests.sh b/run_unit_tests.sh index 27e4d08af1416..124abff44a844 100755 --- a/run_unit_tests.sh +++ b/run_unit_tests.sh @@ -24,12 +24,6 @@ set -x export AIRFLOW_HOME=${AIRFLOW_HOME:=~/airflow} export AIRFLOW__CORE__UNIT_TEST_MODE=True -# configuration test -export AIRFLOW__TESTSECTION__TESTKEY=testvalue - -# use Airflow 2.0-style imports -export AIRFLOW_USE_NEW_IMPORTS=1 - # add test/contrib to PYTHONPATH DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" export PYTHONPATH=$PYTHONPATH:${DIR}/tests/test_utils @@ -91,4 +85,3 @@ nosetests $nose_args # To run individual tests: # nosetests tests.core:CoreTest.test_scheduler_job - diff --git a/scripts/ci/5-run-tests.sh b/scripts/ci/5-run-tests.sh new file mode 100755 index 0000000000000..8a74d824efa29 --- /dev/null +++ b/scripts/ci/5-run-tests.sh @@ -0,0 +1,95 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +set -o verbose + +if [ -z "$HADOOP_HOME" ]; then + echo "HADOOP_HOME not set - abort" >&2 + exit 1 +fi + +echo "Using ${HADOOP_DISTRO} distribution of Hadoop from ${HADOOP_HOME}" + +pwd + +echo "Using travis airflow.cfg" +DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +cp -f ${DIR}/airflow_travis.cfg ~/unittests.cfg + +ROOTDIR="$(dirname $(dirname $DIR))" +export AIRFLOW__CORE__DAGS_FOLDER="$ROOTDIR/tests/dags" + +# add test/contrib to PYTHONPATH +export PYTHONPATH=${PYTHONPATH:-$ROOTDIR/tests/test_utils} + +echo Backend: $AIRFLOW__CORE__SQL_ALCHEMY_CONN + +# environment +export AIRFLOW_HOME=${AIRFLOW_HOME:=~} +export AIRFLOW__CORE__UNIT_TEST_MODE=True + +# any argument received is overriding the default nose execution arguments: +nose_args=$@ + +# Generate the `airflow` executable if needed +which airflow > /dev/null || python setup.py develop + +# For impersonation tests on Travis, make airflow accessible to other users via the global PATH +# (which contains /usr/local/bin) +sudo ln -sf "${VIRTUAL_ENV}/bin/airflow" /usr/local/bin/ + +if [ -z "$KUBERNETES_VERSION" ]; then + echo "Initializing the DB" + yes | airflow initdb + yes | airflow resetdb +fi + +if [ -z "$nose_args" ]; then + nose_args="--with-coverage \ + --cover-erase \ + --cover-html \ + --cover-package=airflow \ + --cover-html-dir=airflow/www/static/coverage \ + --with-ignore-docstrings \ + --rednose \ + --with-timer \ + -v \ + --logging-level=DEBUG" +fi + +if [ -z "$KUBERNETES_VERSION" ]; then + # kdc init happens in setup_kdc.sh + kinit -kt ${KRB5_KTNAME} airflow +fi + +# For impersonation tests running on SQLite on Travis, make the database world readable so other +# users can update it +AIRFLOW_DB="$HOME/airflow.db" + +if [ -f "${AIRFLOW_DB}" ]; then + chmod a+rw "${AIRFLOW_DB}" + chmod g+rwx "${AIRFLOW_HOME}" +fi + +echo "Starting the unit tests with the following nose arguments: "$nose_args +nosetests $nose_args + +# To run individual tests: +# nosetests tests.core:CoreTest.test_scheduler_job diff --git a/tests/configuration.py b/tests/configuration.py index ac6f7b7db7746..08f57fd523d3b 100644 --- a/tests/configuration.py +++ b/tests/configuration.py @@ -37,38 +37,64 @@ class ConfTest(unittest.TestCase): - def setup(self): + @classmethod + def setUpClass(cls): + os.environ['AIRFLOW__TESTSECTION__TESTKEY'] = 'testvalue' + os.environ['AIRFLOW__TESTSECTION__TESTPERCENT'] = 'with%percent' configuration.load_test_config() + conf.set('core', 'percent', 'with%%inside') + + @classmethod + def tearDownClass(cls): + del os.environ['AIRFLOW__TESTSECTION__TESTKEY'] + del os.environ['AIRFLOW__TESTSECTION__TESTPERCENT'] def test_env_var_config(self): opt = conf.get('testsection', 'testkey') self.assertEqual(opt, 'testvalue') + opt = conf.get('testsection', 'testpercent') + self.assertEqual(opt, 'with%percent') + def test_conf_as_dict(self): cfg_dict = conf.as_dict() # test that configs are picked up self.assertEqual(cfg_dict['core']['unit_test_mode'], 'True') + self.assertEqual(cfg_dict['core']['percent'], 'with%inside') + # test env vars self.assertEqual(cfg_dict['testsection']['testkey'], '< hidden >') + def test_conf_as_dict_source(self): # test display_source cfg_dict = conf.as_dict(display_source=True) self.assertEqual( - cfg_dict['core']['load_examples'][1], 'airflow config') + cfg_dict['core']['load_examples'][1], 'airflow.cfg') self.assertEqual( cfg_dict['testsection']['testkey'], ('< hidden >', 'env var')) + def test_conf_as_dict_sensitive(self): # test display_sensitive cfg_dict = conf.as_dict(display_sensitive=True) self.assertEqual(cfg_dict['testsection']['testkey'], 'testvalue') + self.assertEqual(cfg_dict['testsection']['testpercent'], 'with%percent') # test display_source and display_sensitive cfg_dict = conf.as_dict(display_sensitive=True, display_source=True) self.assertEqual( cfg_dict['testsection']['testkey'], ('testvalue', 'env var')) + def test_conf_as_dict_raw(self): + # test display_sensitive + cfg_dict = conf.as_dict(raw=True, display_sensitive=True) + self.assertEqual(cfg_dict['testsection']['testkey'], 'testvalue') + + # Values with '%' in them should be escaped + self.assertEqual(cfg_dict['testsection']['testpercent'], 'with%%percent') + self.assertEqual(cfg_dict['core']['percent'], 'with%%inside') + def test_command_config(self): TEST_CONFIG = '''[test] key1 = hello @@ -104,6 +130,10 @@ def test_command_config(self): self.assertFalse(test_conf.has_option('test', 'key5')) self.assertTrue(test_conf.has_option('another', 'key6')) + cfg_dict = test_conf.as_dict(display_sensitive=True) + self.assertEqual('cmd_result', cfg_dict['test']['key2']) + self.assertNotIn('key2_cmd', cfg_dict['test']) + def test_remove_option(self): TEST_CONFIG = '''[test] key1 = hello