diff --git a/config/kube_config.py b/config/kube_config.py index 584b8a4..f295dbc 100644 --- a/config/kube_config.py +++ b/config/kube_config.py @@ -359,6 +359,8 @@ def _load_gcp_token(self, provider): self._refresh_gcp_token() self.token = "Bearer %s" % provider['config']['access-token'] + if 'expiry' in provider['config']: + self.expiry = parse_rfc3339(provider['config']['expiry']) return self.token def _refresh_gcp_token(self): @@ -483,8 +485,7 @@ def _load_from_exec_plugin(self): status = ExecProvider(self._user['exec']).run() if 'token' in status: self.token = "Bearer %s" % status['token'] - return True - if 'clientCertificateData' in status: + elif 'clientCertificateData' in status: # https://kubernetes.io/docs/reference/access-authn-authz/authentication/#input-and-output-formats # Plugin has provided certificates instead of a token. if 'clientKeyData' not in status: @@ -504,10 +505,13 @@ def _load_from_exec_plugin(self): file_base_path=base_path, base64_file_content=False, temp_file_path=self._temp_file_path).as_file() - return True - logging.error('exec: missing token or clientCertificateData field ' - 'in plugin output') - return None + else: + logging.error('exec: missing token or clientCertificateData ' + 'field in plugin output') + return None + if 'expirationTimestamp' in status: + self.expiry = parse_rfc3339(status['expirationTimestamp']) + return True except Exception as e: logging.error(str(e)) @@ -560,25 +564,15 @@ def _load_cluster_info(self): if 'insecure-skip-tls-verify' in self._cluster: self.verify_ssl = not self._cluster['insecure-skip-tls-verify'] - def _using_gcp_auth_provider(self): - return self._user and \ - 'auth-provider' in self._user and \ - 'name' in self._user['auth-provider'] and \ - self._user['auth-provider']['name'] == 'gcp' - def _set_config(self, client_configuration): - if self._using_gcp_auth_provider(): - # GCP auth tokens must be refreshed regularly, but swagger expects - # a constant token. Replace the swagger-generated client config's - # get_api_key_with_prefix method with our own to allow automatic - # token refresh. - def _gcp_get_api_key(*args): - return self._load_gcp_token(self._user['auth-provider']) - client_configuration.get_api_key_with_prefix = _gcp_get_api_key if 'token' in self.__dict__: - # Note: this line runs for GCP auth tokens as well, but this entry - # will not be updated upon GCP token refresh. client_configuration.api_key['authorization'] = self.token + + def _refresh_api_key(client_configuration): + if ('expiry' in self.__dict__ and _is_expired(self.expiry)): + self._load_authentication() + self._set_config(client_configuration) + client_configuration.refresh_api_key_hook = _refresh_api_key # copy these keys directly from self to configuration object keys = ['host', 'ssl_ca_cert', 'cert_file', 'key_file', 'verify_ssl'] for key in keys: diff --git a/config/kube_config_test.py b/config/kube_config_test.py index c33ffed..8151f94 100644 --- a/config/kube_config_test.py +++ b/config/kube_config_test.py @@ -29,7 +29,7 @@ from kubernetes.client import Configuration from .config_exception import ConfigException -from .dateutil import parse_rfc3339 +from .dateutil import format_rfc3339, parse_rfc3339 from .kube_config import (ENV_KUBECONFIG_PATH_SEPARATOR, CommandTokenSource, ConfigNode, FileOrData, KubeConfigLoader, KubeConfigMerger, _cleanup_temp_files, @@ -346,9 +346,12 @@ def test_get_with_name_on_duplicate_name(self): class FakeConfig: FILE_KEYS = ["ssl_ca_cert", "key_file", "cert_file"] + IGNORE_KEYS = ["refresh_api_key_hook"] def __init__(self, token=None, **kwargs): self.api_key = {} + # Provided by the OpenAPI-generated Configuration class + self.refresh_api_key_hook = None if token: self.api_key['authorization'] = token @@ -358,6 +361,8 @@ def __eq__(self, other): if len(self.__dict__) != len(other.__dict__): return for k, v in self.__dict__.items(): + if k in self.IGNORE_KEYS: + continue if k not in other.__dict__: return if k in self.FILE_KEYS: @@ -956,17 +961,15 @@ def test_load_user_token(self): def test_gcp_no_refresh(self): fake_config = FakeConfig() - # swagger-generated config has this, but FakeConfig does not. - self.assertFalse(hasattr(fake_config, 'get_api_key_with_prefix')) + self.assertIsNone(fake_config.refresh_api_key_hook) KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="gcp", get_google_credentials=lambda: _raise_exception( "SHOULD NOT BE CALLED")).load_and_set(fake_config) # Should now be populated with a gcp token fetcher. - self.assertIsNotNone(fake_config.get_api_key_with_prefix) + self.assertIsNotNone(fake_config.refresh_api_key_hook) self.assertEqual(TEST_HOST, fake_config.host) - # For backwards compatibility, authorization field should still be set. self.assertEqual(BEARER_TOKEN_FORMAT % TEST_DATA_BASE64, fake_config.api_key['authorization']) @@ -997,7 +1000,7 @@ def cred(): return None self.assertEqual(BEARER_TOKEN_FORMAT % TEST_ANOTHER_DATA_BASE64, loader.token) - def test_gcp_get_api_key_with_prefix(self): + def test_gcp_refresh_api_key_hook(self): class cred_old: token = TEST_DATA_BASE64 expiry = DATETIME_EXPIRY_PAST @@ -1015,15 +1018,13 @@ class cred_new: get_google_credentials=_get_google_credentials) loader.load_and_set(fake_config) original_expiry = _get_expiry(loader, "expired_gcp_refresh") - # Call GCP token fetcher. - token = fake_config.get_api_key_with_prefix() + # Refresh the GCP token. + fake_config.refresh_api_key_hook(fake_config) new_expiry = _get_expiry(loader, "expired_gcp_refresh") self.assertTrue(new_expiry > original_expiry) self.assertEqual(BEARER_TOKEN_FORMAT % TEST_ANOTHER_DATA_BASE64, loader.token) - self.assertEqual(BEARER_TOKEN_FORMAT % TEST_ANOTHER_DATA_BASE64, - token) def test_oidc_no_refresh(self): loader = KubeConfigLoader( @@ -1383,6 +1384,38 @@ def test_user_exec_auth(self, mock): active_context="exec_cred_user").load_and_set(actual) self.assertEqual(expected, actual) + @mock.patch('kubernetes.config.kube_config.ExecProvider.run') + def test_user_exec_auth_with_expiry(self, mock): + expired_token = "expired" + current_token = "current" + mock.side_effect = [ + { + "token": expired_token, + "expirationTimestamp": format_rfc3339(DATETIME_EXPIRY_PAST) + }, + { + "token": current_token, + "expirationTimestamp": format_rfc3339(DATETIME_EXPIRY_FUTURE) + } + ] + + fake_config = FakeConfig() + self.assertIsNone(fake_config.refresh_api_key_hook) + + KubeConfigLoader( + config_dict=self.TEST_KUBE_CONFIG, + active_context="exec_cred_user").load_and_set(fake_config) + # The kube config should use the first token returned from the + # exec provider. + self.assertEqual(fake_config.api_key["authorization"], + BEARER_TOKEN_FORMAT % expired_token) + # Should now be populated with a method to refresh expired tokens. + self.assertIsNotNone(fake_config.refresh_api_key_hook) + # Refresh the token; the kube config should be updated. + fake_config.refresh_api_key_hook(fake_config) + self.assertEqual(fake_config.api_key["authorization"], + BEARER_TOKEN_FORMAT % current_token) + @mock.patch('kubernetes.config.kube_config.ExecProvider.run') def test_user_exec_auth_certificates(self, mock): mock.return_value = { @@ -1412,7 +1445,6 @@ def test_user_cmd_path(self): KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="contexttestcmdpath").load_and_set(actual) - del actual.get_api_key_with_prefix self.assertEqual(expected, actual) def test_user_cmd_path_empty(self): @@ -1490,31 +1522,28 @@ def test__get_kube_config_loader_dict_no_persist(self): class TestKubernetesClientConfiguration(BaseTestCase): # Verifies properties of kubernetes.client.Configuration. # These tests guard against changes to the upstream configuration class, - # since GCP authorization overrides get_api_key_with_prefix to refresh its - # token regularly. + # since GCP and Exec authorization use refresh_api_key_hook to refresh + # their tokens regularly. - def test_get_api_key_with_prefix_exists(self): - self.assertTrue(hasattr(Configuration, 'get_api_key_with_prefix')) + def test_refresh_api_key_hook_exists(self): + self.assertTrue(hasattr(Configuration(), 'refresh_api_key_hook')) - def test_get_api_key_with_prefix_returns_token(self): - expected_token = 'expected_token' - config = Configuration() - config.api_key['authorization'] = expected_token - self.assertEqual(expected_token, - config.get_api_key_with_prefix('authorization')) - - def test_auth_settings_calls_get_api_key_with_prefix(self): + def test_get_api_key_calls_refresh_api_key_hook(self): + identifier = 'authorization' expected_token = 'expected_token' old_token = 'old_token' + config = Configuration( + api_key={identifier: old_token}, + api_key_prefix={identifier: 'Bearer'} + ) + + def refresh_api_key_hook(client_config): + self.assertEqual(client_config, config) + client_config.api_key[identifier] = expected_token + config.refresh_api_key_hook = refresh_api_key_hook - def fake_get_api_key_with_prefix(identifier): - self.assertEqual('authorization', identifier) - return expected_token - config = Configuration() - config.api_key['authorization'] = old_token - config.get_api_key_with_prefix = fake_get_api_key_with_prefix - self.assertEqual(expected_token, - config.auth_settings()['BearerToken']['value']) + self.assertEqual('Bearer ' + expected_token, + config.get_api_key_with_prefix(identifier)) class TestKubeConfigMerger(BaseTestCase):