diff --git a/config/kube_config.py b/config/kube_config.py index 61a261f..584b8a4 100644 --- a/config/kube_config.py +++ b/config/kube_config.py @@ -60,7 +60,7 @@ def _cleanup_temp_files(): _temp_files = {} -def _create_temp_file_with_content(content): +def _create_temp_file_with_content(content, temp_file_path=None): if len(_temp_files) == 0: atexit.register(_cleanup_temp_files) # Because we may change context several times, try to remember files we @@ -68,7 +68,9 @@ def _create_temp_file_with_content(content): content_key = str(content) if content_key in _temp_files: return _temp_files[content_key] - _, name = tempfile.mkstemp() + if temp_file_path and not os.path.isdir(temp_file_path): + os.makedirs(name=temp_file_path) + _, name = tempfile.mkstemp(dir=temp_file_path) _temp_files[content_key] = name with open(name, 'wb') as fd: fd.write(content.encode() if isinstance(content, str) else content) @@ -91,12 +93,14 @@ class FileOrData(object): result in base64 encode of the file content after read.""" def __init__(self, obj, file_key_name, data_key_name=None, - file_base_path="", base64_file_content=True): + file_base_path="", base64_file_content=True, + temp_file_path=None): if not data_key_name: data_key_name = file_key_name + "-data" self._file = None self._data = None self._base64_file_content = base64_file_content + self._temp_file_path = temp_file_path if not obj: return if data_key_name in obj: @@ -116,9 +120,10 @@ def as_file(self): else: content = self._data self._file = _create_temp_file_with_content( - base64.standard_b64decode(content)) + base64.standard_b64decode(content), self._temp_file_path) else: - self._file = _create_temp_file_with_content(self._data) + self._file = _create_temp_file_with_content( + self._data, self._temp_file_path) if self._file and not os.path.isfile(self._file): raise ConfigException("File does not exist: %s" % self._file) return self._file @@ -182,7 +187,8 @@ class KubeConfigLoader(object): def __init__(self, config_dict, active_context=None, get_google_credentials=None, config_base_path="", - config_persister=None): + config_persister=None, + temp_file_path=None): if config_dict is None: raise ConfigException( @@ -199,6 +205,7 @@ def __init__(self, config_dict, active_context=None, self.set_active_context(active_context) self._config_base_path = config_base_path self._config_persister = config_persister + self._temp_file_path = temp_file_path def _refresh_credentials_with_cmd_path(): config = self._user['auth-provider']['config'] @@ -489,12 +496,14 @@ def _load_from_exec_plugin(self): status, None, data_key_name='clientCertificateData', file_base_path=base_path, - base64_file_content=False).as_file() + base64_file_content=False, + temp_file_path=self._temp_file_path).as_file() self.key_file = FileOrData( status, None, data_key_name='clientKeyData', file_base_path=base_path, - base64_file_content=False).as_file() + base64_file_content=False, + temp_file_path=self._temp_file_path).as_file() return True logging.error('exec: missing token or clientCertificateData field ' 'in plugin output') @@ -507,7 +516,8 @@ def _load_user_token(self): token = FileOrData( self._user, 'tokenFile', 'token', file_base_path=base_path, - base64_file_content=False).as_data() + base64_file_content=False, + temp_file_path=self._temp_file_path).as_data() if token: self.token = "Bearer %s" % token return True @@ -533,17 +543,20 @@ def _load_cluster_info(self): base_path = self._get_base_path(self._cluster.path) self.ssl_ca_cert = FileOrData( self._cluster, 'certificate-authority', - file_base_path=base_path).as_file() + file_base_path=base_path, + temp_file_path=self._temp_file_path).as_file() if 'cert_file' not in self.__dict__: # cert_file could have been provided by # _load_from_exec_plugin; only load from the _user # section if we need it. self.cert_file = FileOrData( self._user, 'client-certificate', - file_base_path=base_path).as_file() + file_base_path=base_path, + temp_file_path=self._temp_file_path).as_file() self.key_file = FileOrData( self._user, 'client-key', - file_base_path=base_path).as_file() + file_base_path=base_path, + temp_file_path=self._temp_file_path).as_file() if 'insecure-skip-tls-verify' in self._cluster: self.verify_ssl = not self._cluster['insecure-skip-tls-verify'] @@ -811,7 +824,8 @@ def load_kube_config(config_file=None, context=None, def load_kube_config_from_dict(config_dict, context=None, client_configuration=None, - persist_config=True): + persist_config=True, + temp_file_path=None): """Loads authentication and cluster information from config_dict file and stores them in kubernetes.client.configuration. @@ -822,8 +836,8 @@ def load_kube_config_from_dict(config_dict, context=None, set configs to. :param persist_config: If True, config file will be updated when changed (e.g GCP token refresh). + :param temp_file_path: store temp files path. """ - if config_dict is None: raise ConfigException( 'Invalid kube-config dict. ' @@ -831,7 +845,8 @@ def load_kube_config_from_dict(config_dict, context=None, loader = _get_kube_config_loader( config_dict=config_dict, active_context=context, - persist_config=persist_config) + persist_config=persist_config, + temp_file_path=temp_file_path) if client_configuration is None: config = type.__call__(Configuration) diff --git a/config/kube_config_test.py b/config/kube_config_test.py index a82ef40..c33ffed 100644 --- a/config/kube_config_test.py +++ b/config/kube_config_test.py @@ -1290,6 +1290,29 @@ def test_load_kube_config_from_dict(self): client_configuration=actual) self.assertEqual(expected, actual) + def test_load_kube_config_from_dict_with_temp_file_path(self): + expected = FakeConfig( + host=TEST_SSL_HOST, + token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64, + cert_file=self._create_temp_file(TEST_CLIENT_CERT), + key_file=self._create_temp_file(TEST_CLIENT_KEY), + ssl_ca_cert=self._create_temp_file(TEST_CERTIFICATE_AUTH), + verify_ssl=True + ) + actual = FakeConfig() + tmp_path = os.path.join( + os.path.dirname( + os.path.dirname( + os.path.abspath(__file__))), + 'tmp_file_path_test') + load_kube_config_from_dict(config_dict=self.TEST_KUBE_CONFIG, + context="ssl", + client_configuration=actual, + temp_file_path=tmp_path) + self.assertFalse(True if not os.listdir(tmp_path) else False) + self.assertEqual(expected, actual) + _cleanup_temp_files + def test_load_kube_config_from_empty_file_like_object(self): config_file_like_object = io.StringIO() self.assertRaises(