diff --git a/src/pip/_internal/network/auth.py b/src/pip/_internal/network/auth.py index bb223c6a231..a0baee1c84b 100644 --- a/src/pip/_internal/network/auth.py +++ b/src/pip/_internal/network/auth.py @@ -4,10 +4,12 @@ providing credentials in the context of network requests. """ +import os import shutil import subprocess import urllib.parse -from typing import Any, Dict, List, NamedTuple, Optional, Tuple +from abc import ABC, abstractmethod +from typing import Any, Dict, List, NamedTuple, Optional, Tuple, Type from pip._vendor.requests.auth import AuthBase, HTTPBasicAuth from pip._vendor.requests.models import Request, Response @@ -27,13 +29,73 @@ class Credentials(NamedTuple): - service_name: str + url: str username: str password: str -class KeyRingCli: - """Mirror the parts of keyring's API which pip uses +class KeyRingBaseProvider(ABC): + """Keyring base provider interface""" + + @classmethod + @abstractmethod + def is_available(cls) -> bool: + ... + + @classmethod + @abstractmethod + def get_auth_info(cls, url: str, username: Optional[str]) -> Optional[AuthInfo]: + ... + + @classmethod + @abstractmethod + def save_auth_info(cls, url: str, username: str, password: str) -> None: + ... + + +class KeyRingPythonProvider(KeyRingBaseProvider): + """Keyring interface which uses locally imported `keyring`""" + + try: + import keyring + + keyring = keyring + except ImportError: + keyring = None # type: ignore[assignment] + + @classmethod + def is_available(cls) -> bool: + return cls.keyring is not None + + @classmethod + def get_auth_info(cls, url: str, username: Optional[str]) -> Optional[AuthInfo]: + if cls.is_available is False: + return None + + # Support keyring's get_credential interface which supports getting + # credentials without a username. This is only available for + # keyring>=15.2.0. + if hasattr(cls.keyring, "get_credential"): + logger.debug("Getting credentials from keyring for %s", url) + cred = cls.keyring.get_credential(url, username) + if cred is not None: + return cred.username, cred.password + return None + + if username is not None: + logger.debug("Getting password from keyring for %s", url) + password = cls.keyring.get_password(url, username) + if password: + return username, password + return None + + @classmethod + def save_auth_info(cls, url: str, username: str, password: str) -> None: + cls.keyring.set_password(url, username, password) + + +class KeyRingCliProvider(KeyRingBaseProvider): + """Provider which uses `keyring` cli Instead of calling the keyring package installed alongside pip we call keyring on the command line which will enable pip to @@ -41,75 +103,85 @@ class KeyRingCli: PATH. """ - def __init__(self, keyring: str) -> None: - self.keyring = keyring + keyring = shutil.which("keyring") + + @classmethod + def is_available(cls) -> bool: + return cls.keyring is not None - def get_password(self, service_name: str, username: str) -> Optional[str]: - cmd = [self.keyring, "get", service_name, username] + @classmethod + def get_auth_info(cls, url: str, username: Optional[str]) -> Optional[AuthInfo]: + if cls.is_available is False: + return None + + # This is the default implementation of keyring.get_credential + # https://github.com/jaraco/keyring/blob/97689324abcf01bd1793d49063e7ca01e03d7d07/keyring/backend.py#L134-L139 + if username is not None: + password = cls._get_password(url, username) + if password is not None: + return username, password + return None + + @classmethod + def save_auth_info(cls, url: str, username: str, password: str) -> None: + if not cls.is_available: + raise RuntimeError("keyring is not available") + return cls._set_password(url, username, password) + + @classmethod + def _get_password(cls, service_name: str, username: str) -> Optional[str]: + """Mirror the implemenation of keyring.get_password using cli""" + if cls.keyring is None: + return None + + cmd = [cls.keyring, "get", service_name, username] + env = os.environ + env["PYTHONIOENCODING"] = "utf-8" res = subprocess.run( cmd, stdin=subprocess.DEVNULL, capture_output=True, - env=dict(PYTHONIOENCODING="utf-8"), + env=env, ) if res.returncode: return None return res.stdout.decode("utf-8").strip("\n") - def set_password(self, service_name: str, username: str, password: str) -> None: - cmd = [self.keyring, "set", service_name, username] + @classmethod + def _set_password(cls, service_name: str, username: str, password: str) -> None: + """Mirror the implemenation of keyring.set_password using cli""" + if cls.keyring is None: + return None + + cmd = [cls.keyring, "set", service_name, username] input_ = password.encode("utf-8") + b"\n" - res = subprocess.run(cmd, input=input_, env=dict(PYTHONIOENCODING="utf-8")) + env = os.environ + env["PYTHONIOENCODING"] = "utf-8" + res = subprocess.run(cmd, input=input_, env=env) res.check_returncode() return None -try: - import keyring -except ImportError: - keyring = None # type: ignore[assignment] - keyring_path = shutil.which("keyring") - if keyring_path is not None: - keyring = KeyRingCli(keyring_path) # type: ignore[assignment] -except Exception as exc: - logger.warning( - "Keyring is skipped due to an exception: %s", - str(exc), - ) - keyring = None # type: ignore[assignment] +def get_keyring_provider() -> Optional[Type[KeyRingBaseProvider]]: + if KeyRingPythonProvider.is_available(): + return KeyRingPythonProvider + if KeyRingCliProvider.is_available(): + return KeyRingCliProvider + return None def get_keyring_auth(url: Optional[str], username: Optional[str]) -> Optional[AuthInfo]: """Return the tuple auth for a given url from keyring.""" - global keyring - if not url or not keyring: + # Do nothing if no url was provided + if not url: return None - try: - try: - get_credential = keyring.get_credential - except AttributeError: - pass - else: - logger.debug("Getting credentials from keyring for %s", url) - cred = get_credential(url, username) - if cred is not None: - return cred.username, cred.password - return None - - if username: - logger.debug("Getting password from keyring for %s", url) - password = keyring.get_password(url, username) - if password: - return username, password + keyring = get_keyring_provider() + # Do nothin if keyring is not available + if keyring is None: + return None - except Exception as exc: - logger.warning( - "Keyring is skipped due to an exception: %s", - str(exc), - ) - keyring = None # type: ignore[assignment] - return None + return keyring.get_auth_info(url, username) class MultiDomainBasicAuth(AuthBase): @@ -283,7 +355,7 @@ def _prompt_for_password( # Factored out to allow for easy patching in tests def _should_save_password_to_keyring(self) -> bool: - if not keyring: + if get_keyring_provider() is None: return False return ask("Save credentials to keyring [y/N]: ", ["y", "n"]) == "y" @@ -319,7 +391,7 @@ def handle_401(self, resp: Response, **kwargs: Any) -> Response: # Prompt to save the password to keyring if save and self._should_save_password_to_keyring(): self._credentials_to_save = Credentials( - service_name=parsed.netloc, + url=parsed.netloc, username=username, password=password, ) @@ -355,15 +427,16 @@ def warn_on_401(self, resp: Response, **kwargs: Any) -> None: def save_credentials(self, resp: Response, **kwargs: Any) -> None: """Response callback to save credentials on success.""" + keyring = get_keyring_provider() assert keyring is not None, "should never reach here without keyring" if not keyring: - return + return None creds = self._credentials_to_save self._credentials_to_save = None if creds and resp.status_code < 400: try: logger.info("Saving credentials to keyring") - keyring.set_password(creds.service_name, creds.username, creds.password) + keyring.save_auth_info(creds.url, creds.username, creds.password) except Exception: logger.exception("Failed to save credentials")