Skip to content

Commit

Permalink
Handle percents signs in configs for airflow run (apache#4029)
Browse files Browse the repository at this point in the history
* [AIRFLOW-3178] Don't mask defaults() function from ConfigParser

ConfigParser (the base class for AirflowConfigParser) expects defaults()
to be a function - so when we re-assign it to be a property some of the
methods from ConfigParser no longer work.

* [AIRFLOW-3178] Correctly escape percent signs when creating temp config

Otherwise we have a problem when we come to use those values.

* [AIRFLOW-3178] Use os.chmod instead of shelling out

There's no need to run another process for a built in Python function.

This also removes a possible race condition that would make temporary
config file be readable by more than the airflow or run-as user
The exact behaviour would depend on the umask we run under, and the
primary group of our user, likely this would mean the file was readably
by members of the airflow group (which in most cases would be just the
airflow user). To remove any such possibility we chmod the file
before we write to it
  • Loading branch information
ashb authored and Chris Fei committed Jan 23, 2019
1 parent 97a13a7 commit 78959ca
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 50 deletions.
74 changes: 41 additions & 33 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,9 @@ class AirflowConfigParser(ConfigParser):
def __init__(self, default_config=None, *args, **kwargs):
super(AirflowConfigParser, self).__init__(*args, **kwargs)

self.defaults = ConfigParser(*args, **kwargs)
self.airflow_defaults = ConfigParser(*args, **kwargs)
if default_config is not None:
self.defaults.read_string(default_config)
self.airflow_defaults.read_string(default_config)

self.is_validated = False

Expand Down Expand Up @@ -249,9 +249,9 @@ def get(self, section, key, **kwargs):
return option

# ...then the default config
if self.defaults.has_option(section, key):
if self.airflow_defaults.has_option(section, key):
return expand_env_var(
self.defaults.get(section, key, **kwargs))
self.airflow_defaults.get(section, key, **kwargs))

