Skip to content

Commit

Permalink
[AIRFLOW-3178] Don't bake ENV and _cmd into tmp config for non-sudo
Browse files Browse the repository at this point in the history
If we are running tasks via sudo then AIRFLOW__ config env vars won't be
visible anymore (without them showing up in `ps`) and we likely might
not have permission to run the _cmd's specified to find the passwords.

But if we are running as the same user then there is no need to "bake"
those options in to the temporary config file -- if the operator decided
they didn't want those values appearing in a config file on disk, then
lets do our best to respect that.

This also removes a possible race condition that would make temporary
config file be readable by more than the airflow or run-as user
The exact behaviour would depend on the umask we run under, and the
primary group of our user, likely this would mean the file was readably
by members of the airflow group (which in most cases would be just the
airflow user). To remove any such possibility set the umask before we
write anything to the file.
  • Loading branch information
ashb committed Oct 24, 2018
1 parent f43600d commit 0df13ac
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 30 deletions.
61 changes: 36 additions & 25 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,9 +342,11 @@ def getsection(self, section):
return _section

def as_dict(
self, display_source=False, display_sensitive=False, raw=False):
self, display_source=False, display_sensitive=False, raw=False,
include_env=True, include_cmds=True):
"""
Returns the current configuration as an OrderedDict of OrderedDicts.
:param display_source: If False, the option value is returned. If True,
a tuple of (option_value, source) is returned. Source is either
'airflow.cfg', 'default', 'env var', or 'cmd'.
Expand All @@ -356,6 +358,13 @@ def as_dict(
:param raw: Should the values be output as interpolated values, or the
"raw" form that can be fed back in to ConfigParser
:type raw: bool
:param include_env: Should the value of configuration from AIRFLOW__
environment variables be included or not
:type include_env: bool
:param include_cmds: Should the result of calling any *_cmd config be
set (True, default), or should the _cmd options be left as the
command to run (False)
:type include_cmds: bool
"""
cfg = {}
configs = [
Expand All @@ -372,33 +381,35 @@ def as_dict(
sect[k] = val

# add env vars and overwrite because they have priority
for ev in [ev for ev in os.environ if ev.startswith('AIRFLOW__')]:
try:
_, section, key = ev.split('__')
opt = self._get_env_var_option(section, key)
except ValueError:
continue
if (not display_sensitive and ev != 'AIRFLOW__CORE__UNIT_TEST_MODE'):
opt = '< hidden >'
elif raw:
opt = opt.replace('%', '%%')
if display_source:
opt = (opt, 'env var')
cfg.setdefault(section.lower(), OrderedDict()).update(
{key.lower(): opt})

# add bash commands
for (section, key) in self.as_command_stdout:
opt = self._get_cmd_option(section, key)
if opt:
if not display_sensitive:
if include_env:
for ev in [ev for ev in os.environ if ev.startswith('AIRFLOW__')]:
try:
_, section, key = ev.split('__')
opt = self._get_env_var_option(section, key)
except ValueError:
continue
if (not display_sensitive and ev != 'AIRFLOW__CORE__UNIT_TEST_MODE'):
opt = '< hidden >'
if display_source:
opt = (opt, 'cmd')
elif raw:
opt = opt.replace('%', '%%')
cfg.setdefault(section, OrderedDict()).update({key: opt})
del cfg[section][key + '_cmd']
if display_source:
opt = (opt, 'env var')
cfg.setdefault(section.lower(), OrderedDict()).update(
{key.lower(): opt})

# add bash commands
if include_cmds:
for (section, key) in self.as_command_stdout:
opt = self._get_cmd_option(section, key)
if opt:
if not display_sensitive:
opt = '< hidden >'
if display_source:
opt = (opt, 'cmd')
elif raw:
opt = opt.replace('%', '%%')
cfg.setdefault(section, OrderedDict()).update({key: opt})
del cfg[section][key + '_cmd']

return cfg

Expand Down
25 changes: 21 additions & 4 deletions airflow/task/task_runner/base_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,18 @@ def __init__(self, local_task_job):
except conf.AirflowConfigException:
self.run_as_user = None

# Always provide a copy of the configuration file settings
cfg_path = tmp_configuration_copy()

# Add sudo commands to change user if we need to. Needed to handle SubDagOperator
# case using a SequentialExecutor.
self.log.debug("Planning to run as the %s user", self.run_as_user)
if self.run_as_user and (self.run_as_user != getpass.getuser()):
# We want to include any environment variables now, as we won't
# want to have to specify them in the sudo call - they would show
# up in `ps` that way! And run commands now, as the other user
# might not be able to run the cmds to get credentials
cfg_path = tmp_configuration_copy(chmod=0o600,
include_env=True,
include_cmds=True)

# Give ownership of file to user; only they can read and write
subprocess.call(
['sudo', 'chown', self.run_as_user, cfg_path],
Expand All @@ -78,6 +83,15 @@ def __init__(self, local_task_job):
if pythonpath_value:
popen_prepend.append('{}={}'.format(PYTHONPATH_VAR, pythonpath_value))

else:
# Always provide a copy of the configuration file settings. Since
# we are running as the same user, and can pass through environment
# variables then we don't need to include those in the config copy
# - the runner can read/execute those values as it needs
cfg_path = tmp_configuration_copy(chmod=0o600,
include_env=False,
include_cmds=False)

self._cfg_path = cfg_path
self._command = popen_prepend + self._task_instance.command_as_list(
raw=True,
Expand Down Expand Up @@ -162,4 +176,7 @@ def on_finish(self):
A callback that should be called when this is done running.
"""
if self._cfg_path and os.path.isfile(self._cfg_path):
subprocess.call(['sudo', 'rm', self._cfg_path], close_fds=True)
if self.run_as_user:
subprocess.call(['sudo', 'rm', self._cfg_path], close_fds=True)
else:
os.unlink(self._cfg_path)
3 changes: 2 additions & 1 deletion airflow/utils/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from airflow import configuration as conf


def tmp_configuration_copy(chmod=0o600):
def tmp_configuration_copy(chmod=0o600, include_env=True, include_cmds=True):
"""
Returns a path for a temporary file including a full copy of the configuration
settings.
Expand All @@ -36,6 +36,7 @@ def tmp_configuration_copy(chmod=0o600):
temp_fd, cfg_path = mkstemp()

with os.fdopen(temp_fd, 'w') as temp_file:
# Set the permissions before we write anything to it.
if chmod is not None:
os.fchmod(temp_fd, chmod)
json.dump(cfg_dict, temp_file)
Expand Down
14 changes: 14 additions & 0 deletions tests/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ def test_conf_as_dict_raw(self):
self.assertEqual(cfg_dict['testsection']['testpercent'], 'with%%percent')
self.assertEqual(cfg_dict['core']['percent'], 'with%%inside')

def test_conf_as_dict_exclude_env(self):
# test display_sensitive
cfg_dict = conf.as_dict(include_env=False, display_sensitive=True)

# Sinec testsection is only created from env vars, it shouldn't be
# present at all if we don't ask for env vars to be included.
self.assertNotIn('testsection', cfg_dict)

def test_command_config(self):
TEST_CONFIG = '''[test]
key1 = hello
Expand Down Expand Up @@ -134,6 +142,12 @@ def test_command_config(self):
self.assertEqual('cmd_result', cfg_dict['test']['key2'])
self.assertNotIn('key2_cmd', cfg_dict['test'])

# If we exclude _cmds then we should still see the commands to run, not
# their values
cfg_dict = test_conf.as_dict(include_cmds=False, display_sensitive=True)
self.assertNotIn('key4', cfg_dict['test'])
self.assertEqual('printf key4_result', cfg_dict['test']['key4_cmd'])

def test_remove_option(self):
TEST_CONFIG = '''[test]
key1 = hello
Expand Down

0 comments on commit 0df13ac

Please sign in to comment.