Skip to content

Commit

Permalink
chore: iam assume token manager test improvements
Browse files Browse the repository at this point in the history
Signed-off-by: Norbert Biczo <pyrooka@users.noreply.github.com>
  • Loading branch information
pyrooka committed Oct 9, 2024
1 parent 97e4d3b commit 2d1548c
Showing 1 changed file with 70 additions and 59 deletions.
129 changes: 70 additions & 59 deletions test/test_iam_assume_token_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import json
import logging
import time
import urllib

import jwt
import responses
Expand All @@ -26,19 +27,21 @@
from .utils.logger_utils import setup_test_logger


MY_PROFILE_ID = 'my-profile-id'
MY_PROFILE_CRN = 'my-profile-crn'
MY_PROFILE_NAME = 'my-profile-name'
MY_ACCOUNT_ID = 'my-account_id'


setup_test_logger(logging.ERROR)


def _get_current_time() -> int:
return int(time.time())


IAM_URL = "https://iam.cloud.ibm.com/identity/token"
MY_PROFILE_ID = 'my-profile-id'
MY_PROFILE_CRN = 'my-profile-crn'
MY_PROFILE_NAME = 'my-profile-name'
MY_ACCOUNT_ID = 'my-account_id'


# The base layout of an access token.
ACCESS_TOKEN_LAYOUT = {
"username": "dummy",
"role": "Admin",
Expand All @@ -50,33 +53,53 @@ def _get_current_time() -> int:
"iat": _get_current_time(),
"exp": _get_current_time() + 3600,
}

# Create two different access tokens by using different secrets for the encoding.
ACCESS_TOKEN = jwt.encode(
ACCESS_TOKEN_LAYOUT, 'secret', algorithm='HS256', headers={'kid': '230498151c214b788dd97f22b85410a5'}
)
OTHER_ACCESS_TOKEN = jwt.encode(
ACCESS_TOKEN_LAYOUT, 'other_secret', algorithm='HS256', headers={'kid': '230498151c214b788dd97f22b85410a5'}
)
# Create a base response and serialize it to a JSON string to avoid doing that in each test case.
BASE_RESPONSE = {
"access_token": ACCESS_TOKEN,
"token_type": "Bearer",
"expires_in": 3600,
"expiration": _get_current_time() + 3600,
"refresh_token": "not_available",
}
BASE_RESPONSE_JSON = json.dumps(BASE_RESPONSE)
# Create a second base response just like we did above, but use the other access token.
OTHER_BASE_RESPONSE = BASE_RESPONSE.copy()
OTHER_BASE_RESPONSE['access_token'] = OTHER_ACCESS_TOKEN
OTHER_BASE_RESPONSE_JSON = json.dumps(OTHER_BASE_RESPONSE)


def request_callback(request):
"""Parse the form data, and return a response based on the `grant_type` value."""
form_data = urllib.parse.unquote(request.body)
params = dict(param.split('=') for param in form_data.split('&'))
if params.get('grant_type') == 'urn:ibm:params:oauth:grant-type:apikey':
return (200, {}, BASE_RESPONSE_JSON)
if params.get('grant_type') == 'urn:ibm:params:oauth:grant-type:assume':
return (200, {}, OTHER_BASE_RESPONSE_JSON)
else:
raise Exception("Invalid request")


@responses.activate
def test_request_token_with_profile_id():
iam_url = "https://iam.cloud.ibm.com/identity/token"
response = {
"access_token": ACCESS_TOKEN,
"token_type": "Bearer",
"expires_in": 3600,
"expiration": _get_current_time() + 3600,
"refresh_token": "jy4gl91BQ",
}
responses.add(responses.POST, url=iam_url, body=json.dumps(response), status=200)
responses.add_callback(responses.POST, url=IAM_URL, callback=request_callback)

token_manager = IAMAssumeTokenManager("apikey", iam_profile_id=MY_PROFILE_ID)

# Make sure we don't have an access token yet.
assert token_manager.request_payload.get('access_token', None) is None
assert token_manager.request_payload.get('access_token') is None

token_manager.request_token()

# Now the access token should be set along with the profile ID.
assert token_manager.request_payload.get('access_token') is not None
assert token_manager.request_payload.get('access_token') == ACCESS_TOKEN
assert token_manager.request_payload.get('profile_id') == MY_PROFILE_ID
assert token_manager.request_payload.get('profile_crn') is None
assert token_manager.request_payload.get('profile_name') is None
Expand All @@ -85,25 +108,17 @@ def test_request_token_with_profile_id():

@responses.activate
def test_request_token_with_profile_crn():
iam_url = "https://iam.cloud.ibm.com/identity/token"
response = {
"access_token": ACCESS_TOKEN,
"token_type": "Bearer",
"expires_in": 3600,
"expiration": _get_current_time() + 3600,
"refresh_token": "jy4gl91BQ",
}
responses.add(responses.POST, url=iam_url, body=json.dumps(response), status=200)
responses.add_callback(responses.POST, url=IAM_URL, callback=request_callback)