else:
log.warning(
Expand Down Expand Up @@ -307,8 +307,8 @@ def remove_option(self, section, option, remove_default=True):
if super(AirflowConfigParser, self).has_option(section, option):
super(AirflowConfigParser, self).remove_option(section, option)

if self.defaults.has_option(section, option) and remove_default:
self.defaults.remove_option(section, option)
if self.airflow_defaults.has_option(section, option) and remove_default:
self.airflow_defaults.remove_option(section, option)

def getsection(self, section):
"""
Expand All @@ -317,10 +317,11 @@ def getsection(self, section):
:param section: section from the config
:return: dict
"""
if section not in self._sections and section not in self.defaults._sections:
if (section not in self._sections and
section not in self.airflow_defaults._sections):
return None

_section = copy.deepcopy(self.defaults._sections[section])
_section = copy.deepcopy(self.airflow_defaults._sections[section])

if section in self._sections:
_section.update(copy.deepcopy(self._sections[section]))
Expand All @@ -339,47 +340,51 @@ def getsection(self, section):
_section[key] = val
return _section

def as_dict(self, display_source=False, display_sensitive=False):
def as_dict(
self, display_source=False, display_sensitive=False, raw=False):
"""
Returns the current configuration as an OrderedDict of OrderedDicts.
:param display_source: If False, the option value is returned. If True,
a tuple of (option_value, source) is returned. Source is either
'airflow.cfg' or 'default'.
'airflow.cfg', 'default', 'env var', or 'cmd'.
:type display_source: bool
:param display_sensitive: If True, the values of options set by env
vars and bash commands will be displayed. If False, those options
are shown as '< hidden >'
:type display_sensitive: bool
:param raw: Should the values be output as interpolated values, or the
"raw" form that can be fed back in to ConfigParser
:type raw: bool
"""
cfg = copy.deepcopy(self.defaults._sections)
cfg.update(copy.deepcopy(self._sections))

# remove __name__ (affects Python 2 only)
for options in cfg.values():
options.pop('__name__', None)

# add source
if display_source:
for section in cfg:
for k, v in cfg[section].items():
cfg[section][k] = (v, 'airflow config')
cfg = {}
configs = [
('default', self.airflow_defaults),
('airflow.cfg', self),
]

for (source_name, config) in configs:
for section in config.sections():
sect = cfg.setdefault(section, OrderedDict())
for (k, val) in config.items(section=section, raw=raw):
if display_source:
val = (val, source_name)
sect[k] = val

# add env vars and overwrite because they have priority
for ev in [ev for ev in os.environ if ev.startswith('AIRFLOW__')]:
try:
_, section, key = ev.split('__')
opt = self._get_env_var_option(section, key)
except ValueError:
opt = None
if opt:
if (
not display_sensitive and
ev != 'AIRFLOW__CORE__UNIT_TEST_MODE'):
opt = '< hidden >'
if display_source:
opt = (opt, 'env var')
cfg.setdefault(section.lower(), OrderedDict()).update(
{key.lower(): opt})
continue
if (not display_sensitive and ev != 'AIRFLOW__CORE__UNIT_TEST_MODE'):
opt = '< hidden >'
elif raw:
opt = opt.replace('%', '%%')
if display_source:
opt = (opt, 'env var')
cfg.setdefault(section.lower(), OrderedDict()).update(
{key.lower(): opt})

# add bash commands
for (section, key) in self.as_command_stdout:
Expand All @@ -388,8 +393,11 @@ def as_dict(self, display_source=False, display_sensitive=False):
if not display_sensitive:
opt = '< hidden >'
if display_source:
opt = (opt, 'bash cmd')
opt = (opt, 'cmd')
elif raw:
opt = opt.replace('%', '%%')
cfg.setdefault(section, OrderedDict()).update({key: opt})
del cfg[section][key + '_cmd']

return cfg

Expand Down
6 changes: 0 additions & 6 deletions airflow/task/task_runner/base_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,6 @@ def __init__(self, local_task_job):

# Always provide a copy of the configuration file settings
cfg_path = tmp_configuration_copy()
# The following command should always work since the user doing chmod is the same
# as the one who just created the file.
subprocess.call(
['chmod', '600', cfg_path],
close_fds=True
)

# Add sudo commands to change user if we need to. Needed to handle SubDagOperator
# case using a SequentialExecutor.
Expand Down
6 changes: 4 additions & 2 deletions airflow/utils/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,18 @@
from airflow import configuration as conf


def tmp_configuration_copy():
def tmp_configuration_copy(chmod=0o600):
"""
Returns a path for a temporary file including a full copy of the configuration
settings.
:return: a path to a temporary file
"""
cfg_dict = conf.as_dict(display_sensitive=True)
cfg_dict = conf.as_dict(display_sensitive=True, raw=True)
temp_fd, cfg_path = mkstemp()

with os.fdopen(temp_fd, 'w') as temp_file:
if chmod is not None:
os.fchmod(temp_fd, chmod)
json.dump(cfg_dict, temp_file)

return cfg_path
7 changes: 7 additions & 0 deletions docs/security.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ backends or creating your own.

Be sure to checkout :doc:`api` for securing the API.

.. note::

Airflow uses the config parser of Python. This config parser interpolates
'%'-signs. Make sure escape any ``%`` signs in your config file (but not
environment variables) as ``%%``, otherwise Airflow might leak these
passwords on a config parser exception to a log.

Web Authentication
------------------

Expand Down
7 changes: 0 additions & 7 deletions run_unit_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,6 @@ set -x
export AIRFLOW_HOME=${AIRFLOW_HOME:=~/airflow}
export AIRFLOW__CORE__UNIT_TEST_MODE=True

# configuration test
export AIRFLOW__TESTSECTION__TESTKEY=testvalue

# use Airflow 2.0-style imports
export AIRFLOW_USE_NEW_IMPORTS=1

# add test/contrib to PYTHONPATH
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
export PYTHONPATH=$PYTHONPATH:${DIR}/tests/test_utils
Expand Down Expand Up @@ -91,4 +85,3 @@ nosetests $nose_args

# To run individual tests:
# nosetests tests.core:CoreTest.test_scheduler_job

95 changes: 95 additions & 0 deletions scripts/ci/5-run-tests.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

set -o verbose

if [ -z "$HADOOP_HOME" ]; then
echo "HADOOP_HOME not set - abort" >&2
exit 1
fi

echo "Using ${HADOOP_DISTRO} distribution of Hadoop from ${HADOOP_HOME}"

pwd

echo "Using travis airflow.cfg"
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
cp -f ${DIR}/airflow_travis.cfg ~/unittests.cfg

ROOTDIR="$(dirname $(dirname $DIR))"
export AIRFLOW__CORE__DAGS_FOLDER="$ROOTDIR/tests/dags"

# add test/contrib to PYTHONPATH
export PYTHONPATH=${PYTHONPATH:-$ROOTDIR/tests/test_utils}

echo Backend: $AIRFLOW__CORE__SQL_ALCHEMY_CONN

# environment
export AIRFLOW_HOME=${AIRFLOW_HOME:=~}
export AIRFLOW__CORE__UNIT_TEST_MODE=True

# any argument received is overriding the default nose execution arguments:
nose_args=$@

# Generate the `airflow` executable if needed
which airflow > /dev/null || python setup.py develop

# For impersonation tests on Travis, make airflow accessible to other users via the global PATH
# (which contains /usr/local/bin)
sudo ln -sf "${VIRTUAL_ENV}/bin/airflow" /usr/local/bin/

if [ -z "$KUBERNETES_VERSION" ]; then
echo "Initializing the DB"
yes | airflow initdb
yes | airflow resetdb
fi

if [ -z "$nose_args" ]; then
nose_args="--with-coverage \
--cover-erase \
--cover-html \
--cover-package=airflow \
--cover-html-dir=airflow/www/static/coverage \
--with-ignore-docstrings \
--rednose \
--with-timer \
-v \
--logging-level=DEBUG"
fi

if [ -z "$KUBERNETES_VERSION" ]; then
# kdc init happens in setup_kdc.sh
kinit -kt ${KRB5_KTNAME} airflow
fi

# For impersonation tests running on SQLite on Travis, make the database world readable so other
# users can update it
AIRFLOW_DB="$HOME/airflow.db"

if [ -f "${AIRFLOW_DB}" ]; then
chmod a+rw "${AIRFLOW_DB}"
chmod g+rwx "${AIRFLOW_HOME}"
fi

echo "Starting the unit tests with the following nose arguments: "$nose_args
nosetests $nose_args

# To run individual tests:
# nosetests tests.core:CoreTest.test_scheduler_job
34 changes: 32 additions & 2 deletions tests/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,38 +37,64 @@

class ConfTest(unittest.TestCase):

def setup(self):
@classmethod
def setUpClass(cls):
os.environ['AIRFLOW__TESTSECTION__TESTKEY'] = 'testvalue'
os.environ['AIRFLOW__TESTSECTION__TESTPERCENT'] = 'with%percent'
configuration.load_test_config()
conf.set('core', 'percent', 'with%%inside')

@classmethod
def tearDownClass(cls):
del os.environ['AIRFLOW__TESTSECTION__TESTKEY']
del os.environ['AIRFLOW__TESTSECTION__TESTPERCENT']

def test_env_var_config(self):
opt = conf.get('testsection', 'testkey')
self.assertEqual(opt, 'testvalue')

opt = conf.get('testsection', 'testpercent')
self.assertEqual(opt, 'with%percent')

def test_conf_as_dict(self):
cfg_dict = conf.as_dict()

# test that configs are picked up
self.assertEqual(cfg_dict['core']['unit_test_mode'], 'True')

self.assertEqual(cfg_dict['core']['percent'], 'with%inside')

# test env vars
self.assertEqual(cfg_dict['testsection']['testkey'], '< hidden >')

def test_conf_as_dict_source(self):
# test display_source
cfg_dict = conf.as_dict(display_source=True)
self.assertEqual(
cfg_dict['core']['load_examples'][1], 'airflow config')
cfg_dict['core']['load_examples'][1], 'airflow.cfg')
self.assertEqual(
cfg_dict['testsection']['testkey'], ('< hidden >', 'env var'))

