Skip to content

Commit

Permalink
feat: add new backend BitbucketDataCenterOAuth2 (#856)
Browse files Browse the repository at this point in the history
* feat: add new backend BitbucketDataCenterOAuth2

* abstract away PKCE logic in BaseOAuth2PKCE for reuse

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* abstract away PKCE logic in BaseOAuth2PKCE for reuse

* noqa flake8 line length rule for URLs in comments

* chore: add BitbucketDataCenterOAuth2Test

* chore: abstract PKCE tests in OAuth2PkcePlainTest, OAuth2PkceS256Test

* noqa flake8 line length rule for URLs in comments

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* chore: fix isort errors

* chore: improvements, address review suggestions

* fix: docs URL

* Apply suggestions from code review

Co-authored-by: Johan Castiblanco <51926076+johanv26@users.noreply.github.com>

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Johan Castiblanco <51926076+johanv26@users.noreply.github.com>
  • Loading branch information
3 people authored Nov 22, 2023
1 parent a670659 commit 51ff887
Show file tree
Hide file tree
Showing 6 changed files with 395 additions and 104 deletions.
107 changes: 107 additions & 0 deletions social_core/backends/bitbucket_datacenter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
"""
Bitbucket Data Center OAuth2 backend, docs at:
https://python-social-auth.readthedocs.io/en/latest/backends/bitbucket_datacenter_oauth2.html
https://confluence.atlassian.com/bitbucketserver/bitbucket-oauth-2-0-provider-api-1108483661.html
"""

from .oauth import BaseOAuth2PKCE


class BitbucketDataCenterOAuth2(BaseOAuth2PKCE):
"""
Implements client for Bitbucket Data Center OAuth 2.0 provider API.
ref: https://confluence.atlassian.com/bitbucketserver/bitbucket-oauth-2-0-provider-api-1108483661.html
"""

name = "bitbucket-datacenter-oauth2"
ID_KEY = "id"
SCOPE_SEPARATOR = " "
ACCESS_TOKEN_METHOD = "POST"
REFRESH_TOKEN_METHOD = "POST"
REDIRECT_STATE = False
STATE_PARAMETER = True
# ref: https://confluence.atlassian.com/bitbucketserver/bitbucket-oauth-2-0-provider-api-1108483661.html#BitbucketOAuth2.0providerAPI-scopes # noqa
DEFAULT_SCOPE = ["PUBLIC_REPOS"]
USE_BASIC_AUTH = False
EXTRA_DATA = [
("token_type", "token_type"),
("access_token", "access_token"),
("refresh_token", "refresh_token"),
("expires_in", "expires"),
("scope", "scope"),
# extra user profile fields
("first_name", "first_name"),
("last_name", "last_name"),
("email", "email"),
("name", "name"),
("username", "username"),
("display_name", "display_name"),
("type", "type"),
("active", "active"),
("url", "url"),
("avatar_url", "avatar_url"),
]
PKCE_DEFAULT_CODE_CHALLENGE_METHOD = "s256" # can be "plain" or "s256"
PKCE_DEFAULT_CODE_VERIFIER_LENGTH = 48 # must be b/w 43-127 chars
DEFAULT_USE_PKCE = True
DEFAULT_USER_AVATAR_SIZE = 48

@property
def server_base_oauth2_api_url(self) -> str:
base_url = self.setting("URL")
return f"{base_url}/rest/oauth2/latest"

@property
def server_base_rest_api_url(self) -> str:
base_url = self.setting("URL")
return f"{base_url}/rest/api/latest"

def authorization_url(self) -> str:
return f"{self.server_base_oauth2_api_url}/authorize"

def access_token_url(self) -> str:
return f"{self.server_base_oauth2_api_url}/token"

def get_user_details(self, response) -> dict:
"""Return user details for the Bitbucket Data Center account"""
# `response` here is the return value of `user_data` method
user_data = response
_, first_name, last_name = self.get_user_names(user_data["displayName"])
uid = self.get_user_id(details=None, response=response)
return {
"uid": uid,
"first_name": first_name,
"last_name": last_name,
"email": user_data["emailAddress"],
"name": user_data["name"],
"username": user_data["slug"],
"display_name": user_data["displayName"],
"type": user_data["type"],
"active": user_data["active"],
"url": user_data["links"]["self"][0]["href"],
"avatar_url": user_data["avatarUrl"],
}

def user_data(self, access_token, *args, **kwargs) -> dict:
"""Fetch user data from Bitbucket Data Center REST API"""
# At this point, we don't know the current user's username
# and Bitbucket doesn't provide any API to do so.
# However, the current user's username is sent in every response header.
# ref: https://community.developer.atlassian.com/t/obtain-authorised-users-username-from-api/24422/2 # noqa
headers = {"Authorization": f"Bearer {access_token}"}
response = self.request(
url=f"{self.server_base_rest_api_url}/application-properties",
method="GET",
headers=headers,
)
# ref: https://developer.atlassian.com/server/bitbucket/rest/v815/api-group-system-maintenance/#api-api-latest-users-userslug-get # noqa
username = response.headers["x-ausername"]
return self.get_json(
url=f"{self.server_base_rest_api_url}/users/{username}",
headers=headers,
params={
"avatarSize": self.setting(
"USER_AVATAR_SIZE", default=self.DEFAULT_USER_AVATAR_SIZE
) # to force `avatarUrl` in response
},
)
68 changes: 68 additions & 0 deletions social_core/backends/oauth.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import base64
import hashlib
from urllib.parse import unquote, urlencode

from oauthlib.oauth1 import SIGNATURE_TYPE_AUTH_HEADER
from requests_oauthlib import OAuth1

from ..exceptions import (
AuthCanceled,
AuthException,
AuthFailed,
AuthMissingParameter,
AuthStateForbidden,
Expand Down Expand Up @@ -459,3 +462,68 @@ def refresh_token(self, token, *args, **kwargs):

def refresh_token_url(self):
return self.REFRESH_TOKEN_URL or self.access_token_url()


class BaseOAuth2PKCE(BaseOAuth2):
"""
Base class for providers using OAuth2 with Proof Key for Code Exchange (PKCE).
OAuth2 details at:
https://datatracker.ietf.org/doc/html/rfc6749
PKCE details at:
https://datatracker.ietf.org/doc/html/rfc7636
"""

PKCE_DEFAULT_CODE_CHALLENGE_METHOD = "s256"
PKCE_DEFAULT_CODE_VERIFIER_LENGTH = 32
DEFAULT_USE_PKCE = True

def create_code_verifier(self):
name = f"{self.name}_code_verifier"
code_verifier_len = self.setting(
"PKCE_CODE_VERIFIER_LENGTH", default=self.PKCE_DEFAULT_CODE_VERIFIER_LENGTH
)
code_verifier = self.strategy.random_string(code_verifier_len)
self.strategy.session_set(name, code_verifier)
return code_verifier

def get_code_verifier(self):
name = f"{self.name}_code_verifier"
code_verifier = self.strategy.session_get(name)
return code_verifier

def generate_code_challenge(self, code_verifier, challenge_method):
method = challenge_method.lower()
if method == "s256":
hashed = hashlib.sha256(code_verifier.encode()).digest()
encoded = base64.urlsafe_b64encode(hashed)
code_challenge = encoded.decode().replace("=", "") # remove padding
return code_challenge
if method == "plain":
return code_verifier
raise AuthException("Unsupported code challenge method.")

def auth_params(self, state=None):
params = super().auth_params(state=state)

if self.setting("USE_PKCE", default=self.DEFAULT_USE_PKCE):
code_challenge_method = self.setting(
"PKCE_CODE_CHALLENGE_METHOD",
default=self.PKCE_DEFAULT_CODE_CHALLENGE_METHOD,
)
code_verifier = self.create_code_verifier()
code_challenge = self.generate_code_challenge(
code_verifier, code_challenge_method
)
params["code_challenge_method"] = code_challenge_method
params["code_challenge"] = code_challenge
return params

def auth_complete_params(self, state=None):
params = super().auth_complete_params(state=state)

if self.setting("USE_PKCE", default=self.DEFAULT_USE_PKCE):
code_verifier = self.get_code_verifier()
params["code_verifier"] = code_verifier

return params
58 changes: 4 additions & 54 deletions social_core/backends/twitter_oauth2.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,10 @@
https://python-social-auth.readthedocs.io/en/latest/backends/twitter-oauth2.html
https://developer.twitter.com/en/docs/authentication/oauth-2-0/authorization-code
"""
import base64
import hashlib
from .oauth import BaseOAuth2PKCE

from ..exceptions import AuthException
from .oauth import BaseOAuth2


class TwitterOAuth2(BaseOAuth2):
class TwitterOAuth2(BaseOAuth2PKCE):
"""Twitter OAuth2 authentication backend"""

name = "twitter-oauth2"
Expand Down Expand Up @@ -40,7 +36,8 @@ class TwitterOAuth2(BaseOAuth2):
("public_metrics", "public_metrics"),
]
PKCE_DEFAULT_CODE_CHALLENGE_METHOD = "s256"
USE_PKCE = True
PKCE_DEFAULT_CODE_VERIFIER_LENGTH = 32
DEFAULT_USE_PKCE = True

def get_user_details(self, response):
"""Return user details from Twitter account"""
Expand Down Expand Up @@ -104,50 +101,3 @@ def user_data(self, access_token, *args, **kwargs):
headers={"Authorization": "Bearer %s" % access_token},
)
return response["data"]

def create_code_verifier(self):
name = self.name + "_code_verifier"
code_verifier = self.strategy.random_string(32)
self.strategy.session_set(name, code_verifier)
return code_verifier

def get_code_verifier(self):
name = self.name + "_code_verifier"
code_verifier = self.strategy.session_get(name)
return code_verifier

def generate_code_challenge(self, code_verifier, challenge_method):
method = challenge_method.lower()
if method == "s256":
hashed = hashlib.sha256(code_verifier.encode()).digest()
encoded = base64.urlsafe_b64encode(hashed)
code_challenge = encoded.decode().replace("=", "") # remove padding
return code_challenge
elif method == "plain":
return code_verifier
else:
raise AuthException("Unsupported code challenge method.")

def auth_params(self, state=None):
params = super().auth_params(state=state)

if self.USE_PKCE:
code_challenge_method = self.setting("PKCE_CODE_CHALLENGE_METHOD")
if not code_challenge_method:
code_challenge_method = self.PKCE_DEFAULT_CODE_CHALLENGE_METHOD
code_verifier = self.create_code_verifier()
code_challenge = self.generate_code_challenge(
code_verifier, code_challenge_method
)
params["code_challenge_method"] = code_challenge_method
params["code_challenge"] = code_challenge
return params

def auth_complete_params(self, state=None):
params = super().auth_complete_params(state=state)

if self.USE_PKCE:
code_verifier = self.get_code_verifier()
params["code_verifier"] = code_verifier

return params
56 changes: 55 additions & 1 deletion social_core/tests/backends/oauth.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from urllib.parse import urlparse

import requests
from httpretty import HTTPretty
from httpretty import HTTPretty, latest_requests

from ...utils import parse_qs, url_add_parameters
from ..models import User
Expand Down Expand Up @@ -121,3 +121,57 @@ def do_refresh_token(self):
social = user.social[0]
social.refresh_token(strategy=self.strategy, **self.refresh_token_arguments())
return user, social


class OAuth2PkcePlainTest(OAuth2Test):
def extra_settings(self):
settings = super().extra_settings()
settings.update(
{f"SOCIAL_AUTH_{self.name}_PKCE_CODE_CHALLENGE_METHOD": "plain"}
)
return settings

def do_login(self):
user = super().do_login()

requests = latest_requests()
auth_request = [
r for r in requests if self.backend.authorization_url() in r.url
][0]
code_challenge = auth_request.querystring.get("code_challenge")[0]
code_challenge_method = auth_request.querystring.get("code_challenge_method")[0]
self.assertIsNotNone(code_challenge)
self.assertEqual(code_challenge_method, "plain")

auth_complete = [
r for r in requests if self.backend.access_token_url() in r.url
][0]
code_verifier = auth_complete.parsed_body.get("code_verifier")[0]
self.assertEqual(code_challenge, code_verifier)

return user


class OAuth2PkceS256Test(OAuth2Test):
def do_login(self):
# use default value of PKCE_CODE_CHALLENGE_METHOD (s256)
user = super().do_login()

requests = latest_requests()
auth_request = [
r for r in requests if self.backend.authorization_url() in r.url
][0]
code_challenge = auth_request.querystring.get("code_challenge")[0]
code_challenge_method = auth_request.querystring.get("code_challenge_method")[0]
self.assertIsNotNone(code_challenge)
self.assertEqual(code_challenge_method, "s256")

auth_complete = [
r for r in requests if self.backend.access_token_url() in r.url
][0]
code_verifier = auth_complete.parsed_body.get("code_verifier")[0]
self.assertEqual(
self.backend.generate_code_challenge(code_verifier, "s256"), code_challenge
)

return user
Loading

0 comments on commit 51ff887

Please sign in to comment.