From b85aff2b3e6c950cb9128d281cd6f7394563e202 Mon Sep 17 00:00:00 2001 From: Graham Reed Date: Fri, 29 May 2020 17:09:38 +0100 Subject: [PATCH] Accept client certificates from an authn/authz plugin (Plugin interface reference: https://kubernetes.io/docs/reference/access-authn-authz/authentication/#input-and-output-formats) When handling the response from the authn/authz plugin, `token` will be used if provided, which maintains current behaviour. Newly added is handling `clientCertificateData`: if it is present, that certificate (and its key) will be used as provided by the plugin. (And any certificate/key pair provided via the `users` section of the configuration file will be ignored.) --- config/kube_config.py | 46 +++++++++++++++++++++++++++++--------- config/kube_config_test.py | 35 +++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+), 11 deletions(-) diff --git a/config/kube_config.py b/config/kube_config.py index 9786e0e5..c3ba04ca 100644 --- a/config/kube_config.py +++ b/config/kube_config.py @@ -472,11 +472,31 @@ def _load_from_exec_plugin(self): return try: status = ExecProvider(self._user['exec']).run() - if 'token' not in status: - logging.error('exec: missing token field in plugin output') - return None - self.token = "Bearer %s" % status['token'] - return True + if 'token' in status: + self.token = "Bearer %s" % status['token'] + return True + if 'clientCertificateData' in status: + # https://kubernetes.io/docs/reference/access-authn-authz/authentication/#input-and-output-formats + # Plugin has provided certificates instead of a token. + if 'clientKeyData' not in status: + logging.error('exec: missing clientKeyData field in ' + 'plugin output') + return None + base_path = self._get_base_path(self._cluster.path) + self.cert_file = FileOrData( + status, None, + data_key_name='clientCertificateData', + file_base_path=base_path, + base64_file_content=False).as_file() + self.key_file = FileOrData( + status, None, + data_key_name='clientKeyData', + file_base_path=base_path, + base64_file_content=False).as_file() + return True + logging.error('exec: missing token or clientCertificateData field ' + 'in plugin output') + return None except Exception as e: logging.error(str(e)) @@ -512,12 +532,16 @@ def _load_cluster_info(self): self.ssl_ca_cert = FileOrData( self._cluster, 'certificate-authority', file_base_path=base_path).as_file() - self.cert_file = FileOrData( - self._user, 'client-certificate', - file_base_path=base_path).as_file() - self.key_file = FileOrData( - self._user, 'client-key', - file_base_path=base_path).as_file() + if 'cert_file' not in self.__dict__: + # cert_file could have been provided by + # _load_from_exec_plugin; only load from the _user + # section if we need it. + self.cert_file = FileOrData( + self._user, 'client-certificate', + file_base_path=base_path).as_file() + self.key_file = FileOrData( + self._user, 'client-key', + file_base_path=base_path).as_file() if 'insecure-skip-tls-verify' in self._cluster: self.verify_ssl = not self._cluster['insecure-skip-tls-verify'] diff --git a/config/kube_config_test.py b/config/kube_config_test.py index 1349cafe..63cf11aa 100644 --- a/config/kube_config_test.py +++ b/config/kube_config_test.py @@ -541,6 +541,13 @@ class TestKubeConfigLoader(BaseTestCase): "user": "exec_cred_user" } }, + { + "name": "exec_cred_user_certificate", + "context": { + "cluster": "ssl", + "user": "exec_cred_user_certificate" + } + }, { "name": "contexttestcmdpath", "context": { @@ -865,6 +872,16 @@ class TestKubeConfigLoader(BaseTestCase): } } }, + { + "name": "exec_cred_user_certificate", + "user": { + "exec": { + "apiVersion": "client.authentication.k8s.io/v1beta1", + "command": "custom-certificate-authenticator", + "args": [] + } + } + }, { "name": "usertestcmdpath", "user": { @@ -1295,6 +1312,24 @@ def test_user_exec_auth(self, mock): active_context="exec_cred_user").load_and_set(actual) self.assertEqual(expected, actual) + @mock.patch('kubernetes.config.kube_config.ExecProvider.run') + def test_user_exec_auth_certificates(self, mock): + mock.return_value = { + "clientCertificateData": TEST_CLIENT_CERT, + "clientKeyData": TEST_CLIENT_KEY, + } + expected = FakeConfig( + host=TEST_SSL_HOST, + cert_file=self._create_temp_file(TEST_CLIENT_CERT), + key_file=self._create_temp_file(TEST_CLIENT_KEY), + ssl_ca_cert=self._create_temp_file(TEST_CERTIFICATE_AUTH), + verify_ssl=True) + actual = FakeConfig() + KubeConfigLoader( + config_dict=self.TEST_KUBE_CONFIG, + active_context="exec_cred_user_certificate").load_and_set(actual) + self.assertEqual(expected, actual) + def test_user_cmd_path(self): A = namedtuple('A', ['token', 'expiry']) token = "dummy"