Skip to content

Commit

Permalink
[AIRFLOW-4448] Don't bake ENV and _cmd into tmp config for non-sudo (a…
Browse files Browse the repository at this point in the history
…pache#4050)

If we are running tasks via sudo then AIRFLOW__ config env vars won't be
visible anymore (without them showing up in `ps`) and we likely might
not have permission to run the _cmd's specified to find the passwords.

But if we are running as the same user then there is no need to "bake"
those options in to the temporary config file -- if the operator decided
they didn't want those values appearing in a config file on disk, then
lets do our best to respect that.
  • Loading branch information
ashb authored and wayne.morris committed Jul 29, 2019
1 parent 3190e0e commit 1bd0c1d
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 29 deletions.
61 changes: 36 additions & 25 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,9 +344,11 @@ def getsection(self, section):
return _section

def as_dict(
self, display_source=False, display_sensitive=False, raw=False):
self, display_source=False, display_sensitive=False, raw=False,
include_env=True, include_cmds=True):
"""
Returns the current configuration as an OrderedDict of OrderedDicts.
:param display_source: If False, the option value is returned. If True,
a tuple of (option_value, source) is returned. Source is either
'airflow.cfg', 'default', 'env var', or 'cmd'.
Expand All @@ -358,6 +360,13 @@ def as_dict(
:param raw: Should the values be output as interpolated values, or the
"raw" form that can be fed back in to ConfigParser
:type raw: bool
:param include_env: Should the value of configuration from AIRFLOW__
environment variables be included or not
:type include_env: bool
:param include_cmds: Should the result of calling any *_cmd config be
set (True, default), or should the _cmd options be left as the
command to run (False)
:type include_cmds: bool
"""
cfg = {}
configs = [
Expand All @@ -374,33 +383,35 @@ def as_dict(
sect[k] = val

# add env vars and overwrite because they have priority
for ev in [ev for ev in os.environ if ev.startswith('AIRFLOW__')]:
try:
_, section, key = ev.split('__')
opt = self._get_env_var_option(section, key)
except ValueError:
continue
if not display_sensitive and ev != 'AIRFLOW__CORE__UNIT_TEST_MODE':
opt = '< hidden >'
elif raw:
opt = opt.replace('%', '%%')
if display_source:
opt = (opt, 'env var')
cfg.setdefault(section.lower(), OrderedDict()).update(
{key.lower(): opt})

# add bash commands
for (section, key) in self.as_command_stdout:
opt = self._get_cmd_option(section, key)
if opt:
if not display_sensitive:
if include_env:
for ev in [ev for ev in os.environ if ev.startswith('AIRFLOW__')]:
try:
_, section, key = ev.split('__')
opt = self._get_env_var_option(section, key)
except ValueError:
continue
if not display_sensitive and ev != 'AIRFLOW__CORE__UNIT_TEST_MODE':
opt = '< hidden >'
if display_source:
opt = (opt, 'cmd')
elif raw:
opt = opt.replace('%', '%%')
cfg.setdefault(section, OrderedDict()).update({key: opt})
del cfg[section][key + '_cmd']
if display_source:
opt = (opt, 'env var')
cfg.setdefault(section.lower(), OrderedDict()).update(
{key.lower(): opt})

# add bash commands
if include_cmds:
for (section, key) in self.as_command_stdout:
opt = self._get_cmd_option(section, key)
if opt:
if not display_sensitive:
opt = '< hidden >'
if display_source:
opt = (opt, 'cmd')
elif raw:
opt = opt.replace('%', '%%')
cfg.setdefault(section, OrderedDict()).update({key: opt})
del cfg[section][key + '_cmd']

return cfg

Expand Down
20 changes: 17 additions & 3 deletions airflow/task/task_runner/base_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,18 @@ def __init__(self, local_task_job):
except conf.AirflowConfigException:
self.run_as_user = None

# Always provide a copy of the configuration file settings
cfg_path = tmp_configuration_copy()

# Add sudo commands to change user if we need to. Needed to handle SubDagOperator
# case using a SequentialExecutor.
self.log.debug("Planning to run as the %s user", self.run_as_user)
if self.run_as_user and (self.run_as_user != getpass.getuser()):
# We want to include any environment variables now, as we won't
# want to have to specify them in the sudo call - they would show
# up in `ps` that way! And run commands now, as the other user
# might not be able to run the cmds to get credentials
cfg_path = tmp_configuration_copy(chmod=0o600,
include_env=True,
include_cmds=True)

# Give ownership of file to user; only they can read and write
subprocess.call(
['sudo', 'chown', self.run_as_user, cfg_path],
Expand All @@ -76,6 +81,15 @@ def __init__(self, local_task_job):
if pythonpath_value:
popen_prepend.append('{}={}'.format(PYTHONPATH_VAR, pythonpath_value))

else:
# Always provide a copy of the configuration file settings. Since
# we are running as the same user, and can pass through environment
# variables then we don't need to include those in the config copy
# - the runner can read/execute those values as it needs
cfg_path = tmp_configuration_copy(chmod=0o600,
include_env=False,
include_cmds=False)

self._cfg_path = cfg_path
self._command = popen_prepend + self._task_instance.command_as_list(
raw=True,
Expand Down
3 changes: 2 additions & 1 deletion airflow/utils/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from airflow import configuration as conf


def tmp_configuration_copy(chmod=0o600):
def tmp_configuration_copy(chmod=0o600, include_env=True, include_cmds=True):
"""
Returns a path for a temporary file including a full copy of the configuration
settings.
Expand All @@ -34,6 +34,7 @@ def tmp_configuration_copy(chmod=0o600):
temp_fd, cfg_path = mkstemp()

with os.fdopen(temp_fd, 'w') as temp_file:
# Set the permissions before we write anything to it.
if chmod is not None:
os.fchmod(temp_fd, chmod)
json.dump(cfg_dict, temp_file)
Expand Down
14 changes: 14 additions & 0 deletions tests/test_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ def test_conf_as_dict_raw(self):
self.assertEqual(cfg_dict['testsection']['testpercent'], 'with%%percent')
self.assertEqual(cfg_dict['core']['percent'], 'with%%inside')

def test_conf_as_dict_exclude_env(self):
# test display_sensitive
cfg_dict = conf.as_dict(include_env=False, display_sensitive=True)

# Since testsection is only created from env vars, it shouldn't be
# present at all if we don't ask for env vars to be included.
self.assertNotIn('testsection', cfg_dict)

def test_command_precedence(self):
TEST_CONFIG = '''[test]
key1 = hello
Expand Down Expand Up @@ -179,6 +187,12 @@ def test_command_precedence(self):
self.assertEqual('cmd_result', cfg_dict['test']['key2'])
self.assertNotIn('key2_cmd', cfg_dict['test'])

# If we exclude _cmds then we should still see the commands to run, not
# their values
cfg_dict = test_conf.as_dict(include_cmds=False, display_sensitive=True)
self.assertNotIn('key4', cfg_dict['test'])
self.assertEqual('printf key4_result', cfg_dict['test']['key4_cmd'])

def test_getboolean(self):
"""Test AirflowConfigParser.getboolean"""
TEST_CONFIG = """
Expand Down

0 comments on commit 1bd0c1d

Please sign in to comment.