-
Notifications
You must be signed in to change notification settings - Fork 0
/
oauth2.py
83 lines (65 loc) · 2.55 KB
/
oauth2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
from typing import Optional
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import OAuth2
from fastapi.security.utils import get_authorization_scheme_param
from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel
from fastapi.responses import RedirectResponse
import token
from routers import exception
class OAuth2PasswordBearerCookie(OAuth2):
def __init__(
self,
tokenUrl: str,
scheme_name: str = None,
scopes: dict = None,
auto_error: bool = True,
):
if not scopes:
scopes = {}
flows = OAuthFlowsModel(
password={"tokenUrl": tokenUrl, "scopes": scopes})
super().__init__(flows=flows, scheme_name=scheme_name, auto_error=auto_error)
async def __call__(self, request: Request) -> Optional[str]:
header_authorization: str = request.headers.get("Authorization")
cookie_authorization: str = request.cookies.get("Authorization")
header_scheme, header_param = get_authorization_scheme_param(
header_authorization
)
cookie_scheme, cookie_param = get_authorization_scheme_param(
cookie_authorization
)
if header_scheme.lower() == "bearer":
authorization = True
scheme = header_scheme
param = header_param
elif cookie_scheme.lower() == "bearer":
authorization = True
scheme = cookie_scheme
param = cookie_param
else:
authorization = False
if not authorization or scheme.lower() != "bearer":
if self.auto_error:
# raise HTTPException(
# status_code=status.HTTP_403_FORBIDDEN, detail="Not authenticated"
# )
raise exception.AuthException()
else:
return None
return param
oauth2_scheme = OAuth2PasswordBearerCookie(tokenUrl="auth")
def get_current_user(data: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return token.verify_token(data, credentials_exception)
def login_check(request: Request):
auth = request.cookies.get("Authorization")
cookie_scheme, cookie_param = get_authorization_scheme_param(auth)
if cookie_scheme.lower() == "bearer":
param = cookie_param
if token.verify_token(param).email:
return True
return False