token_manager = IAMAssumeTokenManager("apikey", iam_profile_crn=MY_PROFILE_CRN)

# Make sure we don't have an access token yet.
assert token_manager.request_payload.get('access_token', None) is None
assert token_manager.request_payload.get('access_token') is None

token_manager.request_token()

# Now the access token should be set along with the profile ID.
assert token_manager.request_payload.get('access_token') is not None
assert token_manager.request_payload.get('access_token') == ACCESS_TOKEN
assert token_manager.request_payload.get('profile_id') is None
assert token_manager.request_payload.get('profile_crn') == MY_PROFILE_CRN
assert token_manager.request_payload.get('profile_name') is None
Expand All @@ -112,25 +127,17 @@ def test_request_token_with_profile_crn():

@responses.activate
def test_request_token_with_profile_name_and_account_id():
iam_url = "https://iam.cloud.ibm.com/identity/token"
response = {
"access_token": ACCESS_TOKEN,
"token_type": "Bearer",
"expires_in": 3600,
"expiration": _get_current_time() + 3600,
"refresh_token": "jy4gl91BQ",
}
responses.add(responses.POST, url=iam_url, body=json.dumps(response), status=200)
responses.add_callback(responses.POST, url=IAM_URL, callback=request_callback)

token_manager = IAMAssumeTokenManager("apikey", iam_profile_name=MY_PROFILE_NAME, iam_account_id=MY_ACCOUNT_ID)

# Make sure we don't have an access token yet.
assert token_manager.request_payload.get('access_token', None) is None
assert token_manager.request_payload.get('access_token') is None

token_manager.request_token()

# Now the access token should be set along with the profile ID.
assert token_manager.request_payload.get('access_token') is not None
assert token_manager.request_payload.get('access_token') == ACCESS_TOKEN
assert token_manager.request_payload.get('profile_id') is None
assert token_manager.request_payload.get('profile_crn') is None
assert token_manager.request_payload.get('profile_name') == MY_PROFILE_NAME
Expand All @@ -140,15 +147,7 @@ def test_request_token_with_profile_name_and_account_id():
@responses.activate
def test_request_token_uses_the_correct_grant_types():
iam_url = "https://iam.cloud.ibm.com/identity/token"
response = {
"access_token": ACCESS_TOKEN,
"token_type": "Bearer",
"expires_in": 3600,
"expiration": _get_current_time() + 3600,
"refresh_token": "jy4gl91BQ",
}
response_json = json.dumps(response)
responses.add(responses.POST, url=iam_url, body=response_json, status=200)
responses.add(responses.POST, url=iam_url, body=BASE_RESPONSE_JSON, status=200)

token_manager = IAMAssumeTokenManager("apikey", iam_profile_id='my_profile_id')
token_manager.request_token()
Expand All @@ -159,16 +158,7 @@ def test_request_token_uses_the_correct_grant_types():

@responses.activate
def test_request_token_uses_the_correct_headers():
iam_url = "https://iam.cloud.ibm.com/identity/token"
response = {
"access_token": ACCESS_TOKEN,
"token_type": "Bearer",
"expires_in": 3600,
"expiration": _get_current_time() + 3600,
"refresh_token": "jy4gl91BQ",
}
response_json = json.dumps(response)
responses.add(responses.POST, url=iam_url, body=response_json, status=200)
responses.add_callback(responses.POST, url=IAM_URL, callback=request_callback)

token_manager = IAMAssumeTokenManager("apikey", iam_profile_id='my_profile_id')
token_manager.request_token()
Expand All @@ -178,3 +168,24 @@ def test_request_token_uses_the_correct_headers():
assert (
responses.calls[1].request.headers.get('User-Agent').startswith('ibm-python-sdk-core/iam-assume-authenticator')
)


@responses.activate
def test_get_token():
responses.add_callback(responses.POST, url=IAM_URL, callback=request_callback)

token_manager = IAMAssumeTokenManager("apikey", iam_profile_id=MY_PROFILE_ID)

# Make sure we don't have an access token yet.
assert token_manager.request_payload.get('access_token') is None

access_token = token_manager.get_token()
# Now the access token should be set along with the profile ID.
assert token_manager.request_payload.get('access_token') == ACCESS_TOKEN
assert token_manager.request_payload.get('profile_id') == MY_PROFILE_ID
assert token_manager.request_payload.get('profile_crn') is None
assert token_manager.request_payload.get('profile_name') is None
assert token_manager.request_payload.get('account_id') is None

# The final result should be the other access token, which belong to the "assume" request.
assert access_token == OTHER_ACCESS_TOKEN

0 comments on commit 2d1548c

Please sign in to comment.