def test_conf_as_dict_sensitive(self):
# test display_sensitive
cfg_dict = conf.as_dict(display_sensitive=True)
self.assertEqual(cfg_dict['testsection']['testkey'], 'testvalue')
self.assertEqual(cfg_dict['testsection']['testpercent'], 'with%percent')

# test display_source and display_sensitive
cfg_dict = conf.as_dict(display_sensitive=True, display_source=True)
self.assertEqual(
cfg_dict['testsection']['testkey'], ('testvalue', 'env var'))

def test_conf_as_dict_raw(self):
# test display_sensitive
cfg_dict = conf.as_dict(raw=True, display_sensitive=True)
self.assertEqual(cfg_dict['testsection']['testkey'], 'testvalue')

# Values with '%' in them should be escaped
self.assertEqual(cfg_dict['testsection']['testpercent'], 'with%%percent')
self.assertEqual(cfg_dict['core']['percent'], 'with%%inside')

def test_command_config(self):
TEST_CONFIG = '''[test]
key1 = hello
Expand Down Expand Up @@ -104,6 +130,10 @@ def test_command_config(self):
self.assertFalse(test_conf.has_option('test', 'key5'))
self.assertTrue(test_conf.has_option('another', 'key6'))

cfg_dict = test_conf.as_dict(display_sensitive=True)
self.assertEqual('cmd_result', cfg_dict['test']['key2'])
self.assertNotIn('key2_cmd', cfg_dict['test'])

def test_remove_option(self):
TEST_CONFIG = '''[test]
key1 = hello
Expand Down

0 comments on commit 78959ca

Please sign in to comment.