diff --git a/google/auth/_default.py b/google/auth/_default.py index 15799cea7..fd346b102 100644 --- a/google/auth/_default.py +++ b/google/auth/_default.py @@ -324,7 +324,7 @@ def _get_external_account_credentials( google.auth.exceptions.DefaultCredentialsError: if the info dictionary is in the wrong format or is missing required information. """ - # There are currently 3 types of external_account credentials. + # There are currently 2 types of external_account credentials. if info.get("subject_token_type") == _AWS_SUBJECT_TOKEN_TYPE: # Check if configuration corresponds to an AWS credentials. from google.auth import aws @@ -332,15 +332,6 @@ def _get_external_account_credentials( credentials = aws.Credentials.from_info( info, scopes=scopes, default_scopes=default_scopes ) - elif ( - info.get("credential_source") is not None - and info.get("credential_source").get("executable") is not None - ): - from google.auth import pluggable - - credentials = pluggable.Credentials.from_info( - info, scopes=scopes, default_scopes=default_scopes - ) else: try: # Check if configuration corresponds to an Identity Pool credentials. diff --git a/google/auth/pluggable.py b/google/auth/pluggable.py deleted file mode 100644 index ebfb2c585..000000000 --- a/google/auth/pluggable.py +++ /dev/null @@ -1,349 +0,0 @@ -# Copyright 2022 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Pluggable Credentials. -Pluggable Credentials are initialized using external_account arguments which -are typically loaded from third-party executables. Unlike other -credentials that can be initialized with a list of explicit arguments, secrets -or credentials, external account clients use the environment and hints/guidelines -provided by the external_account JSON file to retrieve credentials and exchange -them for Google access tokens. - -Example credential_source for pluggable credential: - - { - "executable": { - "command": "/path/to/get/credentials.sh --arg1=value1 --arg2=value2", - "timeout_millis": 5000, - "output_file": "/path/to/generated/cached/credentials" - } - } -""" - -try: - from collections.abc import Mapping -# Python 2.7 compatibility -except ImportError: # pragma: NO COVER - from collections import Mapping -import io -import json -import os -import subprocess -import time - -from google.auth import _helpers -from google.auth import exceptions -from google.auth import external_account - -# The max supported executable spec version. -EXECUTABLE_SUPPORTED_MAX_VERSION = 1 - - -class Credentials(external_account.Credentials): - """External account credentials sourced from executables.""" - - def __init__( - self, - audience, - subject_token_type, - token_url, - credential_source, - service_account_impersonation_url=None, - client_id=None, - client_secret=None, - quota_project_id=None, - scopes=None, - default_scopes=None, - workforce_pool_user_project=None, - ): - """Instantiates an external account credentials object from a executables. - - Args: - audience (str): The STS audience field. - subject_token_type (str): The subject token type. - token_url (str): The STS endpoint URL. - credential_source (Mapping): The credential source dictionary used to - provide instructions on how to retrieve external credential to be - exchanged for Google access tokens. - - Example credential_source for pluggable credential: - - { - "executable": { - "command": "/path/to/get/credentials.sh --arg1=value1 --arg2=value2", - "timeout_millis": 5000, - "output_file": "/path/to/generated/cached/credentials" - } - } - - service_account_impersonation_url (Optional[str]): The optional service account - impersonation getAccessToken URL. - client_id (Optional[str]): The optional client ID. - client_secret (Optional[str]): The optional client secret. - quota_project_id (Optional[str]): The optional quota project ID. - scopes (Optional[Sequence[str]]): Optional scopes to request during the - authorization grant. - default_scopes (Optional[Sequence[str]]): Default scopes passed by a - Google client library. Use 'scopes' for user-defined scopes. - workforce_pool_user_project (Optona[str]): The optional workforce pool user - project number when the credential corresponds to a workforce pool and not - a workload Pluggable. The underlying principal must still have - serviceusage.services.use IAM permission to use the project for - billing/quota. - - Raises: - google.auth.exceptions.RefreshError: If an error is encountered during - access token retrieval logic. - ValueError: For invalid parameters. - - .. note:: Typically one of the helper constructors - :meth:`from_file` or - :meth:`from_info` are used instead of calling the constructor directly. - """ - - super(Credentials, self).__init__( - audience=audience, - subject_token_type=subject_token_type, - token_url=token_url, - credential_source=credential_source, - service_account_impersonation_url=service_account_impersonation_url, - client_id=client_id, - client_secret=client_secret, - quota_project_id=quota_project_id, - scopes=scopes, - default_scopes=default_scopes, - workforce_pool_user_project=workforce_pool_user_project, - ) - if not isinstance(credential_source, Mapping): - self._credential_source_executable = None - raise ValueError( - "Missing credential_source. The credential_source is not a dict." - ) - self._credential_source_executable = credential_source.get("executable") - if not self._credential_source_executable: - raise ValueError( - "Missing credential_source. An 'executable' must be provided." - ) - self._credential_source_executable_command = self._credential_source_executable.get( - "command" - ) - self._credential_source_executable_timeout_millis = self._credential_source_executable.get( - "timeout_millis" - ) - self._credential_source_executable_output_file = self._credential_source_executable.get( - "output_file" - ) - - if not self._credential_source_executable_command: - raise ValueError( - "Missing command field. Executable command must be provided." - ) - if not self._credential_source_executable_timeout_millis: - self._credential_source_executable_timeout_millis = 30 * 1000 - elif ( - self._credential_source_executable_timeout_millis < 5 * 1000 - or self._credential_source_executable_timeout_millis > 120 * 1000 - ): - raise ValueError("Timeout must be between 5 and 120 seconds.") - - @_helpers.copy_docstring(external_account.Credentials) - def retrieve_subject_token(self, request): - env_allow_executables = os.environ.get( - "GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES" - ) - if env_allow_executables != "1": - raise ValueError( - "Executables need to be explicitly allowed (set GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES to '1') to run." - ) - - # Check output file. - if self._credential_source_executable_output_file is not None: - try: - with open( - self._credential_source_executable_output_file - ) as output_file: - response = json.load(output_file) - except Exception: - pass - else: - try: - # If the cached response is expired, _parse_subject_token will raise an error which will be ignored and we will call the executable again. - subject_token = self._parse_subject_token(response) - except ValueError: - raise - except exceptions.RefreshError: - pass - else: - return subject_token - - # Inject env vars. - original_audience = os.getenv("GOOGLE_EXTERNAL_ACCOUNT_AUDIENCE") - os.environ["GOOGLE_EXTERNAL_ACCOUNT_AUDIENCE"] = self._audience - original_subject_token_type = os.getenv("GOOGLE_EXTERNAL_ACCOUNT_TOKEN_TYPE") - os.environ["GOOGLE_EXTERNAL_ACCOUNT_TOKEN_TYPE"] = self._subject_token_type - original_interactive = os.getenv("GOOGLE_EXTERNAL_ACCOUNT_INTERACTIVE") - os.environ[ - "GOOGLE_EXTERNAL_ACCOUNT_INTERACTIVE" - ] = "0" # Always set to 0 until interactive mode is implemented. - original_service_account_impersonation_url = os.getenv( - "GOOGLE_EXTERNAL_ACCOUNT_IMPERSONATED_EMAIL" - ) - if self._service_account_impersonation_url is not None: - os.environ[ - "GOOGLE_EXTERNAL_ACCOUNT_IMPERSONATED_EMAIL" - ] = self.service_account_email - original_credential_source_executable_output_file = os.getenv( - "GOOGLE_EXTERNAL_ACCOUNT_OUTPUT_FILE" - ) - if self._credential_source_executable_output_file is not None: - os.environ[ - "GOOGLE_EXTERNAL_ACCOUNT_OUTPUT_FILE" - ] = self._credential_source_executable_output_file - - try: - result = subprocess.check_output( - self._credential_source_executable_command.split(), - stderr=subprocess.STDOUT, - ) - except subprocess.CalledProcessError as e: - raise exceptions.RefreshError( - "Executable exited with non-zero return code {}. Error: {}".format( - e.returncode, e.output - ) - ) - else: - try: - data = result.decode("utf-8") - response = json.loads(data) - subject_token = self._parse_subject_token(response) - except Exception: - raise - - # Reset env vars. - if original_audience is not None: - os.environ["GOOGLE_EXTERNAL_ACCOUNT_AUDIENCE"] = original_audience - else: - del os.environ["GOOGLE_EXTERNAL_ACCOUNT_AUDIENCE"] - if original_subject_token_type is not None: - os.environ[ - "GOOGLE_EXTERNAL_ACCOUNT_TOKEN_TYPE" - ] = original_subject_token_type - else: - del os.environ["GOOGLE_EXTERNAL_ACCOUNT_TOKEN_TYPE"] - if original_interactive is not None: - os.environ["GOOGLE_EXTERNAL_ACCOUNT_INTERACTIVE"] = original_interactive - else: - del os.environ["GOOGLE_EXTERNAL_ACCOUNT_INTERACTIVE"] - if original_service_account_impersonation_url is not None: - os.environ[ - "GOOGLE_EXTERNAL_ACCOUNT_IMPERSONATED_EMAIL" - ] = original_service_account_impersonation_url - elif os.getenv("GOOGLE_EXTERNAL_ACCOUNT_IMPERSONATED_EMAIL") is not None: - del os.environ["GOOGLE_EXTERNAL_ACCOUNT_IMPERSONATED_EMAIL"] - if original_credential_source_executable_output_file is not None: - os.environ[ - "GOOGLE_EXTERNAL_ACCOUNT_OUTPUT_FILE" - ] = original_credential_source_executable_output_file - elif os.getenv("GOOGLE_EXTERNAL_ACCOUNT_OUTPUT_FILE") is not None: - del os.environ["GOOGLE_EXTERNAL_ACCOUNT_OUTPUT_FILE"] - - return subject_token - - @classmethod - def from_info(cls, info, **kwargs): - """Creates a Pluggable Credentials instance from parsed external account info. - - Args: - info (Mapping[str, str]): The Pluggable external account info in Google - format. - kwargs: Additional arguments to pass to the constructor. - - Returns: - google.auth.pluggable.Credentials: The constructed - credentials. - - Raises: - ValueError: For invalid parameters. - """ - return cls( - audience=info.get("audience"), - subject_token_type=info.get("subject_token_type"), - token_url=info.get("token_url"), - service_account_impersonation_url=info.get( - "service_account_impersonation_url" - ), - client_id=info.get("client_id"), - client_secret=info.get("client_secret"), - credential_source=info.get("credential_source"), - quota_project_id=info.get("quota_project_id"), - workforce_pool_user_project=info.get("workforce_pool_user_project"), - **kwargs - ) - - @classmethod - def from_file(cls, filename, **kwargs): - """Creates an Pluggable Credentials instance from an external account json file. - - Args: - filename (str): The path to the Pluggable external account json file. - kwargs: Additional arguments to pass to the constructor. - - Returns: - google.auth.pluggable.Credentials: The constructed - credentials. - """ - with io.open(filename, "r", encoding="utf-8") as json_file: - data = json.load(json_file) - return cls.from_info(data, **kwargs) - - def _parse_subject_token(self, response): - if "version" not in response: - raise ValueError("The executable response is missing the version field.") - if response["version"] > EXECUTABLE_SUPPORTED_MAX_VERSION: - raise exceptions.RefreshError( - "Executable returned unsupported version {}.".format( - response["version"] - ) - ) - if "success" not in response: - raise ValueError("The executable response is missing the success field.") - if not response["success"]: - if "code" not in response or "message" not in response: - raise ValueError( - "Error code and message fields are required in the response." - ) - raise exceptions.RefreshError( - "Executable returned unsuccessful response: code: {}, message: {}.".format( - response["code"], response["message"] - ) - ) - if "expiration_time" not in response: - raise ValueError( - "The executable response is missing the expiration_time field." - ) - if response["expiration_time"] < time.time(): - raise exceptions.RefreshError( - "The token returned by the executable is expired." - ) - if "token_type" not in response: - raise ValueError("The executable response is missing the token_type field.") - if ( - response["token_type"] == "urn:ietf:params:oauth:token-type:jwt" - or response["token_type"] == "urn:ietf:params:oauth:token-type:id_token" - ): # OIDC - return response["id_token"] - elif response["token_type"] == "urn:ietf:params:oauth:token-type:saml2": # SAML - return response["saml_response"] - else: - raise exceptions.RefreshError("Executable returned unsupported token type.") diff --git a/tests/test__default.py b/tests/test__default.py index e92166811..ab8bad72e 100644 --- a/tests/test__default.py +++ b/tests/test__default.py @@ -28,7 +28,6 @@ from google.auth import external_account from google.auth import identity_pool from google.auth import impersonated_credentials -from google.auth import pluggable from google.oauth2 import gdch_credentials from google.oauth2 import service_account import google.oauth2.credentials @@ -76,13 +75,6 @@ "token_url": TOKEN_URL, "credential_source": {"file": SUBJECT_TOKEN_TEXT_FILE}, } -PLUGGABLE_DATA = { - "type": "external_account", - "audience": AUDIENCE, - "subject_token_type": "urn:ietf:params:oauth:token-type:jwt", - "token_url": TOKEN_URL, - "credential_source": {"executable": {"command": "command"}}, -} AWS_DATA = { "type": "external_account", "audience": AUDIENCE, @@ -1192,15 +1184,3 @@ def test_default_gdch_service_account_credentials(apply_quota_project_id, get_ad ) assert credentials._ais_ca_cert_path == "./ais_ca_cert.pem" assert credentials._ais_token_endpoint == "https://ais_endpoint/sts/v1beta/token" - - -@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH -def test_load_credentials_from_external_account_pluggable(get_project_id, tmpdir): - config_file = tmpdir.join("config.json") - config_file.write(json.dumps(PLUGGABLE_DATA)) - credentials, project_id = _default.load_credentials_from_file(str(config_file)) - - assert isinstance(credentials, pluggable.Credentials) - # Since no scopes are specified, the project ID cannot be determined. - assert project_id is None - assert get_project_id.called diff --git a/tests/test_pluggable.py b/tests/test_pluggable.py deleted file mode 100644 index 2580fd1d4..000000000 --- a/tests/test_pluggable.py +++ /dev/null @@ -1,692 +0,0 @@ -# Copyright 2022 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# import datetime -import json -import os -import subprocess - -import mock -import pytest # type: ignore - -# from six.moves import http_client -# from six.moves import urllib - -# from google.auth import _helpers -from google.auth import exceptions -from google.auth import pluggable - -# from google.auth import transport - - -CLIENT_ID = "username" -CLIENT_SECRET = "password" -# Base64 encoding of "username:password". -BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ=" -SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com" -SERVICE_ACCOUNT_IMPERSONATION_URL = ( - "https://us-east1-iamcredentials.googleapis.com/v1/projects/-" - + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL) -) -QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID" -SCOPES = ["scope1", "scope2"] -SUBJECT_TOKEN_FIELD_NAME = "access_token" - -TOKEN_URL = "https://sts.googleapis.com/v1/token" -SERVICE_ACCOUNT_IMPERSONATION_URL = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/byoid-test@cicpclientproj.iam.gserviceaccount.com:generateAccessToken" -SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt" -AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID" - - -class TestCredentials(object): - CREDENTIAL_SOURCE_EXECUTABLE_COMMAND = ( - "/fake/external/excutable --arg1=value1 --arg2=value2" - ) - CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE = "fake_output_file" - CREDENTIAL_SOURCE_EXECUTABLE = { - "command": CREDENTIAL_SOURCE_EXECUTABLE_COMMAND, - "timeout_millis": 30000, - "output_file": CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, - } - CREDENTIAL_SOURCE = {"executable": CREDENTIAL_SOURCE_EXECUTABLE} - EXECUTABLE_OIDC_TOKEN = "FAKE_ID_TOKEN" - EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_ID_TOKEN = { - "version": 1, - "success": True, - "token_type": "urn:ietf:params:oauth:token-type:id_token", - "id_token": EXECUTABLE_OIDC_TOKEN, - "expiration_time": 9999999999, - } - EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_JWT = { - "version": 1, - "success": True, - "token_type": "urn:ietf:params:oauth:token-type:jwt", - "id_token": EXECUTABLE_OIDC_TOKEN, - "expiration_time": 9999999999, - } - EXECUTABLE_SAML_TOKEN = "FAKE_SAML_RESPONSE" - EXECUTABLE_SUCCESSFUL_SAML_RESPONSE = { - "version": 1, - "success": True, - "token_type": "urn:ietf:params:oauth:token-type:saml2", - "saml_response": EXECUTABLE_SAML_TOKEN, - "expiration_time": 9999999999, - } - EXECUTABLE_FAILED_RESPONSE = { - "version": 1, - "success": False, - "code": "401", - "message": "Permission denied. Caller not authorized", - } - CREDENTIAL_URL = "http://fakeurl.com" - - @classmethod - def make_pluggable( - cls, - audience=AUDIENCE, - subject_token_type=SUBJECT_TOKEN_TYPE, - client_id=None, - client_secret=None, - quota_project_id=None, - scopes=None, - default_scopes=None, - service_account_impersonation_url=None, - credential_source=None, - workforce_pool_user_project=None, - ): - return pluggable.Credentials( - audience=audience, - subject_token_type=subject_token_type, - token_url=TOKEN_URL, - service_account_impersonation_url=service_account_impersonation_url, - credential_source=credential_source, - client_id=client_id, - client_secret=client_secret, - quota_project_id=quota_project_id, - scopes=scopes, - default_scopes=default_scopes, - workforce_pool_user_project=workforce_pool_user_project, - ) - - @mock.patch.object(pluggable.Credentials, "__init__", return_value=None) - def test_from_info_full_options(self, mock_init): - credentials = pluggable.Credentials.from_info( - { - "audience": AUDIENCE, - "subject_token_type": SUBJECT_TOKEN_TYPE, - "token_url": TOKEN_URL, - "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL, - "client_id": CLIENT_ID, - "client_secret": CLIENT_SECRET, - "quota_project_id": QUOTA_PROJECT_ID, - "credential_source": self.CREDENTIAL_SOURCE, - } - ) - - # Confirm pluggable.Credentials instantiated with expected attributes. - assert isinstance(credentials, pluggable.Credentials) - mock_init.assert_called_once_with( - audience=AUDIENCE, - subject_token_type=SUBJECT_TOKEN_TYPE, - token_url=TOKEN_URL, - service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, - client_id=CLIENT_ID, - client_secret=CLIENT_SECRET, - credential_source=self.CREDENTIAL_SOURCE, - quota_project_id=QUOTA_PROJECT_ID, - workforce_pool_user_project=None, - ) - - @mock.patch.object(pluggable.Credentials, "__init__", return_value=None) - def test_from_info_required_options_only(self, mock_init): - credentials = pluggable.Credentials.from_info( - { - "audience": AUDIENCE, - "subject_token_type": SUBJECT_TOKEN_TYPE, - "token_url": TOKEN_URL, - "credential_source": self.CREDENTIAL_SOURCE, - } - ) - - # Confirm pluggable.Credentials instantiated with expected attributes. - assert isinstance(credentials, pluggable.Credentials) - mock_init.assert_called_once_with( - audience=AUDIENCE, - subject_token_type=SUBJECT_TOKEN_TYPE, - token_url=TOKEN_URL, - service_account_impersonation_url=None, - client_id=None, - client_secret=None, - credential_source=self.CREDENTIAL_SOURCE, - quota_project_id=None, - workforce_pool_user_project=None, - ) - - @mock.patch.object(pluggable.Credentials, "__init__", return_value=None) - def test_from_file_full_options(self, mock_init, tmpdir): - info = { - "audience": AUDIENCE, - "subject_token_type": SUBJECT_TOKEN_TYPE, - "token_url": TOKEN_URL, - "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL, - "client_id": CLIENT_ID, - "client_secret": CLIENT_SECRET, - "quota_project_id": QUOTA_PROJECT_ID, - "credential_source": self.CREDENTIAL_SOURCE, - } - config_file = tmpdir.join("config.json") - config_file.write(json.dumps(info)) - credentials = pluggable.Credentials.from_file(str(config_file)) - - # Confirm pluggable.Credentials instantiated with expected attributes. - assert isinstance(credentials, pluggable.Credentials) - mock_init.assert_called_once_with( - audience=AUDIENCE, - subject_token_type=SUBJECT_TOKEN_TYPE, - token_url=TOKEN_URL, - service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, - client_id=CLIENT_ID, - client_secret=CLIENT_SECRET, - credential_source=self.CREDENTIAL_SOURCE, - quota_project_id=QUOTA_PROJECT_ID, - workforce_pool_user_project=None, - ) - - @mock.patch.object(pluggable.Credentials, "__init__", return_value=None) - def test_from_file_required_options_only(self, mock_init, tmpdir): - info = { - "audience": AUDIENCE, - "subject_token_type": SUBJECT_TOKEN_TYPE, - "token_url": TOKEN_URL, - "credential_source": self.CREDENTIAL_SOURCE, - } - config_file = tmpdir.join("config.json") - config_file.write(json.dumps(info)) - credentials = pluggable.Credentials.from_file(str(config_file)) - - # Confirm pluggable.Credentials instantiated with expected attributes. - assert isinstance(credentials, pluggable.Credentials) - mock_init.assert_called_once_with( - audience=AUDIENCE, - subject_token_type=SUBJECT_TOKEN_TYPE, - token_url=TOKEN_URL, - service_account_impersonation_url=None, - client_id=None, - client_secret=None, - credential_source=self.CREDENTIAL_SOURCE, - quota_project_id=None, - workforce_pool_user_project=None, - ) - - def test_constructor_invalid_options(self): - credential_source = {"unsupported": "value"} - - with pytest.raises(ValueError) as excinfo: - self.make_pluggable(credential_source=credential_source) - - assert excinfo.match(r"Missing credential_source") - - def test_constructor_invalid_credential_source(self): - with pytest.raises(ValueError) as excinfo: - self.make_pluggable(credential_source="non-dict") - - assert excinfo.match(r"Missing credential_source") - - def test_info_with_credential_source(self): - credentials = self.make_pluggable( - credential_source=self.CREDENTIAL_SOURCE.copy() - ) - - assert credentials.info == { - "type": "external_account", - "audience": AUDIENCE, - "subject_token_type": SUBJECT_TOKEN_TYPE, - "token_url": TOKEN_URL, - "credential_source": self.CREDENTIAL_SOURCE, - } - - @mock.patch.dict( - os.environ, - { - "GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1", - "GOOGLE_EXTERNAL_ACCOUNT_AUDIENCE": "original_audience", - "GOOGLE_EXTERNAL_ACCOUNT_TOKEN_TYPE": "original_token_type", - "GOOGLE_EXTERNAL_ACCOUNT_INTERACTIVE": "0", - "GOOGLE_EXTERNAL_ACCOUNT_IMPERSONATED_EMAIL": "original_impersonated_email", - "GOOGLE_EXTERNAL_ACCOUNT_OUTPUT_FILE": "original_output_file", - }, - ) - def test_retrieve_subject_token_oidc_id_token(self): - with mock.patch( - "subprocess.check_output", - return_value=json.dumps( - self.EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_ID_TOKEN - ).encode("UTF-8"), - ): - credentials = self.make_pluggable( - audience=AUDIENCE, - service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, - credential_source=self.CREDENTIAL_SOURCE, - ) - - subject_token = credentials.retrieve_subject_token(None) - - assert subject_token == self.EXECUTABLE_OIDC_TOKEN - - @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) - def test_retrieve_subject_token_oidc_jwt(self): - with mock.patch( - "subprocess.check_output", - return_value=json.dumps( - self.EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_JWT - ).encode("UTF-8"), - ): - credentials = self.make_pluggable( - audience=AUDIENCE, - service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, - credential_source=self.CREDENTIAL_SOURCE, - ) - - subject_token = credentials.retrieve_subject_token(None) - - assert subject_token == self.EXECUTABLE_OIDC_TOKEN - - @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) - def test_retrieve_subject_token_saml(self): - with mock.patch( - "subprocess.check_output", - return_value=json.dumps(self.EXECUTABLE_SUCCESSFUL_SAML_RESPONSE).encode( - "UTF-8" - ), - ): - credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE) - - subject_token = credentials.retrieve_subject_token(None) - - assert subject_token == self.EXECUTABLE_SAML_TOKEN - - @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) - def test_retrieve_subject_token_failed(self): - with mock.patch( - "subprocess.check_output", - return_value=json.dumps(self.EXECUTABLE_FAILED_RESPONSE).encode("UTF-8"), - ): - credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE) - - with pytest.raises(exceptions.RefreshError) as excinfo: - _ = credentials.retrieve_subject_token(None) - - assert excinfo.match( - r"Executable returned unsuccessful response: code: 401, message: Permission denied. Caller not authorized." - ) - - @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "0"}) - def test_retrieve_subject_token_not_allowd(self): - with mock.patch( - "subprocess.check_output", - return_value=json.dumps( - self.EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_ID_TOKEN - ).encode("UTF-8"), - ): - credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE) - - with pytest.raises(ValueError) as excinfo: - _ = credentials.retrieve_subject_token(None) - - assert excinfo.match(r"Executables need to be explicitly allowed") - - @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) - def test_retrieve_subject_token_invalid_version(self): - EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_VERSION_2 = { - "version": 2, - "success": True, - "token_type": "urn:ietf:params:oauth:token-type:id_token", - "id_token": self.EXECUTABLE_OIDC_TOKEN, - "expiration_time": 9999999999, - } - - with mock.patch( - "subprocess.check_output", - return_value=json.dumps( - EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_VERSION_2 - ).encode("UTF-8"), - ): - credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE) - - with pytest.raises(exceptions.RefreshError) as excinfo: - _ = credentials.retrieve_subject_token(None) - - assert excinfo.match(r"Executable returned unsupported version.") - - @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) - def test_retrieve_subject_token_expired_token(self): - EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_EXPIRED = { - "version": 1, - "success": True, - "token_type": "urn:ietf:params:oauth:token-type:id_token", - "id_token": self.EXECUTABLE_OIDC_TOKEN, - "expiration_time": 0, - } - - with mock.patch( - "subprocess.check_output", - return_value=json.dumps(EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_EXPIRED).encode( - "UTF-8" - ), - ): - credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE) - - with pytest.raises(exceptions.RefreshError) as excinfo: - _ = credentials.retrieve_subject_token(None) - - assert excinfo.match(r"The token returned by the executable is expired.") - - @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) - def test_retrieve_subject_token_file_cache(self): - ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE = "actual_output_file" - ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE = { - "command": "command", - "timeout_millis": 30000, - "output_file": ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, - } - ACTUAL_CREDENTIAL_SOURCE = {"executable": ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE} - with open(ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, "w") as output_file: - json.dump(self.EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_ID_TOKEN, output_file) - - credentials = self.make_pluggable(credential_source=ACTUAL_CREDENTIAL_SOURCE) - - subject_token = credentials.retrieve_subject_token(None) - assert subject_token == self.EXECUTABLE_OIDC_TOKEN - - os.remove(ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE) - - @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) - def test_retrieve_subject_token_no_file_cache(self): - ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE = { - "command": "command", - "timeout_millis": 30000, - } - ACTUAL_CREDENTIAL_SOURCE = {"executable": ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE} - - with mock.patch( - "subprocess.check_output", - return_value=json.dumps( - self.EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_ID_TOKEN - ).encode("UTF-8"), - ): - credentials = self.make_pluggable( - credential_source=ACTUAL_CREDENTIAL_SOURCE - ) - - subject_token = credentials.retrieve_subject_token(None) - - assert subject_token == self.EXECUTABLE_OIDC_TOKEN - - @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) - def test_retrieve_subject_token_file_cache_value_error_report(self): - ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE = "actual_output_file" - ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE = { - "command": "command", - "timeout_millis": 30000, - "output_file": ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, - } - ACTUAL_CREDENTIAL_SOURCE = {"executable": ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE} - ACTUAL_EXECUTABLE_RESPONSE = { - "success": True, - "token_type": "urn:ietf:params:oauth:token-type:id_token", - "id_token": self.EXECUTABLE_OIDC_TOKEN, - "expiration_time": 9999999999, - } - with open(ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, "w") as output_file: - json.dump(ACTUAL_EXECUTABLE_RESPONSE, output_file) - - credentials = self.make_pluggable(credential_source=ACTUAL_CREDENTIAL_SOURCE) - - with pytest.raises(ValueError) as excinfo: - _ = credentials.retrieve_subject_token(None) - - assert excinfo.match(r"The executable response is missing the version field.") - - os.remove(ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE) - - @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) - def test_retrieve_subject_token_file_cache_refresh_error_retry(self): - ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE = "actual_output_file" - ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE = { - "command": "command", - "timeout_millis": 30000, - "output_file": ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, - } - ACTUAL_CREDENTIAL_SOURCE = {"executable": ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE} - ACTUAL_EXECUTABLE_RESPONSE = { - "version": 2, - "success": True, - "token_type": "urn:ietf:params:oauth:token-type:id_token", - "id_token": self.EXECUTABLE_OIDC_TOKEN, - "expiration_time": 9999999999, - } - with open(ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, "w") as output_file: - json.dump(ACTUAL_EXECUTABLE_RESPONSE, output_file) - - with mock.patch( - "subprocess.check_output", - return_value=json.dumps( - self.EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_ID_TOKEN - ).encode("UTF-8"), - ): - credentials = self.make_pluggable( - credential_source=ACTUAL_CREDENTIAL_SOURCE - ) - - subject_token = credentials.retrieve_subject_token(None) - - assert subject_token == self.EXECUTABLE_OIDC_TOKEN - - os.remove(ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE) - - @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) - def test_retrieve_subject_token_unsupported_token_type(self): - EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE = { - "version": 1, - "success": True, - "token_type": "unsupported_token_type", - "id_token": self.EXECUTABLE_OIDC_TOKEN, - "expiration_time": 9999999999, - } - - with mock.patch( - "subprocess.check_output", - return_value=json.dumps(EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE).encode( - "UTF-8" - ), - ): - credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE) - - with pytest.raises(exceptions.RefreshError) as excinfo: - _ = credentials.retrieve_subject_token(None) - - assert excinfo.match(r"Executable returned unsupported token type.") - - @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) - def test_retrieve_subject_token_missing_version(self): - EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE = { - "success": True, - "token_type": "urn:ietf:params:oauth:token-type:id_token", - "id_token": self.EXECUTABLE_OIDC_TOKEN, - "expiration_time": 9999999999, - } - - with mock.patch( - "subprocess.check_output", - return_value=json.dumps(EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE).encode( - "UTF-8" - ), - ): - credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE) - - with pytest.raises(ValueError) as excinfo: - _ = credentials.retrieve_subject_token(None) - - assert excinfo.match( - r"The executable response is missing the version field." - ) - - @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) - def test_retrieve_subject_token_missing_success(self): - EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE = { - "version": 1, - "token_type": "urn:ietf:params:oauth:token-type:id_token", - "id_token": self.EXECUTABLE_OIDC_TOKEN, - "expiration_time": 9999999999, - } - - with mock.patch( - "subprocess.check_output", - return_value=json.dumps(EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE).encode( - "UTF-8" - ), - ): - credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE) - - with pytest.raises(ValueError) as excinfo: - _ = credentials.retrieve_subject_token(None) - - assert excinfo.match( - r"The executable response is missing the success field." - ) - - @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) - def test_retrieve_subject_token_missing_error_code_message(self): - EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE = {"version": 1, "success": False} - - with mock.patch( - "subprocess.check_output", - return_value=json.dumps(EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE).encode( - "UTF-8" - ), - ): - credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE) - - with pytest.raises(ValueError) as excinfo: - _ = credentials.retrieve_subject_token(None) - - assert excinfo.match( - r"Error code and message fields are required in the response." - ) - - @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) - def test_retrieve_subject_token_missing_expiration_time(self): - EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE = { - "version": 1, - "success": True, - "token_type": "urn:ietf:params:oauth:token-type:id_token", - "id_token": self.EXECUTABLE_OIDC_TOKEN, - } - - with mock.patch( - "subprocess.check_output", - return_value=json.dumps(EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE).encode( - "UTF-8" - ), - ): - credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE) - - with pytest.raises(ValueError) as excinfo: - _ = credentials.retrieve_subject_token(None) - - assert excinfo.match( - r"The executable response is missing the expiration_time field." - ) - - @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) - def test_retrieve_subject_token_missing_token_type(self): - EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE = { - "version": 1, - "success": True, - "id_token": self.EXECUTABLE_OIDC_TOKEN, - "expiration_time": 9999999999, - } - - with mock.patch( - "subprocess.check_output", - return_value=json.dumps(EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE).encode( - "UTF-8" - ), - ): - credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE) - - with pytest.raises(ValueError) as excinfo: - _ = credentials.retrieve_subject_token(None) - - assert excinfo.match( - r"The executable response is missing the token_type field." - ) - - @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) - def test_credential_source_missing_command(self): - with pytest.raises(ValueError) as excinfo: - CREDENTIAL_SOURCE = { - "executable": { - "timeout_millis": 30000, - "output_file": self.CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, - } - } - _ = self.make_pluggable(credential_source=CREDENTIAL_SOURCE) - - assert excinfo.match( - r"Missing command field. Executable command must be provided." - ) - - @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) - def test_credential_source_timeout_small(self): - with pytest.raises(ValueError) as excinfo: - CREDENTIAL_SOURCE = { - "executable": { - "command": self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND, - "timeout_millis": 5000 - 1, - "output_file": self.CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, - } - } - _ = self.make_pluggable(credential_source=CREDENTIAL_SOURCE) - - assert excinfo.match(r"Timeout must be between 5 and 120 seconds.") - - @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) - def test_credential_source_timeout_large(self): - with pytest.raises(ValueError) as excinfo: - CREDENTIAL_SOURCE = { - "executable": { - "command": self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND, - "timeout_millis": 120000 + 1, - "output_file": self.CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, - } - } - _ = self.make_pluggable(credential_source=CREDENTIAL_SOURCE) - - assert excinfo.match(r"Timeout must be between 5 and 120 seconds.") - - @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) - def test_retrieve_subject_token_executable_fail(self): - with mock.patch("subprocess.check_output") as subprocess_mock: - subprocess_mock.side_effect = subprocess.CalledProcessError( - returncode=1, cmd="" - ) - credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE) - - with pytest.raises(exceptions.RefreshError) as excinfo: - _ = credentials.retrieve_subject_token(None) - - assert excinfo.match( - r"Executable exited with non-zero return code 1. Error: None" - )