diff --git a/ibm_cloud_sdk_core/__init__.py b/ibm_cloud_sdk_core/__init__.py index 0361987..7f99a49 100644 --- a/ibm_cloud_sdk_core/__init__.py +++ b/ibm_cloud_sdk_core/__init__.py @@ -17,6 +17,6 @@ from .detailed_response import DetailedResponse from .iam_token_manager import IAMTokenManager from .jwt_token_manager import JWTTokenManager -from .icp4d_token_manager import ICP4DTokenManager +from .cp4d_token_manager import CP4DTokenManager from .api_exception import ApiException from .utils import datetime_to_string, string_to_datetime diff --git a/ibm_cloud_sdk_core/authenticators/__init__.py b/ibm_cloud_sdk_core/authenticators/__init__.py new file mode 100644 index 0000000..59d5013 --- /dev/null +++ b/ibm_cloud_sdk_core/authenticators/__init__.py @@ -0,0 +1,22 @@ +# coding: utf-8 + +# Copyright 2019 IBM All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .authenticator import Authenticator +from .basic_authenticator import BasicAuthenticator +from .bearer_authenticator import BearerAuthenticator +from .cp4d_authenticator import CP4DAuthenticator +from .iam_authenticator import IAMAuthenticator +from .no_auth_authenticator import NoAuthAuthenticator diff --git a/ibm_cloud_sdk_core/authenticators/authenticator.py b/ibm_cloud_sdk_core/authenticators/authenticator.py new file mode 100644 index 0000000..162f3f2 --- /dev/null +++ b/ibm_cloud_sdk_core/authenticators/authenticator.py @@ -0,0 +1,46 @@ +# coding: utf-8 + +# Copyright 2019 IBM All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABC, abstractmethod + +class Authenticator(ABC): + @abstractmethod + def authenticate(self): + """ + Returns the (username, password) or bearer + """ + pass + + @abstractmethod + def validate(self): + """ + Checks if all the inputs needed are present + """ + pass + + @abstractmethod + def _is_basic_authentication(self): + """ + Return true if basic authentication + """ + pass + + @abstractmethod + def _is_bearer_authentication(self): + """ + Return true if bearer authentication + """ + pass diff --git a/ibm_cloud_sdk_core/authenticators/basic_authenticator.py b/ibm_cloud_sdk_core/authenticators/basic_authenticator.py new file mode 100644 index 0000000..196deb2 --- /dev/null +++ b/ibm_cloud_sdk_core/authenticators/basic_authenticator.py @@ -0,0 +1,78 @@ +# coding: utf-8 + +# Copyright 2019 IBM All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .authenticator import Authenticator +from ..utils import has_bad_first_or_last_char + + +class BasicAuthenticator(Authenticator): + authentication_type = 'basic' + + def __init__(self, username, password): + """ + :attr str username: The username + :attr str password: The password + """ + self.username = username + self.password = password + self.validate() + + def validate(self): + """ + Performs validation on input params + """ + if self.username is None or self.password is None: + raise ValueError('The username and password shouldn\'t be None.') + + if has_bad_first_or_last_char( + self.username) or has_bad_first_or_last_char(self.password): + raise ValueError( + 'The username and password shouldn\'t start or end with curly brackets or quotes. ' + 'Please remove any surrounding {, }, or \" characters.') + + def authenticate(self): + """ + Returns the username and password tuple + """ + return (self.username, self.password) + + def set_username(self, username): + """ + Sets the username + """ + self.username = username + self.validate() + + def set_password(self, password): + """ + Sets the password + """ + self.password = password + self.validate() + + def set_username_and_password(self, username, password): + """ + Sets the username and password + """ + self.username = username + self.password = password + self.validate() + + def _is_basic_authentication(self): + return True + + def _is_bearer_authentication(self): + return False diff --git a/ibm_cloud_sdk_core/authenticators/bearer_authenticator.py b/ibm_cloud_sdk_core/authenticators/bearer_authenticator.py new file mode 100644 index 0000000..56818b4 --- /dev/null +++ b/ibm_cloud_sdk_core/authenticators/bearer_authenticator.py @@ -0,0 +1,59 @@ +# coding: utf-8 + +# Copyright 2019 IBM All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .authenticator import Authenticator +from ..utils import has_bad_first_or_last_char + + +class BearerAuthenticator(Authenticator): + authentication_type = 'bearerToken' + + def __init__(self, bearer_token): + """ + :attr str bearer_token: User managed bearer token + """ + self.bearer_token = bearer_token + self.validate() + + def validate(self): + """ + Performs validation on input params + """ + if self.bearer_token is None: + raise ValueError('The bearer token shouldn\'t be None.') + + if has_bad_first_or_last_char(self.bearer_token): + raise ValueError( + 'The bearer token shouldn\'t start or end with curly brackets or quotes. ' + 'Please remove any surrounding {, }, or \" characters.') + + def authenticate(self): + """ + Returns the bearer token + """ + return 'Bearer {0}'.format(self.bearer_token) + + def set_bearer_token(self, bearer_token): + """ + Sets the bearer token + """ + self.bearer_token = bearer_token + + def _is_basic_authentication(self): + return False + + def _is_bearer_authentication(self): + return True diff --git a/ibm_cloud_sdk_core/authenticators/cp4d_authenticator.py b/ibm_cloud_sdk_core/authenticators/cp4d_authenticator.py new file mode 100644 index 0000000..b8e38cc --- /dev/null +++ b/ibm_cloud_sdk_core/authenticators/cp4d_authenticator.py @@ -0,0 +1,112 @@ +# coding: utf-8 + +# Copyright 2019 IBM All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .authenticator import Authenticator +from ..cp4d_token_manager import CP4DTokenManager +from ..utils import has_bad_first_or_last_char + + +class CP4DAuthenticator(Authenticator): + authentication_type = 'cp4d' + + def __init__(self, + username, + password, + url, + disable_ssl_verification=False, + headers=None, + proxies=None): + """ + :attr str username: The username + :attr str password: The password + :attr str url: The url for authentication + :attr bool disable_ssl_verification: enables/ disabled ssl verification + :attr dict headers: user-defined headers + :attr dict proxies: user-defined proxies + """ + self.token_manager = CP4DTokenManager( + username, password, url, disable_ssl_verification, headers, proxies) + self.validate() + + def validate(self): + """ + Performs validation on input params + """ + if self.token_manager.username is None or self.token_manager.password is None: + raise ValueError('The username and password shouldn\'t be None.') + + if has_bad_first_or_last_char( + self.token_manager.username) or has_bad_first_or_last_char(self.token_manager.password): + raise ValueError( + 'The username and password shouldn\'t start or end with curly brackets or quotes. ' + 'Please remove any surrounding {, }, or \" characters.') + + if has_bad_first_or_last_char(self.token_manager.url): + raise ValueError( + 'The url shouldn\'t start or end with curly brackets or quotes. ' + 'Please remove any surrounding {, }, or \" characters.') + + def authenticate(self): + """ + Returns the bearer token + """ + bearer_token = self.token_manager.get_token() + return 'Bearer {0}'.format(bearer_token) + + def _is_basic_authentication(self): + return False + + def _is_bearer_authentication(self): + return True + + def set_username(self, username): + """ + Sets the username + """ + self.token_manager.set_username(username) + self.validate() + + def set_password(self, password): + """ + Sets the password + """ + self.token_manager.set_password(password) + self.validate() + + def set_url(self, url): + """ + Sets the url + """ + self.token_manager.set_url(url) + self.validate() + + def set_disable_ssl_verification(self, status=False): + """ + Sets the ssl verification to enabled or disabled + """ + self.token_manager.set_disable_ssl_verification(status) + + def set_headers(self, headers): + """ + Sets user-defined headers + """ + self.token_manager.set_headers(headers) + + def set_proxies(self, proxies): + """ + Sets the proxies + """ + self.token_manager.set_proxies(proxies) diff --git a/ibm_cloud_sdk_core/authenticators/iam_authenticator.py b/ibm_cloud_sdk_core/authenticators/iam_authenticator.py new file mode 100644 index 0000000..33862ef --- /dev/null +++ b/ibm_cloud_sdk_core/authenticators/iam_authenticator.py @@ -0,0 +1,120 @@ +# coding: utf-8 + +# Copyright 2019 IBM All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .authenticator import Authenticator +from ..iam_token_manager import IAMTokenManager +from ..utils import has_bad_first_or_last_char + + +class IAMAuthenticator(Authenticator): + authentication_type = 'iam' + + def __init__(self, + apikey, + url=None, + client_id=None, + client_secret=None, + disable_ssl_verification=False, + headers=None, + proxies=None): + """ + :attr str apikey: The apikey + :attr str url: The url for authentication + :attr str client_id: The client id for rate limiting + :attr str client_secret: The client secret for rate limiting + :attr bool disable_ssl_verification: enables/ disabled ssl verification + :attr dict headers: user-defined headers + :attr dict proxies: user-defined proxies + """ + self.token_manager = IAMTokenManager( + apikey, url, client_id, client_secret, disable_ssl_verification, + headers, proxies) + self.validate() + + def validate(self): + """ + Performs validation on input params + """ + if self.token_manager.apikey is None: + raise ValueError('The apikey shouldn\'t be None.') + + if has_bad_first_or_last_char(self.token_manager.apikey): + raise ValueError( + 'The apikey shouldn\'t start or end with curly brackets or quotes. ' + 'Please remove any surrounding {, }, or \" characters.') + + if (self.token_manager.client_id and + not self.token_manager.client_secret) or ( + not self.token_manager.client_id and + self.token_manager.client_secret): + raise ValueError( + 'Both client id and client secret should be initialized.') + + def authenticate(self): + """ + Returns the bearer token + """ + bearer_token = self.token_manager.get_token() + return 'Bearer {0}'.format(bearer_token) + + def _is_basic_authentication(self): + return False + + def _is_bearer_authentication(self): + return True + + def set_apikey(self, apikey): + """ + Set the IAM api key + """ + self.token_manager.set_apikey(apikey) + self.validate() + + def set_url(self, url): + """ + Set the IAM url + """ + self.token_manager.set_url(url) + + def set_authorization_info(self, client_id, client_secret): + """ + Set the IAM authorization information. + This consists of the client_id and secret. + These values are used to form the basic authorization header that + is used when interacting with the IAM token server. + If these values are not supplied, then a default Authorization header + is used. + """ + self.token_manager.set_authorization_info(client_id, client_secret) + self.validate() + + def set_disable_ssl_verification(self, status=False): + """ + Sets the ssl verification to enabled or disabled + """ + self.token_manager.set_disable_ssl_verification(status) + + def set_headers(self, headers): + """ + Sets user-defined headers + """ + self.token_manager.set_headers(headers) + + def set_proxies(self, proxies): + """ + Sets the proxies + """ + self.token_manager.set_proxies(proxies) diff --git a/ibm_cloud_sdk_core/authenticators/no_auth_authenticator.py b/ibm_cloud_sdk_core/authenticators/no_auth_authenticator.py new file mode 100644 index 0000000..38d184d --- /dev/null +++ b/ibm_cloud_sdk_core/authenticators/no_auth_authenticator.py @@ -0,0 +1,32 @@ +# coding: utf-8 + +# Copyright 2019 IBM All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .authenticator import Authenticator + +class NoAuthAuthenticator(Authenticator): + authentication_type = 'noauth' + + def validate(self): + pass + + def _is_basic_authentication(self): + return False + + def _is_bearer_authentication(self): + return False + + def authenticate(self): + pass diff --git a/ibm_cloud_sdk_core/base_service.py b/ibm_cloud_sdk_core/base_service.py index b384af6..1aee67a 100644 --- a/ibm_cloud_sdk_core/base_service.py +++ b/ibm_cloud_sdk_core/base_service.py @@ -23,134 +23,72 @@ from requests.structures import CaseInsensitiveDict from .version import __version__ from .utils import has_bad_first_or_last_char, remove_null_values, cleanup_values -from .iam_token_manager import IAMTokenManager -from .icp4d_token_manager import ICP4DTokenManager from .detailed_response import DetailedResponse from .api_exception import ApiException - -try: - from http.cookiejar import CookieJar # Python 3 -except ImportError: - from cookielib import CookieJar # Python 2 +from .authenticators import Authenticator, BasicAuthenticator, BearerAuthenticator, CP4DAuthenticator, IAMAuthenticator, NoAuthAuthenticator +from http.cookiejar import CookieJar # Uncomment this to enable http debugging -# try: -# import http.client as http_client -# except ImportError: -# # Python 2 -# import httplib as http_client +# import http.client as http_client # http_client.HTTPConnection.debuglevel = 1 + class BaseService(object): - BEARER = 'Bearer' - ICP_PREFIX = 'icp-' - APIKEY = 'apikey' - IAM_ACCESS_TOKEN = 'iam_access_token' - ICP4D_ACCESS_TOKEN = 'icp4d_access_token' - URL = 'url' - USERNAME = 'username' - PASSWORD = 'password' - IAM_APIKEY = 'iam_apikey' - IAM_URL = 'iam_url' - ICP4D_URL = 'icp4d_url' - APIKEY_DEPRECATION_MESSAGE = 'Authenticating with apikey is deprecated. Move to using Identity and Access Management (IAM) authentication.' DEFAULT_CREDENTIALS_FILE_NAME = 'ibm-credentials.env' SDK_NAME = 'ibm-python-sdk-core' - def __init__(self, vcap_services_name, url, username=None, password=None, - use_vcap_services=True, api_key=None, iam_apikey=None, iam_url=None, - iam_access_token=None, iam_client_id=None, iam_client_secret=None, - display_name=None, icp4d_access_token=None, icp4d_url=None, authentication_type=None): + def __init__(self, + vcap_services_name, + url, + authenticator=None, + disable_ssl_verification=False, + display_name=None): """ It loads credentials with the following preference: - 1) Credentials explicitly set in the request + 1) Credentials explicitly set in the authenticator 2) Credentials loaded from credentials file if present - 3) Credentials loaded from VCAP_SERVICES environment variable if available and use_vcap_services is True + 3) Credentials loaded from VCAP_SERVICES environment variable + + :attr str vcap_services_name: The vcap service name + :attr str url: The url for service api calls + :attr bool disable_ssl_verification: enables/ disabled ssl verification + :attr str display_name the name used for mapping services in credential file """ self.url = url self.http_config = {} - self.authentication_type = authentication_type.lower() if authentication_type else None self.jar = CookieJar() - self.api_key = api_key - self.username = username - self.password = password - self.iam_apikey = iam_apikey - self.iam_access_token = iam_access_token - self.iam_url = iam_url - self.iam_client_id = iam_client_id - self.iam_client_secret = iam_client_secret - self.icp4d_access_token = icp4d_access_token - self.icp4d_url = icp4d_url - self.token_manager = None + self.authenticator = authenticator + self.disable_ssl_verification = disable_ssl_verification self.default_headers = None - self.verify = None # Indicates whether to ignore verifying the SSL certification - self._check_credentials() - - self.set_user_agent_header(self.build_user_agent()) + self._set_user_agent_header(self._build_user_agent()) # 1. Credentials are passed in constructor - if self.authentication_type == 'iam' or self._has_iam_credentials(self.iam_apikey, self.iam_access_token) or self._has_iam_credentials(self.api_key, self.iam_access_token): - self.token_manager = IAMTokenManager(self.iam_apikey or self.api_key or self.password, - self.iam_access_token, - self.iam_url, - self.iam_client_id, - self.iam_client_secret) - self.iam_apikey = self.iam_apikey or self.api_key or self.password - elif self._uses_basic_for_iam(self.username, self.password): - self.token_manager = IAMTokenManager(self.password, - self.iam_access_token, - self.iam_url, - self.iam_client_id, - self.iam_client_secret) - self.iam_apikey = self.password - self.username = None - self.password = None - elif self._is_for_icp4d(self.authentication_type, self.icp4d_access_token): - if self.icp4d_access_token is None and self.icp4d_url is None: - raise Exception('The icp4d_url is mandatory for ICP4D.') - self.token_manager = ICP4DTokenManager(self.icp4d_url, - self.username, - self.password, - self.icp4d_access_token) - elif self._is_for_icp(self.api_key) or self._is_for_icp(self.iam_apikey): - self.username = self.APIKEY - self.password = self.api_key or self.iam_apikey - elif self.token_manager is None and self._has_basic_credentials(username, password): - self.username = username - self.password = password + if self.authenticator: + if not isinstance(self.authenticator, Authenticator): + raise ValueError( + 'authenticator should be of type Authenticator') # 2. Credentials from credential file - if display_name and not self.username and not self.token_manager: + if display_name and not self.authenticator: service_name = display_name.replace(' ', '_').lower() self._load_from_credential_file(service_name) # 3. Credentials from VCAP - if use_vcap_services and not self.username and not self.token_manager: - self.vcap_service_credentials = self._load_from_vcap_services( + if not self.authenticator: + vcap_service_credentials = self._load_from_vcap_services( vcap_services_name) - if self.vcap_service_credentials is not None and isinstance( - self.vcap_service_credentials, dict): - self.url = self.vcap_service_credentials[self.URL] - if self.USERNAME in self.vcap_service_credentials: - self.username = self.vcap_service_credentials.get(self.USERNAME) - if self.PASSWORD in self.vcap_service_credentials: - self.password = self.vcap_service_credentials.get(self.PASSWORD) - if self.APIKEY in self.vcap_service_credentials: - self.set_iam_apikey(self.vcap_service_credentials.get(self.APIKEY)) - if self.IAM_APIKEY in self.vcap_service_credentials: - self.set_iam_apikey(self.vcap_service_credentials.get(self.IAM_APIKEY)) - if self.IAM_ACCESS_TOKEN in self.vcap_service_credentials: - self.set_iam_access_token(self.vcap_service_credentials.get(self.IAM_ACCESS_TOKEN)) - if self.ICP4D_URL in self.vcap_service_credentials: - self.icp4d_url = self.vcap_service_credentials.get(self.ICP4D_URL) - if self.ICP4D_ACCESS_TOKEN in self.vcap_service_credentials: - self.set_icp4d_access_token(self.vcap_service_credentials.get(self.ICP4D_ACCESS_TOKEN)) - - if (self.username is None or self.password is None) and self.token_manager is None: - raise ValueError( - 'You must specify your IAM api key or username and password service ' - 'credentials (Note: these are different from your IBM Cloud id)') + if vcap_service_credentials is not None and isinstance( + vcap_service_credentials, dict): + if vcap_service_credentials.get('username') and vcap_service_credentials.get('password'): # cf + vcap_service_credentials['auth_type'] = 'basic' + elif vcap_service_credentials.get('apikey'): # rc + vcap_service_credentials['auth_type'] = 'iam' + self._set_authenticator_properties(vcap_service_credentials) + self._set_service_properties(vcap_service_credentials) + + if not self.authenticator: + self.authenticator = NoAuthAuthenticator() def _load_from_credential_file(self, service_name, separator='='): """ @@ -164,37 +102,66 @@ def _load_from_credential_file(self, service_name, separator='='): # Home directory if credential_file_path is None: - file_path = join(expanduser('~'), self.DEFAULT_CREDENTIALS_FILE_NAME) + file_path = join( + expanduser('~'), self.DEFAULT_CREDENTIALS_FILE_NAME) if isfile(file_path): credential_file_path = file_path # Top-level of the project directory if credential_file_path is None: - file_path = join(dirname(dirname(abspath(__file__))), self.DEFAULT_CREDENTIALS_FILE_NAME) + file_path = join( + dirname(dirname(abspath(__file__))), + self.DEFAULT_CREDENTIALS_FILE_NAME) if isfile(file_path): credential_file_path = file_path + properties = {} if credential_file_path is not None: with open(credential_file_path, 'r') as fp: for line in fp: key_val = line.strip().split(separator) if len(key_val) == 2: - self._set_credential_based_on_type(service_name, key_val[0].lower(), key_val[1]) - - def _set_credential_based_on_type(self, service_name, key, value): - if service_name in key: - if self.APIKEY in key: - self.set_iam_apikey(value) - elif self.URL in key: - self.set_url(value) - elif self.USERNAME in key: - self.username = value - elif self.PASSWORD in key: - self.password = value - elif self.IAM_APIKEY in key: - self.set_iam_apikey(value) - elif self.IAM_URL in key: - self.set_iam_url(value) + key = key_val[0].lower() + value = key_val[1] + if service_name in key: + index = key.find('_') + if index != -1: + properties[key[index + 1:]] = value + + if properties: + self._set_authenticator_properties(properties) + self._set_service_properties(properties) + + def _set_authenticator_properties(self, properties): + auth_type = properties.get('auth_type') + if auth_type == 'basic': + self.authenticator = BasicAuthenticator( + username=properties.get('username'), + password=properties.get('password')) + elif auth_type == 'bearerToken': + self.authenticator = BearerAuthenticator( + bearer_token=properties.get('bearer_token')) + elif auth_type == 'cp4d': + self.authenticator = CP4DAuthenticator( + username=properties.get('username'), + password=properties.get('password'), + url=properties.get('auth_url'), + disable_ssl_verification=properties.get('auth_disable_ssl')) + elif auth_type == 'iam': + self.authenticator = IAMAuthenticator( + apikey=properties.get('apikey'), + url=properties.get('auth_url'), + client_id=properties.get('client_id'), + client_secret=properties.get('client_secret'), + disable_ssl_verification=properties.get('auth_disable_ssl')) + elif auth_type == 'noauth': + self.authenticator = NoAuthAuthenticator() + + def _set_service_properties(self, properties): + if 'url' in properties: + self.url = properties.get('url') + if 'disable_ssl' in properties: + self.disable_ssl_verification = properties.get('disable_ssl') def _load_from_vcap_services(self, service_name): vcap_services = os.getenv('VCAP_SERVICES') @@ -205,101 +172,51 @@ def _load_from_vcap_services(self, service_name): else: return None - def _is_for_icp(self, credential=None): - return credential and credential.startswith(self.ICP_PREFIX) - - def _is_for_icp4d(self, authentication_type, icp4d_access_token=None): - return authentication_type == 'icp4d' or icp4d_access_token + def _get_system_info(self): + return '{0} {1} {2}'.format( + platform.system(), # OS + platform.release(), # OS version + platform.python_version()) # Python version - def _has_basic_credentials(self, username, password): - return username and password and not self._uses_basic_for_iam(username, password) + def _build_user_agent(self): + return '{0}-{1} {2}'.format(self.SDK_NAME, __version__, + self._get_system_info()) - def _has_iam_credentials(self, iam_apikey, iam_access_token): - return (iam_apikey or iam_access_token) and not self._is_for_icp(iam_apikey) + def _set_user_agent_header(self, user_agent_string=None): + self.user_agent_header = {'User-Agent': user_agent_string} - def _uses_basic_for_iam(self, username, password): + def set_http_config(self, http_config): """ - Returns true if the user provides basic auth creds with the intention - of using IAM auth + Sets the http client config like timeout, proxies, etc. """ - return username and password and username == self.APIKEY and not self._is_for_icp(password) - - def _has_bad_first_or_last_char(self, str): - return str is not None and (str.startswith('{') or str.startswith('"') or str.endswith('}') or str.endswith('"')) - - def _check_credentials(self): - credentials_to_check = { - 'URL': self.url, - 'username': self.username, - 'password': self.password, - 'credentials': self.iam_apikey - } - - for key in credentials_to_check: - if self._has_bad_first_or_last_char(credentials_to_check.get(key)): - raise ValueError('The ' + key + ' shouldn\'t start or end with curly brackets or quotes. ' - 'Be sure to remove any {} and \" characters surrounding your ' + key) - - def disable_SSL_verification(self): - self.verify = False - if self.token_manager is not None: - self.token_manager.disable_SSL_verification(True) - - def set_username_and_password(self, username, password): - if has_bad_first_or_last_char(username): - raise ValueError('The username shouldn\'t start or end with curly brackets or quotes. ' - 'Be sure to remove any {} and \" characters surrounding your username') - if has_bad_first_or_last_char(password): - raise ValueError('The password shouldn\'t start or end with curly brackets or quotes. ' - 'Be sure to remove any {} and \" characters surrounding your password') - - self.username = username - self.password = password - self.jar = CookieJar() - - def set_iam_access_token(self, iam_access_token): - if self.token_manager: - self.token_manager.set_access_token(iam_access_token) - else: - self.token_manager = IAMTokenManager(iam_access_token=iam_access_token) - self.iam_access_token = iam_access_token - self.jar = CookieJar() - - def set_icp4d_access_token(self, icp4d_access_token): - if self.token_manager: - self.token_manager.set_access_token(icp4d_access_token) - else: - if self.icp4d_url is None: - raise Exception('The icp4d_url is mandatory for ICP4D.') - self.token_manager = ICP4DTokenManager(self.icp4d_url, access_token=icp4d_access_token) - self.icp4d_access_token = icp4d_access_token - self.jar = CookieJar() - - def set_iam_url(self, iam_url): - if self.token_manager: - self.token_manager.set_iam_url(iam_url) + if isinstance(http_config, dict): + self.http_config = http_config else: - self.token_manager = IAMTokenManager(iam_url=iam_url) - self.iam_url = iam_url - self.jar = CookieJar() + raise TypeError("http_config parameter must be a dictionary") - def set_iam_apikey(self, iam_apikey): - if has_bad_first_or_last_char(iam_apikey): - raise ValueError('The credentials shouldn\'t start or end with curly brackets or quotes. ' - 'Be sure to remove any {} and \" characters surrounding your credentials') - if self.token_manager: - self.token_manager.set_iam_apikey(iam_apikey) - else: - self.token_manager = IAMTokenManager(iam_apikey=iam_apikey) - self.iam_apikey = iam_apikey - self.jar = CookieJar() + def set_disable_ssl_verification(self, status=False): + """ + Sets the ssl verification to enabled or disabled + """ + self.disable_ssl_verification = status def set_url(self, url): + """ + Sets the url + """ if has_bad_first_or_last_char(url): - raise ValueError('The URL shouldn\'t start or end with curly brackets or quotes. ' - 'Be sure to remove any {} and \" characters surrounding your URL') + raise ValueError( + 'The url shouldn\'t start or end with curly brackets or quotes. ' + 'Be sure to remove any {} and \" characters surrounding your url' + ) self.url = url + def get_authenticator(self): + """ + Returns the authenticator + """ + return self.authenticator + def set_default_headers(self, headers): """ Set http headers to be sent in every request. @@ -310,31 +227,16 @@ def set_default_headers(self, headers): else: raise TypeError("headers parameter must be a dictionary") - def get_system_info(self): - return '{0} {1} {2}'.format(platform.system(), # OS - platform.release(), # OS version - platform.python_version()) # Python version - - def build_user_agent(self): - return '{0}-{1} {2}'.format(self.SDK_NAME, __version__, self.get_system_info()) - - def get_user_agent_header(self): - return self.user_agent_header - - def set_user_agent_header(self, user_agent_string=None): - self.user_agent_header = {'User-Agent': user_agent_string} - - def set_http_config(self, http_config): - """ - Sets the http client config like timeout, proxies, etc. - """ - if isinstance(http_config, dict): - self.http_config = http_config - else: - raise TypeError("http_config parameter must be a dictionary") - - def request(self, method, url, accept_json=False, headers=None, - params=None, json=None, data=None, files=None, **kwargs): + def request(self, + method, + url, + accept_json=False, + headers=None, + params=None, + json=None, + data=None, + files=None, + **kwargs): full_url = self.url + url headers = remove_null_values(headers) if headers else {} @@ -365,52 +267,62 @@ def request(self, method, url, accept_json=False, headers=None, headers.update({'content-type': 'application/json'}) auth = None - if self.token_manager: - access_token = self.token_manager.get_token() - headers['Authorization'] = '{0} {1}'.format(self.BEARER, access_token) - elif self.username and self.password: - auth = (self.username, self.password) + if self.authenticator._is_bearer_authentication(): + headers['Authorization'] = self.authenticator.authenticate() + elif self.authenticator._is_basic_authentication(): + auth = self.authenticator.authenticate() # Use a one minute timeout when our caller doesn't give a timeout. # http://docs.python-requests.org/en/master/user/quickstart/#timeouts kwargs = dict({"timeout": 60}, **kwargs) kwargs = dict(kwargs, **self.http_config) - if self.verify is not None: - kwargs['verify'] = self.verify + if self.disable_ssl_verification: + kwargs['verify'] = False if files is not None: for k, file_tuple in files.items(): - if file_tuple and len(file_tuple) == 3 and file_tuple[0] is None: + if file_tuple and len( + file_tuple) == 3 and file_tuple[0] is None: file = file_tuple[1] if file and hasattr(file, 'name'): filename = basename(file.name) files[k] = (filename, file_tuple[1], file_tuple[2]) - response = requests.request(method=method, url=full_url, - cookies=self.jar, auth=auth, - headers=headers, - params=params, data=data, files=files, - **kwargs) + response = requests.request( + method=method, + url=full_url, + cookies=self.jar, + auth=auth, + headers=headers, + params=params, + data=data, + files=files, + **kwargs) if 200 <= response.status_code <= 299: if response.status_code == 204 or method == 'HEAD': # There is no body content for a HEAD request or a 204 response - return DetailedResponse(None, response.headers, response.status_code) + return DetailedResponse(None, response.headers, + response.status_code) if accept_json: try: response_json = response.json() except: # deserialization fails because there is no text - return DetailedResponse(None, response.headers, response.status_code) - return DetailedResponse(response_json, response.headers, response.status_code) - return DetailedResponse(response, response.headers, response.status_code) + return DetailedResponse(None, response.headers, + response.status_code) + return DetailedResponse(response_json, response.headers, + response.status_code) + return DetailedResponse(response, response.headers, + response.status_code) else: error_message = None if response.status_code == 401: error_message = 'Unauthorized: Access is denied due to ' \ 'invalid credentials' - raise ApiException(response.status_code, error_message, http_response=response) + raise ApiException( + response.status_code, error_message, http_response=response) @staticmethod def _convert_model(val, classname=None): diff --git a/ibm_cloud_sdk_core/cp4d_token_manager.py b/ibm_cloud_sdk_core/cp4d_token_manager.py new file mode 100644 index 0000000..d95a20b --- /dev/null +++ b/ibm_cloud_sdk_core/cp4d_token_manager.py @@ -0,0 +1,94 @@ +# coding: utf-8 + +# Copyright 2019 IBM All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .jwt_token_manager import JWTTokenManager + + +class CP4DTokenManager(JWTTokenManager): + TOKEN_NAME = 'accessToken' + + def __init__(self, + username, + password, + url, + disable_ssl_verification=False, + headers=None, + proxies=None): + """ + :attr str username: The username + :attr str password: The password + :attr str url: The url for authentication + :attr bool disable_ssl_verification: enables/ disabled ssl verification + :attr dict headers: user-defined headers + :attr dict proxies: user-defined proxies + """ + self.username = username + self.password = password + url = url + '/v1/preauth/validateAuth' + self.headers = headers + self.proxies = proxies + super(CP4DTokenManager, self).__init__(url, disable_ssl_verification, + self.TOKEN_NAME) + + def request_token(self): + """ + Makes a request for a token + """ + auth_tuple = (self.username, self.password) + + response = self._request( + method='GET', + headers=self.headers, + url=self.url, + auth_tuple=auth_tuple, + proxies=self.proxies) + return response + + def set_username(self, username): + """ + Sets the username + """ + self.username = username + + def set_password(self, password): + """ + Sets the password + """ + self.password = password + + def set_url(self, url): + """ + Sets the url + """ + self.url = url + + def set_headers(self, headers): + """ + Sets user-defined headers + """ + if isinstance(headers, dict): + self.headers = headers + else: + raise TypeError('headers must be a dictionary') + + def set_proxies(self, proxies): + """ + Sets the proxies + """ + if isinstance(proxies, dict): + self.proxies = proxies + else: + raise TypeError('proxies must be a dictionary') diff --git a/ibm_cloud_sdk_core/iam_token_manager.py b/ibm_cloud_sdk_core/iam_token_manager.py index 40d1d9c..b00efa4 100644 --- a/ibm_cloud_sdk_core/iam_token_manager.py +++ b/ibm_cloud_sdk_core/iam_token_manager.py @@ -16,6 +16,7 @@ from .jwt_token_manager import JWTTokenManager + class IAMTokenManager(JWTTokenManager): DEFAULT_IAM_URL = 'https://iam.cloud.ibm.com/identity/token' CONTENT_TYPE = 'application/x-www-form-urlencoded' @@ -23,13 +24,31 @@ class IAMTokenManager(JWTTokenManager): REQUEST_TOKEN_RESPONSE_TYPE = 'cloud_iam' TOKEN_NAME = 'access_token' - def __init__(self, iam_apikey=None, iam_access_token=None, iam_url=None, - iam_client_id=None, iam_client_secret=None): - self.iam_apikey = iam_apikey - self.iam_url = iam_url if iam_url else self.DEFAULT_IAM_URL - self.iam_client_id = iam_client_id - self.iam_client_secret = iam_client_secret - super(IAMTokenManager, self).__init__(self.iam_url, iam_access_token, self.TOKEN_NAME) + def __init__(self, + apikey, + url=None, + client_id=None, + client_secret=None, + disable_ssl_verification=False, + headers=None, + proxies=None): + """ + :attr str apikey: The apikey + :attr str url: The url for authentication + :attr str client_id: The client id for rate limiting + :attr str client_secret: The client secret for rate limiting + :attr bool disable_ssl_verification: enables/ disabled ssl verification + :attr dict headers: user-defined headers + :attr dict proxies: user-defined proxies + """ + self.apikey = apikey + self.url = url if url else self.DEFAULT_IAM_URL + self.client_id = client_id + self.client_secret = client_secret + self.headers = headers + self.proxies = proxies + super(IAMTokenManager, self).__init__( + self.url, disable_ssl_verification, self.TOKEN_NAME) def request_token(self): """ @@ -39,9 +58,12 @@ def request_token(self): 'Content-type': self.CONTENT_TYPE, 'Accept': 'application/json' } + if self.headers is not None and isinstance(self.headers, dict): + headers.update(self.headers) + data = { 'grant_type': self.REQUEST_TOKEN_GRANT_TYPE, - 'apikey': self.iam_apikey, + 'apikey': self.apikey, 'response_type': self.REQUEST_TOKEN_RESPONSE_TYPE } @@ -49,30 +71,31 @@ def request_token(self): auth_tuple = ('bx', 'bx') # If both the clientId and secret were specified by the user, then use them - if self.iam_client_id and self.iam_client_secret: - auth_tuple = (self.iam_client_id, self.iam_client_secret) + if self.client_id and self.client_secret: + auth_tuple = (self.client_id, self.client_secret) response = self._request( method='POST', url=self.url, headers=headers, data=data, - auth_tuple=auth_tuple) + auth_tuple=auth_tuple, + proxies=self.proxies) return response - def set_iam_apikey(self, iam_apikey): + def set_apikey(self, apikey): """ - Set the IAM api key + Set the apikey """ - self.iam_apikey = iam_apikey + self.apikey = apikey - def set_iam_url(self, iam_url): + def set_url(self, url): """ Set the IAM url """ - self.iam_url = iam_url + self.url = url - def set_iam_authorization_info(self, iam_client_id, iam_client_secret): + def set_authorization_info(self, client_id, client_secret): """ Set the IAM authorization information. This consists of the client_id and secret. @@ -81,5 +104,23 @@ def set_iam_authorization_info(self, iam_client_id, iam_client_secret): If these values are not supplied, then a default Authorization header is used. """ - self.iam_client_id = iam_client_id - self.iam_client_secret = iam_client_secret + self.client_id = client_id + self.client_secret = client_secret + + def set_headers(self, headers): + """ + Sets user-defined headers + """ + if isinstance(headers, dict): + self.headers = headers + else: + raise TypeError('headers must be a dictionary') + + def set_proxies(self, proxies): + """ + Sets proxies + """ + if isinstance(proxies, dict): + self.proxies = proxies + else: + raise TypeError('proxies must be a dictionary') diff --git a/ibm_cloud_sdk_core/icp4d_token_manager.py b/ibm_cloud_sdk_core/icp4d_token_manager.py deleted file mode 100644 index d203669..0000000 --- a/ibm_cloud_sdk_core/icp4d_token_manager.py +++ /dev/null @@ -1,34 +0,0 @@ -# coding: utf-8 - -# Copyright 2019 IBM All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from .jwt_token_manager import JWTTokenManager - -class ICP4DTokenManager(JWTTokenManager): - TOKEN_NAME = 'accessToken' - def __init__(self, icp4d_url, username=None, password=None, access_token=None): - url = icp4d_url + '/v1/preauth/validateAuth' - self.username = username - self.password = password - super(ICP4DTokenManager, self).__init__(url, access_token, self.TOKEN_NAME) - - def request_token(self): - auth_tuple = (self.username, self.password) - - response = self._request( - method='GET', - url=self.url, - auth_tuple=auth_tuple) - return response diff --git a/ibm_cloud_sdk_core/jwt_token_manager.py b/ibm_cloud_sdk_core/jwt_token_manager.py index d2506b3..cebed62 100644 --- a/ibm_cloud_sdk_core/jwt_token_manager.py +++ b/ibm_cloud_sdk_core/jwt_token_manager.py @@ -19,30 +19,26 @@ import requests from .api_exception import ApiException + class JWTTokenManager(object): - def __init__(self, url, access_token=None, token_name=None): + + def __init__(self, url, disable_ssl_verification=False, token_name=None): """ - Parameters - ---------- - url : str - url of the api to retrieve tokens from - access_token : str - User-managed access token + :attr str url: url of the API to retrieve tokens from + :attr bool disable_ssl_verification: disables ssl verification when True + :attr: str token_name: name of the key containing the token """ - self.token_info = {} self.url = url - self.user_access_token = access_token - self.time_to_live = None - self.expire_time = None - self.verify = None # to enable/ disable SSL verification + self.disable_ssl_verification = disable_ssl_verification self.token_name = token_name + self.token_info = {} + self.time_for_new_token = None def get_token(self): """ The source of the token is determined by the following logic: - 1. If user provides their own managed access token, assume it is valid and send it - 2. a) If this class is managing tokens and does not yet have one, make a request for one - b) If this class is managing tokens and the token has expired, request a new one + 2. a) If this class does not yet have one, make a request for one + b) If this class token has expired, request a new one 3. If this class is managing tokens and has a valid token stored, send it Returns @@ -50,37 +46,22 @@ def get_token(self): str A valid access token """ - if self.user_access_token: - return self.user_access_token - elif not self.token_info or self._is_token_expired(): + if not self.token_info or self._is_token_expired(): token_response = self.request_token() self._save_token_info(token_response) return self.token_info.get(self.token_name) - def set_access_token(self, access_token): + def set_disable_ssl_verification(self, status=False): """ - Set a self-managed IAM access token. - The access token should be valid and not yet expired. - - By using this method, you accept responsibility for managing the - access token yourself. You must set a new access token before this - one expires. Failing to do so will result in authentication errors - after this token expires. - - Parameters - ---------- - access_token : str - A valid, non-expired access token + Sets the ssl verification to enabled or disabled """ - self.user_access_token = access_token - - def disable_SSL_verification(self, status=None): - if status is not None: - self.verify = status + self.disable_ssl_verification = status def request_token(self): - raise NotImplementedError('request_token MUST be overridden by a subclass of JWTTokenManager.') + raise NotImplementedError( + 'request_token MUST be overridden by a subclass of JWTTokenManager.' + ) def _get_current_time(self): return int(time.time()) @@ -99,13 +80,11 @@ def _is_token_expired(self): bool If token expired or not """ - if self.time_to_live is None or self.expire_time is None: + if not self.time_for_new_token: return True - fraction_of_ttl = 0.8 current_time = self._get_current_time() - time_for_new_token = self.expire_time - (self.time_to_live * (1.0 - fraction_of_ttl)) - return time_for_new_token < current_time + return self.time_for_new_token < current_time def _save_token_info(self, token_response): """ @@ -124,18 +103,32 @@ def _save_token_info(self, token_response): iat = decoded_response.get('iat') # exp is the time of expire and iat is the time of token retrieval - self.time_to_live = exp - iat - self.expire_time = exp - + time_to_live = exp - iat + expire_time = exp + fraction_of_ttl = 0.8 + self.time_for_new_token = expire_time - (time_to_live * + (1.0 - fraction_of_ttl)) self.token_info = token_response - def _request(self, method, url, headers=None, params=None, data=None, auth_tuple=None, **kwargs): - if self.verify is not None: - kwargs['verify'] = not self.verify - - response = requests.request(method=method, url=url, - headers=headers, params=params, - data=data, auth=auth_tuple, **kwargs) + def _request(self, + method, + url, + headers=None, + params=None, + data=None, + auth_tuple=None, + **kwargs): + if self.disable_ssl_verification: + kwargs['verify'] = False + + response = requests.request( + method=method, + url=url, + headers=headers, + params=params, + data=data, + auth=auth_tuple, + **kwargs) if 200 <= response.status_code <= 299: return response.json() else: diff --git a/resources/ibm-credentials-basic.env b/resources/ibm-credentials-basic.env new file mode 100644 index 0000000..8896a6a --- /dev/null +++ b/resources/ibm-credentials-basic.env @@ -0,0 +1,3 @@ +WATSON_USERNAME=my_username +WATSON_PASSWORD=my_password +WATSON_AUTH_TYPE=basic \ No newline at end of file diff --git a/resources/ibm-credentials-bearer.env b/resources/ibm-credentials-bearer.env new file mode 100644 index 0000000..c7c7b71 --- /dev/null +++ b/resources/ibm-credentials-bearer.env @@ -0,0 +1,2 @@ +WATSON_BEARER_TOKEN=my_token +WATSON_AUTH_TYPE=bearerToken diff --git a/resources/ibm-credentials-cp4d.env b/resources/ibm-credentials-cp4d.env new file mode 100644 index 0000000..732533e --- /dev/null +++ b/resources/ibm-credentials-cp4d.env @@ -0,0 +1,6 @@ +WATSON_USERNAME=my_username +WATSON_PASSWORD=my_password +WATSON_AUTH_URL=https://my_url +WATSON_AUTH_TYPE=cp4d +WATSON_AUTH_DISABLE_SSL=False +WATSON_DISABLE_SSL=True diff --git a/resources/ibm-credentials.env b/resources/ibm-credentials-iam.env similarity index 58% rename from resources/ibm-credentials.env rename to resources/ibm-credentials-iam.env index 7f8ae3a..142252c 100644 --- a/resources/ibm-credentials.env +++ b/resources/ibm-credentials-iam.env @@ -1,5 +1,3 @@ WATSON_APIKEY=5678efgh WATSON_URL=https://gateway-s.watsonplatform.net/watson/api -WATSON_USERNAME=xxx -WATSON_PASSWORD=yyy -WATSON_IAM_URL=lll \ No newline at end of file +WATSON_AUTH_TYPE=iam \ No newline at end of file diff --git a/resources/ibm-credentials-no-auth.env b/resources/ibm-credentials-no-auth.env new file mode 100644 index 0000000..bb874be --- /dev/null +++ b/resources/ibm-credentials-no-auth.env @@ -0,0 +1 @@ +WATSON_AUTH_TYPE=noauth \ No newline at end of file diff --git a/test/test_base_service.py b/test/test_base_service.py index 53afb63..d7a410e 100644 --- a/test/test_base_service.py +++ b/test/test_base_service.py @@ -7,35 +7,25 @@ import jwt from ibm_cloud_sdk_core import BaseService from ibm_cloud_sdk_core import ApiException -from ibm_cloud_sdk_core import ICP4DTokenManager +from ibm_cloud_sdk_core import CP4DTokenManager +from ibm_cloud_sdk_core.authenticators import IAMAuthenticator, Authenticator, BasicAuthenticator, CP4DAuthenticator + class AnyServiceV1(BaseService): default_url = 'https://gateway.watsonplatform.net/test/api' - def __init__(self, version, url=default_url, username=None, password=None, - api_key=None, - iam_apikey=None, - iam_access_token=None, - iam_url=None, - icp4d_access_token=None, - icp4d_url=None, - authentication_type=None - ): + def __init__(self, + version, + url=default_url, + authenticator=None, + disable_ssl_verification=False): BaseService.__init__( self, vcap_services_name='test', url=url, - api_key=api_key, - username=username, - password=password, - use_vcap_services=True, - iam_apikey=iam_apikey, - iam_access_token=iam_access_token, - iam_url=iam_url, - display_name='Watson', - icp4d_access_token=icp4d_access_token, - icp4d_url=icp4d_url, - authentication_type=authentication_type) + authenticator=authenticator, + disable_ssl_verification=disable_ssl_verification, + display_name='Watson') self.version = version def op_with_path_params(self, path0, path1): @@ -63,14 +53,12 @@ def head_request(self): response = self.request(method='HEAD', url='', accept_json=True) return response + def get_access_token(): access_token_layout = { "username": "dummy", "role": "Admin", - "permissions": [ - "administrator", - "manage_catalog" - ], + "permissions": ["administrator", "manage_catalog"], "sub": "admin", "iss": "sss", "aud": "sss", @@ -79,12 +67,19 @@ def get_access_token(): "exp": int(time.time()) } - access_token = jwt.encode(access_token_layout, 'secret', algorithm='HS256', headers={'kid': '230498151c214b788dd97f22b85410a5'}) + access_token = jwt.encode( + access_token_layout, + 'secret', + algorithm='HS256', + headers={ + 'kid': '230498151c214b788dd97f22b85410a5' + }) return access_token.decode('utf-8') + @responses.activate def test_url_encoding(): - service = AnyServiceV1('2017-07-07', username='username', password='password') + service = AnyServiceV1('2017-07-07') # All characters in path0 _must_ be encoded in path segments path0 = ' \"<>^`{}|/\\?#%[]' @@ -96,11 +91,14 @@ def test_url_encoding(): path_encoded = '/v1/foo/' + path0_encoded + '/bar/' + path1_encoded + '/baz' test_url = service.default_url + path_encoded - responses.add(responses.GET, - test_url, - status=200, - body=json.dumps({"foobar": "baz"}), - content_type='application/json') + responses.add( + responses.GET, + test_url, + status=200, + body=json.dumps({ + "foobar": "baz" + }), + content_type='application/json') response = service.op_with_path_params(path0, path1) @@ -109,38 +107,39 @@ def test_url_encoding(): assert path_encoded in responses.calls[0].request.url assert 'version=2017-07-07' in responses.calls[0].request.url + @responses.activate def test_http_config(): - service = AnyServiceV1('2017-07-07', username='username', password='password') - responses.add(responses.GET, - service.default_url, - status=200, - body=json.dumps({"foobar": "baz"}), - content_type='application/json') + service = AnyServiceV1('2017-07-07') + responses.add( + responses.GET, + service.default_url, + status=200, + body=json.dumps({ + "foobar": "baz" + }), + content_type='application/json') response = service.with_http_config({'timeout': 100}) assert response is not None assert len(responses.calls) == 1 + def test_fail_http_config(): - service = AnyServiceV1('2017-07-07', username='username', password='password') + service = AnyServiceV1('2017-07-07') with pytest.raises(TypeError): service.with_http_config(None) + @responses.activate def test_iam(): - iam_url = "https://iam.cloud.ibm.com/identity/token" - service = AnyServiceV1('2017-07-07', iam_apikey="iam_apikey") - assert service.token_manager is not None - - service.set_iam_url('https://iam-test.cloud.ibm.com/identity/token') - assert service.token_manager.iam_url == 'https://iam-test.cloud.ibm.com/identity/token' + url = "https://iam.cloud.ibm.com/identity/token" + iam_authenticator = IAMAuthenticator('my_apikey') + service = AnyServiceV1('2017-07-07', authenticator=iam_authenticator) + assert service.authenticator is not None - iam_url = "https://iam.cloud.ibm.com/identity/token" - service = AnyServiceV1('2017-07-07', username='xxx', password='yyy') - assert service.token_manager is None - service.set_iam_apikey('yyy') - assert service.token_manager is not None + iam_authenticator.set_url('https://iam-test.cloud.ibm.com/identity/token') + assert service.authenticator.token_manager.url == 'https://iam-test.cloud.ibm.com/identity/token' response = { "access_token": get_access_token(), @@ -149,127 +148,83 @@ def test_iam(): "expiration": int(time.time()), "refresh_token": "jy4gl91BQ" } - responses.add(responses.POST, url=iam_url, body=json.dumps(response), status=200) - responses.add(responses.GET, - service.default_url, - body=json.dumps({"foobar": "baz"}), - content_type='application/json') + responses.add( + responses.POST, + url='https://iam-test.cloud.ibm.com/identity/token', + body=json.dumps(response), + status=200) + responses.add( + responses.GET, + service.default_url, + body=json.dumps({ + "foobar": "baz" + }), + content_type='application/json') service.any_service_call() assert "grant-type%3Aapikey" in responses.calls[0].request.body + def test_no_auth(): - try: - service = AnyServiceV1('2017-07-07') - service.request('GET', url='') - except ValueError as err: - assert str(err) == 'You must specify your IAM api key or username and password service credentials (Note: these are different from your IBM Cloud id)' - -def test_when_apikey_is_username(): - service1 = AnyServiceV1('2017-07-07', username='apikey', password='xxxxx') - assert service1.token_manager is not None - assert service1.iam_apikey == 'xxxxx' - assert service1.username is None - assert service1.password is None - assert service1.token_manager.iam_url == 'https://iam.cloud.ibm.com/identity/token' - - service2 = AnyServiceV1('2017-07-07', username='apikey', password='xxxxx', iam_url='https://iam.stage1.cloud.ibm.com/identity/token') - assert service2.token_manager is not None - assert service2.iam_apikey == 'xxxxx' - assert service2.username is None - assert service2.password is None - assert service2.token_manager.iam_url == 'https://iam.stage1.cloud.ibm.com/identity/token' + class MadeUp(object): + def __init__(self): + self.lazy = 'made up' + + with pytest.raises(ValueError) as err: + AnyServiceV1('2017-07-07', authenticator=MadeUp()) + assert str(err.value) == 'authenticator should be of type Authenticator' + + service = AnyServiceV1('2017-07-07') + assert service.authenticator is not None + assert isinstance(service.authenticator, Authenticator) + def test_set_username_and_password(): - service = AnyServiceV1('2017-07-07', username='hello', password='world') - assert service.username == 'hello' - assert service.password == 'world' - - service.set_username_and_password('hello', 'ibm') - assert service.username == 'hello' - assert service.password == 'ibm' - -def test_for_icp(): - service1 = AnyServiceV1('2017-07-07', username='apikey', password='icp-xxxx', url='service_url') - assert service1.token_manager is None - assert service1.iam_apikey is None - assert service1.username is not None - assert service1.password is not None - assert service1.url == 'service_url' - - service2 = AnyServiceV1('2017-07-07', username='apikey', password='icp-xxx', url='service_url') - assert service2.token_manager is None - assert service2.iam_apikey is None - assert service2.username is not None - assert service2.password is not None - assert service2.url == 'service_url' - - service3 = AnyServiceV1('2017-07-07', iam_apikey='icp-xxx') - assert service3.token_manager is None - assert service2.iam_apikey is None - assert service2.username is not None - assert service2.password is not None - - service4 = AnyServiceV1('2017-07-07', iam_access_token='lala') - assert service4.token_manager is not None - assert service4.username is None - assert service4.password is None - - service5 = AnyServiceV1('2017-07-07', api_key='haha') - assert service5.token_manager is not None - assert service5.username is None - assert service5.password is None - - service6 = AnyServiceV1('2017-07-07', api_key='icp-haha') - assert service6.token_manager is None - assert service6.username is not None - assert service6.password is not None - -def test_for_icp4d(): - service1 = AnyServiceV1('2017-07-07', username='hello', password='world', icp4d_url='service_url', authentication_type='icp4d') - assert service1.token_manager is not None - assert service1.iam_apikey is None - assert service1.username is not None - assert service1.password is not None - assert service1.icp4d_url == 'service_url' - assert isinstance(service1.token_manager, ICP4DTokenManager) - - service2 = AnyServiceV1('2017-07-07', icp4d_access_token='icp4d_access_token', icp4d_url='service_url') - assert service2.token_manager is not None - assert service2.iam_apikey is None - assert service2.username is None - assert service2.password is None - assert isinstance(service2.token_manager, ICP4DTokenManager) - - service3 = AnyServiceV1('2019-06-03', username='hello', password='world', icp4d_url='icp4d_url') - assert service3.username is not None - assert service3.password is not None - assert service3.token_manager is None - - service3.set_icp4d_access_token('icp4d_access_token') - assert service3.token_manager is not None - assert isinstance(service3.token_manager, ICP4DTokenManager) - -def test_disable_SSL_verification(): - service1 = AnyServiceV1('2017-07-07', username='apikey', password='icp-xxxx', url='service_url') - assert service1.verify is None - - service1.disable_SSL_verification() - assert service1.verify is False - - service2 = AnyServiceV1('2017-07-07', username='hello', password='world', authentication_type='icp4d', icp4d_url='icp4d_url') - assert service2.verify is None - service2.disable_SSL_verification() - assert service2.token_manager.verify is not None + basic_authenticator = BasicAuthenticator('my_username', 'my_password') + service = AnyServiceV1('2017-07-07', authenticator=basic_authenticator) + assert service.authenticator.username == 'my_username' + assert service.authenticator.password == 'my_password' + + basic_authenticator.set_username_and_password('hello', 'ibm') + assert service.authenticator.username == 'hello' + assert service.authenticator.password == 'ibm' + + +def test_for_cp4d(): + cp4d_authenticator = CP4DAuthenticator('my_username', 'my_password', + 'my_url') + service = AnyServiceV1('2017-07-07', authenticator=cp4d_authenticator) + assert service.authenticator.token_manager is not None + assert service.authenticator.token_manager.username == 'my_username' + assert service.authenticator.token_manager.password == 'my_password' + assert service.authenticator.token_manager.url == 'my_url/v1/preauth/validateAuth' + assert isinstance(service.authenticator.token_manager, CP4DTokenManager) + + +def test_disable_ssl_verification(): + service1 = AnyServiceV1('2017-07-07', disable_ssl_verification=True) + assert service1.disable_ssl_verification is True + + service1.set_disable_ssl_verification(False) + assert service1.disable_ssl_verification is False + + cp4d_authenticator = CP4DAuthenticator('my_username', 'my_password', + 'my_url') + service2 = AnyServiceV1('2017-07-07', authenticator=cp4d_authenticator) + assert service2.disable_ssl_verification is False + cp4d_authenticator.set_disable_ssl_verification(True) + assert service2.authenticator.token_manager.disable_ssl_verification is True + @responses.activate def test_http_head(): - service = AnyServiceV1('2018-11-20', username='username', password='password') + service = AnyServiceV1('2018-11-20') expectedHeaders = {'Test-Header1': 'value1', 'Test-Header2': 'value2'} - responses.add(responses.HEAD, - service.default_url, - status=200, - headers=expectedHeaders, - content_type=None) + responses.add( + responses.HEAD, + service.default_url, + status=200, + headers=expectedHeaders, + content_type=None) response = service.head_request() assert response is not None @@ -277,101 +232,103 @@ def test_http_head(): assert response.headers is not None assert response.headers == expectedHeaders + @responses.activate def test_response_with_no_body(): - service = AnyServiceV1('2018-11-20', username='username', password='password') - responses.add(responses.GET, - service.default_url, - status=200, - body=None) + service = AnyServiceV1('2018-11-20') + responses.add(responses.GET, service.default_url, status=200, body=None) response = service.any_service_call() assert response is not None assert len(responses.calls) == 1 assert response.get_result() is None -def test_has_bad_first_or_last_char(): - with pytest.raises(ValueError) as err: - AnyServiceV1('2018-11-20', username='{username}', password='password') - assert str(err.value) == 'The username shouldn\'t start or end with curly brackets or quotes. Be sure to remove any {} and \" characters surrounding your username' - - with pytest.raises(ValueError) as err: - AnyServiceV1('2018-11-20', username='username', password='{password}') - assert str(err.value) == 'The password shouldn\'t start or end with curly brackets or quotes. Be sure to remove any {} and \" characters surrounding your password' - - with pytest.raises(ValueError) as err: - AnyServiceV1('2018-11-20', iam_apikey='{apikey}') - assert str(err.value) == 'The credentials shouldn\'t start or end with curly brackets or quotes. Be sure to remove any {} and \" characters surrounding your credentials' +def test_has_bad_first_or_last_char(): with pytest.raises(ValueError) as err: - AnyServiceV1('2018-11-20', iam_apikey='apikey', url='"url"') - assert str(err.value) == 'The URL shouldn\'t start or end with curly brackets or quotes. Be sure to remove any {} and \" characters surrounding your URL' + basic_authenticator = BasicAuthenticator('{my_username}', 'my_password') + AnyServiceV1('2018-11-20', authenticator=basic_authenticator) + assert str( + err.value + ) == 'The username and password shouldn\'t start or end with curly brackets or quotes. Please remove any surrounding {, }, or \" characters.' - with pytest.raises(ValueError) as err: - service = AnyServiceV1('2018-11-20', iam_apikey='apikey', url='url') - service.set_iam_apikey('"wrong"') - assert str(err.value) == 'The credentials shouldn\'t start or end with curly brackets or quotes. Be sure to remove any {} and \" characters surrounding your credentials' - with pytest.raises(ValueError) as err: - service = AnyServiceV1('2018-11-20', iam_apikey='apikey', url='url') - service.set_url('"wrong"') - assert str(err.value) == 'The URL shouldn\'t start or end with curly brackets or quotes. Be sure to remove any {} and \" characters surrounding your URL' +def test_set_credential_based_on_type(): + file_path = os.path.join( + os.path.dirname(__file__), '../resources/ibm-credentials-iam.env') + os.environ['IBM_CREDENTIALS_FILE'] = file_path + service = AnyServiceV1('2018-11-20') + assert service.authenticator is not None + assert service.authenticator.token_manager.apikey == '5678efgh' + del os.environ['IBM_CREDENTIALS_FILE'] - with pytest.raises(ValueError) as err: - service = AnyServiceV1('2018-11-20', username='hello', password='world') - service.set_username_and_password('"wrong"', 'password') - assert str(err.value) == 'The username shouldn\'t start or end with curly brackets or quotes. Be sure to remove any {} and \" characters surrounding your username' + file_path = os.path.join( + os.path.dirname(__file__), '../resources/ibm-credentials-basic.env') + os.environ['IBM_CREDENTIALS_FILE'] = file_path + service2 = AnyServiceV1('2018-11-20') + assert service2.authenticator is not None + assert service2.authenticator.username == 'my_username' + del os.environ['IBM_CREDENTIALS_FILE'] - with pytest.raises(ValueError) as err: - service = AnyServiceV1('2018-11-20', username='hello', password='world') - service.set_username_and_password('hello', '"wrong"') - assert str(err.value) == 'The password shouldn\'t start or end with curly brackets or quotes. Be sure to remove any {} and \" characters surrounding your password' + file_path = os.path.join( + os.path.dirname(__file__), '../resources/ibm-credentials-cp4d.env') + os.environ['IBM_CREDENTIALS_FILE'] = file_path + service3 = AnyServiceV1('2018-11-20') + assert service3.authenticator is not None + assert service3.authenticator.token_manager.username == 'my_username' + assert service3.authenticator.token_manager.password == 'my_password' + del os.environ['IBM_CREDENTIALS_FILE'] -def test_set_credential_based_on_type(): - file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials.env') + file_path = os.path.join( + os.path.dirname(__file__), '../resources/ibm-credentials-no-auth.env') os.environ['IBM_CREDENTIALS_FILE'] = file_path - service = AnyServiceV1('2018-11-20') - assert service.iam_apikey == '5678efgh' + service4 = AnyServiceV1('2018-11-20') + assert service4.authenticator is not None del os.environ['IBM_CREDENTIALS_FILE'] - service = AnyServiceV1('2018-11-20', username='test', password='test') - assert service.username == 'test' + file_path = os.path.join( + os.path.dirname(__file__), '../resources/ibm-credentials-bearer.env') + os.environ['IBM_CREDENTIALS_FILE'] = file_path + service5 = AnyServiceV1('2018-11-20') + assert service5.authenticator is not None + assert service5.authenticator.bearer_token is not None + del os.environ['IBM_CREDENTIALS_FILE'] def test_vcap_credentials(): vcap_services = '{"test":[{"credentials":{ \ "url":"https://gateway.watsonplatform.net/compare-comply/api",\ "username":"bogus username", \ - "password":"bogus password", \ - "apikey":"bogus apikey",\ - "iam_access_token":"bogus iam_access_token",\ - "iam_apikey":"bogus iam_apikey"}}]}' + "password":"bogus password"}}]}' + os.environ['VCAP_SERVICES'] = vcap_services service = AnyServiceV1('2018-11-20') assert service.url == 'https://gateway.watsonplatform.net/compare-comply/api' - assert service.username == 'bogus username' - assert service.password == 'bogus password' - assert service.iam_apikey == 'bogus iam_apikey' - assert service.iam_access_token == 'bogus iam_access_token' + assert service.authenticator is not None + assert service.authenticator.username == 'bogus username' + assert service.authenticator.password == 'bogus password' del os.environ['VCAP_SERVICES'] vcap_services = '{"test":[{"credentials":{ \ "url":"https://gateway.watsonplatform.net/compare-comply/api",\ - "icp4d_url":"https://test/",\ - "icp4d_access_token":"bogus icp4d_access_token"}}]}' + "apikey":"bogus apikey"}}]}' + os.environ['VCAP_SERVICES'] = vcap_services service = AnyServiceV1('2018-11-20') - assert service.token_manager is not None - assert service.token_manager.user_access_token == 'bogus icp4d_access_token' + assert service.authenticator is not None + assert service.authenticator.token_manager.apikey == 'bogus apikey' del os.environ['VCAP_SERVICES'] @responses.activate def test_request_server_error(): - responses.add(responses.GET, - 'https://gateway.watsonplatform.net/test/api', - status=500, - body=json.dumps({'error': 'internal server error'}), - content_type='application/json') - service = AnyServiceV1('2018-11-20', username='username', password='password') + responses.add( + responses.GET, + 'https://gateway.watsonplatform.net/test/api', + status=500, + body=json.dumps({ + 'error': 'internal server error' + }), + content_type='application/json') + service = AnyServiceV1('2018-11-20') try: service.request('GET', url='') except ApiException as err: @@ -379,41 +336,58 @@ def test_request_server_error(): @responses.activate def test_request_success_json(): - responses.add(responses.GET, - 'https://gateway.watsonplatform.net/test/api', - status=200, - body=json.dumps({'foo': 'bar'}), - content_type='application/json') - service = AnyServiceV1('2018-11-20', username='username', password='password') + responses.add( + responses.GET, + 'https://gateway.watsonplatform.net/test/api', + status=200, + body=json.dumps({ + 'foo': 'bar' + }), + content_type='application/json') + service = AnyServiceV1('2018-11-20') + detailed_response = service.request('GET', url='', accept_json=True) + assert detailed_response.get_result() == {'foo': 'bar'} + + service = AnyServiceV1('2018-11-20', authenticator=BasicAuthenticator('my_username', 'my_password')) + service.set_default_headers({'test': 'header'}) + service.set_disable_ssl_verification(True) detailed_response = service.request('GET', url='', accept_json=True) assert detailed_response.get_result() == {'foo': 'bar'} @responses.activate def test_request_success_response(): - responses.add(responses.GET, - 'https://gateway.watsonplatform.net/test/api', - status=200, - body=json.dumps({'foo': 'bar'}), - content_type='application/json') - service = AnyServiceV1('2018-11-20', username='username', password='password') + responses.add( + responses.GET, + 'https://gateway.watsonplatform.net/test/api', + status=200, + body=json.dumps({ + 'foo': 'bar' + }), + content_type='application/json') + service = AnyServiceV1('2018-11-20') detailed_response = service.request('GET', url='') assert detailed_response.get_result().text == '{"foo": "bar"}' @responses.activate def test_request_fail_401(): - responses.add(responses.GET, - 'https://gateway.watsonplatform.net/test/api', - status=401, - body=json.dumps({'foo': 'bar'}), - content_type='application/json') - service = AnyServiceV1('2018-11-20', username='username', password='password') + responses.add( + responses.GET, + 'https://gateway.watsonplatform.net/test/api', + status=401, + body=json.dumps({ + 'foo': 'bar' + }), + content_type='application/json') + service = AnyServiceV1('2018-11-20') try: service.request('GET', url='') except ApiException as err: assert err.message == 'Unauthorized: Access is denied due to invalid credentials' def test_misc_methods(): + class MockModel(object): + def __init__(self, x=None): self.x = x @@ -431,7 +405,7 @@ def _from_dict(cls, _dict): return cls(**args) mock = MockModel('foo') - service = AnyServiceV1('2018-11-20', username='username', password='password') + service = AnyServiceV1('2018-11-20') model1 = service._convert_model(mock) assert model1 == {'x': 'foo'} @@ -444,41 +418,67 @@ def _from_dict(cls, _dict): assert res_str == 'default,123' def test_default_headers(): - service = AnyServiceV1('2018-11-20', username='username', password='password') + service = AnyServiceV1('2018-11-20') service.set_default_headers({'xxx': 'yyy'}) assert service.default_headers == {'xxx': 'yyy'} with pytest.raises(TypeError): service.set_default_headers('xxx') +def test_set_url(): + service = AnyServiceV1('2018-11-20') + with pytest.raises(ValueError) as err: + service.set_url('{url}') + assert str(err.value) == 'The url shouldn\'t start or end with curly brackets or quotes. Be sure to remove any {} and \" characters surrounding your url' + + service.set_url('my_url') + +def test_get_authenticator(): + auth = BasicAuthenticator('my_username', 'my_password') + service = AnyServiceV1('2018-11-20', authenticator=auth) + assert service.get_authenticator() is not None + @responses.activate def test_user_agent_header(): - service = AnyServiceV1('2018-11-20', username='username', password='password') - user_agent_header = service.get_user_agent_header() + service = AnyServiceV1('2018-11-20') + user_agent_header = service.user_agent_header assert user_agent_header is not None assert user_agent_header['User-Agent'] is not None - responses.add(responses.GET, - 'https://gateway.watsonplatform.net/test/api', - status=200, - body=json.dumps({'foo': 'bar'}), - content_type='application/json') - response = service.request('GET', url='', headers={'user-agent': 'my_user_agent'}) - assert response.get_result().request.headers.__getitem__('user-agent') == 'my_user_agent' + responses.add( + responses.GET, + 'https://gateway.watsonplatform.net/test/api', + status=200, + body=json.dumps({ + 'foo': 'bar' + }), + content_type='application/json') + response = service.request( + 'GET', url='', headers={ + 'user-agent': 'my_user_agent' + }) + assert response.get_result().request.headers.__getitem__( + 'user-agent') == 'my_user_agent' response = service.request('GET', url='', headers=None) - assert response.get_result().request.headers.__getitem__('user-agent') == user_agent_header['User-Agent'] + assert response.get_result().request.headers.__getitem__( + 'user-agent') == user_agent_header['User-Agent'] @responses.activate def test_files(): - service = AnyServiceV1('2018-11-20', username='username', password='password') + service = AnyServiceV1('2018-11-20') - responses.add(responses.GET, - 'https://gateway.watsonplatform.net/test/api', - status=200, - body=json.dumps({'foo': 'bar'}), - content_type='application/json') + responses.add( + responses.GET, + 'https://gateway.watsonplatform.net/test/api', + status=200, + body=json.dumps({ + 'foo': 'bar' + }), + content_type='application/json') form_data = {} - file = open(os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials.env'), 'r') + file = open( + os.path.join( + os.path.dirname(__file__), '../resources/ibm-credentials-iam.env'), 'r') form_data['file1'] = (None, file, 'application/octet-stream') form_data['string1'] = (None, 'hello', 'text.plain') service.request('GET', url='', headers={'X-opt-out': True}, files=form_data) diff --git a/test/test_basic_authenticator.py b/test/test_basic_authenticator.py new file mode 100644 index 0000000..cef7e0a --- /dev/null +++ b/test/test_basic_authenticator.py @@ -0,0 +1,39 @@ +import pytest +from ibm_cloud_sdk_core.authenticators import BasicAuthenticator + +def test_basic_authenticator(): + authenticator = BasicAuthenticator('my_username', 'my_password') + assert authenticator is not None + assert authenticator.username == 'my_username' + assert authenticator.password == 'my_password' + + authenticator.set_username('bogus') + authenticator.set_password('bogus') + + (username, password) = authenticator.authenticate() + assert username == 'bogus' + assert password == 'bogus' + + authenticator.set_username_and_password('wonder', 'woman') + assert authenticator.username == 'wonder' + assert authenticator.password == 'woman' + + assert authenticator._is_basic_authentication() is True + assert authenticator._is_bearer_authentication() is False + +def test_basic_authenticator_validate_failed(): + with pytest.raises(ValueError) as err: + BasicAuthenticator('my_username', None) + assert str(err.value) == 'The username and password shouldn\'t be None.' + + with pytest.raises(ValueError) as err: + BasicAuthenticator(None, 'my_password') + assert str(err.value) == 'The username and password shouldn\'t be None.' + + with pytest.raises(ValueError) as err: + BasicAuthenticator('{my_username}', 'my_password') + assert str(err.value) == 'The username and password shouldn\'t start or end with curly brackets or quotes. Please remove any surrounding {, }, or \" characters.' + + with pytest.raises(ValueError) as err: + BasicAuthenticator('my_username', '{my_password}') + assert str(err.value) == 'The username and password shouldn\'t start or end with curly brackets or quotes. Please remove any surrounding {, }, or \" characters.' diff --git a/test/test_bearer_authenticator.py b/test/test_bearer_authenticator.py new file mode 100644 index 0000000..c08e6b4 --- /dev/null +++ b/test/test_bearer_authenticator.py @@ -0,0 +1,28 @@ +import pytest +import responses +import time +import jwt +import json +from ibm_cloud_sdk_core.authenticators import BearerAuthenticator + +def test_bearer_authenticator(): + authenticator = BearerAuthenticator('my_bearer_token') + assert authenticator is not None + assert authenticator.bearer_token == 'my_bearer_token' + + authenticator.set_bearer_token('james bond') + assert authenticator.bearer_token == 'james bond' + + assert authenticator._is_basic_authentication() is False + assert authenticator._is_bearer_authentication() is True + + assert authenticator.authenticate() == 'Bearer james bond' + +def test_bearer_validate_failed(): + with pytest.raises(ValueError) as err: + BearerAuthenticator(None) + assert str(err.value) == 'The bearer token shouldn\'t be None.' + + with pytest.raises(ValueError) as err: + BearerAuthenticator('{my_bearer_token}') + assert str(err.value) == 'The bearer token shouldn\'t start or end with curly brackets or quotes. Please remove any surrounding {, }, or \" characters.' diff --git a/test/test_cp4d_authenticator.py b/test/test_cp4d_authenticator.py new file mode 100644 index 0000000..fd50c97 --- /dev/null +++ b/test/test_cp4d_authenticator.py @@ -0,0 +1,100 @@ +import pytest +import responses +import time +import jwt +import json +from ibm_cloud_sdk_core.authenticators import CP4DAuthenticator + +def test_iam_authenticator(): + authenticator = CP4DAuthenticator('my_username', 'my_password', 'http://my_url') + assert authenticator is not None + assert authenticator.token_manager.url == 'http://my_url/v1/preauth/validateAuth' + assert authenticator.token_manager.username == 'my_username' + assert authenticator.token_manager.password == 'my_password' + assert authenticator.token_manager.disable_ssl_verification is False + assert authenticator.token_manager.headers is None + assert authenticator.token_manager.proxies is None + + authenticator.set_url('new_url') + assert authenticator.token_manager.url == 'new_url' + + authenticator.set_username('tom') + assert authenticator.token_manager.username == 'tom' + authenticator.set_password('jerry') + assert authenticator.token_manager.password == 'jerry' + + authenticator.set_disable_ssl_verification(True) + assert authenticator.token_manager.disable_ssl_verification is True + + with pytest.raises(TypeError) as err: + authenticator.set_headers('dummy') + assert str(err.value) == 'headers must be a dictionary' + + authenticator.set_headers({'dummy': 'headers'}) + assert authenticator.token_manager.headers == {'dummy': 'headers'} + + with pytest.raises(TypeError) as err: + authenticator.set_proxies('dummy') + assert str(err.value) == 'proxies must be a dictionary' + + authenticator.set_proxies({'dummy': 'proxies'}) + assert authenticator.token_manager.proxies == {'dummy': 'proxies'} + + assert authenticator._is_basic_authentication() is False + assert authenticator._is_bearer_authentication() is True + + +def test_iam_authenticator_validate_failed(): + with pytest.raises(ValueError) as err: + CP4DAuthenticator('my_username', None, 'my_url') + assert str(err.value) == 'The username and password shouldn\'t be None.' + + with pytest.raises(ValueError) as err: + CP4DAuthenticator(None, 'my_password', 'my_url') + assert str(err.value) == 'The username and password shouldn\'t be None.' + + with pytest.raises(ValueError) as err: + CP4DAuthenticator('{my_username}', 'my_password', 'my_url') + assert str(err.value) == 'The username and password shouldn\'t start or end with curly brackets or quotes. Please remove any surrounding {, }, or \" characters.' + + with pytest.raises(ValueError) as err: + CP4DAuthenticator('my_username', '{my_password}', 'my_url') + assert str(err.value) == 'The username and password shouldn\'t start or end with curly brackets or quotes. Please remove any surrounding {, }, or \" characters.' + + with pytest.raises(ValueError) as err: + CP4DAuthenticator('my_username', 'my_password', '{my_url}') + assert str(err.value) == 'The url shouldn\'t start or end with curly brackets or quotes. Please remove any surrounding {, }, or \" characters.' + + +@responses.activate +def test_get_token(): + url = "https://test" + access_token_layout = { + "username": "dummy", + "role": "Admin", + "permissions": [ + "administrator", + "manage_catalog" + ], + "sub": "admin", + "iss": "sss", + "aud": "sss", + "uid": "sss", + "iat": 1559324664, + "exp": 1559324664 + } + + access_token = jwt.encode(access_token_layout, + 'secret', algorithm='HS256', + headers={'kid': '230498151c214b788dd97f22b85410a5'}).decode('utf-8') + response = { + "accessToken": access_token, + "token_type": "Bearer", + "expires_in": 3600, + "expiration": 1524167011, + "refresh_token": "jy4gl91BQ" + } + responses.add(responses.GET, url + '/v1/preauth/validateAuth', body=json.dumps(response), status=200) + + authenticator = CP4DAuthenticator('my_username', 'my_password', url) + assert authenticator.authenticate() is not None \ No newline at end of file diff --git a/test/test_icp4d_token_manager.py b/test/test_cp4d_token_manager.py similarity index 87% rename from test/test_icp4d_token_manager.py rename to test/test_cp4d_token_manager.py index 4cab236..e7dc164 100644 --- a/test/test_icp4d_token_manager.py +++ b/test/test_cp4d_token_manager.py @@ -1,7 +1,7 @@ import responses import jwt import json -from ibm_cloud_sdk_core import ICP4DTokenManager +from ibm_cloud_sdk_core import CP4DTokenManager @responses.activate def test_request_token(): @@ -33,8 +33,8 @@ def test_request_token(): } responses.add(responses.GET, url + '/v1/preauth/validateAuth', body=json.dumps(response), status=200) - token_manager = ICP4DTokenManager(url, "username", "password") - token_manager.disable_SSL_verification(True) + token_manager = CP4DTokenManager("username", "password", url) + token_manager.set_disable_ssl_verification(True) token = token_manager.get_token() assert responses.calls[0].request.url == url + '/v1/preauth/validateAuth' diff --git a/test/test_iam_authenticator.py b/test/test_iam_authenticator.py new file mode 100644 index 0000000..20c711b --- /dev/null +++ b/test/test_iam_authenticator.py @@ -0,0 +1,107 @@ +import pytest +import responses +import time +import jwt +import json +from ibm_cloud_sdk_core.authenticators import IAMAuthenticator + + +def test_iam_authenticator(): + authenticator = IAMAuthenticator('my_apikey') + assert authenticator is not None + assert authenticator.token_manager.url == 'https://iam.cloud.ibm.com/identity/token' + assert authenticator.token_manager.client_id is None + assert authenticator.token_manager.client_secret is None + assert authenticator.token_manager.disable_ssl_verification is False + assert authenticator.token_manager.headers is None + assert authenticator.token_manager.proxies is None + assert authenticator.token_manager.apikey == 'my_apikey' + + authenticator.set_apikey('bogus') + assert authenticator.token_manager.apikey == 'bogus' + + authenticator.set_url('new_url') + assert authenticator.token_manager.url == 'new_url' + + authenticator.set_authorization_info('tom', 'jerry') + assert authenticator.token_manager.client_id == 'tom' + assert authenticator.token_manager.client_secret == 'jerry' + + authenticator.set_disable_ssl_verification(True) + assert authenticator.token_manager.disable_ssl_verification is True + + with pytest.raises(TypeError) as err: + authenticator.set_headers('dummy') + assert str(err.value) == 'headers must be a dictionary' + + authenticator.set_headers({'dummy': 'headers'}) + assert authenticator.token_manager.headers == {'dummy': 'headers'} + + with pytest.raises(TypeError) as err: + authenticator.set_proxies('dummy') + assert str(err.value) == 'proxies must be a dictionary' + + authenticator.set_proxies({'dummy': 'proxies'}) + assert authenticator.token_manager.proxies == {'dummy': 'proxies'} + + assert authenticator._is_basic_authentication() is False + assert authenticator._is_bearer_authentication() is True + + +def test_iam_authenticator_validate_failed(): + with pytest.raises(ValueError) as err: + IAMAuthenticator(None) + assert str(err.value) == 'The apikey shouldn\'t be None.' + + with pytest.raises(ValueError) as err: + IAMAuthenticator('{apikey}') + assert str( + err.value + ) == 'The apikey shouldn\'t start or end with curly brackets or quotes. Please remove any surrounding {, }, or \" characters.' + + with pytest.raises(ValueError) as err: + IAMAuthenticator('my_apikey', client_id='my_client_id') + assert str( + err.value) == 'Both client id and client secret should be initialized.' + + with pytest.raises(ValueError) as err: + IAMAuthenticator('my_apikey', client_secret='my_client_secret') + assert str( + err.value) == 'Both client id and client secret should be initialized.' + + +@responses.activate +def test_get_token(): + url = "https://iam.cloud.ibm.com/identity/token" + access_token_layout = { + "username": "dummy", + "role": "Admin", + "permissions": ["administrator", "manage_catalog"], + "sub": "admin", + "iss": "sss", + "aud": "sss", + "uid": "sss", + "iat": 1559324664, + "exp": 1559324664 + } + + access_token = jwt.encode( + access_token_layout, + 'secret', + algorithm='HS256', + headers={ + 'kid': '230498151c214b788dd97f22b85410a5' + }) + access_token = access_token.decode('utf-8') + response = { + "access_token": access_token, + "token_type": "Bearer", + "expires_in": 3600, + "expiration": 1524167011, + "refresh_token": "jy4gl91BQ" + } + responses.add( + responses.POST, url=url, body=json.dumps(response), status=200) + + authenticator = IAMAuthenticator('my_apikey') + assert authenticator.authenticate() is not None diff --git a/test/test_iam_token_manager.py b/test/test_iam_token_manager.py index 34c3385..a0fd5e3 100644 --- a/test/test_iam_token_manager.py +++ b/test/test_iam_token_manager.py @@ -23,31 +23,6 @@ def get_access_token(): access_token = jwt.encode(access_token_layout, 'secret', algorithm='HS256', headers={'kid': '230498151c214b788dd97f22b85410a5'}) return access_token.decode('utf-8') -@responses.activate -def test_get_token(): - iam_url = "https://iam.cloud.ibm.com/identity/token" - access_token = get_access_token() - response = { - "access_token": access_token, - "token_type": "Bearer", - "expires_in": 3600, - "expiration": 1524167011, - "refresh_token": "jy4gl91BQ" - } - responses.add(responses.POST, url=iam_url, body=json.dumps(response), status=200) - - token_manager = IAMTokenManager(None, "user_access_token") - token_manager.set_iam_apikey("iam_apikey") - token_manager.set_iam_url("iam_apikey") - token = token_manager.get_token() - assert token == "user_access_token" - - token_manager.set_access_token(None) - token = token_manager.get_token() - assert responses.calls[0].request.url == iam_url - assert len(responses.calls) == 1 - assert token_manager.token_info.get('access_token') == access_token - @responses.activate def test_request_token_auth_default(): iam_url = "https://iam.cloud.ibm.com/identity/token" @@ -61,7 +36,7 @@ def test_request_token_auth_default(): default_auth_header = 'Basic Yng6Yng=' responses.add(responses.POST, url=iam_url, body=response, status=200) - token_manager = IAMTokenManager("iam_apikey", "iam_access_token") + token_manager = IAMTokenManager("apikey") token_manager.request_token() assert len(responses.calls) == 1 @@ -82,7 +57,7 @@ def test_request_token_auth_in_ctor(): default_auth_header = 'Basic Yng6Yng=' responses.add(responses.POST, url=iam_url, body=response, status=200) - token_manager = IAMTokenManager("iam_apikey", "iam_access_token", iam_url, 'foo', 'bar') + token_manager = IAMTokenManager("apikey", iam_url, 'foo', 'bar') token_manager.request_token() assert len(responses.calls) == 1 @@ -103,7 +78,7 @@ def test_request_token_auth_in_ctor_client_id_only(): default_auth_header = 'Basic Yng6Yng=' responses.add(responses.POST, url=iam_url, body=response, status=200) - token_manager = IAMTokenManager("iam_apikey", "iam_access_token", iam_url, 'foo') + token_manager = IAMTokenManager("iam_apikey", iam_url, 'foo') token_manager.request_token() assert len(responses.calls) == 1 @@ -124,7 +99,7 @@ def test_request_token_auth_in_ctor_secret_only(): default_auth_header = 'Basic Yng6Yng=' responses.add(responses.POST, url=iam_url, body=response, status=200) - token_manager = IAMTokenManager("iam_apikey", "iam_access_token", iam_url, None, 'bar') + token_manager = IAMTokenManager("iam_apikey", iam_url, None, 'bar') token_manager.request_token() assert len(responses.calls) == 1 @@ -146,7 +121,7 @@ def test_request_token_auth_in_setter(): responses.add(responses.POST, url=iam_url, body=response, status=200) token_manager = IAMTokenManager("iam_apikey") - token_manager.set_iam_authorization_info('foo', 'bar') + token_manager.set_authorization_info('foo', 'bar') token_manager.request_token() assert len(responses.calls) == 1 @@ -168,7 +143,7 @@ def test_request_token_auth_in_setter_client_id_only(): responses.add(responses.POST, url=iam_url, body=response, status=200) token_manager = IAMTokenManager("iam_apikey") - token_manager.set_iam_authorization_info('foo', None) + token_manager.set_authorization_info('foo', None) token_manager.request_token() assert len(responses.calls) == 1 @@ -190,7 +165,8 @@ def test_request_token_auth_in_setter_secret_only(): responses.add(responses.POST, url=iam_url, body=response, status=200) token_manager = IAMTokenManager("iam_apikey") - token_manager.set_iam_authorization_info(None, 'bar') + token_manager.set_authorization_info(None, 'bar') + token_manager.set_headers({'user':'header'}) token_manager.request_token() assert len(responses.calls) == 1 diff --git a/test/test_jwt_token_manager.py b/test/test_jwt_token_manager.py index f330688..02daee1 100644 --- a/test/test_jwt_token_manager.py +++ b/test/test_jwt_token_manager.py @@ -24,8 +24,8 @@ def request_token(self): "iss": "sss", "aud": "sss", "uid": "sss", - "iat": current_time + 3600, - "exp": current_time + "iat": current_time, + "exp": current_time + 3600 } access_token = jwt.encode(token_layout, 'secret', algorithm='HS256', headers={'kid': '230498151c214b788dd97f22b85410a5'}) @@ -40,17 +40,11 @@ def request_token(self): def test_get_token(): url = "https://iam.cloud.ibm.com/identity/token" - # Case 1: - token_manager = JWTTokenManagerMockImpl(url, 'user_access_token') + token_manager = JWTTokenManagerMockImpl(url) token = token_manager.get_token() - assert token == token_manager.user_access_token - - # Case 2a: - token_manager.set_access_token(None) - token_manager.get_token() assert token_manager.token_info.get('expires_in') == 3600 + assert token_manager._is_token_expired() is False - # Case 3, valid token present token_manager.token_info = {"access_token": "old_dummy", "token_type": "Bearer", "expires_in": 3600, @@ -60,10 +54,8 @@ def test_get_token(): token = token_manager.get_token() assert token == "old_dummy" - # Case 2b, expired token: - token_manager.expire_time = int(time.time()) - (13 * 3600) - token_manager.time_to_live = 43200 - + # expired token: + token_manager.time_for_new_token = token_manager._get_current_time() - 300 token = token_manager.get_token() assert token != "old_dummy" assert token_manager.request_count == 2 @@ -71,10 +63,9 @@ def test_get_token(): def test_is_token_expired(): token_manager = JWTTokenManagerMockImpl(None, None) assert token_manager._is_token_expired() is True - token_manager.time_to_live = 3600 - token_manager.expire_time = int(time.time()) + 6000 + token_manager.time_for_new_token = token_manager._get_current_time() + 3600 assert token_manager._is_token_expired() is False - token_manager.expire_time = int(time.time()) - 3600 + token_manager.time_for_new_token = token_manager._get_current_time() - 3600 assert token_manager._is_token_expired() def test_not_implemented_error(): @@ -82,3 +73,8 @@ def test_not_implemented_error(): token_manager = JWTTokenManager(None, None) token_manager.request_token() assert str(err.value) == 'request_token MUST be overridden by a subclass of JWTTokenManager.' + +def test_disable_ssl_verification(): + token_manager = JWTTokenManagerMockImpl('https://iam.cloud.ibm.com/identity/token') + token_manager.set_disable_ssl_verification(True) + assert token_manager.disable_ssl_verification is True diff --git a/test/test_no_auth_authenticator.py b/test/test_no_auth_authenticator.py new file mode 100644 index 0000000..68b9942 --- /dev/null +++ b/test/test_no_auth_authenticator.py @@ -0,0 +1,18 @@ +import pytest +import responses +import time +import jwt +import json +from ibm_cloud_sdk_core.authenticators import NoAuthAuthenticator + +def test_no_auth_authenticator(): + authenticator = NoAuthAuthenticator() + assert authenticator is not None + assert authenticator.authentication_type == 'noauth' + + assert authenticator._is_basic_authentication() is False + assert authenticator._is_bearer_authentication() is False + + authenticator.validate() + authenticator.authenticate() +