From 858fa9c4dc854bcd90040706ca246afb5ef19b44 Mon Sep 17 00:00:00 2001 From: Chuan Ren Date: Wed, 16 Feb 2022 08:57:15 -0800 Subject: [PATCH 1/7] Port identity pool credentials --- google/auth/_default.py | 6 + google/auth/pluggable.py | 287 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 293 insertions(+) create mode 100644 google/auth/pluggable.py diff --git a/google/auth/_default.py b/google/auth/_default.py index 34edda046..68d7cc1a1 100644 --- a/google/auth/_default.py +++ b/google/auth/_default.py @@ -325,6 +325,12 @@ def _get_external_account_credentials( credentials = aws.Credentials.from_info( info, scopes=scopes, default_scopes=default_scopes ) + elif info.get("credential_source").get("executable") is not None: + from google.auth import pluggable + + credentials = pluggable.Credentials.from_info( + info, scopes=scopes, default_scopes=default_scopes + ) else: try: # Check if configuration corresponds to an Identity Pool credentials. diff --git a/google/auth/pluggable.py b/google/auth/pluggable.py new file mode 100644 index 000000000..0e23728cf --- /dev/null +++ b/google/auth/pluggable.py @@ -0,0 +1,287 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Identity Pool Credentials. + +This module provides credentials to access Google Cloud resources from on-prem +or non-Google Cloud platforms which support external credentials (e.g. OIDC ID +tokens) retrieved from local file locations or local servers. This includes +Microsoft Azure and OIDC identity providers (e.g. K8s workloads registered with +Hub with Hub workload identity enabled). + +These credentials are recommended over the use of service account credentials +in on-prem/non-Google Cloud platforms as they do not involve the management of +long-live service account private keys. + +Identity Pool Credentials are initialized using external_account +arguments which are typically loaded from an external credentials file or +an external credentials URL. Unlike other Credentials that can be initialized +with a list of explicit arguments, secrets or credentials, external account +clients use the environment and hints/guidelines provided by the +external_account JSON file to retrieve credentials and exchange them for Google +access tokens. +""" + +try: + from collections.abc import Mapping +# Python 2.7 compatibility +except ImportError: # pragma: NO COVER + from collections import Mapping +import io +import json +import os + +from google.auth import _helpers +from google.auth import exceptions +from google.auth import external_account + + +class Credentials(external_account.Credentials): + """External account credentials sourced from files and URLs.""" + + def __init__( + self, + audience, + subject_token_type, + token_url, + credential_source, + service_account_impersonation_url=None, + client_id=None, + client_secret=None, + quota_project_id=None, + scopes=None, + default_scopes=None, + workforce_pool_user_project=None, + ): + """Instantiates an external account credentials object from a file/URL. + + Args: + audience (str): The STS audience field. + subject_token_type (str): The subject token type. + token_url (str): The STS endpoint URL. + credential_source (Mapping): The credential source dictionary used to + provide instructions on how to retrieve external credential to be + exchanged for Google access tokens. + + Example credential_source for url-sourced credential:: + + { + "url": "http://www.example.com", + "format": { + "type": "json", + "subject_token_field_name": "access_token", + }, + "headers": {"foo": "bar"}, + } + + Example credential_source for file-sourced credential:: + + { + "file": "/path/to/token/file.txt" + } + + service_account_impersonation_url (Optional[str]): The optional service account + impersonation getAccessToken URL. + client_id (Optional[str]): The optional client ID. + client_secret (Optional[str]): The optional client secret. + quota_project_id (Optional[str]): The optional quota project ID. + scopes (Optional[Sequence[str]]): Optional scopes to request during the + authorization grant. + default_scopes (Optional[Sequence[str]]): Default scopes passed by a + Google client library. Use 'scopes' for user-defined scopes. + workforce_pool_user_project (Optona[str]): The optional workforce pool user + project number when the credential corresponds to a workforce pool and not + a workload identity pool. The underlying principal must still have + serviceusage.services.use IAM permission to use the project for + billing/quota. + + Raises: + google.auth.exceptions.RefreshError: If an error is encountered during + access token retrieval logic. + ValueError: For invalid parameters. + + .. note:: Typically one of the helper constructors + :meth:`from_file` or + :meth:`from_info` are used instead of calling the constructor directly. + """ + + super(Credentials, self).__init__( + audience=audience, + subject_token_type=subject_token_type, + token_url=token_url, + credential_source=credential_source, + service_account_impersonation_url=service_account_impersonation_url, + client_id=client_id, + client_secret=client_secret, + quota_project_id=quota_project_id, + scopes=scopes, + default_scopes=default_scopes, + workforce_pool_user_project=workforce_pool_user_project, + ) + if not isinstance(credential_source, Mapping): + self._credential_source_file = None + self._credential_source_url = None + else: + self._credential_source_file = credential_source.get("file") + self._credential_source_url = credential_source.get("url") + self._credential_source_headers = credential_source.get("headers") + credential_source_format = credential_source.get("format", {}) + # Get credential_source format type. When not provided, this + # defaults to text. + self._credential_source_format_type = ( + credential_source_format.get("type") or "text" + ) + # environment_id is only supported in AWS or dedicated future external + # account credentials. + if "environment_id" in credential_source: + raise ValueError( + "Invalid Identity Pool credential_source field 'environment_id'" + ) + if self._credential_source_format_type not in ["text", "json"]: + raise ValueError( + "Invalid credential_source format '{}'".format( + self._credential_source_format_type + ) + ) + # For JSON types, get the required subject_token field name. + if self._credential_source_format_type == "json": + self._credential_source_field_name = credential_source_format.get( + "subject_token_field_name" + ) + if self._credential_source_field_name is None: + raise ValueError( + "Missing subject_token_field_name for JSON credential_source format" + ) + else: + self._credential_source_field_name = None + + if self._credential_source_file and self._credential_source_url: + raise ValueError( + "Ambiguous credential_source. 'file' is mutually exclusive with 'url'." + ) + if not self._credential_source_file and not self._credential_source_url: + raise ValueError( + "Missing credential_source. A 'file' or 'url' must be provided." + ) + + @_helpers.copy_docstring(external_account.Credentials) + def retrieve_subject_token(self, request): + return self._parse_token_data( + self._get_token_data(request), + self._credential_source_format_type, + self._credential_source_field_name, + ) + + def _get_token_data(self, request): + if self._credential_source_file: + return self._get_file_data(self._credential_source_file) + else: + return self._get_url_data( + request, self._credential_source_url, self._credential_source_headers + ) + + def _get_file_data(self, filename): + if not os.path.exists(filename): + raise exceptions.RefreshError("File '{}' was not found.".format(filename)) + + with io.open(filename, "r", encoding="utf-8") as file_obj: + return file_obj.read(), filename + + def _get_url_data(self, request, url, headers): + response = request(url=url, method="GET", headers=headers) + + # support both string and bytes type response.data + response_body = ( + response.data.decode("utf-8") + if hasattr(response.data, "decode") + else response.data + ) + + if response.status != 200: + raise exceptions.RefreshError( + "Unable to retrieve Identity Pool subject token", response_body + ) + + return response_body, url + + def _parse_token_data( + self, token_content, format_type="text", subject_token_field_name=None + ): + content, filename = token_content + if format_type == "text": + token = content + else: + try: + # Parse file content as JSON. + response_data = json.loads(content) + # Get the subject_token. + token = response_data[subject_token_field_name] + except (KeyError, ValueError): + raise exceptions.RefreshError( + "Unable to parse subject_token from JSON file '{}' using key '{}'".format( + filename, subject_token_field_name + ) + ) + if not token: + raise exceptions.RefreshError( + "Missing subject_token in the credential_source file" + ) + return token + + @classmethod + def from_info(cls, info, **kwargs): + """Creates an Identity Pool Credentials instance from parsed external account info. + + Args: + info (Mapping[str, str]): The Identity Pool external account info in Google + format. + kwargs: Additional arguments to pass to the constructor. + + Returns: + google.auth.identity_pool.Credentials: The constructed + credentials. + + Raises: + ValueError: For invalid parameters. + """ + return cls( + audience=info.get("audience"), + subject_token_type=info.get("subject_token_type"), + token_url=info.get("token_url"), + service_account_impersonation_url=info.get( + "service_account_impersonation_url" + ), + client_id=info.get("client_id"), + client_secret=info.get("client_secret"), + credential_source=info.get("credential_source"), + quota_project_id=info.get("quota_project_id"), + workforce_pool_user_project=info.get("workforce_pool_user_project"), + **kwargs + ) + + @classmethod + def from_file(cls, filename, **kwargs): + """Creates an IdentityPool Credentials instance from an external account json file. + + Args: + filename (str): The path to the IdentityPool external account json file. + kwargs: Additional arguments to pass to the constructor. + + Returns: + google.auth.identity_pool.Credentials: The constructed + credentials. + """ + with io.open(filename, "r", encoding="utf-8") as json_file: + data = json.load(json_file) + return cls.from_info(data, **kwargs) From f3c3c622bac1a21816206247e693b0e91f5f2a2d Mon Sep 17 00:00:00 2001 From: Chuan Ren Date: Wed, 23 Feb 2022 16:17:45 -0800 Subject: [PATCH 2/7] access_token retrieved --- google/auth/pluggable.py | 126 +++++++++++---------------------------- 1 file changed, 35 insertions(+), 91 deletions(-) diff --git a/google/auth/pluggable.py b/google/auth/pluggable.py index 0e23728cf..c6c5d57b6 100644 --- a/google/auth/pluggable.py +++ b/google/auth/pluggable.py @@ -41,11 +41,13 @@ import io import json import os +import subprocess from google.auth import _helpers from google.auth import exceptions from google.auth import external_account +EXECUTABLE_SUPPORTED_MAX_VERSION = 1 class Credentials(external_account.Credentials): """External account credentials sourced from files and URLs.""" @@ -130,114 +132,56 @@ def __init__( workforce_pool_user_project=workforce_pool_user_project, ) if not isinstance(credential_source, Mapping): - self._credential_source_file = None + self._credential_source_executable = None self._credential_source_url = None else: - self._credential_source_file = credential_source.get("file") - self._credential_source_url = credential_source.get("url") - self._credential_source_headers = credential_source.get("headers") - credential_source_format = credential_source.get("format", {}) - # Get credential_source format type. When not provided, this - # defaults to text. - self._credential_source_format_type = ( - credential_source_format.get("type") or "text" - ) + self._credential_source_executable = credential_source.get("executable") + self._credential_source_executable_command = self._credential_source_executable.get("command") + self._credential_source_executable_timeout_millis = self._credential_source_executable.get("timeout_millis") + self._credential_source_executable_output_file = self._credential_source_executable.get("output_file") + # environment_id is only supported in AWS or dedicated future external # account credentials. if "environment_id" in credential_source: raise ValueError( "Invalid Identity Pool credential_source field 'environment_id'" ) - if self._credential_source_format_type not in ["text", "json"]: - raise ValueError( - "Invalid credential_source format '{}'".format( - self._credential_source_format_type - ) - ) - # For JSON types, get the required subject_token field name. - if self._credential_source_format_type == "json": - self._credential_source_field_name = credential_source_format.get( - "subject_token_field_name" - ) - if self._credential_source_field_name is None: - raise ValueError( - "Missing subject_token_field_name for JSON credential_source format" - ) - else: - self._credential_source_field_name = None - if self._credential_source_file and self._credential_source_url: - raise ValueError( - "Ambiguous credential_source. 'file' is mutually exclusive with 'url'." - ) - if not self._credential_source_file and not self._credential_source_url: + if not self._credential_source_executable: raise ValueError( - "Missing credential_source. A 'file' or 'url' must be provided." + "Missing credential_source. A 'excutable' must be provided." ) @_helpers.copy_docstring(external_account.Credentials) def retrieve_subject_token(self, request): - return self._parse_token_data( - self._get_token_data(request), - self._credential_source_format_type, - self._credential_source_field_name, - ) - - def _get_token_data(self, request): - if self._credential_source_file: - return self._get_file_data(self._credential_source_file) - else: - return self._get_url_data( - request, self._credential_source_url, self._credential_source_headers - ) - - def _get_file_data(self, filename): - if not os.path.exists(filename): - raise exceptions.RefreshError("File '{}' was not found.".format(filename)) - - with io.open(filename, "r", encoding="utf-8") as file_obj: - return file_obj.read(), filename - - def _get_url_data(self, request, url, headers): - response = request(url=url, method="GET", headers=headers) - - # support both string and bytes type response.data - response_body = ( - response.data.decode("utf-8") - if hasattr(response.data, "decode") - else response.data - ) - - if response.status != 200: - raise exceptions.RefreshError( - "Unable to retrieve Identity Pool subject token", response_body + env_allow_executables = os.environ.get('GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES') + if env_allow_executables is None or env_allow_executables != '1': + raise ValueError( + "Executables need to be explicitly allowed to run." ) - - return response_body, url - - def _parse_token_data( - self, token_content, format_type="text", subject_token_field_name=None - ): - content, filename = token_content - if format_type == "text": - token = content + + os.environ["GOOGLE_EXTERNAL_ACCOUNT_AUDIENCE"] = self._audience + os.environ["GOOGLE_EXTERNAL_ACCOUNT_TOKEN_TYPE"] = self._subject_token_type + os.environ["GOOGLE_EXTERNAL_ACCOUNT_IMPERSONATED_EMAIL"] = "byoid-test@cicpclientproj.iam.gserviceaccount.com" + os.environ["GOOGLE_EXTERNAL_ACCOUNT_OUTPUT_FILE"] = self._credential_source_executable_output_file + os.environ["GOOGLE_EXTERNAL_ACCOUNT_INTERACTIVE"] = "0" + result = subprocess.run(self._credential_source_executable_command.split(), capture_output=True) # todo: inject envs + if result.returncode != 0: + # TODO: raise error + print("error") else: - try: - # Parse file content as JSON. - response_data = json.loads(content) - # Get the subject_token. - token = response_data[subject_token_field_name] - except (KeyError, ValueError): - raise exceptions.RefreshError( - "Unable to parse subject_token from JSON file '{}' using key '{}'".format( - filename, subject_token_field_name - ) + data = result.stdout.decode('utf-8') + response = json.loads(data) + if not response['success']: + raise ValueError( + "TODO: response error." ) - if not token: - raise exceptions.RefreshError( - "Missing subject_token in the credential_source file" - ) - return token + elif response['version'] > EXECUTABLE_SUPPORTED_MAX_VERSION: + raise ValueError( + "Executable returned unsupported version {}.".format(response['version']) + ) + else: + return response["id_token"] @classmethod def from_info(cls, info, **kwargs): From f18f9bab3bfc49b979c08786245d4ac5cd8e5842 Mon Sep 17 00:00:00 2001 From: Chuan Ren Date: Thu, 24 Feb 2022 16:16:07 -0800 Subject: [PATCH 3/7] -> pluggable --- google/auth/pluggable.py | 96 ++++++++++++++++++++++------------------ 1 file changed, 53 insertions(+), 43 deletions(-) diff --git a/google/auth/pluggable.py b/google/auth/pluggable.py index c6c5d57b6..fb0967bd9 100644 --- a/google/auth/pluggable.py +++ b/google/auth/pluggable.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Identity Pool Credentials. +"""Pluggable Credentials. This module provides credentials to access Google Cloud resources from on-prem or non-Google Cloud platforms which support external credentials (e.g. OIDC ID @@ -24,13 +24,12 @@ in on-prem/non-Google Cloud platforms as they do not involve the management of long-live service account private keys. -Identity Pool Credentials are initialized using external_account -arguments which are typically loaded from an external credentials file or -an external credentials URL. Unlike other Credentials that can be initialized -with a list of explicit arguments, secrets or credentials, external account -clients use the environment and hints/guidelines provided by the -external_account JSON file to retrieve credentials and exchange them for Google -access tokens. +Pluggable Credentials are initialized using external_account arguments which +are typically loaded from third-party executables. Unlike other +credentials that can be initialized with a list of explicit arguments, secrets +or credentials, external account clients use the environment and hints/guidelines +provided by the external_account JSON file to retrieve credentials and exchange +them for Google access tokens. """ try: @@ -47,10 +46,11 @@ from google.auth import exceptions from google.auth import external_account +# External account JSON type identifier. EXECUTABLE_SUPPORTED_MAX_VERSION = 1 class Credentials(external_account.Credentials): - """External account credentials sourced from files and URLs.""" + """External account credentials sourced from executables.""" def __init__( self, @@ -66,7 +66,7 @@ def __init__( default_scopes=None, workforce_pool_user_project=None, ): - """Instantiates an external account credentials object from a file/URL. + """Instantiates an external account credentials object from a executables. Args: audience (str): The STS audience field. @@ -76,21 +76,13 @@ def __init__( provide instructions on how to retrieve external credential to be exchanged for Google access tokens. - Example credential_source for url-sourced credential:: + Example credential_source for pluggable credential:: { - "url": "http://www.example.com", - "format": { - "type": "json", - "subject_token_field_name": "access_token", - }, - "headers": {"foo": "bar"}, - } - - Example credential_source for file-sourced credential:: - - { - "file": "/path/to/token/file.txt" + "executable": { + "command": "/path/to/get/credentials.sh --arg1=value1 --arg2=value2", + "timeout_millis": 5000, + "output_file": "/path/to/generated/cached/credentials" } service_account_impersonation_url (Optional[str]): The optional service account @@ -104,7 +96,7 @@ def __init__( Google client library. Use 'scopes' for user-defined scopes. workforce_pool_user_project (Optona[str]): The optional workforce pool user project number when the credential corresponds to a workforce pool and not - a workload identity pool. The underlying principal must still have + a workload Pluggable. The underlying principal must still have serviceusage.services.use IAM permission to use the project for billing/quota. @@ -133,7 +125,6 @@ def __init__( ) if not isinstance(credential_source, Mapping): self._credential_source_executable = None - self._credential_source_url = None else: self._credential_source_executable = credential_source.get("executable") self._credential_source_executable_command = self._credential_source_executable.get("command") @@ -144,12 +135,20 @@ def __init__( # account credentials. if "environment_id" in credential_source: raise ValueError( - "Invalid Identity Pool credential_source field 'environment_id'" + "Invalid Pluggable credential_source field 'environment_id'" ) if not self._credential_source_executable: raise ValueError( - "Missing credential_source. A 'excutable' must be provided." + "Missing credential_source. An 'excutable' must be provided." + ) + if not self._credential_source_executable_command: + raise ValueError( + "Missing command. Excutable command must be provided." + ) + if not self._credential_source_executable_timeout_millis: + raise ValueError( + "Missing timeout_millis. Excutable timeout millis must be provided." ) @_helpers.copy_docstring(external_account.Credentials) @@ -160,40 +159,51 @@ def retrieve_subject_token(self, request): "Executables need to be explicitly allowed to run." ) + # Inject env vars os.environ["GOOGLE_EXTERNAL_ACCOUNT_AUDIENCE"] = self._audience os.environ["GOOGLE_EXTERNAL_ACCOUNT_TOKEN_TYPE"] = self._subject_token_type - os.environ["GOOGLE_EXTERNAL_ACCOUNT_IMPERSONATED_EMAIL"] = "byoid-test@cicpclientproj.iam.gserviceaccount.com" - os.environ["GOOGLE_EXTERNAL_ACCOUNT_OUTPUT_FILE"] = self._credential_source_executable_output_file - os.environ["GOOGLE_EXTERNAL_ACCOUNT_INTERACTIVE"] = "0" - result = subprocess.run(self._credential_source_executable_command.split(), capture_output=True) # todo: inject envs + os.environ["GOOGLE_EXTERNAL_ACCOUNT_INTERACTIVE"] = "0" # Always set to 0 until interactive mode is implemented. + if self._service_account_impersonation_url is not None: + os.environ["GOOGLE_EXTERNAL_ACCOUNT_IMPERSONATED_EMAIL"] = self._service_account_impersonation_url + if self._credential_source_executable_output_file is not None: + os.environ["GOOGLE_EXTERNAL_ACCOUNT_OUTPUT_FILE"] = self._credential_source_executable_output_file + + result = subprocess.run(self._credential_source_executable_command.split(), capture_output=True) if result.returncode != 0: - # TODO: raise error - print("error") + raise exceptions.RefreshError( + "Excutable exited with non-zero return code {}.".format(result.returncode) + ) else: data = result.stdout.decode('utf-8') response = json.loads(data) if not response['success']: - raise ValueError( - "TODO: response error." + raise exceptions.RefreshError( + "Excutable returned unsuccessful response: {}.".format(response) ) elif response['version'] > EXECUTABLE_SUPPORTED_MAX_VERSION: - raise ValueError( + raise exceptions.RefreshError( "Executable returned unsupported version {}.".format(response['version']) ) - else: + elif response["token_type"] == "urn:ietf:params:oauth:token-type:jwt" or response["token_type"] == "urn:ietf:params:oauth:token-type:id_token": # OIDC return response["id_token"] + elif response["token_type"] == "urn:ietf:params:oauth:token-type:saml2": # SAML + return response["saml_response"] + else: + raise exceptions.RefreshError( + "Executable returned unsupported token type." + ) @classmethod def from_info(cls, info, **kwargs): - """Creates an Identity Pool Credentials instance from parsed external account info. + """Creates a Pluggable Credentials instance from parsed external account info. Args: - info (Mapping[str, str]): The Identity Pool external account info in Google + info (Mapping[str, str]): The Pluggable external account info in Google format. kwargs: Additional arguments to pass to the constructor. Returns: - google.auth.identity_pool.Credentials: The constructed + google.auth.pluggable.Credentials: The constructed credentials. Raises: @@ -216,14 +226,14 @@ def from_info(cls, info, **kwargs): @classmethod def from_file(cls, filename, **kwargs): - """Creates an IdentityPool Credentials instance from an external account json file. + """Creates an Pluggable Credentials instance from an external account json file. Args: - filename (str): The path to the IdentityPool external account json file. + filename (str): The path to the Pluggable external account json file. kwargs: Additional arguments to pass to the constructor. Returns: - google.auth.identity_pool.Credentials: The constructed + google.auth.pluggable.Credentials: The constructed credentials. """ with io.open(filename, "r", encoding="utf-8") as json_file: From 1fbb5a06eb5ad5fb81ce8784bbdc422c3953e8a7 Mon Sep 17 00:00:00 2001 From: Chuan Ren Date: Thu, 24 Feb 2022 23:27:24 -0800 Subject: [PATCH 4/7] Update pluggable.py --- google/auth/pluggable.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google/auth/pluggable.py b/google/auth/pluggable.py index fb0967bd9..0fc8a3962 100644 --- a/google/auth/pluggable.py +++ b/google/auth/pluggable.py @@ -168,10 +168,10 @@ def retrieve_subject_token(self, request): if self._credential_source_executable_output_file is not None: os.environ["GOOGLE_EXTERNAL_ACCOUNT_OUTPUT_FILE"] = self._credential_source_executable_output_file - result = subprocess.run(self._credential_source_executable_command.split(), capture_output=True) + result = subprocess.run(self._credential_source_executable_command.split(), timeout=self._credential_source_executable_timeout_millis/1000, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) if result.returncode != 0: raise exceptions.RefreshError( - "Excutable exited with non-zero return code {}.".format(result.returncode) + "Excutable exited with non-zero return code {}. Error: {}".format(result.returncode, result.stdout) ) else: data = result.stdout.decode('utf-8') From b8631cb2f3066ca0574ac4121643fc18b6a7014d Mon Sep 17 00:00:00 2001 From: Chuan Ren Date: Fri, 25 Feb 2022 10:57:08 -0800 Subject: [PATCH 5/7] Create test_pluggable.py --- tests/test_pluggable.py | 1108 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 1108 insertions(+) create mode 100644 tests/test_pluggable.py diff --git a/tests/test_pluggable.py b/tests/test_pluggable.py new file mode 100644 index 000000000..664c317d0 --- /dev/null +++ b/tests/test_pluggable.py @@ -0,0 +1,1108 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import json +import os + +import mock +import pytest # type: ignore +from six.moves import http_client +from six.moves import urllib + +from google.auth import _helpers +from google.auth import exceptions +from google.auth import identity_pool +from google.auth import transport + + +CLIENT_ID = "username" +CLIENT_SECRET = "password" +# Base64 encoding of "username:password". +BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ=" +SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com" +SERVICE_ACCOUNT_IMPERSONATION_URL = ( + "https://us-east1-iamcredentials.googleapis.com/v1/projects/-" + + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL) +) +QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID" +SCOPES = ["scope1", "scope2"] +DATA_DIR = os.path.join(os.path.dirname(__file__), "data") +SUBJECT_TOKEN_TEXT_FILE = os.path.join(DATA_DIR, "external_subject_token.txt") +SUBJECT_TOKEN_JSON_FILE = os.path.join(DATA_DIR, "external_subject_token.json") +SUBJECT_TOKEN_FIELD_NAME = "access_token" + +with open(SUBJECT_TOKEN_TEXT_FILE) as fh: + TEXT_FILE_SUBJECT_TOKEN = fh.read() + +with open(SUBJECT_TOKEN_JSON_FILE) as fh: + JSON_FILE_CONTENT = json.load(fh) + JSON_FILE_SUBJECT_TOKEN = JSON_FILE_CONTENT.get(SUBJECT_TOKEN_FIELD_NAME) + +TOKEN_URL = "https://sts.googleapis.com/v1/token" +SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt" +AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID" +WORKFORCE_AUDIENCE = ( + "//iam.googleapis.com/locations/global/workforcePools/POOL_ID/providers/PROVIDER_ID" +) +WORKFORCE_SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:id_token" +WORKFORCE_POOL_USER_PROJECT = "WORKFORCE_POOL_USER_PROJECT_NUMBER" + + +class TestCredentials(object): + CREDENTIAL_SOURCE_TEXT = {"file": SUBJECT_TOKEN_TEXT_FILE} + CREDENTIAL_SOURCE_JSON = { + "file": SUBJECT_TOKEN_JSON_FILE, + "format": {"type": "json", "subject_token_field_name": "access_token"}, + } + CREDENTIAL_URL = "http://fakeurl.com" + CREDENTIAL_SOURCE_TEXT_URL = {"url": CREDENTIAL_URL} + CREDENTIAL_SOURCE_JSON_URL = { + "url": CREDENTIAL_URL, + "format": {"type": "json", "subject_token_field_name": "access_token"}, + } + SUCCESS_RESPONSE = { + "access_token": "ACCESS_TOKEN", + "issued_token_type": "urn:ietf:params:oauth:token-type:access_token", + "token_type": "Bearer", + "expires_in": 3600, + "scope": " ".join(SCOPES), + } + + @classmethod + def make_mock_response(cls, status, data): + response = mock.create_autospec(transport.Response, instance=True) + response.status = status + if isinstance(data, dict): + response.data = json.dumps(data).encode("utf-8") + else: + response.data = data + return response + + @classmethod + def make_mock_request( + cls, token_status=http_client.OK, token_data=None, *extra_requests + ): + responses = [] + responses.append(cls.make_mock_response(token_status, token_data)) + + while len(extra_requests) > 0: + # If service account impersonation is requested, mock the expected response. + status, data, extra_requests = ( + extra_requests[0], + extra_requests[1], + extra_requests[2:], + ) + responses.append(cls.make_mock_response(status, data)) + + request = mock.create_autospec(transport.Request) + request.side_effect = responses + + return request + + @classmethod + def assert_credential_request_kwargs( + cls, request_kwargs, headers, url=CREDENTIAL_URL + ): + assert request_kwargs["url"] == url + assert request_kwargs["method"] == "GET" + assert request_kwargs["headers"] == headers + assert request_kwargs.get("body", None) is None + + @classmethod + def assert_token_request_kwargs( + cls, request_kwargs, headers, request_data, token_url=TOKEN_URL + ): + assert request_kwargs["url"] == token_url + assert request_kwargs["method"] == "POST" + assert request_kwargs["headers"] == headers + assert request_kwargs["body"] is not None + body_tuples = urllib.parse.parse_qsl(request_kwargs["body"]) + assert len(body_tuples) == len(request_data.keys()) + for (k, v) in body_tuples: + assert v.decode("utf-8") == request_data[k.decode("utf-8")] + + @classmethod + def assert_impersonation_request_kwargs( + cls, + request_kwargs, + headers, + request_data, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + ): + assert request_kwargs["url"] == service_account_impersonation_url + assert request_kwargs["method"] == "POST" + assert request_kwargs["headers"] == headers + assert request_kwargs["body"] is not None + body_json = json.loads(request_kwargs["body"].decode("utf-8")) + assert body_json == request_data + + @classmethod + def assert_underlying_credentials_refresh( + cls, + credentials, + audience, + subject_token, + subject_token_type, + token_url, + service_account_impersonation_url=None, + basic_auth_encoding=None, + quota_project_id=None, + used_scopes=None, + credential_data=None, + scopes=None, + default_scopes=None, + workforce_pool_user_project=None, + ): + """Utility to assert that a credentials are initialized with the expected + attributes by calling refresh functionality and confirming response matches + expected one and that the underlying requests were populated with the + expected parameters. + """ + # STS token exchange request/response. + token_response = cls.SUCCESS_RESPONSE.copy() + token_headers = {"Content-Type": "application/x-www-form-urlencoded"} + if basic_auth_encoding: + token_headers["Authorization"] = "Basic " + basic_auth_encoding + + if service_account_impersonation_url: + token_scopes = "https://www.googleapis.com/auth/iam" + else: + token_scopes = " ".join(used_scopes or []) + + token_request_data = { + "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", + "audience": audience, + "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", + "scope": token_scopes, + "subject_token": subject_token, + "subject_token_type": subject_token_type, + } + if workforce_pool_user_project: + token_request_data["options"] = urllib.parse.quote( + json.dumps({"userProject": workforce_pool_user_project}) + ) + + if service_account_impersonation_url: + # Service account impersonation request/response. + expire_time = ( + _helpers.utcnow().replace(microsecond=0) + + datetime.timedelta(seconds=3600) + ).isoformat("T") + "Z" + impersonation_response = { + "accessToken": "SA_ACCESS_TOKEN", + "expireTime": expire_time, + } + impersonation_headers = { + "Content-Type": "application/json", + "authorization": "Bearer {}".format(token_response["access_token"]), + } + impersonation_request_data = { + "delegates": None, + "scope": used_scopes, + "lifetime": "3600s", + } + + # Initialize mock request to handle token retrieval, token exchange and + # service account impersonation request. + requests = [] + if credential_data: + requests.append((http_client.OK, credential_data)) + + token_request_index = len(requests) + requests.append((http_client.OK, token_response)) + + if service_account_impersonation_url: + impersonation_request_index = len(requests) + requests.append((http_client.OK, impersonation_response)) + + request = cls.make_mock_request(*[el for req in requests for el in req]) + + credentials.refresh(request) + + assert len(request.call_args_list) == len(requests) + if credential_data: + cls.assert_credential_request_kwargs(request.call_args_list[0][1], None) + # Verify token exchange request parameters. + cls.assert_token_request_kwargs( + request.call_args_list[token_request_index][1], + token_headers, + token_request_data, + token_url, + ) + # Verify service account impersonation request parameters if the request + # is processed. + if service_account_impersonation_url: + cls.assert_impersonation_request_kwargs( + request.call_args_list[impersonation_request_index][1], + impersonation_headers, + impersonation_request_data, + service_account_impersonation_url, + ) + assert credentials.token == impersonation_response["accessToken"] + else: + assert credentials.token == token_response["access_token"] + assert credentials.quota_project_id == quota_project_id + assert credentials.scopes == scopes + assert credentials.default_scopes == default_scopes + + @classmethod + def make_credentials( + cls, + audience=AUDIENCE, + subject_token_type=SUBJECT_TOKEN_TYPE, + client_id=None, + client_secret=None, + quota_project_id=None, + scopes=None, + default_scopes=None, + service_account_impersonation_url=None, + credential_source=None, + workforce_pool_user_project=None, + ): + return identity_pool.Credentials( + audience=audience, + subject_token_type=subject_token_type, + token_url=TOKEN_URL, + service_account_impersonation_url=service_account_impersonation_url, + credential_source=credential_source, + client_id=client_id, + client_secret=client_secret, + quota_project_id=quota_project_id, + scopes=scopes, + default_scopes=default_scopes, + workforce_pool_user_project=workforce_pool_user_project, + ) + + @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) + def test_from_info_full_options(self, mock_init): + credentials = identity_pool.Credentials.from_info( + { + "audience": AUDIENCE, + "subject_token_type": SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL, + "client_id": CLIENT_ID, + "client_secret": CLIENT_SECRET, + "quota_project_id": QUOTA_PROJECT_ID, + "credential_source": self.CREDENTIAL_SOURCE_TEXT, + } + ) + + # Confirm identity_pool.Credentials instantiated with expected attributes. + assert isinstance(credentials, identity_pool.Credentials) + mock_init.assert_called_once_with( + audience=AUDIENCE, + subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + client_id=CLIENT_ID, + client_secret=CLIENT_SECRET, + credential_source=self.CREDENTIAL_SOURCE_TEXT, + quota_project_id=QUOTA_PROJECT_ID, + workforce_pool_user_project=None, + ) + + @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) + def test_from_info_required_options_only(self, mock_init): + credentials = identity_pool.Credentials.from_info( + { + "audience": AUDIENCE, + "subject_token_type": SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + "credential_source": self.CREDENTIAL_SOURCE_TEXT, + } + ) + + # Confirm identity_pool.Credentials instantiated with expected attributes. + assert isinstance(credentials, identity_pool.Credentials) + mock_init.assert_called_once_with( + audience=AUDIENCE, + subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=None, + client_id=None, + client_secret=None, + credential_source=self.CREDENTIAL_SOURCE_TEXT, + quota_project_id=None, + workforce_pool_user_project=None, + ) + + @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) + def test_from_info_workforce_pool(self, mock_init): + credentials = identity_pool.Credentials.from_info( + { + "audience": WORKFORCE_AUDIENCE, + "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + "credential_source": self.CREDENTIAL_SOURCE_TEXT, + "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT, + } + ) + + # Confirm identity_pool.Credentials instantiated with expected attributes. + assert isinstance(credentials, identity_pool.Credentials) + mock_init.assert_called_once_with( + audience=WORKFORCE_AUDIENCE, + subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=None, + client_id=None, + client_secret=None, + credential_source=self.CREDENTIAL_SOURCE_TEXT, + quota_project_id=None, + workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, + ) + + @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) + def test_from_file_full_options(self, mock_init, tmpdir): + info = { + "audience": AUDIENCE, + "subject_token_type": SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL, + "client_id": CLIENT_ID, + "client_secret": CLIENT_SECRET, + "quota_project_id": QUOTA_PROJECT_ID, + "credential_source": self.CREDENTIAL_SOURCE_TEXT, + } + config_file = tmpdir.join("config.json") + config_file.write(json.dumps(info)) + credentials = identity_pool.Credentials.from_file(str(config_file)) + + # Confirm identity_pool.Credentials instantiated with expected attributes. + assert isinstance(credentials, identity_pool.Credentials) + mock_init.assert_called_once_with( + audience=AUDIENCE, + subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + client_id=CLIENT_ID, + client_secret=CLIENT_SECRET, + credential_source=self.CREDENTIAL_SOURCE_TEXT, + quota_project_id=QUOTA_PROJECT_ID, + workforce_pool_user_project=None, + ) + + @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) + def test_from_file_required_options_only(self, mock_init, tmpdir): + info = { + "audience": AUDIENCE, + "subject_token_type": SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + "credential_source": self.CREDENTIAL_SOURCE_TEXT, + } + config_file = tmpdir.join("config.json") + config_file.write(json.dumps(info)) + credentials = identity_pool.Credentials.from_file(str(config_file)) + + # Confirm identity_pool.Credentials instantiated with expected attributes. + assert isinstance(credentials, identity_pool.Credentials) + mock_init.assert_called_once_with( + audience=AUDIENCE, + subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=None, + client_id=None, + client_secret=None, + credential_source=self.CREDENTIAL_SOURCE_TEXT, + quota_project_id=None, + workforce_pool_user_project=None, + ) + + @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) + def test_from_file_workforce_pool(self, mock_init, tmpdir): + info = { + "audience": WORKFORCE_AUDIENCE, + "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + "credential_source": self.CREDENTIAL_SOURCE_TEXT, + "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT, + } + config_file = tmpdir.join("config.json") + config_file.write(json.dumps(info)) + credentials = identity_pool.Credentials.from_file(str(config_file)) + + # Confirm identity_pool.Credentials instantiated with expected attributes. + assert isinstance(credentials, identity_pool.Credentials) + mock_init.assert_called_once_with( + audience=WORKFORCE_AUDIENCE, + subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=None, + client_id=None, + client_secret=None, + credential_source=self.CREDENTIAL_SOURCE_TEXT, + quota_project_id=None, + workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, + ) + + def test_constructor_nonworkforce_with_workforce_pool_user_project(self): + with pytest.raises(ValueError) as excinfo: + self.make_credentials( + audience=AUDIENCE, + workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, + ) + + assert excinfo.match( + "workforce_pool_user_project should not be set for non-workforce " + "pool credentials" + ) + + def test_constructor_invalid_options(self): + credential_source = {"unsupported": "value"} + + with pytest.raises(ValueError) as excinfo: + self.make_credentials(credential_source=credential_source) + + assert excinfo.match(r"Missing credential_source") + + def test_constructor_invalid_options_url_and_file(self): + credential_source = { + "url": self.CREDENTIAL_URL, + "file": SUBJECT_TOKEN_TEXT_FILE, + } + + with pytest.raises(ValueError) as excinfo: + self.make_credentials(credential_source=credential_source) + + assert excinfo.match(r"Ambiguous credential_source") + + def test_constructor_invalid_options_environment_id(self): + credential_source = {"url": self.CREDENTIAL_URL, "environment_id": "aws1"} + + with pytest.raises(ValueError) as excinfo: + self.make_credentials(credential_source=credential_source) + + assert excinfo.match( + r"Invalid Identity Pool credential_source field 'environment_id'" + ) + + def test_constructor_invalid_credential_source(self): + with pytest.raises(ValueError) as excinfo: + self.make_credentials(credential_source="non-dict") + + assert excinfo.match(r"Missing credential_source") + + def test_constructor_invalid_credential_source_format_type(self): + credential_source = {"format": {"type": "xml"}} + + with pytest.raises(ValueError) as excinfo: + self.make_credentials(credential_source=credential_source) + + assert excinfo.match(r"Invalid credential_source format 'xml'") + + def test_constructor_missing_subject_token_field_name(self): + credential_source = {"format": {"type": "json"}} + + with pytest.raises(ValueError) as excinfo: + self.make_credentials(credential_source=credential_source) + + assert excinfo.match( + r"Missing subject_token_field_name for JSON credential_source format" + ) + + def test_info_with_workforce_pool_user_project(self): + credentials = self.make_credentials( + audience=WORKFORCE_AUDIENCE, + subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, + credential_source=self.CREDENTIAL_SOURCE_TEXT_URL.copy(), + workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, + ) + + assert credentials.info == { + "type": "external_account", + "audience": WORKFORCE_AUDIENCE, + "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + "credential_source": self.CREDENTIAL_SOURCE_TEXT_URL, + "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT, + } + + def test_info_with_file_credential_source(self): + credentials = self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE_TEXT_URL.copy() + ) + + assert credentials.info == { + "type": "external_account", + "audience": AUDIENCE, + "subject_token_type": SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + "credential_source": self.CREDENTIAL_SOURCE_TEXT_URL, + } + + def test_info_with_url_credential_source(self): + credentials = self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE_JSON_URL.copy() + ) + + assert credentials.info == { + "type": "external_account", + "audience": AUDIENCE, + "subject_token_type": SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + "credential_source": self.CREDENTIAL_SOURCE_JSON_URL, + } + + def test_retrieve_subject_token_missing_subject_token(self, tmpdir): + # Provide empty text file. + empty_file = tmpdir.join("empty.txt") + empty_file.write("") + credential_source = {"file": str(empty_file)} + credentials = self.make_credentials(credential_source=credential_source) + + with pytest.raises(exceptions.RefreshError) as excinfo: + credentials.retrieve_subject_token(None) + + assert excinfo.match(r"Missing subject_token in the credential_source file") + + def test_retrieve_subject_token_text_file(self): + credentials = self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE_TEXT + ) + + subject_token = credentials.retrieve_subject_token(None) + + assert subject_token == TEXT_FILE_SUBJECT_TOKEN + + def test_retrieve_subject_token_json_file(self): + credentials = self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE_JSON + ) + + subject_token = credentials.retrieve_subject_token(None) + + assert subject_token == JSON_FILE_SUBJECT_TOKEN + + def test_retrieve_subject_token_json_file_invalid_field_name(self): + credential_source = { + "file": SUBJECT_TOKEN_JSON_FILE, + "format": {"type": "json", "subject_token_field_name": "not_found"}, + } + credentials = self.make_credentials(credential_source=credential_source) + + with pytest.raises(exceptions.RefreshError) as excinfo: + credentials.retrieve_subject_token(None) + + assert excinfo.match( + "Unable to parse subject_token from JSON file '{}' using key '{}'".format( + SUBJECT_TOKEN_JSON_FILE, "not_found" + ) + ) + + def test_retrieve_subject_token_invalid_json(self, tmpdir): + # Provide JSON file. This should result in JSON parsing error. + invalid_json_file = tmpdir.join("invalid.json") + invalid_json_file.write("{") + credential_source = { + "file": str(invalid_json_file), + "format": {"type": "json", "subject_token_field_name": "access_token"}, + } + credentials = self.make_credentials(credential_source=credential_source) + + with pytest.raises(exceptions.RefreshError) as excinfo: + credentials.retrieve_subject_token(None) + + assert excinfo.match( + "Unable to parse subject_token from JSON file '{}' using key '{}'".format( + str(invalid_json_file), "access_token" + ) + ) + + def test_retrieve_subject_token_file_not_found(self): + credential_source = {"file": "./not_found.txt"} + credentials = self.make_credentials(credential_source=credential_source) + + with pytest.raises(exceptions.RefreshError) as excinfo: + credentials.retrieve_subject_token(None) + + assert excinfo.match(r"File './not_found.txt' was not found") + + def test_refresh_text_file_success_without_impersonation_ignore_default_scopes( + self, + ): + credentials = self.make_credentials( + client_id=CLIENT_ID, + client_secret=CLIENT_SECRET, + # Test with text format type. + credential_source=self.CREDENTIAL_SOURCE_TEXT, + scopes=SCOPES, + # Default scopes should be ignored. + default_scopes=["ignored"], + ) + + self.assert_underlying_credentials_refresh( + credentials=credentials, + audience=AUDIENCE, + subject_token=TEXT_FILE_SUBJECT_TOKEN, + subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=None, + basic_auth_encoding=BASIC_AUTH_ENCODING, + quota_project_id=None, + used_scopes=SCOPES, + scopes=SCOPES, + default_scopes=["ignored"], + ) + + def test_refresh_workforce_success_with_client_auth_without_impersonation(self): + credentials = self.make_credentials( + audience=WORKFORCE_AUDIENCE, + subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, + client_id=CLIENT_ID, + client_secret=CLIENT_SECRET, + # Test with text format type. + credential_source=self.CREDENTIAL_SOURCE_TEXT, + scopes=SCOPES, + # This will be ignored in favor of client auth. + workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, + ) + + self.assert_underlying_credentials_refresh( + credentials=credentials, + audience=WORKFORCE_AUDIENCE, + subject_token=TEXT_FILE_SUBJECT_TOKEN, + subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=None, + basic_auth_encoding=BASIC_AUTH_ENCODING, + quota_project_id=None, + used_scopes=SCOPES, + scopes=SCOPES, + workforce_pool_user_project=None, + ) + + def test_refresh_workforce_success_with_client_auth_and_no_workforce_project(self): + credentials = self.make_credentials( + audience=WORKFORCE_AUDIENCE, + subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, + client_id=CLIENT_ID, + client_secret=CLIENT_SECRET, + # Test with text format type. + credential_source=self.CREDENTIAL_SOURCE_TEXT, + scopes=SCOPES, + # This is not needed when client Auth is used. + workforce_pool_user_project=None, + ) + + self.assert_underlying_credentials_refresh( + credentials=credentials, + audience=WORKFORCE_AUDIENCE, + subject_token=TEXT_FILE_SUBJECT_TOKEN, + subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=None, + basic_auth_encoding=BASIC_AUTH_ENCODING, + quota_project_id=None, + used_scopes=SCOPES, + scopes=SCOPES, + workforce_pool_user_project=None, + ) + + def test_refresh_workforce_success_without_client_auth_without_impersonation(self): + credentials = self.make_credentials( + audience=WORKFORCE_AUDIENCE, + subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, + client_id=None, + client_secret=None, + # Test with text format type. + credential_source=self.CREDENTIAL_SOURCE_TEXT, + scopes=SCOPES, + # This will not be ignored as client auth is not used. + workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, + ) + + self.assert_underlying_credentials_refresh( + credentials=credentials, + audience=WORKFORCE_AUDIENCE, + subject_token=TEXT_FILE_SUBJECT_TOKEN, + subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=None, + basic_auth_encoding=None, + quota_project_id=None, + used_scopes=SCOPES, + scopes=SCOPES, + workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, + ) + + def test_refresh_workforce_success_without_client_auth_with_impersonation(self): + credentials = self.make_credentials( + audience=WORKFORCE_AUDIENCE, + subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, + client_id=None, + client_secret=None, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + # Test with text format type. + credential_source=self.CREDENTIAL_SOURCE_TEXT, + scopes=SCOPES, + # This will not be ignored as client auth is not used. + workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, + ) + + self.assert_underlying_credentials_refresh( + credentials=credentials, + audience=WORKFORCE_AUDIENCE, + subject_token=TEXT_FILE_SUBJECT_TOKEN, + subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + basic_auth_encoding=None, + quota_project_id=None, + used_scopes=SCOPES, + scopes=SCOPES, + workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, + ) + + def test_refresh_text_file_success_without_impersonation_use_default_scopes(self): + credentials = self.make_credentials( + client_id=CLIENT_ID, + client_secret=CLIENT_SECRET, + # Test with text format type. + credential_source=self.CREDENTIAL_SOURCE_TEXT, + scopes=None, + # Default scopes should be used since user specified scopes are none. + default_scopes=SCOPES, + ) + + self.assert_underlying_credentials_refresh( + credentials=credentials, + audience=AUDIENCE, + subject_token=TEXT_FILE_SUBJECT_TOKEN, + subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=None, + basic_auth_encoding=BASIC_AUTH_ENCODING, + quota_project_id=None, + used_scopes=SCOPES, + scopes=None, + default_scopes=SCOPES, + ) + + def test_refresh_text_file_success_with_impersonation_ignore_default_scopes(self): + # Initialize credentials with service account impersonation and basic auth. + credentials = self.make_credentials( + # Test with text format type. + credential_source=self.CREDENTIAL_SOURCE_TEXT, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + scopes=SCOPES, + # Default scopes should be ignored. + default_scopes=["ignored"], + ) + + self.assert_underlying_credentials_refresh( + credentials=credentials, + audience=AUDIENCE, + subject_token=TEXT_FILE_SUBJECT_TOKEN, + subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + basic_auth_encoding=None, + quota_project_id=None, + used_scopes=SCOPES, + scopes=SCOPES, + default_scopes=["ignored"], + ) + + def test_refresh_text_file_success_with_impersonation_use_default_scopes(self): + # Initialize credentials with service account impersonation, basic auth + # and default scopes (no user scopes). + credentials = self.make_credentials( + # Test with text format type. + credential_source=self.CREDENTIAL_SOURCE_TEXT, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + scopes=None, + # Default scopes should be used since user specified scopes are none. + default_scopes=SCOPES, + ) + + self.assert_underlying_credentials_refresh( + credentials=credentials, + audience=AUDIENCE, + subject_token=TEXT_FILE_SUBJECT_TOKEN, + subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + basic_auth_encoding=None, + quota_project_id=None, + used_scopes=SCOPES, + scopes=None, + default_scopes=SCOPES, + ) + + def test_refresh_json_file_success_without_impersonation(self): + credentials = self.make_credentials( + client_id=CLIENT_ID, + client_secret=CLIENT_SECRET, + # Test with JSON format type. + credential_source=self.CREDENTIAL_SOURCE_JSON, + scopes=SCOPES, + ) + + self.assert_underlying_credentials_refresh( + credentials=credentials, + audience=AUDIENCE, + subject_token=JSON_FILE_SUBJECT_TOKEN, + subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=None, + basic_auth_encoding=BASIC_AUTH_ENCODING, + quota_project_id=None, + used_scopes=SCOPES, + scopes=SCOPES, + default_scopes=None, + ) + + def test_refresh_json_file_success_with_impersonation(self): + # Initialize credentials with service account impersonation and basic auth. + credentials = self.make_credentials( + # Test with JSON format type. + credential_source=self.CREDENTIAL_SOURCE_JSON, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + scopes=SCOPES, + ) + + self.assert_underlying_credentials_refresh( + credentials=credentials, + audience=AUDIENCE, + subject_token=JSON_FILE_SUBJECT_TOKEN, + subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + basic_auth_encoding=None, + quota_project_id=None, + used_scopes=SCOPES, + scopes=SCOPES, + default_scopes=None, + ) + + def test_refresh_with_retrieve_subject_token_error(self): + credential_source = { + "file": SUBJECT_TOKEN_JSON_FILE, + "format": {"type": "json", "subject_token_field_name": "not_found"}, + } + credentials = self.make_credentials(credential_source=credential_source) + + with pytest.raises(exceptions.RefreshError) as excinfo: + credentials.refresh(None) + + assert excinfo.match( + "Unable to parse subject_token from JSON file '{}' using key '{}'".format( + SUBJECT_TOKEN_JSON_FILE, "not_found" + ) + ) + + def test_retrieve_subject_token_from_url(self): + credentials = self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE_TEXT_URL + ) + request = self.make_mock_request(token_data=TEXT_FILE_SUBJECT_TOKEN) + subject_token = credentials.retrieve_subject_token(request) + + assert subject_token == TEXT_FILE_SUBJECT_TOKEN + self.assert_credential_request_kwargs(request.call_args_list[0][1], None) + + def test_retrieve_subject_token_from_url_with_headers(self): + credentials = self.make_credentials( + credential_source={"url": self.CREDENTIAL_URL, "headers": {"foo": "bar"}} + ) + request = self.make_mock_request(token_data=TEXT_FILE_SUBJECT_TOKEN) + subject_token = credentials.retrieve_subject_token(request) + + assert subject_token == TEXT_FILE_SUBJECT_TOKEN + self.assert_credential_request_kwargs( + request.call_args_list[0][1], {"foo": "bar"} + ) + + def test_retrieve_subject_token_from_url_json(self): + credentials = self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE_JSON_URL + ) + request = self.make_mock_request(token_data=JSON_FILE_CONTENT) + subject_token = credentials.retrieve_subject_token(request) + + assert subject_token == JSON_FILE_SUBJECT_TOKEN + self.assert_credential_request_kwargs(request.call_args_list[0][1], None) + + def test_retrieve_subject_token_from_url_json_with_headers(self): + credentials = self.make_credentials( + credential_source={ + "url": self.CREDENTIAL_URL, + "format": {"type": "json", "subject_token_field_name": "access_token"}, + "headers": {"foo": "bar"}, + } + ) + request = self.make_mock_request(token_data=JSON_FILE_CONTENT) + subject_token = credentials.retrieve_subject_token(request) + + assert subject_token == JSON_FILE_SUBJECT_TOKEN + self.assert_credential_request_kwargs( + request.call_args_list[0][1], {"foo": "bar"} + ) + + def test_retrieve_subject_token_from_url_not_found(self): + credentials = self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE_TEXT_URL + ) + with pytest.raises(exceptions.RefreshError) as excinfo: + credentials.retrieve_subject_token( + self.make_mock_request(token_status=404, token_data=JSON_FILE_CONTENT) + ) + + assert excinfo.match("Unable to retrieve Identity Pool subject token") + + def test_retrieve_subject_token_from_url_json_invalid_field(self): + credential_source = { + "url": self.CREDENTIAL_URL, + "format": {"type": "json", "subject_token_field_name": "not_found"}, + } + credentials = self.make_credentials(credential_source=credential_source) + + with pytest.raises(exceptions.RefreshError) as excinfo: + credentials.retrieve_subject_token( + self.make_mock_request(token_data=JSON_FILE_CONTENT) + ) + + assert excinfo.match( + "Unable to parse subject_token from JSON file '{}' using key '{}'".format( + self.CREDENTIAL_URL, "not_found" + ) + ) + + def test_retrieve_subject_token_from_url_json_invalid_format(self): + credentials = self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE_JSON_URL + ) + + with pytest.raises(exceptions.RefreshError) as excinfo: + credentials.retrieve_subject_token(self.make_mock_request(token_data="{")) + + assert excinfo.match( + "Unable to parse subject_token from JSON file '{}' using key '{}'".format( + self.CREDENTIAL_URL, "access_token" + ) + ) + + def test_refresh_text_file_success_without_impersonation_url(self): + credentials = self.make_credentials( + client_id=CLIENT_ID, + client_secret=CLIENT_SECRET, + # Test with text format type. + credential_source=self.CREDENTIAL_SOURCE_TEXT_URL, + scopes=SCOPES, + ) + + self.assert_underlying_credentials_refresh( + credentials=credentials, + audience=AUDIENCE, + subject_token=TEXT_FILE_SUBJECT_TOKEN, + subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=None, + basic_auth_encoding=BASIC_AUTH_ENCODING, + quota_project_id=None, + used_scopes=SCOPES, + scopes=SCOPES, + default_scopes=None, + credential_data=TEXT_FILE_SUBJECT_TOKEN, + ) + + def test_refresh_text_file_success_with_impersonation_url(self): + # Initialize credentials with service account impersonation and basic auth. + credentials = self.make_credentials( + # Test with text format type. + credential_source=self.CREDENTIAL_SOURCE_TEXT_URL, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + scopes=SCOPES, + ) + + self.assert_underlying_credentials_refresh( + credentials=credentials, + audience=AUDIENCE, + subject_token=TEXT_FILE_SUBJECT_TOKEN, + subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + basic_auth_encoding=None, + quota_project_id=None, + used_scopes=SCOPES, + scopes=SCOPES, + default_scopes=None, + credential_data=TEXT_FILE_SUBJECT_TOKEN, + ) + + def test_refresh_json_file_success_without_impersonation_url(self): + credentials = self.make_credentials( + client_id=CLIENT_ID, + client_secret=CLIENT_SECRET, + # Test with JSON format type. + credential_source=self.CREDENTIAL_SOURCE_JSON_URL, + scopes=SCOPES, + ) + + self.assert_underlying_credentials_refresh( + credentials=credentials, + audience=AUDIENCE, + subject_token=JSON_FILE_SUBJECT_TOKEN, + subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=None, + basic_auth_encoding=BASIC_AUTH_ENCODING, + quota_project_id=None, + used_scopes=SCOPES, + scopes=SCOPES, + default_scopes=None, + credential_data=JSON_FILE_CONTENT, + ) + + def test_refresh_json_file_success_with_impersonation_url(self): + # Initialize credentials with service account impersonation and basic auth. + credentials = self.make_credentials( + # Test with JSON format type. + credential_source=self.CREDENTIAL_SOURCE_JSON_URL, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + scopes=SCOPES, + ) + + self.assert_underlying_credentials_refresh( + credentials=credentials, + audience=AUDIENCE, + subject_token=JSON_FILE_SUBJECT_TOKEN, + subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + basic_auth_encoding=None, + quota_project_id=None, + used_scopes=SCOPES, + scopes=SCOPES, + default_scopes=None, + credential_data=JSON_FILE_CONTENT, + ) + + def test_refresh_with_retrieve_subject_token_error_url(self): + credential_source = { + "url": self.CREDENTIAL_URL, + "format": {"type": "json", "subject_token_field_name": "not_found"}, + } + credentials = self.make_credentials(credential_source=credential_source) + + with pytest.raises(exceptions.RefreshError) as excinfo: + credentials.refresh(self.make_mock_request(token_data=JSON_FILE_CONTENT)) + + assert excinfo.match( + "Unable to parse subject_token from JSON file '{}' using key '{}'".format( + self.CREDENTIAL_URL, "not_found" + ) + ) From 6337de11ad475216d3bd611ef423f1d51ee934fd Mon Sep 17 00:00:00 2001 From: Chuan Ren Date: Fri, 25 Feb 2022 14:47:44 -0800 Subject: [PATCH 6/7] Unit tests --- google/auth/pluggable.py | 19 +- tests/test_pluggable.py | 845 ++++++--------------------------------- 2 files changed, 135 insertions(+), 729 deletions(-) diff --git a/google/auth/pluggable.py b/google/auth/pluggable.py index 0fc8a3962..afecb90f6 100644 --- a/google/auth/pluggable.py +++ b/google/auth/pluggable.py @@ -125,8 +125,15 @@ def __init__( ) if not isinstance(credential_source, Mapping): self._credential_source_executable = None + raise ValueError( + "Missing credential_source. The credential_source is not a dict." + ) else: self._credential_source_executable = credential_source.get("executable") + if not self._credential_source_executable: + raise ValueError( + "Missing credential_source. An 'executable' must be provided." + ) self._credential_source_executable_command = self._credential_source_executable.get("command") self._credential_source_executable_timeout_millis = self._credential_source_executable.get("timeout_millis") self._credential_source_executable_output_file = self._credential_source_executable.get("output_file") @@ -138,17 +145,13 @@ def __init__( "Invalid Pluggable credential_source field 'environment_id'" ) - if not self._credential_source_executable: - raise ValueError( - "Missing credential_source. An 'excutable' must be provided." - ) if not self._credential_source_executable_command: raise ValueError( - "Missing command. Excutable command must be provided." + "Missing command. Executable command must be provided." ) if not self._credential_source_executable_timeout_millis: raise ValueError( - "Missing timeout_millis. Excutable timeout millis must be provided." + "Missing timeout_millis. Executable timeout millis must be provided." ) @_helpers.copy_docstring(external_account.Credentials) @@ -171,14 +174,14 @@ def retrieve_subject_token(self, request): result = subprocess.run(self._credential_source_executable_command.split(), timeout=self._credential_source_executable_timeout_millis/1000, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) if result.returncode != 0: raise exceptions.RefreshError( - "Excutable exited with non-zero return code {}. Error: {}".format(result.returncode, result.stdout) + "Executable exited with non-zero return code {}. Error: {}".format(result.returncode, result.stdout) ) else: data = result.stdout.decode('utf-8') response = json.loads(data) if not response['success']: raise exceptions.RefreshError( - "Excutable returned unsuccessful response: {}.".format(response) + "Executable returned unsuccessful response: {}.".format(response) ) elif response['version'] > EXECUTABLE_SUPPORTED_MAX_VERSION: raise exceptions.RefreshError( diff --git a/tests/test_pluggable.py b/tests/test_pluggable.py index 664c317d0..79fe9b27c 100644 --- a/tests/test_pluggable.py +++ b/tests/test_pluggable.py @@ -1,4 +1,4 @@ -# Copyright 2020 Google LLC +# Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,12 +18,15 @@ import mock import pytest # type: ignore +import subprocess +import pytest_subprocess from six.moves import http_client from six.moves import urllib from google.auth import _helpers from google.auth import exceptions from google.auth import identity_pool +from google.auth import pluggable from google.auth import transport @@ -38,47 +41,52 @@ ) QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID" SCOPES = ["scope1", "scope2"] -DATA_DIR = os.path.join(os.path.dirname(__file__), "data") -SUBJECT_TOKEN_TEXT_FILE = os.path.join(DATA_DIR, "external_subject_token.txt") -SUBJECT_TOKEN_JSON_FILE = os.path.join(DATA_DIR, "external_subject_token.json") SUBJECT_TOKEN_FIELD_NAME = "access_token" -with open(SUBJECT_TOKEN_TEXT_FILE) as fh: - TEXT_FILE_SUBJECT_TOKEN = fh.read() - -with open(SUBJECT_TOKEN_JSON_FILE) as fh: - JSON_FILE_CONTENT = json.load(fh) - JSON_FILE_SUBJECT_TOKEN = JSON_FILE_CONTENT.get(SUBJECT_TOKEN_FIELD_NAME) - TOKEN_URL = "https://sts.googleapis.com/v1/token" SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt" AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID" -WORKFORCE_AUDIENCE = ( - "//iam.googleapis.com/locations/global/workforcePools/POOL_ID/providers/PROVIDER_ID" -) -WORKFORCE_SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:id_token" -WORKFORCE_POOL_USER_PROJECT = "WORKFORCE_POOL_USER_PROJECT_NUMBER" - class TestCredentials(object): - CREDENTIAL_SOURCE_TEXT = {"file": SUBJECT_TOKEN_TEXT_FILE} - CREDENTIAL_SOURCE_JSON = { - "file": SUBJECT_TOKEN_JSON_FILE, - "format": {"type": "json", "subject_token_field_name": "access_token"}, + CREDENTIAL_SOURCE_EXECUTABLE_COMMAND = "/fake/external/excutable --arg1=value1 --arg2=value2" + CREDENTIAL_SOURCE_EXECUTABLE = { + "command": CREDENTIAL_SOURCE_EXECUTABLE_COMMAND, + "timeout_millis": 5000, + "output_file": "/fake/output/file" } - CREDENTIAL_URL = "http://fakeurl.com" - CREDENTIAL_SOURCE_TEXT_URL = {"url": CREDENTIAL_URL} - CREDENTIAL_SOURCE_JSON_URL = { - "url": CREDENTIAL_URL, - "format": {"type": "json", "subject_token_field_name": "access_token"}, + CREDENTIAL_SOURCE = { + "executable": CREDENTIAL_SOURCE_EXECUTABLE } - SUCCESS_RESPONSE = { - "access_token": "ACCESS_TOKEN", - "issued_token_type": "urn:ietf:params:oauth:token-type:access_token", - "token_type": "Bearer", - "expires_in": 3600, - "scope": " ".join(SCOPES), + EXECUTABLE_OIDC_TOKEN = "FAKE_ID_TOKEN" + EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_ID_TOKEN = { + "version": 1, + "success": True, + "token_type": "urn:ietf:params:oauth:token-type:id_token", + "id_token": EXECUTABLE_OIDC_TOKEN, + "expiration_time": 9999999999 } + EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_JWT = { + "version": 1, + "success": True, + "token_type": "urn:ietf:params:oauth:token-type:jwt", + "id_token": EXECUTABLE_OIDC_TOKEN, + "expiration_time": 9999999999 + } + EXECUTABLE_SAML_TOKEN = "FAKE_SAML_RESPONSE" + EXECUTABLE_SUCCESSFUL_SAML_RESPONSE = { + "version": 1, + "success": True, + "token_type": "urn:ietf:params:oauth:token-type:saml2", + "saml_response": EXECUTABLE_SAML_TOKEN, + "expiration_time": 1620433341 + } + EXECUTABLE_FAILED_RESPONSE = { + "version": 1, + "success": False, + "code": "401", + "message": "Permission denied. Caller not authorized" + } + CREDENTIAL_URL = "http://fakeurl.com" @classmethod def make_mock_response(cls, status, data): @@ -258,7 +266,7 @@ def assert_underlying_credentials_refresh( assert credentials.default_scopes == default_scopes @classmethod - def make_credentials( + def make_pluggable( cls, audience=AUDIENCE, subject_token_type=SUBJECT_TOKEN_TYPE, @@ -271,7 +279,7 @@ def make_credentials( credential_source=None, workforce_pool_user_project=None, ): - return identity_pool.Credentials( + return pluggable.Credentials( audience=audience, subject_token_type=subject_token_type, token_url=TOKEN_URL, @@ -284,10 +292,10 @@ def make_credentials( default_scopes=default_scopes, workforce_pool_user_project=workforce_pool_user_project, ) - - @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) + + @mock.patch.object(pluggable.Credentials, "__init__", return_value=None) def test_from_info_full_options(self, mock_init): - credentials = identity_pool.Credentials.from_info( + credentials = pluggable.Credentials.from_info( { "audience": AUDIENCE, "subject_token_type": SUBJECT_TOKEN_TYPE, @@ -296,12 +304,12 @@ def test_from_info_full_options(self, mock_init): "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET, "quota_project_id": QUOTA_PROJECT_ID, - "credential_source": self.CREDENTIAL_SOURCE_TEXT, + "credential_source": self.CREDENTIAL_SOURCE, } ) - # Confirm identity_pool.Credentials instantiated with expected attributes. - assert isinstance(credentials, identity_pool.Credentials) + # Confirm pluggable.Credentials instantiated with expected attributes. + assert isinstance(credentials, pluggable.Credentials) mock_init.assert_called_once_with( audience=AUDIENCE, subject_token_type=SUBJECT_TOKEN_TYPE, @@ -309,24 +317,24 @@ def test_from_info_full_options(self, mock_init): service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, client_id=CLIENT_ID, client_secret=CLIENT_SECRET, - credential_source=self.CREDENTIAL_SOURCE_TEXT, + credential_source=self.CREDENTIAL_SOURCE, quota_project_id=QUOTA_PROJECT_ID, workforce_pool_user_project=None, ) - @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) + @mock.patch.object(pluggable.Credentials, "__init__", return_value=None) def test_from_info_required_options_only(self, mock_init): - credentials = identity_pool.Credentials.from_info( + credentials = pluggable.Credentials.from_info( { "audience": AUDIENCE, "subject_token_type": SUBJECT_TOKEN_TYPE, "token_url": TOKEN_URL, - "credential_source": self.CREDENTIAL_SOURCE_TEXT, + "credential_source": self.CREDENTIAL_SOURCE, } ) - # Confirm identity_pool.Credentials instantiated with expected attributes. - assert isinstance(credentials, identity_pool.Credentials) + # Confirm pluggable.Credentials instantiated with expected attributes. + assert isinstance(credentials, pluggable.Credentials) mock_init.assert_called_once_with( audience=AUDIENCE, subject_token_type=SUBJECT_TOKEN_TYPE, @@ -334,38 +342,12 @@ def test_from_info_required_options_only(self, mock_init): service_account_impersonation_url=None, client_id=None, client_secret=None, - credential_source=self.CREDENTIAL_SOURCE_TEXT, + credential_source=self.CREDENTIAL_SOURCE, quota_project_id=None, workforce_pool_user_project=None, ) - @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) - def test_from_info_workforce_pool(self, mock_init): - credentials = identity_pool.Credentials.from_info( - { - "audience": WORKFORCE_AUDIENCE, - "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE, - "token_url": TOKEN_URL, - "credential_source": self.CREDENTIAL_SOURCE_TEXT, - "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT, - } - ) - - # Confirm identity_pool.Credentials instantiated with expected attributes. - assert isinstance(credentials, identity_pool.Credentials) - mock_init.assert_called_once_with( - audience=WORKFORCE_AUDIENCE, - subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, - token_url=TOKEN_URL, - service_account_impersonation_url=None, - client_id=None, - client_secret=None, - credential_source=self.CREDENTIAL_SOURCE_TEXT, - quota_project_id=None, - workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, - ) - - @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) + @mock.patch.object(pluggable.Credentials, "__init__", return_value=None) def test_from_file_full_options(self, mock_init, tmpdir): info = { "audience": AUDIENCE, @@ -375,14 +357,14 @@ def test_from_file_full_options(self, mock_init, tmpdir): "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET, "quota_project_id": QUOTA_PROJECT_ID, - "credential_source": self.CREDENTIAL_SOURCE_TEXT, + "credential_source": self.CREDENTIAL_SOURCE, } config_file = tmpdir.join("config.json") config_file.write(json.dumps(info)) - credentials = identity_pool.Credentials.from_file(str(config_file)) + credentials = pluggable.Credentials.from_file(str(config_file)) - # Confirm identity_pool.Credentials instantiated with expected attributes. - assert isinstance(credentials, identity_pool.Credentials) + # Confirm pluggable.Credentials instantiated with expected attributes. + assert isinstance(credentials, pluggable.Credentials) mock_init.assert_called_once_with( audience=AUDIENCE, subject_token_type=SUBJECT_TOKEN_TYPE, @@ -390,25 +372,25 @@ def test_from_file_full_options(self, mock_init, tmpdir): service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, client_id=CLIENT_ID, client_secret=CLIENT_SECRET, - credential_source=self.CREDENTIAL_SOURCE_TEXT, + credential_source=self.CREDENTIAL_SOURCE, quota_project_id=QUOTA_PROJECT_ID, workforce_pool_user_project=None, ) - @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) + @mock.patch.object(pluggable.Credentials, "__init__", return_value=None) def test_from_file_required_options_only(self, mock_init, tmpdir): info = { "audience": AUDIENCE, "subject_token_type": SUBJECT_TOKEN_TYPE, "token_url": TOKEN_URL, - "credential_source": self.CREDENTIAL_SOURCE_TEXT, + "credential_source": self.CREDENTIAL_SOURCE, } config_file = tmpdir.join("config.json") config_file.write(json.dumps(info)) - credentials = identity_pool.Credentials.from_file(str(config_file)) + credentials = pluggable.Credentials.from_file(str(config_file)) - # Confirm identity_pool.Credentials instantiated with expected attributes. - assert isinstance(credentials, identity_pool.Credentials) + # Confirm pluggable.Credentials instantiated with expected attributes. + assert isinstance(credentials, pluggable.Credentials) mock_init.assert_called_once_with( audience=AUDIENCE, subject_token_type=SUBJECT_TOKEN_TYPE, @@ -416,123 +398,38 @@ def test_from_file_required_options_only(self, mock_init, tmpdir): service_account_impersonation_url=None, client_id=None, client_secret=None, - credential_source=self.CREDENTIAL_SOURCE_TEXT, + credential_source=self.CREDENTIAL_SOURCE, quota_project_id=None, workforce_pool_user_project=None, ) - @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) - def test_from_file_workforce_pool(self, mock_init, tmpdir): - info = { - "audience": WORKFORCE_AUDIENCE, - "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE, - "token_url": TOKEN_URL, - "credential_source": self.CREDENTIAL_SOURCE_TEXT, - "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT, - } - config_file = tmpdir.join("config.json") - config_file.write(json.dumps(info)) - credentials = identity_pool.Credentials.from_file(str(config_file)) - - # Confirm identity_pool.Credentials instantiated with expected attributes. - assert isinstance(credentials, identity_pool.Credentials) - mock_init.assert_called_once_with( - audience=WORKFORCE_AUDIENCE, - subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, - token_url=TOKEN_URL, - service_account_impersonation_url=None, - client_id=None, - client_secret=None, - credential_source=self.CREDENTIAL_SOURCE_TEXT, - quota_project_id=None, - workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, - ) - - def test_constructor_nonworkforce_with_workforce_pool_user_project(self): - with pytest.raises(ValueError) as excinfo: - self.make_credentials( - audience=AUDIENCE, - workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, - ) - - assert excinfo.match( - "workforce_pool_user_project should not be set for non-workforce " - "pool credentials" - ) - def test_constructor_invalid_options(self): credential_source = {"unsupported": "value"} with pytest.raises(ValueError) as excinfo: - self.make_credentials(credential_source=credential_source) + self.make_pluggable(credential_source=credential_source) assert excinfo.match(r"Missing credential_source") - def test_constructor_invalid_options_url_and_file(self): - credential_source = { - "url": self.CREDENTIAL_URL, - "file": SUBJECT_TOKEN_TEXT_FILE, - } - - with pytest.raises(ValueError) as excinfo: - self.make_credentials(credential_source=credential_source) - - assert excinfo.match(r"Ambiguous credential_source") - def test_constructor_invalid_options_environment_id(self): - credential_source = {"url": self.CREDENTIAL_URL, "environment_id": "aws1"} + credential_source = {"executable": self.CREDENTIAL_SOURCE_EXECUTABLE, "environment_id": "aws1"} with pytest.raises(ValueError) as excinfo: - self.make_credentials(credential_source=credential_source) + self.make_pluggable(credential_source=credential_source) assert excinfo.match( - r"Invalid Identity Pool credential_source field 'environment_id'" + r"Invalid Pluggable credential_source field 'environment_id'" ) def test_constructor_invalid_credential_source(self): with pytest.raises(ValueError) as excinfo: - self.make_credentials(credential_source="non-dict") + self.make_pluggable(credential_source="non-dict") assert excinfo.match(r"Missing credential_source") - def test_constructor_invalid_credential_source_format_type(self): - credential_source = {"format": {"type": "xml"}} - - with pytest.raises(ValueError) as excinfo: - self.make_credentials(credential_source=credential_source) - - assert excinfo.match(r"Invalid credential_source format 'xml'") - - def test_constructor_missing_subject_token_field_name(self): - credential_source = {"format": {"type": "json"}} - - with pytest.raises(ValueError) as excinfo: - self.make_credentials(credential_source=credential_source) - - assert excinfo.match( - r"Missing subject_token_field_name for JSON credential_source format" - ) - - def test_info_with_workforce_pool_user_project(self): - credentials = self.make_credentials( - audience=WORKFORCE_AUDIENCE, - subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, - credential_source=self.CREDENTIAL_SOURCE_TEXT_URL.copy(), - workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, - ) - - assert credentials.info == { - "type": "external_account", - "audience": WORKFORCE_AUDIENCE, - "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE, - "token_url": TOKEN_URL, - "credential_source": self.CREDENTIAL_SOURCE_TEXT_URL, - "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT, - } - - def test_info_with_file_credential_source(self): - credentials = self.make_credentials( - credential_source=self.CREDENTIAL_SOURCE_TEXT_URL.copy() + def test_info_with_credential_source(self): + credentials = self.make_pluggable( + credential_source=self.CREDENTIAL_SOURCE.copy() ) assert credentials.info == { @@ -540,569 +437,75 @@ def test_info_with_file_credential_source(self): "audience": AUDIENCE, "subject_token_type": SUBJECT_TOKEN_TYPE, "token_url": TOKEN_URL, - "credential_source": self.CREDENTIAL_SOURCE_TEXT_URL, + "credential_source": self.CREDENTIAL_SOURCE, } - def test_info_with_url_credential_source(self): - credentials = self.make_credentials( - credential_source=self.CREDENTIAL_SOURCE_JSON_URL.copy() - ) - - assert credentials.info == { - "type": "external_account", - "audience": AUDIENCE, - "subject_token_type": SUBJECT_TOKEN_TYPE, - "token_url": TOKEN_URL, - "credential_source": self.CREDENTIAL_SOURCE_JSON_URL, - } - - def test_retrieve_subject_token_missing_subject_token(self, tmpdir): - # Provide empty text file. - empty_file = tmpdir.join("empty.txt") - empty_file.write("") - credential_source = {"file": str(empty_file)} - credentials = self.make_credentials(credential_source=credential_source) - - with pytest.raises(exceptions.RefreshError) as excinfo: - credentials.retrieve_subject_token(None) - - assert excinfo.match(r"Missing subject_token in the credential_source file") - - def test_retrieve_subject_token_text_file(self): - credentials = self.make_credentials( - credential_source=self.CREDENTIAL_SOURCE_TEXT + def test_retrieve_subject_token_oidc_id_token(self, fp): + os.environ['GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES'] = '1' + fp.register(self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND.split(), stdout=json.dumps(self.EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_ID_TOKEN)) + + credentials = self.make_pluggable( + credential_source=self.CREDENTIAL_SOURCE ) subject_token = credentials.retrieve_subject_token(None) - assert subject_token == TEXT_FILE_SUBJECT_TOKEN + assert subject_token == self.EXECUTABLE_OIDC_TOKEN - def test_retrieve_subject_token_json_file(self): - credentials = self.make_credentials( - credential_source=self.CREDENTIAL_SOURCE_JSON + def test_retrieve_subject_token_oidc_jwt(self, fp): + os.environ['GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES'] = '1' + fp.register(self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND.split(), stdout=json.dumps(self.EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_JWT)) + + credentials = self.make_pluggable( + credential_source=self.CREDENTIAL_SOURCE ) subject_token = credentials.retrieve_subject_token(None) - assert subject_token == JSON_FILE_SUBJECT_TOKEN - - def test_retrieve_subject_token_json_file_invalid_field_name(self): - credential_source = { - "file": SUBJECT_TOKEN_JSON_FILE, - "format": {"type": "json", "subject_token_field_name": "not_found"}, - } - credentials = self.make_credentials(credential_source=credential_source) - - with pytest.raises(exceptions.RefreshError) as excinfo: - credentials.retrieve_subject_token(None) - - assert excinfo.match( - "Unable to parse subject_token from JSON file '{}' using key '{}'".format( - SUBJECT_TOKEN_JSON_FILE, "not_found" - ) - ) - - def test_retrieve_subject_token_invalid_json(self, tmpdir): - # Provide JSON file. This should result in JSON parsing error. - invalid_json_file = tmpdir.join("invalid.json") - invalid_json_file.write("{") - credential_source = { - "file": str(invalid_json_file), - "format": {"type": "json", "subject_token_field_name": "access_token"}, - } - credentials = self.make_credentials(credential_source=credential_source) - - with pytest.raises(exceptions.RefreshError) as excinfo: - credentials.retrieve_subject_token(None) - - assert excinfo.match( - "Unable to parse subject_token from JSON file '{}' using key '{}'".format( - str(invalid_json_file), "access_token" - ) - ) - - def test_retrieve_subject_token_file_not_found(self): - credential_source = {"file": "./not_found.txt"} - credentials = self.make_credentials(credential_source=credential_source) - - with pytest.raises(exceptions.RefreshError) as excinfo: - credentials.retrieve_subject_token(None) - - assert excinfo.match(r"File './not_found.txt' was not found") - - def test_refresh_text_file_success_without_impersonation_ignore_default_scopes( - self, - ): - credentials = self.make_credentials( - client_id=CLIENT_ID, - client_secret=CLIENT_SECRET, - # Test with text format type. - credential_source=self.CREDENTIAL_SOURCE_TEXT, - scopes=SCOPES, - # Default scopes should be ignored. - default_scopes=["ignored"], - ) - - self.assert_underlying_credentials_refresh( - credentials=credentials, - audience=AUDIENCE, - subject_token=TEXT_FILE_SUBJECT_TOKEN, - subject_token_type=SUBJECT_TOKEN_TYPE, - token_url=TOKEN_URL, - service_account_impersonation_url=None, - basic_auth_encoding=BASIC_AUTH_ENCODING, - quota_project_id=None, - used_scopes=SCOPES, - scopes=SCOPES, - default_scopes=["ignored"], - ) - - def test_refresh_workforce_success_with_client_auth_without_impersonation(self): - credentials = self.make_credentials( - audience=WORKFORCE_AUDIENCE, - subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, - client_id=CLIENT_ID, - client_secret=CLIENT_SECRET, - # Test with text format type. - credential_source=self.CREDENTIAL_SOURCE_TEXT, - scopes=SCOPES, - # This will be ignored in favor of client auth. - workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, - ) - - self.assert_underlying_credentials_refresh( - credentials=credentials, - audience=WORKFORCE_AUDIENCE, - subject_token=TEXT_FILE_SUBJECT_TOKEN, - subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, - token_url=TOKEN_URL, - service_account_impersonation_url=None, - basic_auth_encoding=BASIC_AUTH_ENCODING, - quota_project_id=None, - used_scopes=SCOPES, - scopes=SCOPES, - workforce_pool_user_project=None, - ) - - def test_refresh_workforce_success_with_client_auth_and_no_workforce_project(self): - credentials = self.make_credentials( - audience=WORKFORCE_AUDIENCE, - subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, - client_id=CLIENT_ID, - client_secret=CLIENT_SECRET, - # Test with text format type. - credential_source=self.CREDENTIAL_SOURCE_TEXT, - scopes=SCOPES, - # This is not needed when client Auth is used. - workforce_pool_user_project=None, - ) - - self.assert_underlying_credentials_refresh( - credentials=credentials, - audience=WORKFORCE_AUDIENCE, - subject_token=TEXT_FILE_SUBJECT_TOKEN, - subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, - token_url=TOKEN_URL, - service_account_impersonation_url=None, - basic_auth_encoding=BASIC_AUTH_ENCODING, - quota_project_id=None, - used_scopes=SCOPES, - scopes=SCOPES, - workforce_pool_user_project=None, - ) - - def test_refresh_workforce_success_without_client_auth_without_impersonation(self): - credentials = self.make_credentials( - audience=WORKFORCE_AUDIENCE, - subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, - client_id=None, - client_secret=None, - # Test with text format type. - credential_source=self.CREDENTIAL_SOURCE_TEXT, - scopes=SCOPES, - # This will not be ignored as client auth is not used. - workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, - ) - - self.assert_underlying_credentials_refresh( - credentials=credentials, - audience=WORKFORCE_AUDIENCE, - subject_token=TEXT_FILE_SUBJECT_TOKEN, - subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, - token_url=TOKEN_URL, - service_account_impersonation_url=None, - basic_auth_encoding=None, - quota_project_id=None, - used_scopes=SCOPES, - scopes=SCOPES, - workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, - ) - - def test_refresh_workforce_success_without_client_auth_with_impersonation(self): - credentials = self.make_credentials( - audience=WORKFORCE_AUDIENCE, - subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, - client_id=None, - client_secret=None, - service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, - # Test with text format type. - credential_source=self.CREDENTIAL_SOURCE_TEXT, - scopes=SCOPES, - # This will not be ignored as client auth is not used. - workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, - ) - - self.assert_underlying_credentials_refresh( - credentials=credentials, - audience=WORKFORCE_AUDIENCE, - subject_token=TEXT_FILE_SUBJECT_TOKEN, - subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, - token_url=TOKEN_URL, - service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, - basic_auth_encoding=None, - quota_project_id=None, - used_scopes=SCOPES, - scopes=SCOPES, - workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, - ) - - def test_refresh_text_file_success_without_impersonation_use_default_scopes(self): - credentials = self.make_credentials( - client_id=CLIENT_ID, - client_secret=CLIENT_SECRET, - # Test with text format type. - credential_source=self.CREDENTIAL_SOURCE_TEXT, - scopes=None, - # Default scopes should be used since user specified scopes are none. - default_scopes=SCOPES, - ) - - self.assert_underlying_credentials_refresh( - credentials=credentials, - audience=AUDIENCE, - subject_token=TEXT_FILE_SUBJECT_TOKEN, - subject_token_type=SUBJECT_TOKEN_TYPE, - token_url=TOKEN_URL, - service_account_impersonation_url=None, - basic_auth_encoding=BASIC_AUTH_ENCODING, - quota_project_id=None, - used_scopes=SCOPES, - scopes=None, - default_scopes=SCOPES, - ) - - def test_refresh_text_file_success_with_impersonation_ignore_default_scopes(self): - # Initialize credentials with service account impersonation and basic auth. - credentials = self.make_credentials( - # Test with text format type. - credential_source=self.CREDENTIAL_SOURCE_TEXT, - service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, - scopes=SCOPES, - # Default scopes should be ignored. - default_scopes=["ignored"], - ) - - self.assert_underlying_credentials_refresh( - credentials=credentials, - audience=AUDIENCE, - subject_token=TEXT_FILE_SUBJECT_TOKEN, - subject_token_type=SUBJECT_TOKEN_TYPE, - token_url=TOKEN_URL, - service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, - basic_auth_encoding=None, - quota_project_id=None, - used_scopes=SCOPES, - scopes=SCOPES, - default_scopes=["ignored"], - ) - - def test_refresh_text_file_success_with_impersonation_use_default_scopes(self): - # Initialize credentials with service account impersonation, basic auth - # and default scopes (no user scopes). - credentials = self.make_credentials( - # Test with text format type. - credential_source=self.CREDENTIAL_SOURCE_TEXT, - service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, - scopes=None, - # Default scopes should be used since user specified scopes are none. - default_scopes=SCOPES, - ) - - self.assert_underlying_credentials_refresh( - credentials=credentials, - audience=AUDIENCE, - subject_token=TEXT_FILE_SUBJECT_TOKEN, - subject_token_type=SUBJECT_TOKEN_TYPE, - token_url=TOKEN_URL, - service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, - basic_auth_encoding=None, - quota_project_id=None, - used_scopes=SCOPES, - scopes=None, - default_scopes=SCOPES, - ) - - def test_refresh_json_file_success_without_impersonation(self): - credentials = self.make_credentials( - client_id=CLIENT_ID, - client_secret=CLIENT_SECRET, - # Test with JSON format type. - credential_source=self.CREDENTIAL_SOURCE_JSON, - scopes=SCOPES, - ) - - self.assert_underlying_credentials_refresh( - credentials=credentials, - audience=AUDIENCE, - subject_token=JSON_FILE_SUBJECT_TOKEN, - subject_token_type=SUBJECT_TOKEN_TYPE, - token_url=TOKEN_URL, - service_account_impersonation_url=None, - basic_auth_encoding=BASIC_AUTH_ENCODING, - quota_project_id=None, - used_scopes=SCOPES, - scopes=SCOPES, - default_scopes=None, - ) - - def test_refresh_json_file_success_with_impersonation(self): - # Initialize credentials with service account impersonation and basic auth. - credentials = self.make_credentials( - # Test with JSON format type. - credential_source=self.CREDENTIAL_SOURCE_JSON, - service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, - scopes=SCOPES, - ) - - self.assert_underlying_credentials_refresh( - credentials=credentials, - audience=AUDIENCE, - subject_token=JSON_FILE_SUBJECT_TOKEN, - subject_token_type=SUBJECT_TOKEN_TYPE, - token_url=TOKEN_URL, - service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, - basic_auth_encoding=None, - quota_project_id=None, - used_scopes=SCOPES, - scopes=SCOPES, - default_scopes=None, - ) - - def test_refresh_with_retrieve_subject_token_error(self): - credential_source = { - "file": SUBJECT_TOKEN_JSON_FILE, - "format": {"type": "json", "subject_token_field_name": "not_found"}, - } - credentials = self.make_credentials(credential_source=credential_source) - - with pytest.raises(exceptions.RefreshError) as excinfo: - credentials.refresh(None) - - assert excinfo.match( - "Unable to parse subject_token from JSON file '{}' using key '{}'".format( - SUBJECT_TOKEN_JSON_FILE, "not_found" - ) - ) - - def test_retrieve_subject_token_from_url(self): - credentials = self.make_credentials( - credential_source=self.CREDENTIAL_SOURCE_TEXT_URL + assert subject_token == self.EXECUTABLE_OIDC_TOKEN + + def test_retrieve_subject_token_saml(self, fp): + os.environ['GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES'] = '1' + fp.register(self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND.split(), stdout=json.dumps(self.EXECUTABLE_SUCCESSFUL_SAML_RESPONSE)) + + credentials = self.make_pluggable( + credential_source=self.CREDENTIAL_SOURCE ) - request = self.make_mock_request(token_data=TEXT_FILE_SUBJECT_TOKEN) - subject_token = credentials.retrieve_subject_token(request) - assert subject_token == TEXT_FILE_SUBJECT_TOKEN - self.assert_credential_request_kwargs(request.call_args_list[0][1], None) - - def test_retrieve_subject_token_from_url_with_headers(self): - credentials = self.make_credentials( - credential_source={"url": self.CREDENTIAL_URL, "headers": {"foo": "bar"}} - ) - request = self.make_mock_request(token_data=TEXT_FILE_SUBJECT_TOKEN) - subject_token = credentials.retrieve_subject_token(request) - - assert subject_token == TEXT_FILE_SUBJECT_TOKEN - self.assert_credential_request_kwargs( - request.call_args_list[0][1], {"foo": "bar"} - ) - - def test_retrieve_subject_token_from_url_json(self): - credentials = self.make_credentials( - credential_source=self.CREDENTIAL_SOURCE_JSON_URL - ) - request = self.make_mock_request(token_data=JSON_FILE_CONTENT) - subject_token = credentials.retrieve_subject_token(request) + subject_token = credentials.retrieve_subject_token(None) - assert subject_token == JSON_FILE_SUBJECT_TOKEN - self.assert_credential_request_kwargs(request.call_args_list[0][1], None) + assert subject_token == self.EXECUTABLE_SAML_TOKEN - def test_retrieve_subject_token_from_url_json_with_headers(self): - credentials = self.make_credentials( - credential_source={ - "url": self.CREDENTIAL_URL, - "format": {"type": "json", "subject_token_field_name": "access_token"}, - "headers": {"foo": "bar"}, - } + def test_retrieve_subject_token_failed(self, fp): + os.environ['GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES'] = '1' + fp.register(self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND.split(), stdout=json.dumps(self.EXECUTABLE_FAILED_RESPONSE)) + + credentials = self.make_pluggable( + credential_source=self.CREDENTIAL_SOURCE ) - request = self.make_mock_request(token_data=JSON_FILE_CONTENT) - subject_token = credentials.retrieve_subject_token(request) - assert subject_token == JSON_FILE_SUBJECT_TOKEN - self.assert_credential_request_kwargs( - request.call_args_list[0][1], {"foo": "bar"} - ) - - def test_retrieve_subject_token_from_url_not_found(self): - credentials = self.make_credentials( - credential_source=self.CREDENTIAL_SOURCE_TEXT_URL - ) with pytest.raises(exceptions.RefreshError) as excinfo: - credentials.retrieve_subject_token( - self.make_mock_request(token_status=404, token_data=JSON_FILE_CONTENT) - ) - - assert excinfo.match("Unable to retrieve Identity Pool subject token") - - def test_retrieve_subject_token_from_url_json_invalid_field(self): - credential_source = { - "url": self.CREDENTIAL_URL, - "format": {"type": "json", "subject_token_field_name": "not_found"}, + subject_token = credentials.retrieve_subject_token(None) + + assert excinfo.match(r"Executable returned unsuccessful response") + + def test_retrieve_subject_token_invalid_version(self, fp): + EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_VERSION_2 = { + "version": 2, + "success": True, + "token_type": "urn:ietf:params:oauth:token-type:id_token", + "id_token": self.EXECUTABLE_OIDC_TOKEN, + "expiration_time": 9999999999 } - credentials = self.make_credentials(credential_source=credential_source) - - with pytest.raises(exceptions.RefreshError) as excinfo: - credentials.retrieve_subject_token( - self.make_mock_request(token_data=JSON_FILE_CONTENT) - ) - - assert excinfo.match( - "Unable to parse subject_token from JSON file '{}' using key '{}'".format( - self.CREDENTIAL_URL, "not_found" - ) - ) - - def test_retrieve_subject_token_from_url_json_invalid_format(self): - credentials = self.make_credentials( - credential_source=self.CREDENTIAL_SOURCE_JSON_URL + + os.environ['GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES'] = '1' + fp.register(self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND.split(), stdout=json.dumps(EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_VERSION_2)) + + credentials = self.make_pluggable( + credential_source=self.CREDENTIAL_SOURCE ) with pytest.raises(exceptions.RefreshError) as excinfo: - credentials.retrieve_subject_token(self.make_mock_request(token_data="{")) - - assert excinfo.match( - "Unable to parse subject_token from JSON file '{}' using key '{}'".format( - self.CREDENTIAL_URL, "access_token" - ) - ) - - def test_refresh_text_file_success_without_impersonation_url(self): - credentials = self.make_credentials( - client_id=CLIENT_ID, - client_secret=CLIENT_SECRET, - # Test with text format type. - credential_source=self.CREDENTIAL_SOURCE_TEXT_URL, - scopes=SCOPES, - ) - - self.assert_underlying_credentials_refresh( - credentials=credentials, - audience=AUDIENCE, - subject_token=TEXT_FILE_SUBJECT_TOKEN, - subject_token_type=SUBJECT_TOKEN_TYPE, - token_url=TOKEN_URL, - service_account_impersonation_url=None, - basic_auth_encoding=BASIC_AUTH_ENCODING, - quota_project_id=None, - used_scopes=SCOPES, - scopes=SCOPES, - default_scopes=None, - credential_data=TEXT_FILE_SUBJECT_TOKEN, - ) - - def test_refresh_text_file_success_with_impersonation_url(self): - # Initialize credentials with service account impersonation and basic auth. - credentials = self.make_credentials( - # Test with text format type. - credential_source=self.CREDENTIAL_SOURCE_TEXT_URL, - service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, - scopes=SCOPES, - ) + subject_token = credentials.retrieve_subject_token(None) - self.assert_underlying_credentials_refresh( - credentials=credentials, - audience=AUDIENCE, - subject_token=TEXT_FILE_SUBJECT_TOKEN, - subject_token_type=SUBJECT_TOKEN_TYPE, - token_url=TOKEN_URL, - service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, - basic_auth_encoding=None, - quota_project_id=None, - used_scopes=SCOPES, - scopes=SCOPES, - default_scopes=None, - credential_data=TEXT_FILE_SUBJECT_TOKEN, - ) - - def test_refresh_json_file_success_without_impersonation_url(self): - credentials = self.make_credentials( - client_id=CLIENT_ID, - client_secret=CLIENT_SECRET, - # Test with JSON format type. - credential_source=self.CREDENTIAL_SOURCE_JSON_URL, - scopes=SCOPES, - ) - - self.assert_underlying_credentials_refresh( - credentials=credentials, - audience=AUDIENCE, - subject_token=JSON_FILE_SUBJECT_TOKEN, - subject_token_type=SUBJECT_TOKEN_TYPE, - token_url=TOKEN_URL, - service_account_impersonation_url=None, - basic_auth_encoding=BASIC_AUTH_ENCODING, - quota_project_id=None, - used_scopes=SCOPES, - scopes=SCOPES, - default_scopes=None, - credential_data=JSON_FILE_CONTENT, - ) - - def test_refresh_json_file_success_with_impersonation_url(self): - # Initialize credentials with service account impersonation and basic auth. - credentials = self.make_credentials( - # Test with JSON format type. - credential_source=self.CREDENTIAL_SOURCE_JSON_URL, - service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, - scopes=SCOPES, - ) - - self.assert_underlying_credentials_refresh( - credentials=credentials, - audience=AUDIENCE, - subject_token=JSON_FILE_SUBJECT_TOKEN, - subject_token_type=SUBJECT_TOKEN_TYPE, - token_url=TOKEN_URL, - service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, - basic_auth_encoding=None, - quota_project_id=None, - used_scopes=SCOPES, - scopes=SCOPES, - default_scopes=None, - credential_data=JSON_FILE_CONTENT, - ) - - def test_refresh_with_retrieve_subject_token_error_url(self): - credential_source = { - "url": self.CREDENTIAL_URL, - "format": {"type": "json", "subject_token_field_name": "not_found"}, - } - credentials = self.make_credentials(credential_source=credential_source) - - with pytest.raises(exceptions.RefreshError) as excinfo: - credentials.refresh(self.make_mock_request(token_data=JSON_FILE_CONTENT)) - - assert excinfo.match( - "Unable to parse subject_token from JSON file '{}' using key '{}'".format( - self.CREDENTIAL_URL, "not_found" - ) - ) + assert excinfo.match(r"Executable returned unsupported version") \ No newline at end of file From 52ac58265c720d60b90cc846038e44976149054e Mon Sep 17 00:00:00 2001 From: Chuan Ren Date: Tue, 1 Mar 2022 14:08:57 -0800 Subject: [PATCH 7/7] Address pr issues --- google/auth/pluggable.py | 39 ++++++++++++++++++++++++++++++++++----- tests/test_pluggable.py | 10 +++++----- 2 files changed, 39 insertions(+), 10 deletions(-) diff --git a/google/auth/pluggable.py b/google/auth/pluggable.py index afecb90f6..d440079f8 100644 --- a/google/auth/pluggable.py +++ b/google/auth/pluggable.py @@ -80,9 +80,10 @@ def __init__( { "executable": { - "command": "/path/to/get/credentials.sh --arg1=value1 --arg2=value2", - "timeout_millis": 5000, - "output_file": "/path/to/generated/cached/credentials" + "command": "/path/to/get/credentials.sh --arg1=value1 --arg2=value2", + "timeout_millis": 5000, + "output_file": "/path/to/generated/cached/credentials" + } } service_account_impersonation_url (Optional[str]): The optional service account @@ -157,21 +158,49 @@ def __init__( @_helpers.copy_docstring(external_account.Credentials) def retrieve_subject_token(self, request): env_allow_executables = os.environ.get('GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES') - if env_allow_executables is None or env_allow_executables != '1': + if env_allow_executables != '1': raise ValueError( - "Executables need to be explicitly allowed to run." + "Executables need to be explicitly allowed (set GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES to '1') to run." ) # Inject env vars + original_audience = os.getenv("GOOGLE_EXTERNAL_ACCOUNT_AUDIENCE") os.environ["GOOGLE_EXTERNAL_ACCOUNT_AUDIENCE"] = self._audience + original_subject_token_type = os.getenv("GOOGLE_EXTERNAL_ACCOUNT_TOKEN_TYPE") os.environ["GOOGLE_EXTERNAL_ACCOUNT_TOKEN_TYPE"] = self._subject_token_type + original_interactive = os.getenv("GOOGLE_EXTERNAL_ACCOUNT_INTERACTIVE") os.environ["GOOGLE_EXTERNAL_ACCOUNT_INTERACTIVE"] = "0" # Always set to 0 until interactive mode is implemented. + original_service_account_impersonation_url = os.getenv("GOOGLE_EXTERNAL_ACCOUNT_IMPERSONATED_EMAIL") if self._service_account_impersonation_url is not None: os.environ["GOOGLE_EXTERNAL_ACCOUNT_IMPERSONATED_EMAIL"] = self._service_account_impersonation_url + original_credential_source_executable_output_file = os.getenv("GOOGLE_EXTERNAL_ACCOUNT_OUTPUT_FILE") if self._credential_source_executable_output_file is not None: os.environ["GOOGLE_EXTERNAL_ACCOUNT_OUTPUT_FILE"] = self._credential_source_executable_output_file result = subprocess.run(self._credential_source_executable_command.split(), timeout=self._credential_source_executable_timeout_millis/1000, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + + # Reset env vars + if original_audience is not None: + os.environ["GOOGLE_EXTERNAL_ACCOUNT_AUDIENCE"] = original_audience + else: + del os.environ["GOOGLE_EXTERNAL_ACCOUNT_AUDIENCE"] + if original_subject_token_type is not None: + os.environ["GOOGLE_EXTERNAL_ACCOUNT_TOKEN_TYPE"] = self.original_subject_token_type + else: + del os.environ["GOOGLE_EXTERNAL_ACCOUNT_TOKEN_TYPE"] + if original_interactive is not None: + os.environ["GOOGLE_EXTERNAL_ACCOUNT_INTERACTIVE"] = original_interactive + else: + del os.environ["GOOGLE_EXTERNAL_ACCOUNT_INTERACTIVE"] + if original_service_account_impersonation_url is not None: + os.environ["GOOGLE_EXTERNAL_ACCOUNT_IMPERSONATED_EMAIL"] = original_service_account_impersonation_url + elif os.getenv("GOOGLE_EXTERNAL_ACCOUNT_IMPERSONATED_EMAIL") is not None: + del os.environ["GOOGLE_EXTERNAL_ACCOUNT_IMPERSONATED_EMAIL"] + if original_credential_source_executable_output_file is not None: + os.environ["GOOGLE_EXTERNAL_ACCOUNT_OUTPUT_FILE"] = original_credential_source_executable_output_file + elif os.getenv("GOOGLE_EXTERNAL_ACCOUNT_OUTPUT_FILE") is not None: + del os.environ["GOOGLE_EXTERNAL_ACCOUNT_OUTPUT_FILE"] + if result.returncode != 0: raise exceptions.RefreshError( "Executable exited with non-zero return code {}. Error: {}".format(result.returncode, result.stdout) diff --git a/tests/test_pluggable.py b/tests/test_pluggable.py index 79fe9b27c..b9d6e9f27 100644 --- a/tests/test_pluggable.py +++ b/tests/test_pluggable.py @@ -440,8 +440,8 @@ def test_info_with_credential_source(self): "credential_source": self.CREDENTIAL_SOURCE, } + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) def test_retrieve_subject_token_oidc_id_token(self, fp): - os.environ['GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES'] = '1' fp.register(self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND.split(), stdout=json.dumps(self.EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_ID_TOKEN)) credentials = self.make_pluggable( @@ -452,8 +452,8 @@ def test_retrieve_subject_token_oidc_id_token(self, fp): assert subject_token == self.EXECUTABLE_OIDC_TOKEN + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) def test_retrieve_subject_token_oidc_jwt(self, fp): - os.environ['GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES'] = '1' fp.register(self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND.split(), stdout=json.dumps(self.EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_JWT)) credentials = self.make_pluggable( @@ -464,8 +464,8 @@ def test_retrieve_subject_token_oidc_jwt(self, fp): assert subject_token == self.EXECUTABLE_OIDC_TOKEN + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) def test_retrieve_subject_token_saml(self, fp): - os.environ['GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES'] = '1' fp.register(self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND.split(), stdout=json.dumps(self.EXECUTABLE_SUCCESSFUL_SAML_RESPONSE)) credentials = self.make_pluggable( @@ -476,8 +476,8 @@ def test_retrieve_subject_token_saml(self, fp): assert subject_token == self.EXECUTABLE_SAML_TOKEN + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) def test_retrieve_subject_token_failed(self, fp): - os.environ['GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES'] = '1' fp.register(self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND.split(), stdout=json.dumps(self.EXECUTABLE_FAILED_RESPONSE)) credentials = self.make_pluggable( @@ -489,6 +489,7 @@ def test_retrieve_subject_token_failed(self, fp): assert excinfo.match(r"Executable returned unsuccessful response") + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) def test_retrieve_subject_token_invalid_version(self, fp): EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_VERSION_2 = { "version": 2, @@ -498,7 +499,6 @@ def test_retrieve_subject_token_invalid_version(self, fp): "expiration_time": 9999999999 } - os.environ['GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES'] = '1' fp.register(self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND.split(), stdout=json.dumps(EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_VERSION_2)) credentials = self.make_pluggable(