Skip to content
This repository has been archived by the owner on Mar 13, 2022. It is now read-only.

load_kube_config_from_dict() support define custom temp files path #233

Merged
merged 1 commit into from
Apr 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 30 additions & 15 deletions config/kube_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,17 @@ def _cleanup_temp_files():
_temp_files = {}


def _create_temp_file_with_content(content):
def _create_temp_file_with_content(content, temp_file_path=None):
if len(_temp_files) == 0:
atexit.register(_cleanup_temp_files)
# Because we may change context several times, try to remember files we
# created and reuse them at a small memory cost.
content_key = str(content)
if content_key in _temp_files:
return _temp_files[content_key]
_, name = tempfile.mkstemp()
if temp_file_path and not os.path.isdir(temp_file_path):
os.makedirs(name=temp_file_path)
_, name = tempfile.mkstemp(dir=temp_file_path)
_temp_files[content_key] = name
with open(name, 'wb') as fd:
fd.write(content.encode() if isinstance(content, str) else content)
Expand All @@ -91,12 +93,14 @@ class FileOrData(object):
result in base64 encode of the file content after read."""

def __init__(self, obj, file_key_name, data_key_name=None,
file_base_path="", base64_file_content=True):
file_base_path="", base64_file_content=True,
temp_file_path=None):
if not data_key_name:
data_key_name = file_key_name + "-data"
self._file = None
self._data = None
self._base64_file_content = base64_file_content
self._temp_file_path = temp_file_path
if not obj:
return
if data_key_name in obj:
Expand All @@ -116,9 +120,10 @@ def as_file(self):
else:
content = self._data
self._file = _create_temp_file_with_content(
base64.standard_b64decode(content))
base64.standard_b64decode(content), self._temp_file_path)
else:
self._file = _create_temp_file_with_content(self._data)
self._file = _create_temp_file_with_content(
self._data, self._temp_file_path)
if self._file and not os.path.isfile(self._file):
raise ConfigException("File does not exist: %s" % self._file)
return self._file
Expand Down Expand Up @@ -182,7 +187,8 @@ class KubeConfigLoader(object):
def __init__(self, config_dict, active_context=None,
get_google_credentials=None,
config_base_path="",
config_persister=None):
config_persister=None,
temp_file_path=None):

if config_dict is None:
raise ConfigException(
Expand All @@ -199,6 +205,7 @@ def __init__(self, config_dict, active_context=None,
self.set_active_context(active_context)
self._config_base_path = config_base_path
self._config_persister = config_persister
self._temp_file_path = temp_file_path

def _refresh_credentials_with_cmd_path():
config = self._user['auth-provider']['config']
Expand Down Expand Up @@ -489,12 +496,14 @@ def _load_from_exec_plugin(self):
status, None,
data_key_name='clientCertificateData',
file_base_path=base_path,
base64_file_content=False).as_file()
base64_file_content=False,
temp_file_path=self._temp_file_path).as_file()
self.key_file = FileOrData(
status, None,
data_key_name='clientKeyData',
file_base_path=base_path,
base64_file_content=False).as_file()
base64_file_content=False,
temp_file_path=self._temp_file_path).as_file()
return True
logging.error('exec: missing token or clientCertificateData field '
'in plugin output')
Expand All @@ -507,7 +516,8 @@ def _load_user_token(self):
token = FileOrData(
self._user, 'tokenFile', 'token',
file_base_path=base_path,
base64_file_content=False).as_data()
base64_file_content=False,
temp_file_path=self._temp_file_path).as_data()
if token:
self.token = "Bearer %s" % token
return True
Expand All @@ -533,17 +543,20 @@ def _load_cluster_info(self):
base_path = self._get_base_path(self._cluster.path)
self.ssl_ca_cert = FileOrData(
self._cluster, 'certificate-authority',
file_base_path=base_path).as_file()
file_base_path=base_path,
temp_file_path=self._temp_file_path).as_file()
if 'cert_file' not in self.__dict__:
# cert_file could have been provided by
# _load_from_exec_plugin; only load from the _user
# section if we need it.
self.cert_file = FileOrData(
self._user, 'client-certificate',
file_base_path=base_path).as_file()
file_base_path=base_path,
temp_file_path=self._temp_file_path).as_file()
self.key_file = FileOrData(
self._user, 'client-key',
file_base_path=base_path).as_file()
file_base_path=base_path,
temp_file_path=self._temp_file_path).as_file()
if 'insecure-skip-tls-verify' in self._cluster:
self.verify_ssl = not self._cluster['insecure-skip-tls-verify']

Expand Down Expand Up @@ -811,7 +824,8 @@ def load_kube_config(config_file=None, context=None,

def load_kube_config_from_dict(config_dict, context=None,
client_configuration=None,
persist_config=True):
persist_config=True,
temp_file_path=None):
"""Loads authentication and cluster information from config_dict file
and stores them in kubernetes.client.configuration.

Expand All @@ -822,16 +836,17 @@ def load_kube_config_from_dict(config_dict, context=None,
set configs to.
:param persist_config: If True, config file will be updated when changed
(e.g GCP token refresh).
:param temp_file_path: store temp files path.
"""

if config_dict is None:
raise ConfigException(
'Invalid kube-config dict. '
'No configuration found.')

loader = _get_kube_config_loader(
config_dict=config_dict, active_context=context,
persist_config=persist_config)
persist_config=persist_config,
temp_file_path=temp_file_path)

if client_configuration is None:
config = type.__call__(Configuration)
Expand Down
23 changes: 23 additions & 0 deletions config/kube_config_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1290,6 +1290,29 @@ def test_load_kube_config_from_dict(self):
client_configuration=actual)
self.assertEqual(expected, actual)

def test_load_kube_config_from_dict_with_temp_file_path(self):
expected = FakeConfig(
host=TEST_SSL_HOST,
token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64,
cert_file=self._create_temp_file(TEST_CLIENT_CERT),
key_file=self._create_temp_file(TEST_CLIENT_KEY),
ssl_ca_cert=self._create_temp_file(TEST_CERTIFICATE_AUTH),
verify_ssl=True
)
actual = FakeConfig()
tmp_path = os.path.join(
os.path.dirname(
os.path.dirname(
os.path.abspath(__file__))),
'tmp_file_path_test')
load_kube_config_from_dict(config_dict=self.TEST_KUBE_CONFIG,
context="ssl",
client_configuration=actual,
temp_file_path=tmp_path)
self.assertFalse(True if not os.listdir(tmp_path) else False)
onecer marked this conversation as resolved.
Show resolved Hide resolved
self.assertEqual(expected, actual)
onecer marked this conversation as resolved.
Show resolved Hide resolved
_cleanup_temp_files

def test_load_kube_config_from_empty_file_like_object(self):
config_file_like_object = io.StringIO()
self.assertRaises(
Expand Down