From 2d1548c37b7cbedcf8213096c6891883748a1c42 Mon Sep 17 00:00:00 2001 From: Norbert Biczo Date: Wed, 9 Oct 2024 15:07:44 +0200 Subject: [PATCH] chore: iam assume token manager test improvements Signed-off-by: Norbert Biczo --- test/test_iam_assume_token_manager.py | 129 ++++++++++++++------------ 1 file changed, 70 insertions(+), 59 deletions(-) diff --git a/test/test_iam_assume_token_manager.py b/test/test_iam_assume_token_manager.py index 6489436..740c313 100644 --- a/test/test_iam_assume_token_manager.py +++ b/test/test_iam_assume_token_manager.py @@ -18,6 +18,7 @@ import json import logging import time +import urllib import jwt import responses @@ -26,12 +27,6 @@ from .utils.logger_utils import setup_test_logger -MY_PROFILE_ID = 'my-profile-id' -MY_PROFILE_CRN = 'my-profile-crn' -MY_PROFILE_NAME = 'my-profile-name' -MY_ACCOUNT_ID = 'my-account_id' - - setup_test_logger(logging.ERROR) @@ -39,6 +34,14 @@ def _get_current_time() -> int: return int(time.time()) +IAM_URL = "https://iam.cloud.ibm.com/identity/token" +MY_PROFILE_ID = 'my-profile-id' +MY_PROFILE_CRN = 'my-profile-crn' +MY_PROFILE_NAME = 'my-profile-name' +MY_ACCOUNT_ID = 'my-account_id' + + +# The base layout of an access token. ACCESS_TOKEN_LAYOUT = { "username": "dummy", "role": "Admin", @@ -50,33 +53,53 @@ def _get_current_time() -> int: "iat": _get_current_time(), "exp": _get_current_time() + 3600, } - +# Create two different access tokens by using different secrets for the encoding. ACCESS_TOKEN = jwt.encode( ACCESS_TOKEN_LAYOUT, 'secret', algorithm='HS256', headers={'kid': '230498151c214b788dd97f22b85410a5'} ) +OTHER_ACCESS_TOKEN = jwt.encode( + ACCESS_TOKEN_LAYOUT, 'other_secret', algorithm='HS256', headers={'kid': '230498151c214b788dd97f22b85410a5'} +) +# Create a base response and serialize it to a JSON string to avoid doing that in each test case. +BASE_RESPONSE = { + "access_token": ACCESS_TOKEN, + "token_type": "Bearer", + "expires_in": 3600, + "expiration": _get_current_time() + 3600, + "refresh_token": "not_available", +} +BASE_RESPONSE_JSON = json.dumps(BASE_RESPONSE) +# Create a second base response just like we did above, but use the other access token. +OTHER_BASE_RESPONSE = BASE_RESPONSE.copy() +OTHER_BASE_RESPONSE['access_token'] = OTHER_ACCESS_TOKEN +OTHER_BASE_RESPONSE_JSON = json.dumps(OTHER_BASE_RESPONSE) + + +def request_callback(request): + """Parse the form data, and return a response based on the `grant_type` value.""" + form_data = urllib.parse.unquote(request.body) + params = dict(param.split('=') for param in form_data.split('&')) + if params.get('grant_type') == 'urn:ibm:params:oauth:grant-type:apikey': + return (200, {}, BASE_RESPONSE_JSON) + if params.get('grant_type') == 'urn:ibm:params:oauth:grant-type:assume': + return (200, {}, OTHER_BASE_RESPONSE_JSON) + else: + raise Exception("Invalid request") @responses.activate def test_request_token_with_profile_id(): - iam_url = "https://iam.cloud.ibm.com/identity/token" - response = { - "access_token": ACCESS_TOKEN, - "token_type": "Bearer", - "expires_in": 3600, - "expiration": _get_current_time() + 3600, - "refresh_token": "jy4gl91BQ", - } - responses.add(responses.POST, url=iam_url, body=json.dumps(response), status=200) + responses.add_callback(responses.POST, url=IAM_URL, callback=request_callback) token_manager = IAMAssumeTokenManager("apikey", iam_profile_id=MY_PROFILE_ID) # Make sure we don't have an access token yet. - assert token_manager.request_payload.get('access_token', None) is None + assert token_manager.request_payload.get('access_token') is None token_manager.request_token() # Now the access token should be set along with the profile ID. - assert token_manager.request_payload.get('access_token') is not None + assert token_manager.request_payload.get('access_token') == ACCESS_TOKEN assert token_manager.request_payload.get('profile_id') == MY_PROFILE_ID assert token_manager.request_payload.get('profile_crn') is None assert token_manager.request_payload.get('profile_name') is None @@ -85,25 +108,17 @@ def test_request_token_with_profile_id(): @responses.activate def test_request_token_with_profile_crn(): - iam_url = "https://iam.cloud.ibm.com/identity/token" - response = { - "access_token": ACCESS_TOKEN, - "token_type": "Bearer", - "expires_in": 3600, - "expiration": _get_current_time() + 3600, - "refresh_token": "jy4gl91BQ", - } - responses.add(responses.POST, url=iam_url, body=json.dumps(response), status=200) + responses.add_callback(responses.POST, url=IAM_URL, callback=request_callback) token_manager = IAMAssumeTokenManager("apikey", iam_profile_crn=MY_PROFILE_CRN) # Make sure we don't have an access token yet. - assert token_manager.request_payload.get('access_token', None) is None + assert token_manager.request_payload.get('access_token') is None token_manager.request_token() # Now the access token should be set along with the profile ID. - assert token_manager.request_payload.get('access_token') is not None + assert token_manager.request_payload.get('access_token') == ACCESS_TOKEN assert token_manager.request_payload.get('profile_id') is None assert token_manager.request_payload.get('profile_crn') == MY_PROFILE_CRN assert token_manager.request_payload.get('profile_name') is None @@ -112,25 +127,17 @@ def test_request_token_with_profile_crn(): @responses.activate def test_request_token_with_profile_name_and_account_id(): - iam_url = "https://iam.cloud.ibm.com/identity/token" - response = { - "access_token": ACCESS_TOKEN, - "token_type": "Bearer", - "expires_in": 3600, - "expiration": _get_current_time() + 3600, - "refresh_token": "jy4gl91BQ", - } - responses.add(responses.POST, url=iam_url, body=json.dumps(response), status=200) + responses.add_callback(responses.POST, url=IAM_URL, callback=request_callback) token_manager = IAMAssumeTokenManager("apikey", iam_profile_name=MY_PROFILE_NAME, iam_account_id=MY_ACCOUNT_ID) # Make sure we don't have an access token yet. - assert token_manager.request_payload.get('access_token', None) is None + assert token_manager.request_payload.get('access_token') is None token_manager.request_token() # Now the access token should be set along with the profile ID. - assert token_manager.request_payload.get('access_token') is not None + assert token_manager.request_payload.get('access_token') == ACCESS_TOKEN assert token_manager.request_payload.get('profile_id') is None assert token_manager.request_payload.get('profile_crn') is None assert token_manager.request_payload.get('profile_name') == MY_PROFILE_NAME @@ -140,15 +147,7 @@ def test_request_token_with_profile_name_and_account_id(): @responses.activate def test_request_token_uses_the_correct_grant_types(): iam_url = "https://iam.cloud.ibm.com/identity/token" - response = { - "access_token": ACCESS_TOKEN, - "token_type": "Bearer", - "expires_in": 3600, - "expiration": _get_current_time() + 3600, - "refresh_token": "jy4gl91BQ", - } - response_json = json.dumps(response) - responses.add(responses.POST, url=iam_url, body=response_json, status=200) + responses.add(responses.POST, url=iam_url, body=BASE_RESPONSE_JSON, status=200) token_manager = IAMAssumeTokenManager("apikey", iam_profile_id='my_profile_id') token_manager.request_token() @@ -159,16 +158,7 @@ def test_request_token_uses_the_correct_grant_types(): @responses.activate def test_request_token_uses_the_correct_headers(): - iam_url = "https://iam.cloud.ibm.com/identity/token" - response = { - "access_token": ACCESS_TOKEN, - "token_type": "Bearer", - "expires_in": 3600, - "expiration": _get_current_time() + 3600, - "refresh_token": "jy4gl91BQ", - } - response_json = json.dumps(response) - responses.add(responses.POST, url=iam_url, body=response_json, status=200) + responses.add_callback(responses.POST, url=IAM_URL, callback=request_callback) token_manager = IAMAssumeTokenManager("apikey", iam_profile_id='my_profile_id') token_manager.request_token() @@ -178,3 +168,24 @@ def test_request_token_uses_the_correct_headers(): assert ( responses.calls[1].request.headers.get('User-Agent').startswith('ibm-python-sdk-core/iam-assume-authenticator') ) + + +@responses.activate +def test_get_token(): + responses.add_callback(responses.POST, url=IAM_URL, callback=request_callback) + + token_manager = IAMAssumeTokenManager("apikey", iam_profile_id=MY_PROFILE_ID) + + # Make sure we don't have an access token yet. + assert token_manager.request_payload.get('access_token') is None + + access_token = token_manager.get_token() + # Now the access token should be set along with the profile ID. + assert token_manager.request_payload.get('access_token') == ACCESS_TOKEN + assert token_manager.request_payload.get('profile_id') == MY_PROFILE_ID + assert token_manager.request_payload.get('profile_crn') is None + assert token_manager.request_payload.get('profile_name') is None + assert token_manager.request_payload.get('account_id') is None + + # The final result should be the other access token, which belong to the "assume" request. + assert access_token == OTHER_ACCESS_TOKEN