diff --git a/google/auth/_default.py b/google/auth/_default.py index 34edda046..68d7cc1a1 100644 --- a/google/auth/_default.py +++ b/google/auth/_default.py @@ -325,6 +325,12 @@ def _get_external_account_credentials( credentials = aws.Credentials.from_info( info, scopes=scopes, default_scopes=default_scopes ) + elif info.get("credential_source").get("executable") is not None: + from google.auth import pluggable + + credentials = pluggable.Credentials.from_info( + info, scopes=scopes, default_scopes=default_scopes + ) else: try: # Check if configuration corresponds to an Identity Pool credentials. diff --git a/google/auth/pluggable.py b/google/auth/pluggable.py new file mode 100644 index 000000000..d440079f8 --- /dev/null +++ b/google/auth/pluggable.py @@ -0,0 +1,273 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Pluggable Credentials. + +This module provides credentials to access Google Cloud resources from on-prem +or non-Google Cloud platforms which support external credentials (e.g. OIDC ID +tokens) retrieved from local file locations or local servers. This includes +Microsoft Azure and OIDC identity providers (e.g. K8s workloads registered with +Hub with Hub workload identity enabled). + +These credentials are recommended over the use of service account credentials +in on-prem/non-Google Cloud platforms as they do not involve the management of +long-live service account private keys. + +Pluggable Credentials are initialized using external_account arguments which +are typically loaded from third-party executables. Unlike other +credentials that can be initialized with a list of explicit arguments, secrets +or credentials, external account clients use the environment and hints/guidelines +provided by the external_account JSON file to retrieve credentials and exchange +them for Google access tokens. +""" + +try: + from collections.abc import Mapping +# Python 2.7 compatibility +except ImportError: # pragma: NO COVER + from collections import Mapping +import io +import json +import os +import subprocess + +from google.auth import _helpers +from google.auth import exceptions +from google.auth import external_account + +# External account JSON type identifier. +EXECUTABLE_SUPPORTED_MAX_VERSION = 1 + +class Credentials(external_account.Credentials): + """External account credentials sourced from executables.""" + + def __init__( + self, + audience, + subject_token_type, + token_url, + credential_source, + service_account_impersonation_url=None, + client_id=None, + client_secret=None, + quota_project_id=None, + scopes=None, + default_scopes=None, + workforce_pool_user_project=None, + ): + """Instantiates an external account credentials object from a executables. + + Args: + audience (str): The STS audience field. + subject_token_type (str): The subject token type. + token_url (str): The STS endpoint URL. + credential_source (Mapping): The credential source dictionary used to + provide instructions on how to retrieve external credential to be + exchanged for Google access tokens. + + Example credential_source for pluggable credential:: + + { + "executable": { + "command": "/path/to/get/credentials.sh --arg1=value1 --arg2=value2", + "timeout_millis": 5000, + "output_file": "/path/to/generated/cached/credentials" + } + } + + service_account_impersonation_url (Optional[str]): The optional service account + impersonation getAccessToken URL. + client_id (Optional[str]): The optional client ID. + client_secret (Optional[str]): The optional client secret. + quota_project_id (Optional[str]): The optional quota project ID. + scopes (Optional[Sequence[str]]): Optional scopes to request during the + authorization grant. + default_scopes (Optional[Sequence[str]]): Default scopes passed by a + Google client library. Use 'scopes' for user-defined scopes. + workforce_pool_user_project (Optona[str]): The optional workforce pool user + project number when the credential corresponds to a workforce pool and not + a workload Pluggable. The underlying principal must still have + serviceusage.services.use IAM permission to use the project for + billing/quota. + + Raises: + google.auth.exceptions.RefreshError: If an error is encountered during + access token retrieval logic. + ValueError: For invalid parameters. + + .. note:: Typically one of the helper constructors + :meth:`from_file` or + :meth:`from_info` are used instead of calling the constructor directly. + """ + + super(Credentials, self).__init__( + audience=audience, + subject_token_type=subject_token_type, + token_url=token_url, + credential_source=credential_source, + service_account_impersonation_url=service_account_impersonation_url, + client_id=client_id, + client_secret=client_secret, + quota_project_id=quota_project_id, + scopes=scopes, + default_scopes=default_scopes, + workforce_pool_user_project=workforce_pool_user_project, + ) + if not isinstance(credential_source, Mapping): + self._credential_source_executable = None + raise ValueError( + "Missing credential_source. The credential_source is not a dict." + ) + else: + self._credential_source_executable = credential_source.get("executable") + if not self._credential_source_executable: + raise ValueError( + "Missing credential_source. An 'executable' must be provided." + ) + self._credential_source_executable_command = self._credential_source_executable.get("command") + self._credential_source_executable_timeout_millis = self._credential_source_executable.get("timeout_millis") + self._credential_source_executable_output_file = self._credential_source_executable.get("output_file") + + # environment_id is only supported in AWS or dedicated future external + # account credentials. + if "environment_id" in credential_source: + raise ValueError( + "Invalid Pluggable credential_source field 'environment_id'" + ) + + if not self._credential_source_executable_command: + raise ValueError( + "Missing command. Executable command must be provided." + ) + if not self._credential_source_executable_timeout_millis: + raise ValueError( + "Missing timeout_millis. Executable timeout millis must be provided." + ) + + @_helpers.copy_docstring(external_account.Credentials) + def retrieve_subject_token(self, request): + env_allow_executables = os.environ.get('GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES') + if env_allow_executables != '1': + raise ValueError( + "Executables need to be explicitly allowed (set GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES to '1') to run." + ) + + # Inject env vars + original_audience = os.getenv("GOOGLE_EXTERNAL_ACCOUNT_AUDIENCE") + os.environ["GOOGLE_EXTERNAL_ACCOUNT_AUDIENCE"] = self._audience + original_subject_token_type = os.getenv("GOOGLE_EXTERNAL_ACCOUNT_TOKEN_TYPE") + os.environ["GOOGLE_EXTERNAL_ACCOUNT_TOKEN_TYPE"] = self._subject_token_type + original_interactive = os.getenv("GOOGLE_EXTERNAL_ACCOUNT_INTERACTIVE") + os.environ["GOOGLE_EXTERNAL_ACCOUNT_INTERACTIVE"] = "0" # Always set to 0 until interactive mode is implemented. + original_service_account_impersonation_url = os.getenv("GOOGLE_EXTERNAL_ACCOUNT_IMPERSONATED_EMAIL") + if self._service_account_impersonation_url is not None: + os.environ["GOOGLE_EXTERNAL_ACCOUNT_IMPERSONATED_EMAIL"] = self._service_account_impersonation_url + original_credential_source_executable_output_file = os.getenv("GOOGLE_EXTERNAL_ACCOUNT_OUTPUT_FILE") + if self._credential_source_executable_output_file is not None: + os.environ["GOOGLE_EXTERNAL_ACCOUNT_OUTPUT_FILE"] = self._credential_source_executable_output_file + + result = subprocess.run(self._credential_source_executable_command.split(), timeout=self._credential_source_executable_timeout_millis/1000, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + + # Reset env vars + if original_audience is not None: + os.environ["GOOGLE_EXTERNAL_ACCOUNT_AUDIENCE"] = original_audience + else: + del os.environ["GOOGLE_EXTERNAL_ACCOUNT_AUDIENCE"] + if original_subject_token_type is not None: + os.environ["GOOGLE_EXTERNAL_ACCOUNT_TOKEN_TYPE"] = self.original_subject_token_type + else: + del os.environ["GOOGLE_EXTERNAL_ACCOUNT_TOKEN_TYPE"] + if original_interactive is not None: + os.environ["GOOGLE_EXTERNAL_ACCOUNT_INTERACTIVE"] = original_interactive + else: + del os.environ["GOOGLE_EXTERNAL_ACCOUNT_INTERACTIVE"] + if original_service_account_impersonation_url is not None: + os.environ["GOOGLE_EXTERNAL_ACCOUNT_IMPERSONATED_EMAIL"] = original_service_account_impersonation_url + elif os.getenv("GOOGLE_EXTERNAL_ACCOUNT_IMPERSONATED_EMAIL") is not None: + del os.environ["GOOGLE_EXTERNAL_ACCOUNT_IMPERSONATED_EMAIL"] + if original_credential_source_executable_output_file is not None: + os.environ["GOOGLE_EXTERNAL_ACCOUNT_OUTPUT_FILE"] = original_credential_source_executable_output_file + elif os.getenv("GOOGLE_EXTERNAL_ACCOUNT_OUTPUT_FILE") is not None: + del os.environ["GOOGLE_EXTERNAL_ACCOUNT_OUTPUT_FILE"] + + if result.returncode != 0: + raise exceptions.RefreshError( + "Executable exited with non-zero return code {}. Error: {}".format(result.returncode, result.stdout) + ) + else: + data = result.stdout.decode('utf-8') + response = json.loads(data) + if not response['success']: + raise exceptions.RefreshError( + "Executable returned unsuccessful response: {}.".format(response) + ) + elif response['version'] > EXECUTABLE_SUPPORTED_MAX_VERSION: + raise exceptions.RefreshError( + "Executable returned unsupported version {}.".format(response['version']) + ) + elif response["token_type"] == "urn:ietf:params:oauth:token-type:jwt" or response["token_type"] == "urn:ietf:params:oauth:token-type:id_token": # OIDC + return response["id_token"] + elif response["token_type"] == "urn:ietf:params:oauth:token-type:saml2": # SAML + return response["saml_response"] + else: + raise exceptions.RefreshError( + "Executable returned unsupported token type." + ) + + @classmethod + def from_info(cls, info, **kwargs): + """Creates a Pluggable Credentials instance from parsed external account info. + + Args: + info (Mapping[str, str]): The Pluggable external account info in Google + format. + kwargs: Additional arguments to pass to the constructor. + + Returns: + google.auth.pluggable.Credentials: The constructed + credentials. + + Raises: + ValueError: For invalid parameters. + """ + return cls( + audience=info.get("audience"), + subject_token_type=info.get("subject_token_type"), + token_url=info.get("token_url"), + service_account_impersonation_url=info.get( + "service_account_impersonation_url" + ), + client_id=info.get("client_id"), + client_secret=info.get("client_secret"), + credential_source=info.get("credential_source"), + quota_project_id=info.get("quota_project_id"), + workforce_pool_user_project=info.get("workforce_pool_user_project"), + **kwargs + ) + + @classmethod + def from_file(cls, filename, **kwargs): + """Creates an Pluggable Credentials instance from an external account json file. + + Args: + filename (str): The path to the Pluggable external account json file. + kwargs: Additional arguments to pass to the constructor. + + Returns: + google.auth.pluggable.Credentials: The constructed + credentials. + """ + with io.open(filename, "r", encoding="utf-8") as json_file: + data = json.load(json_file) + return cls.from_info(data, **kwargs) diff --git a/tests/test_pluggable.py b/tests/test_pluggable.py new file mode 100644 index 000000000..b9d6e9f27 --- /dev/null +++ b/tests/test_pluggable.py @@ -0,0 +1,511 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import json +import os + +import mock +import pytest # type: ignore +import subprocess +import pytest_subprocess +from six.moves import http_client +from six.moves import urllib + +from google.auth import _helpers +from google.auth import exceptions +from google.auth import identity_pool +from google.auth import pluggable +from google.auth import transport + + +CLIENT_ID = "username" +CLIENT_SECRET = "password" +# Base64 encoding of "username:password". +BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ=" +SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com" +SERVICE_ACCOUNT_IMPERSONATION_URL = ( + "https://us-east1-iamcredentials.googleapis.com/v1/projects/-" + + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL) +) +QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID" +SCOPES = ["scope1", "scope2"] +SUBJECT_TOKEN_FIELD_NAME = "access_token" + +TOKEN_URL = "https://sts.googleapis.com/v1/token" +SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt" +AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID" + +class TestCredentials(object): + CREDENTIAL_SOURCE_EXECUTABLE_COMMAND = "/fake/external/excutable --arg1=value1 --arg2=value2" + CREDENTIAL_SOURCE_EXECUTABLE = { + "command": CREDENTIAL_SOURCE_EXECUTABLE_COMMAND, + "timeout_millis": 5000, + "output_file": "/fake/output/file" + } + CREDENTIAL_SOURCE = { + "executable": CREDENTIAL_SOURCE_EXECUTABLE + } + EXECUTABLE_OIDC_TOKEN = "FAKE_ID_TOKEN" + EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_ID_TOKEN = { + "version": 1, + "success": True, + "token_type": "urn:ietf:params:oauth:token-type:id_token", + "id_token": EXECUTABLE_OIDC_TOKEN, + "expiration_time": 9999999999 + } + EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_JWT = { + "version": 1, + "success": True, + "token_type": "urn:ietf:params:oauth:token-type:jwt", + "id_token": EXECUTABLE_OIDC_TOKEN, + "expiration_time": 9999999999 + } + EXECUTABLE_SAML_TOKEN = "FAKE_SAML_RESPONSE" + EXECUTABLE_SUCCESSFUL_SAML_RESPONSE = { + "version": 1, + "success": True, + "token_type": "urn:ietf:params:oauth:token-type:saml2", + "saml_response": EXECUTABLE_SAML_TOKEN, + "expiration_time": 1620433341 + } + EXECUTABLE_FAILED_RESPONSE = { + "version": 1, + "success": False, + "code": "401", + "message": "Permission denied. Caller not authorized" + } + CREDENTIAL_URL = "http://fakeurl.com" + + @classmethod + def make_mock_response(cls, status, data): + response = mock.create_autospec(transport.Response, instance=True) + response.status = status + if isinstance(data, dict): + response.data = json.dumps(data).encode("utf-8") + else: + response.data = data + return response + + @classmethod + def make_mock_request( + cls, token_status=http_client.OK, token_data=None, *extra_requests + ): + responses = [] + responses.append(cls.make_mock_response(token_status, token_data)) + + while len(extra_requests) > 0: + # If service account impersonation is requested, mock the expected response. + status, data, extra_requests = ( + extra_requests[0], + extra_requests[1], + extra_requests[2:], + ) + responses.append(cls.make_mock_response(status, data)) + + request = mock.create_autospec(transport.Request) + request.side_effect = responses + + return request + + @classmethod + def assert_credential_request_kwargs( + cls, request_kwargs, headers, url=CREDENTIAL_URL + ): + assert request_kwargs["url"] == url + assert request_kwargs["method"] == "GET" + assert request_kwargs["headers"] == headers + assert request_kwargs.get("body", None) is None + + @classmethod + def assert_token_request_kwargs( + cls, request_kwargs, headers, request_data, token_url=TOKEN_URL + ): + assert request_kwargs["url"] == token_url + assert request_kwargs["method"] == "POST" + assert request_kwargs["headers"] == headers + assert request_kwargs["body"] is not None + body_tuples = urllib.parse.parse_qsl(request_kwargs["body"]) + assert len(body_tuples) == len(request_data.keys()) + for (k, v) in body_tuples: + assert v.decode("utf-8") == request_data[k.decode("utf-8")] + + @classmethod + def assert_impersonation_request_kwargs( + cls, + request_kwargs, + headers, + request_data, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + ): + assert request_kwargs["url"] == service_account_impersonation_url + assert request_kwargs["method"] == "POST" + assert request_kwargs["headers"] == headers + assert request_kwargs["body"] is not None + body_json = json.loads(request_kwargs["body"].decode("utf-8")) + assert body_json == request_data + + @classmethod + def assert_underlying_credentials_refresh( + cls, + credentials, + audience, + subject_token, + subject_token_type, + token_url, + service_account_impersonation_url=None, + basic_auth_encoding=None, + quota_project_id=None, + used_scopes=None, + credential_data=None, + scopes=None, + default_scopes=None, + workforce_pool_user_project=None, + ): + """Utility to assert that a credentials are initialized with the expected + attributes by calling refresh functionality and confirming response matches + expected one and that the underlying requests were populated with the + expected parameters. + """ + # STS token exchange request/response. + token_response = cls.SUCCESS_RESPONSE.copy() + token_headers = {"Content-Type": "application/x-www-form-urlencoded"} + if basic_auth_encoding: + token_headers["Authorization"] = "Basic " + basic_auth_encoding + + if service_account_impersonation_url: + token_scopes = "https://www.googleapis.com/auth/iam" + else: + token_scopes = " ".join(used_scopes or []) + + token_request_data = { + "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", + "audience": audience, + "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", + "scope": token_scopes, + "subject_token": subject_token, + "subject_token_type": subject_token_type, + } + if workforce_pool_user_project: + token_request_data["options"] = urllib.parse.quote( + json.dumps({"userProject": workforce_pool_user_project}) + ) + + if service_account_impersonation_url: + # Service account impersonation request/response. + expire_time = ( + _helpers.utcnow().replace(microsecond=0) + + datetime.timedelta(seconds=3600) + ).isoformat("T") + "Z" + impersonation_response = { + "accessToken": "SA_ACCESS_TOKEN", + "expireTime": expire_time, + } + impersonation_headers = { + "Content-Type": "application/json", + "authorization": "Bearer {}".format(token_response["access_token"]), + } + impersonation_request_data = { + "delegates": None, + "scope": used_scopes, + "lifetime": "3600s", + } + + # Initialize mock request to handle token retrieval, token exchange and + # service account impersonation request. + requests = [] + if credential_data: + requests.append((http_client.OK, credential_data)) + + token_request_index = len(requests) + requests.append((http_client.OK, token_response)) + + if service_account_impersonation_url: + impersonation_request_index = len(requests) + requests.append((http_client.OK, impersonation_response)) + + request = cls.make_mock_request(*[el for req in requests for el in req]) + + credentials.refresh(request) + + assert len(request.call_args_list) == len(requests) + if credential_data: + cls.assert_credential_request_kwargs(request.call_args_list[0][1], None) + # Verify token exchange request parameters. + cls.assert_token_request_kwargs( + request.call_args_list[token_request_index][1], + token_headers, + token_request_data, + token_url, + ) + # Verify service account impersonation request parameters if the request + # is processed. + if service_account_impersonation_url: + cls.assert_impersonation_request_kwargs( + request.call_args_list[impersonation_request_index][1], + impersonation_headers, + impersonation_request_data, + service_account_impersonation_url, + ) + assert credentials.token == impersonation_response["accessToken"] + else: + assert credentials.token == token_response["access_token"] + assert credentials.quota_project_id == quota_project_id + assert credentials.scopes == scopes + assert credentials.default_scopes == default_scopes + + @classmethod + def make_pluggable( + cls, + audience=AUDIENCE, + subject_token_type=SUBJECT_TOKEN_TYPE, + client_id=None, + client_secret=None, + quota_project_id=None, + scopes=None, + default_scopes=None, + service_account_impersonation_url=None, + credential_source=None, + workforce_pool_user_project=None, + ): + return pluggable.Credentials( + audience=audience, + subject_token_type=subject_token_type, + token_url=TOKEN_URL, + service_account_impersonation_url=service_account_impersonation_url, + credential_source=credential_source, + client_id=client_id, + client_secret=client_secret, + quota_project_id=quota_project_id, + scopes=scopes, + default_scopes=default_scopes, + workforce_pool_user_project=workforce_pool_user_project, + ) + + @mock.patch.object(pluggable.Credentials, "__init__", return_value=None) + def test_from_info_full_options(self, mock_init): + credentials = pluggable.Credentials.from_info( + { + "audience": AUDIENCE, + "subject_token_type": SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL, + "client_id": CLIENT_ID, + "client_secret": CLIENT_SECRET, + "quota_project_id": QUOTA_PROJECT_ID, + "credential_source": self.CREDENTIAL_SOURCE, + } + ) + + # Confirm pluggable.Credentials instantiated with expected attributes. + assert isinstance(credentials, pluggable.Credentials) + mock_init.assert_called_once_with( + audience=AUDIENCE, + subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + client_id=CLIENT_ID, + client_secret=CLIENT_SECRET, + credential_source=self.CREDENTIAL_SOURCE, + quota_project_id=QUOTA_PROJECT_ID, + workforce_pool_user_project=None, + ) + + @mock.patch.object(pluggable.Credentials, "__init__", return_value=None) + def test_from_info_required_options_only(self, mock_init): + credentials = pluggable.Credentials.from_info( + { + "audience": AUDIENCE, + "subject_token_type": SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + "credential_source": self.CREDENTIAL_SOURCE, + } + ) + + # Confirm pluggable.Credentials instantiated with expected attributes. + assert isinstance(credentials, pluggable.Credentials) + mock_init.assert_called_once_with( + audience=AUDIENCE, + subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=None, + client_id=None, + client_secret=None, + credential_source=self.CREDENTIAL_SOURCE, + quota_project_id=None, + workforce_pool_user_project=None, + ) + + @mock.patch.object(pluggable.Credentials, "__init__", return_value=None) + def test_from_file_full_options(self, mock_init, tmpdir): + info = { + "audience": AUDIENCE, + "subject_token_type": SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL, + "client_id": CLIENT_ID, + "client_secret": CLIENT_SECRET, + "quota_project_id": QUOTA_PROJECT_ID, + "credential_source": self.CREDENTIAL_SOURCE, + } + config_file = tmpdir.join("config.json") + config_file.write(json.dumps(info)) + credentials = pluggable.Credentials.from_file(str(config_file)) + + # Confirm pluggable.Credentials instantiated with expected attributes. + assert isinstance(credentials, pluggable.Credentials) + mock_init.assert_called_once_with( + audience=AUDIENCE, + subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + client_id=CLIENT_ID, + client_secret=CLIENT_SECRET, + credential_source=self.CREDENTIAL_SOURCE, + quota_project_id=QUOTA_PROJECT_ID, + workforce_pool_user_project=None, + ) + + @mock.patch.object(pluggable.Credentials, "__init__", return_value=None) + def test_from_file_required_options_only(self, mock_init, tmpdir): + info = { + "audience": AUDIENCE, + "subject_token_type": SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + "credential_source": self.CREDENTIAL_SOURCE, + } + config_file = tmpdir.join("config.json") + config_file.write(json.dumps(info)) + credentials = pluggable.Credentials.from_file(str(config_file)) + + # Confirm pluggable.Credentials instantiated with expected attributes. + assert isinstance(credentials, pluggable.Credentials) + mock_init.assert_called_once_with( + audience=AUDIENCE, + subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=None, + client_id=None, + client_secret=None, + credential_source=self.CREDENTIAL_SOURCE, + quota_project_id=None, + workforce_pool_user_project=None, + ) + + def test_constructor_invalid_options(self): + credential_source = {"unsupported": "value"} + + with pytest.raises(ValueError) as excinfo: + self.make_pluggable(credential_source=credential_source) + + assert excinfo.match(r"Missing credential_source") + + def test_constructor_invalid_options_environment_id(self): + credential_source = {"executable": self.CREDENTIAL_SOURCE_EXECUTABLE, "environment_id": "aws1"} + + with pytest.raises(ValueError) as excinfo: + self.make_pluggable(credential_source=credential_source) + + assert excinfo.match( + r"Invalid Pluggable credential_source field 'environment_id'" + ) + + def test_constructor_invalid_credential_source(self): + with pytest.raises(ValueError) as excinfo: + self.make_pluggable(credential_source="non-dict") + + assert excinfo.match(r"Missing credential_source") + + def test_info_with_credential_source(self): + credentials = self.make_pluggable( + credential_source=self.CREDENTIAL_SOURCE.copy() + ) + + assert credentials.info == { + "type": "external_account", + "audience": AUDIENCE, + "subject_token_type": SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + "credential_source": self.CREDENTIAL_SOURCE, + } + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_retrieve_subject_token_oidc_id_token(self, fp): + fp.register(self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND.split(), stdout=json.dumps(self.EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_ID_TOKEN)) + + credentials = self.make_pluggable( + credential_source=self.CREDENTIAL_SOURCE + ) + + subject_token = credentials.retrieve_subject_token(None) + + assert subject_token == self.EXECUTABLE_OIDC_TOKEN + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_retrieve_subject_token_oidc_jwt(self, fp): + fp.register(self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND.split(), stdout=json.dumps(self.EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_JWT)) + + credentials = self.make_pluggable( + credential_source=self.CREDENTIAL_SOURCE + ) + + subject_token = credentials.retrieve_subject_token(None) + + assert subject_token == self.EXECUTABLE_OIDC_TOKEN + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_retrieve_subject_token_saml(self, fp): + fp.register(self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND.split(), stdout=json.dumps(self.EXECUTABLE_SUCCESSFUL_SAML_RESPONSE)) + + credentials = self.make_pluggable( + credential_source=self.CREDENTIAL_SOURCE + ) + + subject_token = credentials.retrieve_subject_token(None) + + assert subject_token == self.EXECUTABLE_SAML_TOKEN + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_retrieve_subject_token_failed(self, fp): + fp.register(self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND.split(), stdout=json.dumps(self.EXECUTABLE_FAILED_RESPONSE)) + + credentials = self.make_pluggable( + credential_source=self.CREDENTIAL_SOURCE + ) + + with pytest.raises(exceptions.RefreshError) as excinfo: + subject_token = credentials.retrieve_subject_token(None) + + assert excinfo.match(r"Executable returned unsuccessful response") + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_retrieve_subject_token_invalid_version(self, fp): + EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_VERSION_2 = { + "version": 2, + "success": True, + "token_type": "urn:ietf:params:oauth:token-type:id_token", + "id_token": self.EXECUTABLE_OIDC_TOKEN, + "expiration_time": 9999999999 + } + + fp.register(self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND.split(), stdout=json.dumps(EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_VERSION_2)) + + credentials = self.make_pluggable( + credential_source=self.CREDENTIAL_SOURCE + ) + + with pytest.raises(exceptions.RefreshError) as excinfo: + subject_token = credentials.retrieve_subject_token(None) + + assert excinfo.match(r"Executable returned unsupported version") \ No newline at end of file