From 1bd0c1d8a0c95f5afb5b3913c7c0e0606b3547d8 Mon Sep 17 00:00:00 2001 From: Ash Berlin-Taylor Date: Tue, 7 May 2019 21:17:20 +0100 Subject: [PATCH] [AIRFLOW-4448] Don't bake ENV and _cmd into tmp config for non-sudo (#4050) If we are running tasks via sudo then AIRFLOW__ config env vars won't be visible anymore (without them showing up in `ps`) and we likely might not have permission to run the _cmd's specified to find the passwords. But if we are running as the same user then there is no need to "bake" those options in to the temporary config file -- if the operator decided they didn't want those values appearing in a config file on disk, then lets do our best to respect that. --- airflow/configuration.py | 61 ++++++++++++-------- airflow/task/task_runner/base_task_runner.py | 20 ++++++- airflow/utils/configuration.py | 3 +- tests/test_configuration.py | 14 +++++ 4 files changed, 69 insertions(+), 29 deletions(-) diff --git a/airflow/configuration.py b/airflow/configuration.py index 893c9785f50d1..84a0a9db62161 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -344,9 +344,11 @@ def getsection(self, section): return _section def as_dict( - self, display_source=False, display_sensitive=False, raw=False): + self, display_source=False, display_sensitive=False, raw=False, + include_env=True, include_cmds=True): """ Returns the current configuration as an OrderedDict of OrderedDicts. + :param display_source: If False, the option value is returned. If True, a tuple of (option_value, source) is returned. Source is either 'airflow.cfg', 'default', 'env var', or 'cmd'. @@ -358,6 +360,13 @@ def as_dict( :param raw: Should the values be output as interpolated values, or the "raw" form that can be fed back in to ConfigParser :type raw: bool + :param include_env: Should the value of configuration from AIRFLOW__ + environment variables be included or not + :type include_env: bool + :param include_cmds: Should the result of calling any *_cmd config be + set (True, default), or should the _cmd options be left as the + command to run (False) + :type include_cmds: bool """ cfg = {} configs = [ @@ -374,33 +383,35 @@ def as_dict( sect[k] = val # add env vars and overwrite because they have priority - for ev in [ev for ev in os.environ if ev.startswith('AIRFLOW__')]: - try: - _, section, key = ev.split('__') - opt = self._get_env_var_option(section, key) - except ValueError: - continue - if not display_sensitive and ev != 'AIRFLOW__CORE__UNIT_TEST_MODE': - opt = '< hidden >' - elif raw: - opt = opt.replace('%', '%%') - if display_source: - opt = (opt, 'env var') - cfg.setdefault(section.lower(), OrderedDict()).update( - {key.lower(): opt}) - - # add bash commands - for (section, key) in self.as_command_stdout: - opt = self._get_cmd_option(section, key) - if opt: - if not display_sensitive: + if include_env: + for ev in [ev for ev in os.environ if ev.startswith('AIRFLOW__')]: + try: + _, section, key = ev.split('__') + opt = self._get_env_var_option(section, key) + except ValueError: + continue + if not display_sensitive and ev != 'AIRFLOW__CORE__UNIT_TEST_MODE': opt = '< hidden >' - if display_source: - opt = (opt, 'cmd') elif raw: opt = opt.replace('%', '%%') - cfg.setdefault(section, OrderedDict()).update({key: opt}) - del cfg[section][key + '_cmd'] + if display_source: + opt = (opt, 'env var') + cfg.setdefault(section.lower(), OrderedDict()).update( + {key.lower(): opt}) + + # add bash commands + if include_cmds: + for (section, key) in self.as_command_stdout: + opt = self._get_cmd_option(section, key) + if opt: + if not display_sensitive: + opt = '< hidden >' + if display_source: + opt = (opt, 'cmd') + elif raw: + opt = opt.replace('%', '%%') + cfg.setdefault(section, OrderedDict()).update({key: opt}) + del cfg[section][key + '_cmd'] return cfg diff --git a/airflow/task/task_runner/base_task_runner.py b/airflow/task/task_runner/base_task_runner.py index 390fbad12d6cd..492a1f65fd940 100644 --- a/airflow/task/task_runner/base_task_runner.py +++ b/airflow/task/task_runner/base_task_runner.py @@ -56,13 +56,18 @@ def __init__(self, local_task_job): except conf.AirflowConfigException: self.run_as_user = None - # Always provide a copy of the configuration file settings - cfg_path = tmp_configuration_copy() - # Add sudo commands to change user if we need to. Needed to handle SubDagOperator # case using a SequentialExecutor. self.log.debug("Planning to run as the %s user", self.run_as_user) if self.run_as_user and (self.run_as_user != getpass.getuser()): + # We want to include any environment variables now, as we won't + # want to have to specify them in the sudo call - they would show + # up in `ps` that way! And run commands now, as the other user + # might not be able to run the cmds to get credentials + cfg_path = tmp_configuration_copy(chmod=0o600, + include_env=True, + include_cmds=True) + # Give ownership of file to user; only they can read and write subprocess.call( ['sudo', 'chown', self.run_as_user, cfg_path], @@ -76,6 +81,15 @@ def __init__(self, local_task_job): if pythonpath_value: popen_prepend.append('{}={}'.format(PYTHONPATH_VAR, pythonpath_value)) + else: + # Always provide a copy of the configuration file settings. Since + # we are running as the same user, and can pass through environment + # variables then we don't need to include those in the config copy + # - the runner can read/execute those values as it needs + cfg_path = tmp_configuration_copy(chmod=0o600, + include_env=False, + include_cmds=False) + self._cfg_path = cfg_path self._command = popen_prepend + self._task_instance.command_as_list( raw=True, diff --git a/airflow/utils/configuration.py b/airflow/utils/configuration.py index 0bbf7ded5e08d..3ad6bc1c6dd34 100644 --- a/airflow/utils/configuration.py +++ b/airflow/utils/configuration.py @@ -24,7 +24,7 @@ from airflow import configuration as conf -def tmp_configuration_copy(chmod=0o600): +def tmp_configuration_copy(chmod=0o600, include_env=True, include_cmds=True): """ Returns a path for a temporary file including a full copy of the configuration settings. @@ -34,6 +34,7 @@ def tmp_configuration_copy(chmod=0o600): temp_fd, cfg_path = mkstemp() with os.fdopen(temp_fd, 'w') as temp_file: + # Set the permissions before we write anything to it. if chmod is not None: os.fchmod(temp_fd, chmod) json.dump(cfg_dict, temp_file) diff --git a/tests/test_configuration.py b/tests/test_configuration.py index 6a4ee90815ebf..9c24e0cd7b2ee 100644 --- a/tests/test_configuration.py +++ b/tests/test_configuration.py @@ -133,6 +133,14 @@ def test_conf_as_dict_raw(self): self.assertEqual(cfg_dict['testsection']['testpercent'], 'with%%percent') self.assertEqual(cfg_dict['core']['percent'], 'with%%inside') + def test_conf_as_dict_exclude_env(self): + # test display_sensitive + cfg_dict = conf.as_dict(include_env=False, display_sensitive=True) + + # Since testsection is only created from env vars, it shouldn't be + # present at all if we don't ask for env vars to be included. + self.assertNotIn('testsection', cfg_dict) + def test_command_precedence(self): TEST_CONFIG = '''[test] key1 = hello @@ -179,6 +187,12 @@ def test_command_precedence(self): self.assertEqual('cmd_result', cfg_dict['test']['key2']) self.assertNotIn('key2_cmd', cfg_dict['test']) + # If we exclude _cmds then we should still see the commands to run, not + # their values + cfg_dict = test_conf.as_dict(include_cmds=False, display_sensitive=True) + self.assertNotIn('key4', cfg_dict['test']) + self.assertEqual('printf key4_result', cfg_dict['test']['key4_cmd']) + def test_getboolean(self): """Test AirflowConfigParser.getboolean""" TEST_CONFIG = """