diff --git a/docs/user-guide.rst b/docs/user-guide.rst index 239b5a6d7..a09247897 100644 --- a/docs/user-guide.rst +++ b/docs/user-guide.rst @@ -329,6 +329,155 @@ Follow the detailed instructions on how to .. _Configure Workload Identity Federation from an OIDC identity provider: https://cloud.google.com/iam/docs/access-resources-oidc +Using Executable-sourced credentials with OIDC and SAML +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +**Executable-sourced credentials** For executable-sourced credentials, a +local executable is used to retrieve the 3rd party token. The executable +must handle providing a valid, unexpired OIDC ID token or SAML assertion +in JSON format to stdout. + +To use executable-sourced credentials, the +``GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES`` environment variable must +be set to ``1``. + +To generate an executable-sourced workload identity configuration, run +the following command: + +.. code:: bash + + # Generate a configuration file for executable-sourced credentials. + gcloud iam workload-identity-pools create-cred-config \ + projects/$PROJECT_NUMBER/locations/global/workloadIdentityPools/$POOL_ID/providers/$PROVIDER_ID \ + --service-account=$SERVICE_ACCOUNT_EMAIL \ + --subject-token-type=$SUBJECT_TOKEN_TYPE \ + # The absolute path for the program, including arguments. + # e.g. --executable-command="/path/to/command --foo=bar" + --executable-command=$EXECUTABLE_COMMAND \ + # Optional argument for the executable timeout. Defaults to 30s. + # --executable-timeout-millis=$EXECUTABLE_TIMEOUT \ + # Optional argument for the absolute path to the executable output file. + # See below on how this argument impacts the library behaviour. + # --executable-output-file=$EXECUTABLE_OUTPUT_FILE \ + --output-file /path/to/generated/config.json + +Where the following variables need to be substituted: - +``$PROJECT_NUMBER``: The Google Cloud project number. - ``$POOL_ID``: +The workload identity pool ID. - ``$PROVIDER_ID``: The OIDC or SAML +provider ID. - ``$SERVICE_ACCOUNT_EMAIL``: The email of the service +account to impersonate. - ``$SUBJECT_TOKEN_TYPE``: The subject token +type. - ``$EXECUTABLE_COMMAND``: The full command to run, including +arguments. Must be an absolute path to the program. + +The ``--executable-timeout-millis`` flag is optional. This is the +duration for which the auth library will wait for the executable to +finish, in milliseconds. Defaults to 30 seconds when not provided. The +maximum allowed value is 2 minutes. The minimum is 5 seconds. + +The ``--executable-output-file`` flag is optional. If provided, the file +path must point to the 3PI credential response generated by the +executable. This is useful for caching the credentials. By specifying +this path, the Auth libraries will first check for its existence before +running the executable. By caching the executable JSON response to this +file, it improves performance as it avoids the need to run the +executable until the cached credentials in the output file are expired. +The executable must handle writing to this file - the auth libraries +will only attempt to read from this location. The format of contents in +the file should match the JSON format expected by the executable shown +below. + +To retrieve the 3rd party token, the library will call the executable +using the command specified. The executable’s output must adhere to the +response format specified below. It must output the response to stdout. + +A sample successful executable OIDC response: + +.. code:: json + + { + "version": 1, + "success": true, + "token_type": "urn:ietf:params:oauth:token-type:id_token", + "id_token": "HEADER.PAYLOAD.SIGNATURE", + "expiration_time": 1620499962 + } + +A sample successful executable SAML response: + +.. code:: json + + { + "version": 1, + "success": true, + "token_type": "urn:ietf:params:oauth:token-type:saml2", + "saml_response": "...", + "expiration_time": 1620499962 + } + +A sample executable error response: + +.. code:: json + + { + "version": 1, + "success": false, + "code": "401", + "message": "Caller not authorized." + } + +These are all required fields for an error response. The code and +message fields will be used by the library as part of the thrown +exception. + +Response format fields summary: ``version``: The version of the JSON +output. Currently only version 1 is supported. ``success``: The +status of the response. When true, the response must contain the 3rd +party token, token type, and expiration. The executable must also exit +with exit code 0. When false, the response must contain the error code +and message fields and exit with a non-zero value. ``token_type``: +The 3rd party subject token type. Must be +*urn:ietf:params:oauth:token-type:jwt*, +*urn:ietf:params:oauth:token-type:id_token*, or +*urn:ietf:params:oauth:token-type:saml2*. ``id_token``: The 3rd party +OIDC token. ``saml_response``: The 3rd party SAML response. +``expiration_time``: The 3rd party subject token expiration time in +seconds (unix epoch time). ``code``: The error code string. +``message``: The error message. + +All response types must include both the ``version`` and ``success`` +fields. Successful responses must include the ``token_type``, +``expiration_time``, and one of ``id_token`` or ``saml_response``. +Error responses must include both the ``code`` and ``message`` fields. + +The library will populate the following environment variables when the +executable is run: ``GOOGLE_EXTERNAL_ACCOUNT_AUDIENCE``: The audience +field from the credential configuration. Always present. +``GOOGLE_EXTERNAL_ACCOUNT_IMPERSONATED_EMAIL``: The service account +email. Only present when service account impersonation is used. +``GOOGLE_EXTERNAL_ACCOUNT_OUTPUT_FILE``: The output file location from +the credential configuration. Only present when specified in the +credential configuration. + +These environment variables can be used by the executable to avoid +hard-coding these values. + +Security considerations + + The following security practices are highly recommended: + Access to the script should be restricted as it will be displaying + credentials to stdout. This ensures that rogue processes do not gain + access to the script. The configuration file should not be + modifiable. Write access should be restricted to avoid processes + modifying the executable command portion. + +Given the complexity of using executable-sourced credentials, it is +recommended to use the existing supported mechanisms +(file-sourced/URL-sourced) for providing 3rd party credentials unless +they do not meet your specific requirements. + +You can now `use the Auth library <#using-external-identities>`__ to +call Google Cloud resources from an OIDC or SAML provider. + Using External Identities ~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -395,7 +544,7 @@ Impersonated credentials ++++++++++++++++++++++++ Impersonated Credentials allows one set of credentials issued to a user or service account -to impersonate another. The source credentials must be granted +to impersonate another. The source credentials must be granted the "Service Account Token Creator" IAM role. :: from google.auth import impersonated_credentials @@ -417,7 +566,7 @@ the "Service Account Token Creator" IAM role. :: In the example above `source_credentials` does not have direct access to list buckets -in the target project. Using `ImpersonatedCredentials` will allow the source_credentials +in the target project. Using `ImpersonatedCredentials` will allow the source_credentials to assume the identity of a target_principal that does have access. diff --git a/google/auth/_default.py b/google/auth/_default.py index 3a4190389..a2a07800a 100644 --- a/google/auth/_default.py +++ b/google/auth/_default.py @@ -324,7 +324,7 @@ def _get_external_account_credentials( google.auth.exceptions.DefaultCredentialsError: if the info dictionary is in the wrong format or is missing required information. """ - # There are currently 2 types of external_account credentials. + # There are currently 3 types of external_account credentials. if info.get("subject_token_type") == _AWS_SUBJECT_TOKEN_TYPE: # Check if configuration corresponds to an AWS credentials. from google.auth import aws @@ -332,6 +332,15 @@ def _get_external_account_credentials( credentials = aws.Credentials.from_info( info, scopes=scopes, default_scopes=default_scopes ) + elif ( + info.get("credential_source") is not None + and info.get("credential_source").get("executable") is not None + ): + from google.auth import pluggable + + credentials = pluggable.Credentials.from_info( + info, scopes=scopes, default_scopes=default_scopes + ) else: try: # Check if configuration corresponds to an Identity Pool credentials. diff --git a/google/auth/pluggable.py b/google/auth/pluggable.py new file mode 100644 index 000000000..12cd6240e --- /dev/null +++ b/google/auth/pluggable.py @@ -0,0 +1,322 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Pluggable Credentials. +Pluggable Credentials are initialized using external_account arguments which +are typically loaded from third-party executables. Unlike other +credentials that can be initialized with a list of explicit arguments, secrets +or credentials, external account clients use the environment and hints/guidelines +provided by the external_account JSON file to retrieve credentials and exchange +them for Google access tokens. + +Example credential_source for pluggable credential: +{ + "executable": { + "command": "/path/to/get/credentials.sh --arg1=value1 --arg2=value2", + "timeout_millis": 5000, + "output_file": "/path/to/generated/cached/credentials" + } +} +""" + +try: + from collections.abc import Mapping +# Python 2.7 compatibility +except ImportError: # pragma: NO COVER + from collections import Mapping +import io +import json +import os +import subprocess +import time + +from google.auth import _helpers +from google.auth import exceptions +from google.auth import external_account + +# The max supported executable spec version. +EXECUTABLE_SUPPORTED_MAX_VERSION = 1 + + +class Credentials(external_account.Credentials): + """External account credentials sourced from executables.""" + + def __init__( + self, + audience, + subject_token_type, + token_url, + credential_source, + service_account_impersonation_url=None, + client_id=None, + client_secret=None, + quota_project_id=None, + scopes=None, + default_scopes=None, + workforce_pool_user_project=None, + ): + """Instantiates an external account credentials object from a executables. + + Args: + audience (str): The STS audience field. + subject_token_type (str): The subject token type. + token_url (str): The STS endpoint URL. + credential_source (Mapping): The credential source dictionary used to + provide instructions on how to retrieve external credential to be + exchanged for Google access tokens. + + Example credential_source for pluggable credential: + + { + "executable": { + "command": "/path/to/get/credentials.sh --arg1=value1 --arg2=value2", + "timeout_millis": 5000, + "output_file": "/path/to/generated/cached/credentials" + } + } + + service_account_impersonation_url (Optional[str]): The optional service account + impersonation getAccessToken URL. + client_id (Optional[str]): The optional client ID. + client_secret (Optional[str]): The optional client secret. + quota_project_id (Optional[str]): The optional quota project ID. + scopes (Optional[Sequence[str]]): Optional scopes to request during the + authorization grant. + default_scopes (Optional[Sequence[str]]): Default scopes passed by a + Google client library. Use 'scopes' for user-defined scopes. + workforce_pool_user_project (Optona[str]): The optional workforce pool user + project number when the credential corresponds to a workforce pool and not + a workload Pluggable. The underlying principal must still have + serviceusage.services.use IAM permission to use the project for + billing/quota. + + Raises: + google.auth.exceptions.RefreshError: If an error is encountered during + access token retrieval logic. + ValueError: For invalid parameters. + + .. note:: Typically one of the helper constructors + :meth:`from_file` or + :meth:`from_info` are used instead of calling the constructor directly. + """ + + super(Credentials, self).__init__( + audience=audience, + subject_token_type=subject_token_type, + token_url=token_url, + credential_source=credential_source, + service_account_impersonation_url=service_account_impersonation_url, + client_id=client_id, + client_secret=client_secret, + quota_project_id=quota_project_id, + scopes=scopes, + default_scopes=default_scopes, + workforce_pool_user_project=workforce_pool_user_project, + ) + if not isinstance(credential_source, Mapping): + self._credential_source_executable = None + raise ValueError( + "Missing credential_source. The credential_source is not a dict." + ) + self._credential_source_executable = credential_source.get("executable") + if not self._credential_source_executable: + raise ValueError( + "Missing credential_source. An 'executable' must be provided." + ) + self._credential_source_executable_command = self._credential_source_executable.get( + "command" + ) + self._credential_source_executable_timeout_millis = self._credential_source_executable.get( + "timeout_millis" + ) + self._credential_source_executable_output_file = self._credential_source_executable.get( + "output_file" + ) + + if not self._credential_source_executable_command: + raise ValueError( + "Missing command field. Executable command must be provided." + ) + if not self._credential_source_executable_timeout_millis: + self._credential_source_executable_timeout_millis = 30 * 1000 + elif ( + self._credential_source_executable_timeout_millis < 5 * 1000 + or self._credential_source_executable_timeout_millis > 120 * 1000 + ): + raise ValueError("Timeout must be between 5 and 120 seconds.") + + @_helpers.copy_docstring(external_account.Credentials) + def retrieve_subject_token(self, request): + env_allow_executables = os.environ.get( + "GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES" + ) + if env_allow_executables != "1": + raise ValueError( + "Executables need to be explicitly allowed (set GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES to '1') to run." + ) + + # Check output file. + if self._credential_source_executable_output_file is not None: + try: + with open( + self._credential_source_executable_output_file + ) as output_file: + response = json.load(output_file) + except Exception: + pass + else: + try: + # If the cached response is expired, _parse_subject_token will raise an error which will be ignored and we will call the executable again. + subject_token = self._parse_subject_token(response) + except ValueError: + raise + except exceptions.RefreshError: + pass + else: + return subject_token + + if not _helpers.is_python_3(): + raise exceptions.RefreshError( + "Pluggable auth is only supported for python 3.6+" + ) + + # Inject env vars. + env = os.environ.copy() + env["GOOGLE_EXTERNAL_ACCOUNT_AUDIENCE"] = self._audience + env["GOOGLE_EXTERNAL_ACCOUNT_TOKEN_TYPE"] = self._subject_token_type + env[ + "GOOGLE_EXTERNAL_ACCOUNT_INTERACTIVE" + ] = "0" # Always set to 0 until interactive mode is implemented. + if self._service_account_impersonation_url is not None: + env[ + "GOOGLE_EXTERNAL_ACCOUNT_IMPERSONATED_EMAIL" + ] = self.service_account_email + if self._credential_source_executable_output_file is not None: + env[ + "GOOGLE_EXTERNAL_ACCOUNT_OUTPUT_FILE" + ] = self._credential_source_executable_output_file + + try: + result = subprocess.run( + self._credential_source_executable_command.split(), + timeout=self._credential_source_executable_timeout_millis / 1000, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + env=env, + ) + if result.returncode != 0: + raise exceptions.RefreshError( + "Executable exited with non-zero return code {}. Error: {}".format( + result.returncode, result.stdout + ) + ) + except Exception: + raise + else: + try: + data = result.stdout.decode("utf-8") + response = json.loads(data) + subject_token = self._parse_subject_token(response) + except Exception: + raise + + return subject_token + + @classmethod + def from_info(cls, info, **kwargs): + """Creates a Pluggable Credentials instance from parsed external account info. + + Args: + info (Mapping[str, str]): The Pluggable external account info in Google + format. + kwargs: Additional arguments to pass to the constructor. + + Returns: + google.auth.pluggable.Credentials: The constructed + credentials. + + Raises: + ValueError: For invalid parameters. + """ + return cls( + audience=info.get("audience"), + subject_token_type=info.get("subject_token_type"), + token_url=info.get("token_url"), + service_account_impersonation_url=info.get( + "service_account_impersonation_url" + ), + client_id=info.get("client_id"), + client_secret=info.get("client_secret"), + credential_source=info.get("credential_source"), + quota_project_id=info.get("quota_project_id"), + workforce_pool_user_project=info.get("workforce_pool_user_project"), + **kwargs + ) + + @classmethod + def from_file(cls, filename, **kwargs): + """Creates an Pluggable Credentials instance from an external account json file. + + Args: + filename (str): The path to the Pluggable external account json file. + kwargs: Additional arguments to pass to the constructor. + + Returns: + google.auth.pluggable.Credentials: The constructed + credentials. + """ + with io.open(filename, "r", encoding="utf-8") as json_file: + data = json.load(json_file) + return cls.from_info(data, **kwargs) + + def _parse_subject_token(self, response): + if "version" not in response: + raise ValueError("The executable response is missing the version field.") + if response["version"] > EXECUTABLE_SUPPORTED_MAX_VERSION: + raise exceptions.RefreshError( + "Executable returned unsupported version {}.".format( + response["version"] + ) + ) + if "success" not in response: + raise ValueError("The executable response is missing the success field.") + if not response["success"]: + if "code" not in response or "message" not in response: + raise ValueError( + "Error code and message fields are required in the response." + ) + raise exceptions.RefreshError( + "Executable returned unsuccessful response: code: {}, message: {}.".format( + response["code"], response["message"] + ) + ) + if "expiration_time" not in response: + raise ValueError( + "The executable response is missing the expiration_time field." + ) + if response["expiration_time"] < time.time(): + raise exceptions.RefreshError( + "The token returned by the executable is expired." + ) + if "token_type" not in response: + raise ValueError("The executable response is missing the token_type field.") + if ( + response["token_type"] == "urn:ietf:params:oauth:token-type:jwt" + or response["token_type"] == "urn:ietf:params:oauth:token-type:id_token" + ): # OIDC + return response["id_token"] + elif response["token_type"] == "urn:ietf:params:oauth:token-type:saml2": # SAML + return response["saml_response"] + else: + raise exceptions.RefreshError("Executable returned unsupported token type.") diff --git a/noxfile.py b/noxfile.py index 7221315b0..18a7232e4 100644 --- a/noxfile.py +++ b/noxfile.py @@ -117,6 +117,7 @@ def unit_prev_versions(session): "--cov=google.auth", "--cov=google.oauth2", "--cov=tests", + "--ignore=tests/test_pluggable.py", # Pluggable auth only support 3.6+ for now. "tests", "--ignore=tests/transport/test__custom_tls_signer.py", # enterprise cert is for python 3.6+ ) diff --git a/tests/test__default.py b/tests/test__default.py index 61772c2e3..5ea9c73c5 100644 --- a/tests/test__default.py +++ b/tests/test__default.py @@ -28,6 +28,7 @@ from google.auth import external_account from google.auth import identity_pool from google.auth import impersonated_credentials +from google.auth import pluggable from google.oauth2 import gdch_credentials from google.oauth2 import service_account import google.oauth2.credentials @@ -75,6 +76,13 @@ "token_url": TOKEN_URL, "credential_source": {"file": SUBJECT_TOKEN_TEXT_FILE}, } +PLUGGABLE_DATA = { + "type": "external_account", + "audience": AUDIENCE, + "subject_token_type": "urn:ietf:params:oauth:token-type:jwt", + "token_url": TOKEN_URL, + "credential_source": {"executable": {"command": "command"}}, +} AWS_DATA = { "type": "external_account", "audience": AUDIENCE, @@ -1153,6 +1161,18 @@ def test_default_impersonated_service_account_set_both_scopes_and_default_scopes assert credentials._target_scopes == scopes +@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH +def test_load_credentials_from_external_account_pluggable(get_project_id, tmpdir): + config_file = tmpdir.join("config.json") + config_file.write(json.dumps(PLUGGABLE_DATA)) + credentials, project_id = _default.load_credentials_from_file(str(config_file)) + + assert isinstance(credentials, pluggable.Credentials) + # Since no scopes are specified, the project ID cannot be determined. + assert project_id is None + assert get_project_id.called + + @mock.patch( "google.auth._cloud_sdk.get_application_default_credentials_path", autospec=True ) diff --git a/tests/test_pluggable.py b/tests/test_pluggable.py new file mode 100644 index 000000000..61ddabd45 --- /dev/null +++ b/tests/test_pluggable.py @@ -0,0 +1,752 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# import datetime +import json +import os +import subprocess + +import mock +import pytest # type: ignore + +# from six.moves import http_client +# from six.moves import urllib + +# from google.auth import _helpers +from google.auth import exceptions +from google.auth import pluggable + +# from google.auth import transport + + +CLIENT_ID = "username" +CLIENT_SECRET = "password" +# Base64 encoding of "username:password". +BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ=" +SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com" +SERVICE_ACCOUNT_IMPERSONATION_URL = ( + "https://us-east1-iamcredentials.googleapis.com/v1/projects/-" + + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL) +) +QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID" +SCOPES = ["scope1", "scope2"] +SUBJECT_TOKEN_FIELD_NAME = "access_token" + +TOKEN_URL = "https://sts.googleapis.com/v1/token" +SERVICE_ACCOUNT_IMPERSONATION_URL = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/byoid-test@cicpclientproj.iam.gserviceaccount.com:generateAccessToken" +SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt" +AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID" + + +class TestCredentials(object): + CREDENTIAL_SOURCE_EXECUTABLE_COMMAND = ( + "/fake/external/excutable --arg1=value1 --arg2=value2" + ) + CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE = "fake_output_file" + CREDENTIAL_SOURCE_EXECUTABLE = { + "command": CREDENTIAL_SOURCE_EXECUTABLE_COMMAND, + "timeout_millis": 30000, + "output_file": CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, + } + CREDENTIAL_SOURCE = {"executable": CREDENTIAL_SOURCE_EXECUTABLE} + EXECUTABLE_OIDC_TOKEN = "FAKE_ID_TOKEN" + EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_ID_TOKEN = { + "version": 1, + "success": True, + "token_type": "urn:ietf:params:oauth:token-type:id_token", + "id_token": EXECUTABLE_OIDC_TOKEN, + "expiration_time": 9999999999, + } + EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_JWT = { + "version": 1, + "success": True, + "token_type": "urn:ietf:params:oauth:token-type:jwt", + "id_token": EXECUTABLE_OIDC_TOKEN, + "expiration_time": 9999999999, + } + EXECUTABLE_SAML_TOKEN = "FAKE_SAML_RESPONSE" + EXECUTABLE_SUCCESSFUL_SAML_RESPONSE = { + "version": 1, + "success": True, + "token_type": "urn:ietf:params:oauth:token-type:saml2", + "saml_response": EXECUTABLE_SAML_TOKEN, + "expiration_time": 9999999999, + } + EXECUTABLE_FAILED_RESPONSE = { + "version": 1, + "success": False, + "code": "401", + "message": "Permission denied. Caller not authorized", + } + CREDENTIAL_URL = "http://fakeurl.com" + + @classmethod + def make_pluggable( + cls, + audience=AUDIENCE, + subject_token_type=SUBJECT_TOKEN_TYPE, + client_id=None, + client_secret=None, + quota_project_id=None, + scopes=None, + default_scopes=None, + service_account_impersonation_url=None, + credential_source=None, + workforce_pool_user_project=None, + ): + return pluggable.Credentials( + audience=audience, + subject_token_type=subject_token_type, + token_url=TOKEN_URL, + service_account_impersonation_url=service_account_impersonation_url, + credential_source=credential_source, + client_id=client_id, + client_secret=client_secret, + quota_project_id=quota_project_id, + scopes=scopes, + default_scopes=default_scopes, + workforce_pool_user_project=workforce_pool_user_project, + ) + + @mock.patch.object(pluggable.Credentials, "__init__", return_value=None) + def test_from_info_full_options(self, mock_init): + credentials = pluggable.Credentials.from_info( + { + "audience": AUDIENCE, + "subject_token_type": SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL, + "client_id": CLIENT_ID, + "client_secret": CLIENT_SECRET, + "quota_project_id": QUOTA_PROJECT_ID, + "credential_source": self.CREDENTIAL_SOURCE, + } + ) + + # Confirm pluggable.Credentials instantiated with expected attributes. + assert isinstance(credentials, pluggable.Credentials) + mock_init.assert_called_once_with( + audience=AUDIENCE, + subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + client_id=CLIENT_ID, + client_secret=CLIENT_SECRET, + credential_source=self.CREDENTIAL_SOURCE, + quota_project_id=QUOTA_PROJECT_ID, + workforce_pool_user_project=None, + ) + + @mock.patch.object(pluggable.Credentials, "__init__", return_value=None) + def test_from_info_required_options_only(self, mock_init): + credentials = pluggable.Credentials.from_info( + { + "audience": AUDIENCE, + "subject_token_type": SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + "credential_source": self.CREDENTIAL_SOURCE, + } + ) + + # Confirm pluggable.Credentials instantiated with expected attributes. + assert isinstance(credentials, pluggable.Credentials) + mock_init.assert_called_once_with( + audience=AUDIENCE, + subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=None, + client_id=None, + client_secret=None, + credential_source=self.CREDENTIAL_SOURCE, + quota_project_id=None, + workforce_pool_user_project=None, + ) + + @mock.patch.object(pluggable.Credentials, "__init__", return_value=None) + def test_from_file_full_options(self, mock_init, tmpdir): + info = { + "audience": AUDIENCE, + "subject_token_type": SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL, + "client_id": CLIENT_ID, + "client_secret": CLIENT_SECRET, + "quota_project_id": QUOTA_PROJECT_ID, + "credential_source": self.CREDENTIAL_SOURCE, + } + config_file = tmpdir.join("config.json") + config_file.write(json.dumps(info)) + credentials = pluggable.Credentials.from_file(str(config_file)) + + # Confirm pluggable.Credentials instantiated with expected attributes. + assert isinstance(credentials, pluggable.Credentials) + mock_init.assert_called_once_with( + audience=AUDIENCE, + subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + client_id=CLIENT_ID, + client_secret=CLIENT_SECRET, + credential_source=self.CREDENTIAL_SOURCE, + quota_project_id=QUOTA_PROJECT_ID, + workforce_pool_user_project=None, + ) + + @mock.patch.object(pluggable.Credentials, "__init__", return_value=None) + def test_from_file_required_options_only(self, mock_init, tmpdir): + info = { + "audience": AUDIENCE, + "subject_token_type": SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + "credential_source": self.CREDENTIAL_SOURCE, + } + config_file = tmpdir.join("config.json") + config_file.write(json.dumps(info)) + credentials = pluggable.Credentials.from_file(str(config_file)) + + # Confirm pluggable.Credentials instantiated with expected attributes. + assert isinstance(credentials, pluggable.Credentials) + mock_init.assert_called_once_with( + audience=AUDIENCE, + subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=None, + client_id=None, + client_secret=None, + credential_source=self.CREDENTIAL_SOURCE, + quota_project_id=None, + workforce_pool_user_project=None, + ) + + def test_constructor_invalid_options(self): + credential_source = {"unsupported": "value"} + + with pytest.raises(ValueError) as excinfo: + self.make_pluggable(credential_source=credential_source) + + assert excinfo.match(r"Missing credential_source") + + def test_constructor_invalid_credential_source(self): + with pytest.raises(ValueError) as excinfo: + self.make_pluggable(credential_source="non-dict") + + assert excinfo.match(r"Missing credential_source") + + def test_info_with_credential_source(self): + credentials = self.make_pluggable( + credential_source=self.CREDENTIAL_SOURCE.copy() + ) + + assert credentials.info == { + "type": "external_account", + "audience": AUDIENCE, + "subject_token_type": SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + "credential_source": self.CREDENTIAL_SOURCE, + } + + @mock.patch.dict( + os.environ, + { + "GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1", + "GOOGLE_EXTERNAL_ACCOUNT_AUDIENCE": "original_audience", + "GOOGLE_EXTERNAL_ACCOUNT_TOKEN_TYPE": "original_token_type", + "GOOGLE_EXTERNAL_ACCOUNT_INTERACTIVE": "0", + "GOOGLE_EXTERNAL_ACCOUNT_IMPERSONATED_EMAIL": "original_impersonated_email", + "GOOGLE_EXTERNAL_ACCOUNT_OUTPUT_FILE": "original_output_file", + }, + ) + def test_retrieve_subject_token_oidc_id_token(self): + with mock.patch( + "subprocess.run", + return_value=subprocess.CompletedProcess( + args=[], + stdout=json.dumps( + self.EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_ID_TOKEN + ).encode("UTF-8"), + returncode=0, + ), + ): + credentials = self.make_pluggable( + audience=AUDIENCE, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + credential_source=self.CREDENTIAL_SOURCE, + ) + + subject_token = credentials.retrieve_subject_token(None) + + assert subject_token == self.EXECUTABLE_OIDC_TOKEN + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_retrieve_subject_token_oidc_jwt(self): + with mock.patch( + "subprocess.run", + return_value=subprocess.CompletedProcess( + args=[], + stdout=json.dumps(self.EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_JWT).encode( + "UTF-8" + ), + returncode=0, + ), + ): + credentials = self.make_pluggable( + audience=AUDIENCE, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + credential_source=self.CREDENTIAL_SOURCE, + ) + + subject_token = credentials.retrieve_subject_token(None) + + assert subject_token == self.EXECUTABLE_OIDC_TOKEN + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_retrieve_subject_token_saml(self): + with mock.patch( + "subprocess.run", + return_value=subprocess.CompletedProcess( + args=[], + stdout=json.dumps(self.EXECUTABLE_SUCCESSFUL_SAML_RESPONSE).encode( + "UTF-8" + ), + returncode=0, + ), + ): + credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE) + + subject_token = credentials.retrieve_subject_token(None) + + assert subject_token == self.EXECUTABLE_SAML_TOKEN + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_retrieve_subject_token_failed(self): + with mock.patch( + "subprocess.run", + return_value=subprocess.CompletedProcess( + args=[], + stdout=json.dumps(self.EXECUTABLE_FAILED_RESPONSE).encode("UTF-8"), + returncode=0, + ), + ): + credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE) + + with pytest.raises(exceptions.RefreshError) as excinfo: + _ = credentials.retrieve_subject_token(None) + + assert excinfo.match( + r"Executable returned unsuccessful response: code: 401, message: Permission denied. Caller not authorized." + ) + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "0"}) + def test_retrieve_subject_token_not_allowd(self): + with mock.patch( + "subprocess.run", + return_value=subprocess.CompletedProcess( + args=[], + stdout=json.dumps( + self.EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_ID_TOKEN + ).encode("UTF-8"), + returncode=0, + ), + ): + credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE) + + with pytest.raises(ValueError) as excinfo: + _ = credentials.retrieve_subject_token(None) + + assert excinfo.match(r"Executables need to be explicitly allowed") + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_retrieve_subject_token_invalid_version(self): + EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_VERSION_2 = { + "version": 2, + "success": True, + "token_type": "urn:ietf:params:oauth:token-type:id_token", + "id_token": self.EXECUTABLE_OIDC_TOKEN, + "expiration_time": 9999999999, + } + + with mock.patch( + "subprocess.run", + return_value=subprocess.CompletedProcess( + args=[], + stdout=json.dumps(EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_VERSION_2).encode( + "UTF-8" + ), + returncode=0, + ), + ): + credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE) + + with pytest.raises(exceptions.RefreshError) as excinfo: + _ = credentials.retrieve_subject_token(None) + + assert excinfo.match(r"Executable returned unsupported version.") + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_retrieve_subject_token_expired_token(self): + EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_EXPIRED = { + "version": 1, + "success": True, + "token_type": "urn:ietf:params:oauth:token-type:id_token", + "id_token": self.EXECUTABLE_OIDC_TOKEN, + "expiration_time": 0, + } + + with mock.patch( + "subprocess.run", + return_value=subprocess.CompletedProcess( + args=[], + stdout=json.dumps(EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_EXPIRED).encode( + "UTF-8" + ), + returncode=0, + ), + ): + credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE) + + with pytest.raises(exceptions.RefreshError) as excinfo: + _ = credentials.retrieve_subject_token(None) + + assert excinfo.match(r"The token returned by the executable is expired.") + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_retrieve_subject_token_file_cache(self): + ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE = "actual_output_file" + ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE = { + "command": "command", + "timeout_millis": 30000, + "output_file": ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, + } + ACTUAL_CREDENTIAL_SOURCE = {"executable": ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE} + with open(ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, "w") as output_file: + json.dump(self.EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_ID_TOKEN, output_file) + + credentials = self.make_pluggable(credential_source=ACTUAL_CREDENTIAL_SOURCE) + + subject_token = credentials.retrieve_subject_token(None) + assert subject_token == self.EXECUTABLE_OIDC_TOKEN + + os.remove(ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE) + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_retrieve_subject_token_no_file_cache(self): + ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE = { + "command": "command", + "timeout_millis": 30000, + } + ACTUAL_CREDENTIAL_SOURCE = {"executable": ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE} + + with mock.patch( + "subprocess.run", + return_value=subprocess.CompletedProcess( + args=[], + stdout=json.dumps( + self.EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_ID_TOKEN + ).encode("UTF-8"), + returncode=0, + ), + ): + credentials = self.make_pluggable( + credential_source=ACTUAL_CREDENTIAL_SOURCE + ) + + subject_token = credentials.retrieve_subject_token(None) + + assert subject_token == self.EXECUTABLE_OIDC_TOKEN + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_retrieve_subject_token_file_cache_value_error_report(self): + ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE = "actual_output_file" + ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE = { + "command": "command", + "timeout_millis": 30000, + "output_file": ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, + } + ACTUAL_CREDENTIAL_SOURCE = {"executable": ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE} + ACTUAL_EXECUTABLE_RESPONSE = { + "success": True, + "token_type": "urn:ietf:params:oauth:token-type:id_token", + "id_token": self.EXECUTABLE_OIDC_TOKEN, + "expiration_time": 9999999999, + } + with open(ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, "w") as output_file: + json.dump(ACTUAL_EXECUTABLE_RESPONSE, output_file) + + credentials = self.make_pluggable(credential_source=ACTUAL_CREDENTIAL_SOURCE) + + with pytest.raises(ValueError) as excinfo: + _ = credentials.retrieve_subject_token(None) + + assert excinfo.match(r"The executable response is missing the version field.") + + os.remove(ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE) + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_retrieve_subject_token_file_cache_refresh_error_retry(self): + ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE = "actual_output_file" + ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE = { + "command": "command", + "timeout_millis": 30000, + "output_file": ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, + } + ACTUAL_CREDENTIAL_SOURCE = {"executable": ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE} + ACTUAL_EXECUTABLE_RESPONSE = { + "version": 2, + "success": True, + "token_type": "urn:ietf:params:oauth:token-type:id_token", + "id_token": self.EXECUTABLE_OIDC_TOKEN, + "expiration_time": 9999999999, + } + with open(ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, "w") as output_file: + json.dump(ACTUAL_EXECUTABLE_RESPONSE, output_file) + + with mock.patch( + "subprocess.run", + return_value=subprocess.CompletedProcess( + args=[], + stdout=json.dumps( + self.EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_ID_TOKEN + ).encode("UTF-8"), + returncode=0, + ), + ): + credentials = self.make_pluggable( + credential_source=ACTUAL_CREDENTIAL_SOURCE + ) + + subject_token = credentials.retrieve_subject_token(None) + + assert subject_token == self.EXECUTABLE_OIDC_TOKEN + + os.remove(ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE) + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_retrieve_subject_token_unsupported_token_type(self): + EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE = { + "version": 1, + "success": True, + "token_type": "unsupported_token_type", + "id_token": self.EXECUTABLE_OIDC_TOKEN, + "expiration_time": 9999999999, + } + + with mock.patch( + "subprocess.run", + return_value=subprocess.CompletedProcess( + args=[], + stdout=json.dumps(EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE).encode("UTF-8"), + returncode=0, + ), + ): + credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE) + + with pytest.raises(exceptions.RefreshError) as excinfo: + _ = credentials.retrieve_subject_token(None) + + assert excinfo.match(r"Executable returned unsupported token type.") + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_retrieve_subject_token_missing_version(self): + EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE = { + "success": True, + "token_type": "urn:ietf:params:oauth:token-type:id_token", + "id_token": self.EXECUTABLE_OIDC_TOKEN, + "expiration_time": 9999999999, + } + + with mock.patch( + "subprocess.run", + return_value=subprocess.CompletedProcess( + args=[], + stdout=json.dumps(EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE).encode("UTF-8"), + returncode=0, + ), + ): + credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE) + + with pytest.raises(ValueError) as excinfo: + _ = credentials.retrieve_subject_token(None) + + assert excinfo.match( + r"The executable response is missing the version field." + ) + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_retrieve_subject_token_missing_success(self): + EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE = { + "version": 1, + "token_type": "urn:ietf:params:oauth:token-type:id_token", + "id_token": self.EXECUTABLE_OIDC_TOKEN, + "expiration_time": 9999999999, + } + + with mock.patch( + "subprocess.run", + return_value=subprocess.CompletedProcess( + args=[], + stdout=json.dumps(EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE).encode("UTF-8"), + returncode=0, + ), + ): + credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE) + + with pytest.raises(ValueError) as excinfo: + _ = credentials.retrieve_subject_token(None) + + assert excinfo.match( + r"The executable response is missing the success field." + ) + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_retrieve_subject_token_missing_error_code_message(self): + EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE = {"version": 1, "success": False} + + with mock.patch( + "subprocess.run", + return_value=subprocess.CompletedProcess( + args=[], + stdout=json.dumps(EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE).encode("UTF-8"), + returncode=0, + ), + ): + credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE) + + with pytest.raises(ValueError) as excinfo: + _ = credentials.retrieve_subject_token(None) + + assert excinfo.match( + r"Error code and message fields are required in the response." + ) + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_retrieve_subject_token_missing_expiration_time(self): + EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE = { + "version": 1, + "success": True, + "token_type": "urn:ietf:params:oauth:token-type:id_token", + "id_token": self.EXECUTABLE_OIDC_TOKEN, + } + + with mock.patch( + "subprocess.run", + return_value=subprocess.CompletedProcess( + args=[], + stdout=json.dumps(EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE).encode("UTF-8"), + returncode=0, + ), + ): + credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE) + + with pytest.raises(ValueError) as excinfo: + _ = credentials.retrieve_subject_token(None) + + assert excinfo.match( + r"The executable response is missing the expiration_time field." + ) + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_retrieve_subject_token_missing_token_type(self): + EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE = { + "version": 1, + "success": True, + "id_token": self.EXECUTABLE_OIDC_TOKEN, + "expiration_time": 9999999999, + } + + with mock.patch( + "subprocess.run", + return_value=subprocess.CompletedProcess( + args=[], + stdout=json.dumps(EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE).encode("UTF-8"), + returncode=0, + ), + ): + credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE) + + with pytest.raises(ValueError) as excinfo: + _ = credentials.retrieve_subject_token(None) + + assert excinfo.match( + r"The executable response is missing the token_type field." + ) + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_credential_source_missing_command(self): + with pytest.raises(ValueError) as excinfo: + CREDENTIAL_SOURCE = { + "executable": { + "timeout_millis": 30000, + "output_file": self.CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, + } + } + _ = self.make_pluggable(credential_source=CREDENTIAL_SOURCE) + + assert excinfo.match( + r"Missing command field. Executable command must be provided." + ) + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_credential_source_timeout_small(self): + with pytest.raises(ValueError) as excinfo: + CREDENTIAL_SOURCE = { + "executable": { + "command": self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND, + "timeout_millis": 5000 - 1, + "output_file": self.CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, + } + } + _ = self.make_pluggable(credential_source=CREDENTIAL_SOURCE) + + assert excinfo.match(r"Timeout must be between 5 and 120 seconds.") + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_credential_source_timeout_large(self): + with pytest.raises(ValueError) as excinfo: + CREDENTIAL_SOURCE = { + "executable": { + "command": self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND, + "timeout_millis": 120000 + 1, + "output_file": self.CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, + } + } + _ = self.make_pluggable(credential_source=CREDENTIAL_SOURCE) + + assert excinfo.match(r"Timeout must be between 5 and 120 seconds.") + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_retrieve_subject_token_executable_fail(self): + with mock.patch( + "subprocess.run", + return_value=subprocess.CompletedProcess( + args=[], stdout=None, returncode=1 + ), + ): + credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE) + + with pytest.raises(exceptions.RefreshError) as excinfo: + _ = credentials.retrieve_subject_token(None) + + assert excinfo.match( + r"Executable exited with non-zero return code 1. Error: None" + ) + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_retrieve_subject_token_python_2(self): + with mock.patch("sys.version_info", (2, 7)): + credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE) + + with pytest.raises(exceptions.RefreshError) as excinfo: + _ = credentials.retrieve_subject_token(None) + + assert excinfo.match(r"Pluggable auth is only supported for python 3.6+")