Skip to content
This repository has been archived by the owner on Mar 13, 2022. It is now read-only.

Fix issue 84: Update _load_azure_token to hande str and int #141

Merged
merged 3 commits into from
Jul 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion config/kube_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,13 +221,21 @@ def _load_auth_provider_token(self):
if provider['name'] == 'oidc':
return self._load_oid_token(provider)

def _azure_is_expired(self, provider):
expires_on = provider['config']['expires-on']
if expires_on.isdigit():
return int(expires_on) < time.time()
else:
exp_time = time.strptime(expires_on, '%Y-%m-%d %H:%M:%S.%f')
return exp_time < time.gmtime()

def _load_azure_token(self, provider):
if 'config' not in provider:
return
if 'access-token' not in provider['config']:
return
if 'expires-on' in provider['config']:
if int(provider['config']['expires-on']) < time.gmtime():
if self._azure_is_expired(provider):
self._refresh_azure_token(provider['config'])
self.token = 'Bearer %s' % provider['config']['access-token']
return self.token
Expand Down
162 changes: 162 additions & 0 deletions config/kube_config_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ def _raise_exception(st):

TEST_OIDC_CA = _base64(TEST_CERTIFICATE_AUTH)

TEST_AZURE_LOGIN = TEST_OIDC_LOGIN
TEST_AZURE_TOKEN = "test-azure-token"
TEST_AZURE_TOKEN_FULL = "Bearer " + TEST_AZURE_TOKEN


class BaseTestCase(unittest.TestCase):

Expand Down Expand Up @@ -420,6 +424,41 @@ class TestKubeConfigLoader(BaseTestCase):
"user": "oidc"
}
},
{
"name": "azure",
"context": {
"cluster": "default",
"user": "azure"
}
},
{
"name": "azure_num",
"context": {
"cluster": "default",
"user": "azure_num"
}
},
{
"name": "azure_str",
"context": {
"cluster": "default",
"user": "azure_str"
}
},
{
"name": "azure_num_error",
"context": {
"cluster": "default",
"user": "azure_str_error"
}
},
{
"name": "azure_str_error",
"context": {
"cluster": "default",
"user": "azure_str_error"
}
},
{
"name": "expired_oidc",
"context": {
Expand Down Expand Up @@ -603,6 +642,89 @@ class TestKubeConfigLoader(BaseTestCase):
}
}
},
{
"name": "azure",
"user": {
"auth-provider": {
"config": {
"access-token": TEST_AZURE_TOKEN,
"apiserver-id": "ApiserverId",
"environment": "AzurePublicCloud",
"refresh-token": "refreshToken",
"tenant-id": "9d2ac018-e843-4e14-9e2b-4e0ddac75433"
},
"name": "azure"
}
}
},
{
"name": "azure_num",
"user": {
"auth-provider": {
"config": {
"access-token": TEST_AZURE_TOKEN,
"apiserver-id": "ApiserverId",
"environment": "AzurePublicCloud",
"expires-in": "0",
"expires-on": "156207275",
"refresh-token": "refreshToken",
"tenant-id": "9d2ac018-e843-4e14-9e2b-4e0ddac75433"
},
"name": "azure"
}
}
},
{
"name": "azure_str",
"user": {
"auth-provider": {
"config": {
"access-token": TEST_AZURE_TOKEN,
"apiserver-id": "ApiserverId",
"environment": "AzurePublicCloud",
"expires-in": "0",
"expires-on": "2018-10-18 00:52:29.044727",
"refresh-token": "refreshToken",
"tenant-id": "9d2ac018-e843-4e14-9e2b-4e0ddac75433"
},
"name": "azure"
}
}
},
{
"name": "azure_str_error",
"user": {
"auth-provider": {
"config": {
"access-token": TEST_AZURE_TOKEN,
"apiserver-id": "ApiserverId",
"environment": "AzurePublicCloud",
"expires-in": "0",
"expires-on": "2018-10-18 00:52",
"refresh-token": "refreshToken",
"tenant-id": "9d2ac018-e843-4e14-9e2b-4e0ddac75433"
},
"name": "azure"
}
}
},
{
"name": "azure_num_error",
"user": {
"auth-provider": {
"config": {
"access-token": TEST_AZURE_TOKEN,
"apiserver-id": "ApiserverId",
"environment": "AzurePublicCloud",
"expires-in": "0",
"expires-on": "-1",
"refresh-token": "refreshToken",
"tenant-id": "9d2ac018-e843-4e14-9e2b-4e0ddac75433"
},
"name": "azure"
}
}
},
{
"name": "expired_oidc",
"user": {
Expand Down Expand Up @@ -886,6 +1008,46 @@ def test_oidc_fails_if_invalid_padding_length(self):
None,
)

def test_azure_no_refresh(self):
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="azure",
)
self.assertTrue(loader._load_auth_provider_token())
self.assertEqual(TEST_AZURE_TOKEN_FULL, loader.token)

def test_azure_with_expired_num(self):
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="azure_num",
)
provider = loader._user['auth-provider']
self.assertTrue(loader._azure_is_expired(provider))

def test_azure_with_expired_str(self):
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="azure_str",
)
provider = loader._user['auth-provider']
self.assertTrue(loader._azure_is_expired(provider))

def test_azure_with_expired_str_error(self):
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="azure_str_error",
)
provider = loader._user['auth-provider']
self.assertRaises(ValueError, loader._azure_is_expired, provider)

def test_azure_with_expired_int_error(self):
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="azure_num_error",
)
provider = loader._user['auth-provider']
self.assertRaises(ValueError, loader._azure_is_expired, provider)

def test_user_pass(self):
expected = FakeConfig(host=TEST_HOST, token=TEST_BASIC_TOKEN)
actual = FakeConfig()
Expand Down