Skip to content
This repository has been archived by the owner on Mar 13, 2022. It is now read-only.

Commit

Permalink
allow incluster to accept pass-in config
Browse files Browse the repository at this point in the history
  • Loading branch information
zshihang committed May 12, 2020
1 parent bf5c599 commit 1acce20
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 74 deletions.
80 changes: 45 additions & 35 deletions config/incluster_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import datetime
import os

from kubernetes.client import Configuration

Expand All @@ -35,30 +35,33 @@ def _join_host_port(host, port):


class InClusterConfigLoader(object):

def __init__(self, token_filename,
cert_filename, environ=os.environ):
def __init__(self,
token_filename,
cert_filename,
try_refresh_token,
environ=os.environ):
self._token_filename = token_filename
self._cert_filename = cert_filename
self._environ = environ
self._try_refresh_token = try_refresh_token
self._token_refresh_period = datetime.timedelta(minutes=1)

def load_and_set(self, refresh_token=True):
def load_and_set(self, client_configuration):
self._load_config()
self._set_config(refresh_token=refresh_token)
self._set_config(client_configuration)

def _load_config(self):
if (SERVICE_HOST_ENV_NAME not in self._environ or
SERVICE_PORT_ENV_NAME not in self._environ):
if (SERVICE_HOST_ENV_NAME not in self._environ
or SERVICE_PORT_ENV_NAME not in self._environ):
raise ConfigException("Service host/port is not set.")

if (not self._environ[SERVICE_HOST_ENV_NAME] or
not self._environ[SERVICE_PORT_ENV_NAME]):
if (not self._environ[SERVICE_HOST_ENV_NAME]
or not self._environ[SERVICE_PORT_ENV_NAME]):
raise ConfigException("Service host/port is set but empty.")

self.host = (
"https://" + _join_host_port(self._environ[SERVICE_HOST_ENV_NAME],
self._environ[SERVICE_PORT_ENV_NAME]))
self.host = ("https://" +
_join_host_port(self._environ[SERVICE_HOST_ENV_NAME],
self._environ[SERVICE_PORT_ENV_NAME]))

if not os.path.isfile(self._token_filename):
raise ConfigException("Service token file does not exists.")
Expand All @@ -75,37 +78,44 @@ def _load_config(self):

self.ssl_ca_cert = self._cert_filename

def _set_config(self, refresh_token):
configuration = Configuration()
configuration.host = self.host
configuration.ssl_ca_cert = self.ssl_ca_cert
configuration.api_key['authorization'] = "bearer " + self.token
Configuration.set_default(configuration)
if not refresh_token:
def _set_config(self, client_configuration):
client_configuration.host = self.host
client_configuration.ssl_ca_cert = self.ssl_ca_cert
if self.token is not None:
client_configuration.api_key['authorization'] = self.token
if not self._try_refresh_token:
return
def wrap(f):
in_cluster_config = self
def wrapped(self, identifier):
if identifier == 'authorization' and identifier in self.api_key and in_cluster_config.token_expires_at <= datetime.datetime.now():
in_cluster_config._read_token_file()
self.api_key[identifier] = "bearer " + in_cluster_config.token
return f(self, identifier)
return wrapped
Configuration.get_api_key_with_prefix = wrap(Configuration.get_api_key_with_prefix)

def load_token_from_file(*args):
if self.token_expires_at <= datetime.datetime.now():
self._read_token_file()
return self.token

client_configuration.get_api_key_with_prefix = load_token_from_file

def _read_token_file(self):
with open(self._token_filename) as f:
self.token = f.read()
self.token_expires_at = datetime.datetime.now() + self._token_refresh_period
if not self.token:
content = f.read()
if not content:
raise ConfigException("Token file exists but empty.")
self.token = "bearer " + content
self.token_expires_at = datetime.datetime.now(
) + self._token_refresh_period


def load_incluster_config(refresh_token=True):
def load_incluster_config(client_configuration=None, try_refresh_token=True):
"""
Use the service account kubernetes gives to pods to connect to kubernetes
cluster. It's intended for clients that expect to be running inside a pod
running on kubernetes. It will raise an exception if called from a process
not running in a kubernetes environment."""
InClusterConfigLoader(token_filename=SERVICE_TOKEN_FILENAME,
cert_filename=SERVICE_CERT_FILENAME).load_and_set(refresh_token=refresh_token)
loader = InClusterConfigLoader(token_filename=SERVICE_TOKEN_FILENAME,
cert_filename=SERVICE_CERT_FILENAME,
try_refresh_token=try_refresh_token)

if client_configuration is None:
config = type.__call__(Configuration)
loader.load_and_set(config)
Configuration.set_default(config)
else:
loader.load_and_set(client_configuration)
81 changes: 42 additions & 39 deletions config/incluster_config_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import os
import tempfile
import unittest
import datetime
import time
import unittest

from kubernetes.client import Configuration

Expand All @@ -33,14 +33,17 @@
_TEST_IPV6_HOST = "::1"
_TEST_IPV6_HOST_PORT = "[::1]:80"

