diff --git a/social_core/backends/bitbucket_datacenter.py b/social_core/backends/bitbucket_datacenter.py new file mode 100644 index 000000000..419761486 --- /dev/null +++ b/social_core/backends/bitbucket_datacenter.py @@ -0,0 +1,107 @@ +""" +Bitbucket Data Center OAuth2 backend, docs at: + https://python-social-auth.readthedocs.io/en/latest/backends/bitbucket_datacenter_oauth2.html + https://confluence.atlassian.com/bitbucketserver/bitbucket-oauth-2-0-provider-api-1108483661.html +""" + +from .oauth import BaseOAuth2PKCE + + +class BitbucketDataCenterOAuth2(BaseOAuth2PKCE): + """ + Implements client for Bitbucket Data Center OAuth 2.0 provider API. + ref: https://confluence.atlassian.com/bitbucketserver/bitbucket-oauth-2-0-provider-api-1108483661.html + """ + + name = "bitbucket-datacenter-oauth2" + ID_KEY = "id" + SCOPE_SEPARATOR = " " + ACCESS_TOKEN_METHOD = "POST" + REFRESH_TOKEN_METHOD = "POST" + REDIRECT_STATE = False + STATE_PARAMETER = True + # ref: https://confluence.atlassian.com/bitbucketserver/bitbucket-oauth-2-0-provider-api-1108483661.html#BitbucketOAuth2.0providerAPI-scopes # noqa + DEFAULT_SCOPE = ["PUBLIC_REPOS"] + USE_BASIC_AUTH = False + EXTRA_DATA = [ + ("token_type", "token_type"), + ("access_token", "access_token"), + ("refresh_token", "refresh_token"), + ("expires_in", "expires"), + ("scope", "scope"), + # extra user profile fields + ("first_name", "first_name"), + ("last_name", "last_name"), + ("email", "email"), + ("name", "name"), + ("username", "username"), + ("display_name", "display_name"), + ("type", "type"), + ("active", "active"), + ("url", "url"), + ("avatar_url", "avatar_url"), + ] + PKCE_DEFAULT_CODE_CHALLENGE_METHOD = "s256" # can be "plain" or "s256" + PKCE_DEFAULT_CODE_VERIFIER_LENGTH = 48 # must be b/w 43-127 chars + DEFAULT_USE_PKCE = True + DEFAULT_USER_AVATAR_SIZE = 48 + + @property + def server_base_oauth2_api_url(self) -> str: + base_url = self.setting("URL") + return f"{base_url}/rest/oauth2/latest" + + @property + def server_base_rest_api_url(self) -> str: + base_url = self.setting("URL") + return f"{base_url}/rest/api/latest" + + def authorization_url(self) -> str: + return f"{self.server_base_oauth2_api_url}/authorize" + + def access_token_url(self) -> str: + return f"{self.server_base_oauth2_api_url}/token" + + def get_user_details(self, response) -> dict: + """Return user details for the Bitbucket Data Center account""" + # `response` here is the return value of `user_data` method + user_data = response + _, first_name, last_name = self.get_user_names(user_data["displayName"]) + uid = self.get_user_id(details=None, response=response) + return { + "uid": uid, + "first_name": first_name, + "last_name": last_name, + "email": user_data["emailAddress"], + "name": user_data["name"], + "username": user_data["slug"], + "display_name": user_data["displayName"], + "type": user_data["type"], + "active": user_data["active"], + "url": user_data["links"]["self"][0]["href"], + "avatar_url": user_data["avatarUrl"], + } + + def user_data(self, access_token, *args, **kwargs) -> dict: + """Fetch user data from Bitbucket Data Center REST API""" + # At this point, we don't know the current user's username + # and Bitbucket doesn't provide any API to do so. + # However, the current user's username is sent in every response header. + # ref: https://community.developer.atlassian.com/t/obtain-authorised-users-username-from-api/24422/2 # noqa + headers = {"Authorization": f"Bearer {access_token}"} + response = self.request( + url=f"{self.server_base_rest_api_url}/application-properties", + method="GET", + headers=headers, + ) + # ref: https://developer.atlassian.com/server/bitbucket/rest/v815/api-group-system-maintenance/#api-api-latest-users-userslug-get # noqa + username = response.headers["x-ausername"] + return self.get_json( + url=f"{self.server_base_rest_api_url}/users/{username}", + headers=headers, + params={ + "avatarSize": self.setting( + "USER_AVATAR_SIZE", default=self.DEFAULT_USER_AVATAR_SIZE + ) # to force `avatarUrl` in response + }, + ) diff --git a/social_core/backends/oauth.py b/social_core/backends/oauth.py index ae00aa5b2..57e146a59 100644 --- a/social_core/backends/oauth.py +++ b/social_core/backends/oauth.py @@ -1,3 +1,5 @@ +import base64 +import hashlib from urllib.parse import unquote, urlencode from oauthlib.oauth1 import SIGNATURE_TYPE_AUTH_HEADER @@ -5,6 +7,7 @@ from ..exceptions import ( AuthCanceled, + AuthException, AuthFailed, AuthMissingParameter, AuthStateForbidden, @@ -459,3 +462,68 @@ def refresh_token(self, token, *args, **kwargs): def refresh_token_url(self): return self.REFRESH_TOKEN_URL or self.access_token_url() + + +class BaseOAuth2PKCE(BaseOAuth2): + """ + Base class for providers using OAuth2 with Proof Key for Code Exchange (PKCE). + + OAuth2 details at: + https://datatracker.ietf.org/doc/html/rfc6749 + PKCE details at: + https://datatracker.ietf.org/doc/html/rfc7636 + """ + + PKCE_DEFAULT_CODE_CHALLENGE_METHOD = "s256" + PKCE_DEFAULT_CODE_VERIFIER_LENGTH = 32 + DEFAULT_USE_PKCE = True + + def create_code_verifier(self): + name = f"{self.name}_code_verifier" + code_verifier_len = self.setting( + "PKCE_CODE_VERIFIER_LENGTH", default=self.PKCE_DEFAULT_CODE_VERIFIER_LENGTH + ) + code_verifier = self.strategy.random_string(code_verifier_len) + self.strategy.session_set(name, code_verifier) + return code_verifier + + def get_code_verifier(self): + name = f"{self.name}_code_verifier" + code_verifier = self.strategy.session_get(name) + return code_verifier + + def generate_code_challenge(self, code_verifier, challenge_method): + method = challenge_method.lower() + if method == "s256": + hashed = hashlib.sha256(code_verifier.encode()).digest() + encoded = base64.urlsafe_b64encode(hashed) + code_challenge = encoded.decode().replace("=", "") # remove padding + return code_challenge + if method == "plain": + return code_verifier + raise AuthException("Unsupported code challenge method.") + + def auth_params(self, state=None): + params = super().auth_params(state=state) + + if self.setting("USE_PKCE", default=self.DEFAULT_USE_PKCE): + code_challenge_method = self.setting( + "PKCE_CODE_CHALLENGE_METHOD", + default=self.PKCE_DEFAULT_CODE_CHALLENGE_METHOD, + ) + code_verifier = self.create_code_verifier() + code_challenge = self.generate_code_challenge( + code_verifier, code_challenge_method + ) + params["code_challenge_method"] = code_challenge_method + params["code_challenge"] = code_challenge + return params + + def auth_complete_params(self, state=None): + params = super().auth_complete_params(state=state) + + if self.setting("USE_PKCE", default=self.DEFAULT_USE_PKCE): + code_verifier = self.get_code_verifier() + params["code_verifier"] = code_verifier + + return params diff --git a/social_core/backends/twitter_oauth2.py b/social_core/backends/twitter_oauth2.py index 4172fc36f..ead85b473 100644 --- a/social_core/backends/twitter_oauth2.py +++ b/social_core/backends/twitter_oauth2.py @@ -3,14 +3,10 @@ https://python-social-auth.readthedocs.io/en/latest/backends/twitter-oauth2.html https://developer.twitter.com/en/docs/authentication/oauth-2-0/authorization-code """ -import base64 -import hashlib +from .oauth import BaseOAuth2PKCE -from ..exceptions import AuthException -from .oauth import BaseOAuth2 - -class TwitterOAuth2(BaseOAuth2): +class TwitterOAuth2(BaseOAuth2PKCE): """Twitter OAuth2 authentication backend""" name = "twitter-oauth2" @@ -40,7 +36,8 @@ class TwitterOAuth2(BaseOAuth2): ("public_metrics", "public_metrics"), ] PKCE_DEFAULT_CODE_CHALLENGE_METHOD = "s256" - USE_PKCE = True + PKCE_DEFAULT_CODE_VERIFIER_LENGTH = 32 + DEFAULT_USE_PKCE = True def get_user_details(self, response): """Return user details from Twitter account""" @@ -104,50 +101,3 @@ def user_data(self, access_token, *args, **kwargs): headers={"Authorization": "Bearer %s" % access_token}, ) return response["data"] - - def create_code_verifier(self): - name = self.name + "_code_verifier" - code_verifier = self.strategy.random_string(32) - self.strategy.session_set(name, code_verifier) - return code_verifier - - def get_code_verifier(self): - name = self.name + "_code_verifier" - code_verifier = self.strategy.session_get(name) - return code_verifier - - def generate_code_challenge(self, code_verifier, challenge_method): - method = challenge_method.lower() - if method == "s256": - hashed = hashlib.sha256(code_verifier.encode()).digest() - encoded = base64.urlsafe_b64encode(hashed) - code_challenge = encoded.decode().replace("=", "") # remove padding - return code_challenge - elif method == "plain": - return code_verifier - else: - raise AuthException("Unsupported code challenge method.") - - def auth_params(self, state=None): - params = super().auth_params(state=state) - - if self.USE_PKCE: - code_challenge_method = self.setting("PKCE_CODE_CHALLENGE_METHOD") - if not code_challenge_method: - code_challenge_method = self.PKCE_DEFAULT_CODE_CHALLENGE_METHOD - code_verifier = self.create_code_verifier() - code_challenge = self.generate_code_challenge( - code_verifier, code_challenge_method - ) - params["code_challenge_method"] = code_challenge_method - params["code_challenge"] = code_challenge - return params - - def auth_complete_params(self, state=None): - params = super().auth_complete_params(state=state) - - if self.USE_PKCE: - code_verifier = self.get_code_verifier() - params["code_verifier"] = code_verifier - - return params diff --git a/social_core/tests/backends/oauth.py b/social_core/tests/backends/oauth.py index 44946d419..c00e014f7 100644 --- a/social_core/tests/backends/oauth.py +++ b/social_core/tests/backends/oauth.py @@ -1,7 +1,7 @@ from urllib.parse import urlparse import requests -from httpretty import HTTPretty +from httpretty import HTTPretty, latest_requests from ...utils import parse_qs, url_add_parameters from ..models import User @@ -121,3 +121,57 @@ def do_refresh_token(self): social = user.social[0] social.refresh_token(strategy=self.strategy, **self.refresh_token_arguments()) return user, social + + +class OAuth2PkcePlainTest(OAuth2Test): + def extra_settings(self): + settings = super().extra_settings() + settings.update( + {f"SOCIAL_AUTH_{self.name}_PKCE_CODE_CHALLENGE_METHOD": "plain"} + ) + return settings + + def do_login(self): + user = super().do_login() + + requests = latest_requests() + auth_request = [ + r for r in requests if self.backend.authorization_url() in r.url + ][0] + code_challenge = auth_request.querystring.get("code_challenge")[0] + code_challenge_method = auth_request.querystring.get("code_challenge_method")[0] + self.assertIsNotNone(code_challenge) + self.assertEqual(code_challenge_method, "plain") + + auth_complete = [ + r for r in requests if self.backend.access_token_url() in r.url + ][0] + code_verifier = auth_complete.parsed_body.get("code_verifier")[0] + self.assertEqual(code_challenge, code_verifier) + + return user + + +class OAuth2PkceS256Test(OAuth2Test): + def do_login(self): + # use default value of PKCE_CODE_CHALLENGE_METHOD (s256) + user = super().do_login() + + requests = latest_requests() + auth_request = [ + r for r in requests if self.backend.authorization_url() in r.url + ][0] + code_challenge = auth_request.querystring.get("code_challenge")[0] + code_challenge_method = auth_request.querystring.get("code_challenge_method")[0] + self.assertIsNotNone(code_challenge) + self.assertEqual(code_challenge_method, "s256") + + auth_complete = [ + r for r in requests if self.backend.access_token_url() in r.url + ][0] + code_verifier = auth_complete.parsed_body.get("code_verifier")[0] + self.assertEqual( + self.backend.generate_code_challenge(code_verifier, "s256"), code_challenge + ) + + return user diff --git a/social_core/tests/backends/test_bitbucket_datacenter.py b/social_core/tests/backends/test_bitbucket_datacenter.py new file mode 100644 index 000000000..8a3d3f9c5 --- /dev/null +++ b/social_core/tests/backends/test_bitbucket_datacenter.py @@ -0,0 +1,151 @@ +import json + +from httpretty import HTTPretty + +from .oauth import OAuth2PkcePlainTest, OAuth2PkceS256Test + + +class BitbucketDataCenterOAuth2Mixin: + backend_path = "social_core.backends.bitbucket_datacenter.BitbucketDataCenterOAuth2" + application_properties_url = ( + "https://bachmanity.atlassian.net/rest/api/latest/application-properties" + ) + application_properties_headers = {"x-ausername": "erlich-bachman"} + application_properties_body = json.dumps( + { + "version": "8.15.0", + "buildNumber": "8015000", + "buildDate": "1697764661289", + "displayName": "Bitbucket", + } + ) + user_data_url = ( + "https://bachmanity.atlassian.net/rest/api/latest/users/erlich-bachman" + ) + user_data_body = json.dumps( + { + "name": "erlich-bachman", + "emailAddress": "erlich@bachmanity.com", + "active": True, + "displayName": "Erlich Bachman", + "id": 1, + "slug": "erlich-bachman", + "type": "NORMAL", + "links": { + "self": [ + {"href": "https://bachmanity.atlassian.net/users/erlich-bachman"} + ] + }, + "avatarUrl": "http://www.gravatar.com/avatar/af7d968fe79ea45271e3100391824b79.jpg?s=48&d=mm", + } + ) + access_token_body = json.dumps( + { + "scope": "PUBLIC_REPOS", + "access_token": "dummy_access_token", + "token_type": "bearer", + "expires_in": 3600, + "refresh_token": "dummy_refresh_token", + } + ) + refresh_token_body = json.dumps( + { + "scope": "PUBLIC_REPOS", + "access_token": "dummy_access_token_refreshed", + "token_type": "bearer", + "expires_in": 3600, + "refresh_token": "dummy_refresh_token_refreshed", + } + ) + expected_username = "erlich-bachman" + + def extra_settings(self): + settings = super().extra_settings() + settings.update( + {f"SOCIAL_AUTH_{self.name}_URL": "https://bachmanity.atlassian.net"} + ) + return settings + + def auth_handlers(self, start_url): + target_url = super().auth_handlers(start_url) + HTTPretty.register_uri( + HTTPretty.GET, + self.application_properties_url, + body=self.application_properties_body, + adding_headers=self.application_properties_headers, + content_type="text/json", + ) + return target_url + + def test_login(self): + user = self.do_login() + + self.assertEqual(len(user.social), 1) + + social = user.social[0] + self.assertEqual(social.uid, 1) + self.assertEqual(social.extra_data["first_name"], "Erlich") + self.assertEqual(social.extra_data["last_name"], "Bachman") + self.assertEqual(social.extra_data["email"], "erlich@bachmanity.com") + self.assertEqual(social.extra_data["name"], "erlich-bachman") + self.assertEqual(social.extra_data["username"], "erlich-bachman") + self.assertEqual(social.extra_data["display_name"], "Erlich Bachman") + self.assertEqual(social.extra_data["type"], "NORMAL") + self.assertEqual(social.extra_data["active"], True) + self.assertEqual( + social.extra_data["url"], + "https://bachmanity.atlassian.net/users/erlich-bachman", + ) + self.assertEqual( + social.extra_data["avatar_url"], + "http://www.gravatar.com/avatar/af7d968fe79ea45271e3100391824b79.jpg?s=48&d=mm", + ) + self.assertEqual(social.extra_data["scope"], "PUBLIC_REPOS") + self.assertEqual(social.extra_data["access_token"], "dummy_access_token") + self.assertEqual(social.extra_data["token_type"], "bearer") + self.assertEqual(social.extra_data["expires"], 3600) + self.assertEqual(social.extra_data["refresh_token"], "dummy_refresh_token") + + def test_refresh_token(self): + _, social = self.do_refresh_token() + + self.assertEqual(social.uid, 1) + self.assertEqual(social.extra_data["first_name"], "Erlich") + self.assertEqual(social.extra_data["last_name"], "Bachman") + self.assertEqual(social.extra_data["email"], "erlich@bachmanity.com") + self.assertEqual(social.extra_data["name"], "erlich-bachman") + self.assertEqual(social.extra_data["username"], "erlich-bachman") + self.assertEqual(social.extra_data["display_name"], "Erlich Bachman") + self.assertEqual(social.extra_data["type"], "NORMAL") + self.assertEqual(social.extra_data["active"], True) + self.assertEqual( + social.extra_data["url"], + "https://bachmanity.atlassian.net/users/erlich-bachman", + ) + self.assertEqual( + social.extra_data["avatar_url"], + "http://www.gravatar.com/avatar/af7d968fe79ea45271e3100391824b79.jpg?s=48&d=mm", + ) + self.assertEqual(social.extra_data["scope"], "PUBLIC_REPOS") + self.assertEqual( + social.extra_data["access_token"], "dummy_access_token_refreshed" + ) + self.assertEqual(social.extra_data["token_type"], "bearer") + self.assertEqual(social.extra_data["expires"], 3600) + self.assertEqual( + social.extra_data["refresh_token"], "dummy_refresh_token_refreshed" + ) + + +class BitbucketDataCenterOAuth2TestPkcePlain( + BitbucketDataCenterOAuth2Mixin, + OAuth2PkcePlainTest, +): + pass + + +class BitbucketDataCenterOAuth2TestPkceS256( + BitbucketDataCenterOAuth2Mixin, + OAuth2PkceS256Test, +): + pass diff --git a/social_core/tests/backends/test_twitter_oauth2.py b/social_core/tests/backends/test_twitter_oauth2.py index d003fb77f..5ed76abfd 100644 --- a/social_core/tests/backends/test_twitter_oauth2.py +++ b/social_core/tests/backends/test_twitter_oauth2.py @@ -1,13 +1,11 @@ import json -import httpretty - from social_core.exceptions import AuthException -from .oauth import OAuth2Test +from .oauth import OAuth2PkcePlainTest, OAuth2PkceS256Test, OAuth2Test -class TwitterOAuth2Test(OAuth2Test): +class TwitterOAuth2Mixin: backend_path = "social_core.backends.twitter_oauth2.TwitterOAuth2" user_data_url = "https://api.twitter.com/2/users/me" access_token_body = json.dumps( @@ -172,58 +170,21 @@ def test_login(self): self.assertIsNone(social.extra_data.get("public_metrics")) -class TwitterOAuth2TestPkcePlain(TwitterOAuth2Test): - def test_login(self): - self.strategy.set_settings( - {"SOCIAL_AUTH_TWITTER_OAUTH2_PKCE_CODE_CHALLENGE_METHOD": "plain"} - ) - - self.do_login() - - requests = httpretty.latest_requests() - auth_request = [ - r for r in requests if "https://twitter.com/i/oauth2/authorize" in r.url - ][0] - code_challenge = auth_request.querystring.get("code_challenge")[0] - code_challenge_method = auth_request.querystring.get("code_challenge_method")[0] - self.assertIsNotNone(code_challenge) - self.assertEqual(code_challenge_method, "plain") - - auth_complete = [ - r for r in requests if "https://api.twitter.com/2/oauth2/token" in r.url - ][0] - code_verifier = auth_complete.parsed_body.get("code_verifier")[0] - self.assertEqual(code_challenge, code_verifier) +class TwitterOAuth2TestPkcePlain(TwitterOAuth2Mixin, OAuth2PkcePlainTest): + pass -class TwitterOAuth2TestPkceS256(TwitterOAuth2Test): - def test_login(self): - # use default value of PKCE_CODE_CHALLENGE_METHOD (s256) - self.do_login() - - requests = httpretty.latest_requests() - auth_request = [ - r for r in requests if "https://twitter.com/i/oauth2/authorize" in r.url - ][0] - code_challenge = auth_request.querystring.get("code_challenge")[0] - code_challenge_method = auth_request.querystring.get("code_challenge_method")[0] - self.assertIsNotNone(code_challenge) - self.assertEqual(code_challenge_method, "s256") - - auth_complete = [ - r for r in requests if "https://api.twitter.com/2/oauth2/toke" in r.url - ][0] - code_verifier = auth_complete.parsed_body.get("code_verifier")[0] - self.assertEqual( - self.backend.generate_code_challenge(code_verifier, "s256"), code_challenge - ) +class TwitterOAuth2TestPkceS256(TwitterOAuth2Mixin, OAuth2PkceS256Test): + pass -class TwitterOAuth2TestInvalidCodeChallengeMethod(TwitterOAuth2Test): +class TwitterOAuth2TestInvalidCodeChallengeMethod( + TwitterOAuth2Mixin, OAuth2PkcePlainTest +): def test_login__error(self): self.strategy.set_settings( { - "SOCIAL_AUTH_TWITTER_OAUTH2_PKCE_CODE_CHALLENGE_METHOD": "invalidmethodname" + f"SOCIAL_AUTH_{self.name}_PKCE_CODE_CHALLENGE_METHOD": "invalidmethodname", } )