_TEST_ENVIRON = {SERVICE_HOST_ENV_NAME: _TEST_HOST,
SERVICE_PORT_ENV_NAME: _TEST_PORT}
_TEST_IPV6_ENVIRON = {SERVICE_HOST_ENV_NAME: _TEST_IPV6_HOST,
SERVICE_PORT_ENV_NAME: _TEST_PORT}
_TEST_ENVIRON = {
SERVICE_HOST_ENV_NAME: _TEST_HOST,
SERVICE_PORT_ENV_NAME: _TEST_PORT
}
_TEST_IPV6_ENVIRON = {
SERVICE_HOST_ENV_NAME: _TEST_IPV6_HOST,
SERVICE_PORT_ENV_NAME: _TEST_PORT
}


class InClusterConfigTest(unittest.TestCase):

def setUp(self):
self._temp_files = []

Expand All @@ -55,25 +58,18 @@ def _create_file_with_temp_content(self, content=""):
os.close(handler)
return name

def _overwrite_file_with_content(self, name, content=""):
handler = os.open(name, os.O_RDWR)
os.truncate(name, 0)
os.write(handler, str.encode(content))
os.close(handler)

def get_test_loader(
self,
token_filename=None,
cert_filename=None,
environ=_TEST_ENVIRON):
def get_test_loader(self,
token_filename=None,
cert_filename=None,
environ=_TEST_ENVIRON):
if not token_filename:
token_filename = self._create_file_with_temp_content(_TEST_TOKEN)
if not cert_filename:
cert_filename = self._create_file_with_temp_content(_TEST_CERT)
return InClusterConfigLoader(
token_filename=token_filename,
cert_filename=cert_filename,
environ=environ)
return InClusterConfigLoader(token_filename=token_filename,
cert_filename=cert_filename,
try_refresh_token=True,
environ=environ)

def test_join_host_port(self):
self.assertEqual(_TEST_HOST_PORT,
Expand All @@ -87,30 +83,35 @@ def test_load_config(self):
loader._load_config()
self.assertEqual("https://" + _TEST_HOST_PORT, loader.host)
self.assertEqual(cert_filename, loader.ssl_ca_cert)
self.assertEqual(_TEST_TOKEN, loader.token)
self.assertEqual('bearer ' + _TEST_TOKEN, loader.token)

def test_refresh_token(self):
loader = self.get_test_loader()
loader._token_refresh_period = datetime.timedelta(seconds=5)
loader.load_and_set()
config = Configuration()
loader.load_and_set(config)

self.assertEqual('bearer '+_TEST_TOKEN, config.get_api_key_with_prefix('authorization'))
self.assertEqual(_TEST_TOKEN, loader.token)
self.assertEqual('bearer ' + _TEST_TOKEN,
config.get_api_key_with_prefix('authorization'))
self.assertEqual('bearer ' + _TEST_TOKEN, loader.token)
self.assertIsNotNone(loader.token_expires_at)

old_token = loader.token
old_token_expires_at = loader.token_expires_at
self._overwrite_file_with_content(loader._token_filename, _TEST_NEW_TOKEN)
time.sleep(5)

self.assertEqual('bearer '+_TEST_NEW_TOKEN, config.get_api_key_with_prefix('authorization'))
self.assertEqual(_TEST_NEW_TOKEN, loader.token)
loader._token_filename = self._create_file_with_temp_content(
_TEST_NEW_TOKEN)
self.assertEqual('bearer ' + _TEST_TOKEN,
config.get_api_key_with_prefix('authorization'))

loader.token_expires_at = datetime.datetime.now()
self.assertEqual('bearer ' + _TEST_NEW_TOKEN,
config.get_api_key_with_prefix('authorization'))
self.assertEqual('bearer ' + _TEST_NEW_TOKEN, loader.token)
self.assertGreater(loader.token_expires_at, old_token_expires_at)

def _should_fail_load(self, config_loader, reason):
try:
config_loader.load_and_set()
config = Configuration()
config_loader.load_and_set(config)
self.fail("Should fail because %s" % reason)
except ConfigException:
# expected
Expand All @@ -122,9 +123,10 @@ def test_no_port(self):
self._should_fail_load(loader, "no port specified")

def test_empty_port(self):
loader = self.get_test_loader(
environ={SERVICE_HOST_ENV_NAME: _TEST_HOST,
SERVICE_PORT_ENV_NAME: ""})
loader = self.get_test_loader(environ={
SERVICE_HOST_ENV_NAME: _TEST_HOST,
SERVICE_PORT_ENV_NAME: ""
})
self._should_fail_load(loader, "empty port specified")

def test_no_host(self):
Expand All @@ -133,9 +135,10 @@ def test_no_host(self):
self._should_fail_load(loader, "no host specified")

def test_empty_host(self):
loader = self.get_test_loader(
environ={SERVICE_HOST_ENV_NAME: "",
SERVICE_PORT_ENV_NAME: _TEST_PORT})
loader = self.get_test_loader(environ={
SERVICE_HOST_ENV_NAME: "",
SERVICE_PORT_ENV_NAME: _TEST_PORT
})
self._should_fail_load(loader, "empty host specified")

def test_no_cert_file(self):
Expand Down

0 comments on commit 1acce20

Please sign in to comment.